From 563a683a62a0e6052ea972539f42b8ea781534ef Mon Sep 17 00:00:00 2001 From: Nil Medvedev Date: Mon, 27 Nov 2023 10:59:27 +0000 Subject: [PATCH] Feat/improve fee history performance (#5182) Co-authored-by: Matthias Seitz --- .../src/test_utils/bodies_client.rs | 2 +- crates/net/eth-wire/src/capability.rs | 2 +- crates/primitives/src/snapshot/segment.rs | 4 +- crates/rpc/rpc-builder/src/auth.rs | 10 +- crates/rpc/rpc-builder/src/eth.rs | 5 +- crates/rpc/rpc-builder/src/lib.rs | 22 +- .../rpc-types/src/serde_helpers/json_u256.rs | 4 +- crates/rpc/rpc/src/eth/api/fee_history.rs | 280 ++++++++++++++++++ crates/rpc/rpc/src/eth/api/fees.rs | 194 ++++++------ crates/rpc/rpc/src/eth/api/mod.rs | 25 +- crates/rpc/rpc/src/eth/api/server.rs | 15 +- crates/rpc/rpc/src/eth/api/state.rs | 11 +- crates/rpc/rpc/src/eth/api/transactions.rs | 10 +- crates/rpc/rpc/src/eth/gas_oracle.rs | 11 +- crates/rpc/rpc/src/eth/mod.rs | 6 +- .../src/providers/database/provider.rs | 1 - .../storage/provider/src/test_utils/mock.rs | 10 +- crates/transaction-pool/src/pool/blob.rs | 2 +- 18 files changed, 490 insertions(+), 124 deletions(-) create mode 100644 crates/rpc/rpc/src/eth/api/fee_history.rs diff --git a/crates/net/downloaders/src/test_utils/bodies_client.rs b/crates/net/downloaders/src/test_utils/bodies_client.rs index ac791742d0..2f3cf2f293 100644 --- a/crates/net/downloaders/src/test_utils/bodies_client.rs +++ b/crates/net/downloaders/src/test_utils/bodies_client.rs @@ -92,7 +92,7 @@ impl BodiesClient for TestBodiesClient { Box::pin(async move { if should_respond_empty { - return Ok((PeerId::default(), vec![]).into()); + return Ok((PeerId::default(), vec![]).into()) } if should_delay { diff --git a/crates/net/eth-wire/src/capability.rs b/crates/net/eth-wire/src/capability.rs index 9bd4afa82e..8f090c72a2 100644 --- a/crates/net/eth-wire/src/capability.rs +++ b/crates/net/eth-wire/src/capability.rs @@ -252,7 +252,7 @@ impl SharedCapability { /// Returns an error if the offset is equal or less than [`MAX_RESERVED_MESSAGE_ID`]. pub(crate) fn new(name: &str, version: u8, offset: u8) -> Result { if offset <= MAX_RESERVED_MESSAGE_ID { - return Err(SharedCapabilityError::ReservedMessageIdOffset(offset)); + return Err(SharedCapabilityError::ReservedMessageIdOffset(offset)) } match name { diff --git a/crates/primitives/src/snapshot/segment.rs b/crates/primitives/src/snapshot/segment.rs index 90145f14fe..879255fa6a 100644 --- a/crates/primitives/src/snapshot/segment.rs +++ b/crates/primitives/src/snapshot/segment.rs @@ -117,7 +117,7 @@ impl SnapshotSegment { ) -> Option<(Self, RangeInclusive, RangeInclusive)> { let mut parts = name.to_str()?.split('_'); if parts.next() != Some("snapshot") { - return None; + return None } let segment = Self::from_str(parts.next()?).ok()?; @@ -125,7 +125,7 @@ impl SnapshotSegment { let (tx_start, tx_end) = (parts.next()?.parse().ok()?, parts.next()?.parse().ok()?); if block_start >= block_end || tx_start > tx_end { - return None; + return None } Some((segment, block_start..=block_end, tx_start..=tx_end)) diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index c95f3bc008..7f1158e1cf 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -16,7 +16,10 @@ use reth_provider::{ StateProviderFactory, }; use reth_rpc::{ - eth::{cache::EthStateCache, gas_oracle::GasPriceOracle, EthFilterConfig}, + eth::{ + cache::EthStateCache, gas_oracle::GasPriceOracle, EthFilterConfig, FeeHistoryCache, + FeeHistoryCacheConfig, + }, AuthLayer, BlockingTaskPool, Claims, EngineEthApi, EthApi, EthFilter, EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret, }; @@ -57,7 +60,11 @@ where // spawn a new cache task let eth_cache = EthStateCache::spawn_with(provider.clone(), Default::default(), executor.clone()); + let gas_oracle = GasPriceOracle::new(provider.clone(), Default::default(), eth_cache.clone()); + + let fee_history_cache = + FeeHistoryCache::new(eth_cache.clone(), FeeHistoryCacheConfig::default()); let eth_api = EthApi::with_spawner( provider.clone(), pool.clone(), @@ -67,6 +74,7 @@ where EthConfig::default().rpc_gas_cap, Box::new(executor.clone()), BlockingTaskPool::build().expect("failed to build tracing pool"), + fee_history_cache, ); let config = EthFilterConfig::default() .max_logs_per_response(DEFAULT_MAX_LOGS_PER_RESPONSE) diff --git a/crates/rpc/rpc-builder/src/eth.rs b/crates/rpc/rpc-builder/src/eth.rs index c40226d6ad..8da3405368 100644 --- a/crates/rpc/rpc-builder/src/eth.rs +++ b/crates/rpc/rpc-builder/src/eth.rs @@ -5,7 +5,7 @@ use reth_rpc::{ eth::{ cache::{EthStateCache, EthStateCacheConfig}, gas_oracle::GasPriceOracleConfig, - EthFilterConfig, RPC_DEFAULT_GAS_CAP, + EthFilterConfig, FeeHistoryCacheConfig, RPC_DEFAULT_GAS_CAP, }, BlockingTaskPool, EthApi, EthFilter, EthPubSub, }; @@ -46,6 +46,8 @@ pub struct EthConfig { /// /// Sets TTL for stale filters pub stale_filter_ttl: std::time::Duration, + /// Settings for the fee history cache + pub fee_history_cache: FeeHistoryCacheConfig, } impl EthConfig { @@ -71,6 +73,7 @@ impl Default for EthConfig { max_logs_per_response: DEFAULT_MAX_LOGS_PER_RESPONSE, rpc_gas_cap: RPC_DEFAULT_GAS_CAP.into(), stale_filter_ttl: DEFAULT_STALE_FILTER_TTL, + fee_history_cache: FeeHistoryCacheConfig::default(), } } } diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 9da2a43959..b2c7ff3af9 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -168,8 +168,9 @@ use reth_provider::{ use reth_rpc::{ eth::{ cache::{cache_new_blocks_task, EthStateCache}, + fee_history_cache_new_blocks_task, gas_oracle::GasPriceOracle, - EthBundle, + EthBundle, FeeHistoryCache, }, AdminApi, AuthLayer, BlockingTaskGuard, BlockingTaskPool, Claims, DebugApi, EngineEthApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret, NetApi, @@ -1153,6 +1154,10 @@ where } /// Creates the [EthHandlers] type the first time this is called. + /// + /// This will spawn the required service tasks for [EthApi] for: + /// - [EthStateCache] + /// - [FeeHistoryCache] fn with_eth(&mut self, f: F) -> R where F: FnOnce(&EthHandlers) -> R, @@ -1170,6 +1175,7 @@ where ); let new_canonical_blocks = self.events.canonical_state_stream(); let c = cache.clone(); + self.executor.spawn_critical( "cache canonical blocks task", Box::pin(async move { @@ -1177,6 +1183,19 @@ where }), ); + let fee_history_cache = + FeeHistoryCache::new(cache.clone(), self.config.eth.fee_history_cache.clone()); + let new_canonical_blocks = self.events.canonical_state_stream(); + let fhc = fee_history_cache.clone(); + let provider_clone = self.provider.clone(); + self.executor.spawn_critical( + "cache canonical blocks for fee history task", + Box::pin(async move { + fee_history_cache_new_blocks_task(fhc, new_canonical_blocks, provider_clone) + .await; + }), + ); + let executor = Box::new(self.executor.clone()); let blocking_task_pool = BlockingTaskPool::build().expect("failed to build tracing pool"); @@ -1189,6 +1208,7 @@ where self.config.eth.rpc_gas_cap, executor.clone(), blocking_task_pool.clone(), + fee_history_cache, ); let filter = EthFilter::new( self.provider.clone(), diff --git a/crates/rpc/rpc-types/src/serde_helpers/json_u256.rs b/crates/rpc/rpc-types/src/serde_helpers/json_u256.rs index 3ed3859a2c..a22280e8a8 100644 --- a/crates/rpc/rpc-types/src/serde_helpers/json_u256.rs +++ b/crates/rpc/rpc-types/src/serde_helpers/json_u256.rs @@ -162,11 +162,11 @@ where } else { // We could try to convert to a u128 here but there would probably be loss of // precision, so we just return an error. - return Err(Error::custom("Deserializing a large non-mainnet TTD is not supported")); + return Err(Error::custom("Deserializing a large non-mainnet TTD is not supported")) } } else { // must be i64 - negative numbers are not supported - return Err(Error::custom("Negative TTD values are invalid and will not be deserialized")); + return Err(Error::custom("Negative TTD values are invalid and will not be deserialized")) }; Ok(num) diff --git a/crates/rpc/rpc/src/eth/api/fee_history.rs b/crates/rpc/rpc/src/eth/api/fee_history.rs new file mode 100644 index 0000000000..f25ecc6e00 --- /dev/null +++ b/crates/rpc/rpc/src/eth/api/fee_history.rs @@ -0,0 +1,280 @@ +//! Consist of types adjacent to the fee history cache and its configs + +use crate::eth::{cache::EthStateCache, error::EthApiError, gas_oracle::MAX_HEADER_HISTORY}; +use futures::{Stream, StreamExt}; +use metrics::atomics::AtomicU64; +use reth_primitives::{Receipt, SealedBlock, TransactionSigned, B256, U256}; +use reth_provider::{BlockReaderIdExt, CanonStateNotification, ChainSpecProvider}; +use reth_rpc_types::TxGasAndReward; +use serde::{Deserialize, Serialize}; +use std::{ + collections::BTreeMap, + fmt::Debug, + sync::{atomic::Ordering::SeqCst, Arc}, +}; + +/// Settings for the [FeeHistoryCache]. +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct FeeHistoryCacheConfig { + /// Max number of blocks in cache. + /// + /// Default is [MAX_HEADER_HISTORY] plus some change to also serve slightly older blocks from + /// cache, since fee_history supports the entire range + pub max_blocks: u64, + /// Percentile approximation resolution + /// + /// Default is 4 which means 0.25 + pub resolution: u64, +} + +impl Default for FeeHistoryCacheConfig { + fn default() -> Self { + FeeHistoryCacheConfig { max_blocks: MAX_HEADER_HISTORY + 100, resolution: 4 } + } +} + +/// Wrapper struct for BTreeMap +#[derive(Debug, Clone)] +pub struct FeeHistoryCache { + /// Stores the lower bound of the cache + lower_bound: Arc, + upper_bound: Arc, + /// Config for FeeHistoryCache, consists of resolution for percentile approximation + /// and max number of blocks + config: FeeHistoryCacheConfig, + /// Stores the entries of the cache + entries: Arc>>, + #[allow(unused)] + eth_cache: EthStateCache, +} + +impl FeeHistoryCache { + /// Creates new FeeHistoryCache instance, initialize it with the mose recent data, set bounds + pub fn new(eth_cache: EthStateCache, config: FeeHistoryCacheConfig) -> Self { + let init_tree_map = BTreeMap::new(); + + let entries = Arc::new(tokio::sync::RwLock::new(init_tree_map)); + + let upper_bound = Arc::new(AtomicU64::new(0)); + let lower_bound = Arc::new(AtomicU64::new(0)); + + FeeHistoryCache { config, entries, upper_bound, lower_bound, eth_cache } + } + + /// How the cache is configured. + pub fn config(&self) -> &FeeHistoryCacheConfig { + &self.config + } + + /// Returns the configured resolution for percentile approximation. + #[inline] + pub fn resolution(&self) -> u64 { + self.config.resolution + } + + /// Processing of the arriving blocks + async fn insert_blocks(&self, blocks: I) + where + I: Iterator)>, + { + let mut entries = self.entries.write().await; + + let percentiles = self.predefined_percentiles(); + // Insert all new blocks and calculate approximated rewards + for (block, receipts) in blocks { + let mut fee_history_entry = FeeHistoryEntry::new(&block); + fee_history_entry.rewards = calculate_reward_percentiles_for_block( + &percentiles, + fee_history_entry.gas_used, + fee_history_entry.base_fee_per_gas, + &block.body, + &receipts, + ) + .unwrap_or_default(); + entries.insert(block.number, fee_history_entry); + } + + // enforce bounds by popping the oldest entries + while entries.len() > self.config.max_blocks as usize { + entries.pop_first(); + } + + if entries.len() == 0 { + self.upper_bound.store(0, SeqCst); + self.lower_bound.store(0, SeqCst); + return + } + + let upper_bound = *entries.last_entry().expect("Contains at least one entry").key(); + let lower_bound = *entries.first_entry().expect("Contains at least one entry").key(); + self.upper_bound.store(upper_bound, SeqCst); + self.lower_bound.store(lower_bound, SeqCst); + } + + /// Get UpperBound value for FeeHistoryCache + pub fn upper_bound(&self) -> u64 { + self.upper_bound.load(SeqCst) + } + + /// Get LowerBound value for FeeHistoryCache + pub fn lower_bound(&self) -> u64 { + self.lower_bound.load(SeqCst) + } + + /// Collect fee history for given range. + /// + /// This function retrieves fee history entries from the cache for the specified range. + /// If the requested range (start_block to end_block) is within the cache bounds, + /// it returns the corresponding entries. + /// Otherwise it returns None. + pub async fn get_history( + &self, + start_block: u64, + end_block: u64, + ) -> Option> { + let lower_bound = self.lower_bound(); + let upper_bound = self.upper_bound(); + if start_block >= lower_bound && end_block <= upper_bound { + let entries = self.entries.read().await; + let result = entries + .range(start_block..=end_block + 1) + .map(|(_, fee_entry)| fee_entry.clone()) + .collect::>(); + + if result.is_empty() { + return None + } + + Some(result) + } else { + None + } + } + + /// Generates predefined set of percentiles + /// + /// This returns 100 * resolution points + pub fn predefined_percentiles(&self) -> Vec { + let res = self.resolution() as f64; + (0..=100 * self.resolution()).map(|p| p as f64 / res).collect() + } +} + +/// Awaits for new chain events and directly inserts them into the cache so they're available +/// immediately before they need to be fetched from disk. +pub async fn fee_history_cache_new_blocks_task( + fee_history_cache: FeeHistoryCache, + mut events: St, + _provider: Provider, +) where + St: Stream + Unpin + 'static, + Provider: BlockReaderIdExt + ChainSpecProvider + 'static, +{ + // TODO: keep track of gaps in the chain and fill them from disk + + while let Some(event) = events.next().await { + if let Some(committed) = event.committed() { + let (blocks, receipts): (Vec<_>, Vec<_>) = committed + .blocks_and_receipts() + .map(|(block, receipts)| { + (block.block.clone(), receipts.iter().flatten().cloned().collect::>()) + }) + .unzip(); + fee_history_cache.insert_blocks(blocks.into_iter().zip(receipts)).await; + } + } +} + +/// Calculates reward percentiles for transactions in a block header. +/// Given a list of percentiles and a sealed block header, this function computes +/// the corresponding rewards for the transactions at each percentile. +/// +/// The results are returned as a vector of U256 values. +pub(crate) fn calculate_reward_percentiles_for_block( + percentiles: &[f64], + gas_used: u64, + base_fee_per_gas: u64, + transactions: &[TransactionSigned], + receipts: &[Receipt], +) -> Result, EthApiError> { + let mut transactions = transactions + .iter() + .zip(receipts) + .scan(0, |previous_gas, (tx, receipt)| { + // Convert the cumulative gas used in the receipts + // to the gas usage by the transaction + // + // While we will sum up the gas again later, it is worth + // noting that the order of the transactions will be different, + // so the sum will also be different for each receipt. + let gas_used = receipt.cumulative_gas_used - *previous_gas; + *previous_gas = receipt.cumulative_gas_used; + + Some(TxGasAndReward { + gas_used, + reward: tx.effective_tip_per_gas(Some(base_fee_per_gas)).unwrap_or_default(), + }) + }) + .collect::>(); + + // Sort the transactions by their rewards in ascending order + transactions.sort_by_key(|tx| tx.reward); + + // Find the transaction that corresponds to the given percentile + // + // We use a `tx_index` here that is shared across all percentiles, since we know + // the percentiles are monotonically increasing. + let mut tx_index = 0; + let mut cumulative_gas_used = transactions.first().map(|tx| tx.gas_used).unwrap_or_default(); + let mut rewards_in_block = Vec::new(); + for percentile in percentiles { + // Empty blocks should return in a zero row + if transactions.is_empty() { + rewards_in_block.push(U256::ZERO); + continue + } + + let threshold = (gas_used as f64 * percentile / 100.) as u64; + while cumulative_gas_used < threshold && tx_index < transactions.len() - 1 { + tx_index += 1; + cumulative_gas_used += transactions[tx_index].gas_used; + } + rewards_in_block.push(U256::from(transactions[tx_index].reward)); + } + + Ok(rewards_in_block) +} + +/// A cached entry for a block's fee history. +#[derive(Debug, Clone)] +pub struct FeeHistoryEntry { + /// The base fee per gas for this block. + pub base_fee_per_gas: u64, + /// Gas used ratio this block. + pub gas_used_ratio: f64, + /// Gas used by this block. + pub gas_used: u64, + /// Gas limit by this block. + pub gas_limit: u64, + /// Hash of the block. + pub header_hash: B256, + /// Approximated rewards for the configured percentiles. + pub rewards: Vec, +} + +impl FeeHistoryEntry { + /// Creates a new entry from a sealed block. + /// + /// Note: This does not calculate the rewards for the block. + pub fn new(block: &SealedBlock) -> Self { + FeeHistoryEntry { + base_fee_per_gas: block.base_fee_per_gas.unwrap_or_default(), + gas_used_ratio: block.gas_used as f64 / block.gas_limit as f64, + gas_used: block.gas_used, + header_hash: block.hash, + gas_limit: block.gas_limit, + rewards: Vec::new(), + } + } +} diff --git a/crates/rpc/rpc/src/eth/api/fees.rs b/crates/rpc/rpc/src/eth/api/fees.rs index a16ca5de93..481a8d01a3 100644 --- a/crates/rpc/rpc/src/eth/api/fees.rs +++ b/crates/rpc/rpc/src/eth/api/fees.rs @@ -1,17 +1,17 @@ //! Contains RPC handler implementations for fee history. use crate::{ - eth::error::{EthApiError, EthResult}, + eth::{ + api::fee_history::{calculate_reward_percentiles_for_block, FeeHistoryEntry}, + error::{EthApiError, EthResult}, + }, EthApi, }; use reth_network_api::NetworkInfo; -use reth_primitives::{ - basefee::calculate_next_block_base_fee, BlockNumberOrTag, SealedHeader, U256, -}; +use reth_primitives::{basefee::calculate_next_block_base_fee, BlockNumberOrTag, U256}; use reth_provider::{BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, StateProviderFactory}; -use reth_rpc_types::{FeeHistory, TxGasAndReward}; +use reth_rpc_types::FeeHistory; use reth_transaction_pool::TransactionPool; - use tracing::debug; impl EthApi @@ -46,8 +46,10 @@ where self.gas_oracle().suggest_tip_cap().await } - /// Reports the fee history, for the given amount of blocks, up until the newest block - /// provided. + /// Reports the fee history, for the given amount of blocks, up until the given newest block. + /// + /// If `reward_percentiles` are provided the [FeeHistory] will include the _approximated_ + /// rewards for the requested range. pub(crate) async fn fee_history( &self, mut block_count: u64, @@ -85,9 +87,9 @@ where block_count = end_block_plus; } - // If reward percentiles were specified, we need to validate that they are monotonically + // If reward percentiles were specified, we + // need to validate that they are monotonically // increasing and 0 <= p <= 100 - // // Note: The types used ensure that the percentiles are never < 0 if let Some(percentiles) = &reward_percentiles { if percentiles.windows(2).any(|w| w[0] > w[1] || w[0] > 100.) { @@ -101,38 +103,89 @@ where // otherwise `newest_block - 2 // SAFETY: We ensured that block count is capped let start_block = end_block_plus - block_count; - let headers = self.provider().sealed_headers_range(start_block..=end_block)?; - if headers.len() != block_count as usize { - return Err(EthApiError::InvalidBlockRange) - } // Collect base fees, gas usage ratios and (optionally) reward percentile data let mut base_fee_per_gas: Vec = Vec::new(); let mut gas_used_ratio: Vec = Vec::new(); let mut rewards: Vec> = Vec::new(); - for header in &headers { - base_fee_per_gas - .push(U256::try_from(header.base_fee_per_gas.unwrap_or_default()).unwrap()); - gas_used_ratio.push(header.gas_used as f64 / header.gas_limit as f64); - // Percentiles were specified, so we need to collect reward percentile ino - if let Some(percentiles) = &reward_percentiles { - rewards.push(self.calculate_reward_percentiles(percentiles, header).await?); + // Check if the requested range is within the cache bounds + let fee_entries = self.fee_history_cache().get_history(start_block, end_block).await; + + if let Some(fee_entries) = fee_entries { + if fee_entries.len() != block_count as usize { + return Err(EthApiError::InvalidBlockRange) } - } - // The spec states that `base_fee_per_gas` "[..] includes the next block after the newest of - // the returned range, because this value can be derived from the newest block" - // - // The unwrap is safe since we checked earlier that we got at least 1 header. - let last_header = headers.last().unwrap(); - let chain_spec = self.provider().chain_spec(); - base_fee_per_gas.push(U256::from(calculate_next_block_base_fee( - last_header.gas_used, - last_header.gas_limit, - last_header.base_fee_per_gas.unwrap_or_default(), - chain_spec.base_fee_params, - ))); + for entry in &fee_entries { + base_fee_per_gas.push(U256::from(entry.base_fee_per_gas)); + gas_used_ratio.push(entry.gas_used_ratio); + + if let Some(percentiles) = &reward_percentiles { + let mut block_rewards = Vec::with_capacity(percentiles.len()); + for &percentile in percentiles.iter() { + block_rewards.push(self.approximate_percentile(entry, percentile)); + } + rewards.push(block_rewards); + } + } + let last_entry = fee_entries.last().expect("is not empty"); + base_fee_per_gas.push(U256::from(calculate_next_block_base_fee( + last_entry.gas_used, + last_entry.gas_limit, + last_entry.base_fee_per_gas, + self.provider().chain_spec().base_fee_params, + ))); + } else { + // read the requested header range + let headers = self.provider().sealed_headers_range(start_block..=end_block)?; + if headers.len() != block_count as usize { + return Err(EthApiError::InvalidBlockRange) + } + + for header in &headers { + base_fee_per_gas.push(U256::from(header.base_fee_per_gas.unwrap_or_default())); + gas_used_ratio.push(header.gas_used as f64 / header.gas_limit as f64); + + // Percentiles were specified, so we need to collect reward percentile ino + if let Some(percentiles) = &reward_percentiles { + let (transactions, receipts) = self + .cache() + .get_transactions_and_receipts(header.hash) + .await? + .ok_or(EthApiError::InvalidBlockRange)?; + rewards.push( + calculate_reward_percentiles_for_block( + percentiles, + header.gas_used, + header.base_fee_per_gas.unwrap_or_default(), + &transactions, + &receipts, + ) + .unwrap_or_default(), + ); + } + } + + // The spec states that `base_fee_per_gas` "[..] includes the next block after the + // newest of the returned range, because this value can be derived from the + // newest block" + // + // The unwrap is safe since we checked earlier that we got at least 1 header. + + // The spec states that `base_fee_per_gas` "[..] includes the next block after the + // newest of the returned range, because this value can be derived from the + // newest block" + // + // The unwrap is safe since we checked earlier that we got at least 1 header. + let last_header = headers.last().expect("is present"); + base_fee_per_gas.push(U256::from(calculate_next_block_base_fee( + last_header.gas_used, + last_header.gas_limit, + last_header.base_fee_per_gas.unwrap_or_default(), + self.provider().chain_spec().base_fee_params, + ))); + }; Ok(FeeHistory { base_fee_per_gas, @@ -142,68 +195,17 @@ where }) } - /// Calculates reward percentiles for transactions in a block header. - /// Given a list of percentiles and a sealed block header, this function computes - /// the corresponding rewards for the transactions at each percentile. - /// - /// The results are returned as a vector of U256 values. - async fn calculate_reward_percentiles( - &self, - percentiles: &[f64], - header: &SealedHeader, - ) -> Result, EthApiError> { - let (transactions, receipts) = self - .cache() - .get_transactions_and_receipts(header.hash) - .await? - .ok_or(EthApiError::InvalidBlockRange)?; + /// Approximates reward at a given percentile for a specific block + /// Based on the configured resolution + fn approximate_percentile(&self, entry: &FeeHistoryEntry, requested_percentile: f64) -> U256 { + let resolution = self.fee_history_cache().resolution(); + let rounded_percentile = + (requested_percentile * resolution as f64).round() / resolution as f64; + let clamped_percentile = rounded_percentile.clamp(0.0, 100.0); - let mut transactions = transactions - .into_iter() - .zip(receipts) - .scan(0, |previous_gas, (tx, receipt)| { - // Convert the cumulative gas used in the receipts - // to the gas usage by the transaction - // - // While we will sum up the gas again later, it is worth - // noting that the order of the transactions will be different, - // so the sum will also be different for each receipt. - let gas_used = receipt.cumulative_gas_used - *previous_gas; - *previous_gas = receipt.cumulative_gas_used; - - Some(TxGasAndReward { - gas_used, - reward: tx.effective_tip_per_gas(header.base_fee_per_gas).unwrap_or_default(), - }) - }) - .collect::>(); - - // Sort the transactions by their rewards in ascending order - transactions.sort_by_key(|tx| tx.reward); - - // Find the transaction that corresponds to the given percentile - // - // We use a `tx_index` here that is shared across all percentiles, since we know - // the percentiles are monotonically increasing. - let mut tx_index = 0; - let mut cumulative_gas_used = - transactions.first().map(|tx| tx.gas_used).unwrap_or_default(); - let mut rewards_in_block = Vec::new(); - for percentile in percentiles { - // Empty blocks should return in a zero row - if transactions.is_empty() { - rewards_in_block.push(U256::ZERO); - continue - } - - let threshold = (header.gas_used as f64 * percentile / 100.) as u64; - while cumulative_gas_used < threshold && tx_index < transactions.len() - 1 { - tx_index += 1; - cumulative_gas_used += transactions[tx_index].gas_used; - } - rewards_in_block.push(U256::from(transactions[tx_index].reward)); - } - - Ok(rewards_in_block) + // Calculate the index in the precomputed rewards array + let index = (clamped_percentile / (1.0 / resolution as f64)).round() as usize; + // Fetch the reward from the FeeHistoryEntry + entry.rewards.get(index).cloned().unwrap_or(U256::ZERO) } } diff --git a/crates/rpc/rpc/src/eth/api/mod.rs b/crates/rpc/rpc/src/eth/api/mod.rs index 4ff22571de..96830b9f30 100644 --- a/crates/rpc/rpc/src/eth/api/mod.rs +++ b/crates/rpc/rpc/src/eth/api/mod.rs @@ -1,15 +1,17 @@ -//! Provides everything related to `eth_` namespace -//! //! The entire implementation of the namespace is quite large, hence it is divided across several //! files. use crate::eth::{ - api::pending_block::{PendingBlock, PendingBlockEnv, PendingBlockEnvOrigin}, + api::{ + fee_history::FeeHistoryCache, + pending_block::{PendingBlock, PendingBlockEnv, PendingBlockEnvOrigin}, + }, cache::EthStateCache, error::{EthApiError, EthResult}, gas_oracle::GasPriceOracle, signer::EthSigner, }; + use async_trait::async_trait; use reth_interfaces::RethResult; use reth_network_api::NetworkInfo; @@ -17,6 +19,7 @@ use reth_primitives::{ revm_primitives::{BlockEnv, CfgEnv}, Address, BlockId, BlockNumberOrTag, ChainInfo, SealedBlockWithSenders, B256, U256, U64, }; + use reth_provider::{ BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, StateProviderBox, StateProviderFactory, }; @@ -24,14 +27,17 @@ use reth_rpc_types::{SyncInfo, SyncStatus}; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use reth_transaction_pool::TransactionPool; use std::{ + fmt::Debug, future::Future, sync::Arc, time::{Duration, Instant}, }; + use tokio::sync::{oneshot, Mutex}; mod block; mod call; +pub(crate) mod fee_history; mod fees; #[cfg(feature = "optimism")] mod optimism; @@ -86,6 +92,7 @@ where Provider: BlockReaderIdExt + ChainSpecProvider, { /// Creates a new, shareable instance using the default tokio task spawner. + #[allow(clippy::too_many_arguments)] pub fn new( provider: Provider, pool: Pool, @@ -94,6 +101,7 @@ where gas_oracle: GasPriceOracle, gas_cap: impl Into, blocking_task_pool: BlockingTaskPool, + fee_history_cache: FeeHistoryCache, ) -> Self { Self::with_spawner( provider, @@ -104,6 +112,7 @@ where gas_cap.into().into(), Box::::default(), blocking_task_pool, + fee_history_cache, ) } @@ -118,6 +127,7 @@ where gas_cap: u64, task_spawner: Box, blocking_task_pool: BlockingTaskPool, + fee_history_cache: FeeHistoryCache, ) -> Self { // get the block number of the latest block let latest_block = provider @@ -139,9 +149,11 @@ where task_spawner, pending_block: Default::default(), blocking_task_pool, + fee_history_cache, #[cfg(feature = "optimism")] http_client: reqwest::Client::new(), }; + Self { inner: Arc::new(inner) } } @@ -194,6 +206,11 @@ where pub fn pool(&self) -> &Pool { &self.inner.pool } + + /// Returns fee history cache + pub fn fee_history_cache(&self) -> &FeeHistoryCache { + &self.inner.fee_history_cache + } } // === State access helpers === @@ -448,6 +465,8 @@ struct EthApiInner { pending_block: Mutex>, /// A pool dedicated to blocking tasks. blocking_task_pool: BlockingTaskPool, + /// Cache for block fees history + fee_history_cache: FeeHistoryCache, /// An http client for communicating with sequencers. #[cfg(feature = "optimism")] http_client: reqwest::Client, diff --git a/crates/rpc/rpc/src/eth/api/server.rs b/crates/rpc/rpc/src/eth/api/server.rs index 08bfa54db0..c63de87e71 100644 --- a/crates/rpc/rpc/src/eth/api/server.rs +++ b/crates/rpc/rpc/src/eth/api/server.rs @@ -391,7 +391,10 @@ where #[cfg(test)] mod tests { use crate::{ - eth::{cache::EthStateCache, gas_oracle::GasPriceOracle}, + eth::{ + cache::EthStateCache, gas_oracle::GasPriceOracle, FeeHistoryCache, + FeeHistoryCacheConfig, + }, BlockingTaskPool, EthApi, }; use jsonrpsee::types::error::INVALID_PARAMS_CODE; @@ -422,14 +425,19 @@ mod tests { provider: P, ) -> EthApi { let cache = EthStateCache::spawn(provider.clone(), Default::default()); + + let fee_history_cache = + FeeHistoryCache::new(cache.clone(), FeeHistoryCacheConfig::default()); + EthApi::new( provider.clone(), testing_pool(), NoopNetwork::default(), cache.clone(), - GasPriceOracle::new(provider, Default::default(), cache), + GasPriceOracle::new(provider.clone(), Default::default(), cache.clone()), ETHEREUM_BLOCK_GAS_LIMIT, BlockingTaskPool::build().expect("failed to build tracing pool"), + fee_history_cache, ) } @@ -446,6 +454,7 @@ mod tests { let mut gas_used_ratios = Vec::new(); let mut base_fees_per_gas = Vec::new(); let mut last_header = None; + let mut parent_hash = B256::default(); for i in (0..block_count).rev() { let hash = rng.gen(); @@ -459,9 +468,11 @@ mod tests { gas_limit, gas_used, base_fee_per_gas, + parent_hash, ..Default::default() }; last_header = Some(header.clone()); + parent_hash = hash; let mut transactions = vec![]; for _ in 0..100 { diff --git a/crates/rpc/rpc/src/eth/api/state.rs b/crates/rpc/rpc/src/eth/api/state.rs index 4cf4a69a6a..a63e08126d 100644 --- a/crates/rpc/rpc/src/eth/api/state.rs +++ b/crates/rpc/rpc/src/eth/api/state.rs @@ -125,7 +125,10 @@ where mod tests { use super::*; use crate::{ - eth::{cache::EthStateCache, gas_oracle::GasPriceOracle}, + eth::{ + cache::EthStateCache, gas_oracle::GasPriceOracle, FeeHistoryCache, + FeeHistoryCacheConfig, + }, BlockingTaskPool, }; use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, StorageKey, StorageValue}; @@ -144,9 +147,10 @@ mod tests { pool.clone(), (), cache.clone(), - GasPriceOracle::new(NoopProvider::default(), Default::default(), cache), + GasPriceOracle::new(NoopProvider::default(), Default::default(), cache.clone()), ETHEREUM_BLOCK_GAS_LIMIT, BlockingTaskPool::build().expect("failed to build tracing pool"), + FeeHistoryCache::new(cache.clone(), FeeHistoryCacheConfig::default()), ); let address = Address::random(); let storage = eth_api.storage_at(address, U256::ZERO.into(), None).unwrap(); @@ -166,9 +170,10 @@ mod tests { pool, (), cache.clone(), - GasPriceOracle::new(mock_provider, Default::default(), cache), + GasPriceOracle::new(mock_provider.clone(), Default::default(), cache.clone()), ETHEREUM_BLOCK_GAS_LIMIT, BlockingTaskPool::build().expect("failed to build tracing pool"), + FeeHistoryCache::new(cache.clone(), FeeHistoryCacheConfig::default()), ); let storage_key: U256 = storage_key.into(); diff --git a/crates/rpc/rpc/src/eth/api/transactions.rs b/crates/rpc/rpc/src/eth/api/transactions.rs index 58af7c7697..0da8886cc9 100644 --- a/crates/rpc/rpc/src/eth/api/transactions.rs +++ b/crates/rpc/rpc/src/eth/api/transactions.rs @@ -1254,7 +1254,10 @@ pub(crate) fn build_transaction_receipt_with_block_receipts( mod tests { use super::*; use crate::{ - eth::{cache::EthStateCache, gas_oracle::GasPriceOracle}, + eth::{ + cache::EthStateCache, gas_oracle::GasPriceOracle, FeeHistoryCache, + FeeHistoryCacheConfig, + }, BlockingTaskPool, EthApi, }; use reth_network_api::noop::NoopNetwork; @@ -1270,14 +1273,17 @@ mod tests { let pool = testing_pool(); let cache = EthStateCache::spawn(noop_provider, Default::default()); + let fee_history_cache = + FeeHistoryCache::new(cache.clone(), FeeHistoryCacheConfig::default()); let eth_api = EthApi::new( noop_provider, pool.clone(), noop_network_provider, cache.clone(), - GasPriceOracle::new(noop_provider, Default::default(), cache), + GasPriceOracle::new(noop_provider, Default::default(), cache.clone()), ETHEREUM_BLOCK_GAS_LIMIT, BlockingTaskPool::build().expect("failed to build tracing pool"), + fee_history_cache, ); // https://etherscan.io/tx/0xa694b71e6c128a2ed8e2e0f6770bddbe52e3bb8f10e8472f9a79ab81497a8b5d diff --git a/crates/rpc/rpc/src/eth/gas_oracle.rs b/crates/rpc/rpc/src/eth/gas_oracle.rs index f3afc3ff7b..4eaaa5cdcf 100644 --- a/crates/rpc/rpc/src/eth/gas_oracle.rs +++ b/crates/rpc/rpc/src/eth/gas_oracle.rs @@ -16,6 +16,9 @@ use tracing::warn; /// The number of transactions sampled in a block pub const SAMPLE_NUMBER: usize = 3_usize; +/// The default maximum number of blocks to use for the gas price oracle. +pub const MAX_HEADER_HISTORY: u64 = 1024; + /// The default maximum gas price to use for the estimate pub const DEFAULT_MAX_PRICE: U256 = U256::from_limbs([500_000_000_000u64, 0, 0, 0]); @@ -53,8 +56,8 @@ impl Default for GasPriceOracleConfig { GasPriceOracleConfig { blocks: 20, percentile: 60, - max_header_history: 1024, - max_block_history: 1024, + max_header_history: MAX_HEADER_HISTORY, + max_block_history: MAX_HEADER_HISTORY, default: None, max_price: Some(DEFAULT_MAX_PRICE), ignore_price: Some(DEFAULT_IGNORE_PRICE), @@ -73,8 +76,8 @@ impl GasPriceOracleConfig { Self { blocks: blocks.unwrap_or(20), percentile: percentile.unwrap_or(60), - max_header_history: 1024, - max_block_history: 1024, + max_header_history: MAX_HEADER_HISTORY, + max_block_history: MAX_HEADER_HISTORY, default: None, max_price: max_price.map(U256::from).or(Some(DEFAULT_MAX_PRICE)), ignore_price: ignore_price.map(U256::from).or(Some(DEFAULT_IGNORE_PRICE)), diff --git a/crates/rpc/rpc/src/eth/mod.rs b/crates/rpc/rpc/src/eth/mod.rs index 90ee0e1042..97f57af688 100644 --- a/crates/rpc/rpc/src/eth/mod.rs +++ b/crates/rpc/rpc/src/eth/mod.rs @@ -13,7 +13,11 @@ pub mod revm_utils; mod signer; pub(crate) mod utils; -pub use api::{EthApi, EthApiSpec, EthTransactions, TransactionSource, RPC_DEFAULT_GAS_CAP}; +pub use api::{ + fee_history::{fee_history_cache_new_blocks_task, FeeHistoryCache, FeeHistoryCacheConfig}, + EthApi, EthApiSpec, EthTransactions, TransactionSource, RPC_DEFAULT_GAS_CAP, +}; + pub use bundle::EthBundle; pub use filter::{EthFilter, EthFilterConfig}; pub use id_provider::EthSubscriptionIdProvider; diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 244011c7b6..585f73e16b 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -1114,7 +1114,6 @@ impl BlockReader for DatabaseProvider { transaction_kind: TransactionVariant, ) -> ProviderResult> { let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) }; - let Some(header) = self.header_by_number(block_number)? else { return Ok(None) }; let ommers = self.ommers(block_number.into())?.unwrap_or_default(); diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index e7f8726664..ddc8718aa4 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -462,8 +462,14 @@ impl BlockReader for MockEthProvider { Ok(None) } - fn block_range(&self, _range: RangeInclusive) -> ProviderResult> { - Ok(vec![]) + fn block_range(&self, range: RangeInclusive) -> ProviderResult> { + let lock = self.blocks.lock(); + + let mut blocks: Vec<_> = + lock.values().filter(|block| range.contains(&block.number)).cloned().collect(); + blocks.sort_by_key(|block| block.number); + + Ok(blocks) } } diff --git a/crates/transaction-pool/src/pool/blob.rs b/crates/transaction-pool/src/pool/blob.rs index 24c1fa6cad..e7f6fa8f24 100644 --- a/crates/transaction-pool/src/pool/blob.rs +++ b/crates/transaction-pool/src/pool/blob.rs @@ -290,7 +290,7 @@ const LOG_2_1_125: f64 = 0.16992500144231237; pub fn fee_delta(max_tx_fee: u128, current_fee: u128) -> i64 { if max_tx_fee == current_fee { // if these are equal, then there's no fee jump - return 0; + return 0 } let max_tx_fee_jumps = if max_tx_fee == 0 {