From e34cdc4e0db93f3ddec205c02d037242ad88dde5 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 12 May 2023 15:02:45 +0200 Subject: [PATCH] feat: extend engine syncing with single block downloads (#2626) --- bin/reth/src/node/mod.rs | 7 +- crates/blockchain-tree/src/shareable.rs | 3 +- crates/consensus/beacon/src/engine/mod.rs | 415 ++++++++---------- .../beacon/src/engine/pipeline_state.rs | 26 -- crates/consensus/beacon/src/engine/sync.rs | 266 +++++++++++ crates/interfaces/src/blockchain_tree.rs | 11 +- crates/interfaces/src/p2p/full_block.rs | 5 + .../interfaces/src/test_utils/full_block.rs | 45 ++ crates/interfaces/src/test_utils/mod.rs | 2 + .../{test_client.rs => bodies_client.rs} | 0 crates/net/downloaders/src/test_utils/mod.rs | 16 +- crates/stages/src/pipeline/ctrl.rs | 3 + crates/stages/src/pipeline/mod.rs | 6 +- 13 files changed, 527 insertions(+), 278 deletions(-) delete mode 100644 crates/consensus/beacon/src/engine/pipeline_state.rs create mode 100644 crates/consensus/beacon/src/engine/sync.rs create mode 100644 crates/interfaces/src/test_utils/full_block.rs rename crates/net/downloaders/src/test_utils/{test_client.rs => bodies_client.rs} (100%) diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 4e69b3b5a..db9257b78 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -240,6 +240,7 @@ impl Command { .await?; info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID"); + let network_client = network.fetch_client().await?; let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); @@ -310,11 +311,10 @@ impl Command { pipeline } else { - let client = network.fetch_client().await?; self.build_networked_pipeline( &mut config, network.clone(), - client, + network_client.clone(), Arc::clone(&consensus), db.clone(), &ctx.task_executor, @@ -339,9 +339,10 @@ impl Command { let pipeline_events = pipeline.events(); let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel( Arc::clone(&db), - ctx.task_executor.clone(), + network_client, pipeline, blockchain_db.clone(), + Box::new(ctx.task_executor.clone()), self.debug.max_block, self.debug.continuous, payload_builder.clone(), diff --git a/crates/blockchain-tree/src/shareable.rs b/crates/blockchain-tree/src/shareable.rs index 4cb792d1e..f7a02ec87 100644 --- a/crates/blockchain-tree/src/shareable.rs +++ b/crates/blockchain-tree/src/shareable.rs @@ -95,8 +95,7 @@ impl BlockchainTreeViewer self.tree.read().block_indices().canonical_chain().inner().clone() } - fn find_canonical_ancestor(&self, hash: BlockHash) -> Option { - let mut parent = hash; + fn find_canonical_ancestor(&self, mut parent: BlockHash) -> Option { let tree = self.tree.read(); // walk up the tree and check if the parent is in the sidechain diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 2de37eeb6..b5a3d2ca9 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -1,10 +1,14 @@ -use crate::engine::{message::OnForkChoiceUpdated, metrics::Metrics}; -use futures::{Future, FutureExt, StreamExt, TryFutureExt}; +use crate::{ + engine::{message::OnForkChoiceUpdated, metrics::Metrics}, + sync::{EngineSyncController, EngineSyncEvent}, +}; +use futures::{Future, StreamExt, TryFutureExt}; use reth_db::{database::Database, tables, transaction::DbTx}; use 
reth_interfaces::{ blockchain_tree::{BlockStatus, BlockchainTreeEngine}, consensus::ForkchoiceState, executor::Error as ExecutorError, + p2p::{bodies::client::BodiesClient, headers::client::HeadersClient}, Error, }; use reth_payload_builder::{PayloadBuilderAttributes, PayloadBuilderHandle}; @@ -39,11 +43,9 @@ pub use error::{BeaconEngineError, BeaconEngineResult, BeaconForkChoiceUpdateErr mod metrics; -mod pipeline_state; - -pub use pipeline_state::PipelineState; - mod event; +pub(crate) mod sync; + pub use event::BeaconConsensusEngineEvent; /// The maximum number of invalid headers that can be tracked by the engine. @@ -134,20 +136,16 @@ impl BeaconConsensusEngineHandle { /// /// If the future is polled more than once. Leads to undefined state. #[must_use = "Future does nothing unless polled"] -pub struct BeaconConsensusEngine +pub struct BeaconConsensusEngine where DB: Database, - TS: TaskSpawner, + Client: HeadersClient + BodiesClient, BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker, { /// The database handle. db: DB, - /// Task spawner for spawning the pipeline. - task_spawner: TS, - /// The current state of the pipeline. - /// Must always be [Some] unless the state is being reevaluated. - /// The pipeline is used for historical sync by setting the current forkchoice head. - pipeline_state: Option>, + /// Controls syncing triggered by engine updates. + sync: EngineSyncController, /// The type we can use to query both the database and the blockchain tree. blockchain: BT, /// The Engine API message receiver. @@ -157,14 +155,6 @@ where /// Current forkchoice state. The engine must receive the initial state in order to start /// syncing. forkchoice_state: Option, - /// Next action that the engine should take after the pipeline finished running. - next_action: BeaconEngineAction, - /// Max block after which the consensus engine would terminate the sync. Used for debugging - /// purposes. - max_block: Option, - /// If true, the engine will run the pipeline continuously, regardless of whether or not there - /// is a new fork choice state. - continuous: bool, /// The payload store. payload_builder: PayloadBuilderHandle, /// Listeners for engine events. @@ -176,30 +166,33 @@ where metrics: Metrics, } -impl BeaconConsensusEngine +impl BeaconConsensusEngine where DB: Database + Unpin + 'static, - TS: TaskSpawner, BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker + 'static, + Client: HeadersClient + BodiesClient + Clone + Unpin + 'static, { /// Create a new instance of the [BeaconConsensusEngine]. 
+ #[allow(clippy::too_many_arguments)] pub fn new( db: DB, - task_spawner: TS, + client: Client, pipeline: Pipeline, blockchain: BT, + task_spawner: Box, max_block: Option, - continuous: bool, + run_pipeline_continuously: bool, payload_builder: PayloadBuilderHandle, ) -> (Self, BeaconConsensusEngineHandle) { let (to_engine, rx) = mpsc::unbounded_channel(); Self::with_channel( db, - task_spawner, + client, pipeline, blockchain, + task_spawner, max_block, - continuous, + run_pipeline_continuously, payload_builder, to_engine, rx, @@ -211,27 +204,31 @@ where #[allow(clippy::too_many_arguments)] pub fn with_channel( db: DB, - task_spawner: TS, + client: Client, pipeline: Pipeline, blockchain: BT, + task_spawner: Box, max_block: Option, - continuous: bool, + run_pipeline_continuously: bool, payload_builder: PayloadBuilderHandle, to_engine: UnboundedSender, rx: UnboundedReceiver, ) -> (Self, BeaconConsensusEngineHandle) { let handle = BeaconConsensusEngineHandle { to_engine }; + let sync = EngineSyncController::new( + pipeline, + client, + task_spawner, + run_pipeline_continuously, + max_block, + ); let this = Self { db, - task_spawner, - pipeline_state: Some(PipelineState::Idle(pipeline)), + sync, blockchain, engine_message_rx: UnboundedReceiverStream::new(rx), handle: handle.clone(), forkchoice_state: None, - next_action: BeaconEngineAction::None, - max_block, - continuous, payload_builder, listeners: EventListeners::default(), invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS), @@ -249,17 +246,6 @@ where self.handle.clone() } - /// Returns `true` if the pipeline is currently idle. - fn is_pipeline_idle(&self) -> bool { - self.pipeline_state.as_ref().expect("pipeline state is set").is_idle() - } - - /// Set next action to [BeaconEngineAction::RunPipeline] to indicate that - /// consensus engine needs to run the pipeline as soon as it becomes available. - fn require_pipeline_run(&mut self, target: PipelineTarget) { - self.next_action = BeaconEngineAction::RunPipeline(target); - } - /// If validation fails, the response MUST contain the latest valid hash: /// /// - The block hash of the ancestor of the invalid payload satisfying the following two @@ -280,6 +266,10 @@ where return Some(H256::zero()) } + // TODO(mattsse): This could be invoked on new payload which does not make tree canonical, + // which would make this inaccurate, e.g. 
if an invalid payload is received in this + // scenario: FUC (unknown head) -> valid payload -> invalid payload + self.blockchain.find_canonical_ancestor(parent_hash) } @@ -334,7 +324,10 @@ where let is_first_forkchoice = self.forkchoice_state.is_none(); self.forkchoice_state = Some(state); - let status = if self.is_pipeline_idle() { + + let status = if self.sync.is_pipeline_idle() { + // We can only process new forkchoice updates if the pipeline is idle, since it requires + // exclusive access to the database match self.blockchain.make_canonical(&state.head_block_hash) { Ok(_) => { let head_block_number = self @@ -347,7 +340,9 @@ where if pipeline_min_progress < head_block_number { debug!(target: "consensus::engine", last_finished=pipeline_min_progress, head_number=head_block_number, "pipeline run to head required"); - self.require_pipeline_run(PipelineTarget::Head); + + // TODO(mattsse) ideally sync blockwise + self.sync.set_pipeline_sync_target(state.head_block_hash); } if let Some(attrs) = attrs { @@ -441,6 +436,7 @@ where error: Error, is_first_forkchoice: bool, ) -> PayloadStatus { + debug_assert!(self.sync.is_pipeline_idle(), "pipeline must be idle"); warn!(target: "consensus::engine", ?error, ?state, "Error canonicalizing the head hash"); // check if the new head was previously invalidated, if so then we deem this FCU @@ -450,26 +446,42 @@ where return invalid_ancestor } - // If this is the first forkchoice received, start downloading from safe block - // hash, if we have that block. - let target = if is_first_forkchoice && - !state.safe_block_hash.is_zero() && - self.get_block_number(state.safe_block_hash).ok().flatten().is_none() - { - PipelineTarget::Safe - } else { - PipelineTarget::Head - }; - self.require_pipeline_run(target); - match error { + #[allow(clippy::single_match)] + match &error { Error::Execution(error @ ExecutorError::BlockPreMerge { .. }) => { - PayloadStatus::from_status(PayloadStatusEnum::Invalid { + return PayloadStatus::from_status(PayloadStatusEnum::Invalid { validation_error: error.to_string(), }) .with_latest_valid_hash(H256::zero()) } - _ => PayloadStatus::from_status(PayloadStatusEnum::Syncing), + _ => { + // TODO(mattsse) better error handling before attempting to sync (FCU could be + // invalid): only trigger sync if we can't determine whether the FCU is invalid + } } + + // we assume the FCU is valid and at least the head is missing, so we need to start syncing + // to it + + // if this is the first FCU we received from the beacon node, then we start triggering the + // pipeline + if is_first_forkchoice { + // find the appropriate target to sync to, if we don't have the safe block hash then we + // start syncing to the safe block via pipeline first + let target = if !state.safe_block_hash.is_zero() && + self.get_block_number(state.safe_block_hash).ok().flatten().is_none() + { + state.safe_block_hash + } else { + state.head_block_hash + }; + self.sync.set_pipeline_sync_target(target); + } else { + // trigger a full block download for the _missing_ new head + self.sync.download_full_block(state.head_block_hash) + } + + PayloadStatus::from_status(PayloadStatusEnum::Syncing) } /// Validates the payload attributes with respect to the header and fork choice state. 
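Note on the constructor change earlier in this file's diff: the engine now takes the network client and a boxed task spawner in place of the old generic `TS: TaskSpawner` parameter, because the pipeline and the single-block downloads are both driven by the new sync controller. A minimal wiring sketch follows; variable names are borrowed from the bin/reth changes above (`max_block` and `continuous` stand in for `self.debug.max_block` and `self.debug.continuous`), and all surrounding setup is elided, so treat this as an illustration rather than the literal node code:

    let (engine, handle) = BeaconConsensusEngine::new(
        db.clone(),                          // database handle
        network_client.clone(),              // HeadersClient + BodiesClient from network.fetch_client()
        pipeline,                            // staged-sync pipeline, now owned by the sync controller
        blockchain_db.clone(),               // blockchain tree / provider
        Box::new(ctx.task_executor.clone()), // TaskSpawner that runs the pipeline task
        max_block,                           // Option<BlockNumber>, debug-only cut-off
        continuous,                          // whether to run the pipeline continuously when idle
        payload_builder.clone(),
    );

The `with_channel` variant used by the node binary takes the same arguments plus the engine message channel ends.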
@@ -555,7 +567,9 @@ where let header = block.header.clone(); - let status = if self.is_pipeline_idle() { + let status = if self.sync.is_pipeline_idle() { + // we can only insert new payloads if the pipeline is _not_ running, because it holds + // exclusive access to the database match self.blockchain.insert_block_without_senders(block) { Ok(status) => { let mut latest_valid_hash = None; @@ -603,43 +617,6 @@ where status } - /// Returns the next pipeline state depending on the current value of the next action. - /// Resets the next action to the default value. - fn next_pipeline_state( - &mut self, - pipeline: Pipeline, - forkchoice_state: ForkchoiceState, - ) -> PipelineState { - let next_action = std::mem::take(&mut self.next_action); - - let (tip, should_run_pipeline) = match next_action { - BeaconEngineAction::RunPipeline(target) => { - let tip = match target { - PipelineTarget::Head => forkchoice_state.head_block_hash, - PipelineTarget::Safe => forkchoice_state.safe_block_hash, - }; - (Some(tip), true) - } - BeaconEngineAction::None => (None, self.continuous), - }; - - if should_run_pipeline { - self.metrics.pipeline_runs.increment(1); - trace!(target: "consensus::engine", ?tip, continuous = tip.is_none(), "Starting the pipeline"); - let (tx, rx) = oneshot::channel(); - self.task_spawner.spawn_critical_blocking( - "pipeline", - Box::pin(async move { - let result = pipeline.run_as_fut(tip).await; - let _ = tx.send(result); - }), - ); - PipelineState::Running(rx) - } else { - PipelineState::Idle(pipeline) - } - } - /// Attempt to restore the tree with the finalized block number. /// If the finalized block is missing from the database, trigger the pipeline run. fn restore_tree_if_possible( @@ -658,31 +635,73 @@ where } None => true, }; + if needs_pipeline_run { - self.require_pipeline_run(PipelineTarget::Head); + self.sync.set_pipeline_sync_target(state.head_block_hash); } Ok(()) } - /// Check if the engine reached max block as specified by `max_block` parameter. - fn has_reached_max_block(&self, progress: BlockNumber) -> bool { - if self.max_block.map_or(false, |target| progress >= target) { - trace!( - target: "consensus::engine", - ?progress, - max_block = ?self.max_block, - "Consensus engine reached max block." - ); - true - } else { - false - } - } - /// Retrieve the block number for the given block hash. fn get_block_number(&self, hash: H256) -> Result, reth_interfaces::Error> { Ok(self.db.view(|tx| tx.get::(hash))??) } + + /// Event handler for events emitted by the [EngineSyncController]. + /// + /// This returns a result to indicate whether the engine future should resolve (fatal error). + fn on_sync_event( + &mut self, + ev: EngineSyncEvent, + current_state: &ForkchoiceState, + ) -> Option> { + match ev { + EngineSyncEvent::FetchedFullBlock(block) => { + // it is guaranteed that the pipeline is not active at this point. 
+ + // TODO(mattsse): better error handling and start closing the gap if there's any by + // closing the gap either via pipeline, or by fetching the blocks via block number + // [head..FCU.number] + + let hash = block.hash; + if !self.on_new_payload(block.into()).is_valid() { + // if the payload is invalid we run the pipeline + self.sync.set_pipeline_sync_target(hash); + } + } + EngineSyncEvent::PipelineStarted(target) => { + trace!(target: "consensus::engine", ?target, continuous = target.is_none(), "Started the pipeline"); + self.metrics.pipeline_runs.increment(1); + } + EngineSyncEvent::PipelineTaskDropped => { + error!(target: "consensus::engine", "Failed to receive spawned pipeline"); + return Some(Err(BeaconEngineError::PipelineChannelClosed)) + } + EngineSyncEvent::PipelineFinished { result, reached_max_block } => { + match result { + Ok(ctrl) => { + if ctrl.is_unwind() { + self.sync.set_pipeline_sync_target(current_state.head_block_hash); + } else if reached_max_block { + // Terminate the sync early if it's reached the maximum user + // configured block. + return Some(Ok(())) + } + + // Update the state and hashes of the blockchain tree if possible + if let Err(error) = self.restore_tree_if_possible(*current_state) { + error!(target: "consensus::engine", ?error, "Error restoring blockchain tree"); + return Some(Err(error.into())) + } + } + // Any pipeline error at this point is fatal. + Err(error) => return Some(Err(error.into())), + }; + } + }; + + None + } } /// On initialization, the consensus engine will poll the message receiver and return @@ -692,10 +711,10 @@ where /// local forkchoice state, it will launch the pipeline to sync to the head hash. /// While the pipeline is syncing, the consensus engine will keep processing messages from the /// receiver and forwarding them to the blockchain tree. -impl Future for BeaconConsensusEngine +impl Future for BeaconConsensusEngine where DB: Database + Unpin + 'static, - TS: TaskSpawner + Unpin, + Client: HeadersClient + BodiesClient + Clone + Unpin + 'static, BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker + Unpin + 'static, { type Output = Result<(), BeaconEngineError>; @@ -703,127 +722,58 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - // Set the next pipeline state. - loop { - // Process all incoming messages first. - while let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) { - match msg { - BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { - this.metrics.forkchoice_updated_messages.increment(1); - let on_updated = match this.on_forkchoice_updated(state, payload_attrs) { - Ok(response) => response, - Err(error) => { - error!(target: "consensus::engine", ?state, ?error, "Error getting forkchoice updated response"); - return Poll::Ready(Err(error)) - } - }; - let is_valid_response = on_updated.is_valid_update(); - let _ = tx.send(on_updated); - - // Terminate the sync early if it's reached the maximum user - // configured block. - if is_valid_response { - let tip_number = this.blockchain.canonical_tip().number; - if this.has_reached_max_block(tip_number) { - return Poll::Ready(Ok(())) - } + // Process all incoming messages first. 
+ while let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) { + match msg { + BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { + this.metrics.forkchoice_updated_messages.increment(1); + let on_updated = match this.on_forkchoice_updated(state, payload_attrs) { + Ok(response) => response, + Err(error) => { + error!(target: "consensus::engine", ?state, ?error, "Error getting forkchoice updated response"); + return Poll::Ready(Err(error)) } - } - BeaconEngineMessage::NewPayload { payload, tx } => { - this.metrics.new_payload_messages.increment(1); - let status = this.on_new_payload(payload); - let _ = tx.send(Ok(status)); - } - BeaconEngineMessage::EventListener(tx) => { - this.listeners.push_listener(tx); - } - } - } + }; + let is_valid_response = on_updated.is_valid_update(); + let _ = tx.send(on_updated); - // Lookup the forkchoice state. We can't launch the pipeline without the tip. - let forkchoice_state = match &this.forkchoice_state { - Some(state) => *state, - None => return Poll::Pending, - }; - - let next_state = match this.pipeline_state.take().expect("pipeline state is set") { - PipelineState::Running(mut fut) => { - match fut.poll_unpin(cx) { - Poll::Ready(Ok((pipeline, result))) => { - if let Err(error) = result { - return Poll::Ready(Err(error.into())) - } - - match result { - Ok(ctrl) => { - if ctrl.is_unwind() { - this.require_pipeline_run(PipelineTarget::Head); - } else { - // Terminate the sync early if it's reached the maximum user - // configured block. - let minimum_pipeline_progress = - pipeline.minimum_progress().unwrap_or_default(); - if this.has_reached_max_block(minimum_pipeline_progress) { - return Poll::Ready(Ok(())) - } - } - } - // Any pipeline error at this point is fatal. - Err(error) => return Poll::Ready(Err(error.into())), - }; - - // Update the state and hashes of the blockchain tree if possible - if let Err(error) = this.restore_tree_if_possible(forkchoice_state) { - error!(target: "consensus::engine", ?error, "Error restoring blockchain tree"); - return Poll::Ready(Err(error.into())) - } - - // Get next pipeline state. - this.next_pipeline_state(pipeline, forkchoice_state) - } - Poll::Ready(Err(error)) => { - error!(target: "consensus::engine", ?error, "Failed to receive pipeline result"); - return Poll::Ready(Err(BeaconEngineError::PipelineChannelClosed)) - } - Poll::Pending => { - this.pipeline_state = Some(PipelineState::Running(fut)); - return Poll::Pending + // Terminate the sync early if it's reached the maximum user + // configured block. + if is_valid_response { + let tip_number = this.blockchain.canonical_tip().number; + if this.sync.has_reached_max_block(tip_number) { + return Poll::Ready(Ok(())) } } } - PipelineState::Idle(pipeline) => { - this.next_pipeline_state(pipeline, forkchoice_state) + BeaconEngineMessage::NewPayload { payload, tx } => { + this.metrics.new_payload_messages.increment(1); + let status = this.on_new_payload(payload); + let _ = tx.send(Ok(status)); + } + BeaconEngineMessage::EventListener(tx) => { + this.listeners.push_listener(tx); } - }; - this.pipeline_state = Some(next_state); - - // If the pipeline is idle, break from the loop. - if this.is_pipeline_idle() { - return Poll::Pending } } + + // Lookup the forkchoice state. We can't launch the pipeline without the tip. 
+ let forkchoice_state = match &this.forkchoice_state { + Some(state) => *state, + None => return Poll::Pending, + }; + + // poll sync controller + while let Poll::Ready(sync_event) = this.sync.poll(cx) { + if let Some(res) = this.on_sync_event(sync_event, &forkchoice_state) { + return Poll::Ready(res) + } + } + + Poll::Pending } } -/// Denotes the next action that the [BeaconConsensusEngine] should take. -#[derive(Debug, Default)] -enum BeaconEngineAction { - #[default] - None, - /// Contains the type of target hash to pass to the pipeline - RunPipeline(PipelineTarget), -} - -/// The target hash to pass to the pipeline. -#[derive(Debug, Default)] -enum PipelineTarget { - /// Corresponds to the head block hash. - #[default] - Head, - /// Corresponds to the safe block hash. - Safe, -} - /// Keeps track of invalid headers. struct InvalidHeaderCache { headers: LruMap, @@ -857,7 +807,7 @@ mod tests { BlockchainTree, ShareableBlockchainTree, }; use reth_db::mdbx::{test_utils::create_test_rw_db, Env, WriteMap}; - use reth_interfaces::test_utils::TestConsensus; + use reth_interfaces::test_utils::{NoopFullBlockClient, TestConsensus}; use reth_payload_builder::test_utils::spawn_test_payload_service; use reth_primitives::{ChainSpec, ChainSpecBuilder, SealedBlockWithSenders, H256, MAINNET}; use reth_provider::{ @@ -874,11 +824,11 @@ mod tests { type TestBeaconConsensusEngine = BeaconConsensusEngine< Arc>, - TokioTaskExecutor, BlockchainProvider< Arc>, ShareableBlockchainTree>, TestConsensus, TestExecutorFactory>, >, + NoopFullBlockClient, >; struct TestEnv { @@ -975,9 +925,10 @@ mod tests { let blockchain_provider = BlockchainProvider::with_latest(shareable_db, tree, latest); let (engine, handle) = BeaconConsensusEngine::new( db.clone(), - TokioTaskExecutor::default(), + NoopFullBlockClient::default(), pipeline, blockchain_provider, + Box::::default(), None, false, payload_builder, @@ -1114,7 +1065,7 @@ mod tests { VecDeque::from([Ok(ExecOutput { stage_progress: max_block, done: true })]), Vec::default(), ); - consensus_engine.max_block = Some(max_block); + consensus_engine.sync.set_max_block(max_block); let rx = spawn_consensus_engine(consensus_engine); let _ = env diff --git a/crates/consensus/beacon/src/engine/pipeline_state.rs b/crates/consensus/beacon/src/engine/pipeline_state.rs deleted file mode 100644 index 2595e140a..000000000 --- a/crates/consensus/beacon/src/engine/pipeline_state.rs +++ /dev/null @@ -1,26 +0,0 @@ -use reth_db::database::Database; -use reth_stages::{Pipeline, PipelineWithResult}; -use tokio::sync::oneshot; - -/// The possible pipeline states within the sync controller. -/// -/// [PipelineState::Idle] means that the pipeline is currently idle. -/// [PipelineState::Running] means that the pipeline is currently running. -/// -/// NOTE: The differentiation between these two states is important, because when the pipeline is -/// running, it acquires the write lock over the database. This means that we cannot forward to the -/// blockchain tree any messages that would result in database writes, since it would result in a -/// deadlock. -pub enum PipelineState { - /// Pipeline is idle. - Idle(Pipeline), - /// Pipeline is running. - Running(oneshot::Receiver>), -} - -impl PipelineState { - /// Returns `true` if the state matches idle. 
- pub fn is_idle(&self) -> bool { - matches!(self, PipelineState::Idle(_)) - } -} diff --git a/crates/consensus/beacon/src/engine/sync.rs b/crates/consensus/beacon/src/engine/sync.rs new file mode 100644 index 000000000..e2c5fe3e4 --- /dev/null +++ b/crates/consensus/beacon/src/engine/sync.rs @@ -0,0 +1,266 @@ +//! Sync management for the engine implementation. + +use futures::FutureExt; +use reth_db::database::Database; +use reth_interfaces::p2p::{ + bodies::client::BodiesClient, + full_block::{FetchFullBlockFuture, FullBlockClient}, + headers::client::HeadersClient, +}; +use reth_primitives::{BlockNumber, SealedBlock, H256}; +use reth_stages::{ControlFlow, Pipeline, PipelineError, PipelineWithResult}; +use reth_tasks::TaskSpawner; +use std::{ + collections::VecDeque, + task::{ready, Context, Poll}, +}; +use tokio::sync::oneshot; +use tracing::trace; + +/// Manages syncing under the control of the engine. +/// +/// This type controls the [Pipeline] and supports (single) full block downloads. +/// +/// Caution: If the pipeline is running, this type will not emit blocks downloaded from the network +/// [EngineSyncEvent::FetchedFullBlock] until the pipeline is idle to prevent commits to the +/// database while the pipeline is still active. +pub(crate) struct EngineSyncController +where + DB: Database, + Client: HeadersClient + BodiesClient, +{ + /// A downloader that can download full blocks from the network. + full_block_client: FullBlockClient, + /// The type that can spawn the pipeline task. + pipeline_task_spawner: Box, + /// The current state of the pipeline. + /// The pipeline is used for large ranges. + pipeline_state: PipelineState, + /// Pending target block for the pipeline to sync + pending_pipeline_target: Option, + /// In requests in progress. + inflight_full_block_requests: Vec>, + /// Buffered events until the manager is polled and the pipeline is idle. + queued_events: VecDeque, + /// If enabled, the pipeline will be triggered continuously, as soon as it becomes idle + run_pipeline_continuously: bool, + /// Max block after which the consensus engine would terminate the sync. Used for debugging + /// purposes. + max_block: Option, +} + +impl EngineSyncController +where + DB: Database + 'static, + Client: HeadersClient + BodiesClient + Clone + Unpin + 'static, +{ + /// Create a new instance + pub(crate) fn new( + pipeline: Pipeline, + client: Client, + pipeline_task_spawner: Box, + run_pipeline_continuously: bool, + max_block: Option, + ) -> Self { + Self { + full_block_client: FullBlockClient::new(client), + pipeline_task_spawner, + pipeline_state: PipelineState::Idle(Some(pipeline)), + pending_pipeline_target: None, + inflight_full_block_requests: Vec::new(), + queued_events: VecDeque::new(), + run_pipeline_continuously, + max_block, + } + } + + /// Sets the max block value for testing + #[cfg(test)] + pub(crate) fn set_max_block(&mut self, block: BlockNumber) { + self.max_block = Some(block); + } + + /// Cancels all full block requests that are in progress. + pub(crate) fn clear_full_block_requests(&mut self) { + self.inflight_full_block_requests.clear(); + } + + /// Returns `true` if the pipeline is idle. + pub(crate) fn is_pipeline_idle(&self) -> bool { + self.pipeline_state.is_idle() + } + + /// Starts requesting a full block from the network. + pub(crate) fn download_full_block(&mut self, hash: H256) { + let request = self.full_block_client.get_full_block(hash); + self.inflight_full_block_requests.push(request); + } + + /// Sets a new target to sync the pipeline to. 
+ pub(crate) fn set_pipeline_sync_target(&mut self, target: H256) { + self.pending_pipeline_target = Some(target); + } + + /// Check if the engine reached max block as specified by `max_block` parameter. + /// + /// Note: this is mainly for debugging purposes. + pub(crate) fn has_reached_max_block(&self, progress: BlockNumber) -> bool { + let has_reached_max_block = + self.max_block.map(|target| progress >= target).unwrap_or_default(); + if has_reached_max_block { + trace!( + target: "consensus::engine", + ?progress, + max_block = ?self.max_block, + "Consensus engine reached max block." + ); + } + has_reached_max_block + } + + /// Advances the pipeline state. + /// + /// This checks for the result in the channel, or returns pending if the pipeline is idle. + fn poll_pipeline(&mut self, cx: &mut Context<'_>) -> Poll { + let res = match self.pipeline_state { + PipelineState::Idle(_) => return Poll::Pending, + PipelineState::Running(ref mut fut) => { + ready!(fut.poll_unpin(cx)) + } + }; + let ev = match res { + Ok((pipeline, result)) => { + let minimum_progress = pipeline.minimum_progress(); + let reached_max_block = + self.has_reached_max_block(minimum_progress.unwrap_or_default()); + self.pipeline_state = PipelineState::Idle(Some(pipeline)); + EngineSyncEvent::PipelineFinished { result, reached_max_block } + } + Err(_) => { + // failed to receive the pipeline + EngineSyncEvent::PipelineTaskDropped + } + }; + Poll::Ready(ev) + } + + /// This will spawn the pipeline if it is idle and a target is set or if the pipeline is set to + /// run continuously. + fn try_spawn_pipeline(&mut self) -> Option { + match &mut self.pipeline_state { + PipelineState::Idle(pipeline) => { + let target = self.pending_pipeline_target.take(); + + if target.is_none() && !self.run_pipeline_continuously { + // nothing to sync + return None + } + + let (tx, rx) = oneshot::channel(); + + let pipeline = pipeline.take().expect("exists"); + self.pipeline_task_spawner.spawn_critical_blocking( + "pipeline task", + Box::pin(async move { + let result = pipeline.run_as_fut(target).await; + let _ = tx.send(result); + }), + ); + self.pipeline_state = PipelineState::Running(rx); + + // we also clear any pending full block requests because we expect them to be + // outdated (included in the range the pipeline is syncing anyway) + self.clear_full_block_requests(); + + Some(EngineSyncEvent::PipelineStarted(target)) + } + PipelineState::Running(_) => None, + } + } + + /// Advances the sync process. 
+ pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + // try to spawn a pipeline if a target is set + if let Some(event) = self.try_spawn_pipeline() { + return Poll::Ready(event) + } + + loop { + // drain buffered events first if pipeline is not running + if self.is_pipeline_idle() { + if let Some(event) = self.queued_events.pop_front() { + return Poll::Ready(event) + } + } else { + // advance the pipeline + if let Poll::Ready(event) = self.poll_pipeline(cx) { + return Poll::Ready(event) + } + } + + // advance all requests + for idx in (0..self.inflight_full_block_requests.len()).rev() { + let mut request = self.inflight_full_block_requests.swap_remove(idx); + if let Poll::Ready(block) = request.poll_unpin(cx) { + self.queued_events.push_back(EngineSyncEvent::FetchedFullBlock(block)); + } else { + // still pending + self.inflight_full_block_requests.push(request); + } + } + + if !self.pipeline_state.is_idle() || self.queued_events.is_empty() { + // can not make any progress + return Poll::Pending + } + } + } +} + +/// The event type emitted by the [EngineSyncController]. +#[derive(Debug)] +pub(crate) enum EngineSyncEvent { + /// A full block has been downloaded from the network. + FetchedFullBlock(SealedBlock), + /// Pipeline started syncing + /// + /// This is none if the pipeline is triggered without a specific target. + PipelineStarted(Option), + /// Pipeline finished + /// + /// If this is returned, the pipeline is idle. + PipelineFinished { + /// Final result of the pipeline run. + result: Result, + /// Whether the pipeline reached the configured `max_block`. + /// + /// Note: this is only relevant in debugging scenarios. + reached_max_block: bool, + }, + /// Pipeline task was dropped after it was started, unable to receive it because channel + /// closed. This would indicate a panicked pipeline task + PipelineTaskDropped, +} + +/// The possible pipeline states within the sync controller. +/// +/// [PipelineState::Idle] means that the pipeline is currently idle. +/// [PipelineState::Running] means that the pipeline is currently running. +/// +/// NOTE: The differentiation between these two states is important, because when the pipeline is +/// running, it acquires the write lock over the database. This means that we cannot forward to the +/// blockchain tree any messages that would result in database writes, since it would result in a +/// deadlock. +enum PipelineState { + /// Pipeline is idle. + Idle(Option>), + /// Pipeline is running and waiting for a response + Running(oneshot::Receiver>), +} + +impl PipelineState { + /// Returns `true` if the state matches idle. + fn is_idle(&self) -> bool { + matches!(self, PipelineState::Idle(_)) + } +} diff --git a/crates/interfaces/src/blockchain_tree.rs b/crates/interfaces/src/blockchain_tree.rs index ee37c1655..03d69c50b 100644 --- a/crates/interfaces/src/blockchain_tree.rs +++ b/crates/interfaces/src/blockchain_tree.rs @@ -7,8 +7,8 @@ use std::collections::{BTreeMap, HashSet}; /// * [BlockchainTreeEngine::finalize_block]: Remove chains that join to now finalized block, as /// chain becomes invalid. /// * [BlockchainTreeEngine::make_canonical]: Check if we have the hash of block that we want to -/// finalize and commit it to db. If we dont have the block, pipeline syncing should start to -/// fetch the blocks from p2p. Do reorg in tables if canonical chain if needed. +/// finalize and commit it to db. If we don't have the block, syncing should start to fetch the +/// blocks from p2p. 
Do reorg in tables if canonical chain if needed. pub trait BlockchainTreeEngine: BlockchainTreeViewer + Send + Sync { /// Recover senders and call [`BlockchainTreeEngine::insert_block`]. fn insert_block_without_senders(&self, block: SealedBlock) -> Result { @@ -98,11 +98,14 @@ pub trait BlockchainTreeViewer: Send + Sync { /// Canonical block number and hashes best known by the tree. fn canonical_blocks(&self) -> BTreeMap; - /// Given a hash, this tries to find the last ancestor that is part of the canonical chain. + /// Given the parent hash of a block, this tries to find the last ancestor that is part of the + /// canonical chain. /// /// In other words, this will walk up the (side) chain starting with the given hash and return /// the first block that's canonical. - fn find_canonical_ancestor(&self, hash: BlockHash) -> Option; + /// + /// Note: this could be the given `parent_hash` if it's already canonical. + fn find_canonical_ancestor(&self, parent_hash: BlockHash) -> Option; /// Return BlockchainTree best known canonical chain tip (BlockHash, BlockNumber) fn canonical_tip(&self) -> BlockNumHash; diff --git a/crates/interfaces/src/p2p/full_block.rs b/crates/interfaces/src/p2p/full_block.rs index e934faac9..bb13a3e9e 100644 --- a/crates/interfaces/src/p2p/full_block.rs +++ b/crates/interfaces/src/p2p/full_block.rs @@ -67,6 +67,11 @@ impl FetchFullBlockFuture where Client: BodiesClient + HeadersClient, { + /// Returns the hash of the block being requested. + pub fn hash(&self) -> &H256 { + &self.hash + } + /// If the header request is already complete, this returns the block number pub fn block_number(&self) -> Option { self.header.as_ref().map(|h| h.number) diff --git a/crates/interfaces/src/test_utils/full_block.rs b/crates/interfaces/src/test_utils/full_block.rs new file mode 100644 index 000000000..b192c6b96 --- /dev/null +++ b/crates/interfaces/src/test_utils/full_block.rs @@ -0,0 +1,45 @@ +use crate::p2p::{ + bodies::client::BodiesClient, + download::DownloadClient, + error::PeerRequestResult, + headers::client::{HeadersClient, HeadersRequest}, + priority::Priority, +}; +use reth_primitives::{BlockBody, Header, PeerId, WithPeerId, H256}; + +/// A headers+bodies client implementation that does nothing. +#[derive(Debug, Default, Clone)] +#[non_exhaustive] +pub struct NoopFullBlockClient; + +impl DownloadClient for NoopFullBlockClient { + fn report_bad_message(&self, _peer_id: PeerId) {} + + fn num_connected_peers(&self) -> usize { + 0 + } +} + +impl BodiesClient for NoopFullBlockClient { + type Output = futures::future::Ready>>; + + fn get_block_bodies_with_priority( + &self, + _hashes: Vec, + _priority: Priority, + ) -> Self::Output { + futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![]))) + } +} + +impl HeadersClient for NoopFullBlockClient { + type Output = futures::future::Ready>>; + + fn get_headers_with_priority( + &self, + _request: HeadersRequest, + _priority: Priority, + ) -> Self::Output { + futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![]))) + } +} diff --git a/crates/interfaces/src/test_utils/mod.rs b/crates/interfaces/src/test_utils/mod.rs index b64acb592..60c73bdaa 100644 --- a/crates/interfaces/src/test_utils/mod.rs +++ b/crates/interfaces/src/test_utils/mod.rs @@ -1,10 +1,12 @@ #![allow(unused)] mod bodies; +mod full_block; mod headers; /// Generators for different data structures like block headers, block bodies and ranges of those. 
pub mod generators; pub use bodies::*; +pub use full_block::*; pub use headers::*; diff --git a/crates/net/downloaders/src/test_utils/test_client.rs b/crates/net/downloaders/src/test_utils/bodies_client.rs similarity index 100% rename from crates/net/downloaders/src/test_utils/test_client.rs rename to crates/net/downloaders/src/test_utils/bodies_client.rs diff --git a/crates/net/downloaders/src/test_utils/mod.rs b/crates/net/downloaders/src/test_utils/mod.rs index a436be228..ad363ebef 100644 --- a/crates/net/downloaders/src/test_utils/mod.rs +++ b/crates/net/downloaders/src/test_utils/mod.rs @@ -11,6 +11,14 @@ use tokio::{ }; use tokio_util::codec::FramedWrite; +mod bodies_client; +mod file_client; +mod file_codec; + +pub use bodies_client::TestBodiesClient; +pub use file_client::{FileClient, FileClientError}; +pub(crate) use file_codec::BlockFileCodec; + /// Metrics scope used for testing. pub(crate) const TEST_SCOPE: &str = "downloaders.test"; @@ -59,11 +67,3 @@ pub(crate) async fn generate_bodies_file( file.seek(SeekFrom::Start(0)).await.unwrap(); (file, headers, bodies) } - -mod file_client; -mod file_codec; -mod test_client; - -pub use file_client::{FileClient, FileClientError}; -pub(crate) use file_codec::BlockFileCodec; -pub use test_client::TestBodiesClient; diff --git a/crates/stages/src/pipeline/ctrl.rs b/crates/stages/src/pipeline/ctrl.rs index ee712cdfd..05f84c2b0 100644 --- a/crates/stages/src/pipeline/ctrl.rs +++ b/crates/stages/src/pipeline/ctrl.rs @@ -15,6 +15,7 @@ pub enum ControlFlow { /// The progress of the last stage progress: BlockNumber, }, + /// Pipeline made no progress NoProgress { /// The current stage progress. stage_progress: Option, @@ -22,10 +23,12 @@ pub enum ControlFlow { } impl ControlFlow { + /// Whether the pipeline should continue executing stages. pub fn should_continue(&self) -> bool { matches!(self, ControlFlow::Continue { .. } | ControlFlow::NoProgress { .. }) } + /// Returns true if the control flow is unwind. pub fn is_unwind(&self) -> bool { matches!(self, ControlFlow::Unwind { .. }) } diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 310426be5..f15b6ef2b 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -16,8 +16,8 @@ mod progress; mod set; mod sync_metrics; +pub use crate::pipeline::ctrl::ControlFlow; pub use builder::*; -use ctrl::*; pub use event::*; use progress::*; pub use set::*; @@ -113,8 +113,8 @@ where } /// Return the minimum pipeline progress - pub fn minimum_progress(&self) -> &Option { - &self.progress.minimum_progress + pub fn minimum_progress(&self) -> Option { + self.progress.minimum_progress } /// Set tip for reverse sync.
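The sync controller introduced above is deliberately a plain state machine: the engine requests work via `set_pipeline_sync_target` or `download_full_block` and then drains `EngineSyncEvent` values from `poll` inside its own `Future::poll`, which is what `on_sync_event` implements. A rough consumer-side sketch, with the engine's actual handling reduced to comments and all error plumbing elided:

    // inside BeaconConsensusEngine::poll, after the engine messages have been processed
    while let Poll::Ready(event) = this.sync.poll(cx) {
        match event {
            EngineSyncEvent::FetchedFullBlock(block) => {
                // hand the downloaded block to the tree via the new-payload path;
                // if it is reported invalid, fall back to a pipeline run targeting its hash
            }
            EngineSyncEvent::PipelineStarted(target) => {
                // bump the pipeline-run metric; `target` is None for continuous runs
            }
            EngineSyncEvent::PipelineFinished { result, reached_max_block } => {
                // on unwind: re-target the pipeline at the current head;
                // on reached_max_block: resolve the engine future (debugging only);
                // otherwise try to restore the blockchain tree from the database
            }
            EngineSyncEvent::PipelineTaskDropped => {
                // fatal: the spawned pipeline task panicked or its channel was dropped
            }
        }
    }

Because `FetchedFullBlock` events are buffered while the pipeline is running, the engine never has to reason about the pipeline's exclusive database access at the moment a downloaded block arrives.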
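In tests the networking side can now be stubbed with `NoopFullBlockClient`, which implements both `HeadersClient` and `BodiesClient` but always answers with empty results. A sketch of the test-side construction, mirroring the updated test setup in engine/mod.rs above (pipeline, tree and payload service construction elided):

    let (engine, handle) = BeaconConsensusEngine::new(
        db.clone(),
        NoopFullBlockClient::default(),      // headers + bodies client that returns nothing
        pipeline,
        blockchain_provider,
        Box::<TokioTaskExecutor>::default(), // spawns the pipeline task on the test runtime
        None,                                // max_block
        false,                               // run_pipeline_continuously
        payload_builder,
    );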