Mirror of https://github.com/paradigmxyz/reth.git (synced 2026-01-09 23:38:10 -05:00)
perf(trie): Cache overlays in the OverlayStateProviderFactory (#19752)
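This change replaces the direct overlay computation in `database_provider_ro` with a cached lookup keyed by the db tip block: a read lock serves the common hit path, and on a miss the write lock re-checks the map via the `Entry` API so concurrent callers compute the overlay at most once per tip, with a counter recording misses. Below is a minimal standalone sketch of that caching pattern under simplified assumed types; `OverlayCache`, `get_or_compute`, and the `description` field are illustrative stand-ins, not the reth API.

// Sketch of the db_tip-keyed overlay cache pattern (hypothetical names).
use std::collections::{hash_map::Entry, HashMap};
use std::sync::RwLock;

type BlockNumber = u64;

#[derive(Clone, Debug)]
struct Overlay {
    // Stand-in for the Arc'd trie updates / hashed post state held by the real type.
    description: String,
}

struct OverlayCache {
    inner: RwLock<HashMap<BlockNumber, Overlay>>,
}

impl OverlayCache {
    fn new() -> Self {
        Self { inner: RwLock::new(HashMap::new()) }
    }

    /// Returns the overlay for `db_tip`, computing and inserting it on a miss.
    fn get_or_compute(
        &self,
        db_tip: BlockNumber,
        compute: impl FnOnce(BlockNumber) -> Overlay,
    ) -> Overlay {
        // Fast path: shared read lock, return a clone on a hit.
        if let Some(overlay) = self.inner.read().expect("poisoned lock").get(&db_tip) {
            return overlay.clone();
        }

        // Slow path: take the write lock and check again, since another thread
        // may have populated the entry between our read and write locks.
        match self.inner.write().expect("poisoned lock").entry(db_tip) {
            Entry::Occupied(entry) => entry.get().clone(),
            Entry::Vacant(entry) => {
                let overlay = compute(db_tip);
                entry.insert(overlay.clone());
                overlay
            }
        }
    }
}

fn main() {
    let cache = OverlayCache::new();
    // First call misses and computes; second call for the same tip is a hit.
    let a = cache.get_or_compute(100, |tip| Overlay { description: format!("overlay@{tip}") });
    let b = cache.get_or_compute(100, |_| unreachable!("should be served from cache"));
    assert_eq!(a.description, b.description);
    println!("{}", b.description);
}

In the actual diff the cached value bundles `Arc<TrieUpdatesSorted>` and `Arc<HashedPostStateSorted>`, and the factory skips the cache entirely when no anchor `block_hash` is configured.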
@@ -1,5 +1,5 @@
 use alloy_primitives::{BlockNumber, B256};
-use metrics::Histogram;
+use metrics::{Counter, Histogram};
 use reth_db_api::DatabaseError;
 use reth_errors::{ProviderError, ProviderResult};
 use reth_metrics::Metrics;
@@ -19,7 +19,8 @@ use reth_trie_db::{
     DatabaseHashedCursorFactory, DatabaseHashedPostState, DatabaseTrieCursorFactory,
 };
 use std::{
-    sync::Arc,
+    collections::{hash_map::Entry, HashMap},
+    sync::{Arc, RwLock},
     time::{Duration, Instant},
 };
 use tracing::{debug, debug_span, instrument};
@@ -38,6 +39,17 @@ pub(crate) struct OverlayStateProviderMetrics {
     trie_updates_size: Histogram,
     /// Size of hashed state (number of entries)
     hashed_state_size: Histogram,
+    /// Overall duration of the [`OverlayStateProviderFactory::database_provider_ro`] call
+    database_provider_ro_duration: Histogram,
+    /// Number of cache misses when fetching [`Overlay`]s from the overlay cache.
+    overlay_cache_misses: Counter,
 }

+/// Contains all fields required to initialize an [`OverlayStateProvider`].
+#[derive(Debug, Clone)]
+struct Overlay {
+    trie_updates: Arc<TrieUpdatesSorted>,
+    hashed_post_state: Arc<HashedPostStateSorted>,
+}
+
 /// Factory for creating overlay state providers with optional reverts and overlays.
@@ -56,6 +68,9 @@ pub struct OverlayStateProviderFactory<F> {
     hashed_state_overlay: Option<Arc<HashedPostStateSorted>>,
     /// Metrics for tracking provider operations
     metrics: OverlayStateProviderMetrics,
+    /// A cache which maps `db_tip -> Overlay`. If the db tip changes during usage of the factory
+    /// then a new entry will get added to this, but in most cases only one entry is present.
+    overlay_cache: Arc<RwLock<HashMap<BlockNumber, Overlay>>>,
 }

 impl<F> OverlayStateProviderFactory<F> {
@@ -67,6 +82,7 @@ impl<F> OverlayStateProviderFactory<F> {
             trie_overlay: None,
             hashed_state_overlay: None,
             metrics: OverlayStateProviderMetrics::default(),
+            overlay_cache: Default::default(),
         }
     }

@@ -103,7 +119,10 @@ where
     F::Provider: TrieReader + StageCheckpointReader + PruneCheckpointReader + BlockNumReader,
 {
     /// Returns the block number for [`Self`]'s `block_hash` field, if any.
-    fn get_block_number(&self, provider: &F::Provider) -> ProviderResult<Option<BlockNumber>> {
+    fn get_requested_block_number(
+        &self,
+        provider: &F::Provider,
+    ) -> ProviderResult<Option<BlockNumber>> {
         if let Some(block_hash) = self.block_hash {
             Ok(Some(
                 provider
@@ -115,6 +134,16 @@ where
         }
     }

+    /// Returns the block which is at the tip of the DB, i.e. the block which the state tables of
+    /// the DB are currently synced to.
+    fn get_db_tip_block_number(&self, provider: &F::Provider) -> ProviderResult<BlockNumber> {
+        provider
+            .get_stage_checkpoint(StageId::MerkleChangeSets)?
+            .as_ref()
+            .map(|chk| chk.block_number)
+            .ok_or_else(|| ProviderError::InsufficientChangesets { requested: 0, available: 0..=0 })
+    }
+
     /// Returns whether or not it is required to collect reverts, and validates that there are
     /// sufficient changesets to revert to the requested block number if so.
     ///
@@ -124,27 +153,19 @@ where
     fn reverts_required(
         &self,
         provider: &F::Provider,
+        db_tip_block: BlockNumber,
         requested_block: BlockNumber,
     ) -> ProviderResult<bool> {
-        // Get the MerkleChangeSets stage and prune checkpoints.
-        let stage_checkpoint = provider.get_stage_checkpoint(StageId::MerkleChangeSets)?;
-        let prune_checkpoint = provider.get_prune_checkpoint(PruneSegment::MerkleChangeSets)?;
-
-        // Get the upper bound from stage checkpoint
-        let upper_bound =
-            stage_checkpoint.as_ref().map(|chk| chk.block_number).ok_or_else(|| {
-                ProviderError::InsufficientChangesets {
-                    requested: requested_block,
-                    available: 0..=0,
-                }
-            })?;
-
-        // If the requested block is the DB tip (determined by the MerkleChangeSets stage
-        // checkpoint) then there won't be any reverts necessary, and we can simply return Ok.
-        if upper_bound == requested_block {
+        // If the requested block is the DB tip then there won't be any reverts necessary, and we
+        // can simply return Ok.
+        if db_tip_block == requested_block {
             return Ok(false)
         }

+        // Get the MerkleChangeSets prune checkpoints, which will be used to determine the lower
+        // bound.
+        let prune_checkpoint = provider.get_prune_checkpoint(PruneSegment::MerkleChangeSets)?;
+
         // Extract the lower bound from prune checkpoint if available.
         //
         // If not available we assume pruning has never ran and so there is no lower bound. This
@@ -158,7 +179,7 @@ where
             .map(|block_number| block_number + 1)
             .unwrap_or_default();

-        let available_range = lower_bound..=upper_bound;
+        let available_range = lower_bound..=db_tip_block;

         // Check if the requested block is within the available range
         if !available_range.contains(&requested_block) {
@@ -170,29 +191,19 @@ where

         Ok(true)
     }
-}

-impl<F> DatabaseProviderROFactory for OverlayStateProviderFactory<F>
-where
-    F: DatabaseProviderFactory,
-    F::Provider: TrieReader + StageCheckpointReader + PruneCheckpointReader + BlockNumReader,
-{
-    type Provider = OverlayStateProvider<F::Provider>;
-
-    /// Create a read-only [`OverlayStateProvider`].
-    #[instrument(level = "debug", target = "providers::state::overlay", skip_all)]
-    fn database_provider_ro(&self) -> ProviderResult<OverlayStateProvider<F::Provider>> {
-        // Get a read-only provider
-        let provider = {
-            let _guard =
-                debug_span!(target: "providers::state::overlay", "Creating db provider").entered();
-
-            let start = Instant::now();
-            let res = self.factory.database_provider_ro()?;
-            self.metrics.create_provider_duration.record(start.elapsed());
-            res
-        };
-
+    /// Calculates a new [`Overlay`] given a transaction and the current db tip.
+    #[instrument(
+        level = "debug",
+        target = "providers::state::overlay",
+        skip_all,
+        fields(db_tip_block)
+    )]
+    fn calculate_overlay(
+        &self,
+        provider: &F::Provider,
+        db_tip_block: BlockNumber,
+    ) -> ProviderResult<Overlay> {
         // Set up variables we'll use for recording metrics. There's two different code-paths here,
         // and we want to make sure both record metrics, so we do metrics recording after.
         let retrieve_trie_reverts_duration;
@@ -201,9 +212,9 @@ where
         let hashed_state_updates_total_len;

        // If block_hash is provided, collect reverts
-        let (trie_updates, hashed_state) = if let Some(from_block) =
-            self.get_block_number(&provider)? &&
-            self.reverts_required(&provider, from_block)?
+        let (trie_updates, hashed_post_state) = if let Some(from_block) =
+            self.get_requested_block_number(provider)? &&
+            self.reverts_required(provider, db_tip_block, from_block)?
         {
             // Collect trie reverts
             let mut trie_reverts = {
@@ -295,7 +306,89 @@ where
         self.metrics.trie_updates_size.record(trie_updates_total_len as f64);
         self.metrics.hashed_state_size.record(hashed_state_updates_total_len as f64);

-        Ok(OverlayStateProvider::new(provider, trie_updates, hashed_state))
+        Ok(Overlay { trie_updates, hashed_post_state })
     }
+
+    /// Fetches an [`Overlay`] from the cache based on the current db tip block. If there is no
+    /// cached value then this calculates the [`Overlay`] and populates the cache.
+    #[instrument(level = "debug", target = "providers::state::overlay", skip_all)]
+    fn get_overlay(&self, provider: &F::Provider) -> ProviderResult<Overlay> {
+        // If we have no anchor block configured then we will never need to get trie reverts, just
+        // return the in-memory overlay.
+        if self.block_hash.is_none() {
+            let trie_updates =
+                self.trie_overlay.clone().unwrap_or_else(|| Arc::new(TrieUpdatesSorted::default()));
+            let hashed_post_state = self
+                .hashed_state_overlay
+                .clone()
+                .unwrap_or_else(|| Arc::new(HashedPostStateSorted::default()));
+            return Ok(Overlay { trie_updates, hashed_post_state })
+        }
+
+        let db_tip_block = self.get_db_tip_block_number(provider)?;
+
+        // If the overlay is present in the cache then return it directly.
+        if let Some(overlay) =
+            self.overlay_cache.as_ref().read().expect("poisoned mutex").get(&db_tip_block)
+        {
+            return Ok(overlay.clone());
+        }
+
+        // If the overlay is not present then we need to calculate a new one. We grab a write lock,
+        // and then check the cache again in case some other thread populated the cache since we
+        // checked with the read-lock. If still not present we calculate and populate.
+        let mut cache_miss = false;
+        let overlay = match self
+            .overlay_cache
+            .as_ref()
+            .write()
+            .expect("poisoned mutex")
+            .entry(db_tip_block)
+        {
+            Entry::Occupied(entry) => entry.get().clone(),
+            Entry::Vacant(entry) => {
+                cache_miss = true;
+                let overlay = self.calculate_overlay(provider, db_tip_block)?;
+                entry.insert(overlay.clone());
+                overlay
+            }
+        };
+
+        if cache_miss {
+            self.metrics.overlay_cache_misses.increment(1);
+        }
+
+        Ok(overlay)
+    }
+}
+
+impl<F> DatabaseProviderROFactory for OverlayStateProviderFactory<F>
+where
+    F: DatabaseProviderFactory,
+    F::Provider: TrieReader + StageCheckpointReader + PruneCheckpointReader + BlockNumReader,
+{
+    type Provider = OverlayStateProvider<F::Provider>;
+
+    /// Create a read-only [`OverlayStateProvider`].
+    #[instrument(level = "debug", target = "providers::state::overlay", skip_all)]
+    fn database_provider_ro(&self) -> ProviderResult<OverlayStateProvider<F::Provider>> {
+        let overall_start = Instant::now();
+
+        // Get a read-only provider
+        let provider = {
+            let _guard =
+                debug_span!(target: "providers::state::overlay", "Creating db provider").entered();
+
+            let start = Instant::now();
+            let res = self.factory.database_provider_ro()?;
+            self.metrics.create_provider_duration.record(start.elapsed());
+            res
+        };
+
+        let Overlay { trie_updates, hashed_post_state } = self.get_overlay(&provider)?;
+
+        self.metrics.database_provider_ro_duration.record(overall_start.elapsed());
+        Ok(OverlayStateProvider::new(provider, trie_updates, hashed_post_state))
+    }
 }
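For reference, the availability check that `reverts_required` performs after this change can be summarized as: no reverts are needed when the requested block equals the db tip; otherwise the requested block must fall inside `lower_bound..=db_tip_block`, where the lower bound is one past the MerkleChangeSets prune checkpoint (or 0 if pruning has never run). A small illustrative sketch of that logic, with a simplified signature that is not the reth API:

// Sketch of the changeset-availability check (hypothetical free function).
type BlockNumber = u64;

/// Returns `Ok(true)` if reverts must be collected for `requested_block`,
/// `Ok(false)` if the requested block already is the db tip, and an error
/// if the requested block falls outside the retained changeset range.
fn reverts_required(
    db_tip_block: BlockNumber,
    prune_checkpoint_block: Option<BlockNumber>,
    requested_block: BlockNumber,
) -> Result<bool, String> {
    // No reverts needed when the requested block is the db tip itself.
    if db_tip_block == requested_block {
        return Ok(false);
    }

    // Changesets are available from just above the prune checkpoint up to the db tip.
    let lower_bound = prune_checkpoint_block.map(|b| b + 1).unwrap_or_default();
    let available_range = lower_bound..=db_tip_block;

    if !available_range.contains(&requested_block) {
        return Err(format!(
            "insufficient changesets: requested {requested_block}, available {available_range:?}"
        ));
    }

    Ok(true)
}

fn main() {
    assert_eq!(reverts_required(100, None, 100), Ok(false)); // at tip: no reverts
    assert_eq!(reverts_required(100, Some(49), 80), Ok(true)); // within 50..=100: revert
    assert!(reverts_required(100, Some(49), 20).is_err()); // pruned away: error
}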