refactor: simplify engine Api (#2240)

This commit is contained in:
Matthias Seitz
2023-04-14 18:59:18 +02:00
committed by GitHub
parent 3779a225fb
commit 08eae76bec
16 changed files with 405 additions and 554 deletions

View File

@ -12,19 +12,22 @@ reth-primitives = { path = "../../primitives" }
reth-interfaces = { path = "../../interfaces" }
reth-provider = { path = "../../storage/provider" }
reth-rpc-types = { path = "../rpc-types" }
reth-rpc-api = { path = "../rpc-api" }
reth-beacon-consensus = { path = "../../consensus/beacon" }
reth-payload-builder = { path = "../../payload/builder" }
# async
futures = "0.3"
tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1"
# misc
async-trait = "0.1"
thiserror = "1.0.37"
jsonrpsee-types = "0.16"
jsonrpsee-core = "0.16"
tracing = "0.1"
[dev-dependencies]
reth-interfaces = { path = "../../interfaces", features = ["test-utils"] }
reth-provider = { path = "../../storage/provider", features = ["test-utils"] }
reth-payload-builder = { path = "../../payload/builder", features = ["test-utils"] }
assert_matches = "1.5.0"

View File

@ -1,23 +1,19 @@
use crate::{EngineApiError, EngineApiMessage, EngineApiResult};
use futures::StreamExt;
use crate::{EngineApiError, EngineApiMessageVersion, EngineApiResult};
use async_trait::async_trait;
use jsonrpsee_core::RpcResult as Result;
use reth_beacon_consensus::BeaconEngineMessage;
use reth_primitives::{BlockHash, BlockId, BlockNumber, ChainSpec, Hardfork};
use reth_interfaces::consensus::ForkchoiceState;
use reth_payload_builder::PayloadStore;
use reth_primitives::{BlockHash, BlockId, BlockNumber, ChainSpec, Hardfork, U64};
use reth_provider::{BlockProvider, EvmEnvProvider, HeaderProvider, StateProviderFactory};
use reth_rpc_types::engine::{ExecutionPayloadBodies, TransitionConfiguration};
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
use reth_rpc_api::EngineApiServer;
use reth_rpc_types::engine::{
ExecutionPayload, ExecutionPayloadBodies, ExecutionPayloadEnvelope, ForkchoiceUpdated,
PayloadAttributes, PayloadId, PayloadStatus, TransitionConfiguration, CAPABILITIES,
};
use tokio::sync::{
mpsc::{self, UnboundedSender},
oneshot,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
/// The Engine API handle.
pub type EngineApiHandle = mpsc::UnboundedSender<EngineApiMessage>;
use std::sync::Arc;
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tracing::trace;
/// The Engine API response sender.
pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
@ -27,56 +23,119 @@ const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;
/// The Engine API implementation that grants the Consensus layer access to data and
/// functions in the Execution layer that are crucial for the consensus process.
#[must_use = "EngineApi does nothing unless polled."]
pub struct EngineApi<Client> {
/// The client to interact with the chain.
client: Client,
/// Consensus configuration
chain_spec: Arc<ChainSpec>,
message_rx: UnboundedReceiverStream<EngineApiMessage>,
engine_tx: UnboundedSender<BeaconEngineMessage>,
/// The channel to send messages to the beacon consensus engine.
to_beacon_consensus: UnboundedSender<BeaconEngineMessage>,
/// The type that can communicate with the payload service to retrieve payloads.
payload_store: PayloadStore,
}
impl<Client: HeaderProvider + BlockProvider + StateProviderFactory + EvmEnvProvider>
EngineApi<Client>
impl<Client> EngineApi<Client>
where
Client: HeaderProvider + BlockProvider + StateProviderFactory + EvmEnvProvider + 'static,
{
/// Create new instance of [EngineApi].
pub fn new(
client: Client,
chain_spec: Arc<ChainSpec>,
message_rx: mpsc::UnboundedReceiver<EngineApiMessage>,
engine_tx: UnboundedSender<BeaconEngineMessage>,
to_beacon_consensus: UnboundedSender<BeaconEngineMessage>,
payload_store: PayloadStore,
) -> Self {
Self { client, chain_spec, message_rx: UnboundedReceiverStream::new(message_rx), engine_tx }
Self { client, chain_spec, to_beacon_consensus, payload_store }
}
fn on_message(&mut self, msg: EngineApiMessage) {
match msg {
EngineApiMessage::ExchangeTransitionConfiguration(config, tx) => {
let _ = tx.send(self.exchange_transition_configuration(config));
}
EngineApiMessage::GetPayload(payload_id, tx) => {
// forward message to the consensus engine
let _ = self.engine_tx.send(BeaconEngineMessage::GetPayload { payload_id, tx });
}
EngineApiMessage::GetPayloadBodiesByHash(hashes, tx) => {
let _ = tx.send(self.get_payload_bodies_by_hash(hashes));
}
EngineApiMessage::GetPayloadBodiesByRange(start, count, tx) => {
let _ = tx.send(self.get_payload_bodies_by_range(start, count));
}
EngineApiMessage::NewPayload(payload, tx) => {
// forward message to the consensus engine
let _ = self.engine_tx.send(BeaconEngineMessage::NewPayload { payload, tx });
}
EngineApiMessage::ForkchoiceUpdated(state, payload_attrs, tx) => {
// forward message to the consensus engine
let _ = self.engine_tx.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
});
}
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_newpayloadv1>
/// Caution: This should not accept the `withdrawals` field
pub async fn new_payload_v1(
&self,
payload: ExecutionPayload,
) -> EngineApiResult<PayloadStatus> {
self.validate_withdrawals_presence(
EngineApiMessageVersion::V1,
payload.timestamp.as_u64(),
payload.withdrawals.is_some(),
)?;
let (tx, rx) = oneshot::channel();
self.to_beacon_consensus.send(BeaconEngineMessage::NewPayload { payload, tx })?;
Ok(rx.await??)
}
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_newpayloadv1>
pub async fn new_payload_v2(
&self,
payload: ExecutionPayload,
) -> EngineApiResult<PayloadStatus> {
self.validate_withdrawals_presence(
EngineApiMessageVersion::V2,
payload.timestamp.as_u64(),
payload.withdrawals.is_some(),
)?;
let (tx, rx) = oneshot::channel();
self.to_beacon_consensus.send(BeaconEngineMessage::NewPayload { payload, tx })?;
Ok(rx.await??)
}
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_forkchoiceUpdatedV1>
///
/// Caution: This should not accept the `withdrawals` field
pub async fn fork_choice_updated_v1(
&self,
state: ForkchoiceState,
payload_attrs: Option<PayloadAttributes>,
) -> EngineApiResult<ForkchoiceUpdated> {
if let Some(ref attrs) = payload_attrs {
self.validate_withdrawals_presence(
EngineApiMessageVersion::V1,
attrs.timestamp.as_u64(),
attrs.withdrawals.is_some(),
)?;
}
let (tx, rx) = oneshot::channel();
self.to_beacon_consensus.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
})?;
Ok(rx.await??)
}
/// Returns the most recent version of the payload that is available in the corresponding
/// payload build process at the time of receiving this call.
///
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_getPayloadV1>
///
/// Caution: This should not return the `withdrawals` field
///
/// Note:
/// > Client software MAY stop the corresponding build process after serving this call.
pub async fn get_payload_v1(&self, payload_id: PayloadId) -> EngineApiResult<ExecutionPayload> {
self.payload_store
.get_payload(payload_id)
.await
.map(|payload| (*payload).clone().into_v1_payload())
.ok_or(EngineApiError::UnknownPayload)
}
/// Returns the most recent version of the payload that is available in the corresponding
/// payload build process at the time of receiving this call.
///
/// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/specification.md#engine_getpayloadv2>
///
/// Note:
/// > Client software MAY stop the corresponding build process after serving this call.
async fn get_payload_v2(
&self,
payload_id: PayloadId,
) -> EngineApiResult<ExecutionPayloadEnvelope> {
self.payload_store
.get_payload(payload_id)
.await
.map(|payload| (*payload).clone().into_v2_payload())
.ok_or(EngineApiError::UnknownPayload)
}
/// Called to retrieve execution payload bodies by range.
@ -180,25 +239,156 @@ impl<Client: HeaderProvider + BlockProvider + StateProviderFactory + EvmEnvProvi
}),
}
}
}
impl<Client> Future for EngineApi<Client>
where
Client: HeaderProvider + BlockProvider + StateProviderFactory + EvmEnvProvider + Unpin,
{
type Output = ();
/// Validates the presence of the `withdrawals` field according to the payload timestamp.
/// After Shanghai, withdrawals field must be [Some].
/// Before Shanghai, withdrawals field must be [None];
fn validate_withdrawals_presence(
&self,
version: EngineApiMessageVersion,
timestamp: u64,
has_withdrawals: bool,
) -> EngineApiResult<()> {
let is_shanghai = self.chain_spec.fork(Hardfork::Shanghai).active_at_timestamp(timestamp);
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
loop {
match ready!(this.message_rx.poll_next_unpin(cx)) {
Some(msg) => this.on_message(msg),
None => {
// channel closed
return Poll::Ready(())
match version {
EngineApiMessageVersion::V1 => {
if has_withdrawals {
return Err(EngineApiError::WithdrawalsNotSupportedInV1)
}
if is_shanghai {
return Err(EngineApiError::NoWithdrawalsPostShanghai)
}
}
}
EngineApiMessageVersion::V2 => {
if is_shanghai && !has_withdrawals {
return Err(EngineApiError::NoWithdrawalsPostShanghai)
}
if !is_shanghai && has_withdrawals {
return Err(EngineApiError::HasWithdrawalsPreShanghai)
}
}
};
Ok(())
}
}
#[async_trait]
impl<Client> EngineApiServer for EngineApi<Client>
where
Client: HeaderProvider + BlockProvider + StateProviderFactory + EvmEnvProvider + 'static,
{
/// Handler for `engine_newPayloadV1`
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_newpayloadv1>
/// Caution: This should not accept the `withdrawals` field
async fn new_payload_v1(&self, payload: ExecutionPayload) -> Result<PayloadStatus> {
trace!(target: "rpc::eth", "Serving engine_newPayloadV1");
Ok(EngineApi::new_payload_v1(self, payload).await?)
}
/// Handler for `engine_newPayloadV1`
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_newpayloadv1>
async fn new_payload_v2(&self, payload: ExecutionPayload) -> Result<PayloadStatus> {
trace!(target: "rpc::eth", "Serving engine_newPayloadV1");
Ok(EngineApi::new_payload_v2(self, payload).await?)
}
/// Handler for `engine_forkchoiceUpdatedV1`
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_forkchoiceUpdatedV1>
///
/// Caution: This should not accept the `withdrawals` field
async fn fork_choice_updated_v1(
&self,
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> Result<ForkchoiceUpdated> {
trace!(target: "rpc::eth", "Serving engine_forkchoiceUpdatedV1");
Ok(EngineApi::fork_choice_updated_v1(self, fork_choice_state, payload_attributes).await?)
}
/// Handler for `engine_forkchoiceUpdatedV2`
/// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/specification.md#engine_forkchoiceupdatedv2>
async fn fork_choice_updated_v2(
&self,
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> Result<ForkchoiceUpdated> {
trace!(target: "rpc::eth", "Serving engine_forkchoiceUpdatedV2");
Ok(EngineApi::fork_choice_updated_v2(self, fork_choice_state, payload_attributes).await?)
}
/// Handler for `engine_getPayloadV1`
///
/// Returns the most recent version of the payload that is available in the corresponding
/// payload build process at the time of receiving this call.
///
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_getPayloadV1>
///
/// Caution: This should not return the `withdrawals` field
///
/// Note:
/// > Client software MAY stop the corresponding build process after serving this call.
async fn get_payload_v1(&self, payload_id: PayloadId) -> Result<ExecutionPayload> {
trace!(target: "rpc::eth", "Serving engine_getPayloadV1");
Ok(EngineApi::get_payload_v1(self, payload_id).await?)
}
/// Handler for `engine_getPayloadV2`
///
/// Returns the most recent version of the payload that is available in the corresponding
/// payload build process at the time of receiving this call.
///
/// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/specification.md#engine_getpayloadv2>
///
/// Note:
/// > Client software MAY stop the corresponding build process after serving this call.
async fn get_payload_v2(&self, payload_id: PayloadId) -> Result<ExecutionPayloadEnvelope> {
trace!(target: "rpc::eth", "Serving engine_getPayloadV2");
Ok(EngineApi::get_payload_v2(self, payload_id).await?)
}
/// Handler for `engine_getPayloadBodiesByHashV1`
/// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/shanghai.md#engine_getpayloadbodiesbyhashv1>
async fn get_payload_bodies_by_hash_v1(
&self,
block_hashes: Vec<BlockHash>,
) -> Result<ExecutionPayloadBodies> {
trace!(target: "rpc::eth", "Serving engine_getPayloadBodiesByHashV1");
Ok(EngineApi::get_payload_bodies_by_hash(self, block_hashes)?)
}
/// Handler for `engine_getPayloadBodiesByRangeV1`
/// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/shanghai.md#engine_getpayloadbodiesbyrangev1>
async fn get_payload_bodies_by_range_v1(
&self,
start: U64,
count: U64,
) -> Result<ExecutionPayloadBodies> {
trace!(target: "rpc::eth", "Serving engine_getPayloadBodiesByHashV1");
Ok(EngineApi::get_payload_bodies_by_range(self, start.as_u64(), count.as_u64())?)
}
/// Handler for `engine_exchangeTransitionConfigurationV1`
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_exchangeTransitionConfigurationV1>
async fn exchange_transition_configuration(
&self,
config: TransitionConfiguration,
) -> Result<TransitionConfiguration> {
trace!(target: "rpc::eth", "Serving engine_getPayloadBodiesByHashV1");
Ok(EngineApi::exchange_transition_configuration(self, config)?)
}
/// Handler for `engine_exchangeCapabilitiesV1`
/// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/common.md#capabilities>
async fn exchange_capabilities(&self, _capabilities: Vec<String>) -> Result<Vec<String>> {
Ok(CAPABILITIES.into_iter().map(str::to_owned).collect())
}
}
impl<Client> std::fmt::Debug for EngineApi<Client> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EngineApi").finish_non_exhaustive()
}
}
@ -206,62 +396,42 @@ where
mod tests {
use super::*;
use assert_matches::assert_matches;
use reth_interfaces::{consensus::ForkchoiceState, test_utils::generators::random_block};
use reth_interfaces::test_utils::generators::random_block;
use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_primitives::{SealedBlock, H256, MAINNET};
use reth_provider::test_utils::MockEthProvider;
use std::sync::Arc;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
fn setup_engine_api() -> (EngineApiTestHandle, EngineApi<Arc<MockEthProvider>>) {
let chain_spec = Arc::new(MAINNET.clone());
let client = Arc::new(MockEthProvider::default());
let (msg_tx, msg_rx) = unbounded_channel();
let (engine_tx, engine_rx) = mpsc::unbounded_channel();
let api = EngineApi {
client: client.clone(),
chain_spec: chain_spec.clone(),
message_rx: UnboundedReceiverStream::new(msg_rx),
engine_tx,
};
let handle = EngineApiTestHandle { chain_spec, client, msg_tx, engine_rx };
let payload_store = spawn_test_payload_service();
let (to_beacon_consensus, engine_rx) = unbounded_channel();
let api = EngineApi::new(
client.clone(),
chain_spec.clone(),
to_beacon_consensus,
payload_store.into(),
);
let handle = EngineApiTestHandle { chain_spec, client, from_api: engine_rx };
(handle, api)
}
struct EngineApiTestHandle {
chain_spec: Arc<ChainSpec>,
client: Arc<MockEthProvider>,
msg_tx: UnboundedSender<EngineApiMessage>,
engine_rx: UnboundedReceiver<BeaconEngineMessage>,
}
impl EngineApiTestHandle {
fn send_message(&self, msg: EngineApiMessage) {
self.msg_tx.send(msg).expect("failed to send engine msg");
}
from_api: UnboundedReceiver<BeaconEngineMessage>,
}
#[tokio::test]
async fn forwards_responses_to_consensus_engine() {
let (mut handle, api) = setup_engine_api();
tokio::spawn(api);
let (result_tx, _result_rx) = oneshot::channel();
handle.send_message(EngineApiMessage::NewPayload(SealedBlock::default().into(), result_tx));
assert_matches!(
handle.engine_rx.recv().await,
Some(BeaconEngineMessage::NewPayload { .. })
);
let (result_tx, _result_rx) = oneshot::channel();
handle.send_message(EngineApiMessage::ForkchoiceUpdated(
ForkchoiceState::default(),
None,
result_tx,
));
assert_matches!(
handle.engine_rx.recv().await,
Some(BeaconEngineMessage::ForkchoiceUpdated { .. })
);
tokio::spawn(async move {
api.new_payload_v1(SealedBlock::default().into()).await.unwrap();
});
assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
}
// tests covering `engine_getPayloadBodiesByRange` and `engine_getPayloadBodiesByHash`
@ -271,8 +441,7 @@ mod tests {
#[tokio::test]
async fn invalid_params() {
let (handle, api) = setup_engine_api();
tokio::spawn(api);
let (_, api) = setup_engine_api();
let by_range_tests = [
// (start, count)
@ -283,45 +452,23 @@ mod tests {
// test [EngineApiMessage::GetPayloadBodiesByRange]
for (start, count) in by_range_tests {
let (result_tx, result_rx) = oneshot::channel();
handle.send_message(EngineApiMessage::GetPayloadBodiesByRange(
start, count, result_tx,
));
assert_matches!(
result_rx.await,
Ok(Err(EngineApiError::InvalidBodiesRange { .. }))
);
let res = api.get_payload_bodies_by_range(start, count);
assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
}
}
#[tokio::test]
async fn request_too_large() {
let (handle, api) = setup_engine_api();
tokio::spawn(api);
let (_, api) = setup_engine_api();
let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
let (result_tx, result_rx) = oneshot::channel();
handle.send_message(EngineApiMessage::GetPayloadBodiesByRange(
0,
request_count,
result_tx,
));
assert_matches!(
result_rx.await,
Ok(Err(EngineApiError::PayloadRequestTooLarge { .. }))
);
let (result_tx, result_rx) = oneshot::channel();
let hashes = std::iter::repeat(H256::default()).take(request_count as usize).collect();
handle.send_message(EngineApiMessage::GetPayloadBodiesByHash(hashes, result_tx));
assert_matches!(result_rx.await, Ok(Err(EngineApiError::PayloadRequestTooLarge { .. })))
let res = api.get_payload_bodies_by_range(0, request_count);
assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
}
#[tokio::test]
async fn returns_payload_bodies() {
let (handle, api) = setup_engine_api();
tokio::spawn(api);
let (start, count) = (1, 10);
let blocks = random_block_range(start..start + count, H256::default(), 0..2);
@ -330,20 +477,13 @@ mod tests {
let expected =
blocks.iter().cloned().map(|b| Some(b.unseal().into())).collect::<Vec<_>>();
let (result_tx, result_rx) = oneshot::channel();
handle.send_message(EngineApiMessage::GetPayloadBodiesByRange(start, count, result_tx));
assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected));
let (result_tx, result_rx) = oneshot::channel();
let hashes = blocks.iter().map(|b| b.hash()).collect();
handle.send_message(EngineApiMessage::GetPayloadBodiesByHash(hashes, result_tx));
assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected));
let res = api.get_payload_bodies_by_range(start, count).unwrap();
assert_eq!(res, expected);
}
#[tokio::test]
async fn returns_payload_bodies_with_gaps() {
let (handle, api) = setup_engine_api();
tokio::spawn(api);
let (start, count) = (1, 100);
let blocks = random_block_range(start..start + count, H256::default(), 0..2);
@ -375,14 +515,12 @@ mod tests {
})
.collect::<Vec<_>>();
let (result_tx, result_rx) = oneshot::channel();
handle.send_message(EngineApiMessage::GetPayloadBodiesByRange(start, count, result_tx));
assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected));
let res = api.get_payload_bodies_by_range(start, count).unwrap();
assert_eq!(res, expected);
let (result_tx, result_rx) = oneshot::channel();
let hashes = blocks.iter().map(|b| b.hash()).collect();
handle.send_message(EngineApiMessage::GetPayloadBodiesByHash(hashes, result_tx));
assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected));
let res = api.get_payload_bodies_by_hash(hashes).unwrap();
assert_eq!(res, expected);
}
}
@ -394,7 +532,6 @@ mod tests {
#[tokio::test]
async fn terminal_td_mismatch() {
let (handle, api) = setup_engine_api();
tokio::spawn(api);
let transition_config = TransitionConfiguration {
terminal_total_difficulty: handle.chain_spec.fork(Hardfork::Paris).ttd().unwrap() +
@ -402,15 +539,11 @@ mod tests {
..Default::default()
};
let (result_tx, result_rx) = oneshot::channel();
handle.send_message(EngineApiMessage::ExchangeTransitionConfiguration(
transition_config.clone(),
result_tx,
));
let res = api.exchange_transition_configuration(transition_config.clone());
assert_matches!(
result_rx.await,
Ok(Err(EngineApiError::TerminalTD { execution, consensus }))
res,
Err(EngineApiError::TerminalTD { execution, consensus })
if execution == handle.chain_spec.fork(Hardfork::Paris).ttd().unwrap() && consensus == U256::from(transition_config.terminal_total_difficulty)
);
}
@ -418,7 +551,6 @@ mod tests {
#[tokio::test]
async fn terminal_block_hash_mismatch() {
let (handle, api) = setup_engine_api();
tokio::spawn(api);
let terminal_block_number = 1000;
let consensus_terminal_block = random_block(terminal_block_number, None, None, None);
@ -431,14 +563,11 @@ mod tests {
};
// Unknown block number
let (result_tx, result_rx) = oneshot::channel();
handle.send_message(EngineApiMessage::ExchangeTransitionConfiguration(
transition_config.clone(),
result_tx,
));
let res = api.exchange_transition_configuration(transition_config.clone());
assert_matches!(
result_rx.await,
Ok(Err(EngineApiError::TerminalBlockHash { execution, consensus }))
res,
Err(EngineApiError::TerminalBlockHash { execution, consensus })
if execution.is_none() && consensus == transition_config.terminal_block_hash
);
@ -448,15 +577,11 @@ mod tests {
execution_terminal_block.clone().unseal(),
);
let (result_tx, result_rx) = oneshot::channel();
handle.send_message(EngineApiMessage::ExchangeTransitionConfiguration(
transition_config.clone(),
result_tx,
));
let res = api.exchange_transition_configuration(transition_config.clone());
assert_matches!(
result_rx.await,
Ok(Err(EngineApiError::TerminalBlockHash { execution, consensus }))
res,
Err(EngineApiError::TerminalBlockHash { execution, consensus })
if execution == Some(execution_terminal_block.hash()) && consensus == transition_config.terminal_block_hash
);
}
@ -464,7 +589,6 @@ mod tests {
#[tokio::test]
async fn configurations_match() {
let (handle, api) = setup_engine_api();
tokio::spawn(api);
let terminal_block_number = 1000;
let terminal_block = random_block(terminal_block_number, None, None, None);
@ -475,15 +599,10 @@ mod tests {
terminal_block_number: terminal_block_number.into(),
};
handle.client.add_block(terminal_block.hash(), terminal_block.clone().unseal());
handle.client.add_block(terminal_block.hash(), terminal_block.unseal());
let (result_tx, result_rx) = oneshot::channel();
handle.send_message(EngineApiMessage::ExchangeTransitionConfiguration(
transition_config.clone(),
result_tx,
));
assert_matches!(result_rx.await, Ok(Ok(config)) => assert_eq!(config, transition_config));
let config = api.exchange_transition_configuration(transition_config.clone()).unwrap();
assert_eq!(config, transition_config);
}
}
}

View File

@ -2,6 +2,7 @@ use jsonrpsee_types::error::{INTERNAL_ERROR_CODE, INVALID_PARAMS_CODE};
use reth_beacon_consensus::BeaconEngineError;
use reth_primitives::{H256, U256};
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
/// The Engine API result type
pub type EngineApiResult<Ok> = Result<Ok, EngineApiError>;
@ -12,11 +13,13 @@ pub const UNKNOWN_PAYLOAD_CODE: i32 = -38001;
pub const REQUEST_TOO_LARGE_CODE: i32 = -38004;
/// Error returned by [`EngineApi`][crate::EngineApi]
///
/// Note: This is a high fidelity error type which can be converted to an RPC error that adheres to the spec: <https://github.com/ethereum/execution-apis/blob/main/src/engine/common.md#errors>
#[derive(Error, Debug)]
pub enum EngineApiError {
/// Unknown payload requested.
#[error("Unknown payload")]
PayloadUnknown,
UnknownPayload,
/// The payload body request length is too large.
#[error("Payload request too large: {len}")]
PayloadRequestTooLarge {
@ -66,6 +69,21 @@ pub enum EngineApiError {
/// Encountered an internal error.
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync>),
/// Failed to send message due ot closed channel
#[error("Closed channel")]
ChannelClosed,
}
impl<T> From<mpsc::error::SendError<T>> for EngineApiError {
fn from(_: mpsc::error::SendError<T>) -> Self {
EngineApiError::ChannelClosed
}
}
impl From<oneshot::error::RecvError> for EngineApiError {
fn from(_: oneshot::error::RecvError) -> Self {
EngineApiError::ChannelClosed
}
}
impl From<EngineApiError> for jsonrpsee_types::error::CallError {
@ -75,13 +93,14 @@ impl From<EngineApiError> for jsonrpsee_types::error::CallError {
EngineApiError::WithdrawalsNotSupportedInV1 |
EngineApiError::NoWithdrawalsPostShanghai |
EngineApiError::HasWithdrawalsPreShanghai => INVALID_PARAMS_CODE,
EngineApiError::PayloadUnknown => UNKNOWN_PAYLOAD_CODE,
EngineApiError::UnknownPayload => UNKNOWN_PAYLOAD_CODE,
EngineApiError::PayloadRequestTooLarge { .. } => REQUEST_TOO_LARGE_CODE,
// Any other server error
_ => INTERNAL_ERROR_CODE,
};
jsonrpsee_types::error::CallError::Custom(jsonrpsee_types::error::ErrorObject::owned(
code,
// TODO properly convert to rpc error
error.to_string(),
None::<()>,
))

View File

@ -17,6 +17,6 @@ mod message;
/// Engine API error.
mod error;
pub use engine_api::{EngineApi, EngineApiHandle, EngineApiSender};
pub use engine_api::{EngineApi, EngineApiSender};
pub use error::*;
pub use message::{EngineApiMessage, EngineApiMessageVersion};
pub use message::EngineApiMessageVersion;

View File

@ -1,36 +1,3 @@
use crate::EngineApiSender;
use reth_beacon_consensus::BeaconEngineSender;
use reth_interfaces::consensus::ForkchoiceState;
use reth_primitives::{BlockHash, BlockNumber};
use reth_rpc_types::engine::{
ExecutionPayload, ExecutionPayloadBodies, ExecutionPayloadEnvelope, ForkchoiceUpdated,
PayloadAttributes, PayloadId, PayloadStatus, TransitionConfiguration,
};
/// Message type for communicating with [`EngineApi`][crate::EngineApi].
#[derive(Debug)]
pub enum EngineApiMessage {
/// Get payload message
GetPayload(PayloadId, BeaconEngineSender<ExecutionPayloadEnvelope>),
/// Get payload bodies by range message
GetPayloadBodiesByRange(BlockNumber, u64, EngineApiSender<ExecutionPayloadBodies>),
/// Get payload bodies by hash message
GetPayloadBodiesByHash(Vec<BlockHash>, EngineApiSender<ExecutionPayloadBodies>),
/// Exchange transition configuration message
ExchangeTransitionConfiguration(
TransitionConfiguration,
EngineApiSender<TransitionConfiguration>,
),
/// New payload message
NewPayload(ExecutionPayload, BeaconEngineSender<PayloadStatus>),
/// Forkchoice updated message
ForkchoiceUpdated(
ForkchoiceState,
Option<PayloadAttributes>,
BeaconEngineSender<ForkchoiceUpdated>,
),
}
/// The version of Engine API message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EngineApiMessageVersion {