From a887557019210ded4b75f1c5beab154577f181d4 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 1 May 2023 10:26:30 +0200 Subject: [PATCH] feat: add Resolve Payload function (#2482) --- crates/payload/basic/src/lib.rs | 86 +++++++++++++++++++++++- crates/payload/builder/src/lib.rs | 2 +- crates/payload/builder/src/test_utils.rs | 13 +++- crates/payload/builder/src/traits.rs | 37 +++++++++- 4 files changed, 130 insertions(+), 8 deletions(-) diff --git a/crates/payload/basic/src/lib.rs b/crates/payload/basic/src/lib.rs index 75c461238..bc06d5cb4 100644 --- a/crates/payload/basic/src/lib.rs +++ b/crates/payload/basic/src/lib.rs @@ -11,8 +11,8 @@ use crate::metrics::PayloadBuilderMetrics; use futures_core::ready; use futures_util::FutureExt; use reth_payload_builder::{ - error::PayloadBuilderError, BuiltPayload, PayloadBuilderAttributes, PayloadJob, - PayloadJobGenerator, + error::PayloadBuilderError, BuiltPayload, KeepPayloadJobAlive, PayloadBuilderAttributes, + PayloadJob, PayloadJobGenerator, }; use reth_primitives::{ bytes::{Bytes, BytesMut}, @@ -346,6 +346,8 @@ where Pool: TransactionPool + Unpin + 'static, Tasks: TaskSpawner + Clone + 'static, { + type ResolvePayloadFuture = ResolveBestPayload; + fn best_payload(&self) -> Result, PayloadBuilderError> { if let Some(ref payload) = self.best_payload { return Ok(payload.clone()) @@ -359,9 +361,86 @@ where self.metrics.inc_requested_empty_payload(); build_empty_payload(&self.client, self.config.clone()).map(Arc::new) } + + fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) { + let best_payload = self.best_payload.take(); + let maybe_better = self.pending_block.take(); + let mut empty_payload = None; + + if best_payload.is_none() { + // if no payload has been built yet + self.metrics.inc_requested_empty_payload(); + // no payload built yet, so we need to return an empty payload + let (tx, rx) = oneshot::channel(); + let client = self.client.clone(); + let config = self.config.clone(); + self.executor.spawn_blocking(Box::pin(async move { + let res = build_empty_payload(&client, config); + let _ = tx.send(res); + })); + + empty_payload = Some(rx); + } + + let fut = ResolveBestPayload { best_payload, maybe_better, empty_payload }; + + (fut, KeepPayloadJobAlive::No) + } +} + +/// The future that returns the best payload to be served to the consensus layer. +/// +/// This returns the payload that's supposed to be sent to the CL. +/// +/// If payload has been built so far, it will return that, but it will check if there's a better +/// payload available from an in progress build job. If so it will return that. +/// +/// If no payload has been built so far, it will either return an empty payload or the result of the +/// in progress build job, whatever finishes first. +#[derive(Debug)] +pub struct ResolveBestPayload { + /// Best payload so far. + best_payload: Option>, + /// Regular payload job that's currently running that might produce a better payload. + maybe_better: Option, + /// The empty payload building job in progress. + empty_payload: Option>>, +} + +impl Future for ResolveBestPayload { + type Output = Result, PayloadBuilderError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + // check if there is a better payload before returning the best payload + if let Some(fut) = Pin::new(&mut this.maybe_better).as_pin_mut() { + if let Poll::Ready(res) = fut.poll(cx) { + this.maybe_better = None; + if let Ok(BuildOutcome::Better(payload)) = res { + return Poll::Ready(Ok(Arc::new(payload))) + } + } + } + + if let Some(best) = this.best_payload.take() { + return Poll::Ready(Ok(best)) + } + + let mut empty_payload = this.empty_payload.take().expect("polled after completion"); + match empty_payload.poll_unpin(cx) { + Poll::Ready(Ok(res)) => Poll::Ready(res.map(Arc::new)), + Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())), + Poll::Pending => { + this.empty_payload = Some(empty_payload); + Poll::Pending + } + } + } } /// A future that resolves to the result of the block building job. +#[derive(Debug)] struct PendingPayload { /// The marker to cancel the job on drop _cancel: Cancelled, @@ -381,7 +460,7 @@ impl Future for PendingPayload { /// A marker that can be used to cancel a job. /// /// If dropped, it will set the `cancelled` flag to true. -#[derive(Default, Clone)] +#[derive(Default, Clone, Debug)] struct Cancelled(Arc); // === impl Cancelled === @@ -416,6 +495,7 @@ struct PayloadConfig { chain_spec: Arc, } +#[derive(Debug)] enum BuildOutcome { /// Successfully built a better block. Better(BuiltPayload), diff --git a/crates/payload/builder/src/lib.rs b/crates/payload/builder/src/lib.rs index 4ef45887a..4247f872e 100644 --- a/crates/payload/builder/src/lib.rs +++ b/crates/payload/builder/src/lib.rs @@ -29,4 +29,4 @@ pub mod test_utils; pub use payload::{BuiltPayload, PayloadBuilderAttributes}; pub use reth_rpc_types::engine::PayloadId; pub use service::{PayloadBuilderHandle, PayloadBuilderService, PayloadStore}; -pub use traits::{PayloadJob, PayloadJobGenerator}; +pub use traits::{KeepPayloadJobAlive, PayloadJob, PayloadJobGenerator}; diff --git a/crates/payload/builder/src/test_utils.rs b/crates/payload/builder/src/test_utils.rs index e9f116706..20b290de8 100644 --- a/crates/payload/builder/src/test_utils.rs +++ b/crates/payload/builder/src/test_utils.rs @@ -1,8 +1,9 @@ //! Utils for testing purposes. use crate::{ - error::PayloadBuilderError, BuiltPayload, PayloadBuilderAttributes, PayloadBuilderHandle, - PayloadBuilderService, PayloadJob, PayloadJobGenerator, + error::PayloadBuilderError, traits::KeepPayloadJobAlive, BuiltPayload, + PayloadBuilderAttributes, PayloadBuilderHandle, PayloadBuilderService, PayloadJob, + PayloadJobGenerator, }; use reth_primitives::{Block, U256}; use std::{ @@ -56,6 +57,9 @@ impl Future for TestPayloadJob { } impl PayloadJob for TestPayloadJob { + type ResolvePayloadFuture = + futures_util::future::Ready, PayloadBuilderError>>; + fn best_payload(&self) -> Result, PayloadBuilderError> { Ok(Arc::new(BuiltPayload::new( self.attr.payload_id(), @@ -63,4 +67,9 @@ impl PayloadJob for TestPayloadJob { U256::ZERO, ))) } + + fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) { + let fut = futures_util::future::ready(self.best_payload()); + (fut, KeepPayloadJobAlive::No) + } } diff --git a/crates/payload/builder/src/traits.rs b/crates/payload/builder/src/traits.rs index 9cc8a73c6..2b0ffe41f 100644 --- a/crates/payload/builder/src/traits.rs +++ b/crates/payload/builder/src/traits.rs @@ -14,11 +14,44 @@ use std::{future::Future, sync::Arc}; /// /// Note: A PayloadJob need to be cancel safe because it might be dropped after the CL has requested the payload via `engine_getPayloadV1`, See also pub trait PayloadJob: Future> + Send + Sync { + /// Represents the future that resolves the block that's returned to the CL. + type ResolvePayloadFuture: Future, PayloadBuilderError>> + + Send + + Sync; + /// Returns the best payload that has been built so far. /// - /// Note: this is expected to be an empty block without transaction if nothing has been built - /// yet. + /// Note: This is never called by the CL. fn best_payload(&self) -> Result, PayloadBuilderError>; + + /// Called when the payload is requested by the CL. + /// + /// This is invoked on [`engine_getPayloadV2`](https://github.com/ethereum/execution-apis/blob/main/src/engine/shanghai.md#engine_getpayloadv2) and [`engine_getPayloadV1`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#engine_getpayloadv1). + /// + /// The timeout for returning the payload to the CL is 1s. + /// Ideally, future returned by this method must resolve in under 1s. Ideally this is the best + /// payload built so far or an empty block without transactions if nothing has been built yet. + /// + /// According to the spec: + /// > Client software MAY stop the corresponding build process after serving this call. + /// + /// It is at the discretion of the implementer whether the build job should be kept alive or + /// terminated. + /// + /// If this returns [KeepPayloadJobAlive::Yes] then the future the [PayloadJob] will be polled + /// once more, if this returns [KeepPayloadJobAlive::No] then the [PayloadJob] will be dropped + /// after this call + fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive); +} + +/// Whether the payload job should be kept alive or terminated after the payload was requested by +/// the CL. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum KeepPayloadJobAlive { + /// Keep the job alive. + Yes, + /// Terminate the job. + No, } /// A type that knows how to create new jobs for creating payloads.