feat: download block ranges (#3416)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Dan Cline
2023-07-06 07:33:14 -04:00
committed by GitHub
parent 09fe22f470
commit 596d32686c
7 changed files with 1092 additions and 123 deletions

View File

@ -472,6 +472,17 @@ where
None
}
/// Returns how far the local tip is from the given block. If the local tip is at the same
/// height or its block number is greater than the given block, this returns None.
#[inline]
fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
if block > local_tip {
Some(block - local_tip)
} else {
None
}
}
/// If validation fails, the response MUST contain the latest valid hash:
///
/// - The block hash of the ancestor of the invalid payload satisfying the following two
@ -602,9 +613,10 @@ where
// Terminate the sync early if it's reached the maximum user
// configured block.
if is_valid_response {
// node's fully synced, clear pending requests
self.sync.clear_full_block_requests();
// node's fully synced, clear active download requests
self.sync.clear_block_download_requests();
// check if we reached the maximum configured block
let tip_number = self.blockchain.canonical_tip().number;
if self.sync.has_reached_max_block(tip_number) {
return true
@ -1189,7 +1201,15 @@ where
// * the missing parent block num >= canonical tip num, but the number of missing blocks is
// less than the pipeline threshold
// * this case represents a potentially long range of blocks to download and execute
self.sync.download_full_block(missing_parent.hash);
if let Some(distance) =
self.distance_from_local_tip(canonical_tip_num, missing_parent.number)
{
self.sync.download_block_range(missing_parent.hash, distance)
} else {
// This happens when the missing parent is on an outdated
// sidechain
self.sync.download_full_block(missing_parent.hash);
}
}
/// Attempt to form a new canonical chain based on the current sync target.
@ -1217,7 +1237,7 @@ where
self.sync_state_updater.update_sync_state(SyncState::Idle);
// clear any active block requests
self.sync.clear_full_block_requests();
self.sync.clear_block_download_requests();
}
Err(err) => {
// if we failed to make the FCU's head canonical, because we don't have that

View File

@ -5,14 +5,15 @@ use futures::FutureExt;
use reth_db::database::Database;
use reth_interfaces::p2p::{
bodies::client::BodiesClient,
full_block::{FetchFullBlockFuture, FullBlockClient},
full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
headers::client::HeadersClient,
};
use reth_primitives::{BlockNumber, SealedBlock, H256};
use reth_stages::{ControlFlow, Pipeline, PipelineError, PipelineWithResult};
use reth_tasks::TaskSpawner;
use std::{
collections::VecDeque,
cmp::{Ordering, Reverse},
collections::BinaryHeap,
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;
@ -39,10 +40,13 @@ where
pipeline_state: PipelineState<DB>,
/// Pending target block for the pipeline to sync
pending_pipeline_target: Option<H256>,
/// In requests in progress.
/// In-flight full block requests in progress.
inflight_full_block_requests: Vec<FetchFullBlockFuture<Client>>,
/// Buffered events until the manager is polled and the pipeline is idle.
queued_events: VecDeque<EngineSyncEvent>,
/// In-flight full block _range_ requests in progress.
inflight_block_range_requests: Vec<FetchFullBlockRangeFuture<Client>>,
/// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for
/// ordering. This means the blocks will be popped from the heap with ascending block numbers.
range_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlock>>,
/// If enabled, the pipeline will be triggered continuously, as soon as it becomes idle
run_pipeline_continuously: bool,
/// Max block after which the consensus engine would terminate the sync. Used for debugging
@ -71,7 +75,8 @@ where
pipeline_state: PipelineState::Idle(Some(pipeline)),
pending_pipeline_target: None,
inflight_full_block_requests: Vec::new(),
queued_events: VecDeque::new(),
inflight_block_range_requests: Vec::new(),
range_buffered_blocks: BinaryHeap::new(),
run_pipeline_continuously,
max_block,
metrics: EngineSyncMetrics::default(),
@ -81,6 +86,7 @@ where
/// Sets the metrics for the active downloads
fn update_block_download_metrics(&self) {
self.metrics.active_block_downloads.set(self.inflight_full_block_requests.len() as f64);
// TODO: full block range metrics
}
/// Sets the max block value for testing
@ -89,9 +95,10 @@ where
self.max_block = Some(block);
}
/// Cancels all full block requests that are in progress.
pub(crate) fn clear_full_block_requests(&mut self) {
/// Cancels all download requests that are in progress.
pub(crate) fn clear_block_download_requests(&mut self) {
self.inflight_full_block_requests.clear();
self.inflight_block_range_requests.clear();
self.update_block_download_metrics();
}
@ -127,6 +134,29 @@ where
self.inflight_full_block_requests.iter().any(|req| *req.hash() == hash)
}
/// Starts requesting a range of blocks from the network, in reverse from the given hash.
///
/// If the `count` is 1, this will use the `download_full_block` method instead, because it
/// downloads headers and bodies for the block concurrently.
pub(crate) fn download_block_range(&mut self, hash: H256, count: u64) {
if count == 1 {
self.download_full_block(hash);
} else {
trace!(
target: "consensus::engine",
?hash,
?count,
"start downloading full block range."
);
let request = self.full_block_client.get_full_block_range(hash, count);
self.inflight_block_range_requests.push(request);
}
// // TODO: need more metrics for block ranges
// self.update_block_download_metrics();
}
/// Starts requesting a full block from the network.
///
/// Returns `true` if the request was started, `false` if there's already a request for the
@ -222,7 +252,7 @@ where
// we also clear any pending full block requests because we expect them to be
// outdated (included in the range the pipeline is syncing anyway)
self.clear_full_block_requests();
self.clear_block_download_requests();
Some(EngineSyncEvent::PipelineStarted(target))
}
@ -237,37 +267,63 @@ where
return Poll::Ready(event)
}
loop {
// drain buffered events first if pipeline is not running
if self.is_pipeline_idle() {
if let Some(event) = self.queued_events.pop_front() {
return Poll::Ready(event)
}
} else {
// advance the pipeline
if let Poll::Ready(event) = self.poll_pipeline(cx) {
return Poll::Ready(event)
}
}
// advance all requests
for idx in (0..self.inflight_full_block_requests.len()).rev() {
let mut request = self.inflight_full_block_requests.swap_remove(idx);
if let Poll::Ready(block) = request.poll_unpin(cx) {
self.queued_events.push_back(EngineSyncEvent::FetchedFullBlock(block));
} else {
// still pending
self.inflight_full_block_requests.push(request);
}
}
self.update_block_download_metrics();
if !self.pipeline_state.is_idle() || self.queued_events.is_empty() {
// can not make any progress
return Poll::Pending
// make sure we poll the pipeline if it's active, and return any ready pipeline events
if !self.is_pipeline_idle() {
// advance the pipeline
if let Poll::Ready(event) = self.poll_pipeline(cx) {
return Poll::Ready(event)
}
}
// advance all full block requests
for idx in (0..self.inflight_full_block_requests.len()).rev() {
let mut request = self.inflight_full_block_requests.swap_remove(idx);
if let Poll::Ready(block) = request.poll_unpin(cx) {
trace!(target: "consensus::engine", block=?block.num_hash(), "Received single full block, buffering");
self.range_buffered_blocks.push(Reverse(OrderedSealedBlock(block)));
} else {
// still pending
self.inflight_full_block_requests.push(request);
}
}
// advance all full block range requests
for idx in (0..self.inflight_block_range_requests.len()).rev() {
let mut request = self.inflight_block_range_requests.swap_remove(idx);
if let Poll::Ready(blocks) = request.poll_unpin(cx) {
trace!(target: "consensus::engine", len=?blocks.len(), first=?blocks.first().map(|b| b.num_hash()), last=?blocks.last().map(|b| b.num_hash()), "Received full block range, buffering");
self.range_buffered_blocks
.extend(blocks.into_iter().map(OrderedSealedBlock).map(Reverse));
} else {
// still pending
self.inflight_block_range_requests.push(request);
}
}
self.update_block_download_metrics();
// drain an element of the block buffer if there are any
if let Some(block) = self.range_buffered_blocks.pop() {
return Poll::Ready(EngineSyncEvent::FetchedFullBlock(block.0 .0))
}
Poll::Pending
}
}
/// A wrapper type around [SealedBlock] that implements the [Ord] trait by block number.
#[derive(Debug, Clone, PartialEq, Eq)]
struct OrderedSealedBlock(SealedBlock);
impl PartialOrd for OrderedSealedBlock {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
self.0.number.partial_cmp(&other.0.number)
}
}
impl Ord for OrderedSealedBlock {
fn cmp(&self, other: &Self) -> Ordering {
self.0.number.cmp(&other.0.number)
}
}
@ -318,3 +374,236 @@ impl<DB: Database> PipelineState<DB> {
matches!(self, PipelineState::Idle(_))
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use futures::poll;
use reth_db::{
mdbx::{Env, WriteMap},
test_utils::create_test_rw_db,
};
use reth_interfaces::{p2p::either::EitherDownloader, test_utils::TestFullBlockClient};
use reth_primitives::{
stage::StageCheckpoint, BlockBody, ChainSpec, ChainSpecBuilder, SealedHeader, MAINNET,
};
use reth_provider::{test_utils::TestExecutorFactory, PostState};
use reth_stages::{test_utils::TestStages, ExecOutput, StageError};
use reth_tasks::TokioTaskExecutor;
use std::{collections::VecDeque, future::poll_fn, sync::Arc};
use tokio::sync::watch;
struct TestPipelineBuilder {
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
executor_results: Vec<PostState>,
max_block: Option<BlockNumber>,
}
impl TestPipelineBuilder {
/// Create a new [TestPipelineBuilder].
fn new() -> Self {
Self {
pipeline_exec_outputs: VecDeque::new(),
executor_results: Vec::new(),
max_block: None,
}
}
/// Set the pipeline execution outputs to use for the test consensus engine.
fn with_pipeline_exec_outputs(
mut self,
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
) -> Self {
self.pipeline_exec_outputs = pipeline_exec_outputs;
self
}
/// Set the executor results to use for the test consensus engine.
#[allow(dead_code)]
fn with_executor_results(mut self, executor_results: Vec<PostState>) -> Self {
self.executor_results = executor_results;
self
}
/// Sets the max block for the pipeline to run.
#[allow(dead_code)]
fn with_max_block(mut self, max_block: BlockNumber) -> Self {
self.max_block = Some(max_block);
self
}
/// Builds the pipeline.
fn build(self, chain_spec: Arc<ChainSpec>) -> Pipeline<Arc<Env<WriteMap>>> {
reth_tracing::init_test_tracing();
let db = create_test_rw_db();
let executor_factory = TestExecutorFactory::new(chain_spec.clone());
executor_factory.extend(self.executor_results);
// Setup pipeline
let (tip_tx, _tip_rx) = watch::channel(H256::default());
let mut pipeline = Pipeline::builder()
.add_stages(TestStages::new(self.pipeline_exec_outputs, Default::default()))
.with_tip_sender(tip_tx);
if let Some(max_block) = self.max_block {
pipeline = pipeline.with_max_block(max_block);
}
pipeline.build(db, chain_spec)
}
}
struct TestSyncControllerBuilder<Client> {
max_block: Option<BlockNumber>,
client: Option<Client>,
}
impl<Client> TestSyncControllerBuilder<Client> {
/// Create a new [TestSyncControllerBuilder].
fn new() -> Self {
Self { max_block: None, client: None }
}
/// Sets the max block for the pipeline to run.
#[allow(dead_code)]
fn with_max_block(mut self, max_block: BlockNumber) -> Self {
self.max_block = Some(max_block);
self
}
/// Sets the client to use for network operations.
fn with_client(mut self, client: Client) -> Self {
self.client = Some(client);
self
}
/// Builds the sync controller.
fn build<DB>(
self,
pipeline: Pipeline<DB>,
) -> EngineSyncController<DB, EitherDownloader<Client, TestFullBlockClient>>
where
DB: Database + 'static,
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
{
let client = self
.client
.map(EitherDownloader::Left)
.unwrap_or_else(|| EitherDownloader::Right(TestFullBlockClient::default()));
EngineSyncController::new(
pipeline,
client,
Box::<TokioTaskExecutor>::default(),
// run_pipeline_continuously: false here until we want to test this
false,
self.max_block,
)
}
}
#[tokio::test]
async fn pipeline_started_after_setting_target() {
let chain_spec = Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(MAINNET.genesis.clone())
.paris_activated()
.build(),
);
let client = TestFullBlockClient::default();
let mut header = SealedHeader::default();
let body = BlockBody::default();
client.insert(header.clone(), body.clone());
for _ in 0..10 {
header.parent_hash = header.hash_slow();
header.number += 1;
header = header.header.seal_slow();
client.insert(header.clone(), body.clone());
}
// force the pipeline to be "done" after 5 blocks
let pipeline = TestPipelineBuilder::new()
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(5),
done: true,
})]))
.build(chain_spec);
let mut sync_controller =
TestSyncControllerBuilder::new().with_client(client.clone()).build(pipeline);
let tip = client.highest_block().expect("there should be blocks here");
sync_controller.set_pipeline_sync_target(tip.hash);
let sync_future = poll_fn(|cx| sync_controller.poll(cx));
let next_event = poll!(sync_future);
// can assert that the first event here is PipelineStarted because we set the sync target,
// and we should get Ready because the pipeline should be spawned immediately
assert_matches!(next_event, Poll::Ready(EngineSyncEvent::PipelineStarted(Some(target))) => {
assert_eq!(target, tip.hash);
});
// the next event should be the pipeline finishing in a good state
let sync_future = poll_fn(|cx| sync_controller.poll(cx));
let next_ready = sync_future.await;
assert_matches!(next_ready, EngineSyncEvent::PipelineFinished { result, reached_max_block } => {
assert_matches!(result, Ok(control_flow) => assert_eq!(control_flow, ControlFlow::Continue { block_number: 5 }));
// no max block configured
assert!(!reached_max_block);
});
}
#[tokio::test]
async fn controller_sends_range_request() {
let chain_spec = Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(MAINNET.genesis.clone())
.paris_activated()
.build(),
);
let client = TestFullBlockClient::default();
let mut header = SealedHeader::default();
let body = BlockBody::default();
for _ in 0..10 {
header.parent_hash = header.hash_slow();
header.number += 1;
header = header.header.seal_slow();
client.insert(header.clone(), body.clone());
}
// set up a pipeline
let pipeline = TestPipelineBuilder::new().build(chain_spec);
let mut sync_controller =
TestSyncControllerBuilder::new().with_client(client.clone()).build(pipeline);
let tip = client.highest_block().expect("there should be blocks here");
// call the download range method
sync_controller.download_block_range(tip.hash, tip.number);
// ensure we have one in flight range request
assert_eq!(sync_controller.inflight_block_range_requests.len(), 1);
// ensure the range request is made correctly
let first_req = sync_controller.inflight_block_range_requests.first().unwrap();
assert_eq!(first_req.start_hash(), tip.hash);
assert_eq!(first_req.count(), tip.number);
// ensure they are in ascending order
for num in 1..=10 {
let sync_future = poll_fn(|cx| sync_controller.poll(cx));
let next_ready = sync_future.await;
assert_matches!(next_ready, EngineSyncEvent::FetchedFullBlock(block) => {
assert_eq!(block.number, num);
});
}
}
}

View File

@ -6,8 +6,13 @@ use crate::{
headers::client::{HeadersClient, SingleHeaderRequest},
},
};
use reth_primitives::{BlockBody, Header, SealedBlock, SealedHeader, WithPeerId, H256};
use futures::Stream;
use reth_primitives::{
BlockBody, Header, HeadersDirection, SealedBlock, SealedHeader, WithPeerId, H256,
};
use std::{
cmp::Reverse,
collections::{HashMap, VecDeque},
fmt::Debug,
future::Future,
pin::Pin,
@ -15,6 +20,8 @@ use std::{
};
use tracing::debug;
use super::headers::client::HeadersRequest;
/// A Client that can fetch full blocks from the network.
#[derive(Debug, Clone)]
pub struct FullBlockClient<Client> {
@ -51,9 +58,51 @@ where
body: None,
}
}
/// Returns a future that fetches [SealedBlock]s for the given hash and count.
///
/// Note: this future is cancel safe
///
/// Caution: This does no validation of body (transactions) responses but guarantees that
/// the starting [SealedHeader] matches the requested hash, and that the number of headers and
/// bodies received matches the requested limit.
///
/// The returned future yields bodies in falling order, i.e. with descending block numbers.
pub fn get_full_block_range(
&self,
hash: H256,
count: u64,
) -> FetchFullBlockRangeFuture<Client> {
let client = self.client.clone();
// Optimization: if we only want one block, we don't need to wait for the headers request
// to complete, and can send the block bodies request right away.
let bodies_request =
if count == 1 { None } else { Some(client.get_block_bodies(vec![hash])) };
FetchFullBlockRangeFuture {
start_hash: hash,
count,
request: FullBlockRangeRequest {
headers: Some(client.get_headers(HeadersRequest {
start: hash.into(),
limit: count,
direction: HeadersDirection::Falling,
})),
bodies: bodies_request,
},
client,
headers: None,
pending_headers: VecDeque::new(),
bodies: HashMap::new(),
}
}
}
/// A future that downloads a full block from the network.
///
/// This will attempt to fetch both the header and body for the given block hash at the same time.
/// When both requests succeed, the future will yield the full block.
#[must_use = "futures do nothing unless polled"]
pub struct FetchFullBlockFuture<Client>
where
@ -223,6 +272,8 @@ where
}
}
/// The result of a request for a single header or body. This is yielded by the `FullBlockRequest`
/// future.
enum ResponseResult {
Header(PeerRequestResult<Option<Header>>),
Body(PeerRequestResult<Option<BlockBody>>),
@ -247,18 +298,18 @@ fn ensure_valid_body_response(
header: &SealedHeader,
block: &BlockBody,
) -> Result<(), ConsensusError> {
let ommers_hash = reth_primitives::proofs::calculate_ommers_root(&block.ommers);
if header.ommers_hash != ommers_hash {
let body_roots = block.calculate_roots();
if header.ommers_hash != body_roots.ommers_hash {
return Err(ConsensusError::BodyOmmersHashDiff {
got: ommers_hash,
got: body_roots.ommers_hash,
expected: header.ommers_hash,
})
}
let transaction_root = reth_primitives::proofs::calculate_transaction_root(&block.transactions);
if header.transactions_root != transaction_root {
if header.transactions_root != body_roots.tx_root {
return Err(ConsensusError::BodyTransactionRootDiff {
got: transaction_root,
got: body_roots.tx_root,
expected: header.transactions_root,
})
}
@ -282,82 +333,393 @@ fn ensure_valid_body_response(
Ok(())
}
/// A future that downloads a range of full blocks from the network.
///
/// This first fetches the headers for the given range using the inner `Client`. Once the request
/// is complete, it will fetch the bodies for the headers it received.
///
/// Once the bodies request completes, the [SealedBlock]s will be assembled and the future will
/// yield the full block range.
///
/// The full block range will be returned with falling block numbers, i.e. in descending order.
///
/// NOTE: this assumes that bodies responses are returned by the client in the same order as the
/// hash array used to request them.
#[must_use = "futures do nothing unless polled"]
pub struct FetchFullBlockRangeFuture<Client>
where
Client: BodiesClient + HeadersClient,
{
/// The client used to fetch headers and bodies.
client: Client,
/// The block hash to start fetching from (inclusive).
start_hash: H256,
/// How many blocks to fetch: `len([start_hash, ..]) == count`
count: u64,
/// Requests for headers and bodies that are in progress.
request: FullBlockRangeRequest<Client>,
/// Fetched headers.
headers: Option<Vec<SealedHeader>>,
/// The next headers to request bodies for. This is drained as responses are received.
pending_headers: VecDeque<SealedHeader>,
/// The bodies that have been received so far.
bodies: HashMap<SealedHeader, BodyResponse>,
}
impl<Client> FetchFullBlockRangeFuture<Client>
where
Client: BodiesClient + HeadersClient,
{
/// Returns the block hashes for the given range, if they are available.
pub fn range_block_hashes(&self) -> Option<Vec<H256>> {
self.headers.as_ref().map(|h| h.iter().map(|h| h.hash()).collect::<Vec<_>>())
}
/// Returns whether or not the bodies map is fully populated with requested headers and bodies.
fn is_bodies_complete(&self) -> bool {
self.bodies.len() == self.count as usize
}
/// Inserts a block body, matching it with the `next_header`.
fn insert_body(&mut self, body_response: BodyResponse) {
if let Some(header) = self.pending_headers.pop_front() {
self.bodies.insert(header, body_response);
}
}
/// Inserts multiple block bodies.
fn insert_bodies(&mut self, bodies: Vec<BodyResponse>) {
for body in bodies {
self.insert_body(body);
}
}
/// Returns the remaining hashes for the bodies request, based on the headers that still exist
/// in the `root_map`.
fn remaining_bodies_hashes(&self) -> Vec<H256> {
self.pending_headers.iter().map(|h| h.hash()).collect::<Vec<_>>()
}
/// Returns the [SealedBlock]s if the request is complete and valid.
///
/// The request is complete if the number of blocks requested is equal to the number of blocks
/// received. The request is valid if the returned bodies match the roots in the headers.
///
/// These are returned in falling order starting with the requested `hash`, i.e. with
/// descending block numbers.
fn take_blocks(&mut self) -> Option<Vec<SealedBlock>> {
if !self.is_bodies_complete() {
// not done with bodies yet
return None
}
let headers = self.headers.take()?;
let mut needs_retry = false;
let mut response = Vec::new();
for header in &headers {
if let Some(body_resp) = self.bodies.remove(header) {
// validate body w.r.t. the hashes in the header, only inserting into the response
let body = match body_resp {
BodyResponse::Validated(body) => body,
BodyResponse::PendingValidation(resp) => {
// ensure the block is valid, else retry
if let Err(err) = ensure_valid_body_response(header, resp.data()) {
debug!(target: "downloaders", ?err, hash=?header.hash, "Received wrong body in range response");
self.client.report_bad_message(resp.peer_id());
// get body that doesn't match, put back into vecdeque, and just retry
self.pending_headers.push_back(header.clone());
needs_retry = true;
}
resp.into_data()
}
};
response.push(SealedBlock::new(header.clone(), body));
}
}
if needs_retry {
// put response hashes back into bodies map since we aren't returning them as a
// response
for block in response {
let (header, body) = block.split_header_body();
self.bodies.insert(header, BodyResponse::Validated(body));
}
// put headers back since they were `take`n before
self.headers = Some(headers);
// create response for failing bodies
let hashes = self.remaining_bodies_hashes();
self.request.bodies = Some(self.client.get_block_bodies(hashes));
return None
}
Some(response)
}
/// Returns whether or not a bodies request has been started, returning false if there is no
/// pending request.
fn has_bodies_request_started(&self) -> bool {
self.request.bodies.is_some()
}
/// Returns the start hash for the request
pub fn start_hash(&self) -> H256 {
self.start_hash
}
/// Returns the block count for the request
pub fn count(&self) -> u64 {
self.count
}
}
impl<Client> Future for FetchFullBlockRangeFuture<Client>
where
Client: BodiesClient + HeadersClient + Unpin + 'static,
{
type Output = Vec<SealedBlock>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
loop {
match ready!(this.request.poll(cx)) {
// This branch handles headers responses from peers - it first ensures that the
// starting hash and number of headers matches what we requested.
//
// If these don't match, we penalize the peer and retry the request.
// If they do match, we sort the headers by block number and start the request for
// the corresponding block bodies.
//
// The next result that should be yielded by `poll` is the bodies response.
RangeResponseResult::Header(res) => {
match res {
Ok(headers) => {
let (peer, mut headers) = headers
.map(|h| {
h.iter().map(|h| h.clone().seal_slow()).collect::<Vec<_>>()
})
.split();
// fill in the response if it's the correct length
if headers.len() == this.count as usize {
// sort headers from highest to lowest block number
headers.sort_unstable_by_key(|h| Reverse(h.number));
// check the starting hash
if headers[0].hash() != this.start_hash {
// received bad response
this.client.report_bad_message(peer);
} else {
// get the bodies request so it can be polled later
let hashes =
headers.iter().map(|h| h.hash()).collect::<Vec<_>>();
// populate the pending headers
this.pending_headers = headers.clone().into();
// set the actual request if it hasn't been started yet
if !this.has_bodies_request_started() {
this.request.bodies =
Some(this.client.get_block_bodies(hashes));
}
// set the headers response
this.headers = Some(headers);
}
}
}
Err(err) => {
debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed");
}
}
if this.headers.is_none() {
// did not receive a correct response yet, retry
this.request.headers = Some(this.client.get_headers(HeadersRequest {
start: this.start_hash.into(),
limit: this.count,
direction: HeadersDirection::Falling,
}));
}
}
// This branch handles block body responses from peers - it first inserts the
// bodies into the `bodies` map, and then checks if the request is complete.
//
// If the request is not complete, and we need to request more bodies, we send
// a bodies request for the headers we don't yet have bodies for.
RangeResponseResult::Body(res) => {
match res {
Ok(bodies_resp) => {
let (peer, new_bodies) = bodies_resp.split();
// first insert the received bodies
this.insert_bodies(
new_bodies
.iter()
.map(|resp| WithPeerId::new(peer, resp.clone()))
.map(BodyResponse::PendingValidation)
.collect::<Vec<_>>(),
);
if !this.is_bodies_complete() {
// get remaining hashes so we can send the next request
let req_hashes = this.remaining_bodies_hashes();
// set a new request
this.request.bodies = Some(this.client.get_block_bodies(req_hashes))
}
}
Err(err) => {
debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
}
}
if this.bodies.is_empty() {
// received bad response, re-request headers
// TODO: convert this into two futures, one which is a headers range
// future, and one which is a bodies range future.
//
// The headers range future should yield the bodies range future.
// The bodies range future should not have an Option<Vec<H256>>, it should
// have a populated Vec<H256> from the successful headers range future.
//
// This is optimal because we can not send a bodies request without
// first completing the headers request. This way we can get rid of the
// following `if let Some`. A bodies request should never be sent before
// the headers request completes, so this should always be `Some` anyways.
let hashes = this.remaining_bodies_hashes();
if !hashes.is_empty() {
this.request.bodies = Some(this.client.get_block_bodies(hashes));
}
}
}
}
if let Some(res) = this.take_blocks() {
return Poll::Ready(res)
}
}
}
}
/// A type that buffers the result of a range request so we can return it as a `Stream`.
struct FullBlockRangeStream<Client>
where
Client: BodiesClient + HeadersClient,
{
/// The inner [FetchFullBlockRangeFuture] that is polled.
inner: FetchFullBlockRangeFuture<Client>,
/// The blocks that have been received so far.
///
/// If this is `None` then the request is still in progress. If the vec is empty, then all of
/// the response values have been consumed.
blocks: Option<Vec<SealedBlock>>,
}
impl<Client> From<FetchFullBlockRangeFuture<Client>> for FullBlockRangeStream<Client>
where
Client: BodiesClient + HeadersClient,
{
fn from(inner: FetchFullBlockRangeFuture<Client>) -> Self {
Self { inner, blocks: None }
}
}
impl<Client> Stream for FullBlockRangeStream<Client>
where
Client: BodiesClient + HeadersClient + Unpin + 'static,
{
type Item = SealedBlock;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
// If all blocks have been consumed, then return `None`.
if let Some(blocks) = &mut this.blocks {
if blocks.is_empty() {
// Stream is finished
return Poll::Ready(None)
}
// return the next block if it's ready - the vec should be in ascending order since it
// is reversed right after it is received from the future, so we can just pop() the
// elements to return them from the stream in descending order
return Poll::Ready(blocks.pop())
}
// poll the inner future if the blocks are not yet ready
let mut blocks = ready!(Pin::new(&mut this.inner).poll(cx));
// the blocks are returned in descending order, reverse the list so we can just pop() the
// vec to yield the next block in the stream
blocks.reverse();
// pop the first block from the vec as the first stream element and store the rest
let first_result = blocks.pop();
// if the inner future is ready, then we can return the blocks
this.blocks = Some(blocks);
// return the first block
Poll::Ready(first_result)
}
}
/// A request for a range of full blocks. Polling this will poll the inner headers and bodies
/// futures until they return responses. It will return either the header or body result, depending
/// on which future successfully returned.
struct FullBlockRangeRequest<Client>
where
Client: BodiesClient + HeadersClient,
{
headers: Option<<Client as HeadersClient>::Output>,
bodies: Option<<Client as BodiesClient>::Output>,
}
impl<Client> FullBlockRangeRequest<Client>
where
Client: BodiesClient + HeadersClient,
{
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RangeResponseResult> {
if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() {
if let Poll::Ready(res) = fut.poll(cx) {
self.headers = None;
return Poll::Ready(RangeResponseResult::Header(res))
}
}
if let Some(fut) = Pin::new(&mut self.bodies).as_pin_mut() {
if let Poll::Ready(res) = fut.poll(cx) {
self.bodies = None;
return Poll::Ready(RangeResponseResult::Body(res))
}
}
Poll::Pending
}
}
// The result of a request for headers or block bodies. This is yielded by the
// `FullBlockRangeRequest` future.
enum RangeResponseResult {
Header(PeerRequestResult<Vec<Header>>),
Body(PeerRequestResult<Vec<BlockBody>>),
}
#[cfg(test)]
mod tests {
use super::*;
use crate::p2p::{
download::DownloadClient, headers::client::HeadersRequest, priority::Priority,
};
use parking_lot::Mutex;
use reth_primitives::{BlockHashOrNumber, PeerId, WithPeerId};
use std::{collections::HashMap, sync::Arc};
#[derive(Clone, Default, Debug)]
struct TestSingleFullBlockClient {
headers: Arc<Mutex<HashMap<H256, Header>>>,
bodies: Arc<Mutex<HashMap<H256, BlockBody>>>,
}
impl TestSingleFullBlockClient {
fn insert(&self, header: SealedHeader, body: BlockBody) {
let hash = header.hash();
let header = header.unseal();
self.headers.lock().insert(hash, header);
self.bodies.lock().insert(hash, body);
}
}
impl DownloadClient for TestSingleFullBlockClient {
fn report_bad_message(&self, _peer_id: PeerId) {}
fn num_connected_peers(&self) -> usize {
1
}
}
impl HeadersClient for TestSingleFullBlockClient {
type Output = futures::future::Ready<PeerRequestResult<Vec<Header>>>;
fn get_headers_with_priority(
&self,
request: HeadersRequest,
_priority: Priority,
) -> Self::Output {
let headers = self.headers.lock();
let resp = match request.start {
BlockHashOrNumber::Hash(hash) => headers.get(&hash).cloned(),
BlockHashOrNumber::Number(num) => {
headers.values().find(|h| h.number == num).cloned()
}
}
.map(|h| vec![h])
.unwrap_or_default();
futures::future::ready(Ok(WithPeerId::new(PeerId::random(), resp)))
}
}
impl BodiesClient for TestSingleFullBlockClient {
type Output = futures::future::Ready<PeerRequestResult<Vec<BlockBody>>>;
fn get_block_bodies_with_priority(
&self,
hashes: Vec<H256>,
_priority: Priority,
) -> Self::Output {
let bodies = self.bodies.lock();
let mut all_bodies = Vec::new();
for hash in hashes {
if let Some(body) = bodies.get(&hash) {
all_bodies.push(body.clone());
}
}
futures::future::ready(Ok(WithPeerId::new(PeerId::random(), all_bodies)))
}
}
use crate::test_utils::TestFullBlockClient;
use futures::StreamExt;
#[tokio::test]
async fn download_single_full_block() {
let client = TestSingleFullBlockClient::default();
let client = TestFullBlockClient::default();
let header = SealedHeader::default();
let body = BlockBody::default();
client.insert(header.clone(), body.clone());
@ -366,4 +728,115 @@ mod tests {
let received = client.get_full_block(header.hash()).await;
assert_eq!(received, SealedBlock::new(header, body));
}
#[tokio::test]
async fn download_single_full_block_range() {
let client = TestFullBlockClient::default();
let header = SealedHeader::default();
let body = BlockBody::default();
client.insert(header.clone(), body.clone());
let client = FullBlockClient::new(client);
let received = client.get_full_block_range(header.hash(), 1).await;
let received = received.first().expect("response should include a block");
assert_eq!(*received, SealedBlock::new(header, body));
}
#[tokio::test]
async fn download_full_block_range() {
let client = TestFullBlockClient::default();
let mut header = SealedHeader::default();
let body = BlockBody::default();
client.insert(header.clone(), body.clone());
for _ in 0..10 {
header.parent_hash = header.hash_slow();
header.number += 1;
header = header.header.seal_slow();
client.insert(header.clone(), body.clone());
}
let client = FullBlockClient::new(client);
let received = client.get_full_block_range(header.hash(), 1).await;
let received = received.first().expect("response should include a block");
assert_eq!(*received, SealedBlock::new(header.clone(), body));
let received = client.get_full_block_range(header.hash(), 10).await;
assert_eq!(received.len(), 10);
for (i, block) in received.iter().enumerate() {
let expected_number = header.number - i as u64;
assert_eq!(block.header.number, expected_number);
}
}
#[tokio::test]
async fn download_full_block_range_stream() {
let client = TestFullBlockClient::default();
let mut header = SealedHeader::default();
let body = BlockBody::default();
client.insert(header.clone(), body.clone());
for _ in 0..10 {
header.parent_hash = header.hash_slow();
header.number += 1;
header = header.header.seal_slow();
client.insert(header.clone(), body.clone());
}
let client = FullBlockClient::new(client);
let future = client.get_full_block_range(header.hash(), 1);
let mut stream = FullBlockRangeStream::from(future);
// ensure only block in the stream is the one we requested
let received = stream.next().await.expect("response should not be None");
assert_eq!(received, SealedBlock::new(header.clone(), body.clone()));
// stream should be done now
assert_eq!(stream.next().await, None);
// there are 11 total blocks
let future = client.get_full_block_range(header.hash(), 11);
let mut stream = FullBlockRangeStream::from(future);
// check first header
let received = stream.next().await.expect("response should not be None");
let mut curr_number = received.number;
assert_eq!(received, SealedBlock::new(header.clone(), body.clone()));
// check the rest of the headers
for _ in 0..10 {
let received = stream.next().await.expect("response should not be None");
assert_eq!(received.number, curr_number - 1);
curr_number = received.number;
}
// ensure stream is done
let received = stream.next().await;
assert!(received.is_none());
}
#[tokio::test]
async fn download_full_block_range_over_soft_limit() {
// default soft limit is 20, so we will request 50 blocks
let client = TestFullBlockClient::default();
let mut header = SealedHeader::default();
let body = BlockBody::default();
client.insert(header.clone(), body.clone());
for _ in 0..50 {
header.parent_hash = header.hash_slow();
header.number += 1;
header = header.header.seal_slow();
client.insert(header.clone(), body.clone());
}
let client = FullBlockClient::new(client);
let received = client.get_full_block_range(header.hash(), 1).await;
let received = received.first().expect("response should include a block");
assert_eq!(*received, SealedBlock::new(header.clone(), body));
let received = client.get_full_block_range(header.hash(), 50).await;
assert_eq!(received.len(), 50);
for (i, block) in received.iter().enumerate() {
let expected_number = header.number - i as u64;
assert_eq!(block.header.number, expected_number);
}
}
}

View File

@ -5,7 +5,12 @@ use crate::p2p::{
headers::client::{HeadersClient, HeadersRequest},
priority::Priority,
};
use reth_primitives::{BlockBody, Header, PeerId, WithPeerId, H256};
use parking_lot::Mutex;
use reth_primitives::{
BlockBody, BlockHashOrNumber, BlockNumHash, Header, HeadersDirection, PeerId, SealedBlock,
SealedHeader, WithPeerId, H256,
};
use std::{collections::HashMap, sync::Arc};
/// A headers+bodies client implementation that does nothing.
#[derive(Debug, Default, Clone)]
@ -43,3 +48,119 @@ impl HeadersClient for NoopFullBlockClient {
futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
}
}
/// A headers+bodies client that stores the headers and bodies in memory, with an artificial soft
/// bodies response limit that is set to 20 by default.
///
/// This full block client can be [Clone]d and shared between multiple tasks.
#[derive(Clone, Debug)]
pub struct TestFullBlockClient {
headers: Arc<Mutex<HashMap<H256, Header>>>,
bodies: Arc<Mutex<HashMap<H256, BlockBody>>>,
// soft response limit, max number of bodies to respond with
soft_limit: usize,
}
impl Default for TestFullBlockClient {
fn default() -> Self {
Self {
headers: Arc::new(Mutex::new(HashMap::new())),
bodies: Arc::new(Mutex::new(HashMap::new())),
soft_limit: 20,
}
}
}
impl TestFullBlockClient {
/// Insert a header and body into the client maps.
pub fn insert(&self, header: SealedHeader, body: BlockBody) {
let hash = header.hash();
let header = header.unseal();
self.headers.lock().insert(hash, header);
self.bodies.lock().insert(hash, body);
}
/// Set the soft response limit.
pub fn set_soft_limit(&mut self, limit: usize) {
self.soft_limit = limit;
}
/// Get the block with the highest block number.
pub fn highest_block(&self) -> Option<SealedBlock> {
let headers = self.headers.lock();
let (hash, header) = headers.iter().max_by_key(|(hash, header)| header.number)?;
let bodies = self.bodies.lock();
let body = bodies.get(hash)?;
Some(SealedBlock::new(header.clone().seal(*hash), body.clone()))
}
}
impl DownloadClient for TestFullBlockClient {
fn report_bad_message(&self, _peer_id: PeerId) {}
fn num_connected_peers(&self) -> usize {
1
}
}
impl HeadersClient for TestFullBlockClient {
type Output = futures::future::Ready<PeerRequestResult<Vec<Header>>>;
fn get_headers_with_priority(
&self,
request: HeadersRequest,
_priority: Priority,
) -> Self::Output {
let headers = self.headers.lock();
let mut block: BlockHashOrNumber = match request.start {
BlockHashOrNumber::Hash(hash) => headers.get(&hash).cloned(),
BlockHashOrNumber::Number(num) => headers.values().find(|h| h.number == num).cloned(),
}
.map(|h| h.number.into())
.unwrap();
let mut resp = Vec::new();
for _ in 0..request.limit {
// fetch from storage
if let Some((_, header)) = headers.iter().find(|(hash, header)| {
BlockNumHash::new(header.number, **hash).matches_block_or_num(&block)
}) {
match request.direction {
HeadersDirection::Falling => block = header.parent_hash.into(),
HeadersDirection::Rising => {
let next = header.number + 1;
block = next.into()
}
}
resp.push(header.clone());
} else {
break
}
}
futures::future::ready(Ok(WithPeerId::new(PeerId::random(), resp)))
}
}
impl BodiesClient for TestFullBlockClient {
type Output = futures::future::Ready<PeerRequestResult<Vec<BlockBody>>>;
fn get_block_bodies_with_priority(
&self,
hashes: Vec<H256>,
_priority: Priority,
) -> Self::Output {
let bodies = self.bodies.lock();
let mut all_bodies = Vec::new();
for hash in hashes {
if let Some(body) = bodies.get(&hash) {
all_bodies.push(body.clone());
}
if all_bodies.len() == self.soft_limit {
break
}
}
futures::future::ready(Ok(WithPeerId::new(PeerId::random(), all_bodies)))
}
}

View File

@ -139,6 +139,18 @@ impl SealedBlock {
(self.header, self.body, self.ommers)
}
/// Splits the [BlockBody] and [SealedHeader] into separate components
pub fn split_header_body(self) -> (SealedHeader, BlockBody) {
(
self.header,
BlockBody {
transactions: self.body,
ommers: self.ommers,
withdrawals: self.withdrawals,
},
)
}
/// Expensive operation that recovers transaction signer. See [SealedBlockWithSenders].
pub fn senders(&self) -> Option<Vec<Address>> {
self.body.iter().map(|tx| tx.recover_signer()).collect::<Option<Vec<Address>>>()
@ -715,6 +727,14 @@ impl BlockNumHash {
pub fn into_components(self) -> (BlockNumber, BlockHash) {
(self.number, self.hash)
}
/// Returns whether or not the block matches the given [BlockHashOrNumber].
pub fn matches_block_or_num(&self, block: &BlockHashOrNumber) -> bool {
match block {
BlockHashOrNumber::Hash(hash) => self.hash == *hash,
BlockHashOrNumber::Number(number) => self.number == *number,
}
}
}
impl From<(BlockNumber, BlockHash)> for BlockNumHash {
@ -774,6 +794,43 @@ impl BlockBody {
withdrawals: self.withdrawals.clone(),
}
}
/// Calculate the transaction root for the block body.
pub fn calculate_tx_root(&self) -> H256 {
crate::proofs::calculate_transaction_root(&self.transactions)
}
/// Calculate the ommers root for the block body.
pub fn calculate_ommers_root(&self) -> H256 {
crate::proofs::calculate_ommers_root(&self.ommers)
}
/// Calculate the withdrawals root for the block body, if withdrawals exist. If there are no
/// withdrawals, this will return `None`.
pub fn calculate_withdrawals_root(&self) -> Option<H256> {
self.withdrawals.as_ref().map(|w| crate::proofs::calculate_withdrawals_root(w))
}
/// Calculate all roots (transaction, ommers, withdrawals) for the block body.
pub fn calculate_roots(&self) -> BlockBodyRoots {
BlockBodyRoots {
tx_root: self.calculate_tx_root(),
ommers_hash: self.calculate_ommers_root(),
withdrawals_root: self.calculate_withdrawals_root(),
}
}
}
/// A struct that represents roots associated with a block body. This can be used to correlate
/// block body responses with headers.
#[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize, Hash)]
pub struct BlockBodyRoots {
/// The transaction root for the block body.
pub tx_root: H256,
/// The ommers hash for the block body.
pub ommers_hash: H256,
/// The withdrawals root for the block body, if withdrawals exist.
pub withdrawals_root: Option<H256>,
}
#[cfg(test)]

View File

@ -2,7 +2,7 @@ use crate::{
basefee::calculate_next_block_base_fee,
keccak256,
proofs::{EMPTY_LIST_HASH, EMPTY_ROOT},
BlockHash, BlockNumHash, BlockNumber, Bloom, Bytes, H160, H256, H64, U256,
BlockBodyRoots, BlockHash, BlockNumHash, BlockNumber, Bloom, Bytes, H160, H256, H64, U256,
};
use bytes::{Buf, BufMut, BytesMut};
@ -151,6 +151,15 @@ impl Header {
self.transactions_root == EMPTY_ROOT
}
/// Converts all roots in the header to a [BlockBodyRoots] struct.
pub fn body_roots(&self) -> BlockBodyRoots {
BlockBodyRoots {
tx_root: self.transactions_root,
ommers_hash: self.ommers_hash,
withdrawals_root: self.withdrawals_root,
}
}
/// Calculate base fee for next block according to the EIP-1559 spec.
///
/// Returns a `None` if no base fee is set, no EIP-1559 support

View File

@ -54,8 +54,8 @@ pub mod proofs;
pub use account::{Account, Bytecode};
pub use bits::H512;
pub use block::{
Block, BlockBody, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag, BlockWithSenders,
ForkBlock, SealedBlock, SealedBlockWithSenders,
Block, BlockBody, BlockBodyRoots, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
BlockWithSenders, ForkBlock, SealedBlock, SealedBlockWithSenders,
};
pub use bloom::Bloom;
pub use chain::{