feat: add Receipt AT to writer traits (#12892)

This commit is contained in:
Arsenii Kulikov
2024-11-27 19:31:13 +04:00
committed by GitHub
parent 1131bdecc3
commit 7a6a725d91
24 changed files with 211 additions and 161 deletions

View File

@ -60,7 +60,10 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
async fn build_network< async fn build_network<
N: ProviderNodeTypes< N: ProviderNodeTypes<
ChainSpec = C::ChainSpec, ChainSpec = C::ChainSpec,
Primitives: NodePrimitives<Block = reth_primitives::Block>, Primitives: NodePrimitives<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
>,
>, >,
>( >(
&self, &self,
@ -178,7 +181,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
.try_seal_with_senders() .try_seal_with_senders()
.map_err(|_| BlockValidationError::SenderRecoveryError)?, .map_err(|_| BlockValidationError::SenderRecoveryError)?,
)?; )?;
provider_rw.write_to_storage( provider_rw.write_state(
execution_outcome, execution_outcome,
OriginalValuesKnown::No, OriginalValuesKnown::No,
StorageLocation::Database, StorageLocation::Database,

View File

@ -59,7 +59,10 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
async fn build_network< async fn build_network<
N: ProviderNodeTypes< N: ProviderNodeTypes<
ChainSpec = C::ChainSpec, ChainSpec = C::ChainSpec,
Primitives: NodePrimitives<Block = reth_primitives::Block>, Primitives: NodePrimitives<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
>,
>, >,
>( >(
&self, &self,
@ -163,7 +166,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
executor.execute_and_verify_one((&sealed_block.clone().unseal(), td).into())?; executor.execute_and_verify_one((&sealed_block.clone().unseal(), td).into())?;
let execution_outcome = executor.finalize(); let execution_outcome = executor.finalize();
provider_rw.write_to_storage( provider_rw.write_state(
execution_outcome, execution_outcome,
OriginalValuesKnown::Yes, OriginalValuesKnown::Yes,
StorageLocation::Database, StorageLocation::Database,

View File

@ -58,7 +58,10 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
async fn build_network< async fn build_network<
N: ProviderNodeTypes< N: ProviderNodeTypes<
ChainSpec = C::ChainSpec, ChainSpec = C::ChainSpec,
Primitives: NodePrimitives<Block = reth_primitives::Block>, Primitives: NodePrimitives<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
>,
>, >,
>( >(
&self, &self,

View File

@ -28,7 +28,10 @@ pub(crate) async fn dump_execution_stage<N, E>(
where where
N: ProviderNodeTypes< N: ProviderNodeTypes<
DB = Arc<DatabaseEnv>, DB = Arc<DatabaseEnv>,
Primitives: NodePrimitives<Block = reth_primitives::Block>, Primitives: NodePrimitives<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
>,
>, >,
E: BlockExecutorProvider, E: BlockExecutorProvider,
{ {
@ -136,7 +139,12 @@ fn import_tables_with_range<N: NodeTypesWithDB>(
/// `PlainAccountState` safely. There might be some state dependency from an address /// `PlainAccountState` safely. There might be some state dependency from an address
/// which hasn't been changed in the given range. /// which hasn't been changed in the given range.
fn unwind_and_copy< fn unwind_and_copy<
N: ProviderNodeTypes<Primitives: NodePrimitives<Block = reth_primitives::Block>>, N: ProviderNodeTypes<
Primitives: NodePrimitives<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
>,
>,
>( >(
db_tool: &DbTool<N>, db_tool: &DbTool<N>,
from: u64, from: u64,
@ -174,7 +182,12 @@ fn dry_run<N, E>(
executor: E, executor: E,
) -> eyre::Result<()> ) -> eyre::Result<()>
where where
N: ProviderNodeTypes<Primitives: NodePrimitives<Block = reth_primitives::Block>>, N: ProviderNodeTypes<
Primitives: NodePrimitives<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
>,
>,
E: BlockExecutorProvider, E: BlockExecutorProvider,
{ {
info!(target: "reth::cli", "Executing stage. [dry-run]"); info!(target: "reth::cli", "Executing stage. [dry-run]");

View File

@ -28,7 +28,10 @@ use tracing::info;
pub(crate) async fn dump_merkle_stage< pub(crate) async fn dump_merkle_stage<
N: ProviderNodeTypes< N: ProviderNodeTypes<
DB = Arc<DatabaseEnv>, DB = Arc<DatabaseEnv>,
Primitives: NodePrimitives<Block = reth_primitives::Block>, Primitives: NodePrimitives<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
>,
>, >,
>( >(
db_tool: &DbTool<N>, db_tool: &DbTool<N>,
@ -74,7 +77,12 @@ pub(crate) async fn dump_merkle_stage<
/// Dry-run an unwind to FROM block and copy the necessary table data to the new database. /// Dry-run an unwind to FROM block and copy the necessary table data to the new database.
fn unwind_and_copy< fn unwind_and_copy<
N: ProviderNodeTypes<Primitives: NodePrimitives<Block = reth_primitives::Block>>, N: ProviderNodeTypes<
Primitives: NodePrimitives<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
>,
>,
>( >(
db_tool: &DbTool<N>, db_tool: &DbTool<N>,
range: (u64, u64), range: (u64, u64),

View File

@ -62,6 +62,7 @@ where
Primitives: FullNodePrimitives< Primitives: FullNodePrimitives<
Block = reth_primitives::Block, Block = reth_primitives::Block,
BlockBody = reth_primitives::BlockBody, BlockBody = reth_primitives::BlockBody,
Receipt = reth_primitives::Receipt,
>, >,
>, >,
{ {
@ -171,6 +172,7 @@ where
Primitives: FullNodePrimitives< Primitives: FullNodePrimitives<
Block = reth_primitives::Block, Block = reth_primitives::Block,
BlockBody = reth_primitives::BlockBody, BlockBody = reth_primitives::BlockBody,
Receipt = reth_primitives::Receipt,
>, >,
>, >,
{ {
@ -194,7 +196,8 @@ pub(crate) fn blocks_and_execution_outcome<N>(
) -> eyre::Result<(Vec<SealedBlockWithSenders>, ExecutionOutcome)> ) -> eyre::Result<(Vec<SealedBlockWithSenders>, ExecutionOutcome)>
where where
N: ProviderNodeTypes, N: ProviderNodeTypes,
N::Primitives: FullNodePrimitives<Block = reth_primitives::Block>, N::Primitives:
FullNodePrimitives<Block = reth_primitives::Block, Receipt = reth_primitives::Receipt>,
{ {
let (block1, block2) = blocks(chain_spec.clone(), key_pair)?; let (block1, block2) = blocks(chain_spec.clone(), key_pair)?;

View File

@ -385,6 +385,7 @@ where
N::Primitives: FullNodePrimitives< N::Primitives: FullNodePrimitives<
Block = reth_primitives::Block, Block = reth_primitives::Block,
BlockBody = reth_primitives::BlockBody, BlockBody = reth_primitives::BlockBody,
Receipt = reth_primitives::Receipt,
>, >,
{ {
let factory = ProviderFactory::new( let factory = ProviderFactory::new(
@ -455,6 +456,7 @@ where
N::Primitives: FullNodePrimitives< N::Primitives: FullNodePrimitives<
Block = reth_primitives::Block, Block = reth_primitives::Block,
BlockBody = reth_primitives::BlockBody, BlockBody = reth_primitives::BlockBody,
Receipt = reth_primitives::Receipt,
>, >,
{ {
let factory = self.create_provider_factory().await?; let factory = self.create_provider_factory().await?;

View File

@ -41,8 +41,11 @@ where
N: ProviderNodeTypes, N: ProviderNodeTypes,
Client: EthBlockClient + 'static, Client: EthBlockClient + 'static,
Executor: BlockExecutorProvider, Executor: BlockExecutorProvider,
N::Primitives: N::Primitives: FullNodePrimitives<
FullNodePrimitives<Block = reth_primitives::Block, BlockBody = reth_primitives::BlockBody>, Block = reth_primitives::Block,
BlockBody = reth_primitives::BlockBody,
Receipt = reth_primitives::Receipt,
>,
{ {
// building network downloaders using the fetch client // building network downloaders using the fetch client
let header_downloader = ReverseHeadersDownloaderBuilder::new(config.headers) let header_downloader = ReverseHeadersDownloaderBuilder::new(config.headers)
@ -90,8 +93,11 @@ where
H: HeaderDownloader<Header = alloy_consensus::Header> + 'static, H: HeaderDownloader<Header = alloy_consensus::Header> + 'static,
B: BodyDownloader<Body = BodyTy<N>> + 'static, B: BodyDownloader<Body = BodyTy<N>> + 'static,
Executor: BlockExecutorProvider, Executor: BlockExecutorProvider,
N::Primitives: N::Primitives: FullNodePrimitives<
FullNodePrimitives<Block = reth_primitives::Block, BlockBody = reth_primitives::BlockBody>, Block = reth_primitives::Block,
BlockBody = reth_primitives::BlockBody,
Receipt = reth_primitives::Receipt,
>,
{ {
let mut builder = Pipeline::<N>::builder(); let mut builder = Pipeline::<N>::builder();

View File

@ -15,7 +15,7 @@ use reth_execution_types::ExecutionOutcome;
use reth_node_core::version::SHORT_VERSION; use reth_node_core::version::SHORT_VERSION;
use reth_optimism_chainspec::OpChainSpec; use reth_optimism_chainspec::OpChainSpec;
use reth_optimism_primitives::bedrock::is_dup_tx; use reth_optimism_primitives::bedrock::is_dup_tx;
use reth_primitives::Receipts; use reth_primitives::{NodePrimitives, Receipts};
use reth_provider::{ use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, DatabaseProviderFactory, providers::ProviderNodeTypes, writer::UnifiedStorageWriter, DatabaseProviderFactory,
OriginalValuesKnown, ProviderFactory, StageCheckpointReader, StageCheckpointWriter, OriginalValuesKnown, ProviderFactory, StageCheckpointReader, StageCheckpointWriter,
@ -85,7 +85,10 @@ pub async fn import_receipts_from_file<N, P, F>(
filter: F, filter: F,
) -> eyre::Result<()> ) -> eyre::Result<()>
where where
N: ProviderNodeTypes<ChainSpec = OpChainSpec>, N: ProviderNodeTypes<
ChainSpec = OpChainSpec,
Primitives: NodePrimitives<Receipt = reth_primitives::Receipt>,
>,
P: AsRef<Path>, P: AsRef<Path>,
F: FnMut(u64, &mut Receipts) -> usize, F: FnMut(u64, &mut Receipts) -> usize,
{ {
@ -123,7 +126,7 @@ pub async fn import_receipts_from_reader<N, F>(
mut filter: F, mut filter: F,
) -> eyre::Result<ImportReceiptsResult> ) -> eyre::Result<ImportReceiptsResult>
where where
N: ProviderNodeTypes, N: ProviderNodeTypes<Primitives: NodePrimitives<Receipt = reth_primitives::Receipt>>,
F: FnMut(u64, &mut Receipts) -> usize, F: FnMut(u64, &mut Receipts) -> usize,
{ {
let static_file_provider = provider_factory.static_file_provider(); let static_file_provider = provider_factory.static_file_provider();
@ -219,7 +222,7 @@ where
ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default()); ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default());
// finally, write the receipts // finally, write the receipts
provider.write_to_storage( provider.write_state(
execution_outcome, execution_outcome,
OriginalValuesKnown::Yes, OriginalValuesKnown::Yes,
StorageLocation::StaticFiles, StorageLocation::StaticFiles,

View File

@ -131,7 +131,6 @@ impl InMemorySize for Receipt {
Debug, Debug,
PartialEq, PartialEq,
Eq, Eq,
Default,
Serialize, Serialize,
Deserialize, Deserialize,
From, From,
@ -187,6 +186,12 @@ impl From<Receipt> for ReceiptWithBloom {
} }
} }
impl<T> Default for Receipts<T> {
fn default() -> Self {
Self { receipt_vec: Vec::new() }
}
}
/// [`Receipt`] with calculated bloom filter. /// [`Receipt`] with calculated bloom filter.
#[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))] #[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]

View File

@ -16,7 +16,7 @@ use reth_primitives_traits::{format_gas_throughput, Block, BlockBody, NodePrimit
use reth_provider::{ use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter}, providers::{StaticFileProvider, StaticFileWriter},
BlockHashReader, BlockReader, DBProvider, HeaderProvider, LatestStateProviderRef, BlockHashReader, BlockReader, DBProvider, HeaderProvider, LatestStateProviderRef,
OriginalValuesKnown, ProviderError, StateChangeWriter, StateCommitmentProvider, StateWriter, OriginalValuesKnown, ProviderError, StateCommitmentProvider, StateWriter,
StaticFileProviderFactory, StatsReader, StorageLocation, TransactionVariant, StaticFileProviderFactory, StatsReader, StorageLocation, TransactionVariant,
}; };
use reth_prune_types::PruneModes; use reth_prune_types::PruneModes;
@ -262,9 +262,8 @@ where
+ BlockReader<Block = reth_primitives::Block> + BlockReader<Block = reth_primitives::Block>
+ StaticFileProviderFactory + StaticFileProviderFactory
+ StatsReader + StatsReader
+ StateChangeWriter
+ BlockHashReader + BlockHashReader
+ StateWriter + StateWriter<Receipt = reth_primitives::Receipt>
+ StateCommitmentProvider, + StateCommitmentProvider,
{ {
/// Return the id of the stage /// Return the id of the stage
@ -432,7 +431,7 @@ where
let time = Instant::now(); let time = Instant::now();
// write output // write output
provider.write_to_storage(state, OriginalValuesKnown::Yes, StorageLocation::StaticFiles)?; provider.write_state(state, OriginalValuesKnown::Yes, StorageLocation::StaticFiles)?;
let db_write_duration = time.elapsed(); let db_write_duration = time.elapsed();
debug!( debug!(

View File

@ -13,8 +13,8 @@ use reth_provider::{
errors::provider::ProviderResult, providers::StaticFileWriter, writer::UnifiedStorageWriter, errors::provider::ProviderResult, providers::StaticFileWriter, writer::UnifiedStorageWriter,
BlockHashReader, BlockNumReader, BundleStateInit, ChainSpecProvider, DBProvider, BlockHashReader, BlockNumReader, BundleStateInit, ChainSpecProvider, DBProvider,
DatabaseProviderFactory, ExecutionOutcome, HashingWriter, HeaderProvider, HistoryWriter, DatabaseProviderFactory, ExecutionOutcome, HashingWriter, HeaderProvider, HistoryWriter,
OriginalValuesKnown, ProviderError, RevertsInit, StageCheckpointWriter, StateChangeWriter, OriginalValuesKnown, ProviderError, RevertsInit, StageCheckpointWriter, StateWriter,
StateWriter, StaticFileProviderFactory, StorageLocation, TrieWriter, StaticFileProviderFactory, StorageLocation, TrieWriter,
}; };
use reth_stages_types::{StageCheckpoint, StageId}; use reth_stages_types::{StageCheckpoint, StageId};
use reth_trie::{IntermediateStateRootState, StateRoot as StateRootComputer, StateRootProgress}; use reth_trie::{IntermediateStateRootState, StateRoot as StateRootComputer, StateRootProgress};
@ -75,7 +75,7 @@ where
+ HistoryWriter + HistoryWriter
+ HeaderProvider + HeaderProvider
+ HashingWriter + HashingWriter
+ StateChangeWriter + StateWriter
+ StateWriter + StateWriter
+ AsRef<PF::ProviderRW>, + AsRef<PF::ProviderRW>,
{ {
@ -146,7 +146,6 @@ pub fn insert_genesis_state<'a, 'b, Provider>(
where where
Provider: StaticFileProviderFactory Provider: StaticFileProviderFactory
+ DBProvider<Tx: DbTxMut> + DBProvider<Tx: DbTxMut>
+ StateChangeWriter
+ HeaderProvider + HeaderProvider
+ StateWriter + StateWriter
+ AsRef<Provider>, + AsRef<Provider>,
@ -163,7 +162,6 @@ pub fn insert_state<'a, 'b, Provider>(
where where
Provider: StaticFileProviderFactory Provider: StaticFileProviderFactory
+ DBProvider<Tx: DbTxMut> + DBProvider<Tx: DbTxMut>
+ StateChangeWriter
+ HeaderProvider + HeaderProvider
+ StateWriter + StateWriter
+ AsRef<Provider>, + AsRef<Provider>,
@ -233,11 +231,7 @@ where
Vec::new(), Vec::new(),
); );
provider.write_to_storage( provider.write_state(execution_outcome, OriginalValuesKnown::Yes, StorageLocation::Database)?;
execution_outcome,
OriginalValuesKnown::Yes,
StorageLocation::Database,
)?;
trace!(target: "reth::cli", "Inserted state"); trace!(target: "reth::cli", "Inserted state");
@ -355,7 +349,6 @@ where
+ HistoryWriter + HistoryWriter
+ HeaderProvider + HeaderProvider
+ HashingWriter + HashingWriter
+ StateChangeWriter
+ TrieWriter + TrieWriter
+ StateWriter + StateWriter
+ AsRef<Provider>, + AsRef<Provider>,
@ -478,7 +471,6 @@ where
+ HashingWriter + HashingWriter
+ HistoryWriter + HistoryWriter
+ StateWriter + StateWriter
+ StateChangeWriter
+ AsRef<Provider>, + AsRef<Provider>,
{ {
let accounts_len = collector.len(); let accounts_len = collector.len();

View File

@ -346,9 +346,9 @@ tables! {
} }
/// Canonical only Stores transaction receipts. /// Canonical only Stores transaction receipts.
table Receipts { table Receipts<R = Receipt> {
type Key = TxNumber; type Key = TxNumber;
type Value = Receipt; type Value = R;
} }
/// Stores all smart contract bytecodes. /// Stores all smart contract bytecodes.

View File

@ -145,7 +145,7 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
pub fn get_state( pub fn get_state(
&self, &self,
range: RangeInclusive<BlockNumber>, range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Option<ExecutionOutcome>> { ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
self.consistent_provider()?.get_state(range) self.consistent_provider()?.get_state(range)
} }
} }

View File

@ -1512,7 +1512,10 @@ impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
/// inconsistent. Currently this can safely be called within the blockchain tree thread, /// inconsistent. Currently this can safely be called within the blockchain tree thread,
/// because the tree thread is responsible for modifying the [`CanonicalInMemoryState`] in the /// because the tree thread is responsible for modifying the [`CanonicalInMemoryState`] in the
/// first place. /// first place.
fn get_state(&self, block: BlockNumber) -> ProviderResult<Option<ExecutionOutcome>> { fn get_state(
&self,
block: BlockNumber,
) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) { if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
let state = state.block_ref().execution_outcome().clone(); let state = state.block_ref().execution_outcome().clone();
Ok(Some(state)) Ok(Some(state))

View File

@ -15,10 +15,9 @@ use crate::{
HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter, HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError, LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
PruneCheckpointReader, PruneCheckpointWriter, RevertsInit, StageCheckpointReader, PruneCheckpointReader, PruneCheckpointWriter, RevertsInit, StageCheckpointReader,
StateChangeWriter, StateCommitmentProvider, StateProviderBox, StateWriter, StateCommitmentProvider, StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader,
StaticFileProviderFactory, StatsReader, StorageLocation, StorageReader, StorageTrieWriter, StorageLocation, StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider,
TransactionVariant, TransactionsProvider, TransactionsProviderExt, TrieWriter, TransactionsProviderExt, TrieWriter, WithdrawalsProvider,
WithdrawalsProvider,
}; };
use alloy_consensus::Header; use alloy_consensus::Header;
use alloy_eips::{ use alloy_eips::{
@ -351,7 +350,7 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
) -> ProviderResult<()> { ) -> ProviderResult<()> {
if remove_from.database() { if remove_from.database() {
// iterate over block body and remove receipts // iterate over block body and remove receipts
self.remove::<tables::Receipts>(from_tx..)?; self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
} }
if remove_from.static_files() && !self.prune_modes.has_receipts_pruning() { if remove_from.static_files() && !self.prune_modes.has_receipts_pruning() {
@ -1536,7 +1535,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabasePr
StaticFileSegment::Receipts, StaticFileSegment::Receipts,
id, id,
|static_file| static_file.receipt(id), |static_file| static_file.receipt(id),
|| Ok(self.tx.get::<tables::Receipts>(id)?), || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
) )
} }
@ -1573,7 +1572,10 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabasePr
StaticFileSegment::Receipts, StaticFileSegment::Receipts,
to_range(range), to_range(range),
|static_file, range, _| static_file.receipts_by_tx_range(range), |static_file, range, _| static_file.receipts_by_tx_range(range),
|range, _| self.cursor_read_collect::<tables::Receipts>(range).map_err(Into::into), |range, _| {
self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range)
.map_err(Into::into)
},
|_| true, |_| true,
) )
} }
@ -1798,9 +1800,77 @@ impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N>
} }
} }
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateChangeWriter impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
for DatabaseProvider<TX, N> for DatabaseProvider<TX, N>
{ {
type Receipt = ReceiptTy<N>;
fn write_state(
&self,
execution_outcome: ExecutionOutcome<Self::Receipt>,
is_value_known: OriginalValuesKnown,
write_receipts_to: StorageLocation,
) -> ProviderResult<()> {
let (plain_state, reverts) =
execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
self.write_state_reverts(reverts, execution_outcome.first_block)?;
self.write_state_changes(plain_state)?;
let mut bodies_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;
let has_receipts_pruning = self.prune_modes.has_receipts_pruning() ||
execution_outcome.receipts.iter().flatten().any(|receipt| receipt.is_none());
// Prepare receipts cursor if we are going to write receipts to the database
//
// We are writing to database if requested or if there's any kind of receipt pruning
// configured
let mut receipts_cursor = (write_receipts_to.database() || has_receipts_pruning)
.then(|| self.tx.cursor_write::<tables::Receipts<Self::Receipt>>())
.transpose()?;
// Prepare receipts static writer if we are going to write receipts to static files
//
// We are writing to static files if requested and if there's no receipt pruning configured
let mut receipts_static_writer = (write_receipts_to.static_files() &&
!has_receipts_pruning)
.then(|| {
self.static_file_provider
.get_writer(execution_outcome.first_block, StaticFileSegment::Receipts)
})
.transpose()?;
for (idx, receipts) in execution_outcome.receipts.into_iter().enumerate() {
let block_number = execution_outcome.first_block + idx as u64;
// Increment block number for receipts static file writer
if let Some(writer) = receipts_static_writer.as_mut() {
writer.increment_block(block_number)?;
}
let first_tx_index = bodies_cursor
.seek_exact(block_number)?
.map(|(_, indices)| indices.first_tx_num())
.ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))?;
for (idx, receipt) in receipts.into_iter().enumerate() {
let receipt_idx = first_tx_index + idx as u64;
if let Some(receipt) = receipt {
if let Some(writer) = &mut receipts_static_writer {
writer.append_receipt(receipt_idx, &receipt)?;
}
if let Some(cursor) = &mut receipts_cursor {
cursor.append(receipt_idx, receipt)?;
}
}
}
}
Ok(())
}
fn write_state_reverts( fn write_state_reverts(
&self, &self,
reverts: PlainStateReverts, reverts: PlainStateReverts,
@ -2089,7 +2159,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateChangeWriter
&self, &self,
block: BlockNumber, block: BlockNumber,
remove_receipts_from: StorageLocation, remove_receipts_from: StorageLocation,
) -> ProviderResult<ExecutionOutcome> { ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
let range = block + 1..=self.last_block_number()?; let range = block + 1..=self.last_block_number()?;
if range.is_empty() { if range.is_empty() {
@ -2172,7 +2242,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateChangeWriter
}, },
|range, _| { |range, _| {
self.tx self.tx
.cursor_read::<tables::Receipts>()? .cursor_read::<tables::Receipts<Self::Receipt>>()?
.walk_range(range)? .walk_range(range)?
.map(|r| r.map_err(Into::into)) .map(|r| r.map_err(Into::into))
.collect() .collect()
@ -2709,6 +2779,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
for DatabaseProvider<TX, N> for DatabaseProvider<TX, N>
{ {
type Block = BlockTy<N>; type Block = BlockTy<N>;
type Receipt = ReceiptTy<N>;
/// Inserts the block into the database, always modifying the following tables: /// Inserts the block into the database, always modifying the following tables:
/// * [`CanonicalHeaders`](tables::CanonicalHeaders) /// * [`CanonicalHeaders`](tables::CanonicalHeaders)
@ -2976,7 +3047,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
fn append_blocks_with_state( fn append_blocks_with_state(
&self, &self,
blocks: Vec<SealedBlockWithSenders<Self::Block>>, blocks: Vec<SealedBlockWithSenders<Self::Block>>,
execution_outcome: ExecutionOutcome, execution_outcome: ExecutionOutcome<Self::Receipt>,
hashed_state: HashedPostStateSorted, hashed_state: HashedPostStateSorted,
trie_updates: TrieUpdates, trie_updates: TrieUpdates,
) -> ProviderResult<()> { ) -> ProviderResult<()> {
@ -2998,11 +3069,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
durations_recorder.record_relative(metrics::Action::InsertBlock); durations_recorder.record_relative(metrics::Action::InsertBlock);
} }
self.write_to_storage( self.write_state(execution_outcome, OriginalValuesKnown::No, StorageLocation::Database)?;
execution_outcome,
OriginalValuesKnown::No,
StorageLocation::Database,
)?;
durations_recorder.record_relative(metrics::Action::InsertState); durations_recorder.record_relative(metrics::Action::InsertState);
// insert hashes and intermediate merkle nodes // insert hashes and intermediate merkle nodes
@ -3050,7 +3117,7 @@ impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N
} }
} }
impl<TX: DbTx + 'static, N: NodeTypes> StatsReader for DatabaseProvider<TX, N> { impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
fn count_entries<T: Table>(&self) -> ProviderResult<usize> { fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
let db_entries = self.tx.entries::<T>()?; let db_entries = self.tx.entries::<T>()?;
let static_file_entries = match self.static_file_provider.count_entries::<T>() { let static_file_entries = match self.static_file_provider.count_entries::<T>() {
@ -3122,73 +3189,3 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
self.prune_modes_ref() self.prune_modes_ref()
} }
} }
impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> StateWriter
for DatabaseProvider<TX, N>
{
fn write_to_storage(
&self,
execution_outcome: ExecutionOutcome,
is_value_known: OriginalValuesKnown,
write_receipts_to: StorageLocation,
) -> ProviderResult<()> {
let (plain_state, reverts) =
execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
self.write_state_reverts(reverts, execution_outcome.first_block)?;
self.write_state_changes(plain_state)?;
let mut bodies_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;
let has_receipts_pruning = self.prune_modes.has_receipts_pruning() ||
execution_outcome.receipts.iter().flatten().any(|receipt| receipt.is_none());
// Prepare receipts cursor if we are going to write receipts to the database
//
// We are writing to database if requested or if there's any kind of receipt pruning
// configured
let mut receipts_cursor = (write_receipts_to.database() || has_receipts_pruning)
.then(|| self.tx.cursor_write::<tables::Receipts>())
.transpose()?;
// Prepare receipts static writer if we are going to write receipts to static files
//
// We are writing to static files if requested and if there's no receipt pruning configured
let mut receipts_static_writer = (write_receipts_to.static_files() &&
!has_receipts_pruning)
.then(|| {
self.static_file_provider
.get_writer(execution_outcome.first_block, StaticFileSegment::Receipts)
})
.transpose()?;
for (idx, receipts) in execution_outcome.receipts.into_iter().enumerate() {
let block_number = execution_outcome.first_block + idx as u64;
// Increment block number for receipts static file writer
if let Some(writer) = receipts_static_writer.as_mut() {
writer.increment_block(block_number)?;
}
let first_tx_index = bodies_cursor
.seek_exact(block_number)?
.map(|(_, indices)| indices.first_tx_num())
.ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))?;
for (idx, receipt) in receipts.into_iter().enumerate() {
let receipt_idx = first_tx_index + idx as u64;
if let Some(receipt) = receipt {
if let Some(writer) = &mut receipts_static_writer {
writer.append_receipt(receipt_idx, &receipt)?;
}
if let Some(cursor) = &mut receipts_cursor {
cursor.append(receipt_idx, receipt)?;
}
}
}
}
Ok(())
}
}

View File

@ -79,8 +79,8 @@ where
Storage: ChainStorage<Self::Primitives>, Storage: ChainStorage<Self::Primitives>,
Primitives: FullNodePrimitives< Primitives: FullNodePrimitives<
SignedTx: Value, SignedTx: Value,
Receipt: Value,
BlockHeader = alloy_consensus::Header, BlockHeader = alloy_consensus::Header,
Receipt = reth_primitives::Receipt,
>, >,
>, >,
{ {
@ -92,8 +92,8 @@ impl<T> NodeTypesForProvider for T where
Storage: ChainStorage<T::Primitives>, Storage: ChainStorage<T::Primitives>,
Primitives: FullNodePrimitives< Primitives: FullNodePrimitives<
SignedTx: Value, SignedTx: Value,
Receipt: Value,
BlockHeader = alloy_consensus::Header, BlockHeader = alloy_consensus::Header,
Receipt = reth_primitives::Receipt,
>, >,
> >
{ {
@ -541,7 +541,7 @@ impl<N: ProviderNodeTypes> ReceiptProvider for BlockchainProvider<N> {
} }
} }
impl<N: ProviderNodeTypes> ReceiptProviderIdExt for BlockchainProvider<N> { impl<N: TreeNodeTypes> ReceiptProviderIdExt for BlockchainProvider<N> {
fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Receipt>>> { fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Receipt>>> {
match block { match block {
BlockId::Hash(rpc_block_hash) => { BlockId::Hash(rpc_block_hash) => {

View File

@ -38,8 +38,8 @@ use reth_primitives::{
DEFAULT_BLOCKS_PER_STATIC_FILE, DEFAULT_BLOCKS_PER_STATIC_FILE,
}, },
transaction::recover_signers, transaction::recover_signers,
BlockWithSenders, SealedBlockFor, SealedBlockWithSenders, SealedHeader, StaticFileSegment, BlockWithSenders, Receipt, SealedBlockFor, SealedBlockWithSenders, SealedHeader,
TransactionMeta, TransactionSignedNoHash, StaticFileSegment, TransactionMeta, TransactionSignedNoHash,
}; };
use reth_primitives_traits::SignedTransaction; use reth_primitives_traits::SignedTransaction;
use reth_stages_types::{PipelineTarget, StageId}; use reth_stages_types::{PipelineTarget, StageId};
@ -1692,7 +1692,7 @@ impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
.map(|block| block + 1) .map(|block| block + 1)
.unwrap_or_default() .unwrap_or_default()
as usize), as usize),
tables::Receipts::NAME => Ok(self tables::Receipts::<Receipt>::NAME => Ok(self
.get_highest_static_file_tx(StaticFileSegment::Receipts) .get_highest_static_file_tx(StaticFileSegment::Receipts)
.map(|receipts| receipts + 1) .map(|receipts| receipts + 1)
.unwrap_or_default() as usize), .unwrap_or_default() as usize),

View File

@ -585,7 +585,10 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
/// empty blocks and this function wouldn't be called. /// empty blocks and this function wouldn't be called.
/// ///
/// Returns the current [`TxNumber`] as seen in the static file. /// Returns the current [`TxNumber`] as seen in the static file.
pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &Receipt) -> ProviderResult<()> { pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
where
N::Receipt: Compact,
{
let start = Instant::now(); let start = Instant::now();
self.ensure_no_queued_prune()?; self.ensure_no_queued_prune()?;

View File

@ -95,6 +95,8 @@ pub trait StateReader: Send + Sync {
pub trait BlockWriter: Send + Sync { pub trait BlockWriter: Send + Sync {
/// The body this writer can write. /// The body this writer can write.
type Block: reth_primitives_traits::Block; type Block: reth_primitives_traits::Block;
/// The receipt type for [`ExecutionOutcome`].
type Receipt: Send + Sync;
/// Insert full block and make it canonical. Parent tx num and transition id is taken from /// Insert full block and make it canonical. Parent tx num and transition id is taken from
/// parent block in database. /// parent block in database.
@ -154,7 +156,7 @@ pub trait BlockWriter: Send + Sync {
fn append_blocks_with_state( fn append_blocks_with_state(
&self, &self,
blocks: Vec<SealedBlockWithSenders<Self::Block>>, blocks: Vec<SealedBlockWithSenders<Self::Block>>,
execution_outcome: ExecutionOutcome, execution_outcome: ExecutionOutcome<Self::Receipt>,
hashed_state: HashedPostStateSorted, hashed_state: HashedPostStateSorted,
trie_updates: TrieUpdates, trie_updates: TrieUpdates,
) -> ProviderResult<()>; ) -> ProviderResult<()>;

View File

@ -13,7 +13,7 @@ mod header_sync_gap;
pub use header_sync_gap::{HeaderSyncGap, HeaderSyncGapProvider}; pub use header_sync_gap::{HeaderSyncGap, HeaderSyncGapProvider};
mod state; mod state;
pub use state::{StateChangeWriter, StateWriter}; pub use state::StateWriter;
pub use reth_chainspec::ChainSpecProvider; pub use reth_chainspec::ChainSpecProvider;

View File

@ -9,20 +9,20 @@ use revm::db::{
use super::StorageLocation; use super::StorageLocation;
/// A helper trait for [`ExecutionOutcome`] to write state and receipts to storage. /// A trait specifically for writing state changes or reverts
pub trait StateWriter { pub trait StateWriter {
/// Write the data and receipts to the database or static files if `static_file_producer` is /// Receipt type included into [`ExecutionOutcome`].
type Receipt;
/// Write the state and receipts to the database or static files if `static_file_producer` is
/// `Some`. It should be `None` if there is any kind of pruning/filtering over the receipts. /// `Some`. It should be `None` if there is any kind of pruning/filtering over the receipts.
fn write_to_storage( fn write_state(
&self, &self,
execution_outcome: ExecutionOutcome, execution_outcome: ExecutionOutcome<Self::Receipt>,
is_value_known: OriginalValuesKnown, is_value_known: OriginalValuesKnown,
write_receipts_to: StorageLocation, write_receipts_to: StorageLocation,
) -> ProviderResult<()>; ) -> ProviderResult<()>;
}
/// A trait specifically for writing state changes or reverts
pub trait StateChangeWriter {
/// Write state reverts to the database. /// Write state reverts to the database.
/// ///
/// NOTE: Reverts will delete all wiped storage from plain state. /// NOTE: Reverts will delete all wiped storage from plain state.
@ -52,5 +52,5 @@ pub trait StateChangeWriter {
&self, &self,
block: BlockNumber, block: BlockNumber,
remove_receipts_from: StorageLocation, remove_receipts_from: StorageLocation,
) -> ProviderResult<ExecutionOutcome>; ) -> ProviderResult<ExecutionOutcome<Self::Receipt>>;
} }

View File

@ -1,12 +1,14 @@
use crate::{ use crate::{
providers::{StaticFileProvider, StaticFileWriter as SfWriter}, providers::{StaticFileProvider, StaticFileWriter as SfWriter},
BlockExecutionWriter, BlockWriter, HistoryWriter, StateChangeWriter, StateWriter, BlockExecutionWriter, BlockWriter, HistoryWriter, StateWriter, StaticFileProviderFactory,
StaticFileProviderFactory, StorageLocation, TrieWriter, StorageLocation, TrieWriter,
}; };
use alloy_consensus::BlockHeader;
use reth_chain_state::ExecutedBlock; use reth_chain_state::ExecutedBlock;
use reth_db::transaction::{DbTx, DbTxMut}; use reth_db::transaction::{DbTx, DbTxMut};
use reth_errors::ProviderResult; use reth_errors::ProviderResult;
use reth_primitives::StaticFileSegment; use reth_primitives::{NodePrimitives, StaticFileSegment};
use reth_primitives_traits::SignedTransaction;
use reth_storage_api::{DBProvider, StageCheckpointWriter, TransactionsProviderExt}; use reth_storage_api::{DBProvider, StageCheckpointWriter, TransactionsProviderExt};
use reth_storage_errors::writer::UnifiedStorageWriterError; use reth_storage_errors::writer::UnifiedStorageWriterError;
use revm::db::OriginalValuesKnown; use revm::db::OriginalValuesKnown;
@ -119,9 +121,8 @@ impl UnifiedStorageWriter<'_, (), ()> {
impl<ProviderDB> UnifiedStorageWriter<'_, ProviderDB, &StaticFileProvider<ProviderDB::Primitives>> impl<ProviderDB> UnifiedStorageWriter<'_, ProviderDB, &StaticFileProvider<ProviderDB::Primitives>>
where where
ProviderDB: DBProvider<Tx: DbTx + DbTxMut> ProviderDB: DBProvider<Tx: DbTx + DbTxMut>
+ BlockWriter<Block = reth_primitives::Block> + BlockWriter
+ TransactionsProviderExt + TransactionsProviderExt
+ StateChangeWriter
+ TrieWriter + TrieWriter
+ StateWriter + StateWriter
+ HistoryWriter + HistoryWriter
@ -131,7 +132,11 @@ where
+ StaticFileProviderFactory, + StaticFileProviderFactory,
{ {
/// Writes executed blocks and receipts to storage. /// Writes executed blocks and receipts to storage.
pub fn save_blocks(&self, blocks: Vec<ExecutedBlock>) -> ProviderResult<()> { pub fn save_blocks<N>(&self, blocks: Vec<ExecutedBlock<N>>) -> ProviderResult<()>
where
N: NodePrimitives<SignedTx: SignedTransaction>,
ProviderDB: BlockWriter<Block = N::Block> + StateWriter<Receipt = N::Receipt>,
{
if blocks.is_empty() { if blocks.is_empty() {
debug!(target: "provider::storage_writer", "Attempted to write empty block range"); debug!(target: "provider::storage_writer", "Attempted to write empty block range");
return Ok(()) return Ok(())
@ -139,9 +144,10 @@ where
// NOTE: checked non-empty above // NOTE: checked non-empty above
let first_block = blocks.first().unwrap().block(); let first_block = blocks.first().unwrap().block();
let last_block = blocks.last().unwrap().block(); let last_block = blocks.last().unwrap().block();
let first_number = first_block.number; let first_number = first_block.number();
let last_block_number = last_block.number; let last_block_number = last_block.number();
debug!(target: "provider::storage_writer", block_count = %blocks.len(), "Writing blocks and execution data to storage"); debug!(target: "provider::storage_writer", block_count = %blocks.len(), "Writing blocks and execution data to storage");
@ -162,7 +168,7 @@ where
// Write state and changesets to the database. // Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup. // Must be written after blocks because of the receipt lookup.
self.database().write_to_storage( self.database().write_state(
Arc::unwrap_or_clone(execution_output), Arc::unwrap_or_clone(execution_output),
OriginalValuesKnown::No, OriginalValuesKnown::No,
StorageLocation::StaticFiles, StorageLocation::StaticFiles,
@ -490,7 +496,7 @@ mod tests {
let outcome = let outcome =
ExecutionOutcome::new(state.take_bundle(), Receipts::default(), 1, Vec::new()); ExecutionOutcome::new(state.take_bundle(), Receipts::default(), 1, Vec::new());
provider provider
.write_to_storage(outcome, OriginalValuesKnown::Yes, StorageLocation::Database) .write_state(outcome, OriginalValuesKnown::Yes, StorageLocation::Database)
.expect("Could not write bundle state to DB"); .expect("Could not write bundle state to DB");
// Check plain storage state // Check plain storage state
@ -590,7 +596,7 @@ mod tests {
let outcome = let outcome =
ExecutionOutcome::new(state.take_bundle(), Receipts::default(), 2, Vec::new()); ExecutionOutcome::new(state.take_bundle(), Receipts::default(), 2, Vec::new());
provider provider
.write_to_storage(outcome, OriginalValuesKnown::Yes, StorageLocation::Database) .write_state(outcome, OriginalValuesKnown::Yes, StorageLocation::Database)
.expect("Could not write bundle state to DB"); .expect("Could not write bundle state to DB");
assert_eq!( assert_eq!(
@ -657,7 +663,7 @@ mod tests {
let outcome = let outcome =
ExecutionOutcome::new(init_state.take_bundle(), Receipts::default(), 0, Vec::new()); ExecutionOutcome::new(init_state.take_bundle(), Receipts::default(), 0, Vec::new());
provider provider
.write_to_storage(outcome, OriginalValuesKnown::Yes, StorageLocation::Database) .write_state(outcome, OriginalValuesKnown::Yes, StorageLocation::Database)
.expect("Could not write bundle state to DB"); .expect("Could not write bundle state to DB");
let mut state = State::builder().with_bundle_update().build(); let mut state = State::builder().with_bundle_update().build();
@ -805,7 +811,7 @@ mod tests {
let outcome: ExecutionOutcome = let outcome: ExecutionOutcome =
ExecutionOutcome::new(bundle, Receipts::default(), 1, Vec::new()); ExecutionOutcome::new(bundle, Receipts::default(), 1, Vec::new());
provider provider
.write_to_storage(outcome, OriginalValuesKnown::Yes, StorageLocation::Database) .write_state(outcome, OriginalValuesKnown::Yes, StorageLocation::Database)
.expect("Could not write bundle state to DB"); .expect("Could not write bundle state to DB");
let mut storage_changeset_cursor = provider let mut storage_changeset_cursor = provider
@ -970,7 +976,7 @@ mod tests {
let outcome = let outcome =
ExecutionOutcome::new(init_state.take_bundle(), Receipts::default(), 0, Vec::new()); ExecutionOutcome::new(init_state.take_bundle(), Receipts::default(), 0, Vec::new());
provider provider
.write_to_storage(outcome, OriginalValuesKnown::Yes, StorageLocation::Database) .write_state(outcome, OriginalValuesKnown::Yes, StorageLocation::Database)
.expect("Could not write bundle state to DB"); .expect("Could not write bundle state to DB");
let mut state = State::builder().with_bundle_update().build(); let mut state = State::builder().with_bundle_update().build();
@ -1017,7 +1023,7 @@ mod tests {
let outcome = let outcome =
ExecutionOutcome::new(state.take_bundle(), Receipts::default(), 1, Vec::new()); ExecutionOutcome::new(state.take_bundle(), Receipts::default(), 1, Vec::new());
provider provider
.write_to_storage(outcome, OriginalValuesKnown::Yes, StorageLocation::Database) .write_state(outcome, OriginalValuesKnown::Yes, StorageLocation::Database)
.expect("Could not write bundle state to DB"); .expect("Could not write bundle state to DB");
let mut storage_changeset_cursor = provider let mut storage_changeset_cursor = provider

View File

@ -5,8 +5,7 @@ use proptest::{prelude::*, strategy::ValueTree, test_runner::TestRunner};
use proptest_arbitrary_interop::arb; use proptest_arbitrary_interop::arb;
use reth_primitives::Account; use reth_primitives::Account;
use reth_provider::{ use reth_provider::{
providers::ConsistentDbView, test_utils::create_test_provider_factory, StateChangeWriter, providers::ConsistentDbView, test_utils::create_test_provider_factory, StateWriter, TrieWriter,
TrieWriter,
}; };
use reth_trie::{ use reth_trie::{
hashed_cursor::HashedPostStateCursorFactory, HashedPostState, HashedStorage, StateRoot, hashed_cursor::HashedPostStateCursorFactory, HashedPostState, HashedStorage, StateRoot,