diff --git a/crates/rpc/rpc/src/eth/api/fee_history.rs b/crates/rpc/rpc/src/eth/api/fee_history.rs index b78c7f04ec..8c01d672c1 100644 --- a/crates/rpc/rpc/src/eth/api/fee_history.rs +++ b/crates/rpc/rpc/src/eth/api/fee_history.rs @@ -1,40 +1,25 @@ //! Consist of types adjacent to the fee history cache and its configs use crate::eth::{cache::EthStateCache, error::EthApiError, gas_oracle::MAX_HEADER_HISTORY}; -use futures::{Stream, StreamExt}; +use futures::{ + future::{Fuse, FusedFuture}, + FutureExt, Stream, StreamExt, +}; use metrics::atomics::AtomicU64; use reth_primitives::{Receipt, SealedBlock, TransactionSigned, B256, U256}; use reth_provider::{BlockReaderIdExt, CanonStateNotification, ChainSpecProvider}; use reth_rpc_types::TxGasAndReward; use serde::{Deserialize, Serialize}; use std::{ - collections::BTreeMap, + collections::{BTreeMap, VecDeque}, fmt::Debug, sync::{atomic::Ordering::SeqCst, Arc}, }; +use tracing::trace; -/// Settings for the [FeeHistoryCache]. -#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct FeeHistoryCacheConfig { - /// Max number of blocks in cache. - /// - /// Default is [MAX_HEADER_HISTORY] plus some change to also serve slightly older blocks from - /// cache, since fee_history supports the entire range - pub max_blocks: u64, - /// Percentile approximation resolution - /// - /// Default is 4 which means 0.25 - pub resolution: u64, -} - -impl Default for FeeHistoryCacheConfig { - fn default() -> Self { - FeeHistoryCacheConfig { max_blocks: MAX_HEADER_HISTORY + 100, resolution: 4 } - } -} - -/// Wrapper struct for BTreeMap +/// Contains cached fee history entries for blocks. +/// +/// Purpose for this is to provide cached data for `eth_feeHistory`. #[derive(Debug, Clone)] pub struct FeeHistoryCache { inner: Arc, @@ -65,7 +50,24 @@ impl FeeHistoryCache { self.config().resolution } - /// Processing of the arriving blocks + /// Returns all blocks that are missing in the cache in the [lower_bound, upper_bound] range. + /// + /// This function is used to populate the cache with missing blocks, which can happen if the + /// node switched to stage sync node. + async fn missing_consecutive_blocks(&self) -> VecDeque { + let mut missing_blocks = VecDeque::new(); + let lower_bound = self.lower_bound(); + let upper_bound = self.upper_bound(); + let entries = self.inner.entries.read().await; + for block_number in (lower_bound..upper_bound).rev() { + if !entries.contains_key(&block_number) { + missing_blocks.push_back(block_number); + } + } + missing_blocks + } + + /// Insert block data into the cache. async fn insert_blocks(&self, blocks: I) where I: Iterator)>, @@ -99,6 +101,13 @@ impl FeeHistoryCache { } let upper_bound = *entries.last_entry().expect("Contains at least one entry").key(); + + // also enforce proper lower bound in case we have gaps + let target_lower = upper_bound.saturating_sub(self.inner.config.max_blocks); + while entries.len() > 1 && *entries.first_key_value().unwrap().0 < target_lower { + entries.pop_first(); + } + let lower_bound = *entries.first_entry().expect("Contains at least one entry").key(); self.inner.upper_bound.store(upper_bound, SeqCst); self.inner.lower_bound.store(lower_bound, SeqCst); @@ -130,7 +139,7 @@ impl FeeHistoryCache { if start_block >= lower_bound && end_block <= upper_bound { let entries = self.inner.entries.read().await; let result = entries - .range(start_block..=end_block + 1) + .range(start_block..=end_block) .map(|(_, fee_entry)| fee_entry.clone()) .collect::>(); @@ -153,18 +162,39 @@ impl FeeHistoryCache { } } +/// Settings for the [FeeHistoryCache]. +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct FeeHistoryCacheConfig { + /// Max number of blocks in cache. + /// + /// Default is [MAX_HEADER_HISTORY] plus some change to also serve slightly older blocks from + /// cache, since fee_history supports the entire range + pub max_blocks: u64, + /// Percentile approximation resolution + /// + /// Default is 4 which means 0.25 + pub resolution: u64, +} + +impl Default for FeeHistoryCacheConfig { + fn default() -> Self { + FeeHistoryCacheConfig { max_blocks: MAX_HEADER_HISTORY + 100, resolution: 4 } + } +} + /// Container type for shared state in [FeeHistoryCache] #[derive(Debug)] struct FeeHistoryCacheInner { /// Stores the lower bound of the cache lower_bound: AtomicU64, + /// Stores the upper bound of the cache upper_bound: AtomicU64, /// Config for FeeHistoryCache, consists of resolution for percentile approximation /// and max number of blocks config: FeeHistoryCacheConfig, /// Stores the entries of the cache entries: tokio::sync::RwLock>, - #[allow(unused)] eth_cache: EthStateCache, } @@ -173,22 +203,56 @@ struct FeeHistoryCacheInner { pub async fn fee_history_cache_new_blocks_task( fee_history_cache: FeeHistoryCache, mut events: St, - _provider: Provider, + provider: Provider, ) where St: Stream + Unpin + 'static, Provider: BlockReaderIdExt + ChainSpecProvider + 'static, { - // TODO: keep track of gaps in the chain and fill them from disk + // We're listening for new blocks emitted when the node is in live sync. + // If the node transitions to stage sync, we need to fetch the missing blocks + let mut missing_blocks = VecDeque::new(); + let mut fetch_missing_block = Fuse::terminated(); - while let Some(event) = events.next().await { - if let Some(committed) = event.committed() { - let (blocks, receipts): (Vec<_>, Vec<_>) = committed - .blocks_and_receipts() - .map(|(block, receipts)| { - (block.block.clone(), receipts.iter().flatten().cloned().collect::>()) - }) - .unzip(); - fee_history_cache.insert_blocks(blocks.into_iter().zip(receipts)).await; + loop { + if fetch_missing_block.is_terminated() { + if let Some(block_number) = missing_blocks.pop_front() { + trace!(target: "rpc::fee", ?block_number, "Fetching missing block for fee history cache"); + if let Ok(Some(hash)) = provider.block_hash(block_number) { + // fetch missing block + fetch_missing_block = fee_history_cache + .inner + .eth_cache + .get_block_and_receipts(hash) + .boxed() + .fuse(); + } + } + } + + tokio::select! { + res = &mut fetch_missing_block => { + if let Ok(res) = res { + fee_history_cache.insert_blocks(res.into_iter()).await; + } + } + event = events.next() => { + let Some(event) = event else { + // the stream ended, we are done + break; + }; + if let Some(committed) = event.committed() { + let (blocks, receipts): (Vec<_>, Vec<_>) = committed + .blocks_and_receipts() + .map(|(block, receipts)| { + (block.block.clone(), receipts.iter().flatten().cloned().collect::>()) + }) + .unzip(); + fee_history_cache.insert_blocks(blocks.into_iter().zip(receipts)).await; + + // keep track of missing blocks + missing_blocks = fee_history_cache.missing_consecutive_blocks().await; + } + } } } }