perf(rpc): use caching service for fetching logs and transactions (#2054)

This commit is contained in:
Matthias Seitz
2023-03-31 16:26:23 +02:00
committed by GitHub
parent ff6eb5a499
commit 4a8ed999c0
3 changed files with 94 additions and 21 deletions

View File

@@ -661,6 +661,7 @@ where
self.pool.clone(),
self.events.clone(),
self.network.clone(),
cache.clone(),
);
let eth = EthHandlers { api, cache, filter, pubsub };

View File

@@ -1,8 +1,8 @@
//! Async caching support for eth RPC
use futures::StreamExt;
use futures::{future::Either, StreamExt};
use reth_interfaces::{provider::ProviderError, Result};
use reth_primitives::{Block, Receipt, H256};
use reth_primitives::{Block, Receipt, TransactionSigned, H256};
use reth_provider::{BlockProvider, EvmEnvProvider, StateProviderFactory};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use revm::primitives::{BlockEnv, CfgEnv};
@@ -24,13 +24,21 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
/// The type that can send the response to a requested [Block]
type BlockResponseSender = oneshot::Sender<Result<Option<Block>>>;
/// The type that can send the response to a requested [Block]
type BlockTransactionsResponseSender = oneshot::Sender<Result<Option<Vec<TransactionSigned>>>>;
/// The type that can send the response to the requested receipts of a block.
type ReceiptsResponseSender = oneshot::Sender<Result<Option<Vec<Receipt>>>>;
/// The type that can send the response to a requested env
type EnvResponseSender = oneshot::Sender<Result<(CfgEnv, BlockEnv)>>;
type BlockLruCache<L> = MultiConsumerLruCache<H256, Block, L, BlockResponseSender>;
type BlockLruCache<L> = MultiConsumerLruCache<
H256,
Block,
L,
Either<BlockResponseSender, BlockTransactionsResponseSender>,
>;
type ReceiptsLruCache<L> = MultiConsumerLruCache<H256, Vec<Receipt>, L, ReceiptsResponseSender>;
@@ -141,6 +149,18 @@ impl EthStateCache {
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
}
/// Requests the transactions of the [Block]
///
/// Returns `None` if the block does not exist.
pub(crate) async fn get_block_transactions(
&self,
block_hash: H256,
) -> Result<Option<Vec<TransactionSigned>>> {
let (response_tx, rx) = oneshot::channel();
let _ = self.to_service.send(CacheAction::GetBlockTransactions { block_hash, response_tx });
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
}
/// Requests the [Receipt] for the block hash
///
/// Returns `None` if the block was not found.
@@ -231,7 +251,25 @@ where
}
// block is not in the cache, request it if this is the first consumer
if this.full_block_cache.queue(block_hash, response_tx) {
if this.full_block_cache.queue(block_hash, Either::Left(response_tx)) {
let client = this.client.clone();
let action_tx = this.action_tx.clone();
this.action_task_spawner.spawn(Box::pin(async move {
let res = client.block_by_hash(block_hash);
let _ = action_tx
.send(CacheAction::BlockResult { block_hash, res });
}));
}
}
CacheAction::GetBlockTransactions { block_hash, response_tx } => {
// check if block is cached
if let Some(block) = this.full_block_cache.cache.get(&block_hash) {
let _ = response_tx.send(Ok(Some(block.body.clone())));
continue
}
// block is not in the cache, request it if this is the first consumer
if this.full_block_cache.queue(block_hash, Either::Right(response_tx)) {
let client = this.client.clone();
let action_tx = this.action_tx.clone();
this.action_task_spawner.spawn(Box::pin(async move {
@@ -290,7 +328,16 @@ where
if let Some(queued) = this.full_block_cache.queued.remove(&block_hash) {
// send the response to queued senders
for tx in queued {
let _ = tx.send(res.clone());
match tx {
Either::Left(block_tx) => {
let _ = block_tx.send(res.clone());
}
Either::Right(transaction_tx) => {
let _ = transaction_tx.send(res.clone().map(
|maybe_block| maybe_block.map(|block| block.body),
));
}
}
}
}
@@ -381,6 +428,7 @@ where
/// All message variants sent through the channel
enum CacheAction {
GetBlock { block_hash: H256, response_tx: BlockResponseSender },
GetBlockTransactions { block_hash: H256, response_tx: BlockTransactionsResponseSender },
GetEnv { block_hash: H256, response_tx: EnvResponseSender },
GetReceipts { block_hash: H256, response_tx: ReceiptsResponseSender },
BlockResult { block_hash: H256, res: Result<Option<Block>> },

View File

@@ -1,11 +1,11 @@
//! `eth_` PubSub RPC handler implementation
use crate::eth::logs_utils;
use crate::eth::{cache::EthStateCache, logs_utils};
use futures::StreamExt;
use jsonrpsee::{types::SubscriptionResult, SubscriptionSink};
use reth_interfaces::events::ChainEventSubscriptions;
use reth_interfaces::events::{ChainEventSubscriptions, NewBlockNotification};
use reth_network_api::NetworkInfo;
use reth_primitives::{filter::FilteredParams, TxHash};
use reth_primitives::{filter::FilteredParams, Receipt, TransactionSigned, TxHash};
use reth_provider::{BlockProvider, EvmEnvProvider};
use reth_rpc_api::EthPubSubApiServer;
use reth_rpc_types::{
@@ -39,8 +39,21 @@ impl<Client, Pool, Events, Network> EthPubSub<Client, Pool, Events, Network> {
/// Creates a new, shareable instance.
///
/// Subscription tasks are spawned via [tokio::task::spawn]
pub fn new(client: Client, pool: Pool, chain_events: Events, network: Network) -> Self {
Self::with_spawner(client, pool, chain_events, network, Box::<TokioTaskExecutor>::default())
pub fn new(
client: Client,
pool: Pool,
chain_events: Events,
network: Network,
eth_cache: EthStateCache,
) -> Self {
Self::with_spawner(
client,
pool,
chain_events,
network,
eth_cache,
Box::<TokioTaskExecutor>::default(),
)
}
/// Creates a new, shareable instance.
@@ -49,9 +62,10 @@ impl<Client, Pool, Events, Network> EthPubSub<Client, Pool, Events, Network> {
pool: Pool,
chain_events: Events,
network: Network,
eth_cache: EthStateCache,
subscription_task_spawner: Box<dyn TaskSpawner>,
) -> Self {
let inner = EthPubSubInner { client, pool, chain_events, network };
let inner = EthPubSubInner { client, pool, chain_events, network, eth_cache };
Self { inner, subscription_task_spawner }
}
}
@@ -163,6 +177,8 @@ struct EthPubSubInner<Client, Pool, Events, Network> {
chain_events: Events,
/// The network.
network: Network,
/// The async cache frontend for eth related data
eth_cache: EthStateCache,
}
// == impl EthPubSubInner ===
@@ -203,6 +219,7 @@ where
Client: BlockProvider + EvmEnvProvider + 'static,
Events: ChainEventSubscriptions + 'static,
Network: NetworkInfo + 'static,
Pool: 'static,
{
/// Returns a stream that yields all new RPC blocks.
fn into_new_headers_stream(self) -> impl Stream<Item = Header> {
@@ -216,16 +233,7 @@ where
fn into_log_stream(self, filter: FilteredParams) -> impl Stream<Item = Log> {
BroadcastStream::new(self.chain_events.subscribe_new_blocks())
.filter_map(move |new_block| {
let Some(new_block) = new_block.ok() else { return futures::future::ready(None); };
let block_id = new_block.hash.into();
let txs = self.client.transactions_by_block(block_id).ok().flatten();
let receipts = self.client.receipts_by_block(block_id).ok().flatten();
match (txs, receipts) {
(Some(txs), Some(receipts)) => {
futures::future::ready(Some((new_block, txs, receipts)))
}
_ => futures::future::ready(None),
}
Box::pin(get_block_receipts(self.eth_cache.clone(), new_block.ok()))
})
.flat_map(move |(new_block, transactions, receipts)| {
let block_hash = new_block.hash;
@@ -240,3 +248,19 @@ where
})
}
}
/// Helper function for getting block receipts and transactions
async fn get_block_receipts(
eth_cache: EthStateCache,
new_block: Option<NewBlockNotification>,
) -> Option<(NewBlockNotification, Vec<TransactionSigned>, Vec<Receipt>)> {
let Some(new_block) = new_block else { return None; };
let (txs, receipts) = futures::join!(
eth_cache.get_block_transactions(new_block.hash),
eth_cache.get_receipts(new_block.hash)
);
match (txs.ok().flatten(), receipts.ok().flatten()) {
(Some(txs), Some(receipts)) => Some((new_block, txs, receipts)),
_ => None,
}
}