Merge remote-tracking branch 'origin/main' into transaction-fetcher

This commit is contained in:
Sean Matthew
2023-10-18 14:28:45 -04:00
59 changed files with 832 additions and 562 deletions

View File

@@ -83,7 +83,7 @@ body:
id: prune-config
attributes:
label: What prune config do you use, if any?
description: `[prune]` section in `reth.toml` file
description: The `[prune]` section in `reth.toml` file
validations:
required: false
- type: input

View File

@@ -94,6 +94,11 @@ jobs:
--debug.tip 0x91c90676cab257a59cd956d7cb0bceb9b1a71d79755c23c7277a0697ccfaf8c4 \
--debug.max-block 100000 \
--debug.terminate
- name: Verify the target block hash
run: |
cargo run --profile release \
db get CanonicalHeaders 100000 | grep 0x91c90676cab257a59cd956d7cb0bceb9b1a71d79755c23c7277a0697ccfaf8c4
integration-success:
if: always()

2
Cargo.lock generated
View File

@@ -6184,6 +6184,7 @@ dependencies = [
"assert_matches",
"async-trait",
"bytes",
"derive_more",
"futures",
"http",
"http-body",
@@ -6447,6 +6448,7 @@ name = "reth-trie"
version = "0.1.0-alpha.10"
dependencies = [
"alloy-rlp",
"auto_impl",
"criterion",
"derive_more",
"once_cell",

View File

@@ -2,26 +2,24 @@ use super::tui::DbListTUI;
use crate::utils::{DbTool, ListFilter};
use clap::Parser;
use eyre::WrapErr;
use reth_db::{database::Database, table::Table, DatabaseEnvRO, TableType, TableViewer, Tables};
use reth_db::{database::Database, table::Table, DatabaseEnvRO, TableViewer, Tables};
use reth_primitives::hex;
use std::cell::RefCell;
use tracing::error;
const DEFAULT_NUM_ITEMS: &str = "5";
#[derive(Parser, Debug)]
/// The arguments for the `reth db list` command
pub struct Command {
/// The table name
table: Tables,
/// Skip first N entries
#[arg(long, short, default_value = "0")]
#[arg(long, short, default_value_t = 0)]
skip: usize,
/// Reverse the order of the entries. If enabled last table entries are read.
#[arg(long, short, default_value = "false")]
#[arg(long, short, default_value_t = false)]
reverse: bool,
/// How many items to take from the walker
#[arg(long, short, default_value = DEFAULT_NUM_ITEMS)]
#[arg(long, short, default_value_t = 5)]
len: usize,
/// Search parameter for both keys and values. Prefix it with `0x` to search for binary data,
/// and text otherwise.
@@ -41,13 +39,7 @@ pub struct Command {
impl Command {
/// Execute `db list` command
pub fn execute(self, tool: &DbTool<'_, DatabaseEnvRO>) -> eyre::Result<()> {
if self.table.table_type() == TableType::DupSort {
error!(target: "reth::cli", "Unsupported table.");
}
self.table.view(&ListTableViewer { tool, args: &self })?;
Ok(())
self.table.view(&ListTableViewer { tool, args: &self })
}
/// Generate [`ListFilter`] from command.
@@ -105,7 +97,7 @@ impl TableViewer<()> for ListTableViewer<'_> {
if self.args.count {
println!("{count} entries found.")
}else {
} else {
println!("{}", serde_json::to_string_pretty(&list)?);
}
Ok(())

View File

@@ -33,7 +33,7 @@ use reth_rpc_types::engine::{
CancunPayloadFields, ExecutionPayload, PayloadAttributes, PayloadError, PayloadStatus,
PayloadStatusEnum, PayloadValidationError,
};
use reth_rpc_types_compat::payload::{try_into_block, validate_block_hash};
use reth_rpc_types_compat::engine::payload::{try_into_block, validate_block_hash};
use reth_stages::{ControlFlow, Pipeline, PipelineError};
use reth_tasks::TaskSpawner;
use std::{

View File

@@ -468,7 +468,7 @@ mod tests {
BlockBody, BlockHash, BlockHashOrNumber, Bytes, ChainSpecBuilder, Header, Signature,
TransactionKind, TransactionSigned, Withdrawal, MAINNET, U256,
};
use std::ops::RangeInclusive;
use std::ops::RangeBounds;
mock! {
WithdrawalsProvider {}
@@ -539,13 +539,13 @@ mod tests {
Ok(None)
}
fn headers_range(&self, _range: RangeInclusive<BlockNumber>) -> RethResult<Vec<Header>> {
fn headers_range(&self, _range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>> {
Ok(vec![])
}
fn sealed_headers_range(
&self,
_range: RangeInclusive<BlockNumber>,
_range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<SealedHeader>> {
Ok(vec![])
}

View File

@@ -314,7 +314,7 @@ where
}
type PayloadFuture =
Pin<Box<dyn Future<Output = Result<Arc<BuiltPayload>, PayloadBuilderError>> + Send>>;
Pin<Box<dyn Future<Output = Result<Arc<BuiltPayload>, PayloadBuilderError>> + Send + Sync>>;
/// Message type for the [PayloadBuilderService].
enum PayloadServiceCommand {

View File

@@ -17,6 +17,7 @@ pub struct MerkleCheckpoint {
pub target_block: BlockNumber,
/// The last hashed account key processed.
pub last_account_key: B256,
// TODO: remove in the next breaking release.
/// The last walker key processed.
pub last_walker_key: Vec<u8>,
/// Previously recorded walker stack.
@@ -30,11 +31,16 @@ impl MerkleCheckpoint {
pub fn new(
target_block: BlockNumber,
last_account_key: B256,
last_walker_key: Vec<u8>,
walker_stack: Vec<StoredSubNode>,
state: HashBuilderState,
) -> Self {
Self { target_block, last_account_key, last_walker_key, walker_stack, state }
Self {
target_block,
last_account_key,
walker_stack,
state,
last_walker_key: Vec::default(),
}
}
}

View File

@@ -553,7 +553,9 @@ mod tests {
use reth_primitives::{
bytes,
constants::{BEACON_ROOTS_ADDRESS, SYSTEM_ADDRESS},
keccak256, Account, Bytecode, Bytes, ChainSpecBuilder, ForkCondition, StorageKey, MAINNET,
keccak256,
trie::AccountProof,
Account, Bytecode, Bytes, ChainSpecBuilder, ForkCondition, StorageKey, MAINNET,
};
use reth_provider::{AccountReader, BlockHashReader, StateRootProvider};
use revm::{Database, TransitionState};
@@ -634,12 +636,8 @@ mod tests {
Ok(self.contracts.get(&code_hash).cloned())
}
fn proof(
&self,
_address: Address,
_keys: &[B256],
) -> RethResult<(Vec<Bytes>, B256, Vec<Vec<Bytes>>)> {
todo!()
fn proof(&self, _address: Address, _keys: &[B256]) -> RethResult<AccountProof> {
unimplemented!("proof generation is not supported")
}
}

View File

@@ -0,0 +1,60 @@
use jsonrpsee::proc_macros::rpc;
use reth_primitives::{Bytes, B256};
use reth_rpc_types::{
CancelBundleRequest, CancelPrivateTransactionRequest, EthBundleHash, EthCallBundleResponse,
EthCallBundleTransactionResult, EthSendBundle, PrivateTransactionRequest,
};
/// Eth bundle rpc interface.
///
/// See also <https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint>
/// Eth bundle rpc interface.
///
/// See also <https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint>
#[cfg_attr(not(feature = "client"), rpc(server, namespace = "eth"))]
#[cfg_attr(feature = "client", rpc(server, client, namespace = "eth"))]
#[async_trait::async_trait]
pub trait EthBundleApi {
/// `eth_sendBundle` can be used to send your bundles to the builder.
#[method(name = "sendBundle")]
async fn send_bundle(&self, bundle: EthSendBundle)
-> jsonrpsee::core::RpcResult<EthBundleHash>;
/// `eth_callBundle` can be used to simulate a bundle against a specific block number,
/// including simulating a bundle at the top of the next block.
#[method(name = "callBundle")]
async fn call_bundle(
&self,
request: EthCallBundleResponse,
) -> jsonrpsee::core::RpcResult<EthCallBundleTransactionResult>;
/// `eth_cancelBundle` is used to prevent a submitted bundle from being included on-chain. See [bundle cancellations](https://docs.flashbots.net/flashbots-auction/searchers/advanced/bundle-cancellations) for more information.
#[method(name = "cancelBundle")]
async fn cancel_bundle(&self, request: CancelBundleRequest) -> jsonrpsee::core::RpcResult<()>;
/// `eth_sendPrivateTransaction` is used to send a single transaction to Flashbots. Flashbots will attempt to build a block including the transaction for the next 25 blocks. See [Private Transactions](https://docs.flashbots.net/flashbots-protect/additional-documentation/eth-sendPrivateTransaction) for more info.
#[method(name = "sendPrivateTransaction")]
async fn send_private_transaction(
&self,
request: PrivateTransactionRequest,
) -> jsonrpsee::core::RpcResult<B256>;
/// The `eth_sendPrivateRawTransaction` method can be used to send private transactions to
/// the RPC endpoint. Private transactions are protected from frontrunning and kept
/// private until included in a block. A request to this endpoint needs to follow
/// the standard eth_sendRawTransaction
#[method(name = "sendPrivateRawTransaction")]
async fn send_private_raw_transaction(&self, bytes: Bytes) -> jsonrpsee::core::RpcResult<B256>;
/// The `eth_cancelPrivateTransaction` method stops private transactions from being
/// submitted for future blocks.
///
/// A transaction can only be cancelled if the request is signed by the same key as the
/// eth_sendPrivateTransaction call submitting the transaction in first place.
#[method(name = "cancelPrivateTransaction")]
async fn cancel_private_transaction(
&self,
request: CancelPrivateTransactionRequest,
) -> jsonrpsee::core::RpcResult<bool>;
}

View File

@@ -16,11 +16,13 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
mod admin;
mod bundle;
mod debug;
mod engine;
mod eth;
mod eth_filter;
mod eth_pubsub;
mod mev;
mod net;
mod otterscan;
mod reth;
@@ -36,11 +38,13 @@ pub use servers::*;
pub mod servers {
pub use crate::{
admin::AdminApiServer,
bundle::EthBundleApiServer,
debug::DebugApiServer,
engine::{EngineApiServer, EngineEthApiServer},
eth::EthApiServer,
eth_filter::EthFilterApiServer,
eth_pubsub::EthPubSubApiServer,
mev::MevApiServer,
net::NetApiServer,
otterscan::OtterscanServer,
reth::RethApiServer,
@@ -60,10 +64,12 @@ pub use clients::*;
pub mod clients {
pub use crate::{
admin::AdminApiClient,
bundle::EthBundleApiClient,
debug::DebugApiClient,
engine::{EngineApiClient, EngineEthApiClient},
eth::EthApiClient,
eth_filter::EthFilterApiClient,
mev::MevApiClient,
net::NetApiClient,
otterscan::OtterscanClient,
rpc::RpcApiServer,

View File

@@ -0,0 +1,27 @@
use jsonrpsee::proc_macros::rpc;
use reth_rpc_types::{
SendBundleRequest, SendBundleResponse, SimBundleOverrides, SimBundleResponse,
};
/// Mev rpc interface.
#[cfg_attr(not(feature = "client"), rpc(server, namespace = "mev"))]
#[cfg_attr(feature = "client", rpc(server, client, namespace = "mev"))]
#[async_trait::async_trait]
pub trait MevApi {
/// Submitting bundles to the relay. It takes in a bundle and provides a bundle hash as a
/// return value.
#[method(name = "sendBundle")]
async fn send_bundle(
&self,
request: SendBundleRequest,
) -> jsonrpsee::core::RpcResult<SendBundleResponse>;
/// Similar to `mev_sendBundle` but instead of submitting a bundle to the relay, it returns
/// a simulation result. Only fully matched bundles can be simulated.
#[method(name = "simBundle")]
async fn sim_bundle(
&self,
bundle: SendBundleRequest,
sim_overrides: SimBundleOverrides,
) -> jsonrpsee::core::RpcResult<SimBundleResponse>;
}

View File

@@ -17,8 +17,8 @@ use reth_provider::{
};
use reth_rpc::{
eth::{cache::EthStateCache, gas_oracle::GasPriceOracle},
AuthLayer, Claims, EngineEthApi, EthApi, EthFilter, EthSubscriptionIdProvider,
JwtAuthValidator, JwtSecret, TracingCallPool,
AuthLayer, BlockingTaskPool, Claims, EngineEthApi, EthApi, EthFilter,
EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret,
};
use reth_rpc_api::{servers::*, EngineApiServer};
use reth_tasks::TaskSpawner;
@@ -66,7 +66,7 @@ where
gas_oracle,
EthConfig::default().rpc_gas_cap,
Box::new(executor.clone()),
TracingCallPool::build().expect("failed to build tracing pool"),
BlockingTaskPool::build().expect("failed to build tracing pool"),
);
let eth_filter = EthFilter::new(
provider,

View File

@@ -5,7 +5,7 @@ use reth_rpc::{
gas_oracle::GasPriceOracleConfig,
RPC_DEFAULT_GAS_CAP,
},
EthApi, EthFilter, EthPubSub, TracingCallPool,
BlockingTaskPool, EthApi, EthFilter, EthPubSub,
};
use serde::{Deserialize, Serialize};
@@ -21,7 +21,7 @@ pub struct EthHandlers<Provider, Pool, Network, Events> {
/// Handler for subscriptions only available for transports that support it (ws, ipc)
pub pubsub: EthPubSub<Provider, Pool, Events, Network>,
/// The configured tracing call pool
pub tracing_call_pool: TracingCallPool,
pub blocking_task_pool: BlockingTaskPool,
}
/// Additional config values for the eth namespace

View File

@@ -117,9 +117,9 @@ use reth_rpc::{
cache::{cache_new_blocks_task, EthStateCache},
gas_oracle::GasPriceOracle,
},
AdminApi, DebugApi, EngineEthApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider,
NetApi, OtterscanApi, RPCApi, RethApi, TraceApi, TracingCallGuard, TracingCallPool, TxPoolApi,
Web3Api,
AdminApi, BlockingTaskGuard, BlockingTaskPool, DebugApi, EngineEthApi, EthApi, EthFilter,
EthPubSub, EthSubscriptionIdProvider, NetApi, OtterscanApi, RPCApi, RethApi, TraceApi,
TxPoolApi, Web3Api,
};
use reth_rpc_api::{servers::*, EngineApiServer};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
@@ -719,7 +719,7 @@ pub struct RethModuleRegistry<Provider, Pool, Network, Tasks, Events> {
/// Holds a clone of all the eth namespace handlers
eth: Option<EthHandlers<Provider, Pool, Network, Events>>,
/// to put trace calls behind semaphore
tracing_call_guard: TracingCallGuard,
blocking_pool_guard: BlockingTaskGuard,
/// Contains the [Methods] of a module
modules: HashMap<RethRpcModule, Methods>,
}
@@ -745,7 +745,7 @@ impl<Provider, Pool, Network, Tasks, Events>
eth: None,
executor,
modules: Default::default(),
tracing_call_guard: TracingCallGuard::new(config.eth.max_tracing_requests),
blocking_pool_guard: BlockingTaskGuard::new(config.eth.max_tracing_requests),
config,
events,
}
@@ -927,7 +927,7 @@ where
filter: eth_filter,
pubsub: eth_pubsub,
cache: _,
tracing_call_pool: _,
blocking_task_pool: _,
} = self.with_eth(|eth| eth.clone());
// Create a copy, so we can list out all the methods for rpc_ api
@@ -946,7 +946,7 @@ where
self.provider.clone(),
eth_api.clone(),
Box::new(self.executor.clone()),
self.tracing_call_guard.clone(),
self.blocking_pool_guard.clone(),
)
.into_rpc()
.into(),
@@ -964,7 +964,7 @@ where
RethRpcModule::Trace => TraceApi::new(
self.provider.clone(),
eth_api.clone(),
self.tracing_call_guard.clone(),
self.blocking_pool_guard.clone(),
)
.into_rpc()
.into(),
@@ -1026,7 +1026,8 @@ where
);
let executor = Box::new(self.executor.clone());
let tracing_call_pool = TracingCallPool::build().expect("failed to build tracing pool");
let blocking_task_pool =
BlockingTaskPool::build().expect("failed to build tracing pool");
let api = EthApi::with_spawner(
self.provider.clone(),
self.pool.clone(),
@@ -1035,7 +1036,7 @@ where
gas_oracle,
self.config.eth.rpc_gas_cap,
executor.clone(),
tracing_call_pool.clone(),
blocking_task_pool.clone(),
);
let filter = EthFilter::new(
self.provider.clone(),
@@ -1053,7 +1054,7 @@ where
executor,
);
let eth = EthHandlers { api, cache, filter, pubsub, tracing_call_pool };
let eth = EthHandlers { api, cache, filter, pubsub, blocking_task_pool };
self.eth = Some(eth);
}
f(self.eth.as_ref().expect("exists; qed"))
@@ -1071,7 +1072,7 @@ where
/// Instantiates TraceApi
pub fn trace_api(&mut self) -> TraceApi<Provider, EthApi<Provider, Pool, Network>> {
let eth = self.eth_handlers();
TraceApi::new(self.provider.clone(), eth.api, self.tracing_call_guard.clone())
TraceApi::new(self.provider.clone(), eth.api, self.blocking_pool_guard.clone())
}
/// Instantiates OtterscanApi
@@ -1087,7 +1088,7 @@ where
self.provider.clone(),
eth_api,
Box::new(self.executor.clone()),
self.tracing_call_guard.clone(),
self.blocking_pool_guard.clone(),
)
}

View File

@@ -114,11 +114,9 @@ where
EthApiClient::submit_hashrate(client, U256::default(), B256::default()).await.unwrap();
EthApiClient::gas_price(client).await.unwrap_err();
EthApiClient::max_priority_fee_per_gas(client).await.unwrap_err();
EthApiClient::get_proof(client, address, vec![], None).await.unwrap();
// Unimplemented
assert!(is_unimplemented(
EthApiClient::get_proof(client, address, vec![], None).await.err().unwrap()
));
assert!(is_unimplemented(EthApiClient::author(client).await.err().unwrap()));
assert!(is_unimplemented(EthApiClient::is_mining(client).await.err().unwrap()));
assert!(is_unimplemented(EthApiClient::get_work(client).await.err().unwrap()));

View File

@@ -12,10 +12,7 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
pub mod block;
pub use block::*;
pub mod transaction;
pub use transaction::*;
pub mod engine;
pub use engine::*;
pub mod log;
pub use log::*;
pub mod proof;
pub mod transaction;

View File

@@ -0,0 +1,27 @@
//! Compatibility functions for rpc proof related types.
use reth_primitives::{
serde_helper::JsonStorageKey,
trie::{AccountProof, StorageProof},
U64,
};
use reth_rpc_types::{EIP1186AccountProofResponse, EIP1186StorageProof};
/// Creates a new rpc storage proof from a primitive storage proof type.
pub fn from_primitive_storage_proof(proof: StorageProof) -> EIP1186StorageProof {
EIP1186StorageProof { key: JsonStorageKey(proof.key), value: proof.value, proof: proof.proof }
}
/// Creates a new rpc account proof from a primitive account proof type.
pub fn from_primitive_account_proof(proof: AccountProof) -> EIP1186AccountProofResponse {
let info = proof.info.unwrap_or_default();
EIP1186AccountProofResponse {
address: proof.address,
balance: info.balance,
code_hash: info.get_bytecode_hash(),
nonce: U64::from(info.nonce),
storage_hash: proof.storage_root,
account_proof: proof.proof,
storage_proof: proof.storage_proofs.into_iter().map(from_primitive_storage_proof).collect(),
}
}

View File

@@ -12,7 +12,7 @@ pub struct AccountInfo {
/// Data structure with proof for one single storage-entry
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StorageProof {
pub struct EIP1186StorageProof {
/// Storage key.
pub key: JsonStorageKey,
/// Value that the key holds
@@ -31,7 +31,7 @@ pub struct EIP1186AccountProofResponse {
pub nonce: U64,
pub storage_hash: B256,
pub account_proof: Vec<Bytes>,
pub storage_proof: Vec<StorageProof>,
pub storage_proof: Vec<EIP1186StorageProof>,
}
/// Extended account information (used by `parity_allAccountInfo`).

View File

@@ -68,6 +68,7 @@ tracing.workspace = true
tracing-futures = "0.2"
schnellru = "0.2"
futures.workspace = true
derive_more = "0.99"
[dev-dependencies]
jsonrpsee = { workspace = true, features = ["client"] }

View File

@@ -13,17 +13,19 @@ use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit, Semaphore};
/// RPC Tracing call guard semaphore.
///
/// This is used to restrict the number of concurrent RPC requests to tracing methods like
/// `debug_traceTransaction` because they can consume a lot of memory and CPU.
/// `debug_traceTransaction` as well as `eth_getProof` because they can consume a lot of
/// memory and CPU.
///
/// This types serves as an entry guard for the [TracingCallPool] and is used to rate limit parallel
/// tracing calls on the pool.
/// This types serves as an entry guard for the [BlockingTaskPool] and is used to rate limit
/// parallel blocking tasks in the pool.
#[derive(Clone, Debug)]
pub struct TracingCallGuard(Arc<Semaphore>);
pub struct BlockingTaskGuard(Arc<Semaphore>);
impl TracingCallGuard {
/// Create a new `TracingCallGuard` with the given maximum number of tracing calls in parallel.
pub fn new(max_tracing_requests: u32) -> Self {
Self(Arc::new(Semaphore::new(max_tracing_requests as usize)))
impl BlockingTaskGuard {
/// Create a new `BlockingTaskGuard` with the given maximum number of blocking tasks in
/// parallel.
pub fn new(max_blocking_tasks: u32) -> Self {
Self(Arc::new(Semaphore::new(max_blocking_tasks as usize)))
}
/// See also [Semaphore::acquire_owned]
@@ -37,24 +39,24 @@ impl TracingCallGuard {
}
}
/// Used to execute tracing calls on a rayon threadpool from within a tokio runtime.
/// Used to execute blocking tasks on a rayon threadpool from within a tokio runtime.
///
/// This is a dedicated threadpool for tracing calls which are CPU bound.
/// This is a dedicated threadpool for blocking tasks which are CPU bound.
/// RPC calls that perform blocking IO (disk lookups) are not executed on this pool but on the tokio
/// runtime's blocking pool, which performs poorly with CPU bound tasks. Once the tokio blocking
/// pool is saturated it is converted into a queue, tracing calls could then interfere with the
/// pool is saturated it is converted into a queue, blocking tasks could then interfere with the
/// queue and block other RPC calls.
///
/// See also [tokio-docs] for more information.
///
/// [tokio-docs]: https://docs.rs/tokio/latest/tokio/index.html#cpu-bound-tasks-and-blocking-code
#[derive(Clone, Debug)]
pub struct TracingCallPool {
pub struct BlockingTaskPool {
pool: Arc<rayon::ThreadPool>,
}
impl TracingCallPool {
/// Create a new `TracingCallPool` with the given threadpool.
impl BlockingTaskPool {
/// Create a new `BlockingTaskPool` with the given threadpool.
pub fn new(pool: rayon::ThreadPool) -> Self {
Self { pool: Arc::new(pool) }
}
@@ -83,7 +85,7 @@ impl TracingCallPool {
/// function's return value.
///
/// If the function panics, the future will resolve to an error.
pub fn spawn<F, R>(&self, func: F) -> TracingCallHandle<R>
pub fn spawn<F, R>(&self, func: F) -> BlockingTaskHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
@@ -94,7 +96,7 @@ impl TracingCallPool {
let _result = tx.send(catch_unwind(AssertUnwindSafe(func)));
});
TracingCallHandle { rx }
BlockingTaskHandle { rx }
}
/// Asynchronous wrapper around Rayon's
@@ -104,7 +106,7 @@ impl TracingCallPool {
/// function's return value.
///
/// If the function panics, the future will resolve to an error.
pub fn spawn_fifo<F, R>(&self, func: F) -> TracingCallHandle<R>
pub fn spawn_fifo<F, R>(&self, func: F) -> BlockingTaskHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
@@ -115,11 +117,11 @@ impl TracingCallPool {
let _result = tx.send(catch_unwind(AssertUnwindSafe(func)));
});
TracingCallHandle { rx }
BlockingTaskHandle { rx }
}
}
/// Async handle for a blocking tracing task running in a Rayon thread pool.
/// Async handle for a blocking task running in a Rayon thread pool.
///
/// ## Panics
///
@@ -127,18 +129,18 @@ impl TracingCallPool {
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project::pin_project]
pub struct TracingCallHandle<T> {
pub struct BlockingTaskHandle<T> {
#[pin]
pub(crate) rx: oneshot::Receiver<thread::Result<T>>,
}
impl<T> Future for TracingCallHandle<T> {
impl<T> Future for BlockingTaskHandle<T> {
type Output = thread::Result<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(self.project().rx.poll(cx)) {
Ok(res) => Poll::Ready(res),
Err(_) => Poll::Ready(Err(Box::<TokioTracingCallError>::default())),
Err(_) => Poll::Ready(Err(Box::<TokioBlockingTaskError>::default())),
}
}
}
@@ -149,23 +151,23 @@ impl<T> Future for TracingCallHandle<T> {
#[derive(Debug, Default, thiserror::Error)]
#[error("Tokio channel dropped while awaiting result")]
#[non_exhaustive]
pub struct TokioTracingCallError;
pub struct TokioBlockingTaskError;
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn tracing_pool() {
let pool = TracingCallPool::build().unwrap();
async fn blocking_pool() {
let pool = BlockingTaskPool::build().unwrap();
let res = pool.spawn(move || 5);
let res = res.await.unwrap();
assert_eq!(res, 5);
}
#[tokio::test]
async fn tracing_pool_panic() {
let pool = TracingCallPool::build().unwrap();
async fn blocking_pool_panic() {
let pool = BlockingTaskPool::build().unwrap();
let res = pool.spawn(move || -> i32 {
panic!();
});

View File

@@ -8,7 +8,7 @@ use crate::{
EthTransactions, TransactionSource,
},
result::{internal_rpc_err, ToRpcResult},
EthApiSpec, TracingCallGuard,
BlockingTaskGuard, EthApiSpec,
};
use alloy_rlp::{Decodable, Encodable};
use async_trait::async_trait;
@@ -61,10 +61,10 @@ impl<Provider, Eth> DebugApi<Provider, Eth> {
provider: Provider,
eth: Eth,
task_spawner: Box<dyn TaskSpawner>,
tracing_call_guard: TracingCallGuard,
blocking_task_guard: BlockingTaskGuard,
) -> Self {
let inner =
Arc::new(DebugApiInner { provider, eth_api: eth, task_spawner, tracing_call_guard });
Arc::new(DebugApiInner { provider, eth_api: eth, task_spawner, blocking_task_guard });
Self { inner }
}
}
@@ -78,7 +78,7 @@ where
{
/// Acquires a permit to execute a tracing call.
async fn acquire_trace_permit(&self) -> Result<OwnedSemaphorePermit, AcquireError> {
self.inner.tracing_call_guard.clone().acquire_owned().await
self.inner.blocking_task_guard.clone().acquire_owned().await
}
/// Trace the entire block asynchronously
@@ -1010,8 +1010,8 @@ struct DebugApiInner<Provider, Eth> {
provider: Provider,
/// The implementation of `eth` API
eth_api: Eth,
// restrict the number of concurrent calls to tracing calls
tracing_call_guard: TracingCallGuard,
// restrict the number of concurrent calls to blocking calls
blocking_task_guard: BlockingTaskGuard,
/// The type that can spawn tasks which would otherwise block.
task_spawner: Box<dyn TaskSpawner>,
}

View File

@@ -39,7 +39,7 @@ mod sign;
mod state;
mod transactions;
use crate::TracingCallPool;
use crate::BlockingTaskPool;
pub use transactions::{EthTransactions, TransactionSource};
/// `Eth` API trait.
@@ -91,7 +91,7 @@ where
eth_cache: EthStateCache,
gas_oracle: GasPriceOracle<Provider>,
gas_cap: impl Into<GasCap>,
tracing_call_pool: TracingCallPool,
blocking_task_pool: BlockingTaskPool,
) -> Self {
Self::with_spawner(
provider,
@@ -101,7 +101,7 @@ where
gas_oracle,
gas_cap.into().into(),
Box::<TokioTaskExecutor>::default(),
tracing_call_pool,
blocking_task_pool,
)
}
@@ -115,7 +115,7 @@ where
gas_oracle: GasPriceOracle<Provider>,
gas_cap: u64,
task_spawner: Box<dyn TaskSpawner>,
tracing_call_pool: TracingCallPool,
blocking_task_pool: BlockingTaskPool,
) -> Self {
// get the block number of the latest block
let latest_block = provider
@@ -136,7 +136,7 @@ where
starting_block: U256::from(latest_block),
task_spawner,
pending_block: Default::default(),
tracing_call_pool,
blocking_task_pool,
};
Self { inner: Arc::new(inner) }
}
@@ -436,6 +436,6 @@ struct EthApiInner<Provider, Pool, Network> {
task_spawner: Box<dyn TaskSpawner>,
/// Cached pending block if any
pending_block: Mutex<Option<PendingBlock>>,
/// A pool dedicated to tracing calls
tracing_call_pool: TracingCallPool,
/// A pool dedicated to blocking tasks.
blocking_task_pool: BlockingTaskPool,
}

View File

@@ -5,6 +5,7 @@ use super::EthApiSpec;
use crate::{
eth::{
api::{EthApi, EthTransactions},
error::EthApiError,
revm_utils::EvmOverrides,
},
result::{internal_rpc_err, ToRpcResult},
@@ -368,21 +369,19 @@ where
/// Handler for: `eth_getProof`
async fn get_proof(
&self,
_address: Address,
_keys: Vec<JsonStorageKey>,
_block_number: Option<BlockId>,
address: Address,
keys: Vec<JsonStorageKey>,
block_number: Option<BlockId>,
) -> Result<EIP1186AccountProofResponse> {
// TODO: uncomment when implemented
// trace!(target: "rpc::eth", ?address, ?keys, ?block_number, "Serving eth_getProof");
// let res = EthApi::get_proof(self, address, keys, block_number);
trace!(target: "rpc::eth", ?address, ?keys, ?block_number, "Serving eth_getProof");
let res = EthApi::get_proof(self, address, keys, block_number).await;
// Ok(res.map_err(|e| match e {
// EthApiError::InvalidBlockRange => {
// internal_rpc_err("eth_getProof is unimplemented for historical blocks")
// }
// _ => e.into(),
// })?)
Err(internal_rpc_err("unimplemented"))
Ok(res.map_err(|e| match e {
EthApiError::InvalidBlockRange => {
internal_rpc_err("eth_getProof is unimplemented for historical blocks")
}
_ => e.into(),
})?)
}
}
@@ -390,7 +389,7 @@ where
mod tests {
use crate::{
eth::{cache::EthStateCache, gas_oracle::GasPriceOracle},
EthApi, TracingCallPool,
BlockingTaskPool, EthApi,
};
use jsonrpsee::types::error::INVALID_PARAMS_CODE;
use reth_interfaces::test_utils::{generators, generators::Rng};
@@ -428,7 +427,7 @@ mod tests {
cache.clone(),
GasPriceOracle::new(provider, Default::default(), cache),
ETHEREUM_BLOCK_GAS_LIMIT,
TracingCallPool::build().expect("failed to build tracing pool"),
BlockingTaskPool::build().expect("failed to build tracing pool"),
)
}

View File

@@ -5,14 +5,13 @@ use crate::{
EthApi,
};
use reth_primitives::{
serde_helper::JsonStorageKey, Address, BlockId, BlockNumberOrTag, Bytes, B256, KECCAK_EMPTY,
U256, U64,
serde_helper::JsonStorageKey, Address, BlockId, BlockNumberOrTag, Bytes, B256, U256,
};
use reth_provider::{
AccountReader, BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, StateProvider,
StateProviderFactory,
BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, StateProvider, StateProviderFactory,
};
use reth_rpc_types::{EIP1186AccountProofResponse, StorageProof};
use reth_rpc_types::EIP1186AccountProofResponse;
use reth_rpc_types_compat::proof::from_primitive_account_proof;
use reth_transaction_pool::{PoolTransaction, TransactionPool};
impl<Provider, Pool, Network> EthApi<Provider, Pool, Network>
@@ -84,8 +83,7 @@ where
Ok(B256::new(value.to_be_bytes()))
}
#[allow(unused)]
pub(crate) fn get_proof(
pub(crate) async fn get_proof(
&self,
address: Address,
keys: Vec<JsonStorageKey>,
@@ -97,7 +95,7 @@ where
// if we are trying to create a proof for the latest block, but have a BlockId as input
// that is not BlockNumberOrTag::Latest, then we need to figure out whether or not the
// BlockId corresponds to the latest block
let is_blockid_latest = match block_id {
let is_latest_block = match block_id {
BlockId::Number(BlockNumberOrTag::Number(num)) => num == chain_info.best_number,
BlockId::Hash(hash) => hash == chain_info.best_hash.into(),
BlockId::Number(BlockNumberOrTag::Latest) => true,
@@ -105,43 +103,21 @@ where
};
// TODO: remove when HistoricalStateProviderRef::proof is implemented
if !is_blockid_latest {
if !is_latest_block {
return Err(EthApiError::InvalidBlockRange)
}
let state = self.state_at_block_id(block_id)?;
let hash_keys = keys.iter().map(|key| key.0).collect::<Vec<_>>();
let (account_proof, storage_hash, stg_proofs) = state.proof(address, &hash_keys)?;
let storage_proof = keys
.into_iter()
.zip(stg_proofs)
.map(|(key, proof)| {
state.storage(address, key.0).map(|op| StorageProof {
key,
value: op.unwrap_or_default(),
proof,
})
let this = self.clone();
self.inner
.blocking_task_pool
.spawn(move || {
let state = this.state_at_block_id(block_id)?;
let storage_keys = keys.iter().map(|key| key.0).collect::<Vec<_>>();
let proof = state.proof(address, &storage_keys)?;
Ok(from_primitive_account_proof(proof))
})
.collect::<Result<_, _>>()?;
let mut proof = EIP1186AccountProofResponse {
address,
code_hash: KECCAK_EMPTY,
account_proof,
storage_hash,
storage_proof,
..Default::default()
};
if let Some(account) = state.basic_account(proof.address)? {
proof.balance = account.balance;
proof.nonce = U64::from(account.nonce);
proof.code_hash = account.get_bytecode_hash();
}
Ok(proof)
.await
.map_err(|_| EthApiError::InternalBlockingTaskError)?
}
}
@@ -150,7 +126,7 @@ mod tests {
use super::*;
use crate::{
eth::{cache::EthStateCache, gas_oracle::GasPriceOracle},
TracingCallPool,
BlockingTaskPool,
};
use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, StorageKey, StorageValue};
use reth_provider::test_utils::{ExtendedAccount, MockEthProvider, NoopProvider};
@@ -170,7 +146,7 @@ mod tests {
cache.clone(),
GasPriceOracle::new(NoopProvider::default(), Default::default(), cache),
ETHEREUM_BLOCK_GAS_LIMIT,
TracingCallPool::build().expect("failed to build tracing pool"),
BlockingTaskPool::build().expect("failed to build tracing pool"),
);
let address = Address::random();
let storage = eth_api.storage_at(address, U256::ZERO.into(), None).unwrap();
@@ -192,7 +168,7 @@ mod tests {
cache.clone(),
GasPriceOracle::new(mock_provider, Default::default(), cache),
ETHEREUM_BLOCK_GAS_LIMIT,
TracingCallPool::build().expect("failed to build tracing pool"),
BlockingTaskPool::build().expect("failed to build tracing pool"),
);
let storage_key: U256 = storage_key.into();

View File

@@ -32,7 +32,7 @@ use reth_rpc_types::{
BlockError, CallRequest, Index, Log, Transaction, TransactionInfo, TransactionReceipt,
TransactionRequest, TypedTransactionRequest,
};
use reth_rpc_types_compat::from_recovered_with_block_context;
use reth_rpc_types_compat::transaction::from_recovered_with_block_context;
use reth_transaction_pool::{TransactionOrigin, TransactionPool};
use revm::{
db::CacheDB,
@@ -50,7 +50,7 @@ pub(crate) type StateCacheDB<'r> = CacheDB<StateProviderDatabase<StateProviderBo
/// Commonly used transaction related functions for the [EthApi] type in the `eth_` namespace.
///
/// Async functions that are spawned onto the
/// [TracingCallPool](crate::tracing_call::TracingCallPool) begin with `spawn_`
/// [BlockingTaskPool](crate::blocking_pool::BlockingTaskPool) begin with `spawn_`
#[async_trait::async_trait]
pub trait EthTransactions: Send + Sync {
/// Returns default gas limit to use for `eth_call` and tracing RPC methods.
@@ -222,7 +222,7 @@ pub trait EthTransactions: Send + Sync {
/// the database that points to the beginning of the transaction.
///
/// Note: Implementers should use a threadpool where blocking is allowed, such as
/// [TracingCallPool](crate::tracing_call::TracingCallPool).
/// [BlockingTaskPool](crate::blocking_pool::BlockingTaskPool).
async fn spawn_trace_transaction_in_block<F, R>(
&self,
hash: B256,
@@ -325,13 +325,13 @@ where
{
let this = self.clone();
self.inner
.tracing_call_pool
.blocking_task_pool
.spawn(move || {
let state = this.state_at(at)?;
f(state)
})
.await
.map_err(|_| EthApiError::InternalTracingError)?
.map_err(|_| EthApiError::InternalBlockingTaskError)?
}
async fn evm_env_at(&self, at: BlockId) -> EthResult<(CfgEnv, BlockEnv, BlockId)> {
@@ -594,7 +594,7 @@ where
let (cfg, block_env, at) = self.evm_env_at(at).await?;
let this = self.clone();
self.inner
.tracing_call_pool
.blocking_task_pool
.spawn(move || {
let state = this.state_at(at)?;
let mut db = CacheDB::new(StateProviderDatabase::new(state));
@@ -610,7 +610,7 @@ where
f(db, env)
})
.await
.map_err(|_| EthApiError::InternalTracingError)?
.map_err(|_| EthApiError::InternalBlockingTaskError)?
}
async fn transact_call_at(
@@ -1094,7 +1094,7 @@ mod tests {
use super::*;
use crate::{
eth::{cache::EthStateCache, gas_oracle::GasPriceOracle},
EthApi, TracingCallPool,
BlockingTaskPool, EthApi,
};
use reth_network_api::noop::NoopNetwork;
use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, hex_literal::hex, Bytes};
@@ -1116,7 +1116,7 @@ mod tests {
cache.clone(),
GasPriceOracle::new(noop_provider, Default::default(), cache),
ETHEREUM_BLOCK_GAS_LIMIT,
TracingCallPool::build().expect("failed to build tracing pool"),
BlockingTaskPool::build().expect("failed to build tracing pool"),
);
// https://etherscan.io/tx/0xa694b71e6c128a2ed8e2e0f6770bddbe52e3bb8f10e8472f9a79ab81497a8b5d

View File

@@ -83,12 +83,12 @@ pub enum EthApiError {
/// Percentile array is invalid
#[error("invalid reward percentiles")]
InvalidRewardPercentiles,
/// Error thrown when a spawned tracing task failed to deliver an anticipated response.
/// Error thrown when a spawned blocking task failed to deliver an anticipated response.
///
/// This only happens if the tracing task panics and is aborted before it can return a response
/// back to the request handler.
#[error("internal error while tracing")]
InternalTracingError,
/// This only happens if the blocking task panics and is aborted before it can return a
/// response back to the request handler.
#[error("internal blocking task error")]
InternalBlockingTaskError,
/// Error thrown when a spawned blocking task failed to deliver an anticipated response.
#[error("internal eth error")]
InternalEthError,
@@ -133,7 +133,7 @@ impl From<EthApiError> for ErrorObject<'static> {
err @ EthApiError::ExecutionTimedOut(_) => {
rpc_error_with_code(CALL_EXECUTION_FAILED_CODE, err.to_string())
}
err @ EthApiError::InternalTracingError => internal_rpc_err(err.to_string()),
err @ EthApiError::InternalBlockingTaskError => internal_rpc_err(err.to_string()),
err @ EthApiError::InternalEthError => internal_rpc_err(err.to_string()),
err @ EthApiError::CallInputError(_) => invalid_params_rpc_err(err.to_string()),
}

View File

@@ -4,9 +4,12 @@ use crate::eth::{
cache::EthStateCache,
error::{EthApiError, EthResult, RpcInvalidTransactionError},
};
use derive_more::{Deref, DerefMut};
use reth_primitives::{constants::GWEI_TO_WEI, BlockNumberOrTag, B256, U256};
use reth_provider::BlockReaderIdExt;
use schnellru::{ByLength, LruMap};
use serde::{Deserialize, Serialize};
use std::fmt::{self, Debug, Formatter};
use tokio::sync::Mutex;
use tracing::warn;
@@ -90,8 +93,9 @@ pub struct GasPriceOracle<Provider> {
oracle_config: GasPriceOracleConfig,
/// The price under which the sample will be ignored.
ignore_price: Option<u128>,
/// The latest calculated price and its block hash
last_price: Mutex<GasPriceOracleResult>,
/// Stores the latest calculated price and its block hash and Cache stores the lowest effective
/// tip values of recent blocks
inner: Mutex<GasPriceOracleInner>,
}
impl<Provider> GasPriceOracle<Provider>
@@ -111,7 +115,16 @@ where
}
let ignore_price = oracle_config.ignore_price.map(|price| price.saturating_to());
Self { provider, oracle_config, last_price: Default::default(), cache, ignore_price }
// this is the number of blocks that we will cache the values for
let cached_values = (oracle_config.blocks * 5).max(oracle_config.max_block_history as u32);
let inner = Mutex::new(GasPriceOracleInner {
last_price: Default::default(),
lowest_effective_tip_cache: EffectiveTipLruCache(LruMap::new(ByLength::new(
cached_values,
))),
});
Self { provider, oracle_config, cache, ignore_price, inner }
}
/// Returns the configuration of the gas price oracle.
@@ -126,11 +139,11 @@ where
.sealed_header_by_number_or_tag(BlockNumberOrTag::Latest)?
.ok_or(EthApiError::UnknownBlockNumber)?;
let mut last_price = self.last_price.lock().await;
let mut inner = self.inner.lock().await;
// if we have stored a last price, then we check whether or not it was for the same head
if last_price.block_hash == header.hash {
return Ok(last_price.price)
if inner.last_price.block_hash == header.hash {
return Ok(inner.last_price.price)
}
// if all responses are empty, then we can return a maximum of 2*check_block blocks' worth
@@ -150,13 +163,24 @@ where
};
for _ in 0..max_blocks {
let (parent_hash, block_values) = self
.get_block_values(current_hash, SAMPLE_NUMBER)
.await?
.ok_or(EthApiError::UnknownBlockNumber)?;
// Check if current hash is in cache
let (parent_hash, block_values) =
if let Some(vals) = inner.lowest_effective_tip_cache.get(&current_hash) {
vals.to_owned()
} else {
// Otherwise we fetch it using get_block_values
let (parent_hash, block_values) = self
.get_block_values(current_hash, SAMPLE_NUMBER)
.await?
.ok_or(EthApiError::UnknownBlockNumber)?;
inner
.lowest_effective_tip_cache
.insert(current_hash, (parent_hash, block_values.clone()));
(parent_hash, block_values)
};
if block_values.is_empty() {
results.push(U256::from(last_price.price));
results.push(U256::from(inner.last_price.price));
} else {
results.extend(block_values);
populated_blocks += 1;
@@ -171,7 +195,7 @@ where
}
// sort results then take the configured percentile result
let mut price = last_price.price;
let mut price = inner.last_price.price;
if !results.is_empty() {
results.sort_unstable();
price = *results
@@ -186,7 +210,7 @@ where
}
}
*last_price = GasPriceOracleResult { block_hash: header.hash, price };
inner.last_price = GasPriceOracleResult { block_hash: header.hash, price };
Ok(price)
}
@@ -253,6 +277,26 @@ where
}
}
/// Container type for mutable inner state of the [GasPriceOracle]
#[derive(Debug)]
struct GasPriceOracleInner {
last_price: GasPriceOracleResult,
lowest_effective_tip_cache: EffectiveTipLruCache,
}
/// Wrapper struct for LruMap
#[derive(Deref, DerefMut)]
pub struct EffectiveTipLruCache(LruMap<B256, (B256, Vec<U256>), ByLength>);
impl Debug for EffectiveTipLruCache {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("EffectiveTipLruCache")
.field("cache_length", &self.len())
.field("cache_memory_usage", &self.memory_usage())
.finish()
}
}
/// Stores the last result that the oracle returned
#[derive(Debug, Clone)]
pub struct GasPriceOracleResult {

View File

@@ -36,11 +36,11 @@ mod otterscan;
mod reth;
mod rpc;
mod trace;
pub mod tracing_call;
mod txpool;
mod web3;
pub use admin::AdminApi;
pub use blocking_pool::{BlockingTaskGuard, BlockingTaskPool};
pub use debug::DebugApi;
pub use engine::{EngineApi, EngineEthApi};
pub use eth::{EthApi, EthApiSpec, EthFilter, EthPubSub, EthSubscriptionIdProvider};
@@ -50,8 +50,8 @@ pub use otterscan::OtterscanApi;
pub use reth::RethApi;
pub use rpc::RPCApi;
pub use trace::TraceApi;
pub use tracing_call::{TracingCallGuard, TracingCallPool};
pub use txpool::TxPoolApi;
pub use web3::Web3Api;
pub mod blocking_pool;
pub mod result;

View File

@@ -5,7 +5,7 @@ use crate::{
utils::recover_raw_transaction,
EthTransactions,
},
TracingCallGuard,
BlockingTaskGuard,
};
use async_trait::async_trait;
use jsonrpsee::core::RpcResult as Result;
@@ -44,8 +44,8 @@ impl<Provider, Eth> TraceApi<Provider, Eth> {
}
/// Create a new instance of the [TraceApi]
pub fn new(provider: Provider, eth_api: Eth, tracing_call_guard: TracingCallGuard) -> Self {
let inner = Arc::new(TraceApiInner { provider, eth_api, tracing_call_guard });
pub fn new(provider: Provider, eth_api: Eth, blocking_task_guard: BlockingTaskGuard) -> Self {
let inner = Arc::new(TraceApiInner { provider, eth_api, blocking_task_guard });
Self { inner }
}
@@ -53,7 +53,7 @@ impl<Provider, Eth> TraceApi<Provider, Eth> {
async fn acquire_trace_permit(
&self,
) -> std::result::Result<OwnedSemaphorePermit, AcquireError> {
self.inner.tracing_call_guard.clone().acquire_owned().await
self.inner.blocking_task_guard.clone().acquire_owned().await
}
}
@@ -557,7 +557,7 @@ struct TraceApiInner<Provider, Eth> {
/// Access to commonly used code of the `eth` namespace
eth_api: Eth,
// restrict the number of concurrent calls to `trace_*`
tracing_call_guard: TracingCallGuard,
blocking_task_guard: BlockingTaskGuard,
}
/// Returns the [TracingInspectorConfig] depending on the enabled [TraceType]s

View File

@@ -224,7 +224,6 @@ impl<DB: Database> Stage<DB> for MerkleStage {
let checkpoint = MerkleCheckpoint::new(
to_block,
state.last_account_key,
state.last_walker_key.hex_data.to_vec(),
state.walker_stack.into_iter().map(StoredSubNode::from).collect(),
state.hash_builder.into(),
);

View File

@@ -2,7 +2,7 @@
use reth_codecs::{main_codec, Compact};
use reth_primitives::{Header, TxNumber, Withdrawal, B256};
use std::ops::RangeInclusive;
use std::ops::Range;
/// Total number of transactions.
pub type NumTransactions = u64;
@@ -28,8 +28,8 @@ pub struct StoredBlockBodyIndices {
impl StoredBlockBodyIndices {
/// Return the range of transaction ids for this block.
pub fn tx_num_range(&self) -> RangeInclusive<TxNumber> {
self.first_tx_num..=(self.first_tx_num + self.tx_count).saturating_sub(1)
pub fn tx_num_range(&self) -> Range<TxNumber> {
self.first_tx_num..self.first_tx_num + self.tx_count
}
/// Return the index of last transaction in this block unless the block
@@ -111,6 +111,6 @@ mod test {
assert_eq!(block_indices.last_tx_num(), first_tx_num + tx_count - 1);
assert_eq!(block_indices.next_tx_num(), first_tx_num + tx_count);
assert_eq!(block_indices.tx_count(), tx_count);
assert_eq!(block_indices.tx_num_range(), first_tx_num..=first_tx_num + tx_count - 1);
assert_eq!(block_indices.tx_num_range(), first_tx_num..first_tx_num + tx_count);
}
}

View File

@@ -3,7 +3,7 @@ use crate::{
StateProvider, StateRootProvider,
};
use reth_interfaces::{provider::ProviderError, RethResult};
use reth_primitives::{Account, Address, BlockNumber, Bytecode, Bytes, B256};
use reth_primitives::{trie::AccountProof, Account, Address, BlockNumber, Bytecode, B256};
/// A state provider that either resolves to data in a wrapped [`crate::BundleStateWithReceipts`],
/// or an underlying state provider.
@@ -92,11 +92,7 @@ impl<SP: StateProvider, BSDP: BundleStateDataProvider> StateProvider
self.state_provider.bytecode_by_hash(code_hash)
}
fn proof(
&self,
_address: Address,
_keys: &[B256],
) -> RethResult<(Vec<Bytes>, B256, Vec<Vec<Bytes>>)> {
fn proof(&self, _address: Address, _keys: &[B256]) -> RethResult<AccountProof> {
Err(ProviderError::StateRootNotAvailableForHistoricalBlock.into())
}
}

View File

@@ -16,7 +16,7 @@ use reth_primitives::{
};
use revm::primitives::{BlockEnv, CfgEnv};
use std::{
ops::{Range, RangeInclusive},
ops::{RangeBounds, RangeInclusive},
sync::Arc,
};
use tracing::trace;
@@ -170,13 +170,13 @@ impl<DB: Database> HeaderProvider for ProviderFactory<DB> {
self.provider()?.header_td_by_number(number)
}
fn headers_range(&self, range: RangeInclusive<BlockNumber>) -> RethResult<Vec<Header>> {
fn headers_range(&self, range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>> {
self.provider()?.headers_range(range)
}
fn sealed_headers_range(
&self,
range: RangeInclusive<BlockNumber>,
range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<SealedHeader>> {
self.provider()?.sealed_headers_range(range)
}
@@ -295,19 +295,19 @@ impl<DB: Database> TransactionsProvider for ProviderFactory<DB> {
fn transactions_by_block_range(
&self,
range: Range<BlockNumber>,
range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<Vec<TransactionSigned>>> {
self.provider()?.transactions_by_block_range(range)
}
fn transactions_by_tx_range(
&self,
range: RangeInclusive<TxNumber>,
range: impl RangeBounds<TxNumber>,
) -> RethResult<Vec<TransactionSignedNoHash>> {
self.provider()?.transactions_by_tx_range(range)
}
fn senders_by_tx_range(&self, range: RangeInclusive<TxNumber>) -> RethResult<Vec<Address>> {
fn senders_by_tx_range(&self, range: impl RangeBounds<TxNumber>) -> RethResult<Vec<Address>> {
self.provider()?.senders_by_tx_range(range)
}

View File

@@ -889,7 +889,7 @@ impl<TX: DbTx> HeaderProvider for DatabaseProvider<TX> {
Ok(self.tx.get::<tables::HeaderTD>(number)?.map(|td| td.0))
}
fn headers_range(&self, range: RangeInclusive<BlockNumber>) -> RethResult<Vec<Header>> {
fn headers_range(&self, range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>> {
let mut cursor = self.tx.cursor_read::<tables::Headers>()?;
cursor
.walk_range(range)?
@@ -899,7 +899,7 @@ impl<TX: DbTx> HeaderProvider for DatabaseProvider<TX> {
fn sealed_headers_range(
&self,
range: RangeInclusive<BlockNumber>,
range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<SealedHeader>> {
let mut headers = vec![];
for entry in self.tx.cursor_read::<tables::Headers>()?.walk_range(range)? {
@@ -1242,7 +1242,7 @@ impl<TX: DbTx> TransactionsProvider for DatabaseProvider<TX> {
fn transactions_by_block_range(
&self,
range: Range<BlockNumber>,
range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<Vec<TransactionSigned>>> {
let mut results = Vec::new();
let mut body_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;
@@ -1266,7 +1266,7 @@ impl<TX: DbTx> TransactionsProvider for DatabaseProvider<TX> {
fn transactions_by_tx_range(
&self,
range: RangeInclusive<TxNumber>,
range: impl RangeBounds<TxNumber>,
) -> RethResult<Vec<TransactionSignedNoHash>> {
Ok(self
.tx
@@ -1276,7 +1276,7 @@ impl<TX: DbTx> TransactionsProvider for DatabaseProvider<TX> {
.collect::<Result<Vec<_>, _>>()?)
}
fn senders_by_tx_range(&self, range: RangeInclusive<TxNumber>) -> RethResult<Vec<Address>> {
fn senders_by_tx_range(&self, range: impl RangeBounds<TxNumber>) -> RethResult<Vec<Address>> {
Ok(self
.tx
.cursor_read::<tables::TxSenders>()?

View File

@@ -22,7 +22,7 @@ use reth_primitives::{
use revm::primitives::{BlockEnv, CfgEnv};
use std::{
collections::{BTreeMap, HashSet},
ops::{Range, RangeInclusive},
ops::{RangeBounds, RangeInclusive},
sync::Arc,
time::Instant,
};
@@ -136,13 +136,13 @@ where
self.database.provider()?.header_td_by_number(number)
}
fn headers_range(&self, range: RangeInclusive<BlockNumber>) -> RethResult<Vec<Header>> {
fn headers_range(&self, range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>> {
self.database.provider()?.headers_range(range)
}
fn sealed_headers_range(
&self,
range: RangeInclusive<BlockNumber>,
range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<SealedHeader>> {
self.database.provider()?.sealed_headers_range(range)
}
@@ -319,19 +319,19 @@ where
fn transactions_by_block_range(
&self,
range: Range<BlockNumber>,
range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<Vec<TransactionSigned>>> {
self.database.provider()?.transactions_by_block_range(range)
}
fn transactions_by_tx_range(
&self,
range: RangeInclusive<TxNumber>,
range: impl RangeBounds<TxNumber>,
) -> RethResult<Vec<TransactionSignedNoHash>> {
self.database.provider()?.transactions_by_tx_range(range)
}
fn senders_by_tx_range(&self, range: RangeInclusive<TxNumber>) -> RethResult<Vec<Address>> {
fn senders_by_tx_range(&self, range: impl RangeBounds<TxNumber>) -> RethResult<Vec<Address>> {
self.database.provider()?.senders_by_tx_range(range)
}

View File

@@ -6,7 +6,7 @@ use reth_db::{
use reth_interfaces::{provider::ProviderError, RethResult};
use reth_nippy_jar::{compression::Decompressor, NippyJar, NippyJarCursor};
use reth_primitives::{BlockHash, BlockNumber, Header, SealedHeader, U256};
use std::ops::RangeInclusive;
use std::ops::RangeBounds;
/// SnapshotProvider
///
@@ -85,13 +85,13 @@ impl<'a> HeaderProvider for SnapshotProvider<'a> {
unimplemented!();
}
fn headers_range(&self, _range: RangeInclusive<BlockNumber>) -> RethResult<Vec<Header>> {
fn headers_range(&self, _range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>> {
unimplemented!();
}
fn sealed_headers_range(
&self,
_range: RangeInclusive<BlockNumber>,
_range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<SealedHeader>> {
unimplemented!();
}

View File

@@ -12,7 +12,7 @@ use reth_db::{
};
use reth_interfaces::RethResult;
use reth_primitives::{
Account, Address, BlockNumber, Bytecode, Bytes, StorageKey, StorageValue, B256,
trie::AccountProof, Account, Address, BlockNumber, Bytecode, StorageKey, StorageValue, B256,
};
/// State provider for a given block number which takes a tx reference.
@@ -240,11 +240,7 @@ impl<'b, TX: DbTx> StateProvider for HistoricalStateProviderRef<'b, TX> {
}
/// Get account and storage proofs.
fn proof(
&self,
_address: Address,
_keys: &[B256],
) -> RethResult<(Vec<Bytes>, B256, Vec<Vec<Bytes>>)> {
fn proof(&self, _address: Address, _keys: &[B256]) -> RethResult<AccountProof> {
Err(ProviderError::StateRootNotAvailableForHistoricalBlock.into())
}
}

View File

@@ -9,7 +9,8 @@ use reth_db::{
};
use reth_interfaces::{provider::ProviderError, RethError, RethResult};
use reth_primitives::{
keccak256, Account, Address, BlockNumber, Bytecode, Bytes, StorageKey, StorageValue, B256,
keccak256, trie::AccountProof, Account, Address, BlockNumber, Bytecode, StorageKey,
StorageValue, B256,
};
/// State provider over latest state that takes tx reference.
@@ -84,11 +85,7 @@ impl<'b, TX: DbTx> StateProvider for LatestStateProviderRef<'b, TX> {
self.db.get::<tables::Bytecodes>(code_hash).map_err(Into::into)
}
fn proof(
&self,
address: Address,
_keys: &[B256],
) -> RethResult<(Vec<Bytes>, B256, Vec<Vec<Bytes>>)> {
fn proof(&self, address: Address, _keys: &[B256]) -> RethResult<AccountProof> {
let _hashed_address = keccak256(address);
let _root = self
.db

View File

@@ -42,7 +42,7 @@ macro_rules! delegate_provider_impls {
}
StateProvider $(where [$($generics)*])?{
fn storage(&self, account: reth_primitives::Address, storage_key: reth_primitives::StorageKey) -> reth_interfaces::RethResult<Option<reth_primitives::StorageValue>>;
fn proof(&self, address: reth_primitives::Address, keys: &[reth_primitives::B256]) -> reth_interfaces::RethResult<(Vec<reth_primitives::Bytes>, reth_primitives::B256, Vec<Vec<reth_primitives::Bytes>>)>;
fn proof(&self, address: reth_primitives::Address, keys: &[reth_primitives::B256]) -> reth_interfaces::RethResult<reth_primitives::trie::AccountProof>;
fn bytecode_by_hash(&self, code_hash: reth_primitives::B256) -> reth_interfaces::RethResult<Option<reth_primitives::Bytecode>>;
}
);

View File

@@ -10,15 +10,15 @@ use parking_lot::Mutex;
use reth_db::models::StoredBlockBodyIndices;
use reth_interfaces::{provider::ProviderError, RethResult};
use reth_primitives::{
keccak256, Account, Address, Block, BlockHash, BlockHashOrNumber, BlockId, BlockNumber,
BlockWithSenders, Bytecode, Bytes, ChainInfo, ChainSpec, Header, Receipt, SealedBlock,
SealedHeader, StorageKey, StorageValue, TransactionMeta, TransactionSigned,
keccak256, trie::AccountProof, Account, Address, Block, BlockHash, BlockHashOrNumber, BlockId,
BlockNumber, BlockWithSenders, Bytecode, Bytes, ChainInfo, ChainSpec, Header, Receipt,
SealedBlock, SealedHeader, StorageKey, StorageValue, TransactionMeta, TransactionSigned,
TransactionSignedNoHash, TxHash, TxNumber, B256, U256,
};
use revm::primitives::{BlockEnv, CfgEnv};
use std::{
collections::{BTreeMap, HashMap},
ops::{Range, RangeInclusive},
ops::{RangeBounds, RangeInclusive},
sync::Arc,
};
@@ -152,7 +152,7 @@ impl HeaderProvider for MockEthProvider {
Ok(Some(sum))
}
fn headers_range(&self, range: RangeInclusive<BlockNumber>) -> RethResult<Vec<Header>> {
fn headers_range(&self, range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>> {
let lock = self.headers.lock();
let mut headers: Vec<_> =
@@ -164,7 +164,7 @@ impl HeaderProvider for MockEthProvider {
fn sealed_headers_range(
&self,
range: RangeInclusive<BlockNumber>,
range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<SealedHeader>> {
Ok(self.headers_range(range)?.into_iter().map(|h| h.seal_slow()).collect())
}
@@ -265,7 +265,7 @@ impl TransactionsProvider for MockEthProvider {
fn transactions_by_block_range(
&self,
range: Range<reth_primitives::BlockNumber>,
range: impl RangeBounds<reth_primitives::BlockNumber>,
) -> RethResult<Vec<Vec<TransactionSigned>>> {
// init btreemap so we can return in order
let mut map = BTreeMap::new();
@@ -280,7 +280,7 @@ impl TransactionsProvider for MockEthProvider {
fn transactions_by_tx_range(
&self,
range: RangeInclusive<TxNumber>,
range: impl RangeBounds<TxNumber>,
) -> RethResult<Vec<reth_primitives::TransactionSignedNoHash>> {
let lock = self.blocks.lock();
let transactions = lock
@@ -299,7 +299,7 @@ impl TransactionsProvider for MockEthProvider {
Ok(transactions)
}
fn senders_by_tx_range(&self, range: RangeInclusive<TxNumber>) -> RethResult<Vec<Address>> {
fn senders_by_tx_range(&self, range: impl RangeBounds<TxNumber>) -> RethResult<Vec<Address>> {
let lock = self.blocks.lock();
let transactions = lock
.values()
@@ -507,11 +507,7 @@ impl StateProvider for MockEthProvider {
}))
}
fn proof(
&self,
_address: Address,
_keys: &[B256],
) -> RethResult<(Vec<Bytes>, B256, Vec<Vec<Bytes>>)> {
fn proof(&self, _address: Address, _keys: &[B256]) -> RethResult<AccountProof> {
todo!()
}
}

View File

@@ -10,14 +10,15 @@ use reth_db::models::{AccountBeforeTx, StoredBlockBodyIndices};
use reth_interfaces::RethResult;
use reth_primitives::{
stage::{StageCheckpoint, StageId},
Account, Address, Block, BlockHash, BlockHashOrNumber, BlockId, BlockNumber, Bytecode, Bytes,
trie::AccountProof,
Account, Address, Block, BlockHash, BlockHashOrNumber, BlockId, BlockNumber, Bytecode,
ChainInfo, ChainSpec, Header, PruneCheckpoint, PruneSegment, Receipt, SealedBlock,
SealedHeader, StorageKey, StorageValue, TransactionMeta, TransactionSigned,
TransactionSignedNoHash, TxHash, TxNumber, B256, KECCAK_EMPTY, MAINNET, U256,
TransactionSignedNoHash, TxHash, TxNumber, B256, MAINNET, U256,
};
use revm::primitives::{BlockEnv, CfgEnv};
use std::{
ops::{Range, RangeInclusive},
ops::{RangeBounds, RangeInclusive},
sync::Arc,
};
@@ -174,18 +175,18 @@ impl TransactionsProvider for NoopProvider {
fn transactions_by_block_range(
&self,
_range: Range<BlockNumber>,
_range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<Vec<TransactionSigned>>> {
Ok(Vec::default())
}
fn senders_by_tx_range(&self, _range: RangeInclusive<TxNumber>) -> RethResult<Vec<Address>> {
fn senders_by_tx_range(&self, _range: impl RangeBounds<TxNumber>) -> RethResult<Vec<Address>> {
Ok(Vec::default())
}
fn transactions_by_tx_range(
&self,
_range: RangeInclusive<TxNumber>,
_range: impl RangeBounds<TxNumber>,
) -> RethResult<Vec<reth_primitives::TransactionSignedNoHash>> {
Ok(Vec::default())
}
@@ -228,13 +229,13 @@ impl HeaderProvider for NoopProvider {
Ok(None)
}
fn headers_range(&self, _range: RangeInclusive<BlockNumber>) -> RethResult<Vec<Header>> {
fn headers_range(&self, _range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>> {
Ok(vec![])
}
fn sealed_headers_range(
&self,
_range: RangeInclusive<BlockNumber>,
_range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<SealedHeader>> {
Ok(vec![])
}
@@ -278,12 +279,8 @@ impl StateProvider for NoopProvider {
Ok(None)
}
fn proof(
&self,
_address: Address,
_keys: &[B256],
) -> RethResult<(Vec<Bytes>, B256, Vec<Vec<Bytes>>)> {
Ok((vec![], KECCAK_EMPTY, vec![]))
fn proof(&self, _address: Address, _keys: &[B256]) -> RethResult<AccountProof> {
Ok(AccountProof::default())
}
}

View File

@@ -1,7 +1,7 @@
use auto_impl::auto_impl;
use reth_interfaces::RethResult;
use reth_primitives::{BlockHash, BlockHashOrNumber, BlockNumber, Header, SealedHeader, U256};
use std::ops::RangeInclusive;
use std::ops::RangeBounds;
/// Client trait for fetching `Header` related data.
#[auto_impl(&, Arc)]
@@ -35,12 +35,12 @@ pub trait HeaderProvider: Send + Sync {
fn header_td_by_number(&self, number: BlockNumber) -> RethResult<Option<U256>>;
/// Get headers in range of block numbers
fn headers_range(&self, range: RangeInclusive<BlockNumber>) -> RethResult<Vec<Header>>;
fn headers_range(&self, range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>>;
/// Get headers in range of block numbers
fn sealed_headers_range(
&self,
range: RangeInclusive<BlockNumber>,
range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<SealedHeader>>;
/// Get a single sealed header by block number

View File

@@ -3,8 +3,8 @@ use crate::{BlockHashReader, BlockIdReader, BundleStateWithReceipts};
use auto_impl::auto_impl;
use reth_interfaces::{provider::ProviderError, RethResult};
use reth_primitives::{
Address, BlockHash, BlockId, BlockNumHash, BlockNumber, BlockNumberOrTag, Bytecode, Bytes,
StorageKey, StorageValue, B256, KECCAK_EMPTY, U256,
trie::AccountProof, Address, BlockHash, BlockId, BlockNumHash, BlockNumber, BlockNumberOrTag,
Bytecode, StorageKey, StorageValue, B256, KECCAK_EMPTY, U256,
};
/// Type alias of boxed [StateProvider].
@@ -24,11 +24,7 @@ pub trait StateProvider: BlockHashReader + AccountReader + StateRootProvider + S
fn bytecode_by_hash(&self, code_hash: B256) -> RethResult<Option<Bytecode>>;
/// Get account and storage proofs.
fn proof(
&self,
address: Address,
keys: &[B256],
) -> RethResult<(Vec<Bytes>, B256, Vec<Vec<Bytes>>)>;
fn proof(&self, address: Address, keys: &[B256]) -> RethResult<AccountProof>;
/// Get account code by its address.
///

View File

@@ -4,7 +4,7 @@ use reth_primitives::{
Address, BlockHashOrNumber, BlockNumber, TransactionMeta, TransactionSigned,
TransactionSignedNoHash, TxHash, TxNumber,
};
use std::ops::{Range, RangeInclusive};
use std::ops::RangeBounds;
/// Client trait for fetching [TransactionSigned] related data.
#[auto_impl::auto_impl(&, Arc)]
@@ -46,17 +46,17 @@ pub trait TransactionsProvider: BlockNumReader + Send + Sync {
/// Get transactions by block range.
fn transactions_by_block_range(
&self,
range: Range<BlockNumber>,
range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<Vec<TransactionSigned>>>;
/// Get transactions by tx range.
fn transactions_by_tx_range(
&self,
range: RangeInclusive<TxNumber>,
range: impl RangeBounds<TxNumber>,
) -> RethResult<Vec<TransactionSignedNoHash>>;
/// Get Senders from a tx range.
fn senders_by_tx_range(&self, range: RangeInclusive<TxNumber>) -> RethResult<Vec<Address>>;
fn senders_by_tx_range(&self, range: impl RangeBounds<TxNumber>) -> RethResult<Vec<Address>>;
/// Get transaction sender.
///

View File

@@ -147,6 +147,7 @@ impl<T: TransactionOrdering> TxPool<T> {
/// Updates the tracked blob fee
fn update_blob_fee(&mut self, _pending_blob_fee: u128) {
// TODO: std::mem::swap pending_blob_fee
// TODO(mattsse): update blob txs
}
@@ -154,14 +155,16 @@ impl<T: TransactionOrdering> TxPool<T> {
///
/// Depending on the change in direction of the basefee, this will promote or demote
/// transactions from the basefee pool.
fn update_basefee(&mut self, pending_basefee: u64) {
match pending_basefee.cmp(&self.all_transactions.pending_basefee) {
fn update_basefee(&mut self, mut pending_basefee: u64) {
std::mem::swap(&mut self.all_transactions.pending_basefee, &mut pending_basefee);
match self.all_transactions.pending_basefee.cmp(&pending_basefee) {
Ordering::Equal => {
// fee unchanged, nothing to update
}
Ordering::Greater => {
// increased base fee: recheck pending pool and remove all that are no longer valid
let removed = self.pending_pool.update_base_fee(pending_basefee);
let removed =
self.pending_pool.update_base_fee(self.all_transactions.pending_basefee);
for tx in removed {
let to = {
let tx =
@@ -175,7 +178,8 @@ impl<T: TransactionOrdering> TxPool<T> {
}
Ordering::Less => {
// decreased base fee: recheck basefee pool and promote all that are now valid
let removed = self.basefee_pool.enforce_basefee(pending_basefee);
let removed =
self.basefee_pool.enforce_basefee(self.all_transactions.pending_basefee);
for tx in removed {
let to = {
let tx =
@@ -202,11 +206,9 @@ impl<T: TransactionOrdering> TxPool<T> {
} = info;
self.all_transactions.last_seen_block_hash = last_seen_block_hash;
self.all_transactions.last_seen_block_number = last_seen_block_number;
self.all_transactions.pending_basefee = pending_basefee;
self.update_basefee(pending_basefee);
if let Some(blob_fee) = pending_blob_fee {
self.all_transactions.pending_blob_fee = blob_fee;
self.update_blob_fee(blob_fee)
}
}
@@ -2113,6 +2115,29 @@ mod tests {
assert_eq!(pool.all_transactions.txs.get(&id).unwrap().subpool, SubPool::BaseFee)
}
#[test]
fn update_basefee_subpools_setting_block_info() {
let mut f = MockTransactionFactory::default();
let mut pool = TxPool::new(MockOrdering::default(), Default::default());
let tx = MockTransaction::eip1559().inc_price_by(10);
let validated = f.validated(tx.clone());
let id = *validated.id();
pool.add_transaction(validated, U256::from(1_000), 0).unwrap();
assert_eq!(pool.pending_pool.len(), 1);
// use set_block_info for the basefee update
let mut block_info = pool.block_info();
block_info.pending_basefee = (tx.max_fee_per_gas() + 1) as u64;
pool.set_block_info(block_info);
assert!(pool.pending_pool.is_empty());
assert_eq!(pool.basefee_pool.len(), 1);
assert_eq!(pool.all_transactions.txs.get(&id).unwrap().subpool, SubPool::BaseFee)
}
#[test]
fn discard_nonce_too_low() {
let mut f = MockTransactionFactory::default();

View File

@@ -27,6 +27,7 @@ tracing.workspace = true
# misc
thiserror.workspace = true
derive_more = "0.99"
auto_impl = "1"
# test-utils
triehash = { version = "0.8", optional = true }

View File

@@ -34,6 +34,9 @@ pub mod walker;
mod errors;
pub use errors::*;
// The iterators for traversing existing intermediate hashes and updated trie leaves.
pub(crate) mod node_iter;
/// Merkle proof generation.
pub mod proof;

View File

@@ -0,0 +1,208 @@
use crate::{
hashed_cursor::{HashedAccountCursor, HashedStorageCursor},
trie_cursor::TrieCursor,
walker::TrieWalker,
StateRootError, StorageRootError,
};
use reth_primitives::{trie::Nibbles, Account, StorageEntry, B256, U256};
#[derive(Debug)]
pub(crate) struct TrieBranchNode {
pub(crate) key: Nibbles,
pub(crate) value: B256,
pub(crate) children_are_in_trie: bool,
}
impl TrieBranchNode {
pub(crate) fn new(key: Nibbles, value: B256, children_are_in_trie: bool) -> Self {
Self { key, value, children_are_in_trie }
}
}
#[derive(Debug)]
pub(crate) enum AccountNode {
Branch(TrieBranchNode),
Leaf(B256, Account),
}
#[derive(Debug)]
pub(crate) enum StorageNode {
Branch(TrieBranchNode),
Leaf(B256, U256),
}
/// An iterator over existing intermediate branch nodes and updated leaf nodes.
#[derive(Debug)]
pub(crate) struct AccountNodeIter<C, H> {
/// Underlying walker over intermediate nodes.
pub(crate) walker: TrieWalker<C>,
/// The cursor for the hashed account entries.
pub(crate) hashed_account_cursor: H,
/// The previous account key. If the iteration was previously interrupted, this value can be
/// used to resume iterating from the last returned leaf node.
previous_account_key: Option<B256>,
/// Current hashed account entry.
current_hashed_entry: Option<(B256, Account)>,
/// Flag indicating whether we should check the current walker key.
current_walker_key_checked: bool,
}
impl<C, H> AccountNodeIter<C, H> {
pub(crate) fn new(walker: TrieWalker<C>, hashed_account_cursor: H) -> Self {
Self {
walker,
hashed_account_cursor,
previous_account_key: None,
current_hashed_entry: None,
current_walker_key_checked: false,
}
}
pub(crate) fn with_last_account_key(mut self, previous_account_key: B256) -> Self {
self.previous_account_key = Some(previous_account_key);
self
}
}
impl<C, H> AccountNodeIter<C, H>
where
C: TrieCursor,
H: HashedAccountCursor,
{
/// Return the next account trie node to be added to the hash builder.
///
/// Returns the nodes using this algorithm:
/// 1. Return the current intermediate branch node if it hasn't been updated.
/// 2. Advance the trie walker to the next intermediate branch node and retrieve next
/// unprocessed key.
/// 3. Reposition the hashed account cursor on the next unprocessed key.
/// 4. Return every hashed account entry up to the key of the current intermediate branch node.
/// 5. Repeat.
///
/// NOTE: The iteration will start from the key of the previous hashed entry if it was supplied.
pub(crate) fn try_next(&mut self) -> Result<Option<AccountNode>, StateRootError> {
loop {
if let Some(key) = self.walker.key() {
if !self.current_walker_key_checked && self.previous_account_key.is_none() {
self.current_walker_key_checked = true;
if self.walker.can_skip_current_node {
return Ok(Some(AccountNode::Branch(TrieBranchNode::new(
key,
self.walker.hash().unwrap(),
self.walker.children_are_in_trie(),
))))
}
}
}
if let Some((hashed_address, account)) = self.current_hashed_entry.take() {
if self.walker.key().map_or(false, |key| key < Nibbles::unpack(hashed_address)) {
self.current_walker_key_checked = false;
continue
}
self.current_hashed_entry = self.hashed_account_cursor.next()?;
return Ok(Some(AccountNode::Leaf(hashed_address, account)))
}
match self.previous_account_key.take() {
Some(account_key) => {
self.hashed_account_cursor.seek(account_key)?;
self.current_hashed_entry = self.hashed_account_cursor.next()?;
}
None => {
let seek_key = match self.walker.next_unprocessed_key() {
Some(key) => key,
None => break, // no more keys
};
self.current_hashed_entry = self.hashed_account_cursor.seek(seek_key)?;
self.walker.advance()?;
}
}
}
Ok(None)
}
}
#[derive(Debug)]
pub(crate) struct StorageNodeIter<C, H> {
/// Underlying walker over intermediate nodes.
pub(crate) walker: TrieWalker<C>,
/// The cursor for the hashed storage entries.
pub(crate) hashed_storage_cursor: H,
/// The hashed address this storage trie belongs to.
hashed_address: B256,
/// Current hashed storage entry.
current_hashed_entry: Option<StorageEntry>,
/// Flag indicating whether we should check the current walker key.
current_walker_key_checked: bool,
}
impl<C, H> StorageNodeIter<C, H> {
pub(crate) fn new(
walker: TrieWalker<C>,
hashed_storage_cursor: H,
hashed_address: B256,
) -> Self {
Self {
walker,
hashed_storage_cursor,
hashed_address,
current_walker_key_checked: false,
current_hashed_entry: None,
}
}
}
impl<C, H> StorageNodeIter<C, H>
where
C: TrieCursor,
H: HashedStorageCursor,
{
/// Return the next storage trie node to be added to the hash builder.
///
/// Returns the nodes using this algorithm:
/// 1. Return the current intermediate branch node if it hasn't been updated.
/// 2. Advance the trie walker to the next intermediate branch node and retrieve next
/// unprocessed key.
/// 3. Reposition the hashed storage cursor on the next unprocessed key.
/// 4. Return every hashed storage entry up to the key of the current intermediate branch node.
/// 5. Repeat.
pub(crate) fn try_next(&mut self) -> Result<Option<StorageNode>, StorageRootError> {
loop {
if let Some(key) = self.walker.key() {
if !self.current_walker_key_checked {
self.current_walker_key_checked = true;
if self.walker.can_skip_current_node {
return Ok(Some(StorageNode::Branch(TrieBranchNode::new(
key,
self.walker.hash().unwrap(),
self.walker.children_are_in_trie(),
))))
}
}
}
if let Some(StorageEntry { key: hashed_key, value }) = self.current_hashed_entry.take()
{
if self.walker.key().map_or(false, |key| key < Nibbles::unpack(hashed_key)) {
self.current_walker_key_checked = false;
continue
}
self.current_hashed_entry = self.hashed_storage_cursor.next()?;
return Ok(Some(StorageNode::Leaf(hashed_key, value)))
}
let Some(seek_key) = self.walker.next_unprocessed_key() else { break };
self.current_hashed_entry =
self.hashed_storage_cursor.seek(self.hashed_address, seek_key)?;
self.walker.advance()?;
}
Ok(None)
}
}

View File

@@ -1,9 +1,5 @@
use crate::{trie_cursor::CursorSubNode, updates::TrieUpdates};
use reth_primitives::{
stage::MerkleCheckpoint,
trie::{hash_builder::HashBuilder, Nibbles},
B256,
};
use reth_primitives::{stage::MerkleCheckpoint, trie::hash_builder::HashBuilder, B256};
/// The progress of the state root computation.
#[derive(Debug)]
@@ -24,8 +20,6 @@ pub struct IntermediateStateRootState {
pub walker_stack: Vec<CursorSubNode>,
/// The last hashed account key processed.
pub last_account_key: B256,
/// The last walker key processed.
pub last_walker_key: Nibbles,
}
impl From<MerkleCheckpoint> for IntermediateStateRootState {
@@ -34,7 +28,6 @@ impl From<MerkleCheckpoint> for IntermediateStateRootState {
hash_builder: HashBuilder::from(value.state),
walker_stack: value.walker_stack.into_iter().map(CursorSubNode::from).collect(),
last_account_key: value.last_account_key,
last_walker_key: Nibbles::from_hex(value.last_walker_key),
}
}
}

View File

@@ -1,10 +1,11 @@
use crate::{
account::EthAccount,
hashed_cursor::{HashedAccountCursor, HashedCursorFactory, HashedStorageCursor},
hashed_cursor::{HashedCursorFactory, HashedStorageCursor},
node_iter::{AccountNode, AccountNodeIter, StorageNode, StorageNodeIter},
prefix_set::PrefixSetMut,
trie_cursor::{AccountTrieCursor, StorageTrieCursor},
walker::TrieWalker,
StorageRootError,
StateRootError, StorageRootError,
};
use alloy_rlp::{BufMut, Encodable};
use reth_db::{tables, transaction::DbTx};
@@ -12,7 +13,7 @@ use reth_primitives::{
keccak256,
proofs::EMPTY_ROOT,
trie::{AccountProof, HashBuilder, Nibbles, StorageProof},
Address, StorageEntry, B256,
Address, B256,
};
/// A struct for generating merkle proofs.
@@ -45,65 +46,46 @@ where
&self,
address: Address,
slots: &[B256],
) -> Result<AccountProof, StorageRootError> {
) -> Result<AccountProof, StateRootError> {
let target_hashed_address = keccak256(address);
let target_nibbles = Nibbles::unpack(target_hashed_address);
let mut account_proof = AccountProof::new(address);
let mut trie_cursor =
AccountTrieCursor::new(self.tx.cursor_read::<tables::AccountsTrie>()?);
let mut hashed_account_cursor = self.hashed_cursor_factory.hashed_account_cursor()?;
let hashed_account_cursor = self.hashed_cursor_factory.hashed_account_cursor()?;
let trie_cursor = AccountTrieCursor::new(self.tx.cursor_read::<tables::AccountsTrie>()?);
// Create the walker.
let mut prefix_set = PrefixSetMut::default();
prefix_set.insert(target_nibbles.clone());
let mut walker = TrieWalker::new(&mut trie_cursor, prefix_set.freeze());
let walker = TrieWalker::new(trie_cursor, prefix_set.freeze());
// Create a hash builder to rebuild the root node since it is not available in the database.
let mut hash_builder =
HashBuilder::default().with_proof_retainer(Vec::from([target_nibbles.clone()]));
let mut account_rlp = Vec::with_capacity(128);
while let Some(key) = walker.key() {
if walker.can_skip_current_node {
let value = walker.hash().unwrap();
let is_in_db_trie = walker.children_are_in_trie();
hash_builder.add_branch(key.clone(), value, is_in_db_trie);
}
let seek_key = match walker.next_unprocessed_key() {
Some(key) => key,
None => break, // no more keys
};
let next_key = walker.advance()?;
let mut next_account_entry = hashed_account_cursor.seek(seek_key)?;
while let Some((hashed_address, account)) = next_account_entry {
let account_nibbles = Nibbles::unpack(hashed_address);
if let Some(ref key) = next_key {
if key < &account_nibbles {
break
}
let mut account_node_iter = AccountNodeIter::new(walker, hashed_account_cursor);
while let Some(account_node) = account_node_iter.try_next()? {
match account_node {
AccountNode::Branch(node) => {
hash_builder.add_branch(node.key, node.value, node.children_are_in_trie);
}
AccountNode::Leaf(hashed_address, account) => {
let storage_root = if hashed_address == target_hashed_address {
let (storage_root, storage_proofs) =
self.storage_root_with_proofs(hashed_address, slots)?;
account_proof.set_account(account, storage_root, storage_proofs);
storage_root
} else {
self.storage_root(hashed_address)?
};
let storage_root = if hashed_address == target_hashed_address {
let (storage_root, storage_proofs) =
self.storage_root_with_proofs(hashed_address, slots)?;
account_proof.set_account(account, storage_root, storage_proofs);
storage_root
} else {
self.storage_root(hashed_address)?
};
account_rlp.clear();
let account = EthAccount::from(account).with_storage_root(storage_root);
account.encode(&mut account_rlp as &mut dyn BufMut);
account_rlp.clear();
let account = EthAccount::from(account).with_storage_root(storage_root);
account.encode(&mut &mut account_rlp as &mut dyn BufMut);
hash_builder.add_leaf(account_nibbles, &account_rlp);
// Move the next account entry
next_account_entry = hashed_account_cursor.next()?;
hash_builder.add_leaf(Nibbles::unpack(hashed_address), &account_rlp);
}
}
}
@@ -129,11 +111,6 @@ where
) -> Result<(B256, Vec<StorageProof>), StorageRootError> {
let mut hashed_storage_cursor = self.hashed_cursor_factory.hashed_storage_cursor()?;
let mut trie_cursor = StorageTrieCursor::new(
self.tx.cursor_dup_read::<tables::StoragesTrie>()?,
hashed_address,
);
let mut proofs = slots.iter().copied().map(StorageProof::new).collect::<Vec<_>>();
// short circuit on empty storage
@@ -143,52 +120,41 @@ where
let target_nibbles = proofs.iter().map(|p| p.nibbles.clone()).collect::<Vec<_>>();
let prefix_set = PrefixSetMut::from(target_nibbles.clone()).freeze();
let mut walker = TrieWalker::new(&mut trie_cursor, prefix_set);
let trie_cursor = StorageTrieCursor::new(
self.tx.cursor_dup_read::<tables::StoragesTrie>()?,
hashed_address,
);
let walker = TrieWalker::new(trie_cursor, prefix_set);
let mut hash_builder = HashBuilder::default().with_proof_retainer(target_nibbles);
while let Some(key) = walker.key() {
if walker.can_skip_current_node {
hash_builder.add_branch(key, walker.hash().unwrap(), walker.children_are_in_trie());
}
let seek_key = match walker.next_unprocessed_key() {
Some(key) => key,
None => break, // no more keys
};
let next_key = walker.advance()?;
let mut storage = hashed_storage_cursor.seek(hashed_address, seek_key)?;
while let Some(StorageEntry { key: hashed_key, value }) = storage {
let hashed_key_nibbles = Nibbles::unpack(hashed_key);
if let Some(ref key) = next_key {
if key < &hashed_key_nibbles {
break
let mut storage_node_iter =
StorageNodeIter::new(walker, hashed_storage_cursor, hashed_address);
while let Some(node) = storage_node_iter.try_next()? {
match node {
StorageNode::Branch(node) => {
hash_builder.add_branch(node.key, node.value, node.children_are_in_trie);
}
StorageNode::Leaf(hashed_slot, value) => {
let nibbles = Nibbles::unpack(hashed_slot);
if let Some(proof) = proofs.iter_mut().find(|proof| proof.nibbles == nibbles) {
proof.set_value(value);
}
hash_builder.add_leaf(nibbles, alloy_rlp::encode_fixed_size(&value).as_ref());
}
if let Some(proof) =
proofs.iter_mut().find(|proof| proof.nibbles == hashed_key_nibbles)
{
proof.set_value(value);
}
hash_builder
.add_leaf(hashed_key_nibbles, alloy_rlp::encode_fixed_size(&value).as_ref());
storage = hashed_storage_cursor.next()?;
}
}
let root = hash_builder.root();
let proof_nodes = hash_builder.take_proofs();
let all_proof_nodes = hash_builder.take_proofs();
for proof in proofs.iter_mut() {
proof.set_proof(
proof_nodes
.iter()
.filter(|(path, _)| proof.nibbles.starts_with(path))
.map(|(_, node)| node.clone())
.collect(),
);
// Iterate over all proof nodes and find the matching ones.
// The filtered results are guaranteed to be in order.
let matching_proof_nodes = all_proof_nodes
.iter()
.filter(|(path, _)| proof.nibbles.starts_with(path))
.map(|(_, node)| node.clone());
proof.set_proof(matching_proof_nodes.collect());
}
Ok((root, proofs))

View File

@@ -1,6 +1,7 @@
use crate::{
account::EthAccount,
hashed_cursor::{HashedAccountCursor, HashedCursorFactory, HashedStorageCursor},
hashed_cursor::{HashedCursorFactory, HashedStorageCursor},
node_iter::{AccountNode, AccountNodeIter, StorageNode, StorageNodeIter},
prefix_set::{PrefixSet, PrefixSetLoader, PrefixSetMut},
progress::{IntermediateStateRootState, StateRootProgress},
trie_cursor::{AccountTrieCursor, StorageTrieCursor},
@@ -8,13 +9,13 @@ use crate::{
walker::TrieWalker,
StateRootError, StorageRootError,
};
use alloy_rlp::Encodable;
use alloy_rlp::{BufMut, Encodable};
use reth_db::{tables, transaction::DbTx};
use reth_primitives::{
keccak256,
proofs::EMPTY_ROOT,
trie::{HashBuilder, Nibbles},
Address, BlockNumber, StorageEntry, B256,
Address, BlockNumber, B256,
};
use std::{
collections::{HashMap, HashSet},
@@ -224,136 +225,104 @@ where
tracing::debug!(target: "loader", "calculating state root");
let mut trie_updates = TrieUpdates::default();
let mut hashed_account_cursor = self.hashed_cursor_factory.hashed_account_cursor()?;
let mut trie_cursor =
AccountTrieCursor::new(self.tx.cursor_read::<tables::AccountsTrie>()?);
let hashed_account_cursor = self.hashed_cursor_factory.hashed_account_cursor()?;
let trie_cursor = AccountTrieCursor::new(self.tx.cursor_read::<tables::AccountsTrie>()?);
let (mut walker, mut hash_builder, mut last_account_key, mut last_walker_key) =
match self.previous_state {
Some(state) => (
TrieWalker::from_stack(
&mut trie_cursor,
state.walker_stack,
self.changed_account_prefixes,
),
let (mut hash_builder, mut account_node_iter) = match self.previous_state {
Some(state) => {
let walker = TrieWalker::from_stack(
trie_cursor,
state.walker_stack,
self.changed_account_prefixes,
);
(
state.hash_builder,
Some(state.last_account_key),
Some(state.last_walker_key),
),
None => (
TrieWalker::new(&mut trie_cursor, self.changed_account_prefixes),
HashBuilder::default(),
None,
None,
),
};
AccountNodeIter::new(walker, hashed_account_cursor)
.with_last_account_key(state.last_account_key),
)
}
None => {
let walker = TrieWalker::new(trie_cursor, self.changed_account_prefixes);
(HashBuilder::default(), AccountNodeIter::new(walker, hashed_account_cursor))
}
};
walker.set_updates(retain_updates);
account_node_iter.walker.set_updates(retain_updates);
hash_builder.set_updates(retain_updates);
let mut account_rlp = Vec::with_capacity(128);
let mut hashed_entries_walked = 0;
while let Some(key) = last_walker_key.take().or_else(|| walker.key()) {
// Take the last account key to make sure we take it into consideration only once.
let (next_key, mut next_account_entry) = match last_account_key.take() {
// Seek the last processed entry and take the next after.
Some(account_key) => {
hashed_account_cursor.seek(account_key)?;
(walker.key(), hashed_account_cursor.next()?)
while let Some(node) = account_node_iter.try_next()? {
match node {
AccountNode::Branch(node) => {
hash_builder.add_branch(node.key, node.value, node.children_are_in_trie);
}
None => {
if walker.can_skip_current_node {
let value = walker.hash().unwrap();
let is_in_db_trie = walker.children_are_in_trie();
hash_builder.add_branch(key.clone(), value, is_in_db_trie);
}
AccountNode::Leaf(hashed_address, account) => {
hashed_entries_walked += 1;
let seek_key = match walker.next_unprocessed_key() {
Some(key) => key,
None => break, // no more keys
// We assume we can always calculate a storage root without
// OOMing. This opens us up to a potential DOS vector if
// a contract had too many storage entries and they were
// all buffered w/o us returning and committing our intermediate
// progress.
// TODO: We can consider introducing the TrieProgress::Progress/Complete
// abstraction inside StorageRoot, but let's give it a try as-is for now.
let storage_root_calculator = StorageRoot::new_hashed(self.tx, hashed_address)
.with_hashed_cursor_factory(self.hashed_cursor_factory.clone())
.with_changed_prefixes(
self.changed_storage_prefixes
.get(&hashed_address)
.cloned()
.unwrap_or_default(),
);
let storage_root = if retain_updates {
let (root, storage_slots_walked, updates) =
storage_root_calculator.root_with_updates()?;
hashed_entries_walked += storage_slots_walked;
trie_updates.extend(updates.into_iter());
root
} else {
storage_root_calculator.root()?
};
(walker.advance()?, hashed_account_cursor.seek(seek_key)?)
}
};
let account = EthAccount::from(account).with_storage_root(storage_root);
while let Some((hashed_address, account)) = next_account_entry {
hashed_entries_walked += 1;
let account_nibbles = Nibbles::unpack(hashed_address);
account_rlp.clear();
account.encode(&mut account_rlp as &mut dyn BufMut);
if let Some(ref key) = next_key {
if key < &account_nibbles {
tracing::trace!(target: "loader", "breaking, already detected");
break
hash_builder.add_leaf(Nibbles::unpack(hashed_address), &account_rlp);
// Decide if we need to return intermediate progress.
let total_updates_len = trie_updates.len() +
account_node_iter.walker.updates_len() +
hash_builder.updates_len();
if retain_updates && total_updates_len as u64 >= self.threshold {
let (walker_stack, walker_updates) = account_node_iter.walker.split();
let (hash_builder, hash_builder_updates) = hash_builder.split();
let state = IntermediateStateRootState {
hash_builder,
walker_stack,
last_account_key: hashed_address,
};
trie_updates.extend(walker_updates.into_iter());
trie_updates.extend_with_account_updates(hash_builder_updates);
return Ok(StateRootProgress::Progress(
Box::new(state),
hashed_entries_walked,
trie_updates,
))
}
}
// We assume we can always calculate a storage root without
// OOMing. This opens us up to a potential DOS vector if
// a contract had too many storage entries and they were
// all buffered w/o us returning and committing our intermediate
// progress.
// TODO: We can consider introducing the TrieProgress::Progress/Complete
// abstraction inside StorageRoot, but let's give it a try as-is for now.
let storage_root_calculator = StorageRoot::new_hashed(self.tx, hashed_address)
.with_hashed_cursor_factory(self.hashed_cursor_factory.clone())
.with_changed_prefixes(
self.changed_storage_prefixes
.get(&hashed_address)
.cloned()
.unwrap_or_default(),
);
let storage_root = if retain_updates {
let (root, storage_slots_walked, updates) =
storage_root_calculator.root_with_updates()?;
hashed_entries_walked += storage_slots_walked;
trie_updates.extend(updates.into_iter());
root
} else {
storage_root_calculator.root()?
};
let account = EthAccount::from(account).with_storage_root(storage_root);
account_rlp.clear();
account.encode(&mut &mut account_rlp);
hash_builder.add_leaf(account_nibbles, &account_rlp);
// Decide if we need to return intermediate progress.
let total_updates_len =
trie_updates.len() + walker.updates_len() + hash_builder.updates_len();
if retain_updates && total_updates_len as u64 >= self.threshold {
let (walker_stack, walker_updates) = walker.split();
let (hash_builder, hash_builder_updates) = hash_builder.split();
let state = IntermediateStateRootState {
hash_builder,
walker_stack,
last_walker_key: key,
last_account_key: hashed_address,
};
trie_updates.extend(walker_updates.into_iter());
trie_updates.extend_with_account_updates(hash_builder_updates);
return Ok(StateRootProgress::Progress(
Box::new(state),
hashed_entries_walked,
trie_updates,
))
}
// Move the next account entry
next_account_entry = hashed_account_cursor.next()?;
}
}
let root = hash_builder.root();
let (_, walker_updates) = walker.split();
let (_, walker_updates) = account_node_iter.walker.split();
let (_, hash_builder_updates) = hash_builder.split();
trie_updates.extend(walker_updates.into_iter());
@@ -464,14 +433,8 @@ where
retain_updates: bool,
) -> Result<(B256, usize, TrieUpdates), StorageRootError> {
tracing::debug!(target: "trie::storage_root", hashed_address = ?self.hashed_address, "calculating storage root");
let mut hashed_storage_cursor = self.hashed_cursor_factory.hashed_storage_cursor()?;
let mut trie_cursor = StorageTrieCursor::new(
self.tx.cursor_dup_read::<tables::StoragesTrie>()?,
self.hashed_address,
);
// short circuit on empty storage
if hashed_storage_cursor.is_storage_empty(self.hashed_address)? {
return Ok((
@@ -481,43 +444,37 @@ where
))
}
let mut walker = TrieWalker::new(&mut trie_cursor, self.changed_prefixes.clone())
let trie_cursor = StorageTrieCursor::new(
self.tx.cursor_dup_read::<tables::StoragesTrie>()?,
self.hashed_address,
);
let walker = TrieWalker::new(trie_cursor, self.changed_prefixes.clone())
.with_updates(retain_updates);
let mut hash_builder = HashBuilder::default().with_updates(retain_updates);
let mut storage_slots_walked = 0;
while let Some(key) = walker.key() {
if walker.can_skip_current_node {
hash_builder.add_branch(key, walker.hash().unwrap(), walker.children_are_in_trie());
}
let seek_key = match walker.next_unprocessed_key() {
Some(key) => key,
None => break, // no more keys
};
let next_key = walker.advance()?;
let mut storage = hashed_storage_cursor.seek(self.hashed_address, seek_key)?;
while let Some(StorageEntry { key: hashed_key, value }) = storage {
storage_slots_walked += 1;
let storage_key_nibbles = Nibbles::unpack(hashed_key);
if let Some(ref key) = next_key {
if key < &storage_key_nibbles {
break
}
let mut storage_node_iter =
StorageNodeIter::new(walker, hashed_storage_cursor, self.hashed_address);
while let Some(node) = storage_node_iter.try_next()? {
match node {
StorageNode::Branch(node) => {
hash_builder.add_branch(node.key, node.value, node.children_are_in_trie);
}
StorageNode::Leaf(hashed_slot, value) => {
storage_slots_walked += 1;
hash_builder.add_leaf(
Nibbles::unpack(hashed_slot),
alloy_rlp::encode_fixed_size(&value).as_ref(),
);
}
hash_builder
.add_leaf(storage_key_nibbles, alloy_rlp::encode_fixed_size(&value).as_ref());
storage = hashed_storage_cursor.next()?;
}
}
let root = hash_builder.root();
let (_, hash_builder_updates) = hash_builder.split();
let (_, walker_updates) = walker.split();
let (_, walker_updates) = storage_node_iter.walker.split();
let mut trie_updates = TrieUpdates::default();
trie_updates.extend(walker_updates.into_iter());
@@ -529,7 +486,6 @@ where
}
#[cfg(test)]
#[allow(clippy::mutable_key_type)]
mod tests {
use super::*;
use crate::test_utils::{
@@ -548,7 +504,7 @@ mod tests {
keccak256,
proofs::triehash::KeccakHasher,
trie::{BranchNodeCompact, TrieMask},
Account, Address, B256, MAINNET, U256,
Account, Address, StorageEntry, B256, MAINNET, U256,
};
use reth_provider::{DatabaseProviderRW, ProviderFactory};
use std::{collections::BTreeMap, ops::Mul, str::FromStr};
@@ -621,7 +577,6 @@ mod tests {
}
#[test]
// TODO: Try to find the edge case by creating some more very complex trie.
fn branch_node_child_changes() {
incremental_vs_full_root(
&[

View File

@@ -14,20 +14,22 @@ impl<C> AccountTrieCursor<C> {
}
}
impl<C> TrieCursor<StoredNibbles> for AccountTrieCursor<C>
impl<C> TrieCursor for AccountTrieCursor<C>
where
C: DbCursorRO<tables::AccountsTrie>,
{
type Key = StoredNibbles;
fn seek_exact(
&mut self,
key: StoredNibbles,
key: Self::Key,
) -> Result<Option<(Vec<u8>, BranchNodeCompact)>, DatabaseError> {
Ok(self.0.seek_exact(key)?.map(|value| (value.0.inner.to_vec(), value.1)))
}
fn seek(
&mut self,
key: StoredNibbles,
key: Self::Key,
) -> Result<Option<(Vec<u8>, BranchNodeCompact)>, DatabaseError> {
Ok(self.0.seek(key)?.map(|value| (value.0.inner.to_vec(), value.1)))
}

View File

@@ -1,5 +1,5 @@
use crate::updates::TrieKey;
use reth_db::{table::Key, DatabaseError};
use reth_db::DatabaseError;
use reth_primitives::trie::BranchNodeCompact;
mod account_cursor;
@@ -11,13 +11,22 @@ pub use self::{
};
/// A cursor for navigating a trie that works with both Tables and DupSort tables.
pub trait TrieCursor<K: Key> {
#[auto_impl::auto_impl(&mut)]
pub trait TrieCursor {
/// The key type of the cursor.
type Key: From<Vec<u8>>;
/// Move the cursor to the key and return if it is an exact match.
fn seek_exact(&mut self, key: K)
-> Result<Option<(Vec<u8>, BranchNodeCompact)>, DatabaseError>;
fn seek_exact(
&mut self,
key: Self::Key,
) -> Result<Option<(Vec<u8>, BranchNodeCompact)>, DatabaseError>;
/// Move the cursor to the key and return a value matching of greater than the key.
fn seek(&mut self, key: K) -> Result<Option<(Vec<u8>, BranchNodeCompact)>, DatabaseError>;
fn seek(
&mut self,
key: Self::Key,
) -> Result<Option<(Vec<u8>, BranchNodeCompact)>, DatabaseError>;
/// Get the current entry.
fn current(&mut self) -> Result<Option<TrieKey>, DatabaseError>;

View File

@@ -24,13 +24,15 @@ impl<C> StorageTrieCursor<C> {
}
}
impl<C> TrieCursor<StoredNibblesSubKey> for StorageTrieCursor<C>
impl<C> TrieCursor for StorageTrieCursor<C>
where
C: DbDupCursorRO<tables::StoragesTrie> + DbCursorRO<tables::StoragesTrie>,
{
type Key = StoredNibblesSubKey;
fn seek_exact(
&mut self,
key: StoredNibblesSubKey,
key: Self::Key,
) -> Result<Option<(Vec<u8>, BranchNodeCompact)>, DatabaseError> {
Ok(self
.cursor
@@ -41,7 +43,7 @@ where
fn seek(
&mut self,
key: StoredNibblesSubKey,
key: Self::Key,
) -> Result<Option<(Vec<u8>, BranchNodeCompact)>, DatabaseError> {
Ok(self
.cursor

View File

@@ -77,7 +77,6 @@ impl TrieUpdates {
}
/// Extend the updates with account trie updates.
#[allow(clippy::mutable_key_type)]
pub fn extend_with_account_updates(&mut self, updates: HashMap<Nibbles, BranchNodeCompact>) {
self.extend(updates.into_iter().map(|(nibbles, node)| {
(TrieKey::AccountNode(nibbles.hex_data.to_vec().into()), TrieOp::Update(node))
@@ -85,7 +84,6 @@ impl TrieUpdates {
}
/// Extend the updates with storage trie updates.
#[allow(clippy::mutable_key_type)]
pub fn extend_with_storage_updates(
&mut self,
hashed_address: B256,

View File

@@ -3,19 +3,19 @@ use crate::{
trie_cursor::{CursorSubNode, TrieCursor},
updates::TrieUpdates,
};
use reth_db::{table::Key, DatabaseError};
use reth_db::DatabaseError;
use reth_primitives::{
trie::{BranchNodeCompact, Nibbles},
B256,
};
use std::marker::PhantomData;
/// `TrieWalker` is a structure that enables traversal of a Merkle trie.
/// It allows moving through the trie in a depth-first manner, skipping certain branches if the .
/// It allows moving through the trie in a depth-first manner, skipping certain branches
/// if they have not changed.
#[derive(Debug)]
pub struct TrieWalker<'a, K, C> {
pub struct TrieWalker<C> {
/// A mutable reference to a trie cursor instance used for navigating the trie.
pub cursor: &'a mut C,
pub cursor: C,
/// A vector containing the trie nodes that have been visited.
pub stack: Vec<CursorSubNode>,
/// A flag indicating whether the current node can be skipped when traversing the trie. This
@@ -26,12 +26,11 @@ pub struct TrieWalker<'a, K, C> {
pub changes: PrefixSet,
/// The trie updates to be applied to the trie.
trie_updates: Option<TrieUpdates>,
__phantom: PhantomData<K>,
}
impl<'a, K: Key + From<Vec<u8>>, C: TrieCursor<K>> TrieWalker<'a, K, C> {
impl<C: TrieCursor> TrieWalker<C> {
/// Constructs a new TrieWalker, setting up the initial state of the stack and cursor.
pub fn new(cursor: &'a mut C, changes: PrefixSet) -> Self {
pub fn new(cursor: C, changes: PrefixSet) -> Self {
// Initialize the walker with a single empty stack element.
let mut this = Self {
cursor,
@@ -39,7 +38,6 @@ impl<'a, K: Key + From<Vec<u8>>, C: TrieCursor<K>> TrieWalker<'a, K, C> {
stack: vec![CursorSubNode::default()],
can_skip_current_node: false,
trie_updates: None,
__phantom: PhantomData,
};
// Set up the root node of the trie in the stack, if it exists.
@@ -53,15 +51,9 @@ impl<'a, K: Key + From<Vec<u8>>, C: TrieCursor<K>> TrieWalker<'a, K, C> {
}
/// Constructs a new TrieWalker from existing stack and a cursor.
pub fn from_stack(cursor: &'a mut C, stack: Vec<CursorSubNode>, changes: PrefixSet) -> Self {
let mut this = Self {
cursor,
changes,
stack,
can_skip_current_node: false,
trie_updates: None,
__phantom: PhantomData,
};
pub fn from_stack(cursor: C, stack: Vec<CursorSubNode>, changes: PrefixSet) -> Self {
let mut this =
Self { cursor, changes, stack, can_skip_current_node: false, trie_updates: None };
this.update_skip_node();
this
}
@@ -255,7 +247,6 @@ impl<'a, K: Key + From<Vec<u8>>, C: TrieCursor<K>> TrieWalker<'a, K, C> {
#[cfg(test)]
mod tests {
use super::*;
use crate::{
prefix_set::PrefixSetMut,
@@ -316,10 +307,9 @@ mod tests {
test_cursor(storage_trie, &expected);
}
fn test_cursor<K, T>(mut trie: T, expected: &[Vec<u8>])
fn test_cursor<T>(mut trie: T, expected: &[Vec<u8>])
where
K: Key + From<Vec<u8>>,
T: TrieCursor<K>,
T: TrieCursor,
{
let mut walker = TrieWalker::new(&mut trie, Default::default());
assert!(walker.key().unwrap().is_empty());

View File

@@ -69,7 +69,7 @@ fn header_provider_example<T: HeaderProvider>(provider: T, number: u64) -> eyre:
assert_ne!(td, U256::ZERO);
// Can query headers by range as well, already sealed!
let headers = provider.sealed_headers_range(100..=200)?;
let headers = provider.sealed_headers_range(100..200)?;
assert_eq!(headers.len(), 100);
Ok(())
@@ -101,7 +101,7 @@ fn txs_provider_example<T: TransactionsProvider>(provider: T) -> eyre::Result<()
let _block = provider.transaction_block(txid)?;
// Can query the txs in the range [100, 200)
let _txs_by_tx_range = provider.transactions_by_tx_range(100..=200)?;
let _txs_by_tx_range = provider.transactions_by_tx_range(100..200)?;
// Can query the txs in the _block_ range [100, 200)]
let _txs_by_block_range = provider.transactions_by_block_range(100..200)?;

View File

@@ -17,7 +17,7 @@ use reth::{
Cli,
},
primitives::{Address, IntoRecoveredTransaction},
rpc::{compat::transaction_to_call_request, types::trace::parity::TraceType},
rpc::{compat::transaction::transaction_to_call_request, types::trace::parity::TraceType},
tasks::TaskSpawner,
transaction_pool::TransactionPool,
};