feat: add reth init and reth import (#877)

Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>

Author: Bjerg
Date: 2023-02-09 21:08:10 +01:00
Committed by: GitHub
Parent: 0f2d345970
Commit: 440718288d

19 changed files with 515 additions and 116 deletions

Cargo.lock (generated)

@@ -4062,6 +4062,7 @@ dependencies = [
  "reth-db",
  "reth-discv4",
  "reth-downloaders",
+ "reth-eth-wire",
  "reth-executor",
  "reth-interfaces",
  "reth-net-nat",
@@ -4208,6 +4209,7 @@ dependencies = [
  "bytes",
  "futures",
  "futures-util",
+ "itertools 0.10.5",
  "metrics",
  "pin-project",
  "reth-db",


@@ -18,11 +18,13 @@ reth-interfaces = { path = "../../crates/interfaces", features = ["test-utils"]
 reth-transaction-pool = { path = "../../crates/transaction-pool", features = ["test-utils"] }
 reth-consensus = { path = "../../crates/consensus" }
 reth-executor = { path = "../../crates/executor" }
+reth-eth-wire = { path = "../../crates/net/eth-wire" }
 reth-rpc-builder = { path = "../../crates/rpc/rpc-builder" }
+# reth-rpc = {path = "../../crates/rpc/rpc"}
 reth-rlp = { path = "../../crates/rlp" }
 reth-network = {path = "../../crates/net/network", features = ["serde"] }
 reth-network-api = {path = "../../crates/net/network-api" }
-reth-downloaders = {path = "../../crates/net/downloaders" }
+reth-downloaders = {path = "../../crates/net/downloaders", features = ["test-utils"] }
 reth-tracing = { path = "../../crates/tracing" }
 reth-net-nat = { path = "../../crates/net/nat" }
 reth-discv4 = { path = "../../crates/net/discv4" }


@@ -0,0 +1,168 @@
use crate::{
    dirs::{ConfigPath, DbPath, PlatformPath},
    node::{handle_events, NodeEvent},
    utils::{chainspec::genesis_value_parser, init::init_db},
};
use clap::{crate_version, Parser};
use eyre::Context;
use futures::{Stream, StreamExt};
use reth_consensus::beacon::BeaconConsensus;
use reth_db::mdbx::{Env, WriteMap};
use reth_downloaders::{
    bodies::bodies::BodiesDownloaderBuilder,
    headers::reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::FileClient,
};
use reth_interfaces::{
    consensus::{Consensus, ForkchoiceState},
    sync::SyncStateUpdater,
};
use reth_primitives::ChainSpec;
use reth_staged_sync::{utils::init::init_genesis, Config};
use reth_stages::{
    prelude::*,
    stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage},
};
use std::sync::Arc;
use tracing::{debug, info};

/// Syncs RLP encoded blocks from a file.
#[derive(Debug, Parser)]
pub struct ImportCommand {
    /// The path to the configuration file to use.
    #[arg(long, value_name = "FILE", verbatim_doc_comment, default_value_t)]
    config: PlatformPath<ConfigPath>,

    /// The path to the database folder.
    ///
    /// Defaults to the OS-specific data directory:
    ///
    /// - Linux: `$XDG_DATA_HOME/reth/db` or `$HOME/.local/share/reth/db`
    /// - Windows: `{FOLDERID_RoamingAppData}/reth/db`
    /// - macOS: `$HOME/Library/Application Support/reth/db`
    #[arg(long, value_name = "PATH", verbatim_doc_comment, default_value_t)]
    db: PlatformPath<DbPath>,

    /// The chain this node is running.
    ///
    /// Possible values are either a built-in chain or the path to a chain specification file.
    ///
    /// Built-in chains:
    /// - mainnet
    /// - goerli
    /// - sepolia
    #[arg(
        long,
        value_name = "CHAIN_OR_PATH",
        verbatim_doc_comment,
        default_value = "mainnet",
        value_parser = genesis_value_parser
    )]
    chain: ChainSpec,

    /// The path to a block file for import.
    ///
    /// The online stages (headers and bodies) are replaced by a file import, after which the
    /// remaining stages are executed.
    #[arg(long, value_name = "IMPORT_PATH", verbatim_doc_comment)]
    path: PlatformPath<ConfigPath>,
}

impl ImportCommand {
    /// Execute `import` command
    pub async fn execute(self) -> eyre::Result<()> {
        info!(target: "reth::cli", "reth {} starting", crate_version!());

        let config: Config = self.load_config()?;
        info!(target: "reth::cli", path = %self.db, "Configuration loaded");

        info!(target: "reth::cli", path = %self.db, "Opening database");
        let db = Arc::new(init_db(&self.db)?);
        info!(target: "reth::cli", "Database opened");

        debug!(target: "reth::cli", chainspec=?self.chain, "Initializing genesis");
        init_genesis(db.clone(), self.chain.clone())?;

        // create a new FileClient
        info!(target: "reth::cli", "Importing chain file");
        let file_client = Arc::new(FileClient::new(&self.path).await?);

        // override the tip
        let tip = file_client.tip().expect("file client has no tip");
        info!(target: "reth::cli", "Chain file imported");

        let (consensus, notifier) = BeaconConsensus::builder().build(self.chain.clone());
        debug!(target: "reth::cli", %tip, "Tip manually set");
        notifier.send(ForkchoiceState {
            head_block_hash: tip,
            safe_block_hash: tip,
            finalized_block_hash: tip,
        })?;
        info!(target: "reth::cli", "Consensus engine initialized");

        let (mut pipeline, events) =
            self.build_import_pipeline(config, db.clone(), &consensus, file_client).await?;
        tokio::spawn(handle_events(events));

        // Run pipeline
        info!(target: "reth::cli", "Starting sync pipeline");
        tokio::select! {
            res = pipeline.run(db.clone()) => res?,
            _ = tokio::signal::ctrl_c() => {},
        };

        info!(target: "reth::cli", "Finishing up");
        Ok(())
    }

    async fn build_import_pipeline<C>(
        &self,
        config: Config,
        db: Arc<Env<WriteMap>>,
        consensus: &Arc<C>,
        file_client: Arc<FileClient>,
    ) -> eyre::Result<(Pipeline<Env<WriteMap>, impl SyncStateUpdater>, impl Stream<Item = NodeEvent>)>
    where
        C: Consensus + 'static,
    {
        let header_downloader = ReverseHeadersDownloaderBuilder::from(config.stages.headers)
            .build(consensus.clone(), file_client.clone())
            .as_task();

        let body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies)
            .build(file_client.clone(), consensus.clone(), db)
            .as_task();

        let mut pipeline = Pipeline::builder()
            .with_sync_state_updater(file_client)
            .add_stages(
                OnlineStages::new(consensus.clone(), header_downloader, body_downloader).set(
                    TotalDifficultyStage {
                        chain_spec: self.chain.clone(),
                        commit_threshold: config.stages.total_difficulty.commit_threshold,
                    },
                ),
            )
            .add_stages(
                OfflineStages::default()
                    .set(SenderRecoveryStage {
                        batch_size: config.stages.sender_recovery.batch_size,
                        commit_threshold: config.stages.sender_recovery.commit_threshold,
                    })
                    .set(ExecutionStage {
                        chain_spec: self.chain.clone(),
                        commit_threshold: config.stages.execution.commit_threshold,
                    }),
            )
            .with_max_block(0)
            .build();

        let events = pipeline.events().map(Into::into);

        Ok((pipeline, events))
    }

    fn load_config(&self) -> eyre::Result<Config> {
        confy::load_path::<Config>(&self.config).wrap_err("Could not load config")
    }
}
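For context, a minimal sketch of how the new subcommand can be driven programmatically, using clap's `Parser::parse_from`; the `./chain.rlp` path is hypothetical, and the real binary parses these flags from the process arguments instead:

    // Sketch only: parse and run `reth import` in-process.
    use clap::Parser;

    #[tokio::main]
    async fn main() -> eyre::Result<()> {
        // `--path` points at a hypothetical RLP block export; `--chain` picks a built-in spec.
        let cmd = ImportCommand::parse_from(["reth", "--chain", "goerli", "--path", "./chain.rlp"]);
        cmd.execute().await
    }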


@@ -0,0 +1,65 @@
use crate::{
    dirs::{DbPath, PlatformPath},
    utils::chainspec::genesis_value_parser,
};
use clap::Parser;
use reth_primitives::ChainSpec;
use reth_staged_sync::utils::init::{init_db, init_genesis};
use std::sync::Arc;
use tracing::info;

/// Initializes the database with the genesis block.
#[derive(Debug, Parser)]
pub struct InitCommand {
    /// The path to the database folder.
    ///
    /// Defaults to the OS-specific data directory:
    ///
    /// - Linux: `$XDG_DATA_HOME/reth/db` or `$HOME/.local/share/reth/db`
    /// - Windows: `{FOLDERID_RoamingAppData}/reth/db`
    /// - macOS: `$HOME/Library/Application Support/reth/db`
    #[arg(long, value_name = "PATH", verbatim_doc_comment, default_value_t)]
    db: PlatformPath<DbPath>,

    /// The chain this node is running.
    ///
    /// Possible values are either a built-in chain or the path to a chain specification file.
    ///
    /// Built-in chains:
    /// - mainnet
    /// - goerli
    /// - sepolia
    #[arg(
        long,
        value_name = "CHAIN_OR_PATH",
        verbatim_doc_comment,
        default_value = "mainnet",
        value_parser = genesis_value_parser
    )]
    chain: ChainSpec,
}

impl InitCommand {
    /// Execute the `init` command
    pub async fn execute(&self) -> eyre::Result<()> {
        info!(target: "reth::cli", "reth init starting");

        info!(target: "reth::cli", path = %self.db, "Opening database");
        let db = Arc::new(init_db(&self.db)?);
        info!(target: "reth::cli", "Database opened");

        info!(target: "reth::cli", "Writing genesis block");
        let genesis_hash = init_genesis(db, self.chain.clone())?;

        if genesis_hash != self.chain.genesis_hash() {
            // TODO: better error text
            return Err(eyre::eyre!(
                "Genesis hash mismatch: expected {}, got {}",
                self.chain.genesis_hash(),
                genesis_hash
            ))
        }

        Ok(())
    }
}
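A rough equivalent of what `reth init --chain goerli` boils down to, assuming `init_db` accepts any path-like argument (the command itself passes a `PlatformPath`):

    // Sketch only: open the database, write the genesis block, and verify the hash.
    use reth_primitives::GOERLI;
    use reth_staged_sync::utils::init::{init_db, init_genesis};
    use std::sync::Arc;

    fn init_goerli(datadir: &str) -> eyre::Result<()> {
        let db = Arc::new(init_db(datadir)?);          // assumes a path-like argument
        let hash = init_genesis(db, GOERLI.clone())?;  // returns the genesis hash it wrote
        assert_eq!(hash, GOERLI.genesis_hash());       // the command errors on a mismatch
        Ok(())
    }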


@@ -0,0 +1,7 @@
//! Command line utilities for initializing a chain.

mod import;
mod init;

pub use import::ImportCommand;
pub use init::InitCommand;


@@ -1,6 +1,6 @@
 //! CLI definition and entrypoint to executable
 use crate::{
-    db,
+    chain, db,
     dirs::{LogsDir, PlatformPath},
     node, p2p, stage, test_eth_chain, test_vectors,
 };
@@ -21,11 +21,13 @@ pub async fn run() -> eyre::Result<()> {
     match opt.command {
         Commands::Node(command) => command.execute().await,
-        Commands::TestEthChain(command) => command.execute().await,
+        Commands::Init(command) => command.execute().await,
+        Commands::Import(command) => command.execute().await,
         Commands::Db(command) => command.execute().await,
         Commands::Stage(command) => command.execute().await,
         Commands::P2P(command) => command.execute().await,
         Commands::TestVectors(command) => command.execute().await,
+        Commands::TestEthChain(command) => command.execute().await,
     }
 }
@@ -35,6 +37,12 @@ pub enum Commands {
     /// Start the node
     #[command(name = "node")]
     Node(node::Command),
+    /// Initialize the database from a genesis file.
+    #[command(name = "init")]
+    Init(chain::InitCommand),
+    /// This syncs RLP encoded blocks from a file.
+    #[command(name = "import")]
+    Import(chain::ImportCommand),
     /// Database debugging utilities
     #[command(name = "db")]
     Db(db::Command),


@@ -6,6 +6,7 @@
 ))]
 //! Rust Ethereum (reth) binary executable.
+pub mod chain;
 pub mod cli;
 pub mod db;
 pub mod dirs;


@@ -4,7 +4,7 @@
 use crate::{
     dirs::{ConfigPath, DbPath, PlatformPath},
     prometheus_exporter,
-    utils::{chainspec::chain_spec_value_parser, init::init_db, parse_socket_address},
+    utils::{chainspec::genesis_value_parser, init::init_db, parse_socket_address},
     NetworkOpts, RpcServerOpts,
 };
 use clap::{crate_version, Parser};
@@ -13,10 +13,17 @@ use fdlimit::raise_fd_limit;
 use futures::{stream::select as stream_select, Stream, StreamExt};
 use reth_consensus::beacon::BeaconConsensus;
 use reth_db::mdbx::{Env, WriteMap};
-use reth_downloaders::{bodies, headers};
-use reth_interfaces::consensus::{Consensus, ForkchoiceState};
+use reth_downloaders::{
+    bodies::bodies::BodiesDownloaderBuilder,
+    headers::reverse_headers::ReverseHeadersDownloaderBuilder,
+};
+use reth_interfaces::{
+    consensus::{Consensus, ForkchoiceState},
+    p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader},
+    sync::SyncStateUpdater,
+};
 use reth_net_nat::NatResolver;
-use reth_network::{FetchClient, NetworkConfig, NetworkEvent, NetworkHandle};
+use reth_network::{NetworkConfig, NetworkEvent, NetworkHandle};
 use reth_network_api::NetworkInfo;
 use reth_primitives::{BlockNumber, ChainSpec, H256};
 use reth_provider::ShareableDatabase;
@@ -59,7 +66,7 @@ pub struct Command {
         value_name = "CHAIN_OR_PATH",
         verbatim_doc_comment,
         default_value = "mainnet",
-        value_parser = chain_spec_value_parser
+        value_parser = genesis_value_parser
     )]
     chain: ChainSpec,
@@ -102,19 +109,20 @@ impl Command {
         let mut config: Config = self.load_config()?;
         info!(target: "reth::cli", path = %self.db, "Configuration loaded");
+        self.init_trusted_nodes(&mut config);

         info!(target: "reth::cli", path = %self.db, "Opening database");
         let db = Arc::new(init_db(&self.db)?);
         info!(target: "reth::cli", "Database opened");

         self.start_metrics_endpoint()?;

+        debug!(target: "reth::cli", chainspec=?self.chain, "Initializing genesis");
         init_genesis(db.clone(), self.chain.clone())?;

         let consensus = self.init_consensus()?;
         info!(target: "reth::cli", "Consensus engine initialized");
-        self.init_trusted_nodes(&mut config);

         info!(target: "reth::cli", "Connecting to P2P network");
         let netconf = self.load_network_config(&config, &db);
         let network = netconf.start_network().await?;
@@ -133,12 +141,11 @@ impl Command {
             .await?;
         info!(target: "reth::cli", "Started RPC server");

-        let mut pipeline = self.build_pipeline(&config, &network, &consensus, &db).await?;
-
-        tokio::spawn(handle_events(stream_select(
-            network.event_listener().map(Into::into),
-            pipeline.events().map(Into::into),
-        )));
+        let (mut pipeline, events) = self
+            .build_networked_pipeline(&mut config, network.clone(), &consensus, db.clone())
+            .await?;
+        tokio::spawn(handle_events(events));

         // Run pipeline
         info!(target: "reth::cli", "Starting sync pipeline");
@@ -154,6 +161,36 @@ impl Command {
         Ok(())
     }

+    async fn build_networked_pipeline(
+        &self,
+        config: &mut Config,
+        network: NetworkHandle,
+        consensus: &Arc<dyn Consensus>,
+        db: Arc<Env<WriteMap>>,
+    ) -> eyre::Result<(Pipeline<Env<WriteMap>, impl SyncStateUpdater>, impl Stream<Item = NodeEvent>)>
+    {
+        // building network downloaders using the fetch client
+        let fetch_client = Arc::new(network.fetch_client().await?);
+
+        let header_downloader = ReverseHeadersDownloaderBuilder::from(config.stages.headers)
+            .build(consensus.clone(), fetch_client.clone())
+            .as_task();
+
+        let body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies)
+            .build(fetch_client.clone(), consensus.clone(), db.clone())
+            .as_task();
+
+        let mut pipeline = self
+            .build_pipeline(config, header_downloader, body_downloader, network.clone(), consensus)
+            .await?;
+
+        let events = stream_select(
+            network.event_listener().map(Into::into),
+            pipeline.events().map(Into::into),
+        );
+
+        Ok((pipeline, events))
+    }
+
     fn load_config(&self) -> eyre::Result<Config> {
         confy::load_path::<Config>(&self.config).wrap_err("Could not load config")
     }
@@ -214,27 +251,30 @@ impl Command {
         )
     }

-    async fn build_pipeline(
+    async fn build_pipeline<H, B, U>(
         &self,
         config: &Config,
-        network: &NetworkHandle,
+        header_downloader: H,
+        body_downloader: B,
+        updater: U,
         consensus: &Arc<dyn Consensus>,
-        db: &Arc<Env<WriteMap>>,
-    ) -> eyre::Result<Pipeline<Env<WriteMap>, NetworkHandle>> {
-        let fetch_client = Arc::new(network.fetch_client().await?);
-
-        let header_downloader = self.spawn_headers_downloader(config, consensus, &fetch_client);
-        let body_downloader = self.spawn_bodies_downloader(config, consensus, &fetch_client, db);
+    ) -> eyre::Result<Pipeline<Env<WriteMap>, U>>
+    where
+        H: HeaderDownloader + 'static,
+        B: BodyDownloader + 'static,
+        U: SyncStateUpdater,
+    {
         let stage_conf = &config.stages;

         let mut builder = Pipeline::builder();

         if let Some(max_block) = self.max_block {
+            debug!(target: "reth::cli", max_block, "Configuring builder to use max block");
             builder = builder.with_max_block(max_block)
         }

         let pipeline = builder
-            .with_sync_state_updater(network.clone())
+            .with_sync_state_updater(updater)
             .add_stages(
                 OnlineStages::new(consensus.clone(), header_downloader, body_downloader).set(
                     TotalDifficultyStage {
@@ -258,42 +298,6 @@ impl Command {
         Ok(pipeline)
     }

-    fn spawn_headers_downloader(
-        &self,
-        config: &Config,
-        consensus: &Arc<dyn Consensus>,
-        fetch_client: &Arc<FetchClient>,
-    ) -> reth_downloaders::headers::task::TaskDownloader {
-        let headers_conf = &config.stages.headers;
-        headers::task::TaskDownloader::spawn(
-            headers::reverse_headers::ReverseHeadersDownloaderBuilder::default()
-                .request_limit(headers_conf.downloader_batch_size)
-                .stream_batch_size(headers_conf.commit_threshold as usize)
-                .build(consensus.clone(), fetch_client.clone()),
-        )
-    }
-
-    fn spawn_bodies_downloader(
-        &self,
-        config: &Config,
-        consensus: &Arc<dyn Consensus>,
-        fetch_client: &Arc<FetchClient>,
-        db: &Arc<Env<WriteMap>>,
-    ) -> reth_downloaders::bodies::task::TaskDownloader {
-        let bodies_conf = &config.stages.bodies;
-        bodies::task::TaskDownloader::spawn(
-            bodies::bodies::BodiesDownloaderBuilder::default()
-                .with_stream_batch_size(bodies_conf.downloader_stream_batch_size)
-                .with_request_limit(bodies_conf.downloader_request_limit)
-                .with_max_buffered_responses(bodies_conf.downloader_max_buffered_responses)
-                .with_concurrent_requests_range(
-                    bodies_conf.downloader_min_concurrent_requests..=
-                        bodies_conf.downloader_max_concurrent_requests,
-                )
-                .build(fetch_client.clone(), consensus.clone(), db.clone()),
-        )
-    }
 }

 /// Dumps peers to `file_path` for persistence.
@@ -316,8 +320,50 @@ struct NodeState {
     current_checkpoint: BlockNumber,
 }

+impl NodeState {
+    async fn handle_pipeline_event(&mut self, event: PipelineEvent) {
+        match event {
+            PipelineEvent::Running { stage_id, stage_progress } => {
+                let notable = self.current_stage.is_none();
+                self.current_stage = Some(stage_id);
+                self.current_checkpoint = stage_progress.unwrap_or_default();
+                if notable {
+                    info!(target: "reth::cli", stage = %stage_id, from = stage_progress, "Executing stage");
+                }
+            }
+            PipelineEvent::Ran { stage_id, result } => {
+                let notable = result.stage_progress > self.current_checkpoint;
+                self.current_checkpoint = result.stage_progress;
+                if result.done {
+                    self.current_stage = None;
+                    info!(target: "reth::cli", stage = %stage_id, checkpoint = result.stage_progress, "Stage finished executing");
+                } else if notable {
+                    info!(target: "reth::cli", stage = %stage_id, checkpoint = result.stage_progress, "Stage committed progress");
+                }
+            }
+            _ => (),
+        }
+    }
+
+    async fn handle_network_event(&mut self, event: NetworkEvent) {
+        match event {
+            NetworkEvent::SessionEstablished { peer_id, status, .. } => {
+                self.connected_peers += 1;
+                info!(target: "reth::cli", connected_peers = self.connected_peers, peer_id = %peer_id, best_block = %status.blockhash, "Peer connected");
+            }
+            NetworkEvent::SessionClosed { peer_id, reason } => {
+                self.connected_peers -= 1;
+                let reason = reason.map(|s| s.to_string()).unwrap_or_else(|| "None".to_string());
+                warn!(target: "reth::cli", connected_peers = self.connected_peers, peer_id = %peer_id, %reason, "Peer disconnected.");
+            }
+            _ => (),
+        }
+    }
+}
+
 /// A node event.
-enum NodeEvent {
+pub enum NodeEvent {
     /// A network event.
     Network(NetworkEvent),
     /// A sync pipeline event.
@@ -338,7 +384,7 @@ impl From<PipelineEvent> for NodeEvent {

 /// Displays relevant information to the user from components of the node, and periodically
 /// displays the high-level status of the node.
-async fn handle_events(mut events: impl Stream<Item = NodeEvent> + Unpin) {
+pub async fn handle_events(mut events: impl Stream<Item = NodeEvent> + Unpin) {
     let mut state = NodeState::default();

     let mut interval = tokio::time::interval(Duration::from_secs(30));
@@ -347,35 +393,12 @@ async fn handle_events(mut events: impl Stream<Item = NodeEvent> + Unpin) {
         tokio::select! {
             Some(event) = events.next() => {
                 match event {
-                    NodeEvent::Network(NetworkEvent::SessionEstablished { peer_id, status, .. }) => {
-                        state.connected_peers += 1;
-                        info!(target: "reth::cli", connected_peers = state.connected_peers, peer_id = %peer_id, best_block = %status.blockhash, "Peer connected");
+                    NodeEvent::Network(event) => {
+                        state.handle_network_event(event).await;
                     },
-                    NodeEvent::Network(NetworkEvent::SessionClosed { peer_id, reason }) => {
-                        state.connected_peers -= 1;
-                        let reason = reason.map(|s| s.to_string()).unwrap_or_else(|| "None".to_string());
-                        warn!(target: "reth::cli", connected_peers = state.connected_peers, peer_id = %peer_id, %reason, "Peer disconnected.");
-                    },
-                    NodeEvent::Pipeline(PipelineEvent::Running { stage_id, stage_progress }) => {
-                        let notable = state.current_stage.is_none();
-                        state.current_stage = Some(stage_id);
-                        state.current_checkpoint = stage_progress.unwrap_or_default();
-                        if notable {
-                            info!(target: "reth::cli", stage = %stage_id, from = stage_progress, "Executing stage");
-                        }
-                    },
-                    NodeEvent::Pipeline(PipelineEvent::Ran { stage_id, result }) => {
-                        let notable = result.stage_progress > state.current_checkpoint;
-                        state.current_checkpoint = result.stage_progress;
-                        if result.done {
-                            state.current_stage = None;
-                            info!(target: "reth::cli", stage = %stage_id, checkpoint = result.stage_progress, "Stage finished executing");
-                        } else if notable {
-                            info!(target: "reth::cli", stage = %stage_id, checkpoint = result.stage_progress, "Stage committed progress");
-                        }
+                    NodeEvent::Pipeline(event) => {
+                        state.handle_pipeline_event(event).await;
                     }
-                    _ => (),
                 }
             },
             _ = interval.tick() => {


@@ -67,6 +67,7 @@ where
             ..Default::default()
         },
     );
+
     self.evm.env.cfg.chain_id = U256::from(self.chain_spec.chain().id());
     self.evm.env.cfg.spec_id = spec_id;
     self.evm.env.cfg.perf_all_precompiles_have_balance = false;


@@ -32,6 +32,7 @@ reth-rlp = { path = "../../rlp", optional = true }
 tokio-util = { version = "0.7", features = ["codec"], optional = true }
 bytes = { version = "1", optional = true }
 tempfile = { version = "3.3", optional = true }
+itertools = { version = "0.10", optional = true }

 [dev-dependencies]
 reth-db = { path = "../../storage/db", features = ["test-utils"] }
@@ -42,10 +43,11 @@ assert_matches = "1.5.0"
 tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
 tokio-util = { version = "0.7", features = ["codec"] }
 reth-rlp = { path = "../../rlp" }
+itertools = "0.10"
 bytes = "1"
 thiserror = "1"
 tempfile = "3.3"

 [features]
-test-utils = ["dep:reth-rlp", "dep:thiserror", "dep:tokio-util", "dep:tempfile", "dep:bytes"]
+test-utils = ["dep:reth-rlp", "dep:thiserror", "dep:tokio-util", "dep:tempfile", "dep:bytes", "dep:itertools"]


@@ -1,6 +1,5 @@
-use crate::metrics::DownloaderMetrics;
-
 use super::queue::BodiesRequestQueue;
+use crate::{bodies::task::TaskDownloader, metrics::DownloaderMetrics};
 use futures::Stream;
 use futures_util::StreamExt;
 use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx};
@@ -186,7 +185,7 @@ where
     fn is_terminated(&self) -> bool {
         // There is nothing to request if the range is empty
        let nothing_to_request = self.download_range.is_empty() ||
             // or all blocks have already been requested.
             self.in_progress_queue
                 .last_requested_block_number
                 .map(|last| last + 1 == self.download_range.end)
@@ -241,6 +240,19 @@ where
     }
 }

+impl<B, DB> BodiesDownloader<B, DB>
+where
+    B: BodiesClient + 'static,
+    DB: Database,
+    Self: BodyDownloader + 'static,
+{
+    /// Convert the downloader into a [`TaskDownloader`](super::task::TaskDownloader) by spawning
+    /// it.
+    pub fn as_task(self) -> TaskDownloader {
+        TaskDownloader::spawn(self)
+    }
+}
+
 impl<B, DB> BodyDownloader for BodiesDownloader<B, DB>
 where
     B: BodiesClient + 'static,
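The new `as_task` helper replaces the manual `TaskDownloader::spawn` call sites that this commit deletes from `node.rs`; a sketch, with `client`, `consensus`, and `db` assumed to be in scope:

    // Sketch only: build a bodies downloader and move it onto its own task.
    let body_downloader = BodiesDownloaderBuilder::default()
        .build(client.clone(), consensus.clone(), db.clone())
        .as_task(); // spawns the downloader; the returned TaskDownloader is a polling handle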


@@ -1,5 +1,6 @@
 //! A headers downloader that can handle multiple requests concurrently.
+use super::task::TaskDownloader;
 use crate::metrics::DownloaderMetrics;
 use futures::{stream::Stream, FutureExt};
 use futures_util::{stream::FuturesUnordered, StreamExt};
@@ -357,6 +358,8 @@ where
         // validate the response
         let highest = &headers[0];
+        trace!(target: "downloaders::headers", requested_block_number, highest=?highest.number, "Validating non-empty headers response");
+
         if highest.number != requested_block_number {
             return Err(HeadersResponseError {
                 request,
@@ -462,6 +465,7 @@ where
     /// Starts a request future
     fn submit_request(&mut self, request: HeadersRequest, priority: Priority) {
+        trace!(target: "downloaders::headers", ?request, "Submitting headers request");
         self.in_progress_queue.push(self.request_fut(request, priority));
     }
@@ -499,6 +503,18 @@ where
     }
 }

+impl<H> ReverseHeadersDownloader<H>
+where
+    H: HeadersClient,
+    Self: HeaderDownloader + 'static,
+{
+    /// Convert the downloader into a [`TaskDownloader`](super::task::TaskDownloader) by spawning
+    /// it.
+    pub fn as_task(self) -> TaskDownloader {
+        TaskDownloader::spawn(self)
+    }
+}
+
 impl<H> HeaderDownloader for ReverseHeadersDownloader<H>
 where
     H: HeadersClient + 'static,


@@ -1,4 +1,5 @@
 use super::file_codec::BlockFileCodec;
+use itertools::Either;
 use reth_eth_wire::{BlockBody, RawBlockBody};
 use reth_interfaces::{
     p2p::{
@@ -30,7 +31,7 @@ use tokio::{
 };
 use tokio_stream::StreamExt;
 use tokio_util::codec::FramedRead;
-use tracing::warn;
+use tracing::{trace, warn};

 /// Front-end API for fetching chain data from a file.
 ///
@@ -94,26 +95,29 @@ impl FileClient {
         // use with_capacity to make sure the internal buffer contains the entire file
         let mut stream = FramedRead::with_capacity(&reader[..], BlockFileCodec, file_len as usize);

-        let mut block_num = 0;
         while let Some(block_res) = stream.next().await {
             let block = block_res?;
             let block_hash = block.header.hash_slow();

             // add to the internal maps
-            headers.insert(block_num, block.header.clone());
-            hash_to_number.insert(block_hash, block_num);
+            headers.insert(block.header.number, block.header.clone());
+            hash_to_number.insert(block_hash, block.header.number);
             bodies.insert(
                 block_hash,
                 BlockBody { transactions: block.transactions, ommers: block.ommers },
             );
-
-            // update block num
-            block_num += 1;
         }

+        trace!(blocks = headers.len(), "Initialized file client");
+
         Ok(Self { headers, hash_to_number, bodies, is_syncing: Arc::new(Default::default()) })
     }

+    /// Get the tip hash of the chain.
+    pub fn tip(&self) -> Option<H256> {
+        self.headers.get(&(self.headers.len() as u64 - 1)).map(|h| h.hash_slow())
+    }
+
     /// Use the provided bodies as the file client's block body buffer.
     pub(crate) fn with_bodies(mut self, bodies: HashMap<BlockHash, BlockBody>) -> Self {
         self.bodies = bodies;
@@ -140,24 +144,39 @@ impl HeadersClient for FileClient {
     ) -> Self::Output {
         // this just searches the buffer, and fails if it can't find the header
         let mut headers = Vec::new();
+        trace!(target : "downloaders::file", request=?request, "Getting headers");

         let start_num = match request.start {
             BlockHashOrNumber::Hash(hash) => match self.hash_to_number.get(&hash) {
                 Some(num) => *num,
-                None => return Box::pin(async move { Err(RequestError::BadResponse) }),
+                None => {
+                    warn!(%hash, "Could not find starting block number for requested header hash");
+                    return Box::pin(async move { Err(RequestError::BadResponse) })
+                }
             },
             BlockHashOrNumber::Number(num) => num,
         };

-        let range = match request.direction {
-            HeadersDirection::Rising => start_num..=start_num + 1 - request.limit,
-            HeadersDirection::Falling => start_num + 1 - request.limit..=start_num,
+        let range = if request.limit == 1 {
+            Either::Left(start_num..start_num + 1)
+        } else {
+            match request.direction {
+                HeadersDirection::Rising => Either::Left(start_num..start_num + request.limit),
+                HeadersDirection::Falling => {
+                    Either::Right((start_num - request.limit + 1..=start_num).rev())
+                }
+            }
         };

+        trace!(target : "downloaders::file", range=?range, "Getting headers with range");
+
         for block_number in range {
             match self.headers.get(&block_number).cloned() {
                 Some(header) => headers.push(header),
-                None => return Box::pin(async move { Err(RequestError::BadResponse) }),
+                None => {
+                    warn!(number=%block_number, "Could not find header");
+                    return Box::pin(async move { Err(RequestError::BadResponse) })
+                }
             }
         }
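The `Either` above exists because the rising range and the reversed falling range are different iterator types that must unify into one `range` binding; a worked example of the falling branch, assuming `start_num = 10` and `limit = 3`:

    // Sketch only: falling requests yield the highest block first.
    let (start_num, limit) = (10u64, 3u64);
    let falling: Vec<u64> = (start_num - limit + 1..=start_num).rev().collect();
    assert_eq!(falling, vec![10, 9, 8]); // 10-3+1 = 8, reversed to start at the requested block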


@@ -7,7 +7,9 @@ use std::{fmt, str::FromStr};
 // The chain spec module.
 mod spec;
-pub use spec::{ChainSpec, ChainSpecBuilder, ForkCondition, GOERLI, MAINNET, SEPOLIA};
+pub use spec::{
+    AllGenesisFormats, ChainSpec, ChainSpecBuilder, ForkCondition, GOERLI, MAINNET, SEPOLIA,
+};

 // The chain info module.
 mod info;


@@ -260,6 +260,37 @@ impl From<EthersGenesis> for ChainSpec {
     }
 }

+/// A helper type for compatibility with geth's config
+#[derive(Debug, Clone, Deserialize)]
+#[serde(untagged)]
+pub enum AllGenesisFormats {
+    /// The geth genesis format
+    Geth(EthersGenesis),
+    /// The reth genesis format
+    Reth(ChainSpec),
+}
+
+impl From<EthersGenesis> for AllGenesisFormats {
+    fn from(genesis: EthersGenesis) -> Self {
+        Self::Geth(genesis)
+    }
+}
+
+impl From<ChainSpec> for AllGenesisFormats {
+    fn from(genesis: ChainSpec) -> Self {
+        Self::Reth(genesis)
+    }
+}
+
+impl From<AllGenesisFormats> for ChainSpec {
+    fn from(genesis: AllGenesisFormats) -> Self {
+        match genesis {
+            AllGenesisFormats::Geth(genesis) => genesis.into(),
+            AllGenesisFormats::Reth(genesis) => genesis,
+        }
+    }
+}
+
 /// A helper to build custom chain specs
 #[derive(Debug, Default)]
 pub struct ChainSpecBuilder {
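Because the enum is `#[serde(untagged)]`, serde tries each variant in declaration order, so a single entry point accepts either genesis format; a sketch, assuming a geth-style `genesis.json` on disk:

    // Sketch only: one JSON input, two accepted formats.
    let raw = std::fs::read_to_string("genesis.json")?;       // hypothetical file
    let any: AllGenesisFormats = serde_json::from_str(&raw)?; // Geth first, then Reth
    let spec: ChainSpec = any.into();                         // Geth(..) converts, Reth(..) passes through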


@@ -38,7 +38,8 @@ pub use bits::H512;
 pub use block::{Block, BlockHashOrNumber, SealedBlock};
 pub use bloom::Bloom;
 pub use chain::{
-    Chain, ChainInfo, ChainSpec, ChainSpecBuilder, ForkCondition, GOERLI, MAINNET, SEPOLIA,
+    AllGenesisFormats, Chain, ChainInfo, ChainSpec, ChainSpecBuilder, ForkCondition, GOERLI,
+    MAINNET, SEPOLIA,
 };
 pub use constants::{
     EMPTY_OMMER_ROOT, GOERLI_GENESIS, KECCAK_EMPTY, MAINNET_GENESIS, SEPOLIA_GENESIS,


@@ -19,6 +19,7 @@ reth-db = {path = "../../crates/storage/db", features = ["mdbx", "test-utils"] }
 reth-discv4 = { path = "../../crates/net/discv4" }
 reth-network-api = { path = "../../crates/net/network-api" }
 reth-network = { path = "../../crates/net/network", features = ["serde"] }
+reth-downloaders = { path = "../../crates/net/downloaders" }
 reth-primitives = { path = "../../crates/primitives" }
 reth-provider = { path = "../../crates/storage/provider", features = ["test-utils"] }
 reth-net-nat = { path = "../../crates/net/nat" }


@@ -3,6 +3,10 @@ use std::{path::PathBuf, sync::Arc};

 use reth_db::database::Database;
 use reth_discv4::Discv4Config;
+use reth_downloaders::{
+    bodies::bodies::BodiesDownloaderBuilder,
+    headers::reverse_headers::ReverseHeadersDownloaderBuilder,
+};
 use reth_network::{
     config::{mainnet_nodes, rng_secret_key},
     NetworkConfig, NetworkConfigBuilder, PeersConfig,
@@ -66,24 +70,30 @@ pub struct StageConfig {
 }

 /// Header stage configuration.
-#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
+#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
 pub struct HeadersConfig {
     /// The maximum number of headers to download before committing progress to the database.
     pub commit_threshold: u64,
     /// The maximum number of headers to request from a peer at a time.
     pub downloader_batch_size: u64,
-    /// The number of times to retry downloading a set of headers.
-    pub downloader_retries: usize,
 }

 impl Default for HeadersConfig {
     fn default() -> Self {
-        Self { commit_threshold: 10_000, downloader_batch_size: 1000, downloader_retries: 5 }
+        Self { commit_threshold: 10_000, downloader_batch_size: 1000 }
     }
 }

+impl From<HeadersConfig> for ReverseHeadersDownloaderBuilder {
+    fn from(config: HeadersConfig) -> Self {
+        ReverseHeadersDownloaderBuilder::default()
+            .request_limit(config.downloader_batch_size)
+            .stream_batch_size(config.commit_threshold as usize)
+    }
+}
+
 /// Total difficulty stage configuration
-#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
+#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
 pub struct TotalDifficultyConfig {
     /// The maximum number of total difficulty entries to sum up before committing progress to the
     /// database.
@@ -97,7 +107,7 @@ impl Default for TotalDifficultyConfig {
 }

 /// Body stage configuration.
-#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
+#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
 pub struct BodiesConfig {
     /// The batch size of non-empty blocks per one request
     pub downloader_request_limit: u64,
@@ -124,8 +134,21 @@ impl Default for BodiesConfig {
     }
 }

+impl From<BodiesConfig> for BodiesDownloaderBuilder {
+    fn from(config: BodiesConfig) -> Self {
+        BodiesDownloaderBuilder::default()
+            .with_stream_batch_size(config.downloader_stream_batch_size)
+            .with_request_limit(config.downloader_request_limit)
+            .with_max_buffered_responses(config.downloader_max_buffered_responses)
+            .with_concurrent_requests_range(
+                config.downloader_min_concurrent_requests..=
+                    config.downloader_max_concurrent_requests,
+            )
+    }
+}
+
 /// Sender recovery stage configuration.
-#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)]
+#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Serialize)]
 pub struct SenderRecoveryConfig {
     /// The maximum number of blocks to process before committing progress to the database.
     pub commit_threshold: u64,
@@ -140,7 +163,7 @@ impl Default for SenderRecoveryConfig {
 }

 /// Execution stage configuration.
-#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
+#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
 pub struct ExecutionConfig {
     /// The maximum number of blocks to execution before committing progress to the database.
     pub commit_threshold: u64,
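With the two `From` impls above, both the `node` and `import` commands construct their downloaders directly from the stage config instead of hand-wiring builder calls; a sketch, with `config`, `client`, `consensus`, and `db` assumed in scope:

    // Sketch only: config-driven downloader construction, as used in node.rs and import.rs.
    let header_downloader = ReverseHeadersDownloaderBuilder::from(config.stages.headers)
        .build(consensus.clone(), client.clone())
        .as_task();
    let body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies)
        .build(client, consensus, db)
        .as_task();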


@@ -1,4 +1,4 @@
-use reth_primitives::{ChainSpec, GOERLI, MAINNET, SEPOLIA};
+use reth_primitives::{AllGenesisFormats, ChainSpec, GOERLI, MAINNET, SEPOLIA};
 use std::path::PathBuf;

 /// Clap value parser for [ChainSpec]s that takes either a built-in chainspec or the path
@@ -14,3 +14,18 @@ pub fn chain_spec_value_parser(s: &str) -> Result<ChainSpec, eyre::Error> {
         }
     })
 }
+
+/// Clap value parser for [ChainSpec]s that takes either a built-in genesis format or the path
+/// to a custom one.
+pub fn genesis_value_parser(s: &str) -> Result<ChainSpec, eyre::Error> {
+    Ok(match s {
+        "mainnet" => MAINNET.clone(),
+        "goerli" => GOERLI.clone(),
+        "sepolia" => SEPOLIA.clone(),
+        _ => {
+            let raw = std::fs::read_to_string(PathBuf::from(shellexpand::full(s)?.into_owned()))?;
+            let genesis: AllGenesisFormats = serde_json::from_str(&raw)?;
+            genesis.into()
+        }
+    })
+}
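A sketch of the ways the new parser resolves its input; the file path here is hypothetical:

    // Sketch only: built-in name, or a JSON file in either genesis format.
    let mainnet = genesis_value_parser("mainnet")?;             // cloned from the built-in spec
    let custom = genesis_value_parser("~/chains/custom.json")?; // shellexpand resolves `~`, serde picks the format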