chore(tree): migrate tree externals to ProviderFactory (#5531)

This commit is contained in:
Roman Krasiuk
2023-11-22 07:09:53 -08:00
committed by GitHub
parent 3818dea392
commit 3c7f32d839
6 changed files with 69 additions and 78 deletions

View File

@@ -142,15 +142,15 @@ impl Command {
// initialize the database
let db = Arc::new(init_db(db_path, self.db.log_level)?);
let provider_factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain));
let consensus: Arc<dyn Consensus> = Arc::new(BeaconConsensus::new(Arc::clone(&self.chain)));
// configure blockchain tree
let tree_externals = TreeExternals::new(
Arc::clone(&db),
provider_factory.clone(),
Arc::clone(&consensus),
Factory::new(self.chain.clone()),
Arc::clone(&self.chain),
);
let tree = BlockchainTree::new(tree_externals, BlockchainTreeConfig::default(), None)?;
let blockchain_tree = ShareableBlockchainTree::new(tree);
@@ -159,8 +159,8 @@ impl Command {
let best_block =
self.lookup_best_block(Arc::clone(&db)).wrap_err("the head block is missing")?;
let factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain));
let blockchain_db = BlockchainProvider::new(factory.clone(), blockchain_tree.clone())?;
let blockchain_db =
BlockchainProvider::new(provider_factory.clone(), blockchain_tree.clone())?;
let blob_store = InMemoryBlobStore::default();
let validator = TransactionValidationTaskExecutor::eth_builder(Arc::clone(&self.chain))
@@ -278,7 +278,7 @@ impl Command {
debug!(target: "reth::cli", ?state, "Executed block");
// Attempt to insert new block without committing
let provider_rw = factory.provider_rw()?;
let provider_rw = provider_factory.provider_rw()?;
provider_rw.append_blocks_with_bundle_state(
Vec::from([block_with_senders]),
state,

View File

@@ -259,6 +259,16 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
let db = Arc::new(init_db(&db_path, self.db.log_level)?.with_metrics());
info!(target: "reth::cli", "Database opened");
// configure snapshotter
let snapshotter = reth_snapshot::Snapshotter::new(
db.clone(),
data_dir.snapshots_path(),
self.chain.clone(),
self.chain.snapshot_block_interval,
)?;
let provider_factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain))
.with_snapshots(data_dir.snapshots_path(), snapshotter.highest_snapshot_receiver());
self.start_metrics_endpoint(prometheus_handle, Arc::clone(&db)).await?;
debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis");
@@ -281,10 +291,9 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
// configure blockchain tree
let tree_externals = TreeExternals::new(
Arc::clone(&db),
provider_factory.clone(),
Arc::clone(&consensus),
Factory::new(self.chain.clone()),
Arc::clone(&self.chain),
);
let tree = BlockchainTree::new(
tree_externals,
@@ -299,18 +308,8 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
// fetch the head block from the database
let head = self.lookup_head(Arc::clone(&db)).wrap_err("the head block is missing")?;
// configure snapshotter
let snapshotter = reth_snapshot::Snapshotter::new(
db.clone(),
data_dir.snapshots_path(),
self.chain.clone(),
self.chain.snapshot_block_interval,
)?;
// setup the blockchain provider
let factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain))
.with_snapshots(data_dir.snapshots_path(), snapshotter.highest_snapshot_receiver());
let blockchain_db = BlockchainProvider::new(factory, blockchain_tree.clone())?;
let blockchain_db = BlockchainProvider::new(provider_factory, blockchain_tree.clone())?;
let blob_store = InMemoryBlobStore::default();
let validator = TransactionValidationTaskExecutor::eth_builder(Arc::clone(&self.chain))
.with_head_timestamp(head.timestamp)

View File

@@ -24,7 +24,7 @@ use reth_provider::{
chain::{ChainSplit, ChainSplitTarget},
BlockExecutionWriter, BlockNumReader, BlockWriter, BundleStateWithReceipts,
CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, Chain,
DatabaseProvider, DisplayBlocksChain, ExecutorFactory, HeaderProvider,
ChainSpecProvider, DisplayBlocksChain, ExecutorFactory, HeaderProvider,
};
use reth_stages::{MetricEvent, MetricEventsSender};
use std::{collections::BTreeMap, sync::Arc};
@@ -161,7 +161,7 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
}
// check if block is inside database
if self.externals.database().provider()?.block_number(block.hash)?.is_some() {
if self.externals.provider_factory.provider()?.block_number(block.hash)?.is_some() {
return Ok(Some(BlockStatus::Valid))
}
@@ -380,8 +380,9 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
// https://github.com/paradigmxyz/reth/issues/1713
let (block_status, chain) = {
let factory = self.externals.database();
let provider = factory
let provider = self
.externals
.provider_factory
.provider()
.map_err(|err| InsertBlockError::new(block.block.clone(), err.into()))?;
@@ -397,7 +398,12 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
})?;
// Pass the parent total difficulty to short-circuit unnecessary calculations.
if !self.externals.chain_spec.fork(Hardfork::Paris).active_at_ttd(parent_td, U256::ZERO)
if !self
.externals
.provider_factory
.chain_spec()
.fork(Hardfork::Paris)
.active_at_ttd(parent_td, U256::ZERO)
{
return Err(InsertBlockError::execution_error(
BlockValidationError::BlockPreMerge { hash: block.hash }.into(),
@@ -881,8 +887,7 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
// canonical, but in the db. If it is in a sidechain, it is not canonical. If it is not in
// the db, then it is not canonical.
let factory = self.externals.database();
let provider = factory.provider()?;
let provider = self.externals.provider_factory.provider()?;
let mut header = None;
if let Some(num) = self.block_indices().get_canonical_block_number(hash) {
@@ -930,12 +935,18 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
if let Some(header) = canonical_header {
info!(target: "blockchain_tree", ?block_hash, "Block is already canonical, ignoring.");
// TODO: this could be fetched from the chainspec first
let td = self.externals.database().provider()?.header_td(block_hash)?.ok_or(
let td = self.externals.provider_factory.provider()?.header_td(block_hash)?.ok_or(
CanonicalError::from(BlockValidationError::MissingTotalDifficulty {
hash: *block_hash,
}),
)?;
if !self.externals.chain_spec.fork(Hardfork::Paris).active_at_ttd(td, U256::ZERO) {
if !self
.externals
.provider_factory
.chain_spec()
.fork(Hardfork::Paris)
.active_at_ttd(td, U256::ZERO)
{
return Err(CanonicalError::from(BlockValidationError::BlockPreMerge {
hash: *block_hash,
})
@@ -1093,14 +1104,11 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
/// Write the given chain to the database as canonical.
fn commit_canonical_to_database(&self, chain: Chain) -> RethResult<()> {
let provider = DatabaseProvider::new_rw(
self.externals.db.tx_mut()?,
self.externals.chain_spec.clone(),
);
let provider_rw = self.externals.provider_factory.provider_rw()?;
let (blocks, state) = chain.into_inner();
provider
provider_rw
.append_blocks_with_bundle_state(
blocks.into_blocks().collect(),
state,
@@ -1108,7 +1116,7 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
)
.map_err(|e| BlockExecutionError::CanonicalCommit { inner: e.to_string() })?;
provider.commit()?;
provider_rw.commit()?;
Ok(())
}
@@ -1141,21 +1149,20 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
revert_until: BlockNumber,
) -> RethResult<Option<Chain>> {
// read data that is needed for new sidechain
let provider_rw = self.externals.provider_factory.provider_rw()?;
let provider = DatabaseProvider::new_rw(
self.externals.db.tx_mut()?,
self.externals.chain_spec.clone(),
);
let tip = provider.last_block_number()?;
let tip = provider_rw.last_block_number()?;
let revert_range = (revert_until + 1)..=tip;
info!(target: "blockchain_tree", "Unwinding canonical chain blocks: {:?}", revert_range);
// read block and execution result from database. and remove traces of block from tables.
let blocks_and_execution = provider
.take_block_and_execution_range(self.externals.chain_spec.as_ref(), revert_range)
let blocks_and_execution = provider_rw
.take_block_and_execution_range(
self.externals.provider_factory.chain_spec().as_ref(),
revert_range,
)
.map_err(|e| BlockExecutionError::CanonicalRevert { inner: e.to_string() })?;
provider.commit()?;
provider_rw.commit()?;
if blocks_and_execution.is_empty() {
Ok(None)
@@ -1197,18 +1204,16 @@ mod tests {
use crate::block_buffer::BufferedBlocks;
use assert_matches::assert_matches;
use linked_hash_set::LinkedHashSet;
use reth_db::{
tables,
test_utils::{create_test_rw_db, TempDatabase},
transaction::DbTxMut,
DatabaseEnv,
};
use reth_db::{tables, test_utils::TempDatabase, transaction::DbTxMut, DatabaseEnv};
use reth_interfaces::test_utils::TestConsensus;
use reth_primitives::{
constants::EMPTY_ROOT_HASH, stage::StageCheckpoint, ChainSpecBuilder, B256, MAINNET,
};
use reth_provider::{
test_utils::{blocks::BlockChainTestData, TestExecutorFactory},
test_utils::{
blocks::BlockChainTestData, create_test_provider_factory_with_chain_spec,
TestExecutorFactory,
},
BlockWriter, BundleStateWithReceipts, ProviderFactory,
};
use std::{
@@ -1219,8 +1224,6 @@ mod tests {
fn setup_externals(
exec_res: Vec<BundleStateWithReceipts>,
) -> TreeExternals<Arc<TempDatabase<DatabaseEnv>>, TestExecutorFactory> {
let db = create_test_rw_db();
let consensus = Arc::new(TestConsensus::default());
let chain_spec = Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
@@ -1228,18 +1231,19 @@ mod tests {
.shanghai_activated()
.build(),
);
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
let consensus = Arc::new(TestConsensus::default());
let executor_factory = TestExecutorFactory::new(chain_spec.clone());
executor_factory.extend(exec_res);
TreeExternals::new(db, consensus, executor_factory, chain_spec)
TreeExternals::new(provider_factory, consensus, executor_factory)
}
fn setup_genesis<DB: Database>(db: DB, mut genesis: SealedBlock) {
fn setup_genesis<DB: Database>(factory: &ProviderFactory<DB>, mut genesis: SealedBlock) {
// insert genesis to db.
genesis.header.header.number = 10;
genesis.header.header.state_root = EMPTY_ROOT_HASH;
let factory = ProviderFactory::new(&db, MAINNET.clone());
let provider = factory.provider_rw().unwrap();
provider.insert_block(genesis, None, None).unwrap();
@@ -1339,7 +1343,7 @@ mod tests {
let externals = setup_externals(vec![exec2.clone(), exec1.clone(), exec2, exec1]);
// last finalized block would be number 9.
setup_genesis(externals.db.clone(), genesis);
setup_genesis(&externals.provider_factory, genesis);
// make tree
let config = BlockchainTreeConfig::new(1, 2, 3, 2);

View File

@@ -206,9 +206,9 @@ impl AppendableChain {
let block = block.unseal();
// get the state provider.
let db = externals.database();
let canonical_fork = bundle_state_data_provider.canonical_fork();
let state_provider = db.history_by_block_number(canonical_fork.number)?;
let state_provider =
externals.provider_factory.history_by_block_number(canonical_fork.number)?;
let provider = BundleStateProvider::new(state_provider, bundle_state_data_provider);

View File

@@ -2,7 +2,7 @@
use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx};
use reth_interfaces::{consensus::Consensus, RethResult};
use reth_primitives::{BlockHash, BlockNumber, ChainSpec};
use reth_primitives::{BlockHash, BlockNumber};
use reth_provider::ProviderFactory;
use std::{collections::BTreeMap, sync::Arc};
@@ -17,34 +17,26 @@ use std::{collections::BTreeMap, sync::Arc};
/// - The chain spec
#[derive(Debug)]
pub struct TreeExternals<DB, EF> {
/// The database, used to commit the canonical chain, or unwind it.
pub(crate) db: DB,
/// The provider factory, used to commit the canonical chain, or unwind it.
pub(crate) provider_factory: ProviderFactory<DB>,
/// The consensus engine.
pub(crate) consensus: Arc<dyn Consensus>,
/// The executor factory to execute blocks with.
pub(crate) executor_factory: EF,
/// The chain spec.
pub(crate) chain_spec: Arc<ChainSpec>,
}
impl<DB, EF> TreeExternals<DB, EF> {
/// Create new tree externals.
pub fn new(
db: DB,
provider_factory: ProviderFactory<DB>,
consensus: Arc<dyn Consensus>,
executor_factory: EF,
chain_spec: Arc<ChainSpec>,
) -> Self {
Self { db, consensus, executor_factory, chain_spec }
Self { provider_factory, consensus, executor_factory }
}
}
impl<DB: Database, EF> TreeExternals<DB, EF> {
/// Return shareable database helper structure.
pub fn database(&self) -> ProviderFactory<&DB> {
ProviderFactory::new(&self.db, self.chain_spec.clone())
}
/// Fetches the latest canonical block hashes by walking backwards from the head.
///
/// Returns the hashes sorted by increasing block numbers
@@ -53,8 +45,9 @@ impl<DB: Database, EF> TreeExternals<DB, EF> {
num_hashes: usize,
) -> RethResult<BTreeMap<BlockNumber, BlockHash>> {
Ok(self
.db
.tx()?
.provider_factory
.provider()?
.tx_ref()
.cursor_read::<tables::CanonicalHeaders>()?
.walk_back(None)?
.take(num_hashes)

View File

@@ -519,12 +519,7 @@ where
let pipeline = pipeline.build(db.clone(), self.base_config.chain_spec.clone());
// Setup blockchain tree
let externals = TreeExternals::new(
db.clone(),
consensus,
executor_factory,
self.base_config.chain_spec.clone(),
);
let externals = TreeExternals::new(provider_factory.clone(), consensus, executor_factory);
let config = BlockchainTreeConfig::new(1, 2, 3, 2);
let tree = ShareableBlockchainTree::new(
BlockchainTree::new(externals, config, None).expect("failed to create tree"),