From db3d1335a8679ea2f34fd8dad8c36814b3766060 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Mon, 3 Jun 2024 14:28:56 +0200 Subject: [PATCH] feat: adds `StorageLock` to `StaticFileProvider` and `mdbx::DatabaseEnv` (#8528) --- Cargo.lock | 37 ++++++++- bin/reth/src/commands/db/static_files/mod.rs | 2 +- .../src/commands/debug_cmd/build_block.rs | 19 ++--- bin/reth/src/commands/debug_cmd/execution.rs | 9 +-- .../commands/debug_cmd/in_memory_merkle.rs | 20 ++--- bin/reth/src/commands/debug_cmd/merkle.rs | 16 ++-- .../src/commands/debug_cmd/replay_engine.rs | 14 +--- bin/reth/src/commands/stage/dump/execution.rs | 2 +- .../commands/stage/dump/hashing_account.rs | 2 +- .../commands/stage/dump/hashing_storage.rs | 2 +- bin/reth/src/commands/stage/dump/merkle.rs | 2 +- bin/reth/src/commands/stage/dump/mod.rs | 8 +- bin/reth/src/commands/stage/run.rs | 18 ++--- crates/storage/db-common/src/init.rs | 2 +- crates/storage/db/Cargo.toml | 1 + .../storage/db/src/implementation/mdbx/mod.rs | 13 +++ crates/storage/db/src/lib.rs | 1 + crates/storage/db/src/lockfile.rs | 79 +++++++++++++++++++ crates/storage/errors/src/db.rs | 3 + crates/storage/errors/src/lib.rs | 3 + crates/storage/errors/src/lockfile.rs | 20 +++++ crates/storage/errors/src/provider.rs | 3 + .../src/providers/static_file/manager.rs | 15 ++++ 23 files changed, 216 insertions(+), 75 deletions(-) create mode 100644 crates/storage/db/src/lockfile.rs create mode 100644 crates/storage/errors/src/lockfile.rs diff --git a/Cargo.lock b/Cargo.lock index e6b9fdcb6..4eabcf68b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4754,7 +4754,7 @@ dependencies = [ "once_cell", "procfs", "rlimit", - "windows", + "windows 0.56.0", ] [[package]] @@ -5019,6 +5019,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -6452,6 +6461,7 @@ dependencies = [ "serde", "serde_json", "strum", + "sysinfo", "tempfile", "test-fuzz", "thiserror", @@ -9113,6 +9123,21 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "sysinfo" +version = "0.30.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "732ffa00f53e6b2af46208fba5718d9662a421049204e156328b66791ffa15ae" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "windows 0.52.0", +] + [[package]] name = "system-configuration" version = "0.5.1" @@ -10157,6 +10182,16 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +dependencies = [ + "windows-core 0.52.0", + "windows-targets 0.52.5", +] + [[package]] name = "windows" version = "0.56.0" diff --git a/bin/reth/src/commands/db/static_files/mod.rs b/bin/reth/src/commands/db/static_files/mod.rs index f7532237f..225247d9d 100644 --- a/bin/reth/src/commands/db/static_files/mod.rs +++ b/bin/reth/src/commands/db/static_files/mod.rs @@ -102,7 +102,7 @@ impl Command { let provider_factory = Arc::new(ProviderFactory::new( db, chain, - StaticFileProvider::read_only(data_dir.static_files())?, + StaticFileProvider::read_write(data_dir.static_files())?, )); { diff --git a/bin/reth/src/commands/debug_cmd/build_block.rs b/bin/reth/src/commands/debug_cmd/build_block.rs index f4720d817..7df2e895a 100644 --- a/bin/reth/src/commands/debug_cmd/build_block.rs +++ b/bin/reth/src/commands/debug_cmd/build_block.rs @@ -110,14 +110,10 @@ impl Command { /// Fetches the best block block from the database. /// /// If the database is empty, returns the genesis block. - fn lookup_best_block(&self, db: Arc) -> RethResult> { - let factory = ProviderFactory::new( - db, - self.chain.clone(), - StaticFileProvider::read_only( - self.datadir.unwrap_or_chain_default(self.chain.chain).static_files(), - )?, - ); + fn lookup_best_block( + &self, + factory: ProviderFactory>, + ) -> RethResult> { let provider = factory.provider()?; let best_number = @@ -158,7 +154,7 @@ impl Command { let provider_factory = ProviderFactory::new( Arc::clone(&db), Arc::clone(&self.chain), - StaticFileProvider::read_only(data_dir.static_files())?, + StaticFileProvider::read_write(data_dir.static_files())?, ); let consensus: Arc = @@ -173,8 +169,9 @@ impl Command { let blockchain_tree = Arc::new(ShareableBlockchainTree::new(tree)); // fetch the best block from the database - let best_block = - self.lookup_best_block(Arc::clone(&db)).wrap_err("the head block is missing")?; + let best_block = self + .lookup_best_block(provider_factory.clone()) + .wrap_err("the head block is missing")?; let blockchain_db = BlockchainProvider::new(provider_factory.clone(), blockchain_tree.clone())?; diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs index 5403f450c..f76b09849 100644 --- a/bin/reth/src/commands/debug_cmd/execution.rs +++ b/bin/reth/src/commands/debug_cmd/execution.rs @@ -151,14 +151,11 @@ impl Command { &self, config: &Config, task_executor: TaskExecutor, - db: Arc, + provider_factory: ProviderFactory>, network_secret_path: PathBuf, default_peers_path: PathBuf, ) -> eyre::Result { let secret_key = get_secret_key(&network_secret_path)?; - let static_files = StaticFileProvider::read_only( - self.datadir.unwrap_or_chain_default(self.chain.chain).static_files(), - )?; let network = self .network .network_config(config, self.chain.clone(), secret_key, default_peers_path) @@ -168,7 +165,7 @@ impl Command { self.network.discovery.addr, self.network.discovery.port, )) - .build(ProviderFactory::new(db, self.chain.clone(), static_files)) + .build(provider_factory.clone()) .start_network() .await?; info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); @@ -228,7 +225,7 @@ impl Command { .build_network( &config, ctx.task_executor.clone(), - db.clone(), + provider_factory.clone(), network_secret_path, data_dir.known_peers(), ) diff --git a/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs b/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs index 3e1d98468..dbd1663fd 100644 --- a/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs +++ b/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs @@ -80,7 +80,7 @@ impl Command { &self, config: &Config, task_executor: TaskExecutor, - db: Arc, + provider_factory: ProviderFactory>, network_secret_path: PathBuf, default_peers_path: PathBuf, ) -> eyre::Result { @@ -94,13 +94,7 @@ impl Command { self.network.discovery.addr, self.network.discovery.port, )) - .build(ProviderFactory::new( - db, - self.chain.clone(), - StaticFileProvider::read_only( - self.datadir.unwrap_or_chain_default(self.chain.chain).static_files(), - )?, - )) + .build(provider_factory) .start_network() .await?; info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); @@ -119,11 +113,9 @@ impl Command { // initialize the database let db = Arc::new(init_db(db_path, self.db.database_args())?); - let factory = ProviderFactory::new( - &db, - self.chain.clone(), - StaticFileProvider::read_only(data_dir.static_files())?, - ); + let static_file_provider = StaticFileProvider::read_write(data_dir.static_files())?; + let factory = + ProviderFactory::new(db.clone(), self.chain.clone(), static_file_provider.clone()); let provider = factory.provider()?; // Look up merkle checkpoint @@ -140,7 +132,7 @@ impl Command { .build_network( &config, ctx.task_executor.clone(), - db.clone(), + factory.clone(), network_secret_path, data_dir.known_peers(), ) diff --git a/bin/reth/src/commands/debug_cmd/merkle.rs b/bin/reth/src/commands/debug_cmd/merkle.rs index 081764ac4..3cc79a037 100644 --- a/bin/reth/src/commands/debug_cmd/merkle.rs +++ b/bin/reth/src/commands/debug_cmd/merkle.rs @@ -86,7 +86,7 @@ impl Command { &self, config: &Config, task_executor: TaskExecutor, - db: Arc, + provider_factory: ProviderFactory>, network_secret_path: PathBuf, default_peers_path: PathBuf, ) -> eyre::Result { @@ -100,13 +100,7 @@ impl Command { self.network.discovery.addr, self.network.discovery.port, )) - .build(ProviderFactory::new( - db, - self.chain.clone(), - StaticFileProvider::read_only( - self.datadir.unwrap_or_chain_default(self.chain.chain).static_files(), - )?, - )) + .build(provider_factory.clone()) .start_network() .await?; info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); @@ -126,9 +120,9 @@ impl Command { // initialize the database let db = Arc::new(init_db(db_path, self.db.database_args())?); let factory = ProviderFactory::new( - &db, + db.clone(), self.chain.clone(), - StaticFileProvider::read_only(data_dir.static_files())?, + StaticFileProvider::read_write(data_dir.static_files())?, ); let provider_rw = factory.provider_rw()?; @@ -139,7 +133,7 @@ impl Command { .build_network( &config, ctx.task_executor.clone(), - db.clone(), + factory.clone(), network_secret_path, data_dir.known_peers(), ) diff --git a/bin/reth/src/commands/debug_cmd/replay_engine.rs b/bin/reth/src/commands/debug_cmd/replay_engine.rs index 986668887..212fdb241 100644 --- a/bin/reth/src/commands/debug_cmd/replay_engine.rs +++ b/bin/reth/src/commands/debug_cmd/replay_engine.rs @@ -83,7 +83,7 @@ impl Command { &self, config: &Config, task_executor: TaskExecutor, - db: Arc, + provider_factory: ProviderFactory>, network_secret_path: PathBuf, default_peers_path: PathBuf, ) -> eyre::Result { @@ -97,13 +97,7 @@ impl Command { self.network.discovery.addr, self.network.discovery.port, )) - .build(ProviderFactory::new( - db, - self.chain.clone(), - StaticFileProvider::read_only( - self.datadir.unwrap_or_chain_default(self.chain.chain).static_files(), - )?, - )) + .build(provider_factory.clone()) .start_network() .await?; info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); @@ -125,7 +119,7 @@ impl Command { let provider_factory = ProviderFactory::new( db.clone(), self.chain.clone(), - StaticFileProvider::read_only(data_dir.static_files())?, + StaticFileProvider::read_write(data_dir.static_files())?, ); let consensus: Arc = @@ -149,7 +143,7 @@ impl Command { .build_network( &config, ctx.task_executor.clone(), - db.clone(), + provider_factory.clone(), network_secret_path, data_dir.known_peers(), ) diff --git a/bin/reth/src/commands/stage/dump/execution.rs b/bin/reth/src/commands/stage/dump/execution.rs index abe1fccd7..919503b24 100644 --- a/bin/reth/src/commands/stage/dump/execution.rs +++ b/bin/reth/src/commands/stage/dump/execution.rs @@ -28,7 +28,7 @@ pub(crate) async fn dump_execution_stage( ProviderFactory::new( output_db, db_tool.chain.clone(), - StaticFileProvider::read_only(output_datadir.static_files())?, + StaticFileProvider::read_write(output_datadir.static_files())?, ), to, from, diff --git a/bin/reth/src/commands/stage/dump/hashing_account.rs b/bin/reth/src/commands/stage/dump/hashing_account.rs index ebcf1ad8c..125af668a 100644 --- a/bin/reth/src/commands/stage/dump/hashing_account.rs +++ b/bin/reth/src/commands/stage/dump/hashing_account.rs @@ -33,7 +33,7 @@ pub(crate) async fn dump_hashing_account_stage( ProviderFactory::new( output_db, db_tool.chain.clone(), - StaticFileProvider::read_only(output_datadir.static_files())?, + StaticFileProvider::read_write(output_datadir.static_files())?, ), to, from, diff --git a/bin/reth/src/commands/stage/dump/hashing_storage.rs b/bin/reth/src/commands/stage/dump/hashing_storage.rs index f990357d1..98df3a570 100644 --- a/bin/reth/src/commands/stage/dump/hashing_storage.rs +++ b/bin/reth/src/commands/stage/dump/hashing_storage.rs @@ -24,7 +24,7 @@ pub(crate) async fn dump_hashing_storage_stage( ProviderFactory::new( output_db, db_tool.chain.clone(), - StaticFileProvider::read_only(output_datadir.static_files())?, + StaticFileProvider::read_write(output_datadir.static_files())?, ), to, from, diff --git a/bin/reth/src/commands/stage/dump/merkle.rs b/bin/reth/src/commands/stage/dump/merkle.rs index bb3ab80bf..605f423aa 100644 --- a/bin/reth/src/commands/stage/dump/merkle.rs +++ b/bin/reth/src/commands/stage/dump/merkle.rs @@ -48,7 +48,7 @@ pub(crate) async fn dump_merkle_stage( ProviderFactory::new( output_db, db_tool.chain.clone(), - StaticFileProvider::read_only(output_datadir.static_files())?, + StaticFileProvider::read_write(output_datadir.static_files())?, ), to, from, diff --git a/bin/reth/src/commands/stage/dump/mod.rs b/bin/reth/src/commands/stage/dump/mod.rs index 03cff6055..c17962ab6 100644 --- a/bin/reth/src/commands/stage/dump/mod.rs +++ b/bin/reth/src/commands/stage/dump/mod.rs @@ -12,8 +12,8 @@ use crate::args::{ use clap::Parser; use reth_db::{ cursor::DbCursorRO, database::Database, init_db, mdbx::DatabaseArguments, - models::client_version::ClientVersion, table::TableImporter, tables, transaction::DbTx, - DatabaseEnv, + models::client_version::ClientVersion, open_db_read_only, table::TableImporter, tables, + transaction::DbTx, DatabaseEnv, }; use reth_node_core::dirs::PlatformPath; use reth_primitives::ChainSpec; @@ -104,11 +104,11 @@ impl Command { let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain); let db_path = data_dir.db(); info!(target: "reth::cli", path = ?db_path, "Opening database"); - let db = Arc::new(init_db(db_path, self.db.database_args())?); + let db = Arc::new(open_db_read_only(&db_path, self.db.database_args())?); let provider_factory = ProviderFactory::new( db, self.chain.clone(), - StaticFileProvider::read_write(data_dir.static_files())?, + StaticFileProvider::read_only(data_dir.static_files())?, ); info!(target: "reth::cli", "Database opened"); diff --git a/bin/reth/src/commands/stage/run.rs b/bin/reth/src/commands/stage/run.rs index e6b96bdc6..9c1d90091 100644 --- a/bin/reth/src/commands/stage/run.rs +++ b/bin/reth/src/commands/stage/run.rs @@ -146,12 +146,12 @@ impl Command { let db = Arc::new(init_db(db_path, self.db.database_args())?); info!(target: "reth::cli", "Database opened"); - let factory = ProviderFactory::new( + let provider_factory = ProviderFactory::new( Arc::clone(&db), self.chain.clone(), StaticFileProvider::read_write(data_dir.static_files())?, ); - let mut provider_rw = factory.provider_rw()?; + let mut provider_rw = provider_factory.provider_rw()?; if let Some(listen_addr) = self.metrics { info!(target: "reth::cli", "Starting metrics endpoint at {}", listen_addr); @@ -159,7 +159,7 @@ impl Command { listen_addr, prometheus_exporter::install_recorder()?, Arc::clone(&db), - factory.static_file_provider(), + provider_factory.static_file_provider(), metrics_process::Collector::default(), ctx.task_executor, ) @@ -196,12 +196,6 @@ impl Command { let default_peers_path = data_dir.known_peers(); - let provider_factory = Arc::new(ProviderFactory::new( - db.clone(), - self.chain.clone(), - StaticFileProvider::read_write(data_dir.static_files())?, - )); - let network = self .network .network_config( @@ -226,7 +220,7 @@ impl Command { config.stages.bodies.downloader_min_concurrent_requests..= config.stages.bodies.downloader_max_concurrent_requests, ) - .build(fetch_client, consensus.clone(), provider_factory), + .build(fetch_client, consensus.clone(), provider_factory.clone()), ); (Box::new(stage), None) } @@ -323,7 +317,7 @@ impl Command { if self.commit { provider_rw.commit()?; - provider_rw = factory.provider_rw()?; + provider_rw = provider_factory.provider_rw()?; } } } @@ -346,7 +340,7 @@ impl Command { } if self.commit { provider_rw.commit()?; - provider_rw = factory.provider_rw()?; + provider_rw = provider_factory.provider_rw()?; } if done { diff --git a/crates/storage/db-common/src/init.rs b/crates/storage/db-common/src/init.rs index b193f0458..f804b8790 100644 --- a/crates/storage/db-common/src/init.rs +++ b/crates/storage/db-common/src/init.rs @@ -582,7 +582,7 @@ mod tests { let genesis_hash = init_genesis(ProviderFactory::new( factory.into_db(), MAINNET.clone(), - StaticFileProvider::read_write(static_file_provider.path()).unwrap(), + static_file_provider, )); assert_eq!( diff --git a/crates/storage/db/Cargo.toml b/crates/storage/db/Cargo.toml index 3a9ea5be5..5dd4c5a8c 100644 --- a/crates/storage/db/Cargo.toml +++ b/crates/storage/db/Cargo.toml @@ -42,6 +42,7 @@ derive_more.workspace = true eyre.workspace = true paste.workspace = true rustc-hash.workspace = true +sysinfo = "0.30" # arbitrary utils arbitrary = { workspace = true, features = ["derive"], optional = true } diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs index 62fa3e053..947eb6b2b 100644 --- a/crates/storage/db/src/implementation/mdbx/mod.rs +++ b/crates/storage/db/src/implementation/mdbx/mod.rs @@ -4,6 +4,7 @@ use crate::{ cursor::{DbCursorRO, DbCursorRW}, database::Database, database_metrics::{DatabaseMetadata, DatabaseMetadataValue, DatabaseMetrics}, + lockfile::StorageLock, metrics::DatabaseEnvMetrics, models::client_version::ClientVersion, tables::{self, TableType, Tables}, @@ -134,6 +135,8 @@ pub struct DatabaseEnv { inner: Environment, /// Cache for metric handles. If `None`, metrics are not recorded. metrics: Option>, + /// Write lock for when dealing with a read-write environment. + _lock_file: Option, } impl Database for DatabaseEnv { @@ -251,6 +254,15 @@ impl DatabaseEnv { kind: DatabaseEnvKind, args: DatabaseArguments, ) -> Result { + let _lock_file = if kind.is_rw() { + Some( + StorageLock::try_acquire(path) + .map_err(|err| DatabaseError::Other(err.to_string()))?, + ) + } else { + None + }; + let mut inner_env = Environment::builder(); let mode = match kind { @@ -382,6 +394,7 @@ impl DatabaseEnv { let env = Self { inner: inner_env.open(path).map_err(|e| DatabaseError::Open(e.into()))?, metrics: None, + _lock_file, }; Ok(env) diff --git a/crates/storage/db/src/lib.rs b/crates/storage/db/src/lib.rs index 05f2bddd6..15ceb671a 100644 --- a/crates/storage/db/src/lib.rs +++ b/crates/storage/db/src/lib.rs @@ -67,6 +67,7 @@ pub mod abstraction; mod implementation; +pub mod lockfile; mod metrics; pub mod static_file; pub mod tables; diff --git a/crates/storage/db/src/lockfile.rs b/crates/storage/db/src/lockfile.rs new file mode 100644 index 000000000..ebfc240e7 --- /dev/null +++ b/crates/storage/db/src/lockfile.rs @@ -0,0 +1,79 @@ +//! Storage lock utils. + +use reth_storage_errors::lockfile::StorageLockError; +use reth_tracing::tracing::error; +use std::{ + path::{Path, PathBuf}, + process, + sync::Arc, +}; +use sysinfo::System; + +/// A file lock for a storage directory to ensure exclusive read-write access. +/// +/// This lock stores the PID of the process holding it and is released (deleted) on a graceful +/// shutdown. On resuming from a crash, the stored PID helps verify that no other process holds the +/// lock. +#[derive(Debug, Clone)] +pub struct StorageLock(Arc); + +impl StorageLock { + /// Tries to acquires a write lock on the target directory, returning [StorageLockError] if + /// unsuccessful. + pub fn try_acquire(path: &Path) -> Result { + let path = path.join("lock"); + let lock = match parse_lock_file_pid(&path)? { + Some(pid) => { + if System::new_all().process(pid.into()).is_some() { + return Err(StorageLockError::Taken(pid)) + } else { + // If PID is no longer active, take hold of the lock. + StorageLockInner::new(path) + } + } + None => StorageLockInner::new(path), + }; + Ok(Self(Arc::new(lock?))) + } +} + +impl Drop for StorageLock { + fn drop(&mut self) { + if Arc::strong_count(&self.0) == 1 && self.0.file_path.exists() { + // TODO: should only happen during tests that the file does not exist: tempdir is + // getting dropped first. However, tempdir shouldn't be dropped + // before any of the storage providers. + if let Err(err) = reth_fs_util::remove_file(&self.0.file_path) { + error!(%err, "Failed to delete lock file"); + } + } + } +} + +#[derive(Debug)] +struct StorageLockInner { + file_path: PathBuf, +} + +impl StorageLockInner { + /// Creates lock file and writes this process PID into it. + fn new(file_path: PathBuf) -> Result { + // Create the directory if it doesn't exist + if let Some(parent) = file_path.parent() { + reth_fs_util::create_dir_all(parent)?; + } + + reth_fs_util::write(&file_path, format!("{}", process::id()))?; + + Ok(Self { file_path }) + } +} + +/// Parses the PID from the lock file if it exists. +fn parse_lock_file_pid(path: &Path) -> Result, StorageLockError> { + if path.exists() { + let contents = reth_fs_util::read_to_string(path)?; + return Ok(contents.trim().parse().ok()) + } + Ok(None) +} diff --git a/crates/storage/errors/src/db.rs b/crates/storage/errors/src/db.rs index 43b471a3c..33f8b13c0 100644 --- a/crates/storage/errors/src/db.rs +++ b/crates/storage/errors/src/db.rs @@ -37,6 +37,9 @@ pub enum DatabaseError { /// Failed to use the specified log level, as it's not available. #[error("log level {0:?} is not available")] LogLevelUnavailable(LogLevel), + /// Other unspecified error. + #[error("{0}")] + Other(String), } /// Common error struct to propagate implementation-specific error information. diff --git a/crates/storage/errors/src/lib.rs b/crates/storage/errors/src/lib.rs index 6bab8f051..8247c6352 100644 --- a/crates/storage/errors/src/lib.rs +++ b/crates/storage/errors/src/lib.rs @@ -11,5 +11,8 @@ /// Database error pub mod db; +/// Lockfile error +pub mod lockfile; + /// Provider error pub mod provider; diff --git a/crates/storage/errors/src/lockfile.rs b/crates/storage/errors/src/lockfile.rs new file mode 100644 index 000000000..010d02ddb --- /dev/null +++ b/crates/storage/errors/src/lockfile.rs @@ -0,0 +1,20 @@ +use reth_fs_util::FsPathError; +use thiserror::Error; + +#[derive(Error, Debug, Clone, PartialEq, Eq)] +/// Storage lock error. +pub enum StorageLockError { + /// Write lock taken + #[error("storage directory is currently in use as read-write by another process: {0}")] + Taken(usize), + /// Indicates other unspecified errors. + #[error("{0}")] + Other(String), +} + +/// TODO: turn into variant once ProviderError +impl From for StorageLockError { + fn from(source: FsPathError) -> Self { + Self::Other(source.to_string()) + } +} diff --git a/crates/storage/errors/src/provider.rs b/crates/storage/errors/src/provider.rs index 5420b8f3c..7472d500c 100644 --- a/crates/storage/errors/src/provider.rs +++ b/crates/storage/errors/src/provider.rs @@ -131,6 +131,9 @@ pub enum ProviderError { /// Consistent view error. #[error("failed to initialize consistent view: {0}")] ConsistentView(Box), + /// Storage lock error. + #[error(transparent)] + StorageLockError(#[from] crate::lockfile::StorageLockError), } impl From for ProviderError { diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index 0052868ad..0a29436cb 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -12,6 +12,7 @@ use parking_lot::RwLock; use reth_db::{ codecs::CompactU256, cursor::DbCursorRO, + lockfile::StorageLock, models::StoredBlockBodyIndices, static_file::{iter_static_files, HeaderMask, ReceiptMask, StaticFileCursor, TransactionMask}, table::Table, @@ -58,6 +59,11 @@ impl StaticFileAccess { pub const fn is_read_only(&self) -> bool { matches!(self, Self::RO) } + + /// Returns `true` if read-write access. + pub const fn is_read_write(&self) -> bool { + matches!(self, Self::RW) + } } /// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`]. @@ -111,11 +117,19 @@ pub struct StaticFileProviderInner { metrics: Option>, /// Access rights of the provider. access: StaticFileAccess, + /// Write lock for when access is [StaticFileAccess::RW]. + _lock_file: Option, } impl StaticFileProviderInner { /// Creates a new [`StaticFileProviderInner`]. fn new(path: impl AsRef, access: StaticFileAccess) -> ProviderResult { + let _lock_file = if access.is_read_write() { + Some(StorageLock::try_acquire(path.as_ref())?) + } else { + None + }; + let provider = Self { map: Default::default(), writers: Default::default(), @@ -125,6 +139,7 @@ impl StaticFileProviderInner { load_filters: false, metrics: None, access, + _lock_file, }; Ok(provider)