feat(cli): execution debug script (#2619)

This commit is contained in:
Roman Krasiuk
2023-05-26 05:02:46 +03:00
committed by GitHub
parent 46e95337db
commit 318107ed18
4 changed files with 300 additions and 2 deletions

View File

@ -2,7 +2,7 @@
use crate::{
chain, config, db,
dirs::{LogsDir, PlatformPath},
merkle_debug, node, p2p,
execution_debug, merkle_debug, node, p2p,
runner::CliRunner,
stage, test_vectors,
version::{LONG_VERSION, SHORT_VERSION},
@ -37,6 +37,9 @@ pub fn run() -> eyre::Result<()> {
Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()),
Commands::Config(command) => runner.run_until_ctrl_c(command.execute()),
Commands::MerkleDebug(command) => runner.run_until_ctrl_c(command.execute()),
Commands::ExecutionDebug(command) => {
runner.run_command_until_exit(|ctx| command.execute(ctx))
}
}
}
@ -70,6 +73,9 @@ pub enum Commands {
/// Debug state root calculation
#[command(name = "merkle-debug")]
MerkleDebug(merkle_debug::Command),
/// Debug execution.
#[command(name = "execution-debug")]
ExecutionDebug(execution_debug::Command),
}
#[derive(Debug, Parser)]

View File

@ -0,0 +1,291 @@
//! Command for debugging execution.
use crate::{
args::{get_secret_key, NetworkArgs},
dirs::{DataDirPath, MaybePlatformPath},
node::events,
runner::CliContext,
utils::get_single_header,
};
use clap::Parser;
use futures::{stream::select as stream_select, StreamExt};
use reth_beacon_consensus::BeaconConsensus;
use reth_db::{
database::Database,
mdbx::{Env, WriteMap},
};
use reth_discv4::DEFAULT_DISCOVERY_PORT;
use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_interfaces::{
consensus::Consensus,
p2p::{bodies::client::BodiesClient, headers::client::HeadersClient},
};
use reth_network::NetworkHandle;
use reth_network_api::NetworkInfo;
use reth_primitives::{BlockHashOrNumber, BlockNumber, ChainSpec, H256};
use reth_provider::{ShareableDatabase, Transaction};
use reth_staged_sync::{
utils::{
chainspec::genesis_value_parser,
init::{init_db, init_genesis},
},
Config,
};
use reth_stages::{
sets::DefaultStages,
stages::{
ExecutionStage, ExecutionStageThresholds, HeaderSyncMode, SenderRecoveryStage,
TotalDifficultyStage, FINISH,
},
Pipeline, StageSet,
};
use reth_tasks::TaskExecutor;
use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
path::PathBuf,
sync::Arc,
};
use tokio::sync::watch;
use tracing::*;
/// `reth execution-debug` command
#[derive(Debug, Parser)]
pub struct Command {
/// The path to the data dir for all reth files and subdirectories.
///
/// Defaults to the OS-specific data directory:
///
/// - Linux: `$XDG_DATA_HOME/reth/` or `$HOME/.local/share/reth/`
/// - Windows: `{FOLDERID_RoamingAppData}/reth/`
/// - macOS: `$HOME/Library/Application Support/reth/`
#[arg(long, value_name = "DATA_DIR", verbatim_doc_comment, default_value_t)]
datadir: MaybePlatformPath<DataDirPath>,
/// The chain this node is running.
///
/// Possible values are either a built-in chain or the path to a chain specification file.
///
/// Built-in chains:
/// - mainnet
/// - goerli
/// - sepolia
#[arg(
long,
value_name = "CHAIN_OR_PATH",
verbatim_doc_comment,
default_value = "mainnet",
value_parser = genesis_value_parser
)]
chain: Arc<ChainSpec>,
#[clap(flatten)]
network: NetworkArgs,
/// Set the chain tip manually for testing purposes.
///
/// NOTE: This is a temporary flag
#[arg(long = "debug.tip", help_heading = "Debug")]
pub tip: Option<H256>,
/// The maximum block height.
#[arg(long)]
pub to: u64,
/// The block interval for sync and unwind.
/// Defaults to `1000`.
#[arg(long)]
pub interval: Option<u64>,
}
impl Command {
fn interval(&self) -> u64 {
self.interval.unwrap_or(1000)
}
fn build_pipeline<DB, Client>(
&self,
config: &Config,
client: Client,
consensus: Arc<dyn Consensus>,
db: DB,
task_executor: &TaskExecutor,
) -> eyre::Result<Pipeline<DB>>
where
DB: Database + Unpin + Clone + 'static,
Client: HeadersClient + BodiesClient + Clone + 'static,
{
// building network downloaders using the fetch client
let header_downloader = ReverseHeadersDownloaderBuilder::from(config.stages.headers)
.build(client.clone(), Arc::clone(&consensus))
.into_task_with(task_executor);
let body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies)
.build(client, Arc::clone(&consensus), db.clone())
.into_task_with(task_executor);
let stage_conf = &config.stages;
let (tip_tx, tip_rx) = watch::channel(H256::zero());
let factory = reth_revm::Factory::new(self.chain.clone());
let header_mode = HeaderSyncMode::Tip(tip_rx);
let pipeline = Pipeline::builder()
.with_tip_sender(tip_tx)
.add_stages(
DefaultStages::new(
header_mode,
Arc::clone(&consensus),
header_downloader,
body_downloader,
factory.clone(),
)
.set(
TotalDifficultyStage::new(consensus)
.with_commit_threshold(stage_conf.total_difficulty.commit_threshold),
)
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
})
.set(ExecutionStage::new(
factory,
ExecutionStageThresholds {
max_blocks: None,
max_changes: None,
max_changesets: None,
},
)),
)
.build(db);
Ok(pipeline)
}
async fn build_network(
&self,
config: &Config,
task_executor: TaskExecutor,
db: Arc<Env<WriteMap>>,
network_secret_path: PathBuf,
default_peers_path: PathBuf,
) -> eyre::Result<NetworkHandle> {
let secret_key = get_secret_key(&network_secret_path)?;
let network = self
.network
.network_config(config, self.chain.clone(), secret_key, default_peers_path)
.with_task_executor(Box::new(task_executor))
.listener_addr(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::UNSPECIFIED,
self.network.port.unwrap_or(DEFAULT_DISCOVERY_PORT),
)))
.discovery_addr(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::UNSPECIFIED,
self.network.discovery.port.unwrap_or(DEFAULT_DISCOVERY_PORT),
)))
.build(ShareableDatabase::new(db, self.chain.clone()))
.start_network()
.await?;
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID");
Ok(network)
}
async fn fetch_block_hash<Client: HeadersClient>(
&self,
client: Client,
block: BlockNumber,
) -> eyre::Result<H256> {
info!(target: "reth::cli", ?block, "Fetching block from the network.");
loop {
match get_single_header(&client, BlockHashOrNumber::Number(block)).await {
Ok(tip_header) => {
info!(target: "reth::cli", ?block, "Successfully fetched block");
return Ok(tip_header.hash)
}
Err(error) => {
error!(target: "reth::cli", ?block, %error, "Failed to fetch the block. Retrying...");
}
}
}
}
/// Execute `execution-debug` command
pub async fn execute(self, ctx: CliContext) -> eyre::Result<()> {
let config = Config::default();
let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain);
let db_path = data_dir.db_path();
std::fs::create_dir_all(&db_path)?;
let db = Arc::new(init_db(db_path)?);
debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis");
init_genesis(db.clone(), self.chain.clone())?;
let consensus: Arc<dyn Consensus> = Arc::new(BeaconConsensus::new(Arc::clone(&self.chain)));
// Configure and build network
let network_secret_path =
self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret_path());
let network = self
.build_network(
&config,
ctx.task_executor.clone(),
db.clone(),
network_secret_path,
data_dir.known_peers_path(),
)
.await?;
// Configure the pipeline
let fetch_client = network.fetch_client().await?;
let mut pipeline = self.build_pipeline(
&config,
fetch_client.clone(),
Arc::clone(&consensus),
db.clone(),
&ctx.task_executor,
)?;
let pipeline_events = pipeline.events();
let events = stream_select(
network.event_listener().map(Into::into),
pipeline_events.map(Into::into),
);
ctx.task_executor
.spawn_critical("events task", events::handle_events(Some(network.clone()), events));
let latest_block_number =
FINISH.get_checkpoint(&db.tx()?)?.unwrap_or_default().block_number;
if latest_block_number >= self.to {
info!(target: "reth::cli", latest = latest_block_number, "Nothing to run");
return Ok(())
}
let interval = self.interval();
let mut current_max_block = latest_block_number;
while current_max_block < self.to {
let next_block = current_max_block + 1;
let target_block = self.to.min(current_max_block + interval);
let target_block_hash =
self.fetch_block_hash(fetch_client.clone(), target_block).await?;
// Run the pipeline
info!(target: "reth::cli", from = next_block, to = target_block, tip = ?target_block_hash, "Starting pipeline");
pipeline.set_tip(target_block_hash);
let result = pipeline.run_loop().await?;
trace!(target: "reth::cli", from = next_block, to = target_block, tip = ?target_block_hash, ?result, "Pipeline finished");
// Unwind the pipeline without committing.
{
let tx = Transaction::new(db.as_ref())?;
tx.take_block_and_execution_range(&self.chain, next_block..=target_block)?;
}
// Update latest block
current_max_block = target_block;
}
Ok(())
}
}

View File

@ -13,6 +13,7 @@ pub mod cli;
pub mod config;
pub mod db;
pub mod dirs;
pub mod execution_debug;
pub mod merkle_debug;
pub mod node;
pub mod p2p;

View File

@ -196,7 +196,7 @@ where
/// If any stage is unsuccessful at execution, we proceed to
/// unwind. This will undo the progress across the entire pipeline
/// up to the block that caused the error.
async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
let mut previous_stage = None;
for stage_index in 0..self.stages.len() {
let stage = &self.stages[stage_index];