feat: add BeaconConsensusEvent for live sync download requests (#7230)

Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
This commit is contained in:
Dan Cline
2024-03-19 13:12:50 -04:00
committed by GitHub
parent d86d4d2380
commit ce89a2be8d
4 changed files with 77 additions and 9 deletions

View File

@@ -1,6 +1,6 @@
use crate::engine::forkchoice::ForkchoiceStatus;
use reth_interfaces::consensus::ForkchoiceState;
use reth_primitives::{SealedBlock, SealedHeader};
use reth_primitives::{SealedBlock, SealedHeader, B256};
use std::{sync::Arc, time::Duration};
/// Events emitted by [crate::BeaconConsensusEngine].
@@ -12,6 +12,20 @@ pub enum BeaconConsensusEngineEvent {
CanonicalBlockAdded(Arc<SealedBlock>, Duration),
/// A canonical chain was committed, and the elapsed time committing the data
CanonicalChainCommitted(Box<SealedHeader>, Duration),
/// The consensus engine is involved in live sync, and has specific progress
LiveSyncProgress(ConsensusEngineLiveSyncProgress),
/// A block was added to the fork chain.
ForkBlockAdded(Arc<SealedBlock>),
}
/// Progress of the consensus engine during live sync.
#[derive(Clone, Debug)]
pub enum ConsensusEngineLiveSyncProgress {
/// The consensus engine is downloading blocks from the network.
DownloadingBlocks {
/// The number of blocks remaining to download.
remaining_blocks: u64,
/// The target block hash and number to download.
target: B256,
},
}

View File

@@ -63,7 +63,7 @@ mod invalid_headers;
use invalid_headers::InvalidHeaderCache;
mod event;
pub use event::BeaconConsensusEngineEvent;
pub use event::{BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress};
mod handle;
pub use handle::BeaconConsensusEngineHandle;
@@ -287,6 +287,7 @@ where
hooks: EngineHooks,
) -> RethResult<(Self, BeaconConsensusEngineHandle<EngineT>)> {
let handle = BeaconConsensusEngineHandle { to_engine };
let listeners = EventListeners::default();
let sync = EngineSyncController::new(
pipeline,
client,
@@ -294,6 +295,7 @@ where
run_pipeline_continuously,
max_block,
blockchain.chain_spec(),
listeners.clone(),
);
let mut this = Self {
sync,
@@ -304,7 +306,7 @@ where
handle: handle.clone(),
forkchoice_state_tracker: Default::default(),
payload_builder,
listeners: EventListeners::default(),
listeners,
invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS),
metrics: EngineMetrics::default(),
pipeline_run_threshold,
@@ -605,6 +607,13 @@ where
self.handle.clone()
}
/// Pushes an [UnboundedSender] to the engine's listeners. Also pushes an [UnboundedSender] to
/// the sync controller's listeners.
pub(crate) fn push_listener(&mut self, listener: UnboundedSender<BeaconConsensusEngineEvent>) {
self.listeners.push_listener(listener.clone());
self.sync.push_listener(listener);
}
/// Returns true if the distance from the local tip to the block is greater than the configured
/// threshold.
///
@@ -1867,9 +1876,7 @@ where
BeaconEngineMessage::TransitionConfigurationExchanged => {
this.blockchain.on_transition_configuration_exchanged();
}
BeaconEngineMessage::EventListener(tx) => {
this.listeners.push_listener(tx);
}
BeaconEngineMessage::EventListener(tx) => this.push_listener(tx),
}
continue
}

View File

@@ -1,6 +1,9 @@
//! Sync management for the engine implementation.
use crate::{engine::metrics::EngineSyncMetrics, BeaconConsensus};
use crate::{
engine::metrics::EngineSyncMetrics, BeaconConsensus, BeaconConsensusEngineEvent,
ConsensusEngineLiveSyncProgress,
};
use futures::FutureExt;
use reth_db::database::Database;
use reth_interfaces::p2p::{
@@ -11,13 +14,14 @@ use reth_interfaces::p2p::{
use reth_primitives::{BlockNumber, ChainSpec, SealedBlock, B256};
use reth_stages::{ControlFlow, Pipeline, PipelineError, PipelineWithResult};
use reth_tasks::TaskSpawner;
use reth_tokio_util::EventListeners;
use std::{
cmp::{Ordering, Reverse},
collections::{binary_heap::PeekMut, BinaryHeap},
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tracing::trace;
/// Manages syncing under the control of the engine.
@@ -45,6 +49,8 @@ where
inflight_full_block_requests: Vec<FetchFullBlockFuture<Client>>,
/// In-flight full block _range_ requests in progress.
inflight_block_range_requests: Vec<FetchFullBlockRangeFuture<Client>>,
/// Listeners for engine events.
listeners: EventListeners<BeaconConsensusEngineEvent>,
/// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for
/// ordering. This means the blocks will be popped from the heap with ascending block numbers.
range_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlock>>,
@@ -70,6 +76,7 @@ where
run_pipeline_continuously: bool,
max_block: Option<BlockNumber>,
chain_spec: Arc<ChainSpec>,
listeners: EventListeners<BeaconConsensusEngineEvent>,
) -> Self {
Self {
full_block_client: FullBlockClient::new(
@@ -83,6 +90,7 @@ where
inflight_block_range_requests: Vec::new(),
range_buffered_blocks: BinaryHeap::new(),
run_pipeline_continuously,
listeners,
max_block,
metrics: EngineSyncMetrics::default(),
}
@@ -119,6 +127,11 @@ where
self.run_pipeline_continuously
}
/// Pushes an [UnboundedSender] to the sync controller's listeners.
pub(crate) fn push_listener(&mut self, listener: UnboundedSender<BeaconConsensusEngineEvent>) {
self.listeners.push_listener(listener);
}
/// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`.
#[allow(dead_code)]
pub(crate) fn is_pipeline_sync_pending(&self) -> bool {
@@ -145,6 +158,14 @@ where
/// If the `count` is 1, this will use the `download_full_block` method instead, because it
/// downloads headers and bodies for the block concurrently.
pub(crate) fn download_block_range(&mut self, hash: B256, count: u64) {
// notify listeners that we're downloading a block
self.listeners.notify(BeaconConsensusEngineEvent::LiveSyncProgress(
ConsensusEngineLiveSyncProgress::DownloadingBlocks {
remaining_blocks: count,
target: hash,
},
));
if count == 1 {
self.download_full_block(hash);
} else {
@@ -176,6 +197,15 @@ where
?hash,
"Start downloading full block"
);
// notify listeners that we're downloading a block
self.listeners.notify(BeaconConsensusEngineEvent::LiveSyncProgress(
ConsensusEngineLiveSyncProgress::DownloadingBlocks {
remaining_blocks: 1,
target: hash,
},
));
let request = self.full_block_client.get_full_block(hash);
self.inflight_full_block_requests.push(request);
@@ -525,6 +555,7 @@ mod tests {
false,
self.max_block,
chain_spec,
Default::default(),
)
}
}

View File

@@ -2,7 +2,9 @@
use crate::events::cl::ConsensusLayerHealthEvent;
use futures::Stream;
use reth_beacon_consensus::{BeaconConsensusEngineEvent, ForkchoiceStatus};
use reth_beacon_consensus::{
BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress, ForkchoiceStatus,
};
use reth_db::{database::Database, database_metrics::DatabaseMetadata};
use reth_interfaces::consensus::ForkchoiceState;
use reth_network::{NetworkEvent, NetworkHandle};
@@ -233,6 +235,20 @@ impl<DB> NodeState<DB> {
self.safe_block_hash = Some(safe_block_hash);
self.finalized_block_hash = Some(finalized_block_hash);
}
BeaconConsensusEngineEvent::LiveSyncProgress(live_sync_progress) => {
match live_sync_progress {
ConsensusEngineLiveSyncProgress::DownloadingBlocks {
remaining_blocks,
target,
} => {
info!(
remaining_blocks,
target_block_hash=?target,
"Live sync in progress, downloading blocks"
);
}
}
}
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, elapsed) => {
info!(
number=block.number,