fix(engine): wait for persistence service thread before RocksDB drop (#21640)

Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
This commit is contained in:
Georgios Konstantopoulos
2026-02-01 11:55:45 -08:00
committed by GitHub
parent 83364aa2d6
commit 5528aae8f6
4 changed files with 52 additions and 20 deletions

1
Cargo.lock generated
View File

@@ -8436,7 +8436,6 @@ dependencies = [
"reth-engine-tree",
"reth-ethereum-consensus",
"reth-ethereum-engine-primitives",
"reth-ethereum-primitives",
"reth-evm",
"reth-evm-ethereum",
"reth-exex-types",

View File

@@ -17,7 +17,6 @@ reth-engine-tree.workspace = true
reth-evm.workspace = true
reth-network-p2p.workspace = true
reth-payload-builder.workspace = true
reth-ethereum-primitives.workspace = true
reth-provider.workspace = true
reth-prune.workspace = true
reth-stages-api.workspace = true

View File

@@ -14,7 +14,6 @@ pub use reth_engine_tree::{
chain::{ChainEvent, ChainOrchestrator},
engine::EngineApiEvent,
};
use reth_ethereum_primitives::EthPrimitives;
use reth_evm::ConfigureEvm;
use reth_network_p2p::BlockClient;
use reth_node_types::{BlockTy, NodeTypes};
@@ -97,7 +96,7 @@ where
let downloader = BasicBlockDownloader::new(client, consensus.clone());
let persistence_handle =
PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx);
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();

View File

@@ -12,7 +12,11 @@ use reth_provider::{
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
use std::{
sync::mpsc::{Receiver, SendError, Sender},
sync::{
mpsc::{Receiver, SendError, Sender},
Arc,
},
thread::JoinHandle,
time::Instant,
};
use thiserror::Error;
@@ -227,15 +231,25 @@ pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
/// The channel used to communicate with the persistence service
sender: Sender<PersistenceAction<N>>,
/// Guard that joins the service thread when all handles are dropped.
/// Uses `Arc` so the handle remains `Clone`.
_service_guard: Arc<ServiceGuard>,
}
impl<T: NodePrimitives> PersistenceHandle<T> {
/// Create a new [`PersistenceHandle`] from a [`Sender<PersistenceAction>`].
pub const fn new(sender: Sender<PersistenceAction<T>>) -> Self {
Self { sender }
///
/// This is intended for testing purposes where you want to mock the persistence service.
/// For production use, prefer [`spawn_service`](Self::spawn_service).
pub fn new(sender: Sender<PersistenceAction<T>>) -> Self {
Self { sender, _service_guard: Arc::new(ServiceGuard(None)) }
}
/// Create a new [`PersistenceHandle`], and spawn the persistence service.
///
/// The returned handle can be cloned and shared. When all clones are dropped, the service
/// thread will be joined, ensuring graceful shutdown before resources (like `RocksDB`) are
/// released.
pub fn spawn_service<N>(
provider_factory: ProviderFactory<N>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
@@ -247,13 +261,10 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
// create the initial channels
let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();
// construct persistence handle
let persistence_handle = PersistenceHandle::new(db_service_tx);
// spawn the persistence service
let db_service =
PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
std::thread::Builder::new()
let join_handle = std::thread::Builder::new()
.name("Persistence Service".to_string())
.spawn(|| {
if let Err(err) = db_service.run() {
@@ -262,7 +273,10 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
})
.unwrap();
persistence_handle
PersistenceHandle {
sender: db_service_tx,
_service_guard: Arc::new(ServiceGuard(Some(join_handle))),
}
}
/// Sends a specific [`PersistenceAction`] in the contained channel. The caller is responsible
@@ -326,6 +340,27 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
}
}
/// Guard that joins the persistence service thread when dropped.
///
/// This ensures graceful shutdown - the service thread completes before resources like
/// `RocksDB` are released. Stored in an `Arc` inside [`PersistenceHandle`] so the handle
/// can be cloned while sharing the same guard.
struct ServiceGuard(Option<JoinHandle<()>>);
impl std::fmt::Debug for ServiceGuard {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("ServiceGuard").field(&self.0.as_ref().map(|_| "...")).finish()
}
}
impl Drop for ServiceGuard {
fn drop(&mut self) {
if let Some(join_handle) = self.0.take() {
let _ = join_handle.join();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -352,12 +387,12 @@ mod tests {
#[test]
fn test_save_blocks_empty() {
reth_tracing::init_test_tracing();
let persistence_handle = default_persistence_handle();
let handle = default_persistence_handle();
let blocks = vec![];
let (tx, rx) = crossbeam_channel::bounded(1);
persistence_handle.save_blocks(blocks, tx).unwrap();
handle.save_blocks(blocks, tx).unwrap();
let hash = rx.recv().unwrap();
assert_eq!(hash, None);
@@ -366,7 +401,7 @@ mod tests {
#[test]
fn test_save_blocks_single_block() {
reth_tracing::init_test_tracing();
let persistence_handle = default_persistence_handle();
let handle = default_persistence_handle();
let block_number = 0;
let mut test_block_builder = TestBlockBuilder::eth();
let executed =
@@ -376,7 +411,7 @@ mod tests {
let blocks = vec![executed];
let (tx, rx) = crossbeam_channel::bounded(1);
persistence_handle.save_blocks(blocks, tx).unwrap();
handle.save_blocks(blocks, tx).unwrap();
let BlockNumHash { hash: actual_hash, number: _ } = rx
.recv_timeout(std::time::Duration::from_secs(10))
@@ -389,14 +424,14 @@ mod tests {
#[test]
fn test_save_blocks_multiple_blocks() {
reth_tracing::init_test_tracing();
let persistence_handle = default_persistence_handle();
let handle = default_persistence_handle();
let mut test_block_builder = TestBlockBuilder::eth();
let blocks = test_block_builder.get_executed_blocks(0..5).collect::<Vec<_>>();
let last_hash = blocks.last().unwrap().recovered_block().hash();
let (tx, rx) = crossbeam_channel::bounded(1);
persistence_handle.save_blocks(blocks, tx).unwrap();
handle.save_blocks(blocks, tx).unwrap();
let BlockNumHash { hash: actual_hash, number: _ } = rx.recv().unwrap().unwrap();
assert_eq!(last_hash, actual_hash);
}
@@ -404,7 +439,7 @@ mod tests {
#[test]
fn test_save_blocks_multiple_calls() {
reth_tracing::init_test_tracing();
let persistence_handle = default_persistence_handle();
let handle = default_persistence_handle();
let ranges = [0..1, 1..2, 2..4, 4..5];
let mut test_block_builder = TestBlockBuilder::eth();
@@ -413,7 +448,7 @@ mod tests {
let last_hash = blocks.last().unwrap().recovered_block().hash();
let (tx, rx) = crossbeam_channel::bounded(1);
persistence_handle.save_blocks(blocks, tx).unwrap();
handle.save_blocks(blocks, tx).unwrap();
let BlockNumHash { hash: actual_hash, number: _ } = rx.recv().unwrap().unwrap();
assert_eq!(last_hash, actual_hash);