chore: reuse build_import_pipeline for OP (#13529)

This commit is contained in:
Arsenii Kulikov
2024-12-24 02:39:29 +04:00
committed by GitHub
parent 8579c08d63
commit 5ea8f31896
5 changed files with 8 additions and 117 deletions

View File

@ -36,11 +36,7 @@ reth-optimism-primitives.workspace = true
reth-optimism-chainspec.workspace = true
reth-chainspec.workspace = true
reth-stages-types.workspace = true
reth-node-events.workspace = true
reth-network-p2p.workspace = true
reth-errors.workspace = true
reth-config.workspace = true
reth-optimism-evm.workspace = true
reth-cli.workspace = true
reth-cli-runner.workspace = true

View File

@ -1,101 +0,0 @@
use alloy_primitives::B256;
use futures_util::{Stream, StreamExt};
use reth_cli_commands::common::CliNodeTypes;
use reth_config::Config;
use reth_consensus::Consensus;
use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder, file_client::FileClient,
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_errors::ProviderError;
use reth_network_p2p::{
bodies::downloader::BodyDownloader,
headers::downloader::{HeaderDownloader, SyncTarget},
};
use reth_node_events::node::NodeEvent;
use reth_optimism_chainspec::OpChainSpec;
use reth_optimism_evm::OpExecutorProvider;
use reth_provider::{
providers::ProviderNodeTypes, BlockNumReader, ChainSpecProvider, HeaderProvider,
ProviderFactory,
};
use reth_prune::PruneModes;
use reth_stages::{sets::DefaultStages, Pipeline, StageSet};
use reth_stages_types::StageId;
use reth_static_file::StaticFileProducer;
use std::sync::Arc;
use tokio::sync::watch;
/// Builds import pipeline.
///
/// If configured to execute, all stages will run. Otherwise, only stages that don't require state
/// will run.
pub(crate) async fn build_import_pipeline<N, C>(
config: &Config,
provider_factory: ProviderFactory<N>,
consensus: &Arc<C>,
file_client: Arc<FileClient>,
static_file_producer: StaticFileProducer<ProviderFactory<N>>,
disable_exec: bool,
) -> eyre::Result<(Pipeline<N>, impl Stream<Item = NodeEvent>)>
where
N: CliNodeTypes + ProviderNodeTypes<ChainSpec = OpChainSpec>,
C: Consensus + 'static,
{
if !file_client.has_canonical_blocks() {
eyre::bail!("unable to import non canonical blocks");
}
// Retrieve latest header found in the database.
let last_block_number = provider_factory.last_block_number()?;
let local_head = provider_factory
.sealed_header(last_block_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(last_block_number.into()))?;
let mut header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers)
.build(file_client.clone(), consensus.clone())
.into_task();
// TODO: The pipeline should correctly configure the downloader on its own.
// Find the possibility to remove unnecessary pre-configuration.
header_downloader.update_local_head(local_head);
header_downloader.update_sync_target(SyncTarget::Tip(file_client.tip().unwrap()));
let mut body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies)
.build(file_client.clone(), consensus.clone(), provider_factory.clone())
.into_task();
// TODO: The pipeline should correctly configure the downloader on its own.
// Find the possibility to remove unnecessary pre-configuration.
body_downloader
.set_download_range(file_client.min_block().unwrap()..=file_client.max_block().unwrap())
.expect("failed to set download range");
let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
let executor = OpExecutorProvider::optimism(provider_factory.chain_spec());
let max_block = file_client.max_block().unwrap_or(0);
let pipeline = Pipeline::<N>::builder()
.with_tip_sender(tip_tx)
// we want to sync all blocks the file client provides or 0 if empty
.with_max_block(max_block)
.with_fail_on_unwind(true)
.add_stages(
DefaultStages::new(
provider_factory.clone(),
tip_rx,
consensus.clone(),
header_downloader,
body_downloader,
executor,
config.stages.clone(),
PruneModes::default(),
)
.builder()
.disable_all_if(&StageId::STATE_REQUIRED, || disable_exec),
)
.build(provider_factory, static_file_producer);
let events = pipeline.events().map(Into::into);
Ok((pipeline, events))
}

View File

@ -2,7 +2,10 @@
//! file.
use clap::Parser;
use reth_cli::chainspec::ChainSpecParser;
use reth_cli_commands::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
use reth_cli_commands::{
common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs},
import::build_import_pipeline,
};
use reth_consensus::noop::NoopConsensus;
use reth_db::tables;
use reth_db_api::transaction::DbTx;
@ -11,16 +14,15 @@ use reth_downloaders::file_client::{
};
use reth_node_core::version::SHORT_VERSION;
use reth_optimism_chainspec::OpChainSpec;
use reth_optimism_evm::OpExecutorProvider;
use reth_optimism_primitives::bedrock::is_dup_tx;
use reth_provider::StageCheckpointReader;
use reth_provider::{ChainSpecProvider, StageCheckpointReader};
use reth_prune::PruneModes;
use reth_stages::StageId;
use reth_static_file::StaticFileProducer;
use std::{path::PathBuf, sync::Arc};
use tracing::{debug, error, info};
use crate::commands::build_pipeline::build_import_pipeline;
/// Syncs RLP encoded blocks from a file.
#[derive(Debug, Parser)]
pub struct ImportOpCommand<C: ChainSpecParser> {
@ -94,8 +96,8 @@ impl<C: ChainSpecParser<ChainSpec = OpChainSpec>> ImportOpCommand<C> {
Arc::new(file_client),
StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
true,
)
.await?;
OpExecutorProvider::optimism(provider_factory.chain_spec()),
)?;
// override the tip
pipeline.set_tip(tip);

View File

@ -10,8 +10,6 @@ use reth_cli_commands::{
};
use std::fmt;
/// Helper function to build an import pipeline.
mod build_pipeline;
pub mod import;
pub mod import_receipts;
pub mod init_state;