From 5e9612897cf23571a80103d5ea6f9e1063bafa3f Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 27 Apr 2023 22:47:45 +0200 Subject: [PATCH] refactor: convert payload job from stream to future (#2438) --- Cargo.lock | 1 - crates/payload/basic/src/lib.rs | 62 +++++++++++------------- crates/payload/builder/Cargo.toml | 1 - crates/payload/builder/src/service.rs | 41 ++++++---------- crates/payload/builder/src/test_utils.rs | 8 +-- crates/payload/builder/src/traits.rs | 11 +++-- 6 files changed, 54 insertions(+), 70 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 462491893..d5bbb9607 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5084,7 +5084,6 @@ dependencies = [ name = "reth-payload-builder" version = "0.1.0" dependencies = [ - "futures-core", "futures-util", "metrics", "reth-interfaces", diff --git a/crates/payload/basic/src/lib.rs b/crates/payload/basic/src/lib.rs index 60cc15a87..46b70f4a6 100644 --- a/crates/payload/basic/src/lib.rs +++ b/crates/payload/basic/src/lib.rs @@ -8,7 +8,7 @@ //! reth basic payload job generator use crate::metrics::PayloadBuilderMetrics; -use futures_core::{ready, Stream}; +use futures_core::ready; use futures_util::FutureExt; use reth_payload_builder::{ error::PayloadBuilderError, BuiltPayload, PayloadBuilderAttributes, PayloadJob, @@ -265,42 +265,44 @@ pub struct BasicPayloadJob { metrics: PayloadBuilderMetrics, } -impl Stream for BasicPayloadJob +impl Future for BasicPayloadJob where Client: StateProviderFactory + Clone + Unpin + 'static, Pool: TransactionPool + Unpin + 'static, Tasks: TaskSpawner + Clone + 'static, { - type Item = Result, PayloadBuilderError>; + type Output = Result<(), PayloadBuilderError>; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); // check if the deadline is reached - let deadline_reached = this.deadline.as_mut().poll(cx).is_ready(); + if this.deadline.as_mut().poll(cx).is_ready() { + trace!("Payload building deadline reached"); + return Poll::Ready(Ok(())) + } // check if the interval is reached - if this.interval.poll_tick(cx).is_ready() && - this.pending_block.is_none() && - !deadline_reached - { - let _ = this.interval.poll_tick(cx); - trace!("spawn new payload build task"); - let (tx, rx) = oneshot::channel(); - let client = this.client.clone(); - let pool = this.pool.clone(); - let cancel = Cancelled::default(); - let _cancel = cancel.clone(); - let guard = this.payload_task_guard.clone(); - let payload_config = this.config.clone(); - let best_payload = this.best_payload.clone(); - this.metrics.inc_initiated_payload_builds(); - this.executor.spawn_blocking(Box::pin(async move { - // acquire the permit for executing the task - let _permit = guard.0.acquire().await; - build_payload(client, pool, payload_config, cancel, best_payload, tx) - })); - this.pending_block = Some(PendingPayload { _cancel, payload: rx }); + while this.interval.poll_tick(cx).is_ready() { + // start a new job if there is no pending block and we haven't reached the deadline + if this.pending_block.is_none() { + trace!("spawn new payload build task"); + let (tx, rx) = oneshot::channel(); + let client = this.client.clone(); + let pool = this.pool.clone(); + let cancel = Cancelled::default(); + let _cancel = cancel.clone(); + let guard = this.payload_task_guard.clone(); + let payload_config = this.config.clone(); + let best_payload = this.best_payload.clone(); + this.metrics.inc_initiated_payload_builds(); + this.executor.spawn_blocking(Box::pin(async move { + // acquire the permit for executing the task + let _permit = guard.0.acquire().await; + build_payload(client, pool, payload_config, cancel, best_payload, tx) + })); + this.pending_block = Some(PendingPayload { _cancel, payload: rx }); + } } // poll the pending block @@ -323,10 +325,9 @@ where } } Poll::Ready(Err(err)) => { + // job failed, but we simply try again next interval trace!(?err, "payload build attempt failed"); this.metrics.inc_failed_payload_builds(); - this.interval.reset(); - return Poll::Ready(Some(Err(err))) } Poll::Pending => { this.pending_block = Some(fut); @@ -334,11 +335,6 @@ where } } - if deadline_reached { - trace!("Payload building deadline reached"); - return Poll::Ready(None) - } - Poll::Pending } } diff --git a/crates/payload/builder/Cargo.toml b/crates/payload/builder/Cargo.toml index 87e2ce953..d5856c2a7 100644 --- a/crates/payload/builder/Cargo.toml +++ b/crates/payload/builder/Cargo.toml @@ -26,7 +26,6 @@ reth-metrics-derive = { path = "../../metrics/metrics-derive" } tokio = { version = "1", features = ["sync"] } tokio-stream = "0.1" futures-util = "0.3" -futures-core = "0.3" ## misc thiserror = "1.0" diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index e1ac4c4eb..4094ef49e 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -7,7 +7,7 @@ use crate::{ error::PayloadBuilderError, metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, BuiltPayload, PayloadBuilderAttributes, PayloadJob, }; -use futures_util::stream::{StreamExt, TryStreamExt}; +use futures_util::{future::FutureExt, StreamExt}; use reth_rpc_types::engine::PayloadId; use std::{ future::Future, @@ -161,34 +161,23 @@ where // we poll all jobs first, so we always have the latest payload that we can report if // requests // we don't care about the order of the jobs, so we can just swap_remove them - 'jobs: for idx in (0..this.payload_jobs.len()).rev() { + for idx in (0..this.payload_jobs.len()).rev() { let (mut job, id) = this.payload_jobs.swap_remove(idx); // drain better payloads from the job - loop { - match job.try_poll_next_unpin(cx) { - Poll::Ready(Some(Ok(payload))) => { - this.metrics.set_active_jobs(this.payload_jobs.len()); - trace!(?payload, %id, "new payload"); - } - Poll::Ready(Some(Err(err))) => { - warn!(?err, ?id, "payload job failed; resolving payload"); - this.metrics.inc_failed_jobs(); - - this.payload_jobs.push((job, id)); - continue 'jobs - } - Poll::Ready(None) => { - // job is done - trace!(?id, "payload job finished"); - this.metrics.set_active_jobs(this.payload_jobs.len()); - continue 'jobs - } - Poll::Pending => { - // still pending, put it back - this.payload_jobs.push((job, id)); - continue 'jobs - } + match job.poll_unpin(cx) { + Poll::Ready(Ok(_)) => { + this.metrics.set_active_jobs(this.payload_jobs.len()); + trace!(%id, "payload job finished"); + } + Poll::Ready(Err(err)) => { + warn!(?err, ?id, "payload job failed; resolving payload"); + this.metrics.inc_failed_jobs(); + this.metrics.set_active_jobs(this.payload_jobs.len()); + } + Poll::Pending => { + // still pending, put it back + this.payload_jobs.push((job, id)); } } } diff --git a/crates/payload/builder/src/test_utils.rs b/crates/payload/builder/src/test_utils.rs index 44cd797de..e9f116706 100644 --- a/crates/payload/builder/src/test_utils.rs +++ b/crates/payload/builder/src/test_utils.rs @@ -4,9 +4,9 @@ use crate::{ error::PayloadBuilderError, BuiltPayload, PayloadBuilderAttributes, PayloadBuilderHandle, PayloadBuilderService, PayloadJob, PayloadJobGenerator, }; -use futures_core::Stream; use reth_primitives::{Block, U256}; use std::{ + future::Future, pin::Pin, sync::Arc, task::{Context, Poll}, @@ -47,10 +47,10 @@ pub struct TestPayloadJob { attr: PayloadBuilderAttributes, } -impl Stream for TestPayloadJob { - type Item = Result, PayloadBuilderError>; +impl Future for TestPayloadJob { + type Output = Result<(), PayloadBuilderError>; - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { Poll::Pending } } diff --git a/crates/payload/builder/src/traits.rs b/crates/payload/builder/src/traits.rs index f64b540b2..e96be34fb 100644 --- a/crates/payload/builder/src/traits.rs +++ b/crates/payload/builder/src/traits.rs @@ -1,21 +1,22 @@ //! Trait abstractions used by the payload crate. use crate::{error::PayloadBuilderError, BuiltPayload, PayloadBuilderAttributes}; -use futures_core::TryStream; + +use std::future::Future; use std::sync::Arc; /// A type that can build a payload. /// -/// This type is a Stream that yields better payloads. +/// This type is a Future that resolves when the job is done (e.g. timed out) or it failed. It's not +/// supposed to return the best payload built when it resolves instead [PayloadJob::best_payload] +/// should be used for that. /// /// A PayloadJob must always be prepared to return the best payload built so far to make there's a /// valid payload to deliver to the CL, so it does not miss a slot, even if the payload is empty. /// /// Note: A PayloadJob need to be cancel safe because it might be dropped after the CL has requested the payload via `engine_getPayloadV1`, See also -pub trait PayloadJob: - TryStream, Error = PayloadBuilderError> + Send + Sync -{ +pub trait PayloadJob: Future> + Send + Sync { /// Returns the best payload that has been built so far. /// /// Note: this is expected to be an empty block without transaction if nothing has been built