feat(storage): read headers and transactions only from static files (#18788)

Alexey Shekhirin
2025-10-06 12:39:48 +01:00
committed by GitHub
parent d77bfd89b4
commit e9598ba5ac
33 changed files with 264 additions and 442 deletions
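The gist of the change, as a minimal sketch rather than code from this commit: `HeaderProvider::header`, `header_td`, and `is_known` now take the block hash by value, and the database-backed providers serve header and transaction reads from static files instead of MDBX tables. The helper name `block_number_by_hash` and the exact crate paths (`reth_storage_api`, `reth_storage_errors`, `alloy_consensus`, `alloy_primitives`) are assumptions based on the imports appearing in the diff below.

    use alloy_consensus::BlockHeader;
    use alloy_primitives::BlockHash;
    use reth_storage_api::HeaderProvider;
    use reth_storage_errors::provider::ProviderResult;

    // Sketch only: resolve a header by hash with the updated by-value API.
    // `BlockHash` is `Copy`, so no borrow is needed at the call site anymore.
    fn block_number_by_hash<P: HeaderProvider>(
        provider: &P,
        hash: BlockHash,
    ) -> ProviderResult<Option<u64>> {
        // `is_known` keeps its default implementation: `header(hash)?.is_some()`.
        if !provider.is_known(hash)? {
            return Ok(None);
        }
        // On the database-backed providers this resolves the hash to a block
        // number first and then reads the header from the Headers static file
        // segment, per the `DatabaseProvider` changes further down.
        Ok(provider.header(hash)?.map(|header| header.number()))
    }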

Cargo.lock (generated), 2 lines changed
View File

@@ -7845,8 +7845,6 @@ dependencies = [
"reth-chainspec",
"reth-config",
"reth-consensus",
"reth-db",
"reth-db-api",
"reth-ethereum-primitives",
"reth-metrics",
"reth-network-p2p",

View File

@@ -2659,7 +2659,7 @@ where
let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
if canonical.is_none() {
canonical = self.provider.header(&hash)?.map(|header| SealedHeader::new(header, hash));
canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash));
}
Ok(canonical)
@@ -2883,7 +2883,7 @@ where
}
// Check if the block is persisted
if let Some(header) = self.provider.header(&hash)? {
if let Some(header) = self.provider.header(hash)? {
debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
// For persisted blocks, we create a builder that will fetch state directly from the
// database

View File

@@ -967,7 +967,7 @@ where
}
// Check if the block is persisted
if let Some(header) = self.provider.header(&hash)? {
if let Some(header) = self.provider.header(hash)? {
debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
// For persisted blocks, we create a builder that will fetch state directly from the
// database

View File

@@ -370,7 +370,7 @@ where
.map(|(exex_id, num_hash)| {
num_hash.map_or(Ok((exex_id, num_hash, false)), |num_hash| {
self.provider
.is_known(&num_hash.hash)
.is_known(num_hash.hash)
// Save the ExEx ID, finished height, and whether the hash is canonical
.map(|is_canonical| (exex_id, Some(num_hash), is_canonical))
})

View File

@@ -308,7 +308,7 @@ where
/// we're not on the canonical chain and we need to revert the notification with the ExEx
/// head block.
fn check_canonical(&mut self) -> eyre::Result<Option<ExExNotification<E::Primitives>>> {
if self.provider.is_known(&self.initial_exex_head.block.hash)? &&
if self.provider.is_known(self.initial_exex_head.block.hash)? &&
self.initial_exex_head.block.number <= self.initial_local_head.number
{
// we have the targeted block and that block is below the current head

View File

@@ -22,9 +22,8 @@ reth-storage-api.workspace = true
reth-tasks.workspace = true
# optional deps for the test-utils feature
reth-db = { workspace = true, optional = true }
reth-db-api = { workspace = true, optional = true }
reth-ethereum-primitives = { workspace = true, optional = true }
reth-provider = { workspace = true, optional = true }
reth-testing-utils = { workspace = true, optional = true }
# ethereum
@@ -58,8 +57,6 @@ itertools.workspace = true
async-compression = { workspace = true, features = ["gzip", "tokio"] }
reth-ethereum-primitives.workspace = true
reth-chainspec.workspace = true
reth-db = { workspace = true, features = ["test-utils"] }
reth-db-api.workspace = true
reth-consensus = { workspace = true, features = ["test-utils"] }
reth-network-p2p = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
@@ -76,13 +73,10 @@ default = []
file-client = ["dep:async-compression", "dep:alloy-rlp"]
test-utils = [
"tempfile",
"reth-db-api",
"reth-db/test-utils",
"reth-consensus/test-utils",
"reth-network-p2p/test-utils",
"reth-testing-utils",
"reth-chainspec/test-utils",
"reth-db-api?/test-utils",
"reth-provider/test-utils",
"reth-primitives-traits/test-utils",
"dep:reth-ethereum-primitives",

View File

@@ -618,12 +618,8 @@ mod tests {
};
use alloy_primitives::B256;
use assert_matches::assert_matches;
use reth_chainspec::MAINNET;
use reth_consensus::test_utils::TestConsensus;
use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir};
use reth_provider::{
providers::StaticFileProvider, test_utils::MockNodeTypesWithDB, ProviderFactory,
};
use reth_provider::test_utils::create_test_provider_factory;
use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
use std::collections::HashMap;
@@ -632,25 +628,20 @@ mod tests {
#[tokio::test]
async fn streams_bodies_in_order() {
// Generate some random blocks
let db = create_test_rw_db();
let factory = create_test_provider_factory();
let (headers, mut bodies) = generate_bodies(0..=19);
insert_headers(db.db(), &headers);
insert_headers(&factory, &headers);
let client = Arc::new(
TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
);
let (_static_dir, static_dir_path) = create_test_static_files_dir();
let mut downloader = BodiesDownloaderBuilder::default()
.build::<reth_ethereum_primitives::Block, _, _>(
client.clone(),
Arc::new(TestConsensus::default()),
ProviderFactory::<MockNodeTypesWithDB>::new(
db,
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
),
factory,
);
downloader.set_download_range(0..=19).expect("failed to set download range");
@@ -666,7 +657,7 @@ mod tests {
#[tokio::test]
async fn requests_correct_number_of_times() {
// Generate some random blocks
let db = create_test_rw_db();
let factory = create_test_provider_factory();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
@@ -680,22 +671,17 @@ mod tests {
.map(|block| (block.hash(), block.into_body()))
.collect::<HashMap<_, _>>();
insert_headers(db.db(), &headers);
insert_headers(&factory, &headers);
let request_limit = 10;
let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone()));
let (_static_dir, static_dir_path) = create_test_static_files_dir();
let mut downloader = BodiesDownloaderBuilder::default()
.with_request_limit(request_limit)
.build::<reth_ethereum_primitives::Block, _, _>(
client.clone(),
Arc::new(TestConsensus::default()),
ProviderFactory::<MockNodeTypesWithDB>::new(
db,
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
),
factory,
);
downloader.set_download_range(0..=199).expect("failed to set download range");
@@ -708,28 +694,23 @@ mod tests {
#[tokio::test]
async fn streams_bodies_in_order_after_range_reset() {
// Generate some random blocks
let db = create_test_rw_db();
let factory = create_test_provider_factory();
let (headers, mut bodies) = generate_bodies(0..=99);
insert_headers(db.db(), &headers);
insert_headers(&factory, &headers);
let stream_batch_size = 20;
let request_limit = 10;
let client = Arc::new(
TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
);
let (_static_dir, static_dir_path) = create_test_static_files_dir();
let mut downloader = BodiesDownloaderBuilder::default()
.with_stream_batch_size(stream_batch_size)
.with_request_limit(request_limit)
.build::<reth_ethereum_primitives::Block, _, _>(
client.clone(),
Arc::new(TestConsensus::default()),
ProviderFactory::<MockNodeTypesWithDB>::new(
db,
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
),
factory,
);
let mut range_start = 0;
@@ -750,24 +731,19 @@ mod tests {
#[tokio::test]
async fn can_download_new_range_after_termination() {
// Generate some random blocks
let db = create_test_rw_db();
let factory = create_test_provider_factory();
let (headers, mut bodies) = generate_bodies(0..=199);
insert_headers(db.db(), &headers);
insert_headers(&factory, &headers);
let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone()));
let (_static_dir, static_dir_path) = create_test_static_files_dir();
let mut downloader = BodiesDownloaderBuilder::default()
.with_stream_batch_size(100)
.build::<reth_ethereum_primitives::Block, _, _>(
client.clone(),
Arc::new(TestConsensus::default()),
ProviderFactory::<MockNodeTypesWithDB>::new(
db,
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
),
factory,
);
// Set and download the first range
@@ -792,14 +768,13 @@ mod tests {
#[tokio::test]
async fn can_download_after_exceeding_limit() {
// Generate some random blocks
let db = create_test_rw_db();
let factory = create_test_provider_factory();
let (headers, mut bodies) = generate_bodies(0..=199);
insert_headers(db.db(), &headers);
insert_headers(&factory, &headers);
let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone()));
let (_static_dir, static_dir_path) = create_test_static_files_dir();
// Set the max buffered block size to 1 byte, to make sure that every response exceeds the
// limit
let mut downloader = BodiesDownloaderBuilder::default()
@@ -809,11 +784,7 @@ mod tests {
.build::<reth_ethereum_primitives::Block, _, _>(
client.clone(),
Arc::new(TestConsensus::default()),
ProviderFactory::<MockNodeTypesWithDB>::new(
db,
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
),
factory,
);
// Set and download the entire range
@@ -829,16 +800,15 @@ mod tests {
#[tokio::test]
async fn can_tolerate_empty_responses() {
// Generate some random blocks
let db = create_test_rw_db();
let factory = create_test_provider_factory();
let (headers, mut bodies) = generate_bodies(0..=99);
insert_headers(db.db(), &headers);
insert_headers(&factory, &headers);
// respond with empty bodies for every other request.
let client = Arc::new(
TestBodiesClient::default().with_bodies(bodies.clone()).with_empty_responses(2),
);
let (_static_dir, static_dir_path) = create_test_static_files_dir();
let mut downloader = BodiesDownloaderBuilder::default()
.with_request_limit(3)
@@ -846,11 +816,7 @@ mod tests {
.build::<reth_ethereum_primitives::Block, _, _>(
client.clone(),
Arc::new(TestConsensus::default()),
ProviderFactory::<MockNodeTypesWithDB>::new(
db,
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
),
factory,
);
// Download the requested range

View File

@@ -190,7 +190,7 @@ mod tests {
let factory = create_test_provider_factory();
let (headers, mut bodies) = generate_bodies(0..=19);
insert_headers(factory.db_ref().db(), &headers);
insert_headers(&factory, &headers);
let client = Arc::new(
TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),

View File

@@ -3,12 +3,14 @@
#![allow(dead_code)]
use alloy_consensus::BlockHeader;
use alloy_primitives::B256;
use reth_db::DatabaseEnv;
use reth_db_api::{database::Database, tables, transaction::DbTxMut};
use alloy_primitives::{B256, U256};
use reth_ethereum_primitives::BlockBody;
use reth_network_p2p::bodies::response::BlockResponse;
use reth_primitives_traits::{Block, SealedBlock, SealedHeader};
use reth_provider::{
test_utils::MockNodeTypesWithDB, ProviderFactory, StaticFileProviderFactory, StaticFileSegment,
StaticFileWriter,
};
use std::collections::HashMap;
pub(crate) fn zip_blocks<'a, B: Block>(
@@ -42,12 +44,21 @@ pub(crate) fn create_raw_bodies(
}
#[inline]
pub(crate) fn insert_headers(db: &DatabaseEnv, headers: &[SealedHeader]) {
db.update(|tx| {
for header in headers {
tx.put::<tables::CanonicalHeaders>(header.number, header.hash()).unwrap();
tx.put::<tables::Headers>(header.number, header.clone_header()).unwrap();
}
})
.expect("failed to commit")
pub(crate) fn insert_headers(
factory: &ProviderFactory<MockNodeTypesWithDB>,
headers: &[SealedHeader],
) {
let provider_rw = factory.provider_rw().expect("failed to create provider");
let static_file_provider = provider_rw.static_file_provider();
let mut writer = static_file_provider
.latest_writer(StaticFileSegment::Headers)
.expect("failed to create writer");
for header in headers {
writer
.append_header(header.header(), U256::ZERO, &header.hash())
.expect("failed to append header");
}
drop(writer);
provider_rw.commit().expect("failed to commit");
}

View File

@@ -675,7 +675,7 @@ mod tests {
let factory = create_test_provider_factory();
let (headers, mut bodies) = generate_bodies(0..=19);
insert_headers(factory.db_ref().db(), &headers);
insert_headers(&factory, &headers);
// create an empty file
let file = tempfile::tempfile().unwrap();
@@ -770,7 +770,7 @@ mod tests {
Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());
// insert headers in db for the bodies downloader
insert_headers(factory.db_ref().db(), &headers);
insert_headers(&factory, &headers);
let mut downloader = BodiesDownloaderBuilder::default().build::<Block, _, _>(
client.clone(),

View File

@@ -157,7 +157,7 @@ mod tests {
1..=10,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");
let mut tx_hash_numbers = Vec::new();
for block in &blocks {
@@ -170,11 +170,11 @@ mod tests {
db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");
assert_eq!(
db.table::<tables::Transactions>().unwrap().len(),
db.count_entries::<tables::Transactions>().unwrap(),
blocks.iter().map(|block| block.transaction_count()).sum::<usize>()
);
assert_eq!(
db.table::<tables::Transactions>().unwrap().len(),
db.count_entries::<tables::Transactions>().unwrap(),
db.table::<tables::TransactionHashNumbers>().unwrap().len()
);

View File

@@ -539,7 +539,7 @@ where
this.action_task_spawner.spawn_blocking(Box::pin(async move {
// Acquire permit
let _permit = rate_limiter.acquire().await;
let header = provider.header(&block_hash).and_then(|header| {
let header = provider.header(block_hash).and_then(|header| {
header.ok_or_else(|| {
ProviderError::HeaderNotFound(block_hash.into())
})

View File

@@ -917,7 +917,7 @@ where
/// Handler for `debug_getRawHeader`
async fn raw_header(&self, block_id: BlockId) -> RpcResult<Bytes> {
let header = match block_id {
BlockId::Hash(hash) => self.provider().header(&hash.into()).to_rpc_result()?,
BlockId::Hash(hash) => self.provider().header(hash.into()).to_rpc_result()?,
BlockId::Number(number_or_tag) => {
let number = self
.provider()

View File

@@ -344,7 +344,7 @@ mod tests {
done: true,
}) if block_number == previous_stage &&
processed == total &&
total == runner.db.table::<tables::PlainAccountState>().unwrap().len() as u64
total == runner.db.count_entries::<tables::PlainAccountState>().unwrap() as u64
);
// Validate the stage execution

View File

@@ -266,7 +266,7 @@ mod tests {
},
..
}) if processed == previous_checkpoint.progress.processed + 1 &&
total == runner.db.table::<tables::PlainStorageState>().unwrap().len() as u64);
total == runner.db.count_entries::<tables::PlainStorageState>().unwrap() as u64);
// Continue from checkpoint
input.checkpoint = Some(checkpoint);
@@ -280,7 +280,7 @@ mod tests {
},
..
}) if processed == total &&
total == runner.db.table::<tables::PlainStorageState>().unwrap().len() as u64);
total == runner.db.count_entries::<tables::PlainStorageState>().unwrap() as u64);
// Validate the stage execution
assert!(

View File

@@ -493,8 +493,8 @@ mod tests {
done: true
}) if block_number == previous_stage && processed == total &&
total == (
runner.db.table::<tables::HashedAccounts>().unwrap().len() +
runner.db.table::<tables::HashedStorages>().unwrap().len()
runner.db.count_entries::<tables::HashedAccounts>().unwrap() +
runner.db.count_entries::<tables::HashedStorages>().unwrap()
) as u64
);
@@ -533,8 +533,8 @@ mod tests {
done: true
}) if block_number == previous_stage && processed == total &&
total == (
runner.db.table::<tables::HashedAccounts>().unwrap().len() +
runner.db.table::<tables::HashedStorages>().unwrap().len()
runner.db.count_entries::<tables::HashedAccounts>().unwrap() +
runner.db.count_entries::<tables::HashedStorages>().unwrap()
) as u64
);
@@ -575,8 +575,8 @@ mod tests {
done: true
}) if block_number == previous_stage && processed == total &&
total == (
runner.db.table::<tables::HashedAccounts>().unwrap().len() +
runner.db.table::<tables::HashedStorages>().unwrap().len()
runner.db.count_entries::<tables::HashedAccounts>().unwrap() +
runner.db.count_entries::<tables::HashedStorages>().unwrap()
) as u64
);

View File

@@ -490,7 +490,7 @@ mod tests {
ExecOutput {
checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint(
EntitiesCheckpoint {
processed: runner.db.table::<tables::TransactionSenders>().unwrap().len()
processed: runner.db.count_entries::<tables::TransactionSenders>().unwrap()
as u64,
total: total_transactions
}

View File

@@ -264,7 +264,6 @@ mod tests {
use reth_primitives_traits::SealedBlock;
use reth_provider::{
providers::StaticFileWriter, BlockBodyIndicesProvider, DatabaseProviderFactory,
StaticFileProviderFactory,
};
use reth_stages_api::StageUnitCheckpoint;
use reth_testing_utils::generators::{
@@ -320,7 +319,7 @@ mod tests {
total
}))
}, done: true }) if block_number == previous_stage && processed == total &&
total == runner.db.factory.static_file_provider().count_entries::<tables::Transactions>().unwrap() as u64
total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
);
// Validate the stage execution
@@ -366,7 +365,7 @@ mod tests {
total
}))
}, done: true }) if block_number == previous_stage && processed == total &&
total == runner.db.factory.static_file_provider().count_entries::<tables::Transactions>().unwrap() as u64
total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
);
// Validate the stage execution

View File

@@ -19,7 +19,7 @@ use reth_primitives_traits::{Account, SealedBlock, SealedHeader, StorageEntry};
use reth_provider::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter},
test_utils::MockNodeTypesWithDB,
HistoryWriter, ProviderError, ProviderFactory, StaticFileProviderFactory,
HistoryWriter, ProviderError, ProviderFactory, StaticFileProviderFactory, StatsReader,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::ProviderResult;
@@ -103,6 +103,11 @@ impl TestStageDB {
})
}
/// Return the number of entries in the table or static file segment
pub fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
self.factory.provider()?.count_entries::<T>()
}
/// Check that there is no table entry above a given
/// number by [`Table::Key`]
pub fn ensure_no_entry_above<T, F>(&self, num: u64, mut selector: F) -> ProviderResult<()>

View File

@@ -255,9 +255,8 @@ mod tests {
use crate::static_file_producer::{
StaticFileProducer, StaticFileProducerInner, StaticFileTargets,
};
use alloy_primitives::{B256, U256};
use alloy_primitives::B256;
use assert_matches::assert_matches;
use reth_db_api::{database::Database, transaction::DbTx};
use reth_provider::{
providers::StaticFileWriter, test_utils::MockNodeTypesWithDB, ProviderError,
ProviderFactory, StaticFileProviderFactory,
@@ -291,12 +290,7 @@ mod tests {
static_file_writer.commit().expect("prune headers");
drop(static_file_writer);
let tx = db.factory.db_ref().tx_mut().expect("init tx");
for block in &blocks {
TestStageDB::insert_header(None, &tx, block.sealed_header(), U256::ZERO)
.expect("insert block header");
}
tx.commit().expect("commit tx");
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let mut receipts = Vec::new();
for block in &blocks {

View File

@@ -182,7 +182,7 @@ impl<N: ProviderNodeTypes> StaticFileProviderFactory for BlockchainProvider<N> {
impl<N: ProviderNodeTypes> HeaderProvider for BlockchainProvider<N> {
type Header = HeaderTy<N>;
fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
self.consistent_provider()?.header(block_hash)
}
@@ -190,7 +190,7 @@ impl<N: ProviderNodeTypes> HeaderProvider for BlockchainProvider<N> {
self.consistent_provider()?.header_by_number(num)
}
fn header_td(&self, hash: &BlockHash) -> ProviderResult<Option<U256>> {
fn header_td(&self, hash: BlockHash) -> ProviderResult<Option<U256>> {
self.consistent_provider()?.header_td(hash)
}
@@ -342,6 +342,10 @@ impl<N: ProviderNodeTypes> BlockReader for BlockchainProvider<N> {
) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
self.consistent_provider()?.recovered_block_range(range)
}
fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
self.consistent_provider()?.block_by_transaction_id(id)
}
}
impl<N: ProviderNodeTypes> TransactionsProvider for BlockchainProvider<N> {
@@ -2290,13 +2294,15 @@ mod tests {
let test_tx_index = 0;
test_non_range!([
// TODO: header should use B256 like others instead of &B256
// (
// ONE,
// header,
// |block: &SealedBlock, tx_num: TxNumber, tx_hash: B256, receipts: &Vec<Vec<Receipt>>| (&block.hash(), Some(block.header.header().clone())),
// (&B256::random())
// ),
(
ONE,
header,
|block: &SealedBlock<Block>, _: TxNumber, _: B256, _: &Vec<Vec<Receipt>>| (
block.hash(),
Some(block.header().clone())
),
B256::random()
),
(
ONE,
header_by_number,

View File

@@ -646,9 +646,9 @@ impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
type Header = HeaderTy<N>;
fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
self.get_in_memory_or_storage_by_block(
(*block_hash).into(),
block_hash.into(),
|db_provider| db_provider.header(block_hash),
|block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
)
@@ -662,8 +662,8 @@ impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
)
}
fn header_td(&self, hash: &BlockHash) -> ProviderResult<Option<U256>> {
if let Some(num) = self.block_number(*hash)? {
fn header_td(&self, hash: BlockHash) -> ProviderResult<Option<U256>> {
if let Some(num) = self.block_number(hash)? {
self.header_td_by_number(num)
} else {
Ok(None)
@@ -917,6 +917,14 @@ impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
|_| true,
)
}
fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
self.get_in_memory_or_storage_by_tx(
id.into(),
|db_provider| db_provider.block_by_transaction_id(id),
|_, _, block_state| Ok(Some(block_state.number())),
)
}
}
impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
@@ -1305,14 +1313,14 @@ impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
Ok(match id {
BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
BlockId::Hash(hash) => self.header(&hash.block_hash)?.map(SealedHeader::seal_slow),
BlockId::Hash(hash) => self.header(hash.block_hash)?.map(SealedHeader::seal_slow),
})
}
fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
Ok(match id {
BlockId::Number(num) => self.header_by_number_or_tag(num)?,
BlockId::Hash(hash) => self.header(&hash.block_hash)?,
BlockId::Hash(hash) => self.header(hash.block_hash)?,
})
}
}

View File

@@ -234,57 +234,41 @@ impl<N: ProviderNodeTypes> HeaderSyncGapProvider for ProviderFactory<N> {
impl<N: ProviderNodeTypes> HeaderProvider for ProviderFactory<N> {
type Header = HeaderTy<N>;
fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
self.provider()?.header(block_hash)
}
fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
self.static_file_provider.get_with_static_file_or_database(
StaticFileSegment::Headers,
num,
|static_file| static_file.header_by_number(num),
|| self.provider()?.header_by_number(num),
)
self.static_file_provider.header_by_number(num)
}
fn header_td(&self, hash: &BlockHash) -> ProviderResult<Option<U256>> {
fn header_td(&self, hash: BlockHash) -> ProviderResult<Option<U256>> {
self.provider()?.header_td(hash)
}
fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
self.provider()?.header_td_by_number(number)
self.static_file_provider.header_td_by_number(number)
}
fn headers_range(
&self,
range: impl RangeBounds<BlockNumber>,
) -> ProviderResult<Vec<Self::Header>> {
self.static_file_provider.get_range_with_static_file_or_database(
StaticFileSegment::Headers,
to_range(range),
|static_file, range, _| static_file.headers_range(range),
|range, _| self.provider()?.headers_range(range),
|_| true,
)
self.static_file_provider.headers_range(range)
}
fn sealed_header(
&self,
number: BlockNumber,
) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
self.static_file_provider.get_with_static_file_or_database(
StaticFileSegment::Headers,
number,
|static_file| static_file.sealed_header(number),
|| self.provider()?.sealed_header(number),
)
self.static_file_provider.sealed_header(number)
}
fn sealed_headers_range(
&self,
range: impl RangeBounds<BlockNumber>,
) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
self.sealed_headers_while(range, |_| true)
self.static_file_provider.sealed_headers_range(range)
}
fn sealed_headers_while(
@@ -292,24 +276,13 @@ impl<N: ProviderNodeTypes> HeaderProvider for ProviderFactory<N> {
range: impl RangeBounds<BlockNumber>,
predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
self.static_file_provider.get_range_with_static_file_or_database(
StaticFileSegment::Headers,
to_range(range),
|static_file, range, predicate| static_file.sealed_headers_while(range, predicate),
|range, predicate| self.provider()?.sealed_headers_while(range, predicate),
predicate,
)
self.static_file_provider.sealed_headers_while(range, predicate)
}
}
impl<N: ProviderNodeTypes> BlockHashReader for ProviderFactory<N> {
fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
self.static_file_provider.get_with_static_file_or_database(
StaticFileSegment::Headers,
number,
|static_file| static_file.block_hash(number),
|| self.provider()?.block_hash(number),
)
self.static_file_provider.block_hash(number)
}
fn canonical_hashes_range(
@@ -317,13 +290,7 @@ impl<N: ProviderNodeTypes> BlockHashReader for ProviderFactory<N> {
start: BlockNumber,
end: BlockNumber,
) -> ProviderResult<Vec<B256>> {
self.static_file_provider.get_range_with_static_file_or_database(
StaticFileSegment::Headers,
start..end,
|static_file, range, _| static_file.canonical_hashes_range(range.start, range.end),
|range, _| self.provider()?.canonical_hashes_range(range.start, range.end),
|_| true,
)
self.static_file_provider.canonical_hashes_range(start, end)
}
}
@@ -337,7 +304,7 @@ impl<N: ProviderNodeTypes> BlockNumReader for ProviderFactory<N> {
}
fn last_block_number(&self) -> ProviderResult<BlockNumber> {
self.provider()?.last_block_number()
self.static_file_provider.last_block_number()
}
fn earliest_block_number(&self) -> ProviderResult<BlockNumber> {
@@ -409,6 +376,10 @@ impl<N: ProviderNodeTypes> BlockReader for ProviderFactory<N> {
) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
self.provider()?.recovered_block_range(range)
}
fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
self.provider()?.block_by_transaction_id(id)
}
}
impl<N: ProviderNodeTypes> TransactionsProvider for ProviderFactory<N> {
@@ -419,24 +390,14 @@ impl<N: ProviderNodeTypes> TransactionsProvider for ProviderFactory<N> {
}
fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
self.static_file_provider.get_with_static_file_or_database(
StaticFileSegment::Transactions,
id,
|static_file| static_file.transaction_by_id(id),
|| self.provider()?.transaction_by_id(id),
)
self.static_file_provider.transaction_by_id(id)
}
fn transaction_by_id_unhashed(
&self,
id: TxNumber,
) -> ProviderResult<Option<Self::Transaction>> {
self.static_file_provider.get_with_static_file_or_database(
StaticFileSegment::Transactions,
id,
|static_file| static_file.transaction_by_id_unhashed(id),
|| self.provider()?.transaction_by_id_unhashed(id),
)
self.static_file_provider.transaction_by_id_unhashed(id)
}
fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
@@ -472,7 +433,7 @@ impl<N: ProviderNodeTypes> TransactionsProvider for ProviderFactory<N> {
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<Self::Transaction>> {
self.provider()?.transactions_by_tx_range(range)
self.static_file_provider.transactions_by_tx_range(range)
}
fn senders_by_tx_range(
@@ -489,6 +450,7 @@ impl<N: ProviderNodeTypes> TransactionsProvider for ProviderFactory<N> {
impl<N: ProviderNodeTypes> ReceiptProvider for ProviderFactory<N> {
type Receipt = ReceiptTy<N>;
fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
self.static_file_provider.get_with_static_file_or_database(
StaticFileSegment::Receipts,
@@ -674,7 +636,6 @@ mod tests {
StaticFileProvider::read_write(static_dir_path).unwrap(),
)
.unwrap();
let provider = factory.provider().unwrap();
provider.block_hash(0).unwrap();
let provider_rw = factory.provider_rw().unwrap();

View File

@@ -22,7 +22,7 @@ use alloy_consensus::{
transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
BlockHeader, TxReceipt,
};
use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{
keccak256,
map::{hash_map, B256Map, HashMap, HashSet},
@@ -42,7 +42,7 @@ use reth_db_api::{
table::Table,
tables,
transaction::{DbTx, DbTxMut},
BlockNumberList, DatabaseError, PlainAccountState, PlainStorageState,
BlockNumberList, PlainAccountState, PlainStorageState,
};
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
@@ -74,7 +74,7 @@ use std::{
collections::{BTreeMap, BTreeSet},
fmt::Debug,
ops::{Deref, DerefMut, Not, Range, RangeBounds, RangeInclusive},
sync::{mpsc, Arc},
sync::Arc,
};
use tracing::{debug, trace};
@@ -563,23 +563,6 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
}
impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
fn transactions_by_tx_range_with_cursor<C>(
&self,
range: impl RangeBounds<TxNumber>,
cursor: &mut C,
) -> ProviderResult<Vec<TxTy<N>>>
where
C: DbCursorRO<tables::Transactions<TxTy<N>>>,
{
self.static_file_provider.get_range_with_static_file_or_database(
StaticFileSegment::Transactions,
to_range(range),
|static_file, range, _| static_file.transactions_by_tx_range(range),
|range, _| self.cursor_collect(cursor, range),
|_| true,
)
}
fn recovered_block<H, HF, B, BF>(
&self,
id: BlockHashOrNumber,
@@ -649,7 +632,6 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
let mut blocks = Vec::with_capacity(len);
let headers = headers_range(range.clone())?;
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
// If the body indices are not found, this means that the transactions either do
// not exist in the database yet, or they do exist but are
@@ -668,7 +650,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
let transactions = if tx_range.is_empty() {
Vec::new()
} else {
self.transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)?
self.transactions_by_tx_range(tx_range.clone())?
};
inputs.push((header.as_ref(), transactions));
@@ -1007,8 +989,8 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
type Header = HeaderTy<N>;
fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
if let Some(num) = self.block_number(*block_hash)? {
fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
if let Some(num) = self.block_number(block_hash)? {
Ok(self.header_by_number(num)?)
} else {
Ok(None)
@@ -1016,16 +998,11 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabasePro
}
fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
self.static_file_provider.get_with_static_file_or_database(
StaticFileSegment::Headers,
num,
|static_file| static_file.header_by_number(num),
|| Ok(self.tx.get::<tables::Headers<Self::Header>>(num)?),
)
self.static_file_provider.header_by_number(num)
}
fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
if let Some(num) = self.block_number(*block_hash)? {
fn header_td(&self, block_hash: BlockHash) -> ProviderResult<Option<U256>> {
if let Some(num) = self.block_number(block_hash)? {
self.header_td_by_number(num)
} else {
Ok(None)
@@ -1041,46 +1018,21 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabasePro
return Ok(Some(td))
}
self.static_file_provider.get_with_static_file_or_database(
StaticFileSegment::Headers,
number,
|static_file| static_file.header_td_by_number(number),
|| Ok(self.tx.get::<tables::HeaderTerminalDifficulties>(number)?.map(|td| td.0)),
)
self.static_file_provider.header_td_by_number(number)
}
fn headers_range(
&self,
range: impl RangeBounds<BlockNumber>,
) -> ProviderResult<Vec<Self::Header>> {
self.static_file_provider.get_range_with_static_file_or_database(
StaticFileSegment::Headers,
to_range(range),
|static_file, range, _| static_file.headers_range(range),
|range, _| self.cursor_read_collect::<tables::Headers<Self::Header>>(range),
|_| true,
)
self.static_file_provider.headers_range(range)
}
fn sealed_header(
&self,
number: BlockNumber,
) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
self.static_file_provider.get_with_static_file_or_database(
StaticFileSegment::Headers,
number,
|static_file| static_file.sealed_header(number),
|| {
if let Some(header) = self.header_by_number(number)? {
let hash = self
.block_hash(number)?
.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
Ok(Some(SealedHeader::new(header, hash)))
} else {
Ok(None)
}
},
)
self.static_file_provider.sealed_header(number)
}
fn sealed_headers_while(
@@ -1088,40 +1040,13 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabasePro
range: impl RangeBounds<BlockNumber>,
predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
self.static_file_provider.get_range_with_static_file_or_database(
StaticFileSegment::Headers,
to_range(range),
|static_file, range, predicate| static_file.sealed_headers_while(range, predicate),
|range, mut predicate| {
let mut headers = vec![];
for entry in
self.tx.cursor_read::<tables::Headers<Self::Header>>()?.walk_range(range)?
{
let (number, header) = entry?;
let hash = self
.block_hash(number)?
.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
let sealed = SealedHeader::new(header, hash);
if !predicate(&sealed) {
break
}
headers.push(sealed);
}
Ok(headers)
},
predicate,
)
self.static_file_provider.sealed_headers_while(range, predicate)
}
}
impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
self.static_file_provider.get_with_static_file_or_database(
StaticFileSegment::Headers,
number,
|static_file| static_file.block_hash(number),
|| Ok(self.tx.get::<tables::CanonicalHeaders>(number)?),
)
self.static_file_provider.block_hash(number)
}
fn canonical_hashes_range(
@@ -1129,13 +1054,7 @@ impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX,
start: BlockNumber,
end: BlockNumber,
) -> ProviderResult<Vec<B256>> {
self.static_file_provider.get_range_with_static_file_or_database(
StaticFileSegment::Headers,
start..end,
|static_file, range, _| static_file.canonical_hashes_range(range.start, range.end),
|range, _| self.cursor_read_collect::<tables::CanonicalHeaders>(range),
|_| true,
)
self.static_file_provider.canonical_hashes_range(start, end)
}
}
@@ -1156,15 +1075,7 @@ impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N
}
fn last_block_number(&self) -> ProviderResult<BlockNumber> {
Ok(self
.tx
.cursor_read::<tables::CanonicalHeaders>()?
.last()?
.map(|(num, _)| num)
.max(
self.static_file_provider.get_highest_static_file_block(StaticFileSegment::Headers),
)
.unwrap_or_default())
self.static_file_provider.last_block_number()
}
fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
@@ -1216,6 +1127,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvid
Ok(None)
}
fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
Ok(None)
}
@@ -1313,6 +1225,14 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvid
},
)
}
fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
Ok(self
.tx
.cursor_read::<tables::TransactionBlocks>()?
.seek(id)
.map(|b| b.map(|(_, bn)| bn))?)
}
}
impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
@@ -1324,66 +1244,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
&self,
tx_range: Range<TxNumber>,
) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
self.static_file_provider.get_range_with_static_file_or_database(
StaticFileSegment::Transactions,
tx_range,
|static_file, range, _| static_file.transaction_hashes_by_range(range),
|tx_range, _| {
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
let tx_range_size = tx_range.clone().count();
let tx_walker = tx_cursor.walk_range(tx_range)?;
let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1);
let mut channels = Vec::with_capacity(chunk_size);
let mut transaction_count = 0;
#[inline]
fn calculate_hash<T>(
entry: Result<(TxNumber, T), DatabaseError>,
rlp_buf: &mut Vec<u8>,
) -> Result<(B256, TxNumber), Box<ProviderError>>
where
T: Encodable2718,
{
let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
tx.encode_2718(rlp_buf);
Ok((keccak256(rlp_buf), tx_id))
}
for chunk in &tx_walker.chunks(chunk_size) {
let (tx, rx) = mpsc::channel();
channels.push(rx);
// Note: Unfortunate side-effect of how chunk is designed in itertools (it is
// not Send)
let chunk: Vec<_> = chunk.collect();
transaction_count += chunk.len();
// Spawn the task onto the global rayon pool
// This task will send the results through the channel after it has calculated
// the hash.
rayon::spawn(move || {
let mut rlp_buf = Vec::with_capacity(128);
for entry in chunk {
rlp_buf.clear();
let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
}
});
}
let mut tx_list = Vec::with_capacity(transaction_count);
// Iterate over channels and append the tx hashes unsorted
for channel in channels {
while let Ok(tx) = channel.recv() {
let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
tx_list.push((tx_hash, tx_id));
}
}
Ok(tx_list)
},
|_| true,
)
self.static_file_provider.transaction_hashes_by_range(tx_range)
}
}
@@ -1396,24 +1257,14 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for Datab
}
fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
self.static_file_provider.get_with_static_file_or_database(
StaticFileSegment::Transactions,
id,
|static_file| static_file.transaction_by_id(id),
|| Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
)
self.static_file_provider.transaction_by_id(id)
}
fn transaction_by_id_unhashed(
&self,
id: TxNumber,
) -> ProviderResult<Option<Self::Transaction>> {
self.static_file_provider.get_with_static_file_or_database(
StaticFileSegment::Transactions,
id,
|static_file| static_file.transaction_by_id_unhashed(id),
|| Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
)
self.static_file_provider.transaction_by_id_unhashed(id)
}
fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
@@ -1428,11 +1279,9 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for Datab
&self,
tx_hash: TxHash,
) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
let mut transaction_cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
let Some(block_number) =
transaction_cursor.seek(transaction_id).map(|b| b.map(|(_, bn)| bn))? &&
let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
let Some(sealed_header) = self.sealed_header(block_number)?
{
let (header, block_hash) = sealed_header.split();
@@ -1469,8 +1318,6 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for Datab
&self,
id: BlockHashOrNumber,
) -> ProviderResult<Option<Vec<Self::Transaction>>> {
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
if let Some(block_number) = self.convert_hash_or_number(id)? &&
let Some(body) = self.block_body_indices(block_number)?
{
@@ -1478,7 +1325,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for Datab
return if tx_range.is_empty() {
Ok(Some(Vec::new()))
} else {
Ok(Some(self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?))
self.transactions_by_tx_range(tx_range).map(Some)
}
}
Ok(None)
@@ -1489,7 +1336,6 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for Datab
range: impl RangeBounds<BlockNumber>,
) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
let range = to_range(range);
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
.into_iter()
@@ -1498,10 +1344,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for Datab
if tx_num_range.is_empty() {
Ok(Vec::new())
} else {
Ok(self
.transactions_by_tx_range_with_cursor(tx_num_range, &mut tx_cursor)?
.into_iter()
.collect())
self.transactions_by_tx_range(tx_num_range)
}
})
.collect()
@@ -1511,10 +1354,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for Datab
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<Self::Transaction>> {
self.transactions_by_tx_range_with_cursor(
range,
&mut self.tx.cursor_read::<tables::Transactions<_>>()?,
)
self.static_file_provider.transactions_by_tx_range(range)
}
fn senders_by_tx_range(
@@ -2698,16 +2538,17 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
type Block = BlockTy<N>;
type Receipt = ReceiptTy<N>;
/// Inserts the block into the database, always modifying the following tables:
/// * [`CanonicalHeaders`](tables::CanonicalHeaders)
/// * [`Headers`](tables::Headers)
/// * [`HeaderNumbers`](tables::HeaderNumbers)
/// * [`HeaderTerminalDifficulties`](tables::HeaderTerminalDifficulties)
/// * [`BlockBodyIndices`](tables::BlockBodyIndices)
/// Inserts the block into the database, always modifying the following static file segments and
/// tables:
/// * [`StaticFileSegment::Headers`]
/// * [`tables::HeaderNumbers`]
/// * [`tables::HeaderTerminalDifficulties`]
/// * [`tables::BlockBodyIndices`]
///
/// If there are transactions in the block, the following tables will be modified:
/// * [`Transactions`](tables::Transactions)
/// * [`TransactionBlocks`](tables::TransactionBlocks)
/// If there are transactions in the block, the following static file segments and tables will
/// be modified:
/// * [`StaticFileSegment::Transactions`]
/// * [`tables::TransactionBlocks`]
///
/// If ommers are not empty, this will modify [`BlockOmmers`](tables::BlockOmmers).
/// If withdrawals are not empty, this will modify
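The updated doc comment above describes the write side; the matching read path is sketched below, assuming trait and method names from this diff (`BlockBodyIndicesProvider`, `TransactionsProvider`) and a hypothetical helper name. The block's transaction range still comes from the `BlockBodyIndices` table, while the transactions themselves are now read from the Transactions static file segment.

    use alloy_primitives::BlockNumber;
    use reth_storage_api::{BlockBodyIndicesProvider, TransactionsProvider};
    use reth_storage_errors::provider::ProviderResult;

    // Sketch: fetch a block's transactions after this change. The indices
    // lookup stays in MDBX; the transaction payloads come from static files
    // via `transactions_by_tx_range`.
    fn block_transactions<P>(
        provider: &P,
        number: BlockNumber,
    ) -> ProviderResult<Vec<P::Transaction>>
    where
        P: TransactionsProvider + BlockBodyIndicesProvider,
    {
        match provider.block_body_indices(number)? {
            Some(indices) if !indices.tx_num_range().is_empty() => {
                provider.transactions_by_tx_range(indices.tx_num_range())
            }
            _ => Ok(Vec::new()),
        }
    }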

View File

@@ -89,11 +89,11 @@ impl<'a, N: NodePrimitives> StaticFileJarProvider<'a, N> {
impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileJarProvider<'_, N> {
type Header = N::BlockHeader;
fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
Ok(self
.cursor()?
.get_two::<HeaderWithHashMask<Self::Header>>(block_hash.into())?
.filter(|(_, hash)| hash == block_hash)
.get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
.filter(|(_, hash)| hash == &block_hash)
.map(|(header, _)| header))
}
@@ -101,11 +101,11 @@ impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileJarProv
self.cursor()?.get_one::<HeaderMask<Self::Header>>(num.into())
}
fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
fn header_td(&self, block_hash: BlockHash) -> ProviderResult<Option<U256>> {
Ok(self
.cursor()?
.get_two::<TDWithHashMask>(block_hash.into())?
.filter(|(_, hash)| hash == block_hash)
.get_two::<TDWithHashMask>((&block_hash).into())?
.filter(|(_, hash)| hash == &block_hash)
.map(|(td, _)| td.into()))
}

View File

@@ -1109,16 +1109,31 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
P: FnMut(&T) -> bool,
{
let get_provider = |start: u64| {
if segment.is_block_based() {
self.get_segment_provider_from_block(segment, start, None)
} else {
self.get_segment_provider_from_transaction(segment, start, None)
}
};
let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
let mut provider = get_provider(range.start)?;
/// Resolves to the provider for the given block or transaction number.
///
/// If the static file is missing, the `result` is returned.
macro_rules! get_provider {
($number:expr) => {{
let provider = if segment.is_block_based() {
self.get_segment_provider_from_block(segment, $number, None)
} else {
self.get_segment_provider_from_transaction(segment, $number, None)
};
match provider {
Ok(provider) => provider,
Err(
ProviderError::MissingStaticFileBlock(_, _) |
ProviderError::MissingStaticFileTx(_, _),
) => return Ok(result),
Err(err) => return Err(err),
}
}};
}
let mut provider = get_provider!(range.start);
let mut cursor = provider.cursor()?;
// advances number in range
@@ -1140,19 +1155,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
}
None => {
if retrying {
warn!(
target: "provider::static_file",
?segment,
?number,
"Could not find block or tx number on a range request"
);
let err = if segment.is_block_based() {
ProviderError::MissingStaticFileBlock(segment, number)
} else {
ProviderError::MissingStaticFileTx(segment, number)
};
return Err(err)
return Ok(result)
}
// There is a very small chance of hitting a deadlock if two consecutive
// static files share the same bucket in the
@@ -1160,7 +1163,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
// before requesting the next one.
drop(cursor);
drop(provider);
provider = get_provider(number)?;
provider = get_provider!(number);
cursor = provider.cursor()?;
retrying = true;
}
@@ -1374,13 +1377,13 @@ impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
type Header = N::BlockHeader;
fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
Ok(jar_provider
.cursor()?
.get_two::<HeaderWithHashMask<Self::Header>>(block_hash.into())?
.get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
.and_then(|(header, hash)| {
if &hash == block_hash {
if hash == block_hash {
return Some(header)
}
None
@@ -1400,12 +1403,12 @@ impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvide
})
}
fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
fn header_td(&self, block_hash: BlockHash) -> ProviderResult<Option<U256>> {
self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
Ok(jar_provider
.cursor()?
.get_two::<TDWithHashMask>(block_hash.into())?
.and_then(|(td, hash)| (&hash == block_hash).then_some(td.0)))
.get_two::<TDWithHashMask>((&block_hash).into())?
.and_then(|(td, hash)| (hash == block_hash).then_some(td.0)))
})
}
@@ -1468,7 +1471,15 @@ impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvide
impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)?.block_hash(num)
self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
.and_then(|provider| provider.block_hash(num))
.or_else(|err| {
if let ProviderError::MissingStaticFileBlock(_, _) = err {
Ok(None)
} else {
Err(err)
}
})
}
fn canonical_hashes_range(
@@ -1712,8 +1723,6 @@ impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsPr
}
}
/* Cannot be successfully implemented but must exist for trait requirements */
impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
fn chain_info(&self) -> ProviderResult<ChainInfo> {
// Required data not present in static_files
@@ -1726,8 +1735,7 @@ impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
}
fn last_block_number(&self) -> ProviderResult<BlockNumber> {
// Required data not present in static_files
Err(ProviderError::UnsupportedProvider)
Ok(self.get_highest_static_file_block(StaticFileSegment::Headers).unwrap_or_default())
}
fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
@@ -1736,6 +1744,8 @@ impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
}
}
/* Cannot be successfully implemented but must exist for trait requirements */
impl<N: FullNodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
for StaticFileProvider<N>
{
@@ -1803,6 +1813,10 @@ impl<N: FullNodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>>
) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
Err(ProviderError::UnsupportedProvider)
}
fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
Err(ProviderError::UnsupportedProvider)
}
}
impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {

View File

@@ -146,12 +146,12 @@ mod tests {
let header = header.unseal();
// Compare Header
assert_eq!(header, db_provider.header(&header_hash).unwrap().unwrap());
assert_eq!(header, db_provider.header(header_hash).unwrap().unwrap());
assert_eq!(header, jar_provider.header_by_number(header.number).unwrap().unwrap());
// Compare HeaderTerminalDifficulties
assert_eq!(
db_provider.header_td(&header_hash).unwrap().unwrap(),
db_provider.header_td(header_hash).unwrap().unwrap(),
jar_provider.header_td_by_number(header.number).unwrap().unwrap()
);
}

View File

@@ -281,9 +281,9 @@ impl<T: NodePrimitives, ChainSpec: EthChainSpec + Send + Sync + 'static> HeaderP
{
type Header = <T::Block as Block>::Header;
fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
let lock = self.headers.lock();
Ok(lock.get(block_hash).cloned())
Ok(lock.get(&block_hash).cloned())
}
fn header_by_number(&self, num: u64) -> ProviderResult<Option<Self::Header>> {
@@ -291,9 +291,9 @@ impl<T: NodePrimitives, ChainSpec: EthChainSpec + Send + Sync + 'static> HeaderP
Ok(lock.values().find(|h| h.number() == num).cloned())
}
fn header_td(&self, hash: &BlockHash) -> ProviderResult<Option<U256>> {
fn header_td(&self, hash: BlockHash) -> ProviderResult<Option<U256>> {
let lock = self.headers.lock();
Ok(lock.get(hash).map(|target| {
Ok(lock.get(&hash).map(|target| {
lock.values()
.filter(|h| h.number() < target.number())
.fold(target.difficulty(), |td, h| td + h.difficulty())
@@ -718,6 +718,10 @@ impl<T: NodePrimitives, ChainSpec: EthChainSpec + Send + Sync + 'static> BlockRe
) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
Ok(vec![])
}
fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
Ok(None)
}
}
impl<T, ChainSpec> BlockReaderIdExt for MockEthProvider<T, ChainSpec>

View File

@@ -338,9 +338,9 @@ where
{
type Header = HeaderTy<Node>;
fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
let block_response = self.block_on_async(async {
self.provider.get_block_by_hash(*block_hash).await.map_err(ProviderError::other)
self.provider.get_block_by_hash(block_hash).await.map_err(ProviderError::other)
})?;
let Some(block_response) = block_response else {
@@ -364,7 +364,7 @@ where
Ok(Some(sealed_header.into_header()))
}
fn header_td(&self, hash: &BlockHash) -> ProviderResult<Option<U256>> {
fn header_td(&self, hash: BlockHash) -> ProviderResult<Option<U256>> {
let header = self.header(hash).map_err(ProviderError::other)?;
Ok(header.map(|b| b.difficulty()))
@@ -510,6 +510,10 @@ where
) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
Err(ProviderError::UnsupportedProvider)
}
fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
Err(ProviderError::UnsupportedProvider)
}
}
impl<P, Node, N> BlockReaderIdExt for RpcBlockchainProvider<P, Node, N>
@@ -1539,6 +1543,10 @@ where
) -> Result<Vec<RecoveredBlock<Self::Block>>, ProviderError> {
Err(ProviderError::UnsupportedProvider)
}
fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
Err(ProviderError::UnsupportedProvider)
}
}
impl<P, Node, N> TransactionsProvider for RpcBlockchainStateProvider<P, Node, N>
@@ -1658,7 +1666,7 @@ where
{
type Header = HeaderTy<Node>;
fn header(&self, _block_hash: &BlockHash) -> Result<Option<Self::Header>, ProviderError> {
fn header(&self, _block_hash: BlockHash) -> Result<Option<Self::Header>, ProviderError> {
Err(ProviderError::UnsupportedProvider)
}
@@ -1666,7 +1674,7 @@ where
Err(ProviderError::UnsupportedProvider)
}
fn header_td(&self, _hash: &BlockHash) -> Result<Option<U256>, ProviderError> {
fn header_td(&self, _hash: BlockHash) -> Result<Option<U256>, ProviderError> {
Err(ProviderError::UnsupportedProvider)
}

View File

@@ -4,7 +4,7 @@ use crate::{
};
use alloc::{sync::Arc, vec::Vec};
use alloy_eips::{BlockHashOrNumber, BlockId, BlockNumberOrTag};
use alloy_primitives::{BlockNumber, B256};
use alloy_primitives::{BlockNumber, TxNumber, B256};
use core::ops::RangeInclusive;
use reth_primitives_traits::{RecoveredBlock, SealedHeader};
use reth_storage_errors::provider::ProviderResult;
@@ -144,6 +144,9 @@ pub trait BlockReader:
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>>;
/// Returns the block number that contains the given transaction.
fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>>;
}
impl<T: BlockReader> BlockReader for Arc<T> {
@@ -202,6 +205,9 @@ impl<T: BlockReader> BlockReader for Arc<T> {
) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
T::recovered_block_range(self, range)
}
fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
T::block_by_transaction_id(self, id)
}
}
impl<T: BlockReader> BlockReader for &T {
@@ -260,6 +266,9 @@ impl<T: BlockReader> BlockReader for &T {
) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
T::recovered_block_range(self, range)
}
fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
T::block_by_transaction_id(self, id)
}
}
/// Trait extension for `BlockReader`, for types that implement `BlockId` conversion.
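A small usage sketch for the new `BlockReader::block_by_transaction_id` method added above; the helper name `header_for_tx` is illustrative, and the bounds mirror the trait definitions in this diff.

    use alloy_primitives::TxNumber;
    use reth_primitives_traits::SealedHeader;
    use reth_storage_api::{BlockReader, HeaderProvider};
    use reth_storage_errors::provider::ProviderResult;

    // Sketch: map a transaction number to the sealed header of the block
    // that contains it.
    fn header_for_tx<P>(
        provider: &P,
        tx_id: TxNumber,
    ) -> ProviderResult<Option<SealedHeader<P::Header>>>
    where
        P: BlockReader + HeaderProvider,
    {
        match provider.block_by_transaction_id(tx_id)? {
            // `sealed_header` is answered from the Headers static file
            // segment on the database-backed providers.
            Some(block_number) => provider.sealed_header(block_number),
            None => Ok(None),
        }
    }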

View File

@@ -15,19 +15,19 @@ pub trait HeaderProvider: Send + Sync {
type Header: BlockHeader;
/// Check if block is known
fn is_known(&self, block_hash: &BlockHash) -> ProviderResult<bool> {
fn is_known(&self, block_hash: BlockHash) -> ProviderResult<bool> {
self.header(block_hash).map(|header| header.is_some())
}
/// Get header by block hash
fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>>;
fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>>;
/// Retrieves the header sealed by the given block hash.
fn sealed_header_by_hash(
&self,
block_hash: BlockHash,
) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
Ok(self.header(&block_hash)?.map(|header| SealedHeader::new(header, block_hash)))
Ok(self.header(block_hash)?.map(|header| SealedHeader::new(header, block_hash)))
}
/// Get header by block number
@@ -39,13 +39,13 @@ pub trait HeaderProvider: Send + Sync {
hash_or_num: BlockHashOrNumber,
) -> ProviderResult<Option<Self::Header>> {
match hash_or_num {
BlockHashOrNumber::Hash(hash) => self.header(&hash),
BlockHashOrNumber::Hash(hash) => self.header(hash),
BlockHashOrNumber::Number(num) => self.header_by_number(num),
}
}
/// Get total difficulty by block hash.
fn header_td(&self, hash: &BlockHash) -> ProviderResult<Option<U256>>;
fn header_td(&self, hash: BlockHash) -> ProviderResult<Option<U256>>;
/// Get total difficulty by block number.
fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>>;

View File

@@ -237,6 +237,10 @@ impl<C: Send + Sync, N: NodePrimitives> BlockReader for NoopProvider<C, N> {
) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
Ok(Vec::new())
}
fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
Ok(None)
}
}
impl<C: Send + Sync, N: NodePrimitives> TransactionsProvider for NoopProvider<C, N> {
@@ -343,7 +347,7 @@ impl<C: Send + Sync, N: NodePrimitives> ReceiptProviderIdExt for NoopProvider<C,
impl<C: Send + Sync, N: NodePrimitives> HeaderProvider for NoopProvider<C, N> {
type Header = N::BlockHeader;
fn header(&self, _block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
fn header(&self, _block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
Ok(None)
}
@@ -351,7 +355,7 @@ impl<C: Send + Sync, N: NodePrimitives> HeaderProvider for NoopProvider<C, N> {
Ok(None)
}
fn header_td(&self, _hash: &BlockHash) -> ProviderResult<Option<U256>> {
fn header_td(&self, _hash: BlockHash) -> ProviderResult<Option<U256>> {
Ok(None)
}

View File

@@ -63,7 +63,7 @@ fn header_provider_example<T: HeaderProvider>(provider: T, number: u64) -> eyre:
// Can also query the header by hash!
let header_by_hash =
provider.header(&sealed_header.hash())?.ok_or(eyre::eyre!("header by hash not found"))?;
provider.header(sealed_header.hash())?.ok_or(eyre::eyre!("header by hash not found"))?;
assert_eq!(sealed_header.header(), &header_by_hash);
// The header's total difficulty is stored in a separate table, so we have a separate call for