feat: move ZstdDictionary inside NippyJar and create a snapshot manager (#5139)

Author: joshieDo
Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
Date: 2023-10-27 11:06:31 +01:00 (committed by GitHub)
Parent: d232c25a90
Commit: 006259b4f6

16 changed files with 538 additions and 272 deletions

Cargo.lock (generated)

@@ -6195,6 +6195,7 @@ dependencies = [
"bincode",
"bytes",
"cuckoofilter",
"derive_more",
"hex",
"lz4_flex",
"memmap2 0.7.1",
@@ -6287,6 +6288,7 @@ dependencies = [
"alloy-rlp",
"assert_matches",
"auto_impl",
"dashmap",
"itertools 0.11.0",
"parking_lot 0.12.1",
"pin-project",


@@ -5,13 +5,14 @@ use super::{
use rand::{seq::SliceRandom, Rng};
use reth_db::{database::Database, open_db_read_only, table::Decompress};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::NippyJar;
use reth_primitives::{
snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction},
ChainSpec, Header, SnapshotSegment,
};
use reth_provider::{DatabaseProviderRO, HeaderProvider, ProviderError, ProviderFactory};
use reth_snapshot::segments::{get_snapshot_segment_file_name, Headers, Segment};
use reth_provider::{
providers::SnapshotProvider, DatabaseProviderRO, HeaderProvider, ProviderError, ProviderFactory,
};
use reth_snapshot::segments::{Headers, Segment};
use std::{path::Path, sync::Arc};
impl Command {
@@ -54,20 +55,12 @@ impl Command {
let mut row_indexes = range.clone().collect::<Vec<_>>();
let mut rng = rand::thread_rng();
let mut dictionaries = None;
let mut jar = NippyJar::load(&get_snapshot_segment_file_name(
SnapshotSegment::Headers,
filters,
compression,
&range,
))?;
let (provider, decompressors) = self.prepare_jar_provider(&mut jar, &mut dictionaries)?;
let mut cursor = if !decompressors.is_empty() {
provider.cursor_with_decompressors(decompressors)
} else {
provider.cursor()
};
let path =
SnapshotSegment::Headers.filename_with_configuration(filters, compression, &range);
let provider = SnapshotProvider::default();
let jar_provider =
provider.get_segment_provider(SnapshotSegment::Headers, self.from, Some(path))?;
let mut cursor = jar_provider.cursor()?;
for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
bench(


@@ -2,15 +2,11 @@ use clap::Parser;
use itertools::Itertools;
use reth_db::{open_db_read_only, DatabaseEnvRO};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::{
compression::{DecoderDictionary, Decompressor},
NippyJar,
};
use reth_primitives::{
snapshot::{Compression, InclusionFilter, PerfectHashingFunction, SegmentHeader},
snapshot::{Compression, InclusionFilter, PerfectHashingFunction},
BlockNumber, ChainSpec, SnapshotSegment,
};
use reth_provider::{providers::SnapshotProvider, ProviderFactory};
use reth_provider::ProviderFactory;
use std::{path::Path, sync::Arc};
mod bench;
@@ -134,22 +130,4 @@ impl Command {
Ok(())
}
/// Returns a [`SnapshotProvider`] of the provided [`NippyJar`], alongside a list of
/// [`DecoderDictionary`] and [`Decompressor`] if necessary.
fn prepare_jar_provider<'a>(
&self,
jar: &'a mut NippyJar<SegmentHeader>,
dictionaries: &'a mut Option<Vec<DecoderDictionary<'_>>>,
) -> eyre::Result<(SnapshotProvider<'a>, Vec<Decompressor<'a>>)> {
let mut decompressors: Vec<Decompressor<'_>> = vec![];
if let Some(reth_nippy_jar::compression::Compressors::Zstd(zstd)) = jar.compressor_mut() {
if zstd.use_dict {
*dictionaries = zstd.generate_decompress_dictionaries();
decompressors = zstd.generate_decompressors(dictionaries.as_ref().expect("qed"))?;
}
}
Ok((SnapshotProvider { jar: &*jar }, decompressors))
}
}


@@ -5,19 +5,15 @@ use super::{
use rand::{seq::SliceRandom, Rng};
use reth_db::{database::Database, open_db_read_only, table::Decompress};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::NippyJar;
use reth_primitives::{
snapshot::{Filters, InclusionFilter},
ChainSpec, Receipt, SnapshotSegment,
};
use reth_provider::{
DatabaseProviderRO, ProviderError, ProviderFactory, ReceiptProvider, TransactionsProvider,
TransactionsProviderExt,
};
use reth_snapshot::{
segments,
segments::{get_snapshot_segment_file_name, Segment},
providers::SnapshotProvider, DatabaseProviderRO, ProviderError, ProviderFactory,
ReceiptProvider, TransactionsProvider, TransactionsProviderExt,
};
use reth_snapshot::{segments, segments::Segment};
use std::{path::Path, sync::Arc};
impl Command {
@@ -59,26 +55,22 @@ impl Command {
let block_range = self.from..=(self.from + self.block_interval - 1);
let mut rng = rand::thread_rng();
let mut dictionaries = None;
let mut jar = NippyJar::load(&get_snapshot_segment_file_name(
SnapshotSegment::Receipts,
filters,
compression,
&block_range,
))?;
let tx_range = ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone())
.provider()?
.transaction_range_by_block_range(block_range)?;
.transaction_range_by_block_range(block_range.clone())?;
let mut row_indexes = tx_range.clone().collect::<Vec<_>>();
let (provider, decompressors) = self.prepare_jar_provider(&mut jar, &mut dictionaries)?;
let mut cursor = if !decompressors.is_empty() {
provider.cursor_with_decompressors(decompressors)
} else {
provider.cursor()
};
let path = SnapshotSegment::Receipts.filename_with_configuration(
filters,
compression,
&block_range,
);
let provider = SnapshotProvider::default();
let jar_provider =
provider.get_segment_provider(SnapshotSegment::Receipts, self.from, Some(path))?;
let mut cursor = jar_provider.cursor()?;
for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
bench(


@@ -5,19 +5,15 @@ use super::{
use rand::{seq::SliceRandom, Rng};
use reth_db::{database::Database, open_db_read_only, table::Decompress};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::NippyJar;
use reth_primitives::{
snapshot::{Filters, InclusionFilter},
ChainSpec, SnapshotSegment, TransactionSignedNoHash,
};
use reth_provider::{
DatabaseProviderRO, ProviderError, ProviderFactory, TransactionsProvider,
TransactionsProviderExt,
};
use reth_snapshot::{
segments,
segments::{get_snapshot_segment_file_name, Segment},
providers::SnapshotProvider, DatabaseProviderRO, ProviderError, ProviderFactory,
TransactionsProvider, TransactionsProviderExt,
};
use reth_snapshot::{segments, segments::Segment};
use std::{path::Path, sync::Arc};
impl Command {
@@ -59,26 +55,22 @@ impl Command {
let block_range = self.from..=(self.from + self.block_interval - 1);
let mut rng = rand::thread_rng();
let mut dictionaries = None;
let mut jar = NippyJar::load(&get_snapshot_segment_file_name(
SnapshotSegment::Transactions,
filters,
compression,
&block_range,
))?;
let tx_range = ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone())
.provider()?
.transaction_range_by_block_range(block_range)?;
.transaction_range_by_block_range(block_range.clone())?;
let mut row_indexes = tx_range.clone().collect::<Vec<_>>();
let (provider, decompressors) = self.prepare_jar_provider(&mut jar, &mut dictionaries)?;
let mut cursor = if !decompressors.is_empty() {
provider.cursor_with_decompressors(decompressors)
} else {
provider.cursor()
};
let path = SnapshotSegment::Transactions.filename_with_configuration(
filters,
compression,
&block_range,
);
let provider = SnapshotProvider::default();
let jar_provider =
provider.get_segment_provider(SnapshotSegment::Transactions, self.from, Some(path))?;
let mut cursor = jar_provider.cursor()?;
for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] {
bench(


@@ -7,3 +7,6 @@ mod segment;
pub use compression::Compression;
pub use filters::{Filters, InclusionFilter, PerfectHashingFunction};
pub use segment::{SegmentHeader, SnapshotSegment};
/// Default snapshot block count.
pub const BLOCKS_PER_SNAPSHOT: u64 = 500_000;
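For illustration: with a fixed per-file block count, a block's snapshot index and its block range follow from integer division. A minimal self-contained sketch (the helper name is hypothetical, not part of this diff):

// Mirrors the constant above; `snapshot_range` is illustrative only.
const BLOCKS_PER_SNAPSHOT: u64 = 500_000;

fn snapshot_range(block: u64) -> std::ops::RangeInclusive<u64> {
    let index = block / BLOCKS_PER_SNAPSHOT;
    (index * BLOCKS_PER_SNAPSHOT)..=((index + 1) * BLOCKS_PER_SNAPSHOT - 1)
}

// snapshot_range(600_000) yields 500_000..=999_999.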


@@ -1,6 +1,8 @@
use crate::{BlockNumber, TxNumber};
use crate::{snapshot::PerfectHashingFunction, BlockNumber, TxNumber};
use serde::{Deserialize, Serialize};
use std::ops::RangeInclusive;
use std::{ops::RangeInclusive, path::PathBuf};
use super::{Compression, Filters, InclusionFilter};
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd, Deserialize, Serialize)]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
@@ -14,8 +16,70 @@ pub enum SnapshotSegment {
Receipts,
}
impl SnapshotSegment {
/// Returns the default configuration of the segment.
const fn config(&self) -> (Filters, Compression) {
let default_config = (
Filters::WithFilters(InclusionFilter::Cuckoo, super::PerfectHashingFunction::Fmph),
Compression::Lz4,
);
match self {
SnapshotSegment::Headers => default_config,
SnapshotSegment::Transactions => default_config,
SnapshotSegment::Receipts => default_config,
}
}
/// Returns the default file name for the provided segment and range.
pub fn filename(&self, range: &RangeInclusive<BlockNumber>) -> PathBuf {
let (filters, compression) = self.config();
self.filename_with_configuration(filters, compression, range)
}
/// Returns file name for the provided segment, filters, compression and range.
pub fn filename_with_configuration(
&self,
filters: Filters,
compression: Compression,
range: &RangeInclusive<BlockNumber>,
) -> PathBuf {
let segment_name = match self {
SnapshotSegment::Headers => "headers",
SnapshotSegment::Transactions => "transactions",
SnapshotSegment::Receipts => "receipts",
};
let filters_name = match filters {
Filters::WithFilters(inclusion_filter, phf) => {
let inclusion_filter = match inclusion_filter {
InclusionFilter::Cuckoo => "cuckoo",
};
let phf = match phf {
PerfectHashingFunction::Fmph => "fmph",
PerfectHashingFunction::GoFmph => "gofmph",
};
format!("{inclusion_filter}-{phf}")
}
Filters::WithoutFilters => "none".to_string(),
};
let compression_name = match compression {
Compression::Lz4 => "lz4",
Compression::Zstd => "zstd",
Compression::ZstdWithDictionary => "zstd-dict",
Compression::Uncompressed => "uncompressed",
};
format!(
"snapshot_{segment_name}_{}_{}_{filters_name}_{compression_name}",
range.start(),
range.end(),
)
.into()
}
}
/// A segment header that contains information common to all segments. Used for storage.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash)]
pub struct SegmentHeader {
block_range: RangeInclusive<BlockNumber>,
tx_range: RangeInclusive<TxNumber>,
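For illustration, filename construction under the default configuration is deterministic. A hedged usage sketch for the first 500k-block Headers range (the call site is hypothetical):

// The resulting name follows the format string above: default Headers
// config is cuckoo + fmph filters with lz4 compression.
let name = SnapshotSegment::Headers.filename(&(0..=499_999));
// name == "snapshot_headers_0_499999_cuckoo-fmph_lz4"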


@@ -19,7 +19,7 @@ use reth_primitives::{
BlockNumber, SnapshotSegment,
};
use reth_provider::{DatabaseProviderRO, TransactionsProviderExt};
use std::{ops::RangeInclusive, path::PathBuf};
use std::ops::RangeInclusive;
pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];
@@ -61,7 +61,7 @@ pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
let mut nippy_jar = NippyJar::new(
COLUMNS,
&get_snapshot_segment_file_name(segment, filters, compression, &block_range),
&segment.filename_with_configuration(filters, compression, &block_range),
SegmentHeader::new(block_range, tx_range),
);
@@ -90,43 +90,3 @@ pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(
Ok(nippy_jar)
}
/// Returns file name for the provided segment, filters, compression and range.
pub fn get_snapshot_segment_file_name(
segment: SnapshotSegment,
filters: Filters,
compression: Compression,
range: &RangeInclusive<BlockNumber>,
) -> PathBuf {
let segment_name = match segment {
SnapshotSegment::Headers => "headers",
SnapshotSegment::Transactions => "transactions",
SnapshotSegment::Receipts => "receipts",
};
let filters_name = match filters {
Filters::WithFilters(inclusion_filter, phf) => {
let inclusion_filter = match inclusion_filter {
InclusionFilter::Cuckoo => "cuckoo",
};
let phf = match phf {
PerfectHashingFunction::Fmph => "fmph",
PerfectHashingFunction::GoFmph => "gofmph",
};
format!("{inclusion_filter}-{phf}")
}
Filters::WithoutFilters => "none".to_string(),
};
let compression_name = match compression {
Compression::Lz4 => "lz4",
Compression::Zstd => "zstd",
Compression::ZstdWithDictionary => "zstd-dict",
Compression::Uncompressed => "uncompressed",
};
format!(
"snapshot_{segment_name}_{}_{}_{filters_name}_{compression_name}",
range.start(),
range.end(),
)
.into()
}


@@ -34,6 +34,7 @@ tracing-appender = "0.2"
anyhow = "1.0"
thiserror.workspace = true
hex = "*"
derive_more = "0.99"
[dev-dependencies]
rand = { version = "0.8", features = ["small_rng"] }


@@ -1,8 +1,10 @@
use crate::{compression::Compression, NippyJarError};
use serde::{Deserialize, Serialize};
use derive_more::Deref;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::{
fs::File,
io::{Read, Write},
sync::Arc,
};
use tracing::*;
use zstd::bulk::Compressor;
@@ -17,7 +19,8 @@ pub enum ZstdState {
Ready,
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
#[derive(Debug, Serialize, Deserialize)]
/// Zstd compression structure. Supports a compression dictionary per column.
pub struct Zstd {
/// State. Should be ready before compressing.
@@ -29,7 +32,8 @@ pub struct Zstd {
/// Max size of a dictionary
pub(crate) max_dict_size: usize,
/// List of column dictionaries.
pub(crate) raw_dictionaries: Option<Vec<RawDictionary>>,
#[serde(with = "dictionaries_serde")]
pub(crate) dictionaries: Option<Arc<ZstdDictionaries<'static>>>,
/// Number of columns to compress.
columns: usize,
}
@@ -42,7 +46,7 @@ impl Zstd {
level: 0,
use_dict,
max_dict_size,
raw_dictionaries: None,
dictionaries: None,
columns,
}
}
@@ -52,31 +56,18 @@ impl Zstd {
self
}
/// If using dictionaries, creates a list of [`DecoderDictionary`].
///
/// Consumes `self.raw_dictionaries` in the process.
pub fn generate_decompress_dictionaries<'a>(&mut self) -> Option<Vec<DecoderDictionary<'a>>> {
self.raw_dictionaries.take().map(|dicts| {
// TODO Can we use ::new instead, and avoid consuming?
dicts.iter().map(|dict| DecoderDictionary::copy(dict)).collect()
})
}
/// Creates a list of [`Decompressor`] if using dictionaries.
pub fn decompressors(&self) -> Result<Vec<Decompressor<'_>>, NippyJarError> {
if let Some(dictionaries) = &self.dictionaries {
debug_assert!(dictionaries.len() == self.columns);
return dictionaries.decompressors()
}
/// Creates a list of [`Decompressor`] using the given dictionaries.
pub fn generate_decompressors<'a>(
&self,
dictionaries: &'a [DecoderDictionary<'a>],
) -> Result<Vec<Decompressor<'a>>, NippyJarError> {
debug_assert!(dictionaries.len() == self.columns);
Ok(dictionaries
.iter()
.map(Decompressor::with_prepared_dictionary)
.collect::<Result<Vec<_>, _>>()?)
Ok(vec![])
}
/// If using dictionaries, creates a list of [`Compressor`].
pub fn generate_compressors<'a>(&self) -> Result<Option<Vec<Compressor<'a>>>, NippyJarError> {
pub fn compressors(&self) -> Result<Option<Vec<Compressor<'_>>>, NippyJarError> {
match self.state {
ZstdState::PendingDictionary => Err(NippyJarError::CompressorNotReady),
ZstdState::Ready => {
@@ -84,18 +75,11 @@ impl Zstd {
return Ok(None)
}
let mut compressors = None;
if let Some(dictionaries) = &self.raw_dictionaries {
if let Some(dictionaries) = &self.dictionaries {
debug!(target: "nippy-jar", count=?dictionaries.len(), "Generating ZSTD compressor dictionaries.");
let mut cmp = Vec::with_capacity(dictionaries.len());
for dict in dictionaries {
cmp.push(Compressor::with_dictionary(0, dict)?);
}
compressors = Some(cmp)
return Ok(Some(dictionaries.compressors()?))
}
Ok(compressors)
Ok(None)
}
}
}
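For context, a sketch of the resulting call surface; it assumes `jar` is a NippyJar deserialized from disk, so its Zstd dictionaries are in the Loaded state:

if let Some(Compressors::Zstd(zstd)) = jar.compressor() {
    // Decompressors come straight from the dictionaries held inside `zstd`.
    let decompressors = zstd.decompressors()?;
    // Compressors require Raw dictionaries (pre-serialization); on a loaded
    // jar this surfaces `NippyJarError::CompressorNotAllowed`.
    let compressors = zstd.compressors();
}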
@@ -243,9 +227,144 @@ impl Compression for Zstd {
debug_assert_eq!(dictionaries.len(), self.columns);
self.raw_dictionaries = Some(dictionaries);
self.dictionaries = Some(Arc::new(ZstdDictionaries::new(dictionaries)));
self.state = ZstdState::Ready;
Ok(())
}
}
mod dictionaries_serde {
use super::*;
pub fn serialize<S>(
dictionaries: &Option<Arc<ZstdDictionaries<'static>>>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match dictionaries {
Some(dicts) => serializer.serialize_some(dicts.as_ref()),
None => serializer.serialize_none(),
}
}
pub fn deserialize<'de, D>(
deserializer: D,
) -> Result<Option<Arc<ZstdDictionaries<'static>>>, D::Error>
where
D: Deserializer<'de>,
{
let dictionaries: Option<Vec<RawDictionary>> = Option::deserialize(deserializer)?;
Ok(dictionaries.map(|dicts| Arc::new(ZstdDictionaries::load(dicts))))
}
}
/// List of [`ZstdDictionary`]
#[cfg_attr(test, derive(PartialEq))]
#[derive(Serialize, Deserialize, Deref)]
pub struct ZstdDictionaries<'a>(Vec<ZstdDictionary<'a>>);
impl<'a> std::fmt::Debug for ZstdDictionaries<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ZstdDictionaries").field("num", &self.len()).finish_non_exhaustive()
}
}
impl<'a> ZstdDictionaries<'a> {
/// Creates [`ZstdDictionaries`].
pub fn new(raw: Vec<RawDictionary>) -> Self {
Self(raw.into_iter().map(ZstdDictionary::Raw).collect())
}
/// Loads a list [`RawDictionary`] into a list of [`ZstdDictionary::Loaded`].
pub fn load(raw: Vec<RawDictionary>) -> Self {
Self(
raw.into_iter()
.map(|dict| ZstdDictionary::Loaded(DecoderDictionary::copy(&dict)))
.collect(),
)
}
/// Creates a list of decompressors from a list of [`ZstdDictionary::Loaded`].
pub fn decompressors(&self) -> Result<Vec<Decompressor<'_>>, NippyJarError> {
Ok(self
.iter()
.flat_map(|dict| {
dict.loaded()
.ok_or(NippyJarError::DictionaryNotLoaded)
.map(Decompressor::with_prepared_dictionary)
})
.collect::<Result<Vec<_>, _>>()?)
}
/// Creates a list of compressors from a list of [`ZstdDictionary::Raw`].
pub fn compressors(&self) -> Result<Vec<Compressor<'_>>, NippyJarError> {
Ok(self
.iter()
.flat_map(|dict| {
dict.raw()
.ok_or(NippyJarError::CompressorNotAllowed)
.map(|dict| Compressor::with_dictionary(0, dict))
})
.collect::<Result<Vec<_>, _>>()?)
}
}
/// A Zstd dictionary. It's created and serialized with [`ZstdDictionary::Raw`], and deserialized as
/// [`ZstdDictionary::Loaded`].
pub enum ZstdDictionary<'a> {
Raw(RawDictionary),
Loaded(DecoderDictionary<'a>),
}
impl<'a> ZstdDictionary<'a> {
/// Returns a reference to the expected `RawDictionary`
pub fn raw(&self) -> Option<&RawDictionary> {
match self {
ZstdDictionary::Raw(dict) => Some(dict),
ZstdDictionary::Loaded(_) => None,
}
}
/// Returns a reference to the expected `DecoderDictionary`
pub fn loaded(&self) -> Option<&DecoderDictionary<'_>> {
match self {
ZstdDictionary::Raw(_) => None,
ZstdDictionary::Loaded(dict) => Some(dict),
}
}
}
impl<'de, 'a> Deserialize<'de> for ZstdDictionary<'a> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let dict = RawDictionary::deserialize(deserializer)?;
Ok(Self::Loaded(DecoderDictionary::copy(&dict)))
}
}
impl<'a> Serialize for ZstdDictionary<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match self {
ZstdDictionary::Raw(r) => r.serialize(serializer),
ZstdDictionary::Loaded(_) => unreachable!(),
}
}
}
#[cfg(test)]
impl<'a> PartialEq for ZstdDictionary<'a> {
fn eq(&self, other: &Self) -> bool {
if let (Self::Raw(a), Self::Raw(b)) = (self, &other) {
return a == b
}
unimplemented!("`DecoderDictionary` can't be compared. So comparison should be done after decompressing a value.");
}
}
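The Raw/Loaded asymmetry above makes serialization one-way: a dictionary is written as raw bytes and always comes back as a prepared DecoderDictionary. An illustrative round-trip, assuming bincode as the codec (bincode is already a nippy-jar dependency per the Cargo.lock hunk above):

let raw = ZstdDictionary::Raw(vec![0xDE, 0xAD]);
let bytes = bincode::serialize(&raw)?;
// Deserialization always produces the Loaded variant (see above).
let loaded: ZstdDictionary<'static> = bincode::deserialize(&bytes)?;
assert!(loaded.raw().is_none() && loaded.loaded().is_some());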


@@ -1,24 +1,23 @@
use crate::{
compression::{Compression, Zstd},
compression::{Compression, Compressors, Zstd},
InclusionFilter, NippyJar, NippyJarError, PerfectHashingFunction, RefRow,
};
use memmap2::Mmap;
use serde::{de::Deserialize, ser::Serialize};
use std::{fs::File, ops::Range};
use std::{fs::File, ops::Range, sync::Arc};
use sucds::int_vectors::Access;
use zstd::bulk::Decompressor;
/// Simple cursor implementation to retrieve data from [`NippyJar`].
#[derive(Clone)]
pub struct NippyJarCursor<'a, H = ()> {
/// [`NippyJar`] which holds most of the required configuration to read from the file.
jar: &'a NippyJar<H>,
/// Optional dictionary decompressors.
zstd_decompressors: Option<Vec<Decompressor<'a>>>,
/// Data file.
#[allow(unused)]
file_handle: File,
file_handle: Arc<File>,
/// Data file.
mmap_handle: Mmap,
mmap_handle: Arc<Mmap>,
/// Internal buffer to unload data to without reallocating memory on each retrieval.
internal_buffer: Vec<u8>,
/// Cursor row position.
@@ -36,24 +35,21 @@ where
impl<'a, H> NippyJarCursor<'a, H>
where
H: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug,
H: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static,
{
pub fn new(
jar: &'a NippyJar<H>,
zstd_decompressors: Option<Vec<Decompressor<'a>>>,
) -> Result<Self, NippyJarError> {
pub fn new(jar: &'a NippyJar<H>) -> Result<Self, NippyJarError> {
let file = File::open(jar.data_path())?;
// SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
let mmap = unsafe { Mmap::map(&file)? };
let max_row_size = jar.max_row_size;
Ok(NippyJarCursor {
jar,
zstd_decompressors,
file_handle: file,
mmap_handle: mmap,
file_handle: Arc::new(file),
mmap_handle: Arc::new(mmap),
// Makes sure that we have enough buffer capacity to decompress any row of data.
internal_buffer: Vec::with_capacity(jar.max_row_size),
internal_buffer: Vec::with_capacity(max_row_size),
row: 0,
})
}
@@ -218,23 +214,32 @@ where
value_offset..next_value_offset
};
if let Some(zstd_dict_decompressors) = self.zstd_decompressors.as_mut() {
let from: usize = self.internal_buffer.len();
if let Some(decompressor) = zstd_dict_decompressors.get_mut(column) {
Zstd::decompress_with_dictionary(
&self.mmap_handle[column_offset_range],
&mut self.internal_buffer,
decompressor,
)?;
}
let to = self.internal_buffer.len();
row.push(ValueRange::Internal(from..to));
} else if let Some(compression) = self.jar.compressor() {
// Uses the chosen default decompressor
if let Some(compression) = self.jar.compressor() {
let from = self.internal_buffer.len();
compression
.decompress_to(&self.mmap_handle[column_offset_range], &mut self.internal_buffer)?;
match compression {
Compressors::Zstd(z) if z.use_dict => {
// If we are here, then for sure we have the necessary dictionaries and they're
// loaded (happens during deserialization). Otherwise, there's an issue
// somewhere else and we can't recover here anyway.
let dictionaries = z.dictionaries.as_ref().expect("dictionaries to exist")
[column]
.loaded()
.expect("dictionary to be loaded");
let mut decompressor = Decompressor::with_prepared_dictionary(dictionaries)?;
Zstd::decompress_with_dictionary(
&self.mmap_handle[column_offset_range],
&mut self.internal_buffer,
&mut decompressor,
)?;
}
_ => {
// Uses the chosen default decompressor
compression.decompress_to(
&self.mmap_handle[column_offset_range],
&mut self.internal_buffer,
)?;
}
}
let to = self.internal_buffer.len();
row.push(ValueRange::Internal(from..to));
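Net effect for callers, sketched under the assumption that `path` points at an existing jar: the old two-step "generate dictionaries, then decompressors" dance collapses into a single constructor call.

// Before: NippyJarCursor::new(&jar, Some(decompressors))
let jar: NippyJar<SegmentHeader> = NippyJar::load(path)?;
let mut cursor = NippyJarCursor::new(&jar)?;
while let Some(row) = cursor.next_row()? {
    // Each row is a set of column values, decompressed on demand using the
    // dictionaries the jar itself now carries.
}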


@@ -37,4 +37,8 @@ pub enum NippyJarError {
UnsupportedFilterQuery,
#[error("compression or decompression requires a bigger destination output")]
OutputTooSmall,
#[error("Dictionary is not loaded.")]
DictionaryNotLoaded,
#[error("It's not possible to generate a compressor after loading a dictionary.")]
CompressorNotAllowed,
}


@@ -323,7 +323,7 @@ where
// implementation
let mut maybe_zstd_compressors = None;
if let Some(Compressors::Zstd(zstd)) = &self.compressor {
maybe_zstd_compressors = zstd.generate_compressors()?;
maybe_zstd_compressors = zstd.compressors()?;
}
// Temporary buffer to avoid multiple reallocations if compressing to a buffer (eg. zstd w/
@@ -394,6 +394,9 @@ where
column_iterators = iterators.into_iter();
}
// drops immutable borrow
drop(maybe_zstd_compressors);
// Write offsets and offset index to file
self.freeze_offsets(offsets)?;
@@ -622,7 +625,7 @@ mod tests {
assert!(nippy.compressor().is_some());
if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor_mut() {
assert!(matches!(zstd.generate_compressors(), Err(NippyJarError::CompressorNotReady)));
assert!(matches!(zstd.compressors(), Err(NippyJarError::CompressorNotReady)));
// Make sure the number of column iterators match the initial set up ones.
assert!(matches!(
@@ -642,27 +645,26 @@ mod tests {
if let Some(Compressors::Zstd(zstd)) = &nippy.compressor() {
assert!(matches!(
(&zstd.state, zstd.raw_dictionaries.as_ref().map(|dict| dict.len())),
(&zstd.state, zstd.dictionaries.as_ref().map(|dict| dict.len())),
(compression::ZstdState::Ready, Some(columns)) if columns == num_columns
));
}
nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap();
let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
assert_eq!(nippy, loaded_nippy);
let mut dicts = vec![];
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor_mut() {
dicts = zstd.generate_decompress_dictionaries().unwrap()
}
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
assert_eq!(nippy.version, loaded_nippy.version);
assert_eq!(nippy.columns, loaded_nippy.columns);
assert_eq!(nippy.filter, loaded_nippy.filter);
assert_eq!(nippy.phf, loaded_nippy.phf);
assert_eq!(nippy.offsets_index, loaded_nippy.offsets_index);
assert_eq!(nippy.offsets, loaded_nippy.offsets);
assert_eq!(nippy.max_row_size, loaded_nippy.max_row_size);
assert_eq!(nippy.path, loaded_nippy.path);
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
let mut cursor = NippyJarCursor::new(
&loaded_nippy,
Some(zstd.generate_decompressors(&dicts).unwrap()),
)
.unwrap();
assert!(zstd.use_dict);
let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
// Iterate over compressed values and compare
let mut row_index = 0usize;
@@ -673,6 +675,8 @@ mod tests {
);
row_index += 1;
}
} else {
panic!("Expected Zstd compressor")
}
}
@@ -695,7 +699,7 @@ mod tests {
assert_eq!(nippy, loaded_nippy);
if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() {
let mut cursor = NippyJarCursor::new(&loaded_nippy, None).unwrap();
let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
// Iterate over compressed values and compare
let mut row_index = 0usize;
@@ -733,7 +737,7 @@ mod tests {
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
assert!(!zstd.use_dict);
let mut cursor = NippyJarCursor::new(&loaded_nippy, None).unwrap();
let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
// Iterate over compressed values and compare
let mut row_index = 0usize;
@@ -782,23 +786,15 @@ mod tests {
// Read file
{
let mut loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();
let loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();
assert!(loaded_nippy.compressor().is_some());
assert!(loaded_nippy.filter.is_some());
assert!(loaded_nippy.phf.is_some());
assert_eq!(loaded_nippy.user_header().block_start, block_start);
let mut dicts = vec![];
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor_mut() {
dicts = zstd.generate_decompress_dictionaries().unwrap()
}
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
let mut cursor = NippyJarCursor::new(
&loaded_nippy,
Some(zstd.generate_decompressors(&dicts).unwrap()),
)
.unwrap();
if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
// Iterate over compressed values and compare
let mut row_num = 0usize;
@@ -860,18 +856,10 @@ mod tests {
// Read file
{
let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
let mut dicts = vec![];
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor_mut() {
dicts = zstd.generate_decompress_dictionaries().unwrap()
}
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
let mut cursor = NippyJarCursor::new(
&loaded_nippy,
Some(zstd.generate_decompressors(&dicts).unwrap()),
)
.unwrap();
if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
// Shuffled for chaos.
let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();


@@ -30,6 +30,7 @@ auto_impl = "1.0"
itertools.workspace = true
pin-project.workspace = true
parking_lot.workspace = true
dashmap = { version = "5.5", features = ["inline"] }
# test-utils
alloy-rlp = { workspace = true, optional = true }


@@ -37,7 +37,7 @@ mod bundle_state_provider;
mod chain_info;
mod database;
mod snapshot;
pub use snapshot::SnapshotProvider;
pub use snapshot::{SnapshotJarProvider, SnapshotProvider};
mod state;
use crate::{providers::chain_info::ChainInfoTracker, traits::BlockSource};
pub use bundle_state_provider::BundleStateProvider;


@@ -1,47 +1,210 @@
use crate::{BlockHashReader, BlockNumReader, HeaderProvider, TransactionsProvider};
use dashmap::DashMap;
use reth_db::{
table::{Decompress, Table},
HeaderTD,
};
use reth_interfaces::{provider::ProviderError, RethResult};
use reth_nippy_jar::{compression::Decompressor, NippyJar, NippyJarCursor};
use reth_nippy_jar::{NippyJar, NippyJarCursor};
use reth_primitives::{
snapshot::SegmentHeader, Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header,
SealedHeader, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber,
snapshot::{SegmentHeader, BLOCKS_PER_SNAPSHOT},
Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header, SealedHeader,
SnapshotSegment, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber,
B256, U256,
};
use std::ops::RangeBounds;
use std::{ops::RangeBounds, path::PathBuf};
/// Alias type for each specific `NippyJar`.
type NippyJarRef<'a> =
dashmap::mapref::one::Ref<'a, (u64, SnapshotSegment), NippyJar<SegmentHeader>>;
/// SnapshotProvider
///
/// WIP Rudimentary impl just for tests
/// TODO: should be able to walk through snapshot files/block_ranges
/// TODO: Arc over NippyJars and/or NippyJarCursors (LRU)
#[derive(Debug)]
pub struct SnapshotProvider<'a> {
/// NippyJar
pub jar: &'a NippyJar<SegmentHeader>,
#[derive(Debug, Default)]
pub struct SnapshotProvider {
/// Maintains a map which allows for concurrent access to different `NippyJars`, over different
/// segments and ranges.
map: DashMap<(BlockNumber, SnapshotSegment), NippyJar<SegmentHeader>>,
}
impl<'a> SnapshotProvider<'a> {
/// Creates cursor
pub fn cursor(&self) -> NippyJarCursor<'a, SegmentHeader> {
NippyJarCursor::new(self.jar, None).unwrap()
}
/// Creates cursor with zstd decompressors
pub fn cursor_with_decompressors(
impl SnapshotProvider {
/// Gets the provider of the requested segment and range.
pub fn get_segment_provider(
&self,
decompressors: Vec<Decompressor<'a>>,
) -> NippyJarCursor<'a, SegmentHeader> {
NippyJarCursor::new(self.jar, Some(decompressors)).unwrap()
segment: SnapshotSegment,
block: BlockNumber,
mut path: Option<PathBuf>,
) -> RethResult<SnapshotJarProvider<'_>> {
// TODO this invalidates custom length snapshots.
let snapshot = block / BLOCKS_PER_SNAPSHOT;
let key = (snapshot, segment);
if let Some(jar) = self.map.get(&key) {
return Ok(SnapshotJarProvider { jar })
}
if let Some(path) = &path {
let jar = NippyJar::load(path)?;
self.map.insert(key, jar);
} else {
path = Some(segment.filename(
&((snapshot * BLOCKS_PER_SNAPSHOT)..=((snapshot + 1) * BLOCKS_PER_SNAPSHOT - 1)),
));
}
self.get_segment_provider(segment, block, path)
}
}
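A hedged usage sketch: the manager lazily loads one jar per (block / BLOCKS_PER_SNAPSHOT, segment) key into the DashMap and hands out cheap per-jar providers (block numbers below are illustrative):

let provider = SnapshotProvider::default();
// Loads (or reuses) the jar covering block 600_000 for the Headers segment;
// with no explicit path, the default filename for that range is derived.
let jar_provider =
    provider.get_segment_provider(SnapshotSegment::Headers, 600_000, None)?;
let header = jar_provider.header_by_number(600_123)?;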
impl<'a> HeaderProvider for SnapshotProvider<'a> {
impl HeaderProvider for SnapshotProvider {
fn header(&self, _block_hash: &BlockHash) -> RethResult<Option<Header>> {
todo!()
}
fn header_by_number(&self, num: BlockNumber) -> RethResult<Option<Header>> {
self.get_segment_provider(SnapshotSegment::Headers, num, None)?.header_by_number(num)
}
fn header_td(&self, _block_hash: &BlockHash) -> RethResult<Option<U256>> {
todo!()
}
fn header_td_by_number(&self, _number: BlockNumber) -> RethResult<Option<U256>> {
todo!();
}
fn headers_range(&self, _range: impl RangeBounds<BlockNumber>) -> RethResult<Vec<Header>> {
todo!();
}
fn sealed_headers_range(
&self,
_range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<SealedHeader>> {
todo!();
}
fn sealed_header(&self, _number: BlockNumber) -> RethResult<Option<SealedHeader>> {
todo!();
}
}
impl BlockHashReader for SnapshotProvider {
fn block_hash(&self, _number: u64) -> RethResult<Option<B256>> {
todo!()
}
fn canonical_hashes_range(
&self,
_start: BlockNumber,
_end: BlockNumber,
) -> RethResult<Vec<B256>> {
todo!()
}
}
impl BlockNumReader for SnapshotProvider {
fn chain_info(&self) -> RethResult<ChainInfo> {
todo!()
}
fn best_block_number(&self) -> RethResult<BlockNumber> {
todo!()
}
fn last_block_number(&self) -> RethResult<BlockNumber> {
todo!()
}
fn block_number(&self, _hash: B256) -> RethResult<Option<BlockNumber>> {
todo!()
}
}
impl TransactionsProvider for SnapshotProvider {
fn transaction_id(&self, _tx_hash: TxHash) -> RethResult<Option<TxNumber>> {
todo!()
}
fn transaction_by_id(&self, num: TxNumber) -> RethResult<Option<TransactionSigned>> {
// TODO `num` is provided after checking the index
let block_num = num;
self.get_segment_provider(SnapshotSegment::Transactions, block_num, None)?
.transaction_by_id(num)
}
fn transaction_by_id_no_hash(
&self,
_id: TxNumber,
) -> RethResult<Option<TransactionSignedNoHash>> {
todo!()
}
fn transaction_by_hash(&self, _hash: TxHash) -> RethResult<Option<TransactionSigned>> {
todo!()
}
fn transaction_by_hash_with_meta(
&self,
_hash: TxHash,
) -> RethResult<Option<(TransactionSigned, TransactionMeta)>> {
todo!()
}
fn transaction_block(&self, _id: TxNumber) -> RethResult<Option<BlockNumber>> {
todo!()
}
fn transactions_by_block(
&self,
_block_id: BlockHashOrNumber,
) -> RethResult<Option<Vec<TransactionSigned>>> {
todo!()
}
fn transactions_by_block_range(
&self,
_range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<Vec<TransactionSigned>>> {
todo!()
}
fn senders_by_tx_range(&self, _range: impl RangeBounds<TxNumber>) -> RethResult<Vec<Address>> {
todo!()
}
fn transactions_by_tx_range(
&self,
_range: impl RangeBounds<TxNumber>,
) -> RethResult<Vec<reth_primitives::TransactionSignedNoHash>> {
todo!()
}
fn transaction_sender(&self, _id: TxNumber) -> RethResult<Option<Address>> {
todo!()
}
}
/// Provider over a specific `NippyJar` and range.
#[derive(Debug)]
pub struct SnapshotJarProvider<'a> {
/// Reference to a value on [`SnapshotProvider`]
pub jar: NippyJarRef<'a>,
}
impl<'a> SnapshotJarProvider<'a> {
/// Provides a cursor for more granular data access.
pub fn cursor<'b>(&'b self) -> RethResult<NippyJarCursor<'a, SegmentHeader>>
where
'b: 'a,
{
Ok(NippyJarCursor::new(self.jar.value())?)
}
}
impl<'a> HeaderProvider for SnapshotJarProvider<'a> {
fn header(&self, block_hash: &BlockHash) -> RethResult<Option<Header>> {
// WIP
let mut cursor = self.cursor();
let mut cursor = NippyJarCursor::new(self.jar.value())?;
let header = Header::decompress(
cursor.row_by_key_with_cols::<0b01, 2>(&block_hash.0).unwrap().unwrap()[0],
@@ -58,7 +221,7 @@ impl<'a> HeaderProvider for SnapshotProvider<'a> {
fn header_by_number(&self, num: BlockNumber) -> RethResult<Option<Header>> {
Header::decompress(
self.cursor()
NippyJarCursor::new(self.jar.value())?
.row_by_number_with_cols::<0b01, 2>(
(num - self.jar.user_header().block_start()) as usize,
)?
@@ -70,7 +233,7 @@ impl<'a> HeaderProvider for SnapshotProvider<'a> {
fn header_td(&self, block_hash: &BlockHash) -> RethResult<Option<U256>> {
// WIP
let mut cursor = self.cursor();
let mut cursor = NippyJarCursor::new(self.jar.value())?;
let row = cursor.row_by_key_with_cols::<0b11, 2>(&block_hash.0).unwrap().unwrap();
@@ -105,7 +268,7 @@ impl<'a> HeaderProvider for SnapshotProvider<'a> {
}
}
impl<'a> BlockHashReader for SnapshotProvider<'a> {
impl<'a> BlockHashReader for SnapshotJarProvider<'a> {
fn block_hash(&self, _number: u64) -> RethResult<Option<B256>> {
todo!()
}
@@ -119,7 +282,7 @@ impl<'a> BlockHashReader for SnapshotProvider<'a> {
}
}
impl<'a> BlockNumReader for SnapshotProvider<'a> {
impl<'a> BlockNumReader for SnapshotJarProvider<'a> {
fn chain_info(&self) -> RethResult<ChainInfo> {
todo!()
}
@@ -137,14 +300,14 @@ impl<'a> BlockNumReader for SnapshotProvider<'a> {
}
}
impl<'a> TransactionsProvider for SnapshotProvider<'a> {
impl<'a> TransactionsProvider for SnapshotJarProvider<'a> {
fn transaction_id(&self, _tx_hash: TxHash) -> RethResult<Option<TxNumber>> {
todo!()
}
fn transaction_by_id(&self, num: TxNumber) -> RethResult<Option<TransactionSigned>> {
TransactionSignedNoHash::decompress(
self.cursor()
NippyJarCursor::new(self.jar.value())?
.row_by_number_with_cols::<0b1, 1>(
(num - self.jar.user_header().tx_start()) as usize,
)?
@@ -164,7 +327,7 @@ impl<'a> TransactionsProvider for SnapshotProvider<'a> {
fn transaction_by_hash(&self, hash: TxHash) -> RethResult<Option<TransactionSigned>> {
// WIP
let mut cursor = self.cursor();
let mut cursor = NippyJarCursor::new(self.jar.value())?;
let tx = TransactionSignedNoHash::decompress(
cursor.row_by_key_with_cols::<0b1, 1>(&hash.0).unwrap().unwrap()[0],
@@ -315,10 +478,11 @@ mod test {
// Use providers to query Header data and compare if it matches
{
let jar = NippyJar::load(snap_file.path()).unwrap();
let db_provider = factory.provider().unwrap();
let snap_provider = SnapshotProvider { jar: &jar };
let manager = SnapshotProvider::default();
let jar_provider = manager
.get_segment_provider(SnapshotSegment::Headers, 0, Some(snap_file.path().into()))
.unwrap();
assert!(!headers.is_empty());
@@ -331,12 +495,12 @@ mod test {
// Compare Header
assert_eq!(header, db_provider.header(&header_hash).unwrap().unwrap());
assert_eq!(header, snap_provider.header(&header_hash).unwrap().unwrap());
assert_eq!(header, jar_provider.header(&header_hash).unwrap().unwrap());
// Compare HeaderTD
assert_eq!(
db_provider.header_td(&header_hash).unwrap().unwrap(),
snap_provider.header_td(&header_hash).unwrap().unwrap()
jar_provider.header_td(&header_hash).unwrap().unwrap()
);
}
}