diff --git a/crates/rpc/rpc-eth-types/src/cache/mod.rs b/crates/rpc/rpc-eth-types/src/cache/mod.rs index d4db9d9c02..de69f5ae75 100644 --- a/crates/rpc/rpc-eth-types/src/cache/mod.rs +++ b/crates/rpc/rpc-eth-types/src/cache/mod.rs @@ -65,50 +65,6 @@ type HeaderLruCache = MultiConsumerLruCache { to_service: UnboundedSender>, } -/// Drop aware sender struct -#[derive(Debug)] -struct ActionSender { - blockhash: B256, - tx: Option>>, -} - -impl ActionSender { - const fn new(blockhash: B256, tx: Option>>) -> Self { - Self { blockhash, tx } - } - fn send_block(&mut self, block_sender: Result>>, ProviderError>) { - if let Some(tx) = self.tx.take() { - let _ = tx.send(CacheAction::BlockWithSendersResult { - block_hash: self.blockhash, - res: block_sender, - }); - } - } - fn send_receipts(&mut self, receipts: Result>>, ProviderError>) { - if let Some(tx) = self.tx.take() { - let _ = - tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts }); - } - } - fn send_header(&mut self, header: Result<::Header, ProviderError>) { - if let Some(tx) = self.tx.take() { - let _ = tx.send(CacheAction::HeaderResult { - block_hash: self.blockhash, - res: Box::new(header), - }); - } - } -} -impl Drop for ActionSender { - fn drop(&mut self) { - if let Some(tx) = self.tx.take() { - let _ = tx.send(CacheAction::BlockWithSendersResult { - block_hash: self.blockhash, - res: Err(ProviderError::CacheServiceUnavailable), - }); - } - } -} impl Clone for EthStateCache { fn clone(&self) -> Self { @@ -406,7 +362,7 @@ where let action_tx = this.action_tx.clone(); let rate_limiter = this.rate_limiter.clone(); let mut action_sender = - ActionSender::new(block_hash, Some(action_tx)); + ActionSender::new(CacheKind::Block, block_hash, action_tx); this.action_task_spawner.spawn_blocking(Box::pin(async move { // Acquire permit let _permit = rate_limiter.acquire().await; @@ -435,7 +391,7 @@ where let action_tx = this.action_tx.clone(); let rate_limiter = this.rate_limiter.clone(); let mut action_sender = - ActionSender::new(block_hash, Some(action_tx)); + ActionSender::new(CacheKind::Receipt, block_hash, action_tx); this.action_task_spawner.spawn_blocking(Box::pin(async move { // Acquire permit let _permit = rate_limiter.acquire().await; @@ -467,7 +423,7 @@ where let action_tx = this.action_tx.clone(); let rate_limiter = this.rate_limiter.clone(); let mut action_sender = - ActionSender::new(block_hash, Some(action_tx)); + ActionSender::new(CacheKind::Header, block_hash, action_tx); this.action_task_spawner.spawn_blocking(Box::pin(async move { // Acquire permit let _permit = rate_limiter.acquire().await; @@ -580,6 +536,77 @@ impl ChainChange { } } +/// Identifier for the caches. +#[derive(Copy, Clone, Debug)] +enum CacheKind { + Block, + Receipt, + Header, +} + +/// Drop aware sender struct that ensures a response is always emitted even if the db task panics +/// before a result could be sent. +/// +/// This type wraps a sender and in case the sender is still present on drop emit an error response. +#[derive(Debug)] +struct ActionSender { + kind: CacheKind, + blockhash: B256, + tx: Option>>, +} + +impl ActionSender { + const fn new(kind: CacheKind, blockhash: B256, tx: UnboundedSender>) -> Self { + Self { kind, blockhash, tx: Some(tx) } + } + + fn send_block(&mut self, block_sender: Result>>, ProviderError>) { + if let Some(tx) = self.tx.take() { + let _ = tx.send(CacheAction::BlockWithSendersResult { + block_hash: self.blockhash, + res: block_sender, + }); + } + } + + fn send_receipts(&mut self, receipts: Result>>, ProviderError>) { + if let Some(tx) = self.tx.take() { + let _ = + tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts }); + } + } + + fn send_header(&mut self, header: Result<::Header, ProviderError>) { + if let Some(tx) = self.tx.take() { + let _ = tx.send(CacheAction::HeaderResult { + block_hash: self.blockhash, + res: Box::new(header), + }); + } + } +} +impl Drop for ActionSender { + fn drop(&mut self) { + if let Some(tx) = self.tx.take() { + let msg = match self.kind { + CacheKind::Block => CacheAction::BlockWithSendersResult { + block_hash: self.blockhash, + res: Err(ProviderError::CacheServiceUnavailable), + }, + CacheKind::Receipt => CacheAction::ReceiptsResult { + block_hash: self.blockhash, + res: Err(ProviderError::CacheServiceUnavailable), + }, + CacheKind::Header => CacheAction::HeaderResult { + block_hash: self.blockhash, + res: Box::new(Err(ProviderError::CacheServiceUnavailable)), + }, + }; + let _ = tx.send(msg); + } + } +} + /// Awaits for new chain events and directly inserts them into the cache so they're available /// immediately before they need to be fetched from disk. ///