feat: make StaticFileProvider generic over NodePrimitives (#12565)

This commit is contained in:
Arsenii Kulikov
2024-11-15 14:42:58 +04:00
committed by GitHub
parent cd9da550da
commit 72a52d5ea5
38 changed files with 324 additions and 222 deletions

1
Cargo.lock generated
View File

@ -8140,6 +8140,7 @@ dependencies = [
"reqwest",
"reth-db-api",
"reth-metrics",
"reth-primitives-traits",
"reth-provider",
"reth-tasks",
"socket2",

View File

@ -109,7 +109,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Environmen
&self,
config: &Config,
db: Arc<DatabaseEnv>,
static_file_provider: StaticFileProvider,
static_file_provider: StaticFileProvider<N::Primitives>,
) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>> {
let has_receipt_pruning = config.prune.as_ref().map_or(false, |a| a.has_receipts_pruning());
let prune_modes =

View File

@ -9,7 +9,9 @@ use reth_db::{mdbx, static_file::iter_static_files, DatabaseEnv, TableViewer, Ta
use reth_db_api::database::Database;
use reth_db_common::DbTool;
use reth_fs_util as fs;
use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter, NodeTypesWithEngine};
use reth_node_builder::{
NodePrimitives, NodeTypesWithDB, NodeTypesWithDBAdapter, NodeTypesWithEngine,
};
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_provider::providers::{ProviderNodeTypes, StaticFileProvider};
use reth_static_file_types::SegmentRangeInclusive;
@ -49,7 +51,7 @@ impl Command {
println!("\n");
}
let static_files_stats_table = self.static_files_stats_table(data_dir)?;
let static_files_stats_table = self.static_files_stats_table::<N::Primitives>(data_dir)?;
println!("{static_files_stats_table}");
println!("\n");
@ -143,7 +145,7 @@ impl Command {
Ok(table)
}
fn static_files_stats_table(
fn static_files_stats_table<N: NodePrimitives>(
&self,
data_dir: ChainPath<DataDirPath>,
) -> eyre::Result<ComfyTable> {
@ -173,7 +175,8 @@ impl Command {
}
let static_files = iter_static_files(data_dir.static_files())?;
let static_file_provider = StaticFileProvider::read_only(data_dir.static_files(), false)?;
let static_file_provider =
StaticFileProvider::<N>::read_only(data_dir.static_files(), false)?;
let mut total_data_size = 0;
let mut total_index_size = 0;

View File

@ -97,7 +97,6 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> InitStateC
if last_block_number == 0 {
without_evm::setup_without_evm(
&provider_rw,
&static_file_provider,
// &header,
// header_hash,
SealedHeader::new(header, header_hash),

View File

@ -2,11 +2,13 @@ use alloy_primitives::{BlockNumber, B256, U256};
use alloy_rlp::Decodable;
use alloy_consensus::Header;
use reth_node_builder::NodePrimitives;
use reth_primitives::{
BlockBody, SealedBlock, SealedBlockWithSenders, SealedHeader, StaticFileSegment,
};
use reth_provider::{
providers::StaticFileProvider, BlockWriter, StageCheckpointWriter, StaticFileWriter,
providers::StaticFileProvider, BlockWriter, StageCheckpointWriter, StaticFileProviderFactory,
StaticFileWriter,
};
use reth_stages::{StageCheckpoint, StageId};
@ -27,21 +29,21 @@ pub(crate) fn read_header_from_file(path: PathBuf) -> Result<Header, eyre::Error
/// first valid block.
pub fn setup_without_evm<Provider>(
provider_rw: &Provider,
static_file_provider: &StaticFileProvider,
header: SealedHeader,
total_difficulty: U256,
) -> Result<(), eyre::Error>
where
Provider: StageCheckpointWriter + BlockWriter,
Provider: StaticFileProviderFactory + StageCheckpointWriter + BlockWriter,
{
info!(target: "reth::cli", "Setting up dummy EVM chain before importing state.");
let static_file_provider = provider_rw.static_file_provider();
// Write EVM dummy data up to `header - 1` block
append_dummy_chain(static_file_provider, header.number - 1)?;
append_dummy_chain(&static_file_provider, header.number - 1)?;
info!(target: "reth::cli", "Appending first valid block.");
append_first_block(provider_rw, static_file_provider, &header, total_difficulty)?;
append_first_block(provider_rw, &header, total_difficulty)?;
for stage in StageId::ALL {
provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(header.number))?;
@ -56,17 +58,21 @@ where
///
/// By appending it, static file writer also verifies that all segments are at the same
/// height.
fn append_first_block(
provider_rw: impl BlockWriter,
sf_provider: &StaticFileProvider,
fn append_first_block<Provider>(
provider_rw: &Provider,
header: &SealedHeader,
total_difficulty: U256,
) -> Result<(), eyre::Error> {
) -> Result<(), eyre::Error>
where
Provider: BlockWriter + StaticFileProviderFactory,
{
provider_rw.insert_block(
SealedBlockWithSenders::new(SealedBlock::new(header.clone(), BlockBody::default()), vec![])
.expect("no senders or txes"),
)?;
let sf_provider = provider_rw.static_file_provider();
sf_provider.latest_writer(StaticFileSegment::Headers)?.append_header(
header,
total_difficulty,
@ -85,8 +91,8 @@ fn append_first_block(
/// * Headers: It will push an empty block.
/// * Transactions: It will not push any tx, only increments the end block range.
/// * Receipts: It will not push any receipt, only increments the end block range.
fn append_dummy_chain(
sf_provider: &StaticFileProvider,
fn append_dummy_chain<N: NodePrimitives>(
sf_provider: &StaticFileProvider<N>,
target_height: BlockNumber,
) -> Result<(), eyre::Error> {
let (tx, rx) = std::sync::mpsc::channel();

View File

@ -12,7 +12,9 @@ use reth_db_common::{
};
use reth_node_builder::NodeTypesWithEngine;
use reth_node_core::args::StageEnum;
use reth_provider::{writer::UnifiedStorageWriter, StaticFileProviderFactory};
use reth_provider::{
writer::UnifiedStorageWriter, DatabaseProviderFactory, StaticFileProviderFactory,
};
use reth_prune::PruneSegment;
use reth_stages::StageId;
use reth_static_file_types::StaticFileSegment;
@ -33,8 +35,6 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
) -> eyre::Result<()> {
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RW)?;
let static_file_provider = provider_factory.static_file_provider();
let tool = DbTool::new(provider_factory)?;
let static_file_segment = match self.stage {
@ -60,7 +60,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
}
}
let provider_rw = tool.provider_factory.provider_rw()?;
let provider_rw = tool.provider_factory.database_provider_rw()?;
let tx = provider_rw.tx_ref();
match self.stage {
@ -71,7 +71,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
tx.clear::<tables::HeaderNumbers>()?;
reset_stage_checkpoint(tx, StageId::Headers)?;
insert_genesis_header(&provider_rw.0, &static_file_provider, &self.env.chain)?;
insert_genesis_header(&provider_rw, &self.env.chain)?;
}
StageEnum::Bodies => {
tx.clear::<tables::BlockBodyIndices>()?;
@ -83,7 +83,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
tx.clear::<tables::BlockWithdrawals>()?;
reset_stage_checkpoint(tx, StageId::Bodies)?;
insert_genesis_header(&provider_rw.0, &static_file_provider, &self.env.chain)?;
insert_genesis_header(&provider_rw, &self.env.chain)?;
}
StageEnum::Senders => {
tx.clear::<tables::TransactionSenders>()?;
@ -104,7 +104,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
reset_stage_checkpoint(tx, StageId::Execution)?;
let alloc = &self.env.chain.genesis().alloc;
insert_genesis_state(&provider_rw.0, alloc.iter())?;
insert_genesis_state(&provider_rw, alloc.iter())?;
}
StageEnum::AccountHashing => {
tx.clear::<tables::HashedAccounts>()?;
@ -142,20 +142,20 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
reset_stage_checkpoint(tx, StageId::IndexStorageHistory)?;
insert_genesis_history(&provider_rw.0, self.env.chain.genesis().alloc.iter())?;
insert_genesis_history(&provider_rw, self.env.chain.genesis().alloc.iter())?;
}
StageEnum::TxLookup => {
tx.clear::<tables::TransactionHashNumbers>()?;
reset_prune_checkpoint(tx, PruneSegment::TransactionLookup)?;
reset_stage_checkpoint(tx, StageId::TransactionLookup)?;
insert_genesis_header(&provider_rw.0, &static_file_provider, &self.env.chain)?;
insert_genesis_header(&provider_rw, &self.env.chain)?;
}
}
tx.put::<tables::StageCheckpoints>(StageId::Finish.to_string(), Default::default())?;
UnifiedStorageWriter::commit_unwind(provider_rw, static_file_provider)?;
UnifiedStorageWriter::commit_unwind(provider_rw)?;
Ok(())
}

View File

@ -329,10 +329,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
}
if self.commit {
UnifiedStorageWriter::commit_unwind(
provider_rw,
provider_factory.static_file_provider(),
)?;
UnifiedStorageWriter::commit_unwind(provider_rw)?;
provider_rw = provider_factory.database_provider_rw()?;
}
}
@ -355,7 +352,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
provider_rw.save_stage_checkpoint(exec_stage.id(), checkpoint)?;
}
if self.commit {
UnifiedStorageWriter::commit(provider_rw, provider_factory.static_file_provider())?;
UnifiedStorageWriter::commit(provider_rw)?;
provider_rw = provider_factory.database_provider_rw()?;
}

View File

@ -120,7 +120,7 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?;
UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)?;
UnifiedStorageWriter::commit_unwind(provider_rw)?;
debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
@ -142,7 +142,7 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
let static_file_provider = self.provider.static_file_provider();
UnifiedStorageWriter::from(&provider_rw, &static_file_provider).save_blocks(&blocks)?;
UnifiedStorageWriter::commit(provider_rw, static_file_provider)?;
UnifiedStorageWriter::commit(provider_rw)?;
}
self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
Ok(last_block_hash_num)

View File

@ -495,7 +495,7 @@ where
}
/// Returns the static file provider to interact with the static files.
pub fn static_file_provider(&self) -> StaticFileProvider {
pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
self.right().static_file_provider()
}
@ -766,7 +766,7 @@ where
}
/// Returns the static file provider to interact with the static files.
pub fn static_file_provider(&self) -> StaticFileProvider {
pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
self.provider_factory().static_file_provider()
}

View File

@ -9,6 +9,7 @@ repository.workspace = true
[dependencies]
reth-db-api.workspace = true
reth-primitives-traits.workspace = true
reth-provider.workspace = true
reth-metrics.workspace = true
reth-tasks.workspace = true

View File

@ -1,7 +1,12 @@
use metrics_process::Collector;
use reth_db_api::database_metrics::DatabaseMetrics;
use reth_primitives_traits::NodePrimitives;
use reth_provider::providers::StaticFileProvider;
use std::{fmt, sync::Arc};
use std::{
fmt::{self},
sync::Arc,
};
pub(crate) trait Hook: Fn() + Send + Sync {}
impl<T: Fn() + Send + Sync> Hook for T {}
@ -22,10 +27,11 @@ pub struct Hooks {
impl Hooks {
/// Create a new set of hooks
pub fn new<Metrics: DatabaseMetrics + 'static + Send + Sync>(
db: Metrics,
static_file_provider: StaticFileProvider,
) -> Self {
pub fn new<Metrics, N>(db: Metrics, static_file_provider: StaticFileProvider<N>) -> Self
where
Metrics: DatabaseMetrics + 'static + Send + Sync,
N: NodePrimitives,
{
let hooks: Vec<Box<dyn Hook<Output = ()>>> = vec![
Box::new(move || db.report_metrics()),
Box::new(move || {

View File

@ -150,7 +150,7 @@ where
}
}
let provider = provider_factory.provider_rw()?;
let provider = provider_factory.database_provider_rw()?;
let mut total_decoded_receipts = 0;
let mut total_receipts = 0;
let mut total_filtered_out_dup_txns = 0;
@ -247,7 +247,7 @@ where
provider
.save_stage_checkpoint(StageId::Execution, StageCheckpoint::new(highest_block_receipts))?;
UnifiedStorageWriter::commit(provider, static_file_provider)?;
UnifiedStorageWriter::commit(provider)?;
Ok(ImportReceiptsResult { total_decoded_receipts, total_filtered_out_dup_txns })
}

View File

@ -54,7 +54,6 @@ impl<C: ChainSpecParser<ChainSpec = OpChainSpec>> InitStateCommandOp<C> {
if last_block_number == 0 {
reth_cli_commands::init_state::without_evm::setup_without_evm(
&provider_rw,
&static_file_provider,
SealedHeader::new(BEDROCK_HEADER, BEDROCK_HEADER_HASH),
BEDROCK_HEADER_TTD,
)?;

View File

@ -3,7 +3,7 @@ use core::fmt;
use crate::{BlockBody, FullBlock, FullReceipt, FullSignedTx, FullTxType};
/// Configures all the primitive types of the node.
pub trait NodePrimitives: Send + Sync + Unpin + Clone + Default + fmt::Debug {
pub trait NodePrimitives: Send + Sync + Unpin + Clone + Default + fmt::Debug + 'static {
/// Block primitive.
type Block: Send + Sync + Unpin + Clone + Default + fmt::Debug + 'static;
/// Signed version of the transaction type.
@ -22,7 +22,7 @@ impl NodePrimitives for () {
}
/// Helper trait that sets trait bounds on [`NodePrimitives`].
pub trait FullNodePrimitives: Send + Sync + Unpin + Clone + Default + fmt::Debug {
pub trait FullNodePrimitives: Send + Sync + Unpin + Clone + Default + fmt::Debug + 'static {
/// Block primitive.
type Block: FullBlock<Body: BlockBody<Transaction = Self::SignedTx>>;
/// Signed version of the transaction type.

View File

@ -76,8 +76,11 @@ impl PrunerBuilder {
/// Builds a [Pruner] from the current configuration with the given provider factory.
pub fn build_with_provider_factory<PF>(self, provider_factory: PF) -> Pruner<PF::ProviderRW, PF>
where
PF: DatabaseProviderFactory<ProviderRW: PruneCheckpointWriter + BlockReader>
+ StaticFileProviderFactory,
PF: DatabaseProviderFactory<
ProviderRW: PruneCheckpointWriter + BlockReader + StaticFileProviderFactory,
> + StaticFileProviderFactory<
Primitives = <PF::ProviderRW as StaticFileProviderFactory>::Primitives,
>,
{
let segments =
SegmentSet::from_components(provider_factory.static_file_provider(), self.segments);
@ -93,10 +96,16 @@ impl PrunerBuilder {
}
/// Builds a [Pruner] from the current configuration with the given static file provider.
pub fn build<Provider>(self, static_file_provider: StaticFileProvider) -> Pruner<Provider, ()>
pub fn build<Provider>(
self,
static_file_provider: StaticFileProvider<Provider::Primitives>,
) -> Pruner<Provider, ()>
where
Provider:
DBProvider<Tx: DbTxMut> + BlockReader + PruneCheckpointWriter + TransactionsProvider,
Provider: StaticFileProviderFactory
+ DBProvider<Tx: DbTxMut>
+ BlockReader
+ PruneCheckpointWriter
+ TransactionsProvider,
{
let segments = SegmentSet::<Provider>::from_components(static_file_provider, self.segments);

View File

@ -5,7 +5,7 @@ use crate::segments::{
use reth_db::transaction::DbTxMut;
use reth_provider::{
providers::StaticFileProvider, BlockReader, DBProvider, PruneCheckpointWriter,
TransactionsProvider,
StaticFileProviderFactory, TransactionsProvider,
};
use reth_prune_types::PruneModes;
@ -45,12 +45,16 @@ impl<Provider> SegmentSet<Provider> {
impl<Provider> SegmentSet<Provider>
where
Provider: DBProvider<Tx: DbTxMut> + TransactionsProvider + PruneCheckpointWriter + BlockReader,
Provider: StaticFileProviderFactory
+ DBProvider<Tx: DbTxMut>
+ TransactionsProvider
+ PruneCheckpointWriter
+ BlockReader,
{
/// Creates a [`SegmentSet`] from an existing components, such as [`StaticFileProvider`] and
/// [`PruneModes`].
pub fn from_components(
static_file_provider: StaticFileProvider,
static_file_provider: StaticFileProvider<Provider::Primitives>,
prune_modes: PruneModes,
) -> Self {
let PruneModes {

View File

@ -12,7 +12,7 @@ use reth_db::{
tables,
transaction::DbTxMut,
};
use reth_provider::{providers::StaticFileProvider, DBProvider};
use reth_provider::{providers::StaticFileProvider, DBProvider, StaticFileProviderFactory};
use reth_prune_types::{
PruneLimiter, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput,
SegmentOutputCheckpoint,
@ -24,17 +24,19 @@ use tracing::trace;
const HEADER_TABLES_TO_PRUNE: usize = 3;
#[derive(Debug)]
pub struct Headers {
static_file_provider: StaticFileProvider,
pub struct Headers<N> {
static_file_provider: StaticFileProvider<N>,
}
impl Headers {
pub const fn new(static_file_provider: StaticFileProvider) -> Self {
impl<N> Headers<N> {
pub const fn new(static_file_provider: StaticFileProvider<N>) -> Self {
Self { static_file_provider }
}
}
impl<Provider: DBProvider<Tx: DbTxMut>> Segment<Provider> for Headers {
impl<Provider: StaticFileProviderFactory + DBProvider<Tx: DbTxMut>> Segment<Provider>
for Headers<Provider::Primitives>
{
fn segment(&self) -> PruneSegment {
PruneSegment::Headers
}

View File

@ -5,25 +5,29 @@ use crate::{
use reth_db::transaction::DbTxMut;
use reth_provider::{
errors::provider::ProviderResult, providers::StaticFileProvider, BlockReader, DBProvider,
PruneCheckpointWriter, TransactionsProvider,
PruneCheckpointWriter, StaticFileProviderFactory, TransactionsProvider,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, SegmentOutput};
use reth_static_file_types::StaticFileSegment;
#[derive(Debug)]
pub struct Receipts {
static_file_provider: StaticFileProvider,
pub struct Receipts<N> {
static_file_provider: StaticFileProvider<N>,
}
impl Receipts {
pub const fn new(static_file_provider: StaticFileProvider) -> Self {
impl<N> Receipts<N> {
pub const fn new(static_file_provider: StaticFileProvider<N>) -> Self {
Self { static_file_provider }
}
}
impl<Provider> Segment<Provider> for Receipts
impl<Provider> Segment<Provider> for Receipts<Provider::Primitives>
where
Provider: DBProvider<Tx: DbTxMut> + PruneCheckpointWriter + TransactionsProvider + BlockReader,
Provider: StaticFileProviderFactory
+ DBProvider<Tx: DbTxMut>
+ PruneCheckpointWriter
+ TransactionsProvider
+ BlockReader,
{
fn segment(&self) -> PruneSegment {
PruneSegment::Receipts

View File

@ -4,7 +4,10 @@ use crate::{
PrunerError,
};
use reth_db::{tables, transaction::DbTxMut};
use reth_provider::{providers::StaticFileProvider, BlockReader, DBProvider, TransactionsProvider};
use reth_provider::{
providers::StaticFileProvider, BlockReader, DBProvider, StaticFileProviderFactory,
TransactionsProvider,
};
use reth_prune_types::{
PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
@ -12,19 +15,20 @@ use reth_static_file_types::StaticFileSegment;
use tracing::trace;
#[derive(Debug)]
pub struct Transactions {
static_file_provider: StaticFileProvider,
pub struct Transactions<N> {
static_file_provider: StaticFileProvider<N>,
}
impl Transactions {
pub const fn new(static_file_provider: StaticFileProvider) -> Self {
impl<N> Transactions<N> {
pub const fn new(static_file_provider: StaticFileProvider<N>) -> Self {
Self { static_file_provider }
}
}
impl<Provider> Segment<Provider> for Transactions
impl<Provider> Segment<Provider> for Transactions<Provider::Primitives>
where
Provider: DBProvider<Tx: DbTxMut> + TransactionsProvider + BlockReader,
Provider:
DBProvider<Tx: DbTxMut> + TransactionsProvider + BlockReader + StaticFileProviderFactory,
{
fn segment(&self) -> PruneSegment {
PruneSegment::Transactions

View File

@ -9,7 +9,7 @@ use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, ChainStateBlockReader,
ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StageCheckpointReader,
StageCheckpointWriter, StaticFileProviderFactory,
StageCheckpointWriter,
};
use reth_prune::PrunerBuilder;
use reth_static_file::StaticFileProducer;
@ -358,10 +358,7 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
))?;
}
UnifiedStorageWriter::commit_unwind(
provider_rw,
self.provider_factory.static_file_provider(),
)?;
UnifiedStorageWriter::commit_unwind(provider_rw)?;
stage.post_unwind_commit()?;
@ -469,10 +466,7 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
result: out.clone(),
});
UnifiedStorageWriter::commit(
provider_rw,
self.provider_factory.static_file_provider(),
)?;
UnifiedStorageWriter::commit(provider_rw)?;
stage.post_execute_commit()?;
@ -533,7 +527,7 @@ fn on_stage_error<N: ProviderNodeTypes>(
prev_checkpoint.unwrap_or_default(),
)?;
UnifiedStorageWriter::commit(provider_rw, factory.static_file_provider())?;
UnifiedStorageWriter::commit(provider_rw)?;
// We unwind because of a validation error. If the unwind itself
// fails, we bail entirely,

View File

@ -311,11 +311,11 @@ where
fn missing_static_data_error<Provider>(
last_tx_num: TxNumber,
static_file_provider: &StaticFileProvider,
static_file_provider: &StaticFileProvider<Provider::Primitives>,
provider: &Provider,
) -> Result<StageError, ProviderError>
where
Provider: BlockReader,
Provider: BlockReader + StaticFileProviderFactory,
{
let mut last_block = static_file_provider
.get_highest_static_file_block(StaticFileSegment::Transactions)

View File

@ -12,7 +12,7 @@ use reth_evm::{
use reth_execution_types::Chain;
use reth_exex::{ExExManagerHandle, ExExNotification, ExExNotificationSource};
use reth_primitives::{SealedHeader, StaticFileSegment};
use reth_primitives_traits::format_gas_throughput;
use reth_primitives_traits::{format_gas_throughput, NodePrimitives};
use reth_provider::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter},
writer::UnifiedStorageWriter,
@ -181,7 +181,8 @@ where
+ StatsReader
+ StateChangeWriter
+ BlockHashReader,
for<'a> UnifiedStorageWriter<'a, Provider, StaticFileProviderRWRefMut<'a>>: StateWriter,
for<'a> UnifiedStorageWriter<'a, Provider, StaticFileProviderRWRefMut<'a, Provider::Primitives>>:
StateWriter,
{
/// Return the id of the stage
fn id(&self) -> StageId {
@ -485,8 +486,8 @@ where
}
}
fn execution_checkpoint(
provider: &StaticFileProvider,
fn execution_checkpoint<N: NodePrimitives>(
provider: &StaticFileProvider<N>,
start_block: BlockNumber,
max_block: BlockNumber,
checkpoint: StageCheckpoint,
@ -552,8 +553,8 @@ fn execution_checkpoint(
})
}
fn calculate_gas_used_from_headers(
provider: &StaticFileProvider,
fn calculate_gas_used_from_headers<N: NodePrimitives>(
provider: &StaticFileProvider<N>,
range: RangeInclusive<BlockNumber>,
) -> Result<u64, ProviderError> {
debug!(target: "sync::stages::execution", ?range, "Calculating gas used from headers");
@ -587,11 +588,11 @@ fn calculate_gas_used_from_headers(
/// (by returning [`StageError`]) until the heights in both the database and static file match.
fn prepare_static_file_producer<'a, 'b, Provider>(
provider: &'b Provider,
static_file_provider: &'a StaticFileProvider,
static_file_provider: &'a StaticFileProvider<Provider::Primitives>,
start_block: u64,
) -> Result<StaticFileProviderRWRefMut<'a>, StageError>
) -> Result<StaticFileProviderRWRefMut<'a, Provider::Primitives>, StageError>
where
Provider: DBProvider + BlockReader + HeaderProvider,
Provider: StaticFileProviderFactory + DBProvider + BlockReader + HeaderProvider,
'b: 'a,
{
// Get next expected receipt number

View File

@ -296,8 +296,8 @@ mod tests {
) {
// We recreate the static file provider, since consistency heals are done on fetching the
// writer for the first time.
let static_file_provider =
StaticFileProvider::read_write(db.factory.static_file_provider().path()).unwrap();
let mut static_file_provider = db.factory.static_file_provider();
static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
// Simulate corruption by removing `prune_count` rows from the data file without updating
// its offset list and configuration.
@ -314,9 +314,10 @@ mod tests {
// We recreate the static file provider, since consistency heals are done on fetching the
// writer for the first time.
let mut static_file_provider = db.factory.static_file_provider();
static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
assert_eq!(
StaticFileProvider::read_write(db.factory.static_file_provider().path())
.unwrap()
static_file_provider
.check_consistency(&db.factory.database_provider_ro().unwrap(), is_full_node,),
Ok(expected)
);

View File

@ -24,7 +24,7 @@ use reth_provider::{
};
use reth_storage_errors::provider::ProviderResult;
use reth_testing_utils::generators::ChangeSet;
use std::{collections::BTreeMap, path::Path};
use std::{collections::BTreeMap, fmt::Debug, path::Path};
use tempfile::TempDir;
/// Test database that is used for testing stage implementations.
@ -142,7 +142,7 @@ impl TestStageDB {
/// Insert header to static file if `writer` exists, otherwise to DB.
pub fn insert_header<TX: DbTx + DbTxMut>(
writer: Option<&mut StaticFileProviderRWRefMut<'_>>,
writer: Option<&mut StaticFileProviderRWRefMut<'_, ()>>,
tx: &TX,
header: &SealedHeader,
td: U256,

View File

@ -10,9 +10,7 @@ use reth_db_api::{transaction::DbTxMut, DatabaseError};
use reth_etl::Collector;
use reth_primitives::{Account, Bytecode, GotExpected, Receipts, StaticFileSegment, StorageEntry};
use reth_provider::{
errors::provider::ProviderResult,
providers::{StaticFileProvider, StaticFileWriter},
writer::UnifiedStorageWriter,
errors::provider::ProviderResult, providers::StaticFileWriter, writer::UnifiedStorageWriter,
BlockHashReader, BlockNumReader, BundleStateInit, ChainSpecProvider, DBProvider,
DatabaseProviderFactory, ExecutionOutcome, HashingWriter, HeaderProvider, HistoryWriter,
OriginalValuesKnown, ProviderError, RevertsInit, StageCheckpointWriter, StateChangeWriter,
@ -72,7 +70,8 @@ impl From<DatabaseError> for InitDatabaseError {
pub fn init_genesis<PF>(factory: &PF) -> Result<B256, InitDatabaseError>
where
PF: DatabaseProviderFactory + StaticFileProviderFactory + ChainSpecProvider + BlockHashReader,
PF::ProviderRW: StageCheckpointWriter
PF::ProviderRW: StaticFileProviderFactory
+ StageCheckpointWriter
+ HistoryWriter
+ HeaderProvider
+ HashingWriter
@ -114,8 +113,7 @@ where
insert_genesis_history(&provider_rw, alloc.iter())?;
// Insert header
let static_file_provider = factory.static_file_provider();
insert_genesis_header(&provider_rw, &static_file_provider, &chain)?;
insert_genesis_header(&provider_rw, &chain)?;
insert_genesis_state(&provider_rw, alloc.iter())?;
@ -124,6 +122,7 @@ where
provider_rw.save_stage_checkpoint(stage, Default::default())?;
}
let static_file_provider = provider_rw.static_file_provider();
// Static file segments start empty, so we need to initialize the genesis block.
let segment = StaticFileSegment::Receipts;
static_file_provider.latest_writer(segment)?.increment_block(0)?;
@ -133,7 +132,7 @@ where
// `commit_unwind`` will first commit the DB and then the static file provider, which is
// necessary on `init_genesis`.
UnifiedStorageWriter::commit_unwind(provider_rw, static_file_provider)?;
UnifiedStorageWriter::commit_unwind(provider_rw)?;
Ok(hash)
}
@ -144,7 +143,11 @@ pub fn insert_genesis_state<'a, 'b, Provider>(
alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)>,
) -> ProviderResult<()>
where
Provider: DBProvider<Tx: DbTxMut> + StateChangeWriter + HeaderProvider + AsRef<Provider>,
Provider: StaticFileProviderFactory
+ DBProvider<Tx: DbTxMut>
+ StateChangeWriter
+ HeaderProvider
+ AsRef<Provider>,
{
insert_state(provider, alloc, 0)
}
@ -156,7 +159,11 @@ pub fn insert_state<'a, 'b, Provider>(
block: u64,
) -> ProviderResult<()>
where
Provider: DBProvider<Tx: DbTxMut> + StateChangeWriter + HeaderProvider + AsRef<Provider>,
Provider: StaticFileProviderFactory
+ DBProvider<Tx: DbTxMut>
+ StateChangeWriter
+ HeaderProvider
+ AsRef<Provider>,
{
let capacity = alloc.size_hint().1.unwrap_or(0);
let mut state_init: BundleStateInit = HashMap::with_capacity(capacity);
@ -296,14 +303,14 @@ where
/// Inserts header for the genesis state.
pub fn insert_genesis_header<Provider, Spec>(
provider: &Provider,
static_file_provider: &StaticFileProvider,
chain: &Spec,
) -> ProviderResult<()>
where
Provider: DBProvider<Tx: DbTxMut>,
Provider: StaticFileProviderFactory + DBProvider<Tx: DbTxMut>,
Spec: EthChainSpec,
{
let (header, block_hash) = (chain.genesis_header(), chain.genesis_hash());
let static_file_provider = provider.static_file_provider();
match static_file_provider.block_hash(0) {
Ok(None) | Err(ProviderError::MissingStaticFileBlock(StaticFileSegment::Headers, 0)) => {
@ -333,7 +340,8 @@ pub fn init_from_state_dump<Provider>(
etl_config: EtlConfig,
) -> eyre::Result<B256>
where
Provider: DBProvider<Tx: DbTxMut>
Provider: StaticFileProviderFactory
+ DBProvider<Tx: DbTxMut>
+ BlockNumReader
+ BlockHashReader
+ ChainSpecProvider
@ -457,7 +465,8 @@ fn dump_state<Provider>(
block: u64,
) -> Result<(), eyre::Error>
where
Provider: DBProvider<Tx: DbTxMut>
Provider: StaticFileProviderFactory
+ DBProvider<Tx: DbTxMut>
+ HeaderProvider
+ HashingWriter
+ HistoryWriter

View File

@ -163,7 +163,9 @@ impl<N: ProviderNodeTypes> DatabaseProviderFactory for BlockchainProvider2<N> {
}
impl<N: ProviderNodeTypes> StaticFileProviderFactory for BlockchainProvider2<N> {
fn static_file_provider(&self) -> StaticFileProvider {
type Primitives = N::Primitives;
fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
self.database.static_file_provider()
}
}
@ -911,7 +913,7 @@ mod tests {
)?;
// Commit to both storages: database and static files
UnifiedStorageWriter::commit(provider_rw, factory.static_file_provider())?;
UnifiedStorageWriter::commit(provider_rw)?;
let provider = BlockchainProvider2::new(factory)?;
@ -999,8 +1001,7 @@ mod tests {
UnifiedStorageWriter::from(&provider_rw, &hook_provider.static_file_provider())
.save_blocks(&[lowest_memory_block])
.unwrap();
UnifiedStorageWriter::commit(provider_rw, hook_provider.static_file_provider())
.unwrap();
UnifiedStorageWriter::commit(provider_rw).unwrap();
// Remove from memory
hook_provider.canonical_in_memory_state.remove_persisted_blocks(num_hash);

View File

@ -612,7 +612,9 @@ impl<N: ProviderNodeTypes> ConsistentProvider<N> {
}
impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
fn static_file_provider(&self) -> StaticFileProvider {
type Primitives = N::Primitives;
fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
self.storage_provider.static_file_provider()
}
}

View File

@ -53,7 +53,7 @@ pub struct ProviderFactory<N: NodeTypesWithDB> {
/// Chain spec
chain_spec: Arc<N::ChainSpec>,
/// Static File Provider
static_file_provider: StaticFileProvider,
static_file_provider: StaticFileProvider<N::Primitives>,
/// Optional pruning configuration
prune_modes: PruneModes,
}
@ -78,7 +78,7 @@ impl<N: NodeTypesWithDB> ProviderFactory<N> {
pub fn new(
db: N::DB,
chain_spec: Arc<N::ChainSpec>,
static_file_provider: StaticFileProvider,
static_file_provider: StaticFileProvider<N::Primitives>,
) -> Self {
Self { db, chain_spec, static_file_provider, prune_modes: PruneModes::none() }
}
@ -114,7 +114,7 @@ impl<N: NodeTypesWithDB<DB = Arc<DatabaseEnv>>> ProviderFactory<N> {
path: P,
chain_spec: Arc<N::ChainSpec>,
args: DatabaseArguments,
static_file_provider: StaticFileProvider,
static_file_provider: StaticFileProvider<N::Primitives>,
) -> RethResult<Self> {
Ok(Self {
db: Arc::new(init_db(path, args).map_err(RethError::msg)?),
@ -202,8 +202,10 @@ impl<N: ProviderNodeTypes> DatabaseProviderFactory for ProviderFactory<N> {
}
impl<N: NodeTypesWithDB> StaticFileProviderFactory for ProviderFactory<N> {
type Primitives = N::Primitives;
/// Returns static file provider
fn static_file_provider(&self) -> StaticFileProvider {
fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
self.static_file_provider.clone()
}
}

View File

@ -135,7 +135,7 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
/// Chain spec
chain_spec: Arc<N::ChainSpec>,
/// Static File provider
static_file_provider: StaticFileProvider,
static_file_provider: StaticFileProvider<N::Primitives>,
/// Pruning configuration
prune_modes: PruneModes,
}
@ -199,8 +199,10 @@ impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
}
impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
type Primitives = N::Primitives;
/// Returns a static file provider
fn static_file_provider(&self) -> StaticFileProvider {
fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
self.static_file_provider.clone()
}
}
@ -220,7 +222,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
pub const fn new_rw(
tx: TX,
chain_spec: Arc<N::ChainSpec>,
static_file_provider: StaticFileProvider,
static_file_provider: StaticFileProvider<N::Primitives>,
prune_modes: PruneModes,
) -> Self {
Self { tx, chain_spec, static_file_provider, prune_modes }
@ -363,7 +365,7 @@ impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
pub const fn new(
tx: TX,
chain_spec: Arc<N::ChainSpec>,
static_file_provider: StaticFileProvider,
static_file_provider: StaticFileProvider<N::Primitives>,
prune_modes: PruneModes,
) -> Self {
Self { tx, chain_spec, static_file_provider, prune_modes }

View File

@ -204,7 +204,9 @@ impl<N: ProviderNodeTypes> DatabaseProviderFactory for BlockchainProvider<N> {
}
impl<N: ProviderNodeTypes> StaticFileProviderFactory for BlockchainProvider<N> {
fn static_file_provider(&self) -> StaticFileProvider {
type Primitives = N::Primitives;
fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
self.database.static_file_provider()
}
}

View File

@ -12,39 +12,49 @@ use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
use reth_chainspec::ChainInfo;
use reth_db::static_file::{HeaderMask, ReceiptMask, StaticFileCursor, TransactionMask};
use reth_db_api::models::CompactU256;
use reth_node_types::NodePrimitives;
use reth_primitives::{
Receipt, SealedHeader, TransactionMeta, TransactionSigned, TransactionSignedNoHash,
};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use std::{
fmt::Debug,
ops::{Deref, RangeBounds},
sync::Arc,
};
/// Provider over a specific `NippyJar` and range.
#[derive(Debug)]
pub struct StaticFileJarProvider<'a> {
pub struct StaticFileJarProvider<'a, N> {
/// Main static file segment
jar: LoadedJarRef<'a>,
/// Another kind of static file segment to help query data from the main one.
auxiliary_jar: Option<Box<Self>>,
/// Metrics for the static files.
metrics: Option<Arc<StaticFileProviderMetrics>>,
/// Node primitives
_pd: std::marker::PhantomData<N>,
}
impl<'a> Deref for StaticFileJarProvider<'a> {
impl<'a, N: NodePrimitives> Deref for StaticFileJarProvider<'a, N> {
type Target = LoadedJarRef<'a>;
fn deref(&self) -> &Self::Target {
&self.jar
}
}
impl<'a> From<LoadedJarRef<'a>> for StaticFileJarProvider<'a> {
impl<'a, N: NodePrimitives> From<LoadedJarRef<'a>> for StaticFileJarProvider<'a, N> {
fn from(value: LoadedJarRef<'a>) -> Self {
StaticFileJarProvider { jar: value, auxiliary_jar: None, metrics: None }
StaticFileJarProvider {
jar: value,
auxiliary_jar: None,
metrics: None,
_pd: Default::default(),
}
}
}
impl<'a> StaticFileJarProvider<'a> {
impl<'a, N: NodePrimitives> StaticFileJarProvider<'a, N> {
/// Provides a cursor for more granular data access.
pub fn cursor<'b>(&'b self) -> ProviderResult<StaticFileCursor<'a>>
where
@ -76,7 +86,7 @@ impl<'a> StaticFileJarProvider<'a> {
}
}
impl HeaderProvider for StaticFileJarProvider<'_> {
impl<N: NodePrimitives> HeaderProvider for StaticFileJarProvider<'_, N> {
fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Header>> {
Ok(self
.cursor()?
@ -148,7 +158,7 @@ impl HeaderProvider for StaticFileJarProvider<'_> {
}
}
impl BlockHashReader for StaticFileJarProvider<'_> {
impl<N: NodePrimitives> BlockHashReader for StaticFileJarProvider<'_, N> {
fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
self.cursor()?.get_one::<HeaderMask<BlockHash>>(number.into())
}
@ -170,7 +180,7 @@ impl BlockHashReader for StaticFileJarProvider<'_> {
}
}
impl BlockNumReader for StaticFileJarProvider<'_> {
impl<N: NodePrimitives> BlockNumReader for StaticFileJarProvider<'_, N> {
fn chain_info(&self) -> ProviderResult<ChainInfo> {
// Information on live database
Err(ProviderError::UnsupportedProvider)
@ -195,7 +205,7 @@ impl BlockNumReader for StaticFileJarProvider<'_> {
}
}
impl TransactionsProvider for StaticFileJarProvider<'_> {
impl<N: NodePrimitives> TransactionsProvider for StaticFileJarProvider<'_, N> {
fn transaction_id(&self, hash: TxHash) -> ProviderResult<Option<TxNumber>> {
let mut cursor = self.cursor()?;
@ -291,7 +301,7 @@ impl TransactionsProvider for StaticFileJarProvider<'_> {
}
}
impl ReceiptProvider for StaticFileJarProvider<'_> {
impl<N: NodePrimitives> ReceiptProvider for StaticFileJarProvider<'_, N> {
fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Receipt>> {
self.cursor()?.get_one::<ReceiptMask<Receipt>>(num.into())
}

View File

@ -29,6 +29,7 @@ use reth_db_api::{
transaction::DbTx,
};
use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
use reth_node_types::NodePrimitives;
use reth_primitives::{
static_file::{
find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive,
@ -42,6 +43,8 @@ use reth_storage_api::DBProvider;
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use std::{
collections::{hash_map::Entry, BTreeMap, HashMap},
fmt::Debug,
marker::PhantomData,
ops::{Deref, Range, RangeBounds, RangeInclusive},
path::{Path, PathBuf},
sync::{mpsc, Arc},
@ -77,10 +80,16 @@ impl StaticFileAccess {
}
/// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`].
#[derive(Debug, Clone)]
pub struct StaticFileProvider(pub(crate) Arc<StaticFileProviderInner>);
#[derive(Debug)]
pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
impl StaticFileProvider {
impl<N> Clone for StaticFileProvider<N> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<N: NodePrimitives> StaticFileProvider<N> {
/// Creates a new [`StaticFileProvider`].
fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
let provider = Self(Arc::new(StaticFileProviderInner::new(path, access)?));
@ -191,8 +200,8 @@ impl StaticFileProvider {
}
}
impl Deref for StaticFileProvider {
type Target = StaticFileProviderInner;
impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
type Target = StaticFileProviderInner<N>;
fn deref(&self) -> &Self::Target {
&self.0
@ -201,7 +210,7 @@ impl Deref for StaticFileProvider {
/// [`StaticFileProviderInner`] manages all existing [`StaticFileJarProvider`].
#[derive(Debug)]
pub struct StaticFileProviderInner {
pub struct StaticFileProviderInner<N> {
/// Maintains a map which allows for concurrent access to different `NippyJars`, over different
/// segments and ranges.
map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
@ -212,7 +221,8 @@ pub struct StaticFileProviderInner {
/// Directory where `static_files` are located
path: PathBuf,
/// Maintains a writer set of [`StaticFileSegment`].
writers: StaticFileWriters,
writers: StaticFileWriters<N>,
/// Metrics for the static files.
metrics: Option<Arc<StaticFileProviderMetrics>>,
/// Access rights of the provider.
access: StaticFileAccess,
@ -220,9 +230,11 @@ pub struct StaticFileProviderInner {
blocks_per_file: u64,
/// Write lock for when access is [`StaticFileAccess::RW`].
_lock_file: Option<StorageLock>,
/// Node primitives
_pd: PhantomData<N>,
}
impl StaticFileProviderInner {
impl<N: NodePrimitives> StaticFileProviderInner<N> {
/// Creates a new [`StaticFileProviderInner`].
fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
let _lock_file = if access.is_read_write() {
@ -241,6 +253,7 @@ impl StaticFileProviderInner {
access,
blocks_per_file: DEFAULT_BLOCKS_PER_STATIC_FILE,
_lock_file,
_pd: Default::default(),
};
Ok(provider)
@ -257,7 +270,7 @@ impl StaticFileProviderInner {
}
}
impl StaticFileProvider {
impl<N: NodePrimitives> StaticFileProvider<N> {
/// Set a custom number of blocks per file.
#[cfg(any(test, feature = "test-utils"))]
pub fn with_custom_blocks_per_file(self, blocks_per_file: u64) -> Self {
@ -323,7 +336,7 @@ impl StaticFileProvider {
segment: StaticFileSegment,
block: BlockNumber,
path: Option<&Path>,
) -> ProviderResult<StaticFileJarProvider<'_>> {
) -> ProviderResult<StaticFileJarProvider<'_, N>> {
self.get_segment_provider(
segment,
|| self.get_segment_ranges_from_block(segment, block),
@ -338,7 +351,7 @@ impl StaticFileProvider {
segment: StaticFileSegment,
tx: TxNumber,
path: Option<&Path>,
) -> ProviderResult<StaticFileJarProvider<'_>> {
) -> ProviderResult<StaticFileJarProvider<'_, N>> {
self.get_segment_provider(
segment,
|| self.get_segment_ranges_from_transaction(segment, tx),
@ -355,7 +368,7 @@ impl StaticFileProvider {
segment: StaticFileSegment,
fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
path: Option<&Path>,
) -> ProviderResult<Option<StaticFileJarProvider<'_>>> {
) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
// If we have a path, then get the block range from its name.
// Otherwise, check `self.available_static_files`
let block_range = match path {
@ -426,12 +439,12 @@ impl StaticFileProvider {
&self,
segment: StaticFileSegment,
fixed_block_range: &SegmentRangeInclusive,
) -> ProviderResult<StaticFileJarProvider<'_>> {
) -> ProviderResult<StaticFileJarProvider<'_, N>> {
let key = (fixed_block_range.end(), segment);
// Avoid using `entry` directly to avoid a write lock in the common case.
trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Getting provider");
let mut provider: StaticFileJarProvider<'_> = if let Some(jar) = self.map.get(&key) {
let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
jar.into()
} else {
@ -924,7 +937,7 @@ impl StaticFileProvider {
pub fn find_static_file<T>(
&self,
segment: StaticFileSegment,
func: impl Fn(StaticFileJarProvider<'_>) -> ProviderResult<Option<T>>,
func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
) -> ProviderResult<Option<T>> {
if let Some(highest_block) = self.get_highest_static_file_block(segment) {
let mut range = self.find_fixed_range(highest_block);
@ -1167,30 +1180,35 @@ impl StaticFileProvider {
/// Helper trait to manage different [`StaticFileProviderRW`] of an `Arc<StaticFileProvider`
pub trait StaticFileWriter {
/// The primitives type used by the static file provider.
type Primitives: Send + Sync + 'static;
/// Returns a mutable reference to a [`StaticFileProviderRW`] of a [`StaticFileSegment`].
fn get_writer(
&self,
block: BlockNumber,
segment: StaticFileSegment,
) -> ProviderResult<StaticFileProviderRWRefMut<'_>>;
) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
/// Returns a mutable reference to a [`StaticFileProviderRW`] of the latest
/// [`StaticFileSegment`].
fn latest_writer(
&self,
segment: StaticFileSegment,
) -> ProviderResult<StaticFileProviderRWRefMut<'_>>;
) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
/// Commits all changes of all [`StaticFileProviderRW`] of all [`StaticFileSegment`].
fn commit(&self) -> ProviderResult<()>;
}
impl StaticFileWriter for StaticFileProvider {
impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
type Primitives = N;
fn get_writer(
&self,
block: BlockNumber,
segment: StaticFileSegment,
) -> ProviderResult<StaticFileProviderRWRefMut<'_>> {
) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
if self.access.is_read_only() {
return Err(ProviderError::ReadOnlyStaticFileAccess)
}
@ -1204,7 +1222,7 @@ impl StaticFileWriter for StaticFileProvider {
fn latest_writer(
&self,
segment: StaticFileSegment,
) -> ProviderResult<StaticFileProviderRWRefMut<'_>> {
) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
self.get_writer(self.get_highest_static_file_block(segment).unwrap_or_default(), segment)
}
@ -1213,7 +1231,7 @@ impl StaticFileWriter for StaticFileProvider {
}
}
impl HeaderProvider for StaticFileProvider {
impl<N: NodePrimitives> HeaderProvider for StaticFileProvider<N> {
fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Header>> {
self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
Ok(jar_provider
@ -1300,7 +1318,7 @@ impl HeaderProvider for StaticFileProvider {
}
}
impl BlockHashReader for StaticFileProvider {
impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)?.block_hash(num)
}
@ -1319,7 +1337,7 @@ impl BlockHashReader for StaticFileProvider {
}
}
impl ReceiptProvider for StaticFileProvider {
impl<N: NodePrimitives> ReceiptProvider for StaticFileProvider<N> {
fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Receipt>> {
self.get_segment_provider_from_transaction(StaticFileSegment::Receipts, num, None)
.and_then(|provider| provider.receipt(num))
@ -1356,7 +1374,7 @@ impl ReceiptProvider for StaticFileProvider {
}
}
impl TransactionsProviderExt for StaticFileProvider {
impl<N: NodePrimitives> TransactionsProviderExt for StaticFileProvider<N> {
fn transaction_hashes_by_range(
&self,
tx_range: Range<TxNumber>,
@ -1417,7 +1435,7 @@ impl TransactionsProviderExt for StaticFileProvider {
}
}
impl TransactionsProvider for StaticFileProvider {
impl<N: NodePrimitives> TransactionsProvider for StaticFileProvider<N> {
fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
let mut cursor = jar_provider.cursor()?;
@ -1529,7 +1547,7 @@ impl TransactionsProvider for StaticFileProvider {
/* Cannot be successfully implemented but must exist for trait requirements */
impl BlockNumReader for StaticFileProvider {
impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
fn chain_info(&self) -> ProviderResult<ChainInfo> {
// Required data not present in static_files
Err(ProviderError::UnsupportedProvider)
@ -1551,7 +1569,7 @@ impl BlockNumReader for StaticFileProvider {
}
}
impl BlockReader for StaticFileProvider {
impl<N: NodePrimitives> BlockReader for StaticFileProvider<N> {
fn find_block_by_hash(
&self,
_hash: B256,
@ -1629,7 +1647,7 @@ impl BlockReader for StaticFileProvider {
}
}
impl WithdrawalsProvider for StaticFileProvider {
impl<N: NodePrimitives> WithdrawalsProvider for StaticFileProvider<N> {
fn withdrawals_by_block(
&self,
_id: BlockHashOrNumber,
@ -1645,7 +1663,7 @@ impl WithdrawalsProvider for StaticFileProvider {
}
}
impl StatsReader for StaticFileProvider {
impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
match T::NAME {
tables::CanonicalHeaders::NAME |

View File

@ -55,7 +55,9 @@ impl Deref for LoadedJar {
#[cfg(test)]
mod tests {
use super::*;
use crate::{test_utils::create_test_provider_factory, HeaderProvider};
use crate::{
test_utils::create_test_provider_factory, HeaderProvider, StaticFileProviderFactory,
};
use alloy_consensus::{Header, Transaction};
use alloy_primitives::{BlockHash, TxNumber, B256, U256};
use rand::seq::SliceRandom;
@ -116,7 +118,7 @@ mod tests {
// Create StaticFile
{
let manager = StaticFileProvider::read_write(static_files_path.path()).unwrap();
let manager = factory.static_file_provider();
let mut writer = manager.latest_writer(StaticFileSegment::Headers).unwrap();
let mut td = U256::ZERO;
@ -131,7 +133,7 @@ mod tests {
// Use providers to query Header data and compare if it matches
{
let db_provider = factory.provider().unwrap();
let manager = StaticFileProvider::read_write(static_files_path.path()).unwrap();
let manager = db_provider.static_file_provider();
let jar_provider = manager
.get_segment_provider_from_block(StaticFileSegment::Headers, 0, Some(&static_file))
.unwrap();
@ -170,7 +172,7 @@ mod tests {
// [ Headers Creation and Commit ]
{
let sf_rw = StaticFileProvider::read_write(&static_dir)
let sf_rw = StaticFileProvider::<()>::read_write(&static_dir)
.expect("Failed to create static file provider")
.with_custom_blocks_per_file(blocks_per_file);
@ -189,8 +191,8 @@ mod tests {
// Helper function to prune headers and validate truncation results
fn prune_and_validate(
writer: &mut StaticFileProviderRWRefMut<'_>,
sf_rw: &StaticFileProvider,
writer: &mut StaticFileProviderRWRefMut<'_, ()>,
sf_rw: &StaticFileProvider<()>,
static_dir: impl AsRef<Path>,
prune_count: u64,
expected_tip: Option<u64>,
@ -302,13 +304,13 @@ mod tests {
/// * `10..=19`: no txs/receipts
/// * `20..=29`: only one tx/receipt
fn setup_tx_based_scenario(
sf_rw: &StaticFileProvider,
sf_rw: &StaticFileProvider<()>,
segment: StaticFileSegment,
blocks_per_file: u64,
) {
fn setup_block_ranges(
writer: &mut StaticFileProviderRWRefMut<'_>,
sf_rw: &StaticFileProvider,
writer: &mut StaticFileProviderRWRefMut<'_, ()>,
sf_rw: &StaticFileProvider<()>,
segment: StaticFileSegment,
block_range: &Range<u64>,
mut tx_count: u64,
@ -413,7 +415,7 @@ mod tests {
#[allow(clippy::too_many_arguments)]
fn prune_and_validate(
sf_rw: &StaticFileProvider,
sf_rw: &StaticFileProvider<()>,
static_dir: impl AsRef<Path>,
segment: StaticFileSegment,
prune_count: u64,

View File

@ -8,6 +8,7 @@ use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
use reth_codecs::Compact;
use reth_db_api::models::CompactU256;
use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
use reth_node_types::NodePrimitives;
use reth_primitives::{
static_file::{SegmentHeader, SegmentRangeInclusive},
Receipt, StaticFileSegment,
@ -15,6 +16,7 @@ use reth_primitives::{
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use std::{
borrow::Borrow,
fmt::Debug,
path::{Path, PathBuf},
sync::{Arc, Weak},
time::Instant,
@ -25,19 +27,29 @@ use tracing::debug;
///
/// WARNING: Trying to use more than one writer for the same segment type **will result in a
/// deadlock**.
#[derive(Debug, Default)]
pub(crate) struct StaticFileWriters {
headers: RwLock<Option<StaticFileProviderRW>>,
transactions: RwLock<Option<StaticFileProviderRW>>,
receipts: RwLock<Option<StaticFileProviderRW>>,
#[derive(Debug)]
pub(crate) struct StaticFileWriters<N> {
headers: RwLock<Option<StaticFileProviderRW<N>>>,
transactions: RwLock<Option<StaticFileProviderRW<N>>>,
receipts: RwLock<Option<StaticFileProviderRW<N>>>,
}
impl StaticFileWriters {
impl<N> Default for StaticFileWriters<N> {
fn default() -> Self {
Self {
headers: Default::default(),
transactions: Default::default(),
receipts: Default::default(),
}
}
}
impl<N: NodePrimitives> StaticFileWriters<N> {
pub(crate) fn get_or_create(
&self,
segment: StaticFileSegment,
create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW>,
) -> ProviderResult<StaticFileProviderRWRefMut<'_>> {
create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
let mut write_guard = match segment {
StaticFileSegment::Headers => self.headers.write(),
StaticFileSegment::Transactions => self.transactions.write(),
@ -64,19 +76,19 @@ impl StaticFileWriters {
/// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`].
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a>(
pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW>>,
pub struct StaticFileProviderRWRefMut<'a, N>(
pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
);
impl std::ops::DerefMut for StaticFileProviderRWRefMut<'_> {
impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
fn deref_mut(&mut self) -> &mut Self::Target {
// This is always created by [`StaticFileWriters::get_or_create`]
self.0.as_mut().expect("static file writer provider should be init")
}
}
impl std::ops::Deref for StaticFileProviderRWRefMut<'_> {
type Target = StaticFileProviderRW;
impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
type Target = StaticFileProviderRW<N>;
fn deref(&self) -> &Self::Target {
// This is always created by [`StaticFileWriters::get_or_create`]
@ -86,11 +98,11 @@ impl std::ops::Deref for StaticFileProviderRWRefMut<'_> {
#[derive(Debug)]
/// Extends `StaticFileProvider` with writing capabilities
pub struct StaticFileProviderRW {
pub struct StaticFileProviderRW<N> {
/// Reference back to the provider. We need [Weak] here because [`StaticFileProviderRW`] is
/// stored in a [`dashmap::DashMap`] inside the parent [`StaticFileProvider`].which is an
/// [Arc]. If we were to use an [Arc] here, we would create a reference cycle.
reader: Weak<StaticFileProviderInner>,
reader: Weak<StaticFileProviderInner<N>>,
/// A [`NippyJarWriter`] instance.
writer: NippyJarWriter<SegmentHeader>,
/// Path to opened file.
@ -104,7 +116,7 @@ pub struct StaticFileProviderRW {
prune_on_commit: Option<(u64, Option<BlockNumber>)>,
}
impl StaticFileProviderRW {
impl<N: NodePrimitives> StaticFileProviderRW<N> {
/// Creates a new [`StaticFileProviderRW`] for a [`StaticFileSegment`].
///
/// Before use, transaction based segments should ensure the block end range is the expected
@ -112,7 +124,7 @@ impl StaticFileProviderRW {
pub fn new(
segment: StaticFileSegment,
block: BlockNumber,
reader: Weak<StaticFileProviderInner>,
reader: Weak<StaticFileProviderInner<N>>,
metrics: Option<Arc<StaticFileProviderMetrics>>,
) -> ProviderResult<Self> {
let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
@ -133,7 +145,7 @@ impl StaticFileProviderRW {
fn open(
segment: StaticFileSegment,
block: u64,
reader: Weak<StaticFileProviderInner>,
reader: Weak<StaticFileProviderInner<N>>,
metrics: Option<Arc<StaticFileProviderMetrics>>,
) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
let start = Instant::now();
@ -751,7 +763,7 @@ impl StaticFileProviderRW {
Ok(())
}
fn reader(&self) -> StaticFileProvider {
fn reader(&self) -> StaticFileProvider<N> {
Self::upgrade_provider_to_strong_reference(&self.reader)
}
@ -764,8 +776,8 @@ impl StaticFileProviderRW {
/// active. In reality, it's impossible to detach the [`StaticFileProviderRW`] from the
/// [`StaticFileProvider`].
fn upgrade_provider_to_strong_reference(
provider: &Weak<StaticFileProviderInner>,
) -> StaticFileProvider {
provider: &Weak<StaticFileProviderInner<N>>,
) -> StaticFileProvider<N> {
provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
}

View File

@ -556,7 +556,9 @@ impl PruneCheckpointReader for NoopProvider {
}
impl StaticFileProviderFactory for NoopProvider {
fn static_file_provider(&self) -> StaticFileProvider {
type Primitives = ();
fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
StaticFileProvider::read_only(PathBuf::default(), false).unwrap()
}
}

View File

@ -1,7 +1,12 @@
use reth_node_types::NodePrimitives;
use crate::providers::StaticFileProvider;
/// Static file provider factory.
pub trait StaticFileProviderFactory {
/// The network primitives type [`StaticFileProvider`] is using.
type Primitives: NodePrimitives;
/// Create new instance of static file provider.
fn static_file_provider(&self) -> StaticFileProvider;
fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives>;
}

View File

@ -1,7 +1,8 @@
use crate::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter as SfWriter},
writer::static_file::StaticFileWriter,
BlockExecutionWriter, BlockWriter, HistoryWriter, StateChangeWriter, StateWriter, TrieWriter,
BlockExecutionWriter, BlockWriter, HistoryWriter, StateChangeWriter, StateWriter,
StaticFileProviderFactory, TrieWriter,
};
use alloy_consensus::Header;
use alloy_primitives::{BlockNumber, B256, U256};
@ -115,15 +116,13 @@ impl UnifiedStorageWriter<'_, (), ()> {
/// start-up.
///
/// NOTE: If unwinding data from storage, use `commit_unwind` instead!
pub fn commit<P>(
database: impl Into<P> + AsRef<P>,
static_file: StaticFileProvider,
) -> ProviderResult<()>
pub fn commit<P>(provider: P) -> ProviderResult<()>
where
P: DBProvider<Tx: DbTxMut>,
P: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory,
{
let static_file = provider.static_file_provider();
static_file.commit()?;
database.into().into_tx().commit()?;
provider.commit()?;
Ok(())
}
@ -135,20 +134,18 @@ impl UnifiedStorageWriter<'_, (), ()> {
/// checkpoints on the next start-up.
///
/// NOTE: Should only be used after unwinding data from storage!
pub fn commit_unwind<P>(
database: impl Into<P> + AsRef<P>,
static_file: StaticFileProvider,
) -> ProviderResult<()>
pub fn commit_unwind<P>(provider: P) -> ProviderResult<()>
where
P: DBProvider<Tx: DbTxMut>,
P: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory,
{
database.into().into_tx().commit()?;
let static_file = provider.static_file_provider();
provider.commit()?;
static_file.commit()?;
Ok(())
}
}
impl<ProviderDB> UnifiedStorageWriter<'_, ProviderDB, &StaticFileProvider>
impl<ProviderDB> UnifiedStorageWriter<'_, ProviderDB, &StaticFileProvider<ProviderDB::Primitives>>
where
ProviderDB: DBProvider<Tx: DbTx + DbTxMut>
+ BlockWriter
@ -158,7 +155,8 @@ where
+ HistoryWriter
+ StageCheckpointWriter
+ BlockExecutionWriter
+ AsRef<ProviderDB>,
+ AsRef<ProviderDB>
+ StaticFileProviderFactory,
{
/// Writes executed blocks and receipts to storage.
pub fn save_blocks(&self, blocks: &[ExecutedBlock]) -> ProviderResult<()> {
@ -319,9 +317,10 @@ where
}
}
impl<ProviderDB> UnifiedStorageWriter<'_, ProviderDB, StaticFileProviderRWRefMut<'_>>
impl<ProviderDB>
UnifiedStorageWriter<'_, ProviderDB, StaticFileProviderRWRefMut<'_, ProviderDB::Primitives>>
where
ProviderDB: DBProvider<Tx: DbTx> + HeaderProvider,
ProviderDB: DBProvider<Tx: DbTx> + HeaderProvider + StaticFileProviderFactory,
{
/// Ensures that the static file writer is set and of the right [`StaticFileSegment`] variant.
///
@ -430,9 +429,10 @@ where
}
}
impl<ProviderDB> UnifiedStorageWriter<'_, ProviderDB, StaticFileProviderRWRefMut<'_>>
impl<ProviderDB>
UnifiedStorageWriter<'_, ProviderDB, StaticFileProviderRWRefMut<'_, ProviderDB::Primitives>>
where
ProviderDB: DBProvider<Tx: DbTxMut + DbTx> + HeaderProvider,
ProviderDB: DBProvider<Tx: DbTxMut + DbTx> + HeaderProvider + StaticFileProviderFactory,
{
/// Appends receipts block by block.
///
@ -512,9 +512,12 @@ where
}
impl<ProviderDB> StateWriter
for UnifiedStorageWriter<'_, ProviderDB, StaticFileProviderRWRefMut<'_>>
for UnifiedStorageWriter<'_, ProviderDB, StaticFileProviderRWRefMut<'_, ProviderDB::Primitives>>
where
ProviderDB: DBProvider<Tx: DbTxMut + DbTx> + StateChangeWriter + HeaderProvider,
ProviderDB: DBProvider<Tx: DbTxMut + DbTx>
+ StateChangeWriter
+ HeaderProvider
+ StaticFileProviderFactory,
{
/// Write the data and receipts to the database or static files if `static_file_producer` is
/// `Some`. It should be `None` if there is any kind of pruning/filtering over the receipts.

View File

@ -1,12 +1,13 @@
use crate::providers::StaticFileProviderRWRefMut;
use alloy_primitives::{BlockNumber, TxNumber};
use reth_errors::ProviderResult;
use reth_node_types::NodePrimitives;
use reth_primitives::Receipt;
use reth_storage_api::ReceiptWriter;
pub(crate) struct StaticFileWriter<'a, W>(pub(crate) &'a mut W);
impl ReceiptWriter for StaticFileWriter<'_, StaticFileProviderRWRefMut<'_>> {
impl<N: NodePrimitives> ReceiptWriter for StaticFileWriter<'_, StaticFileProviderRWRefMut<'_, N>> {
fn append_block_receipts(
&mut self,
first_tx_index: TxNumber,