Compare commits

...

9 Commits
push ... v1.8.3

Author SHA1 Message Date
Matthias Seitz
4219741510 chore: bump 1.8.3 (#19379) 2025-10-29 13:09:06 +01:00
Matthias Seitz
53fcbcb9e9 chore: bump alloy-evm 0.21.3 2025-10-29 11:06:31 +01:00
Matthias Seitz
9c30bf7af5 chore: bump alloy 1.0.37 (#18795) 2025-09-30 15:29:51 +01:00
Tim
8950b4eb1e chore: bump version to 1.8.2 (#18792)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2025-09-30 15:03:40 +01:00
Delweng
1de013b21f fix(rpc/engine): check osaka in getBlobsV1 (#18669)
Signed-off-by: Delweng <delweng@gmail.com>
2025-09-30 14:57:37 +01:00
Matthias Seitz
95897e21b8 fix: remove cancun check (#18787) 2025-09-30 14:50:33 +01:00
nethoxa
12794769c1 fix(rpc): fix eth_config impl (#18744)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2025-09-30 14:50:27 +01:00
Mablr
611c307213 feat: make more EVM and RPC conversions fallible (#18685) 2025-09-30 14:50:23 +01:00
YK
994d73edf6 chore: bump rust to edition 2024 (#18692) 2025-09-30 14:50:22 +01:00
127 changed files with 1645 additions and 1600 deletions

427
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[workspace.package]
version = "1.8.1"
edition = "2021"
version = "1.8.3"
edition = "2024"
rust-version = "1.88"
license = "MIT OR Apache-2.0"
homepage = "https://paradigmxyz.github.io/reth"
@@ -188,6 +188,7 @@ rust.missing_docs = "warn"
rust.rust_2018_idioms = { level = "deny", priority = -1 }
rust.unreachable_pub = "warn"
rust.unused_must_use = "deny"
rust.rust_2024_incompatible_pat = "warn"
rustdoc.all = "warn"
# rust.unnameable-types = "warn"
@@ -477,7 +478,7 @@ revm-inspectors = "0.30.0"
alloy-chains = { version = "0.2.5", default-features = false }
alloy-dyn-abi = "1.3.1"
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-evm = { version = "0.21.0", default-features = false }
alloy-evm = { version = "0.21.3", default-features = false }
alloy-primitives = { version = "1.3.1", default-features = false, features = ["map-foldhash"] }
alloy-rlp = { version = "0.3.10", default-features = false, features = ["core-net"] }
alloy-sol-macro = "1.3.1"
@@ -486,36 +487,36 @@ alloy-trie = { version = "0.9.1", default-features = false }
alloy-hardforks = "0.3.5"
alloy-consensus = { version = "1.0.35", default-features = false }
alloy-contract = { version = "1.0.35", default-features = false }
alloy-eips = { version = "1.0.35", default-features = false }
alloy-genesis = { version = "1.0.35", default-features = false }
alloy-json-rpc = { version = "1.0.35", default-features = false }
alloy-network = { version = "1.0.35", default-features = false }
alloy-network-primitives = { version = "1.0.35", default-features = false }
alloy-provider = { version = "1.0.35", features = ["reqwest"], default-features = false }
alloy-pubsub = { version = "1.0.35", default-features = false }
alloy-rpc-client = { version = "1.0.35", default-features = false }
alloy-rpc-types = { version = "1.0.35", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.0.35", default-features = false }
alloy-rpc-types-anvil = { version = "1.0.35", default-features = false }
alloy-rpc-types-beacon = { version = "1.0.35", default-features = false }
alloy-rpc-types-debug = { version = "1.0.35", default-features = false }
alloy-rpc-types-engine = { version = "1.0.35", default-features = false }
alloy-rpc-types-eth = { version = "1.0.35", default-features = false }
alloy-rpc-types-mev = { version = "1.0.35", default-features = false }
alloy-rpc-types-trace = { version = "1.0.35", default-features = false }
alloy-rpc-types-txpool = { version = "1.0.35", default-features = false }
alloy-serde = { version = "1.0.35", default-features = false }
alloy-signer = { version = "1.0.35", default-features = false }
alloy-signer-local = { version = "1.0.35", default-features = false }
alloy-transport = { version = "1.0.35" }
alloy-transport-http = { version = "1.0.35", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.0.35", default-features = false }
alloy-transport-ws = { version = "1.0.35", default-features = false }
alloy-consensus = { version = "1.0.37", default-features = false }
alloy-contract = { version = "1.0.37", default-features = false }
alloy-eips = { version = "1.0.37", default-features = false }
alloy-genesis = { version = "1.0.37", default-features = false }
alloy-json-rpc = { version = "1.0.37", default-features = false }
alloy-network = { version = "1.0.37", default-features = false }
alloy-network-primitives = { version = "1.0.37", default-features = false }
alloy-provider = { version = "1.0.37", features = ["reqwest"], default-features = false }
alloy-pubsub = { version = "1.0.37", default-features = false }
alloy-rpc-client = { version = "1.0.37", default-features = false }
alloy-rpc-types = { version = "1.0.37", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.0.37", default-features = false }
alloy-rpc-types-anvil = { version = "1.0.37", default-features = false }
alloy-rpc-types-beacon = { version = "1.0.37", default-features = false }
alloy-rpc-types-debug = { version = "1.0.37", default-features = false }
alloy-rpc-types-engine = { version = "1.0.37", default-features = false }
alloy-rpc-types-eth = { version = "1.0.37", default-features = false }
alloy-rpc-types-mev = { version = "1.0.37", default-features = false }
alloy-rpc-types-trace = { version = "1.0.37", default-features = false }
alloy-rpc-types-txpool = { version = "1.0.37", default-features = false }
alloy-serde = { version = "1.0.37", default-features = false }
alloy-signer = { version = "1.0.37", default-features = false }
alloy-signer-local = { version = "1.0.37", default-features = false }
alloy-transport = { version = "1.0.37" }
alloy-transport-http = { version = "1.0.37", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.0.37", default-features = false }
alloy-transport-ws = { version = "1.0.37", default-features = false }
# op
alloy-op-evm = { version = "0.21.0", default-features = false }
alloy-op-evm = { version = "0.21.3", default-features = false }
alloy-op-hardforks = "0.3.5"
op-alloy-rpc-types = { version = "0.20.0", default-features = false }
op-alloy-rpc-types-engine = { version = "0.20.0", default-features = false }
@@ -666,7 +667,7 @@ snmalloc-rs = { version = "0.3.7", features = ["build_cc"] }
aes = "0.8.1"
ahash = "0.8"
anyhow = "1.0"
bindgen = { version = "0.70", default-features = false }
bindgen = { version = "0.71", default-features = false }
block-padding = "0.3.2"
cc = "=1.2.15"
cipher = "0.4.3"

View File

@@ -26,7 +26,9 @@ use reth_cli_runner::CliRunner;
fn main() {
// Enable backtraces unless a RUST_BACKTRACE value has already been explicitly provided.
if std::env::var_os("RUST_BACKTRACE").is_none() {
std::env::set_var("RUST_BACKTRACE", "1");
unsafe {
std::env::set_var("RUST_BACKTRACE", "1");
}
}
// Run until either exit or sigint or sigterm

View File

@@ -141,10 +141,10 @@ impl<R: Read> ProgressReader<R> {
impl<R: Read> Read for ProgressReader<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let bytes = self.reader.read(buf)?;
if bytes > 0 {
if let Err(e) = self.progress.update(bytes as u64) {
return Err(io::Error::other(e));
}
if bytes > 0 &&
let Err(e) = self.progress.update(bytes as u64)
{
return Err(io::Error::other(e));
}
Ok(bytes)
}

View File

@@ -192,7 +192,7 @@ pub fn build_import_pipeline_impl<N, C, E>(
static_file_producer: StaticFileProducer<ProviderFactory<N>>,
disable_exec: bool,
evm_config: E,
) -> eyre::Result<(Pipeline<N>, impl futures::Stream<Item = NodeEvent<N::Primitives>>)>
) -> eyre::Result<(Pipeline<N>, impl futures::Stream<Item = NodeEvent<N::Primitives>> + use<N, C, E>)>
where
N: ProviderNodeTypes,
C: FullConsensus<N::Primitives, Error = reth_consensus::ConsensusError> + 'static,

View File

@@ -7,7 +7,7 @@ use std::{
fmt, mem, ptr,
};
extern "C" {
unsafe extern "C" {
fn backtrace_symbols_fd(buffer: *const *mut libc::c_void, size: libc::c_int, fd: libc::c_int);
}

View File

@@ -96,10 +96,11 @@ where
}
// Connect last node with the first if there are more than two
if idx + 1 == num_nodes && num_nodes > 2 {
if let Some(first_node) = nodes.first_mut() {
node.connect(first_node).await;
}
if idx + 1 == num_nodes &&
num_nodes > 2 &&
let Some(first_node) = nodes.first_mut()
{
node.connect(first_node).await;
}
nodes.push(node);
@@ -207,10 +208,11 @@ where
}
// Connect last node with the first if there are more than two
if idx + 1 == num_nodes && num_nodes > 2 {
if let Some(first_node) = nodes.first_mut() {
node.connect(first_node).await;
}
if idx + 1 == num_nodes &&
num_nodes > 2 &&
let Some(first_node) = nodes.first_mut()
{
node.connect(first_node).await;
}
}

View File

@@ -150,14 +150,13 @@ where
loop {
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
if !check && wait_finish_checkpoint {
if let Some(checkpoint) =
self.inner.provider.get_stage_checkpoint(StageId::Finish)?
{
if checkpoint.block_number >= number {
check = true
}
}
if !check &&
wait_finish_checkpoint &&
let Some(checkpoint) =
self.inner.provider.get_stage_checkpoint(StageId::Finish)? &&
checkpoint.block_number >= number
{
check = true
}
if check {
@@ -178,10 +177,10 @@ where
pub async fn wait_unwind(&self, number: BlockNumber) -> eyre::Result<()> {
loop {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
if let Some(checkpoint) = self.inner.provider.get_stage_checkpoint(StageId::Headers)? {
if checkpoint.block_number == number {
break
}
if let Some(checkpoint) = self.inner.provider.get_stage_checkpoint(StageId::Headers)? &&
checkpoint.block_number == number
{
break
}
}
Ok(())
@@ -207,14 +206,13 @@ where
// wait for the block to commit
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
if let Some(latest_block) =
self.inner.provider.block_by_number_or_tag(BlockNumberOrTag::Latest)?
self.inner.provider.block_by_number_or_tag(BlockNumberOrTag::Latest)? &&
latest_block.header().number() == block_number
{
if latest_block.header().number() == block_number {
// make sure the block hash we submitted via FCU engine api is the new latest
// block using an RPC call
assert_eq!(latest_block.header().hash_slow(), block_hash);
break
}
// make sure the block hash we submitted via FCU engine api is the new latest
// block using an RPC call
assert_eq!(latest_block.header().hash_slow(), block_hash);
break
}
}
Ok(())

View File

@@ -174,16 +174,13 @@ where
];
// if we're on a fork, validate it now that it's canonical
if let Ok(active_state) = env.active_node_state() {
if let Some(fork_base) = active_state.current_fork_base {
debug!(
"MakeCanonical: Adding fork validation from base block {}",
fork_base
);
actions.push(Box::new(ValidateFork::new(fork_base)));
// clear the fork base since we're now canonical
env.active_node_state_mut()?.current_fork_base = None;
}
if let Ok(active_state) = env.active_node_state() &&
let Some(fork_base) = active_state.current_fork_base
{
debug!("MakeCanonical: Adding fork validation from base block {}", fork_base);
actions.push(Box::new(ValidateFork::new(fork_base)));
// clear the fork base since we're now canonical
env.active_node_state_mut()?.current_fork_base = None;
}
let mut sequence = Sequence::new(actions);

View File

@@ -195,15 +195,15 @@ where
.copied()
.ok_or_else(|| eyre::eyre!("Block tag '{}' not found in registry", self.tag))?;
if let Some(expected_node) = self.expected_node_idx {
if node_idx != expected_node {
return Err(eyre::eyre!(
"Block tag '{}' came from node {} but expected node {}",
self.tag,
node_idx,
expected_node
));
}
if let Some(expected_node) = self.expected_node_idx &&
node_idx != expected_node
{
return Err(eyre::eyre!(
"Block tag '{}' came from node {} but expected node {}",
self.tag,
node_idx,
expected_node
));
}
debug!(

View File

@@ -219,7 +219,7 @@ where
let is_dev = self.is_dev;
let node_count = self.network.node_count;
let attributes_generator = self.create_attributes_generator::<N>();
let attributes_generator = Self::create_static_attributes_generator::<N>();
let result = setup_engine_with_connection::<N>(
node_count,
@@ -304,10 +304,11 @@ where
.await
}
/// Create the attributes generator function
fn create_attributes_generator<N>(
&self,
) -> impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes + Copy
/// Create a static attributes generator that doesn't capture any instance data
fn create_static_attributes_generator<N>(
) -> impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes
+ Copy
+ use<N, I>
where
N: NodeBuilderHelper,
LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<

View File

@@ -89,11 +89,11 @@ async fn test_apply_with_import() -> Result<()> {
)
.await;
if let Ok(Some(block)) = block_result {
if block.header.number == 10 {
debug!("Pipeline finished, block 10 is fully available");
break;
}
if let Ok(Some(block)) = block_result &&
block.header.number == 10
{
debug!("Pipeline finished, block 10 is fully available");
break;
}
if start.elapsed() > std::time::Duration::from_secs(10) {

View File

@@ -664,7 +664,7 @@ mod tests {
unsafe impl GlobalAlloc for TrackingAllocator {
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
let ret = self.inner.alloc(layout);
let ret = unsafe { self.inner.alloc(layout) };
if !ret.is_null() {
self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
self.total_allocated.fetch_add(layout.size(), Ordering::SeqCst);
@@ -674,7 +674,7 @@ mod tests {
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
self.allocated.fetch_sub(layout.size(), Ordering::SeqCst);
self.inner.dealloc(ptr, layout)
unsafe { self.inner.dealloc(ptr, layout) }
}
}
}

View File

@@ -1801,10 +1801,10 @@ where
fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
// Edge case: the `latestValid` field is the zero hash if the parent block is the terminal
// PoW block, which we need to identify by looking at the parent's block difficulty
if let Some(parent) = self.sealed_header_by_hash(parent_hash)? {
if !parent.difficulty().is_zero() {
parent_hash = B256::ZERO;
}
if let Some(parent) = self.sealed_header_by_hash(parent_hash)? &&
!parent.difficulty().is_zero()
{
parent_hash = B256::ZERO;
}
let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
@@ -1970,62 +1970,65 @@ where
let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
// check if the downloaded block is the tracked finalized block
let mut exceeds_backfill_threshold = if let Some(buffered_finalized) = sync_target_state
.as_ref()
.and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
{
// if we have buffered the finalized block, we should check how far
// we're off
self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_finalized.number())
} else {
// check if the distance exceeds the threshold for backfill sync
self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
};
// If this is invoked after we downloaded a block we can check if this block is the
// finalized block
if let (Some(downloaded_block), Some(ref state)) = (downloaded_block, sync_target_state) {
if downloaded_block.hash == state.finalized_block_hash {
// we downloaded the finalized block and can now check how far we're off
exceeds_backfill_threshold =
self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number);
}
}
let exceeds_backfill_threshold =
match (downloaded_block.as_ref(), sync_target_state.as_ref()) {
// if we downloaded the finalized block we can now check how far we're off
(Some(downloaded_block), Some(state))
if downloaded_block.hash == state.finalized_block_hash =>
{
self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number)
}
_ => match sync_target_state
.as_ref()
.and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
{
Some(buffered_finalized) => {
// if we have buffered the finalized block, we should check how far we're
// off
self.exceeds_backfill_run_threshold(
canonical_tip_num,
buffered_finalized.number(),
)
}
None => {
// check if the distance exceeds the threshold for backfill sync
self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number)
}
},
};
// if the number of missing blocks is greater than the max, trigger backfill
if exceeds_backfill_threshold {
if let Some(state) = sync_target_state {
// if we have already canonicalized the finalized block, we should skip backfill
match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
Err(err) => {
warn!(target: "engine::tree", %err, "Failed to get finalized block header");
if exceeds_backfill_threshold && let Some(state) = sync_target_state {
// if we have already canonicalized the finalized block, we should skip backfill
match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
Err(err) => {
warn!(target: "engine::tree", %err, "Failed to get finalized block header");
}
Ok(None) => {
// ensure the finalized block is known (not the zero hash)
if !state.finalized_block_hash.is_zero() {
// we don't have the block yet and the distance exceeds the allowed
// threshold
return Some(state.finalized_block_hash)
}
Ok(None) => {
// ensure the finalized block is known (not the zero hash)
if !state.finalized_block_hash.is_zero() {
// we don't have the block yet and the distance exceeds the allowed
// threshold
return Some(state.finalized_block_hash)
}
// OPTIMISTIC SYNCING
//
// It can happen when the node is doing an
// optimistic sync, where the CL has no knowledge of the finalized hash,
// but is expecting the EL to sync as high
// as possible before finalizing.
//
// This usually doesn't happen on ETH mainnet since CLs use the more
// secure checkpoint syncing.
//
// However, optimism chains will do this. The risk of a reorg is however
// low.
debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
return Some(state.head_block_hash)
}
Ok(Some(_)) => {
// we're fully synced to the finalized block
}
// OPTIMISTIC SYNCING
//
// It can happen when the node is doing an
// optimistic sync, where the CL has no knowledge of the finalized hash,
// but is expecting the EL to sync as high
// as possible before finalizing.
//
// This usually doesn't happen on ETH mainnet since CLs use the more
// secure checkpoint syncing.
//
// However, optimism chains will do this. The risk of a reorg is however
// low.
debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
return Some(state.head_block_hash)
}
Ok(Some(_)) => {
// we're fully synced to the finalized block
}
}
}

View File

@@ -224,14 +224,14 @@ where
pub fn evm_env_for<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
&self,
input: &BlockOrPayload<T>,
) -> EvmEnvFor<Evm>
) -> Result<EvmEnvFor<Evm>, Evm::Error>
where
V: PayloadValidator<T, Block = N::Block>,
Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
{
match input {
BlockOrPayload::Payload(payload) => self.evm_config.evm_env_for_payload(payload),
BlockOrPayload::Block(block) => self.evm_config.evm_env(block.header()),
BlockOrPayload::Payload(payload) => Ok(self.evm_config.evm_env_for_payload(payload)),
BlockOrPayload::Block(block) => Ok(self.evm_config.evm_env(block.header())?),
}
}
@@ -259,14 +259,14 @@ where
pub fn execution_ctx_for<'a, T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
&self,
input: &'a BlockOrPayload<T>,
) -> ExecutionCtxFor<'a, Evm>
) -> Result<ExecutionCtxFor<'a, Evm>, Evm::Error>
where
V: PayloadValidator<T, Block = N::Block>,
Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
{
match input {
BlockOrPayload::Payload(payload) => self.evm_config.context_for_payload(payload),
BlockOrPayload::Block(block) => self.evm_config.context_for_block(block),
BlockOrPayload::Payload(payload) => Ok(self.evm_config.context_for_payload(payload)),
BlockOrPayload::Block(block) => Ok(self.evm_config.context_for_block(block)?),
}
}
@@ -370,7 +370,7 @@ where
.into())
};
let evm_env = self.evm_env_for(&input);
let evm_env = self.evm_env_for(&input).map_err(NewPayloadError::other)?;
let env = ExecutionEnv { evm_env, hash: input.hash(), parent_hash: input.parent_hash() };
@@ -740,7 +740,8 @@ where
.build();
let evm = self.evm_config.evm_with_env(&mut db, env.evm_env.clone());
let ctx = self.execution_ctx_for(input);
let ctx =
self.execution_ctx_for(input).map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
let mut executor = self.evm_config.create_executor(evm, ctx);
if !self.config.precompile_cache_disabled() {

View File

@@ -140,10 +140,10 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
let next = ready!(this.stream.poll_next_unpin(cx));
if let Some(msg) = &next {
if let Err(error) = this.store.on_message(msg, SystemTime::now()) {
error!(target: "engine::stream::store", ?msg, %error, "Error handling Engine API message");
}
if let Some(msg) = &next &&
let Err(error) = this.store.on_message(msg, SystemTime::now())
{
error!(target: "engine::stream::store", ?msg, %error, "Error handling Engine API message");
}
Poll::Ready(next)
}

View File

@@ -285,8 +285,8 @@ where
.with_bundle_update()
.build();
let ctx = evm_config.context_for_block(&reorg_target);
let evm = evm_config.evm_for_block(&mut state, &reorg_target);
let ctx = evm_config.context_for_block(&reorg_target).map_err(RethError::other)?;
let evm = evm_config.evm_for_block(&mut state, &reorg_target).map_err(RethError::other)?;
let mut builder = evm_config.create_block_builder(evm, &reorg_target_parent, ctx);
builder.apply_pre_execution_changes()?;

View File

@@ -106,12 +106,11 @@ impl<Http: HttpClient + Clone> EraClient<Http> {
if let Ok(mut dir) = fs::read_dir(&self.folder).await {
while let Ok(Some(entry)) = dir.next_entry().await {
if let Some(name) = entry.file_name().to_str() {
if let Some(number) = self.file_name_to_number(name) {
if max.is_none() || matches!(max, Some(max) if number > max) {
max.replace(number + 1);
}
}
if let Some(name) = entry.file_name().to_str() &&
let Some(number) = self.file_name_to_number(name) &&
(max.is_none() || matches!(max, Some(max) if number > max))
{
max.replace(number + 1);
}
}
}
@@ -125,14 +124,13 @@ impl<Http: HttpClient + Clone> EraClient<Http> {
if let Ok(mut dir) = fs::read_dir(&self.folder).await {
while let Ok(Some(entry)) = dir.next_entry().await {
if let Some(name) = entry.file_name().to_str() {
if let Some(number) = self.file_name_to_number(name) {
if number < index || number >= last {
eprintln!("Deleting file {}", entry.path().display());
eprintln!("{number} < {index} || {number} >= {last}");
reth_fs_util::remove_file(entry.path())?;
}
}
if let Some(name) = entry.file_name().to_str() &&
let Some(number) = self.file_name_to_number(name) &&
(number < index || number >= last)
{
eprintln!("Deleting file {}", entry.path().display());
eprintln!("{number} < {index} || {number} >= {last}");
reth_fs_util::remove_file(entry.path())?;
}
}
}
@@ -208,12 +206,12 @@ impl<Http: HttpClient + Clone> EraClient<Http> {
let mut writer = io::BufWriter::new(file);
while let Some(line) = lines.next_line().await? {
if let Some(j) = line.find(".era1") {
if let Some(i) = line[..j].rfind(|c: char| !c.is_alphanumeric() && c != '-') {
let era = &line[i + 1..j + 5];
writer.write_all(era.as_bytes()).await?;
writer.write_all(b"\n").await?;
}
if let Some(j) = line.find(".era1") &&
let Some(i) = line[..j].rfind(|c: char| !c.is_alphanumeric() && c != '-')
{
let era = &line[i + 1..j + 5];
writer.write_all(era.as_bytes()).await?;
writer.write_all(b"\n").await?;
}
}
writer.flush().await?;

View File

@@ -17,16 +17,16 @@ pub fn read_dir(
(|| {
let path = entry?.path();
if path.extension() == Some("era1".as_ref()) {
if let Some(last) = path.components().next_back() {
let str = last.as_os_str().to_string_lossy().to_string();
let parts = str.split('-').collect::<Vec<_>>();
if path.extension() == Some("era1".as_ref()) &&
let Some(last) = path.components().next_back()
{
let str = last.as_os_str().to_string_lossy().to_string();
let parts = str.split('-').collect::<Vec<_>>();
if parts.len() == 3 {
let number = usize::from_str(parts[1])?;
if parts.len() == 3 {
let number = usize::from_str(parts[1])?;
return Ok(Some((number, path.into_boxed_path())));
}
return Ok(Some((number, path.into_boxed_path())));
}
}
if path.file_name() == Some("checksums.txt".as_ref()) {

View File

@@ -262,47 +262,47 @@ impl<Http: HttpClient + Clone + Send + Sync + 'static + Unpin> Stream for Starti
self.fetch_file_list();
}
if self.state == State::FetchFileList {
if let Poll::Ready(result) = self.fetch_file_list.poll_unpin(cx) {
match result {
Ok(_) => self.delete_outside_range(),
Err(e) => {
self.fetch_file_list();
if self.state == State::FetchFileList &&
let Poll::Ready(result) = self.fetch_file_list.poll_unpin(cx)
{
match result {
Ok(_) => self.delete_outside_range(),
Err(e) => {
self.fetch_file_list();
return Poll::Ready(Some(Box::pin(async move { Err(e) })));
}
return Poll::Ready(Some(Box::pin(async move { Err(e) })));
}
}
}
if self.state == State::DeleteOutsideRange {
if let Poll::Ready(result) = self.delete_outside_range.poll_unpin(cx) {
match result {
Ok(_) => self.recover_index(),
Err(e) => {
self.delete_outside_range();
if self.state == State::DeleteOutsideRange &&
let Poll::Ready(result) = self.delete_outside_range.poll_unpin(cx)
{
match result {
Ok(_) => self.recover_index(),
Err(e) => {
self.delete_outside_range();
return Poll::Ready(Some(Box::pin(async move { Err(e) })));
}
return Poll::Ready(Some(Box::pin(async move { Err(e) })));
}
}
}
if self.state == State::RecoverIndex {
if let Poll::Ready(last) = self.recover_index.poll_unpin(cx) {
self.last = last;
self.count_files();
}
if self.state == State::RecoverIndex &&
let Poll::Ready(last) = self.recover_index.poll_unpin(cx)
{
self.last = last;
self.count_files();
}
if self.state == State::CountFiles {
if let Poll::Ready(downloaded) = self.files_count.poll_unpin(cx) {
let max_missing = self
.max_files
.saturating_sub(downloaded + self.downloading)
.max(self.last.unwrap_or_default().saturating_sub(self.index));
self.state = State::Missing(max_missing);
}
if self.state == State::CountFiles &&
let Poll::Ready(downloaded) = self.files_count.poll_unpin(cx)
{
let max_missing = self
.max_files
.saturating_sub(downloaded + self.downloading)
.max(self.last.unwrap_or_default().saturating_sub(self.index));
self.state = State::Missing(max_missing);
}
if let State::Missing(max_missing) = self.state {
@@ -316,18 +316,16 @@ impl<Http: HttpClient + Clone + Send + Sync + 'static + Unpin> Stream for Starti
}
}
if let State::NextUrl(max_missing) = self.state {
if let Poll::Ready(url) = self.next_url.poll_unpin(cx) {
self.state = State::Missing(max_missing - 1);
if let State::NextUrl(max_missing) = self.state &&
let Poll::Ready(url) = self.next_url.poll_unpin(cx)
{
self.state = State::Missing(max_missing - 1);
return Poll::Ready(url.transpose().map(|url| -> DownloadFuture {
let mut client = self.client.clone();
return Poll::Ready(url.transpose().map(|url| -> DownloadFuture {
let mut client = self.client.clone();
Box::pin(
async move { client.download_to_file(url?).await.map(EraRemoteMeta::new) },
)
}));
}
Box::pin(async move { client.download_to_file(url?).await.map(EraRemoteMeta::new) })
}));
}
Poll::Pending

View File

@@ -302,10 +302,10 @@ where
if number <= last_header_number {
continue;
}
if let Some(target) = target {
if number > target {
break;
}
if let Some(target) = target &&
number > target
{
break;
}
let hash = header.hash_slow();
@@ -351,19 +351,18 @@ where
// Database cursor for hash to number index
let mut cursor_header_numbers =
provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
let mut first_sync = false;
// If we only have the genesis block hash, then we are at first sync, and we can remove it,
// add it to the collector and use tx.append on all hashes.
if provider.tx_ref().entries::<RawTable<tables::HeaderNumbers>>()? == 1 {
if let Some((hash, block_number)) = cursor_header_numbers.last()? {
if block_number.value()? == 0 {
hash_collector.insert(hash.key()?, 0)?;
cursor_header_numbers.delete_current()?;
first_sync = true;
}
}
}
let first_sync = if provider.tx_ref().entries::<RawTable<tables::HeaderNumbers>>()? == 1 &&
let Some((hash, block_number)) = cursor_header_numbers.last()? &&
block_number.value()? == 0
{
hash_collector.insert(hash.key()?, 0)?;
cursor_header_numbers.delete_current()?;
true
} else {
false
};
let interval = (total_headers / 10).max(8192);

View File

@@ -37,17 +37,19 @@ where
// operation as hashing that is required for state root got calculated in every
// transaction This was replaced with is_success flag.
// See more about EIP here: https://eips.ethereum.org/EIPS/eip-658
if chain_spec.is_byzantium_active_at_block(block.header().number()) {
if let Err(error) =
verify_receipts(block.header().receipts_root(), block.header().logs_bloom(), receipts)
{
let receipts = receipts
.iter()
.map(|r| Bytes::from(r.with_bloom_ref().encoded_2718()))
.collect::<Vec<_>>();
tracing::debug!(%error, ?receipts, "receipts verification failed");
return Err(error)
}
if chain_spec.is_byzantium_active_at_block(block.header().number()) &&
let Err(error) = verify_receipts(
block.header().receipts_root(),
block.header().logs_bloom(),
receipts,
)
{
let receipts = receipts
.iter()
.map(|r| Bytes::from(r.with_bloom_ref().encoded_2718()))
.collect::<Vec<_>>();
tracing::debug!(%error, ?receipts, "receipts verification failed");
return Err(error)
}
// Validate that the header requests hash matches the calculated requests hash

View File

@@ -154,7 +154,7 @@ where
&self.block_assembler
}
fn evm_env(&self, header: &Header) -> EvmEnv {
fn evm_env(&self, header: &Header) -> Result<EvmEnv, Self::Error> {
let blob_params = self.chain_spec().blob_params_at_timestamp(header.timestamp);
let spec = config::revm_spec(self.chain_spec(), header);
@@ -189,7 +189,7 @@ where
blob_excess_gas_and_price,
};
EvmEnv { cfg_env, block_env }
Ok(EvmEnv { cfg_env, block_env })
}
fn next_evm_env(
@@ -265,26 +265,29 @@ where
Ok((cfg, block_env).into())
}
fn context_for_block<'a>(&self, block: &'a SealedBlock<Block>) -> EthBlockExecutionCtx<'a> {
EthBlockExecutionCtx {
fn context_for_block<'a>(
&self,
block: &'a SealedBlock<Block>,
) -> Result<EthBlockExecutionCtx<'a>, Self::Error> {
Ok(EthBlockExecutionCtx {
parent_hash: block.header().parent_hash,
parent_beacon_block_root: block.header().parent_beacon_block_root,
ommers: &block.body().ommers,
withdrawals: block.body().withdrawals.as_ref().map(Cow::Borrowed),
}
})
}
fn context_for_next_block(
&self,
parent: &SealedHeader,
attributes: Self::NextBlockEnvCtx,
) -> EthBlockExecutionCtx<'_> {
EthBlockExecutionCtx {
) -> Result<EthBlockExecutionCtx<'_>, Self::Error> {
Ok(EthBlockExecutionCtx {
parent_hash: parent.hash(),
parent_beacon_block_root: attributes.parent_beacon_block_root,
ommers: &[],
withdrawals: attributes.withdrawals.map(Cow::Owned),
}
})
}
}
@@ -401,7 +404,7 @@ mod tests {
// Use the `EthEvmConfig` to fill the `cfg_env` and `block_env` based on the ChainSpec,
// Header, and total difficulty
let EvmEnv { cfg_env, .. } =
EthEvmConfig::new(Arc::new(chain_spec.clone())).evm_env(&header);
EthEvmConfig::new(Arc::new(chain_spec.clone())).evm_env(&header).unwrap();
// Assert that the chain ID in the `cfg_env` is correctly set to the chain ID of the
// ChainSpec

View File

@@ -160,7 +160,7 @@ impl ConfigureEvm for MockEvmConfig {
self.inner.block_assembler()
}
fn evm_env(&self, header: &Header) -> EvmEnvFor<Self> {
fn evm_env(&self, header: &Header) -> Result<EvmEnvFor<Self>, Self::Error> {
self.inner.evm_env(header)
}
@@ -175,7 +175,7 @@ impl ConfigureEvm for MockEvmConfig {
fn context_for_block<'a>(
&self,
block: &'a SealedBlock<BlockTy<Self::Primitives>>,
) -> reth_evm::ExecutionCtxFor<'a, Self> {
) -> Result<reth_evm::ExecutionCtxFor<'a, Self>, Self::Error> {
self.inner.context_for_block(block)
}
@@ -183,7 +183,7 @@ impl ConfigureEvm for MockEvmConfig {
&self,
parent: &SealedHeader,
attributes: Self::NextBlockEnvCtx,
) -> reth_evm::ExecutionCtxFor<'_, Self> {
) -> Result<reth_evm::ExecutionCtxFor<'_, Self>, Self::Error> {
self.inner.context_for_next_block(parent, attributes)
}
}

View File

@@ -323,7 +323,7 @@ async fn test_eth_config() -> eyre::Result<()> {
let config = provider.client().request_noparams::<EthConfig>("eth_config").await?;
assert_eq!(config.last.unwrap().activation_time, 0);
assert_eq!(config.last.unwrap().activation_time, osaka_timestamp);
assert_eq!(config.current.activation_time, prague_timestamp);
assert_eq!(config.next.unwrap().activation_time, osaka_timestamp);

View File

@@ -559,6 +559,7 @@ where
let result = self
.strategy_factory
.executor_for_block(&mut self.db, block)
.map_err(BlockExecutionError::other)?
.execute_block(block.transactions_recovered())?;
self.db.merge_transitions(BundleRetention::Reverts);
@@ -577,6 +578,7 @@ where
let result = self
.strategy_factory
.executor_for_block(&mut self.db, block)
.map_err(BlockExecutionError::other)?
.with_state_hook(Some(Box::new(state_hook)))
.execute_block(block.transactions_recovered())?;

View File

@@ -219,7 +219,7 @@ pub trait ConfigureEvm: Clone + Debug + Send + Sync + Unpin {
fn block_assembler(&self) -> &Self::BlockAssembler;
/// Creates a new [`EvmEnv`] for the given header.
fn evm_env(&self, header: &HeaderTy<Self::Primitives>) -> EvmEnvFor<Self>;
fn evm_env(&self, header: &HeaderTy<Self::Primitives>) -> Result<EvmEnvFor<Self>, Self::Error>;
/// Returns the configured [`EvmEnv`] for `parent + 1` block.
///
@@ -246,7 +246,7 @@ pub trait ConfigureEvm: Clone + Debug + Send + Sync + Unpin {
fn context_for_block<'a>(
&self,
block: &'a SealedBlock<BlockTy<Self::Primitives>>,
) -> ExecutionCtxFor<'a, Self>;
) -> Result<ExecutionCtxFor<'a, Self>, Self::Error>;
/// Returns the configured [`BlockExecutorFactory::ExecutionCtx`] for `parent + 1`
/// block.
@@ -254,7 +254,7 @@ pub trait ConfigureEvm: Clone + Debug + Send + Sync + Unpin {
&self,
parent: &SealedHeader<HeaderTy<Self::Primitives>>,
attributes: Self::NextBlockEnvCtx,
) -> ExecutionCtxFor<'_, Self>;
) -> Result<ExecutionCtxFor<'_, Self>, Self::Error>;
/// Returns a [`TxEnv`] from a transaction and [`Address`].
fn tx_env(&self, transaction: impl IntoTxEnv<TxEnvFor<Self>>) -> TxEnvFor<Self> {
@@ -285,9 +285,9 @@ pub trait ConfigureEvm: Clone + Debug + Send + Sync + Unpin {
&self,
db: DB,
header: &HeaderTy<Self::Primitives>,
) -> EvmFor<Self, DB> {
let evm_env = self.evm_env(header);
self.evm_with_env(db, evm_env)
) -> Result<EvmFor<Self, DB>, Self::Error> {
let evm_env = self.evm_env(header)?;
Ok(self.evm_with_env(db, evm_env))
}
/// Returns a new EVM with the given database configured with the given environment settings,
@@ -327,10 +327,10 @@ pub trait ConfigureEvm: Clone + Debug + Send + Sync + Unpin {
&'a self,
db: &'a mut State<DB>,
block: &'a SealedBlock<<Self::Primitives as NodePrimitives>::Block>,
) -> impl BlockExecutorFor<'a, Self::BlockExecutorFactory, DB> {
let evm = self.evm_for_block(db, block.header());
let ctx = self.context_for_block(block);
self.create_executor(evm, ctx)
) -> Result<impl BlockExecutorFor<'a, Self::BlockExecutorFactory, DB>, Self::Error> {
let evm = self.evm_for_block(db, block.header())?;
let ctx = self.context_for_block(block)?;
Ok(self.create_executor(evm, ctx))
}
/// Creates a [`BlockBuilder`]. Should be used when building a new block.
@@ -407,7 +407,7 @@ pub trait ConfigureEvm: Clone + Debug + Send + Sync + Unpin {
) -> Result<impl BlockBuilder<Primitives = Self::Primitives>, Self::Error> {
let evm_env = self.next_evm_env(parent, &attributes)?;
let evm = self.evm_with_env(db, evm_env);
let ctx = self.context_for_next_block(parent, attributes);
let ctx = self.context_for_next_block(parent, attributes)?;
Ok(self.create_block_builder(evm, parent, ctx))
}

View File

@@ -43,7 +43,7 @@ where
self.inner().block_assembler()
}
fn evm_env(&self, header: &HeaderTy<Self::Primitives>) -> EvmEnvFor<Self> {
fn evm_env(&self, header: &HeaderTy<Self::Primitives>) -> Result<EvmEnvFor<Self>, Self::Error> {
self.inner().evm_env(header)
}
@@ -58,7 +58,7 @@ where
fn context_for_block<'a>(
&self,
block: &'a SealedBlock<BlockTy<Self::Primitives>>,
) -> crate::ExecutionCtxFor<'a, Self> {
) -> Result<crate::ExecutionCtxFor<'a, Self>, Self::Error> {
self.inner().context_for_block(block)
}
@@ -66,7 +66,7 @@ where
&self,
parent: &SealedHeader<HeaderTy<Self::Primitives>>,
attributes: Self::NextBlockEnvCtx,
) -> crate::ExecutionCtxFor<'_, Self> {
) -> Result<crate::ExecutionCtxFor<'_, Self>, Self::Error> {
self.inner().context_for_next_block(parent, attributes)
}
}

View File

@@ -501,11 +501,11 @@ where
.next_notification_id
.checked_sub(this.min_id)
.expect("exex expected notification ID outside the manager's range");
if let Some(notification) = this.buffer.get(notification_index) {
if let Poll::Ready(Err(err)) = exex.send(cx, notification) {
// The channel was closed, which is irrecoverable for the manager
return Poll::Ready(Err(err.into()))
}
if let Some(notification) = this.buffer.get(notification_index) &&
let Poll::Ready(Err(err)) = exex.send(cx, notification)
{
// The channel was closed, which is irrecoverable for the manager
return Poll::Ready(Err(err.into()))
}
min_id = min_id.min(exex.next_notification_id);
this.exex_handles.push(exex);

View File

@@ -59,11 +59,11 @@ impl BanList {
pub fn evict_peers(&mut self, now: Instant) -> Vec<PeerId> {
let mut evicted = Vec::new();
self.banned_peers.retain(|peer, until| {
if let Some(until) = until {
if now > *until {
evicted.push(*peer);
return false
}
if let Some(until) = until &&
now > *until
{
evicted.push(*peer);
return false
}
true
});
@@ -74,11 +74,11 @@ impl BanList {
pub fn evict_ips(&mut self, now: Instant) -> Vec<IpAddr> {
let mut evicted = Vec::new();
self.banned_ips.retain(|peer, until| {
if let Some(until) = until {
if now > *until {
evicted.push(*peer);
return false
}
if let Some(until) = until &&
now > *until
{
evicted.push(*peer);
return false
}
true
});

View File

@@ -627,10 +627,10 @@ impl Discv4Service {
/// Sets the external Ip to the configured external IP if [`NatResolver::ExternalIp`].
fn resolve_external_ip(&mut self) {
if let Some(r) = &self.resolve_external_ip_interval {
if let Some(external_ip) = r.resolver().as_external_ip() {
self.set_external_ip_addr(external_ip);
}
if let Some(r) = &self.resolve_external_ip_interval &&
let Some(external_ip) = r.resolver().as_external_ip()
{
self.set_external_ip_addr(external_ip);
}
}
@@ -904,10 +904,10 @@ impl Discv4Service {
/// Check if the peer has an active bond.
fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) {
if timestamp.elapsed() < self.config.bond_expiration {
return true
}
if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) &&
timestamp.elapsed() < self.config.bond_expiration
{
return true
}
false
}
@@ -3048,12 +3048,11 @@ mod tests {
loop {
tokio::select! {
Some(update) = updates.next() => {
if let DiscoveryUpdate::Added(record) = update {
if record.id == peerid_1 {
if let DiscoveryUpdate::Added(record) = update
&& record.id == peerid_1 {
bootnode_appeared = true;
break;
}
}
}
_ = &mut timeout => break,
}

View File

@@ -152,10 +152,10 @@ impl ConfigBuilder {
/// Adds a comma-separated list of enodes, serialized unsigned node records, to boot nodes.
pub fn add_serialized_unsigned_boot_nodes(mut self, enodes: &[&str]) -> Self {
for node in enodes {
if let Ok(node) = node.parse() {
if let Ok(node) = BootNode::from_unsigned(node) {
self.bootstrap_nodes.insert(node);
}
if let Ok(node) = node.parse() &&
let Ok(node) = BootNode::from_unsigned(node)
{
self.bootstrap_nodes.insert(node);
}
}
@@ -411,14 +411,14 @@ pub fn discv5_sockets_wrt_rlpx_addr(
let discv5_socket_ipv6 =
discv5_addr_ipv6.map(|ip| SocketAddrV6::new(ip, discv5_port_ipv6, 0, 0));
if let Some(discv5_addr) = discv5_addr_ipv4 {
if discv5_addr != rlpx_addr {
debug!(target: "net::discv5",
%discv5_addr,
%rlpx_addr,
"Overwriting discv5 IPv4 address with RLPx IPv4 address, limited to one advertised IP address per IP version"
);
}
if let Some(discv5_addr) = discv5_addr_ipv4 &&
discv5_addr != rlpx_addr
{
debug!(target: "net::discv5",
%discv5_addr,
%rlpx_addr,
"Overwriting discv5 IPv4 address with RLPx IPv4 address, limited to one advertised IP address per IP version"
);
}
// overwrite discv5 ipv4 addr with RLPx address. this is since there is no
@@ -430,14 +430,14 @@ pub fn discv5_sockets_wrt_rlpx_addr(
let discv5_socket_ipv4 =
discv5_addr_ipv4.map(|ip| SocketAddrV4::new(ip, discv5_port_ipv4));
if let Some(discv5_addr) = discv5_addr_ipv6 {
if discv5_addr != rlpx_addr {
debug!(target: "net::discv5",
%discv5_addr,
%rlpx_addr,
"Overwriting discv5 IPv6 address with RLPx IPv6 address, limited to one advertised IP address per IP version"
);
}
if let Some(discv5_addr) = discv5_addr_ipv6 &&
discv5_addr != rlpx_addr
{
debug!(target: "net::discv5",
%discv5_addr,
%rlpx_addr,
"Overwriting discv5 IPv6 address with RLPx IPv6 address, limited to one advertised IP address per IP version"
);
}
// overwrite discv5 ipv6 addr with RLPx address. this is since there is no

View File

@@ -80,12 +80,12 @@ impl<R: Resolver, K: EnrKeyUnambiguous> QueryPool<R, K> {
// queue in new queries if we have capacity
'queries: while self.active_queries.len() < self.rate_limit.limit() as usize {
if self.rate_limit.poll_ready(cx).is_ready() {
if let Some(query) = self.queued_queries.pop_front() {
self.rate_limit.tick();
self.active_queries.push(query);
continue 'queries
}
if self.rate_limit.poll_ready(cx).is_ready() &&
let Some(query) = self.queued_queries.pop_front()
{
self.rate_limit.tick();
self.active_queries.push(query);
continue 'queries
}
break
}

View File

@@ -172,19 +172,16 @@ where
///
/// Returns `None` if no more requests are required.
fn next_request(&mut self) -> Option<HeadersRequest> {
if let Some(local_head) = self.local_block_number() {
if self.next_request_block_number > local_head {
let request = calc_next_request(
local_head,
self.next_request_block_number,
self.request_limit,
);
// need to shift the tracked request block number based on the number of requested
// headers so follow-up requests will use that as start.
self.next_request_block_number -= request.limit;
if let Some(local_head) = self.local_block_number() &&
self.next_request_block_number > local_head
{
let request =
calc_next_request(local_head, self.next_request_block_number, self.request_limit);
// need to shift the tracked request block number based on the number of requested
// headers so follow-up requests will use that as start.
self.next_request_block_number -= request.limit;
return Some(request)
}
return Some(request)
}
None

View File

@@ -179,18 +179,18 @@ where
}
// Ensure peer's total difficulty is reasonable
if let StatusMessage::Legacy(s) = their_status_message {
if s.total_difficulty.bit_len() > 160 {
unauth
.disconnect(DisconnectReason::ProtocolBreach)
.await
.map_err(EthStreamError::from)?;
return Err(EthHandshakeError::TotalDifficultyBitLenTooLarge {
got: s.total_difficulty.bit_len(),
maximum: 160,
}
.into());
if let StatusMessage::Legacy(s) = their_status_message &&
s.total_difficulty.bit_len() > 160
{
unauth
.disconnect(DisconnectReason::ProtocolBreach)
.await
.map_err(EthStreamError::from)?;
return Err(EthHandshakeError::TotalDifficultyBitLenTooLarge {
got: s.total_difficulty.bit_len(),
maximum: 160,
}
.into());
}
// Fork validation

View File

@@ -656,13 +656,11 @@ impl<N: NetworkPrimitives> NetworkConfigBuilder<N> {
// If default DNS config is used then we add the known dns network to bootstrap from
if let Some(dns_networks) =
dns_discovery_config.as_mut().and_then(|c| c.bootstrap_dns_networks.as_mut())
dns_discovery_config.as_mut().and_then(|c| c.bootstrap_dns_networks.as_mut()) &&
dns_networks.is_empty() &&
let Some(link) = chain_spec.chain().public_dns_network_protocol()
{
if dns_networks.is_empty() {
if let Some(link) = chain_spec.chain().public_dns_network_protocol() {
dns_networks.insert(link.parse().expect("is valid DNS link entry"));
}
}
dns_networks.insert(link.parse().expect("is valid DNS link entry"));
}
NetworkConfig {

View File

@@ -267,12 +267,11 @@ impl Discovery {
while let Some(Poll::Ready(Some(update))) =
self.discv5_updates.as_mut().map(|updates| updates.poll_next_unpin(cx))
{
if let Some(discv5) = self.discv5.as_mut() {
if let Some(DiscoveredPeer { node_record, fork_id }) =
if let Some(discv5) = self.discv5.as_mut() &&
let Some(DiscoveredPeer { node_record, fork_id }) =
discv5.on_discv5_update(update)
{
self.on_node_record_update(node_record, fork_id);
}
{
self.on_node_record_update(node_record, fork_id);
}
}

View File

@@ -116,12 +116,12 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
///
/// Returns `true` if this a newer block
pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) -> bool {
if let Some(peer) = self.peers.get_mut(peer_id) {
if number > peer.best_number {
peer.best_hash = hash;
peer.best_number = number;
return true
}
if let Some(peer) = self.peers.get_mut(peer_id) &&
number > peer.best_number
{
peer.best_hash = hash;
peer.best_number = number;
return true
}
false
}

View File

@@ -382,14 +382,15 @@ impl PeersManager {
/// Bans the peer temporarily with the configured ban timeout
fn ban_peer(&mut self, peer_id: PeerId) {
let mut ban_duration = self.ban_duration;
if let Some(peer) = self.peers.get(&peer_id) {
if peer.is_trusted() || peer.is_static() {
// For misbehaving trusted or static peers, we provide a bit more leeway when
// penalizing them.
ban_duration = self.backoff_durations.low / 2;
}
}
let ban_duration = if let Some(peer) = self.peers.get(&peer_id) &&
(peer.is_trusted() || peer.is_static())
{
// For misbehaving trusted or static peers, we provide a bit more leeway when
// penalizing them.
self.backoff_durations.low / 2
} else {
self.ban_duration
};
self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + ban_duration);
self.queued_actions.push_back(PeerAction::BanPeer { peer_id });

View File

@@ -738,11 +738,11 @@ impl<N: NetworkPrimitives> Future for ActiveSession<N> {
while this.internal_request_timeout_interval.poll_tick(cx).is_ready() {
// check for timed out requests
if this.check_timed_out_requests(Instant::now()) {
if let Poll::Ready(Ok(_)) = this.to_session_manager.poll_reserve(cx) {
let msg = ActiveSessionMessage::ProtocolBreach { peer_id: this.remote_peer_id };
this.pending_message_to_session = Some(msg);
}
if this.check_timed_out_requests(Instant::now()) &&
let Poll::Ready(Ok(_)) = this.to_session_manager.poll_reserve(cx)
{
let msg = ActiveSessionMessage::ProtocolBreach { peer_id: this.remote_peer_id };
this.pending_message_to_session = Some(msg);
}
}

View File

@@ -80,10 +80,10 @@ impl SessionCounter {
}
const fn ensure(current: u32, limit: Option<u32>) -> Result<(), ExceedsSessionLimit> {
if let Some(limit) = limit {
if current >= limit {
return Err(ExceedsSessionLimit(limit))
}
if let Some(limit) = limit &&
current >= limit
{
return Err(ExceedsSessionLimit(limit))
}
Ok(())
}

View File

@@ -697,12 +697,11 @@ impl<Pool: TransactionPool, N: NetworkPrimitives, PBundle: TransactionPolicies>
}
};
if is_eth68_message {
if let Some((actual_ty_byte, _)) = *metadata_ref_mut {
if let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte) {
tx_types_counter.increase_by_tx_type(parsed_tx_type);
}
}
if is_eth68_message &&
let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
{
tx_types_counter.increase_by_tx_type(parsed_tx_type);
}
let decision = self

View File

@@ -280,18 +280,18 @@ where
Client: BlockClient,
{
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ResponseResult<Client::Header, Client::Body>> {
if let Some(fut) = Pin::new(&mut self.header).as_pin_mut() {
if let Poll::Ready(res) = fut.poll(cx) {
self.header = None;
return Poll::Ready(ResponseResult::Header(res))
}
if let Some(fut) = Pin::new(&mut self.header).as_pin_mut() &&
let Poll::Ready(res) = fut.poll(cx)
{
self.header = None;
return Poll::Ready(ResponseResult::Header(res))
}
if let Some(fut) = Pin::new(&mut self.body).as_pin_mut() {
if let Poll::Ready(res) = fut.poll(cx) {
self.body = None;
return Poll::Ready(ResponseResult::Body(res))
}
if let Some(fut) = Pin::new(&mut self.body).as_pin_mut() &&
let Poll::Ready(res) = fut.poll(cx)
{
self.body = None;
return Poll::Ready(ResponseResult::Body(res))
}
Poll::Pending
@@ -621,18 +621,18 @@ where
&mut self,
cx: &mut Context<'_>,
) -> Poll<RangeResponseResult<Client::Header, Client::Body>> {
if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() {
if let Poll::Ready(res) = fut.poll(cx) {
self.headers = None;
return Poll::Ready(RangeResponseResult::Header(res))
}
if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() &&
let Poll::Ready(res) = fut.poll(cx)
{
self.headers = None;
return Poll::Ready(RangeResponseResult::Header(res))
}
if let Some(fut) = Pin::new(&mut self.bodies).as_pin_mut() {
if let Poll::Ready(res) = fut.poll(cx) {
self.bodies = None;
return Poll::Ready(RangeResponseResult::Body(res))
}
if let Some(fut) = Pin::new(&mut self.bodies).as_pin_mut() &&
let Poll::Ready(res) = fut.poll(cx)
{
self.bodies = None;
return Poll::Ready(RangeResponseResult::Body(res))
}
Poll::Pending

View File

@@ -63,11 +63,11 @@ impl NodeRecord {
/// See also [`std::net::Ipv6Addr::to_ipv4_mapped`]
pub fn convert_ipv4_mapped(&mut self) -> bool {
// convert IPv4 mapped IPv6 address
if let IpAddr::V6(v6) = self.address {
if let Some(v4) = v6.to_ipv4_mapped() {
self.address = v4.into();
return true
}
if let IpAddr::V6(v6) = self.address &&
let Some(v4) = v6.to_ipv4_mapped()
{
self.address = v4.into();
return true
}
false
}

View File

@@ -956,23 +956,24 @@ where
where
T: FullNodeTypes<Provider: StaticFileProviderFactory>,
{
if self.node_config().pruning.bodies_pre_merge {
if let Some(merge_block) =
self.chain_spec().ethereum_fork_activation(EthereumHardfork::Paris).block_number()
{
// Ensure we only expire transactions after we synced past the merge block.
let Some(latest) = self.blockchain_db().latest_header()? else { return Ok(()) };
if latest.number() > merge_block {
let provider = self.blockchain_db().static_file_provider();
if provider
.get_lowest_transaction_static_file_block()
.is_some_and(|lowest| lowest < merge_block)
{
info!(target: "reth::cli", merge_block, "Expiring pre-merge transactions");
provider.delete_transactions_below(merge_block)?;
} else {
debug!(target: "reth::cli", merge_block, "No pre-merge transactions to expire");
}
if self.node_config().pruning.bodies_pre_merge &&
let Some(merge_block) = self
.chain_spec()
.ethereum_fork_activation(EthereumHardfork::Paris)
.block_number()
{
// Ensure we only expire transactions after we synced past the merge block.
let Some(latest) = self.blockchain_db().latest_header()? else { return Ok(()) };
if latest.number() > merge_block {
let provider = self.blockchain_db().static_file_provider();
if provider
.get_lowest_transaction_static_file_block()
.is_some_and(|lowest| lowest < merge_block)
{
info!(target: "reth::cli", merge_block, "Expiring pre-merge transactions");
provider.delete_transactions_below(merge_block)?;
} else {
debug!(target: "reth::cli", merge_block, "No pre-merge transactions to expire");
}
}
}

View File

@@ -181,14 +181,14 @@ where
let response =
timeout(READ_TIMEOUT, conn.read_json()).await.map_err(|_| EthStatsError::Timeout)??;
if let Some(ack) = response.get("emit") {
if ack.get(0) == Some(&Value::String("ready".to_string())) {
info!(
target: "ethstats",
"Login successful to EthStats server as node_id {}", self.credentials.node_id
);
return Ok(());
}
if let Some(ack) = response.get("emit") &&
ack.get(0) == Some(&Value::String("ready".to_string()))
{
info!(
target: "ethstats",
"Login successful to EthStats server as node_id {}", self.credentials.node_id
);
return Ok(());
}
debug!(target: "ethstats", "Login failed: Unauthorized or unexpected login response");
@@ -595,10 +595,10 @@ where
tokio::spawn(async move {
loop {
let head = canonical_stream.next().await;
if let Some(head) = head {
if head_tx.send(head).await.is_err() {
break;
}
if let Some(head) = head &&
head_tx.send(head).await.is_err()
{
break;
}
}
@@ -681,10 +681,10 @@ where
/// Attempts to close the connection cleanly and logs any errors
/// that occur during the process.
async fn disconnect(&self) {
if let Some(conn) = self.conn.write().await.take() {
if let Err(e) = conn.close().await {
debug!(target: "ethstats", "Error closing connection: {}", e);
}
if let Some(conn) = self.conn.write().await.take() &&
let Err(e) = conn.close().await
{
debug!(target: "ethstats", "Error closing connection: {}", e);
}
}
@@ -733,16 +733,13 @@ mod tests {
// Handle ping
while let Some(Ok(msg)) = ws_stream.next().await {
if let Message::Text(text) = msg {
if text.contains("node-ping") {
let pong = json!({
"emit": ["node-pong", {"id": "test-node"}]
});
ws_stream
.send(Message::Text(Utf8Bytes::from(pong.to_string())))
.await
.unwrap();
}
if let Message::Text(text) = msg &&
text.contains("node-ping")
{
let pong = json!({
"emit": ["node-pong", {"id": "test-node"}]
});
ws_stream.send(Message::Text(Utf8Bytes::from(pong.to_string()))).await.unwrap();
}
}
});

View File

@@ -13,7 +13,9 @@ fn main() {
// Enable backtraces unless a RUST_BACKTRACE value has already been explicitly provided.
if std::env::var_os("RUST_BACKTRACE").is_none() {
std::env::set_var("RUST_BACKTRACE", "1");
unsafe {
std::env::set_var("RUST_BACKTRACE", "1");
}
}
if let Err(err) =

View File

@@ -459,33 +459,33 @@ impl OpGenesisInfo {
.unwrap_or_default(),
..Default::default()
};
if let Some(optimism_base_fee_info) = &info.optimism_chain_info.base_fee_info {
if let (Some(elasticity), Some(denominator)) = (
if let Some(optimism_base_fee_info) = &info.optimism_chain_info.base_fee_info &&
let (Some(elasticity), Some(denominator)) = (
optimism_base_fee_info.eip1559_elasticity,
optimism_base_fee_info.eip1559_denominator,
) {
let base_fee_params = if let Some(canyon_denominator) =
optimism_base_fee_info.eip1559_denominator_canyon
{
BaseFeeParamsKind::Variable(
vec![
(
EthereumHardfork::London.boxed(),
BaseFeeParams::new(denominator as u128, elasticity as u128),
),
(
OpHardfork::Canyon.boxed(),
BaseFeeParams::new(canyon_denominator as u128, elasticity as u128),
),
]
.into(),
)
} else {
BaseFeeParams::new(denominator as u128, elasticity as u128).into()
};
)
{
let base_fee_params = if let Some(canyon_denominator) =
optimism_base_fee_info.eip1559_denominator_canyon
{
BaseFeeParamsKind::Variable(
vec![
(
EthereumHardfork::London.boxed(),
BaseFeeParams::new(denominator as u128, elasticity as u128),
),
(
OpHardfork::Canyon.boxed(),
BaseFeeParams::new(canyon_denominator as u128, elasticity as u128),
),
]
.into(),
)
} else {
BaseFeeParams::new(denominator as u128, elasticity as u128).into()
};
info.base_fee_params = base_fee_params;
}
info.base_fee_params = base_fee_params;
}
info
@@ -498,19 +498,18 @@ pub fn make_op_genesis_header(genesis: &Genesis, hardforks: &ChainHardforks) ->
// If Isthmus is active, overwrite the withdrawals root with the storage root of predeploy
// `L2ToL1MessagePasser.sol`
if hardforks.fork(OpHardfork::Isthmus).active_at_timestamp(header.timestamp) {
if let Some(predeploy) = genesis.alloc.get(&ADDRESS_L2_TO_L1_MESSAGE_PASSER) {
if let Some(storage) = &predeploy.storage {
header.withdrawals_root =
Some(storage_root_unhashed(storage.iter().filter_map(|(k, v)| {
if v.is_zero() {
None
} else {
Some((*k, (*v).into()))
}
})));
}
}
if hardforks.fork(OpHardfork::Isthmus).active_at_timestamp(header.timestamp) &&
let Some(predeploy) = genesis.alloc.get(&ADDRESS_L2_TO_L1_MESSAGE_PASSER) &&
let Some(storage) = &predeploy.storage
{
header.withdrawals_root =
Some(storage_root_unhashed(storage.iter().filter_map(|(k, v)| {
if v.is_zero() {
None
} else {
Some((*k, (*v).into()))
}
})));
}
header

View File

@@ -141,11 +141,10 @@ where
// Ensure that receipts hasn't been initialized apart from `init_genesis`.
if let Some(num_receipts) =
static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts)
static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts) &&
num_receipts > 0
{
if num_receipts > 0 {
eyre::bail!("Expected no receipts in storage, but found {num_receipts}.");
}
eyre::bail!("Expected no receipts in storage, but found {num_receipts}.");
}
match static_file_provider.get_highest_static_file_block(StaticFileSegment::Receipts) {
Some(receipts_block) => {

View File

@@ -303,7 +303,7 @@ mod tests {
// Verify deposit transaction
let deposit_tx = match &deposit_decoded.transaction {
OpTypedTransaction::Legacy(ref tx) => tx,
OpTypedTransaction::Legacy(tx) => tx,
_ => panic!("Expected legacy transaction for NFT deposit"),
};
@@ -345,7 +345,7 @@ mod tests {
assert!(system_decoded.is_legacy());
let system_tx = match &system_decoded.transaction {
OpTypedTransaction::Legacy(ref tx) => tx,
OpTypedTransaction::Legacy(tx) => tx,
_ => panic!("Expected Legacy transaction"),
};

View File

@@ -93,21 +93,21 @@ pub fn validate_block_post_execution<R: DepositReceipt>(
// operation as hashing that is required for state root got calculated in every
// transaction This was replaced with is_success flag.
// See more about EIP here: https://eips.ethereum.org/EIPS/eip-658
if chain_spec.is_byzantium_active_at_block(header.number()) {
if let Err(error) = verify_receipts_optimism(
if chain_spec.is_byzantium_active_at_block(header.number()) &&
let Err(error) = verify_receipts_optimism(
header.receipts_root(),
header.logs_bloom(),
receipts,
chain_spec,
header.timestamp(),
) {
let receipts = receipts
.iter()
.map(|r| Bytes::from(r.with_bloom_ref().encoded_2718()))
.collect::<Vec<_>>();
tracing::debug!(%error, ?receipts, "receipts verification failed");
return Err(error)
}
)
{
let receipts = receipts
.iter()
.map(|r| Bytes::from(r.with_bloom_ref().encoded_2718()))
.collect::<Vec<_>>();
tracing::debug!(%error, ?receipts, "receipts verification failed");
return Err(error)
}
// Check if gas used matches the value set in header.

View File

@@ -151,7 +151,7 @@ where
&self.block_assembler
}
fn evm_env(&self, header: &Header) -> EvmEnv<OpSpecId> {
fn evm_env(&self, header: &Header) -> Result<EvmEnv<OpSpecId>, Self::Error> {
let spec = config::revm_spec(self.chain_spec(), header);
let cfg_env = CfgEnv::new().with_chain_id(self.chain_spec().chain().id()).with_spec(spec);
@@ -181,7 +181,7 @@ where
blob_excess_gas_and_price,
};
EvmEnv { cfg_env, block_env }
Ok(EvmEnv { cfg_env, block_env })
}
fn next_evm_env(
@@ -222,24 +222,27 @@ where
Ok(EvmEnv { cfg_env, block_env })
}
fn context_for_block(&self, block: &'_ SealedBlock<N::Block>) -> OpBlockExecutionCtx {
OpBlockExecutionCtx {
fn context_for_block(
&self,
block: &'_ SealedBlock<N::Block>,
) -> Result<OpBlockExecutionCtx, Self::Error> {
Ok(OpBlockExecutionCtx {
parent_hash: block.header().parent_hash(),
parent_beacon_block_root: block.header().parent_beacon_block_root(),
extra_data: block.header().extra_data().clone(),
}
})
}
fn context_for_next_block(
&self,
parent: &SealedHeader<N::BlockHeader>,
attributes: Self::NextBlockEnvCtx,
) -> OpBlockExecutionCtx {
OpBlockExecutionCtx {
) -> Result<OpBlockExecutionCtx, Self::Error> {
Ok(OpBlockExecutionCtx {
parent_hash: parent.hash(),
parent_beacon_block_root: attributes.parent_beacon_block_root,
extra_data: attributes.extra_data,
}
})
}
}
@@ -359,7 +362,8 @@ mod tests {
// Header, and total difficulty
let EvmEnv { cfg_env, .. } =
OpEvmConfig::optimism(Arc::new(OpChainSpec { inner: chain_spec.clone() }))
.evm_env(&header);
.evm_env(&header)
.unwrap();
// Assert that the chain ID in the `cfg_env` is correctly set to the chain ID of the
// ChainSpec

View File

@@ -103,7 +103,12 @@ where
/// Returns `None` if the flashblock have no `base` or the base is not a child block of latest.
fn build_args(
&mut self,
) -> Option<BuildArgs<impl IntoIterator<Item = WithEncoded<Recovered<N::SignedTx>>>>> {
) -> Option<
BuildArgs<
impl IntoIterator<Item = WithEncoded<Recovered<N::SignedTx>>>
+ use<N, S, EvmConfig, Provider>,
>,
> {
let Some(base) = self.blocks.payload_base() else {
trace!(
flashblock_number = ?self.blocks.block_number(),
@@ -115,11 +120,11 @@ where
};
// attempt an initial consecutive check
if let Some(latest) = self.builder.provider().latest_header().ok().flatten() {
if latest.hash() != base.parent_hash {
trace!(flashblock_parent=?base.parent_hash, flashblock_number=base.block_number, local_latest=?latest.num_hash(), "Skipping non consecutive build attempt");
return None;
}
if let Some(latest) = self.builder.provider().latest_header().ok().flatten() &&
latest.hash() != base.parent_hash
{
trace!(flashblock_parent=?base.parent_hash, flashblock_number=base.block_number, local_latest=?latest.num_hash(), "Skipping non consecutive build attempt");
return None;
}
Some(BuildArgs {
@@ -216,16 +221,15 @@ where
let fut = this.canon_receiver.recv();
pin!(fut);
fut.poll_unpin(cx)
} {
if let Some(current) = this.on_new_tip(state) {
trace!(
parent_hash = %current.block().parent_hash(),
block_number = current.block().number(),
"Clearing current flashblock on new canonical block"
);
} && let Some(current) = this.on_new_tip(state)
{
trace!(
parent_hash = %current.block().parent_hash(),
block_number = current.block().number(),
"Clearing current flashblock on new canonical block"
);
return Poll::Ready(Some(Ok(None)))
}
return Poll::Ready(Some(Ok(None)))
}
if !this.rebuild && this.current.is_some() {

View File

@@ -690,11 +690,11 @@ where
// We skip invalid cross chain txs, they would be removed on the next block update in
// the maintenance job
if let Some(interop) = interop {
if !is_valid_interop(interop, self.config.attributes.timestamp()) {
best_txs.mark_invalid(tx.signer(), tx.nonce());
continue
}
if let Some(interop) = interop &&
!is_valid_interop(interop, self.config.attributes.timestamp())
{
best_txs.mark_invalid(tx.signer(), tx.nonce());
continue
}
// check if the job was cancelled, if so we can exit early
if self.cancel.is_cancelled() {

View File

@@ -108,11 +108,10 @@ where
if let Some(notification) = canonical_notification {
let chain = notification.committed();
for block in chain.blocks_iter() {
if block.body().contains_transaction(&hash) {
if let Some(receipt) = this.transaction_receipt(hash).await? {
if block.body().contains_transaction(&hash)
&& let Some(receipt) = this.transaction_receipt(hash).await? {
return Ok(receipt);
}
}
}
} else {
// Canonical stream ended
@@ -130,11 +129,10 @@ where
// Check flashblocks for faster confirmation (Optimism-specific)
if let Ok(Some(pending_block)) = this.pending_flashblock() {
let block_and_receipts = pending_block.into_block_and_receipts();
if block_and_receipts.block.body().contains_transaction(&hash) {
if let Some(receipt) = this.transaction_receipt(hash).await? {
if block_and_receipts.block.body().contains_transaction(&hash)
&& let Some(receipt) = this.transaction_receipt(hash).await? {
return Ok(receipt);
}
}
}
}
}

View File

@@ -587,15 +587,15 @@ where
let this = self.get_mut();
// check if there is a better payload before returning the best payload
if let Some(fut) = Pin::new(&mut this.maybe_better).as_pin_mut() {
if let Poll::Ready(res) = fut.poll(cx) {
this.maybe_better = None;
if let Ok(Some(payload)) = res.map(|out| out.into_payload())
.inspect_err(|err| warn!(target: "payload_builder", %err, "failed to resolve pending payload"))
{
debug!(target: "payload_builder", "resolving better payload");
return Poll::Ready(Ok(payload))
}
if let Some(fut) = Pin::new(&mut this.maybe_better).as_pin_mut() &&
let Poll::Ready(res) = fut.poll(cx)
{
this.maybe_better = None;
if let Ok(Some(payload)) = res.map(|out| out.into_payload()).inspect_err(
|err| warn!(target: "payload_builder", %err, "failed to resolve pending payload"),
) {
debug!(target: "payload_builder", "resolving better payload");
return Poll::Ready(Ok(payload))
}
}
@@ -604,20 +604,20 @@ where
return Poll::Ready(Ok(best))
}
if let Some(fut) = Pin::new(&mut this.empty_payload).as_pin_mut() {
if let Poll::Ready(res) = fut.poll(cx) {
this.empty_payload = None;
return match res {
Ok(res) => {
if let Err(err) = &res {
warn!(target: "payload_builder", %err, "failed to resolve empty payload");
} else {
debug!(target: "payload_builder", "resolving empty payload");
}
Poll::Ready(res)
if let Some(fut) = Pin::new(&mut this.empty_payload).as_pin_mut() &&
let Poll::Ready(res) = fut.poll(cx)
{
this.empty_payload = None;
return match res {
Ok(res) => {
if let Err(err) = &res {
warn!(target: "payload_builder", %err, "failed to resolve empty payload");
} else {
debug!(target: "payload_builder", "resolving empty payload");
}
Err(err) => Poll::Ready(Err(err.into())),
Poll::Ready(res)
}
Err(err) => Poll::Ready(Err(err.into())),
}
}

View File

@@ -305,10 +305,10 @@ where
) -> Option<PayloadFuture<T::BuiltPayload>> {
debug!(target: "payload_builder", %id, "resolving payload job");
if let Some((cached, _, payload)) = &*self.cached_payload_rx.borrow() {
if *cached == id {
return Some(Box::pin(core::future::ready(Ok(payload.clone()))));
}
if let Some((cached, _, payload)) = &*self.cached_payload_rx.borrow() &&
*cached == id
{
return Some(Box::pin(core::future::ready(Ok(payload.clone()))));
}
let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?;
@@ -356,10 +356,10 @@ where
{
/// Returns the payload timestamp for the given payload.
fn payload_timestamp(&self, id: PayloadId) -> Option<Result<u64, PayloadBuilderError>> {
if let Some((cached_id, timestamp, _)) = *self.cached_payload_rx.borrow() {
if cached_id == id {
return Some(Ok(timestamp));
}
if let Some((cached_id, timestamp, _)) = *self.cached_payload_rx.borrow() &&
cached_id == id
{
return Some(Ok(timestamp));
}
let timestamp = self

View File

@@ -48,18 +48,17 @@ where
// data. If the TransactionLookup checkpoint is lagging behind (which can happen e.g. when
// pre-merge history is dropped and then later tx lookup pruning is enabled) then we can
// only prune from the tx checkpoint and onwards.
if let Some(txs_checkpoint) = provider.get_prune_checkpoint(PruneSegment::Transactions)? {
if input
if let Some(txs_checkpoint) = provider.get_prune_checkpoint(PruneSegment::Transactions)? &&
input
.previous_checkpoint
.is_none_or(|checkpoint| checkpoint.block_number < txs_checkpoint.block_number)
{
input.previous_checkpoint = Some(txs_checkpoint);
debug!(
target: "pruner",
transactions_checkpoint = ?input.previous_checkpoint,
"No TransactionLookup checkpoint found, using Transactions checkpoint as fallback"
);
}
{
input.previous_checkpoint = Some(txs_checkpoint);
debug!(
target: "pruner",
transactions_checkpoint = ?input.previous_checkpoint,
"No TransactionLookup checkpoint found, using Transactions checkpoint as fallback"
);
}
let (start, end) = match input.get_next_tx_num_range(provider)? {

View File

@@ -96,12 +96,11 @@ impl ReceiptsLogPruneConfig {
let mut lowest = None;
for mode in self.values() {
if mode.is_distance() {
if let Some((block, _)) =
if mode.is_distance() &&
let Some((block, _)) =
mode.prune_target_block(tip, PruneSegment::ContractLogs, PrunePurpose::User)?
{
lowest = Some(lowest.unwrap_or(u64::MAX).min(block));
}
{
lowest = Some(lowest.unwrap_or(u64::MAX).min(block));
}
}

View File

@@ -120,19 +120,15 @@ where
let mut executed = self.pending_state.executed_block(&ancestor_hash);
// If it's not present, attempt to lookup invalid block.
if executed.is_none() {
if let Some(invalid) =
if executed.is_none() &&
let Some(invalid) =
self.pending_state.invalid_recovered_block(&ancestor_hash)
{
trace!(target: "reth::ress_provider", %block_hash, %ancestor_hash, "Using invalid ancestor block for witness construction");
executed = Some(ExecutedBlockWithTrieUpdates {
block: ExecutedBlock {
recovered_block: invalid,
..Default::default()
},
trie: ExecutedTrieUpdates::empty(),
});
}
{
trace!(target: "reth::ress_provider", %block_hash, %ancestor_hash, "Using invalid ancestor block for witness construction");
executed = Some(ExecutedBlockWithTrieUpdates {
block: ExecutedBlock { recovered_block: invalid, ..Default::default() },
trie: ExecutedTrieUpdates::empty(),
});
}
let Some(executed) = executed else {

View File

@@ -144,11 +144,11 @@ where
{
// set permissions only on unix
use std::os::unix::fs::PermissionsExt;
if let Some(perms_str) = &self.cfg.ipc_socket_permissions {
if let Ok(mode) = u32::from_str_radix(&perms_str.replace("0o", ""), 8) {
let perms = std::fs::Permissions::from_mode(mode);
let _ = std::fs::set_permissions(&self.endpoint, perms);
}
if let Some(perms_str) = &self.cfg.ipc_socket_permissions &&
let Ok(mode) = u32::from_str_radix(&perms_str.replace("0o", ""), 8)
{
let perms = std::fs::Permissions::from_mode(mode);
let _ = std::fs::set_permissions(&self.endpoint, perms);
}
}
listener

View File

@@ -70,8 +70,15 @@ pub trait ReceiptConverter<N: NodePrimitives>: Debug + 'static {
/// A type that knows how to convert a consensus header into an RPC header.
pub trait HeaderConverter<Consensus, Rpc>: Debug + Send + Sync + Unpin + Clone + 'static {
/// An associated RPC conversion error.
type Err: error::Error;
/// Converts a consensus header into an RPC header.
fn convert_header(&self, header: SealedHeader<Consensus>, block_size: usize) -> Rpc;
fn convert_header(
&self,
header: SealedHeader<Consensus>,
block_size: usize,
) -> Result<Rpc, Self::Err>;
}
/// Default implementation of [`HeaderConverter`] that uses [`FromConsensusHeader`] to convert
@@ -80,8 +87,14 @@ impl<Consensus, Rpc> HeaderConverter<Consensus, Rpc> for ()
where
Rpc: FromConsensusHeader<Consensus>,
{
fn convert_header(&self, header: SealedHeader<Consensus>, block_size: usize) -> Rpc {
Rpc::from_consensus_header(header, block_size)
type Err = Infallible;
fn convert_header(
&self,
header: SealedHeader<Consensus>,
block_size: usize,
) -> Result<Rpc, Self::Err> {
Ok(Rpc::from_consensus_header(header, block_size))
}
}
@@ -205,10 +218,12 @@ pub trait IntoRpcTx<T> {
/// An additional context, usually [`TransactionInfo`] in a wrapper that carries some
/// implementation specific extra information.
type TxInfo;
/// An associated RPC conversion error.
type Err: error::Error;
/// Performs the conversion consuming `self` with `signer` and `tx_info`. See [`IntoRpcTx`]
/// for details.
fn into_rpc_tx(self, signer: Address, tx_info: Self::TxInfo) -> T;
fn into_rpc_tx(self, signer: Address, tx_info: Self::TxInfo) -> Result<T, Self::Err>;
}
/// Converts `T` into `self`. It is reciprocal of [`IntoRpcTx`].
@@ -222,23 +237,30 @@ pub trait IntoRpcTx<T> {
/// Prefer using [`IntoRpcTx`] over using [`FromConsensusTx`] when specifying trait bounds on a
/// generic function. This way, types that directly implement [`IntoRpcTx`] can be used as arguments
/// as well.
pub trait FromConsensusTx<T> {
pub trait FromConsensusTx<T>: Sized {
/// An additional context, usually [`TransactionInfo`] in a wrapper that carries some
/// implementation specific extra information.
type TxInfo;
/// An associated RPC conversion error.
type Err: error::Error;
/// Performs the conversion consuming `tx` with `signer` and `tx_info`. See [`FromConsensusTx`]
/// for details.
fn from_consensus_tx(tx: T, signer: Address, tx_info: Self::TxInfo) -> Self;
fn from_consensus_tx(tx: T, signer: Address, tx_info: Self::TxInfo) -> Result<Self, Self::Err>;
}
impl<TxIn: alloy_consensus::Transaction, T: alloy_consensus::Transaction + From<TxIn>>
FromConsensusTx<TxIn> for Transaction<T>
{
type TxInfo = TransactionInfo;
type Err = Infallible;
fn from_consensus_tx(tx: TxIn, signer: Address, tx_info: Self::TxInfo) -> Self {
Self::from_transaction(Recovered::new_unchecked(tx.into(), signer), tx_info)
fn from_consensus_tx(
tx: TxIn,
signer: Address,
tx_info: Self::TxInfo,
) -> Result<Self, Self::Err> {
Ok(Self::from_transaction(Recovered::new_unchecked(tx.into(), signer), tx_info))
}
}
@@ -246,10 +268,12 @@ impl<ConsensusTx, RpcTx> IntoRpcTx<RpcTx> for ConsensusTx
where
ConsensusTx: alloy_consensus::Transaction,
RpcTx: FromConsensusTx<Self>,
<RpcTx as FromConsensusTx<ConsensusTx>>::Err: Debug,
{
type TxInfo = RpcTx::TxInfo;
type Err = <RpcTx as FromConsensusTx<ConsensusTx>>::Err;
fn into_rpc_tx(self, signer: Address, tx_info: Self::TxInfo) -> RpcTx {
fn into_rpc_tx(self, signer: Address, tx_info: Self::TxInfo) -> Result<RpcTx, Self::Err> {
RpcTx::from_consensus_tx(self, signer, tx_info)
}
}
@@ -285,7 +309,7 @@ impl<Tx, RpcTx> RpcTxConverter<Tx, RpcTx, Tx::TxInfo> for ()
where
Tx: IntoRpcTx<RpcTx>,
{
type Err = Infallible;
type Err = Tx::Err;
fn convert_rpc_tx(
&self,
@@ -293,7 +317,7 @@ where
signer: Address,
tx_info: Tx::TxInfo,
) -> Result<RpcTx, Self::Err> {
Ok(tx.into_rpc_tx(signer, tx_info))
tx.into_rpc_tx(signer, tx_info)
}
}
@@ -893,6 +917,7 @@ where
+ From<TxEnv::Error>
+ From<<Map as TxInfoMapper<TxTy<N>>>::Err>
+ From<RpcTx::Err>
+ From<Header::Err>
+ Error
+ Unpin
+ Sync
@@ -924,7 +949,7 @@ where
let (tx, signer) = tx.into_parts();
let tx_info = self.mapper.try_map(&tx, tx_info)?;
Ok(self.rpc_tx_converter.convert_rpc_tx(tx, signer, tx_info)?)
self.rpc_tx_converter.convert_rpc_tx(tx, signer, tx_info).map_err(Into::into)
}
fn build_simulate_v1_transaction(
@@ -966,7 +991,7 @@ where
header: SealedHeaderFor<Self::Primitives>,
block_size: usize,
) -> Result<RpcHeader<Self::Network>, Self::Error> {
Ok(self.header_converter.convert_header(header, block_size))
Ok(self.header_converter.convert_header(header, block_size)?)
}
}
@@ -1016,9 +1041,14 @@ pub mod op {
for op_alloy_rpc_types::Transaction<T>
{
type TxInfo = OpTransactionInfo;
type Err = Infallible;
fn from_consensus_tx(tx: T, signer: Address, tx_info: Self::TxInfo) -> Self {
Self::from_transaction(Recovered::new_unchecked(tx, signer), tx_info)
fn from_consensus_tx(
tx: T,
signer: Address,
tx_info: Self::TxInfo,
) -> Result<Self, Self::Err> {
Ok(Self::from_transaction(Recovered::new_unchecked(tx, signer), tx_info))
}
}

View File

@@ -29,7 +29,10 @@ use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
use reth_tasks::TaskSpawner;
use reth_transaction_pool::TransactionPool;
use std::{sync::Arc, time::Instant};
use std::{
sync::Arc,
time::{Instant, SystemTime},
};
use tokio::sync::oneshot;
use tracing::{debug, trace, warn};
@@ -572,11 +575,10 @@ where
// > Client software MUST NOT return trailing null values if the request extends past the current latest known block.
// truncate the end if it's greater than the last block
if let Ok(best_block) = inner.provider.best_block_number() {
if end > best_block {
if let Ok(best_block) = inner.provider.best_block_number()
&& end > best_block {
end = best_block;
}
}
for num in start..=end {
let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
@@ -753,6 +755,15 @@ where
&self,
versioned_hashes: Vec<B256>,
) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
// Only allow this method before Osaka fork
let current_timestamp =
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
if self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
return Err(EngineApiError::EngineObjectValidationError(
reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
));
}
if versioned_hashes.len() > MAX_BLOB_LIMIT {
return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
}
@@ -788,6 +799,15 @@ where
&self,
versioned_hashes: Vec<B256>,
) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
// Check if Osaka fork is active
let current_timestamp =
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
return Err(EngineApiError::EngineObjectValidationError(
reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
));
}
if versioned_hashes.len() > MAX_BLOB_LIMIT {
return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
}

View File

@@ -195,16 +195,14 @@ pub trait EthBlocks:
}
if let Some(block_hash) =
self.provider().block_hash_for_id(block_id).map_err(Self::Error::from_eth_err)?
{
if let Some((block, receipts)) = self
self.provider().block_hash_for_id(block_id).map_err(Self::Error::from_eth_err)? &&
let Some((block, receipts)) = self
.cache()
.get_block_and_receipts(block_hash)
.await
.map_err(Self::Error::from_eth_err)?
{
return Ok(Some((block, receipts)));
}
{
return Ok(Some((block, receipts)));
}
Ok(None)

View File

@@ -122,14 +122,11 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
if let Some(block_overrides) = block_overrides {
// ensure we don't allow uncapped gas limit per block
if let Some(gas_limit_override) = block_overrides.gas_limit {
if gas_limit_override > evm_env.block_env.gas_limit &&
gas_limit_override > this.call_gas_limit()
{
return Err(
EthApiError::other(EthSimulateError::GasLimitReached).into()
)
}
if let Some(gas_limit_override) = block_overrides.gas_limit &&
gas_limit_override > evm_env.block_env.gas_limit &&
gas_limit_override > this.call_gas_limit()
{
return Err(EthApiError::other(EthSimulateError::GasLimitReached).into())
}
apply_block_overrides(block_overrides, &mut db, &mut evm_env.block_env);
}
@@ -163,7 +160,9 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
let ctx = this
.evm_config()
.context_for_next_block(&parent, this.next_env_attributes(&parent)?);
.context_for_next_block(&parent, this.next_env_attributes(&parent)?)
.map_err(RethError::other)
.map_err(Self::Error::from_eth_err)?;
let (result, results) = if trace_transfers {
// prepare inspector to capture transfer inside the evm so they are recorded
// and included in logs

View File

@@ -1,6 +1,6 @@
//! Loads chain configuration.
use alloy_consensus::{BlockHeader, Header};
use alloy_consensus::Header;
use alloy_eips::eip7910::{EthConfig, EthForkConfig, SystemContract};
use alloy_evm::precompiles::Precompile;
use alloy_primitives::Address;
@@ -12,9 +12,9 @@ use reth_node_api::NodePrimitives;
use reth_revm::db::EmptyDB;
use reth_rpc_eth_types::EthApiError;
use reth_storage_api::BlockReaderIdExt;
use revm::precompile::PrecompileId;
use std::{borrow::Borrow, collections::BTreeMap};
use std::collections::BTreeMap;
/// RPC endpoint support for [EIP-7910](https://eips.ethereum.org/EIPS/eip-7910)
#[cfg_attr(not(feature = "client"), rpc(server, namespace = "eth"))]
#[cfg_attr(feature = "client", rpc(server, client, namespace = "eth"))]
pub trait EthConfigApi {
@@ -89,16 +89,13 @@ where
.ok_or_else(|| ProviderError::BestBlockNotFound)?
.into_header();
// Short-circuit if Cancun is not active.
if !chain_spec.is_cancun_active_at_timestamp(latest.timestamp()) {
return Err(RethError::msg("cancun has not been activated"))
}
let current_precompiles =
evm_to_precompiles_map(self.evm_config.evm_for_block(EmptyDB::default(), &latest));
let current_precompiles = evm_to_precompiles_map(
self.evm_config.evm_for_block(EmptyDB::default(), &latest).map_err(RethError::other)?,
);
let mut fork_timestamps =
chain_spec.forks_iter().filter_map(|(_, cond)| cond.as_timestamp()).collect::<Vec<_>>();
fork_timestamps.sort_unstable();
fork_timestamps.dedup();
let (current_fork_idx, current_fork_timestamp) = fork_timestamps
@@ -115,34 +112,38 @@ where
let mut config = EthConfig { current, next: None, last: None };
if let Some(last_fork_idx) = current_fork_idx.checked_sub(1) {
if let Some(last_fork_timestamp) = fork_timestamps.get(last_fork_idx).copied() {
let fake_header = {
let mut header = latest.clone();
header.timestamp = last_fork_timestamp;
header
};
let last_precompiles = evm_to_precompiles_map(
self.evm_config.evm_for_block(EmptyDB::default(), &fake_header),
);
config.last = self.build_fork_config_at(last_fork_timestamp, last_precompiles);
}
}
if let Some(next_fork_timestamp) = fork_timestamps.get(current_fork_idx + 1).copied() {
let fake_header = {
let mut header = latest;
let mut header = latest.clone();
header.timestamp = next_fork_timestamp;
header
};
let next_precompiles = evm_to_precompiles_map(
self.evm_config.evm_for_block(EmptyDB::default(), &fake_header),
self.evm_config
.evm_for_block(EmptyDB::default(), &fake_header)
.map_err(RethError::other)?,
);
config.next = self.build_fork_config_at(next_fork_timestamp, next_precompiles);
} else {
// If there is no fork scheduled, there is no "last" or "final" fork scheduled.
return Ok(config);
}
let last_fork_timestamp = fork_timestamps.last().copied().unwrap();
let fake_header = {
let mut header = latest;
header.timestamp = last_fork_timestamp;
header
};
let last_precompiles = evm_to_precompiles_map(
self.evm_config
.evm_for_block(EmptyDB::default(), &fake_header)
.map_err(RethError::other)?,
);
config.last = self.build_fork_config_at(last_fork_timestamp, last_precompiles);
Ok(config)
}
}
@@ -166,33 +167,7 @@ fn evm_to_precompiles_map(
precompiles
.addresses()
.filter_map(|address| {
Some((precompile_to_str(precompiles.get(address)?.precompile_id()), *address))
Some((precompiles.get(address)?.precompile_id().name().to_string(), *address))
})
.collect()
}
// TODO: move
fn precompile_to_str(id: &PrecompileId) -> String {
let str = match id {
PrecompileId::EcRec => "ECREC",
PrecompileId::Sha256 => "SHA256",
PrecompileId::Ripemd160 => "RIPEMD160",
PrecompileId::Identity => "ID",
PrecompileId::ModExp => "MODEXP",
PrecompileId::Bn254Add => "BN254_ADD",
PrecompileId::Bn254Mul => "BN254_MUL",
PrecompileId::Bn254Pairing => "BN254_PAIRING",
PrecompileId::Blake2F => "BLAKE2F",
PrecompileId::KzgPointEvaluation => "KZG_POINT_EVALUATION",
PrecompileId::Bls12G1Add => "BLS12_G1ADD",
PrecompileId::Bls12G1Msm => "BLS12_G1MSM",
PrecompileId::Bls12G2Add => "BLS12_G2ADD",
PrecompileId::Bls12G2Msm => "BLS12_G2MSM",
PrecompileId::Bls12Pairing => "BLS12_PAIRING_CHECK",
PrecompileId::Bls12MapFpToGp1 => "BLS12_MAP_FP_TO_G1",
PrecompileId::Bls12MapFp2ToGp2 => "BLS12_MAP_FP2_TO_G2",
PrecompileId::P256Verify => "P256_VERIFY",
PrecompileId::Custom(custom) => custom.borrow(),
};
str.to_owned()
}

View File

@@ -88,14 +88,14 @@ pub trait EstimateCall: Call {
let mut tx_env = self.create_txn_env(&evm_env, request, &mut db)?;
// Check if this is a basic transfer (no input data to account with no code)
let mut is_basic_transfer = false;
if tx_env.input().is_empty() {
if let TxKind::Call(to) = tx_env.kind() {
if let Ok(code) = db.db.account_code(&to) {
is_basic_transfer = code.map(|code| code.is_empty()).unwrap_or(true);
}
}
}
let is_basic_transfer = if tx_env.input().is_empty() &&
let TxKind::Call(to) = tx_env.kind() &&
let Ok(code) = db.db.account_code(&to)
{
code.map(|code| code.is_empty()).unwrap_or(true)
} else {
false
};
// Check funds of the sender (only useful to check if transaction gas price is more than 0).
//
@@ -123,10 +123,10 @@ pub trait EstimateCall: Call {
min_tx_env.set_gas_limit(MIN_TRANSACTION_GAS);
// Reuse the same EVM instance
if let Ok(res) = evm.transact(min_tx_env).map_err(Self::Error::from_evm_err) {
if res.result.is_success() {
return Ok(U256::from(MIN_TRANSACTION_GAS))
}
if let Ok(res) = evm.transact(min_tx_env).map_err(Self::Error::from_evm_err) &&
res.result.is_success()
{
return Ok(U256::from(MIN_TRANSACTION_GAS))
}
}

View File

@@ -109,10 +109,10 @@ pub trait EthFees:
// need to validate that they are monotonically
// increasing and 0 <= p <= 100
// Note: The types used ensure that the percentiles are never < 0
if let Some(percentiles) = &reward_percentiles {
if percentiles.windows(2).any(|w| w[0] > w[1] || w[0] > 100.) {
return Err(EthApiError::InvalidRewardPercentiles.into())
}
if let Some(percentiles) = &reward_percentiles &&
percentiles.windows(2).any(|w| w[0] > w[1] || w[0] > 100.)
{
return Err(EthApiError::InvalidRewardPercentiles.into())
}
// Fetch the headers and ensure we got all of them

View File

@@ -72,22 +72,25 @@ pub trait LoadPendingBlock:
>,
Self::Error,
> {
if let Some(block) = self.provider().pending_block().map_err(Self::Error::from_eth_err)? {
if let Some(receipts) = self
if let Some(block) = self.provider().pending_block().map_err(Self::Error::from_eth_err)? &&
let Some(receipts) = self
.provider()
.receipts_by_block(block.hash().into())
.map_err(Self::Error::from_eth_err)?
{
// Note: for the PENDING block we assume it is past the known merge block and
// thus this will not fail when looking up the total
// difficulty value for the blockenv.
let evm_env = self.evm_config().evm_env(block.header());
{
// Note: for the PENDING block we assume it is past the known merge block and
// thus this will not fail when looking up the total
// difficulty value for the blockenv.
let evm_env = self
.evm_config()
.evm_env(block.header())
.map_err(RethError::other)
.map_err(Self::Error::from_eth_err)?;
return Ok(PendingBlockEnv::new(
evm_env,
PendingBlockEnvOrigin::ActualPending(Arc::new(block), Arc::new(receipts)),
));
}
return Ok(PendingBlockEnv::new(
evm_env,
PendingBlockEnvOrigin::ActualPending(Arc::new(block), Arc::new(receipts)),
));
}
// no pending block from the CL yet, so we use the latest block and modify the env
@@ -309,21 +312,21 @@ pub trait LoadPendingBlock:
// There's only limited amount of blob space available per block, so we need to
// check if the EIP-4844 can still fit in the block
if let Some(tx_blob_gas) = tx.blob_gas_used() {
if sum_blob_gas_used + tx_blob_gas > blob_params.max_blob_gas_per_block() {
// we can't fit this _blob_ transaction into the block, so we mark it as
// invalid, which removes its dependent transactions from
// the iterator. This is similar to the gas limit condition
// for regular transactions above.
best_txs.mark_invalid(
&pool_tx,
InvalidPoolTransactionError::ExceedsGasLimit(
tx_blob_gas,
blob_params.max_blob_gas_per_block(),
),
);
continue
}
if let Some(tx_blob_gas) = tx.blob_gas_used() &&
sum_blob_gas_used + tx_blob_gas > blob_params.max_blob_gas_per_block()
{
// we can't fit this _blob_ transaction into the block, so we mark it as
// invalid, which removes its dependent transactions from
// the iterator. This is similar to the gas limit condition
// for regular transactions above.
best_txs.mark_invalid(
&pool_tx,
InvalidPoolTransactionError::ExceedsGasLimit(
tx_blob_gas,
blob_params.max_blob_gas_per_block(),
),
);
continue
}
let gas_used = match builder.execute_transaction(tx.clone()) {

View File

@@ -221,10 +221,10 @@ pub trait LoadState:
Self: SpawnBlocking,
{
async move {
if at.is_pending() {
if let Ok(Some(state)) = self.local_pending_state().await {
return Ok(state)
}
if at.is_pending() &&
let Ok(Some(state)) = self.local_pending_state().await
{
return Ok(state)
}
self.provider().state_by_block_id(at).map_err(Self::Error::from_eth_err)
@@ -281,7 +281,11 @@ pub trait LoadState:
let header =
self.cache().get_header(block_hash).await.map_err(Self::Error::from_eth_err)?;
let evm_env = self.evm_config().evm_env(&header);
let evm_env = self
.evm_config()
.evm_env(&header)
.map_err(RethError::other)
.map_err(Self::Error::from_eth_err)?;
Ok((evm_env, block_hash.into()))
}

View File

@@ -92,10 +92,10 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
while let Some(notification) = stream.next().await {
let chain = notification.committed();
for block in chain.blocks_iter() {
if block.body().contains_transaction(&hash) {
if let Some(receipt) = this.transaction_receipt(hash).await? {
return Ok(receipt);
}
if block.body().contains_transaction(&hash) &&
let Some(receipt) = this.transaction_receipt(hash).await?
{
return Ok(receipt);
}
}
}
@@ -294,13 +294,12 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
{
async move {
// Check the pool first
if include_pending {
if let Some(tx) =
if include_pending &&
let Some(tx) =
RpcNodeCore::pool(self).get_transaction_by_sender_and_nonce(sender, nonce)
{
let transaction = tx.transaction.clone_into_consensus();
return Ok(Some(self.tx_resp_builder().fill_pending(transaction)?));
}
{
let transaction = tx.transaction.clone_into_consensus();
return Ok(Some(self.tx_resp_builder().fill_pending(transaction)?));
}
// Check if the sender is a contract
@@ -370,10 +369,10 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
Self: LoadBlock,
{
async move {
if let Some(block) = self.recovered_block(block_id).await? {
if let Some(tx) = block.body().transactions().get(index) {
return Ok(Some(tx.encoded_2718().into()))
}
if let Some(block) = self.recovered_block(block_id).await? &&
let Some(tx) = block.body().transactions().get(index)
{
return Ok(Some(tx.encoded_2718().into()))
}
Ok(None)

View File

@@ -100,11 +100,11 @@ where
{
let size = value.size();
if self.cache.limiter().is_over_the_limit(self.cache.len() + 1) {
if let Some((_, evicted)) = self.cache.pop_oldest() {
// update tracked memory with the evicted value
self.memory_usage = self.memory_usage.saturating_sub(evicted.size());
}
if self.cache.limiter().is_over_the_limit(self.cache.len() + 1) &&
let Some((_, evicted)) = self.cache.pop_oldest()
{
// update tracked memory with the evicted value
self.memory_usage = self.memory_usage.saturating_sub(evicted.size());
}
if self.cache.insert(key, value) {

View File

@@ -234,13 +234,13 @@ pub async fn fee_history_cache_new_blocks_task<St, Provider, N>(
let mut fetch_missing_block = Fuse::terminated();
loop {
if fetch_missing_block.is_terminated() {
if let Some(block_number) = missing_blocks.pop_front() {
trace!(target: "rpc::fee", ?block_number, "Fetching missing block for fee history cache");
if let Ok(Some(hash)) = provider.block_hash(block_number) {
// fetch missing block
fetch_missing_block = cache.get_block_and_receipts(hash).boxed().fuse();
}
if fetch_missing_block.is_terminated() &&
let Some(block_number) = missing_blocks.pop_front()
{
trace!(target: "rpc::fee", ?block_number, "Fetching missing block for fee history cache");
if let Ok(Some(hash)) = provider.block_hash(block_number) {
// fetch missing block
fetch_missing_block = cache.get_block_and_receipts(hash).boxed().fuse();
}
}

View File

@@ -204,10 +204,10 @@ where
};
// constrain to the max price
if let Some(max_price) = self.oracle_config.max_price {
if price > max_price {
price = max_price;
}
if let Some(max_price) = self.oracle_config.max_price &&
price > max_price
{
price = max_price;
}
inner.last_price = GasPriceOracleResult { block_hash: header.hash(), price };
@@ -254,10 +254,10 @@ where
};
// ignore transactions with a tip under the configured threshold
if let Some(ignore_under) = self.ignore_price {
if effective_tip < Some(ignore_under) {
continue
}
if let Some(ignore_under) = self.ignore_price &&
effective_tip < Some(ignore_under)
{
continue
}
// check if the sender was the coinbase, if so, ignore
@@ -338,10 +338,10 @@ where
}
// constrain to the max price
if let Some(max_price) = self.oracle_config.max_price {
if suggestion > max_price {
suggestion = max_price;
}
if let Some(max_price) = self.oracle_config.max_price &&
suggestion > max_price
{
suggestion = max_price;
}
inner.last_price = GasPriceOracleResult { block_hash: header.hash(), price: suggestion };

View File

@@ -18,6 +18,7 @@ use alloy_rpc_types_trace::geth::{
use async_trait::async_trait;
use jsonrpsee::core::RpcResult;
use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
use reth_errors::RethError;
use reth_evm::{execute::Executor, ConfigureEvm, EvmEnvFor, TxEnvFor};
use reth_primitives_traits::{Block as _, BlockBody, ReceiptWithBloom, RecoveredBlock};
use reth_revm::{
@@ -151,7 +152,12 @@ where
.map_err(BlockError::RlpDecodeRawBlock)
.map_err(Eth::Error::from_eth_err)?;
let evm_env = self.eth_api().evm_config().evm_env(block.header());
let evm_env = self
.eth_api()
.evm_config()
.evm_env(block.header())
.map_err(RethError::other)
.map_err(Eth::Error::from_eth_err)?;
// Depending on EIP-2 we need to recover the transactions differently
let senders =

View File

@@ -501,11 +501,11 @@ where
.transpose()?
.flatten();
if let Some(f) = from {
if f > info.best_number {
// start block higher than local head, can return empty
return Ok(Vec::new());
}
if let Some(f) = from &&
f > info.best_number
{
// start block higher than local head, can return empty
return Ok(Vec::new());
}
let (from_block_number, to_block_number) =
@@ -658,22 +658,23 @@ where
// size check but only if range is multiple blocks, so we always return all
// logs of a single block
let is_multi_block_range = from_block != to_block;
if let Some(max_logs_per_response) = limits.max_logs_per_response {
if is_multi_block_range && all_logs.len() > max_logs_per_response {
debug!(
target: "rpc::eth::filter",
logs_found = all_logs.len(),
max_logs_per_response,
from_block,
to_block = num_hash.number.saturating_sub(1),
"Query exceeded max logs per response limit"
);
return Err(EthFilterError::QueryExceedsMaxResults {
max_logs: max_logs_per_response,
from_block,
to_block: num_hash.number.saturating_sub(1),
});
}
if let Some(max_logs_per_response) = limits.max_logs_per_response &&
is_multi_block_range &&
all_logs.len() > max_logs_per_response
{
debug!(
target: "rpc::eth::filter",
logs_found = all_logs.len(),
max_logs_per_response,
from_block,
to_block = num_hash.number.saturating_sub(1),
"Query exceeded max logs per response limit"
);
return Err(EthFilterError::QueryExceedsMaxResults {
max_logs: max_logs_per_response,
from_block,
to_block: num_hash.number.saturating_sub(1),
});
}
}

View File

@@ -490,14 +490,14 @@ where
let mut maybe_traces =
maybe_traces.map(|traces| traces.into_iter().flatten().collect::<Vec<_>>());
if let (Some(block), Some(traces)) = (maybe_block, maybe_traces.as_mut()) {
if let Some(base_block_reward) = self.calculate_base_block_reward(block.header())? {
traces.extend(self.extract_reward_traces(
block.header(),
block.body().ommers(),
base_block_reward,
));
}
if let (Some(block), Some(traces)) = (maybe_block, maybe_traces.as_mut()) &&
let Some(base_block_reward) = self.calculate_base_block_reward(block.header())?
{
traces.extend(self.extract_reward_traces(
block.header(),
block.body().ommers(),
base_block_reward,
));
}
Ok(maybe_traces)

View File

@@ -143,10 +143,10 @@ where
if self.disallow.contains(sender) {
return Err(ValidationApiError::Blacklist(*sender))
}
if let Some(to) = tx.to() {
if self.disallow.contains(&to) {
return Err(ValidationApiError::Blacklist(to))
}
if let Some(to) = tx.to() &&
self.disallow.contains(&to)
{
return Err(ValidationApiError::Blacklist(to))
}
}
}
@@ -334,10 +334,10 @@ where
return Err(ValidationApiError::ProposerPayment)
}
if let Some(block_base_fee) = block.header().base_fee_per_gas() {
if tx.effective_tip_per_gas(block_base_fee).unwrap_or_default() != 0 {
return Err(ValidationApiError::ProposerPayment)
}
if let Some(block_base_fee) = block.header().base_fee_per_gas() &&
tx.effective_tip_per_gas(block_base_fee).unwrap_or_default() != 0
{
return Err(ValidationApiError::ProposerPayment)
}
Ok(())

View File

@@ -73,16 +73,15 @@ impl<Provider> StageSetBuilder<Provider> {
fn upsert_stage_state(&mut self, stage: Box<dyn Stage<Provider>>, added_at_index: usize) {
let stage_id = stage.id();
if self.stages.insert(stage.id(), StageEntry { stage, enabled: true }).is_some() {
if let Some(to_remove) = self
if self.stages.insert(stage.id(), StageEntry { stage, enabled: true }).is_some() &&
let Some(to_remove) = self
.order
.iter()
.enumerate()
.find(|(i, id)| *i != added_at_index && **id == stage_id)
.map(|(i, _)| i)
{
self.order.remove(to_remove);
}
{
self.order.remove(to_remove);
}
}
@@ -264,10 +263,10 @@ impl<Provider> StageSetBuilder<Provider> {
pub fn build(mut self) -> Vec<Box<dyn Stage<Provider>>> {
let mut stages = Vec::new();
for id in &self.order {
if let Some(entry) = self.stages.remove(id) {
if entry.enabled {
stages.push(entry.stage);
}
if let Some(entry) = self.stages.remove(id) &&
entry.enabled
{
stages.push(entry.stage);
}
}
stages

View File

@@ -702,11 +702,10 @@ mod tests {
// Validate sequentiality only after prev progress,
// since the data before is mocked and can contain gaps
if number > prev_progress {
if let Some(prev_key) = prev_number {
if number > prev_progress
&& let Some(prev_key) = prev_number {
assert_eq!(prev_key + 1, number, "Body entries must be sequential");
}
}
// Validate that the current entry is below or equals to the highest allowed block
assert!(

View File

@@ -150,18 +150,17 @@ where
return Poll::Ready(Ok(()));
}
if self.stream.is_none() {
if let Some(source) = self.source.clone() {
self.stream.replace(source.create(input)?);
}
if self.stream.is_none() &&
let Some(source) = self.source.clone()
{
self.stream.replace(source.create(input)?);
}
if let Some(stream) = &mut self.stream {
if let Some(next) = ready!(stream.poll_next_unpin(cx))
if let Some(stream) = &mut self.stream &&
let Some(next) = ready!(stream.poll_next_unpin(cx))
.transpose()
.map_err(|e| StageError::Fatal(e.into()))?
{
self.item.replace(next);
}
{
self.item.replace(next);
}
Poll::Ready(Ok(()))
@@ -546,11 +545,10 @@ mod tests {
// Validate sequentiality only after prev progress,
// since the data before is mocked and can contain gaps
if number > prev_progress {
if let Some(prev_key) = prev_number {
if number > prev_progress
&& let Some(prev_key) = prev_number {
assert_eq!(prev_key + 1, number, "Body entries must be sequential");
}
}
// Validate that the current entry is below or equals to the highest allowed block
assert!(

View File

@@ -145,19 +145,18 @@ where
let mut cursor_header_numbers =
provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
let mut first_sync = false;
// If we only have the genesis block hash, then we are at first sync, and we can remove it,
// add it to the collector and use tx.append on all hashes.
if provider.tx_ref().entries::<RawTable<tables::HeaderNumbers>>()? == 1 {
if let Some((hash, block_number)) = cursor_header_numbers.last()? {
if block_number.value()? == 0 {
self.hash_collector.insert(hash.key()?, 0)?;
cursor_header_numbers.delete_current()?;
first_sync = true;
}
}
}
let first_sync = if provider.tx_ref().entries::<RawTable<tables::HeaderNumbers>>()? == 1 &&
let Some((hash, block_number)) = cursor_header_numbers.last()? &&
block_number.value()? == 0
{
self.hash_collector.insert(hash.key()?, 0)?;
cursor_header_numbers.delete_current()?;
true
} else {
false
};
// Since ETL sorts all entries by hashes, we are either appending (first sync) or inserting
// in order (further syncs).

View File

@@ -67,23 +67,22 @@ where
)
})
.transpose()?
.flatten()
.flatten() &&
target_prunable_block > input.checkpoint().block_number
{
if target_prunable_block > input.checkpoint().block_number {
input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
// Save prune checkpoint only if we don't have one already.
// Otherwise, pruner may skip the unpruned range of blocks.
if provider.get_prune_checkpoint(PruneSegment::AccountHistory)?.is_none() {
provider.save_prune_checkpoint(
PruneSegment::AccountHistory,
PruneCheckpoint {
block_number: Some(target_prunable_block),
tx_number: None,
prune_mode,
},
)?;
}
// Save prune checkpoint only if we don't have one already.
// Otherwise, pruner may skip the unpruned range of blocks.
if provider.get_prune_checkpoint(PruneSegment::AccountHistory)?.is_none() {
provider.save_prune_checkpoint(
PruneSegment::AccountHistory,
PruneCheckpoint {
block_number: Some(target_prunable_block),
tx_number: None,
prune_mode,
},
)?;
}
}

View File

@@ -70,23 +70,22 @@ where
)
})
.transpose()?
.flatten()
.flatten() &&
target_prunable_block > input.checkpoint().block_number
{
if target_prunable_block > input.checkpoint().block_number {
input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
// Save prune checkpoint only if we don't have one already.
// Otherwise, pruner may skip the unpruned range of blocks.
if provider.get_prune_checkpoint(PruneSegment::StorageHistory)?.is_none() {
provider.save_prune_checkpoint(
PruneSegment::StorageHistory,
PruneCheckpoint {
block_number: Some(target_prunable_block),
tx_number: None,
prune_mode,
},
)?;
}
// Save prune checkpoint only if we don't have one already.
// Otherwise, pruner may skip the unpruned range of blocks.
if provider.get_prune_checkpoint(PruneSegment::StorageHistory)?.is_none() {
provider.save_prune_checkpoint(
PruneSegment::StorageHistory,
PruneCheckpoint {
block_number: Some(target_prunable_block),
tx_number: None,
prune_mode,
},
)?;
}
}

View File

@@ -88,28 +88,27 @@ where
)
})
.transpose()?
.flatten()
.flatten() &&
target_prunable_block > input.checkpoint().block_number
{
if target_prunable_block > input.checkpoint().block_number {
input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
// Save prune checkpoint only if we don't have one already.
// Otherwise, pruner may skip the unpruned range of blocks.
if provider.get_prune_checkpoint(PruneSegment::TransactionLookup)?.is_none() {
let target_prunable_tx_number = provider
.block_body_indices(target_prunable_block)?
.ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))?
.last_tx_num();
// Save prune checkpoint only if we don't have one already.
// Otherwise, pruner may skip the unpruned range of blocks.
if provider.get_prune_checkpoint(PruneSegment::TransactionLookup)?.is_none() {
let target_prunable_tx_number = provider
.block_body_indices(target_prunable_block)?
.ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))?
.last_tx_num();
provider.save_prune_checkpoint(
PruneSegment::TransactionLookup,
PruneCheckpoint {
block_number: Some(target_prunable_block),
tx_number: Some(target_prunable_tx_number),
prune_mode,
},
)?;
}
provider.save_prune_checkpoint(
PruneSegment::TransactionLookup,
PruneCheckpoint {
block_number: Some(target_prunable_block),
tx_number: Some(target_prunable_tx_number),
prune_mode,
},
)?;
}
}
if input.target_reached() {
@@ -213,10 +212,10 @@ where
// Delete all transactions that belong to this block
for tx_id in body.tx_num_range() {
// First delete the transaction and hash to id mapping
if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? {
if tx_hash_number_cursor.seek_exact(transaction.trie_hash())?.is_some() {
tx_hash_number_cursor.delete_current()?;
}
if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? &&
tx_hash_number_cursor.seek_exact(transaction.trie_hash())?.is_some()
{
tx_hash_number_cursor.delete_current()?;
}
}
}
@@ -538,11 +537,10 @@ mod tests {
})
.transpose()
.expect("prune target block for transaction lookup")
.flatten()
.flatten() &&
target_prunable_block > input.checkpoint().block_number
{
if target_prunable_block > input.checkpoint().block_number {
input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
}
input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
}
let start_block = input.next_block();
let end_block = output.checkpoint.block_number;

View File

@@ -156,12 +156,11 @@ where
// If it's not the first sync, there might an existing shard already, so we need to
// merge it with the one coming from the collector
if !append_only {
if let Some((_, last_database_shard)) =
if !append_only &&
let Some((_, last_database_shard)) =
write_cursor.seek_exact(sharded_key_factory(current_partial, u64::MAX))?
{
current_list.extend(last_database_shard.iter());
}
{
current_list.extend(last_database_shard.iter());
}
}
@@ -265,10 +264,10 @@ where
// To be extra safe, we make sure that the last tx num matches the last block from its indices.
// If not, get it.
loop {
if let Some(indices) = provider.block_body_indices(last_block)? {
if indices.last_tx_num() <= last_tx_num {
break
}
if let Some(indices) = provider.block_body_indices(last_block)? &&
indices.last_tx_num() <= last_tx_num
{
break
}
if last_block == 0 {
break

View File

@@ -23,11 +23,11 @@ pub fn maybe_generate_tests(
let mut iter = args.into_iter().peekable();
// we check if there's a crate argument which is used from inside the codecs crate directly
if let Some(arg) = iter.peek() {
if arg.to_string() == "crate" {
is_crate = true;
iter.next();
}
if let Some(arg) = iter.peek() &&
arg.to_string() == "crate"
{
is_crate = true;
iter.next();
}
for arg in iter {

View File

@@ -171,28 +171,14 @@ fn load_field_from_segments(
///
/// If so, we use another impl to code/decode its data.
fn should_use_alt_impl(ftype: &str, segment: &syn::PathSegment) -> bool {
if ftype == "Vec" || ftype == "Option" {
if let syn::PathArguments::AngleBracketed(ref args) = segment.arguments {
if let Some(syn::GenericArgument::Type(syn::Type::Path(arg_path))) = args.args.last() {
if let (Some(path), 1) =
(arg_path.path.segments.first(), arg_path.path.segments.len())
{
if [
"B256",
"Address",
"Address",
"Bloom",
"TxHash",
"BlockHash",
"CompactPlaceholder",
]
.contains(&path.ident.to_string().as_str())
{
return true
}
}
}
}
if (ftype == "Vec" || ftype == "Option") &&
let syn::PathArguments::AngleBracketed(ref args) = segment.arguments &&
let Some(syn::GenericArgument::Type(syn::Type::Path(arg_path))) = args.args.last() &&
let (Some(path), 1) = (arg_path.path.segments.first(), arg_path.path.segments.len()) &&
["B256", "Address", "Address", "Bloom", "TxHash", "BlockHash", "CompactPlaceholder"]
.contains(&path.ident.to_string().as_str())
{
return true
}
false
}

View File

@@ -69,8 +69,8 @@ pub fn derive_zstd(input: TokenStream) -> TokenStream {
let mut decompressor = None;
for attr in &input.attrs {
if attr.path().is_ident("reth_zstd") {
if let Err(err) = attr.parse_nested_meta(|meta| {
if attr.path().is_ident("reth_zstd") &&
let Err(err) = attr.parse_nested_meta(|meta| {
if meta.path.is_ident("compressor") {
let value = meta.value()?;
let path: syn::Path = value.parse()?;
@@ -83,9 +83,9 @@ pub fn derive_zstd(input: TokenStream) -> TokenStream {
return Err(meta.error("unsupported attribute"))
}
Ok(())
}) {
return err.to_compile_error().into()
}
})
{
return err.to_compile_error().into()
}
}

View File

@@ -44,17 +44,18 @@ impl StorageLock {
#[cfg(any(test, not(feature = "disable-lock")))]
fn try_acquire_file_lock(path: &Path) -> Result<Self, StorageLockError> {
let file_path = path.join(LOCKFILE_NAME);
if let Some(process_lock) = ProcessUID::parse(&file_path)? {
if process_lock.pid != (process::id() as usize) && process_lock.is_active() {
reth_tracing::tracing::error!(
target: "reth::db::lockfile",
path = ?file_path,
pid = process_lock.pid,
start_time = process_lock.start_time,
"Storage lock already taken."
);
return Err(StorageLockError::Taken(process_lock.pid))
}
if let Some(process_lock) = ProcessUID::parse(&file_path)? &&
process_lock.pid != (process::id() as usize) &&
process_lock.is_active()
{
reth_tracing::tracing::error!(
target: "reth::db::lockfile",
path = ?file_path,
pid = process_lock.pid,
start_time = process_lock.start_time,
"Storage lock already taken."
);
return Err(StorageLockError::Taken(process_lock.pid))
}
Ok(Self(Arc::new(StorageLockInner::new(file_path)?)))
@@ -141,15 +142,15 @@ impl ProcessUID {
/// Parses [`Self`] from a file.
fn parse(path: &Path) -> Result<Option<Self>, StorageLockError> {
if path.exists() {
if let Ok(contents) = reth_fs_util::read_to_string(path) {
let mut lines = contents.lines();
if let (Some(Ok(pid)), Some(Ok(start_time))) = (
lines.next().map(str::trim).map(str::parse),
lines.next().map(str::trim).map(str::parse),
) {
return Ok(Some(Self { pid, start_time }));
}
if path.exists() &&
let Ok(contents) = reth_fs_util::read_to_string(path)
{
let mut lines = contents.lines();
if let (Some(Ok(pid)), Some(Ok(start_time))) = (
lines.next().map(str::trim).map(str::parse),
lines.next().map(str::trim).map(str::parse),
) {
return Ok(Some(Self { pid, start_time }));
}
}
Ok(None)

View File

@@ -33,25 +33,22 @@ pub fn iter_static_files(path: &Path) -> Result<SortedStaticFiles, NippyJarError
.map_err(|err| NippyJarError::Custom(err.to_string()))?
.filter_map(Result::ok);
for entry in entries {
if entry.metadata().is_ok_and(|metadata| metadata.is_file()) {
if let Some((segment, _)) =
if entry.metadata().is_ok_and(|metadata| metadata.is_file()) &&
let Some((segment, _)) =
StaticFileSegment::parse_filename(&entry.file_name().to_string_lossy())
{
let jar = NippyJar::<SegmentHeader>::load(&entry.path())?;
{
let jar = NippyJar::<SegmentHeader>::load(&entry.path())?;
let (block_range, tx_range) = (
jar.user_header().block_range().copied(),
jar.user_header().tx_range().copied(),
);
let (block_range, tx_range) =
(jar.user_header().block_range().copied(), jar.user_header().tx_range().copied());
if let Some(block_range) = block_range {
match static_files.entry(segment) {
Entry::Occupied(mut entry) => {
entry.get_mut().push((block_range, tx_range));
}
Entry::Vacant(entry) => {
entry.insert(vec![(block_range, tx_range)]);
}
if let Some(block_range) = block_range {
match static_files.entry(segment) {
Entry::Occupied(mut entry) => {
entry.get_mut().push((block_range, tx_range));
}
Entry::Vacant(entry) => {
entry.insert(vec![(block_range, tx_range)]);
}
}
}

View File

@@ -17,7 +17,7 @@ pub trait TableObject: Sized {
_: *const ffi::MDBX_txn,
data_val: ffi::MDBX_val,
) -> Result<Self, Error> {
let s = slice::from_raw_parts(data_val.iov_base as *const u8, data_val.iov_len);
let s = unsafe { slice::from_raw_parts(data_val.iov_base as *const u8, data_val.iov_len) };
Self::decode(s)
}
}
@@ -32,7 +32,7 @@ impl TableObject for Cow<'_, [u8]> {
_txn: *const ffi::MDBX_txn,
data_val: ffi::MDBX_val,
) -> Result<Self, Error> {
let s = slice::from_raw_parts(data_val.iov_base as *const u8, data_val.iov_len);
let s = unsafe { slice::from_raw_parts(data_val.iov_base as *const u8, data_val.iov_len) };
#[cfg(feature = "return-borrowed")]
{

View File

@@ -476,7 +476,7 @@ impl Transaction<RW> {
/// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi
/// BEFORE calling this function.
pub unsafe fn drop_db(&self, db: Database) -> Result<()> {
mdbx_result(self.txn_execute(|txn| ffi::mdbx_drop(txn, db.dbi(), true))?)?;
mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, db.dbi(), true) })?)?;
Ok(())
}
@@ -489,7 +489,7 @@ impl Transaction<RO> {
/// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi
/// BEFORE calling this function.
pub unsafe fn close_db(&self, db: Database) -> Result<()> {
mdbx_result(ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi()))?;
mdbx_result(unsafe { ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi()) })?;
Ok(())
}

View File

@@ -309,10 +309,10 @@ impl<H: NippyJarHeader> NippyJar<H> {
return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
}
if let Some(compression) = &self.compressor {
if !compression.is_ready() {
return Err(NippyJarError::CompressorNotReady)
}
if let Some(compression) = &self.compressor &&
!compression.is_ready()
{
return Err(NippyJarError::CompressorNotReady)
}
Ok(())

View File

@@ -404,10 +404,10 @@ impl<H: NippyJarHeader> NippyJarWriter<H> {
// Appends new offsets to disk
for offset in self.offsets.drain(..) {
if let Some(last_offset_ondisk) = last_offset_ondisk.take() {
if last_offset_ondisk == offset {
continue
}
if let Some(last_offset_ondisk) = last_offset_ondisk.take() &&
last_offset_ondisk == offset
{
continue
}
self.offsets_file.write_all(&offset.to_le_bytes())?;
}

View File

@@ -594,10 +594,10 @@ impl<N: ProviderNodeTypes> StateProviderFactory for BlockchainProvider<N> {
}
fn pending_state_by_hash(&self, block_hash: B256) -> ProviderResult<Option<StateProviderBox>> {
if let Some(pending) = self.canonical_in_memory_state.pending_state() {
if pending.hash() == block_hash {
return Ok(Some(Box::new(self.block_state_provider(&pending)?)));
}
if let Some(pending) = self.canonical_in_memory_state.pending_state() &&
pending.hash() == block_hash
{
return Ok(Some(Box::new(self.block_state_provider(&pending)?)));
}
Ok(None)
}
@@ -965,26 +965,26 @@ mod tests {
) {
let hook_provider = provider.clone();
provider.database.db_ref().set_post_transaction_hook(Box::new(move || {
if let Some(state) = hook_provider.canonical_in_memory_state.head_state() {
if state.anchor().number + 1 == block_number {
let mut lowest_memory_block =
state.parent_state_chain().last().expect("qed").block();
let num_hash = lowest_memory_block.recovered_block().num_hash();
if let Some(state) = hook_provider.canonical_in_memory_state.head_state() &&
state.anchor().number + 1 == block_number
{
let mut lowest_memory_block =
state.parent_state_chain().last().expect("qed").block();
let num_hash = lowest_memory_block.recovered_block().num_hash();
let mut execution_output = (*lowest_memory_block.execution_output).clone();
execution_output.first_block = lowest_memory_block.recovered_block().number;
lowest_memory_block.execution_output = Arc::new(execution_output);
let mut execution_output = (*lowest_memory_block.execution_output).clone();
execution_output.first_block = lowest_memory_block.recovered_block().number;
lowest_memory_block.execution_output = Arc::new(execution_output);
// Push to disk
let provider_rw = hook_provider.database_provider_rw().unwrap();
UnifiedStorageWriter::from(&provider_rw, &hook_provider.static_file_provider())
.save_blocks(vec![lowest_memory_block])
.unwrap();
UnifiedStorageWriter::commit(provider_rw).unwrap();
// Push to disk
let provider_rw = hook_provider.database_provider_rw().unwrap();
UnifiedStorageWriter::from(&provider_rw, &hook_provider.static_file_provider())
.save_blocks(vec![lowest_memory_block])
.unwrap();
UnifiedStorageWriter::commit(provider_rw).unwrap();
// Remove from memory
hook_provider.canonical_in_memory_state.remove_persisted_blocks(num_hash);
}
// Remove from memory
hook_provider.canonical_in_memory_state.remove_persisted_blocks(num_hash);
}
}));
}

View File

@@ -536,10 +536,10 @@ impl<N: ProviderNodeTypes> ConsistentProvider<N> {
// If the transaction number is less than the first in-memory transaction number, make a
// database lookup
if let HashOrNumber::Number(id) = id {
if id < in_memory_tx_num {
return fetch_from_db(provider)
}
if let HashOrNumber::Number(id) = id &&
id < in_memory_tx_num
{
return fetch_from_db(provider)
}
// Iterate from the lowest block to the highest
@@ -816,14 +816,14 @@ impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
hash: B256,
source: BlockSource,
) -> ProviderResult<Option<Self::Block>> {
if matches!(source, BlockSource::Canonical | BlockSource::Any) {
if let Some(block) = self.get_in_memory_or_storage_by_block(
if matches!(source, BlockSource::Canonical | BlockSource::Any) &&
let Some(block) = self.get_in_memory_or_storage_by_block(
hash.into(),
|db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
|block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
)? {
return Ok(Some(block))
}
)?
{
return Ok(Some(block))
}
if matches!(source, BlockSource::Pending | BlockSource::Any) {
@@ -1133,14 +1133,14 @@ impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
match block {
BlockId::Hash(rpc_block_hash) => {
let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
if receipts.is_none() && !rpc_block_hash.require_canonical.unwrap_or(false) {
if let Some(state) = self
if receipts.is_none() &&
!rpc_block_hash.require_canonical.unwrap_or(false) &&
let Some(state) = self
.head_block
.as_ref()
.and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
{
receipts = Some(state.executed_block_receipts());
}
{
receipts = Some(state.executed_block_receipts());
}
Ok(receipts)
}

View File

@@ -67,10 +67,10 @@ where
//
// To ensure this doesn't happen, we just have to make sure that we fetch from the same
// data source that we used during initialization. In this case, that is static files
if let Some((hash, number)) = self.tip {
if provider_ro.sealed_header(number)?.is_none_or(|header| header.hash() != hash) {
return Err(ConsistentViewError::Reorged { block: hash }.into())
}
if let Some((hash, number)) = self.tip &&
provider_ro.sealed_header(number)?.is_none_or(|header| header.hash() != hash)
{
return Err(ConsistentViewError::Reorged { block: hash }.into())
}
Ok(provider_ro)

View File

@@ -1020,12 +1020,12 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabasePro
}
fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
if self.chain_spec.is_paris_active_at_block(number) {
if let Some(td) = self.chain_spec.final_paris_total_difficulty() {
// if this block is higher than the final paris(merge) block, return the final paris
// difficulty
return Ok(Some(td))
}
if self.chain_spec.is_paris_active_at_block(number) &&
let Some(td) = self.chain_spec.final_paris_total_difficulty()
{
// if this block is higher than the final paris(merge) block, return the final paris
// difficulty
return Ok(Some(td))
}
self.static_file_provider.get_with_static_file_or_database(
@@ -1180,25 +1180,25 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvid
/// If the header is found, but the transactions either do not exist, or are not indexed, this
/// will return None.
fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
if let Some(number) = self.convert_hash_or_number(id)? {
if let Some(header) = self.header_by_number(number)? {
// If the body indices are not found, this means that the transactions either do not
// exist in the database yet, or they do exit but are not indexed.
// If they exist but are not indexed, we don't have enough
// information to return the block anyways, so we return `None`.
let Some(transactions) = self.transactions_by_block(number.into())? else {
return Ok(None)
};
if let Some(number) = self.convert_hash_or_number(id)? &&
let Some(header) = self.header_by_number(number)?
{
// If the body indices are not found, this means that the transactions either do not
// exist in the database yet, or they do exit but are not indexed.
// If they exist but are not indexed, we don't have enough
// information to return the block anyways, so we return `None`.
let Some(transactions) = self.transactions_by_block(number.into())? else {
return Ok(None)
};
let body = self
.storage
.reader()
.read_block_bodies(self, vec![(&header, transactions)])?
.pop()
.ok_or(ProviderError::InvalidStorageOutput)?;
let body = self
.storage
.reader()
.read_block_bodies(self, vec![(&header, transactions)])?
.pop()
.ok_or(ProviderError::InvalidStorageOutput)?;
return Ok(Some(Self::Block::new(header, body)))
}
return Ok(Some(Self::Block::new(header, body)))
}
Ok(None)
@@ -1416,34 +1416,31 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for Datab
tx_hash: TxHash,
) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
let mut transaction_cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
if let Some(transaction_id) = self.transaction_id(tx_hash)? {
if let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? {
if let Some(block_number) =
transaction_cursor.seek(transaction_id).map(|b| b.map(|(_, bn)| bn))?
{
if let Some(sealed_header) = self.sealed_header(block_number)? {
let (header, block_hash) = sealed_header.split();
if let Some(block_body) = self.block_body_indices(block_number)? {
// the index of the tx in the block is the offset:
// len([start..tx_id])
// NOTE: `transaction_id` is always `>=` the block's first
// index
let index = transaction_id - block_body.first_tx_num();
if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
let Some(block_number) =
transaction_cursor.seek(transaction_id).map(|b| b.map(|(_, bn)| bn))? &&
let Some(sealed_header) = self.sealed_header(block_number)?
{
let (header, block_hash) = sealed_header.split();
if let Some(block_body) = self.block_body_indices(block_number)? {
// the index of the tx in the block is the offset:
// len([start..tx_id])
// NOTE: `transaction_id` is always `>=` the block's first
// index
let index = transaction_id - block_body.first_tx_num();
let meta = TransactionMeta {
tx_hash,
index,
block_hash,
block_number,
base_fee: header.base_fee_per_gas(),
excess_blob_gas: header.excess_blob_gas(),
timestamp: header.timestamp(),
};
let meta = TransactionMeta {
tx_hash,
index,
block_hash,
block_number,
base_fee: header.base_fee_per_gas(),
excess_blob_gas: header.excess_blob_gas(),
timestamp: header.timestamp(),
};
return Ok(Some((transaction, meta)))
}
}
}
return Ok(Some((transaction, meta)))
}
}
@@ -1461,14 +1458,14 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for Datab
) -> ProviderResult<Option<Vec<Self::Transaction>>> {
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
if let Some(block_number) = self.convert_hash_or_number(id)? {
if let Some(body) = self.block_body_indices(block_number)? {
let tx_range = body.tx_num_range();
return if tx_range.is_empty() {
Ok(Some(Vec::new()))
} else {
Ok(Some(self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?))
}
if let Some(block_number) = self.convert_hash_or_number(id)? &&
let Some(body) = self.block_body_indices(block_number)?
{
let tx_range = body.tx_num_range();
return if tx_range.is_empty() {
Ok(Some(Vec::new()))
} else {
Ok(Some(self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?))
}
}
Ok(None)
@@ -1543,14 +1540,14 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabasePr
&self,
block: BlockHashOrNumber,
) -> ProviderResult<Option<Vec<Self::Receipt>>> {
if let Some(number) = self.convert_hash_or_number(block)? {
if let Some(body) = self.block_body_indices(number)? {
let tx_range = body.tx_num_range();
return if tx_range.is_empty() {
Ok(Some(Vec::new()))
} else {
self.receipts_by_tx_range(tx_range).map(Some)
}
if let Some(number) = self.convert_hash_or_number(block)? &&
let Some(body) = self.block_body_indices(number)?
{
let tx_range = body.tx_num_range();
return if tx_range.is_empty() {
Ok(Some(Vec::new()))
} else {
self.receipts_by_tx_range(tx_range).map(Some)
}
}
Ok(None)
@@ -2000,10 +1997,10 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
for entry in storage {
tracing::trace!(?address, ?entry.key, "Updating plain state storage");
if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? {
if db_entry.key == entry.key {
storages_cursor.delete_current()?;
}
if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? &&
db_entry.key == entry.key
{
storages_cursor.delete_current()?;
}
if !entry.value.is_zero() {
@@ -2038,11 +2035,10 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
for (hashed_slot, value) in storage.storage_slots_sorted() {
let entry = StorageEntry { key: hashed_slot, value };
if let Some(db_entry) =
hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)?
hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
db_entry.key == entry.key
{
if db_entry.key == entry.key {
hashed_storage_cursor.delete_current()?;
}
hashed_storage_cursor.delete_current()?;
}
if !entry.value.is_zero() {

View File

@@ -158,10 +158,10 @@ impl<Provider: DBProvider + BlockHashReader> StateProvider
storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
let mut cursor = self.tx().cursor_dup_read::<tables::PlainStorageState>()?;
if let Some(entry) = cursor.seek_by_key_subkey(account, storage_key)? {
if entry.key == storage_key {
return Ok(Some(entry.value))
}
if let Some(entry) = cursor.seek_by_key_subkey(account, storage_key)? &&
entry.key == storage_key
{
return Ok(Some(entry.value))
}
Ok(None)
}

Some files were not shown because too many files have changed in this diff Show More