chore: move primitives/stage to reth-stages-types (#8749)

This commit is contained in:
joshieDo
2024-06-11 16:38:26 +02:00
committed by GitHub
parent 87c22d22f8
commit b8759733d4
71 changed files with 231 additions and 181 deletions

View File

@@ -39,7 +39,6 @@ mod receipt;
mod request;
/// Helpers for working with revm
pub mod revm;
pub mod stage;
pub use reth_static_file_types as static_file;
mod storage;
pub mod transaction;

View File

@@ -1,399 +0,0 @@
use crate::{Address, BlockNumber, B256};
use bytes::Buf;
use reth_codecs::{main_codec, Compact};
use reth_trie_types::{hash_builder::HashBuilderState, StoredSubNode};
use std::ops::RangeInclusive;
use super::StageId;
/// Saves the progress of Merkle stage.
///
/// Unlike the other stage checkpoints this type is not `Copy` (it owns a walker
/// stack), so it is serialized via the manual [`Compact`] implementation below
/// instead of `#[main_codec]`.
#[derive(Default, Debug, Clone, PartialEq)]
pub struct MerkleCheckpoint {
    /// The target block number.
    pub target_block: BlockNumber,
    /// The last hashed account key processed.
    pub last_account_key: B256,
    /// Previously recorded walker stack.
    pub walker_stack: Vec<StoredSubNode>,
    /// The hash builder state.
    pub state: HashBuilderState,
}
impl MerkleCheckpoint {
    /// Creates a new Merkle checkpoint.
    ///
    /// Made `const` for consistency with the other constructors in this module
    /// (`with_block_number`, the getters, etc.); the body only moves its
    /// arguments into the struct, which is valid in a `const fn`.
    pub const fn new(
        target_block: BlockNumber,
        last_account_key: B256,
        walker_stack: Vec<StoredSubNode>,
        state: HashBuilderState,
    ) -> Self {
        Self { target_block, last_account_key, walker_stack, state }
    }
}
impl Compact for MerkleCheckpoint {
    /// Serializes the checkpoint into `buf`:
    /// target block (8 bytes) | account key (32 bytes) |
    /// walker-stack length (2 bytes) + entries | hash builder state.
    ///
    /// Returns the total number of bytes written.
    fn to_compact<B>(self, buf: &mut B) -> usize
    where
        B: bytes::BufMut + AsMut<[u8]>,
    {
        let mut len = 0;

        buf.put_u64(self.target_block);
        len += 8;

        buf.put_slice(self.last_account_key.as_slice());
        len += self.last_account_key.len();

        // NOTE(review): `as u16` silently truncates if the stack ever exceeds
        // u16::MAX entries — presumably unreachable in practice, but confirm.
        buf.put_u16(self.walker_stack.len() as u16);
        len += 2;
        for item in self.walker_stack {
            len += item.to_compact(buf);
        }

        len += self.state.to_compact(buf);
        len
    }

    /// Mirror of [`Self::to_compact`]. Reads fields in the same order and
    /// returns the decoded checkpoint plus the unconsumed remainder of `buf`.
    ///
    /// Panics if `buf` is shorter than the encoded layout (the `get_*` reads
    /// and the `&buf[..32]` slice assume enough bytes remain).
    fn from_compact(mut buf: &[u8], _len: usize) -> (Self, &[u8]) {
        let target_block = buf.get_u64();

        let last_account_key = B256::from_slice(&buf[..32]);
        buf.advance(32);

        let walker_stack_len = buf.get_u16() as usize;
        let mut walker_stack = Vec::with_capacity(walker_stack_len);
        for _ in 0..walker_stack_len {
            let (item, rest) = StoredSubNode::from_compact(buf, 0);
            walker_stack.push(item);
            buf = rest;
        }

        let (state, buf) = HashBuilderState::from_compact(buf, 0);
        (Self { target_block, last_account_key, walker_stack, state }, buf)
    }
}
/// Saves the progress of AccountHashing stage.
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub struct AccountHashingCheckpoint {
    /// The next account to start hashing from.
    // `None` presumably means "start from the beginning" — confirm against the stage.
    pub address: Option<Address>,
    /// Block range which this checkpoint is valid for.
    pub block_range: CheckpointBlockRange,
    /// Progress measured in accounts.
    pub progress: EntitiesCheckpoint,
}
/// Saves the progress of StorageHashing stage.
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub struct StorageHashingCheckpoint {
    /// The next account to start hashing from.
    pub address: Option<Address>,
    /// The next storage slot to start hashing from.
    pub storage: Option<B256>,
    /// Block range which this checkpoint is valid for.
    pub block_range: CheckpointBlockRange,
    /// Progress measured in storage slots.
    pub progress: EntitiesCheckpoint,
}
/// Saves the progress of Execution stage.
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub struct ExecutionCheckpoint {
    /// Block range which this checkpoint is valid for.
    pub block_range: CheckpointBlockRange,
    /// Progress measured in gas.
    pub progress: EntitiesCheckpoint,
}
/// Saves the progress of Headers stage.
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub struct HeadersCheckpoint {
    /// Block range which this checkpoint is valid for.
    pub block_range: CheckpointBlockRange,
    /// Progress measured in headers.
    // NOTE(review): doc previously read "measured in gas", which looks
    // copy-pasted from `ExecutionCheckpoint`; presumably the entity counted
    // here is a header — confirm against the Headers stage.
    pub progress: EntitiesCheckpoint,
}
/// Saves the progress of Index History stages.
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub struct IndexHistoryCheckpoint {
    /// Block range which this checkpoint is valid for.
    pub block_range: CheckpointBlockRange,
    /// Progress measured in changesets.
    pub progress: EntitiesCheckpoint,
}
/// Saves the progress of abstract stage iterating over or downloading entities.
///
/// What an "entity" is depends on the stage: accounts, storage slots, gas,
/// changesets, etc. (see the sibling checkpoint types).
#[main_codec]
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct EntitiesCheckpoint {
    /// Number of entities already processed.
    pub processed: u64,
    /// Total entities to be processed.
    pub total: u64,
}
impl EntitiesCheckpoint {
    /// Formats entities checkpoint as percentage, i.e. `processed / total`.
    ///
    /// Return [None] if `total == 0`.
    pub fn fmt_percentage(&self) -> Option<String> {
        if self.total == 0 {
            return None
        }

        // Two-decimal percentage, truncated (not rounded) downward so that
        // 99.999% displays as 99.99% rather than a premature 100%.
        let percentage = 100.0 * self.processed as f64 / self.total as f64;
        let truncated = (percentage * 100.0).floor() / 100.0;
        Some(format!("{truncated:.2}%"))
    }
}
/// Saves the block range. Usually, it's used to check the validity of some stage checkpoint across
/// multiple executions.
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub struct CheckpointBlockRange {
    /// The first block of the range, inclusive.
    pub from: BlockNumber,
    /// The last block of the range, inclusive.
    pub to: BlockNumber,
}
impl From<RangeInclusive<BlockNumber>> for CheckpointBlockRange {
    /// Builds a checkpoint range from an inclusive block range.
    fn from(range: RangeInclusive<BlockNumber>) -> Self {
        // Consume the range and keep both inclusive endpoints.
        let (from, to) = range.into_inner();
        Self { from, to }
    }
}
impl From<&RangeInclusive<BlockNumber>> for CheckpointBlockRange {
    /// Builds a checkpoint range from a borrowed inclusive block range.
    fn from(range: &RangeInclusive<BlockNumber>) -> Self {
        // Copy the endpoints out of the borrowed range.
        let (start, end) = (range.start(), range.end());
        Self { from: *start, to: *end }
    }
}
/// Saves the progress of a stage.
#[main_codec]
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct StageCheckpoint {
    /// The maximum block processed by the stage.
    pub block_number: BlockNumber,
    /// Stage-specific checkpoint. None if stage uses only block-based checkpoints.
    pub stage_checkpoint: Option<StageUnitCheckpoint>,
}
impl StageCheckpoint {
    /// Creates a new [`StageCheckpoint`] with only `block_number` set.
    pub fn new(block_number: BlockNumber) -> Self {
        Self { block_number, ..Default::default() }
    }

    /// Sets the block number.
    pub const fn with_block_number(mut self, block_number: BlockNumber) -> Self {
        self.block_number = block_number;
        self
    }

    /// Sets the block range, if checkpoint uses block range.
    ///
    /// For stages that do not track a block range, `self` is returned unchanged.
    pub fn with_block_range(mut self, stage_id: &StageId, from: u64, to: u64) -> Self {
        self.stage_checkpoint = Some(match stage_id {
            StageId::Execution => StageUnitCheckpoint::Execution(ExecutionCheckpoint::default()),
            StageId::AccountHashing => {
                StageUnitCheckpoint::Account(AccountHashingCheckpoint::default())
            }
            StageId::StorageHashing => {
                StageUnitCheckpoint::Storage(StorageHashingCheckpoint::default())
            }
            StageId::IndexStorageHistory | StageId::IndexAccountHistory => {
                StageUnitCheckpoint::IndexHistory(IndexHistoryCheckpoint::default())
            }
            _ => return self,
        });
        // Fix: mutate the stored checkpoint in place. The previous
        // `self.stage_checkpoint.map(|mut checkpoint| ...)` copied the `Copy`
        // checkpoint out of the `Option`, mutated the copy, and discarded it —
        // so the block range set here was silently lost and the stored range
        // stayed at its default.
        if let Some(checkpoint) = self.stage_checkpoint.as_mut() {
            checkpoint.set_block_range(from, to);
        }
        self
    }

    /// Get the underlying [`EntitiesCheckpoint`], if any, to determine the number of entities
    /// processed, and the number of total entities to process.
    pub fn entities(&self) -> Option<EntitiesCheckpoint> {
        let stage_checkpoint = self.stage_checkpoint?;
        // Every variant carries an `EntitiesCheckpoint`, so this match is exhaustive.
        match stage_checkpoint {
            StageUnitCheckpoint::Account(AccountHashingCheckpoint {
                progress: entities, ..
            }) |
            StageUnitCheckpoint::Storage(StorageHashingCheckpoint {
                progress: entities, ..
            }) |
            StageUnitCheckpoint::Entities(entities) |
            StageUnitCheckpoint::Execution(ExecutionCheckpoint { progress: entities, .. }) |
            StageUnitCheckpoint::Headers(HeadersCheckpoint { progress: entities, .. }) |
            StageUnitCheckpoint::IndexHistory(IndexHistoryCheckpoint {
                progress: entities,
                ..
            }) => Some(entities),
        }
    }
}
// TODO(alexey): add a merkle checkpoint. Currently it's hard because [`MerkleCheckpoint`]
// is not a Copy type.
/// Stage-specific checkpoint metrics.
#[main_codec]
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum StageUnitCheckpoint {
    /// Saves the progress of AccountHashing stage.
    Account(AccountHashingCheckpoint),
    /// Saves the progress of StorageHashing stage.
    Storage(StorageHashingCheckpoint),
    /// Saves the progress of abstract stage iterating over or downloading entities.
    Entities(EntitiesCheckpoint),
    /// Saves the progress of Execution stage.
    Execution(ExecutionCheckpoint),
    /// Saves the progress of Headers stage.
    Headers(HeadersCheckpoint),
    /// Saves the progress of Index History stage.
    IndexHistory(IndexHistoryCheckpoint),
}
impl StageUnitCheckpoint {
    /// Sets the block range. Returns old block range, or `None` if checkpoint doesn't use block
    /// range.
    pub fn set_block_range(&mut self, from: u64, to: u64) -> Option<CheckpointBlockRange> {
        let next = CheckpointBlockRange { from, to };
        match self {
            Self::Account(AccountHashingCheckpoint { block_range, .. }) |
            Self::Storage(StorageHashingCheckpoint { block_range, .. }) |
            Self::Execution(ExecutionCheckpoint { block_range, .. }) |
            Self::IndexHistory(IndexHistoryCheckpoint { block_range, .. }) => {
                // Swap in the new range and hand back the previous one.
                Some(std::mem::replace(block_range, next))
            }
            // `Entities` and `Headers` don't carry a block range.
            _ => None,
        }
    }
}
#[cfg(test)]
impl Default for StageUnitCheckpoint {
    /// Arbitrary default (an empty account hashing checkpoint), used only by tests.
    fn default() -> Self {
        Self::Account(Default::default())
    }
}
/// Generates [`StageCheckpoint`] getter and builder methods.
macro_rules! stage_unit_checkpoints {
    // Each tuple is: (index, enum variant, checkpoint type,
    //                 getter doc + getter name, builder doc + builder name).
    // `$index` is never used in the expansion; it only documents the
    // variant's position at the call site.
    ($(($index:expr,$enum_variant:tt,$checkpoint_ty:ty,#[doc = $fn_get_doc:expr]$fn_get_name:ident,#[doc = $fn_build_doc:expr]$fn_build_name:ident)),+) => {
        impl StageCheckpoint {
            $(
                #[doc = $fn_get_doc]
                // Getter: yields the checkpoint only when the stored variant matches.
                pub const fn $fn_get_name(&self) -> Option<$checkpoint_ty> {
                    match self.stage_checkpoint {
                        Some(StageUnitCheckpoint::$enum_variant(checkpoint)) => Some(checkpoint),
                        _ => None,
                    }
                }

                #[doc = $fn_build_doc]
                // Builder: overwrites any previously stored stage checkpoint.
                pub const fn $fn_build_name(
                    mut self,
                    checkpoint: $checkpoint_ty,
                ) -> Self {
                    self.stage_checkpoint = Some(StageUnitCheckpoint::$enum_variant(checkpoint));
                    self
                }
            )+
        }
    };
}
// Instantiate the `<name>_stage_checkpoint()` getter and
// `with_<name>_stage_checkpoint()` builder pair for every
// `StageUnitCheckpoint` variant.
stage_unit_checkpoints!(
    (
        0,
        Account,
        AccountHashingCheckpoint,
        /// Returns the account hashing stage checkpoint, if any.
        account_hashing_stage_checkpoint,
        /// Sets the stage checkpoint to account hashing.
        with_account_hashing_stage_checkpoint
    ),
    (
        1,
        Storage,
        StorageHashingCheckpoint,
        /// Returns the storage hashing stage checkpoint, if any.
        storage_hashing_stage_checkpoint,
        /// Sets the stage checkpoint to storage hashing.
        with_storage_hashing_stage_checkpoint
    ),
    (
        2,
        Entities,
        EntitiesCheckpoint,
        /// Returns the entities stage checkpoint, if any.
        entities_stage_checkpoint,
        /// Sets the stage checkpoint to entities.
        with_entities_stage_checkpoint
    ),
    (
        3,
        Execution,
        ExecutionCheckpoint,
        /// Returns the execution stage checkpoint, if any.
        execution_stage_checkpoint,
        /// Sets the stage checkpoint to execution.
        with_execution_stage_checkpoint
    ),
    (
        4,
        Headers,
        HeadersCheckpoint,
        /// Returns the headers stage checkpoint, if any.
        headers_stage_checkpoint,
        /// Sets the stage checkpoint to headers.
        with_headers_stage_checkpoint
    ),
    (
        5,
        IndexHistory,
        IndexHistoryCheckpoint,
        /// Returns the index history stage checkpoint, if any.
        index_history_stage_checkpoint,
        /// Sets the stage checkpoint to index history.
        with_index_history_stage_checkpoint
    )
);
#[cfg(test)]
mod tests {
    use super::*;
    use rand::Rng;

    /// Round-trips a randomized checkpoint through the manual `Compact`
    /// implementation and checks the decoded value matches the original.
    #[test]
    fn merkle_checkpoint_roundtrip() {
        let mut rng = rand::thread_rng();
        let checkpoint = MerkleCheckpoint {
            target_block: rng.gen(),
            last_account_key: rng.gen(),
            // One stack entry is enough to exercise the length-prefixed loop.
            walker_stack: vec![StoredSubNode {
                key: B256::random_with(&mut rng).to_vec(),
                nibble: Some(rng.gen()),
                node: None,
            }],
            state: HashBuilderState::default(),
        };

        let mut buf = Vec::new();
        let encoded = checkpoint.clone().to_compact(&mut buf);
        let (decoded, _) = MerkleCheckpoint::from_compact(&buf, encoded);
        assert_eq!(decoded, checkpoint);
    }
}

View File

@@ -1,146 +0,0 @@
/// Stage IDs for all known stages.
///
/// For custom stages, use [`StageId::Other`]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum StageId {
    /// Static File stage in the process.
    #[deprecated(
        note = "Static Files are generated outside of the pipeline and do not require a separate stage"
    )]
    StaticFile,
    /// Header stage in the process.
    Headers,
    /// Bodies stage in the process.
    Bodies,
    /// Sender recovery stage in the process.
    SenderRecovery,
    /// Execution stage in the process.
    Execution,
    /// Merkle unwind stage in the process.
    MerkleUnwind,
    /// Account hashing stage in the process.
    AccountHashing,
    /// Storage hashing stage in the process.
    StorageHashing,
    /// Merkle execute stage in the process.
    MerkleExecute,
    /// Transaction lookup stage in the process.
    TransactionLookup,
    /// Index storage history stage in the process.
    IndexStorageHistory,
    /// Index account history stage in the process.
    IndexAccountHistory,
    /// Finish stage in the process.
    Finish,
    /// Other custom stage with a provided string identifier.
    Other(&'static str),
}

impl StageId {
    /// All supported Stages, in pipeline execution order.
    ///
    /// NOTE: other parts of the codebase assume `Headers` is first — see the
    /// `stage_all_headers_first` test below.
    pub const ALL: [Self; 12] = [
        Self::Headers,
        Self::Bodies,
        Self::SenderRecovery,
        Self::Execution,
        Self::MerkleUnwind,
        Self::AccountHashing,
        Self::StorageHashing,
        Self::MerkleExecute,
        Self::TransactionLookup,
        Self::IndexStorageHistory,
        Self::IndexAccountHistory,
        Self::Finish,
    ];

    /// Stages that require state.
    pub const STATE_REQUIRED: [Self; 7] = [
        Self::Execution,
        Self::MerkleUnwind,
        Self::AccountHashing,
        Self::StorageHashing,
        Self::MerkleExecute,
        Self::IndexStorageHistory,
        Self::IndexAccountHistory,
    ];

    /// Return stage id formatted as string.
    pub const fn as_str(&self) -> &str {
        // Arms follow the enum's declaration order (the IndexStorageHistory /
        // IndexAccountHistory arms were previously swapped relative to it).
        match self {
            #[allow(deprecated)]
            Self::StaticFile => "StaticFile",
            Self::Headers => "Headers",
            Self::Bodies => "Bodies",
            Self::SenderRecovery => "SenderRecovery",
            Self::Execution => "Execution",
            Self::MerkleUnwind => "MerkleUnwind",
            Self::AccountHashing => "AccountHashing",
            Self::StorageHashing => "StorageHashing",
            Self::MerkleExecute => "MerkleExecute",
            Self::TransactionLookup => "TransactionLookup",
            Self::IndexStorageHistory => "IndexStorageHistory",
            Self::IndexAccountHistory => "IndexAccountHistory",
            Self::Finish => "Finish",
            Self::Other(s) => s,
        }
    }

    /// Returns true if it's a downloading stage [`StageId::Headers`] or [`StageId::Bodies`]
    pub const fn is_downloading_stage(&self) -> bool {
        matches!(self, Self::Headers | Self::Bodies)
    }

    /// Returns `true` if it's [`TransactionLookup`](StageId::TransactionLookup) stage.
    pub const fn is_tx_lookup(&self) -> bool {
        matches!(self, Self::TransactionLookup)
    }

    /// Returns true indicating if it's the finish stage [`StageId::Finish`]
    pub const fn is_finish(&self) -> bool {
        matches!(self, Self::Finish)
    }
}
impl std::fmt::Display for StageId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Delegate to the canonical string form.
        f.write_str(self.as_str())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `Display` must match the `as_str` representation for every variant.
    #[test]
    fn stage_id_as_string() {
        assert_eq!(StageId::Headers.to_string(), "Headers");
        assert_eq!(StageId::Bodies.to_string(), "Bodies");
        assert_eq!(StageId::SenderRecovery.to_string(), "SenderRecovery");
        assert_eq!(StageId::Execution.to_string(), "Execution");
        assert_eq!(StageId::MerkleUnwind.to_string(), "MerkleUnwind");
        assert_eq!(StageId::AccountHashing.to_string(), "AccountHashing");
        assert_eq!(StageId::StorageHashing.to_string(), "StorageHashing");
        assert_eq!(StageId::MerkleExecute.to_string(), "MerkleExecute");
        assert_eq!(StageId::IndexAccountHistory.to_string(), "IndexAccountHistory");
        assert_eq!(StageId::IndexStorageHistory.to_string(), "IndexStorageHistory");
        assert_eq!(StageId::TransactionLookup.to_string(), "TransactionLookup");
        assert_eq!(StageId::Finish.to_string(), "Finish");
        assert_eq!(StageId::Other("Foo").to_string(), "Foo");
    }

    #[test]
    fn is_downloading_stage() {
        assert!(StageId::Headers.is_downloading_stage());
        assert!(StageId::Bodies.is_downloading_stage());

        assert!(!StageId::Execution.is_downloading_stage());
    }

    // Multiple places around the codebase assume headers is the first stage.
    // Feel free to remove this test if the assumption changes.
    #[test]
    fn stage_all_headers_first() {
        assert_eq!(*StageId::ALL.first().unwrap(), StageId::Headers);
    }
}

View File

@@ -1,66 +0,0 @@
//! Staged sync primitives.
mod id;
use crate::{BlockHash, BlockNumber};
pub use id::StageId;
mod checkpoints;
pub use checkpoints::{
AccountHashingCheckpoint, CheckpointBlockRange, EntitiesCheckpoint, ExecutionCheckpoint,
HeadersCheckpoint, IndexHistoryCheckpoint, MerkleCheckpoint, StageCheckpoint,
StageUnitCheckpoint, StorageHashingCheckpoint,
};
/// Direction and target block for pipeline operations.
///
/// Use [`PipelineTarget::sync_target`] / [`PipelineTarget::unwind_target`] to
/// extract the direction-specific payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineTarget {
    /// Target for forward synchronization, indicating a block hash to sync to.
    Sync(BlockHash),
    /// Target for backward unwinding, indicating a block number to unwind to.
    Unwind(BlockNumber),
}
impl PipelineTarget {
    /// Returns the target block hash for forward synchronization, if applicable.
    ///
    /// # Returns
    ///
    /// - `Some(BlockHash)`: The target block hash for forward synchronization.
    /// - `None`: If the target is for backward unwinding.
    pub const fn sync_target(self) -> Option<BlockHash> {
        if let Self::Sync(hash) = self {
            Some(hash)
        } else {
            None
        }
    }

    /// Returns the target block number for backward unwinding, if applicable.
    ///
    /// # Returns
    ///
    /// - `Some(BlockNumber)`: The target block number for backward unwinding.
    /// - `None`: If the target is for forward synchronization.
    pub const fn unwind_target(self) -> Option<BlockNumber> {
        if let Self::Unwind(number) = self {
            Some(number)
        } else {
            None
        }
    }
}
impl From<BlockHash> for PipelineTarget {
    /// Converts a block hash into a forward-sync target.
    fn from(hash: BlockHash) -> Self {
        Self::Sync(hash)
    }
}
impl std::fmt::Display for PipelineTarget {
    /// Renders as `Sync(<hash>)` or `Unwind(<number>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Sync(block) => write!(f, "Sync({block})"),
            Self::Unwind(block) => write!(f, "Unwind({block})"),
        }
    }
}