mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat: database metrics (#1870)
This commit is contained in:
@ -42,22 +42,30 @@ shellexpand = "3.0.0"
|
||||
dirs-next = "2.0.0"
|
||||
confy = "0.5"
|
||||
|
||||
# rpc/metrics
|
||||
metrics-exporter-prometheus = { version = "0.11.0", features = ["http-listener"] }
|
||||
# metrics
|
||||
metrics = "0.20.1"
|
||||
metrics-exporter-prometheus = "0.11.0"
|
||||
metrics-util = "0.14.0"
|
||||
|
||||
# test vectors generation
|
||||
proptest = "1.0"
|
||||
|
||||
# misc
|
||||
eyre = "0.6.8"
|
||||
clap = { version = "4", features = ["derive", "cargo"] }
|
||||
tokio = { version = "1.21", features = ["sync", "macros", "rt-multi-thread"] }
|
||||
futures = "0.3.25"
|
||||
tempfile = { version = "3.3.0" }
|
||||
backon = "0.4"
|
||||
# tui
|
||||
comfy-table = "6.1.4"
|
||||
crossterm = "0.25.0"
|
||||
tui = "0.19.0"
|
||||
jsonrpsee = { version = "0.16", features = ["server"] }
|
||||
human_bytes = "0.4.1"
|
||||
|
||||
# async
|
||||
tokio = { version = "1.21", features = ["sync", "macros", "rt-multi-thread"] }
|
||||
futures = "0.3.25"
|
||||
|
||||
# http/rpc
|
||||
hyper = "0.14.25"
|
||||
jsonrpsee = { version = "0.16", features = ["server"] }
|
||||
|
||||
# misc
|
||||
eyre = "0.6.8"
|
||||
clap = { version = "4", features = ["derive", "cargo"] }
|
||||
tempfile = { version = "3.3.0" }
|
||||
backon = "0.4"
|
||||
|
||||
@ -161,7 +161,6 @@ pub struct Command {
|
||||
|
||||
impl Command {
|
||||
/// Execute `node` command
|
||||
// TODO: RPC
|
||||
pub async fn execute(self, ctx: CliContext) -> eyre::Result<()> {
|
||||
info!(target: "reth::cli", "reth {} starting", crate_version!());
|
||||
|
||||
@ -177,7 +176,7 @@ impl Command {
|
||||
let shareable_db = ShareableDatabase::new(Arc::clone(&db), Arc::clone(&self.chain));
|
||||
info!(target: "reth::cli", "Database opened");
|
||||
|
||||
self.start_metrics_endpoint()?;
|
||||
self.start_metrics_endpoint(Arc::clone(&db)).await?;
|
||||
|
||||
debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis");
|
||||
|
||||
@ -338,13 +337,14 @@ impl Command {
|
||||
}
|
||||
}
|
||||
|
||||
fn start_metrics_endpoint(&self) -> eyre::Result<()> {
|
||||
async fn start_metrics_endpoint(&self, db: Arc<Env<WriteMap>>) -> eyre::Result<()> {
|
||||
if let Some(listen_addr) = self.metrics {
|
||||
info!(target: "reth::cli", addr = %listen_addr, "Starting metrics endpoint");
|
||||
prometheus_exporter::initialize(listen_addr)
|
||||
} else {
|
||||
Ok(())
|
||||
|
||||
prometheus_exporter::initialize_with_db_metrics(listen_addr, db).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn init_engine_api(
|
||||
|
||||
@ -1,16 +1,36 @@
|
||||
//! Prometheus exporter
|
||||
|
||||
use eyre::WrapErr;
|
||||
use metrics_exporter_prometheus::PrometheusBuilder;
|
||||
use hyper::{
|
||||
service::{make_service_fn, service_fn},
|
||||
Body, Request, Response, Server,
|
||||
};
|
||||
use metrics::Unit;
|
||||
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
|
||||
use metrics_util::layers::{PrefixLayer, Stack};
|
||||
use std::net::SocketAddr;
|
||||
use reth_db::{
|
||||
database::Database,
|
||||
mdbx::{Env, WriteMap},
|
||||
tables,
|
||||
};
|
||||
use std::{convert::Infallible, net::SocketAddr, sync::Arc};
|
||||
|
||||
pub(crate) fn initialize(listen_addr: SocketAddr) -> eyre::Result<()> {
|
||||
let (recorder, exporter) = PrometheusBuilder::new()
|
||||
.with_http_listener(listen_addr)
|
||||
.build()
|
||||
.wrap_err("Could not build Prometheus endpoint.")?;
|
||||
tokio::task::spawn(exporter);
|
||||
/// Installs Prometheus as the metrics recorder and serves it over HTTP with a hook.
|
||||
///
|
||||
/// The hook is called every time the metrics are requested at the given endpoint, and can be used
|
||||
/// to record values for pull-style metrics, i.e. metrics that are not automatically updated.
|
||||
pub(crate) async fn initialize_with_hook<F: Fn() + Send + Sync + 'static>(
|
||||
listen_addr: SocketAddr,
|
||||
hook: F,
|
||||
) -> eyre::Result<()> {
|
||||
let recorder = PrometheusBuilder::new().build_recorder();
|
||||
let handle = recorder.handle();
|
||||
|
||||
// Start endpoint
|
||||
start_endpoint(listen_addr, handle, Arc::new(hook))
|
||||
.await
|
||||
.wrap_err("Could not start Prometheus endpoint")?;
|
||||
|
||||
// Build metrics stack
|
||||
Stack::new(recorder)
|
||||
.push(PrefixLayer::new("reth"))
|
||||
.install()
|
||||
@ -18,3 +38,77 @@ pub(crate) fn initialize(listen_addr: SocketAddr) -> eyre::Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Starts an endpoint at the given address to serve Prometheus metrics.
|
||||
async fn start_endpoint<F: Fn() + Send + Sync + 'static>(
|
||||
listen_addr: SocketAddr,
|
||||
handle: PrometheusHandle,
|
||||
hook: Arc<F>,
|
||||
) -> eyre::Result<()> {
|
||||
let make_svc = make_service_fn(move |_| {
|
||||
let handle = handle.clone();
|
||||
let hook = Arc::clone(&hook);
|
||||
async move {
|
||||
Ok::<_, Infallible>(service_fn(move |_: Request<Body>| {
|
||||
(hook)();
|
||||
let metrics = handle.render();
|
||||
async move { Ok::<_, Infallible>(Response::new(Body::from(metrics))) }
|
||||
}))
|
||||
}
|
||||
});
|
||||
let server =
|
||||
Server::try_bind(&listen_addr).wrap_err("Could not bind to address")?.serve(make_svc);
|
||||
|
||||
tokio::spawn(async move { server.await.expect("Metrics endpoint crashed") });
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Installs Prometheus as the metrics recorder and serves it over HTTP with database metrics.
|
||||
pub(crate) async fn initialize_with_db_metrics(
|
||||
listen_addr: SocketAddr,
|
||||
db: Arc<Env<WriteMap>>,
|
||||
) -> eyre::Result<()> {
|
||||
let db_stats = move || {
|
||||
// TODO: A generic stats abstraction for other DB types to deduplicate this and `reth db
|
||||
// stats`
|
||||
let _ = db.view(|tx| {
|
||||
for table in tables::TABLES.iter().map(|(_, name)| name) {
|
||||
let table_db =
|
||||
tx.inner.open_db(Some(table)).wrap_err("Could not open db.")?;
|
||||
|
||||
let stats = tx
|
||||
.inner
|
||||
.db_stat(&table_db)
|
||||
.wrap_err(format!("Could not find table: {table}"))?;
|
||||
|
||||
let page_size = stats.page_size() as usize;
|
||||
let leaf_pages = stats.leaf_pages();
|
||||
let branch_pages = stats.branch_pages();
|
||||
let overflow_pages = stats.overflow_pages();
|
||||
let num_pages = leaf_pages + branch_pages + overflow_pages;
|
||||
let table_size = page_size * num_pages;
|
||||
|
||||
metrics::absolute_counter!("db.table_size", table_size as u64, "table" => *table);
|
||||
metrics::absolute_counter!("db.table_pages", leaf_pages as u64, "table" => *table, "type" => "leaf");
|
||||
metrics::absolute_counter!("db.table_pages", branch_pages as u64, "table" => *table, "type" => "branch");
|
||||
metrics::absolute_counter!("db.table_pages", overflow_pages as u64, "table" => *table, "type" => "overflow");
|
||||
}
|
||||
|
||||
Ok::<(), eyre::Report>(())
|
||||
});
|
||||
};
|
||||
|
||||
initialize_with_hook(listen_addr, db_stats).await?;
|
||||
|
||||
// We describe the metrics after the recorder is installed, otherwise this information is not
|
||||
// registered
|
||||
metrics::describe_counter!(
|
||||
"db.table_size",
|
||||
Unit::Bytes,
|
||||
"The size of a database table (in bytes)"
|
||||
);
|
||||
metrics::describe_counter!("db.table_pages", "The number of database pages for a table");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -93,11 +93,6 @@ impl Command {
|
||||
// Does not do anything on windows.
|
||||
fdlimit::raise_fd_limit();
|
||||
|
||||
if let Some(listen_addr) = self.metrics {
|
||||
info!(target: "reth::cli", "Starting metrics endpoint at {}", listen_addr);
|
||||
prometheus_exporter::initialize(listen_addr)?;
|
||||
}
|
||||
|
||||
let config: Config = confy::load_path(&self.config).unwrap_or_default();
|
||||
info!(target: "reth::cli", "reth {} starting stage {:?}", clap::crate_version!(), self.stage);
|
||||
|
||||
@ -111,6 +106,11 @@ impl Command {
|
||||
let db = Arc::new(init_db(&self.db)?);
|
||||
let mut tx = Transaction::new(db.as_ref())?;
|
||||
|
||||
if let Some(listen_addr) = self.metrics {
|
||||
info!(target: "reth::cli", "Starting metrics endpoint at {}", listen_addr);
|
||||
prometheus_exporter::initialize_with_db_metrics(listen_addr, Arc::clone(&db)).await?;
|
||||
}
|
||||
|
||||
let num_blocks = self.to - self.from + 1;
|
||||
|
||||
match self.stage {
|
||||
|
||||
Reference in New Issue
Block a user