refactor: node command (#1087)

Co-authored-by: Andrea Simeoni <>
Co-authored-by: Bjerg <onbjerg@users.noreply.github.com>
Co-authored-by: Oliver Nordbjerg <hi@notbjerg.me>
This commit is contained in:
Andrea Simeoni
2023-01-30 18:19:14 +01:00
committed by GitHub
parent 6b795be77f
commit b7b978fdea

View File

@ -12,12 +12,14 @@ use eyre::Context;
use fdlimit::raise_fd_limit;
use futures::{stream::select as stream_select, Stream, StreamExt};
use reth_consensus::BeaconConsensus;
use reth_db::mdbx::{Env, WriteMap};
use reth_downloaders::{bodies, headers};
use reth_interfaces::consensus::{Consensus, ForkchoiceState};
use reth_net_nat::NatResolver;
use reth_network::NetworkEvent;
use reth_network::{FetchClient, NetworkConfig, NetworkEvent, NetworkHandle};
use reth_network_api::NetworkInfo;
use reth_primitives::{BlockNumber, ChainSpec, H256};
use reth_provider::ShareableDatabase;
use reth_staged_sync::{utils::init::init_genesis, Config};
use reth_stages::{
prelude::*,
@ -84,109 +86,34 @@ impl Command {
/// Execute `node` command
// TODO: RPC
pub async fn execute(self) -> eyre::Result<()> {
info!(target: "reth::cli", "reth {} starting", crate_version!());
// Raise the fd limit of the process.
// Does not do anything on windows.
raise_fd_limit();
let mut config: Config =
confy::load_path(&self.config).wrap_err("Could not load config")?;
config.peers.connect_trusted_nodes_only = self.network.trusted_only;
let mut config: Config = self.load_config()?;
info!(target: "reth::cli", path = %self.db, "Configuration loaded");
if !self.network.trusted_peers.is_empty() {
self.network.trusted_peers.iter().for_each(|peer| {
config.peers.trusted_nodes.insert(*peer);
});
}
info!(target: "reth::cli", "reth {} starting", crate_version!());
self.init_trusted_nodes(&mut config);
info!(target: "reth::cli", path = %self.db, "Opening database");
let db = Arc::new(init_db(&self.db)?);
info!(target: "reth::cli", "Database opened");
if let Some(listen_addr) = self.metrics {
info!(target: "reth::cli", addr = %listen_addr, "Starting metrics endpoint");
prometheus_exporter::initialize(listen_addr)?;
}
self.start_metrics_endpoint()?;
let genesis = init_genesis(db.clone(), self.chain.clone())?;
info!(target: "reth::cli", ?genesis, "Inserted genesis");
init_genesis(db.clone(), self.chain.clone())?;
// TODO: This should be in a builder/factory in the consensus crate
let consensus: Arc<dyn Consensus> = {
let beacon_consensus = BeaconConsensus::new(self.chain.clone());
if let Some(tip) = self.tip {
debug!(target: "reth::cli", %tip, "Tip manually set");
beacon_consensus.notify_fork_choice_state(ForkchoiceState {
head_block_hash: tip,
safe_block_hash: tip,
finalized_block_hash: tip,
})?;
} else {
warn!(target: "reth::cli", "No tip specified. reth cannot communicate with consensus clients, so a tip must manually be provided for the online stages with --debug.tip <HASH>.");
}
Arc::new(beacon_consensus)
};
let network = config
.network_config(
db.clone(),
self.chain.clone(),
self.network.disable_discovery,
self.network.bootnodes.clone(),
self.nat,
)
.start_network()
.await?;
let consensus = self.init_consensus()?;
info!(target: "reth::cli", "Consensus engine initialized");
info!(target: "reth::cli", "Connecting to P2P network");
let netconf = self.load_network_config(&config, &db);
let network = netconf.start_network().await?;
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
let fetch_client = Arc::new(network.fetch_client().await?);
// Spawn headers downloader
let header_downloader = headers::task::TaskDownloader::spawn(
headers::linear::LinearDownloadBuilder::default()
.request_limit(config.stages.headers.downloader_batch_size)
.stream_batch_size(config.stages.headers.commit_threshold as usize)
.build(consensus.clone(), fetch_client.clone()),
);
// Spawn bodies downloader
let body_downloader = bodies::task::TaskDownloader::spawn(
bodies::concurrent::ConcurrentDownloaderBuilder::default()
.with_stream_batch_size(config.stages.bodies.downloader_stream_batch_size)
.with_request_limit(config.stages.bodies.downloader_request_limit)
.with_max_buffered_responses(config.stages.bodies.downloader_max_buffered_responses)
.with_concurrent_requests_range(
config.stages.bodies.downloader_min_concurrent_requests..=
config.stages.bodies.downloader_max_concurrent_requests,
)
.build(fetch_client.clone(), consensus.clone(), db.clone()),
);
let mut pipeline = Pipeline::builder()
.with_sync_state_updater(network.clone())
.add_stages(
OnlineStages::new(consensus.clone(), header_downloader, body_downloader).set(
TotalDifficultyStage {
commit_threshold: config.stages.total_difficulty.commit_threshold,
},
),
)
.add_stages(
OfflineStages::default()
.set(SenderRecoveryStage {
batch_size: config.stages.sender_recovery.batch_size,
commit_threshold: config.stages.execution.commit_threshold,
})
.set(ExecutionStage {
chain_spec: self.chain,
commit_threshold: config.stages.execution.commit_threshold,
}),
)
.build();
let mut pipeline = self.build_pipeline(&config, &network, &consensus, &db).await?;
tokio::spawn(handle_events(stream_select(
network.event_listener().map(Into::into),
@ -200,6 +127,143 @@ impl Command {
info!(target: "reth::cli", "Finishing up");
Ok(())
}
fn load_config(&self) -> eyre::Result<Config> {
confy::load_path::<Config>(&self.config).wrap_err("Could not load config")
}
fn init_trusted_nodes(&self, config: &mut Config) {
config.peers.connect_trusted_nodes_only = self.network.trusted_only;
if !self.network.trusted_peers.is_empty() {
info!(target: "reth::cli", "Adding trusted nodes");
self.network.trusted_peers.iter().for_each(|peer| {
config.peers.trusted_nodes.insert(*peer);
});
}
}
fn start_metrics_endpoint(&self) -> eyre::Result<()> {
if let Some(listen_addr) = self.metrics {
info!(target: "reth::cli", addr = %listen_addr, "Starting metrics endpoint");
prometheus_exporter::initialize(listen_addr)
} else {
Ok(())
}
}
fn init_consensus(&self) -> eyre::Result<Arc<dyn Consensus>> {
// TODO: This should be in a builder/factory in the consensus crate
let consensus: Arc<dyn Consensus> = {
let beacon_consensus = BeaconConsensus::new(self.chain.clone());
if let Some(tip) = self.tip {
debug!(target: "reth::cli", %tip, "Tip manually set");
beacon_consensus.notify_fork_choice_state(ForkchoiceState {
head_block_hash: tip,
safe_block_hash: tip,
finalized_block_hash: tip,
})?;
} else {
let warn_msg = "No tip specified. \
reth cannot communicate with consensus clients, \
so a tip must manually be provided for the online stages with --debug.tip <HASH>.";
warn!(target: "reth::cli", warn_msg);
}
Arc::new(beacon_consensus)
};
Ok(consensus)
}
fn load_network_config(
&self,
config: &Config,
db: &Arc<Env<WriteMap>>,
) -> NetworkConfig<ShareableDatabase<Env<WriteMap>>> {
config.network_config(
db.clone(),
self.chain.clone(),
self.network.disable_discovery,
self.network.bootnodes.clone(),
self.nat,
)
}
async fn build_pipeline(
&self,
config: &Config,
network: &NetworkHandle,
consensus: &Arc<dyn Consensus>,
db: &Arc<Env<WriteMap>>,
) -> eyre::Result<Pipeline<Env<WriteMap>, NetworkHandle>> {
let fetch_client = Arc::new(network.fetch_client().await?);
let header_downloader = self.spawn_headers_downloader(config, consensus, &fetch_client);
let body_downloader = self.spawn_bodies_downloader(config, consensus, &fetch_client, db);
let stage_conf = &config.stages;
let pipeline = Pipeline::builder()
.with_sync_state_updater(network.clone())
.add_stages(
OnlineStages::new(consensus.clone(), header_downloader, body_downloader).set(
TotalDifficultyStage {
commit_threshold: stage_conf.total_difficulty.commit_threshold,
},
),
)
.add_stages(
OfflineStages::default()
.set(SenderRecoveryStage {
batch_size: stage_conf.sender_recovery.batch_size,
commit_threshold: stage_conf.execution.commit_threshold,
})
.set(ExecutionStage {
chain_spec: self.chain.clone(),
commit_threshold: stage_conf.execution.commit_threshold,
}),
)
.build();
Ok(pipeline)
}
fn spawn_headers_downloader(
&self,
config: &Config,
consensus: &Arc<dyn Consensus>,
fetch_client: &Arc<FetchClient>,
) -> reth_downloaders::headers::task::TaskDownloader {
let headers_conf = &config.stages.headers;
headers::task::TaskDownloader::spawn(
headers::linear::LinearDownloadBuilder::default()
.request_limit(headers_conf.downloader_batch_size)
.stream_batch_size(headers_conf.commit_threshold as usize)
.build(consensus.clone(), fetch_client.clone()),
)
}
fn spawn_bodies_downloader(
&self,
config: &Config,
consensus: &Arc<dyn Consensus>,
fetch_client: &Arc<FetchClient>,
db: &Arc<Env<WriteMap>>,
) -> reth_downloaders::bodies::task::TaskDownloader {
let bodies_conf = &config.stages.bodies;
bodies::task::TaskDownloader::spawn(
bodies::concurrent::ConcurrentDownloaderBuilder::default()
.with_stream_batch_size(bodies_conf.downloader_stream_batch_size)
.with_request_limit(bodies_conf.downloader_request_limit)
.with_max_buffered_responses(bodies_conf.downloader_max_buffered_responses)
.with_concurrent_requests_range(
bodies_conf.downloader_min_concurrent_requests..=
bodies_conf.downloader_max_concurrent_requests,
)
.build(fetch_client.clone(), consensus.clone(), db.clone()),
)
}
}
/// The current high-level state of the node.