refactor: reduce ProviderFactory usage (#10798)

Author: Arsenii Kulikov
Date: 2024-09-11 11:36:34 +04:00
Committed by: GitHub
Parent: f6b2021440
Commit: d11bbe686b
29 changed files with 190 additions and 129 deletions

Cargo.lock (generated)
View File

@ -7692,10 +7692,8 @@ dependencies = [
"reth-network",
"reth-network-p2p",
"reth-network-peers",
"reth-node-types",
"reth-optimism-chainspec",
"reth-primitives",
"reth-provider",
"reth-prune-types",
"reth-rpc-api",
"reth-rpc-eth-api",
@ -7704,6 +7702,7 @@ dependencies = [
"reth-rpc-types",
"reth-rpc-types-compat",
"reth-stages-types",
"reth-storage-api",
"reth-storage-errors",
"reth-tracing",
"reth-transaction-pool",
@ -8679,6 +8678,7 @@ dependencies = [
"alloy-primitives",
"auto_impl",
"reth-chainspec",
"reth-db-api",
"reth-db-models",
"reth-execution-types",
"reth-primitives",

View File

@ -64,7 +64,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
consensus: Arc<dyn Consensus>,
provider_factory: ProviderFactory<N>,
task_executor: &TaskExecutor,
- static_file_producer: StaticFileProducer<N>,
+ static_file_producer: StaticFileProducer<ProviderFactory<N>>,
) -> eyre::Result<Pipeline<N>>
where
Client: BlockClient + 'static,

View File

@ -163,7 +163,7 @@ pub fn build_import_pipeline<N, C, E>(
provider_factory: ProviderFactory<N>,
consensus: &Arc<C>,
file_client: Arc<FileClient>,
- static_file_producer: StaticFileProducer<N>,
+ static_file_producer: StaticFileProducer<ProviderFactory<N>>,
disable_exec: bool,
executor: E,
) -> eyre::Result<(Pipeline<N>, impl Stream<Item = NodeEvent>)>

View File

@ -7,9 +7,10 @@ use crate::{
use alloy_primitives::BlockNumber;
use futures::FutureExt;
use reth_errors::RethResult;
- use reth_node_types::NodeTypesWithDB;
use reth_primitives::static_file::HighestStaticFiles;
- use reth_provider::providers::ProviderNodeTypes;
+ use reth_provider::{
+ BlockReader, DatabaseProviderFactory, StageCheckpointReader, StaticFileProviderFactory,
+ };
use reth_static_file::{StaticFileProducer, StaticFileProducerWithResult};
use reth_tasks::TaskSpawner;
use std::task::{ready, Context, Poll};
@ -20,17 +21,22 @@ use tracing::trace;
///
/// This type controls the [`StaticFileProducer`].
#[derive(Debug)]
- pub struct StaticFileHook<N: NodeTypesWithDB> {
+ pub struct StaticFileHook<Provider> {
/// The current state of the `static_file_producer`.
- state: StaticFileProducerState<N>,
+ state: StaticFileProducerState<Provider>,
/// The type that can spawn the `static_file_producer` task.
task_spawner: Box<dyn TaskSpawner>,
}
- impl<N: ProviderNodeTypes> StaticFileHook<N> {
+ impl<Provider> StaticFileHook<Provider>
+ where
+ Provider: StaticFileProviderFactory
+ + DatabaseProviderFactory<Provider: StageCheckpointReader + BlockReader>
+ + 'static,
+ {
/// Create a new instance
pub fn new(
- static_file_producer: StaticFileProducer<N>,
+ static_file_producer: StaticFileProducer<Provider>,
task_spawner: Box<dyn TaskSpawner>,
) -> Self {
Self { state: StaticFileProducerState::Idle(Some(static_file_producer)), task_spawner }
@ -128,7 +134,12 @@ impl<N: ProviderNodeTypes> StaticFileHook<N> {
}
}
- impl<N: ProviderNodeTypes> EngineHook for StaticFileHook<N> {
+ impl<Provider> EngineHook for StaticFileHook<Provider>
+ where
+ Provider: StaticFileProviderFactory
+ + DatabaseProviderFactory<Provider: StageCheckpointReader + BlockReader>
+ + 'static,
+ {
fn name(&self) -> &'static str {
"StaticFile"
}
@ -164,9 +175,9 @@ impl<N: ProviderNodeTypes> EngineHook for StaticFileHook<N> {
/// [`StaticFileProducerState::Idle`] means that the static file producer is currently idle.
/// [`StaticFileProducerState::Running`] means that the static file producer is currently running.
#[derive(Debug)]
- enum StaticFileProducerState<N: NodeTypesWithDB> {
+ enum StaticFileProducerState<Provider> {
/// [`StaticFileProducer`] is idle.
- Idle(Option<StaticFileProducer<N>>),
+ Idle(Option<StaticFileProducer<Provider>>),
/// [`StaticFileProducer`] is running and waiting for a response
- Running(oneshot::Receiver<StaticFileProducerWithResult<N>>),
+ Running(oneshot::Receiver<StaticFileProducerWithResult<Provider>>),
}
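The bound `DatabaseProviderFactory<Provider: StageCheckpointReader + BlockReader>` used above is an associated-type bound, stable since Rust 1.79. A minimal sketch of the equivalent desugared form, with a hypothetical caller `run_hook` that is not part of this commit:

```rust
use reth_provider::{
    BlockReader, DatabaseProviderFactory, StageCheckpointReader, StaticFileProviderFactory,
};

// Hypothetical caller, assuming `StaticFileHook` is in scope: the inline
// associated-type bound is equivalent to the standalone `P::Provider` clause.
fn run_hook<P>(hook: StaticFileHook<P>)
where
    P: StaticFileProviderFactory + DatabaseProviderFactory + 'static,
    P::Provider: StageCheckpointReader + BlockReader,
{
    let _ = hook; // spawn and poll the hook here
}
```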

View File

@ -620,7 +620,7 @@ where
/// If the database is empty, returns the genesis block.
pub fn lookup_head(&self) -> eyre::Result<Head> {
self.node_config()
- .lookup_head(self.provider_factory().clone())
+ .lookup_head(self.provider_factory())
.wrap_err("the head block is missing")
}
@ -744,7 +744,7 @@ where
}
/// Creates a new [`StaticFileProducer`] with the attached database.
- pub fn static_file_producer(&self) -> StaticFileProducer<T::Types> {
+ pub fn static_file_producer(&self) -> StaticFileProducer<ProviderFactory<T::Types>> {
StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
}

View File

@ -32,7 +32,7 @@ pub fn build_networked_pipeline<N, Client, Executor>(
metrics_tx: reth_stages::MetricEventsSender,
prune_config: Option<PruneConfig>,
max_block: Option<BlockNumber>,
- static_file_producer: StaticFileProducer<N>,
+ static_file_producer: StaticFileProducer<ProviderFactory<N>>,
executor: Executor,
exex_manager_handle: ExExManagerHandle,
) -> eyre::Result<Pipeline<N>>
@ -78,7 +78,7 @@ pub fn build_pipeline<N, H, B, Executor>(
max_block: Option<u64>,
metrics_tx: reth_stages::MetricEventsSender,
prune_config: Option<PruneConfig>,
- static_file_producer: StaticFileProducer<N>,
+ static_file_producer: StaticFileProducer<ProviderFactory<N>>,
executor: Executor,
exex_manager_handle: ExExManagerHandle,
) -> eyre::Result<Pipeline<N>>

View File

@ -19,7 +19,7 @@ reth-cli-util.workspace = true
reth-fs-util.workspace = true
reth-db = { workspace = true, features = ["mdbx"] }
reth-storage-errors.workspace = true
- reth-provider.workspace = true
+ reth-storage-api.workspace = true
reth-network = { workspace = true, features = ["serde"] }
reth-network-p2p.workspace = true
reth-rpc-eth-types.workspace = true
@ -39,7 +39,6 @@ reth-consensus-common.workspace = true
reth-prune-types.workspace = true
reth-stages-types.workspace = true
reth-optimism-chainspec = { workspace = true, optional = true }
- reth-node-types.workspace = true
# ethereum
alloy-genesis.workspace = true
@ -85,7 +84,6 @@ tempfile.workspace = true
[features]
optimism = [
"reth-primitives/optimism",
"reth-provider/optimism",
"reth-rpc-types-compat/optimism",
"reth-rpc-eth-api/optimism",
"dep:reth-optimism-chainspec",

View File

@ -16,10 +16,11 @@ use serde::{de::DeserializeOwned, Serialize};
use std::{fs, path::Path};
use alloy_primitives::{BlockNumber, B256};
- use reth_node_types::NodeTypesWithDB;
use reth_primitives::{BlockHashOrNumber, Head, SealedHeader};
- use reth_provider::{BlockHashReader, HeaderProvider, ProviderFactory, StageCheckpointReader};
use reth_stages_types::StageId;
+ use reth_storage_api::{
+ BlockHashReader, DatabaseProviderFactory, HeaderProvider, StageCheckpointReader,
+ };
use reth_storage_errors::provider::ProviderResult;
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
use tracing::*;
@ -263,11 +264,13 @@ impl NodeConfig {
/// Fetches the head block from the database.
///
/// If the database is empty, returns the genesis block.
- pub fn lookup_head<N: NodeTypesWithDB<ChainSpec = ChainSpec>>(
- &self,
- factory: ProviderFactory<N>,
- ) -> ProviderResult<Head> {
- let provider = factory.provider()?;
+ pub fn lookup_head<Factory>(&self, factory: &Factory) -> ProviderResult<Head>
+ where
+ Factory: DatabaseProviderFactory<
+ Provider: HeaderProvider + StageCheckpointReader + BlockHashReader,
+ >,
+ {
+ let provider = factory.database_provider_ro()?;
let head = provider.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default().block_number;

View File

@ -32,7 +32,7 @@ pub(crate) async fn build_import_pipeline<N, C>(
provider_factory: ProviderFactory<N>,
consensus: &Arc<C>,
file_client: Arc<FileClient>,
- static_file_producer: StaticFileProducer<N>,
+ static_file_producer: StaticFileProducer<ProviderFactory<N>>,
disable_exec: bool,
) -> eyre::Result<(Pipeline<N>, impl Stream<Item = NodeEvent>)>
where

View File

@ -72,7 +72,7 @@ where
pub fn build<N: NodeTypesWithDB<DB = DB>>(
self,
provider_factory: ProviderFactory<N>,
- static_file_producer: StaticFileProducer<N>,
+ static_file_producer: StaticFileProducer<ProviderFactory<N>>,
) -> Pipeline<N> {
let Self { stages, max_block, tip_tx, metrics_tx } = self;
Pipeline {

View File

@ -71,7 +71,7 @@ pub struct Pipeline<N: NodeTypesWithDB> {
stages: Vec<BoxedStage<N::DB>>,
/// The maximum block number to sync to.
max_block: Option<BlockNumber>,
- static_file_producer: StaticFileProducer<N>,
+ static_file_producer: StaticFileProducer<ProviderFactory<N>>,
/// Sender for events the pipeline emits.
event_sender: EventSender<PipelineEvent>,
/// Keeps track of the progress of the pipeline.

View File

@ -1,10 +1,10 @@
use crate::segments::Segment;
use alloy_primitives::BlockNumber;
use reth_db::tables;
- use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx};
+ use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
- DatabaseProviderRO,
+ DBProvider,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::ProviderResult;
@ -14,14 +14,14 @@ use std::ops::RangeInclusive;
#[derive(Debug, Default)]
pub struct Headers;
- impl<DB: Database> Segment<DB> for Headers {
+ impl<Provider: DBProvider> Segment<Provider> for Headers {
fn segment(&self) -> StaticFileSegment {
StaticFileSegment::Headers
}
fn copy_to_static_files(
&self,
- provider: DatabaseProviderRO<DB>,
+ provider: Provider,
static_file_provider: StaticFileProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {

View File

@ -10,14 +10,13 @@ mod receipts;
pub use receipts::Receipts;
use alloy_primitives::BlockNumber;
- use reth_db_api::database::Database;
- use reth_provider::{providers::StaticFileProvider, DatabaseProviderRO};
+ use reth_provider::providers::StaticFileProvider;
use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::ProviderResult;
use std::ops::RangeInclusive;
/// A segment represents moving some portion of the data to static files.
- pub trait Segment<DB: Database>: Send + Sync {
+ pub trait Segment<Provider>: Send + Sync {
/// Returns the [`StaticFileSegment`].
fn segment(&self) -> StaticFileSegment;
@ -25,7 +24,7 @@ pub trait Segment<DB: Database>: Send + Sync {
/// the management of and writing to files.
fn copy_to_static_files(
&self,
- provider: DatabaseProviderRO<DB>,
+ provider: Provider,
static_file_provider: StaticFileProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()>;
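With `Segment` generic over the provider instead of the database, custom segments only need a `DBProvider`. A minimal no-op sketch under the new signature; `MySegment` is hypothetical and not part of this commit:

```rust
use alloy_primitives::BlockNumber;
use reth_provider::{providers::StaticFileProvider, DBProvider};
use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::ProviderResult;
use std::ops::RangeInclusive;

#[derive(Debug, Default)]
struct MySegment;

impl<Provider: DBProvider> Segment<Provider> for MySegment {
    fn segment(&self) -> StaticFileSegment {
        StaticFileSegment::Headers
    }

    fn copy_to_static_files(
        &self,
        provider: Provider,
        _static_file_provider: StaticFileProvider,
        _block_range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<()> {
        // The read-only transaction is reachable through the DBProvider trait,
        // e.g. to open table cursors.
        let _tx = provider.tx_ref();
        Ok(())
    }
}
```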

View File

@ -1,10 +1,10 @@
use crate::segments::Segment;
use alloy_primitives::BlockNumber;
use reth_db::tables;
- use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx};
+ use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
- BlockReader, DatabaseProviderRO,
+ BlockReader, DBProvider,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::{ProviderError, ProviderResult};
@ -14,14 +14,14 @@ use std::ops::RangeInclusive;
#[derive(Debug, Default)]
pub struct Receipts;
- impl<DB: Database> Segment<DB> for Receipts {
+ impl<Provider: DBProvider + BlockReader> Segment<Provider> for Receipts {
fn segment(&self) -> StaticFileSegment {
StaticFileSegment::Receipts
}
fn copy_to_static_files(
&self,
- provider: DatabaseProviderRO<DB>,
+ provider: Provider,
static_file_provider: StaticFileProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {

View File

@ -1,10 +1,10 @@
use crate::segments::Segment;
use alloy_primitives::BlockNumber;
use reth_db::tables;
- use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx};
+ use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
- BlockReader, DatabaseProviderRO,
+ BlockReader, DBProvider,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::{ProviderError, ProviderResult};
@ -14,7 +14,7 @@ use std::ops::RangeInclusive;
#[derive(Debug, Default)]
pub struct Transactions;
- impl<DB: Database> Segment<DB> for Transactions {
+ impl<Provider: DBProvider + BlockReader> Segment<Provider> for Transactions {
fn segment(&self) -> StaticFileSegment {
StaticFileSegment::Transactions
}
@ -23,7 +23,7 @@ impl<DB: Database> Segment<DB> for Transactions {
/// [`StaticFileSegment::Transactions`] for the provided block range.
fn copy_to_static_files(
&self,
- provider: DatabaseProviderRO<DB>,
+ provider: Provider,
static_file_provider: StaticFileProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {

View File

@ -4,11 +4,9 @@ use crate::{segments, segments::Segment, StaticFileProducerEvent};
use alloy_primitives::BlockNumber;
use parking_lot::Mutex;
use rayon::prelude::*;
- use reth_chainspec::ChainSpec;
- use reth_node_types::NodeTypesWithDB;
use reth_provider::{
- providers::StaticFileWriter, ProviderFactory, StageCheckpointReader as _,
- StaticFileProviderFactory,
+ providers::StaticFileWriter, BlockReader, DBProvider, DatabaseProviderFactory,
+ StageCheckpointReader, StaticFileProviderFactory,
};
use reth_prune_types::PruneModes;
use reth_stages_types::StageId;
@ -26,28 +24,29 @@ use tracing::{debug, trace};
pub type StaticFileProducerResult = ProviderResult<StaticFileTargets>;
/// The [`StaticFileProducer`] instance itself with the result of [`StaticFileProducerInner::run`]
- pub type StaticFileProducerWithResult<N> = (StaticFileProducer<N>, StaticFileProducerResult);
+ pub type StaticFileProducerWithResult<Provider> =
+ (StaticFileProducer<Provider>, StaticFileProducerResult);
/// Static File producer. It's a wrapper around [`StaticFileProducerInner`] that allows sharing it
/// between threads.
#[derive(Debug)]
- pub struct StaticFileProducer<N: NodeTypesWithDB>(Arc<Mutex<StaticFileProducerInner<N>>>);
+ pub struct StaticFileProducer<Provider>(Arc<Mutex<StaticFileProducerInner<Provider>>>);
- impl<N: NodeTypesWithDB> StaticFileProducer<N> {
+ impl<Provider> StaticFileProducer<Provider> {
/// Creates a new [`StaticFileProducer`].
- pub fn new(provider_factory: ProviderFactory<N>, prune_modes: PruneModes) -> Self {
- Self(Arc::new(Mutex::new(StaticFileProducerInner::new(provider_factory, prune_modes))))
+ pub fn new(provider: Provider, prune_modes: PruneModes) -> Self {
+ Self(Arc::new(Mutex::new(StaticFileProducerInner::new(provider, prune_modes))))
}
}
- impl<N: NodeTypesWithDB> Clone for StaticFileProducer<N> {
+ impl<Provider> Clone for StaticFileProducer<Provider> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
- impl<N: NodeTypesWithDB> Deref for StaticFileProducer<N> {
- type Target = Arc<Mutex<StaticFileProducerInner<N>>>;
+ impl<Provider> Deref for StaticFileProducer<Provider> {
+ type Target = Arc<Mutex<StaticFileProducerInner<Provider>>>;
fn deref(&self) -> &Self::Target {
&self.0
@ -57,9 +56,9 @@ impl<N: NodeTypesWithDB> Deref for StaticFileProducer<N> {
/// Static File producer routine. See [`StaticFileProducerInner::run`] for a more detailed
/// description.
#[derive(Debug)]
- pub struct StaticFileProducerInner<N: NodeTypesWithDB> {
+ pub struct StaticFileProducerInner<Provider> {
/// Provider factory
- provider_factory: ProviderFactory<N>,
+ provider: Provider,
/// Pruning configuration for every part of the data that can be pruned. Set by user, and
/// needed in [`StaticFileProducerInner`] to prevent attempting to move prunable data to static
/// files. See [`StaticFileProducerInner::get_static_file_targets`].
@ -101,13 +100,17 @@ impl StaticFileTargets {
}
}
- impl<N: NodeTypesWithDB> StaticFileProducerInner<N> {
- fn new(provider_factory: ProviderFactory<N>, prune_modes: PruneModes) -> Self {
- Self { provider_factory, prune_modes, event_sender: Default::default() }
+ impl<Provider> StaticFileProducerInner<Provider> {
+ fn new(provider: Provider, prune_modes: PruneModes) -> Self {
+ Self { provider, prune_modes, event_sender: Default::default() }
}
}
- impl<N: NodeTypesWithDB<ChainSpec = ChainSpec>> StaticFileProducerInner<N> {
+ impl<Provider> StaticFileProducerInner<Provider>
+ where
+ Provider: StaticFileProviderFactory
+ + DatabaseProviderFactory<Provider: StageCheckpointReader + BlockReader>,
+ {
/// Listen for events on the `static_file_producer`.
pub fn events(&self) -> EventStream<StaticFileProducerEvent> {
self.event_sender.new_listener()
@ -117,8 +120,8 @@ impl<N: NodeTypesWithDB<ChainSpec = ChainSpec>> StaticFileProducerInner<N> {
///
/// For each [Some] target in [`StaticFileTargets`], initializes a corresponding [Segment] and
/// runs it with the provided block range using [`reth_provider::providers::StaticFileProvider`]
- /// and a read-only database transaction from [`ProviderFactory`]. All segments are run in
- /// parallel.
+ /// and a read-only database transaction from [`DatabaseProviderFactory`]. All segments are run
+ /// in parallel.
///
/// NOTE: it doesn't delete the data from database, and the actual deleting (aka pruning) logic
/// lives in the `prune` crate.
@ -129,7 +132,7 @@ impl<N: NodeTypesWithDB<ChainSpec = ChainSpec>> StaticFileProducerInner<N> {
}
debug_assert!(targets.is_contiguous_to_highest_static_files(
- self.provider_factory.static_file_provider().get_highest_static_files()
+ self.provider.static_file_provider().get_highest_static_files()
));
self.event_sender.notify(StaticFileProducerEvent::Started { targets: targets.clone() });
@ -137,7 +140,8 @@ impl<N: NodeTypesWithDB<ChainSpec = ChainSpec>> StaticFileProducerInner<N> {
debug!(target: "static_file", ?targets, "StaticFileProducer started");
let start = Instant::now();
- let mut segments = Vec::<(Box<dyn Segment<N::DB>>, RangeInclusive<BlockNumber>)>::new();
+ let mut segments =
+ Vec::<(Box<dyn Segment<Provider::Provider>>, RangeInclusive<BlockNumber>)>::new();
if let Some(block_range) = targets.transactions.clone() {
segments.push((Box::new(segments::Transactions), block_range));
@ -155,8 +159,9 @@ impl<N: NodeTypesWithDB<ChainSpec = ChainSpec>> StaticFileProducerInner<N> {
// Create a new database transaction on every segment to prevent long-lived read-only
// transactions
- let provider = self.provider_factory.provider()?.disable_long_read_transaction_safety();
- segment.copy_to_static_files(provider, self.provider_factory.static_file_provider(), block_range.clone())?;
+ let mut provider = self.provider.database_provider_ro()?;
+ provider.disable_long_read_transaction_safety();
+ segment.copy_to_static_files(provider, self.provider.static_file_provider(), block_range.clone())?;
let elapsed = start.elapsed(); // TODO(alexey): track in metrics
debug!(target: "static_file", segment = %segment.segment(), ?block_range, ?elapsed, "Finished StaticFileProducer segment");
@ -164,9 +169,9 @@ impl<N: NodeTypesWithDB<ChainSpec = ChainSpec>> StaticFileProducerInner<N> {
Ok(())
})?;
- self.provider_factory.static_file_provider().commit()?;
+ self.provider.static_file_provider().commit()?;
for (segment, block_range) in segments {
- self.provider_factory
+ self.provider
.static_file_provider()
.update_index(segment.segment(), Some(*block_range.end()))?;
}
@ -185,7 +190,7 @@ impl<N: NodeTypesWithDB<ChainSpec = ChainSpec>> StaticFileProducerInner<N> {
///
/// Returns highest block numbers for all static file segments.
pub fn copy_to_static_files(&self) -> ProviderResult<HighestStaticFiles> {
- let provider = self.provider_factory.provider()?;
+ let provider = self.provider.database_provider_ro()?;
let stages_checkpoints = [StageId::Headers, StageId::Execution, StageId::Bodies]
.into_iter()
.map(|stage| provider.get_stage_checkpoint(stage).map(|c| c.map(|c| c.block_number)))
@ -209,8 +214,7 @@ impl<N: NodeTypesWithDB<ChainSpec = ChainSpec>> StaticFileProducerInner<N> {
&self,
finalized_block_numbers: HighestStaticFiles,
) -> ProviderResult<StaticFileTargets> {
- let highest_static_files =
- self.provider_factory.static_file_provider().get_highest_static_files();
+ let highest_static_files = self.provider.static_file_provider().get_highest_static_files();
let targets = StaticFileTargets {
headers: finalized_block_numbers.headers.and_then(|finalized_block_number| {
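Call sites that already pass a `ProviderFactory<N>` keep working, since it satisfies the new bounds. A hedged usage sketch, assuming `provider_factory`, `prune_modes`, and `highest_blocks` are in scope and using the `run` entry point referenced in the docs above:

```rust
// Build the producer from any provider meeting the bounds; `ProviderFactory`
// remains the common concrete choice.
let producer = StaticFileProducer::new(provider_factory, prune_modes);

// Compute targets from the finalized block numbers, then run all segments.
let targets = producer.lock().get_static_file_targets(highest_blocks)?;
producer.lock().run(targets)?;
```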

View File

@ -261,8 +261,11 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
}
}
- impl<N: ProviderNodeTypes> DatabaseProviderFactory<N::DB> for BlockchainProvider2<N> {
- fn database_provider_ro(&self) -> ProviderResult<DatabaseProviderRO<N::DB>> {
+ impl<N: ProviderNodeTypes> DatabaseProviderFactory for BlockchainProvider2<N> {
+ type DB = N::DB;
+ type Provider = DatabaseProviderRO<N::DB>;
+ fn database_provider_ro(&self) -> ProviderResult<Self::Provider> {
self.database.provider()
}
}

View File

@ -1,8 +1,7 @@
- use crate::{BlockNumReader, DatabaseProviderFactory, DatabaseProviderRO, HeaderProvider};
- use reth_db_api::database::Database;
+ use crate::{BlockNumReader, DatabaseProviderFactory, HeaderProvider};
use reth_primitives::{GotExpected, B256};
+ use reth_storage_api::BlockReader;
use reth_storage_errors::provider::ProviderResult;
- use std::marker::PhantomData;
pub use reth_storage_errors::provider::ConsistentViewError;
@ -22,24 +21,22 @@ pub use reth_storage_errors::provider::ConsistentViewError;
/// appropriately.
/// 2) be sure that the state does not change.
#[derive(Clone, Debug)]
- pub struct ConsistentDbView<DB, Provider> {
- database: PhantomData<DB>,
- provider: Provider,
+ pub struct ConsistentDbView<Factory> {
+ factory: Factory,
tip: Option<B256>,
}
- impl<DB, Provider> ConsistentDbView<DB, Provider>
+ impl<Factory> ConsistentDbView<Factory>
where
- DB: Database,
- Provider: DatabaseProviderFactory<DB>,
+ Factory: DatabaseProviderFactory<Provider: BlockReader>,
{
/// Creates new consistent database view.
- pub const fn new(provider: Provider, tip: Option<B256>) -> Self {
- Self { database: PhantomData, provider, tip }
+ pub const fn new(factory: Factory, tip: Option<B256>) -> Self {
+ Self { factory, tip }
}
/// Creates new consistent database view with latest tip.
- pub fn new_with_latest_tip(provider: Provider) -> ProviderResult<Self> {
+ pub fn new_with_latest_tip(provider: Factory) -> ProviderResult<Self> {
let provider_ro = provider.database_provider_ro()?;
let last_num = provider_ro.last_block_number()?;
let tip = provider_ro.sealed_header(last_num)?.map(|h| h.hash());
@ -47,9 +44,9 @@ where
}
/// Creates new read-only provider and performs consistency checks on the current tip.
- pub fn provider_ro(&self) -> ProviderResult<DatabaseProviderRO<DB>> {
+ pub fn provider_ro(&self) -> ProviderResult<Factory::Provider> {
// Create a new provider.
- let provider_ro = self.provider.database_provider_ro()?;
+ let provider_ro = self.factory.database_provider_ro()?;
// Check that the latest stored header number matches the number
// that consistent view was initialized with.

View File

@ -182,8 +182,11 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
}
}
- impl<N: ProviderNodeTypes> DatabaseProviderFactory<N::DB> for ProviderFactory<N> {
- fn database_provider_ro(&self) -> ProviderResult<DatabaseProviderRO<N::DB>> {
+ impl<N: ProviderNodeTypes> DatabaseProviderFactory for ProviderFactory<N> {
+ type DB = N::DB;
+ type Provider = DatabaseProviderRO<N::DB>;
+ fn database_provider_ro(&self) -> ProviderResult<Self::Provider> {
self.provider()
}
}

View File

@ -7,7 +7,7 @@ use crate::{
},
writer::UnifiedStorageWriter,
AccountReader, BlockExecutionReader, BlockExecutionWriter, BlockHashReader, BlockNumReader,
- BlockReader, BlockWriter, BundleStateInit, EvmEnvProvider, FinalizedBlockReader,
+ BlockReader, BlockWriter, BundleStateInit, DBProvider, EvmEnvProvider, FinalizedBlockReader,
FinalizedBlockWriter, HashingWriter, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider,
HistoricalStateProvider, HistoryWriter, LatestStateProvider, OriginalValuesKnown,
ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RequestsProvider, RevertsInit,
@ -3700,6 +3700,18 @@ impl<TX: DbTxMut> FinalizedBlockWriter for DatabaseProvider<TX> {
}
}
+ impl<TX: DbTx> DBProvider for DatabaseProvider<TX> {
+ type Tx = TX;
+ fn tx_ref(&self) -> &Self::Tx {
+ &self.tx
+ }
+ fn tx_mut(&mut self) -> &mut Self::Tx {
+ &mut self.tx
+ }
+ }
/// Helper method to recover senders for any blocks in the db which do not have senders. This
/// compares the length of the input senders [`Vec`], with the length of given transactions [`Vec`],
/// and will add to the input senders vec if there are more transactions.

View File

@ -169,8 +169,11 @@ where
}
}
- impl<N: ProviderNodeTypes> DatabaseProviderFactory<N::DB> for BlockchainProvider<N> {
- fn database_provider_ro(&self) -> ProviderResult<DatabaseProviderRO<N::DB>> {
+ impl<N: ProviderNodeTypes> DatabaseProviderFactory for BlockchainProvider<N> {
+ type DB = N::DB;
+ type Provider = DatabaseProviderRO<N::DB>;
+ fn database_provider_ro(&self) -> ProviderResult<Self::Provider> {
self.database.provider()
}
}

View File

@ -1,9 +0,0 @@
use crate::DatabaseProviderRO;
use reth_db_api::database::Database;
use reth_storage_errors::provider::ProviderResult;
/// Database provider factory.
pub trait DatabaseProviderFactory<DB: Database> {
/// Create new read-only database provider.
fn database_provider_ro(&self) -> ProviderResult<DatabaseProviderRO<DB>>;
}

View File

@ -11,7 +11,7 @@ use reth_node_types::NodeTypesWithDB;
/// Helper trait to unify all provider traits for simplicity.
pub trait FullProvider<N: NodeTypesWithDB>:
- DatabaseProviderFactory<N::DB>
+ DatabaseProviderFactory<DB = N::DB>
+ StaticFileProviderFactory
+ BlockReaderIdExt
+ AccountReader
@ -29,7 +29,7 @@ pub trait FullProvider<N: NodeTypesWithDB>:
}
impl<T, N: NodeTypesWithDB> FullProvider<N> for T where
- T: DatabaseProviderFactory<N::DB>
+ T: DatabaseProviderFactory<DB = N::DB>
+ StaticFileProviderFactory
+ BlockReaderIdExt
+ AccountReader

View File

@ -29,9 +29,6 @@ pub use trie::{StorageTrieWriter, TrieWriter};
mod history;
pub use history::HistoryWriter;
- mod database_provider;
- pub use database_provider::DatabaseProviderFactory;
mod static_file_provider;
pub use static_file_provider::StaticFileProviderFactory;

View File

@ -15,6 +15,7 @@ workspace = true
# reth
reth-chainspec.workspace = true
reth-db-models.workspace = true
+ reth-db-api.workspace = true
reth-execution-types.workspace = true
reth-primitives.workspace = true
reth-prune-types.workspace = true

View File

@ -0,0 +1,36 @@
use reth_db_api::{database::Database, transaction::DbTx};
use reth_storage_errors::provider::ProviderResult;
/// Database provider.
pub trait DBProvider: Send + Sync {
/// Underlying database transaction held by the provider.
type Tx: DbTx;
/// Returns a reference to the underlying transaction.
fn tx_ref(&self) -> &Self::Tx;
/// Returns a mutable reference to the underlying transaction.
fn tx_mut(&mut self) -> &mut Self::Tx;
/// Disables the long-lived read-transaction safety guarantees used for leak prevention
/// and observability improvements.
///
/// CAUTION: In most cases, you want the safety guarantees for long read transactions
/// enabled. Use this only if you're sure that no write transaction is open in parallel,
/// meaning that Reth as a node is offline and not progressing.
fn disable_long_read_transaction_safety(&mut self) {
self.tx_mut().disable_long_read_transaction_safety();
}
}
/// Database provider factory.
pub trait DatabaseProviderFactory: Send + Sync {
/// Database this factory produces providers for.
type DB: Database;
/// Provider type returned by the factory.
type Provider: DBProvider<Tx = <Self::DB as Database>::TX>;
/// Create new read-only database provider.
fn database_provider_ro(&self) -> ProviderResult<Self::Provider>;
}
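Together, these two traits replace the old `DatabaseProviderFactory<DB: Database>` from `reth-provider`. A minimal sketch of a generic consumer; `read_only_work` is a hypothetical helper, not part of this commit:

```rust
use reth_storage_api::{DBProvider, DatabaseProviderFactory};
use reth_storage_errors::provider::ProviderResult;

// Works with ProviderFactory, BlockchainProvider, or any other implementor,
// without naming a concrete database type.
fn read_only_work<F: DatabaseProviderFactory>(factory: &F) -> ProviderResult<()> {
    let provider = factory.database_provider_ro()?;
    // The trait guarantees `F::Provider: DBProvider<Tx = <F::DB as Database>::TX>`,
    // so the raw read-only transaction is always reachable.
    let _tx = provider.tx_ref();
    Ok(())
}
```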

View File

@ -52,4 +52,7 @@ pub use trie::*;
mod withdrawals;
pub use withdrawals::*;
+ mod database_provider;
+ pub use database_provider::*;
pub mod noop;

View File

@ -3,10 +3,11 @@ use crate::metrics::ParallelStateRootMetrics;
use crate::{stats::ParallelTrieTracker, storage_root_targets::StorageRootTargets};
use alloy_rlp::{BufMut, Encodable};
use itertools::Itertools;
- use reth_db_api::database::Database;
use reth_execution_errors::StorageRootError;
use reth_primitives::B256;
- use reth_provider::{providers::ConsistentDbView, DatabaseProviderFactory, ProviderError};
+ use reth_provider::{
+ providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, ProviderError,
+ };
use reth_tasks::pool::BlockingTaskPool;
use reth_trie::{
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
@ -35,9 +36,9 @@ use tracing::*;
///
/// For sync usage, take a look at `ParallelStateRoot`.
#[derive(Debug)]
- pub struct AsyncStateRoot<DB, Provider> {
+ pub struct AsyncStateRoot<Factory> {
/// Consistent view of the database.
- view: ConsistentDbView<DB, Provider>,
+ view: ConsistentDbView<Factory>,
/// Blocking task pool.
blocking_pool: BlockingTaskPool,
/// Changed hashed state.
@ -47,10 +48,10 @@ pub struct AsyncStateRoot<DB, Provider> {
metrics: ParallelStateRootMetrics,
}
- impl<DB, Provider> AsyncStateRoot<DB, Provider> {
+ impl<Factory> AsyncStateRoot<Factory> {
/// Create new async state root calculator.
pub fn new(
- view: ConsistentDbView<DB, Provider>,
+ view: ConsistentDbView<Factory>,
blocking_pool: BlockingTaskPool,
hashed_state: HashedPostState,
) -> Self {
@ -64,10 +65,9 @@ impl<DB, Provider> AsyncStateRoot<DB, Provider> {
}
}
- impl<DB, Provider> AsyncStateRoot<DB, Provider>
+ impl<Factory> AsyncStateRoot<Factory>
where
- DB: Database + Clone + 'static,
- Provider: DatabaseProviderFactory<DB> + Clone + Send + Sync + 'static,
+ Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + Send + Sync + 'static,
{
/// Calculate incremental state root asynchronously.
pub async fn incremental_root(self) -> Result<B256, AsyncStateRootError> {

View File

@ -3,10 +3,11 @@ use crate::metrics::ParallelStateRootMetrics;
use crate::{stats::ParallelTrieTracker, storage_root_targets::StorageRootTargets};
use alloy_rlp::{BufMut, Encodable};
use rayon::prelude::*;
- use reth_db_api::database::Database;
use reth_execution_errors::StorageRootError;
use reth_primitives::B256;
- use reth_provider::{providers::ConsistentDbView, DatabaseProviderFactory, ProviderError};
+ use reth_provider::{
+ providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, ProviderError,
+ };
use reth_trie::{
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
node_iter::{TrieElement, TrieNodeIter},
@ -33,9 +34,9 @@ use tracing::*;
///
/// If possible, use more optimized `AsyncStateRoot` instead.
#[derive(Debug)]
- pub struct ParallelStateRoot<DB, Provider> {
+ pub struct ParallelStateRoot<Factory> {
/// Consistent view of the database.
- view: ConsistentDbView<DB, Provider>,
+ view: ConsistentDbView<Factory>,
/// Changed hashed state.
hashed_state: HashedPostState,
/// Parallel state root metrics.
@ -43,9 +44,9 @@ pub struct ParallelStateRoot<DB, Provider> {
metrics: ParallelStateRootMetrics,
}
- impl<DB, Provider> ParallelStateRoot<DB, Provider> {
+ impl<Factory> ParallelStateRoot<Factory> {
/// Create new parallel state root calculator.
- pub fn new(view: ConsistentDbView<DB, Provider>, hashed_state: HashedPostState) -> Self {
+ pub fn new(view: ConsistentDbView<Factory>, hashed_state: HashedPostState) -> Self {
Self {
view,
hashed_state,
@ -55,10 +56,9 @@ impl<DB, Provider> ParallelStateRoot<DB, Provider> {
}
}
- impl<DB, Provider> ParallelStateRoot<DB, Provider>
+ impl<Factory> ParallelStateRoot<Factory>
where
- DB: Database,
- Provider: DatabaseProviderFactory<DB> + Send + Sync,
+ Factory: DatabaseProviderFactory<Provider: BlockReader> + Send + Sync,
{
/// Calculate incremental state root in parallel.
pub fn incremental_root(self) -> Result<B256, ParallelStateRootError> {
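A closing sketch of how the slimmed-down generics compose at a call site, assuming `factory` and `hashed_state` are in scope:

```rust
use reth_provider::providers::ConsistentDbView;

// One type parameter end to end: the same factory drives the consistent view
// and the parallel state root computation.
let view = ConsistentDbView::new_with_latest_tip(factory)?;
let root = ParallelStateRoot::new(view, hashed_state).incremental_root()?;
```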