feat(net): make Status and Hello configurable (#431)

* feat(net): make Status and Hello configurable

* set port
This commit is contained in:
Matthias Seitz
2022-12-14 11:39:09 +01:00
committed by GitHub
parent ac5efc0749
commit 12e7f0acbc
7 changed files with 199 additions and 41 deletions

View File

@ -1,14 +1,15 @@
use crate::{capability::Capability, ProtocolVersion};
use crate::{capability::Capability, EthVersion, ProtocolVersion};
use reth_primitives::PeerId;
use reth_rlp::{RlpDecodable, RlpEncodable};
use serde::{Deserialize, Serialize};
/// The client version: `reth/v{major}.{minor}.{patch}`
pub(crate) const DEFAULT_CLIENT_VERSION: &str = concat!("reth/v", env!("CARGO_PKG_VERSION"));
// TODO: determine if we should allow for the extra fields at the end like EIP-706 suggests
/// Message used in the `p2p` handshake, containing information about the supported RLPx protocol
/// version and capabilities.
#[derive(
Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable, Serialize, Deserialize, Default,
)]
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable, Serialize, Deserialize)]
pub struct HelloMessage {
/// The version of the `p2p` protocol.
pub protocol_version: ProtocolVersion,
@ -23,6 +24,83 @@ pub struct HelloMessage {
pub id: PeerId,
}
// === impl HelloMessage ===
impl HelloMessage {
/// Starts a new [`HelloMessageBuilder`]
///
/// ```
/// use secp256k1::{SECP256K1, SecretKey};
/// use reth_ecies::util::pk2id;
/// use reth_eth_wire::HelloMessage;
/// let secret_key = SecretKey::new(&mut rand::thread_rng());
/// let id = pk2id(&secret_key.public_key(SECP256K1));
/// let status = HelloMessage::builder(id).build();
/// ```
pub fn builder(id: PeerId) -> HelloMessageBuilder {
HelloMessageBuilder::new(id)
}
}
pub struct HelloMessageBuilder {
/// The version of the `p2p` protocol.
pub protocol_version: Option<ProtocolVersion>,
/// Specifies the client software identity, as a human-readable string (e.g.
/// "Ethereum(++)/1.0.0").
pub client_version: Option<String>,
/// The list of supported capabilities and their versions.
pub capabilities: Option<Vec<Capability>>,
/// The port that the client is listening on, zero indicates the client is not listening.
pub port: Option<u16>,
/// The secp256k1 public key corresponding to the node's private key.
pub id: PeerId,
}
// === impl HelloMessageBuilder ===
impl HelloMessageBuilder {
/// Create a new builder to configure a [`HelloMessage`]
pub fn new(id: PeerId) -> Self {
Self { protocol_version: None, client_version: None, capabilities: None, port: None, id }
}
/// Sets the port the client is listening on
pub fn port(mut self, port: u16) -> Self {
self.port = Some(port);
self
}
/// Sets capabilities.
pub fn capabilities(mut self, capabilities: Vec<Capability>) -> Self {
self.capabilities = Some(capabilities);
self
}
/// Sets client version.
pub fn client_version(mut self, client_version: impl Into<String>) -> Self {
self.client_version = Some(client_version.into());
self
}
/// Sets client version.
pub fn protocol_version(mut self, protocol_version: ProtocolVersion) -> Self {
self.protocol_version = Some(protocol_version);
self
}
/// Consumes the type and returns the configured [`HelloMessage`]
pub fn build(self) -> HelloMessage {
let Self { protocol_version, client_version, capabilities, port, id } = self;
HelloMessage {
protocol_version: protocol_version.unwrap_or_default(),
client_version: client_version.unwrap_or_else(|| DEFAULT_CLIENT_VERSION.to_string()),
capabilities: capabilities.unwrap_or_else(|| vec![EthVersion::Eth67.into()]),
port: port.unwrap_or(30303),
id,
}
}
}
#[cfg(test)]
mod tests {
use reth_ecies::util::pk2id;

View File

@ -91,15 +91,16 @@ impl Debug for Status {
}
}
// <https://etherscan.io/block/0>
impl Default for Status {
fn default() -> Self {
Status {
version: EthVersion::Eth67 as u8,
chain: Chain::Named(ethers_core::types::Chain::Mainnet),
total_difficulty: U256::zero(),
total_difficulty: 17_179_869_184u64.into(),
blockhash: MAINNET_GENESIS,
genesis: MAINNET_GENESIS,
forkid: Hardfork::Homestead.fork_id(),
forkid: Hardfork::Latest.fork_id(),
}
}
}

View File

@ -6,9 +6,9 @@ use crate::{
session::SessionsConfig,
};
use reth_discv4::{Discv4Config, Discv4ConfigBuilder, NodeRecord, DEFAULT_DISCOVERY_PORT};
use reth_primitives::{Chain, ForkId, H256};
use reth_primitives::{Chain, PeerId, H256};
use reth_tasks::TaskExecutor;
use secp256k1::SecretKey;
use secp256k1::{SecretKey, SECP256K1};
use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
sync::Arc,
@ -21,6 +21,8 @@ mod __reexport {
pub use secp256k1::SecretKey;
}
pub use __reexport::*;
use reth_ecies::util::pk2id;
use reth_eth_wire::{HelloMessage, Status};
/// Convenience function to create a new random [`SecretKey`]
pub fn rng_secret_key() -> SecretKey {
@ -45,9 +47,6 @@ pub struct NetworkConfig<C> {
pub peers_config: PeersConfig,
/// How to configure the [SessionManager](crate::session::SessionManager).
pub sessions_config: SessionsConfig,
/// A fork identifier as defined by EIP-2124.
/// Serves as the chain compatibility identifier.
pub fork_id: Option<ForkId>,
/// The id of the network
pub chain: Chain,
/// Genesis hash of the network
@ -58,6 +57,10 @@ pub struct NetworkConfig<C> {
pub network_mode: NetworkMode,
/// The executor to use for spawning tasks.
pub executor: Option<TaskExecutor>,
/// The `Status` message to send to peers at the beginning.
pub status: Status,
/// Sets the hello message for the p2p handshake in RLPx
pub hello_message: HelloMessage,
}
// === impl NetworkConfig ===
@ -105,9 +108,6 @@ pub struct NetworkConfigBuilder<C> {
peers_config: Option<PeersConfig>,
/// How to configure the sessions manager
sessions_config: Option<SessionsConfig>,
/// A fork identifier as defined by EIP-2124.
/// Serves as the chain compatibility identifier.
fork_id: Option<ForkId>,
/// The network's chain id
chain: Chain,
/// Network genesis hash
@ -118,6 +118,10 @@ pub struct NetworkConfigBuilder<C> {
network_mode: NetworkMode,
/// The executor to use for spawning tasks.
executor: Option<TaskExecutor>,
/// The `Status` message to send to peers at the beginning.
status: Option<Status>,
/// Sets the hello message for the p2p handshake in RLPx
hello_message: Option<HelloMessage>,
}
// === impl NetworkConfigBuilder ===
@ -134,15 +138,54 @@ impl<C> NetworkConfigBuilder<C> {
listener_addr: None,
peers_config: None,
sessions_config: None,
fork_id: None,
chain: Chain::Named(reth_primitives::rpc::Chain::Mainnet),
genesis_hash: Default::default(),
block_import: Box::<ProofOfStakeBlockImport>::default(),
network_mode: Default::default(),
executor: None,
status: None,
hello_message: None,
}
}
/// Returns the configured [`PeerId`]
pub fn get_peer_id(&self) -> PeerId {
pk2id(&self.secret_key.public_key(SECP256K1))
}
/// Sets the `Status` message to send when connecting to peers.
///
/// ```
/// # use reth_eth_wire::Status;
/// # use reth_network::NetworkConfigBuilder;
/// # fn builder<C>(builder: NetworkConfigBuilder<C>) {
/// builder.status(
/// Status::builder().build()
/// );
/// # }
/// ```
pub fn status(mut self, status: Status) -> Self {
self.status = Some(status);
self
}
/// Sets the `HelloMessage` to send when connecting to peers.
///
/// ```
/// # use reth_eth_wire::HelloMessage;
/// # use reth_network::NetworkConfigBuilder;
/// # fn builder<C>(builder: NetworkConfigBuilder<C>) {
/// let peer_id = builder.get_peer_id();
/// builder.hello_message(
/// HelloMessage::builder(peer_id).build()
/// );
/// # }
/// ```
pub fn hello_message(mut self, hello_message: HelloMessage) -> Self {
self.hello_message = Some(hello_message);
self
}
/// set a custom peer config for how peers are handled
pub fn peer_config(mut self, config: PeersConfig) -> Self {
self.peers_config = Some(config);
@ -199,6 +242,7 @@ impl<C> NetworkConfigBuilder<C> {
/// Consumes the type and creates the actual [`NetworkConfig`]
pub fn build(self) -> NetworkConfig<C> {
let peer_id = self.get_peer_id();
let Self {
client,
secret_key,
@ -208,13 +252,23 @@ impl<C> NetworkConfigBuilder<C> {
listener_addr,
peers_config,
sessions_config,
fork_id,
chain,
genesis_hash,
block_import,
network_mode,
executor,
status,
hello_message,
} = self;
let listener_addr = listener_addr.unwrap_or_else(|| {
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT))
});
let mut hello_message =
hello_message.unwrap_or_else(|| HelloMessage::builder(peer_id).build());
hello_message.port = listener_addr.port();
NetworkConfig {
client,
secret_key,
@ -223,17 +277,16 @@ impl<C> NetworkConfigBuilder<C> {
discovery_addr: discovery_addr.unwrap_or_else(|| {
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT))
}),
listener_addr: listener_addr.unwrap_or_else(|| {
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT))
}),
listener_addr,
peers_config: peers_config.unwrap_or_default(),
sessions_config: sessions_config.unwrap_or_default(),
fork_id,
chain,
genesis_hash,
block_import,
network_mode,
executor,
status: status.unwrap_or_default(),
hello_message,
}
}
}

View File

@ -97,7 +97,7 @@ mod swarm;
pub mod transactions;
pub use builder::NetworkBuilder;
pub use config::NetworkConfig;
pub use config::{NetworkConfig, NetworkConfigBuilder};
pub use fetch::FetchClient;
pub use manager::{NetworkEvent, NetworkManager};
pub use message::PeerRequest;

View File

@ -145,6 +145,8 @@ where
network_mode,
boot_nodes,
executor,
hello_message,
status,
..
} = config;
@ -160,7 +162,8 @@ where
// need to retrieve the addr here since provided port could be `0`
let local_peer_id = discovery.local_id();
let sessions = SessionManager::new(secret_key, sessions_config, executor);
let sessions =
SessionManager::new(secret_key, sessions_config, executor, status, hello_message);
let state = NetworkState::new(client, discovery, peers_manger, genesis_hash);
let swarm = Swarm::new(incoming, sessions, state);

View File

@ -5,6 +5,7 @@ use crate::{
message::PeerMessage,
session::{
active::ActiveSession,
config::SessionCounter,
handle::{
ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
SessionCommand,
@ -17,11 +18,10 @@ use reth_ecies::stream::ECIESStream;
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
error::EthStreamError,
DisconnectReason, HelloBuilder, HelloMessage, Status, StatusBuilder, UnauthedEthStream,
UnauthedP2PStream,
DisconnectReason, HelloMessage, Status, UnauthedEthStream, UnauthedP2PStream,
};
use reth_primitives::{ForkFilter, Hardfork, PeerId};
use secp256k1::{SecretKey, SECP256K1};
use secp256k1::SecretKey;
use std::{
collections::HashMap,
future::Future,
@ -40,9 +40,8 @@ use tracing::{instrument, trace, warn};
mod active;
mod config;
mod handle;
use crate::session::config::SessionCounter;
pub use config::SessionsConfig;
use reth_ecies::util::pk2id;
use reth_tasks::TaskExecutor;
/// Internal identifier for active sessions.
@ -62,8 +61,8 @@ pub(crate) struct SessionManager {
secret_key: SecretKey,
/// The `Status` message to send to peers.
status: Status,
/// THe `Hello` message to send to peers.
hello: HelloMessage,
/// THe `HelloMessage` message to send to peers.
hello_message: HelloMessage,
/// The [`ForkFilter`] used to validate the peer's `Status` message.
fork_filter: ForkFilter,
/// Size of the command buffer per session.
@ -101,18 +100,14 @@ impl SessionManager {
secret_key: SecretKey,
config: SessionsConfig,
executor: Option<TaskExecutor>,
status: Status,
hello_message: HelloMessage,
) -> Self {
let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer);
let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
let pk = secret_key.public_key(SECP256K1);
let peer_id = pk2id(&pk);
// TODO: make sure this is the right place to put these builders - maybe per-Network rather
// than per-Session?
let hello = HelloBuilder::new(peer_id).build();
let status = StatusBuilder::default().build();
let fork_filter = Hardfork::Frontier.fork_filter();
let hardfork = Hardfork::from(status.forkid.next);
let fork_filter = hardfork.fork_filter();
Self {
next_id: 0,
@ -120,7 +115,7 @@ impl SessionManager {
request_timeout: config.request_timeout,
secret_key,
status,
hello,
hello_message,
fork_filter,
session_command_buffer: config.session_command_buffer,
executor,
@ -180,7 +175,7 @@ impl SessionManager {
pending_events,
remote_addr,
self.secret_key,
self.hello.clone(),
self.hello_message.clone(),
self.status,
self.fork_filter.clone(),
));
@ -204,7 +199,7 @@ impl SessionManager {
remote_addr,
remote_peer_id,
self.secret_key,
self.hello.clone(),
self.hello_message.clone(),
self.status,
self.fork_filter.clone(),
));

View File

@ -87,6 +87,34 @@ async fn test_connect_with_boot_nodes() {
}
}
#[tokio::test(flavor = "multi_thread")]
#[ignore]
async fn test_connect_with_builder() {
reth_tracing::init_tracing();
let secret_key = SecretKey::new(&mut rand::thread_rng());
let mut discv4 = Discv4Config::builder();
discv4.add_boot_nodes(mainnet_nodes());
let client = Arc::new(TestApi::default());
let config = NetworkConfig::builder(Arc::clone(&client), secret_key).discovery(discv4).build();
let (handle, network, _, requests) = NetworkManager::new(config)
.await
.unwrap()
.into_builder()
.request_handler(client)
.split_with_handle();
let mut events = handle.event_listener();
tokio::task::spawn(async move {
tokio::join!(network, requests);
});
while let Some(ev) = events.next().await {
dbg!(ev);
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_incoming_node_id_blacklist() {
reth_tracing::init_tracing();
@ -200,7 +228,7 @@ async fn test_outgoing_connect_with_single_geth() {
let geth_socket = SocketAddr::new([127, 0, 0, 1].into(), geth_p2p_port);
let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port()).to_string();
let provider = Provider::<Http>::try_from(format!("http://{}", geth_endpoint)).unwrap();
let provider = Provider::<Http>::try_from(format!("http://{geth_endpoint}")).unwrap();
// get the peer id we should be expecting
let geth_peer_id: PeerId =