feat: add forkchoice update tracker type (#2555)

Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
This commit is contained in:
Matthias Seitz
2023-05-12 12:24:41 +02:00
committed by GitHub
parent 92b557d84e
commit 08972ca5e4
10 changed files with 271 additions and 38 deletions

View File

@@ -190,7 +190,7 @@ impl Command {
// setup the blockchain provider
let shareable_db = ShareableDatabase::new(Arc::clone(&db), Arc::clone(&self.chain));
let blockchain_db = BlockchainProvider::new(shareable_db, blockchain_tree.clone());
let blockchain_db = BlockchainProvider::new(shareable_db, blockchain_tree.clone())?;
let transaction_pool = reth_transaction_pool::Pool::eth_pool(
EthTransactionValidator::new(blockchain_db.clone(), Arc::clone(&self.chain)),
@@ -341,7 +341,7 @@ impl Command {
Arc::clone(&db),
ctx.task_executor.clone(),
pipeline,
blockchain_tree.clone(),
blockchain_db.clone(),
self.debug.max_block,
self.debug.continuous,
payload_builder.clone(),

View File

@@ -13,6 +13,7 @@ reth-primitives = { path = "../../primitives" }
reth-interfaces = { path = "../../interfaces" }
reth-stages = { path = "../../stages" }
reth-db = { path = "../../storage/db" }
reth-provider = { path = "../../storage/provider" }
reth-rpc-types = { path = "../../rpc/rpc-types" }
reth-tasks = { path = "../../tasks" }
reth-payload-builder = { path = "../../payload/builder" }

View File

@@ -11,6 +11,7 @@ use reth_payload_builder::{PayloadBuilderAttributes, PayloadBuilderHandle};
use reth_primitives::{
listener::EventListeners, BlockNumber, Header, SealedBlock, SealedHeader, H256, U256,
};
use reth_provider::{BlockProvider, BlockSource, CanonChainTracker, ProviderError};
use reth_rpc_types::engine::{
ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum,
PayloadValidationError,
@@ -137,7 +138,7 @@ pub struct BeaconConsensusEngine<DB, TS, BT>
where
DB: Database,
TS: TaskSpawner,
BT: BlockchainTreeEngine,
BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker,
{
/// The database handle.
db: DB,
@@ -147,8 +148,8 @@ where
/// Must always be [Some] unless the state is being reevaluated.
/// The pipeline is used for historical sync by setting the current forkchoice head.
pipeline_state: Option<PipelineState<DB>>,
/// The blockchain tree used for live sync and reorg tracking.
blockchain_tree: BT,
/// The type we can use to query both the database and the blockchain tree.
blockchain: BT,
/// The Engine API message receiver.
engine_message_rx: UnboundedReceiverStream<BeaconEngineMessage>,
/// A clone of the handle
@@ -179,14 +180,14 @@ impl<DB, TS, BT> BeaconConsensusEngine<DB, TS, BT>
where
DB: Database + Unpin + 'static,
TS: TaskSpawner,
BT: BlockchainTreeEngine + 'static,
BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker + 'static,
{
/// Create a new instance of the [BeaconConsensusEngine].
pub fn new(
db: DB,
task_spawner: TS,
pipeline: Pipeline<DB>,
blockchain_tree: BT,
blockchain: BT,
max_block: Option<BlockNumber>,
continuous: bool,
payload_builder: PayloadBuilderHandle,
@@ -196,7 +197,7 @@ where
db,
task_spawner,
pipeline,
blockchain_tree,
blockchain,
max_block,
continuous,
payload_builder,
@@ -212,7 +213,7 @@ where
db: DB,
task_spawner: TS,
pipeline: Pipeline<DB>,
blockchain_tree: BT,
blockchain: BT,
max_block: Option<BlockNumber>,
continuous: bool,
payload_builder: PayloadBuilderHandle,
@@ -224,7 +225,7 @@ where
db,
task_spawner,
pipeline_state: Some(PipelineState::Idle(pipeline)),
blockchain_tree,
blockchain,
engine_message_rx: UnboundedReceiverStream::new(rx),
handle: handle.clone(),
forkchoice_state: None,
@@ -279,7 +280,7 @@ where
return Some(H256::zero())
}
self.blockchain_tree.find_canonical_ancestor(parent_hash)
self.blockchain.find_canonical_ancestor(parent_hash)
}
/// Loads the header for the given `block_number` from the database.
@@ -334,7 +335,7 @@ where
let is_first_forkchoice = self.forkchoice_state.is_none();
self.forkchoice_state = Some(state);
let status = if self.is_pipeline_idle() {
match self.blockchain_tree.make_canonical(&state.head_block_hash) {
match self.blockchain.make_canonical(&state.head_block_hash) {
Ok(_) => {
let head_block_number = self
.get_block_number(state.head_block_hash)?
@@ -354,9 +355,20 @@ where
let header = self
.load_header(head_block_number)?
.expect("was canonicalized, so it exists");
return Ok(self.process_payload_attributes(attrs, header, state))
let payload_response =
self.process_payload_attributes(attrs, header, state);
if payload_response.is_valid_update() {
// we will return VALID, so let's make sure the info tracker is
// properly updated
self.update_canon_chain(&state)?;
}
return Ok(payload_response)
}
// we will return VALID, so let's make sure the info tracker is
// properly updated
self.update_canon_chain(&state)?;
PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash))
}
Err(error) => {
@@ -380,6 +392,42 @@ where
Ok(OnForkChoiceUpdated::valid(status))
}
/// Sets the state of the canon chain tracker based on the given forkchoice update. This should
/// be called before issuing a VALID forkchoice update.
fn update_canon_chain(&self, update: &ForkchoiceState) -> Result<(), BeaconEngineError> {
if !update.finalized_block_hash.is_zero() {
let finalized = self
.blockchain
.find_block_by_hash(update.finalized_block_hash, BlockSource::Any)?
.ok_or_else(|| {
Error::Provider(ProviderError::UnknownBlockHash(update.finalized_block_hash))
})?;
self.blockchain.set_finalized(finalized.header.seal(update.finalized_block_hash));
}
if !update.safe_block_hash.is_zero() {
let safe = self
.blockchain
.find_block_by_hash(update.safe_block_hash, BlockSource::Any)?
.ok_or_else(|| {
Error::Provider(ProviderError::UnknownBlockHash(update.safe_block_hash))
})?;
self.blockchain.set_safe(safe.header.seal(update.safe_block_hash));
}
// the consensus engine should ensure the head is not zero so we always update the head
let head = self
.blockchain
.find_block_by_hash(update.head_block_hash, BlockSource::Any)?
.ok_or_else(|| {
Error::Provider(ProviderError::UnknownBlockHash(update.head_block_hash))
})?;
self.blockchain.set_canonical_head(head.header.seal(update.head_block_hash));
self.blockchain.on_forkchoice_update_received(update);
Ok(())
}
/// Handler for a failed a forkchoice update due to a canonicalization error.
///
/// This will determine if the state's head is invalid, and if so, return immediately.
@@ -425,6 +473,9 @@ where
}
/// Validates the payload attributes with respect to the header and fork choice state.
///
/// Note: At this point, the fork choice update is considered to be VALID, however, we can still
/// return an error if the payload attributes are invalid.
fn process_payload_attributes(
&self,
attrs: PayloadAttributes,
@@ -505,7 +556,7 @@ where
let header = block.header.clone();
let status = if self.is_pipeline_idle() {
match self.blockchain_tree.insert_block_without_senders(block) {
match self.blockchain.insert_block_without_senders(block) {
Ok(status) => {
let mut latest_valid_hash = None;
let status = match status {
@@ -538,7 +589,7 @@ where
PayloadStatus::new(status, latest_valid_hash)
}
}
} else if let Err(error) = self.blockchain_tree.buffer_block_without_sender(block) {
} else if let Err(error) = self.blockchain.buffer_block_without_sender(block) {
// received a new payload while we're still syncing to the target
let latest_valid_hash =
self.latest_valid_hash_for_invalid_payload(parent_hash, Some(&error));
@@ -598,7 +649,7 @@ where
let needs_pipeline_run = match self.get_block_number(state.finalized_block_hash)? {
Some(number) => {
// Attempt to restore the tree.
self.blockchain_tree.restore_canonical_hashes(number)?;
self.blockchain.restore_canonical_hashes(number)?;
// After restoring the tree, check if the head block is missing.
self.db
@@ -645,7 +696,7 @@ impl<DB, TS, BT> Future for BeaconConsensusEngine<DB, TS, BT>
where
DB: Database + Unpin + 'static,
TS: TaskSpawner + Unpin,
BT: BlockchainTreeEngine + Unpin + 'static,
BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker + Unpin + 'static,
{
type Output = Result<(), BeaconEngineError>;
@@ -672,7 +723,7 @@ where
// Terminate the sync early if it's reached the maximum user
// configured block.
if is_valid_response {
let tip_number = this.blockchain_tree.canonical_tip().number;
let tip_number = this.blockchain.canonical_tip().number;
if this.has_reached_max_block(tip_number) {
return Poll::Ready(Ok(()))
}
@@ -809,7 +860,10 @@ mod tests {
use reth_interfaces::test_utils::TestConsensus;
use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_primitives::{ChainSpec, ChainSpecBuilder, SealedBlockWithSenders, H256, MAINNET};
use reth_provider::{test_utils::TestExecutorFactory, Transaction};
use reth_provider::{
providers::BlockchainProvider, test_utils::TestExecutorFactory, ShareableDatabase,
Transaction,
};
use reth_stages::{test_utils::TestStages, ExecOutput, PipelineError, StageError};
use reth_tasks::TokioTaskExecutor;
use std::{collections::VecDeque, sync::Arc, time::Duration};
@@ -821,7 +875,10 @@ mod tests {
type TestBeaconConsensusEngine = BeaconConsensusEngine<
Arc<Env<WriteMap>>,
TokioTaskExecutor,
ShareableBlockchainTree<Arc<Env<WriteMap>>, TestConsensus, TestExecutorFactory>,
BlockchainProvider<
Arc<Env<WriteMap>>,
ShareableBlockchainTree<Arc<Env<WriteMap>>, TestConsensus, TestExecutorFactory>,
>,
>;
struct TestEnv<DB> {
@@ -905,18 +962,22 @@ mod tests {
.build(db.clone());
// Setup blockchain tree
let externals = TreeExternals::new(db.clone(), consensus, executor_factory, chain_spec);
let externals =
TreeExternals::new(db.clone(), consensus, executor_factory, chain_spec.clone());
let config = BlockchainTreeConfig::new(1, 2, 3, 2);
let (canon_state_notification_sender, _) = tokio::sync::broadcast::channel(3);
let tree = ShareableBlockchainTree::new(
BlockchainTree::new(externals, canon_state_notification_sender, config)
.expect("failed to create tree"),
);
let shareable_db = ShareableDatabase::new(db.clone(), chain_spec.clone());
let latest = chain_spec.genesis_header().seal_slow();
let blockchain_provider = BlockchainProvider::with_latest(shareable_db, tree, latest);
let (engine, handle) = BeaconConsensusEngine::new(
db.clone(),
TokioTaskExecutor::default(),
pipeline,
tree,
blockchain_provider,
None,
false,
payload_builder,

View File

@@ -22,8 +22,11 @@ pub type ForkChoiceUpdateResult = Result<ForkchoiceUpdated, ForkchoiceUpdateErro
#[derive(Default, Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ForkchoiceState {
/// Hash of the head block.
pub head_block_hash: H256,
/// Hash of the safe block.
pub safe_block_hash: H256,
/// Hash of finalized block.
pub finalized_block_hash: H256,
}

View File

@@ -28,10 +28,10 @@ auto_impl = "1.0"
itertools = "0.10"
pin-project = "1.0"
derive_more = "0.99"
parking_lot = "0.12"
# test-utils
reth-rlp = { path = "../../rlp", optional = true }
parking_lot = { version = "0.12", optional = true }
[dev-dependencies]
reth-db = { path = "../db", features = ["test-utils"] }
@@ -42,4 +42,4 @@ parking_lot = "0.12"
[features]
bench = []
test-utils = ["reth-rlp", "parking_lot"]
test-utils = ["reth-rlp"]

View File

@@ -13,10 +13,11 @@ mod traits;
pub use traits::{
AccountProvider, BlockExecutor, BlockHashProvider, BlockIdProvider, BlockNumProvider,
BlockProvider, BlockProviderIdExt, BlockSource, BlockchainTreePendingStateProvider,
CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications,
CanonStateSubscriptions, EvmEnvProvider, ExecutorFactory, HeaderProvider,
PostStateDataProvider, ReceiptProvider, ReceiptProviderIdExt, StateProvider, StateProviderBox,
StateProviderFactory, StateRootProvider, TransactionsProvider, WithdrawalsProvider,
CanonChainTracker, CanonStateNotification, CanonStateNotificationSender,
CanonStateNotifications, CanonStateSubscriptions, EvmEnvProvider, ExecutorFactory,
HeaderProvider, PostStateDataProvider, ReceiptProvider, ReceiptProviderIdExt, StateProvider,
StateProviderBox, StateProviderFactory, StateRootProvider, TransactionsProvider,
WithdrawalsProvider,
};
/// Provider trait implementations.

View File

@@ -0,0 +1,102 @@
use parking_lot::RwLock;
use reth_primitives::{BlockNumHash, SealedHeader};
use std::{sync::Arc, time::Instant};
/// Tracks the chain info: canonical head, safe block, finalized block.
#[derive(Debug, Clone)]
pub(crate) struct ChainInfoTracker {
inner: Arc<ChainInfoInner>,
}
impl ChainInfoTracker {
/// Create a new chain info container for the given canonical head.
pub(crate) fn new(head: SealedHeader) -> Self {
Self {
inner: Arc::new(ChainInfoInner {
last_forkchoice_update: RwLock::new(Instant::now()),
canonical_head: RwLock::new(head),
safe_block: RwLock::new(None),
finalized_block: RwLock::new(None),
}),
}
}
/// Update the timestamp when we received a forkchoice update.
pub(crate) fn on_forkchoice_update_received(&self) {
*self.inner.last_forkchoice_update.write() = Instant::now();
}
/// Returns the instant when we received the latest forkchoice update.
#[allow(unused)]
pub(crate) fn last_forkchoice_update_received_at(&self) -> Instant {
*self.inner.last_forkchoice_update.read()
}
/// Returns the canonical head of the chain.
#[allow(unused)]
pub(crate) fn get_canonical_head(&self) -> SealedHeader {
self.inner.canonical_head.read().clone()
}
/// Returns the safe header of the chain.
#[allow(unused)]
pub(crate) fn get_safe_header(&self) -> Option<SealedHeader> {
self.inner.safe_block.read().clone()
}
/// Returns the finalized header of the chain.
#[allow(unused)]
pub(crate) fn get_finalized_header(&self) -> Option<SealedHeader> {
self.inner.finalized_block.read().clone()
}
/// Returns the canonical head of the chain.
#[allow(unused)]
pub(crate) fn get_canonical_num_hash(&self) -> BlockNumHash {
self.inner.canonical_head.read().num_hash()
}
/// Returns the safe header of the chain.
#[allow(unused)]
pub(crate) fn get_safe_num_hash(&self) -> Option<BlockNumHash> {
let h = self.inner.safe_block.read();
h.as_ref().map(|h| h.num_hash())
}
/// Returns the finalized header of the chain.
#[allow(unused)]
pub(crate) fn get_finalized_num_hash(&self) -> Option<BlockNumHash> {
let h = self.inner.finalized_block.read();
h.as_ref().map(|h| h.num_hash())
}
/// Sets the canonical head of the chain.
pub(crate) fn set_canonical_head(&self, header: SealedHeader) {
*self.inner.canonical_head.write() = header;
}
/// Sets the safe header of the chain.
pub(crate) fn set_safe(&self, header: SealedHeader) {
self.inner.safe_block.write().replace(header);
}
/// Sets the finalized header of the chain.
pub(crate) fn set_finalized(&self, header: SealedHeader) {
self.inner.finalized_block.write().replace(header);
}
}
/// Container type for all chain info fields
#[derive(Debug)]
struct ChainInfoInner {
/// Timestamp when we received the last fork choice update.
///
/// This is mainly used to track if we're connected to a beacon node.
last_forkchoice_update: RwLock<Instant>,
/// The canonical head of the chain.
canonical_head: RwLock<SealedHeader>,
/// The block that the beacon node considers safe.
safe_block: RwLock<Option<SealedHeader>>,
/// The block that the beacon node considers finalized.
finalized_block: RwLock<Option<SealedHeader>>,
}

View File

@@ -1,18 +1,20 @@
use crate::{
BlockHashProvider, BlockIdProvider, BlockNumProvider, BlockProvider, BlockProviderIdExt,
BlockchainTreePendingStateProvider, CanonStateNotifications, CanonStateSubscriptions,
EvmEnvProvider, HeaderProvider, PostStateDataProvider, ReceiptProvider, StateProviderBox,
StateProviderFactory, TransactionsProvider, WithdrawalsProvider,
BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotifications,
CanonStateSubscriptions, EvmEnvProvider, HeaderProvider, PostStateDataProvider, ProviderError,
ReceiptProvider, StateProviderBox, StateProviderFactory, TransactionsProvider,
WithdrawalsProvider,
};
use reth_db::database::Database;
use reth_interfaces::{
blockchain_tree::{BlockStatus, BlockchainTreeEngine, BlockchainTreeViewer},
Result,
consensus::ForkchoiceState,
Error, Result,
};
use reth_primitives::{
Block, BlockHash, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumber, ChainInfo, Header,
Receipt, SealedBlock, SealedBlockWithSenders, TransactionMeta, TransactionSigned, TxHash,
TxNumber, Withdrawal, H256, U256,
Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader, TransactionMeta, TransactionSigned,
TxHash, TxNumber, Withdrawal, H256, U256,
};
use reth_revm_primitives::primitives::{BlockEnv, CfgEnv};
pub use state::{
@@ -25,10 +27,11 @@ use std::{
};
use tracing::trace;
mod chain_info;
mod database;
mod post_state_provider;
mod state;
use crate::traits::BlockSource;
use crate::{providers::chain_info::ChainInfoTracker, traits::BlockSource};
pub use database::*;
pub use post_state_provider::PostStateProvider;
@@ -43,12 +46,30 @@ pub struct BlockchainProvider<DB, Tree> {
database: ShareableDatabase<DB>,
/// The blockchain tree instance.
tree: Tree,
/// Tracks the chain info wrt forkchoice updates
chain_info: ChainInfoTracker,
}
impl<DB, Tree> BlockchainProvider<DB, Tree> {
/// Create new provider instance that wraps the database and the blockchain tree.
pub fn new(database: ShareableDatabase<DB>, tree: Tree) -> Self {
Self { database, tree }
/// Create new provider instance that wraps the database and the blockchain tree, using the
/// provided latest header to initialize the chain info tracker.
pub fn with_latest(database: ShareableDatabase<DB>, tree: Tree, latest: SealedHeader) -> Self {
Self { database, tree, chain_info: ChainInfoTracker::new(latest) }
}
}
impl<DB, Tree> BlockchainProvider<DB, Tree>
where
DB: Database,
{
/// Create a new provider using only the database and the tree, fetching the latest header from
/// the database to initialize the provider.
pub fn new(database: ShareableDatabase<DB>, tree: Tree) -> Result<Self> {
let best = database.chain_info()?;
match database.header_by_number(best.best_number)? {
Some(header) => Ok(Self::with_latest(database, tree, header.seal(best.best_hash))),
None => Err(Error::Provider(ProviderError::Header { number: best.best_number })),
}
}
}
@@ -410,6 +431,30 @@ where
}
}
impl<DB, Tree> CanonChainTracker for BlockchainProvider<DB, Tree>
where
DB: Send + Sync,
Tree: Send + Sync,
Self: BlockProvider,
{
fn on_forkchoice_update_received(&self, _update: &ForkchoiceState) {
// update timestamp
self.chain_info.on_forkchoice_update_received();
}
fn set_finalized(&self, header: SealedHeader) {
self.chain_info.set_finalized(header);
}
fn set_safe(&self, header: SealedHeader) {
self.chain_info.set_safe(header);
}
fn set_canonical_head(&self, header: SealedHeader) {
self.chain_info.set_canonical_head(header);
}
}
impl<DB, Tree> BlockProviderIdExt for BlockchainProvider<DB, Tree>
where
Self: BlockProvider + BlockIdProvider,

View File

@@ -0,0 +1,17 @@
use reth_interfaces::consensus::ForkchoiceState;
use reth_primitives::SealedHeader;
/// A type that can track updates related to fork choice updates.
pub trait CanonChainTracker: Send + Sync {
/// Notify the tracker about a received fork choice update.
fn on_forkchoice_update_received(&self, update: &ForkchoiceState);
/// Sets the canonical head of the chain.
fn set_canonical_head(&self, header: SealedHeader);
/// Sets the safe block of the chain.
fn set_safe(&self, header: SealedHeader);
/// Sets the finalized block of the chain.
fn set_finalized(&self, header: SealedHeader);
}

View File

@@ -15,6 +15,9 @@ pub use block_id::{BlockIdProvider, BlockNumProvider};
mod evm_env;
pub use evm_env::EvmEnvProvider;
mod chain_info;
pub use chain_info::CanonChainTracker;
mod header;
pub use header::HeaderProvider;