diff --git a/crates/net/p2p/src/bodies/response.rs b/crates/net/p2p/src/bodies/response.rs index 153d7d39d..11aaab17a 100644 --- a/crates/net/p2p/src/bodies/response.rs +++ b/crates/net/p2p/src/bodies/response.rs @@ -32,6 +32,14 @@ impl BlockResponse { Self::Empty(header) => header.difficulty, } } + + /// Return the reference to the response body + pub fn into_body(self) -> Option { + match self { + Self::Full(block) => Some(block.body), + Self::Empty(_) => None, + } + } } impl InMemorySize for BlockResponse { diff --git a/crates/stages/stages/src/stages/bodies.rs b/crates/stages/stages/src/stages/bodies.rs index eae61b088..c4676b272 100644 --- a/crates/stages/stages/src/stages/bodies.rs +++ b/crates/stages/stages/src/stages/bodies.rs @@ -7,17 +7,16 @@ use futures_util::TryStreamExt; use tracing::*; use alloy_primitives::TxNumber; -use reth_db::tables; +use reth_db::{tables, transaction::DbTx}; use reth_db_api::{ cursor::{DbCursorRO, DbCursorRW}, - models::{StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals}, transaction::DbTxMut, }; use reth_network_p2p::bodies::{downloader::BodyDownloader, response::BlockResponse}; use reth_primitives::StaticFileSegment; use reth_provider::{ providers::{StaticFileProvider, StaticFileWriter}, - BlockReader, DBProvider, ProviderError, StaticFileProviderFactory, StatsReader, + BlockReader, BlockWriter, DBProvider, ProviderError, StaticFileProviderFactory, StatsReader, }; use reth_stages_api::{ EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, @@ -72,7 +71,11 @@ impl BodyStage { impl Stage for BodyStage where - Provider: DBProvider + StaticFileProviderFactory + StatsReader + BlockReader, + Provider: DBProvider + + StaticFileProviderFactory + + StatsReader + + BlockReader + + BlockWriter, D: BodyDownloader, { /// Return the id of the stage @@ -116,15 +119,13 @@ where } let (from_block, to_block) = input.next_block_range().into_inner(); - // Cursors used to write bodies, ommers and transactions - let tx = provider.tx_ref(); - let mut block_indices_cursor = tx.cursor_write::()?; - let mut tx_block_cursor = tx.cursor_write::()?; - let mut ommers_cursor = tx.cursor_write::()?; - let mut withdrawals_cursor = tx.cursor_write::()?; - // Get id for the next tx_num of zero if there are no transactions. - let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default(); + let mut next_tx_num = provider + .tx_ref() + .cursor_read::()? + .last()? + .map(|(id, _)| id + 1) + .unwrap_or_default(); let static_file_provider = provider.static_file_provider(); let mut static_file_producer = @@ -166,17 +167,10 @@ where let buffer = self.buffer.take().ok_or(StageError::MissingDownloadBuffer)?; trace!(target: "sync::stages::bodies", bodies_len = buffer.len(), "Writing blocks"); let mut highest_block = from_block; - for response in buffer { - // Write block - let block_number = response.block_number(); - let block_indices = StoredBlockBodyIndices { - first_tx_num: next_tx_num, - tx_count: match &response { - BlockResponse::Full(block) => block.body.transactions.len() as u64, - BlockResponse::Empty(_) => 0, - }, - }; + // Firstly, write transactions to static files + for response in &buffer { + let block_number = response.block_number(); // Increment block on static file header. if block_number > 0 { @@ -195,15 +189,10 @@ where match response { BlockResponse::Full(block) => { - // write transaction block index - if !block.body.transactions.is_empty() { - tx_block_cursor.append(block_indices.last_tx_num(), block.number)?; - } - // Write transactions - for transaction in block.body.transactions { + for transaction in &block.body.transactions { let appended_tx_number = static_file_producer - .append_transaction(next_tx_num, &transaction.into())?; + .append_transaction(next_tx_num, &transaction.clone().into())?; if appended_tx_number != next_tx_num { // This scenario indicates a critical error in the logic of adding new @@ -218,32 +207,19 @@ where // Increment transaction id for each transaction. next_tx_num += 1; } - - // Write ommers if any - if !block.body.ommers.is_empty() { - ommers_cursor.append( - block_number, - StoredBlockOmmers { ommers: block.body.ommers }, - )?; - } - - // Write withdrawals if any - if let Some(withdrawals) = block.body.withdrawals { - if !withdrawals.is_empty() { - withdrawals_cursor - .append(block_number, StoredBlockWithdrawals { withdrawals })?; - } - } } BlockResponse::Empty(_) => {} }; - // insert block meta - block_indices_cursor.append(block_number, block_indices)?; - highest_block = block_number; } + // Write bodies to database. This will NOT write transactions to database as we've already + // written them directly to static files. + provider.append_block_bodies( + buffer.into_iter().map(|response| (response.block_number(), response.into_body())), + )?; + // The stage is "done" if: // - We got fewer blocks than our target // - We reached our target and the target was not limited by the batch size of the stage diff --git a/crates/storage/db-models/src/blocks.rs b/crates/storage/db-models/src/blocks.rs index ed1d7fb67..be7661c8b 100644 --- a/crates/storage/db-models/src/blocks.rs +++ b/crates/storage/db-models/src/blocks.rs @@ -12,7 +12,7 @@ pub type NumTransactions = u64; /// /// It has the pointer to the transaction Number of the first /// transaction in the block and the total number of transactions. -#[derive(Debug, Default, Eq, PartialEq, Clone, Serialize, Deserialize, Compact)] +#[derive(Debug, Default, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Compact)] #[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))] #[add_arbitrary_tests(compact)] pub struct StoredBlockBodyIndices { diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index eef0ff5b6..20d01932a 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -3250,7 +3250,7 @@ impl + } let block_indices = StoredBlockBodyIndices { first_tx_num, tx_count }; - self.tx.put::(block_number, block_indices.clone())?; + self.tx.put::(block_number, block_indices)?; durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices); if !block_indices.is_empty() { @@ -3268,6 +3268,50 @@ impl + Ok(block_indices) } + fn append_block_bodies( + &self, + bodies: impl Iterator)>, + ) -> ProviderResult<()> { + let mut block_indices_cursor = self.tx.cursor_write::()?; + let mut tx_block_cursor = self.tx.cursor_write::()?; + let mut ommers_cursor = self.tx.cursor_write::()?; + let mut withdrawals_cursor = self.tx.cursor_write::()?; + + // Get id for the next tx_num of zero if there are no transactions. + let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default(); + + for (block_number, body) in bodies { + let tx_count = body.as_ref().map(|b| b.transactions.len() as u64).unwrap_or_default(); + let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count }; + + // insert block meta + block_indices_cursor.append(block_number, block_indices)?; + + next_tx_num += tx_count; + let Some(body) = body else { continue }; + + // write transaction block index + if !body.transactions.is_empty() { + tx_block_cursor.append(block_indices.last_tx_num(), block_number)?; + } + + // Write ommers if any + if !body.ommers.is_empty() { + ommers_cursor.append(block_number, StoredBlockOmmers { ommers: body.ommers })?; + } + + // Write withdrawals if any + if let Some(withdrawals) = body.withdrawals { + if !withdrawals.is_empty() { + withdrawals_cursor + .append(block_number, StoredBlockWithdrawals { withdrawals })?; + } + } + } + + Ok(()) + } + /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually fn append_blocks_with_state( &self, diff --git a/crates/storage/provider/src/traits/block.rs b/crates/storage/provider/src/traits/block.rs index 7202c405f..5cb60c2f4 100644 --- a/crates/storage/provider/src/traits/block.rs +++ b/crates/storage/provider/src/traits/block.rs @@ -1,7 +1,7 @@ use alloy_primitives::BlockNumber; use reth_db_api::models::StoredBlockBodyIndices; use reth_execution_types::{Chain, ExecutionOutcome}; -use reth_primitives::SealedBlockWithSenders; +use reth_primitives::{BlockBody, SealedBlockWithSenders}; use reth_storage_errors::provider::ProviderResult; use reth_trie::{updates::TrieUpdates, HashedPostStateSorted}; use std::ops::RangeInclusive; @@ -40,6 +40,16 @@ pub trait BlockWriter: Send + Sync { fn insert_block(&self, block: SealedBlockWithSenders) -> ProviderResult; + /// Appends a batch of block bodies extending the canonical chain. This is invoked during + /// `Bodies` stage and does not write to `TransactionHashNumbers` and `TransactionSenders` + /// tables which are populated on later stages. + /// + /// Bodies are passed as [`Option`]s, if body is `None` the corresponding block is empty. + fn append_block_bodies( + &self, + bodies: impl Iterator)>, + ) -> ProviderResult<()>; + /// Appends a batch of sealed blocks to the blockchain, including sender information, and /// updates the post-state. ///