mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
1 Commits
devnet4
...
fix/block-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b1314bc5cf |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -7566,6 +7566,7 @@ dependencies = [
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"metrics",
|
||||
"parking_lot",
|
||||
"reth-chain-state",
|
||||
"reth-execution-cache",
|
||||
"reth-metrics",
|
||||
|
||||
@@ -29,6 +29,13 @@ use reth_tasks::Runtime;
|
||||
use reth_trie_db::ChangesetCache;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Shared reader/writer lock that keeps block removal (write) apart from static-file readers (read).
|
||||
///
|
||||
/// During reorgs, the persistence service truncates static file mmaps. Concurrent readers
|
||||
/// (e.g., payload builders) holding stale mmap references can read garbage data and panic.
|
||||
/// The persistence service takes the write guard while removing blocks; payload builds (when configured with this lock) and the engine's forkchoice-updated / new-payload handling hold read guards, so they do not run concurrently with a removal.
|
||||
pub type BlockRemovalLock = Arc<parking_lot::RwLock<()>>;
|
||||
|
||||
/// Builds the engine [`ChainOrchestrator`] that drives the chain forward.
|
||||
///
|
||||
/// This spawns and wires together the following components:
|
||||
@@ -65,6 +72,7 @@ pub fn build_engine_orchestrator<N, Client, S, V, C>(
|
||||
evm_config: C,
|
||||
changeset_cache: ChangesetCache,
|
||||
runtime: Runtime,
|
||||
block_removal_lock: BlockRemovalLock,
|
||||
) -> ChainOrchestrator<
|
||||
EngineHandler<
|
||||
EngineApiRequestHandler<EngineApiRequest<N::Payload, N::Primitives>, N::Primitives>,
|
||||
@@ -82,8 +90,12 @@ where
|
||||
{
|
||||
let downloader = BasicBlockDownloader::new(client, consensus.clone());
|
||||
|
||||
let persistence_handle =
|
||||
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
|
||||
let persistence_handle = PersistenceHandle::<N::Primitives>::spawn_service(
|
||||
provider,
|
||||
pruner,
|
||||
sync_metrics_tx,
|
||||
block_removal_lock.clone(),
|
||||
);
|
||||
|
||||
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
|
||||
|
||||
@@ -99,6 +111,7 @@ where
|
||||
evm_config,
|
||||
changeset_cache,
|
||||
runtime,
|
||||
block_removal_lock,
|
||||
);
|
||||
|
||||
let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
|
||||
|
||||
@@ -60,6 +60,10 @@ where
|
||||
/// Pending safe block number to be committed with the next block save.
|
||||
/// This avoids triggering a separate fsync for each safe block update.
|
||||
pending_safe_block: Option<u64>,
|
||||
/// Shared lock for mutual exclusion between block removal (write) and payload
|
||||
/// building (read). Prevents concurrent readers from accessing static files
|
||||
/// while they are being truncated during reorgs.
|
||||
block_removal_lock: Arc<parking_lot::RwLock<()>>,
|
||||
}
|
||||
|
||||
impl<N> PersistenceService<N>
|
||||
@@ -72,6 +76,7 @@ where
|
||||
incoming: Receiver<PersistenceAction<N::Primitives>>,
|
||||
pruner: PrunerWithFactory<ProviderFactory<N>>,
|
||||
sync_metrics_tx: MetricEventsSender,
|
||||
block_removal_lock: Arc<parking_lot::RwLock<()>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
provider,
|
||||
@@ -81,6 +86,7 @@ where
|
||||
sync_metrics_tx,
|
||||
pending_finalized_block: None,
|
||||
pending_safe_block: None,
|
||||
block_removal_lock,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -133,6 +139,11 @@ where
|
||||
) -> Result<Option<BlockNumHash>, PersistenceError> {
|
||||
debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
|
||||
let start_time = Instant::now();
|
||||
|
||||
// Acquire exclusive lock to prevent concurrent readers (e.g., payload builders)
|
||||
// from accessing static files while they are being truncated.
|
||||
let _write_guard = self.block_removal_lock.write();
|
||||
|
||||
let provider_rw = self.provider.database_provider_rw()?;
|
||||
|
||||
let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
|
||||
@@ -267,6 +278,7 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
|
||||
provider_factory: ProviderFactory<N>,
|
||||
pruner: PrunerWithFactory<ProviderFactory<N>>,
|
||||
sync_metrics_tx: MetricEventsSender,
|
||||
block_removal_lock: Arc<parking_lot::RwLock<()>>,
|
||||
) -> PersistenceHandle<N::Primitives>
|
||||
where
|
||||
N: ProviderNodeTypes,
|
||||
@@ -275,8 +287,13 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
|
||||
let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();
|
||||
|
||||
// spawn the persistence service
|
||||
let db_service =
|
||||
PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
|
||||
let db_service = PersistenceService::new(
|
||||
provider_factory,
|
||||
db_service_rx,
|
||||
pruner,
|
||||
sync_metrics_tx,
|
||||
block_removal_lock,
|
||||
);
|
||||
let join_handle = spawn_os_thread("persistence", || {
|
||||
if let Err(err) = db_service.run() {
|
||||
error!(target: "engine::persistence", ?err, "Persistence service failed");
|
||||
@@ -391,7 +408,12 @@ mod tests {
|
||||
Pruner::new_with_factory(provider.clone(), vec![], 5, 0, None, finished_exex_height_rx);
|
||||
|
||||
let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
|
||||
PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx)
|
||||
PersistenceHandle::<EthPrimitives>::spawn_service(
|
||||
provider,
|
||||
pruner,
|
||||
sync_metrics_tx,
|
||||
Arc::new(parking_lot::RwLock::new(())),
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -306,6 +306,10 @@ where
|
||||
execution_timing_stats: HashMap<B256, Box<ExecutionTimingStats>>,
|
||||
/// Task runtime for spawning blocking work on named, reusable threads.
|
||||
runtime: reth_tasks::Runtime,
|
||||
/// Shared lock for mutual exclusion between block removal (write) and engine/payload
|
||||
/// building (read). Prevents concurrent readers from accessing static files while they
|
||||
/// are being truncated during reorgs.
|
||||
block_removal_lock: crate::launch::BlockRemovalLock,
|
||||
}
|
||||
|
||||
impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
|
||||
@@ -333,6 +337,7 @@ where
|
||||
.field("changeset_cache", &self.changeset_cache)
|
||||
.field("execution_timing_stats", &self.execution_timing_stats.len())
|
||||
.field("runtime", &self.runtime)
|
||||
.field("block_removal_lock", &self.block_removal_lock)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -373,6 +378,7 @@ where
|
||||
evm_config: C,
|
||||
changeset_cache: ChangesetCache,
|
||||
runtime: reth_tasks::Runtime,
|
||||
block_removal_lock: crate::launch::BlockRemovalLock,
|
||||
) -> Self {
|
||||
let (incoming_tx, incoming) = crossbeam_channel::unbounded();
|
||||
|
||||
@@ -396,6 +402,7 @@ where
|
||||
changeset_cache,
|
||||
execution_timing_stats: HashMap::new(),
|
||||
runtime,
|
||||
block_removal_lock,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -417,6 +424,7 @@ where
|
||||
evm_config: C,
|
||||
changeset_cache: ChangesetCache,
|
||||
runtime: reth_tasks::Runtime,
|
||||
block_removal_lock: crate::launch::BlockRemovalLock,
|
||||
) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
|
||||
{
|
||||
let best_block_number = provider.best_block_number().unwrap_or(0);
|
||||
@@ -450,6 +458,7 @@ where
|
||||
evm_config,
|
||||
changeset_cache,
|
||||
runtime,
|
||||
block_removal_lock,
|
||||
);
|
||||
let incoming = task.incoming_tx.clone();
|
||||
spawn_os_thread("engine", || {
|
||||
@@ -1568,6 +1577,10 @@ where
|
||||
let has_attrs = payload_attrs.is_some();
|
||||
|
||||
let start = Instant::now();
|
||||
// Hold a read guard to prevent concurrent block removal
|
||||
// from truncating static files while we read from them.
|
||||
let block_removal_lock = self.block_removal_lock.clone();
|
||||
let _read_guard = block_removal_lock.read();
|
||||
let mut output = self.on_forkchoice_updated(state, payload_attrs);
|
||||
|
||||
if let Ok(res) = &mut output {
|
||||
@@ -1611,6 +1624,10 @@ where
|
||||
let start = Instant::now();
|
||||
let gas_used = payload.gas_used();
|
||||
let num_hash = payload.num_hash();
|
||||
// Hold a read guard to prevent concurrent block removal
|
||||
// from truncating static files while we read from them.
|
||||
let block_removal_lock = self.block_removal_lock.clone();
|
||||
let _read_guard = block_removal_lock.read();
|
||||
let mut output = self.on_new_payload(payload);
|
||||
self.metrics.engine.new_payload.update_response_metrics(
|
||||
start,
|
||||
|
||||
@@ -223,6 +223,7 @@ impl TestHarness {
|
||||
evm_config,
|
||||
changeset_cache,
|
||||
reth_tasks::Runtime::test(),
|
||||
Arc::new(parking_lot::RwLock::new(())),
|
||||
);
|
||||
|
||||
let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
|
||||
|
||||
@@ -750,17 +750,27 @@ pub struct BuilderContext<Node: FullNodeTypes> {
|
||||
pub(crate) executor: TaskExecutor,
|
||||
/// Config container
|
||||
pub(crate) config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
|
||||
/// Shared lock that provides mutual exclusion between block removal (write) and
|
||||
/// payload building (read). Prevents concurrent readers from accessing static files
|
||||
/// while they are being truncated during reorgs.
|
||||
pub(crate) block_removal_lock: Arc<parking_lot::RwLock<()>>,
|
||||
}
|
||||
|
||||
impl<Node: FullNodeTypes> BuilderContext<Node> {
|
||||
/// Create a new instance of [`BuilderContext`]; each call allocates its own block removal lock, which is only shared with other components through handles obtained from this instance.
|
||||
pub const fn new(
|
||||
pub fn new(
|
||||
head: Head,
|
||||
provider: Node::Provider,
|
||||
executor: TaskExecutor,
|
||||
config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
|
||||
) -> Self {
|
||||
Self { head, provider, executor, config_container }
|
||||
Self {
|
||||
head,
|
||||
|
||||
provider,
|
||||
executor,
|
||||
config_container,
|
||||
block_removal_lock: Arc::new(parking_lot::RwLock::new(())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the configured provider to interact with the blockchain.
|
||||
@@ -815,6 +825,12 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
|
||||
Ok(EnvKzgSettings::Default)
|
||||
}
|
||||
|
||||
/// Returns a cloned [`Arc`] handle to the shared lock used for mutual exclusion between block
|
||||
/// removal and payload building; every handle refers to the same underlying lock.
|
||||
pub fn block_removal_lock(&self) -> Arc<parking_lot::RwLock<()>> {
|
||||
self.block_removal_lock.clone()
|
||||
}
|
||||
|
||||
/// Returns the config for payload building.
|
||||
pub fn payload_builder_config(&self) -> impl PayloadBuilderConfig {
|
||||
self.config().builder.clone()
|
||||
|
||||
@@ -103,7 +103,8 @@ where
|
||||
ctx.task_executor().clone(),
|
||||
payload_job_config,
|
||||
payload_builder,
|
||||
);
|
||||
)
|
||||
.with_block_removal_lock(ctx.block_removal_lock());
|
||||
let (payload_service, payload_service_handle) =
|
||||
PayloadBuilderService::new(payload_generator, ctx.provider().canonical_state_stream());
|
||||
|
||||
|
||||
@@ -824,6 +824,7 @@ where
|
||||
},
|
||||
node_adapter,
|
||||
head,
|
||||
block_removal_lock: builder_ctx.block_removal_lock(),
|
||||
};
|
||||
|
||||
let ctx = LaunchContextWith {
|
||||
@@ -891,6 +892,11 @@ where
|
||||
&self.node_adapter().provider
|
||||
}
|
||||
|
||||
/// Returns a cloned [`Arc`] handle to the shared lock for mutual exclusion between block removal and payload building.
|
||||
pub fn block_removal_lock(&self) -> Arc<parking_lot::RwLock<()>> {
|
||||
self.right().block_removal_lock.clone()
|
||||
}
|
||||
|
||||
/// Returns the initial backfill to sync to at launch.
|
||||
///
|
||||
/// This returns the configured `debug.tip` if set, otherwise it will check if backfill was
|
||||
@@ -1242,6 +1248,8 @@ where
|
||||
db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
|
||||
node_adapter: NodeAdapter<T, CB::Components>,
|
||||
head: Head,
|
||||
/// Shared lock for mutual exclusion between block removal and payload building.
|
||||
block_removal_lock: Arc<parking_lot::RwLock<()>>,
|
||||
}
|
||||
|
||||
/// Returns the metrics hooks for the node.
|
||||
|
||||
@@ -243,6 +243,7 @@ impl EngineNodeLauncher {
|
||||
ctx.components().evm_config().clone(),
|
||||
changeset_cache,
|
||||
ctx.task_executor().clone(),
|
||||
ctx.block_removal_lock(),
|
||||
);
|
||||
|
||||
info!(target: "reth::cli", "Consensus engine initialized");
|
||||
|
||||
@@ -41,3 +41,4 @@ metrics.workspace = true
|
||||
# misc
|
||||
tracing.workspace = true
|
||||
serde.workspace = true
|
||||
parking_lot.workspace = true
|
||||
|
||||
@@ -67,6 +67,9 @@ pub struct BasicPayloadJobGenerator<Client, Builder> {
|
||||
builder: Builder,
|
||||
/// Stored `cached_reads` for new payload jobs.
|
||||
pre_cached: Option<PrecachedState>,
|
||||
/// Shared lock that prevents payload building from racing with block removal.
|
||||
/// When set, a read guard is held for the duration of each `try_build` call.
|
||||
block_removal_lock: Option<Arc<parking_lot::RwLock<()>>>,
|
||||
}
|
||||
|
||||
// === impl BasicPayloadJobGenerator ===
|
||||
@@ -87,9 +90,19 @@ impl<Client, Builder> BasicPayloadJobGenerator<Client, Builder> {
|
||||
config,
|
||||
builder,
|
||||
pre_cached: None,
|
||||
block_removal_lock: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the shared block removal lock and returns the generator (builder-style).
|
||||
///
|
||||
/// When set, a read guard is acquired for the duration of each payload `try_build` call,
|
||||
/// preventing concurrent block removal from truncating static files mid-read. When unset, payload jobs take no guard.
|
||||
pub fn with_block_removal_lock(mut self, lock: Arc<parking_lot::RwLock<()>>) -> Self {
|
||||
self.block_removal_lock = Some(lock);
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns the maximum duration a job should be allowed to run.
|
||||
///
|
||||
/// This adheres to the following specification:
|
||||
@@ -182,6 +195,7 @@ where
|
||||
payload_task_guard: self.payload_task_guard.clone(),
|
||||
metrics: Default::default(),
|
||||
builder: self.builder.clone(),
|
||||
block_removal_lock: self.block_removal_lock.clone(),
|
||||
};
|
||||
|
||||
// start the first job right away
|
||||
@@ -337,6 +351,8 @@ where
|
||||
///
|
||||
/// See [`PayloadBuilder`]
|
||||
builder: Builder,
|
||||
/// Shared lock that prevents payload building from racing with block removal.
|
||||
block_removal_lock: Option<Arc<parking_lot::RwLock<()>>>,
|
||||
}
|
||||
|
||||
impl<Builder> BasicPayloadJob<Builder>
|
||||
@@ -359,6 +375,7 @@ where
|
||||
let execution_cache = self.execution_cache.clone();
|
||||
let trie_handle = self.trie_handle.take();
|
||||
let builder = self.builder.clone();
|
||||
let block_removal_lock = self.block_removal_lock.clone();
|
||||
self.executor.spawn_blocking_task(async move {
|
||||
// acquire the permit for executing the task
|
||||
let _permit = guard.acquire().await;
|
||||
@@ -370,6 +387,9 @@ where
|
||||
cancel,
|
||||
best_payload,
|
||||
};
|
||||
// Hold a read guard for the duration of try_build to prevent concurrent
|
||||
// block removal from truncating static files while we read from them.
|
||||
let _read_guard = block_removal_lock.as_ref().map(|lock| lock.read());
|
||||
let result = builder.try_build(args);
|
||||
let _ = tx.send(result);
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user