chore: use ethereum-forks types directly (#12702)

This commit is contained in:
Matthias Seitz
2024-11-20 12:15:48 +01:00
committed by GitHub
parent 868f3acdbc
commit f12d7a9264
31 changed files with 243 additions and 286 deletions

1
Cargo.lock generated
View File

@ -7802,6 +7802,7 @@ dependencies = [
"reth-ecies", "reth-ecies",
"reth-eth-wire", "reth-eth-wire",
"reth-eth-wire-types", "reth-eth-wire-types",
"reth-ethereum-forks",
"reth-fs-util", "reth-fs-util",
"reth-metrics", "reth-metrics",
"reth-net-banlist", "reth-net-banlist",

View File

@ -23,6 +23,7 @@ reth-network-p2p.workspace = true
reth-discv4.workspace = true reth-discv4.workspace = true
reth-discv5.workspace = true reth-discv5.workspace = true
reth-dns-discovery.workspace = true reth-dns-discovery.workspace = true
reth-ethereum-forks.workspace = true
reth-eth-wire.workspace = true reth-eth-wire.workspace = true
reth-eth-wire-types.workspace = true reth-eth-wire-types.workspace = true
reth-ecies.workspace = true reth-ecies.workspace = true

View File

@ -1,15 +1,14 @@
//! Builder support for configuring the entire setup. //! Builder support for configuring the entire setup.
use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
use reth_network_api::test_utils::PeersHandleProvider;
use reth_transaction_pool::TransactionPool;
use tokio::sync::mpsc;
use crate::{ use crate::{
eth_requests::EthRequestHandler, eth_requests::EthRequestHandler,
transactions::{TransactionsManager, TransactionsManagerConfig}, transactions::{TransactionsManager, TransactionsManagerConfig},
NetworkHandle, NetworkManager, NetworkHandle, NetworkManager,
}; };
use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
use reth_network_api::test_utils::PeersHandleProvider;
use reth_transaction_pool::TransactionPool;
use tokio::sync::mpsc;
/// We set the max channel capacity of the `EthRequestHandler` to 256 /// We set the max channel capacity of the `EthRequestHandler` to 256
/// 256 requests with malicious 10MB body requests is 2.6GB which can be absorbed by the node. /// 256 requests with malicious 10MB body requests is 2.6GB which can be absorbed by the node.

View File

@ -1,11 +1,10 @@
//! Network cache support //! Network cache support
use core::hash::BuildHasher; use core::hash::BuildHasher;
use std::{fmt, hash::Hash};
use derive_more::{Deref, DerefMut}; use derive_more::{Deref, DerefMut};
use itertools::Itertools; use itertools::Itertools;
use schnellru::{ByLength, Limiter, RandomState, Unlimited}; use schnellru::{ByLength, Limiter, RandomState, Unlimited};
use std::{fmt, hash::Hash};
/// A minimal LRU cache based on a [`LruMap`](schnellru::LruMap) with limited capacity. /// A minimal LRU cache based on a [`LruMap`](schnellru::LruMap) with limited capacity.
/// ///

View File

@ -1,27 +1,25 @@
//! Network config support //! Network config support
use std::{collections::HashSet, net::SocketAddr, sync::Arc};
use reth_chainspec::{ChainSpecProvider, EthChainSpec, Hardforks};
use reth_discv4::{Discv4Config, Discv4ConfigBuilder, NatResolver, DEFAULT_DISCOVERY_ADDRESS};
use reth_discv5::NetworkStackId;
use reth_dns_discovery::DnsDiscoveryConfig;
use reth_eth_wire::{
EthNetworkPrimitives, HelloMessage, HelloMessageWithProtocols, NetworkPrimitives, Status,
};
use reth_network_peers::{mainnet_nodes, pk2id, sepolia_nodes, PeerId, TrustedPeer};
use reth_network_types::{PeersConfig, SessionsConfig};
use reth_primitives::{ForkFilter, Head};
use reth_storage_api::{noop::NoopBlockReader, BlockNumReader, BlockReader, HeaderProvider};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use secp256k1::SECP256K1;
use crate::{ use crate::{
error::NetworkError, error::NetworkError,
import::{BlockImport, ProofOfStakeBlockImport}, import::{BlockImport, ProofOfStakeBlockImport},
transactions::TransactionsManagerConfig, transactions::TransactionsManagerConfig,
NetworkHandle, NetworkManager, NetworkHandle, NetworkManager,
}; };
use reth_chainspec::{ChainSpecProvider, EthChainSpec, Hardforks};
use reth_discv4::{Discv4Config, Discv4ConfigBuilder, NatResolver, DEFAULT_DISCOVERY_ADDRESS};
use reth_discv5::NetworkStackId;
use reth_dns_discovery::DnsDiscoveryConfig;
use reth_eth_wire::{
EthNetworkPrimitives, HelloMessage, HelloMessageWithProtocols, NetworkPrimitives, Status,
};
use reth_ethereum_forks::{ForkFilter, Head};
use reth_network_peers::{mainnet_nodes, pk2id, sepolia_nodes, PeerId, TrustedPeer};
use reth_network_types::{PeersConfig, SessionsConfig};
use reth_storage_api::{noop::NoopBlockReader, BlockNumReader, BlockReader, HeaderProvider};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use secp256k1::SECP256K1;
use std::{collections::HashSet, net::SocketAddr, sync::Arc};
// re-export for convenience // re-export for convenience
use crate::protocol::{IntoRlpxSubProtocol, RlpxSubProtocols}; use crate::protocol::{IntoRlpxSubProtocol, RlpxSubProtocols};

View File

@ -1,13 +1,9 @@
//! Discovery support for the network. //! Discovery support for the network.
use std::{ use crate::{
collections::VecDeque, cache::LruMap,
net::{IpAddr, SocketAddr}, error::{NetworkError, ServiceKind},
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
}; };
use enr::Enr; use enr::Enr;
use futures::StreamExt; use futures::StreamExt;
use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config}; use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config};
@ -15,20 +11,22 @@ use reth_discv5::{DiscoveredPeer, Discv5};
use reth_dns_discovery::{ use reth_dns_discovery::{
DnsDiscoveryConfig, DnsDiscoveryHandle, DnsDiscoveryService, DnsNodeRecordUpdate, DnsResolver, DnsDiscoveryConfig, DnsDiscoveryHandle, DnsDiscoveryService, DnsNodeRecordUpdate, DnsResolver,
}; };
use reth_ethereum_forks::{EnrForkIdEntry, ForkId};
use reth_network_api::{DiscoveredEvent, DiscoveryEvent}; use reth_network_api::{DiscoveredEvent, DiscoveryEvent};
use reth_network_peers::{NodeRecord, PeerId}; use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::PeerAddr; use reth_network_types::PeerAddr;
use reth_primitives::{EnrForkIdEntry, ForkId};
use secp256k1::SecretKey; use secp256k1::SecretKey;
use std::{
collections::VecDeque,
net::{IpAddr, SocketAddr},
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::{sync::mpsc, task::JoinHandle}; use tokio::{sync::mpsc, task::JoinHandle};
use tokio_stream::{wrappers::ReceiverStream, Stream}; use tokio_stream::{wrappers::ReceiverStream, Stream};
use tracing::trace; use tracing::trace;
use crate::{
cache::LruMap,
error::{NetworkError, ServiceKind},
};
/// Default max capacity for cache of discovered peers. /// Default max capacity for cache of discovered peers.
/// ///
/// Default is 10 000 peers. /// Default is 10 000 peers.

View File

@ -1,7 +1,6 @@
//! Possible errors when interacting with the network. //! Possible errors when interacting with the network.
use std::{fmt, io, io::ErrorKind, net::SocketAddr}; use crate::session::PendingSessionHandshakeError;
use reth_dns_discovery::resolver::ResolveError; use reth_dns_discovery::resolver::ResolveError;
use reth_ecies::ECIESErrorImpl; use reth_ecies::ECIESErrorImpl;
use reth_eth_wire::{ use reth_eth_wire::{
@ -9,8 +8,7 @@ use reth_eth_wire::{
DisconnectReason, DisconnectReason,
}; };
use reth_network_types::BackoffKind; use reth_network_types::BackoffKind;
use std::{fmt, io, io::ErrorKind, net::SocketAddr};
use crate::session::PendingSessionHandshakeError;
/// Service kind. /// Service kind.
#[derive(Debug, PartialEq, Eq, Copy, Clone)] #[derive(Debug, PartialEq, Eq, Copy, Clone)]

View File

@ -1,12 +1,9 @@
//! Blocks/Headers management for the p2p network. //! Blocks/Headers management for the p2p network.
use std::{ use crate::{
future::Future, budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
pin::Pin, metrics::EthRequestHandlerMetrics,
task::{Context, Poll},
time::Duration,
}; };
use alloy_consensus::Header; use alloy_consensus::Header;
use alloy_eips::BlockHashOrNumber; use alloy_eips::BlockHashOrNumber;
use alloy_rlp::Encodable; use alloy_rlp::Encodable;
@ -20,14 +17,15 @@ use reth_network_p2p::error::RequestResult;
use reth_network_peers::PeerId; use reth_network_peers::PeerId;
use reth_primitives::BlockBody; use reth_primitives::BlockBody;
use reth_storage_api::{BlockReader, HeaderProvider, ReceiptProvider}; use reth_storage_api::{BlockReader, HeaderProvider, ReceiptProvider};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tokio::sync::{mpsc::Receiver, oneshot}; use tokio::sync::{mpsc::Receiver, oneshot};
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use crate::{
budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
metrics::EthRequestHandlerMetrics,
};
// Limits: <https://github.com/ethereum/go-ethereum/blob/b0d44338bbcefee044f1f635a84487cbbd8f0538/eth/protocols/eth/handler.go#L34-L56> // Limits: <https://github.com/ethereum/go-ethereum/blob/b0d44338bbcefee044f1f635a84487cbbd8f0538/eth/protocols/eth/handler.go#L34-L56>
/// Maximum number of receipts to serve. /// Maximum number of receipts to serve.

View File

@ -1,10 +1,6 @@
//! A client implementation that can interact with the network and download data. //! A client implementation that can interact with the network and download data.
use std::sync::{ use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse};
atomic::{AtomicUsize, Ordering},
Arc,
};
use alloy_primitives::B256; use alloy_primitives::B256;
use futures::{future, future::Either}; use futures::{future, future::Either};
use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives}; use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
@ -18,10 +14,12 @@ use reth_network_p2p::{
}; };
use reth_network_peers::PeerId; use reth_network_peers::PeerId;
use reth_network_types::ReputationChangeKind; use reth_network_types::ReputationChangeKind;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use tokio::sync::{mpsc::UnboundedSender, oneshot}; use tokio::sync::{mpsc::UnboundedSender, oneshot};
use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse};
#[cfg_attr(doc, aquamarine::aquamarine)] #[cfg_attr(doc, aquamarine::aquamarine)]
/// Front-end API for fetching data from the network. /// Front-end API for fetching data from the network.
/// ///

View File

@ -4,15 +4,7 @@ mod client;
pub use client::FetchClient; pub use client::FetchClient;
use std::{ use crate::message::BlockRequest;
collections::{HashMap, VecDeque},
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
};
use alloy_primitives::B256; use alloy_primitives::B256;
use futures::StreamExt; use futures::StreamExt;
use reth_eth_wire::{EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives}; use reth_eth_wire::{EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives};
@ -24,11 +16,17 @@ use reth_network_p2p::{
}; };
use reth_network_peers::PeerId; use reth_network_peers::PeerId;
use reth_network_types::ReputationChangeKind; use reth_network_types::ReputationChangeKind;
use std::{
collections::{HashMap, VecDeque},
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
};
use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::message::BlockRequest;
type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>; type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>;
type InflightBodiesRequest<B> = Request<Vec<B256>, PeerRequestResult<Vec<B>>>; type InflightBodiesRequest<B> = Request<Vec<B256>, PeerRequestResult<Vec<B>>>;

View File

@ -1,10 +1,9 @@
use futures::Future;
use pin_project::pin_project;
use std::{ use std::{
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use futures::Future;
use pin_project::pin_project;
use tokio::sync::oneshot::{error::RecvError, Receiver}; use tokio::sync::oneshot::{error::RecvError, Receiver};
/// Flatten a [Receiver] message in order to get rid of the [RecvError] result /// Flatten a [Receiver] message in order to get rid of the [RecvError] result

View File

@ -1,10 +1,8 @@
//! This module provides an abstraction over block import in the form of the `BlockImport` trait. //! This module provides an abstraction over block import in the form of the `BlockImport` trait.
use std::task::{Context, Poll};
use reth_network_peers::PeerId;
use crate::message::NewBlockMessage; use crate::message::NewBlockMessage;
use reth_network_peers::PeerId;
use std::task::{Context, Poll};
/// Abstraction over block import. /// Abstraction over block import.
pub trait BlockImport<B = reth_primitives::Block>: std::fmt::Debug + Send + Sync { pub trait BlockImport<B = reth_primitives::Block>: std::fmt::Debug + Send + Sync {

View File

@ -1,13 +1,12 @@
//! Contains connection-oriented interfaces. //! Contains connection-oriented interfaces.
use futures::{ready, Stream};
use std::{ use std::{
io, io,
net::SocketAddr, net::SocketAddr,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use futures::{ready, Stream};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
/// A tcp connection listener. /// A tcp connection listener.

View File

@ -15,39 +15,6 @@
//! (IP+port) of our node is published via discovery, remote peers can initiate inbound connections //! (IP+port) of our node is published via discovery, remote peers can initiate inbound connections
//! to the local node. Once a (tcp) connection is established, both peers start to authenticate a [RLPx session](https://github.com/ethereum/devp2p/blob/master/rlpx.md) via a handshake. If the handshake was successful, both peers announce their capabilities and are now ready to exchange sub-protocol messages via the `RLPx` session. //! to the local node. Once a (tcp) connection is established, both peers start to authenticate a [RLPx session](https://github.com/ethereum/devp2p/blob/master/rlpx.md) via a handshake. If the handshake was successful, both peers announce their capabilities and are now ready to exchange sub-protocol messages via the `RLPx` session.
use std::{
net::SocketAddr,
path::Path,
pin::Pin,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
time::{Duration, Instant},
};
use futures::{Future, StreamExt};
use parking_lot::Mutex;
use reth_eth_wire::{
capability::CapabilityMessage, Capabilities, DisconnectReason, EthNetworkPrimitives,
NetworkPrimitives,
};
use reth_fs_util::{self as fs, FsPathError};
use reth_metrics::common::mpsc::UnboundedMeteredSender;
use reth_network_api::{
test_utils::PeersHandle, EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::ReputationChangeKind;
use reth_storage_api::BlockNumReader;
use reth_tasks::shutdown::GracefulShutdown;
use reth_tokio_util::EventSender;
use secp256k1::SecretKey;
use tokio::sync::mpsc::{self, error::TrySendError};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, trace, warn};
use crate::{ use crate::{
budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM}, budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
config::NetworkConfig, config::NetworkConfig,
@ -68,6 +35,37 @@ use crate::{
transactions::NetworkTransactionEvent, transactions::NetworkTransactionEvent,
FetchClient, NetworkBuilder, FetchClient, NetworkBuilder,
}; };
use futures::{Future, StreamExt};
use parking_lot::Mutex;
use reth_eth_wire::{
capability::CapabilityMessage, Capabilities, DisconnectReason, EthNetworkPrimitives,
NetworkPrimitives,
};
use reth_fs_util::{self as fs, FsPathError};
use reth_metrics::common::mpsc::UnboundedMeteredSender;
use reth_network_api::{
test_utils::PeersHandle, EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::ReputationChangeKind;
use reth_storage_api::BlockNumReader;
use reth_tasks::shutdown::GracefulShutdown;
use reth_tokio_util::EventSender;
use secp256k1::SecretKey;
use std::{
net::SocketAddr,
path::Path,
pin::Pin,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
time::{Duration, Instant},
};
use tokio::sync::mpsc::{self, error::TrySendError};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, trace, warn};
#[cfg_attr(doc, aquamarine::aquamarine)] #[cfg_attr(doc, aquamarine::aquamarine)]
/// Manages the _entire_ state of the network. /// Manages the _entire_ state of the network.

View File

@ -3,11 +3,6 @@
//! An `RLPx` stream is multiplexed via the prepended message-id of a framed message. //! An `RLPx` stream is multiplexed via the prepended message-id of a framed message.
//! Capabilities are exchanged via the `RLPx` `Hello` message as pairs of `(id, version)`, <https://github.com/ethereum/devp2p/blob/master/rlpx.md#capability-messaging> //! Capabilities are exchanged via the `RLPx` `Hello` message as pairs of `(id, version)`, <https://github.com/ethereum/devp2p/blob/master/rlpx.md#capability-messaging>
use std::{
sync::Arc,
task::{ready, Context, Poll},
};
use alloy_consensus::BlockHeader; use alloy_consensus::BlockHeader;
use alloy_primitives::{Bytes, B256}; use alloy_primitives::{Bytes, B256};
use futures::FutureExt; use futures::FutureExt;
@ -20,6 +15,10 @@ use reth_eth_wire::{
use reth_network_api::PeerRequest; use reth_network_api::PeerRequest;
use reth_network_p2p::error::{RequestError, RequestResult}; use reth_network_p2p::error::{RequestError, RequestResult};
use reth_primitives::{PooledTransactionsElement, ReceiptWithBloom}; use reth_primitives::{PooledTransactionsElement, ReceiptWithBloom};
use std::{
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::sync::oneshot; use tokio::sync::oneshot;
/// Internal form of a `NewBlock` message /// Internal form of a `NewBlock` message

View File

@ -1,11 +1,7 @@
use std::{ use crate::{
net::SocketAddr, config::NetworkMode, protocol::RlpxSubProtocol, swarm::NetworkConnectionState,
sync::{ transactions::TransactionsHandle, FetchClient,
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc,
},
}; };
use alloy_primitives::B256; use alloy_primitives::B256;
use enr::Enr; use enr::Enr;
use parking_lot::Mutex; use parking_lot::Mutex;
@ -15,6 +11,7 @@ use reth_eth_wire::{
DisconnectReason, EthNetworkPrimitives, NetworkPrimitives, NewBlock, DisconnectReason, EthNetworkPrimitives, NetworkPrimitives, NewBlock,
NewPooledTransactionHashes, SharedTransactions, NewPooledTransactionHashes, SharedTransactions,
}; };
use reth_ethereum_forks::Head;
use reth_network_api::{ use reth_network_api::{
test_utils::{PeersHandle, PeersHandleProvider}, test_utils::{PeersHandle, PeersHandleProvider},
BlockDownloaderProvider, DiscoveryEvent, NetworkError, NetworkEvent, BlockDownloaderProvider, DiscoveryEvent, NetworkError, NetworkEvent,
@ -24,20 +21,22 @@ use reth_network_api::{
use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState, SyncStateProvider}; use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState, SyncStateProvider};
use reth_network_peers::{NodeRecord, PeerId}; use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::{PeerAddr, PeerKind, Reputation, ReputationChangeKind}; use reth_network_types::{PeerAddr, PeerKind, Reputation, ReputationChangeKind};
use reth_primitives::{Head, TransactionSigned}; use reth_primitives::TransactionSigned;
use reth_tokio_util::{EventSender, EventStream}; use reth_tokio_util::{EventSender, EventStream};
use secp256k1::SecretKey; use secp256k1::SecretKey;
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc,
},
};
use tokio::sync::{ use tokio::sync::{
mpsc::{self, UnboundedSender}, mpsc::{self, UnboundedSender},
oneshot, oneshot,
}; };
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::{
config::NetworkMode, protocol::RlpxSubProtocol, swarm::NetworkConnectionState,
transactions::TransactionsHandle, FetchClient,
};
/// A _shareable_ network frontend. Used to interact with the network. /// A _shareable_ network frontend. Used to interact with the network.
/// ///
/// See also [`NetworkManager`](crate::NetworkManager). /// See also [`NetworkManager`](crate::NetworkManager).

View File

@ -1,16 +1,13 @@
//! Peer related implementations //! Peer related implementations
use std::{ use crate::{
collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, error::SessionError,
fmt::Display, session::{Direction, PendingSessionHandshakeError},
io::{self}, swarm::NetworkConnectionState,
net::{IpAddr, SocketAddr},
task::{Context, Poll},
time::Duration,
}; };
use futures::StreamExt; use futures::StreamExt;
use reth_eth_wire::{errors::EthStreamError, DisconnectReason}; use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
use reth_ethereum_forks::ForkId;
use reth_net_banlist::BanList; use reth_net_banlist::BanList;
use reth_network_api::test_utils::{PeerCommand, PeersHandle}; use reth_network_api::test_utils::{PeerCommand, PeersHandle};
use reth_network_peers::{NodeRecord, PeerId}; use reth_network_peers::{NodeRecord, PeerId};
@ -22,7 +19,14 @@ use reth_network_types::{
ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig, ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights, ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
}; };
use reth_primitives::ForkId; use std::{
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
fmt::Display,
io::{self},
net::{IpAddr, SocketAddr},
task::{Context, Poll},
time::Duration,
};
use thiserror::Error; use thiserror::Error;
use tokio::{ use tokio::{
sync::mpsc, sync::mpsc,
@ -31,12 +35,6 @@ use tokio::{
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{trace, warn}; use tracing::{trace, warn};
use crate::{
error::SessionError,
session::{Direction, PendingSessionHandshakeError},
swarm::NetworkConnectionState,
};
/// Maintains the state of _all_ the peers known to the network. /// Maintains the state of _all_ the peers known to the network.
/// ///
/// This is supposed to be owned by the network itself, but can be reached via the [`PeersHandle`]. /// This is supposed to be owned by the network itself, but can be reached via the [`PeersHandle`].

View File

@ -2,19 +2,18 @@
//! //!
//! See also <https://github.com/ethereum/devp2p/blob/master/README.md> //! See also <https://github.com/ethereum/devp2p/blob/master/README.md>
use std::{
fmt,
net::SocketAddr,
ops::{Deref, DerefMut},
pin::Pin,
};
use alloy_primitives::bytes::BytesMut; use alloy_primitives::bytes::BytesMut;
use futures::Stream; use futures::Stream;
use reth_eth_wire::{ use reth_eth_wire::{
capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol, capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol,
}; };
use reth_network_api::{Direction, PeerId}; use reth_network_api::{Direction, PeerId};
use std::{
fmt,
net::SocketAddr,
ops::{Deref, DerefMut},
pin::Pin,
};
/// A trait that allows to offer additional RLPx-based application-level protocols when establishing /// A trait that allows to offer additional RLPx-based application-level protocols when establishing
/// a peer-to-peer connection. /// a peer-to-peer connection.

View File

@ -11,6 +11,14 @@ use std::{
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use crate::{
message::{NewBlockMessage, PeerMessage, PeerResponse, PeerResponseResult},
session::{
conn::EthRlpxConnection,
handle::{ActiveSessionMessage, SessionCommand},
SessionId,
},
};
use alloy_primitives::Sealable; use alloy_primitives::Sealable;
use futures::{stream::Fuse, SinkExt, StreamExt}; use futures::{stream::Fuse, SinkExt, StreamExt};
use metrics::Gauge; use metrics::Gauge;
@ -34,15 +42,6 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::PollSender; use tokio_util::sync::PollSender;
use tracing::{debug, trace}; use tracing::{debug, trace};
use crate::{
message::{NewBlockMessage, PeerMessage, PeerResponse, PeerResponseResult},
session::{
conn::EthRlpxConnection,
handle::{ActiveSessionMessage, SessionCommand},
SessionId,
},
};
// Constants for timeout updating. // Constants for timeout updating.
/// Minimum timeout value /// Minimum timeout value

View File

@ -1,10 +1,5 @@
//! Connection types for a session //! Connection types for a session
use std::{
pin::Pin,
task::{Context, Poll},
};
use futures::{Sink, Stream}; use futures::{Sink, Stream};
use reth_ecies::stream::ECIESStream; use reth_ecies::stream::ECIESStream;
use reth_eth_wire::{ use reth_eth_wire::{
@ -13,6 +8,10 @@ use reth_eth_wire::{
multiplex::{ProtocolProxy, RlpxSatelliteStream}, multiplex::{ProtocolProxy, RlpxSatelliteStream},
EthMessage, EthNetworkPrimitives, EthStream, EthVersion, NetworkPrimitives, P2PStream, EthMessage, EthNetworkPrimitives, EthStream, EthVersion, NetworkPrimitives, P2PStream,
}; };
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::net::TcpStream; use tokio::net::TcpStream;
/// The type of the underlying peer network connection. /// The type of the underlying peer network connection.

View File

@ -1,8 +1,7 @@
use super::ExceedsSessionLimit;
use reth_network_api::Direction; use reth_network_api::Direction;
use reth_network_types::SessionLimits; use reth_network_types::SessionLimits;
use super::ExceedsSessionLimit;
/// Keeps track of all sessions. /// Keeps track of all sessions.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct SessionCounter { pub struct SessionCounter {

View File

@ -1,7 +1,10 @@
//! Session handles. //! Session handles.
use std::{io, net::SocketAddr, sync::Arc, time::Instant}; use crate::{
message::PeerMessage,
session::{conn::EthRlpxConnection, Direction, SessionId},
PendingSessionHandshakeError,
};
use reth_ecies::ECIESError; use reth_ecies::ECIESError;
use reth_eth_wire::{ use reth_eth_wire::{
capability::CapabilityMessage, errors::EthStreamError, Capabilities, DisconnectReason, capability::CapabilityMessage, errors::EthStreamError, Capabilities, DisconnectReason,
@ -10,17 +13,12 @@ use reth_eth_wire::{
use reth_network_api::PeerInfo; use reth_network_api::PeerInfo;
use reth_network_peers::{NodeRecord, PeerId}; use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::PeerKind; use reth_network_types::PeerKind;
use std::{io, net::SocketAddr, sync::Arc, time::Instant};
use tokio::sync::{ use tokio::sync::{
mpsc::{self, error::SendError}, mpsc::{self, error::SendError},
oneshot, oneshot,
}; };
use crate::{
message::PeerMessage,
session::{conn::EthRlpxConnection, Direction, SessionId},
PendingSessionHandshakeError,
};
/// A handler attached to a peer session that's not authenticated yet, pending Handshake and hello /// A handler attached to a peer session that's not authenticated yet, pending Handshake and hello
/// message which exchanges the `capabilities` of the peer. /// message which exchanges the `capabilities` of the peer.
/// ///

View File

@ -23,6 +23,12 @@ use std::{
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use crate::{
message::PeerMessage,
metrics::SessionManagerMetrics,
protocol::{IntoRlpxSubProtocol, RlpxSubProtocolHandlers, RlpxSubProtocols},
session::active::ActiveSession,
};
use counter::SessionCounter; use counter::SessionCounter;
use futures::{future::Either, io, FutureExt, StreamExt}; use futures::{future::Either, io, FutureExt, StreamExt};
use reth_ecies::{stream::ECIESStream, ECIESError}; use reth_ecies::{stream::ECIESStream, ECIESError};
@ -31,11 +37,11 @@ use reth_eth_wire::{
Capabilities, DisconnectReason, EthVersion, HelloMessageWithProtocols, NetworkPrimitives, Capabilities, DisconnectReason, EthVersion, HelloMessageWithProtocols, NetworkPrimitives,
Status, UnauthedEthStream, UnauthedP2PStream, Status, UnauthedEthStream, UnauthedP2PStream,
}; };
use reth_ethereum_forks::{ForkFilter, ForkId, ForkTransition, Head};
use reth_metrics::common::mpsc::MeteredPollSender; use reth_metrics::common::mpsc::MeteredPollSender;
use reth_network_api::{PeerRequest, PeerRequestSender}; use reth_network_api::{PeerRequest, PeerRequestSender};
use reth_network_peers::PeerId; use reth_network_peers::PeerId;
use reth_network_types::SessionsConfig; use reth_network_types::SessionsConfig;
use reth_primitives::{ForkFilter, ForkId, ForkTransition, Head};
use reth_tasks::TaskSpawner; use reth_tasks::TaskSpawner;
use rustc_hash::FxHashMap; use rustc_hash::FxHashMap;
use secp256k1::SecretKey; use secp256k1::SecretKey;
@ -48,13 +54,6 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::PollSender; use tokio_util::sync::PollSender;
use tracing::{debug, instrument, trace}; use tracing::{debug, instrument, trace};
use crate::{
message::PeerMessage,
metrics::SessionManagerMetrics,
protocol::{IntoRlpxSubProtocol, RlpxSubProtocolHandlers, RlpxSubProtocols},
session::active::ActiveSession,
};
/// Internal identifier for active sessions. /// Internal identifier for active sessions.
#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)] #[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
pub struct SessionId(usize); pub struct SessionId(usize);

View File

@ -1,5 +1,25 @@
//! Keeps track of the state of the network. //! Keeps track of the state of the network.
use crate::{
cache::LruCache,
discovery::Discovery,
fetch::{BlockResponseOutcome, FetchAction, StateFetcher},
message::{BlockRequest, NewBlockMessage, PeerResponse, PeerResponseResult},
peers::{PeerAction, PeersManager},
FetchClient,
};
use alloy_consensus::BlockHeader;
use alloy_primitives::B256;
use rand::seq::SliceRandom;
use reth_eth_wire::{
BlockHashNumber, Capabilities, DisconnectReason, EthNetworkPrimitives, NetworkPrimitives,
NewBlockHashes, Status,
};
use reth_ethereum_forks::ForkId;
use reth_network_api::{DiscoveredEvent, DiscoveryEvent, PeerRequest, PeerRequestSender};
use reth_network_peers::PeerId;
use reth_network_types::{PeerAddr, PeerKind};
use reth_primitives_traits::Block;
use std::{ use std::{
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
fmt, fmt,
@ -11,31 +31,9 @@ use std::{
}, },
task::{Context, Poll}, task::{Context, Poll},
}; };
use alloy_consensus::BlockHeader;
use alloy_primitives::B256;
use rand::seq::SliceRandom;
use reth_eth_wire::{
BlockHashNumber, Capabilities, DisconnectReason, EthNetworkPrimitives, NetworkPrimitives,
NewBlockHashes, Status,
};
use reth_network_api::{DiscoveredEvent, DiscoveryEvent, PeerRequest, PeerRequestSender};
use reth_network_peers::PeerId;
use reth_network_types::{PeerAddr, PeerKind};
use reth_primitives::ForkId;
use reth_primitives_traits::Block;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tracing::{debug, trace}; use tracing::{debug, trace};
use crate::{
cache::LruCache,
discovery::Discovery,
fetch::{BlockResponseOutcome, FetchAction, StateFetcher},
message::{BlockRequest, NewBlockMessage, PeerResponse, PeerResponseResult},
peers::{PeerAction, PeersManager},
FetchClient,
};
/// Cache limit of blocks to keep track of for a single peer. /// Cache limit of blocks to keep track of for a single peer.
const PEER_BLOCK_CACHE_LIMIT: u32 = 512; const PEER_BLOCK_CACHE_LIMIT: u32 = 512;

View File

@ -1,20 +1,3 @@
use std::{
io,
net::SocketAddr,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use futures::Stream;
use reth_eth_wire::{
capability::CapabilityMessage, errors::EthStreamError, Capabilities, DisconnectReason,
EthNetworkPrimitives, EthVersion, NetworkPrimitives, Status,
};
use reth_network_api::{PeerRequest, PeerRequestSender};
use reth_network_peers::PeerId;
use tracing::trace;
use crate::{ use crate::{
listener::{ConnectionListener, ListenerEvent}, listener::{ConnectionListener, ListenerEvent},
message::PeerMessage, message::PeerMessage,
@ -23,6 +6,21 @@ use crate::{
session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager}, session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager},
state::{NetworkState, StateAction}, state::{NetworkState, StateAction},
}; };
use futures::Stream;
use reth_eth_wire::{
capability::CapabilityMessage, errors::EthStreamError, Capabilities, DisconnectReason,
EthNetworkPrimitives, EthVersion, NetworkPrimitives, Status,
};
use reth_network_api::{PeerRequest, PeerRequestSender};
use reth_network_peers::PeerId;
use std::{
io,
net::SocketAddr,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tracing::trace;
#[cfg_attr(doc, aquamarine::aquamarine)] #[cfg_attr(doc, aquamarine::aquamarine)]
/// Contains the connectivity related state of the network. /// Contains the connectivity related state of the network.

View File

@ -1,7 +1,6 @@
use std::{net::SocketAddr, time::Duration};
use enr::{k256::ecdsa::SigningKey, Enr, EnrPublicKey}; use enr::{k256::ecdsa::SigningKey, Enr, EnrPublicKey};
use reth_network_peers::PeerId; use reth_network_peers::PeerId;
use std::{net::SocketAddr, time::Duration};
/// The timeout for tests that create a `GethInstance` /// The timeout for tests that create a `GethInstance`
pub const GETH_TIMEOUT: Duration = Duration::from_secs(60); pub const GETH_TIMEOUT: Duration = Duration::from_secs(60);

View File

@ -1,13 +1,13 @@
//! A network implementation for testing purposes. //! A network implementation for testing purposes.
use std::{ use crate::{
fmt, builder::ETH_REQUEST_CHANNEL_CAPACITY,
future::Future, error::NetworkError,
net::{Ipv4Addr, SocketAddr, SocketAddrV4}, eth_requests::EthRequestHandler,
pin::Pin, protocol::IntoRlpxSubProtocol,
task::{Context, Poll}, transactions::{TransactionsHandle, TransactionsManager, TransactionsManagerConfig},
NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager,
}; };
use futures::{FutureExt, StreamExt}; use futures::{FutureExt, StreamExt};
use pin_project::pin_project; use pin_project::pin_project;
use reth_chainspec::{Hardforks, MAINNET}; use reth_chainspec::{Hardforks, MAINNET};
@ -27,6 +27,13 @@ use reth_transaction_pool::{
EthTransactionPool, TransactionPool, TransactionValidationTaskExecutor, EthTransactionPool, TransactionPool, TransactionValidationTaskExecutor,
}; };
use secp256k1::SecretKey; use secp256k1::SecretKey;
use std::{
fmt,
future::Future,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
pin::Pin,
task::{Context, Poll},
};
use tokio::{ use tokio::{
sync::{ sync::{
mpsc::{channel, unbounded_channel}, mpsc::{channel, unbounded_channel},
@ -35,15 +42,6 @@ use tokio::{
task::JoinHandle, task::JoinHandle,
}; };
use crate::{
builder::ETH_REQUEST_CHANNEL_CAPACITY,
error::NetworkError,
eth_requests::EthRequestHandler,
protocol::IntoRlpxSubProtocol,
transactions::{TransactionsHandle, TransactionsManager, TransactionsManagerConfig},
NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager,
};
/// A test network consisting of multiple peers. /// A test network consisting of multiple peers.
pub struct Testnet<C, Pool> { pub struct Testnet<C, Pool> {
/// All running peers in the network. /// All running peers in the network.

View File

@ -1,5 +1,3 @@
use derive_more::Constructor;
use super::{ use super::{
DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER, DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ, DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
@ -9,6 +7,7 @@ use crate::transactions::constants::tx_fetcher::{
DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH, DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS, DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH, DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER, DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
}; };
use derive_more::Constructor;
/// Configuration for managing transactions within the network. /// Configuration for managing transactions within the network.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]

View File

@ -25,13 +25,18 @@
//! before it's re-tried. Nonetheless, the capacity of the buffered hashes cache must be large //! before it's re-tried. Nonetheless, the capacity of the buffered hashes cache must be large
//! enough to buffer many hashes during network failure, to allow for recovery. //! enough to buffer many hashes during network failure, to allow for recovery.
use std::{ use super::{
collections::HashMap, config::TransactionFetcherConfig,
pin::Pin, constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST},
task::{ready, Context, Poll}, MessageFilter, PeerMetadata, PooledTransactions,
time::Duration, SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
};
use crate::{
cache::{LruCache, LruMap},
duration_metered_exec,
metrics::TransactionFetcherMetrics,
transactions::{validation, PartiallyFilterMessage},
}; };
use alloy_primitives::TxHash; use alloy_primitives::TxHash;
use derive_more::{Constructor, Deref}; use derive_more::{Constructor, Deref};
use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt}; use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
@ -47,23 +52,16 @@ use reth_primitives::PooledTransactionsElement;
use schnellru::ByLength; use schnellru::ByLength;
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
use smallvec::{smallvec, SmallVec}; use smallvec::{smallvec, SmallVec};
use std::{
collections::HashMap,
pin::Pin,
task::{ready, Context, Poll},
time::Duration,
};
use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError}; use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
use tracing::{debug, trace}; use tracing::{debug, trace};
use validation::FilterOutcome; use validation::FilterOutcome;
use super::{
config::TransactionFetcherConfig,
constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST},
MessageFilter, PeerMetadata, PooledTransactions,
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
};
use crate::{
cache::{LruCache, LruMap},
duration_metered_exec,
metrics::TransactionFetcherMetrics,
transactions::{validation, PartiallyFilterMessage},
};
/// The type responsible for fetching missing transactions from peers. /// The type responsible for fetching missing transactions from peers.
/// ///
/// This will keep track of unique transaction hashes that are currently being fetched and submits /// This will keep track of unique transaction hashes that are currently being fetched and submits

View File

@ -18,20 +18,19 @@ pub use validation::*;
pub(crate) use fetcher::{FetchEvent, TransactionFetcher}; pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE}; use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE; use crate::{
budget::{
use std::{ DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
collections::{hash_map::Entry, HashMap, HashSet}, DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
pin::Pin, DEFAULT_BUDGET_TRY_DRAIN_STREAM,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
}, },
task::{Context, Poll}, cache::LruCache,
time::{Duration, Instant}, duration_metered_exec, metered_poll_nested_stream_with_budget,
metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
NetworkHandle,
}; };
use alloy_primitives::{TxHash, B256}; use alloy_primitives::{TxHash, B256};
use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
use futures::{stream::FuturesUnordered, Future, StreamExt}; use futures::{stream::FuturesUnordered, Future, StreamExt};
use reth_eth_wire::{ use reth_eth_wire::{
DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData, DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
@ -56,22 +55,20 @@ use reth_transaction_pool::{
GetPooledTransactionLimit, PoolTransaction, PropagateKind, PropagatedTransactions, GetPooledTransactionLimit, PoolTransaction, PropagateKind, PropagatedTransactions,
TransactionPool, ValidPoolTransaction, TransactionPool, ValidPoolTransaction,
}; };
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
time::{Duration, Instant},
};
use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError}; use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use tracing::{debug, trace}; use tracing::{debug, trace};
use crate::{
budget::{
DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
DEFAULT_BUDGET_TRY_DRAIN_STREAM,
},
cache::LruCache,
duration_metered_exec, metered_poll_nested_stream_with_budget,
metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
NetworkHandle,
};
/// The future for importing transactions into the pool. /// The future for importing transactions into the pool.
/// ///
/// Resolves with the result of each transaction import. /// Resolves with the result of each transaction import.

View File

@ -2,8 +2,6 @@
//! and [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68) //! and [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68)
//! announcements. Validation and filtering of announcements is network dependent. //! announcements. Validation and filtering of announcements is network dependent.
use std::{fmt, fmt::Display, mem};
use crate::metrics::{AnnouncedTxTypesMetrics, TxTypesCounter}; use crate::metrics::{AnnouncedTxTypesMetrics, TxTypesCounter};
use alloy_primitives::{Signature, TxHash}; use alloy_primitives::{Signature, TxHash};
use derive_more::{Deref, DerefMut}; use derive_more::{Deref, DerefMut};
@ -12,6 +10,7 @@ use reth_eth_wire::{
MAX_MESSAGE_SIZE, MAX_MESSAGE_SIZE,
}; };
use reth_primitives::TxType; use reth_primitives::TxType;
use std::{fmt, fmt::Display, mem};
use tracing::trace; use tracing::trace;
/// The size of a decoded signature in bytes. /// The size of a decoded signature in bytes.