diff --git a/Cargo.lock b/Cargo.lock index c016c2b49..2617e6409 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3175,7 +3175,9 @@ dependencies = [ "eyre", "reth-consensus", "reth-db", + "reth-downloaders", "reth-interfaces", + "reth-network", "reth-primitives", "reth-provider", "reth-rpc", diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index 09ab265ad..7d06b5a03 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -10,12 +10,15 @@ readme = "README.md" # reth reth-primitives = { path = "../../crates/primitives" } reth-db = {path = "../../crates/storage/db", features = ["mdbx"]} -reth-provider = {path = "../../crates/storage/provider" } +# TODO: Temporary use of the test-utils feature +reth-provider = {path = "../../crates/storage/provider", features = ["test-utils"]} reth-stages = {path = "../../crates/stages"} reth-interfaces = {path = "../../crates/interfaces", features = ["test-utils"] } reth-transaction-pool = {path = "../../crates/transaction-pool"} reth-consensus = {path = "../../crates/consensus"} reth-rpc = {path = "../../crates/net/rpc"} +reth-network = {path = "../../crates/net/network"} +reth-downloaders = {path = "../../crates/net/downloaders"} # tracing tracing = "0.1" @@ -26,7 +29,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } shellexpand = "2.1" eyre = "0.6.8" -clap = { version = "4.0", features = ["derive"] } +clap = { version = "4.0", features = ["derive", "cargo"] } thiserror = "1.0" tokio = { version = "1.21", features = ["sync", "macros", "rt-multi-thread"] } serde = "1.0" diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index d53eecfeb..da6bd5efc 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -1,48 +1,179 @@ //! Main node command //! //! Starts the client +use clap::{crate_version, Parser}; +use reth_consensus::EthConsensus; +use reth_db::{ + cursor::DbCursorRO, + database::Database, + mdbx::{Env, WriteMap}, + tables, + transaction::{DbTx, DbTxMut}, +}; +use reth_downloaders::{bodies, headers}; +use reth_interfaces::consensus::ForkchoiceState; +use reth_network::{ + config::{mainnet_nodes, rng_secret_key}, + error::NetworkError, + NetworkConfig, NetworkHandle, NetworkManager, +}; +use reth_primitives::{hex_literal::hex, Bytes, Header, H256, U256}; +use reth_provider::{db_provider::ProviderImpl, BlockProvider, HeaderProvider}; +use reth_stages::stages::{bodies::BodyStage, headers::HeaderStage, senders::SendersStage}; +use std::{path::Path, str::FromStr, sync::Arc}; +use tracing::{debug, info}; -use clap::Parser; -use std::{path::Path, sync::Arc}; -use tracing::info; - -/// Execute Ethereum blockchain tests by specifying path to json files +/// Start the client #[derive(Debug, Parser)] pub struct Command { /// Path to database folder + // TODO: This should use dirs-next #[arg(long, default_value = "~/.reth/db")] db: String, } impl Command { /// Execute `node` command + // TODO: RPC, metrics pub async fn execute(&self) -> eyre::Result<()> { - info!("Rust Ethereum client"); - - info!("Initialize components:"); + info!("reth {} starting", crate_version!()); let path = shellexpand::full(&self.db)?.into_owned(); let expanded_db_path = Path::new(&path); - std::fs::create_dir_all(expanded_db_path)?; - let db = Arc::new(reth_db::mdbx::Env::::open( - expanded_db_path, - reth_db::mdbx::EnvKind::RW, - )?); - info!("DB opened"); + let db = Arc::new(init_db(expanded_db_path)?); + info!("Database ready"); - // let _p2p = (); - // let _consensus = (); - // let _rpc = (); + // TODO: Write genesis info + // TODO: Here we should parse and validate the chainspec and build consensus/networking + // stuff off of that + let consensus = Arc::new(EthConsensus::new(consensus_config())); + init_genesis(db.clone())?; - let mut pipeline = reth_stages::Pipeline::new(); + info!("Connecting to p2p"); + let network = start_network(network_config(db.clone())).await?; - // define all stages here + // TODO: Are most of these Arcs unnecessary? For example, fetch client is completely + // cloneable on its own + // TODO: Remove magic numbers + let fetch_client = Arc::new(network.fetch_client().await?); + let mut pipeline = reth_stages::Pipeline::new() + .push( + HeaderStage { + downloader: headers::linear::LinearDownloadBuilder::default() + .build(consensus.clone(), fetch_client.clone()), + consensus: consensus.clone(), + client: fetch_client.clone(), + commit_threshold: 100, + }, + false, + ) + .push( + BodyStage { + downloader: Arc::new(bodies::concurrent::ConcurrentDownloader::new( + fetch_client.clone(), + )), + consensus: consensus.clone(), + batch_size: 100, + }, + false, + ) + .push(SendersStage { batch_size: 1000, commit_threshold: 100 }, false); - // run pipeline - info!("Pipeline started:"); + // Run pipeline + info!("Starting pipeline"); + // TODO: This is a temporary measure to set the fork choice state, but this should be + // handled by the engine API + consensus.notify_fork_choice_state(ForkchoiceState { + // NOTE: This is block 1000 + head_block_hash: H256(hex!( + "5b4590a9905fa1c9cc273f32e6dc63b4c512f0ee14edc6fa41c26b416a7b5d58" + )), + safe_block_hash: H256(hex!( + "5b4590a9905fa1c9cc273f32e6dc63b4c512f0ee14edc6fa41c26b416a7b5d58" + )), + finalized_block_hash: H256(hex!( + "5b4590a9905fa1c9cc273f32e6dc63b4c512f0ee14edc6fa41c26b416a7b5d58" + )), + })?; pipeline.run(db.clone()).await?; - info!("Finishing"); + info!("Finishing up"); Ok(()) } } + +/// Opens up an existing database or creates a new one at the specified path. +fn init_db>(path: P) -> eyre::Result> { + std::fs::create_dir_all(path.as_ref())?; + let db = reth_db::mdbx::Env::::open( + path.as_ref(), + reth_db::mdbx::EnvKind::RW, + )?; + db.create_tables()?; + + Ok(db) +} + +/// Write the genesis block if it has not already been written +#[allow(clippy::field_reassign_with_default)] +fn init_genesis(db: Arc) -> Result<(), reth_db::Error> { + let tx = db.tx_mut()?; + let has_block = tx.cursor::()?.first()?.is_some(); + if has_block { + debug!("Genesis already written, skipping."); + return Ok(()) + } + + debug!("Writing genesis block."); + + // TODO: Should be based on a chain spec + let mut genesis = Header::default(); + genesis.gas_limit = 5000; + genesis.difficulty = U256::from(0x400000000usize); + genesis.nonce = 0x0000000000000042; + genesis.extra_data = + Bytes::from_str("11bbe8db4e347b4e8c937c1c8370e4b5ed33adb3db69cbdb7a38e1e50b1b82fa") + .unwrap() + .0; + genesis.state_root = + H256(hex!("d7f8974fb5ac78d9ac099b9ad5018bedc2ce0a72dad1827a1709da30580f0544")); + let genesis = genesis.seal(); + + // Insert genesis + tx.put::(genesis.number, genesis.hash())?; + tx.put::(genesis.num_hash().into(), genesis.as_ref().clone())?; + tx.put::(genesis.hash(), genesis.number)?; + tx.put::(genesis.num_hash().into(), 0)?; + tx.put::(genesis.num_hash().into(), genesis.difficulty.into())?; + tx.commit()?; + + // TODO: Account balances + Ok(()) +} + +// TODO: This should be based on some external config +fn network_config(db: Arc) -> NetworkConfig> { + NetworkConfig::builder(Arc::new(ProviderImpl::new(db)), rng_secret_key()) + .boot_nodes(mainnet_nodes()) + .build() +} + +// TODO: This should be based on some external config +fn consensus_config() -> reth_consensus::Config { + reth_consensus::Config::default() +} + +/// Starts the networking stack given a [NetworkConfig] and returns a handle to the network. +async fn start_network(config: NetworkConfig) -> Result +where + C: BlockProvider + HeaderProvider + 'static, +{ + let client = config.client.clone(); + let (handle, network, _txpool, eth) = + NetworkManager::builder(config).await?.request_handler(client).split_with_handle(); + + tokio::task::spawn(network); + //tokio::task::spawn(txpool); + tokio::task::spawn(eth); + Ok(handle) +} diff --git a/crates/consensus/src/consensus.rs b/crates/consensus/src/consensus.rs index 86369dbf8..604bb4f64 100644 --- a/crates/consensus/src/consensus.rs +++ b/crates/consensus/src/consensus.rs @@ -3,7 +3,7 @@ use crate::{verification, Config}; use reth_interfaces::consensus::{Consensus, Error, ForkchoiceState}; use reth_primitives::{BlockLocked, BlockNumber, SealedHeader, H256}; -use tokio::sync::watch; +use tokio::sync::{watch, watch::error::SendError}; /// Ethereum consensus pub struct EthConsensus { @@ -14,7 +14,7 @@ pub struct EthConsensus { } impl EthConsensus { - /// Create new object + /// Create a new instance of [EthConsensus] pub fn new(config: Config) -> Self { Self { channel: watch::channel(ForkchoiceState { @@ -25,6 +25,14 @@ impl EthConsensus { config, } } + + /// Notifies all listeners of the latest [ForkchoiceState]. + pub fn notify_fork_choice_state( + &self, + state: ForkchoiceState, + ) -> Result<(), SendError> { + self.channel.0.send(state) + } } impl Consensus for EthConsensus { diff --git a/crates/net/discv4/src/bootnodes.rs b/crates/net/discv4/src/bootnodes.rs index ca4b4cdf7..3d5a94ed9 100644 --- a/crates/net/discv4/src/bootnodes.rs +++ b/crates/net/discv4/src/bootnodes.rs @@ -26,18 +26,18 @@ pub static SEPOLIA_BOOTNODES : [&str; 2] = [ /// GOERLI bootnodes pub static GOERLI_BOOTNODES : [&str; 7] = [ -// Upstream bootnodes -"enode://011f758e6552d105183b1761c5e2dea0111bc20fd5f6422bc7f91e0fabbec9a6595caf6239b37feb773dddd3f87240d99d859431891e4a642cf2a0a9e6cbb98a@51.141.78.53:30303", -"enode://176b9417f511d05b6b2cf3e34b756cf0a7096b3094572a8f6ef4cdcb9d1f9d00683bf0f83347eebdf3b81c3521c2332086d9592802230bf528eaf606a1d9677b@13.93.54.137:30303", -"enode://46add44b9f13965f7b9875ac6b85f016f341012d84f975377573800a863526f4da19ae2c620ec73d11591fa9510e992ecc03ad0751f53cc02f7c7ed6d55c7291@94.237.54.114:30313", -"enode://b5948a2d3e9d486c4d75bf32713221c2bd6cf86463302339299bd227dc2e276cd5a1c7ca4f43a0e9122fe9af884efed563bd2a1fd28661f3b5f5ad7bf1de5949@18.218.250.66:30303", + // Upstream bootnodes + "enode://011f758e6552d105183b1761c5e2dea0111bc20fd5f6422bc7f91e0fabbec9a6595caf6239b37feb773dddd3f87240d99d859431891e4a642cf2a0a9e6cbb98a@51.141.78.53:30303", + "enode://176b9417f511d05b6b2cf3e34b756cf0a7096b3094572a8f6ef4cdcb9d1f9d00683bf0f83347eebdf3b81c3521c2332086d9592802230bf528eaf606a1d9677b@13.93.54.137:30303", + "enode://46add44b9f13965f7b9875ac6b85f016f341012d84f975377573800a863526f4da19ae2c620ec73d11591fa9510e992ecc03ad0751f53cc02f7c7ed6d55c7291@94.237.54.114:30313", + "enode://b5948a2d3e9d486c4d75bf32713221c2bd6cf86463302339299bd227dc2e276cd5a1c7ca4f43a0e9122fe9af884efed563bd2a1fd28661f3b5f5ad7bf1de5949@18.218.250.66:30303", -// Ethereum Foundation bootnode -"enode://a61215641fb8714a373c80edbfa0ea8878243193f57c96eeb44d0bc019ef295abd4e044fd619bfc4c59731a73fb79afe84e9ab6da0c743ceb479cbb6d263fa91@3.11.147.67:30303", + // Ethereum Foundation bootnode + "enode://a61215641fb8714a373c80edbfa0ea8878243193f57c96eeb44d0bc019ef295abd4e044fd619bfc4c59731a73fb79afe84e9ab6da0c743ceb479cbb6d263fa91@3.11.147.67:30303", -// Goerli Initiative bootnodes -"enode://d4f764a48ec2a8ecf883735776fdefe0a3949eb0ca476bd7bc8d0954a9defe8fea15ae5da7d40b5d2d59ce9524a99daedadf6da6283fca492cc80b53689fb3b3@46.4.99.122:32109", -"enode://d2b720352e8216c9efc470091aa91ddafc53e222b32780f505c817ceef69e01d5b0b0797b69db254c586f493872352f5a022b4d8479a00fc92ec55f9ad46a27e@88.99.70.182:30303", + // Goerli Initiative bootnodes + "enode://d4f764a48ec2a8ecf883735776fdefe0a3949eb0ca476bd7bc8d0954a9defe8fea15ae5da7d40b5d2d59ce9524a99daedadf6da6283fca492cc80b53689fb3b3@46.4.99.122:32109", + "enode://d2b720352e8216c9efc470091aa91ddafc53e222b32780f505c817ceef69e01d5b0b0797b69db254c586f493872352f5a022b4d8479a00fc92ec55f9ad46a27e@88.99.70.182:30303", ]; /// Returns parsed mainnet nodes diff --git a/crates/net/downloaders/src/headers/linear.rs b/crates/net/downloaders/src/headers/linear.rs index 7c99bc9fe..3f1f4556b 100644 --- a/crates/net/downloaders/src/headers/linear.rs +++ b/crates/net/downloaders/src/headers/linear.rs @@ -218,11 +218,13 @@ where /// Validate whether the header is valid in relation to it's parent /// /// Returns Ok(false) if the + #[allow(clippy::result_large_err)] fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> Result<(), DownloadError> { validate_header_download(&self.consensus, header, parent)?; Ok(()) } + #[allow(clippy::result_large_err)] fn process_header_response( &mut self, response: PeerRequestResult, diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index a4e1d8f30..c674783c6 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -186,7 +186,7 @@ impl NetworkConfigBuilder { self } - /// set a custom peer config for how peers are handled + /// Set a custom peer config for how peers are handled pub fn peer_config(mut self, config: PeersConfig) -> Self { self.peers_config = Some(config); self diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 18a9d9e33..3b34104dd 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -95,6 +95,7 @@ impl Stage, _>>() { Ok(res) => { // Perform basic response validation @@ -163,6 +164,7 @@ impl HeaderStage { loop { let _ = state_rcv.changed().await; let forkchoice = state_rcv.borrow(); + debug!(?forkchoice, "Received fork choice state"); if !forkchoice.head_block_hash.is_zero() && forkchoice.head_block_hash != *head { return forkchoice.clone() } diff --git a/crates/storage/provider/src/db_provider.rs b/crates/storage/provider/src/db_provider.rs index 6e838ffc6..3a64b73ff 100644 --- a/crates/storage/provider/src/db_provider.rs +++ b/crates/storage/provider/src/db_provider.rs @@ -12,7 +12,8 @@ pub use storage::{ use reth_db::database::Database; -/// Provider +/// A provider that fetches data from a database. +// TODO: ProviderImpl is a bad name pub struct ProviderImpl { /// Database db: Arc, diff --git a/crates/storage/provider/src/db_provider/block.rs b/crates/storage/provider/src/db_provider/block.rs index 1de611e37..6f93a8e69 100644 --- a/crates/storage/provider/src/db_provider/block.rs +++ b/crates/storage/provider/src/db_provider/block.rs @@ -1,7 +1,7 @@ -use crate::{HeaderProvider, ProviderImpl}; +use crate::{BlockProvider, ChainInfo, HeaderProvider, ProviderImpl}; use reth_db::{database::Database, tables, transaction::DbTx}; use reth_interfaces::Result; -use reth_primitives::{BlockNumber, Header}; +use reth_primitives::{rpc::BlockId, Block, BlockNumber, Header, H256, U256}; impl HeaderProvider for ProviderImpl { fn header(&self, block_hash: &reth_primitives::BlockHash) -> Result> { @@ -16,3 +16,30 @@ impl HeaderProvider for ProviderImpl { } } } + +impl BlockProvider for ProviderImpl { + fn chain_info(&self) -> Result { + Ok(ChainInfo { + best_hash: Default::default(), + best_number: 0, + last_finalized: None, + safe_finalized: None, + }) + } + + fn block(&self, _id: BlockId) -> Result> { + // TODO + Ok(None) + } + + fn block_number(&self, hash: H256) -> Result> { + self.db.view(|tx| tx.get::(hash))?.map_err(Into::into) + } + + fn block_hash(&self, number: U256) -> Result> { + // TODO: This unwrap is potentially unsafe + self.db + .view(|tx| tx.get::(number.try_into().unwrap()))? + .map_err(Into::into) + } +}