diff --git a/crates/engine/tree/src/tree/config.rs b/crates/engine/tree/src/tree/config.rs index 2d5a3b9020..be6e49cbe6 100644 --- a/crates/engine/tree/src/tree/config.rs +++ b/crates/engine/tree/src/tree/config.rs @@ -4,6 +4,8 @@ const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 256; const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = 256; const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256; +const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4; + /// The configuration of the engine tree. #[derive(Debug)] pub struct TreeConfig { @@ -14,6 +16,8 @@ pub struct TreeConfig { block_buffer_limit: u32, /// Number of invalid headers to keep in cache. max_invalid_header_cache_length: u32, + /// Maximum number of blocks to execute sequentially in a batch. + max_execute_block_batch_size: usize, } impl Default for TreeConfig { @@ -22,6 +26,7 @@ impl Default for TreeConfig { persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD, block_buffer_limit: DEFAULT_BLOCK_BUFFER_LIMIT, max_invalid_header_cache_length: DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH, + max_execute_block_batch_size: DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE, } } } @@ -32,8 +37,14 @@ impl TreeConfig { persistence_threshold: u64, block_buffer_limit: u32, max_invalid_header_cache_length: u32, + max_execute_block_batch_size: usize, ) -> Self { - Self { persistence_threshold, block_buffer_limit, max_invalid_header_cache_length } + Self { + persistence_threshold, + block_buffer_limit, + max_invalid_header_cache_length, + max_execute_block_batch_size, + } } /// Return the persistence threshold. @@ -51,6 +62,11 @@ impl TreeConfig { self.max_invalid_header_cache_length } + /// Return the maximum execute block batch size. + pub const fn max_execute_block_batch_size(&self) -> usize { + self.max_execute_block_batch_size + } + /// Setter for persistence threshold. pub const fn with_persistence_threshold(mut self, persistence_threshold: u64) -> Self { self.persistence_threshold = persistence_threshold; @@ -71,4 +87,13 @@ impl TreeConfig { self.max_invalid_header_cache_length = max_invalid_header_cache_length; self } + + /// Setter for maximum execute block batch size. + pub const fn with_max_execute_block_batch_size( + mut self, + max_execute_block_batch_size: usize, + ) -> Self { + self.max_execute_block_batch_size = max_execute_block_batch_size; + self + } } diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 5796756596..aca07a9602 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -518,19 +518,34 @@ where } /// Invoked when previously requested blocks were downloaded. - fn on_downloaded(&mut self, blocks: Vec) -> Option { + /// + /// If the block count exceeds the configured batch size we're allowed to execute at once, this + /// will execute the first batch and send the remaining blocks back through the channel so that + /// don't block request processing for a long time. + fn on_downloaded(&mut self, mut blocks: Vec) -> Option { + if blocks.is_empty() { + // nothing to execute + return None + } + trace!(target: "engine", block_count = %blocks.len(), "received downloaded blocks"); - // TODO(mattsse): on process a certain number of blocks sequentially - for block in blocks { + let batch = self.config.max_execute_block_batch_size().min(blocks.len()); + for block in blocks.drain(..batch) { if let Some(event) = self.on_downloaded_block(block) { let needs_backfill = event.is_backfill_action(); self.on_tree_event(event); if needs_backfill { // can exit early if backfill is needed - break + return None } } } + + // if we still have blocks to execute, send them as a followup request + if !blocks.is_empty() { + let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks)); + } + None } @@ -2234,6 +2249,41 @@ mod tests { } } + #[test] + fn test_tree_persist_block_batch() { + let tree_config = TreeConfig::default(); + let chain_spec = MAINNET.clone(); + let mut test_block_builder = + TestBlockBuilder::default().with_chain_spec((*chain_spec).clone()); + + // we need more than tree_config.persistence_threshold() +1 blocks to + // trigger the persistence task. + let blocks: Vec<_> = test_block_builder + .get_executed_blocks(1..tree_config.persistence_threshold() + 2) + .collect(); + let mut test_harness = TestHarness::new(chain_spec).with_blocks(blocks); + + let mut blocks = vec![]; + for idx in 0..tree_config.max_execute_block_batch_size() * 2 { + blocks.push(test_block_builder.generate_random_block(idx as u64, B256::random())); + } + + test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(blocks)).unwrap(); + + // process the message + let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap(); + test_harness.tree.on_engine_message(msg); + + // we now should receive the other batch + let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap(); + match msg { + FromEngine::DownloadedBlocks(blocks) => { + assert_eq!(blocks.len(), tree_config.max_execute_block_batch_size()); + } + _ => panic!("unexpected message: {:#?}", msg), + } + } + #[tokio::test] async fn test_tree_persist_blocks() { let tree_config = TreeConfig::default(); @@ -2721,6 +2771,7 @@ mod tests { let chain_spec = MAINNET.clone(); let mut test_harness = TestHarness::new(chain_spec.clone()); + test_harness.tree.config = test_harness.tree.config.with_max_execute_block_batch_size(100); // create base chain and setup test harness with it let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();