refactor: unify logic for writing receipts (#12878)

This commit is contained in:
Arsenii Kulikov
2024-11-26 18:24:40 +04:00
committed by GitHub
parent dee0b8c055
commit 2d6b8937c3
12 changed files with 151 additions and 332 deletions

View File

@ -22,9 +22,9 @@ use reth_network_api::NetworkInfo;
use reth_node_ethereum::EthExecutorProvider;
use reth_primitives::BlockExt;
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, AccountExtReader,
ChainSpecProvider, HashingWriter, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown,
ProviderFactory, StageCheckpointReader, StateWriter, StorageReader,
providers::ProviderNodeTypes, AccountExtReader, ChainSpecProvider, DatabaseProviderFactory,
HashingWriter, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderFactory,
StageCheckpointReader, StateWriter, StorageLocation, StorageReader,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages::StageId;
@ -163,7 +163,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
return Ok(())
}
let provider_rw = provider_factory.provider_rw()?;
let provider_rw = provider_factory.database_provider_rw()?;
// Insert block, state and hashes
provider_rw.insert_historical_block(
@ -172,8 +172,11 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
.try_seal_with_senders()
.map_err(|_| BlockValidationError::SenderRecoveryError)?,
)?;
let mut storage_writer = UnifiedStorageWriter::from_database(&provider_rw.0);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;
provider_rw.write_to_storage(
execution_outcome,
OriginalValuesKnown::No,
StorageLocation::Database,
)?;
let storage_lists = provider_rw.changed_storages_with_range(block.number..=block.number)?;
let storages = provider_rw.plain_state_storages(storage_lists)?;
provider_rw.insert_storage_for_hashing(storages)?;

View File

@ -20,9 +20,9 @@ use reth_network_p2p::full_block::FullBlockClient;
use reth_node_api::BlockTy;
use reth_node_ethereum::EthExecutorProvider;
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockNumReader, BlockWriter,
ChainSpecProvider, DatabaseProviderFactory, HeaderProvider, LatestStateProviderRef,
OriginalValuesKnown, ProviderError, ProviderFactory, StateWriter, StorageLocation,
providers::ProviderNodeTypes, BlockNumReader, BlockWriter, ChainSpecProvider,
DatabaseProviderFactory, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown,
ProviderError, ProviderFactory, StateWriter, StorageLocation,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages::{
@ -158,8 +158,11 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
executor.execute_and_verify_one((&sealed_block.clone().unseal(), td).into())?;
let execution_outcome = executor.finalize();
let mut storage_writer = UnifiedStorageWriter::from_database(&provider_rw);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?;
provider_rw.write_to_storage(
execution_outcome,
OriginalValuesKnown::Yes,
StorageLocation::Database,
)?;
let checkpoint = Some(StageCheckpoint::new(
block_number

View File

@ -19,7 +19,7 @@ use reth_primitives::Receipts;
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, DatabaseProviderFactory,
OriginalValuesKnown, ProviderFactory, StageCheckpointReader, StageCheckpointWriter,
StateWriter, StaticFileProviderFactory, StaticFileWriter, StatsReader,
StateWriter, StaticFileProviderFactory, StatsReader, StorageLocation,
};
use reth_stages::{StageCheckpoint, StageId};
use reth_static_file_types::StaticFileSegment;
@ -219,11 +219,11 @@ where
ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default());
// finally, write the receipts
let mut storage_writer = UnifiedStorageWriter::from(
&provider,
static_file_provider.latest_writer(StaticFileSegment::Receipts)?,
);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?;
provider.write_to_storage(
execution_outcome,
OriginalValuesKnown::Yes,
StorageLocation::StaticFiles,
)?;
}
// Only commit if we have imported as many receipts as the number of transactions.

View File

@ -15,10 +15,9 @@ use reth_primitives::{SealedHeader, StaticFileSegment};
use reth_primitives_traits::{format_gas_throughput, Block, BlockBody, NodePrimitives};
use reth_provider::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter},
writer::UnifiedStorageWriter,
BlockHashReader, BlockReader, DBProvider, HeaderProvider, LatestStateProviderRef,
OriginalValuesKnown, ProviderError, StateChangeWriter, StateWriter, StaticFileProviderFactory,
StatsReader, TransactionVariant,
StatsReader, StorageLocation, TransactionVariant,
};
use reth_prune_types::PruneModes;
use reth_revm::database::StateProviderDatabase;
@ -180,9 +179,8 @@ where
+ StaticFileProviderFactory
+ StatsReader
+ StateChangeWriter
+ BlockHashReader,
for<'a> UnifiedStorageWriter<'a, Provider, StaticFileProviderRWRefMut<'a, Provider::Primitives>>:
StateWriter,
+ BlockHashReader
+ StateWriter,
{
/// Return the id of the stage
fn id(&self) -> StageId {
@ -211,7 +209,7 @@ where
let static_file_provider = provider.static_file_provider();
// We only use static files for Receipts, if there is no receipt pruning of any kind.
let static_file_producer = if self.prune_modes.receipts.is_none() &&
let write_receipts_to = if self.prune_modes.receipts.is_none() &&
self.prune_modes.receipts_log_filter.is_empty()
{
debug!(target: "sync::stages::execution", start = start_block, "Preparing static file producer");
@ -220,9 +218,9 @@ where
// Since there might be a database <-> static file inconsistency (read
// `prepare_static_file_producer` for context), we commit the change straight away.
producer.commit()?;
Some(producer)
StorageLocation::StaticFiles
} else {
None
StorageLocation::Database
};
let db = StateProviderDatabase(LatestStateProviderRef::new(provider));
@ -362,8 +360,7 @@ where
let time = Instant::now();
// write output
let mut writer = UnifiedStorageWriter::new(provider, static_file_producer);
writer.write_to_storage(state, OriginalValuesKnown::Yes)?;
provider.write_to_storage(state, OriginalValuesKnown::Yes, write_receipts_to)?;
let db_write_duration = time.elapsed();
debug!(

View File

@ -14,7 +14,7 @@ use reth_provider::{
BlockHashReader, BlockNumReader, BundleStateInit, ChainSpecProvider, DBProvider,
DatabaseProviderFactory, ExecutionOutcome, HashingWriter, HeaderProvider, HistoryWriter,
OriginalValuesKnown, ProviderError, RevertsInit, StageCheckpointWriter, StateChangeWriter,
StateWriter, StaticFileProviderFactory, TrieWriter,
StateWriter, StaticFileProviderFactory, StorageLocation, TrieWriter,
};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_trie::{IntermediateStateRootState, StateRoot as StateRootComputer, StateRootProgress};
@ -76,6 +76,7 @@ where
+ HeaderProvider
+ HashingWriter
+ StateChangeWriter
+ StateWriter
+ AsRef<PF::ProviderRW>,
{
let chain = factory.chain_spec();
@ -147,6 +148,7 @@ where
+ DBProvider<Tx: DbTxMut>
+ StateChangeWriter
+ HeaderProvider
+ StateWriter
+ AsRef<Provider>,
{
insert_state(provider, alloc, 0)
@ -163,6 +165,7 @@ where
+ DBProvider<Tx: DbTxMut>
+ StateChangeWriter
+ HeaderProvider
+ StateWriter
+ AsRef<Provider>,
{
let capacity = alloc.size_hint().1.unwrap_or(0);
@ -230,8 +233,11 @@ where
Vec::new(),
);
let mut storage_writer = UnifiedStorageWriter::from_database(&provider);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?;
provider.write_to_storage(
execution_outcome,
OriginalValuesKnown::Yes,
StorageLocation::Database,
)?;
trace!(target: "reth::cli", "Inserted state");
@ -351,6 +357,7 @@ where
+ HashingWriter
+ StateChangeWriter
+ TrieWriter
+ StateWriter
+ AsRef<Provider>,
{
let block = provider_rw.last_block_number()?;
@ -470,6 +477,7 @@ where
+ HeaderProvider
+ HashingWriter
+ HistoryWriter
+ StateWriter
+ StateChangeWriter
+ AsRef<Provider>,
{

View File

@ -904,13 +904,18 @@ mod tests {
.unwrap_or_default();
// Insert blocks into the database
for block in &database_blocks {
for (block, receipts) in database_blocks.iter().zip(&receipts) {
// TODO: this should be moved inside `insert_historical_block`: <https://github.com/paradigmxyz/reth/issues/11524>
let mut transactions_writer =
static_file_provider.latest_writer(StaticFileSegment::Transactions)?;
let mut receipts_writer =
static_file_provider.latest_writer(StaticFileSegment::Receipts)?;
transactions_writer.increment_block(block.number)?;
for tx in block.body.transactions() {
receipts_writer.increment_block(block.number)?;
for (tx, receipt) in block.body.transactions().iter().zip(receipts) {
transactions_writer.append_transaction(tx_num, tx)?;
receipts_writer.append_receipt(tx_num, receipt)?;
tx_num += 1;
}
@ -919,19 +924,6 @@ mod tests {
)?;
}
// Insert receipts into the static files
UnifiedStorageWriter::new(
&provider_rw,
Some(factory.static_file_provider().latest_writer(StaticFileSegment::Receipts)?),
)
.append_receipts_from_blocks(
// The initial block number is required
database_blocks.first().map(|b| b.number).unwrap_or_default(),
receipts[..database_blocks.len()]
.iter()
.map(|vec| vec.clone().into_iter().map(Some).collect::<Vec<_>>()),
)?;
// Commit to both storages: database and static files
UnifiedStorageWriter::commit(provider_rw)?;

View File

@ -9,7 +9,6 @@ use crate::{
traits::{
AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
},
writer::UnifiedStorageWriter,
AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
DBProvider, EvmEnvProvider, HashingWriter, HeaderProvider, HeaderSyncGap,
@ -3017,12 +3016,11 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
durations_recorder.record_relative(metrics::Action::InsertBlock);
}
// Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup.
// TODO: should _these_ be moved to storagewriter? seems like storagewriter should be
// _above_ db provider
let mut storage_writer = UnifiedStorageWriter::from_database(self);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;
self.write_to_storage(
execution_outcome,
OriginalValuesKnown::No,
StorageLocation::Database,
)?;
durations_recorder.record_relative(metrics::Action::InsertState);
// insert hashes and intermediate merkle nodes
@ -3142,3 +3140,73 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
self.prune_modes_ref()
}
}
impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> StateWriter
for DatabaseProvider<TX, N>
{
fn write_to_storage(
&self,
execution_outcome: ExecutionOutcome,
is_value_known: OriginalValuesKnown,
write_receipts_to: StorageLocation,
) -> ProviderResult<()> {
let (plain_state, reverts) =
execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
self.write_state_reverts(reverts, execution_outcome.first_block)?;
self.write_state_changes(plain_state)?;
let mut bodies_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;
let has_receipts_pruning = self.prune_modes.has_receipts_pruning() ||
execution_outcome.receipts.iter().flatten().any(|receipt| receipt.is_none());
// Prepare receipts cursor if we are going to write receipts to the database
//
// We are writing to database if requested or if there's any kind of receipt pruning
// configured
let mut receipts_cursor = (write_receipts_to.database() || has_receipts_pruning)
.then(|| self.tx.cursor_write::<tables::Receipts>())
.transpose()?;
// Prepare receipts static writer if we are going to write receipts to static files
//
// We are writing to static files if requested and if there's no receipt pruning configured
let mut receipts_static_writer = (write_receipts_to.static_files() &&
!has_receipts_pruning)
.then(|| {
self.static_file_provider
.get_writer(execution_outcome.first_block, StaticFileSegment::Receipts)
})
.transpose()?;
for (idx, receipts) in execution_outcome.receipts.into_iter().enumerate() {
let block_number = execution_outcome.first_block + idx as u64;
// Increment block number for receipts static file writer
if let Some(writer) = receipts_static_writer.as_mut() {
writer.increment_block(block_number)?;
}
let first_tx_index = bodies_cursor
.seek_exact(block_number)?
.map(|(_, indices)| indices.first_tx_num())
.ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))?;
for (idx, receipt) in receipts.into_iter().enumerate() {
let receipt_idx = first_tx_index + idx as u64;
if let Some(receipt) = receipt {
if let Some(writer) = &mut receipts_static_writer {
writer.append_receipt(receipt_idx, &receipt)?;
}
if let Some(cursor) = &mut receipts_cursor {
cursor.append(receipt_idx, receipt)?;
}
}
}
}
Ok(())
}
}

View File

@ -8,14 +8,17 @@ use revm::db::{
};
use std::ops::RangeInclusive;
use super::StorageLocation;
/// A helper trait for [`ExecutionOutcome`] to write state and receipts to storage.
pub trait StateWriter {
/// Write the data and receipts to the database or static files if `static_file_producer` is
/// `Some`. It should be `None` if there is any kind of pruning/filtering over the receipts.
fn write_to_storage(
&mut self,
&self,
execution_outcome: ExecutionOutcome,
is_value_known: OriginalValuesKnown,
write_receipts_to: StorageLocation,
) -> ProviderResult<()>;
}

View File

@ -1,29 +0,0 @@
use alloy_primitives::{BlockNumber, TxNumber};
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
tables,
};
use reth_errors::ProviderResult;
use reth_primitives::Receipt;
use reth_storage_api::ReceiptWriter;
pub(crate) struct DatabaseWriter<'a, W>(pub(crate) &'a mut W);
impl<W> ReceiptWriter for DatabaseWriter<'_, W>
where
W: DbCursorRO<tables::Receipts> + DbCursorRW<tables::Receipts>,
{
fn append_block_receipts(
&mut self,
first_tx_index: TxNumber,
_: BlockNumber,
receipts: Vec<Option<Receipt>>,
) -> ProviderResult<()> {
for (tx_idx, receipt) in receipts.into_iter().enumerate() {
if let Some(receipt) = receipt {
self.0.append(first_tx_index + tx_idx as u64, receipt)?;
}
}
Ok(())
}
}

View File

@ -1,35 +1,17 @@
use crate::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter as SfWriter},
writer::static_file::StaticFileWriter,
providers::{StaticFileProvider, StaticFileWriter as SfWriter},
BlockExecutionWriter, BlockWriter, HistoryWriter, StateChangeWriter, StateWriter,
StaticFileProviderFactory, StorageLocation, TrieWriter,
};
use alloy_primitives::BlockNumber;
use reth_chain_state::ExecutedBlock;
use reth_db::{
cursor::DbCursorRO,
tables,
transaction::{DbTx, DbTxMut},
};
use reth_errors::{ProviderError, ProviderResult};
use reth_execution_types::ExecutionOutcome;
use reth_db::transaction::{DbTx, DbTxMut};
use reth_errors::ProviderResult;
use reth_primitives::StaticFileSegment;
use reth_storage_api::{
DBProvider, HeaderProvider, ReceiptWriter, StageCheckpointWriter, TransactionsProviderExt,
};
use reth_storage_api::{DBProvider, StageCheckpointWriter, TransactionsProviderExt};
use reth_storage_errors::writer::UnifiedStorageWriterError;
use revm::db::OriginalValuesKnown;
use tracing::debug;
mod database;
mod static_file;
use database::DatabaseWriter;
enum StorageType<C = (), S = ()> {
Database(C),
StaticFile(S),
}
/// [`UnifiedStorageWriter`] is responsible for managing the writing to storage with both database
/// and static file providers.
#[derive(Debug)]
@ -81,14 +63,6 @@ impl<'a, ProviderDB, ProviderSF> UnifiedStorageWriter<'a, ProviderDB, ProviderSF
self.static_file.as_ref().expect("should exist")
}
/// Returns a mutable reference to the static file instance.
///
/// # Panics
/// If the static file instance is not set.
fn static_file_mut(&mut self) -> &mut ProviderSF {
self.static_file.as_mut().expect("should exist")
}
/// Ensures that the static file instance is set.
///
/// # Returns
@ -148,6 +122,7 @@ where
+ TransactionsProviderExt
+ StateChangeWriter
+ TrieWriter
+ StateWriter
+ HistoryWriter
+ StageCheckpointWriter
+ BlockExecutionWriter
@ -169,16 +144,6 @@ where
debug!(target: "provider::storage_writer", block_count = %blocks.len(), "Writing blocks and execution data to storage");
// Only write receipts to static files if there is no receipt pruning configured.
let mut state_writer = if self.database().prune_modes_ref().has_receipts_pruning() {
UnifiedStorageWriter::from_database(self.database())
} else {
UnifiedStorageWriter::from(
self.database(),
self.static_file().get_writer(first_block.number, StaticFileSegment::Receipts)?,
)
};
// TODO: remove all the clones and do performant / batched writes for each type of object
// instead of a loop over all blocks,
// meaning:
@ -196,7 +161,11 @@ where
// Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup.
let execution_outcome = block.execution_outcome().clone();
state_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;
self.database().write_to_storage(
execution_outcome,
OriginalValuesKnown::No,
StorageLocation::StaticFiles,
)?;
// insert hashes and intermediate merkle nodes
{
@ -261,149 +230,6 @@ where
}
}
impl<ProviderDB>
UnifiedStorageWriter<'_, ProviderDB, StaticFileProviderRWRefMut<'_, ProviderDB::Primitives>>
where
ProviderDB: DBProvider<Tx: DbTx> + HeaderProvider + StaticFileProviderFactory,
{
/// Ensures that the static file writer is set and of the right [`StaticFileSegment`] variant.
///
/// # Returns
/// - `Ok(())` if the static file writer is set.
/// - `Err(StorageWriterError::MissingStaticFileWriter)` if the static file instance is not set.
fn ensure_static_file_segment(
&self,
segment: StaticFileSegment,
) -> Result<(), UnifiedStorageWriterError> {
match &self.static_file {
Some(writer) => {
if writer.user_header().segment() == segment {
Ok(())
} else {
Err(UnifiedStorageWriterError::IncorrectStaticFileWriter(
writer.user_header().segment(),
segment,
))
}
}
None => Err(UnifiedStorageWriterError::MissingStaticFileWriter),
}
}
}
impl<ProviderDB>
UnifiedStorageWriter<'_, ProviderDB, StaticFileProviderRWRefMut<'_, ProviderDB::Primitives>>
where
ProviderDB: DBProvider<Tx: DbTxMut + DbTx> + HeaderProvider + StaticFileProviderFactory,
{
/// Appends receipts block by block.
///
/// ATTENTION: If called from [`UnifiedStorageWriter`] without a static file producer, it will
/// always write them to database. Otherwise, it will look into the pruning configuration to
/// decide.
///
/// NOTE: The static file writer used to construct this [`UnifiedStorageWriter`] MUST be a
/// writer for the Receipts segment.
///
/// # Parameters
/// - `initial_block_number`: The starting block number.
/// - `blocks`: An iterator over blocks, each block having a vector of optional receipts. If
/// `receipt` is `None`, it has been pruned.
pub fn append_receipts_from_blocks(
&mut self,
initial_block_number: BlockNumber,
blocks: impl Iterator<Item = Vec<Option<reth_primitives::Receipt>>>,
) -> ProviderResult<()> {
let mut bodies_cursor =
self.database().tx_ref().cursor_read::<tables::BlockBodyIndices>()?;
// We write receipts to database in two situations:
// * If we are in live sync. In this case, `UnifiedStorageWriter` is built without a static
// file writer.
// * If there is any kind of receipt pruning
let mut storage_type = if self.static_file.is_none() ||
self.database().prune_modes_ref().has_receipts_pruning()
{
StorageType::Database(self.database().tx_ref().cursor_write::<tables::Receipts>()?)
} else {
self.ensure_static_file_segment(StaticFileSegment::Receipts)?;
StorageType::StaticFile(self.static_file_mut())
};
let mut last_tx_idx = None;
for (idx, receipts) in blocks.enumerate() {
let block_number = initial_block_number + idx as u64;
let mut first_tx_index =
bodies_cursor.seek_exact(block_number)?.map(|(_, indices)| indices.first_tx_num());
// If there are no indices, that means there have been no transactions
//
// So instead of returning an error, use zero
if block_number == initial_block_number && first_tx_index.is_none() {
first_tx_index = Some(0);
}
let first_tx_index = first_tx_index
.or(last_tx_idx)
.ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))?;
// update for empty blocks
last_tx_idx = Some(first_tx_index);
match &mut storage_type {
StorageType::Database(cursor) => {
DatabaseWriter(cursor).append_block_receipts(
first_tx_index,
block_number,
receipts,
)?;
}
StorageType::StaticFile(sf) => {
StaticFileWriter(*sf).append_block_receipts(
first_tx_index,
block_number,
receipts,
)?;
}
};
}
Ok(())
}
}
impl<ProviderDB> StateWriter
for UnifiedStorageWriter<'_, ProviderDB, StaticFileProviderRWRefMut<'_, ProviderDB::Primitives>>
where
ProviderDB: DBProvider<Tx: DbTxMut + DbTx>
+ StateChangeWriter
+ HeaderProvider
+ StaticFileProviderFactory,
{
/// Write the data and receipts to the database or static files if `static_file_producer` is
/// `Some`. It should be `None` if there is any kind of pruning/filtering over the receipts.
fn write_to_storage(
&mut self,
execution_outcome: ExecutionOutcome,
is_value_known: OriginalValuesKnown,
) -> ProviderResult<()> {
let (plain_state, reverts) =
execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
self.database().write_state_reverts(reverts, execution_outcome.first_block)?;
self.append_receipts_from_blocks(
execution_outcome.first_block,
execution_outcome.receipts.into_iter(),
)?;
self.database().write_state_changes(plain_state)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -417,6 +243,7 @@ mod tests {
models::{AccountBeforeTx, BlockNumberAddress},
transaction::{DbTx, DbTxMut},
};
use reth_execution_types::ExecutionOutcome;
use reth_primitives::{Account, Receipt, Receipts, StorageEntry};
use reth_storage_api::DatabaseProviderFactory;
use reth_trie::{
@ -679,9 +506,8 @@ mod tests {
let outcome =
ExecutionOutcome::new(state.take_bundle(), Receipts::default(), 1, Vec::new());
let mut writer = UnifiedStorageWriter::from_database(&provider);
writer
.write_to_storage(outcome, OriginalValuesKnown::Yes)
provider
.write_to_storage(outcome, OriginalValuesKnown::Yes, StorageLocation::Database)
.expect("Could not write bundle state to DB");
// Check plain storage state
@ -780,9 +606,8 @@ mod tests {
state.merge_transitions(BundleRetention::Reverts);
let outcome =
ExecutionOutcome::new(state.take_bundle(), Receipts::default(), 2, Vec::new());
let mut writer = UnifiedStorageWriter::from_database(&provider);
writer
.write_to_storage(outcome, OriginalValuesKnown::Yes)
provider
.write_to_storage(outcome, OriginalValuesKnown::Yes, StorageLocation::Database)
.expect("Could not write bundle state to DB");
assert_eq!(
@ -848,9 +673,8 @@ mod tests {
let outcome =
ExecutionOutcome::new(init_state.take_bundle(), Receipts::default(), 0, Vec::new());
let mut writer = UnifiedStorageWriter::from_database(&provider);
writer
.write_to_storage(outcome, OriginalValuesKnown::Yes)
provider
.write_to_storage(outcome, OriginalValuesKnown::Yes, StorageLocation::Database)
.expect("Could not write bundle state to DB");
let mut state = State::builder().with_bundle_update().build();
@ -997,9 +821,8 @@ mod tests {
let outcome: ExecutionOutcome =
ExecutionOutcome::new(bundle, Receipts::default(), 1, Vec::new());
let mut writer = UnifiedStorageWriter::from_database(&provider);
writer
.write_to_storage(outcome, OriginalValuesKnown::Yes)
provider
.write_to_storage(outcome, OriginalValuesKnown::Yes, StorageLocation::Database)
.expect("Could not write bundle state to DB");
let mut storage_changeset_cursor = provider
@ -1163,9 +986,8 @@ mod tests {
init_state.merge_transitions(BundleRetention::Reverts);
let outcome =
ExecutionOutcome::new(init_state.take_bundle(), Receipts::default(), 0, Vec::new());
let mut writer = UnifiedStorageWriter::from_database(&provider);
writer
.write_to_storage(outcome, OriginalValuesKnown::Yes)
provider
.write_to_storage(outcome, OriginalValuesKnown::Yes, StorageLocation::Database)
.expect("Could not write bundle state to DB");
let mut state = State::builder().with_bundle_update().build();
@ -1211,9 +1033,8 @@ mod tests {
state.merge_transitions(BundleRetention::Reverts);
let outcome =
ExecutionOutcome::new(state.take_bundle(), Receipts::default(), 1, Vec::new());
let mut writer = UnifiedStorageWriter::from_database(&provider);
writer
.write_to_storage(outcome, OriginalValuesKnown::Yes)
provider
.write_to_storage(outcome, OriginalValuesKnown::Yes, StorageLocation::Database)
.expect("Could not write bundle state to DB");
let mut storage_changeset_cursor = provider

View File

@ -1,30 +0,0 @@
use crate::providers::StaticFileProviderRWRefMut;
use alloy_primitives::{BlockNumber, TxNumber};
use reth_errors::ProviderResult;
use reth_node_types::NodePrimitives;
use reth_primitives::Receipt;
use reth_storage_api::ReceiptWriter;
pub(crate) struct StaticFileWriter<'a, W>(pub(crate) &'a mut W);
impl<N: NodePrimitives> ReceiptWriter for StaticFileWriter<'_, StaticFileProviderRWRefMut<'_, N>> {
fn append_block_receipts(
&mut self,
first_tx_index: TxNumber,
block_number: BlockNumber,
receipts: Vec<Option<Receipt>>,
) -> ProviderResult<()> {
// Increment block on static file header.
self.0.increment_block(block_number)?;
let receipts = receipts.iter().enumerate().map(|(tx_idx, receipt)| {
Ok((
first_tx_index + tx_idx as u64,
receipt
.as_ref()
.expect("receipt should not be filtered when saving to static files."),
))
});
self.0.append_receipts(receipts)?;
Ok(())
}
}

View File

@ -1,6 +1,6 @@
use crate::BlockIdReader;
use alloy_eips::{BlockHashOrNumber, BlockId, BlockNumberOrTag};
use alloy_primitives::{BlockNumber, TxHash, TxNumber};
use alloy_primitives::{TxHash, TxNumber};
use reth_primitives::Receipt;
use reth_storage_errors::provider::ProviderResult;
use std::ops::RangeBounds;
@ -68,20 +68,3 @@ pub trait ReceiptProviderIdExt: ReceiptProvider + BlockIdReader {
self.receipts_by_block_id(number_or_tag.into())
}
}
/// Writer trait for writing [`Receipt`] data.
pub trait ReceiptWriter {
/// Appends receipts for a block.
///
/// # Parameters
/// - `first_tx_index`: The transaction number of the first receipt in the block.
/// - `block_number`: The block number to which the receipts belong.
/// - `receipts`: A vector of optional receipts in the block. If `None`, it means they were
/// pruned.
fn append_block_receipts(
&mut self,
first_tx_index: TxNumber,
block_number: BlockNumber,
receipts: Vec<Option<Receipt>>,
) -> ProviderResult<()>;
}