feat: implement resolve step (#2507)

This commit is contained in:
Matthias Seitz
2023-05-01 21:22:43 +02:00
committed by GitHub
parent d34349bf0b
commit 6a55f70251
3 changed files with 71 additions and 17 deletions

View File

@ -5,7 +5,7 @@
use crate::{
error::PayloadBuilderError, metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator,
BuiltPayload, PayloadBuilderAttributes, PayloadJob,
BuiltPayload, KeepPayloadJobAlive, PayloadBuilderAttributes, PayloadJob,
};
use futures_util::{future::FutureExt, StreamExt};
use reth_rpc_types::engine::PayloadId;
@ -28,12 +28,25 @@ pub struct PayloadStore {
// === impl PayloadStore ===
impl PayloadStore {
/// Returns the best payload for the given identifier.
pub async fn get_payload(
/// Resolves the payload job and returns the best payload that has been built so far.
///
/// Note: depending on the installed [PayloadJobGenerator], this may or may not terminate the
/// job, See [PayloadJob::resolve].
pub async fn resolve(
&self,
id: PayloadId,
) -> Option<Result<Arc<BuiltPayload>, PayloadBuilderError>> {
self.inner.get_payload(id).await
self.inner.resolve(id).await
}
/// Returns the best payload for the given identifier.
///
/// Note: this merely returns the best payload so far and does not resolve the job.
pub async fn best_payload(
&self,
id: PayloadId,
) -> Option<Result<Arc<BuiltPayload>, PayloadBuilderError>> {
self.inner.best_payload(id).await
}
}
@ -55,13 +68,29 @@ pub struct PayloadBuilderHandle {
// === impl PayloadBuilderHandle ===
impl PayloadBuilderHandle {
/// Returns the best payload for the given identifier.
pub async fn get_payload(
/// Resolves the payload job and returns the best payload that has been built so far.
///
/// Note: depending on the installed [PayloadJobGenerator], this may or may not terminate the
/// job, See [PayloadJob::resolve].
pub async fn resolve(
&self,
id: PayloadId,
) -> Option<Result<Arc<BuiltPayload>, PayloadBuilderError>> {
let (tx, rx) = oneshot::channel();
self.to_service.send(PayloadServiceCommand::GetPayload(id, tx)).ok()?;
self.to_service.send(PayloadServiceCommand::Resolve(id, tx)).ok()?;
match rx.await.transpose()? {
Ok(fut) => Some(fut.await),
Err(e) => Some(Err(e.into())),
}
}
/// Returns the best payload for the given identifier.
pub async fn best_payload(
&self,
id: PayloadId,
) -> Option<Result<Arc<BuiltPayload>, PayloadBuilderError>> {
let (tx, rx) = oneshot::channel();
self.to_service.send(PayloadServiceCommand::BestPayload(id, tx)).ok()?;
rx.await.ok()?
}
@ -141,10 +170,27 @@ where
self.payload_jobs.iter().any(|(_, job_id)| *job_id == id)
}
/// Returns the best payload for the given identifier.
fn get_payload(&self, id: PayloadId) -> Option<Result<Arc<BuiltPayload>, PayloadBuilderError>> {
/// Returns the best payload for the given identifier that has been built so far.
fn best_payload(
&self,
id: PayloadId,
) -> Option<Result<Arc<BuiltPayload>, PayloadBuilderError>> {
self.payload_jobs.iter().find(|(_, job_id)| *job_id == id).map(|(j, _)| j.best_payload())
}
/// Returns the best payload for the given identifier that has been built so far and terminates
/// the job if requested.
fn resolve(&mut self, id: PayloadId) -> Option<PayloadFuture> {
let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?;
let (fut, keep_alive) = self.payload_jobs[job].0.resolve();
if keep_alive == KeepPayloadJobAlive::No {
let (_, id) = self.payload_jobs.remove(job);
trace!(%id, "terminated resolved job");
}
Some(Box::pin(fut))
}
}
impl<Gen> Future for PayloadBuilderService<Gen>
@ -214,8 +260,11 @@ where
// return the id of the payload
let _ = tx.send(res);
}
PayloadServiceCommand::GetPayload(id, tx) => {
let _ = tx.send(this.get_payload(id));
PayloadServiceCommand::BestPayload(id, tx) => {
let _ = tx.send(this.best_payload(id));
}
PayloadServiceCommand::Resolve(id, tx) => {
let _ = tx.send(this.resolve(id));
}
}
}
@ -227,14 +276,18 @@ where
}
}
type PayloadFuture =
Pin<Box<dyn Future<Output = Result<Arc<BuiltPayload>, PayloadBuilderError>> + Send>>;
/// Message type for the [PayloadBuilderService].
#[derive(Debug)]
enum PayloadServiceCommand {
/// Start building a new payload.
BuildNewPayload(
PayloadBuilderAttributes,
oneshot::Sender<Result<PayloadId, PayloadBuilderError>>,
),
/// Get the current payload.
GetPayload(PayloadId, oneshot::Sender<Option<Result<Arc<BuiltPayload>, PayloadBuilderError>>>),
/// Get the best payload so far
BestPayload(PayloadId, oneshot::Sender<Option<Result<Arc<BuiltPayload>, PayloadBuilderError>>>),
/// Resolve the payload and return the payload
Resolve(PayloadId, oneshot::Sender<Option<PayloadFuture>>),
}

View File

@ -17,7 +17,8 @@ pub trait PayloadJob: Future<Output = Result<(), PayloadBuilderError>> + Send +
/// Represents the future that resolves the block that's returned to the CL.
type ResolvePayloadFuture: Future<Output = Result<Arc<BuiltPayload>, PayloadBuilderError>>
+ Send
+ Sync;
+ Sync
+ 'static;
/// Returns the best payload that has been built so far.
///