From 2e465731967551bd212bf2d31f07f5fab66f5514 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 1 Apr 2026 16:18:34 -0700 Subject: [PATCH 1/6] store: Use adaptive batch size for call cache eviction The inner deletion loop in clear_stale_call_cache used a hardcoded batch size of 10,000. Replace it with AdaptiveBatchSize that starts at 100 and self-tunes based on actual query duration, avoiding both excessive lock contention on large caches and unnecessary round-trips on small ones. --- store/postgres/src/chain_store.rs | 30 ++++++++++++++++++++---------- store/postgres/src/vid_batcher.rs | 7 +++++++ 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index 237d4b1e669..a4cdab2bc4a 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -159,7 +159,10 @@ mod data { use std::iter::FromIterator; use std::str::FromStr; + use std::time::Instant; + use crate::transaction_receipt::RawTransactionReceipt; + use crate::vid_batcher::AdaptiveBatchSize; use super::JsonBlock; @@ -1649,12 +1652,12 @@ mod data { ) -> Result<(), Error> { let mut total_calls: usize = 0; let mut total_contracts: i64 = 0; - // We process contracts in batches to avoid loading too many entries into memory - // at once. Each contract can have many calls, so we also delete calls in batches. - // Note: The batch sizes were chosen based on experimentation. Potentially, they - // could be made configurable via ENV vars. + // We process contracts in batches to avoid loading too many + // entries into memory at once. Each contract can have many + // calls, so we delete calls in adaptive batches that + // self-tune based on query duration. let contracts_batch_size: i64 = 2000; - let cache_batch_size: usize = 10000; + let mut batch_size = AdaptiveBatchSize::with_size(100); // Limits the number of contracts to process if ttl_max_contracts is set. // Used also to adjust the final batch size, so we don't process more @@ -1704,20 +1707,23 @@ mod data { } loop { + let current_size = batch_size.size; + let start = Instant::now(); let next_batch = cache::table .select(cache::id) .filter(cache::contract_address.eq_any(&stale_contracts)) - .limit(cache_batch_size as i64) + .limit(current_size) .get_results::>(conn) .await?; let deleted_count = diesel::delete(cache::table.filter(cache::id.eq_any(&next_batch))) .execute(conn) .await?; + batch_size.adapt(start.elapsed()); total_calls += deleted_count; - if deleted_count < cache_batch_size { + if (deleted_count as i64) < current_size { break; } } @@ -1754,11 +1760,11 @@ mod data { SELECT id FROM {} WHERE contract_address = ANY($1) - LIMIT {} + LIMIT $2 ) DELETE FROM {} USING targets WHERE {}.id = targets.id", - call_cache.qname, cache_batch_size, call_cache.qname, call_cache.qname + call_cache.qname, call_cache.qname, call_cache.qname ); let delete_meta_query = format!( @@ -1806,14 +1812,18 @@ mod data { } loop { + let current_size = batch_size.size; + let start = Instant::now(); let deleted_count = sql_query(&delete_cache_query) .bind::, _>(&stale_contracts) + .bind::(current_size) .execute(conn) .await?; + batch_size.adapt(start.elapsed()); total_calls += deleted_count; - if deleted_count < cache_batch_size { + if (deleted_count as i64) < current_size { break; } } diff --git a/store/postgres/src/vid_batcher.rs b/store/postgres/src/vid_batcher.rs index 8cb0496bd86..7ea633058f8 100644 --- a/store/postgres/src/vid_batcher.rs +++ b/store/postgres/src/vid_batcher.rs @@ -35,6 +35,13 @@ pub(crate) struct AdaptiveBatchSize { } impl AdaptiveBatchSize { + pub fn with_size(size: i64) -> Self { + Self { + size, + target: ENV_VARS.store.batch_target_duration, + } + } + pub fn new(table: &Table) -> Self { let size = if table.columns.iter().any(|col| col.is_list()) { INITIAL_BATCH_SIZE_LIST From a2fe8870beb9317518e1b5ed244c4be4950bb9b1 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 1 Apr 2026 16:54:41 -0700 Subject: [PATCH 2/6] store: Create contract_address index before call cache eviction Add catalog::table_has_index() to check for index existence, and use it in clear_stale_call_cache to ensure a btree index on contract_address exists before running deletion queries. The index is created concurrently if missing, avoiding sequential scans on large call_cache tables. --- store/postgres/src/catalog.rs | 26 +++++++++++++++ store/postgres/src/chain_store.rs | 55 +++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+) diff --git a/store/postgres/src/catalog.rs b/store/postgres/src/catalog.rs index 5f094548565..83bc7528900 100644 --- a/store/postgres/src/catalog.rs +++ b/store/postgres/src/catalog.rs @@ -913,6 +913,32 @@ pub(crate) async fn indexes_for_table( Ok(results.into_iter().map(|i| i.def).collect()) } +pub(crate) async fn table_has_index( + conn: &mut AsyncPgConnection, + schema_name: &str, + index_name: &str, +) -> Result { + #[derive(QueryableByName)] + #[allow(dead_code)] + struct Exists { + #[diesel(sql_type = diesel::sql_types::Integer)] + exists: i32, + } + + let exists = sql_query( + "SELECT 1 AS exists FROM pg_indexes \ + WHERE schemaname = $1 AND indexname = $2", + ) + .bind::(schema_name) + .bind::(index_name) + .get_result::(conn) + .await + .optional() + .map_err::(Into::into)?; + + Ok(exists.is_some()) +} + pub(crate) async fn drop_index( conn: &mut AsyncPgConnection, schema_name: &str, diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index a4cdab2bc4a..3d4f03902dd 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -1643,6 +1643,59 @@ mod data { } } + const CALL_CACHE_CONTRACT_ADDRESS_INDEX: &str = "call_cache_contract_address"; + + /// Ensure that an index on `contract_address` exists on the + /// call_cache table to speed up deletion queries. If the index does + /// not exist, create it concurrently. + async fn ensure_contract_address_index( + &self, + conn: &mut AsyncPgConnection, + logger: &Logger, + ) -> Result<(), Error> { + let (schema_name, table_qname) = match self { + Storage::Shared => ("public", "public.eth_call_cache".to_string()), + Storage::Private(Schema { + name, call_cache, .. + }) => (name.as_str(), call_cache.qname.clone()), + }; + + let has_index = crate::catalog::table_has_index( + conn, + schema_name, + Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX, + ) + .await?; + + if !has_index { + let start = Instant::now(); + info!( + logger, + "Creating index {} on {}.contract_address; \ + this may take a long time", + Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX, + table_qname + ); + conn.batch_execute(&format!( + "create index concurrently if not exists {} \ + on {}(contract_address)", + Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX, + table_qname + )) + .await?; + let duration = start.elapsed(); + info!( + logger, + "Finished creating index {} on {}.contract_address in {:?}", + Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX, + table_qname, + duration + ); + } + + Ok(()) + } + pub async fn clear_stale_call_cache( &self, conn: &mut AsyncPgConnection, @@ -1650,6 +1703,8 @@ mod data { ttl_days: i32, ttl_max_contracts: Option, ) -> Result<(), Error> { + self.ensure_contract_address_index(conn, logger).await?; + let mut total_calls: usize = 0; let mut total_contracts: i64 = 0; // We process contracts in batches to avoid loading too many From f501334a2ba21cfc2cb69795c3523ed979b977f3 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 2 Apr 2026 13:44:02 -0700 Subject: [PATCH 3/6] store: Use dynamic schema in clear_stale_call_cache --- store/postgres/src/chain_store.rs | 40 +++++++++++++------------------ 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index 3d4f03902dd..d959f86611c 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -305,6 +305,11 @@ mod data { fn contract_address(&self) -> DynColumn { self.table.column::("contract_address") } + + fn accessed_at(&self) -> DynColumn { + self.table + .column::(Self::ACCESSED_AT) + } } #[derive(Clone, Debug)] @@ -1799,17 +1804,6 @@ mod data { call_meta, .. }) => { - let select_query = format!( - "WITH stale_contracts AS ( - SELECT contract_address - FROM {} - WHERE accessed_at < current_date - interval '{} days' - LIMIT $1 - ) - SELECT contract_address FROM stale_contracts", - call_meta.qname, ttl_days - ); - let delete_cache_query = format!( "WITH targets AS ( SELECT id @@ -1827,12 +1821,6 @@ mod data { call_meta.qname ); - #[derive(QueryableByName)] - struct ContractAddress { - #[diesel(sql_type = Bytea)] - contract_address: Vec, - } - loop { if let Some(0) = remaining_contracts(total_contracts) { info!( @@ -1848,13 +1836,17 @@ mod data { .map(|left| left.min(contracts_batch_size)) .unwrap_or(contracts_batch_size); - let stale_contracts: Vec> = sql_query(&select_query) - .bind::(batch_limit) - .load::(conn) - .await? - .into_iter() - .map(|r| r.contract_address) - .collect(); + let stale_contracts = call_meta + .table() + .select(call_meta.contract_address()) + .filter( + call_meta + .accessed_at() + .lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())), + ) + .limit(batch_limit) + .get_results::>(conn) + .await?; if stale_contracts.is_empty() { info!( From 90b82487c61132ae0368e07a06465fd8f3f15e62 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 2 Apr 2026 14:07:07 -0700 Subject: [PATCH 4/6] store: Refactor clear_stale_call_cache Refactor the code to separate the gymnastics caused by the two separate storage schemes from the overall control loop --- graph/src/blockchain/mock.rs | 4 +- graph/src/components/store/traits.rs | 4 +- node/src/bin/manager.rs | 8 +- node/src/manager/commands/chain.rs | 4 +- store/postgres/src/chain_store.rs | 336 ++++++++++++++------------- 5 files changed, 179 insertions(+), 177 deletions(-) diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index d5d26447c64..74d23b388a9 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -594,8 +594,8 @@ impl ChainStore for MockChainStore { } async fn clear_stale_call_cache( &self, - _ttl_days: i32, - _ttl_max_contracts: Option, + _ttl_days: usize, + _ttl_max_contracts: Option, ) -> Result<(), Error> { unimplemented!() } diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 0835666e533..4c39f182c01 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -660,8 +660,8 @@ pub trait ChainStore: ChainHeadStore { /// Clears stale call cache entries for the given TTL in days. async fn clear_stale_call_cache( &self, - ttl_days: i32, - ttl_max_contracts: Option, + ttl_days: usize, + ttl_max_contracts: Option, ) -> Result<(), Error>; /// Return the chain identifier for this store. diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index 2eb5f41fa27..684c3936070 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -619,11 +619,11 @@ pub enum CallCacheCommand { #[clap(long, conflicts_with_all = &["from", "to"])] remove_entire_cache: bool, /// Remove the cache for contracts that have not been accessed in the last days - #[clap(long, conflicts_with_all = &["from", "to", "remove-entire-cache"], value_parser = clap::value_parser!(i32).range(1..))] - ttl_days: Option, + #[clap(long, conflicts_with_all = &["from", "to", "remove-entire-cache"], value_parser = clap::value_parser!(u32).range(1..))] + ttl_days: Option, /// Limits the number of contracts to consider for cache removal when using --ttl_days - #[clap(long, conflicts_with_all = &["remove-entire-cache", "to", "from"], requires = "ttl_days", value_parser = clap::value_parser!(i64).range(1..))] - ttl_max_contracts: Option, + #[clap(long, conflicts_with_all = &["remove-entire-cache", "to", "from"], requires = "ttl_days", value_parser = clap::value_parser!(u64).range(1..))] + ttl_max_contracts: Option, /// Starting block number #[clap(long, short, conflicts_with = "remove-entire-cache", requires = "to")] from: Option, diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index 8afff467ffc..df50c5e4069 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -83,8 +83,8 @@ pub async fn clear_call_cache( pub async fn clear_stale_call_cache( chain_store: Arc, - ttl_days: i32, - ttl_max_contracts: Option, + ttl_days: usize, + ttl_max_contracts: Option, ) -> Result<(), Error> { println!( "Removing stale entries from the call cache for `{}`", diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index d959f86611c..1f0c8ab33cd 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -12,7 +12,7 @@ use graph::parking_lot::RwLock; use graph::prelude::alloy::primitives::B256; use graph::prelude::MetricsRegistry; use graph::prometheus::{CounterVec, GaugeVec}; -use graph::slog::Logger; +use graph::slog::{info, Logger}; use graph::stable_hash::crypto_stable_hash; use graph::util::herd_cache::HerdCache; @@ -37,6 +37,7 @@ use graph::prelude::{ use graph::{ensure, internal_error}; use self::recent_blocks_cache::RecentBlocksCache; +use crate::vid_batcher::AdaptiveBatchSize; use crate::AsyncPgConnection; use crate::{chain_head_listener::ChainHeadUpdateSender, pool::ConnectionPool}; @@ -162,7 +163,6 @@ mod data { use std::time::Instant; use crate::transaction_receipt::RawTransactionReceipt; - use crate::vid_batcher::AdaptiveBatchSize; use super::JsonBlock; @@ -1653,11 +1653,11 @@ mod data { /// Ensure that an index on `contract_address` exists on the /// call_cache table to speed up deletion queries. If the index does /// not exist, create it concurrently. - async fn ensure_contract_address_index( + pub(super) async fn ensure_contract_address_index( &self, conn: &mut AsyncPgConnection, logger: &Logger, - ) -> Result<(), Error> { + ) -> Result<(), StoreError> { let (schema_name, table_qname) = match self { Storage::Shared => ("public", "public.eth_call_cache".to_string()), Storage::Private(Schema { @@ -1701,189 +1701,120 @@ mod data { Ok(()) } - pub async fn clear_stale_call_cache( + /// Find up to `batch_limit` contract addresses that have not + /// been accessed in the last `ttl_days` days + pub(super) async fn stale_contracts( &self, conn: &mut AsyncPgConnection, - logger: &Logger, - ttl_days: i32, - ttl_max_contracts: Option, - ) -> Result<(), Error> { - self.ensure_contract_address_index(conn, logger).await?; - - let mut total_calls: usize = 0; - let mut total_contracts: i64 = 0; - // We process contracts in batches to avoid loading too many - // entries into memory at once. Each contract can have many - // calls, so we delete calls in adaptive batches that - // self-tune based on query duration. - let contracts_batch_size: i64 = 2000; - let mut batch_size = AdaptiveBatchSize::with_size(100); - - // Limits the number of contracts to process if ttl_max_contracts is set. - // Used also to adjust the final batch size, so we don't process more - // contracts than the set limit. - let remaining_contracts = |processed: i64| -> Option { - ttl_max_contracts.map(|limit| limit.saturating_sub(processed)) - }; - + batch_limit: usize, + ttl_days: usize, + ) -> Result>, StoreError> { + let ttl_days = ttl_days as i64; + let batch_limit = batch_limit as i64; match self { Storage::Shared => { - use public::eth_call_cache as cache; use public::eth_call_meta as meta; - loop { - if let Some(0) = remaining_contracts(total_contracts) { - info!( - logger, - "Finished cleaning call cache: deleted {} entries for {} contracts (limit reached)", - total_calls, - total_contracts - ); - break; - } - - let batch_limit = remaining_contracts(total_contracts) - .map(|left| left.min(contracts_batch_size)) - .unwrap_or(contracts_batch_size); - - let stale_contracts = meta::table - .select(meta::contract_address) - .filter( - meta::accessed_at - .lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())), - ) - .limit(batch_limit) - .get_results::>(conn) - .await?; - - if stale_contracts.is_empty() { - info!( - logger, - "Finished cleaning call cache: deleted {} entries for {} contracts", - total_calls, - total_contracts - ); - break; - } - - loop { - let current_size = batch_size.size; - let start = Instant::now(); - let next_batch = cache::table - .select(cache::id) - .filter(cache::contract_address.eq_any(&stale_contracts)) - .limit(current_size) - .get_results::>(conn) - .await?; - let deleted_count = - diesel::delete(cache::table.filter(cache::id.eq_any(&next_batch))) - .execute(conn) - .await?; - batch_size.adapt(start.elapsed()); - - total_calls += deleted_count; + meta::table + .select(meta::contract_address) + .filter( + meta::accessed_at + .lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())), + ) + .limit(batch_limit) + .get_results::>(conn) + .await + .map_err(StoreError::from) + } + Storage::Private(Schema { call_meta, .. }) => call_meta + .table() + .select(call_meta.contract_address()) + .filter( + call_meta + .accessed_at() + .lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())), + ) + .limit(batch_limit) + .get_results::>(conn) + .await + .map_err(StoreError::from), + } + } - if (deleted_count as i64) < current_size { - break; - } - } + /// Delete up to `batch_size` calls for the given + /// `stale_contracts`. Returns the number of deleted calls. + pub(super) async fn delete_calls( + &self, + conn: &mut AsyncPgConnection, + stale_contracts: &[Vec], + batch_size: i64, + ) -> Result { + match self { + Storage::Shared => { + use public::eth_call_cache as cache; - let deleted_contracts = diesel::delete( - meta::table.filter(meta::contract_address.eq_any(&stale_contracts)), - ) - .execute(conn) + let next_batch = cache::table + .select(cache::id) + .filter(cache::contract_address.eq_any(stale_contracts)) + .limit(batch_size) + .get_results::>(conn) .await?; - total_contracts += deleted_contracts as i64; - } - - Ok(()) + diesel::delete(cache::table.filter(cache::id.eq_any(&next_batch))) + .execute(conn) + .await + .map_err(StoreError::from) } - Storage::Private(Schema { - call_cache, - call_meta, - .. - }) => { + Storage::Private(Schema { call_cache, .. }) => { let delete_cache_query = format!( "WITH targets AS ( SELECT id - FROM {} + FROM {qname} WHERE contract_address = ANY($1) LIMIT $2 ) - DELETE FROM {} USING targets - WHERE {}.id = targets.id", - call_cache.qname, call_cache.qname, call_cache.qname + DELETE FROM {qname} USING targets + WHERE {qname}.id = targets.id", + qname = call_cache.qname ); + sql_query(&delete_cache_query) + .bind::, _>(stale_contracts) + .bind::(batch_size) + .execute(conn) + .await + .map_err(StoreError::from) + } + } + } + + pub(super) async fn delete_contracts( + &self, + conn: &mut AsyncPgConnection, + stale_contracts: &[Vec], + ) -> Result { + match self { + Storage::Shared => { + use public::eth_call_meta as meta; + + diesel::delete( + meta::table.filter(meta::contract_address.eq_any(stale_contracts)), + ) + .execute(conn) + .await + .map_err(StoreError::from) + } + Storage::Private(Schema { call_meta, .. }) => { let delete_meta_query = format!( "DELETE FROM {} WHERE contract_address = ANY($1)", call_meta.qname ); - loop { - if let Some(0) = remaining_contracts(total_contracts) { - info!( - logger, - "Finished cleaning call cache: deleted {} entries for {} contracts (limit reached)", - total_calls, - total_contracts - ); - break; - } - - let batch_limit = remaining_contracts(total_contracts) - .map(|left| left.min(contracts_batch_size)) - .unwrap_or(contracts_batch_size); - - let stale_contracts = call_meta - .table() - .select(call_meta.contract_address()) - .filter( - call_meta - .accessed_at() - .lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())), - ) - .limit(batch_limit) - .get_results::>(conn) - .await?; - - if stale_contracts.is_empty() { - info!( - logger, - "Finished cleaning call cache: deleted {} entries for {} contracts", - total_calls, - total_contracts - ); - break; - } - - loop { - let current_size = batch_size.size; - let start = Instant::now(); - let deleted_count = sql_query(&delete_cache_query) - .bind::, _>(&stale_contracts) - .bind::(current_size) - .execute(conn) - .await?; - batch_size.adapt(start.elapsed()); - - total_calls += deleted_count; - - if (deleted_count as i64) < current_size { - break; - } - } - - let deleted_contracts = sql_query(&delete_meta_query) - .bind::, _>(&stale_contracts) - .execute(conn) - .await?; - - total_contracts += deleted_contracts as i64; - } - - Ok(()) + sql_query(&delete_meta_query) + .bind::, _>(stale_contracts) + .execute(conn) + .await + .map_err(StoreError::from) } } } @@ -3341,13 +3272,84 @@ impl ChainStoreTrait for ChainStore { async fn clear_stale_call_cache( &self, - ttl_days: i32, - ttl_max_contracts: Option, + ttl_days: usize, + ttl_max_contracts: Option, ) -> Result<(), Error> { let conn = &mut self.pool.get_permitted().await?; + self.storage - .clear_stale_call_cache(conn, &self.logger, ttl_days, ttl_max_contracts) - .await + .ensure_contract_address_index(conn, &self.logger) + .await?; + + let mut total_calls: usize = 0; + let mut total_contracts: usize = 0; + // We process contracts in batches to avoid loading too many + // entries into memory at once. Each contract can have many + // calls, so we delete calls in adaptive batches that + // self-tune based on query duration. + let contracts_batch_size: usize = 2000; + let mut batch_size = AdaptiveBatchSize::with_size(100); + + // Limits the number of contracts to process if ttl_max_contracts is set. + // Used also to adjust the final batch size, so we don't process more + // contracts than the set limit. + let remaining_contracts = |processed: usize| -> Option { + ttl_max_contracts.map(|limit| limit.saturating_sub(processed)) + }; + + loop { + if let Some(0) = remaining_contracts(total_contracts) { + info!(self.logger, + "Finished cleaning call cache: deleted {} entries for {} contracts (limit reached)", + total_calls, total_contracts); + break; + } + + let batch_limit = remaining_contracts(total_contracts) + .map(|left| left.min(contracts_batch_size)) + .unwrap_or(contracts_batch_size); + + let stale_contracts = self + .storage + .stale_contracts(conn, batch_limit, ttl_days) + .await?; + + if stale_contracts.is_empty() { + info!( + self.logger, + "Finished cleaning call cache: deleted {} entries for {} contracts", + total_calls, + total_contracts + ); + break; + } + + loop { + let current_size = batch_size.size; + let start = Instant::now(); + let deleted_count = self + .storage + .delete_calls(conn, &stale_contracts, current_size) + .await?; + + batch_size.adapt(start.elapsed()); + + total_calls += deleted_count; + + if (deleted_count as i64) < current_size { + break; + } + } + + let deleted_contracts = self + .storage + .delete_contracts(conn, &stale_contracts) + .await?; + + total_contracts += deleted_contracts; + } + + Ok(()) } async fn transaction_receipts_in_block( From b2c9bbee8e731a9c97cec5c32aeea24b68465bb6 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 2 Apr 2026 14:27:08 -0700 Subject: [PATCH 5/6] graph, store: Change how many stale contracts get deleted in one batch Introduce an env variable to control that, and lower the default to 100 --- graph/src/env/store.rs | 11 +++++++++++ store/postgres/src/chain_store.rs | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/graph/src/env/store.rs b/graph/src/env/store.rs index e36e60c8b84..c38ad9aaac3 100644 --- a/graph/src/env/store.rs +++ b/graph/src/env/store.rs @@ -171,6 +171,11 @@ pub struct EnvVarsStore { /// Disables storing or reading `eth_call` results from the store call cache. /// Set by `GRAPH_STORE_DISABLE_CALL_CACHE`. Defaults to false. pub disable_call_cache: bool, + /// The number of contracts to delete from the call cache in one batch + /// when clearing stale entries, set by + /// `GRAPH_STORE_STALE_CALL_CACHE_CONTRACTS_BATCH_SIZE`. The default + /// value is 100 contracts. + pub stale_call_cache_contracts_batch_size: usize, /// Set by `GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE`. Default is false. /// Set to true to disable chain_head_ptr caching (safety escape hatch). pub disable_chain_head_ptr_cache: bool, @@ -248,6 +253,7 @@ impl TryFrom for EnvVarsStore { account_like_min_versions_count: x.account_like_min_versions_count, account_like_max_unique_ratio: x.account_like_max_unique_ratio.map(|r| r.0), disable_call_cache: x.disable_call_cache, + stale_call_cache_contracts_batch_size: x.stale_call_cache_contracts_batch_size, disable_chain_head_ptr_cache: x.disable_chain_head_ptr_cache, connection_validation_idle_secs: Duration::from_secs(x.connection_validation_idle_secs), connection_unavailable_retry: Duration::from_secs( @@ -364,6 +370,11 @@ pub struct InnerStore { account_like_max_unique_ratio: Option, #[envconfig(from = "GRAPH_STORE_DISABLE_CALL_CACHE", default = "false")] disable_call_cache: bool, + #[envconfig( + from = "GRAPH_STORE_STALE_CALL_CACHE_CONTRACTS_BATCH_SIZE", + default = "100" + )] + stale_call_cache_contracts_batch_size: usize, #[envconfig(from = "GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE", default = "false")] disable_chain_head_ptr_cache: bool, #[envconfig(from = "GRAPH_STORE_CONNECTION_VALIDATION_IDLE_SECS", default = "30")] diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index 1f0c8ab33cd..075a56662f8 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -3287,7 +3287,7 @@ impl ChainStoreTrait for ChainStore { // entries into memory at once. Each contract can have many // calls, so we delete calls in adaptive batches that // self-tune based on query duration. - let contracts_batch_size: usize = 2000; + let contracts_batch_size: usize = ENV_VARS.store.stale_call_cache_contracts_batch_size; let mut batch_size = AdaptiveBatchSize::with_size(100); // Limits the number of contracts to process if ttl_max_contracts is set. From 6a388cbabb3e4fb409fbf4054dc1f4fc29779137 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 2 Apr 2026 14:40:09 -0700 Subject: [PATCH 6/6] store: Check whether call_cache.contract_address_index is valid If it is not, drop it and recreate it --- store/postgres/src/chain_store.rs | 66 +++++++++++++++++-------------- 1 file changed, 36 insertions(+), 30 deletions(-) diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index 075a56662f8..dfd088934bc 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -126,7 +126,7 @@ pub use data::Storage; /// Encapuslate access to the blocks table for a chain. mod data { use crate::diesel::dsl::IntervalDsl; - use crate::AsyncPgConnection; + use crate::{catalog, AsyncPgConnection}; use diesel::dsl::sql; use diesel::insert_into; use diesel::sql_types::{Array, Binary, Bool, Nullable, Text}; @@ -1648,8 +1648,6 @@ mod data { } } - const CALL_CACHE_CONTRACT_ADDRESS_INDEX: &str = "call_cache_contract_address"; - /// Ensure that an index on `contract_address` exists on the /// call_cache table to speed up deletion queries. If the index does /// not exist, create it concurrently. @@ -1658,6 +1656,8 @@ mod data { conn: &mut AsyncPgConnection, logger: &Logger, ) -> Result<(), StoreError> { + const CONTRACT_INDEX: &str = "call_cache_contract_address"; + let (schema_name, table_qname) = match self { Storage::Shared => ("public", "public.eth_call_cache".to_string()), Storage::Private(Schema { @@ -1665,39 +1665,45 @@ mod data { }) => (name.as_str(), call_cache.qname.clone()), }; - let has_index = crate::catalog::table_has_index( - conn, - schema_name, - Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX, - ) - .await?; + let has_index = catalog::table_has_index(conn, schema_name, CONTRACT_INDEX).await?; - if !has_index { - let start = Instant::now(); - info!( - logger, - "Creating index {} on {}.contract_address; \ - this may take a long time", - Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX, - table_qname - ); + let idx_valid = + catalog::check_index_is_valid(conn, schema_name, CONTRACT_INDEX).await?; + + if has_index && idx_valid { + return Ok(()); + } + + if !idx_valid { conn.batch_execute(&format!( - "create index concurrently if not exists {} \ - on {}(contract_address)", - Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX, - table_qname + "drop index concurrently if exists {schema_name}.{}", + CONTRACT_INDEX )) .await?; - let duration = start.elapsed(); - info!( - logger, - "Finished creating index {} on {}.contract_address in {:?}", - Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX, - table_qname, - duration - ); } + let start = Instant::now(); + info!( + logger, + "Creating index {} on {}.contract_address; \ + this may take a long time", + CONTRACT_INDEX, + table_qname + ); + conn.batch_execute(&format!( + "create index concurrently {} on {}(contract_address)", + CONTRACT_INDEX, table_qname + )) + .await?; + let duration = start.elapsed(); + info!( + logger, + "Finished creating index {} on {}.contract_address in {:?}", + CONTRACT_INDEX, + table_qname, + duration + ); + Ok(()) }