fix(cli): run pipeline to completion after restart (#2727)

This commit is contained in:
Roman Krasiuk
2023-05-18 23:05:44 +03:00
committed by GitHub
parent e4cd48aefd
commit defa64b2de
2 changed files with 45 additions and 53 deletions

View File

@ -14,7 +14,7 @@ use fdlimit::raise_fd_limit;
use futures::{pin_mut, stream::select as stream_select, StreamExt};
use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus};
use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig};
use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine, BeaconEngineMessage};
use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine};
use reth_blockchain_tree::{
config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree,
};
@ -30,7 +30,7 @@ use reth_downloaders::{
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_interfaces::{
consensus::{Consensus, ForkchoiceState},
consensus::Consensus,
p2p::{
bodies::{client::BodiesClient, downloader::BodyDownloader},
either::EitherDownloader,
@ -244,44 +244,6 @@ impl Command {
let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
// Forward genesis as forkchoice state to the consensus engine.
// This will allow the downloader to start
if self.debug.continuous {
info!(target: "reth::cli", "Continuous sync mode enabled");
let (tip_tx, _tip_rx) = oneshot::channel();
let state = ForkchoiceState {
head_block_hash: genesis_hash,
finalized_block_hash: genesis_hash,
safe_block_hash: genesis_hash,
};
consensus_engine_tx.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs: None,
tx: tip_tx,
})?;
}
// Forward the `debug.tip` as forkchoice state to the consensus engine.
// This will initiate the sync up to the provided tip.
let _tip_rx = match self.debug.tip {
Some(tip) => {
let (tip_tx, tip_rx) = oneshot::channel();
let state = ForkchoiceState {
head_block_hash: tip,
finalized_block_hash: tip,
safe_block_hash: tip,
};
consensus_engine_tx.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs: None,
tx: tip_tx,
})?;
debug!(target: "reth::cli", %tip, "Tip manually set");
Some(tip_rx)
}
None => None,
};
// configure the payload builder
let payload_generator = BasicPayloadJobGenerator::new(
blockchain_db.clone(),
@ -339,6 +301,20 @@ impl Command {
let pipeline_events = pipeline.events();
let initial_target = if let Some(tip) = self.debug.tip {
// Set the provided tip as the initial pipeline target.
debug!(target: "reth::cli", %tip, "Tip manually set");
Some(tip)
} else if self.debug.continuous {
// Set genesis as the initial pipeline target.
// This will allow the downloader to start
debug!(target: "reth::cli", "Continuous sync mode enabled");
Some(genesis_hash)
} else {
None
};
// Configure the consensus engine
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
Arc::clone(&db),
client,
@ -349,6 +325,7 @@ impl Command {
self.debug.max_block,
self.debug.continuous,
payload_builder.clone(),
initial_target,
consensus_engine_tx,
consensus_engine_rx,
);

View File

@ -191,6 +191,7 @@ where
max_block: Option<BlockNumber>,
run_pipeline_continuously: bool,
payload_builder: PayloadBuilderHandle,
target: Option<H256>,
) -> (Self, BeaconConsensusEngineHandle) {
let (to_engine, rx) = mpsc::unbounded_channel();
Self::with_channel(
@ -203,6 +204,7 @@ where
max_block,
run_pipeline_continuously,
payload_builder,
target,
to_engine,
rx,
)
@ -221,6 +223,7 @@ where
max_block: Option<BlockNumber>,
run_pipeline_continuously: bool,
payload_builder: PayloadBuilderHandle,
target: Option<H256>,
to_engine: UnboundedSender<BeaconEngineMessage>,
rx: UnboundedReceiver<BeaconEngineMessage>,
) -> (Self, BeaconConsensusEngineHandle) {
@ -232,7 +235,7 @@ where
run_pipeline_continuously,
max_block,
);
let this = Self {
let mut this = Self {
db,
sync,
blockchain,
@ -246,6 +249,10 @@ where
metrics: Metrics::default(),
};
if let Some(target) = target {
this.sync.set_pipeline_sync_target(target);
}
(this, handle)
}
@ -765,7 +772,6 @@ where
fn on_sync_event(
&mut self,
ev: EngineSyncEvent,
current_state: &ForkchoiceState,
) -> Option<Result<(), BeaconConsensusEngineError>> {
match ev {
EngineSyncEvent::FetchedFullBlock(block) => {
@ -795,16 +801,30 @@ where
EngineSyncEvent::PipelineFinished { result, reached_max_block } => {
match result {
Ok(ctrl) => {
if ctrl.is_unwind() {
self.sync.set_pipeline_sync_target(current_state.head_block_hash);
} else if reached_max_block {
if reached_max_block {
// Terminate the sync early if it's reached the maximum user
// configured block.
return Some(Ok(()))
}
let current_state = match self.forkchoice_state {
Some(state) => state,
None => {
// This is only possible if the node was run with `debug.tip`
// argument and without CL.
warn!(target: "consensus::engine", "No forkchoice state available");
return None
}
};
if ctrl.is_unwind() {
// Attempt to sync to the head block after unwind.
self.sync.set_pipeline_sync_target(current_state.head_block_hash);
return None
}
// Update the state and hashes of the blockchain tree if possible.
match self.restore_tree_if_possible(*current_state) {
match self.restore_tree_if_possible(current_state) {
Ok(_) => self.sync_state_updater.update_sync_state(SyncState::Idle),
Err(error) => {
error!(target: "consensus::engine", ?error, "Error restoring blockchain tree");
@ -859,15 +879,9 @@ where
}
}
// Lookup the forkchoice state. We can't launch the pipeline without the tip.
let forkchoice_state = match &this.forkchoice_state {
Some(state) => *state,
None => return Poll::Pending,
};
// poll sync controller
while let Poll::Ready(sync_event) = this.sync.poll(cx) {
if let Some(res) = this.on_sync_event(sync_event, &forkchoice_state) {
if let Some(res) = this.on_sync_event(sync_event) {
return Poll::Ready(res)
}
}
@ -1040,6 +1054,7 @@ mod tests {
None,
false,
payload_builder,
None,
);
(engine, TestEnv::new(db, tip_rx, handle))