Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,10 @@ those.
- `GRAPH_ENABLE_SQL_QUERIES`: Enable the experimental [SQL query
interface](implementation/sql-interface.md).
(default: false)
- `GRAPH_STORE_ACCOUNT_LIKE_SCAN_INTERVAL_HOURS`: If set, enables an experimental job that
periodically scans for entity tables that may benefit from an [account-like optimization](https://thegraph.com/docs/en/indexing/tooling/graph-node/#account-like-optimisation) and marks them with an
account-like flag. The value is the interval in hours at which the job
should run. The job reads data from the `info.table_stats` materialized view, which refreshes every six hours. Expects an integer value, e.g., 24. Requires also setting
`GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT` and `GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO`.
- `GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT`: Sets the minimum total number of versions a table must have to be considered for account-like flagging. Expects a positive integer value. No default value.
- `GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO`: Sets the maximum unique entities to version ratio (e.g., 0.01 ≈ 1:100 entity-to-version ratio).
31 changes: 31 additions & 0 deletions graph/src/env/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,18 @@ pub struct EnvVarsStore {
/// The number of rows to fetch from the foreign data wrapper in one go,
/// this will be set as the option 'fetch_size' on all foreign servers
pub fdw_fetch_size: usize,

/// Experimental feature to automatically set the account-like flag on eligible tables
/// Set by the environment variable `GRAPH_STORE_ACCOUNT_LIKE_SCAN_INTERVAL_HOURS`
/// If not set, the job is disabled.
/// Utilizes materialized view stats that refresh every 6 hours to discover heavy-write tables.
pub account_like_scan_interval_hours: Option<u32>,
/// Set by the environment variable `GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT`
/// Tables must have at least this many total versions to be considered.
pub account_like_min_versions_count: Option<u64>,
/// Set by the environment variable `GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO`
/// Defines the maximum share of unique entities (e.g. 0.01 for a 1:100 entity-to-version ratio).
pub account_like_max_unique_ratio: Option<f64>,
}

// This does not print any values avoid accidentally leaking any sensitive env vars
Expand Down Expand Up @@ -206,6 +218,9 @@ impl TryFrom<InnerStore> for EnvVarsStore {
disable_block_cache_for_lookup: x.disable_block_cache_for_lookup,
insert_extra_cols: x.insert_extra_cols,
fdw_fetch_size: x.fdw_fetch_size,
account_like_scan_interval_hours: x.account_like_scan_interval_hours,
account_like_min_versions_count: x.account_like_min_versions_count,
account_like_max_unique_ratio: x.account_like_max_unique_ratio.map(|r| r.0),
};
if let Some(timeout) = vars.batch_timeout {
if timeout < 2 * vars.batch_target_duration {
Expand All @@ -217,6 +232,16 @@ impl TryFrom<InnerStore> for EnvVarsStore {
if vars.batch_workers < 1 {
bail!("GRAPH_STORE_BATCH_WORKERS must be at least 1");
}
if vars.account_like_scan_interval_hours.is_some()
&& (vars.account_like_min_versions_count.is_none()
|| vars.account_like_max_unique_ratio.is_none())
{
bail!(
"Both GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT and \
GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO must be set when \
GRAPH_STORE_ACCOUNT_LIKE_SCAN_INTERVAL_HOURS is set"
);
}
Ok(vars)
}
}
Expand Down Expand Up @@ -295,6 +320,12 @@ pub struct InnerStore {
insert_extra_cols: usize,
#[envconfig(from = "GRAPH_STORE_FDW_FETCH_SIZE", default = "1000")]
fdw_fetch_size: usize,
#[envconfig(from = "GRAPH_STORE_ACCOUNT_LIKE_SCAN_INTERVAL_HOURS")]
account_like_scan_interval_hours: Option<u32>,
#[envconfig(from = "GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT")]
account_like_min_versions_count: Option<u64>,
#[envconfig(from = "GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO")]
account_like_max_unique_ratio: Option<ZeroToOneF64>,
}

#[derive(Clone, Copy, Debug)]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP MATERIALIZED VIEW IF EXISTS info.table_stats;
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
CREATE MATERIALIZED VIEW info.table_stats AS
WITH table_info AS (
SELECT
s.schemaname AS schema_name,
sd.id AS deployment_id,
sd.subgraph AS subgraph,
s.tablename AS table_name,
c.reltuples AS total_row_count,
s.n_distinct AS n_distinct
FROM pg_stats s
JOIN pg_namespace n ON n.nspname = s.schemaname
JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.tablename
JOIN subgraphs.deployment sd ON sd.id::text = substring(s.schemaname, 4)
WHERE
s.attname = 'id'
AND s.schemaname LIKE 'sgd%'
AND c.relname NOT IN ('poi2$', 'data_sources$')
)
SELECT
schema_name,
deployment_id AS deployment,
subgraph,
table_name,
CASE
WHEN n_distinct < 0 THEN (-n_distinct) * total_row_count
ELSE n_distinct
END::bigint AS entities,
total_row_count::bigint AS versions,
CASE
WHEN total_row_count = 0 THEN 0::float8
WHEN n_distinct < 0 THEN (-n_distinct)::float8
ELSE n_distinct::numeric / total_row_count::numeric
END AS ratio
FROM table_info
WITH NO DATA;
47 changes: 46 additions & 1 deletion store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,50 @@ impl DeploymentStore {
.await
}

pub(crate) async fn identify_account_like_candidates(
&self,
min_versions: u64,
ratio: f64,
) -> Result<Vec<(String, String)>, StoreError> {
#[derive(QueryableByName)]
struct TableStat {
#[diesel(sql_type = diesel::sql_types::Text)]
subgraph: String,
#[diesel(sql_type = diesel::sql_types::Text)]
table_name: String,
}
let result = self
.with_conn(move |conn, _| {
let query = r#"
SELECT
stats.subgraph,
stats.table_name
FROM info.table_stats AS stats
LEFT JOIN subgraphs.table_stats ts
ON ts.deployment = stats.deployment
AND ts.table_name = stats.table_name
WHERE
stats.versions > $1
AND stats.ratio < $2
AND ts.is_account_like IS NOT TRUE
"#;

diesel::sql_query(query)
.bind::<diesel::sql_types::BigInt, _>(min_versions as i64)
.bind::<diesel::sql_types::Double, _>(ratio)
.load::<TableStat>(conn)
.map_err(Into::into)
})
.await;

result.map(|tables| {
tables
.into_iter()
.map(|table_stat| (table_stat.subgraph, table_stat.table_name))
.collect()
})
}

pub(crate) async fn set_account_like(
&self,
site: Arc<Site>,
Expand Down Expand Up @@ -1845,10 +1889,11 @@ impl DeploymentStore {
// We hardcode our materialized views, but could also use
// pg_matviews to list all of them, though that might inadvertently
// refresh materialized views that operators created themselves
const VIEWS: [&str; 3] = [
const VIEWS: [&str; 4] = [
"info.table_sizes",
"info.subgraph_sizes",
"info.chain_sizes",
"info.table_stats",
];
store
.with_conn(|conn, cancel| {
Expand Down
41 changes: 41 additions & 0 deletions store/postgres/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ pub fn register(
Arc::new(RefreshMaterializedView::new(store.subgraph_store())),
6 * ONE_HOUR,
);

if let Some(interval) = ENV_VARS.store.account_like_scan_interval_hours {
runner.register(
Arc::new(AccountLikeJob::new(store.subgraph_store())),
interval * ONE_HOUR,
);
}
}

/// A job that vacuums `subgraphs.deployment` and `subgraphs.head`. With a
Expand Down Expand Up @@ -234,3 +241,37 @@ impl Job for UnusedJob {
}
}
}

struct AccountLikeJob {
store: Arc<SubgraphStore>,
}

impl AccountLikeJob {
fn new(store: Arc<SubgraphStore>) -> AccountLikeJob {
AccountLikeJob { store }
}
}

#[async_trait]
impl Job for AccountLikeJob {
fn name(&self) -> &str {
"Set account-like flag on eligible tables"
}

async fn run(&self, logger: &Logger) {
// Safe to unwrap due to a startup validation
// which ensures these values are present when account_like_scan_interval_hours is set.
let min_versions_count = ENV_VARS.store.account_like_min_versions_count.unwrap();
let ratio = ENV_VARS.store.account_like_max_unique_ratio.unwrap();

self.store
.identify_and_set_account_like(logger, min_versions_count, ratio)
.await
.unwrap_or_else(|e| {
error!(
logger,
"Failed to set account-like flag on eligible tables: {}", e
)
});
}
}
37 changes: 37 additions & 0 deletions store/postgres/src/subgraph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,43 @@ impl SubgraphStoreInner {
store.drop_index(site, index_name).await
}

pub(crate) async fn identify_and_set_account_like(
&self,
logger: &Logger,
min_records: u64,
ratio: f64,
) -> Result<(), StoreError> {
for (_shard, store) in &self.stores {
let candidates = store
.identify_account_like_candidates(min_records, ratio)
.await?;

graph::slog::debug!(
logger,
"Found {} account-like candidates in shard {}",
candidates.len(),
_shard
);

for (subgraph, table_name) in candidates {
graph::slog::debug!(
logger,
"Setting table {} as account-like for deployment {}",
table_name,
subgraph
);

let hash = DeploymentHash::new(subgraph.clone()).map_err(|_| {
anyhow!("Failed to create deployment hash for subgraph: {subgraph}")
})?;
let (store, site) = self.store(&hash)?;
store.set_account_like(site, &table_name, true).await?;
}
}

Ok(())
}

pub async fn set_account_like(
&self,
deployment: &DeploymentLocator,
Expand Down
Loading