From c2650388bd8c479b155d29eaa8920ec3ac5e04c6 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Wed, 12 Jun 2024 11:28:13 +0200 Subject: [PATCH] fix: add `start_time` to `ProcessUID` on `StorageLock` (#8753) --- crates/storage/db/Cargo.toml | 1 + crates/storage/db/src/lockfile.rs | 105 ++++++++++++++++++++++++------ testing/ef-tests/Cargo.toml | 2 +- 3 files changed, 88 insertions(+), 20 deletions(-) diff --git a/crates/storage/db/Cargo.toml b/crates/storage/db/Cargo.toml index ca27a9a40..8fb417fb3 100644 --- a/crates/storage/db/Cargo.toml +++ b/crates/storage/db/Cargo.toml @@ -79,6 +79,7 @@ mdbx = ["reth-libmdbx"] bench = [] arbitrary = ["reth-primitives/arbitrary", "reth-db-api/arbitrary"] optimism = [] +disable-lock = [] [[bench]] name = "hash_keys" diff --git a/crates/storage/db/src/lockfile.rs b/crates/storage/db/src/lockfile.rs index 2d7704506..e0da20348 100644 --- a/crates/storage/db/src/lockfile.rs +++ b/crates/storage/db/src/lockfile.rs @@ -1,5 +1,7 @@ //! Storage lock utils. +#![cfg_attr(feature = "disable-lock", allow(dead_code))] + use reth_storage_errors::lockfile::StorageLockError; use reth_tracing::tracing::error; use std::{ @@ -7,7 +9,7 @@ use std::{ process, sync::Arc, }; -use sysinfo::System; +use sysinfo::{ProcessRefreshKind, RefreshKind, System}; /// File lock name. const LOCKFILE_NAME: &str = "lock"; @@ -28,15 +30,31 @@ impl StorageLock { /// Note: In-process exclusivity is not on scope. If called from the same process (or another /// with the same PID), it will succeed. pub fn try_acquire(path: &Path) -> Result<Self, StorageLockError> { - let path = path.join(LOCKFILE_NAME); + let file_path = path.join(LOCKFILE_NAME); - if let Some(pid) = parse_lock_file_pid(&path)? { - if pid != (process::id() as usize) && System::new_all().process(pid.into()).is_some() { - return Err(StorageLockError::Taken(pid)) - } + #[cfg(feature = "disable-lock")] + { + // Too expensive for ef-tests to write/read lock to/from disk. 
+ Ok(Self(Arc::new(StorageLockInner { file_path }))) } - Ok(Self(Arc::new(StorageLockInner::new(path)?))) + #[cfg(not(feature = "disable-lock"))] + { + if let Some(process_lock) = ProcessUID::parse(&file_path)? { + if process_lock.pid != (process::id() as usize) && process_lock.is_active() { + error!( + target: "reth::db::lockfile", + path = ?file_path, + pid = process_lock.pid, + start_time = process_lock.start_time, + "Storage lock already taken." + ); + return Err(StorageLockError::Taken(process_lock.pid)) + } + } + + Ok(Self(Arc::new(StorageLockInner::new(file_path)?))) + } } } @@ -66,19 +84,61 @@ impl StorageLockInner { reth_fs_util::create_dir_all(parent)?; } - reth_fs_util::write(&file_path, format!("{}", process::id()))?; + // Write this process unique identifier (pid & start_time) to file + ProcessUID::own().write(&file_path)?; Ok(Self { file_path }) } } -/// Parses the PID from the lock file if it exists. -fn parse_lock_file_pid(path: &Path) -> Result<Option<usize>, StorageLockError> { - if path.exists() { - let contents = reth_fs_util::read_to_string(path)?; - return Ok(contents.trim().parse().ok()) +#[derive(Debug)] +struct ProcessUID { + /// OS process identifier + pid: usize, + /// Process start time + start_time: u64, +} + +impl ProcessUID { + /// Creates [`Self`] for the provided PID. + fn new(pid: usize) -> Option<Self> { + System::new_with_specifics(RefreshKind::new().with_processes(ProcessRefreshKind::new())) + .process(pid.into()) + .map(|process| Self { pid, start_time: process.start_time() }) + } + + /// Creates [`Self`] from own process. + fn own() -> Self { + Self::new(process::id() as usize).expect("own process") + } + + /// Parses [`Self`] from a file. 
+ fn parse(path: &Path) -> Result<Option<Self>, StorageLockError> { + if path.exists() { + if let Ok(contents) = reth_fs_util::read_to_string(path) { + let mut lines = contents.lines(); + if let (Some(Ok(pid)), Some(Ok(start_time))) = ( + lines.next().map(str::trim).map(str::parse), + lines.next().map(str::trim).map(str::parse), + ) { + return Ok(Some(Self { pid, start_time })); + } + } + } + Ok(None) + } + + /// Whether a process with this `pid` and `start_time` exists. + fn is_active(&self) -> bool { + System::new_with_specifics(RefreshKind::new().with_processes(ProcessRefreshKind::new())) + .process(self.pid.into()) + .is_some_and(|p| p.start_time() == self.start_time) + } + + /// Writes `pid` and `start_time` to a file. + fn write(&self, path: &Path) -> Result<(), StorageLockError> { + Ok(reth_fs_util::write(path, format!("{}\n{}", self.pid, self.start_time))?) } - Ok(None) } #[cfg(test)] @@ -101,12 +161,19 @@ mod tests { while system.process(fake_pid.into()).is_some() { fake_pid += 1; } - reth_fs_util::write(&lock_file, format!("{}", fake_pid)).unwrap(); - assert_eq!(Ok(lock), StorageLock::try_acquire(temp_dir.path())); + ProcessUID { pid: fake_pid, start_time: u64::MAX }.write(&lock_file).unwrap(); + assert_eq!(Ok(lock.clone()), StorageLock::try_acquire(temp_dir.path())); - // A lock of a different but existing PID cannot be acquired. - reth_fs_util::write(&lock_file, "1").unwrap(); + let mut pid_1 = ProcessUID::new(1).unwrap(); + + // If a parsed `ProcessUID` exists, the lock can NOT be acquired. + pid_1.write(&lock_file).unwrap(); assert_eq!(Err(StorageLockError::Taken(1)), StorageLock::try_acquire(temp_dir.path())); + + // A lock of a different but existing PID can be acquired ONLY IF the start_time differs. 
+ pid_1.start_time += 1; + pid_1.write(&lock_file).unwrap(); + assert_eq!(Ok(lock), StorageLock::try_acquire(temp_dir.path())); } #[test] @@ -116,8 +183,8 @@ mod tests { let lock = StorageLock::try_acquire(temp_dir.path()).unwrap(); + assert!(lock_file.exists()); drop(lock); - assert!(!lock_file.exists()); } } diff --git a/testing/ef-tests/Cargo.toml b/testing/ef-tests/Cargo.toml index ecacc0a67..ea991402e 100644 --- a/testing/ef-tests/Cargo.toml +++ b/testing/ef-tests/Cargo.toml @@ -17,7 +17,7 @@ asm-keccak = ["reth-primitives/asm-keccak"] [dependencies] reth-primitives.workspace = true -reth-db = { workspace = true, features = ["mdbx", "test-utils"] } +reth-db = { workspace = true, features = ["mdbx", "test-utils", "disable-lock"] } reth-db-api.workspace = true reth-provider = { workspace = true, features = ["test-utils"] } reth-stages.workspace = true