refactor(storage): unify blocks insertion logic (#12694)

This commit is contained in:
Arsenii Kulikov
2024-11-21 02:48:39 +04:00
committed by GitHub
parent 749f98e021
commit 0c5984179e
24 changed files with 225 additions and 268 deletions

1
Cargo.lock generated
View File

@ -7254,6 +7254,7 @@ dependencies = [
"reth-payload-primitives",
"reth-payload-validator",
"reth-primitives",
"reth-primitives-traits",
"reth-provider",
"reth-prune",
"reth-prune-types",

View File

@ -21,7 +21,7 @@ use reth_node_ethereum::EthExecutorProvider;
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockNumReader, BlockWriter,
ChainSpecProvider, DatabaseProviderFactory, HeaderProvider, LatestStateProviderRef,
OriginalValuesKnown, ProviderError, ProviderFactory, StateWriter,
OriginalValuesKnown, ProviderError, ProviderFactory, StateWriter, StorageLocation,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages::{
@ -148,7 +148,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
.map_err(|block| eyre::eyre!("Error sealing block with senders: {block:?}"))?;
trace!(target: "reth::cli", block_number, "Executing block");
provider_rw.insert_block(sealed_block.clone())?;
provider_rw.insert_block(sealed_block.clone(), StorageLocation::Database)?;
td += sealed_block.difficulty;
let mut executor = executor_provider.batch_executor(StateProviderDatabase::new(

View File

@ -1,6 +1,7 @@
//! Implementation of [`BlockchainTree`]
use crate::{
externals::TreeNodeTypes,
metrics::{MakeCanonicalAction, MakeCanonicalDurationsRecorder, TreeMetrics},
state::{SidechainId, TreeState},
AppendableChain, BlockIndices, BlockchainTreeConfig, ExecutionData, TreeExternals,
@ -21,10 +22,10 @@ use reth_primitives::{
SealedHeader, StaticFileSegment,
};
use reth_provider::{
providers::ProviderNodeTypes, BlockExecutionWriter, BlockNumReader, BlockWriter,
CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications,
ChainSpecProvider, ChainSplit, ChainSplitTarget, DBProvider, DisplayBlocksChain,
HeaderProvider, ProviderError, StaticFileProviderFactory,
BlockExecutionWriter, BlockNumReader, BlockWriter, CanonStateNotification,
CanonStateNotificationSender, CanonStateNotifications, ChainSpecProvider, ChainSplit,
ChainSplitTarget, DBProvider, DisplayBlocksChain, HeaderProvider, ProviderError,
StaticFileProviderFactory,
};
use reth_stages_api::{MetricEvent, MetricEventsSender};
use reth_storage_errors::provider::{ProviderResult, RootMismatch};
@ -93,7 +94,7 @@ impl<N: NodeTypesWithDB, E> BlockchainTree<N, E> {
impl<N, E> BlockchainTree<N, E>
where
N: ProviderNodeTypes,
N: TreeNodeTypes,
E: BlockExecutorProvider,
{
/// Builds the blockchain tree for the node.
@ -1386,16 +1387,18 @@ mod tests {
use reth_db_api::transaction::DbTxMut;
use reth_evm::test_utils::MockExecutorProvider;
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_node_types::FullNodePrimitives;
use reth_primitives::{
proofs::{calculate_receipt_root, calculate_transaction_root},
Account, BlockBody, Transaction, TransactionSigned, TransactionSignedEcRecovered,
};
use reth_provider::{
providers::ProviderNodeTypes,
test_utils::{
blocks::BlockchainTestData, create_test_provider_factory_with_chain_spec,
MockNodeTypesWithDB,
},
ProviderFactory,
ProviderFactory, StorageLocation,
};
use reth_revm::primitives::AccountInfo;
use reth_stages_api::StageCheckpoint;
@ -1420,7 +1423,12 @@ mod tests {
TreeExternals::new(provider_factory, consensus, executor_factory)
}
fn setup_genesis<N: ProviderNodeTypes>(factory: &ProviderFactory<N>, mut genesis: SealedBlock) {
fn setup_genesis<
N: ProviderNodeTypes<Primitives: FullNodePrimitives<Block = reth_primitives::Block>>,
>(
factory: &ProviderFactory<N>,
mut genesis: SealedBlock,
) {
// insert genesis to db.
genesis.header.set_block_number(10);
@ -1551,6 +1559,7 @@ mod tests {
SealedBlock::new(chain_spec.sealed_genesis_header(), Default::default())
.try_seal_with_senders()
.unwrap(),
StorageLocation::Database,
)
.unwrap();
let account = Account { balance: initial_signer_balance, ..Default::default() };

View File

@ -4,8 +4,8 @@ use alloy_primitives::{BlockHash, BlockNumber};
use reth_consensus::Consensus;
use reth_db::{static_file::HeaderMask, tables};
use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_node_types::NodeTypesWithDB;
use reth_primitives::StaticFileSegment;
use reth_node_types::{Block, FullNodePrimitives, NodeTypesWithDB};
use reth_primitives::{BlockBody, StaticFileSegment};
use reth_provider::{
providers::ProviderNodeTypes, ChainStateBlockReader, ChainStateBlockWriter, ProviderFactory,
StaticFileProviderFactory, StatsReader,
@ -13,6 +13,16 @@ use reth_provider::{
use reth_storage_errors::provider::ProviderResult;
use std::{collections::BTreeMap, sync::Arc};
/// A helper trait with requirements for [`ProviderNodeTypes`] to be used within [`TreeExternals`].
pub trait TreeNodeTypes:
ProviderNodeTypes<Primitives: FullNodePrimitives<Block: Block<Body = BlockBody>>>
{
}
impl<T> TreeNodeTypes for T where
T: ProviderNodeTypes<Primitives: FullNodePrimitives<Block: Block<Body = BlockBody>>>
{
}
/// A container for external components.
///
/// This is a simple container for external components used throughout the blockchain tree

View File

@ -1,5 +1,7 @@
//! Wrapper around `BlockchainTree` that allows for it to be shared.
use crate::externals::TreeNodeTypes;
use super::BlockchainTree;
use alloy_eips::BlockNumHash;
use alloy_primitives::{BlockHash, BlockNumber};
@ -36,7 +38,7 @@ impl<N: NodeTypesWithDB, E> ShareableBlockchainTree<N, E> {
impl<N, E> BlockchainTreeEngine for ShareableBlockchainTree<N, E>
where
N: ProviderNodeTypes,
N: TreeNodeTypes,
E: BlockExecutorProvider,
{
fn buffer_block(&self, block: SealedBlockWithSenders) -> Result<(), InsertBlockError> {
@ -107,7 +109,7 @@ where
impl<N, E> BlockchainTreeViewer for ShareableBlockchainTree<N, E>
where
N: ProviderNodeTypes,
N: TreeNodeTypes,
E: BlockExecutorProvider,
{
fn header_by_hash(&self, hash: BlockHash) -> Option<SealedHeader> {
@ -170,7 +172,7 @@ where
impl<N, E> BlockchainTreePendingStateProvider for ShareableBlockchainTree<N, E>
where
N: ProviderNodeTypes,
N: TreeNodeTypes,
E: BlockExecutorProvider,
{
fn find_pending_state_provider(

View File

@ -3,12 +3,10 @@ use alloy_rlp::Decodable;
use alloy_consensus::Header;
use reth_node_builder::NodePrimitives;
use reth_primitives::{
BlockBody, SealedBlock, SealedBlockWithSenders, SealedHeader, StaticFileSegment,
};
use reth_primitives::{SealedBlock, SealedBlockWithSenders, SealedHeader, StaticFileSegment};
use reth_provider::{
providers::StaticFileProvider, BlockWriter, StageCheckpointWriter, StaticFileProviderFactory,
StaticFileWriter,
StaticFileWriter, StorageLocation,
};
use reth_stages::{StageCheckpoint, StageId};
@ -33,7 +31,9 @@ pub fn setup_without_evm<Provider>(
total_difficulty: U256,
) -> Result<(), eyre::Error>
where
Provider: StaticFileProviderFactory + StageCheckpointWriter + BlockWriter,
Provider: StaticFileProviderFactory
+ StageCheckpointWriter
+ BlockWriter<Body: reth_node_api::BlockBody>,
{
info!(target: "reth::cli", "Setting up dummy EVM chain before importing state.");
@ -64,11 +64,12 @@ fn append_first_block<Provider>(
total_difficulty: U256,
) -> Result<(), eyre::Error>
where
Provider: BlockWriter + StaticFileProviderFactory,
Provider: BlockWriter<Body: reth_node_api::BlockBody> + StaticFileProviderFactory,
{
provider_rw.insert_block(
SealedBlockWithSenders::new(SealedBlock::new(header.clone(), BlockBody::default()), vec![])
SealedBlockWithSenders::new(SealedBlock::new(header.clone(), Default::default()), vec![])
.expect("no senders or txes"),
StorageLocation::Database,
)?;
let sf_provider = provider_rw.static_file_provider();

View File

@ -1991,7 +1991,8 @@ mod tests {
use alloy_rpc_types_engine::{ForkchoiceState, ForkchoiceUpdated, PayloadStatus};
use assert_matches::assert_matches;
use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_provider::{BlockWriter, ProviderFactory};
use reth_node_types::FullNodePrimitives;
use reth_provider::{BlockWriter, ProviderFactory, StorageLocation};
use reth_rpc_types_compat::engine::payload::block_to_payload_v1;
use reth_stages::{ExecOutput, PipelineError, StageError};
use reth_stages_api::StageCheckpoint;
@ -2169,7 +2170,10 @@ mod tests {
assert_matches!(rx.await, Ok(Ok(())));
}
fn insert_blocks<'a, N: ProviderNodeTypes>(
fn insert_blocks<
'a,
N: ProviderNodeTypes<Primitives: FullNodePrimitives<Block = reth_primitives::Block>>,
>(
provider_factory: ProviderFactory<N>,
mut blocks: impl Iterator<Item = &'a SealedBlock>,
) {
@ -2179,6 +2183,7 @@ mod tests {
provider
.insert_block(
b.clone().try_seal_with_senders().expect("invalid tx signature in block"),
StorageLocation::Database,
)
.map(drop)
})

View File

@ -27,7 +27,7 @@ use reth_engine_tree::{
EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineRequestHandler, FromEngine,
RequestHandlerEvent,
},
persistence::PersistenceHandle,
persistence::{PersistenceHandle, PersistenceNodeTypes},
tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig},
};
use reth_evm::execute::BlockExecutorProvider;
@ -59,7 +59,7 @@ where
impl<N> LocalEngineService<N>
where
N: EngineNodeTypes,
N: EngineNodeTypes + PersistenceNodeTypes,
{
/// Constructor for [`LocalEngineService`].
#[allow(clippy::too_many_arguments)]

View File

@ -8,7 +8,7 @@ use reth_engine_tree::{
backfill::PipelineSync,
download::BasicBlockDownloader,
engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler},
persistence::PersistenceHandle,
persistence::{PersistenceHandle, PersistenceNodeTypes},
tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig},
};
pub use reth_engine_tree::{
@ -59,7 +59,7 @@ where
impl<N, Client, E> EngineService<N, Client, E>
where
N: EngineNodeTypes,
N: EngineNodeTypes + PersistenceNodeTypes,
Client: EthBlockClient + 'static,
E: BlockExecutorProvider + 'static,
{

View File

@ -27,6 +27,7 @@ reth-payload-builder-primitives.workspace = true
reth-payload-primitives.workspace = true
reth-payload-validator.workspace = true
reth-primitives.workspace = true
reth-primitives-traits.workspace = true
reth-provider.workspace = true
reth-prune.workspace = true
reth-revm.workspace = true
@ -107,4 +108,5 @@ test-utils = [
"reth-provider/test-utils",
"reth-trie/test-utils",
"reth-prune-types?/test-utils",
"reth-primitives-traits/test-utils",
]

View File

@ -2,6 +2,8 @@ use crate::metrics::PersistenceMetrics;
use alloy_eips::BlockNumHash;
use reth_chain_state::ExecutedBlock;
use reth_errors::ProviderError;
use reth_primitives::BlockBody;
use reth_primitives_traits::{Block, FullNodePrimitives};
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader,
ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StaticFileProviderFactory,
@ -16,6 +18,16 @@ use thiserror::Error;
use tokio::sync::oneshot;
use tracing::{debug, error};
/// A helper trait with requirements for [`ProviderNodeTypes`] to be used within
/// [`PersistenceService`].
pub trait PersistenceNodeTypes:
ProviderNodeTypes<Primitives: FullNodePrimitives<Block: Block<Body = BlockBody>>>
{
}
impl<T> PersistenceNodeTypes for T where
T: ProviderNodeTypes<Primitives: FullNodePrimitives<Block: Block<Body = BlockBody>>>
{
}
/// Writes parts of reth's in memory tree state to the database and static files.
///
/// This is meant to be a spawned service that listens for various incoming persistence operations,
@ -60,7 +72,7 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
}
}
impl<N: ProviderNodeTypes> PersistenceService<N> {
impl<N: PersistenceNodeTypes> PersistenceService<N> {
/// This is the main loop, that will listen to database events and perform the requested
/// database actions
pub fn run(mut self) -> Result<(), PersistenceError> {
@ -198,7 +210,7 @@ impl PersistenceHandle {
}
/// Create a new [`PersistenceHandle`], and spawn the persistence service.
pub fn spawn_service<N: ProviderNodeTypes>(
pub fn spawn_service<N: PersistenceNodeTypes>(
provider_factory: ProviderFactory<N>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
sync_metrics_tx: MetricEventsSender,

View File

@ -9,6 +9,7 @@ use reth_evm::execute::{
BatchExecutor, BlockExecutionInput, BlockExecutionOutput, BlockExecutorProvider, Executor,
};
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_node_api::FullNodePrimitives;
use reth_primitives::{
Block, BlockBody, BlockWithSenders, Receipt, SealedBlockWithSenders, Transaction,
};
@ -57,7 +58,7 @@ pub(crate) fn execute_block_and_commit_to_database<N>(
block: &BlockWithSenders,
) -> eyre::Result<BlockExecutionOutput<Receipt>>
where
N: ProviderNodeTypes,
N: ProviderNodeTypes<Primitives: FullNodePrimitives<Block = reth_primitives::Block>>,
{
let provider = provider_factory.provider()?;
@ -161,7 +162,7 @@ pub(crate) fn blocks_and_execution_outputs<N>(
key_pair: Keypair,
) -> eyre::Result<Vec<(SealedBlockWithSenders, BlockExecutionOutput<Receipt>)>>
where
N: ProviderNodeTypes,
N: ProviderNodeTypes<Primitives: FullNodePrimitives<Block = reth_primitives::Block>>,
{
let (block1, block2) = blocks(chain_spec.clone(), key_pair)?;
@ -183,6 +184,7 @@ pub(crate) fn blocks_and_execution_outcome<N>(
) -> eyre::Result<(Vec<SealedBlockWithSenders>, ExecutionOutcome)>
where
N: ProviderNodeTypes,
N::Primitives: FullNodePrimitives<Block = reth_primitives::Block>,
{
let (block1, block2) = blocks(chain_spec.clone(), key_pair)?;

View File

@ -657,7 +657,7 @@ mod tests {
use reth_primitives::SealedBlockWithSenders;
use reth_provider::{
providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockReader,
BlockWriter, Chain, DatabaseProviderFactory, TransactionVariant,
BlockWriter, Chain, DatabaseProviderFactory, StorageLocation, TransactionVariant,
};
use reth_testing_utils::generators::{self, random_block, BlockParams};
@ -1238,7 +1238,7 @@ mod tests {
.seal_with_senders()
.unwrap();
let provider_rw = provider_factory.database_provider_rw().unwrap();
provider_rw.insert_block(block.clone()).unwrap();
provider_rw.insert_block(block.clone(), StorageLocation::Database).unwrap();
provider_rw.commit().unwrap();
let provider = BlockchainProvider2::new(provider_factory).unwrap();

View File

@ -403,7 +403,7 @@ mod tests {
use reth_primitives::Block;
use reth_provider::{
providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockWriter,
Chain, DatabaseProviderFactory,
Chain, DatabaseProviderFactory, StorageLocation,
};
use reth_testing_utils::generators::{self, random_block, BlockParams};
use tokio::sync::mpsc;
@ -431,6 +431,7 @@ mod tests {
let provider_rw = provider_factory.provider_rw()?;
provider_rw.insert_block(
node_head_block.clone().seal_with_senders().ok_or_eyre("failed to recover senders")?,
StorageLocation::Database,
)?;
provider_rw.commit()?;
@ -574,7 +575,7 @@ mod tests {
..Default::default()
};
let provider_rw = provider.database_provider_rw()?;
provider_rw.insert_block(node_head_block)?;
provider_rw.insert_block(node_head_block, StorageLocation::Database)?;
provider_rw.commit()?;
let node_head_notification = ExExNotification::ChainCommitted {
new: Arc::new(

View File

@ -12,7 +12,8 @@ use eyre::{Context, OptionExt};
use rayon::ThreadPoolBuilder;
use reth_beacon_consensus::EthBeaconConsensus;
use reth_blockchain_tree::{
BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree, TreeExternals,
externals::TreeNodeTypes, BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree,
TreeExternals,
};
use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
use reth_config::{config::EtlConfig, PruneConfig};
@ -631,7 +632,7 @@ impl<T>
Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
>
where
T: FullNodeTypes<Types: ProviderNodeTypes, Provider: WithTree>,
T: FullNodeTypes<Types: ProviderNodeTypes + TreeNodeTypes, Provider: WithTree>,
{
/// Returns access to the underlying database.
pub const fn database(&self) -> &<T::Types as NodeTypesWithDB>::DB {

View File

@ -19,6 +19,7 @@ use reth_primitives::StaticFileSegment;
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
BlockReader, BlockWriter, DBProvider, ProviderError, StaticFileProviderFactory, StatsReader,
StorageLocation,
};
use reth_stages_api::{
EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
@ -122,7 +123,7 @@ where
let (from_block, to_block) = input.next_block_range().into_inner();
// Get id for the next tx_num of zero if there are no transactions.
let mut next_tx_num = provider
let next_tx_num = provider
.tx_ref()
.cursor_read::<tables::TransactionBlocks>()?
.last()?
@ -130,8 +131,6 @@ where
.unwrap_or_default();
let static_file_provider = provider.static_file_provider();
let mut static_file_producer =
static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
// Make sure Transactions static file is at the same height. If it's further, this
// input execution was interrupted previously and we need to unwind the static file.
@ -145,6 +144,8 @@ where
// stage run. So, our only solution is to unwind the static files and proceed from the
// database expected height.
Ordering::Greater => {
let mut static_file_producer =
static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
static_file_producer
.prune_transactions(next_static_file_tx_num - next_tx_num, from_block - 1)?;
// Since this is a database <-> static file inconsistency, we commit the change
@ -168,40 +169,16 @@ where
let buffer = self.buffer.take().ok_or(StageError::MissingDownloadBuffer)?;
trace!(target: "sync::stages::bodies", bodies_len = buffer.len(), "Writing blocks");
let mut highest_block = from_block;
let highest_block = buffer.last().map(|r| r.block_number()).unwrap_or(from_block);
// Firstly, write transactions to static files
for response in &buffer {
let block_number = response.block_number();
// Increment block on static file header.
if block_number > 0 {
static_file_producer.increment_block(block_number)?;
}
match response {
BlockResponse::Full(block) => {
// Write transactions
for transaction in block.body.transactions() {
static_file_producer.append_transaction(next_tx_num, transaction)?;
// Increment transaction id for each transaction.
next_tx_num += 1;
}
}
BlockResponse::Empty(_) => {}
};
highest_block = block_number;
}
// Write bodies to database. This will NOT write transactions to database as we've already
// written them directly to static files.
// Write bodies to database.
provider.append_block_bodies(
buffer
.into_iter()
.map(|response| (response.block_number(), response.into_body()))
.collect(),
// We are writing transactions directly to static files.
StorageLocation::StaticFiles,
)?;
// The stage is "done" if:

View File

@ -61,7 +61,10 @@ impl AccountHashingStage {
pub fn seed<Tx: DbTx + DbTxMut + 'static, N: reth_provider::providers::ProviderNodeTypes>(
provider: &reth_provider::DatabaseProvider<Tx, N>,
opts: SeedOpts,
) -> Result<Vec<(alloy_primitives::Address, reth_primitives::Account)>, StageError> {
) -> Result<Vec<(alloy_primitives::Address, reth_primitives::Account)>, StageError>
where
N::Primitives: reth_primitives_traits::FullNodePrimitives<Block = reth_primitives::Block>,
{
use alloy_primitives::U256;
use reth_db_api::models::AccountBeforeTx;
use reth_provider::{StaticFileProviderFactory, StaticFileWriter};

View File

@ -8,7 +8,9 @@ use alloy_consensus::Header;
use alloy_genesis::GenesisAccount;
use alloy_primitives::{Address, Bytes, Log, B256, U256};
use reth_codecs::{add_arbitrary_tests, Compact};
use reth_primitives::{Account, Bytecode, Receipt, StorageEntry, TransactionSignedNoHash, TxType};
use reth_primitives::{
Account, Bytecode, Receipt, StorageEntry, TransactionSigned, TransactionSignedNoHash, TxType,
};
use reth_prune_types::{PruneCheckpoint, PruneSegment};
use reth_stages_types::StageCheckpoint;
use reth_trie_common::{StoredNibbles, StoredNibblesSubKey, *};
@ -225,6 +227,7 @@ impl_compression_for_compact!(
Bytecode,
AccountBeforeTx,
TransactionSignedNoHash,
TransactionSigned,
CompactU256,
StageCheckpoint,
PruneCheckpoint,

View File

@ -22,14 +22,6 @@ impl Default for DurationsRecorder {
}
impl DurationsRecorder {
/// Saves the provided duration for future logging and instantly reports as a metric with
/// `action` label.
pub(crate) fn record_duration(&mut self, action: Action, duration: Duration) {
self.actions.push((action, duration));
self.current_metrics.record_duration(action, duration);
self.latest = Some(self.start.elapsed());
}
/// Records the duration since last record, saves it for future logging and instantly reports as
/// a metric with `action` label.
pub(crate) fn record_relative(&mut self, action: Action) {
@ -56,11 +48,6 @@ pub(crate) enum Action {
InsertHeaders,
InsertHeaderNumbers,
InsertHeaderTerminalDifficulties,
InsertBlockOmmers,
InsertTransactionSenders,
InsertTransactions,
InsertTransactionHashNumbers,
InsertBlockWithdrawals,
InsertBlockBodyIndices,
InsertTransactionBlocks,
GetNextTxNum,
@ -95,16 +82,6 @@ struct DatabaseProviderMetrics {
insert_header_numbers: Histogram,
/// Duration of insert header TD
insert_header_td: Histogram,
/// Duration of insert block ommers
insert_block_ommers: Histogram,
/// Duration of insert tx senders
insert_tx_senders: Histogram,
/// Duration of insert transactions
insert_transactions: Histogram,
/// Duration of insert transaction hash numbers
insert_tx_hash_numbers: Histogram,
/// Duration of insert block withdrawals
insert_block_withdrawals: Histogram,
/// Duration of insert block body indices
insert_block_body_indices: Histogram,
/// Duration of insert transaction blocks
@ -131,11 +108,6 @@ impl DatabaseProviderMetrics {
Action::InsertHeaders => self.insert_headers.record(duration),
Action::InsertHeaderNumbers => self.insert_header_numbers.record(duration),
Action::InsertHeaderTerminalDifficulties => self.insert_header_td.record(duration),
Action::InsertBlockOmmers => self.insert_block_ommers.record(duration),
Action::InsertTransactionSenders => self.insert_tx_senders.record(duration),
Action::InsertTransactions => self.insert_transactions.record(duration),
Action::InsertTransactionHashNumbers => self.insert_tx_hash_numbers.record(duration),
Action::InsertBlockWithdrawals => self.insert_block_withdrawals.record(duration),
Action::InsertBlockBodyIndices => self.insert_block_body_indices.record(duration),
Action::InsertTransactionBlocks => self.insert_tx_blocks.record(duration),
Action::GetNextTxNum => self.get_next_tx_num.record(duration),

View File

@ -644,7 +644,7 @@ mod tests {
providers::{StaticFileProvider, StaticFileWriter},
test_utils::{blocks::TEST_BLOCK, create_test_provider_factory, MockNodeTypesWithDB},
BlockHashReader, BlockNumReader, BlockWriter, DBProvider, HeaderSyncGapProvider,
TransactionsProvider,
StorageLocation, TransactionsProvider,
};
use alloy_primitives::{TxNumber, B256, U256};
use assert_matches::assert_matches;
@ -715,7 +715,10 @@ mod tests {
{
let provider = factory.provider_rw().unwrap();
assert_matches!(
provider.insert_block(block.clone().try_seal_with_senders().unwrap()),
provider.insert_block(
block.clone().try_seal_with_senders().unwrap(),
StorageLocation::Database
),
Ok(_)
);
assert_matches!(
@ -733,7 +736,10 @@ mod tests {
};
let provider = factory.with_prune_modes(prune_modes).provider_rw().unwrap();
assert_matches!(
provider.insert_block(block.clone().try_seal_with_senders().unwrap(),),
provider.insert_block(
block.clone().try_seal_with_senders().unwrap(),
StorageLocation::Database
),
Ok(_)
);
assert_matches!(provider.transaction_sender(0), Ok(None));
@ -754,7 +760,10 @@ mod tests {
let provider = factory.provider_rw().unwrap();
assert_matches!(
provider.insert_block(block.clone().try_seal_with_senders().unwrap()),
provider.insert_block(
block.clone().try_seal_with_senders().unwrap(),
StorageLocation::Database
),
Ok(_)
);

View File

@ -17,8 +17,8 @@ use crate::{
LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
PruneCheckpointReader, PruneCheckpointWriter, RevertsInit, StageCheckpointReader,
StateChangeWriter, StateProviderBox, StateReader, StateWriter, StaticFileProviderFactory,
StatsReader, StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider,
TransactionsProviderExt, TrieWriter, WithdrawalsProvider,
StatsReader, StorageLocation, StorageReader, StorageTrieWriter, TransactionVariant,
TransactionsProvider, TransactionsProviderExt, TrieWriter, WithdrawalsProvider,
};
use alloy_consensus::Header;
use alloy_eips::{
@ -37,7 +37,7 @@ use reth_db_api::{
database::Database,
models::{
sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
ShardedKey, StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals,
ShardedKey, StoredBlockBodyIndices,
},
table::Table,
transaction::{DbTx, DbTxMut},
@ -52,7 +52,7 @@ use reth_primitives::{
SealedBlockWithSenders, SealedHeader, StaticFileSegment, StorageEntry, TransactionMeta,
TransactionSigned, TransactionSignedEcRecovered, TransactionSignedNoHash,
};
use reth_primitives_traits::{BlockBody as _, FullNodePrimitives};
use reth_primitives_traits::{BlockBody as _, FullNodePrimitives, SignedTransaction};
use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_storage_api::{StateProvider, StorageChangeSetReader, TryIntoHistoricalStateProvider};
@ -73,10 +73,9 @@ use std::{
fmt::Debug,
ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
sync::{mpsc, Arc},
time::{Duration, Instant},
};
use tokio::sync::watch;
use tracing::{debug, error, trace, warn};
use tracing::{debug, error, trace};
/// A [`DatabaseProvider`] that holds a read-only database transaction.
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
@ -292,7 +291,7 @@ impl<Tx: DbTx + DbTxMut + 'static, N: ProviderNodeTypes + 'static> DatabaseProvi
/// Inserts an historical block. **Used for setting up test environments**
pub fn insert_historical_block(
&self,
block: SealedBlockWithSenders,
block: SealedBlockWithSenders<Header, <Self as BlockWriter>::Body>,
) -> ProviderResult<StoredBlockBodyIndices> {
let ttd = if block.number == 0 {
block.difficulty
@ -316,7 +315,7 @@ impl<Tx: DbTx + DbTxMut + 'static, N: ProviderNodeTypes + 'static> DatabaseProvi
writer.append_header(block.header.as_ref(), ttd, &block.hash())?;
self.insert_block(block)
self.insert_block(block, StorageLocation::Database)
}
}
@ -3137,7 +3136,8 @@ impl<TX: DbTxMut + DbTx + 'static, N: ProviderNodeTypes + 'static> BlockWriter
/// [`TransactionHashNumbers`](tables::TransactionHashNumbers).
fn insert_block(
&self,
block: SealedBlockWithSenders,
block: SealedBlockWithSenders<Header, Self::Body>,
write_transactions_to: StorageLocation,
) -> ProviderResult<StoredBlockBodyIndices> {
let block_number = block.number;
@ -3166,15 +3166,6 @@ impl<TX: DbTxMut + DbTx + 'static, N: ProviderNodeTypes + 'static> BlockWriter
self.tx.put::<tables::HeaderTerminalDifficulties>(block_number, ttd.into())?;
durations_recorder.record_relative(metrics::Action::InsertHeaderTerminalDifficulties);
// insert body ommers data
if !block.body.ommers.is_empty() {
self.tx.put::<tables::BlockOmmers>(
block_number,
StoredBlockOmmers { ommers: block.block.body.ommers },
)?;
durations_recorder.record_relative(metrics::Action::InsertBlockOmmers);
}
let mut next_tx_num = self
.tx
.cursor_read::<tables::TransactionBlocks>()?
@ -3184,84 +3175,28 @@ impl<TX: DbTxMut + DbTx + 'static, N: ProviderNodeTypes + 'static> BlockWriter
durations_recorder.record_relative(metrics::Action::GetNextTxNum);
let first_tx_num = next_tx_num;
let tx_count = block.block.body.transactions.len() as u64;
let tx_count = block.block.body.transactions().len() as u64;
// Ensures we have all the senders for the block's transactions.
let mut tx_senders_elapsed = Duration::default();
let mut transactions_elapsed = Duration::default();
let mut tx_hash_numbers_elapsed = Duration::default();
for (transaction, sender) in
block.block.body.transactions.into_iter().zip(block.senders.iter())
block.block.body.transactions().iter().zip(block.senders.iter())
{
let hash = transaction.hash();
let hash = transaction.tx_hash();
if self
.prune_modes
.sender_recovery
.as_ref()
.filter(|prune_mode| prune_mode.is_full())
.is_none()
{
let start = Instant::now();
if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
self.tx.put::<tables::TransactionSenders>(next_tx_num, *sender)?;
tx_senders_elapsed += start.elapsed();
}
let start = Instant::now();
self.tx.put::<tables::Transactions>(next_tx_num, transaction.into())?;
let elapsed = start.elapsed();
if elapsed > Duration::from_secs(1) {
warn!(
target: "providers::db",
?block_number,
tx_num = %next_tx_num,
hash = %hash,
?elapsed,
"Transaction insertion took too long"
);
}
transactions_elapsed += elapsed;
if self
.prune_modes
.transaction_lookup
.filter(|prune_mode| prune_mode.is_full())
.is_none()
{
let start = Instant::now();
self.tx.put::<tables::TransactionHashNumbers>(hash, next_tx_num)?;
tx_hash_numbers_elapsed += start.elapsed();
if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
self.tx.put::<tables::TransactionHashNumbers>(*hash, next_tx_num)?;
}
next_tx_num += 1;
}
durations_recorder
.record_duration(metrics::Action::InsertTransactionSenders, tx_senders_elapsed);
durations_recorder
.record_duration(metrics::Action::InsertTransactions, transactions_elapsed);
durations_recorder.record_duration(
metrics::Action::InsertTransactionHashNumbers,
tx_hash_numbers_elapsed,
);
if let Some(withdrawals) = block.block.body.withdrawals {
if !withdrawals.is_empty() {
self.tx.put::<tables::BlockWithdrawals>(
block_number,
StoredBlockWithdrawals { withdrawals },
self.append_block_bodies(
vec![(block_number, Some(block.block.body))],
write_transactions_to,
)?;
durations_recorder.record_relative(metrics::Action::InsertBlockWithdrawals);
}
}
let block_indices = StoredBlockBodyIndices { first_tx_num, tx_count };
self.tx.put::<tables::BlockBodyIndices>(block_number, block_indices)?;
durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
if !block_indices.is_empty() {
self.tx.put::<tables::TransactionBlocks>(block_indices.last_tx_num(), block_number)?;
durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
}
debug!(
target: "providers::db",
@ -3270,33 +3205,83 @@ impl<TX: DbTxMut + DbTx + 'static, N: ProviderNodeTypes + 'static> BlockWriter
"Inserted block"
);
Ok(block_indices)
Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
}
fn append_block_bodies(
&self,
bodies: Vec<(BlockNumber, Option<Self::Body>)>,
write_transactions_to: StorageLocation,
) -> ProviderResult<()> {
let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
// Initialize writer if we will be writing transactions to staticfiles
let mut tx_static_writer = write_transactions_to
.static_files()
.then(|| {
self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)
})
.transpose()?;
let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
// Initialize cursor if we will be writing transactions to database
let mut tx_cursor = write_transactions_to
.database()
.then(|| {
self.tx.cursor_write::<tables::Transactions<
<Self::Body as reth_primitives_traits::BlockBody>::Transaction,
>>()
})
.transpose()?;
// Get id for the next tx_num of zero if there are no transactions.
let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
for (block_number, body) in &bodies {
// Increment block on static file header.
if let Some(writer) = tx_static_writer.as_mut() {
writer.increment_block(*block_number)?;
}
let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
let mut durations_recorder = metrics::DurationsRecorder::default();
// insert block meta
block_indices_cursor.append(*block_number, block_indices)?;
next_tx_num += tx_count;
durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
let Some(body) = body else { continue };
// write transaction block index
if !body.transactions().is_empty() {
tx_block_cursor.append(block_indices.last_tx_num(), *block_number)?;
durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
}
// write transactions
for transaction in body.transactions() {
if let Some(writer) = tx_static_writer.as_mut() {
writer.append_transaction(next_tx_num, transaction)?;
}
if let Some(cursor) = tx_cursor.as_mut() {
cursor.append(next_tx_num, transaction.clone())?;
}
// Increment transaction id for each transaction.
next_tx_num += 1;
}
debug!(
target: "providers::db",
?block_number,
actions = ?durations_recorder.actions,
"Inserted block body"
);
}
self.storage.writer().write_block_bodies(self, bodies)?;
@ -3307,7 +3292,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: ProviderNodeTypes + 'static> BlockWriter
/// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
fn append_blocks_with_state(
&self,
blocks: Vec<SealedBlockWithSenders>,
blocks: Vec<SealedBlockWithSenders<Header, Self::Body>>,
execution_outcome: ExecutionOutcome,
hashed_state: HashedPostStateSorted,
trie_updates: TrieUpdates,
@ -3326,7 +3311,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: ProviderNodeTypes + 'static> BlockWriter
// Insert the blocks
for block in blocks {
self.insert_block(block)?;
self.insert_block(block, StorageLocation::Database)?;
durations_recorder.record_relative(metrics::Action::InsertBlock);
}

View File

@ -20,6 +20,7 @@ use reth_blockchain_tree_api::{
};
use reth_chain_state::{ChainInfoTracker, ForkChoiceNotifications, ForkChoiceSubscriptions};
use reth_chainspec::{ChainInfo, EthereumHardforks};
use reth_db::table::Value;
use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices};
use reth_evm::ConfigureEvmEnv;
use reth_node_types::{FullNodePrimitives, NodeTypes, NodeTypesWithDB};
@ -75,7 +76,7 @@ where
Self: NodeTypes<
ChainSpec: EthereumHardforks,
Storage: ChainStorage<Self::Primitives>,
Primitives: FullNodePrimitives,
Primitives: FullNodePrimitives<SignedTx: Value>,
>,
{
}
@ -84,7 +85,7 @@ impl<T> NodeTypesForProvider for T where
T: NodeTypes<
ChainSpec: EthereumHardforks,
Storage: ChainStorage<T::Primitives>,
Primitives: FullNodePrimitives,
Primitives: FullNodePrimitives<SignedTx: Value>,
>
{
}

View File

@ -1,3 +1,4 @@
use alloy_consensus::Header;
use alloy_primitives::BlockNumber;
use reth_db_api::models::StoredBlockBodyIndices;
use reth_execution_types::{Chain, ExecutionOutcome};
@ -6,6 +7,29 @@ use reth_storage_errors::provider::ProviderResult;
use reth_trie::{updates::TrieUpdates, HashedPostStateSorted};
use std::ops::RangeInclusive;
/// An enum that represents the storage location for a piece of data.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum StorageLocation {
/// Write only to static files.
StaticFiles,
/// Write only to the database.
Database,
/// Write to both the database and static files.
Both,
}
impl StorageLocation {
/// Returns true if the storage location includes static files.
pub const fn static_files(&self) -> bool {
matches!(self, Self::StaticFiles | Self::Both)
}
/// Returns true if the storage location includes the database.
pub const fn database(&self) -> bool {
matches!(self, Self::Database | Self::Both)
}
}
/// BlockExecution Writer
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait BlockExecutionWriter: BlockWriter + Send + Sync {
@ -40,8 +64,11 @@ pub trait BlockWriter: Send + Sync {
///
/// Return [StoredBlockBodyIndices] that contains indices of the first and last transactions and
/// transition in the block.
fn insert_block(&self, block: SealedBlockWithSenders)
-> ProviderResult<StoredBlockBodyIndices>;
fn insert_block(
&self,
block: SealedBlockWithSenders<Header, Self::Body>,
write_transactions_to: StorageLocation,
) -> ProviderResult<StoredBlockBodyIndices>;
/// Appends a batch of block bodies extending the canonical chain. This is invoked during
/// `Bodies` stage and does not write to `TransactionHashNumbers` and `TransactionSenders`
@ -51,6 +78,7 @@ pub trait BlockWriter: Send + Sync {
fn append_block_bodies(
&self,
bodies: Vec<(BlockNumber, Option<Self::Body>)>,
write_transactions_to: StorageLocation,
) -> ProviderResult<()>;
/// Appends a batch of sealed blocks to the blockchain, including sender information, and
@ -69,7 +97,7 @@ pub trait BlockWriter: Send + Sync {
/// Returns `Ok(())` on success, or an error if any operation fails.
fn append_blocks_with_state(
&self,
blocks: Vec<SealedBlockWithSenders>,
blocks: Vec<SealedBlockWithSenders<Header, Self::Body>>,
execution_outcome: ExecutionOutcome,
hashed_state: HashedPostStateSorted,
trie_updates: TrieUpdates,

View File

@ -2,7 +2,7 @@ use crate::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter as SfWriter},
writer::static_file::StaticFileWriter,
BlockExecutionWriter, BlockWriter, HistoryWriter, StateChangeWriter, StateWriter,
StaticFileProviderFactory, TrieWriter,
StaticFileProviderFactory, StorageLocation, TrieWriter,
};
use alloy_consensus::Header;
use alloy_primitives::{BlockNumber, B256, U256};
@ -15,7 +15,7 @@ use reth_db::{
};
use reth_errors::{ProviderError, ProviderResult};
use reth_execution_types::ExecutionOutcome;
use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash};
use reth_primitives::{BlockBody, SealedBlock, StaticFileSegment};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_storage_api::{
DBProvider, HeaderProvider, ReceiptWriter, StageCheckpointWriter, TransactionsProviderExt,
@ -148,7 +148,7 @@ impl UnifiedStorageWriter<'_, (), ()> {
impl<ProviderDB> UnifiedStorageWriter<'_, ProviderDB, &StaticFileProvider<ProviderDB::Primitives>>
where
ProviderDB: DBProvider<Tx: DbTx + DbTxMut>
+ BlockWriter
+ BlockWriter<Body = BlockBody>
+ TransactionsProviderExt
+ StateChangeWriter
+ TrieWriter
@ -195,7 +195,7 @@ where
for block in blocks {
let sealed_block =
block.block().clone().try_with_senders_unchecked(block.senders().clone()).unwrap();
self.database().insert_block(sealed_block)?;
self.database().insert_block(sealed_block, StorageLocation::Both)?;
self.save_header_and_transactions(block.block.clone())?;
// Write state and changesets to the database.
@ -246,25 +246,8 @@ where
.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block.number))?;
}
{
let transactions_writer =
self.static_file().get_writer(block.number, StaticFileSegment::Transactions)?;
let mut storage_writer =
UnifiedStorageWriter::from(self.database(), transactions_writer);
let no_hash_transactions = block
.body
.transactions
.clone()
.into_iter()
.map(TransactionSignedNoHash::from)
.collect();
storage_writer.append_transactions_from_blocks(
block.header().number,
std::iter::once(&no_hash_transactions),
)?;
self.database()
.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block.number))?;
}
Ok(())
}
@ -378,56 +361,6 @@ where
Ok(td)
}
/// Appends transactions to static files, using the
/// [`BlockBodyIndices`](tables::BlockBodyIndices) table to determine the transaction number
/// when appending to static files.
///
/// NOTE: The static file writer used to construct this [`UnifiedStorageWriter`] MUST be a
/// writer for the Transactions segment.
pub fn append_transactions_from_blocks<T>(
&mut self,
initial_block_number: BlockNumber,
transactions: impl Iterator<Item = T>,
) -> ProviderResult<()>
where
T: Borrow<Vec<TransactionSignedNoHash>>,
{
self.ensure_static_file_segment(StaticFileSegment::Transactions)?;
let mut bodies_cursor =
self.database().tx_ref().cursor_read::<tables::BlockBodyIndices>()?;
let mut last_tx_idx = None;
for (idx, transactions) in transactions.enumerate() {
let block_number = initial_block_number + idx as u64;
let mut first_tx_index =
bodies_cursor.seek_exact(block_number)?.map(|(_, indices)| indices.first_tx_num());
// If there are no indices, that means there have been no transactions
//
// So instead of returning an error, use zero
if block_number == initial_block_number && first_tx_index.is_none() {
first_tx_index = Some(0);
}
let mut tx_index = first_tx_index
.or(last_tx_idx)
.ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))?;
for tx in transactions.borrow() {
self.static_file_mut().append_transaction(tx_index, tx)?;
tx_index += 1;
}
self.static_file_mut().increment_block(block_number)?;
// update index
last_tx_idx = Some(tx_index);
}
Ok(())
}
}
impl<ProviderDB>