mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat(node-core, storage): static files segment metrics (#6908)
This commit is contained in:
@ -8,7 +8,6 @@ use reth_db::{database::Database, mdbx, static_file::iter_static_files, Database
|
||||
use reth_node_core::dirs::{ChainPath, DataDirPath};
|
||||
use reth_primitives::static_file::{find_fixed_range, SegmentRangeInclusive};
|
||||
use reth_provider::providers::StaticFileProvider;
|
||||
use std::fs::File;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
/// The arguments for the `reth db stats` command
|
||||
@ -174,20 +173,17 @@ impl Command {
|
||||
|
||||
let columns = jar_provider.columns();
|
||||
let rows = jar_provider.rows();
|
||||
let data_size = File::open(jar_provider.data_path())
|
||||
.and_then(|file| file.metadata())
|
||||
|
||||
let data_size = reth_primitives::fs::metadata(jar_provider.data_path())
|
||||
.map(|metadata| metadata.len())
|
||||
.unwrap_or_default();
|
||||
let index_size = File::open(jar_provider.index_path())
|
||||
.and_then(|file| file.metadata())
|
||||
let index_size = reth_primitives::fs::metadata(jar_provider.index_path())
|
||||
.map(|metadata| metadata.len())
|
||||
.unwrap_or_default();
|
||||
let offsets_size = File::open(jar_provider.offsets_path())
|
||||
.and_then(|file| file.metadata())
|
||||
let offsets_size = reth_primitives::fs::metadata(jar_provider.offsets_path())
|
||||
.map(|metadata| metadata.len())
|
||||
.unwrap_or_default();
|
||||
let config_size = File::open(jar_provider.config_path())
|
||||
.and_then(|file| file.metadata())
|
||||
let config_size = reth_primitives::fs::metadata(jar_provider.config_path())
|
||||
.map(|metadata| metadata.len())
|
||||
.unwrap_or_default();
|
||||
|
||||
|
||||
@ -144,6 +144,7 @@ impl Command {
|
||||
listen_addr,
|
||||
prometheus_exporter::install_recorder()?,
|
||||
Arc::clone(&db),
|
||||
factory.static_file_provider(),
|
||||
metrics_process::Collector::default(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@ -395,17 +395,22 @@ where
|
||||
// Does not do anything on windows.
|
||||
fdlimit::raise_fd_limit()?;
|
||||
|
||||
let prometheus_handle = config.install_prometheus_recorder()?;
|
||||
config.start_metrics_endpoint(prometheus_handle, database.clone()).await?;
|
||||
|
||||
info!(target: "reth::cli", "Database opened");
|
||||
|
||||
let provider_factory = ProviderFactory::new(
|
||||
database.clone(),
|
||||
Arc::clone(&config.chain),
|
||||
data_dir.static_files_path(),
|
||||
)?
|
||||
.with_static_files_metrics();
|
||||
info!(target: "reth::cli", "Database opened");
|
||||
|
||||
let prometheus_handle = config.install_prometheus_recorder()?;
|
||||
config
|
||||
.start_metrics_endpoint(
|
||||
prometheus_handle,
|
||||
database.clone(),
|
||||
provider_factory.static_file_provider(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
debug!(target: "reth::cli", chain=%config.chain.chain, genesis=?config.chain.genesis_hash(), "Initializing genesis");
|
||||
|
||||
|
||||
@ -11,6 +11,7 @@ use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
|
||||
use metrics_util::layers::{PrefixLayer, Stack};
|
||||
use reth_db::database_metrics::DatabaseMetrics;
|
||||
use reth_metrics::metrics::Unit;
|
||||
use reth_provider::providers::StaticFileProvider;
|
||||
use std::{convert::Infallible, net::SocketAddr, sync::Arc};
|
||||
|
||||
pub(crate) trait Hook: Fn() + Send + Sync {}
|
||||
@ -79,17 +80,24 @@ pub async fn serve<Metrics>(
|
||||
listen_addr: SocketAddr,
|
||||
handle: PrometheusHandle,
|
||||
db: Metrics,
|
||||
static_file_provider: StaticFileProvider,
|
||||
process: metrics_process::Collector,
|
||||
) -> eyre::Result<()>
|
||||
where
|
||||
Metrics: DatabaseMetrics + 'static + Send + Sync,
|
||||
{
|
||||
let db_metrics_hook = move || db.report_metrics();
|
||||
let static_file_metrics_hook = move || {
|
||||
let _ = static_file_provider.report_metrics().map_err(
|
||||
|error| tracing::error!(%error, "Failed to report static file provider metrics"),
|
||||
);
|
||||
};
|
||||
|
||||
// Clone `process` to move it into the hook and use the original `process` for describe below.
|
||||
let cloned_process = process.clone();
|
||||
let hooks: Vec<Box<dyn Hook<Output = ()>>> = vec![
|
||||
Box::new(db_metrics_hook),
|
||||
Box::new(static_file_metrics_hook),
|
||||
Box::new(move || cloned_process.collect()),
|
||||
Box::new(collect_memory_stats),
|
||||
Box::new(collect_io_stats),
|
||||
@ -106,6 +114,14 @@ where
|
||||
"db.timed_out_not_aborted_transactions",
|
||||
"Number of timed out transactions that were not aborted by the user yet"
|
||||
);
|
||||
|
||||
describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment");
|
||||
describe_gauge!("static_files.segment_files", "The number of files for a static file segment");
|
||||
describe_gauge!(
|
||||
"static_files.segment_entries",
|
||||
"The number of entries for a static file segment"
|
||||
);
|
||||
|
||||
process.describe();
|
||||
describe_memory_stats();
|
||||
describe_io_stats();
|
||||
|
||||
@ -47,9 +47,10 @@ use reth_primitives::{
|
||||
BlockHashOrNumber, BlockNumber, ChainSpec, Head, SealedHeader, TxHash, B256, MAINNET,
|
||||
};
|
||||
use reth_provider::{
|
||||
providers::BlockchainProvider, BlockHashReader, BlockNumReader, BlockReader,
|
||||
BlockchainTreePendingStateProvider, CanonStateSubscriptions, HeaderProvider, HeaderSyncMode,
|
||||
ProviderFactory, StageCheckpointReader,
|
||||
providers::{BlockchainProvider, StaticFileProvider},
|
||||
BlockHashReader, BlockNumReader, BlockReader, BlockchainTreePendingStateProvider,
|
||||
CanonStateSubscriptions, HeaderProvider, HeaderSyncMode, ProviderFactory,
|
||||
StageCheckpointReader,
|
||||
};
|
||||
use reth_revm::EvmProcessorFactory;
|
||||
use reth_stages::{
|
||||
@ -599,6 +600,7 @@ impl NodeConfig {
|
||||
&self,
|
||||
prometheus_handle: PrometheusHandle,
|
||||
db: Metrics,
|
||||
static_file_provider: StaticFileProvider,
|
||||
) -> eyre::Result<()>
|
||||
where
|
||||
Metrics: DatabaseMetrics + 'static + Send + Sync,
|
||||
@ -609,6 +611,7 @@ impl NodeConfig {
|
||||
listen_addr,
|
||||
prometheus_handle,
|
||||
db,
|
||||
static_file_provider,
|
||||
metrics_process::Collector::default(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@ -119,6 +119,15 @@ pub enum FsPathError {
|
||||
/// The path related to the operation.
|
||||
path: PathBuf,
|
||||
},
|
||||
|
||||
/// Error variant for failed file metadata operation with additional path context.
|
||||
#[error("failed to get metadata for {path:?}: {source}")]
|
||||
Metadata {
|
||||
/// The source `io::Error`.
|
||||
source: io::Error,
|
||||
/// The path related to the operation.
|
||||
path: PathBuf,
|
||||
},
|
||||
}
|
||||
|
||||
impl FsPathError {
|
||||
@ -171,6 +180,11 @@ impl FsPathError {
|
||||
pub fn rename(source: io::Error, from: impl Into<PathBuf>, to: impl Into<PathBuf>) -> Self {
|
||||
FsPathError::Rename { source, from: from.into(), to: to.into() }
|
||||
}
|
||||
|
||||
/// Returns the complementary error variant for [`std::fs::File::metadata`].
|
||||
pub fn metadata(source: io::Error, path: impl Into<PathBuf>) -> Self {
|
||||
FsPathError::Metadata { source, path: path.into() }
|
||||
}
|
||||
}
|
||||
|
||||
type Result<T> = std::result::Result<T, FsPathError>;
|
||||
@ -225,3 +239,9 @@ pub fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> Result<()> {
|
||||
let to = to.as_ref();
|
||||
fs::rename(from, to).map_err(|err| FsPathError::rename(err, from, to))
|
||||
}
|
||||
|
||||
/// Wrapper for `std::fs::metadata`
|
||||
pub fn metadata(path: impl AsRef<Path>) -> Result<fs::Metadata> {
|
||||
let path = path.as_ref();
|
||||
fs::metadata(path).map_err(|err| FsPathError::metadata(err, path))
|
||||
}
|
||||
|
||||
@ -114,6 +114,45 @@ impl StaticFileProvider {
|
||||
Self(Arc::new(provider))
|
||||
}
|
||||
|
||||
/// Reports metrics for the static files.
|
||||
pub fn report_metrics(&self) -> ProviderResult<()> {
|
||||
let Some(metrics) = &self.metrics else { return Ok(()) };
|
||||
|
||||
let static_files = iter_static_files(&self.path)?;
|
||||
for (segment, ranges) in static_files {
|
||||
let mut entries = 0;
|
||||
let mut size = 0;
|
||||
|
||||
for (block_range, _) in &ranges {
|
||||
let fixed_block_range = find_fixed_range(block_range.start());
|
||||
let jar_provider = self
|
||||
.get_segment_provider(segment, || Some(fixed_block_range), None)?
|
||||
.ok_or(ProviderError::MissingStaticFileBlock(segment, block_range.start()))?;
|
||||
|
||||
entries += jar_provider.rows();
|
||||
|
||||
let data_size = reth_primitives::fs::metadata(jar_provider.data_path())
|
||||
.map(|metadata| metadata.len())
|
||||
.unwrap_or_default();
|
||||
let index_size = reth_primitives::fs::metadata(jar_provider.index_path())
|
||||
.map(|metadata| metadata.len())
|
||||
.unwrap_or_default();
|
||||
let offsets_size = reth_primitives::fs::metadata(jar_provider.offsets_path())
|
||||
.map(|metadata| metadata.len())
|
||||
.unwrap_or_default();
|
||||
let config_size = reth_primitives::fs::metadata(jar_provider.config_path())
|
||||
.map(|metadata| metadata.len())
|
||||
.unwrap_or_default();
|
||||
|
||||
size += data_size + index_size + offsets_size + config_size;
|
||||
}
|
||||
|
||||
metrics.record_segment(segment, size, ranges.len(), entries);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Gets the [`StaticFileJarProvider`] of the requested segment and block.
|
||||
pub fn get_segment_provider_from_block(
|
||||
&self,
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
use std::{collections::HashMap, time::Duration};
|
||||
|
||||
use itertools::Itertools;
|
||||
use metrics::{Counter, Histogram};
|
||||
use metrics::{Counter, Gauge, Histogram};
|
||||
use reth_metrics::Metrics;
|
||||
use reth_primitives::StaticFileSegment;
|
||||
use strum::{EnumIter, IntoEnumIterator};
|
||||
@ -9,6 +9,7 @@ use strum::{EnumIter, IntoEnumIterator};
|
||||
/// Metrics for the static file provider.
|
||||
#[derive(Debug)]
|
||||
pub struct StaticFileProviderMetrics {
|
||||
segments: HashMap<StaticFileSegment, StaticFileSegmentMetrics>,
|
||||
segment_operations: HashMap<
|
||||
(StaticFileSegment, StaticFileProviderOperation),
|
||||
StaticFileProviderOperationMetrics,
|
||||
@ -18,6 +19,14 @@ pub struct StaticFileProviderMetrics {
|
||||
impl Default for StaticFileProviderMetrics {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
segments: StaticFileSegment::iter()
|
||||
.map(|segment| {
|
||||
(
|
||||
segment,
|
||||
StaticFileSegmentMetrics::new_with_labels(&[("segment", segment.as_str())]),
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
segment_operations: StaticFileSegment::iter()
|
||||
.cartesian_product(StaticFileProviderOperation::iter())
|
||||
.map(|(segment, operation)| {
|
||||
@ -35,6 +44,22 @@ impl Default for StaticFileProviderMetrics {
|
||||
}
|
||||
|
||||
impl StaticFileProviderMetrics {
|
||||
pub(crate) fn record_segment(
|
||||
&self,
|
||||
segment: StaticFileSegment,
|
||||
size: u64,
|
||||
files: usize,
|
||||
entries: usize,
|
||||
) {
|
||||
self.segments.get(&segment).expect("segment metrics should exist").size.set(size as f64);
|
||||
self.segments.get(&segment).expect("segment metrics should exist").files.set(files as f64);
|
||||
self.segments
|
||||
.get(&segment)
|
||||
.expect("segment metrics should exist")
|
||||
.entries
|
||||
.set(entries as f64);
|
||||
}
|
||||
|
||||
pub(crate) fn record_segment_operation(
|
||||
&self,
|
||||
segment: StaticFileSegment,
|
||||
@ -80,6 +105,18 @@ impl StaticFileProviderOperation {
|
||||
}
|
||||
}
|
||||
|
||||
/// Metrics for a specific static file segment.
|
||||
#[derive(Metrics)]
|
||||
#[metrics(scope = "static_files.segment")]
|
||||
pub(crate) struct StaticFileSegmentMetrics {
|
||||
/// The size of a static file segment
|
||||
size: Gauge,
|
||||
/// The number of files for a static file segment
|
||||
files: Gauge,
|
||||
/// The number of entries for a static file segment
|
||||
entries: Gauge,
|
||||
}
|
||||
|
||||
#[derive(Metrics)]
|
||||
#[metrics(scope = "static_files.jar_provider")]
|
||||
pub(crate) struct StaticFileProviderOperationMetrics {
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user