feat: process payload job initiation async (#2295)

This commit is contained in:
Matthias Seitz
2023-04-19 08:33:17 +02:00
committed by GitHub
parent 862537362e
commit 534d23eed8
6 changed files with 223 additions and 52 deletions

View File

@ -1,5 +1,5 @@
use reth_payload_builder::error::PayloadBuilderError;
use reth_rpc_types::engine::{EngineRpcError, PayloadError};
use reth_rpc_types::engine::{EngineRpcError, ForkchoiceUpdateError, PayloadError};
use reth_stages::PipelineError;
use thiserror::Error;
@ -48,3 +48,14 @@ impl From<reth_interfaces::db::Error> for BeaconEngineError {
Self::Common(e.into())
}
}
/// Represents error cases for an applied forkchoice update.
#[derive(Error, Debug, Eq, PartialEq)]
pub enum BeaconForkChoiceUpdateError {
/// Thrown when a forkchoice update resulted in an error.
#[error("Forkchoice update error: {0}")]
ForkchoiceUpdateError(#[from] ForkchoiceUpdateError),
/// Thrown when the engine task stopped
#[error("beacon consensus engine task stopped")]
EngineUnavailable,
}

View File

@ -1,10 +1,116 @@
use crate::BeaconEngineResult;
use futures::{future::Either, FutureExt};
use reth_interfaces::consensus::ForkchoiceState;
use reth_payload_builder::error::PayloadBuilderError;
use reth_rpc_types::engine::{
ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
ExecutionPayload, ForkChoiceUpdateResult, ForkchoiceUpdateError, ForkchoiceUpdated,
PayloadAttributes, PayloadId, PayloadStatus,
};
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;
/// Represents the outcome of forkchoice update.
///
/// This is a future that resolves to [ForkChoiceUpdateResult]
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct OnForkChoiceUpdated {
is_valid_update: bool,
/// Returns the result of the forkchoice update.
fut: Either<futures::future::Ready<ForkChoiceUpdateResult>, PendingPayloadId>,
}
// === impl OnForkChoiceUpdated ===
impl OnForkChoiceUpdated {
/// Returns true if this update is valid
pub(crate) fn is_valid_update(&self) -> bool {
self.is_valid_update
}
/// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update succeeded and no
/// payload attributes were provided.
pub(crate) fn valid(status: PayloadStatus) -> Self {
Self {
is_valid_update: status.is_valid(),
fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
}
}
/// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update failed because the
/// given state is considered invalid
pub(crate) fn invalid_state() -> Self {
Self {
is_valid_update: false,
fut: Either::Left(futures::future::ready(Err(ForkchoiceUpdateError::InvalidState))),
}
}
/// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update was successful but
/// payload attributes were invalid.
pub(crate) fn invalid_payload_attributes() -> Self {
Self {
// This is valid because this is only reachable if the state and payload is valid
is_valid_update: true,
fut: Either::Left(futures::future::ready(Err(
ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes,
))),
}
}
/// If the forkchoice update was successful and no payload attributes were provided, this method
pub(crate) fn updated_with_pending_payload_id(
payload_status: PayloadStatus,
pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
) -> Self {
Self {
is_valid_update: payload_status.is_valid(),
fut: Either::Right(PendingPayloadId {
payload_status: Some(payload_status),
pending_payload_id,
}),
}
}
}
impl Future for OnForkChoiceUpdated {
type Output = ForkChoiceUpdateResult;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.get_mut().fut.poll_unpin(cx)
}
}
/// A future that returns the payload id of a yet to be initiated payload job after a successful
/// forkchoice update
#[derive(Debug)]
struct PendingPayloadId {
payload_status: Option<PayloadStatus>,
pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
}
impl Future for PendingPayloadId {
type Output = ForkChoiceUpdateResult;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let res = ready!(this.pending_payload_id.poll_unpin(cx));
match res {
Ok(Ok(payload_id)) => Poll::Ready(Ok(ForkchoiceUpdated {
payload_status: this.payload_status.take().expect("Polled after completion"),
payload_id: Some(payload_id),
})),
Err(_) | Ok(Err(_)) => {
// failed to initiate a payload build job
Poll::Ready(Err(ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes))
}
}
}
}
/// A message for the beacon engine from other components of the node (engine RPC API invoked by the
/// consensus layer).
#[derive(Debug)]
@ -23,6 +129,6 @@ pub enum BeaconEngineMessage {
/// The payload attributes for block building.
payload_attrs: Option<PayloadAttributes>,
/// The sender for returning forkchoice updated result.
tx: oneshot::Sender<BeaconEngineResult<ForkchoiceUpdated>>,
tx: oneshot::Sender<OnForkChoiceUpdated>,
},
}

View File

@ -1,5 +1,5 @@
use crate::engine::metrics::Metrics;
use futures::{Future, FutureExt, StreamExt};
use futures::{Future, FutureExt, StreamExt, TryFutureExt};
use reth_db::{database::Database, tables, transaction::DbTx};
use reth_interfaces::{
blockchain_tree::{BlockStatus, BlockchainTreeEngine},
@ -11,8 +11,7 @@ use reth_interfaces::{
use reth_payload_builder::{PayloadBuilderAttributes, PayloadBuilderHandle};
use reth_primitives::{BlockNumber, Header, SealedBlock, H256};
use reth_rpc_types::engine::{
EngineRpcError, ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
PayloadStatusEnum,
ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum,
};
use reth_stages::{stages::FINISH, Pipeline};
use reth_tasks::TaskSpawner;
@ -33,10 +32,11 @@ mod message;
pub use message::BeaconEngineMessage;
mod error;
pub use error::{BeaconEngineError, BeaconEngineResult};
pub use error::{BeaconEngineError, BeaconEngineResult, BeaconForkChoiceUpdateError};
mod metrics;
mod pipeline_state;
use crate::engine::message::OnForkChoiceUpdated;
pub use pipeline_state::PipelineState;
/// A _shareable_ beacon consensus frontend. Used to interact with the spawned beacon consensus
@ -75,19 +75,21 @@ impl BeaconConsensusEngineHandle {
&self,
state: ForkchoiceState,
payload_attrs: Option<PayloadAttributes>,
) -> BeaconEngineResult<ForkchoiceUpdated> {
self.send_fork_choice_updated(state, payload_attrs)
.await
.map_err(|_| BeaconEngineError::EngineUnavailable)?
) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
Ok(self
.send_fork_choice_updated(state, payload_attrs)
.map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
.await?
.await?)
}
/// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to
/// wait for a response.
pub fn send_fork_choice_updated(
fn send_fork_choice_updated(
&self,
state: ForkchoiceState,
payload_attrs: Option<PayloadAttributes>,
) -> oneshot::Receiver<BeaconEngineResult<ForkchoiceUpdated>> {
) -> oneshot::Receiver<OnForkChoiceUpdated> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
@ -238,14 +240,10 @@ where
&mut self,
state: ForkchoiceState,
attrs: Option<PayloadAttributes>,
) -> Result<ForkchoiceUpdated, BeaconEngineError> {
) -> Result<OnForkChoiceUpdated, BeaconEngineError> {
trace!(target: "consensus::engine", ?state, "Received new forkchoice state");
if state.head_block_hash.is_zero() {
return Ok(ForkchoiceUpdated::new(PayloadStatus::from_status(
PayloadStatusEnum::Invalid {
validation_error: BeaconEngineError::ForkchoiceEmptyHead.to_string(),
},
)))
return Ok(OnForkChoiceUpdated::invalid_state())
}
// TODO: check PoW / EIP-3675 terminal block conditions for the fork choice head
@ -276,7 +274,7 @@ where
.expect("was canonicalized, so it exists");
if let Some(attrs) = attrs {
return self.process_payload_attributes(attrs, header, state)
return Ok(self.process_payload_attributes(attrs, header, state))
}
// TODO: most recent valid block in the branch defined by payload and its
@ -313,7 +311,7 @@ where
};
trace!(target: "consensus::engine", ?state, ?status, "Returning forkchoice status");
Ok(ForkchoiceUpdated::new(status))
Ok(OnForkChoiceUpdated::valid(status))
}
/// Validates the payload attributes with respect to the header and fork choice state.
@ -322,7 +320,7 @@ where
attrs: PayloadAttributes,
header: Header,
state: ForkchoiceState,
) -> Result<ForkchoiceUpdated, BeaconEngineError> {
) -> OnForkChoiceUpdated {
// 7. Client software MUST ensure that payloadAttributes.timestamp is
// greater than timestamp of a block referenced by
// forkchoiceState.headBlockHash. If this condition isn't held client
@ -330,11 +328,7 @@ where
// MUST NOT begin a payload build process. In such an event, the
// forkchoiceState update MUST NOT be rolled back.
if attrs.timestamp <= header.timestamp.into() {
return Ok(ForkchoiceUpdated::new(PayloadStatus::from_status(
PayloadStatusEnum::Invalid {
validation_error: EngineRpcError::InvalidPayloadAttributes.to_string(),
},
)))
return OnForkChoiceUpdated::invalid_payload_attributes()
}
// 8. Client software MUST begin a payload build process building on top of
@ -344,7 +338,7 @@ where
// building section.
let attributes = PayloadBuilderAttributes::new(header.parent_hash, attrs);
// TODO(mattsse) this needs to be handled asynchronously
let payload_id = self.payload_builder.send_new_payload(attributes);
let pending_payload_id = self.payload_builder.send_new_payload(attributes);
// Client software MUST respond to this method call in the following way:
// {
@ -357,11 +351,10 @@ where
// }
//
// if the payload is deemed VALID and the build process has begun.
Ok(ForkchoiceUpdated::new(PayloadStatus::new(
PayloadStatusEnum::Valid,
Some(state.head_block_hash),
))
.with_payload_id(payload_id))
OnForkChoiceUpdated::updated_with_pending_payload_id(
PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
pending_payload_id,
)
}
/// When the Consensus layer receives a new block via the consensus gossip protocol,
@ -522,16 +515,15 @@ where
match msg {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
this.metrics.forkchoice_updated_messages.increment(1);
let response = match this.on_forkchoice_updated(state, payload_attrs) {
let on_updated = match this.on_forkchoice_updated(state, payload_attrs) {
Ok(response) => response,
Err(error) => {
error!(target: "consensus::engine", ?state, ?error, "Error getting forkchoice updated response");
return Poll::Ready(Err(error))
}
};
let is_valid_response =
matches!(response.payload_status.status, PayloadStatusEnum::Valid);
let _ = tx.send(Ok(response));
let is_valid_response = on_updated.is_valid_update();
let _ = tx.send(on_updated);
// Terminate the sync early if it's reached the maximum user
// configured block.
@ -643,6 +635,7 @@ enum PipelineTarget {
#[cfg(test)]
mod tests {
use super::*;
use crate::engine::error::BeaconForkChoiceUpdateError;
use assert_matches::assert_matches;
use reth_blockchain_tree::{
blockchain_tree::{
@ -713,7 +706,7 @@ mod tests {
async fn send_forkchoice_updated(
&self,
state: ForkchoiceState,
) -> BeaconEngineResult<ForkchoiceUpdated> {
) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
self.engine_handle.fork_choice_updated(state, None).await
}
@ -722,7 +715,7 @@ mod tests {
async fn send_forkchoice_retry_on_syncing(
&self,
state: ForkchoiceState,
) -> BeaconEngineResult<ForkchoiceUpdated> {
) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
loop {
let result = self.engine_handle.fork_choice_updated(state, None).await?;
if !result.is_syncing() {
@ -926,6 +919,7 @@ mod tests {
mod fork_choice_updated {
use super::*;
use reth_interfaces::test_utils::generators::random_block;
use reth_rpc_types::engine::ForkchoiceUpdateError;
#[tokio::test]
async fn empty_head() {
@ -945,10 +939,12 @@ mod tests {
let mut engine_rx = spawn_consensus_engine(consensus_engine);
let res = env.send_forkchoice_updated(ForkchoiceState::default()).await;
let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid {
validation_error: BeaconEngineError::ForkchoiceEmptyHead.to_string(),
});
assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
assert_eq!(
res,
Err(BeaconForkChoiceUpdateError::ForkchoiceUpdateError(
ForkchoiceUpdateError::InvalidState
))
);
assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
}

View File

@ -61,12 +61,15 @@ impl PayloadBuilderHandle {
/// Sends a message to the service to start building a new payload for the given payload.
///
/// This is the same as [PayloadBuilderHandle::new_payload] but does not wait for the result.
pub fn send_new_payload(&self, attr: PayloadBuilderAttributes) -> PayloadId {
let id = attr.payload_id();
let (tx, _) = oneshot::channel();
/// This is the same as [PayloadBuilderHandle::new_payload] but does not wait for the result and
/// returns the receiver instead
pub fn send_new_payload(
&self,
attr: PayloadBuilderAttributes,
) -> oneshot::Receiver<Result<PayloadId, PayloadBuilderError>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
id
rx
}
/// Starts building a new payload for the given payload attributes.
@ -78,9 +81,7 @@ impl PayloadBuilderHandle {
&self,
attr: PayloadBuilderAttributes,
) -> Result<PayloadId, PayloadBuilderError> {
let (tx, rx) = oneshot::channel();
let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
rx.await?
self.send_new_payload(attr).await?
}
}

View File

@ -1,5 +1,5 @@
use jsonrpsee_types::error::{INTERNAL_ERROR_CODE, INVALID_PARAMS_CODE};
use reth_beacon_consensus::BeaconEngineError;
use reth_beacon_consensus::{BeaconEngineError, BeaconForkChoiceUpdateError};
use reth_primitives::{H256, U256};
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
@ -66,6 +66,9 @@ pub enum EngineApiError {
/// Beacon consensus engine error.
#[error(transparent)]
ConsensusEngine(#[from] BeaconEngineError),
/// An error occurred while processing the fork choice update.
#[error(transparent)]
ForkChoiceUpdate(#[from] BeaconForkChoiceUpdateError),
/// Encountered an internal error.
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync>),

View File

@ -3,6 +3,21 @@ use crate::engine::PayloadId;
use reth_primitives::H256;
use serde::{Deserialize, Serialize};
/// invalid forkchoice state error code.
pub const INVALID_FORK_CHOICE_STATE_ERROR: i32 = -38002;
/// invalid payload attributes error code.
pub const INVALID_PAYLOAD_ATTRIBUTES_ERROR: i32 = -38003;
/// invalid forkchoice state error message.
pub const INVALID_FORK_CHOICE_STATE_ERROR_MSG: &str = "Invalid forkchoice state";
/// invalid payload attributes error message.
pub const INVALID_PAYLOAD_ATTRIBUTES_ERROR_MSG: &str = "Invalid payload attributes";
/// Represents possible variants of a processed forkchoice update.
pub type ForkChoiceUpdateResult = Result<ForkchoiceUpdated, ForkchoiceUpdateError>;
/// This structure encapsulates the fork choice state
#[derive(Default, Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@ -12,10 +27,49 @@ pub struct ForkchoiceState {
pub finalized_block_hash: H256,
}
/// A standalone forkchoice update result for RPC.
#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
pub enum ForkchoiceUpdateError {
/// The forkchoice update has been processed, but the requested contained invalid
/// [PayloadAttributes](crate::engine::PayloadAttributes).
///
/// This is returned as an error because the payload attributes are invalid and the payload is not valid, See <https://github.com/ethereum/execution-apis/blob/6709c2a795b707202e93c4f2867fa0bf2640a84f/src/engine/paris.md#engine_forkchoiceupdatedv1>
#[error("Invalid payload attributes")]
UpdatedInvalidPayloadAttributes,
/// The given [ForkchoiceState] is invalid or inconsistent.
#[error("Invalid forkchoice state")]
InvalidState,
}
impl From<ForkchoiceUpdateError> for jsonrpsee_types::error::ErrorObject<'static> {
fn from(value: ForkchoiceUpdateError) -> Self {
match value {
ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes => {
jsonrpsee_types::error::ErrorObject::owned(
INVALID_PAYLOAD_ATTRIBUTES_ERROR,
INVALID_PAYLOAD_ATTRIBUTES_ERROR_MSG,
None::<()>,
)
}
ForkchoiceUpdateError::InvalidState => jsonrpsee_types::error::ErrorObject::owned(
INVALID_FORK_CHOICE_STATE_ERROR,
INVALID_FORK_CHOICE_STATE_ERROR_MSG,
None::<()>,
),
}
}
}
/// Represents a successfully _processed_ forkchoice state update.
///
/// Note: this can still be INVALID if the provided payload was invalid.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ForkchoiceUpdated {
/// Represents the outcome of the validation of the payload, independently of the payload being
/// valid or not.
pub payload_status: PayloadStatus,
/// The identifier of the payload build process that was successfully initiated.
pub payload_id: Option<PayloadId>,
}