mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
feat(era): Delete files outside the range before counting them (#16805)
This commit is contained in:
@@ -133,6 +133,27 @@ impl<Http: HttpClient + Clone> EraClient<Http> {
|
||||
max
|
||||
}
|
||||
|
||||
/// Deletes files that are outside-of the working range.
|
||||
pub async fn delete_outside_range(&self, index: usize, max_files: usize) -> eyre::Result<()> {
|
||||
let last = index + max_files;
|
||||
|
||||
if let Ok(mut dir) = fs::read_dir(&self.folder).await {
|
||||
while let Ok(Some(entry)) = dir.next_entry().await {
|
||||
if let Some(name) = entry.file_name().to_str() {
|
||||
if let Some(number) = self.file_name_to_number(name) {
|
||||
if number < index || number >= last {
|
||||
eprintln!("Deleting kokot {}", entry.path().display());
|
||||
eprintln!("{number} < {index} || {number} > {last}");
|
||||
reth_fs_util::remove_file(entry.path())?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns a download URL for the file corresponding to `number`.
|
||||
pub async fn url(&self, number: usize) -> eyre::Result<Option<Url>> {
|
||||
Ok(self.number_to_file_name(number).await?.map(|name| self.url.join(&name)).transpose()?)
|
||||
|
||||
@@ -90,6 +90,7 @@ impl<Http> EraStream<Http> {
|
||||
client,
|
||||
files_count: Box::pin(async move { usize::MAX }),
|
||||
next_url: Box::pin(async move { Ok(None) }),
|
||||
delete_outside_range: Box::pin(async move { Ok(()) }),
|
||||
recover_index: Box::pin(async move { None }),
|
||||
fetch_file_list: Box::pin(async move { Ok(()) }),
|
||||
state: Default::default(),
|
||||
@@ -221,6 +222,7 @@ struct StartingStream<Http> {
|
||||
client: EraClient<Http>,
|
||||
files_count: Pin<Box<dyn Future<Output = usize> + Send + Sync + 'static>>,
|
||||
next_url: Pin<Box<dyn Future<Output = eyre::Result<Option<Url>>> + Send + Sync + 'static>>,
|
||||
delete_outside_range: Pin<Box<dyn Future<Output = eyre::Result<()>> + Send + Sync + 'static>>,
|
||||
recover_index: Pin<Box<dyn Future<Output = Option<usize>> + Send + Sync + 'static>>,
|
||||
fetch_file_list: Pin<Box<dyn Future<Output = eyre::Result<()>> + Send + Sync + 'static>>,
|
||||
state: State,
|
||||
@@ -245,6 +247,7 @@ enum State {
|
||||
#[default]
|
||||
Initial,
|
||||
FetchFileList,
|
||||
DeleteOutsideRange,
|
||||
RecoverIndex,
|
||||
CountFiles,
|
||||
Missing(usize),
|
||||
@@ -262,11 +265,24 @@ impl<Http: HttpClient + Clone + Send + Sync + 'static + Unpin> Stream for Starti
|
||||
if self.state == State::FetchFileList {
|
||||
if let Poll::Ready(result) = self.fetch_file_list.poll_unpin(cx) {
|
||||
match result {
|
||||
Ok(_) => self.recover_index(),
|
||||
Ok(_) => self.delete_outside_range(),
|
||||
Err(e) => {
|
||||
self.fetch_file_list();
|
||||
|
||||
return Poll::Ready(Some(Box::pin(async move { Err(e) })))
|
||||
return Poll::Ready(Some(Box::pin(async move { Err(e) })));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if self.state == State::DeleteOutsideRange {
|
||||
if let Poll::Ready(result) = self.delete_outside_range.poll_unpin(cx) {
|
||||
match result {
|
||||
Ok(_) => self.recover_index(),
|
||||
Err(e) => {
|
||||
self.delete_outside_range();
|
||||
|
||||
return Poll::Ready(Some(Box::pin(async move { Err(e) })));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -334,6 +350,17 @@ impl<Http: HttpClient + Clone + Send + Sync + 'static> StartingStream<Http> {
|
||||
self.state = State::FetchFileList;
|
||||
}
|
||||
|
||||
fn delete_outside_range(&mut self) {
|
||||
let index = self.index;
|
||||
let max_files = self.max_files;
|
||||
let client = self.client.clone();
|
||||
|
||||
Pin::new(&mut self.delete_outside_range)
|
||||
.set(Box::pin(async move { client.delete_outside_range(index, max_files).await }));
|
||||
|
||||
self.state = State::DeleteOutsideRange;
|
||||
}
|
||||
|
||||
fn recover_index(&mut self) {
|
||||
let client = self.client.clone();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user