mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat: Introduce NodeBuilder (#5824)
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -5608,6 +5608,7 @@ dependencies = [
|
||||
"metrics-exporter-prometheus",
|
||||
"metrics-process",
|
||||
"metrics-util",
|
||||
"once_cell",
|
||||
"pin-project",
|
||||
"pretty_assertions",
|
||||
"procfs",
|
||||
|
||||
@ -74,6 +74,7 @@ metrics-util = "0.15.0"
|
||||
metrics-process = "1.0.9"
|
||||
reth-metrics.workspace = true
|
||||
metrics.workspace = true
|
||||
once_cell.workspace = true
|
||||
|
||||
# test vectors generation
|
||||
proptest.workspace = true
|
||||
|
||||
@ -179,6 +179,31 @@ pub struct RpcServerArgs {
|
||||
}
|
||||
|
||||
impl RpcServerArgs {
|
||||
/// Enables the HTTP-RPC server.
|
||||
pub fn with_http(mut self) -> Self {
|
||||
self.http = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Enables the WS-RPC server.
|
||||
pub fn with_ws(mut self) -> Self {
|
||||
self.ws = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Change rpc port numbers based on the instance number.
|
||||
///
|
||||
/// Warning: if `instance` is zero, this will panic.
|
||||
pub fn adjust_instance_ports(&mut self, instance: u16) {
|
||||
debug_assert_ne!(instance, 0, "instance must be non-zero");
|
||||
// auth port is scaled by a factor of instance * 100
|
||||
self.auth_port += instance * 100 - 100;
|
||||
// http port is scaled by a factor of -instance
|
||||
self.http_port -= instance - 1;
|
||||
// ws port is scaled by a factor of instance * 2
|
||||
self.ws_port += instance * 2 - 2;
|
||||
}
|
||||
|
||||
/// Configures and launches _all_ servers.
|
||||
///
|
||||
/// Returns the handles for the launched regular RPC server(s) (if any) and the server handle
|
||||
|
||||
108
bin/reth/src/cli/db_type.rs
Normal file
108
bin/reth/src/cli/db_type.rs
Normal file
@ -0,0 +1,108 @@
|
||||
//! A real or test database type
|
||||
|
||||
use crate::dirs::{ChainPath, DataDirPath, MaybePlatformPath};
|
||||
use reth_db::{
|
||||
init_db,
|
||||
test_utils::{create_test_rw_db, TempDatabase},
|
||||
DatabaseEnv,
|
||||
};
|
||||
use reth_interfaces::db::LogLevel;
|
||||
use reth_primitives::Chain;
|
||||
use std::{str::FromStr, sync::Arc};
|
||||
|
||||
/// A type that represents either a _real_ (represented by a path), or _test_ database, which will
|
||||
/// use a [TempDatabase].
|
||||
#[derive(Debug)]
|
||||
pub enum DatabaseType {
|
||||
/// The real database type
|
||||
Real(MaybePlatformPath<DataDirPath>),
|
||||
/// The test database type
|
||||
Test,
|
||||
}
|
||||
|
||||
/// The [Default] implementation for [DatabaseType] uses the _real_ variant, using the default
|
||||
/// value for the inner [MaybePlatformPath].
|
||||
impl Default for DatabaseType {
|
||||
fn default() -> Self {
|
||||
Self::Real(MaybePlatformPath::<DataDirPath>::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl DatabaseType {
|
||||
/// Creates a _test_ database
|
||||
pub fn test() -> Self {
|
||||
Self::Test
|
||||
}
|
||||
}
|
||||
|
||||
/// Type that represents a [DatabaseType] and [LogLevel], used to build a database type
|
||||
pub struct DatabaseBuilder {
|
||||
/// The database type
|
||||
db_type: DatabaseType,
|
||||
}
|
||||
|
||||
impl DatabaseBuilder {
|
||||
/// Creates the [DatabaseBuilder] with the given [DatabaseType]
|
||||
pub fn new(db_type: DatabaseType) -> Self {
|
||||
Self { db_type }
|
||||
}
|
||||
|
||||
/// Initializes and returns the [DatabaseInstance] depending on the current database type. If
|
||||
/// the [DatabaseType] is test, the [LogLevel] is not used.
|
||||
///
|
||||
/// If the [DatabaseType] is test, then the [ChainPath] constructed will be derived from the db
|
||||
/// path of the [TempDatabase] and the given chain.
|
||||
pub fn build_db(
|
||||
self,
|
||||
log_level: Option<LogLevel>,
|
||||
chain: Chain,
|
||||
) -> eyre::Result<DatabaseInstance> {
|
||||
match self.db_type {
|
||||
DatabaseType::Test => {
|
||||
let db = create_test_rw_db();
|
||||
let db_path_str = db.path().to_str().expect("Path is not valid unicode");
|
||||
let path = MaybePlatformPath::<DataDirPath>::from_str(db_path_str)
|
||||
.expect("Path is not valid");
|
||||
let data_dir = path.unwrap_or_chain_default(chain);
|
||||
|
||||
Ok(DatabaseInstance::Test { db, data_dir })
|
||||
}
|
||||
DatabaseType::Real(path) => {
|
||||
let data_dir = path.unwrap_or_chain_default(chain);
|
||||
|
||||
tracing::info!(target: "reth::cli", path = ?data_dir, "Opening database");
|
||||
let db = Arc::new(init_db(data_dir.clone(), log_level)?);
|
||||
Ok(DatabaseInstance::Real { db, data_dir })
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A constructed database type, with a [ChainPath].
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum DatabaseInstance {
|
||||
/// The test database
|
||||
Test {
|
||||
/// The database
|
||||
db: Arc<TempDatabase<DatabaseEnv>>,
|
||||
/// The data dir
|
||||
data_dir: ChainPath<DataDirPath>,
|
||||
},
|
||||
/// The real database
|
||||
Real {
|
||||
/// The database
|
||||
db: Arc<DatabaseEnv>,
|
||||
/// The data dir
|
||||
data_dir: ChainPath<DataDirPath>,
|
||||
},
|
||||
}
|
||||
|
||||
impl DatabaseInstance {
|
||||
/// Returns the data dir for this database instance
|
||||
pub fn data_dir(&self) -> &ChainPath<DataDirPath> {
|
||||
match self {
|
||||
Self::Test { data_dir, .. } => data_dir,
|
||||
Self::Real { data_dir, .. } => data_dir,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -21,7 +21,9 @@ use std::{fmt, fmt::Display, sync::Arc};
|
||||
|
||||
pub mod components;
|
||||
pub mod config;
|
||||
pub mod db_type;
|
||||
pub mod ext;
|
||||
pub mod node_builder;
|
||||
|
||||
/// Default [directives](Directive) for [EnvFilter] which disables high-frequency debug logs from
|
||||
/// `hyper` and `trust-dns`
|
||||
|
||||
1354
bin/reth/src/cli/node_builder.rs
Normal file
1354
bin/reth/src/cli/node_builder.rs
Normal file
File diff suppressed because it is too large
Load Diff
@ -190,6 +190,11 @@ impl<D: XdgPath> MaybePlatformPath<D> {
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns the default platform path for the specified [Chain].
|
||||
pub fn chain_default(chain: Chain) -> ChainPath<D> {
|
||||
PlatformPath::default().with_chain(chain)
|
||||
}
|
||||
|
||||
/// Returns true if a custom path is set
|
||||
pub fn is_some(&self) -> bool {
|
||||
self.0.is_some()
|
||||
|
||||
@ -3,90 +3,21 @@
|
||||
//! Starts the client
|
||||
use crate::{
|
||||
args::{
|
||||
get_secret_key,
|
||||
utils::{chain_help, genesis_value_parser, parse_socket_address, SUPPORTED_CHAINS},
|
||||
DatabaseArgs, DebugArgs, DevArgs, NetworkArgs, PayloadBuilderArgs, PruningArgs,
|
||||
RpcServerArgs, TxPoolArgs,
|
||||
},
|
||||
cli::{
|
||||
components::RethNodeComponentsImpl,
|
||||
config::{RethRpcConfig, RethTransactionPoolConfig},
|
||||
ext::{RethCliExt, RethNodeCommandConfig},
|
||||
},
|
||||
dirs::{ChainPath, DataDirPath, MaybePlatformPath},
|
||||
init::init_genesis,
|
||||
node::cl_events::ConsensusLayerHealthEvents,
|
||||
prometheus_exporter,
|
||||
cli::{db_type::DatabaseType, ext::RethCliExt, node_builder::NodeBuilder},
|
||||
dirs::{DataDirPath, MaybePlatformPath},
|
||||
runner::CliContext,
|
||||
utils::get_single_header,
|
||||
version::SHORT_VERSION,
|
||||
};
|
||||
use clap::{value_parser, Parser};
|
||||
use eyre::Context;
|
||||
use futures::{future::Either, pin_mut, stream, stream_select, StreamExt};
|
||||
use metrics_exporter_prometheus::PrometheusHandle;
|
||||
use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus, MiningMode};
|
||||
use reth_beacon_consensus::{
|
||||
hooks::{EngineHooks, PruneHook},
|
||||
BeaconConsensus, BeaconConsensusEngine, MIN_BLOCKS_FOR_PIPELINE_RUN,
|
||||
};
|
||||
use reth_blockchain_tree::{
|
||||
config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree,
|
||||
};
|
||||
use reth_config::{
|
||||
config::{PruneConfig, StageConfig},
|
||||
Config,
|
||||
};
|
||||
use reth_db::{database::Database, database_metrics::DatabaseMetrics, init_db};
|
||||
use reth_downloaders::{
|
||||
bodies::bodies::BodiesDownloaderBuilder,
|
||||
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
|
||||
};
|
||||
use reth_interfaces::{
|
||||
consensus::Consensus,
|
||||
p2p::{
|
||||
bodies::{client::BodiesClient, downloader::BodyDownloader},
|
||||
either::EitherDownloader,
|
||||
headers::{client::HeadersClient, downloader::HeaderDownloader},
|
||||
},
|
||||
RethResult,
|
||||
};
|
||||
use reth_network::{NetworkBuilder, NetworkConfig, NetworkEvents, NetworkHandle, NetworkManager};
|
||||
use reth_network_api::{NetworkInfo, PeersInfo};
|
||||
use reth_primitives::{
|
||||
constants::eip4844::{LoadKzgSettingsError, MAINNET_KZG_TRUSTED_SETUP},
|
||||
fs,
|
||||
kzg::KzgSettings,
|
||||
stage::StageId,
|
||||
BlockHashOrNumber, BlockNumber, ChainSpec, DisplayHardforks, Head, SealedHeader, B256,
|
||||
};
|
||||
use reth_provider::{
|
||||
providers::BlockchainProvider, BlockHashReader, BlockReader, CanonStateSubscriptions,
|
||||
HeaderProvider, HeaderSyncMode, ProviderFactory, StageCheckpointReader,
|
||||
};
|
||||
use reth_prune::PrunerBuilder;
|
||||
use reth_revm::EvmProcessorFactory;
|
||||
use reth_revm_inspectors::stack::Hook;
|
||||
use reth_rpc_engine_api::EngineApi;
|
||||
use reth_stages::{
|
||||
prelude::*,
|
||||
stages::{
|
||||
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage,
|
||||
IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
|
||||
TotalDifficultyStage, TransactionLookupStage,
|
||||
},
|
||||
};
|
||||
use reth_tasks::TaskExecutor;
|
||||
use reth_transaction_pool::{
|
||||
blobstore::InMemoryBlobStore, TransactionPool, TransactionValidationTaskExecutor,
|
||||
};
|
||||
use secp256k1::SecretKey;
|
||||
use std::{
|
||||
net::{SocketAddr, SocketAddrV4},
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
};
|
||||
use tokio::sync::{mpsc::unbounded_channel, oneshot, watch};
|
||||
use reth_auto_seal_consensus::AutoSealConsensus;
|
||||
use reth_beacon_consensus::BeaconConsensus;
|
||||
use reth_interfaces::consensus::Consensus;
|
||||
use reth_primitives::ChainSpec;
|
||||
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
|
||||
use tracing::*;
|
||||
|
||||
pub mod cl_events;
|
||||
@ -236,357 +167,59 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
|
||||
}
|
||||
|
||||
/// Execute `node` command
|
||||
pub async fn execute(mut self, ctx: CliContext) -> eyre::Result<()> {
|
||||
pub async fn execute(self, ctx: CliContext) -> eyre::Result<()> {
|
||||
info!(target: "reth::cli", "reth {} starting", SHORT_VERSION);
|
||||
|
||||
// Raise the fd limit of the process.
|
||||
// Does not do anything on windows.
|
||||
let _ = fdlimit::raise_fd_limit();
|
||||
|
||||
// get config
|
||||
let config = self.load_config()?;
|
||||
|
||||
let prometheus_handle = self.install_prometheus_recorder()?;
|
||||
|
||||
let data_dir = self.data_dir();
|
||||
let db_path = data_dir.db_path();
|
||||
|
||||
info!(target: "reth::cli", path = ?db_path, "Opening database");
|
||||
let db = Arc::new(init_db(&db_path, self.db.log_level)?.with_metrics());
|
||||
info!(target: "reth::cli", "Database opened");
|
||||
|
||||
let mut provider_factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain));
|
||||
|
||||
// configure snapshotter
|
||||
let snapshotter = reth_snapshot::Snapshotter::new(
|
||||
provider_factory.clone(),
|
||||
data_dir.snapshots_path(),
|
||||
self.chain.snapshot_block_interval,
|
||||
)?;
|
||||
|
||||
provider_factory = provider_factory
|
||||
.with_snapshots(data_dir.snapshots_path(), snapshotter.highest_snapshot_receiver())?;
|
||||
|
||||
self.start_metrics_endpoint(prometheus_handle, Arc::clone(&db)).await?;
|
||||
|
||||
debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis");
|
||||
|
||||
let genesis_hash = init_genesis(Arc::clone(&db), self.chain.clone())?;
|
||||
|
||||
info!(target: "reth::cli", "{}", DisplayHardforks::new(self.chain.hardforks()));
|
||||
|
||||
let consensus = self.consensus();
|
||||
|
||||
debug!(target: "reth::cli", "Spawning stages metrics listener task");
|
||||
let (sync_metrics_tx, sync_metrics_rx) = unbounded_channel();
|
||||
let sync_metrics_listener = reth_stages::MetricsListener::new(sync_metrics_rx);
|
||||
ctx.task_executor.spawn_critical("stages metrics listener task", sync_metrics_listener);
|
||||
|
||||
let prune_config =
|
||||
self.pruning.prune_config(Arc::clone(&self.chain))?.or(config.prune.clone());
|
||||
|
||||
// configure blockchain tree
|
||||
let tree_externals = TreeExternals::new(
|
||||
provider_factory.clone(),
|
||||
Arc::clone(&consensus),
|
||||
EvmProcessorFactory::new(self.chain.clone()),
|
||||
);
|
||||
let tree_config = BlockchainTreeConfig::default();
|
||||
let tree = BlockchainTree::new(
|
||||
tree_externals,
|
||||
tree_config,
|
||||
prune_config.clone().map(|config| config.segments),
|
||||
)?
|
||||
.with_sync_metrics_tx(sync_metrics_tx.clone());
|
||||
let canon_state_notification_sender = tree.canon_state_notification_sender();
|
||||
let blockchain_tree = ShareableBlockchainTree::new(tree);
|
||||
debug!(target: "reth::cli", "configured blockchain tree");
|
||||
|
||||
// fetch the head block from the database
|
||||
let head =
|
||||
self.lookup_head(provider_factory.clone()).wrap_err("the head block is missing")?;
|
||||
|
||||
// setup the blockchain provider
|
||||
let blockchain_db =
|
||||
BlockchainProvider::new(provider_factory.clone(), blockchain_tree.clone())?;
|
||||
let blob_store = InMemoryBlobStore::default();
|
||||
let validator = TransactionValidationTaskExecutor::eth_builder(Arc::clone(&self.chain))
|
||||
.with_head_timestamp(head.timestamp)
|
||||
.kzg_settings(self.kzg_settings()?)
|
||||
.with_additional_tasks(1)
|
||||
.build_with_tasks(blockchain_db.clone(), ctx.task_executor.clone(), blob_store.clone());
|
||||
|
||||
let transaction_pool =
|
||||
reth_transaction_pool::Pool::eth_pool(validator, blob_store, self.txpool.pool_config());
|
||||
info!(target: "reth::cli", "Transaction pool initialized");
|
||||
|
||||
// spawn txpool maintenance task
|
||||
{
|
||||
let pool = transaction_pool.clone();
|
||||
let chain_events = blockchain_db.canonical_state_stream();
|
||||
let client = blockchain_db.clone();
|
||||
ctx.task_executor.spawn_critical(
|
||||
"txpool maintenance task",
|
||||
reth_transaction_pool::maintain::maintain_transaction_pool_future(
|
||||
client,
|
||||
pool,
|
||||
chain_events,
|
||||
ctx.task_executor.clone(),
|
||||
Default::default(),
|
||||
),
|
||||
);
|
||||
debug!(target: "reth::cli", "Spawned txpool maintenance task");
|
||||
}
|
||||
|
||||
info!(target: "reth::cli", "Connecting to P2P network");
|
||||
let network_secret_path =
|
||||
self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret_path());
|
||||
debug!(target: "reth::cli", ?network_secret_path, "Loading p2p key file");
|
||||
let secret_key = get_secret_key(&network_secret_path)?;
|
||||
let default_peers_path = data_dir.known_peers_path();
|
||||
let network_config = self.load_network_config(
|
||||
&config,
|
||||
provider_factory.clone(),
|
||||
ctx.task_executor.clone(),
|
||||
head,
|
||||
secret_key,
|
||||
default_peers_path.clone(),
|
||||
);
|
||||
|
||||
let network_client = network_config.client.clone();
|
||||
let mut network_builder = NetworkManager::builder(network_config).await?;
|
||||
|
||||
let components = RethNodeComponentsImpl {
|
||||
provider: blockchain_db.clone(),
|
||||
pool: transaction_pool.clone(),
|
||||
network: network_builder.handle(),
|
||||
task_executor: ctx.task_executor.clone(),
|
||||
events: blockchain_db.clone(),
|
||||
};
|
||||
|
||||
// allow network modifications
|
||||
self.ext.configure_network(network_builder.network_mut(), &components)?;
|
||||
|
||||
// launch network
|
||||
let network = self.start_network(
|
||||
network_builder,
|
||||
&ctx.task_executor,
|
||||
transaction_pool.clone(),
|
||||
network_client,
|
||||
default_peers_path,
|
||||
);
|
||||
|
||||
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), enode = %network.local_node_record(), "Connected to P2P network");
|
||||
debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID");
|
||||
let network_client = network.fetch_client().await?;
|
||||
|
||||
self.ext.on_components_initialized(&components)?;
|
||||
|
||||
debug!(target: "reth::cli", "Spawning payload builder service");
|
||||
let payload_builder = self.ext.spawn_payload_builder_service(&self.builder, &components)?;
|
||||
|
||||
let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
|
||||
let max_block = if let Some(block) = self.debug.max_block {
|
||||
Some(block)
|
||||
} else if let Some(tip) = self.debug.tip {
|
||||
Some(self.lookup_or_fetch_tip(provider_factory.clone(), &network_client, tip).await?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Configure the pipeline
|
||||
let (mut pipeline, client) = if self.dev.dev {
|
||||
info!(target: "reth::cli", "Starting Reth in dev mode");
|
||||
|
||||
let mining_mode = if let Some(interval) = self.dev.block_time {
|
||||
MiningMode::interval(interval)
|
||||
} else if let Some(max_transactions) = self.dev.block_max_transactions {
|
||||
MiningMode::instant(
|
||||
max_transactions,
|
||||
transaction_pool.pending_transactions_listener(),
|
||||
)
|
||||
} else {
|
||||
info!(target: "reth::cli", "No mining mode specified, defaulting to ReadyTransaction");
|
||||
MiningMode::instant(1, transaction_pool.pending_transactions_listener())
|
||||
};
|
||||
|
||||
let (_, client, mut task) = AutoSealBuilder::new(
|
||||
Arc::clone(&self.chain),
|
||||
blockchain_db.clone(),
|
||||
transaction_pool.clone(),
|
||||
consensus_engine_tx.clone(),
|
||||
canon_state_notification_sender,
|
||||
mining_mode,
|
||||
)
|
||||
.build();
|
||||
|
||||
let mut pipeline = self
|
||||
.build_networked_pipeline(
|
||||
&config.stages,
|
||||
client.clone(),
|
||||
Arc::clone(&consensus),
|
||||
provider_factory.clone(),
|
||||
&ctx.task_executor,
|
||||
sync_metrics_tx,
|
||||
prune_config.clone(),
|
||||
max_block,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let pipeline_events = pipeline.events();
|
||||
task.set_pipeline_events(pipeline_events);
|
||||
debug!(target: "reth::cli", "Spawning auto mine task");
|
||||
ctx.task_executor.spawn(Box::pin(task));
|
||||
|
||||
(pipeline, EitherDownloader::Left(client))
|
||||
} else {
|
||||
let pipeline = self
|
||||
.build_networked_pipeline(
|
||||
&config.stages,
|
||||
network_client.clone(),
|
||||
Arc::clone(&consensus),
|
||||
provider_factory.clone(),
|
||||
&ctx.task_executor,
|
||||
sync_metrics_tx,
|
||||
prune_config.clone(),
|
||||
max_block,
|
||||
)
|
||||
.await?;
|
||||
|
||||
(pipeline, EitherDownloader::Right(network_client))
|
||||
};
|
||||
|
||||
let pipeline_events = pipeline.events();
|
||||
|
||||
let initial_target = if let Some(tip) = self.debug.tip {
|
||||
// Set the provided tip as the initial pipeline target.
|
||||
debug!(target: "reth::cli", %tip, "Tip manually set");
|
||||
Some(tip)
|
||||
} else if self.debug.continuous {
|
||||
// Set genesis as the initial pipeline target.
|
||||
// This will allow the downloader to start
|
||||
debug!(target: "reth::cli", "Continuous sync mode enabled");
|
||||
Some(genesis_hash)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let mut hooks = EngineHooks::new();
|
||||
|
||||
let pruner_events = if let Some(prune_config) = prune_config {
|
||||
let mut pruner = PrunerBuilder::new(prune_config.clone())
|
||||
.max_reorg_depth(tree_config.max_reorg_depth() as usize)
|
||||
.prune_delete_limit(self.chain.prune_delete_limit)
|
||||
.build(provider_factory, snapshotter.highest_snapshot_receiver());
|
||||
|
||||
let events = pruner.events();
|
||||
hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor.clone())));
|
||||
|
||||
info!(target: "reth::cli", ?prune_config, "Pruner initialized");
|
||||
Either::Left(events)
|
||||
} else {
|
||||
Either::Right(stream::empty())
|
||||
};
|
||||
|
||||
// Configure the consensus engine
|
||||
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
|
||||
client,
|
||||
pipeline,
|
||||
blockchain_db.clone(),
|
||||
Box::new(ctx.task_executor.clone()),
|
||||
Box::new(network.clone()),
|
||||
max_block,
|
||||
self.debug.continuous,
|
||||
payload_builder.clone(),
|
||||
initial_target,
|
||||
MIN_BLOCKS_FOR_PIPELINE_RUN,
|
||||
consensus_engine_tx,
|
||||
consensus_engine_rx,
|
||||
hooks,
|
||||
)?;
|
||||
info!(target: "reth::cli", "Consensus engine initialized");
|
||||
|
||||
let events = stream_select!(
|
||||
network.event_listener().map(Into::into),
|
||||
beacon_engine_handle.event_listener().map(Into::into),
|
||||
pipeline_events.map(Into::into),
|
||||
if self.debug.tip.is_none() {
|
||||
Either::Left(
|
||||
ConsensusLayerHealthEvents::new(Box::new(blockchain_db.clone()))
|
||||
.map(Into::into),
|
||||
)
|
||||
} else {
|
||||
Either::Right(stream::empty())
|
||||
},
|
||||
pruner_events.map(Into::into)
|
||||
);
|
||||
ctx.task_executor.spawn_critical(
|
||||
"events task",
|
||||
events::handle_events(Some(network.clone()), Some(head.number), events, db.clone()),
|
||||
);
|
||||
|
||||
let engine_api = EngineApi::new(
|
||||
blockchain_db.clone(),
|
||||
self.chain.clone(),
|
||||
beacon_engine_handle,
|
||||
payload_builder.into(),
|
||||
Box::new(ctx.task_executor.clone()),
|
||||
);
|
||||
info!(target: "reth::cli", "Engine API handler initialized");
|
||||
|
||||
// extract the jwt secret from the args if possible
|
||||
let default_jwt_path = data_dir.jwt_path();
|
||||
let jwt_secret = self.rpc.auth_jwt_secret(default_jwt_path)?;
|
||||
|
||||
// adjust rpc port numbers based on instance number
|
||||
self.adjust_instance_ports();
|
||||
|
||||
// Start RPC servers
|
||||
let _rpc_server_handles =
|
||||
self.rpc.start_servers(&components, engine_api, jwt_secret, &mut self.ext).await?;
|
||||
|
||||
// Run consensus engine to completion
|
||||
let (tx, rx) = oneshot::channel();
|
||||
info!(target: "reth::cli", "Starting consensus engine");
|
||||
ctx.task_executor.spawn_critical_blocking("consensus engine", async move {
|
||||
let res = beacon_consensus_engine.await;
|
||||
let _ = tx.send(res);
|
||||
});
|
||||
|
||||
self.ext.on_node_started(&components)?;
|
||||
|
||||
// If `enable_genesis_walkback` is set to true, the rollup client will need to
|
||||
// perform the derivation pipeline from genesis, validating the data dir.
|
||||
// When set to false, set the finalized, safe, and unsafe head block hashes
|
||||
// on the rollup client using a fork choice update. This prevents the rollup
|
||||
// client from performing the derivation pipeline from genesis, and instead
|
||||
// starts syncing from the current tip in the DB.
|
||||
let Self {
|
||||
datadir,
|
||||
config,
|
||||
chain,
|
||||
metrics,
|
||||
trusted_setup_file,
|
||||
instance,
|
||||
network,
|
||||
rpc,
|
||||
txpool,
|
||||
builder,
|
||||
debug,
|
||||
db,
|
||||
dev,
|
||||
pruning,
|
||||
#[cfg(feature = "optimism")]
|
||||
if self.chain.is_optimism() && !self.rollup.enable_genesis_walkback {
|
||||
let client = _rpc_server_handles.auth.http_client();
|
||||
reth_rpc_api::EngineApiClient::fork_choice_updated_v2(
|
||||
&client,
|
||||
reth_rpc_types::engine::ForkchoiceState {
|
||||
head_block_hash: head.hash,
|
||||
safe_block_hash: head.hash,
|
||||
finalized_block_hash: head.hash,
|
||||
},
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
rollup,
|
||||
ext,
|
||||
} = self;
|
||||
|
||||
rx.await??;
|
||||
// set up real database
|
||||
let database = DatabaseType::Real(datadir);
|
||||
|
||||
info!(target: "reth::cli", "Consensus engine has exited.");
|
||||
// set up node config
|
||||
let node_config = NodeBuilder {
|
||||
database,
|
||||
config,
|
||||
chain,
|
||||
metrics,
|
||||
instance,
|
||||
trusted_setup_file,
|
||||
network,
|
||||
rpc,
|
||||
txpool,
|
||||
builder,
|
||||
debug,
|
||||
db,
|
||||
dev,
|
||||
pruning,
|
||||
#[cfg(feature = "optimism")]
|
||||
rollup,
|
||||
};
|
||||
|
||||
if self.debug.terminate {
|
||||
Ok(())
|
||||
} else {
|
||||
// The pipeline has finished downloading blocks up to `--debug.tip` or
|
||||
// `--debug.max-block`. Keep other node components alive for further usage.
|
||||
futures::future::pending().await
|
||||
}
|
||||
let executor = ctx.task_executor;
|
||||
|
||||
// launch the node
|
||||
let handle = node_config.launch::<Ext>(ext, executor).await?;
|
||||
|
||||
// wait for node exit
|
||||
handle.wait_for_node_exit().await
|
||||
}
|
||||
|
||||
/// Returns the [Consensus] instance to use.
|
||||
@ -600,425 +233,6 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
|
||||
Arc::new(BeaconConsensus::new(Arc::clone(&self.chain)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Constructs a [Pipeline] that's wired to the network
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn build_networked_pipeline<DB, Client>(
|
||||
&self,
|
||||
config: &StageConfig,
|
||||
client: Client,
|
||||
consensus: Arc<dyn Consensus>,
|
||||
provider_factory: ProviderFactory<DB>,
|
||||
task_executor: &TaskExecutor,
|
||||
metrics_tx: reth_stages::MetricEventsSender,
|
||||
prune_config: Option<PruneConfig>,
|
||||
max_block: Option<BlockNumber>,
|
||||
) -> eyre::Result<Pipeline<DB>>
|
||||
where
|
||||
DB: Database + Unpin + Clone + 'static,
|
||||
Client: HeadersClient + BodiesClient + Clone + 'static,
|
||||
{
|
||||
// building network downloaders using the fetch client
|
||||
let header_downloader = ReverseHeadersDownloaderBuilder::new(config.headers)
|
||||
.build(client.clone(), Arc::clone(&consensus))
|
||||
.into_task_with(task_executor);
|
||||
|
||||
let body_downloader = BodiesDownloaderBuilder::new(config.bodies)
|
||||
.build(client, Arc::clone(&consensus), provider_factory.clone())
|
||||
.into_task_with(task_executor);
|
||||
|
||||
let pipeline = self
|
||||
.build_pipeline(
|
||||
provider_factory,
|
||||
config,
|
||||
header_downloader,
|
||||
body_downloader,
|
||||
consensus,
|
||||
max_block,
|
||||
self.debug.continuous,
|
||||
metrics_tx,
|
||||
prune_config,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(pipeline)
|
||||
}
|
||||
|
||||
/// Returns the chain specific path to the data dir.
|
||||
fn data_dir(&self) -> ChainPath<DataDirPath> {
|
||||
self.datadir.unwrap_or_chain_default(self.chain.chain)
|
||||
}
|
||||
|
||||
/// Returns the path to the config file.
|
||||
fn config_path(&self) -> PathBuf {
|
||||
self.config.clone().unwrap_or_else(|| self.data_dir().config_path())
|
||||
}
|
||||
|
||||
/// Loads the reth config with the given datadir root
|
||||
fn load_config(&self) -> eyre::Result<Config> {
|
||||
let config_path = self.config_path();
|
||||
let mut config = confy::load_path::<Config>(&config_path)
|
||||
.wrap_err_with(|| format!("Could not load config file {:?}", config_path))?;
|
||||
|
||||
info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
|
||||
|
||||
// Update the config with the command line arguments
|
||||
config.peers.connect_trusted_nodes_only = self.network.trusted_only;
|
||||
|
||||
if !self.network.trusted_peers.is_empty() {
|
||||
info!(target: "reth::cli", "Adding trusted nodes");
|
||||
self.network.trusted_peers.iter().for_each(|peer| {
|
||||
config.peers.trusted_nodes.insert(*peer);
|
||||
});
|
||||
}
|
||||
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
/// Loads the trusted setup params from a given file path or falls back to
|
||||
/// `MAINNET_KZG_TRUSTED_SETUP`.
|
||||
fn kzg_settings(&self) -> eyre::Result<Arc<KzgSettings>> {
|
||||
if let Some(ref trusted_setup_file) = self.trusted_setup_file {
|
||||
let trusted_setup = KzgSettings::load_trusted_setup_file(trusted_setup_file)
|
||||
.map_err(LoadKzgSettingsError::KzgError)?;
|
||||
Ok(Arc::new(trusted_setup))
|
||||
} else {
|
||||
Ok(Arc::clone(&MAINNET_KZG_TRUSTED_SETUP))
|
||||
}
|
||||
}
|
||||
|
||||
fn install_prometheus_recorder(&self) -> eyre::Result<PrometheusHandle> {
|
||||
prometheus_exporter::install_recorder()
|
||||
}
|
||||
|
||||
async fn start_metrics_endpoint<Metrics>(
|
||||
&self,
|
||||
prometheus_handle: PrometheusHandle,
|
||||
db: Metrics,
|
||||
) -> eyre::Result<()>
|
||||
where
|
||||
Metrics: DatabaseMetrics + 'static + Send + Sync,
|
||||
{
|
||||
if let Some(listen_addr) = self.metrics {
|
||||
info!(target: "reth::cli", addr = %listen_addr, "Starting metrics endpoint");
|
||||
prometheus_exporter::serve(
|
||||
listen_addr,
|
||||
prometheus_handle,
|
||||
db,
|
||||
metrics_process::Collector::default(),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Spawns the configured network and associated tasks and returns the [NetworkHandle] connected
|
||||
/// to that network.
|
||||
fn start_network<C, Pool>(
|
||||
&self,
|
||||
builder: NetworkBuilder<C, (), ()>,
|
||||
task_executor: &TaskExecutor,
|
||||
pool: Pool,
|
||||
client: C,
|
||||
default_peers_path: PathBuf,
|
||||
) -> NetworkHandle
|
||||
where
|
||||
C: BlockReader + HeaderProvider + Clone + Unpin + 'static,
|
||||
Pool: TransactionPool + Unpin + 'static,
|
||||
{
|
||||
let (handle, network, txpool, eth) =
|
||||
builder.transactions(pool).request_handler(client).split_with_handle();
|
||||
|
||||
task_executor.spawn_critical("p2p txpool", txpool);
|
||||
task_executor.spawn_critical("p2p eth request handler", eth);
|
||||
|
||||
let known_peers_file = self.network.persistent_peers_file(default_peers_path);
|
||||
task_executor
|
||||
.spawn_critical_with_graceful_shutdown_signal("p2p network task", |shutdown| {
|
||||
run_network_until_shutdown(shutdown, network, known_peers_file)
|
||||
});
|
||||
|
||||
handle
|
||||
}
|
||||
|
||||
/// Fetches the head block from the database.
|
||||
///
|
||||
/// If the database is empty, returns the genesis block.
|
||||
fn lookup_head<DB: Database>(&self, factory: ProviderFactory<DB>) -> RethResult<Head> {
|
||||
let provider = factory.provider()?;
|
||||
|
||||
let head = provider.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default().block_number;
|
||||
|
||||
let header = provider
|
||||
.header_by_number(head)?
|
||||
.expect("the header for the latest block is missing, database is corrupt");
|
||||
|
||||
let total_difficulty = provider
|
||||
.header_td_by_number(head)?
|
||||
.expect("the total difficulty for the latest block is missing, database is corrupt");
|
||||
|
||||
let hash = provider
|
||||
.block_hash(head)?
|
||||
.expect("the hash for the latest block is missing, database is corrupt");
|
||||
|
||||
Ok(Head {
|
||||
number: head,
|
||||
hash,
|
||||
difficulty: header.difficulty,
|
||||
total_difficulty,
|
||||
timestamp: header.timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
/// Attempt to look up the block number for the tip hash in the database.
|
||||
/// If it doesn't exist, download the header and return the block number.
|
||||
///
|
||||
/// NOTE: The download is attempted with infinite retries.
|
||||
async fn lookup_or_fetch_tip<DB, Client>(
|
||||
&self,
|
||||
provider_factory: ProviderFactory<DB>,
|
||||
client: Client,
|
||||
tip: B256,
|
||||
) -> RethResult<u64>
|
||||
where
|
||||
DB: Database,
|
||||
Client: HeadersClient,
|
||||
{
|
||||
Ok(self.fetch_tip(provider_factory, client, BlockHashOrNumber::Hash(tip)).await?.number)
|
||||
}
|
||||
|
||||
/// Attempt to look up the block with the given number and return the header.
|
||||
///
|
||||
/// NOTE: The download is attempted with infinite retries.
|
||||
async fn fetch_tip<DB, Client>(
|
||||
&self,
|
||||
factory: ProviderFactory<DB>,
|
||||
client: Client,
|
||||
tip: BlockHashOrNumber,
|
||||
) -> RethResult<SealedHeader>
|
||||
where
|
||||
DB: Database,
|
||||
Client: HeadersClient,
|
||||
{
|
||||
let provider = factory.provider()?;
|
||||
|
||||
let header = provider.header_by_hash_or_number(tip)?;
|
||||
|
||||
// try to look up the header in the database
|
||||
if let Some(header) = header {
|
||||
info!(target: "reth::cli", ?tip, "Successfully looked up tip block in the database");
|
||||
return Ok(header.seal_slow())
|
||||
}
|
||||
|
||||
info!(target: "reth::cli", ?tip, "Fetching tip block from the network.");
|
||||
loop {
|
||||
match get_single_header(&client, tip).await {
|
||||
Ok(tip_header) => {
|
||||
info!(target: "reth::cli", ?tip, "Successfully fetched tip");
|
||||
return Ok(tip_header)
|
||||
}
|
||||
Err(error) => {
|
||||
error!(target: "reth::cli", %error, "Failed to fetch the tip. Retrying...");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn load_network_config<DB: Database>(
|
||||
&self,
|
||||
config: &Config,
|
||||
provider_factory: ProviderFactory<DB>,
|
||||
executor: TaskExecutor,
|
||||
head: Head,
|
||||
secret_key: SecretKey,
|
||||
default_peers_path: PathBuf,
|
||||
) -> NetworkConfig<ProviderFactory<DB>> {
|
||||
let cfg_builder = self
|
||||
.network
|
||||
.network_config(config, self.chain.clone(), secret_key, default_peers_path)
|
||||
.with_task_executor(Box::new(executor))
|
||||
.set_head(head)
|
||||
.listener_addr(SocketAddr::V4(SocketAddrV4::new(
|
||||
self.network.addr,
|
||||
// set discovery port based on instance number
|
||||
self.network.port + self.instance - 1,
|
||||
)))
|
||||
.discovery_addr(SocketAddr::V4(SocketAddrV4::new(
|
||||
self.network.addr,
|
||||
// set discovery port based on instance number
|
||||
self.network.port + self.instance - 1,
|
||||
)));
|
||||
|
||||
// When `sequencer_endpoint` is configured, the node will forward all transactions to a
|
||||
// Sequencer node for execution and inclusion on L1, and disable its own txpool
|
||||
// gossip to prevent other parties in the network from learning about them.
|
||||
#[cfg(feature = "optimism")]
|
||||
let cfg_builder = cfg_builder
|
||||
.sequencer_endpoint(self.rollup.sequencer_http.clone())
|
||||
.disable_tx_gossip(self.rollup.disable_txpool_gossip);
|
||||
|
||||
cfg_builder.build(provider_factory)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn build_pipeline<DB, H, B>(
|
||||
&self,
|
||||
provider_factory: ProviderFactory<DB>,
|
||||
config: &StageConfig,
|
||||
header_downloader: H,
|
||||
body_downloader: B,
|
||||
consensus: Arc<dyn Consensus>,
|
||||
max_block: Option<u64>,
|
||||
continuous: bool,
|
||||
metrics_tx: reth_stages::MetricEventsSender,
|
||||
prune_config: Option<PruneConfig>,
|
||||
) -> eyre::Result<Pipeline<DB>>
|
||||
where
|
||||
DB: Database + Clone + 'static,
|
||||
H: HeaderDownloader + 'static,
|
||||
B: BodyDownloader + 'static,
|
||||
{
|
||||
let mut builder = Pipeline::builder();
|
||||
|
||||
if let Some(max_block) = max_block {
|
||||
debug!(target: "reth::cli", max_block, "Configuring builder to use max block");
|
||||
builder = builder.with_max_block(max_block)
|
||||
}
|
||||
|
||||
let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
|
||||
use reth_revm_inspectors::stack::InspectorStackConfig;
|
||||
let factory = reth_revm::EvmProcessorFactory::new(self.chain.clone());
|
||||
|
||||
let stack_config = InspectorStackConfig {
|
||||
use_printer_tracer: self.debug.print_inspector,
|
||||
hook: if let Some(hook_block) = self.debug.hook_block {
|
||||
Hook::Block(hook_block)
|
||||
} else if let Some(tx) = self.debug.hook_transaction {
|
||||
Hook::Transaction(tx)
|
||||
} else if self.debug.hook_all {
|
||||
Hook::All
|
||||
} else {
|
||||
Hook::None
|
||||
},
|
||||
};
|
||||
|
||||
let factory = factory.with_stack_config(stack_config);
|
||||
|
||||
let prune_modes = prune_config.map(|prune| prune.segments).unwrap_or_default();
|
||||
|
||||
let header_mode =
|
||||
if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) };
|
||||
let pipeline = builder
|
||||
.with_tip_sender(tip_tx)
|
||||
.with_metrics_tx(metrics_tx.clone())
|
||||
.add_stages(
|
||||
DefaultStages::new(
|
||||
provider_factory.clone(),
|
||||
header_mode,
|
||||
Arc::clone(&consensus),
|
||||
header_downloader,
|
||||
body_downloader,
|
||||
factory.clone(),
|
||||
)
|
||||
.set(
|
||||
TotalDifficultyStage::new(consensus)
|
||||
.with_commit_threshold(config.total_difficulty.commit_threshold),
|
||||
)
|
||||
.set(SenderRecoveryStage {
|
||||
commit_threshold: config.sender_recovery.commit_threshold,
|
||||
})
|
||||
.set(
|
||||
ExecutionStage::new(
|
||||
factory,
|
||||
ExecutionStageThresholds {
|
||||
max_blocks: config.execution.max_blocks,
|
||||
max_changes: config.execution.max_changes,
|
||||
max_cumulative_gas: config.execution.max_cumulative_gas,
|
||||
},
|
||||
config
|
||||
.merkle
|
||||
.clean_threshold
|
||||
.max(config.account_hashing.clean_threshold)
|
||||
.max(config.storage_hashing.clean_threshold),
|
||||
prune_modes.clone(),
|
||||
)
|
||||
.with_metrics_tx(metrics_tx),
|
||||
)
|
||||
.set(AccountHashingStage::new(
|
||||
config.account_hashing.clean_threshold,
|
||||
config.account_hashing.commit_threshold,
|
||||
))
|
||||
.set(StorageHashingStage::new(
|
||||
config.storage_hashing.clean_threshold,
|
||||
config.storage_hashing.commit_threshold,
|
||||
))
|
||||
.set(MerkleStage::new_execution(config.merkle.clean_threshold))
|
||||
.set(TransactionLookupStage::new(
|
||||
config.transaction_lookup.commit_threshold,
|
||||
prune_modes.transaction_lookup,
|
||||
))
|
||||
.set(IndexAccountHistoryStage::new(
|
||||
config.index_account_history.commit_threshold,
|
||||
prune_modes.account_history,
|
||||
))
|
||||
.set(IndexStorageHistoryStage::new(
|
||||
config.index_storage_history.commit_threshold,
|
||||
prune_modes.storage_history,
|
||||
)),
|
||||
)
|
||||
.build(provider_factory);
|
||||
|
||||
Ok(pipeline)
|
||||
}
|
||||
|
||||
/// Change rpc port numbers based on the instance number.
|
||||
fn adjust_instance_ports(&mut self) {
|
||||
// auth port is scaled by a factor of instance * 100
|
||||
self.rpc.auth_port += self.instance * 100 - 100;
|
||||
// http port is scaled by a factor of -instance
|
||||
self.rpc.http_port -= self.instance - 1;
|
||||
// ws port is scaled by a factor of instance * 2
|
||||
self.rpc.ws_port += self.instance * 2 - 2;
|
||||
}
|
||||
}
|
||||
|
||||
/// Drives the [NetworkManager] future until a [Shutdown](reth_tasks::shutdown::Shutdown) signal is
|
||||
/// received. If configured, this writes known peers to `persistent_peers_file` afterwards.
|
||||
async fn run_network_until_shutdown<C>(
|
||||
shutdown: reth_tasks::shutdown::GracefulShutdown,
|
||||
network: NetworkManager<C>,
|
||||
persistent_peers_file: Option<PathBuf>,
|
||||
) where
|
||||
C: BlockReader + HeaderProvider + Clone + Unpin + 'static,
|
||||
{
|
||||
pin_mut!(network, shutdown);
|
||||
|
||||
let mut graceful_guard = None;
|
||||
tokio::select! {
|
||||
_ = &mut network => {},
|
||||
guard = shutdown => {
|
||||
graceful_guard = Some(guard);
|
||||
},
|
||||
}
|
||||
|
||||
if let Some(file_path) = persistent_peers_file {
|
||||
let known_peers = network.all_peers().collect::<Vec<_>>();
|
||||
if let Ok(known_peers) = serde_json::to_string_pretty(&known_peers) {
|
||||
trace!(target: "reth::cli", peers_file =?file_path, num_peers=%known_peers.len(), "Saving current peers");
|
||||
let parent_dir = file_path.parent().map(fs::create_dir_all).transpose();
|
||||
match parent_dir.and_then(|_| fs::write(&file_path, known_peers)) {
|
||||
Ok(_) => {
|
||||
info!(target: "reth::cli", peers_file=?file_path, "Wrote network peers to file");
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(target: "reth::cli", ?err, peers_file=?file_path, "Failed to write network peers to file");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
drop(graceful_guard)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@ -1149,7 +363,7 @@ mod tests {
|
||||
#[test]
|
||||
fn parse_instance() {
|
||||
let mut cmd = NodeCommand::<()>::parse_from(["reth"]);
|
||||
cmd.adjust_instance_ports();
|
||||
cmd.rpc.adjust_instance_ports(cmd.instance);
|
||||
cmd.network.port = DEFAULT_DISCOVERY_PORT + cmd.instance - 1;
|
||||
// check rpc port numbers
|
||||
assert_eq!(cmd.rpc.auth_port, 8551);
|
||||
@ -1159,7 +373,7 @@ mod tests {
|
||||
assert_eq!(cmd.network.port, 30303);
|
||||
|
||||
let mut cmd = NodeCommand::<()>::parse_from(["reth", "--instance", "2"]);
|
||||
cmd.adjust_instance_ports();
|
||||
cmd.rpc.adjust_instance_ports(cmd.instance);
|
||||
cmd.network.port = DEFAULT_DISCOVERY_PORT + cmd.instance - 1;
|
||||
// check rpc port numbers
|
||||
assert_eq!(cmd.rpc.auth_port, 8651);
|
||||
@ -1169,7 +383,7 @@ mod tests {
|
||||
assert_eq!(cmd.network.port, 30304);
|
||||
|
||||
let mut cmd = NodeCommand::<()>::parse_from(["reth", "--instance", "3"]);
|
||||
cmd.adjust_instance_ports();
|
||||
cmd.rpc.adjust_instance_ports(cmd.instance);
|
||||
cmd.network.port = DEFAULT_DISCOVERY_PORT + cmd.instance - 1;
|
||||
// check rpc port numbers
|
||||
assert_eq!(cmd.rpc.auth_port, 8751);
|
||||
|
||||
@ -15,9 +15,11 @@ use reth_interfaces::p2p::{
|
||||
headers::client::{HeadersClient, HeadersRequest},
|
||||
priority::Priority,
|
||||
};
|
||||
use reth_network::NetworkManager;
|
||||
use reth_primitives::{
|
||||
fs, BlockHashOrNumber, ChainSpec, HeadersDirection, SealedBlock, SealedHeader,
|
||||
};
|
||||
use reth_provider::BlockReader;
|
||||
use reth_rpc::{JwtError, JwtSecret};
|
||||
use std::{
|
||||
env::VarError,
|
||||
@ -25,7 +27,7 @@ use std::{
|
||||
rc::Rc,
|
||||
sync::Arc,
|
||||
};
|
||||
use tracing::{debug, info};
|
||||
use tracing::{debug, info, trace, warn};
|
||||
|
||||
/// Exposing `open_db_read_only` function
|
||||
pub mod db {
|
||||
@ -255,8 +257,8 @@ impl ListFilter {
|
||||
self.len = len;
|
||||
}
|
||||
}
|
||||
/// Attempts to retrieve or create a JWT secret from the specified path.
|
||||
|
||||
/// Attempts to retrieve or create a JWT secret from the specified path.
|
||||
pub fn get_or_create_jwt_secret_from_path(path: &Path) -> Result<JwtSecret, JwtError> {
|
||||
if path.exists() {
|
||||
debug!(target: "reth::cli", ?path, "Reading JWT auth secret file");
|
||||
@ -266,3 +268,26 @@ pub fn get_or_create_jwt_secret_from_path(path: &Path) -> Result<JwtSecret, JwtE
|
||||
JwtSecret::try_create(path)
|
||||
}
|
||||
}
|
||||
|
||||
/// Collect the peers from the [NetworkManager] and write them to the given `persistent_peers_file`,
|
||||
/// if configured.
|
||||
pub fn write_peers_to_file<C>(network: &NetworkManager<C>, persistent_peers_file: Option<PathBuf>)
|
||||
where
|
||||
C: BlockReader + Unpin,
|
||||
{
|
||||
if let Some(file_path) = persistent_peers_file {
|
||||
let known_peers = network.all_peers().collect::<Vec<_>>();
|
||||
if let Ok(known_peers) = serde_json::to_string_pretty(&known_peers) {
|
||||
trace!(target: "reth::cli", peers_file =?file_path, num_peers=%known_peers.len(), "Saving current peers");
|
||||
let parent_dir = file_path.parent().map(fs::create_dir_all).transpose();
|
||||
match parent_dir.and_then(|_| fs::write(&file_path, known_peers)) {
|
||||
Ok(_) => {
|
||||
info!(target: "reth::cli", peers_file=?file_path, "Wrote network peers to file");
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(target: "reth::cli", ?err, peers_file=?file_path, "Failed to write network peers to file");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -33,7 +33,7 @@ use crate::{
|
||||
transactions::NetworkTransactionEvent,
|
||||
FetchClient, NetworkBuilder,
|
||||
};
|
||||
use futures::{Future, StreamExt};
|
||||
use futures::{pin_mut, Future, StreamExt};
|
||||
use parking_lot::Mutex;
|
||||
use reth_eth_wire::{
|
||||
capability::{Capabilities, CapabilityMessage},
|
||||
@ -45,6 +45,7 @@ use reth_network_api::ReputationChangeKind;
|
||||
use reth_primitives::{ForkId, NodeRecord, PeerId, B256};
|
||||
use reth_provider::{BlockNumReader, BlockReader};
|
||||
use reth_rpc_types::{EthProtocolInfo, NetworkStatus};
|
||||
use reth_tasks::shutdown::GracefulShutdown;
|
||||
use reth_tokio_util::EventListeners;
|
||||
use secp256k1::SecretKey;
|
||||
use std::{
|
||||
@ -596,6 +597,34 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> NetworkManager<C>
|
||||
where
|
||||
C: BlockReader + Unpin,
|
||||
{
|
||||
/// Drives the [NetworkManager] future until a [GracefulShutdown] signal is received.
|
||||
///
|
||||
/// This also run the given function `shutdown_hook` afterwards.
|
||||
pub async fn run_until_graceful_shutdown(
|
||||
self,
|
||||
shutdown: GracefulShutdown,
|
||||
shutdown_hook: impl FnOnce(&mut Self),
|
||||
) {
|
||||
let network = self;
|
||||
pin_mut!(network, shutdown);
|
||||
|
||||
let mut graceful_guard = None;
|
||||
tokio::select! {
|
||||
_ = &mut network => {},
|
||||
guard = shutdown => {
|
||||
graceful_guard = Some(guard);
|
||||
},
|
||||
}
|
||||
|
||||
shutdown_hook(&mut network);
|
||||
drop(graceful_guard);
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> Future for NetworkManager<C>
|
||||
where
|
||||
C: BlockReader + Unpin,
|
||||
|
||||
@ -226,9 +226,15 @@ pub mod test_utils {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a temporary directory path to use for the database
|
||||
pub fn tempdir_path() -> PathBuf {
|
||||
let builder = tempfile::Builder::new().prefix("reth-test-").rand_bytes(8).tempdir();
|
||||
builder.expect(ERROR_TEMPDIR).into_path()
|
||||
}
|
||||
|
||||
/// Create read/write database for testing
|
||||
pub fn create_test_rw_db() -> Arc<TempDatabase<DatabaseEnv>> {
|
||||
let path = tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path();
|
||||
let path = tempdir_path();
|
||||
let emsg = format!("{}: {:?}", ERROR_DB_CREATION, path);
|
||||
|
||||
let db = init_db(&path, None).expect(&emsg);
|
||||
@ -245,7 +251,7 @@ pub mod test_utils {
|
||||
|
||||
/// Create read only database for testing
|
||||
pub fn create_test_ro_db() -> Arc<TempDatabase<DatabaseEnv>> {
|
||||
let path = tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path();
|
||||
let path = tempdir_path();
|
||||
{
|
||||
init_db(path.as_path(), None).expect(ERROR_DB_CREATION);
|
||||
}
|
||||
|
||||
@ -33,7 +33,8 @@ pub type BoxedLayer<S> = Box<dyn Layer<S> + Send + Sync>;
|
||||
|
||||
/// Initializes a new [Subscriber] based on the given layers.
|
||||
pub fn init(layers: Vec<BoxedLayer<Registry>>) {
|
||||
tracing_subscriber::registry().with(layers).init();
|
||||
// To avoid panicking in tests, we silently fail if we cannot initialize the subscriber.
|
||||
let _ = tracing_subscriber::registry().with(layers).try_init();
|
||||
}
|
||||
|
||||
/// Builds a new tracing layer that writes to stdout.
|
||||
|
||||
Reference in New Issue
Block a user