mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
14 Commits
pr-21165
...
docs/rocks
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9a7d3be1fc | ||
|
|
961e232dea | ||
|
|
0c0d12ac9a | ||
|
|
62a97edbf1 | ||
|
|
c9dad4765d | ||
|
|
1d55abeef3 | ||
|
|
f7460e219c | ||
|
|
0c66315f20 | ||
|
|
6a2010e595 | ||
|
|
c2435ff6f8 | ||
|
|
52ec8e9491 | ||
|
|
a901d80ee6 | ||
|
|
915164078f | ||
|
|
be3234d848 |
2
.github/workflows/dependencies.yml
vendored
2
.github/workflows/dependencies.yml
vendored
@@ -15,6 +15,6 @@ permissions:
|
||||
|
||||
jobs:
|
||||
update:
|
||||
uses: ithacaxyz/ci/.github/workflows/cargo-update-pr.yml@main
|
||||
uses: tempoxyz/ci/.github/workflows/cargo-update-pr.yml@main
|
||||
secrets:
|
||||
token: ${{ secrets.GITHUB_TOKEN }}
|
||||
2
.github/workflows/lint.yml
vendored
2
.github/workflows/lint.yml
vendored
@@ -285,7 +285,7 @@ jobs:
|
||||
- run: zepter run check
|
||||
|
||||
deny:
|
||||
uses: ithacaxyz/ci/.github/workflows/deny.yml@main
|
||||
uses: tempoxyz/ci/.github/workflows/deny.yml@main
|
||||
|
||||
lint-success:
|
||||
name: lint success
|
||||
|
||||
@@ -249,7 +249,7 @@ Write comments that remain valuable after the PR is merged. Future readers won't
|
||||
unsafe impl GlobalAlloc for LimitedAllocator { ... }
|
||||
|
||||
// Binary search requires sorted input. Panics on unsorted slices.
|
||||
fn find_index(items: &[Item], target: &Item) -> Option
|
||||
fn find_index(items: &[Item], target: &Item) -> Option<usize>
|
||||
|
||||
// Timeout set to 5s to match EVM block processing limits
|
||||
const TRACER_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
564
Cargo.lock
generated
564
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -1,5 +1,5 @@
|
||||
[workspace.package]
|
||||
version = "1.10.0"
|
||||
version = "1.10.1"
|
||||
edition = "2024"
|
||||
rust-version = "1.88"
|
||||
license = "MIT OR Apache-2.0"
|
||||
|
||||
@@ -163,6 +163,7 @@ impl NodeManager {
|
||||
"eth,reth".to_string(),
|
||||
"--disable-discovery".to_string(),
|
||||
"--trusted-only".to_string(),
|
||||
"--disable-tx-gossip".to_string(),
|
||||
]);
|
||||
|
||||
// Add tracing arguments if OTLP endpoint is configured
|
||||
|
||||
@@ -192,10 +192,10 @@ impl DeferredTrieData {
|
||||
);
|
||||
// Only trigger COW clone if there's actually data to add.
|
||||
if !sorted_hashed_state.is_empty() {
|
||||
Arc::make_mut(&mut overlay.state).extend_ref(&sorted_hashed_state);
|
||||
Arc::make_mut(&mut overlay.state).extend_ref_and_sort(&sorted_hashed_state);
|
||||
}
|
||||
if !sorted_trie_updates.is_empty() {
|
||||
Arc::make_mut(&mut overlay.nodes).extend_ref(&sorted_trie_updates);
|
||||
Arc::make_mut(&mut overlay.nodes).extend_ref_and_sort(&sorted_trie_updates);
|
||||
}
|
||||
overlay
|
||||
}
|
||||
@@ -242,13 +242,13 @@ impl DeferredTrieData {
|
||||
|
||||
for ancestor in ancestors {
|
||||
let ancestor_data = ancestor.wait_cloned();
|
||||
state_mut.extend_ref(ancestor_data.hashed_state.as_ref());
|
||||
nodes_mut.extend_ref(ancestor_data.trie_updates.as_ref());
|
||||
state_mut.extend_ref_and_sort(ancestor_data.hashed_state.as_ref());
|
||||
nodes_mut.extend_ref_and_sort(ancestor_data.trie_updates.as_ref());
|
||||
}
|
||||
|
||||
// Extend with current block's sorted data last (takes precedence)
|
||||
state_mut.extend_ref(sorted_hashed_state);
|
||||
nodes_mut.extend_ref(sorted_trie_updates);
|
||||
state_mut.extend_ref_and_sort(sorted_hashed_state);
|
||||
nodes_mut.extend_ref_and_sort(sorted_trie_updates);
|
||||
|
||||
overlay
|
||||
}
|
||||
@@ -521,7 +521,7 @@ mod tests {
|
||||
let hashed_state = Arc::new(HashedPostStateSorted::new(accounts, B256Map::default()));
|
||||
let trie_updates = Arc::default();
|
||||
let mut overlay = TrieInputSorted::default();
|
||||
Arc::make_mut(&mut overlay.state).extend_ref(hashed_state.as_ref());
|
||||
Arc::make_mut(&mut overlay.state).extend_ref_and_sort(hashed_state.as_ref());
|
||||
|
||||
DeferredTrieData::ready(ComputedTrieData {
|
||||
hashed_state,
|
||||
|
||||
@@ -155,8 +155,8 @@ impl LazyOverlay {
|
||||
|
||||
for block in blocks_iter {
|
||||
let block_data = block.wait_cloned();
|
||||
Arc::make_mut(&mut state).extend_ref(block_data.hashed_state.as_ref());
|
||||
Arc::make_mut(&mut nodes).extend_ref(block_data.trie_updates.as_ref());
|
||||
Arc::make_mut(&mut state).extend_ref_and_sort(block_data.hashed_state.as_ref());
|
||||
Arc::make_mut(&mut nodes).extend_ref_and_sort(block_data.trie_updates.as_ref());
|
||||
}
|
||||
|
||||
TrieInputSorted { state, nodes, prefix_sets: Default::default() }
|
||||
|
||||
@@ -2,14 +2,15 @@ use crate::common::EnvironmentArgs;
|
||||
use clap::Parser;
|
||||
use eyre::Result;
|
||||
use lz4::Decoder;
|
||||
use reqwest::Client;
|
||||
use reqwest::{blocking::Client as BlockingClient, header::RANGE, Client, StatusCode};
|
||||
use reth_chainspec::{EthChainSpec, EthereumHardforks};
|
||||
use reth_cli::chainspec::ChainSpecParser;
|
||||
use reth_fs_util as fs;
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
io::{self, Read, Write},
|
||||
path::Path,
|
||||
fs::OpenOptions,
|
||||
io::{self, BufWriter, Read, Write},
|
||||
path::{Path, PathBuf},
|
||||
sync::{Arc, OnceLock},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
@@ -327,18 +328,158 @@ fn extract_from_file(path: &Path, format: CompressionFormat, target_dir: &Path)
|
||||
extract_archive(file, total_size, format, target_dir)
|
||||
}
|
||||
|
||||
/// Fetches the snapshot from a remote URL, uncompressing it in a streaming fashion.
|
||||
const MAX_DOWNLOAD_RETRIES: u32 = 10;
|
||||
const RETRY_BACKOFF_SECS: u64 = 5;
|
||||
|
||||
/// Wrapper that tracks download progress while writing data.
|
||||
/// Used with [`io::copy`] to display progress during downloads.
|
||||
struct ProgressWriter<W> {
|
||||
inner: W,
|
||||
progress: DownloadProgress,
|
||||
}
|
||||
|
||||
impl<W: Write> Write for ProgressWriter<W> {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
let n = self.inner.write(buf)?;
|
||||
let _ = self.progress.update(n as u64);
|
||||
Ok(n)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
self.inner.flush()
|
||||
}
|
||||
}
|
||||
|
||||
/// Downloads a file with resume support using HTTP Range requests.
|
||||
/// Automatically retries on failure, resuming from where it left off.
|
||||
/// Returns the path to the downloaded file and its total size.
|
||||
fn resumable_download(url: &str, target_dir: &Path) -> Result<(PathBuf, u64)> {
|
||||
let file_name = Url::parse(url)
|
||||
.ok()
|
||||
.and_then(|u| u.path_segments()?.next_back().map(|s| s.to_string()))
|
||||
.unwrap_or_else(|| "snapshot.tar".to_string());
|
||||
|
||||
let final_path = target_dir.join(&file_name);
|
||||
let part_path = target_dir.join(format!("{file_name}.part"));
|
||||
|
||||
let client = BlockingClient::builder().timeout(Duration::from_secs(30)).build()?;
|
||||
|
||||
let mut total_size: Option<u64> = None;
|
||||
let mut last_error: Option<eyre::Error> = None;
|
||||
|
||||
for attempt in 1..=MAX_DOWNLOAD_RETRIES {
|
||||
let existing_size = fs::metadata(&part_path).map(|m| m.len()).unwrap_or(0);
|
||||
|
||||
if let Some(total) = total_size &&
|
||||
existing_size >= total
|
||||
{
|
||||
fs::rename(&part_path, &final_path)?;
|
||||
info!(target: "reth::cli", "Download complete: {}", final_path.display());
|
||||
return Ok((final_path, total));
|
||||
}
|
||||
|
||||
if attempt > 1 {
|
||||
info!(target: "reth::cli",
|
||||
"Retry attempt {}/{} - resuming from {} bytes",
|
||||
attempt, MAX_DOWNLOAD_RETRIES, existing_size
|
||||
);
|
||||
}
|
||||
|
||||
let mut request = client.get(url);
|
||||
if existing_size > 0 {
|
||||
request = request.header(RANGE, format!("bytes={existing_size}-"));
|
||||
if attempt == 1 {
|
||||
info!(target: "reth::cli", "Resuming download from {} bytes", existing_size);
|
||||
}
|
||||
}
|
||||
|
||||
let response = match request.send().and_then(|r| r.error_for_status()) {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
last_error = Some(e.into());
|
||||
if attempt < MAX_DOWNLOAD_RETRIES {
|
||||
info!(target: "reth::cli",
|
||||
"Download failed, retrying in {} seconds...", RETRY_BACKOFF_SECS
|
||||
);
|
||||
std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let is_partial = response.status() == StatusCode::PARTIAL_CONTENT;
|
||||
|
||||
let size = if is_partial {
|
||||
response
|
||||
.headers()
|
||||
.get("Content-Range")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.and_then(|v| v.split('/').next_back())
|
||||
.and_then(|v| v.parse().ok())
|
||||
} else {
|
||||
response.content_length()
|
||||
};
|
||||
|
||||
if total_size.is_none() {
|
||||
total_size = size;
|
||||
}
|
||||
|
||||
let current_total = total_size.ok_or_else(|| {
|
||||
eyre::eyre!("Server did not provide Content-Length or Content-Range header")
|
||||
})?;
|
||||
|
||||
let file = if is_partial && existing_size > 0 {
|
||||
OpenOptions::new()
|
||||
.append(true)
|
||||
.open(&part_path)
|
||||
.map_err(|e| fs::FsPathError::open(e, &part_path))?
|
||||
} else {
|
||||
fs::create_file(&part_path)?
|
||||
};
|
||||
|
||||
let start_offset = if is_partial { existing_size } else { 0 };
|
||||
let mut progress = DownloadProgress::new(current_total);
|
||||
progress.downloaded = start_offset;
|
||||
|
||||
let mut writer = ProgressWriter { inner: BufWriter::new(file), progress };
|
||||
let mut reader = response;
|
||||
|
||||
let copy_result = io::copy(&mut reader, &mut writer);
|
||||
let flush_result = writer.inner.flush();
|
||||
println!();
|
||||
|
||||
if let Err(e) = copy_result.and(flush_result) {
|
||||
last_error = Some(e.into());
|
||||
if attempt < MAX_DOWNLOAD_RETRIES {
|
||||
info!(target: "reth::cli",
|
||||
"Download interrupted, retrying in {} seconds...", RETRY_BACKOFF_SECS
|
||||
);
|
||||
std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
fs::rename(&part_path, &final_path)?;
|
||||
info!(target: "reth::cli", "Download complete: {}", final_path.display());
|
||||
return Ok((final_path, current_total));
|
||||
}
|
||||
|
||||
Err(last_error
|
||||
.unwrap_or_else(|| eyre::eyre!("Download failed after {} attempts", MAX_DOWNLOAD_RETRIES)))
|
||||
}
|
||||
|
||||
/// Fetches the snapshot from a remote URL with resume support, then extracts it.
|
||||
fn download_and_extract(url: &str, format: CompressionFormat, target_dir: &Path) -> Result<()> {
|
||||
let client = reqwest::blocking::Client::builder().build()?;
|
||||
let response = client.get(url).send()?.error_for_status()?;
|
||||
let (downloaded_path, total_size) = resumable_download(url, target_dir)?;
|
||||
|
||||
let total_size = response.content_length().ok_or_else(|| {
|
||||
eyre::eyre!(
|
||||
"Server did not provide Content-Length header. This is required for snapshot downloads"
|
||||
)
|
||||
})?;
|
||||
info!(target: "reth::cli", "Extracting snapshot...");
|
||||
let file = fs::open(&downloaded_path)?;
|
||||
extract_archive(file, total_size, format, target_dir)?;
|
||||
|
||||
extract_archive(response, total_size, format, target_dir)
|
||||
fs::remove_file(&downloaded_path)?;
|
||||
info!(target: "reth::cli", "Removed downloaded archive");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Downloads and extracts a snapshot, blocking until finished.
|
||||
|
||||
@@ -153,7 +153,7 @@ async fn maintain_txpool_reorg() -> eyre::Result<()> {
|
||||
w1.address(),
|
||||
);
|
||||
let pooled_tx1 = EthPooledTransaction::new(tx1.clone(), 200);
|
||||
let tx_hash1 = *pooled_tx1.clone().hash();
|
||||
let tx_hash1 = *pooled_tx1.hash();
|
||||
|
||||
// build tx2 from wallet2
|
||||
let envelop2 = TransactionTestContext::transfer_tx(1, w2.clone()).await;
|
||||
@@ -162,7 +162,7 @@ async fn maintain_txpool_reorg() -> eyre::Result<()> {
|
||||
w2.address(),
|
||||
);
|
||||
let pooled_tx2 = EthPooledTransaction::new(tx2.clone(), 200);
|
||||
let tx_hash2 = *pooled_tx2.clone().hash();
|
||||
let tx_hash2 = *pooled_tx2.hash();
|
||||
|
||||
let block_info = BlockInfo {
|
||||
block_gas_limit: ETHEREUM_BLOCK_GAS_LIMIT_30M,
|
||||
|
||||
@@ -236,7 +236,7 @@ impl reth_codecs::Compact for Transaction {
|
||||
// # Panics
|
||||
//
|
||||
// A panic will be triggered if an identifier larger than 3 is passed from the database. For
|
||||
// optimism a identifier with value [`DEPOSIT_TX_TYPE_ID`] is allowed.
|
||||
// optimism an identifier with value [`DEPOSIT_TX_TYPE_ID`] is allowed.
|
||||
fn from_compact(buf: &[u8], identifier: usize) -> (Self, &[u8]) {
|
||||
let (tx_type, buf) = TxType::from_compact(buf, identifier);
|
||||
|
||||
|
||||
@@ -106,7 +106,7 @@ impl BanList {
|
||||
self.banned_ips.contains_key(ip)
|
||||
}
|
||||
|
||||
/// checks the ban list to see if it contains the given ip
|
||||
/// checks the ban list to see if it contains the given peer
|
||||
#[inline]
|
||||
pub fn is_banned_peer(&self, peer_id: &PeerId) -> bool {
|
||||
self.banned_peers.contains_key(peer_id)
|
||||
@@ -117,7 +117,7 @@ impl BanList {
|
||||
self.banned_ips.remove(ip);
|
||||
}
|
||||
|
||||
/// Unbans the ip address
|
||||
/// Unbans the peer
|
||||
pub fn unban_peer(&mut self, peer_id: &PeerId) {
|
||||
self.banned_peers.remove(peer_id);
|
||||
}
|
||||
|
||||
@@ -12,10 +12,18 @@ use reth_primitives_traits::{AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives
|
||||
use reth_revm::cached::CachedReads;
|
||||
use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
|
||||
use reth_tasks::TaskExecutor;
|
||||
use std::{sync::Arc, time::Instant};
|
||||
use tokio::sync::{oneshot, watch};
|
||||
use std::{
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tokio::{
|
||||
sync::{oneshot, watch},
|
||||
time::sleep,
|
||||
};
|
||||
use tracing::*;
|
||||
|
||||
const CONNECTION_BACKOUT_PERIOD: Duration = Duration::from_secs(5);
|
||||
|
||||
/// The `FlashBlockService` maintains an in-memory [`PendingFlashBlock`] built out of a sequence of
|
||||
/// [`FlashBlock`]s.
|
||||
#[derive(Debug)]
|
||||
@@ -167,7 +175,13 @@ where
|
||||
self.try_start_build_job();
|
||||
}
|
||||
Some(Err(err)) => {
|
||||
warn!(target: "flashblocks", %err, "Error receiving flashblock");
|
||||
warn!(
|
||||
target: "flashblocks",
|
||||
%err,
|
||||
retry_period = CONNECTION_BACKOUT_PERIOD.as_secs(),
|
||||
"Error receiving flashblock"
|
||||
);
|
||||
sleep(CONNECTION_BACKOUT_PERIOD).await;
|
||||
}
|
||||
None => {
|
||||
warn!(target: "flashblocks", "Flashblock stream ended");
|
||||
|
||||
@@ -1,19 +1,11 @@
|
||||
use crate::stages::utils::collect_history_indices;
|
||||
|
||||
use super::collect_account_history_indices;
|
||||
use super::{collect_account_history_indices, load_history_indices};
|
||||
use alloy_primitives::Address;
|
||||
use reth_config::config::{EtlConfig, IndexHistoryConfig};
|
||||
use reth_db_api::{
|
||||
cursor::DbCursorRO,
|
||||
models::{sharded_key::NUM_OF_INDICES_IN_SHARD, ShardedKey},
|
||||
table::{Decode, Decompress},
|
||||
tables,
|
||||
transaction::{DbTx, DbTxMut},
|
||||
BlockNumberList,
|
||||
};
|
||||
use reth_db_api::{models::ShardedKey, table::Decode, tables, transaction::DbTxMut};
|
||||
use reth_provider::{
|
||||
DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
|
||||
RocksDBProviderFactory, StorageSettingsCache,
|
||||
DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, StorageSettingsCache,
|
||||
};
|
||||
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
|
||||
use reth_stages_api::{
|
||||
@@ -61,8 +53,7 @@ where
|
||||
+ PruneCheckpointWriter
|
||||
+ reth_storage_api::ChangeSetReader
|
||||
+ reth_provider::StaticFileProviderFactory
|
||||
+ StorageSettingsCache
|
||||
+ RocksDBProviderFactory,
|
||||
+ StorageSettingsCache,
|
||||
{
|
||||
/// Return the id of the stage
|
||||
fn id(&self) -> StageId {
|
||||
@@ -111,26 +102,14 @@ where
|
||||
let mut range = input.next_block_range();
|
||||
let first_sync = input.checkpoint().block_number == 0;
|
||||
|
||||
// Check if we're using RocksDB for account history
|
||||
let use_rocksdb = provider.cached_storage_settings().account_history_in_rocksdb;
|
||||
|
||||
// On first sync we might have history coming from genesis. We clear the table since it's
|
||||
// faster to rebuild from scratch.
|
||||
if first_sync {
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if use_rocksdb {
|
||||
provider.rocksdb_provider().clear_table::<tables::AccountsHistory>()?;
|
||||
}
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
let _ = use_rocksdb;
|
||||
|
||||
if !use_rocksdb {
|
||||
provider.tx_ref().clear::<tables::AccountsHistory>()?;
|
||||
}
|
||||
provider.tx_ref().clear::<tables::AccountsHistory>()?;
|
||||
range = 0..=*input.next_block_range().end();
|
||||
}
|
||||
|
||||
info!(target: "sync::stages::index_account_history::exec", ?first_sync, ?use_rocksdb, "Collecting indices");
|
||||
info!(target: "sync::stages::index_account_history::exec", ?first_sync, "Collecting indices");
|
||||
|
||||
let collector = if provider.cached_storage_settings().account_changesets_in_static_files {
|
||||
// Use the provider-based collection that can read from static files.
|
||||
@@ -146,26 +125,14 @@ where
|
||||
};
|
||||
|
||||
info!(target: "sync::stages::index_account_history::exec", "Loading indices into database");
|
||||
|
||||
// Create RocksDB batch if feature is enabled
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb = provider.rocksdb_provider();
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb_batch = rocksdb.batch();
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
let rocksdb_batch = ();
|
||||
|
||||
// Create writer that routes to either MDBX or RocksDB based on settings
|
||||
let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?;
|
||||
|
||||
// Load indices using the writer
|
||||
load_account_history_indices_with_writer(provider, &mut writer, collector, first_sync)?;
|
||||
|
||||
// Extract and register RocksDB batch for commit at provider level
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if let Some(batch) = writer.into_raw_rocksdb_batch() {
|
||||
provider.set_pending_rocksdb_batch(batch);
|
||||
}
|
||||
load_history_indices::<_, tables::AccountsHistory, _>(
|
||||
provider,
|
||||
collector,
|
||||
first_sync,
|
||||
ShardedKey::new,
|
||||
ShardedKey::<Address>::decode_owned,
|
||||
|key| key.key,
|
||||
)?;
|
||||
|
||||
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
|
||||
}
|
||||
@@ -186,144 +153,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Loads account history indices from a collector into the database using an [`EitherWriter`].
|
||||
///
|
||||
/// This function processes entries from the collector, grouping indices by address and
|
||||
/// writing them as shards when they reach the maximum shard size. It handles merging
|
||||
/// with existing shards during incremental syncs.
|
||||
fn load_account_history_indices_with_writer<Provider, CURSOR, N>(
|
||||
provider: &Provider,
|
||||
writer: &mut EitherWriter<'_, CURSOR, N>,
|
||||
mut collector: reth_etl::Collector<ShardedKey<Address>, BlockNumberList>,
|
||||
append_only: bool,
|
||||
) -> Result<(), StageError>
|
||||
where
|
||||
Provider: DBProvider<Tx: DbTxMut> + StorageSettingsCache + RocksDBProviderFactory,
|
||||
CURSOR: reth_db_api::cursor::DbCursorRW<tables::AccountsHistory>
|
||||
+ reth_db_api::cursor::DbCursorRO<tables::AccountsHistory>,
|
||||
N: reth_primitives_traits::NodePrimitives,
|
||||
{
|
||||
let mut current_address = Address::ZERO;
|
||||
let mut current_list = Vec::<u64>::new();
|
||||
|
||||
// observability
|
||||
let total_entries = collector.len();
|
||||
let interval = (total_entries / 10).max(1);
|
||||
|
||||
for (index, element) in collector.iter()?.enumerate() {
|
||||
let (k, v) = element?;
|
||||
let sharded_key = ShardedKey::<Address>::decode_owned(k)?;
|
||||
let new_list = BlockNumberList::decompress_owned(v)?;
|
||||
|
||||
if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
|
||||
info!(target: "sync::stages::index_account_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
|
||||
}
|
||||
|
||||
let address = sharded_key.key;
|
||||
|
||||
if current_address != address {
|
||||
// We have reached the end of this address, flush remaining indices
|
||||
flush_account_history_shards(writer, current_address, &mut current_list, append_only)?;
|
||||
current_address = address;
|
||||
current_list.clear();
|
||||
|
||||
// If it's not the first sync, merge with existing last shard
|
||||
if !append_only &&
|
||||
let Some(existing_list) =
|
||||
get_last_account_history_shard(provider, current_address)?
|
||||
{
|
||||
current_list.extend(existing_list.iter());
|
||||
}
|
||||
}
|
||||
|
||||
current_list.extend(new_list.iter());
|
||||
|
||||
// Write full shards, keep the partial shard in memory
|
||||
write_full_shards(writer, current_address, &mut current_list)?;
|
||||
}
|
||||
|
||||
// Flush the last address's remaining shard
|
||||
flush_account_history_shards(writer, current_address, &mut current_list, append_only)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Retrieves the last shard for an account from `RocksDB` or MDBX based on storage settings.
|
||||
fn get_last_account_history_shard<Provider>(
|
||||
provider: &Provider,
|
||||
address: Address,
|
||||
) -> Result<Option<BlockNumberList>, StageError>
|
||||
where
|
||||
Provider: DBProvider + StorageSettingsCache + RocksDBProviderFactory,
|
||||
{
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().account_history_in_rocksdb {
|
||||
let rocksdb = provider.rocksdb_provider();
|
||||
let key = ShardedKey::last(address);
|
||||
return Ok(rocksdb.get::<tables::AccountsHistory>(key)?);
|
||||
}
|
||||
|
||||
// Read from MDBX
|
||||
let key = ShardedKey::last(address);
|
||||
let mut cursor = provider.tx_ref().cursor_read::<tables::AccountsHistory>()?;
|
||||
Ok(cursor.seek_exact(key)?.map(|(_, v)| v))
|
||||
}
|
||||
|
||||
/// Writes full shards from `list` to the database, draining them from the list.
|
||||
fn write_full_shards<CURSOR, N>(
|
||||
writer: &mut EitherWriter<'_, CURSOR, N>,
|
||||
address: Address,
|
||||
list: &mut Vec<u64>,
|
||||
) -> Result<(), StageError>
|
||||
where
|
||||
CURSOR: reth_db_api::cursor::DbCursorRW<tables::AccountsHistory>
|
||||
+ reth_db_api::cursor::DbCursorRO<tables::AccountsHistory>,
|
||||
N: reth_primitives_traits::NodePrimitives,
|
||||
{
|
||||
while list.len() > NUM_OF_INDICES_IN_SHARD {
|
||||
let chunk: Vec<u64> = list.drain(..NUM_OF_INDICES_IN_SHARD).collect();
|
||||
let highest = *chunk.last().expect("chunk is not empty");
|
||||
let key = ShardedKey::new(address, highest);
|
||||
let value = BlockNumberList::new_pre_sorted(chunk);
|
||||
writer.put_account_history(key, &value)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flushes all remaining shards to the database. The last shard gets `u64::MAX` as its highest
|
||||
/// block.
|
||||
fn flush_account_history_shards<CURSOR, N>(
|
||||
writer: &mut EitherWriter<'_, CURSOR, N>,
|
||||
address: Address,
|
||||
list: &mut Vec<u64>,
|
||||
append_only: bool,
|
||||
) -> Result<(), StageError>
|
||||
where
|
||||
CURSOR: reth_db_api::cursor::DbCursorRW<tables::AccountsHistory>
|
||||
+ reth_db_api::cursor::DbCursorRO<tables::AccountsHistory>,
|
||||
N: reth_primitives_traits::NodePrimitives,
|
||||
{
|
||||
if list.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
write_full_shards(writer, address, list)?;
|
||||
|
||||
// The last shard always uses u64::MAX
|
||||
if !list.is_empty() {
|
||||
let key = ShardedKey::last(address);
|
||||
let value = BlockNumberList::new_pre_sorted(list.drain(..));
|
||||
|
||||
// For incremental sync, delete the old last shard first (it will be replaced)
|
||||
if !append_only {
|
||||
writer.delete_account_history(key.clone())?;
|
||||
}
|
||||
writer.put_account_history(key, &value)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -817,187 +646,4 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
mod rocksdb_tests {
|
||||
use super::*;
|
||||
use reth_provider::RocksDBProviderFactory;
|
||||
use reth_storage_api::StorageSettings;
|
||||
|
||||
/// Test that when `account_history_in_rocksdb` is enabled, the stage
|
||||
/// writes account history indices to RocksDB instead of MDBX.
|
||||
#[tokio::test]
|
||||
async fn execute_writes_to_rocksdb_when_enabled() {
|
||||
// init
|
||||
let db = TestStageDB::default();
|
||||
|
||||
// Enable RocksDB for account history
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_account_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// setup - create changesets for blocks 1-10
|
||||
db.commit(|tx| {
|
||||
for block in 0..=10 {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
|
||||
)?;
|
||||
tx.put::<tables::AccountChangeSets>(block, acc())?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// Run stage
|
||||
let input = ExecInput { target: Some(10), ..Default::default() };
|
||||
let mut stage = IndexAccountHistoryStage::default();
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
let out = stage.execute(&provider, input).unwrap();
|
||||
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
|
||||
provider.commit().unwrap();
|
||||
|
||||
// Verify MDBX table is empty (data should be in RocksDB)
|
||||
let mdbx_table = db.table::<tables::AccountsHistory>().unwrap();
|
||||
assert!(
|
||||
mdbx_table.is_empty(),
|
||||
"MDBX AccountsHistory should be empty when RocksDB is enabled"
|
||||
);
|
||||
|
||||
// Verify RocksDB has the data
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(result.is_some(), "RocksDB should contain account history");
|
||||
|
||||
let block_list = result.unwrap();
|
||||
let blocks: Vec<u64> = block_list.iter().collect();
|
||||
assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
|
||||
}
|
||||
|
||||
/// Test that when `account_history_in_rocksdb` is enabled, the stage
|
||||
/// unwind deletes account history indices from RocksDB instead of MDBX.
|
||||
///
|
||||
/// Note: Full unwind support for RocksDB requires updates to the HistoryWriter trait
|
||||
/// implementation. This test verifies the basic execute functionality.
|
||||
#[tokio::test]
|
||||
async fn unwind_deletes_from_rocksdb_when_enabled() {
|
||||
// init
|
||||
let db = TestStageDB::default();
|
||||
|
||||
// Enable RocksDB for account history
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_account_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// setup - create changesets for blocks 1-10
|
||||
db.commit(|tx| {
|
||||
for block in 0..=10 {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
|
||||
)?;
|
||||
tx.put::<tables::AccountChangeSets>(block, acc())?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// Run stage to populate data
|
||||
let input = ExecInput { target: Some(10), ..Default::default() };
|
||||
let mut stage = IndexAccountHistoryStage::default();
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
let out = stage.execute(&provider, input).unwrap();
|
||||
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
|
||||
provider.commit().unwrap();
|
||||
|
||||
// Verify RocksDB has the data before unwind
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(result.is_some(), "RocksDB should have data before unwind");
|
||||
|
||||
// Unwind to block 5
|
||||
// Note: The current HistoryWriter implementation doesn't yet support RocksDB unwind.
|
||||
// This test verifies the unwind doesn't panic and properly updates checkpoints.
|
||||
let unwind_input =
|
||||
UnwindInput { checkpoint: StageCheckpoint::new(10), unwind_to: 5, bad_block: None };
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
let out = stage.unwind(&provider, unwind_input).unwrap();
|
||||
assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(5) });
|
||||
provider.commit().unwrap();
|
||||
|
||||
// Verify RocksDB data still exists (unwind currently uses MDBX path which doesn't
|
||||
// affect RocksDB). Once HistoryWriter is updated for RocksDB, this should verify
|
||||
// that blocks 6-10 are removed.
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(result.is_some(), "RocksDB should still have data");
|
||||
}
|
||||
|
||||
/// Test incremental sync - merging new data with existing data.
|
||||
#[tokio::test]
|
||||
async fn execute_incremental_sync() {
|
||||
// init
|
||||
let db = TestStageDB::default();
|
||||
|
||||
// Enable RocksDB for account history
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_account_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// setup - first batch of changesets
|
||||
db.commit(|tx| {
|
||||
for block in 0..=5 {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
|
||||
)?;
|
||||
tx.put::<tables::AccountChangeSets>(block, acc())?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// First sync (blocks 0-5)
|
||||
let input = ExecInput { target: Some(5), ..Default::default() };
|
||||
let mut stage = IndexAccountHistoryStage::default();
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
let out = stage.execute(&provider, input).unwrap();
|
||||
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
|
||||
provider.commit().unwrap();
|
||||
|
||||
// Verify first sync data in RocksDB
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(result.is_some());
|
||||
let blocks: Vec<u64> = result.unwrap().iter().collect();
|
||||
assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
|
||||
|
||||
// Add more changesets for blocks 6-10
|
||||
db.commit(|tx| {
|
||||
for block in 6..=10 {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
|
||||
)?;
|
||||
tx.put::<tables::AccountChangeSets>(block, acc())?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// Second sync (blocks 6-10)
|
||||
let input = ExecInput { target: Some(10), checkpoint: Some(StageCheckpoint::new(5)) };
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
let out = stage.execute(&provider, input).unwrap();
|
||||
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
|
||||
provider.commit().unwrap();
|
||||
|
||||
// Verify merged data - should have blocks 0-10
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(result.is_some(), "RocksDB should have merged data");
|
||||
let blocks: Vec<u64> = result.unwrap().iter().collect();
|
||||
assert_eq!(blocks, (0..=10).collect::<Vec<_>>(), "Should have merged blocks 0-10");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -217,7 +217,7 @@ impl MerkleChangeSets {
|
||||
let compute_cumulative_state_revert = |block_number: BlockNumber| -> HashedPostStateSorted {
|
||||
let mut cumulative_revert = HashedPostStateSorted::default();
|
||||
for n in (block_number..target_end).rev() {
|
||||
cumulative_revert.extend_ref(get_block_state_revert(n))
|
||||
cumulative_revert.extend_ref_and_sort(get_block_state_revert(n))
|
||||
}
|
||||
cumulative_revert
|
||||
};
|
||||
@@ -270,7 +270,7 @@ impl MerkleChangeSets {
|
||||
|
||||
let trie_overlay = Arc::clone(&nodes);
|
||||
let mut nodes_mut = Arc::unwrap_or_clone(nodes);
|
||||
nodes_mut.extend_ref(&this_trie_updates);
|
||||
nodes_mut.extend_ref_and_sort(&this_trie_updates);
|
||||
nodes = Arc::new(nodes_mut);
|
||||
|
||||
// Write the changesets to the DB using the trie updates produced by the block, and the
|
||||
|
||||
@@ -57,12 +57,12 @@ where
|
||||
let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
|
||||
let mut cache: HashMap<P, Vec<u64>> = HashMap::default();
|
||||
|
||||
let mut collect = |cache: &HashMap<P, Vec<u64>>| {
|
||||
for (key, indices) in cache {
|
||||
let last = indices.last().expect("qed");
|
||||
let mut collect = |cache: &mut HashMap<P, Vec<u64>>| {
|
||||
for (key, indices) in cache.drain() {
|
||||
let last = *indices.last().expect("qed");
|
||||
collector.insert(
|
||||
sharded_key_factory(*key, *last),
|
||||
BlockNumberList::new_pre_sorted(indices.iter().copied()),
|
||||
sharded_key_factory(key, last),
|
||||
BlockNumberList::new_pre_sorted(indices.into_iter()),
|
||||
)?;
|
||||
}
|
||||
Ok::<(), StageError>(())
|
||||
@@ -87,13 +87,12 @@ where
|
||||
current_block_number = block_number;
|
||||
flush_counter += 1;
|
||||
if flush_counter > DEFAULT_CACHE_THRESHOLD {
|
||||
collect(&cache)?;
|
||||
cache.clear();
|
||||
collect(&mut cache)?;
|
||||
flush_counter = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
collect(&cache)?;
|
||||
collect(&mut cache)?;
|
||||
|
||||
Ok(collector)
|
||||
}
|
||||
|
||||
@@ -69,7 +69,7 @@ pub fn generate_from_to(
|
||||
}
|
||||
}
|
||||
|
||||
/// Generates code to implement the `Compact` trait method `to_compact`.
|
||||
/// Generates code to implement the `Compact` trait method `from_compact`.
|
||||
fn generate_from_compact(
|
||||
fields: &FieldList,
|
||||
ident: &Ident,
|
||||
@@ -155,7 +155,7 @@ fn generate_from_compact(
|
||||
}
|
||||
}
|
||||
|
||||
/// Generates code to implement the `Compact` trait method `from_compact`.
|
||||
/// Generates code to implement the `Compact` trait method `to_compact`.
|
||||
fn generate_to_compact(
|
||||
fields: &FieldList,
|
||||
ident: &Ident,
|
||||
|
||||
@@ -175,7 +175,7 @@ fn should_use_alt_impl(ftype: &str, segment: &syn::PathSegment) -> bool {
|
||||
let syn::PathArguments::AngleBracketed(ref args) = segment.arguments &&
|
||||
let Some(syn::GenericArgument::Type(syn::Type::Path(arg_path))) = args.args.last() &&
|
||||
let (Some(path), 1) = (arg_path.path.segments.first(), arg_path.path.segments.len()) &&
|
||||
["B256", "Address", "Address", "Bloom", "TxHash", "BlockHash", "CompactPlaceholder"]
|
||||
["B256", "Address", "Bloom", "TxHash", "BlockHash", "CompactPlaceholder"]
|
||||
.iter()
|
||||
.any(|&s| path.ident == s)
|
||||
{
|
||||
|
||||
@@ -312,7 +312,7 @@ where
|
||||
}
|
||||
|
||||
/// Position at first key-value pair greater than or equal to specified, return both key and
|
||||
/// data, and the return code depends on a exact match.
|
||||
/// data, and the return code depends on an exact match.
|
||||
///
|
||||
/// For non DupSort-ed collections this works the same as [`Self::set_range()`], but returns
|
||||
/// [false] if key found exactly and [true] if greater key was found.
|
||||
|
||||
@@ -37,12 +37,12 @@ impl<H: NippyJarHeader> NippyJarChecker<H> {
|
||||
Self { jar, data_file: None, offsets_file: None }
|
||||
}
|
||||
|
||||
/// It will throw an error if the [`NippyJar`] is in a inconsistent state.
|
||||
/// It will throw an error if the [`NippyJar`] is in an inconsistent state.
|
||||
pub fn check_consistency(&mut self) -> Result<(), NippyJarError> {
|
||||
self.handle_consistency(ConsistencyFailStrategy::ThrowError)
|
||||
}
|
||||
|
||||
/// It will attempt to heal if the [`NippyJar`] is in a inconsistent state.
|
||||
/// It will attempt to heal if the [`NippyJar`] is in an inconsistent state.
|
||||
///
|
||||
/// **ATTENTION**: disk commit should be handled externally by consuming `Self`
|
||||
pub fn ensure_consistency(&mut self) -> Result<(), NippyJarError> {
|
||||
|
||||
@@ -582,7 +582,8 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
let mut result = blocks_iter.next().expect("non-empty").trie_updates();
|
||||
|
||||
for block in blocks_iter {
|
||||
Arc::make_mut(&mut result).extend_ref(block.trie_updates().as_ref());
|
||||
Arc::make_mut(&mut result)
|
||||
.extend_ref_and_sort(block.trie_updates().as_ref());
|
||||
}
|
||||
|
||||
match Arc::try_unwrap(result) {
|
||||
|
||||
@@ -514,34 +514,6 @@ impl RocksDBProvider {
|
||||
})
|
||||
}
|
||||
|
||||
/// Clears all entries from the specified table.
|
||||
///
|
||||
/// This iterates over all keys and deletes them in a single batch.
|
||||
pub fn clear_table<T: Table>(&self) -> ProviderResult<()> {
|
||||
self.execute_with_operation_metric(RocksDBOperation::BatchWrite, T::NAME, |this| {
|
||||
let cf = this.get_cf_handle::<T>()?;
|
||||
let mut batch = WriteBatchWithTransaction::<true>::default();
|
||||
|
||||
let iter = this.0.db.iterator_cf(cf, IteratorMode::Start);
|
||||
for result in iter {
|
||||
let (key, _) = result.map_err(|e| {
|
||||
ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
|
||||
message: e.to_string().into(),
|
||||
code: -1,
|
||||
}))
|
||||
})?;
|
||||
batch.delete_cf(cf, &key);
|
||||
}
|
||||
|
||||
this.0.db.write_opt(batch, &WriteOptions::default()).map_err(|e| {
|
||||
ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
|
||||
message: e.to_string().into(),
|
||||
code: -1,
|
||||
}))
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/// Writes all `RocksDB` data for multiple blocks in parallel.
|
||||
///
|
||||
/// This handles transaction hash numbers, account history, and storage history based on
|
||||
|
||||
@@ -291,7 +291,7 @@ impl<Provider: DBProvider + ChangeSetReader + BlockNumReader> StateRootProvider
|
||||
fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
|
||||
let mut revert_state = self.revert_state()?;
|
||||
let hashed_state_sorted = hashed_state.into_sorted();
|
||||
revert_state.extend_ref(&hashed_state_sorted);
|
||||
revert_state.extend_ref_and_sort(&hashed_state_sorted);
|
||||
Ok(StateRoot::overlay_root(self.tx(), &revert_state)?)
|
||||
}
|
||||
|
||||
@@ -306,7 +306,7 @@ impl<Provider: DBProvider + ChangeSetReader + BlockNumReader> StateRootProvider
|
||||
) -> ProviderResult<(B256, TrieUpdates)> {
|
||||
let mut revert_state = self.revert_state()?;
|
||||
let hashed_state_sorted = hashed_state.into_sorted();
|
||||
revert_state.extend_ref(&hashed_state_sorted);
|
||||
revert_state.extend_ref_and_sort(&hashed_state_sorted);
|
||||
Ok(StateRoot::overlay_root_with_updates(self.tx(), &revert_state)?)
|
||||
}
|
||||
|
||||
|
||||
@@ -163,12 +163,12 @@ impl<F> OverlayStateProviderFactory<F> {
|
||||
pub fn with_extended_hashed_state_overlay(mut self, other: HashedPostStateSorted) -> Self {
|
||||
match &mut self.overlay_source {
|
||||
Some(OverlaySource::Immediate { state, .. }) => {
|
||||
Arc::make_mut(state).extend_ref(&other);
|
||||
Arc::make_mut(state).extend_ref_and_sort(&other);
|
||||
}
|
||||
Some(OverlaySource::Lazy(lazy)) => {
|
||||
// Resolve lazy overlay and convert to immediate with extension
|
||||
let (trie, mut state) = lazy.as_overlay();
|
||||
Arc::make_mut(&mut state).extend_ref(&other);
|
||||
Arc::make_mut(&mut state).extend_ref_and_sort(&other);
|
||||
self.overlay_source = Some(OverlaySource::Immediate { trie, state });
|
||||
}
|
||||
None => {
|
||||
@@ -342,7 +342,7 @@ where
|
||||
let trie_updates = if trie_reverts.is_empty() {
|
||||
overlay_trie
|
||||
} else if !overlay_trie.is_empty() {
|
||||
trie_reverts.extend_ref(&overlay_trie);
|
||||
trie_reverts.extend_ref_and_sort(&overlay_trie);
|
||||
Arc::new(trie_reverts)
|
||||
} else {
|
||||
Arc::new(trie_reverts)
|
||||
@@ -351,7 +351,7 @@ where
|
||||
let hashed_state_updates = if hashed_state_reverts.is_empty() {
|
||||
overlay_state
|
||||
} else if !overlay_state.is_empty() {
|
||||
hashed_state_reverts.extend_ref(&overlay_state);
|
||||
hashed_state_reverts.extend_ref_and_sort(&overlay_state);
|
||||
Arc::new(hashed_state_reverts)
|
||||
} else {
|
||||
Arc::new(hashed_state_reverts)
|
||||
|
||||
@@ -621,7 +621,9 @@ impl HashedPostStateSorted {
|
||||
|
||||
/// Extends this state with contents of another sorted state.
|
||||
/// Entries in `other` take precedence for duplicate keys.
|
||||
pub fn extend_ref(&mut self, other: &Self) {
|
||||
///
|
||||
/// Sorts the accounts after extending. Sorts the storage after extending, for each account.
|
||||
pub fn extend_ref_and_sort(&mut self, other: &Self) {
|
||||
// Extend accounts
|
||||
extend_sorted_vec(&mut self.accounts, &other.accounts);
|
||||
|
||||
@@ -1416,7 +1418,7 @@ mod tests {
|
||||
storages: B256Map::default(),
|
||||
};
|
||||
|
||||
state1.extend_ref(&state2);
|
||||
state1.extend_ref_and_sort(&state2);
|
||||
|
||||
// Check accounts are merged and sorted
|
||||
assert_eq!(state1.accounts.len(), 6);
|
||||
|
||||
@@ -605,7 +605,10 @@ impl TrieUpdatesSorted {
|
||||
/// This merges the account nodes and storage tries from `other` into `self`.
|
||||
/// Account nodes are merged and re-sorted, with `other`'s values taking precedence
|
||||
/// for duplicate keys.
|
||||
pub fn extend_ref(&mut self, other: &Self) {
|
||||
///
|
||||
/// Sorts the account nodes after extending. Sorts the storage tries after extending, for each
|
||||
/// storage trie.
|
||||
pub fn extend_ref_and_sort(&mut self, other: &Self) {
|
||||
// Extend account nodes
|
||||
extend_sorted_vec(&mut self.account_nodes, &other.account_nodes);
|
||||
|
||||
@@ -834,7 +837,7 @@ mod tests {
|
||||
// Test extending with empty updates
|
||||
let mut updates1 = TrieUpdatesSorted::default();
|
||||
let updates2 = TrieUpdatesSorted::default();
|
||||
updates1.extend_ref(&updates2);
|
||||
updates1.extend_ref_and_sort(&updates2);
|
||||
assert_eq!(updates1.account_nodes.len(), 0);
|
||||
assert_eq!(updates1.storage_tries.len(), 0);
|
||||
|
||||
@@ -853,7 +856,7 @@ mod tests {
|
||||
],
|
||||
storage_tries: B256Map::default(),
|
||||
};
|
||||
updates1.extend_ref(&updates2);
|
||||
updates1.extend_ref_and_sort(&updates2);
|
||||
assert_eq!(updates1.account_nodes.len(), 3);
|
||||
// Should be sorted: 0x01, 0x02, 0x03
|
||||
assert_eq!(updates1.account_nodes[0].0, Nibbles::from_nibbles_unchecked([0x01]));
|
||||
@@ -889,7 +892,7 @@ mod tests {
|
||||
(hashed_address2, storage_trie1),
|
||||
]),
|
||||
};
|
||||
updates1.extend_ref(&updates2);
|
||||
updates1.extend_ref_and_sort(&updates2);
|
||||
assert_eq!(updates1.storage_tries.len(), 2);
|
||||
assert!(updates1.storage_tries.contains_key(&hashed_address1));
|
||||
assert!(updates1.storage_tries.contains_key(&hashed_address2));
|
||||
|
||||
@@ -87,7 +87,7 @@ where
|
||||
|
||||
// This reverts all changes from db tip back to just after block-1 was processed
|
||||
let mut cumulative_state_revert_prev = cumulative_state_revert.clone();
|
||||
cumulative_state_revert_prev.extend_ref(&individual_state_revert);
|
||||
cumulative_state_revert_prev.extend_ref_and_sort(&individual_state_revert);
|
||||
|
||||
// Step 2: Calculate cumulative trie updates revert for block-1
|
||||
// This gives us the trie state as it was after block-1 was processed
|
||||
@@ -469,7 +469,7 @@ impl ChangesetCache {
|
||||
// Since we iterate newest to oldest, older values are added last
|
||||
// and overwrite any conflicting newer values (oldest changeset values take
|
||||
// precedence).
|
||||
accumulated_reverts.extend_ref(&changesets);
|
||||
accumulated_reverts.extend_ref_and_sort(&changesets);
|
||||
}
|
||||
|
||||
let elapsed = timer.elapsed();
|
||||
|
||||
@@ -13,9 +13,9 @@
|
||||
- It also enables [out-of-the-box fuzzing](https://github.com/paradigmxyz/reth/blob/main/crates/storage/db-api/src/tables/codecs/fuzz/mod.rs) using [trailofbits/test-fuzz](https://github.com/trailofbits/test-fuzz).
|
||||
- We implemented that trait for the following encoding formats:
|
||||
- [Ethereum-specific Compact Encoding](https://github.com/paradigmxyz/reth/blob/main/crates/storage/codecs/derive/src/compact/mod.rs): A lot of Ethereum datatypes have unnecessary zeros when serialized, or optional (e.g. on empty hashes) which would be nice not to pay in storage costs.
|
||||
- [Erigon](https://github.com/ledgerwatch/erigon/blob/12ee33a492f5d240458822d052820d9998653a63/docs/programmers_guide/db_walkthrough.MD) achieves that by having a `bitfield` set on Table "PlainState which adds a bitfield to Accounts.
|
||||
- [Erigon](https://github.com/ledgerwatch/erigon/blob/12ee33a492f5d240458822d052820d9998653a63/docs/programmers_guide/db_walkthrough.MD) achieves that by having a `bitfield` set on Table "PlainState" which adds a bitfield to Accounts.
|
||||
- [Akula](https://github.com/akula-bft/akula/) expanded it for other tables and datatypes manually. It also saved some more space by storing the length of certain types (U256, u64) using the [`modular_bitfield`](https://docs.rs/modular-bitfield/latest/modular_bitfield/) crate, which compacts this information.
|
||||
- We generalized it for all types, by writing a derive macro that autogenerates code for implementing the trait. It, also generates the interfaces required for fuzzing using ToB/test-fuzz:
|
||||
- We generalized it for all types, by writing a derive macro that autogenerates code for implementing the trait. It also generates the interfaces required for fuzzing using ToB/test-fuzz:
|
||||
- [Scale Encoding](https://github.com/paritytech/parity-scale-codec)
|
||||
- [Postcard Encoding](https://github.com/jamesmunns/postcard)
|
||||
- Passthrough (called `no_codec` in the codebase)
|
||||
|
||||
@@ -23,6 +23,7 @@ Generally, reth is composed of a few components, with supporting crates. The mai
|
||||
- [Payloads](#payloads)
|
||||
- [Primitives](#primitives)
|
||||
- [Optimism](#optimism)
|
||||
- [Ethereum](#ethereum-specific-crates)
|
||||
- [Misc](#misc)
|
||||
|
||||
The supporting crates are split into two categories: [primitives](#primitives) and [miscellaneous](#misc).
|
||||
@@ -181,7 +182,7 @@ These crates define primitive types or algorithms.
|
||||
|
||||
Crates related to the Optimism rollup live in [optimism](../../crates/optimism/).
|
||||
|
||||
#### Ethereum-Specific Crates
|
||||
### Ethereum-Specific Crates
|
||||
|
||||
Ethereum mainnet-specific implementations and primitives live in `crates/ethereum/`.
|
||||
|
||||
|
||||
@@ -52,6 +52,23 @@ reth node \
|
||||
--authrpc.port 8551
|
||||
```
|
||||
|
||||
### Minimal Storage Mode
|
||||
|
||||
To run Reth in minimal storage mode, follow the steps from the previous chapter on
|
||||
[how to run on mainnet or official testnets](/run/ethereum), and add a `--minimal` flag. For example:
|
||||
|
||||
```bash
|
||||
reth node \
|
||||
--minimal \
|
||||
--authrpc.jwtsecret /path/to/secret \
|
||||
--authrpc.addr 127.0.0.1 \
|
||||
--authrpc.port 8551
|
||||
```
|
||||
|
||||
Minimal storage mode is a preconfigured pruned node profile that aims to minimize disk usage by fully
|
||||
pruning sender recovery, transaction lookup, and receipts, and by keeping only the last 10,064 blocks
|
||||
of account history, storage history, and block bodies with smaller static file segments.
|
||||
|
||||
## Size
|
||||
|
||||
All numbers are as of April 2024 at block number 19.6M for mainnet.
|
||||
|
||||
91
docs/vocs/docs/pages/run/storage.mdx
Normal file
91
docs/vocs/docs/pages/run/storage.mdx
Normal file
@@ -0,0 +1,91 @@
|
||||
---
|
||||
description: Configure RocksDB table routing for optimal performance.
|
||||
---
|
||||
|
||||
# Storage Configuration
|
||||
|
||||
Reth uses a hybrid storage approach with MDBX as the primary database and optional RocksDB for specific tables. This page documents how to configure which tables are stored in RocksDB.
|
||||
|
||||
## Overview
|
||||
|
||||
By default, all tables are stored in MDBX. However, certain large tables can benefit from RocksDB's performance characteristics. Storage settings allow you to route specific tables to RocksDB for improved performance on high-throughput workloads.
|
||||
|
||||
:::warning
|
||||
These settings can only be configured at genesis initialization. Once the node has been initialized, changing these settings requires a full re-sync from scratch.
|
||||
:::
|
||||
|
||||
## RocksDB CLI Flags
|
||||
|
||||
All RocksDB flags require an explicit boolean value (`true` or `false`). They cannot be used as bare flags.
|
||||
|
||||
### `--rocksdb.tx-hash`
|
||||
|
||||
Route the transaction hash to number mapping table to RocksDB instead of MDBX.
|
||||
|
||||
```bash
|
||||
reth node --rocksdb.tx-hash=true
|
||||
```
|
||||
|
||||
**Default:** `false` (stored in MDBX)
|
||||
|
||||
**Use case:** This table maps transaction hashes to their sequential transaction IDs. Moving it to RocksDB can improve lookup performance for high-volume transaction queries.
|
||||
|
||||
### `--rocksdb.storages-history`
|
||||
|
||||
Route storage history tables to RocksDB instead of MDBX.
|
||||
|
||||
```bash
|
||||
reth node --rocksdb.storages-history=true
|
||||
```
|
||||
|
||||
**Default:** `false` (stored in MDBX)
|
||||
|
||||
**Use case:** Storage history tracks historical values of contract storage slots. This table can grow very large on archive nodes. RocksDB may provide better write performance for this workload.
|
||||
|
||||
### `--rocksdb.account-history`
|
||||
|
||||
Route account history tables to RocksDB instead of MDBX.
|
||||
|
||||
```bash
|
||||
reth node --rocksdb.account-history=true
|
||||
```
|
||||
|
||||
**Default:** `false` (stored in MDBX)
|
||||
|
||||
**Use case:** Account history tracks historical account states. Similar to storage history, this table grows significantly on archive nodes and may benefit from RocksDB's handling of large datasets.
|
||||
|
||||
## Example
|
||||
|
||||
```bash
|
||||
reth node --rocksdb.tx-hash=true --rocksdb.storages-history=true --rocksdb.account-history=true
|
||||
```
|
||||
|
||||
## RocksDB Data Directory
|
||||
|
||||
When using RocksDB tables, the data is stored in a separate directory. You can customize this location using the `--datadir.rocksdb` flag:
|
||||
|
||||
```bash
|
||||
reth node --datadir.rocksdb /path/to/rocksdb
|
||||
```
|
||||
|
||||
If not specified, RocksDB data is stored in a subdirectory of your main data directory.
|
||||
|
||||
## When to Use RocksDB Tables
|
||||
|
||||
Consider enabling RocksDB for specific tables when:
|
||||
|
||||
- Running an **archive node** with full historical data
|
||||
- Experiencing **write performance bottlenecks** with large history tables
|
||||
- Performing **high-volume transaction lookups** that stress the transaction hash table
|
||||
|
||||
For most use cases, the default MDBX configuration provides excellent performance. Only enable RocksDB tables if you have measured performance issues with specific workloads.
|
||||
|
||||
## Migration Notes
|
||||
|
||||
Since these settings are genesis-only:
|
||||
|
||||
1. **New nodes:** Configure storage settings before first sync
|
||||
2. **Existing nodes:** Requires full re-sync to change storage settings
|
||||
3. **Backup:** Always backup your data before re-syncing
|
||||
|
||||
The storage settings are persisted to ensure consistency across restarts. Attempting to start a node with different storage settings than what was used during initialization will result in an error.
|
||||
@@ -100,6 +100,10 @@ export const sidebar: SidebarItem[] = [
|
||||
text: "Configuration",
|
||||
link: "/run/configuration"
|
||||
},
|
||||
{
|
||||
text: "Storage",
|
||||
link: "/run/storage"
|
||||
},
|
||||
{
|
||||
text: "Monitoring",
|
||||
link: "/run/monitoring"
|
||||
|
||||
@@ -21,7 +21,7 @@ export default defineConfig({
|
||||
},
|
||||
{ text: 'GitHub', link: 'https://github.com/paradigmxyz/reth' },
|
||||
{
|
||||
text: 'v1.10.0',
|
||||
text: 'v1.10.1',
|
||||
items: [
|
||||
{
|
||||
text: 'Releases',
|
||||
|
||||
Reference in New Issue
Block a user