From faff24b6836e428479c43f8690f0bdc95db3a9c2 Mon Sep 17 00:00:00 2001 From: Jennifer Date: Thu, 8 Aug 2024 12:11:50 +0100 Subject: [PATCH] Convert pending block to a watch channel (#10203) --- crates/chain-state/src/in_memory.rs | 41 ++++++++++++++++------------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/crates/chain-state/src/in_memory.rs b/crates/chain-state/src/in_memory.rs index fefecd7393..889c703650 100644 --- a/crates/chain-state/src/in_memory.rs +++ b/crates/chain-state/src/in_memory.rs @@ -20,7 +20,7 @@ use std::{ sync::Arc, time::Instant, }; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, watch}; /// Size of the broadcast channel used to notify canonical state events. const CANON_STATE_NOTIFICATION_CHANNEL_SIZE: usize = 256; @@ -48,7 +48,7 @@ pub(crate) struct InMemoryState { /// Mapping of block numbers to block hashes. numbers: RwLock>, /// The pending block that has not yet been made canonical. - pending: RwLock>, + pending: watch::Sender>, /// Metrics for the in-memory state. metrics: InMemoryStateMetrics, } @@ -59,10 +59,11 @@ impl InMemoryState { numbers: BTreeMap, pending: Option, ) -> Self { + let (pending, _) = watch::channel(pending); let this = Self { blocks: RwLock::new(blocks), numbers: RwLock::new(numbers), - pending: RwLock::new(pending), + pending, metrics: Default::default(), }; this.update_metrics(); @@ -112,7 +113,7 @@ impl InMemoryState { /// Returns the pending state corresponding to the current head plus one, /// from the payload received in newPayload that does not have a FCU yet. pub(crate) fn pending_state(&self) -> Option> { - self.pending.read().as_ref().map(|state| Arc::new(BlockState::new(state.block.clone()))) + self.pending.borrow().as_ref().map(|state| Arc::new(BlockState::new(state.block.clone()))) } #[cfg(test)] @@ -140,11 +141,11 @@ impl CanonicalInMemoryStateInner { { let mut blocks = self.in_memory_state.blocks.write(); let mut numbers = self.in_memory_state.numbers.write(); - let mut pending = self.in_memory_state.pending.write(); - blocks.clear(); numbers.clear(); - pending.take(); + self.in_memory_state.pending.send_modify(|p| { + p.take(); + }); } self.in_memory_state.update_metrics(); } @@ -229,7 +230,9 @@ impl CanonicalInMemoryState { // fetch the state of the pending block's parent block let parent = self.state_by_hash(pending.block().parent_hash); let pending = BlockState::with_parent(pending, parent.map(|p| (*p).clone())); - *self.inner.in_memory_state.pending.write() = Some(pending); + self.inner.in_memory_state.pending.send_modify(|p| { + p.replace(pending); + }); self.inner.in_memory_state.update_metrics(); } @@ -242,7 +245,6 @@ impl CanonicalInMemoryState { // acquire all locks let mut numbers = self.inner.in_memory_state.numbers.write(); let mut blocks = self.inner.in_memory_state.blocks.write(); - let mut pending = self.inner.in_memory_state.pending.write(); // we first remove the blocks from the reorged chain for block in reorged { @@ -266,7 +268,9 @@ impl CanonicalInMemoryState { } // remove the pending state - pending.take(); + self.inner.in_memory_state.pending.send_modify(|p| { + p.take(); + }); } self.inner.in_memory_state.update_metrics(); } @@ -291,7 +295,6 @@ impl CanonicalInMemoryState { { let mut blocks = self.inner.in_memory_state.blocks.write(); let mut numbers = self.inner.in_memory_state.numbers.write(); - let mut pending = self.inner.in_memory_state.pending.write(); // clear all numbers numbers.clear(); @@ -319,12 +322,14 @@ impl CanonicalInMemoryState { } // also shift the pending state if it exists - if let Some(pending) = pending.as_mut() { - pending.parent = blocks - .get(&pending.block().block.parent_hash) - .cloned() - .map(|p| Box::new((*p).clone())); - } + self.inner.in_memory_state.pending.send_modify(|p| { + if let Some(p) = p.as_mut() { + p.parent = blocks + .get(&p.block().block.parent_hash) + .cloned() + .map(|p| Box::new((*p).clone())); + } + }); } self.inner.in_memory_state.update_metrics(); } @@ -486,7 +491,7 @@ impl CanonicalInMemoryState { /// Returns an iterator over all canonical blocks in the in-memory state, from newest to oldest. pub fn canonical_chain(&self) -> impl Iterator> { - let pending = self.inner.in_memory_state.pending.read().clone(); + let pending = self.inner.in_memory_state.pending.borrow().clone(); let head = self.inner.in_memory_state.head_state(); // this clone is cheap because we only expect to keep in memory a few