Compare commits

...

1 Commits

Author SHA1 Message Date
joshieDo
b1314bc5cf fix: add BlockRemovalLock to prevent NippyJar panics during reorgs
During reorgs, the persistence service truncates static file mmaps via
remove_blocks_above/prune_rows/set_len. Concurrent readers (payload
builders, engine tree) holding stale Arc<DataReader> mmaps read
garbage/zeroed data and panic with 'slice index starts at N but ends at 0'.

This adds a shared Arc<RwLock<()>> (BlockRemovalLock) that provides
mutual exclusion at the operation level:

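- PersistenceService::on_remove_blocks_above acquires a write lock for
  the duration of block removal, preventing concurrent reads.
- BasicPayloadJob::spawn_build_job acquires a read lock for the
  duration of try_build, allowing concurrent payload builds but blocking
  when block removal is in progress.
- The engine tree acquires a read lock around its on_forkchoice_updated
  and on_new_payload calls, so those requests do not read static files
  while block removal is in progress.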

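The lock is created in BuilderContext::new(), threaded through
WithComponents to the launch context, and from there handed to three
consumers: PersistenceHandle::spawn_service and the engine tree (both via
build_engine_orchestrator), and BasicPayloadJobGenerator (via
BuilderContext::block_removal_lock).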

Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019d3fdf-990d-748e-b307-cce56b7523a3
2026-03-30 19:23:33 +01:00
11 changed files with 109 additions and 8 deletions

1
Cargo.lock generated
View File

@@ -7566,6 +7566,7 @@ dependencies = [
"futures-core",
"futures-util",
"metrics",
"parking_lot",
"reth-chain-state",
"reth-execution-cache",
"reth-metrics",

View File

@@ -29,6 +29,13 @@ use reth_tasks::Runtime;
use reth_trie_db::ChangesetCache;
use std::sync::Arc;
/// Shared lock providing mutual exclusion between block removal (write side) and code that
/// reads static files, such as payload building and engine tree request handling (read side).
///
/// During reorgs, the persistence service truncates static file mmaps. Concurrent readers
/// (e.g., payload builders) holding stale mmap references can read garbage data and panic.
/// Block removal holds the write guard for the duration of the removal, while readers hold a
/// read guard, so readers may overlap with each other but never with block removal.
///
/// The lock protects no data of its own (`()`); it is used purely for coordination.
pub type BlockRemovalLock = Arc<parking_lot::RwLock<()>>;
/// Builds the engine [`ChainOrchestrator`] that drives the chain forward.
///
/// This spawns and wires together the following components:
@@ -65,6 +72,7 @@ pub fn build_engine_orchestrator<N, Client, S, V, C>(
evm_config: C,
changeset_cache: ChangesetCache,
runtime: Runtime,
block_removal_lock: BlockRemovalLock,
) -> ChainOrchestrator<
EngineHandler<
EngineApiRequestHandler<EngineApiRequest<N::Payload, N::Primitives>, N::Primitives>,
@@ -82,8 +90,12 @@ where
{
let downloader = BasicBlockDownloader::new(client, consensus.clone());
let persistence_handle =
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
let persistence_handle = PersistenceHandle::<N::Primitives>::spawn_service(
provider,
pruner,
sync_metrics_tx,
block_removal_lock.clone(),
);
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
@@ -99,6 +111,7 @@ where
evm_config,
changeset_cache,
runtime,
block_removal_lock,
);
let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);

View File

@@ -60,6 +60,10 @@ where
/// Pending safe block number to be committed with the next block save.
/// This avoids triggering a separate fsync for each safe block update.
pending_safe_block: Option<u64>,
/// Shared lock for mutual exclusion between block removal (write) and payload
/// building (read). Prevents concurrent readers from accessing static files
/// while they are being truncated during reorgs.
block_removal_lock: Arc<parking_lot::RwLock<()>>,
}
impl<N> PersistenceService<N>
@@ -72,6 +76,7 @@ where
incoming: Receiver<PersistenceAction<N::Primitives>>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
sync_metrics_tx: MetricEventsSender,
block_removal_lock: Arc<parking_lot::RwLock<()>>,
) -> Self {
Self {
provider,
@@ -81,6 +86,7 @@ where
sync_metrics_tx,
pending_finalized_block: None,
pending_safe_block: None,
block_removal_lock,
}
}
}
@@ -133,6 +139,11 @@ where
) -> Result<Option<BlockNumHash>, PersistenceError> {
debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
let start_time = Instant::now();
// Acquire exclusive lock to prevent concurrent readers (e.g., payload builders)
// from accessing static files while they are being truncated.
let _write_guard = self.block_removal_lock.write();
let provider_rw = self.provider.database_provider_rw()?;
let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
@@ -267,6 +278,7 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
provider_factory: ProviderFactory<N>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
sync_metrics_tx: MetricEventsSender,
block_removal_lock: Arc<parking_lot::RwLock<()>>,
) -> PersistenceHandle<N::Primitives>
where
N: ProviderNodeTypes,
@@ -275,8 +287,13 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();
// spawn the persistence service
let db_service =
PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
let db_service = PersistenceService::new(
provider_factory,
db_service_rx,
pruner,
sync_metrics_tx,
block_removal_lock,
);
let join_handle = spawn_os_thread("persistence", || {
if let Err(err) = db_service.run() {
error!(target: "engine::persistence", ?err, "Persistence service failed");
@@ -391,7 +408,12 @@ mod tests {
Pruner::new_with_factory(provider.clone(), vec![], 5, 0, None, finished_exex_height_rx);
let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx)
PersistenceHandle::<EthPrimitives>::spawn_service(
provider,
pruner,
sync_metrics_tx,
Arc::new(parking_lot::RwLock::new(())),
)
}
#[test]

View File

@@ -306,6 +306,10 @@ where
execution_timing_stats: HashMap<B256, Box<ExecutionTimingStats>>,
/// Task runtime for spawning blocking work on named, reusable threads.
runtime: reth_tasks::Runtime,
/// Shared lock for mutual exclusion between block removal (write) and engine/payload
/// building (read). Prevents concurrent readers from accessing static files while they
/// are being truncated during reorgs.
block_removal_lock: crate::launch::BlockRemovalLock,
}
impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
@@ -333,6 +337,7 @@ where
.field("changeset_cache", &self.changeset_cache)
.field("execution_timing_stats", &self.execution_timing_stats.len())
.field("runtime", &self.runtime)
.field("block_removal_lock", &self.block_removal_lock)
.finish()
}
}
@@ -373,6 +378,7 @@ where
evm_config: C,
changeset_cache: ChangesetCache,
runtime: reth_tasks::Runtime,
block_removal_lock: crate::launch::BlockRemovalLock,
) -> Self {
let (incoming_tx, incoming) = crossbeam_channel::unbounded();
@@ -396,6 +402,7 @@ where
changeset_cache,
execution_timing_stats: HashMap::new(),
runtime,
block_removal_lock,
}
}
@@ -417,6 +424,7 @@ where
evm_config: C,
changeset_cache: ChangesetCache,
runtime: reth_tasks::Runtime,
block_removal_lock: crate::launch::BlockRemovalLock,
) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
{
let best_block_number = provider.best_block_number().unwrap_or(0);
@@ -450,6 +458,7 @@ where
evm_config,
changeset_cache,
runtime,
block_removal_lock,
);
let incoming = task.incoming_tx.clone();
spawn_os_thread("engine", || {
@@ -1568,6 +1577,10 @@ where
let has_attrs = payload_attrs.is_some();
let start = Instant::now();
// Hold a read guard to prevent concurrent block removal
// from truncating static files while we read from them.
let block_removal_lock = self.block_removal_lock.clone();
let _read_guard = block_removal_lock.read();
let mut output = self.on_forkchoice_updated(state, payload_attrs);
if let Ok(res) = &mut output {
@@ -1611,6 +1624,10 @@ where
let start = Instant::now();
let gas_used = payload.gas_used();
let num_hash = payload.num_hash();
// Hold a read guard to prevent concurrent block removal
// from truncating static files while we read from them.
let block_removal_lock = self.block_removal_lock.clone();
let _read_guard = block_removal_lock.read();
let mut output = self.on_new_payload(payload);
self.metrics.engine.new_payload.update_response_metrics(
start,

View File

@@ -223,6 +223,7 @@ impl TestHarness {
evm_config,
changeset_cache,
reth_tasks::Runtime::test(),
Arc::new(parking_lot::RwLock::new(())),
);
let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());

View File

@@ -750,17 +750,27 @@ pub struct BuilderContext<Node: FullNodeTypes> {
pub(crate) executor: TaskExecutor,
/// Config container
pub(crate) config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
/// Shared lock that provides mutual exclusion between block removal (write) and
/// payload building (read). Prevents concurrent readers from accessing static files
/// while they are being truncated during reorgs.
pub(crate) block_removal_lock: Arc<parking_lot::RwLock<()>>,
}
impl<Node: FullNodeTypes> BuilderContext<Node> {
/// Create a new instance of [`BuilderContext`]
pub const fn new(
pub fn new(
head: Head,
provider: Node::Provider,
executor: TaskExecutor,
config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
) -> Self {
Self { head, provider, executor, config_container }
Self {
head,
provider,
executor,
config_container,
block_removal_lock: Arc::new(parking_lot::RwLock::new(())),
}
}
/// Returns the configured provider to interact with the blockchain.
@@ -815,6 +825,12 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
Ok(EnvKzgSettings::Default)
}
/// Hands out another handle to the shared block removal lock.
///
/// The returned [`Arc`] refers to the same lock stored in this context, which coordinates
/// block removal with payload building.
pub fn block_removal_lock(&self) -> Arc<parking_lot::RwLock<()>> {
    Arc::clone(&self.block_removal_lock)
}
/// Returns the config for payload building.
pub fn payload_builder_config(&self) -> impl PayloadBuilderConfig {
self.config().builder.clone()

View File

@@ -103,7 +103,8 @@ where
ctx.task_executor().clone(),
payload_job_config,
payload_builder,
);
)
.with_block_removal_lock(ctx.block_removal_lock());
let (payload_service, payload_service_handle) =
PayloadBuilderService::new(payload_generator, ctx.provider().canonical_state_stream());

View File

@@ -824,6 +824,7 @@ where
},
node_adapter,
head,
block_removal_lock: builder_ctx.block_removal_lock(),
};
let ctx = LaunchContextWith {
@@ -891,6 +892,11 @@ where
&self.node_adapter().provider
}
/// Provides a handle to the lock shared between block removal and payload building.
pub fn block_removal_lock(&self) -> Arc<parking_lot::RwLock<()>> {
    let components = self.right();
    Arc::clone(&components.block_removal_lock)
}
/// Returns the initial backfill to sync to at launch.
///
/// This returns the configured `debug.tip` if set, otherwise it will check if backfill was
@@ -1242,6 +1248,8 @@ where
db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
node_adapter: NodeAdapter<T, CB::Components>,
head: Head,
/// Shared lock for mutual exclusion between block removal and payload building.
block_removal_lock: Arc<parking_lot::RwLock<()>>,
}
/// Returns the metrics hooks for the node.

View File

@@ -243,6 +243,7 @@ impl EngineNodeLauncher {
ctx.components().evm_config().clone(),
changeset_cache,
ctx.task_executor().clone(),
ctx.block_removal_lock(),
);
info!(target: "reth::cli", "Consensus engine initialized");

View File

@@ -41,3 +41,4 @@ metrics.workspace = true
# misc
tracing.workspace = true
serde.workspace = true
parking_lot.workspace = true

View File

@@ -67,6 +67,9 @@ pub struct BasicPayloadJobGenerator<Client, Builder> {
builder: Builder,
/// Stored `cached_reads` for new payload jobs.
pre_cached: Option<PrecachedState>,
/// Shared lock that prevents payload building from racing with block removal.
/// When set, a read guard is held for the duration of each `try_build` call.
block_removal_lock: Option<Arc<parking_lot::RwLock<()>>>,
}
// === impl BasicPayloadJobGenerator ===
@@ -87,9 +90,19 @@ impl<Client, Builder> BasicPayloadJobGenerator<Client, Builder> {
config,
builder,
pre_cached: None,
block_removal_lock: None,
}
}
/// Installs the shared block removal lock on this generator.
///
/// Once a lock is installed, each payload `try_build` call runs while holding a read guard
/// on it, so block removal cannot truncate static files in the middle of a build.
pub fn with_block_removal_lock(self, lock: Arc<parking_lot::RwLock<()>>) -> Self {
    let mut generator = self;
    generator.block_removal_lock = Some(lock);
    generator
}
/// Returns the maximum duration a job should be allowed to run.
///
/// This adheres to the following specification:
@@ -182,6 +195,7 @@ where
payload_task_guard: self.payload_task_guard.clone(),
metrics: Default::default(),
builder: self.builder.clone(),
block_removal_lock: self.block_removal_lock.clone(),
};
// start the first job right away
@@ -337,6 +351,8 @@ where
///
/// See [`PayloadBuilder`]
builder: Builder,
/// Shared lock that prevents payload building from racing with block removal.
block_removal_lock: Option<Arc<parking_lot::RwLock<()>>>,
}
impl<Builder> BasicPayloadJob<Builder>
@@ -359,6 +375,7 @@ where
let execution_cache = self.execution_cache.clone();
let trie_handle = self.trie_handle.take();
let builder = self.builder.clone();
let block_removal_lock = self.block_removal_lock.clone();
self.executor.spawn_blocking_task(async move {
// acquire the permit for executing the task
let _permit = guard.acquire().await;
@@ -370,6 +387,9 @@ where
cancel,
best_payload,
};
// Hold a read guard for the duration of try_build to prevent concurrent
// block removal from truncating static files while we read from them.
let _read_guard = block_removal_lock.as_ref().map(|lock| lock.read());
let result = builder.try_build(args);
let _ = tx.send(result);
});