feat: add NetworkPrimitives to NetworkBuilder (#13169)

Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
This commit is contained in:
Dan Cline
2024-12-10 15:56:32 -05:00
committed by GitHub
parent 73f1583455
commit 37f3933db2
26 changed files with 201 additions and 143 deletions

1
Cargo.lock generated
View File

@ -8118,7 +8118,6 @@ dependencies = [
name = "reth-node-builder"
version = "1.1.3"
dependencies = [
"alloy-consensus",
"alloy-primitives",
"alloy-rpc-types",
"aquamarine",

View File

@ -1,7 +1,7 @@
use futures_util::StreamExt;
use reth_network_api::{
events::PeerEvent, test_utils::PeersHandleProvider, NetworkEvent, NetworkEventListenerProvider,
PeersInfo,
PeerRequest, PeersInfo,
};
use reth_network_peers::{NodeRecord, PeerId};
use reth_tokio_util::EventStream;
@ -9,8 +9,8 @@ use reth_tracing::tracing::info;
/// Helper for network operations
#[derive(Debug)]
pub struct NetworkTestContext<Network> {
network_events: EventStream<NetworkEvent>,
pub struct NetworkTestContext<Network: NetworkEventListenerProvider> {
network_events: EventStream<NetworkEvent<PeerRequest<Network::Primitives>>>,
network: Network,
}

View File

@ -8,7 +8,8 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
use alloy_consensus::{Header, EMPTY_OMMER_ROOT_HASH};
use alloy_consensus::{BlockHeader, EMPTY_OMMER_ROOT_HASH};
use alloy_eips::merge::ALLOWED_FUTURE_BLOCK_TIME_SECONDS;
use alloy_primitives::U256;
use reth_chainspec::{EthChainSpec, EthereumHardfork, EthereumHardforks};
use reth_consensus::{
@ -20,10 +21,8 @@ use reth_consensus_common::validation::{
validate_against_parent_timestamp, validate_block_pre_execution, validate_body_against_header,
validate_header_base_fee, validate_header_extradata, validate_header_gas,
};
use reth_primitives::{
Block, BlockBody, BlockWithSenders, NodePrimitives, Receipt, SealedBlock, SealedHeader,
};
use reth_primitives_traits::constants::MINIMUM_GAS_LIMIT;
use reth_primitives::{BlockWithSenders, NodePrimitives, Receipt, SealedBlock, SealedHeader};
use reth_primitives_traits::{constants::MINIMUM_GAS_LIMIT, BlockBody};
use std::{fmt::Debug, sync::Arc, time::SystemTime};
/// The bound divisor of the gas limit, used in update calculations.
@ -51,43 +50,46 @@ impl<ChainSpec: EthChainSpec + EthereumHardforks> EthBeaconConsensus<ChainSpec>
///
/// The maximum allowable difference between self and parent gas limits is determined by the
/// parent's gas limit divided by the [`GAS_LIMIT_BOUND_DIVISOR`].
fn validate_against_parent_gas_limit(
fn validate_against_parent_gas_limit<H: BlockHeader>(
&self,
header: &SealedHeader,
parent: &SealedHeader,
header: &SealedHeader<H>,
parent: &SealedHeader<H>,
) -> Result<(), ConsensusError> {
// Determine the parent gas limit, considering elasticity multiplier on the London fork.
let parent_gas_limit =
if self.chain_spec.fork(EthereumHardfork::London).transitions_at_block(header.number) {
parent.gas_limit *
if self.chain_spec.fork(EthereumHardfork::London).transitions_at_block(header.number())
{
parent.gas_limit() *
self.chain_spec
.base_fee_params_at_timestamp(header.timestamp)
.base_fee_params_at_timestamp(header.timestamp())
.elasticity_multiplier as u64
} else {
parent.gas_limit
parent.gas_limit()
};
// Check for an increase in gas limit beyond the allowed threshold.
if header.gas_limit > parent_gas_limit {
if header.gas_limit - parent_gas_limit >= parent_gas_limit / GAS_LIMIT_BOUND_DIVISOR {
if header.gas_limit() > parent_gas_limit {
if header.gas_limit() - parent_gas_limit >= parent_gas_limit / GAS_LIMIT_BOUND_DIVISOR {
return Err(ConsensusError::GasLimitInvalidIncrease {
parent_gas_limit,
child_gas_limit: header.gas_limit,
child_gas_limit: header.gas_limit(),
})
}
}
// Check for a decrease in gas limit beyond the allowed threshold.
else if parent_gas_limit - header.gas_limit >= parent_gas_limit / GAS_LIMIT_BOUND_DIVISOR
else if parent_gas_limit - header.gas_limit() >=
parent_gas_limit / GAS_LIMIT_BOUND_DIVISOR
{
return Err(ConsensusError::GasLimitInvalidDecrease {
parent_gas_limit,
child_gas_limit: header.gas_limit,
child_gas_limit: header.gas_limit(),
})
}
// Check if the self gas limit is below the minimum required limit.
else if header.gas_limit < MINIMUM_GAS_LIMIT {
return Err(ConsensusError::GasLimitInvalidMinimum { child_gas_limit: header.gas_limit })
else if header.gas_limit() < MINIMUM_GAS_LIMIT {
return Err(ConsensusError::GasLimitInvalidMinimum {
child_gas_limit: header.gas_limit(),
})
}
Ok(())
@ -97,72 +99,75 @@ impl<ChainSpec: EthChainSpec + EthereumHardforks> EthBeaconConsensus<ChainSpec>
impl<ChainSpec, N> FullConsensus<N> for EthBeaconConsensus<ChainSpec>
where
ChainSpec: Send + Sync + EthChainSpec + EthereumHardforks + Debug,
N: NodePrimitives<
BlockHeader = Header,
BlockBody = BlockBody,
Block = Block,
Receipt = Receipt,
>,
N: NodePrimitives<Receipt = Receipt>,
{
fn validate_block_post_execution(
&self,
block: &BlockWithSenders,
block: &BlockWithSenders<N::Block>,
input: PostExecutionInput<'_>,
) -> Result<(), ConsensusError> {
validate_block_post_execution(block, &self.chain_spec, input.receipts, input.requests)
}
}
impl<ChainSpec: Send + Sync + EthChainSpec + EthereumHardforks + Debug> Consensus
impl<H, B, ChainSpec: Send + Sync + EthChainSpec + EthereumHardforks + Debug> Consensus<H, B>
for EthBeaconConsensus<ChainSpec>
where
H: BlockHeader,
B: BlockBody,
{
fn validate_body_against_header(
&self,
body: &BlockBody,
header: &SealedHeader,
body: &B,
header: &SealedHeader<H>,
) -> Result<(), ConsensusError> {
validate_body_against_header(body, header.header())
}
fn validate_block_pre_execution(&self, block: &SealedBlock) -> Result<(), ConsensusError> {
fn validate_block_pre_execution(
&self,
block: &SealedBlock<H, B>,
) -> Result<(), ConsensusError> {
validate_block_pre_execution(block, &self.chain_spec)
}
}
impl<ChainSpec: Send + Sync + EthChainSpec + EthereumHardforks + Debug> HeaderValidator
impl<H, ChainSpec: Send + Sync + EthChainSpec + EthereumHardforks + Debug> HeaderValidator<H>
for EthBeaconConsensus<ChainSpec>
where
H: BlockHeader,
{
fn validate_header(&self, header: &SealedHeader) -> Result<(), ConsensusError> {
fn validate_header(&self, header: &SealedHeader<H>) -> Result<(), ConsensusError> {
validate_header_gas(header.header())?;
validate_header_base_fee(header.header(), &self.chain_spec)?;
// EIP-4895: Beacon chain push withdrawals as operations
if self.chain_spec.is_shanghai_active_at_timestamp(header.timestamp) &&
header.withdrawals_root.is_none()
if self.chain_spec.is_shanghai_active_at_timestamp(header.timestamp()) &&
header.withdrawals_root().is_none()
{
return Err(ConsensusError::WithdrawalsRootMissing)
} else if !self.chain_spec.is_shanghai_active_at_timestamp(header.timestamp) &&
header.withdrawals_root.is_some()
} else if !self.chain_spec.is_shanghai_active_at_timestamp(header.timestamp()) &&
header.withdrawals_root().is_some()
{
return Err(ConsensusError::WithdrawalsRootUnexpected)
}
// Ensures that EIP-4844 fields are valid once cancun is active.
if self.chain_spec.is_cancun_active_at_timestamp(header.timestamp) {
if self.chain_spec.is_cancun_active_at_timestamp(header.timestamp()) {
validate_4844_header_standalone(header.header())?;
} else if header.blob_gas_used.is_some() {
} else if header.blob_gas_used().is_some() {
return Err(ConsensusError::BlobGasUsedUnexpected)
} else if header.excess_blob_gas.is_some() {
} else if header.excess_blob_gas().is_some() {
return Err(ConsensusError::ExcessBlobGasUnexpected)
} else if header.parent_beacon_block_root.is_some() {
} else if header.parent_beacon_block_root().is_some() {
return Err(ConsensusError::ParentBeaconBlockRootUnexpected)
}
if self.chain_spec.is_prague_active_at_timestamp(header.timestamp) {
if header.requests_hash.is_none() {
if self.chain_spec.is_prague_active_at_timestamp(header.timestamp()) {
if header.requests_hash().is_none() {
return Err(ConsensusError::RequestsHashMissing)
}
} else if header.requests_hash.is_some() {
} else if header.requests_hash().is_some() {
return Err(ConsensusError::RequestsHashUnexpected)
}
@ -171,8 +176,8 @@ impl<ChainSpec: Send + Sync + EthChainSpec + EthereumHardforks + Debug> HeaderVa
fn validate_header_against_parent(
&self,
header: &SealedHeader,
parent: &SealedHeader,
header: &SealedHeader<H>,
parent: &SealedHeader<H>,
) -> Result<(), ConsensusError> {
validate_against_parent_hash_number(header.header(), parent)?;
@ -189,7 +194,7 @@ impl<ChainSpec: Send + Sync + EthChainSpec + EthereumHardforks + Debug> HeaderVa
)?;
// ensure that the blob gas fields for this block
if self.chain_spec.is_cancun_active_at_timestamp(header.timestamp) {
if self.chain_spec.is_cancun_active_at_timestamp(header.timestamp()) {
validate_against_parent_4844(header.header(), parent.header())?;
}
@ -198,24 +203,26 @@ impl<ChainSpec: Send + Sync + EthChainSpec + EthereumHardforks + Debug> HeaderVa
fn validate_header_with_total_difficulty(
&self,
header: &Header,
header: &H,
total_difficulty: U256,
) -> Result<(), ConsensusError> {
let is_post_merge = self
.chain_spec
.fork(EthereumHardfork::Paris)
.active_at_ttd(total_difficulty, header.difficulty);
.active_at_ttd(total_difficulty, header.difficulty());
if is_post_merge {
if !header.is_zero_difficulty() {
// TODO: add `is_zero_difficulty` to `alloy_consensus::BlockHeader` trait
if !header.difficulty().is_zero() {
return Err(ConsensusError::TheMergeDifficultyIsNotZero)
}
if !header.nonce.is_zero() {
// TODO: helper fn in `alloy_consensus::BlockHeader` trait
if !header.nonce().is_some_and(|nonce| nonce.is_zero()) {
return Err(ConsensusError::TheMergeNonceIsNotZero)
}
if header.ommers_hash != EMPTY_OMMER_ROOT_HASH {
if header.ommers_hash() != EMPTY_OMMER_ROOT_HASH {
return Err(ConsensusError::TheMergeOmmerRootIsNotEmpty)
}
@ -241,9 +248,10 @@ impl<ChainSpec: Send + Sync + EthChainSpec + EthereumHardforks + Debug> HeaderVa
let present_timestamp =
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
if header.exceeds_allowed_future_timestamp(present_timestamp) {
// TODO: move this to `alloy_consensus::BlockHeader`
if header.timestamp() > present_timestamp + ALLOWED_FUTURE_BLOCK_TIME_SECONDS {
return Err(ConsensusError::TimestampIsInFuture {
timestamp: header.timestamp,
timestamp: header.timestamp(),
present_timestamp,
})
}
@ -263,7 +271,7 @@ mod tests {
use reth_primitives::proofs;
fn header_with_gas_limit(gas_limit: u64) -> SealedHeader {
let header = Header { gas_limit, ..Default::default() };
let header = reth_primitives::Header { gas_limit, ..Default::default() };
SealedHeader::new(header, B256::ZERO)
}
@ -343,7 +351,7 @@ mod tests {
// that the header is valid
let chain_spec = Arc::new(ChainSpecBuilder::mainnet().shanghai_activated().build());
let header = Header {
let header = reth_primitives::Header {
base_fee_per_gas: Some(1337),
withdrawals_root: Some(proofs::calculate_withdrawals_root(&[])),
..Default::default()

View File

@ -1,26 +1,31 @@
use alloy_consensus::{proofs::calculate_receipt_root, TxReceipt};
use alloy_consensus::{proofs::calculate_receipt_root, BlockHeader, TxReceipt};
use alloy_eips::eip7685::Requests;
use alloy_primitives::{Bloom, B256};
use reth_chainspec::EthereumHardforks;
use reth_consensus::ConsensusError;
use reth_primitives::{gas_spent_by_transactions, BlockWithSenders, GotExpected, Receipt};
use reth_primitives_traits::Block;
/// Validate a block with regard to execution results:
///
/// - Compares the receipts root in the block header to the block body
/// - Compares the gas used in the block header to the actual gas usage after execution
pub fn validate_block_post_execution<ChainSpec: EthereumHardforks>(
block: &BlockWithSenders,
pub fn validate_block_post_execution<B, ChainSpec>(
block: &BlockWithSenders<B>,
chain_spec: &ChainSpec,
receipts: &[Receipt],
requests: &Requests,
) -> Result<(), ConsensusError> {
) -> Result<(), ConsensusError>
where
B: Block,
ChainSpec: EthereumHardforks,
{
// Check if gas used matches the value set in header.
let cumulative_gas_used =
receipts.last().map(|receipt| receipt.cumulative_gas_used).unwrap_or(0);
if block.gas_used != cumulative_gas_used {
if block.header().gas_used() != cumulative_gas_used {
return Err(ConsensusError::BlockGasUsed {
gas: GotExpected { got: cumulative_gas_used, expected: block.gas_used },
gas: GotExpected { got: cumulative_gas_used, expected: block.header().gas_used() },
gas_spent_by_tx: gas_spent_by_transactions(receipts),
})
}
@ -29,9 +34,9 @@ pub fn validate_block_post_execution<ChainSpec: EthereumHardforks>(
// operation as hashing that is required for state root got calculated in every
// transaction This was replaced with is_success flag.
// See more about EIP here: https://eips.ethereum.org/EIPS/eip-658
if chain_spec.is_byzantium_active_at_block(block.header.number) {
if chain_spec.is_byzantium_active_at_block(block.header().number()) {
if let Err(error) =
verify_receipts(block.header.receipts_root, block.header.logs_bloom, receipts)
verify_receipts(block.header().receipts_root(), block.header().logs_bloom(), receipts)
{
tracing::debug!(%error, ?receipts, "receipts verification failed");
return Err(error)
@ -39,8 +44,8 @@ pub fn validate_block_post_execution<ChainSpec: EthereumHardforks>(
}
// Validate that the header requests hash matches the calculated requests hash
if chain_spec.is_prague_active_at_timestamp(block.timestamp) {
let Some(header_requests_hash) = block.header.requests_hash else {
if chain_spec.is_prague_active_at_timestamp(block.header().timestamp()) {
let Some(header_requests_hash) = block.header().requests_hash() else {
return Err(ConsensusError::RequestsHashMissing)
};
let requests_hash = requests.requests_hash();

View File

@ -10,7 +10,7 @@ use reth_ethereum_engine_primitives::{
};
use reth_evm::execute::BasicBlockExecutorProvider;
use reth_evm_ethereum::execute::EthExecutionStrategyFactory;
use reth_network::{NetworkHandle, PeersInfo};
use reth_network::{EthNetworkPrimitives, NetworkHandle, PeersInfo};
use reth_node_api::{
AddOnsContext, ConfigureEvm, FullNodeComponents, HeaderTy, NodeTypesWithDB, TxTy,
};
@ -318,6 +318,8 @@ where
> + Unpin
+ 'static,
{
type Primitives = EthNetworkPrimitives;
async fn build_network(
self,
ctx: &BuilderContext<Node>,

View File

@ -464,7 +464,10 @@ impl<H, B> OrderedBodiesResponse<H, B> {
}
}
impl<H: BlockHeader, B> OrderedBodiesResponse<H, B> {
impl<H, B> OrderedBodiesResponse<H, B>
where
H: BlockHeader,
{
/// Returns the block number of the first element
///
/// # Panics

View File

@ -54,7 +54,6 @@ where
self.inner.clear();
self.last_requested_block_number.take();
}
/// Add new request to the queue.
/// Expects a sorted list of headers.
pub(crate) fn push_new_request(
@ -71,6 +70,7 @@ where
None => last.number(),
})
.or(self.last_requested_block_number);
// Create request and push into the queue.
self.inner.push(
BodiesRequestFuture::new(client, consensus, self.metrics.clone()).with_headers(request),

View File

@ -56,8 +56,8 @@ pub(crate) struct BodiesRequestFuture<H, B: BodiesClient> {
impl<H, B> BodiesRequestFuture<H, B>
where
B: BodiesClient + 'static,
H: BlockHeader,
B: BodiesClient + 'static,
{
/// Returns an empty future. Use [`BodiesRequestFuture::with_headers`] to set the request.
pub(crate) fn new(

View File

@ -133,9 +133,12 @@ pub trait NetworkPeersEvents: Send + Sync {
/// Provides event subscription for the network.
#[auto_impl::auto_impl(&, Arc)]
pub trait NetworkEventListenerProvider<R = PeerRequest>: NetworkPeersEvents {
pub trait NetworkEventListenerProvider: NetworkPeersEvents {
/// The primitive types to use in the `PeerRequest` used in the stream.
type Primitives: NetworkPrimitives;
/// Creates a new [`NetworkEvent`] listener channel.
fn event_listener(&self) -> EventStream<NetworkEvent<R>>;
fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<Self::Primitives>>>;
/// Returns a new [`DiscoveryEvent`] stream.
///
/// This stream yields [`DiscoveryEvent`]s for each peer that is discovered.

View File

@ -36,7 +36,6 @@ pub use events::{
use std::{future::Future, net::SocketAddr, sync::Arc, time::Instant};
use reth_eth_wire_types::{capability::Capabilities, DisconnectReason, EthVersion, Status};
use reth_network_p2p::EthBlockClient;
use reth_network_peers::NodeRecord;
/// The `PeerId` type.
@ -44,7 +43,7 @@ pub type PeerId = alloy_primitives::B512;
/// Helper trait that unifies network API needed to launch node.
pub trait FullNetwork:
BlockDownloaderProvider<Client: EthBlockClient>
BlockDownloaderProvider
+ NetworkSyncUpdater
+ NetworkInfo
+ NetworkEventListenerProvider
@ -56,7 +55,7 @@ pub trait FullNetwork:
}
impl<T> FullNetwork for T where
T: BlockDownloaderProvider<Client: EthBlockClient>
T: BlockDownloaderProvider
+ NetworkSyncUpdater
+ NetworkInfo
+ NetworkEventListenerProvider

View File

@ -205,8 +205,10 @@ impl<N: NetworkPrimitives> NetworkPeersEvents for NetworkHandle<N> {
}
}
impl<N: NetworkPrimitives> NetworkEventListenerProvider<PeerRequest<N>> for NetworkHandle<N> {
fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<N>>> {
impl<N: NetworkPrimitives> NetworkEventListenerProvider for NetworkHandle<N> {
type Primitives = N;
fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<Self::Primitives>>> {
self.inner.event_sender.new_listener()
}

View File

@ -6,17 +6,17 @@ use std::{
use crate::{download::DownloadClient, error::PeerRequestResult, priority::Priority};
use alloy_primitives::B256;
use futures::{Future, FutureExt};
use reth_primitives::BlockBody;
use reth_primitives_traits::BlockBody;
/// The bodies future type
pub type BodiesFut<B = BlockBody> =
pub type BodiesFut<B = reth_primitives::BlockBody> =
Pin<Box<dyn Future<Output = PeerRequestResult<Vec<B>>> + Send + Sync>>;
/// A client capable of downloading block bodies.
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait BodiesClient: DownloadClient {
/// The body type this client fetches.
type Body: Send + Sync + Unpin + 'static;
type Body: BlockBody;
/// The output of the request future for querying block bodies.
type Output: Future<Output = PeerRequestResult<Vec<Self::Body>>> + Sync + Send + Unpin;

View File

@ -15,7 +15,7 @@ pub type BodyDownloaderResult<H, B> = DownloadResult<Vec<BlockResponse<H, B>>>;
pub trait BodyDownloader:
Send + Sync + Stream<Item = BodyDownloaderResult<Self::Header, Self::Body>> + Unpin
{
/// The type of header that can be returned in a blck
/// The type of header that is being used
type Header: Debug + Send + Sync + Unpin + 'static;
/// The type of the body that is being downloaded.

View File

@ -3,6 +3,7 @@ use alloy_consensus::Header;
use alloy_eips::BlockHashOrNumber;
use futures::{Future, FutureExt};
pub use reth_eth_wire_types::{BlockHeaders, HeadersDirection};
use reth_primitives_traits::BlockHeader;
use std::{
fmt::Debug,
pin::Pin,
@ -57,7 +58,7 @@ pub type HeadersFut<H = Header> =
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait HeadersClient: DownloadClient {
/// The header type this client fetches.
type Header: Send + Sync + Unpin;
type Header: BlockHeader;
/// The headers future type
type Output: Future<Output = PeerRequestResult<Vec<Self::Header>>> + Sync + Send + Unpin;

View File

@ -61,7 +61,6 @@ reth-transaction-pool.workspace = true
## ethereum
alloy-primitives.workspace = true
alloy-rpc-types = { workspace = true, features = ["engine"] }
alloy-consensus.workspace = true
revm-primitives.workspace = true
## async

View File

@ -20,7 +20,7 @@ use reth_db_api::{
use reth_exex::ExExContext;
use reth_network::{
transactions::TransactionsManagerConfig, NetworkBuilder, NetworkConfig, NetworkConfigBuilder,
NetworkHandle, NetworkManager,
NetworkHandle, NetworkManager, NetworkPrimitives,
};
use reth_node_api::{
FullNodePrimitives, FullNodeTypes, FullNodeTypesAdapter, NodeAddOns, NodeTypes,
@ -648,19 +648,24 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
///
/// Spawns the configured network and associated tasks and returns the [`NetworkHandle`]
/// connected to that network.
pub fn start_network<Pool>(&self, builder: NetworkBuilder<(), ()>, pool: Pool) -> NetworkHandle
pub fn start_network<N, Pool>(
&self,
builder: NetworkBuilder<(), (), N>,
pool: Pool,
) -> NetworkHandle<N>
where
N: NetworkPrimitives,
Pool: TransactionPool<
Transaction: PoolTransaction<
Consensus = reth_primitives::TransactionSigned,
Pooled = reth_primitives::PooledTransactionsElement,
Consensus = N::BroadcastedTransaction,
Pooled = N::PooledTransaction,
>,
> + Unpin
+ 'static,
Node::Provider: BlockReader<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
Header = reth_primitives::Header,
Block = N::Block,
Header = N::BlockHeader,
>,
{
self.start_network_with(builder, pool, Default::default())
@ -672,24 +677,25 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
///
/// Spawns the configured network and associated tasks and returns the [`NetworkHandle`]
/// connected to that network.
pub fn start_network_with<Pool>(
pub fn start_network_with<Pool, N>(
&self,
builder: NetworkBuilder<(), ()>,
builder: NetworkBuilder<(), (), N>,
pool: Pool,
tx_config: TransactionsManagerConfig,
) -> NetworkHandle
) -> NetworkHandle<N>
where
N: NetworkPrimitives,
Pool: TransactionPool<
Transaction: PoolTransaction<
Consensus = reth_primitives::TransactionSigned,
Pooled = reth_primitives::PooledTransactionsElement,
Consensus = N::BroadcastedTransaction,
Pooled = N::PooledTransaction,
>,
> + Unpin
+ 'static,
Node::Provider: BlockReader<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
Header = reth_primitives::Header,
Block = N::Block,
Header = N::BlockHeader,
>,
{
let (handle, network, txpool, eth) = builder

View File

@ -9,7 +9,8 @@ use crate::{
};
use reth_consensus::FullConsensus;
use reth_evm::execute::BlockExecutorProvider;
use reth_node_api::{HeaderTy, NodeTypes, NodeTypesWithEngine, TxTy};
use reth_network::NetworkPrimitives;
use reth_node_api::{BodyTy, HeaderTy, NodeTypes, NodeTypesWithEngine, TxTy};
use reth_payload_builder::PayloadBuilderHandle;
use reth_transaction_pool::{PoolTransaction, TransactionPool};
use std::{future::Future, marker::PhantomData};
@ -295,13 +296,34 @@ impl<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB> NodeComponentsBuilder<Node>
for ComponentsBuilder<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB>
where
Node: FullNodeTypes,
PoolB: PoolBuilder<Node>,
NetworkB: NetworkBuilder<Node, PoolB::Pool>,
PoolB: PoolBuilder<
Node,
Pool: TransactionPool<
Transaction: PoolTransaction<
Pooled = <NetworkB::Primitives as NetworkPrimitives>::PooledTransaction,
>,
>,
>,
NetworkB: NetworkBuilder<
Node,
PoolB::Pool,
Primitives: NetworkPrimitives<
BlockHeader = HeaderTy<Node::Types>,
BlockBody = BodyTy<Node::Types>,
>,
>,
PayloadB: PayloadServiceBuilder<Node, PoolB::Pool>,
ExecB: ExecutorBuilder<Node>,
ConsB: ConsensusBuilder<Node>,
{
type Components = Components<Node, PoolB::Pool, ExecB::EVM, ExecB::Executor, ConsB::Consensus>;
type Components = Components<
Node,
NetworkB::Primitives,
PoolB::Pool,
ExecB::EVM,
ExecB::Executor,
ConsB::Consensus,
>;
async fn build_components(
self,
@ -369,11 +391,12 @@ pub trait NodeComponentsBuilder<Node: FullNodeTypes>: Send {
) -> impl Future<Output = eyre::Result<Self::Components>> + Send;
}
impl<Node, F, Fut, Pool, EVM, Executor, Cons> NodeComponentsBuilder<Node> for F
impl<Node, N, F, Fut, Pool, EVM, Executor, Cons> NodeComponentsBuilder<Node> for F
where
N: NetworkPrimitives<BlockHeader = HeaderTy<Node::Types>, BlockBody = BodyTy<Node::Types>>,
Node: FullNodeTypes,
F: FnOnce(&BuilderContext<Node>) -> Fut + Send,
Fut: Future<Output = eyre::Result<Components<Node, Pool, EVM, Executor, Cons>>> + Send,
Fut: Future<Output = eyre::Result<Components<Node, N, Pool, EVM, Executor, Cons>>> + Send,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
+ Unpin
+ 'static,
@ -381,7 +404,7 @@ where
Executor: BlockExecutorProvider<Primitives = <Node::Types as NodeTypes>::Primitives>,
Cons: FullConsensus<<Node::Types as NodeTypes>::Primitives> + Clone + Unpin + 'static,
{
type Components = Components<Node, Pool, EVM, Executor, Cons>;
type Components = Components<Node, N, Pool, EVM, Executor, Cons>;
fn build_components(
self,

View File

@ -20,13 +20,14 @@ pub use execute::*;
pub use network::*;
pub use payload::*;
pub use pool::*;
use reth_network_p2p::BlockClient;
use crate::{ConfigureEvm, FullNodeTypes};
use reth_consensus::FullConsensus;
use reth_evm::execute::BlockExecutorProvider;
use reth_network::NetworkHandle;
use reth_network::{NetworkHandle, NetworkPrimitives};
use reth_network_api::FullNetwork;
use reth_node_api::{HeaderTy, NodeTypes, NodeTypesWithEngine, PayloadBuilder, TxTy};
use reth_node_api::{BodyTy, HeaderTy, NodeTypes, NodeTypesWithEngine, PayloadBuilder, TxTy};
use reth_payload_builder::PayloadBuilderHandle;
use reth_transaction_pool::{PoolTransaction, TransactionPool};
@ -49,7 +50,9 @@ pub trait NodeComponents<T: FullNodeTypes>: Clone + Unpin + Send + Sync + 'stati
type Consensus: FullConsensus<<T::Types as NodeTypes>::Primitives> + Clone + Unpin + 'static;
/// Network API.
type Network: FullNetwork;
type Network: FullNetwork<
Client: BlockClient<Header = HeaderTy<T::Types>, Body = BodyTy<T::Types>>,
>;
/// Builds new blocks.
type PayloadBuilder: PayloadBuilder<PayloadType = <T::Types as NodeTypesWithEngine>::Engine>
@ -78,7 +81,7 @@ pub trait NodeComponents<T: FullNodeTypes>: Clone + Unpin + Send + Sync + 'stati
///
/// This provides access to all the components of the node.
#[derive(Debug)]
pub struct Components<Node: FullNodeTypes, Pool, EVM, Executor, Consensus> {
pub struct Components<Node: FullNodeTypes, N: NetworkPrimitives, Pool, EVM, Executor, Consensus> {
/// The transaction pool of the node.
pub transaction_pool: Pool,
/// The node's EVM configuration, defining settings for the Ethereum Virtual Machine.
@ -88,14 +91,15 @@ pub struct Components<Node: FullNodeTypes, Pool, EVM, Executor, Consensus> {
/// The consensus implementation of the node.
pub consensus: Consensus,
/// The network implementation of the node.
pub network: NetworkHandle,
pub network: NetworkHandle<N>,
/// The handle to the payload builder service.
pub payload_builder: PayloadBuilderHandle<<Node::Types as NodeTypesWithEngine>::Engine>,
}
impl<Node, Pool, EVM, Executor, Cons> NodeComponents<Node>
for Components<Node, Pool, EVM, Executor, Cons>
impl<Node, Pool, EVM, Executor, Cons, N> NodeComponents<Node>
for Components<Node, N, Pool, EVM, Executor, Cons>
where
N: NetworkPrimitives<BlockHeader = HeaderTy<Node::Types>, BlockBody = BodyTy<Node::Types>>,
Node: FullNodeTypes,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
+ Unpin
@ -108,7 +112,7 @@ where
type Evm = EVM;
type Executor = Executor;
type Consensus = Cons;
type Network = NetworkHandle;
type Network = NetworkHandle<N>;
type PayloadBuilder = PayloadBuilderHandle<<Node::Types as NodeTypesWithEngine>::Engine>;
fn pool(&self) -> &Self::Pool {
@ -136,8 +140,9 @@ where
}
}
impl<Node, Pool, EVM, Executor, Cons> Clone for Components<Node, Pool, EVM, Executor, Cons>
impl<Node, N, Pool, EVM, Executor, Cons> Clone for Components<Node, N, Pool, EVM, Executor, Cons>
where
N: NetworkPrimitives,
Node: FullNodeTypes,
Pool: TransactionPool,
EVM: ConfigureEvm<Header = HeaderTy<Node::Types>, Transaction = TxTy<Node::Types>>,

View File

@ -2,33 +2,39 @@
use std::future::Future;
use reth_network::NetworkHandle;
use reth_network::{NetworkHandle, NetworkPrimitives};
use reth_transaction_pool::TransactionPool;
use crate::{BuilderContext, FullNodeTypes};
/// A type that knows how to build the network implementation.
pub trait NetworkBuilder<Node: FullNodeTypes, Pool: TransactionPool>: Send {
/// The primitive types to use for the network.
type Primitives: NetworkPrimitives;
/// Launches the network implementation and returns the handle to it.
fn build_network(
self,
ctx: &BuilderContext<Node>,
pool: Pool,
) -> impl Future<Output = eyre::Result<NetworkHandle>> + Send;
) -> impl Future<Output = eyre::Result<NetworkHandle<Self::Primitives>>> + Send;
}
impl<Node, F, Fut, Pool> NetworkBuilder<Node, Pool> for F
impl<Node, P, F, Fut, Pool> NetworkBuilder<Node, Pool> for F
where
Node: FullNodeTypes,
P: NetworkPrimitives,
Pool: TransactionPool,
F: Fn(&BuilderContext<Node>, Pool) -> Fut + Send,
Fut: Future<Output = eyre::Result<NetworkHandle>> + Send,
Fut: Future<Output = eyre::Result<NetworkHandle<P>>> + Send,
{
type Primitives = P;
fn build_network(
self,
ctx: &BuilderContext<Node>,
pool: Pool,
) -> impl Future<Output = eyre::Result<NetworkHandle>> + Send {
) -> impl Future<Output = eyre::Result<NetworkHandle<P>>> + Send {
self(ctx, pool)
}
}

View File

@ -29,6 +29,7 @@ use reth_node_core::{
args::InvalidBlockHookType,
dirs::{ChainPath, DataDirPath},
node_config::NodeConfig,
primitives::BlockHeader,
version::{
BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP, VERGEN_CARGO_FEATURES,
VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA,
@ -719,7 +720,7 @@ where
/// necessary
pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
where
C: HeadersClient<Header = alloy_consensus::Header>,
C: HeadersClient<Header: BlockHeader>,
{
self.node_config().max_block(client, self.provider_factory().clone()).await
}

View File

@ -14,7 +14,7 @@ use reth_exex::ExExManagerHandle;
use reth_network_p2p::{
bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader, BlockClient,
};
use reth_node_api::{BodyTy, HeaderTy, NodePrimitives};
use reth_node_api::{BodyTy, HeaderTy};
use reth_provider::{providers::ProviderNodeTypes, ProviderFactory};
use reth_stages::{prelude::DefaultStages, stages::ExecutionStage, Pipeline, StageSet};
use reth_static_file::StaticFileProducer;
@ -41,7 +41,6 @@ where
N: ProviderNodeTypes,
Client: BlockClient<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
Executor: BlockExecutorProvider<Primitives = N::Primitives>,
N::Primitives: NodePrimitives<BlockHeader = reth_primitives::Header>,
{
// building network downloaders using the fetch client
let header_downloader = ReverseHeadersDownloaderBuilder::new(config.headers)
@ -89,7 +88,6 @@ where
H: HeaderDownloader<Header = HeaderTy<N>> + 'static,
B: BodyDownloader<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
Executor: BlockExecutorProvider<Primitives = N::Primitives>,
N::Primitives: NodePrimitives<BlockHeader = reth_primitives::Header>,
{
let mut builder = Pipeline::<N>::builder();

View File

@ -11,7 +11,7 @@ use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGenera
use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
use reth_db::transaction::{DbTx, DbTxMut};
use reth_evm::{execute::BasicBlockExecutorProvider, ConfigureEvm};
use reth_network::{NetworkConfig, NetworkHandle, NetworkManager, PeersInfo};
use reth_network::{EthNetworkPrimitives, NetworkConfig, NetworkHandle, NetworkManager, PeersInfo};
use reth_node_api::{AddOnsContext, EngineValidator, FullNodeComponents, NodeAddOns, TxTy};
use reth_node_builder::{
components::{
@ -656,6 +656,8 @@ where
> + Unpin
+ 'static,
{
type Primitives = EthNetworkPrimitives;
async fn build_network(
self,
ctx: &BuilderContext<Node>,

View File

@ -215,7 +215,7 @@ where
impl<P, H, B> OnlineStages<P, H, B>
where
P: HeaderSyncGapProvider + 'static,
H: HeaderDownloader<Header = alloy_consensus::Header> + 'static,
H: HeaderDownloader + 'static,
B: BodyDownloader + 'static,
{
/// Create a new builder using the given headers stage.
@ -236,7 +236,7 @@ where
provider: P,
tip: watch::Receiver<B256>,
header_downloader: H,
consensus: Arc<dyn Consensus>,
consensus: Arc<dyn Consensus<H::Header, B::Body>>,
stages_config: StageConfig,
) -> StageSetBuilder<Provider>
where
@ -258,7 +258,7 @@ where
impl<Provider, P, H, B> StageSet<Provider> for OnlineStages<P, H, B>
where
P: HeaderSyncGapProvider + 'static,
H: HeaderDownloader<Header = alloy_consensus::Header> + 'static,
H: HeaderDownloader + 'static,
B: BodyDownloader + 'static,
HeaderStage<P, H>: Stage<Provider>,
BodyStage<B>: Stage<Provider>,

View File

@ -1,5 +1,5 @@
use crate::stages::MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD;
use alloy_consensus::{BlockHeader, Header};
use alloy_consensus::{BlockHeader, Header, Sealable};
use alloy_eips::{eip1898::BlockWithParent, NumHash};
use alloy_primitives::BlockNumber;
use num_traits::Zero;
@ -194,10 +194,7 @@ where
unwind_to: Option<u64>,
) -> Result<(), StageError>
where
Provider: StaticFileProviderFactory
+ DBProvider
+ BlockReader
+ HeaderProvider<Header = reth_primitives::Header>,
Provider: StaticFileProviderFactory + DBProvider + BlockReader + HeaderProvider,
{
// If thre's any receipts pruning configured, receipts are written directly to database and
// inconsistencies are expected.
@ -267,7 +264,7 @@ where
impl<E, Provider> Stage<Provider> for ExecutionStage<E>
where
E: BlockExecutorProvider<Primitives: NodePrimitives<BlockHeader = alloy_consensus::Header>>,
E: BlockExecutorProvider,
Provider: DBProvider
+ BlockReader<
Block = <E::Primitives as NodePrimitives>::Block,

View File

@ -136,7 +136,7 @@ where
Provider: DBProvider<Tx: DbTxMut>
+ TrieWriter
+ StatsReader
+ HeaderProvider<Header = alloy_consensus::Header>
+ HeaderProvider
+ StageCheckpointReader
+ StageCheckpointWriter,
{
@ -344,18 +344,18 @@ where
/// Check that the computed state root matches the root in the expected header.
#[inline]
fn validate_state_root(
fn validate_state_root<H: BlockHeader + Debug>(
got: B256,
expected: SealedHeader,
expected: SealedHeader<H>,
target_block: BlockNumber,
) -> Result<(), StageError> {
if got == expected.state_root {
if got == expected.state_root() {
Ok(())
} else {
error!(target: "sync::stages::merkle", ?target_block, ?got, ?expected, "Failed to verify block state root! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
Err(StageError::Block {
error: BlockErrorKind::Validation(ConsensusError::BodyStateRootDiff(
GotExpected { got, expected: expected.state_root }.into(),
GotExpected { got, expected: expected.state_root() }.into(),
)),
block: Box::new(expected.block_with_parent()),
})

View File

@ -59,7 +59,7 @@ impl Default for SenderRecoveryStage {
impl<Provider> Stage<Provider> for SenderRecoveryStage
where
Provider: DBProvider<Tx: DbTxMut>
+ BlockReader<Header = reth_primitives::Header>
+ BlockReader
+ StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
+ StatsReader
+ PruneCheckpointReader,
@ -146,8 +146,7 @@ fn recover_range<Provider, CURSOR>(
senders_cursor: &mut CURSOR,
) -> Result<(), StageError>
where
Provider:
DBProvider + HeaderProvider<Header = reth_primitives::Header> + StaticFileProviderFactory,
Provider: DBProvider + HeaderProvider + StaticFileProviderFactory,
CURSOR: DbCursorRW<tables::TransactionSenders>,
{
debug!(target: "sync::stages::sender_recovery", ?tx_range, "Sending batch for processing");