diff --git a/Cargo.lock b/Cargo.lock index 2c1f4eea4..0523a1100 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3044,6 +3044,18 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" +[[package]] +name = "filetime" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35c0522e981e68cbfa8c3f978441a5f34b30b96e146b33cd3359176b50fe8586" +dependencies = [ + "cfg-if", + "libc", + "libredox", + "windows-sys 0.59.0", +] + [[package]] name = "findshlibs" version = "0.10.2" @@ -3099,6 +3111,15 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "funty" version = "2.0.0" @@ -3950,6 +3971,26 @@ dependencies = [ "str_stack", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags 1.3.2", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "inout" version = "0.1.3" @@ -4343,6 +4384,26 @@ version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4933f3f57a8e9d9da04db23fb153356ecaf00cbd14aee46279c33dc80925c37" +[[package]] +name = "kqueue" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -4418,6 +4479,7 @@ checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ "bitflags 2.6.0", "libc", + "redox_syscall 0.5.3", ] [[package]] @@ -4849,6 +4911,24 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "notify" +version = "6.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" +dependencies = [ + "bitflags 2.6.0", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio 0.8.11", + "walkdir", + "windows-sys 0.48.0", +] + [[package]] name = "ntapi" version = "0.4.1" @@ -8023,6 +8103,7 @@ dependencies = [ "eyre", "itertools 0.13.0", "metrics", + "notify", "once_cell", "parking_lot 0.12.3", "rand 0.8.5", diff --git a/Cargo.toml b/Cargo.toml index ed4547899..b102ded79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -478,6 +478,7 @@ humantime-serde = "1.1" itertools = "0.13" linked_hash_set = "0.1" modular-bitfield = "0.11.2" +notify = { version = "6.1.1", default-features = false, features = ["macos_fsevent"] } nybbles = "0.2.1" once_cell = "1.19" parking_lot = "0.12" diff --git a/crates/cli/commands/src/common.rs b/crates/cli/commands/src/common.rs index 9470f2cfb..a2feb822a 100644 --- a/crates/cli/commands/src/common.rs +++ b/crates/cli/commands/src/common.rs @@ -87,7 +87,7 @@ impl> EnvironmentArgs { ), AccessRights::RO => ( Arc::new(open_db_read_only(&db_path, self.db.database_args())?), - StaticFileProvider::read_only(sf_path)?, + StaticFileProvider::read_only(sf_path, false)?, 
), }; diff --git a/crates/cli/commands/src/db/stats.rs b/crates/cli/commands/src/db/stats.rs index 1db42b87e..a98dc23e7 100644 --- a/crates/cli/commands/src/db/stats.rs +++ b/crates/cli/commands/src/db/stats.rs @@ -173,7 +173,7 @@ impl Command { } let static_files = iter_static_files(data_dir.static_files())?; - let static_file_provider = StaticFileProvider::read_only(data_dir.static_files())?; + let static_file_provider = StaticFileProvider::read_only(data_dir.static_files(), false)?; let mut total_data_size = 0; let mut total_index_size = 0; diff --git a/crates/storage/nippy-jar/src/lib.rs b/crates/storage/nippy-jar/src/lib.rs index 20c06c5a8..a720192d6 100644 --- a/crates/storage/nippy-jar/src/lib.rs +++ b/crates/storage/nippy-jar/src/lib.rs @@ -59,7 +59,7 @@ const NIPPY_JAR_VERSION: usize = 1; const INDEX_FILE_EXTENSION: &str = "idx"; const OFFSETS_FILE_EXTENSION: &str = "off"; -const CONFIG_FILE_EXTENSION: &str = "conf"; +pub const CONFIG_FILE_EXTENSION: &str = "conf"; /// A [`RefRow`] is a list of column value slices pointing to either an internal buffer or a /// memory-mapped file. 
diff --git a/crates/storage/provider/Cargo.toml b/crates/storage/provider/Cargo.toml index 57f01fd51..75d05e477 100644 --- a/crates/storage/provider/Cargo.toml +++ b/crates/storage/provider/Cargo.toml @@ -51,6 +51,7 @@ metrics.workspace = true # misc auto_impl.workspace = true itertools.workspace = true +notify = { workspace = true, default-features = false, features = ["macos_fsevent"] } parking_lot.workspace = true dashmap = { workspace = true, features = ["inline"] } strum.workspace = true diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index 93a0f3523..bca34e4d2 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -9,6 +9,7 @@ use crate::{ TransactionVariant, TransactionsProvider, TransactionsProviderExt, WithdrawalsProvider, }; use dashmap::DashMap; +use notify::{RecommendedWatcher, RecursiveMode, Watcher}; use parking_lot::RwLock; use reth_chainspec::ChainInfo; use reth_db::{ @@ -22,7 +23,7 @@ use reth_db_api::{ table::Table, transaction::DbTx, }; -use reth_nippy_jar::{NippyJar, NippyJarChecker}; +use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION}; use reth_primitives::{ keccak256, static_file::{find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive}, @@ -82,14 +83,105 @@ impl StaticFileProvider { } /// Creates a new [`StaticFileProvider`] with read-only access. - pub fn read_only(path: impl AsRef) -> ProviderResult { - Self::new(path, StaticFileAccess::RO) + /// + /// Set `watch_directory` to `true` to track the most recent changes in static files. Otherwise, + /// new data won't be detected or queryable. 
+ pub fn read_only(path: impl AsRef, watch_directory: bool) -> ProviderResult { + let provider = Self::new(path, StaticFileAccess::RO)?; + + if watch_directory { + provider.watch_directory(); + } + + Ok(provider) } /// Creates a new [`StaticFileProvider`] with read-write access. pub fn read_write(path: impl AsRef) -> ProviderResult { Self::new(path, StaticFileAccess::RW) } + + /// Watches the directory for changes and updates the in-memory index when modifications + /// are detected. + /// + /// This may be necessary, since a non-node process that owns a [`StaticFileProvider`] does not + /// receive `update_index` notifications from a node that appends/truncates data. + /// + /// The watcher runs on a spawned thread whose handle is discarded; that thread owns a clone of + /// this provider and keeps looping for as long as `rx.recv()` succeeds. + /// + /// # Panics + /// + /// The spawned thread, not the caller, panics if the watcher cannot be created or the directory + /// cannot be watched. + pub fn watch_directory(&self) { + let provider = self.clone(); + std::thread::spawn(move || { + let (tx, rx) = std::sync::mpsc::channel(); + let mut watcher = RecommendedWatcher::new( + // NOTE(review): this `unwrap` panics inside the watcher callback if the receiving + // end of the channel has been dropped; consider ignoring the send error instead. + move |res| tx.send(res).unwrap(), + notify::Config::default(), + ) + .expect("failed to create watcher"); + + watcher + .watch(&provider.path, RecursiveMode::NonRecursive) + .expect("failed to watch path"); + + // Some backends send repeated modified events, so remember the newest modification + // time handled so far and skip events that are not newer. + let mut last_event_timestamp = None; + + while let Ok(res) = rx.recv() { + match res { + Ok(event) => { + // We only care about modified data events + if !matches!( + event.kind, + notify::EventKind::Modify(notify::event::ModifyKind::Data(_)) + ) { + continue + } + + // We only trigger a re-initialization if a configuration file was + // modified. 
This means that a + // static_file_provider.commit() was called on the node after + // appending/truncating rows + for segment in event.paths { + // Ensure it's a file with the .conf extension + if !segment + .extension() + .is_some_and(|s| s.to_str() == Some(CONFIG_FILE_EXTENSION)) + { + continue + } + + // Ensure it's a well-formatted static file name. `file_stem` cannot be + // `None` here: the path passed the extension check above, so it has a + // file name. + if StaticFileSegment::parse_filename( + &segment.file_stem().expect("qed").to_string_lossy(), + ) + .is_none() + { + continue + } + + // If we can read the metadata and modified timestamp, ensure this is + // not an old or repeated event. The last timestamp is shared by all + // files rather than tracked per file. + // NOTE(review): `>=` also skips a later commit whose modification time + // equals the previous one (possible on coarse-grained file systems) — + // confirm this is acceptable. + if let Ok(current_modified_timestamp) = + std::fs::metadata(&segment).and_then(|m| m.modified()) + { + if last_event_timestamp.is_some_and(|last_timestamp| { + last_timestamp >= current_modified_timestamp + }) { + continue + } + last_event_timestamp = Some(current_modified_timestamp); + } + + info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index"); + if let Err(err) = provider.initialize_index() { + warn!(target: "providers::static_file", "failed to re-initialize index: {err}"); + } + // A re-initialization has been attempted for this event, whether or not + // it succeeded; skip the event's remaining paths. + break + } + } + + // NOTE(review): this log target differs from the `providers::static_file` + // target used above — confirm this is intentional. + Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"), + } + } + }); + } } impl Deref for StaticFileProvider { @@ -454,6 +546,7 @@ impl StaticFileProvider { let mut max_block = self.static_files_max_block.write(); let mut tx_index = self.static_files_tx_index.write(); + max_block.clear(); tx_index.clear(); for (segment, ranges) in @@ -481,6 +574,9 @@ impl StaticFileProvider { } } + // If this is a re-initialization, we need to clear this as well + self.map.clear(); + Ok(()) } diff --git a/examples/db-access/src/main.rs b/examples/db-access/src/main.rs index 9d11950fc..7886a3bdd 100644 --- a/examples/db-access/src/main.rs +++ b/examples/db-access/src/main.rs @@ -28,7 +28,7 @@ fn main() -> eyre::Result<()> { db_path, spec.into(), Default::default(), - 
StaticFileProvider::read_only(db_path.join("static_files"))?, + StaticFileProvider::read_only(db_path.join("static_files"), false)?, )?; // This call opens a RO transaction on the database. To write to the DB you'd need to call diff --git a/examples/rpc-db/src/main.rs b/examples/rpc-db/src/main.rs index 1b03f2e5b..ccf81acdd 100644 --- a/examples/rpc-db/src/main.rs +++ b/examples/rpc-db/src/main.rs @@ -53,7 +53,7 @@ async fn main() -> eyre::Result<()> { let factory = ProviderFactory::>>::new( db.clone(), spec.clone(), - StaticFileProvider::read_only(db_path.join("static_files"))?, + StaticFileProvider::read_only(db_path.join("static_files"), true)?, ); // 2. Setup the blockchain provider using only the database provider and a noop for the tree to