diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index e59219cf29..6fe8d27444 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -45,7 +45,7 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, ops::Bound, sync::{ - mpsc::{Receiver, RecvError, RecvTimeoutError}, + mpsc::{Receiver, RecvError, RecvTimeoutError, Sender}, Arc, }, time::Instant, @@ -365,6 +365,15 @@ pub struct EngineApiTreeHandler { payload_validator: ExecutionPayloadValidator, /// Keeps track of internals such as executed and buffered blocks. state: EngineApiTreeState, + /// The half for sending messages to the engine. + /// + /// This is kept so that we can queue in messages to ourself that we can process later, for + /// example distributing workload across multiple messages that would otherwise take too long + /// to process. E.g. we might receive a range of downloaded blocks and we want to process + /// them one by one so that we can handle incoming engine API in between and don't become + /// unresponsive. This can happen during live sync transition where we're trying to close the + /// gap (up to 3 epochs of blocks in the worst case). + incoming_tx: Sender>>, /// Incoming engine API requests. incoming: Receiver>>, /// Outgoing events that are emitted to the handler. @@ -400,7 +409,6 @@ where executor_provider: E, consensus: Arc, payload_validator: ExecutionPayloadValidator, - incoming: Receiver>>, outgoing: UnboundedSender, state: EngineApiTreeState, canonical_in_memory_state: CanonicalInMemoryState, @@ -409,6 +417,7 @@ where payload_builder: PayloadBuilderHandle, config: TreeConfig, ) -> Self { + let (incoming_tx, incoming) = std::sync::mpsc::channel(); Self { provider, executor_provider, @@ -424,24 +433,26 @@ where payload_builder, config, metrics: Default::default(), + incoming_tx, } } - /// Creates a new `EngineApiTreeHandlerImpl` instance and spawns it in its - /// own thread. Returns the receiver end of a `EngineApiEvent` unbounded - /// channel to receive events from the engine. + /// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its + /// own thread. + /// + /// Returns the sender through which incoming requests can be sent to the task and the receiver + /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine. #[allow(clippy::too_many_arguments)] pub fn spawn_new( provider: P, executor_provider: E, consensus: Arc, payload_validator: ExecutionPayloadValidator, - incoming: Receiver>>, persistence: PersistenceHandle, payload_builder: PayloadBuilderHandle, canonical_in_memory_state: CanonicalInMemoryState, config: TreeConfig, - ) -> UnboundedReceiver { + ) -> (Sender>>, UnboundedReceiver) { let best_block_number = provider.best_block_number().unwrap_or(0); let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default(); @@ -463,7 +474,6 @@ where executor_provider, consensus, payload_validator, - incoming, tx, state, canonical_in_memory_state, @@ -472,8 +482,14 @@ where payload_builder, config, ); + let incoming = task.incoming_tx.clone(); std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap(); - outgoing + (incoming, outgoing) + } + + /// Returns a new [`Sender`] to send messages to this type. + pub fn sender(&self) -> Sender>> { + self.incoming_tx.clone() } /// Run the engine API handler. @@ -504,6 +520,7 @@ where /// Invoked when previously requested blocks were downloaded. fn on_downloaded(&mut self, blocks: Vec) -> Option { trace!(target: "engine", block_count = %blocks.len(), "received downloaded blocks"); + // TODO(mattsse): on process a certain number of blocks sequentially for block in blocks { if let Some(event) = self.on_downloaded_block(block) { let needs_backfill = event.is_backfill_action(); @@ -1970,7 +1987,6 @@ mod tests { let payload_validator = ExecutionPayloadValidator::new(chain_spec.clone()); - let (to_tree_tx, to_tree_rx) = channel(); let (from_tree_tx, from_tree_rx) = unbounded_channel(); let header = chain_spec.genesis_header().seal_slow(); @@ -1985,7 +2001,6 @@ mod tests { executor_provider.clone(), consensus, payload_validator, - to_tree_rx, from_tree_tx, engine_api_tree_state, canonical_in_memory_state, @@ -1999,8 +2014,8 @@ mod tests { .with_chain_spec((*chain_spec).clone()) .with_signer(Address::random()); Self { + to_tree_tx: tree.incoming_tx.clone(), tree, - to_tree_tx, from_tree_rx, blocks: vec![], action_rx, diff --git a/crates/ethereum/engine/src/service.rs b/crates/ethereum/engine/src/service.rs index 2ba3a4c6c7..9ec9fcf9cc 100644 --- a/crates/ethereum/engine/src/service.rs +++ b/crates/ethereum/engine/src/service.rs @@ -25,7 +25,7 @@ use reth_stages_api::Pipeline; use reth_tasks::TaskSpawner; use std::{ pin::Pin, - sync::{mpsc::channel, Arc}, + sync::Arc, task::{Context, Poll}, }; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -73,20 +73,17 @@ where let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone())); let downloader = BasicBlockDownloader::new(client, consensus.clone()); - let (to_tree_tx, to_tree_rx) = channel(); - let persistence_handle = PersistenceHandle::spawn_service(provider, pruner); let payload_validator = ExecutionPayloadValidator::new(chain_spec.clone()); let executor_factory = EthExecutorProvider::ethereum(chain_spec); let canonical_in_memory_state = blockchain_db.canonical_in_memory_state(); - let from_tree = EngineApiTreeHandler::spawn_new( + let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new( blockchain_db, executor_factory, consensus, payload_validator, - to_tree_rx, persistence_handle, payload_builder, canonical_in_memory_state,