mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-27 16:18:08 -05:00
feat: add canonical in memory state (#9588)
This commit is contained in:
@@ -4,6 +4,7 @@ use crate::{
|
||||
engine::{DownloadRequest, EngineApiEvent, FromEngine},
|
||||
persistence::PersistenceHandle,
|
||||
};
|
||||
use parking_lot::RwLock;
|
||||
use reth_beacon_consensus::{
|
||||
BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated,
|
||||
};
|
||||
@@ -19,10 +20,11 @@ use reth_payload_primitives::PayloadTypes;
|
||||
use reth_payload_validator::ExecutionPayloadValidator;
|
||||
use reth_primitives::{
|
||||
Address, Block, BlockNumber, GotExpected, Receipts, Requests, SealedBlock,
|
||||
SealedBlockWithSenders, B256, U256,
|
||||
SealedBlockWithSenders, SealedHeader, B256, U256,
|
||||
};
|
||||
use reth_provider::{
|
||||
BlockReader, ExecutionOutcome, StateProvider, StateProviderFactory, StateRootProvider,
|
||||
providers::ChainInfoTracker, BlockReader, ExecutionOutcome, StateProvider,
|
||||
StateProviderFactory, StateRootProvider,
|
||||
};
|
||||
use reth_revm::database::StateProviderDatabase;
|
||||
use reth_rpc_types::{
|
||||
@@ -155,9 +157,9 @@ impl TreeState {
|
||||
/// Container type for in memory state data.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct InMemoryStateImpl {
|
||||
blocks: HashMap<B256, Arc<State>>,
|
||||
numbers: HashMap<u64, B256>,
|
||||
pending: Option<State>,
|
||||
blocks: RwLock<HashMap<B256, Arc<State>>>,
|
||||
numbers: RwLock<HashMap<u64, B256>>,
|
||||
pending: RwLock<Option<State>>,
|
||||
}
|
||||
|
||||
impl InMemoryStateImpl {
|
||||
@@ -166,29 +168,94 @@ impl InMemoryStateImpl {
|
||||
numbers: HashMap<u64, B256>,
|
||||
pending: Option<State>,
|
||||
) -> Self {
|
||||
Self { blocks, numbers, pending }
|
||||
Self {
|
||||
blocks: RwLock::new(blocks),
|
||||
numbers: RwLock::new(numbers),
|
||||
pending: RwLock::new(pending),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl InMemoryState for InMemoryStateImpl {
|
||||
fn state_by_hash(&self, hash: B256) -> Option<Arc<State>> {
|
||||
self.blocks.get(&hash).cloned()
|
||||
self.blocks.read().get(&hash).cloned()
|
||||
}
|
||||
|
||||
fn state_by_number(&self, number: u64) -> Option<Arc<State>> {
|
||||
self.numbers.get(&number).and_then(|hash| self.blocks.get(hash).cloned())
|
||||
self.numbers.read().get(&number).and_then(|hash| self.blocks.read().get(hash).cloned())
|
||||
}
|
||||
|
||||
fn current_head(&self) -> Option<(BlockNumber, B256)> {
|
||||
self.numbers.iter().max_by_key(|(&number, _)| number).map(|(&number, &hash)| (number, hash))
|
||||
}
|
||||
|
||||
fn pending_block_hash(&self) -> Option<B256> {
|
||||
self.pending.as_ref().map(|state| state.hash())
|
||||
fn head_state(&self) -> Option<Arc<State>> {
|
||||
self.numbers
|
||||
.read()
|
||||
.iter()
|
||||
.max_by_key(|(&number, _)| number)
|
||||
.and_then(|(_, hash)| self.blocks.read().get(hash).cloned())
|
||||
}
|
||||
|
||||
fn pending_state(&self) -> Option<Arc<State>> {
|
||||
self.pending.as_ref().map(|state| Arc::new(State(state.0.clone())))
|
||||
self.pending.read().as_ref().map(|state| Arc::new(State(state.0.clone())))
|
||||
}
|
||||
}
|
||||
|
||||
/// Inner type to provide in memory state. It includes a chain tracker to be
|
||||
/// advanced internally by the tree.
|
||||
#[derive(Debug)]
|
||||
struct CanonicalInMemoryStateInner {
|
||||
chain_info_tracker: ChainInfoTracker,
|
||||
in_memory_state: InMemoryStateImpl,
|
||||
}
|
||||
|
||||
/// This type is responsible for providing the blocks, receipts, and state for
|
||||
/// all canonical blocks not on disk yet and keeps track of the block range that
|
||||
/// is in memory.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CanonicalInMemoryState {
|
||||
inner: Arc<CanonicalInMemoryStateInner>,
|
||||
}
|
||||
|
||||
impl CanonicalInMemoryState {
|
||||
fn new(
|
||||
blocks: HashMap<B256, Arc<State>>,
|
||||
numbers: HashMap<u64, B256>,
|
||||
pending: Option<State>,
|
||||
) -> Self {
|
||||
let in_memory_state = InMemoryStateImpl::new(blocks, numbers, pending);
|
||||
let head_state = in_memory_state.head_state();
|
||||
let header = match head_state {
|
||||
Some(state) => state.block().block().header.clone(),
|
||||
None => SealedHeader::default(),
|
||||
};
|
||||
let chain_info_tracker = ChainInfoTracker::new(header);
|
||||
let inner = CanonicalInMemoryStateInner { chain_info_tracker, in_memory_state };
|
||||
|
||||
Self { inner: Arc::new(inner) }
|
||||
}
|
||||
|
||||
fn with_header(header: SealedHeader) -> Self {
|
||||
let chain_info_tracker = ChainInfoTracker::new(header);
|
||||
let in_memory_state = InMemoryStateImpl::default();
|
||||
let inner = CanonicalInMemoryStateInner { chain_info_tracker, in_memory_state };
|
||||
|
||||
Self { inner: Arc::new(inner) }
|
||||
}
|
||||
}
|
||||
|
||||
impl InMemoryState for CanonicalInMemoryState {
|
||||
fn state_by_hash(&self, hash: B256) -> Option<Arc<State>> {
|
||||
self.inner.in_memory_state.state_by_hash(hash)
|
||||
}
|
||||
|
||||
fn state_by_number(&self, number: u64) -> Option<Arc<State>> {
|
||||
self.inner.in_memory_state.state_by_number(number)
|
||||
}
|
||||
|
||||
fn head_state(&self) -> Option<Arc<State>> {
|
||||
self.inner.in_memory_state.head_state()
|
||||
}
|
||||
|
||||
fn pending_state(&self) -> Option<Arc<State>> {
|
||||
self.inner.in_memory_state.pending_state()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -316,7 +383,7 @@ pub struct EngineApiTreeHandlerImpl<P, E, T: EngineTypes> {
|
||||
persistence_state: PersistenceState,
|
||||
/// (tmp) The flag indicating whether the pipeline is active.
|
||||
is_pipeline_active: bool,
|
||||
canonical_in_memory_state: InMemoryStateImpl,
|
||||
canonical_in_memory_state: CanonicalInMemoryState,
|
||||
_marker: PhantomData<T>,
|
||||
}
|
||||
|
||||
@@ -335,6 +402,7 @@ where
|
||||
incoming: Receiver<FromEngine<BeaconEngineMessage<T>>>,
|
||||
outgoing: UnboundedSender<EngineApiEvent>,
|
||||
state: EngineApiTreeState,
|
||||
header: SealedHeader,
|
||||
persistence: PersistenceHandle,
|
||||
) -> Self {
|
||||
Self {
|
||||
@@ -348,7 +416,7 @@ where
|
||||
persistence_state: PersistenceState::default(),
|
||||
is_pipeline_active: false,
|
||||
state,
|
||||
canonical_in_memory_state: InMemoryStateImpl::default(),
|
||||
canonical_in_memory_state: CanonicalInMemoryState::with_header(header),
|
||||
_marker: PhantomData,
|
||||
}
|
||||
}
|
||||
@@ -361,6 +429,7 @@ where
|
||||
payload_validator: ExecutionPayloadValidator,
|
||||
incoming: Receiver<FromEngine<BeaconEngineMessage<T>>>,
|
||||
state: EngineApiTreeState,
|
||||
header: SealedHeader,
|
||||
persistence: PersistenceHandle,
|
||||
) -> UnboundedSender<EngineApiEvent> {
|
||||
let (outgoing, rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
@@ -372,6 +441,7 @@ where
|
||||
incoming,
|
||||
outgoing.clone(),
|
||||
state,
|
||||
header,
|
||||
persistence,
|
||||
);
|
||||
std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
|
||||
@@ -935,17 +1005,15 @@ trait InMemoryState: Send + Sync {
|
||||
fn state_by_hash(&self, hash: B256) -> Option<Arc<State>>;
|
||||
/// Returns the state for a given block number.
|
||||
fn state_by_number(&self, number: u64) -> Option<Arc<State>>;
|
||||
/// Returns the current chain head.
|
||||
fn current_head(&self) -> Option<(BlockNumber, B256)>;
|
||||
/// Returns the pending block hash.
|
||||
fn pending_block_hash(&self) -> Option<B256>;
|
||||
/// Returns the current chain head state.
|
||||
fn head_state(&self) -> Option<Arc<State>>;
|
||||
/// Returns the pending state corresponding to the current head plus one,
|
||||
/// from the payload received in newPayload that does not have a FCU yet.
|
||||
fn pending_state(&self) -> Option<Arc<State>>;
|
||||
}
|
||||
|
||||
/// State after applying the given block.
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
pub struct State(ExecutedBlock);
|
||||
|
||||
impl State {
|
||||
@@ -1047,6 +1115,7 @@ mod tests {
|
||||
forkchoice_state_tracker: ForkchoiceStateTracker::default(),
|
||||
};
|
||||
|
||||
let header = blocks.first().unwrap().block().header.clone();
|
||||
let mut tree = EngineApiTreeHandlerImpl::new(
|
||||
provider,
|
||||
executor_factory,
|
||||
@@ -1055,12 +1124,13 @@ mod tests {
|
||||
to_tree_rx,
|
||||
from_tree_tx,
|
||||
engine_api_tree_state,
|
||||
header,
|
||||
persistence_handle,
|
||||
);
|
||||
let last_executed_block = blocks.last().unwrap().clone();
|
||||
let pending = Some(State::new(last_executed_block));
|
||||
tree.canonical_in_memory_state =
|
||||
InMemoryStateImpl::new(state_by_hash, hash_by_number, pending);
|
||||
CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending);
|
||||
|
||||
TestHarness { tree, to_tree_tx, blocks, sf_action_rx }
|
||||
}
|
||||
@@ -1103,12 +1173,20 @@ mod tests {
|
||||
|
||||
let expected_state = State::new(executed_block.clone());
|
||||
|
||||
let actual_state_by_hash =
|
||||
tree.canonical_in_memory_state.state_by_hash(sealed_block.hash()).unwrap();
|
||||
let actual_state_by_hash = tree
|
||||
.canonical_in_memory_state
|
||||
.inner
|
||||
.in_memory_state
|
||||
.state_by_hash(sealed_block.hash())
|
||||
.unwrap();
|
||||
assert_eq!(expected_state, *actual_state_by_hash);
|
||||
|
||||
let actual_state_by_number =
|
||||
tree.canonical_in_memory_state.state_by_number(sealed_block.number).unwrap();
|
||||
let actual_state_by_number = tree
|
||||
.canonical_in_memory_state
|
||||
.inner
|
||||
.in_memory_state
|
||||
.state_by_number(sealed_block.number)
|
||||
.unwrap();
|
||||
assert_eq!(expected_state, *actual_state_by_number);
|
||||
}
|
||||
}
|
||||
@@ -1145,28 +1223,23 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_in_memory_state_impl_current_head() {
|
||||
async fn test_in_memory_state_impl_head_state() {
|
||||
let mut state_by_hash = HashMap::new();
|
||||
let mut hash_by_number = HashMap::new();
|
||||
let hash1 = B256::random();
|
||||
let hash2 = B256::random();
|
||||
let state1 = Arc::new(create_mock_state(1));
|
||||
let state2 = Arc::new(create_mock_state(2));
|
||||
let hash1 = state1.hash();
|
||||
let hash2 = state2.hash();
|
||||
hash_by_number.insert(1, hash1);
|
||||
hash_by_number.insert(2, hash2);
|
||||
state_by_hash.insert(hash1, state1);
|
||||
state_by_hash.insert(hash2, state2);
|
||||
|
||||
let in_memory_state = InMemoryStateImpl::new(HashMap::new(), hash_by_number, None);
|
||||
let in_memory_state = InMemoryStateImpl::new(state_by_hash, hash_by_number, None);
|
||||
let head_state = in_memory_state.head_state().unwrap();
|
||||
|
||||
assert_eq!(in_memory_state.current_head(), Some((2, hash2)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_in_memory_state_impl_pending_block_hash() {
|
||||
let number = rand::thread_rng().gen::<u64>();
|
||||
let pending_state = create_mock_state(number);
|
||||
let pending_hash = pending_state.hash();
|
||||
|
||||
let in_memory_state =
|
||||
InMemoryStateImpl::new(HashMap::new(), HashMap::new(), Some(pending_state));
|
||||
|
||||
assert_eq!(in_memory_state.pending_block_hash(), Some(pending_hash));
|
||||
assert_eq!(head_state.hash(), hash2);
|
||||
assert_eq!(head_state.number(), 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -1189,7 +1262,6 @@ mod tests {
|
||||
async fn test_in_memory_state_impl_no_pending_state() {
|
||||
let in_memory_state = InMemoryStateImpl::new(HashMap::new(), HashMap::new(), None);
|
||||
|
||||
assert_eq!(in_memory_state.pending_block_hash(), None);
|
||||
assert_eq!(in_memory_state.pending_state(), None);
|
||||
}
|
||||
|
||||
|
||||
@@ -12,13 +12,13 @@ use tokio::sync::watch;
|
||||
|
||||
/// Tracks the chain info: canonical head, safe block, finalized block.
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct ChainInfoTracker {
|
||||
pub struct ChainInfoTracker {
|
||||
inner: Arc<ChainInfoInner>,
|
||||
}
|
||||
|
||||
impl ChainInfoTracker {
|
||||
/// Create a new chain info container for the given canonical head.
|
||||
pub(crate) fn new(head: SealedHeader) -> Self {
|
||||
pub fn new(head: SealedHeader) -> Self {
|
||||
let (finalized_block, _) = watch::channel(None);
|
||||
let (safe_block, _) = watch::channel(None);
|
||||
Self {
|
||||
@@ -34,73 +34,73 @@ impl ChainInfoTracker {
|
||||
}
|
||||
|
||||
/// Returns the [`ChainInfo`] for the canonical head.
|
||||
pub(crate) fn chain_info(&self) -> ChainInfo {
|
||||
pub fn chain_info(&self) -> ChainInfo {
|
||||
let inner = self.inner.canonical_head.read();
|
||||
ChainInfo { best_hash: inner.hash(), best_number: inner.number }
|
||||
}
|
||||
|
||||
/// Update the timestamp when we received a forkchoice update.
|
||||
pub(crate) fn on_forkchoice_update_received(&self) {
|
||||
pub fn on_forkchoice_update_received(&self) {
|
||||
self.inner.last_forkchoice_update.write().replace(Instant::now());
|
||||
}
|
||||
|
||||
/// Returns the instant when we received the latest forkchoice update.
|
||||
pub(crate) fn last_forkchoice_update_received_at(&self) -> Option<Instant> {
|
||||
pub fn last_forkchoice_update_received_at(&self) -> Option<Instant> {
|
||||
*self.inner.last_forkchoice_update.read()
|
||||
}
|
||||
|
||||
/// Update the timestamp when we exchanged a transition configuration.
|
||||
pub(crate) fn on_transition_configuration_exchanged(&self) {
|
||||
pub fn on_transition_configuration_exchanged(&self) {
|
||||
self.inner.last_transition_configuration_exchange.write().replace(Instant::now());
|
||||
}
|
||||
|
||||
/// Returns the instant when we exchanged the transition configuration last time.
|
||||
pub(crate) fn last_transition_configuration_exchanged_at(&self) -> Option<Instant> {
|
||||
pub fn last_transition_configuration_exchanged_at(&self) -> Option<Instant> {
|
||||
*self.inner.last_transition_configuration_exchange.read()
|
||||
}
|
||||
|
||||
/// Returns the canonical head of the chain.
|
||||
pub(crate) fn get_canonical_head(&self) -> SealedHeader {
|
||||
pub fn get_canonical_head(&self) -> SealedHeader {
|
||||
self.inner.canonical_head.read().clone()
|
||||
}
|
||||
|
||||
/// Returns the safe header of the chain.
|
||||
pub(crate) fn get_safe_header(&self) -> Option<SealedHeader> {
|
||||
pub fn get_safe_header(&self) -> Option<SealedHeader> {
|
||||
self.inner.safe_block.borrow().clone()
|
||||
}
|
||||
|
||||
/// Returns the finalized header of the chain.
|
||||
pub(crate) fn get_finalized_header(&self) -> Option<SealedHeader> {
|
||||
pub fn get_finalized_header(&self) -> Option<SealedHeader> {
|
||||
self.inner.finalized_block.borrow().clone()
|
||||
}
|
||||
|
||||
/// Returns the canonical head of the chain.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn get_canonical_num_hash(&self) -> BlockNumHash {
|
||||
pub fn get_canonical_num_hash(&self) -> BlockNumHash {
|
||||
self.inner.canonical_head.read().num_hash()
|
||||
}
|
||||
|
||||
/// Returns the canonical head of the chain.
|
||||
pub(crate) fn get_canonical_block_number(&self) -> BlockNumber {
|
||||
pub fn get_canonical_block_number(&self) -> BlockNumber {
|
||||
self.inner.canonical_head_number.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Returns the safe header of the chain.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn get_safe_num_hash(&self) -> Option<BlockNumHash> {
|
||||
pub fn get_safe_num_hash(&self) -> Option<BlockNumHash> {
|
||||
let h = self.inner.safe_block.borrow();
|
||||
h.as_ref().map(|h| h.num_hash())
|
||||
}
|
||||
|
||||
/// Returns the finalized header of the chain.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn get_finalized_num_hash(&self) -> Option<BlockNumHash> {
|
||||
pub fn get_finalized_num_hash(&self) -> Option<BlockNumHash> {
|
||||
let h = self.inner.finalized_block.borrow();
|
||||
h.as_ref().map(|h| h.num_hash())
|
||||
}
|
||||
|
||||
/// Sets the canonical head of the chain.
|
||||
pub(crate) fn set_canonical_head(&self, header: SealedHeader) {
|
||||
pub fn set_canonical_head(&self, header: SealedHeader) {
|
||||
let number = header.number;
|
||||
*self.inner.canonical_head.write() = header;
|
||||
|
||||
@@ -109,14 +109,14 @@ impl ChainInfoTracker {
|
||||
}
|
||||
|
||||
/// Sets the safe header of the chain.
|
||||
pub(crate) fn set_safe(&self, header: SealedHeader) {
|
||||
pub fn set_safe(&self, header: SealedHeader) {
|
||||
self.inner.safe_block.send_modify(|h| {
|
||||
let _ = h.replace(header);
|
||||
});
|
||||
}
|
||||
|
||||
/// Sets the finalized header of the chain.
|
||||
pub(crate) fn set_finalized(&self, header: SealedHeader) {
|
||||
pub fn set_finalized(&self, header: SealedHeader) {
|
||||
self.inner.finalized_block.send_modify(|h| {
|
||||
let _ = h.replace(header);
|
||||
});
|
||||
|
||||
@@ -55,7 +55,7 @@ mod bundle_state_provider;
|
||||
pub use bundle_state_provider::BundleStateProvider;
|
||||
|
||||
mod chain_info;
|
||||
use chain_info::ChainInfoTracker;
|
||||
pub use chain_info::ChainInfoTracker;
|
||||
|
||||
mod consistent_view;
|
||||
use alloy_rpc_types_engine::ForkchoiceState;
|
||||
@@ -74,8 +74,8 @@ pub struct BlockchainProvider<DB> {
|
||||
tree: Arc<dyn TreeViewer>,
|
||||
/// Tracks the chain info wrt forkchoice updates
|
||||
chain_info: ChainInfoTracker,
|
||||
// TODO: In-memory state for recent blocks and pending state.
|
||||
//in_memory_state: Arc<dyn InMemoryState>,
|
||||
// TODO: replace chain_info with CanonicalInMemoryState.
|
||||
//canonical_in_memory_state: CanonicalInMemoryState,
|
||||
}
|
||||
|
||||
impl<DB> Clone for BlockchainProvider<DB> {
|
||||
@@ -84,8 +84,8 @@ impl<DB> Clone for BlockchainProvider<DB> {
|
||||
database: self.database.clone(),
|
||||
tree: self.tree.clone(),
|
||||
chain_info: self.chain_info.clone(),
|
||||
// TODO: add in_memory_state
|
||||
// in_memory_state: self.in_memory_state.clone(),
|
||||
// TODO: add canonical_in_memory_state
|
||||
// canonical_in_memory_state: self.canonical_in_memory_state.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user