feat: add NoopPayloadBuilderService (#5205)

Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
This commit is contained in:
Matthias Seitz
2023-10-31 18:14:10 +01:00
committed by GitHub
parent ac241928be
commit 3ed8e43aed
3 changed files with 66 additions and 6 deletions

View File

@ -108,6 +108,8 @@ mod payload;
mod service;
mod traits;
pub mod noop;
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;

View File

@ -0,0 +1,49 @@
//! A payload builder service task that does nothing.
use crate::{service::PayloadServiceCommand, PayloadBuilderHandle};
use futures_util::{ready, StreamExt};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
/// A service task that does not build any payloads.
#[derive(Debug)]
pub struct NoopPayloadBuilderService {
/// Receiver half of the command channel.
command_rx: UnboundedReceiverStream<PayloadServiceCommand>,
}
impl NoopPayloadBuilderService {
/// Creates a new [NoopPayloadBuilderService].
pub fn new() -> (Self, PayloadBuilderHandle) {
let (service_tx, command_rx) = mpsc::unbounded_channel();
let handle = PayloadBuilderHandle::new(service_tx);
(Self { command_rx: UnboundedReceiverStream::new(command_rx) }, handle)
}
}
impl Future for NoopPayloadBuilderService {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
loop {
let Some(cmd) = ready!(this.command_rx.poll_next_unpin(cx)) else {
return Poll::Ready(())
};
match cmd {
PayloadServiceCommand::BuildNewPayload(attr, tx) => {
let id = attr.payload_id();
tx.send(Ok(id)).ok()
}
PayloadServiceCommand::BestPayload(_, tx) => tx.send(None).ok(),
PayloadServiceCommand::PayloadAttributes(_, tx) => tx.send(None).ok(),
PayloadServiceCommand::Resolve(_, tx) => tx.send(None).ok(),
};
}
}
}

View File

@ -75,10 +75,13 @@ pub struct PayloadBuilderHandle {
/// Sender half of the message channel to the [PayloadBuilderService].
to_service: mpsc::UnboundedSender<PayloadServiceCommand>,
}
// === impl PayloadBuilderHandle ===
impl PayloadBuilderHandle {
pub(crate) fn new(to_service: mpsc::UnboundedSender<PayloadServiceCommand>) -> Self {
Self { to_service }
}
/// Resolves the payload job and returns the best payload that has been built so far.
///
/// Note: depending on the installed [PayloadJobGenerator], this may or may not terminate the
@ -162,7 +165,7 @@ where
/// All active payload jobs.
payload_jobs: Vec<(Gen::Job, PayloadId)>,
/// Copy of the sender half, so new [`PayloadBuilderHandle`] can be created on demand.
_service_tx: mpsc::UnboundedSender<PayloadServiceCommand>,
service_tx: mpsc::UnboundedSender<PayloadServiceCommand>,
/// Receiver half of the command channel.
command_rx: UnboundedReceiverStream<PayloadServiceCommand>,
/// Metrics for the payload builder service
@ -175,20 +178,26 @@ impl<Gen> PayloadBuilderService<Gen>
where
Gen: PayloadJobGenerator,
{
/// Creates a new payload builder service.
/// Creates a new payload builder service and returns the [PayloadBuilderHandle] to interact
/// with it.
pub fn new(generator: Gen) -> (Self, PayloadBuilderHandle) {
let (service_tx, command_rx) = mpsc::unbounded_channel();
let service = Self {
generator,
payload_jobs: Vec::new(),
_service_tx: service_tx.clone(),
service_tx,
command_rx: UnboundedReceiverStream::new(command_rx),
metrics: Default::default(),
};
let handle = PayloadBuilderHandle { to_service: service_tx };
let handle = service.handle();
(service, handle)
}
/// Returns a handle to the service.
pub fn handle(&self) -> PayloadBuilderHandle {
PayloadBuilderHandle::new(self.service_tx.clone())
}
/// Returns true if the given payload is currently being built.
fn contains_payload(&self, id: PayloadId) -> bool {
self.payload_jobs.iter().any(|(_, job_id)| *job_id == id)
@ -319,7 +328,7 @@ type PayloadFuture =
Pin<Box<dyn Future<Output = Result<Arc<BuiltPayload>, PayloadBuilderError>> + Send + Sync>>;
/// Message type for the [PayloadBuilderService].
enum PayloadServiceCommand {
pub(crate) enum PayloadServiceCommand {
/// Start building a new payload.
BuildNewPayload(
PayloadBuilderAttributes,