refactor: move cli trace helpers to reth-tracing (#820)

This commit is contained in:
Bjerg
2023-01-11 19:20:37 +01:00
committed by GitHub
parent 8f97540bd9
commit 20ee1bb1c9
14 changed files with 142 additions and 131 deletions

10
Cargo.lock generated
View File

@ -3602,6 +3602,7 @@ dependencies = [
"reth-provider",
"reth-rlp",
"reth-stages",
"reth-tracing",
"reth-transaction-pool",
"serde",
"serde_json",
@ -3612,9 +3613,6 @@ dependencies = [
"tokio",
"tokio-stream",
"tracing",
"tracing-appender",
"tracing-futures",
"tracing-subscriber",
"walkdir",
]
@ -3630,9 +3628,6 @@ dependencies = [
"serde_json",
"shellexpand",
"tracing",
"tracing-appender",
"tracing-journald",
"tracing-subscriber",
"walkdir",
]
@ -4193,6 +4188,9 @@ name = "reth-tracing"
version = "0.1.0"
dependencies = [
"tracing",
"tracing-appender",
"tracing-futures",
"tracing-journald",
"tracing-subscriber",
]

View File

@ -22,12 +22,10 @@ reth-rlp = { path = "../../crates/common/rlp" }
reth-network = {path = "../../crates/net/network", features = ["serde"] }
reth-downloaders = {path = "../../crates/net/downloaders" }
reth-cli-utils = { path = "../../crates/cli/utils" }
reth-tracing = { path = "../../crates/tracing" }
# tracing
tracing = "0.1"
tracing-futures = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-appender = "0.2"
# io
fdlimit = "0.2.1"

View File

@ -3,12 +3,14 @@ use crate::{
db,
dirs::{LogsDir, PlatformPath},
node, p2p, stage, test_eth_chain,
utils::{reth_tracing, reth_tracing::BoxedLayer},
};
use clap::{ArgAction, Args, Parser, Subcommand};
use reth_tracing::{
tracing::{metadata::LevelFilter, Level, Subscriber},
tracing_subscriber::{filter::Directive, registry::LookupSpan},
BoxedLayer, FileWorkerGuard,
};
use std::str::FromStr;
use tracing::{metadata::LevelFilter, Level, Subscriber};
use tracing_subscriber::{filter::Directive, registry::LookupSpan};
/// Parse CLI options, set up logging and run the chosen command.
pub async fn run() -> eyre::Result<()> {
@ -89,7 +91,7 @@ struct Logs {
impl Logs {
/// Builds a tracing layer from the current log options.
fn layer<S>(&self) -> (BoxedLayer<S>, Option<tracing_appender::non_blocking::WorkerGuard>)
fn layer<S>(&self) -> (BoxedLayer<S>, Option<FileWorkerGuard>)
where
S: Subscriber,
for<'a> S: LookupSpan<'a>,

View File

@ -21,6 +21,3 @@ walkdir = "2.3"
# Tracing
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-appender = "0.2"
tracing-journald = "0.3"

View File

@ -71,89 +71,6 @@ pub fn parse_socket_address(value: &str) -> Result<SocketAddr, eyre::Error> {
.ok_or_else(|| eyre::eyre!("Could not parse socket address from {}", value))
}
/// Tracing utility
pub mod reth_tracing {
use std::path::Path;
use tracing::Subscriber;
use tracing_subscriber::{
filter::Directive, prelude::*, registry::LookupSpan, EnvFilter, Layer, Registry,
};
/// A boxed tracing [Layer].
pub type BoxedLayer<S> = Box<dyn Layer<S> + Send + Sync>;
/// Initializes a new [Subscriber] based on the given layers.
pub fn init(layers: Vec<BoxedLayer<Registry>>) {
tracing_subscriber::registry().with(layers).init();
}
/// Builds a new tracing layer that writes to stdout.
///
/// The events are filtered by `default_directive`, unless overriden by `RUST_LOG`.
///
/// Colors can be disabled with `RUST_LOG_STYLE=never`, and event targets can be displayed with
/// `RUST_LOG_TARGET=1`.
pub fn stdout<S>(default_directive: impl Into<Directive>) -> BoxedLayer<S>
where
S: Subscriber,
for<'a> S: LookupSpan<'a>,
{
// TODO: Auto-detect
let with_ansi = std::env::var("RUST_LOG_STYLE").map(|val| val != "never").unwrap_or(true);
let with_target = std::env::var("RUST_LOG_TARGET").map(|val| val != "0").unwrap_or(false);
let filter =
EnvFilter::builder().with_default_directive(default_directive.into()).from_env_lossy();
tracing_subscriber::fmt::layer()
.with_ansi(with_ansi)
.with_target(with_target)
.with_filter(filter)
.boxed()
}
/// Builds a new tracing layer that appends to a log file.
///
/// The events are filtered by `directive`.
///
/// The boxed layer and a guard is returned. When the guard is dropped the buffer for the log
/// file is immediately flushed to disk. Any events after the guard is dropped may be missed.
pub fn file<S>(
directive: impl Into<Directive>,
dir: impl AsRef<Path>,
file_name: impl AsRef<Path>,
) -> (BoxedLayer<S>, tracing_appender::non_blocking::WorkerGuard)
where
S: Subscriber,
for<'a> S: LookupSpan<'a>,
{
let (writer, guard) =
tracing_appender::non_blocking(tracing_appender::rolling::never(dir, file_name));
let layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(writer)
.with_filter(EnvFilter::default().add_directive(directive.into()))
.boxed();
(layer, guard)
}
/// Builds a new tracing layer that writes events to journald.
///
/// The events are filtered by `directive`.
///
/// If the layer cannot connect to journald for any reason this function will return an error.
pub fn journald<S>(directive: impl Into<Directive>) -> std::io::Result<BoxedLayer<S>>
where
S: Subscriber,
for<'a> S: LookupSpan<'a>,
{
Ok(tracing_journald::layer()?
.with_filter(EnvFilter::default().add_directive(directive.into()))
.boxed())
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -1887,7 +1887,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
#[ignore]
async fn test_lookup() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
let all_nodes = mainnet_nodes();
@ -1922,7 +1922,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn test_mapped_ipv4() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
let mut rng = thread_rng();
let config = Discv4Config::builder().build();
let (_discv4, mut service) = create_discv4_with_config(config).await;
@ -1954,7 +1954,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn test_single_lookups() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
let config = Discv4Config::builder().build();
let (_discv4, mut service) = create_discv4_with_config(config).await;
@ -1988,7 +1988,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn test_no_local_in_closest() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
let config = Discv4Config::builder().build();
let (_discv4, mut service) = create_discv4_with_config(config).await;
@ -2021,7 +2021,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn test_random_lookup() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
let config = Discv4Config::builder().build();
let (_discv4, mut service) = create_discv4_with_config(config).await;
@ -2055,7 +2055,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn test_service_commands() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
let config = Discv4Config::builder().build();
let (discv4, mut service) = create_discv4_with_config(config).await;

View File

@ -291,7 +291,7 @@ mod tests {
/// messages and we check the actual service receives answers
#[tokio::test(flavor = "multi_thread")]
async fn can_mock_discovery() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
let mut rng = thread_rng();
let (_, mut service) = create_discv4().await;

View File

@ -769,7 +769,7 @@ mod tests {
#[tokio::test]
async fn test_can_disconnect() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();

View File

@ -197,7 +197,7 @@ mod tests {
#[tokio::test]
#[ignore]
async fn get_external_ip() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
let ip = external_ip().await;
dbg!(ip);
}
@ -205,7 +205,7 @@ mod tests {
#[tokio::test]
#[ignore]
async fn get_external_ip_interval() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
let mut interval = ResolveNatInterval::interval(Default::default(), Duration::from_secs(5));
let ip = interval.tick().await;

View File

@ -566,7 +566,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn test_ignored_tx_broadcasts_while_syncing() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
let secret_key = SecretKey::new(&mut rand::thread_rng());

View File

@ -34,7 +34,7 @@ fn enr_to_peer_id(enr: Enr<SigningKey>) -> PeerId {
#[tokio::test(flavor = "multi_thread")]
async fn test_establish_connections() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
for _ in 0..10 {
let net = Testnet::create(3).await;
@ -94,7 +94,7 @@ async fn test_establish_connections() {
#[tokio::test(flavor = "multi_thread")]
async fn test_already_connected() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
let mut net = Testnet::default();
let secret_key = SecretKey::new(&mut rand::thread_rng());
@ -138,7 +138,7 @@ async fn test_already_connected() {
#[tokio::test(flavor = "multi_thread")]
async fn test_get_peer() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
let mut net = Testnet::default();
let secret_key = SecretKey::new(&mut rand::thread_rng());
@ -173,7 +173,7 @@ async fn test_get_peer() {
#[tokio::test(flavor = "multi_thread")]
async fn test_get_peer_by_id() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
let mut net = Testnet::default();
let secret_key = SecretKey::new(&mut rand::thread_rng());
@ -208,7 +208,7 @@ async fn test_get_peer_by_id() {
#[tokio::test(flavor = "multi_thread")]
#[ignore]
async fn test_connect_with_boot_nodes() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
let secret_key = SecretKey::new(&mut rand::thread_rng());
let mut discv4 = Discv4Config::builder();
discv4.add_boot_nodes(mainnet_nodes());
@ -230,7 +230,7 @@ async fn test_connect_with_boot_nodes() {
#[tokio::test(flavor = "multi_thread")]
#[ignore]
async fn test_connect_with_builder() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
let secret_key = SecretKey::new(&mut rand::thread_rng());
let mut discv4 = Discv4Config::builder();
discv4.add_boot_nodes(mainnet_nodes());
@ -267,7 +267,7 @@ async fn test_connect_with_builder() {
#[tokio::test(flavor = "multi_thread")]
#[ignore]
async fn test_connect_to_trusted_peer() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
let secret_key = SecretKey::new(&mut rand::thread_rng());
let discv4 = Discv4Config::builder();
@ -320,7 +320,7 @@ async fn test_connect_to_trusted_peer() {
#[tokio::test(flavor = "multi_thread")]
async fn test_incoming_node_id_blacklist() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
tokio::time::timeout(GETH_TIMEOUT, async move {
let secret_key = SecretKey::new(&mut rand::thread_rng());
@ -373,7 +373,7 @@ async fn test_incoming_node_id_blacklist() {
#[tokio::test(flavor = "multi_thread")]
#[serial_test::serial]
async fn test_incoming_connect_with_single_geth() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
tokio::time::timeout(GETH_TIMEOUT, async move {
let secret_key = SecretKey::new(&mut rand::thread_rng());
@ -417,7 +417,7 @@ async fn test_incoming_connect_with_single_geth() {
#[tokio::test(flavor = "multi_thread")]
#[serial_test::serial]
async fn test_outgoing_connect_with_single_geth() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
tokio::time::timeout(GETH_TIMEOUT, async move {
let secret_key = SecretKey::new(&mut rand::thread_rng());
@ -464,7 +464,7 @@ async fn test_outgoing_connect_with_single_geth() {
#[tokio::test(flavor = "multi_thread")]
#[serial_test::serial]
async fn test_geth_disconnect() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
tokio::time::timeout(GETH_TIMEOUT, async move {
let secret_key = SecretKey::new(&mut rand::thread_rng());

View File

@ -34,7 +34,7 @@ pub fn rng_transaction(rng: &mut impl rand::RngCore) -> TransactionSigned {
#[tokio::test(flavor = "multi_thread")]
async fn test_get_body() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
let mut rng = rand::thread_rng();
let mock_provider = Arc::new(MockEthProvider::default());
@ -77,7 +77,7 @@ async fn test_get_body() {
#[tokio::test(flavor = "multi_thread")]
async fn test_get_header() {
reth_tracing::init_tracing();
reth_tracing::init_test_tracing();
let mut rng = rand::thread_rng();
let mock_provider = Arc::new(MockEthProvider::default());

View File

@ -9,4 +9,7 @@ description = "tracing helpers"
[dependencies]
tracing = { version = "0.1", default-features = false }
tracing-subscriber = { version = "0.3", default-features = false, features = ["env-filter", "fmt"] }
tracing-subscriber = { version = "0.3", default-features = false, features = ["env-filter", "fmt"] }
tracing-futures = "0.2"
tracing-appender = "0.2"
tracing-journald = "0.3"

View File

@ -4,19 +4,115 @@
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
//! reth tracing subscribers and utilities.
//!
//! Contains a standardized set of layers:
//!
//! - [`stdout()`]
//! - [`file()`]
//! - [`journald()`]
//!
//! As well as a simple way to initialize a subscriber: [`init`].
use std::path::Path;
use tracing::Subscriber;
use tracing_subscriber::{
filter::Directive, prelude::*, registry::LookupSpan, EnvFilter, Layer, Registry,
};
//! reth-tracing
// re-export tracing crates.
// Re-export tracing crates
pub use tracing;
pub use tracing_subscriber;
/// Initialises a tracing subscriber via `RUST_LOG` environment variable filter.
/// A boxed tracing [Layer].
pub type BoxedLayer<S> = Box<dyn Layer<S> + Send + Sync>;
/// Initializes a new [Subscriber] based on the given layers.
pub fn init(layers: Vec<BoxedLayer<Registry>>) {
tracing_subscriber::registry().with(layers).init();
}
/// Builds a new tracing layer that writes to stdout.
///
/// Note: This ignores any error and should be used for testing.
pub fn init_tracing() {
/// The events are filtered by `default_directive`, unless overriden by `RUST_LOG`.
///
/// Colors can be disabled with `RUST_LOG_STYLE=never`, and event targets can be displayed with
/// `RUST_LOG_TARGET=1`.
pub fn stdout<S>(default_directive: impl Into<Directive>) -> BoxedLayer<S>
where
S: Subscriber,
for<'a> S: LookupSpan<'a>,
{
// TODO: Auto-detect
let with_ansi = std::env::var("RUST_LOG_STYLE").map(|val| val != "never").unwrap_or(true);
let with_target = std::env::var("RUST_LOG_TARGET").map(|val| val != "0").unwrap_or(false);
let filter =
EnvFilter::builder().with_default_directive(default_directive.into()).from_env_lossy();
tracing_subscriber::fmt::layer()
.with_ansi(with_ansi)
.with_target(with_target)
.with_filter(filter)
.boxed()
}
/// Builds a new tracing layer that appends to a log file.
///
/// The events are filtered by `directive`.
///
/// The boxed layer and a guard is returned. When the guard is dropped the buffer for the log
/// file is immediately flushed to disk. Any events after the guard is dropped may be missed.
pub fn file<S>(
directive: impl Into<Directive>,
dir: impl AsRef<Path>,
file_name: impl AsRef<Path>,
) -> (BoxedLayer<S>, tracing_appender::non_blocking::WorkerGuard)
where
S: Subscriber,
for<'a> S: LookupSpan<'a>,
{
let (writer, guard) =
tracing_appender::non_blocking(tracing_appender::rolling::never(dir, file_name));
let layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(writer)
.with_filter(EnvFilter::default().add_directive(directive.into()))
.boxed();
(layer, guard)
}
/// A worker guard returned by [`file()`].
///
/// When a guard is dropped, all events currently in-memory are flushed to the log file this guard
/// belongs to.
pub type FileWorkerGuard = tracing_appender::non_blocking::WorkerGuard;
/// Builds a new tracing layer that writes events to journald.
///
/// The events are filtered by `directive`.
///
/// If the layer cannot connect to journald for any reason this function will return an error.
pub fn journald<S>(directive: impl Into<Directive>) -> std::io::Result<BoxedLayer<S>>
where
S: Subscriber,
for<'a> S: LookupSpan<'a>,
{
Ok(tracing_journald::layer()?
.with_filter(EnvFilter::default().add_directive(directive.into()))
.boxed())
}
/// Initializes a tracing subscriber for tests.
///
/// The filter is configurable via `RUST_LOG`.
///
/// # Note
///
/// The subscriber will silently fail if it could not be installed.
pub fn init_test_tracing() {
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_env_filter(EnvFilter::from_default_env())
.with_writer(std::io::stderr)
.try_init();
}