feat(storage): non-optional committed chain in CanonStateNotification (#7566)

This commit is contained in:
Alexey Shekhirin
2024-04-11 14:27:01 +01:00
committed by GitHub
parent 80f8707a1b
commit 681b1a598f
5 changed files with 35 additions and 44 deletions

View File

@@ -194,23 +194,22 @@ where
}
fn on_new_state(&mut self, new_state: CanonStateNotification) {
if let Some(committed) = new_state.committed() {
let mut cached = CachedReads::default();
let mut cached = CachedReads::default();
// extract the state from the notification and put it into the cache
let new_state = committed.state();
for (addr, acc) in new_state.bundle_accounts_iter() {
if let Some(info) = acc.info.clone() {
// we want pre cache existing accounts and their storage
// this only includes changed accounts and storage but is better than nothing
let storage =
acc.storage.iter().map(|(key, slot)| (*key, slot.present_value)).collect();
cached.insert_account(addr, info, storage);
}
// extract the state from the notification and put it into the cache
let committed = new_state.committed();
let new_state = committed.state();
for (addr, acc) in new_state.bundle_accounts_iter() {
if let Some(info) = acc.info.clone() {
// we want pre cache existing accounts and their storage
// this only includes changed accounts and storage but is better than nothing
let storage =
acc.storage.iter().map(|(key, slot)| (*key, slot.present_value)).collect();
cached.insert_account(addr, info, storage);
}
self.pre_cached = Some(PrecachedState { block: committed.tip().hash(), cached });
}
self.pre_cached = Some(PrecachedState { block: committed.tip().hash(), cached });
}
}

View File

@@ -239,18 +239,17 @@ pub async fn fee_history_cache_new_blocks_task<St, Provider>(
// the stream ended, we are done
break;
};
if let Some(committed) = event.committed() {
let (blocks, receipts): (Vec<_>, Vec<_>) = committed
.blocks_and_receipts()
.map(|(block, receipts)| {
(block.block.clone(), Arc::new(receipts.iter().flatten().cloned().collect::<Vec<_>>()))
})
.unzip();
fee_history_cache.insert_blocks(blocks.into_iter().zip(receipts)).await;
let (blocks, receipts): (Vec<_>, Vec<_>) = event
.committed()
.blocks_and_receipts()
.map(|(block, receipts)| {
(block.block.clone(), Arc::new(receipts.iter().flatten().cloned().collect::<Vec<_>>()))
})
.unzip();
fee_history_cache.insert_blocks(blocks.into_iter().zip(receipts)).await;
// keep track of missing blocks
missing_blocks = fee_history_cache.missing_consecutive_blocks().await;
}
// keep track of missing blocks
missing_blocks = fee_history_cache.missing_consecutive_blocks().await;
}
}
}

View File

@@ -651,12 +651,9 @@ where
eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change });
}
if let Some(committed) = event.committed() {
let chain_change = ChainChange::new(committed);
let chain_change = ChainChange::new(event.committed());
let _ = eth_state_cache
.to_service
.send(CacheAction::CacheNewCanonicalChain { chain_change });
}
let _ =
eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
}
}

View File

@@ -314,10 +314,7 @@ where
/// Returns a stream that yields all new RPC blocks.
fn new_headers_stream(&self) -> impl Stream<Item = Header> {
self.chain_events.canonical_state_stream().flat_map(|new_chain| {
let headers = new_chain
.committed()
.map(|chain| chain.headers().collect::<Vec<_>>())
.unwrap_or_default();
let headers = new_chain.committed().headers().collect::<Vec<_>>();
futures::stream::iter(
headers.into_iter().map(reth_rpc_types_compat::block::from_primitive_with_hash),
)

View File

@@ -97,18 +97,18 @@ impl CanonStateNotification {
/// Get old chain if any.
pub fn reverted(&self) -> Option<Arc<Chain>> {
match self {
Self::Reorg { old, .. } => Some(old.clone()),
Self::Commit { .. } => None,
Self::Reorg { old, .. } => Some(old.clone()),
}
}
/// Get the new chain if any.
///
/// Returns the new committed [Chain] for [Self::Reorg] and [Self::Commit] variants.
pub fn committed(&self) -> Option<Arc<Chain>> {
pub fn committed(&self) -> Arc<Chain> {
match self {
Self::Reorg { new, .. } => Some(new.clone()),
Self::Commit { new } => Some(new.clone()),
Self::Commit { new } => new.clone(),
Self::Reorg { new, .. } => new.clone(),
}
}
@@ -118,8 +118,8 @@ impl CanonStateNotification {
/// new block.
pub fn tip(&self) -> &SealedBlockWithSenders {
match self {
Self::Reorg { new, .. } => new.tip(),
Self::Commit { new } => new.tip(),
Self::Reorg { new, .. } => new.tip(),
}
}
@@ -135,10 +135,9 @@ impl CanonStateNotification {
.extend(old.receipts_with_attachment().into_iter().map(|receipt| (receipt, true)));
}
// get new receipts
if let Some(new) = self.committed() {
receipts
.extend(new.receipts_with_attachment().into_iter().map(|receipt| (receipt, false)));
}
receipts.extend(
self.committed().receipts_with_attachment().into_iter().map(|receipt| (receipt, false)),
);
receipts
}
}