mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat: make NodeState generic over DB with DatabaseMetadata (#5691)
This commit is contained in:
@ -3,7 +3,7 @@
|
||||
use crate::node::cl_events::ConsensusLayerHealthEvent;
|
||||
use futures::Stream;
|
||||
use reth_beacon_consensus::BeaconConsensusEngineEvent;
|
||||
use reth_db::DatabaseEnv;
|
||||
use reth_db::{database::Database, database_metrics::DatabaseMetadata};
|
||||
use reth_interfaces::consensus::ForkchoiceState;
|
||||
use reth_network::{NetworkEvent, NetworkHandle};
|
||||
use reth_network_api::PeersInfo;
|
||||
@ -17,7 +17,6 @@ use std::{
|
||||
fmt::{Display, Formatter},
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
@ -28,11 +27,11 @@ use tracing::{info, warn};
|
||||
const INFO_MESSAGE_INTERVAL: Duration = Duration::from_secs(25);
|
||||
|
||||
/// The current high-level state of the node.
|
||||
struct NodeState {
|
||||
struct NodeState<DB> {
|
||||
/// Database environment.
|
||||
/// Used for freelist calculation reported in the "Status" log message.
|
||||
/// See [EventHandler::poll].
|
||||
db: Arc<DatabaseEnv>,
|
||||
db: DB,
|
||||
/// Connection to the network.
|
||||
network: Option<NetworkHandle>,
|
||||
/// The stage currently being executed.
|
||||
@ -41,12 +40,8 @@ struct NodeState {
|
||||
latest_block: Option<BlockNumber>,
|
||||
}
|
||||
|
||||
impl NodeState {
|
||||
fn new(
|
||||
db: Arc<DatabaseEnv>,
|
||||
network: Option<NetworkHandle>,
|
||||
latest_block: Option<BlockNumber>,
|
||||
) -> Self {
|
||||
impl<DB> NodeState<DB> {
|
||||
fn new(db: DB, network: Option<NetworkHandle>, latest_block: Option<BlockNumber>) -> Self {
|
||||
Self { db, network, current_stage: None, latest_block }
|
||||
}
|
||||
|
||||
@ -200,6 +195,12 @@ impl NodeState {
|
||||
}
|
||||
}
|
||||
|
||||
impl<DB: DatabaseMetadata> NodeState<DB> {
|
||||
fn freelist(&self) -> Option<usize> {
|
||||
self.db.metadata().freelist_size()
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper type for formatting of optional fields:
|
||||
/// - If [Some(x)], then `x` is written
|
||||
/// - If [None], then `None` is written
|
||||
@ -270,13 +271,14 @@ impl From<PrunerEvent> for NodeEvent {
|
||||
|
||||
/// Displays relevant information to the user from components of the node, and periodically
|
||||
/// displays the high-level status of the node.
|
||||
pub async fn handle_events<E>(
|
||||
pub async fn handle_events<E, DB>(
|
||||
network: Option<NetworkHandle>,
|
||||
latest_block_number: Option<BlockNumber>,
|
||||
events: E,
|
||||
db: Arc<DatabaseEnv>,
|
||||
db: DB,
|
||||
) where
|
||||
E: Stream<Item = NodeEvent> + Unpin,
|
||||
DB: DatabaseMetadata + Database + 'static,
|
||||
{
|
||||
let state = NodeState::new(db, network, latest_block_number);
|
||||
|
||||
@ -290,17 +292,18 @@ pub async fn handle_events<E>(
|
||||
|
||||
/// Handles events emitted by the node and logs them accordingly.
|
||||
#[pin_project::pin_project]
|
||||
struct EventHandler<E> {
|
||||
state: NodeState,
|
||||
struct EventHandler<E, DB> {
|
||||
state: NodeState<DB>,
|
||||
#[pin]
|
||||
events: E,
|
||||
#[pin]
|
||||
info_interval: Interval,
|
||||
}
|
||||
|
||||
impl<E> Future for EventHandler<E>
|
||||
impl<E, DB> Future for EventHandler<E, DB>
|
||||
where
|
||||
E: Stream<Item = NodeEvent> + Unpin,
|
||||
DB: DatabaseMetadata + Database + 'static,
|
||||
{
|
||||
type Output = ();
|
||||
|
||||
@ -308,7 +311,7 @@ where
|
||||
let mut this = self.project();
|
||||
|
||||
while this.info_interval.poll_tick(cx).is_ready() {
|
||||
let freelist = OptionalField(this.state.db.freelist().ok());
|
||||
let freelist = OptionalField(this.state.freelist());
|
||||
|
||||
if let Some(CurrentStage { stage_id, eta, checkpoint, target }) =
|
||||
&this.state.current_stage
|
||||
|
||||
@ -1,10 +1,38 @@
|
||||
use metrics::{counter, gauge, histogram, Label};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Represents a type that can report metrics, used mainly with the database. The `report_metrics`
|
||||
/// method can be used as a prometheus hook.
|
||||
pub trait DatabaseMetrics {
|
||||
/// Reports metrics for the database.
|
||||
fn report_metrics(&self);
|
||||
fn report_metrics(&self) {
|
||||
for (name, value, labels) in self.gauge_metrics() {
|
||||
gauge!(name, value, labels);
|
||||
}
|
||||
|
||||
for (name, value, labels) in self.counter_metrics() {
|
||||
counter!(name, value, labels);
|
||||
}
|
||||
|
||||
for (name, value, labels) in self.histogram_metrics() {
|
||||
histogram!(name, value, labels);
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a list of [Gauge](metrics::Gauge) metrics for the database.
|
||||
fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
|
||||
vec![]
|
||||
}
|
||||
|
||||
/// Returns a list of [Counter](metrics::Counter) metrics for the database.
|
||||
fn counter_metrics(&self) -> Vec<(&'static str, u64, Vec<Label>)> {
|
||||
vec![]
|
||||
}
|
||||
|
||||
/// Returns a list of [Histogram](metrics::Histogram) metrics for the database.
|
||||
fn histogram_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
|
||||
vec![]
|
||||
}
|
||||
}
|
||||
|
||||
impl<DB: DatabaseMetrics> DatabaseMetrics for Arc<DB> {
|
||||
@ -12,3 +40,35 @@ impl<DB: DatabaseMetrics> DatabaseMetrics for Arc<DB> {
|
||||
<DB as DatabaseMetrics>::report_metrics(self)
|
||||
}
|
||||
}
|
||||
|
||||
/// The type used to store metadata about the database.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct DatabaseMetadataValue {
|
||||
/// The freelist size
|
||||
freelist_size: Option<usize>,
|
||||
}
|
||||
|
||||
impl DatabaseMetadataValue {
|
||||
/// Creates a new [DatabaseMetadataValue] with the given freelist size.
|
||||
pub fn new(freelist_size: Option<usize>) -> Self {
|
||||
Self { freelist_size }
|
||||
}
|
||||
|
||||
/// Returns the freelist size, if available.
|
||||
pub fn freelist_size(&self) -> Option<usize> {
|
||||
self.freelist_size
|
||||
}
|
||||
}
|
||||
|
||||
/// Includes a method to return a [DatabaseMetadataValue] type, which can be used to dynamically
|
||||
/// retrieve information about the database.
|
||||
pub trait DatabaseMetadata {
|
||||
/// Returns a metadata type, [DatabaseMetadataValue] for the database.
|
||||
fn metadata(&self) -> DatabaseMetadataValue;
|
||||
}
|
||||
|
||||
impl<DB: DatabaseMetadata> DatabaseMetadata for Arc<DB> {
|
||||
fn metadata(&self) -> DatabaseMetadataValue {
|
||||
<DB as DatabaseMetadata>::metadata(self)
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,13 +2,13 @@
|
||||
|
||||
use crate::{
|
||||
database::Database,
|
||||
database_metrics::DatabaseMetrics,
|
||||
database_metrics::{DatabaseMetadata, DatabaseMetadataValue, DatabaseMetrics},
|
||||
tables::{TableType, Tables},
|
||||
utils::default_page_size,
|
||||
DatabaseError,
|
||||
};
|
||||
use eyre::Context;
|
||||
use metrics::gauge;
|
||||
use metrics::{gauge, Label};
|
||||
use reth_interfaces::db::LogLevel;
|
||||
use reth_libmdbx::{
|
||||
DatabaseFlags, Environment, EnvironmentFlags, Geometry, Mode, PageSize, SyncMode, RO, RW,
|
||||
@ -65,39 +65,76 @@ impl Database for DatabaseEnv {
|
||||
|
||||
impl DatabaseMetrics for DatabaseEnv {
|
||||
fn report_metrics(&self) {
|
||||
let _ = self.view(|tx| {
|
||||
for table in Tables::ALL.iter().map(|table| table.name()) {
|
||||
let table_db =
|
||||
tx.inner.open_db(Some(table)).wrap_err("Could not open db.")?;
|
||||
for (name, value, labels) in self.gauge_metrics() {
|
||||
gauge!(name, value, labels);
|
||||
}
|
||||
}
|
||||
|
||||
let stats = tx
|
||||
.inner
|
||||
.db_stat(&table_db)
|
||||
.wrap_err(format!("Could not find table: {table}"))?;
|
||||
fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
|
||||
let mut metrics = Vec::new();
|
||||
|
||||
let page_size = stats.page_size() as usize;
|
||||
let leaf_pages = stats.leaf_pages();
|
||||
let branch_pages = stats.branch_pages();
|
||||
let overflow_pages = stats.overflow_pages();
|
||||
let num_pages = leaf_pages + branch_pages + overflow_pages;
|
||||
let table_size = page_size * num_pages;
|
||||
let entries = stats.entries();
|
||||
let _ = self
|
||||
.view(|tx| {
|
||||
for table in Tables::ALL.iter().map(|table| table.name()) {
|
||||
let table_db = tx.inner.open_db(Some(table)).wrap_err("Could not open db.")?;
|
||||
|
||||
gauge!("db.table_size", table_size as f64, "table" => table);
|
||||
gauge!("db.table_pages", leaf_pages as f64, "table" => table, "type" => "leaf");
|
||||
gauge!("db.table_pages", branch_pages as f64, "table" => table, "type" => "branch");
|
||||
gauge!("db.table_pages", overflow_pages as f64, "table" => table, "type" => "overflow");
|
||||
gauge!("db.table_entries", entries as f64, "table" => table);
|
||||
}
|
||||
let stats = tx
|
||||
.inner
|
||||
.db_stat(&table_db)
|
||||
.wrap_err(format!("Could not find table: {table}"))?;
|
||||
|
||||
Ok::<(), eyre::Report>(())
|
||||
}).map_err(|error| error!(?error, "Failed to read db table stats"));
|
||||
let page_size = stats.page_size() as usize;
|
||||
let leaf_pages = stats.leaf_pages();
|
||||
let branch_pages = stats.branch_pages();
|
||||
let overflow_pages = stats.overflow_pages();
|
||||
let num_pages = leaf_pages + branch_pages + overflow_pages;
|
||||
let table_size = page_size * num_pages;
|
||||
let entries = stats.entries();
|
||||
|
||||
metrics.push((
|
||||
"db.table_size",
|
||||
table_size as f64,
|
||||
vec![Label::new("table", table)],
|
||||
));
|
||||
metrics.push((
|
||||
"db.table_pages",
|
||||
leaf_pages as f64,
|
||||
vec![Label::new("table", table), Label::new("type", "leaf")],
|
||||
));
|
||||
metrics.push((
|
||||
"db.table_pages",
|
||||
branch_pages as f64,
|
||||
vec![Label::new("table", table), Label::new("type", "branch")],
|
||||
));
|
||||
metrics.push((
|
||||
"db.table_pages",
|
||||
overflow_pages as f64,
|
||||
vec![Label::new("table", table), Label::new("type", "overflow")],
|
||||
));
|
||||
metrics.push((
|
||||
"db.table_entries",
|
||||
entries as f64,
|
||||
vec![Label::new("table", table)],
|
||||
));
|
||||
}
|
||||
|
||||
Ok::<(), eyre::Report>(())
|
||||
})
|
||||
.map_err(|error| error!(?error, "Failed to read db table stats"));
|
||||
|
||||
if let Ok(freelist) =
|
||||
self.freelist().map_err(|error| error!(?error, "Failed to read db.freelist"))
|
||||
{
|
||||
gauge!("db.freelist", freelist as f64);
|
||||
metrics.push(("db.freelist", freelist as f64, vec![]));
|
||||
}
|
||||
|
||||
metrics
|
||||
}
|
||||
}
|
||||
|
||||
impl DatabaseMetadata for DatabaseEnv {
|
||||
fn metadata(&self) -> DatabaseMetadataValue {
|
||||
DatabaseMetadataValue::new(self.freelist().ok())
|
||||
}
|
||||
}
|
||||
|
||||
@ -190,7 +227,7 @@ impl DatabaseEnv {
|
||||
LogLevel::Extra => 7,
|
||||
});
|
||||
} else {
|
||||
return Err(DatabaseError::LogLevelUnavailable(log_level))
|
||||
return Err(DatabaseError::LogLevelUnavailable(log_level));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -153,7 +153,10 @@ pub fn open_db(path: &Path, log_level: Option<LogLevel>) -> eyre::Result<Databas
|
||||
#[cfg(any(test, feature = "test-utils"))]
|
||||
pub mod test_utils {
|
||||
use super::*;
|
||||
use crate::{database::Database, database_metrics::DatabaseMetrics};
|
||||
use crate::{
|
||||
database::Database,
|
||||
database_metrics::{DatabaseMetadata, DatabaseMetadataValue, DatabaseMetrics},
|
||||
};
|
||||
use std::{path::PathBuf, sync::Arc};
|
||||
|
||||
/// Error during database open
|
||||
@ -216,6 +219,12 @@ pub mod test_utils {
|
||||
}
|
||||
}
|
||||
|
||||
impl<DB: DatabaseMetadata> DatabaseMetadata for TempDatabase<DB> {
|
||||
fn metadata(&self) -> DatabaseMetadataValue {
|
||||
self.db().metadata()
|
||||
}
|
||||
}
|
||||
|
||||
/// Create read/write database for testing
|
||||
pub fn create_test_rw_db() -> Arc<TempDatabase<DatabaseEnv>> {
|
||||
let path = tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path();
|
||||
|
||||
Reference in New Issue
Block a user