feat: add TaskSpawner to spawn validation requests as blocking (#12543)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
ftupas
2024-11-20 11:56:44 +01:00
committed by GitHub
parent ce4a32017a
commit 6977cf0453
2 changed files with 91 additions and 55 deletions

View File

@ -1252,6 +1252,7 @@ where
Arc::new(self.consensus.clone()),
self.block_executor.clone(),
self.config.flashbots.clone(),
Box::new(self.executor.clone()),
)
}
}
@ -1416,6 +1417,7 @@ where
Arc::new(self.consensus.clone()),
self.block_executor.clone(),
self.config.flashbots.clone(),
Box::new(self.executor.clone()),
)
.into_rpc()
.into(),

View File

@ -5,13 +5,13 @@ use alloy_rpc_types_beacon::relay::{
BuilderBlockValidationRequestV3, BuilderBlockValidationRequestV4,
};
use alloy_rpc_types_engine::{
BlobsBundleV1, CancunPayloadFields, ExecutionPayload, ExecutionPayloadSidecar,
BlobsBundleV1, CancunPayloadFields, ExecutionPayload, ExecutionPayloadSidecar, PayloadError,
};
use async_trait::async_trait;
use jsonrpsee::core::RpcResult;
use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
use reth_consensus::{Consensus, PostExecutionInput};
use reth_errors::{BlockExecutionError, ConsensusError, ProviderError, RethError};
use reth_errors::{BlockExecutionError, ConsensusError, ProviderError};
use reth_ethereum_consensus::GAS_LIMIT_BOUND_DIVISOR;
use reth_evm::execute::{BlockExecutorProvider, Executor};
use reth_payload_validator::ExecutionPayloadValidator;
@ -22,16 +22,16 @@ use reth_provider::{
};
use reth_revm::{cached::CachedReads, database::StateProviderDatabase};
use reth_rpc_api::BlockSubmissionValidationApiServer;
use reth_rpc_eth_types::EthApiError;
use reth_rpc_server_types::{result::internal_rpc_err, ToRpcResult};
use reth_rpc_server_types::result::internal_rpc_err;
use reth_tasks::TaskSpawner;
use reth_trie::HashedPostState;
use revm_primitives::{Address, B256, U256};
use serde::{Deserialize, Serialize};
use std::{collections::HashSet, sync::Arc};
use tokio::sync::RwLock;
use tokio::sync::{oneshot, RwLock};
/// The type that implements the `validation` rpc namespace trait
#[derive(Debug, derive_more::Deref)]
#[derive(Clone, Debug, derive_more::Deref)]
pub struct ValidationApi<Provider: ChainSpecProvider, E> {
#[deref]
inner: Arc<ValidationApiInner<Provider, E>>,
@ -47,6 +47,7 @@ where
consensus: Arc<dyn Consensus>,
executor_provider: E,
config: ValidationApiConfig,
task_spawner: Box<dyn TaskSpawner>,
) -> Self {
let ValidationApiConfig { disallow } = config;
@ -58,6 +59,7 @@ where
executor_provider,
disallow,
cached_state: Default::default(),
task_spawner,
});
Self { inner }
@ -338,6 +340,60 @@ where
Ok(versioned_hashes)
}
/// Core logic for validating the builder submission v3
async fn validate_builder_submission_v3(
&self,
request: BuilderBlockValidationRequestV3,
) -> Result<(), ValidationApiError> {
let block = self
.payload_validator
.ensure_well_formed_payload(
ExecutionPayload::V3(request.request.execution_payload),
ExecutionPayloadSidecar::v3(CancunPayloadFields {
parent_beacon_block_root: request.parent_beacon_block_root,
versioned_hashes: self.validate_blobs_bundle(request.request.blobs_bundle)?,
}),
)?
.try_seal_with_senders()
.map_err(|_| ValidationApiError::InvalidTransactionSignature)?;
self.validate_message_against_block(
block,
request.request.message,
request.registered_gas_limit,
)
.await
}
/// Core logic for validating the builder submission v4
async fn validate_builder_submission_v4(
&self,
request: BuilderBlockValidationRequestV4,
) -> Result<(), ValidationApiError> {
let block = self
.payload_validator
.ensure_well_formed_payload(
ExecutionPayload::V3(request.request.execution_payload),
ExecutionPayloadSidecar::v4(
CancunPayloadFields {
parent_beacon_block_root: request.parent_beacon_block_root,
versioned_hashes: self
.validate_blobs_bundle(request.request.blobs_bundle)?,
},
request.request.execution_requests.into(),
),
)?
.try_seal_with_senders()
.map_err(|_| ValidationApiError::InvalidTransactionSignature)?;
self.validate_message_against_block(
block,
request.request.message,
request.registered_gas_limit,
)
.await
}
}
#[async_trait]
@ -372,30 +428,17 @@ where
&self,
request: BuilderBlockValidationRequestV3,
) -> RpcResult<()> {
let block = self
.payload_validator
.ensure_well_formed_payload(
ExecutionPayload::V3(request.request.execution_payload),
ExecutionPayloadSidecar::v3(CancunPayloadFields {
parent_beacon_block_root: request.parent_beacon_block_root,
versioned_hashes: self
.validate_blobs_bundle(request.request.blobs_bundle)
.map_err(|e| RethError::Other(e.into()))
.to_rpc_result()?,
}),
)
.to_rpc_result()?
.try_seal_with_senders()
.map_err(|_| EthApiError::InvalidTransactionSignature)?;
let this = self.clone();
let (tx, rx) = oneshot::channel();
self.validate_message_against_block(
block,
request.request.message,
request.registered_gas_limit,
)
.await
.map_err(|e| RethError::Other(e.into()))
.to_rpc_result()
self.task_spawner.spawn_blocking(Box::pin(async move {
let result = Self::validate_builder_submission_v3(&this, request)
.await
.map_err(|err| internal_rpc_err(err.to_string()));
let _ = tx.send(result);
}));
rx.await.map_err(|_| internal_rpc_err("Internal blocking task error"))?
}
/// Validates a block submitted to the relay
@ -403,33 +446,17 @@ where
&self,
request: BuilderBlockValidationRequestV4,
) -> RpcResult<()> {
let block = self
.payload_validator
.ensure_well_formed_payload(
ExecutionPayload::V3(request.request.execution_payload),
ExecutionPayloadSidecar::v4(
CancunPayloadFields {
parent_beacon_block_root: request.parent_beacon_block_root,
versioned_hashes: self
.validate_blobs_bundle(request.request.blobs_bundle)
.map_err(|e| RethError::Other(e.into()))
.to_rpc_result()?,
},
request.request.execution_requests.into(),
),
)
.to_rpc_result()?
.try_seal_with_senders()
.map_err(|_| EthApiError::InvalidTransactionSignature)?;
let this = self.clone();
let (tx, rx) = oneshot::channel();
self.validate_message_against_block(
block,
request.request.message,
request.registered_gas_limit,
)
.await
.map_err(|e| RethError::Other(e.into()))
.to_rpc_result()
self.task_spawner.spawn_blocking(Box::pin(async move {
let result = Self::validate_builder_submission_v4(&this, request)
.await
.map_err(|err| internal_rpc_err(err.to_string()));
let _ = tx.send(result);
}));
rx.await.map_err(|_| internal_rpc_err("Internal blocking task error"))?
}
}
@ -450,6 +477,8 @@ pub struct ValidationApiInner<Provider: ChainSpecProvider, E> {
/// latest head block state. Uses async `RwLock` to safely handle concurrent validation
/// requests.
cached_state: RwLock<(B256, CachedReads)>,
/// Task spawner for blocking operations
task_spawner: Box<dyn TaskSpawner>,
}
/// Configuration for validation API.
@ -476,6 +505,9 @@ pub enum ValidationApiError {
ProposerPayment,
#[error("invalid blobs bundle")]
InvalidBlobsBundle,
/// When the transaction signature is invalid
#[error("invalid transaction signature")]
InvalidTransactionSignature,
#[error("block accesses blacklisted address: {_0}")]
Blacklist(Address),
#[error(transparent)]
@ -486,4 +518,6 @@ pub enum ValidationApiError {
Provider(#[from] ProviderError),
#[error(transparent)]
Execution(#[from] BlockExecutionError),
#[error(transparent)]
Payload(#[from] PayloadError),
}