feat: integrate HeaderValidator + make FileClient generic over block (#12681)

This commit is contained in:
Arsenii Kulikov
2024-11-20 15:07:24 +04:00
committed by GitHub
parent 6977cf0453
commit 868f3acdbc
19 changed files with 143 additions and 89 deletions

View File

@ -20,6 +20,7 @@ use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use std::{
cmp::Ordering,
collections::BinaryHeap,
fmt::Debug,
mem,
ops::RangeInclusive,
pin::Pin,
@ -298,7 +299,7 @@ where
impl<B, Provider> BodyDownloader for BodiesDownloader<B, Provider>
where
B: BodiesClient<Body: InMemorySize> + 'static,
B: BodiesClient<Body: Debug + InMemorySize> + 'static,
Provider: HeaderProvider + Unpin + 'static,
{
type Body = B::Body;

View File

@ -8,6 +8,7 @@ use reth_network_p2p::{
};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use std::{
fmt::Debug,
future::Future,
ops::RangeInclusive,
pin::Pin,
@ -47,10 +48,10 @@ impl<B: Send + Sync + Unpin + 'static> TaskDownloader<B> {
/// use reth_network_p2p::bodies::client::BodiesClient;
/// use reth_primitives_traits::InMemorySize;
/// use reth_storage_api::HeaderProvider;
/// use std::sync::Arc;
/// use std::{fmt::Debug, sync::Arc};
///
/// fn t<
/// B: BodiesClient<Body: InMemorySize> + 'static,
/// B: BodiesClient<Body: Debug + InMemorySize> + 'static,
/// Provider: HeaderProvider + Unpin + 'static,
/// >(
/// client: Arc<B>,
@ -90,7 +91,7 @@ impl<B: Send + Sync + Unpin + 'static> TaskDownloader<B> {
}
}
impl<B: Send + Sync + Unpin + 'static> BodyDownloader for TaskDownloader<B> {
impl<B: Debug + Send + Sync + Unpin + 'static> BodyDownloader for TaskDownloader<B> {
type Body = B;
fn set_download_range(&mut self, range: RangeInclusive<BlockNumber>) -> DownloadResult<()> {

View File

@ -1,8 +1,8 @@
use std::{collections::HashMap, io, path::Path};
use alloy_consensus::Header;
use alloy_consensus::BlockHeader;
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{BlockHash, BlockNumber, B256};
use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
use futures::Future;
use itertools::Either;
use reth_network_p2p::{
@ -13,7 +13,8 @@ use reth_network_p2p::{
priority::Priority,
};
use reth_network_peers::PeerId;
use reth_primitives::{BlockBody, SealedHeader};
use reth_primitives::SealedHeader;
use reth_primitives_traits::{Block, BlockBody, FullBlock};
use thiserror::Error;
use tokio::{fs::File, io::AsyncReadExt};
use tokio_stream::StreamExt;
@ -40,15 +41,15 @@ pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000;
///
/// This reads the entire file into memory, so it is not suitable for large files.
#[derive(Debug)]
pub struct FileClient {
pub struct FileClient<B: Block = reth_primitives::Block> {
/// The buffered headers retrieved when fetching new bodies.
headers: HashMap<BlockNumber, Header>,
headers: HashMap<BlockNumber, B::Header>,
/// A mapping between block hash and number.
hash_to_number: HashMap<BlockHash, BlockNumber>,
/// The buffered bodies retrieved when fetching new headers.
bodies: HashMap<BlockHash, BlockBody>,
bodies: HashMap<BlockHash, B::Body>,
}
/// An error that can occur when constructing and using a [`FileClient`].
@ -73,7 +74,7 @@ impl From<&'static str> for FileClientError {
}
}
impl FileClient {
impl<B: FullBlock> FileClient<B> {
/// Create a new file client from a file path.
pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self, FileClientError> {
let file = File::open(path).await?;
@ -114,7 +115,7 @@ impl FileClient {
/// Clones and returns the highest header of this client has or `None` if empty. Seals header
/// before returning.
pub fn tip_header(&self) -> Option<SealedHeader> {
pub fn tip_header(&self) -> Option<SealedHeader<B::Header>> {
self.headers.get(&self.max_block()?).map(|h| SealedHeader::seal(h.clone()))
}
@ -137,13 +138,13 @@ impl FileClient {
}
/// Use the provided bodies as the file client's block body buffer.
pub fn with_bodies(mut self, bodies: HashMap<BlockHash, BlockBody>) -> Self {
pub fn with_bodies(mut self, bodies: HashMap<BlockHash, B::Body>) -> Self {
self.bodies = bodies;
self
}
/// Use the provided headers as the file client's block body buffer.
pub fn with_headers(mut self, headers: HashMap<BlockNumber, Header>) -> Self {
pub fn with_headers(mut self, headers: HashMap<BlockNumber, B::Header>) -> Self {
self.headers = headers;
for (number, header) in &self.headers {
self.hash_to_number.insert(header.hash_slow(), *number);
@ -162,14 +163,14 @@ impl FileClient {
}
/// Returns an iterator over headers in the client.
pub fn headers_iter(&self) -> impl Iterator<Item = &Header> {
pub fn headers_iter(&self) -> impl Iterator<Item = &B::Header> {
self.headers.values()
}
/// Returns a mutable iterator over bodies in the client.
///
/// Panics, if file client headers and bodies are not mapping 1-1.
pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut BlockBody)> {
pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut B::Body)> {
let bodies = &mut self.bodies;
let numbers = &self.hash_to_number;
bodies.iter_mut().map(|(hash, body)| (numbers[hash], body))
@ -177,27 +178,28 @@ impl FileClient {
/// Returns the current number of transactions in the client.
pub fn total_transactions(&self) -> usize {
self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions.len())
self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions().len())
}
}
impl FromReader for FileClient {
impl<B: FullBlock> FromReader for FileClient<B> {
type Error = FileClientError;
/// Initialize the [`FileClient`] from bytes that have been read from file.
fn from_reader<B>(
reader: B,
fn from_reader<R>(
reader: R,
num_bytes: u64,
) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
where
B: AsyncReadExt + Unpin,
R: AsyncReadExt + Unpin,
{
let mut headers = HashMap::default();
let mut hash_to_number = HashMap::default();
let mut bodies = HashMap::default();
// use with_capacity to make sure the internal buffer contains the entire chunk
let mut stream = FramedRead::with_capacity(reader, BlockFileCodec, num_bytes as usize);
let mut stream =
FramedRead::with_capacity(reader, BlockFileCodec::<B>::default(), num_bytes as usize);
trace!(target: "downloaders::file",
target_num_bytes=num_bytes,
@ -225,13 +227,13 @@ impl FromReader for FileClient {
}
Err(err) => return Err(err),
};
let block_number = block.header.number;
let block_hash = block.header.hash_slow();
let block_number = block.header().number();
let block_hash = block.header().hash_slow();
// add to the internal maps
headers.insert(block.header.number, block.header.clone());
hash_to_number.insert(block_hash, block.header.number);
bodies.insert(block_hash, block.into());
headers.insert(block.header().number(), block.header().clone());
hash_to_number.insert(block_hash, block.header().number());
bodies.insert(block_hash, block.body().clone());
if log_interval == 0 {
trace!(target: "downloaders::file",
@ -260,9 +262,9 @@ impl FromReader for FileClient {
}
}
impl HeadersClient for FileClient {
type Header = Header;
type Output = HeadersFut;
impl<B: FullBlock> HeadersClient for FileClient<B> {
type Header = B::Header;
type Output = HeadersFut<B::Header>;
fn get_headers_with_priority(
&self,
@ -311,9 +313,9 @@ impl HeadersClient for FileClient {
}
}
impl BodiesClient for FileClient {
type Body = BlockBody;
type Output = BodiesFut;
impl<B: FullBlock> BodiesClient for FileClient<B> {
type Body = B::Body;
type Output = BodiesFut<B::Body>;
fn get_block_bodies_with_priority(
&self,
@ -336,7 +338,7 @@ impl BodiesClient for FileClient {
}
}
impl DownloadClient for FileClient {
impl<B: FullBlock> DownloadClient for FileClient<B> {
fn report_bad_message(&self, _peer_id: PeerId) {
warn!("Reported a bad message on a file client, the file may be corrupted or invalid");
// noop
@ -542,7 +544,7 @@ mod tests {
// create an empty file
let file = tempfile::tempfile().unwrap();
let client =
let client: Arc<FileClient> =
Arc::new(FileClient::from_file(file.into()).await.unwrap().with_bodies(bodies.clone()));
let mut downloader = BodiesDownloaderBuilder::default().build(
client.clone(),
@ -567,14 +569,14 @@ mod tests {
let p0 = child_header(&p1);
let file = tempfile::tempfile().unwrap();
let client = Arc::new(FileClient::from_file(file.into()).await.unwrap().with_headers(
HashMap::from([
let client: Arc<FileClient> = Arc::new(
FileClient::from_file(file.into()).await.unwrap().with_headers(HashMap::from([
(0u64, p0.clone().unseal()),
(1, p1.clone().unseal()),
(2, p2.clone().unseal()),
(3, p3.clone().unseal()),
]),
));
])),
);
let mut downloader = ReverseHeadersDownloaderBuilder::default()
.stream_batch_size(3)
@ -596,7 +598,7 @@ mod tests {
// Generate some random blocks
let (file, headers, _) = generate_bodies_file(0..=19).await;
// now try to read them back
let client = Arc::new(FileClient::from_file(file).await.unwrap());
let client: Arc<FileClient> = Arc::new(FileClient::from_file(file).await.unwrap());
// construct headers downloader and use first header
let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
@ -621,7 +623,7 @@ mod tests {
let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;
// now try to read them back
let client = Arc::new(FileClient::from_file(file).await.unwrap());
let client: Arc<FileClient> = Arc::new(FileClient::from_file(file).await.unwrap());
// insert headers in db for the bodies downloader
insert_headers(factory.db_ref().db(), &headers);

View File

@ -3,7 +3,6 @@
use crate::file_client::FileClientError;
use alloy_primitives::bytes::{Buf, BytesMut};
use alloy_rlp::{Decodable, Encodable};
use reth_primitives::Block;
use tokio_util::codec::{Decoder, Encoder};
/// Codec for reading raw block bodies from a file.
@ -19,10 +18,16 @@ use tokio_util::codec::{Decoder, Encoder};
///
/// It's recommended to use [`with_capacity`](tokio_util::codec::FramedRead::with_capacity) to set
/// the capacity of the framed reader to the size of the file.
pub(crate) struct BlockFileCodec;
pub(crate) struct BlockFileCodec<B>(std::marker::PhantomData<B>);
impl Decoder for BlockFileCodec {
type Item = Block;
impl<B> Default for BlockFileCodec<B> {
fn default() -> Self {
Self(std::marker::PhantomData)
}
}
impl<B: Decodable> Decoder for BlockFileCodec<B> {
type Item = B;
type Error = FileClientError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
@ -31,18 +36,17 @@ impl Decoder for BlockFileCodec {
}
let buf_slice = &mut src.as_ref();
let body =
Block::decode(buf_slice).map_err(|err| FileClientError::Rlp(err, src.to_vec()))?;
let body = B::decode(buf_slice).map_err(|err| FileClientError::Rlp(err, src.to_vec()))?;
src.advance(src.len() - buf_slice.len());
Ok(Some(body))
}
}
impl Encoder<Block> for BlockFileCodec {
impl<B: Encodable> Encoder<B> for BlockFileCodec<B> {
type Error = FileClientError;
fn encode(&mut self, item: Block, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, item: B, dst: &mut BytesMut) -> Result<(), Self::Error> {
item.encode(dst);
Ok(())
}

View File

@ -9,7 +9,7 @@ use futures::{stream::Stream, FutureExt};
use futures_util::{stream::FuturesUnordered, StreamExt};
use rayon::prelude::*;
use reth_config::config::HeadersConfig;
use reth_consensus::{Consensus, HeaderValidator};
use reth_consensus::HeaderValidator;
use reth_network_p2p::{
error::{DownloadError, DownloadResult, PeerRequestResult},
headers::{
@ -68,7 +68,7 @@ impl<H> From<HeadersResponseError> for ReverseHeadersDownloaderError<H> {
#[derive(Debug)]
pub struct ReverseHeadersDownloader<H: HeadersClient> {
/// Consensus client used to validate headers
consensus: Arc<dyn Consensus<H::Header>>,
consensus: Arc<dyn HeaderValidator<H::Header>>,
/// Client used to download headers.
client: Arc<H>,
/// The local head of the chain.
@ -1165,7 +1165,7 @@ impl ReverseHeadersDownloaderBuilder {
pub fn build<H>(
self,
client: H,
consensus: Arc<dyn Consensus<H::Header>>,
consensus: Arc<dyn HeaderValidator<H::Header>>,
) -> ReverseHeadersDownloader<H>
where
H: HeadersClient + 'static,

View File

@ -8,6 +8,7 @@ use reth_network_p2p::headers::{
use reth_primitives::SealedHeader;
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use std::{
fmt::Debug,
future::Future,
pin::Pin,
task::{ready, Context, Poll},
@ -44,10 +45,10 @@ impl<H: Send + Sync + Unpin + 'static> TaskDownloader<H> {
/// # use std::sync::Arc;
/// # use reth_downloaders::headers::reverse_headers::ReverseHeadersDownloader;
/// # use reth_downloaders::headers::task::TaskDownloader;
/// # use reth_consensus::Consensus;
/// # use reth_consensus::HeaderValidator;
/// # use reth_network_p2p::headers::client::HeadersClient;
/// # use reth_primitives_traits::BlockHeader;
/// # fn t<H: HeadersClient<Header: BlockHeader> + 'static>(consensus:Arc<dyn Consensus<H::Header>>, client: Arc<H>) {
/// # fn t<H: HeadersClient<Header: BlockHeader> + 'static>(consensus:Arc<dyn HeaderValidator<H::Header>>, client: Arc<H>) {
/// let downloader = ReverseHeadersDownloader::<H>::builder().build(
/// client,
/// consensus
@ -82,7 +83,7 @@ impl<H: Send + Sync + Unpin + 'static> TaskDownloader<H> {
}
}
impl<H: Send + Sync + Unpin + 'static> HeaderDownloader for TaskDownloader<H> {
impl<H: Debug + Send + Sync + Unpin + 'static> HeaderDownloader for TaskDownloader<H> {
type Header = H;
fn update_sync_gap(&mut self, head: SealedHeader<H>, target: SyncTarget) {

View File

@ -43,7 +43,7 @@ pub(crate) async fn generate_bodies_file(
let raw_block_bodies = create_raw_bodies(headers.iter().cloned(), &mut bodies.clone());
let file: File = tempfile::tempfile().unwrap().into();
let mut writer = FramedWrite::new(file, BlockFileCodec);
let mut writer = FramedWrite::new(file, BlockFileCodec::default());
// rlp encode one after the other
for block in raw_block_bodies {