Add drop aware sender new type (fixes #13242) (#13495)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
abdaze
2024-12-24 15:43:20 +05:30
committed by GitHub
parent 934fd1f7f0
commit f67625fd67
2 changed files with 57 additions and 11 deletions

View File

@@ -65,6 +65,53 @@ type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSend
pub struct EthStateCache<B: Block, R> {
to_service: UnboundedSender<CacheAction<B, R>>,
}
/// Drop aware sender struct
#[derive(Debug)]
struct ActionSender<B: Block, R: Send + Sync> {
blockhash: B256,
tx: Option<UnboundedSender<CacheAction<B, R>>>,
}
impl<R: Send + Sync, B: Block> ActionSender<B, R> {
const fn new(blockhash: B256, tx: Option<UnboundedSender<CacheAction<B, R>>>) -> Self {
Self { blockhash, tx }
}
fn send_block(
&mut self,
block_sender: Result<Option<Arc<SealedBlockWithSenders<B>>>, ProviderError>,
) {
if let Some(tx) = self.tx.take() {
let _ = tx.send(CacheAction::BlockWithSendersResult {
block_hash: self.blockhash,
res: block_sender,
});
}
}
fn send_receipts(&mut self, receipts: Result<Option<Arc<Vec<R>>>, ProviderError>) {
if let Some(tx) = self.tx.take() {
let _ =
tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts });
}
}
fn send_header(&mut self, header: Result<<B as Block>::Header, ProviderError>) {
if let Some(tx) = self.tx.take() {
let _ = tx.send(CacheAction::HeaderResult {
block_hash: self.blockhash,
res: Box::new(header),
});
}
}
}
impl<R: Send + Sync, B: Block> Drop for ActionSender<B, R> {
fn drop(&mut self) {
if let Some(tx) = self.tx.take() {
let _ = tx.send(CacheAction::BlockWithSendersResult {
block_hash: self.blockhash,
res: Err(ProviderError::CacheServiceUnavailable),
});
}
}
}
impl<B: Block, R> Clone for EthStateCache<B, R> {
fn clone(&self) -> Self {
@@ -359,6 +406,8 @@ where
let provider = this.provider.clone();
let action_tx = this.action_tx.clone();
let rate_limiter = this.rate_limiter.clone();
let mut action_sender =
ActionSender::new(block_hash, Some(action_tx));
this.action_task_spawner.spawn_blocking(Box::pin(async move {
// Acquire permit
let _permit = rate_limiter.acquire().await;
@@ -370,10 +419,7 @@ where
TransactionVariant::WithHash,
)
.map(|maybe_block| maybe_block.map(Arc::new));
let _ = action_tx.send(CacheAction::BlockWithSendersResult {
block_hash,
res: block_sender,
});
action_sender.send_block(block_sender);
}));
}
}
@@ -389,6 +435,8 @@ where
let provider = this.provider.clone();
let action_tx = this.action_tx.clone();
let rate_limiter = this.rate_limiter.clone();
let mut action_sender =
ActionSender::new(block_hash, Some(action_tx));
this.action_task_spawner.spawn_blocking(Box::pin(async move {
// Acquire permit
let _permit = rate_limiter.acquire().await;
@@ -396,8 +444,7 @@ where
.receipts_by_block(block_hash.into())
.map(|maybe_receipts| maybe_receipts.map(Arc::new));
let _ = action_tx
.send(CacheAction::ReceiptsResult { block_hash, res });
action_sender.send_receipts(res);
}));
}
}
@@ -414,6 +461,8 @@ where
let provider = this.provider.clone();
let action_tx = this.action_tx.clone();
let rate_limiter = this.rate_limiter.clone();
let mut action_sender =
ActionSender::new(block_hash, Some(action_tx));
this.action_task_spawner.spawn_blocking(Box::pin(async move {
// Acquire permit
let _permit = rate_limiter.acquire().await;
@@ -422,10 +471,7 @@ where
ProviderError::HeaderNotFound(block_hash.into())
})
});
let _ = action_tx.send(CacheAction::HeaderResult {
block_hash,
res: Box::new(header),
});
action_sender.send_header(header);
}));
}
}

View File

@@ -245,7 +245,7 @@ pub async fn fee_history_cache_new_blocks_task<St, Provider, N>(
event = events.next() => {
let Some(event) = event else {
// the stream ended, we are done
break;
break
};
let committed = event.committed();