diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index d5d26447c64..74d23b388a9 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -594,8 +594,8 @@ impl ChainStore for MockChainStore { } async fn clear_stale_call_cache( &self, - _ttl_days: i32, - _ttl_max_contracts: Option, + _ttl_days: usize, + _ttl_max_contracts: Option, ) -> Result<(), Error> { unimplemented!() } diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 0835666e533..4c39f182c01 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -660,8 +660,8 @@ pub trait ChainStore: ChainHeadStore { /// Clears stale call cache entries for the given TTL in days. async fn clear_stale_call_cache( &self, - ttl_days: i32, - ttl_max_contracts: Option, + ttl_days: usize, + ttl_max_contracts: Option, ) -> Result<(), Error>; /// Return the chain identifier for this store. diff --git a/graph/src/env/store.rs b/graph/src/env/store.rs index e36e60c8b84..c38ad9aaac3 100644 --- a/graph/src/env/store.rs +++ b/graph/src/env/store.rs @@ -171,6 +171,11 @@ pub struct EnvVarsStore { /// Disables storing or reading `eth_call` results from the store call cache. /// Set by `GRAPH_STORE_DISABLE_CALL_CACHE`. Defaults to false. pub disable_call_cache: bool, + /// The number of contracts to delete from the call cache in one batch + /// when clearing stale entries, set by + /// `GRAPH_STORE_STALE_CALL_CACHE_CONTRACTS_BATCH_SIZE`. The default + /// value is 100 contracts. + pub stale_call_cache_contracts_batch_size: usize, /// Set by `GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE`. Default is false. /// Set to true to disable chain_head_ptr caching (safety escape hatch). pub disable_chain_head_ptr_cache: bool, @@ -248,6 +253,7 @@ impl TryFrom for EnvVarsStore { account_like_min_versions_count: x.account_like_min_versions_count, account_like_max_unique_ratio: x.account_like_max_unique_ratio.map(|r| r.0), disable_call_cache: x.disable_call_cache, + stale_call_cache_contracts_batch_size: x.stale_call_cache_contracts_batch_size, disable_chain_head_ptr_cache: x.disable_chain_head_ptr_cache, connection_validation_idle_secs: Duration::from_secs(x.connection_validation_idle_secs), connection_unavailable_retry: Duration::from_secs( @@ -364,6 +370,11 @@ pub struct InnerStore { account_like_max_unique_ratio: Option, #[envconfig(from = "GRAPH_STORE_DISABLE_CALL_CACHE", default = "false")] disable_call_cache: bool, + #[envconfig( + from = "GRAPH_STORE_STALE_CALL_CACHE_CONTRACTS_BATCH_SIZE", + default = "100" + )] + stale_call_cache_contracts_batch_size: usize, #[envconfig(from = "GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE", default = "false")] disable_chain_head_ptr_cache: bool, #[envconfig(from = "GRAPH_STORE_CONNECTION_VALIDATION_IDLE_SECS", default = "30")] diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index 2eb5f41fa27..684c3936070 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -619,11 +619,11 @@ pub enum CallCacheCommand { #[clap(long, conflicts_with_all = &["from", "to"])] remove_entire_cache: bool, /// Remove the cache for contracts that have not been accessed in the last days - #[clap(long, conflicts_with_all = &["from", "to", "remove-entire-cache"], value_parser = clap::value_parser!(i32).range(1..))] - ttl_days: Option, + #[clap(long, conflicts_with_all = &["from", "to", "remove-entire-cache"], value_parser = clap::value_parser!(u32).range(1..))] + ttl_days: Option, /// Limits the number of contracts to consider for cache removal when using --ttl_days - #[clap(long, conflicts_with_all = &["remove-entire-cache", "to", "from"], requires = "ttl_days", value_parser = clap::value_parser!(i64).range(1..))] - ttl_max_contracts: Option, + #[clap(long, conflicts_with_all = &["remove-entire-cache", "to", "from"], requires = "ttl_days", value_parser = clap::value_parser!(u64).range(1..))] + ttl_max_contracts: Option, /// Starting block number #[clap(long, short, conflicts_with = "remove-entire-cache", requires = "to")] from: Option, diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index 8afff467ffc..df50c5e4069 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -83,8 +83,8 @@ pub async fn clear_call_cache( pub async fn clear_stale_call_cache( chain_store: Arc, - ttl_days: i32, - ttl_max_contracts: Option, + ttl_days: usize, + ttl_max_contracts: Option, ) -> Result<(), Error> { println!( "Removing stale entries from the call cache for `{}`", diff --git a/store/postgres/src/catalog.rs b/store/postgres/src/catalog.rs index 5f094548565..83bc7528900 100644 --- a/store/postgres/src/catalog.rs +++ b/store/postgres/src/catalog.rs @@ -913,6 +913,32 @@ pub(crate) async fn indexes_for_table( Ok(results.into_iter().map(|i| i.def).collect()) } +pub(crate) async fn table_has_index( + conn: &mut AsyncPgConnection, + schema_name: &str, + index_name: &str, +) -> Result { + #[derive(QueryableByName)] + #[allow(dead_code)] + struct Exists { + #[diesel(sql_type = diesel::sql_types::Integer)] + exists: i32, + } + + let exists = sql_query( + "SELECT 1 AS exists FROM pg_indexes \ + WHERE schemaname = $1 AND indexname = $2", + ) + .bind::(schema_name) + .bind::(index_name) + .get_result::(conn) + .await + .optional() + .map_err::(Into::into)?; + + Ok(exists.is_some()) +} + pub(crate) async fn drop_index( conn: &mut AsyncPgConnection, schema_name: &str, diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index 237d4b1e669..dfd088934bc 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -12,7 +12,7 @@ use graph::parking_lot::RwLock; use graph::prelude::alloy::primitives::B256; use graph::prelude::MetricsRegistry; use graph::prometheus::{CounterVec, GaugeVec}; -use graph::slog::Logger; +use graph::slog::{info, Logger}; use graph::stable_hash::crypto_stable_hash; use graph::util::herd_cache::HerdCache; @@ -37,6 +37,7 @@ use graph::prelude::{ use graph::{ensure, internal_error}; use self::recent_blocks_cache::RecentBlocksCache; +use crate::vid_batcher::AdaptiveBatchSize; use crate::AsyncPgConnection; use crate::{chain_head_listener::ChainHeadUpdateSender, pool::ConnectionPool}; @@ -125,7 +126,7 @@ pub use data::Storage; /// Encapuslate access to the blocks table for a chain. mod data { use crate::diesel::dsl::IntervalDsl; - use crate::AsyncPgConnection; + use crate::{catalog, AsyncPgConnection}; use diesel::dsl::sql; use diesel::insert_into; use diesel::sql_types::{Array, Binary, Bool, Nullable, Text}; @@ -159,6 +160,8 @@ mod data { use std::iter::FromIterator; use std::str::FromStr; + use std::time::Instant; + use crate::transaction_receipt::RawTransactionReceipt; use super::JsonBlock; @@ -302,6 +305,11 @@ mod data { fn contract_address(&self) -> DynColumn { self.table.column::("contract_address") } + + fn accessed_at(&self) -> DynColumn { + self.table + .column::(Self::ACCESSED_AT) + } } #[derive(Clone, Debug)] @@ -1640,193 +1648,179 @@ mod data { } } - pub async fn clear_stale_call_cache( + /// Ensure that an index on `contract_address` exists on the + /// call_cache table to speed up deletion queries. If the index does + /// not exist, create it concurrently. + pub(super) async fn ensure_contract_address_index( &self, conn: &mut AsyncPgConnection, logger: &Logger, - ttl_days: i32, - ttl_max_contracts: Option, - ) -> Result<(), Error> { - let mut total_calls: usize = 0; - let mut total_contracts: i64 = 0; - // We process contracts in batches to avoid loading too many entries into memory - // at once. Each contract can have many calls, so we also delete calls in batches. - // Note: The batch sizes were chosen based on experimentation. Potentially, they - // could be made configurable via ENV vars. - let contracts_batch_size: i64 = 2000; - let cache_batch_size: usize = 10000; - - // Limits the number of contracts to process if ttl_max_contracts is set. - // Used also to adjust the final batch size, so we don't process more - // contracts than the set limit. - let remaining_contracts = |processed: i64| -> Option { - ttl_max_contracts.map(|limit| limit.saturating_sub(processed)) + ) -> Result<(), StoreError> { + const CONTRACT_INDEX: &str = "call_cache_contract_address"; + + let (schema_name, table_qname) = match self { + Storage::Shared => ("public", "public.eth_call_cache".to_string()), + Storage::Private(Schema { + name, call_cache, .. + }) => (name.as_str(), call_cache.qname.clone()), }; - match self { - Storage::Shared => { - use public::eth_call_cache as cache; - use public::eth_call_meta as meta; + let has_index = catalog::table_has_index(conn, schema_name, CONTRACT_INDEX).await?; - loop { - if let Some(0) = remaining_contracts(total_contracts) { - info!( - logger, - "Finished cleaning call cache: deleted {} entries for {} contracts (limit reached)", - total_calls, - total_contracts - ); - break; - } + let idx_valid = + catalog::check_index_is_valid(conn, schema_name, CONTRACT_INDEX).await?; - let batch_limit = remaining_contracts(total_contracts) - .map(|left| left.min(contracts_batch_size)) - .unwrap_or(contracts_batch_size); - - let stale_contracts = meta::table - .select(meta::contract_address) - .filter( - meta::accessed_at - .lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())), - ) - .limit(batch_limit) - .get_results::>(conn) - .await?; + if has_index && idx_valid { + return Ok(()); + } - if stale_contracts.is_empty() { - info!( - logger, - "Finished cleaning call cache: deleted {} entries for {} contracts", - total_calls, - total_contracts - ); - break; - } + if !idx_valid { + conn.batch_execute(&format!( + "drop index concurrently if exists {schema_name}.{}", + CONTRACT_INDEX + )) + .await?; + } - loop { - let next_batch = cache::table - .select(cache::id) - .filter(cache::contract_address.eq_any(&stale_contracts)) - .limit(cache_batch_size as i64) - .get_results::>(conn) - .await?; - let deleted_count = - diesel::delete(cache::table.filter(cache::id.eq_any(&next_batch))) - .execute(conn) - .await?; + let start = Instant::now(); + info!( + logger, + "Creating index {} on {}.contract_address; \ + this may take a long time", + CONTRACT_INDEX, + table_qname + ); + conn.batch_execute(&format!( + "create index concurrently {} on {}(contract_address)", + CONTRACT_INDEX, table_qname + )) + .await?; + let duration = start.elapsed(); + info!( + logger, + "Finished creating index {} on {}.contract_address in {:?}", + CONTRACT_INDEX, + table_qname, + duration + ); - total_calls += deleted_count; + Ok(()) + } - if deleted_count < cache_batch_size { - break; - } - } + /// Find up to `batch_limit` contract addresses that have not + /// been accessed in the last `ttl_days` days + pub(super) async fn stale_contracts( + &self, + conn: &mut AsyncPgConnection, + batch_limit: usize, + ttl_days: usize, + ) -> Result>, StoreError> { + let ttl_days = ttl_days as i64; + let batch_limit = batch_limit as i64; + match self { + Storage::Shared => { + use public::eth_call_meta as meta; - let deleted_contracts = diesel::delete( - meta::table.filter(meta::contract_address.eq_any(&stale_contracts)), + meta::table + .select(meta::contract_address) + .filter( + meta::accessed_at + .lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())), ) - .execute(conn) - .await?; + .limit(batch_limit) + .get_results::>(conn) + .await + .map_err(StoreError::from) + } + Storage::Private(Schema { call_meta, .. }) => call_meta + .table() + .select(call_meta.contract_address()) + .filter( + call_meta + .accessed_at() + .lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())), + ) + .limit(batch_limit) + .get_results::>(conn) + .await + .map_err(StoreError::from), + } + } - total_contracts += deleted_contracts as i64; - } + /// Delete up to `batch_size` calls for the given + /// `stale_contracts`. Returns the number of deleted calls. + pub(super) async fn delete_calls( + &self, + conn: &mut AsyncPgConnection, + stale_contracts: &[Vec], + batch_size: i64, + ) -> Result { + match self { + Storage::Shared => { + use public::eth_call_cache as cache; - Ok(()) - } - Storage::Private(Schema { - call_cache, - call_meta, - .. - }) => { - let select_query = format!( - "WITH stale_contracts AS ( - SELECT contract_address - FROM {} - WHERE accessed_at < current_date - interval '{} days' - LIMIT $1 - ) - SELECT contract_address FROM stale_contracts", - call_meta.qname, ttl_days - ); + let next_batch = cache::table + .select(cache::id) + .filter(cache::contract_address.eq_any(stale_contracts)) + .limit(batch_size) + .get_results::>(conn) + .await?; + diesel::delete(cache::table.filter(cache::id.eq_any(&next_batch))) + .execute(conn) + .await + .map_err(StoreError::from) + } + Storage::Private(Schema { call_cache, .. }) => { let delete_cache_query = format!( "WITH targets AS ( SELECT id - FROM {} + FROM {qname} WHERE contract_address = ANY($1) - LIMIT {} + LIMIT $2 ) - DELETE FROM {} USING targets - WHERE {}.id = targets.id", - call_cache.qname, cache_batch_size, call_cache.qname, call_cache.qname + DELETE FROM {qname} USING targets + WHERE {qname}.id = targets.id", + qname = call_cache.qname ); + sql_query(&delete_cache_query) + .bind::, _>(stale_contracts) + .bind::(batch_size) + .execute(conn) + .await + .map_err(StoreError::from) + } + } + } + + pub(super) async fn delete_contracts( + &self, + conn: &mut AsyncPgConnection, + stale_contracts: &[Vec], + ) -> Result { + match self { + Storage::Shared => { + use public::eth_call_meta as meta; + + diesel::delete( + meta::table.filter(meta::contract_address.eq_any(stale_contracts)), + ) + .execute(conn) + .await + .map_err(StoreError::from) + } + Storage::Private(Schema { call_meta, .. }) => { let delete_meta_query = format!( "DELETE FROM {} WHERE contract_address = ANY($1)", call_meta.qname ); - #[derive(QueryableByName)] - struct ContractAddress { - #[diesel(sql_type = Bytea)] - contract_address: Vec, - } - - loop { - if let Some(0) = remaining_contracts(total_contracts) { - info!( - logger, - "Finished cleaning call cache: deleted {} entries for {} contracts (limit reached)", - total_calls, - total_contracts - ); - break; - } - - let batch_limit = remaining_contracts(total_contracts) - .map(|left| left.min(contracts_batch_size)) - .unwrap_or(contracts_batch_size); - - let stale_contracts: Vec> = sql_query(&select_query) - .bind::(batch_limit) - .load::(conn) - .await? - .into_iter() - .map(|r| r.contract_address) - .collect(); - - if stale_contracts.is_empty() { - info!( - logger, - "Finished cleaning call cache: deleted {} entries for {} contracts", - total_calls, - total_contracts - ); - break; - } - - loop { - let deleted_count = sql_query(&delete_cache_query) - .bind::, _>(&stale_contracts) - .execute(conn) - .await?; - - total_calls += deleted_count; - - if deleted_count < cache_batch_size { - break; - } - } - - let deleted_contracts = sql_query(&delete_meta_query) - .bind::, _>(&stale_contracts) - .execute(conn) - .await?; - - total_contracts += deleted_contracts as i64; - } - - Ok(()) + sql_query(&delete_meta_query) + .bind::, _>(stale_contracts) + .execute(conn) + .await + .map_err(StoreError::from) } } } @@ -3284,13 +3278,84 @@ impl ChainStoreTrait for ChainStore { async fn clear_stale_call_cache( &self, - ttl_days: i32, - ttl_max_contracts: Option, + ttl_days: usize, + ttl_max_contracts: Option, ) -> Result<(), Error> { let conn = &mut self.pool.get_permitted().await?; + self.storage - .clear_stale_call_cache(conn, &self.logger, ttl_days, ttl_max_contracts) - .await + .ensure_contract_address_index(conn, &self.logger) + .await?; + + let mut total_calls: usize = 0; + let mut total_contracts: usize = 0; + // We process contracts in batches to avoid loading too many + // entries into memory at once. Each contract can have many + // calls, so we delete calls in adaptive batches that + // self-tune based on query duration. + let contracts_batch_size: usize = ENV_VARS.store.stale_call_cache_contracts_batch_size; + let mut batch_size = AdaptiveBatchSize::with_size(100); + + // Limits the number of contracts to process if ttl_max_contracts is set. + // Used also to adjust the final batch size, so we don't process more + // contracts than the set limit. + let remaining_contracts = |processed: usize| -> Option { + ttl_max_contracts.map(|limit| limit.saturating_sub(processed)) + }; + + loop { + if let Some(0) = remaining_contracts(total_contracts) { + info!(self.logger, + "Finished cleaning call cache: deleted {} entries for {} contracts (limit reached)", + total_calls, total_contracts); + break; + } + + let batch_limit = remaining_contracts(total_contracts) + .map(|left| left.min(contracts_batch_size)) + .unwrap_or(contracts_batch_size); + + let stale_contracts = self + .storage + .stale_contracts(conn, batch_limit, ttl_days) + .await?; + + if stale_contracts.is_empty() { + info!( + self.logger, + "Finished cleaning call cache: deleted {} entries for {} contracts", + total_calls, + total_contracts + ); + break; + } + + loop { + let current_size = batch_size.size; + let start = Instant::now(); + let deleted_count = self + .storage + .delete_calls(conn, &stale_contracts, current_size) + .await?; + + batch_size.adapt(start.elapsed()); + + total_calls += deleted_count; + + if (deleted_count as i64) < current_size { + break; + } + } + + let deleted_contracts = self + .storage + .delete_contracts(conn, &stale_contracts) + .await?; + + total_contracts += deleted_contracts; + } + + Ok(()) } async fn transaction_receipts_in_block( diff --git a/store/postgres/src/vid_batcher.rs b/store/postgres/src/vid_batcher.rs index 8cb0496bd86..7ea633058f8 100644 --- a/store/postgres/src/vid_batcher.rs +++ b/store/postgres/src/vid_batcher.rs @@ -35,6 +35,13 @@ pub(crate) struct AdaptiveBatchSize { } impl AdaptiveBatchSize { + pub fn with_size(size: i64) -> Self { + Self { + size, + target: ENV_VARS.store.batch_target_duration, + } + } + pub fn new(table: &Table) -> Self { let size = if table.columns.iter().any(|col| col.is_list()) { INITIAL_BATCH_SIZE_LIST