feat: more builder abstractions (#2157)

This commit is contained in:
Matthias Seitz
2023-04-10 19:22:10 +02:00
committed by GitHub
parent a6366de1cd
commit 5ba45ae34e
8 changed files with 308 additions and 17 deletions

5
Cargo.lock generated
View File

@ -4854,12 +4854,17 @@ dependencies = [
name = "reth-miner"
version = "0.1.0"
dependencies = [
"futures-core",
"futures-util",
"parking_lot 0.12.1",
"reth-primitives",
"reth-rlp",
"reth-rpc-types",
"sha2 0.10.6",
"thiserror",
"tokio",
"tokio-stream",
"tracing",
]
[[package]]

View File

@ -13,7 +13,14 @@ reth-primitives = { path = "../primitives" }
reth-rpc-types = { path = "../rpc/rpc-types" }
reth-rlp = { path = "../rlp" }
## async
tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1"
futures-util = "0.3"
futures-core = "0.3"
## misc
thiserror = "1.0"
sha2 = { version = "0.10.6", default-features = false }
parking_lot = "0.12.1"
sha2 = { version = "0.10", default-features = false }
parking_lot = "0.12"
tracing = "0.1.37"

View File

@ -1,6 +1,17 @@
//! Error types emitted by types or implementations of this crate.
use tokio::sync::oneshot;
/// Possible error variants during payload building.
#[derive(Debug, thiserror::Error)]
#[error("Payload builder error")]
pub struct PayloadBuilderError;
pub enum PayloadBuilderError {
/// A oneshot channels has been closed.
#[error("Sender has been dropped")]
ChannelClosed,
}
impl From<oneshot::error::RecvError> for PayloadBuilderError {
fn from(_: oneshot::error::RecvError) -> Self {
PayloadBuilderError::ChannelClosed
}
}

View File

@ -10,19 +10,28 @@
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
//! reth miner implementation
//! This trait implements the [PayloadBuilderService] responsible for managing payload jobs.
//!
//! It Defines the abstractions to create and update payloads:
//! - [PayloadJobGenerator]: a type that knows how to create new jobs for creating payloads based
//! on [PayloadAttributes].
//! - [PayloadJob]: a type that can yields (better) payloads over time.
pub mod error;
mod payload;
mod service;
mod traits;
pub use payload::{BuiltPayload, PayloadBuilderAttributes};
pub use reth_rpc_types::engine::PayloadId;
pub use service::{PayloadBuilderHandle, PayloadBuilderService, PayloadStore as PayloadStore2};
pub use traits::{PayloadJob, PayloadJobGenerator};
use crate::error::PayloadBuilderError;
use parking_lot::Mutex;
pub use payload::{BuiltPayload, PayloadBuilderAttributes};
use reth_primitives::H256;
use reth_rpc_types::engine::{ExecutionPayloadEnvelope, PayloadAttributes, PayloadId};
use reth_primitives::{H256, U256};
use reth_rpc_types::engine::{ExecutionPayloadEnvelope, PayloadAttributes};
use std::{collections::HashMap, sync::Arc};
pub mod error;
/// A type that has access to all locally built payloads and can create new ones.
/// This type is intended to by used by the engine API.
pub trait PayloadStore: Send + Sync {
@ -68,7 +77,9 @@ impl PayloadStore for TestPayloadStore {
) -> Result<PayloadId, PayloadBuilderError> {
let attr = PayloadBuilderAttributes::new(parent, attributes);
let payload_id = attr.payload_id();
self.payloads.lock().insert(payload_id, BuiltPayload::new(payload_id, Default::default()));
self.payloads
.lock()
.insert(payload_id, BuiltPayload::new(payload_id, Default::default(), U256::ZERO));
Ok(payload_id)
}
}

View File

@ -1,6 +1,6 @@
//! Contains types required for building a payload.
use reth_primitives::{Address, Block, SealedBlock, Withdrawal, H256};
use reth_primitives::{Address, Block, SealedBlock, Withdrawal, H256, U256};
use reth_rlp::Encodable;
use reth_rpc_types::engine::{PayloadAttributes, PayloadId};
@ -13,27 +13,40 @@ use reth_rpc_types::engine::{PayloadAttributes, PayloadId};
pub struct BuiltPayload {
/// Identifier of the payload
pub(crate) id: PayloadId,
/// The initially empty block.
_initial_empty_block: SealedBlock,
/// The built block
pub(crate) block: SealedBlock,
/// The fees of the block
pub(crate) fees: U256,
}
// === impl BuiltPayload ===
impl BuiltPayload {
/// Initializes the payload with the given initial block.
pub(crate) fn new(id: PayloadId, initial: Block) -> Self {
Self { id, _initial_empty_block: initial.seal_slow() }
pub(crate) fn new(id: PayloadId, block: Block, fees: U256) -> Self {
Self { id, block: block.seal_slow(), fees }
}
/// Returns the identifier of the payload.
pub fn id(&self) -> PayloadId {
self.id
}
/// Returns the identifier of the payload.
pub fn block(&self) -> &SealedBlock {
&self.block
}
/// Fees of the block
pub fn fees(&self) -> U256 {
self.fees
}
}
/// Container type for all components required to build a payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PayloadBuilderAttributes {
// TODO include id here
/// Parent block to build the payload on top
pub(crate) parent: H256,
/// Timestamp for the generated payload
@ -63,7 +76,7 @@ impl PayloadBuilderAttributes {
/// Generates the payload id for the configured payload
///
/// Returns an 8-byte identifier by hashing the payload components.
pub fn payload_id(&self) -> PayloadId {
pub(crate) fn payload_id(&self) -> PayloadId {
use sha2::Digest;
let mut hasher = sha2::Sha256::new();
hasher.update(self.parent.as_bytes());

201
crates/miner/src/service.rs Normal file
View File

@ -0,0 +1,201 @@
//! Support for building payloads.
//!
//! The payload builder is responsible for building payloads.
//! Once a new payload is created, it is continuously updated.
use crate::{traits::PayloadJobGenerator, BuiltPayload, PayloadBuilderAttributes, PayloadJob};
use futures_util::stream::{StreamExt, TryStreamExt};
use reth_rpc_types::engine::PayloadId;
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{trace, warn};
/// A communication channel to the [PayloadBuilderService] that can retrieve payloads.
#[derive(Debug, Clone)]
pub struct PayloadStore {
inner: PayloadBuilderHandle,
}
// === impl PayloadStore ===
impl PayloadStore {
/// Returns the best payload for the given identifier.
pub async fn get_payload(&self, id: PayloadId) -> Option<Arc<BuiltPayload>> {
self.inner.get_payload(id).await
}
}
/// A communication channel to the [PayloadBuilderService].
///
/// This is the API used to create new payloads and to get the current state of existing ones.
#[derive(Debug, Clone)]
pub struct PayloadBuilderHandle {
/// Sender half of the message channel to the [PayloadBuilderService].
to_service: mpsc::UnboundedSender<PayloadServiceCommand>,
}
// === impl PayloadBuilderHandle ===
impl PayloadBuilderHandle {
/// Returns the best payload for the given identifier.
pub async fn get_payload(&self, id: PayloadId) -> Option<Arc<BuiltPayload>> {
let (tx, rx) = oneshot::channel();
self.to_service.send(PayloadServiceCommand::GetPayload(id, tx)).ok()?;
rx.await.ok()?
}
/// Starts building a new payload for the given payload attributes.
///
/// Returns the identifier of the payload.
///
/// Note: if there's already payload in progress with same identifier, it will be returned.
pub async fn new_payload(
&self,
attr: PayloadBuilderAttributes,
) -> Result<PayloadId, oneshot::error::RecvError> {
let (tx, rx) = oneshot::channel();
let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
rx.await
}
}
/// A service that manages payload building tasks.
///
/// This type is an endless future that manages the building of payloads.
///
/// It tracks active payloads and their build jobs that run in the worker pool.
///
/// By design, this type relies entirely on the [PayloadJobGenerator] to create new payloads and
/// does know nothing about how to build them, itt just drives the payload jobs.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PayloadBuilderService<Gen>
where
Gen: PayloadJobGenerator,
{
/// The type that knows how to create new payloads.
generator: Gen,
/// All active payload jobs.
payload_jobs: Vec<(Gen::Job, PayloadId)>,
/// Copy of the sender half, so new [`PayloadBuilderHandle`] can be created on demand.
_service_tx: mpsc::UnboundedSender<PayloadServiceCommand>,
/// Receiver half of the command channel.
command_rx: UnboundedReceiverStream<PayloadServiceCommand>,
}
// === impl PayloadBuilderService ===
impl<Gen> PayloadBuilderService<Gen>
where
Gen: PayloadJobGenerator,
{
/// Creates a new payload builder service.
pub fn new(generator: Gen) -> (Self, PayloadBuilderHandle) {
let (service_tx, command_rx) = mpsc::unbounded_channel();
let service = Self {
generator,
payload_jobs: Vec::new(),
_service_tx: service_tx.clone(),
command_rx: UnboundedReceiverStream::new(command_rx),
};
let handle = PayloadBuilderHandle { to_service: service_tx };
(service, handle)
}
/// Returns true if the given payload is currently being built.
fn contains_payload(&self, id: PayloadId) -> bool {
self.payload_jobs.iter().any(|(_, job_id)| *job_id == id)
}
/// Returns the best payload for the given identifier.
fn get_payload(&self, id: PayloadId) -> Option<Arc<BuiltPayload>> {
self.payload_jobs.iter().find(|(_, job_id)| *job_id == id).map(|(j, _)| j.best_payload())
}
}
impl<Gen> Future for PayloadBuilderService<Gen>
where
Gen: PayloadJobGenerator + Unpin + 'static,
<Gen as PayloadJobGenerator>::Job: Unpin + 'static,
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
loop {
// we poll all jobs first, so we always have the latest payload that we can report if
// requests
// we don't care about the order of the jobs, so we can just swap_remove them
'jobs: for idx in (0..this.payload_jobs.len()).rev() {
let (mut job, id) = this.payload_jobs.swap_remove(idx);
// drain better payloads from the job
loop {
match job.try_poll_next_unpin(cx) {
Poll::Ready(Some(Ok(payload))) => {
trace!(?payload, %id, "new payload");
}
Poll::Ready(Some(Err(err))) => {
warn!(?err, %id, "payload job failed; resolving payload");
continue 'jobs
}
Poll::Ready(None) => {
// job is done
trace!(?id, "payload job finished");
continue 'jobs
}
Poll::Pending => {
// still pending, put it back
this.payload_jobs.push((job, id));
continue 'jobs
}
}
}
}
// marker for exit condition
// TODO(mattsse): this could be optmized so we only poll new jobs
let mut new_job = false;
// drain all requests
while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
match cmd {
PayloadServiceCommand::BuildNewPayload(attr, tx) => {
let id = attr.payload_id();
if !this.contains_payload(id) {
// no job for this payload yet, create one
new_job = true;
let job = this.generator.new_payload_job(attr);
this.payload_jobs.push((job, id));
}
// return the id of the payload
let _ = tx.send(id);
}
PayloadServiceCommand::GetPayload(id, tx) => {
let _ = tx.send(this.get_payload(id));
}
}
}
if !new_job {
return Poll::Pending
}
}
}
}
/// Message type for the [PayloadBuilderService].
#[derive(Debug)]
enum PayloadServiceCommand {
/// Start building a new payload.
BuildNewPayload(PayloadBuilderAttributes, oneshot::Sender<PayloadId>),
/// Get the current payload.
GetPayload(PayloadId, oneshot::Sender<Option<Arc<BuiltPayload>>>),
}

View File

@ -0,0 +1,37 @@
//! Trait abstractions used by the payload crate.
use crate::{error::PayloadBuilderError, BuiltPayload, PayloadBuilderAttributes};
use futures_core::TryStream;
use std::sync::Arc;
/// A type that can build a payload.
///
/// This type is a Stream that yields better payloads.
///
/// Note: PaylodJob need to be cancel safe.
///
/// TODO convert this into a future?
pub trait PayloadJob:
TryStream<Ok = Arc<BuiltPayload>, Error = PayloadBuilderError> + Send + Sync
{
/// Returns the best payload that has been built so far.
///
/// Note: this is expected to be an empty block without transaction if nothing has been built
/// yet.
fn best_payload(&self) -> Arc<BuiltPayload>;
}
/// A type that knows how to create new jobs for creating payloads.
pub trait PayloadJobGenerator: Send + Sync {
/// The type that manages the lifecycle of a payload.
///
/// This type is a Stream that yields better payloads payload.
type Job: PayloadJob;
/// Creates the initial payload and a new [PayloadJob] that yields better payloads.
///
/// Note: this is expected to build a new (empty) payload without transactions, so it can be
/// returned directly. when asked for
fn new_payload_job(&self, attr: PayloadBuilderAttributes) -> Self::Job;
}

View File

@ -20,6 +20,12 @@ impl PayloadId {
}
}
impl std::fmt::Display for PayloadId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
/// This structure maps for the return value of `engine_getPayloadV2` of the beacon chain spec.
///
/// See also: <https://github.com/ethereum/execution-apis/blob/main/src/engine/shanghai.md#engine_getpayloadv2>