From 596d32686cad08ec872d677702521483de83573c Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Thu, 6 Jul 2023 07:33:14 -0400 Subject: [PATCH] feat: download block ranges (#3416) Co-authored-by: Matthias Seitz --- crates/consensus/beacon/src/engine/mod.rs | 28 +- crates/consensus/beacon/src/engine/sync.rs | 365 ++++++++-- crates/interfaces/src/p2p/full_block.rs | 627 +++++++++++++++--- .../interfaces/src/test_utils/full_block.rs | 123 +++- crates/primitives/src/block.rs | 57 ++ crates/primitives/src/header.rs | 11 +- crates/primitives/src/lib.rs | 4 +- 7 files changed, 1092 insertions(+), 123 deletions(-) diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 80c007141..4cc52939e 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -472,6 +472,17 @@ where None } + /// Returns how far the local tip is from the given block. If the local tip is at the same + /// height or its block number is greater than the given block, this returns None. + #[inline] + fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option { + if block > local_tip { + Some(block - local_tip) + } else { + None + } + } + /// If validation fails, the response MUST contain the latest valid hash: /// /// - The block hash of the ancestor of the invalid payload satisfying the following two @@ -602,9 +613,10 @@ where // Terminate the sync early if it's reached the maximum user // configured block. if is_valid_response { - // node's fully synced, clear pending requests - self.sync.clear_full_block_requests(); + // node's fully synced, clear active download requests + self.sync.clear_block_download_requests(); + // check if we reached the maximum configured block let tip_number = self.blockchain.canonical_tip().number; if self.sync.has_reached_max_block(tip_number) { return true @@ -1189,7 +1201,15 @@ where // * the missing parent block num >= canonical tip num, but the number of missing blocks is // less than the pipeline threshold // * this case represents a potentially long range of blocks to download and execute - self.sync.download_full_block(missing_parent.hash); + if let Some(distance) = + self.distance_from_local_tip(canonical_tip_num, missing_parent.number) + { + self.sync.download_block_range(missing_parent.hash, distance) + } else { + // This happens when the missing parent is on an outdated + // sidechain + self.sync.download_full_block(missing_parent.hash); + } } /// Attempt to form a new canonical chain based on the current sync target. @@ -1217,7 +1237,7 @@ where self.sync_state_updater.update_sync_state(SyncState::Idle); // clear any active block requests - self.sync.clear_full_block_requests(); + self.sync.clear_block_download_requests(); } Err(err) => { // if we failed to make the FCU's head canonical, because we don't have that diff --git a/crates/consensus/beacon/src/engine/sync.rs b/crates/consensus/beacon/src/engine/sync.rs index 3e9aa91b6..73bb85dbb 100644 --- a/crates/consensus/beacon/src/engine/sync.rs +++ b/crates/consensus/beacon/src/engine/sync.rs @@ -5,14 +5,15 @@ use futures::FutureExt; use reth_db::database::Database; use reth_interfaces::p2p::{ bodies::client::BodiesClient, - full_block::{FetchFullBlockFuture, FullBlockClient}, + full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient}, headers::client::HeadersClient, }; use reth_primitives::{BlockNumber, SealedBlock, H256}; use reth_stages::{ControlFlow, Pipeline, PipelineError, PipelineWithResult}; use reth_tasks::TaskSpawner; use std::{ - collections::VecDeque, + cmp::{Ordering, Reverse}, + collections::BinaryHeap, task::{ready, Context, Poll}, }; use tokio::sync::oneshot; @@ -39,10 +40,13 @@ where pipeline_state: PipelineState, /// Pending target block for the pipeline to sync pending_pipeline_target: Option, - /// In requests in progress. + /// In-flight full block requests in progress. inflight_full_block_requests: Vec>, - /// Buffered events until the manager is polled and the pipeline is idle. - queued_events: VecDeque, + /// In-flight full block _range_ requests in progress. + inflight_block_range_requests: Vec>, + /// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for + /// ordering. This means the blocks will be popped from the heap with ascending block numbers. + range_buffered_blocks: BinaryHeap>, /// If enabled, the pipeline will be triggered continuously, as soon as it becomes idle run_pipeline_continuously: bool, /// Max block after which the consensus engine would terminate the sync. Used for debugging @@ -71,7 +75,8 @@ where pipeline_state: PipelineState::Idle(Some(pipeline)), pending_pipeline_target: None, inflight_full_block_requests: Vec::new(), - queued_events: VecDeque::new(), + inflight_block_range_requests: Vec::new(), + range_buffered_blocks: BinaryHeap::new(), run_pipeline_continuously, max_block, metrics: EngineSyncMetrics::default(), @@ -81,6 +86,7 @@ where /// Sets the metrics for the active downloads fn update_block_download_metrics(&self) { self.metrics.active_block_downloads.set(self.inflight_full_block_requests.len() as f64); + // TODO: full block range metrics } /// Sets the max block value for testing @@ -89,9 +95,10 @@ where self.max_block = Some(block); } - /// Cancels all full block requests that are in progress. - pub(crate) fn clear_full_block_requests(&mut self) { + /// Cancels all download requests that are in progress. + pub(crate) fn clear_block_download_requests(&mut self) { self.inflight_full_block_requests.clear(); + self.inflight_block_range_requests.clear(); self.update_block_download_metrics(); } @@ -127,6 +134,29 @@ where self.inflight_full_block_requests.iter().any(|req| *req.hash() == hash) } + /// Starts requesting a range of blocks from the network, in reverse from the given hash. + /// + /// If the `count` is 1, this will use the `download_full_block` method instead, because it + /// downloads headers and bodies for the block concurrently. + pub(crate) fn download_block_range(&mut self, hash: H256, count: u64) { + if count == 1 { + self.download_full_block(hash); + } else { + trace!( + target: "consensus::engine", + ?hash, + ?count, + "start downloading full block range." + ); + + let request = self.full_block_client.get_full_block_range(hash, count); + self.inflight_block_range_requests.push(request); + } + + // // TODO: need more metrics for block ranges + // self.update_block_download_metrics(); + } + /// Starts requesting a full block from the network. /// /// Returns `true` if the request was started, `false` if there's already a request for the @@ -222,7 +252,7 @@ where // we also clear any pending full block requests because we expect them to be // outdated (included in the range the pipeline is syncing anyway) - self.clear_full_block_requests(); + self.clear_block_download_requests(); Some(EngineSyncEvent::PipelineStarted(target)) } @@ -237,37 +267,63 @@ where return Poll::Ready(event) } - loop { - // drain buffered events first if pipeline is not running - if self.is_pipeline_idle() { - if let Some(event) = self.queued_events.pop_front() { - return Poll::Ready(event) - } - } else { - // advance the pipeline - if let Poll::Ready(event) = self.poll_pipeline(cx) { - return Poll::Ready(event) - } - } - - // advance all requests - for idx in (0..self.inflight_full_block_requests.len()).rev() { - let mut request = self.inflight_full_block_requests.swap_remove(idx); - if let Poll::Ready(block) = request.poll_unpin(cx) { - self.queued_events.push_back(EngineSyncEvent::FetchedFullBlock(block)); - } else { - // still pending - self.inflight_full_block_requests.push(request); - } - } - - self.update_block_download_metrics(); - - if !self.pipeline_state.is_idle() || self.queued_events.is_empty() { - // can not make any progress - return Poll::Pending + // make sure we poll the pipeline if it's active, and return any ready pipeline events + if !self.is_pipeline_idle() { + // advance the pipeline + if let Poll::Ready(event) = self.poll_pipeline(cx) { + return Poll::Ready(event) } } + + // advance all full block requests + for idx in (0..self.inflight_full_block_requests.len()).rev() { + let mut request = self.inflight_full_block_requests.swap_remove(idx); + if let Poll::Ready(block) = request.poll_unpin(cx) { + trace!(target: "consensus::engine", block=?block.num_hash(), "Received single full block, buffering"); + self.range_buffered_blocks.push(Reverse(OrderedSealedBlock(block))); + } else { + // still pending + self.inflight_full_block_requests.push(request); + } + } + + // advance all full block range requests + for idx in (0..self.inflight_block_range_requests.len()).rev() { + let mut request = self.inflight_block_range_requests.swap_remove(idx); + if let Poll::Ready(blocks) = request.poll_unpin(cx) { + trace!(target: "consensus::engine", len=?blocks.len(), first=?blocks.first().map(|b| b.num_hash()), last=?blocks.last().map(|b| b.num_hash()), "Received full block range, buffering"); + self.range_buffered_blocks + .extend(blocks.into_iter().map(OrderedSealedBlock).map(Reverse)); + } else { + // still pending + self.inflight_block_range_requests.push(request); + } + } + + self.update_block_download_metrics(); + + // drain an element of the block buffer if there are any + if let Some(block) = self.range_buffered_blocks.pop() { + return Poll::Ready(EngineSyncEvent::FetchedFullBlock(block.0 .0)) + } + + Poll::Pending + } +} + +/// A wrapper type around [SealedBlock] that implements the [Ord] trait by block number. +#[derive(Debug, Clone, PartialEq, Eq)] +struct OrderedSealedBlock(SealedBlock); + +impl PartialOrd for OrderedSealedBlock { + fn partial_cmp(&self, other: &Self) -> Option { + self.0.number.partial_cmp(&other.0.number) + } +} + +impl Ord for OrderedSealedBlock { + fn cmp(&self, other: &Self) -> Ordering { + self.0.number.cmp(&other.0.number) } } @@ -318,3 +374,236 @@ impl PipelineState { matches!(self, PipelineState::Idle(_)) } } + +#[cfg(test)] +mod tests { + use super::*; + use assert_matches::assert_matches; + use futures::poll; + use reth_db::{ + mdbx::{Env, WriteMap}, + test_utils::create_test_rw_db, + }; + use reth_interfaces::{p2p::either::EitherDownloader, test_utils::TestFullBlockClient}; + use reth_primitives::{ + stage::StageCheckpoint, BlockBody, ChainSpec, ChainSpecBuilder, SealedHeader, MAINNET, + }; + use reth_provider::{test_utils::TestExecutorFactory, PostState}; + use reth_stages::{test_utils::TestStages, ExecOutput, StageError}; + use reth_tasks::TokioTaskExecutor; + use std::{collections::VecDeque, future::poll_fn, sync::Arc}; + use tokio::sync::watch; + + struct TestPipelineBuilder { + pipeline_exec_outputs: VecDeque>, + executor_results: Vec, + max_block: Option, + } + + impl TestPipelineBuilder { + /// Create a new [TestPipelineBuilder]. + fn new() -> Self { + Self { + pipeline_exec_outputs: VecDeque::new(), + executor_results: Vec::new(), + max_block: None, + } + } + + /// Set the pipeline execution outputs to use for the test consensus engine. + fn with_pipeline_exec_outputs( + mut self, + pipeline_exec_outputs: VecDeque>, + ) -> Self { + self.pipeline_exec_outputs = pipeline_exec_outputs; + self + } + + /// Set the executor results to use for the test consensus engine. + #[allow(dead_code)] + fn with_executor_results(mut self, executor_results: Vec) -> Self { + self.executor_results = executor_results; + self + } + + /// Sets the max block for the pipeline to run. + #[allow(dead_code)] + fn with_max_block(mut self, max_block: BlockNumber) -> Self { + self.max_block = Some(max_block); + self + } + + /// Builds the pipeline. + fn build(self, chain_spec: Arc) -> Pipeline>> { + reth_tracing::init_test_tracing(); + let db = create_test_rw_db(); + + let executor_factory = TestExecutorFactory::new(chain_spec.clone()); + executor_factory.extend(self.executor_results); + + // Setup pipeline + let (tip_tx, _tip_rx) = watch::channel(H256::default()); + let mut pipeline = Pipeline::builder() + .add_stages(TestStages::new(self.pipeline_exec_outputs, Default::default())) + .with_tip_sender(tip_tx); + + if let Some(max_block) = self.max_block { + pipeline = pipeline.with_max_block(max_block); + } + + pipeline.build(db, chain_spec) + } + } + + struct TestSyncControllerBuilder { + max_block: Option, + client: Option, + } + + impl TestSyncControllerBuilder { + /// Create a new [TestSyncControllerBuilder]. + fn new() -> Self { + Self { max_block: None, client: None } + } + + /// Sets the max block for the pipeline to run. + #[allow(dead_code)] + fn with_max_block(mut self, max_block: BlockNumber) -> Self { + self.max_block = Some(max_block); + self + } + + /// Sets the client to use for network operations. + fn with_client(mut self, client: Client) -> Self { + self.client = Some(client); + self + } + + /// Builds the sync controller. + fn build( + self, + pipeline: Pipeline, + ) -> EngineSyncController> + where + DB: Database + 'static, + Client: HeadersClient + BodiesClient + Clone + Unpin + 'static, + { + let client = self + .client + .map(EitherDownloader::Left) + .unwrap_or_else(|| EitherDownloader::Right(TestFullBlockClient::default())); + + EngineSyncController::new( + pipeline, + client, + Box::::default(), + // run_pipeline_continuously: false here until we want to test this + false, + self.max_block, + ) + } + } + + #[tokio::test] + async fn pipeline_started_after_setting_target() { + let chain_spec = Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(MAINNET.genesis.clone()) + .paris_activated() + .build(), + ); + + let client = TestFullBlockClient::default(); + let mut header = SealedHeader::default(); + let body = BlockBody::default(); + client.insert(header.clone(), body.clone()); + for _ in 0..10 { + header.parent_hash = header.hash_slow(); + header.number += 1; + header = header.header.seal_slow(); + client.insert(header.clone(), body.clone()); + } + + // force the pipeline to be "done" after 5 blocks + let pipeline = TestPipelineBuilder::new() + .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { + checkpoint: StageCheckpoint::new(5), + done: true, + })])) + .build(chain_spec); + + let mut sync_controller = + TestSyncControllerBuilder::new().with_client(client.clone()).build(pipeline); + + let tip = client.highest_block().expect("there should be blocks here"); + sync_controller.set_pipeline_sync_target(tip.hash); + + let sync_future = poll_fn(|cx| sync_controller.poll(cx)); + let next_event = poll!(sync_future); + + // can assert that the first event here is PipelineStarted because we set the sync target, + // and we should get Ready because the pipeline should be spawned immediately + assert_matches!(next_event, Poll::Ready(EngineSyncEvent::PipelineStarted(Some(target))) => { + assert_eq!(target, tip.hash); + }); + + // the next event should be the pipeline finishing in a good state + let sync_future = poll_fn(|cx| sync_controller.poll(cx)); + let next_ready = sync_future.await; + assert_matches!(next_ready, EngineSyncEvent::PipelineFinished { result, reached_max_block } => { + assert_matches!(result, Ok(control_flow) => assert_eq!(control_flow, ControlFlow::Continue { block_number: 5 })); + // no max block configured + assert!(!reached_max_block); + }); + } + + #[tokio::test] + async fn controller_sends_range_request() { + let chain_spec = Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(MAINNET.genesis.clone()) + .paris_activated() + .build(), + ); + + let client = TestFullBlockClient::default(); + let mut header = SealedHeader::default(); + let body = BlockBody::default(); + for _ in 0..10 { + header.parent_hash = header.hash_slow(); + header.number += 1; + header = header.header.seal_slow(); + client.insert(header.clone(), body.clone()); + } + + // set up a pipeline + let pipeline = TestPipelineBuilder::new().build(chain_spec); + + let mut sync_controller = + TestSyncControllerBuilder::new().with_client(client.clone()).build(pipeline); + + let tip = client.highest_block().expect("there should be blocks here"); + + // call the download range method + sync_controller.download_block_range(tip.hash, tip.number); + + // ensure we have one in flight range request + assert_eq!(sync_controller.inflight_block_range_requests.len(), 1); + + // ensure the range request is made correctly + let first_req = sync_controller.inflight_block_range_requests.first().unwrap(); + assert_eq!(first_req.start_hash(), tip.hash); + assert_eq!(first_req.count(), tip.number); + + // ensure they are in ascending order + for num in 1..=10 { + let sync_future = poll_fn(|cx| sync_controller.poll(cx)); + let next_ready = sync_future.await; + assert_matches!(next_ready, EngineSyncEvent::FetchedFullBlock(block) => { + assert_eq!(block.number, num); + }); + } + } +} diff --git a/crates/interfaces/src/p2p/full_block.rs b/crates/interfaces/src/p2p/full_block.rs index 8da7f456f..6cc4e4b61 100644 --- a/crates/interfaces/src/p2p/full_block.rs +++ b/crates/interfaces/src/p2p/full_block.rs @@ -6,8 +6,13 @@ use crate::{ headers::client::{HeadersClient, SingleHeaderRequest}, }, }; -use reth_primitives::{BlockBody, Header, SealedBlock, SealedHeader, WithPeerId, H256}; +use futures::Stream; +use reth_primitives::{ + BlockBody, Header, HeadersDirection, SealedBlock, SealedHeader, WithPeerId, H256, +}; use std::{ + cmp::Reverse, + collections::{HashMap, VecDeque}, fmt::Debug, future::Future, pin::Pin, @@ -15,6 +20,8 @@ use std::{ }; use tracing::debug; +use super::headers::client::HeadersRequest; + /// A Client that can fetch full blocks from the network. #[derive(Debug, Clone)] pub struct FullBlockClient { @@ -51,9 +58,51 @@ where body: None, } } + + /// Returns a future that fetches [SealedBlock]s for the given hash and count. + /// + /// Note: this future is cancel safe + /// + /// Caution: This does no validation of body (transactions) responses but guarantees that + /// the starting [SealedHeader] matches the requested hash, and that the number of headers and + /// bodies received matches the requested limit. + /// + /// The returned future yields bodies in falling order, i.e. with descending block numbers. + pub fn get_full_block_range( + &self, + hash: H256, + count: u64, + ) -> FetchFullBlockRangeFuture { + let client = self.client.clone(); + + // Optimization: if we only want one block, we don't need to wait for the headers request + // to complete, and can send the block bodies request right away. + let bodies_request = + if count == 1 { None } else { Some(client.get_block_bodies(vec![hash])) }; + + FetchFullBlockRangeFuture { + start_hash: hash, + count, + request: FullBlockRangeRequest { + headers: Some(client.get_headers(HeadersRequest { + start: hash.into(), + limit: count, + direction: HeadersDirection::Falling, + })), + bodies: bodies_request, + }, + client, + headers: None, + pending_headers: VecDeque::new(), + bodies: HashMap::new(), + } + } } /// A future that downloads a full block from the network. +/// +/// This will attempt to fetch both the header and body for the given block hash at the same time. +/// When both requests succeed, the future will yield the full block. #[must_use = "futures do nothing unless polled"] pub struct FetchFullBlockFuture where @@ -223,6 +272,8 @@ where } } +/// The result of a request for a single header or body. This is yielded by the `FullBlockRequest` +/// future. enum ResponseResult { Header(PeerRequestResult>), Body(PeerRequestResult>), @@ -247,18 +298,18 @@ fn ensure_valid_body_response( header: &SealedHeader, block: &BlockBody, ) -> Result<(), ConsensusError> { - let ommers_hash = reth_primitives::proofs::calculate_ommers_root(&block.ommers); - if header.ommers_hash != ommers_hash { + let body_roots = block.calculate_roots(); + + if header.ommers_hash != body_roots.ommers_hash { return Err(ConsensusError::BodyOmmersHashDiff { - got: ommers_hash, + got: body_roots.ommers_hash, expected: header.ommers_hash, }) } - let transaction_root = reth_primitives::proofs::calculate_transaction_root(&block.transactions); - if header.transactions_root != transaction_root { + if header.transactions_root != body_roots.tx_root { return Err(ConsensusError::BodyTransactionRootDiff { - got: transaction_root, + got: body_roots.tx_root, expected: header.transactions_root, }) } @@ -282,82 +333,393 @@ fn ensure_valid_body_response( Ok(()) } +/// A future that downloads a range of full blocks from the network. +/// +/// This first fetches the headers for the given range using the inner `Client`. Once the request +/// is complete, it will fetch the bodies for the headers it received. +/// +/// Once the bodies request completes, the [SealedBlock]s will be assembled and the future will +/// yield the full block range. +/// +/// The full block range will be returned with falling block numbers, i.e. in descending order. +/// +/// NOTE: this assumes that bodies responses are returned by the client in the same order as the +/// hash array used to request them. +#[must_use = "futures do nothing unless polled"] +pub struct FetchFullBlockRangeFuture +where + Client: BodiesClient + HeadersClient, +{ + /// The client used to fetch headers and bodies. + client: Client, + /// The block hash to start fetching from (inclusive). + start_hash: H256, + /// How many blocks to fetch: `len([start_hash, ..]) == count` + count: u64, + /// Requests for headers and bodies that are in progress. + request: FullBlockRangeRequest, + /// Fetched headers. + headers: Option>, + /// The next headers to request bodies for. This is drained as responses are received. + pending_headers: VecDeque, + /// The bodies that have been received so far. + bodies: HashMap, +} + +impl FetchFullBlockRangeFuture +where + Client: BodiesClient + HeadersClient, +{ + /// Returns the block hashes for the given range, if they are available. + pub fn range_block_hashes(&self) -> Option> { + self.headers.as_ref().map(|h| h.iter().map(|h| h.hash()).collect::>()) + } + + /// Returns whether or not the bodies map is fully populated with requested headers and bodies. + fn is_bodies_complete(&self) -> bool { + self.bodies.len() == self.count as usize + } + + /// Inserts a block body, matching it with the `next_header`. + fn insert_body(&mut self, body_response: BodyResponse) { + if let Some(header) = self.pending_headers.pop_front() { + self.bodies.insert(header, body_response); + } + } + + /// Inserts multiple block bodies. + fn insert_bodies(&mut self, bodies: Vec) { + for body in bodies { + self.insert_body(body); + } + } + + /// Returns the remaining hashes for the bodies request, based on the headers that still exist + /// in the `root_map`. + fn remaining_bodies_hashes(&self) -> Vec { + self.pending_headers.iter().map(|h| h.hash()).collect::>() + } + + /// Returns the [SealedBlock]s if the request is complete and valid. + /// + /// The request is complete if the number of blocks requested is equal to the number of blocks + /// received. The request is valid if the returned bodies match the roots in the headers. + /// + /// These are returned in falling order starting with the requested `hash`, i.e. with + /// descending block numbers. + fn take_blocks(&mut self) -> Option> { + if !self.is_bodies_complete() { + // not done with bodies yet + return None + } + + let headers = self.headers.take()?; + let mut needs_retry = false; + let mut response = Vec::new(); + + for header in &headers { + if let Some(body_resp) = self.bodies.remove(header) { + // validate body w.r.t. the hashes in the header, only inserting into the response + let body = match body_resp { + BodyResponse::Validated(body) => body, + BodyResponse::PendingValidation(resp) => { + // ensure the block is valid, else retry + if let Err(err) = ensure_valid_body_response(header, resp.data()) { + debug!(target: "downloaders", ?err, hash=?header.hash, "Received wrong body in range response"); + self.client.report_bad_message(resp.peer_id()); + + // get body that doesn't match, put back into vecdeque, and just retry + self.pending_headers.push_back(header.clone()); + needs_retry = true; + } + + resp.into_data() + } + }; + + response.push(SealedBlock::new(header.clone(), body)); + } + } + + if needs_retry { + // put response hashes back into bodies map since we aren't returning them as a + // response + for block in response { + let (header, body) = block.split_header_body(); + self.bodies.insert(header, BodyResponse::Validated(body)); + } + + // put headers back since they were `take`n before + self.headers = Some(headers); + + // create response for failing bodies + let hashes = self.remaining_bodies_hashes(); + self.request.bodies = Some(self.client.get_block_bodies(hashes)); + return None + } + + Some(response) + } + + /// Returns whether or not a bodies request has been started, returning false if there is no + /// pending request. + fn has_bodies_request_started(&self) -> bool { + self.request.bodies.is_some() + } + + /// Returns the start hash for the request + pub fn start_hash(&self) -> H256 { + self.start_hash + } + + /// Returns the block count for the request + pub fn count(&self) -> u64 { + self.count + } +} + +impl Future for FetchFullBlockRangeFuture +where + Client: BodiesClient + HeadersClient + Unpin + 'static, +{ + type Output = Vec; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + match ready!(this.request.poll(cx)) { + // This branch handles headers responses from peers - it first ensures that the + // starting hash and number of headers matches what we requested. + // + // If these don't match, we penalize the peer and retry the request. + // If they do match, we sort the headers by block number and start the request for + // the corresponding block bodies. + // + // The next result that should be yielded by `poll` is the bodies response. + RangeResponseResult::Header(res) => { + match res { + Ok(headers) => { + let (peer, mut headers) = headers + .map(|h| { + h.iter().map(|h| h.clone().seal_slow()).collect::>() + }) + .split(); + + // fill in the response if it's the correct length + if headers.len() == this.count as usize { + // sort headers from highest to lowest block number + headers.sort_unstable_by_key(|h| Reverse(h.number)); + + // check the starting hash + if headers[0].hash() != this.start_hash { + // received bad response + this.client.report_bad_message(peer); + } else { + // get the bodies request so it can be polled later + let hashes = + headers.iter().map(|h| h.hash()).collect::>(); + + // populate the pending headers + this.pending_headers = headers.clone().into(); + + // set the actual request if it hasn't been started yet + if !this.has_bodies_request_started() { + this.request.bodies = + Some(this.client.get_block_bodies(hashes)); + } + + // set the headers response + this.headers = Some(headers); + } + } + } + Err(err) => { + debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed"); + } + } + + if this.headers.is_none() { + // did not receive a correct response yet, retry + this.request.headers = Some(this.client.get_headers(HeadersRequest { + start: this.start_hash.into(), + limit: this.count, + direction: HeadersDirection::Falling, + })); + } + } + // This branch handles block body responses from peers - it first inserts the + // bodies into the `bodies` map, and then checks if the request is complete. + // + // If the request is not complete, and we need to request more bodies, we send + // a bodies request for the headers we don't yet have bodies for. + RangeResponseResult::Body(res) => { + match res { + Ok(bodies_resp) => { + let (peer, new_bodies) = bodies_resp.split(); + + // first insert the received bodies + this.insert_bodies( + new_bodies + .iter() + .map(|resp| WithPeerId::new(peer, resp.clone())) + .map(BodyResponse::PendingValidation) + .collect::>(), + ); + + if !this.is_bodies_complete() { + // get remaining hashes so we can send the next request + let req_hashes = this.remaining_bodies_hashes(); + + // set a new request + this.request.bodies = Some(this.client.get_block_bodies(req_hashes)) + } + } + Err(err) => { + debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed"); + } + } + if this.bodies.is_empty() { + // received bad response, re-request headers + // TODO: convert this into two futures, one which is a headers range + // future, and one which is a bodies range future. + // + // The headers range future should yield the bodies range future. + // The bodies range future should not have an Option>, it should + // have a populated Vec from the successful headers range future. + // + // This is optimal because we can not send a bodies request without + // first completing the headers request. This way we can get rid of the + // following `if let Some`. A bodies request should never be sent before + // the headers request completes, so this should always be `Some` anyways. + let hashes = this.remaining_bodies_hashes(); + if !hashes.is_empty() { + this.request.bodies = Some(this.client.get_block_bodies(hashes)); + } + } + } + } + + if let Some(res) = this.take_blocks() { + return Poll::Ready(res) + } + } + } +} + +/// A type that buffers the result of a range request so we can return it as a `Stream`. +struct FullBlockRangeStream +where + Client: BodiesClient + HeadersClient, +{ + /// The inner [FetchFullBlockRangeFuture] that is polled. + inner: FetchFullBlockRangeFuture, + /// The blocks that have been received so far. + /// + /// If this is `None` then the request is still in progress. If the vec is empty, then all of + /// the response values have been consumed. + blocks: Option>, +} + +impl From> for FullBlockRangeStream +where + Client: BodiesClient + HeadersClient, +{ + fn from(inner: FetchFullBlockRangeFuture) -> Self { + Self { inner, blocks: None } + } +} + +impl Stream for FullBlockRangeStream +where + Client: BodiesClient + HeadersClient + Unpin + 'static, +{ + type Item = SealedBlock; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + // If all blocks have been consumed, then return `None`. + if let Some(blocks) = &mut this.blocks { + if blocks.is_empty() { + // Stream is finished + return Poll::Ready(None) + } + + // return the next block if it's ready - the vec should be in ascending order since it + // is reversed right after it is received from the future, so we can just pop() the + // elements to return them from the stream in descending order + return Poll::Ready(blocks.pop()) + } + + // poll the inner future if the blocks are not yet ready + let mut blocks = ready!(Pin::new(&mut this.inner).poll(cx)); + + // the blocks are returned in descending order, reverse the list so we can just pop() the + // vec to yield the next block in the stream + blocks.reverse(); + + // pop the first block from the vec as the first stream element and store the rest + let first_result = blocks.pop(); + + // if the inner future is ready, then we can return the blocks + this.blocks = Some(blocks); + + // return the first block + Poll::Ready(first_result) + } +} + +/// A request for a range of full blocks. Polling this will poll the inner headers and bodies +/// futures until they return responses. It will return either the header or body result, depending +/// on which future successfully returned. +struct FullBlockRangeRequest +where + Client: BodiesClient + HeadersClient, +{ + headers: Option<::Output>, + bodies: Option<::Output>, +} + +impl FullBlockRangeRequest +where + Client: BodiesClient + HeadersClient, +{ + fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() { + if let Poll::Ready(res) = fut.poll(cx) { + self.headers = None; + return Poll::Ready(RangeResponseResult::Header(res)) + } + } + + if let Some(fut) = Pin::new(&mut self.bodies).as_pin_mut() { + if let Poll::Ready(res) = fut.poll(cx) { + self.bodies = None; + return Poll::Ready(RangeResponseResult::Body(res)) + } + } + + Poll::Pending + } +} + +// The result of a request for headers or block bodies. This is yielded by the +// `FullBlockRangeRequest` future. +enum RangeResponseResult { + Header(PeerRequestResult>), + Body(PeerRequestResult>), +} + #[cfg(test)] mod tests { use super::*; - use crate::p2p::{ - download::DownloadClient, headers::client::HeadersRequest, priority::Priority, - }; - use parking_lot::Mutex; - use reth_primitives::{BlockHashOrNumber, PeerId, WithPeerId}; - use std::{collections::HashMap, sync::Arc}; - - #[derive(Clone, Default, Debug)] - struct TestSingleFullBlockClient { - headers: Arc>>, - bodies: Arc>>, - } - - impl TestSingleFullBlockClient { - fn insert(&self, header: SealedHeader, body: BlockBody) { - let hash = header.hash(); - let header = header.unseal(); - self.headers.lock().insert(hash, header); - self.bodies.lock().insert(hash, body); - } - } - - impl DownloadClient for TestSingleFullBlockClient { - fn report_bad_message(&self, _peer_id: PeerId) {} - - fn num_connected_peers(&self) -> usize { - 1 - } - } - - impl HeadersClient for TestSingleFullBlockClient { - type Output = futures::future::Ready>>; - - fn get_headers_with_priority( - &self, - request: HeadersRequest, - _priority: Priority, - ) -> Self::Output { - let headers = self.headers.lock(); - let resp = match request.start { - BlockHashOrNumber::Hash(hash) => headers.get(&hash).cloned(), - BlockHashOrNumber::Number(num) => { - headers.values().find(|h| h.number == num).cloned() - } - } - .map(|h| vec![h]) - .unwrap_or_default(); - futures::future::ready(Ok(WithPeerId::new(PeerId::random(), resp))) - } - } - - impl BodiesClient for TestSingleFullBlockClient { - type Output = futures::future::Ready>>; - - fn get_block_bodies_with_priority( - &self, - hashes: Vec, - _priority: Priority, - ) -> Self::Output { - let bodies = self.bodies.lock(); - let mut all_bodies = Vec::new(); - for hash in hashes { - if let Some(body) = bodies.get(&hash) { - all_bodies.push(body.clone()); - } - } - futures::future::ready(Ok(WithPeerId::new(PeerId::random(), all_bodies))) - } - } + use crate::test_utils::TestFullBlockClient; + use futures::StreamExt; #[tokio::test] async fn download_single_full_block() { - let client = TestSingleFullBlockClient::default(); + let client = TestFullBlockClient::default(); let header = SealedHeader::default(); let body = BlockBody::default(); client.insert(header.clone(), body.clone()); @@ -366,4 +728,115 @@ mod tests { let received = client.get_full_block(header.hash()).await; assert_eq!(received, SealedBlock::new(header, body)); } + + #[tokio::test] + async fn download_single_full_block_range() { + let client = TestFullBlockClient::default(); + let header = SealedHeader::default(); + let body = BlockBody::default(); + client.insert(header.clone(), body.clone()); + let client = FullBlockClient::new(client); + + let received = client.get_full_block_range(header.hash(), 1).await; + let received = received.first().expect("response should include a block"); + assert_eq!(*received, SealedBlock::new(header, body)); + } + + #[tokio::test] + async fn download_full_block_range() { + let client = TestFullBlockClient::default(); + let mut header = SealedHeader::default(); + let body = BlockBody::default(); + client.insert(header.clone(), body.clone()); + for _ in 0..10 { + header.parent_hash = header.hash_slow(); + header.number += 1; + header = header.header.seal_slow(); + client.insert(header.clone(), body.clone()); + } + let client = FullBlockClient::new(client); + + let received = client.get_full_block_range(header.hash(), 1).await; + let received = received.first().expect("response should include a block"); + assert_eq!(*received, SealedBlock::new(header.clone(), body)); + + let received = client.get_full_block_range(header.hash(), 10).await; + assert_eq!(received.len(), 10); + for (i, block) in received.iter().enumerate() { + let expected_number = header.number - i as u64; + assert_eq!(block.header.number, expected_number); + } + } + + #[tokio::test] + async fn download_full_block_range_stream() { + let client = TestFullBlockClient::default(); + let mut header = SealedHeader::default(); + let body = BlockBody::default(); + client.insert(header.clone(), body.clone()); + for _ in 0..10 { + header.parent_hash = header.hash_slow(); + header.number += 1; + header = header.header.seal_slow(); + client.insert(header.clone(), body.clone()); + } + let client = FullBlockClient::new(client); + + let future = client.get_full_block_range(header.hash(), 1); + let mut stream = FullBlockRangeStream::from(future); + + // ensure only block in the stream is the one we requested + let received = stream.next().await.expect("response should not be None"); + assert_eq!(received, SealedBlock::new(header.clone(), body.clone())); + + // stream should be done now + assert_eq!(stream.next().await, None); + + // there are 11 total blocks + let future = client.get_full_block_range(header.hash(), 11); + let mut stream = FullBlockRangeStream::from(future); + + // check first header + let received = stream.next().await.expect("response should not be None"); + let mut curr_number = received.number; + assert_eq!(received, SealedBlock::new(header.clone(), body.clone())); + + // check the rest of the headers + for _ in 0..10 { + let received = stream.next().await.expect("response should not be None"); + assert_eq!(received.number, curr_number - 1); + curr_number = received.number; + } + + // ensure stream is done + let received = stream.next().await; + assert!(received.is_none()); + } + + #[tokio::test] + async fn download_full_block_range_over_soft_limit() { + // default soft limit is 20, so we will request 50 blocks + let client = TestFullBlockClient::default(); + let mut header = SealedHeader::default(); + let body = BlockBody::default(); + client.insert(header.clone(), body.clone()); + for _ in 0..50 { + header.parent_hash = header.hash_slow(); + header.number += 1; + header = header.header.seal_slow(); + client.insert(header.clone(), body.clone()); + } + let client = FullBlockClient::new(client); + + let received = client.get_full_block_range(header.hash(), 1).await; + let received = received.first().expect("response should include a block"); + assert_eq!(*received, SealedBlock::new(header.clone(), body)); + + let received = client.get_full_block_range(header.hash(), 50).await; + assert_eq!(received.len(), 50); + for (i, block) in received.iter().enumerate() { + let expected_number = header.number - i as u64; + assert_eq!(block.header.number, expected_number); + } + } } diff --git a/crates/interfaces/src/test_utils/full_block.rs b/crates/interfaces/src/test_utils/full_block.rs index b192c6b96..9d1545a0b 100644 --- a/crates/interfaces/src/test_utils/full_block.rs +++ b/crates/interfaces/src/test_utils/full_block.rs @@ -5,7 +5,12 @@ use crate::p2p::{ headers::client::{HeadersClient, HeadersRequest}, priority::Priority, }; -use reth_primitives::{BlockBody, Header, PeerId, WithPeerId, H256}; +use parking_lot::Mutex; +use reth_primitives::{ + BlockBody, BlockHashOrNumber, BlockNumHash, Header, HeadersDirection, PeerId, SealedBlock, + SealedHeader, WithPeerId, H256, +}; +use std::{collections::HashMap, sync::Arc}; /// A headers+bodies client implementation that does nothing. #[derive(Debug, Default, Clone)] @@ -43,3 +48,119 @@ impl HeadersClient for NoopFullBlockClient { futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![]))) } } + +/// A headers+bodies client that stores the headers and bodies in memory, with an artificial soft +/// bodies response limit that is set to 20 by default. +/// +/// This full block client can be [Clone]d and shared between multiple tasks. +#[derive(Clone, Debug)] +pub struct TestFullBlockClient { + headers: Arc>>, + bodies: Arc>>, + // soft response limit, max number of bodies to respond with + soft_limit: usize, +} + +impl Default for TestFullBlockClient { + fn default() -> Self { + Self { + headers: Arc::new(Mutex::new(HashMap::new())), + bodies: Arc::new(Mutex::new(HashMap::new())), + soft_limit: 20, + } + } +} + +impl TestFullBlockClient { + /// Insert a header and body into the client maps. + pub fn insert(&self, header: SealedHeader, body: BlockBody) { + let hash = header.hash(); + let header = header.unseal(); + self.headers.lock().insert(hash, header); + self.bodies.lock().insert(hash, body); + } + + /// Set the soft response limit. + pub fn set_soft_limit(&mut self, limit: usize) { + self.soft_limit = limit; + } + + /// Get the block with the highest block number. + pub fn highest_block(&self) -> Option { + let headers = self.headers.lock(); + let (hash, header) = headers.iter().max_by_key(|(hash, header)| header.number)?; + let bodies = self.bodies.lock(); + let body = bodies.get(hash)?; + Some(SealedBlock::new(header.clone().seal(*hash), body.clone())) + } +} + +impl DownloadClient for TestFullBlockClient { + fn report_bad_message(&self, _peer_id: PeerId) {} + + fn num_connected_peers(&self) -> usize { + 1 + } +} + +impl HeadersClient for TestFullBlockClient { + type Output = futures::future::Ready>>; + + fn get_headers_with_priority( + &self, + request: HeadersRequest, + _priority: Priority, + ) -> Self::Output { + let headers = self.headers.lock(); + let mut block: BlockHashOrNumber = match request.start { + BlockHashOrNumber::Hash(hash) => headers.get(&hash).cloned(), + BlockHashOrNumber::Number(num) => headers.values().find(|h| h.number == num).cloned(), + } + .map(|h| h.number.into()) + .unwrap(); + + let mut resp = Vec::new(); + + for _ in 0..request.limit { + // fetch from storage + if let Some((_, header)) = headers.iter().find(|(hash, header)| { + BlockNumHash::new(header.number, **hash).matches_block_or_num(&block) + }) { + match request.direction { + HeadersDirection::Falling => block = header.parent_hash.into(), + HeadersDirection::Rising => { + let next = header.number + 1; + block = next.into() + } + } + resp.push(header.clone()); + } else { + break + } + } + futures::future::ready(Ok(WithPeerId::new(PeerId::random(), resp))) + } +} + +impl BodiesClient for TestFullBlockClient { + type Output = futures::future::Ready>>; + + fn get_block_bodies_with_priority( + &self, + hashes: Vec, + _priority: Priority, + ) -> Self::Output { + let bodies = self.bodies.lock(); + let mut all_bodies = Vec::new(); + for hash in hashes { + if let Some(body) = bodies.get(&hash) { + all_bodies.push(body.clone()); + } + + if all_bodies.len() == self.soft_limit { + break + } + } + futures::future::ready(Ok(WithPeerId::new(PeerId::random(), all_bodies))) + } +} diff --git a/crates/primitives/src/block.rs b/crates/primitives/src/block.rs index 22bf8e71a..6d652f92e 100644 --- a/crates/primitives/src/block.rs +++ b/crates/primitives/src/block.rs @@ -139,6 +139,18 @@ impl SealedBlock { (self.header, self.body, self.ommers) } + /// Splits the [BlockBody] and [SealedHeader] into separate components + pub fn split_header_body(self) -> (SealedHeader, BlockBody) { + ( + self.header, + BlockBody { + transactions: self.body, + ommers: self.ommers, + withdrawals: self.withdrawals, + }, + ) + } + /// Expensive operation that recovers transaction signer. See [SealedBlockWithSenders]. pub fn senders(&self) -> Option> { self.body.iter().map(|tx| tx.recover_signer()).collect::>>() @@ -715,6 +727,14 @@ impl BlockNumHash { pub fn into_components(self) -> (BlockNumber, BlockHash) { (self.number, self.hash) } + + /// Returns whether or not the block matches the given [BlockHashOrNumber]. + pub fn matches_block_or_num(&self, block: &BlockHashOrNumber) -> bool { + match block { + BlockHashOrNumber::Hash(hash) => self.hash == *hash, + BlockHashOrNumber::Number(number) => self.number == *number, + } + } } impl From<(BlockNumber, BlockHash)> for BlockNumHash { @@ -774,6 +794,43 @@ impl BlockBody { withdrawals: self.withdrawals.clone(), } } + + /// Calculate the transaction root for the block body. + pub fn calculate_tx_root(&self) -> H256 { + crate::proofs::calculate_transaction_root(&self.transactions) + } + + /// Calculate the ommers root for the block body. + pub fn calculate_ommers_root(&self) -> H256 { + crate::proofs::calculate_ommers_root(&self.ommers) + } + + /// Calculate the withdrawals root for the block body, if withdrawals exist. If there are no + /// withdrawals, this will return `None`. + pub fn calculate_withdrawals_root(&self) -> Option { + self.withdrawals.as_ref().map(|w| crate::proofs::calculate_withdrawals_root(w)) + } + + /// Calculate all roots (transaction, ommers, withdrawals) for the block body. + pub fn calculate_roots(&self) -> BlockBodyRoots { + BlockBodyRoots { + tx_root: self.calculate_tx_root(), + ommers_hash: self.calculate_ommers_root(), + withdrawals_root: self.calculate_withdrawals_root(), + } + } +} + +/// A struct that represents roots associated with a block body. This can be used to correlate +/// block body responses with headers. +#[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize, Hash)] +pub struct BlockBodyRoots { + /// The transaction root for the block body. + pub tx_root: H256, + /// The ommers hash for the block body. + pub ommers_hash: H256, + /// The withdrawals root for the block body, if withdrawals exist. + pub withdrawals_root: Option, } #[cfg(test)] diff --git a/crates/primitives/src/header.rs b/crates/primitives/src/header.rs index 5a87c6411..54b81649b 100644 --- a/crates/primitives/src/header.rs +++ b/crates/primitives/src/header.rs @@ -2,7 +2,7 @@ use crate::{ basefee::calculate_next_block_base_fee, keccak256, proofs::{EMPTY_LIST_HASH, EMPTY_ROOT}, - BlockHash, BlockNumHash, BlockNumber, Bloom, Bytes, H160, H256, H64, U256, + BlockBodyRoots, BlockHash, BlockNumHash, BlockNumber, Bloom, Bytes, H160, H256, H64, U256, }; use bytes::{Buf, BufMut, BytesMut}; @@ -151,6 +151,15 @@ impl Header { self.transactions_root == EMPTY_ROOT } + /// Converts all roots in the header to a [BlockBodyRoots] struct. + pub fn body_roots(&self) -> BlockBodyRoots { + BlockBodyRoots { + tx_root: self.transactions_root, + ommers_hash: self.ommers_hash, + withdrawals_root: self.withdrawals_root, + } + } + /// Calculate base fee for next block according to the EIP-1559 spec. /// /// Returns a `None` if no base fee is set, no EIP-1559 support diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index eb3bcf194..a76bcba90 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -54,8 +54,8 @@ pub mod proofs; pub use account::{Account, Bytecode}; pub use bits::H512; pub use block::{ - Block, BlockBody, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag, BlockWithSenders, - ForkBlock, SealedBlock, SealedBlockWithSenders, + Block, BlockBody, BlockBodyRoots, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag, + BlockWithSenders, ForkBlock, SealedBlock, SealedBlockWithSenders, }; pub use bloom::Bloom; pub use chain::{