perf: handle engine API range request in a new task (#3685)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Jay Miller
2023-07-11 10:07:13 -04:00
committed by GitHub
parent 2effa942e1
commit 1763b5ea7a
5 changed files with 70 additions and 29 deletions

1
Cargo.lock generated
View File

@ -5736,6 +5736,7 @@ dependencies = [
"reth-provider",
"reth-rpc-api",
"reth-rpc-types",
"reth-tasks",
"thiserror",
"tokio",
"tracing",

View File

@ -400,6 +400,7 @@ impl Command {
self.chain.clone(),
beacon_engine_handle,
payload_builder.into(),
Box::new(ctx.task_executor.clone()),
);
info!(target: "reth::cli", "Engine API handler initialized");

View File

@ -30,6 +30,7 @@ pub async fn launch_auth(secret: JwtSecret) -> AuthServerHandle {
MAINNET.clone(),
beacon_engine_handle,
spawn_test_payload_service().into(),
Box::new(TokioTaskExecutor::default()),
);
let module = AuthRpcModule::new(engine_api);
module.start_server(config).await.unwrap()

View File

@ -17,6 +17,7 @@ reth-rpc-types = { workspace = true }
reth-rpc-api = { path = "../rpc-api" }
reth-beacon-consensus = { path = "../../consensus/beacon" }
reth-payload-builder = { workspace = true }
reth-tasks = { workspace = true }
# async
tokio = { workspace = true, features = ["sync"] }

View File

@ -11,6 +11,7 @@ use reth_rpc_types::engine::{
ExecutionPayload, ExecutionPayloadBodies, ExecutionPayloadEnvelope, ForkchoiceUpdated,
PayloadAttributes, PayloadId, PayloadStatus, TransitionConfiguration, CAPABILITIES,
};
use reth_tasks::TaskSpawner;
use std::sync::Arc;
use tokio::sync::oneshot;
use tracing::trace;
@ -24,6 +25,10 @@ const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;
/// The Engine API implementation that grants the Consensus layer access to data and
/// functions in the Execution layer that are crucial for the consensus process.
pub struct EngineApi<Provider> {
inner: Arc<EngineApiInner<Provider>>,
}
struct EngineApiInner<Provider> {
/// The provider to interact with the chain.
provider: Provider,
/// Consensus configuration
@ -32,6 +37,8 @@ pub struct EngineApi<Provider> {
beacon_consensus: BeaconConsensusEngineHandle,
/// The type that can communicate with the payload service to retrieve payloads.
payload_store: PayloadStore,
/// For spawning and executing async tasks
task_spawner: Box<dyn TaskSpawner>,
}
impl<Provider> EngineApi<Provider>
@ -44,8 +51,16 @@ where
chain_spec: Arc<ChainSpec>,
beacon_consensus: BeaconConsensusEngineHandle,
payload_store: PayloadStore,
task_spawner: Box<dyn TaskSpawner>,
) -> Self {
Self { provider, chain_spec, beacon_consensus, payload_store }
let inner = Arc::new(EngineApiInner {
provider,
chain_spec,
beacon_consensus,
payload_store,
task_spawner,
});
Self { inner }
}
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
@ -59,7 +74,7 @@ where
payload.timestamp.as_u64(),
payload.withdrawals.is_some(),
)?;
Ok(self.beacon_consensus.new_payload(payload).await?)
Ok(self.inner.beacon_consensus.new_payload(payload).await?)
}
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_newpayloadv2>
@ -72,7 +87,7 @@ where
payload.timestamp.as_u64(),
payload.withdrawals.is_some(),
)?;
Ok(self.beacon_consensus.new_payload(payload).await?)
Ok(self.inner.beacon_consensus.new_payload(payload).await?)
}
/// Sends a message to the beacon consensus engine to update the fork choice _without_
@ -93,7 +108,7 @@ where
attrs.withdrawals.is_some(),
)?;
}
Ok(self.beacon_consensus.fork_choice_updated(state, payload_attrs).await?)
Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs).await?)
}
/// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
@ -112,7 +127,7 @@ where
attrs.withdrawals.is_some(),
)?;
}
Ok(self.beacon_consensus.fork_choice_updated(state, payload_attrs).await?)
Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs).await?)
}
/// Returns the most recent version of the payload that is available in the corresponding
@ -126,6 +141,7 @@ where
/// > Provider software MAY stop the corresponding build process after serving this call.
pub async fn get_payload_v1(&self, payload_id: PayloadId) -> EngineApiResult<ExecutionPayload> {
Ok(self
.inner
.payload_store
.resolve(payload_id)
.await
@ -145,6 +161,7 @@ where
payload_id: PayloadId,
) -> EngineApiResult<ExecutionPayloadEnvelope> {
Ok(self
.inner
.payload_store
.resolve(payload_id)
.await
@ -162,31 +179,44 @@ where
/// Implementors should take care when acting on the input to this method, specifically
/// ensuring that the range is limited properly, and that the range boundaries are computed
/// correctly and without panics.
pub fn get_payload_bodies_by_range(
pub async fn get_payload_bodies_by_range(
&self,
start: BlockNumber,
count: u64,
) -> EngineApiResult<ExecutionPayloadBodies> {
if count > MAX_PAYLOAD_BODIES_LIMIT {
return Err(EngineApiError::PayloadRequestTooLarge { len: count })
}
let (tx, rx) = oneshot::channel();
let inner = self.inner.clone();
if start == 0 || count == 0 {
return Err(EngineApiError::InvalidBodiesRange { start, count })
}
self.inner.task_spawner.spawn_blocking(Box::pin(async move {
if count > MAX_PAYLOAD_BODIES_LIMIT {
tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
return
}
let mut result = Vec::with_capacity(count as usize);
if start == 0 || count == 0 {
tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
return
}
let end = start.saturating_add(count);
for num in start..end {
let block = self
.provider
.block(BlockHashOrNumber::Number(num))
.map_err(|err| EngineApiError::Internal(Box::new(err)))?;
result.push(block.map(Into::into));
}
let mut result = Vec::with_capacity(count as usize);
Ok(result)
let end = start.saturating_add(count);
for num in start..end {
let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
match block_result {
Ok(block) => {
result.push(block.map(Into::into));
}
Err(err) => {
tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
return
}
};
}
tx.send(Ok(result)).ok();
}));
rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
}
/// Called to retrieve execution payload bodies by hashes.
@ -202,6 +232,7 @@ where
let mut result = Vec::with_capacity(hashes.len());
for hash in hashes {
let block = self
.inner
.provider
.block(BlockHashOrNumber::Hash(hash))
.map_err(|err| EngineApiError::Internal(Box::new(err)))?;
@ -224,6 +255,7 @@ where
} = config;
let merge_terminal_td = self
.inner
.chain_spec
.fork(Hardfork::Paris)
.ttd()
@ -237,7 +269,7 @@ where
})
}
self.beacon_consensus.transition_configuration_exchanged().await;
self.inner.beacon_consensus.transition_configuration_exchanged().await;
// Short circuit if communicated block hash is zero
if terminal_block_hash.is_zero() {
@ -249,6 +281,7 @@ where
// Attempt to look up terminal block hash
let local_hash = self
.inner
.provider
.block_hash(terminal_block_number.as_u64())
.map_err(|err| EngineApiError::Internal(Box::new(err)))?;
@ -276,7 +309,8 @@ where
timestamp: u64,
has_withdrawals: bool,
) -> EngineApiResult<()> {
let is_shanghai = self.chain_spec.fork(Hardfork::Shanghai).active_at_timestamp(timestamp);
let is_shanghai =
self.inner.chain_spec.fork(Hardfork::Shanghai).active_at_timestamp(timestamp);
match version {
EngineApiMessageVersion::V1 => {
@ -404,7 +438,7 @@ where
count: U64,
) -> RpcResult<ExecutionPayloadBodies> {
trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
Ok(EngineApi::get_payload_bodies_by_range(self, start.as_u64(), count.as_u64())?)
Ok(EngineApi::get_payload_bodies_by_range(self, start.as_u64(), count.as_u64()).await?)
}
/// Handler for `engine_exchangeTransitionConfigurationV1`
@ -439,6 +473,7 @@ mod tests {
use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_primitives::{SealedBlock, H256, MAINNET};
use reth_provider::test_utils::MockEthProvider;
use reth_tasks::TokioTaskExecutor;
use std::sync::Arc;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
@ -447,11 +482,13 @@ mod tests {
let provider = Arc::new(MockEthProvider::default());
let payload_store = spawn_test_payload_service();
let (to_engine, engine_rx) = unbounded_channel();
let task_executor = Box::new(TokioTaskExecutor::default());
let api = EngineApi::new(
provider.clone(),
chain_spec.clone(),
BeaconConsensusEngineHandle::new(to_engine),
payload_store.into(),
task_executor,
);
let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
(handle, api)
@ -491,7 +528,7 @@ mod tests {
// test [EngineApiMessage::GetPayloadBodiesByRange]
for (start, count) in by_range_tests {
let res = api.get_payload_bodies_by_range(start, count);
let res = api.get_payload_bodies_by_range(start, count).await;
assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
}
}
@ -501,7 +538,7 @@ mod tests {
let (_, api) = setup_engine_api();
let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
let res = api.get_payload_bodies_by_range(0, request_count);
let res = api.get_payload_bodies_by_range(0, request_count).await;
assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
}
@ -518,7 +555,7 @@ mod tests {
let expected =
blocks.iter().cloned().map(|b| Some(b.unseal().into())).collect::<Vec<_>>();
let res = api.get_payload_bodies_by_range(start, count).unwrap();
let res = api.get_payload_bodies_by_range(start, count).await.unwrap();
assert_eq!(res, expected);
}
@ -558,7 +595,7 @@ mod tests {
})
.collect::<Vec<_>>();
let res = api.get_payload_bodies_by_range(start, count).unwrap();
let res = api.get_payload_bodies_by_range(start, count).await.unwrap();
assert_eq!(res, expected);
let hashes = blocks.iter().map(|b| b.hash()).collect();