fix(static-file): pass producer as Arc<Mutex<_>> to ensure only one is active (#7143)

Co-authored-by: joshieDo <ranriver@protonmail.com>
This commit is contained in:
Alexey Shekhirin
2024-03-15 13:26:15 +00:00
committed by GitHub
parent 189de79bd9
commit 52d49832d9
10 changed files with 149 additions and 46 deletions

View File

@ -27,6 +27,7 @@ tokio-stream.workspace = true
tracing.workspace = true
clap = { workspace = true, features = ["derive"], optional = true }
rayon.workspace = true
parking_lot = { workspace = true, features = ["send_guard", "arc_lock"] }
[dev-dependencies]
reth-db = { workspace = true, features = ["test-utils"] }

View File

@ -13,5 +13,6 @@ mod static_file_producer;
pub use event::StaticFileProducerEvent;
pub use static_file_producer::{
StaticFileProducer, StaticFileProducerResult, StaticFileProducerWithResult, StaticFileTargets,
StaticFileProducer, StaticFileProducerInner, StaticFileProducerResult,
StaticFileProducerWithResult, StaticFileTargets,
};

View File

@ -1,6 +1,7 @@
//! Support for producing static files.
use crate::{segments, segments::Segment, StaticFileProducerEvent};
use parking_lot::Mutex;
use rayon::prelude::*;
use reth_db::database::Database;
use reth_interfaces::RethResult;
@ -10,26 +11,58 @@ use reth_provider::{
ProviderFactory,
};
use reth_tokio_util::EventListeners;
use std::{ops::RangeInclusive, time::Instant};
use std::{
ops::{Deref, RangeInclusive},
sync::Arc,
time::Instant,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, trace};
/// Result of [StaticFileProducer::run] execution.
/// Result of [StaticFileProducerInner::run] execution.
pub type StaticFileProducerResult = RethResult<StaticFileTargets>;
/// The [StaticFileProducer] instance itself with the result of [StaticFileProducer::run]
/// The [StaticFileProducer] instance itself with the result of [StaticFileProducerInner::run]
pub type StaticFileProducerWithResult<DB> = (StaticFileProducer<DB>, StaticFileProducerResult);
/// Static File producer routine. See [StaticFileProducer::run] for more detailed description.
/// Static File producer. It's a wrapper around [StaticFileProducer] that allows to share it
/// between threads.
#[derive(Debug, Clone)]
pub struct StaticFileProducer<DB> {
pub struct StaticFileProducer<DB>(Arc<Mutex<StaticFileProducerInner<DB>>>);
impl<DB: Database> StaticFileProducer<DB> {
/// Creates a new [StaticFileProducer].
pub fn new(
provider_factory: ProviderFactory<DB>,
static_file_provider: StaticFileProvider,
prune_modes: PruneModes,
) -> Self {
Self(Arc::new(Mutex::new(StaticFileProducerInner::new(
provider_factory,
static_file_provider,
prune_modes,
))))
}
}
impl<DB> Deref for StaticFileProducer<DB> {
type Target = Arc<Mutex<StaticFileProducerInner<DB>>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// Static File producer routine. See [StaticFileProducerInner::run] for more detailed description.
#[derive(Debug)]
pub struct StaticFileProducerInner<DB> {
/// Provider factory
provider_factory: ProviderFactory<DB>,
/// Static File provider
static_file_provider: StaticFileProvider,
/// Pruning configuration for every part of the data that can be pruned. Set by user, and
/// needed in [StaticFileProducer] to prevent attempting to move prunable data to static files.
/// See [StaticFileProducer::get_static_file_targets].
/// needed in [StaticFileProducerInner] to prevent attempting to move prunable data to static
/// files. See [StaticFileProducerInner::get_static_file_targets].
prune_modes: PruneModes,
listeners: EventListeners<StaticFileProducerEvent>,
}
@ -68,9 +101,8 @@ impl StaticFileTargets {
}
}
impl<DB: Database> StaticFileProducer<DB> {
/// Creates a new [StaticFileProducer].
pub fn new(
impl<DB: Database> StaticFileProducerInner<DB> {
fn new(
provider_factory: ProviderFactory<DB>,
static_file_provider: StaticFileProvider,
prune_modes: PruneModes,
@ -200,9 +232,11 @@ impl<DB: Database> StaticFileProducer<DB> {
#[cfg(test)]
mod tests {
use crate::{static_file_producer::StaticFileTargets, StaticFileProducer};
use crate::static_file_producer::{
StaticFileProducer, StaticFileProducerInner, StaticFileTargets,
};
use assert_matches::assert_matches;
use reth_db::{database::Database, transaction::DbTx};
use reth_db::{database::Database, test_utils::TempDatabase, transaction::DbTx, DatabaseEnv};
use reth_interfaces::{
provider::ProviderError,
test_utils::{
@ -214,13 +248,18 @@ mod tests {
use reth_primitives::{
static_file::HighestStaticFiles, PruneModes, StaticFileSegment, B256, U256,
};
use reth_provider::providers::StaticFileWriter;
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
ProviderFactory,
};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::{
sync::{mpsc::channel, Arc},
time::Duration,
};
#[test]
fn run() {
fn setup() -> (ProviderFactory<Arc<TempDatabase<DatabaseEnv>>>, StaticFileProvider) {
let mut rng = generators::rng();
let db = TestStageDB::default();
let blocks = random_block_range(&mut rng, 0..=3, B256::ZERO, 2..3);
@ -251,8 +290,14 @@ mod tests {
let provider_factory = db.factory;
let static_file_provider = provider_factory.static_file_provider();
(provider_factory, static_file_provider)
}
let mut static_file_producer = StaticFileProducer::new(
#[test]
fn run() {
let (provider_factory, static_file_provider) = setup();
let mut static_file_producer = StaticFileProducerInner::new(
provider_factory,
static_file_provider.clone(),
PruneModes::default(),
@ -324,4 +369,48 @@ mod tests {
HighestStaticFiles { headers: Some(3), receipts: Some(3), transactions: Some(3) }
);
}
/// Tests that a cloneable [`StaticFileProducer`] type is not susceptible to any race condition.
#[test]
fn only_one() {
let (provider_factory, static_file_provider) = setup();
let static_file_producer = StaticFileProducer::new(
provider_factory,
static_file_provider.clone(),
PruneModes::default(),
);
let (tx, rx) = channel();
for i in 0..5 {
let producer = static_file_producer.clone();
let tx = tx.clone();
std::thread::spawn(move || {
let mut locked_producer = producer.lock();
if i == 0 {
// Let other threads spawn as well.
std::thread::sleep(Duration::from_millis(100));
}
let targets = locked_producer
.get_static_file_targets(HighestStaticFiles {
headers: Some(1),
receipts: Some(1),
transactions: Some(1),
})
.expect("get static file targets");
assert_matches!(locked_producer.run(targets.clone()), Ok(_));
tx.send(targets).unwrap();
});
}
drop(tx);
let mut only_one = Some(());
for target in rx {
// Only the first spawn should have any meaningful target.
assert!(only_one.take().is_some_and(|_| target.any()) || !target.any())
}
}
}