diff --git a/crates/storage/nippy-jar/src/cursor.rs b/crates/storage/nippy-jar/src/cursor.rs index ba876ccaee..010bb1df2a 100644 --- a/crates/storage/nippy-jar/src/cursor.rs +++ b/crates/storage/nippy-jar/src/cursor.rs @@ -1,10 +1,9 @@ use crate::{ compression::{Compression, Compressors, Zstd}, - InclusionFilter, NippyJar, NippyJarError, PerfectHashingFunction, RefRow, + InclusionFilter, MmapHandle, NippyJar, NippyJarError, PerfectHashingFunction, RefRow, }; -use memmap2::Mmap; use serde::{de::Deserialize, ser::Serialize}; -use std::{fs::File, ops::Range, sync::Arc}; +use std::ops::Range; use sucds::int_vectors::Access; use zstd::bulk::Decompressor; @@ -14,10 +13,7 @@ pub struct NippyJarCursor<'a, H = ()> { /// [`NippyJar`] which holds most of the required configuration to read from the file. jar: &'a NippyJar, /// Data file. - #[allow(unused)] - file_handle: Arc, - /// Data file. - mmap_handle: Arc, + mmap_handle: MmapHandle, /// Internal buffer to unload data to without reallocating memory on each retrieval. internal_buffer: Vec, /// Cursor row position. @@ -38,16 +34,24 @@ where H: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static, { pub fn new(jar: &'a NippyJar) -> Result { - let file = File::open(jar.data_path())?; - - // SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle. - let mmap = unsafe { Mmap::map(&file)? }; let max_row_size = jar.max_row_size; - Ok(NippyJarCursor { jar, - file_handle: Arc::new(file), - mmap_handle: Arc::new(mmap), + mmap_handle: jar.open_data()?, + // Makes sure that we have enough buffer capacity to decompress any row of data. + internal_buffer: Vec::with_capacity(max_row_size), + row: 0, + }) + } + + pub fn with_handle( + jar: &'a NippyJar, + mmap_handle: MmapHandle, + ) -> Result { + let max_row_size = jar.max_row_size; + Ok(NippyJarCursor { + jar, + mmap_handle, // Makes sure that we have enough buffer capacity to decompress any row of data. internal_buffer: Vec::with_capacity(max_row_size), row: 0, diff --git a/crates/storage/nippy-jar/src/lib.rs b/crates/storage/nippy-jar/src/lib.rs index 036356946e..b1f932bece 100644 --- a/crates/storage/nippy-jar/src/lib.rs +++ b/crates/storage/nippy-jar/src/lib.rs @@ -10,6 +10,7 @@ #![deny(unused_must_use, rust_2018_idioms)] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +use memmap2::Mmap; use serde::{Deserialize, Serialize}; use std::{ clone::Clone, @@ -17,7 +18,9 @@ use std::{ fs::File, io::{Seek, Write}, marker::Sync, + ops::Deref, path::{Path, PathBuf}, + sync::Arc, }; use sucds::{ int_vectors::PrefixSummedEliasFano, @@ -247,6 +250,11 @@ where .join(format!("{}.idx", data_path.file_name().expect("exists").to_string_lossy())) } + /// Returns a [`MmapHandle`] of the data file + pub fn open_data(&self) -> Result { + MmapHandle::new(self.data_path()) + } + /// If required, prepares any compression algorithm to an early pass of the data. pub fn prepare_compression( &mut self, @@ -487,6 +495,34 @@ where } } +/// Holds an `Arc` over a file and its associated mmap handle. +#[derive(Debug, Clone)] +pub struct MmapHandle { + /// File descriptor. Needs to be kept alive as long as the mmap handle. + #[allow(unused)] + file: Arc, + /// Mmap handle. + mmap: Arc, +} + +impl MmapHandle { + pub fn new(path: impl AsRef) -> Result { + let file = File::open(path)?; + + // SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle. + let mmap = unsafe { Mmap::map(&file)? }; + + Ok(Self { file: Arc::new(file), mmap: Arc::new(mmap) }) + } +} + +impl Deref for MmapHandle { + type Target = Mmap; + fn deref(&self) -> &Self::Target { + &self.mmap + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/storage/provider/src/providers/snapshot.rs b/crates/storage/provider/src/providers/snapshot.rs deleted file mode 100644 index 8087eb159c..0000000000 --- a/crates/storage/provider/src/providers/snapshot.rs +++ /dev/null @@ -1,508 +0,0 @@ -use crate::{BlockHashReader, BlockNumReader, HeaderProvider, TransactionsProvider}; -use dashmap::DashMap; -use reth_db::{ - table::{Decompress, Table}, - HeaderTD, -}; -use reth_interfaces::{provider::ProviderError, RethResult}; -use reth_nippy_jar::{NippyJar, NippyJarCursor}; -use reth_primitives::{ - snapshot::{SegmentHeader, BLOCKS_PER_SNAPSHOT}, - Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header, SealedHeader, - SnapshotSegment, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, - B256, U256, -}; -use std::{ops::RangeBounds, path::PathBuf}; - -/// Alias type for each specific `NippyJar`. -type NippyJarRef<'a> = - dashmap::mapref::one::Ref<'a, (u64, SnapshotSegment), NippyJar>; - -/// SnapshotProvider -#[derive(Debug, Default)] -pub struct SnapshotProvider { - /// Maintains a map which allows for concurrent access to different `NippyJars`, over different - /// segments and ranges. - map: DashMap<(BlockNumber, SnapshotSegment), NippyJar>, -} - -impl SnapshotProvider { - /// Gets the provider of the requested segment and range. - pub fn get_segment_provider( - &self, - segment: SnapshotSegment, - block: BlockNumber, - mut path: Option, - ) -> RethResult> { - // TODO this invalidates custom length snapshots. - let snapshot = block / BLOCKS_PER_SNAPSHOT; - let key = (snapshot, segment); - - if let Some(jar) = self.map.get(&key) { - return Ok(SnapshotJarProvider { jar }) - } - - if let Some(path) = &path { - let jar = NippyJar::load(path)?; - self.map.insert(key, jar); - } else { - path = Some(segment.filename( - &((snapshot * BLOCKS_PER_SNAPSHOT)..=((snapshot + 1) * BLOCKS_PER_SNAPSHOT - 1)), - )); - } - - self.get_segment_provider(segment, block, path) - } -} - -impl HeaderProvider for SnapshotProvider { - fn header(&self, _block_hash: &BlockHash) -> RethResult> { - todo!() - } - - fn header_by_number(&self, num: BlockNumber) -> RethResult> { - self.get_segment_provider(SnapshotSegment::Headers, num, None)?.header_by_number(num) - } - - fn header_td(&self, _block_hash: &BlockHash) -> RethResult> { - todo!() - } - - fn header_td_by_number(&self, _number: BlockNumber) -> RethResult> { - todo!(); - } - - fn headers_range(&self, _range: impl RangeBounds) -> RethResult> { - todo!(); - } - - fn sealed_headers_range( - &self, - _range: impl RangeBounds, - ) -> RethResult> { - todo!(); - } - - fn sealed_header(&self, _number: BlockNumber) -> RethResult> { - todo!(); - } -} - -impl BlockHashReader for SnapshotProvider { - fn block_hash(&self, _number: u64) -> RethResult> { - todo!() - } - - fn canonical_hashes_range( - &self, - _start: BlockNumber, - _end: BlockNumber, - ) -> RethResult> { - todo!() - } -} - -impl BlockNumReader for SnapshotProvider { - fn chain_info(&self) -> RethResult { - todo!() - } - - fn best_block_number(&self) -> RethResult { - todo!() - } - - fn last_block_number(&self) -> RethResult { - todo!() - } - - fn block_number(&self, _hash: B256) -> RethResult> { - todo!() - } -} - -impl TransactionsProvider for SnapshotProvider { - fn transaction_id(&self, _tx_hash: TxHash) -> RethResult> { - todo!() - } - - fn transaction_by_id(&self, num: TxNumber) -> RethResult> { - // TODO `num` is provided after checking the index - let block_num = num; - self.get_segment_provider(SnapshotSegment::Transactions, block_num, None)? - .transaction_by_id(num) - } - - fn transaction_by_id_no_hash( - &self, - _id: TxNumber, - ) -> RethResult> { - todo!() - } - - fn transaction_by_hash(&self, _hash: TxHash) -> RethResult> { - todo!() - } - - fn transaction_by_hash_with_meta( - &self, - _hash: TxHash, - ) -> RethResult> { - todo!() - } - - fn transaction_block(&self, _id: TxNumber) -> RethResult> { - todo!() - } - - fn transactions_by_block( - &self, - _block_id: BlockHashOrNumber, - ) -> RethResult>> { - todo!() - } - - fn transactions_by_block_range( - &self, - _range: impl RangeBounds, - ) -> RethResult>> { - todo!() - } - - fn senders_by_tx_range(&self, _range: impl RangeBounds) -> RethResult> { - todo!() - } - - fn transactions_by_tx_range( - &self, - _range: impl RangeBounds, - ) -> RethResult> { - todo!() - } - - fn transaction_sender(&self, _id: TxNumber) -> RethResult> { - todo!() - } -} - -/// Provider over a specific `NippyJar` and range. -#[derive(Debug)] -pub struct SnapshotJarProvider<'a> { - /// Reference to a value on [`SnapshotProvider`] - pub jar: NippyJarRef<'a>, -} - -impl<'a> SnapshotJarProvider<'a> { - /// Provides a cursor for more granular data access. - pub fn cursor<'b>(&'b self) -> RethResult> - where - 'b: 'a, - { - Ok(NippyJarCursor::new(self.jar.value())?) - } -} - -impl<'a> HeaderProvider for SnapshotJarProvider<'a> { - fn header(&self, block_hash: &BlockHash) -> RethResult> { - // WIP - let mut cursor = NippyJarCursor::new(self.jar.value())?; - - let header = Header::decompress( - cursor.row_by_key_with_cols::<0b01, 2>(&block_hash.0).unwrap().unwrap()[0], - ) - .unwrap(); - - if &header.hash_slow() == block_hash { - return Ok(Some(header)) - } else { - // check next snapshot - } - Ok(None) - } - - fn header_by_number(&self, num: BlockNumber) -> RethResult> { - Header::decompress( - NippyJarCursor::new(self.jar.value())? - .row_by_number_with_cols::<0b01, 2>( - (num - self.jar.user_header().block_start()) as usize, - )? - .ok_or(ProviderError::HeaderNotFound(num.into()))?[0], - ) - .map(Some) - .map_err(Into::into) - } - - fn header_td(&self, block_hash: &BlockHash) -> RethResult> { - // WIP - let mut cursor = NippyJarCursor::new(self.jar.value())?; - - let row = cursor.row_by_key_with_cols::<0b11, 2>(&block_hash.0).unwrap().unwrap(); - - let header = Header::decompress(row[0]).unwrap(); - let td = ::Value::decompress(row[1]).unwrap(); - - if &header.hash_slow() == block_hash { - return Ok(Some(td.0)) - } else { - // check next snapshot - } - Ok(None) - } - - fn header_td_by_number(&self, _number: BlockNumber) -> RethResult> { - unimplemented!(); - } - - fn headers_range(&self, _range: impl RangeBounds) -> RethResult> { - unimplemented!(); - } - - fn sealed_headers_range( - &self, - _range: impl RangeBounds, - ) -> RethResult> { - unimplemented!(); - } - - fn sealed_header(&self, _number: BlockNumber) -> RethResult> { - unimplemented!(); - } -} - -impl<'a> BlockHashReader for SnapshotJarProvider<'a> { - fn block_hash(&self, _number: u64) -> RethResult> { - todo!() - } - - fn canonical_hashes_range( - &self, - _start: BlockNumber, - _end: BlockNumber, - ) -> RethResult> { - todo!() - } -} - -impl<'a> BlockNumReader for SnapshotJarProvider<'a> { - fn chain_info(&self) -> RethResult { - todo!() - } - - fn best_block_number(&self) -> RethResult { - todo!() - } - - fn last_block_number(&self) -> RethResult { - todo!() - } - - fn block_number(&self, _hash: B256) -> RethResult> { - todo!() - } -} - -impl<'a> TransactionsProvider for SnapshotJarProvider<'a> { - fn transaction_id(&self, _tx_hash: TxHash) -> RethResult> { - todo!() - } - - fn transaction_by_id(&self, num: TxNumber) -> RethResult> { - TransactionSignedNoHash::decompress( - NippyJarCursor::new(self.jar.value())? - .row_by_number_with_cols::<0b1, 1>( - (num - self.jar.user_header().tx_start()) as usize, - )? - .ok_or(ProviderError::TransactionNotFound(num.into()))?[0], - ) - .map(Into::into) - .map(Some) - .map_err(Into::into) - } - - fn transaction_by_id_no_hash( - &self, - _id: TxNumber, - ) -> RethResult> { - todo!() - } - - fn transaction_by_hash(&self, hash: TxHash) -> RethResult> { - // WIP - let mut cursor = NippyJarCursor::new(self.jar.value())?; - - let tx = TransactionSignedNoHash::decompress( - cursor.row_by_key_with_cols::<0b1, 1>(&hash.0).unwrap().unwrap()[0], - ) - .unwrap() - .with_hash(); - - if tx.hash() == hash { - return Ok(Some(tx)) - } else { - // check next snapshot - } - Ok(None) - } - - fn transaction_by_hash_with_meta( - &self, - _hash: TxHash, - ) -> RethResult> { - todo!() - } - - fn transaction_block(&self, _id: TxNumber) -> RethResult> { - todo!() - } - - fn transactions_by_block( - &self, - _block_id: BlockHashOrNumber, - ) -> RethResult>> { - todo!() - } - - fn transactions_by_block_range( - &self, - _range: impl RangeBounds, - ) -> RethResult>> { - todo!() - } - - fn senders_by_tx_range(&self, _range: impl RangeBounds) -> RethResult> { - todo!() - } - - fn transactions_by_tx_range( - &self, - _range: impl RangeBounds, - ) -> RethResult> { - todo!() - } - - fn transaction_sender(&self, _id: TxNumber) -> RethResult> { - todo!() - } -} - -#[cfg(test)] -mod test { - use super::*; - use crate::ProviderFactory; - use rand::{self, seq::SliceRandom}; - use reth_db::{ - cursor::DbCursorRO, - database::Database, - snapshot::create_snapshot_T1_T2, - test_utils::create_test_rw_db, - transaction::{DbTx, DbTxMut}, - CanonicalHeaders, DatabaseError, HeaderNumbers, HeaderTD, Headers, RawTable, - }; - use reth_interfaces::test_utils::generators::{self, random_header_range}; - use reth_nippy_jar::NippyJar; - use reth_primitives::{B256, MAINNET}; - - #[test] - fn test_snap() { - // Ranges - let row_count = 100u64; - let range = 0..=(row_count - 1); - let segment_header = SegmentHeader::new(range.clone(), range.clone()); - - // Data sources - let db = create_test_rw_db(); - let factory = ProviderFactory::new(&db, MAINNET.clone()); - let snap_file = tempfile::NamedTempFile::new().unwrap(); - - // Setup data - let mut headers = random_header_range( - &mut generators::rng(), - *range.start()..(*range.end() + 1), - B256::random(), - ); - - db.update(|tx| -> Result<(), DatabaseError> { - let mut td = U256::ZERO; - for header in headers.clone() { - td += header.header.difficulty; - let hash = header.hash(); - - tx.put::(header.number, hash)?; - tx.put::(header.number, header.clone().unseal())?; - tx.put::(header.number, td.into())?; - tx.put::(hash, header.number)?; - } - Ok(()) - }) - .unwrap() - .unwrap(); - - // Create Snapshot - { - let with_compression = true; - let with_filter = true; - - let mut nippy_jar = NippyJar::new(2, snap_file.path(), segment_header); - - if with_compression { - nippy_jar = nippy_jar.with_zstd(false, 0); - } - - if with_filter { - nippy_jar = nippy_jar.with_cuckoo_filter(row_count as usize + 10).with_fmph(); - } - - let tx = db.tx().unwrap(); - - // Hacky type inference. TODO fix - let mut none_vec = Some(vec![vec![vec![0u8]].into_iter()]); - let _ = none_vec.take(); - - // Generate list of hashes for filters & PHF - let mut cursor = tx.cursor_read::>().unwrap(); - let hashes = cursor - .walk(None) - .unwrap() - .map(|row| row.map(|(_key, value)| value.into_value()).map_err(|e| e.into())); - - create_snapshot_T1_T2::( - &tx, - range, - None, - none_vec, - Some(hashes), - row_count as usize, - &mut nippy_jar, - ) - .unwrap(); - } - - // Use providers to query Header data and compare if it matches - { - let db_provider = factory.provider().unwrap(); - let manager = SnapshotProvider::default(); - let jar_provider = manager - .get_segment_provider(SnapshotSegment::Headers, 0, Some(snap_file.path().into())) - .unwrap(); - - assert!(!headers.is_empty()); - - // Shuffled for chaos. - headers.shuffle(&mut generators::rng()); - - for header in headers { - let header_hash = header.hash(); - let header = header.unseal(); - - // Compare Header - assert_eq!(header, db_provider.header(&header_hash).unwrap().unwrap()); - assert_eq!(header, jar_provider.header(&header_hash).unwrap().unwrap()); - - // Compare HeaderTD - assert_eq!( - db_provider.header_td(&header_hash).unwrap().unwrap(), - jar_provider.header_td(&header_hash).unwrap().unwrap() - ); - } - } - } -} diff --git a/crates/storage/provider/src/providers/snapshot/jar.rs b/crates/storage/provider/src/providers/snapshot/jar.rs new file mode 100644 index 0000000000..4dd8099cf6 --- /dev/null +++ b/crates/storage/provider/src/providers/snapshot/jar.rs @@ -0,0 +1,222 @@ +use super::LoadedJarRef; +use crate::{BlockHashReader, BlockNumReader, HeaderProvider, TransactionsProvider}; +use reth_db::{ + table::{Decompress, Table}, + HeaderTD, +}; +use reth_interfaces::{provider::ProviderError, RethResult}; +use reth_nippy_jar::NippyJarCursor; +use reth_primitives::{ + snapshot::SegmentHeader, Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header, + SealedHeader, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, + B256, U256, +}; +use std::ops::{Deref, RangeBounds}; + +/// Provider over a specific `NippyJar` and range. +#[derive(Debug)] +pub struct SnapshotJarProvider<'a>(LoadedJarRef<'a>); + +impl<'a> Deref for SnapshotJarProvider<'a> { + type Target = LoadedJarRef<'a>; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a> From> for SnapshotJarProvider<'a> { + fn from(value: LoadedJarRef<'a>) -> Self { + SnapshotJarProvider(value) + } +} + +impl<'a> SnapshotJarProvider<'a> { + /// Provides a cursor for more granular data access. + pub fn cursor<'b>(&'b self) -> RethResult> + where + 'b: 'a, + { + Ok(NippyJarCursor::with_handle(self.value(), self.mmap_handle())?) + } +} + +impl<'a> HeaderProvider for SnapshotJarProvider<'a> { + fn header(&self, block_hash: &BlockHash) -> RethResult> { + // WIP + let mut cursor = self.cursor()?; + + let header = Header::decompress( + cursor.row_by_key_with_cols::<0b01, 2>(&block_hash.0).unwrap().unwrap()[0], + ) + .unwrap(); + + if &header.hash_slow() == block_hash { + return Ok(Some(header)) + } else { + // check next snapshot + } + Ok(None) + } + + fn header_by_number(&self, num: BlockNumber) -> RethResult> { + Header::decompress( + self.cursor()? + .row_by_number_with_cols::<0b01, 2>( + (num - self.user_header().block_start()) as usize, + )? + .ok_or(ProviderError::HeaderNotFound(num.into()))?[0], + ) + .map(Some) + .map_err(Into::into) + } + + fn header_td(&self, block_hash: &BlockHash) -> RethResult> { + // WIP + let mut cursor = NippyJarCursor::with_handle(self.value(), self.mmap_handle())?; + + let row = cursor.row_by_key_with_cols::<0b11, 2>(&block_hash.0).unwrap().unwrap(); + + let header = Header::decompress(row[0]).unwrap(); + let td = ::Value::decompress(row[1]).unwrap(); + + if &header.hash_slow() == block_hash { + return Ok(Some(td.0)) + } else { + // check next snapshot + } + Ok(None) + } + + fn header_td_by_number(&self, _number: BlockNumber) -> RethResult> { + unimplemented!(); + } + + fn headers_range(&self, _range: impl RangeBounds) -> RethResult> { + unimplemented!(); + } + + fn sealed_headers_range( + &self, + _range: impl RangeBounds, + ) -> RethResult> { + unimplemented!(); + } + + fn sealed_header(&self, _number: BlockNumber) -> RethResult> { + unimplemented!(); + } +} + +impl<'a> BlockHashReader for SnapshotJarProvider<'a> { + fn block_hash(&self, _number: u64) -> RethResult> { + todo!() + } + + fn canonical_hashes_range( + &self, + _start: BlockNumber, + _end: BlockNumber, + ) -> RethResult> { + todo!() + } +} + +impl<'a> BlockNumReader for SnapshotJarProvider<'a> { + fn chain_info(&self) -> RethResult { + todo!() + } + + fn best_block_number(&self) -> RethResult { + todo!() + } + + fn last_block_number(&self) -> RethResult { + todo!() + } + + fn block_number(&self, _hash: B256) -> RethResult> { + todo!() + } +} + +impl<'a> TransactionsProvider for SnapshotJarProvider<'a> { + fn transaction_id(&self, _tx_hash: TxHash) -> RethResult> { + todo!() + } + + fn transaction_by_id(&self, num: TxNumber) -> RethResult> { + TransactionSignedNoHash::decompress( + self.cursor()? + .row_by_number_with_cols::<0b1, 1>((num - self.user_header().tx_start()) as usize)? + .ok_or(ProviderError::TransactionNotFound(num.into()))?[0], + ) + .map(Into::into) + .map(Some) + .map_err(Into::into) + } + + fn transaction_by_id_no_hash( + &self, + _id: TxNumber, + ) -> RethResult> { + todo!() + } + + fn transaction_by_hash(&self, hash: TxHash) -> RethResult> { + // WIP + let mut cursor = self.cursor()?; + + let tx = TransactionSignedNoHash::decompress( + cursor.row_by_key_with_cols::<0b1, 1>(&hash.0).unwrap().unwrap()[0], + ) + .unwrap() + .with_hash(); + + if tx.hash() == hash { + return Ok(Some(tx)) + } else { + // check next snapshot + } + Ok(None) + } + + fn transaction_by_hash_with_meta( + &self, + _hash: TxHash, + ) -> RethResult> { + todo!() + } + + fn transaction_block(&self, _id: TxNumber) -> RethResult> { + todo!() + } + + fn transactions_by_block( + &self, + _block_id: BlockHashOrNumber, + ) -> RethResult>> { + todo!() + } + + fn transactions_by_block_range( + &self, + _range: impl RangeBounds, + ) -> RethResult>> { + todo!() + } + + fn senders_by_tx_range(&self, _range: impl RangeBounds) -> RethResult> { + todo!() + } + + fn transactions_by_tx_range( + &self, + _range: impl RangeBounds, + ) -> RethResult> { + todo!() + } + + fn transaction_sender(&self, _id: TxNumber) -> RethResult> { + todo!() + } +} diff --git a/crates/storage/provider/src/providers/snapshot/manager.rs b/crates/storage/provider/src/providers/snapshot/manager.rs new file mode 100644 index 0000000000..a065f6d1db --- /dev/null +++ b/crates/storage/provider/src/providers/snapshot/manager.rs @@ -0,0 +1,176 @@ +use super::{LoadedJar, SnapshotJarProvider}; +use crate::{BlockHashReader, BlockNumReader, HeaderProvider, TransactionsProvider}; +use dashmap::DashMap; +use reth_interfaces::RethResult; +use reth_nippy_jar::NippyJar; +use reth_primitives::{ + snapshot::BLOCKS_PER_SNAPSHOT, Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, + Header, SealedHeader, SnapshotSegment, TransactionMeta, TransactionSigned, + TransactionSignedNoHash, TxHash, TxNumber, B256, U256, +}; +use std::{ops::RangeBounds, path::PathBuf}; + +/// SnapshotProvider +#[derive(Debug, Default)] +pub struct SnapshotProvider { + /// Maintains a map which allows for concurrent access to different `NippyJars`, over different + /// segments and ranges. + map: DashMap<(BlockNumber, SnapshotSegment), LoadedJar>, +} + +impl SnapshotProvider { + /// Gets the provider of the requested segment and range. + pub fn get_segment_provider( + &self, + segment: SnapshotSegment, + block: BlockNumber, + mut path: Option, + ) -> RethResult> { + // TODO this invalidates custom length snapshots. + let snapshot = block / BLOCKS_PER_SNAPSHOT; + let key = (snapshot, segment); + + if let Some(jar) = self.map.get(&key) { + return Ok(jar.into()) + } + + if let Some(path) = &path { + self.map.insert(key, LoadedJar::new(NippyJar::load(path)?)?); + } else { + path = Some(segment.filename( + &((snapshot * BLOCKS_PER_SNAPSHOT)..=((snapshot + 1) * BLOCKS_PER_SNAPSHOT - 1)), + )); + } + + self.get_segment_provider(segment, block, path) + } +} + +impl HeaderProvider for SnapshotProvider { + fn header(&self, _block_hash: &BlockHash) -> RethResult> { + todo!() + } + + fn header_by_number(&self, num: BlockNumber) -> RethResult> { + self.get_segment_provider(SnapshotSegment::Headers, num, None)?.header_by_number(num) + } + + fn header_td(&self, _block_hash: &BlockHash) -> RethResult> { + todo!() + } + + fn header_td_by_number(&self, _number: BlockNumber) -> RethResult> { + todo!(); + } + + fn headers_range(&self, _range: impl RangeBounds) -> RethResult> { + todo!(); + } + + fn sealed_headers_range( + &self, + _range: impl RangeBounds, + ) -> RethResult> { + todo!(); + } + + fn sealed_header(&self, _number: BlockNumber) -> RethResult> { + todo!(); + } +} + +impl BlockHashReader for SnapshotProvider { + fn block_hash(&self, _number: u64) -> RethResult> { + todo!() + } + + fn canonical_hashes_range( + &self, + _start: BlockNumber, + _end: BlockNumber, + ) -> RethResult> { + todo!() + } +} + +impl BlockNumReader for SnapshotProvider { + fn chain_info(&self) -> RethResult { + todo!() + } + + fn best_block_number(&self) -> RethResult { + todo!() + } + + fn last_block_number(&self) -> RethResult { + todo!() + } + + fn block_number(&self, _hash: B256) -> RethResult> { + todo!() + } +} + +impl TransactionsProvider for SnapshotProvider { + fn transaction_id(&self, _tx_hash: TxHash) -> RethResult> { + todo!() + } + + fn transaction_by_id(&self, num: TxNumber) -> RethResult> { + // TODO `num` is provided after checking the index + let block_num = num; + self.get_segment_provider(SnapshotSegment::Transactions, block_num, None)? + .transaction_by_id(num) + } + + fn transaction_by_id_no_hash( + &self, + _id: TxNumber, + ) -> RethResult> { + todo!() + } + + fn transaction_by_hash(&self, _hash: TxHash) -> RethResult> { + todo!() + } + + fn transaction_by_hash_with_meta( + &self, + _hash: TxHash, + ) -> RethResult> { + todo!() + } + + fn transaction_block(&self, _id: TxNumber) -> RethResult> { + todo!() + } + + fn transactions_by_block( + &self, + _block_id: BlockHashOrNumber, + ) -> RethResult>> { + todo!() + } + + fn transactions_by_block_range( + &self, + _range: impl RangeBounds, + ) -> RethResult>> { + todo!() + } + + fn senders_by_tx_range(&self, _range: impl RangeBounds) -> RethResult> { + todo!() + } + + fn transactions_by_tx_range( + &self, + _range: impl RangeBounds, + ) -> RethResult> { + todo!() + } + + fn transaction_sender(&self, _id: TxNumber) -> RethResult> { + todo!() + } +} diff --git a/crates/storage/provider/src/providers/snapshot/mod.rs b/crates/storage/provider/src/providers/snapshot/mod.rs new file mode 100644 index 0000000000..2f0dad3c58 --- /dev/null +++ b/crates/storage/provider/src/providers/snapshot/mod.rs @@ -0,0 +1,162 @@ +mod manager; +pub use manager::SnapshotProvider; + +mod jar; +pub use jar::SnapshotJarProvider; + +use reth_interfaces::RethResult; +use reth_nippy_jar::NippyJar; +use reth_primitives::{snapshot::SegmentHeader, SnapshotSegment}; +use std::ops::Deref; + +/// Alias type for each specific `NippyJar`. +type LoadedJarRef<'a> = dashmap::mapref::one::Ref<'a, (u64, SnapshotSegment), LoadedJar>; + +/// Helper type to reuse an associated snapshot mmap handle on created cursors. +#[derive(Debug)] +pub struct LoadedJar { + jar: NippyJar, + mmap_handle: reth_nippy_jar::MmapHandle, +} + +impl LoadedJar { + fn new(jar: NippyJar) -> RethResult { + let mmap_handle = jar.open_data()?; + Ok(Self { jar, mmap_handle }) + } + + /// Returns a clone of the mmap handle that can be used to instantiate a cursor. + fn mmap_handle(&self) -> reth_nippy_jar::MmapHandle { + self.mmap_handle.clone() + } +} + +impl Deref for LoadedJar { + type Target = NippyJar; + fn deref(&self) -> &Self::Target { + &self.jar + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::{HeaderProvider, ProviderFactory}; + use rand::{self, seq::SliceRandom}; + use reth_db::{ + cursor::DbCursorRO, + database::Database, + snapshot::create_snapshot_T1_T2, + test_utils::create_test_rw_db, + transaction::{DbTx, DbTxMut}, + CanonicalHeaders, DatabaseError, HeaderNumbers, HeaderTD, Headers, RawTable, + }; + use reth_interfaces::test_utils::generators::{self, random_header_range}; + use reth_nippy_jar::NippyJar; + use reth_primitives::{BlockNumber, B256, MAINNET, U256}; + + #[test] + fn test_snap() { + // Ranges + let row_count = 100u64; + let range = 0..=(row_count - 1); + let segment_header = SegmentHeader::new(range.clone(), range.clone()); + + // Data sources + let db = create_test_rw_db(); + let factory = ProviderFactory::new(&db, MAINNET.clone()); + let snap_file = tempfile::NamedTempFile::new().unwrap(); + + // Setup data + let mut headers = random_header_range( + &mut generators::rng(), + *range.start()..(*range.end() + 1), + B256::random(), + ); + + db.update(|tx| -> Result<(), DatabaseError> { + let mut td = U256::ZERO; + for header in headers.clone() { + td += header.header.difficulty; + let hash = header.hash(); + + tx.put::(header.number, hash)?; + tx.put::(header.number, header.clone().unseal())?; + tx.put::(header.number, td.into())?; + tx.put::(hash, header.number)?; + } + Ok(()) + }) + .unwrap() + .unwrap(); + + // Create Snapshot + { + let with_compression = true; + let with_filter = true; + + let mut nippy_jar = NippyJar::new(2, snap_file.path(), segment_header); + + if with_compression { + nippy_jar = nippy_jar.with_zstd(false, 0); + } + + if with_filter { + nippy_jar = nippy_jar.with_cuckoo_filter(row_count as usize + 10).with_fmph(); + } + + let tx = db.tx().unwrap(); + + // Hacky type inference. TODO fix + let mut none_vec = Some(vec![vec![vec![0u8]].into_iter()]); + let _ = none_vec.take(); + + // Generate list of hashes for filters & PHF + let mut cursor = tx.cursor_read::>().unwrap(); + let hashes = cursor + .walk(None) + .unwrap() + .map(|row| row.map(|(_key, value)| value.into_value()).map_err(|e| e.into())); + + create_snapshot_T1_T2::( + &tx, + range, + None, + none_vec, + Some(hashes), + row_count as usize, + &mut nippy_jar, + ) + .unwrap(); + } + + // Use providers to query Header data and compare if it matches + { + let db_provider = factory.provider().unwrap(); + let manager = SnapshotProvider::default(); + let jar_provider = manager + .get_segment_provider(SnapshotSegment::Headers, 0, Some(snap_file.path().into())) + .unwrap(); + + assert!(!headers.is_empty()); + + // Shuffled for chaos. + headers.shuffle(&mut generators::rng()); + + for header in headers { + let header_hash = header.hash(); + let header = header.unseal(); + + // Compare Header + assert_eq!(header, db_provider.header(&header_hash).unwrap().unwrap()); + assert_eq!(header, jar_provider.header(&header_hash).unwrap().unwrap()); + + // Compare HeaderTD + assert_eq!( + db_provider.header_td(&header_hash).unwrap().unwrap(), + jar_provider.header_td(&header_hash).unwrap().unwrap() + ); + } + } + } +}