feat: validate headers loaded from file on reth import (#14050)

Author: joshieDo
Date: 2025-01-28 20:05:37 +00:00
Committed by: GitHub
Parent: 9bc07cc5bd
Commit: 7db8e42dc1
4 changed files with 114 additions and 37 deletions
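The gist of the change: block headers decoded from an import file are no longer accepted blindly. Each decoded chunk now runs its headers through a HeaderValidator (standalone checks plus a check against the previous header), and the import loop re-reads the last persisted sealed header so the next chunk is validated against it. Below is a rough, illustrative sketch of that per-chunk flow, using only the reth_consensus and reth_primitives types that appear in the diff; the helper name validate_chunk_headers and its exact control flow are mine, not part of the commit.

// Illustrative sketch only (not part of the commit): how a chunk of decoded
// headers can be validated, mirroring the new FileClientBuilder logic below.
use std::sync::Arc;

use reth_consensus::{ConsensusError, HeaderValidator};
use reth_primitives::SealedHeader;

fn validate_chunk_headers<H>(
    consensus: &Arc<dyn HeaderValidator<H>>,
    mut parent: Option<SealedHeader<H>>,
    headers: impl IntoIterator<Item = SealedHeader<H>>,
) -> Result<Option<SealedHeader<H>>, ConsensusError> {
    for sealed in headers {
        // standalone checks on the header itself
        consensus.validate_header(&sealed)?;
        // continuity checks against the previously accepted header, if any
        if let Some(prev) = &parent {
            consensus.validate_header_against_parent(&sealed, prev)?;
        }
        // the last accepted header becomes the parent for the next iteration
        parent = Some(sealed);
    }
    // returned so the caller can seed validation of the next chunk
    Ok(parent)
}

In the commit itself this logic lives inside FileClientBuilder::build, and the parent header is threaded across chunks by the import commands through ChunkedFileReader::next_chunk, as shown in the hunks below.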

View File

@@ -87,7 +87,13 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
let mut total_decoded_blocks = 0;
let mut total_decoded_txns = 0;
while let Some(file_client) = reader.next_chunk::<FileClient<_>>().await? {
let mut sealed_header = provider_factory
.sealed_header(provider_factory.last_block_number()?)?
.expect("should have genesis");
while let Some(file_client) =
reader.next_chunk::<BlockTy<N>>(consensus.clone(), Some(sealed_header)).await?
{
// create a new FileClient from chunk read from file
info!(target: "reth::cli",
"Importing chain file chunk"
@@ -125,6 +131,10 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
res = pipeline.run() => res?,
_ = tokio::signal::ctrl_c() => {},
}
sealed_header = provider_factory
.sealed_header(provider_factory.last_block_number()?)?
.expect("should have genesis");
}
let provider = provider_factory.provider()?;

View File

@@ -1,4 +1,5 @@
use crate::{Consensus, ConsensusError, FullConsensus, HeaderValidator, PostExecutionInput};
use alloc::sync::Arc;
use alloy_primitives::U256;
use reth_primitives::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
use reth_primitives_traits::Block;
@@ -8,6 +9,13 @@ use reth_primitives_traits::Block;
#[non_exhaustive]
pub struct NoopConsensus;
impl NoopConsensus {
/// Creates an Arc instance of Self.
pub fn arc() -> Arc<Self> {
Arc::new(Self::default())
}
}
impl<H> HeaderValidator<H> for NoopConsensus {
fn validate_header(&self, _header: &SealedHeader<H>) -> Result<(), ConsensusError> {
Ok(())

View File

@@ -1,10 +1,11 @@
use std::{collections::HashMap, io, path::Path};
use std::{collections::HashMap, io, path::Path, sync::Arc};
use alloy_consensus::BlockHeader;
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
use futures::Future;
use itertools::Either;
use reth_consensus::{ConsensusError, HeaderValidator};
use reth_network_p2p::{
bodies::client::{BodiesClient, BodiesFut},
download::DownloadClient,
@@ -56,6 +57,10 @@ pub struct FileClient<B: Block = reth_primitives::Block> {
/// An error that can occur when constructing and using a [`FileClient`].
#[derive(Debug, Error)]
pub enum FileClientError {
/// An error occurred when validating a header from file.
#[error(transparent)]
Consensus(#[from] ConsensusError),
/// An error occurred when opening or reading the file.
#[error(transparent)]
Io(#[from] std::io::Error),
@@ -77,13 +82,19 @@ impl From<&'static str> for FileClientError {
impl<B: FullBlock> FileClient<B> {
/// Create a new file client from a file path.
pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self, FileClientError> {
pub async fn new<P: AsRef<Path>>(
path: P,
consensus: Arc<dyn HeaderValidator<B::Header>>,
) -> Result<Self, FileClientError> {
let file = File::open(path).await?;
Self::from_file(file).await
Self::from_file(file, consensus).await
}
/// Initialize the [`FileClient`] with a file directly.
pub(crate) async fn from_file(mut file: File) -> Result<Self, FileClientError> {
pub(crate) async fn from_file(
mut file: File,
consensus: Arc<dyn HeaderValidator<B::Header>>,
) -> Result<Self, FileClientError> {
// get file len from metadata before reading
let metadata = file.metadata().await?;
let file_len = metadata.len();
@@ -91,7 +102,10 @@ impl<B: FullBlock> FileClient<B> {
let mut reader = vec![];
file.read_to_end(&mut reader).await?;
Ok(Self::from_reader(&reader[..], file_len).await?.file_client)
Ok(FileClientBuilder { consensus, parent_header: None }
.build(&reader[..], file_len)
.await?
.file_client)
}
/// Get the tip hash of the chain.
@@ -183,14 +197,23 @@ impl<B: FullBlock> FileClient<B> {
}
}
impl<B: FullBlock> FromReader for FileClient<B> {
struct FileClientBuilder<B: Block = reth_primitives::Block> {
pub consensus: Arc<dyn HeaderValidator<B::Header>>,
pub parent_header: Option<SealedHeader<B::Header>>,
}
impl<B: FullBlock<Header: reth_primitives_traits::BlockHeader>> FromReader
for FileClientBuilder<B>
{
type Error = FileClientError;
type Output = FileClient<B>;
/// Initialize the [`FileClient`] from bytes that have been read from file.
fn from_reader<R>(
fn build<R>(
&self,
reader: R,
num_bytes: u64,
) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
where
R: AsyncReadExt + Unpin,
{
@@ -213,6 +236,8 @@ impl<B: FullBlock> FromReader for FileClient<B> {
let mut log_interval = 0;
let mut log_interval_start_block = 0;
let mut parent_header = self.parent_header.clone();
async move {
while let Some(block_res) = stream.next().await {
let block = match block_res {
@@ -231,6 +256,14 @@ impl<B: FullBlock> FromReader for FileClient<B> {
let block_number = block.header().number();
let block_hash = block.header().hash_slow();
// Validate incoming header
let sealed = SealedHeader::new(block.header().clone(), block_hash);
self.consensus.validate_header(&sealed)?;
if let Some(parent) = &parent_header {
self.consensus.validate_header_against_parent(&sealed, parent)?;
parent_header = Some(sealed);
}
// add to the internal maps
headers.insert(block.header().number(), block.header().clone());
hash_to_number.insert(block_hash, block.header().number());
@@ -255,7 +288,7 @@ impl<B: FullBlock> FromReader for FileClient<B> {
trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");
Ok(DecodedFileChunk {
file_client: Self { headers, hash_to_number, bodies },
file_client: FileClient { headers, hash_to_number, bodies },
remaining_bytes,
highest_block: None,
})
@@ -452,15 +485,18 @@ impl ChunkedFileReader {
}
/// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
pub async fn next_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
where
T: FromReader,
{
pub async fn next_chunk<B: FullBlock>(
&mut self,
consensus: Arc<dyn HeaderValidator<B::Header>>,
parent_header: Option<SealedHeader<B::Header>>,
) -> Result<Option<FileClient<B>>, FileClientError> {
let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
// make new file client from chunk
let DecodedFileChunk { file_client, remaining_bytes, .. } =
T::from_reader(&self.chunk[..], next_chunk_byte_len).await?;
FileClientBuilder { consensus, parent_header }
.build(&self.chunk[..], next_chunk_byte_len)
.await?;
// save left over bytes
self.chunk = remaining_bytes;
@@ -494,17 +530,21 @@ pub trait FromReader {
/// Error returned by file client type.
type Error: From<io::Error>;
/// Output returned by file client type.
type Output;
/// Returns a file client
fn from_reader<B>(
reader: B,
fn build<R>(
&self,
reader: R,
num_bytes: u64,
) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
) -> impl Future<Output = Result<DecodedFileChunk<Self::Output>, Self::Error>>
where
Self: Sized,
B: AsyncReadExt + Unpin;
R: AsyncReadExt + Unpin;
}
/// Output from decoding a file chunk with [`FromReader::from_reader`].
/// Output from decoding a file chunk with [`FromReader::build`].
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
/// File client, i.e. the decoded part of chunk.
@@ -530,11 +570,12 @@ mod tests {
use assert_matches::assert_matches;
use futures_util::stream::StreamExt;
use rand::Rng;
use reth_consensus::test_utils::TestConsensus;
use reth_consensus::{noop::NoopConsensus, test_utils::TestConsensus};
use reth_network_p2p::{
bodies::downloader::BodyDownloader,
headers::downloader::{HeaderDownloader, SyncTarget},
};
use reth_primitives::Block;
use reth_provider::test_utils::create_test_provider_factory;
use std::sync::Arc;
@@ -549,8 +590,12 @@
// create an empty file
let file = tempfile::tempfile().unwrap();
let client: Arc<FileClient> =
Arc::new(FileClient::from_file(file.into()).await.unwrap().with_bodies(bodies.clone()));
let client: Arc<FileClient> = Arc::new(
FileClient::from_file(file.into(), NoopConsensus::arc())
.await
.unwrap()
.with_bodies(bodies.clone()),
);
let mut downloader = BodiesDownloaderBuilder::default()
.build::<reth_primitives::Block, _, _>(
client.clone(),
@@ -576,12 +621,14 @@
let file = tempfile::tempfile().unwrap();
let client: Arc<FileClient> = Arc::new(
FileClient::from_file(file.into()).await.unwrap().with_headers(HashMap::from([
(0u64, p0.clone_header()),
(1, p1.clone_header()),
(2, p2.clone_header()),
(3, p3.clone_header()),
])),
FileClient::from_file(file.into(), NoopConsensus::arc()).await.unwrap().with_headers(
HashMap::from([
(0u64, p0.clone_header()),
(1, p1.clone_header()),
(2, p2.clone_header()),
(3, p3.clone_header()),
]),
),
);
let mut downloader = ReverseHeadersDownloaderBuilder::default()
@@ -604,7 +651,8 @@
// Generate some random blocks
let (file, headers, _) = generate_bodies_file(0..=19).await;
// now try to read them back
let client: Arc<FileClient> = Arc::new(FileClient::from_file(file).await.unwrap());
let client: Arc<FileClient> =
Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());
// construct headers downloader and use first header
let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
@@ -629,7 +677,8 @@
let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;
// now try to read them back
let client: Arc<FileClient> = Arc::new(FileClient::from_file(file).await.unwrap());
let client: Arc<FileClient> =
Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap());
// insert headers in db for the bodies downloader
insert_headers(factory.db_ref().db(), &headers);
@@ -668,7 +717,9 @@
let mut local_header = headers.first().unwrap().clone();
// test
while let Some(client) = reader.next_chunk::<FileClient>().await.unwrap() {
while let Some(client) =
reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
{
let sync_target = client.tip_header().unwrap();
let sync_target_hash = sync_target.hash();

View File

@@ -9,15 +9,13 @@ use reth_cli_commands::{
use reth_consensus::noop::NoopConsensus;
use reth_db::tables;
use reth_db_api::transaction::DbTx;
use reth_downloaders::file_client::{
ChunkedFileReader, FileClient, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE,
};
use reth_downloaders::file_client::{ChunkedFileReader, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE};
use reth_node_builder::BlockTy;
use reth_node_core::version::SHORT_VERSION;
use reth_optimism_chainspec::OpChainSpec;
use reth_optimism_evm::OpExecutorProvider;
use reth_optimism_primitives::{bedrock::is_dup_tx, OpPrimitives};
use reth_provider::{ChainSpecProvider, StageCheckpointReader};
use reth_provider::{BlockNumReader, ChainSpecProvider, HeaderProvider, StageCheckpointReader};
use reth_prune::PruneModes;
use reth_stages::StageId;
use reth_static_file::StaticFileProducer;
@@ -70,7 +68,13 @@ impl<C: ChainSpecParser<ChainSpec = OpChainSpec>> ImportOpCommand<C> {
let mut total_decoded_txns = 0;
let mut total_filtered_out_dup_txns = 0;
while let Some(mut file_client) = reader.next_chunk::<FileClient<BlockTy<N>>>().await? {
let mut sealed_header = provider_factory
.sealed_header(provider_factory.last_block_number()?)?
.expect("should have genesis");
while let Some(mut file_client) =
reader.next_chunk::<BlockTy<N>>(consensus.clone(), Some(sealed_header)).await?
{
// create a new FileClient from chunk read from file
info!(target: "reth::cli",
"Importing chain file chunk"
@@ -118,6 +122,10 @@ impl<C: ChainSpecParser<ChainSpec = OpChainSpec>> ImportOpCommand<C> {
res = pipeline.run() => res?,
_ = tokio::signal::ctrl_c() => {},
}
sealed_header = provider_factory
.sealed_header(provider_factory.last_block_number()?)?
.expect("should have genesis");
}
let provider = provider_factory.provider()?;