From 0c5984179e0304c11ad4c5bc0057508b65441338 Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Thu, 21 Nov 2024 02:48:39 +0400 Subject: [PATCH] refactor(storage): unify blocks insertion logic (#12694) --- Cargo.lock | 1 + bin/reth/src/commands/debug_cmd/merkle.rs | 4 +- crates/blockchain-tree/src/blockchain_tree.rs | 23 ++- crates/blockchain-tree/src/externals.rs | 14 +- crates/blockchain-tree/src/shareable.rs | 8 +- .../commands/src/init_state/without_evm.rs | 15 +- crates/consensus/beacon/src/engine/mod.rs | 9 +- crates/engine/local/src/service.rs | 4 +- crates/engine/service/src/service.rs | 4 +- crates/engine/tree/Cargo.toml | 2 + crates/engine/tree/src/persistence.rs | 16 +- crates/exex/exex/src/backfill/test_utils.rs | 6 +- crates/exex/exex/src/manager.rs | 4 +- crates/exex/exex/src/notifications.rs | 5 +- crates/node/builder/src/launch/common.rs | 5 +- crates/stages/stages/src/stages/bodies.rs | 39 +---- .../stages/src/stages/hashing_account.rs | 5 +- crates/storage/db-api/src/models/mod.rs | 5 +- .../src/providers/database/metrics.rs | 28 --- .../provider/src/providers/database/mod.rs | 17 +- .../src/providers/database/provider.rs | 161 ++++++++---------- crates/storage/provider/src/providers/mod.rs | 5 +- crates/storage/provider/src/traits/block.rs | 34 +++- crates/storage/provider/src/writer/mod.rs | 79 +-------- 24 files changed, 225 insertions(+), 268 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 09bcccf65..017b84f6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7254,6 +7254,7 @@ dependencies = [ "reth-payload-primitives", "reth-payload-validator", "reth-primitives", + "reth-primitives-traits", "reth-provider", "reth-prune", "reth-prune-types", diff --git a/bin/reth/src/commands/debug_cmd/merkle.rs b/bin/reth/src/commands/debug_cmd/merkle.rs index db4cd952e..bb8a6a2c4 100644 --- a/bin/reth/src/commands/debug_cmd/merkle.rs +++ b/bin/reth/src/commands/debug_cmd/merkle.rs @@ -21,7 +21,7 @@ use reth_node_ethereum::EthExecutorProvider; use reth_provider::{ providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockNumReader, BlockWriter, ChainSpecProvider, DatabaseProviderFactory, HeaderProvider, LatestStateProviderRef, - OriginalValuesKnown, ProviderError, ProviderFactory, StateWriter, + OriginalValuesKnown, ProviderError, ProviderFactory, StateWriter, StorageLocation, }; use reth_revm::database::StateProviderDatabase; use reth_stages::{ @@ -148,7 +148,7 @@ impl> Command { .map_err(|block| eyre::eyre!("Error sealing block with senders: {block:?}"))?; trace!(target: "reth::cli", block_number, "Executing block"); - provider_rw.insert_block(sealed_block.clone())?; + provider_rw.insert_block(sealed_block.clone(), StorageLocation::Database)?; td += sealed_block.difficulty; let mut executor = executor_provider.batch_executor(StateProviderDatabase::new( diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs index 1a8a390e9..8e1924925 100644 --- a/crates/blockchain-tree/src/blockchain_tree.rs +++ b/crates/blockchain-tree/src/blockchain_tree.rs @@ -1,6 +1,7 @@ //! Implementation of [`BlockchainTree`] use crate::{ + externals::TreeNodeTypes, metrics::{MakeCanonicalAction, MakeCanonicalDurationsRecorder, TreeMetrics}, state::{SidechainId, TreeState}, AppendableChain, BlockIndices, BlockchainTreeConfig, ExecutionData, TreeExternals, @@ -21,10 +22,10 @@ use reth_primitives::{ SealedHeader, StaticFileSegment, }; use reth_provider::{ - providers::ProviderNodeTypes, BlockExecutionWriter, BlockNumReader, BlockWriter, - CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, - ChainSpecProvider, ChainSplit, ChainSplitTarget, DBProvider, DisplayBlocksChain, - HeaderProvider, ProviderError, StaticFileProviderFactory, + BlockExecutionWriter, BlockNumReader, BlockWriter, CanonStateNotification, + CanonStateNotificationSender, CanonStateNotifications, ChainSpecProvider, ChainSplit, + ChainSplitTarget, DBProvider, DisplayBlocksChain, HeaderProvider, ProviderError, + StaticFileProviderFactory, }; use reth_stages_api::{MetricEvent, MetricEventsSender}; use reth_storage_errors::provider::{ProviderResult, RootMismatch}; @@ -93,7 +94,7 @@ impl BlockchainTree { impl BlockchainTree where - N: ProviderNodeTypes, + N: TreeNodeTypes, E: BlockExecutorProvider, { /// Builds the blockchain tree for the node. @@ -1386,16 +1387,18 @@ mod tests { use reth_db_api::transaction::DbTxMut; use reth_evm::test_utils::MockExecutorProvider; use reth_evm_ethereum::execute::EthExecutorProvider; + use reth_node_types::FullNodePrimitives; use reth_primitives::{ proofs::{calculate_receipt_root, calculate_transaction_root}, Account, BlockBody, Transaction, TransactionSigned, TransactionSignedEcRecovered, }; use reth_provider::{ + providers::ProviderNodeTypes, test_utils::{ blocks::BlockchainTestData, create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB, }, - ProviderFactory, + ProviderFactory, StorageLocation, }; use reth_revm::primitives::AccountInfo; use reth_stages_api::StageCheckpoint; @@ -1420,7 +1423,12 @@ mod tests { TreeExternals::new(provider_factory, consensus, executor_factory) } - fn setup_genesis(factory: &ProviderFactory, mut genesis: SealedBlock) { + fn setup_genesis< + N: ProviderNodeTypes>, + >( + factory: &ProviderFactory, + mut genesis: SealedBlock, + ) { // insert genesis to db. genesis.header.set_block_number(10); @@ -1551,6 +1559,7 @@ mod tests { SealedBlock::new(chain_spec.sealed_genesis_header(), Default::default()) .try_seal_with_senders() .unwrap(), + StorageLocation::Database, ) .unwrap(); let account = Account { balance: initial_signer_balance, ..Default::default() }; diff --git a/crates/blockchain-tree/src/externals.rs b/crates/blockchain-tree/src/externals.rs index 4e22fcb78..76b658248 100644 --- a/crates/blockchain-tree/src/externals.rs +++ b/crates/blockchain-tree/src/externals.rs @@ -4,8 +4,8 @@ use alloy_primitives::{BlockHash, BlockNumber}; use reth_consensus::Consensus; use reth_db::{static_file::HeaderMask, tables}; use reth_db_api::{cursor::DbCursorRO, transaction::DbTx}; -use reth_node_types::NodeTypesWithDB; -use reth_primitives::StaticFileSegment; +use reth_node_types::{Block, FullNodePrimitives, NodeTypesWithDB}; +use reth_primitives::{BlockBody, StaticFileSegment}; use reth_provider::{ providers::ProviderNodeTypes, ChainStateBlockReader, ChainStateBlockWriter, ProviderFactory, StaticFileProviderFactory, StatsReader, @@ -13,6 +13,16 @@ use reth_provider::{ use reth_storage_errors::provider::ProviderResult; use std::{collections::BTreeMap, sync::Arc}; +/// A helper trait with requirements for [`ProviderNodeTypes`] to be used within [`TreeExternals`]. +pub trait TreeNodeTypes: + ProviderNodeTypes>> +{ +} +impl TreeNodeTypes for T where + T: ProviderNodeTypes>> +{ +} + /// A container for external components. /// /// This is a simple container for external components used throughout the blockchain tree diff --git a/crates/blockchain-tree/src/shareable.rs b/crates/blockchain-tree/src/shareable.rs index ec1f3cccf..f997e0a06 100644 --- a/crates/blockchain-tree/src/shareable.rs +++ b/crates/blockchain-tree/src/shareable.rs @@ -1,5 +1,7 @@ //! Wrapper around `BlockchainTree` that allows for it to be shared. +use crate::externals::TreeNodeTypes; + use super::BlockchainTree; use alloy_eips::BlockNumHash; use alloy_primitives::{BlockHash, BlockNumber}; @@ -36,7 +38,7 @@ impl ShareableBlockchainTree { impl BlockchainTreeEngine for ShareableBlockchainTree where - N: ProviderNodeTypes, + N: TreeNodeTypes, E: BlockExecutorProvider, { fn buffer_block(&self, block: SealedBlockWithSenders) -> Result<(), InsertBlockError> { @@ -107,7 +109,7 @@ where impl BlockchainTreeViewer for ShareableBlockchainTree where - N: ProviderNodeTypes, + N: TreeNodeTypes, E: BlockExecutorProvider, { fn header_by_hash(&self, hash: BlockHash) -> Option { @@ -170,7 +172,7 @@ where impl BlockchainTreePendingStateProvider for ShareableBlockchainTree where - N: ProviderNodeTypes, + N: TreeNodeTypes, E: BlockExecutorProvider, { fn find_pending_state_provider( diff --git a/crates/cli/commands/src/init_state/without_evm.rs b/crates/cli/commands/src/init_state/without_evm.rs index c6e1f9a51..e3594a593 100644 --- a/crates/cli/commands/src/init_state/without_evm.rs +++ b/crates/cli/commands/src/init_state/without_evm.rs @@ -3,12 +3,10 @@ use alloy_rlp::Decodable; use alloy_consensus::Header; use reth_node_builder::NodePrimitives; -use reth_primitives::{ - BlockBody, SealedBlock, SealedBlockWithSenders, SealedHeader, StaticFileSegment, -}; +use reth_primitives::{SealedBlock, SealedBlockWithSenders, SealedHeader, StaticFileSegment}; use reth_provider::{ providers::StaticFileProvider, BlockWriter, StageCheckpointWriter, StaticFileProviderFactory, - StaticFileWriter, + StaticFileWriter, StorageLocation, }; use reth_stages::{StageCheckpoint, StageId}; @@ -33,7 +31,9 @@ pub fn setup_without_evm( total_difficulty: U256, ) -> Result<(), eyre::Error> where - Provider: StaticFileProviderFactory + StageCheckpointWriter + BlockWriter, + Provider: StaticFileProviderFactory + + StageCheckpointWriter + + BlockWriter, { info!(target: "reth::cli", "Setting up dummy EVM chain before importing state."); @@ -64,11 +64,12 @@ fn append_first_block( total_difficulty: U256, ) -> Result<(), eyre::Error> where - Provider: BlockWriter + StaticFileProviderFactory, + Provider: BlockWriter + StaticFileProviderFactory, { provider_rw.insert_block( - SealedBlockWithSenders::new(SealedBlock::new(header.clone(), BlockBody::default()), vec![]) + SealedBlockWithSenders::new(SealedBlock::new(header.clone(), Default::default()), vec![]) .expect("no senders or txes"), + StorageLocation::Database, )?; let sf_provider = provider_rw.static_file_provider(); diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 0b93ae0f2..2ad06e68b 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -1991,7 +1991,8 @@ mod tests { use alloy_rpc_types_engine::{ForkchoiceState, ForkchoiceUpdated, PayloadStatus}; use assert_matches::assert_matches; use reth_chainspec::{ChainSpecBuilder, MAINNET}; - use reth_provider::{BlockWriter, ProviderFactory}; + use reth_node_types::FullNodePrimitives; + use reth_provider::{BlockWriter, ProviderFactory, StorageLocation}; use reth_rpc_types_compat::engine::payload::block_to_payload_v1; use reth_stages::{ExecOutput, PipelineError, StageError}; use reth_stages_api::StageCheckpoint; @@ -2169,7 +2170,10 @@ mod tests { assert_matches!(rx.await, Ok(Ok(()))); } - fn insert_blocks<'a, N: ProviderNodeTypes>( + fn insert_blocks< + 'a, + N: ProviderNodeTypes>, + >( provider_factory: ProviderFactory, mut blocks: impl Iterator, ) { @@ -2179,6 +2183,7 @@ mod tests { provider .insert_block( b.clone().try_seal_with_senders().expect("invalid tx signature in block"), + StorageLocation::Database, ) .map(drop) }) diff --git a/crates/engine/local/src/service.rs b/crates/engine/local/src/service.rs index 4e4826be3..3575bc133 100644 --- a/crates/engine/local/src/service.rs +++ b/crates/engine/local/src/service.rs @@ -27,7 +27,7 @@ use reth_engine_tree::{ EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineRequestHandler, FromEngine, RequestHandlerEvent, }, - persistence::PersistenceHandle, + persistence::{PersistenceHandle, PersistenceNodeTypes}, tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig}, }; use reth_evm::execute::BlockExecutorProvider; @@ -59,7 +59,7 @@ where impl LocalEngineService where - N: EngineNodeTypes, + N: EngineNodeTypes + PersistenceNodeTypes, { /// Constructor for [`LocalEngineService`]. #[allow(clippy::too_many_arguments)] diff --git a/crates/engine/service/src/service.rs b/crates/engine/service/src/service.rs index cec9d981f..49233439e 100644 --- a/crates/engine/service/src/service.rs +++ b/crates/engine/service/src/service.rs @@ -8,7 +8,7 @@ use reth_engine_tree::{ backfill::PipelineSync, download::BasicBlockDownloader, engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler}, - persistence::PersistenceHandle, + persistence::{PersistenceHandle, PersistenceNodeTypes}, tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig}, }; pub use reth_engine_tree::{ @@ -59,7 +59,7 @@ where impl EngineService where - N: EngineNodeTypes, + N: EngineNodeTypes + PersistenceNodeTypes, Client: EthBlockClient + 'static, E: BlockExecutorProvider + 'static, { diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index d6e1c80a7..70be84a9f 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -27,6 +27,7 @@ reth-payload-builder-primitives.workspace = true reth-payload-primitives.workspace = true reth-payload-validator.workspace = true reth-primitives.workspace = true +reth-primitives-traits.workspace = true reth-provider.workspace = true reth-prune.workspace = true reth-revm.workspace = true @@ -107,4 +108,5 @@ test-utils = [ "reth-provider/test-utils", "reth-trie/test-utils", "reth-prune-types?/test-utils", + "reth-primitives-traits/test-utils", ] diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index e0c9e0362..0199ae3f4 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -2,6 +2,8 @@ use crate::metrics::PersistenceMetrics; use alloy_eips::BlockNumHash; use reth_chain_state::ExecutedBlock; use reth_errors::ProviderError; +use reth_primitives::BlockBody; +use reth_primitives_traits::{Block, FullNodePrimitives}; use reth_provider::{ providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader, ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StaticFileProviderFactory, @@ -16,6 +18,16 @@ use thiserror::Error; use tokio::sync::oneshot; use tracing::{debug, error}; +/// A helper trait with requirements for [`ProviderNodeTypes`] to be used within +/// [`PersistenceService`]. +pub trait PersistenceNodeTypes: + ProviderNodeTypes>> +{ +} +impl PersistenceNodeTypes for T where + T: ProviderNodeTypes>> +{ +} /// Writes parts of reth's in memory tree state to the database and static files. /// /// This is meant to be a spawned service that listens for various incoming persistence operations, @@ -60,7 +72,7 @@ impl PersistenceService { } } -impl PersistenceService { +impl PersistenceService { /// This is the main loop, that will listen to database events and perform the requested /// database actions pub fn run(mut self) -> Result<(), PersistenceError> { @@ -198,7 +210,7 @@ impl PersistenceHandle { } /// Create a new [`PersistenceHandle`], and spawn the persistence service. - pub fn spawn_service( + pub fn spawn_service( provider_factory: ProviderFactory, pruner: PrunerWithFactory>, sync_metrics_tx: MetricEventsSender, diff --git a/crates/exex/exex/src/backfill/test_utils.rs b/crates/exex/exex/src/backfill/test_utils.rs index 80af408c5..5d0f88f51 100644 --- a/crates/exex/exex/src/backfill/test_utils.rs +++ b/crates/exex/exex/src/backfill/test_utils.rs @@ -9,6 +9,7 @@ use reth_evm::execute::{ BatchExecutor, BlockExecutionInput, BlockExecutionOutput, BlockExecutorProvider, Executor, }; use reth_evm_ethereum::execute::EthExecutorProvider; +use reth_node_api::FullNodePrimitives; use reth_primitives::{ Block, BlockBody, BlockWithSenders, Receipt, SealedBlockWithSenders, Transaction, }; @@ -57,7 +58,7 @@ pub(crate) fn execute_block_and_commit_to_database( block: &BlockWithSenders, ) -> eyre::Result> where - N: ProviderNodeTypes, + N: ProviderNodeTypes>, { let provider = provider_factory.provider()?; @@ -161,7 +162,7 @@ pub(crate) fn blocks_and_execution_outputs( key_pair: Keypair, ) -> eyre::Result)>> where - N: ProviderNodeTypes, + N: ProviderNodeTypes>, { let (block1, block2) = blocks(chain_spec.clone(), key_pair)?; @@ -183,6 +184,7 @@ pub(crate) fn blocks_and_execution_outcome( ) -> eyre::Result<(Vec, ExecutionOutcome)> where N: ProviderNodeTypes, + N::Primitives: FullNodePrimitives, { let (block1, block2) = blocks(chain_spec.clone(), key_pair)?; diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index c8c06021e..e3d3a3c06 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -657,7 +657,7 @@ mod tests { use reth_primitives::SealedBlockWithSenders; use reth_provider::{ providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockReader, - BlockWriter, Chain, DatabaseProviderFactory, TransactionVariant, + BlockWriter, Chain, DatabaseProviderFactory, StorageLocation, TransactionVariant, }; use reth_testing_utils::generators::{self, random_block, BlockParams}; @@ -1238,7 +1238,7 @@ mod tests { .seal_with_senders() .unwrap(); let provider_rw = provider_factory.database_provider_rw().unwrap(); - provider_rw.insert_block(block.clone()).unwrap(); + provider_rw.insert_block(block.clone(), StorageLocation::Database).unwrap(); provider_rw.commit().unwrap(); let provider = BlockchainProvider2::new(provider_factory).unwrap(); diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index 14cfe9be4..baf504166 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -403,7 +403,7 @@ mod tests { use reth_primitives::Block; use reth_provider::{ providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockWriter, - Chain, DatabaseProviderFactory, + Chain, DatabaseProviderFactory, StorageLocation, }; use reth_testing_utils::generators::{self, random_block, BlockParams}; use tokio::sync::mpsc; @@ -431,6 +431,7 @@ mod tests { let provider_rw = provider_factory.provider_rw()?; provider_rw.insert_block( node_head_block.clone().seal_with_senders().ok_or_eyre("failed to recover senders")?, + StorageLocation::Database, )?; provider_rw.commit()?; @@ -574,7 +575,7 @@ mod tests { ..Default::default() }; let provider_rw = provider.database_provider_rw()?; - provider_rw.insert_block(node_head_block)?; + provider_rw.insert_block(node_head_block, StorageLocation::Database)?; provider_rw.commit()?; let node_head_notification = ExExNotification::ChainCommitted { new: Arc::new( diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs index 903b09803..225f2029c 100644 --- a/crates/node/builder/src/launch/common.rs +++ b/crates/node/builder/src/launch/common.rs @@ -12,7 +12,8 @@ use eyre::{Context, OptionExt}; use rayon::ThreadPoolBuilder; use reth_beacon_consensus::EthBeaconConsensus; use reth_blockchain_tree::{ - BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree, TreeExternals, + externals::TreeNodeTypes, BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree, + TreeExternals, }; use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks}; use reth_config::{config::EtlConfig, PruneConfig}; @@ -631,7 +632,7 @@ impl Attached::ChainSpec>, WithMeteredProviders>, > where - T: FullNodeTypes, + T: FullNodeTypes, { /// Returns access to the underlying database. pub const fn database(&self) -> &::DB { diff --git a/crates/stages/stages/src/stages/bodies.rs b/crates/stages/stages/src/stages/bodies.rs index 80185eade..b6eab349e 100644 --- a/crates/stages/stages/src/stages/bodies.rs +++ b/crates/stages/stages/src/stages/bodies.rs @@ -19,6 +19,7 @@ use reth_primitives::StaticFileSegment; use reth_provider::{ providers::{StaticFileProvider, StaticFileWriter}, BlockReader, BlockWriter, DBProvider, ProviderError, StaticFileProviderFactory, StatsReader, + StorageLocation, }; use reth_stages_api::{ EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, @@ -122,7 +123,7 @@ where let (from_block, to_block) = input.next_block_range().into_inner(); // Get id for the next tx_num of zero if there are no transactions. - let mut next_tx_num = provider + let next_tx_num = provider .tx_ref() .cursor_read::()? .last()? @@ -130,8 +131,6 @@ where .unwrap_or_default(); let static_file_provider = provider.static_file_provider(); - let mut static_file_producer = - static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?; // Make sure Transactions static file is at the same height. If it's further, this // input execution was interrupted previously and we need to unwind the static file. @@ -145,6 +144,8 @@ where // stage run. So, our only solution is to unwind the static files and proceed from the // database expected height. Ordering::Greater => { + let mut static_file_producer = + static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?; static_file_producer .prune_transactions(next_static_file_tx_num - next_tx_num, from_block - 1)?; // Since this is a database <-> static file inconsistency, we commit the change @@ -168,40 +169,16 @@ where let buffer = self.buffer.take().ok_or(StageError::MissingDownloadBuffer)?; trace!(target: "sync::stages::bodies", bodies_len = buffer.len(), "Writing blocks"); - let mut highest_block = from_block; + let highest_block = buffer.last().map(|r| r.block_number()).unwrap_or(from_block); - // Firstly, write transactions to static files - for response in &buffer { - let block_number = response.block_number(); - - // Increment block on static file header. - if block_number > 0 { - static_file_producer.increment_block(block_number)?; - } - - match response { - BlockResponse::Full(block) => { - // Write transactions - for transaction in block.body.transactions() { - static_file_producer.append_transaction(next_tx_num, transaction)?; - - // Increment transaction id for each transaction. - next_tx_num += 1; - } - } - BlockResponse::Empty(_) => {} - }; - - highest_block = block_number; - } - - // Write bodies to database. This will NOT write transactions to database as we've already - // written them directly to static files. + // Write bodies to database. provider.append_block_bodies( buffer .into_iter() .map(|response| (response.block_number(), response.into_body())) .collect(), + // We are writing transactions directly to static files. + StorageLocation::StaticFiles, )?; // The stage is "done" if: diff --git a/crates/stages/stages/src/stages/hashing_account.rs b/crates/stages/stages/src/stages/hashing_account.rs index 1ca0e1aa1..ecca1e071 100644 --- a/crates/stages/stages/src/stages/hashing_account.rs +++ b/crates/stages/stages/src/stages/hashing_account.rs @@ -61,7 +61,10 @@ impl AccountHashingStage { pub fn seed( provider: &reth_provider::DatabaseProvider, opts: SeedOpts, - ) -> Result, StageError> { + ) -> Result, StageError> + where + N::Primitives: reth_primitives_traits::FullNodePrimitives, + { use alloy_primitives::U256; use reth_db_api::models::AccountBeforeTx; use reth_provider::{StaticFileProviderFactory, StaticFileWriter}; diff --git a/crates/storage/db-api/src/models/mod.rs b/crates/storage/db-api/src/models/mod.rs index 7b1cd5a1d..5d1871192 100644 --- a/crates/storage/db-api/src/models/mod.rs +++ b/crates/storage/db-api/src/models/mod.rs @@ -8,7 +8,9 @@ use alloy_consensus::Header; use alloy_genesis::GenesisAccount; use alloy_primitives::{Address, Bytes, Log, B256, U256}; use reth_codecs::{add_arbitrary_tests, Compact}; -use reth_primitives::{Account, Bytecode, Receipt, StorageEntry, TransactionSignedNoHash, TxType}; +use reth_primitives::{ + Account, Bytecode, Receipt, StorageEntry, TransactionSigned, TransactionSignedNoHash, TxType, +}; use reth_prune_types::{PruneCheckpoint, PruneSegment}; use reth_stages_types::StageCheckpoint; use reth_trie_common::{StoredNibbles, StoredNibblesSubKey, *}; @@ -225,6 +227,7 @@ impl_compression_for_compact!( Bytecode, AccountBeforeTx, TransactionSignedNoHash, + TransactionSigned, CompactU256, StageCheckpoint, PruneCheckpoint, diff --git a/crates/storage/provider/src/providers/database/metrics.rs b/crates/storage/provider/src/providers/database/metrics.rs index 7e9ee7202..4ee8f1ce5 100644 --- a/crates/storage/provider/src/providers/database/metrics.rs +++ b/crates/storage/provider/src/providers/database/metrics.rs @@ -22,14 +22,6 @@ impl Default for DurationsRecorder { } impl DurationsRecorder { - /// Saves the provided duration for future logging and instantly reports as a metric with - /// `action` label. - pub(crate) fn record_duration(&mut self, action: Action, duration: Duration) { - self.actions.push((action, duration)); - self.current_metrics.record_duration(action, duration); - self.latest = Some(self.start.elapsed()); - } - /// Records the duration since last record, saves it for future logging and instantly reports as /// a metric with `action` label. pub(crate) fn record_relative(&mut self, action: Action) { @@ -56,11 +48,6 @@ pub(crate) enum Action { InsertHeaders, InsertHeaderNumbers, InsertHeaderTerminalDifficulties, - InsertBlockOmmers, - InsertTransactionSenders, - InsertTransactions, - InsertTransactionHashNumbers, - InsertBlockWithdrawals, InsertBlockBodyIndices, InsertTransactionBlocks, GetNextTxNum, @@ -95,16 +82,6 @@ struct DatabaseProviderMetrics { insert_header_numbers: Histogram, /// Duration of insert header TD insert_header_td: Histogram, - /// Duration of insert block ommers - insert_block_ommers: Histogram, - /// Duration of insert tx senders - insert_tx_senders: Histogram, - /// Duration of insert transactions - insert_transactions: Histogram, - /// Duration of insert transaction hash numbers - insert_tx_hash_numbers: Histogram, - /// Duration of insert block withdrawals - insert_block_withdrawals: Histogram, /// Duration of insert block body indices insert_block_body_indices: Histogram, /// Duration of insert transaction blocks @@ -131,11 +108,6 @@ impl DatabaseProviderMetrics { Action::InsertHeaders => self.insert_headers.record(duration), Action::InsertHeaderNumbers => self.insert_header_numbers.record(duration), Action::InsertHeaderTerminalDifficulties => self.insert_header_td.record(duration), - Action::InsertBlockOmmers => self.insert_block_ommers.record(duration), - Action::InsertTransactionSenders => self.insert_tx_senders.record(duration), - Action::InsertTransactions => self.insert_transactions.record(duration), - Action::InsertTransactionHashNumbers => self.insert_tx_hash_numbers.record(duration), - Action::InsertBlockWithdrawals => self.insert_block_withdrawals.record(duration), Action::InsertBlockBodyIndices => self.insert_block_body_indices.record(duration), Action::InsertTransactionBlocks => self.insert_tx_blocks.record(duration), Action::GetNextTxNum => self.get_next_tx_num.record(duration), diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index a64bb2578..cc50aa351 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -644,7 +644,7 @@ mod tests { providers::{StaticFileProvider, StaticFileWriter}, test_utils::{blocks::TEST_BLOCK, create_test_provider_factory, MockNodeTypesWithDB}, BlockHashReader, BlockNumReader, BlockWriter, DBProvider, HeaderSyncGapProvider, - TransactionsProvider, + StorageLocation, TransactionsProvider, }; use alloy_primitives::{TxNumber, B256, U256}; use assert_matches::assert_matches; @@ -715,7 +715,10 @@ mod tests { { let provider = factory.provider_rw().unwrap(); assert_matches!( - provider.insert_block(block.clone().try_seal_with_senders().unwrap()), + provider.insert_block( + block.clone().try_seal_with_senders().unwrap(), + StorageLocation::Database + ), Ok(_) ); assert_matches!( @@ -733,7 +736,10 @@ mod tests { }; let provider = factory.with_prune_modes(prune_modes).provider_rw().unwrap(); assert_matches!( - provider.insert_block(block.clone().try_seal_with_senders().unwrap(),), + provider.insert_block( + block.clone().try_seal_with_senders().unwrap(), + StorageLocation::Database + ), Ok(_) ); assert_matches!(provider.transaction_sender(0), Ok(None)); @@ -754,7 +760,10 @@ mod tests { let provider = factory.provider_rw().unwrap(); assert_matches!( - provider.insert_block(block.clone().try_seal_with_senders().unwrap()), + provider.insert_block( + block.clone().try_seal_with_senders().unwrap(), + StorageLocation::Database + ), Ok(_) ); diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 4690e2782..66bc4c053 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -17,8 +17,8 @@ use crate::{ LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RevertsInit, StageCheckpointReader, StateChangeWriter, StateProviderBox, StateReader, StateWriter, StaticFileProviderFactory, - StatsReader, StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider, - TransactionsProviderExt, TrieWriter, WithdrawalsProvider, + StatsReader, StorageLocation, StorageReader, StorageTrieWriter, TransactionVariant, + TransactionsProvider, TransactionsProviderExt, TrieWriter, WithdrawalsProvider, }; use alloy_consensus::Header; use alloy_eips::{ @@ -37,7 +37,7 @@ use reth_db_api::{ database::Database, models::{ sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress, - ShardedKey, StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals, + ShardedKey, StoredBlockBodyIndices, }, table::Table, transaction::{DbTx, DbTxMut}, @@ -52,7 +52,7 @@ use reth_primitives::{ SealedBlockWithSenders, SealedHeader, StaticFileSegment, StorageEntry, TransactionMeta, TransactionSigned, TransactionSignedEcRecovered, TransactionSignedNoHash, }; -use reth_primitives_traits::{BlockBody as _, FullNodePrimitives}; +use reth_primitives_traits::{BlockBody as _, FullNodePrimitives, SignedTransaction}; use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment}; use reth_stages_types::{StageCheckpoint, StageId}; use reth_storage_api::{StateProvider, StorageChangeSetReader, TryIntoHistoricalStateProvider}; @@ -73,10 +73,9 @@ use std::{ fmt::Debug, ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive}, sync::{mpsc, Arc}, - time::{Duration, Instant}, }; use tokio::sync::watch; -use tracing::{debug, error, trace, warn}; +use tracing::{debug, error, trace}; /// A [`DatabaseProvider`] that holds a read-only database transaction. pub type DatabaseProviderRO = DatabaseProvider<::TX, N>; @@ -292,7 +291,7 @@ impl DatabaseProvi /// Inserts an historical block. **Used for setting up test environments** pub fn insert_historical_block( &self, - block: SealedBlockWithSenders, + block: SealedBlockWithSenders::Body>, ) -> ProviderResult { let ttd = if block.number == 0 { block.difficulty @@ -316,7 +315,7 @@ impl DatabaseProvi writer.append_header(block.header.as_ref(), ttd, &block.hash())?; - self.insert_block(block) + self.insert_block(block, StorageLocation::Database) } } @@ -3137,7 +3136,8 @@ impl BlockWriter /// [`TransactionHashNumbers`](tables::TransactionHashNumbers). fn insert_block( &self, - block: SealedBlockWithSenders, + block: SealedBlockWithSenders, + write_transactions_to: StorageLocation, ) -> ProviderResult { let block_number = block.number; @@ -3166,15 +3166,6 @@ impl BlockWriter self.tx.put::(block_number, ttd.into())?; durations_recorder.record_relative(metrics::Action::InsertHeaderTerminalDifficulties); - // insert body ommers data - if !block.body.ommers.is_empty() { - self.tx.put::( - block_number, - StoredBlockOmmers { ommers: block.block.body.ommers }, - )?; - durations_recorder.record_relative(metrics::Action::InsertBlockOmmers); - } - let mut next_tx_num = self .tx .cursor_read::()? @@ -3184,84 +3175,28 @@ impl BlockWriter durations_recorder.record_relative(metrics::Action::GetNextTxNum); let first_tx_num = next_tx_num; - let tx_count = block.block.body.transactions.len() as u64; + let tx_count = block.block.body.transactions().len() as u64; // Ensures we have all the senders for the block's transactions. - let mut tx_senders_elapsed = Duration::default(); - let mut transactions_elapsed = Duration::default(); - let mut tx_hash_numbers_elapsed = Duration::default(); - for (transaction, sender) in - block.block.body.transactions.into_iter().zip(block.senders.iter()) + block.block.body.transactions().iter().zip(block.senders.iter()) { - let hash = transaction.hash(); + let hash = transaction.tx_hash(); - if self - .prune_modes - .sender_recovery - .as_ref() - .filter(|prune_mode| prune_mode.is_full()) - .is_none() - { - let start = Instant::now(); + if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) { self.tx.put::(next_tx_num, *sender)?; - tx_senders_elapsed += start.elapsed(); } - let start = Instant::now(); - self.tx.put::(next_tx_num, transaction.into())?; - let elapsed = start.elapsed(); - if elapsed > Duration::from_secs(1) { - warn!( - target: "providers::db", - ?block_number, - tx_num = %next_tx_num, - hash = %hash, - ?elapsed, - "Transaction insertion took too long" - ); - } - transactions_elapsed += elapsed; - - if self - .prune_modes - .transaction_lookup - .filter(|prune_mode| prune_mode.is_full()) - .is_none() - { - let start = Instant::now(); - self.tx.put::(hash, next_tx_num)?; - tx_hash_numbers_elapsed += start.elapsed(); + if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) { + self.tx.put::(*hash, next_tx_num)?; } next_tx_num += 1; } - durations_recorder - .record_duration(metrics::Action::InsertTransactionSenders, tx_senders_elapsed); - durations_recorder - .record_duration(metrics::Action::InsertTransactions, transactions_elapsed); - durations_recorder.record_duration( - metrics::Action::InsertTransactionHashNumbers, - tx_hash_numbers_elapsed, - ); - if let Some(withdrawals) = block.block.body.withdrawals { - if !withdrawals.is_empty() { - self.tx.put::( - block_number, - StoredBlockWithdrawals { withdrawals }, - )?; - durations_recorder.record_relative(metrics::Action::InsertBlockWithdrawals); - } - } - - let block_indices = StoredBlockBodyIndices { first_tx_num, tx_count }; - self.tx.put::(block_number, block_indices)?; - durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices); - - if !block_indices.is_empty() { - self.tx.put::(block_indices.last_tx_num(), block_number)?; - durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks); - } + self.append_block_bodies( + vec![(block_number, Some(block.block.body))], + write_transactions_to, + )?; debug!( target: "providers::db", @@ -3270,33 +3205,83 @@ impl BlockWriter "Inserted block" ); - Ok(block_indices) + Ok(StoredBlockBodyIndices { first_tx_num, tx_count }) } fn append_block_bodies( &self, bodies: Vec<(BlockNumber, Option)>, + write_transactions_to: StorageLocation, ) -> ProviderResult<()> { + let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) }; + + // Initialize writer if we will be writing transactions to staticfiles + let mut tx_static_writer = write_transactions_to + .static_files() + .then(|| { + self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions) + }) + .transpose()?; + let mut block_indices_cursor = self.tx.cursor_write::()?; let mut tx_block_cursor = self.tx.cursor_write::()?; + // Initialize cursor if we will be writing transactions to database + let mut tx_cursor = write_transactions_to + .database() + .then(|| { + self.tx.cursor_write::::Transaction, + >>() + }) + .transpose()?; + // Get id for the next tx_num of zero if there are no transactions. let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default(); for (block_number, body) in &bodies { + // Increment block on static file header. + if let Some(writer) = tx_static_writer.as_mut() { + writer.increment_block(*block_number)?; + } + let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default(); let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count }; + let mut durations_recorder = metrics::DurationsRecorder::default(); + // insert block meta block_indices_cursor.append(*block_number, block_indices)?; - next_tx_num += tx_count; + durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices); + let Some(body) = body else { continue }; // write transaction block index if !body.transactions().is_empty() { tx_block_cursor.append(block_indices.last_tx_num(), *block_number)?; + durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks); } + + // write transactions + for transaction in body.transactions() { + if let Some(writer) = tx_static_writer.as_mut() { + writer.append_transaction(next_tx_num, transaction)?; + } + if let Some(cursor) = tx_cursor.as_mut() { + cursor.append(next_tx_num, transaction.clone())?; + } + + // Increment transaction id for each transaction. + next_tx_num += 1; + } + + debug!( + target: "providers::db", + ?block_number, + actions = ?durations_recorder.actions, + "Inserted block body" + ); } self.storage.writer().write_block_bodies(self, bodies)?; @@ -3307,7 +3292,7 @@ impl BlockWriter /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually fn append_blocks_with_state( &self, - blocks: Vec, + blocks: Vec>, execution_outcome: ExecutionOutcome, hashed_state: HashedPostStateSorted, trie_updates: TrieUpdates, @@ -3326,7 +3311,7 @@ impl BlockWriter // Insert the blocks for block in blocks { - self.insert_block(block)?; + self.insert_block(block, StorageLocation::Database)?; durations_recorder.record_relative(metrics::Action::InsertBlock); } diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index d04924337..d53091790 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -20,6 +20,7 @@ use reth_blockchain_tree_api::{ }; use reth_chain_state::{ChainInfoTracker, ForkChoiceNotifications, ForkChoiceSubscriptions}; use reth_chainspec::{ChainInfo, EthereumHardforks}; +use reth_db::table::Value; use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices}; use reth_evm::ConfigureEvmEnv; use reth_node_types::{FullNodePrimitives, NodeTypes, NodeTypesWithDB}; @@ -75,7 +76,7 @@ where Self: NodeTypes< ChainSpec: EthereumHardforks, Storage: ChainStorage, - Primitives: FullNodePrimitives, + Primitives: FullNodePrimitives, >, { } @@ -84,7 +85,7 @@ impl NodeTypesForProvider for T where T: NodeTypes< ChainSpec: EthereumHardforks, Storage: ChainStorage, - Primitives: FullNodePrimitives, + Primitives: FullNodePrimitives, > { } diff --git a/crates/storage/provider/src/traits/block.rs b/crates/storage/provider/src/traits/block.rs index a0dae1783..c84534e7a 100644 --- a/crates/storage/provider/src/traits/block.rs +++ b/crates/storage/provider/src/traits/block.rs @@ -1,3 +1,4 @@ +use alloy_consensus::Header; use alloy_primitives::BlockNumber; use reth_db_api::models::StoredBlockBodyIndices; use reth_execution_types::{Chain, ExecutionOutcome}; @@ -6,6 +7,29 @@ use reth_storage_errors::provider::ProviderResult; use reth_trie::{updates::TrieUpdates, HashedPostStateSorted}; use std::ops::RangeInclusive; +/// An enum that represents the storage location for a piece of data. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum StorageLocation { + /// Write only to static files. + StaticFiles, + /// Write only to the database. + Database, + /// Write to both the database and static files. + Both, +} + +impl StorageLocation { + /// Returns true if the storage location includes static files. + pub const fn static_files(&self) -> bool { + matches!(self, Self::StaticFiles | Self::Both) + } + + /// Returns true if the storage location includes the database. + pub const fn database(&self) -> bool { + matches!(self, Self::Database | Self::Both) + } +} + /// BlockExecution Writer #[auto_impl::auto_impl(&, Arc, Box)] pub trait BlockExecutionWriter: BlockWriter + Send + Sync { @@ -40,8 +64,11 @@ pub trait BlockWriter: Send + Sync { /// /// Return [StoredBlockBodyIndices] that contains indices of the first and last transactions and /// transition in the block. - fn insert_block(&self, block: SealedBlockWithSenders) - -> ProviderResult; + fn insert_block( + &self, + block: SealedBlockWithSenders, + write_transactions_to: StorageLocation, + ) -> ProviderResult; /// Appends a batch of block bodies extending the canonical chain. This is invoked during /// `Bodies` stage and does not write to `TransactionHashNumbers` and `TransactionSenders` @@ -51,6 +78,7 @@ pub trait BlockWriter: Send + Sync { fn append_block_bodies( &self, bodies: Vec<(BlockNumber, Option)>, + write_transactions_to: StorageLocation, ) -> ProviderResult<()>; /// Appends a batch of sealed blocks to the blockchain, including sender information, and @@ -69,7 +97,7 @@ pub trait BlockWriter: Send + Sync { /// Returns `Ok(())` on success, or an error if any operation fails. fn append_blocks_with_state( &self, - blocks: Vec, + blocks: Vec>, execution_outcome: ExecutionOutcome, hashed_state: HashedPostStateSorted, trie_updates: TrieUpdates, diff --git a/crates/storage/provider/src/writer/mod.rs b/crates/storage/provider/src/writer/mod.rs index 17dea5a6d..3878cf2a9 100644 --- a/crates/storage/provider/src/writer/mod.rs +++ b/crates/storage/provider/src/writer/mod.rs @@ -2,7 +2,7 @@ use crate::{ providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter as SfWriter}, writer::static_file::StaticFileWriter, BlockExecutionWriter, BlockWriter, HistoryWriter, StateChangeWriter, StateWriter, - StaticFileProviderFactory, TrieWriter, + StaticFileProviderFactory, StorageLocation, TrieWriter, }; use alloy_consensus::Header; use alloy_primitives::{BlockNumber, B256, U256}; @@ -15,7 +15,7 @@ use reth_db::{ }; use reth_errors::{ProviderError, ProviderResult}; use reth_execution_types::ExecutionOutcome; -use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash}; +use reth_primitives::{BlockBody, SealedBlock, StaticFileSegment}; use reth_stages_types::{StageCheckpoint, StageId}; use reth_storage_api::{ DBProvider, HeaderProvider, ReceiptWriter, StageCheckpointWriter, TransactionsProviderExt, @@ -148,7 +148,7 @@ impl UnifiedStorageWriter<'_, (), ()> { impl UnifiedStorageWriter<'_, ProviderDB, &StaticFileProvider> where ProviderDB: DBProvider - + BlockWriter + + BlockWriter + TransactionsProviderExt + StateChangeWriter + TrieWriter @@ -195,7 +195,7 @@ where for block in blocks { let sealed_block = block.block().clone().try_with_senders_unchecked(block.senders().clone()).unwrap(); - self.database().insert_block(sealed_block)?; + self.database().insert_block(sealed_block, StorageLocation::Both)?; self.save_header_and_transactions(block.block.clone())?; // Write state and changesets to the database. @@ -246,25 +246,8 @@ where .save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block.number))?; } - { - let transactions_writer = - self.static_file().get_writer(block.number, StaticFileSegment::Transactions)?; - let mut storage_writer = - UnifiedStorageWriter::from(self.database(), transactions_writer); - let no_hash_transactions = block - .body - .transactions - .clone() - .into_iter() - .map(TransactionSignedNoHash::from) - .collect(); - storage_writer.append_transactions_from_blocks( - block.header().number, - std::iter::once(&no_hash_transactions), - )?; - self.database() - .save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block.number))?; - } + self.database() + .save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block.number))?; Ok(()) } @@ -378,56 +361,6 @@ where Ok(td) } - - /// Appends transactions to static files, using the - /// [`BlockBodyIndices`](tables::BlockBodyIndices) table to determine the transaction number - /// when appending to static files. - /// - /// NOTE: The static file writer used to construct this [`UnifiedStorageWriter`] MUST be a - /// writer for the Transactions segment. - pub fn append_transactions_from_blocks( - &mut self, - initial_block_number: BlockNumber, - transactions: impl Iterator, - ) -> ProviderResult<()> - where - T: Borrow>, - { - self.ensure_static_file_segment(StaticFileSegment::Transactions)?; - - let mut bodies_cursor = - self.database().tx_ref().cursor_read::()?; - - let mut last_tx_idx = None; - for (idx, transactions) in transactions.enumerate() { - let block_number = initial_block_number + idx as u64; - - let mut first_tx_index = - bodies_cursor.seek_exact(block_number)?.map(|(_, indices)| indices.first_tx_num()); - - // If there are no indices, that means there have been no transactions - // - // So instead of returning an error, use zero - if block_number == initial_block_number && first_tx_index.is_none() { - first_tx_index = Some(0); - } - - let mut tx_index = first_tx_index - .or(last_tx_idx) - .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))?; - - for tx in transactions.borrow() { - self.static_file_mut().append_transaction(tx_index, tx)?; - tx_index += 1; - } - - self.static_file_mut().increment_block(block_number)?; - - // update index - last_tx_idx = Some(tx_index); - } - Ok(()) - } } impl