feat: make reth-stages independent of concrete DatabaseProvider (#10934)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Arsenii Kulikov
2024-09-19 13:47:55 +03:00
committed by GitHub
parent 9e7e3247cc
commit 180f10001c
47 changed files with 726 additions and 678 deletions

3
Cargo.lock generated
View File

@ -7008,7 +7008,6 @@ dependencies = [
"reth-exex-types",
"reth-metrics",
"reth-network-p2p",
"reth-node-types",
"reth-payload-builder",
"reth-payload-primitives",
"reth-payload-validator",
@ -8678,11 +8677,9 @@ dependencies = [
"futures-util",
"metrics",
"reth-consensus",
"reth-db-api",
"reth-errors",
"reth-metrics",
"reth-network-p2p",
"reth-node-types",
"reth-primitives-traits",
"reth-provider",
"reth-prune",

View File

@ -173,7 +173,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
.try_seal_with_senders()
.map_err(|_| BlockValidationError::SenderRecoveryError)?,
)?;
let mut storage_writer = UnifiedStorageWriter::from_database(&provider_rw);
let mut storage_writer = UnifiedStorageWriter::from_database(&provider_rw.0);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;
let storage_lists = provider_rw.changed_storages_with_range(block.number..=block.number)?;
let storages = provider_rw.plain_state_storages(storage_lists)?;

View File

@ -20,8 +20,9 @@ use reth_node_api::{NodeTypesWithDB, NodeTypesWithEngine};
use reth_node_ethereum::EthExecutorProvider;
use reth_primitives::BlockHashOrNumber;
use reth_provider::{
writer::UnifiedStorageWriter, BlockNumReader, BlockWriter, ChainSpecProvider, HeaderProvider,
LatestStateProviderRef, OriginalValuesKnown, ProviderError, ProviderFactory, StateWriter,
writer::UnifiedStorageWriter, BlockNumReader, BlockWriter, ChainSpecProvider,
DatabaseProviderFactory, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown,
ProviderError, ProviderFactory, StateWriter, StaticFileProviderFactory,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages::{
@ -84,7 +85,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
let Environment { provider_factory, config, data_dir } =
self.env.init::<N>(AccessRights::RW)?;
let provider_rw = provider_factory.provider_rw()?;
let provider_rw = provider_factory.database_provider_rw()?;
// Configure and build network
let network_secret_path =

View File

@ -10,7 +10,7 @@ use reth_db_common::DbTool;
use reth_evm::{execute::BlockExecutorProvider, noop::NoopBlockExecutorProvider};
use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter};
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_provider::{providers::StaticFileProvider, ProviderFactory};
use reth_provider::{providers::StaticFileProvider, DatabaseProviderFactory, ProviderFactory};
use reth_stages::{stages::ExecutionStage, Stage, StageCheckpoint, UnwindInput};
use tracing::info;
@ -135,7 +135,7 @@ fn unwind_and_copy<N: NodeTypesWithDB<ChainSpec = ChainSpec>>(
tip_block_number: u64,
output_db: &DatabaseEnv,
) -> eyre::Result<()> {
let provider = db_tool.provider_factory.provider_rw()?;
let provider = db_tool.provider_factory.database_provider_rw()?;
let mut exec_stage = ExecutionStage::new_with_executor(NoopBlockExecutorProvider::default());
@ -175,7 +175,7 @@ where
let input =
reth_stages::ExecInput { target: Some(to), checkpoint: Some(StageCheckpoint::new(from)) };
exec_stage.execute(&output_provider_factory.provider_rw()?, input)?;
exec_stage.execute(&output_provider_factory.database_provider_rw()?, input)?;
info!(target: "reth::cli", "Success");

View File

@ -9,7 +9,7 @@ use reth_db_api::{database::Database, table::TableImporter};
use reth_db_common::DbTool;
use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter};
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_provider::{providers::StaticFileProvider, ProviderFactory};
use reth_provider::{providers::StaticFileProvider, DatabaseProviderFactory, ProviderFactory};
use reth_stages::{stages::AccountHashingStage, Stage, StageCheckpoint, UnwindInput};
use tracing::info;
@ -55,7 +55,7 @@ fn unwind_and_copy<N: NodeTypesWithDB<ChainSpec = ChainSpec>>(
tip_block_number: u64,
output_db: &DatabaseEnv,
) -> eyre::Result<()> {
let provider = db_tool.provider_factory.provider_rw()?;
let provider = db_tool.provider_factory.database_provider_rw()?;
let mut exec_stage = AccountHashingStage::default();
exec_stage.unwind(
@ -81,7 +81,7 @@ fn dry_run<N: NodeTypesWithDB<ChainSpec = ChainSpec>>(
) -> eyre::Result<()> {
info!(target: "reth::cli", "Executing stage.");
let provider = output_provider_factory.provider_rw()?;
let provider = output_provider_factory.database_provider_rw()?;
let mut stage = AccountHashingStage {
clean_threshold: 1, // Forces hashing from scratch
..Default::default()

View File

@ -8,7 +8,7 @@ use reth_db_api::{database::Database, table::TableImporter};
use reth_db_common::DbTool;
use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter};
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_provider::{providers::StaticFileProvider, ProviderFactory};
use reth_provider::{providers::StaticFileProvider, DatabaseProviderFactory, ProviderFactory};
use reth_stages::{stages::StorageHashingStage, Stage, StageCheckpoint, UnwindInput};
use tracing::info;
@ -45,7 +45,7 @@ fn unwind_and_copy<N: NodeTypesWithDB<ChainSpec = ChainSpec>>(
tip_block_number: u64,
output_db: &DatabaseEnv,
) -> eyre::Result<()> {
let provider = db_tool.provider_factory.provider_rw()?;
let provider = db_tool.provider_factory.database_provider_rw()?;
let mut exec_stage = StorageHashingStage::default();
@ -76,7 +76,7 @@ fn dry_run<N: NodeTypesWithDB<ChainSpec = ChainSpec>>(
) -> eyre::Result<()> {
info!(target: "reth::cli", "Executing stage.");
let provider = output_provider_factory.provider_rw()?;
let provider = output_provider_factory.database_provider_rw()?;
let mut stage = StorageHashingStage {
clean_threshold: 1, // Forces hashing from scratch
..Default::default()

View File

@ -12,7 +12,7 @@ use reth_evm::noop::NoopBlockExecutorProvider;
use reth_exex::ExExManagerHandle;
use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter};
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_provider::{providers::StaticFileProvider, ProviderFactory};
use reth_provider::{providers::StaticFileProvider, DatabaseProviderFactory, ProviderFactory};
use reth_prune::PruneModes;
use reth_stages::{
stages::{
@ -73,7 +73,7 @@ fn unwind_and_copy<N: NodeTypesWithDB<ChainSpec = ChainSpec>>(
output_db: &DatabaseEnv,
) -> eyre::Result<()> {
let (from, to) = range;
let provider = db_tool.provider_factory.provider_rw()?;
let provider = db_tool.provider_factory.database_provider_rw()?;
let unwind = UnwindInput {
unwind_to: from,
@ -150,7 +150,7 @@ fn dry_run<N: NodeTypesWithDB<ChainSpec = ChainSpec>>(
from: u64,
) -> eyre::Result<()> {
info!(target: "reth::cli", "Executing stage.");
let provider = output_provider_factory.provider_rw()?;
let provider = output_provider_factory.database_provider_rw()?;
let mut stage = MerkleStage::Execution {
// Forces updating the root instead of calculating from scratch

View File

@ -34,8 +34,8 @@ use reth_node_metrics::{
version::VersionInfo,
};
use reth_provider::{
writer::UnifiedStorageWriter, ChainSpecProvider, StageCheckpointReader, StageCheckpointWriter,
StaticFileProviderFactory,
writer::UnifiedStorageWriter, ChainSpecProvider, DatabaseProviderFactory,
StageCheckpointReader, StageCheckpointWriter, StaticFileProviderFactory,
};
use reth_stages::{
stages::{
@ -117,7 +117,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
let Environment { provider_factory, config, data_dir } =
self.env.init::<N>(AccessRights::RW)?;
let mut provider_rw = provider_factory.provider_rw()?;
let mut provider_rw = provider_factory.database_provider_rw()?;
if let Some(listen_addr) = self.metrics {
info!(target: "reth::cli", "Starting metrics endpoint at {}", listen_addr);
@ -333,7 +333,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
provider_rw,
provider_factory.static_file_provider(),
)?;
provider_rw = provider_factory.provider_rw()?;
provider_rw = provider_factory.database_provider_rw()?;
}
}
}
@ -356,7 +356,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
}
if self.commit {
UnifiedStorageWriter::commit(provider_rw, provider_factory.static_file_provider())?;
provider_rw = provider_factory.provider_rw()?;
provider_rw = provider_factory.database_provider_rw()?;
}
if done {

View File

@ -10,7 +10,6 @@ use reth_network_p2p::{
full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
BlockClient,
};
use reth_node_types::NodeTypesWithDB;
use reth_primitives::SealedBlock;
use reth_provider::providers::ProviderNodeTypes;
use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult};
@ -34,7 +33,7 @@ use tracing::trace;
/// database while the pipeline is still active.
pub(crate) struct EngineSyncController<N, Client>
where
N: NodeTypesWithDB,
N: ProviderNodeTypes,
Client: BlockClient,
{
/// A downloader that can download full blocks from the network.
@ -394,14 +393,14 @@ pub(crate) enum EngineSyncEvent {
/// running, it acquires the write lock over the database. This means that we cannot forward to the
/// blockchain tree any messages that would result in database writes, since it would result in a
/// deadlock.
enum PipelineState<N: NodeTypesWithDB> {
enum PipelineState<N: ProviderNodeTypes> {
/// Pipeline is idle.
Idle(Option<Pipeline<N>>),
/// Pipeline is running and waiting for a response
Running(oneshot::Receiver<PipelineWithResult<N>>),
}
impl<N: NodeTypesWithDB> PipelineState<N> {
impl<N: ProviderNodeTypes> PipelineState<N> {
/// Returns `true` if the state matches idle.
const fn is_idle(&self) -> bool {
matches!(self, Self::Idle(_))

View File

@ -31,7 +31,6 @@ reth-revm.workspace = true
reth-rpc-types.workspace = true
reth-stages-api.workspace = true
reth-tasks.workspace = true
reth-node-types.workspace = true
reth-trie.workspace = true
reth-trie-parallel.workspace = true

View File

@ -8,7 +8,6 @@
//! These modes are mutually exclusive and the node can only be in one mode at a time.
use futures::FutureExt;
use reth_node_types::NodeTypesWithDB;
use reth_provider::providers::ProviderNodeTypes;
use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult};
use reth_tasks::TaskSpawner;
@ -79,7 +78,7 @@ pub enum BackfillEvent {
/// Pipeline sync.
#[derive(Debug)]
pub struct PipelineSync<N: NodeTypesWithDB> {
pub struct PipelineSync<N: ProviderNodeTypes> {
/// The type that can spawn the pipeline task.
pipeline_task_spawner: Box<dyn TaskSpawner>,
/// The current state of the pipeline.
@ -213,14 +212,14 @@ impl<N: ProviderNodeTypes> BackfillSync for PipelineSync<N> {
/// blockchain tree any messages that would result in database writes, since it would result in a
/// deadlock.
#[derive(Debug)]
enum PipelineState<N: NodeTypesWithDB> {
enum PipelineState<N: ProviderNodeTypes> {
/// Pipeline is idle.
Idle(Option<Pipeline<N>>),
/// Pipeline is running and waiting for a response
Running(oneshot::Receiver<PipelineWithResult<N>>),
}
impl<N: NodeTypesWithDB> PipelineState<N> {
impl<N: ProviderNodeTypes> PipelineState<N> {
/// Returns `true` if the state matches idle.
const fn is_idle(&self) -> bool {
matches!(self, Self::Idle(_))

View File

@ -3,8 +3,8 @@ use reth_chain_state::ExecutedBlock;
use reth_errors::ProviderError;
use reth_primitives::BlockNumHash;
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader, ProviderFactory,
StaticFileProviderFactory,
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader,
DatabaseProviderFactory, ProviderFactory, StaticFileProviderFactory,
};
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
@ -103,7 +103,7 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
) -> Result<Option<BlockNumHash>, PersistenceError> {
debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
let start_time = Instant::now();
let provider_rw = self.provider.provider_rw()?;
let provider_rw = self.provider.database_provider_rw()?;
let sf_provider = self.provider.static_file_provider();
let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
@ -126,7 +126,7 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
.map(|block| BlockNumHash { hash: block.block().hash(), number: block.block().number });
if last_block_hash_num.is_some() {
let provider_rw = self.provider.provider_rw()?;
let provider_rw = self.provider.database_provider_rw()?;
let static_file_provider = self.provider.static_file_provider();
UnifiedStorageWriter::from(&provider_rw, &static_file_provider).save_blocks(&blocks)?;

View File

@ -12,7 +12,7 @@ use reth_primitives::{
};
use reth_provider::{
providers::ProviderNodeTypes, BlockWriter as _, ExecutionOutcome, LatestStateProviderRef,
ProviderFactory,
ProviderFactory, StaticFileProviderFactory,
};
use reth_revm::database::StateProviderDatabase;
use reth_testing_utils::generators::sign_tx_with_key_pair;
@ -63,7 +63,7 @@ where
let mut block_execution_output = EthExecutorProvider::ethereum(chain_spec)
.executor(StateProviderDatabase::new(LatestStateProviderRef::new(
provider.tx_ref(),
provider.static_file_provider().clone(),
provider.static_file_provider(),
)))
.execute(BlockExecutionInput { block, total_difficulty: U256::ZERO })?;
block_execution_output.state.reverts.sort();
@ -187,7 +187,7 @@ where
let executor =
EthExecutorProvider::ethereum(chain_spec).batch_executor(StateProviderDatabase::new(
LatestStateProviderRef::new(provider.tx_ref(), provider.static_file_provider().clone()),
LatestStateProviderRef::new(provider.tx_ref(), provider.static_file_provider()),
));
let mut execution_outcome = executor.execute_and_verify_batch(vec![

View File

@ -48,7 +48,7 @@ use reth_prune::{PruneModes, PrunerBuilder};
use reth_rpc_api::clients::EthApiClient;
use reth_rpc_builder::config::RethRpcServerConfig;
use reth_rpc_layer::JwtSecret;
use reth_stages::{sets::DefaultStages, MetricEvent, Pipeline, PipelineTarget, StageId};
use reth_stages::{sets::DefaultStages, MetricEvent, PipelineBuilder, PipelineTarget, StageId};
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{debug, error, info, warn};
@ -430,7 +430,7 @@ where
let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
// Builds an unwind-only pipeline
let pipeline = Pipeline::<N>::builder()
let pipeline = PipelineBuilder::default()
.add_stages(DefaultStages::new(
factory.clone(),
tip_rx,

View File

@ -14,7 +14,6 @@ workspace = true
# reth
reth-primitives-traits.workspace = true
reth-provider.workspace = true
reth-db-api.workspace = true
reth-static-file.workspace = true
reth-network-p2p.workspace = true
reth-tokio-util.workspace = true
@ -23,7 +22,6 @@ reth-prune.workspace = true
reth-errors.workspace = true
reth-stages-types.workspace = true
reth-static-file-types.workspace = true
reth-node-types.workspace = true
alloy-primitives.workspace = true

View File

@ -1,19 +1,14 @@
use crate::{pipeline::BoxedStage, MetricEventsSender, Pipeline, Stage, StageId, StageSet};
use alloy_primitives::{BlockNumber, B256};
use reth_db_api::database::Database;
use reth_node_types::NodeTypesWithDB;
use reth_provider::ProviderFactory;
use reth_provider::{providers::ProviderNodeTypes, DatabaseProviderFactory, ProviderFactory};
use reth_static_file::StaticFileProducer;
use tokio::sync::watch;
/// Builds a [`Pipeline`].
#[must_use = "call `build` to construct the pipeline"]
pub struct PipelineBuilder<DB>
where
DB: Database,
{
pub struct PipelineBuilder<Provider> {
/// All configured stages in the order they will be executed.
stages: Vec<BoxedStage<DB>>,
stages: Vec<BoxedStage<Provider>>,
/// The maximum block number to sync to.
max_block: Option<BlockNumber>,
/// A receiver for the current chain tip to sync to.
@ -21,14 +16,11 @@ where
metrics_tx: Option<MetricEventsSender>,
}
impl<DB> PipelineBuilder<DB>
where
DB: Database,
{
impl<Provider> PipelineBuilder<Provider> {
/// Add a stage to the pipeline.
pub fn add_stage<S>(mut self, stage: S) -> Self
where
S: Stage<DB> + 'static,
S: Stage<Provider> + 'static,
{
self.stages.push(Box::new(stage));
self
@ -41,7 +33,7 @@ where
/// To customize the stages in the set (reorder, disable, insert a stage) call
/// [`builder`][StageSet::builder] on the set which will convert it to a
/// [`StageSetBuilder`][crate::StageSetBuilder].
pub fn add_stages<Set: StageSet<DB>>(mut self, set: Set) -> Self {
pub fn add_stages<Set: StageSet<Provider>>(mut self, set: Set) -> Self {
for stage in set.builder().build() {
self.stages.push(stage);
}
@ -69,11 +61,15 @@ where
}
/// Builds the final [`Pipeline`] using the given database.
pub fn build<N: NodeTypesWithDB<DB = DB>>(
pub fn build<N>(
self,
provider_factory: ProviderFactory<N>,
static_file_producer: StaticFileProducer<ProviderFactory<N>>,
) -> Pipeline<N> {
) -> Pipeline<N>
where
N: ProviderNodeTypes,
ProviderFactory<N>: DatabaseProviderFactory<ProviderRW = Provider>,
{
let Self { stages, max_block, tip_tx, metrics_tx } = self;
Pipeline {
provider_factory,
@ -88,13 +84,13 @@ where
}
}
impl<DB: Database> Default for PipelineBuilder<DB> {
impl<Provider> Default for PipelineBuilder<Provider> {
fn default() -> Self {
Self { stages: Vec::new(), max_block: None, tip_tx: None, metrics_tx: None }
}
}
impl<DB: Database> std::fmt::Debug for PipelineBuilder<DB> {
impl<Provider> std::fmt::Debug for PipelineBuilder<Provider> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PipelineBuilder")
.field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())

View File

@ -5,12 +5,11 @@ use crate::{PipelineTarget, StageCheckpoint, StageId};
use alloy_primitives::{BlockNumber, B256};
pub use event::*;
use futures_util::Future;
use reth_node_types::NodeTypesWithDB;
use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, FinalizedBlockReader,
FinalizedBlockWriter, ProviderFactory, StageCheckpointReader, StageCheckpointWriter,
StaticFileProviderFactory,
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, DatabaseProviderFactory,
FinalizedBlockReader, FinalizedBlockWriter, ProviderFactory, StageCheckpointReader,
StageCheckpointWriter, StaticFileProviderFactory,
};
use reth_prune::PrunerBuilder;
use reth_static_file::StaticFileProducer;
@ -64,11 +63,11 @@ pub type PipelineWithResult<N> = (Pipeline<N>, Result<ControlFlow, PipelineError
/// # Defaults
///
/// The [`DefaultStages`](crate::sets::DefaultStages) are used to fully sync reth.
pub struct Pipeline<N: NodeTypesWithDB> {
pub struct Pipeline<N: ProviderNodeTypes> {
/// Provider factory.
provider_factory: ProviderFactory<N>,
/// All configured stages in the order they will be executed.
stages: Vec<BoxedStage<N::DB>>,
stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
/// The maximum block number to sync to.
max_block: Option<BlockNumber>,
static_file_producer: StaticFileProducer<ProviderFactory<N>>,
@ -83,7 +82,8 @@ pub struct Pipeline<N: NodeTypesWithDB> {
impl<N: ProviderNodeTypes> Pipeline<N> {
/// Construct a pipeline using a [`PipelineBuilder`].
pub fn builder() -> PipelineBuilder<N::DB> {
pub fn builder() -> PipelineBuilder<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>
{
PipelineBuilder::default()
}
@ -276,7 +276,7 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
// Unwind stages in reverse order of execution
let unwind_pipeline = self.stages.iter_mut().rev();
let mut provider_rw = self.provider_factory.provider_rw()?;
let mut provider_rw = self.provider_factory.database_provider_rw()?;
for stage in unwind_pipeline {
let stage_id = stage.id();
@ -354,7 +354,7 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
stage.post_unwind_commit()?;
provider_rw = self.provider_factory.provider_rw()?;
provider_rw = self.provider_factory.database_provider_rw()?;
}
Err(err) => {
self.event_sender.notify(PipelineEvent::Error { stage_id });
@ -423,7 +423,7 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
};
}
let provider_rw = self.provider_factory.provider_rw()?;
let provider_rw = self.provider_factory.database_provider_rw()?;
self.event_sender.notify(PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress {
@ -513,7 +513,7 @@ fn on_stage_error<N: ProviderNodeTypes>(
// FIXME: When handling errors, we do not commit the database transaction. This
// leads to the Merkle stage not clearing its checkpoint, and restarting from an
// invalid place.
let provider_rw = factory.provider_rw()?;
let provider_rw = factory.database_provider_rw()?;
provider_rw.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
provider_rw.save_stage_checkpoint(
StageId::MerkleExecute,
@ -574,7 +574,7 @@ fn on_stage_error<N: ProviderNodeTypes>(
}
}
impl<N: NodeTypesWithDB> std::fmt::Debug for Pipeline<N> {
impl<N: ProviderNodeTypes> std::fmt::Debug for Pipeline<N> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Pipeline")
.field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())

View File

@ -1,5 +1,4 @@
use crate::{Stage, StageId};
use reth_db_api::database::Database;
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
@ -11,26 +10,26 @@ use std::{
/// individual stage sets to determine what kind of configuration they expose.
///
/// Individual stages in the set can be added, removed and overridden using [`StageSetBuilder`].
pub trait StageSet<DB: Database>: Sized {
pub trait StageSet<Provider>: Sized {
/// Configures the stages in the set.
fn builder(self) -> StageSetBuilder<DB>;
fn builder(self) -> StageSetBuilder<Provider>;
/// Overrides the given [`Stage`], if it is in this set.
///
/// # Panics
///
/// Panics if the [`Stage`] is not in this set.
fn set<S: Stage<DB> + 'static>(self, stage: S) -> StageSetBuilder<DB> {
fn set<S: Stage<Provider> + 'static>(self, stage: S) -> StageSetBuilder<Provider> {
self.builder().set(stage)
}
}
struct StageEntry<DB> {
stage: Box<dyn Stage<DB>>,
struct StageEntry<Provider> {
stage: Box<dyn Stage<Provider>>,
enabled: bool,
}
impl<DB: Database> Debug for StageEntry<DB> {
impl<Provider> Debug for StageEntry<Provider> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StageEntry")
.field("stage", &self.stage.id())
@ -45,18 +44,18 @@ impl<DB: Database> Debug for StageEntry<DB> {
/// to the final sync pipeline before/after their dependencies.
///
/// Stages inside the set can be disabled, enabled, overridden and reordered.
pub struct StageSetBuilder<DB> {
stages: HashMap<StageId, StageEntry<DB>>,
pub struct StageSetBuilder<Provider> {
stages: HashMap<StageId, StageEntry<Provider>>,
order: Vec<StageId>,
}
impl<DB: Database> Default for StageSetBuilder<DB> {
impl<Provider> Default for StageSetBuilder<Provider> {
fn default() -> Self {
Self { stages: HashMap::new(), order: Vec::new() }
}
}
impl<DB: Database> Debug for StageSetBuilder<DB> {
impl<Provider> Debug for StageSetBuilder<Provider> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StageSetBuilder")
.field("stages", &self.stages)
@ -65,17 +64,14 @@ impl<DB: Database> Debug for StageSetBuilder<DB> {
}
}
impl<DB> StageSetBuilder<DB>
where
DB: Database,
{
impl<Provider> StageSetBuilder<Provider> {
fn index_of(&self, stage_id: StageId) -> usize {
let index = self.order.iter().position(|&id| id == stage_id);
index.unwrap_or_else(|| panic!("Stage does not exist in set: {stage_id}"))
}
fn upsert_stage_state(&mut self, stage: Box<dyn Stage<DB>>, added_at_index: usize) {
fn upsert_stage_state(&mut self, stage: Box<dyn Stage<Provider>>, added_at_index: usize) {
let stage_id = stage.id();
if self.stages.insert(stage.id(), StageEntry { stage, enabled: true }).is_some() {
if let Some(to_remove) = self
@ -95,7 +91,7 @@ where
/// # Panics
///
/// Panics if the [`Stage`] is not in this set.
pub fn set<S: Stage<DB> + 'static>(mut self, stage: S) -> Self {
pub fn set<S: Stage<Provider> + 'static>(mut self, stage: S) -> Self {
let entry = self
.stages
.get_mut(&stage.id())
@ -107,7 +103,7 @@ where
/// Adds the given [`Stage`] at the end of this set.
///
/// If the stage was already in the group, it is removed from its previous place.
pub fn add_stage<S: Stage<DB> + 'static>(mut self, stage: S) -> Self {
pub fn add_stage<S: Stage<Provider> + 'static>(mut self, stage: S) -> Self {
let target_index = self.order.len();
self.order.push(stage.id());
self.upsert_stage_state(Box::new(stage), target_index);
@ -117,7 +113,7 @@ where
/// Adds the given [`Stage`] at the end of this set if it's [`Some`].
///
/// If the stage was already in the group, it is removed from its previous place.
pub fn add_stage_opt<S: Stage<DB> + 'static>(self, stage: Option<S>) -> Self {
pub fn add_stage_opt<S: Stage<Provider> + 'static>(self, stage: Option<S>) -> Self {
if let Some(stage) = stage {
self.add_stage(stage)
} else {
@ -129,7 +125,7 @@ where
///
/// If a stage is in both sets, it is removed from its previous place in this set. Because of
/// this, it is advisable to merge sets first and re-order stages after if needed.
pub fn add_set<Set: StageSet<DB>>(mut self, set: Set) -> Self {
pub fn add_set<Set: StageSet<Provider>>(mut self, set: Set) -> Self {
for stage in set.builder().build() {
let target_index = self.order.len();
self.order.push(stage.id());
@ -145,7 +141,7 @@ where
/// # Panics
///
/// Panics if the dependency stage is not in this set.
pub fn add_before<S: Stage<DB> + 'static>(mut self, stage: S, before: StageId) -> Self {
pub fn add_before<S: Stage<Provider> + 'static>(mut self, stage: S, before: StageId) -> Self {
let target_index = self.index_of(before);
self.order.insert(target_index, stage.id());
self.upsert_stage_state(Box::new(stage), target_index);
@ -159,7 +155,7 @@ where
/// # Panics
///
/// Panics if the dependency stage is not in this set.
pub fn add_after<S: Stage<DB> + 'static>(mut self, stage: S, after: StageId) -> Self {
pub fn add_after<S: Stage<Provider> + 'static>(mut self, stage: S, after: StageId) -> Self {
let target_index = self.index_of(after) + 1;
self.order.insert(target_index, stage.id());
self.upsert_stage_state(Box::new(stage), target_index);
@ -240,7 +236,7 @@ where
}
/// Consumes the builder and returns the contained [`Stage`]s in the order specified.
pub fn build(mut self) -> Vec<Box<dyn Stage<DB>>> {
pub fn build(mut self) -> Vec<Box<dyn Stage<Provider>>> {
let mut stages = Vec::new();
for id in &self.order {
if let Some(entry) = self.stages.remove(id) {
@ -253,7 +249,7 @@ where
}
}
impl<DB: Database> StageSet<DB> for StageSetBuilder<DB> {
impl<Provider> StageSet<Provider> for StageSetBuilder<Provider> {
fn builder(self) -> Self {
self
}

View File

@ -1,7 +1,6 @@
use crate::{error::StageError, StageCheckpoint, StageId};
use alloy_primitives::{BlockNumber, TxNumber};
use reth_db_api::database::Database;
use reth_provider::{BlockReader, DatabaseProviderRW, ProviderError, TransactionsProvider};
use reth_provider::{BlockReader, ProviderError};
use std::{
cmp::{max, min},
future::{poll_fn, Future},
@ -71,11 +70,14 @@ impl ExecInput {
/// Return the next block range determined the number of transactions within it.
/// This function walks the block indices until either the end of the range is reached or
/// the number of transactions exceeds the threshold.
pub fn next_block_range_with_transaction_threshold<DB: Database>(
pub fn next_block_range_with_transaction_threshold<Provider>(
&self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
tx_threshold: u64,
) -> Result<(Range<TxNumber>, RangeInclusive<BlockNumber>, bool), StageError> {
) -> Result<(Range<TxNumber>, RangeInclusive<BlockNumber>, bool), StageError>
where
Provider: BlockReader,
{
let start_block = self.next_block();
let target_block = self.target();
@ -186,9 +188,9 @@ pub struct UnwindOutput {
///
/// Stages are executed as part of a pipeline where they are executed serially.
///
/// Stages receive [`DatabaseProviderRW`].
/// Stages receive [`DBProvider`](reth_provider::DBProvider).
#[auto_impl::auto_impl(Box)]
pub trait Stage<DB: Database>: Send + Sync {
pub trait Stage<Provider>: Send + Sync {
/// Get the ID of the stage.
///
/// Stage IDs must be unique.
@ -229,11 +231,7 @@ pub trait Stage<DB: Database>: Send + Sync {
/// Execute the stage.
/// It is expected that the stage will write all necessary data to the database
/// upon invoking this method.
fn execute(
&mut self,
provider: &DatabaseProviderRW<DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError>;
fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError>;
/// Post execution commit hook.
///
@ -247,7 +245,7 @@ pub trait Stage<DB: Database>: Send + Sync {
/// Unwind the stage.
fn unwind(
&mut self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError>;
@ -262,7 +260,7 @@ pub trait Stage<DB: Database>: Send + Sync {
}
/// [Stage] trait extension.
pub trait StageExt<DB: Database>: Stage<DB> {
pub trait StageExt<Provider>: Stage<Provider> {
/// Utility extension for the `Stage` trait that invokes `Stage::poll_execute_ready`
/// with [`poll_fn`] context. For more information see [`Stage::poll_execute_ready`].
fn execute_ready(
@ -273,4 +271,4 @@ pub trait StageExt<DB: Database>: Stage<DB> {
}
}
impl<DB: Database, S: Stage<DB>> StageExt<DB> for S {}
impl<Provider, S: Stage<Provider>> StageExt<Provider> for S {}

View File

@ -1,8 +1,6 @@
#![allow(missing_docs)]
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use reth_db_api::database::Database;
use reth_provider::DatabaseProviderRW;
use std::collections::VecDeque;
/// A test stage that can be used for testing.
@ -44,26 +42,18 @@ impl TestStage {
}
}
impl<DB: Database> Stage<DB> for TestStage {
impl<Provider> Stage<Provider> for TestStage {
fn id(&self) -> StageId {
self.id
}
fn execute(
&mut self,
_: &DatabaseProviderRW<DB>,
_input: ExecInput,
) -> Result<ExecOutput, StageError> {
fn execute(&mut self, _: &Provider, _input: ExecInput) -> Result<ExecOutput, StageError> {
self.exec_outputs
.pop_front()
.unwrap_or_else(|| panic!("Test stage {} executed too many times.", self.id))
}
fn unwind(
&mut self,
_: &DatabaseProviderRW<DB>,
_input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
fn unwind(&mut self, _: &Provider, _input: UnwindInput) -> Result<UnwindOutput, StageError> {
self.unwind_outputs
.pop_front()
.unwrap_or_else(|| panic!("Test stage {} unwound too many times.", self.id))

View File

@ -3,16 +3,17 @@ use criterion::{criterion_main, measurement::WallTime, BenchmarkGroup, Criterion
#[cfg(not(target_os = "windows"))]
use pprof::criterion::{Output, PProfProfiler};
use reth_config::config::{EtlConfig, TransactionLookupConfig};
use reth_db::{test_utils::TempDatabase, DatabaseEnv};
use reth_db::{test_utils::TempDatabase, Database, DatabaseEnv};
use reth_primitives::BlockNumber;
use reth_provider::{DatabaseProvider, DatabaseProviderFactory};
use reth_stages::{
stages::{MerkleStage, SenderRecoveryStage, TransactionLookupStage},
test_utils::TestStageDB,
StageCheckpoint,
};
use reth_stages_api::{ExecInput, Stage, StageExt, UnwindInput};
use std::{ops::RangeInclusive, sync::Arc};
use std::ops::RangeInclusive;
use tokio::runtime::Runtime;
mod setup;
@ -146,7 +147,7 @@ fn measure_stage<F, S>(
block_interval: RangeInclusive<BlockNumber>,
label: String,
) where
S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>,
S: Clone + Stage<DatabaseProvider<<TempDatabase<DatabaseEnv> as Database>::TXMut>>,
F: Fn(S, &TestStageDB, StageRange),
{
let stage_range = (
@ -170,7 +171,7 @@ fn measure_stage<F, S>(
},
|_| async {
let mut stage = stage.clone();
let provider = db.factory.provider_rw().unwrap();
let provider = db.factory.database_provider_rw().unwrap();
stage
.execute_ready(input)
.await

View File

@ -1,12 +1,12 @@
#![allow(unreachable_pub)]
use itertools::concat;
use reth_db::{tables, test_utils::TempDatabase, DatabaseEnv};
use reth_db::{tables, test_utils::TempDatabase, Database, DatabaseEnv};
use reth_db_api::{
cursor::DbCursorRO,
transaction::{DbTx, DbTxMut},
};
use reth_primitives::{Account, Address, SealedBlock, B256, U256};
use reth_provider::TrieWriter;
use reth_provider::{DatabaseProvider, DatabaseProviderFactory, TrieWriter};
use reth_stages::{
stages::{AccountHashingStage, StorageHashingStage},
test_utils::{StorageKind, TestStageDB},
@ -16,7 +16,7 @@ use reth_testing_utils::generators::{
random_eoa_accounts, BlockRangeParams,
};
use reth_trie::StateRoot;
use std::{collections::BTreeMap, fs, path::Path, sync::Arc};
use std::{collections::BTreeMap, fs, path::Path};
use tokio::runtime::Handle;
mod constants;
@ -28,7 +28,9 @@ use reth_trie_db::DatabaseStateRoot;
pub(crate) type StageRange = (ExecInput, UnwindInput);
pub(crate) fn stage_unwind<S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>>(
pub(crate) fn stage_unwind<
S: Clone + Stage<DatabaseProvider<<TempDatabase<DatabaseEnv> as Database>::TXMut>>,
>(
stage: S,
db: &TestStageDB,
range: StageRange,
@ -57,7 +59,9 @@ pub(crate) fn stage_unwind<S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>>(
});
}
pub(crate) fn unwind_hashes<S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>>(
pub(crate) fn unwind_hashes<
S: Clone + Stage<DatabaseProvider<<TempDatabase<DatabaseEnv> as Database>::TXMut>>,
>(
stage: S,
db: &TestStageDB,
range: StageRange,
@ -65,7 +69,7 @@ pub(crate) fn unwind_hashes<S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>>(
let (input, unwind) = range;
let mut stage = stage;
let provider = db.factory.provider_rw().unwrap();
let provider = db.factory.database_provider_rw().unwrap();
StorageHashingStage::default().unwind(&provider, unwind).unwrap();
AccountHashingStage::default().unwind(&provider, unwind).unwrap();

View File

@ -43,12 +43,12 @@ use crate::{
};
use reth_config::config::StageConfig;
use reth_consensus::Consensus;
use reth_db_api::database::Database;
use reth_evm::execute::BlockExecutorProvider;
use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader};
use reth_primitives::B256;
use reth_provider::HeaderSyncGapProvider;
use reth_prune_types::PruneModes;
use reth_stages_api::Stage;
use std::{ops::Not, sync::Arc};
use tokio::sync::watch;
@ -119,17 +119,20 @@ impl<Provider, H, B, E> DefaultStages<Provider, H, B, E> {
}
}
impl<Provider, H, B, E> DefaultStages<Provider, H, B, E>
impl<P, H, B, E> DefaultStages<P, H, B, E>
where
E: BlockExecutorProvider,
{
/// Appends the default offline stages and default finish stage to the given builder.
pub fn add_offline_stages<DB: Database + 'static>(
default_offline: StageSetBuilder<DB>,
pub fn add_offline_stages<Provider>(
default_offline: StageSetBuilder<Provider>,
executor_factory: E,
stages_config: StageConfig,
prune_modes: PruneModes,
) -> StageSetBuilder<DB> {
) -> StageSetBuilder<Provider>
where
OfflineStages<E>: StageSet<Provider>,
{
StageSetBuilder::default()
.add_set(default_offline)
.add_set(OfflineStages::new(executor_factory, stages_config, prune_modes))
@ -137,15 +140,16 @@ where
}
}
impl<Provider, H, B, E, DB> StageSet<DB> for DefaultStages<Provider, H, B, E>
impl<P, H, B, E, Provider> StageSet<Provider> for DefaultStages<P, H, B, E>
where
Provider: HeaderSyncGapProvider + 'static,
P: HeaderSyncGapProvider + 'static,
H: HeaderDownloader + 'static,
B: BodyDownloader + 'static,
E: BlockExecutorProvider,
DB: Database + 'static,
OnlineStages<P, H, B>: StageSet<Provider>,
OfflineStages<E>: StageSet<Provider>,
{
fn builder(self) -> StageSetBuilder<DB> {
fn builder(self) -> StageSetBuilder<Provider> {
Self::add_offline_stages(
self.online.builder(),
self.executor_factory,
@ -189,29 +193,37 @@ impl<Provider, H, B> OnlineStages<Provider, H, B> {
}
}
impl<Provider, H, B> OnlineStages<Provider, H, B>
impl<P, H, B> OnlineStages<P, H, B>
where
Provider: HeaderSyncGapProvider + 'static,
P: HeaderSyncGapProvider + 'static,
H: HeaderDownloader + 'static,
B: BodyDownloader + 'static,
{
/// Create a new builder using the given headers stage.
pub fn builder_with_headers<DB: Database>(
headers: HeaderStage<Provider, H>,
pub fn builder_with_headers<Provider>(
headers: HeaderStage<P, H>,
body_downloader: B,
) -> StageSetBuilder<DB> {
) -> StageSetBuilder<Provider>
where
HeaderStage<P, H>: Stage<Provider>,
BodyStage<B>: Stage<Provider>,
{
StageSetBuilder::default().add_stage(headers).add_stage(BodyStage::new(body_downloader))
}
/// Create a new builder using the given bodies stage.
pub fn builder_with_bodies<DB: Database>(
pub fn builder_with_bodies<Provider>(
bodies: BodyStage<B>,
provider: Provider,
provider: P,
tip: watch::Receiver<B256>,
header_downloader: H,
consensus: Arc<dyn Consensus>,
stages_config: StageConfig,
) -> StageSetBuilder<DB> {
) -> StageSetBuilder<Provider>
where
BodyStage<B>: Stage<Provider>,
HeaderStage<P, H>: Stage<Provider>,
{
StageSetBuilder::default()
.add_stage(HeaderStage::new(
provider,
@ -224,14 +236,15 @@ where
}
}
impl<DB, Provider, H, B> StageSet<DB> for OnlineStages<Provider, H, B>
impl<Provider, P, H, B> StageSet<Provider> for OnlineStages<P, H, B>
where
DB: Database,
Provider: HeaderSyncGapProvider + 'static,
P: HeaderSyncGapProvider + 'static,
H: HeaderDownloader + 'static,
B: BodyDownloader + 'static,
HeaderStage<P, H>: Stage<Provider>,
BodyStage<B>: Stage<Provider>,
{
fn builder(self) -> StageSetBuilder<DB> {
fn builder(self) -> StageSetBuilder<Provider> {
StageSetBuilder::default()
.add_stage(HeaderStage::new(
self.provider,
@ -275,12 +288,16 @@ impl<EF> OfflineStages<EF> {
}
}
impl<E, DB> StageSet<DB> for OfflineStages<E>
impl<E, Provider> StageSet<Provider> for OfflineStages<E>
where
E: BlockExecutorProvider,
DB: Database + 'static,
ExecutionStages<E>: StageSet<Provider>,
PruneSenderRecoveryStage: Stage<Provider>,
HashingStages: StageSet<Provider>,
HistoryIndexingStages: StageSet<Provider>,
PruneStage: Stage<Provider>,
{
fn builder(self) -> StageSetBuilder<DB> {
fn builder(self) -> StageSetBuilder<Provider> {
ExecutionStages::new(
self.executor_factory,
self.stages_config.clone(),
@ -328,12 +345,13 @@ impl<E> ExecutionStages<E> {
}
}
impl<E, DB> StageSet<DB> for ExecutionStages<E>
impl<E, Provider> StageSet<Provider> for ExecutionStages<E>
where
DB: Database,
E: BlockExecutorProvider,
SenderRecoveryStage: Stage<Provider>,
ExecutionStage<E>: Stage<Provider>,
{
fn builder(self) -> StageSetBuilder<DB> {
fn builder(self) -> StageSetBuilder<Provider> {
StageSetBuilder::default()
.add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery))
.add_stage(ExecutionStage::from_config(
@ -353,8 +371,13 @@ pub struct HashingStages {
stages_config: StageConfig,
}
impl<DB: Database> StageSet<DB> for HashingStages {
fn builder(self) -> StageSetBuilder<DB> {
impl<Provider> StageSet<Provider> for HashingStages
where
MerkleStage: Stage<Provider>,
AccountHashingStage: Stage<Provider>,
StorageHashingStage: Stage<Provider>,
{
fn builder(self) -> StageSetBuilder<Provider> {
StageSetBuilder::default()
.add_stage(MerkleStage::default_unwind())
.add_stage(AccountHashingStage::new(
@ -379,8 +402,13 @@ pub struct HistoryIndexingStages {
prune_modes: PruneModes,
}
impl<DB: Database> StageSet<DB> for HistoryIndexingStages {
fn builder(self) -> StageSetBuilder<DB> {
impl<Provider> StageSet<Provider> for HistoryIndexingStages
where
TransactionLookupStage: Stage<Provider>,
IndexStorageHistoryStage: Stage<Provider>,
IndexAccountHistoryStage: Stage<Provider>,
{
fn builder(self) -> StageSetBuilder<Provider> {
StageSetBuilder::default()
.add_stage(TransactionLookupStage::new(
self.stages_config.transaction_lookup,

View File

@ -9,7 +9,6 @@ use tracing::*;
use reth_db::tables;
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
models::{StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals},
transaction::DbTxMut,
};
@ -17,7 +16,7 @@ use reth_network_p2p::bodies::{downloader::BodyDownloader, response::BlockRespon
use reth_primitives::{StaticFileSegment, TxNumber};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError, StatsReader,
BlockReader, DBProvider, ProviderError, StaticFileProviderFactory, StatsReader,
};
use reth_stages_api::{
EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
@ -70,7 +69,10 @@ impl<D: BodyDownloader> BodyStage<D> {
}
}
impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
impl<Provider, D: BodyDownloader> Stage<Provider> for BodyStage<D>
where
Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory + StatsReader + BlockReader,
{
/// Return the id of the stage
fn id(&self) -> StageId {
StageId::Bodies
@ -106,11 +108,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
/// Download block bodies from the last checkpoint for this stage up until the latest synced
/// header, limited by the stage's batch size.
fn execute(
&mut self,
provider: &DatabaseProviderRW<DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
@ -155,7 +153,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
Ordering::Less => {
return Err(missing_static_data_error(
next_static_file_tx_num.saturating_sub(1),
static_file_provider,
&static_file_provider,
provider,
)?)
}
@ -264,7 +262,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
/// Unwind the stage.
fn unwind(
&mut self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
self.buffer.take();
@ -327,7 +325,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
if db_tx_num > static_file_tx_num {
return Err(missing_static_data_error(
static_file_tx_num,
static_file_provider,
&static_file_provider,
provider,
)?)
}
@ -343,11 +341,14 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
}
}
fn missing_static_data_error<DB: Database>(
fn missing_static_data_error<Provider>(
last_tx_num: TxNumber,
static_file_provider: &StaticFileProvider,
provider: &DatabaseProviderRW<DB>,
) -> Result<StageError, ProviderError> {
provider: &Provider,
) -> Result<StageError, ProviderError>
where
Provider: BlockReader,
{
let mut last_block = static_file_provider
.get_highest_static_file_block(StaticFileSegment::Transactions)
.unwrap_or_default();
@ -377,9 +378,10 @@ fn missing_static_data_error<DB: Database>(
// TODO(alexey): ideally, we want to measure Bodies stage progress in bytes, but it's hard to know
// beforehand how many bytes we need to download. So the good solution would be to measure the
// progress in gas as a proxy to size. Execution stage uses a similar approach.
fn stage_checkpoint<DB: Database>(
provider: &DatabaseProviderRW<DB>,
) -> ProviderResult<EntitiesCheckpoint> {
fn stage_checkpoint<Provider>(provider: &Provider) -> ProviderResult<EntitiesCheckpoint>
where
Provider: StatsReader + StaticFileProviderFactory,
{
Ok(EntitiesCheckpoint {
processed: provider.count_entries::<tables::BlockBodyIndices>()? as u64,
// Count only static files entries. If we count the database entries too, we may have

View File

@ -2,7 +2,7 @@ use crate::stages::MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD;
use num_traits::Zero;
use reth_config::config::ExecutionConfig;
use reth_db::{static_file::HeaderMask, tables};
use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx};
use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_evm::{
execute::{BatchExecutor, BlockExecutorProvider},
metrics::ExecutorMetrics,
@ -14,8 +14,9 @@ use reth_primitives_traits::format_gas_throughput;
use reth_provider::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter},
writer::UnifiedStorageWriter,
BlockReader, DatabaseProviderRW, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown,
ProviderError, StateWriter, StatsReader, TransactionVariant,
BlockReader, DBProvider, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown,
ProviderError, StateChangeWriter, StateWriter, StaticFileProviderFactory, StatsReader,
TransactionVariant,
};
use reth_prune_types::PruneModes;
use reth_revm::database::StateProviderDatabase;
@ -149,9 +150,9 @@ impl<E> ExecutionStage<E> {
/// Given that `start_block` changes with each checkpoint, it's necessary to inspect
/// [`tables::AccountsTrie`] to ensure that [`super::MerkleStage`] hasn't
/// been previously executed.
fn adjust_prune_modes<DB: Database>(
fn adjust_prune_modes(
&self,
provider: &DatabaseProviderRW<DB>,
provider: impl StatsReader,
start_block: u64,
max_block: u64,
) -> Result<PruneModes, StageError> {
@ -169,10 +170,12 @@ impl<E> ExecutionStage<E> {
}
}
impl<E, DB> Stage<DB> for ExecutionStage<E>
impl<E, Provider> Stage<Provider> for ExecutionStage<E>
where
DB: Database,
E: BlockExecutorProvider,
Provider:
DBProvider + BlockReader + StaticFileProviderFactory + StatsReader + StateChangeWriter,
for<'a> UnifiedStorageWriter<'a, Provider, StaticFileProviderRWRefMut<'a>>: StateWriter,
{
/// Return the id of the stage
fn id(&self) -> StageId {
@ -190,11 +193,7 @@ where
}
/// Execute the stage
fn execute(
&mut self,
provider: &DatabaseProviderRW<DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
@ -209,7 +208,8 @@ where
self.prune_modes.receipts_log_filter.is_empty()
{
debug!(target: "sync::stages::execution", start = start_block, "Preparing static file producer");
let mut producer = prepare_static_file_producer(provider, start_block)?;
let mut producer =
prepare_static_file_producer(provider, &static_file_provider, start_block)?;
// Since there might be a database <-> static file inconsistency (read
// `prepare_static_file_producer` for context), we commit the change straight away.
producer.commit()?;
@ -220,7 +220,7 @@ where
let db = StateProviderDatabase(LatestStateProviderRef::new(
provider.tx_ref(),
provider.static_file_provider().clone(),
provider.static_file_provider(),
));
let mut executor = self.executor_provider.batch_executor(db);
executor.set_tip(max_block);
@ -228,8 +228,12 @@ where
// Progress tracking
let mut stage_progress = start_block;
let mut stage_checkpoint =
execution_checkpoint(static_file_provider, start_block, max_block, input.checkpoint())?;
let mut stage_checkpoint = execution_checkpoint(
&static_file_provider,
start_block,
max_block,
input.checkpoint(),
)?;
let mut fetch_block_duration = Duration::default();
let mut execution_duration = Duration::default();
@ -354,7 +358,7 @@ where
let time = Instant::now();
// write output
let mut writer = UnifiedStorageWriter::new(&provider, static_file_producer);
let mut writer = UnifiedStorageWriter::new(provider, static_file_producer);
writer.write_to_storage(state, OriginalValuesKnown::Yes)?;
let db_write_duration = time.elapsed();
@ -390,7 +394,7 @@ where
/// Unwind the stage.
fn unwind(
&mut self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (range, unwind_to, _) =
@ -425,6 +429,8 @@ where
}
}
let static_file_provider = provider.static_file_provider();
// Unwind all receipts for transactions in the block range
if self.prune_modes.receipts.is_none() && self.prune_modes.receipts_log_filter.is_empty() {
// We only use static files for Receipts, if there is no receipt pruning of any kind.
@ -432,7 +438,8 @@ where
// prepare_static_file_producer does a consistency check that will unwind static files
// if the expected highest receipt in the files is higher than the database.
// Which is essentially what happens here when we unwind this stage.
let _static_file_producer = prepare_static_file_producer(provider, *range.start())?;
let _static_file_producer =
prepare_static_file_producer(provider, &static_file_provider, *range.start())?;
} else {
// If there is any kind of receipt pruning/filtering we use the database, since static
// files do not support filters.
@ -572,11 +579,13 @@ fn calculate_gas_used_from_headers(
/// the height in the static file is higher**, it rolls back (unwinds) the static file.
/// **Conversely, if the height in the database is lower**, it triggers a rollback in the database
/// (by returning [`StageError`]) until the heights in both the database and static file match.
fn prepare_static_file_producer<'a, 'b, DB: Database>(
provider: &'b DatabaseProviderRW<DB>,
fn prepare_static_file_producer<'a, 'b, Provider>(
provider: &'b Provider,
static_file_provider: &'a StaticFileProvider,
start_block: u64,
) -> Result<StaticFileProviderRWRefMut<'a>, StageError>
where
Provider: DBProvider + BlockReader + HeaderProvider,
'b: 'a,
{
// Get next expected receipt number
@ -588,7 +597,6 @@ where
.unwrap_or(0);
// Get next expected receipt number in static files
let static_file_provider = provider.static_file_provider();
let next_static_file_receipt_num = static_file_provider
.get_highest_static_file_tx(StaticFileSegment::Receipts)
.map(|num| num + 1)
@ -657,8 +665,8 @@ mod tests {
StorageEntry, B256, U256,
};
use reth_provider::{
test_utils::create_test_provider_factory, AccountReader, ReceiptProvider,
StaticFileProviderFactory,
test_utils::create_test_provider_factory, AccountReader, DatabaseProviderFactory,
ReceiptProvider, StaticFileProviderFactory,
};
use reth_prune_types::{PruneMode, ReceiptsLogPruneConfig};
use reth_stages_api::StageUnitCheckpoint;
@ -845,8 +853,9 @@ mod tests {
.commit()
.unwrap();
{
let static_file_provider = provider.static_file_provider();
let mut receipts_writer =
provider.static_file_provider().latest_writer(StaticFileSegment::Receipts).unwrap();
static_file_provider.latest_writer(StaticFileSegment::Receipts).unwrap();
receipts_writer.increment_block(0).unwrap();
receipts_writer.commit().unwrap();
}
@ -886,7 +895,7 @@ mod tests {
// Tests node with database and node with static files
for mut mode in modes {
let provider = factory.provider_rw().unwrap();
let provider = factory.database_provider_rw().unwrap();
if let Some(mode) = &mut mode {
// Simulating a full node where we write receipts to database
@ -959,7 +968,7 @@ mod tests {
"Post changed of a account"
);
let provider = factory.provider_rw().unwrap();
let provider = factory.database_provider_rw().unwrap();
let mut stage = stage();
stage.prune_modes = mode.unwrap_or_default();
@ -991,8 +1000,9 @@ mod tests {
.commit()
.unwrap();
{
let static_file_provider = provider.static_file_provider();
let mut receipts_writer =
provider.static_file_provider().latest_writer(StaticFileSegment::Receipts).unwrap();
static_file_provider.latest_writer(StaticFileSegment::Receipts).unwrap();
receipts_writer.increment_block(0).unwrap();
receipts_writer.commit().unwrap();
}
@ -1017,7 +1027,7 @@ mod tests {
provider.commit().unwrap();
// execute
let mut provider = factory.provider_rw().unwrap();
let mut provider = factory.database_provider_rw().unwrap();
// If there is a pruning configuration, then it's forced to use the database.
// This way we test both cases.
@ -1040,7 +1050,7 @@ mod tests {
provider.commit().unwrap();
// Test Unwind
provider = factory.provider_rw().unwrap();
provider = factory.database_provider_rw().unwrap();
let mut stage = stage();
stage.prune_modes = mode.unwrap_or_default();
@ -1093,7 +1103,7 @@ mod tests {
#[tokio::test]
async fn test_selfdestruct() {
let test_db = TestStageDB::default();
let provider = test_db.factory.provider_rw().unwrap();
let provider = test_db.factory.database_provider_rw().unwrap();
let input = ExecInput { target: Some(1), checkpoint: None };
let mut genesis_rlp = hex!("f901f8f901f3a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa0c9ceb8372c88cb461724d8d3d87e8b933f6fc5f679d4841800e662f4428ffd0da056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b90100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008302000080830f4240808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap();
@ -1108,8 +1118,9 @@ mod tests {
.commit()
.unwrap();
{
let static_file_provider = provider.static_file_provider();
let mut receipts_writer =
provider.static_file_provider().latest_writer(StaticFileSegment::Receipts).unwrap();
static_file_provider.latest_writer(StaticFileSegment::Receipts).unwrap();
receipts_writer.increment_block(0).unwrap();
receipts_writer.commit().unwrap();
}
@ -1159,13 +1170,13 @@ mod tests {
provider.commit().unwrap();
// execute
let provider = test_db.factory.provider_rw().unwrap();
let provider = test_db.factory.database_provider_rw().unwrap();
let mut execution_stage = stage();
let _ = execution_stage.execute(&provider, input).unwrap();
provider.commit().unwrap();
// assert unwind stage
let provider = test_db.factory.provider_rw().unwrap();
let provider = test_db.factory.database_provider_rw().unwrap();
assert_eq!(provider.basic_account(destroyed_address), Ok(None), "Account was destroyed");
assert_eq!(

View File

@ -1,5 +1,3 @@
use reth_db_api::database::Database;
use reth_provider::DatabaseProviderRW;
use reth_stages_api::{
ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
};
@ -12,14 +10,14 @@ use reth_stages_api::{
#[non_exhaustive]
pub struct FinishStage;
impl<DB: Database> Stage<DB> for FinishStage {
impl<Provider> Stage<Provider> for FinishStage {
fn id(&self) -> StageId {
StageId::Finish
}
fn execute(
&mut self,
_provider: &DatabaseProviderRW<DB>,
_provider: &Provider,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true })
@ -27,7 +25,7 @@ impl<DB: Database> Stage<DB> for FinishStage {
fn unwind(
&mut self,
_provider: &DatabaseProviderRW<DB>,
_provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })

View File

@ -3,12 +3,11 @@ use reth_config::config::{EtlConfig, HashingConfig};
use reth_db::{tables, RawKey, RawTable, RawValue};
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
transaction::{DbTx, DbTxMut},
};
use reth_etl::Collector;
use reth_primitives::{keccak256, Account, B256};
use reth_provider::{AccountExtReader, DatabaseProviderRW, HashingWriter, StatsReader};
use reth_provider::{AccountExtReader, DBProvider, HashingWriter, StatsReader};
use reth_stages_api::{
AccountHashingCheckpoint, EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint,
StageError, StageId, UnwindInput, UnwindOutput,
@ -58,13 +57,13 @@ impl AccountHashingStage {
///
/// Proceeds to go to the `BlockTransitionIndex` end, go back `transitions` and change the
/// account state in the `AccountChangeSets` table.
pub fn seed<DB: Database>(
provider: &DatabaseProviderRW<DB>,
pub fn seed<Tx: DbTx + DbTxMut + 'static>(
provider: &reth_provider::DatabaseProvider<Tx>,
opts: SeedOpts,
) -> Result<Vec<(reth_primitives::Address, reth_primitives::Account)>, StageError> {
use reth_db_api::models::AccountBeforeTx;
use reth_primitives::U256;
use reth_provider::providers::StaticFileWriter;
use reth_provider::{StaticFileProviderFactory, StaticFileWriter};
use reth_testing_utils::{
generators,
generators::{random_block_range, random_eoa_accounts, BlockRangeParams},
@ -125,18 +124,17 @@ impl Default for AccountHashingStage {
}
}
impl<DB: Database> Stage<DB> for AccountHashingStage {
impl<Provider> Stage<Provider> for AccountHashingStage
where
Provider: DBProvider<Tx: DbTxMut> + HashingWriter + AccountExtReader + StatsReader,
{
/// Return the id of the stage
fn id(&self) -> StageId {
StageId::AccountHashing
}
/// Execute the stage.
fn execute(
&mut self,
provider: &DatabaseProviderRW<DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
@ -225,7 +223,7 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
/// Unwind the stage.
fn unwind(
&mut self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (range, unwind_progress, _) =
@ -282,9 +280,7 @@ pub struct SeedOpts {
pub txs: Range<u8>,
}
fn stage_checkpoint_progress<DB: Database>(
provider: &DatabaseProviderRW<DB>,
) -> ProviderResult<EntitiesCheckpoint> {
fn stage_checkpoint_progress(provider: &impl StatsReader) -> ProviderResult<EntitiesCheckpoint> {
Ok(EntitiesCheckpoint {
processed: provider.count_entries::<tables::HashedAccounts>()? as u64,
total: provider.count_entries::<tables::PlainAccountState>()? as u64,
@ -350,6 +346,7 @@ mod tests {
use super::*;
use crate::test_utils::TestStageDB;
use reth_primitives::Address;
use reth_provider::DatabaseProviderFactory;
pub(crate) struct AccountHashingTestRunner {
pub(crate) db: TestStageDB,
@ -444,7 +441,7 @@ mod tests {
type Seed = Vec<(Address, Account)>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
let provider = self.db.factory.provider_rw()?;
let provider = self.db.factory.database_provider_rw()?;
let res = Ok(AccountHashingStage::seed(
&provider,
SeedOpts { blocks: 1..=input.target(), accounts: 10, txs: 0..3 },

View File

@ -3,14 +3,13 @@ use reth_config::config::{EtlConfig, HashingConfig};
use reth_db::tables;
use reth_db_api::{
cursor::{DbCursorRO, DbDupCursorRW},
database::Database,
models::{BlockNumberAddress, CompactU256},
table::Decompress,
transaction::{DbTx, DbTxMut},
};
use reth_etl::Collector;
use reth_primitives::{keccak256, BufMut, StorageEntry, B256};
use reth_provider::{DatabaseProviderRW, HashingWriter, StatsReader, StorageReader};
use reth_provider::{DBProvider, HashingWriter, StatsReader, StorageReader};
use reth_stages_api::{
EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
StorageHashingCheckpoint, UnwindInput, UnwindOutput,
@ -62,18 +61,17 @@ impl Default for StorageHashingStage {
}
}
impl<DB: Database> Stage<DB> for StorageHashingStage {
impl<Provider> Stage<Provider> for StorageHashingStage
where
Provider: DBProvider<Tx: DbTxMut> + StorageReader + HashingWriter + StatsReader,
{
/// Return the id of the stage
fn id(&self) -> StageId {
StageId::StorageHashing
}
/// Execute the stage.
fn execute(
&mut self,
provider: &DatabaseProviderRW<DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
let tx = provider.tx_ref();
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
@ -164,7 +162,7 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
/// Unwind the stage.
fn unwind(
&mut self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (range, unwind_progress, _) =
@ -199,9 +197,7 @@ fn collect(
Ok(())
}
fn stage_checkpoint_progress<DB: Database>(
provider: &DatabaseProviderRW<DB>,
) -> ProviderResult<EntitiesCheckpoint> {
fn stage_checkpoint_progress(provider: &impl StatsReader) -> ProviderResult<EntitiesCheckpoint> {
Ok(EntitiesCheckpoint {
processed: provider.count_entries::<tables::HashedStorages>()? as u64,
total: provider.count_entries::<tables::PlainStorageState>()? as u64,

View File

@ -5,15 +5,16 @@ use reth_consensus::Consensus;
use reth_db::{tables, RawKey, RawTable, RawValue};
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
transaction::DbTxMut,
DbTxUnwindExt,
};
use reth_etl::Collector;
use reth_network_p2p::headers::{downloader::HeaderDownloader, error::HeadersDownloaderError};
use reth_primitives::{BlockHash, BlockNumber, SealedHeader, StaticFileSegment, B256};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
BlockHashReader, DatabaseProviderRW, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider,
BlockHashReader, DBProvider, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider,
StaticFileProviderFactory,
};
use reth_stages_api::{
BlockErrorKind, CheckpointBlockRange, EntitiesCheckpoint, ExecInput, ExecOutput,
@ -88,9 +89,9 @@ where
///
/// Writes to static files ( `Header | HeaderTD | HeaderHash` ) and [`tables::HeaderNumbers`]
/// database table.
fn write_headers<DB: Database>(
fn write_headers(
&mut self,
provider: &DatabaseProviderRW<DB>,
provider: &impl DBProvider<Tx: DbTxMut>,
static_file_provider: StaticFileProvider,
) -> Result<BlockNumber, StageError> {
let total_headers = self.header_collector.len();
@ -183,11 +184,11 @@ where
}
}
impl<DB, Provider, D> Stage<DB> for HeaderStage<Provider, D>
impl<Provider, P, D> Stage<Provider> for HeaderStage<P, D>
where
DB: Database,
Provider: HeaderSyncGapProvider,
P: HeaderSyncGapProvider,
D: HeaderDownloader,
Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory,
{
/// Return the id of the stage
fn id(&self) -> StageId {
@ -259,11 +260,7 @@ where
/// Download the headers in reverse order (falling block numbers)
/// starting from the tip of the chain
fn execute(
&mut self,
provider: &DatabaseProviderRW<DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
let current_checkpoint = input.checkpoint();
if self.sync_gap.as_ref().ok_or(StageError::MissingSyncGap)?.is_closed() {
@ -281,8 +278,7 @@ where
// Write the headers and related tables to DB from ETL space
let to_be_processed = self.hash_collector.len() as u64;
let last_header_number =
self.write_headers(provider, provider.static_file_provider().clone())?;
let last_header_number = self.write_headers(provider, provider.static_file_provider())?;
// Clear ETL collectors
self.hash_collector.clear();
@ -310,7 +306,7 @@ where
/// Unwind the stage.
fn unwind(
&mut self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
self.sync_gap.take();
@ -318,13 +314,17 @@ where
// First unwind the db tables, until the unwind_to block number. use the walker to unwind
// HeaderNumbers based on the index in CanonicalHeaders
// unwind from the next block number since the unwind_to block is exclusive
provider.unwind_table_by_walker::<tables::CanonicalHeaders, tables::HeaderNumbers>(
(input.unwind_to + 1)..,
)?;
provider.unwind_table_by_num::<tables::CanonicalHeaders>(input.unwind_to)?;
provider.unwind_table_by_num::<tables::HeaderTerminalDifficulties>(input.unwind_to)?;
provider
.tx_ref()
.unwind_table_by_walker::<tables::CanonicalHeaders, tables::HeaderNumbers>(
(input.unwind_to + 1)..,
)?;
provider.tx_ref().unwind_table_by_num::<tables::CanonicalHeaders>(input.unwind_to)?;
provider
.tx_ref()
.unwind_table_by_num::<tables::HeaderTerminalDifficulties>(input.unwind_to)?;
let unfinalized_headers_unwound =
provider.unwind_table_by_num::<tables::Headers>(input.unwind_to)?;
provider.tx_ref().unwind_table_by_num::<tables::Headers>(input.unwind_to)?;
// determine how many headers to unwind from the static files based on the highest block and
// the unwind_to block

View File

@ -1,11 +1,9 @@
use super::{collect_history_indices, load_history_indices};
use reth_config::config::{EtlConfig, IndexHistoryConfig};
use reth_db::tables;
use reth_db_api::{database::Database, models::ShardedKey, table::Decode, transaction::DbTxMut};
use reth_db_api::{models::ShardedKey, table::Decode, transaction::DbTxMut};
use reth_primitives::Address;
use reth_provider::{
DatabaseProviderRW, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
};
use reth_provider::{DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_stages_api::{
ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
@ -44,7 +42,11 @@ impl Default for IndexAccountHistoryStage {
}
}
impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
impl<Provider> Stage<Provider> for IndexAccountHistoryStage
where
Provider:
DBProvider<Tx: DbTxMut> + HistoryWriter + PruneCheckpointReader + PruneCheckpointWriter,
{
/// Return the id of the stage
fn id(&self) -> StageId {
StageId::IndexAccountHistory
@ -53,7 +55,7 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
/// Execute the stage.
fn execute(
&mut self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
mut input: ExecInput,
) -> Result<ExecOutput, StageError> {
if let Some((target_prunable_block, prune_mode)) = self
@ -126,7 +128,7 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
/// Unwind the stage.
fn unwind(
&mut self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (range, unwind_progress, _) =
@ -157,7 +159,7 @@ mod tests {
transaction::DbTx,
};
use reth_primitives::{address, BlockNumber, B256};
use reth_provider::providers::StaticFileWriter;
use reth_provider::{providers::StaticFileWriter, DatabaseProviderFactory};
use reth_testing_utils::generators::{
self, random_block_range, random_changeset_range, random_contract_account_range,
BlockRangeParams,
@ -217,7 +219,7 @@ mod tests {
.map(|block_number| StageCheckpoint { block_number, stage_checkpoint: None }),
};
let mut stage = IndexAccountHistoryStage::default();
let provider = db.factory.provider_rw().unwrap();
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.execute(&provider, input).unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(run_to), done: true });
provider.commit().unwrap();
@ -230,7 +232,7 @@ mod tests {
..Default::default()
};
let mut stage = IndexAccountHistoryStage::default();
let provider = db.factory.provider_rw().unwrap();
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.unwind(&provider, input).unwrap();
assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) });
provider.commit().unwrap();
@ -477,7 +479,7 @@ mod tests {
prune_mode: Some(PruneMode::Before(36)),
..Default::default()
};
let provider = db.factory.provider_rw().unwrap();
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.execute(&provider, input).unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(20000), done: true });
provider.commit().unwrap();

View File

@ -3,14 +3,11 @@ use crate::{StageCheckpoint, StageId};
use reth_config::config::{EtlConfig, IndexHistoryConfig};
use reth_db::tables;
use reth_db_api::{
database::Database,
models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress},
table::Decode,
transaction::DbTxMut,
};
use reth_provider::{
DatabaseProviderRW, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
};
use reth_provider::{DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use std::fmt::Debug;
@ -47,7 +44,11 @@ impl Default for IndexStorageHistoryStage {
}
}
impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
impl<Provider> Stage<Provider> for IndexStorageHistoryStage
where
Provider:
DBProvider<Tx: DbTxMut> + PruneCheckpointWriter + HistoryWriter + PruneCheckpointReader,
{
/// Return the id of the stage
fn id(&self) -> StageId {
StageId::IndexStorageHistory
@ -56,7 +57,7 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
/// Execute the stage.
fn execute(
&mut self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
mut input: ExecInput,
) -> Result<ExecOutput, StageError> {
if let Some((target_prunable_block, prune_mode)) = self
@ -133,7 +134,7 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
/// Unwind the stage.
fn unwind(
&mut self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (range, unwind_progress, _) =
@ -163,7 +164,7 @@ mod tests {
transaction::DbTx,
};
use reth_primitives::{address, b256, Address, BlockNumber, StorageEntry, B256, U256};
use reth_provider::providers::StaticFileWriter;
use reth_provider::{providers::StaticFileWriter, DatabaseProviderFactory};
use reth_testing_utils::generators::{
self, random_block_range, random_changeset_range, random_contract_account_range,
BlockRangeParams,
@ -236,7 +237,7 @@ mod tests {
.map(|block_number| StageCheckpoint { block_number, stage_checkpoint: None }),
};
let mut stage = IndexStorageHistoryStage::default();
let provider = db.factory.provider_rw().unwrap();
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.execute(&provider, input).unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(run_to), done: true });
provider.commit().unwrap();
@ -249,7 +250,7 @@ mod tests {
..Default::default()
};
let mut stage = IndexStorageHistoryStage::default();
let provider = db.factory.provider_rw().unwrap();
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.unwind(&provider, input).unwrap();
assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) });
provider.commit().unwrap();
@ -499,7 +500,7 @@ mod tests {
prune_mode: Some(PruneMode::Before(36)),
..Default::default()
};
let provider = db.factory.provider_rw().unwrap();
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.execute(&provider, input).unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(20000), done: true });
provider.commit().unwrap();

View File

@ -1,14 +1,11 @@
use reth_codecs::Compact;
use reth_consensus::ConsensusError;
use reth_db::tables;
use reth_db_api::{
database::Database,
transaction::{DbTx, DbTxMut},
};
use reth_db_api::transaction::{DbTx, DbTxMut};
use reth_primitives::{BlockNumber, GotExpected, SealedHeader, B256};
use reth_provider::{
DatabaseProviderRW, HeaderProvider, ProviderError, StageCheckpointReader,
StageCheckpointWriter, StatsReader, TrieWriter,
DBProvider, HeaderProvider, ProviderError, StageCheckpointReader, StageCheckpointWriter,
StatsReader, TrieWriter,
};
use reth_stages_api::{
BlockErrorKind, EntitiesCheckpoint, ExecInput, ExecOutput, MerkleCheckpoint, Stage,
@ -98,9 +95,9 @@ impl MerkleStage {
}
/// Gets the hashing progress
pub fn get_execution_checkpoint<DB: Database>(
pub fn get_execution_checkpoint(
&self,
provider: &DatabaseProviderRW<DB>,
provider: &impl StageCheckpointReader,
) -> Result<Option<MerkleCheckpoint>, StageError> {
let buf =
provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?.unwrap_or_default();
@ -114,9 +111,9 @@ impl MerkleStage {
}
/// Saves the hashing progress
pub fn save_execution_checkpoint<DB: Database>(
pub fn save_execution_checkpoint(
&self,
provider: &DatabaseProviderRW<DB>,
provider: &impl StageCheckpointWriter,
checkpoint: Option<MerkleCheckpoint>,
) -> Result<(), StageError> {
let mut buf = vec![];
@ -132,7 +129,15 @@ impl MerkleStage {
}
}
impl<DB: Database> Stage<DB> for MerkleStage {
impl<Provider> Stage<Provider> for MerkleStage
where
Provider: DBProvider<Tx: DbTxMut>
+ TrieWriter
+ StatsReader
+ HeaderProvider
+ StageCheckpointReader
+ StageCheckpointWriter,
{
/// Return the id of the stage
fn id(&self) -> StageId {
match self {
@ -144,11 +149,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
}
/// Execute the stage.
fn execute(
&mut self,
provider: &DatabaseProviderRW<DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
let threshold = match self {
Self::Unwind => {
info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
@ -286,7 +287,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
/// Unwind the stage.
fn unwind(
&mut self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let tx = provider.tx_ref();

View File

@ -144,7 +144,7 @@ mod tests {
expect_num_receipts: usize,
expect_num_acc_changesets: usize,
expect_num_storage_changesets: usize| async move {
let provider = factory.provider_rw().unwrap();
let provider = factory.database_provider_rw().unwrap();
// Check execution and create receipts and changesets according to the pruning
// configuration

View File

@ -1,5 +1,8 @@
use reth_db_api::database::Database;
use reth_provider::{DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter};
use reth_db::transaction::DbTxMut;
use reth_provider::{
BlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter,
StaticFileProviderFactory,
};
use reth_prune::{
PruneMode, PruneModes, PruneSegment, PrunerBuilder, SegmentOutput, SegmentOutputCheckpoint,
};
@ -32,22 +35,25 @@ impl PruneStage {
}
}
impl<DB: Database> Stage<DB> for PruneStage {
impl<Provider> Stage<Provider> for PruneStage
where
Provider: DBProvider<Tx: DbTxMut>
+ PruneCheckpointReader
+ PruneCheckpointWriter
+ BlockReader
+ StaticFileProviderFactory,
{
fn id(&self) -> StageId {
StageId::Prune
}
fn execute(
&mut self,
provider: &DatabaseProviderRW<DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
let mut pruner = PrunerBuilder::default()
.segments(self.prune_modes.clone())
.delete_limit(self.commit_threshold)
.build(provider.static_file_provider().clone());
.build::<Provider>(provider.static_file_provider());
let result = pruner.run_with_provider(&provider.0, input.target())?;
let result = pruner.run_with_provider(provider, input.target())?;
if result.progress.is_finished() {
Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true })
} else {
@ -87,7 +93,7 @@ impl<DB: Database> Stage<DB> for PruneStage {
fn unwind(
&mut self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
// We cannot recover the data that was pruned in `execute`, so we just update the
@ -118,16 +124,19 @@ impl PruneSenderRecoveryStage {
}
}
impl<DB: Database> Stage<DB> for PruneSenderRecoveryStage {
impl<Provider> Stage<Provider> for PruneSenderRecoveryStage
where
Provider: DBProvider<Tx: DbTxMut>
+ PruneCheckpointReader
+ PruneCheckpointWriter
+ BlockReader
+ StaticFileProviderFactory,
{
fn id(&self) -> StageId {
StageId::PruneSenderRecovery
}
fn execute(
&mut self,
provider: &DatabaseProviderRW<DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
let mut result = self.0.execute(provider, input)?;
// Adjust the checkpoint to the highest pruned block number of the Sender Recovery segment
@ -146,7 +155,7 @@ impl<DB: Database> Stage<DB> for PruneSenderRecoveryStage {
fn unwind(
&mut self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
self.0.unwind(provider, input)

View File

@ -3,13 +3,13 @@ use reth_consensus::ConsensusError;
use reth_db::{static_file::TransactionMask, tables, RawValue};
use reth_db_api::{
cursor::DbCursorRW,
database::Database,
transaction::{DbTx, DbTxMut},
DbTxUnwindExt,
};
use reth_primitives::{Address, GotExpected, StaticFileSegment, TransactionSignedNoHash, TxNumber};
use reth_provider::{
BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError, PruneCheckpointReader,
StatsReader,
BlockReader, DBProvider, HeaderProvider, ProviderError, PruneCheckpointReader,
StaticFileProviderFactory, StatsReader,
};
use reth_prune_types::PruneSegment;
use reth_stages_api::{
@ -51,7 +51,14 @@ impl Default for SenderRecoveryStage {
}
}
impl<DB: Database> Stage<DB> for SenderRecoveryStage {
impl<Provider> Stage<Provider> for SenderRecoveryStage
where
Provider: DBProvider<Tx: DbTxMut>
+ BlockReader
+ StaticFileProviderFactory
+ StatsReader
+ PruneCheckpointReader,
{
/// Return the id of the stage
fn id(&self) -> StageId {
StageId::SenderRecovery
@ -61,11 +68,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
/// [`BlockBodyIndices`][reth_db::tables::BlockBodyIndices],
/// collect transactions within that range, recover signer for each transaction and store
/// entries in the [`TransactionSenders`][reth_db::tables::TransactionSenders] table.
fn execute(
&mut self,
provider: &DatabaseProviderRW<DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
@ -110,7 +113,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
/// Unwind the stage.
fn unwind(
&mut self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold);
@ -120,7 +123,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
.block_body_indices(unwind_to)?
.ok_or(ProviderError::BlockBodyIndicesNotFound(unwind_to))?
.last_tx_num();
provider.unwind_table_by_num::<tables::TransactionSenders>(latest_tx_id)?;
provider.tx_ref().unwind_table_by_num::<tables::TransactionSenders>(latest_tx_id)?;
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_to)
@ -129,13 +132,13 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
}
}
fn recover_range<DB, CURSOR>(
fn recover_range<Provider, CURSOR>(
tx_range: Range<u64>,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
senders_cursor: &mut CURSOR,
) -> Result<(), StageError>
where
DB: Database,
Provider: DBProvider + HeaderProvider + StaticFileProviderFactory,
CURSOR: DbCursorRW<tables::TransactionSenders>,
{
debug!(target: "sync::stages::sender_recovery", ?tx_range, "Recovering senders batch");
@ -152,7 +155,7 @@ where
})
.unzip();
let static_file_provider = provider.static_file_provider().clone();
let static_file_provider = provider.static_file_provider();
// We do not use `tokio::task::spawn_blocking` because, during a shutdown,
// there will be a timeout grace period in which Tokio does not allow spawning
@ -287,9 +290,10 @@ fn recover_sender(
Ok((tx_id, sender))
}
fn stage_checkpoint<DB: Database>(
provider: &DatabaseProviderRW<DB>,
) -> Result<EntitiesCheckpoint, StageError> {
fn stage_checkpoint<Provider>(provider: &Provider) -> Result<EntitiesCheckpoint, StageError>
where
Provider: StatsReader + StaticFileProviderFactory + PruneCheckpointReader,
{
let pruned_entries = provider
.get_prune_checkpoint(PruneSegment::SenderRecovery)?
.and_then(|checkpoint| checkpoint.tx_number)
@ -335,8 +339,8 @@ mod tests {
use reth_db_api::cursor::DbCursorRO;
use reth_primitives::{BlockNumber, SealedBlock, TransactionSigned, B256};
use reth_provider::{
providers::StaticFileWriter, PruneCheckpointWriter, StaticFileProviderFactory,
TransactionsProvider,
providers::StaticFileWriter, DatabaseProviderFactory, PruneCheckpointWriter,
StaticFileProviderFactory, TransactionsProvider,
};
use reth_prune_types::{PruneCheckpoint, PruneMode};
use reth_stages_api::StageUnitCheckpoint;
@ -529,7 +533,7 @@ mod tests {
.expect("save stage checkpoint");
provider.commit().expect("commit");
let provider = db.factory.provider_rw().unwrap();
let provider = db.factory.database_provider_rw().unwrap();
assert_eq!(
stage_checkpoint(&provider).expect("stage checkpoint"),
EntitiesCheckpoint {

View File

@ -3,14 +3,13 @@ use reth_config::config::{EtlConfig, TransactionLookupConfig};
use reth_db::{tables, RawKey, RawValue};
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
transaction::{DbTx, DbTxMut},
};
use reth_etl::Collector;
use reth_primitives::{TxHash, TxNumber};
use reth_provider::{
BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter, StatsReader,
TransactionsProvider, TransactionsProviderExt,
BlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter,
StaticFileProviderFactory, StatsReader, TransactionsProvider, TransactionsProviderExt,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_stages_api::{
@ -54,7 +53,16 @@ impl TransactionLookupStage {
}
}
impl<DB: Database> Stage<DB> for TransactionLookupStage {
impl<Provider> Stage<Provider> for TransactionLookupStage
where
Provider: DBProvider<Tx: DbTxMut>
+ PruneCheckpointWriter
+ BlockReader
+ PruneCheckpointReader
+ StatsReader
+ StaticFileProviderFactory
+ TransactionsProviderExt,
{
/// Return the id of the stage
fn id(&self) -> StageId {
StageId::TransactionLookup
@ -63,7 +71,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
/// Write transaction hash -> id entries
fn execute(
&mut self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
mut input: ExecInput,
) -> Result<ExecOutput, StageError> {
if let Some((target_prunable_block, prune_mode)) = self
@ -178,7 +186,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
/// Unwind the stage.
fn unwind(
&mut self,
provider: &DatabaseProviderRW<DB>,
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let tx = provider.tx_ref();
@ -212,9 +220,10 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
}
}
fn stage_checkpoint<DB: Database>(
provider: &DatabaseProviderRW<DB>,
) -> Result<EntitiesCheckpoint, StageError> {
fn stage_checkpoint<Provider>(provider: &Provider) -> Result<EntitiesCheckpoint, StageError>
where
Provider: PruneCheckpointReader + StaticFileProviderFactory + StatsReader,
{
let pruned_entries = provider
.get_prune_checkpoint(PruneSegment::TransactionLookup)?
.and_then(|checkpoint| checkpoint.tx_number)
@ -243,7 +252,9 @@ mod tests {
};
use assert_matches::assert_matches;
use reth_primitives::{BlockNumber, SealedBlock, B256};
use reth_provider::{providers::StaticFileWriter, StaticFileProviderFactory};
use reth_provider::{
providers::StaticFileWriter, DatabaseProviderFactory, StaticFileProviderFactory,
};
use reth_stages_api::StageUnitCheckpoint;
use reth_testing_utils::generators::{
self, random_block, random_block_range, BlockParams, BlockRangeParams,
@ -397,7 +408,7 @@ mod tests {
.expect("save stage checkpoint");
provider.commit().expect("commit");
let provider = db.factory.provider_rw().unwrap();
let provider = db.factory.database_provider_rw().unwrap();
assert_eq!(
stage_checkpoint(&provider).expect("stage checkpoint"),
EntitiesCheckpoint {

View File

@ -1,6 +1,6 @@
//! Utils for `stages`.
use reth_config::config::EtlConfig;
use reth_db::{BlockNumberList, Database};
use reth_db::BlockNumberList;
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
models::sharded_key::NUM_OF_INDICES_IN_SHARD,
@ -10,7 +10,7 @@ use reth_db_api::{
};
use reth_etl::Collector;
use reth_primitives::BlockNumber;
use reth_provider::DatabaseProviderRW;
use reth_provider::DBProvider;
use reth_stages_api::StageError;
use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
use tracing::info;
@ -35,15 +35,15 @@ const DEFAULT_CACHE_THRESHOLD: u64 = 100_000;
///
/// As a result, the `Collector` will contain entries such as `(Address1.3, [1,2,3])` and
/// `(Address1.300, [100,300])`. The entries may be stored across one or more files.
pub(crate) fn collect_history_indices<DB, CS, H, P>(
provider: &DatabaseProviderRW<DB>,
pub(crate) fn collect_history_indices<Provider, CS, H, P>(
provider: &Provider,
range: impl RangeBounds<CS::Key>,
sharded_key_factory: impl Fn(P, BlockNumber) -> H::Key,
partial_key_factory: impl Fn((CS::Key, CS::Value)) -> (u64, P),
etl_config: &EtlConfig,
) -> Result<Collector<H::Key, H::Value>, StageError>
where
DB: Database,
Provider: DBProvider,
CS: Table,
H: Table<Value = BlockNumberList>,
P: Copy + Eq + Hash,
@ -102,8 +102,8 @@ where
/// `Address.StorageKey`). It flushes indices to disk when reaching a shard's max length
/// (`NUM_OF_INDICES_IN_SHARD`) or when the partial key changes, ensuring the last previous partial
/// key shard is stored.
pub(crate) fn load_history_indices<DB, H, P>(
provider: &DatabaseProviderRW<DB>,
pub(crate) fn load_history_indices<Provider, H, P>(
provider: &Provider,
mut collector: Collector<H::Key, H::Value>,
append_only: bool,
sharded_key_factory: impl Clone + Fn(P, u64) -> <H as Table>::Key,
@ -111,7 +111,7 @@ pub(crate) fn load_history_indices<DB, H, P>(
get_partial: impl Fn(<H as Table>::Key) -> P,
) -> Result<(), StageError>
where
DB: Database,
Provider: DBProvider<Tx: DbTxMut>,
H: Table<Value = BlockNumberList>,
P: Copy + Default + Eq,
{

View File

@ -1,11 +1,10 @@
use super::TestStageDB;
use reth_db::{test_utils::TempDatabase, DatabaseEnv};
use reth_provider::ProviderError;
use reth_db::{test_utils::TempDatabase, Database, DatabaseEnv};
use reth_provider::{DatabaseProvider, ProviderError};
use reth_stages_api::{
ExecInput, ExecOutput, Stage, StageError, StageExt, UnwindInput, UnwindOutput,
};
use reth_storage_errors::db::DatabaseError;
use std::sync::Arc;
use tokio::sync::oneshot;
#[derive(thiserror::Error, Debug)]
@ -20,7 +19,7 @@ pub(crate) enum TestRunnerError {
/// A generic test runner for stages.
pub(crate) trait StageTestRunner {
type S: Stage<Arc<TempDatabase<DatabaseEnv>>> + 'static;
type S: Stage<DatabaseProvider<<TempDatabase<DatabaseEnv> as Database>::TXMut>> + 'static;
/// Return a reference to the database.
fn db(&self) -> &TestStageDB;

View File

@ -1,6 +1,5 @@
use super::TEST_STAGE_ID;
use crate::{StageSet, StageSetBuilder};
use reth_db_api::database::Database;
use reth_stages_api::{test_utils::TestStage, ExecOutput, StageError, UnwindOutput};
use std::collections::VecDeque;
@ -19,8 +18,8 @@ impl TestStages {
}
}
impl<DB: Database> StageSet<DB> for TestStages {
fn builder(self) -> StageSetBuilder<DB> {
impl<Provider> StageSet<Provider> for TestStages {
fn builder(self) -> StageSetBuilder<Provider> {
StageSetBuilder::default().add_stage(
TestStage::new(TEST_STAGE_ID)
.with_exec(self.exec_outputs)

View File

@ -81,3 +81,6 @@ mod scale;
mod utils;
pub use database::Database;
mod unwind;
pub use unwind::DbTxUnwindExt;

View File

@ -0,0 +1,63 @@
use crate::{cursor::DbCursorRO, table::Table, transaction::DbTxMut};
use reth_storage_errors::db::DatabaseError;
use std::ops::RangeBounds;
/// Extension trait for [`DbTxMut`] that provides unwind functionality.
pub trait DbTxUnwindExt: DbTxMut {
/// Unwind table by some number key.
/// Returns number of rows unwound.
///
/// Note: Key is not inclusive and specified key would stay in db.
#[inline]
fn unwind_table_by_num<T>(&self, num: u64) -> Result<usize, DatabaseError>
where
T: Table<Key = u64>,
{
self.unwind_table::<T, _>(num, |key| key)
}
/// Unwind the table to a provided number key.
/// Returns number of rows unwound.
///
/// Note: Key is not inclusive and specified key would stay in db.
fn unwind_table<T, F>(&self, key: u64, mut selector: F) -> Result<usize, DatabaseError>
where
T: Table,
F: FnMut(T::Key) -> u64,
{
let mut cursor = self.cursor_write::<T>()?;
let mut reverse_walker = cursor.walk_back(None)?;
let mut deleted = 0;
while let Some(Ok((entry_key, _))) = reverse_walker.next() {
if selector(entry_key.clone()) <= key {
break
}
reverse_walker.delete_current()?;
deleted += 1;
}
Ok(deleted)
}
/// Unwind a table forward by a [`Walker`][crate::cursor::Walker] on another table.
///
/// Note: Range is inclusive and first key in the range is removed.
fn unwind_table_by_walker<T1, T2>(
&self,
range: impl RangeBounds<T1::Key>,
) -> Result<(), DatabaseError>
where
T1: Table,
T2: Table<Key = T1::Value>,
{
let mut cursor = self.cursor_write::<T1>()?;
let mut walker = cursor.walk_range(range)?;
while let Some((_, value)) = walker.next().transpose()? {
self.delete::<T2>(value, None)?;
}
Ok(())
}
}
impl<T> DbTxUnwindExt for T where T: DbTxMut {}

View File

@ -1415,8 +1415,9 @@ mod tests {
};
use reth_storage_api::{
BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt, BlockSource,
ChangeSetReader, HeaderProvider, ReceiptProvider, ReceiptProviderIdExt, RequestsProvider,
StateProviderFactory, TransactionVariant, TransactionsProvider, WithdrawalsProvider,
ChangeSetReader, DatabaseProviderFactory, HeaderProvider, ReceiptProvider,
ReceiptProviderIdExt, RequestsProvider, StateProviderFactory, TransactionVariant,
TransactionsProvider, WithdrawalsProvider,
};
use reth_testing_utils::generators::{
self, random_block, random_block_range, random_changeset_range, random_eoa_accounts,
@ -1492,7 +1493,7 @@ mod tests {
.collect();
let factory = create_test_provider_factory_with_chain_spec(chain_spec);
let provider_rw = factory.provider_rw()?;
let provider_rw = factory.database_provider_rw()?;
// Insert blocks into the database
for block in &database_blocks {

View File

@ -793,8 +793,9 @@ mod tests {
);
// Checkpoint and no gap
let static_file_provider = provider.static_file_provider();
let mut static_file_writer =
provider.static_file_provider().latest_writer(StaticFileSegment::Headers).unwrap();
static_file_provider.latest_writer(StaticFileSegment::Headers).unwrap();
static_file_writer.append_header(head.header(), U256::ZERO, &head.hash()).unwrap();
static_file_writer.commit().unwrap();
drop(static_file_writer);

View File

@ -12,8 +12,8 @@ use crate::{
HistoricalStateProvider, HistoryWriter, LatestStateProvider, OriginalValuesKnown,
ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RequestsProvider, RevertsInit,
StageCheckpointReader, StateChangeWriter, StateProviderBox, StateReader, StateWriter,
StatsReader, StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider,
TransactionsProviderExt, TrieWriter, WithdrawalsProvider,
StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
TransactionsProvider, TransactionsProviderExt, TrieWriter, WithdrawalsProvider,
};
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256};
@ -33,7 +33,7 @@ use reth_db_api::{
},
table::Table,
transaction::{DbTx, DbTxMut},
DatabaseError,
DatabaseError, DbTxUnwindExt,
};
use reth_evm::ConfigureEvmEnv;
use reth_execution_types::{Chain, ExecutionOutcome};
@ -132,17 +132,19 @@ pub struct DatabaseProvider<TX> {
}
impl<TX> DatabaseProvider<TX> {
/// Returns a static file provider
pub const fn static_file_provider(&self) -> &StaticFileProvider {
&self.static_file_provider
}
/// Returns reference to prune modes.
pub const fn prune_modes_ref(&self) -> &PruneModes {
&self.prune_modes
}
}
impl<TX> StaticFileProviderFactory for DatabaseProvider<TX> {
/// Returns a static file provider
fn static_file_provider(&self) -> StaticFileProvider {
self.static_file_provider.clone()
}
}
impl<TX: DbTxMut> DatabaseProvider<TX> {
/// Creates a provider with an inner read-write transaction.
pub const fn new_rw(
@ -204,7 +206,7 @@ impl<TX: DbTx + 'static> TryIntoHistoricalStateProvider for DatabaseProvider<TX>
}
}
impl<DB: Database> DatabaseProviderRW<DB> {
impl<Tx: DbTx + DbTxMut + 'static> DatabaseProvider<Tx> {
// TODO: uncomment below, once `reth debug_cmd` has been feature gated with dev.
// #[cfg(any(test, feature = "test-utils"))]
/// Inserts an historical block. **Used for setting up test environments**
@ -953,216 +955,6 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
Ok(self.tx.commit()?)
}
/// Remove the last N blocks of state.
///
/// The latest state will be unwound
///
/// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
/// transaction ids.
/// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
/// [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
/// the changesets.
/// - In order to have both the old and new values in the changesets, we also access the
/// plain state tables.
/// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
/// we:
/// 1. Take the old value from the changeset
/// 2. Take the new value from the plain state
/// 3. Save the old value to the local state
/// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
/// have seen before we:
/// 1. Take the old value from the changeset
/// 2. Take the new value from the local state
/// 3. Set the local state to the value in the changeset
pub fn remove_state(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
if range.is_empty() {
return Ok(())
}
// We are not removing block meta as it is used to get block changesets.
let block_bodies = self.get::<tables::BlockBodyIndices>(range.clone())?;
// get transaction receipts
let from_transaction_num =
block_bodies.first().expect("already checked if there are blocks").1.first_tx_num();
let to_transaction_num =
block_bodies.last().expect("already checked if there are blocks").1.last_tx_num();
let storage_range = BlockNumberAddress::range(range.clone());
let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
// This is not working for blocks that are not at tip. as plain state is not the last
// state of end range. We should rename the functions or add support to access
// History state. Accessing history state can be tricky but we are not gaining
// anything.
let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
let (state, _) = self.populate_bundle_state(
account_changeset,
storage_changeset,
&mut plain_accounts_cursor,
&mut plain_storage_cursor,
)?;
// iterate over local plain state remove all account and all storages.
for (address, (old_account, new_account, storage)) in &state {
// revert account if needed.
if old_account != new_account {
let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
if let Some(account) = old_account {
plain_accounts_cursor.upsert(*address, *account)?;
} else if existing_entry.is_some() {
plain_accounts_cursor.delete_current()?;
}
}
// revert storages
for (storage_key, (old_storage_value, _new_storage_value)) in storage {
let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
// delete previous value
// TODO: This does not use dupsort features
if plain_storage_cursor
.seek_by_key_subkey(*address, *storage_key)?
.filter(|s| s.key == *storage_key)
.is_some()
{
plain_storage_cursor.delete_current()?
}
// insert value if needed
if !old_storage_value.is_zero() {
plain_storage_cursor.upsert(*address, storage_entry)?;
}
}
}
// iterate over block body and remove receipts
self.remove::<tables::Receipts>(from_transaction_num..=to_transaction_num)?;
Ok(())
}
/// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
///
/// The latest state will be unwound and returned back with all the blocks
///
/// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
/// transaction ids.
/// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
/// [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
/// the changesets.
/// - In order to have both the old and new values in the changesets, we also access the
/// plain state tables.
/// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
/// we:
/// 1. Take the old value from the changeset
/// 2. Take the new value from the plain state
/// 3. Save the old value to the local state
/// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
/// have seen before we:
/// 1. Take the old value from the changeset
/// 2. Take the new value from the local state
/// 3. Set the local state to the value in the changeset
pub fn take_state(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<ExecutionOutcome> {
if range.is_empty() {
return Ok(ExecutionOutcome::default())
}
let start_block_number = *range.start();
// We are not removing block meta as it is used to get block changesets.
let block_bodies = self.get::<tables::BlockBodyIndices>(range.clone())?;
// get transaction receipts
let from_transaction_num =
block_bodies.first().expect("already checked if there are blocks").1.first_tx_num();
let to_transaction_num =
block_bodies.last().expect("already checked if there are blocks").1.last_tx_num();
let storage_range = BlockNumberAddress::range(range.clone());
let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
// This is not working for blocks that are not at tip. as plain state is not the last
// state of end range. We should rename the functions or add support to access
// History state. Accessing history state can be tricky but we are not gaining
// anything.
let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
// populate bundle state and reverts from changesets / state cursors, to iterate over,
// remove, and return later
let (state, reverts) = self.populate_bundle_state(
account_changeset,
storage_changeset,
&mut plain_accounts_cursor,
&mut plain_storage_cursor,
)?;
// iterate over local plain state remove all account and all storages.
for (address, (old_account, new_account, storage)) in &state {
// revert account if needed.
if old_account != new_account {
let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
if let Some(account) = old_account {
plain_accounts_cursor.upsert(*address, *account)?;
} else if existing_entry.is_some() {
plain_accounts_cursor.delete_current()?;
}
}
// revert storages
for (storage_key, (old_storage_value, _new_storage_value)) in storage {
let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
// delete previous value
// TODO: This does not use dupsort features
if plain_storage_cursor
.seek_by_key_subkey(*address, *storage_key)?
.filter(|s| s.key == *storage_key)
.is_some()
{
plain_storage_cursor.delete_current()?
}
// insert value if needed
if !old_storage_value.is_zero() {
plain_storage_cursor.upsert(*address, storage_entry)?;
}
}
}
// iterate over block body and create ExecutionResult
let mut receipt_iter =
self.take::<tables::Receipts>(from_transaction_num..=to_transaction_num)?.into_iter();
let mut receipts = Vec::with_capacity(block_bodies.len());
// loop break if we are at the end of the blocks.
for (_, block_body) in block_bodies {
let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
for _ in block_body.tx_num_range() {
if let Some((_, receipt)) = receipt_iter.next() {
block_receipts.push(Some(receipt));
}
}
receipts.push(block_receipts);
}
Ok(ExecutionOutcome::new_init(
state,
reverts,
Vec::new(),
receipts.into(),
start_block_number,
Vec::new(),
))
}
/// Remove list of entries from the table. Returns the number of entries removed.
#[inline]
pub fn remove<T: Table>(
@ -1356,7 +1148,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
return Ok(())
}
self.unwind_table_by_walker::<tables::CanonicalHeaders, tables::HeaderNumbers>(
self.tx.unwind_table_by_walker::<tables::CanonicalHeaders, tables::HeaderNumbers>(
range.clone(),
)?;
self.remove::<tables::CanonicalHeaders>(range.clone())?;
@ -1399,7 +1191,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
return Ok(Vec::new())
}
self.unwind_table_by_walker::<tables::CanonicalHeaders, tables::HeaderNumbers>(
self.tx.unwind_table_by_walker::<tables::CanonicalHeaders, tables::HeaderNumbers>(
range.clone(),
)?;
let block_header_hashes = self.take::<tables::CanonicalHeaders>(range.clone())?;
@ -1480,65 +1272,6 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
Ok(blocks)
}
/// Unwind table by some number key.
/// Returns number of rows unwound.
///
/// Note: Key is not inclusive and specified key would stay in db.
#[inline]
pub fn unwind_table_by_num<T>(&self, num: u64) -> Result<usize, DatabaseError>
where
T: Table<Key = u64>,
{
self.unwind_table::<T, _>(num, |key| key)
}
/// Unwind the table to a provided number key.
/// Returns number of rows unwound.
///
/// Note: Key is not inclusive and specified key would stay in db.
pub(crate) fn unwind_table<T, F>(
&self,
key: u64,
mut selector: F,
) -> Result<usize, DatabaseError>
where
T: Table,
F: FnMut(T::Key) -> u64,
{
let mut cursor = self.tx.cursor_write::<T>()?;
let mut reverse_walker = cursor.walk_back(None)?;
let mut deleted = 0;
while let Some(Ok((entry_key, _))) = reverse_walker.next() {
if selector(entry_key.clone()) <= key {
break
}
reverse_walker.delete_current()?;
deleted += 1;
}
Ok(deleted)
}
/// Unwind a table forward by a [`Walker`][reth_db_api::cursor::Walker] on another table.
///
/// Note: Range is inclusive and first key in the range is removed.
pub fn unwind_table_by_walker<T1, T2>(
&self,
range: impl RangeBounds<T1::Key>,
) -> Result<(), DatabaseError>
where
T1: Table,
T2: Table<Key = T1::Value>,
{
let mut cursor = self.tx.cursor_write::<T1>()?;
let mut walker = cursor.walk_range(range)?;
while let Some((_, value)) = walker.next().transpose()? {
self.tx.delete::<T2>(value, None)?;
}
Ok(())
}
/// Load shard and remove it. If list is empty, last shard was full or
/// there are no shards at all.
fn take_shard<T>(&self, key: T::Key) -> ProviderResult<Vec<u64>>
@ -2717,6 +2450,213 @@ impl<TX: DbTxMut + DbTx> StateChangeWriter for DatabaseProvider<TX> {
Ok(())
}
/// Remove the last N blocks of state.
///
/// The latest state will be unwound
///
/// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
/// transaction ids.
/// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
/// [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
/// the changesets.
/// - In order to have both the old and new values in the changesets, we also access the
/// plain state tables.
/// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
/// we:
/// 1. Take the old value from the changeset
/// 2. Take the new value from the plain state
/// 3. Save the old value to the local state
/// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
/// have seen before we:
/// 1. Take the old value from the changeset
/// 2. Take the new value from the local state
/// 3. Set the local state to the value in the changeset
fn remove_state(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
if range.is_empty() {
return Ok(())
}
// We are not removing block meta as it is used to get block changesets.
let block_bodies = self.get::<tables::BlockBodyIndices>(range.clone())?;
// get transaction receipts
let from_transaction_num =
block_bodies.first().expect("already checked if there are blocks").1.first_tx_num();
let to_transaction_num =
block_bodies.last().expect("already checked if there are blocks").1.last_tx_num();
let storage_range = BlockNumberAddress::range(range.clone());
let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
// This is not working for blocks that are not at tip. as plain state is not the last
// state of end range. We should rename the functions or add support to access
// History state. Accessing history state can be tricky but we are not gaining
// anything.
let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
let (state, _) = self.populate_bundle_state(
account_changeset,
storage_changeset,
&mut plain_accounts_cursor,
&mut plain_storage_cursor,
)?;
// iterate over local plain state remove all account and all storages.
for (address, (old_account, new_account, storage)) in &state {
// revert account if needed.
if old_account != new_account {
let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
if let Some(account) = old_account {
plain_accounts_cursor.upsert(*address, *account)?;
} else if existing_entry.is_some() {
plain_accounts_cursor.delete_current()?;
}
}
// revert storages
for (storage_key, (old_storage_value, _new_storage_value)) in storage {
let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
// delete previous value
// TODO: This does not use dupsort features
if plain_storage_cursor
.seek_by_key_subkey(*address, *storage_key)?
.filter(|s| s.key == *storage_key)
.is_some()
{
plain_storage_cursor.delete_current()?
}
// insert value if needed
if !old_storage_value.is_zero() {
plain_storage_cursor.upsert(*address, storage_entry)?;
}
}
}
// iterate over block body and remove receipts
self.remove::<tables::Receipts>(from_transaction_num..=to_transaction_num)?;
Ok(())
}
/// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
///
/// The latest state will be unwound and returned back with all the blocks
///
/// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
/// transaction ids.
/// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
/// [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
/// the changesets.
/// - In order to have both the old and new values in the changesets, we also access the
/// plain state tables.
/// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
/// we:
/// 1. Take the old value from the changeset
/// 2. Take the new value from the plain state
/// 3. Save the old value to the local state
/// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
/// have seen before we:
/// 1. Take the old value from the changeset
/// 2. Take the new value from the local state
/// 3. Set the local state to the value in the changeset
fn take_state(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<ExecutionOutcome> {
if range.is_empty() {
return Ok(ExecutionOutcome::default())
}
let start_block_number = *range.start();
// We are not removing block meta as it is used to get block changesets.
let block_bodies = self.get::<tables::BlockBodyIndices>(range.clone())?;
// get transaction receipts
let from_transaction_num =
block_bodies.first().expect("already checked if there are blocks").1.first_tx_num();
let to_transaction_num =
block_bodies.last().expect("already checked if there are blocks").1.last_tx_num();
let storage_range = BlockNumberAddress::range(range.clone());
let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
// This is not working for blocks that are not at tip. as plain state is not the last
// state of end range. We should rename the functions or add support to access
// History state. Accessing history state can be tricky but we are not gaining
// anything.
let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
// populate bundle state and reverts from changesets / state cursors, to iterate over,
// remove, and return later
let (state, reverts) = self.populate_bundle_state(
account_changeset,
storage_changeset,
&mut plain_accounts_cursor,
&mut plain_storage_cursor,
)?;
// iterate over local plain state remove all account and all storages.
for (address, (old_account, new_account, storage)) in &state {
// revert account if needed.
if old_account != new_account {
let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
if let Some(account) = old_account {
plain_accounts_cursor.upsert(*address, *account)?;
} else if existing_entry.is_some() {
plain_accounts_cursor.delete_current()?;
}
}
// revert storages
for (storage_key, (old_storage_value, _new_storage_value)) in storage {
let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
// delete previous value
// TODO: This does not use dupsort features
if plain_storage_cursor
.seek_by_key_subkey(*address, *storage_key)?
.filter(|s| s.key == *storage_key)
.is_some()
{
plain_storage_cursor.delete_current()?
}
// insert value if needed
if !old_storage_value.is_zero() {
plain_storage_cursor.upsert(*address, storage_entry)?;
}
}
}
// iterate over block body and create ExecutionResult
let mut receipt_iter =
self.take::<tables::Receipts>(from_transaction_num..=to_transaction_num)?.into_iter();
let mut receipts = Vec::with_capacity(block_bodies.len());
// loop break if we are at the end of the blocks.
for (_, block_body) in block_bodies {
let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
for _ in block_body.tx_num_range() {
if let Some((_, receipt)) = receipt_iter.next() {
block_receipts.push(Some(receipt));
}
}
receipts.push(block_receipts);
}
Ok(ExecutionOutcome::new_init(
state,
reverts,
Vec::new(),
receipts.into(),
start_block_number,
Vec::new(),
))
}
}
impl<TX: DbTxMut + DbTx> TrieWriter for DatabaseProvider<TX> {

View File

@ -6,6 +6,7 @@ use revm::db::{
states::{PlainStateReverts, StateChangeset},
OriginalValuesKnown,
};
use std::ops::RangeInclusive;
/// A helper trait for [`ExecutionOutcome`] to write state and receipts to storage.
pub trait StateWriter {
@ -34,4 +35,10 @@ pub trait StateChangeWriter {
/// Writes the hashed state changes to the database
fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()>;
/// Remove the block range of state.
fn remove_state(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()>;
/// Take the block range of state, recreating the [`ExecutionOutcome`].
fn take_state(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<ExecutionOutcome>;
}

View File

@ -46,11 +46,8 @@ impl<'a, ProviderDB, ProviderSF> UnifiedStorageWriter<'a, ProviderDB, ProviderSF
/// # Parameters
/// - `database`: An optional reference to a database provider.
/// - `static_file`: An optional mutable reference to a static file instance.
pub fn new<P>(database: &'a P, static_file: Option<ProviderSF>) -> Self
where
P: AsRef<ProviderDB>,
{
Self { database: database.as_ref(), static_file }
pub const fn new(database: &'a ProviderDB, static_file: Option<ProviderSF>) -> Self {
Self { database, static_file }
}
/// Creates a new instance of [`UnifiedStorageWriter`] from a database provider and a static
@ -59,7 +56,7 @@ impl<'a, ProviderDB, ProviderSF> UnifiedStorageWriter<'a, ProviderDB, ProviderSF
where
P: AsRef<ProviderDB>,
{
Self::new(database, Some(static_file))
Self::new(database.as_ref(), Some(static_file))
}
/// Creates a new instance of [`UnifiedStorageWriter`] from a database provider.
@ -67,7 +64,7 @@ impl<'a, ProviderDB, ProviderSF> UnifiedStorageWriter<'a, ProviderDB, ProviderSF
where
P: AsRef<ProviderDB>,
{
Self::new(database, None)
Self::new(database.as_ref(), None)
}
/// Returns a reference to the database writer.
@ -550,6 +547,7 @@ mod tests {
transaction::{DbTx, DbTxMut},
};
use reth_primitives::{Account, Address, Receipt, Receipts, StorageEntry};
use reth_storage_api::DatabaseProviderFactory;
use reth_trie::{
test_utils::{state_root, storage_root_prehashed},
HashedPostState, HashedStorage, StateRoot, StorageRoot,
@ -756,7 +754,7 @@ mod tests {
#[test]
fn write_to_db_storage() {
let factory = create_test_provider_factory();
let provider = factory.provider_rw().unwrap();
let provider = factory.database_provider_rw().unwrap();
let address_a = Address::ZERO;
let address_b = Address::repeat_byte(0xff);
@ -951,7 +949,7 @@ mod tests {
#[test]
fn write_to_db_multiple_selfdestructs() {
let factory = create_test_provider_factory();
let provider = factory.provider_rw().unwrap();
let provider = factory.database_provider_rw().unwrap();
let address1 = Address::random();
let account_info = RevmAccountInfo { nonce: 1, ..Default::default() };
@ -1266,7 +1264,7 @@ mod tests {
#[test]
fn storage_change_after_selfdestruct_within_block() {
let factory = create_test_provider_factory();
let provider = factory.provider_rw().unwrap();
let provider = factory.database_provider_rw().unwrap();
let address1 = Address::random();
let account1 = RevmAccountInfo { nonce: 1, ..Default::default() };
@ -1416,7 +1414,7 @@ mod tests {
.collect();
let provider_factory = create_test_provider_factory();
let provider_rw = provider_factory.provider_rw().unwrap();
let provider_rw = provider_factory.database_provider_rw().unwrap();
// insert initial state to the database
let tx = provider_rw.tx_ref();

View File

@ -9,7 +9,7 @@ use rayon::iter::{ParallelBridge, ParallelIterator};
use reth_primitives::{BlockBody, SealedBlock, StaticFileSegment};
use reth_provider::{
providers::StaticFileWriter, test_utils::create_test_provider_factory_with_chain_spec,
HashingWriter,
DatabaseProviderFactory, HashingWriter, StaticFileProviderFactory,
};
use reth_stages::{stages::ExecutionStage, ExecInput, Stage};
use std::{collections::BTreeMap, fs, path::Path, sync::Arc};
@ -86,7 +86,7 @@ impl Case for BlockchainTestCase {
let provider = create_test_provider_factory_with_chain_spec(Arc::new(
case.network.clone().into(),
))
.provider_rw()
.database_provider_rw()
.unwrap();
// Insert initial test state into the provider.
@ -102,10 +102,9 @@ impl Case for BlockchainTestCase {
// Initialize receipts static file with genesis
{
let mut receipts_writer = provider
.static_file_provider()
.latest_writer(StaticFileSegment::Receipts)
.unwrap();
let static_file_provider = provider.static_file_provider();
let mut receipts_writer =
static_file_provider.latest_writer(StaticFileSegment::Receipts).unwrap();
receipts_writer.increment_block(0).unwrap();
receipts_writer.commit_without_sync_all().unwrap();
}