Mirror of https://github.com/hl-archive-node/nanoreth.git, synced 2025-12-06 10:59:55 +00:00

feat(cli): more reasonable log output (#784)

@@ -27,6 +27,7 @@ reth-cli-utils = { path = "../../crates/cli/utils" }
tracing = "0.1"
tracing-futures = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-appender = "0.2"

# io
fdlimit = "0.2.1"
@@ -47,6 +48,7 @@ eyre = "0.6.8"
clap = { version = "4.0", features = ["derive", "cargo"] }
thiserror = "1.0"
tokio = { version = "1.21", features = ["sync", "macros", "rt-multi-thread"] }
tokio-stream = "0.1"
futures = "0.3.25"
strum = "0.24.1"
tempfile = { version = "3.3.0" }

@@ -1,20 +1,21 @@
//! CLI definition and entrypoint to executable
use crate::{
    db,
    dirs::{LogsDir, PlatformPath},
    node, p2p, stage, test_eth_chain,
    utils::{reth_tracing, reth_tracing::BoxedLayer},
};
use clap::{ArgAction, Args, Parser, Subcommand};
use std::str::FromStr;
use tracing::{metadata::LevelFilter, Level, Subscriber};
use tracing_subscriber::{filter::Directive, registry::LookupSpan};

use crate::{db, node, p2p, stage, test_eth_chain};

use clap::{ArgAction, Parser, Subcommand};
use reth_cli_utils::reth_tracing::{self, TracingMode};
use tracing_subscriber::util::SubscriberInitExt;

/// main function that parses cli and runs command
/// Parse CLI options, set up logging and run the chosen command.
pub async fn run() -> eyre::Result<()> {
    let opt = Cli::parse();
    reth_tracing::build_subscriber(if opt.silent {
        TracingMode::Silent
    } else {
        TracingMode::from(opt.verbose)
    })
    .init();

    let (layer, _guard) = opt.logs.layer();
    reth_tracing::init(vec![layer, reth_tracing::stdout(opt.verbosity.directive())]);

    match opt.command {
        Commands::Node(command) => command.execute().await,

@@ -51,17 +52,94 @@ pub enum Commands {
}

#[derive(Parser)]
#[command(author, version="0.1", about="Reth binary", long_about = None)]
#[command(author, version = "0.1", about = "Reth", long_about = None)]
struct Cli {
    /// The command to run
    #[clap(subcommand)]
    command: Commands,

    /// Use verbose output
    #[clap(short, long, action = ArgAction::Count, global = true)]
    verbose: u8,
    #[clap(flatten)]
    logs: Logs,

    /// Silence all output
    #[clap(long, global = true)]
    silent: bool,
    #[clap(flatten)]
    verbosity: Verbosity,
}

#[derive(Args)]
#[command(next_help_heading = "Logging")]
struct Logs {
    /// The path to put log files in.
    #[arg(
        long = "log.directory",
        value_name = "PATH",
        global = true,
        default_value_t,
        conflicts_with = "journald"
    )]
    log_directory: PlatformPath<LogsDir>,

    /// Log events to journald.
    #[arg(long = "log.journald", global = true, conflicts_with = "log_directory")]
    journald: bool,

    /// The filter to use for logs written to the log file.
    #[arg(long = "log.filter", value_name = "FILTER", global = true, default_value = "debug")]
    filter: String,
}

impl Logs {
    /// Builds a tracing layer from the current log options.
    fn layer<S>(&self) -> (BoxedLayer<S>, Option<tracing_appender::non_blocking::WorkerGuard>)
    where
        S: Subscriber,
        for<'a> S: LookupSpan<'a>,
    {
        let directive = Directive::from_str(self.filter.as_str())
            .unwrap_or_else(|_| Directive::from_str("debug").unwrap());

        if self.journald {
            (reth_tracing::journald(directive).expect("Could not connect to journald"), None)
        } else {
            let (layer, guard) = reth_tracing::file(directive, &self.log_directory, "reth.log");
            (layer, Some(guard))
        }
    }
}

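`Logs::layer` follows the usual tracing-subscriber recipe: a filter `Directive` parsed from `--log.filter`, a non-blocking file writer whose `WorkerGuard` must outlive the subscriber, and a fmt layer on top. A minimal stand-alone sketch of the same pattern, built directly on `tracing-subscriber`/`tracing-appender` rather than the `reth_tracing` helpers (the log directory is made up):

use std::str::FromStr;
use tracing_subscriber::{
    filter::Directive, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter,
};

fn main() {
    // The same kind of directive string that `--log.filter` accepts.
    let directive = Directive::from_str("debug").expect("valid directive");

    // Non-blocking file writer. Dropping the guard stops the background writer,
    // which is why `Logs::layer` hands the `WorkerGuard` back to its caller.
    let appender = tracing_appender::rolling::never("/tmp/reth-example-logs", "reth.log");
    let (writer, _guard) = tracing_appender::non_blocking(appender);

    tracing_subscriber::registry()
        .with(EnvFilter::default().add_directive(directive))
        .with(tracing_subscriber::fmt::layer().with_ansi(false).with_writer(writer))
        .init();

    tracing::info!(target: "reth::cli", "hello from the file layer");
}
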
#[derive(Args)]
#[command(next_help_heading = "Display")]
struct Verbosity {
    /// Set the minimum log level.
    ///
    /// -v Errors
    /// -vv Warnings
    /// -vvv Info
    /// -vvvv Debug
    /// -vvvvv Traces (warning: very verbose!)
    #[clap(short, long, action = ArgAction::Count, global = true, default_value_t = 3, verbatim_doc_comment, help_heading = "Display")]
    verbosity: u8,

    /// Silence all log output.
    #[clap(long, alias = "silent", short = 'q', global = true, help_heading = "Display")]
    quiet: bool,
}

impl Verbosity {
    /// Get the corresponding [Directive] for the given verbosity, or none if the verbosity
    /// corresponds to silent.
    fn directive(&self) -> Directive {
        if self.quiet {
            LevelFilter::OFF.into()
        } else {
            let level = match self.verbosity - 1 {
                0 => Level::ERROR,
                1 => Level::WARN,
                2 => Level::INFO,
                3 => Level::DEBUG,
                _ => Level::TRACE,
            };

            format!("reth::cli={level}").parse().unwrap()
        }
    }
}

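For orientation: `-q` maps straight to `LevelFilter::OFF`, while the default count of 3 (`-vvv`) yields `Level::INFO`, so the stdout filter built in `run()` is effectively the directive `reth::cli=info`. A tiny check that this directive shape parses, assuming only the `tracing-subscriber` crate:

use tracing_subscriber::filter::Directive;

fn main() {
    // Same string shape that `Verbosity::directive` produces for the default -vvv.
    let directive: Directive = "reth::cli=info".parse().expect("valid directive");
    println!("{directive:?}");
}
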
@@ -1,5 +1,5 @@
//! Database debugging tool
use crate::dirs::DbPath;
use crate::dirs::{DbPath, PlatformPath};
use clap::{Parser, Subcommand};
use eyre::{Result, WrapErr};
use reth_db::{
@@ -24,7 +24,7 @@ pub struct Command {
    /// - Windows: `{FOLDERID_RoamingAppData}/reth/db`
    /// - macOS: `$HOME/Library/Application Support/reth/db`
    #[arg(long, value_name = "PATH", verbatim_doc_comment, default_value_t)]
    db: DbPath,
    db: PlatformPath<DbPath>,

    #[clap(subcommand)]
    command: Subcommands,
@@ -98,6 +98,7 @@ impl Command {
                let num_pages = leaf_pages + branch_pages + overflow_pages;
                let table_size = page_size * num_pages;
                info!(
                    target: "reth::cli",
                    "Table {} has {} entries (total size: {} KB)",
                    table,
                    stats.entries(),
@@ -135,7 +136,7 @@ impl<'a, DB: Database> DbTool<'a, DB> {

    /// Seeds the database with some random data, only used for testing
    fn seed(&mut self, len: u64) -> Result<()> {
        info!("Generating random block range from 0 to {len}");
        info!(target: "reth::cli", "Generating random block range from 0 to {len}");
        let chain = random_block_range(0..len, Default::default(), 0..64);

        self.db.update(|tx| {
@@ -145,7 +146,7 @@ impl<'a, DB: Database> DbTool<'a, DB> {
            })
        })??;

        info!("Database seeded with {len} blocks");
        info!(target: "reth::cli", "Database seeded with {len} blocks");
        Ok(())
    }

@@ -158,7 +159,7 @@ impl<'a, DB: Database> DbTool<'a, DB> {
                    self.list_table::<tables::$table>($start, $len)?
                },)*
                _ => {
                    tracing::error!("Unknown table.");
                    tracing::error!(target: "reth::cli", "Unknown table.");
                    return Ok(())
                }
            }

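The change running through this file (and the rest of the commit) is the same: every CLI-facing log gets an explicit `reth::cli` target, and values move into structured fields where practical, so the stdout directive built from `--verbosity` can match on the target. A stand-alone sketch of the two styles (the path value is made up, and `tracing-subscriber`'s default fmt subscriber is assumed):

use tracing::info;

fn main() {
    tracing_subscriber::fmt::init();

    let db_path = "/tmp/reth-example/db"; // hypothetical value
    // Untargeted, fully formatted message (the old style)...
    info!("Opening database at {}", db_path);
    // ...versus an explicit target plus a structured field (the new style).
    info!(target: "reth::cli", path = %db_path, "Opening database");
}
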
@@ -28,67 +28,111 @@ pub fn config_dir() -> Option<PathBuf> {
    dirs_next::config_dir().map(|root| root.join("reth"))
}

/// A wrapper type that either parses a user-given path for the reth database or defaults to an
/// OS-specific path.
#[derive(Clone, Debug)]
pub struct DbPath(PathBuf);
/// Returns the path to the reth cache directory.
///
/// Refer to [dirs_next::cache_dir] for cross-platform behavior.
pub fn cache_dir() -> Option<PathBuf> {
    dirs_next::cache_dir().map(|root| root.join("reth"))
}

impl Display for DbPath {
/// Returns the path to the reth logs directory.
///
/// Refer to [dirs_next::cache_dir] for cross-platform behavior.
pub fn logs_dir() -> Option<PathBuf> {
    cache_dir().map(|root| root.join("logs"))
}

/// Returns the path to the reth database.
///
/// Refer to [dirs_next::data_dir] for cross-platform behavior.
#[derive(Default, Debug, Clone)]
#[non_exhaustive]
pub struct DbPath;

impl XdgPath for DbPath {
    fn resolve() -> Option<PathBuf> {
        database_path()
    }
}

/// Returns the path to the default reth configuration file.
///
/// Refer to [dirs_next::config_dir] for cross-platform behavior.
#[derive(Default, Debug, Clone)]
#[non_exhaustive]
pub struct ConfigPath;

impl XdgPath for ConfigPath {
    fn resolve() -> Option<PathBuf> {
        config_dir().map(|p| p.join("reth.toml"))
    }
}

/// Returns the path to the reth logs directory.
///
/// Refer to [dirs_next::cache_dir] for cross-platform behavior.
#[derive(Default, Debug, Clone)]
#[non_exhaustive]
pub struct LogsDir;

impl XdgPath for LogsDir {
    fn resolve() -> Option<PathBuf> {
        logs_dir()
    }
}

/// A small helper trait for unit structs that represent a standard path following the XDG
/// path specification.
trait XdgPath {
    fn resolve() -> Option<PathBuf>;
}

/// A wrapper type that either parses a user-given path or defaults to an
/// OS-specific path.
///
/// The [FromStr] implementation supports shell expansions and common patterns such as `~` for the
/// home directory.
///
/// # Example
///
/// ```
/// use reth::dirs::{PlatformPath, DbPath};
/// use std::str::FromStr;
///
/// // Resolves to the platform-specific database path
/// let default: PlatformPath<DbPath> = PlatformPath::default();
/// // Resolves to `$(pwd)/my/path/to/db`
/// let custom: PlatformPath<DbPath> = PlatformPath::from_str("my/path/to/db").unwrap();
///
/// assert_ne!(default.as_ref(), custom.as_ref());
/// ```
#[derive(Clone, Debug)]
pub struct PlatformPath<D>(PathBuf, std::marker::PhantomData<D>);

impl<D> Display for PlatformPath<D> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0.display())
    }
}

impl Default for DbPath {
    fn default() -> Self {
        Self(database_path().expect("Could not determine default database path. Set one manually."))
    }
}

impl FromStr for DbPath {
    type Err = shellexpand::LookupError<VarError>;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(Self(parse_path(s)?))
    }
}

impl AsRef<Path> for DbPath {
    fn as_ref(&self) -> &Path {
        self.0.as_path()
    }
}

/// A wrapper type that either parses a user-given path for the reth config or defaults to an
/// OS-specific path.
#[derive(Clone, Debug)]
pub struct ConfigPath(PathBuf);

impl Display for ConfigPath {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0.display())
    }
}

impl Default for ConfigPath {
impl<D: XdgPath> Default for PlatformPath<D> {
    fn default() -> Self {
        Self(
            config_dir()
                .expect("Could not determine default database path. Set one manually.")
                .join("reth.toml"),
            D::resolve().expect("Could not resolve default path. Set one manually."),
            std::marker::PhantomData,
        )
    }
}

impl FromStr for ConfigPath {
impl<D> FromStr for PlatformPath<D> {
    type Err = shellexpand::LookupError<VarError>;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(Self(parse_path(s)?))
        Ok(Self(parse_path(s)?, std::marker::PhantomData))
    }
}

impl AsRef<Path> for ConfigPath {
impl<D> AsRef<Path> for PlatformPath<D> {
    fn as_ref(&self) -> &Path {
        self.0.as_path()
    }

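`PlatformPath<D>` slots into clap's `default_value_t` because it implements `Default` (through the `XdgPath` resolver), `Display`, and `FromStr`. A minimal sketch of that contract with a hypothetical `DataDir` type standing in for `PlatformPath<DbPath>` (no shell expansion or platform resolution, just the trait surface clap needs):

use clap::Parser;
use std::{fmt, path::PathBuf, str::FromStr};

/// Hypothetical stand-in for `PlatformPath<DbPath>`.
#[derive(Clone, Debug)]
struct DataDir(PathBuf);

impl Default for DataDir {
    fn default() -> Self {
        Self(PathBuf::from("/tmp/reth-example/db")) // platform resolution elided
    }
}

impl fmt::Display for DataDir {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0.display())
    }
}

impl FromStr for DataDir {
    type Err = std::convert::Infallible;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(Self(PathBuf::from(s))) // shell expansion elided
    }
}

#[derive(Parser)]
struct Cli {
    /// `default_value_t` needs `Default + Display`; parsing goes through `FromStr`.
    #[arg(long, value_name = "PATH", default_value_t)]
    db: DataDir,
}

fn main() {
    let cli = Cli::parse();
    println!("database path: {}", cli.db);
}
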
@@ -15,13 +15,14 @@ pub mod p2p;
pub mod prometheus_exporter;
pub mod stage;
pub mod test_eth_chain;

use clap::Parser;
pub use reth_cli_utils as utils;

use clap::Args;
use reth_primitives::NodeRecord;

#[derive(Debug, Parser)]
/// Parameters for configuring the network more granularly via CLI
#[derive(Debug, Args)]
#[command(next_help_heading = "Networking")]
struct NetworkOpts {
    /// Disable the discovery service.
    #[arg(short, long)]

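`NetworkOpts` switching from `#[derive(Parser)]` to `#[derive(Args)]` with `next_help_heading` is the standard clap pattern for an argument group that is flattened into a parent command; the heading groups its flags in `--help`. A small sketch with a hypothetical `disable_discovery` field (the real field name is not shown in this hunk):

use clap::{Args, Parser};

/// Argument groups embedded in a parent command derive `Args`, not `Parser`.
#[derive(Debug, Args)]
#[command(next_help_heading = "Networking")]
struct NetworkOpts {
    /// Disable the discovery service.
    #[arg(short, long)]
    disable_discovery: bool, // hypothetical field name
}

#[derive(Debug, Parser)]
struct Cli {
    #[command(flatten)]
    network: NetworkOpts,
}

fn main() {
    let cli = Cli::parse();
    println!("{:?}", cli.network);
}
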
@@ -3,34 +3,44 @@
//! Starts the client
use crate::{
    config::Config,
    dirs::{ConfigPath, DbPath},
    prometheus_exporter, NetworkOpts,
    dirs::{ConfigPath, DbPath, PlatformPath},
    prometheus_exporter,
    utils::{
        chainspec::{chain_spec_value_parser, ChainSpecification},
        init::{init_db, init_genesis},
        parse_socket_address,
    },
    NetworkOpts,
};
use clap::{crate_version, Parser};
use fdlimit::raise_fd_limit;
use reth_cli_utils::{
    chainspec::{chain_spec_value_parser, ChainSpecification},
    init::{init_db, init_genesis},
    parse_socket_address,
};
use futures::{stream::select as stream_select, Stream, StreamExt};
use reth_consensus::BeaconConsensus;
use reth_downloaders::{bodies, headers};
use reth_executor::Config as ExecutorConfig;
use reth_interfaces::consensus::ForkchoiceState;
use reth_primitives::H256;
use reth_network::NetworkEvent;
use reth_primitives::{BlockNumber, H256};
use reth_stages::{
    metrics::HeaderMetrics,
    stages::{
        bodies::BodyStage, execution::ExecutionStage, headers::HeaderStage,
        sender_recovery::SenderRecoveryStage, total_difficulty::TotalDifficultyStage,
    },
    PipelineEvent, StageId,
};
use std::{net::SocketAddr, sync::Arc};
use tracing::{debug, info};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::select;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, info, warn};

/// Start the client
/// Start the node
#[derive(Debug, Parser)]
pub struct Command {
    /// The path to the configuration file to use.
    #[arg(long, value_name = "FILE", verbatim_doc_comment, default_value_t)]
    config: PlatformPath<ConfigPath>,

    /// The path to the database folder.
    ///
    /// Defaults to the OS-specific data directory:
@@ -39,11 +49,7 @@ pub struct Command {
    /// - Windows: `{FOLDERID_RoamingAppData}/reth/db`
    /// - macOS: `$HOME/Library/Application Support/reth/db`
    #[arg(long, value_name = "PATH", verbatim_doc_comment, default_value_t)]
    db: DbPath,

    /// The path to the configuration file to use.
    #[arg(long, value_name = "FILE", verbatim_doc_comment, default_value_t)]
    config: ConfigPath,
    db: PlatformPath<DbPath>,

    /// The chain this node is running.
    ///
@@ -65,13 +71,13 @@ pub struct Command {
    /// Enable Prometheus metrics.
    ///
    /// The metrics will be served at the given interface and port.
    #[arg(long, value_name = "SOCKET", value_parser = parse_socket_address)]
    #[arg(long, value_name = "SOCKET", value_parser = parse_socket_address, help_heading = "Metrics")]
    metrics: Option<SocketAddr>,

    /// Set the chain tip manually for testing purposes.
    ///
    /// NOTE: This is a temporary flag
    #[arg(long = "debug.tip")]
    #[arg(long = "debug.tip", help_heading = "Debug")]
    tip: Option<H256>,

    #[clap(flatten)]
@@ -94,14 +100,14 @@ impl Command {
            });
        }

        info!("reth {} starting", crate_version!());
        info!(target: "reth::cli", "reth {} starting", crate_version!());

        info!("Opening database at {}", &self.db);
        info!(target: "reth::cli", path = %self.db, "Opening database");
        let db = Arc::new(init_db(&self.db)?);
        info!("Database open");
        info!(target: "reth::cli", "Database opened");

        if let Some(listen_addr) = self.metrics {
            info!("Starting metrics endpoint at {}", listen_addr);
            info!(target: "reth::cli", addr = %listen_addr, "Starting metrics endpoint");
            prometheus_exporter::initialize(listen_addr)?;
            HeaderMetrics::describe();
        }
@@ -115,14 +121,18 @@ impl Command {
            .start_network()
            .await?;

        info!(peer_id = ?network.peer_id(), local_addr = %network.local_addr(), "Started p2p networking");
        let (sender, receiver) = tokio::sync::mpsc::channel(64);
        tokio::spawn(handle_events(stream_select(
            network.event_listener().map(Into::into),
            ReceiverStream::new(receiver).map(Into::into),
        )));

        info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");

        // TODO: Are most of these Arcs unnecessary? For example, fetch client is completely
        // cloneable on its own
        // TODO: Remove magic numbers
        let fetch_client = Arc::new(network.fetch_client().await?);
        let mut pipeline = reth_stages::Pipeline::default()
            .with_sync_state_updater(network.clone())
            .with_channel(sender)
            .push(HeaderStage {
                downloader: headers::linear::LinearDownloadBuilder::default()
                    .batch_size(config.stages.headers.downloader_batch_size)
@@ -160,19 +170,102 @@ impl Command {
            });

        if let Some(tip) = self.tip {
            debug!("Tip manually set: {}", tip);
            debug!(target: "reth::cli", %tip, "Tip manually set");
            consensus.notify_fork_choice_state(ForkchoiceState {
                head_block_hash: tip,
                safe_block_hash: tip,
                finalized_block_hash: tip,
            })?;
        } else {
            warn!(target: "reth::cli", "No tip specified. reth cannot communicate with consensus clients, so a tip must manually be provided for the online stages with --debug.tip <HASH>.");
        }

        // Run pipeline
        info!("Starting pipeline");
        info!(target: "reth::cli", "Starting sync pipeline");
        pipeline.run(db.clone()).await?;

        info!("Finishing up");
        info!(target: "reth::cli", "Finishing up");
        Ok(())
    }
}

/// The current high-level state of the node.
#[derive(Default)]
struct NodeState {
    /// The number of connected peers.
    connected_peers: usize,
    /// The stage currently being executed.
    current_stage: Option<StageId>,
    /// The current checkpoint of the executing stage.
    current_checkpoint: BlockNumber,
}

/// A node event.
enum NodeEvent {
    /// A network event.
    Network(NetworkEvent),
    /// A sync pipeline event.
    Pipeline(PipelineEvent),
}

impl From<NetworkEvent> for NodeEvent {
    fn from(evt: NetworkEvent) -> NodeEvent {
        NodeEvent::Network(evt)
    }
}

impl From<PipelineEvent> for NodeEvent {
    fn from(evt: PipelineEvent) -> NodeEvent {
        NodeEvent::Pipeline(evt)
    }
}

/// Displays relevant information to the user from components of the node, and periodically
/// displays the high-level status of the node.
async fn handle_events(mut events: impl Stream<Item = NodeEvent> + Unpin) {
    let mut state = NodeState::default();

    let mut interval = tokio::time::interval(Duration::from_secs(30));
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    loop {
        select! {
            Some(event) = events.next() => {
                match event {
                    NodeEvent::Network(NetworkEvent::SessionEstablished { peer_id, status, .. }) => {
                        state.connected_peers += 1;
                        info!(target: "reth::cli", connected_peers = state.connected_peers, peer_id = %peer_id, best_block = %status.blockhash, "Peer connected");
                    },
                    NodeEvent::Network(NetworkEvent::SessionClosed { peer_id, reason }) => {
                        state.connected_peers -= 1;
                        let reason = reason.map(|s| s.to_string()).unwrap_or("None".to_string());
                        warn!(target: "reth::cli", connected_peers = state.connected_peers, peer_id = %peer_id, %reason, "Peer disconnected.");
                    },
                    NodeEvent::Pipeline(PipelineEvent::Running { stage_id, stage_progress }) => {
                        let notable = state.current_stage.is_none();
                        state.current_stage = Some(stage_id);
                        state.current_checkpoint = stage_progress.unwrap_or_default();

                        if notable {
                            info!(target: "reth::cli", stage = %stage_id, from = stage_progress, "Executing stage");
                        }
                    },
                    NodeEvent::Pipeline(PipelineEvent::Ran { stage_id, result }) => {
                        let notable = result.stage_progress > state.current_checkpoint;
                        state.current_checkpoint = result.stage_progress;
                        if result.done {
                            state.current_stage = None;
                            info!(target: "reth::cli", stage = %stage_id, checkpoint = result.stage_progress, "Stage finished executing");
                        } else if notable {
                            info!(target: "reth::cli", stage = %stage_id, checkpoint = result.stage_progress, "Stage committed progress");
                        }
                    }
                    _ => (),
                }
            },
            _ = interval.tick() => {
                let stage = state.current_stage.map(|id| id.to_string()).unwrap_or("None".to_string());
                info!(target: "reth::cli", connected_peers = state.connected_peers, %stage, checkpoint = state.current_checkpoint, "Status");
            }
        }
    }
}

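`handle_events` consumes a single merged stream: the network listener and the pipeline's channel are both mapped into `NodeEvent` and combined with `futures::stream::select`, while a 30-second tokio interval interleaves the periodic status line. A cut-down sketch of that shape with placeholder event types (assumes `tokio` with the `rt-multi-thread`, `macros`, `sync`, and `time` features, plus `futures` and `tokio-stream`):

use futures::{stream::select as stream_select, Stream, StreamExt};
use std::time::Duration;
use tokio::select;
use tokio_stream::wrappers::ReceiverStream;

#[derive(Debug)]
enum Event {
    Network(u64),
    Pipeline(u64),
}

/// Drains a merged event stream while printing a periodic status line,
/// mirroring the structure of `handle_events` above. Runs until Ctrl-C.
async fn drive(mut events: impl Stream<Item = Event> + Unpin) {
    let mut interval = tokio::time::interval(Duration::from_secs(30));
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    let mut seen = 0usize;
    loop {
        select! {
            Some(event) = events.next() => {
                seen += 1;
                println!("event: {event:?}");
            }
            _ = interval.tick() => {
                println!("status: {seen} events so far");
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (network_tx, network_rx) = tokio::sync::mpsc::channel(64);
    let (pipeline_tx, pipeline_rx) = tokio::sync::mpsc::channel(64);
    // Two producers feeding one merged consumer.
    tokio::spawn(async move { let _ = network_tx.send(1u64).await; });
    tokio::spawn(async move { let _ = pipeline_tx.send(2u64).await; });
    let merged = stream_select(
        ReceiverStream::new(network_rx).map(Event::Network),
        ReceiverStream::new(pipeline_rx).map(Event::Pipeline),
    );
    drive(merged).await;
}
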
@@ -1,10 +1,14 @@
//! P2P Debugging tool
use crate::{
    config::Config,
    dirs::{ConfigPath, PlatformPath},
    utils::{
        chainspec::{chain_spec_value_parser, ChainSpecification},
        hash_or_num_value_parser,
    },
};
use backon::{ConstantBackoff, Retryable};
use clap::{Parser, Subcommand};
use reth_cli_utils::{
    chainspec::{chain_spec_value_parser, ChainSpecification},
    hash_or_num_value_parser,
};
use reth_db::mdbx::{Env, EnvKind, WriteMap};
use reth_interfaces::p2p::{
    bodies::client::BodiesClient,
@@ -14,14 +18,12 @@ use reth_network::FetchClient;
use reth_primitives::{BlockHashOrNumber, Header, NodeRecord, SealedHeader};
use std::sync::Arc;

use crate::{config::Config, dirs::ConfigPath};

/// `reth p2p` command
#[derive(Debug, Parser)]
pub struct Command {
    /// The path to the configuration file to use.
    #[arg(long, value_name = "FILE", verbatim_doc_comment, default_value_t)]
    config: ConfigPath,
    config: PlatformPath<ConfigPath>,

    /// The chain this node is running.
    ///

@@ -3,12 +3,13 @@
//! Stage debugging tool
use crate::{
    config::Config,
    dirs::{ConfigPath, DbPath},
    prometheus_exporter, NetworkOpts,
};
use reth_cli_utils::{
    chainspec::{chain_spec_value_parser, ChainSpecification},
    init::{init_db, init_genesis},
    dirs::{ConfigPath, DbPath, PlatformPath},
    prometheus_exporter,
    utils::{
        chainspec::{chain_spec_value_parser, ChainSpecification},
        init::{init_db, init_genesis},
    },
    NetworkOpts,
};
use reth_consensus::BeaconConsensus;
use reth_downloaders::bodies::concurrent::ConcurrentDownloader;
@@ -36,11 +37,11 @@ pub struct Command {
    /// - Windows: `{FOLDERID_RoamingAppData}/reth/db`
    /// - macOS: `$HOME/Library/Application Support/reth/db`
    #[arg(long, value_name = "PATH", verbatim_doc_comment, default_value_t)]
    db: DbPath,
    db: PlatformPath<DbPath>,

    /// The path to the configuration file to use.
    #[arg(long, value_name = "FILE", verbatim_doc_comment, default_value_t)]
    config: ConfigPath,
    config: PlatformPath<ConfigPath>,

    /// The chain this node is running.
    ///
@@ -108,13 +109,13 @@ impl Command {
        fdlimit::raise_fd_limit();

        if let Some(listen_addr) = self.metrics {
            info!("Starting metrics endpoint at {}", listen_addr);
            info!(target: "reth::cli", "Starting metrics endpoint at {}", listen_addr);
            prometheus_exporter::initialize(listen_addr)?;
            HeaderMetrics::describe();
        }

        let config: Config = confy::load_path(&self.config).unwrap_or_default();
        info!("reth {} starting stage {:?}", clap::crate_version!(), self.stage);
        info!(target: "reth::cli", "reth {} starting stage {:?}", clap::crate_version!(), self.stage);

        let input = ExecInput {
            previous_stage: Some((StageId("No Previous Stage"), self.to)),

@@ -38,12 +38,12 @@ impl Command {
                }
                Err(error) => {
                    num_of_failed += 1;
                    error!("Test {file:?} failed:\n {error}\n");
                    error!(target: "reth::cli", "Test {file:?} failed:\n{error}");
                }
            }
        }

        info!("\nPASSED {num_of_passed}/{} tests\n", num_of_passed + num_of_failed);
        info!(target: "reth::cli", "{num_of_passed}/{} tests passed\n", num_of_passed + num_of_failed);

        if num_of_failed != 0 {
            Err(eyre!("Failed {num_of_failed} tests"))

@@ -21,7 +21,7 @@ use std::{
    ffi::OsStr,
    path::{Path, PathBuf},
};
use tracing::{debug, info};
use tracing::{debug, info, trace};

/// Tests are test edge cases that are not possible to happen on mainnet, so we are skipping them.
pub fn should_skip(path: &Path) -> bool {
@@ -80,7 +80,7 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<()> {
    if should_skip(path) {
        return Ok(())
    }
    info!("Executing test from path: {path:?}");
    info!(target: "reth::cli", ?path, "Running test suite");

    for (name, suite) in suites.0 {
        if matches!(
@@ -101,7 +101,7 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<()> {

        let pre_state = suite.pre.0;

        debug!("Executing {:?} spec: {:?}", name, suite.network);
        debug!(target: "reth::cli", name, network = ?suite.network, "Running test");

        let spec_upgrades: SpecUpgrades = suite.network.into();
        // if paris aka merge is not activated we dont have block rewards;
@@ -139,7 +139,7 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<()> {
                tx.put::<tables::Bytecodes>(code_hash, account.code.to_vec())?;
            }
            account.storage.iter().try_for_each(|(k, v)| {
                tracing::trace!("Update storage: {address} key:{:?} val:{:?}", k.0, v.0);
                trace!(target: "reth::cli", ?address, key = ?k.0, value = ?v.0, "Update storage");
                tx.put::<tables::PlainStorageState>(
                    address,
                    StorageEntry { key: H256::from_slice(&k.0.to_be_bytes::<32>()), value: v.0 },
@@ -164,7 +164,7 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<()> {
            map
        }))
    })??;
    tracing::trace!("Pre state :{:?}", storage);
    trace!(target: "reth::cli", ?storage, "Pre-state");

    // Initialize the execution stage
    // Hardcode the chain_id to Ethereum 1.
@@ -189,7 +189,7 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<()> {
        // Validate post state
        match suite.post_state {
            Some(RootOrState::Root(root)) => {
                info!("Post state is root: #{root:?}")
                info!(target: "reth::cli", "Post-state root: #{root:?}")
            }
            Some(RootOrState::State(state)) => db.view(|tx| -> eyre::Result<()> {
                let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
@@ -269,7 +269,7 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<()> {
            }
            Ok(())
        })??,
        None => info!("Post state is none"),
        None => info!(target: "reth::cli", "No post-state"),
    }
}
Ok(())