feat(primitives, stages): improve checkpoint logs readability (#5194)

This commit is contained in:
Alexey Shekhirin
2023-10-26 17:41:13 +01:00
committed by GitHub
parent c449bd53ea
commit fc4fc93680
5 changed files with 151 additions and 107 deletions

View File

@ -56,25 +56,36 @@ impl NodeState {
/// Processes an event emitted by the pipeline
fn handle_pipeline_event(&mut self, event: PipelineEvent) {
match event {
PipelineEvent::Running { pipeline_position, pipeline_total, stage_id, checkpoint } => {
PipelineEvent::Running { pipeline_stages_progress, stage_id, checkpoint } => {
let notable = self.current_stage.is_none();
self.current_stage = Some(stage_id);
self.current_checkpoint = checkpoint.unwrap_or_default();
if notable {
info!(
pipeline_stages = %format!("{pipeline_position}/{pipeline_total}"),
stage = %stage_id,
from = self.current_checkpoint.block_number,
checkpoint = %self.current_checkpoint,
eta = %self.eta.fmt_for_stage(stage_id),
"Executing stage",
);
if let Some(progress) = self.current_checkpoint.entities() {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
from = self.current_checkpoint.block_number,
checkpoint = %self.current_checkpoint.block_number,
%progress,
eta = %self.eta.fmt_for_stage(stage_id),
"Executing stage",
);
} else {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
from = self.current_checkpoint.block_number,
checkpoint = %self.current_checkpoint.block_number,
eta = %self.eta.fmt_for_stage(stage_id),
"Executing stage",
);
}
}
}
PipelineEvent::Ran {
pipeline_position,
pipeline_total,
pipeline_stages_progress,
stage_id,
result: ExecOutput { checkpoint, done },
} => {
@ -84,19 +95,27 @@ impl NodeState {
}
self.eta.update(self.current_checkpoint);
info!(
pipeline_stages = %format!("{pipeline_position}/{pipeline_total}"),
stage = %stage_id,
block = checkpoint.block_number,
%checkpoint,
eta = %self.eta.fmt_for_stage(stage_id),
"{}",
if done {
"Stage finished executing"
} else {
"Stage committed progress"
}
);
let message =
if done { "Stage finished executing" } else { "Stage committed progress" };
if let Some(progress) = checkpoint.entities() {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
%progress,
eta = %self.eta.fmt_for_stage(stage_id),
"{message}",
);
} else {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
eta = %self.eta.fmt_for_stage(stage_id),
"{message}",
);
}
if done {
self.current_stage = None;
@ -254,15 +273,27 @@ where
let mut this = self.project();
while this.info_interval.poll_tick(cx).is_ready() {
if let Some(stage_id) = this.state.current_stage {
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
stage = %stage_id.to_string(),
checkpoint = %this.state.current_checkpoint,
eta = %this.state.eta.fmt_for_stage(stage_id),
"Status"
);
if let Some(stage) = this.state.current_stage {
if let Some(progress) = this.state.current_checkpoint.entities() {
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
%stage,
checkpoint = %this.state.current_checkpoint.block_number,
%progress,
eta = %this.state.eta.fmt_for_stage(stage),
"Status"
);
} else {
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
%stage,
checkpoint = %this.state.current_checkpoint.block_number,
eta = %this.state.eta.fmt_for_stage(stage),
"Status"
);
}
} else {
info!(
target: "reth::cli",

View File

@ -246,15 +246,6 @@ impl StageCheckpoint {
}
}
impl Display for StageCheckpoint {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self.entities() {
Some(entities) => entities.fmt(f),
None => write!(f, "{}", self.block_number),
}
}
}
// TODO(alexey): add a merkle checkpoint. Currently it's hard because [`MerkleCheckpoint`]
// is not a Copy type.
/// Stage-specific checkpoint metrics.

View File

@ -1,5 +1,6 @@
use crate::stage::{ExecOutput, UnwindInput, UnwindOutput};
use reth_primitives::stage::{StageCheckpoint, StageId};
use std::fmt::{Display, Formatter};
/// An event emitted by a [Pipeline][crate::Pipeline].
///
@ -12,10 +13,8 @@ use reth_primitives::stage::{StageCheckpoint, StageId};
pub enum PipelineEvent {
/// Emitted when a stage is about to be run.
Running {
/// 1-indexed ID of the stage that is about to be run out of total stages in the pipeline.
pipeline_position: usize,
/// Total number of stages in the pipeline.
pipeline_total: usize,
/// Pipeline stages progress.
pipeline_stages_progress: PipelineStagesProgress,
/// The stage that is about to be run.
stage_id: StageId,
/// The previous checkpoint of the stage.
@ -23,10 +22,8 @@ pub enum PipelineEvent {
},
/// Emitted when a stage has run a single time.
Ran {
/// 1-indexed ID of the stage that was run out of total stages in the pipeline.
pipeline_position: usize,
/// Total number of stages in the pipeline.
pipeline_total: usize,
/// Pipeline stages progress.
pipeline_stages_progress: PipelineStagesProgress,
/// The stage that was run.
stage_id: StageId,
/// The result of executing the stage.
@ -61,3 +58,18 @@ pub enum PipelineEvent {
stage_id: StageId,
},
}
/// Pipeline stages progress.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PipelineStagesProgress {
/// 1-indexed ID of the stage that is about to be run out of total stages in the pipeline.
pub current: usize,
/// Total number of stages in the pipeline.
pub total: usize,
}
impl Display for PipelineStagesProgress {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.current, self.total)
}
}

View File

@ -272,12 +272,23 @@ where
let mut checkpoint = provider_rw.get_stage_checkpoint(stage_id)?.unwrap_or_default();
if checkpoint.block_number < to {
debug!(target: "sync::pipeline", from = %checkpoint, %to, "Unwind point too far for stage");
debug!(
target: "sync::pipeline",
from = %checkpoint.block_number,
%to,
"Unwind point too far for stage"
);
self.listeners.notify(PipelineEvent::Skipped { stage_id });
continue
}
debug!(target: "sync::pipeline", from = %checkpoint, %to, ?bad_block, "Starting unwind");
debug!(
target: "sync::pipeline",
from = %checkpoint.block_number,
%to,
?bad_block,
"Starting unwind"
);
while checkpoint.block_number > to {
let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
self.listeners.notify(PipelineEvent::Unwinding { stage_id, input });
@ -360,8 +371,10 @@ where
}
self.listeners.notify(PipelineEvent::Running {
pipeline_position: stage_index + 1,
pipeline_total: total_stages,
pipeline_stages_progress: event::PipelineStagesProgress {
current: stage_index + 1,
total: total_stages,
},
stage_id,
checkpoint: prev_checkpoint,
});
@ -373,14 +386,25 @@ where
Ok(out @ ExecOutput { checkpoint, done }) => {
made_progress |=
checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number;
debug!(
target: "sync::pipeline",
stage = %stage_id,
progress = checkpoint.block_number,
%checkpoint,
%done,
"Stage committed progress"
);
if let Some(progress) = checkpoint.entities() {
debug!(
target: "sync::pipeline",
stage = %stage_id,
checkpoint = checkpoint.block_number,
%progress,
%done,
"Stage committed progress"
);
} else {
debug!(
target: "sync::pipeline",
stage = %stage_id,
checkpoint = checkpoint.block_number,
%done,
"Stage committed progress"
);
}
if let Some(metrics_tx) = &mut self.metrics_tx {
let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
stage_id,
@ -391,8 +415,10 @@ where
provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
self.listeners.notify(PipelineEvent::Ran {
pipeline_position: stage_index + 1,
pipeline_total: total_stages,
pipeline_stages_progress: event::PipelineStagesProgress {
current: stage_index + 1,
total: total_stages,
},
stage_id,
result: out.clone(),
});
@ -579,26 +605,22 @@ mod tests {
events.collect::<Vec<PipelineEvent>>().await,
vec![
PipelineEvent::Running {
pipeline_position: 1,
pipeline_total: 2,
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: None
},
PipelineEvent::Ran {
pipeline_position: 1,
pipeline_total: 2,
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
},
PipelineEvent::Running {
pipeline_position: 2,
pipeline_total: 2,
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None
},
PipelineEvent::Ran {
pipeline_position: 2,
pipeline_total: 2,
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
@ -646,38 +668,32 @@ mod tests {
vec![
// Executing
PipelineEvent::Running {
pipeline_position: 1,
pipeline_total: 3,
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
stage_id: StageId::Other("A"),
checkpoint: None
},
PipelineEvent::Ran {
pipeline_position: 1,
pipeline_total: 3,
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
},
PipelineEvent::Running {
pipeline_position: 2,
pipeline_total: 3,
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
stage_id: StageId::Other("B"),
checkpoint: None
},
PipelineEvent::Ran {
pipeline_position: 2,
pipeline_total: 3,
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
stage_id: StageId::Other("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Running {
pipeline_position: 3,
pipeline_total: 3,
pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
stage_id: StageId::Other("C"),
checkpoint: None
},
PipelineEvent::Ran {
pipeline_position: 3,
pipeline_total: 3,
pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
stage_id: StageId::Other("C"),
result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
},
@ -756,26 +772,22 @@ mod tests {
vec![
// Executing
PipelineEvent::Running {
pipeline_position: 1,
pipeline_total: 2,
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: None
},
PipelineEvent::Ran {
pipeline_position: 1,
pipeline_total: 2,
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
},
PipelineEvent::Running {
pipeline_position: 2,
pipeline_total: 2,
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None
},
PipelineEvent::Ran {
pipeline_position: 2,
pipeline_total: 2,
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
@ -846,20 +858,17 @@ mod tests {
events.collect::<Vec<PipelineEvent>>().await,
vec![
PipelineEvent::Running {
pipeline_position: 1,
pipeline_total: 2,
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: None
},
PipelineEvent::Ran {
pipeline_position: 1,
pipeline_total: 2,
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Running {
pipeline_position: 2,
pipeline_total: 2,
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None
},
@ -877,26 +886,22 @@ mod tests {
result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
},
PipelineEvent::Running {
pipeline_position: 1,
pipeline_total: 2,
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: Some(StageCheckpoint::new(0))
},
PipelineEvent::Ran {
pipeline_position: 1,
pipeline_total: 2,
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Running {
pipeline_position: 2,
pipeline_total: 2,
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None
},
PipelineEvent::Ran {
pipeline_position: 2,
pipeline_total: 2,
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},

View File

@ -206,7 +206,12 @@ where
// Nothing to sync
if gap.is_closed() {
info!(target: "sync::stages::headers", checkpoint = %current_checkpoint, target = ?tip, "Target block already reached");
info!(
target: "sync::stages::headers",
checkpoint = %current_checkpoint.block_number,
target = ?tip,
"Target block already reached"
);
return Ok(ExecOutput::done(current_checkpoint))
}