diff --git a/crates/interfaces/src/test_utils/events.rs b/crates/interfaces/src/test_utils/events.rs new file mode 100644 index 0000000000..427737dad2 --- /dev/null +++ b/crates/interfaces/src/test_utils/events.rs @@ -0,0 +1,33 @@ +use crate::events::{ChainEventSubscriptions, NewBlockNotification, NewBlockNotifications}; +use async_trait::async_trait; +use reth_primitives::{Header, H256}; +use std::sync::{Arc, Mutex}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; + +/// A test ChainEventSubscriptions +#[derive(Clone, Default)] +pub struct TestChainEventSubscriptions { + new_blocks_txs: Arc>>>, +} + +impl TestChainEventSubscriptions { + /// Adds new block to the queue that can be consumed with + /// [`TestChainEventSubscriptions::subscribe_new_blocks`] + pub fn add_new_block(&mut self, hash: H256, header: Header) { + let header = Arc::new(header); + self.new_blocks_txs + .lock() + .as_mut() + .unwrap() + .retain(|tx| tx.send(NewBlockNotification { hash, header: header.clone() }).is_ok()) + } +} + +impl ChainEventSubscriptions for TestChainEventSubscriptions { + fn subscribe_new_blocks(&self) -> NewBlockNotifications { + let (new_blocks_tx, new_blocks_rx) = unbounded_channel(); + self.new_blocks_txs.lock().as_mut().unwrap().push(new_blocks_tx); + + new_blocks_rx + } +} diff --git a/crates/interfaces/src/test_utils/mod.rs b/crates/interfaces/src/test_utils/mod.rs index b64acb592a..5fdab195c7 100644 --- a/crates/interfaces/src/test_utils/mod.rs +++ b/crates/interfaces/src/test_utils/mod.rs @@ -1,10 +1,12 @@ #![allow(unused)] mod bodies; +mod events; mod headers; /// Generators for different data structures like block headers, block bodies and ranges of those. pub mod generators; pub use bodies::*; +pub use events::*; pub use headers::*;