mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-04 20:15:03 -05:00
fix(exex): set next file ID when creating WAL storage (#11372)
This commit is contained in:
@@ -14,12 +14,12 @@ use reth_exex_types::ExExNotification;
|
||||
#[derive(Debug, Default)]
|
||||
pub struct BlockCache {
|
||||
/// A min heap of `(Block Number, File ID)` tuples.
|
||||
pub(super) blocks: BinaryHeap<Reverse<(BlockNumber, u64)>>,
|
||||
pub(super) blocks: BinaryHeap<Reverse<(BlockNumber, u32)>>,
|
||||
/// A mapping of committed blocks `Block Hash -> Block`.
|
||||
///
|
||||
/// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per
|
||||
/// block.
|
||||
pub(super) committed_blocks: FbHashMap<32, (u64, CachedBlock)>,
|
||||
pub(super) committed_blocks: FbHashMap<32, (u32, CachedBlock)>,
|
||||
}
|
||||
|
||||
impl BlockCache {
|
||||
@@ -34,7 +34,7 @@ impl BlockCache {
|
||||
/// # Returns
|
||||
///
|
||||
/// A set of file IDs that were removed.
|
||||
pub(super) fn remove_before(&mut self, block_number: BlockNumber) -> HashSet<u64> {
|
||||
pub(super) fn remove_before(&mut self, block_number: BlockNumber) -> HashSet<u32> {
|
||||
let mut file_ids = HashSet::default();
|
||||
|
||||
while let Some(block @ Reverse((max_block, file_id))) = self.blocks.peek().copied() {
|
||||
@@ -54,14 +54,14 @@ impl BlockCache {
|
||||
|
||||
/// Returns the file ID for the notification containing the given committed block hash, if it
|
||||
/// exists.
|
||||
pub(super) fn get_file_id_by_committed_block_hash(&self, block_hash: &B256) -> Option<u64> {
|
||||
pub(super) fn get_file_id_by_committed_block_hash(&self, block_hash: &B256) -> Option<u32> {
|
||||
self.committed_blocks.get(block_hash).map(|entry| entry.0)
|
||||
}
|
||||
|
||||
/// Inserts the blocks from the notification into the cache with the given file ID.
|
||||
pub(super) fn insert_notification_blocks_with_file_id(
|
||||
&mut self,
|
||||
file_id: u64,
|
||||
file_id: u32,
|
||||
notification: &ExExNotification,
|
||||
) {
|
||||
let reverted_chain = notification.reverted_chain();
|
||||
@@ -85,12 +85,12 @@ impl BlockCache {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(super) fn blocks_sorted(&self) -> Vec<(BlockNumber, u64)> {
|
||||
pub(super) fn blocks_sorted(&self) -> Vec<(BlockNumber, u32)> {
|
||||
self.blocks.clone().into_sorted_vec().into_iter().map(|entry| entry.0).collect()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(super) fn committed_blocks_sorted(&self) -> Vec<(B256, u64, CachedBlock)> {
|
||||
pub(super) fn committed_blocks_sorted(&self) -> Vec<(B256, u32, CachedBlock)> {
|
||||
use itertools::Itertools;
|
||||
|
||||
self.committed_blocks
|
||||
|
||||
@@ -9,7 +9,7 @@ pub use storage::Storage;
|
||||
use std::{
|
||||
path::Path,
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
atomic::{AtomicU32, Ordering},
|
||||
Arc,
|
||||
},
|
||||
};
|
||||
@@ -69,7 +69,7 @@ impl Wal {
|
||||
/// Inner type for the WAL.
|
||||
#[derive(Debug)]
|
||||
struct WalInner {
|
||||
next_file_id: AtomicUsize,
|
||||
next_file_id: AtomicU32,
|
||||
/// The underlying WAL storage backed by a file.
|
||||
storage: Storage,
|
||||
/// WAL block cache. See [`cache::BlockCache`] docs for more details.
|
||||
@@ -79,7 +79,7 @@ struct WalInner {
|
||||
impl WalInner {
|
||||
fn new(directory: impl AsRef<Path>) -> eyre::Result<Self> {
|
||||
let mut wal = Self {
|
||||
next_file_id: AtomicUsize::new(0),
|
||||
next_file_id: AtomicU32::new(0),
|
||||
storage: Storage::new(directory)?,
|
||||
block_cache: RwLock::new(BlockCache::default()),
|
||||
};
|
||||
@@ -95,6 +95,7 @@ impl WalInner {
|
||||
#[instrument(target = "exex::wal", skip(self))]
|
||||
fn fill_block_cache(&mut self) -> eyre::Result<()> {
|
||||
let Some(files_range) = self.storage.files_range()? else { return Ok(()) };
|
||||
self.next_file_id.store(files_range.end() + 1, Ordering::Relaxed);
|
||||
|
||||
let mut block_cache = self.block_cache.write();
|
||||
|
||||
@@ -113,8 +114,6 @@ impl WalInner {
|
||||
);
|
||||
|
||||
block_cache.insert_notification_blocks_with_file_id(file_id, ¬ification);
|
||||
|
||||
self.next_file_id.fetch_max(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -127,7 +126,7 @@ impl WalInner {
|
||||
fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> {
|
||||
let mut block_cache = self.block_cache.write();
|
||||
|
||||
let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed) as u64;
|
||||
let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
|
||||
self.storage.write_notification(file_id, notification)?;
|
||||
|
||||
debug!(?file_id, "Inserting notification blocks into the block cache");
|
||||
@@ -211,8 +210,8 @@ mod tests {
|
||||
}
|
||||
|
||||
fn sort_committed_blocks(
|
||||
committed_blocks: Vec<(B256, u64, CachedBlock)>,
|
||||
) -> Vec<(B256, u64, CachedBlock)> {
|
||||
committed_blocks: Vec<(B256, u32, CachedBlock)>,
|
||||
) -> Vec<(B256, u32, CachedBlock)> {
|
||||
committed_blocks
|
||||
.into_iter()
|
||||
.sorted_by_key(|(_, _, block)| (block.block.number, block.block.hash))
|
||||
@@ -444,6 +443,27 @@ mod tests {
|
||||
wal.inner.block_cache().blocks_sorted(),
|
||||
[reorged_notification_cache_blocks, committed_notification_2_cache_blocks]
|
||||
);
|
||||
assert_eq!(
|
||||
wal.inner.block_cache().committed_blocks_sorted(),
|
||||
sort_committed_blocks(
|
||||
[
|
||||
committed_notification_2_cache_committed_blocks.clone(),
|
||||
reorged_notification_cache_committed_blocks.clone()
|
||||
]
|
||||
.concat()
|
||||
)
|
||||
);
|
||||
assert_eq!(
|
||||
read_notifications(&wal)?,
|
||||
vec![committed_notification_2.clone(), reorged_notification.clone()]
|
||||
);
|
||||
|
||||
// Re-open the WAL and verify that the cache population works correctly
|
||||
let wal = Wal::new(&temp_dir)?;
|
||||
assert_eq!(
|
||||
wal.inner.block_cache().blocks_sorted(),
|
||||
[reorged_notification_cache_blocks, committed_notification_2_cache_blocks]
|
||||
);
|
||||
assert_eq!(
|
||||
wal.inner.block_cache().committed_blocks_sorted(),
|
||||
sort_committed_blocks(
|
||||
|
||||
@@ -28,11 +28,11 @@ impl Storage {
|
||||
Ok(Self { path: path.as_ref().to_path_buf() })
|
||||
}
|
||||
|
||||
fn file_path(&self, id: u64) -> PathBuf {
|
||||
fn file_path(&self, id: u32) -> PathBuf {
|
||||
self.path.join(format!("{id}.wal"))
|
||||
}
|
||||
|
||||
fn parse_filename(filename: &str) -> eyre::Result<u64> {
|
||||
fn parse_filename(filename: &str) -> eyre::Result<u32> {
|
||||
filename
|
||||
.strip_suffix(".wal")
|
||||
.and_then(|s| s.parse().ok())
|
||||
@@ -41,7 +41,7 @@ impl Storage {
|
||||
|
||||
/// Removes notification for the given file ID from the storage.
|
||||
#[instrument(target = "exex::wal::storage", skip(self))]
|
||||
fn remove_notification(&self, file_id: u64) -> bool {
|
||||
fn remove_notification(&self, file_id: u32) -> bool {
|
||||
match reth_fs_util::remove_file(self.file_path(file_id)) {
|
||||
Ok(()) => {
|
||||
debug!("Notification was removed from the storage");
|
||||
@@ -57,7 +57,7 @@ impl Storage {
|
||||
/// Returns the range of file IDs in the storage.
|
||||
///
|
||||
/// If there are no files in the storage, returns `None`.
|
||||
pub(super) fn files_range(&self) -> eyre::Result<Option<RangeInclusive<u64>>> {
|
||||
pub(super) fn files_range(&self) -> eyre::Result<Option<RangeInclusive<u32>>> {
|
||||
let mut min_id = None;
|
||||
let mut max_id = None;
|
||||
|
||||
@@ -66,8 +66,8 @@ impl Storage {
|
||||
let file_name = entry.file_name();
|
||||
let file_id = Self::parse_filename(&file_name.to_string_lossy())?;
|
||||
|
||||
min_id = min_id.map_or(Some(file_id), |min_id: u64| Some(min_id.min(file_id)));
|
||||
max_id = max_id.map_or(Some(file_id), |max_id: u64| Some(max_id.max(file_id)));
|
||||
min_id = min_id.map_or(Some(file_id), |min_id: u32| Some(min_id.min(file_id)));
|
||||
max_id = max_id.map_or(Some(file_id), |max_id: u32| Some(max_id.max(file_id)));
|
||||
}
|
||||
|
||||
Ok(min_id.zip(max_id).map(|(min_id, max_id)| min_id..=max_id))
|
||||
@@ -80,7 +80,7 @@ impl Storage {
|
||||
/// Number of removed notifications.
|
||||
pub(super) fn remove_notifications(
|
||||
&self,
|
||||
file_ids: impl IntoIterator<Item = u64>,
|
||||
file_ids: impl IntoIterator<Item = u32>,
|
||||
) -> eyre::Result<usize> {
|
||||
let mut deleted = 0;
|
||||
|
||||
@@ -95,8 +95,8 @@ impl Storage {
|
||||
|
||||
pub(super) fn iter_notifications(
|
||||
&self,
|
||||
range: RangeInclusive<u64>,
|
||||
) -> impl Iterator<Item = eyre::Result<(u64, ExExNotification)>> + '_ {
|
||||
range: RangeInclusive<u32>,
|
||||
) -> impl Iterator<Item = eyre::Result<(u32, ExExNotification)>> + '_ {
|
||||
range.map(move |id| {
|
||||
let notification = self.read_notification(id)?.ok_or_eyre("notification not found")?;
|
||||
|
||||
@@ -106,7 +106,7 @@ impl Storage {
|
||||
|
||||
/// Reads the notification from the file with the given ID.
|
||||
#[instrument(target = "exex::wal::storage", skip(self))]
|
||||
pub(super) fn read_notification(&self, file_id: u64) -> eyre::Result<Option<ExExNotification>> {
|
||||
pub(super) fn read_notification(&self, file_id: u32) -> eyre::Result<Option<ExExNotification>> {
|
||||
let file_path = self.file_path(file_id);
|
||||
debug!(?file_path, "Reading notification from WAL");
|
||||
|
||||
@@ -127,7 +127,7 @@ impl Storage {
|
||||
#[instrument(target = "exex::wal::storage", skip(self, notification))]
|
||||
pub(super) fn write_notification(
|
||||
&self,
|
||||
file_id: u64,
|
||||
file_id: u32,
|
||||
notification: &ExExNotification,
|
||||
) -> eyre::Result<()> {
|
||||
let file_path = self.file_path(file_id);
|
||||
|
||||
Reference in New Issue
Block a user