feat(cli): add persistent peers (#1167)

Co-authored-by: lambdaclass-user <github@lambdaclass.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
Tomás
2023-02-08 15:36:06 -03:00
committed by GitHub
parent 4cbd199016
commit 4df228a87e
10 changed files with 118 additions and 14 deletions

1
Cargo.lock generated
View File

@ -4472,6 +4472,7 @@ dependencies = [
"reth-transaction-pool",
"secp256k1 0.24.3",
"serde",
"serde_json",
"serial_test",
"tempfile",
"thiserror",

View File

@ -68,6 +68,19 @@ impl XdgPath for ConfigPath {
}
}
/// Returns the path to the default reth known peers file.
///
/// Refer to [dirs_next::config_dir] for cross-platform behavior.
#[derive(Default, Debug, Clone)]
#[non_exhaustive]
pub struct KnownPeersPath;
impl XdgPath for KnownPeersPath {
fn resolve() -> Option<PathBuf> {
database_path().map(|p| p.join("known-peers.json"))
}
}
/// Returns the path to the reth logs directory.
///
/// Refer to [dirs_next::cache_dir] for cross-platform behavior.

View File

@ -15,6 +15,7 @@ pub mod prometheus_exporter;
pub mod stage;
pub mod test_eth_chain;
pub mod test_vectors;
use dirs::{KnownPeersPath, PlatformPath};
pub use reth_staged_sync::utils;
use clap::Args;
@ -42,4 +43,14 @@ struct NetworkOpts {
/// Will fall back to a network-specific default if not specified.
#[arg(long, value_delimiter = ',')]
bootnodes: Option<Vec<NodeRecord>>,
/// The path to the known peers file. Connected peers are
/// dumped to this file on node shutdown, and read on startup.
/// Cannot be used with --no-persist-peers
#[arg(long, value_name = "FILE", verbatim_doc_comment, default_value_t)]
peers_file: PlatformPath<KnownPeersPath>,
/// Do not persist peers. Cannot be used with --peers-file
#[arg(long, verbatim_doc_comment, conflicts_with = "peers_file")]
no_persist_peers: bool,
}

View File

@ -26,8 +26,7 @@ use reth_stages::{
prelude::*,
stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage},
};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::select;
use std::{io, net::SocketAddr, path::Path, sync::Arc, time::Duration};
use tracing::{debug, info, warn};
/// Start the node
@ -116,6 +115,7 @@ impl Command {
info!(target: "reth::cli", "Connecting to P2P network");
let netconf = self.load_network_config(&config, &db);
let network = netconf.start_network().await?;
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
// TODO(mattsse): cleanup, add cli args
@ -139,7 +139,14 @@ impl Command {
// Run pipeline
info!(target: "reth::cli", "Starting sync pipeline");
pipeline.run(db.clone()).await?;
tokio::select! {
res = pipeline.run(db.clone()) => res?,
_ = tokio::signal::ctrl_c() => {},
};
if !self.network.no_persist_peers {
dump_peers(self.network.peers_file.as_ref(), network).await?;
}
info!(target: "reth::cli", "Finishing up");
Ok(())
@ -194,12 +201,14 @@ impl Command {
config: &Config,
db: &Arc<Env<WriteMap>>,
) -> NetworkConfig<ShareableDatabase<Env<WriteMap>>> {
let peers_file = (!self.network.no_persist_peers).then_some(&self.network.peers_file);
config.network_config(
db.clone(),
self.chain.clone(),
self.network.disable_discovery,
self.network.bootnodes.clone(),
self.nat,
peers_file.map(|f| f.as_ref().to_path_buf()),
)
}
@ -285,6 +294,15 @@ impl Command {
}
}
/// Dumps peers to `file_path` for persistence.
async fn dump_peers(file_path: &Path, network: NetworkHandle) -> Result<(), io::Error> {
info!(target : "net::peers", file = %file_path.display(), "Saving current peers");
let known_peers = network.peers_handle().all_peers().await;
tokio::fs::write(file_path, serde_json::to_string_pretty(&known_peers)?).await?;
Ok(())
}
/// The current high-level state of the node.
#[derive(Default)]
struct NodeState {
@ -324,7 +342,7 @@ async fn handle_events(mut events: impl Stream<Item = NodeEvent> + Unpin) {
let mut interval = tokio::time::interval(Duration::from_secs(30));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
select! {
tokio::select! {
Some(event) = events.next() => {
match event {
NodeEvent::Network(NetworkEvent::SessionEstablished { peer_id, status, .. }) => {

View File

@ -98,7 +98,14 @@ impl Command {
config.peers.connect_trusted_nodes_only = self.trusted_only;
let network = config
.network_config(noop_db, self.chain.clone(), self.disable_discovery, None, self.nat)
.network_config(
noop_db,
self.chain.clone(),
self.disable_discovery,
None,
self.nat,
None,
)
.start_network()
.await?;

View File

@ -143,6 +143,7 @@ impl Command {
self.network.disable_discovery,
None,
self.nat,
None,
)
.start_network()
.await?;

View File

@ -42,6 +42,7 @@ tokio-util = { version = "0.7", features = ["codec"] }
# io
serde = { version = "1.0", optional = true }
humantime-serde = { version = "1.1", optional = true }
serde_json = { version = "1.0", optional = true }
# metrics
metrics = "0.20.1"
@ -96,5 +97,5 @@ serial_test = "0.10"
[features]
default = ["serde"]
serde = ["dep:serde", "dep:humantime-serde", "secp256k1/serde", "enr?/serde"]
serde = ["dep:serde", "dep:humantime-serde", "secp256k1/serde", "enr?/serde", "dep:serde_json"]
test-utils = ["reth-provider/test-utils", "dep:enr", "dep:ethers-core", "dep:tempfile"]

View File

@ -14,8 +14,9 @@ use reth_primitives::{ForkId, NodeRecord, PeerId};
use std::{
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
fmt::Display,
io,
io::{self, ErrorKind},
net::{IpAddr, SocketAddr},
path::Path,
task::{Context, Poll},
time::Duration,
};
@ -25,7 +26,7 @@ use tokio::{
time::{Instant, Interval},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, trace};
use tracing::{debug, info, trace};
/// A communication channel to the [`PeersManager`] to apply manual changes to the peer set.
#[derive(Clone, Debug)]
@ -63,6 +64,14 @@ impl PeersHandle {
rx.await.unwrap_or(None)
}
/// Returns all peers in the peerset.
pub async fn all_peers(&self) -> Vec<NodeRecord> {
let (tx, rx) = oneshot::channel();
self.send(PeerCommand::GetPeers(tx));
rx.await.unwrap_or_default()
}
}
/// Maintains the state of _all_ the peers known to the network.
@ -111,6 +120,7 @@ impl PeersManager {
backoff_durations,
trusted_nodes,
connect_trusted_nodes_only,
basic_nodes,
..
} = config;
let (manager_tx, handle_rx) = mpsc::unbounded_channel();
@ -119,12 +129,16 @@ impl PeersManager {
// We use half of the interval to decrease the max duration to `150%` in worst case
let unban_interval = ban_duration.min(backoff_durations.low) / 2;
let mut peers = HashMap::with_capacity(trusted_nodes.len());
let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len());
for NodeRecord { address, tcp_port, udp_port: _, id } in trusted_nodes {
peers.entry(id).or_insert_with(|| Peer::trusted(SocketAddr::from((address, tcp_port))));
}
for NodeRecord { address, tcp_port, udp_port: _, id } in basic_nodes {
peers.entry(id).or_insert_with(|| Peer::new(SocketAddr::from((address, tcp_port))));
}
Self {
peers,
manager_tx,
@ -623,6 +637,11 @@ impl PeersManager {
PeerCommand::GetPeer(peer, tx) => {
let _ = tx.send(self.peers.get(&peer).cloned());
}
PeerCommand::GetPeers(tx) => {
let _ = tx.send(
self.peers.iter().map(|(k, v)| NodeRecord::new(v.addr, *k)).collect(),
);
}
}
}
@ -876,6 +895,8 @@ pub(crate) enum PeerCommand {
ReputationChange(PeerId, ReputationChangeKind),
/// Get information about a peer
GetPeer(PeerId, oneshot::Sender<Option<Peer>>),
/// Get node information on all peers
GetPeers(oneshot::Sender<Vec<NodeRecord>>),
}
/// Actions the peer manager can trigger.
@ -921,6 +942,9 @@ pub struct PeersConfig {
pub trusted_nodes: HashSet<NodeRecord>,
/// Connect to trusted nodes only?
pub connect_trusted_nodes_only: bool,
/// Basic nodes to connect to.
#[cfg_attr(feature = "serde", serde(skip))]
pub basic_nodes: HashSet<NodeRecord>,
/// How long to ban bad peers.
#[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
pub ban_duration: Duration,
@ -948,6 +972,7 @@ impl Default for PeersConfig {
backoff_durations: Default::default(),
trusted_nodes: Default::default(),
connect_trusted_nodes_only: false,
basic_nodes: Default::default(),
}
}
}
@ -992,6 +1017,30 @@ impl PeersConfig {
self.connect_trusted_nodes_only = trusted_only;
self
}
/// Nodes available at launch.
pub fn with_basic_nodes(mut self, nodes: HashSet<NodeRecord>) -> Self {
self.basic_nodes = nodes;
self
}
/// Read from file nodes available at launch. Ignored if None.
pub fn with_basic_nodes_from_file(
self,
optional_file: Option<impl AsRef<Path>>,
) -> Result<Self, io::Error> {
let Some(file_path) = optional_file else {
return Ok(self)
};
let reader = match std::fs::File::open(file_path.as_ref()) {
Ok(file) => std::io::BufReader::new(file),
Err(e) if e.kind() == ErrorKind::NotFound => return Ok(self),
Err(e) => Err(e)?,
};
info!(target: "net::peers", file = %file_path.as_ref().display(), "Loading saved peers");
let nodes: HashSet<NodeRecord> = serde_json::from_reader(reader)?;
Ok(self.with_basic_nodes(nodes))
}
}
/// The durations to use when a backoff should be applied to a peer.

View File

@ -4,7 +4,7 @@ mod manager;
mod reputation;
pub(crate) use manager::{InboundConnectionError, PeerAction, PeersManager};
pub use manager::{PeersConfig, PeersHandle};
pub use manager::{Peer, PeersConfig, PeersHandle};
pub use reputation::ReputationChangeWeights;
pub use reth_network_api::PeerKind;

View File

@ -1,5 +1,5 @@
//! Configuration files.
use std::sync::Arc;
use std::{path::PathBuf, sync::Arc};
use reth_db::database::Database;
use reth_discv4::Discv4Config;
@ -31,10 +31,13 @@ impl Config {
disable_discovery: bool,
bootnodes: Option<Vec<NodeRecord>>,
nat_resolution_method: reth_net_nat::NatResolver,
peers_file: Option<PathBuf>,
) -> NetworkConfig<ShareableDatabase<DB>> {
let peer_config = reth_network::PeersConfig::default()
.with_trusted_nodes(self.peers.trusted_nodes.clone())
.with_connect_trusted_nodes_only(self.peers.connect_trusted_nodes_only);
let peer_config = self
.peers
.clone()
.with_basic_nodes_from_file(peers_file)
.unwrap_or_else(|_| self.peers.clone());
let discv4 =
Discv4Config::builder().external_ip_resolver(Some(nat_resolution_method)).clone();
NetworkConfigBuilder::new(rng_secret_key())