feat: adds StorageLock to StaticFileProvider and mdbx::DatabaseEnv (#8528)

This commit is contained in:
joshieDo
2024-06-03 14:28:56 +02:00
committed by GitHub
parent 063807b3ae
commit db3d1335a8
23 changed files with 216 additions and 75 deletions

37
Cargo.lock generated
View File

@ -4754,7 +4754,7 @@ dependencies = [
"once_cell", "once_cell",
"procfs", "procfs",
"rlimit", "rlimit",
"windows", "windows 0.56.0",
] ]
[[package]] [[package]]
@ -5019,6 +5019,15 @@ dependencies = [
"minimal-lexical", "minimal-lexical",
] ]
[[package]]
name = "ntapi"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4"
dependencies = [
"winapi",
]
[[package]] [[package]]
name = "nu-ansi-term" name = "nu-ansi-term"
version = "0.46.0" version = "0.46.0"
@ -6452,6 +6461,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"strum", "strum",
"sysinfo",
"tempfile", "tempfile",
"test-fuzz", "test-fuzz",
"thiserror", "thiserror",
@ -9113,6 +9123,21 @@ dependencies = [
"syn 2.0.66", "syn 2.0.66",
] ]
[[package]]
name = "sysinfo"
version = "0.30.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "732ffa00f53e6b2af46208fba5718d9662a421049204e156328b66791ffa15ae"
dependencies = [
"cfg-if",
"core-foundation-sys",
"libc",
"ntapi",
"once_cell",
"rayon",
"windows 0.52.0",
]
[[package]] [[package]]
name = "system-configuration" name = "system-configuration"
version = "0.5.1" version = "0.5.1"
@ -10157,6 +10182,16 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be"
dependencies = [
"windows-core 0.52.0",
"windows-targets 0.52.5",
]
[[package]] [[package]]
name = "windows" name = "windows"
version = "0.56.0" version = "0.56.0"

View File

@ -102,7 +102,7 @@ impl Command {
let provider_factory = Arc::new(ProviderFactory::new( let provider_factory = Arc::new(ProviderFactory::new(
db, db,
chain, chain,
StaticFileProvider::read_only(data_dir.static_files())?, StaticFileProvider::read_write(data_dir.static_files())?,
)); ));
{ {

View File

@ -110,14 +110,10 @@ impl Command {
/// Fetches the best block block from the database. /// Fetches the best block block from the database.
/// ///
/// If the database is empty, returns the genesis block. /// If the database is empty, returns the genesis block.
fn lookup_best_block(&self, db: Arc<DatabaseEnv>) -> RethResult<Arc<SealedBlock>> { fn lookup_best_block(
let factory = ProviderFactory::new( &self,
db, factory: ProviderFactory<Arc<DatabaseEnv>>,
self.chain.clone(), ) -> RethResult<Arc<SealedBlock>> {
StaticFileProvider::read_only(
self.datadir.unwrap_or_chain_default(self.chain.chain).static_files(),
)?,
);
let provider = factory.provider()?; let provider = factory.provider()?;
let best_number = let best_number =
@ -158,7 +154,7 @@ impl Command {
let provider_factory = ProviderFactory::new( let provider_factory = ProviderFactory::new(
Arc::clone(&db), Arc::clone(&db),
Arc::clone(&self.chain), Arc::clone(&self.chain),
StaticFileProvider::read_only(data_dir.static_files())?, StaticFileProvider::read_write(data_dir.static_files())?,
); );
let consensus: Arc<dyn Consensus> = let consensus: Arc<dyn Consensus> =
@ -173,8 +169,9 @@ impl Command {
let blockchain_tree = Arc::new(ShareableBlockchainTree::new(tree)); let blockchain_tree = Arc::new(ShareableBlockchainTree::new(tree));
// fetch the best block from the database // fetch the best block from the database
let best_block = let best_block = self
self.lookup_best_block(Arc::clone(&db)).wrap_err("the head block is missing")?; .lookup_best_block(provider_factory.clone())
.wrap_err("the head block is missing")?;
let blockchain_db = let blockchain_db =
BlockchainProvider::new(provider_factory.clone(), blockchain_tree.clone())?; BlockchainProvider::new(provider_factory.clone(), blockchain_tree.clone())?;

View File

@ -151,14 +151,11 @@ impl Command {
&self, &self,
config: &Config, config: &Config,
task_executor: TaskExecutor, task_executor: TaskExecutor,
db: Arc<DatabaseEnv>, provider_factory: ProviderFactory<Arc<DatabaseEnv>>,
network_secret_path: PathBuf, network_secret_path: PathBuf,
default_peers_path: PathBuf, default_peers_path: PathBuf,
) -> eyre::Result<NetworkHandle> { ) -> eyre::Result<NetworkHandle> {
let secret_key = get_secret_key(&network_secret_path)?; let secret_key = get_secret_key(&network_secret_path)?;
let static_files = StaticFileProvider::read_only(
self.datadir.unwrap_or_chain_default(self.chain.chain).static_files(),
)?;
let network = self let network = self
.network .network
.network_config(config, self.chain.clone(), secret_key, default_peers_path) .network_config(config, self.chain.clone(), secret_key, default_peers_path)
@ -168,7 +165,7 @@ impl Command {
self.network.discovery.addr, self.network.discovery.addr,
self.network.discovery.port, self.network.discovery.port,
)) ))
.build(ProviderFactory::new(db, self.chain.clone(), static_files)) .build(provider_factory.clone())
.start_network() .start_network()
.await?; .await?;
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
@ -228,7 +225,7 @@ impl Command {
.build_network( .build_network(
&config, &config,
ctx.task_executor.clone(), ctx.task_executor.clone(),
db.clone(), provider_factory.clone(),
network_secret_path, network_secret_path,
data_dir.known_peers(), data_dir.known_peers(),
) )

View File

@ -80,7 +80,7 @@ impl Command {
&self, &self,
config: &Config, config: &Config,
task_executor: TaskExecutor, task_executor: TaskExecutor,
db: Arc<DatabaseEnv>, provider_factory: ProviderFactory<Arc<DatabaseEnv>>,
network_secret_path: PathBuf, network_secret_path: PathBuf,
default_peers_path: PathBuf, default_peers_path: PathBuf,
) -> eyre::Result<NetworkHandle> { ) -> eyre::Result<NetworkHandle> {
@ -94,13 +94,7 @@ impl Command {
self.network.discovery.addr, self.network.discovery.addr,
self.network.discovery.port, self.network.discovery.port,
)) ))
.build(ProviderFactory::new( .build(provider_factory)
db,
self.chain.clone(),
StaticFileProvider::read_only(
self.datadir.unwrap_or_chain_default(self.chain.chain).static_files(),
)?,
))
.start_network() .start_network()
.await?; .await?;
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
@ -119,11 +113,9 @@ impl Command {
// initialize the database // initialize the database
let db = Arc::new(init_db(db_path, self.db.database_args())?); let db = Arc::new(init_db(db_path, self.db.database_args())?);
let factory = ProviderFactory::new( let static_file_provider = StaticFileProvider::read_write(data_dir.static_files())?;
&db, let factory =
self.chain.clone(), ProviderFactory::new(db.clone(), self.chain.clone(), static_file_provider.clone());
StaticFileProvider::read_only(data_dir.static_files())?,
);
let provider = factory.provider()?; let provider = factory.provider()?;
// Look up merkle checkpoint // Look up merkle checkpoint
@ -140,7 +132,7 @@ impl Command {
.build_network( .build_network(
&config, &config,
ctx.task_executor.clone(), ctx.task_executor.clone(),
db.clone(), factory.clone(),
network_secret_path, network_secret_path,
data_dir.known_peers(), data_dir.known_peers(),
) )

View File

@ -86,7 +86,7 @@ impl Command {
&self, &self,
config: &Config, config: &Config,
task_executor: TaskExecutor, task_executor: TaskExecutor,
db: Arc<DatabaseEnv>, provider_factory: ProviderFactory<Arc<DatabaseEnv>>,
network_secret_path: PathBuf, network_secret_path: PathBuf,
default_peers_path: PathBuf, default_peers_path: PathBuf,
) -> eyre::Result<NetworkHandle> { ) -> eyre::Result<NetworkHandle> {
@ -100,13 +100,7 @@ impl Command {
self.network.discovery.addr, self.network.discovery.addr,
self.network.discovery.port, self.network.discovery.port,
)) ))
.build(ProviderFactory::new( .build(provider_factory.clone())
db,
self.chain.clone(),
StaticFileProvider::read_only(
self.datadir.unwrap_or_chain_default(self.chain.chain).static_files(),
)?,
))
.start_network() .start_network()
.await?; .await?;
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
@ -126,9 +120,9 @@ impl Command {
// initialize the database // initialize the database
let db = Arc::new(init_db(db_path, self.db.database_args())?); let db = Arc::new(init_db(db_path, self.db.database_args())?);
let factory = ProviderFactory::new( let factory = ProviderFactory::new(
&db, db.clone(),
self.chain.clone(), self.chain.clone(),
StaticFileProvider::read_only(data_dir.static_files())?, StaticFileProvider::read_write(data_dir.static_files())?,
); );
let provider_rw = factory.provider_rw()?; let provider_rw = factory.provider_rw()?;
@ -139,7 +133,7 @@ impl Command {
.build_network( .build_network(
&config, &config,
ctx.task_executor.clone(), ctx.task_executor.clone(),
db.clone(), factory.clone(),
network_secret_path, network_secret_path,
data_dir.known_peers(), data_dir.known_peers(),
) )

View File

@ -83,7 +83,7 @@ impl Command {
&self, &self,
config: &Config, config: &Config,
task_executor: TaskExecutor, task_executor: TaskExecutor,
db: Arc<DatabaseEnv>, provider_factory: ProviderFactory<Arc<DatabaseEnv>>,
network_secret_path: PathBuf, network_secret_path: PathBuf,
default_peers_path: PathBuf, default_peers_path: PathBuf,
) -> eyre::Result<NetworkHandle> { ) -> eyre::Result<NetworkHandle> {
@ -97,13 +97,7 @@ impl Command {
self.network.discovery.addr, self.network.discovery.addr,
self.network.discovery.port, self.network.discovery.port,
)) ))
.build(ProviderFactory::new( .build(provider_factory.clone())
db,
self.chain.clone(),
StaticFileProvider::read_only(
self.datadir.unwrap_or_chain_default(self.chain.chain).static_files(),
)?,
))
.start_network() .start_network()
.await?; .await?;
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
@ -125,7 +119,7 @@ impl Command {
let provider_factory = ProviderFactory::new( let provider_factory = ProviderFactory::new(
db.clone(), db.clone(),
self.chain.clone(), self.chain.clone(),
StaticFileProvider::read_only(data_dir.static_files())?, StaticFileProvider::read_write(data_dir.static_files())?,
); );
let consensus: Arc<dyn Consensus> = let consensus: Arc<dyn Consensus> =
@ -149,7 +143,7 @@ impl Command {
.build_network( .build_network(
&config, &config,
ctx.task_executor.clone(), ctx.task_executor.clone(),
db.clone(), provider_factory.clone(),
network_secret_path, network_secret_path,
data_dir.known_peers(), data_dir.known_peers(),
) )

View File

@ -28,7 +28,7 @@ pub(crate) async fn dump_execution_stage<DB: Database>(
ProviderFactory::new( ProviderFactory::new(
output_db, output_db,
db_tool.chain.clone(), db_tool.chain.clone(),
StaticFileProvider::read_only(output_datadir.static_files())?, StaticFileProvider::read_write(output_datadir.static_files())?,
), ),
to, to,
from, from,

View File

@ -33,7 +33,7 @@ pub(crate) async fn dump_hashing_account_stage<DB: Database>(
ProviderFactory::new( ProviderFactory::new(
output_db, output_db,
db_tool.chain.clone(), db_tool.chain.clone(),
StaticFileProvider::read_only(output_datadir.static_files())?, StaticFileProvider::read_write(output_datadir.static_files())?,
), ),
to, to,
from, from,

View File

@ -24,7 +24,7 @@ pub(crate) async fn dump_hashing_storage_stage<DB: Database>(
ProviderFactory::new( ProviderFactory::new(
output_db, output_db,
db_tool.chain.clone(), db_tool.chain.clone(),
StaticFileProvider::read_only(output_datadir.static_files())?, StaticFileProvider::read_write(output_datadir.static_files())?,
), ),
to, to,
from, from,

View File

@ -48,7 +48,7 @@ pub(crate) async fn dump_merkle_stage<DB: Database>(
ProviderFactory::new( ProviderFactory::new(
output_db, output_db,
db_tool.chain.clone(), db_tool.chain.clone(),
StaticFileProvider::read_only(output_datadir.static_files())?, StaticFileProvider::read_write(output_datadir.static_files())?,
), ),
to, to,
from, from,

View File

@ -12,8 +12,8 @@ use crate::args::{
use clap::Parser; use clap::Parser;
use reth_db::{ use reth_db::{
cursor::DbCursorRO, database::Database, init_db, mdbx::DatabaseArguments, cursor::DbCursorRO, database::Database, init_db, mdbx::DatabaseArguments,
models::client_version::ClientVersion, table::TableImporter, tables, transaction::DbTx, models::client_version::ClientVersion, open_db_read_only, table::TableImporter, tables,
DatabaseEnv, transaction::DbTx, DatabaseEnv,
}; };
use reth_node_core::dirs::PlatformPath; use reth_node_core::dirs::PlatformPath;
use reth_primitives::ChainSpec; use reth_primitives::ChainSpec;
@ -104,11 +104,11 @@ impl Command {
let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain); let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain);
let db_path = data_dir.db(); let db_path = data_dir.db();
info!(target: "reth::cli", path = ?db_path, "Opening database"); info!(target: "reth::cli", path = ?db_path, "Opening database");
let db = Arc::new(init_db(db_path, self.db.database_args())?); let db = Arc::new(open_db_read_only(&db_path, self.db.database_args())?);
let provider_factory = ProviderFactory::new( let provider_factory = ProviderFactory::new(
db, db,
self.chain.clone(), self.chain.clone(),
StaticFileProvider::read_write(data_dir.static_files())?, StaticFileProvider::read_only(data_dir.static_files())?,
); );
info!(target: "reth::cli", "Database opened"); info!(target: "reth::cli", "Database opened");

View File

@ -146,12 +146,12 @@ impl Command {
let db = Arc::new(init_db(db_path, self.db.database_args())?); let db = Arc::new(init_db(db_path, self.db.database_args())?);
info!(target: "reth::cli", "Database opened"); info!(target: "reth::cli", "Database opened");
let factory = ProviderFactory::new( let provider_factory = ProviderFactory::new(
Arc::clone(&db), Arc::clone(&db),
self.chain.clone(), self.chain.clone(),
StaticFileProvider::read_write(data_dir.static_files())?, StaticFileProvider::read_write(data_dir.static_files())?,
); );
let mut provider_rw = factory.provider_rw()?; let mut provider_rw = provider_factory.provider_rw()?;
if let Some(listen_addr) = self.metrics { if let Some(listen_addr) = self.metrics {
info!(target: "reth::cli", "Starting metrics endpoint at {}", listen_addr); info!(target: "reth::cli", "Starting metrics endpoint at {}", listen_addr);
@ -159,7 +159,7 @@ impl Command {
listen_addr, listen_addr,
prometheus_exporter::install_recorder()?, prometheus_exporter::install_recorder()?,
Arc::clone(&db), Arc::clone(&db),
factory.static_file_provider(), provider_factory.static_file_provider(),
metrics_process::Collector::default(), metrics_process::Collector::default(),
ctx.task_executor, ctx.task_executor,
) )
@ -196,12 +196,6 @@ impl Command {
let default_peers_path = data_dir.known_peers(); let default_peers_path = data_dir.known_peers();
let provider_factory = Arc::new(ProviderFactory::new(
db.clone(),
self.chain.clone(),
StaticFileProvider::read_write(data_dir.static_files())?,
));
let network = self let network = self
.network .network
.network_config( .network_config(
@ -226,7 +220,7 @@ impl Command {
config.stages.bodies.downloader_min_concurrent_requests..= config.stages.bodies.downloader_min_concurrent_requests..=
config.stages.bodies.downloader_max_concurrent_requests, config.stages.bodies.downloader_max_concurrent_requests,
) )
.build(fetch_client, consensus.clone(), provider_factory), .build(fetch_client, consensus.clone(), provider_factory.clone()),
); );
(Box::new(stage), None) (Box::new(stage), None)
} }
@ -323,7 +317,7 @@ impl Command {
if self.commit { if self.commit {
provider_rw.commit()?; provider_rw.commit()?;
provider_rw = factory.provider_rw()?; provider_rw = provider_factory.provider_rw()?;
} }
} }
} }
@ -346,7 +340,7 @@ impl Command {
} }
if self.commit { if self.commit {
provider_rw.commit()?; provider_rw.commit()?;
provider_rw = factory.provider_rw()?; provider_rw = provider_factory.provider_rw()?;
} }
if done { if done {

View File

@ -582,7 +582,7 @@ mod tests {
let genesis_hash = init_genesis(ProviderFactory::new( let genesis_hash = init_genesis(ProviderFactory::new(
factory.into_db(), factory.into_db(),
MAINNET.clone(), MAINNET.clone(),
StaticFileProvider::read_write(static_file_provider.path()).unwrap(), static_file_provider,
)); ));
assert_eq!( assert_eq!(

View File

@ -42,6 +42,7 @@ derive_more.workspace = true
eyre.workspace = true eyre.workspace = true
paste.workspace = true paste.workspace = true
rustc-hash.workspace = true rustc-hash.workspace = true
sysinfo = "0.30"
# arbitrary utils # arbitrary utils
arbitrary = { workspace = true, features = ["derive"], optional = true } arbitrary = { workspace = true, features = ["derive"], optional = true }

View File

@ -4,6 +4,7 @@ use crate::{
cursor::{DbCursorRO, DbCursorRW}, cursor::{DbCursorRO, DbCursorRW},
database::Database, database::Database,
database_metrics::{DatabaseMetadata, DatabaseMetadataValue, DatabaseMetrics}, database_metrics::{DatabaseMetadata, DatabaseMetadataValue, DatabaseMetrics},
lockfile::StorageLock,
metrics::DatabaseEnvMetrics, metrics::DatabaseEnvMetrics,
models::client_version::ClientVersion, models::client_version::ClientVersion,
tables::{self, TableType, Tables}, tables::{self, TableType, Tables},
@ -134,6 +135,8 @@ pub struct DatabaseEnv {
inner: Environment, inner: Environment,
/// Cache for metric handles. If `None`, metrics are not recorded. /// Cache for metric handles. If `None`, metrics are not recorded.
metrics: Option<Arc<DatabaseEnvMetrics>>, metrics: Option<Arc<DatabaseEnvMetrics>>,
/// Write lock for when dealing with a read-write environment.
_lock_file: Option<StorageLock>,
} }
impl Database for DatabaseEnv { impl Database for DatabaseEnv {
@ -251,6 +254,15 @@ impl DatabaseEnv {
kind: DatabaseEnvKind, kind: DatabaseEnvKind,
args: DatabaseArguments, args: DatabaseArguments,
) -> Result<Self, DatabaseError> { ) -> Result<Self, DatabaseError> {
let _lock_file = if kind.is_rw() {
Some(
StorageLock::try_acquire(path)
.map_err(|err| DatabaseError::Other(err.to_string()))?,
)
} else {
None
};
let mut inner_env = Environment::builder(); let mut inner_env = Environment::builder();
let mode = match kind { let mode = match kind {
@ -382,6 +394,7 @@ impl DatabaseEnv {
let env = Self { let env = Self {
inner: inner_env.open(path).map_err(|e| DatabaseError::Open(e.into()))?, inner: inner_env.open(path).map_err(|e| DatabaseError::Open(e.into()))?,
metrics: None, metrics: None,
_lock_file,
}; };
Ok(env) Ok(env)

View File

@ -67,6 +67,7 @@
pub mod abstraction; pub mod abstraction;
mod implementation; mod implementation;
pub mod lockfile;
mod metrics; mod metrics;
pub mod static_file; pub mod static_file;
pub mod tables; pub mod tables;

View File

@ -0,0 +1,79 @@
//! Storage lock utils.
use reth_storage_errors::lockfile::StorageLockError;
use reth_tracing::tracing::error;
use std::{
path::{Path, PathBuf},
process,
sync::Arc,
};
use sysinfo::System;
/// A file lock for a storage directory to ensure exclusive read-write access.
///
/// This lock stores the PID of the process holding it and is released (deleted) on a graceful
/// shutdown. On resuming from a crash, the stored PID helps verify that no other process holds the
/// lock.
#[derive(Debug, Clone)]
pub struct StorageLock(Arc<StorageLockInner>);
impl StorageLock {
/// Tries to acquires a write lock on the target directory, returning [StorageLockError] if
/// unsuccessful.
pub fn try_acquire(path: &Path) -> Result<Self, StorageLockError> {
let path = path.join("lock");
let lock = match parse_lock_file_pid(&path)? {
Some(pid) => {
if System::new_all().process(pid.into()).is_some() {
return Err(StorageLockError::Taken(pid))
} else {
// If PID is no longer active, take hold of the lock.
StorageLockInner::new(path)
}
}
None => StorageLockInner::new(path),
};
Ok(Self(Arc::new(lock?)))
}
}
impl Drop for StorageLock {
fn drop(&mut self) {
if Arc::strong_count(&self.0) == 1 && self.0.file_path.exists() {
// TODO: should only happen during tests that the file does not exist: tempdir is
// getting dropped first. However, tempdir shouldn't be dropped
// before any of the storage providers.
if let Err(err) = reth_fs_util::remove_file(&self.0.file_path) {
error!(%err, "Failed to delete lock file");
}
}
}
}
#[derive(Debug)]
struct StorageLockInner {
file_path: PathBuf,
}
impl StorageLockInner {
/// Creates lock file and writes this process PID into it.
fn new(file_path: PathBuf) -> Result<Self, StorageLockError> {
// Create the directory if it doesn't exist
if let Some(parent) = file_path.parent() {
reth_fs_util::create_dir_all(parent)?;
}
reth_fs_util::write(&file_path, format!("{}", process::id()))?;
Ok(Self { file_path })
}
}
/// Parses the PID from the lock file if it exists.
fn parse_lock_file_pid(path: &Path) -> Result<Option<usize>, StorageLockError> {
if path.exists() {
let contents = reth_fs_util::read_to_string(path)?;
return Ok(contents.trim().parse().ok())
}
Ok(None)
}

View File

@ -37,6 +37,9 @@ pub enum DatabaseError {
/// Failed to use the specified log level, as it's not available. /// Failed to use the specified log level, as it's not available.
#[error("log level {0:?} is not available")] #[error("log level {0:?} is not available")]
LogLevelUnavailable(LogLevel), LogLevelUnavailable(LogLevel),
/// Other unspecified error.
#[error("{0}")]
Other(String),
} }
/// Common error struct to propagate implementation-specific error information. /// Common error struct to propagate implementation-specific error information.

View File

@ -11,5 +11,8 @@
/// Database error /// Database error
pub mod db; pub mod db;
/// Lockfile error
pub mod lockfile;
/// Provider error /// Provider error
pub mod provider; pub mod provider;

View File

@ -0,0 +1,20 @@
use reth_fs_util::FsPathError;
use thiserror::Error;
#[derive(Error, Debug, Clone, PartialEq, Eq)]
/// Storage lock error.
pub enum StorageLockError {
/// Write lock taken
#[error("storage directory is currently in use as read-write by another process: {0}")]
Taken(usize),
/// Indicates other unspecified errors.
#[error("{0}")]
Other(String),
}
/// TODO: turn into variant once ProviderError
impl From<FsPathError> for StorageLockError {
fn from(source: FsPathError) -> Self {
Self::Other(source.to_string())
}
}

View File

@ -131,6 +131,9 @@ pub enum ProviderError {
/// Consistent view error. /// Consistent view error.
#[error("failed to initialize consistent view: {0}")] #[error("failed to initialize consistent view: {0}")]
ConsistentView(Box<ConsistentViewError>), ConsistentView(Box<ConsistentViewError>),
/// Storage lock error.
#[error(transparent)]
StorageLockError(#[from] crate::lockfile::StorageLockError),
} }
impl From<reth_fs_util::FsPathError> for ProviderError { impl From<reth_fs_util::FsPathError> for ProviderError {

View File

@ -12,6 +12,7 @@ use parking_lot::RwLock;
use reth_db::{ use reth_db::{
codecs::CompactU256, codecs::CompactU256,
cursor::DbCursorRO, cursor::DbCursorRO,
lockfile::StorageLock,
models::StoredBlockBodyIndices, models::StoredBlockBodyIndices,
static_file::{iter_static_files, HeaderMask, ReceiptMask, StaticFileCursor, TransactionMask}, static_file::{iter_static_files, HeaderMask, ReceiptMask, StaticFileCursor, TransactionMask},
table::Table, table::Table,
@ -58,6 +59,11 @@ impl StaticFileAccess {
pub const fn is_read_only(&self) -> bool { pub const fn is_read_only(&self) -> bool {
matches!(self, Self::RO) matches!(self, Self::RO)
} }
/// Returns `true` if read-write access.
pub const fn is_read_write(&self) -> bool {
matches!(self, Self::RW)
}
} }
/// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`]. /// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`].
@ -111,11 +117,19 @@ pub struct StaticFileProviderInner {
metrics: Option<Arc<StaticFileProviderMetrics>>, metrics: Option<Arc<StaticFileProviderMetrics>>,
/// Access rights of the provider. /// Access rights of the provider.
access: StaticFileAccess, access: StaticFileAccess,
/// Write lock for when access is [StaticFileAccess::RW].
_lock_file: Option<StorageLock>,
} }
impl StaticFileProviderInner { impl StaticFileProviderInner {
/// Creates a new [`StaticFileProviderInner`]. /// Creates a new [`StaticFileProviderInner`].
fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> { fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
let _lock_file = if access.is_read_write() {
Some(StorageLock::try_acquire(path.as_ref())?)
} else {
None
};
let provider = Self { let provider = Self {
map: Default::default(), map: Default::default(),
writers: Default::default(), writers: Default::default(),
@ -125,6 +139,7 @@ impl StaticFileProviderInner {
load_filters: false, load_filters: false,
metrics: None, metrics: None,
access, access,
_lock_file,
}; };
Ok(provider) Ok(provider)