mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat: extend engine syncing with single block downloads (#2626)
This commit is contained in:
@ -240,6 +240,7 @@ impl Command {
|
||||
.await?;
|
||||
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
|
||||
debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID");
|
||||
let network_client = network.fetch_client().await?;
|
||||
|
||||
let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
|
||||
|
||||
@ -310,11 +311,10 @@ impl Command {
|
||||
|
||||
pipeline
|
||||
} else {
|
||||
let client = network.fetch_client().await?;
|
||||
self.build_networked_pipeline(
|
||||
&mut config,
|
||||
network.clone(),
|
||||
client,
|
||||
network_client.clone(),
|
||||
Arc::clone(&consensus),
|
||||
db.clone(),
|
||||
&ctx.task_executor,
|
||||
@ -339,9 +339,10 @@ impl Command {
|
||||
let pipeline_events = pipeline.events();
|
||||
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
|
||||
Arc::clone(&db),
|
||||
ctx.task_executor.clone(),
|
||||
network_client,
|
||||
pipeline,
|
||||
blockchain_db.clone(),
|
||||
Box::new(ctx.task_executor.clone()),
|
||||
self.debug.max_block,
|
||||
self.debug.continuous,
|
||||
payload_builder.clone(),
|
||||
|
||||
@ -95,8 +95,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTreeViewer
|
||||
self.tree.read().block_indices().canonical_chain().inner().clone()
|
||||
}
|
||||
|
||||
fn find_canonical_ancestor(&self, hash: BlockHash) -> Option<BlockHash> {
|
||||
let mut parent = hash;
|
||||
fn find_canonical_ancestor(&self, mut parent: BlockHash) -> Option<BlockHash> {
|
||||
let tree = self.tree.read();
|
||||
|
||||
// walk up the tree and check if the parent is in the sidechain
|
||||
|
||||
@ -1,10 +1,14 @@
|
||||
use crate::engine::{message::OnForkChoiceUpdated, metrics::Metrics};
|
||||
use futures::{Future, FutureExt, StreamExt, TryFutureExt};
|
||||
use crate::{
|
||||
engine::{message::OnForkChoiceUpdated, metrics::Metrics},
|
||||
sync::{EngineSyncController, EngineSyncEvent},
|
||||
};
|
||||
use futures::{Future, StreamExt, TryFutureExt};
|
||||
use reth_db::{database::Database, tables, transaction::DbTx};
|
||||
use reth_interfaces::{
|
||||
blockchain_tree::{BlockStatus, BlockchainTreeEngine},
|
||||
consensus::ForkchoiceState,
|
||||
executor::Error as ExecutorError,
|
||||
p2p::{bodies::client::BodiesClient, headers::client::HeadersClient},
|
||||
Error,
|
||||
};
|
||||
use reth_payload_builder::{PayloadBuilderAttributes, PayloadBuilderHandle};
|
||||
@ -39,11 +43,9 @@ pub use error::{BeaconEngineError, BeaconEngineResult, BeaconForkChoiceUpdateErr
|
||||
|
||||
mod metrics;
|
||||
|
||||
mod pipeline_state;
|
||||
|
||||
pub use pipeline_state::PipelineState;
|
||||
|
||||
mod event;
|
||||
pub(crate) mod sync;
|
||||
|
||||
pub use event::BeaconConsensusEngineEvent;
|
||||
|
||||
/// The maximum number of invalid headers that can be tracked by the engine.
|
||||
@ -134,20 +136,16 @@ impl BeaconConsensusEngineHandle {
|
||||
///
|
||||
/// If the future is polled more than once. Leads to undefined state.
|
||||
#[must_use = "Future does nothing unless polled"]
|
||||
pub struct BeaconConsensusEngine<DB, TS, BT>
|
||||
pub struct BeaconConsensusEngine<DB, BT, Client>
|
||||
where
|
||||
DB: Database,
|
||||
TS: TaskSpawner,
|
||||
Client: HeadersClient + BodiesClient,
|
||||
BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker,
|
||||
{
|
||||
/// The database handle.
|
||||
db: DB,
|
||||
/// Task spawner for spawning the pipeline.
|
||||
task_spawner: TS,
|
||||
/// The current state of the pipeline.
|
||||
/// Must always be [Some] unless the state is being reevaluated.
|
||||
/// The pipeline is used for historical sync by setting the current forkchoice head.
|
||||
pipeline_state: Option<PipelineState<DB>>,
|
||||
/// Controls syncing triggered by engine updates.
|
||||
sync: EngineSyncController<DB, Client>,
|
||||
/// The type we can use to query both the database and the blockchain tree.
|
||||
blockchain: BT,
|
||||
/// The Engine API message receiver.
|
||||
@ -157,14 +155,6 @@ where
|
||||
/// Current forkchoice state. The engine must receive the initial state in order to start
|
||||
/// syncing.
|
||||
forkchoice_state: Option<ForkchoiceState>,
|
||||
/// Next action that the engine should take after the pipeline finished running.
|
||||
next_action: BeaconEngineAction,
|
||||
/// Max block after which the consensus engine would terminate the sync. Used for debugging
|
||||
/// purposes.
|
||||
max_block: Option<BlockNumber>,
|
||||
/// If true, the engine will run the pipeline continuously, regardless of whether or not there
|
||||
/// is a new fork choice state.
|
||||
continuous: bool,
|
||||
/// The payload store.
|
||||
payload_builder: PayloadBuilderHandle,
|
||||
/// Listeners for engine events.
|
||||
@ -176,30 +166,33 @@ where
|
||||
metrics: Metrics,
|
||||
}
|
||||
|
||||
impl<DB, TS, BT> BeaconConsensusEngine<DB, TS, BT>
|
||||
impl<DB, BT, Client> BeaconConsensusEngine<DB, BT, Client>
|
||||
where
|
||||
DB: Database + Unpin + 'static,
|
||||
TS: TaskSpawner,
|
||||
BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker + 'static,
|
||||
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
|
||||
{
|
||||
/// Create a new instance of the [BeaconConsensusEngine].
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
db: DB,
|
||||
task_spawner: TS,
|
||||
client: Client,
|
||||
pipeline: Pipeline<DB>,
|
||||
blockchain: BT,
|
||||
task_spawner: Box<dyn TaskSpawner>,
|
||||
max_block: Option<BlockNumber>,
|
||||
continuous: bool,
|
||||
run_pipeline_continuously: bool,
|
||||
payload_builder: PayloadBuilderHandle,
|
||||
) -> (Self, BeaconConsensusEngineHandle) {
|
||||
let (to_engine, rx) = mpsc::unbounded_channel();
|
||||
Self::with_channel(
|
||||
db,
|
||||
task_spawner,
|
||||
client,
|
||||
pipeline,
|
||||
blockchain,
|
||||
task_spawner,
|
||||
max_block,
|
||||
continuous,
|
||||
run_pipeline_continuously,
|
||||
payload_builder,
|
||||
to_engine,
|
||||
rx,
|
||||
@ -211,27 +204,31 @@ where
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn with_channel(
|
||||
db: DB,
|
||||
task_spawner: TS,
|
||||
client: Client,
|
||||
pipeline: Pipeline<DB>,
|
||||
blockchain: BT,
|
||||
task_spawner: Box<dyn TaskSpawner>,
|
||||
max_block: Option<BlockNumber>,
|
||||
continuous: bool,
|
||||
run_pipeline_continuously: bool,
|
||||
payload_builder: PayloadBuilderHandle,
|
||||
to_engine: UnboundedSender<BeaconEngineMessage>,
|
||||
rx: UnboundedReceiver<BeaconEngineMessage>,
|
||||
) -> (Self, BeaconConsensusEngineHandle) {
|
||||
let handle = BeaconConsensusEngineHandle { to_engine };
|
||||
let sync = EngineSyncController::new(
|
||||
pipeline,
|
||||
client,
|
||||
task_spawner,
|
||||
run_pipeline_continuously,
|
||||
max_block,
|
||||
);
|
||||
let this = Self {
|
||||
db,
|
||||
task_spawner,
|
||||
pipeline_state: Some(PipelineState::Idle(pipeline)),
|
||||
sync,
|
||||
blockchain,
|
||||
engine_message_rx: UnboundedReceiverStream::new(rx),
|
||||
handle: handle.clone(),
|
||||
forkchoice_state: None,
|
||||
next_action: BeaconEngineAction::None,
|
||||
max_block,
|
||||
continuous,
|
||||
payload_builder,
|
||||
listeners: EventListeners::default(),
|
||||
invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS),
|
||||
@ -249,17 +246,6 @@ where
|
||||
self.handle.clone()
|
||||
}
|
||||
|
||||
/// Returns `true` if the pipeline is currently idle.
|
||||
fn is_pipeline_idle(&self) -> bool {
|
||||
self.pipeline_state.as_ref().expect("pipeline state is set").is_idle()
|
||||
}
|
||||
|
||||
/// Set next action to [BeaconEngineAction::RunPipeline] to indicate that
|
||||
/// consensus engine needs to run the pipeline as soon as it becomes available.
|
||||
fn require_pipeline_run(&mut self, target: PipelineTarget) {
|
||||
self.next_action = BeaconEngineAction::RunPipeline(target);
|
||||
}
|
||||
|
||||
/// If validation fails, the response MUST contain the latest valid hash:
|
||||
///
|
||||
/// - The block hash of the ancestor of the invalid payload satisfying the following two
|
||||
@ -280,6 +266,10 @@ where
|
||||
return Some(H256::zero())
|
||||
}
|
||||
|
||||
// TODO(mattsse): This could be invoked on new payload which does not make tree canonical,
|
||||
// which would make this inaccurate, e.g. if an invalid payload is received in this
|
||||
// scenario: FUC (unknown head) -> valid payload -> invalid payload
|
||||
|
||||
self.blockchain.find_canonical_ancestor(parent_hash)
|
||||
}
|
||||
|
||||
@ -334,7 +324,10 @@ where
|
||||
|
||||
let is_first_forkchoice = self.forkchoice_state.is_none();
|
||||
self.forkchoice_state = Some(state);
|
||||
let status = if self.is_pipeline_idle() {
|
||||
|
||||
let status = if self.sync.is_pipeline_idle() {
|
||||
// We can only process new forkchoice updates if the pipeline is idle, since it requires
|
||||
// exclusive access to the database
|
||||
match self.blockchain.make_canonical(&state.head_block_hash) {
|
||||
Ok(_) => {
|
||||
let head_block_number = self
|
||||
@ -347,7 +340,9 @@ where
|
||||
|
||||
if pipeline_min_progress < head_block_number {
|
||||
debug!(target: "consensus::engine", last_finished=pipeline_min_progress, head_number=head_block_number, "pipeline run to head required");
|
||||
self.require_pipeline_run(PipelineTarget::Head);
|
||||
|
||||
// TODO(mattsse) ideally sync blockwise
|
||||
self.sync.set_pipeline_sync_target(state.head_block_hash);
|
||||
}
|
||||
|
||||
if let Some(attrs) = attrs {
|
||||
@ -441,6 +436,7 @@ where
|
||||
error: Error,
|
||||
is_first_forkchoice: bool,
|
||||
) -> PayloadStatus {
|
||||
debug_assert!(self.sync.is_pipeline_idle(), "pipeline must be idle");
|
||||
warn!(target: "consensus::engine", ?error, ?state, "Error canonicalizing the head hash");
|
||||
|
||||
// check if the new head was previously invalidated, if so then we deem this FCU
|
||||
@ -450,26 +446,42 @@ where
|
||||
return invalid_ancestor
|
||||
}
|
||||
|
||||
// If this is the first forkchoice received, start downloading from safe block
|
||||
// hash, if we have that block.
|
||||
let target = if is_first_forkchoice &&
|
||||
!state.safe_block_hash.is_zero() &&
|
||||
self.get_block_number(state.safe_block_hash).ok().flatten().is_none()
|
||||
{
|
||||
PipelineTarget::Safe
|
||||
} else {
|
||||
PipelineTarget::Head
|
||||
};
|
||||
self.require_pipeline_run(target);
|
||||
match error {
|
||||
#[allow(clippy::single_match)]
|
||||
match &error {
|
||||
Error::Execution(error @ ExecutorError::BlockPreMerge { .. }) => {
|
||||
PayloadStatus::from_status(PayloadStatusEnum::Invalid {
|
||||
return PayloadStatus::from_status(PayloadStatusEnum::Invalid {
|
||||
validation_error: error.to_string(),
|
||||
})
|
||||
.with_latest_valid_hash(H256::zero())
|
||||
}
|
||||
_ => PayloadStatus::from_status(PayloadStatusEnum::Syncing),
|
||||
_ => {
|
||||
// TODO(mattsse) better error handling before attempting to sync (FCU could be
|
||||
// invalid): only trigger sync if we can't determine whether the FCU is invalid
|
||||
}
|
||||
}
|
||||
|
||||
// we assume the FCU is valid and at least the head is missing, so we need to start syncing
|
||||
// to it
|
||||
|
||||
// if this is the first FCU we received from the beacon node, then we start triggering the
|
||||
// pipeline
|
||||
if is_first_forkchoice {
|
||||
// find the appropriate target to sync to, if we don't have the safe block hash then we
|
||||
// start syncing to the safe block via pipeline first
|
||||
let target = if !state.safe_block_hash.is_zero() &&
|
||||
self.get_block_number(state.safe_block_hash).ok().flatten().is_none()
|
||||
{
|
||||
state.safe_block_hash
|
||||
} else {
|
||||
state.head_block_hash
|
||||
};
|
||||
self.sync.set_pipeline_sync_target(target);
|
||||
} else {
|
||||
// trigger a full block download for the _missing_ new head
|
||||
self.sync.download_full_block(state.head_block_hash)
|
||||
}
|
||||
|
||||
PayloadStatus::from_status(PayloadStatusEnum::Syncing)
|
||||
}
|
||||
|
||||
/// Validates the payload attributes with respect to the header and fork choice state.
|
||||
@ -555,7 +567,9 @@ where
|
||||
|
||||
let header = block.header.clone();
|
||||
|
||||
let status = if self.is_pipeline_idle() {
|
||||
let status = if self.sync.is_pipeline_idle() {
|
||||
// we can only insert new payloads if the pipeline is _not_ running, because it holds
|
||||
// exclusive access to the database
|
||||
match self.blockchain.insert_block_without_senders(block) {
|
||||
Ok(status) => {
|
||||
let mut latest_valid_hash = None;
|
||||
@ -603,43 +617,6 @@ where
|
||||
status
|
||||
}
|
||||
|
||||
/// Returns the next pipeline state depending on the current value of the next action.
|
||||
/// Resets the next action to the default value.
|
||||
fn next_pipeline_state(
|
||||
&mut self,
|
||||
pipeline: Pipeline<DB>,
|
||||
forkchoice_state: ForkchoiceState,
|
||||
) -> PipelineState<DB> {
|
||||
let next_action = std::mem::take(&mut self.next_action);
|
||||
|
||||
let (tip, should_run_pipeline) = match next_action {
|
||||
BeaconEngineAction::RunPipeline(target) => {
|
||||
let tip = match target {
|
||||
PipelineTarget::Head => forkchoice_state.head_block_hash,
|
||||
PipelineTarget::Safe => forkchoice_state.safe_block_hash,
|
||||
};
|
||||
(Some(tip), true)
|
||||
}
|
||||
BeaconEngineAction::None => (None, self.continuous),
|
||||
};
|
||||
|
||||
if should_run_pipeline {
|
||||
self.metrics.pipeline_runs.increment(1);
|
||||
trace!(target: "consensus::engine", ?tip, continuous = tip.is_none(), "Starting the pipeline");
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.task_spawner.spawn_critical_blocking(
|
||||
"pipeline",
|
||||
Box::pin(async move {
|
||||
let result = pipeline.run_as_fut(tip).await;
|
||||
let _ = tx.send(result);
|
||||
}),
|
||||
);
|
||||
PipelineState::Running(rx)
|
||||
} else {
|
||||
PipelineState::Idle(pipeline)
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempt to restore the tree with the finalized block number.
|
||||
/// If the finalized block is missing from the database, trigger the pipeline run.
|
||||
fn restore_tree_if_possible(
|
||||
@ -658,31 +635,73 @@ where
|
||||
}
|
||||
None => true,
|
||||
};
|
||||
|
||||
if needs_pipeline_run {
|
||||
self.require_pipeline_run(PipelineTarget::Head);
|
||||
self.sync.set_pipeline_sync_target(state.head_block_hash);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if the engine reached max block as specified by `max_block` parameter.
|
||||
fn has_reached_max_block(&self, progress: BlockNumber) -> bool {
|
||||
if self.max_block.map_or(false, |target| progress >= target) {
|
||||
trace!(
|
||||
target: "consensus::engine",
|
||||
?progress,
|
||||
max_block = ?self.max_block,
|
||||
"Consensus engine reached max block."
|
||||
);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieve the block number for the given block hash.
|
||||
fn get_block_number(&self, hash: H256) -> Result<Option<BlockNumber>, reth_interfaces::Error> {
|
||||
Ok(self.db.view(|tx| tx.get::<tables::HeaderNumbers>(hash))??)
|
||||
}
|
||||
|
||||
/// Event handler for events emitted by the [EngineSyncController].
|
||||
///
|
||||
/// This returns a result to indicate whether the engine future should resolve (fatal error).
|
||||
fn on_sync_event(
|
||||
&mut self,
|
||||
ev: EngineSyncEvent,
|
||||
current_state: &ForkchoiceState,
|
||||
) -> Option<Result<(), BeaconEngineError>> {
|
||||
match ev {
|
||||
EngineSyncEvent::FetchedFullBlock(block) => {
|
||||
// it is guaranteed that the pipeline is not active at this point.
|
||||
|
||||
// TODO(mattsse): better error handling and start closing the gap if there's any by
|
||||
// closing the gap either via pipeline, or by fetching the blocks via block number
|
||||
// [head..FCU.number]
|
||||
|
||||
let hash = block.hash;
|
||||
if !self.on_new_payload(block.into()).is_valid() {
|
||||
// if the payload is invalid we run the pipeline
|
||||
self.sync.set_pipeline_sync_target(hash);
|
||||
}
|
||||
}
|
||||
EngineSyncEvent::PipelineStarted(target) => {
|
||||
trace!(target: "consensus::engine", ?target, continuous = target.is_none(), "Started the pipeline");
|
||||
self.metrics.pipeline_runs.increment(1);
|
||||
}
|
||||
EngineSyncEvent::PipelineTaskDropped => {
|
||||
error!(target: "consensus::engine", "Failed to receive spawned pipeline");
|
||||
return Some(Err(BeaconEngineError::PipelineChannelClosed))
|
||||
}
|
||||
EngineSyncEvent::PipelineFinished { result, reached_max_block } => {
|
||||
match result {
|
||||
Ok(ctrl) => {
|
||||
if ctrl.is_unwind() {
|
||||
self.sync.set_pipeline_sync_target(current_state.head_block_hash);
|
||||
} else if reached_max_block {
|
||||
// Terminate the sync early if it's reached the maximum user
|
||||
// configured block.
|
||||
return Some(Ok(()))
|
||||
}
|
||||
|
||||
// Update the state and hashes of the blockchain tree if possible
|
||||
if let Err(error) = self.restore_tree_if_possible(*current_state) {
|
||||
error!(target: "consensus::engine", ?error, "Error restoring blockchain tree");
|
||||
return Some(Err(error.into()))
|
||||
}
|
||||
}
|
||||
// Any pipeline error at this point is fatal.
|
||||
Err(error) => return Some(Err(error.into())),
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// On initialization, the consensus engine will poll the message receiver and return
|
||||
@ -692,10 +711,10 @@ where
|
||||
/// local forkchoice state, it will launch the pipeline to sync to the head hash.
|
||||
/// While the pipeline is syncing, the consensus engine will keep processing messages from the
|
||||
/// receiver and forwarding them to the blockchain tree.
|
||||
impl<DB, TS, BT> Future for BeaconConsensusEngine<DB, TS, BT>
|
||||
impl<DB, BT, Client> Future for BeaconConsensusEngine<DB, BT, Client>
|
||||
where
|
||||
DB: Database + Unpin + 'static,
|
||||
TS: TaskSpawner + Unpin,
|
||||
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
|
||||
BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker + Unpin + 'static,
|
||||
{
|
||||
type Output = Result<(), BeaconEngineError>;
|
||||
@ -703,127 +722,58 @@ where
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
|
||||
// Set the next pipeline state.
|
||||
loop {
|
||||
// Process all incoming messages first.
|
||||
while let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) {
|
||||
match msg {
|
||||
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
|
||||
this.metrics.forkchoice_updated_messages.increment(1);
|
||||
let on_updated = match this.on_forkchoice_updated(state, payload_attrs) {
|
||||
Ok(response) => response,
|
||||
Err(error) => {
|
||||
error!(target: "consensus::engine", ?state, ?error, "Error getting forkchoice updated response");
|
||||
return Poll::Ready(Err(error))
|
||||
}
|
||||
};
|
||||
let is_valid_response = on_updated.is_valid_update();
|
||||
let _ = tx.send(on_updated);
|
||||
|
||||
// Terminate the sync early if it's reached the maximum user
|
||||
// configured block.
|
||||
if is_valid_response {
|
||||
let tip_number = this.blockchain.canonical_tip().number;
|
||||
if this.has_reached_max_block(tip_number) {
|
||||
return Poll::Ready(Ok(()))
|
||||
}
|
||||
// Process all incoming messages first.
|
||||
while let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) {
|
||||
match msg {
|
||||
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
|
||||
this.metrics.forkchoice_updated_messages.increment(1);
|
||||
let on_updated = match this.on_forkchoice_updated(state, payload_attrs) {
|
||||
Ok(response) => response,
|
||||
Err(error) => {
|
||||
error!(target: "consensus::engine", ?state, ?error, "Error getting forkchoice updated response");
|
||||
return Poll::Ready(Err(error))
|
||||
}
|
||||
}
|
||||
BeaconEngineMessage::NewPayload { payload, tx } => {
|
||||
this.metrics.new_payload_messages.increment(1);
|
||||
let status = this.on_new_payload(payload);
|
||||
let _ = tx.send(Ok(status));
|
||||
}
|
||||
BeaconEngineMessage::EventListener(tx) => {
|
||||
this.listeners.push_listener(tx);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
let is_valid_response = on_updated.is_valid_update();
|
||||
let _ = tx.send(on_updated);
|
||||
|
||||
// Lookup the forkchoice state. We can't launch the pipeline without the tip.
|
||||
let forkchoice_state = match &this.forkchoice_state {
|
||||
Some(state) => *state,
|
||||
None => return Poll::Pending,
|
||||
};
|
||||
|
||||
let next_state = match this.pipeline_state.take().expect("pipeline state is set") {
|
||||
PipelineState::Running(mut fut) => {
|
||||
match fut.poll_unpin(cx) {
|
||||
Poll::Ready(Ok((pipeline, result))) => {
|
||||
if let Err(error) = result {
|
||||
return Poll::Ready(Err(error.into()))
|
||||
}
|
||||
|
||||
match result {
|
||||
Ok(ctrl) => {
|
||||
if ctrl.is_unwind() {
|
||||
this.require_pipeline_run(PipelineTarget::Head);
|
||||
} else {
|
||||
// Terminate the sync early if it's reached the maximum user
|
||||
// configured block.
|
||||
let minimum_pipeline_progress =
|
||||
pipeline.minimum_progress().unwrap_or_default();
|
||||
if this.has_reached_max_block(minimum_pipeline_progress) {
|
||||
return Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
// Any pipeline error at this point is fatal.
|
||||
Err(error) => return Poll::Ready(Err(error.into())),
|
||||
};
|
||||
|
||||
// Update the state and hashes of the blockchain tree if possible
|
||||
if let Err(error) = this.restore_tree_if_possible(forkchoice_state) {
|
||||
error!(target: "consensus::engine", ?error, "Error restoring blockchain tree");
|
||||
return Poll::Ready(Err(error.into()))
|
||||
}
|
||||
|
||||
// Get next pipeline state.
|
||||
this.next_pipeline_state(pipeline, forkchoice_state)
|
||||
}
|
||||
Poll::Ready(Err(error)) => {
|
||||
error!(target: "consensus::engine", ?error, "Failed to receive pipeline result");
|
||||
return Poll::Ready(Err(BeaconEngineError::PipelineChannelClosed))
|
||||
}
|
||||
Poll::Pending => {
|
||||
this.pipeline_state = Some(PipelineState::Running(fut));
|
||||
return Poll::Pending
|
||||
// Terminate the sync early if it's reached the maximum user
|
||||
// configured block.
|
||||
if is_valid_response {
|
||||
let tip_number = this.blockchain.canonical_tip().number;
|
||||
if this.sync.has_reached_max_block(tip_number) {
|
||||
return Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
PipelineState::Idle(pipeline) => {
|
||||
this.next_pipeline_state(pipeline, forkchoice_state)
|
||||
BeaconEngineMessage::NewPayload { payload, tx } => {
|
||||
this.metrics.new_payload_messages.increment(1);
|
||||
let status = this.on_new_payload(payload);
|
||||
let _ = tx.send(Ok(status));
|
||||
}
|
||||
BeaconEngineMessage::EventListener(tx) => {
|
||||
this.listeners.push_listener(tx);
|
||||
}
|
||||
};
|
||||
this.pipeline_state = Some(next_state);
|
||||
|
||||
// If the pipeline is idle, break from the loop.
|
||||
if this.is_pipeline_idle() {
|
||||
return Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
// Lookup the forkchoice state. We can't launch the pipeline without the tip.
|
||||
let forkchoice_state = match &this.forkchoice_state {
|
||||
Some(state) => *state,
|
||||
None => return Poll::Pending,
|
||||
};
|
||||
|
||||
// poll sync controller
|
||||
while let Poll::Ready(sync_event) = this.sync.poll(cx) {
|
||||
if let Some(res) = this.on_sync_event(sync_event, &forkchoice_state) {
|
||||
return Poll::Ready(res)
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
/// Denotes the next action that the [BeaconConsensusEngine] should take.
|
||||
#[derive(Debug, Default)]
|
||||
enum BeaconEngineAction {
|
||||
#[default]
|
||||
None,
|
||||
/// Contains the type of target hash to pass to the pipeline
|
||||
RunPipeline(PipelineTarget),
|
||||
}
|
||||
|
||||
/// The target hash to pass to the pipeline.
|
||||
#[derive(Debug, Default)]
|
||||
enum PipelineTarget {
|
||||
/// Corresponds to the head block hash.
|
||||
#[default]
|
||||
Head,
|
||||
/// Corresponds to the safe block hash.
|
||||
Safe,
|
||||
}
|
||||
|
||||
/// Keeps track of invalid headers.
|
||||
struct InvalidHeaderCache {
|
||||
headers: LruMap<H256, Header>,
|
||||
@ -857,7 +807,7 @@ mod tests {
|
||||
BlockchainTree, ShareableBlockchainTree,
|
||||
};
|
||||
use reth_db::mdbx::{test_utils::create_test_rw_db, Env, WriteMap};
|
||||
use reth_interfaces::test_utils::TestConsensus;
|
||||
use reth_interfaces::test_utils::{NoopFullBlockClient, TestConsensus};
|
||||
use reth_payload_builder::test_utils::spawn_test_payload_service;
|
||||
use reth_primitives::{ChainSpec, ChainSpecBuilder, SealedBlockWithSenders, H256, MAINNET};
|
||||
use reth_provider::{
|
||||
@ -874,11 +824,11 @@ mod tests {
|
||||
|
||||
type TestBeaconConsensusEngine = BeaconConsensusEngine<
|
||||
Arc<Env<WriteMap>>,
|
||||
TokioTaskExecutor,
|
||||
BlockchainProvider<
|
||||
Arc<Env<WriteMap>>,
|
||||
ShareableBlockchainTree<Arc<Env<WriteMap>>, TestConsensus, TestExecutorFactory>,
|
||||
>,
|
||||
NoopFullBlockClient,
|
||||
>;
|
||||
|
||||
struct TestEnv<DB> {
|
||||
@ -975,9 +925,10 @@ mod tests {
|
||||
let blockchain_provider = BlockchainProvider::with_latest(shareable_db, tree, latest);
|
||||
let (engine, handle) = BeaconConsensusEngine::new(
|
||||
db.clone(),
|
||||
TokioTaskExecutor::default(),
|
||||
NoopFullBlockClient::default(),
|
||||
pipeline,
|
||||
blockchain_provider,
|
||||
Box::<TokioTaskExecutor>::default(),
|
||||
None,
|
||||
false,
|
||||
payload_builder,
|
||||
@ -1114,7 +1065,7 @@ mod tests {
|
||||
VecDeque::from([Ok(ExecOutput { stage_progress: max_block, done: true })]),
|
||||
Vec::default(),
|
||||
);
|
||||
consensus_engine.max_block = Some(max_block);
|
||||
consensus_engine.sync.set_max_block(max_block);
|
||||
let rx = spawn_consensus_engine(consensus_engine);
|
||||
|
||||
let _ = env
|
||||
|
||||
@ -1,26 +0,0 @@
|
||||
use reth_db::database::Database;
|
||||
use reth_stages::{Pipeline, PipelineWithResult};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
/// The possible pipeline states within the sync controller.
|
||||
///
|
||||
/// [PipelineState::Idle] means that the pipeline is currently idle.
|
||||
/// [PipelineState::Running] means that the pipeline is currently running.
|
||||
///
|
||||
/// NOTE: The differentiation between these two states is important, because when the pipeline is
|
||||
/// running, it acquires the write lock over the database. This means that we cannot forward to the
|
||||
/// blockchain tree any messages that would result in database writes, since it would result in a
|
||||
/// deadlock.
|
||||
pub enum PipelineState<DB: Database> {
|
||||
/// Pipeline is idle.
|
||||
Idle(Pipeline<DB>),
|
||||
/// Pipeline is running.
|
||||
Running(oneshot::Receiver<PipelineWithResult<DB>>),
|
||||
}
|
||||
|
||||
impl<DB: Database> PipelineState<DB> {
|
||||
/// Returns `true` if the state matches idle.
|
||||
pub fn is_idle(&self) -> bool {
|
||||
matches!(self, PipelineState::Idle(_))
|
||||
}
|
||||
}
|
||||
266
crates/consensus/beacon/src/engine/sync.rs
Normal file
266
crates/consensus/beacon/src/engine/sync.rs
Normal file
@ -0,0 +1,266 @@
|
||||
//! Sync management for the engine implementation.
|
||||
|
||||
use futures::FutureExt;
|
||||
use reth_db::database::Database;
|
||||
use reth_interfaces::p2p::{
|
||||
bodies::client::BodiesClient,
|
||||
full_block::{FetchFullBlockFuture, FullBlockClient},
|
||||
headers::client::HeadersClient,
|
||||
};
|
||||
use reth_primitives::{BlockNumber, SealedBlock, H256};
|
||||
use reth_stages::{ControlFlow, Pipeline, PipelineError, PipelineWithResult};
|
||||
use reth_tasks::TaskSpawner;
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::trace;
|
||||
|
||||
/// Manages syncing under the control of the engine.
|
||||
///
|
||||
/// This type controls the [Pipeline] and supports (single) full block downloads.
|
||||
///
|
||||
/// Caution: If the pipeline is running, this type will not emit blocks downloaded from the network
|
||||
/// [EngineSyncEvent::FetchedFullBlock] until the pipeline is idle to prevent commits to the
|
||||
/// database while the pipeline is still active.
|
||||
pub(crate) struct EngineSyncController<DB, Client>
|
||||
where
|
||||
DB: Database,
|
||||
Client: HeadersClient + BodiesClient,
|
||||
{
|
||||
/// A downloader that can download full blocks from the network.
|
||||
full_block_client: FullBlockClient<Client>,
|
||||
/// The type that can spawn the pipeline task.
|
||||
pipeline_task_spawner: Box<dyn TaskSpawner>,
|
||||
/// The current state of the pipeline.
|
||||
/// The pipeline is used for large ranges.
|
||||
pipeline_state: PipelineState<DB>,
|
||||
/// Pending target block for the pipeline to sync
|
||||
pending_pipeline_target: Option<H256>,
|
||||
/// In requests in progress.
|
||||
inflight_full_block_requests: Vec<FetchFullBlockFuture<Client>>,
|
||||
/// Buffered events until the manager is polled and the pipeline is idle.
|
||||
queued_events: VecDeque<EngineSyncEvent>,
|
||||
/// If enabled, the pipeline will be triggered continuously, as soon as it becomes idle
|
||||
run_pipeline_continuously: bool,
|
||||
/// Max block after which the consensus engine would terminate the sync. Used for debugging
|
||||
/// purposes.
|
||||
max_block: Option<BlockNumber>,
|
||||
}
|
||||
|
||||
impl<DB, Client> EngineSyncController<DB, Client>
|
||||
where
|
||||
DB: Database + 'static,
|
||||
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
|
||||
{
|
||||
/// Create a new instance
|
||||
pub(crate) fn new(
|
||||
pipeline: Pipeline<DB>,
|
||||
client: Client,
|
||||
pipeline_task_spawner: Box<dyn TaskSpawner>,
|
||||
run_pipeline_continuously: bool,
|
||||
max_block: Option<BlockNumber>,
|
||||
) -> Self {
|
||||
Self {
|
||||
full_block_client: FullBlockClient::new(client),
|
||||
pipeline_task_spawner,
|
||||
pipeline_state: PipelineState::Idle(Some(pipeline)),
|
||||
pending_pipeline_target: None,
|
||||
inflight_full_block_requests: Vec::new(),
|
||||
queued_events: VecDeque::new(),
|
||||
run_pipeline_continuously,
|
||||
max_block,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the max block value for testing
|
||||
#[cfg(test)]
|
||||
pub(crate) fn set_max_block(&mut self, block: BlockNumber) {
|
||||
self.max_block = Some(block);
|
||||
}
|
||||
|
||||
/// Cancels all full block requests that are in progress.
|
||||
pub(crate) fn clear_full_block_requests(&mut self) {
|
||||
self.inflight_full_block_requests.clear();
|
||||
}
|
||||
|
||||
/// Returns `true` if the pipeline is idle.
|
||||
pub(crate) fn is_pipeline_idle(&self) -> bool {
|
||||
self.pipeline_state.is_idle()
|
||||
}
|
||||
|
||||
/// Starts requesting a full block from the network.
|
||||
pub(crate) fn download_full_block(&mut self, hash: H256) {
|
||||
let request = self.full_block_client.get_full_block(hash);
|
||||
self.inflight_full_block_requests.push(request);
|
||||
}
|
||||
|
||||
/// Sets a new target to sync the pipeline to.
|
||||
pub(crate) fn set_pipeline_sync_target(&mut self, target: H256) {
|
||||
self.pending_pipeline_target = Some(target);
|
||||
}
|
||||
|
||||
/// Check if the engine reached max block as specified by `max_block` parameter.
|
||||
///
|
||||
/// Note: this is mainly for debugging purposes.
|
||||
pub(crate) fn has_reached_max_block(&self, progress: BlockNumber) -> bool {
|
||||
let has_reached_max_block =
|
||||
self.max_block.map(|target| progress >= target).unwrap_or_default();
|
||||
if has_reached_max_block {
|
||||
trace!(
|
||||
target: "consensus::engine",
|
||||
?progress,
|
||||
max_block = ?self.max_block,
|
||||
"Consensus engine reached max block."
|
||||
);
|
||||
}
|
||||
has_reached_max_block
|
||||
}
|
||||
|
||||
/// Advances the pipeline state.
|
||||
///
|
||||
/// This checks for the result in the channel, or returns pending if the pipeline is idle.
|
||||
fn poll_pipeline(&mut self, cx: &mut Context<'_>) -> Poll<EngineSyncEvent> {
|
||||
let res = match self.pipeline_state {
|
||||
PipelineState::Idle(_) => return Poll::Pending,
|
||||
PipelineState::Running(ref mut fut) => {
|
||||
ready!(fut.poll_unpin(cx))
|
||||
}
|
||||
};
|
||||
let ev = match res {
|
||||
Ok((pipeline, result)) => {
|
||||
let minimum_progress = pipeline.minimum_progress();
|
||||
let reached_max_block =
|
||||
self.has_reached_max_block(minimum_progress.unwrap_or_default());
|
||||
self.pipeline_state = PipelineState::Idle(Some(pipeline));
|
||||
EngineSyncEvent::PipelineFinished { result, reached_max_block }
|
||||
}
|
||||
Err(_) => {
|
||||
// failed to receive the pipeline
|
||||
EngineSyncEvent::PipelineTaskDropped
|
||||
}
|
||||
};
|
||||
Poll::Ready(ev)
|
||||
}
|
||||
|
||||
/// This will spawn the pipeline if it is idle and a target is set or if the pipeline is set to
|
||||
/// run continuously.
|
||||
fn try_spawn_pipeline(&mut self) -> Option<EngineSyncEvent> {
|
||||
match &mut self.pipeline_state {
|
||||
PipelineState::Idle(pipeline) => {
|
||||
let target = self.pending_pipeline_target.take();
|
||||
|
||||
if target.is_none() && !self.run_pipeline_continuously {
|
||||
// nothing to sync
|
||||
return None
|
||||
}
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let pipeline = pipeline.take().expect("exists");
|
||||
self.pipeline_task_spawner.spawn_critical_blocking(
|
||||
"pipeline task",
|
||||
Box::pin(async move {
|
||||
let result = pipeline.run_as_fut(target).await;
|
||||
let _ = tx.send(result);
|
||||
}),
|
||||
);
|
||||
self.pipeline_state = PipelineState::Running(rx);
|
||||
|
||||
// we also clear any pending full block requests because we expect them to be
|
||||
// outdated (included in the range the pipeline is syncing anyway)
|
||||
self.clear_full_block_requests();
|
||||
|
||||
Some(EngineSyncEvent::PipelineStarted(target))
|
||||
}
|
||||
PipelineState::Running(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Advances the sync process.
|
||||
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<EngineSyncEvent> {
|
||||
// try to spawn a pipeline if a target is set
|
||||
if let Some(event) = self.try_spawn_pipeline() {
|
||||
return Poll::Ready(event)
|
||||
}
|
||||
|
||||
loop {
|
||||
// drain buffered events first if pipeline is not running
|
||||
if self.is_pipeline_idle() {
|
||||
if let Some(event) = self.queued_events.pop_front() {
|
||||
return Poll::Ready(event)
|
||||
}
|
||||
} else {
|
||||
// advance the pipeline
|
||||
if let Poll::Ready(event) = self.poll_pipeline(cx) {
|
||||
return Poll::Ready(event)
|
||||
}
|
||||
}
|
||||
|
||||
// advance all requests
|
||||
for idx in (0..self.inflight_full_block_requests.len()).rev() {
|
||||
let mut request = self.inflight_full_block_requests.swap_remove(idx);
|
||||
if let Poll::Ready(block) = request.poll_unpin(cx) {
|
||||
self.queued_events.push_back(EngineSyncEvent::FetchedFullBlock(block));
|
||||
} else {
|
||||
// still pending
|
||||
self.inflight_full_block_requests.push(request);
|
||||
}
|
||||
}
|
||||
|
||||
if !self.pipeline_state.is_idle() || self.queued_events.is_empty() {
|
||||
// can not make any progress
|
||||
return Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The event type emitted by the [EngineSyncController].
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum EngineSyncEvent {
|
||||
/// A full block has been downloaded from the network.
|
||||
FetchedFullBlock(SealedBlock),
|
||||
/// Pipeline started syncing
|
||||
///
|
||||
/// This is none if the pipeline is triggered without a specific target.
|
||||
PipelineStarted(Option<H256>),
|
||||
/// Pipeline finished
|
||||
///
|
||||
/// If this is returned, the pipeline is idle.
|
||||
PipelineFinished {
|
||||
/// Final result of the pipeline run.
|
||||
result: Result<ControlFlow, PipelineError>,
|
||||
/// Whether the pipeline reached the configured `max_block`.
|
||||
///
|
||||
/// Note: this is only relevant in debugging scenarios.
|
||||
reached_max_block: bool,
|
||||
},
|
||||
/// Pipeline task was dropped after it was started, unable to receive it because channel
|
||||
/// closed. This would indicate a panicked pipeline task
|
||||
PipelineTaskDropped,
|
||||
}
|
||||
|
||||
/// The possible pipeline states within the sync controller.
|
||||
///
|
||||
/// [PipelineState::Idle] means that the pipeline is currently idle.
|
||||
/// [PipelineState::Running] means that the pipeline is currently running.
|
||||
///
|
||||
/// NOTE: The differentiation between these two states is important, because when the pipeline is
|
||||
/// running, it acquires the write lock over the database. This means that we cannot forward to the
|
||||
/// blockchain tree any messages that would result in database writes, since it would result in a
|
||||
/// deadlock.
|
||||
enum PipelineState<DB: Database> {
|
||||
/// Pipeline is idle.
|
||||
Idle(Option<Pipeline<DB>>),
|
||||
/// Pipeline is running and waiting for a response
|
||||
Running(oneshot::Receiver<PipelineWithResult<DB>>),
|
||||
}
|
||||
|
||||
impl<DB: Database> PipelineState<DB> {
|
||||
/// Returns `true` if the state matches idle.
|
||||
fn is_idle(&self) -> bool {
|
||||
matches!(self, PipelineState::Idle(_))
|
||||
}
|
||||
}
|
||||
@ -7,8 +7,8 @@ use std::collections::{BTreeMap, HashSet};
|
||||
/// * [BlockchainTreeEngine::finalize_block]: Remove chains that join to now finalized block, as
|
||||
/// chain becomes invalid.
|
||||
/// * [BlockchainTreeEngine::make_canonical]: Check if we have the hash of block that we want to
|
||||
/// finalize and commit it to db. If we dont have the block, pipeline syncing should start to
|
||||
/// fetch the blocks from p2p. Do reorg in tables if canonical chain if needed.
|
||||
/// finalize and commit it to db. If we don't have the block, syncing should start to fetch the
|
||||
/// blocks from p2p. Do reorg in tables if canonical chain if needed.
|
||||
pub trait BlockchainTreeEngine: BlockchainTreeViewer + Send + Sync {
|
||||
/// Recover senders and call [`BlockchainTreeEngine::insert_block`].
|
||||
fn insert_block_without_senders(&self, block: SealedBlock) -> Result<BlockStatus, Error> {
|
||||
@ -98,11 +98,14 @@ pub trait BlockchainTreeViewer: Send + Sync {
|
||||
/// Canonical block number and hashes best known by the tree.
|
||||
fn canonical_blocks(&self) -> BTreeMap<BlockNumber, BlockHash>;
|
||||
|
||||
/// Given a hash, this tries to find the last ancestor that is part of the canonical chain.
|
||||
/// Given the parent hash of a block, this tries to find the last ancestor that is part of the
|
||||
/// canonical chain.
|
||||
///
|
||||
/// In other words, this will walk up the (side) chain starting with the given hash and return
|
||||
/// the first block that's canonical.
|
||||
fn find_canonical_ancestor(&self, hash: BlockHash) -> Option<BlockHash>;
|
||||
///
|
||||
/// Note: this could be the given `parent_hash` if it's already canonical.
|
||||
fn find_canonical_ancestor(&self, parent_hash: BlockHash) -> Option<BlockHash>;
|
||||
|
||||
/// Return BlockchainTree best known canonical chain tip (BlockHash, BlockNumber)
|
||||
fn canonical_tip(&self) -> BlockNumHash;
|
||||
|
||||
@ -67,6 +67,11 @@ impl<Client> FetchFullBlockFuture<Client>
|
||||
where
|
||||
Client: BodiesClient + HeadersClient,
|
||||
{
|
||||
/// Returns the hash of the block being requested.
|
||||
pub fn hash(&self) -> &H256 {
|
||||
&self.hash
|
||||
}
|
||||
|
||||
/// If the header request is already complete, this returns the block number
|
||||
pub fn block_number(&self) -> Option<u64> {
|
||||
self.header.as_ref().map(|h| h.number)
|
||||
|
||||
45
crates/interfaces/src/test_utils/full_block.rs
Normal file
45
crates/interfaces/src/test_utils/full_block.rs
Normal file
@ -0,0 +1,45 @@
|
||||
use crate::p2p::{
|
||||
bodies::client::BodiesClient,
|
||||
download::DownloadClient,
|
||||
error::PeerRequestResult,
|
||||
headers::client::{HeadersClient, HeadersRequest},
|
||||
priority::Priority,
|
||||
};
|
||||
use reth_primitives::{BlockBody, Header, PeerId, WithPeerId, H256};
|
||||
|
||||
/// A headers+bodies client implementation that does nothing.
|
||||
#[derive(Debug, Default, Clone)]
|
||||
#[non_exhaustive]
|
||||
pub struct NoopFullBlockClient;
|
||||
|
||||
impl DownloadClient for NoopFullBlockClient {
|
||||
fn report_bad_message(&self, _peer_id: PeerId) {}
|
||||
|
||||
fn num_connected_peers(&self) -> usize {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
impl BodiesClient for NoopFullBlockClient {
|
||||
type Output = futures::future::Ready<PeerRequestResult<Vec<BlockBody>>>;
|
||||
|
||||
fn get_block_bodies_with_priority(
|
||||
&self,
|
||||
_hashes: Vec<H256>,
|
||||
_priority: Priority,
|
||||
) -> Self::Output {
|
||||
futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
|
||||
}
|
||||
}
|
||||
|
||||
impl HeadersClient for NoopFullBlockClient {
|
||||
type Output = futures::future::Ready<PeerRequestResult<Vec<Header>>>;
|
||||
|
||||
fn get_headers_with_priority(
|
||||
&self,
|
||||
_request: HeadersRequest,
|
||||
_priority: Priority,
|
||||
) -> Self::Output {
|
||||
futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
|
||||
}
|
||||
}
|
||||
@ -1,10 +1,12 @@
|
||||
#![allow(unused)]
|
||||
|
||||
mod bodies;
|
||||
mod full_block;
|
||||
mod headers;
|
||||
|
||||
/// Generators for different data structures like block headers, block bodies and ranges of those.
|
||||
pub mod generators;
|
||||
|
||||
pub use bodies::*;
|
||||
pub use full_block::*;
|
||||
pub use headers::*;
|
||||
|
||||
@ -11,6 +11,14 @@ use tokio::{
|
||||
};
|
||||
use tokio_util::codec::FramedWrite;
|
||||
|
||||
mod bodies_client;
|
||||
mod file_client;
|
||||
mod file_codec;
|
||||
|
||||
pub use bodies_client::TestBodiesClient;
|
||||
pub use file_client::{FileClient, FileClientError};
|
||||
pub(crate) use file_codec::BlockFileCodec;
|
||||
|
||||
/// Metrics scope used for testing.
|
||||
pub(crate) const TEST_SCOPE: &str = "downloaders.test";
|
||||
|
||||
@ -59,11 +67,3 @@ pub(crate) async fn generate_bodies_file(
|
||||
file.seek(SeekFrom::Start(0)).await.unwrap();
|
||||
(file, headers, bodies)
|
||||
}
|
||||
|
||||
mod file_client;
|
||||
mod file_codec;
|
||||
mod test_client;
|
||||
|
||||
pub use file_client::{FileClient, FileClientError};
|
||||
pub(crate) use file_codec::BlockFileCodec;
|
||||
pub use test_client::TestBodiesClient;
|
||||
|
||||
@ -15,6 +15,7 @@ pub enum ControlFlow {
|
||||
/// The progress of the last stage
|
||||
progress: BlockNumber,
|
||||
},
|
||||
/// Pipeline made no progress
|
||||
NoProgress {
|
||||
/// The current stage progress.
|
||||
stage_progress: Option<BlockNumber>,
|
||||
@ -22,10 +23,12 @@ pub enum ControlFlow {
|
||||
}
|
||||
|
||||
impl ControlFlow {
|
||||
/// Whether the pipeline should continue executing stages.
|
||||
pub fn should_continue(&self) -> bool {
|
||||
matches!(self, ControlFlow::Continue { .. } | ControlFlow::NoProgress { .. })
|
||||
}
|
||||
|
||||
/// Returns true if the control flow is unwind.
|
||||
pub fn is_unwind(&self) -> bool {
|
||||
matches!(self, ControlFlow::Unwind { .. })
|
||||
}
|
||||
|
||||
@ -16,8 +16,8 @@ mod progress;
|
||||
mod set;
|
||||
mod sync_metrics;
|
||||
|
||||
pub use crate::pipeline::ctrl::ControlFlow;
|
||||
pub use builder::*;
|
||||
use ctrl::*;
|
||||
pub use event::*;
|
||||
use progress::*;
|
||||
pub use set::*;
|
||||
@ -113,8 +113,8 @@ where
|
||||
}
|
||||
|
||||
/// Return the minimum pipeline progress
|
||||
pub fn minimum_progress(&self) -> &Option<u64> {
|
||||
&self.progress.minimum_progress
|
||||
pub fn minimum_progress(&self) -> Option<u64> {
|
||||
self.progress.minimum_progress
|
||||
}
|
||||
|
||||
/// Set tip for reverse sync.
|
||||
|
||||
Reference in New Issue
Block a user