feat(payload): abstract payload builder in trait (#10965)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
greged93
2024-09-20 09:12:14 +02:00
committed by GitHub
parent 08bdec9efa
commit 7cf492d525
28 changed files with 169 additions and 174 deletions

View File

@ -15,8 +15,6 @@ workspace = true
# reth
reth-primitives.workspace = true
reth-rpc-types.workspace = true
reth-transaction-pool.workspace = true
reth-errors.workspace = true
reth-provider.workspace = true
reth-payload-primitives.workspace = true
reth-ethereum-engine-primitives.workspace = true
@ -26,17 +24,16 @@ reth-chain-state = { workspace = true, optional = true }
alloy-primitives.workspace = true
# async
async-trait.workspace = true
tokio = { workspace = true, features = ["sync"] }
tokio-stream.workspace = true
futures-util.workspace = true
pin-project.workspace = true
# metrics
reth-metrics.workspace = true
metrics.workspace = true
# misc
thiserror.workspace = true
tracing.workspace = true
[dev-dependencies]

View File

@ -1,58 +0,0 @@
//! Error types emitted by types or implementations of this crate.
use alloy_primitives::B256;
use reth_errors::{ProviderError, RethError};
use reth_primitives::revm_primitives::EVMError;
use reth_transaction_pool::BlobStoreError;
use tokio::sync::oneshot;
/// Possible error variants during payload building.
#[derive(Debug, thiserror::Error)]
pub enum PayloadBuilderError {
/// Thrown when the parent block is missing.
#[error("missing parent block {0}")]
MissingParentBlock(B256),
/// An oneshot channels has been closed.
#[error("sender has been dropped")]
ChannelClosed,
/// If there's no payload to resolve.
#[error("missing payload")]
MissingPayload,
/// Error occurring in the blob store.
#[error(transparent)]
BlobStore(#[from] BlobStoreError),
/// Other internal error
#[error(transparent)]
Internal(#[from] RethError),
/// Unrecoverable error during evm execution.
#[error("evm execution error: {0}")]
EvmExecutionError(EVMError<ProviderError>),
/// Thrown if the payload requests withdrawals before Shanghai activation.
#[error("withdrawals set before Shanghai activation")]
WithdrawalsBeforeShanghai,
/// Any other payload building errors.
#[error(transparent)]
Other(Box<dyn std::error::Error + Send + Sync>),
}
impl PayloadBuilderError {
/// Create a new error from a boxed error.
pub fn other<E>(error: E) -> Self
where
E: std::error::Error + Send + Sync + 'static,
{
Self::Other(Box::new(error))
}
}
impl From<ProviderError> for PayloadBuilderError {
fn from(error: ProviderError) -> Self {
Self::Internal(RethError::Provider(error))
}
}
impl From<oneshot::error::RecvError> for PayloadBuilderError {
fn from(_: oneshot::error::RecvError) -> Self {
Self::ChannelClosed
}
}

View File

@ -1,112 +0,0 @@
use reth_payload_primitives::PayloadTypes;
use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::broadcast;
use tokio_stream::{
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
Stream, StreamExt,
};
use tracing::debug;
/// Payload builder events.
#[derive(Clone, Debug)]
pub enum Events<T: PayloadTypes> {
/// The payload attributes as
/// they are received from the CL through the engine api.
Attributes(T::PayloadBuilderAttributes),
/// The built payload that has been just built.
/// Triggered by the CL whenever it asks for an execution payload.
/// This event is only thrown if the CL is a validator.
BuiltPayload(T::BuiltPayload),
}
/// Represents a receiver for various payload events.
#[derive(Debug)]
pub struct PayloadEvents<T: PayloadTypes> {
/// The receiver for the payload events.
pub receiver: broadcast::Receiver<Events<T>>,
}
impl<T: PayloadTypes + 'static> PayloadEvents<T> {
/// Convert this receiver into a stream of `PayloadEvents`.
pub fn into_stream(self) -> BroadcastStream<Events<T>> {
BroadcastStream::new(self.receiver)
}
/// Asynchronously receives the next payload event.
pub async fn recv(self) -> Option<Result<Events<T>, BroadcastStreamRecvError>> {
let mut event_stream = self.into_stream();
event_stream.next().await
}
/// Returns a new stream that yields all built payloads.
pub fn into_built_payload_stream(self) -> BuiltPayloadStream<T> {
BuiltPayloadStream { st: self.into_stream() }
}
/// Returns a new stream that yields received payload attributes
pub fn into_attributes_stream(self) -> PayloadAttributeStream<T> {
PayloadAttributeStream { st: self.into_stream() }
}
}
/// A stream that yields built payloads.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct BuiltPayloadStream<T: PayloadTypes> {
/// The stream of events.
#[pin]
st: BroadcastStream<Events<T>>,
}
impl<T: PayloadTypes + 'static> Stream for BuiltPayloadStream<T> {
type Item = T::BuiltPayload;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
return match ready!(self.as_mut().project().st.poll_next(cx)) {
Some(Ok(Events::BuiltPayload(payload))) => Poll::Ready(Some(payload)),
Some(Ok(Events::Attributes(_))) => {
// ignoring attributes
continue
}
Some(Err(err)) => {
debug!(%err, "payload event stream stream lagging behind");
continue
}
None => Poll::Ready(None),
}
}
}
}
/// A stream that yields received payload attributes
#[derive(Debug)]
#[pin_project::pin_project]
pub struct PayloadAttributeStream<T: PayloadTypes> {
/// The stream of events.
#[pin]
st: BroadcastStream<Events<T>>,
}
impl<T: PayloadTypes + 'static> Stream for PayloadAttributeStream<T> {
type Item = T::PayloadBuilderAttributes;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
return match ready!(self.as_mut().project().st.poll_next(cx)) {
Some(Ok(Events::Attributes(attr))) => Poll::Ready(Some(attr)),
Some(Ok(Events::BuiltPayload(_))) => {
// ignoring payloads
continue
}
Some(Err(err)) => {
debug!(%err, "payload event stream stream lagging behind");
continue
}
None => Poll::Ready(None),
}
}
}
}

View File

@ -27,8 +27,7 @@
//! use std::future::Future;
//! use std::pin::Pin;
//! use std::task::{Context, Poll};
//! use reth_payload_builder::{EthBuiltPayload, KeepPayloadJobAlive, EthPayloadBuilderAttributes, PayloadJob, PayloadJobGenerator};
//! use reth_payload_builder::error::PayloadBuilderError;
//! use reth_payload_builder::{EthBuiltPayload, PayloadBuilderError, KeepPayloadJobAlive, EthPayloadBuilderAttributes, PayloadJob, PayloadJobGenerator};
//! use reth_primitives::{Block, Header, U256};
//!
//! /// The generator type that creates new jobs that builds empty blocks.
@ -102,8 +101,6 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
pub mod database;
pub mod error;
mod events;
mod metrics;
mod service;
mod traits;
@ -113,7 +110,7 @@ pub mod noop;
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;
pub use events::{Events, PayloadEvents};
pub use reth_payload_primitives::PayloadBuilderError;
pub use reth_rpc_types::engine::PayloadId;
pub use service::{
PayloadBuilderHandle, PayloadBuilderService, PayloadServiceCommand, PayloadStore,

View File

@ -20,7 +20,7 @@ pub struct NoopPayloadBuilderService<T: PayloadTypes> {
impl<T> NoopPayloadBuilderService<T>
where
T: PayloadTypes + 'static,
T: PayloadTypes,
{
/// Creates a new [`NoopPayloadBuilderService`].
pub fn new() -> (Self, PayloadBuilderHandle<T>) {

View File

@ -4,14 +4,14 @@
//! Once a new payload is created, it is continuously updated.
use crate::{
error::PayloadBuilderError,
events::{Events, PayloadEvents},
metrics::PayloadBuilderServiceMetrics,
traits::PayloadJobGenerator,
KeepPayloadJobAlive, PayloadJob,
metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, KeepPayloadJobAlive,
PayloadJob,
};
use futures_util::{future::FutureExt, Stream, StreamExt};
use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadTypes};
use reth_payload_primitives::{
BuiltPayload, Events, PayloadBuilder, PayloadBuilderAttributes, PayloadBuilderError,
PayloadEvents, PayloadTypes,
};
use reth_provider::CanonStateNotification;
use reth_rpc_types::engine::PayloadId;
use std::{
@ -22,7 +22,7 @@ use std::{
};
use tokio::sync::{
broadcast, mpsc,
oneshot::{self, error::RecvError},
oneshot::{self, Receiver},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, info, trace, warn};
@ -39,7 +39,7 @@ pub struct PayloadStore<T: PayloadTypes> {
impl<T> PayloadStore<T>
where
T: PayloadTypes + 'static,
T: PayloadTypes,
{
/// Resolves the payload job and returns the best payload that has been built so far.
///
@ -102,9 +102,63 @@ pub struct PayloadBuilderHandle<T: PayloadTypes> {
// === impl PayloadBuilderHandle ===
#[async_trait::async_trait]
impl<T> PayloadBuilder for PayloadBuilderHandle<T>
where
T: PayloadTypes,
{
type PayloadType = T;
type Error = PayloadBuilderError;
async fn send_and_resolve_payload(
&self,
attr: <Self::PayloadType as PayloadTypes>::PayloadBuilderAttributes,
) -> Result<PayloadFuture<<Self::PayloadType as PayloadTypes>::BuiltPayload>, Self::Error> {
let rx = self.send_new_payload(attr);
let id = rx.await??;
let (tx, rx) = oneshot::channel();
let _ = self.to_service.send(PayloadServiceCommand::Resolve(id, tx));
rx.await?.ok_or(PayloadBuilderError::MissingPayload)
}
/// Note: this does not resolve the job if it's still in progress.
async fn best_payload(
&self,
id: PayloadId,
) -> Option<Result<<Self::PayloadType as PayloadTypes>::BuiltPayload, Self::Error>> {
let (tx, rx) = oneshot::channel();
self.to_service.send(PayloadServiceCommand::BestPayload(id, tx)).ok()?;
rx.await.ok()?
}
fn send_new_payload(
&self,
attr: <Self::PayloadType as PayloadTypes>::PayloadBuilderAttributes,
) -> Receiver<Result<PayloadId, Self::Error>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
rx
}
/// Note: if there's already payload in progress with same identifier, it will be returned.
async fn new_payload(
&self,
attr: <Self::PayloadType as PayloadTypes>::PayloadBuilderAttributes,
) -> Result<PayloadId, Self::Error> {
self.send_new_payload(attr).await?
}
async fn subscribe(&self) -> Result<PayloadEvents<Self::PayloadType>, Self::Error> {
let (tx, rx) = oneshot::channel();
let _ = self.to_service.send(PayloadServiceCommand::Subscribe(tx));
Ok(PayloadEvents { receiver: rx.await? })
}
}
impl<T> PayloadBuilderHandle<T>
where
T: PayloadTypes + 'static,
T: PayloadTypes,
{
/// Creates a new payload builder handle for the given channel.
///
@ -127,32 +181,6 @@ where
}
}
/// Sends a message to the service to start building a new payload for the given payload
/// attributes and returns a future that resolves to the payload.
pub async fn send_and_resolve_payload(
&self,
attr: T::PayloadBuilderAttributes,
) -> Result<PayloadFuture<T::BuiltPayload>, PayloadBuilderError> {
let rx = self.send_new_payload(attr);
let id = rx.await??;
let (tx, rx) = oneshot::channel();
let _ = self.to_service.send(PayloadServiceCommand::Resolve(id, tx));
rx.await?.ok_or(PayloadBuilderError::MissingPayload)
}
/// Returns the best payload for the given identifier.
///
/// Note: this does not resolve the job if it's still in progress.
pub async fn best_payload(
&self,
id: PayloadId,
) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
let (tx, rx) = oneshot::channel();
self.to_service.send(PayloadServiceCommand::BestPayload(id, tx)).ok()?;
rx.await.ok()?
}
/// Returns the payload attributes associated with the given identifier.
///
/// Note: this returns the attributes of the payload and does not resolve the job.
@ -164,39 +192,6 @@ where
self.to_service.send(PayloadServiceCommand::PayloadAttributes(id, tx)).ok()?;
rx.await.ok()?
}
/// Sends a message to the service to start building a new payload for the given payload.
///
/// This is the same as [`PayloadBuilderHandle::new_payload`] but does not wait for the result
/// and returns the receiver instead
pub fn send_new_payload(
&self,
attr: T::PayloadBuilderAttributes,
) -> oneshot::Receiver<Result<PayloadId, PayloadBuilderError>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
rx
}
/// Starts building a new payload for the given payload attributes.
///
/// Returns the identifier of the payload.
///
/// Note: if there's already payload in progress with same identifier, it will be returned.
pub async fn new_payload(
&self,
attr: T::PayloadBuilderAttributes,
) -> Result<PayloadId, PayloadBuilderError> {
self.send_new_payload(attr).await?
}
/// Sends a message to the service to subscribe to payload events.
/// Returns a receiver that will receive them.
pub async fn subscribe(&self) -> Result<PayloadEvents<T>, RecvError> {
let (tx, rx) = oneshot::channel();
let _ = self.to_service.send(PayloadServiceCommand::Subscribe(tx));
Ok(PayloadEvents { receiver: rx.await? })
}
}
impl<T> Clone for PayloadBuilderHandle<T>
@ -246,7 +241,7 @@ const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
where
T: PayloadTypes + 'static,
T: PayloadTypes,
Gen: PayloadJobGenerator,
Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
<Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
@ -360,7 +355,7 @@ where
impl<Gen, St, T> Future for PayloadBuilderService<Gen, St, T>
where
T: PayloadTypes + 'static,
T: PayloadTypes,
Gen: PayloadJobGenerator + Unpin + 'static,
<Gen as PayloadJobGenerator>::Job: Unpin + 'static,
St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,

View File

@ -1,13 +1,13 @@
//! Utils for testing purposes.
use crate::{
error::PayloadBuilderError, traits::KeepPayloadJobAlive, EthBuiltPayload,
EthPayloadBuilderAttributes, PayloadBuilderHandle, PayloadBuilderService, PayloadJob,
PayloadJobGenerator,
traits::KeepPayloadJobAlive, EthBuiltPayload, EthPayloadBuilderAttributes,
PayloadBuilderHandle, PayloadBuilderService, PayloadJob, PayloadJobGenerator,
};
use alloy_primitives::U256;
use reth_chain_state::ExecutedBlock;
use reth_payload_primitives::PayloadTypes;
use reth_payload_primitives::{PayloadBuilderError, PayloadTypes};
use reth_primitives::Block;
use reth_provider::CanonStateNotification;
use std::{

View File

@ -1,7 +1,6 @@
//! Trait abstractions used by the payload crate.
use crate::error::PayloadBuilderError;
use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes};
use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadBuilderError};
use reth_provider::CanonStateNotification;
use std::future::Future;