refactor: move stage commands to reth stage (#2741)

This commit is contained in:
Bjerg
2023-05-19 10:13:41 +02:00
committed by GitHub
parent 7b7805a4cc
commit 2a39b2825f
10 changed files with 270 additions and 241 deletions

View File

@ -2,7 +2,7 @@
use crate::{
chain, config, db,
dirs::{LogsDir, PlatformPath},
drop_stage, dump_stage, merkle_debug, node, p2p,
merkle_debug, node, p2p,
runner::CliRunner,
stage, test_eth_chain, test_vectors,
};
@ -32,8 +32,6 @@ pub fn run() -> eyre::Result<()> {
Commands::Import(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::Db(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::Stage(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::DumpStage(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::DropStage(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::P2P(command) => runner.run_until_ctrl_c(command.execute()),
Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()),
Commands::TestEthChain(command) => runner.run_until_ctrl_c(command.execute()),
@ -57,20 +55,9 @@ pub enum Commands {
/// Database debugging utilities
#[command(name = "db")]
Db(db::Command),
/// Run a single stage.
///
/// Note that this won't use the Pipeline and as a result runs stages
/// assuming that all the data can be held in memory. It is not recommended
/// to run a stage for really large block ranges if your computer does not have
/// a lot of memory to store all the data.
/// Manipulate individual stages.
#[command(name = "stage")]
Stage(stage::Command),
/// Dumps a stage from a range into a new database.
#[command(name = "dump-stage")]
DumpStage(dump_stage::Command),
/// Drops a stage's tables from the database.
#[command(name = "drop-stage")]
DropStage(drop_stage::Command),
/// P2P Debugging utilities
#[command(name = "p2p")]
P2P(p2p::Command),

View File

@ -13,8 +13,6 @@ pub mod cli;
pub mod config;
pub mod db;
pub mod dirs;
pub mod drop_stage;
pub mod dump_stage;
pub mod merkle_debug;
pub mod node;
pub mod p2p;

View File

@ -42,11 +42,11 @@ pub struct Command {
/// - goerli
/// - sepolia
#[arg(
long,
value_name = "CHAIN_OR_PATH",
verbatim_doc_comment,
default_value = "mainnet",
value_parser = genesis_value_parser
long,
value_name = "CHAIN_OR_PATH",
verbatim_doc_comment,
default_value = "mainnet",
value_parser = genesis_value_parser
)]
chain: Arc<ChainSpec>,

View File

@ -1,4 +1,5 @@
use crate::{dump_stage::setup, utils::DbTool};
use super::setup;
use crate::utils::DbTool;
use eyre::Result;
use reth_db::{
cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx,

View File

@ -1,4 +1,5 @@
use crate::{dump_stage::setup, utils::DbTool};
use super::setup;
use crate::utils::DbTool;
use eyre::Result;
use reth_db::{database::Database, table::TableImporter, tables};
use reth_primitives::{BlockNumber, StageCheckpoint};

View File

@ -1,4 +1,5 @@
use crate::{dump_stage::setup, utils::DbTool};
use super::setup;
use crate::utils::DbTool;
use eyre::Result;
use reth_db::{database::Database, table::TableImporter, tables};
use reth_primitives::StageCheckpoint;

View File

@ -1,4 +1,5 @@
use crate::{dump_stage::setup, utils::DbTool};
use super::setup;
use crate::utils::DbTool;
use eyre::Result;
use reth_db::{database::Database, table::TableImporter, tables};
use reth_primitives::{BlockNumber, StageCheckpoint, MAINNET};

View File

@ -1,228 +1,40 @@
//! Main `stage` command
//!
//! Stage debugging tool
use crate::{
args::{get_secret_key, NetworkArgs, StageEnum},
dirs::{DataDirPath, MaybePlatformPath},
prometheus_exporter,
};
use clap::Parser;
use reth_beacon_consensus::BeaconConsensus;
use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
use reth_primitives::{ChainSpec, StageCheckpoint};
use reth_provider::{ShareableDatabase, Transaction};
use reth_staged_sync::{
utils::{chainspec::chain_spec_value_parser, init::init_db},
Config,
};
use reth_stages::{
stages::{
BodyStage, ExecutionStage, ExecutionStageThresholds, MerkleStage, SenderRecoveryStage,
TransactionLookupStage,
},
ExecInput, ExecOutput, Stage, StageId, UnwindInput,
};
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
use tracing::*;
//! `reth stage` command
use clap::{Parser, Subcommand};
pub mod drop;
pub mod dump;
pub mod run;
/// `reth stage` command
#[derive(Debug, Parser)]
pub struct Command {
/// The path to the configuration file to use.
#[arg(long, value_name = "FILE", verbatim_doc_comment)]
config: Option<PathBuf>,
#[clap(subcommand)]
command: Subcommands,
}
/// The path to the data dir for all reth files and subdirectories.
/// `reth stage` subcommands
#[derive(Subcommand, Debug)]
pub enum Subcommands {
/// Run a single stage.
///
/// Defaults to the OS-specific data directory:
///
/// - Linux: `$XDG_DATA_HOME/reth/` or `$HOME/.local/share/reth/`
/// - Windows: `{FOLDERID_RoamingAppData}/reth/`
/// - macOS: `$HOME/Library/Application Support/reth/`
#[arg(long, value_name = "DATA_DIR", verbatim_doc_comment, default_value_t)]
datadir: MaybePlatformPath<DataDirPath>,
/// The chain this node is running.
///
/// Possible values are either a built-in chain or the path to a chain specification file.
///
/// Built-in chains:
/// - mainnet
/// - goerli
/// - sepolia
#[arg(
long,
value_name = "CHAIN_OR_PATH",
verbatim_doc_comment,
default_value = "mainnet",
value_parser = chain_spec_value_parser
)]
chain: Arc<ChainSpec>,
/// Enable Prometheus metrics.
///
/// The metrics will be served at the given interface and port.
#[clap(long, value_name = "SOCKET")]
metrics: Option<SocketAddr>,
/// The name of the stage to run
#[arg(value_enum)]
stage: StageEnum,
/// The height to start at
#[arg(long)]
from: u64,
/// The end of the stage
#[arg(long, short)]
to: u64,
/// Batch size for stage execution and unwind
#[arg(long)]
batch_size: Option<u64>,
/// Normally, running the stage requires unwinding for stages that already
/// have been run, in order to not rewrite to the same database slots.
///
/// You can optionally skip the unwinding phase if you're syncing a block
/// range that has not been synced before.
#[arg(long, short)]
skip_unwind: bool,
#[clap(flatten)]
network: NetworkArgs,
/// Note that this won't use the Pipeline and as a result runs stages
/// assuming that all the data can be held in memory. It is not recommended
/// to run a stage for really large block ranges if your computer does not have
/// a lot of memory to store all the data.
Run(run::Command),
/// Drops a stage's tables from the database.
Drop(drop::Command),
/// Dumps a stage from a range into a new database.
Dump(dump::Command),
}
impl Command {
/// Execute `stage` command
/// Execute `stage` command
pub async fn execute(self) -> eyre::Result<()> {
// Raise the fd limit of the process.
// Does not do anything on windows.
fdlimit::raise_fd_limit();
// add network name to data dir
let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain);
let config_path = self.config.clone().unwrap_or(data_dir.config_path());
let config: Config = confy::load_path(config_path).unwrap_or_default();
info!(target: "reth::cli", "reth {} starting stage {:?}", clap::crate_version!(), self.stage);
// use the overridden db path if specified
let db_path = data_dir.db_path();
info!(target: "reth::cli", path = ?db_path, "Opening database");
let db = Arc::new(init_db(db_path)?);
let mut tx = Transaction::new(db.as_ref())?;
if let Some(listen_addr) = self.metrics {
info!(target: "reth::cli", "Starting metrics endpoint at {}", listen_addr);
prometheus_exporter::initialize_with_db_metrics(listen_addr, Arc::clone(&db)).await?;
match self.command {
Subcommands::Run(command) => command.execute().await,
Subcommands::Drop(command) => command.execute().await,
Subcommands::Dump(command) => command.execute().await,
}
let batch_size = self.batch_size.unwrap_or(self.to - self.from + 1);
let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) =
match self.stage {
StageEnum::Bodies => {
let consensus = Arc::new(BeaconConsensus::new(self.chain.clone()));
let mut config = config;
config.peers.connect_trusted_nodes_only = self.network.trusted_only;
if !self.network.trusted_peers.is_empty() {
self.network.trusted_peers.iter().for_each(|peer| {
config.peers.trusted_nodes.insert(*peer);
});
}
let network_secret_path = self
.network
.p2p_secret_key
.clone()
.unwrap_or_else(|| data_dir.p2p_secret_path());
let p2p_secret_key = get_secret_key(&network_secret_path)?;
let default_peers_path = data_dir.known_peers_path();
let network = self
.network
.network_config(
&config,
self.chain.clone(),
p2p_secret_key,
default_peers_path,
)
.build(Arc::new(ShareableDatabase::new(db.clone(), self.chain.clone())))
.start_network()
.await?;
let fetch_client = Arc::new(network.fetch_client().await?);
let stage = BodyStage {
downloader: BodiesDownloaderBuilder::default()
.with_stream_batch_size(batch_size as usize)
.with_request_limit(config.stages.bodies.downloader_request_limit)
.with_max_buffered_blocks(
config.stages.bodies.downloader_max_buffered_blocks,
)
.with_concurrent_requests_range(
config.stages.bodies.downloader_min_concurrent_requests..=
config.stages.bodies.downloader_max_concurrent_requests,
)
.build(fetch_client, consensus.clone(), db.clone()),
consensus: consensus.clone(),
};
(Box::new(stage), None)
}
StageEnum::Senders => {
(Box::new(SenderRecoveryStage { commit_threshold: batch_size }), None)
}
StageEnum::Execution => {
let factory = reth_revm::Factory::new(self.chain.clone());
(
Box::new(ExecutionStage::new(
factory,
ExecutionStageThresholds {
max_blocks: Some(batch_size),
max_changes: None,
max_changesets: None,
},
)),
None,
)
}
StageEnum::TxLookup => (Box::new(TransactionLookupStage::new(batch_size)), None),
StageEnum::Merkle => (
Box::new(MerkleStage::default_execution()),
Some(Box::new(MerkleStage::default_unwind())),
),
_ => return Ok(()),
};
let unwind_stage = unwind_stage.as_mut().unwrap_or(&mut exec_stage);
let mut input = ExecInput {
previous_stage: Some((StageId("No Previous Stage"), StageCheckpoint::new(self.to))),
checkpoint: Some(StageCheckpoint::new(self.from)),
};
let mut unwind = UnwindInput {
checkpoint: StageCheckpoint::new(self.to),
unwind_to: self.from,
bad_block: None,
};
if !self.skip_unwind {
while unwind.checkpoint.block_number > self.from {
let unwind_output = unwind_stage.unwind(&mut tx, unwind).await?;
unwind.checkpoint = unwind_output.checkpoint;
}
}
while let ExecOutput { checkpoint: stage_progress, done: false } =
exec_stage.execute(&mut tx, input).await?
{
input.checkpoint = Some(stage_progress)
}
Ok(())
}
}

228
bin/reth/src/stage/run.rs Normal file
View File

@ -0,0 +1,228 @@
//! Main `stage` command
//!
//! Stage debugging tool
use crate::{
args::{get_secret_key, NetworkArgs, StageEnum},
dirs::{DataDirPath, MaybePlatformPath},
prometheus_exporter,
};
use clap::Parser;
use reth_beacon_consensus::BeaconConsensus;
use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
use reth_primitives::{ChainSpec, StageCheckpoint};
use reth_provider::{ShareableDatabase, Transaction};
use reth_staged_sync::{
utils::{chainspec::chain_spec_value_parser, init::init_db},
Config,
};
use reth_stages::{
stages::{
BodyStage, ExecutionStage, ExecutionStageThresholds, MerkleStage, SenderRecoveryStage,
TransactionLookupStage,
},
ExecInput, ExecOutput, Stage, StageId, UnwindInput,
};
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
use tracing::*;
/// `reth stage` command
// NOTE(review): field `///` doc comments below are surfaced by clap as `--help` text;
// do not edit them casually — they are user-visible CLI output.
#[derive(Debug, Parser)]
pub struct Command {
/// The path to the configuration file to use.
#[arg(long, value_name = "FILE", verbatim_doc_comment)]
config: Option<PathBuf>,
/// The path to the data dir for all reth files and subdirectories.
///
/// Defaults to the OS-specific data directory:
///
/// - Linux: `$XDG_DATA_HOME/reth/` or `$HOME/.local/share/reth/`
/// - Windows: `{FOLDERID_RoamingAppData}/reth/`
/// - macOS: `$HOME/Library/Application Support/reth/`
#[arg(long, value_name = "DATA_DIR", verbatim_doc_comment, default_value_t)]
datadir: MaybePlatformPath<DataDirPath>,
/// The chain this node is running.
///
/// Possible values are either a built-in chain or the path to a chain specification file.
///
/// Built-in chains:
/// - mainnet
/// - goerli
/// - sepolia
#[arg(
long,
value_name = "CHAIN_OR_PATH",
verbatim_doc_comment,
default_value = "mainnet",
value_parser = chain_spec_value_parser
)]
chain: Arc<ChainSpec>,
/// Enable Prometheus metrics.
///
/// The metrics will be served at the given interface and port.
#[clap(long, value_name = "SOCKET")]
metrics: Option<SocketAddr>,
// Positional argument: the stage name is parsed from StageEnum's value variants.
/// The name of the stage to run
#[arg(value_enum)]
stage: StageEnum,
/// The height to start at
#[arg(long)]
from: u64,
/// The end of the stage
#[arg(long, short)]
to: u64,
// When unset, execute() derives the batch size from the block range (to - from + 1).
/// Batch size for stage execution and unwind
#[arg(long)]
batch_size: Option<u64>,
/// Normally, running the stage requires unwinding for stages that already
/// have been run, in order to not rewrite to the same database slots.
///
/// You can optionally skip the unwinding phase if you're syncing a block
/// range that has not been synced before.
#[arg(long, short)]
skip_unwind: bool,
// Flattened shared networking flags (trusted peers, p2p secret, etc.);
// only used by the Bodies stage, which needs a live network to fetch from.
#[clap(flatten)]
network: NetworkArgs,
}
impl Command {
/// Execute `stage` command
///
/// Opens the database, constructs the requested stage (and, for Merkle, a
/// dedicated unwind stage), optionally unwinds the `from..=to` range, then
/// executes the stage over it until it reports completion.
pub async fn execute(self) -> eyre::Result<()> {
// Raise the fd limit of the process.
// Does not do anything on windows.
fdlimit::raise_fd_limit();
// add network name to data dir
let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain);
let config_path = self.config.clone().unwrap_or(data_dir.config_path());
// NOTE(review): a missing or unreadable config file silently falls back to
// Config::default() here — confirm that is intended for a debugging tool.
let config: Config = confy::load_path(config_path).unwrap_or_default();
info!(target: "reth::cli", "reth {} starting stage {:?}", clap::crate_version!(), self.stage);
// use the overridden db path if specified
let db_path = data_dir.db_path();
info!(target: "reth::cli", path = ?db_path, "Opening database");
let db = Arc::new(init_db(db_path)?);
// Single transaction reused for both the unwind and execute loops below.
let mut tx = Transaction::new(db.as_ref())?;
// Optionally expose Prometheus metrics for the duration of the run.
if let Some(listen_addr) = self.metrics {
info!(target: "reth::cli", "Starting metrics endpoint at {}", listen_addr);
prometheus_exporter::initialize_with_db_metrics(listen_addr, Arc::clone(&db)).await?;
}
// Default batch size covers the whole inclusive block range.
// NOTE(review): `self.to - self.from + 1` underflows (panics in debug builds)
// when --from > --to; there is no range validation before this point.
let batch_size = self.batch_size.unwrap_or(self.to - self.from + 1);
// Build the stage to execute plus an optional separate unwind stage; when the
// second element is None, the execution stage is also used for unwinding.
let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) =
match self.stage {
StageEnum::Bodies => {
// Bodies needs a running p2p network to download block bodies from peers.
let consensus = Arc::new(BeaconConsensus::new(self.chain.clone()));
let mut config = config;
config.peers.connect_trusted_nodes_only = self.network.trusted_only;
if !self.network.trusted_peers.is_empty() {
self.network.trusted_peers.iter().for_each(|peer| {
config.peers.trusted_nodes.insert(*peer);
});
}
let network_secret_path = self
.network
.p2p_secret_key
.clone()
.unwrap_or_else(|| data_dir.p2p_secret_path());
let p2p_secret_key = get_secret_key(&network_secret_path)?;
let default_peers_path = data_dir.known_peers_path();
let network = self
.network
.network_config(
&config,
self.chain.clone(),
p2p_secret_key,
default_peers_path,
)
.build(Arc::new(ShareableDatabase::new(db.clone(), self.chain.clone())))
.start_network()
.await?;
let fetch_client = Arc::new(network.fetch_client().await?);
let stage = BodyStage {
downloader: BodiesDownloaderBuilder::default()
.with_stream_batch_size(batch_size as usize)
.with_request_limit(config.stages.bodies.downloader_request_limit)
.with_max_buffered_blocks(
config.stages.bodies.downloader_max_buffered_blocks,
)
.with_concurrent_requests_range(
config.stages.bodies.downloader_min_concurrent_requests..=
config.stages.bodies.downloader_max_concurrent_requests,
)
.build(fetch_client, consensus.clone(), db.clone()),
consensus: consensus.clone(),
};
(Box::new(stage), None)
}
StageEnum::Senders => {
(Box::new(SenderRecoveryStage { commit_threshold: batch_size }), None)
}
StageEnum::Execution => {
let factory = reth_revm::Factory::new(self.chain.clone());
(
Box::new(ExecutionStage::new(
factory,
ExecutionStageThresholds {
max_blocks: Some(batch_size),
max_changes: None,
max_changesets: None,
},
)),
None,
)
}
StageEnum::TxLookup => (Box::new(TransactionLookupStage::new(batch_size)), None),
// Merkle uses distinct stages for execution and unwind.
StageEnum::Merkle => (
Box::new(MerkleStage::default_execution()),
Some(Box::new(MerkleStage::default_unwind())),
),
// Unsupported stage variants are a silent no-op.
// NOTE(review): consider reporting an error instead of returning Ok(()).
_ => return Ok(()),
};
let unwind_stage = unwind_stage.as_mut().unwrap_or(&mut exec_stage);
let mut input = ExecInput {
// There is no real previous stage here; a sentinel id pinned at `to` is used.
previous_stage: Some((StageId("No Previous Stage"), StageCheckpoint::new(self.to))),
checkpoint: Some(StageCheckpoint::new(self.from)),
};
let mut unwind = UnwindInput {
checkpoint: StageCheckpoint::new(self.to),
unwind_to: self.from,
bad_block: None,
};
// First unwind back down to `from` (unless skipped) so re-execution does not
// rewrite to database slots that were already populated by an earlier run.
if !self.skip_unwind {
while unwind.checkpoint.block_number > self.from {
let unwind_output = unwind_stage.unwind(&mut tx, unwind).await?;
unwind.checkpoint = unwind_output.checkpoint;
}
}
// Then execute the stage, feeding each checkpoint back in until it reports done.
while let ExecOutput { checkpoint: stage_progress, done: false } =
exec_stage.execute(&mut tx, input).await?
{
input.checkpoint = Some(stage_progress)
}
Ok(())
}
}