feat(download): bodies downloader stream (#905)

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
Roman Krasiuk
2023-01-23 20:48:03 +02:00
committed by GitHub
parent b354a17f65
commit d601895940
18 changed files with 1472 additions and 941 deletions

439
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -163,17 +163,18 @@ impl Command {
commit_threshold: config.stages.total_difficulty.commit_threshold,
})
.push(BodyStage {
downloader: Arc::new(
bodies::concurrent::ConcurrentDownloader::new(
fetch_client.clone(),
consensus.clone(),
downloader: bodies::concurrent::ConcurrentDownloaderBuilder::default()
.with_stream_batch_size(config.stages.bodies.downloader_stream_batch_size)
.with_request_limit(config.stages.bodies.downloader_request_limit)
.with_max_buffered_responses(
config.stages.bodies.downloader_max_buffered_responses,
)
.with_batch_size(config.stages.bodies.downloader_batch_size)
.with_retries(config.stages.bodies.downloader_retries)
.with_concurrency(config.stages.bodies.downloader_concurrency),
),
.with_concurrent_requests_range(
config.stages.bodies.downloader_min_concurrent_requests..=
config.stages.bodies.downloader_max_concurrent_requests,
)
.build(fetch_client.clone(), consensus.clone(), db.clone()),
consensus: consensus.clone(),
commit_threshold: config.stages.bodies.commit_threshold,
})
.push(SenderRecoveryStage {
batch_size: config.stages.sender_recovery.batch_size,

View File

@ -8,7 +8,7 @@ use crate::{
NetworkOpts,
};
use reth_consensus::BeaconConsensus;
use reth_downloaders::bodies::concurrent::ConcurrentDownloader;
use reth_downloaders::bodies::concurrent::ConcurrentDownloaderBuilder;
use reth_net_nat::NatResolver;
use reth_primitives::ChainSpec;
@ -150,14 +150,18 @@ impl Command {
let fetch_client = Arc::new(network.fetch_client().await?);
let mut stage = BodyStage {
downloader: Arc::new(
ConcurrentDownloader::new(fetch_client.clone(), consensus.clone())
.with_batch_size(config.stages.bodies.downloader_batch_size)
.with_retries(config.stages.bodies.downloader_retries)
.with_concurrency(config.stages.bodies.downloader_concurrency),
),
downloader: ConcurrentDownloaderBuilder::default()
.with_stream_batch_size(num_blocks as usize)
.with_request_limit(config.stages.bodies.downloader_request_limit)
.with_max_buffered_responses(
config.stages.bodies.downloader_max_buffered_responses,
)
.with_concurrent_requests_range(
config.stages.bodies.downloader_min_concurrent_requests..=
config.stages.bodies.downloader_max_concurrent_requests,
)
.build(fetch_client.clone(), consensus.clone(), db.clone()),
consensus: consensus.clone(),
commit_threshold: num_blocks,
};
if !self.skip_unwind {

View File

@ -1,46 +1,14 @@
use crate::p2p::downloader::{DownloadStream, Downloader};
use reth_primitives::{SealedBlock, SealedHeader};
use super::response::BlockResponse;
use crate::{db, p2p::downloader::Downloader};
use futures::Stream;
use reth_primitives::BlockNumber;
use std::ops::Range;
/// The block response
#[derive(PartialEq, Eq, Debug)]
pub enum BlockResponse {
/// Full block response (with transactions or ommers)
Full(SealedBlock),
/// The empty block response
Empty(SealedHeader),
}
impl BlockResponse {
/// Return the reference to the response header
pub fn header(&self) -> &SealedHeader {
match self {
BlockResponse::Full(block) => &block.header,
BlockResponse::Empty(header) => header,
}
}
}
/// A downloader capable of fetching block bodies from header hashes.
/// A downloader capable of fetching and yielding block bodies from block headers.
///
/// A downloader represents a distinct strategy for submitting requests to download block bodies,
/// while a [BodiesClient] represents a client capable of fulfilling these requests.
pub trait BodyDownloader: Downloader {
/// Download the bodies from `starting_block` (inclusive) up until `target_block` (inclusive).
///
/// The returned stream will always emit bodies in the order they were requested, but multiple
/// requests may be in flight at the same time.
///
/// The stream may exit early in some cases. Thus, a downloader can only at a minimum guarantee:
///
/// - All emitted bodies map onto a request
/// - The emitted bodies are emitted in order: i.e. the body for the first block is emitted
/// first, even if it was not fetched first.
///
/// It is *not* guaranteed that all the requested bodies are fetched: the downloader may close
/// the stream before the entire range has been fetched for any reason
fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> DownloadStream<'a, BlockResponse>
where
I: IntoIterator<Item = &'b SealedHeader>,
<I as IntoIterator>::IntoIter: Send + 'b,
'b: 'a;
pub trait BodyDownloader: Downloader + Stream<Item = Vec<BlockResponse>> + Unpin {
/// Method for setting the download range.
fn set_download_range(&mut self, range: Range<BlockNumber>) -> Result<(), db::Error>;
}

View File

@ -3,3 +3,6 @@ pub mod client;
/// Block body downloaders.
pub mod downloader;
/// Block response
pub mod response;

View File

@ -0,0 +1,25 @@
use reth_primitives::{BlockNumber, SealedBlock, SealedHeader};
/// A single block yielded by a bodies downloader.
///
/// Both variants carry the block's sealed header; use [BlockResponse::header] to access it
/// without matching on the variant.
#[derive(PartialEq, Eq, Debug)]
pub enum BlockResponse {
/// Full block response (with transactions or ommers)
Full(SealedBlock),
/// The empty block response: only the sealed header is carried, there is no body.
Empty(SealedHeader),
}
impl BlockResponse {
/// Return the reference to the response header
pub fn header(&self) -> &SealedHeader {
match self {
BlockResponse::Full(block) => &block.header,
BlockResponse::Empty(header) => header,
}
}
/// Return the block number
pub fn block_number(&self) -> BlockNumber {
self.header().number
}
}

View File

@ -13,6 +13,7 @@ reth-interfaces = { path = "../../interfaces" }
reth-primitives = { path = "../../primitives" }
reth-rpc-types = { path = "../rpc-types" }
reth-eth-wire = { path= "../eth-wire" }
reth-db = { path = "../../storage/db" }
# async
async-trait = "0.1.58"
@ -22,8 +23,10 @@ futures-util = "0.3.25"
# misc
backon = "0.2.0"
tracing = "0.1.37"
thiserror = "1.0"
[dev-dependencies]
reth-db = { path = "../../storage/db", features = ["test-utils"] }
reth-interfaces = { path = "../../interfaces", features = ["test-utils"] }
reth-tracing = { path = "../../tracing" }
assert_matches = "1.5.0"

File diff suppressed because it is too large Load Diff

View File

@ -1,2 +1,8 @@
/// A naive concurrent downloader.
pub mod concurrent;
mod queue;
mod request;
#[cfg(test)]
mod test_utils;

View File

@ -0,0 +1,98 @@
use super::request::BodiesRequestFuture;
use futures::{stream::FuturesUnordered, Stream};
use futures_util::StreamExt;
use reth_interfaces::{
consensus::Consensus as ConsensusTrait,
p2p::bodies::{client::BodiesClient, response::BlockResponse},
};
use reth_primitives::{BlockNumber, SealedHeader};
use std::{
collections::HashSet,
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
};
/// The wrapper around [FuturesUnordered] that keeps information
/// about the blocks currently being requested.
///
/// Requests are added with `push_new_request`; polling the queue as a stream yields the blocks
/// of whichever request completes next and stops tracking their block numbers.
#[derive(Debug)]
pub(crate) struct BodiesRequestQueue<B, C> {
/// Inner body request queue.
inner: FuturesUnordered<BodiesRequestFuture<B, C>>,
/// The block numbers being requested. Numbers are inserted when a request is pushed and
/// removed once the response containing them has been yielded.
block_numbers: HashSet<BlockNumber>,
/// Highest block number requested so far, or `None` if no non-empty request was pushed yet.
/// It is never lowered by later requests.
pub(crate) last_requested_block_number: Option<BlockNumber>,
}
impl<B, C> Default for BodiesRequestQueue<B, C> {
fn default() -> Self {
Self {
inner: Default::default(),
block_numbers: Default::default(),
last_requested_block_number: None,
}
}
}
impl<B, C> BodiesRequestQueue<B, C>
where
    B: BodiesClient + 'static,
    C: ConsensusTrait + 'static,
{
    /// Returns `true` if there are no requests in the queue.
    pub(crate) fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Returns the number of requests currently in the queue.
    pub(crate) fn len(&self) -> usize {
        self.inner.len()
    }

    /// Queue a new bodies request for the given headers.
    ///
    /// The headers are expected to be sorted, so the last one carries the highest block number
    /// of the request.
    pub(crate) fn push_new_request(
        &mut self,
        client: Arc<B>,
        consensus: Arc<C>,
        request: Vec<SealedHeader>,
    ) {
        // Raise the high-water mark of requested blocks. An empty request leaves it untouched.
        if let Some(last) = request.last() {
            let highest = match self.last_requested_block_number {
                Some(previous) => previous.max(last.number),
                None => last.number,
            };
            self.last_requested_block_number = Some(highest);
        }

        // Remember every block number that is now in flight.
        for header in &request {
            self.block_numbers.insert(header.number);
        }

        // Build the request future and hand it to the inner queue.
        let future = BodiesRequestFuture::new(client, consensus).with_headers(request);
        self.inner.push(future);
    }

    /// Returns `true` if a request covering this block number is currently in progress.
    pub(crate) fn contains_block(&self, number: BlockNumber) -> bool {
        self.block_numbers.contains(&number)
    }
}
impl<B, C> Stream for BodiesRequestQueue<B, C>
where
    B: BodiesClient + 'static,
    C: ConsensusTrait + 'static,
{
    type Item = Vec<BlockResponse>;

    /// Yields the blocks of the next completed request and stops tracking their block numbers.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let queue = self.get_mut();
        let next = ready!(queue.inner.poll_next_unpin(cx));

        // The blocks of a finished request are no longer in flight.
        if let Some(blocks) = next.as_ref() {
            for block in blocks {
                queue.block_numbers.remove(&block.block_number());
            }
        }

        Poll::Ready(next)
    }
}

View File

@ -0,0 +1,280 @@
use futures::{Future, FutureExt};
use reth_eth_wire::BlockBody;
use reth_interfaces::{
consensus::{Consensus as ConsensusTrait, Error as ConsensusError},
p2p::{
bodies::{client::BodiesClient, response::BlockResponse},
error::{PeerRequestResult, RequestError},
},
};
use reth_primitives::{PeerId, SealedBlock, SealedHeader, H256};
use std::{
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
};
use thiserror::Error;
/// Boxed, pinned, `Send` future that resolves to a peer's response to a block bodies request.
type BodiesFut = Pin<Box<dyn Future<Output = PeerRequestResult<Vec<BlockBody>>> + Send>>;
/// Reasons a block bodies request, or the response to it, is rejected.
///
/// Every variant ends up in `BodiesRequestFuture::on_error`, which logs it and re-submits the
/// request.
#[derive(Error, Debug)]
enum BodyRequestError {
/// The peer returned no bodies. The request future also reports this variant when a single
/// body is returned for a request of more than one hash.
#[error("Received empty response")]
EmptyResponse,
/// The peer returned more bodies than there were hashes in the request.
#[error("Received more bodies than requested. Expected: {expected}. Received: {received}")]
TooManyBodies { expected: usize, received: usize },
/// A block assembled from a downloaded body failed consensus pre-validation against its header.
#[error("Error validating body for header {hash:?}: {error}")]
Validation { hash: H256, error: ConsensusError },
/// The underlying request failed before a response was received.
#[error(transparent)]
Request(#[from] RequestError),
}
/// Body request implemented as a [Future].
///
/// The future will poll the underlying request until fulfilled.
/// If the response arrived with insufficient number of bodies, the future
/// will issue another request until all bodies are collected.
///
/// It then proceeds to verify the downloaded bodies. In case of a validation error,
/// the future will start over.
///
/// The future will filter out any empty headers (see [SealedHeader::is_empty]) from the request.
/// If [BodiesRequestFuture] was initialized with all empty headers, no request will be dispatched
/// and they will be immediately returned upon polling.
///
/// NB: This assumes that peers respond with bodies in the order that they were requested.
/// This is a reasonable assumption to make as that's [what Geth
/// does](https://github.com/ethereum/go-ethereum/blob/f53ff0ff4a68ffc56004ab1d5cc244bcb64d3277/les/server_requests.go#L245).
/// All errors regarding the response cause the peer to get penalized, meaning that adversaries
/// that try to give us bodies that do not match the requested order are going to be penalized
/// and eventually disconnected.
pub(crate) struct BodiesRequestFuture<B, C> {
// Client used to dispatch body requests and to report misbehaving peers
client: Arc<B>,
// Consensus implementation used to pre-validate assembled blocks
consensus: Arc<C>,
// All requested headers, including the empty ones that need no download
headers: Vec<SealedHeader>,
// Remaining hashes to download; hashes are drained from the front as bodies arrive
hashes_to_download: Vec<H256>,
// Bodies received so far, each paired with the peer that delivered it
buffer: Vec<(PeerId, BlockBody)>,
// The in-flight request, if any. `None` when there is nothing (left) to download
fut: Option<BodiesFut>,
}
impl<B, C> BodiesRequestFuture<B, C>
where
B: BodiesClient + 'static,
C: ConsensusTrait + 'static,
{
/// Returns an empty future. Use [BodiesRequestFuture::with_headers] to set the request.
pub(crate) fn new(client: Arc<B>, consensus: Arc<C>) -> Self {
Self {
client,
consensus,
headers: Default::default(),
hashes_to_download: Default::default(),
buffer: Default::default(),
fut: None,
}
}
pub(crate) fn with_headers(mut self, headers: Vec<SealedHeader>) -> Self {
self.headers = headers;
self.reset_hashes();
// Submit the request only if there are any headers to download.
// Otherwise, the future will immediately be resolved.
if !self.hashes_to_download.is_empty() {
self.submit_request();
}
self
}
fn on_error(&mut self, error: BodyRequestError, peer_id: Option<PeerId>) {
tracing::error!(target: "downloaders::bodies", ?peer_id, %error, "Error requesting bodies");
if let Some(peer_id) = peer_id {
self.client.report_bad_message(peer_id);
}
self.submit_request();
}
fn submit_request(&mut self) {
let client = Arc::clone(&self.client);
let request = self.hashes_to_download.clone();
tracing::trace!(target: "downloaders::bodies", request_len = request.len(), "Requesting bodies");
self.fut = Some(Box::pin(async move { client.get_block_bodies(request).await }));
}
fn reset_hashes(&mut self) {
self.hashes_to_download =
self.headers.iter().filter(|h| !h.is_empty()).map(|h| h.hash()).collect();
}
/// Attempt to construct blocks from origin headers and buffered bodies.
///
/// NOTE: This method drains the buffer.
///
/// # Panics
///
/// If the number of buffered bodies does not equal the number of non empty headers.
#[allow(clippy::result_large_err)]
fn try_construct_blocks(&mut self) -> Result<Vec<BlockResponse>, (PeerId, BodyRequestError)> {
// Drop the allocated memory for the buffer. Optimistically, it will not be reused.
let mut bodies = std::mem::take(&mut self.buffer).into_iter();
let mut results = Vec::default();
for header in self.headers.iter().cloned() {
if header.is_empty() {
results.push(BlockResponse::Empty(header));
} else {
// The body must be present since we requested headers for all non-empty
// bodies at this point
let (peer_id, body) = bodies.next().expect("download logic failed");
let block = SealedBlock {
header: header.clone(),
body: body.transactions,
ommers: body.ommers.into_iter().map(|header| header.seal()).collect(),
};
// This ensures that the TxRoot and OmmersRoot from the header match the
// ones calculated manually from the block body.
self.consensus.pre_validate_block(&block).map_err(|error| {
(peer_id, BodyRequestError::Validation { hash: header.hash(), error })
})?;
results.push(BlockResponse::Full(block));
}
}
Ok(results)
}
}
impl<B, C> Future for BodiesRequestFuture<B, C>
where
B: BodiesClient + 'static,
C: ConsensusTrait + 'static,
{
type Output = Vec<BlockResponse>;
/// Drives the request to completion.
///
/// Each loop iteration first polls the in-flight request (if any) and buffers the bodies it
/// returned, then, once no hashes are outstanding, assembles and validates the blocks.
/// Rejected responses and failed requests are retried from within the loop, so the future only
/// resolves once every requested header has a validated block.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
loop {
// Check if there is a pending request. It might not exist if all
// headers are empty and there is nothing to download.
if let Some(fut) = this.fut.as_mut() {
match ready!(fut.poll_unpin(cx)) {
Ok(response) => {
let (peer_id, bodies) = response.split();
// An empty response makes no progress: penalize the peer and retry.
if bodies.is_empty() {
this.on_error(BodyRequestError::EmptyResponse, Some(peer_id));
continue
}
let request_len = this.hashes_to_download.len();
let response_len = bodies.len();
// More bodies than hashes requested cannot be matched to headers.
if response_len > request_len {
this.on_error(
BodyRequestError::TooManyBodies {
expected: request_len,
received: response_len,
},
Some(peer_id),
);
continue
}
// A lone body in reply to a multi-hash request is rejected and reported under the
// `EmptyResponse` variant.
// NOTE(review): presumably a heuristic against peers that under-deliver — confirm.
// TODO: Consider limiting
if request_len != 1 && response_len == 1 {
this.on_error(BodyRequestError::EmptyResponse, Some(peer_id));
continue
}
tracing::trace!(
target: "downloaders::bodies", request_len, response_len, ?peer_id, "Received bodies"
);
// Draining the hashes here so that on the next `submit_request` call we
// only request the remaining bodies, instead of the
// ones we already received
this.hashes_to_download.drain(..response_len);
this.buffer.extend(bodies.into_iter().map(|b| (peer_id, b)));
if !this.hashes_to_download.is_empty() {
// Submit next request if not done
this.submit_request();
} else {
// Everything was received; clear the future so it is not polled again.
this.fut = None;
}
}
Err(error) => {
// The request itself failed; no peer is reported here, the request is just retried.
this.on_error(error.into(), None);
}
}
}
// Drain the buffer and attempt to construct the response.
// If validation fails, the future will restart from scratch.
if this.hashes_to_download.is_empty() {
match this.try_construct_blocks() {
Ok(blocks) => return Poll::Ready(blocks),
Err((peer_id, error)) => {
// Re-request every non-empty header: the buffer was consumed by the failed attempt.
this.reset_hashes();
this.on_error(error, Some(peer_id));
}
}
// Sanity check: outstanding hashes must always have a request in flight.
} else if this.fut.is_none() {
unreachable!("Body request logic failure")
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        bodies::test_utils::zip_blocks,
        test_utils::{generate_bodies, TestBodiesClient},
    };
    use reth_interfaces::{
        p2p::bodies::response::BlockResponse,
        test_utils::{generators::random_header_range, TestConsensus},
    };
    use reth_primitives::H256;
    use std::sync::Arc;

    /// Check that the future returns empty bodies without dispatching any requests.
    #[tokio::test]
    async fn request_returns_empty_bodies() {
        let headers = random_header_range(0..20, H256::zero());

        let client = Arc::new(TestBodiesClient::default());
        let consensus = Arc::new(TestConsensus::default());
        let fut = BodiesRequestFuture::new(client.clone(), consensus).with_headers(headers.clone());

        let expected: Vec<_> = headers.into_iter().map(BlockResponse::Empty).collect();
        assert_eq!(fut.await, expected);
        assert_eq!(client.times_requested(), 0);
    }

    /// Check that the future keeps submitting requests until all bodies are received when the
    /// client returns at most `batch_size` bodies per response.
    #[tokio::test]
    async fn request_submits_until_fullfilled() {
        // Generate some random blocks
        let (headers, mut bodies) = generate_bodies(0..20);

        let batch_size = 2;
        let client = Arc::new(
            TestBodiesClient::default().with_bodies(bodies.clone()).with_max_batch_size(batch_size),
        );
        let consensus = Arc::new(TestConsensus::default());
        let fut = BodiesRequestFuture::new(client.clone(), consensus).with_headers(headers.clone());

        assert_eq!(fut.await, zip_blocks(headers.iter(), &mut bodies));

        // One request per pair of non-empty headers, rounded up (ceiling division by 2).
        let non_empty = headers.into_iter().filter(|h| !h.is_empty()).count() as u64;
        assert_eq!(client.times_requested(), (non_empty + 1) / 2);
    }
}

View File

@ -0,0 +1,25 @@
use reth_eth_wire::BlockBody;
use reth_interfaces::p2p::bodies::response::BlockResponse;
use reth_primitives::{SealedBlock, SealedHeader, H256};
use std::collections::HashMap;
/// Test helper that builds the expected downloader output for the given headers.
///
/// The body of every header is removed from `bodies`. Empty headers become
/// [BlockResponse::Empty], all others are combined with their body into
/// [BlockResponse::Full].
///
/// # Panics
///
/// If `bodies` has no entry for one of the headers.
pub(crate) fn zip_blocks<'a>(
    headers: impl Iterator<Item = &'a SealedHeader>,
    bodies: &mut HashMap<H256, BlockBody>,
) -> Vec<BlockResponse> {
    let mut blocks = Vec::new();
    for header in headers {
        // The body is taken out of the map even for empty headers.
        let body = bodies.remove(&header.hash()).expect("body exists");
        let response = if header.is_empty() {
            BlockResponse::Empty(header.clone())
        } else {
            let ommers = body.ommers.into_iter().map(|o| o.seal()).collect();
            BlockResponse::Full(SealedBlock {
                header: header.clone(),
                body: body.transactions,
                ommers,
            })
        };
        blocks.push(response);
    }
    blocks
}

View File

@ -58,7 +58,7 @@ pub struct LinearDownloader<C, H> {
/// Tip block number to start validating from (in reverse)
next_chain_tip_block_number: u64,
/// The batch size per one request
request_batch_size: u64,
request_limit: u64,
/// Minimum amount of requests to handle concurrently.
min_concurrent_requests: usize,
/// Maximum amount of requests to handle concurrently.
@ -124,11 +124,8 @@ where
fn next_request(&mut self) -> Option<HeadersRequest> {
let local_head = self.local_block_number();
if self.next_request_block_number > local_head {
let request = calc_next_request(
local_head,
self.next_request_block_number,
self.request_batch_size,
);
let request =
calc_next_request(local_head, self.next_request_block_number, self.request_limit);
// need to shift the tracked request block number based on the number of requested
// headers so follow-up requests will use that as start.
self.next_request_block_number -= request.limit;
@ -742,7 +739,7 @@ struct SyncTargetBlock {
#[derive(Debug)]
pub struct LinearDownloadBuilder {
/// The batch size per one request
request_batch_size: u64,
request_limit: u64,
/// Batch size for headers
stream_batch_size: usize,
/// Batch size for headers
@ -756,7 +753,7 @@ pub struct LinearDownloadBuilder {
impl Default for LinearDownloadBuilder {
fn default() -> Self {
Self {
request_batch_size: 1_000,
request_limit: 1_000,
stream_batch_size: 10_000,
max_concurrent_requests: 150,
min_concurrent_requests: 5,
@ -771,7 +768,7 @@ impl LinearDownloadBuilder {
/// This determines the `limit` for a [GetHeaders](reth_eth_wire::GetBlockHeaders) requests, the
/// number of headers we ask for.
pub fn request_limit(mut self, limit: u64) -> Self {
self.request_batch_size = limit;
self.request_limit = limit;
self
}
@ -826,7 +823,7 @@ impl LinearDownloadBuilder {
H: HeadersClient + 'static,
{
let Self {
request_batch_size,
request_limit,
stream_batch_size,
min_concurrent_requests,
max_concurrent_requests,
@ -842,7 +839,7 @@ impl LinearDownloadBuilder {
next_request_block_number: 0,
next_chain_tip_block_number: 0,
lowest_validated_header: None,
request_batch_size,
request_limit,
min_concurrent_requests,
max_concurrent_requests,
stream_batch_size,
@ -863,18 +860,18 @@ impl LinearDownloadBuilder {
/// Configures and returns the next [HeadersRequest] based on the given parameters
///
/// The request will start at the given `next_request_block_number` block.
/// The `limit` of the request will either be the targeted `request_batch_size` or the difference of
/// The `limit` of the request will either be the targeted `request_limit` or the difference of
/// `next_request_block_number` and the `local_head` in case this is smaller than the targeted
/// `request_batch_size`.
/// `request_limit`.
#[inline]
fn calc_next_request(
local_head: u64,
next_request_block_number: u64,
request_batch_size: u64,
request_limit: u64,
) -> HeadersRequest {
// downloading is in reverse
let diff = next_request_block_number - local_head;
let limit = diff.min(request_batch_size);
let limit = diff.min(request_limit);
let start = next_request_block_number;
HeadersRequest { start: start.into(), limit, direction: HeadersDirection::Falling }
}

View File

@ -12,9 +12,9 @@ use reth_interfaces::{
use reth_primitives::{PeerId, SealedHeader, H256};
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
future::Future,
sync::Arc,
fmt::Debug,
sync::atomic::{AtomicU64, Ordering},
time::Duration,
};
use tokio::sync::Mutex;
@ -42,21 +42,36 @@ pub(crate) fn generate_bodies(
}
/// A [BodiesClient] for testing.
pub(crate) struct TestBodiesClient<F>(pub(crate) Arc<Mutex<F>>);
#[derive(Debug, Default)]
pub(crate) struct TestBodiesClient {
bodies: Mutex<HashMap<H256, BlockBody>>,
should_delay: bool,
max_batch_size: Option<usize>,
times_requested: AtomicU64,
}
impl<F> Debug for TestBodiesClient<F> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TestBodiesClient").finish_non_exhaustive()
impl TestBodiesClient {
pub(crate) fn with_bodies(mut self, bodies: HashMap<H256, BlockBody>) -> Self {
self.bodies = Mutex::new(bodies);
self
}
pub(crate) fn with_should_delay(mut self, should_delay: bool) -> Self {
self.should_delay = should_delay;
self
}
pub(crate) fn with_max_batch_size(mut self, max_batch_size: usize) -> Self {
self.max_batch_size = Some(max_batch_size);
self
}
pub(crate) fn times_requested(&self) -> u64 {
self.times_requested.load(Ordering::Relaxed)
}
}
impl<F> TestBodiesClient<F> {
pub(crate) fn new(f: F) -> Self {
Self(Arc::new(Mutex::new(f)))
}
}
impl<F: Send + Sync> DownloadClient for TestBodiesClient<F> {
impl DownloadClient for TestBodiesClient {
fn report_bad_message(&self, _peer_id: PeerId) {
// noop
}
@ -67,17 +82,30 @@ impl<F: Send + Sync> DownloadClient for TestBodiesClient<F> {
}
#[async_trait]
impl<F, Fut> BodiesClient for TestBodiesClient<F>
where
F: FnMut(Vec<H256>) -> Fut + Send + Sync,
Fut: Future<Output = PeerRequestResult<Vec<BlockBody>>> + Send,
{
impl BodiesClient for TestBodiesClient {
async fn get_block_bodies_with_priority(
&self,
hash: Vec<H256>,
hashes: Vec<H256>,
_priority: Priority,
) -> PeerRequestResult<Vec<BlockBody>> {
let f = &mut *self.0.lock().await;
(f)(hash).await
if self.should_delay {
tokio::time::sleep(Duration::from_millis(hashes[0].to_low_u64_be() % 100)).await;
}
self.times_requested.fetch_add(1, Ordering::Relaxed);
let bodies = &mut *self.bodies.lock().await;
println!("HASHES {}", hashes.len());
Ok((
PeerId::default(),
hashes
.into_iter()
.take(self.max_batch_size.unwrap_or(usize::MAX))
.map(|hash| {
bodies
.remove(&hash)
.expect("Downloader asked for a block it should not ask for")
})
.collect(),
)
.into())
}
}

View File

@ -96,26 +96,26 @@ impl Default for TotalDifficultyConfig {
/// Body stage configuration.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct BodiesConfig {
/// The maximum number of bodies to download before committing progress to the database.
pub commit_threshold: u64,
/// The maximum number of bodies to request from a peer at a time.
pub downloader_batch_size: usize,
/// The number of times to retry downloading a set of bodies.
pub downloader_retries: usize,
/// The maximum number of body requests to have in flight at a time.
///
/// The maximum number of bodies downloaded at the same time is `downloader_batch_size *
/// downloader_concurrency`.
pub downloader_concurrency: usize,
/// The batch size of non-empty blocks per one request
pub downloader_request_limit: u64,
/// The maximum number of block bodies returned at once from the stream
pub downloader_stream_batch_size: usize,
/// Maximum amount of received bodies to buffer internally.
pub downloader_max_buffered_responses: usize,
/// The minimum number of requests to send concurrently.
pub downloader_min_concurrent_requests: usize,
/// The maximum number of requests to send concurrently.
pub downloader_max_concurrent_requests: usize,
}
impl Default for BodiesConfig {
fn default() -> Self {
Self {
commit_threshold: 5_000,
downloader_batch_size: 100,
downloader_retries: 5,
downloader_concurrency: 10,
downloader_request_limit: 200,
downloader_stream_batch_size: 10000,
downloader_max_buffered_responses: 30000,
downloader_min_concurrent_requests: 5,
downloader_max_concurrent_requests: 100,
}
}
}

View File

@ -128,7 +128,17 @@ pub trait Stage<DB: Database>: Send + Sync {
/// Get the next execute action for the stage. Return if the stage has no
/// blocks to process.
macro_rules! exec_or_return {
($input: expr, $threshold: expr, $log_target: expr) => {
($input: expr, $log_target: literal) => {
match $input.next_action(None) {
// Next action cannot be capped without a threshold.
ExecAction::Run { range, capped: _capped } => range.into_inner(),
ExecAction::Done { stage_progress, target } => {
info!(target: $log_target, stage_progress, target, "Target block already reached");
return Ok(ExecOutput { stage_progress, done: true })
}
}
};
($input: expr, $threshold: expr, $log_target: literal) => {
match $input.next_action(Some($threshold)) {
ExecAction::Run { range, capped } => (range.into_inner(), capped),
ExecAction::Done { stage_progress, target } => {

View File

@ -1,20 +1,19 @@
use crate::{
db::Transaction, exec_or_return, DatabaseIntegrityError, ExecAction, ExecInput, ExecOutput,
Stage, StageError, StageId, UnwindInput, UnwindOutput,
db::Transaction, exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId,
UnwindInput, UnwindOutput,
};
use futures_util::StreamExt;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::{Database, DatabaseGAT},
database::Database,
models::{BlockNumHash, StoredBlockBody, StoredBlockOmmers},
tables,
transaction::{DbTx, DbTxMut},
transaction::DbTxMut,
};
use reth_interfaces::{
consensus::Consensus,
p2p::bodies::downloader::{BlockResponse, BodyDownloader},
p2p::bodies::{downloader::BodyDownloader, response::BlockResponse},
};
use reth_primitives::{BlockNumber, SealedHeader};
use std::{fmt::Debug, sync::Arc};
use tracing::*;
@ -54,14 +53,9 @@ pub(crate) const BODIES: StageId = StageId("Bodies");
#[derive(Debug)]
pub struct BodyStage<D: BodyDownloader, C: Consensus> {
/// The body downloader.
pub downloader: Arc<D>,
pub downloader: D,
/// The consensus engine.
pub consensus: Arc<C>,
/// The maximum amount of block bodies to process in one stage execution.
///
/// Smaller batch sizes result in less memory usage, but more disk I/O. Larger batch sizes
/// result in more memory usage, less disk I/O, and more infrequent checkpoints.
pub commit_threshold: u64,
}
#[async_trait::async_trait]
@ -78,10 +72,10 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let ((start_block, end_block), capped) =
exec_or_return!(input, self.commit_threshold, "sync::stages::bodies");
let (start_block, end_block) = exec_or_return!(input, "sync::stages::bodies");
let bodies_to_download = self.bodies_to_download::<DB>(tx, start_block, end_block)?;
// Update the header range on the downloader
self.downloader.set_download_range(start_block..end_block + 1)?;
// Cursors used to write bodies, ommers and transactions
let mut body_cursor = tx.cursor_write::<tables::BlockBodies>()?;
@ -95,28 +89,25 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
// Get id for the first transaction and first transition in the block
let (mut current_tx_id, mut transition_id) = tx.get_next_block_ids(start_block)?;
// NOTE(onbjerg): The stream needs to live here otherwise it will just create a new iterator
// on every iteration of the while loop -_-
let mut bodies_stream = self.downloader.bodies_stream(bodies_to_download.iter());
let mut highest_block = input.stage_progress.unwrap_or_default();
debug!(target: "sync::stages::bodies", stage_progress = highest_block, target = end_block, start_tx_id = current_tx_id, transition_id, "Commencing sync");
while let Some(result) = bodies_stream.next().await {
let Ok(response) = result else {
error!(target: "sync::stages::bodies", block = highest_block + 1, error = ?result.unwrap_err(), "Error downloading block");
return Ok(ExecOutput {
stage_progress: highest_block,
done: false,
})
};
let downloaded_bodies = match self.downloader.next().await {
Some(downloaded_bodies) => downloaded_bodies,
None => {
info!(target: "sync::stages::bodies", stage_progress = highest_block, "Download stream exhausted");
return Ok(ExecOutput { stage_progress: highest_block, done: true })
}
};
trace!(target: "sync::stages::bodies", bodies_len = downloaded_bodies.len(), "Writing blocks");
for response in downloaded_bodies {
// Write block
let block_header = response.header();
let numhash: BlockNumHash = block_header.num_hash().into();
match response {
BlockResponse::Full(block) => {
trace!(target: "sync::stages::bodies", ommers = block.ommers.len(), txs = block.body.len(), ?numhash, "Writing full block");
body_cursor.append(
numhash,
StoredBlockBody {
@ -147,7 +138,6 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
}
}
BlockResponse::Empty(_) => {
trace!(target: "sync::stages::bodies", ?numhash, "Writing empty block");
body_cursor.append(
numhash,
StoredBlockBody { start_tx_id: current_tx_id, tx_count: 0 },
@ -160,7 +150,6 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
// If the block does not have a reward, the transition will be the same as the
// transition at the last transaction of this block.
let has_reward = self.consensus.has_block_reward(numhash.number());
trace!(target: "sync::stages::bodies", has_reward, ?numhash, "Block reward");
if has_reward {
transition_id += 1;
}
@ -172,7 +161,7 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
// The stage is "done" if:
// - We got fewer blocks than our target
// - We reached our target and the target was not limited by the batch size of the stage
let done = !capped && highest_block == end_block;
let done = highest_block == end_block;
info!(target: "sync::stages::bodies", stage_progress = highest_block, target = end_block, done, "Sync iteration finished");
Ok(ExecOutput { stage_progress: highest_block, done })
}
@ -232,35 +221,6 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
}
}
impl<D: BodyDownloader, C: Consensus> BodyStage<D, C> {
    /// Computes the list of sealed headers for blocks that we need to download bodies for.
    ///
    /// Walks the canonical header table starting at `starting_block` and stops after the last
    /// entry whose block number is `<= target`. Every visited header is returned; empty blocks
    /// are *not* filtered out here.
    ///
    /// # Errors
    ///
    /// - Returns a database error if opening a cursor, walking the canonical table or looking up
    ///   a header fails.
    /// - Returns [DatabaseIntegrityError::Header] if a canonical entry has no matching header.
    fn bodies_to_download<DB: Database>(
        &self,
        tx: &mut <DB as DatabaseGAT<'_>>::TXMut,
        starting_block: BlockNumber,
        target: BlockNumber,
    ) -> Result<Vec<SealedHeader>, StageError> {
        let mut header_cursor = tx.cursor_read::<tables::Headers>()?;
        let mut header_hashes_cursor = tx.cursor_read::<tables::CanonicalHeaders>()?;

        // Let `Err` items pass through `take_while` so that they reach the `?` in the loop
        // below. Treating an error as "end of range" would silently truncate the result.
        let walker = header_hashes_cursor
            .walk(starting_block)?
            .take_while(|item| item.as_ref().map_or(true, |(num, _)| *num <= target));

        let mut bodies_to_download = Vec::new();
        for entry in walker {
            let (block_number, header_hash) = entry?;
            let (_, header) = header_cursor.seek_exact((block_number, header_hash).into())?.ok_or(
                DatabaseIntegrityError::Header { number: block_number, hash: header_hash },
            )?;
            bodies_to_download.push(SealedHeader::new(header, header_hash));
        }

        Ok(bodies_to_download)
    }
}
#[cfg(test)]
mod tests {
use super::*;
@ -269,11 +229,6 @@ mod tests {
PREV_STAGE_ID,
};
use assert_matches::assert_matches;
use reth_interfaces::{
consensus,
p2p::error::{DownloadError, RequestError},
};
use std::collections::HashMap;
use test_utils::*;
stage_test_suite_ext!(BodyTestRunner, body);
@ -376,47 +331,6 @@ mod tests {
assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");
}
/// Checks that the stage hands control back to the pipeline, without advancing, when body
/// validation fails.
#[tokio::test]
async fn pre_validation_failure() {
    let stage_progress = 1;
    let previous_stage = 20;

    // Prepare the runner and seed the blocks the stage is going to request
    let mut runner = BodyTestRunner::default();
    let input = ExecInput {
        previous_stage: Some((PREV_STAGE_ID, previous_stage)),
        stage_progress: Some(stage_progress),
    };
    let blocks = runner.seed_execution(input).expect("failed to seed execution");

    // Answer every seeded block with a validation error
    let mut failing_responses = HashMap::new();
    for block in blocks.iter() {
        failing_responses.insert(
            block.hash(),
            Err(DownloadError::BlockValidation {
                hash: block.hash(),
                error: consensus::Error::BaseFeeMissing,
            }),
        );
    }
    runner.set_responses(failing_responses);

    // Drive the stage
    let rx = runner.execute(input);
    let outcome = rx.await.unwrap();

    // The stage must report an unfinished run that stayed at the initial progress
    assert_matches!(
        outcome,
        Ok(ExecOutput { stage_progress: reported_progress, done: false })
            if reported_progress == stage_progress
    );
    assert!(runner.validate_execution(input, None).is_ok(), "execution validation");
}
/// Checks that the stage unwinds correctly, even if a transaction in a block is missing.
#[tokio::test]
async fn unwind_missing_tx() {
@ -472,40 +386,6 @@ mod tests {
assert!(runner.validate_unwind(input).is_ok(), "unwind validation");
}
/// Checks that the stage returns without making progress when the downloader reports a
/// request timeout.
///
/// TODO: We should probably just exit as "OK", commit the blocks we downloaded successfully and
/// try again?
#[tokio::test]
async fn downloader_timeout() {
    let stage_progress = 1;
    let previous_stage = 2;

    // Prepare the runner and seed the blocks the stage is going to request
    let mut runner = BodyTestRunner::default();
    let input = ExecInput {
        previous_stage: Some((PREV_STAGE_ID, previous_stage)),
        stage_progress: Some(stage_progress),
    };
    let blocks = runner.seed_execution(input).expect("failed to seed execution");

    // Replace the seeded responses with a single timeout for the last block
    let last_block = blocks.last().unwrap();
    let mut timed_out = HashMap::new();
    timed_out.insert(last_block.hash(), Err(DownloadError::RequestError(RequestError::Timeout)));
    runner.set_responses(timed_out);

    // Drive the stage
    let rx = runner.execute(input);
    let outcome = rx.await.unwrap();

    // The stage must report an unfinished run that stayed at the initial progress
    assert_matches!(
        outcome,
        Ok(ExecOutput { stage_progress: reported_progress, done: false })
            if reported_progress == stage_progress
    );
    assert!(runner.validate_execution(input, None).is_ok(), "execution validation");
}
mod test_utils {
use crate::{
stages::bodies::BodyStage,
@ -516,8 +396,11 @@ mod tests {
ExecInput, ExecOutput, UnwindInput,
};
use assert_matches::assert_matches;
use futures_util::Stream;
use reth_db::{
cursor::DbCursorRO,
database::Database,
mdbx::{Env, WriteMap},
models::{BlockNumHash, StoredBlockBody, StoredBlockOmmers},
tables,
transaction::{DbTx, DbTxMut},
@ -525,13 +408,13 @@ mod tests {
use reth_eth_wire::BlockBody;
use reth_interfaces::{
consensus::Consensus,
db,
p2p::{
bodies::{
client::BodiesClient,
downloader::{BlockResponse, BodyDownloader},
client::BodiesClient, downloader::BodyDownloader, response::BlockResponse,
},
downloader::{DownloadClient, DownloadStream, Downloader},
error::{DownloadResult, PeerRequestResult},
downloader::{DownloadClient, Downloader},
error::PeerRequestResult,
priority::Priority,
},
test_utils::{
@ -540,26 +423,32 @@ mod tests {
},
};
use reth_primitives::{BlockNumber, SealedBlock, SealedHeader, TxNumber, H256};
use std::{collections::HashMap, sync::Arc};
use std::{
collections::{HashMap, VecDeque},
ops::Range,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
/// The block hash of the genesis block.
pub(crate) const GENESIS_HASH: H256 = H256::zero();
/// A helper that builds a `(hash, result-wrapped body)` pair for a block, suitable for
/// collecting into a map keyed by block hash.
pub(crate) fn body_by_hash(block: &SealedBlock) -> (H256, DownloadResult<BlockBody>) {
/// A helper that builds a `(hash, body)` pair for a block, suitable for collecting into a map
/// keyed by block hash.
pub(crate) fn body_by_hash(block: &SealedBlock) -> (H256, BlockBody) {
(
block.hash(),
Ok(BlockBody {
BlockBody {
transactions: block.body.clone(),
// The block holds its ommers sealed; the body carries them unsealed.
ommers: block.ommers.iter().cloned().map(|ommer| ommer.unseal()).collect(),
}),
},
)
}
/// A helper struct for running the [BodyStage].
pub(crate) struct BodyTestRunner {
// Consensus implementation that is cloned into the stage under test.
pub(crate) consensus: Arc<TestConsensus>,
// Block bodies keyed by block hash that the test downloader is constructed with.
responses: HashMap<H256, DownloadResult<BlockBody>>,
responses: HashMap<H256, BlockBody>,
// Test database wrapper used to seed and inspect tables.
tx: TestTransaction,
// Batch size forwarded to the stage / test downloader when the stage is built.
batch_size: u64,
}
@ -580,10 +469,7 @@ mod tests {
self.batch_size = batch_size;
}
/// Replaces the responses that will be handed to the test downloader the next time the stage
/// is built.
pub(crate) fn set_responses(
&mut self,
responses: HashMap<H256, DownloadResult<BlockBody>>,
) {
pub(crate) fn set_responses(&mut self, responses: HashMap<H256, BlockBody>) {
self.responses = responses;
}
}
@ -597,9 +483,12 @@ mod tests {
/// Builds the [BodyStage] under test from the runner's current responses, consensus and batch
/// size.
fn stage(&self) -> Self::S {
BodyStage {
downloader: Arc::new(TestBodyDownloader::new(self.responses.clone())),
// The downloader receives a clone of the responses, so the runner keeps its own copy.
downloader: TestBodyDownloader::new(
self.tx.inner_raw(),
self.responses.clone(),
self.batch_size,
),
consensus: self.consensus.clone(),
commit_threshold: self.batch_size,
}
}
}
@ -630,11 +519,15 @@ mod tests {
let last_transition_id = progress.body.len() as u64;
let block_transition_id = last_transition_id + 1; // for block reward
println!("Bodies tx:{}", progress.body.len());
tx.put::<tables::BlockTransitionIndex>(key.number(), block_transition_id)?;
tx.put::<tables::BlockBodies>(key, body)?;
tx.put::<tables::BlockOmmers>(key, StoredBlockOmmers { ommers: vec![] })?;
if !progress.is_empty() {
tx.put::<tables::BlockOmmers>(
key,
StoredBlockOmmers { ommers: vec![] },
)?;
}
Ok(())
})?;
}
@ -706,6 +599,7 @@ mod tests {
) -> Result<(), TestRunnerError> {
self.tx.query(|tx| {
// Acquire cursors on body related tables
let mut headers_cursor = tx.cursor_read::<tables::Headers>()?;
let mut bodies_cursor = tx.cursor_read::<tables::BlockBodies>()?;
let mut ommers_cursor = tx.cursor_read::<tables::BlockOmmers>()?;
let mut block_transition_cursor = tx.cursor_read::<tables::BlockTransitionIndex>()?;
@ -738,8 +632,15 @@ mod tests {
key.number(), highest_block
);
let (_, header) = headers_cursor.seek_exact(key)?.expect("to be present");
// Validate that ommers exist
assert_matches!(ommers_cursor.seek_exact(key), Ok(Some(_)), "Block ommers are missing");
assert_matches!(
ommers_cursor.seek_exact(key),
Ok(ommers) => {
assert!(if header.is_empty() { ommers.is_none() } else { ommers.is_some() })
},
"Block ommers are missing"
);
for tx_id in body.tx_id_range() {
let tx_entry = transaction_cursor.seek_exact(tx_id)?;
@ -772,7 +673,6 @@ mod tests {
}
}
// TODO(onbjerg): Move
/// A [BodiesClient] that should not be called.
///
/// This is a unit struct and carries no state.
#[derive(Debug)]
pub(crate) struct NoopClient;
@ -798,16 +698,22 @@ mod tests {
}
}
// TODO(onbjerg): Move
/// A [BodyDownloader] that is backed by an internal [HashMap] for testing.
#[derive(Debug, Default, Clone)]
#[derive(Debug)]
pub(crate) struct TestBodyDownloader {
// Bodies to serve, keyed by block hash.
responses: HashMap<H256, DownloadResult<BlockBody>>,
// Database handle used to read the headers of the requested range.
db: Arc<Env<WriteMap>>,
responses: HashMap<H256, BlockBody>,
// Headers that still have to be answered, in the order they were loaded.
headers: VecDeque<SealedHeader>,
// Number of responses after which a batch is emitted.
batch_size: u64,
}
impl TestBodyDownloader {
/// Creates a test downloader that serves the given responses.
pub(crate) fn new(responses: HashMap<H256, DownloadResult<BlockBody>>) -> Self {
Self { responses }
/// Creates a test downloader over `db` that serves `responses` in batches of `batch_size`.
///
/// The header queue starts out empty.
pub(crate) fn new(
db: Arc<Env<WriteMap>>,
responses: HashMap<H256, BlockBody>,
batch_size: u64,
) -> Self {
Self { db, responses, headers: VecDeque::default(), batch_size }
}
}
@ -825,24 +731,61 @@ mod tests {
}
impl BodyDownloader for TestBodyDownloader {
fn bodies_stream<'a, 'b, I>(&'a self, hashes: I) -> DownloadStream<'a, BlockResponse>
where
I: IntoIterator<Item = &'b SealedHeader>,
<I as IntoIterator>::IntoIter: Send + 'b,
'b: 'a,
{
Box::pin(futures_util::stream::iter(hashes.into_iter().map(|header| {
let result = self
.responses
.get(&header.hash())
.expect("Stage tried downloading a block we do not have.")
.clone()?;
Ok(BlockResponse::Full(SealedBlock {
header: header.clone(),
body: result.transactions,
ommers: result.ommers.into_iter().map(|header| header.seal()).collect(),
}))
})))
/// Replaces the pending header queue with the sealed headers of the canonical blocks whose
/// numbers fall into `range` (the end bound is exclusive).
///
/// # Panics
///
/// Panics with "missing header" if a canonical entry has no matching row in the headers
/// table.
fn set_download_range(&mut self, range: Range<BlockNumber>) -> Result<(), db::Error> {
    let loaded = self.db.view(|tx| {
        let mut header_cursor = tx.cursor_read::<tables::Headers>()?;
        let mut canonical_cursor = tx.cursor_read::<tables::CanonicalHeaders>()?;

        // Stop at the first entry that is an error or lies at/after the end of the range.
        let in_range = canonical_cursor
            .walk(range.start)?
            .take_while(|item| item.as_ref().map_or(false, |(number, _)| *number < range.end));

        let mut sealed = Vec::default();
        for item in in_range {
            let (number, hash) = item?;
            let (_, header) =
                header_cursor.seek_exact((number, hash).into())?.expect("missing header");
            sealed.push(SealedHeader::new(header, hash));
        }
        Ok(sealed)
    })??;

    self.headers = VecDeque::from(loaded);
    Ok(())
}
}
impl Stream for TestBodyDownloader {
    type Item = Vec<BlockResponse>;

    /// Yields the next batch of block responses for the queued headers.
    ///
    /// Returns `Poll::Ready(None)` once the header queue is drained. Otherwise headers are
    /// popped from the front of the queue until `batch_size` responses have been collected or
    /// the queue runs dry. At least one header is consumed per call, so an emitted batch is
    /// never empty. This implementation never returns `Poll::Pending`.
    ///
    /// # Panics
    ///
    /// Panics with "requested unknown body" if a non-empty header has no entry in `responses`.
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        if this.headers.is_empty() {
            return Poll::Ready(None)
        }

        let mut response = Vec::default();
        while let Some(header) = this.headers.pop_front() {
            if header.is_empty() {
                // Empty blocks have no body to look up.
                response.push(BlockResponse::Empty(header))
            } else {
                let body =
                    this.responses.remove(&header.hash()).expect("requested unknown body");
                response.push(BlockResponse::Full(SealedBlock {
                    header,
                    body: body.transactions,
                    ommers: body.ommers.into_iter().map(|h| h.seal()).collect(),
                }));
            }

            // Checked after pushing, so a batch size of zero still yields one response.
            if response.len() as u64 >= this.batch_size {
                break
            }
        }

        // The queue was non-empty on entry, so the loop pushed at least one response.
        Poll::Ready(Some(response))
    }
}
}

View File

@ -19,6 +19,7 @@ use crate::db::Transaction;
/// let tx = TestTransaction::default();
/// stage.execute(&mut tx.container(), input);
/// ```
#[derive(Debug)]
pub(crate) struct TestTransaction {
tx: Arc<Env<WriteMap>>,
}