mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
7 Commits
staging
...
feat/bal-c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6cbcfe01a0 | ||
|
|
a0aac13f75 | ||
|
|
9f5cf847cc | ||
|
|
df1413167a | ||
|
|
3f50a36191 | ||
|
|
d750b4976d | ||
|
|
7a65d2595d |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -10628,6 +10628,7 @@ dependencies = [
|
||||
"jsonrpsee-core",
|
||||
"jsonrpsee-types",
|
||||
"metrics",
|
||||
"parking_lot",
|
||||
"reth-chainspec",
|
||||
"reth-engine-primitives",
|
||||
"reth-ethereum-engine-primitives",
|
||||
|
||||
@@ -41,6 +41,7 @@ metrics.workspace = true
|
||||
async-trait.workspace = true
|
||||
jsonrpsee-core.workspace = true
|
||||
jsonrpsee-types.workspace = true
|
||||
parking_lot.workspace = true
|
||||
serde.workspace = true
|
||||
thiserror.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
274
crates/rpc/rpc-engine-api/src/bal_cache.rs
Normal file
274
crates/rpc/rpc-engine-api/src/bal_cache.rs
Normal file
@@ -0,0 +1,274 @@
|
||||
//! Block Access List (BAL) cache for EIP-7928.
|
||||
//!
|
||||
//! This module provides an in-memory cache for storing Block Access Lists received via
|
||||
//! the Engine API. BALs are stored for valid payloads and can be retrieved via
|
||||
//! `engine_getBALsByHashV1` and `engine_getBALsByRangeV1`.
|
||||
//!
|
||||
//! According to EIP-7928, the EL MUST retain BALs for at least the duration of the
|
||||
//! weak subjectivity period (~3533 epochs) to support synchronization with re-execution.
|
||||
//! This initial implementation uses a simple in-memory cache with configurable capacity.
|
||||
|
||||
use alloy_primitives::{BlockHash, BlockNumber, Bytes};
|
||||
use parking_lot::RwLock;
|
||||
use reth_metrics::{
|
||||
metrics::{Counter, Gauge},
|
||||
Metrics,
|
||||
};
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
/// Default capacity for the BAL cache.
|
||||
///
|
||||
/// This is a conservative default - production deployments should configure based on
|
||||
/// weak subjectivity period requirements (~3533 epochs ≈ 113,000 blocks).
|
||||
const DEFAULT_BAL_CACHE_CAPACITY: u32 = 1024;
|
||||
|
||||
/// In-memory cache for Block Access Lists (BALs).
|
||||
///
|
||||
/// Provides O(1) lookups by block hash and O(log n) range queries by block number.
|
||||
/// Evicts the oldest (lowest) block numbers when capacity is exceeded.
|
||||
///
|
||||
/// This type is cheaply cloneable as it wraps an `Arc` internally.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BalCache {
|
||||
inner: Arc<BalCacheInner>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct BalCacheInner {
|
||||
/// Maximum number of entries to store.
|
||||
capacity: u32,
|
||||
/// Mapping from block hash to BAL bytes.
|
||||
entries: RwLock<HashMap<BlockHash, Bytes>>,
|
||||
/// Index mapping block number to block hash for range queries.
|
||||
/// Uses `BTreeMap` for efficient range iteration and eviction of oldest blocks.
|
||||
block_index: RwLock<BTreeMap<BlockNumber, BlockHash>>,
|
||||
/// Cache metrics.
|
||||
metrics: BalCacheMetrics,
|
||||
}
|
||||
|
||||
impl BalCache {
|
||||
/// Creates a new BAL cache with the default capacity.
|
||||
pub fn new() -> Self {
|
||||
Self::with_capacity(DEFAULT_BAL_CACHE_CAPACITY)
|
||||
}
|
||||
|
||||
/// Creates a new BAL cache with the specified capacity.
|
||||
pub fn with_capacity(capacity: u32) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(BalCacheInner {
|
||||
capacity,
|
||||
entries: RwLock::new(HashMap::new()),
|
||||
block_index: RwLock::new(BTreeMap::new()),
|
||||
metrics: BalCacheMetrics::default(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Inserts a BAL into the cache.
|
||||
///
|
||||
/// If a different hash already exists for this block number (reorg), the old entry
|
||||
/// is removed first. If the cache is at capacity, the oldest block number is evicted.
|
||||
pub fn insert(&self, block_hash: BlockHash, block_number: BlockNumber, bal: Bytes) {
|
||||
let mut entries = self.inner.entries.write();
|
||||
let mut block_index = self.inner.block_index.write();
|
||||
|
||||
// If this block number already has a different hash, remove the old entry
|
||||
if let Some(old_hash) = block_index.get(&block_number) &&
|
||||
*old_hash != block_hash
|
||||
{
|
||||
entries.remove(old_hash);
|
||||
}
|
||||
|
||||
// Evict oldest block if at capacity and this is a new entry
|
||||
if !entries.contains_key(&block_hash) &&
|
||||
entries.len() as u32 >= self.inner.capacity &&
|
||||
let Some((&oldest_num, &oldest_hash)) = block_index.first_key_value()
|
||||
{
|
||||
entries.remove(&oldest_hash);
|
||||
block_index.remove(&oldest_num);
|
||||
}
|
||||
|
||||
entries.insert(block_hash, bal);
|
||||
|
||||
block_index.insert(block_number, block_hash);
|
||||
|
||||
self.inner.metrics.inserts.increment(1);
|
||||
self.inner.metrics.count.set(entries.len() as f64);
|
||||
}
|
||||
|
||||
/// Retrieves BALs for the given block hashes.
|
||||
///
|
||||
/// Returns a vector with the same length as `block_hashes`, where each element
|
||||
/// is `Some(bal)` if found or `None` if not in cache.
|
||||
pub fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> Vec<Option<Bytes>> {
|
||||
let entries = self.inner.entries.read();
|
||||
block_hashes
|
||||
.iter()
|
||||
.map(|hash| {
|
||||
let result = entries.get(hash).cloned();
|
||||
if result.is_some() {
|
||||
self.inner.metrics.hits.increment(1);
|
||||
} else {
|
||||
self.inner.metrics.misses.increment(1);
|
||||
}
|
||||
result
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Retrieves BALs for a range of blocks starting at `start` for `count` blocks.
|
||||
///
|
||||
/// Returns a vector of contiguous BALs in block number order, stopping at the first
|
||||
/// missing block. This ensures the caller knows the returned BALs correspond to
|
||||
/// blocks `[start, start + len)`.
|
||||
pub fn get_by_range(&self, start: BlockNumber, count: u64) -> Vec<Bytes> {
|
||||
let entries = self.inner.entries.read();
|
||||
let block_index = self.inner.block_index.read();
|
||||
|
||||
let mut result = Vec::new();
|
||||
for block_num in start..start.saturating_add(count) {
|
||||
let Some(hash) = block_index.get(&block_num) else {
|
||||
break;
|
||||
};
|
||||
let Some(bal) = entries.get(hash) else {
|
||||
break;
|
||||
};
|
||||
result.push(bal.clone());
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
/// Returns the number of entries in the cache.
|
||||
#[cfg(test)]
|
||||
fn len(&self) -> usize {
|
||||
self.inner.entries.read().len()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for BalCache {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
/// Metrics for the BAL cache.
|
||||
#[derive(Metrics)]
|
||||
#[metrics(scope = "engine.bal_cache")]
|
||||
struct BalCacheMetrics {
|
||||
/// The total number of BALs in the cache.
|
||||
count: Gauge,
|
||||
/// The number of cache inserts.
|
||||
inserts: Counter,
|
||||
/// The number of cache hits.
|
||||
hits: Counter,
|
||||
/// The number of cache misses.
|
||||
misses: Counter,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use alloy_primitives::B256;
|
||||
|
||||
#[test]
|
||||
fn test_insert_and_get_by_hash() {
|
||||
let cache = BalCache::with_capacity(10);
|
||||
|
||||
let hash1 = B256::random();
|
||||
let hash2 = B256::random();
|
||||
let bal1 = Bytes::from_static(b"bal1");
|
||||
let bal2 = Bytes::from_static(b"bal2");
|
||||
|
||||
cache.insert(hash1, 1, bal1.clone());
|
||||
cache.insert(hash2, 2, bal2.clone());
|
||||
|
||||
let results = cache.get_by_hashes(&[hash1, hash2, B256::random()]);
|
||||
assert_eq!(results.len(), 3);
|
||||
assert_eq!(results[0], Some(bal1));
|
||||
assert_eq!(results[1], Some(bal2));
|
||||
assert_eq!(results[2], None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_by_range() {
|
||||
let cache = BalCache::with_capacity(10);
|
||||
|
||||
for i in 1..=5 {
|
||||
let hash = B256::random();
|
||||
let bal = Bytes::from(format!("bal{i}").into_bytes());
|
||||
cache.insert(hash, i, bal);
|
||||
}
|
||||
|
||||
let results = cache.get_by_range(2, 3);
|
||||
assert_eq!(results.len(), 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_by_range_stops_at_gap() {
|
||||
let cache = BalCache::with_capacity(10);
|
||||
|
||||
// Insert blocks 1, 2, 4, 5 (missing block 3)
|
||||
for i in [1, 2, 4, 5] {
|
||||
let hash = B256::random();
|
||||
let bal = Bytes::from(format!("bal{i}").into_bytes());
|
||||
cache.insert(hash, i, bal);
|
||||
}
|
||||
|
||||
// Requesting range starting at 1 should stop at the gap (block 3)
|
||||
let results = cache.get_by_range(1, 5);
|
||||
assert_eq!(results.len(), 2); // Only blocks 1 and 2
|
||||
|
||||
// Requesting range starting at 4 should return 4 and 5
|
||||
let results = cache.get_by_range(4, 3);
|
||||
assert_eq!(results.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_eviction_oldest_first() {
|
||||
let cache = BalCache::with_capacity(3);
|
||||
|
||||
// Insert blocks 10, 20, 30
|
||||
for i in [10, 20, 30] {
|
||||
let hash = B256::random();
|
||||
cache.insert(hash, i, Bytes::from_static(b"bal"));
|
||||
}
|
||||
assert_eq!(cache.len(), 3);
|
||||
|
||||
// Insert block 40, should evict block 10 (oldest/lowest)
|
||||
let hash40 = B256::random();
|
||||
cache.insert(hash40, 40, Bytes::from_static(b"bal40"));
|
||||
assert_eq!(cache.len(), 3);
|
||||
|
||||
// Block 10 should be gone, block 20 should still be there
|
||||
let results = cache.get_by_range(10, 1);
|
||||
assert_eq!(results.len(), 0);
|
||||
|
||||
let results = cache.get_by_range(20, 1);
|
||||
assert_eq!(results.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_reorg_replaces_hash() {
|
||||
let cache = BalCache::with_capacity(10);
|
||||
|
||||
let hash1 = B256::random();
|
||||
let hash2 = B256::random();
|
||||
let bal1 = Bytes::from_static(b"bal1");
|
||||
let bal2 = Bytes::from_static(b"bal2");
|
||||
|
||||
// Insert block 100 with hash1
|
||||
cache.insert(hash1, 100, bal1.clone());
|
||||
assert_eq!(cache.get_by_hashes(&[hash1])[0], Some(bal1));
|
||||
|
||||
// Reorg: insert block 100 with hash2
|
||||
cache.insert(hash2, 100, bal2.clone());
|
||||
|
||||
// hash1 should be gone, hash2 should be there
|
||||
assert_eq!(cache.get_by_hashes(&[hash1])[0], None);
|
||||
assert_eq!(cache.get_by_hashes(&[hash2])[0], Some(bal2));
|
||||
assert_eq!(cache.len(), 1);
|
||||
}
|
||||
}
|
||||
@@ -1,13 +1,15 @@
|
||||
use crate::{
|
||||
capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
|
||||
bal_cache::BalCache, capabilities::EngineCapabilities, metrics::EngineApiMetrics,
|
||||
EngineApiError, EngineApiResult,
|
||||
};
|
||||
use alloy_eips::{
|
||||
eip1898::BlockHashOrNumber,
|
||||
eip4844::{BlobAndProofV1, BlobAndProofV2},
|
||||
eip4895::Withdrawals,
|
||||
eip7685::RequestsOrHash,
|
||||
BlockNumHash,
|
||||
};
|
||||
use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
|
||||
use alloy_primitives::{BlockHash, BlockNumber, Bytes, B256, U64};
|
||||
use alloy_rpc_types_engine::{
|
||||
CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
|
||||
ExecutionPayloadBodyV1, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1,
|
||||
@@ -21,7 +23,7 @@ use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTy
|
||||
use reth_network_api::NetworkInfo;
|
||||
use reth_payload_builder::PayloadStore;
|
||||
use reth_payload_primitives::{
|
||||
validate_payload_timestamp, EngineApiMessageVersion, MessageValidationKind,
|
||||
validate_payload_timestamp, EngineApiMessageVersion, ExecutionPayload, MessageValidationKind,
|
||||
PayloadOrAttributes, PayloadTypes,
|
||||
};
|
||||
use reth_primitives_traits::{Block, BlockBody};
|
||||
@@ -96,6 +98,38 @@ where
|
||||
validator: Validator,
|
||||
accept_execution_requests_hash: bool,
|
||||
network: impl NetworkInfo + 'static,
|
||||
) -> Self {
|
||||
Self::with_bal_cache(
|
||||
provider,
|
||||
chain_spec,
|
||||
beacon_consensus,
|
||||
payload_store,
|
||||
tx_pool,
|
||||
task_spawner,
|
||||
client,
|
||||
capabilities,
|
||||
validator,
|
||||
accept_execution_requests_hash,
|
||||
network,
|
||||
BalCache::new(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Create new instance of [`EngineApi`] with a custom BAL cache.
|
||||
#[expect(clippy::too_many_arguments)]
|
||||
pub fn with_bal_cache(
|
||||
provider: Provider,
|
||||
chain_spec: Arc<ChainSpec>,
|
||||
beacon_consensus: ConsensusEngineHandle<PayloadT>,
|
||||
payload_store: PayloadStore<PayloadT>,
|
||||
tx_pool: Pool,
|
||||
task_spawner: Box<dyn TaskSpawner>,
|
||||
client: ClientVersionV1,
|
||||
capabilities: EngineCapabilities,
|
||||
validator: Validator,
|
||||
accept_execution_requests_hash: bool,
|
||||
network: impl NetworkInfo + 'static,
|
||||
bal_cache: BalCache,
|
||||
) -> Self {
|
||||
let is_syncing = Arc::new(move || network.is_syncing());
|
||||
let inner = Arc::new(EngineApiInner {
|
||||
@@ -111,10 +145,25 @@ where
|
||||
validator,
|
||||
accept_execution_requests_hash,
|
||||
is_syncing,
|
||||
bal_cache,
|
||||
});
|
||||
Self { inner }
|
||||
}
|
||||
|
||||
/// Returns a reference to the BAL cache.
|
||||
pub fn bal_cache(&self) -> &BalCache {
|
||||
&self.inner.bal_cache
|
||||
}
|
||||
|
||||
/// Caches the BAL if the status is valid.
|
||||
fn maybe_cache_bal(&self, num_hash: BlockNumHash, bal: Option<Bytes>, status: &PayloadStatus) {
|
||||
if status.is_valid() &&
|
||||
let Some(bal) = bal
|
||||
{
|
||||
self.inner.bal_cache.insert(num_hash.hash, num_hash.number, bal);
|
||||
}
|
||||
}
|
||||
|
||||
/// Fetches the client version.
|
||||
pub fn get_client_version_v1(
|
||||
&self,
|
||||
@@ -149,7 +198,11 @@ where
|
||||
.validator
|
||||
.validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
|
||||
|
||||
Ok(self.inner.beacon_consensus.new_payload(payload).await?)
|
||||
let num_hash = payload.num_hash();
|
||||
let bal = payload.block_access_list().cloned();
|
||||
let status = self.inner.beacon_consensus.new_payload(payload).await?;
|
||||
self.maybe_cache_bal(num_hash, bal, &status);
|
||||
Ok(status)
|
||||
}
|
||||
|
||||
/// Metered version of `new_payload_v1`.
|
||||
@@ -177,7 +230,12 @@ where
|
||||
self.inner
|
||||
.validator
|
||||
.validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
|
||||
Ok(self.inner.beacon_consensus.new_payload(payload).await?)
|
||||
|
||||
let num_hash = payload.num_hash();
|
||||
let bal = payload.block_access_list().cloned();
|
||||
let status = self.inner.beacon_consensus.new_payload(payload).await?;
|
||||
self.maybe_cache_bal(num_hash, bal, &status);
|
||||
Ok(status)
|
||||
}
|
||||
|
||||
/// Metered version of `new_payload_v2`.
|
||||
@@ -206,7 +264,11 @@ where
|
||||
.validator
|
||||
.validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
|
||||
|
||||
Ok(self.inner.beacon_consensus.new_payload(payload).await?)
|
||||
let num_hash = payload.num_hash();
|
||||
let bal = payload.block_access_list().cloned();
|
||||
let status = self.inner.beacon_consensus.new_payload(payload).await?;
|
||||
self.maybe_cache_bal(num_hash, bal, &status);
|
||||
Ok(status)
|
||||
}
|
||||
|
||||
/// Metrics version of `new_payload_v3`
|
||||
@@ -236,7 +298,11 @@ where
|
||||
.validator
|
||||
.validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
|
||||
|
||||
Ok(self.inner.beacon_consensus.new_payload(payload).await?)
|
||||
let num_hash = payload.num_hash();
|
||||
let bal = payload.block_access_list().cloned();
|
||||
let status = self.inner.beacon_consensus.new_payload(payload).await?;
|
||||
self.maybe_cache_bal(num_hash, bal, &status);
|
||||
Ok(status)
|
||||
}
|
||||
|
||||
/// Metrics version of `new_payload_v4`
|
||||
@@ -881,6 +947,22 @@ where
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
/// Retrieves BALs for the given block hashes from the cache.
|
||||
///
|
||||
/// Returns the RLP-encoded BALs for blocks found in the cache.
|
||||
/// Missing blocks are returned as empty bytes.
|
||||
pub fn get_bals_by_hash(&self, block_hashes: Vec<BlockHash>) -> Vec<alloy_primitives::Bytes> {
|
||||
let results = self.inner.bal_cache.get_by_hashes(&block_hashes);
|
||||
results.into_iter().map(|opt| opt.unwrap_or_default()).collect()
|
||||
}
|
||||
|
||||
/// Retrieves BALs for a range of blocks from the cache.
|
||||
///
|
||||
/// Returns the RLP-encoded BALs for blocks in the range `[start, start + count)`.
|
||||
pub fn get_bals_by_range(&self, start: u64, count: u64) -> Vec<alloy_primitives::Bytes> {
|
||||
self.inner.bal_cache.get_by_range(start, count)
|
||||
}
|
||||
}
|
||||
|
||||
// This is the concrete ethereum engine API implementation.
|
||||
@@ -1205,12 +1287,10 @@ where
|
||||
/// See also <https://eips.ethereum.org/EIPS/eip-7928>
|
||||
async fn get_bals_by_hash_v1(
|
||||
&self,
|
||||
_block_hashes: Vec<BlockHash>,
|
||||
block_hashes: Vec<BlockHash>,
|
||||
) -> RpcResult<Vec<alloy_primitives::Bytes>> {
|
||||
trace!(target: "rpc::engine", "Serving engine_getBALsByHashV1");
|
||||
Err(EngineApiError::EngineObjectValidationError(
|
||||
reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
|
||||
))?
|
||||
Ok(self.get_bals_by_hash(block_hashes))
|
||||
}
|
||||
|
||||
/// Handler for `engine_getBALsByRangeV1`
|
||||
@@ -1218,13 +1298,11 @@ where
|
||||
/// See also <https://eips.ethereum.org/EIPS/eip-7928>
|
||||
async fn get_bals_by_range_v1(
|
||||
&self,
|
||||
_start: U64,
|
||||
_count: U64,
|
||||
start: U64,
|
||||
count: U64,
|
||||
) -> RpcResult<Vec<alloy_primitives::Bytes>> {
|
||||
trace!(target: "rpc::engine", "Serving engine_getBALsByRangeV1");
|
||||
Err(EngineApiError::EngineObjectValidationError(
|
||||
reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
|
||||
))?
|
||||
Ok(self.get_bals_by_range(start.to(), count.to()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1284,6 +1362,8 @@ struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSp
|
||||
accept_execution_requests_hash: bool,
|
||||
/// Returns `true` if the node is currently syncing.
|
||||
is_syncing: Arc<dyn Fn() -> bool + Send + Sync>,
|
||||
/// Cache for Block Access Lists (BALs) per EIP-7928.
|
||||
bal_cache: BalCache,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -12,6 +12,10 @@
|
||||
/// The Engine API implementation.
|
||||
mod engine_api;
|
||||
|
||||
/// Block Access List (BAL) cache for EIP-7928.
|
||||
mod bal_cache;
|
||||
pub use bal_cache::BalCache;
|
||||
|
||||
/// Engine API capabilities.
|
||||
pub mod capabilities;
|
||||
pub use capabilities::EngineCapabilities;
|
||||
|
||||
Reference in New Issue
Block a user