fix: return the correct action on drop (#14054)

This commit is contained in:
Matthias Seitz
2025-01-29 10:47:04 +01:00
committed by GitHub
parent a4b408848f
commit dac2085d46

View File

@@ -65,50 +65,6 @@ type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSend
pub struct EthStateCache<B: Block, R> {
to_service: UnboundedSender<CacheAction<B, R>>,
}
/// Drop aware sender struct
#[derive(Debug)]
struct ActionSender<B: Block, R: Send + Sync> {
blockhash: B256,
tx: Option<UnboundedSender<CacheAction<B, R>>>,
}
impl<R: Send + Sync, B: Block> ActionSender<B, R> {
const fn new(blockhash: B256, tx: Option<UnboundedSender<CacheAction<B, R>>>) -> Self {
Self { blockhash, tx }
}
fn send_block(&mut self, block_sender: Result<Option<Arc<RecoveredBlock<B>>>, ProviderError>) {
if let Some(tx) = self.tx.take() {
let _ = tx.send(CacheAction::BlockWithSendersResult {
block_hash: self.blockhash,
res: block_sender,
});
}
}
fn send_receipts(&mut self, receipts: Result<Option<Arc<Vec<R>>>, ProviderError>) {
if let Some(tx) = self.tx.take() {
let _ =
tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts });
}
}
fn send_header(&mut self, header: Result<<B as Block>::Header, ProviderError>) {
if let Some(tx) = self.tx.take() {
let _ = tx.send(CacheAction::HeaderResult {
block_hash: self.blockhash,
res: Box::new(header),
});
}
}
}
impl<R: Send + Sync, B: Block> Drop for ActionSender<B, R> {
fn drop(&mut self) {
if let Some(tx) = self.tx.take() {
let _ = tx.send(CacheAction::BlockWithSendersResult {
block_hash: self.blockhash,
res: Err(ProviderError::CacheServiceUnavailable),
});
}
}
}
impl<B: Block, R> Clone for EthStateCache<B, R> {
fn clone(&self) -> Self {
@@ -406,7 +362,7 @@ where
let action_tx = this.action_tx.clone();
let rate_limiter = this.rate_limiter.clone();
let mut action_sender =
ActionSender::new(block_hash, Some(action_tx));
ActionSender::new(CacheKind::Block, block_hash, action_tx);
this.action_task_spawner.spawn_blocking(Box::pin(async move {
// Acquire permit
let _permit = rate_limiter.acquire().await;
@@ -435,7 +391,7 @@ where
let action_tx = this.action_tx.clone();
let rate_limiter = this.rate_limiter.clone();
let mut action_sender =
ActionSender::new(block_hash, Some(action_tx));
ActionSender::new(CacheKind::Receipt, block_hash, action_tx);
this.action_task_spawner.spawn_blocking(Box::pin(async move {
// Acquire permit
let _permit = rate_limiter.acquire().await;
@@ -467,7 +423,7 @@ where
let action_tx = this.action_tx.clone();
let rate_limiter = this.rate_limiter.clone();
let mut action_sender =
ActionSender::new(block_hash, Some(action_tx));
ActionSender::new(CacheKind::Header, block_hash, action_tx);
this.action_task_spawner.spawn_blocking(Box::pin(async move {
// Acquire permit
let _permit = rate_limiter.acquire().await;
@@ -580,6 +536,77 @@ impl<B: Block, R: Clone> ChainChange<B, R> {
}
}
/// Identifier for the caches.
#[derive(Copy, Clone, Debug)]
enum CacheKind {
Block,
Receipt,
Header,
}
/// Drop aware sender struct that ensures a response is always emitted even if the db task panics
/// before a result could be sent.
///
/// This type wraps a sender and in case the sender is still present on drop emit an error response.
#[derive(Debug)]
struct ActionSender<B: Block, R: Send + Sync> {
kind: CacheKind,
blockhash: B256,
tx: Option<UnboundedSender<CacheAction<B, R>>>,
}
impl<R: Send + Sync, B: Block> ActionSender<B, R> {
const fn new(kind: CacheKind, blockhash: B256, tx: UnboundedSender<CacheAction<B, R>>) -> Self {
Self { kind, blockhash, tx: Some(tx) }
}
fn send_block(&mut self, block_sender: Result<Option<Arc<RecoveredBlock<B>>>, ProviderError>) {
if let Some(tx) = self.tx.take() {
let _ = tx.send(CacheAction::BlockWithSendersResult {
block_hash: self.blockhash,
res: block_sender,
});
}
}
fn send_receipts(&mut self, receipts: Result<Option<Arc<Vec<R>>>, ProviderError>) {
if let Some(tx) = self.tx.take() {
let _ =
tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts });
}
}
fn send_header(&mut self, header: Result<<B as Block>::Header, ProviderError>) {
if let Some(tx) = self.tx.take() {
let _ = tx.send(CacheAction::HeaderResult {
block_hash: self.blockhash,
res: Box::new(header),
});
}
}
}
impl<R: Send + Sync, B: Block> Drop for ActionSender<B, R> {
fn drop(&mut self) {
if let Some(tx) = self.tx.take() {
let msg = match self.kind {
CacheKind::Block => CacheAction::BlockWithSendersResult {
block_hash: self.blockhash,
res: Err(ProviderError::CacheServiceUnavailable),
},
CacheKind::Receipt => CacheAction::ReceiptsResult {
block_hash: self.blockhash,
res: Err(ProviderError::CacheServiceUnavailable),
},
CacheKind::Header => CacheAction::HeaderResult {
block_hash: self.blockhash,
res: Box::new(Err(ProviderError::CacheServiceUnavailable)),
},
};
let _ = tx.send(msg);
}
}
}
/// Awaits for new chain events and directly inserts them into the cache so they're available
/// immediately before they need to be fetched from disk.
///