feat: new payload skipper (#8050)

This commit is contained in:
Roman Krasiuk
2024-05-02 19:01:32 +02:00
committed by GitHub
parent 7845c9c897
commit 5378dd79e2
5 changed files with 96 additions and 3 deletions

View File

@ -63,6 +63,10 @@ pub struct DebugArgs {
#[arg(long = "debug.skip-fcu", help_heading = "Debug")]
pub skip_fcu: Option<usize>,
/// If provided, the engine will skip `n` consecutive new payloads.
#[arg(long = "debug.skip-new-payload", help_heading = "Debug")]
pub skip_new_payload: Option<usize>,
/// The path to store engine API messages at.
/// If specified, all of the intercepted engine API messages
/// will be written to specified location.

View File

@ -12,6 +12,9 @@ use engine_store::EngineStoreStream;
pub mod skip_fcu;
use skip_fcu::EngineSkipFcu;
pub mod skip_new_payload;
use skip_new_payload::EngineSkipNewPayload;
/// The collection of stream extensions for engine API message stream.
pub trait EngineMessageStreamExt<Engine: EngineTypes>:
Stream<Item = BeaconEngineMessage<Engine>>
@ -38,6 +41,31 @@ pub trait EngineMessageStreamExt<Engine: EngineTypes>:
}
}
/// Skips the specified number of [BeaconEngineMessage::NewPayload] messages from the
/// engine message stream.
fn skip_new_payload(self, count: usize) -> EngineSkipNewPayload<Self>
where
Self: Sized,
{
EngineSkipNewPayload::new(self, count)
}
/// If the count is [Some], returns the stream that skips the specified number of
/// [BeaconEngineMessage::NewPayload] messages. Otherwise, returns `Self`.
fn maybe_skip_new_payload(
self,
maybe_count: Option<usize>,
) -> Either<EngineSkipNewPayload<Self>, Self>
where
Self: Sized,
{
if let Some(count) = maybe_count {
Either::Left(self.skip_new_payload(count))
} else {
Either::Right(self)
}
}
/// Stores engine messages at the specified location.
fn store_messages(self, path: PathBuf) -> EngineStoreStream<Self>
where

View File

@ -1,4 +1,4 @@
//! Stores engine API messages to disk for later inspection and replay.
//! Stream wrapper that skips specified number of FCUs.
use futures::{Stream, StreamExt};
use reth_beacon_consensus::{BeaconEngineMessage, OnForkChoiceUpdated};

View File

@ -0,0 +1,60 @@
//! Stream wrapper that skips specified number of new payload messages.
use futures::{Stream, StreamExt};
use reth_beacon_consensus::BeaconEngineMessage;
use reth_engine_primitives::EngineTypes;
use reth_rpc_types::engine::{PayloadStatus, PayloadStatusEnum};
use std::{
pin::Pin,
task::{ready, Context, Poll},
};
/// Engine API stream wrapper that skips the specified number of new payload messages.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct EngineSkipNewPayload<S> {
#[pin]
stream: S,
/// The number of messages to skip.
threshold: usize,
/// Current count of skipped messages.
skipped: usize,
}
impl<S> EngineSkipNewPayload<S> {
/// Creates new [EngineSkipNewPayload] stream wrapper.
pub fn new(stream: S, threshold: usize) -> Self {
Self { stream, threshold, skipped: 0 }
}
}
impl<Engine, S> Stream for EngineSkipNewPayload<S>
where
Engine: EngineTypes,
S: Stream<Item = BeaconEngineMessage<Engine>>,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
let next = ready!(this.stream.poll_next_unpin(cx));
let item = match next {
Some(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx }) => {
if this.skipped < this.threshold {
*this.skipped += 1;
tracing::warn!(target: "engine::intercept", ?payload, ?cancun_fields, threshold=this.threshold, skipped=this.skipped, "Skipping new payload");
let _ = tx.send(Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing)));
continue
} else {
*this.skipped = 0;
Some(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx })
}
}
next => next,
};
return Poll::Ready(item)
}
}
}

View File

@ -266,8 +266,9 @@ where
let node_config = ctx.node_config();
let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
.maybe_skip_fcu(node_config.debug.skip_fcu)
// Store messages _after_ skipping messages so that `replay-engine` command
// would replay the exact same messages that were observed by the engine
.maybe_skip_new_payload(node_config.debug.skip_new_payload)
// Store messages _after_ skipping so that `replay-engine` command
// would replay only the messages that were observed by the engine
// during this run.
.maybe_store_messages(node_config.debug.engine_api_store.clone());