feat: move metrics to its own crate (#9691)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Luca Provini
2024-07-25 17:47:46 +02:00
committed by GitHub
parent abdbc44c9b
commit 93ebdf292b
24 changed files with 711 additions and 462 deletions

View File

@ -35,7 +35,6 @@ reth-discv4.workspace = true
reth-discv5.workspace = true
reth-net-nat.workspace = true
reth-network-peers.workspace = true
reth-tasks.workspace = true
reth-consensus-common.workspace = true
reth-prune-types.workspace = true
reth-stages-types.workspace = true
@ -44,15 +43,6 @@ reth-stages-types.workspace = true
alloy-genesis.workspace = true
alloy-rpc-types-engine.workspace = true
# async
tokio.workspace = true
# metrics
reth-metrics.workspace = true
metrics.workspace = true
metrics-exporter-prometheus.workspace = true
metrics-process.workspace = true
metrics-util.workspace = true
# misc
eyre.workspace = true
@ -61,17 +51,13 @@ humantime.workspace = true
const_format.workspace = true
rand.workspace = true
derive_more.workspace = true
once_cell.workspace = true
# io
dirs-next = "2.0.0"
shellexpand.workspace = true
serde_json.workspace = true
# http/rpc
http.workspace = true
jsonrpsee.workspace = true
tower.workspace = true
# tracing
tracing.workspace = true
@ -86,15 +72,11 @@ secp256k1 = { workspace = true, features = [
# async
futures.workspace = true
[target.'cfg(unix)'.dependencies]
tikv-jemalloc-ctl = { version = "0.5.0", optional = true }
[target.'cfg(target_os = "linux")'.dependencies]
procfs = "0.16.0"
[dev-dependencies]
# test vectors generation
proptest.workspace = true
tokio.workspace = true
[features]
optimism = [
@ -105,7 +87,7 @@ optimism = [
"reth-rpc-eth-types/optimism",
]
jemalloc = ["dep:tikv-jemalloc-ctl"]
[build-dependencies]
vergen = { version = "8.0.0", features = ["build", "cargo", "git", "gitcl"] }

View File

@ -12,14 +12,9 @@ pub mod args;
pub mod cli;
pub mod dirs;
pub mod exit;
pub mod metrics;
pub mod node_config;
pub mod utils;
pub mod version;
// Re-export for backwards compatibility.
pub use metrics::prometheus_exporter;
/// Re-exported from `reth_primitives`.
pub mod primitives {
pub use reth_primitives::*;

View File

@ -1,4 +0,0 @@
//! Metrics utilities for the node.
pub mod prometheus_exporter;
pub mod version_metrics;

View File

@ -1,319 +0,0 @@
//! Prometheus exporter
use crate::metrics::version_metrics::VersionInfo;
use eyre::WrapErr;
use http::{header::CONTENT_TYPE, HeaderValue, Response};
use metrics::describe_gauge;
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use metrics_util::layers::{PrefixLayer, Stack};
use reth_db_api::database_metrics::DatabaseMetrics;
use reth_metrics::metrics::Unit;
use reth_provider::providers::StaticFileProvider;
use reth_tasks::TaskExecutor;
use std::{convert::Infallible, net::SocketAddr, sync::Arc};
pub(crate) trait Hook: Fn() + Send + Sync {}
impl<T: Fn() + Send + Sync> Hook for T {}
/// Installs Prometheus as the metrics recorder.
pub fn install_recorder() -> eyre::Result<PrometheusHandle> {
let recorder = PrometheusBuilder::new().build_recorder();
let handle = recorder.handle();
// Build metrics stack
Stack::new(recorder)
.push(PrefixLayer::new("reth"))
.install()
.wrap_err("Couldn't set metrics recorder.")?;
Ok(handle)
}
/// Serves Prometheus metrics over HTTP with hooks.
///
/// The hooks are called every time the metrics are requested at the given endpoint, and can be used
/// to record values for pull-style metrics, i.e. metrics that are not automatically updated.
pub(crate) async fn serve_with_hooks<F: Hook + 'static>(
listen_addr: SocketAddr,
handle: PrometheusHandle,
hooks: impl IntoIterator<Item = F>,
task_executor: TaskExecutor,
) -> eyre::Result<()> {
let hooks: Vec<_> = hooks.into_iter().collect();
// Start endpoint
start_endpoint(
listen_addr,
handle,
Arc::new(move || hooks.iter().for_each(|hook| hook())),
task_executor,
)
.await
.wrap_err("Could not start Prometheus endpoint")?;
Ok(())
}
/// Starts an endpoint at the given address to serve Prometheus metrics.
async fn start_endpoint<F: Hook + 'static>(
listen_addr: SocketAddr,
handle: PrometheusHandle,
hook: Arc<F>,
task_executor: TaskExecutor,
) -> eyre::Result<()> {
let listener =
tokio::net::TcpListener::bind(listen_addr).await.wrap_err("Could not bind to address")?;
task_executor.spawn_with_graceful_shutdown_signal(|mut signal| async move {
loop {
let io = tokio::select! {
_ = &mut signal => break,
io = listener.accept() => {
match io {
Ok((stream, _remote_addr)) => stream,
Err(err) => {
tracing::error!(%err, "failed to accept connection");
continue;
}
}
}
};
let handle = handle.clone();
let hook = hook.clone();
let service = tower::service_fn(move |_| {
(hook)();
let metrics = handle.render();
let mut response = Response::new(metrics);
response.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
async move { Ok::<_, Infallible>(response) }
});
let mut shutdown = signal.clone().ignore_guard();
tokio::task::spawn(async move {
if let Err(error) =
jsonrpsee::server::serve_with_graceful_shutdown(io, service, &mut shutdown)
.await
{
tracing::debug!(%error, "failed to serve request")
}
});
}
});
Ok(())
}
/// Serves Prometheus metrics over HTTP with database and process metrics.
pub async fn serve<Metrics>(
listen_addr: SocketAddr,
handle: PrometheusHandle,
db: Metrics,
static_file_provider: StaticFileProvider,
process: metrics_process::Collector,
task_executor: TaskExecutor,
) -> eyre::Result<()>
where
Metrics: DatabaseMetrics + 'static + Send + Sync,
{
let db_metrics_hook = move || db.report_metrics();
let static_file_metrics_hook = move || {
let _ = static_file_provider.report_metrics().map_err(
|error| tracing::error!(%error, "Failed to report static file provider metrics"),
);
};
// Clone `process` to move it into the hook and use the original `process` for describe below.
let cloned_process = process.clone();
let hooks: Vec<Box<dyn Hook<Output = ()>>> = vec![
Box::new(db_metrics_hook),
Box::new(static_file_metrics_hook),
Box::new(move || cloned_process.collect()),
Box::new(collect_memory_stats),
Box::new(collect_io_stats),
];
serve_with_hooks(listen_addr, handle, hooks, task_executor).await?;
// We describe the metrics after the recorder is installed, otherwise this information is not
// registered
describe_gauge!("db.table_size", Unit::Bytes, "The size of a database table (in bytes)");
describe_gauge!("db.table_pages", "The number of database pages for a table");
describe_gauge!("db.table_entries", "The number of entries for a table");
describe_gauge!("db.freelist", "The number of pages on the freelist");
describe_gauge!("db.page_size", Unit::Bytes, "The size of a database page (in bytes)");
describe_gauge!(
"db.timed_out_not_aborted_transactions",
"Number of timed out transactions that were not aborted by the user yet"
);
describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment");
describe_gauge!("static_files.segment_files", "The number of files for a static file segment");
describe_gauge!(
"static_files.segment_entries",
"The number of entries for a static file segment"
);
process.describe();
describe_memory_stats();
describe_io_stats();
VersionInfo::default().register_version_metrics();
Ok(())
}
#[cfg(all(feature = "jemalloc", unix))]
fn collect_memory_stats() {
use metrics::gauge;
use tikv_jemalloc_ctl::{epoch, stats};
use tracing::error;
if epoch::advance().map_err(|error| error!(%error, "Failed to advance jemalloc epoch")).is_err()
{
return
}
if let Ok(value) = stats::active::read()
.map_err(|error| error!(%error, "Failed to read jemalloc.stats.active"))
{
gauge!("jemalloc.active").set(value as f64);
}
if let Ok(value) = stats::allocated::read()
.map_err(|error| error!(%error, "Failed to read jemalloc.stats.allocated"))
{
gauge!("jemalloc.allocated").set(value as f64);
}
if let Ok(value) = stats::mapped::read()
.map_err(|error| error!(%error, "Failed to read jemalloc.stats.mapped"))
{
gauge!("jemalloc.mapped").set(value as f64);
}
if let Ok(value) = stats::metadata::read()
.map_err(|error| error!(%error, "Failed to read jemalloc.stats.metadata"))
{
gauge!("jemalloc.metadata").set(value as f64);
}
if let Ok(value) = stats::resident::read()
.map_err(|error| error!(%error, "Failed to read jemalloc.stats.resident"))
{
gauge!("jemalloc.resident").set(value as f64);
}
if let Ok(value) = stats::retained::read()
.map_err(|error| error!(%error, "Failed to read jemalloc.stats.retained"))
{
gauge!("jemalloc.retained").set(value as f64);
}
}
#[cfg(all(feature = "jemalloc", unix))]
fn describe_memory_stats() {
describe_gauge!(
"jemalloc.active",
Unit::Bytes,
"Total number of bytes in active pages allocated by the application"
);
describe_gauge!(
"jemalloc.allocated",
Unit::Bytes,
"Total number of bytes allocated by the application"
);
describe_gauge!(
"jemalloc.mapped",
Unit::Bytes,
"Total number of bytes in active extents mapped by the allocator"
);
describe_gauge!(
"jemalloc.metadata",
Unit::Bytes,
"Total number of bytes dedicated to jemalloc metadata"
);
describe_gauge!(
"jemalloc.resident",
Unit::Bytes,
"Total number of bytes in physically resident data pages mapped by the allocator"
);
describe_gauge!(
"jemalloc.retained",
Unit::Bytes,
"Total number of bytes in virtual memory mappings that were retained rather than \
being returned to the operating system via e.g. munmap(2)"
);
}
#[cfg(not(all(feature = "jemalloc", unix)))]
fn collect_memory_stats() {}
#[cfg(not(all(feature = "jemalloc", unix)))]
fn describe_memory_stats() {}
#[cfg(target_os = "linux")]
fn collect_io_stats() {
use metrics::counter;
use tracing::error;
let Ok(process) = procfs::process::Process::myself()
.map_err(|error| error!(%error, "Failed to get currently running process"))
else {
return
};
let Ok(io) = process.io().map_err(
|error| error!(%error, "Failed to get IO stats for the currently running process"),
) else {
return
};
counter!("io.rchar").absolute(io.rchar);
counter!("io.wchar").absolute(io.wchar);
counter!("io.syscr").absolute(io.syscr);
counter!("io.syscw").absolute(io.syscw);
counter!("io.read_bytes").absolute(io.read_bytes);
counter!("io.write_bytes").absolute(io.write_bytes);
counter!("io.cancelled_write_bytes").absolute(io.cancelled_write_bytes);
}
#[cfg(target_os = "linux")]
fn describe_io_stats() {
use metrics::describe_counter;
describe_counter!("io.rchar", "Characters read");
describe_counter!("io.wchar", "Characters written");
describe_counter!("io.syscr", "Read syscalls");
describe_counter!("io.syscw", "Write syscalls");
describe_counter!("io.read_bytes", Unit::Bytes, "Bytes read");
describe_counter!("io.write_bytes", Unit::Bytes, "Bytes written");
describe_counter!("io.cancelled_write_bytes", Unit::Bytes, "Cancelled write bytes");
}
#[cfg(not(target_os = "linux"))]
const fn collect_io_stats() {}
#[cfg(not(target_os = "linux"))]
const fn describe_io_stats() {}
#[cfg(test)]
mod tests {
use crate::node_config::PROMETHEUS_RECORDER_HANDLE;
// Dependencies using different version of the `metrics` crate (to be exact, 0.21 vs 0.22)
// may not be able to communicate with each other through the global recorder.
//
// This test ensures that `metrics-process` dependency plays well with the current
// `metrics-exporter-prometheus` dependency version.
#[test]
fn process_metrics() {
// initialize the lazy handle
let _ = &*PROMETHEUS_RECORDER_HANDLE;
let process = metrics_process::Collector::default();
process.describe();
process.collect();
let metrics = PROMETHEUS_RECORDER_HANDLE.render();
assert!(metrics.contains("process_cpu_seconds_total"), "{metrics:?}");
}
}

View File

@ -1,51 +0,0 @@
//! This exposes reth's version information over prometheus.
use crate::version::{BUILD_PROFILE_NAME, VERGEN_GIT_SHA};
use metrics::gauge;
/// Contains version information for the application.
#[derive(Debug, Clone)]
pub struct VersionInfo {
/// The version of the application.
pub version: &'static str,
/// The build timestamp of the application.
pub build_timestamp: &'static str,
/// The cargo features enabled for the build.
pub cargo_features: &'static str,
/// The Git SHA of the build.
pub git_sha: &'static str,
/// The target triple for the build.
pub target_triple: &'static str,
/// The build profile (e.g., debug or release).
pub build_profile: &'static str,
}
impl Default for VersionInfo {
fn default() -> Self {
Self {
version: env!("CARGO_PKG_VERSION"),
build_timestamp: env!("VERGEN_BUILD_TIMESTAMP"),
cargo_features: env!("VERGEN_CARGO_FEATURES"),
git_sha: VERGEN_GIT_SHA,
target_triple: env!("VERGEN_CARGO_TARGET_TRIPLE"),
build_profile: BUILD_PROFILE_NAME,
}
}
}
impl VersionInfo {
/// This exposes reth's version information over prometheus.
pub fn register_version_metrics(&self) {
let labels: [(&str, &str); 6] = [
("version", self.version),
("build_timestamp", self.build_timestamp),
("cargo_features", self.cargo_features),
("git_sha", self.git_sha),
("target_triple", self.target_triple),
("build_profile", self.build_profile),
];
let gauge = gauge!("info", &labels);
gauge.set(1)
}
}

View File

@ -6,39 +6,27 @@ use crate::{
PruningArgs, RpcServerArgs, TxPoolArgs,
},
dirs::{ChainPath, DataDirPath},
metrics::prometheus_exporter,
utils::get_single_header,
};
use metrics_exporter_prometheus::PrometheusHandle;
use once_cell::sync::Lazy;
use reth_chainspec::{ChainSpec, MAINNET};
use reth_config::config::PruneConfig;
use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
use reth_db_api::database::Database;
use reth_network_p2p::headers::client::HeadersClient;
use reth_primitives::{
revm_primitives::EnvKzgSettings, BlockHashOrNumber, BlockNumber, Head, SealedHeader, B256,
};
use reth_provider::{
providers::StaticFileProvider, BlockHashReader, HeaderProvider, ProviderFactory,
StageCheckpointReader,
};
use reth_provider::{BlockHashReader, HeaderProvider, ProviderFactory, StageCheckpointReader};
use reth_stages_types::StageId;
use reth_storage_errors::provider::ProviderResult;
use reth_tasks::TaskExecutor;
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
use tracing::*;
/// The default prometheus recorder handle. We use a global static to ensure that it is only
/// installed once.
pub static PROMETHEUS_RECORDER_HANDLE: Lazy<PrometheusHandle> =
Lazy::new(|| prometheus_exporter::install_recorder().unwrap());
/// This includes all necessary configuration to launch the node.
/// The individual configuration options can be overwritten before launching the node.
///
/// # Example
/// ```rust
/// # use reth_tasks::{TaskManager, TaskSpawner};
/// # use reth_node_core::{
/// # node_config::NodeConfig,
/// # args::RpcServerArgs,
@ -47,10 +35,6 @@ pub static PROMETHEUS_RECORDER_HANDLE: Lazy<PrometheusHandle> =
/// # use tokio::runtime::Handle;
///
/// async fn t() {
/// let handle = Handle::current();
/// let manager = TaskManager::new(handle);
/// let executor = manager.executor();
///
/// // create the builder
/// let builder = NodeConfig::default();
///
@ -66,7 +50,6 @@ pub static PROMETHEUS_RECORDER_HANDLE: Lazy<PrometheusHandle> =
///
/// # Example
/// ```rust
/// # use reth_tasks::{TaskManager, TaskSpawner};
/// # use reth_node_core::{
/// # node_config::NodeConfig,
/// # args::RpcServerArgs,
@ -75,10 +58,6 @@ pub static PROMETHEUS_RECORDER_HANDLE: Lazy<PrometheusHandle> =
/// # use tokio::runtime::Handle;
///
/// async fn t() {
/// let handle = Handle::current();
/// let manager = TaskManager::new(handle);
/// let executor = manager.executor();
///
/// // create the builder with a test database, using the `test` method
/// let builder = NodeConfig::test();
///
@ -284,38 +263,6 @@ impl NodeConfig {
Ok(EnvKzgSettings::Default)
}
/// Installs the prometheus recorder.
pub fn install_prometheus_recorder(&self) -> eyre::Result<PrometheusHandle> {
Ok(PROMETHEUS_RECORDER_HANDLE.clone())
}
/// Serves the prometheus endpoint over HTTP with the given database and prometheus handle.
pub async fn start_metrics_endpoint<Metrics>(
&self,
prometheus_handle: PrometheusHandle,
db: Metrics,
static_file_provider: StaticFileProvider,
task_executor: TaskExecutor,
) -> eyre::Result<()>
where
Metrics: DatabaseMetrics + 'static + Send + Sync,
{
if let Some(listen_addr) = self.metrics {
info!(target: "reth::cli", addr = %listen_addr, "Starting metrics endpoint");
prometheus_exporter::serve(
listen_addr,
prometheus_handle,
db,
static_file_provider,
metrics_process::Collector::default(),
task_executor,
)
.await?;
}
Ok(())
}
/// Fetches the head block from the database.
///
/// If the database is empty, returns the genesis block.

View File

@ -20,6 +20,12 @@ pub const VERGEN_GIT_SHA: &str = const_format::str_index!(VERGEN_GIT_SHA_LONG, .
/// The build timestamp.
pub const VERGEN_BUILD_TIMESTAMP: &str = env!("VERGEN_BUILD_TIMESTAMP");
/// The target triple.
pub const VERGEN_CARGO_TARGET_TRIPLE: &str = env!("VERGEN_CARGO_TARGET_TRIPLE");
/// The build features.
pub const VERGEN_CARGO_FEATURES: &str = env!("VERGEN_CARGO_FEATURES");
/// The short version information for reth.
///
/// - The latest version from Cargo.toml
@ -73,7 +79,8 @@ pub const LONG_VERSION: &str = const_format::concatcp!(
BUILD_PROFILE_NAME
);
pub(crate) const BUILD_PROFILE_NAME: &str = {
/// The build profile name.
pub const BUILD_PROFILE_NAME: &str = {
// Derived from https://stackoverflow.com/questions/73595435/how-to-get-profile-from-cargo-toml-in-build-rs-or-at-runtime
// We split on the path separator of the *host* machine, which may be different from
// `std::path::MAIN_SEPARATOR_STR`.