feat: add directory watcher to StaticFileProvider::read_only (#10701)

This commit is contained in:
joshieDo
2024-09-05 17:32:37 +01:00
committed by GitHub
parent 1ec5678081
commit e8128a3c85
9 changed files with 187 additions and 8 deletions

View File

@@ -87,7 +87,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> EnvironmentArgs<C> {
),
AccessRights::RO => (
Arc::new(open_db_read_only(&db_path, self.db.database_args())?),
StaticFileProvider::read_only(sf_path)?,
StaticFileProvider::read_only(sf_path, false)?,
),
};

View File

@@ -173,7 +173,7 @@ impl Command {
}
let static_files = iter_static_files(data_dir.static_files())?;
let static_file_provider = StaticFileProvider::read_only(data_dir.static_files())?;
let static_file_provider = StaticFileProvider::read_only(data_dir.static_files(), false)?;
let mut total_data_size = 0;
let mut total_index_size = 0;

View File

@@ -59,7 +59,7 @@ const NIPPY_JAR_VERSION: usize = 1;
const INDEX_FILE_EXTENSION: &str = "idx";
const OFFSETS_FILE_EXTENSION: &str = "off";
const CONFIG_FILE_EXTENSION: &str = "conf";
pub const CONFIG_FILE_EXTENSION: &str = "conf";
/// A [`RefRow`] is a list of column value slices pointing to either an internal buffer or a
/// memory-mapped file.

View File

@@ -51,6 +51,7 @@ metrics.workspace = true
# misc
auto_impl.workspace = true
itertools.workspace = true
notify = { workspace = true, default-features = false, features = ["macos_fsevent"] }
parking_lot.workspace = true
dashmap = { workspace = true, features = ["inline"] }
strum.workspace = true

View File

@@ -9,6 +9,7 @@ use crate::{
TransactionVariant, TransactionsProvider, TransactionsProviderExt, WithdrawalsProvider,
};
use dashmap::DashMap;
use notify::{RecommendedWatcher, RecursiveMode, Watcher};
use parking_lot::RwLock;
use reth_chainspec::ChainInfo;
use reth_db::{
@@ -22,7 +23,7 @@ use reth_db_api::{
table::Table,
transaction::DbTx,
};
use reth_nippy_jar::{NippyJar, NippyJarChecker};
use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
use reth_primitives::{
keccak256,
static_file::{find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive},
@@ -82,14 +83,105 @@ impl StaticFileProvider {
}
/// Creates a new [`StaticFileProvider`] with read-only access.
pub fn read_only(path: impl AsRef<Path>) -> ProviderResult<Self> {
Self::new(path, StaticFileAccess::RO)
///
/// Set `watch_directory` to `true` to track the most recent changes in static files. Otherwise,
/// new data won't be detected or queryable.
pub fn read_only(path: impl AsRef<Path>, watch_directory: bool) -> ProviderResult<Self> {
let provider = Self::new(path, StaticFileAccess::RO)?;
if watch_directory {
provider.watch_directory();
}
Ok(provider)
}
/// Creates a new [`StaticFileProvider`] with read-write access.
pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
Self::new(path, StaticFileAccess::RW)
}
/// Watches the directory for changes and updates the in-memory index when modifications
/// are detected.
///
/// This may be necessary, since a non-node process that owns a [`StaticFileProvider`] does not
/// receive `update_index` notifications from a node that appends/truncates data.
pub fn watch_directory(&self) {
let provider = self.clone();
std::thread::spawn(move || {
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = RecommendedWatcher::new(
move |res| tx.send(res).unwrap(),
notify::Config::default(),
)
.expect("failed to create watcher");
watcher
.watch(&provider.path, RecursiveMode::NonRecursive)
.expect("failed to watch path");
// Some backends send repeated modified events
let mut last_event_timestamp = None;
while let Ok(res) = rx.recv() {
match res {
Ok(event) => {
// We only care about modified data events
if !matches!(
event.kind,
notify::EventKind::Modify(notify::event::ModifyKind::Data(_))
) {
continue
}
// We only trigger a re-initialization if a configuration file was
// modified. This means that a
// static_file_provider.commit() was called on the node after
// appending/truncating rows
for segment in event.paths {
// Ensure it's a file with the .conf extension
if !segment
.extension()
.is_some_and(|s| s.to_str() == Some(CONFIG_FILE_EXTENSION))
{
continue
}
// Ensure it's well formatted static file name
if StaticFileSegment::parse_filename(
&segment.file_stem().expect("qed").to_string_lossy(),
)
.is_none()
{
continue
}
// If we can read the metadata and modified timestamp, ensure this is
// not an old or repeated event.
if let Ok(current_modified_timestamp) =
std::fs::metadata(&segment).and_then(|m| m.modified())
{
if last_event_timestamp.is_some_and(|last_timestamp| {
last_timestamp >= current_modified_timestamp
}) {
continue
}
last_event_timestamp = Some(current_modified_timestamp);
}
info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index");
if let Err(err) = provider.initialize_index() {
warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
}
break
}
}
Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"),
}
}
});
}
}
impl Deref for StaticFileProvider {
@@ -454,6 +546,7 @@ impl StaticFileProvider {
let mut max_block = self.static_files_max_block.write();
let mut tx_index = self.static_files_tx_index.write();
max_block.clear();
tx_index.clear();
for (segment, ranges) in
@@ -481,6 +574,9 @@ impl StaticFileProvider {
}
}
// If this is a re-initialization, we need to clear this as well
self.map.clear();
Ok(())
}