mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 02:49:55 +00:00
feat: Add block source metrics
This commit is contained in:
@ -14,6 +14,7 @@ use self::{
|
||||
use super::{BlockSource, BlockSourceBoxed};
|
||||
use crate::node::types::BlockAndReceipts;
|
||||
use futures::future::BoxFuture;
|
||||
use reth_metrics::{metrics, metrics::Counter, Metrics};
|
||||
use std::{
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
@ -41,6 +42,16 @@ pub struct HlNodeBlockSource {
|
||||
pub local_blocks_cache: Arc<Mutex<LocalBlocksCache>>,
|
||||
pub last_local_fetch: Arc<Mutex<Option<(u64, OffsetDateTime)>>>,
|
||||
pub args: HlNodeBlockSourceArgs,
|
||||
pub metrics: HlNodeBlockSourceMetrics,
|
||||
}
|
||||
|
||||
#[derive(Metrics, Clone)]
|
||||
#[metrics(scope = "block_source.hl_node")]
|
||||
pub struct HlNodeBlockSourceMetrics {
|
||||
/// How many times the HL node block source is polling for a block
|
||||
pub fetched_from_hl_node: Counter,
|
||||
/// How many times the HL node block source is fetched from the fallback
|
||||
pub fetched_from_fallback: Counter,
|
||||
}
|
||||
|
||||
impl BlockSource for HlNodeBlockSource {
|
||||
@ -49,11 +60,13 @@ impl BlockSource for HlNodeBlockSource {
|
||||
let args = self.args.clone();
|
||||
let local_blocks_cache = self.local_blocks_cache.clone();
|
||||
let last_local_fetch = self.last_local_fetch.clone();
|
||||
let metrics = self.metrics.clone();
|
||||
Box::pin(async move {
|
||||
let now = OffsetDateTime::now_utc();
|
||||
|
||||
if let Some(block) = Self::try_collect_local_block(local_blocks_cache, height).await {
|
||||
Self::update_last_fetch(last_local_fetch, height, now).await;
|
||||
metrics.fetched_from_hl_node.increment(1);
|
||||
return Ok(block);
|
||||
}
|
||||
|
||||
@ -68,6 +81,7 @@ impl BlockSource for HlNodeBlockSource {
|
||||
}
|
||||
|
||||
let block = fallback.collect_block(height).await?;
|
||||
metrics.fetched_from_fallback.increment(1);
|
||||
Self::update_last_fetch(last_local_fetch, height, now).await;
|
||||
Ok(block)
|
||||
})
|
||||
@ -224,6 +238,7 @@ impl HlNodeBlockSource {
|
||||
args,
|
||||
local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new(CACHE_SIZE))),
|
||||
last_local_fetch: Arc::new(Mutex::new(None)),
|
||||
metrics: HlNodeBlockSourceMetrics::default(),
|
||||
};
|
||||
block_source.run(next_block_number).await.unwrap();
|
||||
block_source
|
||||
|
||||
@ -2,6 +2,7 @@ use super::{utils, BlockSource};
|
||||
use crate::node::types::BlockAndReceipts;
|
||||
use eyre::Context;
|
||||
use futures::{future::BoxFuture, FutureExt};
|
||||
use reth_metrics::{metrics, metrics::Counter, Metrics};
|
||||
use std::path::PathBuf;
|
||||
use tracing::info;
|
||||
|
||||
@ -9,11 +10,21 @@ use tracing::info;
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct LocalBlockSource {
|
||||
dir: PathBuf,
|
||||
metrics: LocalBlockSourceMetrics,
|
||||
}
|
||||
|
||||
#[derive(Metrics, Clone)]
|
||||
#[metrics(scope = "block_source.local")]
|
||||
pub struct LocalBlockSourceMetrics {
|
||||
/// How many times the local block source is polling for a block
|
||||
pub polling_attempt: Counter,
|
||||
/// How many times the local block source is fetched from the local filesystem
|
||||
pub fetched: Counter,
|
||||
}
|
||||
|
||||
impl LocalBlockSource {
|
||||
pub fn new(dir: impl Into<PathBuf>) -> Self {
|
||||
Self { dir: dir.into() }
|
||||
Self { dir: dir.into(), metrics: LocalBlockSourceMetrics::default() }
|
||||
}
|
||||
|
||||
async fn pick_path_with_highest_number(dir: PathBuf, is_dir: bool) -> Option<(u64, String)> {
|
||||
@ -31,13 +42,17 @@ impl LocalBlockSource {
|
||||
impl BlockSource for LocalBlockSource {
|
||||
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
|
||||
let dir = self.dir.clone();
|
||||
let metrics = self.metrics.clone();
|
||||
async move {
|
||||
let path = dir.join(utils::rmp_path(height));
|
||||
metrics.polling_attempt.increment(1);
|
||||
|
||||
let file = tokio::fs::read(&path)
|
||||
.await
|
||||
.wrap_err_with(|| format!("Failed to read block from {path:?}"))?;
|
||||
let mut decoder = lz4_flex::frame::FrameDecoder::new(&file[..]);
|
||||
let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder)?;
|
||||
metrics.fetched.increment(1);
|
||||
Ok(blocks[0].clone())
|
||||
}
|
||||
.boxed()
|
||||
|
||||
@ -2,6 +2,7 @@ use super::{utils, BlockSource};
|
||||
use crate::node::types::BlockAndReceipts;
|
||||
use aws_sdk_s3::types::RequestPayer;
|
||||
use futures::{future::BoxFuture, FutureExt};
|
||||
use reth_metrics::{metrics, metrics::Counter, Metrics};
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tracing::info;
|
||||
|
||||
@ -11,11 +12,26 @@ pub struct S3BlockSource {
|
||||
client: Arc<aws_sdk_s3::Client>,
|
||||
bucket: String,
|
||||
polling_interval: Duration,
|
||||
metrics: S3BlockSourceMetrics,
|
||||
}
|
||||
|
||||
#[derive(Metrics, Clone)]
|
||||
#[metrics(scope = "block_source.s3")]
|
||||
pub struct S3BlockSourceMetrics {
|
||||
/// How many times the S3 block source is polling for a block
|
||||
pub polling_attempt: Counter,
|
||||
/// How many times the S3 block source has polled a block
|
||||
pub fetched: Counter,
|
||||
}
|
||||
|
||||
impl S3BlockSource {
|
||||
pub fn new(client: aws_sdk_s3::Client, bucket: String, polling_interval: Duration) -> Self {
|
||||
Self { client: client.into(), bucket, polling_interval }
|
||||
Self {
|
||||
client: client.into(),
|
||||
bucket,
|
||||
polling_interval,
|
||||
metrics: S3BlockSourceMetrics::default(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn pick_path_with_highest_number(
|
||||
@ -52,14 +68,18 @@ impl BlockSource for S3BlockSource {
|
||||
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
|
||||
let client = self.client.clone();
|
||||
let bucket = self.bucket.clone();
|
||||
let metrics = self.metrics.clone();
|
||||
async move {
|
||||
let path = utils::rmp_path(height);
|
||||
metrics.polling_attempt.increment(1);
|
||||
|
||||
let request = client
|
||||
.get_object()
|
||||
.request_payer(RequestPayer::Requester)
|
||||
.bucket(&bucket)
|
||||
.key(path);
|
||||
let response = request.send().await?;
|
||||
metrics.fetched.increment(1);
|
||||
let bytes = response.body.collect().await?.into_bytes();
|
||||
let mut decoder = lz4_flex::frame::FrameDecoder::new(&bytes[..]);
|
||||
let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder)?;
|
||||
|
||||
Reference in New Issue
Block a user