feat: add Finish stage (#1279)

This commit is contained in:
Bjerg
2023-02-14 17:10:50 +01:00
committed by GitHub
parent ce2354a05a
commit d216081b58
21 changed files with 414 additions and 134 deletions

View File

@ -2,7 +2,11 @@
use crate::dirs::{KnownPeersPath, PlatformPath}; use crate::dirs::{KnownPeersPath, PlatformPath};
use clap::Args; use clap::Args;
use reth_primitives::NodeRecord; use reth_discv4::bootnodes::mainnet_nodes;
use reth_net_nat::NatResolver;
use reth_network::NetworkConfigBuilder;
use reth_primitives::{ChainSpec, NodeRecord};
use reth_staged_sync::Config;
use std::path::PathBuf; use std::path::PathBuf;
/// Parameters for configuring the network more granularity via CLI /// Parameters for configuring the network more granularity via CLI
@ -22,7 +26,7 @@ pub struct NetworkArgs {
#[arg(long)] #[arg(long)]
pub trusted_only: bool, pub trusted_only: bool,
/// Nodes to bootstrap network discovery. /// Bootnodes to connect to initially.
/// ///
/// Will fall back to a network-specific default if not specified. /// Will fall back to a network-specific default if not specified.
#[arg(long, value_delimiter = ',')] #[arg(long, value_delimiter = ',')]
@ -37,6 +41,23 @@ pub struct NetworkArgs {
/// Do not persist peers. Cannot be used with --peers-file /// Do not persist peers. Cannot be used with --peers-file
#[arg(long, verbatim_doc_comment, conflicts_with = "peers_file")] #[arg(long, verbatim_doc_comment, conflicts_with = "peers_file")]
pub no_persist_peers: bool, pub no_persist_peers: bool,
/// NAT resolution method.
#[arg(long, default_value = "any")]
pub nat: NatResolver,
}
impl NetworkArgs {
/// Build a [`NetworkConfigBuilder`] from a [`Config`] and a [`ChainSpec`], in addition to the
/// values in this option struct.
pub fn network_config(&self, config: &Config, chain_spec: ChainSpec) -> NetworkConfigBuilder {
let peers_file = (!self.no_persist_peers).then_some(&self.peers_file);
config
.network_config(self.nat, peers_file.map(|f| f.as_ref().to_path_buf()))
.boot_nodes(self.bootnodes.clone().unwrap_or_else(mainnet_nodes))
.chain_spec(chain_spec)
.set_discovery(self.disable_discovery)
}
} }
// === impl NetworkArgs === // === impl NetworkArgs ===

View File

@ -13,6 +13,7 @@ use reth_downloaders::{
}; };
use reth_interfaces::{ use reth_interfaces::{
consensus::{Consensus, ForkchoiceState}, consensus::{Consensus, ForkchoiceState},
p2p::headers::client::NoopStatusUpdater,
sync::SyncStateUpdater, sync::SyncStateUpdater,
}; };
use reth_primitives::ChainSpec; use reth_primitives::ChainSpec;
@ -142,22 +143,23 @@ impl ImportCommand {
let mut pipeline = Pipeline::builder() let mut pipeline = Pipeline::builder()
.with_sync_state_updater(file_client) .with_sync_state_updater(file_client)
.add_stages( .add_stages(
OnlineStages::new(consensus.clone(), header_downloader, body_downloader).set( DefaultStages::new(
TotalDifficultyStage { consensus.clone(),
chain_spec: self.chain.clone(), header_downloader,
commit_threshold: config.stages.total_difficulty.commit_threshold, body_downloader,
}, NoopStatusUpdater::default(),
), )
) .set(TotalDifficultyStage {
.add_stages( chain_spec: self.chain.clone(),
OfflineStages::default() commit_threshold: config.stages.total_difficulty.commit_threshold,
.set(SenderRecoveryStage { })
commit_threshold: config.stages.sender_recovery.commit_threshold, .set(SenderRecoveryStage {
}) commit_threshold: config.stages.sender_recovery.commit_threshold,
.set(ExecutionStage { })
chain_spec: self.chain.clone(), .set(ExecutionStage {
commit_threshold: config.stages.execution.commit_threshold, chain_spec: self.chain.clone(),
}), commit_threshold: config.stages.execution.commit_threshold,
}),
) )
.with_max_block(0) .with_max_block(0)
.build(); .build();

View File

@ -12,22 +12,29 @@ use eyre::Context;
use fdlimit::raise_fd_limit; use fdlimit::raise_fd_limit;
use futures::{pin_mut, stream::select as stream_select, Stream, StreamExt}; use futures::{pin_mut, stream::select as stream_select, Stream, StreamExt};
use reth_consensus::beacon::BeaconConsensus; use reth_consensus::beacon::BeaconConsensus;
use reth_db::mdbx::{Env, WriteMap}; use reth_db::{
database::Database,
mdbx::{Env, WriteMap},
tables,
transaction::DbTx,
};
use reth_downloaders::{ use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder, bodies::bodies::BodiesDownloaderBuilder,
headers::reverse_headers::ReverseHeadersDownloaderBuilder, headers::reverse_headers::ReverseHeadersDownloaderBuilder,
}; };
use reth_interfaces::{ use reth_interfaces::{
consensus::{Consensus, ForkchoiceState}, consensus::{Consensus, ForkchoiceState},
p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader}, p2p::{
bodies::downloader::BodyDownloader,
headers::{client::StatusUpdater, downloader::HeaderDownloader},
},
sync::SyncStateUpdater, sync::SyncStateUpdater,
}; };
use reth_net_nat::NatResolver;
use reth_network::{ use reth_network::{
error::NetworkError, NetworkConfig, NetworkEvent, NetworkHandle, NetworkManager, error::NetworkError, NetworkConfig, NetworkEvent, NetworkHandle, NetworkManager,
}; };
use reth_network_api::NetworkInfo; use reth_network_api::NetworkInfo;
use reth_primitives::{BlockNumber, ChainSpec, H256}; use reth_primitives::{BlockNumber, ChainSpec, Head, H256};
use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase}; use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase};
use reth_rpc_builder::{RethRpcModule, RpcServerConfig, TransportRpcModuleConfig}; use reth_rpc_builder::{RethRpcModule, RpcServerConfig, TransportRpcModuleConfig};
use reth_staged_sync::{ use reth_staged_sync::{
@ -40,7 +47,7 @@ use reth_staged_sync::{
}; };
use reth_stages::{ use reth_stages::{
prelude::*, prelude::*,
stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage}, stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage, FINISH},
}; };
use reth_tasks::TaskExecutor; use reth_tasks::TaskExecutor;
use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
@ -89,9 +96,6 @@ pub struct Command {
#[clap(flatten)] #[clap(flatten)]
network: NetworkArgs, network: NetworkArgs,
#[arg(long, default_value = "any")]
nat: NatResolver,
/// Set the chain tip manually for testing purposes. /// Set the chain tip manually for testing purposes.
/// ///
/// NOTE: This is a temporary flag /// NOTE: This is a temporary flag
@ -135,8 +139,9 @@ impl Command {
self.init_trusted_nodes(&mut config); self.init_trusted_nodes(&mut config);
info!(target: "reth::cli", "Connecting to P2P network"); info!(target: "reth::cli", "Connecting to P2P network");
let netconf = self.load_network_config(&config, Arc::clone(&db), ctx.task_executor.clone()); let network_config =
let network = self.start_network(netconf, &ctx.task_executor, ()).await?; self.load_network_config(&config, Arc::clone(&db), ctx.task_executor.clone());
let network = self.start_network(network_config, &ctx.task_executor, ()).await?;
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
// TODO: Use the resolved secret to spawn the Engine API server // TODO: Use the resolved secret to spawn the Engine API server
@ -277,22 +282,42 @@ impl Command {
Ok(handle) Ok(handle)
} }
fn fetch_head(&self, db: Arc<Env<WriteMap>>) -> Result<Head, reth_interfaces::db::Error> {
db.view(|tx| {
let head = FINISH.get_progress(tx)?.unwrap_or_default();
let header = tx
.get::<tables::Headers>(head)?
.expect("the header for the latest block is missing, database is corrupt");
let total_difficulty = tx.get::<tables::HeaderTD>(head)?.expect(
"the total difficulty for the latest block is missing, database is corrupt",
);
let hash = tx
.get::<tables::CanonicalHeaders>(head)?
.expect("the hash for the latest block is missing, database is corrupt");
Ok::<Head, reth_interfaces::db::Error>(Head {
number: head,
hash,
difficulty: header.difficulty,
total_difficulty: total_difficulty.into(),
timestamp: header.timestamp,
})
})?
.map_err(Into::into)
}
fn load_network_config( fn load_network_config(
&self, &self,
config: &Config, config: &Config,
db: Arc<Env<WriteMap>>, db: Arc<Env<WriteMap>>,
executor: TaskExecutor, executor: TaskExecutor,
) -> NetworkConfig<ShareableDatabase<Arc<Env<WriteMap>>>> { ) -> NetworkConfig<ShareableDatabase<Arc<Env<WriteMap>>>> {
let peers_file = self.network.persistent_peers_file(); let head = self.fetch_head(Arc::clone(&db)).expect("the head block is missing");
config.network_config(
db, self.network
self.chain.clone(), .network_config(config, self.chain.clone())
self.network.disable_discovery, .executor(Some(executor))
self.network.bootnodes.clone(), .set_head(head)
self.nat, .build(Arc::new(ShareableDatabase::new(db)))
peers_file,
Some(executor),
)
} }
async fn build_pipeline<H, B, U>( async fn build_pipeline<H, B, U>(
@ -306,7 +331,7 @@ impl Command {
where where
H: HeaderDownloader + 'static, H: HeaderDownloader + 'static,
B: BodyDownloader + 'static, B: BodyDownloader + 'static,
U: SyncStateUpdater, U: SyncStateUpdater + StatusUpdater + Clone + 'static,
{ {
let stage_conf = &config.stages; let stage_conf = &config.stages;
@ -318,17 +343,13 @@ impl Command {
} }
let pipeline = builder let pipeline = builder
.with_sync_state_updater(updater) .with_sync_state_updater(updater.clone())
.add_stages( .add_stages(
OnlineStages::new(consensus.clone(), header_downloader, body_downloader).set( DefaultStages::new(consensus.clone(), header_downloader, body_downloader, updater)
TotalDifficultyStage { .set(TotalDifficultyStage {
chain_spec: self.chain.clone(), chain_spec: self.chain.clone(),
commit_threshold: stage_conf.total_difficulty.commit_threshold, commit_threshold: stage_conf.total_difficulty.commit_threshold,
}, })
),
)
.add_stages(
OfflineStages::default()
.set(SenderRecoveryStage { .set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold, commit_threshold: stage_conf.sender_recovery.commit_threshold,
}) })

View File

@ -10,6 +10,7 @@ use reth_interfaces::p2p::{
}; };
use reth_network::FetchClient; use reth_network::FetchClient;
use reth_primitives::{BlockHashOrNumber, ChainSpec, NodeRecord, SealedHeader}; use reth_primitives::{BlockHashOrNumber, ChainSpec, NodeRecord, SealedHeader};
use reth_provider::ShareableDatabase;
use reth_staged_sync::{ use reth_staged_sync::{
utils::{chainspec::chain_spec_value_parser, hash_or_num_value_parser}, utils::{chainspec::chain_spec_value_parser, hash_or_num_value_parser},
Config, Config,
@ -98,15 +99,10 @@ impl Command {
config.peers.connect_trusted_nodes_only = self.trusted_only; config.peers.connect_trusted_nodes_only = self.trusted_only;
let network = config let network = config
.network_config( .network_config(self.nat, None)
noop_db, .set_discovery(self.disable_discovery)
self.chain.clone(), .chain_spec(self.chain.clone())
self.disable_discovery, .build(Arc::new(ShareableDatabase::new(noop_db)))
None,
self.nat,
None,
None,
)
.start_network() .start_network()
.await?; .await?;

View File

@ -9,9 +9,8 @@ use crate::{
use clap::{Parser, ValueEnum}; use clap::{Parser, ValueEnum};
use reth_consensus::beacon::BeaconConsensus; use reth_consensus::beacon::BeaconConsensus;
use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
use reth_net_nat::NatResolver;
use reth_primitives::ChainSpec; use reth_primitives::ChainSpec;
use reth_provider::Transaction; use reth_provider::{ShareableDatabase, Transaction};
use reth_staged_sync::{ use reth_staged_sync::{
utils::{chainspec::chain_spec_value_parser, init::init_db}, utils::{chainspec::chain_spec_value_parser, init::init_db},
Config, Config,
@ -85,9 +84,6 @@ pub struct Command {
#[clap(flatten)] #[clap(flatten)]
network: NetworkArgs, network: NetworkArgs,
#[arg(long, default_value = "any")]
nat: NatResolver,
} }
#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, ValueEnum)] #[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, ValueEnum)]
@ -137,16 +133,10 @@ impl Command {
}); });
} }
let network = config let network = self
.network_config( .network
db.clone(), .network_config(&config, self.chain.clone())
self.chain.clone(), .build(Arc::new(ShareableDatabase::new(db.clone())))
self.network.disable_discovery,
None,
self.nat,
None,
None,
)
.start_network() .start_network()
.await?; .await?;
let fetch_client = Arc::new(network.fetch_client().await?); let fetch_client = Arc::new(network.fetch_client().await?);

View File

@ -45,3 +45,12 @@ pub trait StatusUpdater: Send + Sync {
/// Updates the status of the p2p node /// Updates the status of the p2p node
fn update_status(&self, head: Head); fn update_status(&self, head: Head);
} }
/// A [StatusUpdater] implementation that does nothing.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct NoopStatusUpdater;
impl StatusUpdater for NoopStatusUpdater {
fn update_status(&self, _: Head) {}
}

View File

@ -298,12 +298,27 @@ impl TestConsensus {
} }
} }
/// Nil status updater for testing /// Status updater for testing.
#[derive(Debug, Clone, Default)] ///
pub struct TestStatusUpdater; /// [`TestStatusUpdater::new()`] creates a new [`TestStatusUpdater`] that is **not** shareable. This
/// struct wraps the sender side of a [`tokio::sync::watch`] channel. The receiving side of the
/// channel (which is shareable by cloning it) is also returned.
#[derive(Debug)]
pub struct TestStatusUpdater(tokio::sync::watch::Sender<Head>);
impl TestStatusUpdater {
/// Create a new test status updater and a receiver to listen to status updates on.
pub fn new() -> (Self, tokio::sync::watch::Receiver<Head>) {
let (tx, rx) = tokio::sync::watch::channel(Head::default());
(Self(tx), rx)
}
}
impl StatusUpdater for TestStatusUpdater { impl StatusUpdater for TestStatusUpdater {
fn update_status(&self, _: Head) {} fn update_status(&self, head: Head) {
self.0.send(head).expect("could not send status update");
}
} }
#[async_trait::async_trait] #[async_trait::async_trait]

View File

@ -185,6 +185,16 @@ impl NetworkConfigBuilder {
self self
} }
/// Sets the highest synced block.
///
/// This is used to construct the appropriate [`ForkFilter`] and [`Status`] message.
///
/// If not set, this defaults to the genesis specified by the current chain specification.
pub fn set_head(mut self, head: Head) -> Self {
self.head = Some(head);
self
}
/// Sets the `HelloMessage` to send when connecting to peers. /// Sets the `HelloMessage` to send when connecting to peers.
/// ///
/// ``` /// ```
@ -265,6 +275,7 @@ impl NetworkConfigBuilder {
} }
/// Sets the discovery service off on true. /// Sets the discovery service off on true.
// TODO(onbjerg): This name does not imply `true` = disable
pub fn set_discovery(mut self, disable_discovery: bool) -> Self { pub fn set_discovery(mut self, disable_discovery: bool) -> Self {
if disable_discovery { if disable_discovery {
self.disable_discovery(); self.disable_discovery();
@ -309,7 +320,7 @@ impl NetworkConfigBuilder {
let head = head.unwrap_or(Head { let head = head.unwrap_or(Head {
hash: chain_spec.genesis_hash(), hash: chain_spec.genesis_hash(),
number: 0, number: 0,
timestamp: 0, timestamp: chain_spec.genesis.timestamp,
difficulty: chain_spec.genesis.difficulty, difficulty: chain_spec.genesis.difficulty,
total_difficulty: chain_spec.genesis.difficulty, total_difficulty: chain_spec.genesis.difficulty,
}); });

View File

@ -3,9 +3,9 @@ use crate::{BlockNumber, H256};
/// Current status of the blockchain's head. /// Current status of the blockchain's head.
#[derive(Debug, Eq, PartialEq)] #[derive(Debug, Eq, PartialEq)]
pub struct ChainInfo { pub struct ChainInfo {
/// Best block hash. /// The block hash of the highest fully synced block.
pub best_hash: H256, pub best_hash: H256,
/// Best block number. /// The block number of the highest fully synced block.
pub best_number: BlockNumber, pub best_number: BlockNumber,
/// Last block that was finalized. /// Last block that was finalized.
pub last_finalized: Option<BlockNumber>, pub last_finalized: Option<BlockNumber>,

View File

@ -1,20 +1,12 @@
//! Configuration files. //! Configuration files.
use std::{path::PathBuf, sync::Arc};
use reth_db::database::Database;
use reth_discv4::Discv4Config; use reth_discv4::Discv4Config;
use reth_downloaders::{ use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder, bodies::bodies::BodiesDownloaderBuilder,
headers::reverse_headers::ReverseHeadersDownloaderBuilder, headers::reverse_headers::ReverseHeadersDownloaderBuilder,
}; };
use reth_network::{ use reth_network::{config::rng_secret_key, NetworkConfigBuilder, PeersConfig};
config::{mainnet_nodes, rng_secret_key},
NetworkConfig, NetworkConfigBuilder, PeersConfig,
};
use reth_primitives::{ChainSpec, NodeRecord};
use reth_provider::ShareableDatabase;
use reth_tasks::TaskExecutor;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::path::PathBuf;
/// Configuration for the reth node. /// Configuration for the reth node.
#[derive(Debug, Clone, Default, Deserialize, PartialEq, Serialize)] #[derive(Debug, Clone, Default, Deserialize, PartialEq, Serialize)]
@ -29,17 +21,11 @@ pub struct Config {
impl Config { impl Config {
/// Initializes network config from read data /// Initializes network config from read data
#[allow(clippy::too_many_arguments)] pub fn network_config(
pub fn network_config<DB: Database>(
&self, &self,
db: DB,
chain_spec: ChainSpec,
disable_discovery: bool,
bootnodes: Option<Vec<NodeRecord>>,
nat_resolution_method: reth_net_nat::NatResolver, nat_resolution_method: reth_net_nat::NatResolver,
peers_file: Option<PathBuf>, peers_file: Option<PathBuf>,
executor: Option<TaskExecutor>, ) -> NetworkConfigBuilder {
) -> NetworkConfig<ShareableDatabase<DB>> {
let peer_config = self let peer_config = self
.peers .peers
.clone() .clone()
@ -47,14 +33,7 @@ impl Config {
.unwrap_or_else(|_| self.peers.clone()); .unwrap_or_else(|_| self.peers.clone());
let discv4 = let discv4 =
Discv4Config::builder().external_ip_resolver(Some(nat_resolution_method)).clone(); Discv4Config::builder().external_ip_resolver(Some(nat_resolution_method)).clone();
NetworkConfigBuilder::new(rng_secret_key()) NetworkConfigBuilder::new(rng_secret_key()).peer_config(peer_config).discovery(discv4)
.boot_nodes(bootnodes.unwrap_or_else(mainnet_nodes))
.peer_config(peer_config)
.discovery(discv4)
.chain_spec(chain_spec)
.executor(executor)
.set_discovery(disable_discovery)
.build(Arc::new(ShareableDatabase::new(db)))
} }
} }

View File

@ -26,7 +26,7 @@
//! # use reth_downloaders::headers::reverse_headers::ReverseHeadersDownloaderBuilder; //! # use reth_downloaders::headers::reverse_headers::ReverseHeadersDownloaderBuilder;
//! # use reth_interfaces::consensus::Consensus; //! # use reth_interfaces::consensus::Consensus;
//! # use reth_interfaces::sync::NoopSyncStateUpdate; //! # use reth_interfaces::sync::NoopSyncStateUpdate;
//! # use reth_interfaces::test_utils::{TestBodiesClient, TestConsensus, TestHeadersClient}; //! # use reth_interfaces::test_utils::{TestBodiesClient, TestConsensus, TestHeadersClient, TestStatusUpdater};
//! # use reth_primitives::PeerId; //! # use reth_primitives::PeerId;
//! # use reth_stages::Pipeline; //! # use reth_stages::Pipeline;
//! # use reth_stages::sets::DefaultStages; //! # use reth_stages::sets::DefaultStages;
@ -40,14 +40,14 @@
//! # consensus.clone(), //! # consensus.clone(),
//! # create_test_rw_db() //! # create_test_rw_db()
//! # ); //! # );
//! # let (status_updater, _) = TestStatusUpdater::new();
//! // Create a pipeline that can fully sync //! // Create a pipeline that can fully sync
//! # let pipeline: Pipeline<Env<WriteMap>, NoopSyncStateUpdate> = //! # let pipeline: Pipeline<Env<WriteMap>, NoopSyncStateUpdate> =
//! Pipeline::builder() //! Pipeline::builder()
//! .add_stages( //! .add_stages(
//! DefaultStages::new(consensus, headers_downloader, bodies_downloader) //! DefaultStages::new(consensus, headers_downloader, bodies_downloader, status_updater)
//! ) //! )
//! .build(); //! .build();
//! #
//! ``` //! ```
mod error; mod error;
mod id; mod id;

View File

@ -31,16 +31,19 @@
//! ``` //! ```
use crate::{ use crate::{
stages::{ stages::{
AccountHashingStage, BodyStage, ExecutionStage, HeaderStage, IndexAccountHistoryStage, AccountHashingStage, BodyStage, ExecutionStage, FinishStage, HeaderStage,
IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
TotalDifficultyStage, TransactionLookupStage, StorageHashingStage, TotalDifficultyStage, TransactionLookupStage,
}, },
StageSet, StageSetBuilder, StageSet, StageSetBuilder,
}; };
use reth_db::database::Database; use reth_db::database::Database;
use reth_interfaces::{ use reth_interfaces::{
consensus::Consensus, consensus::Consensus,
p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader}, p2p::{
bodies::downloader::BodyDownloader,
headers::{client::StatusUpdater, downloader::HeaderDownloader},
},
}; };
use reth_primitives::ChainSpec; use reth_primitives::ChainSpec;
use std::sync::Arc; use std::sync::Arc;
@ -51,27 +54,42 @@ use std::sync::Arc;
/// ///
/// - [`OnlineStages`] /// - [`OnlineStages`]
/// - [`OfflineStages`] /// - [`OfflineStages`]
/// - [`FinishStage`]
#[derive(Debug)] #[derive(Debug)]
pub struct DefaultStages<H, B> { pub struct DefaultStages<H, B, S> {
/// Configuration for the online stages /// Configuration for the online stages
online: OnlineStages<H, B>, online: OnlineStages<H, B>,
/// Configuration for the [`FinishStage`] stage.
status_updater: S,
} }
impl<H, B> DefaultStages<H, B> { impl<H, B, S> DefaultStages<H, B, S> {
/// Create a new set of default stages with default values. /// Create a new set of default stages with default values.
pub fn new(consensus: Arc<dyn Consensus>, header_downloader: H, body_downloader: B) -> Self { pub fn new(
Self { online: OnlineStages::new(consensus, header_downloader, body_downloader) } consensus: Arc<dyn Consensus>,
header_downloader: H,
body_downloader: B,
status_updater: S,
) -> Self {
Self {
online: OnlineStages::new(consensus, header_downloader, body_downloader),
status_updater,
}
} }
} }
impl<DB, H, B> StageSet<DB> for DefaultStages<H, B> impl<DB, H, B, S> StageSet<DB> for DefaultStages<H, B, S>
where where
DB: Database, DB: Database,
H: HeaderDownloader + 'static, H: HeaderDownloader + 'static,
B: BodyDownloader + 'static, B: BodyDownloader + 'static,
S: StatusUpdater + 'static,
{ {
fn builder(self) -> StageSetBuilder<DB> { fn builder(self) -> StageSetBuilder<DB> {
self.online.builder().add_set(OfflineStages) self.online
.builder()
.add_set(OfflineStages)
.add_stage(FinishStage::new(self.status_updater))
} }
} }

View File

@ -0,0 +1,191 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use reth_db::database::Database;
use reth_interfaces::p2p::headers::client::StatusUpdater;
use reth_primitives::{BlockNumber, Head};
use reth_provider::Transaction;
/// The [`StageId`] of the finish stage.
pub const FINISH: StageId = StageId("Finish");
/// The finish stage.
///
/// This stage does not write anything; it's checkpoint is used to denote the highest fully synced
/// block, which is communicated to the P2P networking component, as well as the RPC component.
///
/// The highest block is communicated via a [StatusUpdater] when the stage executes or unwinds.
///
/// When the node starts up, the checkpoint for this stage should be used send the initial status
/// update to the relevant components. Assuming the genesis block is written before this, it is safe
/// to default the stage checkpoint to block number 0 on startup.
#[derive(Default, Debug, Clone)]
pub struct FinishStage<S> {
updater: S,
}
impl<S> FinishStage<S>
where
S: StatusUpdater,
{
/// Create a new [FinishStage] with the given [StatusUpdater].
pub fn new(updater: S) -> Self {
Self { updater }
}
/// Construct a [Head] from `block_number`.
fn fetch_head<DB: Database>(
&self,
tx: &mut Transaction<'_, DB>,
block_number: BlockNumber,
) -> Result<Head, StageError> {
let header = tx.get_header(block_number)?;
let hash = tx.get_block_hash(block_number)?;
let total_difficulty = tx.get_td(block_number)?;
Ok(Head {
number: block_number,
hash,
total_difficulty,
difficulty: header.difficulty,
timestamp: header.timestamp,
})
}
}
#[async_trait::async_trait]
impl<DB: Database, S> Stage<DB> for FinishStage<S>
where
S: StatusUpdater,
{
fn id(&self) -> StageId {
FINISH
}
async fn execute(
&mut self,
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
self.updater.update_status(self.fetch_head(tx, input.previous_stage_progress())?);
Ok(ExecOutput { done: true, stage_progress: input.previous_stage_progress() })
}
async fn unwind(
&mut self,
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
self.updater.update_status(self.fetch_head(tx, input.unwind_to)?);
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
TestTransaction, UnwindStageTestRunner,
};
use reth_interfaces::test_utils::{
generators::{random_header, random_header_range},
TestStatusUpdater,
};
use reth_primitives::SealedHeader;
use std::sync::Mutex;
stage_test_suite_ext!(FinishTestRunner, finish);
struct FinishTestRunner {
tx: TestTransaction,
status: Mutex<Option<tokio::sync::watch::Receiver<Head>>>,
}
impl FinishTestRunner {
/// Gets the current status.
///
/// # Panics
///
/// Panics if multiple threads try to read the status at the same time, or if no status
/// receiver has been set.
fn current_status(&self) -> Head {
let status_lock = self.status.try_lock().expect("competing for status lock");
let status = status_lock.as_ref().expect("no status receiver set").borrow();
status.clone()
}
}
impl Default for FinishTestRunner {
fn default() -> Self {
FinishTestRunner { tx: TestTransaction::default(), status: Mutex::new(None) }
}
}
impl StageTestRunner for FinishTestRunner {
type S = FinishStage<TestStatusUpdater>;
fn tx(&self) -> &TestTransaction {
&self.tx
}
fn stage(&self) -> Self::S {
let (status_updater, status) = TestStatusUpdater::new();
let mut status_container = self.status.try_lock().expect("competing for status lock");
*status_container = Some(status);
FinishStage::new(status_updater)
}
}
impl ExecuteStageTestRunner for FinishTestRunner {
type Seed = Vec<SealedHeader>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
let start = input.stage_progress.unwrap_or_default();
let head = random_header(start, None);
self.tx.insert_headers_with_td(std::iter::once(&head))?;
// use previous progress as seed size
let end = input.previous_stage.map(|(_, num)| num).unwrap_or_default() + 1;
if start + 1 >= end {
return Ok(Vec::default())
}
let mut headers = random_header_range(start + 1..end, head.hash());
self.tx.insert_headers_with_td(headers.iter())?;
headers.insert(0, head);
Ok(headers)
}
fn validate_execution(
&self,
input: ExecInput,
output: Option<ExecOutput>,
) -> Result<(), TestRunnerError> {
if let Some(output) = output {
assert!(output.done, "stage should always be done");
assert_eq!(
output.stage_progress,
input.previous_stage_progress(),
"stage progress should always match progress of previous stage"
);
assert_eq!(
self.current_status().number,
output.stage_progress,
"incorrect block number in status update"
);
}
Ok(())
}
}
impl UnwindStageTestRunner for FinishTestRunner {
fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
assert_eq!(
self.current_status().number,
input.unwind_to,
"incorrect block number in status update"
);
Ok(())
}
}
}

View File

@ -14,7 +14,8 @@ use reth_primitives::{Address, TransitionId};
use std::{collections::BTreeMap, fmt::Debug}; use std::{collections::BTreeMap, fmt::Debug};
use tracing::*; use tracing::*;
const INDEX_ACCOUNT_HISTORY: StageId = StageId("IndexAccountHistory"); /// The [`StageId`] of the account history indexing stage.
pub const INDEX_ACCOUNT_HISTORY: StageId = StageId("IndexAccountHistory");
/// Stage is indexing history the account changesets generated in /// Stage is indexing history the account changesets generated in
/// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information /// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information

View File

@ -13,7 +13,8 @@ use reth_provider::Transaction;
use std::{collections::BTreeMap, fmt::Debug}; use std::{collections::BTreeMap, fmt::Debug};
use tracing::*; use tracing::*;
const INDEX_STORAGE_HISTORY: StageId = StageId("IndexStorageHistory"); /// The [`StageId`] of the storage history indexing stage.
pub const INDEX_STORAGE_HISTORY: StageId = StageId("IndexStorageHistory");
/// Stage is indexing history the account changesets generated in /// Stage is indexing history the account changesets generated in
/// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information /// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information

View File

@ -2,6 +2,8 @@
mod bodies; mod bodies;
/// The execution stage that generates state diff. /// The execution stage that generates state diff.
mod execution; mod execution;
/// The finish stage
mod finish;
/// Account hashing stage. /// Account hashing stage.
mod hashing_account; mod hashing_account;
/// Storage hashing stage. /// Storage hashing stage.
@ -25,6 +27,7 @@ mod tx_lookup;
pub use bodies::*; pub use bodies::*;
pub use execution::*; pub use execution::*;
pub use finish::*;
pub use hashing_account::*; pub use hashing_account::*;
pub use hashing_storage::*; pub use hashing_storage::*;
pub use headers::*; pub use headers::*;

View File

@ -19,7 +19,8 @@ use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::*; use tracing::*;
const SENDER_RECOVERY: StageId = StageId("SenderRecovery"); /// The [`StageId`] of the sender recovery stage.
pub const SENDER_RECOVERY: StageId = StageId("SenderRecovery");
/// The sender recovery stage iterates over existing transactions, /// The sender recovery stage iterates over existing transactions,
/// recovers the transaction signer and stores them /// recovers the transaction signer and stores them

View File

@ -13,7 +13,8 @@ use reth_primitives::{ChainSpec, Hardfork, EMPTY_OMMER_ROOT, MAINNET, U256};
use reth_provider::Transaction; use reth_provider::Transaction;
use tracing::*; use tracing::*;
const TOTAL_DIFFICULTY: StageId = StageId("TotalDifficulty"); /// The [`StageId`] of the total difficulty stage.
pub const TOTAL_DIFFICULTY: StageId = StageId("TotalDifficulty");
/// The total difficulty stage. /// The total difficulty stage.
/// ///

View File

@ -11,7 +11,8 @@ use reth_db::{
use reth_provider::Transaction; use reth_provider::Transaction;
use tracing::*; use tracing::*;
const TRANSACTION_LOOKUP: StageId = StageId("TransactionLookup"); /// The [`StageId`] of the transaction lookup stage.
pub const TRANSACTION_LOOKUP: StageId = StageId("TransactionLookup");
/// The transaction lookup stage. /// The transaction lookup stage.
/// ///

View File

@ -76,12 +76,13 @@ impl<DB: Database> BlockHashProvider for ShareableDatabase<DB> {
impl<DB: Database> BlockProvider for ShareableDatabase<DB> { impl<DB: Database> BlockProvider for ShareableDatabase<DB> {
fn chain_info(&self) -> Result<ChainInfo> { fn chain_info(&self) -> Result<ChainInfo> {
Ok(ChainInfo { let best_number = self
best_hash: Default::default(), .db
best_number: 0, .view(|tx| tx.get::<tables::SyncStage>("Finish".as_bytes().to_vec()))?
last_finalized: None, .map_err(Into::<reth_interfaces::db::Error>::into)?
safe_finalized: None, .unwrap_or_default();
}) let best_hash = self.block_hash(U256::from(best_number))?.unwrap_or_default();
Ok(ChainInfo { best_hash, best_number, last_finalized: None, safe_finalized: None })
} }
fn block(&self, _id: BlockId) -> Result<Option<Block>> { fn block(&self, _id: BlockId) -> Result<Option<Block>> {
@ -130,10 +131,11 @@ impl<DB: Database> StateProviderFactory for ShareableDatabase<DB> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::StateProviderFactory; use crate::{BlockProvider, StateProviderFactory};
use super::ShareableDatabase; use super::ShareableDatabase;
use reth_db::mdbx::{test_utils::create_test_db, EnvKind, WriteMap}; use reth_db::mdbx::{test_utils::create_test_db, EnvKind, WriteMap};
use reth_primitives::H256;
#[test] #[test]
fn common_history_provider() { fn common_history_provider() {
@ -141,4 +143,16 @@ mod tests {
let provider = ShareableDatabase::new(db); let provider = ShareableDatabase::new(db);
let _ = provider.latest(); let _ = provider.latest();
} }
#[test]
fn default_chain_info() {
let db = create_test_db::<WriteMap>(EnvKind::RW);
let provider = ShareableDatabase::new(db);
let chain_info = provider.chain_info().expect("should be ok");
assert_eq!(chain_info.best_number, 0);
assert_eq!(chain_info.best_hash, H256::zero());
assert_eq!(chain_info.last_finalized, None);
assert_eq!(chain_info.safe_finalized, None);
}
} }

View File

@ -8,7 +8,7 @@ use reth_db::{
transaction::{DbTx, DbTxMut}, transaction::{DbTx, DbTxMut},
}; };
use reth_interfaces::{db::Error as DbError, provider::Error as ProviderError}; use reth_interfaces::{db::Error as DbError, provider::Error as ProviderError};
use reth_primitives::{BlockHash, BlockNumber, Header, TransitionId, TxNumber}; use reth_primitives::{BlockHash, BlockNumber, Header, TransitionId, TxNumber, U256};
use std::{ use std::{
fmt::Debug, fmt::Debug,
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
@ -100,10 +100,7 @@ where
} }
/// Query [tables::CanonicalHeaders] table for block hash by block number /// Query [tables::CanonicalHeaders] table for block hash by block number
pub(crate) fn get_block_hash( pub fn get_block_hash(&self, block_number: BlockNumber) -> Result<BlockHash, TransactionError> {
&self,
block_number: BlockNumber,
) -> Result<BlockHash, TransactionError> {
let hash = self let hash = self
.get::<tables::CanonicalHeaders>(block_number)? .get::<tables::CanonicalHeaders>(block_number)?
.ok_or(ProviderError::CanonicalHeader { block_number })?; .ok_or(ProviderError::CanonicalHeader { block_number })?;
@ -150,6 +147,14 @@ where
Ok(header) Ok(header)
} }
/// Get the total difficulty for a block.
pub fn get_td(&self, block: BlockNumber) -> Result<U256, TransactionError> {
let td = self
.get::<tables::HeaderTD>(block)?
.ok_or(ProviderError::TotalDifficulty { number: block })?;
Ok(td.into())
}
/// Unwind table by some number key /// Unwind table by some number key
#[inline] #[inline]
pub fn unwind_table_by_num<T>(&self, num: u64) -> Result<(), DbError> pub fn unwind_table_by_num<T>(&self, num: u64) -> Result<(), DbError>