From 3ed8e43aeda2d9a2436165bb661a5d6626b63b6c Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 31 Oct 2023 18:14:10 +0100 Subject: [PATCH] feat: add NoopPayloadBuilderService (#5205) Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com> --- crates/payload/builder/src/lib.rs | 2 ++ crates/payload/builder/src/noop.rs | 49 +++++++++++++++++++++++++++ crates/payload/builder/src/service.rs | 21 ++++++++---- 3 files changed, 66 insertions(+), 6 deletions(-) create mode 100644 crates/payload/builder/src/noop.rs diff --git a/crates/payload/builder/src/lib.rs b/crates/payload/builder/src/lib.rs index a743553b9..0d5bdec9d 100644 --- a/crates/payload/builder/src/lib.rs +++ b/crates/payload/builder/src/lib.rs @@ -108,6 +108,8 @@ mod payload; mod service; mod traits; +pub mod noop; + #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; diff --git a/crates/payload/builder/src/noop.rs b/crates/payload/builder/src/noop.rs new file mode 100644 index 000000000..805244f2f --- /dev/null +++ b/crates/payload/builder/src/noop.rs @@ -0,0 +1,49 @@ +//! A payload builder service task that does nothing. + +use crate::{service::PayloadServiceCommand, PayloadBuilderHandle}; +use futures_util::{ready, StreamExt}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; + +/// A service task that does not build any payloads. +#[derive(Debug)] +pub struct NoopPayloadBuilderService { + /// Receiver half of the command channel. + command_rx: UnboundedReceiverStream, +} + +impl NoopPayloadBuilderService { + /// Creates a new [NoopPayloadBuilderService]. + pub fn new() -> (Self, PayloadBuilderHandle) { + let (service_tx, command_rx) = mpsc::unbounded_channel(); + let handle = PayloadBuilderHandle::new(service_tx); + (Self { command_rx: UnboundedReceiverStream::new(command_rx) }, handle) + } +} + +impl Future for NoopPayloadBuilderService { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + loop { + let Some(cmd) = ready!(this.command_rx.poll_next_unpin(cx)) else { + return Poll::Ready(()) + }; + match cmd { + PayloadServiceCommand::BuildNewPayload(attr, tx) => { + let id = attr.payload_id(); + tx.send(Ok(id)).ok() + } + PayloadServiceCommand::BestPayload(_, tx) => tx.send(None).ok(), + PayloadServiceCommand::PayloadAttributes(_, tx) => tx.send(None).ok(), + PayloadServiceCommand::Resolve(_, tx) => tx.send(None).ok(), + }; + } + } +} diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index 9d4a2b8ef..0750fc009 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -75,10 +75,13 @@ pub struct PayloadBuilderHandle { /// Sender half of the message channel to the [PayloadBuilderService]. to_service: mpsc::UnboundedSender, } - // === impl PayloadBuilderHandle === impl PayloadBuilderHandle { + pub(crate) fn new(to_service: mpsc::UnboundedSender) -> Self { + Self { to_service } + } + /// Resolves the payload job and returns the best payload that has been built so far. /// /// Note: depending on the installed [PayloadJobGenerator], this may or may not terminate the @@ -162,7 +165,7 @@ where /// All active payload jobs. payload_jobs: Vec<(Gen::Job, PayloadId)>, /// Copy of the sender half, so new [`PayloadBuilderHandle`] can be created on demand. - _service_tx: mpsc::UnboundedSender, + service_tx: mpsc::UnboundedSender, /// Receiver half of the command channel. command_rx: UnboundedReceiverStream, /// Metrics for the payload builder service @@ -175,20 +178,26 @@ impl PayloadBuilderService where Gen: PayloadJobGenerator, { - /// Creates a new payload builder service. + /// Creates a new payload builder service and returns the [PayloadBuilderHandle] to interact + /// with it. pub fn new(generator: Gen) -> (Self, PayloadBuilderHandle) { let (service_tx, command_rx) = mpsc::unbounded_channel(); let service = Self { generator, payload_jobs: Vec::new(), - _service_tx: service_tx.clone(), + service_tx, command_rx: UnboundedReceiverStream::new(command_rx), metrics: Default::default(), }; - let handle = PayloadBuilderHandle { to_service: service_tx }; + let handle = service.handle(); (service, handle) } + /// Returns a handle to the service. + pub fn handle(&self) -> PayloadBuilderHandle { + PayloadBuilderHandle::new(self.service_tx.clone()) + } + /// Returns true if the given payload is currently being built. fn contains_payload(&self, id: PayloadId) -> bool { self.payload_jobs.iter().any(|(_, job_id)| *job_id == id) @@ -319,7 +328,7 @@ type PayloadFuture = Pin, PayloadBuilderError>> + Send + Sync>>; /// Message type for the [PayloadBuilderService]. -enum PayloadServiceCommand { +pub(crate) enum PayloadServiceCommand { /// Start building a new payload. BuildNewPayload( PayloadBuilderAttributes,