mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
todos
This commit is contained in:
@@ -494,6 +494,10 @@ impl ExecutionCacheBuilder {
|
||||
|
||||
let storage_cache = CacheBuilder::new(self.storage_cache_entries)
|
||||
.weigher(|_key: &Address, value: &Arc<AccountStorageCache>| -> u32 {
|
||||
// TODO: this can't be done by just doing `value.len()` because it's slow, and we
|
||||
// can't keep a usize counter in `AccountStorageCache` because cache can be evicted
|
||||
// in background my moka itself
|
||||
//
|
||||
// values based on results from measure_storage_cache_overhead test
|
||||
let base_weight = 39_000;
|
||||
let slots_weight = value.len() * 218;
|
||||
|
||||
@@ -116,6 +116,8 @@ where
|
||||
P: BlockReader + StateProviderFactory + StateReader + Clone,
|
||||
{
|
||||
/// Creates a new state provider from this builder.
|
||||
/// TODO: maybe clone mdbx transaction because it allows to do so (need to use MDBX API for
|
||||
/// that)
|
||||
pub fn build(&self) -> ProviderResult<StateProviderBox> {
|
||||
let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
|
||||
if let Some(overlay) = self.overlay.clone() {
|
||||
@@ -2465,6 +2467,7 @@ where
|
||||
let block_num_hash = block_id.block;
|
||||
debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");
|
||||
|
||||
// TODO: do in parallel because it can go to database
|
||||
match self.sealed_header_by_hash(block_num_hash.hash) {
|
||||
Err(err) => {
|
||||
let block = convert_to_block(self, input)?;
|
||||
@@ -2508,6 +2511,7 @@ where
|
||||
}
|
||||
|
||||
// determine whether we are on a fork chain
|
||||
// TODO: get rid of this as we only use it for the event, can make it simpler
|
||||
let is_fork = match self.is_fork(block_id) {
|
||||
Err(err) => {
|
||||
let block = convert_to_block(self, input)?;
|
||||
@@ -2808,6 +2812,8 @@ where
|
||||
where
|
||||
P: BlockReader + StateProviderFactory + StateReader + Clone,
|
||||
{
|
||||
// TODO: fix issues with going to historical instead of latest (problematic with persistence
|
||||
// racing with validation)
|
||||
if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
|
||||
debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
|
||||
// the block leads back to the canonical chain
|
||||
|
||||
@@ -249,6 +249,7 @@ where
|
||||
let multi_proof_task = MultiProofTask::new(
|
||||
proof_handle.clone(),
|
||||
to_sparse_trie,
|
||||
// TOOD: benchmark on Tempo with disabled chunking
|
||||
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
|
||||
);
|
||||
|
||||
@@ -278,6 +279,7 @@ where
|
||||
)
|
||||
} else {
|
||||
// Normal path: spawn with full prewarming
|
||||
// TODO: rename the method to mention prewarming
|
||||
self.spawn_caching_with(
|
||||
env,
|
||||
prewarm_rx,
|
||||
@@ -293,6 +295,7 @@ where
|
||||
let _enter = parent_span.entered();
|
||||
// Build a state provider for the multiproof task
|
||||
let provider = provider_builder.build().expect("failed to build provider");
|
||||
// TODO: look through MultiProofTask::run and whether we're doing it the best way
|
||||
multi_proof_task.run(provider);
|
||||
});
|
||||
|
||||
@@ -371,6 +374,7 @@ where
|
||||
// to the execution task in order.
|
||||
self.executor.spawn_blocking(move || {
|
||||
let mut next_for_execution = 0;
|
||||
// TODO: maybe reuse allocation, check profiles
|
||||
let mut queue = BTreeMap::new();
|
||||
while let Ok((idx, tx)) = ooo_rx.recv() {
|
||||
if next_for_execution == idx {
|
||||
@@ -410,6 +414,7 @@ where
|
||||
transactions = mpsc::channel().1;
|
||||
}
|
||||
|
||||
// TODO: maybe refactor so it's not some some some
|
||||
let (saved_cache, cache, cache_metrics) = if self.disable_state_cache {
|
||||
(None, None, None)
|
||||
} else {
|
||||
@@ -443,6 +448,7 @@ where
|
||||
// spawn pre-warm task
|
||||
{
|
||||
let to_prewarm_task = to_prewarm_task.clone();
|
||||
// TODO: add name for this thread
|
||||
self.executor.spawn_blocking(move || {
|
||||
prewarm_task.run(transactions, to_prewarm_task);
|
||||
});
|
||||
@@ -750,6 +756,8 @@ impl ExecutionCache {
|
||||
|
||||
cache
|
||||
.as_ref()
|
||||
// TODO: add comment on why we're doing is_available: because cache updates can happen
|
||||
// in background by other threads
|
||||
.filter(|c| c.executed_block_hash() == parent_hash && c.is_available())
|
||||
.cloned()
|
||||
}
|
||||
|
||||
@@ -1459,6 +1459,7 @@ impl MultiProofTask {
|
||||
///
|
||||
/// Contains processing state that persists across loop iterations.
|
||||
struct MultiproofBatchCtx {
|
||||
// TODO: mention `process_multiproof_message` and `try_recv` batching in the comment
|
||||
/// Buffers a non-matching message type encountered during batching.
|
||||
/// Processed first in next iteration to preserve ordering while allowing same-type
|
||||
/// messages to batch.
|
||||
|
||||
@@ -145,6 +145,7 @@ where
|
||||
let transaction_count_hint = self.transaction_count_hint;
|
||||
let span = Span::current();
|
||||
|
||||
// TODO: thread name
|
||||
self.executor.spawn_blocking(move || {
|
||||
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();
|
||||
|
||||
@@ -179,7 +180,10 @@ where
|
||||
break;
|
||||
}
|
||||
|
||||
// TODO: this index is incorrect, these indexes are not actual order indexes, so we need to get on `pending` channel `(idx, tx)`
|
||||
let indexed_tx = IndexedTransaction { index: tx_index, tx };
|
||||
// TODO: this check is hit by every Tempo transaction, we should expose a `is_system_tx` on tx itself
|
||||
// TODO: maybe even more remove it altogether? idk how useful
|
||||
let is_system_tx = indexed_tx.tx.tx().ty() > MAX_STANDARD_TX_TYPE;
|
||||
|
||||
// System transactions (type > 4) in the first position set critical metadata
|
||||
@@ -265,6 +269,10 @@ where
|
||||
let new_cache = SavedCache::new(hash, caches, cache_metrics);
|
||||
|
||||
// Insert state into cache while holding the lock
|
||||
//
|
||||
// TODO: this is very slow because we do cache invalidations on every cache update,
|
||||
// instead we should do it once after payload validation before receiving the next
|
||||
// payload
|
||||
if new_cache.cache().insert_state(&state).is_err() {
|
||||
// Clear the cache on error to prevent having a polluted cache
|
||||
*cached = None;
|
||||
@@ -322,6 +330,8 @@ where
|
||||
trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
|
||||
final_block_output = Some(block_output);
|
||||
|
||||
// We can't just exit here because there may be prewarm tasks that are still
|
||||
// updating the cache
|
||||
if finished_execution {
|
||||
// all tasks are done, we can exit, which will save caches and exit
|
||||
break
|
||||
@@ -358,6 +368,7 @@ where
|
||||
N: NodePrimitives,
|
||||
Evm: ConfigureEvm<Primitives = N>,
|
||||
{
|
||||
// TOOD: maybe Arc these two fields
|
||||
pub(super) env: ExecutionEnv<Evm>,
|
||||
pub(super) evm_config: Evm,
|
||||
pub(super) saved_cache: Option<SavedCache>,
|
||||
@@ -511,10 +522,16 @@ where
|
||||
|
||||
// Only send outcome for transactions after the first txn
|
||||
// as the main execution will be just as fast
|
||||
//
|
||||
// TODO: share current transaction index between execution and prewarming to skip txs
|
||||
// that we already executed
|
||||
if index > 0 {
|
||||
let _enter =
|
||||
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
|
||||
.entered();
|
||||
// TODO: `multiproof_targets_from_state` calculates keccaks for address and storage
|
||||
// slots, maybe we should pre-calculate them in the beginning of newPayload to it's
|
||||
// cached by keccak impl?
|
||||
let (targets, storage_targets) = multiproof_targets_from_state(res.state);
|
||||
metrics.prefetch_storage_targets.record(storage_targets as f64);
|
||||
let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
|
||||
|
||||
@@ -361,6 +361,8 @@ where
|
||||
trace!(target: "engine::tree::payload_validator", "Fetching block state provider");
|
||||
let _enter =
|
||||
debug_span!(target: "engine::tree::payload_validator", "state provider").entered();
|
||||
// input to this method
|
||||
// TODO: deduplicate with another `state_provider_builder` and get `provider_builder` as
|
||||
let Some(provider_builder) =
|
||||
ensure_ok!(self.state_provider_builder(parent_hash, ctx.state()))
|
||||
else {
|
||||
@@ -375,6 +377,8 @@ where
|
||||
drop(_enter);
|
||||
|
||||
// fetch parent block
|
||||
// TODO: add comments that it goes to memory most of the time UNLESS parent block is beyond
|
||||
// in-memory buffer
|
||||
let Some(parent_block) = ensure_ok!(self.sealed_header_by_hash(parent_hash, ctx.state()))
|
||||
else {
|
||||
return Err(InsertBlockError::new(
|
||||
@@ -399,6 +403,7 @@ where
|
||||
"Decided which state root algorithm to run"
|
||||
);
|
||||
|
||||
// TODO: wrong comment vvv
|
||||
// use prewarming background task
|
||||
let txs = self.tx_iterator_for(&input)?;
|
||||
|
||||
@@ -553,6 +558,7 @@ where
|
||||
}
|
||||
|
||||
// terminate prewarming task with good state output
|
||||
// TODO: Arc the output state here, so that we don't clone it in caching
|
||||
handle.terminate_caching(Some(&output.state));
|
||||
|
||||
Ok(self.spawn_deferred_trie_task(
|
||||
@@ -627,6 +633,7 @@ where
|
||||
self.execution_ctx_for(input).map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
|
||||
let mut executor = self.evm_config.create_executor(evm, ctx);
|
||||
|
||||
// TODO: measure how long does this take and maybe initialize once and re-use
|
||||
if !self.config.precompile_cache_disabled() {
|
||||
// Only cache pure precompiles to avoid issues with stateful precompiles
|
||||
executor.evm_mut().precompiles_mut().map_pure_precompiles(|address, precompile| {
|
||||
@@ -800,6 +807,7 @@ where
|
||||
StateRootStrategy::StateRootTask => {
|
||||
// Compute trie input
|
||||
let trie_input_start = Instant::now();
|
||||
// TOOD: spawn prewarming and tx decoding and recovery before this
|
||||
let (trie_input, block_hash) = self.compute_trie_input(parent_hash, state)?;
|
||||
|
||||
// Create OverlayStateProviderFactory with sorted trie data for multiproofs
|
||||
|
||||
@@ -292,6 +292,7 @@ where
|
||||
&self,
|
||||
payload: &ExecutionData,
|
||||
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
|
||||
// Cloning transactions is cheap here because `Bytes` is cheap to clone
|
||||
let txs = payload.payload.transactions().clone();
|
||||
let convert = |tx: Bytes| {
|
||||
let tx =
|
||||
|
||||
@@ -76,6 +76,7 @@ where
|
||||
let expected_hash = payload.block_hash();
|
||||
|
||||
// First parse the block
|
||||
// TODO: pass already decoded transactions from `tx_iterator`
|
||||
let sealed_block = payload.try_into_block_with_sidecar(&sidecar)?.seal_slow();
|
||||
|
||||
// Ensure the hash included in the payload matches the block hash
|
||||
|
||||
@@ -150,6 +150,8 @@ impl ProofWorkerHandle {
|
||||
let work_rx_clone = storage_work_rx.clone();
|
||||
let storage_available_workers_clone = storage_available_workers.clone();
|
||||
|
||||
// TODO: add name to this executor so we can differentiate proof task worker threads in
|
||||
// Samply easier
|
||||
executor.spawn_blocking(move || {
|
||||
#[cfg(feature = "metrics")]
|
||||
let metrics = ProofTaskTrieMetrics::default();
|
||||
|
||||
Reference in New Issue
Block a user