feat: add pruner to persistence task (#9251)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Federico Gimenez <fgimenez@users.noreply.github.com>
This commit is contained in:
Dan Cline
2024-07-03 07:18:42 -04:00
committed by GitHub
parent 8e5204c119
commit b5d61d80eb

View File

@@ -8,7 +8,8 @@ use reth_provider::{
bundle_state::HashedStateChanges, BlockWriter, HistoryWriter, OriginalValuesKnown,
ProviderFactory, StageCheckpointWriter, StateWriter,
};
use std::sync::mpsc::{Receiver, Sender};
use reth_prune::{PruneProgress, Pruner};
use std::sync::mpsc::{Receiver, SendError, Sender};
use tokio::sync::oneshot;
use tracing::debug;
@@ -29,12 +30,18 @@ pub struct Persistence<DB> {
provider: ProviderFactory<DB>,
/// Incoming requests to persist stuff
incoming: Receiver<PersistenceAction>,
/// The pruner
pruner: Pruner<DB>,
}
impl<DB: Database> Persistence<DB> {
/// Create a new persistence task
const fn new(provider: ProviderFactory<DB>, incoming: Receiver<PersistenceAction>) -> Self {
Self { provider, incoming }
const fn new(
provider: ProviderFactory<DB>,
incoming: Receiver<PersistenceAction>,
pruner: Pruner<DB>,
) -> Self {
Self { provider, incoming, pruner }
}
/// Writes the cloned tree state to the database
@@ -101,8 +108,9 @@ impl<DB: Database> Persistence<DB> {
/// Prunes block data before the given block hash according to the configured prune
/// configuration.
fn prune_before(&self, _block_hash: B256) {
todo!("implement this")
fn prune_before(&mut self, block_num: u64) -> PruneProgress {
// TODO: doing this properly depends on pruner segment changes
self.pruner.run(block_num).expect("todo: handle errors")
}
/// Removes static file related data from the database, depending on the current block height in
@@ -117,9 +125,9 @@ where
DB: Database + 'static,
{
/// Create a new persistence task, spawning it, and returning a [`PersistenceHandle`].
fn spawn_new(provider: ProviderFactory<DB>) -> PersistenceHandle {
fn spawn_new(provider: ProviderFactory<DB>, pruner: Pruner<DB>) -> PersistenceHandle {
let (tx, rx) = std::sync::mpsc::channel();
let task = Self::new(provider, rx);
let task = Self::new(provider, rx, pruner);
std::thread::Builder::new()
.name("Persistence Task".to_string())
.spawn(|| task.run())
@@ -135,7 +143,7 @@ where
{
/// This is the main loop, that will listen to persistence events and perform the requested
/// database actions
fn run(self) {
fn run(mut self) {
// If the receiver errors then senders have disconnected, so the loop should then end.
while let Ok(action) = self.incoming.recv() {
match action {
@@ -155,11 +163,11 @@ where
// we ignore the error because the caller may or may not care about the result
let _ = sender.send(last_block_hash);
}
PersistenceAction::PruneBefore((block_hash, sender)) => {
self.prune_before(block_hash);
PersistenceAction::PruneBefore((block_num, sender)) => {
let res = self.prune_before(block_num);
// we ignore the error because the caller may or may not care about the result
let _ = sender.send(());
let _ = sender.send(res);
}
PersistenceAction::CleanStaticFileDuplicates(sender) => {
self.clean_static_file_duplicates();
@@ -182,9 +190,9 @@ pub enum PersistenceAction {
/// Removes the blocks above the given block number from the database.
RemoveBlocksAbove((u64, oneshot::Sender<Vec<ExecutedBlock>>)),
/// Prune associated block data before the given hash, according to already-configured prune
/// modes.
PruneBefore((B256, oneshot::Sender<()>)),
/// Prune associated block data before the given block number, according to already-configured
/// prune modes.
PruneBefore((u64, oneshot::Sender<PruneProgress>)),
/// Trigger a read of static file data, and delete data depending on the highest block in each
/// static file segment.
@@ -206,8 +214,11 @@ impl PersistenceHandle {
/// Sends a specific [`PersistenceAction`] in the contained channel. The caller is responsible
/// for creating any channels for the given action.
pub fn send_action(&self, action: PersistenceAction) {
self.sender.send(action).expect("should be able to send");
pub fn send_action(
&self,
action: PersistenceAction,
) -> Result<(), SendError<PersistenceAction>> {
self.sender.send(action)
}
/// Tells the persistence task to save a certain list of finalized blocks. The blocks are
@@ -235,10 +246,10 @@ impl PersistenceHandle {
/// Tells the persistence task to remove block data before the given hash, according to the
/// configured prune config.
pub async fn prune_before(&self, block_hash: B256) {
pub async fn prune_before(&self, block_num: u64) -> PruneProgress {
let (tx, rx) = oneshot::channel();
self.sender
.send(PersistenceAction::PruneBefore((block_hash, tx)))
.send(PersistenceAction::PruneBefore((block_num, tx)))
.expect("should be able to send");
rx.await.expect("todo: err handling")
}