diff --git a/Cargo.lock b/Cargo.lock index b9829808f..ddc7b63fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8016,6 +8016,7 @@ version = "1.0.4" dependencies = [ "futures-util", "metrics", + "pin-project", "reth-errors", "reth-ethereum-engine-primitives", "reth-metrics", diff --git a/crates/payload/builder/Cargo.toml b/crates/payload/builder/Cargo.toml index b2d0ca98b..d551b4838 100644 --- a/crates/payload/builder/Cargo.toml +++ b/crates/payload/builder/Cargo.toml @@ -25,6 +25,7 @@ reth-ethereum-engine-primitives.workspace = true tokio = { workspace = true, features = ["sync"] } tokio-stream.workspace = true futures-util.workspace = true +pin-project.workspace = true # metrics reth-metrics.workspace = true diff --git a/crates/payload/builder/src/events.rs b/crates/payload/builder/src/events.rs index 6235ddf7f..57e9365e0 100644 --- a/crates/payload/builder/src/events.rs +++ b/crates/payload/builder/src/events.rs @@ -1,9 +1,14 @@ use reth_payload_primitives::PayloadTypes; +use std::{ + pin::Pin, + task::{ready, Context, Poll}, +}; use tokio::sync::broadcast; use tokio_stream::{ wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}, - StreamExt, + Stream, StreamExt, }; +use tracing::debug; /// Payload builder events. #[derive(Clone, Debug)] @@ -34,4 +39,74 @@ impl PayloadEvents { let mut event_stream = self.into_stream(); event_stream.next().await } + + /// Returns a new stream that yields all built payloads. + pub fn into_built_payload_stream(self) -> BuiltPayloadStream { + BuiltPayloadStream { st: self.into_stream() } + } + + /// Returns a new stream that yields received payload attributes + pub fn into_attributes_stream(self) -> PayloadAttributeStream { + PayloadAttributeStream { st: self.into_stream() } + } +} + +/// A stream that yields built payloads. +#[derive(Debug)] +#[pin_project::pin_project] +pub struct BuiltPayloadStream { + /// The stream of events. + #[pin] + st: BroadcastStream>, +} + +impl Stream for BuiltPayloadStream { + type Item = T::BuiltPayload; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + return match ready!(self.as_mut().project().st.poll_next(cx)) { + Some(Ok(Events::BuiltPayload(payload))) => Poll::Ready(Some(payload)), + Some(Ok(Events::Attributes(_))) => { + // ignoring attributes + continue + } + Some(Err(err)) => { + debug!(%err, "payload event stream stream lagging behind"); + continue + } + None => Poll::Ready(None), + } + } + } +} + +/// A stream that yields received payload attributes +#[derive(Debug)] +#[pin_project::pin_project] +pub struct PayloadAttributeStream { + /// The stream of events. + #[pin] + st: BroadcastStream>, +} + +impl Stream for PayloadAttributeStream { + type Item = T::PayloadBuilderAttributes; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + return match ready!(self.as_mut().project().st.poll_next(cx)) { + Some(Ok(Events::Attributes(attr))) => Poll::Ready(Some(attr)), + Some(Ok(Events::BuiltPayload(_))) => { + // ignoring payloads + continue + } + Some(Err(err)) => { + debug!(%err, "payload event stream stream lagging behind"); + continue + } + None => Poll::Ready(None), + } + } + } }