chore(engine): refactor pipeline outcome processing (#7692)

commit 7cda5945d7 (parent dc39fd68f4)
Author: Roman Krasiuk
Date:   2024-04-17 15:25:44 +02:00
Committed by: GitHub
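For orientation, the refactor converges on the following shape: `on_sync_event` now returns `Result<SyncEventOutcome, BeaconConsensusEngineError>` instead of `Option<Result<(), ..>>`, pipeline results are processed in a dedicated `on_pipeline_outcome`, and the engine poll loop only resolves when a fatal error occurs or the user-configured max block is reached. A minimal sketch with stand-in types (not the real reth definitions):

// Orientation sketch only: stand-in types, not the actual reth definitions.
enum SyncEventOutcome {
    /// Event handled, keep polling the engine.
    Processed,
    /// User-configured max block reached, resolve the engine future.
    ReachedMaxBlock,
}

#[derive(Debug)]
struct FatalError;

struct Engine;

impl Engine {
    // Previously this returned Option<Result<(), _>>; the refactor makes the
    // "keep polling vs. stop" decision explicit via SyncEventOutcome.
    fn on_sync_event(&mut self, reached_max_block: bool) -> Result<SyncEventOutcome, FatalError> {
        if reached_max_block {
            return Ok(SyncEventOutcome::ReachedMaxBlock);
        }
        // ... handle downloaded blocks, pipeline start, pipeline outcome ...
        Ok(SyncEventOutcome::Processed)
    }

    // Mirrors the poll-loop handling: only ReachedMaxBlock (or an error) resolves the future.
    fn poll_once(&mut self, reached_max_block: bool) -> Result<bool, FatalError> {
        match self.on_sync_event(reached_max_block)? {
            SyncEventOutcome::Processed => Ok(false),
            SyncEventOutcome::ReachedMaxBlock => Ok(true),
        }
    }
}

fn main() -> Result<(), FatalError> {
    let mut engine = Engine;
    assert!(!engine.poll_once(false)?);
    assert!(engine.poll_once(true)?);
    Ok(())
}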


@@ -33,7 +33,7 @@ use reth_provider::{
use reth_rpc_types::engine::{
CancunPayloadFields, ExecutionPayload, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
};
use reth_stages::{ControlFlow, Pipeline, PipelineError};
use reth_stages::{ControlFlow, Pipeline};
use reth_tasks::TaskSpawner;
use reth_tokio_util::EventListeners;
use std::{
@@ -784,16 +784,11 @@ where
/// Checks if the given `head` points to an invalid header, which requires a specific response
/// to a forkchoice update.
fn check_invalid_ancestor(&mut self, head: B256) -> Option<PayloadStatus> {
let parent_hash = {
// check if the head was previously marked as invalid
let header = self.invalid_headers.get(&head)?;
header.parent_hash
};
// check if the head was previously marked as invalid
let header = self.invalid_headers.get(&head)?;
// populate the latest valid hash field
let status = self.prepare_invalid_response(parent_hash);
Some(status)
Some(self.prepare_invalid_response(header.parent_hash))
}
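In the simplified form above, a head that was previously recorded as invalid is answered directly with a payload status whose latest-valid-hash field comes from the cached parent. A self-contained sketch of that behaviour, using stand-in types (the real `prepare_invalid_response`, defined elsewhere in this file, is more involved):

use std::collections::HashMap;

// Stand-in types for illustration; the real code keys on B256 hashes and
// returns reth's PayloadStatus.
struct Header {
    parent_hash: u64,
}

#[derive(Debug, PartialEq)]
struct PayloadStatus {
    valid: bool,
    latest_valid_hash: Option<u64>,
}

struct Engine {
    invalid_headers: HashMap<u64, Header>,
}

impl Engine {
    // Simplified stand-in: report an invalid status and point at the parent as
    // the latest valid hash. The real prepare_invalid_response does more work.
    fn prepare_invalid_response(&self, parent_hash: u64) -> PayloadStatus {
        PayloadStatus { valid: false, latest_valid_hash: Some(parent_hash) }
    }

    fn check_invalid_ancestor(&mut self, head: u64) -> Option<PayloadStatus> {
        // check if the head was previously marked as invalid
        let header = self.invalid_headers.get(&head)?;
        // populate the latest valid hash field
        Some(self.prepare_invalid_response(header.parent_hash))
    }
}

fn main() {
    let mut engine = Engine { invalid_headers: HashMap::from([(7, Header { parent_hash: 6 })]) };
    assert_eq!(
        engine.check_invalid_ancestor(7),
        Some(PayloadStatus { valid: false, latest_valid_hash: Some(6) })
    );
    assert!(engine.check_invalid_ancestor(8).is_none());
}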
/// Record latency metrics for one call to make a block canonical
@@ -816,7 +811,6 @@ where
}
Err(_) => self.metrics.make_canonical_error_latency.record(elapsed),
}
elapsed
}
@@ -1404,17 +1398,15 @@ where
if let Err((hash, error)) =
self.try_make_sync_target_canonical(downloaded_num_hash)
{
if !error.is_block_hash_not_found() {
if error.is_fatal() {
error!(target: "consensus::engine", %error, "Encountered fatal error while making sync target canonical: {:?}, {:?}", error, hash);
} else {
debug!(
target: "consensus::engine",
"Unexpected error while making sync target canonical: {:?}, {:?}",
error,
hash
)
}
if error.is_fatal() {
error!(target: "consensus::engine", %error, "Encountered fatal error while making sync target canonical: {:?}, {:?}", error, hash);
} else if !error.is_block_hash_not_found() {
debug!(
target: "consensus::engine",
"Unexpected error while making sync target canonical: {:?}, {:?}",
error,
hash
)
}
}
}
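One behavioural nuance in the reordered branches above: previously a fatal error was logged only when it was not a block-hash-not-found error; now `is_fatal()` is checked first, so fatal errors are always surfaced at error level, and block-hash-not-found errors stay silent only when they are non-fatal. A small sketch of the decision, with a hypothetical error type standing in for `CanonicalError`:

// Hypothetical error type standing in for CanonicalError, to show the branch order.
struct CanonError {
    fatal: bool,
    block_hash_not_found: bool,
}

impl CanonError {
    fn is_fatal(&self) -> bool { self.fatal }
    fn is_block_hash_not_found(&self) -> bool { self.block_hash_not_found }
}

// Returns the log level the new ordering picks for a given error.
fn log_level(error: &CanonError) -> &'static str {
    if error.is_fatal() {
        "error" // always surfaced now, regardless of is_block_hash_not_found()
    } else if !error.is_block_hash_not_found() {
        "debug" // unexpected but non-fatal
    } else {
        "silent" // expected while the target block is still being downloaded
    }
}

fn main() {
    assert_eq!(log_level(&CanonError { fatal: true, block_hash_not_found: true }), "error");
    assert_eq!(log_level(&CanonError { fatal: false, block_hash_not_found: true }), "silent");
    assert_eq!(log_level(&CanonError { fatal: false, block_hash_not_found: false }), "debug");
}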
@@ -1502,59 +1494,57 @@ where
&mut self,
inserted: BlockNumHash,
) -> Result<(), (B256, CanonicalError)> {
if let Some(target) = self.forkchoice_state_tracker.sync_target_state() {
// optimistically try to make the head of the current FCU target canonical, the sync
// target might have changed since the block download request was issued
// (new FCU received)
let start = Instant::now();
let make_canonical_result = self.blockchain.make_canonical(target.head_block_hash);
let elapsed = self.record_make_canonical_latency(start, &make_canonical_result);
match make_canonical_result {
Ok(outcome) => {
if let CanonicalOutcome::Committed { head } = &outcome {
self.listeners.notify(BeaconConsensusEngineEvent::CanonicalChainCommitted(
Box::new(head.clone()),
elapsed,
));
}
let Some(target) = self.forkchoice_state_tracker.sync_target_state() else { return Ok(()) };
let new_head = outcome.into_header();
debug!(target: "consensus::engine", hash=?new_head.hash(), number=new_head.number, "Canonicalized new head");
// we can update the FCU blocks
if let Err(err) = self.update_canon_chain(new_head, &target) {
debug!(target: "consensus::engine", ?err, ?target, "Failed to update the canonical chain tracker");
}
// we're no longer syncing
self.sync_state_updater.update_sync_state(SyncState::Idle);
// clear any active block requests
self.sync.clear_block_download_requests();
Ok(())
// optimistically try to make the head of the current FCU target canonical, the sync
// target might have changed since the block download request was issued
// (new FCU received)
let start = Instant::now();
let make_canonical_result = self.blockchain.make_canonical(target.head_block_hash);
let elapsed = self.record_make_canonical_latency(start, &make_canonical_result);
match make_canonical_result {
Ok(outcome) => {
if let CanonicalOutcome::Committed { head } = &outcome {
self.listeners.notify(BeaconConsensusEngineEvent::CanonicalChainCommitted(
Box::new(head.clone()),
elapsed,
));
}
Err(err) => {
// if we failed to make the FCU's head canonical, because we don't have that
// block yet, then we can try to make the inserted block canonical if we know
// it's part of the canonical chain: if it's the safe or the finalized block
if err.is_block_hash_not_found() {
// if the inserted block is the currently targeted `finalized` or `safe`
// block, we will attempt to make them canonical,
// because they are also part of the canonical chain and
// their missing block range might already be downloaded (buffered).
if let Some(target_hash) = ForkchoiceStateHash::find(&target, inserted.hash)
.filter(|h| !h.is_head())
{
// TODO: do not ignore this
let _ = self.blockchain.make_canonical(*target_hash.as_ref());
}
}
Err((target.head_block_hash, err))
let new_head = outcome.into_header();
debug!(target: "consensus::engine", hash=?new_head.hash(), number=new_head.number, "Canonicalized new head");
// we can update the FCU blocks
if let Err(err) = self.update_canon_chain(new_head, &target) {
debug!(target: "consensus::engine", ?err, ?target, "Failed to update the canonical chain tracker");
}
// we're no longer syncing
self.sync_state_updater.update_sync_state(SyncState::Idle);
// clear any active block requests
self.sync.clear_block_download_requests();
Ok(())
}
Err(err) => {
// if we failed to make the FCU's head canonical, because we don't have that
// block yet, then we can try to make the inserted block canonical if we know
// it's part of the canonical chain: if it's the safe or the finalized block
if err.is_block_hash_not_found() {
// if the inserted block is the currently targeted `finalized` or `safe`
// block, we will attempt to make them canonical,
// because they are also part of the canonical chain and
// their missing block range might already be downloaded (buffered).
if let Some(target_hash) =
ForkchoiceStateHash::find(&target, inserted.hash).filter(|h| !h.is_head())
{
// TODO: do not ignore this
let _ = self.blockchain.make_canonical(*target_hash.as_ref());
}
}
Err((target.head_block_hash, err))
}
} else {
Ok(())
}
}
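The structural change in this hunk is replacing the outer `if let Some(target) = ... { .. } else { Ok(()) }` with a `let ... else` early return, so the body is no longer nested inside the success arm. A generic sketch of the idiom (not the reth types):

// Generic illustration of the `let ... else` flattening; not the reth types.
fn sync_target() -> Option<u64> {
    Some(42)
}

// Before: the whole body is nested inside the success arm.
fn nested() -> Result<(), ()> {
    if let Some(target) = sync_target() {
        println!("making block {target} canonical");
        Ok(())
    } else {
        Ok(())
    }
}

// After: bail out early, keep the happy path at the top level.
fn flattened() -> Result<(), ()> {
    let Some(target) = sync_target() else { return Ok(()) };
    println!("making block {target} canonical");
    Ok(())
}

fn main() -> Result<(), ()> {
    nested()?;
    flattened()
}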
@@ -1564,168 +1554,144 @@ where
fn on_sync_event(
&mut self,
event: EngineSyncEvent,
) -> Option<Result<(), BeaconConsensusEngineError>> {
match event {
) -> Result<SyncEventOutcome, BeaconConsensusEngineError> {
let outcome = match event {
EngineSyncEvent::FetchedFullBlock(block) => {
self.on_downloaded_block(block);
SyncEventOutcome::Processed
}
EngineSyncEvent::PipelineStarted(target) => {
trace!(target: "consensus::engine", ?target, continuous = target.is_none(), "Started the pipeline");
self.metrics.pipeline_runs.increment(1);
self.sync_state_updater.update_sync_state(SyncState::Syncing);
SyncEventOutcome::Processed
}
EngineSyncEvent::PipelineFinished { result, reached_max_block } => {
trace!(target: "consensus::engine", ?result, ?reached_max_block, "Pipeline finished");
// Any pipeline error at this point is fatal.
let ctrl = result?;
if reached_max_block {
// Terminate the sync early if it's reached the maximum user-configured block.
SyncEventOutcome::ReachedMaxBlock
} else {
self.on_pipeline_outcome(ctrl)?;
SyncEventOutcome::Processed
}
}
EngineSyncEvent::PipelineTaskDropped => {
error!(target: "consensus::engine", "Failed to receive spawned pipeline");
return Some(Err(BeaconConsensusEngineError::PipelineChannelClosed))
}
EngineSyncEvent::PipelineFinished { result, reached_max_block } => {
return self.on_pipeline_finished(result, reached_max_block)
return Err(BeaconConsensusEngineError::PipelineChannelClosed)
}
};
None
Ok(outcome)
}
/// Invoked when the pipeline has finished.
/// Invoked when the pipeline has successfully finished.
///
/// Returns an Option indicating whether the engine future should resolve:
///
/// Returns Some(result) where:
/// - Ok(()) if the pipeline finished successfully
/// - Err(..) if the pipeline failed fatally
///
/// Returns None if the pipeline finished successfully and engine should continue.
fn on_pipeline_finished(
&mut self,
result: Result<ControlFlow, PipelineError>,
reached_max_block: bool,
) -> Option<Result<(), BeaconConsensusEngineError>> {
trace!(target: "consensus::engine", ?result, ?reached_max_block, "Pipeline finished");
match result {
Ok(ctrl) => {
if reached_max_block {
// Terminate the sync early if it's reached the maximum user
// configured block.
return Some(Ok(()))
}
/// Updates the internal sync state depending on the pipeline configuration,
/// the outcome of the pipeline run and the last observed forkchoice state.
fn on_pipeline_outcome(&mut self, ctrl: ControlFlow) -> RethResult<()> {
// Pipeline unwound, memorize the invalid block and
// wait for CL for further sync instructions.
if let ControlFlow::Unwind { bad_block, .. } = ctrl {
warn!(target: "consensus::engine", invalid_hash=?bad_block.hash(), invalid_number=?bad_block.number, "Bad block detected in unwind");
// update the `invalid_headers` cache with the new invalid header
self.invalid_headers.insert(*bad_block);
return Ok(())
}
if let ControlFlow::Unwind { bad_block, .. } = ctrl {
warn!(target: "consensus::engine", invalid_hash=?bad_block.hash(), invalid_number=?bad_block.number, "Bad block detected in unwind");
// update the canon chain if continuous is enabled
if self.sync.run_pipeline_continuously() {
let max_block = ctrl.block_number().unwrap_or_default();
let max_header = self.blockchain.sealed_header(max_block)
.inspect_err(|error| {
error!(target: "consensus::engine", %error, "Error getting canonical header for continuous sync");
})?
.ok_or_else(|| ProviderError::HeaderNotFound(max_block.into()))?;
self.blockchain.set_canonical_head(max_header);
}
// update the `invalid_headers` cache with the new invalid headers
self.invalid_headers.insert(*bad_block);
return None
}
// update the canon chain if continuous is enabled
if self.sync.run_pipeline_continuously() {
let max_block = ctrl.block_number().unwrap_or_default();
let max_header = match self.blockchain.sealed_header(max_block) {
Ok(header) => match header {
Some(header) => header,
None => {
return Some(Err(RethError::Provider(
ProviderError::HeaderNotFound(max_block.into()),
)
.into()))
}
},
Err(error) => {
error!(target: "consensus::engine", %error, "Error getting canonical header for continuous sync");
return Some(Err(RethError::Provider(error).into()))
}
};
self.blockchain.set_canonical_head(max_header);
}
let sync_target_state = match self.forkchoice_state_tracker.sync_target_state() {
Some(current_state) => current_state,
None => {
// This is only possible if the node was run with `debug.tip`
// argument and without CL.
warn!(target: "consensus::engine", "No fork choice state available");
return None
}
};
// Next, we check if we need to schedule another pipeline run or transition
// to live sync via tree.
// This can arise if we buffer the forkchoice head, and if the head is a
// descendant of an invalid block.
//
// * The forkchoice head could be buffered if it were first sent as a `newPayload`
// request.
//
// In this case, we won't have the head hash in the database, so we would
// set the pipeline sync target to a known-invalid head.
//
// This is why we check the invalid header cache here.
let lowest_buffered_ancestor =
self.lowest_buffered_ancestor_or(sync_target_state.head_block_hash);
// this inserts the head if the lowest buffered ancestor is invalid
if self
.check_invalid_ancestor_with_head(
lowest_buffered_ancestor,
sync_target_state.head_block_hash,
)
.is_none()
{
// get the block number of the finalized block, if we have it
let newest_finalized = self
.forkchoice_state_tracker
.sync_target_state()
.map(|s| s.finalized_block_hash)
.and_then(|h| self.blockchain.buffered_header_by_hash(h))
.map(|header| header.number);
// The block number that the pipeline finished at - if the progress or newest
// finalized is None then we can't check the distance anyways.
//
// If both are Some, we perform another distance check and return the desired
// pipeline target
let pipeline_target = if let (Some(progress), Some(finalized_number)) =
(ctrl.block_number(), newest_finalized)
{
// Determines whether or not we should run the pipeline again, in case the
// new gap is large enough to warrant running the pipeline.
self.can_pipeline_sync_to_finalized(progress, finalized_number, None)
} else {
None
};
// If the distance is large enough, we should run the pipeline again to prevent
// the tree update from executing too many blocks and blocking.
if let Some(target) = pipeline_target {
// run the pipeline to the target since the distance is sufficient
self.sync.set_pipeline_sync_target(target);
} else {
// Update the state and hashes of the blockchain tree if possible.
match self.update_tree_on_finished_pipeline(
sync_target_state.finalized_block_hash,
) {
Ok(synced) => {
if !synced {
// We don't have the finalized block in the database, so
// we need to run another pipeline.
self.sync.set_pipeline_sync_target(
sync_target_state.finalized_block_hash,
);
}
}
Err(error) => {
error!(target: "consensus::engine", %error, "Error restoring blockchain tree state");
return Some(Err(error.into()))
}
};
}
}
let sync_target_state = match self.forkchoice_state_tracker.sync_target_state() {
Some(current_state) => current_state,
None => {
// This is only possible if the node was run with `debug.tip`
// argument and without CL.
warn!(target: "consensus::engine", "No fork choice state available");
return Ok(())
}
// Any pipeline error at this point is fatal.
Err(error) => return Some(Err(error.into())),
};
None
// Next, we check if we need to schedule another pipeline run or transition
// to live sync via tree.
// This can arise if we buffer the forkchoice head, and if the head is a
// descendant of an invalid block.
//
// * The forkchoice head could be buffered if it were first sent as a `newPayload` request.
//
// In this case, we won't have the head hash in the database, so we would
// set the pipeline sync target to a known-invalid head.
//
// This is why we check the invalid header cache here.
let lowest_buffered_ancestor =
self.lowest_buffered_ancestor_or(sync_target_state.head_block_hash);
// this inserts the head into invalid headers cache
// if the lowest buffered ancestor is invalid
if self
.check_invalid_ancestor_with_head(
lowest_buffered_ancestor,
sync_target_state.head_block_hash,
)
.is_some()
{
warn!(
target: "consensus::engine",
invalid_ancestor = %lowest_buffered_ancestor,
head = %sync_target_state.head_block_hash,
"Current head has an invalid ancestor"
);
return Ok(())
}
// get the block number of the finalized block, if we have it
let newest_finalized = self
.blockchain
.buffered_header_by_hash(sync_target_state.finalized_block_hash)
.map(|header| header.number);
// The block number that the pipeline finished at - if the progress or newest
// finalized is None then we can't check the distance anyways.
//
// If both are Some, we perform another distance check and return the desired
// pipeline target
let pipeline_target =
ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
// Determines whether or not we should run the pipeline again, in case
// the new gap is large enough to warrant
// running the pipeline.
self.can_pipeline_sync_to_finalized(progress, finalized_number, None)
});
// If the distance is large enough, we should run the pipeline again to prevent
// the tree update from executing too many blocks and blocking.
if let Some(target) = pipeline_target {
// run the pipeline to the target since the distance is sufficient
self.sync.set_pipeline_sync_target(target);
} else {
// Update the state and hashes of the blockchain tree if possible.
let synced = self.update_tree_on_finished_pipeline(sync_target_state.finalized_block_hash).inspect_err(|error| {
error!(target: "consensus::engine", %error, "Error restoring blockchain tree state");
})?;
if !synced {
// We don't have the finalized block in the database, so
// we need to run another pipeline.
self.sync.set_pipeline_sync_target(sync_target_state.finalized_block_hash);
}
}
Ok(())
}
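The error paths in `on_pipeline_outcome` now log and propagate in a single expression via `inspect_err(..)?` and `ok_or_else(..)?`, replacing the explicit match on the provider result. A small sketch of that pattern, with a made-up `fetch_header` standing in for `blockchain.sealed_header(..)`:

// Sketch of the inspect_err + `?` pattern; fetch_header is a made-up stand-in
// for blockchain.sealed_header(..).
#[derive(Debug)]
struct ProviderError(String);

fn fetch_header(number: u64) -> Result<Option<String>, ProviderError> {
    if number == 0 {
        return Err(ProviderError("database unavailable".into()));
    }
    Ok(Some(format!("header #{number}")))
}

fn canonical_header(number: u64) -> Result<String, ProviderError> {
    // Log on the error path without interrupting `?` propagation, then turn a
    // missing header into an error as well.
    let header = fetch_header(number)
        .inspect_err(|error| eprintln!("error getting canonical header: {error:?}"))?
        .ok_or_else(|| ProviderError(format!("header {number} not found")))?;
    Ok(header)
}

fn main() {
    assert!(canonical_header(5).is_ok());
    assert!(canonical_header(0).is_err());
}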
fn on_hook_result(&self, polled_hook: PolledHook) -> Result<(), BeaconConsensusEngineError> {
@@ -1847,18 +1813,17 @@ where
}
// process sync events if any
match this.sync.poll(cx) {
Poll::Ready(sync_event) => {
if let Some(res) = this.on_sync_event(sync_event) {
return Poll::Ready(res)
}
// this could have taken a while, so we start the next cycle to handle any new
// engine messages
continue 'main
}
Poll::Pending => {
// no more sync events to process
if let Poll::Ready(sync_event) = this.sync.poll(cx) {
match this.on_sync_event(sync_event)? {
// Sync event was successfully processed
SyncEventOutcome::Processed => (),
// Max block has been reached, exit the engine loop
SyncEventOutcome::ReachedMaxBlock => return Poll::Ready(Ok(())),
}
// this could have taken a while, so we start the next cycle to handle any new
// engine messages
continue 'main
}
// at this point, all engine messages and sync events are fully drained
@@ -1898,6 +1863,15 @@ enum OnForkchoiceUpdateOutcome {
Fatal(BlockExecutionError),
}
/// Represents outcomes of processing a sync event
#[derive(Debug)]
enum SyncEventOutcome {
/// Sync event was processed successfully, engine should continue.
Processed,
/// Sync event was processed successfully and reached max block.
ReachedMaxBlock,
}
#[cfg(test)]
mod tests {
use super::*;
@@ -1911,7 +1885,7 @@ mod tests {
use reth_provider::{BlockWriter, ProviderFactory};
use reth_rpc_types::engine::{ForkchoiceState, ForkchoiceUpdated, PayloadStatus};
use reth_rpc_types_compat::engine::payload::try_block_to_payload_v1;
use reth_stages::{ExecOutput, StageError};
use reth_stages::{ExecOutput, PipelineError, StageError};
use std::{collections::VecDeque, sync::Arc};
use tokio::sync::oneshot::error::TryRecvError;