feat: make BlockResponse generic over header (#13195)

This commit is contained in:
Dan Cline
2024-12-06 16:35:51 -05:00
committed by GitHub
parent e9915702fa
commit 552c6237a8
8 changed files with 33 additions and 23 deletions

View File

@ -61,7 +61,7 @@ pub struct BodiesDownloader<B: BodiesClient, Provider> {
/// Buffered responses /// Buffered responses
buffered_responses: BinaryHeap<OrderedBodiesResponse<B::Body>>, buffered_responses: BinaryHeap<OrderedBodiesResponse<B::Body>>,
/// Queued body responses that can be returned for insertion into the database. /// Queued body responses that can be returned for insertion into the database.
queued_bodies: Vec<BlockResponse<B::Body>>, queued_bodies: Vec<BlockResponse<alloy_consensus::Header, B::Body>>,
/// The bodies downloader metrics. /// The bodies downloader metrics.
metrics: BodyDownloaderMetrics, metrics: BodyDownloaderMetrics,
} }
@ -193,7 +193,7 @@ where
} }
/// Queues bodies and sets the latest queued block number /// Queues bodies and sets the latest queued block number
fn queue_bodies(&mut self, bodies: Vec<BlockResponse<B::Body>>) { fn queue_bodies(&mut self, bodies: Vec<BlockResponse<alloy_consensus::Header, B::Body>>) {
self.latest_queued_block_number = Some(bodies.last().expect("is not empty").block_number()); self.latest_queued_block_number = Some(bodies.last().expect("is not empty").block_number());
self.queued_bodies.extend(bodies); self.queued_bodies.extend(bodies);
self.metrics.queued_blocks.set(self.queued_bodies.len() as f64); self.metrics.queued_blocks.set(self.queued_bodies.len() as f64);
@ -210,7 +210,10 @@ where
} }
/// Adds a new response to the internal buffer /// Adds a new response to the internal buffer
fn buffer_bodies_response(&mut self, response: Vec<BlockResponse<B::Body>>) { fn buffer_bodies_response(
&mut self,
response: Vec<BlockResponse<alloy_consensus::Header, B::Body>>,
) {
// take into account capacity // take into account capacity
let size = response.iter().map(BlockResponse::size).sum::<usize>() + let size = response.iter().map(BlockResponse::size).sum::<usize>() +
response.capacity() * mem::size_of::<BlockResponse<B::Body>>(); response.capacity() * mem::size_of::<BlockResponse<B::Body>>();
@ -227,7 +230,9 @@ where
} }
/// Returns a response if it's first block number matches the next expected. /// Returns a response if it's first block number matches the next expected.
fn try_next_buffered(&mut self) -> Option<Vec<BlockResponse<B::Body>>> { fn try_next_buffered(
&mut self,
) -> Option<Vec<BlockResponse<alloy_consensus::Header, B::Body>>> {
if let Some(next) = self.buffered_responses.peek() { if let Some(next) = self.buffered_responses.peek() {
let expected = self.next_expected_block_number(); let expected = self.next_expected_block_number();
let next_block_range = next.block_range(); let next_block_range = next.block_range();
@ -253,7 +258,9 @@ where
/// Returns the next batch of block bodies that can be returned if we have enough buffered /// Returns the next batch of block bodies that can be returned if we have enough buffered
/// bodies /// bodies
fn try_split_next_batch(&mut self) -> Option<Vec<BlockResponse<B::Body>>> { fn try_split_next_batch(
&mut self,
) -> Option<Vec<BlockResponse<alloy_consensus::Header, B::Body>>> {
if self.queued_bodies.len() >= self.stream_batch_size { if self.queued_bodies.len() >= self.stream_batch_size {
let next_batch = self.queued_bodies.drain(..self.stream_batch_size).collect::<Vec<_>>(); let next_batch = self.queued_bodies.drain(..self.stream_batch_size).collect::<Vec<_>>();
self.queued_bodies.shrink_to_fit(); self.queued_bodies.shrink_to_fit();
@ -436,7 +443,7 @@ where
#[derive(Debug)] #[derive(Debug)]
struct OrderedBodiesResponse<B> { struct OrderedBodiesResponse<B> {
resp: Vec<BlockResponse<B>>, resp: Vec<BlockResponse<alloy_consensus::Header, B>>,
/// The total size of the response in bytes /// The total size of the response in bytes
size: usize, size: usize,
} }

View File

@ -21,7 +21,7 @@ impl BodyDownloader for NoopBodiesDownloader {
} }
impl Stream for NoopBodiesDownloader { impl Stream for NoopBodiesDownloader {
type Item = Result<Vec<BlockResponse<BlockBody>>, DownloadError>; type Item = Result<Vec<BlockResponse<alloy_consensus::Header, BlockBody>>, DownloadError>;
fn poll_next( fn poll_next(
self: std::pin::Pin<&mut Self>, self: std::pin::Pin<&mut Self>,

View File

@ -80,7 +80,7 @@ impl<B> Stream for BodiesRequestQueue<B>
where where
B: BodiesClient<Body: InMemorySize> + 'static, B: BodiesClient<Body: InMemorySize> + 'static,
{ {
type Item = DownloadResult<Vec<BlockResponse<B::Body>>>; type Item = DownloadResult<Vec<BlockResponse<alloy_consensus::Header, B::Body>>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.get_mut().inner.poll_next_unpin(cx) self.get_mut().inner.poll_next_unpin(cx)

View File

@ -48,7 +48,7 @@ pub(crate) struct BodiesRequestFuture<B: BodiesClient> {
// Headers to download. The collection is shrunk as responses are buffered. // Headers to download. The collection is shrunk as responses are buffered.
pending_headers: VecDeque<SealedHeader>, pending_headers: VecDeque<SealedHeader>,
/// Internal buffer for all blocks /// Internal buffer for all blocks
buffer: Vec<BlockResponse<B::Body>>, buffer: Vec<BlockResponse<alloy_consensus::Header, B::Body>>,
fut: Option<B::Output>, fut: Option<B::Output>,
/// Tracks how many bodies we requested in the last request. /// Tracks how many bodies we requested in the last request.
last_request_len: Option<usize>, last_request_len: Option<usize>,
@ -217,7 +217,7 @@ impl<B> Future for BodiesRequestFuture<B>
where where
B: BodiesClient<Body: InMemorySize> + 'static, B: BodiesClient<Body: InMemorySize> + 'static,
{ {
type Output = DownloadResult<Vec<BlockResponse<B::Body>>>; type Output = DownloadResult<Vec<BlockResponse<alloy_consensus::Header, B::Body>>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut(); let this = self.get_mut();

View File

@ -5,7 +5,7 @@ use futures::Stream;
use std::{fmt::Debug, ops::RangeInclusive}; use std::{fmt::Debug, ops::RangeInclusive};
/// Body downloader return type. /// Body downloader return type.
pub type BodyDownloaderResult<B> = DownloadResult<Vec<BlockResponse<B>>>; pub type BodyDownloaderResult<B> = DownloadResult<Vec<BlockResponse<alloy_consensus::Header, B>>>;
/// A downloader capable of fetching and yielding block bodies from block headers. /// A downloader capable of fetching and yielding block bodies from block headers.
/// ///

View File

@ -1,19 +1,22 @@
use alloy_primitives::{BlockNumber, U256}; use alloy_primitives::{BlockNumber, U256};
use reth_primitives::{BlockBody, SealedBlock, SealedHeader}; use reth_primitives::{BlockBody, SealedBlock, SealedHeader};
use reth_primitives_traits::InMemorySize; use reth_primitives_traits::{BlockHeader, InMemorySize};
/// The block response /// The block response
#[derive(PartialEq, Eq, Debug, Clone)] #[derive(PartialEq, Eq, Debug, Clone)]
pub enum BlockResponse<B = BlockBody> { pub enum BlockResponse<H = alloy_consensus::Header, B = BlockBody> {
/// Full block response (with transactions or ommers) /// Full block response (with transactions or ommers)
Full(SealedBlock<alloy_consensus::Header, B>), Full(SealedBlock<H, B>),
/// The empty block response /// The empty block response
Empty(SealedHeader), Empty(SealedHeader<H>),
} }
impl<B> BlockResponse<B> { impl<H, B> BlockResponse<H, B>
where
H: BlockHeader,
{
/// Return the reference to the response header /// Return the reference to the response header
pub const fn header(&self) -> &SealedHeader { pub const fn header(&self) -> &SealedHeader<H> {
match self { match self {
Self::Full(block) => &block.header, Self::Full(block) => &block.header,
Self::Empty(header) => header, Self::Empty(header) => header,
@ -22,14 +25,14 @@ impl<B> BlockResponse<B> {
/// Return the block number /// Return the block number
pub fn block_number(&self) -> BlockNumber { pub fn block_number(&self) -> BlockNumber {
self.header().number self.header().number()
} }
/// Return the reference to the response header /// Return the reference to the response header
pub fn difficulty(&self) -> U256 { pub fn difficulty(&self) -> U256 {
match self { match self {
Self::Full(block) => block.difficulty, Self::Full(block) => block.difficulty(),
Self::Empty(header) => header.difficulty, Self::Empty(header) => header.difficulty(),
} }
} }
@ -42,7 +45,7 @@ impl<B> BlockResponse<B> {
} }
} }
impl<B: InMemorySize> InMemorySize for BlockResponse<B> { impl<H: InMemorySize, B: InMemorySize> InMemorySize for BlockResponse<H, B> {
#[inline] #[inline]
fn size(&self) -> usize { fn size(&self) -> usize {
match self { match self {

View File

@ -67,7 +67,7 @@ impl<H: alloy_consensus::BlockHeader> SealedHeader<H> {
} }
} }
impl InMemorySize for SealedHeader { impl<H: InMemorySize> InMemorySize for SealedHeader<H> {
/// Calculates a heuristic for the in-memory size of the [`SealedHeader`]. /// Calculates a heuristic for the in-memory size of the [`SealedHeader`].
#[inline] #[inline]
fn size(&self) -> usize { fn size(&self) -> usize {

View File

@ -56,7 +56,7 @@ pub struct BodyStage<D: BodyDownloader> {
/// The body downloader. /// The body downloader.
downloader: D, downloader: D,
/// Block response buffer. /// Block response buffer.
buffer: Option<Vec<BlockResponse<D::Body>>>, buffer: Option<Vec<BlockResponse<alloy_consensus::Header, D::Body>>>,
} }
impl<D: BodyDownloader> BodyStage<D> { impl<D: BodyDownloader> BodyStage<D> {