feat(download): immediate body response validation (#1080)

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
Roman Krasiuk
2023-01-31 01:44:41 +02:00
committed by GitHub
parent 0e24093b0b
commit 55fc6924b9
2 changed files with 106 additions and 102 deletions

View File

@ -5,11 +5,12 @@ use reth_interfaces::{
consensus::{Consensus as ConsensusTrait, Consensus},
p2p::{
bodies::{client::BodiesClient, response::BlockResponse},
error::DownloadError,
error::{DownloadError, DownloadResult},
},
};
use reth_primitives::{PeerId, SealedBlock, SealedHeader, H256};
use reth_primitives::{PeerId, SealedBlock, SealedHeader, WithPeerId, H256};
use std::{
collections::VecDeque,
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
@ -38,12 +39,11 @@ pub(crate) struct BodiesRequestFuture<B: BodiesClient> {
client: Arc<B>,
consensus: Arc<dyn Consensus>,
metrics: DownloaderMetrics,
// All requested headers
headers: Vec<SealedHeader>,
// Remaining hashes to download
hashes_to_download: Vec<H256>,
buffer: Vec<(PeerId, BlockBody)>,
// Headers to download. The collection is shrinked as responses are buffered.
headers: VecDeque<SealedHeader>,
buffer: Vec<BlockResponse>,
fut: Option<B::Output>,
last_request_len: Option<usize>,
}
impl<B> BodiesRequestFuture<B>
@ -61,19 +61,19 @@ where
consensus,
metrics,
headers: Default::default(),
hashes_to_download: Default::default(),
buffer: Default::default(),
last_request_len: None,
fut: None,
}
}
pub(crate) fn with_headers(mut self, headers: Vec<SealedHeader>) -> Self {
self.headers = headers;
self.reset_hashes();
self.buffer.reserve_exact(headers.len());
self.headers = VecDeque::from(headers);
// Submit the request only if there are any headers to download.
// Otherwise, the future will immediately be resolved.
if !self.hashes_to_download.is_empty() {
self.submit_request();
if let Some(req) = self.next_request() {
self.submit_request(req);
}
self
}
@ -84,57 +84,103 @@ where
if let Some(peer_id) = peer_id {
self.client.report_bad_message(peer_id);
}
self.submit_request();
self.submit_request(self.next_request().expect("existing hashes to resubmit"));
}
fn submit_request(&mut self) {
/// Retrieve header hashes for the next request.
fn next_request(&self) -> Option<Vec<H256>> {
let mut hashes = self.headers.iter().filter(|h| !h.is_empty()).map(|h| h.hash()).peekable();
if hashes.peek().is_some() {
Some(hashes.collect())
} else {
None
}
}
/// Submit the request.
fn submit_request(&mut self, req: Vec<H256>) {
tracing::trace!(target: "downloaders::bodies", request_len = req.len(), "Requesting bodies");
let client = Arc::clone(&self.client);
let request = self.hashes_to_download.clone();
tracing::trace!(target: "downloaders::bodies", request_len = request.len(), "Requesting bodies");
self.fut = Some(client.get_block_bodies(request));
self.last_request_len = Some(req.len());
self.fut = Some(client.get_block_bodies(req));
}
fn reset_hashes(&mut self) {
self.hashes_to_download =
self.headers.iter().filter(|h| !h.is_empty()).map(|h| h.hash()).collect();
/// Process block response.
/// Returns an error if the response is invalid.
fn on_block_response(&mut self, response: WithPeerId<Vec<BlockBody>>) -> DownloadResult<()> {
let (peer_id, bodies) = response.split();
let request_len = self.last_request_len.unwrap_or_default();
let response_len = bodies.len();
tracing::trace!(target: "downloaders::bodies", request_len, response_len, ?peer_id, "Received bodies");
// Increment total downloaded metric
self.metrics.total_downloaded.increment(response_len as u64);
// Malicious peers often return a single block. Mark responses with single
// block when more than 1 were requested invalid.
// TODO: Instead of marking single block responses invalid, calculate
// soft response size lower limit and use that for filtering.
if bodies.is_empty() || (request_len != 1 && response_len == 1) {
return Err(DownloadError::EmptyResponse)
}
if response_len > request_len {
return Err(DownloadError::TooManyBodies {
expected: request_len,
received: response_len,
})
}
// Buffer block responses
self.try_buffer_blocks(bodies)?;
// Submit next request if any
if let Some(req) = self.next_request() {
self.submit_request(req);
} else {
self.fut = None;
}
Ok(())
}
/// Attempt to construct blocks from origin headers and buffered bodies.
/// Attempt to buffer body responses. Returns an error if body response fails validation.
/// Every body preceeding the failed one will be buffered.
///
/// NOTE: This method drains the buffer.
///
/// # Panics
///
/// If the number of buffered bodies does not equal the number of non empty headers.
#[allow(clippy::result_large_err)]
fn try_construct_blocks(&mut self) -> Result<Vec<BlockResponse>, (PeerId, DownloadError)> {
// Drop the allocated memory for the buffer. Optimistically, it will not be reused.
let mut bodies = std::mem::take(&mut self.buffer).into_iter();
let mut results = Vec::default();
for header in self.headers.iter().cloned() {
if header.is_empty() {
results.push(BlockResponse::Empty(header));
/// This method removes headers from the internal collection.
/// If the response fails validation, then the header will be put back.
fn try_buffer_blocks(&mut self, bodies: Vec<BlockBody>) -> DownloadResult<()> {
let mut bodies = bodies.into_iter().peekable();
while bodies.peek().is_some() {
let next_header = match self.headers.pop_front() {
Some(header) => header,
None => return Ok(()), // no more headers
};
if next_header.is_empty() {
self.buffer.push(BlockResponse::Empty(next_header));
} else {
// The body must be present since we requested headers for all non-empty
// bodies at this point
let (peer_id, body) = bodies.next().expect("download logic failed");
let next_body = bodies.next().unwrap();
let block = SealedBlock {
header: header.clone(),
body: body.transactions,
ommers: body.ommers.into_iter().map(|header| header.seal()).collect(),
header: next_header,
body: next_body.transactions,
ommers: next_body.ommers.into_iter().map(|header| header.seal()).collect(),
};
// This ensures that the TxRoot and OmmersRoot from the header match the
// ones calculated manually from the block body.
self.consensus.pre_validate_block(&block).map_err(|error| {
(peer_id, DownloadError::BodyValidation { hash: header.hash(), error })
})?;
if let Err(error) = self.consensus.pre_validate_block(&block) {
// Put the header back and return an error
let hash = block.hash();
self.headers.push_front(block.header);
return Err(DownloadError::BodyValidation { hash, error })
}
results.push(BlockResponse::Full(block));
self.buffer.push(BlockResponse::Full(block));
}
}
Ok(results)
Ok(())
}
}
@ -148,52 +194,18 @@ where
let this = self.get_mut();
loop {
if this.headers.is_empty() {
return Poll::Ready(std::mem::take(&mut this.buffer))
}
// Check if there is a pending requests. It might not exist if all
// headers are empty and there is nothing to download.
if let Some(fut) = this.fut.as_mut() {
match ready!(fut.poll_unpin(cx)) {
Ok(response) => {
let (peer_id, bodies) = response.split();
let request_len = this.hashes_to_download.len();
let response_len = bodies.len();
// Increment total downloaded metric
this.metrics.total_downloaded.increment(response_len as u64);
// Malicious peers often return a single block. Mark responses with single
// block when more than 1 were requested invalid.
// TODO: Instead of marking single block responses invalid, calculate
// soft response size lower limit and use that for filtering.
if bodies.is_empty() || (request_len != 1 && response_len == 1) {
this.on_error(DownloadError::EmptyResponse, Some(peer_id));
continue
}
if response_len > request_len {
this.on_error(
DownloadError::TooManyBodies {
expected: request_len,
received: response_len,
},
Some(peer_id),
);
continue
}
tracing::trace!(
target: "downloaders::bodies", request_len, response_len, ?peer_id, "Received bodies"
);
// Draining the hashes here so that on the next `submit_request` call we
// only request the remaining bodies, instead of the
// ones we already received
this.hashes_to_download.drain(..response_len);
this.buffer.extend(bodies.into_iter().map(|b| (peer_id, b)));
if !this.hashes_to_download.is_empty() {
// Submit next request if not done
this.submit_request();
} else {
this.fut = None;
let peer_id = response.peer_id();
if let Err(error) = this.on_block_response(response) {
this.on_error(error, Some(peer_id));
}
}
Err(error) => {
@ -202,19 +214,10 @@ where
}
}
// Drain the buffer and attempt to construct the response.
// If validation fails, the future will restart from scratch.
if this.hashes_to_download.is_empty() {
match this.try_construct_blocks() {
Ok(blocks) => return Poll::Ready(blocks),
Err((peer_id, error)) => {
this.reset_hashes();
this.on_error(error, Some(peer_id));
}
}
// Sanity check
} else if this.fut.is_none() {
unreachable!("Body request logic failure")
// Buffer any empty headers
while this.headers.front().map(|h| h.is_empty()).unwrap_or_default() {
let header = this.headers.pop_front().unwrap();
this.buffer.push(BlockResponse::Empty(header));
}
}
}

View File

@ -10,6 +10,7 @@ use reth_db::{
transaction::{DbTx, DbTxMut, DbTxMutGAT},
TransitionList,
};
use reth_primitives::{Address, TransitionId};
use std::{collections::BTreeMap, fmt::Debug};
use tracing::*;