feat: allow awaiting payload in progress (#11823)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Arsenii Kulikov
2024-10-18 14:45:51 +04:00
committed by GitHub
parent cfd066c071
commit 8d32fd788b
11 changed files with 127 additions and 81 deletions

View File

@ -28,7 +28,7 @@ impl<E: EngineTypes> PayloadTestContext<E> {
) -> eyre::Result<E::PayloadBuilderAttributes> {
self.timestamp += 1;
let attributes: E::PayloadBuilderAttributes = attributes_generator(self.timestamp);
self.payload_builder.new_payload(attributes.clone()).await.unwrap();
self.payload_builder.send_new_payload(attributes.clone()).await.unwrap()?;
Ok(attributes)
}

View File

@ -9,7 +9,7 @@ use reth_chainspec::EthereumHardforks;
use reth_engine_primitives::EngineTypes;
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{
BuiltPayload, PayloadAttributesBuilder, PayloadBuilder, PayloadTypes,
BuiltPayload, PayloadAttributesBuilder, PayloadBuilder, PayloadKind, PayloadTypes,
};
use reth_provider::{BlockReader, ChainSpecProvider};
use reth_rpc_types_compat::engine::payload::block_to_payload;
@ -202,10 +202,9 @@ where
let payload_id = res.payload_id.ok_or_eyre("No payload id")?;
// wait for some time to let the payload be built
tokio::time::sleep(Duration::from_millis(200)).await;
let Some(Ok(payload)) = self.payload_builder.best_payload(payload_id).await else {
let Some(Ok(payload)) =
self.payload_builder.resolve_kind(payload_id, PayloadKind::WaitForPending).await
else {
eyre::bail!("No payload")
};

View File

@ -17,7 +17,9 @@ use reth_chainspec::{ChainSpec, EthereumHardforks};
use reth_payload_builder::{
database::CachedReads, KeepPayloadJobAlive, PayloadId, PayloadJob, PayloadJobGenerator,
};
use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadBuilderError};
use reth_payload_primitives::{
BuiltPayload, PayloadBuilderAttributes, PayloadBuilderError, PayloadKind,
};
use reth_primitives::{
constants::{RETH_CLIENT_VERSION, SLOT_DURATION},
proofs, BlockNumberOrTag, SealedBlock, Withdrawals,
@ -474,7 +476,10 @@ where
Ok(self.config.attributes.clone())
}
fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
fn resolve_kind(
&mut self,
kind: PayloadKind,
) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
let best_payload = self.best_payload.take();
if best_payload.is_none() && self.pending_block.is_none() {
@ -530,7 +535,11 @@ where
};
}
let fut = ResolveBestPayload { best_payload, maybe_better, empty_payload };
let fut = ResolveBestPayload {
best_payload,
maybe_better,
empty_payload: empty_payload.filter(|_| kind != PayloadKind::WaitForPending),
};
(fut, KeepPayloadJobAlive::No)
}

View File

@ -28,7 +28,7 @@
//! use std::pin::Pin;
//! use std::task::{Context, Poll};
//! use alloy_primitives::U256;
//! use reth_payload_builder::{EthBuiltPayload, PayloadBuilderError, KeepPayloadJobAlive, EthPayloadBuilderAttributes, PayloadJob, PayloadJobGenerator};
//! use reth_payload_builder::{EthBuiltPayload, PayloadBuilderError, KeepPayloadJobAlive, EthPayloadBuilderAttributes, PayloadJob, PayloadJobGenerator, PayloadKind};
//! use reth_primitives::{Block, Header};
//!
//! /// The generator type that creates new jobs that builds empty blocks.
@ -73,7 +73,7 @@
//! Ok(self.attributes.clone())
//! }
//!
//! fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
//! fn resolve_kind(&mut self, _kind: PayloadKind) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
//! let payload = self.best_payload();
//! (futures_util::future::ready(payload), KeepPayloadJobAlive::No)
//! }
@ -112,7 +112,7 @@ pub mod noop;
pub mod test_utils;
pub use alloy_rpc_types::engine::PayloadId;
pub use reth_payload_primitives::PayloadBuilderError;
pub use reth_payload_primitives::{PayloadBuilderError, PayloadKind};
pub use service::{
PayloadBuilderHandle, PayloadBuilderService, PayloadServiceCommand, PayloadStore,
};

View File

@ -51,7 +51,7 @@ where
}
PayloadServiceCommand::BestPayload(_, tx) => tx.send(None).ok(),
PayloadServiceCommand::PayloadAttributes(_, tx) => tx.send(None).ok(),
PayloadServiceCommand::Resolve(_, tx) => tx.send(None).ok(),
PayloadServiceCommand::Resolve(_, _, tx) => tx.send(None).ok(),
PayloadServiceCommand::Subscribe(_) => None,
};
}

View File

@ -11,7 +11,7 @@ use alloy_rpc_types::engine::PayloadId;
use futures_util::{future::FutureExt, Stream, StreamExt};
use reth_payload_primitives::{
BuiltPayload, Events, PayloadBuilder, PayloadBuilderAttributes, PayloadBuilderError,
PayloadEvents, PayloadTypes,
PayloadEvents, PayloadKind, PayloadTypes,
};
use reth_provider::CanonStateNotification;
use std::{
@ -45,11 +45,20 @@ where
///
/// Note: depending on the installed [`PayloadJobGenerator`], this may or may not terminate the
/// job, See [`PayloadJob::resolve`].
pub async fn resolve_kind(
&self,
id: PayloadId,
kind: PayloadKind,
) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
self.inner.resolve_kind(id, kind).await
}
/// Resolves the payload job and returns the best payload that has been built so far.
pub async fn resolve(
&self,
id: PayloadId,
) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
self.inner.resolve(id).await
self.resolve_kind(id, PayloadKind::Earliest).await
}
/// Returns the best payload for the given identifier.
@ -110,16 +119,13 @@ where
type PayloadType = T;
type Error = PayloadBuilderError;
async fn send_and_resolve_payload(
fn send_new_payload(
&self,
attr: <Self::PayloadType as PayloadTypes>::PayloadBuilderAttributes,
) -> Result<PayloadFuture<<Self::PayloadType as PayloadTypes>::BuiltPayload>, Self::Error> {
let rx = self.send_new_payload(attr);
let id = rx.await??;
) -> Receiver<Result<PayloadId, Self::Error>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_service.send(PayloadServiceCommand::Resolve(id, tx));
rx.await?.ok_or(PayloadBuilderError::MissingPayload)
let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
rx
}
/// Note: this does not resolve the job if it's still in progress.
@ -132,21 +138,17 @@ where
rx.await.ok()?
}
fn send_new_payload(
async fn resolve_kind(
&self,
attr: <Self::PayloadType as PayloadTypes>::PayloadBuilderAttributes,
) -> Receiver<Result<PayloadId, Self::Error>> {
id: PayloadId,
kind: PayloadKind,
) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
rx
}
/// Note: if there's already payload in progress with same identifier, it will be returned.
async fn new_payload(
&self,
attr: <Self::PayloadType as PayloadTypes>::PayloadBuilderAttributes,
) -> Result<PayloadId, Self::Error> {
self.send_new_payload(attr).await?
self.to_service.send(PayloadServiceCommand::Resolve(id, kind, tx)).ok()?;
match rx.await.transpose()? {
Ok(fut) => Some(fut.await),
Err(e) => Some(Err(e.into())),
}
}
async fn subscribe(&self) -> Result<PayloadEvents<Self::PayloadType>, Self::Error> {
@ -168,19 +170,6 @@ where
Self { to_service }
}
/// Resolves the payload job and returns the best payload that has been built so far.
///
/// Note: depending on the installed [`PayloadJobGenerator`], this may or may not terminate the
/// job, See [`PayloadJob::resolve`].
async fn resolve(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
let (tx, rx) = oneshot::channel();
self.to_service.send(PayloadServiceCommand::Resolve(id, tx)).ok()?;
match rx.await.transpose()? {
Ok(fut) => Some(fut.await),
Err(e) => Some(Err(e.into())),
}
}
/// Returns the payload attributes associated with the given identifier.
///
/// Note: this returns the attributes of the payload and does not resolve the job.
@ -296,11 +285,15 @@ where
/// Returns the best payload for the given identifier that has been built so far and terminates
/// the job if requested.
fn resolve(&mut self, id: PayloadId) -> Option<PayloadFuture<T::BuiltPayload>> {
fn resolve(
&mut self,
id: PayloadId,
kind: PayloadKind,
) -> Option<PayloadFuture<T::BuiltPayload>> {
trace!(%id, "resolving payload job");
let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?;
let (fut, keep_alive) = self.payload_jobs[job].0.resolve();
let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);
if keep_alive == KeepPayloadJobAlive::No {
let (_, id) = self.payload_jobs.swap_remove(job);
@ -437,8 +430,8 @@ where
let attributes = this.payload_attributes(id);
let _ = tx.send(attributes);
}
PayloadServiceCommand::Resolve(id, tx) => {
let _ = tx.send(this.resolve(id));
PayloadServiceCommand::Resolve(id, strategy, tx) => {
let _ = tx.send(this.resolve(id, strategy));
}
PayloadServiceCommand::Subscribe(tx) => {
let new_rx = this.payload_events.subscribe();
@ -469,7 +462,11 @@ pub enum PayloadServiceCommand<T: PayloadTypes> {
oneshot::Sender<Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>>>,
),
/// Resolve the payload and return the payload
Resolve(PayloadId, oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>),
Resolve(
PayloadId,
/* kind: */ PayloadKind,
oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>,
),
/// Payload service events
Subscribe(oneshot::Sender<broadcast::Receiver<Events<T>>>),
}
@ -489,7 +486,7 @@ where
Self::PayloadAttributes(f0, f1) => {
f.debug_tuple("PayloadAttributes").field(&f0).field(&f1).finish()
}
Self::Resolve(f0, _f1) => f.debug_tuple("Resolve").field(&f0).finish(),
Self::Resolve(f0, f1, _f2) => f.debug_tuple("Resolve").field(&f0).field(&f1).finish(),
Self::Subscribe(f0) => f.debug_tuple("Subscribe").field(&f0).finish(),
}
}

View File

@ -7,7 +7,7 @@ use crate::{
use alloy_primitives::U256;
use reth_chain_state::ExecutedBlock;
use reth_payload_primitives::{PayloadBuilderError, PayloadTypes};
use reth_payload_primitives::{PayloadBuilderError, PayloadKind, PayloadTypes};
use reth_primitives::Block;
use reth_provider::CanonStateNotification;
use std::{
@ -96,7 +96,10 @@ impl PayloadJob for TestPayloadJob {
Ok(self.attr.clone())
}
fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
fn resolve_kind(
&mut self,
_kind: PayloadKind,
) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
let fut = futures_util::future::ready(self.best_payload());
(fut, KeepPayloadJobAlive::No)
}

View File

@ -1,6 +1,8 @@
//! Trait abstractions used by the payload crate.
use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadBuilderError};
use reth_payload_primitives::{
BuiltPayload, PayloadBuilderAttributes, PayloadBuilderError, PayloadKind,
};
use reth_provider::CanonStateNotification;
use std::future::Future;
@ -53,7 +55,21 @@ pub trait PayloadJob: Future<Output = Result<(), PayloadBuilderError>> + Send +
/// If this returns [`KeepPayloadJobAlive::Yes`], then the [`PayloadJob`] will be polled
/// once more. If this returns [`KeepPayloadJobAlive::No`] then the [`PayloadJob`] will be
/// dropped after this call.
fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive);
///
/// The [`PayloadKind`] determines how the payload should be resolved in the
/// `ResolvePayloadFuture`. [`PayloadKind::Earliest`] should return the earliest available
/// payload (as fast as possible), e.g. racing an empty payload job against a pending job if
/// there's no payload available yet. [`PayloadKind::WaitForPending`] is allowed to wait
/// until a built payload is available.
fn resolve_kind(
&mut self,
kind: PayloadKind,
) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive);
/// Resolves the payload as fast as possible.
fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
self.resolve_kind(PayloadKind::Earliest)
}
}
/// Whether the payload job should be kept alive or terminated after the payload was requested by

View File

@ -342,6 +342,28 @@ pub enum EngineApiMessageVersion {
V4,
}
/// Determines how we should choose the payload to return.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PayloadKind {
/// Returns the next best available payload (the earliest available payload).
/// This does not wait for a real for pending job to finish if there's no best payload yet and
/// is allowed to race various payload jobs (empty, pending best) against each other and
/// returns whichever job finishes faster.
///
/// This should be used when it's more important to return a valid payload as fast as possible.
/// For example, the engine API timeout for `engine_getPayload` is 1s and clients should rather
/// return an empty payload than indefinitely waiting for the pending payload job to finish and
/// risk missing the deadline.
#[default]
Earliest,
/// Only returns once we have at least one built payload.
///
/// Compared to [`PayloadKind::Earliest`] this does not race an empty payload job against the
/// already in progress one, and returns the best available built payload or awaits the job in
/// progress.
WaitForPending,
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -1,4 +1,4 @@
use crate::{PayloadBuilderError, PayloadEvents, PayloadTypes};
use crate::{PayloadEvents, PayloadKind, PayloadTypes};
use alloy_primitives::{Address, B256, U256};
use alloy_rpc_types::{
engine::{PayloadAttributes as EthPayloadAttributes, PayloadId},
@ -7,12 +7,8 @@ use alloy_rpc_types::{
use op_alloy_rpc_types_engine::OpPayloadAttributes;
use reth_chain_state::ExecutedBlock;
use reth_primitives::{SealedBlock, Withdrawals};
use std::{future::Future, pin::Pin};
use tokio::sync::oneshot;
pub(crate) type PayloadFuture<P> =
Pin<Box<dyn Future<Output = Result<P, PayloadBuilderError>> + Send + Sync>>;
/// A type that can request, subscribe to and resolve payloads.
#[async_trait::async_trait]
pub trait PayloadBuilder: Send + Unpin {
@ -21,12 +17,13 @@ pub trait PayloadBuilder: Send + Unpin {
/// The error type returned by the builder.
type Error;
/// Sends a message to the service to start building a new payload for the given payload
/// attributes and returns a future that resolves to the payload.
async fn send_and_resolve_payload(
/// Sends a message to the service to start building a new payload for the given payload.
///
/// Returns a receiver that will receive the payload id.
fn send_new_payload(
&self,
attr: <Self::PayloadType as PayloadTypes>::PayloadBuilderAttributes,
) -> Result<PayloadFuture<<Self::PayloadType as PayloadTypes>::BuiltPayload>, Self::Error>;
) -> oneshot::Receiver<Result<PayloadId, Self::Error>>;
/// Returns the best payload for the given identifier.
async fn best_payload(
@ -34,22 +31,21 @@ pub trait PayloadBuilder: Send + Unpin {
id: PayloadId,
) -> Option<Result<<Self::PayloadType as PayloadTypes>::BuiltPayload, Self::Error>>;
/// Sends a message to the service to start building a new payload for the given payload.
///
/// This is the same as [`PayloadBuilder::new_payload`] but does not wait for the result
/// and returns the receiver instead
fn send_new_payload(
/// Resolves the payload job and returns the best payload that has been built so far.
async fn resolve_kind(
&self,
attr: <Self::PayloadType as PayloadTypes>::PayloadBuilderAttributes,
) -> oneshot::Receiver<Result<PayloadId, Self::Error>>;
id: PayloadId,
kind: PayloadKind,
) -> Option<Result<<Self::PayloadType as PayloadTypes>::BuiltPayload, Self::Error>>;
/// Starts building a new payload for the given payload attributes.
///
/// Returns the identifier of the payload.
async fn new_payload(
/// Resolves the payload job as fast and possible and returns the best payload that has been
/// built so far.
async fn resolve(
&self,
attr: <Self::PayloadType as PayloadTypes>::PayloadBuilderAttributes,
) -> Result<PayloadId, Self::Error>;
id: PayloadId,
) -> Option<Result<<Self::PayloadType as PayloadTypes>::BuiltPayload, Self::Error>> {
self.resolve_kind(id, PayloadKind::Earliest).await
}
/// Sends a message to the service to subscribe to payload events.
/// Returns a receiver that will receive them.

View File

@ -3,6 +3,7 @@ use reth::{
providers::StateProviderFactory, tasks::TaskSpawner, transaction_pool::TransactionPool,
};
use reth_basic_payload_builder::{PayloadBuilder, PayloadConfig};
use reth_node_api::PayloadKind;
use reth_payload_builder::{KeepPayloadJobAlive, PayloadBuilderError, PayloadJob};
use std::{
@ -52,7 +53,10 @@ where
Ok(self.config.attributes.clone())
}
fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
fn resolve_kind(
&mut self,
_kind: PayloadKind,
) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
let payload = self.best_payload();
(futures_util::future::ready(payload), KeepPayloadJobAlive::No)
}