refactor(sync): header sync mode (#1831)

This commit is contained in:
Roman Krasiuk
2023-03-19 15:59:37 +02:00
committed by GitHub
parent 6bb341a169
commit 25ac1370fd
15 changed files with 155 additions and 252 deletions

1
Cargo.lock generated
View File

@@ -4583,7 +4583,6 @@ dependencies = [
"reth-consensus-common",
"reth-interfaces",
"reth-primitives",
"tokio",
]
[[package]]

View File

@@ -12,11 +12,9 @@ use reth_downloaders::{
headers::reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::FileClient,
};
use reth_interfaces::{
consensus::{Consensus, ForkchoiceState},
p2p::headers::client::NoopStatusUpdater,
sync::SyncStateUpdater,
consensus::Consensus, p2p::headers::client::NoopStatusUpdater, sync::SyncStateUpdater,
};
use reth_primitives::ChainSpec;
use reth_primitives::{ChainSpec, H256};
use reth_staged_sync::{
utils::{
chainspec::genesis_value_parser,
@@ -26,9 +24,10 @@ use reth_staged_sync::{
};
use reth_stages::{
prelude::*,
stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage},
stages::{ExecutionStage, HeaderSyncMode, SenderRecoveryStage, TotalDifficultyStage},
};
use std::sync::Arc;
use tokio::sync::watch;
use tracing::{debug, info};
/// Syncs RLP encoded blocks from a file.
@@ -89,6 +88,9 @@ impl ImportCommand {
init_genesis(db.clone(), self.chain.clone())?;
let consensus = Arc::new(BeaconConsensus::new(self.chain.clone()));
info!(target: "reth::cli", "Consensus engine initialized");
// create a new FileClient
info!(target: "reth::cli", "Importing chain file");
let file_client = Arc::new(FileClient::new(&self.path).await?);
@@ -97,18 +99,13 @@ impl ImportCommand {
let tip = file_client.tip().expect("file client has no tip");
info!(target: "reth::cli", "Chain file imported");
let (consensus, notifier) = BeaconConsensus::builder().build(self.chain.clone());
debug!(target: "reth::cli", %tip, "Tip manually set");
notifier.send(ForkchoiceState {
head_block_hash: tip,
safe_block_hash: tip,
finalized_block_hash: tip,
})?;
info!(target: "reth::cli", "Consensus engine initialized");
let (mut pipeline, events) =
self.build_import_pipeline(config, db.clone(), &consensus, file_client).await?;
// override the tip
pipeline.set_tip(tip);
debug!(target: "reth::cli", %tip, "Tip manually set");
tokio::spawn(handle_events(None, events));
// Run pipeline
@@ -140,12 +137,15 @@ impl ImportCommand {
.build(file_client.clone(), consensus.clone(), db)
.into_task();
let (tip_tx, tip_rx) = watch::channel(H256::zero());
let factory = reth_executor::Factory::new(self.chain.clone());
let mut pipeline = Pipeline::builder()
.with_tip_sender(tip_tx)
.with_sync_state_updater(file_client)
.add_stages(
DefaultStages::new(
HeaderSyncMode::Tip(tip_rx),
consensus.clone(),
header_downloader,
body_downloader,

View File

@@ -50,7 +50,7 @@ use reth_staged_sync::{
};
use reth_stages::{
prelude::*,
stages::{ExecutionStage, HeaderStage, SenderRecoveryStage, TotalDifficultyStage, FINISH},
stages::{ExecutionStage, HeaderSyncMode, SenderRecoveryStage, TotalDifficultyStage, FINISH},
};
use reth_tasks::TaskExecutor;
use std::{
@@ -154,7 +154,7 @@ impl Command {
init_genesis(db.clone(), self.chain.clone())?;
let (consensus, forkchoice_state_tx) = self.init_consensus()?;
let consensus = Arc::new(BeaconConsensus::new(self.chain.clone())) as Arc<dyn Consensus>;
info!(target: "reth::cli", "Consensus engine initialized");
self.init_trusted_nodes(&mut config);
@@ -183,8 +183,9 @@ impl Command {
info!(target: "reth::cli", "Continuous sync mode enabled");
}
let engine_api_handle =
self.init_engine_api(Arc::clone(&db), forkchoice_state_tx, &ctx.task_executor);
// TODO: This will be fixed with the sync controller (https://github.com/paradigmxyz/reth/pull/1662)
let (tx, _rx) = watch::channel(ForkchoiceState::default());
let engine_api_handle = self.init_engine_api(Arc::clone(&db), tx, &ctx.task_executor);
info!(target: "reth::cli", "Engine API handler initialized");
let _auth_server = self
@@ -209,6 +210,16 @@ impl Command {
)
.await?;
if let Some(tip) = self.tip {
pipeline.set_tip(tip);
debug!(target: "reth::cli", %tip, "Tip manually set");
} else {
let warn_msg = "No tip specified. \
reth cannot communicate with consensus clients, \
so a tip must manually be provided for the online stages with --debug.tip <HASH>.";
warn!(target: "reth::cli", warn_msg);
}
ctx.task_executor.spawn(events::handle_events(Some(network.clone()), events));
// Run pipeline
@@ -304,26 +315,6 @@ impl Command {
}
}
fn init_consensus(&self) -> eyre::Result<(Arc<dyn Consensus>, watch::Sender<ForkchoiceState>)> {
let (consensus, notifier) = BeaconConsensus::builder().build(self.chain.clone());
if let Some(tip) = self.tip {
debug!(target: "reth::cli", %tip, "Tip manually set");
notifier.send(ForkchoiceState {
head_block_hash: tip,
safe_block_hash: tip,
finalized_block_hash: tip,
})?;
} else {
let warn_msg = "No tip specified. \
reth cannot communicate with consensus clients, \
so a tip must manually be provided for the online stages with --debug.tip <HASH>.";
warn!(target: "reth::cli", warn_msg);
}
Ok((consensus, notifier))
}
fn init_engine_api(
&self,
db: Arc<Env<WriteMap>>,
@@ -486,44 +477,31 @@ impl Command {
builder = builder.with_max_block(max_block)
}
let (tip_tx, tip_rx) = watch::channel(H256::zero());
let factory = reth_executor::Factory::new(self.chain.clone());
let default_stages = if continuous {
let continuous_headers =
HeaderStage::new(header_downloader, consensus.clone()).continuous();
let online_builder = OnlineStages::builder_with_headers(
continuous_headers,
consensus.clone(),
body_downloader,
);
DefaultStages::<H, B, U, reth_executor::Factory>::add_offline_stages(
online_builder,
updater.clone(),
factory.clone(),
)
} else {
DefaultStages::new(
consensus.clone(),
header_downloader,
body_downloader,
updater.clone(),
factory.clone(),
)
.builder()
};
let header_mode =
if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) };
let pipeline = builder
.with_sync_state_updater(updater)
.with_sync_state_updater(updater.clone())
.with_tip_sender(tip_tx)
.add_stages(
default_stages
.set(
TotalDifficultyStage::new(consensus.clone())
.with_commit_threshold(stage_conf.total_difficulty.commit_threshold),
)
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
})
.set(ExecutionStage::new(factory, stage_conf.execution.commit_threshold)),
DefaultStages::new(
header_mode,
consensus.clone(),
header_downloader,
body_downloader,
updater,
factory.clone(),
)
.set(
TotalDifficultyStage::new(consensus.clone())
.with_commit_threshold(stage_conf.total_difficulty.commit_threshold),
)
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
})
.set(ExecutionStage::new(factory, stage_conf.execution.commit_threshold)),
)
.build();

View File

@@ -123,7 +123,7 @@ impl Command {
match self.stage {
StageEnum::Bodies => {
let (consensus, _) = BeaconConsensus::builder().build(self.chain.clone());
let consensus = Arc::new(BeaconConsensus::new(self.chain.clone()));
let mut config = config;
config.peers.connect_trusted_nodes_only = self.network.trusted_only;

View File

@@ -12,8 +12,5 @@ reth-consensus-common = { path = "../common" }
reth-primitives = { path = "../../primitives" }
reth-interfaces = { path = "../../interfaces" }
# async
tokio = { version = "1", features = ["sync"] }
[dev-dependencies]
reth-interfaces = { path = "../../interfaces", features = ["test-utils"] }

View File

@@ -1,44 +1,26 @@
//! Consensus for ethereum network
use reth_consensus_common::validation;
use reth_interfaces::consensus::{Consensus, ConsensusError, ForkchoiceState};
use reth_interfaces::consensus::{Consensus, ConsensusError};
use reth_primitives::{ChainSpec, Hardfork, SealedBlock, SealedHeader, EMPTY_OMMER_ROOT, U256};
use std::sync::Arc;
use tokio::sync::watch;
use super::BeaconConsensusBuilder;
/// Ethereum beacon consensus
///
/// This consensus engine does basic checks as outlined in the execution specs,
/// but otherwise defers consensus on what the current chain is to a consensus client.
/// This consensus engine does basic checks as outlined in the execution specs.
#[derive(Debug)]
pub struct BeaconConsensus {
/// Watcher over the forkchoice state
forkchoice_state_rx: watch::Receiver<ForkchoiceState>,
/// Configuration
chain_spec: Arc<ChainSpec>,
}
impl BeaconConsensus {
/// Create a new instance of [BeaconConsensus]
pub fn new(
chain_spec: Arc<ChainSpec>,
forkchoice_state_rx: watch::Receiver<ForkchoiceState>,
) -> Self {
Self { chain_spec, forkchoice_state_rx }
}
/// Create new [BeaconConsensusBuilder].
pub fn builder() -> BeaconConsensusBuilder {
BeaconConsensusBuilder::default()
pub fn new(chain_spec: Arc<ChainSpec>) -> Self {
Self { chain_spec }
}
}
impl Consensus for BeaconConsensus {
fn fork_choice_state(&self) -> watch::Receiver<ForkchoiceState> {
self.forkchoice_state_rx.clone()
}
fn pre_validate_header(
&self,
header: &SealedHeader,
@@ -101,7 +83,7 @@ mod test {
#[test]
fn test_has_block_reward_before_paris() {
let chain_spec = Arc::new(ChainSpecBuilder::mainnet().build());
let (consensus, _) = BeaconConsensus::builder().build(chain_spec);
let consensus = BeaconConsensus::new(chain_spec);
assert!(consensus.has_block_reward(U256::ZERO, U256::ZERO));
}
}

View File

@@ -1,22 +0,0 @@
use super::BeaconConsensus;
use reth_interfaces::consensus::ForkchoiceState;
use reth_primitives::ChainSpec;
use std::sync::Arc;
use tokio::sync::watch;
/// TODO:
#[derive(Debug, Default)]
pub struct BeaconConsensusBuilder;
impl BeaconConsensusBuilder {
/// Create new instance of [BeaconConsensus] and forkchoice notifier. Internally, creates a
/// [watch::channel] for updating the forkchoice state.
pub fn build(
self,
chain_spec: Arc<ChainSpec>,
) -> (Arc<BeaconConsensus>, watch::Sender<ForkchoiceState>) {
let (forkchoice_state_tx, forkchoice_state_rx) = watch::channel(ForkchoiceState::default());
let inner = Arc::new(BeaconConsensus::new(chain_spec, forkchoice_state_rx));
(inner, forkchoice_state_tx)
}
}

View File

@@ -7,7 +7,4 @@
//! Beacon consensus implementation.
mod beacon_consensus;
mod builder;
pub use beacon_consensus::BeaconConsensus;
pub use builder::BeaconConsensusBuilder;

View File

@@ -3,7 +3,6 @@ use reth_primitives::{
BlockHash, BlockNumber, InvalidTransactionError, SealedBlock, SealedHeader, H256, U256,
};
use std::fmt::Debug;
use tokio::sync::watch::Receiver;
/// Re-export fork choice state
pub use reth_rpc_types::engine::ForkchoiceState;
@@ -12,9 +11,6 @@ pub use reth_rpc_types::engine::ForkchoiceState;
#[async_trait]
#[auto_impl::auto_impl(&, Arc)]
pub trait Consensus: Debug + Send + Sync {
/// Get a receiver for the fork choice state
fn fork_choice_state(&self) -> Receiver<ForkchoiceState>;
/// Validate if the header is correct and follows consensus specification.
///
/// This is called before properties that are not in the header itself (like total difficulty)

View File

@@ -259,22 +259,13 @@ impl HeadersClient for TestHeadersClient {
/// Consensus engine implementation for testing
#[derive(Debug)]
pub struct TestConsensus {
/// Watcher over the forkchoice state
channel: (watch::Sender<ForkchoiceState>, watch::Receiver<ForkchoiceState>),
/// Flag whether the header validation should purposefully fail
fail_validation: AtomicBool,
}
impl Default for TestConsensus {
fn default() -> Self {
Self {
channel: watch::channel(ForkchoiceState {
head_block_hash: H256::zero(),
finalized_block_hash: H256::zero(),
safe_block_hash: H256::zero(),
}),
fail_validation: AtomicBool::new(false),
}
Self { fail_validation: AtomicBool::new(false) }
}
}
@@ -288,14 +279,6 @@ impl TestConsensus {
pub fn set_fail_validation(&self, val: bool) {
self.fail_validation.store(val, Ordering::SeqCst)
}
/// Update the forkchoice state.
pub fn notify_fork_choice_state(
&self,
state: ForkchoiceState,
) -> Result<(), SendError<ForkchoiceState>> {
self.channel.0.send(state)
}
}
/// Status updater for testing.
@@ -323,10 +306,6 @@ impl StatusUpdater for TestStatusUpdater {
#[async_trait::async_trait]
impl Consensus for TestConsensus {
fn fork_choice_state(&self) -> watch::Receiver<ForkchoiceState> {
self.channel.1.clone()
}
fn pre_validate_header(
&self,
header: &SealedHeader,

View File

@@ -28,9 +28,11 @@
//! # use reth_interfaces::sync::NoopSyncStateUpdate;
//! # use reth_interfaces::test_utils::{TestBodiesClient, TestConsensus, TestHeadersClient, TestStatusUpdater};
//! # use reth_executor::Factory;
//! # use reth_primitives::{PeerId,MAINNET};
//! # use reth_primitives::{PeerId, MAINNET, H256};
//! # use reth_stages::Pipeline;
//! # use reth_stages::sets::DefaultStages;
//! # use reth_stages::stages::HeaderSyncMode;
//! # use tokio::sync::watch;
//! # let consensus: Arc<dyn Consensus> = Arc::new(TestConsensus::default());
//! # let headers_downloader = ReverseHeadersDownloaderBuilder::default().build(
//! # Arc::new(TestHeadersClient::default()),
@@ -41,13 +43,15 @@
//! # consensus.clone(),
//! # create_test_rw_db()
//! # );
//! # let (tip_tx, tip_rx) = watch::channel(H256::default());
//! # let factory = Factory::new(Arc::new(MAINNET.clone()));
//! # let (status_updater, _) = TestStatusUpdater::new();
//! // Create a pipeline that can fully sync
//! # let pipeline: Pipeline<Env<WriteMap>, NoopSyncStateUpdate> =
//! Pipeline::builder()
//! .with_tip_sender(tip_tx)
//! .add_stages(
//! DefaultStages::new(consensus, headers_downloader, bodies_downloader, status_updater, factory)
//! DefaultStages::new(HeaderSyncMode::Tip(tip_rx), consensus, headers_downloader, bodies_downloader, status_updater, factory)
//! )
//! .build();
//! ```

View File

@@ -1,7 +1,8 @@
use crate::{Pipeline, Stage, StageSet};
use reth_db::database::Database;
use reth_interfaces::sync::{NoopSyncStateUpdate, SyncStateUpdater};
use reth_primitives::BlockNumber;
use reth_primitives::{BlockNumber, H256};
use tokio::sync::watch;
/// Builds a [`Pipeline`].
#[derive(Debug)]
@@ -56,6 +57,12 @@ where
self
}
/// Set the tip sender.
pub fn with_tip_sender(mut self, tip_tx: watch::Sender<H256>) -> Self {
self.pipeline.tip_tx = Some(tip_tx);
self
}
/// Set a [SyncStateUpdater].
pub fn with_sync_state_updater(mut self, updater: U) -> Self {
self.pipeline.sync_state_updater = Some(updater);

View File

@@ -3,7 +3,7 @@ use metrics::Gauge;
use reth_db::database::Database;
use reth_interfaces::sync::{SyncState, SyncStateUpdater};
use reth_metrics_derive::Metrics;
use reth_primitives::BlockNumber;
use reth_primitives::{BlockNumber, H256};
use reth_provider::Transaction;
use std::{
collections::HashMap,
@@ -11,6 +11,7 @@ use std::{
ops::Deref,
sync::Arc,
};
use tokio::sync::watch;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::*;
@@ -80,6 +81,7 @@ pub struct Pipeline<DB: Database, U: SyncStateUpdater> {
listeners: PipelineEventListeners,
sync_state_updater: Option<U>,
progress: PipelineProgress,
tip_tx: Option<watch::Sender<H256>>,
metrics: Metrics,
}
@@ -91,6 +93,7 @@ impl<DB: Database, U: SyncStateUpdater> Default for Pipeline<DB, U> {
listeners: PipelineEventListeners::default(),
sync_state_updater: None,
progress: PipelineProgress::default(),
tip_tx: None,
metrics: Metrics::default(),
}
}
@@ -111,6 +114,11 @@ impl<DB: Database, U: SyncStateUpdater> Pipeline<DB, U> {
PipelineBuilder::default()
}
/// Set tip for reverse sync.
pub fn set_tip(&self, tip: H256) {
self.tip_tx.as_ref().expect("tip sender is set").send(tip).expect("tip channel closed");
}
/// Listen for events on the pipeline.
pub fn events(&mut self) -> UnboundedReceiverStream<PipelineEvent> {
self.listeners.new_listener()

View File

@@ -40,7 +40,7 @@
//! ```
use crate::{
stages::{
AccountHashingStage, BodyStage, ExecutionStage, FinishStage, HeaderStage,
AccountHashingStage, BodyStage, ExecutionStage, FinishStage, HeaderStage, HeaderSyncMode,
IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
StorageHashingStage, TotalDifficultyStage, TransactionLookupStage,
},
@@ -77,6 +77,7 @@ pub struct DefaultStages<H, B, S, EF> {
impl<H, B, S, EF> DefaultStages<H, B, S, EF> {
/// Create a new set of default stages with default values.
pub fn new(
header_mode: HeaderSyncMode,
consensus: Arc<dyn Consensus>,
header_downloader: H,
body_downloader: B,
@@ -87,7 +88,7 @@ impl<H, B, S, EF> DefaultStages<H, B, S, EF> {
EF: ExecutorFactory,
{
Self {
online: OnlineStages::new(consensus, header_downloader, body_downloader),
online: OnlineStages::new(header_mode, consensus, header_downloader, body_downloader),
executor_factory,
status_updater,
}
@@ -130,6 +131,8 @@ where
/// themselves offline.
#[derive(Debug)]
pub struct OnlineStages<H, B> {
/// The sync mode for the headers stage.
header_mode: HeaderSyncMode,
/// The consensus engine used to validate incoming data.
consensus: Arc<dyn Consensus>,
/// The block header downloader
@@ -140,8 +143,13 @@ pub struct OnlineStages<H, B> {
impl<H, B> OnlineStages<H, B> {
/// Create a new set of online stages with default values.
pub fn new(consensus: Arc<dyn Consensus>, header_downloader: H, body_downloader: B) -> Self {
Self { consensus, header_downloader, body_downloader }
pub fn new(
header_mode: HeaderSyncMode,
consensus: Arc<dyn Consensus>,
header_downloader: H,
body_downloader: B,
) -> Self {
Self { header_mode, consensus, header_downloader, body_downloader }
}
}
@@ -153,8 +161,8 @@ where
/// Create a new builder using the given headers stage.
pub fn builder_with_headers<DB: Database>(
headers: HeaderStage<H>,
consensus: Arc<dyn Consensus>,
body_downloader: B,
consensus: Arc<dyn Consensus>,
) -> StageSetBuilder<DB> {
StageSetBuilder::default()
.add_stage(headers)
@@ -165,11 +173,12 @@ where
/// Create a new builder using the given bodies stage.
pub fn builder_with_bodies<DB: Database>(
bodies: BodyStage<B>,
consensus: Arc<dyn Consensus>,
mode: HeaderSyncMode,
header_downloader: H,
consensus: Arc<dyn Consensus>,
) -> StageSetBuilder<DB> {
StageSetBuilder::default()
.add_stage(HeaderStage::new(header_downloader, consensus.clone()))
.add_stage(HeaderStage::new(header_downloader, mode))
.add_stage(TotalDifficultyStage::new(consensus.clone()))
.add_stage(bodies)
}
@@ -183,7 +192,7 @@ where
{
fn builder(self) -> StageSetBuilder<DB> {
StageSetBuilder::default()
.add_stage(HeaderStage::new(self.header_downloader, self.consensus.clone()))
.add_stage(HeaderStage::new(self.header_downloader, self.header_mode))
.add_stage(TotalDifficultyStage::new(self.consensus.clone()))
.add_stage(BodyStage { downloader: self.body_downloader, consensus: self.consensus })
}

View File

@@ -7,18 +7,28 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
};
use reth_interfaces::{
consensus::{Consensus, ForkchoiceState},
p2p::headers::downloader::{HeaderDownloader, SyncTarget},
provider::ProviderError,
};
use reth_primitives::{BlockHashOrNumber, BlockNumber, SealedHeader};
use reth_primitives::{BlockHashOrNumber, BlockNumber, SealedHeader, H256};
use reth_provider::Transaction;
use std::sync::Arc;
use tokio::sync::watch;
use tracing::*;
/// The [`StageId`] of the headers downloader stage.
pub const HEADERS: StageId = StageId("Headers");
/// The header sync mode.
#[derive(Debug)]
pub enum HeaderSyncMode {
/// A sync mode in which the stage continuously requests the downloader for
/// next blocks.
Continuous,
/// A sync mode in which the stage polls the receiver for the next tip
/// to download from.
Tip(watch::Receiver<H256>),
}
/// The headers stage.
///
/// The headers stage downloads all block headers from the highest block in the local database to
@@ -36,10 +46,8 @@ pub const HEADERS: StageId = StageId("Headers");
pub struct HeaderStage<D: HeaderDownloader> {
/// Strategy for downloading the headers
downloader: D,
/// Consensus client implementation
consensus: Arc<dyn Consensus>,
/// Whether or not the stage should download continuously, or wait for the fork choice state
continuous: bool,
/// The sync mode for the stage.
mode: HeaderSyncMode,
}
// === impl HeaderStage ===
@@ -49,8 +57,8 @@ where
D: HeaderDownloader,
{
/// Create a new header stage
pub fn new(downloader: D, consensus: Arc<dyn Consensus>) -> Self {
Self { downloader, consensus, continuous: false }
pub fn new(downloader: D, mode: HeaderSyncMode) -> Self {
Self { downloader, mode }
}
fn is_stage_done<DB: Database>(
@@ -66,17 +74,11 @@ where
Ok(header_cursor.next()?.map(|(next_num, _)| head_num + 1 == next_num).unwrap_or_default())
}
/// Set the stage to download continuously
pub fn continuous(mut self) -> Self {
self.continuous = true;
self
}
/// Get the head and tip of the range we need to sync
///
/// See also [SyncTarget]
async fn get_sync_gap<DB: Database>(
&self,
&mut self,
tx: &Transaction<'_, DB>,
stage_progress: u64,
) -> Result<SyncGap, StageError> {
@@ -112,28 +114,27 @@ where
// reverse from there. Else, it should use whatever the forkchoice state reports.
let target = match next_header {
Some(header) if stage_progress + 1 != header.number => SyncTarget::Gap(header),
None => {
if self.continuous {
tracing::trace!(target: "sync::stages::headers", ?head_num, "No next header found, using continuous sync strategy");
SyncTarget::TipNum(head_num + 1)
} else {
SyncTarget::Tip(self.next_fork_choice_state().await.head_block_hash)
}
}
None => self.next_sync_target(head_num).await,
_ => return Err(StageError::StageProgress(stage_progress)),
};
Ok(SyncGap { local_head, target })
}
/// Awaits the next [ForkchoiceState] message from [Consensus] with a non-zero block hash
async fn next_fork_choice_state(&self) -> ForkchoiceState {
let mut state_rcv = self.consensus.fork_choice_state();
loop {
let _ = state_rcv.changed().await;
let forkchoice = state_rcv.borrow();
if !forkchoice.head_block_hash.is_zero() {
return forkchoice.clone()
async fn next_sync_target(&mut self, head: BlockNumber) -> SyncTarget {
match self.mode {
HeaderSyncMode::Tip(ref mut rx) => {
loop {
let _ = rx.changed().await; // TODO: remove this await?
let tip = rx.borrow();
if !tip.is_zero() {
return SyncTarget::Tip(*tip)
}
}
}
HeaderSyncMode::Continuous => {
tracing::trace!(target: "sync::stages::headers", head, "No next header found, using continuous sync strategy");
SyncTarget::TipNum(head + 1)
}
}
}
@@ -285,35 +286,20 @@ mod tests {
use test_runner::HeadersTestRunner;
mod test_runner {
use crate::{
stages::headers::HeaderStage,
test_utils::{
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction,
UnwindStageTestRunner,
},
ExecInput, ExecOutput, UnwindInput,
};
use reth_db::{
tables,
transaction::{DbTx, DbTxMut},
};
use super::*;
use crate::test_utils::{TestRunnerError, TestTransaction};
use reth_downloaders::headers::reverse_headers::{
ReverseHeadersDownloader, ReverseHeadersDownloaderBuilder,
};
use reth_interfaces::{
consensus::ForkchoiceState,
p2p::headers::downloader::HeaderDownloader,
test_utils::{
generators::{random_header, random_header_range},
TestConsensus, TestHeaderDownloader, TestHeadersClient,
},
use reth_interfaces::test_utils::{
generators::random_header_range, TestConsensus, TestHeaderDownloader, TestHeadersClient,
};
use reth_primitives::{BlockNumber, SealedHeader, U256};
use reth_primitives::U256;
use std::sync::Arc;
pub(crate) struct HeadersTestRunner<D: HeaderDownloader> {
pub(crate) consensus: Arc<TestConsensus>,
pub(crate) client: Arc<TestHeadersClient>,
channel: (watch::Sender<H256>, watch::Receiver<H256>),
downloader_factory: Box<dyn Fn() -> D + Send + Sync + 'static>,
tx: TestTransaction,
}
@@ -321,12 +307,16 @@ mod tests {
impl Default for HeadersTestRunner<TestHeaderDownloader> {
fn default() -> Self {
let client = Arc::new(TestHeadersClient::default());
let consensus = Arc::new(TestConsensus::default());
Self {
client: client.clone(),
consensus: consensus.clone(),
channel: watch::channel(H256::zero()),
downloader_factory: Box::new(move || {
TestHeaderDownloader::new(client.clone(), consensus.clone(), 1000, 1000)
TestHeaderDownloader::new(
client.clone(),
Arc::new(TestConsensus::default()),
1000,
1000,
)
}),
tx: TestTransaction::default(),
}
@@ -342,9 +332,8 @@ mod tests {
fn stage(&self) -> Self::S {
HeaderStage {
consensus: self.consensus.clone(),
mode: HeaderSyncMode::Tip(self.channel.1.clone()),
downloader: (*self.downloader_factory)(),
continuous: false,
}
}
}
@@ -414,12 +403,7 @@ mod tests {
self.tx.insert_headers(std::iter::once(&tip))?;
tip.hash()
};
self.consensus
.notify_fork_choice_state(ForkchoiceState {
head_block_hash: tip,
..Default::default()
})
.expect("Setting tip failed");
self.send_tip(tip);
Ok(())
}
}
@@ -433,14 +417,13 @@ mod tests {
impl HeadersTestRunner<ReverseHeadersDownloader<TestHeadersClient>> {
pub(crate) fn with_linear_downloader() -> Self {
let client = Arc::new(TestHeadersClient::default());
let consensus = Arc::new(TestConsensus::default());
Self {
client: client.clone(),
consensus: consensus.clone(),
channel: watch::channel(H256::zero()),
downloader_factory: Box::new(move || {
ReverseHeadersDownloaderBuilder::default()
.stream_batch_size(500)
.build(client.clone(), consensus.clone())
.build(client.clone(), Arc::new(TestConsensus::default()))
}),
tx: TestTransaction::default(),
}
@@ -458,6 +441,10 @@ mod tests {
self.tx.ensure_no_entry_above::<tables::Headers, _>(block, |key| key)?;
Ok(())
}
pub(crate) fn send_tip(&self, tip: H256) {
self.channel.0.send(tip).expect("failed to send tip");
}
}
}
@@ -479,13 +466,7 @@ mod tests {
// skip `after_execution` hook for linear downloader
let tip = headers.last().unwrap();
runner
.consensus
.notify_fork_choice_state(ForkchoiceState {
head_block_hash: tip.hash(),
..Default::default()
})
.expect("Setting tip failed");
runner.send_tip(tip.hash());
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput { done: true, stage_progress }) if stage_progress == tip.number);
@@ -497,16 +478,10 @@ mod tests {
async fn head_and_tip_lookup() {
let runner = HeadersTestRunner::default();
let tx = runner.tx().inner();
let stage = runner.stage();
let mut stage = runner.stage();
let consensus_tip = H256::random();
runner
.consensus
.notify_fork_choice_state(ForkchoiceState {
head_block_hash: consensus_tip,
..Default::default()
})
.expect("Setting tip failed");
runner.send_tip(consensus_tip);
// Genesis
let stage_progress = 0;
@@ -570,13 +545,7 @@ mod tests {
// skip `after_execution` hook for linear downloader
let tip = headers.last().unwrap();
runner
.consensus
.notify_fork_choice_state(ForkchoiceState {
head_block_hash: tip.hash(),
..Default::default()
})
.expect("Setting tip failed");
runner.send_tip(tip.hash());
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput { done: false, stage_progress: progress }) if progress == stage_progress);