From 0fa8e83e16c26542fca70122dd6086aa2bbf0ece Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Mon, 16 Sep 2024 14:46:53 +0300 Subject: [PATCH] refactor: make `reth-prune` independent of concrete `DatabaseProvider` (#10921) --- Cargo.lock | 1 - .../beacon/src/engine/hooks/prune.rs | 38 ++++-- .../consensus/beacon/src/engine/test_utils.rs | 4 +- crates/engine/service/src/service.rs | 8 +- crates/engine/tree/src/persistence.rs | 23 ++-- crates/prune/prune/Cargo.toml | 1 - crates/prune/prune/src/builder.rs | 29 ++-- crates/prune/prune/src/db_ext.rs | 127 ++++++++++++++++++ crates/prune/prune/src/lib.rs | 3 +- crates/prune/prune/src/pruner.rs | 77 +++++------ crates/prune/prune/src/segments/mod.rs | 24 ++-- crates/prune/prune/src/segments/receipts.rs | 26 ++-- crates/prune/prune/src/segments/set.rs | 26 ++-- .../prune/src/segments/static_file/headers.rs | 58 ++++---- .../src/segments/static_file/receipts.rs | 18 +-- .../src/segments/static_file/transactions.rs | 26 ++-- .../src/segments/user/account_history.rs | 28 ++-- .../prune/prune/src/segments/user/history.rs | 9 +- .../prune/prune/src/segments/user/receipts.rs | 20 +-- .../src/segments/user/receipts_by_logs.rs | 23 ++-- .../src/segments/user/sender_recovery.rs | 34 ++--- .../src/segments/user/storage_history.rs | 31 ++--- .../src/segments/user/transaction_lookup.rs | 37 ++--- crates/stages/stages/src/stages/prune.rs | 2 +- .../static-file/src/static_file_producer.rs | 3 +- .../src/providers/blockchain_provider.rs | 12 +- .../provider/src/providers/database/mod.rs | 5 + .../src/providers/database/provider.rs | 125 ++--------------- crates/storage/provider/src/providers/mod.rs | 8 +- .../storage/provider/src/test_utils/mock.rs | 5 + .../storage-api/src/database_provider.rs | 20 ++- 31 files changed, 458 insertions(+), 393 deletions(-) create mode 100644 crates/prune/prune/src/db_ext.rs diff --git a/Cargo.lock b/Cargo.lock index 16edfba2d..3a570283c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8211,7 +8211,6 @@ dependencies = [ "reth-errors", "reth-exex-types", "reth-metrics", - "reth-node-types", "reth-provider", "reth-prune-types", "reth-stages", diff --git a/crates/consensus/beacon/src/engine/hooks/prune.rs b/crates/consensus/beacon/src/engine/hooks/prune.rs index eb5ca7cf5..409fc98b8 100644 --- a/crates/consensus/beacon/src/engine/hooks/prune.rs +++ b/crates/consensus/beacon/src/engine/hooks/prune.rs @@ -8,8 +8,7 @@ use alloy_primitives::BlockNumber; use futures::FutureExt; use metrics::Counter; use reth_errors::{RethError, RethResult}; -use reth_node_types::NodeTypesWithDB; -use reth_provider::{providers::ProviderNodeTypes, ProviderFactory}; +use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter}; use reth_prune::{Pruner, PrunerError, PrunerWithResult}; use reth_tasks::TaskSpawner; use std::{ @@ -21,15 +20,18 @@ use tokio::sync::oneshot; /// Manages pruning under the control of the engine. /// /// This type controls the [Pruner]. -pub struct PruneHook { +pub struct PruneHook { /// The current state of the pruner. - pruner_state: PrunerState, + pruner_state: PrunerState, /// The type that can spawn the pruner task. pruner_task_spawner: Box, metrics: Metrics, } -impl fmt::Debug for PruneHook { +impl fmt::Debug for PruneHook +where + PF: DatabaseProviderFactory + fmt::Debug, +{ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("PruneHook") .field("pruner_state", &self.pruner_state) @@ -38,10 +40,10 @@ impl fmt::Debug for PruneHook { } } -impl PruneHook { +impl PruneHook { /// Create a new instance pub fn new( - pruner: Pruner>, + pruner: Pruner, pruner_task_spawner: Box, ) -> Self { Self { @@ -79,7 +81,13 @@ impl PruneHook { Poll::Ready(Ok(event)) } +} +impl PruneHook +where + PF: DatabaseProviderFactory + + 'static, +{ /// This will try to spawn the pruner if it is idle: /// 1. Check if pruning is needed through [`Pruner::is_pruning_needed`]. /// @@ -117,7 +125,11 @@ impl PruneHook { } } -impl EngineHook for PruneHook { +impl EngineHook for PruneHook +where + PF: DatabaseProviderFactory + + 'static, +{ fn name(&self) -> &'static str { "Prune" } @@ -152,16 +164,16 @@ impl EngineHook for PruneHook { /// running, it acquires the write lock over the database. This means that we cannot forward to the /// blockchain tree any messages that would result in database writes, since it would result in a /// deadlock. -enum PrunerState { +enum PrunerState { /// Pruner is idle. - Idle(Option>>), + Idle(Option>), /// Pruner is running and waiting for a response - Running(oneshot::Receiver>>), + Running(oneshot::Receiver>), } -impl fmt::Debug for PrunerState +impl fmt::Debug for PrunerState where - N: NodeTypesWithDB, + PF: DatabaseProviderFactory + Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { diff --git a/crates/consensus/beacon/src/engine/test_utils.rs b/crates/consensus/beacon/src/engine/test_utils.rs index 72e5d825f..0d95f8900 100644 --- a/crates/consensus/beacon/src/engine/test_utils.rs +++ b/crates/consensus/beacon/src/engine/test_utils.rs @@ -24,7 +24,7 @@ use reth_payload_builder::test_utils::spawn_test_payload_service; use reth_provider::{ providers::BlockchainProvider, test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB}, - ExecutionOutcome, ProviderFactory, + ExecutionOutcome, }; use reth_prune::Pruner; use reth_prune_types::PruneModes; @@ -397,7 +397,7 @@ where let blockchain_provider = BlockchainProvider::with_blocks(provider_factory.clone(), tree, genesis_block, None); - let pruner = Pruner::<_, ProviderFactory<_>>::new( + let pruner = Pruner::new_with_factory( provider_factory.clone(), vec![], 5, diff --git a/crates/engine/service/src/service.rs b/crates/engine/service/src/service.rs index 2be1bcca8..880be5c02 100644 --- a/crates/engine/service/src/service.rs +++ b/crates/engine/service/src/service.rs @@ -19,7 +19,7 @@ use reth_node_types::NodeTypesWithEngine; use reth_payload_builder::PayloadBuilderHandle; use reth_payload_validator::ExecutionPayloadValidator; use reth_provider::{providers::BlockchainProvider2, ProviderFactory}; -use reth_prune::Pruner; +use reth_prune::PrunerWithFactory; use reth_stages_api::{MetricEventsSender, Pipeline}; use reth_tasks::TaskSpawner; use std::{ @@ -73,7 +73,7 @@ where pipeline_task_spawner: Box, provider: ProviderFactory, blockchain_db: BlockchainProvider2, - pruner: Pruner>, + pruner: PrunerWithFactory>, payload_builder: PayloadBuilderHandle, tree_config: TreeConfig, invalid_block_hook: Box, @@ -147,6 +147,7 @@ mod tests { use reth_network_p2p::test_utils::TestFullBlockClient; use reth_primitives::SealedHeader; use reth_provider::test_utils::create_test_provider_factory_with_chain_spec; + use reth_prune::Pruner; use reth_tasks::TokioTaskExecutor; use std::sync::Arc; use tokio::sync::{mpsc::unbounded_channel, watch}; @@ -178,8 +179,7 @@ mod tests { .unwrap(); let (_tx, rx) = watch::channel(FinishedExExHeight::NoExExs); - let pruner = - Pruner::<_, ProviderFactory<_>>::new(provider_factory.clone(), vec![], 0, 0, None, rx); + let pruner = Pruner::new_with_factory(provider_factory.clone(), vec![], 0, 0, None, rx); let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel(); let (tx, _rx) = unbounded_channel(); diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index 63a480fed..1a116b781 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -1,13 +1,12 @@ use crate::metrics::PersistenceMetrics; use reth_chain_state::ExecutedBlock; use reth_errors::ProviderError; -use reth_node_types::NodeTypesWithDB; use reth_primitives::BlockNumHash; use reth_provider::{ providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader, ProviderFactory, StaticFileProviderFactory, }; -use reth_prune::{Pruner, PrunerError, PrunerOutput}; +use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory}; use reth_stages_api::{MetricEvent, MetricEventsSender}; use std::{ sync::mpsc::{Receiver, SendError, Sender}, @@ -25,13 +24,13 @@ use tracing::{debug, error}; /// This should be spawned in its own thread with [`std::thread::spawn`], since this performs /// blocking I/O operations in an endless loop. #[derive(Debug)] -pub struct PersistenceService { +pub struct PersistenceService { /// The provider factory to use provider: ProviderFactory, /// Incoming requests incoming: Receiver, /// The pruner - pruner: Pruner>, + pruner: PrunerWithFactory>, /// metrics metrics: PersistenceMetrics, /// Sender for sync metrics - we only submit sync metrics for persisted blocks @@ -43,7 +42,7 @@ impl PersistenceService { pub fn new( provider: ProviderFactory, incoming: Receiver, - pruner: Pruner>, + pruner: PrunerWithFactory>, sync_metrics_tx: MetricEventsSender, ) -> Self { Self { provider, incoming, pruner, metrics: PersistenceMetrics::default(), sync_metrics_tx } @@ -187,7 +186,7 @@ impl PersistenceHandle { /// Create a new [`PersistenceHandle`], and spawn the persistence service. pub fn spawn_service( provider_factory: ProviderFactory, - pruner: Pruner>, + pruner: PrunerWithFactory>, sync_metrics_tx: MetricEventsSender, ) -> Self { // create the initial channels @@ -268,7 +267,7 @@ mod tests { use reth_chain_state::test_utils::TestBlockBuilder; use reth_exex_types::FinishedExExHeight; use reth_primitives::B256; - use reth_provider::{test_utils::create_test_provider_factory, ProviderFactory}; + use reth_provider::test_utils::create_test_provider_factory; use reth_prune::Pruner; use tokio::sync::mpsc::unbounded_channel; @@ -278,14 +277,8 @@ mod tests { let (_finished_exex_height_tx, finished_exex_height_rx) = tokio::sync::watch::channel(FinishedExExHeight::NoExExs); - let pruner = Pruner::<_, ProviderFactory<_>>::new( - provider.clone(), - vec![], - 5, - 0, - None, - finished_exex_height_rx, - ); + let pruner = + Pruner::new_with_factory(provider.clone(), vec![], 5, 0, None, finished_exex_height_rx); let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel(); PersistenceHandle::spawn_service(provider, pruner, sync_metrics_tx) diff --git a/crates/prune/prune/Cargo.toml b/crates/prune/prune/Cargo.toml index 091ecf424..2f2a37d5b 100644 --- a/crates/prune/prune/Cargo.toml +++ b/crates/prune/prune/Cargo.toml @@ -23,7 +23,6 @@ reth-tokio-util.workspace = true reth-config.workspace = true reth-prune-types.workspace = true reth-static-file-types.workspace = true -reth-node-types.workspace = true # metrics reth-metrics.workspace = true diff --git a/crates/prune/prune/src/builder.rs b/crates/prune/prune/src/builder.rs index 7bd699ce4..71d73c416 100644 --- a/crates/prune/prune/src/builder.rs +++ b/crates/prune/prune/src/builder.rs @@ -1,10 +1,12 @@ use crate::{segments::SegmentSet, Pruner}; use reth_chainspec::MAINNET; use reth_config::PruneConfig; -use reth_db_api::database::Database; +use reth_db::transaction::DbTxMut; use reth_exex_types::FinishedExExHeight; -use reth_node_types::NodeTypesWithDB; -use reth_provider::{providers::StaticFileProvider, ProviderFactory, StaticFileProviderFactory}; +use reth_provider::{ + providers::StaticFileProvider, BlockReader, DBProvider, DatabaseProviderFactory, + PruneCheckpointWriter, StaticFileProviderFactory, TransactionsProvider, +}; use reth_prune_types::PruneModes; use std::time::Duration; use tokio::sync::watch; @@ -72,14 +74,15 @@ impl PrunerBuilder { } /// Builds a [Pruner] from the current configuration with the given provider factory. - pub fn build_with_provider_factory( - self, - provider_factory: ProviderFactory, - ) -> Pruner> { + pub fn build_with_provider_factory(self, provider_factory: PF) -> Pruner + where + PF: DatabaseProviderFactory + + StaticFileProviderFactory, + { let segments = SegmentSet::from_components(provider_factory.static_file_provider(), self.segments); - Pruner::<_, ProviderFactory>::new( + Pruner::new_with_factory( provider_factory, segments.into_vec(), self.block_interval, @@ -90,10 +93,14 @@ impl PrunerBuilder { } /// Builds a [Pruner] from the current configuration with the given static file provider. - pub fn build(self, static_file_provider: StaticFileProvider) -> Pruner { - let segments = SegmentSet::::from_components(static_file_provider, self.segments); + pub fn build(self, static_file_provider: StaticFileProvider) -> Pruner + where + Provider: + DBProvider + BlockReader + PruneCheckpointWriter + TransactionsProvider, + { + let segments = SegmentSet::::from_components(static_file_provider, self.segments); - Pruner::<_, ()>::new( + Pruner::new( segments.into_vec(), self.block_interval, self.delete_limit, diff --git a/crates/prune/prune/src/db_ext.rs b/crates/prune/prune/src/db_ext.rs new file mode 100644 index 000000000..a14127af2 --- /dev/null +++ b/crates/prune/prune/src/db_ext.rs @@ -0,0 +1,127 @@ +use std::{fmt::Debug, ops::RangeBounds}; + +use reth_db::{ + cursor::{DbCursorRO, DbCursorRW, RangeWalker}, + table::{Table, TableRow}, + transaction::DbTxMut, + DatabaseError, +}; +use reth_prune_types::PruneLimiter; +use tracing::debug; + +pub(crate) trait DbTxPruneExt: DbTxMut { + /// Prune the table for the specified pre-sorted key iterator. + /// + /// Returns number of rows pruned. + fn prune_table_with_iterator( + &self, + keys: impl IntoIterator, + limiter: &mut PruneLimiter, + mut delete_callback: impl FnMut(TableRow), + ) -> Result<(usize, bool), DatabaseError> { + let mut cursor = self.cursor_write::()?; + let mut keys = keys.into_iter(); + + let mut deleted_entries = 0; + + for key in &mut keys { + if limiter.is_limit_reached() { + debug!( + target: "providers::db", + ?limiter, + deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(), + time_limit = %limiter.is_time_limit_reached(), + table = %T::NAME, + "Pruning limit reached" + ); + break + } + + let row = cursor.seek_exact(key)?; + if let Some(row) = row { + cursor.delete_current()?; + limiter.increment_deleted_entries_count(); + deleted_entries += 1; + delete_callback(row); + } + } + + let done = keys.next().is_none(); + Ok((deleted_entries, done)) + } + + /// Prune the table for the specified key range. + /// + /// Returns number of rows pruned. + fn prune_table_with_range( + &self, + keys: impl RangeBounds + Clone + Debug, + limiter: &mut PruneLimiter, + mut skip_filter: impl FnMut(&TableRow) -> bool, + mut delete_callback: impl FnMut(TableRow), + ) -> Result<(usize, bool), DatabaseError> { + let mut cursor = self.cursor_write::()?; + let mut walker = cursor.walk_range(keys)?; + + let mut deleted_entries = 0; + + let done = loop { + // check for time out must be done in this scope since it's not done in + // `prune_table_with_range_step` + if limiter.is_limit_reached() { + debug!( + target: "providers::db", + ?limiter, + deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(), + time_limit = %limiter.is_time_limit_reached(), + table = %T::NAME, + "Pruning limit reached" + ); + break false + } + + let done = self.prune_table_with_range_step( + &mut walker, + limiter, + &mut skip_filter, + &mut delete_callback, + )?; + + if done { + break true + } + deleted_entries += 1; + }; + + Ok((deleted_entries, done)) + } + + /// Steps once with the given walker and prunes the entry in the table. + /// + /// Returns `true` if the walker is finished, `false` if it may have more data to prune. + /// + /// CAUTION: Pruner limits are not checked. This allows for a clean exit of a prune run that's + /// pruning different tables concurrently, by letting them step to the same height before + /// timing out. + fn prune_table_with_range_step( + &self, + walker: &mut RangeWalker<'_, T, Self::CursorMut>, + limiter: &mut PruneLimiter, + skip_filter: &mut impl FnMut(&TableRow) -> bool, + delete_callback: &mut impl FnMut(TableRow), + ) -> Result { + let Some(res) = walker.next() else { return Ok(true) }; + + let row = res?; + + if !skip_filter(&row) { + walker.delete_current()?; + limiter.increment_deleted_entries_count(); + delete_callback(row); + } + + Ok(false) + } +} + +impl DbTxPruneExt for Tx where Tx: DbTxMut {} diff --git a/crates/prune/prune/src/lib.rs b/crates/prune/prune/src/lib.rs index 38453385e..5a43afeb5 100644 --- a/crates/prune/prune/src/lib.rs +++ b/crates/prune/prune/src/lib.rs @@ -10,6 +10,7 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] mod builder; +mod db_ext; mod error; mod event; mod metrics; @@ -20,7 +21,7 @@ use crate::metrics::Metrics; pub use builder::PrunerBuilder; pub use error::PrunerError; pub use event::PrunerEvent; -pub use pruner::{Pruner, PrunerResult, PrunerWithResult}; +pub use pruner::{Pruner, PrunerResult, PrunerWithFactory, PrunerWithResult}; // Re-export prune types #[doc(inline)] diff --git a/crates/prune/prune/src/pruner.rs b/crates/prune/prune/src/pruner.rs index f47705211..0fb388b1e 100644 --- a/crates/prune/prune/src/pruner.rs +++ b/crates/prune/prune/src/pruner.rs @@ -5,11 +5,9 @@ use crate::{ Metrics, PrunerError, PrunerEvent, }; use alloy_primitives::BlockNumber; -use reth_db_api::database::Database; use reth_exex_types::FinishedExExHeight; -use reth_node_types::NodeTypesWithDB; use reth_provider::{ - providers::ProviderNodeTypes, DatabaseProviderRW, ProviderFactory, PruneCheckpointReader, + DBProvider, DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter, }; use reth_prune_types::{PruneLimiter, PruneProgress, PruneSegment, PrunerOutput}; use reth_tokio_util::{EventSender, EventStream}; @@ -25,12 +23,15 @@ pub type PrunerWithResult = (Pruner, PrunerResult); type PrunerStats = Vec<(PruneSegment, usize, PruneProgress)>; +/// Pruner with preset provider factory. +pub type PrunerWithFactory = Pruner<::ProviderRW, PF>; + /// Pruning routine. Main pruning logic happens in [`Pruner::run`]. #[derive(Debug)] -pub struct Pruner { +pub struct Pruner { /// Provider factory. If pruner is initialized without it, it will be set to `()`. provider_factory: PF, - segments: Vec>>, + segments: Vec>>, /// Minimum pruning interval measured in blocks. All prune segments are checked and, if needed, /// pruned, when the chain advances by the specified number of blocks. min_block_interval: usize, @@ -49,10 +50,10 @@ pub struct Pruner { event_sender: EventSender, } -impl Pruner { +impl Pruner { /// Creates a new [Pruner] without a provider factory. pub fn new( - segments: Vec>>, + segments: Vec>>, min_block_interval: usize, delete_limit: usize, timeout: Option, @@ -72,11 +73,14 @@ impl Pruner { } } -impl Pruner> { +impl Pruner +where + PF: DatabaseProviderFactory, +{ /// Crates a new pruner with the given provider factory. - pub fn new( - provider_factory: ProviderFactory, - segments: Vec>>, + pub fn new_with_factory( + provider_factory: PF, + segments: Vec>>, min_block_interval: usize, delete_limit: usize, timeout: Option, @@ -96,15 +100,23 @@ impl Pruner> { } } -impl Pruner { +impl Pruner +where + Provider: PruneCheckpointReader + PruneCheckpointWriter, +{ /// Listen for events on the pruner. pub fn events(&self) -> EventStream { self.event_sender.new_listener() } - fn run_with_provider( + /// Run the pruner with the given provider. This will only prune data up to the highest finished + /// `ExEx` height, if there are no `ExExes`. + /// + /// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data + /// to prune. + pub fn run_with_provider( &mut self, - provider: &DatabaseProviderRW, + provider: &Provider, tip_block_number: BlockNumber, ) -> PrunerResult { let Some(tip_block_number) = @@ -165,7 +177,7 @@ impl Pruner { /// Returns [`PrunerStats`], total number of entries pruned, and [`PruneProgress`]. fn prune_segments( &mut self, - provider: &DatabaseProviderRW, + provider: &Provider, tip_block_number: BlockNumber, limiter: &mut PruneLimiter, ) -> Result<(PrunerStats, usize, PrunerOutput), PrunerError> { @@ -299,23 +311,10 @@ impl Pruner { } } -impl Pruner { - /// Run the pruner with the given provider. This will only prune data up to the highest finished - /// ExEx height, if there are no ExExes. - /// - /// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data - /// to prune. - #[allow(clippy::doc_markdown)] - pub fn run( - &mut self, - provider: &DatabaseProviderRW, - tip_block_number: BlockNumber, - ) -> PrunerResult { - self.run_with_provider(provider, tip_block_number) - } -} - -impl Pruner> { +impl Pruner +where + PF: DatabaseProviderFactory, +{ /// Run the pruner. This will only prune data up to the highest finished ExEx height, if there /// are no ExExes. /// @@ -323,7 +322,7 @@ impl Pruner> { /// to prune. #[allow(clippy::doc_markdown)] pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult { - let provider = self.provider_factory.provider_rw()?; + let provider = self.provider_factory.database_provider_rw()?; let result = self.run_with_provider(&provider, tip_block_number); provider.commit()?; result @@ -334,7 +333,7 @@ impl Pruner> { mod tests { use crate::Pruner; use reth_exex_types::FinishedExExHeight; - use reth_provider::{test_utils::create_test_provider_factory, ProviderFactory}; + use reth_provider::test_utils::create_test_provider_factory; #[test] fn is_pruning_needed() { @@ -343,14 +342,8 @@ mod tests { let (finished_exex_height_tx, finished_exex_height_rx) = tokio::sync::watch::channel(FinishedExExHeight::NoExExs); - let mut pruner = Pruner::<_, ProviderFactory<_>>::new( - provider_factory, - vec![], - 5, - 0, - None, - finished_exex_height_rx, - ); + let mut pruner = + Pruner::new_with_factory(provider_factory, vec![], 5, 0, None, finished_exex_height_rx); // No last pruned block number was set before let first_block_number = 1; diff --git a/crates/prune/prune/src/segments/mod.rs b/crates/prune/prune/src/segments/mod.rs index d9f453806..d1b7819ac 100644 --- a/crates/prune/prune/src/segments/mod.rs +++ b/crates/prune/prune/src/segments/mod.rs @@ -5,10 +5,7 @@ mod user; use crate::PrunerError; use alloy_primitives::{BlockNumber, TxNumber}; -use reth_db_api::database::Database; -use reth_provider::{ - errors::provider::ProviderResult, BlockReader, DatabaseProviderRW, PruneCheckpointWriter, -}; +use reth_provider::{errors::provider::ProviderResult, BlockReader, PruneCheckpointWriter}; use reth_prune_types::{ PruneCheckpoint, PruneLimiter, PruneMode, PrunePurpose, PruneSegment, SegmentOutput, }; @@ -31,7 +28,7 @@ pub use user::{ /// 2. If [`Segment::prune`] returned a [Some] in `checkpoint` of [`SegmentOutput`], call /// [`Segment::save_checkpoint`]. /// 3. Subtract `pruned` of [`SegmentOutput`] from `delete_limit` of next [`PruneInput`]. -pub trait Segment: Debug + Send + Sync { +pub trait Segment: Debug + Send + Sync { /// Segment of data that's pruned. fn segment(&self) -> PruneSegment; @@ -42,18 +39,17 @@ pub trait Segment: Debug + Send + Sync { fn purpose(&self) -> PrunePurpose; /// Prune data for [`Self::segment`] using the provided input. - fn prune( - &self, - provider: &DatabaseProviderRW, - input: PruneInput, - ) -> Result; + fn prune(&self, provider: &Provider, input: PruneInput) -> Result; /// Save checkpoint for [`Self::segment`] to the database. fn save_checkpoint( &self, - provider: &DatabaseProviderRW, + provider: &Provider, checkpoint: PruneCheckpoint, - ) -> ProviderResult<()> { + ) -> ProviderResult<()> + where + Provider: PruneCheckpointWriter, + { provider.save_prune_checkpoint(self.segment(), checkpoint) } } @@ -78,9 +74,9 @@ impl PruneInput { /// 2. If checkpoint doesn't exist, return 0. /// /// To get the range end: get last tx number for `to_block`. - pub(crate) fn get_next_tx_num_range( + pub(crate) fn get_next_tx_num_range( &self, - provider: &DatabaseProviderRW, + provider: &Provider, ) -> ProviderResult>> { let from_tx_number = self.previous_checkpoint // Checkpoint exists, prune from the next transaction after the highest pruned one diff --git a/crates/prune/prune/src/segments/receipts.rs b/crates/prune/prune/src/segments/receipts.rs index 67e889a18..944531fe7 100644 --- a/crates/prune/prune/src/segments/receipts.rs +++ b/crates/prune/prune/src/segments/receipts.rs @@ -5,11 +5,10 @@ //! - [`crate::segments::static_file::Receipts`] is responsible for pruning receipts on an archive //! node after static file producer has finished -use crate::{segments::PruneInput, PrunerError}; -use reth_db::tables; -use reth_db_api::database::Database; +use crate::{db_ext::DbTxPruneExt, segments::PruneInput, PrunerError}; +use reth_db::{tables, transaction::DbTxMut}; use reth_provider::{ - errors::provider::ProviderResult, DatabaseProviderRW, PruneCheckpointWriter, + errors::provider::ProviderResult, BlockReader, DBProvider, PruneCheckpointWriter, TransactionsProvider, }; use reth_prune_types::{ @@ -17,10 +16,13 @@ use reth_prune_types::{ }; use tracing::trace; -pub(crate) fn prune( - provider: &DatabaseProviderRW, +pub(crate) fn prune( + provider: &Provider, input: PruneInput, -) -> Result { +) -> Result +where + Provider: DBProvider + TransactionsProvider + BlockReader, +{ let tx_range = match input.get_next_tx_num_range(provider)? { Some(range) => range, None => { @@ -33,7 +35,7 @@ pub(crate) fn prune( let mut limiter = input.limiter; let mut last_pruned_transaction = tx_range_end; - let (pruned, done) = provider.prune_table_with_range::( + let (pruned, done) = provider.tx_ref().prune_table_with_range::( tx_range, &mut limiter, |_| false, @@ -60,8 +62,8 @@ pub(crate) fn prune( }) } -pub(crate) fn save_checkpoint( - provider: &DatabaseProviderRW, +pub(crate) fn save_checkpoint( + provider: impl PruneCheckpointWriter, checkpoint: PruneCheckpoint, ) -> ProviderResult<()> { provider.save_prune_checkpoint(PruneSegment::Receipts, checkpoint)?; @@ -83,7 +85,7 @@ mod tests { Itertools, }; use reth_db::tables; - use reth_provider::PruneCheckpointReader; + use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader}; use reth_prune_types::{ PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, PruneSegment, }; @@ -158,7 +160,7 @@ mod tests { ) .sub(1); - let provider = db.factory.provider_rw().unwrap(); + let provider = db.factory.database_provider_rw().unwrap(); let result = super::prune(&provider, input).unwrap(); limiter.increment_deleted_entries_count_by(result.pruned); diff --git a/crates/prune/prune/src/segments/set.rs b/crates/prune/prune/src/segments/set.rs index d152bfe26..710b2b721 100644 --- a/crates/prune/prune/src/segments/set.rs +++ b/crates/prune/prune/src/segments/set.rs @@ -2,32 +2,35 @@ use crate::segments::{ AccountHistory, ReceiptsByLogs, Segment, SenderRecovery, StorageHistory, TransactionLookup, UserReceipts, }; -use reth_db_api::database::Database; -use reth_provider::providers::StaticFileProvider; +use reth_db::transaction::DbTxMut; +use reth_provider::{ + providers::StaticFileProvider, BlockReader, DBProvider, PruneCheckpointWriter, + TransactionsProvider, +}; use reth_prune_types::PruneModes; use super::{StaticFileHeaders, StaticFileReceipts, StaticFileTransactions}; /// Collection of [Segment]. Thread-safe, allocated on the heap. #[derive(Debug)] -pub struct SegmentSet { - inner: Vec>>, +pub struct SegmentSet { + inner: Vec>>, } -impl SegmentSet { +impl SegmentSet { /// Returns empty [`SegmentSet`] collection. pub fn new() -> Self { Self::default() } /// Adds new [Segment] to collection. - pub fn segment + 'static>(mut self, segment: S) -> Self { + pub fn segment + 'static>(mut self, segment: S) -> Self { self.inner.push(Box::new(segment)); self } /// Adds new [Segment] to collection if it's [Some]. - pub fn segment_opt + 'static>(self, segment: Option) -> Self { + pub fn segment_opt + 'static>(self, segment: Option) -> Self { if let Some(segment) = segment { return self.segment(segment) } @@ -35,10 +38,15 @@ impl SegmentSet { } /// Consumes [`SegmentSet`] and returns a [Vec]. - pub fn into_vec(self) -> Vec>> { + pub fn into_vec(self) -> Vec>> { self.inner } +} +impl SegmentSet +where + Provider: DBProvider + TransactionsProvider + PruneCheckpointWriter + BlockReader, +{ /// Creates a [`SegmentSet`] from an existing components, such as [`StaticFileProvider`] and /// [`PruneModes`]. pub fn from_components( @@ -79,7 +87,7 @@ impl SegmentSet { } } -impl Default for SegmentSet { +impl Default for SegmentSet { fn default() -> Self { Self { inner: Vec::new() } } diff --git a/crates/prune/prune/src/segments/static_file/headers.rs b/crates/prune/prune/src/segments/static_file/headers.rs index 450dae3a8..a3daf504e 100644 --- a/crates/prune/prune/src/segments/static_file/headers.rs +++ b/crates/prune/prune/src/segments/static_file/headers.rs @@ -1,6 +1,7 @@ use std::num::NonZeroUsize; use crate::{ + db_ext::DbTxPruneExt, segments::{PruneInput, Segment}, PrunerError, }; @@ -8,11 +9,10 @@ use alloy_primitives::BlockNumber; use itertools::Itertools; use reth_db::{ cursor::{DbCursorRO, RangeWalker}, - database::Database, tables, transaction::DbTxMut, }; -use reth_provider::{providers::StaticFileProvider, DatabaseProviderRW}; +use reth_provider::{providers::StaticFileProvider, DBProvider}; use reth_prune_types::{ PruneLimiter, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint, @@ -34,7 +34,7 @@ impl Headers { } } -impl Segment for Headers { +impl> Segment for Headers { fn segment(&self) -> PruneSegment { PruneSegment::Headers } @@ -49,11 +49,7 @@ impl Segment for Headers { PrunePurpose::StaticFile } - fn prune( - &self, - provider: &DatabaseProviderRW, - input: PruneInput, - ) -> Result { + fn prune(&self, provider: &Provider, input: PruneInput) -> Result { let (block_range_start, block_range_end) = match input.get_next_block_range() { Some(range) => (*range.start(), *range.end()), None => { @@ -106,18 +102,19 @@ impl Segment for Headers { }) } } -type Walker<'a, DB, T> = RangeWalker<'a, T, <::TXMut as DbTxMut>::CursorMut>; +type Walker<'a, Provider, T> = + RangeWalker<'a, T, <::Tx as DbTxMut>::CursorMut>; #[allow(missing_debug_implementations)] -struct HeaderTablesIter<'a, DB> +struct HeaderTablesIter<'a, Provider> where - DB: Database, + Provider: DBProvider, { - provider: &'a DatabaseProviderRW, + provider: &'a Provider, limiter: &'a mut PruneLimiter, - headers_walker: Walker<'a, DB, tables::Headers>, - header_tds_walker: Walker<'a, DB, tables::HeaderTerminalDifficulties>, - canonical_headers_walker: Walker<'a, DB, tables::CanonicalHeaders>, + headers_walker: Walker<'a, Provider, tables::Headers>, + header_tds_walker: Walker<'a, Provider, tables::HeaderTerminalDifficulties>, + canonical_headers_walker: Walker<'a, Provider, tables::CanonicalHeaders>, } struct HeaderTablesIterItem { @@ -125,24 +122,24 @@ struct HeaderTablesIterItem { entries_pruned: usize, } -impl<'a, DB> HeaderTablesIter<'a, DB> +impl<'a, Provider> HeaderTablesIter<'a, Provider> where - DB: Database, + Provider: DBProvider, { fn new( - provider: &'a DatabaseProviderRW, + provider: &'a Provider, limiter: &'a mut PruneLimiter, - headers_walker: Walker<'a, DB, tables::Headers>, - header_tds_walker: Walker<'a, DB, tables::HeaderTerminalDifficulties>, - canonical_headers_walker: Walker<'a, DB, tables::CanonicalHeaders>, + headers_walker: Walker<'a, Provider, tables::Headers>, + header_tds_walker: Walker<'a, Provider, tables::HeaderTerminalDifficulties>, + canonical_headers_walker: Walker<'a, Provider, tables::CanonicalHeaders>, ) -> Self { Self { provider, limiter, headers_walker, header_tds_walker, canonical_headers_walker } } } -impl<'a, DB> Iterator for HeaderTablesIter<'a, DB> +impl<'a, Provider> Iterator for HeaderTablesIter<'a, Provider> where - DB: Database, + Provider: DBProvider, { type Item = Result; fn next(&mut self) -> Option { @@ -154,7 +151,7 @@ where let mut pruned_block_td = None; let mut pruned_block_canonical = None; - if let Err(err) = self.provider.prune_table_with_range_step( + if let Err(err) = self.provider.tx_ref().prune_table_with_range_step( &mut self.headers_walker, self.limiter, &mut |_| false, @@ -163,7 +160,7 @@ where return Some(Err(err.into())) } - if let Err(err) = self.provider.prune_table_with_range_step( + if let Err(err) = self.provider.tx_ref().prune_table_with_range_step( &mut self.header_tds_walker, self.limiter, &mut |_| false, @@ -172,7 +169,7 @@ where return Some(Err(err.into())) } - if let Err(err) = self.provider.prune_table_with_range_step( + if let Err(err) = self.provider.tx_ref().prune_table_with_range_step( &mut self.canonical_headers_walker, self.limiter, &mut |_| false, @@ -202,7 +199,10 @@ mod tests { use assert_matches::assert_matches; use reth_db::tables; use reth_db_api::transaction::DbTx; - use reth_provider::{PruneCheckpointReader, PruneCheckpointWriter, StaticFileProviderFactory}; + use reth_provider::{ + DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter, + StaticFileProviderFactory, + }; use reth_prune_types::{ PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, PruneSegment, SegmentOutputCheckpoint, @@ -254,7 +254,7 @@ mod tests { .map(|block_number| block_number + 1) .unwrap_or_default(); - let provider = db.factory.provider_rw().unwrap(); + let provider = db.factory.database_provider_rw().unwrap(); let result = segment.prune(&provider, input.clone()).unwrap(); limiter.increment_deleted_entries_count_by(result.pruned); trace!(target: "pruner::test", @@ -325,7 +325,7 @@ mod tests { limiter, }; - let provider = db.factory.provider_rw().unwrap(); + let provider = db.factory.database_provider_rw().unwrap(); let segment = super::Headers::new(db.factory.static_file_provider()); let result = segment.prune(&provider, input).unwrap(); assert_eq!( diff --git a/crates/prune/prune/src/segments/static_file/receipts.rs b/crates/prune/prune/src/segments/static_file/receipts.rs index e84f7df44..f766f7ea1 100644 --- a/crates/prune/prune/src/segments/static_file/receipts.rs +++ b/crates/prune/prune/src/segments/static_file/receipts.rs @@ -2,9 +2,10 @@ use crate::{ segments::{PruneInput, Segment}, PrunerError, }; -use reth_db_api::database::Database; +use reth_db::transaction::DbTxMut; use reth_provider::{ - errors::provider::ProviderResult, providers::StaticFileProvider, DatabaseProviderRW, + errors::provider::ProviderResult, providers::StaticFileProvider, BlockReader, DBProvider, + PruneCheckpointWriter, TransactionsProvider, }; use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, SegmentOutput}; use reth_static_file_types::StaticFileSegment; @@ -20,7 +21,10 @@ impl Receipts { } } -impl Segment for Receipts { +impl Segment for Receipts +where + Provider: DBProvider + PruneCheckpointWriter + TransactionsProvider + BlockReader, +{ fn segment(&self) -> PruneSegment { PruneSegment::Receipts } @@ -35,17 +39,13 @@ impl Segment for Receipts { PrunePurpose::StaticFile } - fn prune( - &self, - provider: &DatabaseProviderRW, - input: PruneInput, - ) -> Result { + fn prune(&self, provider: &Provider, input: PruneInput) -> Result { crate::segments::receipts::prune(provider, input) } fn save_checkpoint( &self, - provider: &DatabaseProviderRW, + provider: &Provider, checkpoint: PruneCheckpoint, ) -> ProviderResult<()> { crate::segments::receipts::save_checkpoint(provider, checkpoint) diff --git a/crates/prune/prune/src/segments/static_file/transactions.rs b/crates/prune/prune/src/segments/static_file/transactions.rs index 20199ba40..0daf7e254 100644 --- a/crates/prune/prune/src/segments/static_file/transactions.rs +++ b/crates/prune/prune/src/segments/static_file/transactions.rs @@ -1,10 +1,10 @@ use crate::{ + db_ext::DbTxPruneExt, segments::{PruneInput, Segment}, PrunerError, }; -use reth_db::tables; -use reth_db_api::database::Database; -use reth_provider::{providers::StaticFileProvider, DatabaseProviderRW, TransactionsProvider}; +use reth_db::{tables, transaction::DbTxMut}; +use reth_provider::{providers::StaticFileProvider, BlockReader, DBProvider, TransactionsProvider}; use reth_prune_types::{ PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint, }; @@ -22,7 +22,10 @@ impl Transactions { } } -impl Segment for Transactions { +impl Segment for Transactions +where + Provider: DBProvider + TransactionsProvider + BlockReader, +{ fn segment(&self) -> PruneSegment { PruneSegment::Transactions } @@ -37,11 +40,7 @@ impl Segment for Transactions { PrunePurpose::StaticFile } - fn prune( - &self, - provider: &DatabaseProviderRW, - input: PruneInput, - ) -> Result { + fn prune(&self, provider: &Provider, input: PruneInput) -> Result { let tx_range = match input.get_next_tx_num_range(provider)? { Some(range) => range, None => { @@ -53,7 +52,7 @@ impl Segment for Transactions { let mut limiter = input.limiter; let mut last_pruned_transaction = *tx_range.end(); - let (pruned, done) = provider.prune_table_with_range::( + let (pruned, done) = provider.tx_ref().prune_table_with_range::( tx_range, &mut limiter, |_| false, @@ -91,7 +90,10 @@ mod tests { Itertools, }; use reth_db::tables; - use reth_provider::{PruneCheckpointReader, PruneCheckpointWriter, StaticFileProviderFactory}; + use reth_provider::{ + DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter, + StaticFileProviderFactory, + }; use reth_prune_types::{ PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, PruneSegment, SegmentOutput, @@ -141,7 +143,7 @@ mod tests { .map(|tx_number| tx_number + 1) .unwrap_or_default(); - let provider = db.factory.provider_rw().unwrap(); + let provider = db.factory.database_provider_rw().unwrap(); let result = segment.prune(&provider, input.clone()).unwrap(); limiter.increment_deleted_entries_count_by(result.pruned); diff --git a/crates/prune/prune/src/segments/user/account_history.rs b/crates/prune/prune/src/segments/user/account_history.rs index 4e5a99bc3..016d9a22f 100644 --- a/crates/prune/prune/src/segments/user/account_history.rs +++ b/crates/prune/prune/src/segments/user/account_history.rs @@ -1,11 +1,12 @@ use crate::{ + db_ext::DbTxPruneExt, segments::{user::history::prune_history_indices, PruneInput, Segment}, PrunerError, }; use itertools::Itertools; -use reth_db::tables; -use reth_db_api::{database::Database, models::ShardedKey}; -use reth_provider::DatabaseProviderRW; +use reth_db::{tables, transaction::DbTxMut}; +use reth_db_api::models::ShardedKey; +use reth_provider::DBProvider; use reth_prune_types::{ PruneInterruptReason, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint, @@ -30,7 +31,10 @@ impl AccountHistory { } } -impl Segment for AccountHistory { +impl Segment for AccountHistory +where + Provider: DBProvider, +{ fn segment(&self) -> PruneSegment { PruneSegment::AccountHistory } @@ -44,11 +48,7 @@ impl Segment for AccountHistory { } #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] - fn prune( - &self, - provider: &DatabaseProviderRW, - input: PruneInput, - ) -> Result { + fn prune(&self, provider: &Provider, input: PruneInput) -> Result { let range = match input.get_next_block_range() { Some(range) => range, None => { @@ -80,8 +80,8 @@ impl Segment for AccountHistory { // size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is // additionally limited by the `max_reorg_depth`, so no OOM is expected here. let mut highest_deleted_accounts = FxHashMap::default(); - let (pruned_changesets, done) = provider - .prune_table_with_range::( + let (pruned_changesets, done) = + provider.tx_ref().prune_table_with_range::( range, &mut limiter, |_| false, @@ -106,7 +106,7 @@ impl Segment for AccountHistory { .map(|(address, block_number)| { ShardedKey::new(address, block_number.min(last_changeset_pruned_block)) }); - let outcomes = prune_history_indices::( + let outcomes = prune_history_indices::( provider, highest_sharded_keys, |a, b| a.key == b.key, @@ -135,7 +135,7 @@ mod tests { use alloy_primitives::{BlockNumber, B256}; use assert_matches::assert_matches; use reth_db::{tables, BlockNumberList}; - use reth_provider::PruneCheckpointReader; + use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader}; use reth_prune_types::{ PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, PruneSegment, }; @@ -203,7 +203,7 @@ mod tests { }; let segment = AccountHistory::new(prune_mode); - let provider = db.factory.provider_rw().unwrap(); + let provider = db.factory.database_provider_rw().unwrap(); let result = segment.prune(&provider, input).unwrap(); limiter.increment_deleted_entries_count_by(result.pruned); diff --git a/crates/prune/prune/src/segments/user/history.rs b/crates/prune/prune/src/segments/user/history.rs index 3551deeed..e27884a92 100644 --- a/crates/prune/prune/src/segments/user/history.rs +++ b/crates/prune/prune/src/segments/user/history.rs @@ -2,13 +2,12 @@ use alloy_primitives::BlockNumber; use reth_db::{BlockNumberList, RawKey, RawTable, RawValue}; use reth_db_api::{ cursor::{DbCursorRO, DbCursorRW}, - database::Database, models::ShardedKey, table::Table, transaction::DbTxMut, DatabaseError, }; -use reth_provider::DatabaseProviderRW; +use reth_provider::DBProvider; enum PruneShardOutcome { Deleted, @@ -26,13 +25,13 @@ pub(crate) struct PrunedIndices { /// Prune history indices according to the provided list of highest sharded keys. /// /// Returns total number of deleted, updated and unchanged entities. -pub(crate) fn prune_history_indices( - provider: &DatabaseProviderRW, +pub(crate) fn prune_history_indices( + provider: &Provider, highest_sharded_keys: impl IntoIterator, key_matches: impl Fn(&T::Key, &T::Key) -> bool, ) -> Result where - DB: Database, + Provider: DBProvider, T: Table, T::Key: AsRef>, { diff --git a/crates/prune/prune/src/segments/user/receipts.rs b/crates/prune/prune/src/segments/user/receipts.rs index 7185affd0..5bc9feaf0 100644 --- a/crates/prune/prune/src/segments/user/receipts.rs +++ b/crates/prune/prune/src/segments/user/receipts.rs @@ -2,8 +2,11 @@ use crate::{ segments::{PruneInput, Segment}, PrunerError, }; -use reth_db_api::database::Database; -use reth_provider::{errors::provider::ProviderResult, DatabaseProviderRW}; +use reth_db::transaction::DbTxMut; +use reth_provider::{ + errors::provider::ProviderResult, BlockReader, DBProvider, PruneCheckpointWriter, + TransactionsProvider, +}; use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, SegmentOutput}; use tracing::instrument; @@ -18,7 +21,10 @@ impl Receipts { } } -impl Segment for Receipts { +impl Segment for Receipts +where + Provider: DBProvider + PruneCheckpointWriter + TransactionsProvider + BlockReader, +{ fn segment(&self) -> PruneSegment { PruneSegment::Receipts } @@ -32,17 +38,13 @@ impl Segment for Receipts { } #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] - fn prune( - &self, - provider: &DatabaseProviderRW, - input: PruneInput, - ) -> Result { + fn prune(&self, provider: &Provider, input: PruneInput) -> Result { crate::segments::receipts::prune(provider, input) } fn save_checkpoint( &self, - provider: &DatabaseProviderRW, + provider: &Provider, checkpoint: PruneCheckpoint, ) -> ProviderResult<()> { crate::segments::receipts::save_checkpoint(provider, checkpoint) diff --git a/crates/prune/prune/src/segments/user/receipts_by_logs.rs b/crates/prune/prune/src/segments/user/receipts_by_logs.rs index 10a385bda..fbb353b41 100644 --- a/crates/prune/prune/src/segments/user/receipts_by_logs.rs +++ b/crates/prune/prune/src/segments/user/receipts_by_logs.rs @@ -1,10 +1,10 @@ use crate::{ + db_ext::DbTxPruneExt, segments::{PruneInput, Segment}, PrunerError, }; -use reth_db::tables; -use reth_db_api::database::Database; -use reth_provider::{BlockReader, DatabaseProviderRW, PruneCheckpointWriter, TransactionsProvider}; +use reth_db::{tables, transaction::DbTxMut}; +use reth_provider::{BlockReader, DBProvider, PruneCheckpointWriter, TransactionsProvider}; use reth_prune_types::{ PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, ReceiptsLogPruneConfig, SegmentOutput, MINIMUM_PRUNING_DISTANCE, @@ -22,7 +22,10 @@ impl ReceiptsByLogs { } } -impl Segment for ReceiptsByLogs { +impl Segment for ReceiptsByLogs +where + Provider: DBProvider + PruneCheckpointWriter + TransactionsProvider + BlockReader, +{ fn segment(&self) -> PruneSegment { PruneSegment::ContractLogs } @@ -36,11 +39,7 @@ impl Segment for ReceiptsByLogs { } #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] - fn prune( - &self, - provider: &DatabaseProviderRW, - input: PruneInput, - ) -> Result { + fn prune(&self, provider: &Provider, input: PruneInput) -> Result { // Contract log filtering removes every receipt possible except the ones in the list. So, // for the other receipts it's as if they had a `PruneMode::Distance()` of // `MINIMUM_PRUNING_DISTANCE`. @@ -143,7 +142,7 @@ impl Segment for ReceiptsByLogs { // Delete receipts, except the ones in the inclusion list let mut last_skipped_transaction = 0; let deleted; - (deleted, done) = provider.prune_table_with_range::( + (deleted, done) = provider.tx_ref().prune_table_with_range::( tx_range, &mut limiter, |(tx_num, receipt)| { @@ -224,7 +223,7 @@ mod tests { use assert_matches::assert_matches; use reth_db::tables; use reth_db_api::{cursor::DbCursorRO, transaction::DbTx}; - use reth_provider::{PruneCheckpointReader, TransactionsProvider}; + use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader, TransactionsProvider}; use reth_prune_types::{PruneLimiter, PruneMode, PruneSegment, ReceiptsLogPruneConfig}; use reth_stages::test_utils::{StorageKind, TestStageDB}; use reth_testing_utils::generators::{ @@ -286,7 +285,7 @@ mod tests { ); let run_prune = || { - let provider = db.factory.provider_rw().unwrap(); + let provider = db.factory.database_provider_rw().unwrap(); let prune_before_block: usize = 20; let prune_mode = PruneMode::Before(prune_before_block as u64); diff --git a/crates/prune/prune/src/segments/user/sender_recovery.rs b/crates/prune/prune/src/segments/user/sender_recovery.rs index 76058608e..de3fd686f 100644 --- a/crates/prune/prune/src/segments/user/sender_recovery.rs +++ b/crates/prune/prune/src/segments/user/sender_recovery.rs @@ -1,10 +1,10 @@ use crate::{ + db_ext::DbTxPruneExt, segments::{PruneInput, Segment}, PrunerError, }; -use reth_db::tables; -use reth_db_api::database::Database; -use reth_provider::{DatabaseProviderRW, TransactionsProvider}; +use reth_db::{tables, transaction::DbTxMut}; +use reth_provider::{BlockReader, DBProvider, TransactionsProvider}; use reth_prune_types::{ PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint, }; @@ -21,7 +21,10 @@ impl SenderRecovery { } } -impl Segment for SenderRecovery { +impl Segment for SenderRecovery +where + Provider: DBProvider + TransactionsProvider + BlockReader, +{ fn segment(&self) -> PruneSegment { PruneSegment::SenderRecovery } @@ -35,11 +38,7 @@ impl Segment for SenderRecovery { } #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] - fn prune( - &self, - provider: &DatabaseProviderRW, - input: PruneInput, - ) -> Result { + fn prune(&self, provider: &Provider, input: PruneInput) -> Result { let tx_range = match input.get_next_tx_num_range(provider)? { Some(range) => range, None => { @@ -52,12 +51,13 @@ impl Segment for SenderRecovery { let mut limiter = input.limiter; let mut last_pruned_transaction = tx_range_end; - let (pruned, done) = provider.prune_table_with_range::( - tx_range, - &mut limiter, - |_| false, - |row| last_pruned_transaction = row.0, - )?; + let (pruned, done) = + provider.tx_ref().prune_table_with_range::( + tx_range, + &mut limiter, + |_| false, + |row| last_pruned_transaction = row.0, + )?; trace!(target: "pruner", %pruned, %done, "Pruned transaction senders"); let last_pruned_block = provider @@ -90,7 +90,7 @@ mod tests { Itertools, }; use reth_db::tables; - use reth_provider::PruneCheckpointReader; + use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader}; use reth_prune_types::{PruneCheckpoint, PruneLimiter, PruneMode, PruneProgress, PruneSegment}; use reth_stages::test_utils::{StorageKind, TestStageDB}; use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams}; @@ -179,7 +179,7 @@ mod tests { .into_inner() .0; - let provider = db.factory.provider_rw().unwrap(); + let provider = db.factory.database_provider_rw().unwrap(); let result = segment.prune(&provider, input).unwrap(); limiter.increment_deleted_entries_count_by(result.pruned); diff --git a/crates/prune/prune/src/segments/user/storage_history.rs b/crates/prune/prune/src/segments/user/storage_history.rs index f28ea6ccf..5291d822c 100644 --- a/crates/prune/prune/src/segments/user/storage_history.rs +++ b/crates/prune/prune/src/segments/user/storage_history.rs @@ -1,14 +1,12 @@ use crate::{ + db_ext::DbTxPruneExt, segments::{user::history::prune_history_indices, PruneInput, Segment, SegmentOutput}, PrunerError, }; use itertools::Itertools; -use reth_db::tables; -use reth_db_api::{ - database::Database, - models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress}, -}; -use reth_provider::DatabaseProviderRW; +use reth_db::{tables, transaction::DbTxMut}; +use reth_db_api::models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress}; +use reth_provider::DBProvider; use reth_prune_types::{ PruneInterruptReason, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutputCheckpoint, @@ -33,7 +31,10 @@ impl StorageHistory { } } -impl Segment for StorageHistory { +impl Segment for StorageHistory +where + Provider: DBProvider, +{ fn segment(&self) -> PruneSegment { PruneSegment::StorageHistory } @@ -47,11 +48,7 @@ impl Segment for StorageHistory { } #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] - fn prune( - &self, - provider: &DatabaseProviderRW, - input: PruneInput, - ) -> Result { + fn prune(&self, provider: &Provider, input: PruneInput) -> Result { let range = match input.get_next_block_range() { Some(range) => range, None => { @@ -83,8 +80,8 @@ impl Segment for StorageHistory { // size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is // additionally limited by the `max_reorg_depth`, so no OOM is expected here. let mut highest_deleted_storages = FxHashMap::default(); - let (pruned_changesets, done) = provider - .prune_table_with_range::( + let (pruned_changesets, done) = + provider.tx_ref().prune_table_with_range::( BlockNumberAddress::range(range), &mut limiter, |_| false, @@ -114,7 +111,7 @@ impl Segment for StorageHistory { block_number.min(last_changeset_pruned_block), ) }); - let outcomes = prune_history_indices::( + let outcomes = prune_history_indices::( provider, highest_sharded_keys, |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key, @@ -143,7 +140,7 @@ mod tests { use alloy_primitives::{BlockNumber, B256}; use assert_matches::assert_matches; use reth_db::{tables, BlockNumberList}; - use reth_provider::PruneCheckpointReader; + use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader}; use reth_prune_types::{PruneCheckpoint, PruneLimiter, PruneMode, PruneProgress, PruneSegment}; use reth_stages::test_utils::{StorageKind, TestStageDB}; use reth_testing_utils::generators::{ @@ -210,7 +207,7 @@ mod tests { }; let segment = StorageHistory::new(prune_mode); - let provider = db.factory.provider_rw().unwrap(); + let provider = db.factory.database_provider_rw().unwrap(); let result = segment.prune(&provider, input).unwrap(); limiter.increment_deleted_entries_count_by(result.pruned); diff --git a/crates/prune/prune/src/segments/user/transaction_lookup.rs b/crates/prune/prune/src/segments/user/transaction_lookup.rs index ef1fc0ae8..5b9b7454f 100644 --- a/crates/prune/prune/src/segments/user/transaction_lookup.rs +++ b/crates/prune/prune/src/segments/user/transaction_lookup.rs @@ -1,11 +1,11 @@ use crate::{ + db_ext::DbTxPruneExt, segments::{PruneInput, Segment, SegmentOutput}, PrunerError, }; use rayon::prelude::*; -use reth_db::tables; -use reth_db_api::database::Database; -use reth_provider::{DatabaseProviderRW, TransactionsProvider}; +use reth_db::{tables, transaction::DbTxMut}; +use reth_provider::{BlockReader, DBProvider, TransactionsProvider}; use reth_prune_types::{ PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutputCheckpoint, }; @@ -22,7 +22,10 @@ impl TransactionLookup { } } -impl Segment for TransactionLookup { +impl Segment for TransactionLookup +where + Provider: DBProvider + TransactionsProvider + BlockReader, +{ fn segment(&self) -> PruneSegment { PruneSegment::TransactionLookup } @@ -36,11 +39,7 @@ impl Segment for TransactionLookup { } #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] - fn prune( - &self, - provider: &DatabaseProviderRW, - input: PruneInput, - ) -> Result { + fn prune(&self, provider: &Provider, input: PruneInput) -> Result { let (start, end) = match input.get_next_tx_num_range(provider)? { Some(range) => range, None => { @@ -73,13 +72,15 @@ impl Segment for TransactionLookup { let mut limiter = input.limiter; let mut last_pruned_transaction = None; - let (pruned, done) = provider.prune_table_with_iterator::( - hashes, - &mut limiter, - |row| { - last_pruned_transaction = Some(last_pruned_transaction.unwrap_or(row.1).max(row.1)) - }, - )?; + let (pruned, done) = + provider.tx_ref().prune_table_with_iterator::( + hashes, + &mut limiter, + |row| { + last_pruned_transaction = + Some(last_pruned_transaction.unwrap_or(row.1).max(row.1)) + }, + )?; let done = done && tx_range_end == end; trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup"); @@ -117,7 +118,7 @@ mod tests { Itertools, }; use reth_db::tables; - use reth_provider::PruneCheckpointReader; + use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader}; use reth_prune_types::{ PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, PruneSegment, }; @@ -204,7 +205,7 @@ mod tests { .into_inner() .0; - let provider = db.factory.provider_rw().unwrap(); + let provider = db.factory.database_provider_rw().unwrap(); let result = segment.prune(&provider, input).unwrap(); limiter.increment_deleted_entries_count_by(result.pruned); diff --git a/crates/stages/stages/src/stages/prune.rs b/crates/stages/stages/src/stages/prune.rs index 7b162bd16..407073bce 100644 --- a/crates/stages/stages/src/stages/prune.rs +++ b/crates/stages/stages/src/stages/prune.rs @@ -47,7 +47,7 @@ impl Stage for PruneStage { .delete_limit(self.commit_threshold) .build(provider.static_file_provider().clone()); - let result = pruner.run(provider, input.target())?; + let result = pruner.run_with_provider(&provider.0, input.target())?; if result.progress.is_finished() { Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true }) } else { diff --git a/crates/static-file/static-file/src/static_file_producer.rs b/crates/static-file/static-file/src/static_file_producer.rs index eca59e1e7..d15451202 100644 --- a/crates/static-file/static-file/src/static_file_producer.rs +++ b/crates/static-file/static-file/src/static_file_producer.rs @@ -159,8 +159,7 @@ where // Create a new database transaction on every segment to prevent long-lived read-only // transactions - let mut provider = self.provider.database_provider_ro()?; - provider.disable_long_read_transaction_safety(); + let provider = self.provider.database_provider_ro()?.disable_long_read_transaction_safety(); segment.copy_to_static_files(provider, self.provider.static_file_provider(), block_range.clone())?; let elapsed = start.elapsed(); // TODO(alexey): track in metrics diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 29fd29fdc..2eb598e1f 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -13,6 +13,7 @@ use reth_chain_state::{ MemoryOverlayStateProvider, }; use reth_chainspec::ChainInfo; +use reth_db::Database; use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices}; use reth_evm::ConfigureEvmEnv; use reth_execution_types::ExecutionOutcome; @@ -34,7 +35,7 @@ use std::{ }; use tracing::trace; -use super::ProviderNodeTypes; +use super::{DatabaseProvider, ProviderNodeTypes}; /// The main type for interacting with the blockchain. /// @@ -263,10 +264,15 @@ impl BlockchainProvider2 { impl DatabaseProviderFactory for BlockchainProvider2 { type DB = N::DB; - type Provider = DatabaseProviderRO; + type Provider = DatabaseProvider<::TX>; + type ProviderRW = DatabaseProvider<::TXMut>; fn database_provider_ro(&self) -> ProviderResult { - self.database.provider() + self.database.database_provider_ro() + } + + fn database_provider_rw(&self) -> ProviderResult { + self.database.database_provider_rw() } } diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index df29d809c..0011e2848 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -186,10 +186,15 @@ impl ProviderFactory { impl DatabaseProviderFactory for ProviderFactory { type DB = N::DB; type Provider = DatabaseProviderRO; + type ProviderRW = DatabaseProvider<::TXMut>; fn database_provider_ro(&self) -> ProviderResult { self.provider() } + + fn database_provider_rw(&self) -> ProviderResult { + self.provider_rw().map(|provider| provider.0) + } } impl StaticFileProviderFactory for ProviderFactory { diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 0077081b3..91d27ac70 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -23,13 +23,13 @@ use reth_db::{ }; use reth_db_api::{ common::KeyValue, - cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, RangeWalker}, + cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, database::Database, models::{ sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress, ShardedKey, StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals, }, - table::{Table, TableRow}, + table::Table, transaction::{DbTx, DbTxMut}, DatabaseError, }; @@ -43,7 +43,7 @@ use reth_primitives::{ TransactionSigned, TransactionSignedEcRecovered, TransactionSignedNoHash, TxHash, TxNumber, Withdrawal, Withdrawals, B256, U256, }; -use reth_prune_types::{PruneCheckpoint, PruneLimiter, PruneModes, PruneSegment}; +use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment}; use reth_stages_types::{StageCheckpoint, StageId}; use reth_storage_api::TryIntoHistoricalStateProvider; use reth_storage_errors::provider::{ProviderResult, RootMismatch}; @@ -1519,119 +1519,6 @@ impl DatabaseProvider { Ok(()) } - /// Prune the table for the specified pre-sorted key iterator. - /// - /// Returns number of rows pruned. - pub fn prune_table_with_iterator( - &self, - keys: impl IntoIterator, - limiter: &mut PruneLimiter, - mut delete_callback: impl FnMut(TableRow), - ) -> Result<(usize, bool), DatabaseError> { - let mut cursor = self.tx.cursor_write::()?; - let mut keys = keys.into_iter(); - - let mut deleted_entries = 0; - - for key in &mut keys { - if limiter.is_limit_reached() { - debug!( - target: "providers::db", - ?limiter, - deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(), - time_limit = %limiter.is_time_limit_reached(), - table = %T::NAME, - "Pruning limit reached" - ); - break - } - - let row = cursor.seek_exact(key)?; - if let Some(row) = row { - cursor.delete_current()?; - limiter.increment_deleted_entries_count(); - deleted_entries += 1; - delete_callback(row); - } - } - - let done = keys.next().is_none(); - Ok((deleted_entries, done)) - } - - /// Prune the table for the specified key range. - /// - /// Returns number of rows pruned. - pub fn prune_table_with_range( - &self, - keys: impl RangeBounds + Clone + Debug, - limiter: &mut PruneLimiter, - mut skip_filter: impl FnMut(&TableRow) -> bool, - mut delete_callback: impl FnMut(TableRow), - ) -> Result<(usize, bool), DatabaseError> { - let mut cursor = self.tx.cursor_write::()?; - let mut walker = cursor.walk_range(keys)?; - - let mut deleted_entries = 0; - - let done = loop { - // check for time out must be done in this scope since it's not done in - // `prune_table_with_range_step` - if limiter.is_limit_reached() { - debug!( - target: "providers::db", - ?limiter, - deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(), - time_limit = %limiter.is_time_limit_reached(), - table = %T::NAME, - "Pruning limit reached" - ); - break false - } - - let done = self.prune_table_with_range_step( - &mut walker, - limiter, - &mut skip_filter, - &mut delete_callback, - )?; - - if done { - break true - } - deleted_entries += 1; - }; - - Ok((deleted_entries, done)) - } - - /// Steps once with the given walker and prunes the entry in the table. - /// - /// Returns `true` if the walker is finished, `false` if it may have more data to prune. - /// - /// CAUTION: Pruner limits are not checked. This allows for a clean exit of a prune run that's - /// pruning different tables concurrently, by letting them step to the same height before - /// timing out. - pub fn prune_table_with_range_step( - &self, - walker: &mut RangeWalker<'_, T, ::CursorMut>, - limiter: &mut PruneLimiter, - skip_filter: &mut impl FnMut(&TableRow) -> bool, - delete_callback: &mut impl FnMut(TableRow), - ) -> Result { - let Some(res) = walker.next() else { return Ok(true) }; - - let row = res?; - - if !skip_filter(&row) { - walker.delete_current()?; - limiter.increment_deleted_entries_count(); - delete_callback(row); - } - - Ok(false) - } - /// Load shard and remove it. If list is empty, last shard was full or /// there are no shards at all. fn take_shard(&self, key: T::Key) -> ProviderResult> @@ -3690,7 +3577,7 @@ impl FinalizedBlockWriter for DatabaseProvider { } } -impl DBProvider for DatabaseProvider { +impl DBProvider for DatabaseProvider { type Tx = TX; fn tx_ref(&self) -> &Self::Tx { @@ -3700,6 +3587,10 @@ impl DBProvider for DatabaseProvider { fn tx_mut(&mut self) -> &mut Self::Tx { &mut self.tx } + + fn into_tx(self) -> Self::Tx { + self.tx + } } /// Helper method to recover senders for any blocks in the db which do not have senders. This diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 652d275f3..6ccdcfea2 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -14,6 +14,7 @@ use reth_blockchain_tree_api::{ }; use reth_chain_state::{ChainInfoTracker, ForkChoiceNotifications, ForkChoiceSubscriptions}; use reth_chainspec::{ChainInfo, ChainSpec}; +use reth_db::Database; use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices}; use reth_evm::ConfigureEvmEnv; use reth_node_types::NodeTypesWithDB; @@ -171,11 +172,16 @@ where impl DatabaseProviderFactory for BlockchainProvider { type DB = N::DB; - type Provider = DatabaseProviderRO; + type Provider = DatabaseProvider<::TX>; + type ProviderRW = DatabaseProvider<::TXMut>; fn database_provider_ro(&self) -> ProviderResult { self.database.provider() } + + fn database_provider_rw(&self) -> ProviderResult { + self.database.provider_rw().map(|p| p.0) + } } impl StaticFileProviderFactory for BlockchainProvider { diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index cf3e358b5..2ee1fa8f2 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -146,10 +146,15 @@ impl MockEthProvider { impl DatabaseProviderFactory for MockEthProvider { type DB = DatabaseMock; type Provider = DatabaseProvider; + type ProviderRW = DatabaseProvider; fn database_provider_ro(&self) -> ProviderResult { Err(ConsistentViewError::Syncing { best_block: GotExpected::new(0, 0) }.into()) } + + fn database_provider_rw(&self) -> ProviderResult { + Err(ConsistentViewError::Syncing { best_block: GotExpected::new(0, 0) }.into()) + } } impl HeaderProvider for MockEthProvider { diff --git a/crates/storage/storage-api/src/database_provider.rs b/crates/storage/storage-api/src/database_provider.rs index fd15411d7..b3e233b0f 100644 --- a/crates/storage/storage-api/src/database_provider.rs +++ b/crates/storage/storage-api/src/database_provider.rs @@ -2,7 +2,7 @@ use reth_db_api::{database::Database, transaction::DbTx}; use reth_storage_errors::provider::ProviderResult; /// Database provider. -pub trait DBProvider: Send + Sync { +pub trait DBProvider: Send + Sync + Sized + 'static { /// Underlying database transaction held by the provider. type Tx: DbTx; @@ -12,18 +12,28 @@ pub trait DBProvider: Send + Sync { /// Returns a mutable reference to the underlying transaction. fn tx_mut(&mut self) -> &mut Self::Tx; + /// Consumes the provider and returns the underlying transaction. + fn into_tx(self) -> Self::Tx; + /// Disables long-lived read transaction safety guarantees for leaks prevention and /// observability improvements. /// /// CAUTION: In most of the cases, you want the safety guarantees for long read transactions /// enabled. Use this only if you're sure that no write transaction is open in parallel, meaning /// that Reth as a node is offline and not progressing. - fn disable_long_read_transaction_safety(&mut self) { + fn disable_long_read_transaction_safety(mut self) -> Self { self.tx_mut().disable_long_read_transaction_safety(); + self + } + + /// Commit database transaction + fn commit(self) -> ProviderResult { + Ok(self.into_tx().commit()?) } } /// Database provider factory. +#[auto_impl::auto_impl(&, Arc)] pub trait DatabaseProviderFactory: Send + Sync { /// Database this factory produces providers for. type DB: Database; @@ -31,6 +41,12 @@ pub trait DatabaseProviderFactory: Send + Sync { /// Provider type returned by the factory. type Provider: DBProvider::TX>; + /// Read-write provider type returned by the factory. + type ProviderRW: DBProvider::TXMut>; + /// Create new read-only database provider. fn database_provider_ro(&self) -> ProviderResult; + + /// Create new read-write database provider. + fn database_provider_rw(&self) -> ProviderResult; }