From 440718288dc5bdabbf475a2f5ffe3f7e60b1058b Mon Sep 17 00:00:00 2001 From: Bjerg Date: Thu, 9 Feb 2023 21:08:10 +0100 Subject: [PATCH] feat: add `reth init` and `reth import` (#877) Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com> --- Cargo.lock | 2 + bin/reth/Cargo.toml | 4 +- bin/reth/src/chain/import.rs | 168 +++++++++++++++ bin/reth/src/chain/init.rs | 65 ++++++ bin/reth/src/chain/mod.rs | 7 + bin/reth/src/cli.rs | 12 +- bin/reth/src/lib.rs | 1 + bin/reth/src/node/mod.rs | 195 ++++++++++-------- crates/executor/src/executor.rs | 1 + crates/net/downloaders/Cargo.toml | 4 +- crates/net/downloaders/src/bodies/bodies.rs | 18 +- .../src/headers/reverse_headers.rs | 16 ++ .../downloaders/src/test_utils/file_client.rs | 43 ++-- crates/primitives/src/chain/mod.rs | 4 +- crates/primitives/src/chain/spec.rs | 31 +++ crates/primitives/src/lib.rs | 3 +- crates/staged-sync/Cargo.toml | 1 + crates/staged-sync/src/config.rs | 39 +++- crates/staged-sync/src/utils/chainspec.rs | 17 +- 19 files changed, 515 insertions(+), 116 deletions(-) create mode 100644 bin/reth/src/chain/import.rs create mode 100644 bin/reth/src/chain/init.rs create mode 100644 bin/reth/src/chain/mod.rs diff --git a/Cargo.lock b/Cargo.lock index bc039596d..b6f7c1bdc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4062,6 +4062,7 @@ dependencies = [ "reth-db", "reth-discv4", "reth-downloaders", + "reth-eth-wire", "reth-executor", "reth-interfaces", "reth-net-nat", @@ -4208,6 +4209,7 @@ dependencies = [ "bytes", "futures", "futures-util", + "itertools 0.10.5", "metrics", "pin-project", "reth-db", diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index befece59c..f24e7fb04 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -18,11 +18,13 @@ reth-interfaces = { path = "../../crates/interfaces", features = ["test-utils"] reth-transaction-pool = { path = "../../crates/transaction-pool", features = ["test-utils"] } reth-consensus = { path = "../../crates/consensus" } reth-executor = { path = "../../crates/executor" } +reth-eth-wire = { path = "../../crates/net/eth-wire" } reth-rpc-builder = { path = "../../crates/rpc/rpc-builder" } +# reth-rpc = {path = "../../crates/rpc/rpc"} reth-rlp = { path = "../../crates/rlp" } reth-network = {path = "../../crates/net/network", features = ["serde"] } reth-network-api = {path = "../../crates/net/network-api" } -reth-downloaders = {path = "../../crates/net/downloaders" } +reth-downloaders = {path = "../../crates/net/downloaders", features = ["test-utils"] } reth-tracing = { path = "../../crates/tracing" } reth-net-nat = { path = "../../crates/net/nat" } reth-discv4 = { path = "../../crates/net/discv4" } diff --git a/bin/reth/src/chain/import.rs b/bin/reth/src/chain/import.rs new file mode 100644 index 000000000..4bdb964c7 --- /dev/null +++ b/bin/reth/src/chain/import.rs @@ -0,0 +1,168 @@ +use crate::{ + dirs::{ConfigPath, DbPath, PlatformPath}, + node::{handle_events, NodeEvent}, + utils::{chainspec::genesis_value_parser, init::init_db}, +}; +use clap::{crate_version, Parser}; +use eyre::Context; +use futures::{Stream, StreamExt}; +use reth_consensus::beacon::BeaconConsensus; +use reth_db::mdbx::{Env, WriteMap}; +use reth_downloaders::{ + bodies::bodies::BodiesDownloaderBuilder, + headers::reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::FileClient, +}; +use reth_interfaces::{ + consensus::{Consensus, ForkchoiceState}, + sync::SyncStateUpdater, +}; +use reth_primitives::ChainSpec; +use reth_staged_sync::{utils::init::init_genesis, Config}; +use 
reth_stages::{ + prelude::*, + stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage}, +}; +use std::sync::Arc; +use tracing::{debug, info}; + +/// Syncs RLP encoded blocks from a file. +#[derive(Debug, Parser)] +pub struct ImportCommand { + /// The path to the configuration file to use. + #[arg(long, value_name = "FILE", verbatim_doc_comment, default_value_t)] + config: PlatformPath<ConfigPath>, + + /// The path to the database folder. + /// + /// Defaults to the OS-specific data directory: + /// + /// - Linux: `$XDG_DATA_HOME/reth/db` or `$HOME/.local/share/reth/db` + /// - Windows: `{FOLDERID_RoamingAppData}/reth/db` + /// - macOS: `$HOME/Library/Application Support/reth/db` + #[arg(long, value_name = "PATH", verbatim_doc_comment, default_value_t)] + db: PlatformPath<DbPath>, + + /// The chain this node is running. + /// + /// Possible values are either a built-in chain or the path to a chain specification file. + /// + /// Built-in chains: + /// - mainnet + /// - goerli + /// - sepolia + #[arg( + long, + value_name = "CHAIN_OR_PATH", + verbatim_doc_comment, + default_value = "mainnet", + value_parser = genesis_value_parser + )] + chain: ChainSpec, + + /// The path to a block file for import. + /// + /// The online stages (headers and bodies) are replaced by a file import, after which the + /// remaining stages are executed. + #[arg(long, value_name = "IMPORT_PATH", verbatim_doc_comment)] + path: PlatformPath<ConfigPath>, +} + +impl ImportCommand { + /// Execute the `import` command + pub async fn execute(self) -> eyre::Result<()> { + info!(target: "reth::cli", "reth {} starting", crate_version!()); + + let config: Config = self.load_config()?; + info!(target: "reth::cli", path = %self.db, "Configuration loaded"); + + info!(target: "reth::cli", path = %self.db, "Opening database"); + let db = Arc::new(init_db(&self.db)?); + info!(target: "reth::cli", "Database opened"); + + debug!(target: "reth::cli", chainspec=?self.chain, "Initializing genesis"); + init_genesis(db.clone(), self.chain.clone())?; + + // create a new FileClient + info!(target: "reth::cli", "Importing chain file"); + let file_client = Arc::new(FileClient::new(&self.path).await?); + + // override the tip + let tip = file_client.tip().expect("file client has no tip"); + info!(target: "reth::cli", "Chain file imported"); + + let (consensus, notifier) = BeaconConsensus::builder().build(self.chain.clone()); + debug!(target: "reth::cli", %tip, "Tip manually set"); + notifier.send(ForkchoiceState { + head_block_hash: tip, + safe_block_hash: tip, + finalized_block_hash: tip, + })?; + info!(target: "reth::cli", "Consensus engine initialized"); + + let (mut pipeline, events) = + self.build_import_pipeline(config, db.clone(), &consensus, file_client).await?; + + tokio::spawn(handle_events(events)); + + // Run pipeline + info!(target: "reth::cli", "Starting sync pipeline"); + tokio::select!
{ + res = pipeline.run(db.clone()) => res?, + _ = tokio::signal::ctrl_c() => {}, + }; + + info!(target: "reth::cli", "Finishing up"); + Ok(()) + } + + async fn build_import_pipeline<C>( + &self, + config: Config, + db: Arc<Env<WriteMap>>, + consensus: &Arc<C>, + file_client: Arc<FileClient>, + ) -> eyre::Result<(Pipeline<Env<WriteMap>, impl SyncStateUpdater>, impl Stream<Item = NodeEvent>)> + where + C: Consensus + 'static, + { + let header_downloader = ReverseHeadersDownloaderBuilder::from(config.stages.headers) + .build(consensus.clone(), file_client.clone()) + .as_task(); + + let body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies) + .build(file_client.clone(), consensus.clone(), db) + .as_task(); + + let mut pipeline = Pipeline::builder() + .with_sync_state_updater(file_client) + .add_stages( + OnlineStages::new(consensus.clone(), header_downloader, body_downloader).set( + TotalDifficultyStage { + chain_spec: self.chain.clone(), + commit_threshold: config.stages.total_difficulty.commit_threshold, + }, + ), + ) + .add_stages( + OfflineStages::default() + .set(SenderRecoveryStage { + batch_size: config.stages.sender_recovery.batch_size, + commit_threshold: config.stages.sender_recovery.commit_threshold, + }) + .set(ExecutionStage { + chain_spec: self.chain.clone(), + commit_threshold: config.stages.execution.commit_threshold, + }), + ) + .with_max_block(0) + .build(); + + let events = pipeline.events().map(Into::into); + + Ok((pipeline, events)) + } + + fn load_config(&self) -> eyre::Result<Config> { + confy::load_path::<Config>(&self.config).wrap_err("Could not load config") + } +} diff --git a/bin/reth/src/chain/init.rs b/bin/reth/src/chain/init.rs new file mode 100644 index 000000000..c596f2651 --- /dev/null +++ b/bin/reth/src/chain/init.rs @@ -0,0 +1,65 @@ +use crate::{ + dirs::{DbPath, PlatformPath}, + utils::chainspec::genesis_value_parser, +}; +use clap::Parser; +use reth_primitives::ChainSpec; +use reth_staged_sync::utils::init::{init_db, init_genesis}; +use std::sync::Arc; +use tracing::info; + +/// Initializes the database with the genesis block. +#[derive(Debug, Parser)] +pub struct InitCommand { + /// The path to the database folder. + /// + /// Defaults to the OS-specific data directory: + /// + /// - Linux: `$XDG_DATA_HOME/reth/db` or `$HOME/.local/share/reth/db` + /// - Windows: `{FOLDERID_RoamingAppData}/reth/db` + /// - macOS: `$HOME/Library/Application Support/reth/db` + #[arg(long, value_name = "PATH", verbatim_doc_comment, default_value_t)] + db: PlatformPath<DbPath>, + + /// The chain this node is running. + /// + /// Possible values are either a built-in chain or the path to a chain specification file.
+ /// + /// Built-in chains: + /// - mainnet + /// - goerli + /// - sepolia + #[arg( + long, + value_name = "CHAIN_OR_PATH", + verbatim_doc_comment, + default_value = "mainnet", + value_parser = genesis_value_parser + )] + chain: ChainSpec, +} + +impl InitCommand { + /// Execute the `init` command + pub async fn execute(&self) -> eyre::Result<()> { + info!(target: "reth::cli", "reth init starting"); + + info!(target: "reth::cli", path = %self.db, "Opening database"); + let db = Arc::new(init_db(&self.db)?); + info!(target: "reth::cli", "Database opened"); + + info!(target: "reth::cli", "Writing genesis block"); + let genesis_hash = init_genesis(db, self.chain.clone())?; + + if genesis_hash != self.chain.genesis_hash() { + // TODO: better error text + return Err(eyre::eyre!( + "Genesis hash mismatch: expected {}, got {}", + self.chain.genesis_hash(), + genesis_hash + )) + } + + Ok(()) + } +} diff --git a/bin/reth/src/chain/mod.rs b/bin/reth/src/chain/mod.rs new file mode 100644 index 000000000..202f9392a --- /dev/null +++ b/bin/reth/src/chain/mod.rs @@ -0,0 +1,7 @@ +//! Command line utilities for initializing a chain. + +mod import; +mod init; + +pub use import::ImportCommand; +pub use init::InitCommand; diff --git a/bin/reth/src/cli.rs b/bin/reth/src/cli.rs index 9d297ae60..3cbcfe9f7 100644 --- a/bin/reth/src/cli.rs +++ b/bin/reth/src/cli.rs @@ -1,6 +1,6 @@ //! CLI definition and entrypoint to executable use crate::{ - db, + chain, db, dirs::{LogsDir, PlatformPath}, node, p2p, stage, test_eth_chain, test_vectors, }; @@ -21,11 +21,13 @@ pub async fn run() -> eyre::Result<()> { match opt.command { Commands::Node(command) => command.execute().await, - Commands::TestEthChain(command) => command.execute().await, + Commands::Init(command) => command.execute().await, + Commands::Import(command) => command.execute().await, Commands::Db(command) => command.execute().await, Commands::Stage(command) => command.execute().await, Commands::P2P(command) => command.execute().await, Commands::TestVectors(command) => command.execute().await, + Commands::TestEthChain(command) => command.execute().await, } } @@ -35,6 +37,12 @@ pub enum Commands { /// Start the node #[command(name = "node")] Node(node::Command), + /// Initialize the database from a genesis file. + #[command(name = "init")] + Init(chain::InitCommand), + /// This syncs RLP encoded blocks from a file. + #[command(name = "import")] + Import(chain::ImportCommand), /// Database debugging utilities #[command(name = "db")] Db(db::Command), diff --git a/bin/reth/src/lib.rs b/bin/reth/src/lib.rs index 2376783b9..ad9335a1c 100644 --- a/bin/reth/src/lib.rs +++ b/bin/reth/src/lib.rs @@ -6,6 +6,7 @@ ))] //! Rust Ethereum (reth) binary executable.
+pub mod chain; pub mod cli; pub mod db; pub mod dirs; diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index b7307bf8c..bf9a559d4 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -4,7 +4,7 @@ use crate::{ dirs::{ConfigPath, DbPath, PlatformPath}, prometheus_exporter, - utils::{chainspec::chain_spec_value_parser, init::init_db, parse_socket_address}, + utils::{chainspec::genesis_value_parser, init::init_db, parse_socket_address}, NetworkOpts, RpcServerOpts, }; use clap::{crate_version, Parser}; @@ -13,10 +13,17 @@ use fdlimit::raise_fd_limit; use futures::{stream::select as stream_select, Stream, StreamExt}; use reth_consensus::beacon::BeaconConsensus; use reth_db::mdbx::{Env, WriteMap}; -use reth_downloaders::{bodies, headers}; -use reth_interfaces::consensus::{Consensus, ForkchoiceState}; +use reth_downloaders::{ + bodies::bodies::BodiesDownloaderBuilder, + headers::reverse_headers::ReverseHeadersDownloaderBuilder, +}; +use reth_interfaces::{ + consensus::{Consensus, ForkchoiceState}, + p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader}, + sync::SyncStateUpdater, +}; use reth_net_nat::NatResolver; -use reth_network::{FetchClient, NetworkConfig, NetworkEvent, NetworkHandle}; +use reth_network::{NetworkConfig, NetworkEvent, NetworkHandle}; use reth_network_api::NetworkInfo; use reth_primitives::{BlockNumber, ChainSpec, H256}; use reth_provider::ShareableDatabase; @@ -59,7 +66,7 @@ pub struct Command { value_name = "CHAIN_OR_PATH", verbatim_doc_comment, default_value = "mainnet", - value_parser = chain_spec_value_parser + value_parser = genesis_value_parser )] chain: ChainSpec, @@ -102,19 +109,20 @@ impl Command { let mut config: Config = self.load_config()?; info!(target: "reth::cli", path = %self.db, "Configuration loaded"); - self.init_trusted_nodes(&mut config); - info!(target: "reth::cli", path = %self.db, "Opening database"); let db = Arc::new(init_db(&self.db)?); info!(target: "reth::cli", "Database opened"); self.start_metrics_endpoint()?; + debug!(target: "reth::cli", chainspec=?self.chain, "Initializing genesis"); init_genesis(db.clone(), self.chain.clone())?; let consensus = self.init_consensus()?; info!(target: "reth::cli", "Consensus engine initialized"); + self.init_trusted_nodes(&mut config); + info!(target: "reth::cli", "Connecting to P2P network"); let netconf = self.load_network_config(&config, &db); let network = netconf.start_network().await?; @@ -133,12 +141,11 @@ impl Command { .await?; info!(target: "reth::cli", "Started RPC server"); - let mut pipeline = self.build_pipeline(&config, &network, &consensus, &db).await?; + let (mut pipeline, events) = self + .build_networked_pipeline(&mut config, network.clone(), &consensus, db.clone()) + .await?; - tokio::spawn(handle_events(stream_select( - network.event_listener().map(Into::into), - pipeline.events().map(Into::into), - ))); + tokio::spawn(handle_events(events)); // Run pipeline info!(target: "reth::cli", "Starting sync pipeline"); @@ -154,6 +161,36 @@ impl Command { Ok(()) } + async fn build_networked_pipeline( + &self, + config: &mut Config, + network: NetworkHandle, + consensus: &Arc<dyn Consensus>, + db: Arc<Env<WriteMap>>, + ) -> eyre::Result<(Pipeline<Env<WriteMap>, impl SyncStateUpdater>, impl Stream<Item = NodeEvent>)> + { + // building network downloaders using the fetch client + let fetch_client = Arc::new(network.fetch_client().await?); + + let header_downloader = ReverseHeadersDownloaderBuilder::from(config.stages.headers) + .build(consensus.clone(), fetch_client.clone()) + .as_task(); + + let
body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies) + .build(fetch_client.clone(), consensus.clone(), db.clone()) + .as_task(); + + let mut pipeline = self + .build_pipeline(config, header_downloader, body_downloader, network.clone(), consensus) + .await?; + + let events = stream_select( + network.event_listener().map(Into::into), + pipeline.events().map(Into::into), + ); + Ok((pipeline, events)) + } + fn load_config(&self) -> eyre::Result<Config> { confy::load_path::<Config>(&self.config).wrap_err("Could not load config") } @@ -214,27 +251,30 @@ impl Command { ) } - async fn build_pipeline( + async fn build_pipeline<H, B, U>( &self, config: &Config, - network: &NetworkHandle, + header_downloader: H, + body_downloader: B, + updater: U, consensus: &Arc<dyn Consensus>, - db: &Arc<Env<WriteMap>>, - ) -> eyre::Result<Pipeline<Env<WriteMap>, NetworkHandle>> { - let fetch_client = Arc::new(network.fetch_client().await?); - - let header_downloader = self.spawn_headers_downloader(config, consensus, &fetch_client); - let body_downloader = self.spawn_bodies_downloader(config, consensus, &fetch_client, db); + ) -> eyre::Result<Pipeline<Env<WriteMap>, U>> + where + H: HeaderDownloader + 'static, + B: BodyDownloader + 'static, + U: SyncStateUpdater, + { let stage_conf = &config.stages; let mut builder = Pipeline::builder(); if let Some(max_block) = self.max_block { + debug!(target: "reth::cli", max_block, "Configuring builder to use max block"); builder = builder.with_max_block(max_block) } let pipeline = builder - .with_sync_state_updater(network.clone()) + .with_sync_state_updater(updater) .add_stages( OnlineStages::new(consensus.clone(), header_downloader, body_downloader).set( TotalDifficultyStage { @@ -258,42 +298,6 @@ impl Command { Ok(pipeline) } - - fn spawn_headers_downloader( - &self, - config: &Config, - consensus: &Arc<dyn Consensus>, - fetch_client: &Arc<FetchClient>, - ) -> reth_downloaders::headers::task::TaskDownloader { - let headers_conf = &config.stages.headers; - headers::task::TaskDownloader::spawn( - headers::reverse_headers::ReverseHeadersDownloaderBuilder::default() - .request_limit(headers_conf.downloader_batch_size) - .stream_batch_size(headers_conf.commit_threshold as usize) - .build(consensus.clone(), fetch_client.clone()), - ) - } - - fn spawn_bodies_downloader( - &self, - config: &Config, - consensus: &Arc<dyn Consensus>, - fetch_client: &Arc<FetchClient>, - db: &Arc<Env<WriteMap>>, - ) -> reth_downloaders::bodies::task::TaskDownloader { - let bodies_conf = &config.stages.bodies; - bodies::task::TaskDownloader::spawn( - bodies::bodies::BodiesDownloaderBuilder::default() - .with_stream_batch_size(bodies_conf.downloader_stream_batch_size) - .with_request_limit(bodies_conf.downloader_request_limit) - .with_max_buffered_responses(bodies_conf.downloader_max_buffered_responses) - .with_concurrent_requests_range( - bodies_conf.downloader_min_concurrent_requests..= - bodies_conf.downloader_max_concurrent_requests, - ) - .build(fetch_client.clone(), consensus.clone(), db.clone()), - ) - } } /// Dumps peers to `file_path` for persistence.
@@ -316,8 +320,50 @@ struct NodeState { current_checkpoint: BlockNumber, } +impl NodeState { + async fn handle_pipeline_event(&mut self, event: PipelineEvent) { + match event { + PipelineEvent::Running { stage_id, stage_progress } => { + let notable = self.current_stage.is_none(); + self.current_stage = Some(stage_id); + self.current_checkpoint = stage_progress.unwrap_or_default(); + + if notable { + info!(target: "reth::cli", stage = %stage_id, from = stage_progress, "Executing stage"); + } + } + PipelineEvent::Ran { stage_id, result } => { + let notable = result.stage_progress > self.current_checkpoint; + self.current_checkpoint = result.stage_progress; + if result.done { + self.current_stage = None; + info!(target: "reth::cli", stage = %stage_id, checkpoint = result.stage_progress, "Stage finished executing"); + } else if notable { + info!(target: "reth::cli", stage = %stage_id, checkpoint = result.stage_progress, "Stage committed progress"); + } + } + _ => (), + } + } + + async fn handle_network_event(&mut self, event: NetworkEvent) { + match event { + NetworkEvent::SessionEstablished { peer_id, status, .. } => { + self.connected_peers += 1; + info!(target: "reth::cli", connected_peers = self.connected_peers, peer_id = %peer_id, best_block = %status.blockhash, "Peer connected"); + } + NetworkEvent::SessionClosed { peer_id, reason } => { + self.connected_peers -= 1; + let reason = reason.map(|s| s.to_string()).unwrap_or_else(|| "None".to_string()); + warn!(target: "reth::cli", connected_peers = self.connected_peers, peer_id = %peer_id, %reason, "Peer disconnected."); + } + _ => (), + } + } +} + /// A node event. -enum NodeEvent { +pub enum NodeEvent { /// A network event. Network(NetworkEvent), /// A sync pipeline event. @@ -338,7 +384,7 @@ impl From<PipelineEvent> for NodeEvent { /// Displays relevant information to the user from components of the node, and periodically /// displays the high-level status of the node. -async fn handle_events(mut events: impl Stream<Item = NodeEvent> + Unpin) { +pub async fn handle_events(mut events: impl Stream<Item = NodeEvent> + Unpin) { let mut state = NodeState::default(); let mut interval = tokio::time::interval(Duration::from_secs(30)); @@ -347,35 +393,12 @@ async fn handle_events(mut events: impl Stream<Item = NodeEvent> + Unpin) { tokio::select! { Some(event) = events.next() => { match event { - NodeEvent::Network(NetworkEvent::SessionEstablished { peer_id, status, ..
}) => { - state.connected_peers += 1; - info!(target: "reth::cli", connected_peers = state.connected_peers, peer_id = %peer_id, best_block = %status.blockhash, "Peer connected"); + NodeEvent::Network(event) => { + state.handle_network_event(event).await; }, - NodeEvent::Network(NetworkEvent::SessionClosed { peer_id, reason }) => { - state.connected_peers -= 1; - let reason = reason.map(|s| s.to_string()).unwrap_or_else(|| "None".to_string()); - warn!(target: "reth::cli", connected_peers = state.connected_peers, peer_id = %peer_id, %reason, "Peer disconnected."); - }, - NodeEvent::Pipeline(PipelineEvent::Running { stage_id, stage_progress }) => { - let notable = state.current_stage.is_none(); - state.current_stage = Some(stage_id); - state.current_checkpoint = stage_progress.unwrap_or_default(); - - if notable { - info!(target: "reth::cli", stage = %stage_id, from = stage_progress, "Executing stage"); - } - }, - NodeEvent::Pipeline(PipelineEvent::Ran { stage_id, result }) => { - let notable = result.stage_progress > state.current_checkpoint; - state.current_checkpoint = result.stage_progress; - if result.done { - state.current_stage = None; - info!(target: "reth::cli", stage = %stage_id, checkpoint = result.stage_progress, "Stage finished executing"); - } else if notable { - info!(target: "reth::cli", stage = %stage_id, checkpoint = result.stage_progress, "Stage committed progress"); - } + NodeEvent::Pipeline(event) => { + state.handle_pipeline_event(event).await; } - _ => (), } }, _ = interval.tick() => { diff --git a/crates/executor/src/executor.rs b/crates/executor/src/executor.rs index 88b0a539e..0d4ce498a 100644 --- a/crates/executor/src/executor.rs +++ b/crates/executor/src/executor.rs @@ -67,6 +67,7 @@ where ..Default::default() }, ); + self.evm.env.cfg.chain_id = U256::from(self.chain_spec.chain().id()); self.evm.env.cfg.spec_id = spec_id; self.evm.env.cfg.perf_all_precompiles_have_balance = false; diff --git a/crates/net/downloaders/Cargo.toml b/crates/net/downloaders/Cargo.toml index 507dd9951..69b0522a2 100644 --- a/crates/net/downloaders/Cargo.toml +++ b/crates/net/downloaders/Cargo.toml @@ -32,6 +32,7 @@ reth-rlp = { path = "../../rlp", optional = true } tokio-util = { version = "0.7", features = ["codec"], optional = true } bytes = { version = "1", optional = true } tempfile = { version = "3.3", optional = true } +itertools = { version = "0.10", optional = true } [dev-dependencies] reth-db = { path = "../../storage/db", features = ["test-utils"] } @@ -42,10 +43,11 @@ assert_matches = "1.5.0" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } tokio-util = { version = "0.7", features = ["codec"] } reth-rlp = { path = "../../rlp" } +itertools = "0.10" bytes = "1" thiserror = "1" tempfile = "3.3" [features] -test-utils = ["dep:reth-rlp", "dep:thiserror", "dep:tokio-util", "dep:tempfile", "dep:bytes"] +test-utils = ["dep:reth-rlp", "dep:thiserror", "dep:tokio-util", "dep:tempfile", "dep:bytes", "dep:itertools"] diff --git a/crates/net/downloaders/src/bodies/bodies.rs b/crates/net/downloaders/src/bodies/bodies.rs index 96f89ea54..845d6f160 100644 --- a/crates/net/downloaders/src/bodies/bodies.rs +++ b/crates/net/downloaders/src/bodies/bodies.rs @@ -1,6 +1,5 @@ -use crate::metrics::DownloaderMetrics; - use super::queue::BodiesRequestQueue; +use crate::{bodies::task::TaskDownloader, metrics::DownloaderMetrics}; use futures::Stream; use futures_util::StreamExt; use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx}; @@ -186,7 +185,7 @@ where 
fn is_terminated(&self) -> bool { // There is nothing to request if the range is empty let nothing_to_request = self.download_range.is_empty() || - // or all blocks have already been requested. + // or all blocks have already been requested. self.in_progress_queue .last_requested_block_number .map(|last| last + 1 == self.download_range.end) @@ -241,6 +240,19 @@ where } } +impl<B, DB> BodiesDownloader<B, DB> +where + B: BodiesClient + 'static, + DB: Database, + Self: BodyDownloader + 'static, +{ + /// Convert the downloader into a [`TaskDownloader`](super::task::TaskDownloader) by spawning + /// it. + pub fn as_task(self) -> TaskDownloader { + TaskDownloader::spawn(self) + } +} + impl<B, DB> BodyDownloader for BodiesDownloader<B, DB> where B: BodiesClient + 'static, diff --git a/crates/net/downloaders/src/headers/reverse_headers.rs b/crates/net/downloaders/src/headers/reverse_headers.rs index 334a22170..fd547d6c5 100644 --- a/crates/net/downloaders/src/headers/reverse_headers.rs +++ b/crates/net/downloaders/src/headers/reverse_headers.rs @@ -1,5 +1,6 @@ //! A headers downloader that can handle multiple requests concurrently. +use super::task::TaskDownloader; use crate::metrics::DownloaderMetrics; use futures::{stream::Stream, FutureExt}; use futures_util::{stream::FuturesUnordered, StreamExt}; @@ -357,6 +358,8 @@ where // validate the response let highest = &headers[0]; + trace!(target: "downloaders::headers", requested_block_number, highest=?highest.number, "Validating non-empty headers response"); + if highest.number != requested_block_number { return Err(HeadersResponseError { request, @@ -462,6 +465,7 @@ where /// Starts a request future fn submit_request(&mut self, request: HeadersRequest, priority: Priority) { + trace!(target: "downloaders::headers", ?request, "Submitting headers request"); self.in_progress_queue.push(self.request_fut(request, priority)); } @@ -499,6 +503,18 @@ where } } +impl<H> ReverseHeadersDownloader<H> +where + H: HeadersClient, + Self: HeaderDownloader + 'static, +{ + /// Convert the downloader into a [`TaskDownloader`](super::task::TaskDownloader) by spawning + /// it. + pub fn as_task(self) -> TaskDownloader { + TaskDownloader::spawn(self) + } +} + impl<H> HeaderDownloader for ReverseHeadersDownloader<H> where H: HeadersClient + 'static, diff --git a/crates/net/downloaders/src/test_utils/file_client.rs b/crates/net/downloaders/src/test_utils/file_client.rs index ea5a10db9..3894d1824 100644 --- a/crates/net/downloaders/src/test_utils/file_client.rs +++ b/crates/net/downloaders/src/test_utils/file_client.rs @@ -1,4 +1,5 @@ use super::file_codec::BlockFileCodec; +use itertools::Either; use reth_eth_wire::{BlockBody, RawBlockBody}; use reth_interfaces::{ p2p::{ @@ -30,7 +31,7 @@ use tokio::{ }; use tokio_stream::StreamExt; use tokio_util::codec::FramedRead; -use tracing::warn; +use tracing::{trace, warn}; /// Front-end API for fetching chain data from a file.
/// @@ -94,26 +95,29 @@ impl FileClient { // use with_capacity to make sure the internal buffer contains the entire file let mut stream = FramedRead::with_capacity(&reader[..], BlockFileCodec, file_len as usize); - let mut block_num = 0; while let Some(block_res) = stream.next().await { let block = block_res?; let block_hash = block.header.hash_slow(); // add to the internal maps - headers.insert(block_num, block.header.clone()); - hash_to_number.insert(block_hash, block_num); + headers.insert(block.header.number, block.header.clone()); + hash_to_number.insert(block_hash, block.header.number); bodies.insert( block_hash, BlockBody { transactions: block.transactions, ommers: block.ommers }, ); - - // update block num - block_num += 1; } + trace!(blocks = headers.len(), "Initialized file client"); + Ok(Self { headers, hash_to_number, bodies, is_syncing: Arc::new(Default::default()) }) } + /// Get the tip hash of the chain. + pub fn tip(&self) -> Option<H256> { + self.headers.get(&(self.headers.len() as u64 - 1)).map(|h| h.hash_slow()) + } + /// Use the provided bodies as the file client's block body buffer. pub(crate) fn with_bodies(mut self, bodies: HashMap<BlockHash, BlockBody>) -> Self { self.bodies = bodies; @@ -140,24 +144,39 @@ impl HeadersClient for FileClient { ) -> Self::Output { // this just searches the buffer, and fails if it can't find the header let mut headers = Vec::new(); + trace!(target: "downloaders::file", request=?request, "Getting headers"); let start_num = match request.start { BlockHashOrNumber::Hash(hash) => match self.hash_to_number.get(&hash) { Some(num) => *num, - None => return Box::pin(async move { Err(RequestError::BadResponse) }), + None => { + warn!(%hash, "Could not find starting block number for requested header hash"); + return Box::pin(async move { Err(RequestError::BadResponse) }) + } }, BlockHashOrNumber::Number(num) => num, }; - let range = match request.direction { - HeadersDirection::Rising => start_num..=start_num + 1 - request.limit, - HeadersDirection::Falling => start_num + 1 - request.limit..=start_num, + let range = if request.limit == 1 { + Either::Left(start_num..start_num + 1) + } else { + match request.direction { + HeadersDirection::Rising => Either::Left(start_num..start_num + request.limit), + HeadersDirection::Falling => { + Either::Right((start_num - request.limit + 1..=start_num).rev()) + } + } }; + trace!(target: "downloaders::file", range=?range, "Getting headers with range"); + for block_number in range { match self.headers.get(&block_number).cloned() { Some(header) => headers.push(header), - None => return Box::pin(async move { Err(RequestError::BadResponse) }), + None => { + warn!(number=%block_number, "Could not find header"); + return Box::pin(async move { Err(RequestError::BadResponse) }) + } } } diff --git a/crates/primitives/src/chain/mod.rs b/crates/primitives/src/chain/mod.rs index 55a68a0c1..d03aff38a 100644 --- a/crates/primitives/src/chain/mod.rs +++ b/crates/primitives/src/chain/mod.rs @@ -7,7 +7,9 @@ use std::{fmt, str::FromStr}; // The chain spec module. mod spec; -pub use spec::{ChainSpec, ChainSpecBuilder, ForkCondition, GOERLI, MAINNET, SEPOLIA}; +pub use spec::{ + AllGenesisFormats, ChainSpec, ChainSpecBuilder, ForkCondition, GOERLI, MAINNET, SEPOLIA, +}; // The chain info module.
mod info; diff --git a/crates/primitives/src/chain/spec.rs b/crates/primitives/src/chain/spec.rs index 0a636b2cf..cee8a7832 100644 --- a/crates/primitives/src/chain/spec.rs +++ b/crates/primitives/src/chain/spec.rs @@ -260,6 +260,37 @@ impl From<EthersGenesis> for ChainSpec { } } +/// A helper type for compatibility with geth's config +#[derive(Debug, Clone, Deserialize)] +#[serde(untagged)] +pub enum AllGenesisFormats { + /// The geth genesis format + Geth(EthersGenesis), + /// The reth genesis format + Reth(ChainSpec), +} + +impl From<EthersGenesis> for AllGenesisFormats { + fn from(genesis: EthersGenesis) -> Self { + Self::Geth(genesis) + } +} + +impl From<ChainSpec> for AllGenesisFormats { + fn from(genesis: ChainSpec) -> Self { + Self::Reth(genesis) + } +} + +impl From<AllGenesisFormats> for ChainSpec { + fn from(genesis: AllGenesisFormats) -> Self { + match genesis { + AllGenesisFormats::Geth(genesis) => genesis.into(), + AllGenesisFormats::Reth(genesis) => genesis, + } + } +} + /// A helper to build custom chain specs #[derive(Debug, Default)] pub struct ChainSpecBuilder { diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index 010cdff5b..3c8e434cd 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -38,7 +38,8 @@ pub use bits::H512; pub use block::{Block, BlockHashOrNumber, SealedBlock}; pub use bloom::Bloom; pub use chain::{ - Chain, ChainInfo, ChainSpec, ChainSpecBuilder, ForkCondition, GOERLI, MAINNET, SEPOLIA, + AllGenesisFormats, Chain, ChainInfo, ChainSpec, ChainSpecBuilder, ForkCondition, GOERLI, + MAINNET, SEPOLIA, }; pub use constants::{ EMPTY_OMMER_ROOT, GOERLI_GENESIS, KECCAK_EMPTY, MAINNET_GENESIS, SEPOLIA_GENESIS, diff --git a/crates/staged-sync/Cargo.toml b/crates/staged-sync/Cargo.toml index 4d18f0589..143fe9e74 100644 --- a/crates/staged-sync/Cargo.toml +++ b/crates/staged-sync/Cargo.toml @@ -19,6 +19,7 @@ reth-db = {path = "../../crates/storage/db", features = ["mdbx", "test-utils"] } reth-discv4 = { path = "../../crates/net/discv4" } reth-network-api = { path = "../../crates/net/network-api" } reth-network = { path = "../../crates/net/network", features = ["serde"] } +reth-downloaders = { path = "../../crates/net/downloaders" } reth-primitives = { path = "../../crates/primitives" } reth-provider = { path = "../../crates/storage/provider", features = ["test-utils"] } reth-net-nat = { path = "../../crates/net/nat" } diff --git a/crates/staged-sync/src/config.rs b/crates/staged-sync/src/config.rs index c74d38a4d..7d3e32699 100644 --- a/crates/staged-sync/src/config.rs +++ b/crates/staged-sync/src/config.rs @@ -3,6 +3,10 @@ use std::{path::PathBuf, sync::Arc}; use reth_db::database::Database; use reth_discv4::Discv4Config; +use reth_downloaders::{ + bodies::bodies::BodiesDownloaderBuilder, + headers::reverse_headers::ReverseHeadersDownloaderBuilder, +}; use reth_network::{ config::{mainnet_nodes, rng_secret_key}, NetworkConfig, NetworkConfigBuilder, PeersConfig, }; @@ -66,24 +70,30 @@ pub struct StageConfig { } /// Header stage configuration. -#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] +#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)] pub struct HeadersConfig { /// The maximum number of headers to download before committing progress to the database. pub commit_threshold: u64, /// The maximum number of headers to request from a peer at a time. pub downloader_batch_size: u64, - /// The number of times to retry downloading a set of headers.
- pub downloader_retries: usize, } impl Default for HeadersConfig { fn default() -> Self { - Self { commit_threshold: 10_000, downloader_batch_size: 1000, downloader_retries: 5 } + Self { commit_threshold: 10_000, downloader_batch_size: 1000 } + } +} + +impl From<HeadersConfig> for ReverseHeadersDownloaderBuilder { + fn from(config: HeadersConfig) -> Self { + ReverseHeadersDownloaderBuilder::default() + .request_limit(config.downloader_batch_size) + .stream_batch_size(config.commit_threshold as usize) } } /// Total difficulty stage configuration -#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] +#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)] pub struct TotalDifficultyConfig { /// The maximum number of total difficulty entries to sum up before committing progress to the /// database. @@ -97,7 +107,7 @@ impl Default for TotalDifficultyConfig { } /// Body stage configuration. -#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] +#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)] pub struct BodiesConfig { /// The batch size of non-empty blocks per one request pub downloader_request_limit: u64, @@ -124,8 +134,21 @@ impl Default for BodiesConfig { } } +impl From<BodiesConfig> for BodiesDownloaderBuilder { + fn from(config: BodiesConfig) -> Self { + BodiesDownloaderBuilder::default() + .with_stream_batch_size(config.downloader_stream_batch_size) + .with_request_limit(config.downloader_request_limit) + .with_max_buffered_responses(config.downloader_max_buffered_responses) + .with_concurrent_requests_range( + config.downloader_min_concurrent_requests..= + config.downloader_max_concurrent_requests, + ) + } +} + /// Sender recovery stage configuration. -#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Serialize)] pub struct SenderRecoveryConfig { /// The maximum number of blocks to process before committing progress to the database. pub commit_threshold: u64, @@ -140,7 +163,7 @@ impl Default for SenderRecoveryConfig { } /// Execution stage configuration. -#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] +#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)] pub struct ExecutionConfig { /// The maximum number of blocks to execute before committing progress to the database. pub commit_threshold: u64, diff --git a/crates/staged-sync/src/utils/chainspec.rs b/crates/staged-sync/src/utils/chainspec.rs index d016c2530..76a6401d2 100644 --- a/crates/staged-sync/src/utils/chainspec.rs +++ b/crates/staged-sync/src/utils/chainspec.rs @@ -1,4 +1,4 @@ -use reth_primitives::{ChainSpec, GOERLI, MAINNET, SEPOLIA}; +use reth_primitives::{AllGenesisFormats, ChainSpec, GOERLI, MAINNET, SEPOLIA}; use std::path::PathBuf; /// Clap value parser for [ChainSpec]s that takes either a built-in chainspec or the path @@ -14,3 +14,18 @@ pub fn chain_spec_value_parser(s: &str) -> Result<ChainSpec, eyre::Error> { } }) } + +/// Clap value parser for [ChainSpec]s that takes either a built-in genesis format or the path +/// to a custom one. +pub fn genesis_value_parser(s: &str) -> Result<ChainSpec, eyre::Error> { + Ok(match s { + "mainnet" => MAINNET.clone(), + "goerli" => GOERLI.clone(), + "sepolia" => SEPOLIA.clone(), + _ => { + let raw = std::fs::read_to_string(PathBuf::from(shellexpand::full(s)?.into_owned()))?; + let genesis: AllGenesisFormats = serde_json::from_str(&raw)?; + genesis.into() + } + }) +}
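With the clap definitions in this patch, the two new subcommands are invoked along these lines (file names here are placeholders, not from the patch): `reth init --chain ./genesis.json --db <datadir>` writes the genesis block, and `reth import --chain ./genesis.json --path <block-file>` runs the pipeline against an RLP block file instead of the network. In both commands `--chain` also accepts the built-in names `mainnet`, `goerli`, and `sepolia`, since both go through the shared `genesis_value_parser`.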
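The `--chain` fallback works because `AllGenesisFormats` is a `#[serde(untagged)]` enum: serde tries the variants in declaration order, so a geth-style genesis document is attempted first and a reth chain spec second. Below is a minimal, self-contained sketch of that mechanism; the types are simplified stand-ins, not the real `EthersGenesis`/`ChainSpec`, and it assumes `serde` (with the `derive` feature) and `serde_json` as dependencies.

use serde::Deserialize;

// Trimmed stand-in for a geth genesis file: just two of its fields.
#[derive(Debug, Deserialize)]
struct GethGenesis {
    config: serde_json::Value,
    #[serde(rename = "gasLimit")]
    gas_limit: String,
}

// Trimmed stand-in for a reth chain spec.
#[derive(Debug, Deserialize)]
struct RethSpec {
    chain: u64,
    genesis: serde_json::Value,
}

// Analogue of `AllGenesisFormats`: untagged, so variants are tried in order.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum AnyGenesis {
    Geth(GethGenesis),
    Reth(RethSpec),
}

fn main() -> Result<(), serde_json::Error> {
    let geth = r#"{ "config": { "chainId": 1 }, "gasLimit": "0x1388" }"#;
    let reth = r#"{ "chain": 1, "genesis": {} }"#;

    // Each document only satisfies one variant's shape.
    assert!(matches!(serde_json::from_str::<AnyGenesis>(geth)?, AnyGenesis::Geth(_)));
    assert!(matches!(serde_json::from_str::<AnyGenesis>(reth)?, AnyGenesis::Reth(_)));
    Ok(())
}

One caveat with untagged enums: a document that happens to satisfy both shapes deserializes as the first variant, so variant order matters.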
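The `FileClient::get_headers` change returns two different iterator types from one expression: a forward `Range` for rising requests and a reversed `RangeInclusive` for falling ones, unified by `itertools::Either`, which implements `Iterator` when both sides do. A self-contained sketch of just that range selection, with illustrative numbers and a made-up helper name (`header_numbers`); it assumes the `itertools` crate.

use itertools::Either;

// Mirrors the range selection in the patched `get_headers`: rising requests
// walk forward from `start`; falling requests yield the same count of block
// numbers highest-first.
fn header_numbers(start: u64, limit: u64, rising: bool) -> impl Iterator<Item = u64> {
    if limit == 1 {
        // A single-header request is the same range in either direction.
        Either::Left(start..start + 1)
    } else if rising {
        Either::Left(start..start + limit)
    } else {
        // Falling responses are served highest block first, hence `.rev()`.
        Either::Right((start - limit + 1..=start).rev())
    }
}

fn main() {
    assert_eq!(header_numbers(10, 3, true).collect::<Vec<_>>(), vec![10, 11, 12]);
    assert_eq!(header_numbers(10, 3, false).collect::<Vec<_>>(), vec![10, 9, 8]);
}

As in the patch, `start - limit + 1` assumes `limit <= start + 1`; a falling request reaching below block 0 would underflow, which is acceptable for well-formed requests against a file that starts at genesis.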