feat: add header AT to provider (#13030)

Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>

Author: Matthias Seitz
Date: 2024-12-02 14:24:48 +01:00 (committed by GitHub)
Parent: 519a10ae99
Commit: 332cce1f9b

71 changed files with 669 additions and 434 deletions

View File

@@ -75,7 +75,9 @@ impl<D: BodyDownloader> BodyStage<D> {
unwind_block: Option<u64>,
) -> Result<(), StageError>
where
Provider: DBProvider<Tx: DbTxMut> + BlockReader + StaticFileProviderFactory,
Provider: DBProvider<Tx: DbTxMut>
+ BlockReader<Header = reth_primitives::Header>
+ StaticFileProviderFactory,
{
// Get id for the next tx_num of zero if there are no transactions.
let next_tx_num = provider
@@ -152,7 +154,7 @@ where
Provider: DBProvider<Tx: DbTxMut>
+ StaticFileProviderFactory
+ StatsReader
+ BlockReader
+ BlockReader<Header = reth_primitives::Header>
+ BlockWriter<Block: Block<Body = D::Body>>,
D: BodyDownloader<Body: BlockBody<Transaction: Compact>>,
{
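
The recurring pattern in this commit is the associated-type equality bound: the stage no longer accepts any `BlockReader`, but one whose `Header` is pinned to `reth_primitives::Header`. A minimal, self-contained sketch of that pattern follows; `BlockReader`, `Header`, `InMemoryProvider`, and `gas_used_at` are simplified stand-ins invented for illustration, not reth's real definitions.

// Stand-in provider trait with a `Header` associated type; hypothetical, not
// reth's actual `BlockReader`.
trait BlockReader {
    type Header;
    fn header(&self, number: u64) -> Option<Self::Header>;
}

// Concrete header standing in for `reth_primitives::Header`.
#[derive(Clone, Debug, PartialEq)]
struct Header {
    number: u64,
    gas_used: u64,
}

struct InMemoryProvider {
    headers: Vec<Header>,
}

impl BlockReader for InMemoryProvider {
    type Header = Header;
    fn header(&self, number: u64) -> Option<Header> {
        self.headers.iter().find(|h| h.number == number).cloned()
    }
}

// The `Header = Header` equality bound is what lets otherwise-generic stage
// code read concrete fields, mirroring `BlockReader<Header = reth_primitives::Header>`.
fn gas_used_at<P>(provider: &P, number: u64) -> Option<u64>
where
    P: BlockReader<Header = Header>,
{
    provider.header(number).map(|h| h.gas_used)
}

fn main() {
    let provider = InMemoryProvider {
        headers: vec![
            Header { number: 0, gas_used: 0 },
            Header { number: 1, gas_used: 21_000 },
        ],
    };
    assert_eq!(gas_used_at(&provider, 1), Some(21_000));
}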

View File

@@ -193,7 +193,10 @@ where
unwind_to: Option<u64>,
) -> Result<(), StageError>
where
Provider: StaticFileProviderFactory + DBProvider + BlockReader + HeaderProvider,
Provider: StaticFileProviderFactory
+ DBProvider
+ BlockReader
+ HeaderProvider<Header = reth_primitives::Header>,
{
// If there's any receipts pruning configured, receipts are written directly to database and
// inconsistencies are expected.
@@ -265,8 +268,10 @@ impl<E, Provider> Stage<Provider> for ExecutionStage<E>
where
E: BlockExecutorProvider<Primitives: NodePrimitives<BlockHeader = alloy_consensus::Header>>,
Provider: DBProvider
+ BlockReader<Block = <E::Primitives as NodePrimitives>::Block>
+ StaticFileProviderFactory
+ BlockReader<
Block = <E::Primitives as NodePrimitives>::Block,
Header = <E::Primitives as NodePrimitives>::BlockHeader,
> + StaticFileProviderFactory
+ StatsReader
+ BlockHashReader
+ StateWriter<Receipt = <E::Primitives as NodePrimitives>::Receipt>
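
Here the bound goes a step further and equates two associated types: the provider's `Header` must be the executor's `<E::Primitives as NodePrimitives>::BlockHeader`. A rough sketch of that agreement between two generic components; all trait and type names below are simplified stand-ins, not reth's actual APIs.

// Simplified stand-ins; the names mirror the diff but the definitions are invented.
trait NodePrimitives {
    type BlockHeader;
}

trait BlockExecutorProvider {
    type Primitives: NodePrimitives;
}

trait BlockReader {
    type Header;
    fn latest_header(&self) -> Self::Header;
}

// The stage compiles only when the provider hands out exactly the header type
// the executor's primitives expect -- the same constraint the hunk above adds.
fn execute_latest<E, P>(
    _executor: &E,
    provider: &P,
) -> <E::Primitives as NodePrimitives>::BlockHeader
where
    E: BlockExecutorProvider,
    P: BlockReader<Header = <E::Primitives as NodePrimitives>::BlockHeader>,
{
    provider.latest_header()
}

// Concrete toy types wiring everything to one header type.
struct MyHeader(u64);
struct MyPrimitives;
impl NodePrimitives for MyPrimitives {
    type BlockHeader = MyHeader;
}
struct MyExecutor;
impl BlockExecutorProvider for MyExecutor {
    type Primitives = MyPrimitives;
}
struct MyProvider;
impl BlockReader for MyProvider {
    type Header = MyHeader;
    fn latest_header(&self) -> MyHeader {
        MyHeader(7)
    }
}

fn main() {
    let header = execute_latest(&MyExecutor, &MyProvider);
    assert_eq!(header.0, 7);
}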

View File

@@ -63,8 +63,10 @@ impl AccountHashingStage {
opts: SeedOpts,
) -> Result<Vec<(alloy_primitives::Address, reth_primitives::Account)>, StageError>
where
N::Primitives:
reth_primitives_traits::FullNodePrimitives<BlockBody = reth_primitives::BlockBody>,
N::Primitives: reth_primitives_traits::FullNodePrimitives<
BlockBody = reth_primitives::BlockBody,
BlockHeader = reth_primitives::Header,
>,
{
use alloy_primitives::U256;
use reth_db_api::models::AccountBeforeTx;
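
This hunk pins two associated types inside a single bound. A small sketch of listing multiple `Name = Type` bindings on one trait bound; the `FullNodePrimitives` trait and the types below are invented stand-ins, far smaller than reth's trait of the same name.

// Invented stand-in for illustration only.
trait FullNodePrimitives {
    type BlockHeader;
    type BlockBody;
}

struct Header;
struct BlockBody;

struct EthPrimitives;
impl FullNodePrimitives for EthPrimitives {
    type BlockHeader = Header;
    type BlockBody = BlockBody;
}

// Both associated types are pinned in one bound, mirroring the hunk above.
fn seed<N>() -> (N::BlockHeader, N::BlockBody)
where
    N: FullNodePrimitives<BlockHeader = Header, BlockBody = BlockBody>,
{
    (Header, BlockBody)
}

fn main() {
    let (_header, _body) = seed::<EthPrimitives>();
}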

View File

@@ -1,3 +1,4 @@
use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockHash, BlockNumber, Bytes, B256};
use futures_util::StreamExt;
use reth_config::config::EtlConfig;
@@ -10,7 +11,7 @@ use reth_db_api::{
};
use reth_etl::Collector;
use reth_network_p2p::headers::{downloader::HeaderDownloader, error::HeadersDownloaderError};
use reth_primitives::{SealedHeader, StaticFileSegment};
use reth_primitives::{NodePrimitives, SealedHeader, StaticFileSegment};
use reth_primitives_traits::serde_bincode_compat;
use reth_provider::{
providers::StaticFileWriter, BlockHashReader, DBProvider, HeaderProvider, HeaderSyncGap,
@@ -50,7 +51,7 @@ pub struct HeaderStage<Provider, Downloader: HeaderDownloader> {
/// Consensus client implementation
consensus: Arc<dyn HeaderValidator<Downloader::Header>>,
/// Current sync gap.
sync_gap: Option<HeaderSyncGap>,
sync_gap: Option<HeaderSyncGap<Downloader::Header>>,
/// ETL collector with `HeaderHash` -> `BlockNumber`
hash_collector: Collector<BlockHash, BlockNumber>,
/// ETL collector with `BlockNumber` -> `BincodeSealedHeader`
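
The `sync_gap` field now carries whatever header type the downloader produces. A compact sketch of parameterizing a struct field through another type parameter's associated type; `HeaderDownloader`, `HeaderSyncGap`, and `HeaderStage` below are simplified stand-ins for the real reth types.

// Simplified stand-ins for the real reth types.
trait HeaderDownloader {
    type Header;
}

// Generic over the header, like the real HeaderSyncGap after this change.
struct HeaderSyncGap<H> {
    local_head: H,
    target: H,
}

// The stage stores the gap using whatever header type its downloader produces.
struct HeaderStage<D: HeaderDownloader> {
    sync_gap: Option<HeaderSyncGap<D::Header>>,
}

struct MyHeader(u64);
struct MyDownloader;
impl HeaderDownloader for MyDownloader {
    type Header = MyHeader;
}

fn main() {
    let stage: HeaderStage<MyDownloader> = HeaderStage {
        sync_gap: Some(HeaderSyncGap { local_head: MyHeader(100), target: MyHeader(200) }),
    };
    let gap = stage.sync_gap.unwrap();
    assert_eq!((gap.local_head.0, gap.target.0), (100, 200));
}
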
@@ -63,7 +64,7 @@ pub struct HeaderStage<Provider, Downloader: HeaderDownloader>
impl<Provider, Downloader> HeaderStage<Provider, Downloader>
where
Downloader: HeaderDownloader<Header = alloy_consensus::Header>,
Downloader: HeaderDownloader,
{
/// Create a new header stage
pub fn new(
@@ -89,10 +90,14 @@ where
///
/// Writes to static files ( `Header | HeaderTD | HeaderHash` ) and [`tables::HeaderNumbers`]
/// database table.
fn write_headers<P: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory>(
&mut self,
provider: &P,
) -> Result<BlockNumber, StageError> {
fn write_headers<P>(&mut self, provider: &P) -> Result<BlockNumber, StageError>
where
P: DBProvider<Tx: DbTxMut>
+ StaticFileProviderFactory<
Primitives: NodePrimitives<BlockHeader = reth_primitives::Header>,
>,
Downloader: HeaderDownloader<Header = <P::Primitives as NodePrimitives>::BlockHeader>,
{
let total_headers = self.header_collector.len();
info!(target: "sync::stages::headers", total = total_headers, "Writing headers");
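
`write_headers` moves its bounds into a `where` clause and additionally constrains the outer `Downloader` parameter at the method level, so the method is only callable when the downloader's header matches the provider's. A sketch of a method-level `where` clause that re-constrains an outer generic parameter; the traits below are invented stand-ins, not reth's real `HeaderDownloader` or `StaticFileProviderFactory`.

// Stand-in traits for illustration.
trait HeaderDownloader {
    type Header;
}

trait StaticFileProviderFactory {
    type Header;
    fn write_header(&self, header: &Self::Header);
}

struct HeaderStage<D: HeaderDownloader> {
    buffered: Vec<D::Header>,
}

impl<D: HeaderDownloader> HeaderStage<D> {
    // The where clause constrains both the method's own parameter `P` and the
    // outer `D`: the method only exists when the two header types line up.
    fn write_headers<P>(&self, provider: &P) -> usize
    where
        P: StaticFileProviderFactory,
        D: HeaderDownloader<Header = P::Header>,
    {
        for header in &self.buffered {
            provider.write_header(header);
        }
        self.buffered.len()
    }
}

struct MyHeader;
struct MyDownloader;
impl HeaderDownloader for MyDownloader {
    type Header = MyHeader;
}
struct MyProvider;
impl StaticFileProviderFactory for MyProvider {
    type Header = MyHeader;
    fn write_header(&self, _header: &MyHeader) {}
}

fn main() {
    let stage: HeaderStage<MyDownloader> = HeaderStage { buffered: vec![MyHeader, MyHeader] };
    assert_eq!(stage.write_headers(&MyProvider), 2);
}
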
@@ -121,19 +126,19 @@ where
info!(target: "sync::stages::headers", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers");
}
let sealed_header: SealedHeader =
bincode::deserialize::<serde_bincode_compat::SealedHeader<'_>>(&header_buf)
let sealed_header: SealedHeader<Downloader::Header> =
bincode::deserialize::<serde_bincode_compat::SealedHeader<'_, _>>(&header_buf)
.map_err(|err| StageError::Fatal(Box::new(err)))?
.into();
let (header, header_hash) = sealed_header.split();
if header.number == 0 {
if header.number() == 0 {
continue
}
last_header_number = header.number;
last_header_number = header.number();
// Increase total difficulty
td += header.difficulty;
td += header.difficulty();
// Header validation
self.consensus.validate_header_with_total_difficulty(&header, td).map_err(|error| {
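
Because the header is now generic, direct field reads such as `header.number` become trait accessor calls like `header.number()`, supplied by `alloy_consensus::BlockHeader`. A toy equivalent of that accessor-trait style follows; the `BlockHeader` trait and `EthHeader` type here are invented and much smaller than the real trait.

// Toy accessor trait mirroring the role of alloy_consensus::BlockHeader
// (invented here; the real trait has many more methods and uses U256).
trait BlockHeader {
    fn number(&self) -> u64;
    fn difficulty(&self) -> u128;
}

struct EthHeader {
    number: u64,
    difficulty: u128,
}

impl BlockHeader for EthHeader {
    fn number(&self) -> u64 {
        self.number
    }
    fn difficulty(&self) -> u128 {
        self.difficulty
    }
}

// Generic code can no longer reach into concrete fields, so it sums total
// difficulty through the accessors -- the same shift as in the hunk above.
fn total_difficulty<H: BlockHeader>(headers: &[H]) -> u128 {
    headers.iter().map(|h| h.difficulty()).sum()
}

fn main() {
    let headers = vec![
        EthHeader { number: 0, difficulty: 17_171_480_576 },
        EthHeader { number: 1, difficulty: 17_163_096_064 },
    ];
    assert_eq!(total_difficulty(&headers), 34_334_576_640);
}
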
@@ -193,9 +198,10 @@ where
impl<Provider, P, D> Stage<Provider> for HeaderStage<P, D>
where
P: HeaderSyncGapProvider,
D: HeaderDownloader<Header = alloy_consensus::Header>,
Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory,
Provider::Primitives: NodePrimitives<BlockHeader = reth_primitives::Header>,
P: HeaderSyncGapProvider<Header = <Provider::Primitives as NodePrimitives>::BlockHeader>,
D: HeaderDownloader<Header = <Provider::Primitives as NodePrimitives>::BlockHeader>,
{
/// Return the id of the stage
fn id(&self) -> StageId {
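
The `Stage` impl now expresses its header requirements relative to the provider's primitives and ties the gap provider `P` and downloader `D` to the same projection. A condensed sketch of chaining several generics to one associated type; all trait names below are simplified stand-ins, not reth's definitions.

// Simplified stand-ins for the traits named in the hunk above.
trait NodePrimitives {
    type BlockHeader;
}
trait StaticFileProviderFactory {
    type Primitives: NodePrimitives;
}
trait HeaderSyncGapProvider {
    type Header;
}
trait HeaderDownloader {
    type Header;
}

// Every bound is expressed against Provider::Primitives::BlockHeader, so the
// three components are guaranteed to speak the same header type.
fn run_header_stage<Provider, P, D>(_provider: &Provider, _gap: &P, _downloader: &D)
where
    Provider: StaticFileProviderFactory,
    P: HeaderSyncGapProvider<Header = <Provider::Primitives as NodePrimitives>::BlockHeader>,
    D: HeaderDownloader<Header = <Provider::Primitives as NodePrimitives>::BlockHeader>,
{
    // Body elided; only the bounds matter for this sketch.
}

struct MyHeader;
struct MyPrimitives;
impl NodePrimitives for MyPrimitives {
    type BlockHeader = MyHeader;
}
struct MyProvider;
impl StaticFileProviderFactory for MyProvider {
    type Primitives = MyPrimitives;
}
struct MyGapProvider;
impl HeaderSyncGapProvider for MyGapProvider {
    type Header = MyHeader;
}
struct MyDownloader;
impl HeaderDownloader for MyDownloader {
    type Header = MyHeader;
}

fn main() {
    run_header_stage(&MyProvider, &MyGapProvider, &MyDownloader);
}
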
@@ -232,7 +238,7 @@ where
}
debug!(target: "sync::stages::headers", ?tip, head = ?gap.local_head.hash(), "Commencing sync");
let local_head_number = gap.local_head.number;
let local_head_number = gap.local_head.number();
// let the downloader know what to sync
self.downloader.update_sync_gap(gap.local_head, gap.target);
@@ -241,9 +247,9 @@ where
loop {
match ready!(self.downloader.poll_next_unpin(cx)) {
Some(Ok(headers)) => {
info!(target: "sync::stages::headers", total = headers.len(), from_block = headers.first().map(|h| h.number), to_block = headers.last().map(|h| h.number), "Received headers");
info!(target: "sync::stages::headers", total = headers.len(), from_block = headers.first().map(|h| h.number()), to_block = headers.last().map(|h| h.number()), "Received headers");
for header in headers {
let header_number = header.number;
let header_number = header.number();
self.hash_collector.insert(header.hash(), header_number)?;
self.header_collector.insert(

View File

@@ -1,3 +1,4 @@
use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockNumber, B256};
use reth_codecs::Compact;
use reth_consensus::ConsensusError;
@@ -135,7 +136,7 @@ where
Provider: DBProvider<Tx: DbTxMut>
+ TrieWriter
+ StatsReader
+ HeaderProvider
+ HeaderProvider<Header = alloy_consensus::Header>
+ StageCheckpointReader
+ StageCheckpointWriter,
{
@@ -168,7 +169,7 @@ where
let target_block = provider
.header_by_number(to_block)?
.ok_or_else(|| ProviderError::HeaderNotFound(to_block.into()))?;
let target_block_root = target_block.state_root;
let target_block_root = target_block.state_root();
let mut checkpoint = self.get_execution_checkpoint(provider)?;
let (trie_root, entities_checkpoint) = if range.is_empty() {
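
The merkle stage pins `HeaderProvider::Header` and reads the target block's state root through an accessor instead of a struct field. A small sketch of that lookup-then-accessor flow, using toy `HeaderProvider` and `BlockHeader` stand-ins rather than reth's real traits.

// Toy stand-ins; the real HeaderProvider and BlockHeader traits are richer.
type B256 = [u8; 32];

trait BlockHeader {
    fn state_root(&self) -> B256;
}

trait HeaderProvider {
    type Header: BlockHeader;
    fn header_by_number(&self, number: u64) -> Option<Self::Header>;
}

// Mirrors the hunk above: look the target block up, then read its state root
// through the trait instead of a concrete struct field.
fn target_state_root<P: HeaderProvider>(provider: &P, to_block: u64) -> Option<B256> {
    provider.header_by_number(to_block).map(|h| h.state_root())
}

struct MyHeader {
    state_root: B256,
}
impl BlockHeader for MyHeader {
    fn state_root(&self) -> B256 {
        self.state_root
    }
}
struct MyProvider;
impl HeaderProvider for MyProvider {
    type Header = MyHeader;
    fn header_by_number(&self, _number: u64) -> Option<MyHeader> {
        Some(MyHeader { state_root: [0u8; 32] })
    }
}

fn main() {
    assert_eq!(target_state_root(&MyProvider, 100), Some([0u8; 32]));
}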

View File

@@ -59,7 +59,7 @@ impl Default for SenderRecoveryStage
impl<Provider> Stage<Provider> for SenderRecoveryStage
where
Provider: DBProvider<Tx: DbTxMut>
+ BlockReader
+ BlockReader<Header = reth_primitives::Header>
+ StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
+ StatsReader
+ PruneCheckpointReader,
@@ -146,7 +146,8 @@ fn recover_range<Provider, CURSOR>(
senders_cursor: &mut CURSOR,
) -> Result<(), StageError>
where
Provider: DBProvider + HeaderProvider + StaticFileProviderFactory,
Provider:
DBProvider + HeaderProvider<Header = reth_primitives::Header> + StaticFileProviderFactory,
CURSOR: DbCursorRW<tables::TransactionSenders>,
{
debug!(target: "sync::stages::sender_recovery", ?tx_range, "Sending batch for processing");

View File

@@ -258,7 +258,7 @@ pub(crate) fn missing_static_data_error<Provider>(
segment: StaticFileSegment,
) -> Result<StageError, ProviderError>
where
Provider: BlockReader + StaticFileProviderFactory,
Provider: BlockReader<Header = reth_primitives::Header> + StaticFileProviderFactory,
{
let mut last_block =
static_file_provider.get_highest_static_file_block(segment).unwrap_or_default();