refactor: make reth-prune independent of concrete DatabaseProvider (#10921)

This commit is contained in:
Arsenii Kulikov
2024-09-16 14:46:53 +03:00
committed by GitHub
parent 664f8b23be
commit 0fa8e83e16
31 changed files with 458 additions and 393 deletions

Cargo.lock generated
View File

@ -8211,7 +8211,6 @@ dependencies = [
"reth-errors",
"reth-exex-types",
"reth-metrics",
"reth-node-types",
"reth-provider",
"reth-prune-types",
"reth-stages",

View File

@ -8,8 +8,7 @@ use alloy_primitives::BlockNumber;
use futures::FutureExt;
use metrics::Counter;
use reth_errors::{RethError, RethResult};
use reth_node_types::NodeTypesWithDB;
use reth_provider::{providers::ProviderNodeTypes, ProviderFactory};
use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter};
use reth_prune::{Pruner, PrunerError, PrunerWithResult};
use reth_tasks::TaskSpawner;
use std::{
@ -21,15 +20,18 @@ use tokio::sync::oneshot;
/// Manages pruning under the control of the engine.
///
/// This type controls the [Pruner].
pub struct PruneHook<N: NodeTypesWithDB> {
pub struct PruneHook<PF: DatabaseProviderFactory> {
/// The current state of the pruner.
pruner_state: PrunerState<N>,
pruner_state: PrunerState<PF>,
/// The type that can spawn the pruner task.
pruner_task_spawner: Box<dyn TaskSpawner>,
metrics: Metrics,
}
impl<N: NodeTypesWithDB> fmt::Debug for PruneHook<N> {
impl<PF> fmt::Debug for PruneHook<PF>
where
PF: DatabaseProviderFactory<ProviderRW: fmt::Debug> + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PruneHook")
.field("pruner_state", &self.pruner_state)
@ -38,10 +40,10 @@ impl<N: NodeTypesWithDB> fmt::Debug for PruneHook<N> {
}
}
impl<N: ProviderNodeTypes> PruneHook<N> {
impl<PF: DatabaseProviderFactory> PruneHook<PF> {
/// Create a new instance
pub fn new(
pruner: Pruner<N::DB, ProviderFactory<N>>,
pruner: Pruner<PF::ProviderRW, PF>,
pruner_task_spawner: Box<dyn TaskSpawner>,
) -> Self {
Self {
@ -79,7 +81,13 @@ impl<N: ProviderNodeTypes> PruneHook<N> {
Poll::Ready(Ok(event))
}
}
impl<PF> PruneHook<PF>
where
PF: DatabaseProviderFactory<ProviderRW: PruneCheckpointReader + PruneCheckpointWriter>
+ 'static,
{
/// This will try to spawn the pruner if it is idle:
/// 1. Check if pruning is needed through [`Pruner::is_pruning_needed`].
///
@ -117,7 +125,11 @@ impl<N: ProviderNodeTypes> PruneHook<N> {
}
}
impl<N: ProviderNodeTypes> EngineHook for PruneHook<N> {
impl<PF> EngineHook for PruneHook<PF>
where
PF: DatabaseProviderFactory<ProviderRW: PruneCheckpointReader + PruneCheckpointWriter>
+ 'static,
{
fn name(&self) -> &'static str {
"Prune"
}
@ -152,16 +164,16 @@ impl<N: ProviderNodeTypes> EngineHook for PruneHook<N> {
/// running, it acquires the write lock over the database. This means that we cannot forward to the
/// blockchain tree any messages that would result in database writes, since it would result in a
/// deadlock.
enum PrunerState<N: NodeTypesWithDB> {
enum PrunerState<PF: DatabaseProviderFactory> {
/// Pruner is idle.
Idle(Option<Pruner<N::DB, ProviderFactory<N>>>),
Idle(Option<Pruner<PF::ProviderRW, PF>>),
/// Pruner is running and waiting for a response
Running(oneshot::Receiver<PrunerWithResult<N::DB, ProviderFactory<N>>>),
Running(oneshot::Receiver<PrunerWithResult<PF::ProviderRW, PF>>),
}
impl<N> fmt::Debug for PrunerState<N>
impl<PF> fmt::Debug for PrunerState<PF>
where
N: NodeTypesWithDB<DB: Debug, ChainSpec: Debug>,
PF: DatabaseProviderFactory<ProviderRW: Debug> + Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
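
A minimal construction sketch for the generic hook, assuming a `provider_factory: ProviderFactory<N>` (any `DatabaseProviderFactory` works), a `finished_exex_height_rx` watch receiver, and the `TokioTaskExecutor` from `reth_tasks`:

use reth_prune::Pruner;
use reth_tasks::TokioTaskExecutor;

// `new_with_factory` accepts any `DatabaseProviderFactory`, so the hook is no
// longer tied to a concrete `ProviderFactory<N>`.
let pruner = Pruner::new_with_factory(provider_factory.clone(), vec![], 5, 0, None, finished_exex_height_rx);
let prune_hook = PruneHook::new(pruner, Box::new(TokioTaskExecutor::default()));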

View File

@ -24,7 +24,7 @@ use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_provider::{
providers::BlockchainProvider,
test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB},
ExecutionOutcome, ProviderFactory,
ExecutionOutcome,
};
use reth_prune::Pruner;
use reth_prune_types::PruneModes;
@ -397,7 +397,7 @@ where
let blockchain_provider =
BlockchainProvider::with_blocks(provider_factory.clone(), tree, genesis_block, None);
let pruner = Pruner::<_, ProviderFactory<_>>::new(
let pruner = Pruner::new_with_factory(
provider_factory.clone(),
vec![],
5,

View File

@ -19,7 +19,7 @@ use reth_node_types::NodeTypesWithEngine;
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_validator::ExecutionPayloadValidator;
use reth_provider::{providers::BlockchainProvider2, ProviderFactory};
use reth_prune::Pruner;
use reth_prune::PrunerWithFactory;
use reth_stages_api::{MetricEventsSender, Pipeline};
use reth_tasks::TaskSpawner;
use std::{
@ -73,7 +73,7 @@ where
pipeline_task_spawner: Box<dyn TaskSpawner>,
provider: ProviderFactory<N>,
blockchain_db: BlockchainProvider2<N>,
pruner: Pruner<N::DB, ProviderFactory<N>>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
payload_builder: PayloadBuilderHandle<N::Engine>,
tree_config: TreeConfig,
invalid_block_hook: Box<dyn InvalidBlockHook>,
@ -147,6 +147,7 @@ mod tests {
use reth_network_p2p::test_utils::TestFullBlockClient;
use reth_primitives::SealedHeader;
use reth_provider::test_utils::create_test_provider_factory_with_chain_spec;
use reth_prune::Pruner;
use reth_tasks::TokioTaskExecutor;
use std::sync::Arc;
use tokio::sync::{mpsc::unbounded_channel, watch};
@ -178,8 +179,7 @@ mod tests {
.unwrap();
let (_tx, rx) = watch::channel(FinishedExExHeight::NoExExs);
let pruner =
Pruner::<_, ProviderFactory<_>>::new(provider_factory.clone(), vec![], 0, 0, None, rx);
let pruner = Pruner::new_with_factory(provider_factory.clone(), vec![], 0, 0, None, rx);
let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
let (tx, _rx) = unbounded_channel();

View File

@ -1,13 +1,12 @@
use crate::metrics::PersistenceMetrics;
use reth_chain_state::ExecutedBlock;
use reth_errors::ProviderError;
use reth_node_types::NodeTypesWithDB;
use reth_primitives::BlockNumHash;
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader, ProviderFactory,
StaticFileProviderFactory,
};
use reth_prune::{Pruner, PrunerError, PrunerOutput};
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
use std::{
sync::mpsc::{Receiver, SendError, Sender},
@ -25,13 +24,13 @@ use tracing::{debug, error};
/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs
/// blocking I/O operations in an endless loop.
#[derive(Debug)]
pub struct PersistenceService<N: NodeTypesWithDB> {
pub struct PersistenceService<N: ProviderNodeTypes> {
/// The provider factory to use
provider: ProviderFactory<N>,
/// Incoming requests
incoming: Receiver<PersistenceAction>,
/// The pruner
pruner: Pruner<N::DB, ProviderFactory<N>>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
/// metrics
metrics: PersistenceMetrics,
/// Sender for sync metrics - we only submit sync metrics for persisted blocks
@ -43,7 +42,7 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
pub fn new(
provider: ProviderFactory<N>,
incoming: Receiver<PersistenceAction>,
pruner: Pruner<N::DB, ProviderFactory<N>>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
sync_metrics_tx: MetricEventsSender,
) -> Self {
Self { provider, incoming, pruner, metrics: PersistenceMetrics::default(), sync_metrics_tx }
@ -187,7 +186,7 @@ impl PersistenceHandle {
/// Create a new [`PersistenceHandle`], and spawn the persistence service.
pub fn spawn_service<N: ProviderNodeTypes>(
provider_factory: ProviderFactory<N>,
pruner: Pruner<N::DB, ProviderFactory<N>>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
sync_metrics_tx: MetricEventsSender,
) -> Self {
// create the initial channels
@ -268,7 +267,7 @@ mod tests {
use reth_chain_state::test_utils::TestBlockBuilder;
use reth_exex_types::FinishedExExHeight;
use reth_primitives::B256;
use reth_provider::{test_utils::create_test_provider_factory, ProviderFactory};
use reth_provider::test_utils::create_test_provider_factory;
use reth_prune::Pruner;
use tokio::sync::mpsc::unbounded_channel;
@ -278,14 +277,8 @@ mod tests {
let (_finished_exex_height_tx, finished_exex_height_rx) =
tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
let pruner = Pruner::<_, ProviderFactory<_>>::new(
provider.clone(),
vec![],
5,
0,
None,
finished_exex_height_rx,
);
let pruner =
Pruner::new_with_factory(provider.clone(), vec![], 5, 0, None, finished_exex_height_rx);
let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
PersistenceHandle::spawn_service(provider, pruner, sync_metrics_tx)

View File

@ -23,7 +23,6 @@ reth-tokio-util.workspace = true
reth-config.workspace = true
reth-prune-types.workspace = true
reth-static-file-types.workspace = true
reth-node-types.workspace = true
# metrics
reth-metrics.workspace = true

View File

@ -1,10 +1,12 @@
use crate::{segments::SegmentSet, Pruner};
use reth_chainspec::MAINNET;
use reth_config::PruneConfig;
use reth_db_api::database::Database;
use reth_db::transaction::DbTxMut;
use reth_exex_types::FinishedExExHeight;
use reth_node_types::NodeTypesWithDB;
use reth_provider::{providers::StaticFileProvider, ProviderFactory, StaticFileProviderFactory};
use reth_provider::{
providers::StaticFileProvider, BlockReader, DBProvider, DatabaseProviderFactory,
PruneCheckpointWriter, StaticFileProviderFactory, TransactionsProvider,
};
use reth_prune_types::PruneModes;
use std::time::Duration;
use tokio::sync::watch;
@ -72,14 +74,15 @@ impl PrunerBuilder {
}
/// Builds a [Pruner] from the current configuration with the given provider factory.
pub fn build_with_provider_factory<N: NodeTypesWithDB>(
self,
provider_factory: ProviderFactory<N>,
) -> Pruner<N::DB, ProviderFactory<N>> {
pub fn build_with_provider_factory<PF>(self, provider_factory: PF) -> Pruner<PF::ProviderRW, PF>
where
PF: DatabaseProviderFactory<ProviderRW: PruneCheckpointWriter + BlockReader>
+ StaticFileProviderFactory,
{
let segments =
SegmentSet::from_components(provider_factory.static_file_provider(), self.segments);
Pruner::<_, ProviderFactory<N>>::new(
Pruner::new_with_factory(
provider_factory,
segments.into_vec(),
self.block_interval,
@ -90,10 +93,14 @@ impl PrunerBuilder {
}
/// Builds a [Pruner] from the current configuration with the given static file provider.
pub fn build<DB: Database>(self, static_file_provider: StaticFileProvider) -> Pruner<DB, ()> {
let segments = SegmentSet::<DB>::from_components(static_file_provider, self.segments);
pub fn build<Provider>(self, static_file_provider: StaticFileProvider) -> Pruner<Provider, ()>
where
Provider:
DBProvider<Tx: DbTxMut> + BlockReader + PruneCheckpointWriter + TransactionsProvider,
{
let segments = SegmentSet::<Provider>::from_components(static_file_provider, self.segments);
Pruner::<_, ()>::new(
Pruner::new(
segments.into_vec(),
self.block_interval,
self.delete_limit,
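
A sketch of the two build paths, assuming already-configured `PrunerBuilder` values (`builder_a`, `builder_b`; each `build*` call consumes its builder), a `provider_factory: ProviderFactory<N>`, and a `tip_block_number`:

// Factory-backed: the resulting `Pruner<PF::ProviderRW, PF>` opens and commits
// its own read-write providers when `run` is called.
let mut pruner = builder_a.build_with_provider_factory(provider_factory.clone());
pruner.run(tip_block_number)?;

// Factory-less: the resulting `Pruner<Provider, ()>` is driven with an externally
// supplied provider via `run_with_provider` (as the prune stage does below).
let mut standalone = builder_b.build(provider_factory.static_file_provider());
let provider = provider_factory.database_provider_rw()?;
standalone.run_with_provider(&provider, tip_block_number)?;
provider.commit()?;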

View File

@ -0,0 +1,127 @@
use std::{fmt::Debug, ops::RangeBounds};
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, RangeWalker},
table::{Table, TableRow},
transaction::DbTxMut,
DatabaseError,
};
use reth_prune_types::PruneLimiter;
use tracing::debug;
pub(crate) trait DbTxPruneExt: DbTxMut {
/// Prune the table for the specified pre-sorted key iterator.
///
/// Returns number of rows pruned.
fn prune_table_with_iterator<T: Table>(
&self,
keys: impl IntoIterator<Item = T::Key>,
limiter: &mut PruneLimiter,
mut delete_callback: impl FnMut(TableRow<T>),
) -> Result<(usize, bool), DatabaseError> {
let mut cursor = self.cursor_write::<T>()?;
let mut keys = keys.into_iter();
let mut deleted_entries = 0;
for key in &mut keys {
if limiter.is_limit_reached() {
debug!(
target: "providers::db",
?limiter,
deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(),
time_limit = %limiter.is_time_limit_reached(),
table = %T::NAME,
"Pruning limit reached"
);
break
}
let row = cursor.seek_exact(key)?;
if let Some(row) = row {
cursor.delete_current()?;
limiter.increment_deleted_entries_count();
deleted_entries += 1;
delete_callback(row);
}
}
let done = keys.next().is_none();
Ok((deleted_entries, done))
}
/// Prune the table for the specified key range.
///
/// Returns number of rows pruned.
fn prune_table_with_range<T: Table>(
&self,
keys: impl RangeBounds<T::Key> + Clone + Debug,
limiter: &mut PruneLimiter,
mut skip_filter: impl FnMut(&TableRow<T>) -> bool,
mut delete_callback: impl FnMut(TableRow<T>),
) -> Result<(usize, bool), DatabaseError> {
let mut cursor = self.cursor_write::<T>()?;
let mut walker = cursor.walk_range(keys)?;
let mut deleted_entries = 0;
let done = loop {
// check for time out must be done in this scope since it's not done in
// `prune_table_with_range_step`
if limiter.is_limit_reached() {
debug!(
target: "providers::db",
?limiter,
deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(),
time_limit = %limiter.is_time_limit_reached(),
table = %T::NAME,
"Pruning limit reached"
);
break false
}
let done = self.prune_table_with_range_step(
&mut walker,
limiter,
&mut skip_filter,
&mut delete_callback,
)?;
if done {
break true
}
deleted_entries += 1;
};
Ok((deleted_entries, done))
}
/// Steps once with the given walker and prunes the entry in the table.
///
/// Returns `true` if the walker is finished, `false` if it may have more data to prune.
///
/// CAUTION: Pruner limits are not checked. This allows for a clean exit of a prune run that's
/// pruning different tables concurrently, by letting them step to the same height before
/// timing out.
fn prune_table_with_range_step<T: Table>(
&self,
walker: &mut RangeWalker<'_, T, Self::CursorMut<T>>,
limiter: &mut PruneLimiter,
skip_filter: &mut impl FnMut(&TableRow<T>) -> bool,
delete_callback: &mut impl FnMut(TableRow<T>),
) -> Result<bool, DatabaseError> {
let Some(res) = walker.next() else { return Ok(true) };
let row = res?;
if !skip_filter(&row) {
walker.delete_current()?;
limiter.increment_deleted_entries_count();
delete_callback(row);
}
Ok(false)
}
}
impl<Tx> DbTxPruneExt for Tx where Tx: DbTxMut {}
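
An in-crate usage sketch mirroring the segment call sites later in this diff, assuming a `provider` implementing `DBProvider<Tx: DbTxMut>`, a `tx_range`, a `limiter: PruneLimiter`, and a mutable `last_pruned_transaction`:

use crate::db_ext::DbTxPruneExt;
use reth_db::tables;

// The extension trait is implemented for every `DbTxMut`, so segments reach it
// through `provider.tx_ref()` rather than through the provider itself.
let (pruned, done) = provider.tx_ref().prune_table_with_range::<tables::Receipts>(
    tx_range,
    &mut limiter,
    |_| false,                             // no rows are skipped
    |row| last_pruned_transaction = row.0, // record the last pruned tx number
)?;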

View File

@ -10,6 +10,7 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
mod builder;
mod db_ext;
mod error;
mod event;
mod metrics;
@ -20,7 +21,7 @@ use crate::metrics::Metrics;
pub use builder::PrunerBuilder;
pub use error::PrunerError;
pub use event::PrunerEvent;
pub use pruner::{Pruner, PrunerResult, PrunerWithResult};
pub use pruner::{Pruner, PrunerResult, PrunerWithFactory, PrunerWithResult};
// Re-export prune types
#[doc(inline)]

View File

@ -5,11 +5,9 @@ use crate::{
Metrics, PrunerError, PrunerEvent,
};
use alloy_primitives::BlockNumber;
use reth_db_api::database::Database;
use reth_exex_types::FinishedExExHeight;
use reth_node_types::NodeTypesWithDB;
use reth_provider::{
providers::ProviderNodeTypes, DatabaseProviderRW, ProviderFactory, PruneCheckpointReader,
DBProvider, DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
};
use reth_prune_types::{PruneLimiter, PruneProgress, PruneSegment, PrunerOutput};
use reth_tokio_util::{EventSender, EventStream};
@ -25,12 +23,15 @@ pub type PrunerWithResult<S, DB> = (Pruner<S, DB>, PrunerResult);
type PrunerStats = Vec<(PruneSegment, usize, PruneProgress)>;
/// Pruner with preset provider factory.
pub type PrunerWithFactory<PF> = Pruner<<PF as DatabaseProviderFactory>::ProviderRW, PF>;
/// Pruning routine. Main pruning logic happens in [`Pruner::run`].
#[derive(Debug)]
pub struct Pruner<DB, PF> {
pub struct Pruner<Provider, PF> {
/// Provider factory. If pruner is initialized without it, it will be set to `()`.
provider_factory: PF,
segments: Vec<Box<dyn Segment<DB>>>,
segments: Vec<Box<dyn Segment<Provider>>>,
/// Minimum pruning interval measured in blocks. All prune segments are checked and, if needed,
/// pruned, when the chain advances by the specified number of blocks.
min_block_interval: usize,
@ -49,10 +50,10 @@ pub struct Pruner<DB, PF> {
event_sender: EventSender<PrunerEvent>,
}
impl<DB> Pruner<DB, ()> {
impl<Provider> Pruner<Provider, ()> {
/// Creates a new [Pruner] without a provider factory.
pub fn new(
segments: Vec<Box<dyn Segment<DB>>>,
segments: Vec<Box<dyn Segment<Provider>>>,
min_block_interval: usize,
delete_limit: usize,
timeout: Option<Duration>,
@ -72,11 +73,14 @@ impl<DB> Pruner<DB, ()> {
}
}
impl<N: NodeTypesWithDB> Pruner<N::DB, ProviderFactory<N>> {
impl<PF> Pruner<PF::ProviderRW, PF>
where
PF: DatabaseProviderFactory,
{
/// Creates a new pruner with the given provider factory.
pub fn new(
provider_factory: ProviderFactory<N>,
segments: Vec<Box<dyn Segment<N::DB>>>,
pub fn new_with_factory(
provider_factory: PF,
segments: Vec<Box<dyn Segment<PF::ProviderRW>>>,
min_block_interval: usize,
delete_limit: usize,
timeout: Option<Duration>,
@ -96,15 +100,23 @@ impl<N: NodeTypesWithDB> Pruner<N::DB, ProviderFactory<N>> {
}
}
impl<DB: Database, S> Pruner<DB, S> {
impl<Provider, S> Pruner<Provider, S>
where
Provider: PruneCheckpointReader + PruneCheckpointWriter,
{
/// Listen for events on the pruner.
pub fn events(&self) -> EventStream<PrunerEvent> {
self.event_sender.new_listener()
}
fn run_with_provider(
/// Run the pruner with the given provider. This will only prune data up to the highest finished
/// `ExEx` height, or up to the given tip if there are no `ExExes`.
///
/// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data
/// to prune.
pub fn run_with_provider(
&mut self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
tip_block_number: BlockNumber,
) -> PrunerResult {
let Some(tip_block_number) =
@ -165,7 +177,7 @@ impl<DB: Database, S> Pruner<DB, S> {
/// Returns [`PrunerStats`], total number of entries pruned, and [`PruneProgress`].
fn prune_segments(
&mut self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
tip_block_number: BlockNumber,
limiter: &mut PruneLimiter,
) -> Result<(PrunerStats, usize, PrunerOutput), PrunerError> {
@ -299,23 +311,10 @@ impl<DB: Database, S> Pruner<DB, S> {
}
}
impl<DB: Database> Pruner<DB, ()> {
/// Run the pruner with the given provider. This will only prune data up to the highest finished
/// ExEx height, if there are no ExExes.
///
/// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data
/// to prune.
#[allow(clippy::doc_markdown)]
pub fn run(
&mut self,
provider: &DatabaseProviderRW<DB>,
tip_block_number: BlockNumber,
) -> PrunerResult {
self.run_with_provider(provider, tip_block_number)
}
}
impl<N: ProviderNodeTypes> Pruner<N::DB, ProviderFactory<N>> {
impl<PF> Pruner<PF::ProviderRW, PF>
where
PF: DatabaseProviderFactory<ProviderRW: PruneCheckpointWriter + PruneCheckpointReader>,
{
/// Run the pruner. This will only prune data up to the highest finished ExEx height, or up to
/// the given tip if there are no ExExes.
///
@ -323,7 +322,7 @@ impl<N: ProviderNodeTypes> Pruner<N::DB, ProviderFactory<N>> {
/// to prune.
#[allow(clippy::doc_markdown)]
pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
let provider = self.provider_factory.provider_rw()?;
let provider = self.provider_factory.database_provider_rw()?;
let result = self.run_with_provider(&provider, tip_block_number);
provider.commit()?;
result
@ -334,7 +333,7 @@ impl<N: ProviderNodeTypes> Pruner<N::DB, ProviderFactory<N>> {
mod tests {
use crate::Pruner;
use reth_exex_types::FinishedExExHeight;
use reth_provider::{test_utils::create_test_provider_factory, ProviderFactory};
use reth_provider::test_utils::create_test_provider_factory;
#[test]
fn is_pruning_needed() {
@ -343,14 +342,8 @@ mod tests {
let (finished_exex_height_tx, finished_exex_height_rx) =
tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
let mut pruner = Pruner::<_, ProviderFactory<_>>::new(
provider_factory,
vec![],
5,
0,
None,
finished_exex_height_rx,
);
let mut pruner =
Pruner::new_with_factory(provider_factory, vec![], 5, 0, None, finished_exex_height_rx);
// No last pruned block number was set before
let first_block_number = 1;
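
A sketch of what the new bounds permit: driving a pruner through any factory, with the same `ProviderRW` constraints as the `run` impl above (the helper name is illustrative):

use alloy_primitives::BlockNumber;
use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter};
use reth_prune::{PrunerError, PrunerWithFactory};

fn prune_to<PF>(pruner: &mut PrunerWithFactory<PF>, tip: BlockNumber) -> Result<(), PrunerError>
where
    PF: DatabaseProviderFactory<ProviderRW: PruneCheckpointWriter + PruneCheckpointReader>,
{
    // `PrunerWithFactory<PF>` is just `Pruner<PF::ProviderRW, PF>`; `run` opens a
    // read-write provider from the factory, prunes, and commits it.
    pruner.run(tip)?;
    Ok(())
}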

View File

@ -5,10 +5,7 @@ mod user;
use crate::PrunerError;
use alloy_primitives::{BlockNumber, TxNumber};
use reth_db_api::database::Database;
use reth_provider::{
errors::provider::ProviderResult, BlockReader, DatabaseProviderRW, PruneCheckpointWriter,
};
use reth_provider::{errors::provider::ProviderResult, BlockReader, PruneCheckpointWriter};
use reth_prune_types::{
PruneCheckpoint, PruneLimiter, PruneMode, PrunePurpose, PruneSegment, SegmentOutput,
};
@ -31,7 +28,7 @@ pub use user::{
/// 2. If [`Segment::prune`] returned a [Some] in `checkpoint` of [`SegmentOutput`], call
/// [`Segment::save_checkpoint`].
/// 3. Subtract `pruned` of [`SegmentOutput`] from `delete_limit` of next [`PruneInput`].
pub trait Segment<DB: Database>: Debug + Send + Sync {
pub trait Segment<Provider>: Debug + Send + Sync {
/// Segment of data that's pruned.
fn segment(&self) -> PruneSegment;
@ -42,18 +39,17 @@ pub trait Segment<DB: Database>: Debug + Send + Sync {
fn purpose(&self) -> PrunePurpose;
/// Prune data for [`Self::segment`] using the provided input.
fn prune(
&self,
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<SegmentOutput, PrunerError>;
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError>;
/// Save checkpoint for [`Self::segment`] to the database.
fn save_checkpoint(
&self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
checkpoint: PruneCheckpoint,
) -> ProviderResult<()> {
) -> ProviderResult<()>
where
Provider: PruneCheckpointWriter,
{
provider.save_prune_checkpoint(self.segment(), checkpoint)
}
}
@ -78,9 +74,9 @@ impl PruneInput {
/// 2. If checkpoint doesn't exist, return 0.
///
/// To get the range end: get last tx number for `to_block`.
pub(crate) fn get_next_tx_num_range<DB: Database>(
pub(crate) fn get_next_tx_num_range<Provider: BlockReader>(
&self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
) -> ProviderResult<Option<RangeInclusive<TxNumber>>> {
let from_tx_number = self.previous_checkpoint
// Checkpoint exists, prune from the next transaction after the highest pruned one
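
A sketch of the calling protocol in the numbered list above, generic over the provider (checkpoint conversion and delete-limit bookkeeping are elided; `Pruner::prune_segments` is the real driver):

fn run_segment<Provider>(
    segment: &dyn Segment<Provider>,
    provider: &Provider,
    input: PruneInput,
) -> Result<SegmentOutput, PrunerError> {
    // 1. Call `prune` with the provider and the input.
    let output = segment.prune(provider, input)?;
    // 2. If `output.checkpoint` is `Some`, the caller converts it and calls
    //    `segment.save_checkpoint(provider, ..)`, which needs `Provider: PruneCheckpointWriter`.
    // 3. The caller subtracts `output.pruned` from the next `PruneInput`'s delete limit.
    Ok(output)
}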

View File

@ -5,11 +5,10 @@
//! - [`crate::segments::static_file::Receipts`] is responsible for pruning receipts on an archive
//! node after static file producer has finished
use crate::{segments::PruneInput, PrunerError};
use reth_db::tables;
use reth_db_api::database::Database;
use crate::{db_ext::DbTxPruneExt, segments::PruneInput, PrunerError};
use reth_db::{tables, transaction::DbTxMut};
use reth_provider::{
errors::provider::ProviderResult, DatabaseProviderRW, PruneCheckpointWriter,
errors::provider::ProviderResult, BlockReader, DBProvider, PruneCheckpointWriter,
TransactionsProvider,
};
use reth_prune_types::{
@ -17,10 +16,13 @@ use reth_prune_types::{
};
use tracing::trace;
pub(crate) fn prune<DB: Database>(
provider: &DatabaseProviderRW<DB>,
pub(crate) fn prune<Provider>(
provider: &Provider,
input: PruneInput,
) -> Result<SegmentOutput, PrunerError> {
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider<Tx: DbTxMut> + TransactionsProvider + BlockReader,
{
let tx_range = match input.get_next_tx_num_range(provider)? {
Some(range) => range,
None => {
@ -33,7 +35,7 @@ pub(crate) fn prune<DB: Database>(
let mut limiter = input.limiter;
let mut last_pruned_transaction = tx_range_end;
let (pruned, done) = provider.prune_table_with_range::<tables::Receipts>(
let (pruned, done) = provider.tx_ref().prune_table_with_range::<tables::Receipts>(
tx_range,
&mut limiter,
|_| false,
@ -60,8 +62,8 @@ pub(crate) fn prune<DB: Database>(
})
}
pub(crate) fn save_checkpoint<DB: Database>(
provider: &DatabaseProviderRW<DB>,
pub(crate) fn save_checkpoint(
provider: impl PruneCheckpointWriter,
checkpoint: PruneCheckpoint,
) -> ProviderResult<()> {
provider.save_prune_checkpoint(PruneSegment::Receipts, checkpoint)?;
@ -83,7 +85,7 @@ mod tests {
Itertools,
};
use reth_db::tables;
use reth_provider::PruneCheckpointReader;
use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, PruneSegment,
};
@ -158,7 +160,7 @@ mod tests {
)
.sub(1);
let provider = db.factory.provider_rw().unwrap();
let provider = db.factory.database_provider_rw().unwrap();
let result = super::prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);

View File

@ -2,32 +2,35 @@ use crate::segments::{
AccountHistory, ReceiptsByLogs, Segment, SenderRecovery, StorageHistory, TransactionLookup,
UserReceipts,
};
use reth_db_api::database::Database;
use reth_provider::providers::StaticFileProvider;
use reth_db::transaction::DbTxMut;
use reth_provider::{
providers::StaticFileProvider, BlockReader, DBProvider, PruneCheckpointWriter,
TransactionsProvider,
};
use reth_prune_types::PruneModes;
use super::{StaticFileHeaders, StaticFileReceipts, StaticFileTransactions};
/// Collection of [Segment]. Thread-safe, allocated on the heap.
#[derive(Debug)]
pub struct SegmentSet<DB: Database> {
inner: Vec<Box<dyn Segment<DB>>>,
pub struct SegmentSet<Provider> {
inner: Vec<Box<dyn Segment<Provider>>>,
}
impl<DB: Database> SegmentSet<DB> {
impl<Provider> SegmentSet<Provider> {
/// Returns empty [`SegmentSet`] collection.
pub fn new() -> Self {
Self::default()
}
/// Adds new [Segment] to collection.
pub fn segment<S: Segment<DB> + 'static>(mut self, segment: S) -> Self {
pub fn segment<S: Segment<Provider> + 'static>(mut self, segment: S) -> Self {
self.inner.push(Box::new(segment));
self
}
/// Adds new [Segment] to collection if it's [Some].
pub fn segment_opt<S: Segment<DB> + 'static>(self, segment: Option<S>) -> Self {
pub fn segment_opt<S: Segment<Provider> + 'static>(self, segment: Option<S>) -> Self {
if let Some(segment) = segment {
return self.segment(segment)
}
@ -35,10 +38,15 @@ impl<DB: Database> SegmentSet<DB> {
}
/// Consumes [`SegmentSet`] and returns a [Vec].
pub fn into_vec(self) -> Vec<Box<dyn Segment<DB>>> {
pub fn into_vec(self) -> Vec<Box<dyn Segment<Provider>>> {
self.inner
}
}
impl<Provider> SegmentSet<Provider>
where
Provider: DBProvider<Tx: DbTxMut> + TransactionsProvider + PruneCheckpointWriter + BlockReader,
{
/// Creates a [`SegmentSet`] from an existing components, such as [`StaticFileProvider`] and
/// [`PruneModes`].
pub fn from_components(
@ -79,7 +87,7 @@ impl<DB: Database> SegmentSet<DB> {
}
}
impl<DB: Database> Default for SegmentSet<DB> {
impl<Provider> Default for SegmentSet<Provider> {
fn default() -> Self {
Self { inner: Vec::new() }
}
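
An in-crate assembly sketch under the new generics (the 128-block distance is an arbitrary placeholder), combining two segments whose impls share the `DBProvider<Tx: DbTxMut>` bound:

use crate::segments::{AccountHistory, Segment, SegmentSet, StorageHistory};
use reth_db::transaction::DbTxMut;
use reth_provider::DBProvider;
use reth_prune_types::PruneMode;

fn history_segments<Provider>(storage_mode: Option<PruneMode>) -> Vec<Box<dyn Segment<Provider>>>
where
    Provider: DBProvider<Tx: DbTxMut>,
{
    SegmentSet::new()
        // always prune account history older than the configured distance
        .segment(AccountHistory::new(PruneMode::Distance(128)))
        // only prune storage history if a mode was configured
        .segment_opt(storage_mode.map(StorageHistory::new))
        .into_vec()
}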

View File

@ -1,6 +1,7 @@
use std::num::NonZeroUsize;
use crate::{
db_ext::DbTxPruneExt,
segments::{PruneInput, Segment},
PrunerError,
};
@ -8,11 +9,10 @@ use alloy_primitives::BlockNumber;
use itertools::Itertools;
use reth_db::{
cursor::{DbCursorRO, RangeWalker},
database::Database,
tables,
transaction::DbTxMut,
};
use reth_provider::{providers::StaticFileProvider, DatabaseProviderRW};
use reth_provider::{providers::StaticFileProvider, DBProvider};
use reth_prune_types::{
PruneLimiter, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput,
SegmentOutputCheckpoint,
@ -34,7 +34,7 @@ impl Headers {
}
}
impl<DB: Database> Segment<DB> for Headers {
impl<Provider: DBProvider<Tx: DbTxMut>> Segment<Provider> for Headers {
fn segment(&self) -> PruneSegment {
PruneSegment::Headers
}
@ -49,11 +49,7 @@ impl<DB: Database> Segment<DB> for Headers {
PrunePurpose::StaticFile
}
fn prune(
&self,
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<SegmentOutput, PrunerError> {
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
let (block_range_start, block_range_end) = match input.get_next_block_range() {
Some(range) => (*range.start(), *range.end()),
None => {
@ -106,18 +102,19 @@ impl<DB: Database> Segment<DB> for Headers {
})
}
}
type Walker<'a, DB, T> = RangeWalker<'a, T, <<DB as Database>::TXMut as DbTxMut>::CursorMut<T>>;
type Walker<'a, Provider, T> =
RangeWalker<'a, T, <<Provider as DBProvider>::Tx as DbTxMut>::CursorMut<T>>;
#[allow(missing_debug_implementations)]
struct HeaderTablesIter<'a, DB>
struct HeaderTablesIter<'a, Provider>
where
DB: Database,
Provider: DBProvider<Tx: DbTxMut>,
{
provider: &'a DatabaseProviderRW<DB>,
provider: &'a Provider,
limiter: &'a mut PruneLimiter,
headers_walker: Walker<'a, DB, tables::Headers>,
header_tds_walker: Walker<'a, DB, tables::HeaderTerminalDifficulties>,
canonical_headers_walker: Walker<'a, DB, tables::CanonicalHeaders>,
headers_walker: Walker<'a, Provider, tables::Headers>,
header_tds_walker: Walker<'a, Provider, tables::HeaderTerminalDifficulties>,
canonical_headers_walker: Walker<'a, Provider, tables::CanonicalHeaders>,
}
struct HeaderTablesIterItem {
@ -125,24 +122,24 @@ struct HeaderTablesIterItem {
entries_pruned: usize,
}
impl<'a, DB> HeaderTablesIter<'a, DB>
impl<'a, Provider> HeaderTablesIter<'a, Provider>
where
DB: Database,
Provider: DBProvider<Tx: DbTxMut>,
{
fn new(
provider: &'a DatabaseProviderRW<DB>,
provider: &'a Provider,
limiter: &'a mut PruneLimiter,
headers_walker: Walker<'a, DB, tables::Headers>,
header_tds_walker: Walker<'a, DB, tables::HeaderTerminalDifficulties>,
canonical_headers_walker: Walker<'a, DB, tables::CanonicalHeaders>,
headers_walker: Walker<'a, Provider, tables::Headers>,
header_tds_walker: Walker<'a, Provider, tables::HeaderTerminalDifficulties>,
canonical_headers_walker: Walker<'a, Provider, tables::CanonicalHeaders>,
) -> Self {
Self { provider, limiter, headers_walker, header_tds_walker, canonical_headers_walker }
}
}
impl<'a, DB> Iterator for HeaderTablesIter<'a, DB>
impl<'a, Provider> Iterator for HeaderTablesIter<'a, Provider>
where
DB: Database,
Provider: DBProvider<Tx: DbTxMut>,
{
type Item = Result<HeaderTablesIterItem, PrunerError>;
fn next(&mut self) -> Option<Self::Item> {
@ -154,7 +151,7 @@ where
let mut pruned_block_td = None;
let mut pruned_block_canonical = None;
if let Err(err) = self.provider.prune_table_with_range_step(
if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
&mut self.headers_walker,
self.limiter,
&mut |_| false,
@ -163,7 +160,7 @@ where
return Some(Err(err.into()))
}
if let Err(err) = self.provider.prune_table_with_range_step(
if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
&mut self.header_tds_walker,
self.limiter,
&mut |_| false,
@ -172,7 +169,7 @@ where
return Some(Err(err.into()))
}
if let Err(err) = self.provider.prune_table_with_range_step(
if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
&mut self.canonical_headers_walker,
self.limiter,
&mut |_| false,
@ -202,7 +199,10 @@ mod tests {
use assert_matches::assert_matches;
use reth_db::tables;
use reth_db_api::transaction::DbTx;
use reth_provider::{PruneCheckpointReader, PruneCheckpointWriter, StaticFileProviderFactory};
use reth_provider::{
DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
StaticFileProviderFactory,
};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress,
PruneSegment, SegmentOutputCheckpoint,
@ -254,7 +254,7 @@ mod tests {
.map(|block_number| block_number + 1)
.unwrap_or_default();
let provider = db.factory.provider_rw().unwrap();
let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input.clone()).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
trace!(target: "pruner::test",
@ -325,7 +325,7 @@ mod tests {
limiter,
};
let provider = db.factory.provider_rw().unwrap();
let provider = db.factory.database_provider_rw().unwrap();
let segment = super::Headers::new(db.factory.static_file_provider());
let result = segment.prune(&provider, input).unwrap();
assert_eq!(

View File

@ -2,9 +2,10 @@ use crate::{
segments::{PruneInput, Segment},
PrunerError,
};
use reth_db_api::database::Database;
use reth_db::transaction::DbTxMut;
use reth_provider::{
errors::provider::ProviderResult, providers::StaticFileProvider, DatabaseProviderRW,
errors::provider::ProviderResult, providers::StaticFileProvider, BlockReader, DBProvider,
PruneCheckpointWriter, TransactionsProvider,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, SegmentOutput};
use reth_static_file_types::StaticFileSegment;
@ -20,7 +21,10 @@ impl Receipts {
}
}
impl<DB: Database> Segment<DB> for Receipts {
impl<Provider> Segment<Provider> for Receipts
where
Provider: DBProvider<Tx: DbTxMut> + PruneCheckpointWriter + TransactionsProvider + BlockReader,
{
fn segment(&self) -> PruneSegment {
PruneSegment::Receipts
}
@ -35,17 +39,13 @@ impl<DB: Database> Segment<DB> for Receipts {
PrunePurpose::StaticFile
}
fn prune(
&self,
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<SegmentOutput, PrunerError> {
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
crate::segments::receipts::prune(provider, input)
}
fn save_checkpoint(
&self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
checkpoint: PruneCheckpoint,
) -> ProviderResult<()> {
crate::segments::receipts::save_checkpoint(provider, checkpoint)

View File

@ -1,10 +1,10 @@
use crate::{
db_ext::DbTxPruneExt,
segments::{PruneInput, Segment},
PrunerError,
};
use reth_db::tables;
use reth_db_api::database::Database;
use reth_provider::{providers::StaticFileProvider, DatabaseProviderRW, TransactionsProvider};
use reth_db::{tables, transaction::DbTxMut};
use reth_provider::{providers::StaticFileProvider, BlockReader, DBProvider, TransactionsProvider};
use reth_prune_types::{
PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
@ -22,7 +22,10 @@ impl Transactions {
}
}
impl<DB: Database> Segment<DB> for Transactions {
impl<Provider> Segment<Provider> for Transactions
where
Provider: DBProvider<Tx: DbTxMut> + TransactionsProvider + BlockReader,
{
fn segment(&self) -> PruneSegment {
PruneSegment::Transactions
}
@ -37,11 +40,7 @@ impl<DB: Database> Segment<DB> for Transactions {
PrunePurpose::StaticFile
}
fn prune(
&self,
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<SegmentOutput, PrunerError> {
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
let tx_range = match input.get_next_tx_num_range(provider)? {
Some(range) => range,
None => {
@ -53,7 +52,7 @@ impl<DB: Database> Segment<DB> for Transactions {
let mut limiter = input.limiter;
let mut last_pruned_transaction = *tx_range.end();
let (pruned, done) = provider.prune_table_with_range::<tables::Transactions>(
let (pruned, done) = provider.tx_ref().prune_table_with_range::<tables::Transactions>(
tx_range,
&mut limiter,
|_| false,
@ -91,7 +90,10 @@ mod tests {
Itertools,
};
use reth_db::tables;
use reth_provider::{PruneCheckpointReader, PruneCheckpointWriter, StaticFileProviderFactory};
use reth_provider::{
DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
StaticFileProviderFactory,
};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress,
PruneSegment, SegmentOutput,
@ -141,7 +143,7 @@ mod tests {
.map(|tx_number| tx_number + 1)
.unwrap_or_default();
let provider = db.factory.provider_rw().unwrap();
let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input.clone()).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);

View File

@ -1,11 +1,12 @@
use crate::{
db_ext::DbTxPruneExt,
segments::{user::history::prune_history_indices, PruneInput, Segment},
PrunerError,
};
use itertools::Itertools;
use reth_db::tables;
use reth_db_api::{database::Database, models::ShardedKey};
use reth_provider::DatabaseProviderRW;
use reth_db::{tables, transaction::DbTxMut};
use reth_db_api::models::ShardedKey;
use reth_provider::DBProvider;
use reth_prune_types::{
PruneInterruptReason, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput,
SegmentOutputCheckpoint,
@ -30,7 +31,10 @@ impl AccountHistory {
}
}
impl<DB: Database> Segment<DB> for AccountHistory {
impl<Provider> Segment<Provider> for AccountHistory
where
Provider: DBProvider<Tx: DbTxMut>,
{
fn segment(&self) -> PruneSegment {
PruneSegment::AccountHistory
}
@ -44,11 +48,7 @@ impl<DB: Database> Segment<DB> for AccountHistory {
}
#[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
fn prune(
&self,
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<SegmentOutput, PrunerError> {
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
let range = match input.get_next_block_range() {
Some(range) => range,
None => {
@ -80,8 +80,8 @@ impl<DB: Database> Segment<DB> for AccountHistory {
// size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is
// additionally limited by the `max_reorg_depth`, so no OOM is expected here.
let mut highest_deleted_accounts = FxHashMap::default();
let (pruned_changesets, done) = provider
.prune_table_with_range::<tables::AccountChangeSets>(
let (pruned_changesets, done) =
provider.tx_ref().prune_table_with_range::<tables::AccountChangeSets>(
range,
&mut limiter,
|_| false,
@ -106,7 +106,7 @@ impl<DB: Database> Segment<DB> for AccountHistory {
.map(|(address, block_number)| {
ShardedKey::new(address, block_number.min(last_changeset_pruned_block))
});
let outcomes = prune_history_indices::<DB, tables::AccountsHistory, _>(
let outcomes = prune_history_indices::<Provider, tables::AccountsHistory, _>(
provider,
highest_sharded_keys,
|a, b| a.key == b.key,
@ -135,7 +135,7 @@ mod tests {
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
use reth_db::{tables, BlockNumberList};
use reth_provider::PruneCheckpointReader;
use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, PruneSegment,
};
@ -203,7 +203,7 @@ mod tests {
};
let segment = AccountHistory::new(prune_mode);
let provider = db.factory.provider_rw().unwrap();
let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);

View File

@ -2,13 +2,12 @@ use alloy_primitives::BlockNumber;
use reth_db::{BlockNumberList, RawKey, RawTable, RawValue};
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
models::ShardedKey,
table::Table,
transaction::DbTxMut,
DatabaseError,
};
use reth_provider::DatabaseProviderRW;
use reth_provider::DBProvider;
enum PruneShardOutcome {
Deleted,
@ -26,13 +25,13 @@ pub(crate) struct PrunedIndices {
/// Prune history indices according to the provided list of highest sharded keys.
///
/// Returns total number of deleted, updated and unchanged entities.
pub(crate) fn prune_history_indices<DB, T, SK>(
provider: &DatabaseProviderRW<DB>,
pub(crate) fn prune_history_indices<Provider, T, SK>(
provider: &Provider,
highest_sharded_keys: impl IntoIterator<Item = T::Key>,
key_matches: impl Fn(&T::Key, &T::Key) -> bool,
) -> Result<PrunedIndices, DatabaseError>
where
DB: Database,
Provider: DBProvider<Tx: DbTxMut>,
T: Table<Value = BlockNumberList>,
T::Key: AsRef<ShardedKey<SK>>,
{

View File

@ -2,8 +2,11 @@ use crate::{
segments::{PruneInput, Segment},
PrunerError,
};
use reth_db_api::database::Database;
use reth_provider::{errors::provider::ProviderResult, DatabaseProviderRW};
use reth_db::transaction::DbTxMut;
use reth_provider::{
errors::provider::ProviderResult, BlockReader, DBProvider, PruneCheckpointWriter,
TransactionsProvider,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, SegmentOutput};
use tracing::instrument;
@ -18,7 +21,10 @@ impl Receipts {
}
}
impl<DB: Database> Segment<DB> for Receipts {
impl<Provider> Segment<Provider> for Receipts
where
Provider: DBProvider<Tx: DbTxMut> + PruneCheckpointWriter + TransactionsProvider + BlockReader,
{
fn segment(&self) -> PruneSegment {
PruneSegment::Receipts
}
@ -32,17 +38,13 @@ impl<DB: Database> Segment<DB> for Receipts {
}
#[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
fn prune(
&self,
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<SegmentOutput, PrunerError> {
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
crate::segments::receipts::prune(provider, input)
}
fn save_checkpoint(
&self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
checkpoint: PruneCheckpoint,
) -> ProviderResult<()> {
crate::segments::receipts::save_checkpoint(provider, checkpoint)

View File

@ -1,10 +1,10 @@
use crate::{
db_ext::DbTxPruneExt,
segments::{PruneInput, Segment},
PrunerError,
};
use reth_db::tables;
use reth_db_api::database::Database;
use reth_provider::{BlockReader, DatabaseProviderRW, PruneCheckpointWriter, TransactionsProvider};
use reth_db::{tables, transaction::DbTxMut};
use reth_provider::{BlockReader, DBProvider, PruneCheckpointWriter, TransactionsProvider};
use reth_prune_types::{
PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, ReceiptsLogPruneConfig,
SegmentOutput, MINIMUM_PRUNING_DISTANCE,
@ -22,7 +22,10 @@ impl ReceiptsByLogs {
}
}
impl<DB: Database> Segment<DB> for ReceiptsByLogs {
impl<Provider> Segment<Provider> for ReceiptsByLogs
where
Provider: DBProvider<Tx: DbTxMut> + PruneCheckpointWriter + TransactionsProvider + BlockReader,
{
fn segment(&self) -> PruneSegment {
PruneSegment::ContractLogs
}
@ -36,11 +39,7 @@ impl<DB: Database> Segment<DB> for ReceiptsByLogs {
}
#[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
fn prune(
&self,
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<SegmentOutput, PrunerError> {
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
// Contract log filtering removes every receipt possible except the ones in the list. So,
// for the other receipts it's as if they had a `PruneMode::Distance()` of
// `MINIMUM_PRUNING_DISTANCE`.
@ -143,7 +142,7 @@ impl<DB: Database> Segment<DB> for ReceiptsByLogs {
// Delete receipts, except the ones in the inclusion list
let mut last_skipped_transaction = 0;
let deleted;
(deleted, done) = provider.prune_table_with_range::<tables::Receipts>(
(deleted, done) = provider.tx_ref().prune_table_with_range::<tables::Receipts>(
tx_range,
&mut limiter,
|(tx_num, receipt)| {
@ -224,7 +223,7 @@ mod tests {
use assert_matches::assert_matches;
use reth_db::tables;
use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_provider::{PruneCheckpointReader, TransactionsProvider};
use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader, TransactionsProvider};
use reth_prune_types::{PruneLimiter, PruneMode, PruneSegment, ReceiptsLogPruneConfig};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_testing_utils::generators::{
@ -286,7 +285,7 @@ mod tests {
);
let run_prune = || {
let provider = db.factory.provider_rw().unwrap();
let provider = db.factory.database_provider_rw().unwrap();
let prune_before_block: usize = 20;
let prune_mode = PruneMode::Before(prune_before_block as u64);

View File

@ -1,10 +1,10 @@
use crate::{
db_ext::DbTxPruneExt,
segments::{PruneInput, Segment},
PrunerError,
};
use reth_db::tables;
use reth_db_api::database::Database;
use reth_provider::{DatabaseProviderRW, TransactionsProvider};
use reth_db::{tables, transaction::DbTxMut};
use reth_provider::{BlockReader, DBProvider, TransactionsProvider};
use reth_prune_types::{
PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
@ -21,7 +21,10 @@ impl SenderRecovery {
}
}
impl<DB: Database> Segment<DB> for SenderRecovery {
impl<Provider> Segment<Provider> for SenderRecovery
where
Provider: DBProvider<Tx: DbTxMut> + TransactionsProvider + BlockReader,
{
fn segment(&self) -> PruneSegment {
PruneSegment::SenderRecovery
}
@ -35,11 +38,7 @@ impl<DB: Database> Segment<DB> for SenderRecovery {
}
#[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
fn prune(
&self,
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<SegmentOutput, PrunerError> {
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
let tx_range = match input.get_next_tx_num_range(provider)? {
Some(range) => range,
None => {
@ -52,12 +51,13 @@ impl<DB: Database> Segment<DB> for SenderRecovery {
let mut limiter = input.limiter;
let mut last_pruned_transaction = tx_range_end;
let (pruned, done) = provider.prune_table_with_range::<tables::TransactionSenders>(
tx_range,
&mut limiter,
|_| false,
|row| last_pruned_transaction = row.0,
)?;
let (pruned, done) =
provider.tx_ref().prune_table_with_range::<tables::TransactionSenders>(
tx_range,
&mut limiter,
|_| false,
|row| last_pruned_transaction = row.0,
)?;
trace!(target: "pruner", %pruned, %done, "Pruned transaction senders");
let last_pruned_block = provider
@ -90,7 +90,7 @@ mod tests {
Itertools,
};
use reth_db::tables;
use reth_provider::PruneCheckpointReader;
use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{PruneCheckpoint, PruneLimiter, PruneMode, PruneProgress, PruneSegment};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
@ -179,7 +179,7 @@ mod tests {
.into_inner()
.0;
let provider = db.factory.provider_rw().unwrap();
let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);

View File

@ -1,14 +1,12 @@
use crate::{
db_ext::DbTxPruneExt,
segments::{user::history::prune_history_indices, PruneInput, Segment, SegmentOutput},
PrunerError,
};
use itertools::Itertools;
use reth_db::tables;
use reth_db_api::{
database::Database,
models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress},
};
use reth_provider::DatabaseProviderRW;
use reth_db::{tables, transaction::DbTxMut};
use reth_db_api::models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress};
use reth_provider::DBProvider;
use reth_prune_types::{
PruneInterruptReason, PruneMode, PruneProgress, PrunePurpose, PruneSegment,
SegmentOutputCheckpoint,
@ -33,7 +31,10 @@ impl StorageHistory {
}
}
impl<DB: Database> Segment<DB> for StorageHistory {
impl<Provider> Segment<Provider> for StorageHistory
where
Provider: DBProvider<Tx: DbTxMut>,
{
fn segment(&self) -> PruneSegment {
PruneSegment::StorageHistory
}
@ -47,11 +48,7 @@ impl<DB: Database> Segment<DB> for StorageHistory {
}
#[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
fn prune(
&self,
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<SegmentOutput, PrunerError> {
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
let range = match input.get_next_block_range() {
Some(range) => range,
None => {
@ -83,8 +80,8 @@ impl<DB: Database> Segment<DB> for StorageHistory {
// size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is
// additionally limited by the `max_reorg_depth`, so no OOM is expected here.
let mut highest_deleted_storages = FxHashMap::default();
let (pruned_changesets, done) = provider
.prune_table_with_range::<tables::StorageChangeSets>(
let (pruned_changesets, done) =
provider.tx_ref().prune_table_with_range::<tables::StorageChangeSets>(
BlockNumberAddress::range(range),
&mut limiter,
|_| false,
@ -114,7 +111,7 @@ impl<DB: Database> Segment<DB> for StorageHistory {
block_number.min(last_changeset_pruned_block),
)
});
let outcomes = prune_history_indices::<DB, tables::StoragesHistory, _>(
let outcomes = prune_history_indices::<Provider, tables::StoragesHistory, _>(
provider,
highest_sharded_keys,
|a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
@ -143,7 +140,7 @@ mod tests {
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
use reth_db::{tables, BlockNumberList};
use reth_provider::PruneCheckpointReader;
use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{PruneCheckpoint, PruneLimiter, PruneMode, PruneProgress, PruneSegment};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_testing_utils::generators::{
@ -210,7 +207,7 @@ mod tests {
};
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.provider_rw().unwrap();
let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);

View File

@ -1,11 +1,11 @@
use crate::{
db_ext::DbTxPruneExt,
segments::{PruneInput, Segment, SegmentOutput},
PrunerError,
};
use rayon::prelude::*;
use reth_db::tables;
use reth_db_api::database::Database;
use reth_provider::{DatabaseProviderRW, TransactionsProvider};
use reth_db::{tables, transaction::DbTxMut};
use reth_provider::{BlockReader, DBProvider, TransactionsProvider};
use reth_prune_types::{
PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutputCheckpoint,
};
@ -22,7 +22,10 @@ impl TransactionLookup {
}
}
impl<DB: Database> Segment<DB> for TransactionLookup {
impl<Provider> Segment<Provider> for TransactionLookup
where
Provider: DBProvider<Tx: DbTxMut> + TransactionsProvider + BlockReader,
{
fn segment(&self) -> PruneSegment {
PruneSegment::TransactionLookup
}
@ -36,11 +39,7 @@ impl<DB: Database> Segment<DB> for TransactionLookup {
}
#[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
fn prune(
&self,
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<SegmentOutput, PrunerError> {
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
let (start, end) = match input.get_next_tx_num_range(provider)? {
Some(range) => range,
None => {
@ -73,13 +72,15 @@ impl<DB: Database> Segment<DB> for TransactionLookup {
let mut limiter = input.limiter;
let mut last_pruned_transaction = None;
let (pruned, done) = provider.prune_table_with_iterator::<tables::TransactionHashNumbers>(
hashes,
&mut limiter,
|row| {
last_pruned_transaction = Some(last_pruned_transaction.unwrap_or(row.1).max(row.1))
},
)?;
let (pruned, done) =
provider.tx_ref().prune_table_with_iterator::<tables::TransactionHashNumbers>(
hashes,
&mut limiter,
|row| {
last_pruned_transaction =
Some(last_pruned_transaction.unwrap_or(row.1).max(row.1))
},
)?;
let done = done && tx_range_end == end;
trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup");
@ -117,7 +118,7 @@ mod tests {
Itertools,
};
use reth_db::tables;
use reth_provider::PruneCheckpointReader;
use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, PruneSegment,
};
@ -204,7 +205,7 @@ mod tests {
.into_inner()
.0;
let provider = db.factory.provider_rw().unwrap();
let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);

View File

@ -47,7 +47,7 @@ impl<DB: Database> Stage<DB> for PruneStage {
.delete_limit(self.commit_threshold)
.build(provider.static_file_provider().clone());
let result = pruner.run(provider, input.target())?;
let result = pruner.run_with_provider(&provider.0, input.target())?;
if result.progress.is_finished() {
Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true })
} else {

View File

@ -159,8 +159,7 @@ where
// Create a new database transaction on every segment to prevent long-lived read-only
// transactions
let mut provider = self.provider.database_provider_ro()?;
provider.disable_long_read_transaction_safety();
let provider = self.provider.database_provider_ro()?.disable_long_read_transaction_safety();
segment.copy_to_static_files(provider, self.provider.static_file_provider(), block_range.clone())?;
let elapsed = start.elapsed(); // TODO(alexey): track in metrics

View File

@ -13,6 +13,7 @@ use reth_chain_state::{
MemoryOverlayStateProvider,
};
use reth_chainspec::ChainInfo;
use reth_db::Database;
use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices};
use reth_evm::ConfigureEvmEnv;
use reth_execution_types::ExecutionOutcome;
@ -34,7 +35,7 @@ use std::{
};
use tracing::trace;
use super::ProviderNodeTypes;
use super::{DatabaseProvider, ProviderNodeTypes};
/// The main type for interacting with the blockchain.
///
@ -263,10 +264,15 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
impl<N: ProviderNodeTypes> DatabaseProviderFactory for BlockchainProvider2<N> {
type DB = N::DB;
type Provider = DatabaseProviderRO<N::DB>;
type Provider = DatabaseProvider<<N::DB as Database>::TX>;
type ProviderRW = DatabaseProvider<<N::DB as Database>::TXMut>;
fn database_provider_ro(&self) -> ProviderResult<Self::Provider> {
self.database.provider()
self.database.database_provider_ro()
}
fn database_provider_rw(&self) -> ProviderResult<Self::ProviderRW> {
self.database.database_provider_rw()
}
}
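
With `ProviderRW` exposed here, the blockchain provider itself can back a pruner; a sketch, assuming `blockchain_db: BlockchainProvider2<N>`, a `finished_exex_height_rx` watch receiver, and a `tip_block_number`:

use reth_prune::Pruner;

// `Pruner::new_with_factory` only requires `DatabaseProviderFactory`, so the
// blockchain provider can now stand in wherever a `ProviderFactory<N>` was needed.
let mut pruner = Pruner::new_with_factory(blockchain_db.clone(), vec![], 5, 0, None, finished_exex_height_rx);
pruner.run(tip_block_number)?;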

View File

@ -186,10 +186,15 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
impl<N: ProviderNodeTypes> DatabaseProviderFactory for ProviderFactory<N> {
type DB = N::DB;
type Provider = DatabaseProviderRO<N::DB>;
type ProviderRW = DatabaseProvider<<N::DB as Database>::TXMut>;
fn database_provider_ro(&self) -> ProviderResult<Self::Provider> {
self.provider()
}
fn database_provider_rw(&self) -> ProviderResult<Self::ProviderRW> {
self.provider_rw().map(|provider| provider.0)
}
}
impl<N: NodeTypesWithDB> StaticFileProviderFactory for ProviderFactory<N> {

View File

@ -23,13 +23,13 @@ use reth_db::{
};
use reth_db_api::{
common::KeyValue,
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, RangeWalker},
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
database::Database,
models::{
sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
ShardedKey, StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals,
},
table::{Table, TableRow},
table::Table,
transaction::{DbTx, DbTxMut},
DatabaseError,
};
@ -43,7 +43,7 @@ use reth_primitives::{
TransactionSigned, TransactionSignedEcRecovered, TransactionSignedNoHash, TxHash, TxNumber,
Withdrawal, Withdrawals, B256, U256,
};
use reth_prune_types::{PruneCheckpoint, PruneLimiter, PruneModes, PruneSegment};
use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_storage_api::TryIntoHistoricalStateProvider;
use reth_storage_errors::provider::{ProviderResult, RootMismatch};
@ -1519,119 +1519,6 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
Ok(())
}
/// Prune the table for the specified pre-sorted key iterator.
///
/// Returns number of rows pruned.
pub fn prune_table_with_iterator<T: Table>(
&self,
keys: impl IntoIterator<Item = T::Key>,
limiter: &mut PruneLimiter,
mut delete_callback: impl FnMut(TableRow<T>),
) -> Result<(usize, bool), DatabaseError> {
let mut cursor = self.tx.cursor_write::<T>()?;
let mut keys = keys.into_iter();
let mut deleted_entries = 0;
for key in &mut keys {
if limiter.is_limit_reached() {
debug!(
target: "providers::db",
?limiter,
deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(),
time_limit = %limiter.is_time_limit_reached(),
table = %T::NAME,
"Pruning limit reached"
);
break
}
let row = cursor.seek_exact(key)?;
if let Some(row) = row {
cursor.delete_current()?;
limiter.increment_deleted_entries_count();
deleted_entries += 1;
delete_callback(row);
}
}
let done = keys.next().is_none();
Ok((deleted_entries, done))
}
/// Prune the table for the specified key range.
///
/// Returns number of rows pruned.
pub fn prune_table_with_range<T: Table>(
&self,
keys: impl RangeBounds<T::Key> + Clone + Debug,
limiter: &mut PruneLimiter,
mut skip_filter: impl FnMut(&TableRow<T>) -> bool,
mut delete_callback: impl FnMut(TableRow<T>),
) -> Result<(usize, bool), DatabaseError> {
let mut cursor = self.tx.cursor_write::<T>()?;
let mut walker = cursor.walk_range(keys)?;
let mut deleted_entries = 0;
let done = loop {
// check for time out must be done in this scope since it's not done in
// `prune_table_with_range_step`
if limiter.is_limit_reached() {
debug!(
target: "providers::db",
?limiter,
deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(),
time_limit = %limiter.is_time_limit_reached(),
table = %T::NAME,
"Pruning limit reached"
);
break false
}
let done = self.prune_table_with_range_step(
&mut walker,
limiter,
&mut skip_filter,
&mut delete_callback,
)?;
if done {
break true
}
deleted_entries += 1;
};
Ok((deleted_entries, done))
}
/// Steps once with the given walker and prunes the entry in the table.
///
/// Returns `true` if the walker is finished, `false` if it may have more data to prune.
///
/// CAUTION: Pruner limits are not checked. This allows for a clean exit of a prune run that's
/// pruning different tables concurrently, by letting them step to the same height before
/// timing out.
pub fn prune_table_with_range_step<T: Table>(
&self,
walker: &mut RangeWalker<'_, T, <TX as DbTxMut>::CursorMut<T>>,
limiter: &mut PruneLimiter,
skip_filter: &mut impl FnMut(&TableRow<T>) -> bool,
delete_callback: &mut impl FnMut(TableRow<T>),
) -> Result<bool, DatabaseError> {
let Some(res) = walker.next() else { return Ok(true) };
let row = res?;
if !skip_filter(&row) {
walker.delete_current()?;
limiter.increment_deleted_entries_count();
delete_callback(row);
}
Ok(false)
}
/// Load shard and remove it. If list is empty, last shard was full or
/// there are no shards at all.
fn take_shard<T>(&self, key: T::Key) -> ProviderResult<Vec<u64>>
@ -3690,7 +3577,7 @@ impl<TX: DbTxMut> FinalizedBlockWriter for DatabaseProvider<TX> {
}
}
impl<TX: DbTx> DBProvider for DatabaseProvider<TX> {
impl<TX: DbTx + 'static> DBProvider for DatabaseProvider<TX> {
type Tx = TX;
fn tx_ref(&self) -> &Self::Tx {
@ -3700,6 +3587,10 @@ impl<TX: DbTx> DBProvider for DatabaseProvider<TX> {
fn tx_mut(&mut self) -> &mut Self::Tx {
&mut self.tx
}
fn into_tx(self) -> Self::Tx {
self.tx
}
}
/// Helper method to recover senders for any blocks in the db which do not have senders. This

View File

@ -14,6 +14,7 @@ use reth_blockchain_tree_api::{
};
use reth_chain_state::{ChainInfoTracker, ForkChoiceNotifications, ForkChoiceSubscriptions};
use reth_chainspec::{ChainInfo, ChainSpec};
use reth_db::Database;
use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices};
use reth_evm::ConfigureEvmEnv;
use reth_node_types::NodeTypesWithDB;
@ -171,11 +172,16 @@ where
impl<N: ProviderNodeTypes> DatabaseProviderFactory for BlockchainProvider<N> {
type DB = N::DB;
type Provider = DatabaseProviderRO<N::DB>;
type Provider = DatabaseProvider<<N::DB as Database>::TX>;
type ProviderRW = DatabaseProvider<<N::DB as Database>::TXMut>;
fn database_provider_ro(&self) -> ProviderResult<Self::Provider> {
self.database.provider()
}
fn database_provider_rw(&self) -> ProviderResult<Self::ProviderRW> {
self.database.provider_rw().map(|p| p.0)
}
}
impl<N: ProviderNodeTypes> StaticFileProviderFactory for BlockchainProvider<N> {

View File

@ -146,10 +146,15 @@ impl MockEthProvider {
impl DatabaseProviderFactory for MockEthProvider {
type DB = DatabaseMock;
type Provider = DatabaseProvider<TxMock>;
type ProviderRW = DatabaseProvider<TxMock>;
fn database_provider_ro(&self) -> ProviderResult<Self::Provider> {
Err(ConsistentViewError::Syncing { best_block: GotExpected::new(0, 0) }.into())
}
fn database_provider_rw(&self) -> ProviderResult<Self::ProviderRW> {
Err(ConsistentViewError::Syncing { best_block: GotExpected::new(0, 0) }.into())
}
}
impl HeaderProvider for MockEthProvider {

View File

@ -2,7 +2,7 @@ use reth_db_api::{database::Database, transaction::DbTx};
use reth_storage_errors::provider::ProviderResult;
/// Database provider.
pub trait DBProvider: Send + Sync {
pub trait DBProvider: Send + Sync + Sized + 'static {
/// Underlying database transaction held by the provider.
type Tx: DbTx;
@ -12,18 +12,28 @@ pub trait DBProvider: Send + Sync {
/// Returns a mutable reference to the underlying transaction.
fn tx_mut(&mut self) -> &mut Self::Tx;
/// Consumes the provider and returns the underlying transaction.
fn into_tx(self) -> Self::Tx;
/// Disables long-lived read transaction safety guarantees for leak prevention and
/// observability improvements.
///
/// CAUTION: In most cases, you want the safety guarantees for long read transactions
/// enabled. Use this only if you're sure that no write transaction is open in parallel, meaning
/// that Reth as a node is offline and not progressing.
fn disable_long_read_transaction_safety(&mut self) {
fn disable_long_read_transaction_safety(mut self) -> Self {
self.tx_mut().disable_long_read_transaction_safety();
self
}
/// Commit database transaction
fn commit(self) -> ProviderResult<bool> {
Ok(self.into_tx().commit()?)
}
}
/// Database provider factory.
#[auto_impl::auto_impl(&, Arc)]
pub trait DatabaseProviderFactory: Send + Sync {
/// Database this factory produces providers for.
type DB: Database;
@ -31,6 +41,12 @@ pub trait DatabaseProviderFactory: Send + Sync {
/// Provider type returned by the factory.
type Provider: DBProvider<Tx = <Self::DB as Database>::TX>;
/// Read-write provider type returned by the factory.
type ProviderRW: DBProvider<Tx = <Self::DB as Database>::TXMut>;
/// Create new read-only database provider.
fn database_provider_ro(&self) -> ProviderResult<Self::Provider>;
/// Create new read-write database provider.
fn database_provider_rw(&self) -> ProviderResult<Self::ProviderRW>;
}
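
A sketch of the pattern these two traits now support, mirroring how `Pruner::run` uses them: open a read-write provider from any factory, do the work, and commit, without naming a concrete provider type (the helper name is illustrative):

use reth_provider::DatabaseProviderFactory;
use reth_storage_errors::provider::ProviderResult;

fn with_rw_provider<PF, T>(
    factory: &PF,
    f: impl FnOnce(&PF::ProviderRW) -> ProviderResult<T>,
) -> ProviderResult<T>
where
    PF: DatabaseProviderFactory,
{
    let provider = factory.database_provider_rw()?;
    let out = f(&provider)?;
    // `commit` is the `DBProvider` default method defined above: it consumes the
    // provider via `into_tx` and commits the underlying transaction.
    provider.commit()?;
    Ok(out)
}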