feat(bin): improve status logs (#5518)

This commit is contained in:
Alexey Shekhirin
2023-11-21 23:57:42 +00:00
committed by GitHub
parent c825b26f2e
commit 06eeb35366
7 changed files with 194 additions and 150 deletions

View File

@ -109,12 +109,12 @@ impl ImportCommand {
pipeline.set_tip(tip);
debug!(target: "reth::cli", ?tip, "Tip manually set");
let factory = ProviderFactory::new(&db, self.chain.clone());
let factory = ProviderFactory::new(db.clone(), self.chain.clone());
let provider = factory.provider()?;
let latest_block_number =
provider.get_stage_checkpoint(StageId::Finish)?.map(|ch| ch.block_number);
tokio::spawn(handle_events(None, latest_block_number, events));
tokio::spawn(handle_events(None, latest_block_number, events, db.clone()));
// Run pipeline
info!(target: "reth::cli", "Starting sync pipeline");

View File

@ -235,7 +235,7 @@ impl Command {
&ctx.task_executor,
)?;
let factory = ProviderFactory::new(&db, self.chain.clone());
let factory = ProviderFactory::new(db.clone(), self.chain.clone());
let provider = factory.provider()?;
let latest_block_number =
@ -252,7 +252,7 @@ impl Command {
);
ctx.task_executor.spawn_critical(
"events task",
events::handle_events(Some(network.clone()), latest_block_number, events),
events::handle_events(Some(network.clone()), latest_block_number, events, db.clone()),
);
let mut current_max_block = latest_block_number.unwrap_or_default();

View File

@ -3,6 +3,7 @@
use crate::node::cl_events::ConsensusLayerHealthEvent;
use futures::Stream;
use reth_beacon_consensus::BeaconConsensusEngineEvent;
use reth_db::DatabaseEnv;
use reth_interfaces::consensus::ForkchoiceState;
use reth_network::{NetworkEvent, NetworkHandle};
use reth_network_api::PeersInfo;
@ -13,8 +14,10 @@ use reth_primitives::{
use reth_prune::PrunerEvent;
use reth_stages::{ExecOutput, PipelineEvent};
use std::{
fmt::{Display, Formatter},
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};
@ -26,27 +29,25 @@ const INFO_MESSAGE_INTERVAL: Duration = Duration::from_secs(25);
/// The current high-level state of the node.
struct NodeState {
/// Database environment.
/// Used for freelist calculation reported in the "Status" log message.
/// See [EventHandler::poll].
db: Arc<DatabaseEnv>,
/// Connection to the network.
network: Option<NetworkHandle>,
/// The stage currently being executed.
current_stage: Option<StageId>,
/// The ETA for the current stage.
eta: Eta,
/// The current checkpoint of the executing stage.
current_checkpoint: StageCheckpoint,
current_stage: Option<CurrentStage>,
/// The latest block reached by either pipeline or consensus engine.
latest_block: Option<BlockNumber>,
}
impl NodeState {
fn new(network: Option<NetworkHandle>, latest_block: Option<BlockNumber>) -> Self {
Self {
network,
current_stage: None,
eta: Eta::default(),
current_checkpoint: StageCheckpoint::new(0),
latest_block,
}
fn new(
db: Arc<DatabaseEnv>,
network: Option<NetworkHandle>,
latest_block: Option<BlockNumber>,
) -> Self {
Self { db, network, current_stage: None, latest_block }
}
fn num_connected_peers(&self) -> usize {
@ -56,70 +57,80 @@ impl NodeState {
/// Processes an event emitted by the pipeline
fn handle_pipeline_event(&mut self, event: PipelineEvent) {
match event {
PipelineEvent::Running { pipeline_stages_progress, stage_id, checkpoint } => {
let notable = self.current_stage.is_none();
self.current_stage = Some(stage_id);
self.current_checkpoint = checkpoint.unwrap_or_default();
PipelineEvent::Run { pipeline_stages_progress, stage_id, checkpoint, target } => {
let checkpoint = checkpoint.unwrap_or_default();
let current_stage = CurrentStage {
stage_id,
eta: match &self.current_stage {
Some(current_stage) if current_stage.stage_id == stage_id => {
current_stage.eta
}
_ => Eta::default(),
},
checkpoint,
target,
};
if notable {
if let Some(progress) = self.current_checkpoint.entities() {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
from = self.current_checkpoint.block_number,
checkpoint = %self.current_checkpoint.block_number,
%progress,
eta = %self.eta.fmt_for_stage(stage_id),
"Executing stage",
);
} else {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
from = self.current_checkpoint.block_number,
checkpoint = %self.current_checkpoint.block_number,
eta = %self.eta.fmt_for_stage(stage_id),
"Executing stage",
);
}
}
let progress = OptionalField(
checkpoint.entities().and_then(|entities| entities.fmt_percentage()),
);
let eta = current_stage.eta.fmt_for_stage(stage_id);
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
target = %OptionalField(target),
%progress,
%eta,
"Executing stage",
);
self.current_stage = Some(current_stage);
}
PipelineEvent::Ran {
pipeline_stages_progress,
stage_id,
result: ExecOutput { checkpoint, done },
} => {
self.current_checkpoint = checkpoint;
if stage_id.is_finish() {
self.latest_block = Some(checkpoint.block_number);
}
self.eta.update(self.current_checkpoint);
let message =
if done { "Stage finished executing" } else { "Stage committed progress" };
if let Some(current_stage) = self.current_stage.as_mut() {
current_stage.checkpoint = checkpoint;
current_stage.eta.update(checkpoint);
if let Some(progress) = checkpoint.entities() {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
%progress,
eta = %self.eta.fmt_for_stage(stage_id),
"{message}",
);
} else {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
eta = %self.eta.fmt_for_stage(stage_id),
"{message}",
let target = OptionalField(current_stage.target);
let progress = OptionalField(
checkpoint.entities().and_then(|entities| entities.fmt_percentage()),
);
if done {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
%target,
%progress,
"Stage finished executing",
)
} else {
let eta = current_stage.eta.fmt_for_stage(stage_id);
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
%target,
%progress,
%eta,
"Stage committed progress",
)
}
}
if done {
self.current_stage = None;
self.eta = Eta::default();
}
}
_ => (),
@ -189,6 +200,29 @@ impl NodeState {
}
}
/// Helper type for formatting of optional fields:
/// - If [Some(x)], then `x` is written
/// - If [None], then `None` is written
struct OptionalField<T: Display>(Option<T>);
impl<T: Display> Display for OptionalField<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if let Some(field) = &self.0 {
write!(f, "{field}")
} else {
write!(f, "None")
}
}
}
/// The stage currently being executed.
struct CurrentStage {
stage_id: StageId,
eta: Eta,
checkpoint: StageCheckpoint,
target: Option<BlockNumber>,
}
/// A node event.
#[derive(Debug)]
pub enum NodeEvent {
@ -240,10 +274,11 @@ pub async fn handle_events<E>(
network: Option<NetworkHandle>,
latest_block_number: Option<BlockNumber>,
events: E,
db: Arc<DatabaseEnv>,
) where
E: Stream<Item = NodeEvent> + Unpin,
{
let state = NodeState::new(network, latest_block_number);
let state = NodeState::new(db, network, latest_block_number);
let start = tokio::time::Instant::now() + Duration::from_secs(3);
let mut info_interval = tokio::time::interval_at(start, INFO_MESSAGE_INTERVAL);
@ -273,32 +308,40 @@ where
let mut this = self.project();
while this.info_interval.poll_tick(cx).is_ready() {
if let Some(stage) = this.state.current_stage {
if let Some(progress) = this.state.current_checkpoint.entities() {
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
%stage,
checkpoint = %this.state.current_checkpoint.block_number,
%progress,
eta = %this.state.eta.fmt_for_stage(stage),
"Status"
);
} else {
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
%stage,
checkpoint = %this.state.current_checkpoint.block_number,
eta = %this.state.eta.fmt_for_stage(stage),
"Status"
);
}
let freelist = OptionalField(this.state.db.freelist().ok());
if let Some(CurrentStage { stage_id, eta, checkpoint, target }) =
&this.state.current_stage
{
let progress = OptionalField(
checkpoint.entities().and_then(|entities| entities.fmt_percentage()),
);
let eta = eta.fmt_for_stage(*stage_id);
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
%freelist,
stage = %stage_id,
checkpoint = checkpoint.block_number,
target = %OptionalField(*target),
%progress,
%eta,
"Status"
);
} else if let Some(latest_block) = this.state.latest_block {
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
%freelist,
%latest_block,
"Status"
);
} else {
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
latest_block = this.state.latest_block.unwrap_or(this.state.current_checkpoint.block_number),
%freelist,
"Status"
);
}
@ -332,7 +375,7 @@ where
/// checkpoints reported by the pipeline.
///
/// One `Eta` is only valid for a single stage.
#[derive(Default)]
#[derive(Default, Copy, Clone)]
struct Eta {
/// The last stage checkpoint
last_checkpoint: EntitiesCheckpoint,
@ -375,8 +418,8 @@ impl Eta {
}
}
impl std::fmt::Display for Eta {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl Display for Eta {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if let Some((eta, last_checkpoint_time)) = self.eta.zip(self.last_checkpoint_time) {
let remaining = eta.checked_sub(last_checkpoint_time.elapsed());

View File

@ -516,7 +516,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
);
ctx.task_executor.spawn_critical(
"events task",
events::handle_events(Some(network.clone()), Some(head.number), events),
events::handle_events(Some(network.clone()), Some(head.number), events, db.clone()),
);
let engine_api = EngineApi::new(

View File

@ -5,10 +5,7 @@ use crate::{
use bytes::{Buf, BufMut};
use reth_codecs::{derive_arbitrary, main_codec, Compact};
use serde::{Deserialize, Serialize};
use std::{
fmt::{Display, Formatter},
ops::RangeInclusive,
};
use std::ops::RangeInclusive;
/// Saves the progress of Merkle stage.
#[derive(Default, Debug, Clone, PartialEq)]
@ -169,9 +166,16 @@ pub struct EntitiesCheckpoint {
pub total: u64,
}
impl Display for EntitiesCheckpoint {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:.2}%", 100.0 * self.processed as f64 / self.total as f64)
impl EntitiesCheckpoint {
/// Formats entities checkpoint as percentage, i.e. `processed / total`.
///
/// Return [None] if `total == 0`.
pub fn fmt_percentage(&self) -> Option<String> {
if self.total == 0 {
return None
}
Some(format!("{:.2}%", 100.0 * self.processed as f64 / self.total as f64))
}
}

View File

@ -1,5 +1,8 @@
use crate::stage::{ExecOutput, UnwindInput, UnwindOutput};
use reth_primitives::stage::{StageCheckpoint, StageId};
use reth_primitives::{
stage::{StageCheckpoint, StageId},
BlockNumber,
};
use std::fmt::{Display, Formatter};
/// An event emitted by a [Pipeline][crate::Pipeline].
@ -12,13 +15,15 @@ use std::fmt::{Display, Formatter};
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum PipelineEvent {
/// Emitted when a stage is about to be run.
Running {
Run {
/// Pipeline stages progress.
pipeline_stages_progress: PipelineStagesProgress,
/// The stage that is about to be run.
stage_id: StageId,
/// The previous checkpoint of the stage.
checkpoint: Option<StageCheckpoint>,
/// The block number up to which the stage is running, if known.
target: Option<BlockNumber>,
},
/// Emitted when a stage has run a single time.
Ran {
@ -30,7 +35,7 @@ pub enum PipelineEvent {
result: ExecOutput,
},
/// Emitted when a stage is about to be unwound.
Unwinding {
Unwind {
/// The stage that is about to be unwound.
stage_id: StageId,
/// The unwind parameters.

View File

@ -290,7 +290,7 @@ where
);
while checkpoint.block_number > to {
let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
self.listeners.notify(PipelineEvent::Unwinding { stage_id, input });
self.listeners.notify(PipelineEvent::Unwind { stage_id, input });
let output = stage.unwind(&provider_rw, input);
match output {
@ -378,13 +378,14 @@ where
};
}
self.listeners.notify(PipelineEvent::Running {
self.listeners.notify(PipelineEvent::Run {
pipeline_stages_progress: event::PipelineStagesProgress {
current: stage_index + 1,
total: total_stages,
},
stage_id,
checkpoint: prev_checkpoint,
target,
});
let provider_rw = factory.provider_rw()?;
@ -393,26 +394,6 @@ where
made_progress |=
checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number;
if let Some(progress) = checkpoint.entities() {
debug!(
target: "sync::pipeline",
stage = %stage_id,
checkpoint = checkpoint.block_number,
?target,
%progress,
%done,
"Stage committed progress"
);
} else {
debug!(
target: "sync::pipeline",
stage = %stage_id,
checkpoint = checkpoint.block_number,
?target,
%done,
"Stage committed progress"
);
}
if let Some(metrics_tx) = &mut self.metrics_tx {
let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
stage_id,
@ -608,20 +589,22 @@ mod tests {
assert_eq!(
events.collect::<Vec<PipelineEvent>>().await,
vec![
PipelineEvent::Running {
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: None
checkpoint: None,
target: Some(10),
},
PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
},
PipelineEvent::Running {
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None
checkpoint: None,
target: Some(10),
},
PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
@ -671,30 +654,33 @@ mod tests {
events.collect::<Vec<PipelineEvent>>().await,
vec![
// Executing
PipelineEvent::Running {
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
stage_id: StageId::Other("A"),
checkpoint: None
checkpoint: None,
target: Some(10),
},
PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
},
PipelineEvent::Running {
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
stage_id: StageId::Other("B"),
checkpoint: None
checkpoint: None,
target: Some(10),
},
PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
stage_id: StageId::Other("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Running {
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
stage_id: StageId::Other("C"),
checkpoint: None
checkpoint: None,
target: Some(10),
},
PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
@ -702,7 +688,7 @@ mod tests {
result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
},
// Unwinding
PipelineEvent::Unwinding {
PipelineEvent::Unwind {
stage_id: StageId::Other("C"),
input: UnwindInput {
checkpoint: StageCheckpoint::new(20),
@ -714,7 +700,7 @@ mod tests {
stage_id: StageId::Other("C"),
result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
},
PipelineEvent::Unwinding {
PipelineEvent::Unwind {
stage_id: StageId::Other("B"),
input: UnwindInput {
checkpoint: StageCheckpoint::new(10),
@ -726,7 +712,7 @@ mod tests {
stage_id: StageId::Other("B"),
result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
},
PipelineEvent::Unwinding {
PipelineEvent::Unwind {
stage_id: StageId::Other("A"),
input: UnwindInput {
checkpoint: StageCheckpoint::new(100),
@ -775,20 +761,22 @@ mod tests {
events.collect::<Vec<PipelineEvent>>().await,
vec![
// Executing
PipelineEvent::Running {
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: None
checkpoint: None,
target: Some(10),
},
PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
},
PipelineEvent::Running {
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None
checkpoint: None,
target: Some(10),
},
PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
@ -798,7 +786,7 @@ mod tests {
// Unwinding
// Nothing to unwind in stage "B"
PipelineEvent::Skipped { stage_id: StageId::Other("B") },
PipelineEvent::Unwinding {
PipelineEvent::Unwind {
stage_id: StageId::Other("A"),
input: UnwindInput {
checkpoint: StageCheckpoint::new(100),
@ -865,23 +853,25 @@ mod tests {
assert_eq!(
events.collect::<Vec<PipelineEvent>>().await,
vec![
PipelineEvent::Running {
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: None
checkpoint: None,
target: Some(10),
},
PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Running {
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None
checkpoint: None,
target: Some(10),
},
PipelineEvent::Error { stage_id: StageId::Other("B") },
PipelineEvent::Unwinding {
PipelineEvent::Unwind {
stage_id: StageId::Other("A"),
input: UnwindInput {
checkpoint: StageCheckpoint::new(10),
@ -893,20 +883,22 @@ mod tests {
stage_id: StageId::Other("A"),
result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
},
PipelineEvent::Running {
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: Some(StageCheckpoint::new(0))
checkpoint: Some(StageCheckpoint::new(0)),
target: Some(10),
},
PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Running {
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None
checkpoint: None,
target: Some(10),
},
PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },