fix: validate headers in full block downloader (#4034)

This commit is contained in:
Matthias Seitz
2023-08-02 18:36:48 +02:00
committed by GitHub
parent d595834d20
commit 94dfeb3ade
4 changed files with 122 additions and 62 deletions

View File

@ -26,7 +26,7 @@ use reth_primitives::{
Head, Header, SealedBlock, SealedHeader, H256, U256, Head, Header, SealedBlock, SealedHeader, H256, U256,
}; };
use reth_provider::{ use reth_provider::{
BlockIdReader, BlockReader, BlockSource, CanonChainTracker, ProviderError, BlockIdReader, BlockReader, BlockSource, CanonChainTracker, ChainSpecProvider, ProviderError,
StageCheckpointReader, StageCheckpointReader,
}; };
use reth_prune::Pruner; use reth_prune::Pruner;
@ -208,6 +208,7 @@ where
+ BlockIdReader + BlockIdReader
+ CanonChainTracker + CanonChainTracker
+ StageCheckpointReader + StageCheckpointReader
+ ChainSpecProvider
+ 'static, + 'static,
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static, Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
{ {
@ -279,6 +280,7 @@ where
task_spawner.clone(), task_spawner.clone(),
run_pipeline_continuously, run_pipeline_continuously,
max_block, max_block,
blockchain.chain_spec(),
); );
let prune = pruner.map(|pruner| EnginePruneController::new(pruner, task_spawner)); let prune = pruner.map(|pruner| EnginePruneController::new(pruner, task_spawner));
let mut this = Self { let mut this = Self {
@ -1651,6 +1653,7 @@ where
+ BlockIdReader + BlockIdReader
+ CanonChainTracker + CanonChainTracker
+ StageCheckpointReader + StageCheckpointReader
+ ChainSpecProvider
+ Unpin + Unpin
+ 'static, + 'static,
{ {

View File

@ -1,6 +1,6 @@
//! Sync management for the engine implementation. //! Sync management for the engine implementation.
use crate::engine::metrics::EngineSyncMetrics; use crate::{engine::metrics::EngineSyncMetrics, BeaconConsensus};
use futures::FutureExt; use futures::FutureExt;
use reth_db::database::Database; use reth_db::database::Database;
use reth_interfaces::p2p::{ use reth_interfaces::p2p::{
@ -8,12 +8,13 @@ use reth_interfaces::p2p::{
full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient}, full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
headers::client::HeadersClient, headers::client::HeadersClient,
}; };
use reth_primitives::{BlockNumber, SealedBlock, H256}; use reth_primitives::{BlockNumber, ChainSpec, SealedBlock, H256};
use reth_stages::{ControlFlow, Pipeline, PipelineError, PipelineWithResult}; use reth_stages::{ControlFlow, Pipeline, PipelineError, PipelineWithResult};
use reth_tasks::TaskSpawner; use reth_tasks::TaskSpawner;
use std::{ use std::{
cmp::{Ordering, Reverse}, cmp::{Ordering, Reverse},
collections::{binary_heap::PeekMut, BinaryHeap}, collections::{binary_heap::PeekMut, BinaryHeap},
sync::Arc,
task::{ready, Context, Poll}, task::{ready, Context, Poll},
}; };
use tokio::sync::oneshot; use tokio::sync::oneshot;
@ -68,9 +69,13 @@ where
pipeline_task_spawner: Box<dyn TaskSpawner>, pipeline_task_spawner: Box<dyn TaskSpawner>,
run_pipeline_continuously: bool, run_pipeline_continuously: bool,
max_block: Option<BlockNumber>, max_block: Option<BlockNumber>,
chain_spec: Arc<ChainSpec>,
) -> Self { ) -> Self {
Self { Self {
full_block_client: FullBlockClient::new(client), full_block_client: FullBlockClient::new(
client,
Arc::new(BeaconConsensus::new(chain_spec)),
),
pipeline_task_spawner, pipeline_task_spawner,
pipeline_state: PipelineState::Idle(Some(pipeline)), pipeline_state: PipelineState::Idle(Some(pipeline)),
pending_pipeline_target: None, pending_pipeline_target: None,
@ -394,7 +399,8 @@ mod tests {
}; };
use reth_interfaces::{p2p::either::EitherDownloader, test_utils::TestFullBlockClient}; use reth_interfaces::{p2p::either::EitherDownloader, test_utils::TestFullBlockClient};
use reth_primitives::{ use reth_primitives::{
stage::StageCheckpoint, BlockBody, ChainSpec, ChainSpecBuilder, SealedHeader, MAINNET, constants::ETHEREUM_BLOCK_GAS_LIMIT, stage::StageCheckpoint, BlockBody, ChainSpec,
ChainSpecBuilder, Header, SealedHeader, MAINNET,
}; };
use reth_provider::{test_utils::TestExecutorFactory, PostState}; use reth_provider::{test_utils::TestExecutorFactory, PostState};
use reth_stages::{test_utils::TestStages, ExecOutput, StageError}; use reth_stages::{test_utils::TestStages, ExecOutput, StageError};
@ -491,6 +497,7 @@ mod tests {
fn build<DB>( fn build<DB>(
self, self,
pipeline: Pipeline<DB>, pipeline: Pipeline<DB>,
chain_spec: Arc<ChainSpec>,
) -> EngineSyncController<DB, EitherDownloader<Client, TestFullBlockClient>> ) -> EngineSyncController<DB, EitherDownloader<Client, TestFullBlockClient>>
where where
DB: Database + 'static, DB: Database + 'static,
@ -508,6 +515,7 @@ mod tests {
// run_pipeline_continuously: false here until we want to test this // run_pipeline_continuously: false here until we want to test this
false, false,
self.max_block, self.max_block,
chain_spec,
) )
} }
} }
@ -539,10 +547,11 @@ mod tests {
checkpoint: StageCheckpoint::new(5), checkpoint: StageCheckpoint::new(5),
done: true, done: true,
})])) })]))
.build(chain_spec); .build(chain_spec.clone());
let mut sync_controller = let mut sync_controller = TestSyncControllerBuilder::new()
TestSyncControllerBuilder::new().with_client(client.clone()).build(pipeline); .with_client(client.clone())
.build(pipeline, chain_spec);
let tip = client.highest_block().expect("there should be blocks here"); let tip = client.highest_block().expect("there should be blocks here");
sync_controller.set_pipeline_sync_target(tip.hash); sync_controller.set_pipeline_sync_target(tip.hash);
@ -577,20 +586,27 @@ mod tests {
); );
let client = TestFullBlockClient::default(); let client = TestFullBlockClient::default();
let mut header = SealedHeader::default(); let mut header = Header {
base_fee_per_gas: Some(7),
gas_limit: ETHEREUM_BLOCK_GAS_LIMIT,
..Default::default()
}
.seal_slow();
let body = BlockBody::default(); let body = BlockBody::default();
for _ in 0..10 { for _ in 0..10 {
header.parent_hash = header.hash_slow(); header.parent_hash = header.hash_slow();
header.number += 1; header.number += 1;
header.timestamp += 1;
header = header.header.seal_slow(); header = header.header.seal_slow();
client.insert(header.clone(), body.clone()); client.insert(header.clone(), body.clone());
} }
// set up a pipeline // set up a pipeline
let pipeline = TestPipelineBuilder::new().build(chain_spec); let pipeline = TestPipelineBuilder::new().build(chain_spec.clone());
let mut sync_controller = let mut sync_controller = TestSyncControllerBuilder::new()
TestSyncControllerBuilder::new().with_client(client.clone()).build(pipeline); .with_client(client.clone())
.build(pipeline, chain_spec);
let tip = client.highest_block().expect("there should be blocks here"); let tip = client.highest_block().expect("there should be blocks here");

View File

@ -31,6 +31,28 @@ pub trait Consensus: Debug + Send + Sync {
parent: &SealedHeader, parent: &SealedHeader,
) -> Result<(), ConsensusError>; ) -> Result<(), ConsensusError>;
/// Validates the given headers
///
/// This ensures that the first header is valid on its own and all subsequent headers are valid
/// on its own and valid against its parent.
///
/// Note: this expects that the headers are in natural order (ascending block number)
fn validate_header_range(&self, headers: &[SealedHeader]) -> Result<(), ConsensusError> {
if headers.is_empty() {
return Ok(())
}
let first = headers.first().expect("checked empty");
self.validate_header(first)?;
let mut parent = first;
for child in headers.iter().skip(1) {
self.validate_header(child)?;
self.validate_header_against_parent(child, parent)?;
parent = child;
}
Ok(())
}
/// Validate if the header is correct and follows the consensus specification, including /// Validate if the header is correct and follows the consensus specification, including
/// computed properties (like total difficulty). /// computed properties (like total difficulty).
/// ///

View File

@ -1,5 +1,6 @@
use super::headers::client::HeadersRequest;
use crate::{ use crate::{
consensus::ConsensusError, consensus::{Consensus, ConsensusError},
p2p::{ p2p::{
bodies::client::{BodiesClient, SingleBodyRequest}, bodies::client::{BodiesClient, SingleBodyRequest},
error::PeerRequestResult, error::PeerRequestResult,
@ -16,22 +17,28 @@ use std::{
fmt::Debug, fmt::Debug,
future::Future, future::Future,
pin::Pin, pin::Pin,
sync::Arc,
task::{ready, Context, Poll}, task::{ready, Context, Poll},
}; };
use tracing::debug; use tracing::debug;
use super::headers::client::HeadersRequest;
/// A Client that can fetch full blocks from the network. /// A Client that can fetch full blocks from the network.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct FullBlockClient<Client> { pub struct FullBlockClient<Client> {
client: Client, client: Client,
consensus: Arc<dyn Consensus>,
} }
impl<Client> FullBlockClient<Client> { impl<Client> FullBlockClient<Client> {
/// Creates a new instance of `FullBlockClient`. /// Creates a new instance of `FullBlockClient`.
pub fn new(client: Client) -> Self { pub fn new(client: Client, consensus: Arc<dyn Consensus>) -> Self {
Self { client } Self { client, consensus }
}
/// Returns a client with Test consensus
#[cfg(feature = "test-utils")]
pub fn test_client(client: Client) -> Self {
Self::new(client, Arc::new(crate::test_utils::TestConsensus::default()))
} }
} }
@ -95,6 +102,7 @@ where
headers: None, headers: None,
pending_headers: VecDeque::new(), pending_headers: VecDeque::new(),
bodies: HashMap::new(), bodies: HashMap::new(),
consensus: Arc::clone(&self.consensus),
} }
} }
} }
@ -186,7 +194,7 @@ where
if let Some(header) = maybe_header { if let Some(header) = maybe_header {
if header.hash() != this.hash { if header.hash() != this.hash {
debug!(target: "downloaders", expected=?this.hash, received=?header.hash, "Received wrong header"); debug!(target: "downloaders", expected=?this.hash, received=?header.hash, "Received wrong header");
// received bad header // received a different header than requested
this.client.report_bad_message(peer) this.client.report_bad_message(peer)
} else { } else {
this.header = Some(header); this.header = Some(header);
@ -352,6 +360,8 @@ where
{ {
/// The client used to fetch headers and bodies. /// The client used to fetch headers and bodies.
client: Client, client: Client,
/// The consensus instance used to validate the blocks.
consensus: Arc<dyn Consensus>,
/// The block hash to start fetching from (inclusive). /// The block hash to start fetching from (inclusive).
start_hash: H256, start_hash: H256,
/// How many blocks to fetch: `len([start_hash, ..]) == count` /// How many blocks to fetch: `len([start_hash, ..]) == count`
@ -381,6 +391,8 @@ where
} }
/// Inserts a block body, matching it with the `next_header`. /// Inserts a block body, matching it with the `next_header`.
///
/// Note: this assumes the response matches the next header in the queue.
fn insert_body(&mut self, body_response: BodyResponse) { fn insert_body(&mut self, body_response: BodyResponse) {
if let Some(header) = self.pending_headers.pop_front() { if let Some(header) = self.pending_headers.pop_front() {
self.bodies.insert(header, body_response); self.bodies.insert(header, body_response);
@ -388,8 +400,8 @@ where
} }
/// Inserts multiple block bodies. /// Inserts multiple block bodies.
fn insert_bodies(&mut self, bodies: Vec<BodyResponse>) { fn insert_bodies(&mut self, bodies: impl IntoIterator<Item = BodyResponse>) {
for body in bodies { for body in bodies.into_iter() {
self.insert_body(body); self.insert_body(body);
} }
} }
@ -461,6 +473,46 @@ where
Some(response) Some(response)
} }
fn on_headers_response(&mut self, headers: WithPeerId<Vec<Header>>) {
let (peer, mut headers_falling) =
headers.map(|h| h.into_iter().map(|h| h.seal_slow()).collect::<Vec<_>>()).split();
// fill in the response if it's the correct length
if headers_falling.len() == self.count as usize {
// sort headers from highest to lowest block number
headers_falling.sort_unstable_by_key(|h| Reverse(h.number));
// check the starting hash
if headers_falling[0].hash() != self.start_hash {
// received a different header than requested
self.client.report_bad_message(peer);
} else {
let headers_rising = headers_falling.iter().rev().cloned().collect::<Vec<_>>();
// ensure the downloaded headers are valid
if let Err(err) = self.consensus.validate_header_range(&headers_rising) {
debug!(target: "downloaders", %err, ?self.start_hash, "Received bad header response");
self.client.report_bad_message(peer);
return
}
// get the bodies request so it can be polled later
let hashes = headers_falling.iter().map(|h| h.hash()).collect::<Vec<_>>();
// populate the pending headers
self.pending_headers = headers_falling.clone().into();
// set the actual request if it hasn't been started yet
if !self.has_bodies_request_started() {
// request the bodies for the downloaded headers
self.request.bodies = Some(self.client.get_block_bodies(hashes));
}
// set the headers response
self.headers = Some(headers_falling);
}
}
}
/// Returns whether or not a bodies request has been started, returning false if there is no /// Returns whether or not a bodies request has been started, returning false if there is no
/// pending request. /// pending request.
fn has_bodies_request_started(&self) -> bool { fn has_bodies_request_started(&self) -> bool {
@ -500,39 +552,7 @@ where
RangeResponseResult::Header(res) => { RangeResponseResult::Header(res) => {
match res { match res {
Ok(headers) => { Ok(headers) => {
let (peer, mut headers) = headers this.on_headers_response(headers);
.map(|h| {
h.iter().map(|h| h.clone().seal_slow()).collect::<Vec<_>>()
})
.split();
// fill in the response if it's the correct length
if headers.len() == this.count as usize {
// sort headers from highest to lowest block number
headers.sort_unstable_by_key(|h| Reverse(h.number));
// check the starting hash
if headers[0].hash() != this.start_hash {
// received bad response
this.client.report_bad_message(peer);
} else {
// get the bodies request so it can be polled later
let hashes =
headers.iter().map(|h| h.hash()).collect::<Vec<_>>();
// populate the pending headers
this.pending_headers = headers.clone().into();
// set the actual request if it hasn't been started yet
if !this.has_bodies_request_started() {
this.request.bodies =
Some(this.client.get_block_bodies(hashes));
}
// set the headers response
this.headers = Some(headers);
}
}
} }
Err(err) => { Err(err) => {
debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed"); debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed");
@ -561,10 +581,9 @@ where
// first insert the received bodies // first insert the received bodies
this.insert_bodies( this.insert_bodies(
new_bodies new_bodies
.iter() .into_iter()
.map(|resp| WithPeerId::new(peer, resp.clone())) .map(|resp| WithPeerId::new(peer, resp))
.map(BodyResponse::PendingValidation) .map(BodyResponse::PendingValidation),
.collect::<Vec<_>>(),
); );
if !this.is_bodies_complete() { if !this.is_bodies_complete() {
@ -723,7 +742,7 @@ mod tests {
let header = SealedHeader::default(); let header = SealedHeader::default();
let body = BlockBody::default(); let body = BlockBody::default();
client.insert(header.clone(), body.clone()); client.insert(header.clone(), body.clone());
let client = FullBlockClient::new(client); let client = FullBlockClient::test_client(client);
let received = client.get_full_block(header.hash()).await; let received = client.get_full_block(header.hash()).await;
assert_eq!(received, SealedBlock::new(header, body)); assert_eq!(received, SealedBlock::new(header, body));
@ -735,7 +754,7 @@ mod tests {
let header = SealedHeader::default(); let header = SealedHeader::default();
let body = BlockBody::default(); let body = BlockBody::default();
client.insert(header.clone(), body.clone()); client.insert(header.clone(), body.clone());
let client = FullBlockClient::new(client); let client = FullBlockClient::test_client(client);
let received = client.get_full_block_range(header.hash(), 1).await; let received = client.get_full_block_range(header.hash(), 1).await;
let received = received.first().expect("response should include a block"); let received = received.first().expect("response should include a block");
@ -754,7 +773,7 @@ mod tests {
header = header.header.seal_slow(); header = header.header.seal_slow();
client.insert(header.clone(), body.clone()); client.insert(header.clone(), body.clone());
} }
let client = FullBlockClient::new(client); let client = FullBlockClient::test_client(client);
let received = client.get_full_block_range(header.hash(), 1).await; let received = client.get_full_block_range(header.hash(), 1).await;
let received = received.first().expect("response should include a block"); let received = received.first().expect("response should include a block");
@ -780,7 +799,7 @@ mod tests {
header = header.header.seal_slow(); header = header.header.seal_slow();
client.insert(header.clone(), body.clone()); client.insert(header.clone(), body.clone());
} }
let client = FullBlockClient::new(client); let client = FullBlockClient::test_client(client);
let future = client.get_full_block_range(header.hash(), 1); let future = client.get_full_block_range(header.hash(), 1);
let mut stream = FullBlockRangeStream::from(future); let mut stream = FullBlockRangeStream::from(future);
@ -826,7 +845,7 @@ mod tests {
header = header.header.seal_slow(); header = header.header.seal_slow();
client.insert(header.clone(), body.clone()); client.insert(header.clone(), body.clone());
} }
let client = FullBlockClient::new(client); let client = FullBlockClient::test_client(client);
let received = client.get_full_block_range(header.hash(), 1).await; let received = client.get_full_block_range(header.hash(), 1).await;
let received = received.first().expect("response should include a block"); let received = received.first().expect("response should include a block");