feat(BlockchainTree): Broadcast new canonical block headers (#2027)

rakita
2023-03-30 12:58:02 +02:00
committed by GitHub
parent 41b1b3c769
commit d239b82ccb
6 changed files with 118 additions and 58 deletions

View File

@@ -2,13 +2,21 @@
use chain::{BlockChainId, Chain, ForkBlock};
use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx};
use reth_interfaces::{
blockchain_tree::BlockStatus, consensus::Consensus, executor::Error as ExecError, Error,
blockchain_tree::BlockStatus,
consensus::Consensus,
events::{NewBlockNotifications, NewBlockNotificationsSender},
executor::Error as ExecError,
Error,
};
use reth_primitives::{
BlockHash, BlockNumHash, BlockNumber, Hardfork, SealedBlock, SealedBlockWithSenders, U256,
BlockHash, BlockNumHash, BlockNumber, Hardfork, SealedBlock, SealedBlockWithSenders,
SealedHeader, U256,
};
use reth_provider::{post_state::PostState, ExecutorFactory, HeaderProvider, Transaction};
use std::collections::{BTreeMap, HashMap};
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
};
pub mod block_indices;
use block_indices::BlockIndices;
@@ -80,6 +88,8 @@ pub struct BlockchainTree<DB: Database, C: Consensus, EF: ExecutorFactory> {
externals: TreeExternals<DB, C, EF>,
/// Tree configuration
config: BlockchainTreeConfig,
/// Broadcast channel used to notify subscribers of new canonical block headers.
new_block_notification_sender: NewBlockNotificationsSender,
}
/// A container that wraps chains and block indices to allow searching for block hashes across all
@@ -118,6 +128,11 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
last_canonical_hashes.last().cloned().unwrap_or_default()
};
// The broadcast channel capacity is twice the max reorg depth, because at the maximum
// reorg depth we may need to send that many block notifications at once.
let (new_block_notification_sender, _) =
tokio::sync::broadcast::channel(2 * max_reorg_depth as usize);
Ok(Self {
externals,
block_chain_id_generator: 0,
@@ -127,6 +142,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
BTreeMap::from_iter(last_canonical_hashes.into_iter()),
),
config,
new_block_notification_sender,
})
}
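
For context on the sizing comment above, a minimal standalone sketch (not part of this commit) of how a tokio broadcast channel with this capacity behaves: sending succeeds while at least one receiver is alive, and the capacity only bounds how far a slow subscriber may fall behind before it starts missing messages. The 3 used for max_reorg_depth here is an arbitrary placeholder.

use std::sync::Arc;
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // Capacity mirrors the tree's sizing; 3 stands in for max_reorg_depth.
    let max_reorg_depth = 3usize;
    let (tx, mut rx) = broadcast::channel::<Arc<String>>(2 * max_reorg_depth);

    // Sending succeeds as long as at least one receiver exists.
    tx.send(Arc::new("block 1".to_string())).unwrap();
    assert_eq!(*rx.recv().await.unwrap(), "block 1");

    // With no receivers left, send returns an error, which the tree ignores.
    drop(rx);
    assert!(tx.send(Arc::new("block 2".to_string())).is_err());
}
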
@@ -549,6 +565,8 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
// update canonical index
self.block_indices.canonicalize_blocks(new_canon_chain.blocks());
let headers: Vec<Arc<SealedHeader>> =
new_canon_chain.blocks().iter().map(|(_, b)| Arc::new(b.header.clone())).collect();
// if joins to the tip
if new_canon_chain.fork_block_hash() == old_tip.hash {
// append to database
@@ -565,13 +583,27 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
let old_canon_chain = self.revert_canonical(canon_fork.number)?;
// commit new canonical chain.
self.commit_canonical(new_canon_chain)?;
// insert old canon chain
self.insert_chain(old_canon_chain);
}
// Broadcast new canonical blocks.
headers.into_iter().for_each(|header| {
// Ignore the error if all receivers have been dropped.
let _ = self.new_block_notification_sender.send(header);
});
Ok(())
}
/// Subscribe to new block events.
///
/// Note: Only canonical blocks are sent.
pub fn subscribe_new_blocks(&self) -> NewBlockNotifications {
self.new_block_notification_sender.subscribe()
}
/// Canonicalize the given chain and commit it to the database.
fn commit_canonical(&mut self, chain: Chain) -> Result<(), Error> {
let mut tx = Transaction::new(&self.externals.db)?;
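
A rough sketch of the consumer side of subscribe_new_blocks, using a stand-in Header struct in place of reth's Arc<SealedHeader> payload; the interesting part is how a broadcast receiver reports lagging and closure.

use std::sync::Arc;
use tokio::sync::broadcast::{self, error::RecvError};

// Stand-in for reth's SealedHeader, purely for illustration.
#[derive(Debug, Clone)]
struct Header { number: u64 }

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<Arc<Header>>(8);

    // Pretend the tree just made two blocks canonical.
    for number in [1, 2] {
        let _ = tx.send(Arc::new(Header { number }));
    }
    drop(tx);

    loop {
        match rx.recv().await {
            Ok(header) => println!("new canonical header: {}", header.number),
            // The receiver fell behind and missed `n` notifications.
            Err(RecvError::Lagged(n)) => eprintln!("missed {n} notifications"),
            // All senders dropped and the buffer is drained; nothing more will arrive.
            Err(RecvError::Closed) => break,
        }
    }
}
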
@@ -726,8 +758,8 @@ mod tests {
}
}
#[test]
fn sanity_path() {
#[tokio::test]
async fn sanity_path() {
let data = BlockChainTestData::default();
let (mut block1, exec1) = data.blocks[0].clone();
block1.number = 11;
@@ -743,6 +775,7 @@ mod tests {
// make tree
let config = BlockchainTreeConfig::new(1, 2, 3);
let mut tree = BlockchainTree::new(externals, config).expect("failed to create tree");
let mut new_block_notification = tree.subscribe_new_blocks();
// genesis block 10 is already canonical
assert_eq!(tree.make_canonical(&H256::zero()), Ok(()));
@@ -789,8 +822,13 @@ mod tests {
// make block1 canonical
assert_eq!(tree.make_canonical(&block1.hash()), Ok(()));
// check notification
assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block1.header.clone())));
// make block2 canonical
assert_eq!(tree.make_canonical(&block2.hash()), Ok(()));
// check notification.
assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block2.header.clone())));
// Trie state:
// b2 (canonical block)
@@ -847,6 +885,9 @@ mod tests {
// make b2a canonical
assert_eq!(tree.make_canonical(&block2a_hash), Ok(()));
// check notification.
assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block2a.header.clone())));
// Trie state:
// b2a b2 (side chain)
// | /
@@ -866,6 +907,9 @@ mod tests {
.assert(&tree);
assert_eq!(tree.make_canonical(&block1a_hash), Ok(()));
// check notification.
assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block1a.header.clone())));
// Trie state:
// b2a b2 (side chain)
// | /
@@ -890,6 +934,11 @@ mod tests {
// make b2 canonical
assert_eq!(tree.make_canonical(&block2.hash()), Ok(()));
// check notification.
assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block1.header.clone())));
assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block2.header.clone())));
// Trie state:
// b2 b2a (side chain)
// | /
@@ -947,6 +996,9 @@ mod tests {
// commit b2a
assert_eq!(tree.make_canonical(&block2.hash), Ok(()));
// check notification.
assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block2.header.clone())));
// Trie state:
// b2 b2a (side chain)
// | /
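
The try_recv assertions in the test above lean on the broadcast receiver's non-blocking behaviour; an isolated sketch (plain u64 payloads) of what it returns with and without a pending notification:

use tokio::sync::broadcast::{self, error::TryRecvError};

fn main() {
    let (tx, mut rx) = broadcast::channel::<u64>(4);

    // Nothing has been sent yet: try_recv does not block, it reports Empty.
    assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));

    // After a send, the pending value is returned immediately.
    tx.send(7).unwrap();
    assert_eq!(rx.try_recv().unwrap(), 7);
}
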

View File

@@ -4,6 +4,7 @@ use reth_db::database::Database;
use reth_interfaces::{
blockchain_tree::{BlockStatus, BlockchainTreeEngine, BlockchainTreeViewer},
consensus::Consensus,
events::{ChainEventSubscriptions, NewBlockNotifications},
Error,
};
use reth_primitives::{BlockHash, BlockNumHash, BlockNumber, SealedBlockWithSenders};
@@ -82,3 +83,11 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTreePendingState
Ok(Box::new(post_state))
}
}
impl<DB: Database, C: Consensus, EF: ExecutorFactory> ChainEventSubscriptions
for ShareableBlockchainTree<DB, C, EF>
{
fn subscribe_new_blocks(&self) -> NewBlockNotifications {
self.tree.read().subscribe_new_blocks()
}
}
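
The shareable wrapper above only needs a read lock to hand out a receiver, because subscribing goes through the sender half of the channel. A self-contained sketch of that delegation pattern with stand-in types (std RwLock and u64 payloads instead of reth's lock and notification aliases):

use std::sync::{Arc, RwLock};
use tokio::sync::broadcast::{self, Receiver, Sender};

struct Tree {
    sender: Sender<u64>,
}

impl Tree {
    fn subscribe_new_blocks(&self) -> Receiver<u64> {
        self.sender.subscribe()
    }
}

#[derive(Clone)]
struct ShareableTree {
    tree: Arc<RwLock<Tree>>,
}

impl ShareableTree {
    fn subscribe_new_blocks(&self) -> Receiver<u64> {
        // A read lock is enough: subscribing only touches the sender side.
        self.tree.read().unwrap().subscribe_new_blocks()
    }
}

fn main() {
    let (sender, _) = broadcast::channel(16);
    let shareable = ShareableTree { tree: Arc::new(RwLock::new(Tree { sender })) };

    // Every clone of the wrapper can hand out independent receivers.
    let _rx_a = shareable.subscribe_new_blocks();
    let _rx_b = shareable.clone().subscribe_new_blocks();
}
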

View File

@@ -1,21 +1,18 @@
use reth_primitives::{Header, H256};
use reth_primitives::SealedHeader;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::broadcast::{Receiver, Sender};
/// New block notification wrapping a [SealedHeader] in an `Arc`.
pub type NewBlockNotification = Arc<SealedHeader>;
/// Type alias for a receiver that receives [NewBlockNotification]
pub type NewBlockNotifications = UnboundedReceiver<NewBlockNotification>;
pub type NewBlockNotifications = Receiver<NewBlockNotification>;
/// Type alias for a sender that sends [NewBlockNotification]
pub type NewBlockNotificationsSender = Sender<NewBlockNotification>;
/// A type that allows registering chain-related event subscriptions.
pub trait ChainEventSubscriptions: Send + Sync {
/// Get notified when a new block was imported.
fn subscribe_new_blocks(&self) -> NewBlockNotifications;
}
/// A notification that's emitted when a new block was imported.
#[derive(Clone, Debug)]
pub struct NewBlockNotification {
/// Hash of the block that was imported
pub hash: H256,
/// The block header of the new block
pub header: Arc<Header>,
}
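
The switch from an mpsc UnboundedReceiver to a broadcast Receiver means every subscriber now observes every notification rather than competing for them. A small sketch of that fan-out, with u64 standing in for Arc<SealedHeader>:

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut first) = broadcast::channel::<u64>(8);
    let mut second = tx.subscribe();

    tx.send(42).unwrap();

    // Both subscribers observe the same notification; nothing is "stolen".
    assert_eq!(first.recv().await.unwrap(), 42);
    assert_eq!(second.recv().await.unwrap(), 42);
}
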

View File

@@ -1,31 +1,27 @@
use crate::events::{ChainEventSubscriptions, NewBlockNotification, NewBlockNotifications};
use async_trait::async_trait;
use reth_primitives::{Header, H256};
use reth_primitives::{Header, SealedHeader, H256};
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::broadcast::{self, Receiver, Sender};
/// A test ChainEventSubscriptions
#[derive(Clone, Default)]
pub struct TestChainEventSubscriptions {
new_blocks_txs: Arc<Mutex<Vec<UnboundedSender<NewBlockNotification>>>>,
new_blocks_txs: Arc<Mutex<Vec<Sender<NewBlockNotification>>>>,
}
impl TestChainEventSubscriptions {
/// Adds a new block to the queue that can be consumed with
/// [`TestChainEventSubscriptions::subscribe_new_blocks`]
pub fn add_new_block(&mut self, hash: H256, header: Header) {
pub fn add_new_block(&mut self, header: SealedHeader) {
let header = Arc::new(header);
self.new_blocks_txs
.lock()
.as_mut()
.unwrap()
.retain(|tx| tx.send(NewBlockNotification { hash, header: header.clone() }).is_ok())
self.new_blocks_txs.lock().as_mut().unwrap().retain(|tx| tx.send(header.clone()).is_ok())
}
}
impl ChainEventSubscriptions for TestChainEventSubscriptions {
fn subscribe_new_blocks(&self) -> NewBlockNotifications {
let (new_blocks_tx, new_blocks_rx) = unbounded_channel();
let (new_blocks_tx, new_blocks_rx) = broadcast::channel(100);
self.new_blocks_txs.lock().as_mut().unwrap().push(new_blocks_tx);
new_blocks_rx
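
The retain call in add_new_block prunes senders whose subscribers have gone away; a standalone sketch of that pattern, using plain u64 payloads instead of SealedHeader:

use std::sync::{Arc, Mutex};
use tokio::sync::broadcast::{self, Sender};

struct TestSubscriptions {
    senders: Arc<Mutex<Vec<Sender<u64>>>>,
}

impl TestSubscriptions {
    fn push_block(&self, number: u64) {
        // Keep only the senders that still have at least one live receiver.
        self.senders.lock().unwrap().retain(|tx| tx.send(number).is_ok());
    }
}

fn main() {
    let subs = TestSubscriptions { senders: Arc::new(Mutex::new(Vec::new())) };

    let (tx, rx) = broadcast::channel::<u64>(8);
    subs.senders.lock().unwrap().push(tx);
    assert_eq!(subs.senders.lock().unwrap().len(), 1);

    // Once the only receiver is dropped, the next send fails and the sender is pruned.
    drop(rx);
    subs.push_block(1);
    assert_eq!(subs.senders.lock().unwrap().len(), 0);
}
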

View File

@@ -1,8 +1,8 @@
//! Contains the RPC representations of the Ethereum types defined in [reth_primitives]
use crate::Transaction;
use reth_primitives::{
Address, Block as PrimitiveBlock, Bloom, Bytes, Header as PrimitiveHeader, Withdrawal, H256,
H64, U256,
Address, Block as PrimitiveBlock, Bloom, Bytes, Header as PrimitiveHeader, SealedHeader,
Withdrawal, H256, H64, U256,
};
use reth_rlp::Encodable;
use serde::{ser::Error, Deserialize, Serialize, Serializer};
@@ -166,7 +166,7 @@ impl Block {
let uncles = block.ommers.into_iter().map(|h| h.hash_slow()).collect();
let base_fee_per_gas = block.header.base_fee_per_gas;
let header = Header::from_primitive_with_hash(block.header, block_hash);
let header = Header::from_primitive_with_hash(block.header.seal(block_hash));
Self {
header,
@@ -183,7 +183,7 @@ impl Block {
/// an Uncle from its header.
pub fn uncle_block_from_header(header: PrimitiveHeader) -> Self {
let hash = header.hash_slow();
let rpc_header = Header::from_primitive_with_hash(header.clone(), hash);
let rpc_header = Header::from_primitive_with_hash(header.clone().seal(hash));
let uncle_block = PrimitiveBlock { header, ..Default::default() };
let size = Some(U256::from(uncle_block.length()));
Self {
@@ -246,29 +246,33 @@ impl Header {
/// Converts the primitive header type to this RPC type
///
/// CAUTION: this takes the header's hash as is and does _not_ calculate the hash.
pub fn from_primitive_with_hash(primitive_header: PrimitiveHeader, block_hash: H256) -> Self {
let PrimitiveHeader {
parent_hash,
ommers_hash,
beneficiary,
state_root,
transactions_root,
receipts_root,
logs_bloom,
difficulty,
number,
gas_limit,
gas_used,
timestamp,
mix_hash,
nonce,
base_fee_per_gas: _,
extra_data,
withdrawals_root,
pub fn from_primitive_with_hash(primitive_header: SealedHeader) -> Self {
let SealedHeader {
header:
PrimitiveHeader {
parent_hash,
ommers_hash,
beneficiary,
state_root,
transactions_root,
receipts_root,
logs_bloom,
difficulty,
number,
gas_limit,
gas_used,
timestamp,
mix_hash,
nonce,
base_fee_per_gas: _,
extra_data,
withdrawals_root,
},
hash,
} = primitive_header;
Header {
hash: Some(block_hash),
hash: Some(hash),
parent_hash,
uncles_hash: ommers_hash,
miner: beneficiary,
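
from_primitive_with_hash now takes a SealedHeader, so callers pair the header with its hash via seal before converting, as the call sites above do. A simplified stand-in sketch of that shape (these are not reth's actual type definitions):

// Simplified stand-ins for reth_primitives::{Header, SealedHeader}.
#[derive(Clone, Debug)]
struct Header { number: u64 }

#[derive(Clone, Debug)]
struct SealedHeader { header: Header, hash: [u8; 32] }

impl Header {
    // Pairs a header with a hash computed (or already known) elsewhere.
    fn seal(self, hash: [u8; 32]) -> SealedHeader {
        SealedHeader { header: self, hash }
    }
}

// The RPC conversion can now destructure both pieces from a single argument.
fn from_primitive_with_hash(sealed: SealedHeader) -> (u64, [u8; 32]) {
    let SealedHeader { header: Header { number }, hash } = sealed;
    (number, hash)
}

fn main() {
    let header = Header { number: 17 };
    let (number, hash) = from_primitive_with_hash(header.seal([0u8; 32]));
    assert_eq!((number, hash), (17, [0u8; 32]));
}
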

View File

@@ -5,7 +5,7 @@ use futures::StreamExt;
use jsonrpsee::{types::SubscriptionResult, SubscriptionSink};
use reth_interfaces::events::ChainEventSubscriptions;
use reth_network_api::NetworkInfo;
use reth_primitives::{filter::FilteredParams, BlockId, TxHash};
use reth_primitives::{filter::FilteredParams, TxHash};
use reth_provider::{BlockProvider, EvmEnvProvider};
use reth_rpc_api::EthPubSubApiServer;
use reth_rpc_types::{
@@ -18,7 +18,7 @@ use reth_rpc_types::{
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use reth_transaction_pool::TransactionPool;
use tokio_stream::{
wrappers::{ReceiverStream, UnboundedReceiverStream},
wrappers::{BroadcastStream, ReceiverStream},
Stream,
};
@@ -121,7 +121,7 @@ async fn handle_accepted<Client, Pool, Events, Network>(
subscription_task_spawner.spawn(Box::pin(async move {
// get new block subscription
let mut new_blocks =
UnboundedReceiverStream::new(pubsub.chain_events.subscribe_new_blocks());
BroadcastStream::new(pubsub.chain_events.subscribe_new_blocks());
// get current sync status
let mut initial_sync_status = pubsub.network.is_syncing();
let current_sub_res = pubsub.sync_status(initial_sync_status).await;
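
BroadcastStream yields Result items because a bounded broadcast receiver can lag, which is why the streams below either expect or filter out the error case. A minimal sketch (u64 payloads, tokio-stream's wrapper) of dropping lagged errors with filter_map:

use tokio::sync::broadcast;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};

#[tokio::main]
async fn main() {
    let (tx, rx) = broadcast::channel::<u64>(8);
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    drop(tx);

    // Each item is Result<u64, BroadcastStreamRecvError>; a lagged receiver sees Err.
    let mut blocks = BroadcastStream::new(rx).filter_map(|item| item.ok());

    // Buffered values are yielded, then the stream ends once the sender is gone.
    assert_eq!(blocks.next().await, Some(1));
    assert_eq!(blocks.next().await, Some(2));
    assert_eq!(blocks.next().await, None);
}
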
@@ -206,16 +206,18 @@ where
{
/// Returns a stream that yields all new RPC blocks.
fn into_new_headers_stream(self) -> impl Stream<Item = Header> {
UnboundedReceiverStream::new(self.chain_events.subscribe_new_blocks()).map(|new_block| {
Header::from_primitive_with_hash(new_block.header.as_ref().clone(), new_block.hash)
BroadcastStream::new(self.chain_events.subscribe_new_blocks()).map(|new_block| {
let new_block = new_block.expect("new block subscription never ends; qed");
Header::from_primitive_with_hash(new_block.as_ref().clone())
})
}
/// Returns a stream that yields all logs that match the given filter.
fn into_log_stream(self, filter: FilteredParams) -> impl Stream<Item = Log> {
UnboundedReceiverStream::new(self.chain_events.subscribe_new_blocks())
BroadcastStream::new(self.chain_events.subscribe_new_blocks())
.filter_map(move |new_block| {
let block_id: BlockId = new_block.hash.into();
let Some(new_block) = new_block.ok() else { return futures::future::ready(None); };
let block_id = new_block.hash.into();
let txs = self.client.transactions_by_block(block_id).ok().flatten();
let receipts = self.client.receipts_by_block(block_id).ok().flatten();
match (txs, receipts) {