refactor: unify logic for writing headers (#12858)

This commit is contained in:
Arsenii Kulikov
2024-11-26 05:38:46 +04:00
committed by GitHub
parent b96c0d9897
commit 404f8f8778
3 changed files with 29 additions and 87 deletions

View File

@ -2749,22 +2749,12 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
fn insert_block(
&self,
block: SealedBlockWithSenders<Self::Block>,
write_transactions_to: StorageLocation,
write_to: StorageLocation,
) -> ProviderResult<StoredBlockBodyIndices> {
let block_number = block.number;
let mut durations_recorder = metrics::DurationsRecorder::default();
self.tx.put::<tables::CanonicalHeaders>(block_number, block.hash())?;
durations_recorder.record_relative(metrics::Action::InsertCanonicalHeaders);
// Put header with canonical hashes.
self.tx.put::<tables::Headers>(block_number, block.header.as_ref().clone())?;
durations_recorder.record_relative(metrics::Action::InsertHeaders);
self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
// total difficulty
let ttd = if block_number == 0 {
block.difficulty
@ -2775,8 +2765,26 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
parent_ttd + block.difficulty
};
self.tx.put::<tables::HeaderTerminalDifficulties>(block_number, ttd.into())?;
durations_recorder.record_relative(metrics::Action::InsertHeaderTerminalDifficulties);
if write_to.database() {
self.tx.put::<tables::CanonicalHeaders>(block_number, block.hash())?;
durations_recorder.record_relative(metrics::Action::InsertCanonicalHeaders);
// Put header with canonical hashes.
self.tx.put::<tables::Headers>(block_number, block.header.as_ref().clone())?;
durations_recorder.record_relative(metrics::Action::InsertHeaders);
self.tx.put::<tables::HeaderTerminalDifficulties>(block_number, ttd.into())?;
durations_recorder.record_relative(metrics::Action::InsertHeaderTerminalDifficulties);
}
if write_to.static_files() {
let mut writer =
self.static_file_provider.get_writer(block_number, StaticFileSegment::Headers)?;
writer.append_header(&block.header, ttd, &block.hash())?;
}
self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
let mut next_tx_num = self
.tx
@ -2805,10 +2813,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
next_tx_num += 1;
}
self.append_block_bodies(
vec![(block_number, Some(block.block.body))],
write_transactions_to,
)?;
self.append_block_bodies(vec![(block_number, Some(block.block.body))], write_to)?;
debug!(
target: "providers::db",

View File

@ -68,10 +68,13 @@ pub trait BlockWriter: Send + Sync {
///
/// Return [StoredBlockBodyIndices] that contains indices of the first and last transactions and
/// transition in the block.
///
/// Accepts [`StorageLocation`] value which specifies where transactions and headers should be
/// written.
fn insert_block(
&self,
block: SealedBlockWithSenders<Self::Block>,
write_transactions_to: StorageLocation,
write_to: StorageLocation,
) -> ProviderResult<StoredBlockBodyIndices>;
/// Appends a batch of block bodies extending the canonical chain. This is invoked during

View File

@ -4,26 +4,22 @@ use crate::{
BlockExecutionWriter, BlockWriter, HistoryWriter, StateChangeWriter, StateWriter,
StaticFileProviderFactory, StorageLocation, TrieWriter,
};
use alloy_consensus::Header;
use alloy_primitives::{BlockNumber, B256, U256};
use alloy_primitives::BlockNumber;
use reth_chain_state::ExecutedBlock;
use reth_db::{
cursor::DbCursorRO,
models::CompactU256,
tables,
transaction::{DbTx, DbTxMut},
};
use reth_errors::{ProviderError, ProviderResult};
use reth_execution_types::ExecutionOutcome;
use reth_primitives::{SealedBlock, StaticFileSegment};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_primitives::StaticFileSegment;
use reth_storage_api::{
DBProvider, HeaderProvider, ReceiptWriter, StageCheckpointWriter, TransactionsProviderExt,
};
use reth_storage_errors::writer::UnifiedStorageWriterError;
use revm::db::OriginalValuesKnown;
use std::{borrow::Borrow, sync::Arc};
use tracing::{debug, instrument};
use tracing::debug;
mod database;
mod static_file;
@ -196,7 +192,6 @@ where
let sealed_block =
block.block().clone().try_with_senders_unchecked(block.senders().clone()).unwrap();
self.database().insert_block(sealed_block, StorageLocation::Both)?;
self.save_header_and_transactions(block.block.clone())?;
// Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup.
@ -223,35 +218,6 @@ where
Ok(())
}
/// Writes the header & transactions to static files, and updates their respective checkpoints
/// on database.
#[instrument(level = "trace", skip_all, fields(block = ?block.num_hash()) target = "storage")]
fn save_header_and_transactions(&self, block: Arc<SealedBlock>) -> ProviderResult<()> {
debug!(target: "provider::storage_writer", "Writing headers and transactions.");
{
let header_writer =
self.static_file().get_writer(block.number, StaticFileSegment::Headers)?;
let mut storage_writer = UnifiedStorageWriter::from(self.database(), header_writer);
let td = storage_writer.append_headers_from_blocks(
block.header().number,
std::iter::once(&(block.header(), block.hash())),
)?;
debug!(target: "provider::storage_writer", block_num=block.number, "Updating transaction metadata after writing");
self.database()
.tx_ref()
.put::<tables::HeaderTerminalDifficulties>(block.number, CompactU256(td))?;
self.database()
.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block.number))?;
}
self.database()
.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block.number))?;
Ok(())
}
/// Removes all block, transaction and receipt data above the given block number from the
/// database and static files. This is exclusive, i.e., it only removes blocks above
/// `block_number`, and does not remove `block_number`.
@ -323,38 +289,6 @@ where
None => Err(UnifiedStorageWriterError::MissingStaticFileWriter),
}
}
/// Appends headers to static files, using the
/// [`HeaderTerminalDifficulties`](tables::HeaderTerminalDifficulties) table to determine the
/// total difficulty of the parent block during header insertion.
///
/// NOTE: The static file writer used to construct this [`UnifiedStorageWriter`] MUST be a
/// writer for the Headers segment.
pub fn append_headers_from_blocks<H, I>(
&mut self,
initial_block_number: BlockNumber,
headers: impl Iterator<Item = I>,
) -> ProviderResult<U256>
where
I: Borrow<(H, B256)>,
H: Borrow<Header>,
{
self.ensure_static_file_segment(StaticFileSegment::Headers)?;
let mut td = self
.database()
.header_td_by_number(initial_block_number)?
.ok_or(ProviderError::TotalDifficultyNotFound(initial_block_number))?;
for pair in headers {
let (header, hash) = pair.borrow();
let header = header.borrow();
td += header.difficulty;
self.static_file_mut().append_header(header, td, hash)?;
}
Ok(td)
}
}
impl<ProviderDB>