feat: snapshots (#5738)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
Co-authored-by: Bjerg <onbjerg@users.noreply.github.com>
This commit is contained in:
joshieDo
2023-12-18 15:55:43 +00:00
committed by GitHub
parent 900fe7ea4e
commit 18dd1b72a1
28 changed files with 1749 additions and 453 deletions

1
Cargo.lock generated
View File

@@ -6292,6 +6292,7 @@ dependencies = [
"reth-primitives",
"reth-trie",
"revm",
"strum",
"tempfile",
"tokio",
"tokio-stream",

View File

@@ -50,7 +50,7 @@ impl Command {
let path: PathBuf = SnapshotSegment::Headers
.filename_with_configuration(filters, compression, &block_range, &tx_range)
.into();
let provider = SnapshotProvider::default();
let provider = SnapshotProvider::new(PathBuf::default())?;
let jar_provider = provider.get_segment_provider_from_block(
SnapshotSegment::Headers,
self.from,

View File

@@ -4,7 +4,7 @@ use itertools::Itertools;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use reth_db::{database::Database, open_db_read_only, DatabaseEnv};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::NippyJar;
use reth_nippy_jar::{NippyJar, NippyJarCursor};
use reth_primitives::{
snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentHeader},
BlockNumber, ChainSpec, SnapshotSegment,
@@ -210,19 +210,18 @@ impl Command {
fn stats(&self, snapshots: Vec<impl AsRef<Path>>) -> eyre::Result<()> {
let mut total_filters_size = 0;
let mut total_index_size = 0;
let mut total_offsets_size = 0;
let mut total_duration = Duration::new(0, 0);
let mut total_file_size = 0;
for snap in &snapshots {
let start_time = Instant::now();
let jar = NippyJar::<SegmentHeader>::load(snap.as_ref())?;
let _cursor = NippyJarCursor::new(&jar)?;
let duration = start_time.elapsed();
let file_size = snap.as_ref().metadata()?.len();
total_filters_size += jar.filter_size();
total_index_size += jar.offsets_index_size();
total_offsets_size += jar.offsets_size();
total_duration += duration;
total_file_size += file_size;
@@ -230,7 +229,6 @@ impl Command {
println!(" File Size: {:>7}", human_bytes(file_size as f64));
println!(" Filters Size: {:>7}", human_bytes(jar.filter_size() as f64));
println!(" Offset Index Size: {:>7}", human_bytes(jar.offsets_index_size() as f64));
println!(" Offset List Size: {:>7}", human_bytes(jar.offsets_size() as f64));
println!(
" Loading Time: {:>7.2} ms | {:>7.2} µs",
duration.as_millis() as f64,
@@ -242,7 +240,6 @@ impl Command {
println!("Total Filters Size: {:>7}", human_bytes(total_filters_size as f64));
println!("Total Offset Index Size: {:>7}", human_bytes(total_index_size as f64));
println!("Total Offset List Size: {:>7}", human_bytes(total_offsets_size as f64));
println!("Total File Size: {:>7}", human_bytes(total_file_size as f64));
println!(
"Average Loading Time: {:>7.2} ms | {:>7.2} µs",

View File

@@ -53,7 +53,7 @@ impl Command {
.filename_with_configuration(filters, compression, &block_range, &tx_range)
.into();
let provider = SnapshotProvider::default();
let provider = SnapshotProvider::new(PathBuf::default())?;
let jar_provider = provider.get_segment_provider_from_block(
SnapshotSegment::Receipts,
self.from,

View File

@@ -50,7 +50,7 @@ impl Command {
let path: PathBuf = SnapshotSegment::Transactions
.filename_with_configuration(filters, compression, &block_range, &tx_range)
.into();
let provider = SnapshotProvider::default();
let provider = SnapshotProvider::new(PathBuf::default())?;
let jar_provider = provider.get_segment_provider_from_block(
SnapshotSegment::Transactions,
self.from,

View File

@@ -266,7 +266,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
)?;
provider_factory = provider_factory
.with_snapshots(data_dir.snapshots_path(), snapshotter.highest_snapshot_receiver());
.with_snapshots(data_dir.snapshots_path(), snapshotter.highest_snapshot_receiver())?;
self.start_metrics_endpoint(prometheus_handle, Arc::clone(&db)).await?;

View File

@@ -14,6 +14,9 @@ pub enum ProviderError {
/// Database error.
#[error(transparent)]
Database(#[from] crate::db::DatabaseError),
/// Filesystem path error.
#[error("{0}")]
FsPathError(String),
/// Nippy jar error.
#[error("nippy jar error: {0}")]
NippyJar(String),
@@ -127,6 +130,12 @@ impl From<reth_nippy_jar::NippyJarError> for ProviderError {
}
}
impl From<reth_primitives::fs::FsPathError> for ProviderError {
fn from(err: reth_primitives::fs::FsPathError) -> Self {
ProviderError::FsPathError(err.to_string())
}
}
/// A root mismatch error at a given block height.
#[derive(Clone, Debug, Error, PartialEq, Eq)]
#[error("root mismatch at #{block_number} ({block_hash}): {root}")]

View File

@@ -4,14 +4,11 @@ mod compression;
mod filters;
mod segment;
use alloy_primitives::{BlockNumber, TxNumber};
use alloy_primitives::BlockNumber;
pub use compression::Compression;
pub use filters::{Filters, InclusionFilter, PerfectHashingFunction};
pub use segment::{SegmentConfig, SegmentHeader, SnapshotSegment};
use crate::fs::FsPathError;
use std::{ops::RangeInclusive, path::Path};
/// Default snapshot block count.
pub const BLOCKS_PER_SNAPSHOT: u64 = 500_000;
@@ -48,20 +45,3 @@ impl HighestSnapshots {
}
}
}
/// Given the snapshot's location, it returns an iterator over the existing snapshots in the format
/// of a tuple composed by the segment, block range and transaction range.
pub fn iter_snapshots(
path: impl AsRef<Path>,
) -> Result<
impl Iterator<Item = (SnapshotSegment, RangeInclusive<BlockNumber>, RangeInclusive<TxNumber>)>,
FsPathError,
> {
// A `read_dir` failure on the directory itself aborts with `FsPathError`;
// errors on individual entries are silently skipped via `filter_map(Result::ok)`.
let entries = crate::fs::read_dir(path.as_ref())?.filter_map(Result::ok);
Ok(entries.filter_map(|entry| {
// Only regular files are considered; `parse_filename` returning `None`
// (a filename that is not a snapshot) drops the entry from the iterator.
if entry.metadata().map_or(false, |metadata| metadata.is_file()) {
return SnapshotSegment::parse_filename(&entry.file_name())
}
None
}))
}

View File

@@ -5,7 +5,7 @@ use crate::{
use derive_more::Display;
use serde::{Deserialize, Serialize};
use std::{ffi::OsStr, ops::RangeInclusive, str::FromStr};
use strum::{AsRefStr, EnumString};
use strum::{AsRefStr, EnumIter, EnumString};
#[derive(
Debug,
@@ -19,6 +19,7 @@ use strum::{AsRefStr, EnumString};
Deserialize,
Serialize,
EnumString,
EnumIter,
AsRefStr,
Display,
)]
@@ -153,6 +154,16 @@ impl SegmentHeader {
Self { block_range, tx_range, segment }
}
/// Returns the transaction range.
pub fn tx_range(&self) -> &RangeInclusive<TxNumber> {
&self.tx_range
}
/// Returns the block range.
pub fn block_range(&self) -> &RangeInclusive<BlockNumber> {
&self.block_range
}
/// Returns the first block number of the segment.
pub fn block_start(&self) -> BlockNumber {
*self.block_range.start()

View File

@@ -1,12 +1,9 @@
//! Support for snapshotting.
use crate::{segments, segments::Segment, SnapshotterError};
use reth_db::database::Database;
use reth_db::{database::Database, snapshot::iter_snapshots};
use reth_interfaces::{RethError, RethResult};
use reth_primitives::{
snapshot::{iter_snapshots, HighestSnapshots},
BlockNumber, TxNumber,
};
use reth_primitives::{snapshot::HighestSnapshots, BlockNumber, TxNumber};
use reth_provider::{BlockReader, DatabaseProviderRO, ProviderFactory, TransactionsProviderExt};
use std::{
collections::HashMap,
@@ -155,10 +152,14 @@ impl<DB: Database> Snapshotter<DB> {
// It walks over the directory and parses the snapshot filenames extracting
// `SnapshotSegment` and their inclusive range. It then takes the maximum block
// number for each specific segment.
for (segment, block_range, _) in iter_snapshots(&self.snapshots_path)? {
let max_segment_block = self.highest_snapshots.as_mut(segment);
if max_segment_block.map_or(true, |block| block < *block_range.end()) {
*max_segment_block = Some(*block_range.end());
for (segment, ranges) in
iter_snapshots(&self.snapshots_path).map_err(|err| RethError::Provider(err.into()))?
{
for (block_range, _) in ranges {
let max_segment_block = self.highest_snapshots.as_mut(segment);
if max_segment_block.map_or(true, |block| block < *block_range.end()) {
*max_segment_block = Some(*block_range.end());
}
}
}

View File

@@ -2,8 +2,9 @@ use super::mask::{ColumnSelectorOne, ColumnSelectorThree, ColumnSelectorTwo};
use crate::table::Decompress;
use derive_more::{Deref, DerefMut};
use reth_interfaces::provider::ProviderResult;
use reth_nippy_jar::{MmapHandle, NippyJar, NippyJarCursor};
use reth_nippy_jar::{DataReader, NippyJar, NippyJarCursor};
use reth_primitives::{snapshot::SegmentHeader, B256};
use std::sync::Arc;
/// Cursor of a snapshot segment.
#[derive(Debug, Deref, DerefMut)]
@@ -11,8 +12,8 @@ pub struct SnapshotCursor<'a>(NippyJarCursor<'a, SegmentHeader>);
impl<'a> SnapshotCursor<'a> {
/// Returns a new [`SnapshotCursor`].
pub fn new(jar: &'a NippyJar<SegmentHeader>, mmap_handle: MmapHandle) -> ProviderResult<Self> {
Ok(Self(NippyJarCursor::with_handle(jar, mmap_handle)?))
pub fn new(jar: &'a NippyJar<SegmentHeader>, reader: Arc<DataReader>) -> ProviderResult<Self> {
Ok(Self(NippyJarCursor::with_reader(jar, reader)?))
}
/// Returns the current `BlockNumber` or `TxNumber` of the cursor depending on the kind of

View File

@@ -6,9 +6,8 @@ use crate::{
};
use reth_interfaces::provider::ProviderResult;
use reth_nippy_jar::{ColumnResult, NippyJar, PHFKey};
use reth_nippy_jar::{ColumnResult, NippyJar, NippyJarHeader, PHFKey};
use reth_tracing::tracing::*;
use serde::{Deserialize, Serialize};
use std::{error::Error as StdError, ops::RangeInclusive};
/// Macro that generates snapshot creation functions that take an arbitrary number of [`Table`] and
@@ -35,7 +34,7 @@ macro_rules! generate_snapshot_func {
pub fn [<create_snapshot$(_ $tbl)+>]<
$($tbl: Table<Key=K>,)+
K,
H: for<'a> Deserialize<'a> + Send + Serialize + Sync + std::fmt::Debug
H: NippyJarHeader
>
(
tx: &impl DbTx,

View File

@@ -1,6 +1,12 @@
//! reth's snapshot database table import and access
mod generation;
use std::{
collections::{hash_map::Entry, HashMap},
ops::RangeInclusive,
path::Path,
};
pub use generation::*;
mod cursor;
@@ -8,5 +14,63 @@ pub use cursor::SnapshotCursor;
mod mask;
pub use mask::*;
use reth_nippy_jar::{NippyJar, NippyJarError};
use reth_primitives::{snapshot::SegmentHeader, BlockNumber, SnapshotSegment, TxNumber};
mod masks;
/// Alias type for a map of [`SnapshotSegment`] and sorted lists of existing snapshot ranges.
type SortedSnapshots =
HashMap<SnapshotSegment, Vec<(RangeInclusive<BlockNumber>, RangeInclusive<TxNumber>)>>;
/// Given the snapshots directory path, it returns a list over the existing snapshots organized by
/// [`SnapshotSegment`]. Each segment has a sorted list of block ranges and transaction ranges.
pub fn iter_snapshots(path: impl AsRef<Path>) -> Result<SortedSnapshots, NippyJarError> {
let mut static_files = SortedSnapshots::default();
// Collect entries eagerly; a failure to read the directory itself is surfaced as
// `NippyJarError::Custom`, while errors on individual entries are silently skipped.
let entries = reth_primitives::fs::read_dir(path.as_ref())
.map_err(|err| NippyJarError::Custom(err.to_string()))?
.filter_map(Result::ok)
.collect::<Vec<_>>();
for entry in entries {
// Only regular files are considered; directories and unreadable entries are ignored.
if entry.metadata().map_or(false, |metadata| metadata.is_file()) {
// Filenames that do not parse as a snapshot filename are skipped.
if let Some((segment, block_range, tx_range)) =
SnapshotSegment::parse_filename(&entry.file_name())
{
let ranges = (block_range, tx_range);
// Group each (block range, tx range) pair under its segment.
match static_files.entry(segment) {
Entry::Occupied(mut entry) => {
entry.get_mut().push(ranges);
}
Entry::Vacant(entry) => {
entry.insert(vec![ranges]);
}
}
}
}
}
for (segment, range_list) in static_files.iter_mut() {
// Sort by block end range.
range_list.sort_by(|a, b| a.0.end().cmp(b.0.end()));
if let Some((block_range, tx_range)) = range_list.pop() {
// The highest height static file filename might not be indicative of its actual
// block_range, so we need to read its actual configuration.
let jar = NippyJar::<SegmentHeader>::load(
&path.as_ref().join(segment.filename(&block_range, &tx_range)),
)?;
if &tx_range != jar.user_header().tx_range() {
// TODO(joshie): rename
}
// Re-insert the highest entry using the ranges read from the jar's own
// header rather than the (possibly stale) filename-derived ranges.
range_list.push((
jar.user_header().block_range().clone(),
jar.user_header().tx_range().clone(),
));
}
}
Ok(static_files)
}

View File

@@ -1,9 +1,9 @@
use crate::{
compression::{Compression, Compressors, Zstd},
InclusionFilter, MmapHandle, NippyJar, NippyJarError, PerfectHashingFunction, RefRow,
DataReader, InclusionFilter, NippyJar, NippyJarError, NippyJarHeader, PerfectHashingFunction,
RefRow,
};
use serde::{de::Deserialize, ser::Serialize};
use std::ops::Range;
use std::{ops::Range, sync::Arc};
use sucds::int_vectors::Access;
use zstd::bulk::Decompressor;
@@ -12,46 +12,40 @@ use zstd::bulk::Decompressor;
pub struct NippyJarCursor<'a, H = ()> {
/// [`NippyJar`] which holds most of the required configuration to read from the file.
jar: &'a NippyJar<H>,
/// Data file.
mmap_handle: MmapHandle,
/// Data and offset reader.
reader: Arc<DataReader>,
/// Internal buffer to unload data to without reallocating memory on each retrieval.
internal_buffer: Vec<u8>,
/// Cursor row position.
row: u64,
}
impl<'a, H: std::fmt::Debug> std::fmt::Debug for NippyJarCursor<'a, H>
where
H: Send + Sync + Serialize + for<'b> Deserialize<'b> + core::fmt::Debug,
{
impl<'a, H: NippyJarHeader> std::fmt::Debug for NippyJarCursor<'a, H> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NippyJarCursor").field("config", &self.jar).finish_non_exhaustive()
}
}
impl<'a, H> NippyJarCursor<'a, H>
where
H: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static,
{
impl<'a, H: NippyJarHeader> NippyJarCursor<'a, H> {
pub fn new(jar: &'a NippyJar<H>) -> Result<Self, NippyJarError> {
let max_row_size = jar.max_row_size;
Ok(NippyJarCursor {
jar,
mmap_handle: jar.open_data()?,
reader: Arc::new(jar.open_data_reader()?),
// Makes sure that we have enough buffer capacity to decompress any row of data.
internal_buffer: Vec::with_capacity(max_row_size),
row: 0,
})
}
pub fn with_handle(
pub fn with_reader(
jar: &'a NippyJar<H>,
mmap_handle: MmapHandle,
reader: Arc<DataReader>,
) -> Result<Self, NippyJarError> {
let max_row_size = jar.max_row_size;
Ok(NippyJarCursor {
jar,
mmap_handle,
reader,
// Makes sure that we have enough buffer capacity to decompress any row of data.
internal_buffer: Vec::with_capacity(max_row_size),
row: 0,
@@ -112,7 +106,7 @@ where
pub fn next_row(&mut self) -> Result<Option<RefRow<'_>>, NippyJarError> {
self.internal_buffer.clear();
if self.row as usize * self.jar.columns >= self.jar.offsets.len() {
if self.row as usize >= self.jar.rows {
// Has reached the end
return Ok(None)
}
@@ -129,7 +123,7 @@ where
Ok(Some(
row.into_iter()
.map(|v| match v {
ValueRange::Mmap(range) => &self.mmap_handle[range],
ValueRange::Mmap(range) => self.reader.data(range),
ValueRange::Internal(range) => &self.internal_buffer[range],
})
.collect(),
@@ -186,7 +180,7 @@ where
pub fn next_row_with_cols(&mut self, mask: usize) -> Result<Option<RefRow<'_>>, NippyJarError> {
self.internal_buffer.clear();
if self.row as usize * self.jar.columns >= self.jar.offsets.len() {
if self.row as usize >= self.jar.rows {
// Has reached the end
return Ok(None)
}
@@ -204,7 +198,7 @@ where
Ok(Some(
row.into_iter()
.map(|v| match v {
ValueRange::Mmap(range) => &self.mmap_handle[range],
ValueRange::Mmap(range) => self.reader.data(range),
ValueRange::Internal(range) => &self.internal_buffer[range],
})
.collect(),
@@ -219,13 +213,13 @@ where
) -> Result<(), NippyJarError> {
// Find out the offset of the column value
let offset_pos = self.row as usize * self.jar.columns + column;
let value_offset = self.jar.offsets.select(offset_pos).expect("should exist");
let value_offset = self.reader.offset(offset_pos) as usize;
let column_offset_range = if self.jar.offsets.len() == (offset_pos + 1) {
let column_offset_range = if self.jar.rows * self.jar.columns == offset_pos + 1 {
// It's the last column of the last row
value_offset..self.mmap_handle.len()
value_offset..self.reader.size()
} else {
let next_value_offset = self.jar.offsets.select(offset_pos + 1).expect("should exist");
let next_value_offset = self.reader.offset(offset_pos + 1) as usize;
value_offset..next_value_offset
};
@@ -242,7 +236,7 @@ where
.expect("dictionary to be loaded");
let mut decompressor = Decompressor::with_prepared_dictionary(dictionaries)?;
Zstd::decompress_with_dictionary(
&self.mmap_handle[column_offset_range],
self.reader.data(column_offset_range),
&mut self.internal_buffer,
&mut decompressor,
)?;
@@ -250,7 +244,7 @@ where
_ => {
// Uses the chosen default decompressor
compression.decompress_to(
&self.mmap_handle[column_offset_range],
self.reader.data(column_offset_range),
&mut self.internal_buffer,
)?;
}

View File

@@ -37,8 +37,12 @@ pub enum NippyJarError {
UnsupportedFilterQuery,
#[error("compression or decompression requires a bigger destination output")]
OutputTooSmall,
#[error("Dictionary is not loaded.")]
#[error("dictionary is not loaded.")]
DictionaryNotLoaded,
#[error("It's not possible to generate a compressor after loading a dictionary.")]
#[error("it's not possible to generate a compressor after loading a dictionary.")]
CompressorNotAllowed,
#[error("number of offsets ({0}) is smaller than prune request ({1}).")]
InvalidPruning(u64, u64),
#[error("jar has been frozen and cannot be modified.")]
FrozenJar,
}

View File

@@ -13,20 +13,13 @@
use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use std::{
clone::Clone,
error::Error as StdError,
fs::File,
io::{Seek, Write},
marker::Sync,
ops::Deref,
ops::Range,
path::{Path, PathBuf},
sync::Arc,
};
use sucds::{
int_vectors::PrefixSummedEliasFano,
mii_sequences::{EliasFano, EliasFanoBuilder},
Serializable,
};
use sucds::{int_vectors::PrefixSummedEliasFano, Serializable};
use tracing::*;
pub mod filter;
@@ -45,15 +38,34 @@ pub use error::NippyJarError;
mod cursor;
pub use cursor::NippyJarCursor;
mod writer;
pub use writer::NippyJarWriter;
const NIPPY_JAR_VERSION: usize = 1;
const INDEX_FILE_EXTENSION: &str = "idx";
const OFFSETS_FILE_EXTENSION: &str = "off";
const CONFIG_FILE_EXTENSION: &str = "conf";
/// A [`RefRow`] is a list of column value slices pointing to either an internal buffer or a
/// memory-mapped file.
type RefRow<'a> = Vec<&'a [u8]>;
/// Alias type for a column value wrapped in `Result`
/// Alias type for a column value wrapped in `Result`.
pub type ColumnResult<T> = Result<T, Box<dyn StdError + Send + Sync>>;
/// A trait for the user-defined header of [NippyJar].
pub trait NippyJarHeader:
Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}
// Blanket implementation for all types that implement the required traits.
impl<T> NippyJarHeader for T where
T: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}
/// `NippyJar` is a specialized storage format designed for immutable data.
///
/// Data is organized into a columnar format, enabling column-based compression. Data retrieval
@@ -88,40 +100,39 @@ pub struct NippyJar<H = ()> {
user_header: H,
/// Number of data columns in the jar.
columns: usize,
/// Number of data rows in the jar.
rows: usize,
/// Optional compression algorithm applied to the data.
compressor: Option<Compressors>,
#[serde(skip)]
/// Optional filter function for data membership checks.
filter: Option<InclusionFilters>,
#[serde(skip)]
/// Optional Perfect Hashing Function (PHF) for unique offset mapping.
phf: Option<Functions>,
/// Index mapping PHF output to value offsets in `offsets`.
#[serde(skip)]
offsets_index: PrefixSummedEliasFano,
/// Offsets within the file for each column value, arranged by row and column.
#[serde(skip)]
offsets: EliasFano,
/// Maximum uncompressed row size of the set. This will enable decompression without any
/// resizing of the output buffer.
#[serde(skip)]
max_row_size: usize,
/// Data path for file. Index file will be `{path}.idx`
/// Data path for file. Supporting files will have a format `{path}.{extension}`.
#[serde(skip)]
path: Option<PathBuf>,
path: PathBuf,
}
impl<H: std::fmt::Debug> std::fmt::Debug for NippyJar<H> {
impl<H: NippyJarHeader> std::fmt::Debug for NippyJar<H> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NippyJar")
.field("version", &self.version)
.field("user_header", &self.user_header)
.field("rows", &self.rows)
.field("columns", &self.columns)
.field("compressor", &self.compressor)
.field("filter", &self.filter)
.field("phf", &self.phf)
.field("offsets_index (len)", &self.offsets_index.len())
.field("offsets_index (size in bytes)", &self.offsets_index.size_in_bytes())
.field("offsets (len)", &self.offsets.len())
.field("offsets (size in bytes)", &self.offsets.size_in_bytes())
.field("path", &self.path)
.field("max_row_size", &self.max_row_size)
.finish_non_exhaustive()
@@ -145,23 +156,20 @@ impl NippyJar<()> {
}
}
impl<H> NippyJar<H>
where
H: Send + Sync + Serialize + for<'a> Deserialize<'a> + std::fmt::Debug,
{
impl<H: NippyJarHeader> NippyJar<H> {
/// Creates a new [`NippyJar`] with a user-defined header data.
pub fn new(columns: usize, path: &Path, user_header: H) -> Self {
NippyJar {
version: NIPPY_JAR_VERSION,
user_header,
columns,
rows: 0,
max_row_size: 0,
compressor: None,
filter: None,
phf: None,
offsets: EliasFano::default(),
offsets_index: PrefixSummedEliasFano::default(),
path: Some(path.to_path_buf()),
path: path.to_path_buf(),
}
}
@@ -201,17 +209,12 @@ where
&self.user_header
}
/// Gets a reference to `self.offsets`.
pub fn offsets_size(&self) -> usize {
self.offsets.size_in_bytes()
}
/// Gets a reference to `self.offsets`.
/// Returns the size of inclusion filter
pub fn filter_size(&self) -> usize {
self.size()
}
/// Gets a reference to `self.offsets_index`.
/// Returns the size of offsets index
pub fn offsets_index_size(&self) -> usize {
self.offsets_index.size_in_bytes()
}
@@ -226,48 +229,53 @@ where
self.compressor.as_mut()
}
/// Loads the file configuration and returns [`Self`].
/// Loads the file configuration and returns [`Self`] without deserializing filters related
/// structures or the offset list.
///
/// **The user must ensure the header type matches the one used during the jar's creation.**
pub fn load(path: &Path) -> Result<Self, NippyJarError> {
// Read [`Self`] located at the data file.
let data_file = File::open(path)?;
// SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
let data_reader = unsafe { memmap2::Mmap::map(&data_file)? };
let mut obj: Self = bincode::deserialize_from(data_reader.as_ref())?;
obj.path = Some(path.to_path_buf());
// Read the offsets lists located at the index file.
let offsets_file = File::open(obj.index_path())?;
// SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
let mmap = unsafe { memmap2::Mmap::map(&offsets_file)? };
let mut offsets_reader = mmap.as_ref();
obj.offsets = EliasFano::deserialize_from(&mut offsets_reader)?;
obj.offsets_index = PrefixSummedEliasFano::deserialize_from(&mut offsets_reader)?;
obj.max_row_size = bincode::deserialize_from(offsets_reader)?;
let config_file = File::open(path.with_extension(CONFIG_FILE_EXTENSION))?;
let mut obj: Self = bincode::deserialize_from(&config_file)?;
obj.path = path.to_path_buf();
Ok(obj)
}
/// Returns the path from the data file
pub fn data_path(&self) -> PathBuf {
self.path.clone().expect("exists")
/// Loads filters into memory
pub fn load_filters(mut self) -> Result<Self, NippyJarError> {
// Read the offsets lists located at the index file.
let mut offsets_file = File::open(self.index_path())?;
self.offsets_index = PrefixSummedEliasFano::deserialize_from(&mut offsets_file)?;
self.phf = bincode::deserialize_from(&mut offsets_file)?;
self.filter = bincode::deserialize_from(&mut offsets_file)?;
Ok(self)
}
/// Returns the path from the index file
/// Returns the path for the data file
pub fn data_path(&self) -> &Path {
self.path.as_ref()
}
/// Returns the path for the index file
pub fn index_path(&self) -> PathBuf {
let data_path = self.data_path();
data_path
.parent()
.expect("exists")
.join(format!("{}.idx", data_path.file_name().expect("exists").to_string_lossy()))
self.path.with_extension(INDEX_FILE_EXTENSION)
}
/// Returns a [`MmapHandle`] of the data file
pub fn open_data(&self) -> Result<MmapHandle, NippyJarError> {
MmapHandle::new(self.data_path())
/// Returns the path for the offsets file
pub fn offsets_path(&self) -> PathBuf {
self.path.with_extension(OFFSETS_FILE_EXTENSION)
}
/// Returns the path for the config file
pub fn config_path(&self) -> PathBuf {
self.path.with_extension(CONFIG_FILE_EXTENSION)
}
/// Returns a [`DataReader`] of the data and offset file
pub fn open_data_reader(&self) -> Result<DataReader, NippyJarError> {
DataReader::new(self.data_path())
}
/// If required, prepares any compression algorithm to an early pass of the data.
@@ -338,124 +346,44 @@ where
columns: Vec<impl IntoIterator<Item = ColumnResult<Vec<u8>>>>,
total_rows: u64,
) -> Result<(), NippyJarError> {
let mut file = self.freeze_check(&columns)?;
self.freeze_config(&mut file)?;
self.check_before_freeze(&columns)?;
// Special case for zstd that might use custom dictionaries/compressors per column
// If any other compression algorithm is added and uses a similar flow, then revisit
// implementation
let mut maybe_zstd_compressors = None;
if let Some(Compressors::Zstd(zstd)) = &self.compressor {
maybe_zstd_compressors = zstd.compressors()?;
}
debug!(target: "nippy-jar", path=?self.data_path(), "Opening data file.");
// Temporary buffer to avoid multiple reallocations if compressing to a buffer (eg. zstd w/
// dict)
let mut tmp_buf = Vec::with_capacity(1_000_000);
// Creates the writer, data and offsets file
let mut writer = NippyJarWriter::new(self)?;
// Write all rows while taking all row start offsets
let mut row_number = 0u64;
let mut offsets = Vec::with_capacity(total_rows as usize * self.columns);
let mut column_iterators =
columns.into_iter().map(|v| v.into_iter()).collect::<Vec<_>>().into_iter();
// Append rows to file while holding offsets in memory
writer.append_rows(columns, total_rows)?;
debug!(target: "nippy-jar", compressor=?self.compressor, "Writing rows.");
// Flushes configuration and offsets to disk
writer.commit()?;
loop {
let mut iterators = Vec::with_capacity(self.columns);
// Write phf, filter and offset index to file
self.freeze_filters()?;
// Write the column value of each row
// TODO: iter_mut if we remove the IntoIterator interface.
let mut uncompressed_row_size = 0;
for (column_number, mut column_iter) in column_iterators.enumerate() {
offsets.push(file.stream_position()? as usize);
match column_iter.next() {
Some(Ok(value)) => {
uncompressed_row_size += value.len();
if let Some(compression) = &self.compressor {
// Special zstd case with dictionaries
if let (Some(dict_compressors), Compressors::Zstd(_)) =
(maybe_zstd_compressors.as_mut(), compression)
{
compression::Zstd::compress_with_dictionary(
&value,
&mut tmp_buf,
&mut file,
Some(dict_compressors.get_mut(column_number).expect("exists")),
)?;
} else {
let before = tmp_buf.len();
let len = compression.compress_to(&value, &mut tmp_buf)?;
file.write_all(&tmp_buf[before..before + len])?;
}
} else {
file.write_all(&value)?;
}
}
None => {
return Err(NippyJarError::UnexpectedMissingValue(
row_number,
column_number as u64,
))
}
Some(Err(err)) => return Err(err.into()),
}
iterators.push(column_iter);
}
tmp_buf.clear();
row_number += 1;
self.max_row_size = self.max_row_size.max(uncompressed_row_size);
if row_number == total_rows {
break
}
column_iterators = iterators.into_iter();
}
// drops immutable borrow
drop(maybe_zstd_compressors);
// Write offsets and offset index to file
self.freeze_offsets(offsets)?;
debug!(target: "nippy-jar", jar=?self, "Finished.");
debug!(target: "nippy-jar", jar=?self, "Finished writing data.");
Ok(())
}
/// Freezes offsets and its own index.
fn freeze_offsets(&mut self, offsets: Vec<usize>) -> Result<(), NippyJarError> {
if !offsets.is_empty() {
debug!(target: "nippy-jar", "Encoding offsets list.");
let mut builder =
EliasFanoBuilder::new(*offsets.last().expect("qed") + 1, offsets.len())?;
for offset in offsets {
builder.push(offset)?;
}
self.offsets = builder.build().enable_rank();
}
/// Freezes [`PerfectHashingFunction`], [`InclusionFilter`] and the offset index to file.
fn freeze_filters(&mut self) -> Result<(), NippyJarError> {
debug!(target: "nippy-jar", path=?self.index_path(), "Writing offsets and offsets index to file.");
let mut file = File::create(self.index_path())?;
self.offsets.serialize_into(&mut file)?;
self.offsets_index.serialize_into(&mut file)?;
self.max_row_size.serialize_into(file)?;
bincode::serialize_into(&mut file, &self.phf)?;
bincode::serialize_into(&mut file, &self.filter)?;
Ok(())
}
/// Safety checks before creating and returning a [`File`] handle to write data to.
fn freeze_check(
fn check_before_freeze(
&mut self,
columns: &[impl IntoIterator<Item = ColumnResult<Vec<u8>>>],
) -> Result<File, NippyJarError> {
) -> Result<(), NippyJarError> {
if columns.len() != self.columns {
return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
}
@@ -471,23 +399,16 @@ where
let _ = phf.get_index(&[])?;
}
debug!(target: "nippy-jar", path=?self.data_path(), "Opening data file.");
Ok(File::create(self.data_path())?)
Ok(())
}
/// Writes all necessary configuration to file.
fn freeze_config(&mut self, handle: &mut File) -> Result<(), NippyJarError> {
// TODO Split Dictionaries and Bloomfilters Configuration so we dont have to load everything
// at once
Ok(bincode::serialize_into(handle, &self)?)
fn freeze_config(&mut self) -> Result<(), NippyJarError> {
Ok(bincode::serialize_into(File::create(self.config_path())?, &self)?)
}
}
impl<H> InclusionFilter for NippyJar<H>
where
H: Send + Sync + Serialize + for<'a> Deserialize<'a>,
{
impl<H: NippyJarHeader> InclusionFilter for NippyJar<H> {
fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError> {
self.filter.as_mut().ok_or(NippyJarError::FilterMissing)?.add(element)
}
@@ -501,10 +422,7 @@ where
}
}
impl<H> PerfectHashingFunction for NippyJar<H>
where
H: Send + Sync + Serialize + for<'a> Deserialize<'a>,
{
impl<H: NippyJarHeader> PerfectHashingFunction for NippyJar<H> {
fn set_keys<T: PHFKey>(&mut self, keys: &[T]) -> Result<(), NippyJarError> {
self.phf.as_mut().ok_or(NippyJarError::PHFMissing)?.set_keys(keys)
}
@@ -514,31 +432,94 @@ where
}
}
/// Holds an `Arc` over a file and its associated mmap handle.
#[derive(Debug, Clone)]
pub struct MmapHandle {
/// File descriptor. Needs to be kept alive as long as the mmap handle.
/// Manages the reading of snapshot data using memory-mapped files.
///
/// Holds file and mmap descriptors of the data and offsets files of a snapshot.
#[derive(Debug)]
pub struct DataReader {
/// Data file descriptor. Needs to be kept alive as long as `data_mmap` handle.
#[allow(unused)]
file: Arc<File>,
/// Mmap handle.
mmap: Arc<Mmap>,
data_file: File,
/// Mmap handle for data.
data_mmap: Mmap,
/// Offset file descriptor. Needs to be kept alive as long as `offset_mmap` handle.
#[allow(unused)]
offset_file: File,
/// Mmap handle for offsets.
offset_mmap: Mmap,
/// Number of bytes that represent one offset.
offset_size: u64,
}
impl MmapHandle {
impl DataReader {
/// Reads the respective data and offsets file and returns [`DataReader`].
pub fn new(path: impl AsRef<Path>) -> Result<Self, NippyJarError> {
let file = File::open(path)?;
let data_file = File::open(path.as_ref())?;
// SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
let mmap = unsafe { Mmap::map(&file)? };
let data_mmap = unsafe { Mmap::map(&data_file)? };
Ok(Self { file: Arc::new(file), mmap: Arc::new(mmap) })
let offset_file = File::open(path.as_ref().with_extension(OFFSETS_FILE_EXTENSION))?;
// SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
let offset_mmap = unsafe { Mmap::map(&offset_file)? };
Ok(Self {
data_file,
data_mmap,
offset_file,
// First byte is the size of one offset in bytes
offset_size: offset_mmap[0] as u64,
offset_mmap,
})
}
}
impl Deref for MmapHandle {
type Target = Mmap;
fn deref(&self) -> &Self::Target {
&self.mmap
/// Returns the offset for the requested data index
///
/// NOTE(review): performs no bounds check — an out-of-range `index` will panic when
/// indexing the offsets mmap inside `offset_at`.
pub fn offset(&self, index: usize) -> u64 {
    // `+ 1` skips the first byte of the file, which stores the size of one offset.
    let from = index * self.offset_size as usize + 1;
    self.offset_at(from)
}
/// Returns the offset for the requested data index starting from the end
///
/// `reverse_offset(0)` yields the last offset in the file. Returns 0 when the file
/// holds no offsets (only the leading size byte is present).
pub fn reverse_offset(&self, index: usize) -> Result<u64, NippyJarError> {
    let offsets_file_size = self.offset_file.metadata()?.len() as usize;
    if offsets_file_size > 1 {
        // Walk back `index + 1` offsets from the end of the file.
        let from = offsets_file_size - self.offset_size as usize * (index + 1);
        Ok(self.offset_at(from))
    } else {
        // Only the size byte exists: there are no offsets at all.
        Ok(0)
    }
}
/// Returns total number of offsets in the file.
/// The size of one offset is determined by the file itself.
pub fn offsets_count(&self) -> Result<usize, NippyJarError> {
    // Subtract the leading size byte before dividing by the per-offset width.
    Ok((self.offset_file.metadata()?.len().saturating_sub(1) / self.offset_size) as usize)
}
/// Reads one offset-sized (determined by the offset file) u64 at the provided index.
///
/// `index` is a byte position into the offsets mmap, not an offset ordinal.
fn offset_at(&self, index: usize) -> u64 {
    // Offsets are little-endian and `offset_size` bytes wide; zero-extend into a u64.
    let mut buffer: [u8; 8] = [0; 8];
    buffer[..self.offset_size as usize]
        .copy_from_slice(&self.offset_mmap[index..(index + self.offset_size as usize)]);
    u64::from_le_bytes(buffer)
}
/// Returns number of bytes that represent one offset.
pub fn offset_size(&self) -> u64 {
    self.offset_size
}
/// Returns the underlying data as a slice of bytes for the provided range.
///
/// NOTE(review): no bounds check — an out-of-range `range` panics.
pub fn data(&self, range: Range<usize>) -> &[u8] {
    &self.data_mmap[range]
}
/// Returns total size of data
pub fn size(&self) -> usize {
    self.data_mmap.len()
}
}
@@ -546,7 +527,7 @@ impl Deref for MmapHandle {
mod tests {
use super::*;
use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng};
use std::collections::HashSet;
use std::{collections::HashSet, fs::OpenOptions};
type ColumnResults<T> = Vec<ColumnResult<T>>;
type ColumnValues = Vec<Vec<u8>>;
@@ -610,7 +591,8 @@ mod tests {
nippy
.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
.unwrap();
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
let loaded_nippy =
NippyJar::load_without_header(file_path.path()).unwrap().load_filters().unwrap();
assert_eq!(indexes, collect_indexes(&loaded_nippy));
};
@@ -654,7 +636,8 @@ mod tests {
assert!(InclusionFilter::add(&mut nippy, &col1[3]).is_ok());
nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap();
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
let loaded_nippy =
NippyJar::load_without_header(file_path.path()).unwrap().load_filters().unwrap();
assert_eq!(nippy, loaded_nippy);
@@ -707,13 +690,13 @@ mod tests {
nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap();
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
let loaded_nippy =
NippyJar::load_without_header(file_path.path()).unwrap().load_filters().unwrap();
assert_eq!(nippy.version, loaded_nippy.version);
assert_eq!(nippy.columns, loaded_nippy.columns);
assert_eq!(nippy.filter, loaded_nippy.filter);
assert_eq!(nippy.phf, loaded_nippy.phf);
assert_eq!(nippy.offsets_index, loaded_nippy.offsets_index);
assert_eq!(nippy.offsets, loaded_nippy.offsets);
assert_eq!(nippy.max_row_size, loaded_nippy.max_row_size);
assert_eq!(nippy.path, loaded_nippy.path);
@@ -751,6 +734,7 @@ mod tests {
nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap();
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
let loaded_nippy = loaded_nippy.load_filters().unwrap();
assert_eq!(nippy, loaded_nippy);
if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() {
@@ -786,7 +770,8 @@ mod tests {
nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap();
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
let loaded_nippy =
NippyJar::load_without_header(file_path.path()).unwrap().load_filters().unwrap();
assert_eq!(nippy, loaded_nippy);
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
@@ -841,7 +826,8 @@ mod tests {
// Read file
{
let loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();
let loaded_nippy =
NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap().load_filters().unwrap();
assert!(loaded_nippy.compressor().is_some());
assert!(loaded_nippy.filter.is_some());
@@ -911,7 +897,8 @@ mod tests {
// Read file
{
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
let loaded_nippy =
NippyJar::load_without_header(file_path.path()).unwrap().load_filters().unwrap();
if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
@@ -1013,4 +1000,294 @@ mod tests {
}
}
}
/// End-to-end exercise of the writer: append rows, prune them, re-append, then
/// verify crash recovery both before and in the middle of `commit()`.
#[test]
fn test_writer() {
    let (col1, col2) = test_data(None);
    let num_columns = 2;
    let file_path = tempfile::NamedTempFile::new().unwrap();
    append_two_rows(num_columns, file_path.path(), &col1, &col2);
    // Appends a third row and prunes two rows, to make sure we prune from memory and disk
    // offset list
    prune_rows(num_columns, file_path.path(), &col1, &col2);
    // Should be able to append new rows
    append_two_rows(num_columns, file_path.path(), &col1, &col2);
    // Simulate an unexpected shutdown before there's a chance to commit, and see that it
    // unwinds successfully
    test_append_consistency_no_commit(file_path.path(), &col1, &col2);
    // Simulate an unexpected shutdown during commit, and see that it unwinds successfully
    test_append_consistency_partial_commit(file_path.path(), &col1, &col2);
}
/// Verifies that a prune interrupted mid-way (data file truncated, offsets file not
/// yet updated) is healed by the writer's consistency check on the next open.
#[test]
fn test_pruner() {
    let (col1, col2) = test_data(None);
    let num_columns = 2;
    let num_rows = 2;
    // (missing_offsets, expected number of rows)
    // If a row wasn't fully pruned, then it should clear it up as well
    let missing_offsets_scenarios = [(1, 1), (2, 1), (3, 0)];
    for (missing_offsets, expected_rows) in missing_offsets_scenarios {
        let file_path = tempfile::NamedTempFile::new().unwrap();
        append_two_rows(num_columns, file_path.path(), &col1, &col2);
        simulate_interrupted_prune(num_columns, file_path.path(), num_rows, missing_offsets);
        let nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy.rows, expected_rows);
    }
}
/// Simulates a crash *during* `commit()`: offsets were partially flushed (one offset
/// deliberately dropped) but the row count was never persisted. A fresh writer must
/// detect the mismatch and truncate both files back to the committed baseline.
fn test_append_consistency_partial_commit(
    file_path: &Path,
    col1: &[Vec<u8>],
    col2: &[Vec<u8>],
) {
    let mut nippy = NippyJar::load_without_header(file_path).unwrap();
    // Set the baseline that should be unwinded to
    let initial_rows = nippy.rows;
    let initial_data_size =
        File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
    let initial_offset_size =
        File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
    assert!(initial_data_size > 0);
    assert!(initial_offset_size > 0);
    // Appends a third row
    let mut writer = NippyJarWriter::new(&mut nippy).unwrap();
    writer.append_column(Some(Ok(&col1[2]))).unwrap();
    writer.append_column(Some(Ok(&col2[2]))).unwrap();
    // Makes sure it doesn't write the last one offset (which is the expected file data size)
    let _ = writer.offsets_mut().pop();
    // `commit_offsets` is not a pub function. we call it here to simulate the shutdown before
    // it can flush nippy.rows (config) to disk.
    writer.commit_offsets().unwrap();
    // Simulate an unexpected shutdown of the writer, before it can finish commit()
    drop(writer);
    let mut nippy = NippyJar::load_without_header(file_path).unwrap();
    assert_eq!(initial_rows, nippy.rows);
    // Data was written successfully
    let new_data_size =
        File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
    assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());
    // It should be + 16 (two columns were added), but there's a missing one (the one we pop)
    assert_eq!(
        initial_offset_size + 8,
        File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
    );
    // Writer will execute a consistency check and verify first that the offset list on disk
    // doesn't match the nippy.rows, and prune it. Then, it will prune the data file
    // accordingly as well.
    let _writer = NippyJarWriter::new(&mut nippy).unwrap();
    assert_eq!(initial_rows, nippy.rows);
    assert_eq!(
        initial_offset_size,
        File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
    );
    assert_eq!(
        initial_data_size,
        File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize
    );
    assert_eq!(initial_rows, nippy.rows);
}
/// Simulates a crash *before* `commit()`: column data reached the data file but no
/// offsets were flushed. A fresh writer must truncate the data file back to the last
/// committed offset.
fn test_append_consistency_no_commit(file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
    let mut nippy = NippyJar::load_without_header(file_path).unwrap();
    // Set the baseline that should be unwinded to
    let initial_rows = nippy.rows;
    let initial_data_size =
        File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
    let initial_offset_size =
        File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
    assert!(initial_data_size > 0);
    assert!(initial_offset_size > 0);
    // Appends a third row, so we have an offset list in memory, which is not flushed to disk,
    // while the data has been.
    let mut writer = NippyJarWriter::new(&mut nippy).unwrap();
    writer.append_column(Some(Ok(&col1[2]))).unwrap();
    writer.append_column(Some(Ok(&col2[2]))).unwrap();
    // Simulate an unexpected shutdown of the writer, before it can call commit()
    drop(writer);
    let mut nippy = NippyJar::load_without_header(file_path).unwrap();
    assert_eq!(initial_rows, nippy.rows);
    // Data was written successfully
    let new_data_size =
        File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
    assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());
    // Since offsets only get written on commit(), this remains the same
    assert_eq!(
        initial_offset_size,
        File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
    );
    // Writer will execute a consistency check and verify that the data file has more data than
    // it should, and resets it to the last offset of the list (on disk here)
    let _writer = NippyJarWriter::new(&mut nippy).unwrap();
    assert_eq!(initial_rows, nippy.rows);
    assert_eq!(
        initial_data_size,
        File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize
    );
    assert_eq!(initial_rows, nippy.rows);
}
/// Appends one row to a freshly created jar, then reloads the jar and appends a second
/// row, asserting after each `commit()` that config, offsets file and data file agree.
fn append_two_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
    // Create and add 1 row
    {
        let mut nippy = NippyJar::new_without_header(num_columns, file_path);
        nippy.freeze_config().unwrap();
        assert_eq!(nippy.max_row_size, 0);
        assert_eq!(nippy.rows, 0);
        let mut writer = NippyJarWriter::new(&mut nippy).unwrap();
        assert_eq!(writer.column(), 0);
        writer.append_column(Some(Ok(&col1[0]))).unwrap();
        assert_eq!(writer.column(), 1);
        writer.append_column(Some(Ok(&col2[0]))).unwrap();
        // Adding last column of a row resets writer and updates jar config
        assert_eq!(writer.column(), 0);
        // One offset per column + 1 offset at the end representing the expected file data size
        assert_eq!(writer.offsets().len(), 3);
        let expected_data_file_size = *writer.offsets().last().unwrap();
        writer.commit().unwrap();
        assert_eq!(nippy.max_row_size, col1[0].len() + col2[0].len());
        assert_eq!(nippy.rows, 1);
        // 1 size byte + one 8-byte offset per column + trailing data-size offset
        assert_eq!(
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len(),
            1 + num_columns as u64 * 8 + 8
        );
        assert_eq!(
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len(),
            expected_data_file_size
        );
    }
    // Load and add 1 row
    {
        let mut nippy = NippyJar::load_without_header(file_path).unwrap();
        // Check if it was committed successfully
        assert_eq!(nippy.max_row_size, col1[0].len() + col2[0].len());
        assert_eq!(nippy.rows, 1);
        let mut writer = NippyJarWriter::new(&mut nippy).unwrap();
        assert_eq!(writer.column(), 0);
        writer.append_column(Some(Ok(&col1[1]))).unwrap();
        assert_eq!(writer.column(), 1);
        writer.append_column(Some(Ok(&col2[1]))).unwrap();
        // Adding last column of a row resets writer and updates jar config
        assert_eq!(writer.column(), 0);
        // One offset per column + 1 offset at the end representing the expected file data size
        assert_eq!(writer.offsets().len(), 3);
        let expected_data_file_size = *writer.offsets().last().unwrap();
        writer.commit().unwrap();
        assert_eq!(nippy.max_row_size, col1[0].len() + col2[0].len());
        assert_eq!(nippy.rows, 2);
        assert_eq!(
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len(),
            1 + nippy.rows as u64 * num_columns as u64 * 8 + 8
        );
        assert_eq!(
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len(),
            expected_data_file_size
        );
    }
}
/// Appends a third (uncommitted) row and prunes two rows, exercising pruning of both the
/// in-memory offset list and the on-disk one; a second prune then clears the jar entirely.
fn prune_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
    let mut nippy = NippyJar::load_without_header(file_path).unwrap();
    let mut writer = NippyJarWriter::new(&mut nippy).unwrap();
    // Appends a third row, so we have an offset list in memory, which is not flushed to disk
    writer.append_column(Some(Ok(&col1[2]))).unwrap();
    writer.append_column(Some(Ok(&col2[2]))).unwrap();
    // This should prune from the on-memory offset list and ondisk offset list
    writer.prune_rows(2).unwrap();
    assert_eq!(nippy.rows, 1);
    assert_eq!(
        File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len(),
        1 + nippy.rows as u64 * num_columns as u64 * 8 + 8
    );
    let expected_data_size = col1[0].len() + col2[0].len();
    assert_eq!(
        File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize,
        expected_data_size
    );
    let data_reader = nippy.open_data_reader().unwrap();
    // there are only two valid offsets. so index 2 actually represents the expected file
    // data size.
    assert_eq!(data_reader.offset(2), expected_data_size as u64);
    // This should prune from the ondisk offset list and clear the jar.
    let mut writer = NippyJarWriter::new(&mut nippy).unwrap();
    writer.prune_rows(1).unwrap();
    assert_eq!(nippy.rows, 0);
    assert_eq!(nippy.max_row_size, 0);
    assert_eq!(File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize, 0);
    // Only the byte that indicates how many bytes per offset should be left
    assert_eq!(File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize, 1);
}
/// Truncates `missing_offsets * 32` bytes from the data file without touching the offsets
/// file, mimicking a prune that died between the data truncation and the offsets update.
fn simulate_interrupted_prune(
    num_columns: usize,
    file_path: &Path,
    num_rows: u64,
    missing_offsets: u64,
) {
    let mut nippy = NippyJar::load_without_header(file_path).unwrap();
    let reader = nippy.open_data_reader().unwrap();
    let offsets_file =
        OpenOptions::new().read(true).write(true).open(nippy.offsets_path()).unwrap();
    let offsets_len = 1 + num_rows * num_columns as u64 * 8 + 8;
    assert_eq!(offsets_len, offsets_file.metadata().unwrap().len());
    let data_file = OpenOptions::new().read(true).write(true).open(nippy.data_path()).unwrap();
    // `reverse_offset(0)` is the last stored offset, i.e. the expected data file length.
    let data_len = reader.reverse_offset(0).unwrap();
    assert_eq!(data_len, data_file.metadata().unwrap().len());
    // each data column is 32 bytes long
    // by deleting from the data file, the `consistency_check` will go through both branches:
    // when the offset list wasn't updated after clearing the data (data_len > last
    // offset). fixing above, will lead to offset count not match the rows (*
    // columns) of the configuration file
    data_file.set_len(data_len - 32 * missing_offsets).unwrap();
    // runs the consistency check.
    let _ = NippyJarWriter::new(&mut nippy).unwrap();
}
}

View File

@@ -0,0 +1,408 @@
use crate::{compression::Compression, ColumnResult, NippyJar, NippyJarError, NippyJarHeader};
use std::{
cmp::Ordering,
fs::{File, OpenOptions},
io::{Read, Seek, SeekFrom, Write},
path::Path,
};
/// Size of one offset in bytes.
const OFFSET_SIZE_BYTES: u64 = 8;
/// Writer of [`NippyJar`]. Handles table data and offsets only.
///
/// Table data is written directly to disk, while offsets and configuration need to be flushed by
/// calling `commit()`.
///
/// ## Offset file layout
/// The first byte is the size of a single offset in bytes, `m`.
/// Then, the file contains `n` entries, each with a size of `m`. Each entry represents an offset,
/// except for the last entry, which represents both the total size of the data file, as well as the
/// next offset to write new data to.
///
/// ## Data file layout
/// The data file is represented just as a sequence of bytes of data without any delimiters
pub struct NippyJarWriter<'a, H> {
    /// Reference to the associated [`NippyJar`], containing all necessary configurations for data
    /// handling.
    jar: &'a mut NippyJar<H>,
    /// File handle to where the data is stored.
    data_file: File,
    /// File handle to where the offsets are stored.
    offsets_file: File,
    /// Temporary buffer to reuse when compressing data. Cleared after every finished row.
    tmp_buf: Vec<u8>,
    /// Used to find the maximum uncompressed size of a row in a jar. Reset per row.
    uncompressed_row_size: usize,
    /// Partial offset list which hasn't been flushed to disk. Drained by `commit_offsets()`.
    offsets: Vec<u64>,
    /// Column where writer is going to write next. Wraps back to 0 after the last column.
    column: usize,
}
impl<'a, H: NippyJarHeader> NippyJarWriter<'a, H> {
/// Creates a writer over `jar`, creating the data and offsets files if they don't exist.
///
/// If the files already existed, a consistency check runs first so any state left behind
/// by an interrupted `commit()` or prune is healed before new writes happen.
pub fn new(jar: &'a mut NippyJar<H>) -> Result<Self, NippyJarError> {
    let (data_file, offsets_file, is_created) =
        Self::create_or_open_files(jar.data_path(), &jar.offsets_path())?;
    let mut writer = Self {
        jar,
        data_file,
        offsets_file,
        // Scratch buffer reused across compressed column writes.
        tmp_buf: Vec::with_capacity(1_000_000),
        uncompressed_row_size: 0,
        offsets: Vec::with_capacity(1_000_000),
        column: 0,
    };
    // If we are opening a previously created jar, we need to check its consistency, and make
    // changes if necessary.
    if !is_created {
        writer.check_consistency_and_heal()?;
    }
    Ok(writer)
}
/// Opens (or creates) the data and offsets files in read+write mode.
///
/// Returns both handles seeked to EOF, plus a flag indicating whether either file had
/// to be created (i.e. this is a brand-new jar).
///
/// NOTE(review): the `exists()` checks and the subsequent `create`/`open` calls are not
/// atomic — a concurrent writer could race them. Presumably a jar has a single writer;
/// confirm against callers.
fn create_or_open_files(
    data: &Path,
    offsets: &Path,
) -> Result<(File, File, bool), NippyJarError> {
    let is_created = !data.exists() || !offsets.exists();
    let mut data_file = if !data.exists() {
        File::create(data)?
    } else {
        OpenOptions::new().read(true).write(true).open(data)?
    };
    data_file.seek(SeekFrom::End(0))?;
    let mut offsets_file = if !offsets.exists() {
        let mut offsets = File::create(offsets)?;
        // First byte of the offset file is the size of one offset in bytes
        offsets.write_all(&[OFFSET_SIZE_BYTES as u8])?;
        offsets.sync_all()?;
        offsets
    } else {
        OpenOptions::new().read(true).write(true).open(offsets)?
    };
    offsets_file.seek(SeekFrom::End(0))?;
    Ok((data_file, offsets_file, is_created))
}
/// Performs consistency checks on the [`NippyJar`] file and acts upon any issues:
/// * Is the offsets file size expected?
/// * Is the data file size expected?
///
/// This is based on the assumption that [`NippyJar`] configuration is **always** the last one
/// to be updated when something is written, as by the `commit()` function shows.
fn check_consistency_and_heal(&mut self) -> Result<(), NippyJarError> {
    let reader = self.jar.open_data_reader()?;
    // When an offset size is smaller than the initial (8), we are dealing with immutable
    // data.
    if reader.offset_size() != OFFSET_SIZE_BYTES {
        return Err(NippyJarError::FrozenJar)
    }
    let expected_offsets_file_size = 1 + // first byte is the size of one offset
        OFFSET_SIZE_BYTES * self.jar.rows as u64 * self.jar.columns as u64 + // `offset size * num rows * num columns`
        OFFSET_SIZE_BYTES; // expected size of the data file
    let actual_offsets_file_size = self.offsets_file.metadata()?.len();
    // Offsets configuration wasn't properly committed
    match expected_offsets_file_size.cmp(&actual_offsets_file_size) {
        Ordering::Less => {
            // Happened during an appending job
            // TODO: ideally we could truncate until the last offset of the last column of the
            // last row inserted
            self.offsets_file.set_len(expected_offsets_file_size)?;
        }
        Ordering::Greater => {
            // Happened during a pruning job
            // `num rows = (file size - 1 - size of one offset) / num columns`
            self.jar.rows = ((actual_offsets_file_size.
                saturating_sub(1). // first byte is the size of one offset
                saturating_sub(OFFSET_SIZE_BYTES) / // expected size of the data file
                (self.jar.columns as u64)) /
                OFFSET_SIZE_BYTES) as usize;
            // Freeze row count changed
            self.jar.freeze_config()?;
        }
        Ordering::Equal => {}
    }
    // last offset should match the data_file_len
    let last_offset = reader.reverse_offset(0)?;
    let data_file_len = self.data_file.metadata()?.len();
    // Offset list wasn't properly committed
    match last_offset.cmp(&data_file_len) {
        Ordering::Less => {
            // Happened during an appending job, so we need to truncate the data, since there's
            // no way to recover it.
            self.data_file.set_len(last_offset)?;
        }
        Ordering::Greater => {
            // Happened during a pruning job, so we need to reverse iterate offsets until we
            // find the matching one.
            for index in 0..reader.offsets_count()? {
                let offset = reader.reverse_offset(index + 1)?;
                if offset == data_file_len {
                    self.offsets_file.set_len(
                        self.offsets_file
                            .metadata()?
                            .len()
                            .saturating_sub(OFFSET_SIZE_BYTES * (index as u64 + 1)),
                    )?;
                    // Reader must be dropped before recursing: it mmaps the offsets file
                    // we just truncated.
                    drop(reader);
                    // Since we decrease the offset list, we need to check the consistency of
                    // `self.jar.rows` again
                    self.check_consistency_and_heal()?;
                    break
                }
            }
        }
        Ordering::Equal => {}
    }
    // Leave both handles positioned at EOF so subsequent appends go to the right place.
    self.offsets_file.seek(SeekFrom::End(0))?;
    self.data_file.seek(SeekFrom::End(0))?;
    Ok(())
}
/// Appends rows to data file. `fn commit()` should be called to flush offsets and config to
/// disk.
///
/// `column_values_per_row`: A vector where each element is a column's values in sequence,
/// corresponding to each row. The vector's length equals the number of columns.
pub fn append_rows(
    &mut self,
    column_values_per_row: Vec<impl IntoIterator<Item = ColumnResult<impl AsRef<[u8]>>>>,
    num_rows: u64,
) -> Result<(), NippyJarError> {
    // One live iterator per column; each yields that column's value for successive rows.
    let mut per_column_iters =
        column_values_per_row.into_iter().map(|col| col.into_iter()).collect::<Vec<_>>();

    // Row-major write order: pull the next value from every column iterator in turn.
    for _ in 0..num_rows {
        for col_iter in per_column_iters.iter_mut() {
            self.append_column(col_iter.next())?;
        }
    }

    Ok(())
}
/// Appends a column to data file. `fn commit()` should be called to flush offsets and config to
/// disk.
pub fn append_column(
    &mut self,
    column: Option<ColumnResult<impl AsRef<[u8]>>>,
) -> Result<(), NippyJarError> {
    match column {
        // A missing value means the caller under-supplied columns for the current row.
        None => Err(NippyJarError::UnexpectedMissingValue(
            self.jar.rows as u64,
            self.column as u64,
        )),
        Some(Err(err)) => Err(err.into()),
        Some(Ok(value)) => {
            // Very first column written through this writer: record where its data begins.
            if self.offsets.is_empty() {
                self.offsets.push(self.data_file.stream_position()?);
            }

            self.write_column(value.as_ref())?;

            // The trailing offset doubles as the data-file size until more data is
            // appended, at which point it becomes the next column's start offset.
            self.offsets.push(self.data_file.stream_position()?);
            Ok(())
        }
    }
}
/// Writes column to data file. If it's the last column of the row, call `finalize_row()`
fn write_column(&mut self, value: &[u8]) -> Result<(), NippyJarError> {
    // Track the uncompressed footprint so `finalize_row` can update `max_row_size`.
    self.uncompressed_row_size += value.len();

    match &self.jar.compressor {
        Some(compression) => {
            // Compress into the shared scratch buffer and write only the newly
            // produced bytes (the buffer may already hold earlier columns of this row).
            let start = self.tmp_buf.len();
            let written = compression.compress_to(value, &mut self.tmp_buf)?;
            self.data_file.write_all(&self.tmp_buf[start..start + written])?;
        }
        None => self.data_file.write_all(value)?,
    }

    self.column += 1;

    // Row complete: fold its stats into the jar config and reset per-row state.
    if self.column == self.jar.columns {
        self.finalize_row();
    }

    Ok(())
}
/// Prunes rows from data and offsets file and updates its configuration on disk
///
/// Prunes first from the in-memory (uncommitted) offset list, then from the on-disk
/// offset list, truncating the data file to match at each step.
pub fn prune_rows(&mut self, num_rows: usize) -> Result<(), NippyJarError> {
    // Each column of a row is one offset
    let num_offsets = num_rows * self.jar.columns;
    // Calculate the number of offsets to prune from in-memory list
    let offsets_prune_count = num_offsets.min(self.offsets.len().saturating_sub(1)); // last element is the expected size of the data file
    let remaining_to_prune = num_offsets.saturating_sub(offsets_prune_count);
    // Prune in-memory offsets if needed
    if offsets_prune_count > 0 {
        // Determine new length based on the offset to prune up to
        let new_len = self.offsets[(self.offsets.len() - 1) - offsets_prune_count]; // last element is the expected size of the data file
        self.offsets.truncate(self.offsets.len() - offsets_prune_count);
        // Truncate the data file to the new length
        self.data_file.set_len(new_len)?;
    }
    // Prune from on-disk offset list if there are still rows left to prune
    if remaining_to_prune > 0 {
        // Get the current length of the on-disk offset file
        let length = self.offsets_file.metadata()?.len();
        // Handle non-empty offset file
        if length > 1 {
            // first byte is reserved for `bytes_per_offset`, which is 8 initially.
            let num_offsets = (length - 1) / OFFSET_SIZE_BYTES;
            if remaining_to_prune as u64 > num_offsets {
                return Err(NippyJarError::InvalidPruning(
                    num_offsets,
                    remaining_to_prune as u64,
                ))
            }
            let new_num_offsets = num_offsets.saturating_sub(remaining_to_prune as u64);
            // If all rows are to be pruned
            if new_num_offsets <= 1 {
                // <= 1 because the one offset would actually be the expected file data size
                self.offsets_file.set_len(1)?;
                self.data_file.set_len(0)?;
            } else {
                // Calculate the new length for the on-disk offset list
                let new_len = 1 + new_num_offsets * OFFSET_SIZE_BYTES;
                // Seek to the position of the last offset
                self.offsets_file
                    .seek(SeekFrom::Start(new_len.saturating_sub(OFFSET_SIZE_BYTES)))?;
                // Read the last offset value
                let mut last_offset = [0u8; OFFSET_SIZE_BYTES as usize];
                self.offsets_file.read_exact(&mut last_offset)?;
                let last_offset = u64::from_le_bytes(last_offset);
                // Update the lengths of both the offsets and data files
                self.offsets_file.set_len(new_len)?;
                self.data_file.set_len(last_offset)?;
            }
        } else {
            return Err(NippyJarError::InvalidPruning(0, remaining_to_prune as u64))
        }
    }
    self.offsets_file.sync_all()?;
    self.data_file.sync_all()?;
    // Re-seek both handles to EOF so future appends continue after the truncated data.
    self.offsets_file.seek(SeekFrom::End(0))?;
    self.data_file.seek(SeekFrom::End(0))?;
    self.jar.rows = self.jar.rows.saturating_sub(num_rows);
    if self.jar.rows == 0 {
        self.jar.max_row_size = 0;
    }
    // Persist row count / max row size last, mirroring the ordering `commit()` relies on.
    self.jar.freeze_config()?;
    Ok(())
}
/// Updates [`NippyJar`] with the new row count and maximum uncompressed row size, while
/// resetting internal fields.
fn finalize_row(&mut self) {
    // `max_row_size` tracks the largest *uncompressed* row ever written to this jar.
    self.jar.max_row_size = self.jar.max_row_size.max(self.uncompressed_row_size);
    self.jar.rows += 1;
    // Reset per-row scratch state for the next row.
    self.tmp_buf.clear();
    self.uncompressed_row_size = 0;
    self.column = 0;
}
/// Commits configuration and offsets to disk. It drains the internal offset list.
pub fn commit(&mut self) -> Result<(), NippyJarError> {
    // Data must be durable before the offsets that point into it.
    self.data_file.sync_all()?;
    self.commit_offsets()?;
    // Flushes `max_row_size` and total `rows` to disk. This must be last: the
    // consistency check assumes the config is the final thing persisted.
    self.jar.freeze_config()?;
    Ok(())
}
/// Flushes offsets to disk.
pub(crate) fn commit_offsets(&mut self) -> Result<(), NippyJarError> {
    // The last offset on disk can be the first offset of `self.offsets` given how
    // `append_column()` works alongside commit. So we need to skip it.
    let mut last_offset_ondisk = None;
    if self.offsets_file.metadata()?.len() > 1 {
        // Read the final offset currently persisted so we can de-duplicate it below.
        self.offsets_file.seek(SeekFrom::End(-(OFFSET_SIZE_BYTES as i64)))?;
        let mut buf = [0u8; OFFSET_SIZE_BYTES as usize];
        self.offsets_file.read_exact(&mut buf)?;
        last_offset_ondisk = Some(u64::from_le_bytes(buf));
    }
    self.offsets_file.seek(SeekFrom::End(0))?;
    // Appends new offsets to disk
    for offset in self.offsets.drain(..) {
        // Only the very first drained offset can match the on-disk tail; `take()`
        // ensures the comparison happens at most once.
        if let Some(last_offset_ondisk) = last_offset_ondisk.take() {
            if last_offset_ondisk == offset {
                continue
            }
        }
        self.offsets_file.write_all(&offset.to_le_bytes())?;
    }
    self.offsets_file.sync_all()?;
    Ok(())
}
/// Test-only: returns the column index the writer will write next.
#[cfg(test)]
pub fn column(&self) -> usize {
    self.column
}
/// Test-only: returns the in-memory (uncommitted) offset list.
#[cfg(test)]
pub fn offsets(&self) -> &[u64] {
    &self.offsets
}
/// Test-only: mutable access to the in-memory offset list, used by tests to simulate
/// partially written offsets.
#[cfg(test)]
pub fn offsets_mut(&mut self) -> &mut Vec<u64> {
    &mut self.offsets
}
}

View File

@@ -35,6 +35,7 @@ itertools.workspace = true
pin-project.workspace = true
parking_lot.workspace = true
dashmap = { version = "5.5", features = ["inline"] }
strum.workspace = true
# test-utils
alloy-rlp = { workspace = true, optional = true }

View File

@@ -36,3 +36,19 @@ pub use chain::{Chain, DisplayBlocksChain};
pub mod bundle_state;
pub use bundle_state::{BundleStateWithReceipts, OriginalValuesKnown, StateChanges, StateReverts};
/// Converts any `RangeBounds<u64>` into a half-open `Range<u64>`.
///
/// Unbounded starts become `0` and unbounded ends become `u64::MAX`. Inclusive ends
/// (and exclusive starts) are shifted by one; `saturating_add` avoids the overflow
/// panic (debug) / wrap-to-zero (release) the plain `v + 1` had when a bound is
/// `u64::MAX` — a `Range<u64>` cannot represent an inclusive `u64::MAX` end anyway.
pub(crate) fn to_range<R: std::ops::RangeBounds<u64>>(bounds: R) -> std::ops::Range<u64> {
    let start = match bounds.start_bound() {
        std::ops::Bound::Included(&v) => v,
        std::ops::Bound::Excluded(&v) => v.saturating_add(1),
        std::ops::Bound::Unbounded => 0,
    };

    let end = match bounds.end_bound() {
        std::ops::Bound::Included(&v) => v.saturating_add(1),
        std::ops::Bound::Excluded(&v) => v,
        std::ops::Bound::Unbounded => u64::MAX,
    };

    start..end
}

View File

@@ -83,12 +83,12 @@ impl<DB> ProviderFactory<DB> {
mut self,
snapshots_path: PathBuf,
highest_snapshot_tracker: watch::Receiver<Option<HighestSnapshots>>,
) -> Self {
) -> ProviderResult<Self> {
self.snapshot_provider = Some(Arc::new(
SnapshotProvider::new(snapshots_path)
SnapshotProvider::new(snapshots_path)?
.with_highest_tracker(Some(highest_snapshot_tracker)),
));
self
Ok(self)
}
/// Returns reference to the underlying database.
@@ -401,6 +401,13 @@ impl<DB: Database> ReceiptProvider for ProviderFactory<DB> {
fn receipts_by_block(&self, block: BlockHashOrNumber) -> ProviderResult<Option<Vec<Receipt>>> {
self.provider()?.receipts_by_block(block)
}
fn receipts_by_tx_range(
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<Receipt>> {
self.provider()?.receipts_by_tx_range(range)
}
}
impl<DB: Database> WithdrawalsProvider for ProviderFactory<DB> {

View File

@@ -1,6 +1,7 @@
use crate::{
bundle_state::{BundleStateInit, BundleStateWithReceipts, HashedStateChanges, RevertsInit},
providers::{database::metrics, SnapshotProvider},
to_range,
traits::{
AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
},
@@ -39,9 +40,9 @@ use reth_primitives::{
trie::Nibbles,
Account, Address, Block, BlockHash, BlockHashOrNumber, BlockNumber, BlockWithSenders,
ChainInfo, ChainSpec, GotExpected, Hardfork, Head, Header, PruneCheckpoint, PruneModes,
PruneSegment, Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader, StorageEntry,
TransactionMeta, TransactionSigned, TransactionSignedEcRecovered, TransactionSignedNoHash,
TxHash, TxNumber, Withdrawal, B256, U256,
PruneSegment, Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader, SnapshotSegment,
StorageEntry, TransactionMeta, TransactionSigned, TransactionSignedEcRecovered,
TransactionSignedNoHash, TxHash, TxNumber, Withdrawal, B256, U256,
};
use reth_trie::{
hashed_cursor::HashedPostState, prefix_set::PrefixSetMut, updates::TrieUpdates, StateRoot,
@@ -117,7 +118,7 @@ impl<TX: DbTx> DatabaseProvider<TX> {
&self,
range: impl RangeBounds<T::Key>,
mut f: impl FnMut(T::Value) -> Result<R, DatabaseError>,
) -> Result<Vec<R>, DatabaseError> {
) -> ProviderResult<Vec<R>> {
self.cursor_read_collect_with_key::<T, R>(range, |_, v| f(v))
}
@@ -125,7 +126,7 @@ impl<TX: DbTx> DatabaseProvider<TX> {
&self,
range: impl RangeBounds<T::Key>,
f: impl FnMut(T::Key, T::Value) -> Result<R, DatabaseError>,
) -> Result<Vec<R>, DatabaseError> {
) -> ProviderResult<Vec<R>> {
let capacity = match range_size_hint(&range) {
Some(0) | None => return Ok(Vec::new()),
Some(capacity) => capacity,
@@ -139,7 +140,7 @@ impl<TX: DbTx> DatabaseProvider<TX> {
cursor: &mut impl DbCursorRO<T>,
range: impl RangeBounds<T::Key>,
mut f: impl FnMut(T::Value) -> Result<R, DatabaseError>,
) -> Result<Vec<R>, DatabaseError> {
) -> ProviderResult<Vec<R>> {
let capacity = range_size_hint(&range).unwrap_or(0);
self.cursor_collect_with_capacity(cursor, range, capacity, |_, v| f(v))
}
@@ -150,7 +151,7 @@ impl<TX: DbTx> DatabaseProvider<TX> {
range: impl RangeBounds<T::Key>,
capacity: usize,
mut f: impl FnMut(T::Key, T::Value) -> Result<R, DatabaseError>,
) -> Result<Vec<R>, DatabaseError> {
) -> ProviderResult<Vec<R>> {
let mut items = Vec::with_capacity(capacity);
for entry in cursor.walk_range(range)? {
let (key, value) = entry?;
@@ -245,6 +246,113 @@ impl<TX: DbTx> DatabaseProvider<TX> {
.walk(Some(T::Key::default()))?
.collect::<Result<Vec<_>, DatabaseError>>()
}
/// Gets data within a specified range, potentially spanning different snapshots and database.
///
/// # Arguments
/// * `segment` - The segment of the snapshot to query.
/// * `block_range` - The range of data to fetch.
/// * `fetch_from_snapshot` - A function to fetch data from the snapshot.
/// * `fetch_from_database` - A function to fetch data from the database.
/// * `predicate` - A function used to evaluate each item in the fetched data. Fetching is
/// terminated when this function returns false, thereby filtering the data based on the
/// provided condition.
fn get_range_with_snapshot<T, P, FS, FD>(
    &self,
    segment: SnapshotSegment,
    mut block_or_tx_range: Range<u64>,
    fetch_from_snapshot: FS,
    mut fetch_from_database: FD,
    mut predicate: P,
) -> ProviderResult<Vec<T>>
where
    FS: Fn(&SnapshotProvider, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
    FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
    P: FnMut(&T) -> bool,
{
    let mut data = Vec::new();
    if let Some(snapshot_provider) = &self.snapshot_provider {
        // If there is, check the maximum block or transaction number of the segment.
        if let Some(snapshot_upper_bound) = match segment {
            SnapshotSegment::Headers => snapshot_provider.get_highest_snapshot_block(segment),
            SnapshotSegment::Transactions | SnapshotSegment::Receipts => {
                snapshot_provider.get_highest_snapshot_tx(segment)
            }
        } {
            if block_or_tx_range.start <= snapshot_upper_bound {
                // Serve the portion covered by snapshots, then shrink the range so the
                // database only handles the remainder.
                let end = block_or_tx_range.end.min(snapshot_upper_bound + 1);
                data.extend(fetch_from_snapshot(
                    snapshot_provider,
                    block_or_tx_range.start..end,
                    &mut predicate,
                )?);
                block_or_tx_range.start = end;
            }
        }
    }
    // Anything past the snapshot upper bound (or everything, when no snapshot provider is
    // configured) comes from the database.
    if block_or_tx_range.end > block_or_tx_range.start {
        data.extend(fetch_from_database(block_or_tx_range, predicate)?)
    }
    Ok(data)
}
/// Retrieves data from the database or snapshot, wherever it's available.
///
/// # Arguments
/// * `segment` - The segment of the snapshot to check against.
/// * `index_key` - Requested index key, usually a block or transaction number.
/// * `fetch_from_snapshot` - A closure that defines how to fetch the data from the snapshot
/// provider.
/// * `fetch_from_database` - A closure that defines how to fetch the data from the database
/// when the snapshot doesn't contain the required data or is not available.
fn get_with_snapshot<T, FS, FD>(
&self,
segment: SnapshotSegment,
number: u64,
fetch_from_snapshot: FS,
fetch_from_database: FD,
) -> ProviderResult<Option<T>>
where
FS: Fn(&SnapshotProvider) -> ProviderResult<Option<T>>,
FD: Fn() -> ProviderResult<Option<T>>,
{
if let Some(provider) = &self.snapshot_provider {
// If there is, check the maximum block or transaction number of the segment.
let snapshot_upper_bound = match segment {
SnapshotSegment::Headers => provider.get_highest_snapshot_block(segment),
SnapshotSegment::Transactions | SnapshotSegment::Receipts => {
provider.get_highest_snapshot_tx(segment)
}
};
if snapshot_upper_bound
.map_or(false, |snapshot_upper_bound| snapshot_upper_bound >= number)
{
return fetch_from_snapshot(provider);
}
}
fetch_from_database()
}
fn transactions_by_tx_range_with_cursor<C>(
&self,
range: impl RangeBounds<TxNumber>,
cursor: &mut C,
) -> ProviderResult<Vec<TransactionSignedNoHash>>
where
C: DbCursorRO<tables::Transactions>,
{
self.get_range_with_snapshot(
SnapshotSegment::Transactions,
to_range(range),
|snapshot, range, _| snapshot.transactions_by_tx_range(range),
|range, _| self.cursor_collect(cursor, range, Ok),
|_| true,
)
}
}
impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
@@ -981,7 +1089,12 @@ impl<TX: DbTx> HeaderProvider for DatabaseProvider<TX> {
}
fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Header>> {
Ok(self.tx.get::<tables::Headers>(num)?)
self.get_with_snapshot(
SnapshotSegment::Headers,
num,
|snapshot| snapshot.header_by_number(num),
|| Ok(self.tx.get::<tables::Headers>(num)?),
)
}
fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
@@ -999,52 +1112,81 @@ impl<TX: DbTx> HeaderProvider for DatabaseProvider<TX> {
return Ok(Some(td));
}
Ok(self.tx.get::<tables::HeaderTD>(number)?.map(|td| td.0))
self.get_with_snapshot(
SnapshotSegment::Headers,
number,
|snapshot| snapshot.header_td_by_number(number),
|| Ok(self.tx.get::<tables::HeaderTD>(number)?.map(|td| td.0)),
)
}
fn headers_range(&self, range: impl RangeBounds<BlockNumber>) -> ProviderResult<Vec<Header>> {
let mut cursor = self.tx.cursor_read::<tables::Headers>()?;
cursor
.walk_range(range)?
.map(|result| result.map(|(_, header)| header).map_err(Into::into))
.collect::<ProviderResult<Vec<_>>>()
self.get_range_with_snapshot(
SnapshotSegment::Headers,
to_range(range),
|snapshot, range, _| snapshot.headers_range(range),
|range, _| {
self.cursor_read_collect::<tables::Headers, _>(range, Ok).map_err(Into::into)
},
|_| true,
)
}
fn sealed_header(&self, number: BlockNumber) -> ProviderResult<Option<SealedHeader>> {
if let Some(header) = self.header_by_number(number)? {
let hash = self
.block_hash(number)?
.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
Ok(Some(header.seal(hash)))
} else {
Ok(None)
}
self.get_with_snapshot(
SnapshotSegment::Headers,
number,
|snapshot| snapshot.sealed_header(number),
|| {
if let Some(header) = self.header_by_number(number)? {
let hash = self
.block_hash(number)?
.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
Ok(Some(header.seal(hash)))
} else {
Ok(None)
}
},
)
}
fn sealed_headers_while(
&self,
range: impl RangeBounds<BlockNumber>,
mut predicate: impl FnMut(&SealedHeader) -> bool,
predicate: impl FnMut(&SealedHeader) -> bool,
) -> ProviderResult<Vec<SealedHeader>> {
let mut headers = vec![];
for entry in self.tx.cursor_read::<tables::Headers>()?.walk_range(range)? {
let (number, header) = entry?;
let hash = self
.block_hash(number)?
.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
let sealed = header.seal(hash);
if !predicate(&sealed) {
break;
}
headers.push(sealed);
}
Ok(headers)
self.get_range_with_snapshot(
SnapshotSegment::Headers,
to_range(range),
|snapshot, range, predicate| snapshot.sealed_headers_while(range, predicate),
|range, mut predicate| {
let mut headers = vec![];
for entry in self.tx.cursor_read::<tables::Headers>()?.walk_range(range)? {
let (number, header) = entry?;
let hash = self
.block_hash(number)?
.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
let sealed = header.seal(hash);
if !predicate(&sealed) {
break
}
headers.push(sealed);
}
Ok(headers)
},
predicate,
)
}
}
impl<TX: DbTx> BlockHashReader for DatabaseProvider<TX> {
fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
Ok(self.tx.get::<tables::CanonicalHeaders>(number)?)
self.get_with_snapshot(
SnapshotSegment::Headers,
number,
|snapshot| snapshot.block_hash(number),
|| Ok(self.tx.get::<tables::CanonicalHeaders>(number)?),
)
}
fn canonical_hashes_range(
@@ -1052,12 +1194,16 @@ impl<TX: DbTx> BlockHashReader for DatabaseProvider<TX> {
start: BlockNumber,
end: BlockNumber,
) -> ProviderResult<Vec<B256>> {
let range = start..end;
let mut cursor = self.tx.cursor_read::<tables::CanonicalHeaders>()?;
cursor
.walk_range(range)?
.map(|result| result.map(|(_, hash)| hash).map_err(Into::into))
.collect::<ProviderResult<Vec<_>>>()
self.get_range_with_snapshot(
SnapshotSegment::Headers,
start..end,
|snapshot, range, _| snapshot.canonical_hashes_range(range.start, range.end),
|range, _| {
self.cursor_read_collect::<tables::CanonicalHeaders, _>(range, Ok)
.map_err(Into::into)
},
|_| true,
)
}
}
@@ -1227,7 +1373,14 @@ impl<TX: DbTx> BlockReader for DatabaseProvider<TX> {
// we skip the block.
if let Some((_, block_body_indices)) = block_body_cursor.seek_exact(num)? {
let tx_range = block_body_indices.tx_num_range();
let body = self.cursor_collect(&mut tx_cursor, tx_range, |tx| Ok(tx.into()))?;
let body = if tx_range.is_empty() {
Vec::new()
} else {
self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?
.into_iter()
.map(Into::into)
.collect()
};
// If we are past shanghai, then all blocks should have a withdrawal list,
// even if empty
@@ -1263,53 +1416,63 @@ impl<TX: DbTx> TransactionsProviderExt for DatabaseProvider<TX> {
&self,
tx_range: Range<TxNumber>,
) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions>()?;
let tx_range_size = tx_range.clone().count();
let tx_walker = tx_cursor.walk_range(tx_range)?;
self.get_range_with_snapshot(
SnapshotSegment::Transactions,
tx_range,
|snapshot, range, _| snapshot.transaction_hashes_by_range(range),
|tx_range, _| {
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions>()?;
let tx_range_size = tx_range.clone().count();
let tx_walker = tx_cursor.walk_range(tx_range)?;
let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1);
let mut channels = Vec::with_capacity(chunk_size);
let mut transaction_count = 0;
let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1);
let mut channels = Vec::with_capacity(chunk_size);
let mut transaction_count = 0;
#[inline]
fn calculate_hash(
entry: Result<(TxNumber, TransactionSignedNoHash), DatabaseError>,
rlp_buf: &mut Vec<u8>,
) -> Result<(B256, TxNumber), Box<ProviderError>> {
let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
tx.transaction.encode_with_signature(&tx.signature, rlp_buf, false);
Ok((keccak256(rlp_buf), tx_id))
}
for chunk in &tx_walker.chunks(chunk_size) {
let (tx, rx) = mpsc::channel();
channels.push(rx);
// Note: Unfortunate side-effect of how chunk is designed in itertools (it is not Send)
let chunk: Vec<_> = chunk.collect();
transaction_count += chunk.len();
// Spawn the task onto the global rayon pool
// This task will send the results through the channel after it has calculated the hash.
rayon::spawn(move || {
let mut rlp_buf = Vec::with_capacity(128);
for entry in chunk {
rlp_buf.clear();
let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
#[inline]
fn calculate_hash(
entry: Result<(TxNumber, TransactionSignedNoHash), DatabaseError>,
rlp_buf: &mut Vec<u8>,
) -> Result<(B256, TxNumber), Box<ProviderError>> {
let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
tx.transaction.encode_with_signature(&tx.signature, rlp_buf, false);
Ok((keccak256(rlp_buf), tx_id))
}
});
}
let mut tx_list = Vec::with_capacity(transaction_count);
// Iterate over channels and append the tx hashes unsorted
for channel in channels {
while let Ok(tx) = channel.recv() {
let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
tx_list.push((tx_hash, tx_id));
}
}
for chunk in &tx_walker.chunks(chunk_size) {
let (tx, rx) = mpsc::channel();
channels.push(rx);
Ok(tx_list)
// Note: Unfortunate side-effect of how chunk is designed in itertools (it is
// not Send)
let chunk: Vec<_> = chunk.collect();
transaction_count += chunk.len();
// Spawn the task onto the global rayon pool
// This task will send the results through the channel after it has calculated
// the hash.
rayon::spawn(move || {
let mut rlp_buf = Vec::with_capacity(128);
for entry in chunk {
rlp_buf.clear();
let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
}
});
}
let mut tx_list = Vec::with_capacity(transaction_count);
// Iterate over channels and append the tx hashes unsorted
for channel in channels {
while let Ok(tx) = channel.recv() {
let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
tx_list.push((tx_hash, tx_id));
}
}
Ok(tx_list)
},
|_| true,
)
}
}
@@ -1321,14 +1484,24 @@ impl<TX: DbTx> TransactionsProvider for DatabaseProvider<TX> {
}
fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<TransactionSigned>> {
Ok(self.tx.get::<tables::Transactions>(id)?.map(Into::into))
self.get_with_snapshot(
SnapshotSegment::Transactions,
id,
|snapshot| snapshot.transaction_by_id(id),
|| Ok(self.tx.get::<tables::Transactions>(id)?.map(Into::into)),
)
}
fn transaction_by_id_no_hash(
&self,
id: TxNumber,
) -> ProviderResult<Option<TransactionSignedNoHash>> {
Ok(self.tx.get::<tables::Transactions>(id)?)
self.get_with_snapshot(
SnapshotSegment::Transactions,
id,
|snapshot| snapshot.transaction_by_id_no_hash(id),
|| Ok(self.tx.get::<tables::Transactions>(id)?),
)
}
fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<TransactionSigned>> {
@@ -1396,14 +1569,21 @@ impl<TX: DbTx> TransactionsProvider for DatabaseProvider<TX> {
&self,
id: BlockHashOrNumber,
) -> ProviderResult<Option<Vec<TransactionSigned>>> {
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions>()?;
if let Some(block_number) = self.convert_hash_or_number(id)? {
if let Some(body) = self.block_body_indices(block_number)? {
return self
.cursor_read_collect::<tables::Transactions, _>(body.tx_num_range(), |tx| {
Ok(tx.into())
})
.map(Some)
.map_err(Into::into);
let tx_range = body.tx_num_range();
return if tx_range.is_empty() {
Ok(Some(Vec::new()))
} else {
Ok(Some(
self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?
.into_iter()
.map(Into::into)
.collect(),
))
}
}
}
Ok(None)
@@ -1414,17 +1594,33 @@ impl<TX: DbTx> TransactionsProvider for DatabaseProvider<TX> {
range: impl RangeBounds<BlockNumber>,
) -> ProviderResult<Vec<Vec<TransactionSigned>>> {
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions>()?;
self.cursor_read_collect::<tables::BlockBodyIndices, _>(range, |body| {
self.cursor_collect(&mut tx_cursor, body.tx_num_range(), |tx| Ok(tx.into()))
})
.map_err(Into::into)
let mut results = Vec::new();
let mut body_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;
for entry in body_cursor.walk_range(range)? {
let (_, body) = entry?;
let tx_num_range = body.tx_num_range();
if tx_num_range.is_empty() {
results.push(Vec::new());
} else {
results.push(
self.transactions_by_tx_range_with_cursor(tx_num_range, &mut tx_cursor)?
.into_iter()
.map(Into::into)
.collect(),
);
}
}
Ok(results)
}
fn transactions_by_tx_range(
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<TransactionSignedNoHash>> {
self.cursor_read_collect::<tables::Transactions, _>(range, Ok).map_err(Into::into)
self.transactions_by_tx_range_with_cursor(
range,
&mut self.tx.cursor_read::<tables::Transactions>()?,
)
}
fn senders_by_tx_range(
@@ -1441,7 +1637,12 @@ impl<TX: DbTx> TransactionsProvider for DatabaseProvider<TX> {
impl<TX: DbTx> ReceiptProvider for DatabaseProvider<TX> {
fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Receipt>> {
Ok(self.tx.get::<tables::Receipts>(id)?)
self.get_with_snapshot(
SnapshotSegment::Receipts,
id,
|snapshot| snapshot.receipt(id),
|| Ok(self.tx.get::<tables::Receipts>(id)?),
)
}
fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Receipt>> {
@@ -1455,14 +1656,31 @@ impl<TX: DbTx> ReceiptProvider for DatabaseProvider<TX> {
fn receipts_by_block(&self, block: BlockHashOrNumber) -> ProviderResult<Option<Vec<Receipt>>> {
if let Some(number) = self.convert_hash_or_number(block)? {
if let Some(body) = self.block_body_indices(number)? {
return self
.cursor_read_collect::<tables::Receipts, _>(body.tx_num_range(), Ok)
.map(Some)
.map_err(Into::into);
let tx_range = body.tx_num_range();
return if tx_range.is_empty() {
Ok(Some(Vec::new()))
} else {
self.receipts_by_tx_range(tx_range).map(Some)
}
}
}
Ok(None)
}
fn receipts_by_tx_range(
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<Receipt>> {
self.get_range_with_snapshot(
SnapshotSegment::Receipts,
to_range(range),
|snapshot, range, _| snapshot.receipts_by_tx_range(range),
|range, _| {
self.cursor_read_collect::<tables::Receipts, _>(range, Ok).map_err(Into::into)
},
|_| true,
)
}
}
impl<TX: DbTx> WithdrawalsProvider for DatabaseProvider<TX> {

View File

@@ -374,6 +374,13 @@ where
fn receipts_by_block(&self, block: BlockHashOrNumber) -> ProviderResult<Option<Vec<Receipt>>> {
self.database.provider()?.receipts_by_block(block)
}
fn receipts_by_tx_range(
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<Receipt>> {
self.database.provider()?.receipts_by_tx_range(range)
}
}
impl<DB, Tree> ReceiptProviderIdExt for BlockchainProvider<DB, Tree>
where

View File

@@ -1,6 +1,7 @@
use super::LoadedJarRef;
use crate::{
BlockHashReader, BlockNumReader, HeaderProvider, ReceiptProvider, TransactionsProvider,
to_range, BlockHashReader, BlockNumReader, HeaderProvider, ReceiptProvider,
TransactionsProvider,
};
use reth_db::{
codecs::CompactU256,
@@ -11,7 +12,7 @@ use reth_primitives::{
Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header, Receipt, SealedHeader,
TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, B256, U256,
};
use std::ops::{Deref, Range, RangeBounds};
use std::ops::{Deref, RangeBounds};
/// Provider over a specific `NippyJar` and range.
#[derive(Debug)]
@@ -285,20 +286,20 @@ impl<'a> ReceiptProvider for SnapshotJarProvider<'a> {
// provider with `receipt()` instead for each
Err(ProviderError::UnsupportedProvider)
}
}
fn to_range<R: RangeBounds<u64>>(bounds: R) -> Range<u64> {
let start = match bounds.start_bound() {
std::ops::Bound::Included(&v) => v,
std::ops::Bound::Excluded(&v) => v + 1,
std::ops::Bound::Unbounded => 0,
};
let end = match bounds.end_bound() {
std::ops::Bound::Included(&v) => v + 1,
std::ops::Bound::Excluded(&v) => v,
std::ops::Bound::Unbounded => u64::MAX,
};
start..end
fn receipts_by_tx_range(
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<Receipt>> {
let range = to_range(range);
let mut cursor = self.cursor()?;
let mut receipts = Vec::with_capacity((range.end - range.start) as usize);
for num in range {
if let Some(tx) = cursor.get_one::<ReceiptMask<Receipt>>(num.into())? {
receipts.push(tx)
}
}
Ok(receipts)
}
}

View File

@@ -1,22 +1,27 @@
use super::{LoadedJar, SnapshotJarProvider};
use crate::{BlockHashReader, BlockNumReader, HeaderProvider, TransactionsProvider};
use crate::{
to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, HeaderProvider,
ReceiptProvider, TransactionVariant, TransactionsProvider, TransactionsProviderExt,
WithdrawalsProvider,
};
use dashmap::DashMap;
use parking_lot::RwLock;
use reth_db::{
codecs::CompactU256,
snapshot::{HeaderMask, TransactionMask},
models::StoredBlockBodyIndices,
snapshot::{iter_snapshots, HeaderMask, ReceiptMask, SnapshotCursor, TransactionMask},
};
use reth_interfaces::provider::{ProviderError, ProviderResult};
use reth_nippy_jar::NippyJar;
use reth_primitives::{
snapshot::HighestSnapshots, Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo,
Header, SealedHeader, SnapshotSegment, TransactionMeta, TransactionSigned,
TransactionSignedNoHash, TxHash, TxNumber, B256, U256,
snapshot::HighestSnapshots, Address, Block, BlockHash, BlockHashOrNumber, BlockNumber,
BlockWithSenders, ChainInfo, Header, Receipt, SealedBlock, SealedBlockWithSenders,
SealedHeader, SnapshotSegment, TransactionMeta, TransactionSigned, TransactionSignedNoHash,
TxHash, TxNumber, Withdrawal, B256, U256,
};
use revm::primitives::HashMap;
use std::{
collections::BTreeMap,
ops::{RangeBounds, RangeInclusive},
collections::{hash_map::Entry, BTreeMap, HashMap},
ops::{Range, RangeBounds, RangeInclusive},
path::{Path, PathBuf},
};
use tokio::sync::watch;
@@ -36,26 +41,39 @@ pub struct SnapshotProvider {
/// Maintains a map which allows for concurrent access to different `NippyJars`, over different
/// segments and ranges.
map: DashMap<(BlockNumber, SnapshotSegment), LoadedJar>,
/// Available snapshot ranges on disk indexed by max blocks.
/// Available snapshot transaction ranges on disk indexed by max blocks.
snapshots_block_index: RwLock<SegmentRanges>,
/// Available snapshot ranges on disk indexed by max transactions.
/// Available snapshot block ranges on disk indexed by max transactions.
snapshots_tx_index: RwLock<SegmentRanges>,
/// Tracks the highest snapshot of every segment.
highest_tracker: Option<watch::Receiver<Option<HighestSnapshots>>>,
/// Directory where snapshots are located
path: PathBuf,
/// Whether [`SnapshotJarProvider`] loads filters into memory. If not, `by_hash` queries won't
/// be able to be queried directly.
load_filters: bool,
}
impl SnapshotProvider {
/// Creates a new [`SnapshotProvider`].
pub fn new(path: impl AsRef<Path>) -> Self {
Self {
pub fn new(path: impl AsRef<Path>) -> ProviderResult<Self> {
let provider = Self {
map: Default::default(),
snapshots_block_index: Default::default(),
snapshots_tx_index: Default::default(),
highest_tracker: None,
path: path.as_ref().to_path_buf(),
}
load_filters: false,
};
provider.update_index()?;
Ok(provider)
}
/// Loads filters into memory when creating a [`SnapshotJarProvider`].
pub fn with_filters(mut self) -> Self {
self.load_filters = true;
self
}
/// Adds a highest snapshot tracker to the provider
@@ -141,12 +159,15 @@ impl SnapshotProvider {
if let Some(jar) = self.map.get(&key) {
Ok(jar.into())
} else {
self.map.insert(
key,
LoadedJar::new(NippyJar::load(
&self.path.join(segment.filename(block_range, tx_range)),
)?)?,
);
let jar = NippyJar::load(&self.path.join(segment.filename(block_range, tx_range)))
.map(|jar| {
if self.load_filters {
return jar.load_filters()
}
Ok(jar)
})??;
self.map.insert(key, LoadedJar::new(jar)?);
Ok(self.map.get(&key).expect("qed").into())
}
}
@@ -166,6 +187,10 @@ impl SnapshotProvider {
let mut snapshots_rev_iter = segment_snapshots.iter().rev().peekable();
while let Some((block_end, tx_range)) = snapshots_rev_iter.next() {
if block > *block_end {
// request block is higher than highest snapshot block
return None
}
// `unwrap_or(0) is safe here as it sets block_start to 0 if the iterator is empty,
// indicating the lowest height snapshot has been reached.
let block_start =
@@ -192,6 +217,10 @@ impl SnapshotProvider {
let mut snapshots_rev_iter = segment_snapshots.iter().rev().peekable();
while let Some((tx_end, block_range)) = snapshots_rev_iter.next() {
if tx > *tx_end {
// request tx is higher than highest snapshot tx
return None
}
let tx_start = snapshots_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
if tx_start <= tx {
return Some((block_range.clone(), tx_start..=*tx_end))
@@ -200,11 +229,53 @@ impl SnapshotProvider {
None
}
/// Gets the highest snapshot if it exists for a snapshot segment.
pub fn get_highest_snapshot(&self, segment: SnapshotSegment) -> Option<BlockNumber> {
self.highest_tracker
.as_ref()
.and_then(|tracker| tracker.borrow().and_then(|highest| highest.highest(segment)))
/// Updates the inner transaction and block index
pub fn update_index(&self) -> ProviderResult<()> {
let mut block_index = self.snapshots_block_index.write();
let mut tx_index = self.snapshots_tx_index.write();
for (segment, ranges) in iter_snapshots(&self.path)? {
for (block_range, tx_range) in ranges {
let block_end = *block_range.end();
let tx_end = *tx_range.end();
match tx_index.entry(segment) {
Entry::Occupied(mut index) => {
index.get_mut().insert(tx_end, block_range);
}
Entry::Vacant(index) => {
index.insert(BTreeMap::from([(tx_end, block_range)]));
}
};
match block_index.entry(segment) {
Entry::Occupied(mut index) => {
index.get_mut().insert(block_end, tx_range);
}
Entry::Vacant(index) => {
index.insert(BTreeMap::from([(block_end, tx_range)]));
}
};
}
}
Ok(())
}
/// Gets the highest snapshot block if it exists for a snapshot segment.
pub fn get_highest_snapshot_block(&self, segment: SnapshotSegment) -> Option<BlockNumber> {
self.snapshots_block_index
.read()
.get(&segment)
.and_then(|index| index.last_key_value().map(|(last_block, _)| *last_block))
}
/// Gets the highest snapshotted transaction.
pub fn get_highest_snapshot_tx(&self, segment: SnapshotSegment) -> Option<TxNumber> {
self.snapshots_tx_index
.read()
.get(&segment)
.and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
}
/// Iterates through segment snapshots in reverse order, executing a function until it returns
@@ -238,6 +309,56 @@ impl SnapshotProvider {
Ok(None)
}
/// Fetches data within a specified range across multiple snapshot files.
///
/// This function iteratively retrieves data using `get_fn` for each item in the given range.
/// It continues fetching until the end of the range is reached or the provided `predicate`
/// returns false.
pub fn fetch_range<T, F, P>(
&self,
segment: SnapshotSegment,
range: Range<u64>,
get_fn: F,
mut predicate: P,
) -> ProviderResult<Vec<T>>
where
F: Fn(&mut SnapshotCursor<'_>, u64) -> ProviderResult<Option<T>>,
P: FnMut(&T) -> bool,
{
let get_provider = |start: u64| match segment {
SnapshotSegment::Headers => self.get_segment_provider_from_block(segment, start, None),
SnapshotSegment::Transactions | SnapshotSegment::Receipts => {
self.get_segment_provider_from_transaction(segment, start, None)
}
};
let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
let mut provider = get_provider(range.start)?;
let mut cursor = provider.cursor()?;
// advances number in range
'outer: for number in range {
// advances snapshot files if `get_fn` returns None
'inner: loop {
match get_fn(&mut cursor, number)? {
Some(res) => {
if !predicate(&res) {
break 'outer
}
result.push(res);
break 'inner;
}
None => {
provider = get_provider(number)?;
cursor = provider.cursor()?;
}
}
}
}
Ok(result)
}
}
impl HeaderProvider for SnapshotProvider {
@@ -274,8 +395,13 @@ impl HeaderProvider for SnapshotProvider {
.header_td_by_number(num)
}
fn headers_range(&self, _range: impl RangeBounds<BlockNumber>) -> ProviderResult<Vec<Header>> {
todo!();
fn headers_range(&self, range: impl RangeBounds<BlockNumber>) -> ProviderResult<Vec<Header>> {
self.fetch_range(
SnapshotSegment::Headers,
to_range(range),
|cursor, number| cursor.get_one::<HeaderMask<Header>>(number.into()),
|_| true,
)
}
fn sealed_header(&self, num: BlockNumber) -> ProviderResult<Option<SealedHeader>> {
@@ -285,10 +411,19 @@ impl HeaderProvider for SnapshotProvider {
fn sealed_headers_while(
&self,
_range: impl RangeBounds<BlockNumber>,
_predicate: impl FnMut(&SealedHeader) -> bool,
range: impl RangeBounds<BlockNumber>,
predicate: impl FnMut(&SealedHeader) -> bool,
) -> ProviderResult<Vec<SealedHeader>> {
todo!()
self.fetch_range(
SnapshotSegment::Headers,
to_range(range),
|cursor, number| {
Ok(cursor
.get_two::<HeaderMask<Header, BlockHash>>(number.into())?
.map(|(header, hash)| header.seal(hash)))
},
predicate,
)
}
}
@@ -299,28 +434,63 @@ impl BlockHashReader for SnapshotProvider {
fn canonical_hashes_range(
&self,
_start: BlockNumber,
_end: BlockNumber,
start: BlockNumber,
end: BlockNumber,
) -> ProviderResult<Vec<B256>> {
todo!()
self.fetch_range(
SnapshotSegment::Headers,
start..end,
|cursor, number| cursor.get_one::<HeaderMask<BlockHash>>(number.into()),
|_| true,
)
}
}
impl BlockNumReader for SnapshotProvider {
fn chain_info(&self) -> ProviderResult<ChainInfo> {
todo!()
impl ReceiptProvider for SnapshotProvider {
fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Receipt>> {
self.get_segment_provider_from_transaction(SnapshotSegment::Receipts, num, None)?
.receipt(num)
}
fn best_block_number(&self) -> ProviderResult<BlockNumber> {
todo!()
fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Receipt>> {
if let Some(num) = self.transaction_id(hash)? {
return self.receipt(num)
}
Ok(None)
}
fn last_block_number(&self) -> ProviderResult<BlockNumber> {
todo!()
fn receipts_by_block(&self, _block: BlockHashOrNumber) -> ProviderResult<Option<Vec<Receipt>>> {
unreachable!()
}
fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
todo!()
fn receipts_by_tx_range(
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<Receipt>> {
self.fetch_range(
SnapshotSegment::Receipts,
to_range(range),
|cursor, number| cursor.get_one::<ReceiptMask<Receipt>>(number.into()),
|_| true,
)
}
}
impl TransactionsProviderExt for SnapshotProvider {
fn transaction_hashes_by_range(
&self,
tx_range: Range<TxNumber>,
) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
self.fetch_range(
SnapshotSegment::Transactions,
tx_range,
|cursor, number| {
let tx =
cursor.get_one::<TransactionMask<TransactionSignedNoHash>>(number.into())?;
Ok(tx.map(|tx| (tx.hash(), cursor.number())))
},
|_| true,
)
}
}
@@ -367,42 +537,150 @@ impl TransactionsProvider for SnapshotProvider {
&self,
_hash: TxHash,
) -> ProviderResult<Option<(TransactionSigned, TransactionMeta)>> {
todo!()
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}
fn transaction_block(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
todo!()
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}
fn transactions_by_block(
&self,
_block_id: BlockHashOrNumber,
) -> ProviderResult<Option<Vec<TransactionSigned>>> {
todo!()
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}
fn transactions_by_block_range(
&self,
_range: impl RangeBounds<BlockNumber>,
) -> ProviderResult<Vec<Vec<TransactionSigned>>> {
todo!()
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}
fn senders_by_tx_range(
&self,
_range: impl RangeBounds<TxNumber>,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<Address>> {
todo!()
let txes = self.transactions_by_tx_range(range)?;
TransactionSignedNoHash::recover_signers(&txes, txes.len())
.ok_or(ProviderError::SenderRecoveryError)
}
fn transactions_by_tx_range(
&self,
_range: impl RangeBounds<TxNumber>,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<reth_primitives::TransactionSignedNoHash>> {
todo!()
self.fetch_range(
SnapshotSegment::Transactions,
to_range(range),
|cursor, number| {
cursor.get_one::<TransactionMask<TransactionSignedNoHash>>(number.into())
},
|_| true,
)
}
fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
Ok(self.transaction_by_id_no_hash(id)?.and_then(|tx| tx.recover_signer()))
}
}
/* Cannot be successfully implemented but must exist for trait requirements */
impl BlockNumReader for SnapshotProvider {
fn chain_info(&self) -> ProviderResult<ChainInfo> {
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}
fn best_block_number(&self) -> ProviderResult<BlockNumber> {
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}
fn last_block_number(&self) -> ProviderResult<BlockNumber> {
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}
fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}
}
impl BlockReader for SnapshotProvider {
fn find_block_by_hash(
&self,
_hash: B256,
_source: BlockSource,
) -> ProviderResult<Option<Block>> {
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}
fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Block>> {
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}
fn pending_block(&self) -> ProviderResult<Option<SealedBlock>> {
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}
fn pending_block_with_senders(&self) -> ProviderResult<Option<SealedBlockWithSenders>> {
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}
fn pending_block_and_receipts(&self) -> ProviderResult<Option<(SealedBlock, Vec<Receipt>)>> {
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}
fn ommers(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Header>>> {
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}
fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}
fn block_with_senders(
&self,
_id: BlockHashOrNumber,
_transaction_kind: TransactionVariant,
) -> ProviderResult<Option<BlockWithSenders>> {
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}
fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}
}
impl WithdrawalsProvider for SnapshotProvider {
fn withdrawals_by_block(
&self,
_id: BlockHashOrNumber,
_timestamp: u64,
) -> ProviderResult<Option<Vec<Withdrawal>>> {
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}
fn latest_withdrawal(&self) -> ProviderResult<Option<Withdrawal>> {
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}
}

View File

@@ -7,7 +7,7 @@ pub use jar::SnapshotJarProvider;
use reth_interfaces::provider::ProviderResult;
use reth_nippy_jar::NippyJar;
use reth_primitives::{snapshot::SegmentHeader, SnapshotSegment};
use std::ops::Deref;
use std::{ops::Deref, sync::Arc};
/// Alias type for each specific `NippyJar`.
type LoadedJarRef<'a> = dashmap::mapref::one::Ref<'a, (u64, SnapshotSegment), LoadedJar>;
@@ -16,17 +16,17 @@ type LoadedJarRef<'a> = dashmap::mapref::one::Ref<'a, (u64, SnapshotSegment), Lo
#[derive(Debug)]
pub struct LoadedJar {
jar: NippyJar<SegmentHeader>,
mmap_handle: reth_nippy_jar::MmapHandle,
mmap_handle: Arc<reth_nippy_jar::DataReader>,
}
impl LoadedJar {
fn new(jar: NippyJar<SegmentHeader>) -> ProviderResult<Self> {
let mmap_handle = jar.open_data()?;
let mmap_handle = Arc::new(jar.open_data_reader()?);
Ok(Self { jar, mmap_handle })
}
/// Returns a clone of the mmap handle that can be used to instantiate a cursor.
fn mmap_handle(&self) -> reth_nippy_jar::MmapHandle {
fn mmap_handle(&self) -> Arc<reth_nippy_jar::DataReader> {
self.mmap_handle.clone()
}
}
@@ -131,7 +131,7 @@ mod test {
// Use providers to query Header data and compare if it matches
{
let db_provider = factory.provider().unwrap();
let manager = SnapshotProvider::new(snap_path.path());
let manager = SnapshotProvider::new(snap_path.path()).unwrap().with_filters();
let jar_provider = manager
.get_segment_provider_from_block(SnapshotSegment::Headers, 0, Some(&snap_file))
.unwrap();

View File

@@ -344,6 +344,13 @@ impl ReceiptProvider for MockEthProvider {
fn receipts_by_block(&self, _block: BlockHashOrNumber) -> ProviderResult<Option<Vec<Receipt>>> {
Ok(None)
}
fn receipts_by_tx_range(
&self,
_range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<Receipt>> {
Ok(vec![])
}
}
impl ReceiptProviderIdExt for MockEthProvider {}

View File

@@ -222,6 +222,13 @@ impl ReceiptProvider for NoopProvider {
fn receipts_by_block(&self, _block: BlockHashOrNumber) -> ProviderResult<Option<Vec<Receipt>>> {
Ok(None)
}
fn receipts_by_tx_range(
&self,
_range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<Receipt>> {
Ok(vec![])
}
}
impl ReceiptProviderIdExt for NoopProvider {}

View File

@@ -1,3 +1,5 @@
use std::ops::RangeBounds;
use reth_interfaces::provider::ProviderResult;
use reth_primitives::{BlockHashOrNumber, BlockId, BlockNumberOrTag, Receipt, TxHash, TxNumber};
@@ -20,6 +22,12 @@ pub trait ReceiptProvider: Send + Sync {
///
/// Returns `None` if the block is not found.
fn receipts_by_block(&self, block: BlockHashOrNumber) -> ProviderResult<Option<Vec<Receipt>>>;
/// Get receipts by tx range.
fn receipts_by_tx_range(
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<Receipt>>;
}
/// Trait extension for `ReceiptProvider`, for types that implement `BlockId` conversion.