From 006259b4f696942dfa72def8662653d194dc0aac Mon Sep 17 00:00:00 2001
From: joshieDo <93316087+joshieDo@users.noreply.github.com>
Date: Fri, 27 Oct 2023 11:06:31 +0100
Subject: [PATCH] feat: move `ZstdDictionary` inside `NippyJar` and create a
 snapshot manager (#5139)

Co-authored-by: Alexey Shekhirin
---
 Cargo.lock                                    |   2 +
 bin/reth/src/db/snapshots/headers.rs          |  27 +-
 bin/reth/src/db/snapshots/mod.rs              |  26 +-
 bin/reth/src/db/snapshots/receipts.rs         |  34 +--
 bin/reth/src/db/snapshots/transactions.rs     |  34 +--
 crates/primitives/src/snapshot/mod.rs         |   3 +
 crates/primitives/src/snapshot/segment.rs     |  70 +++++-
 crates/snapshot/src/segments/mod.rs           |  44 +---
 crates/storage/nippy-jar/Cargo.toml           |   1 +
 .../storage/nippy-jar/src/compression/zstd.rs | 191 +++++++++++---
 crates/storage/nippy-jar/src/cursor.rs        |  67 ++---
 crates/storage/nippy-jar/src/error.rs         |   4 +
 crates/storage/nippy-jar/src/lib.rs           |  66 ++---
 crates/storage/provider/Cargo.toml            |   1 +
 crates/storage/provider/src/providers/mod.rs  |   2 +-
 .../provider/src/providers/snapshot.rs        | 238 +++++++++++++++---
 16 files changed, 538 insertions(+), 272 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 1237826cac..8b683aa07f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6195,6 +6195,7 @@ dependencies = [
  "bincode",
  "bytes",
  "cuckoofilter",
+ "derive_more",
  "hex",
  "lz4_flex",
  "memmap2 0.7.1",
@@ -6287,6 +6288,7 @@ dependencies = [
  "alloy-rlp",
  "assert_matches",
  "auto_impl",
+ "dashmap",
  "itertools 0.11.0",
  "parking_lot 0.12.1",
  "pin-project",
diff --git a/bin/reth/src/db/snapshots/headers.rs b/bin/reth/src/db/snapshots/headers.rs
index 7a9e813566..48661b218a 100644
--- a/bin/reth/src/db/snapshots/headers.rs
+++ b/bin/reth/src/db/snapshots/headers.rs
@@ -5,13 +5,14 @@ use super::{
 use rand::{seq::SliceRandom, Rng};
 use reth_db::{database::Database, open_db_read_only, table::Decompress};
 use reth_interfaces::db::LogLevel;
-use reth_nippy_jar::NippyJar;
 use reth_primitives::{
     snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction},
     ChainSpec, Header, SnapshotSegment,
 };
-use reth_provider::{DatabaseProviderRO, HeaderProvider, ProviderError, ProviderFactory};
-use reth_snapshot::segments::{get_snapshot_segment_file_name, Headers, Segment};
+use reth_provider::{
+    providers::SnapshotProvider, DatabaseProviderRO, HeaderProvider, ProviderError, ProviderFactory,
+};
+use reth_snapshot::segments::{Headers, Segment};
 use std::{path::Path, sync::Arc};
 
 impl Command {
@@ -54,20 +55,12 @@ impl Command {
         let mut row_indexes = range.clone().collect::<Vec<_>>();
         let mut rng = rand::thread_rng();
 
-        let mut dictionaries = None;
-        let mut jar = NippyJar::load(&get_snapshot_segment_file_name(
-            SnapshotSegment::Headers,
-            filters,
-            compression,
-            &range,
-        ))?;
-
-        let (provider, decompressors) = self.prepare_jar_provider(&mut jar, &mut dictionaries)?;
-        let mut cursor = if !decompressors.is_empty() {
-            provider.cursor_with_decompressors(decompressors)
-        } else {
-            provider.cursor()
-        };
+        let path =
+            SnapshotSegment::Headers.filename_with_configuration(filters, compression, &range);
+        let provider = SnapshotProvider::default();
+        let jar_provider =
+            provider.get_segment_provider(SnapshotSegment::Headers, self.from, Some(path))?;
+        let mut cursor = jar_provider.cursor()?;
 
         for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
             bench(
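For illustration (not part of the patch): the same resolve-then-read flow now repeats across all three bench modules. A condensed sketch, assuming `filters`, `compression`, `range` and `from` are in scope as in the surrounding code:

    // Derive the expected snapshot filename from the segment configuration.
    let path = SnapshotSegment::Headers.filename_with_configuration(filters, compression, &range);
    // The manager starts empty; the first lookup loads the jar at `path` and caches it.
    let provider = SnapshotProvider::default();
    let jar_provider = provider.get_segment_provider(SnapshotSegment::Headers, from, Some(path))?;
    // The cursor no longer needs externally-built decompressors: the zstd
    // dictionaries now travel inside the jar itself.
    let mut cursor = jar_provider.cursor()?;
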
diff --git a/bin/reth/src/db/snapshots/mod.rs b/bin/reth/src/db/snapshots/mod.rs
index 1e663321e1..efce487839 100644
--- a/bin/reth/src/db/snapshots/mod.rs
+++ b/bin/reth/src/db/snapshots/mod.rs
@@ -2,15 +2,11 @@ use clap::Parser;
 use itertools::Itertools;
 use reth_db::{open_db_read_only, DatabaseEnvRO};
 use reth_interfaces::db::LogLevel;
-use reth_nippy_jar::{
-    compression::{DecoderDictionary, Decompressor},
-    NippyJar,
-};
 use reth_primitives::{
-    snapshot::{Compression, InclusionFilter, PerfectHashingFunction, SegmentHeader},
+    snapshot::{Compression, InclusionFilter, PerfectHashingFunction},
     BlockNumber, ChainSpec, SnapshotSegment,
 };
-use reth_provider::{providers::SnapshotProvider, ProviderFactory};
+use reth_provider::ProviderFactory;
 use std::{path::Path, sync::Arc};
 
 mod bench;
@@ -134,22 +130,4 @@ impl Command {
 
         Ok(())
     }
-
-    /// Returns a [`SnapshotProvider`] of the provided [`NippyJar`], alongside a list of
-    /// [`DecoderDictionary`] and [`Decompressor`] if necessary.
-    fn prepare_jar_provider<'a>(
-        &self,
-        jar: &'a mut NippyJar,
-        dictionaries: &'a mut Option<Vec<DecoderDictionary<'a>>>,
-    ) -> eyre::Result<(SnapshotProvider<'a>, Vec<Decompressor<'a>>)> {
-        let mut decompressors: Vec<Decompressor<'_>> = vec![];
-        if let Some(reth_nippy_jar::compression::Compressors::Zstd(zstd)) = jar.compressor_mut() {
-            if zstd.use_dict {
-                *dictionaries = zstd.generate_decompress_dictionaries();
-                decompressors = zstd.generate_decompressors(dictionaries.as_ref().expect("qed"))?;
-            }
-        }
-
-        Ok((SnapshotProvider { jar: &*jar }, decompressors))
-    }
 }
diff --git a/bin/reth/src/db/snapshots/receipts.rs b/bin/reth/src/db/snapshots/receipts.rs
index b6b472170d..378f057e8f 100644
--- a/bin/reth/src/db/snapshots/receipts.rs
+++ b/bin/reth/src/db/snapshots/receipts.rs
@@ -5,19 +5,15 @@ use super::{
 use rand::{seq::SliceRandom, Rng};
 use reth_db::{database::Database, open_db_read_only, table::Decompress};
 use reth_interfaces::db::LogLevel;
-use reth_nippy_jar::NippyJar;
 use reth_primitives::{
     snapshot::{Filters, InclusionFilter},
     ChainSpec, Receipt, SnapshotSegment,
 };
 use reth_provider::{
-    DatabaseProviderRO, ProviderError, ProviderFactory, ReceiptProvider, TransactionsProvider,
-    TransactionsProviderExt,
-};
-use reth_snapshot::{
-    segments,
-    segments::{get_snapshot_segment_file_name, Segment},
+    providers::SnapshotProvider, DatabaseProviderRO, ProviderError, ProviderFactory,
+    ReceiptProvider, TransactionsProvider, TransactionsProviderExt,
 };
+use reth_snapshot::{segments, segments::Segment};
 use std::{path::Path, sync::Arc};
 
 impl Command {
@@ -59,26 +55,22 @@ impl Command {
         let block_range = self.from..=(self.from + self.block_interval - 1);
         let mut rng = rand::thread_rng();
 
-        let mut dictionaries = None;
-        let mut jar = NippyJar::load(&get_snapshot_segment_file_name(
-            SnapshotSegment::Receipts,
-            filters,
-            compression,
-            &block_range,
-        ))?;
-
         let tx_range = ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone())
             .provider()?
-            .transaction_range_by_block_range(block_range)?;
+            .transaction_range_by_block_range(block_range.clone())?;
 
         let mut row_indexes = tx_range.clone().collect::<Vec<_>>();
 
-        let (provider, decompressors) = self.prepare_jar_provider(&mut jar, &mut dictionaries)?;
-        let mut cursor = if !decompressors.is_empty() {
-            provider.cursor_with_decompressors(decompressors)
-        } else {
-            provider.cursor()
-        };
+        let path = SnapshotSegment::Receipts.filename_with_configuration(
+            filters,
+            compression,
+            &block_range,
+        );
+        let provider = SnapshotProvider::default();
+        let jar_provider =
+            provider.get_segment_provider(SnapshotSegment::Receipts, self.from, Some(path))?;
+        let mut cursor = jar_provider.cursor()?;
 
         for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
             bench(
diff --git a/bin/reth/src/db/snapshots/transactions.rs b/bin/reth/src/db/snapshots/transactions.rs
index 8c4544386b..2bee7e25b0 100644
--- a/bin/reth/src/db/snapshots/transactions.rs
+++ b/bin/reth/src/db/snapshots/transactions.rs
@@ -5,19 +5,15 @@ use super::{
 use rand::{seq::SliceRandom, Rng};
 use reth_db::{database::Database, open_db_read_only, table::Decompress};
 use reth_interfaces::db::LogLevel;
-use reth_nippy_jar::NippyJar;
 use reth_primitives::{
     snapshot::{Filters, InclusionFilter},
     ChainSpec, SnapshotSegment, TransactionSignedNoHash,
 };
 use reth_provider::{
-    DatabaseProviderRO, ProviderError, ProviderFactory, TransactionsProvider,
-    TransactionsProviderExt,
-};
-use reth_snapshot::{
-    segments,
-    segments::{get_snapshot_segment_file_name, Segment},
+    providers::SnapshotProvider, DatabaseProviderRO, ProviderError, ProviderFactory,
+    TransactionsProvider, TransactionsProviderExt,
 };
+use reth_snapshot::{segments, segments::Segment};
 use std::{path::Path, sync::Arc};
 
 impl Command {
@@ -59,26 +55,22 @@ impl Command {
         let block_range = self.from..=(self.from + self.block_interval - 1);
         let mut rng = rand::thread_rng();
 
-        let mut dictionaries = None;
-        let mut jar = NippyJar::load(&get_snapshot_segment_file_name(
-            SnapshotSegment::Transactions,
-            filters,
-            compression,
-            &block_range,
-        ))?;
-
         let tx_range = ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone())
             .provider()?
-            .transaction_range_by_block_range(block_range)?;
+            .transaction_range_by_block_range(block_range.clone())?;
 
         let mut row_indexes = tx_range.clone().collect::<Vec<_>>();
 
-        let (provider, decompressors) = self.prepare_jar_provider(&mut jar, &mut dictionaries)?;
-        let mut cursor = if !decompressors.is_empty() {
-            provider.cursor_with_decompressors(decompressors)
-        } else {
-            provider.cursor()
-        };
+        let path = SnapshotSegment::Transactions.filename_with_configuration(
+            filters,
+            compression,
+            &block_range,
+        );
+        let provider = SnapshotProvider::default();
+        let jar_provider =
+            provider.get_segment_provider(SnapshotSegment::Transactions, self.from, Some(path))?;
+        let mut cursor = jar_provider.cursor()?;
 
         for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
             bench(
diff --git a/crates/primitives/src/snapshot/mod.rs b/crates/primitives/src/snapshot/mod.rs
index a61dedfb3c..d8fc8db536 100644
--- a/crates/primitives/src/snapshot/mod.rs
+++ b/crates/primitives/src/snapshot/mod.rs
@@ -7,3 +7,6 @@ mod segment;
 pub use compression::Compression;
 pub use filters::{Filters, InclusionFilter, PerfectHashingFunction};
 pub use segment::{SegmentHeader, SnapshotSegment};
+
+/// Default snapshot block count.
+pub const BLOCKS_PER_SNAPSHOT: u64 = 500_000;
diff --git a/crates/primitives/src/snapshot/segment.rs b/crates/primitives/src/snapshot/segment.rs
index cbd9ad432a..79d98fe0e8 100644
--- a/crates/primitives/src/snapshot/segment.rs
+++ b/crates/primitives/src/snapshot/segment.rs
@@ -1,6 +1,8 @@
-use crate::{BlockNumber, TxNumber};
+use crate::{snapshot::PerfectHashingFunction, BlockNumber, TxNumber};
 use serde::{Deserialize, Serialize};
-use std::ops::RangeInclusive;
+use std::{ops::RangeInclusive, path::PathBuf};
+
+use super::{Compression, Filters, InclusionFilter};
 
 #[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd, Deserialize, Serialize)]
 #[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
@@ -14,8 +16,70 @@ pub enum SnapshotSegment {
     Receipts,
 }
 
+impl SnapshotSegment {
+    /// Returns the default configuration of the segment.
+    const fn config(&self) -> (Filters, Compression) {
+        let default_config = (
+            Filters::WithFilters(InclusionFilter::Cuckoo, super::PerfectHashingFunction::Fmph),
+            Compression::Lz4,
+        );
+
+        match self {
+            SnapshotSegment::Headers => default_config,
+            SnapshotSegment::Transactions => default_config,
+            SnapshotSegment::Receipts => default_config,
+        }
+    }
+
+    /// Returns the default file name for the provided segment and range.
+    pub fn filename(&self, range: &RangeInclusive<BlockNumber>) -> PathBuf {
+        let (filters, compression) = self.config();
+        self.filename_with_configuration(filters, compression, range)
+    }
+
+    /// Returns file name for the provided segment, filters, compression and range.
+    pub fn filename_with_configuration(
+        &self,
+        filters: Filters,
+        compression: Compression,
+        range: &RangeInclusive<BlockNumber>,
+    ) -> PathBuf {
+        let segment_name = match self {
+            SnapshotSegment::Headers => "headers",
+            SnapshotSegment::Transactions => "transactions",
+            SnapshotSegment::Receipts => "receipts",
+        };
+        let filters_name = match filters {
+            Filters::WithFilters(inclusion_filter, phf) => {
+                let inclusion_filter = match inclusion_filter {
+                    InclusionFilter::Cuckoo => "cuckoo",
+                };
+                let phf = match phf {
+                    PerfectHashingFunction::Fmph => "fmph",
+                    PerfectHashingFunction::GoFmph => "gofmph",
+                };
+                format!("{inclusion_filter}-{phf}")
+            }
+            Filters::WithoutFilters => "none".to_string(),
+        };
+        let compression_name = match compression {
+            Compression::Lz4 => "lz4",
+            Compression::Zstd => "zstd",
+            Compression::ZstdWithDictionary => "zstd-dict",
+            Compression::Uncompressed => "uncompressed",
+        };
+
+        format!(
+            "snapshot_{segment_name}_{}_{}_{filters_name}_{compression_name}",
+            range.start(),
+            range.end(),
+        )
+        .into()
+    }
+}
+
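For illustration (not part of the diff), the naming scheme above produces the following file names; the assertions simply instantiate the `format!` template, assuming `Filters` and `Compression` are imported from `reth_primitives::snapshot`:

    // Default config: cuckoo + fmph filters, lz4 compression.
    assert_eq!(
        SnapshotSegment::Headers.filename(&(0..=499_999)).to_string_lossy(),
        "snapshot_headers_0_499999_cuckoo-fmph_lz4"
    );
    // An explicit configuration picks different name components.
    assert_eq!(
        SnapshotSegment::Receipts
            .filename_with_configuration(
                Filters::WithoutFilters,
                Compression::ZstdWithDictionary,
                &(500_000..=999_999),
            )
            .to_string_lossy(),
        "snapshot_receipts_500000_999999_none_zstd-dict"
    );
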
 /// A segment header that contains information common to all segments. Used for storage.
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash)]
 pub struct SegmentHeader {
     block_range: RangeInclusive<BlockNumber>,
     tx_range: RangeInclusive<TxNumber>,
diff --git a/crates/snapshot/src/segments/mod.rs b/crates/snapshot/src/segments/mod.rs
index 8d649115d0..991677a781 100644
--- a/crates/snapshot/src/segments/mod.rs
+++ b/crates/snapshot/src/segments/mod.rs
@@ -19,7 +19,7 @@ use reth_primitives::{
     BlockNumber, SnapshotSegment,
 };
 use reth_provider::{DatabaseProviderRO, TransactionsProviderExt};
-use std::{ops::RangeInclusive, path::PathBuf};
+use std::ops::RangeInclusive;
 
 pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];
 
@@ -61,7 +61,7 @@ pub(crate) fn prepare_jar(
     let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
     let mut nippy_jar = NippyJar::new(
         COLUMNS,
-        &get_snapshot_segment_file_name(segment, filters, compression, &block_range),
+        &segment.filename_with_configuration(filters, compression, &block_range),
         SegmentHeader::new(block_range, tx_range),
     );
@@ -90,43 +90,3 @@ pub(crate) fn prepare_jar(
 
     Ok(nippy_jar)
 }
-
-/// Returns file name for the provided segment, filters, compression and range.
-pub fn get_snapshot_segment_file_name(
-    segment: SnapshotSegment,
-    filters: Filters,
-    compression: Compression,
-    range: &RangeInclusive<BlockNumber>,
-) -> PathBuf {
-    let segment_name = match segment {
-        SnapshotSegment::Headers => "headers",
-        SnapshotSegment::Transactions => "transactions",
-        SnapshotSegment::Receipts => "receipts",
-    };
-    let filters_name = match filters {
-        Filters::WithFilters(inclusion_filter, phf) => {
-            let inclusion_filter = match inclusion_filter {
-                InclusionFilter::Cuckoo => "cuckoo",
-            };
-            let phf = match phf {
-                PerfectHashingFunction::Fmph => "fmph",
-                PerfectHashingFunction::GoFmph => "gofmph",
-            };
-            format!("{inclusion_filter}-{phf}")
-        }
-        Filters::WithoutFilters => "none".to_string(),
-    };
-    let compression_name = match compression {
-        Compression::Lz4 => "lz4",
-        Compression::Zstd => "zstd",
-        Compression::ZstdWithDictionary => "zstd-dict",
-        Compression::Uncompressed => "uncompressed",
-    };
-
-    format!(
-        "snapshot_{segment_name}_{}_{}_{filters_name}_{compression_name}",
-        range.start(),
-        range.end(),
-    )
-    .into()
-}
diff --git a/crates/storage/nippy-jar/Cargo.toml b/crates/storage/nippy-jar/Cargo.toml
index 9be19824a6..4a2fc967bd 100644
--- a/crates/storage/nippy-jar/Cargo.toml
+++ b/crates/storage/nippy-jar/Cargo.toml
@@ -34,6 +34,7 @@ tracing-appender = "0.2"
 anyhow = "1.0"
 thiserror.workspace = true
 hex = "*"
+derive_more = "0.99"
 
 [dev-dependencies]
 rand = { version = "0.8", features = ["small_rng"] }
diff --git a/crates/storage/nippy-jar/src/compression/zstd.rs b/crates/storage/nippy-jar/src/compression/zstd.rs
index df1182f283..a70d566ca4 100644
--- a/crates/storage/nippy-jar/src/compression/zstd.rs
+++ b/crates/storage/nippy-jar/src/compression/zstd.rs
@@ -1,8 +1,10 @@
 use crate::{compression::Compression, NippyJarError};
+use derive_more::Deref;
-use serde::{Deserialize, Serialize};
+use serde::{Deserialize, Deserializer, Serialize, Serializer};
 use std::{
     fs::File,
     io::{Read, Write},
+    sync::Arc,
 };
 use tracing::*;
 use zstd::bulk::Compressor;
@@ -17,7 +19,8 @@ pub enum ZstdState {
     Ready,
 }
 
-#[derive(Debug, PartialEq, Serialize, Deserialize)]
+#[cfg_attr(test, derive(PartialEq))]
+#[derive(Debug, Serialize, Deserialize)]
 /// Zstd compression structure. Supports a compression dictionary per column.
 pub struct Zstd {
     /// State. Should be ready before compressing.
@@ -29,7 +32,8 @@ pub struct Zstd {
     /// Max size of a dictionary
     pub(crate) max_dict_size: usize,
     /// List of column dictionaries.
-    pub(crate) raw_dictionaries: Option<Vec<RawDictionary>>,
+    #[serde(with = "dictionaries_serde")]
+    pub(crate) dictionaries: Option<Arc<ZstdDictionaries<'static>>>,
     /// Number of columns to compress.
     columns: usize,
 }
@@ -42,7 +46,7 @@ impl Zstd {
             level: 0,
             use_dict,
             max_dict_size,
-            raw_dictionaries: None,
+            dictionaries: None,
             columns,
         }
     }
@@ -52,31 +56,18 @@ impl Zstd {
         self
     }
 
-    /// If using dictionaries, creates a list of [`DecoderDictionary`].
-    ///
-    /// Consumes `self.raw_dictionaries` in the process.
-    pub fn generate_decompress_dictionaries<'a>(&mut self) -> Option<Vec<DecoderDictionary<'a>>> {
-        self.raw_dictionaries.take().map(|dicts| {
-            // TODO Can we use ::new instead, and avoid consuming?
-            dicts.iter().map(|dict| DecoderDictionary::copy(dict)).collect()
-        })
-    }
+    /// Creates a list of [`Decompressor`] if using dictionaries.
+    pub fn decompressors(&self) -> Result<Vec<Decompressor<'_>>, NippyJarError> {
+        if let Some(dictionaries) = &self.dictionaries {
+            debug_assert!(dictionaries.len() == self.columns);
+            return dictionaries.decompressors()
+        }
 
-    /// Creates a list of [`Decompressor`] using the given dictionaries.
-    pub fn generate_decompressors<'a>(
-        &self,
-        dictionaries: &'a [DecoderDictionary<'a>],
-    ) -> Result<Vec<Decompressor<'a>>, NippyJarError> {
-        debug_assert!(dictionaries.len() == self.columns);
-
-        Ok(dictionaries
-            .iter()
-            .map(Decompressor::with_prepared_dictionary)
-            .collect::<Result<Vec<_>, _>>()?)
+        Ok(vec![])
     }
 
     /// If using dictionaries, creates a list of [`Compressor`].
-    pub fn generate_compressors<'a>(&self) -> Result<Option<Vec<Compressor<'a>>>, NippyJarError> {
+    pub fn compressors(&self) -> Result<Option<Vec<Compressor<'_>>>, NippyJarError> {
         match self.state {
             ZstdState::PendingDictionary => Err(NippyJarError::CompressorNotReady),
             ZstdState::Ready => {
@@ -84,18 +75,11 @@ impl Zstd {
                     return Ok(None)
                 }
 
-                let mut compressors = None;
-                if let Some(dictionaries) = &self.raw_dictionaries {
+                if let Some(dictionaries) = &self.dictionaries {
                     debug!(target: "nippy-jar", count=?dictionaries.len(), "Generating ZSTD compressor dictionaries.");
-
-                    let mut cmp = Vec::with_capacity(dictionaries.len());
-
-                    for dict in dictionaries {
-                        cmp.push(Compressor::with_dictionary(0, dict)?);
-                    }
-                    compressors = Some(cmp)
+                    return Ok(Some(dictionaries.compressors()?))
                 }
-                Ok(compressors)
+                Ok(None)
             }
         }
     }
@@ -243,9 +227,144 @@ impl Compression for Zstd {
 
         debug_assert_eq!(dictionaries.len(), self.columns);
 
-        self.raw_dictionaries = Some(dictionaries);
+        self.dictionaries = Some(Arc::new(ZstdDictionaries::new(dictionaries)));
         self.state = ZstdState::Ready;
 
         Ok(())
     }
 }
+
+mod dictionaries_serde {
+    use super::*;
+
+    pub fn serialize<S>(
+        dictionaries: &Option<Arc<ZstdDictionaries<'static>>>,
+        serializer: S,
+    ) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        match dictionaries {
+            Some(dicts) => serializer.serialize_some(dicts.as_ref()),
+            None => serializer.serialize_none(),
+        }
+    }
+
+    pub fn deserialize<'de, D>(
+        deserializer: D,
+    ) -> Result<Option<Arc<ZstdDictionaries<'static>>>, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let dictionaries: Option<Vec<RawDictionary>> = Option::deserialize(deserializer)?;
+        Ok(dictionaries.map(|dicts| Arc::new(ZstdDictionaries::load(dicts))))
+    }
+}
+
+/// List of [`ZstdDictionary`]
+#[cfg_attr(test, derive(PartialEq))]
+#[derive(Serialize, Deserialize, Deref)]
+pub struct ZstdDictionaries<'a>(Vec<ZstdDictionary<'a>>);
+
+impl<'a> std::fmt::Debug for ZstdDictionaries<'a> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ZstdDictionaries").field("num", &self.len()).finish_non_exhaustive()
+    }
+}
+
+impl<'a> ZstdDictionaries<'a> {
+    /// Creates [`ZstdDictionaries`].
+    pub fn new(raw: Vec<RawDictionary>) -> Self {
+        Self(raw.into_iter().map(ZstdDictionary::Raw).collect())
+    }
+
+    /// Loads a list of [`RawDictionary`] into a list of [`ZstdDictionary::Loaded`].
+    pub fn load(raw: Vec<RawDictionary>) -> Self {
+        Self(
+            raw.into_iter()
+                .map(|dict| ZstdDictionary::Loaded(DecoderDictionary::copy(&dict)))
+                .collect(),
+        )
+    }
+
+    /// Creates a list of decompressors from a list of [`ZstdDictionary::Loaded`].
+    pub fn decompressors(&self) -> Result<Vec<Decompressor<'_>>, NippyJarError> {
+        Ok(self
+            .iter()
+            .flat_map(|dict| {
+                dict.loaded()
+                    .ok_or(NippyJarError::DictionaryNotLoaded)
+                    .map(Decompressor::with_prepared_dictionary)
+            })
+            .collect::<Result<Vec<_>, _>>()?)
+    }
+
+    /// Creates a list of compressors from a list of [`ZstdDictionary::Raw`].
+    pub fn compressors(&self) -> Result<Vec<Compressor<'_>>, NippyJarError> {
+        Ok(self
+            .iter()
+            .flat_map(|dict| {
+                dict.raw()
+                    .ok_or(NippyJarError::CompressorNotAllowed)
+                    .map(|dict| Compressor::with_dictionary(0, dict))
+            })
+            .collect::<Result<Vec<_>, _>>()?)
+    }
+}
+
+/// A Zstd dictionary. It's created and serialized with [`ZstdDictionary::Raw`], and deserialized
+/// as [`ZstdDictionary::Loaded`].
+pub enum ZstdDictionary<'a> {
+    Raw(RawDictionary),
+    Loaded(DecoderDictionary<'a>),
+}
+
+impl<'a> ZstdDictionary<'a> {
+    /// Returns a reference to the expected `RawDictionary`
+    pub fn raw(&self) -> Option<&RawDictionary> {
+        match self {
+            ZstdDictionary::Raw(dict) => Some(dict),
+            ZstdDictionary::Loaded(_) => None,
+        }
+    }
+
+    /// Returns a reference to the expected `DecoderDictionary`
+    pub fn loaded(&self) -> Option<&DecoderDictionary<'_>> {
+        match self {
+            ZstdDictionary::Raw(_) => None,
+            ZstdDictionary::Loaded(dict) => Some(dict),
+        }
+    }
+}
+
+impl<'de, 'a> Deserialize<'de> for ZstdDictionary<'a> {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let dict = RawDictionary::deserialize(deserializer)?;
+        Ok(Self::Loaded(DecoderDictionary::copy(&dict)))
+    }
+}
+
+impl<'a> Serialize for ZstdDictionary<'a> {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        match self {
+            ZstdDictionary::Raw(r) => r.serialize(serializer),
+            ZstdDictionary::Loaded(_) => unreachable!(),
+        }
+    }
+}
+
+#[cfg(test)]
+impl<'a> PartialEq for ZstdDictionary<'a> {
+    fn eq(&self, other: &Self) -> bool {
+        if let (Self::Raw(a), Self::Raw(b)) = (self, &other) {
+            return a == b
+        }
+        unimplemented!("`DecoderDictionary` can't be compared. So comparison should be done after decompressing a value.");
+    }
+}
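To make the serde asymmetry above concrete, here is a minimal sketch (not part of the patch; `raw_dict_col0`/`raw_dict_col1` are hypothetical trained dictionary byte vectors): dictionaries are serialized from their `Raw` form and always come back `Loaded`, ready for decompression:

    // Freeze time: training produced raw byte dictionaries, stored as Raw.
    let dicts = ZstdDictionaries::new(vec![raw_dict_col0, raw_dict_col1]);
    let bytes = bincode::serialize(&dicts)?; // only the raw bytes are written
    // Load time: deserialization eagerly builds a DecoderDictionary per column...
    let loaded: ZstdDictionaries<'_> = bincode::deserialize(&bytes)?;
    assert!(loaded[0].loaded().is_some()); // ...so loaded() now succeeds,
    assert!(loaded[0].raw().is_none());    // ...while raw() is gone.
    // Hence the two error paths: decompressors() needs Loaded entries, and
    // compressors() needs Raw ones (CompressorNotAllowed after a load).
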
diff --git a/crates/storage/nippy-jar/src/cursor.rs b/crates/storage/nippy-jar/src/cursor.rs
index 19e39fa0cd..ba876ccaee 100644
--- a/crates/storage/nippy-jar/src/cursor.rs
+++ b/crates/storage/nippy-jar/src/cursor.rs
@@ -1,24 +1,23 @@
 use crate::{
-    compression::{Compression, Zstd},
+    compression::{Compression, Compressors, Zstd},
     InclusionFilter, NippyJar, NippyJarError, PerfectHashingFunction, RefRow,
 };
 use memmap2::Mmap;
 use serde::{de::Deserialize, ser::Serialize};
-use std::{fs::File, ops::Range};
+use std::{fs::File, ops::Range, sync::Arc};
 use sucds::int_vectors::Access;
 use zstd::bulk::Decompressor;
 
 /// Simple cursor implementation to retrieve data from [`NippyJar`].
+#[derive(Clone)]
 pub struct NippyJarCursor<'a, H = ()> {
     /// [`NippyJar`] which holds most of the required configuration to read from the file.
     jar: &'a NippyJar<H>,
-    /// Optional dictionary decompressors.
-    zstd_decompressors: Option<Vec<Decompressor<'a>>>,
     /// Data file.
     #[allow(unused)]
-    file_handle: File,
+    file_handle: Arc<File>,
     /// Data file.
-    mmap_handle: Mmap,
+    mmap_handle: Arc<Mmap>,
     /// Internal buffer to unload data to without reallocating memory on each retrieval.
     internal_buffer: Vec<u8>,
     /// Cursor row position.
@@ -36,24 +35,21 @@ where
 
 impl<'a, H> NippyJarCursor<'a, H>
 where
-    H: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug,
+    H: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static,
 {
-    pub fn new(
-        jar: &'a NippyJar<H>,
-        zstd_decompressors: Option<Vec<Decompressor<'a>>>,
-    ) -> Result<Self, NippyJarError> {
+    pub fn new(jar: &'a NippyJar<H>) -> Result<Self, NippyJarError> {
         let file = File::open(jar.data_path())?;
 
         // SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
         let mmap = unsafe { Mmap::map(&file)? };
 
+        let max_row_size = jar.max_row_size;
         Ok(NippyJarCursor {
             jar,
-            zstd_decompressors,
-            file_handle: file,
-            mmap_handle: mmap,
+            file_handle: Arc::new(file),
+            mmap_handle: Arc::new(mmap),
             // Makes sure that we have enough buffer capacity to decompress any row of data.
-            internal_buffer: Vec::with_capacity(jar.max_row_size),
+            internal_buffer: Vec::with_capacity(max_row_size),
             row: 0,
         })
     }
@@ -218,23 +214,32 @@ where
             value_offset..next_value_offset
         };
 
-        if let Some(zstd_dict_decompressors) = self.zstd_decompressors.as_mut() {
-            let from: usize = self.internal_buffer.len();
-            if let Some(decompressor) = zstd_dict_decompressors.get_mut(column) {
-                Zstd::decompress_with_dictionary(
-                    &self.mmap_handle[column_offset_range],
-                    &mut self.internal_buffer,
-                    decompressor,
-                )?;
-            }
-            let to = self.internal_buffer.len();
-
-            row.push(ValueRange::Internal(from..to));
-        } else if let Some(compression) = self.jar.compressor() {
-            // Uses the chosen default decompressor
+        if let Some(compression) = self.jar.compressor() {
             let from = self.internal_buffer.len();
-            compression
-                .decompress_to(&self.mmap_handle[column_offset_range], &mut self.internal_buffer)?;
+            match compression {
+                Compressors::Zstd(z) if z.use_dict => {
+                    // If we got here, the necessary dictionaries exist and are loaded (this
+                    // happens during deserialization). If they are not, something is wrong
+                    // elsewhere and we could not recover here anyway.
+                    let dictionaries = z.dictionaries.as_ref().expect("dictionaries to exist")
+                        [column]
+                        .loaded()
+                        .expect("dictionary to be loaded");
+                    let mut decompressor = Decompressor::with_prepared_dictionary(dictionaries)?;
+                    Zstd::decompress_with_dictionary(
+                        &self.mmap_handle[column_offset_range],
+                        &mut self.internal_buffer,
+                        &mut decompressor,
+                    )?;
+                }
+                _ => {
+                    // Uses the chosen default decompressor
+                    compression.decompress_to(
+                        &self.mmap_handle[column_offset_range],
+                        &mut self.internal_buffer,
+                    )?;
+                }
+            }
             let to = self.internal_buffer.len();
 
             row.push(ValueRange::Internal(from..to));
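A note on the new decompression path, as an illustrative sketch (not from the patch; `raw_dictionary_bytes`, `compressed_column_bytes` and `MAX_VALUE_SIZE` are hypothetical): the cursor no longer carries `Decompressor`s; it builds one per read from the jar's prepared `DecoderDictionary`, which is the cheap-to-share half of the zstd API:

    use zstd::bulk::Decompressor;
    use zstd::dict::DecoderDictionary;

    // Preparing the dictionary (building its internal tables) is the expensive
    // step, and is done once per column at deserialization time:
    let prepared = DecoderDictionary::copy(&raw_dictionary_bytes);
    // Wrapping a prepared dictionary in a Decompressor is lightweight, so
    // constructing one per row read keeps the cursor Clone and shareable:
    let mut decompressor = Decompressor::with_prepared_dictionary(&prepared)?;
    let value = decompressor.decompress(&compressed_column_bytes, MAX_VALUE_SIZE)?;
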
diff --git a/crates/storage/nippy-jar/src/error.rs b/crates/storage/nippy-jar/src/error.rs
index 20513c414a..dbb37c1f86 100644
--- a/crates/storage/nippy-jar/src/error.rs
+++ b/crates/storage/nippy-jar/src/error.rs
@@ -37,4 +37,8 @@ pub enum NippyJarError {
     UnsupportedFilterQuery,
     #[error("compression or decompression requires a bigger destination output")]
     OutputTooSmall,
+    #[error("dictionary is not loaded")]
+    DictionaryNotLoaded,
+    #[error("cannot generate a compressor after loading a dictionary")]
+    CompressorNotAllowed,
 }
diff --git a/crates/storage/nippy-jar/src/lib.rs b/crates/storage/nippy-jar/src/lib.rs
index 69e348ec38..036356946e 100644
--- a/crates/storage/nippy-jar/src/lib.rs
+++ b/crates/storage/nippy-jar/src/lib.rs
@@ -323,7 +323,7 @@ where
         // implementation
         let mut maybe_zstd_compressors = None;
         if let Some(Compressors::Zstd(zstd)) = &self.compressor {
-            maybe_zstd_compressors = zstd.generate_compressors()?;
+            maybe_zstd_compressors = zstd.compressors()?;
         }
 
         // Temporary buffer to avoid multiple reallocations if compressing to a buffer (eg. zstd w/
@@ -394,6 +394,9 @@ where
             column_iterators = iterators.into_iter();
         }
 
+        // drops immutable borrow
+        drop(maybe_zstd_compressors);
+
         // Write offsets and offset index to file
         self.freeze_offsets(offsets)?;
 
@@ -622,7 +625,7 @@ mod tests {
         assert!(nippy.compressor().is_some());
 
         if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor_mut() {
-            assert!(matches!(zstd.generate_compressors(), Err(NippyJarError::CompressorNotReady)));
+            assert!(matches!(zstd.compressors(), Err(NippyJarError::CompressorNotReady)));
 
             // Make sure the number of column iterators match the initial set up ones.
            assert!(matches!(
@@ -642,27 +645,26 @@ mod tests {
 
         if let Some(Compressors::Zstd(zstd)) = &nippy.compressor() {
             assert!(matches!(
-                (&zstd.state, zstd.raw_dictionaries.as_ref().map(|dict| dict.len())),
+                (&zstd.state, zstd.dictionaries.as_ref().map(|dict| dict.len())),
                 (compression::ZstdState::Ready, Some(columns)) if columns == num_columns
             ));
         }
 
         nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap();
 
-        let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
-        assert_eq!(nippy, loaded_nippy);
-
-        let mut dicts = vec![];
-        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor_mut() {
-            dicts = zstd.generate_decompress_dictionaries().unwrap()
-        }
+        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
+        assert_eq!(nippy.version, loaded_nippy.version);
+        assert_eq!(nippy.columns, loaded_nippy.columns);
+        assert_eq!(nippy.filter, loaded_nippy.filter);
+        assert_eq!(nippy.phf, loaded_nippy.phf);
+        assert_eq!(nippy.offsets_index, loaded_nippy.offsets_index);
+        assert_eq!(nippy.offsets, loaded_nippy.offsets);
+        assert_eq!(nippy.max_row_size, loaded_nippy.max_row_size);
+        assert_eq!(nippy.path, loaded_nippy.path);
 
         if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
-            let mut cursor = NippyJarCursor::new(
-                &loaded_nippy,
-                Some(zstd.generate_decompressors(&dicts).unwrap()),
-            )
-            .unwrap();
+            assert!(zstd.use_dict);
+            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
 
             // Iterate over compressed values and compare
             let mut row_index = 0usize;
@@ -673,6 +675,8 @@ mod tests {
                 );
                 row_index += 1;
             }
+        } else {
+            panic!("Expected Zstd compressor")
         }
     }
 
@@ -695,7 +699,7 @@ mod tests {
         assert_eq!(nippy, loaded_nippy);
 
         if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() {
-            let mut cursor = NippyJarCursor::new(&loaded_nippy, None).unwrap();
+            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
 
             // Iterate over compressed values and compare
             let mut row_index = 0usize;
@@ -733,7 +737,7 @@ mod tests {
 
         if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
             assert!(!zstd.use_dict);
-            let mut cursor = NippyJarCursor::new(&loaded_nippy, None).unwrap();
+            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
 
             // Iterate over compressed values and compare
             let mut row_index = 0usize;
@@ -782,23 +786,15 @@ mod tests {
 
         // Read file
        {
-            let mut loaded_nippy = NippyJar::::load(file_path.path()).unwrap();
+            let loaded_nippy = NippyJar::::load(file_path.path()).unwrap();
 
             assert!(loaded_nippy.compressor().is_some());
             assert!(loaded_nippy.filter.is_some());
             assert!(loaded_nippy.phf.is_some());
             assert_eq!(loaded_nippy.user_header().block_start, block_start);
 
-            let mut dicts = vec![];
-            if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor_mut() {
-                dicts = zstd.generate_decompress_dictionaries().unwrap()
-            }
-
-            if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
-                let mut cursor = NippyJarCursor::new(
-                    &loaded_nippy,
-                    Some(zstd.generate_decompressors(&dicts).unwrap()),
-                )
-                .unwrap();
+            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
+                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
 
                 // Iterate over compressed values and compare
                 let mut row_num = 0usize;
@@ -860,18 +856,10 @@ mod tests {
 
         // Read file
         {
-            let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
+            let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
 
-            let mut dicts = vec![];
-            if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor_mut() {
-                dicts = zstd.generate_decompress_dictionaries().unwrap()
-            }
-
-            if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
-                let mut cursor = NippyJarCursor::new(
-                    &loaded_nippy,
-                    Some(zstd.generate_decompressors(&dicts).unwrap()),
-                )
-                .unwrap();
+            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
+                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
 
                 // Shuffled for chaos.
                 let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
diff --git a/crates/storage/provider/Cargo.toml b/crates/storage/provider/Cargo.toml
index c8c7b4fcb2..d37a5b6e3a 100644
--- a/crates/storage/provider/Cargo.toml
+++ b/crates/storage/provider/Cargo.toml
@@ -30,6 +30,7 @@ auto_impl = "1.0"
 itertools.workspace = true
 pin-project.workspace = true
 parking_lot.workspace = true
+dashmap = { version = "5.5", features = ["inline"] }
 
 # test-utils
 alloy-rlp = { workspace = true, optional = true }
diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs
index bc0704f076..17041d2f2d 100644
--- a/crates/storage/provider/src/providers/mod.rs
+++ b/crates/storage/provider/src/providers/mod.rs
@@ -37,7 +37,7 @@ mod bundle_state_provider;
 mod chain_info;
 mod database;
 mod snapshot;
-pub use snapshot::SnapshotProvider;
+pub use snapshot::{SnapshotJarProvider, SnapshotProvider};
 mod state;
 use crate::{providers::chain_info::ChainInfoTracker, traits::BlockSource};
 pub use bundle_state_provider::BundleStateProvider;
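Before the provider itself, a worked example (not part of the diff) of the fixed-length indexing it relies on, with `BLOCKS_PER_SNAPSHOT = 500_000`:

    // Key derivation used by get_segment_provider below:
    let block: u64 = 1_250_000;
    let snapshot = block / BLOCKS_PER_SNAPSHOT; // = 2
    // When no path is passed in, the default filename is derived from the
    // snapshot's block range:
    let start = snapshot * BLOCKS_PER_SNAPSHOT; // 1_000_000
    let end = (snapshot + 1) * BLOCKS_PER_SNAPSHOT - 1; // 1_499_999
    let path = SnapshotSegment::Headers.filename(&(start..=end));
    // -> "snapshot_headers_1000000_1499999_cuckoo-fmph_lz4"
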
diff --git a/crates/storage/provider/src/providers/snapshot.rs b/crates/storage/provider/src/providers/snapshot.rs
index 6c81965d75..8087eb159c 100644
--- a/crates/storage/provider/src/providers/snapshot.rs
+++ b/crates/storage/provider/src/providers/snapshot.rs
@@ -1,47 +1,210 @@
 use crate::{BlockHashReader, BlockNumReader, HeaderProvider, TransactionsProvider};
+use dashmap::DashMap;
 use reth_db::{
     table::{Decompress, Table},
     HeaderTD,
 };
 use reth_interfaces::{provider::ProviderError, RethResult};
-use reth_nippy_jar::{compression::Decompressor, NippyJar, NippyJarCursor};
+use reth_nippy_jar::{NippyJar, NippyJarCursor};
 use reth_primitives::{
-    snapshot::SegmentHeader, Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header,
-    SealedHeader, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber,
+    snapshot::{SegmentHeader, BLOCKS_PER_SNAPSHOT},
+    Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header, SealedHeader,
+    SnapshotSegment, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber,
     B256, U256,
 };
-use std::ops::RangeBounds;
+use std::{ops::RangeBounds, path::PathBuf};
+
+/// Type alias for each specific `NippyJar`.
+type NippyJarRef<'a> =
+    dashmap::mapref::one::Ref<'a, (u64, SnapshotSegment), NippyJar<SegmentHeader>>;
 
 /// SnapshotProvider
-///
-/// WIP Rudimentary impl just for tests
-/// TODO: should be able to walk through snapshot files/block_ranges
-/// TODO: Arc over NippyJars and/or NippyJarCursors (LRU)
-#[derive(Debug)]
-pub struct SnapshotProvider<'a> {
-    /// NippyJar
-    pub jar: &'a NippyJar<SegmentHeader>,
+#[derive(Debug, Default)]
+pub struct SnapshotProvider {
+    /// Maintains a map that allows concurrent access to different `NippyJars`, over different
+    /// segments and ranges.
+    map: DashMap<(BlockNumber, SnapshotSegment), NippyJar<SegmentHeader>>,
 }
 
-impl<'a> SnapshotProvider<'a> {
-    /// Creates cursor
-    pub fn cursor(&self) -> NippyJarCursor<'a, SegmentHeader> {
-        NippyJarCursor::new(self.jar, None).unwrap()
-    }
-
-    /// Creates cursor with zstd decompressors
-    pub fn cursor_with_decompressors(
+impl SnapshotProvider {
+    /// Gets the provider of the requested segment and range.
+    pub fn get_segment_provider(
         &self,
-        decompressors: Vec<Decompressor<'a>>,
-    ) -> NippyJarCursor<'a, SegmentHeader> {
-        NippyJarCursor::new(self.jar, Some(decompressors)).unwrap()
+        segment: SnapshotSegment,
+        block: BlockNumber,
+        mut path: Option<PathBuf>,
+    ) -> RethResult<SnapshotJarProvider<'_>> {
+        // TODO this invalidates custom length snapshots.
+        let snapshot = block / BLOCKS_PER_SNAPSHOT;
+        let key = (snapshot, segment);
+
+        if let Some(jar) = self.map.get(&key) {
+            return Ok(SnapshotJarProvider { jar })
+        }
+
+        if let Some(path) = &path {
+            let jar = NippyJar::load(path)?;
+            self.map.insert(key, jar);
+        } else {
+            path = Some(segment.filename(
+                &((snapshot * BLOCKS_PER_SNAPSHOT)..=((snapshot + 1) * BLOCKS_PER_SNAPSHOT - 1)),
+            ));
+        }
+
+        self.get_segment_provider(segment, block, path)
     }
 }
 
-impl<'a> HeaderProvider for SnapshotProvider<'a> {
+impl HeaderProvider for SnapshotProvider {
+    fn header(&self, _block_hash: &BlockHash) -> RethResult<Option<Header>> {
+        todo!()
+    }
+
+    fn header_by_number(&self, num: BlockNumber) -> RethResult<Option<Header>> {
+        self.get_segment_provider(SnapshotSegment::Headers, num, None)?.header_by_number(num)
+    }
+
+    fn header_td(&self, _block_hash: &BlockHash) -> RethResult<Option<U256>> {
+        todo!()
+    }
+
+    fn header_td_by_number(&self, _number: BlockNumber) -> RethResult<Option<U256>> {
+        todo!();
+    }
+
+    fn headers_range(&self, _range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>> {
+        todo!();
+    }
+
+    fn sealed_headers_range(
+        &self,
+        _range: impl RangeBounds<BlockNumber>,
+    ) -> RethResult<Vec<SealedHeader>> {
+        todo!();
+    }
+
+    fn sealed_header(&self, _number: BlockNumber) -> RethResult<Option<SealedHeader>> {
+        todo!();
+    }
+}
+
+impl BlockHashReader for SnapshotProvider {
+    fn block_hash(&self, _number: u64) -> RethResult<Option<B256>> {
+        todo!()
+    }
+
+    fn canonical_hashes_range(
+        &self,
+        _start: BlockNumber,
+        _end: BlockNumber,
+    ) -> RethResult<Vec<B256>> {
+        todo!()
+    }
+}
+
+impl BlockNumReader for SnapshotProvider {
+    fn chain_info(&self) -> RethResult<ChainInfo> {
+        todo!()
+    }
+
+    fn best_block_number(&self) -> RethResult<BlockNumber> {
+        todo!()
+    }
+
+    fn last_block_number(&self) -> RethResult<BlockNumber> {
+        todo!()
+    }
+
+    fn block_number(&self, _hash: B256) -> RethResult<Option<BlockNumber>> {
+        todo!()
+    }
+}
+
+impl TransactionsProvider for SnapshotProvider {
+    fn transaction_id(&self, _tx_hash: TxHash) -> RethResult<Option<TxNumber>> {
+        todo!()
+    }
+
+    fn transaction_by_id(&self, num: TxNumber) -> RethResult<Option<TransactionSigned>> {
+        // TODO `num` is provided after checking the index
+        let block_num = num;
+        self.get_segment_provider(SnapshotSegment::Transactions, block_num, None)?
+            .transaction_by_id(num)
+    }
+
+    fn transaction_by_id_no_hash(
+        &self,
+        _id: TxNumber,
+    ) -> RethResult<Option<TransactionSignedNoHash>> {
+        todo!()
+    }
+
+    fn transaction_by_hash(&self, _hash: TxHash) -> RethResult<Option<TransactionSigned>> {
+        todo!()
+    }
+
+    fn transaction_by_hash_with_meta(
+        &self,
+        _hash: TxHash,
+    ) -> RethResult<Option<(TransactionSigned, TransactionMeta)>> {
+        todo!()
+    }
+
+    fn transaction_block(&self, _id: TxNumber) -> RethResult<Option<BlockNumber>> {
+        todo!()
+    }
+
+    fn transactions_by_block(
+        &self,
+        _block_id: BlockHashOrNumber,
+    ) -> RethResult<Option<Vec<TransactionSigned>>> {
+        todo!()
+    }
+
+    fn transactions_by_block_range(
+        &self,
+        _range: impl RangeBounds<BlockNumber>,
+    ) -> RethResult<Vec<Vec<TransactionSigned>>> {
+        todo!()
+    }
+
+    fn senders_by_tx_range(&self, _range: impl RangeBounds<TxNumber>) -> RethResult<Vec<Address>> {
+        todo!()
+    }
+
+    fn transactions_by_tx_range(
+        &self,
+        _range: impl RangeBounds<TxNumber>,
+    ) -> RethResult<Vec<TransactionSignedNoHash>> {
+        todo!()
+    }
+
+    fn transaction_sender(&self, _id: TxNumber) -> RethResult<Option<Address>> {
+        todo!()
+    }
+}
+
+/// Provider over a specific `NippyJar` and range.
+#[derive(Debug)]
+pub struct SnapshotJarProvider<'a> {
+    /// Reference to a value on [`SnapshotProvider`]
+    pub jar: NippyJarRef<'a>,
+}
+
+impl<'a> SnapshotJarProvider<'a> {
+    /// Provides a cursor for more granular data access.
+    pub fn cursor<'b>(&'b self) -> RethResult<NippyJarCursor<'a, SegmentHeader>>
+    where
+        'b: 'a,
+    {
+        Ok(NippyJarCursor::new(self.jar.value())?)
+    }
+}
+
+impl<'a> HeaderProvider for SnapshotJarProvider<'a> {
     fn header(&self, block_hash: &BlockHash) -> RethResult<Option<Header>> {
         // WIP
-        let mut cursor = self.cursor();
+        let mut cursor = NippyJarCursor::new(self.jar.value())?;
 
         let header = Header::decompress(
             cursor.row_by_key_with_cols::<0b01, 2>(&block_hash.0).unwrap().unwrap()[0],
@@ -58,7 +221,7 @@
 
     fn header_by_number(&self, num: BlockNumber) -> RethResult<Option<Header>> {
         Header::decompress(
-            self.cursor()
+            NippyJarCursor::new(self.jar.value())?
                 .row_by_number_with_cols::<0b01, 2>(
                     (num - self.jar.user_header().block_start()) as usize,
                 )?
@@ -70,7 +233,7 @@
 
     fn header_td(&self, block_hash: &BlockHash) -> RethResult<Option<U256>> {
         // WIP
-        let mut cursor = self.cursor();
+        let mut cursor = NippyJarCursor::new(self.jar.value())?;
 
         let row = cursor.row_by_key_with_cols::<0b11, 2>(&block_hash.0).unwrap().unwrap();
 
@@ -105,7 +268,7 @@
     }
 }
 
-impl<'a> BlockHashReader for SnapshotProvider<'a> {
+impl<'a> BlockHashReader for SnapshotJarProvider<'a> {
     fn block_hash(&self, _number: u64) -> RethResult<Option<B256>> {
         todo!()
     }
@@ -119,7 +282,7 @@
     }
 }
 
-impl<'a> BlockNumReader for SnapshotProvider<'a> {
+impl<'a> BlockNumReader for SnapshotJarProvider<'a> {
     fn chain_info(&self) -> RethResult<ChainInfo> {
         todo!()
     }
@@ -137,14 +300,14 @@
     }
 }
 
-impl<'a> TransactionsProvider for SnapshotProvider<'a> {
+impl<'a> TransactionsProvider for SnapshotJarProvider<'a> {
     fn transaction_id(&self, _tx_hash: TxHash) -> RethResult<Option<TxNumber>> {
         todo!()
     }
 
     fn transaction_by_id(&self, num: TxNumber) -> RethResult<Option<TransactionSigned>> {
         TransactionSignedNoHash::decompress(
-            self.cursor()
+            NippyJarCursor::new(self.jar.value())?
                 .row_by_number_with_cols::<0b1, 1>(
                     (num - self.jar.user_header().tx_start()) as usize,
                 )?
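As a small illustration (not from the patch) of the offset arithmetic shared by `header_by_number` and `transaction_by_id` above: a jar only stores rows for its own range, so global numbers are rebased against the `SegmentHeader` before indexing:

    // Jar covering tx_range 1_000..=1_999:
    let tx_start = 1_000u64;  // self.jar.user_header().tx_start()
    let num = 1_542u64;       // global TxNumber requested by the caller
    let row_index = (num - tx_start) as usize; // row 542 inside this jar
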
@@ -164,7 +327,7 @@
 
     fn transaction_by_hash(&self, hash: TxHash) -> RethResult<Option<TransactionSigned>> {
         // WIP
-        let mut cursor = self.cursor();
+        let mut cursor = NippyJarCursor::new(self.jar.value())?;
 
         let tx = TransactionSignedNoHash::decompress(
             cursor.row_by_key_with_cols::<0b1, 1>(&hash.0).unwrap().unwrap()[0],
@@ -315,10 +478,11 @@ mod test {
 
         // Use providers to query Header data and compare if it matches
         {
-            let jar = NippyJar::load(snap_file.path()).unwrap();
-
             let db_provider = factory.provider().unwrap();
-            let snap_provider = SnapshotProvider { jar: &jar };
+            let manager = SnapshotProvider::default();
+            let jar_provider = manager
+                .get_segment_provider(SnapshotSegment::Headers, 0, Some(snap_file.path().into()))
+                .unwrap();
 
             assert!(!headers.is_empty());
 
@@ -331,12 +495,12 @@ mod test {
 
                 // Compare Header
                 assert_eq!(header, db_provider.header(&header_hash).unwrap().unwrap());
-                assert_eq!(header, snap_provider.header(&header_hash).unwrap().unwrap());
+                assert_eq!(header, jar_provider.header(&header_hash).unwrap().unwrap());
 
                 // Compare HeaderTD
                 assert_eq!(
                     db_provider.header_td(&header_hash).unwrap().unwrap(),
-                    snap_provider.header_td(&header_hash).unwrap().unwrap()
+                    jar_provider.header_td(&header_hash).unwrap().unwrap()
                 );
             }
         }
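Taken together, a minimal consumer of the new manager looks like this sketch (illustrative only, not from the patch; it assumes a snapshot file for blocks 0..=499_999 already exists at the default path):

    use reth_primitives::SnapshotSegment;
    use reth_provider::{providers::SnapshotProvider, HeaderProvider};

    let manager = SnapshotProvider::default();
    // No explicit path: the manager derives the default filename from the block
    // number and caches the loaded jar under the (snapshot, segment) key.
    let jar = manager.get_segment_provider(SnapshotSegment::Headers, 42, None)?;
    // Row-level reads go through the per-jar provider / cursor...
    let header = jar.header_by_number(42)?;
    // ...or through the manager itself, which resolves the right jar internally.
    let same = manager.header_by_number(42)?;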