mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-08 03:01:12 -04:00
feat(rpc): add receipts caching (#2038)
This commit is contained in:
@@ -2,7 +2,7 @@
|
||||
|
||||
use futures::StreamExt;
|
||||
use reth_interfaces::{provider::ProviderError, Result};
|
||||
use reth_primitives::{Block, H256};
|
||||
use reth_primitives::{Block, Receipt, H256};
|
||||
use reth_provider::{BlockProvider, EvmEnvProvider, StateProviderFactory};
|
||||
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
|
||||
use revm::primitives::{BlockEnv, CfgEnv};
|
||||
@@ -24,11 +24,16 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
/// The type that can send the response to a requested [Block]
|
||||
type BlockResponseSender = oneshot::Sender<Result<Option<Block>>>;
|
||||
|
||||
/// The type that can send the response to the requested receipts of a block.
|
||||
type ReceiptsResponseSender = oneshot::Sender<Result<Option<Vec<Receipt>>>>;
|
||||
|
||||
/// The type that can send the response to a requested env
|
||||
type EnvResponseSender = oneshot::Sender<Result<(CfgEnv, BlockEnv)>>;
|
||||
|
||||
type BlockLruCache<L> = MultiConsumerLruCache<H256, Block, L, BlockResponseSender>;
|
||||
|
||||
type ReceiptsLruCache<L> = MultiConsumerLruCache<H256, Vec<Receipt>, L, ReceiptsResponseSender>;
|
||||
|
||||
type EnvLruCache<L> = MultiConsumerLruCache<H256, (CfgEnv, BlockEnv), L, EnvResponseSender>;
|
||||
|
||||
/// Settings for the [EthStateCache]
|
||||
@@ -39,6 +44,10 @@ pub struct EthStateCacheConfig {
|
||||
///
|
||||
/// Default is 50MB
|
||||
pub max_block_bytes: usize,
|
||||
/// Max number of bytes for cached receipt data.
|
||||
///
|
||||
/// Default is 50MB
|
||||
pub max_receipt_bytes: usize,
|
||||
/// Max number of bytes for cached env data.
|
||||
///
|
||||
/// Default is 500kb (env configs are very small)
|
||||
@@ -47,7 +56,11 @@ pub struct EthStateCacheConfig {
|
||||
|
||||
impl Default for EthStateCacheConfig {
|
||||
fn default() -> Self {
|
||||
Self { max_block_bytes: 50 * 1024 * 1024, max_env_bytes: 500 * 1024 }
|
||||
Self {
|
||||
max_block_bytes: 50 * 1024 * 1024,
|
||||
max_receipt_bytes: 50 * 1024 * 1024,
|
||||
max_env_bytes: 500 * 1024,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,12 +79,14 @@ impl EthStateCache {
|
||||
client: Client,
|
||||
action_task_spawner: Tasks,
|
||||
max_block_bytes: usize,
|
||||
max_receipt_bytes: usize,
|
||||
max_env_bytes: usize,
|
||||
) -> (Self, EthStateCacheService<Client, Tasks>) {
|
||||
let (to_service, rx) = unbounded_channel();
|
||||
let service = EthStateCacheService {
|
||||
client,
|
||||
full_block_cache: BlockLruCache::with_memory_budget(max_block_bytes),
|
||||
receipts_cache: ReceiptsLruCache::with_memory_budget(max_receipt_bytes),
|
||||
evm_env_cache: EnvLruCache::with_memory_budget(max_env_bytes),
|
||||
action_tx: to_service.clone(),
|
||||
action_rx: UnboundedReceiverStream::new(rx),
|
||||
@@ -105,9 +120,14 @@ impl EthStateCache {
|
||||
Client: StateProviderFactory + BlockProvider + EvmEnvProvider + Clone + Unpin + 'static,
|
||||
Tasks: TaskSpawner + Clone + 'static,
|
||||
{
|
||||
let EthStateCacheConfig { max_block_bytes, max_env_bytes } = config;
|
||||
let (this, service) =
|
||||
Self::create(client, executor.clone(), max_block_bytes, max_env_bytes);
|
||||
let EthStateCacheConfig { max_block_bytes, max_receipt_bytes, max_env_bytes } = config;
|
||||
let (this, service) = Self::create(
|
||||
client,
|
||||
executor.clone(),
|
||||
max_block_bytes,
|
||||
max_receipt_bytes,
|
||||
max_env_bytes,
|
||||
);
|
||||
executor.spawn_critical("eth state cache", Box::pin(service));
|
||||
this
|
||||
}
|
||||
@@ -121,6 +141,15 @@ impl EthStateCache {
|
||||
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
|
||||
}
|
||||
|
||||
/// Requests the [Receipt] for the block hash
|
||||
///
|
||||
/// Returns `None` if the block was not found.
|
||||
pub(crate) async fn get_receipts(&self, block_hash: H256) -> Result<Option<Vec<Receipt>>> {
|
||||
let (response_tx, rx) = oneshot::channel();
|
||||
let _ = self.to_service.send(CacheAction::GetReceipts { block_hash, response_tx });
|
||||
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
|
||||
}
|
||||
|
||||
/// Requests the evm env config for the block hash.
|
||||
///
|
||||
/// Returns an error if the corresponding header (required for populating the envs) was not
|
||||
@@ -152,15 +181,19 @@ pub(crate) struct EthStateCacheService<
|
||||
Client,
|
||||
Tasks,
|
||||
LimitBlocks = ByMemoryUsage,
|
||||
LimitReceipts = ByMemoryUsage,
|
||||
LimitEnvs = ByMemoryUsage,
|
||||
> where
|
||||
LimitBlocks: Limiter<H256, Block>,
|
||||
LimitReceipts: Limiter<H256, Vec<Receipt>>,
|
||||
LimitEnvs: Limiter<H256, (CfgEnv, BlockEnv)>,
|
||||
{
|
||||
/// The type used to lookup data from disk
|
||||
client: Client,
|
||||
/// The LRU cache for full blocks grouped by their hash.
|
||||
full_block_cache: BlockLruCache<LimitBlocks>,
|
||||
/// The LRU cache for full blocks grouped by their hash.
|
||||
receipts_cache: ReceiptsLruCache<LimitReceipts>,
|
||||
/// The LRU cache for revm environments
|
||||
evm_env_cache: EnvLruCache<LimitEnvs>,
|
||||
/// Sender half of the action channel.
|
||||
@@ -208,6 +241,26 @@ where
|
||||
}));
|
||||
}
|
||||
}
|
||||
CacheAction::GetReceipts { block_hash, response_tx } => {
|
||||
// check if block is cached
|
||||
if let Some(receipts) =
|
||||
this.receipts_cache.cache.get(&block_hash).cloned()
|
||||
{
|
||||
let _ = response_tx.send(Ok(Some(receipts)));
|
||||
continue
|
||||
}
|
||||
|
||||
// block is not in the cache, request it if this is the first consumer
|
||||
if this.receipts_cache.queue(block_hash, response_tx) {
|
||||
let client = this.client.clone();
|
||||
let action_tx = this.action_tx.clone();
|
||||
this.action_task_spawner.spawn(Box::pin(async move {
|
||||
let res = client.receipts_by_block(block_hash.into());
|
||||
let _ = action_tx
|
||||
.send(CacheAction::ReceiptsResult { block_hash, res });
|
||||
}));
|
||||
}
|
||||
}
|
||||
CacheAction::GetEnv { block_hash, response_tx } => {
|
||||
// check if env data is cached
|
||||
if let Some(env) = this.evm_env_cache.cache.get(&block_hash).cloned() {
|
||||
@@ -246,6 +299,19 @@ where
|
||||
this.full_block_cache.cache.insert(block_hash, block);
|
||||
}
|
||||
}
|
||||
CacheAction::ReceiptsResult { block_hash, res } => {
|
||||
if let Some(queued) = this.receipts_cache.queued.remove(&block_hash) {
|
||||
// send the response to queued senders
|
||||
for tx in queued {
|
||||
let _ = tx.send(res.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// cache good receipts
|
||||
if let Ok(Some(receipts)) = res {
|
||||
this.receipts_cache.cache.insert(block_hash, receipts);
|
||||
}
|
||||
}
|
||||
CacheAction::EnvResult { block_hash, res } => {
|
||||
let res = *res;
|
||||
if let Some(queued) = this.evm_env_cache.queued.remove(&block_hash) {
|
||||
@@ -316,6 +382,8 @@ where
|
||||
enum CacheAction {
|
||||
GetBlock { block_hash: H256, response_tx: BlockResponseSender },
|
||||
GetEnv { block_hash: H256, response_tx: EnvResponseSender },
|
||||
GetReceipts { block_hash: H256, response_tx: ReceiptsResponseSender },
|
||||
BlockResult { block_hash: H256, res: Result<Option<Block>> },
|
||||
ReceiptsResult { block_hash: H256, res: Result<Option<Vec<Receipt>>> },
|
||||
EnvResult { block_hash: H256, res: Box<Result<(CfgEnv, BlockEnv)>> },
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user