Refactor get_payload_bodies_by_hash_with to be non-blocking (#11511)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Parikalp Bhardwaj
2024-10-07 20:11:59 +04:00
committed by GitHub
parent 54c8305fb6
commit 8ca5ec9b72

View File

@ -464,48 +464,59 @@ where
}
/// Called to retrieve execution payload bodies by hashes.
fn get_payload_bodies_by_hash_with<F, R>(
async fn get_payload_bodies_by_hash_with<F, R>(
&self,
hashes: Vec<BlockHash>,
f: F,
) -> EngineApiResult<Vec<Option<R>>>
where
F: Fn(Block) -> R,
F: Fn(Block) -> R + Send + 'static,
R: Send + 'static,
{
let len = hashes.len() as u64;
if len > MAX_PAYLOAD_BODIES_LIMIT {
return Err(EngineApiError::PayloadRequestTooLarge { len })
return Err(EngineApiError::PayloadRequestTooLarge { len });
}
let mut result = Vec::with_capacity(hashes.len());
for hash in hashes {
let block = self
.inner
.provider
.block(BlockHashOrNumber::Hash(hash))
.map_err(|err| EngineApiError::Internal(Box::new(err)))?;
result.push(block.map(&f));
}
let (tx, rx) = oneshot::channel();
let inner = self.inner.clone();
Ok(result)
self.inner.task_spawner.spawn_blocking(Box::pin(async move {
let mut result = Vec::with_capacity(hashes.len());
for hash in hashes {
let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
match block_result {
Ok(block) => {
result.push(block.map(&f));
}
Err(err) => {
let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
return;
}
}
}
tx.send(Ok(result)).ok();
}));
rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
}
/// Called to retrieve execution payload bodies by hashes.
pub fn get_payload_bodies_by_hash_v1(
pub async fn get_payload_bodies_by_hash_v1(
&self,
hashes: Vec<BlockHash>,
) -> EngineApiResult<ExecutionPayloadBodiesV1> {
self.get_payload_bodies_by_hash_with(hashes, convert_to_payload_body_v1)
self.get_payload_bodies_by_hash_with(hashes, convert_to_payload_body_v1).await
}
/// Called to retrieve execution payload bodies by hashes.
///
/// Same as [`Self::get_payload_bodies_by_hash_v1`] but as [`ExecutionPayloadBodiesV2`].
pub fn get_payload_bodies_by_hash_v2(
pub async fn get_payload_bodies_by_hash_v2(
&self,
hashes: Vec<BlockHash>,
) -> EngineApiResult<ExecutionPayloadBodiesV2> {
self.get_payload_bodies_by_hash_with(hashes, convert_to_payload_body_v2)
self.get_payload_bodies_by_hash_with(hashes, convert_to_payload_body_v2).await
}
/// Called to verify network configuration parameters and ensure that Consensus and Execution
@ -832,7 +843,7 @@ where
let start = Instant::now();
let res = Self::get_payload_bodies_by_hash_v1(self, block_hashes);
self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
Ok(res?)
Ok(res.await?)
}
async fn get_payload_bodies_by_hash_v2(
@ -843,7 +854,7 @@ where
let start = Instant::now();
let res = Self::get_payload_bodies_by_hash_v2(self, block_hashes);
self.inner.metrics.latency.get_payload_bodies_by_hash_v2.record(start.elapsed());
Ok(res?)
Ok(res.await?)
}
/// Handler for `engine_getPayloadBodiesByRangeV1`
@ -1147,7 +1158,7 @@ mod tests {
.collect::<Vec<_>>();
let hashes = blocks.iter().map(|b| b.hash()).collect();
let res = api.get_payload_bodies_by_hash_v1(hashes).unwrap();
let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
assert_eq!(res, expected);
}
}