From b5d61d80eb499b52274ff373e38f85fd9d5600d1 Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Wed, 3 Jul 2024 07:18:42 -0400 Subject: [PATCH] feat: add pruner to persistence task (#9251) Co-authored-by: Matthias Seitz Co-authored-by: Federico Gimenez --- crates/engine/tree/src/persistence.rs | 47 +++++++++++++++++---------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index 96268ae64a..2038234393 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -8,7 +8,8 @@ use reth_provider::{ bundle_state::HashedStateChanges, BlockWriter, HistoryWriter, OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateWriter, }; -use std::sync::mpsc::{Receiver, Sender}; +use reth_prune::{PruneProgress, Pruner}; +use std::sync::mpsc::{Receiver, SendError, Sender}; use tokio::sync::oneshot; use tracing::debug; @@ -29,12 +30,18 @@ pub struct Persistence { provider: ProviderFactory, /// Incoming requests to persist stuff incoming: Receiver, + /// The pruner + pruner: Pruner, } impl Persistence { /// Create a new persistence task - const fn new(provider: ProviderFactory, incoming: Receiver) -> Self { - Self { provider, incoming } + const fn new( + provider: ProviderFactory, + incoming: Receiver, + pruner: Pruner, + ) -> Self { + Self { provider, incoming, pruner } } /// Writes the cloned tree state to the database @@ -101,8 +108,9 @@ impl Persistence { /// Prunes block data before the given block hash according to the configured prune /// configuration. - fn prune_before(&self, _block_hash: B256) { - todo!("implement this") + fn prune_before(&mut self, block_num: u64) -> PruneProgress { + // TODO: doing this properly depends on pruner segment changes + self.pruner.run(block_num).expect("todo: handle errors") } /// Removes static file related data from the database, depending on the current block height in @@ -117,9 +125,9 @@ where DB: Database + 'static, { /// Create a new persistence task, spawning it, and returning a [`PersistenceHandle`]. - fn spawn_new(provider: ProviderFactory) -> PersistenceHandle { + fn spawn_new(provider: ProviderFactory, pruner: Pruner) -> PersistenceHandle { let (tx, rx) = std::sync::mpsc::channel(); - let task = Self::new(provider, rx); + let task = Self::new(provider, rx, pruner); std::thread::Builder::new() .name("Persistence Task".to_string()) .spawn(|| task.run()) @@ -135,7 +143,7 @@ where { /// This is the main loop, that will listen to persistence events and perform the requested /// database actions - fn run(self) { + fn run(mut self) { // If the receiver errors then senders have disconnected, so the loop should then end. while let Ok(action) = self.incoming.recv() { match action { @@ -155,11 +163,11 @@ where // we ignore the error because the caller may or may not care about the result let _ = sender.send(last_block_hash); } - PersistenceAction::PruneBefore((block_hash, sender)) => { - self.prune_before(block_hash); + PersistenceAction::PruneBefore((block_num, sender)) => { + let res = self.prune_before(block_num); // we ignore the error because the caller may or may not care about the result - let _ = sender.send(()); + let _ = sender.send(res); } PersistenceAction::CleanStaticFileDuplicates(sender) => { self.clean_static_file_duplicates(); @@ -182,9 +190,9 @@ pub enum PersistenceAction { /// Removes the blocks above the given block number from the database. RemoveBlocksAbove((u64, oneshot::Sender>)), - /// Prune associated block data before the given hash, according to already-configured prune - /// modes. - PruneBefore((B256, oneshot::Sender<()>)), + /// Prune associated block data before the given block number, according to already-configured + /// prune modes. + PruneBefore((u64, oneshot::Sender)), /// Trigger a read of static file data, and delete data depending on the highest block in each /// static file segment. @@ -206,8 +214,11 @@ impl PersistenceHandle { /// Sends a specific [`PersistenceAction`] in the contained channel. The caller is responsible /// for creating any channels for the given action. - pub fn send_action(&self, action: PersistenceAction) { - self.sender.send(action).expect("should be able to send"); + pub fn send_action( + &self, + action: PersistenceAction, + ) -> Result<(), SendError> { + self.sender.send(action) } /// Tells the persistence task to save a certain list of finalized blocks. The blocks are @@ -235,10 +246,10 @@ impl PersistenceHandle { /// Tells the persistence task to remove block data before the given hash, according to the /// configured prune config. - pub async fn prune_before(&self, block_hash: B256) { + pub async fn prune_before(&self, block_num: u64) -> PruneProgress { let (tx, rx) = oneshot::channel(); self.sender - .send(PersistenceAction::PruneBefore((block_hash, tx))) + .send(PersistenceAction::PruneBefore((block_num, tx))) .expect("should be able to send"); rx.await.expect("todo: err handling") }