From 5528aae8f62d0eeea564bf80f1393027f723a107 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Sun, 1 Feb 2026 11:55:45 -0800 Subject: [PATCH] fix(engine): wait for persistence service thread before RocksDB drop (#21640) Co-authored-by: Amp Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com> --- Cargo.lock | 1 - crates/engine/service/Cargo.toml | 1 - crates/engine/service/src/service.rs | 3 +- crates/engine/tree/src/persistence.rs | 67 ++++++++++++++++++++------- 4 files changed, 52 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4ed3fbd591..a81c722da7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8436,7 +8436,6 @@ dependencies = [ "reth-engine-tree", "reth-ethereum-consensus", "reth-ethereum-engine-primitives", - "reth-ethereum-primitives", "reth-evm", "reth-evm-ethereum", "reth-exex-types", diff --git a/crates/engine/service/Cargo.toml b/crates/engine/service/Cargo.toml index 33468dafdb..8c866e865e 100644 --- a/crates/engine/service/Cargo.toml +++ b/crates/engine/service/Cargo.toml @@ -17,7 +17,6 @@ reth-engine-tree.workspace = true reth-evm.workspace = true reth-network-p2p.workspace = true reth-payload-builder.workspace = true -reth-ethereum-primitives.workspace = true reth-provider.workspace = true reth-prune.workspace = true reth-stages-api.workspace = true diff --git a/crates/engine/service/src/service.rs b/crates/engine/service/src/service.rs index 1983421ead..4427147251 100644 --- a/crates/engine/service/src/service.rs +++ b/crates/engine/service/src/service.rs @@ -14,7 +14,6 @@ pub use reth_engine_tree::{ chain::{ChainEvent, ChainOrchestrator}, engine::EngineApiEvent, }; -use reth_ethereum_primitives::EthPrimitives; use reth_evm::ConfigureEvm; use reth_network_p2p::BlockClient; use reth_node_types::{BlockTy, NodeTypes}; @@ -97,7 +96,7 @@ where let downloader = BasicBlockDownloader::new(client, consensus.clone()); let persistence_handle = - PersistenceHandle::::spawn_service(provider, pruner, sync_metrics_tx); + PersistenceHandle::::spawn_service(provider, pruner, sync_metrics_tx); let canonical_in_memory_state = blockchain_db.canonical_in_memory_state(); diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index 5f9c5dd5dc..02097e918b 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -12,7 +12,11 @@ use reth_provider::{ use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory}; use reth_stages_api::{MetricEvent, MetricEventsSender}; use std::{ - sync::mpsc::{Receiver, SendError, Sender}, + sync::{ + mpsc::{Receiver, SendError, Sender}, + Arc, + }, + thread::JoinHandle, time::Instant, }; use thiserror::Error; @@ -227,15 +231,25 @@ pub enum PersistenceAction { pub struct PersistenceHandle { /// The channel used to communicate with the persistence service sender: Sender>, + /// Guard that joins the service thread when all handles are dropped. + /// Uses `Arc` so the handle remains `Clone`. + _service_guard: Arc, } impl PersistenceHandle { /// Create a new [`PersistenceHandle`] from a [`Sender`]. - pub const fn new(sender: Sender>) -> Self { - Self { sender } + /// + /// This is intended for testing purposes where you want to mock the persistence service. + /// For production use, prefer [`spawn_service`](Self::spawn_service). + pub fn new(sender: Sender>) -> Self { + Self { sender, _service_guard: Arc::new(ServiceGuard(None)) } } /// Create a new [`PersistenceHandle`], and spawn the persistence service. + /// + /// The returned handle can be cloned and shared. When all clones are dropped, the service + /// thread will be joined, ensuring graceful shutdown before resources (like `RocksDB`) are + /// released. pub fn spawn_service( provider_factory: ProviderFactory, pruner: PrunerWithFactory>, @@ -247,13 +261,10 @@ impl PersistenceHandle { // create the initial channels let (db_service_tx, db_service_rx) = std::sync::mpsc::channel(); - // construct persistence handle - let persistence_handle = PersistenceHandle::new(db_service_tx); - // spawn the persistence service let db_service = PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx); - std::thread::Builder::new() + let join_handle = std::thread::Builder::new() .name("Persistence Service".to_string()) .spawn(|| { if let Err(err) = db_service.run() { @@ -262,7 +273,10 @@ impl PersistenceHandle { }) .unwrap(); - persistence_handle + PersistenceHandle { + sender: db_service_tx, + _service_guard: Arc::new(ServiceGuard(Some(join_handle))), + } } /// Sends a specific [`PersistenceAction`] in the contained channel. The caller is responsible @@ -326,6 +340,27 @@ impl PersistenceHandle { } } +/// Guard that joins the persistence service thread when dropped. +/// +/// This ensures graceful shutdown - the service thread completes before resources like +/// `RocksDB` are released. Stored in an `Arc` inside [`PersistenceHandle`] so the handle +/// can be cloned while sharing the same guard. +struct ServiceGuard(Option>); + +impl std::fmt::Debug for ServiceGuard { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("ServiceGuard").field(&self.0.as_ref().map(|_| "...")).finish() + } +} + +impl Drop for ServiceGuard { + fn drop(&mut self) { + if let Some(join_handle) = self.0.take() { + let _ = join_handle.join(); + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -352,12 +387,12 @@ mod tests { #[test] fn test_save_blocks_empty() { reth_tracing::init_test_tracing(); - let persistence_handle = default_persistence_handle(); + let handle = default_persistence_handle(); let blocks = vec![]; let (tx, rx) = crossbeam_channel::bounded(1); - persistence_handle.save_blocks(blocks, tx).unwrap(); + handle.save_blocks(blocks, tx).unwrap(); let hash = rx.recv().unwrap(); assert_eq!(hash, None); @@ -366,7 +401,7 @@ mod tests { #[test] fn test_save_blocks_single_block() { reth_tracing::init_test_tracing(); - let persistence_handle = default_persistence_handle(); + let handle = default_persistence_handle(); let block_number = 0; let mut test_block_builder = TestBlockBuilder::eth(); let executed = @@ -376,7 +411,7 @@ mod tests { let blocks = vec![executed]; let (tx, rx) = crossbeam_channel::bounded(1); - persistence_handle.save_blocks(blocks, tx).unwrap(); + handle.save_blocks(blocks, tx).unwrap(); let BlockNumHash { hash: actual_hash, number: _ } = rx .recv_timeout(std::time::Duration::from_secs(10)) @@ -389,14 +424,14 @@ mod tests { #[test] fn test_save_blocks_multiple_blocks() { reth_tracing::init_test_tracing(); - let persistence_handle = default_persistence_handle(); + let handle = default_persistence_handle(); let mut test_block_builder = TestBlockBuilder::eth(); let blocks = test_block_builder.get_executed_blocks(0..5).collect::>(); let last_hash = blocks.last().unwrap().recovered_block().hash(); let (tx, rx) = crossbeam_channel::bounded(1); - persistence_handle.save_blocks(blocks, tx).unwrap(); + handle.save_blocks(blocks, tx).unwrap(); let BlockNumHash { hash: actual_hash, number: _ } = rx.recv().unwrap().unwrap(); assert_eq!(last_hash, actual_hash); } @@ -404,7 +439,7 @@ mod tests { #[test] fn test_save_blocks_multiple_calls() { reth_tracing::init_test_tracing(); - let persistence_handle = default_persistence_handle(); + let handle = default_persistence_handle(); let ranges = [0..1, 1..2, 2..4, 4..5]; let mut test_block_builder = TestBlockBuilder::eth(); @@ -413,7 +448,7 @@ mod tests { let last_hash = blocks.last().unwrap().recovered_block().hash(); let (tx, rx) = crossbeam_channel::bounded(1); - persistence_handle.save_blocks(blocks, tx).unwrap(); + handle.save_blocks(blocks, tx).unwrap(); let BlockNumHash { hash: actual_hash, number: _ } = rx.recv().unwrap().unwrap(); assert_eq!(last_hash, actual_hash);