mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat: make more network components generic over primitives (#12481)
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -7349,6 +7349,7 @@ dependencies = [
|
||||
"reth-chainspec",
|
||||
"reth-codecs-derive",
|
||||
"reth-primitives",
|
||||
"reth-primitives-traits",
|
||||
"serde",
|
||||
"thiserror",
|
||||
]
|
||||
@ -7784,6 +7785,7 @@ dependencies = [
|
||||
"reth-network-peers",
|
||||
"reth-network-types",
|
||||
"reth-primitives",
|
||||
"reth-primitives-traits",
|
||||
"reth-provider",
|
||||
"reth-storage-api",
|
||||
"reth-tasks",
|
||||
|
||||
@ -16,9 +16,9 @@ workspace = true
|
||||
reth-chainspec.workspace = true
|
||||
reth-codecs-derive.workspace = true
|
||||
reth-primitives.workspace = true
|
||||
reth-primitives-traits.workspace = true
|
||||
|
||||
# ethereum
|
||||
alloy-consensus.workspace = true
|
||||
alloy-chains = { workspace = true, features = ["rlp"] }
|
||||
alloy-eips.workspace = true
|
||||
alloy-primitives.workspace = true
|
||||
@ -54,7 +54,8 @@ arbitrary = [
|
||||
"reth-chainspec/arbitrary",
|
||||
"alloy-consensus/arbitrary",
|
||||
"alloy-eips/arbitrary",
|
||||
"alloy-primitives/arbitrary"
|
||||
"alloy-primitives/arbitrary",
|
||||
"reth-primitives-traits/arbitrary",
|
||||
]
|
||||
serde = [
|
||||
"dep:serde",
|
||||
|
||||
@ -2,8 +2,8 @@
|
||||
|
||||
use std::fmt::Debug;
|
||||
|
||||
use alloy_consensus::BlockHeader;
|
||||
use alloy_rlp::{Decodable, Encodable};
|
||||
use reth_primitives_traits::{Block, BlockHeader};
|
||||
|
||||
/// Abstraction over primitive types which might appear in network messages. See
|
||||
/// [`crate::EthMessage`] for more context.
|
||||
@ -34,7 +34,8 @@ pub trait NetworkPrimitives:
|
||||
+ Eq
|
||||
+ 'static;
|
||||
/// Full block type.
|
||||
type Block: Encodable
|
||||
type Block: Block<Header = Self::BlockHeader, Body = Self::BlockBody>
|
||||
+ Encodable
|
||||
+ Decodable
|
||||
+ Send
|
||||
+ Sync
|
||||
|
||||
@ -16,6 +16,7 @@ workspace = true
|
||||
reth-chainspec.workspace = true
|
||||
reth-fs-util.workspace = true
|
||||
reth-primitives = { workspace = true, features = ["secp256k1"] }
|
||||
reth-primitives-traits.workspace = true
|
||||
reth-net-banlist.workspace = true
|
||||
reth-network-api.workspace = true
|
||||
reth-network-p2p.workspace = true
|
||||
@ -130,7 +131,8 @@ test-utils = [
|
||||
"reth-discv4/test-utils",
|
||||
"reth-network/test-utils",
|
||||
"reth-network-p2p/test-utils",
|
||||
"reth-primitives/test-utils"
|
||||
"reth-primitives/test-utils",
|
||||
"reth-primitives-traits/test-utils",
|
||||
]
|
||||
|
||||
[[bench]]
|
||||
|
||||
@ -7,7 +7,7 @@ use reth_network_peers::PeerId;
|
||||
use crate::message::NewBlockMessage;
|
||||
|
||||
/// Abstraction over block import.
|
||||
pub trait BlockImport: std::fmt::Debug + Send + Sync {
|
||||
pub trait BlockImport<B = reth_primitives::Block>: std::fmt::Debug + Send + Sync {
|
||||
/// Invoked for a received `NewBlock` broadcast message from the peer.
|
||||
///
|
||||
/// > When a `NewBlock` announcement message is received from a peer, the client first verifies
|
||||
@ -15,35 +15,35 @@ pub trait BlockImport: std::fmt::Debug + Send + Sync {
|
||||
///
|
||||
/// This is supposed to start verification. The results are then expected to be returned via
|
||||
/// [`BlockImport::poll`].
|
||||
fn on_new_block(&mut self, peer_id: PeerId, incoming_block: NewBlockMessage);
|
||||
fn on_new_block(&mut self, peer_id: PeerId, incoming_block: NewBlockMessage<B>);
|
||||
|
||||
/// Returns the results of a [`BlockImport::on_new_block`]
|
||||
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<BlockImportOutcome>;
|
||||
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<BlockImportOutcome<B>>;
|
||||
}
|
||||
|
||||
/// Outcome of the [`BlockImport`]'s block handling.
|
||||
#[derive(Debug)]
|
||||
pub struct BlockImportOutcome {
|
||||
pub struct BlockImportOutcome<B = reth_primitives::Block> {
|
||||
/// Sender of the `NewBlock` message.
|
||||
pub peer: PeerId,
|
||||
/// The result after validating the block
|
||||
pub result: Result<BlockValidation, BlockImportError>,
|
||||
pub result: Result<BlockValidation<B>, BlockImportError>,
|
||||
}
|
||||
|
||||
/// Represents the successful validation of a received `NewBlock` message.
|
||||
#[derive(Debug)]
|
||||
pub enum BlockValidation {
|
||||
pub enum BlockValidation<B> {
|
||||
/// Basic Header validity check, after which the block should be relayed to peers via a
|
||||
/// `NewBlock` message
|
||||
ValidHeader {
|
||||
/// received block
|
||||
block: NewBlockMessage,
|
||||
block: NewBlockMessage<B>,
|
||||
},
|
||||
/// Successfully imported: state-root matches after execution. The block should be relayed via
|
||||
/// `NewBlockHashes`
|
||||
ValidBlock {
|
||||
/// validated block.
|
||||
block: NewBlockMessage,
|
||||
block: NewBlockMessage<B>,
|
||||
},
|
||||
}
|
||||
|
||||
@ -62,10 +62,10 @@ pub enum BlockImportError {
|
||||
#[non_exhaustive]
|
||||
pub struct ProofOfStakeBlockImport;
|
||||
|
||||
impl BlockImport for ProofOfStakeBlockImport {
|
||||
fn on_new_block(&mut self, _peer_id: PeerId, _incoming_block: NewBlockMessage) {}
|
||||
impl<B> BlockImport<B> for ProofOfStakeBlockImport {
|
||||
fn on_new_block(&mut self, _peer_id: PeerId, _incoming_block: NewBlockMessage<B>) {}
|
||||
|
||||
fn poll(&mut self, _cx: &mut Context<'_>) -> Poll<BlockImportOutcome> {
|
||||
fn poll(&mut self, _cx: &mut Context<'_>) -> Poll<BlockImportOutcome<B>> {
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
@ -8,6 +8,7 @@ use std::{
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
|
||||
use alloy_consensus::BlockHeader;
|
||||
use alloy_primitives::{Bytes, B256};
|
||||
use futures::FutureExt;
|
||||
use reth_eth_wire::{
|
||||
@ -23,30 +24,30 @@ use tokio::sync::oneshot;
|
||||
|
||||
/// Internal form of a `NewBlock` message
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct NewBlockMessage {
|
||||
pub struct NewBlockMessage<B = reth_primitives::Block> {
|
||||
/// Hash of the block
|
||||
pub hash: B256,
|
||||
/// Raw received message
|
||||
pub block: Arc<NewBlock>,
|
||||
pub block: Arc<NewBlock<B>>,
|
||||
}
|
||||
|
||||
// === impl NewBlockMessage ===
|
||||
|
||||
impl NewBlockMessage {
|
||||
impl<B: reth_primitives_traits::Block> NewBlockMessage<B> {
|
||||
/// Returns the block number of the block
|
||||
pub fn number(&self) -> u64 {
|
||||
self.block.block.header.number
|
||||
self.block.block.header().number()
|
||||
}
|
||||
}
|
||||
|
||||
/// All Bi-directional eth-message variants that can be sent to a session or received from a
|
||||
/// session.
|
||||
#[derive(Debug)]
|
||||
pub enum PeerMessage {
|
||||
pub enum PeerMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
/// Announce new block hashes
|
||||
NewBlockHashes(NewBlockHashes),
|
||||
/// Broadcast new block.
|
||||
NewBlock(NewBlockMessage),
|
||||
NewBlock(NewBlockMessage<N::Block>),
|
||||
/// Received transactions _from_ the peer
|
||||
ReceivedTransaction(Transactions),
|
||||
/// Broadcast transactions _from_ local _to_ a peer.
|
||||
@ -54,7 +55,7 @@ pub enum PeerMessage {
|
||||
/// Send new pooled transactions
|
||||
PooledTransactions(NewPooledTransactionHashes),
|
||||
/// All `eth` request variants.
|
||||
EthRequest(PeerRequest),
|
||||
EthRequest(PeerRequest<N>),
|
||||
/// Other than eth namespace message
|
||||
Other(RawCapabilityMessage),
|
||||
}
|
||||
|
||||
@ -11,18 +11,20 @@ use std::{
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use alloy_primitives::Sealable;
|
||||
use futures::{stream::Fuse, SinkExt, StreamExt};
|
||||
use metrics::Gauge;
|
||||
use reth_eth_wire::{
|
||||
errors::{EthHandshakeError, EthStreamError, P2PStreamError},
|
||||
message::{EthBroadcastMessage, RequestPair},
|
||||
Capabilities, DisconnectP2P, DisconnectReason, EthMessage,
|
||||
Capabilities, DisconnectP2P, DisconnectReason, EthMessage, NetworkPrimitives,
|
||||
};
|
||||
use reth_metrics::common::mpsc::MeteredPollSender;
|
||||
use reth_network_api::PeerRequest;
|
||||
use reth_network_p2p::error::RequestError;
|
||||
use reth_network_peers::PeerId;
|
||||
use reth_network_types::session::config::INITIAL_REQUEST_TIMEOUT;
|
||||
use reth_primitives_traits::Block;
|
||||
use rustc_hash::FxHashMap;
|
||||
use tokio::{
|
||||
sync::{mpsc::error::TrySendError, oneshot},
|
||||
@ -62,11 +64,11 @@ const TIMEOUT_SCALING: u32 = 3;
|
||||
/// - incoming requests/broadcasts _from remote_ via the connection
|
||||
/// - responses for handled ETH requests received from the remote peer.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) struct ActiveSession {
|
||||
pub(crate) struct ActiveSession<N: NetworkPrimitives> {
|
||||
/// Keeps track of request ids.
|
||||
pub(crate) next_id: u64,
|
||||
/// The underlying connection.
|
||||
pub(crate) conn: EthRlpxConnection,
|
||||
pub(crate) conn: EthRlpxConnection<N>,
|
||||
/// Identifier of the node we're connected to.
|
||||
pub(crate) remote_peer_id: PeerId,
|
||||
/// The address we're connected to.
|
||||
@ -76,19 +78,19 @@ pub(crate) struct ActiveSession {
|
||||
/// Internal identifier of this session
|
||||
pub(crate) session_id: SessionId,
|
||||
/// Incoming commands from the manager
|
||||
pub(crate) commands_rx: ReceiverStream<SessionCommand>,
|
||||
pub(crate) commands_rx: ReceiverStream<SessionCommand<N>>,
|
||||
/// Sink to send messages to the [`SessionManager`](super::SessionManager).
|
||||
pub(crate) to_session_manager: MeteredPollSender<ActiveSessionMessage>,
|
||||
pub(crate) to_session_manager: MeteredPollSender<ActiveSessionMessage<N>>,
|
||||
/// A message that needs to be delivered to the session manager
|
||||
pub(crate) pending_message_to_session: Option<ActiveSessionMessage>,
|
||||
pub(crate) pending_message_to_session: Option<ActiveSessionMessage<N>>,
|
||||
/// Incoming internal requests which are delegated to the remote peer.
|
||||
pub(crate) internal_request_tx: Fuse<ReceiverStream<PeerRequest>>,
|
||||
pub(crate) internal_request_tx: Fuse<ReceiverStream<PeerRequest<N>>>,
|
||||
/// All requests sent to the remote peer we're waiting on a response
|
||||
pub(crate) inflight_requests: FxHashMap<u64, InflightRequest>,
|
||||
pub(crate) inflight_requests: FxHashMap<u64, InflightRequest<PeerRequest<N>>>,
|
||||
/// All requests that were sent by the remote peer and we're waiting on an internal response
|
||||
pub(crate) received_requests_from_remote: Vec<ReceivedRequest>,
|
||||
pub(crate) received_requests_from_remote: Vec<ReceivedRequest<N>>,
|
||||
/// Buffered messages that should be handled and sent to the peer.
|
||||
pub(crate) queued_outgoing: QueuedOutgoingMessages,
|
||||
pub(crate) queued_outgoing: QueuedOutgoingMessages<N>,
|
||||
/// The maximum time we wait for a response from a peer.
|
||||
pub(crate) internal_request_timeout: Arc<AtomicU64>,
|
||||
/// Interval when to check for timed out requests.
|
||||
@ -97,10 +99,11 @@ pub(crate) struct ActiveSession {
|
||||
/// considered a protocol violation and the session will initiate a drop.
|
||||
pub(crate) protocol_breach_request_timeout: Duration,
|
||||
/// Used to reserve a slot to guarantee that the termination message is delivered
|
||||
pub(crate) terminate_message: Option<(PollSender<ActiveSessionMessage>, ActiveSessionMessage)>,
|
||||
pub(crate) terminate_message:
|
||||
Option<(PollSender<ActiveSessionMessage<N>>, ActiveSessionMessage<N>)>,
|
||||
}
|
||||
|
||||
impl ActiveSession {
|
||||
impl<N: NetworkPrimitives> ActiveSession<N> {
|
||||
/// Returns `true` if the session is currently in the process of disconnecting
|
||||
fn is_disconnecting(&self) -> bool {
|
||||
self.conn.inner().is_disconnecting()
|
||||
@ -122,7 +125,7 @@ impl ActiveSession {
|
||||
/// Handle a message read from the connection.
|
||||
///
|
||||
/// Returns an error if the message is considered to be in violation of the protocol.
|
||||
fn on_incoming_message(&mut self, msg: EthMessage) -> OnIncomingMessageOutcome {
|
||||
fn on_incoming_message(&mut self, msg: EthMessage<N>) -> OnIncomingMessageOutcome<N> {
|
||||
/// A macro that handles an incoming request
|
||||
/// This creates a new channel and tries to send the sender half to the session while
|
||||
/// storing the receiver half internally so the pending response can be polled.
|
||||
@ -182,7 +185,7 @@ impl ActiveSession {
|
||||
}
|
||||
EthMessage::NewBlock(msg) => {
|
||||
let block =
|
||||
NewBlockMessage { hash: msg.block.header.hash_slow(), block: Arc::new(*msg) };
|
||||
NewBlockMessage { hash: msg.block.header().hash_slow(), block: Arc::new(*msg) };
|
||||
self.try_emit_broadcast(PeerMessage::NewBlock(block)).into()
|
||||
}
|
||||
EthMessage::Transactions(msg) => {
|
||||
@ -238,7 +241,7 @@ impl ActiveSession {
|
||||
}
|
||||
|
||||
/// Handle an internal peer request that will be sent to the remote.
|
||||
fn on_internal_peer_request(&mut self, request: PeerRequest, deadline: Instant) {
|
||||
fn on_internal_peer_request(&mut self, request: PeerRequest<N>, deadline: Instant) {
|
||||
let request_id = self.next_id();
|
||||
let msg = request.create_request_message(request_id);
|
||||
self.queued_outgoing.push_back(msg.into());
|
||||
@ -251,7 +254,7 @@ impl ActiveSession {
|
||||
}
|
||||
|
||||
/// Handle a message received from the internal network
|
||||
fn on_internal_peer_message(&mut self, msg: PeerMessage) {
|
||||
fn on_internal_peer_message(&mut self, msg: PeerMessage<N>) {
|
||||
match msg {
|
||||
PeerMessage::NewBlockHashes(msg) => {
|
||||
self.queued_outgoing.push_back(EthMessage::NewBlockHashes(msg).into());
|
||||
@ -289,7 +292,7 @@ impl ActiveSession {
|
||||
/// Handle a Response to the peer
|
||||
///
|
||||
/// This will queue the response to be sent to the peer
|
||||
fn handle_outgoing_response(&mut self, id: u64, resp: PeerResponseResult) {
|
||||
fn handle_outgoing_response(&mut self, id: u64, resp: PeerResponseResult<N>) {
|
||||
match resp.try_into_message(id) {
|
||||
Ok(msg) => {
|
||||
self.queued_outgoing.push_back(msg.into());
|
||||
@ -304,7 +307,7 @@ impl ActiveSession {
|
||||
///
|
||||
/// Returns the message if the bounded channel is currently unable to handle this message.
|
||||
#[allow(clippy::result_large_err)]
|
||||
fn try_emit_broadcast(&self, message: PeerMessage) -> Result<(), ActiveSessionMessage> {
|
||||
fn try_emit_broadcast(&self, message: PeerMessage<N>) -> Result<(), ActiveSessionMessage<N>> {
|
||||
let Some(sender) = self.to_session_manager.inner().get_ref() else { return Ok(()) };
|
||||
|
||||
match sender
|
||||
@ -330,7 +333,7 @@ impl ActiveSession {
|
||||
///
|
||||
/// Returns the message if the bounded channel is currently unable to handle this message.
|
||||
#[allow(clippy::result_large_err)]
|
||||
fn try_emit_request(&self, message: PeerMessage) -> Result<(), ActiveSessionMessage> {
|
||||
fn try_emit_request(&self, message: PeerMessage<N>) -> Result<(), ActiveSessionMessage<N>> {
|
||||
let Some(sender) = self.to_session_manager.inner().get_ref() else { return Ok(()) };
|
||||
|
||||
match sender
|
||||
@ -470,7 +473,7 @@ impl ActiveSession {
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for ActiveSession {
|
||||
impl<N: NetworkPrimitives> Future for ActiveSession<N> {
|
||||
type Output = ();
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
@ -656,20 +659,20 @@ impl Future for ActiveSession {
|
||||
}
|
||||
|
||||
/// Tracks a request received from the peer
|
||||
pub(crate) struct ReceivedRequest {
|
||||
pub(crate) struct ReceivedRequest<N: NetworkPrimitives> {
|
||||
/// Protocol Identifier
|
||||
request_id: u64,
|
||||
/// Receiver half of the channel that's supposed to receive the proper response.
|
||||
rx: PeerResponse,
|
||||
rx: PeerResponse<N>,
|
||||
/// Timestamp when we read this msg from the wire.
|
||||
#[allow(dead_code)]
|
||||
received: Instant,
|
||||
}
|
||||
|
||||
/// A request that waits for a response from the peer
|
||||
pub(crate) struct InflightRequest {
|
||||
pub(crate) struct InflightRequest<R> {
|
||||
/// Request we sent to peer and the internal response channel
|
||||
request: RequestState,
|
||||
request: RequestState<R>,
|
||||
/// Instant when the request was sent
|
||||
timestamp: Instant,
|
||||
/// Time limit for the response
|
||||
@ -678,7 +681,7 @@ pub(crate) struct InflightRequest {
|
||||
|
||||
// === impl InflightRequest ===
|
||||
|
||||
impl InflightRequest {
|
||||
impl<N: NetworkPrimitives> InflightRequest<PeerRequest<N>> {
|
||||
/// Returns true if the request is timedout
|
||||
#[inline]
|
||||
fn is_timed_out(&self, now: Instant) -> bool {
|
||||
@ -703,17 +706,19 @@ impl InflightRequest {
|
||||
}
|
||||
|
||||
/// All outcome variants when handling an incoming message
|
||||
enum OnIncomingMessageOutcome {
|
||||
enum OnIncomingMessageOutcome<N: NetworkPrimitives> {
|
||||
/// Message successfully handled.
|
||||
Ok,
|
||||
/// Message is considered to be in violation of the protocol
|
||||
BadMessage { error: EthStreamError, message: EthMessage },
|
||||
BadMessage { error: EthStreamError, message: EthMessage<N> },
|
||||
/// Currently no capacity to handle the message
|
||||
NoCapacity(ActiveSessionMessage),
|
||||
NoCapacity(ActiveSessionMessage<N>),
|
||||
}
|
||||
|
||||
impl From<Result<(), ActiveSessionMessage>> for OnIncomingMessageOutcome {
|
||||
fn from(res: Result<(), ActiveSessionMessage>) -> Self {
|
||||
impl<N: NetworkPrimitives> From<Result<(), ActiveSessionMessage<N>>>
|
||||
for OnIncomingMessageOutcome<N>
|
||||
{
|
||||
fn from(res: Result<(), ActiveSessionMessage<N>>) -> Self {
|
||||
match res {
|
||||
Ok(_) => Self::Ok,
|
||||
Err(msg) => Self::NoCapacity(msg),
|
||||
@ -721,29 +726,29 @@ impl From<Result<(), ActiveSessionMessage>> for OnIncomingMessageOutcome {
|
||||
}
|
||||
}
|
||||
|
||||
enum RequestState {
|
||||
enum RequestState<R> {
|
||||
/// Waiting for the response
|
||||
Waiting(PeerRequest),
|
||||
Waiting(R),
|
||||
/// Request already timed out
|
||||
TimedOut,
|
||||
}
|
||||
|
||||
/// Outgoing messages that can be sent over the wire.
|
||||
pub(crate) enum OutgoingMessage {
|
||||
pub(crate) enum OutgoingMessage<N: NetworkPrimitives> {
|
||||
/// A message that is owned.
|
||||
Eth(EthMessage),
|
||||
Eth(EthMessage<N>),
|
||||
/// A message that may be shared by multiple sessions.
|
||||
Broadcast(EthBroadcastMessage),
|
||||
Broadcast(EthBroadcastMessage<N>),
|
||||
}
|
||||
|
||||
impl From<EthMessage> for OutgoingMessage {
|
||||
fn from(value: EthMessage) -> Self {
|
||||
impl<N: NetworkPrimitives> From<EthMessage<N>> for OutgoingMessage<N> {
|
||||
fn from(value: EthMessage<N>) -> Self {
|
||||
Self::Eth(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<EthBroadcastMessage> for OutgoingMessage {
|
||||
fn from(value: EthBroadcastMessage) -> Self {
|
||||
impl<N: NetworkPrimitives> From<EthBroadcastMessage<N>> for OutgoingMessage<N> {
|
||||
fn from(value: EthBroadcastMessage<N>) -> Self {
|
||||
Self::Broadcast(value)
|
||||
}
|
||||
}
|
||||
@ -760,22 +765,22 @@ fn calculate_new_timeout(current_timeout: Duration, estimated_rtt: Duration) ->
|
||||
}
|
||||
|
||||
/// A helper struct that wraps the queue of outgoing messages and a metric to track their count
|
||||
pub(crate) struct QueuedOutgoingMessages {
|
||||
messages: VecDeque<OutgoingMessage>,
|
||||
pub(crate) struct QueuedOutgoingMessages<N: NetworkPrimitives> {
|
||||
messages: VecDeque<OutgoingMessage<N>>,
|
||||
count: Gauge,
|
||||
}
|
||||
|
||||
impl QueuedOutgoingMessages {
|
||||
impl<N: NetworkPrimitives> QueuedOutgoingMessages<N> {
|
||||
pub(crate) const fn new(metric: Gauge) -> Self {
|
||||
Self { messages: VecDeque::new(), count: metric }
|
||||
}
|
||||
|
||||
pub(crate) fn push_back(&mut self, message: OutgoingMessage) {
|
||||
pub(crate) fn push_back(&mut self, message: OutgoingMessage<N>) {
|
||||
self.messages.push_back(message);
|
||||
self.count.increment(1);
|
||||
}
|
||||
|
||||
pub(crate) fn pop_front(&mut self) -> Option<OutgoingMessage> {
|
||||
pub(crate) fn pop_front(&mut self) -> Option<OutgoingMessage<N>> {
|
||||
self.messages.pop_front().inspect(|_| self.count.decrement(1))
|
||||
}
|
||||
|
||||
@ -791,8 +796,8 @@ mod tests {
|
||||
use reth_chainspec::MAINNET;
|
||||
use reth_ecies::stream::ECIESStream;
|
||||
use reth_eth_wire::{
|
||||
EthStream, GetBlockBodies, HelloMessageWithProtocols, P2PStream, Status, StatusBuilder,
|
||||
UnauthedEthStream, UnauthedP2PStream,
|
||||
EthNetworkPrimitives, EthStream, GetBlockBodies, HelloMessageWithProtocols, P2PStream,
|
||||
Status, StatusBuilder, UnauthedEthStream, UnauthedP2PStream,
|
||||
};
|
||||
use reth_network_peers::pk2id;
|
||||
use reth_network_types::session::config::PROTOCOL_BREACH_REQUEST_TIMEOUT;
|
||||
@ -808,11 +813,11 @@ mod tests {
|
||||
HelloMessageWithProtocols::builder(pk2id(&server_key.public_key(SECP256K1))).build()
|
||||
}
|
||||
|
||||
struct SessionBuilder {
|
||||
struct SessionBuilder<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
_remote_capabilities: Arc<Capabilities>,
|
||||
active_session_tx: mpsc::Sender<ActiveSessionMessage>,
|
||||
active_session_rx: ReceiverStream<ActiveSessionMessage>,
|
||||
to_sessions: Vec<mpsc::Sender<SessionCommand>>,
|
||||
active_session_tx: mpsc::Sender<ActiveSessionMessage<N>>,
|
||||
active_session_rx: ReceiverStream<ActiveSessionMessage<N>>,
|
||||
to_sessions: Vec<mpsc::Sender<SessionCommand<N>>>,
|
||||
secret_key: SecretKey,
|
||||
local_peer_id: PeerId,
|
||||
hello: HelloMessageWithProtocols,
|
||||
@ -821,7 +826,7 @@ mod tests {
|
||||
next_id: usize,
|
||||
}
|
||||
|
||||
impl SessionBuilder {
|
||||
impl<N: NetworkPrimitives> SessionBuilder<N> {
|
||||
fn next_id(&mut self) -> SessionId {
|
||||
let id = self.next_id;
|
||||
self.next_id += 1;
|
||||
@ -858,7 +863,7 @@ mod tests {
|
||||
})
|
||||
}
|
||||
|
||||
async fn connect_incoming(&mut self, stream: TcpStream) -> ActiveSession {
|
||||
async fn connect_incoming(&mut self, stream: TcpStream) -> ActiveSession<N> {
|
||||
let remote_addr = stream.local_addr().unwrap();
|
||||
let session_id = self.next_id();
|
||||
let (_disconnect_tx, disconnect_rx) = oneshot::channel();
|
||||
|
||||
@ -11,16 +11,16 @@ use reth_eth_wire::{
|
||||
errors::EthStreamError,
|
||||
message::EthBroadcastMessage,
|
||||
multiplex::{ProtocolProxy, RlpxSatelliteStream},
|
||||
EthMessage, EthStream, EthVersion, P2PStream,
|
||||
EthMessage, EthNetworkPrimitives, EthStream, EthVersion, NetworkPrimitives, P2PStream,
|
||||
};
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
/// The type of the underlying peer network connection.
|
||||
pub type EthPeerConnection = EthStream<P2PStream<ECIESStream<TcpStream>>>;
|
||||
pub type EthPeerConnection<N> = EthStream<P2PStream<ECIESStream<TcpStream>>, N>;
|
||||
|
||||
/// Various connection types that at least support the ETH protocol.
|
||||
pub type EthSatelliteConnection =
|
||||
RlpxSatelliteStream<ECIESStream<TcpStream>, EthStream<ProtocolProxy>>;
|
||||
pub type EthSatelliteConnection<N = EthNetworkPrimitives> =
|
||||
RlpxSatelliteStream<ECIESStream<TcpStream>, EthStream<ProtocolProxy, N>>;
|
||||
|
||||
/// Connection types that support the ETH protocol.
|
||||
///
|
||||
@ -30,14 +30,14 @@ pub type EthSatelliteConnection =
|
||||
// This type is boxed because the underlying stream is ~6KB,
|
||||
// mostly coming from `P2PStream`'s `snap::Encoder` (2072), and `ECIESStream` (3600).
|
||||
#[derive(Debug)]
|
||||
pub enum EthRlpxConnection {
|
||||
pub enum EthRlpxConnection<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
/// A connection that only supports the ETH protocol.
|
||||
EthOnly(Box<EthPeerConnection>),
|
||||
EthOnly(Box<EthPeerConnection<N>>),
|
||||
/// A connection that supports the ETH protocol and __at least one other__ `RLPx` protocol.
|
||||
Satellite(Box<EthSatelliteConnection>),
|
||||
Satellite(Box<EthSatelliteConnection<N>>),
|
||||
}
|
||||
|
||||
impl EthRlpxConnection {
|
||||
impl<N: NetworkPrimitives> EthRlpxConnection<N> {
|
||||
/// Returns the negotiated ETH version.
|
||||
#[inline]
|
||||
pub(crate) const fn version(&self) -> EthVersion {
|
||||
@ -78,7 +78,7 @@ impl EthRlpxConnection {
|
||||
#[inline]
|
||||
pub fn start_send_broadcast(
|
||||
&mut self,
|
||||
item: EthBroadcastMessage,
|
||||
item: EthBroadcastMessage<N>,
|
||||
) -> Result<(), EthStreamError> {
|
||||
match self {
|
||||
Self::EthOnly(conn) => conn.start_send_broadcast(item),
|
||||
@ -87,16 +87,16 @@ impl EthRlpxConnection {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<EthPeerConnection> for EthRlpxConnection {
|
||||
impl<N: NetworkPrimitives> From<EthPeerConnection<N>> for EthRlpxConnection<N> {
|
||||
#[inline]
|
||||
fn from(conn: EthPeerConnection) -> Self {
|
||||
fn from(conn: EthPeerConnection<N>) -> Self {
|
||||
Self::EthOnly(Box::new(conn))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<EthSatelliteConnection> for EthRlpxConnection {
|
||||
impl<N: NetworkPrimitives> From<EthSatelliteConnection<N>> for EthRlpxConnection<N> {
|
||||
#[inline]
|
||||
fn from(conn: EthSatelliteConnection) -> Self {
|
||||
fn from(conn: EthSatelliteConnection<N>) -> Self {
|
||||
Self::Satellite(Box::new(conn))
|
||||
}
|
||||
}
|
||||
@ -112,22 +112,22 @@ macro_rules! delegate_call {
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for EthRlpxConnection {
|
||||
type Item = Result<EthMessage, EthStreamError>;
|
||||
impl<N: NetworkPrimitives> Stream for EthRlpxConnection<N> {
|
||||
type Item = Result<EthMessage<N>, EthStreamError>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
delegate_call!(self.poll_next(cx))
|
||||
}
|
||||
}
|
||||
|
||||
impl Sink<EthMessage> for EthRlpxConnection {
|
||||
impl<N: NetworkPrimitives> Sink<EthMessage<N>> for EthRlpxConnection<N> {
|
||||
type Error = EthStreamError;
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
delegate_call!(self.poll_ready(cx))
|
||||
}
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, item: EthMessage) -> Result<(), Self::Error> {
|
||||
fn start_send(self: Pin<&mut Self>, item: EthMessage<N>) -> Result<(), Self::Error> {
|
||||
delegate_call!(self.start_send(item))
|
||||
}
|
||||
|
||||
|
||||
@ -5,7 +5,7 @@ use std::{io, net::SocketAddr, sync::Arc, time::Instant};
|
||||
use reth_ecies::ECIESError;
|
||||
use reth_eth_wire::{
|
||||
capability::CapabilityMessage, errors::EthStreamError, Capabilities, DisconnectReason,
|
||||
EthVersion, Status,
|
||||
EthVersion, NetworkPrimitives, Status,
|
||||
};
|
||||
use reth_network_api::PeerInfo;
|
||||
use reth_network_peers::{NodeRecord, PeerId};
|
||||
@ -54,7 +54,7 @@ impl PendingSessionHandle {
|
||||
/// Within an active session that supports the `Ethereum Wire Protocol `, three high-level tasks can
|
||||
/// be performed: chain synchronization, block propagation and transaction exchange.
|
||||
#[derive(Debug)]
|
||||
pub struct ActiveSessionHandle {
|
||||
pub struct ActiveSessionHandle<N: NetworkPrimitives> {
|
||||
/// The direction of the session
|
||||
pub(crate) direction: Direction,
|
||||
/// The assigned id for this session
|
||||
@ -68,7 +68,7 @@ pub struct ActiveSessionHandle {
|
||||
/// Announced capabilities of the peer.
|
||||
pub(crate) capabilities: Arc<Capabilities>,
|
||||
/// Sender half of the command channel used send commands _to_ the spawned session
|
||||
pub(crate) commands_to_session: mpsc::Sender<SessionCommand>,
|
||||
pub(crate) commands_to_session: mpsc::Sender<SessionCommand<N>>,
|
||||
/// The client's name and version
|
||||
pub(crate) client_version: Arc<str>,
|
||||
/// The address we're connected to
|
||||
@ -81,7 +81,7 @@ pub struct ActiveSessionHandle {
|
||||
|
||||
// === impl ActiveSessionHandle ===
|
||||
|
||||
impl ActiveSessionHandle {
|
||||
impl<N: NetworkPrimitives> ActiveSessionHandle<N> {
|
||||
/// Sends a disconnect command to the session.
|
||||
pub fn disconnect(&self, reason: Option<DisconnectReason>) {
|
||||
// Note: we clone the sender which ensures the channel has capacity to send the message
|
||||
@ -93,7 +93,7 @@ impl ActiveSessionHandle {
|
||||
pub async fn try_disconnect(
|
||||
&self,
|
||||
reason: Option<DisconnectReason>,
|
||||
) -> Result<(), SendError<SessionCommand>> {
|
||||
) -> Result<(), SendError<SessionCommand<N>>> {
|
||||
self.commands_to_session.clone().send(SessionCommand::Disconnect { reason }).await
|
||||
}
|
||||
|
||||
@ -162,7 +162,7 @@ impl ActiveSessionHandle {
|
||||
///
|
||||
/// A session starts with a `Handshake`, followed by a `Hello` message which
|
||||
#[derive(Debug)]
|
||||
pub enum PendingSessionEvent {
|
||||
pub enum PendingSessionEvent<N: NetworkPrimitives> {
|
||||
/// Represents a successful `Hello` and `Status` exchange: <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md#hello-0x00>
|
||||
Established {
|
||||
/// An internal identifier for the established session
|
||||
@ -179,7 +179,7 @@ pub enum PendingSessionEvent {
|
||||
status: Arc<Status>,
|
||||
/// The actual connection stream which can be used to send and receive `eth` protocol
|
||||
/// messages
|
||||
conn: EthRlpxConnection,
|
||||
conn: EthRlpxConnection<N>,
|
||||
/// The direction of the session, either `Inbound` or `Outgoing`
|
||||
direction: Direction,
|
||||
/// The remote node's user agent, usually containing the client name and version
|
||||
@ -222,20 +222,20 @@ pub enum PendingSessionEvent {
|
||||
|
||||
/// Commands that can be sent to the spawned session.
|
||||
#[derive(Debug)]
|
||||
pub enum SessionCommand {
|
||||
pub enum SessionCommand<N: NetworkPrimitives> {
|
||||
/// Disconnect the connection
|
||||
Disconnect {
|
||||
/// Why the disconnect was initiated
|
||||
reason: Option<DisconnectReason>,
|
||||
},
|
||||
/// Sends a message to the peer
|
||||
Message(PeerMessage),
|
||||
Message(PeerMessage<N>),
|
||||
}
|
||||
|
||||
/// Message variants an active session can produce and send back to the
|
||||
/// [`SessionManager`](crate::session::SessionManager)
|
||||
#[derive(Debug)]
|
||||
pub enum ActiveSessionMessage {
|
||||
pub enum ActiveSessionMessage<N: NetworkPrimitives> {
|
||||
/// Session was gracefully disconnected.
|
||||
Disconnected {
|
||||
/// The remote node's public key
|
||||
@ -257,7 +257,7 @@ pub enum ActiveSessionMessage {
|
||||
/// Identifier of the remote peer.
|
||||
peer_id: PeerId,
|
||||
/// Message received from the peer.
|
||||
message: PeerMessage,
|
||||
message: PeerMessage<N>,
|
||||
},
|
||||
/// Received a message that does not match the announced capabilities of the peer.
|
||||
InvalidMessage {
|
||||
|
||||
@ -28,11 +28,11 @@ use futures::{future::Either, io, FutureExt, StreamExt};
|
||||
use reth_ecies::{stream::ECIESStream, ECIESError};
|
||||
use reth_eth_wire::{
|
||||
capability::CapabilityMessage, errors::EthStreamError, multiplex::RlpxProtocolMultiplexer,
|
||||
Capabilities, DisconnectReason, EthVersion, HelloMessageWithProtocols, Status,
|
||||
UnauthedEthStream, UnauthedP2PStream,
|
||||
Capabilities, DisconnectReason, EthVersion, HelloMessageWithProtocols, NetworkPrimitives,
|
||||
Status, UnauthedEthStream, UnauthedP2PStream,
|
||||
};
|
||||
use reth_metrics::common::mpsc::MeteredPollSender;
|
||||
use reth_network_api::PeerRequestSender;
|
||||
use reth_network_api::{PeerRequest, PeerRequestSender};
|
||||
use reth_network_peers::PeerId;
|
||||
use reth_network_types::SessionsConfig;
|
||||
use reth_primitives::{ForkFilter, ForkId, ForkTransition, Head};
|
||||
@ -62,7 +62,7 @@ pub struct SessionId(usize);
|
||||
/// Manages a set of sessions.
|
||||
#[must_use = "Session Manager must be polled to process session events."]
|
||||
#[derive(Debug)]
|
||||
pub struct SessionManager {
|
||||
pub struct SessionManager<N: NetworkPrimitives> {
|
||||
/// Tracks the identifier for the next session.
|
||||
next_id: usize,
|
||||
/// Keeps track of all sessions
|
||||
@ -93,21 +93,21 @@ pub struct SessionManager {
|
||||
/// session is authenticated, it can be moved to the `active_session` set.
|
||||
pending_sessions: FxHashMap<SessionId, PendingSessionHandle>,
|
||||
/// All active sessions that are ready to exchange messages.
|
||||
active_sessions: HashMap<PeerId, ActiveSessionHandle>,
|
||||
active_sessions: HashMap<PeerId, ActiveSessionHandle<N>>,
|
||||
/// The original Sender half of the [`PendingSessionEvent`] channel.
|
||||
///
|
||||
/// When a new (pending) session is created, the corresponding [`PendingSessionHandle`] will
|
||||
/// get a clone of this sender half.
|
||||
pending_sessions_tx: mpsc::Sender<PendingSessionEvent>,
|
||||
pending_sessions_tx: mpsc::Sender<PendingSessionEvent<N>>,
|
||||
/// Receiver half that listens for [`PendingSessionEvent`] produced by pending sessions.
|
||||
pending_session_rx: ReceiverStream<PendingSessionEvent>,
|
||||
pending_session_rx: ReceiverStream<PendingSessionEvent<N>>,
|
||||
/// The original Sender half of the [`ActiveSessionMessage`] channel.
|
||||
///
|
||||
/// When active session state is reached, the corresponding [`ActiveSessionHandle`] will get a
|
||||
/// clone of this sender half.
|
||||
active_session_tx: MeteredPollSender<ActiveSessionMessage>,
|
||||
active_session_tx: MeteredPollSender<ActiveSessionMessage<N>>,
|
||||
/// Receiver half that listens for [`ActiveSessionMessage`] produced by pending sessions.
|
||||
active_session_rx: ReceiverStream<ActiveSessionMessage>,
|
||||
active_session_rx: ReceiverStream<ActiveSessionMessage<N>>,
|
||||
/// Additional `RLPx` sub-protocols to be used by the session manager.
|
||||
extra_protocols: RlpxSubProtocols,
|
||||
/// Tracks the ongoing graceful disconnections attempts for incoming connections.
|
||||
@ -118,7 +118,7 @@ pub struct SessionManager {
|
||||
|
||||
// === impl SessionManager ===
|
||||
|
||||
impl SessionManager {
|
||||
impl<N: NetworkPrimitives> SessionManager<N> {
|
||||
/// Creates a new empty [`SessionManager`].
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
@ -182,7 +182,7 @@ impl SessionManager {
|
||||
}
|
||||
|
||||
/// Returns a borrowed reference to the active sessions.
|
||||
pub const fn active_sessions(&self) -> &HashMap<PeerId, ActiveSessionHandle> {
|
||||
pub const fn active_sessions(&self) -> &HashMap<PeerId, ActiveSessionHandle<N>> {
|
||||
&self.active_sessions
|
||||
}
|
||||
|
||||
@ -348,7 +348,7 @@ impl SessionManager {
|
||||
}
|
||||
|
||||
/// Sends a message to the peer's session
|
||||
pub fn send_message(&mut self, peer_id: &PeerId, msg: PeerMessage) {
|
||||
pub fn send_message(&mut self, peer_id: &PeerId, msg: PeerMessage<N>) {
|
||||
if let Some(session) = self.active_sessions.get_mut(peer_id) {
|
||||
let _ = session.commands_to_session.try_send(SessionCommand::Message(msg)).inspect_err(
|
||||
|e| {
|
||||
@ -373,7 +373,7 @@ impl SessionManager {
|
||||
}
|
||||
|
||||
/// Removes the [`PendingSessionHandle`] if it exists.
|
||||
fn remove_active_session(&mut self, id: &PeerId) -> Option<ActiveSessionHandle> {
|
||||
fn remove_active_session(&mut self, id: &PeerId) -> Option<ActiveSessionHandle<N>> {
|
||||
let session = self.active_sessions.remove(id)?;
|
||||
self.counter.dec_active(&session.direction);
|
||||
Some(session)
|
||||
@ -411,7 +411,7 @@ impl SessionManager {
|
||||
/// This polls all the session handles and returns [`SessionEvent`].
|
||||
///
|
||||
/// Active sessions are prioritized.
|
||||
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<SessionEvent> {
|
||||
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<SessionEvent<N>> {
|
||||
// Poll events from active sessions
|
||||
match self.active_session_rx.poll_next_unpin(cx) {
|
||||
Poll::Pending => {}
|
||||
@ -663,7 +663,7 @@ impl DisconnectionsCounter {
|
||||
|
||||
/// Events produced by the [`SessionManager`]
|
||||
#[derive(Debug)]
|
||||
pub enum SessionEvent {
|
||||
pub enum SessionEvent<N: NetworkPrimitives> {
|
||||
/// A new session was successfully authenticated.
|
||||
///
|
||||
/// This session is now able to exchange data.
|
||||
@ -681,7 +681,7 @@ pub enum SessionEvent {
|
||||
/// The Status message the peer sent during the `eth` handshake
|
||||
status: Arc<Status>,
|
||||
/// The channel for sending messages to the peer with the session
|
||||
messages: PeerRequestSender,
|
||||
messages: PeerRequestSender<PeerRequest<N>>,
|
||||
/// The direction of the session, either `Inbound` or `Outgoing`
|
||||
direction: Direction,
|
||||
/// The maximum time that the session waits for a response from the peer before timing out
|
||||
@ -702,7 +702,7 @@ pub enum SessionEvent {
|
||||
/// The remote node's public key
|
||||
peer_id: PeerId,
|
||||
/// Message received from the peer.
|
||||
message: PeerMessage,
|
||||
message: PeerMessage<N>,
|
||||
},
|
||||
/// Received a message that does not match the announced capabilities of the peer.
|
||||
InvalidMessage {
|
||||
@ -797,12 +797,12 @@ impl PendingSessionHandshakeError {
|
||||
pub struct ExceedsSessionLimit(pub(crate) u32);
|
||||
|
||||
/// Starts a pending session authentication with a timeout.
|
||||
pub(crate) async fn pending_session_with_timeout<F>(
|
||||
pub(crate) async fn pending_session_with_timeout<F, N: NetworkPrimitives>(
|
||||
timeout: Duration,
|
||||
session_id: SessionId,
|
||||
remote_addr: SocketAddr,
|
||||
direction: Direction,
|
||||
events: mpsc::Sender<PendingSessionEvent>,
|
||||
events: mpsc::Sender<PendingSessionEvent<N>>,
|
||||
f: F,
|
||||
) where
|
||||
F: Future<Output = ()>,
|
||||
@ -823,11 +823,11 @@ pub(crate) async fn pending_session_with_timeout<F>(
|
||||
///
|
||||
/// This will wait for the _incoming_ handshake request and answer it.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn start_pending_incoming_session(
|
||||
pub(crate) async fn start_pending_incoming_session<N: NetworkPrimitives>(
|
||||
disconnect_rx: oneshot::Receiver<()>,
|
||||
session_id: SessionId,
|
||||
stream: TcpStream,
|
||||
events: mpsc::Sender<PendingSessionEvent>,
|
||||
events: mpsc::Sender<PendingSessionEvent<N>>,
|
||||
remote_addr: SocketAddr,
|
||||
secret_key: SecretKey,
|
||||
hello: HelloMessageWithProtocols,
|
||||
@ -854,9 +854,9 @@ pub(crate) async fn start_pending_incoming_session(
|
||||
/// Starts the authentication process for a connection initiated by a remote peer.
|
||||
#[instrument(skip_all, fields(%remote_addr, peer_id), target = "net")]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn start_pending_outbound_session(
|
||||
async fn start_pending_outbound_session<N: NetworkPrimitives>(
|
||||
disconnect_rx: oneshot::Receiver<()>,
|
||||
events: mpsc::Sender<PendingSessionEvent>,
|
||||
events: mpsc::Sender<PendingSessionEvent<N>>,
|
||||
session_id: SessionId,
|
||||
remote_addr: SocketAddr,
|
||||
remote_peer_id: PeerId,
|
||||
@ -903,9 +903,9 @@ async fn start_pending_outbound_session(
|
||||
|
||||
/// Authenticates a session
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn authenticate(
|
||||
async fn authenticate<N: NetworkPrimitives>(
|
||||
disconnect_rx: oneshot::Receiver<()>,
|
||||
events: mpsc::Sender<PendingSessionEvent>,
|
||||
events: mpsc::Sender<PendingSessionEvent<N>>,
|
||||
stream: TcpStream,
|
||||
session_id: SessionId,
|
||||
remote_addr: SocketAddr,
|
||||
@ -986,7 +986,7 @@ async fn get_ecies_stream<Io: AsyncRead + AsyncWrite + Unpin>(
|
||||
/// If additional [`RlpxSubProtocolHandlers`] are provided, the hello message will be updated to
|
||||
/// also negotiate the additional protocols.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn authenticate_stream(
|
||||
async fn authenticate_stream<N: NetworkPrimitives>(
|
||||
stream: UnauthedP2PStream<ECIESStream<TcpStream>>,
|
||||
session_id: SessionId,
|
||||
remote_addr: SocketAddr,
|
||||
@ -996,7 +996,7 @@ async fn authenticate_stream(
|
||||
mut status: Status,
|
||||
fork_filter: ForkFilter,
|
||||
mut extra_handlers: RlpxSubProtocolHandlers,
|
||||
) -> PendingSessionEvent {
|
||||
) -> PendingSessionEvent<N> {
|
||||
// Add extra protocols to the hello message
|
||||
extra_handlers.retain(|handler| hello.try_add_protocol(handler.protocol()).is_ok());
|
||||
|
||||
|
||||
@ -12,6 +12,7 @@ use std::{
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use alloy_consensus::BlockHeader;
|
||||
use alloy_primitives::B256;
|
||||
use rand::seq::SliceRandom;
|
||||
use reth_eth_wire::{
|
||||
@ -22,6 +23,7 @@ use reth_network_api::{DiscoveredEvent, DiscoveryEvent, PeerRequest, PeerRequest
|
||||
use reth_network_peers::PeerId;
|
||||
use reth_network_types::{PeerAddr, PeerKind};
|
||||
use reth_primitives::ForkId;
|
||||
use reth_primitives_traits::Block;
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::{debug, trace};
|
||||
|
||||
@ -78,7 +80,7 @@ pub struct NetworkState<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
/// Manages connections to peers.
|
||||
peers_manager: PeersManager,
|
||||
/// Buffered messages until polled.
|
||||
queued_messages: VecDeque<StateAction>,
|
||||
queued_messages: VecDeque<StateAction<N>>,
|
||||
/// The client type that can interact with the chain.
|
||||
///
|
||||
/// This type is used to fetch the block number after we established a session and received the
|
||||
@ -185,12 +187,12 @@ impl<N: NetworkPrimitives> NetworkState<N> {
|
||||
/// > the total number of peers) using the `NewBlock` message.
|
||||
///
|
||||
/// See also <https://github.com/ethereum/devp2p/blob/master/caps/eth.md>
|
||||
pub(crate) fn announce_new_block(&mut self, msg: NewBlockMessage) {
|
||||
pub(crate) fn announce_new_block(&mut self, msg: NewBlockMessage<N::Block>) {
|
||||
// send a `NewBlock` message to a fraction of the connected peers (square root of the total
|
||||
// number of peers)
|
||||
let num_propagate = (self.active_peers.len() as f64).sqrt() as u64 + 1;
|
||||
|
||||
let number = msg.block.block.header.number;
|
||||
let number = msg.block.block.header().number();
|
||||
let mut count = 0;
|
||||
|
||||
// Shuffle to propagate to a random sample of peers on every block announcement
|
||||
@ -227,8 +229,8 @@ impl<N: NetworkPrimitives> NetworkState<N> {
|
||||
|
||||
/// Completes the block propagation process started in [`NetworkState::announce_new_block()`]
|
||||
/// but sending `NewBlockHash` broadcast to all peers that haven't seen it yet.
|
||||
pub(crate) fn announce_new_block_hash(&mut self, msg: NewBlockMessage) {
|
||||
let number = msg.block.block.header.number;
|
||||
pub(crate) fn announce_new_block_hash(&mut self, msg: NewBlockMessage<N::Block>) {
|
||||
let number = msg.block.block.header().number();
|
||||
let hashes = NewBlockHashes(vec![BlockHashNumber { hash: msg.hash, number }]);
|
||||
for (peer_id, peer) in &mut self.active_peers {
|
||||
if peer.blocks.contains(&msg.hash) {
|
||||
@ -385,7 +387,10 @@ impl<N: NetworkPrimitives> NetworkState<N> {
|
||||
}
|
||||
|
||||
/// Handle the outcome of processed response, for example directly queue another request.
|
||||
fn on_block_response_outcome(&mut self, outcome: BlockResponseOutcome) -> Option<StateAction> {
|
||||
fn on_block_response_outcome(
|
||||
&mut self,
|
||||
outcome: BlockResponseOutcome,
|
||||
) -> Option<StateAction<N>> {
|
||||
match outcome {
|
||||
BlockResponseOutcome::Request(peer, request) => {
|
||||
self.handle_block_request(peer, request);
|
||||
@ -406,7 +411,7 @@ impl<N: NetworkPrimitives> NetworkState<N> {
|
||||
&mut self,
|
||||
peer: PeerId,
|
||||
resp: PeerResponseResult<N>,
|
||||
) -> Option<StateAction> {
|
||||
) -> Option<StateAction<N>> {
|
||||
match resp {
|
||||
PeerResponseResult::BlockHeaders(res) => {
|
||||
let outcome = self.state_fetcher.on_block_headers_response(peer, res)?;
|
||||
@ -421,7 +426,7 @@ impl<N: NetworkPrimitives> NetworkState<N> {
|
||||
}
|
||||
|
||||
/// Advances the state
|
||||
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<StateAction> {
|
||||
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<StateAction<N>> {
|
||||
loop {
|
||||
// drain buffered messages
|
||||
if let Some(message) = self.queued_messages.pop_front() {
|
||||
@ -515,13 +520,13 @@ pub(crate) struct ActivePeer<N: NetworkPrimitives> {
|
||||
|
||||
/// Message variants triggered by the [`NetworkState`]
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum StateAction {
|
||||
pub(crate) enum StateAction<N: NetworkPrimitives> {
|
||||
/// Dispatch a `NewBlock` message to the peer
|
||||
NewBlock {
|
||||
/// Target of the message
|
||||
peer_id: PeerId,
|
||||
/// The `NewBlock` message
|
||||
block: NewBlockMessage,
|
||||
block: NewBlockMessage<N::Block>,
|
||||
},
|
||||
NewBlockHashes {
|
||||
/// Target of the message
|
||||
|
||||
@ -9,9 +9,9 @@ use std::{
|
||||
use futures::Stream;
|
||||
use reth_eth_wire::{
|
||||
capability::CapabilityMessage, errors::EthStreamError, Capabilities, DisconnectReason,
|
||||
EthVersion, Status,
|
||||
EthNetworkPrimitives, EthVersion, NetworkPrimitives, Status,
|
||||
};
|
||||
use reth_network_api::PeerRequestSender;
|
||||
use reth_network_api::{PeerRequest, PeerRequestSender};
|
||||
use reth_network_peers::PeerId;
|
||||
use tracing::trace;
|
||||
|
||||
@ -50,23 +50,23 @@ use crate::{
|
||||
/// `include_mmd!("docs/mermaid/swarm.mmd`")
|
||||
#[derive(Debug)]
|
||||
#[must_use = "Swarm does nothing unless polled"]
|
||||
pub(crate) struct Swarm {
|
||||
pub(crate) struct Swarm<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
/// Listens for new incoming connections.
|
||||
incoming: ConnectionListener,
|
||||
/// All sessions.
|
||||
sessions: SessionManager,
|
||||
sessions: SessionManager<N>,
|
||||
/// Tracks the entire state of the network and handles events received from the sessions.
|
||||
state: NetworkState,
|
||||
state: NetworkState<N>,
|
||||
}
|
||||
|
||||
// === impl Swarm ===
|
||||
|
||||
impl Swarm {
|
||||
impl<N: NetworkPrimitives> Swarm<N> {
|
||||
/// Configures a new swarm instance.
|
||||
pub(crate) const fn new(
|
||||
incoming: ConnectionListener,
|
||||
sessions: SessionManager,
|
||||
state: NetworkState,
|
||||
sessions: SessionManager<N>,
|
||||
state: NetworkState<N>,
|
||||
) -> Self {
|
||||
Self { incoming, sessions, state }
|
||||
}
|
||||
@ -77,12 +77,12 @@ impl Swarm {
|
||||
}
|
||||
|
||||
/// Access to the state.
|
||||
pub(crate) const fn state(&self) -> &NetworkState {
|
||||
pub(crate) const fn state(&self) -> &NetworkState<N> {
|
||||
&self.state
|
||||
}
|
||||
|
||||
/// Mutable access to the state.
|
||||
pub(crate) fn state_mut(&mut self) -> &mut NetworkState {
|
||||
pub(crate) fn state_mut(&mut self) -> &mut NetworkState<N> {
|
||||
&mut self.state
|
||||
}
|
||||
|
||||
@ -92,17 +92,17 @@ impl Swarm {
|
||||
}
|
||||
|
||||
/// Access to the [`SessionManager`].
|
||||
pub(crate) const fn sessions(&self) -> &SessionManager {
|
||||
pub(crate) const fn sessions(&self) -> &SessionManager<N> {
|
||||
&self.sessions
|
||||
}
|
||||
|
||||
/// Mutable access to the [`SessionManager`].
|
||||
pub(crate) fn sessions_mut(&mut self) -> &mut SessionManager {
|
||||
pub(crate) fn sessions_mut(&mut self) -> &mut SessionManager<N> {
|
||||
&mut self.sessions
|
||||
}
|
||||
}
|
||||
|
||||
impl Swarm {
|
||||
impl<N: NetworkPrimitives> Swarm<N> {
|
||||
/// Triggers a new outgoing connection to the given node
|
||||
pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_id: PeerId) {
|
||||
self.sessions.dial_outbound(remote_addr, remote_id)
|
||||
@ -112,7 +112,7 @@ impl Swarm {
|
||||
///
|
||||
/// This either updates the state or produces a new [`SwarmEvent`] that is bubbled up to the
|
||||
/// manager.
|
||||
fn on_session_event(&mut self, event: SessionEvent) -> Option<SwarmEvent> {
|
||||
fn on_session_event(&mut self, event: SessionEvent<N>) -> Option<SwarmEvent<N>> {
|
||||
match event {
|
||||
SessionEvent::SessionEstablished {
|
||||
peer_id,
|
||||
@ -181,7 +181,7 @@ impl Swarm {
|
||||
/// Callback for events produced by [`ConnectionListener`].
|
||||
///
|
||||
/// Depending on the event, this will produce a new [`SwarmEvent`].
|
||||
fn on_connection(&mut self, event: ListenerEvent) -> Option<SwarmEvent> {
|
||||
fn on_connection(&mut self, event: ListenerEvent) -> Option<SwarmEvent<N>> {
|
||||
match event {
|
||||
ListenerEvent::Error(err) => return Some(SwarmEvent::TcpListenerError(err)),
|
||||
ListenerEvent::ListenerClosed { local_address: address } => {
|
||||
@ -229,7 +229,7 @@ impl Swarm {
|
||||
}
|
||||
|
||||
/// Hook for actions pulled from the state
|
||||
fn on_state_action(&mut self, event: StateAction) -> Option<SwarmEvent> {
|
||||
fn on_state_action(&mut self, event: StateAction<N>) -> Option<SwarmEvent<N>> {
|
||||
match event {
|
||||
StateAction::Connect { remote_addr, peer_id } => {
|
||||
self.dial_outbound(remote_addr, peer_id);
|
||||
@ -286,8 +286,8 @@ impl Swarm {
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for Swarm {
|
||||
type Item = SwarmEvent;
|
||||
impl<N: NetworkPrimitives> Stream for Swarm<N> {
|
||||
type Item = SwarmEvent<N>;
|
||||
|
||||
/// This advances all components.
|
||||
///
|
||||
@ -338,13 +338,13 @@ impl Stream for Swarm {
|
||||
|
||||
/// All events created or delegated by the [`Swarm`] that represents changes to the state of the
|
||||
/// network.
|
||||
pub(crate) enum SwarmEvent {
|
||||
pub(crate) enum SwarmEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
/// Events related to the actual network protocol.
|
||||
ValidMessage {
|
||||
/// The peer that sent the message
|
||||
peer_id: PeerId,
|
||||
/// Message received from the peer
|
||||
message: PeerMessage,
|
||||
message: PeerMessage<N>,
|
||||
},
|
||||
/// Received a message that does not match the announced capabilities of the peer.
|
||||
InvalidCapabilityMessage {
|
||||
@ -394,7 +394,7 @@ pub(crate) enum SwarmEvent {
|
||||
capabilities: Arc<Capabilities>,
|
||||
/// negotiated eth version
|
||||
version: EthVersion,
|
||||
messages: PeerRequestSender,
|
||||
messages: PeerRequestSender<PeerRequest<N>>,
|
||||
status: Arc<Status>,
|
||||
direction: Direction,
|
||||
},
|
||||
|
||||
@ -6,7 +6,7 @@ use alloy_consensus::{BlockHeader, Transaction, TxType};
|
||||
use alloy_eips::{eip4895::Withdrawal, eip7685::Requests};
|
||||
use alloy_primitives::{Address, B256};
|
||||
|
||||
use crate::{Block, InMemorySize};
|
||||
use crate::InMemorySize;
|
||||
|
||||
/// Abstraction for block's body.
|
||||
pub trait BlockBody:
|
||||
@ -47,11 +47,6 @@ pub trait BlockBody:
|
||||
/// Returns [`Requests`] in block, if any.
|
||||
fn requests(&self) -> Option<&Requests>;
|
||||
|
||||
/// Create a [`Block`] from the body and its header.
|
||||
fn into_block<T: Block<Header = Self::Header, Body = Self>>(self, header: Self::Header) -> T {
|
||||
T::from((header, self))
|
||||
}
|
||||
|
||||
/// Calculate the transaction root for the block body.
|
||||
fn calculate_tx_root(&self) -> B256;
|
||||
|
||||
|
||||
@ -3,12 +3,11 @@
|
||||
pub mod body;
|
||||
pub mod header;
|
||||
|
||||
use alloc::{fmt, vec::Vec};
|
||||
use alloc::fmt;
|
||||
|
||||
use alloy_primitives::{Address, B256};
|
||||
use reth_codecs::Compact;
|
||||
|
||||
use crate::{BlockBody, BlockHeader, FullBlockHeader, InMemorySize};
|
||||
use crate::{BlockHeader, FullBlockHeader, InMemorySize};
|
||||
|
||||
/// Helper trait that unifies all behaviour required by block to support full node operations.
|
||||
pub trait FullBlock: Block<Header: Compact> + Compact {}
|
||||
@ -30,79 +29,17 @@ pub trait Block:
|
||||
+ Eq
|
||||
+ serde::Serialize
|
||||
+ for<'a> serde::Deserialize<'a>
|
||||
+ From<(Self::Header, Self::Body)>
|
||||
+ Into<(Self::Header, Self::Body)>
|
||||
+ InMemorySize
|
||||
{
|
||||
/// Header part of the block.
|
||||
type Header: BlockHeader;
|
||||
|
||||
/// The block's body contains the transactions in the block.
|
||||
type Body: BlockBody;
|
||||
type Body: Send + Sync + Unpin + 'static;
|
||||
|
||||
/// A block and block hash.
|
||||
type SealedBlock<H, B>;
|
||||
|
||||
/// A block and addresses of senders of transactions in it.
|
||||
type BlockWithSenders<T>;
|
||||
|
||||
/// Returns reference to [`BlockHeader`] type.
|
||||
/// Returns reference to block header.
|
||||
fn header(&self) -> &Self::Header;
|
||||
|
||||
/// Returns reference to [`BlockBody`] type.
|
||||
/// Returns reference to block body.
|
||||
fn body(&self) -> &Self::Body;
|
||||
|
||||
/// Calculate the header hash and seal the block so that it can't be changed.
|
||||
// todo: can be default impl if sealed block type is made generic over header and body and
|
||||
// migrated to alloy
|
||||
fn seal_slow(self) -> Self::SealedBlock<Self::Header, Self::Body>;
|
||||
|
||||
/// Seal the block with a known hash.
|
||||
///
|
||||
/// WARNING: This method does not perform validation whether the hash is correct.
|
||||
// todo: can be default impl if sealed block type is made generic over header and body and
|
||||
// migrated to alloy
|
||||
fn seal(self, hash: B256) -> Self::SealedBlock<Self::Header, Self::Body>;
|
||||
|
||||
/// Expensive operation that recovers transaction signer. See
|
||||
/// `SealedBlockWithSenders`.
|
||||
fn senders(&self) -> Option<Vec<Address>> {
|
||||
self.body().recover_signers()
|
||||
}
|
||||
|
||||
/// Transform into a `BlockWithSenders`.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// If the number of senders does not match the number of transactions in the block
|
||||
/// and the signer recovery for one of the transactions fails.
|
||||
///
|
||||
/// Note: this is expected to be called with blocks read from disk.
|
||||
#[track_caller]
|
||||
fn with_senders_unchecked(self, senders: Vec<Address>) -> Self::BlockWithSenders<Self> {
|
||||
self.try_with_senders_unchecked(senders).expect("stored block is valid")
|
||||
}
|
||||
|
||||
/// Transform into a `BlockWithSenders` using the given senders.
|
||||
///
|
||||
/// If the number of senders does not match the number of transactions in the block, this falls
|
||||
/// back to manually recovery, but _without ensuring that the signature has a low `s` value_.
|
||||
/// See also `SignedTransaction::recover_signer_unchecked`.
|
||||
///
|
||||
/// Returns an error if a signature is invalid.
|
||||
// todo: can be default impl if block with senders type is made generic over block and migrated
|
||||
// to alloy
|
||||
#[track_caller]
|
||||
fn try_with_senders_unchecked(
|
||||
self,
|
||||
senders: Vec<Address>,
|
||||
) -> Result<Self::BlockWithSenders<Self>, Self>;
|
||||
|
||||
/// **Expensive**. Transform into a `BlockWithSenders` by recovering senders in the contained
|
||||
/// transactions.
|
||||
///
|
||||
/// Returns `None` if a transaction is invalid.
|
||||
// todo: can be default impl if sealed block type is made generic over header and body and
|
||||
// migrated to alloy
|
||||
fn with_recovered_senders(self) -> Option<Self::BlockWithSenders<Self>>;
|
||||
}
|
||||
|
||||
@ -87,6 +87,19 @@ impl Block {
|
||||
}
|
||||
}
|
||||
|
||||
impl reth_primitives_traits::Block for Block {
|
||||
type Header = Header;
|
||||
type Body = BlockBody;
|
||||
|
||||
fn body(&self) -> &Self::Body {
|
||||
&self.body
|
||||
}
|
||||
|
||||
fn header(&self) -> &Self::Header {
|
||||
&self.header
|
||||
}
|
||||
}
|
||||
|
||||
impl InMemorySize for Block {
|
||||
/// Calculates a heuristic for the in-memory size of the [`Block`].
|
||||
#[inline]
|
||||
|
||||
Reference in New Issue
Block a user