feat: stage eta (#3135)

This commit is contained in:
Bjerg
2023-06-15 13:58:35 +02:00
committed by GitHub
parent 64392209e9
commit ff36f78c2b
5 changed files with 100 additions and 29 deletions

View File

@ -83,6 +83,7 @@ backon = "0.4"
hex = "0.4"
thiserror = { workspace = true }
pretty_assertions = "1.3.0"
humantime = "2.1.0"
[features]
jemalloc = ["dep:jemallocator"]

View File

@ -7,7 +7,7 @@ use reth_interfaces::consensus::ForkchoiceState;
use reth_network::{NetworkEvent, NetworkHandle};
use reth_network_api::PeersInfo;
use reth_primitives::{
stage::{StageCheckpoint, StageId},
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
BlockNumber,
};
use reth_stages::{ExecOutput, PipelineEvent};
@ -15,10 +15,10 @@ use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
time::{Duration, Instant},
};
use tokio::time::Interval;
use tracing::{debug, info, warn};
use tracing::{info, warn};
/// Interval of reporting node state.
const INFO_MESSAGE_INTERVAL: Duration = Duration::from_secs(30);
@ -29,6 +29,8 @@ struct NodeState {
network: Option<NetworkHandle>,
/// The stage currently being executed.
current_stage: Option<StageId>,
/// The ETA for the current stage.
eta: Eta,
/// The current checkpoint of the executing stage.
current_checkpoint: StageCheckpoint,
/// The latest canonical block added in the consensus engine.
@ -40,6 +42,7 @@ impl NodeState {
Self {
network,
current_stage: None,
eta: Eta::default(),
current_checkpoint: StageCheckpoint::new(0),
latest_canonical_engine_block: None,
}
@ -59,11 +62,11 @@ impl NodeState {
if notable {
info!(
target: "reth::cli",
pipeline_stages = %format!("{pipeline_position}/{pipeline_total}"),
stage = %stage_id,
from = self.current_checkpoint.block_number,
checkpoint = %self.current_checkpoint,
eta = %self.eta,
"Executing stage",
);
}
@ -75,17 +78,14 @@ impl NodeState {
result: ExecOutput { checkpoint, done },
} => {
self.current_checkpoint = checkpoint;
if done {
self.current_stage = None;
}
self.eta.update(self.current_checkpoint);
info!(
target: "reth::cli",
pipeline_stages = %format!("{pipeline_position}/{pipeline_total}"),
stage = %stage_id,
progress = checkpoint.block_number,
block = checkpoint.block_number,
%checkpoint,
eta = %self.eta,
"{}",
if done {
"Stage finished executing"
@ -93,22 +93,20 @@ impl NodeState {
"Stage committed progress"
}
);
if done {
self.current_stage = None;
self.eta = Eta::default();
}
}
_ => (),
}
}
fn handle_network_event(&mut self, event: NetworkEvent) {
match event {
NetworkEvent::SessionEstablished { peer_id, status, .. } => {
info!(target: "reth::cli", connected_peers = self.num_connected_peers(), peer_id = %peer_id, best_block = %status.blockhash, "Peer connected");
}
NetworkEvent::SessionClosed { peer_id, reason } => {
let reason = reason.map(|s| s.to_string()).unwrap_or_else(|| "None".to_string());
debug!(target: "reth::cli", connected_peers = self.num_connected_peers(), peer_id = %peer_id, %reason, "Peer disconnected.");
}
_ => (),
}
fn handle_network_event(&mut self, _: NetworkEvent) {
// NOTE(onbjerg): This used to log established/disconnecting sessions, but this is already
// logged in the networking component. I kept this stub in case we want to catch other
// networking events later on.
}
fn handle_consensus_engine_event(&mut self, event: BeaconConsensusEngineEvent) {
@ -117,7 +115,6 @@ impl NodeState {
let ForkchoiceState { head_block_hash, safe_block_hash, finalized_block_hash } =
state;
info!(
target: "reth::cli",
?head_block_hash,
?safe_block_hash,
?finalized_block_hash,
@ -128,10 +125,10 @@ impl NodeState {
BeaconConsensusEngineEvent::CanonicalBlockAdded(block) => {
self.latest_canonical_engine_block = Some(block.number);
info!(target: "reth::cli", number=block.number, hash=?block.hash, "Block added to canonical chain");
info!(number=block.number, hash=?block.hash, "Block added to canonical chain");
}
BeaconConsensusEngineEvent::ForkBlockAdded(block) => {
info!(target: "reth::cli", number=block.number, hash=?block.hash, "Block added to fork chain");
info!(number=block.number, hash=?block.hash, "Block added to fork chain");
}
}
}
@ -139,16 +136,16 @@ impl NodeState {
fn handle_consensus_layer_health_event(&self, event: ConsensusLayerHealthEvent) {
match event {
ConsensusLayerHealthEvent::NeverSeen => {
warn!(target: "reth::cli", "Post-merge network, but never seen beacon client. Please launch one to follow the chain!")
warn!("Post-merge network, but never seen beacon client. Please launch one to follow the chain!")
}
ConsensusLayerHealthEvent::HasNotBeenSeenForAWhile(period) => {
warn!(target: "reth::cli", ?period, "Post-merge network, but no beacon client seen for a while. Please launch one to follow the chain!")
warn!(?period, "Post-merge network, but no beacon client seen for a while. Please launch one to follow the chain!")
}
ConsensusLayerHealthEvent::NeverReceivedUpdates => {
warn!(target: "reth::cli", "Beacon client online, but never received consensus updates. Please ensure your beacon client is operational to follow the chain!")
warn!("Beacon client online, but never received consensus updates. Please ensure your beacon client is operational to follow the chain!")
}
ConsensusLayerHealthEvent::HaveNotReceivedUpdatesForAWhile(period) => {
warn!(target: "reth::cli", ?period, "Beacon client online, but no consensus updates received for a while. Please fix your beacon client to follow the chain!")
warn!(?period, "Beacon client online, but no consensus updates received for a while. Please fix your beacon client to follow the chain!")
}
}
}
@ -232,6 +229,7 @@ where
connected_peers = this.state.num_connected_peers(),
%stage,
checkpoint = %this.state.current_checkpoint,
eta = %this.state.eta,
"Status"
);
} else {
@ -264,3 +262,51 @@ where
Poll::Pending
}
}
/// A container calculating the estimated time that a stage will complete in, based on stage
/// checkpoints reported by the pipeline.
///
/// One `Eta` is only valid for a single stage: the estimate is derived from the difference
/// between two consecutive checkpoints, so mixing checkpoints of different stages would yield
/// a meaningless rate. Start from `Eta::default()` for every new stage.
#[derive(Default)]
struct Eta {
/// The entities checkpoint (processed / total counters) seen on the most recent update.
///
/// Used as the baseline for computing how much was processed since then.
last_checkpoint: EntitiesCheckpoint,
/// The instant at which the most recent checkpoint was recorded.
///
/// `None` until the first checkpoint has been observed.
last_checkpoint_time: Option<Instant>,
/// The estimated time to completion, measured from `last_checkpoint_time`.
///
/// `None` while no estimate is available.
eta: Option<Duration>,
}
impl Eta {
    /// Update the ETA given the checkpoint.
    ///
    /// The first call only records a baseline. Every subsequent call derives a processing
    /// rate from the entities processed since the previous call and the time elapsed since
    /// then, and extrapolates that rate over the entities still remaining.
    ///
    /// If no meaningful estimate can be derived (the stage made no progress, the counters
    /// went backwards, or the extrapolated duration is not representable), the ETA is reset
    /// to `None` instead of panicking.
    fn update(&mut self, checkpoint: StageCheckpoint) {
        let current = checkpoint.entities();

        if let Some(last_checkpoint_time) = &self.last_checkpoint_time {
            // Saturate rather than underflow: a checkpoint may report fewer processed
            // entities than the previous one, or more processed entities than the total.
            let processed_since_last =
                current.processed.saturating_sub(self.last_checkpoint.processed);
            let remaining = current.total.saturating_sub(current.processed);

            let elapsed = last_checkpoint_time.elapsed();
            let per_second = processed_since_last as f64 / elapsed.as_secs_f64();

            // With zero progress `per_second` is `0.0` (or NaN when no time elapsed either),
            // which makes the quotient infinite or NaN. `Duration::from_secs_f64` panics on
            // such input, so use the fallible conversion and fall back to "no estimate".
            self.eta = Duration::try_from_secs_f64(remaining as f64 / per_second).ok();
        }

        self.last_checkpoint = current;
        self.last_checkpoint_time = Some(Instant::now());
    }
}
impl std::fmt::Display for Eta {
    /// Renders the time left until the stage is expected to finish.
    ///
    /// The stored estimate is relative to the moment of the last checkpoint, so the time
    /// elapsed since then is subtracted first. Prints `unknown` when there is no estimate
    /// yet, or when the estimate has already run out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let time_left = self
            .eta
            .zip(self.last_checkpoint_time)
            .and_then(|(estimate, reported_at)| estimate.checked_sub(reported_at.elapsed()));

        match time_left {
            Some(left) => write!(f, "{}", humantime::format_duration(left)),
            None => write!(f, "unknown"),
        }
    }
}