From 66c2ee654ce32781f3cd5986e70996d2e5679d18 Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Thu, 11 Sep 2025 18:50:22 -0400 Subject: [PATCH] feat: Add block source metrics --- Cargo.lock | 1 + Cargo.toml | 1 + src/pseudo_peer/sources/hl_node/mod.rs | 15 +++++++++++++++ src/pseudo_peer/sources/local.rs | 17 ++++++++++++++++- src/pseudo_peer/sources/s3.rs | 22 +++++++++++++++++++++- 5 files changed, 54 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4781c77b6..e5040e931 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9310,6 +9310,7 @@ dependencies = [ "reth-ethereum-primitives", "reth-evm", "reth-evm-ethereum", + "reth-metrics", "reth-network", "reth-network-api", "reth-network-p2p", diff --git a/Cargo.toml b/Cargo.toml index a149f0900..503d679ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,7 @@ reth-trie-db = { git = "https://github.com/sprites0/reth", rev = "a690ef25b56039 reth-codecs = { git = "https://github.com/sprites0/reth", rev = "a690ef25b56039195e7e4a4abd01c78aedcc73fb" } reth-transaction-pool = { git = "https://github.com/sprites0/reth", rev = "a690ef25b56039195e7e4a4abd01c78aedcc73fb" } reth-stages-types = { git = "https://github.com/sprites0/reth", rev = "a690ef25b56039195e7e4a4abd01c78aedcc73fb" } +reth-metrics = { git = "https://github.com/sprites0/reth", rev = "a690ef25b56039195e7e4a4abd01c78aedcc73fb" } revm = { version = "28.0.1", default-features = false } # alloy dependencies diff --git a/src/pseudo_peer/sources/hl_node/mod.rs b/src/pseudo_peer/sources/hl_node/mod.rs index 5317d3fde..c6a231dca 100644 --- a/src/pseudo_peer/sources/hl_node/mod.rs +++ b/src/pseudo_peer/sources/hl_node/mod.rs @@ -14,6 +14,7 @@ use self::{ use super::{BlockSource, BlockSourceBoxed}; use crate::node::types::BlockAndReceipts; use futures::future::BoxFuture; +use reth_metrics::{metrics, metrics::Counter, Metrics}; use std::{ path::{Path, PathBuf}, sync::Arc, @@ -41,6 +42,16 @@ pub struct HlNodeBlockSource { pub local_blocks_cache: Arc>, pub last_local_fetch: Arc>>, pub args: HlNodeBlockSourceArgs, + pub metrics: HlNodeBlockSourceMetrics, +} + +#[derive(Metrics, Clone)] +#[metrics(scope = "block_source.hl_node")] +pub struct HlNodeBlockSourceMetrics { + /// How many times the HL node block source is polling for a block + pub fetched_from_hl_node: Counter, + /// How many times the HL node block source is fetched from the fallback + pub fetched_from_fallback: Counter, } impl BlockSource for HlNodeBlockSource { @@ -49,11 +60,13 @@ impl BlockSource for HlNodeBlockSource { let args = self.args.clone(); let local_blocks_cache = self.local_blocks_cache.clone(); let last_local_fetch = self.last_local_fetch.clone(); + let metrics = self.metrics.clone(); Box::pin(async move { let now = OffsetDateTime::now_utc(); if let Some(block) = Self::try_collect_local_block(local_blocks_cache, height).await { Self::update_last_fetch(last_local_fetch, height, now).await; + metrics.fetched_from_hl_node.increment(1); return Ok(block); } @@ -68,6 +81,7 @@ impl BlockSource for HlNodeBlockSource { } let block = fallback.collect_block(height).await?; + metrics.fetched_from_fallback.increment(1); Self::update_last_fetch(last_local_fetch, height, now).await; Ok(block) }) @@ -224,6 +238,7 @@ impl HlNodeBlockSource { args, local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new(CACHE_SIZE))), last_local_fetch: Arc::new(Mutex::new(None)), + metrics: HlNodeBlockSourceMetrics::default(), }; block_source.run(next_block_number).await.unwrap(); block_source diff --git a/src/pseudo_peer/sources/local.rs b/src/pseudo_peer/sources/local.rs index 0c66b36ce..cdb7c6d27 100644 --- a/src/pseudo_peer/sources/local.rs +++ b/src/pseudo_peer/sources/local.rs @@ -2,6 +2,7 @@ use super::{utils, BlockSource}; use crate::node::types::BlockAndReceipts; use eyre::Context; use futures::{future::BoxFuture, FutureExt}; +use reth_metrics::{metrics, metrics::Counter, Metrics}; use std::path::PathBuf; use tracing::info; @@ -9,11 +10,21 @@ use tracing::info; #[derive(Debug, Clone)] pub struct LocalBlockSource { dir: PathBuf, + metrics: LocalBlockSourceMetrics, +} + +#[derive(Metrics, Clone)] +#[metrics(scope = "block_source.local")] +pub struct LocalBlockSourceMetrics { + /// How many times the local block source is polling for a block + pub polling_attempt: Counter, + /// How many times the local block source is fetched from the local filesystem + pub fetched: Counter, } impl LocalBlockSource { pub fn new(dir: impl Into) -> Self { - Self { dir: dir.into() } + Self { dir: dir.into(), metrics: LocalBlockSourceMetrics::default() } } async fn pick_path_with_highest_number(dir: PathBuf, is_dir: bool) -> Option<(u64, String)> { @@ -31,13 +42,17 @@ impl LocalBlockSource { impl BlockSource for LocalBlockSource { fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result> { let dir = self.dir.clone(); + let metrics = self.metrics.clone(); async move { let path = dir.join(utils::rmp_path(height)); + metrics.polling_attempt.increment(1); + let file = tokio::fs::read(&path) .await .wrap_err_with(|| format!("Failed to read block from {path:?}"))?; let mut decoder = lz4_flex::frame::FrameDecoder::new(&file[..]); let blocks: Vec = rmp_serde::from_read(&mut decoder)?; + metrics.fetched.increment(1); Ok(blocks[0].clone()) } .boxed() diff --git a/src/pseudo_peer/sources/s3.rs b/src/pseudo_peer/sources/s3.rs index 85999796a..b5516d6b5 100644 --- a/src/pseudo_peer/sources/s3.rs +++ b/src/pseudo_peer/sources/s3.rs @@ -2,6 +2,7 @@ use super::{utils, BlockSource}; use crate::node::types::BlockAndReceipts; use aws_sdk_s3::types::RequestPayer; use futures::{future::BoxFuture, FutureExt}; +use reth_metrics::{metrics, metrics::Counter, Metrics}; use std::{sync::Arc, time::Duration}; use tracing::info; @@ -11,11 +12,26 @@ pub struct S3BlockSource { client: Arc, bucket: String, polling_interval: Duration, + metrics: S3BlockSourceMetrics, +} + +#[derive(Metrics, Clone)] +#[metrics(scope = "block_source.s3")] +pub struct S3BlockSourceMetrics { + /// How many times the S3 block source is polling for a block + pub polling_attempt: Counter, + /// How many times the S3 block source has polled a block + pub fetched: Counter, } impl S3BlockSource { pub fn new(client: aws_sdk_s3::Client, bucket: String, polling_interval: Duration) -> Self { - Self { client: client.into(), bucket, polling_interval } + Self { + client: client.into(), + bucket, + polling_interval, + metrics: S3BlockSourceMetrics::default(), + } } async fn pick_path_with_highest_number( @@ -52,14 +68,18 @@ impl BlockSource for S3BlockSource { fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result> { let client = self.client.clone(); let bucket = self.bucket.clone(); + let metrics = self.metrics.clone(); async move { let path = utils::rmp_path(height); + metrics.polling_attempt.increment(1); + let request = client .get_object() .request_payer(RequestPayer::Requester) .bucket(&bucket) .key(path); let response = request.send().await?; + metrics.fetched.increment(1); let bytes = response.body.collect().await?.into_bytes(); let mut decoder = lz4_flex::frame::FrameDecoder::new(&bytes[..]); let blocks: Vec = rmp_serde::from_read(&mut decoder)?;