mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat: remove Bandwidthmeter type (#8698)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -7058,7 +7058,6 @@ name = "reth-net-common"
|
||||
version = "0.2.0-beta.9"
|
||||
dependencies = [
|
||||
"alloy-primitives",
|
||||
"pin-project",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
|
||||
@ -16,5 +16,4 @@ workspace = true
|
||||
alloy-primitives.workspace = true
|
||||
|
||||
# async
|
||||
pin-project.workspace = true
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
|
||||
@ -1,277 +0,0 @@
|
||||
//! Support for metering bandwidth.
|
||||
//!
|
||||
//! Takes heavy inspiration from <https://github.com/libp2p/rust-libp2p/blob/master/src/bandwidth.rs>
|
||||
|
||||
// Copyright 2019 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use crate::stream::HasRemoteAddr;
|
||||
use std::{
|
||||
convert::TryFrom as _,
|
||||
io,
|
||||
net::SocketAddr,
|
||||
pin::Pin,
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
},
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
use tokio::{
|
||||
io::{AsyncRead, AsyncWrite, ReadBuf},
|
||||
net::TcpStream,
|
||||
};
|
||||
|
||||
/// Meters bandwidth usage of streams
|
||||
#[derive(Debug)]
|
||||
struct BandwidthMeterInner {
|
||||
/// Measures the number of inbound packets
|
||||
inbound: AtomicU64,
|
||||
/// Measures the number of outbound packets
|
||||
outbound: AtomicU64,
|
||||
}
|
||||
|
||||
/// Public shareable struct used for getting bandwidth metering info
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct BandwidthMeter {
|
||||
inner: Arc<BandwidthMeterInner>,
|
||||
}
|
||||
|
||||
impl BandwidthMeter {
|
||||
/// Returns the total number of bytes that have been downloaded on all the streams.
|
||||
///
|
||||
/// > **Note**: This method is by design subject to race conditions. The returned value should
|
||||
/// > only ever be used for statistics purposes.
|
||||
pub fn total_inbound(&self) -> u64 {
|
||||
self.inner.inbound.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Returns the total number of bytes that have been uploaded on all the streams.
|
||||
///
|
||||
/// > **Note**: This method is by design subject to race conditions. The returned value should
|
||||
/// > only ever be used for statistics purposes.
|
||||
pub fn total_outbound(&self) -> u64 {
|
||||
self.inner.outbound.load(Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for BandwidthMeter {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
inner: Arc::new(BandwidthMeterInner {
|
||||
inbound: AtomicU64::new(0),
|
||||
outbound: AtomicU64::new(0),
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Wraps around a single stream that implements [`AsyncRead`] + [`AsyncWrite`] and meters the
|
||||
/// bandwidth through it
|
||||
#[derive(Debug)]
|
||||
#[pin_project::pin_project]
|
||||
pub struct MeteredStream<S> {
|
||||
/// The stream this instruments
|
||||
#[pin]
|
||||
inner: S,
|
||||
/// The [`BandwidthMeter`] struct this uses to meter bandwidth
|
||||
meter: BandwidthMeter,
|
||||
}
|
||||
|
||||
impl<S> MeteredStream<S> {
|
||||
/// Creates a new [`MeteredStream`] wrapping around the provided stream,
|
||||
/// along with a new [`BandwidthMeter`]
|
||||
pub fn new(inner: S) -> Self {
|
||||
Self { inner, meter: BandwidthMeter::default() }
|
||||
}
|
||||
|
||||
/// Creates a new [`MeteredStream`] wrapping around the provided stream,
|
||||
/// attaching the provided [`BandwidthMeter`]
|
||||
pub const fn new_with_meter(inner: S, meter: BandwidthMeter) -> Self {
|
||||
Self { inner, meter }
|
||||
}
|
||||
|
||||
/// Provides a reference to the [`BandwidthMeter`] attached to this [`MeteredStream`]
|
||||
pub const fn get_bandwidth_meter(&self) -> &BandwidthMeter {
|
||||
&self.meter
|
||||
}
|
||||
|
||||
/// Returns the wrapped stream
|
||||
pub const fn inner(&self) -> &S {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl<Stream: AsyncRead> AsyncRead for MeteredStream<Stream> {
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut ReadBuf<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
let this = self.project();
|
||||
let num_bytes = {
|
||||
let init_num_bytes = buf.filled().len();
|
||||
ready!(this.inner.poll_read(cx, buf))?;
|
||||
buf.filled().len() - init_num_bytes
|
||||
};
|
||||
this.meter
|
||||
.inner
|
||||
.inbound
|
||||
.fetch_add(u64::try_from(num_bytes).unwrap_or(u64::MAX), Ordering::Relaxed);
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
impl<Stream: AsyncWrite> AsyncWrite for MeteredStream<Stream> {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
let this = self.project();
|
||||
let num_bytes = ready!(this.inner.poll_write(cx, buf))?;
|
||||
this.meter
|
||||
.inner
|
||||
.outbound
|
||||
.fetch_add(u64::try_from(num_bytes).unwrap_or(u64::MAX), Ordering::Relaxed);
|
||||
Poll::Ready(Ok(num_bytes))
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
let this = self.project();
|
||||
this.inner.poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
let this = self.project();
|
||||
this.inner.poll_shutdown(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl HasRemoteAddr for MeteredStream<TcpStream> {
|
||||
fn remote_addr(&self) -> Option<SocketAddr> {
|
||||
self.inner.remote_addr()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tokio::{
|
||||
io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream},
|
||||
net::TcpListener,
|
||||
};
|
||||
|
||||
async fn duplex_stream_ping_pong(
|
||||
client: &mut MeteredStream<DuplexStream>,
|
||||
server: &mut MeteredStream<DuplexStream>,
|
||||
) {
|
||||
let mut buf = [0u8; 4];
|
||||
|
||||
client.write_all(b"ping").await.unwrap();
|
||||
server.read_exact(&mut buf).await.unwrap();
|
||||
|
||||
server.write_all(b"pong").await.unwrap();
|
||||
client.read_exact(&mut buf).await.unwrap();
|
||||
}
|
||||
|
||||
fn assert_bandwidth_counts(
|
||||
bandwidth_meter: &BandwidthMeter,
|
||||
expected_inbound: u64,
|
||||
expected_outbound: u64,
|
||||
) {
|
||||
let actual_inbound = bandwidth_meter.total_inbound();
|
||||
assert_eq!(
|
||||
actual_inbound, expected_inbound,
|
||||
"Expected {expected_inbound} inbound bytes, but got {actual_inbound}",
|
||||
);
|
||||
|
||||
let actual_outbound = bandwidth_meter.total_outbound();
|
||||
assert_eq!(
|
||||
actual_outbound, expected_outbound,
|
||||
"Expected {expected_outbound} inbound bytes, but got {actual_outbound}",
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_count_read_write() {
|
||||
// Taken in large part from https://docs.rs/tokio/latest/tokio/io/struct.DuplexStream.html#example
|
||||
|
||||
let (client, server) = duplex(64);
|
||||
let mut metered_client = MeteredStream::new(client);
|
||||
let mut metered_server = MeteredStream::new(server);
|
||||
|
||||
duplex_stream_ping_pong(&mut metered_client, &mut metered_server).await;
|
||||
|
||||
assert_bandwidth_counts(metered_client.get_bandwidth_meter(), 4, 4);
|
||||
assert_bandwidth_counts(metered_server.get_bandwidth_meter(), 4, 4);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_read_equals_write_tcp() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let server_addr = listener.local_addr().unwrap();
|
||||
|
||||
let client_stream = TcpStream::connect(server_addr).await.unwrap();
|
||||
let mut metered_client_stream = MeteredStream::new(client_stream);
|
||||
|
||||
let client_meter = metered_client_stream.meter.clone();
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
let (server_stream, _) = listener.accept().await.unwrap();
|
||||
let mut metered_server_stream = MeteredStream::new(server_stream);
|
||||
|
||||
let mut buf = [0u8; 4];
|
||||
|
||||
metered_server_stream.read_exact(&mut buf).await.unwrap();
|
||||
|
||||
assert_eq!(metered_server_stream.meter.total_inbound(), client_meter.total_outbound());
|
||||
});
|
||||
|
||||
metered_client_stream.write_all(b"ping").await.unwrap();
|
||||
|
||||
handle.await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multiple_streams_one_meter() {
|
||||
let (client_1, server_1) = duplex(64);
|
||||
let (client_2, server_2) = duplex(64);
|
||||
|
||||
let shared_client_bandwidth_meter = BandwidthMeter::default();
|
||||
let shared_server_bandwidth_meter = BandwidthMeter::default();
|
||||
|
||||
let mut metered_client_1 =
|
||||
MeteredStream::new_with_meter(client_1, shared_client_bandwidth_meter.clone());
|
||||
let mut metered_server_1 =
|
||||
MeteredStream::new_with_meter(server_1, shared_server_bandwidth_meter.clone());
|
||||
|
||||
let mut metered_client_2 =
|
||||
MeteredStream::new_with_meter(client_2, shared_client_bandwidth_meter.clone());
|
||||
let mut metered_server_2 =
|
||||
MeteredStream::new_with_meter(server_2, shared_server_bandwidth_meter.clone());
|
||||
|
||||
duplex_stream_ping_pong(&mut metered_client_1, &mut metered_server_1).await;
|
||||
duplex_stream_ping_pong(&mut metered_client_2, &mut metered_server_2).await;
|
||||
|
||||
assert_bandwidth_counts(&shared_client_bandwidth_meter, 8, 8);
|
||||
assert_bandwidth_counts(&shared_server_bandwidth_meter, 8, 8);
|
||||
}
|
||||
}
|
||||
@ -9,7 +9,6 @@
|
||||
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
|
||||
|
||||
pub mod ban_list;
|
||||
pub mod bandwidth_meter;
|
||||
|
||||
/// Traits related to tokio streams
|
||||
pub mod stream;
|
||||
|
||||
@ -42,7 +42,6 @@ use reth_eth_wire::{
|
||||
DisconnectReason, EthVersion, Status,
|
||||
};
|
||||
use reth_metrics::common::mpsc::UnboundedMeteredSender;
|
||||
use reth_net_common::bandwidth_meter::BandwidthMeter;
|
||||
use reth_network_api::ReputationChangeKind;
|
||||
use reth_network_peers::PeerId;
|
||||
use reth_primitives::{ForkId, NodeRecord};
|
||||
@ -141,12 +140,6 @@ impl<C> NetworkManager<C> {
|
||||
&self.handle
|
||||
}
|
||||
|
||||
/// Returns a shareable reference to the [`BandwidthMeter`] stored
|
||||
/// inside of the [`NetworkHandle`]
|
||||
pub fn bandwidth_meter(&self) -> &BandwidthMeter {
|
||||
self.handle.bandwidth_meter()
|
||||
}
|
||||
|
||||
/// Returns the secret key used for authenticating sessions.
|
||||
pub const fn secret_key(&self) -> SecretKey {
|
||||
self.swarm.sessions().secret_key()
|
||||
@ -233,7 +226,6 @@ where
|
||||
let discv4 = discovery.discv4();
|
||||
|
||||
let num_active_peers = Arc::new(AtomicUsize::new(0));
|
||||
let bandwidth_meter: BandwidthMeter = BandwidthMeter::default();
|
||||
|
||||
let sessions = SessionManager::new(
|
||||
secret_key,
|
||||
@ -243,7 +235,6 @@ where
|
||||
hello_message,
|
||||
fork_filter,
|
||||
extra_protocols,
|
||||
bandwidth_meter.clone(),
|
||||
);
|
||||
|
||||
let state =
|
||||
@ -263,7 +254,6 @@ where
|
||||
local_peer_id,
|
||||
peers_handle,
|
||||
network_mode,
|
||||
bandwidth_meter,
|
||||
Arc::new(AtomicU64::new(chain_spec.chain.id())),
|
||||
tx_gossip_disabled,
|
||||
discv4,
|
||||
|
||||
@ -7,7 +7,6 @@ use enr::Enr;
|
||||
use parking_lot::Mutex;
|
||||
use reth_discv4::Discv4;
|
||||
use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes, SharedTransactions};
|
||||
use reth_net_common::bandwidth_meter::BandwidthMeter;
|
||||
use reth_network_api::{
|
||||
NetworkError, NetworkInfo, PeerInfo, PeerKind, Peers, PeersInfo, Reputation,
|
||||
ReputationChangeKind,
|
||||
@ -53,7 +52,6 @@ impl NetworkHandle {
|
||||
local_peer_id: PeerId,
|
||||
peers: PeersHandle,
|
||||
network_mode: NetworkMode,
|
||||
bandwidth_meter: BandwidthMeter,
|
||||
chain_id: Arc<AtomicU64>,
|
||||
tx_gossip_disabled: bool,
|
||||
discv4: Option<Discv4>,
|
||||
@ -67,7 +65,6 @@ impl NetworkHandle {
|
||||
local_peer_id,
|
||||
peers,
|
||||
network_mode,
|
||||
bandwidth_meter,
|
||||
is_syncing: Arc::new(AtomicBool::new(false)),
|
||||
initial_sync_done: Arc::new(AtomicBool::new(false)),
|
||||
chain_id,
|
||||
@ -154,11 +151,6 @@ impl NetworkHandle {
|
||||
rx.await.unwrap()
|
||||
}
|
||||
|
||||
/// Provides a shareable reference to the [`BandwidthMeter`] stored on the `NetworkInner`.
|
||||
pub fn bandwidth_meter(&self) -> &BandwidthMeter {
|
||||
&self.inner.bandwidth_meter
|
||||
}
|
||||
|
||||
/// Send message to gracefully shutdown node.
|
||||
///
|
||||
/// This will disconnect all active and pending sessions and prevent
|
||||
@ -393,8 +385,6 @@ struct NetworkInner {
|
||||
peers: PeersHandle,
|
||||
/// The mode of the network
|
||||
network_mode: NetworkMode,
|
||||
/// Used to measure inbound & outbound bandwidth across network streams (currently unused)
|
||||
bandwidth_meter: BandwidthMeter,
|
||||
/// Represents if the network is currently syncing.
|
||||
is_syncing: Arc<AtomicBool>,
|
||||
/// Used to differentiate between an initial pipeline sync or a live sync
|
||||
|
||||
@ -768,7 +768,6 @@ mod tests {
|
||||
EthStream, GetBlockBodies, HelloMessageWithProtocols, P2PStream, Status, StatusBuilder,
|
||||
UnauthedEthStream, UnauthedP2PStream,
|
||||
};
|
||||
use reth_net_common::bandwidth_meter::{BandwidthMeter, MeteredStream};
|
||||
use reth_network_peers::pk2id;
|
||||
use reth_primitives::{ForkFilter, Hardfork, MAINNET};
|
||||
use secp256k1::{SecretKey, SECP256K1};
|
||||
@ -793,7 +792,6 @@ mod tests {
|
||||
status: Status,
|
||||
fork_filter: ForkFilter,
|
||||
next_id: usize,
|
||||
bandwidth_meter: BandwidthMeter,
|
||||
}
|
||||
|
||||
impl SessionBuilder {
|
||||
@ -838,13 +836,11 @@ mod tests {
|
||||
let session_id = self.next_id();
|
||||
let (_disconnect_tx, disconnect_rx) = oneshot::channel();
|
||||
let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(1);
|
||||
let metered_stream =
|
||||
MeteredStream::new_with_meter(stream, self.bandwidth_meter.clone());
|
||||
|
||||
tokio::task::spawn(start_pending_incoming_session(
|
||||
disconnect_rx,
|
||||
session_id,
|
||||
metered_stream,
|
||||
stream,
|
||||
pending_sessions_tx,
|
||||
remote_addr,
|
||||
self.secret_key,
|
||||
@ -925,7 +921,6 @@ mod tests {
|
||||
fork_filter: MAINNET
|
||||
.hardfork_fork_filter(Hardfork::Frontier)
|
||||
.expect("The Frontier fork filter should exist on mainnet"),
|
||||
bandwidth_meter: BandwidthMeter::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -8,7 +8,6 @@ use reth_eth_wire::{
|
||||
multiplex::{ProtocolProxy, RlpxSatelliteStream},
|
||||
EthMessage, EthStream, EthVersion, P2PStream,
|
||||
};
|
||||
use reth_net_common::bandwidth_meter::MeteredStream;
|
||||
use std::{
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
@ -16,11 +15,11 @@ use std::{
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
/// The type of the underlying peer network connection.
|
||||
pub type EthPeerConnection = EthStream<P2PStream<ECIESStream<MeteredStream<TcpStream>>>>;
|
||||
pub type EthPeerConnection = EthStream<P2PStream<ECIESStream<TcpStream>>>;
|
||||
|
||||
/// Various connection types that at least support the ETH protocol.
|
||||
pub type EthSatelliteConnection =
|
||||
RlpxSatelliteStream<ECIESStream<MeteredStream<TcpStream>>, EthStream<ProtocolProxy>>;
|
||||
RlpxSatelliteStream<ECIESStream<TcpStream>, EthStream<ProtocolProxy>>;
|
||||
|
||||
/// Connection types that support the ETH protocol.
|
||||
///
|
||||
@ -47,7 +46,7 @@ impl EthRlpxConnection {
|
||||
|
||||
/// Consumes this type and returns the wrapped [`P2PStream`].
|
||||
#[inline]
|
||||
pub(crate) fn into_inner(self) -> P2PStream<ECIESStream<MeteredStream<TcpStream>>> {
|
||||
pub(crate) fn into_inner(self) -> P2PStream<ECIESStream<TcpStream>> {
|
||||
match self {
|
||||
Self::EthOnly(conn) => conn.into_inner(),
|
||||
Self::Satellite(conn) => conn.into_inner(),
|
||||
@ -56,7 +55,7 @@ impl EthRlpxConnection {
|
||||
|
||||
/// Returns mutable access to the underlying stream.
|
||||
#[inline]
|
||||
pub(crate) fn inner_mut(&mut self) -> &mut P2PStream<ECIESStream<MeteredStream<TcpStream>>> {
|
||||
pub(crate) fn inner_mut(&mut self) -> &mut P2PStream<ECIESStream<TcpStream>> {
|
||||
match self {
|
||||
Self::EthOnly(conn) => conn.inner_mut(),
|
||||
Self::Satellite(conn) => conn.inner_mut(),
|
||||
@ -65,7 +64,7 @@ impl EthRlpxConnection {
|
||||
|
||||
/// Returns access to the underlying stream.
|
||||
#[inline]
|
||||
pub(crate) const fn inner(&self) -> &P2PStream<ECIESStream<MeteredStream<TcpStream>>> {
|
||||
pub(crate) const fn inner(&self) -> &P2PStream<ECIESStream<TcpStream>> {
|
||||
match self {
|
||||
Self::EthOnly(conn) => conn.inner(),
|
||||
Self::Satellite(conn) => conn.inner(),
|
||||
|
||||
@ -15,10 +15,7 @@ use reth_eth_wire::{
|
||||
UnauthedP2PStream,
|
||||
};
|
||||
use reth_metrics::common::mpsc::MeteredPollSender;
|
||||
use reth_net_common::{
|
||||
bandwidth_meter::{BandwidthMeter, MeteredStream},
|
||||
stream::HasRemoteAddr,
|
||||
};
|
||||
use reth_net_common::stream::HasRemoteAddr;
|
||||
use reth_network_peers::PeerId;
|
||||
use reth_primitives::{ForkFilter, ForkId, ForkTransition, Head};
|
||||
use reth_tasks::TaskSpawner;
|
||||
@ -109,8 +106,6 @@ pub struct SessionManager {
|
||||
active_session_rx: ReceiverStream<ActiveSessionMessage>,
|
||||
/// Additional `RLPx` sub-protocols to be used by the session manager.
|
||||
extra_protocols: RlpxSubProtocols,
|
||||
/// Used to measure inbound & outbound bandwidth across all managed streams
|
||||
bandwidth_meter: BandwidthMeter,
|
||||
/// Metrics for the session manager.
|
||||
metrics: SessionManagerMetrics,
|
||||
}
|
||||
@ -128,7 +123,6 @@ impl SessionManager {
|
||||
hello_message: HelloMessageWithProtocols,
|
||||
fork_filter: ForkFilter,
|
||||
extra_protocols: RlpxSubProtocols,
|
||||
bandwidth_meter: BandwidthMeter,
|
||||
) -> Self {
|
||||
let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer);
|
||||
let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
|
||||
@ -152,7 +146,6 @@ impl SessionManager {
|
||||
pending_session_rx: ReceiverStream::new(pending_sessions_rx),
|
||||
active_session_tx: MeteredPollSender::new(active_session_tx, "network_active_session"),
|
||||
active_session_rx: ReceiverStream::new(active_session_rx),
|
||||
bandwidth_meter,
|
||||
extra_protocols,
|
||||
metrics: Default::default(),
|
||||
}
|
||||
@ -240,7 +233,6 @@ impl SessionManager {
|
||||
|
||||
let (disconnect_tx, disconnect_rx) = oneshot::channel();
|
||||
let pending_events = self.pending_sessions_tx.clone();
|
||||
let metered_stream = MeteredStream::new_with_meter(stream, self.bandwidth_meter.clone());
|
||||
let secret_key = self.secret_key;
|
||||
let hello_message = self.hello_message.clone();
|
||||
let status = self.status;
|
||||
@ -255,7 +247,7 @@ impl SessionManager {
|
||||
start_pending_incoming_session(
|
||||
disconnect_rx,
|
||||
session_id,
|
||||
metered_stream,
|
||||
stream,
|
||||
pending_events,
|
||||
remote_addr,
|
||||
secret_key,
|
||||
@ -286,7 +278,6 @@ impl SessionManager {
|
||||
let hello_message = self.hello_message.clone();
|
||||
let fork_filter = self.fork_filter.clone();
|
||||
let status = self.status;
|
||||
let band_with_meter = self.bandwidth_meter.clone();
|
||||
let extra_handlers = self.extra_protocols.on_outgoing(remote_addr, remote_peer_id);
|
||||
self.spawn(pending_session_with_timeout(
|
||||
self.pending_session_timeout,
|
||||
@ -304,7 +295,6 @@ impl SessionManager {
|
||||
hello_message,
|
||||
status,
|
||||
fork_filter,
|
||||
band_with_meter,
|
||||
extra_handlers,
|
||||
),
|
||||
));
|
||||
@ -797,7 +787,7 @@ pub(crate) async fn pending_session_with_timeout<F>(
|
||||
pub(crate) async fn start_pending_incoming_session(
|
||||
disconnect_rx: oneshot::Receiver<()>,
|
||||
session_id: SessionId,
|
||||
stream: MeteredStream<TcpStream>,
|
||||
stream: TcpStream,
|
||||
events: mpsc::Sender<PendingSessionEvent>,
|
||||
remote_addr: SocketAddr,
|
||||
secret_key: SecretKey,
|
||||
@ -835,7 +825,6 @@ async fn start_pending_outbound_session(
|
||||
hello: HelloMessageWithProtocols,
|
||||
status: Status,
|
||||
fork_filter: ForkFilter,
|
||||
bandwidth_meter: BandwidthMeter,
|
||||
extra_handlers: RlpxSubProtocolHandlers,
|
||||
) {
|
||||
let stream = match TcpStream::connect(remote_addr).await {
|
||||
@ -843,7 +832,7 @@ async fn start_pending_outbound_session(
|
||||
if let Err(err) = stream.set_nodelay(true) {
|
||||
tracing::warn!(target: "net::session", "set nodelay failed: {:?}", err);
|
||||
}
|
||||
MeteredStream::new_with_meter(stream, bandwidth_meter)
|
||||
stream
|
||||
}
|
||||
Err(error) => {
|
||||
let _ = events
|
||||
@ -878,7 +867,7 @@ async fn start_pending_outbound_session(
|
||||
async fn authenticate(
|
||||
disconnect_rx: oneshot::Receiver<()>,
|
||||
events: mpsc::Sender<PendingSessionEvent>,
|
||||
stream: MeteredStream<TcpStream>,
|
||||
stream: TcpStream,
|
||||
session_id: SessionId,
|
||||
remote_addr: SocketAddr,
|
||||
secret_key: SecretKey,
|
||||
@ -888,7 +877,7 @@ async fn authenticate(
|
||||
fork_filter: ForkFilter,
|
||||
extra_handlers: RlpxSubProtocolHandlers,
|
||||
) {
|
||||
let local_addr = stream.inner().local_addr().ok();
|
||||
let local_addr = stream.local_addr().ok();
|
||||
let stream = match get_eciess_stream(stream, secret_key, direction).await {
|
||||
Ok(stream) => stream,
|
||||
Err(error) => {
|
||||
@ -959,7 +948,7 @@ async fn get_eciess_stream<Io: AsyncRead + AsyncWrite + Unpin + HasRemoteAddr>(
|
||||
/// also negotiate the additional protocols.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn authenticate_stream(
|
||||
stream: UnauthedP2PStream<ECIESStream<MeteredStream<TcpStream>>>,
|
||||
stream: UnauthedP2PStream<ECIESStream<TcpStream>>,
|
||||
session_id: SessionId,
|
||||
remote_addr: SocketAddr,
|
||||
local_addr: Option<SocketAddr>,
|
||||
|
||||
Reference in New Issue
Block a user