fix: use engine responses to progress autoseal mining task (#3727)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Dan Cline
2023-07-12 10:03:40 -04:00
committed by GitHub
parent dbafe23cce
commit 6799fc3600
3 changed files with 36 additions and 28 deletions

View File

@ -1,12 +1,10 @@
use crate::{mode::MiningMode, Storage};
use futures_util::{future::BoxFuture, FutureExt, StreamExt};
use reth_beacon_consensus::BeaconEngineMessage;
use futures_util::{future::BoxFuture, FutureExt};
use reth_beacon_consensus::{BeaconEngineMessage, ForkchoiceStatus};
use reth_interfaces::consensus::ForkchoiceState;
use reth_primitives::{
constants::{EMPTY_RECEIPTS, EMPTY_TRANSACTIONS, ETHEREUM_BLOCK_GAS_LIMIT},
proofs,
stage::StageId,
Block, BlockBody, ChainSpec, Header, IntoRecoveredTransaction, ReceiptWithBloom,
proofs, Block, BlockBody, ChainSpec, Header, IntoRecoveredTransaction, ReceiptWithBloom,
SealedBlockWithSenders, EMPTY_OMMER_ROOT, U256,
};
use reth_provider::{CanonChainTracker, CanonStateNotificationSender, Chain, StateProviderFactory};
@ -26,7 +24,7 @@ use std::{
};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, trace, warn};
use tracing::{debug, error, trace, warn};
/// A Future that listens for new ready transactions and puts new blocks into storage
pub struct MiningTask<Client, Pool: TransactionPool> {
@ -117,7 +115,7 @@ where
let client = this.client.clone();
let chain_spec = Arc::clone(&this.chain_spec);
let pool = this.pool.clone();
let mut events = this.pipe_line_events.take();
let events = this.pipe_line_events.take();
let canon_state_notification = this.canon_state_notification.clone();
// Create the mining future that creates a block, notifies the engine that drives
@ -226,29 +224,38 @@ where
};
drop(storage);
// send the new update to the engine, this will trigger the pipeline to
// download the block, execute it and store it in the database.
let (tx, _rx) = oneshot::channel();
let _ = to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs: None,
tx,
});
debug!(target: "consensus::auto", ?state, "sent fork choice update");
// TODO: make this a future
// await the fcu call rx for SYNCING, then wait for a VALID response
loop {
// send the new update to the engine, this will trigger the engine
// to download and execute the block we just inserted
let (tx, rx) = oneshot::channel();
let _ = to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs: None,
tx,
});
debug!(target: "consensus::auto", ?state, "Sent fork choice update");
// wait for the pipeline to finish
if let Some(events) = events.as_mut() {
debug!(target: "consensus::auto", "waiting for finish stage event...");
// wait for the finish stage to
loop {
if let Some(PipelineEvent::Running { stage_id, .. }) =
events.next().await
{
if stage_id == StageId::Finish {
debug!(target: "consensus::auto", "received finish stage event");
break
match rx.await.unwrap() {
Ok(fcu_response) => {
match fcu_response.forkchoice_status() {
ForkchoiceStatus::Valid => break,
ForkchoiceStatus::Invalid => {
error!(target: "consensus::auto", ?fcu_response, "Forkchoice update returned invalid response");
return None
}
ForkchoiceStatus::Syncing => {
debug!(target: "consensus::auto", ?fcu_response, "Forkchoice update returned SYNCING, waiting for VALID");
// wait for the next fork choice update
continue
}
}
}
Err(err) => {
error!(target: "consensus::auto", ?err, "Autoseal fork choice update failed");
return None
}
}
}