feat(stages): unwind on detached local head (#3066)

This commit is contained in:
Roman Krasiuk
2023-06-09 20:35:46 +03:00
committed by GitHub
parent b414eee015
commit 1ff26dd2fd
14 changed files with 262 additions and 134 deletions

View File

@ -247,6 +247,13 @@ pub fn validate_header_regarding_parent(
}) })
} }
if parent.hash != child.parent_hash {
return Err(ConsensusError::ParentHashMismatch {
expected_parent_hash: parent.hash,
got_parent_hash: child.parent_hash,
})
}
// timestamp in past check // timestamp in past check
if child.timestamp <= parent.timestamp { if child.timestamp <= parent.timestamp {
return Err(ConsensusError::TimestampIsInPast { return Err(ConsensusError::TimestampIsInPast {

View File

@ -74,11 +74,15 @@ pub enum ConsensusError {
#[error("Block parent [hash:{hash:?}] is not known.")] #[error("Block parent [hash:{hash:?}] is not known.")]
ParentUnknown { hash: BlockHash }, ParentUnknown { hash: BlockHash },
#[error( #[error(
"Block number {block_number} is mismatch with parent block number {parent_block_number}" "Block number {block_number} does not match parent block number {parent_block_number}"
)] )]
ParentBlockNumberMismatch { parent_block_number: BlockNumber, block_number: BlockNumber }, ParentBlockNumberMismatch { parent_block_number: BlockNumber, block_number: BlockNumber },
#[error( #[error(
"Block timestamp {timestamp} is in the past compared to the parent timestamp {parent_timestamp}." "Parent hash {got_parent_hash:?} does not match the expected {expected_parent_hash:?}"
)]
ParentHashMismatch { expected_parent_hash: H256, got_parent_hash: H256 },
#[error(
"Block timestamp {timestamp} is in the past compared to the parent timestamp {parent_timestamp}."
)] )]
TimestampIsInPast { parent_timestamp: u64, timestamp: u64 }, TimestampIsInPast { parent_timestamp: u64, timestamp: u64 },
#[error("Block timestamp {timestamp} is in the future compared to our clock time {present_timestamp}.")] #[error("Block timestamp {timestamp} is in the future compared to our clock time {present_timestamp}.")]

View File

@ -115,7 +115,7 @@ impl From<oneshot::error::RecvError> for RequestError {
pub type DownloadResult<T> = Result<T, DownloadError>; pub type DownloadResult<T> = Result<T, DownloadError>;
/// The downloader error type /// The downloader error type
#[derive(Error, Debug, Clone)] #[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum DownloadError { pub enum DownloadError {
/* ==================== HEADER ERRORS ==================== */ /* ==================== HEADER ERRORS ==================== */
/// Header validation failed /// Header validation failed
@ -127,19 +127,6 @@ pub enum DownloadError {
#[source] #[source]
error: consensus::ConsensusError, error: consensus::ConsensusError,
}, },
/// Error when checking that the current [`Header`] has the parent's hash as the parent_hash
/// field, and that they have sequential block numbers.
#[error("Headers did not match, current number: {header_number} / current hash: {header_hash}, parent number: {parent_number} / parent_hash: {parent_hash}")]
MismatchedHeaders {
/// The header number being evaluated
header_number: BlockNumber,
/// The header hash being evaluated
header_hash: H256,
/// The parent number being evaluated
parent_number: BlockNumber,
/// The parent hash being evaluated
parent_hash: H256,
},
/// Received an invalid tip /// Received an invalid tip
#[error("Received invalid tip: {received:?}. Expected {expected:?}.")] #[error("Received invalid tip: {received:?}. Expected {expected:?}.")]
InvalidTip { InvalidTip {

View File

@ -1,3 +1,4 @@
use super::error::HeadersDownloaderResult;
use crate::{ use crate::{
consensus::Consensus, consensus::Consensus,
p2p::error::{DownloadError, DownloadResult}, p2p::error::{DownloadError, DownloadResult},
@ -12,7 +13,9 @@ use reth_primitives::{BlockHashOrNumber, SealedHeader, H256};
/// of fulfilling these requests. /// of fulfilling these requests.
/// ///
/// A [HeaderDownloader] is a [Stream] that returns batches of headers. /// A [HeaderDownloader] is a [Stream] that returns batches of headers.
pub trait HeaderDownloader: Send + Sync + Stream<Item = Vec<SealedHeader>> + Unpin { pub trait HeaderDownloader:
Send + Sync + Stream<Item = HeadersDownloaderResult<Vec<SealedHeader>>> + Unpin
{
/// Updates the gap to sync which ranges from local head to the sync target /// Updates the gap to sync which ranges from local head to the sync target
/// ///
/// See also [HeaderDownloader::update_sync_target] and [HeaderDownloader::update_local_head] /// See also [HeaderDownloader::update_sync_target] and [HeaderDownloader::update_local_head]
@ -76,7 +79,6 @@ pub fn validate_header_download(
header: &SealedHeader, header: &SealedHeader,
parent: &SealedHeader, parent: &SealedHeader,
) -> DownloadResult<()> { ) -> DownloadResult<()> {
ensure_parent(header, parent)?;
// validate header against parent // validate header against parent
consensus consensus
.validate_header_against_parent(header, parent) .validate_header_against_parent(header, parent)
@ -87,16 +89,3 @@ pub fn validate_header_download(
.map_err(|error| DownloadError::HeaderValidation { hash: parent.hash(), error })?; .map_err(|error| DownloadError::HeaderValidation { hash: parent.hash(), error })?;
Ok(()) Ok(())
} }
/// Ensures that the given `parent` header is the actual parent of the `header`
pub fn ensure_parent(header: &SealedHeader, parent: &SealedHeader) -> DownloadResult<()> {
if !(parent.hash() == header.parent_hash && parent.number + 1 == header.number) {
return Err(DownloadError::MismatchedHeaders {
header_number: header.number,
parent_number: parent.number,
header_hash: header.hash(),
parent_hash: parent.hash(),
})
}
Ok(())
}

View File

@ -0,0 +1,22 @@
use crate::consensus::ConsensusError;
use reth_primitives::SealedHeader;
use thiserror::Error;
/// Header downloader result
pub type HeadersDownloaderResult<T> = Result<T, HeadersDownloaderError>;
/// Error variants that can happen when sending requests to a session.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum HeadersDownloaderError {
/// The downloaded header cannot be attached to the local head,
/// but is valid otherwise.
#[error("Valid downloaded header cannot be attached to the local head. Details: {error}.")]
DetachedHead {
/// The local head we attempted to attach to.
local_head: SealedHeader,
/// The header we attempted to attach.
header: SealedHeader,
/// The error that occurred when attempting to attach the header.
error: Box<ConsensusError>,
},
}

View File

@ -9,3 +9,6 @@ pub mod client;
/// [`Consensus`]: crate::consensus::Consensus /// [`Consensus`]: crate::consensus::Consensus
/// [`HeadersClient`]: client::HeadersClient /// [`HeadersClient`]: client::HeadersClient
pub mod downloader; pub mod downloader;
/// Header downloader error.
pub mod error;

View File

@ -7,6 +7,7 @@ use crate::{
headers::{ headers::{
client::{HeadersClient, HeadersRequest}, client::{HeadersClient, HeadersRequest},
downloader::{validate_header_download, HeaderDownloader, SyncTarget}, downloader::{validate_header_download, HeaderDownloader, SyncTarget},
error::HeadersDownloaderResult,
}, },
priority::Priority, priority::Priority,
}, },
@ -84,20 +85,20 @@ impl HeaderDownloader for TestHeaderDownloader {
} }
impl Stream for TestHeaderDownloader { impl Stream for TestHeaderDownloader {
type Item = Vec<SealedHeader>; type Item = HeadersDownloaderResult<Vec<SealedHeader>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut(); let this = self.get_mut();
loop { loop {
if this.queued_headers.len() == this.batch_size { if this.queued_headers.len() == this.batch_size {
return Poll::Ready(Some(std::mem::take(&mut this.queued_headers))) return Poll::Ready(Some(Ok(std::mem::take(&mut this.queued_headers))))
} }
if this.download.is_none() { if this.download.is_none() {
this.download.insert(this.create_download()); this.download.insert(this.create_download());
} }
match ready!(this.download.as_mut().unwrap().poll_next_unpin(cx)) { match ready!(this.download.as_mut().unwrap().poll_next_unpin(cx)) {
None => return Poll::Ready(Some(std::mem::take(&mut this.queued_headers))), None => return Poll::Ready(Some(Ok(std::mem::take(&mut this.queued_headers)))),
Some(header) => this.queued_headers.push(header.unwrap()), Some(header) => this.queued_headers.push(header.unwrap()),
} }
} }

View File

@ -12,6 +12,7 @@ use reth_interfaces::{
headers::{ headers::{
client::{HeadersClient, HeadersRequest}, client::{HeadersClient, HeadersRequest},
downloader::{validate_header_download, HeaderDownloader, SyncTarget}, downloader::{validate_header_download, HeaderDownloader, SyncTarget},
error::{HeadersDownloaderError, HeadersDownloaderResult},
}, },
priority::Priority, priority::Priority,
}, },
@ -28,7 +29,8 @@ use std::{
sync::Arc, sync::Arc,
task::{ready, Context, Poll}, task::{ready, Context, Poll},
}; };
use tracing::trace; use thiserror::Error;
use tracing::{error, trace};
/// A heuristic that is used to determine the number of requests that should be prepared for a peer. /// A heuristic that is used to determine the number of requests that should be prepared for a peer.
/// This should ensure that there are always requests lined up for peers to handle while the /// This should ensure that there are always requests lined up for peers to handle while the
@ -38,6 +40,16 @@ const REQUESTS_PER_PEER_MULTIPLIER: usize = 5;
/// The scope for headers downloader metrics. /// The scope for headers downloader metrics.
pub const HEADERS_DOWNLOADER_SCOPE: &str = "downloaders.headers"; pub const HEADERS_DOWNLOADER_SCOPE: &str = "downloaders.headers";
/// Wrapper for internal downloader errors.
#[allow(clippy::large_enum_variant)]
#[derive(Error, Debug)]
enum ReverseHeadersDownloaderError {
#[error(transparent)]
Downloader(#[from] HeadersDownloaderError),
#[error(transparent)]
Response(#[from] HeadersResponseError),
}
/// Downloads headers concurrently. /// Downloads headers concurrently.
/// ///
/// This [HeaderDownloader] downloads headers using the configured [HeadersClient]. /// This [HeaderDownloader] downloads headers using the configured [HeadersClient].
@ -186,6 +198,37 @@ where
self.queued_validated_headers.last().or(self.lowest_validated_header.as_ref()) self.queued_validated_headers.last().or(self.lowest_validated_header.as_ref())
} }
/// Validate that the received header matches the expected sync target.
fn validate_sync_target(
&self,
header: &SealedHeader,
request: HeadersRequest,
peer_id: PeerId,
) -> Result<(), HeadersResponseError> {
match self.existing_sync_target() {
SyncTargetBlock::Hash(hash) | SyncTargetBlock::HashAndNumber { hash, .. }
if header.hash() != hash =>
{
Err(HeadersResponseError {
request,
peer_id: Some(peer_id),
error: DownloadError::InvalidTip { received: header.hash(), expected: hash },
})
}
SyncTargetBlock::Number(number) if header.number != number => {
Err(HeadersResponseError {
request,
peer_id: Some(peer_id),
error: DownloadError::InvalidTipNumber {
received: header.number,
expected: number,
},
})
}
_ => Ok(()),
}
}
/// Processes the next headers in line. /// Processes the next headers in line.
/// ///
/// This will validate all headers and insert them into the validated buffer. /// This will validate all headers and insert them into the validated buffer.
@ -199,8 +242,7 @@ where
request: HeadersRequest, request: HeadersRequest,
headers: Vec<Header>, headers: Vec<Header>,
peer_id: PeerId, peer_id: PeerId,
) -> Result<(), HeadersResponseError> { ) -> Result<(), ReverseHeadersDownloaderError> {
let sync_target = self.existing_sync_target();
let mut validated = Vec::with_capacity(headers.len()); let mut validated = Vec::with_capacity(headers.len());
let sealed_headers = headers.into_par_iter().map(|h| h.seal_slow()).collect::<Vec<_>>(); let sealed_headers = headers.into_par_iter().map(|h| h.seal_slow()).collect::<Vec<_>>();
@ -211,59 +253,45 @@ where
{ {
if let Err(error) = self.validate(validated_header, &parent) { if let Err(error) = self.validate(validated_header, &parent) {
trace!(target: "downloaders::headers", ?error ,"Failed to validate header"); trace!(target: "downloaders::headers", ?error ,"Failed to validate header");
return Err(HeadersResponseError { request, peer_id: Some(peer_id), error }) return Err(
HeadersResponseError { request, peer_id: Some(peer_id), error }.into()
)
} }
} else { } else {
match sync_target { self.validate_sync_target(&parent, request.clone(), peer_id)?;
SyncTargetBlock::Hash(hash) => {
if parent.hash() != hash {
return Err(HeadersResponseError {
request,
peer_id: Some(peer_id),
error: DownloadError::InvalidTip {
received: parent.hash(),
expected: hash,
},
})
}
}
SyncTargetBlock::Number(number) => {
if parent.number != number {
return Err(HeadersResponseError {
request,
peer_id: Some(peer_id),
error: DownloadError::InvalidTipNumber {
received: parent.number,
expected: number,
},
})
}
}
SyncTargetBlock::HashAndNumber { hash, .. } => {
if parent.hash() != hash {
return Err(HeadersResponseError {
request,
peer_id: Some(peer_id),
error: DownloadError::InvalidTip {
received: parent.hash(),
expected: hash,
},
})
}
}
}
} }
validated.push(parent); validated.push(parent);
} }
// If the last (smallest) validated header attaches to the local head, validate it. // If the last (smallest) validated header attaches to the local head, validate it.
if let Some((last_header, head)) = validated.last().zip(self.local_head.as_ref()) { if let Some((last_header, head)) = validated
if last_header.number == head.number + 1 { .last_mut()
if let Err(error) = self.validate(last_header, head) { .zip(self.local_head.as_ref())
trace!(target: "downloaders::headers", ?error ,"Failed to validate header"); .filter(|(last, head)| last.number == head.number + 1)
return Err(HeadersResponseError { request, peer_id: Some(peer_id), error }) {
// Every header must be valid on its own
if let Err(error) = self.consensus.validate_header(last_header) {
trace!(target: "downloaders::headers", ?error, "Failed to validate header");
return Err(HeadersResponseError {
request,
peer_id: Some(peer_id),
error: DownloadError::HeaderValidation { hash: head.hash(), error },
} }
.into())
}
// If the header is valid on its own, but not against its parent, we return it as
// detached head error.
if let Err(error) = self.consensus.validate_header_against_parent(last_header, head) {
// Replace the last header with a detached variant
error!(target: "downloaders::headers", ?error, number = last_header.number, hash = ?last_header.hash, "Header cannot be attached to known canonical chain");
return Err(HeadersDownloaderError::DetachedHead {
local_head: head.clone(),
header: last_header.clone(),
error: Box::new(error),
}
.into())
} }
} }
@ -318,7 +346,7 @@ where
fn on_sync_target_outcome( fn on_sync_target_outcome(
&mut self, &mut self,
response: HeadersRequestOutcome, response: HeadersRequestOutcome,
) -> Result<(), HeadersResponseError> { ) -> Result<(), ReverseHeadersDownloaderError> {
let sync_target = self.existing_sync_target(); let sync_target = self.existing_sync_target();
let HeadersRequestOutcome { request, outcome } = response; let HeadersRequestOutcome { request, outcome } = response;
match outcome { match outcome {
@ -336,7 +364,8 @@ where
request, request,
peer_id: Some(peer_id), peer_id: Some(peer_id),
error: DownloadError::EmptyResponse, error: DownloadError::EmptyResponse,
}) }
.into())
} }
let target = headers.remove(0).seal_slow(); let target = headers.remove(0).seal_slow();
@ -351,7 +380,8 @@ where
received: target.hash(), received: target.hash(),
expected: hash, expected: hash,
}, },
}) }
.into())
} }
} }
SyncTargetBlock::Number(number) => { SyncTargetBlock::Number(number) => {
@ -363,7 +393,8 @@ where
received: target.number, received: target.number,
expected: number, expected: number,
}, },
}) }
.into())
} }
} }
SyncTargetBlock::HashAndNumber { hash, .. } => { SyncTargetBlock::HashAndNumber { hash, .. } => {
@ -375,7 +406,8 @@ where
received: target.hash(), received: target.hash(),
expected: hash, expected: hash,
}, },
}) }
.into())
} }
} }
} }
@ -389,11 +421,15 @@ where
self.queued_validated_headers.push(target); self.queued_validated_headers.push(target);
// try to validate all buffered responses blocked by this successful response // try to validate all buffered responses blocked by this successful response
self.try_validate_buffered().map(Err::<(), HeadersResponseError>).transpose()?; self.try_validate_buffered()
.map(Err::<(), ReverseHeadersDownloaderError>)
.transpose()?;
Ok(()) Ok(())
} }
Err(err) => Err(HeadersResponseError { request, peer_id: None, error: err.into() }), Err(err) => {
Err(HeadersResponseError { request, peer_id: None, error: err.into() }.into())
}
} }
} }
@ -402,7 +438,7 @@ where
fn on_headers_outcome( fn on_headers_outcome(
&mut self, &mut self,
response: HeadersRequestOutcome, response: HeadersRequestOutcome,
) -> Result<(), HeadersResponseError> { ) -> Result<(), ReverseHeadersDownloaderError> {
let requested_block_number = response.block_number(); let requested_block_number = response.block_number();
let HeadersRequestOutcome { request, outcome } = response; let HeadersRequestOutcome { request, outcome } = response;
@ -420,7 +456,8 @@ where
request, request,
peer_id: Some(peer_id), peer_id: Some(peer_id),
error: DownloadError::EmptyResponse, error: DownloadError::EmptyResponse,
}) }
.into())
} }
if (headers.len() as u64) != request.limit { if (headers.len() as u64) != request.limit {
@ -431,7 +468,8 @@ where
expected: request.limit, expected: request.limit,
}, },
request, request,
}) }
.into())
} }
// sort headers from highest to lowest block number // sort headers from highest to lowest block number
@ -450,7 +488,8 @@ where
received: highest.number, received: highest.number,
expected: requested_block_number, expected: requested_block_number,
}, },
}) }
.into())
} }
// check if the response is the next expected // check if the response is the next expected
@ -459,7 +498,7 @@ where
self.process_next_headers(request, headers, peer_id)?; self.process_next_headers(request, headers, peer_id)?;
// try to validate all buffered responses blocked by this successful response // try to validate all buffered responses blocked by this successful response
self.try_validate_buffered() self.try_validate_buffered()
.map(Err::<(), HeadersResponseError>) .map(Err::<(), ReverseHeadersDownloaderError>)
.transpose()?; .transpose()?;
} else if highest.number > self.existing_local_block_number() { } else if highest.number > self.existing_local_block_number() {
self.metrics.buffered_responses.increment(1.); self.metrics.buffered_responses.increment(1.);
@ -477,7 +516,7 @@ where
// would've been handled by the fetcher internally // would've been handled by the fetcher internally
Err(err) => { Err(err) => {
trace!(target: "downloaders::headers", %err, "Response error"); trace!(target: "downloaders::headers", %err, "Response error");
Err(HeadersResponseError { request, peer_id: None, error: err.into() }) Err(HeadersResponseError { request, peer_id: None, error: err.into() }.into())
} }
} }
} }
@ -508,7 +547,7 @@ where
/// Attempts to validate the buffered responses /// Attempts to validate the buffered responses
/// ///
/// Returns an error if the next expected response was popped, but failed validation. /// Returns an error if the next expected response was popped, but failed validation.
fn try_validate_buffered(&mut self) -> Option<HeadersResponseError> { fn try_validate_buffered(&mut self) -> Option<ReverseHeadersDownloaderError> {
loop { loop {
// Check to see if we've already received the next value // Check to see if we've already received the next value
let next_response = self.buffered_responses.peek_mut()?; let next_response = self.buffered_responses.peek_mut()?;
@ -691,7 +730,7 @@ impl<H> Stream for ReverseHeadersDownloader<H>
where where
H: HeadersClient + 'static, H: HeadersClient + 'static,
{ {
type Item = Vec<SealedHeader>; type Item = HeadersDownloaderResult<Vec<SealedHeader>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut(); let this = self.get_mut();
@ -713,20 +752,25 @@ where
while let Some(mut req) = this.sync_target_request.take() { while let Some(mut req) = this.sync_target_request.take() {
match req.poll_unpin(cx) { match req.poll_unpin(cx) {
Poll::Ready(outcome) => { Poll::Ready(outcome) => {
if let Err(err) = this.on_sync_target_outcome(outcome) { match this.on_sync_target_outcome(outcome) {
trace!(target: "downloaders::headers", ?err, "invalid sync target response"); Ok(()) => break,
if err.is_channel_closed() { Err(ReverseHeadersDownloaderError::Response(error)) => {
// download channel closed which means the network was dropped trace!(target: "downloaders::headers", ?error, "invalid sync target response");
return Poll::Ready(None) if error.is_channel_closed() {
} // download channel closed which means the network was dropped
return Poll::Ready(None)
}
this.penalize_peer(err.peer_id, &err.error); this.penalize_peer(error.peer_id, &error.error);
this.metrics.increment_errors(&err.error); this.metrics.increment_errors(&error.error);
this.sync_target_request = this.sync_target_request =
Some(this.request_fut(err.request, Priority::High)); Some(this.request_fut(error.request, Priority::High));
} else { }
break Err(ReverseHeadersDownloaderError::Downloader(error)) => {
} this.clear();
return Poll::Ready(Some(Err(error)))
}
};
} }
Poll::Pending => { Poll::Pending => {
this.sync_target_request = Some(req); this.sync_target_request = Some(req);
@ -748,13 +792,20 @@ where
while let Poll::Ready(Some(outcome)) = this.in_progress_queue.poll_next_unpin(cx) { while let Poll::Ready(Some(outcome)) = this.in_progress_queue.poll_next_unpin(cx) {
this.metrics.in_flight_requests.decrement(1.); this.metrics.in_flight_requests.decrement(1.);
// handle response // handle response
if let Err(err) = this.on_headers_outcome(outcome) { match this.on_headers_outcome(outcome) {
if err.is_channel_closed() { Ok(()) => (),
// download channel closed which means the network was dropped Err(ReverseHeadersDownloaderError::Response(error)) => {
return Poll::Ready(None) if error.is_channel_closed() {
// download channel closed which means the network was dropped
return Poll::Ready(None)
}
this.on_headers_error(error);
} }
this.on_headers_error(err); Err(ReverseHeadersDownloaderError::Downloader(error)) => {
} this.clear();
return Poll::Ready(Some(Err(error)))
}
};
} }
// marks the loop's exit condition: exit if no requests submitted // marks the loop's exit condition: exit if no requests submitted
@ -791,7 +842,7 @@ where
trace!(target: "downloaders::headers", batch=%next_batch.len(), "Returning validated batch"); trace!(target: "downloaders::headers", batch=%next_batch.len(), "Returning validated batch");
this.metrics.total_flushed.increment(next_batch.len() as u64); this.metrics.total_flushed.increment(next_batch.len() as u64);
return Poll::Ready(Some(next_batch)) return Poll::Ready(Some(Ok(next_batch)))
} }
if !progress { if !progress {
@ -807,7 +858,7 @@ where
return Poll::Ready(None) return Poll::Ready(None)
} }
this.metrics.total_flushed.increment(next_batch.len() as u64); this.metrics.total_flushed.increment(next_batch.len() as u64);
return Poll::Ready(Some(next_batch)) return Poll::Ready(Some(Ok(next_batch)))
} }
Poll::Pending Poll::Pending
@ -902,6 +953,18 @@ impl HeadersResponseError {
} }
} }
impl std::fmt::Display for HeadersResponseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Error requesting headers from peer {:?}. Error: {}. Request: {:?}",
self.peer_id, self.error, self.request,
)
}
}
impl std::error::Error for HeadersResponseError {}
/// The block to which we want to close the gap: (local head...sync target] /// The block to which we want to close the gap: (local head...sync target]
/// This tracks the sync target block, so this could be either a block number or hash. /// This tracks the sync target block, so this could be either a block number or hash.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -1333,7 +1396,7 @@ mod tests {
.await; .await;
let headers = downloader.next().await.unwrap(); let headers = downloader.next().await.unwrap();
assert_eq!(headers, vec![p0, p1, p2,]); assert_eq!(headers, Ok(vec![p0, p1, p2,]));
assert!(downloader.buffered_responses.is_empty()); assert!(downloader.buffered_responses.is_empty());
assert!(downloader.next().await.is_none()); assert!(downloader.next().await.is_none());
assert!(downloader.next().await.is_none()); assert!(downloader.next().await.is_none());
@ -1365,12 +1428,12 @@ mod tests {
.await; .await;
let headers = downloader.next().await.unwrap(); let headers = downloader.next().await.unwrap();
assert_eq!(headers, vec![p0]); assert_eq!(headers, Ok(vec![p0]));
let headers = downloader.next().await.unwrap(); let headers = downloader.next().await.unwrap();
assert_eq!(headers, vec![p1]); assert_eq!(headers, Ok(vec![p1]));
let headers = downloader.next().await.unwrap(); let headers = downloader.next().await.unwrap();
assert_eq!(headers, vec![p2]); assert_eq!(headers, Ok(vec![p2]));
assert!(downloader.next().await.is_none()); assert!(downloader.next().await.is_none());
} }
} }

View File

@ -1,7 +1,10 @@
use futures::{FutureExt, Stream}; use futures::{FutureExt, Stream};
use futures_util::StreamExt; use futures_util::StreamExt;
use pin_project::pin_project; use pin_project::pin_project;
use reth_interfaces::p2p::headers::downloader::{HeaderDownloader, SyncTarget}; use reth_interfaces::p2p::headers::{
downloader::{HeaderDownloader, SyncTarget},
error::HeadersDownloaderResult,
};
use reth_primitives::SealedHeader; use reth_primitives::SealedHeader;
use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use std::{ use std::{
@ -21,7 +24,7 @@ pub const HEADERS_TASK_BUFFER_SIZE: usize = 8;
#[pin_project] #[pin_project]
pub struct TaskDownloader { pub struct TaskDownloader {
#[pin] #[pin]
from_downloader: ReceiverStream<Vec<SealedHeader>>, from_downloader: ReceiverStream<HeadersDownloaderResult<Vec<SealedHeader>>>,
to_downloader: UnboundedSender<DownloaderUpdates>, to_downloader: UnboundedSender<DownloaderUpdates>,
} }
@ -97,7 +100,7 @@ impl HeaderDownloader for TaskDownloader {
} }
impl Stream for TaskDownloader { impl Stream for TaskDownloader {
type Item = Vec<SealedHeader>; type Item = HeadersDownloaderResult<Vec<SealedHeader>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().from_downloader.poll_next(cx) self.project().from_downloader.poll_next(cx)
@ -107,7 +110,7 @@ impl Stream for TaskDownloader {
/// A [HeaderDownloader] that runs on its own task /// A [HeaderDownloader] that runs on its own task
struct SpawnedDownloader<T> { struct SpawnedDownloader<T> {
updates: UnboundedReceiverStream<DownloaderUpdates>, updates: UnboundedReceiverStream<DownloaderUpdates>,
headers_tx: PollSender<Vec<SealedHeader>>, headers_tx: PollSender<HeadersDownloaderResult<Vec<SealedHeader>>>,
downloader: T, downloader: T,
} }
@ -212,11 +215,11 @@ mod tests {
.await; .await;
let headers = downloader.next().await.unwrap(); let headers = downloader.next().await.unwrap();
assert_eq!(headers, vec![p0]); assert_eq!(headers, Ok(vec![p0]));
let headers = downloader.next().await.unwrap(); let headers = downloader.next().await.unwrap();
assert_eq!(headers, vec![p1]); assert_eq!(headers, Ok(vec![p1]));
let headers = downloader.next().await.unwrap(); let headers = downloader.next().await.unwrap();
assert_eq!(headers, vec![p2]); assert_eq!(headers, Ok(vec![p2]));
} }
} }

View File

@ -328,7 +328,7 @@ mod tests {
downloader.update_sync_target(SyncTarget::Tip(p0.hash())); downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
let headers = downloader.next().await.unwrap(); let headers = downloader.next().await.unwrap();
assert_eq!(headers, vec![p0, p1, p2,]); assert_eq!(headers, Ok(vec![p0, p1, p2]));
assert!(downloader.next().await.is_none()); assert!(downloader.next().await.is_none());
assert!(downloader.next().await.is_none()); assert!(downloader.next().await.is_none());
} }
@ -349,7 +349,7 @@ mod tests {
header_downloader.update_sync_target(SyncTarget::Tip(headers.last().unwrap().hash())); header_downloader.update_sync_target(SyncTarget::Tip(headers.last().unwrap().hash()));
// get headers first // get headers first
let mut downloaded_headers = header_downloader.next().await.unwrap(); let mut downloaded_headers = header_downloader.next().await.unwrap().unwrap();
// reverse to make sure it's in the right order before comparing // reverse to make sure it's in the right order before comparing
downloaded_headers.reverse(); downloaded_headers.reverse();

View File

@ -89,6 +89,16 @@ pub const EMPTY_TRANSACTIONS: H256 = EMPTY_SET_HASH;
/// Withdrawals root of empty withdrawals set. /// Withdrawals root of empty withdrawals set.
pub const EMPTY_WITHDRAWALS: H256 = EMPTY_SET_HASH; pub const EMPTY_WITHDRAWALS: H256 = EMPTY_SET_HASH;
/// The number of blocks to unwind during a reorg that already became a part of canonical chain.
///
/// In reality, the node can end up in this particular situation very rarely. It would happen only
/// if the node process is abruptly terminated during ongoing reorg and doesn't boot back up for
/// long period of time.
///
/// Unwind depth of `3` blocks significantly reduces the chance that the reorged block is kept in
/// the database.
pub const BEACON_CONSENSUS_REORG_UNWIND_DEPTH: u64 = 3;
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -20,6 +20,23 @@ pub enum StageError {
#[source] #[source]
error: consensus::ConsensusError, error: consensus::ConsensusError,
}, },
/// The stage encountered a downloader error where the responses cannot be attached to the
/// current head.
#[error(
"Stage encountered inconsistent chain. Downloaded header #{header_number} ({header_hash:?}) is detached from local head #{head_number} ({head_hash:?}). Details: {error}.",
header_number = header.number,
header_hash = header.hash,
head_number = local_head.number,
head_hash = local_head.hash,
)]
DetachedHead {
/// The local head we attempted to attach to.
local_head: SealedHeader,
/// The header we attempted to attach.
header: SealedHeader,
/// The error that occurred when attempting to attach the header.
error: Box<consensus::ConsensusError>,
},
/// The stage encountered a database error. /// The stage encountered a database error.
#[error("An internal database error occurred: {0}")] #[error("An internal database error occurred: {0}")]
Database(#[from] DbError), Database(#[from] DbError),

View File

@ -2,7 +2,10 @@ use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, UnwindInput};
use futures_util::Future; use futures_util::Future;
use reth_db::database::Database; use reth_db::database::Database;
use reth_interfaces::executor::BlockExecutionError; use reth_interfaces::executor::BlockExecutionError;
use reth_primitives::{listener::EventListeners, stage::StageId, BlockNumber, H256}; use reth_primitives::{
constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH, listener::EventListeners, stage::StageId,
BlockNumber, H256,
};
use reth_provider::{providers::get_stage_checkpoint, Transaction}; use reth_provider::{providers::get_stage_checkpoint, Transaction};
use std::pin::Pin; use std::pin::Pin;
use tokio::sync::watch; use tokio::sync::watch;
@ -377,7 +380,16 @@ where
Err(err) => { Err(err) => {
self.listeners.notify(PipelineEvent::Error { stage_id }); self.listeners.notify(PipelineEvent::Error { stage_id });
let out = if let StageError::Validation { block, error } = err { let out = if let StageError::DetachedHead { local_head, header, error } = err {
warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, ?error, "Stage encountered detached head");
// We unwind because of a detached head.
let unwind_to = local_head
.number
.saturating_sub(BEACON_CONSENSUS_REORG_UNWIND_DEPTH)
.max(1);
Ok(ControlFlow::Unwind { target: unwind_to, bad_block: local_head })
} else if let StageError::Validation { block, error } = err {
warn!( warn!(
target: "sync::pipeline", target: "sync::pipeline",
stage = %stage_id, stage = %stage_id,

View File

@ -7,7 +7,10 @@ use reth_db::{
transaction::{DbTx, DbTxMut}, transaction::{DbTx, DbTxMut},
}; };
use reth_interfaces::{ use reth_interfaces::{
p2p::headers::downloader::{HeaderDownloader, SyncTarget}, p2p::headers::{
downloader::{HeaderDownloader, SyncTarget},
error::HeadersDownloaderError,
},
provider::ProviderError, provider::ProviderError,
}; };
use reth_primitives::{ use reth_primitives::{
@ -217,7 +220,14 @@ where
// down to the local head (latest block in db). // down to the local head (latest block in db).
// Task downloader can return `None` only if the response relaying channel was closed. This // Task downloader can return `None` only if the response relaying channel was closed. This
// is a fatal error to prevent the pipeline from running forever. // is a fatal error to prevent the pipeline from running forever.
let downloaded_headers = self.downloader.next().await.ok_or(StageError::ChannelClosed)?; let downloaded_headers = match self.downloader.next().await {
Some(Ok(headers)) => headers,
Some(Err(HeadersDownloaderError::DetachedHead { local_head, header, error })) => {
error!(target: "sync::stages::headers", ?error, "Cannot attach header to head");
return Err(StageError::DetachedHead { local_head, header, error })
}
None => return Err(StageError::ChannelClosed),
};
info!(target: "sync::stages::headers", len = downloaded_headers.len(), "Received headers"); info!(target: "sync::stages::headers", len = downloaded_headers.len(), "Received headers");