From d11bbe686ba1478b66d228501833d9d67d689380 Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Wed, 11 Sep 2024 11:36:34 +0400 Subject: [PATCH] refactor: reduce `ProviderFactory` usage (#10798) --- Cargo.lock | 4 +- bin/reth/src/commands/debug_cmd/execution.rs | 2 +- crates/cli/commands/src/import.rs | 2 +- .../beacon/src/engine/hooks/static_file.rs | 31 +++++++--- crates/node/builder/src/launch/common.rs | 4 +- crates/node/builder/src/setup.rs | 4 +- crates/node/core/Cargo.toml | 4 +- crates/node/core/src/node_config.rs | 17 ++--- .../cli/src/commands/build_pipeline.rs | 2 +- crates/stages/api/src/pipeline/builder.rs | 2 +- crates/stages/api/src/pipeline/mod.rs | 2 +- .../static-file/src/segments/headers.rs | 8 +-- .../static-file/src/segments/mod.rs | 7 +-- .../static-file/src/segments/receipts.rs | 8 +-- .../static-file/src/segments/transactions.rs | 8 +-- .../static-file/src/static_file_producer.rs | 62 ++++++++++--------- .../src/providers/blockchain_provider.rs | 7 ++- .../provider/src/providers/consistent_view.rs | 25 ++++---- .../provider/src/providers/database/mod.rs | 7 ++- .../src/providers/database/provider.rs | 14 ++++- crates/storage/provider/src/providers/mod.rs | 7 ++- .../provider/src/traits/database_provider.rs | 9 --- crates/storage/provider/src/traits/full.rs | 4 +- crates/storage/provider/src/traits/mod.rs | 3 - crates/storage/storage-api/Cargo.toml | 1 + .../storage-api/src/database_provider.rs | 36 +++++++++++ crates/storage/storage-api/src/lib.rs | 3 + crates/trie/parallel/src/async_root.rs | 18 +++--- crates/trie/parallel/src/parallel_root.rs | 18 +++--- 29 files changed, 190 insertions(+), 129 deletions(-) delete mode 100644 crates/storage/provider/src/traits/database_provider.rs create mode 100644 crates/storage/storage-api/src/database_provider.rs diff --git a/Cargo.lock b/Cargo.lock index 82b8947e9..667c28293 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7692,10 +7692,8 @@ dependencies = [ "reth-network", "reth-network-p2p", "reth-network-peers", - "reth-node-types", "reth-optimism-chainspec", "reth-primitives", - "reth-provider", "reth-prune-types", "reth-rpc-api", "reth-rpc-eth-api", @@ -7704,6 +7702,7 @@ dependencies = [ "reth-rpc-types", "reth-rpc-types-compat", "reth-stages-types", + "reth-storage-api", "reth-storage-errors", "reth-tracing", "reth-transaction-pool", @@ -8679,6 +8678,7 @@ dependencies = [ "alloy-primitives", "auto_impl", "reth-chainspec", + "reth-db-api", "reth-db-models", "reth-execution-types", "reth-primitives", diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs index c216c3c66..64ca4dc2d 100644 --- a/bin/reth/src/commands/debug_cmd/execution.rs +++ b/bin/reth/src/commands/debug_cmd/execution.rs @@ -64,7 +64,7 @@ impl> Command { consensus: Arc, provider_factory: ProviderFactory, task_executor: &TaskExecutor, - static_file_producer: StaticFileProducer, + static_file_producer: StaticFileProducer>, ) -> eyre::Result> where Client: BlockClient + 'static, diff --git a/crates/cli/commands/src/import.rs b/crates/cli/commands/src/import.rs index 2ab128732..5b35e8aa1 100644 --- a/crates/cli/commands/src/import.rs +++ b/crates/cli/commands/src/import.rs @@ -163,7 +163,7 @@ pub fn build_import_pipeline( provider_factory: ProviderFactory, consensus: &Arc, file_client: Arc, - static_file_producer: StaticFileProducer, + static_file_producer: StaticFileProducer>, disable_exec: bool, executor: E, ) -> eyre::Result<(Pipeline, impl Stream)> diff --git a/crates/consensus/beacon/src/engine/hooks/static_file.rs b/crates/consensus/beacon/src/engine/hooks/static_file.rs index fbadb5e5c..8a5a28f95 100644 --- a/crates/consensus/beacon/src/engine/hooks/static_file.rs +++ b/crates/consensus/beacon/src/engine/hooks/static_file.rs @@ -7,9 +7,10 @@ use crate::{ use alloy_primitives::BlockNumber; use futures::FutureExt; use reth_errors::RethResult; -use reth_node_types::NodeTypesWithDB; use reth_primitives::static_file::HighestStaticFiles; -use reth_provider::providers::ProviderNodeTypes; +use reth_provider::{ + BlockReader, DatabaseProviderFactory, StageCheckpointReader, StaticFileProviderFactory, +}; use reth_static_file::{StaticFileProducer, StaticFileProducerWithResult}; use reth_tasks::TaskSpawner; use std::task::{ready, Context, Poll}; @@ -20,17 +21,22 @@ use tracing::trace; /// /// This type controls the [`StaticFileProducer`]. #[derive(Debug)] -pub struct StaticFileHook { +pub struct StaticFileHook { /// The current state of the `static_file_producer`. - state: StaticFileProducerState, + state: StaticFileProducerState, /// The type that can spawn the `static_file_producer` task. task_spawner: Box, } -impl StaticFileHook { +impl StaticFileHook +where + Provider: StaticFileProviderFactory + + DatabaseProviderFactory + + 'static, +{ /// Create a new instance pub fn new( - static_file_producer: StaticFileProducer, + static_file_producer: StaticFileProducer, task_spawner: Box, ) -> Self { Self { state: StaticFileProducerState::Idle(Some(static_file_producer)), task_spawner } @@ -128,7 +134,12 @@ impl StaticFileHook { } } -impl EngineHook for StaticFileHook { +impl EngineHook for StaticFileHook +where + Provider: StaticFileProviderFactory + + DatabaseProviderFactory + + 'static, +{ fn name(&self) -> &'static str { "StaticFile" } @@ -164,9 +175,9 @@ impl EngineHook for StaticFileHook { /// [`StaticFileProducerState::Idle`] means that the static file producer is currently idle. /// [`StaticFileProducerState::Running`] means that the static file producer is currently running. #[derive(Debug)] -enum StaticFileProducerState { +enum StaticFileProducerState { /// [`StaticFileProducer`] is idle. - Idle(Option>), + Idle(Option>), /// [`StaticFileProducer`] is running and waiting for a response - Running(oneshot::Receiver>), + Running(oneshot::Receiver>), } diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs index 6768aa4ea..e15a852e7 100644 --- a/crates/node/builder/src/launch/common.rs +++ b/crates/node/builder/src/launch/common.rs @@ -620,7 +620,7 @@ where /// If the database is empty, returns the genesis block. pub fn lookup_head(&self) -> eyre::Result { self.node_config() - .lookup_head(self.provider_factory().clone()) + .lookup_head(self.provider_factory()) .wrap_err("the head block is missing") } @@ -744,7 +744,7 @@ where } /// Creates a new [`StaticFileProducer`] with the attached database. - pub fn static_file_producer(&self) -> StaticFileProducer { + pub fn static_file_producer(&self) -> StaticFileProducer> { StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes()) } diff --git a/crates/node/builder/src/setup.rs b/crates/node/builder/src/setup.rs index 69972a5a2..10144d869 100644 --- a/crates/node/builder/src/setup.rs +++ b/crates/node/builder/src/setup.rs @@ -32,7 +32,7 @@ pub fn build_networked_pipeline( metrics_tx: reth_stages::MetricEventsSender, prune_config: Option, max_block: Option, - static_file_producer: StaticFileProducer, + static_file_producer: StaticFileProducer>, executor: Executor, exex_manager_handle: ExExManagerHandle, ) -> eyre::Result> @@ -78,7 +78,7 @@ pub fn build_pipeline( max_block: Option, metrics_tx: reth_stages::MetricEventsSender, prune_config: Option, - static_file_producer: StaticFileProducer, + static_file_producer: StaticFileProducer>, executor: Executor, exex_manager_handle: ExExManagerHandle, ) -> eyre::Result> diff --git a/crates/node/core/Cargo.toml b/crates/node/core/Cargo.toml index 1064a4212..1cd4264cc 100644 --- a/crates/node/core/Cargo.toml +++ b/crates/node/core/Cargo.toml @@ -19,7 +19,7 @@ reth-cli-util.workspace = true reth-fs-util.workspace = true reth-db = { workspace = true, features = ["mdbx"] } reth-storage-errors.workspace = true -reth-provider.workspace = true +reth-storage-api.workspace = true reth-network = { workspace = true, features = ["serde"] } reth-network-p2p.workspace = true reth-rpc-eth-types.workspace = true @@ -39,7 +39,6 @@ reth-consensus-common.workspace = true reth-prune-types.workspace = true reth-stages-types.workspace = true reth-optimism-chainspec = { workspace = true, optional = true } -reth-node-types.workspace = true # ethereum alloy-genesis.workspace = true @@ -85,7 +84,6 @@ tempfile.workspace = true [features] optimism = [ "reth-primitives/optimism", - "reth-provider/optimism", "reth-rpc-types-compat/optimism", "reth-rpc-eth-api/optimism", "dep:reth-optimism-chainspec", diff --git a/crates/node/core/src/node_config.rs b/crates/node/core/src/node_config.rs index 64a3da93f..d4cdd2f88 100644 --- a/crates/node/core/src/node_config.rs +++ b/crates/node/core/src/node_config.rs @@ -16,10 +16,11 @@ use serde::{de::DeserializeOwned, Serialize}; use std::{fs, path::Path}; use alloy_primitives::{BlockNumber, B256}; -use reth_node_types::NodeTypesWithDB; use reth_primitives::{BlockHashOrNumber, Head, SealedHeader}; -use reth_provider::{BlockHashReader, HeaderProvider, ProviderFactory, StageCheckpointReader}; use reth_stages_types::StageId; +use reth_storage_api::{ + BlockHashReader, DatabaseProviderFactory, HeaderProvider, StageCheckpointReader, +}; use reth_storage_errors::provider::ProviderResult; use std::{net::SocketAddr, path::PathBuf, sync::Arc}; use tracing::*; @@ -263,11 +264,13 @@ impl NodeConfig { /// Fetches the head block from the database. /// /// If the database is empty, returns the genesis block. - pub fn lookup_head>( - &self, - factory: ProviderFactory, - ) -> ProviderResult { - let provider = factory.provider()?; + pub fn lookup_head(&self, factory: &Factory) -> ProviderResult + where + Factory: DatabaseProviderFactory< + Provider: HeaderProvider + StageCheckpointReader + BlockHashReader, + >, + { + let provider = factory.database_provider_ro()?; let head = provider.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default().block_number; diff --git a/crates/optimism/cli/src/commands/build_pipeline.rs b/crates/optimism/cli/src/commands/build_pipeline.rs index 20f01cc17..b546a1a51 100644 --- a/crates/optimism/cli/src/commands/build_pipeline.rs +++ b/crates/optimism/cli/src/commands/build_pipeline.rs @@ -32,7 +32,7 @@ pub(crate) async fn build_import_pipeline( provider_factory: ProviderFactory, consensus: &Arc, file_client: Arc, - static_file_producer: StaticFileProducer, + static_file_producer: StaticFileProducer>, disable_exec: bool, ) -> eyre::Result<(Pipeline, impl Stream)> where diff --git a/crates/stages/api/src/pipeline/builder.rs b/crates/stages/api/src/pipeline/builder.rs index c5c585cfc..849350493 100644 --- a/crates/stages/api/src/pipeline/builder.rs +++ b/crates/stages/api/src/pipeline/builder.rs @@ -72,7 +72,7 @@ where pub fn build>( self, provider_factory: ProviderFactory, - static_file_producer: StaticFileProducer, + static_file_producer: StaticFileProducer>, ) -> Pipeline { let Self { stages, max_block, tip_tx, metrics_tx } = self; Pipeline { diff --git a/crates/stages/api/src/pipeline/mod.rs b/crates/stages/api/src/pipeline/mod.rs index 470adf228..a94112396 100644 --- a/crates/stages/api/src/pipeline/mod.rs +++ b/crates/stages/api/src/pipeline/mod.rs @@ -71,7 +71,7 @@ pub struct Pipeline { stages: Vec>, /// The maximum block number to sync to. max_block: Option, - static_file_producer: StaticFileProducer, + static_file_producer: StaticFileProducer>, /// Sender for events the pipeline emits. event_sender: EventSender, /// Keeps track of the progress of the pipeline. diff --git a/crates/static-file/static-file/src/segments/headers.rs b/crates/static-file/static-file/src/segments/headers.rs index 3212c0cd8..54d5bee65 100644 --- a/crates/static-file/static-file/src/segments/headers.rs +++ b/crates/static-file/static-file/src/segments/headers.rs @@ -1,10 +1,10 @@ use crate::segments::Segment; use alloy_primitives::BlockNumber; use reth_db::tables; -use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx}; +use reth_db_api::{cursor::DbCursorRO, transaction::DbTx}; use reth_provider::{ providers::{StaticFileProvider, StaticFileWriter}, - DatabaseProviderRO, + DBProvider, }; use reth_static_file_types::StaticFileSegment; use reth_storage_errors::provider::ProviderResult; @@ -14,14 +14,14 @@ use std::ops::RangeInclusive; #[derive(Debug, Default)] pub struct Headers; -impl Segment for Headers { +impl Segment for Headers { fn segment(&self) -> StaticFileSegment { StaticFileSegment::Headers } fn copy_to_static_files( &self, - provider: DatabaseProviderRO, + provider: Provider, static_file_provider: StaticFileProvider, block_range: RangeInclusive, ) -> ProviderResult<()> { diff --git a/crates/static-file/static-file/src/segments/mod.rs b/crates/static-file/static-file/src/segments/mod.rs index 1125b2085..3d961c7b1 100644 --- a/crates/static-file/static-file/src/segments/mod.rs +++ b/crates/static-file/static-file/src/segments/mod.rs @@ -10,14 +10,13 @@ mod receipts; pub use receipts::Receipts; use alloy_primitives::BlockNumber; -use reth_db_api::database::Database; -use reth_provider::{providers::StaticFileProvider, DatabaseProviderRO}; +use reth_provider::providers::StaticFileProvider; use reth_static_file_types::StaticFileSegment; use reth_storage_errors::provider::ProviderResult; use std::ops::RangeInclusive; /// A segment represents moving some portion of the data to static files. -pub trait Segment: Send + Sync { +pub trait Segment: Send + Sync { /// Returns the [`StaticFileSegment`]. fn segment(&self) -> StaticFileSegment; @@ -25,7 +24,7 @@ pub trait Segment: Send + Sync { /// the management of and writing to files. fn copy_to_static_files( &self, - provider: DatabaseProviderRO, + provider: Provider, static_file_provider: StaticFileProvider, block_range: RangeInclusive, ) -> ProviderResult<()>; diff --git a/crates/static-file/static-file/src/segments/receipts.rs b/crates/static-file/static-file/src/segments/receipts.rs index b63d083a0..4e2185a59 100644 --- a/crates/static-file/static-file/src/segments/receipts.rs +++ b/crates/static-file/static-file/src/segments/receipts.rs @@ -1,10 +1,10 @@ use crate::segments::Segment; use alloy_primitives::BlockNumber; use reth_db::tables; -use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx}; +use reth_db_api::{cursor::DbCursorRO, transaction::DbTx}; use reth_provider::{ providers::{StaticFileProvider, StaticFileWriter}, - BlockReader, DatabaseProviderRO, + BlockReader, DBProvider, }; use reth_static_file_types::StaticFileSegment; use reth_storage_errors::provider::{ProviderError, ProviderResult}; @@ -14,14 +14,14 @@ use std::ops::RangeInclusive; #[derive(Debug, Default)] pub struct Receipts; -impl Segment for Receipts { +impl Segment for Receipts { fn segment(&self) -> StaticFileSegment { StaticFileSegment::Receipts } fn copy_to_static_files( &self, - provider: DatabaseProviderRO, + provider: Provider, static_file_provider: StaticFileProvider, block_range: RangeInclusive, ) -> ProviderResult<()> { diff --git a/crates/static-file/static-file/src/segments/transactions.rs b/crates/static-file/static-file/src/segments/transactions.rs index ac690def4..52e0ca8b5 100644 --- a/crates/static-file/static-file/src/segments/transactions.rs +++ b/crates/static-file/static-file/src/segments/transactions.rs @@ -1,10 +1,10 @@ use crate::segments::Segment; use alloy_primitives::BlockNumber; use reth_db::tables; -use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx}; +use reth_db_api::{cursor::DbCursorRO, transaction::DbTx}; use reth_provider::{ providers::{StaticFileProvider, StaticFileWriter}, - BlockReader, DatabaseProviderRO, + BlockReader, DBProvider, }; use reth_static_file_types::StaticFileSegment; use reth_storage_errors::provider::{ProviderError, ProviderResult}; @@ -14,7 +14,7 @@ use std::ops::RangeInclusive; #[derive(Debug, Default)] pub struct Transactions; -impl Segment for Transactions { +impl Segment for Transactions { fn segment(&self) -> StaticFileSegment { StaticFileSegment::Transactions } @@ -23,7 +23,7 @@ impl Segment for Transactions { /// [`StaticFileSegment::Transactions`] for the provided block range. fn copy_to_static_files( &self, - provider: DatabaseProviderRO, + provider: Provider, static_file_provider: StaticFileProvider, block_range: RangeInclusive, ) -> ProviderResult<()> { diff --git a/crates/static-file/static-file/src/static_file_producer.rs b/crates/static-file/static-file/src/static_file_producer.rs index fd2c48bbf..eca59e1e7 100644 --- a/crates/static-file/static-file/src/static_file_producer.rs +++ b/crates/static-file/static-file/src/static_file_producer.rs @@ -4,11 +4,9 @@ use crate::{segments, segments::Segment, StaticFileProducerEvent}; use alloy_primitives::BlockNumber; use parking_lot::Mutex; use rayon::prelude::*; -use reth_chainspec::ChainSpec; -use reth_node_types::NodeTypesWithDB; use reth_provider::{ - providers::StaticFileWriter, ProviderFactory, StageCheckpointReader as _, - StaticFileProviderFactory, + providers::StaticFileWriter, BlockReader, DBProvider, DatabaseProviderFactory, + StageCheckpointReader, StaticFileProviderFactory, }; use reth_prune_types::PruneModes; use reth_stages_types::StageId; @@ -26,28 +24,29 @@ use tracing::{debug, trace}; pub type StaticFileProducerResult = ProviderResult; /// The [`StaticFileProducer`] instance itself with the result of [`StaticFileProducerInner::run`] -pub type StaticFileProducerWithResult = (StaticFileProducer, StaticFileProducerResult); +pub type StaticFileProducerWithResult = + (StaticFileProducer, StaticFileProducerResult); /// Static File producer. It's a wrapper around [`StaticFileProducer`] that allows to share it /// between threads. #[derive(Debug)] -pub struct StaticFileProducer(Arc>>); +pub struct StaticFileProducer(Arc>>); -impl StaticFileProducer { +impl StaticFileProducer { /// Creates a new [`StaticFileProducer`]. - pub fn new(provider_factory: ProviderFactory, prune_modes: PruneModes) -> Self { - Self(Arc::new(Mutex::new(StaticFileProducerInner::new(provider_factory, prune_modes)))) + pub fn new(provider: Provider, prune_modes: PruneModes) -> Self { + Self(Arc::new(Mutex::new(StaticFileProducerInner::new(provider, prune_modes)))) } } -impl Clone for StaticFileProducer { +impl Clone for StaticFileProducer { fn clone(&self) -> Self { Self(self.0.clone()) } } -impl Deref for StaticFileProducer { - type Target = Arc>>; +impl Deref for StaticFileProducer { + type Target = Arc>>; fn deref(&self) -> &Self::Target { &self.0 @@ -57,9 +56,9 @@ impl Deref for StaticFileProducer { /// Static File producer routine. See [`StaticFileProducerInner::run`] for more detailed /// description. #[derive(Debug)] -pub struct StaticFileProducerInner { +pub struct StaticFileProducerInner { /// Provider factory - provider_factory: ProviderFactory, + provider: Provider, /// Pruning configuration for every part of the data that can be pruned. Set by user, and /// needed in [`StaticFileProducerInner`] to prevent attempting to move prunable data to static /// files. See [`StaticFileProducerInner::get_static_file_targets`]. @@ -101,13 +100,17 @@ impl StaticFileTargets { } } -impl StaticFileProducerInner { - fn new(provider_factory: ProviderFactory, prune_modes: PruneModes) -> Self { - Self { provider_factory, prune_modes, event_sender: Default::default() } +impl StaticFileProducerInner { + fn new(provider: Provider, prune_modes: PruneModes) -> Self { + Self { provider, prune_modes, event_sender: Default::default() } } } -impl> StaticFileProducerInner { +impl StaticFileProducerInner +where + Provider: StaticFileProviderFactory + + DatabaseProviderFactory, +{ /// Listen for events on the `static_file_producer`. pub fn events(&self) -> EventStream { self.event_sender.new_listener() @@ -117,8 +120,8 @@ impl> StaticFileProducerInner { /// /// For each [Some] target in [`StaticFileTargets`], initializes a corresponding [Segment] and /// runs it with the provided block range using [`reth_provider::providers::StaticFileProvider`] - /// and a read-only database transaction from [`ProviderFactory`]. All segments are run in - /// parallel. + /// and a read-only database transaction from [`DatabaseProviderFactory`]. All segments are run + /// in parallel. /// /// NOTE: it doesn't delete the data from database, and the actual deleting (aka pruning) logic /// lives in the `prune` crate. @@ -129,7 +132,7 @@ impl> StaticFileProducerInner { } debug_assert!(targets.is_contiguous_to_highest_static_files( - self.provider_factory.static_file_provider().get_highest_static_files() + self.provider.static_file_provider().get_highest_static_files() )); self.event_sender.notify(StaticFileProducerEvent::Started { targets: targets.clone() }); @@ -137,7 +140,8 @@ impl> StaticFileProducerInner { debug!(target: "static_file", ?targets, "StaticFileProducer started"); let start = Instant::now(); - let mut segments = Vec::<(Box>, RangeInclusive)>::new(); + let mut segments = + Vec::<(Box>, RangeInclusive)>::new(); if let Some(block_range) = targets.transactions.clone() { segments.push((Box::new(segments::Transactions), block_range)); @@ -155,8 +159,9 @@ impl> StaticFileProducerInner { // Create a new database transaction on every segment to prevent long-lived read-only // transactions - let provider = self.provider_factory.provider()?.disable_long_read_transaction_safety(); - segment.copy_to_static_files(provider, self.provider_factory.static_file_provider(), block_range.clone())?; + let mut provider = self.provider.database_provider_ro()?; + provider.disable_long_read_transaction_safety(); + segment.copy_to_static_files(provider, self.provider.static_file_provider(), block_range.clone())?; let elapsed = start.elapsed(); // TODO(alexey): track in metrics debug!(target: "static_file", segment = %segment.segment(), ?block_range, ?elapsed, "Finished StaticFileProducer segment"); @@ -164,9 +169,9 @@ impl> StaticFileProducerInner { Ok(()) })?; - self.provider_factory.static_file_provider().commit()?; + self.provider.static_file_provider().commit()?; for (segment, block_range) in segments { - self.provider_factory + self.provider .static_file_provider() .update_index(segment.segment(), Some(*block_range.end()))?; } @@ -185,7 +190,7 @@ impl> StaticFileProducerInner { /// /// Returns highest block numbers for all static file segments. pub fn copy_to_static_files(&self) -> ProviderResult { - let provider = self.provider_factory.provider()?; + let provider = self.provider.database_provider_ro()?; let stages_checkpoints = [StageId::Headers, StageId::Execution, StageId::Bodies] .into_iter() .map(|stage| provider.get_stage_checkpoint(stage).map(|c| c.map(|c| c.block_number))) @@ -209,8 +214,7 @@ impl> StaticFileProducerInner { &self, finalized_block_numbers: HighestStaticFiles, ) -> ProviderResult { - let highest_static_files = - self.provider_factory.static_file_provider().get_highest_static_files(); + let highest_static_files = self.provider.static_file_provider().get_highest_static_files(); let targets = StaticFileTargets { headers: finalized_block_numbers.headers.and_then(|finalized_block_number| { diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 91a4b306b..adf73bd70 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -261,8 +261,11 @@ impl BlockchainProvider2 { } } -impl DatabaseProviderFactory for BlockchainProvider2 { - fn database_provider_ro(&self) -> ProviderResult> { +impl DatabaseProviderFactory for BlockchainProvider2 { + type DB = N::DB; + type Provider = DatabaseProviderRO; + + fn database_provider_ro(&self) -> ProviderResult { self.database.provider() } } diff --git a/crates/storage/provider/src/providers/consistent_view.rs b/crates/storage/provider/src/providers/consistent_view.rs index fe9b65941..c5d98a238 100644 --- a/crates/storage/provider/src/providers/consistent_view.rs +++ b/crates/storage/provider/src/providers/consistent_view.rs @@ -1,8 +1,7 @@ -use crate::{BlockNumReader, DatabaseProviderFactory, DatabaseProviderRO, HeaderProvider}; -use reth_db_api::database::Database; +use crate::{BlockNumReader, DatabaseProviderFactory, HeaderProvider}; use reth_primitives::{GotExpected, B256}; +use reth_storage_api::BlockReader; use reth_storage_errors::provider::ProviderResult; -use std::marker::PhantomData; pub use reth_storage_errors::provider::ConsistentViewError; @@ -22,24 +21,22 @@ pub use reth_storage_errors::provider::ConsistentViewError; /// appropriately. /// 2) be sure that the state does not change. #[derive(Clone, Debug)] -pub struct ConsistentDbView { - database: PhantomData, - provider: Provider, +pub struct ConsistentDbView { + factory: Factory, tip: Option, } -impl ConsistentDbView +impl ConsistentDbView where - DB: Database, - Provider: DatabaseProviderFactory, + Factory: DatabaseProviderFactory, { /// Creates new consistent database view. - pub const fn new(provider: Provider, tip: Option) -> Self { - Self { database: PhantomData, provider, tip } + pub const fn new(factory: Factory, tip: Option) -> Self { + Self { factory, tip } } /// Creates new consistent database view with latest tip. - pub fn new_with_latest_tip(provider: Provider) -> ProviderResult { + pub fn new_with_latest_tip(provider: Factory) -> ProviderResult { let provider_ro = provider.database_provider_ro()?; let last_num = provider_ro.last_block_number()?; let tip = provider_ro.sealed_header(last_num)?.map(|h| h.hash()); @@ -47,9 +44,9 @@ where } /// Creates new read-only provider and performs consistency checks on the current tip. - pub fn provider_ro(&self) -> ProviderResult> { + pub fn provider_ro(&self) -> ProviderResult { // Create a new provider. - let provider_ro = self.provider.database_provider_ro()?; + let provider_ro = self.factory.database_provider_ro()?; // Check that the latest stored header number matches the number // that consistent view was initialized with. diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 249d1f116..2c79134ca 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -182,8 +182,11 @@ impl ProviderFactory { } } -impl DatabaseProviderFactory for ProviderFactory { - fn database_provider_ro(&self) -> ProviderResult> { +impl DatabaseProviderFactory for ProviderFactory { + type DB = N::DB; + type Provider = DatabaseProviderRO; + + fn database_provider_ro(&self) -> ProviderResult { self.provider() } } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 643b2359e..666edb148 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -7,7 +7,7 @@ use crate::{ }, writer::UnifiedStorageWriter, AccountReader, BlockExecutionReader, BlockExecutionWriter, BlockHashReader, BlockNumReader, - BlockReader, BlockWriter, BundleStateInit, EvmEnvProvider, FinalizedBlockReader, + BlockReader, BlockWriter, BundleStateInit, DBProvider, EvmEnvProvider, FinalizedBlockReader, FinalizedBlockWriter, HashingWriter, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, HistoricalStateProvider, HistoryWriter, LatestStateProvider, OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RequestsProvider, RevertsInit, @@ -3700,6 +3700,18 @@ impl FinalizedBlockWriter for DatabaseProvider { } } +impl DBProvider for DatabaseProvider { + type Tx = TX; + + fn tx_ref(&self) -> &Self::Tx { + &self.tx + } + + fn tx_mut(&mut self) -> &mut Self::Tx { + &mut self.tx + } +} + /// Helper method to recover senders for any blocks in the db which do not have senders. This /// compares the length of the input senders [`Vec`], with the length of given transactions [`Vec`], /// and will add to the input senders vec if there are more transactions. diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 246bd426f..652d275f3 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -169,8 +169,11 @@ where } } -impl DatabaseProviderFactory for BlockchainProvider { - fn database_provider_ro(&self) -> ProviderResult> { +impl DatabaseProviderFactory for BlockchainProvider { + type DB = N::DB; + type Provider = DatabaseProviderRO; + + fn database_provider_ro(&self) -> ProviderResult { self.database.provider() } } diff --git a/crates/storage/provider/src/traits/database_provider.rs b/crates/storage/provider/src/traits/database_provider.rs deleted file mode 100644 index fab60fe2e..000000000 --- a/crates/storage/provider/src/traits/database_provider.rs +++ /dev/null @@ -1,9 +0,0 @@ -use crate::DatabaseProviderRO; -use reth_db_api::database::Database; -use reth_storage_errors::provider::ProviderResult; - -/// Database provider factory. -pub trait DatabaseProviderFactory { - /// Create new read-only database provider. - fn database_provider_ro(&self) -> ProviderResult>; -} diff --git a/crates/storage/provider/src/traits/full.rs b/crates/storage/provider/src/traits/full.rs index 52346257a..1022184d6 100644 --- a/crates/storage/provider/src/traits/full.rs +++ b/crates/storage/provider/src/traits/full.rs @@ -11,7 +11,7 @@ use reth_node_types::NodeTypesWithDB; /// Helper trait to unify all provider traits for simplicity. pub trait FullProvider: - DatabaseProviderFactory + DatabaseProviderFactory + StaticFileProviderFactory + BlockReaderIdExt + AccountReader @@ -29,7 +29,7 @@ pub trait FullProvider: } impl FullProvider for T where - T: DatabaseProviderFactory + T: DatabaseProviderFactory + StaticFileProviderFactory + BlockReaderIdExt + AccountReader diff --git a/crates/storage/provider/src/traits/mod.rs b/crates/storage/provider/src/traits/mod.rs index 3e692b7ed..8bae4f67a 100644 --- a/crates/storage/provider/src/traits/mod.rs +++ b/crates/storage/provider/src/traits/mod.rs @@ -29,9 +29,6 @@ pub use trie::{StorageTrieWriter, TrieWriter}; mod history; pub use history::HistoryWriter; -mod database_provider; -pub use database_provider::DatabaseProviderFactory; - mod static_file_provider; pub use static_file_provider::StaticFileProviderFactory; diff --git a/crates/storage/storage-api/Cargo.toml b/crates/storage/storage-api/Cargo.toml index e469cd6dc..ce043213c 100644 --- a/crates/storage/storage-api/Cargo.toml +++ b/crates/storage/storage-api/Cargo.toml @@ -15,6 +15,7 @@ workspace = true # reth reth-chainspec.workspace = true reth-db-models.workspace = true +reth-db-api.workspace = true reth-execution-types.workspace = true reth-primitives.workspace = true reth-prune-types.workspace = true diff --git a/crates/storage/storage-api/src/database_provider.rs b/crates/storage/storage-api/src/database_provider.rs new file mode 100644 index 000000000..fd15411d7 --- /dev/null +++ b/crates/storage/storage-api/src/database_provider.rs @@ -0,0 +1,36 @@ +use reth_db_api::{database::Database, transaction::DbTx}; +use reth_storage_errors::provider::ProviderResult; + +/// Database provider. +pub trait DBProvider: Send + Sync { + /// Underlying database transaction held by the provider. + type Tx: DbTx; + + /// Returns a reference to the underlying transaction. + fn tx_ref(&self) -> &Self::Tx; + + /// Returns a mutable reference to the underlying transaction. + fn tx_mut(&mut self) -> &mut Self::Tx; + + /// Disables long-lived read transaction safety guarantees for leaks prevention and + /// observability improvements. + /// + /// CAUTION: In most of the cases, you want the safety guarantees for long read transactions + /// enabled. Use this only if you're sure that no write transaction is open in parallel, meaning + /// that Reth as a node is offline and not progressing. + fn disable_long_read_transaction_safety(&mut self) { + self.tx_mut().disable_long_read_transaction_safety(); + } +} + +/// Database provider factory. +pub trait DatabaseProviderFactory: Send + Sync { + /// Database this factory produces providers for. + type DB: Database; + + /// Provider type returned by the factory. + type Provider: DBProvider::TX>; + + /// Create new read-only database provider. + fn database_provider_ro(&self) -> ProviderResult; +} diff --git a/crates/storage/storage-api/src/lib.rs b/crates/storage/storage-api/src/lib.rs index 440c27d37..3f93bbbde 100644 --- a/crates/storage/storage-api/src/lib.rs +++ b/crates/storage/storage-api/src/lib.rs @@ -52,4 +52,7 @@ pub use trie::*; mod withdrawals; pub use withdrawals::*; +mod database_provider; +pub use database_provider::*; + pub mod noop; diff --git a/crates/trie/parallel/src/async_root.rs b/crates/trie/parallel/src/async_root.rs index ef206064f..179c7daba 100644 --- a/crates/trie/parallel/src/async_root.rs +++ b/crates/trie/parallel/src/async_root.rs @@ -3,10 +3,11 @@ use crate::metrics::ParallelStateRootMetrics; use crate::{stats::ParallelTrieTracker, storage_root_targets::StorageRootTargets}; use alloy_rlp::{BufMut, Encodable}; use itertools::Itertools; -use reth_db_api::database::Database; use reth_execution_errors::StorageRootError; use reth_primitives::B256; -use reth_provider::{providers::ConsistentDbView, DatabaseProviderFactory, ProviderError}; +use reth_provider::{ + providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, ProviderError, +}; use reth_tasks::pool::BlockingTaskPool; use reth_trie::{ hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory}, @@ -35,9 +36,9 @@ use tracing::*; /// /// For sync usage, take a look at `ParallelStateRoot`. #[derive(Debug)] -pub struct AsyncStateRoot { +pub struct AsyncStateRoot { /// Consistent view of the database. - view: ConsistentDbView, + view: ConsistentDbView, /// Blocking task pool. blocking_pool: BlockingTaskPool, /// Changed hashed state. @@ -47,10 +48,10 @@ pub struct AsyncStateRoot { metrics: ParallelStateRootMetrics, } -impl AsyncStateRoot { +impl AsyncStateRoot { /// Create new async state root calculator. pub fn new( - view: ConsistentDbView, + view: ConsistentDbView, blocking_pool: BlockingTaskPool, hashed_state: HashedPostState, ) -> Self { @@ -64,10 +65,9 @@ impl AsyncStateRoot { } } -impl AsyncStateRoot +impl AsyncStateRoot where - DB: Database + Clone + 'static, - Provider: DatabaseProviderFactory + Clone + Send + Sync + 'static, + Factory: DatabaseProviderFactory + Clone + Send + Sync + 'static, { /// Calculate incremental state root asynchronously. pub async fn incremental_root(self) -> Result { diff --git a/crates/trie/parallel/src/parallel_root.rs b/crates/trie/parallel/src/parallel_root.rs index 015d41677..b41d9319c 100644 --- a/crates/trie/parallel/src/parallel_root.rs +++ b/crates/trie/parallel/src/parallel_root.rs @@ -3,10 +3,11 @@ use crate::metrics::ParallelStateRootMetrics; use crate::{stats::ParallelTrieTracker, storage_root_targets::StorageRootTargets}; use alloy_rlp::{BufMut, Encodable}; use rayon::prelude::*; -use reth_db_api::database::Database; use reth_execution_errors::StorageRootError; use reth_primitives::B256; -use reth_provider::{providers::ConsistentDbView, DatabaseProviderFactory, ProviderError}; +use reth_provider::{ + providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, ProviderError, +}; use reth_trie::{ hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory}, node_iter::{TrieElement, TrieNodeIter}, @@ -33,9 +34,9 @@ use tracing::*; /// /// If possible, use more optimized `AsyncStateRoot` instead. #[derive(Debug)] -pub struct ParallelStateRoot { +pub struct ParallelStateRoot { /// Consistent view of the database. - view: ConsistentDbView, + view: ConsistentDbView, /// Changed hashed state. hashed_state: HashedPostState, /// Parallel state root metrics. @@ -43,9 +44,9 @@ pub struct ParallelStateRoot { metrics: ParallelStateRootMetrics, } -impl ParallelStateRoot { +impl ParallelStateRoot { /// Create new parallel state root calculator. - pub fn new(view: ConsistentDbView, hashed_state: HashedPostState) -> Self { + pub fn new(view: ConsistentDbView, hashed_state: HashedPostState) -> Self { Self { view, hashed_state, @@ -55,10 +56,9 @@ impl ParallelStateRoot { } } -impl ParallelStateRoot +impl ParallelStateRoot where - DB: Database, - Provider: DatabaseProviderFactory + Send + Sync, + Factory: DatabaseProviderFactory + Send + Sync, { /// Calculate incremental state root in parallel. pub fn incremental_root(self) -> Result {