feat: gracefully shutdown prometheus server (#7728)

Co-authored-by: Oliver Nordbjerg <hi@notbjerg.me>
Co-authored-by: Oliver Nordbjerg <onbjerg@users.noreply.github.com>
This commit is contained in:
Rupam Dey
2024-04-20 15:14:17 +05:30
committed by GitHub
parent 615e90b0f8
commit 6728a5518a
6 changed files with 34 additions and 9 deletions

View File

@ -148,7 +148,7 @@ impl<Ext: clap::Args + fmt::Debug> Cli<Ext> {
Commands::Import(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::Import(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::DumpGenesis(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::DumpGenesis(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::Db(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::Db(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::Stage(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::Stage(command) => runner.run_command_until_exit(|ctx| command.execute(ctx)),
Commands::P2P(command) => runner.run_until_ctrl_c(command.execute()), Commands::P2P(command) => runner.run_until_ctrl_c(command.execute()),
Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()), Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()),
Commands::Config(command) => runner.run_until_ctrl_c(command.execute()), Commands::Config(command) => runner.run_until_ctrl_c(command.execute()),

View File

@ -1,6 +1,7 @@
//! `reth stage` command //! `reth stage` command
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use reth_cli_runner::CliContext;
pub mod drop; pub mod drop;
pub mod dump; pub mod dump;
@ -34,9 +35,9 @@ pub enum Subcommands {
impl Command { impl Command {
/// Execute `stage` command /// Execute `stage` command
pub async fn execute(self) -> eyre::Result<()> { pub async fn execute(self, ctx: CliContext) -> eyre::Result<()> {
match self.command { match self.command {
Subcommands::Run(command) => command.execute().await, Subcommands::Run(command) => command.execute(ctx).await,
Subcommands::Drop(command) => command.execute().await, Subcommands::Drop(command) => command.execute().await,
Subcommands::Dump(command) => command.execute().await, Subcommands::Dump(command) => command.execute().await,
Subcommands::Unwind(command) => command.execute().await, Subcommands::Unwind(command) => command.execute().await,

View File

@ -14,6 +14,7 @@ use crate::{
}; };
use clap::Parser; use clap::Parser;
use reth_beacon_consensus::BeaconConsensus; use reth_beacon_consensus::BeaconConsensus;
use reth_cli_runner::CliContext;
use reth_config::{config::EtlConfig, Config}; use reth_config::{config::EtlConfig, Config};
use reth_db::init_db; use reth_db::init_db;
use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
@ -120,7 +121,7 @@ pub struct Command {
impl Command { impl Command {
/// Execute `stage` command /// Execute `stage` command
pub async fn execute(self) -> eyre::Result<()> { pub async fn execute(self, ctx: CliContext) -> eyre::Result<()> {
// Raise the fd limit of the process. // Raise the fd limit of the process.
// Does not do anything on windows. // Does not do anything on windows.
let _ = fdlimit::raise_fd_limit(); let _ = fdlimit::raise_fd_limit();
@ -154,6 +155,7 @@ impl Command {
Arc::clone(&db), Arc::clone(&db),
factory.static_file_provider(), factory.static_file_provider(),
metrics_process::Collector::default(), metrics_process::Collector::default(),
ctx.task_executor,
) )
.await?; .await?;
} }

View File

@ -514,6 +514,7 @@ where
prometheus_handle, prometheus_handle,
database.clone(), database.clone(),
provider_factory.static_file_provider(), provider_factory.static_file_provider(),
executor.clone(),
) )
.await?; .await?;

View File

@ -12,6 +12,7 @@ use metrics_util::layers::{PrefixLayer, Stack};
use reth_db::database_metrics::DatabaseMetrics; use reth_db::database_metrics::DatabaseMetrics;
use reth_metrics::metrics::Unit; use reth_metrics::metrics::Unit;
use reth_provider::providers::StaticFileProvider; use reth_provider::providers::StaticFileProvider;
use reth_tasks::TaskExecutor;
use std::{convert::Infallible, net::SocketAddr, sync::Arc}; use std::{convert::Infallible, net::SocketAddr, sync::Arc};
pub(crate) trait Hook: Fn() + Send + Sync {} pub(crate) trait Hook: Fn() + Send + Sync {}
@ -39,13 +40,19 @@ pub(crate) async fn serve_with_hooks<F: Hook + 'static>(
listen_addr: SocketAddr, listen_addr: SocketAddr,
handle: PrometheusHandle, handle: PrometheusHandle,
hooks: impl IntoIterator<Item = F>, hooks: impl IntoIterator<Item = F>,
task_executor: TaskExecutor,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
let hooks: Vec<_> = hooks.into_iter().collect(); let hooks: Vec<_> = hooks.into_iter().collect();
// Start endpoint // Start endpoint
start_endpoint(listen_addr, handle, Arc::new(move || hooks.iter().for_each(|hook| hook()))) start_endpoint(
.await listen_addr,
.wrap_err("Could not start Prometheus endpoint")?; handle,
Arc::new(move || hooks.iter().for_each(|hook| hook())),
task_executor,
)
.await
.wrap_err("Could not start Prometheus endpoint")?;
Ok(()) Ok(())
} }
@ -55,6 +62,7 @@ async fn start_endpoint<F: Hook + 'static>(
listen_addr: SocketAddr, listen_addr: SocketAddr,
handle: PrometheusHandle, handle: PrometheusHandle,
hook: Arc<F>, hook: Arc<F>,
task_executor: TaskExecutor,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
let make_svc = make_service_fn(move |_| { let make_svc = make_service_fn(move |_| {
let handle = handle.clone(); let handle = handle.clone();
@ -67,10 +75,20 @@ async fn start_endpoint<F: Hook + 'static>(
})) }))
} }
}); });
let server = let server =
Server::try_bind(&listen_addr).wrap_err("Could not bind to address")?.serve(make_svc); Server::try_bind(&listen_addr).wrap_err("Could not bind to address")?.serve(make_svc);
tokio::spawn(async move { server.await.expect("Metrics endpoint crashed") }); task_executor.spawn_with_graceful_shutdown_signal(move |signal| async move {
if let Err(error) = server
.with_graceful_shutdown(async move {
let _ = signal.await;
})
.await
{
tracing::error!(%error, "metrics endpoint crashed")
}
});
Ok(()) Ok(())
} }
@ -82,6 +100,7 @@ pub async fn serve<Metrics>(
db: Metrics, db: Metrics,
static_file_provider: StaticFileProvider, static_file_provider: StaticFileProvider,
process: metrics_process::Collector, process: metrics_process::Collector,
task_executor: TaskExecutor,
) -> eyre::Result<()> ) -> eyre::Result<()>
where where
Metrics: DatabaseMetrics + 'static + Send + Sync, Metrics: DatabaseMetrics + 'static + Send + Sync,
@ -102,7 +121,7 @@ where
Box::new(collect_memory_stats), Box::new(collect_memory_stats),
Box::new(collect_io_stats), Box::new(collect_io_stats),
]; ];
serve_with_hooks(listen_addr, handle, hooks).await?; serve_with_hooks(listen_addr, handle, hooks, task_executor).await?;
// We describe the metrics after the recorder is installed, otherwise this information is not // We describe the metrics after the recorder is installed, otherwise this information is not
// registered // registered

View File

@ -365,6 +365,7 @@ impl NodeConfig {
prometheus_handle: PrometheusHandle, prometheus_handle: PrometheusHandle,
db: Metrics, db: Metrics,
static_file_provider: StaticFileProvider, static_file_provider: StaticFileProvider,
task_executor: TaskExecutor,
) -> eyre::Result<()> ) -> eyre::Result<()>
where where
Metrics: DatabaseMetrics + 'static + Send + Sync, Metrics: DatabaseMetrics + 'static + Send + Sync,
@ -377,6 +378,7 @@ impl NodeConfig {
db, db,
static_file_provider, static_file_provider,
metrics_process::Collector::default(), metrics_process::Collector::default(),
task_executor,
) )
.await?; .await?;
} }