feat: improve block downloads (#2941)

This commit is contained in:
Matthias Seitz
2023-06-02 16:28:03 +02:00
committed by GitHub
parent f7a35f09b5
commit 7d14525307
6 changed files with 210 additions and 68 deletions

View File

@ -37,7 +37,25 @@ impl ForkchoiceStateTracker {
self.last_valid = Some(state);
}
/// Returns the [ForkchoiceStatus] of the latest received FCU.
pub(crate) fn latest_status(&self) -> Option<ForkchoiceStatus> {
self.latest.as_ref().map(|s| s.status)
}
/// Returns whether the latest received FCU is valid: [ForkchoiceStatus::Valid]
#[allow(unused)]
pub(crate) fn is_latest_valid(&self) -> bool {
self.latest_status().map(|s| s.is_valid()).unwrap_or(false)
}
/// Returns the last valid head hash.
#[allow(unused)]
pub(crate) fn last_valid_head(&self) -> Option<H256> {
self.last_valid.as_ref().map(|s| s.head_block_hash)
}
/// Returns the head hash of the latest received FCU to which we need to sync.
#[allow(unused)]
pub(crate) fn sync_target(&self) -> Option<H256> {
self.last_syncing.as_ref().map(|s| s.head_block_hash)
}

View File

@ -7,7 +7,7 @@ use reth_interfaces::consensus::ForkchoiceState;
use reth_payload_builder::error::PayloadBuilderError;
use reth_rpc_types::engine::{
ExecutionPayload, ForkChoiceUpdateResult, ForkchoiceUpdateError, ForkchoiceUpdated,
PayloadAttributes, PayloadId, PayloadStatus,
PayloadAttributes, PayloadId, PayloadStatus, PayloadStatusEnum,
};
use std::{
future::Future,
@ -44,6 +44,15 @@ impl OnForkChoiceUpdated {
self.forkchoice_status
}
/// Creates a new instance of `OnForkChoiceUpdated` for the `SYNCING` state
pub(crate) fn syncing() -> Self {
let status = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
Self {
forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
}
}
/// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update succeeded and no
/// payload attributes were provided.
pub(crate) fn valid(status: PayloadStatus) -> Self {

View File

@ -135,15 +135,65 @@ impl BeaconConsensusEngineHandle {
/// The beacon consensus engine is the driver that switches between historical and live sync.
///
/// The beacon consensus engine is itself driven by messages from the Consensus Layer, which are
/// received by Engine API.
/// received by Engine API (JSON-RPC).
///
/// The consensus engine is idle until it receives the first
/// [BeaconEngineMessage::ForkchoiceUpdated] message from the CL which would initiate the sync. At
/// first, the consensus engine would run the [Pipeline] until the latest known block hash.
/// Afterwards, it would attempt to create/restore the [`BlockchainTreeEngine`] from the blocks
/// Afterward, it would attempt to create/restore the [`BlockchainTreeEngine`] from the blocks
/// that are currently available. In case the restoration is successful, the consensus engine would
/// run in a live sync mode, which mean it would solemnly rely on the messages from Engine API to
/// construct the chain forward.
/// run in a live sync mode, populating the [`BlockchainTreeEngine`] with new blocks as they arrive
/// via engine API and downloading any missing blocks from the network to fill potential gaps.
///
/// The consensus engine has two data input sources:
///
/// ## New Payload (`engine_newPayloadV{}`)
///
/// The engine receives new payloads from the CL. If the payload is connected to the canonical
/// chain, it will be fully validated added to a chain in the [BlockchainTreeEngine]: `VALID`
///
/// If the payload's chain is disconnected (at least 1 block is missing) then it will be buffered:
/// `SYNCING` ([BlockStatus::Disconnected]).
///
/// ## Forkchoice Update (FCU) (`engine_forkchoiceUpdatedV{}`)
///
/// This contains the latest forkchoice state and the payload attributes. The engine will attempt to
/// make a new canonical chain based on the `head_hash` of the update and trigger payload building
/// if the `payload_attrs` are present and the FCU is `VALID`.
///
/// The `head_hash` forms a chain by walking backwards from the `head_hash` towards the canonical
/// blocks of the chain.
///
/// Making a new canonical chain can result in the following relevant outcomes:
///
/// ### The chain is connected
///
/// All blocks of the `head_hash`'s chain are present in the [BlockchainTreeEngine] and are
/// committed to the canonical chain. This also includes reorgs.
///
/// ### The chain is disconnected
///
/// In this case the [BlockchainTreeEngine] doesn't know how the new chain connects to the existing
/// canonical chain. It could be a simple commit (new blocks extend the current head) or a re-org
/// that requires unwinding the canonical chain.
///
/// This further distinguishes between two variants:
///
/// #### `head_hash`'s block exists
///
/// The `head_hash`'s block was already received/downloaded, but at least one block is missing to
/// form a _connected_ chain. The engine will attempt to download the missing blocks from the
/// network by walking backwards (`parent_hash`), and then try to make the block canonical as soon
/// as the chain becomes connected.
///
/// However, it still can be the case that the chain and the FCU is `INVALID`.
///
/// #### `head_hash` block is missing
///
/// This is similar to the previous case, but the `head_hash`'s block is missing. At which point the
/// engine doesn't know where the new head will point to: new chain could be a re-org or a simple
/// commit. The engine will download the missing head first and then proceed as in the previous
/// case.
///
/// # Panics
///
@ -408,6 +458,7 @@ where
tx: oneshot::Sender<Result<OnForkChoiceUpdated, reth_interfaces::Error>>,
) -> bool {
self.metrics.forkchoice_updated_messages.increment(1);
self.blockchain.on_forkchoice_update_received(&state);
let on_updated = match self.forkchoice_updated(state, attrs) {
Ok(response) => response,
@ -423,9 +474,15 @@ where
let is_valid_response = on_updated.is_valid_update();
let _ = tx.send(Ok(on_updated));
// notify listeners about new processed FCU
self.listeners.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state));
// Terminate the sync early if it's reached the maximum user
// configured block.
if is_valid_response {
// new VALID update that moved the canonical chain forward
let _ = self.update_canon_chain(&state);
let tip_number = self.blockchain.canonical_tip().number;
if self.sync.has_reached_max_block(tip_number) {
return true
@ -458,52 +515,39 @@ where
return Ok(OnForkChoiceUpdated::with_invalid(status))
}
// TODO: check PoW / EIP-3675 terminal block conditions for the fork choice head
// TODO: ensure validity of the payload (is this satisfied already?)
let status = if self.sync.is_pipeline_idle() {
if self.sync.is_pipeline_active() {
// We can only process new forkchoice updates if the pipeline is idle, since it requires
// exclusive access to the database
match self.blockchain.make_canonical(&state.head_block_hash) {
Ok(outcome) => {
let header = outcome.into_header();
debug!(target: "consensus::engine", hash=?state.head_block_hash, number=header.number, "canonicalized new head");
if let Some(attrs) = attrs {
let payload_response =
self.process_payload_attributes(attrs, header.unseal(), state);
if payload_response.is_valid_update() {
// we will return VALID, so let's make sure the info tracker is
// properly updated
self.update_canon_chain(&state)?;
}
self.listeners.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state));
trace!(target: "consensus::engine", status = ?payload_response, ?state, "Returning forkchoice status ");
return Ok(payload_response)
}
// we will return VALID, so let's make sure the info tracker is
// properly updated
self.update_canon_chain(&state)?;
PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash))
}
Err(error) => {
if let Error::Execution(ref err) = error {
if err.is_fatal() {
tracing::error!(target: "consensus::engine", ?err, "Encountered fatal error");
return Err(error)
}
}
self.on_failed_canonical_forkchoice_update(&state, error)
}
}
} else {
trace!(target: "consensus::engine", "Pipeline is syncing, skipping forkchoice update");
PayloadStatus::from_status(PayloadStatusEnum::Syncing)
};
return Ok(OnForkChoiceUpdated::syncing())
}
self.listeners.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state));
let status = match self.blockchain.make_canonical(&state.head_block_hash) {
Ok(outcome) => {
let header = outcome.into_header();
debug!(target: "consensus::engine", hash=?state.head_block_hash, number=header.number, "canonicalized new head");
if let Some(attrs) = attrs {
// the CL requested to build a new payload on top of this new VALID head
let payload_response =
self.process_payload_attributes(attrs, header.unseal(), state);
trace!(target: "consensus::engine", status = ?payload_response, ?state, "Returning forkchoice status ");
return Ok(payload_response)
}
PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash))
}
Err(error) => {
if let Error::Execution(ref err) = error {
if err.is_fatal() {
tracing::error!(target: "consensus::engine", ?err, "Encountered fatal error");
return Err(error)
}
}
self.on_failed_canonical_forkchoice_update(&state, error)
}
};
trace!(target: "consensus::engine", ?status, ?state, "Returning forkchoice status");
Ok(OnForkChoiceUpdated::valid(status))
@ -554,7 +598,6 @@ where
total_difficulty: head_td,
});
self.blockchain.set_canonical_head(head);
self.blockchain.on_forkchoice_update_received(update);
Ok(())
}
@ -894,6 +937,90 @@ where
Ok(())
}
/// Invoked if we successfully downloaded a new block from the network.
///
/// This will attempt to insert the block into the tree.
///
/// There are several scenarios:
///
/// ## [BlockStatus::Valid]
///
/// The block is connected to the current canonical head and is valid.
/// If the engine is still SYNCING, then we can try again to make the chain canonical.
///
/// ## [BlockStatus::Accepted]
///
/// All ancestors are known, but the block is not connected to the current canonical _head_. If
/// the block is an ancestor of the current forkchoice head, then we can try again to make the
/// chain canonical, which would trigger a reorg in this case since the new head is therefore
/// not connected to the current head.
///
/// ## [BlockStatus::Disconnected]
///
/// The block is not connected to the canonical head, and we need to download the missing parent
/// first.
///
/// ## Insert Error
///
/// If the insertion into the tree failed, then the block was well-formed (valid hash), but its
/// chain is invalid, which means the FCU that triggered the download is invalid. Here we can
/// stop because there's nothing to do here and the engine needs to wait for another FCU.
fn on_downloaded_block(&mut self, block: SealedBlock) {
trace!(target: "consensus::engine", hash=?block.hash, number=%block.number, "Downloaded full block");
// check if the block's parent is already marked as invalid
if self.check_invalid_ancestor_with_head(block.parent_hash, block.hash).is_some() {
// can skip this invalid block
return
}
match self.blockchain.insert_block_without_senders(block) {
Ok(status) => {
match status {
BlockStatus::Valid => {
// block is connected to the current canonical head and is valid.
self.try_make_sync_target_canonical();
}
BlockStatus::Accepted => {
// block is connected to the canonical chain, but not the current head
self.try_make_sync_target_canonical();
}
BlockStatus::Disconnected { missing_parent } => {
// continue downloading the missing parent
self.sync.download_full_block(missing_parent.hash);
}
}
}
Err(err) => {
debug!(target: "consensus::engine", ?err, "Failed to insert downloaded block");
}
}
}
/// Attempt to form a new canonical chain based on the current sync target.
///
/// This is invoked when we successfully downloaded a new block from the network which resulted
/// in either [BlockStatus::Accepted] or [BlockStatus::Valid].
///
/// Note: This will not succeed if the sync target has changed since the block download request
/// was issued and the new target is still disconnected and additional missing blocks are
/// downloaded
fn try_make_sync_target_canonical(&mut self) {
if let Some(target) = self.forkchoice_state_tracker.sync_target_state() {
// optimistically try to make the chain canonical, the sync target might have changed
// since the block download request was issued (new FCU received)
if let Ok(outcome) = self.blockchain.make_canonical(&target.head_block_hash) {
let new_head = outcome.into_header();
debug!(target: "consensus::engine", hash=?new_head.hash, number=new_head.number, "canonicalized new head");
// we're no longer syncing
self.sync_state_updater.update_sync_state(SyncState::Idle);
// we can update the FCU blocks
let _ = self.update_canon_chain(&target);
}
}
}
/// Event handler for events emitted by the [EngineSyncController].
///
/// This returns a result to indicate whether the engine future should resolve (fatal error).
@ -903,24 +1030,7 @@ where
) -> Option<Result<(), BeaconConsensusEngineError>> {
match ev {
EngineSyncEvent::FetchedFullBlock(block) => {
trace!(target: "consensus::engine", hash=?block.hash, "Fetched full block");
// it is guaranteed that the pipeline is not active at this point.
// TODO(mattsse): better error handling and start closing the gap if there's any by
// closing the gap either via pipeline, or by fetching the blocks via block number
// [head..FCU.number]
if self
.try_insert_new_payload(block)
.map(|status| status.is_valid())
.unwrap_or_default()
{
// payload is valid
self.sync_state_updater.update_sync_state(SyncState::Idle);
} else if let Some(target) = self.forkchoice_state_tracker.sync_target() {
// if the payload is invalid, we run the pipeline to the head block.
self.sync.set_pipeline_sync_target(target);
}
self.on_downloaded_block(block);
}
EngineSyncEvent::PipelineStarted(target) => {
trace!(target: "consensus::engine", ?target, continuous = target.is_none(), "Started the pipeline");

View File

@ -100,6 +100,11 @@ where
self.pipeline_state.is_idle()
}
/// Returns `true` if the pipeline is active.
pub(crate) fn is_pipeline_active(&self) -> bool {
!self.is_pipeline_idle()
}
/// Returns true if there's already a request for the given hash.
pub(crate) fn is_inflight_request(&self, hash: H256) -> bool {
self.inflight_full_block_requests.iter().any(|req| *req.hash() == hash)