chore: use downloader and client traits where possible (#1992)

This commit is contained in:
Matthias Seitz
2023-03-27 19:45:17 +02:00
committed by GitHub
parent 81183b274f
commit 7263c9a644
8 changed files with 72 additions and 56 deletions

View File

@ -32,15 +32,16 @@ use reth_executor::{
use reth_interfaces::{
consensus::{Consensus, ForkchoiceState},
p2p::{
bodies::downloader::BodyDownloader,
headers::{client::StatusUpdater, downloader::HeaderDownloader},
bodies::{client::BodiesClient, downloader::BodyDownloader},
headers::{
client::{HeadersClient, StatusUpdater},
downloader::HeaderDownloader,
},
},
sync::SyncStateUpdater,
test_utils::TestChainEventSubscriptions,
};
use reth_network::{
error::NetworkError, FetchClient, NetworkConfig, NetworkHandle, NetworkManager,
};
use reth_network::{error::NetworkError, NetworkConfig, NetworkHandle, NetworkManager};
use reth_network_api::NetworkInfo;
use reth_primitives::{BlockHashOrNumber, ChainSpec, Head, Header, SealedHeader, H256};
use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase};
@ -146,8 +147,7 @@ impl Command {
init_genesis(db.clone(), self.chain.clone())?;
let consensus = Arc::new(BeaconConsensus::new(self.chain.clone())) as Arc<dyn Consensus>;
info!(target: "reth::cli", "Consensus engine initialized");
let consensus: Arc<dyn Consensus> = Arc::new(BeaconConsensus::new(Arc::clone(&self.chain)));
self.init_trusted_nodes(&mut config);
@ -213,14 +213,14 @@ impl Command {
};
let engine_api_handle =
self.init_engine_api(Arc::clone(&db), consensus_engine_tx, &ctx.task_executor);
self.init_engine_api(Arc::clone(&db), consensus_engine_tx.clone(), &ctx.task_executor);
info!(target: "reth::cli", "Engine API handler initialized");
let _auth_server = self
.rpc
.start_auth_server(
shareable_db,
transaction_pool,
shareable_db.clone(),
transaction_pool.clone(),
network.clone(),
ctx.task_executor.clone(),
self.chain.clone(),
@ -229,17 +229,20 @@ impl Command {
.await?;
info!(target: "reth::cli", "Started Auth server");
let client = network.fetch_client().await?;
let (pipeline, events) = self
.build_networked_pipeline(
&mut config,
network.clone(),
&consensus,
client,
Arc::clone(&consensus),
db.clone(),
&ctx.task_executor,
)
.await?;
ctx.task_executor.spawn(events::handle_events(Some(network.clone()), events));
ctx.task_executor
.spawn_critical("events task", events::handle_events(Some(network.clone()), events));
let beacon_consensus_engine = self.build_consensus_engine(
db.clone(),
@ -248,6 +251,7 @@ impl Command {
pipeline,
consensus_engine_rx,
)?;
info!(target: "reth::cli", "Consensus engine initialized");
// Run consensus engine
let (rx, tx) = tokio::sync::oneshot::channel();
@ -270,33 +274,35 @@ impl Command {
}
}
async fn build_networked_pipeline(
/// Constructs a [Pipeline] that's wired to the network
async fn build_networked_pipeline<Client>(
&self,
config: &mut Config,
network: NetworkHandle,
consensus: &Arc<dyn Consensus>,
client: Client,
consensus: Arc<dyn Consensus>,
db: Arc<Env<WriteMap>>,
task_executor: &TaskExecutor,
) -> eyre::Result<(Pipeline<Env<WriteMap>, impl SyncStateUpdater>, impl Stream<Item = NodeEvent>)>
) -> eyre::Result<(Pipeline<Env<WriteMap>, NetworkHandle>, impl Stream<Item = NodeEvent>)>
where
Client: HeadersClient + BodiesClient + Clone + 'static,
{
let fetch_client = network.fetch_client().await?;
let max_block = if let Some(block) = self.debug.max_block {
Some(block)
} else if let Some(tip) = self.debug.tip {
Some(self.lookup_or_fetch_tip(db.clone(), fetch_client.clone(), tip).await?)
Some(self.lookup_or_fetch_tip(db.clone(), &client, tip).await?)
} else {
None
};
// TODO: remove Arc requirement from downloader builders.
// building network downloaders using the fetch client
let fetch_client = Arc::new(fetch_client);
let header_downloader = ReverseHeadersDownloaderBuilder::from(config.stages.headers)
.build(fetch_client.clone(), consensus.clone())
.build(client.clone(), Arc::clone(&consensus))
.into_task_with(task_executor);
let body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies)
.build(fetch_client.clone(), consensus.clone(), db.clone())
.build(client, Arc::clone(&consensus), db.clone())
.into_task_with(task_executor);
let mut pipeline = self
@ -445,24 +451,30 @@ impl Command {
/// If it doesn't exist, download the header and return the block number.
///
/// NOTE: The download is attempted with infinite retries.
async fn lookup_or_fetch_tip(
async fn lookup_or_fetch_tip<Client>(
&self,
db: Arc<Env<WriteMap>>,
fetch_client: FetchClient,
client: Client,
tip: H256,
) -> Result<u64, reth_interfaces::Error> {
Ok(self.fetch_tip(db, fetch_client, BlockHashOrNumber::Hash(tip)).await?.number)
) -> Result<u64, reth_interfaces::Error>
where
Client: HeadersClient,
{
Ok(self.fetch_tip(db, client, BlockHashOrNumber::Hash(tip)).await?.number)
}
/// Attempt to look up the block with the given number and return the header.
///
/// NOTE: The download is attempted with infinite retries.
async fn fetch_tip(
async fn fetch_tip<Client>(
&self,
db: Arc<Env<WriteMap>>,
fetch_client: FetchClient,
client: Client,
tip: BlockHashOrNumber,
) -> Result<SealedHeader, reth_interfaces::Error> {
) -> Result<SealedHeader, reth_interfaces::Error>
where
Client: HeadersClient,
{
let header = db.view(|tx| -> Result<Option<Header>, reth_db::Error> {
let number = match tip {
BlockHashOrNumber::Hash(hash) => tx.get::<tables::HeaderNumbers>(hash)?,
@ -479,7 +491,7 @@ impl Command {
info!(target: "reth::cli", ?tip, "Fetching tip block from the network.");
loop {
match get_single_header(fetch_client.clone(), tip).await {
match get_single_header(&client, tip).await {
Ok(tip_header) => {
info!(target: "reth::cli", ?tip, "Successfully fetched tip");
return Ok(tip_header)
@ -521,7 +533,7 @@ impl Command {
header_downloader: H,
body_downloader: B,
updater: U,
consensus: &Arc<dyn Consensus>,
consensus: Arc<dyn Consensus>,
max_block: Option<u64>,
continuous: bool,
) -> eyre::Result<Pipeline<Env<WriteMap>, U>>
@ -566,14 +578,14 @@ impl Command {
.add_stages(
DefaultStages::new(
header_mode,
consensus.clone(),
Arc::clone(&consensus),
header_downloader,
body_downloader,
updater,
factory.clone(),
)
.set(
TotalDifficultyStage::new(consensus.clone())
TotalDifficultyStage::new(consensus)
.with_commit_threshold(stage_conf.total_difficulty.commit_threshold),
)
.set(SenderRecoveryStage {

View File

@ -9,23 +9,24 @@ use reth_db::{
};
use reth_interfaces::{
p2p::{
download::DownloadClient,
headers::client::{HeadersClient, HeadersRequest},
priority::Priority,
},
test_utils::generators::random_block_range,
};
use reth_network::FetchClient;
use reth_primitives::{BlockHashOrNumber, HeadersDirection, SealedHeader};
use reth_provider::insert_canonical_block;
use std::collections::BTreeMap;
use tracing::info;
/// Get a single header from network
pub async fn get_single_header(
client: FetchClient,
pub async fn get_single_header<Client>(
client: Client,
id: BlockHashOrNumber,
) -> eyre::Result<SealedHeader> {
) -> eyre::Result<SealedHeader>
where
Client: HeadersClient,
{
let request = HeadersRequest { direction: HeadersDirection::Rising, limit: 1, start: id };
let (peer_id, response) =