diff --git a/docs/environment-variables.md b/docs/environment-variables.md index a0a3cfd8cf5..5e5bfb32d91 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -287,3 +287,10 @@ those. - `GRAPH_ENABLE_SQL_QUERIES`: Enable the experimental [SQL query interface](implementation/sql-interface.md). (default: false) +- `GRAPH_STORE_ACCOUNT_LIKE_SCAN_INTERVAL_HOURS`: If set, enables an experimental job that + periodically scans for entity tables that may benefit from an [account-like optimization](https://thegraph.com/docs/en/indexing/tooling/graph-node/#account-like-optimisation) and marks them with an + account-like flag. The value is the interval in hours at which the job + should run. The job reads data from the `info.table_stats` materialized view, which refreshes every six hours. Expects an integer value, e.g., 24. Requires also setting + `GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT` and `GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO`. +- `GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT`: Sets the minimum total number of versions a table must have to be considered for account-like flagging. Expects a positive integer value. No default value. +- `GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO`: Sets the maximum unique entities to version ratio (e.g., 0.01 ≈ 1:100 entity-to-version ratio). diff --git a/graph/src/env/store.rs b/graph/src/env/store.rs index e267b28d8ce..6d51b04ba34 100644 --- a/graph/src/env/store.rs +++ b/graph/src/env/store.rs @@ -149,6 +149,18 @@ pub struct EnvVarsStore { /// The number of rows to fetch from the foreign data wrapper in one go, /// this will be set as the option 'fetch_size' on all foreign servers pub fdw_fetch_size: usize, + + /// Experimental feature to automatically set the account-like flag on eligible tables + /// Set by the environment variable `GRAPH_STORE_ACCOUNT_LIKE_SCAN_INTERVAL_HOURS` + /// If not set, the job is disabled. 
+ /// Utilizes materialized view stats that refresh every 6 hours to discover heavy-write tables. + pub account_like_scan_interval_hours: Option<u32>, + /// Set by the environment variable `GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT` + /// Tables must have at least this many total versions to be considered. + pub account_like_min_versions_count: Option<u64>, + /// Set by the environment variable `GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO` + /// Defines the maximum share of unique entities (e.g. 0.01 for a 1:100 entity-to-version ratio). + pub account_like_max_unique_ratio: Option<f64>, } // This does not print any values avoid accidentally leaking any sensitive env vars @@ -206,6 +218,9 @@ impl TryFrom<InnerStore> for EnvVarsStore { disable_block_cache_for_lookup: x.disable_block_cache_for_lookup, insert_extra_cols: x.insert_extra_cols, fdw_fetch_size: x.fdw_fetch_size, + account_like_scan_interval_hours: x.account_like_scan_interval_hours, + account_like_min_versions_count: x.account_like_min_versions_count, + account_like_max_unique_ratio: x.account_like_max_unique_ratio.map(|r| r.0), }; if let Some(timeout) = vars.batch_timeout { if timeout < 2 * vars.batch_target_duration { @@ -217,6 +232,16 @@ impl TryFrom<InnerStore> for EnvVarsStore { if vars.batch_workers < 1 { bail!("GRAPH_STORE_BATCH_WORKERS must be at least 1"); } + if vars.account_like_scan_interval_hours.is_some() + && (vars.account_like_min_versions_count.is_none() + || vars.account_like_max_unique_ratio.is_none()) + { + bail!( + "Both GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT and \ + GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO must be set when \ + GRAPH_STORE_ACCOUNT_LIKE_SCAN_INTERVAL_HOURS is set" + ); + } Ok(vars) } } @@ -295,6 +320,12 @@ pub struct InnerStore { insert_extra_cols: usize, #[envconfig(from = "GRAPH_STORE_FDW_FETCH_SIZE", default = "1000")] fdw_fetch_size: usize, + #[envconfig(from = "GRAPH_STORE_ACCOUNT_LIKE_SCAN_INTERVAL_HOURS")] + account_like_scan_interval_hours: Option<u32>, + #[envconfig(from =
"GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT")] + account_like_min_versions_count: Option<u64>, + #[envconfig(from = "GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO")] + account_like_max_unique_ratio: Option<ZeroToOneF64>, } #[derive(Clone, Copy, Debug)] diff --git a/store/postgres/migrations/2025-10-30-094807-0000_table_stats_view/down.sql b/store/postgres/migrations/2025-10-30-094807-0000_table_stats_view/down.sql new file mode 100644 index 00000000000..5881e08a7f8 --- /dev/null +++ b/store/postgres/migrations/2025-10-30-094807-0000_table_stats_view/down.sql @@ -0,0 +1 @@ +DROP MATERIALIZED VIEW IF EXISTS info.table_stats; diff --git a/store/postgres/migrations/2025-10-30-094807-0000_table_stats_view/up.sql b/store/postgres/migrations/2025-10-30-094807-0000_table_stats_view/up.sql new file mode 100644 index 00000000000..6852ded87a8 --- /dev/null +++ b/store/postgres/migrations/2025-10-30-094807-0000_table_stats_view/up.sql @@ -0,0 +1,35 @@ +CREATE MATERIALIZED VIEW info.table_stats AS +WITH table_info AS ( + SELECT + s.schemaname AS schema_name, + sd.id AS deployment_id, + sd.subgraph AS subgraph, + s.tablename AS table_name, + c.reltuples AS total_row_count, + s.n_distinct AS n_distinct + FROM pg_stats s + JOIN pg_namespace n ON n.nspname = s.schemaname + JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.tablename + JOIN subgraphs.deployment sd ON sd.id::text = substring(s.schemaname, 4) + WHERE + s.attname = 'id' + AND s.schemaname LIKE 'sgd%' + AND c.relname NOT IN ('poi2$', 'data_sources$') +) +SELECT + schema_name, + deployment_id AS deployment, + subgraph, + table_name, + CASE + WHEN n_distinct < 0 THEN (-n_distinct) * total_row_count + ELSE n_distinct + END::bigint AS entities, + total_row_count::bigint AS versions, + CASE + WHEN total_row_count = 0 THEN 0::float8 + WHEN n_distinct < 0 THEN (-n_distinct)::float8 + ELSE n_distinct::numeric / total_row_count::numeric + END AS ratio +FROM table_info +WITH NO DATA; diff --git a/store/postgres/src/deployment_store.rs
b/store/postgres/src/deployment_store.rs index f9aa0dfde75..0be292be80e 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -811,6 +811,50 @@ impl DeploymentStore { .await } + pub(crate) async fn identify_account_like_candidates( + &self, + min_versions: u64, + ratio: f64, + ) -> Result<Vec<(String, String)>, StoreError> { + #[derive(QueryableByName)] + struct TableStat { + #[diesel(sql_type = diesel::sql_types::Text)] + subgraph: String, + #[diesel(sql_type = diesel::sql_types::Text)] + table_name: String, + } + let result = self + .with_conn(move |conn, _| { + let query = r#" + SELECT + stats.subgraph, + stats.table_name + FROM info.table_stats AS stats + LEFT JOIN subgraphs.table_stats ts + ON ts.deployment = stats.deployment + AND ts.table_name = stats.table_name + WHERE + stats.versions > $1 + AND stats.ratio < $2 + AND ts.is_account_like IS NOT TRUE + "#; + + diesel::sql_query(query) + .bind::<diesel::sql_types::BigInt, _>(min_versions as i64) + .bind::<diesel::sql_types::Double, _>(ratio) + .load::<TableStat>(conn) + .map_err(Into::into) + }) + .await; + + result.map(|tables| { + tables + .into_iter() + .map(|table_stat| (table_stat.subgraph, table_stat.table_name)) + .collect() + }) + } + pub(crate) async fn set_account_like( + &self, + site: Arc<Site>, @@ -1845,10 +1889,11 @@ impl DeploymentStore { // We hardcode our materialized views, but could also use // pg_matviews to list all of them, though that might inadvertently // refresh materialized views that operators created themselves - const VIEWS: [&str; 3] = [ + const VIEWS: [&str; 4] = [ "info.table_sizes", "info.subgraph_sizes", "info.chain_sizes", + "info.table_stats", ]; store .with_conn(|conn, cancel| { diff --git a/store/postgres/src/jobs.rs b/store/postgres/src/jobs.rs index d8177667183..1ef7b18543b 100644 --- a/store/postgres/src/jobs.rs +++ b/store/postgres/src/jobs.rs @@ -47,6 +47,13 @@ pub fn register( Arc::new(RefreshMaterializedView::new(store.subgraph_store())), 6 * ONE_HOUR, ); + + if let Some(interval) =
ENV_VARS.store.account_like_scan_interval_hours { + runner.register( + Arc::new(AccountLikeJob::new(store.subgraph_store())), + interval * ONE_HOUR, + ); + } } /// A job that vacuums `subgraphs.deployment` and `subgraphs.head`. With a @@ -234,3 +241,37 @@ impl Job for UnusedJob { } } } + +struct AccountLikeJob { + store: Arc<SubgraphStore>, +} + +impl AccountLikeJob { + fn new(store: Arc<SubgraphStore>) -> AccountLikeJob { + AccountLikeJob { store } + } +} + +#[async_trait] +impl Job for AccountLikeJob { + fn name(&self) -> &str { + "Set account-like flag on eligible tables" + } + + async fn run(&self, logger: &Logger) { + // Safe to unwrap due to a startup validation + // which ensures these values are present when account_like_scan_interval_hours is set. + let min_versions_count = ENV_VARS.store.account_like_min_versions_count.unwrap(); + let ratio = ENV_VARS.store.account_like_max_unique_ratio.unwrap(); + + self.store + .identify_and_set_account_like(logger, min_versions_count, ratio) + .await + .unwrap_or_else(|e| { + error!( + logger, + "Failed to set account-like flag on eligible tables: {}", e + ) + }); + } +} diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 7f5993735c2..2b42914af99 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -1220,6 +1220,43 @@ impl SubgraphStoreInner { store.drop_index(site, index_name).await } + pub(crate) async fn identify_and_set_account_like( + &self, + logger: &Logger, + min_records: u64, + ratio: f64, + ) -> Result<(), StoreError> { + for (_shard, store) in &self.stores { + let candidates = store + .identify_account_like_candidates(min_records, ratio) + .await?; + + graph::slog::debug!( + logger, + "Found {} account-like candidates in shard {}", + candidates.len(), + _shard + ); + + for (subgraph, table_name) in candidates { + graph::slog::debug!( + logger, + "Setting table {} as account-like for deployment {}", + table_name, + subgraph + ); + + let hash =
DeploymentHash::new(subgraph.clone()).map_err(|_| { + anyhow!("Failed to create deployment hash for subgraph: {subgraph}") + })?; + let (store, site) = self.store(&hash)?; + store.set_account_like(site, &table_name, true).await?; + } + } + + Ok(()) + } + pub async fn set_account_like( &self, deployment: &DeploymentLocator,