Merge pull request #60 from hl-archive-node/feat/block-metrics

feat: Add block source metrics
This commit is contained in:
sprites0
2025-09-11 18:56:18 -04:00
committed by GitHub
5 changed files with 54 additions and 2 deletions

1
Cargo.lock generated
View File

@ -9310,6 +9310,7 @@ dependencies = [
"reth-ethereum-primitives", "reth-ethereum-primitives",
"reth-evm", "reth-evm",
"reth-evm-ethereum", "reth-evm-ethereum",
"reth-metrics",
"reth-network", "reth-network",
"reth-network-api", "reth-network-api",
"reth-network-p2p", "reth-network-p2p",

View File

@ -63,6 +63,7 @@ reth-trie-db = { git = "https://github.com/sprites0/reth", rev = "a690ef25b56039
reth-codecs = { git = "https://github.com/sprites0/reth", rev = "a690ef25b56039195e7e4a4abd01c78aedcc73fb" } reth-codecs = { git = "https://github.com/sprites0/reth", rev = "a690ef25b56039195e7e4a4abd01c78aedcc73fb" }
reth-transaction-pool = { git = "https://github.com/sprites0/reth", rev = "a690ef25b56039195e7e4a4abd01c78aedcc73fb" } reth-transaction-pool = { git = "https://github.com/sprites0/reth", rev = "a690ef25b56039195e7e4a4abd01c78aedcc73fb" }
reth-stages-types = { git = "https://github.com/sprites0/reth", rev = "a690ef25b56039195e7e4a4abd01c78aedcc73fb" } reth-stages-types = { git = "https://github.com/sprites0/reth", rev = "a690ef25b56039195e7e4a4abd01c78aedcc73fb" }
reth-metrics = { git = "https://github.com/sprites0/reth", rev = "a690ef25b56039195e7e4a4abd01c78aedcc73fb" }
revm = { version = "28.0.1", default-features = false } revm = { version = "28.0.1", default-features = false }
# alloy dependencies # alloy dependencies

View File

@ -14,6 +14,7 @@ use self::{
use super::{BlockSource, BlockSourceBoxed}; use super::{BlockSource, BlockSourceBoxed};
use crate::node::types::BlockAndReceipts; use crate::node::types::BlockAndReceipts;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use reth_metrics::{metrics, metrics::Counter, Metrics};
use std::{ use std::{
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Arc, sync::Arc,
@ -41,6 +42,16 @@ pub struct HlNodeBlockSource {
pub local_blocks_cache: Arc<Mutex<LocalBlocksCache>>, pub local_blocks_cache: Arc<Mutex<LocalBlocksCache>>,
pub last_local_fetch: Arc<Mutex<Option<(u64, OffsetDateTime)>>>, pub last_local_fetch: Arc<Mutex<Option<(u64, OffsetDateTime)>>>,
pub args: HlNodeBlockSourceArgs, pub args: HlNodeBlockSourceArgs,
pub metrics: HlNodeBlockSourceMetrics,
}
#[derive(Metrics, Clone)]
#[metrics(scope = "block_source.hl_node")]
pub struct HlNodeBlockSourceMetrics {
/// How many times the HL node block source is polling for a block
pub fetched_from_hl_node: Counter,
/// How many times the HL node block source is fetched from the fallback
pub fetched_from_fallback: Counter,
} }
impl BlockSource for HlNodeBlockSource { impl BlockSource for HlNodeBlockSource {
@ -49,11 +60,13 @@ impl BlockSource for HlNodeBlockSource {
let args = self.args.clone(); let args = self.args.clone();
let local_blocks_cache = self.local_blocks_cache.clone(); let local_blocks_cache = self.local_blocks_cache.clone();
let last_local_fetch = self.last_local_fetch.clone(); let last_local_fetch = self.last_local_fetch.clone();
let metrics = self.metrics.clone();
Box::pin(async move { Box::pin(async move {
let now = OffsetDateTime::now_utc(); let now = OffsetDateTime::now_utc();
if let Some(block) = Self::try_collect_local_block(local_blocks_cache, height).await { if let Some(block) = Self::try_collect_local_block(local_blocks_cache, height).await {
Self::update_last_fetch(last_local_fetch, height, now).await; Self::update_last_fetch(last_local_fetch, height, now).await;
metrics.fetched_from_hl_node.increment(1);
return Ok(block); return Ok(block);
} }
@ -68,6 +81,7 @@ impl BlockSource for HlNodeBlockSource {
} }
let block = fallback.collect_block(height).await?; let block = fallback.collect_block(height).await?;
metrics.fetched_from_fallback.increment(1);
Self::update_last_fetch(last_local_fetch, height, now).await; Self::update_last_fetch(last_local_fetch, height, now).await;
Ok(block) Ok(block)
}) })
@ -224,6 +238,7 @@ impl HlNodeBlockSource {
args, args,
local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new(CACHE_SIZE))), local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new(CACHE_SIZE))),
last_local_fetch: Arc::new(Mutex::new(None)), last_local_fetch: Arc::new(Mutex::new(None)),
metrics: HlNodeBlockSourceMetrics::default(),
}; };
block_source.run(next_block_number).await.unwrap(); block_source.run(next_block_number).await.unwrap();
block_source block_source

View File

@ -2,6 +2,7 @@ use super::{utils, BlockSource};
use crate::node::types::BlockAndReceipts; use crate::node::types::BlockAndReceipts;
use eyre::Context; use eyre::Context;
use futures::{future::BoxFuture, FutureExt}; use futures::{future::BoxFuture, FutureExt};
use reth_metrics::{metrics, metrics::Counter, Metrics};
use std::path::PathBuf; use std::path::PathBuf;
use tracing::info; use tracing::info;
@ -9,11 +10,21 @@ use tracing::info;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct LocalBlockSource { pub struct LocalBlockSource {
dir: PathBuf, dir: PathBuf,
metrics: LocalBlockSourceMetrics,
}
#[derive(Metrics, Clone)]
#[metrics(scope = "block_source.local")]
pub struct LocalBlockSourceMetrics {
/// How many times the local block source is polling for a block
pub polling_attempt: Counter,
/// How many times the local block source is fetched from the local filesystem
pub fetched: Counter,
} }
impl LocalBlockSource { impl LocalBlockSource {
pub fn new(dir: impl Into<PathBuf>) -> Self { pub fn new(dir: impl Into<PathBuf>) -> Self {
Self { dir: dir.into() } Self { dir: dir.into(), metrics: LocalBlockSourceMetrics::default() }
} }
async fn pick_path_with_highest_number(dir: PathBuf, is_dir: bool) -> Option<(u64, String)> { async fn pick_path_with_highest_number(dir: PathBuf, is_dir: bool) -> Option<(u64, String)> {
@ -31,13 +42,17 @@ impl LocalBlockSource {
impl BlockSource for LocalBlockSource { impl BlockSource for LocalBlockSource {
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> { fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
let dir = self.dir.clone(); let dir = self.dir.clone();
let metrics = self.metrics.clone();
async move { async move {
let path = dir.join(utils::rmp_path(height)); let path = dir.join(utils::rmp_path(height));
metrics.polling_attempt.increment(1);
let file = tokio::fs::read(&path) let file = tokio::fs::read(&path)
.await .await
.wrap_err_with(|| format!("Failed to read block from {path:?}"))?; .wrap_err_with(|| format!("Failed to read block from {path:?}"))?;
let mut decoder = lz4_flex::frame::FrameDecoder::new(&file[..]); let mut decoder = lz4_flex::frame::FrameDecoder::new(&file[..]);
let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder)?; let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder)?;
metrics.fetched.increment(1);
Ok(blocks[0].clone()) Ok(blocks[0].clone())
} }
.boxed() .boxed()

View File

@ -2,6 +2,7 @@ use super::{utils, BlockSource};
use crate::node::types::BlockAndReceipts; use crate::node::types::BlockAndReceipts;
use aws_sdk_s3::types::RequestPayer; use aws_sdk_s3::types::RequestPayer;
use futures::{future::BoxFuture, FutureExt}; use futures::{future::BoxFuture, FutureExt};
use reth_metrics::{metrics, metrics::Counter, Metrics};
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use tracing::info; use tracing::info;
@ -11,11 +12,26 @@ pub struct S3BlockSource {
client: Arc<aws_sdk_s3::Client>, client: Arc<aws_sdk_s3::Client>,
bucket: String, bucket: String,
polling_interval: Duration, polling_interval: Duration,
metrics: S3BlockSourceMetrics,
}
#[derive(Metrics, Clone)]
#[metrics(scope = "block_source.s3")]
pub struct S3BlockSourceMetrics {
/// How many times the S3 block source is polling for a block
pub polling_attempt: Counter,
/// How many times the S3 block source has polled a block
pub fetched: Counter,
} }
impl S3BlockSource { impl S3BlockSource {
pub fn new(client: aws_sdk_s3::Client, bucket: String, polling_interval: Duration) -> Self { pub fn new(client: aws_sdk_s3::Client, bucket: String, polling_interval: Duration) -> Self {
Self { client: client.into(), bucket, polling_interval } Self {
client: client.into(),
bucket,
polling_interval,
metrics: S3BlockSourceMetrics::default(),
}
} }
async fn pick_path_with_highest_number( async fn pick_path_with_highest_number(
@ -52,14 +68,18 @@ impl BlockSource for S3BlockSource {
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> { fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
let client = self.client.clone(); let client = self.client.clone();
let bucket = self.bucket.clone(); let bucket = self.bucket.clone();
let metrics = self.metrics.clone();
async move { async move {
let path = utils::rmp_path(height); let path = utils::rmp_path(height);
metrics.polling_attempt.increment(1);
let request = client let request = client
.get_object() .get_object()
.request_payer(RequestPayer::Requester) .request_payer(RequestPayer::Requester)
.bucket(&bucket) .bucket(&bucket)
.key(path); .key(path);
let response = request.send().await?; let response = request.send().await?;
metrics.fetched.increment(1);
let bytes = response.body.collect().await?.into_bytes(); let bytes = response.body.collect().await?.into_bytes();
let mut decoder = lz4_flex::frame::FrameDecoder::new(&bytes[..]); let mut decoder = lz4_flex::frame::FrameDecoder::new(&bytes[..]);
let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder)?; let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder)?;