fix: add start_time to ProcessUID on StorageLock (#8753)

This commit is contained in:
joshieDo
2024-06-12 11:28:13 +02:00
committed by GitHub
parent 1c148e7f03
commit c2650388bd
3 changed files with 88 additions and 20 deletions

View File

@ -79,6 +79,7 @@ mdbx = ["reth-libmdbx"]
bench = []
arbitrary = ["reth-primitives/arbitrary", "reth-db-api/arbitrary"]
optimism = []
disable-lock = []
[[bench]]
name = "hash_keys"

View File

@ -1,5 +1,7 @@
//! Storage lock utils.
#![cfg_attr(feature = "disable-lock", allow(dead_code))]
use reth_storage_errors::lockfile::StorageLockError;
use reth_tracing::tracing::error;
use std::{
@ -7,7 +9,7 @@ use std::{
process,
sync::Arc,
};
use sysinfo::System;
use sysinfo::{ProcessRefreshKind, RefreshKind, System};
/// File lock name.
const LOCKFILE_NAME: &str = "lock";
@ -28,15 +30,31 @@ impl StorageLock {
/// Note: In-process exclusivity is not on scope. If called from the same process (or another
/// with the same PID), it will succeed.
pub fn try_acquire(path: &Path) -> Result<Self, StorageLockError> {
let path = path.join(LOCKFILE_NAME);
let file_path = path.join(LOCKFILE_NAME);
if let Some(pid) = parse_lock_file_pid(&path)? {
if pid != (process::id() as usize) && System::new_all().process(pid.into()).is_some() {
return Err(StorageLockError::Taken(pid))
}
#[cfg(feature = "disable-lock")]
{
// Too expensive for ef-tests to write/read lock to/from disk.
Ok(Self(Arc::new(StorageLockInner { file_path })))
}
Ok(Self(Arc::new(StorageLockInner::new(path)?)))
#[cfg(not(feature = "disable-lock"))]
{
if let Some(process_lock) = ProcessUID::parse(&file_path)? {
if process_lock.pid != (process::id() as usize) && process_lock.is_active() {
error!(
target: "reth::db::lockfile",
path = ?file_path,
pid = process_lock.pid,
start_time = process_lock.start_time,
"Storage lock already taken."
);
return Err(StorageLockError::Taken(process_lock.pid))
}
}
Ok(Self(Arc::new(StorageLockInner::new(file_path)?)))
}
}
}
@ -66,19 +84,61 @@ impl StorageLockInner {
reth_fs_util::create_dir_all(parent)?;
}
reth_fs_util::write(&file_path, format!("{}", process::id()))?;
// Write this process unique identifier (pid & start_time) to file
ProcessUID::own().write(&file_path)?;
Ok(Self { file_path })
}
}
/// Parses the PID from the lock file if it exists.
fn parse_lock_file_pid(path: &Path) -> Result<Option<usize>, StorageLockError> {
if path.exists() {
let contents = reth_fs_util::read_to_string(path)?;
return Ok(contents.trim().parse().ok())
#[derive(Debug)]
struct ProcessUID {
/// OS process identifier
pid: usize,
/// Process start time
start_time: u64,
}
impl ProcessUID {
/// Creates [`Self`] for the provided PID.
fn new(pid: usize) -> Option<Self> {
System::new_with_specifics(RefreshKind::new().with_processes(ProcessRefreshKind::new()))
.process(pid.into())
.map(|process| Self { pid, start_time: process.start_time() })
}
/// Creates [`Self`] from own process.
fn own() -> Self {
Self::new(process::id() as usize).expect("own process")
}
/// Parses [`Self`] from a file.
fn parse(path: &Path) -> Result<Option<Self>, StorageLockError> {
if path.exists() {
if let Ok(contents) = reth_fs_util::read_to_string(path) {
let mut lines = contents.lines();
if let (Some(Ok(pid)), Some(Ok(start_time))) = (
lines.next().map(str::trim).map(str::parse),
lines.next().map(str::trim).map(str::parse),
) {
return Ok(Some(Self { pid, start_time }));
}
}
}
Ok(None)
}
/// Whether a process with this `pid` and `start_time` exists.
fn is_active(&self) -> bool {
System::new_with_specifics(RefreshKind::new().with_processes(ProcessRefreshKind::new()))
.process(self.pid.into())
.is_some_and(|p| p.start_time() == self.start_time)
}
/// Writes `pid` and `start_time` to a file.
fn write(&self, path: &Path) -> Result<(), StorageLockError> {
Ok(reth_fs_util::write(path, format!("{}\n{}", self.pid, self.start_time))?)
}
Ok(None)
}
#[cfg(test)]
@ -101,12 +161,19 @@ mod tests {
while system.process(fake_pid.into()).is_some() {
fake_pid += 1;
}
reth_fs_util::write(&lock_file, format!("{}", fake_pid)).unwrap();
assert_eq!(Ok(lock), StorageLock::try_acquire(temp_dir.path()));
ProcessUID { pid: fake_pid, start_time: u64::MAX }.write(&lock_file).unwrap();
assert_eq!(Ok(lock.clone()), StorageLock::try_acquire(temp_dir.path()));
// A lock of a different but existing PID cannot be acquired.
reth_fs_util::write(&lock_file, "1").unwrap();
let mut pid_1 = ProcessUID::new(1).unwrap();
// If a parsed `ProcessUID` exists, the lock can NOT be acquired.
pid_1.write(&lock_file).unwrap();
assert_eq!(Err(StorageLockError::Taken(1)), StorageLock::try_acquire(temp_dir.path()));
// A lock of a different but existing PID can be acquired ONLY IF the start_time differs.
pid_1.start_time += 1;
pid_1.write(&lock_file).unwrap();
assert_eq!(Ok(lock), StorageLock::try_acquire(temp_dir.path()));
}
#[test]
@ -116,8 +183,8 @@ mod tests {
let lock = StorageLock::try_acquire(temp_dir.path()).unwrap();
assert!(lock_file.exists());
drop(lock);
assert!(!lock_file.exists());
}
}

View File

@ -17,7 +17,7 @@ asm-keccak = ["reth-primitives/asm-keccak"]
[dependencies]
reth-primitives.workspace = true
reth-db = { workspace = true, features = ["mdbx", "test-utils"] }
reth-db = { workspace = true, features = ["mdbx", "test-utils", "disable-lock"] }
reth-db-api.workspace = true
reth-provider = { workspace = true, features = ["test-utils"] }
reth-stages.workspace = true