feat: Stage tool (#716)

* feat: `reth stage` command

* refactor(bin): move init utils to separate file

* feat(bin): scaffold stage command for one stage

* fix: correctly set from/to for stage range

* fix(stage-tool): add unwind before execute to re-exec

otherwise we're double executing stuff

* fix(stage-tool): use max commit threshold avail

* chore: rm unused vars

* fix(genesis-init): take a write tx only if needed

this avoids blocking by accident if we took a write tx expecting
that init_genesis would immediately return the hash

* feat(stage-tool): add bodies stage

Co-authored-by: Oliver Nordbjerg <hi@notbjerg.me>
This commit is contained in:
Georgios Konstantopoulos
2023-01-04 21:53:27 +02:00
committed by GitHub
parent 973ea48e03
commit 759ba39311
8 changed files with 283 additions and 64 deletions

1
Cargo.lock generated
View File

@ -3495,6 +3495,7 @@ dependencies = [
"serde",
"serde_json",
"shellexpand",
"strum",
"tempfile",
"thiserror",
"tokio",

View File

@ -47,5 +47,6 @@ clap = { version = "4.0", features = ["derive", "cargo"] }
thiserror = "1.0"
tokio = { version = "1.21", features = ["sync", "macros", "rt-multi-thread"] }
futures = "0.3.25"
strum = "0.24.1"
tempfile = { version = "3.3.0" }
backon = "0.2.0"

View File

@ -1,12 +1,11 @@
//! CLI definition and entrypoint to executable
use clap::{ArgAction, Parser, Subcommand};
use tracing_subscriber::util::SubscriberInitExt;
use crate::{
db, node, p2p, test_eth_chain,
db, node, p2p, stage, test_eth_chain,
util::reth_tracing::{self, TracingMode},
};
use clap::{ArgAction, Parser, Subcommand};
use tracing_subscriber::util::SubscriberInitExt;
/// main function that parses cli and runs command
pub async fn run() -> eyre::Result<()> {
@ -22,6 +21,7 @@ pub async fn run() -> eyre::Result<()> {
Commands::Node(command) => command.execute().await,
Commands::TestEthChain(command) => command.execute().await,
Commands::Db(command) => command.execute().await,
Commands::Stage(command) => command.execute().await,
Commands::P2P(command) => command.execute().await,
}
}
@ -32,12 +32,15 @@ pub enum Commands {
/// Start the node
#[command(name = "node")]
Node(node::Command),
/// Runs Ethereum blockchain tests
/// Run Ethereum blockchain tests
#[command(name = "test-chain")]
TestEthChain(test_eth_chain::Command),
/// DB Debugging utilities
/// Database debugging utilities
#[command(name = "db")]
Db(db::Command),
/// Run a single stage
#[command(name = "stage")]
Stage(stage::Command),
/// P2P Debugging utilities
#[command(name = "p2p")]
P2P(p2p::Command),

View File

@ -13,5 +13,6 @@ pub mod dirs;
pub mod node;
pub mod p2p;
pub mod prometheus_exporter;
pub mod stage;
pub mod test_eth_chain;
pub mod util;

View File

@ -5,22 +5,18 @@ use crate::{
config::Config,
dirs::{ConfigPath, DbPath},
prometheus_exporter,
util::chainspec::{chain_spec_value_parser, ChainSpecification, Genesis},
util::{
chainspec::{chain_spec_value_parser, ChainSpecification},
init::{init_db, init_genesis},
},
};
use clap::{crate_version, Parser};
use fdlimit::raise_fd_limit;
use reth_consensus::BeaconConsensus;
use reth_db::{
cursor::DbCursorRO,
database::Database,
mdbx::{Env, WriteMap},
tables,
transaction::{DbTx, DbTxMut},
};
use reth_downloaders::{bodies, headers};
use reth_executor::Config as ExecutorConfig;
use reth_interfaces::consensus::ForkchoiceState;
use reth_primitives::{Account, Header, H256};
use reth_primitives::H256;
use reth_stages::{
metrics::HeaderMetrics,
stages::{
@ -28,7 +24,7 @@ use reth_stages::{
sender_recovery::SenderRecoveryStage, total_difficulty::TotalDifficultyStage,
},
};
use std::{net::SocketAddr, path::Path, sync::Arc};
use std::{net::SocketAddr, sync::Arc};
use tracing::{debug, info};
/// Start the client
@ -173,51 +169,3 @@ impl Command {
Ok(())
}
}
/// Opens up an existing database or creates a new one at the specified path.
///
/// The directory at `path` (and any missing parents) is created first, the MDBX environment is
/// then opened with `EnvKind::RW`, and `create_tables` is called on it before it is returned.
///
/// # Errors
///
/// Propagates failures from creating the directory, opening the environment or creating the
/// tables.
fn init_db<P: AsRef<Path>>(path: P) -> eyre::Result<Env<WriteMap>> {
std::fs::create_dir_all(path.as_ref())?;
let db = reth_db::mdbx::Env::<reth_db::mdbx::WriteMap>::open(
path.as_ref(),
reth_db::mdbx::EnvKind::RW,
)?;
db.create_tables()?;
Ok(db)
}
/// Write the genesis block if it has not already been written
///
/// Returns the hash stored in the first `CanonicalHeaders` entry when one exists; otherwise
/// writes the genesis accounts and header, commits, and returns the new header's hash.
#[allow(clippy::field_reassign_with_default)]
fn init_genesis<DB: Database>(db: Arc<DB>, genesis: Genesis) -> Result<H256, reth_db::Error> {
// NOTE: a write transaction is opened unconditionally here, even when the lookup below finds
// an existing genesis entry and nothing is written.
let tx = db.tx_mut()?;
if let Some((_, hash)) = tx.cursor::<tables::CanonicalHeaders>()?.first()? {
debug!("Genesis already written, skipping.");
return Ok(hash)
}
debug!("Writing genesis block.");
// Insert account state
for (address, account) in &genesis.alloc {
tx.put::<tables::PlainAccountState>(
*address,
Account {
// A missing nonce in the allocation falls back to the type's default.
nonce: account.nonce.unwrap_or_default(),
balance: account.balance,
bytecode_hash: None,
},
)?;
}
// Insert header
let header: Header = genesis.into();
let hash = header.hash_slow();
// Every block-indexed table gets an entry for block number 0.
tx.put::<tables::CanonicalHeaders>(0, hash)?;
tx.put::<tables::HeaderNumbers>(hash, 0)?;
tx.put::<tables::BlockBodies>((0, hash).into(), Default::default())?;
tx.put::<tables::BlockTransitionIndex>((0, hash).into(), 0)?;
tx.put::<tables::HeaderTD>((0, hash).into(), header.difficulty.into())?;
tx.put::<tables::Headers>((0, hash).into(), header)?;
tx.commit()?;
Ok(hash)
}

201
bin/reth/src/stage/mod.rs Normal file
View File

@ -0,0 +1,201 @@
//! Main `stage` command
//!
//! Stage debugging tool
use crate::{
config::Config,
dirs::{ConfigPath, DbPath},
prometheus_exporter,
util::{
chainspec::{chain_spec_value_parser, ChainSpecification},
init::{init_db, init_genesis},
},
};
use reth_consensus::BeaconConsensus;
use reth_downloaders::bodies::concurrent::ConcurrentDownloader;
use reth_primitives::NodeRecord;
use reth_stages::{
metrics::HeaderMetrics,
stages::{bodies::BodyStage, sender_recovery::SenderRecoveryStage},
ExecInput, Stage, StageId, Transaction, UnwindInput,
};
use clap::Parser;
use serde::Deserialize;
use std::{net::SocketAddr, sync::Arc};
use strum::{AsRefStr, EnumString, EnumVariantNames};
use tracing::*;
/// `reth stage` command
#[derive(Debug, Parser)]
pub struct Command {
/// The path to the database folder.
///
/// Defaults to the OS-specific data directory:
///
/// - Linux: `$XDG_DATA_HOME/reth/db` or `$HOME/.local/share/reth/db`
/// - Windows: `{FOLDERID_RoamingAppData}/reth/db`
/// - macOS: `$HOME/Library/Application Support/reth/db`
#[arg(long, value_name = "PATH", verbatim_doc_comment, default_value_t)]
db: DbPath,
/// The path to the configuration file to use.
#[arg(long, value_name = "FILE", verbatim_doc_comment, default_value_t)]
config: ConfigPath,
/// The chain this node is running.
///
/// Possible values are either a built-in chain or the path to a chain specification file.
///
/// Built-in chains:
/// - mainnet
/// - goerli
/// - sepolia
#[arg(
long,
value_name = "CHAIN_OR_PATH",
verbatim_doc_comment,
default_value = "mainnet",
value_parser = chain_spec_value_parser
)]
chain: ChainSpecification,
/// Enable Prometheus metrics.
///
/// The metrics will be served at the given interface and port.
#[clap(long, value_name = "SOCKET")]
metrics: Option<SocketAddr>,
/// The name of the stage to run
#[arg(long, short)]
stage: StageEnum,
/// The height to start at
#[arg(long)]
from: u64,
/// The end of the stage
#[arg(long, short)]
to: u64,
/// Whether to unwind or run the stage forward
#[arg(long, short)]
unwind: bool,
#[clap(flatten)]
network: NetworkOpts,
}
/// The stage selected on the command line through `--stage`.
///
/// All variant names are single words, so the strum `kebab-case` and the serde `snake_case`
/// renames both produce the lowercase variant name (for example `bodies`).
#[derive(
Debug, Clone, Copy, Eq, PartialEq, AsRefStr, EnumVariantNames, EnumString, Deserialize,
)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "kebab-case")]
enum StageEnum {
/// Not implemented in `Command::execute`: selecting it runs no stage.
Headers,
/// Runs `BodyStage`.
Bodies,
/// Runs `SenderRecoveryStage`.
Senders,
/// Not implemented in `Command::execute`: selecting it runs no stage.
Execution,
}
// Networking flags that are flattened into `Command`.
// In `Command::execute` only the `Bodies` stage reads them, to configure and start the network
// used for downloading.
#[derive(Debug, Parser)]
struct NetworkOpts {
/// Disable the discovery service.
// Passed through as the last argument of `Config::network_config`.
#[arg(short, long)]
disable_discovery: bool,
/// Target trusted peer enodes
/// --trusted-peers enode://abcd@192.168.0.1:30303
// Each record is inserted into `config.peers.trusted_nodes` before the network is started.
#[arg(long)]
trusted_peers: Vec<NodeRecord>,
/// Connect only to trusted peers
// Copied into `config.peers.connect_trusted_nodes_only`.
#[arg(long)]
trusted_only: bool,
}
impl Command {
/// Execute `stage` command
pub async fn execute(&self) -> eyre::Result<()> {
// Raise the fd limit of the process.
// Does not do anything on windows.
fdlimit::raise_fd_limit();
if let Some(listen_addr) = self.metrics {
info!("Starting metrics endpoint at {}", listen_addr);
prometheus_exporter::initialize(listen_addr)?;
HeaderMetrics::describe();
}
let config: Config = confy::load_path(&self.config).unwrap_or_default();
info!("reth {} starting stage {:?}", clap::crate_version!(), self.stage);
let input = ExecInput {
previous_stage: Some((StageId("No Previous Stage"), self.to)),
stage_progress: Some(self.from),
};
let unwind = UnwindInput { stage_progress: self.to, unwind_to: self.from, bad_block: None };
let db = Arc::new(init_db(&self.db)?);
let mut tx = Transaction::new(db.as_ref())?;
match self.stage {
StageEnum::Bodies => {
let chain_id = self.chain.consensus.chain_id;
let consensus = Arc::new(BeaconConsensus::new(self.chain.consensus.clone()));
let genesis_hash = init_genesis(db.clone(), self.chain.genesis.clone())?;
let mut config = config;
config.peers.connect_trusted_nodes_only = self.network.trusted_only;
if !self.network.trusted_peers.is_empty() {
self.network.trusted_peers.iter().for_each(|peer| {
config.peers.trusted_nodes.insert(*peer);
});
}
let network = config
.network_config(
db.clone(),
chain_id,
genesis_hash,
self.network.disable_discovery,
)
.start_network()
.await?;
let fetch_client = Arc::new(network.fetch_client().await?);
dbg!(&config.stages.bodies);
let mut stage = BodyStage {
downloader: Arc::new(
ConcurrentDownloader::new(fetch_client.clone(), consensus.clone())
.with_batch_size(config.stages.bodies.downloader_batch_size)
.with_retries(config.stages.bodies.downloader_retries)
.with_concurrency(config.stages.bodies.downloader_concurrency),
),
consensus: consensus.clone(),
commit_threshold: config.stages.bodies.commit_threshold,
};
// Unwind first
stage.unwind(&mut tx, unwind).await?;
stage.execute(&mut tx, input).await?;
}
StageEnum::Senders => {
let mut stage = SenderRecoveryStage {
batch_size: config.stages.sender_recovery.batch_size,
commit_threshold: self.to - self.from + 1,
};
// Unwind first
stage.unwind(&mut tx, unwind).await?;
stage.execute(&mut tx, input).await?;
}
StageEnum::Execution => {
// let stage = ExecutionStage { config: ExecutorConfig::new_ethereum() };
}
_ => {}
}
Ok(())
}
}

61
bin/reth/src/util/init.rs Normal file
View File

@ -0,0 +1,61 @@
use crate::util::chainspec::Genesis;
use reth_db::{
cursor::DbCursorRO,
database::Database,
mdbx::{Env, WriteMap},
tables,
transaction::{DbTx, DbTxMut},
};
use reth_primitives::{Account, Header, H256};
use std::{path::Path, sync::Arc};
use tracing::debug;
/// Opens up an existing database or creates a new one at the specified path.
pub fn init_db<P: AsRef<Path>>(path: P) -> eyre::Result<Env<WriteMap>> {
std::fs::create_dir_all(path.as_ref())?;
let db = reth_db::mdbx::Env::<reth_db::mdbx::WriteMap>::open(
path.as_ref(),
reth_db::mdbx::EnvKind::RW,
)?;
db.create_tables()?;
Ok(db)
}
/// Writes the genesis accounts and block unless the database already has a canonical header.
///
/// The check is done with a read-only transaction, which is released before any write
/// transaction is requested. A write transaction is therefore only opened when the genesis data
/// actually has to be written.
///
/// Returns the hash from the first `CanonicalHeaders` entry if one exists, otherwise the hash
/// of the freshly written genesis header.
#[allow(clippy::field_reassign_with_default)]
pub fn init_genesis<DB: Database>(db: Arc<DB>, genesis: Genesis) -> Result<H256, reth_db::Error> {
    // Look up an existing entry inside its own scope so the read transaction is gone before
    // `tx_mut` is called below.
    let stored_hash = {
        let read_tx = db.tx()?;
        let first_entry = read_tx.cursor::<tables::CanonicalHeaders>()?.first()?;
        first_entry.map(|(_, hash)| hash)
    };
    if let Some(hash) = stored_hash {
        debug!("Genesis already written, skipping.");
        return Ok(hash)
    }

    debug!("Writing genesis block.");
    let write_tx = db.tx_mut()?;

    // Account state from the genesis allocation; a missing nonce uses the default value.
    for (address, alloc) in &genesis.alloc {
        let account = Account {
            nonce: alloc.nonce.unwrap_or_default(),
            balance: alloc.balance,
            bytecode_hash: None,
        };
        write_tx.put::<tables::PlainAccountState>(*address, account)?;
    }

    // Header and the block-indexed tables, all keyed at block number 0.
    let genesis_header: Header = genesis.into();
    let genesis_hash = genesis_header.hash_slow();
    write_tx.put::<tables::CanonicalHeaders>(0, genesis_hash)?;
    write_tx.put::<tables::HeaderNumbers>(genesis_hash, 0)?;
    write_tx.put::<tables::BlockBodies>((0, genesis_hash).into(), Default::default())?;
    write_tx.put::<tables::BlockTransitionIndex>((0, genesis_hash).into(), 0)?;
    write_tx.put::<tables::HeaderTD>((0, genesis_hash).into(), genesis_header.difficulty.into())?;
    write_tx.put::<tables::Headers>((0, genesis_hash).into(), genesis_header)?;
    write_tx.commit()?;

    Ok(genesis_hash)
}

View File

@ -10,6 +10,9 @@ use walkdir::{DirEntry, WalkDir};
/// Utilities for parsing chainspecs
pub mod chainspec;
/// Utilities for initializing parts of the chain
pub mod init;
/// Finds all files in a directory with a given postfix.
pub(crate) fn find_all_files_with_postfix(path: &Path, postfix: &str) -> Vec<PathBuf> {
WalkDir::new(path)