From d216081b5828ed99d7a255e5ef8bbea2481112b5 Mon Sep 17 00:00:00 2001 From: Bjerg Date: Tue, 14 Feb 2023 17:10:50 +0100 Subject: [PATCH] feat: add `Finish` stage (#1279) --- bin/reth/src/args/network_args.rs | 25 ++- bin/reth/src/chain/import.rs | 34 ++-- bin/reth/src/node/mod.rs | 79 +++++--- bin/reth/src/p2p/mod.rs | 14 +- bin/reth/src/stage/mod.rs | 20 +- crates/interfaces/src/p2p/headers/client.rs | 9 + crates/interfaces/src/test_utils/headers.rs | 23 ++- crates/net/network/src/config.rs | 13 +- crates/primitives/src/chain/info.rs | 4 +- crates/staged-sync/src/config.rs | 31 +-- crates/stages/src/lib.rs | 6 +- crates/stages/src/sets.rs | 38 +++- crates/stages/src/stages/finish.rs | 191 ++++++++++++++++++ .../src/stages/index_account_history.rs | 3 +- .../src/stages/index_storage_history.rs | 3 +- crates/stages/src/stages/mod.rs | 3 + crates/stages/src/stages/sender_recovery.rs | 3 +- crates/stages/src/stages/total_difficulty.rs | 3 +- crates/stages/src/stages/tx_lookup.rs | 3 +- crates/storage/provider/src/providers/mod.rs | 28 ++- crates/storage/provider/src/transaction.rs | 15 +- 21 files changed, 414 insertions(+), 134 deletions(-) create mode 100644 crates/stages/src/stages/finish.rs diff --git a/bin/reth/src/args/network_args.rs b/bin/reth/src/args/network_args.rs index aed1adb3b..ee63b7329 100644 --- a/bin/reth/src/args/network_args.rs +++ b/bin/reth/src/args/network_args.rs @@ -2,7 +2,11 @@ use crate::dirs::{KnownPeersPath, PlatformPath}; use clap::Args; -use reth_primitives::NodeRecord; +use reth_discv4::bootnodes::mainnet_nodes; +use reth_net_nat::NatResolver; +use reth_network::NetworkConfigBuilder; +use reth_primitives::{ChainSpec, NodeRecord}; +use reth_staged_sync::Config; use std::path::PathBuf; /// Parameters for configuring the network more granularity via CLI @@ -22,7 +26,7 @@ pub struct NetworkArgs { #[arg(long)] pub trusted_only: bool, - /// Nodes to bootstrap network discovery. + /// Bootnodes to connect to initially. /// /// Will fall back to a network-specific default if not specified. #[arg(long, value_delimiter = ',')] @@ -37,6 +41,23 @@ pub struct NetworkArgs { /// Do not persist peers. Cannot be used with --peers-file #[arg(long, verbatim_doc_comment, conflicts_with = "peers_file")] pub no_persist_peers: bool, + + /// NAT resolution method. + #[arg(long, default_value = "any")] + pub nat: NatResolver, +} + +impl NetworkArgs { + /// Build a [`NetworkConfigBuilder`] from a [`Config`] and a [`ChainSpec`], in addition to the + /// values in this option struct. + pub fn network_config(&self, config: &Config, chain_spec: ChainSpec) -> NetworkConfigBuilder { + let peers_file = (!self.no_persist_peers).then_some(&self.peers_file); + config + .network_config(self.nat, peers_file.map(|f| f.as_ref().to_path_buf())) + .boot_nodes(self.bootnodes.clone().unwrap_or_else(mainnet_nodes)) + .chain_spec(chain_spec) + .set_discovery(self.disable_discovery) + } } // === impl NetworkArgs === diff --git a/bin/reth/src/chain/import.rs b/bin/reth/src/chain/import.rs index ef1f9692b..23f4bd85d 100644 --- a/bin/reth/src/chain/import.rs +++ b/bin/reth/src/chain/import.rs @@ -13,6 +13,7 @@ use reth_downloaders::{ }; use reth_interfaces::{ consensus::{Consensus, ForkchoiceState}, + p2p::headers::client::NoopStatusUpdater, sync::SyncStateUpdater, }; use reth_primitives::ChainSpec; @@ -142,22 +143,23 @@ impl ImportCommand { let mut pipeline = Pipeline::builder() .with_sync_state_updater(file_client) .add_stages( - OnlineStages::new(consensus.clone(), header_downloader, body_downloader).set( - TotalDifficultyStage { - chain_spec: self.chain.clone(), - commit_threshold: config.stages.total_difficulty.commit_threshold, - }, - ), - ) - .add_stages( - OfflineStages::default() - .set(SenderRecoveryStage { - commit_threshold: config.stages.sender_recovery.commit_threshold, - }) - .set(ExecutionStage { - chain_spec: self.chain.clone(), - commit_threshold: config.stages.execution.commit_threshold, - }), + DefaultStages::new( + consensus.clone(), + header_downloader, + body_downloader, + NoopStatusUpdater::default(), + ) + .set(TotalDifficultyStage { + chain_spec: self.chain.clone(), + commit_threshold: config.stages.total_difficulty.commit_threshold, + }) + .set(SenderRecoveryStage { + commit_threshold: config.stages.sender_recovery.commit_threshold, + }) + .set(ExecutionStage { + chain_spec: self.chain.clone(), + commit_threshold: config.stages.execution.commit_threshold, + }), ) .with_max_block(0) .build(); diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 5c5296773..c3dc6a689 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -12,22 +12,29 @@ use eyre::Context; use fdlimit::raise_fd_limit; use futures::{pin_mut, stream::select as stream_select, Stream, StreamExt}; use reth_consensus::beacon::BeaconConsensus; -use reth_db::mdbx::{Env, WriteMap}; +use reth_db::{ + database::Database, + mdbx::{Env, WriteMap}, + tables, + transaction::DbTx, +}; use reth_downloaders::{ bodies::bodies::BodiesDownloaderBuilder, headers::reverse_headers::ReverseHeadersDownloaderBuilder, }; use reth_interfaces::{ consensus::{Consensus, ForkchoiceState}, - p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader}, + p2p::{ + bodies::downloader::BodyDownloader, + headers::{client::StatusUpdater, downloader::HeaderDownloader}, + }, sync::SyncStateUpdater, }; -use reth_net_nat::NatResolver; use reth_network::{ error::NetworkError, NetworkConfig, NetworkEvent, NetworkHandle, NetworkManager, }; use reth_network_api::NetworkInfo; -use reth_primitives::{BlockNumber, ChainSpec, H256}; +use reth_primitives::{BlockNumber, ChainSpec, Head, H256}; use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase}; use reth_rpc_builder::{RethRpcModule, RpcServerConfig, TransportRpcModuleConfig}; use reth_staged_sync::{ @@ -40,7 +47,7 @@ use reth_staged_sync::{ }; use reth_stages::{ prelude::*, - stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage}, + stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage, FINISH}, }; use reth_tasks::TaskExecutor; use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; @@ -89,9 +96,6 @@ pub struct Command { #[clap(flatten)] network: NetworkArgs, - #[arg(long, default_value = "any")] - nat: NatResolver, - /// Set the chain tip manually for testing purposes. /// /// NOTE: This is a temporary flag @@ -135,8 +139,9 @@ impl Command { self.init_trusted_nodes(&mut config); info!(target: "reth::cli", "Connecting to P2P network"); - let netconf = self.load_network_config(&config, Arc::clone(&db), ctx.task_executor.clone()); - let network = self.start_network(netconf, &ctx.task_executor, ()).await?; + let network_config = + self.load_network_config(&config, Arc::clone(&db), ctx.task_executor.clone()); + let network = self.start_network(network_config, &ctx.task_executor, ()).await?; info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); // TODO: Use the resolved secret to spawn the Engine API server @@ -277,22 +282,42 @@ impl Command { Ok(handle) } + fn fetch_head(&self, db: Arc>) -> Result { + db.view(|tx| { + let head = FINISH.get_progress(tx)?.unwrap_or_default(); + let header = tx + .get::(head)? + .expect("the header for the latest block is missing, database is corrupt"); + let total_difficulty = tx.get::(head)?.expect( + "the total difficulty for the latest block is missing, database is corrupt", + ); + let hash = tx + .get::(head)? + .expect("the hash for the latest block is missing, database is corrupt"); + Ok::(Head { + number: head, + hash, + difficulty: header.difficulty, + total_difficulty: total_difficulty.into(), + timestamp: header.timestamp, + }) + })? + .map_err(Into::into) + } + fn load_network_config( &self, config: &Config, db: Arc>, executor: TaskExecutor, ) -> NetworkConfig>>> { - let peers_file = self.network.persistent_peers_file(); - config.network_config( - db, - self.chain.clone(), - self.network.disable_discovery, - self.network.bootnodes.clone(), - self.nat, - peers_file, - Some(executor), - ) + let head = self.fetch_head(Arc::clone(&db)).expect("the head block is missing"); + + self.network + .network_config(config, self.chain.clone()) + .executor(Some(executor)) + .set_head(head) + .build(Arc::new(ShareableDatabase::new(db))) } async fn build_pipeline( @@ -306,7 +331,7 @@ impl Command { where H: HeaderDownloader + 'static, B: BodyDownloader + 'static, - U: SyncStateUpdater, + U: SyncStateUpdater + StatusUpdater + Clone + 'static, { let stage_conf = &config.stages; @@ -318,17 +343,13 @@ impl Command { } let pipeline = builder - .with_sync_state_updater(updater) + .with_sync_state_updater(updater.clone()) .add_stages( - OnlineStages::new(consensus.clone(), header_downloader, body_downloader).set( - TotalDifficultyStage { + DefaultStages::new(consensus.clone(), header_downloader, body_downloader, updater) + .set(TotalDifficultyStage { chain_spec: self.chain.clone(), commit_threshold: stage_conf.total_difficulty.commit_threshold, - }, - ), - ) - .add_stages( - OfflineStages::default() + }) .set(SenderRecoveryStage { commit_threshold: stage_conf.sender_recovery.commit_threshold, }) diff --git a/bin/reth/src/p2p/mod.rs b/bin/reth/src/p2p/mod.rs index 8c34b54eb..210531423 100644 --- a/bin/reth/src/p2p/mod.rs +++ b/bin/reth/src/p2p/mod.rs @@ -10,6 +10,7 @@ use reth_interfaces::p2p::{ }; use reth_network::FetchClient; use reth_primitives::{BlockHashOrNumber, ChainSpec, NodeRecord, SealedHeader}; +use reth_provider::ShareableDatabase; use reth_staged_sync::{ utils::{chainspec::chain_spec_value_parser, hash_or_num_value_parser}, Config, @@ -98,15 +99,10 @@ impl Command { config.peers.connect_trusted_nodes_only = self.trusted_only; let network = config - .network_config( - noop_db, - self.chain.clone(), - self.disable_discovery, - None, - self.nat, - None, - None, - ) + .network_config(self.nat, None) + .set_discovery(self.disable_discovery) + .chain_spec(self.chain.clone()) + .build(Arc::new(ShareableDatabase::new(noop_db))) .start_network() .await?; diff --git a/bin/reth/src/stage/mod.rs b/bin/reth/src/stage/mod.rs index bd18069fd..019d001c3 100644 --- a/bin/reth/src/stage/mod.rs +++ b/bin/reth/src/stage/mod.rs @@ -9,9 +9,8 @@ use crate::{ use clap::{Parser, ValueEnum}; use reth_consensus::beacon::BeaconConsensus; use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; -use reth_net_nat::NatResolver; use reth_primitives::ChainSpec; -use reth_provider::Transaction; +use reth_provider::{ShareableDatabase, Transaction}; use reth_staged_sync::{ utils::{chainspec::chain_spec_value_parser, init::init_db}, Config, @@ -85,9 +84,6 @@ pub struct Command { #[clap(flatten)] network: NetworkArgs, - - #[arg(long, default_value = "any")] - nat: NatResolver, } #[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, ValueEnum)] @@ -137,16 +133,10 @@ impl Command { }); } - let network = config - .network_config( - db.clone(), - self.chain.clone(), - self.network.disable_discovery, - None, - self.nat, - None, - None, - ) + let network = self + .network + .network_config(&config, self.chain.clone()) + .build(Arc::new(ShareableDatabase::new(db.clone()))) .start_network() .await?; let fetch_client = Arc::new(network.fetch_client().await?); diff --git a/crates/interfaces/src/p2p/headers/client.rs b/crates/interfaces/src/p2p/headers/client.rs index 52e29c80e..7832e6289 100644 --- a/crates/interfaces/src/p2p/headers/client.rs +++ b/crates/interfaces/src/p2p/headers/client.rs @@ -45,3 +45,12 @@ pub trait StatusUpdater: Send + Sync { /// Updates the status of the p2p node fn update_status(&self, head: Head); } + +/// A [StatusUpdater] implementation that does nothing. +#[derive(Debug, Clone, Default)] +#[non_exhaustive] +pub struct NoopStatusUpdater; + +impl StatusUpdater for NoopStatusUpdater { + fn update_status(&self, _: Head) {} +} diff --git a/crates/interfaces/src/test_utils/headers.rs b/crates/interfaces/src/test_utils/headers.rs index e0d929d3f..cefb5e47b 100644 --- a/crates/interfaces/src/test_utils/headers.rs +++ b/crates/interfaces/src/test_utils/headers.rs @@ -298,12 +298,27 @@ impl TestConsensus { } } -/// Nil status updater for testing -#[derive(Debug, Clone, Default)] -pub struct TestStatusUpdater; +/// Status updater for testing. +/// +/// [`TestStatusUpdater::new()`] creates a new [`TestStatusUpdater`] that is **not** shareable. This +/// struct wraps the sender side of a [`tokio::sync::watch`] channel. The receiving side of the +/// channel (which is shareable by cloning it) is also returned. +#[derive(Debug)] +pub struct TestStatusUpdater(tokio::sync::watch::Sender); + +impl TestStatusUpdater { + /// Create a new test status updater and a receiver to listen to status updates on. + pub fn new() -> (Self, tokio::sync::watch::Receiver) { + let (tx, rx) = tokio::sync::watch::channel(Head::default()); + + (Self(tx), rx) + } +} impl StatusUpdater for TestStatusUpdater { - fn update_status(&self, _: Head) {} + fn update_status(&self, head: Head) { + self.0.send(head).expect("could not send status update"); + } } #[async_trait::async_trait] diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index d1989d03f..ccd465a8e 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -185,6 +185,16 @@ impl NetworkConfigBuilder { self } + /// Sets the highest synced block. + /// + /// This is used to construct the appropriate [`ForkFilter`] and [`Status`] message. + /// + /// If not set, this defaults to the genesis specified by the current chain specification. + pub fn set_head(mut self, head: Head) -> Self { + self.head = Some(head); + self + } + /// Sets the `HelloMessage` to send when connecting to peers. /// /// ``` @@ -265,6 +275,7 @@ impl NetworkConfigBuilder { } /// Sets the discovery service off on true. + // TODO(onbjerg): This name does not imply `true` = disable pub fn set_discovery(mut self, disable_discovery: bool) -> Self { if disable_discovery { self.disable_discovery(); @@ -309,7 +320,7 @@ impl NetworkConfigBuilder { let head = head.unwrap_or(Head { hash: chain_spec.genesis_hash(), number: 0, - timestamp: 0, + timestamp: chain_spec.genesis.timestamp, difficulty: chain_spec.genesis.difficulty, total_difficulty: chain_spec.genesis.difficulty, }); diff --git a/crates/primitives/src/chain/info.rs b/crates/primitives/src/chain/info.rs index 055d044b6..068d8dc6d 100644 --- a/crates/primitives/src/chain/info.rs +++ b/crates/primitives/src/chain/info.rs @@ -3,9 +3,9 @@ use crate::{BlockNumber, H256}; /// Current status of the blockchain's head. #[derive(Debug, Eq, PartialEq)] pub struct ChainInfo { - /// Best block hash. + /// The block hash of the highest fully synced block. pub best_hash: H256, - /// Best block number. + /// The block number of the highest fully synced block. pub best_number: BlockNumber, /// Last block that was finalized. pub last_finalized: Option, diff --git a/crates/staged-sync/src/config.rs b/crates/staged-sync/src/config.rs index 5928cae39..eb72b2d9f 100644 --- a/crates/staged-sync/src/config.rs +++ b/crates/staged-sync/src/config.rs @@ -1,20 +1,12 @@ //! Configuration files. -use std::{path::PathBuf, sync::Arc}; - -use reth_db::database::Database; use reth_discv4::Discv4Config; use reth_downloaders::{ bodies::bodies::BodiesDownloaderBuilder, headers::reverse_headers::ReverseHeadersDownloaderBuilder, }; -use reth_network::{ - config::{mainnet_nodes, rng_secret_key}, - NetworkConfig, NetworkConfigBuilder, PeersConfig, -}; -use reth_primitives::{ChainSpec, NodeRecord}; -use reth_provider::ShareableDatabase; -use reth_tasks::TaskExecutor; +use reth_network::{config::rng_secret_key, NetworkConfigBuilder, PeersConfig}; use serde::{Deserialize, Serialize}; +use std::path::PathBuf; /// Configuration for the reth node. #[derive(Debug, Clone, Default, Deserialize, PartialEq, Serialize)] @@ -29,17 +21,11 @@ pub struct Config { impl Config { /// Initializes network config from read data - #[allow(clippy::too_many_arguments)] - pub fn network_config( + pub fn network_config( &self, - db: DB, - chain_spec: ChainSpec, - disable_discovery: bool, - bootnodes: Option>, nat_resolution_method: reth_net_nat::NatResolver, peers_file: Option, - executor: Option, - ) -> NetworkConfig> { + ) -> NetworkConfigBuilder { let peer_config = self .peers .clone() @@ -47,14 +33,7 @@ impl Config { .unwrap_or_else(|_| self.peers.clone()); let discv4 = Discv4Config::builder().external_ip_resolver(Some(nat_resolution_method)).clone(); - NetworkConfigBuilder::new(rng_secret_key()) - .boot_nodes(bootnodes.unwrap_or_else(mainnet_nodes)) - .peer_config(peer_config) - .discovery(discv4) - .chain_spec(chain_spec) - .executor(executor) - .set_discovery(disable_discovery) - .build(Arc::new(ShareableDatabase::new(db))) + NetworkConfigBuilder::new(rng_secret_key()).peer_config(peer_config).discovery(discv4) } } diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index df4fefb12..139bc314d 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -26,7 +26,7 @@ //! # use reth_downloaders::headers::reverse_headers::ReverseHeadersDownloaderBuilder; //! # use reth_interfaces::consensus::Consensus; //! # use reth_interfaces::sync::NoopSyncStateUpdate; -//! # use reth_interfaces::test_utils::{TestBodiesClient, TestConsensus, TestHeadersClient}; +//! # use reth_interfaces::test_utils::{TestBodiesClient, TestConsensus, TestHeadersClient, TestStatusUpdater}; //! # use reth_primitives::PeerId; //! # use reth_stages::Pipeline; //! # use reth_stages::sets::DefaultStages; @@ -40,14 +40,14 @@ //! # consensus.clone(), //! # create_test_rw_db() //! # ); +//! # let (status_updater, _) = TestStatusUpdater::new(); //! // Create a pipeline that can fully sync //! # let pipeline: Pipeline, NoopSyncStateUpdate> = //! Pipeline::builder() //! .add_stages( -//! DefaultStages::new(consensus, headers_downloader, bodies_downloader) +//! DefaultStages::new(consensus, headers_downloader, bodies_downloader, status_updater) //! ) //! .build(); -//! # //! ``` mod error; mod id; diff --git a/crates/stages/src/sets.rs b/crates/stages/src/sets.rs index ea4d7d55b..6b2d70031 100644 --- a/crates/stages/src/sets.rs +++ b/crates/stages/src/sets.rs @@ -31,16 +31,19 @@ //! ``` use crate::{ stages::{ - AccountHashingStage, BodyStage, ExecutionStage, HeaderStage, IndexAccountHistoryStage, - IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, - TotalDifficultyStage, TransactionLookupStage, + AccountHashingStage, BodyStage, ExecutionStage, FinishStage, HeaderStage, + IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, + StorageHashingStage, TotalDifficultyStage, TransactionLookupStage, }, StageSet, StageSetBuilder, }; use reth_db::database::Database; use reth_interfaces::{ consensus::Consensus, - p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader}, + p2p::{ + bodies::downloader::BodyDownloader, + headers::{client::StatusUpdater, downloader::HeaderDownloader}, + }, }; use reth_primitives::ChainSpec; use std::sync::Arc; @@ -51,27 +54,42 @@ use std::sync::Arc; /// /// - [`OnlineStages`] /// - [`OfflineStages`] +/// - [`FinishStage`] #[derive(Debug)] -pub struct DefaultStages { +pub struct DefaultStages { /// Configuration for the online stages online: OnlineStages, + /// Configuration for the [`FinishStage`] stage. + status_updater: S, } -impl DefaultStages { +impl DefaultStages { /// Create a new set of default stages with default values. - pub fn new(consensus: Arc, header_downloader: H, body_downloader: B) -> Self { - Self { online: OnlineStages::new(consensus, header_downloader, body_downloader) } + pub fn new( + consensus: Arc, + header_downloader: H, + body_downloader: B, + status_updater: S, + ) -> Self { + Self { + online: OnlineStages::new(consensus, header_downloader, body_downloader), + status_updater, + } } } -impl StageSet for DefaultStages +impl StageSet for DefaultStages where DB: Database, H: HeaderDownloader + 'static, B: BodyDownloader + 'static, + S: StatusUpdater + 'static, { fn builder(self) -> StageSetBuilder { - self.online.builder().add_set(OfflineStages) + self.online + .builder() + .add_set(OfflineStages) + .add_stage(FinishStage::new(self.status_updater)) } } diff --git a/crates/stages/src/stages/finish.rs b/crates/stages/src/stages/finish.rs new file mode 100644 index 000000000..fd3df6498 --- /dev/null +++ b/crates/stages/src/stages/finish.rs @@ -0,0 +1,191 @@ +use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; +use reth_db::database::Database; +use reth_interfaces::p2p::headers::client::StatusUpdater; +use reth_primitives::{BlockNumber, Head}; +use reth_provider::Transaction; + +/// The [`StageId`] of the finish stage. +pub const FINISH: StageId = StageId("Finish"); + +/// The finish stage. +/// +/// This stage does not write anything; it's checkpoint is used to denote the highest fully synced +/// block, which is communicated to the P2P networking component, as well as the RPC component. +/// +/// The highest block is communicated via a [StatusUpdater] when the stage executes or unwinds. +/// +/// When the node starts up, the checkpoint for this stage should be used send the initial status +/// update to the relevant components. Assuming the genesis block is written before this, it is safe +/// to default the stage checkpoint to block number 0 on startup. +#[derive(Default, Debug, Clone)] +pub struct FinishStage { + updater: S, +} + +impl FinishStage +where + S: StatusUpdater, +{ + /// Create a new [FinishStage] with the given [StatusUpdater]. + pub fn new(updater: S) -> Self { + Self { updater } + } + + /// Construct a [Head] from `block_number`. + fn fetch_head( + &self, + tx: &mut Transaction<'_, DB>, + block_number: BlockNumber, + ) -> Result { + let header = tx.get_header(block_number)?; + let hash = tx.get_block_hash(block_number)?; + let total_difficulty = tx.get_td(block_number)?; + + Ok(Head { + number: block_number, + hash, + total_difficulty, + difficulty: header.difficulty, + timestamp: header.timestamp, + }) + } +} + +#[async_trait::async_trait] +impl Stage for FinishStage +where + S: StatusUpdater, +{ + fn id(&self) -> StageId { + FINISH + } + + async fn execute( + &mut self, + tx: &mut Transaction<'_, DB>, + input: ExecInput, + ) -> Result { + self.updater.update_status(self.fetch_head(tx, input.previous_stage_progress())?); + Ok(ExecOutput { done: true, stage_progress: input.previous_stage_progress() }) + } + + async fn unwind( + &mut self, + tx: &mut Transaction<'_, DB>, + input: UnwindInput, + ) -> Result { + self.updater.update_status(self.fetch_head(tx, input.unwind_to)?); + Ok(UnwindOutput { stage_progress: input.unwind_to }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::{ + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, + TestTransaction, UnwindStageTestRunner, + }; + use reth_interfaces::test_utils::{ + generators::{random_header, random_header_range}, + TestStatusUpdater, + }; + use reth_primitives::SealedHeader; + use std::sync::Mutex; + + stage_test_suite_ext!(FinishTestRunner, finish); + + struct FinishTestRunner { + tx: TestTransaction, + status: Mutex>>, + } + + impl FinishTestRunner { + /// Gets the current status. + /// + /// # Panics + /// + /// Panics if multiple threads try to read the status at the same time, or if no status + /// receiver has been set. + fn current_status(&self) -> Head { + let status_lock = self.status.try_lock().expect("competing for status lock"); + let status = status_lock.as_ref().expect("no status receiver set").borrow(); + status.clone() + } + } + + impl Default for FinishTestRunner { + fn default() -> Self { + FinishTestRunner { tx: TestTransaction::default(), status: Mutex::new(None) } + } + } + + impl StageTestRunner for FinishTestRunner { + type S = FinishStage; + + fn tx(&self) -> &TestTransaction { + &self.tx + } + + fn stage(&self) -> Self::S { + let (status_updater, status) = TestStatusUpdater::new(); + let mut status_container = self.status.try_lock().expect("competing for status lock"); + *status_container = Some(status); + FinishStage::new(status_updater) + } + } + + impl ExecuteStageTestRunner for FinishTestRunner { + type Seed = Vec; + + fn seed_execution(&mut self, input: ExecInput) -> Result { + let start = input.stage_progress.unwrap_or_default(); + let head = random_header(start, None); + self.tx.insert_headers_with_td(std::iter::once(&head))?; + + // use previous progress as seed size + let end = input.previous_stage.map(|(_, num)| num).unwrap_or_default() + 1; + + if start + 1 >= end { + return Ok(Vec::default()) + } + + let mut headers = random_header_range(start + 1..end, head.hash()); + self.tx.insert_headers_with_td(headers.iter())?; + headers.insert(0, head); + Ok(headers) + } + + fn validate_execution( + &self, + input: ExecInput, + output: Option, + ) -> Result<(), TestRunnerError> { + if let Some(output) = output { + assert!(output.done, "stage should always be done"); + assert_eq!( + output.stage_progress, + input.previous_stage_progress(), + "stage progress should always match progress of previous stage" + ); + assert_eq!( + self.current_status().number, + output.stage_progress, + "incorrect block number in status update" + ); + } + Ok(()) + } + } + + impl UnwindStageTestRunner for FinishTestRunner { + fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> { + assert_eq!( + self.current_status().number, + input.unwind_to, + "incorrect block number in status update" + ); + Ok(()) + } + } +} diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index 0f91fc3eb..5ff888019 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -14,7 +14,8 @@ use reth_primitives::{Address, TransitionId}; use std::{collections::BTreeMap, fmt::Debug}; use tracing::*; -const INDEX_ACCOUNT_HISTORY: StageId = StageId("IndexAccountHistory"); +/// The [`StageId`] of the account history indexing stage. +pub const INDEX_ACCOUNT_HISTORY: StageId = StageId("IndexAccountHistory"); /// Stage is indexing history the account changesets generated in /// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index c938ac64c..576ff0bd5 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -13,7 +13,8 @@ use reth_provider::Transaction; use std::{collections::BTreeMap, fmt::Debug}; use tracing::*; -const INDEX_STORAGE_HISTORY: StageId = StageId("IndexStorageHistory"); +/// The [`StageId`] of the storage history indexing stage. +pub const INDEX_STORAGE_HISTORY: StageId = StageId("IndexStorageHistory"); /// Stage is indexing history the account changesets generated in /// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information diff --git a/crates/stages/src/stages/mod.rs b/crates/stages/src/stages/mod.rs index 4b344a8db..98a40c7f3 100644 --- a/crates/stages/src/stages/mod.rs +++ b/crates/stages/src/stages/mod.rs @@ -2,6 +2,8 @@ mod bodies; /// The execution stage that generates state diff. mod execution; +/// The finish stage +mod finish; /// Account hashing stage. mod hashing_account; /// Storage hashing stage. @@ -25,6 +27,7 @@ mod tx_lookup; pub use bodies::*; pub use execution::*; +pub use finish::*; pub use hashing_account::*; pub use hashing_storage::*; pub use headers::*; diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 373e97862..e33268c0b 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -19,7 +19,8 @@ use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::*; -const SENDER_RECOVERY: StageId = StageId("SenderRecovery"); +/// The [`StageId`] of the sender recovery stage. +pub const SENDER_RECOVERY: StageId = StageId("SenderRecovery"); /// The sender recovery stage iterates over existing transactions, /// recovers the transaction signer and stores them diff --git a/crates/stages/src/stages/total_difficulty.rs b/crates/stages/src/stages/total_difficulty.rs index f05392886..d8bee3af8 100644 --- a/crates/stages/src/stages/total_difficulty.rs +++ b/crates/stages/src/stages/total_difficulty.rs @@ -13,7 +13,8 @@ use reth_primitives::{ChainSpec, Hardfork, EMPTY_OMMER_ROOT, MAINNET, U256}; use reth_provider::Transaction; use tracing::*; -const TOTAL_DIFFICULTY: StageId = StageId("TotalDifficulty"); +/// The [`StageId`] of the total difficulty stage. +pub const TOTAL_DIFFICULTY: StageId = StageId("TotalDifficulty"); /// The total difficulty stage. /// diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 978a959b9..7921dbded 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -11,7 +11,8 @@ use reth_db::{ use reth_provider::Transaction; use tracing::*; -const TRANSACTION_LOOKUP: StageId = StageId("TransactionLookup"); +/// The [`StageId`] of the transaction lookup stage. +pub const TRANSACTION_LOOKUP: StageId = StageId("TransactionLookup"); /// The transaction lookup stage. /// diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index e78413576..fb81b15b2 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -76,12 +76,13 @@ impl BlockHashProvider for ShareableDatabase { impl BlockProvider for ShareableDatabase { fn chain_info(&self) -> Result { - Ok(ChainInfo { - best_hash: Default::default(), - best_number: 0, - last_finalized: None, - safe_finalized: None, - }) + let best_number = self + .db + .view(|tx| tx.get::("Finish".as_bytes().to_vec()))? + .map_err(Into::::into)? + .unwrap_or_default(); + let best_hash = self.block_hash(U256::from(best_number))?.unwrap_or_default(); + Ok(ChainInfo { best_hash, best_number, last_finalized: None, safe_finalized: None }) } fn block(&self, _id: BlockId) -> Result> { @@ -130,10 +131,11 @@ impl StateProviderFactory for ShareableDatabase { #[cfg(test)] mod tests { - use crate::StateProviderFactory; + use crate::{BlockProvider, StateProviderFactory}; use super::ShareableDatabase; use reth_db::mdbx::{test_utils::create_test_db, EnvKind, WriteMap}; + use reth_primitives::H256; #[test] fn common_history_provider() { @@ -141,4 +143,16 @@ mod tests { let provider = ShareableDatabase::new(db); let _ = provider.latest(); } + + #[test] + fn default_chain_info() { + let db = create_test_db::(EnvKind::RW); + let provider = ShareableDatabase::new(db); + + let chain_info = provider.chain_info().expect("should be ok"); + assert_eq!(chain_info.best_number, 0); + assert_eq!(chain_info.best_hash, H256::zero()); + assert_eq!(chain_info.last_finalized, None); + assert_eq!(chain_info.safe_finalized, None); + } } diff --git a/crates/storage/provider/src/transaction.rs b/crates/storage/provider/src/transaction.rs index 798b2c6fe..747121cff 100644 --- a/crates/storage/provider/src/transaction.rs +++ b/crates/storage/provider/src/transaction.rs @@ -8,7 +8,7 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, }; use reth_interfaces::{db::Error as DbError, provider::Error as ProviderError}; -use reth_primitives::{BlockHash, BlockNumber, Header, TransitionId, TxNumber}; +use reth_primitives::{BlockHash, BlockNumber, Header, TransitionId, TxNumber, U256}; use std::{ fmt::Debug, ops::{Deref, DerefMut}, @@ -100,10 +100,7 @@ where } /// Query [tables::CanonicalHeaders] table for block hash by block number - pub(crate) fn get_block_hash( - &self, - block_number: BlockNumber, - ) -> Result { + pub fn get_block_hash(&self, block_number: BlockNumber) -> Result { let hash = self .get::(block_number)? .ok_or(ProviderError::CanonicalHeader { block_number })?; @@ -150,6 +147,14 @@ where Ok(header) } + /// Get the total difficulty for a block. + pub fn get_td(&self, block: BlockNumber) -> Result { + let td = self + .get::(block)? + .ok_or(ProviderError::TotalDifficulty { number: block })?; + Ok(td.into()) + } + /// Unwind table by some number key #[inline] pub fn unwind_table_by_num(&self, num: u64) -> Result<(), DbError>