Mirror of https://github.com/hl-archive-node/nanoreth.git (synced 2025-12-06 10:59:55 +00:00)
refactor: give Pipeline a database (#2558)
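The refactor moves database ownership into the `Pipeline` itself: `PipelineBuilder::build` now takes the database handle, and `Pipeline::run`, `run_as_fut`, `run_loop`, `unwind`, and `register_metrics` no longer thread a `db` argument through every call. A minimal sketch of the new calling convention (illustrative only, not part of the diff below; it reuses the helpers `create_test_rw_db`, `Factory`, `MAINNET`, and `OfflineStages` that appear in the updated doctests, and would run inside a tokio runtime):

    use std::sync::Arc;
    use reth_db::mdbx::test_utils::create_test_rw_db;
    use reth_primitives::MAINNET;
    use reth_revm::Factory;
    use reth_stages::{sets::OfflineStages, Pipeline, PipelineError};

    async fn sync_offline() -> Result<(), PipelineError> {
        let factory = Factory::new(Arc::new(MAINNET.clone()));
        let db = create_test_rw_db();

        // Before: `Pipeline::builder()...build()` and then `pipeline.run(db).await`.
        // After: the database is handed to the builder once, at build time.
        let mut pipeline = Pipeline::builder()
            .add_stages(OfflineStages::new(factory))
            .build(db);
        pipeline.run().await
    }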
@@ -6,7 +6,8 @@ use clap::{crate_version, Parser};
 use eyre::Context;
 use futures::{Stream, StreamExt};
 use reth_beacon_consensus::BeaconConsensus;
 use reth_db::mdbx::{Env, WriteMap};
+use reth_db::database::Database;
 use reth_downloaders::{
     bodies::bodies::BodiesDownloaderBuilder,
     headers::reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::FileClient,
@@ -104,7 +105,7 @@ impl ImportCommand {
         info!(target: "reth::cli", "Chain file imported");

         let (mut pipeline, events) =
-            self.build_import_pipeline(config, db.clone(), &consensus, file_client).await?;
+            self.build_import_pipeline(config, db, &consensus, file_client).await?;

         // override the tip
         pipeline.set_tip(tip);
@@ -115,7 +116,7 @@ impl ImportCommand {
         // Run pipeline
         info!(target: "reth::cli", "Starting sync pipeline");
         tokio::select! {
-            res = pipeline.run(db.clone()) => res?,
+            res = pipeline.run() => res?,
             _ = tokio::signal::ctrl_c() => {},
         };

@@ -123,14 +124,15 @@ impl ImportCommand {
         Ok(())
     }

-    async fn build_import_pipeline<C>(
+    async fn build_import_pipeline<DB, C>(
         &self,
         config: Config,
-        db: Arc<Env<WriteMap>>,
+        db: DB,
         consensus: &Arc<C>,
         file_client: Arc<FileClient>,
-    ) -> eyre::Result<(Pipeline<Env<WriteMap>>, impl Stream<Item = NodeEvent>)>
+    ) -> eyre::Result<(Pipeline<DB>, impl Stream<Item = NodeEvent>)>
     where
+        DB: Database + Clone + Unpin + 'static,
         C: Consensus + 'static,
     {
         if !file_client.has_canonical_blocks() {
@@ -142,7 +144,7 @@ impl ImportCommand {
             .into_task();

         let body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies)
-            .build(file_client.clone(), consensus.clone(), db)
+            .build(file_client.clone(), consensus.clone(), db.clone())
             .into_task();

         let (tip_tx, tip_rx) = watch::channel(H256::zero());
@@ -171,7 +173,7 @@ impl ImportCommand {
                     })
                     .set(ExecutionStage::new(factory, config.stages.execution.commit_threshold)),
             )
-            .build();
+            .build(db);

         let events = pipeline.events().map(Into::into);

@@ -412,22 +412,23 @@ impl Command {
     }

     /// Constructs a [Pipeline] that's wired to the network
-    async fn build_networked_pipeline<Client>(
+    async fn build_networked_pipeline<DB, Client>(
         &self,
         config: &mut Config,
         network: NetworkHandle,
         client: Client,
         consensus: Arc<dyn Consensus>,
-        db: Arc<Env<WriteMap>>,
+        db: DB,
         task_executor: &TaskExecutor,
-    ) -> eyre::Result<Pipeline<Env<WriteMap>>>
+    ) -> eyre::Result<Pipeline<DB>>
     where
+        DB: Database + Unpin + Clone + 'static,
         Client: HeadersClient + BodiesClient + Clone + 'static,
     {
         let max_block = if let Some(block) = self.debug.max_block {
             Some(block)
         } else if let Some(tip) = self.debug.tip {
-            Some(self.lookup_or_fetch_tip(db.clone(), &client, tip).await?)
+            Some(self.lookup_or_fetch_tip(&db, &client, tip).await?)
         } else {
             None
         };
@@ -443,6 +444,7 @@

         let pipeline = self
             .build_pipeline(
+                db,
                 config,
                 header_downloader,
                 body_downloader,
@@ -541,13 +543,14 @@ impl Command {
     /// If it doesn't exist, download the header and return the block number.
     ///
     /// NOTE: The download is attempted with infinite retries.
-    async fn lookup_or_fetch_tip<Client>(
+    async fn lookup_or_fetch_tip<DB, Client>(
         &self,
-        db: Arc<Env<WriteMap>>,
+        db: &DB,
         client: Client,
         tip: H256,
     ) -> Result<u64, reth_interfaces::Error>
     where
+        DB: Database,
         Client: HeadersClient,
     {
         Ok(self.fetch_tip(db, client, BlockHashOrNumber::Hash(tip)).await?.number)
@@ -556,13 +559,14 @@ impl Command {
     /// Attempt to look up the block with the given number and return the header.
     ///
     /// NOTE: The download is attempted with infinite retries.
-    async fn fetch_tip<Client>(
+    async fn fetch_tip<DB, Client>(
         &self,
-        db: Arc<Env<WriteMap>>,
+        db: &DB,
         client: Client,
         tip: BlockHashOrNumber,
     ) -> Result<SealedHeader, reth_interfaces::Error>
     where
+        DB: Database,
         Client: HeadersClient,
     {
         let header = db.view(|tx| -> Result<Option<Header>, reth_db::Error> {
@@ -619,8 +623,9 @@ impl Command {
     }

     #[allow(clippy::too_many_arguments)]
-    async fn build_pipeline<H, B, U>(
+    async fn build_pipeline<DB, H, B, U>(
         &self,
+        db: DB,
         config: &Config,
         header_downloader: H,
         body_downloader: B,
@@ -628,8 +633,9 @@ impl Command {
         consensus: Arc<dyn Consensus>,
         max_block: Option<u64>,
         continuous: bool,
-    ) -> eyre::Result<Pipeline<Env<WriteMap>>>
+    ) -> eyre::Result<Pipeline<DB>>
     where
+        DB: Database + Clone + 'static,
         H: HeaderDownloader + 'static,
         B: BodyDownloader + 'static,
         U: SyncStateUpdater + StatusUpdater + Clone + 'static,
@@ -687,7 +693,7 @@ impl Command {
                     .disable_if(MERKLE_UNWIND, || self.auto_mine)
                     .disable_if(MERKLE_EXECUTION, || self.auto_mine),
             )
-            .build();
+            .build(db);

         Ok(pipeline)
     }

@@ -20,7 +20,6 @@ use reth_tasks::TaskSpawner;
 use schnellru::{ByLength, LruMap};
 use std::{
     pin::Pin,
-    sync::Arc,
     task::{Context, Poll},
 };
 use tokio::sync::{
@@ -141,7 +140,7 @@
     BT: BlockchainTreeEngine,
 {
     /// The database handle.
-    db: Arc<DB>,
+    db: DB,
     /// Task spawner for spawning the pipeline.
     task_spawner: TS,
     /// The current state of the pipeline.
@@ -184,7 +183,7 @@
 {
     /// Create a new instance of the [BeaconConsensusEngine].
     pub fn new(
-        db: Arc<DB>,
+        db: DB,
         task_spawner: TS,
         pipeline: Pipeline<DB>,
         blockchain_tree: BT,
@@ -210,7 +209,7 @@
     /// the [BeaconEngineMessage] communication channel.
     #[allow(clippy::too_many_arguments)]
     pub fn with_channel(
-        db: Arc<DB>,
+        db: DB,
         task_spawner: TS,
         pipeline: Pipeline<DB>,
         blockchain_tree: BT,
@@ -570,11 +569,10 @@
         self.metrics.pipeline_runs.increment(1);
         trace!(target: "consensus::engine", ?tip, continuous = tip.is_none(), "Starting the pipeline");
         let (tx, rx) = oneshot::channel();
-        let db = self.db.clone();
         self.task_spawner.spawn_critical_blocking(
             "pipeline",
             Box::pin(async move {
-                let result = pipeline.run_as_fut(db, tip).await;
+                let result = pipeline.run_as_fut(tip).await;
                 let _ = tx.send(result);
             }),
         );
@@ -768,7 +766,7 @@ enum PipelineTarget {
     Safe,
 }

-/// Keeps track of invalid headerst.
+/// Keeps track of invalid headers.
 struct InvalidHeaderCache {
     headers: LruMap<H256, Header>,
 }
@@ -807,20 +805,20 @@ mod tests {
     use reth_provider::{test_utils::TestExecutorFactory, Transaction};
     use reth_stages::{test_utils::TestStages, ExecOutput, PipelineError, StageError};
     use reth_tasks::TokioTaskExecutor;
-    use std::{collections::VecDeque, time::Duration};
+    use std::{collections::VecDeque, sync::Arc, time::Duration};
     use tokio::sync::{
         oneshot::{self, error::TryRecvError},
         watch,
     };

     type TestBeaconConsensusEngine = BeaconConsensusEngine<
-        Env<WriteMap>,
+        Arc<Env<WriteMap>>,
         TokioTaskExecutor,
         ShareableBlockchainTree<Arc<Env<WriteMap>>, TestConsensus, TestExecutorFactory>,
     >;

     struct TestEnv<DB> {
-        db: Arc<DB>,
+        db: DB,
         // Keep the tip receiver around, so it's not dropped.
         #[allow(dead_code)]
         tip_rx: watch::Receiver<H256>,
@@ -829,7 +827,7 @@ mod tests {

     impl<DB> TestEnv<DB> {
         fn new(
-            db: Arc<DB>,
+            db: DB,
             tip_rx: watch::Receiver<H256>,
             engine_handle: BeaconConsensusEngineHandle,
         ) -> Self {
@@ -883,7 +881,7 @@ mod tests {
         chain_spec: Arc<ChainSpec>,
         pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
         executor_results: Vec<PostState>,
-    ) -> (TestBeaconConsensusEngine, TestEnv<Env<WriteMap>>) {
+    ) -> (TestBeaconConsensusEngine, TestEnv<Arc<Env<WriteMap>>>) {
         reth_tracing::init_test_tracing();
         let db = create_test_rw_db();
         let consensus = TestConsensus::default();
@@ -897,7 +895,7 @@ mod tests {
         let pipeline = Pipeline::builder()
             .add_stages(TestStages::new(pipeline_exec_outputs, Default::default()))
             .with_tip_sender(tip_tx)
-            .build();
+            .build(db.clone());

         // Setup blockchain tree
         let externals = TreeExternals::new(db.clone(), consensus, executor_factory, chain_spec);

@@ -16,7 +16,7 @@ pub trait SyncStateProvider: Send + Sync {
 /// Eventually the node reaches the `Finish` stage and will transition to [`SyncState::Idle`], at
 /// which point the node is considered fully synced.
 #[auto_impl::auto_impl(&, Arc, Box)]
-pub trait SyncStateUpdater: Send + Sync + 'static {
+pub trait SyncStateUpdater: std::fmt::Debug + Send + Sync + 'static {
     /// Notifies about an [SyncState] update.
     fn update_sync_state(&self, state: SyncState);
 }

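The new `std::fmt::Debug` supertrait exists so that holders of a `Box<dyn SyncStateUpdater>` (the `Pipeline` and `PipelineBuilder` further down in this diff) can include the updater in their own `Debug` output. A minimal sketch of an implementor under the new bound; `MetricsSyncStateUpdater` is a hypothetical name, not something introduced by this commit:

    use reth_interfaces::sync::{SyncState, SyncStateUpdater};

    /// Hypothetical updater; deriving `Debug` satisfies the new supertrait.
    #[derive(Debug, Default)]
    struct MetricsSyncStateUpdater;

    impl SyncStateUpdater for MetricsSyncStateUpdater {
        fn update_sync_state(&self, state: SyncState) {
            // A real implementation would record or forward the state change;
            // this sketch simply discards it.
            let _ = state;
        }
    }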
@@ -50,7 +50,7 @@ pub struct BodiesDownloader<B: BodiesClient, DB> {
     consensus: Arc<dyn Consensus>,
     // TODO: make this a [HeaderProvider]
     /// The database handle
-    db: Arc<DB>,
+    db: DB,
     /// The maximum number of non-empty blocks per one request
     request_limit: u64,
     /// The maximum number of block bodies returned at once from the stream
@@ -76,7 +76,7 @@ pub struct BodiesDownloader<B: BodiesClient, DB> {
 impl<B, DB> BodiesDownloader<B, DB>
 where
     B: BodiesClient + 'static,
-    DB: Database,
+    DB: Database + Unpin + 'static,
 {
     /// Returns the next contiguous request.
     fn next_headers_request(&mut self) -> DownloadResult<Option<Vec<SealedHeader>>> {
@@ -249,7 +249,7 @@ where
 impl<B, DB> BodiesDownloader<B, DB>
 where
     B: BodiesClient + 'static,
-    DB: Database,
+    DB: Database + Unpin + 'static,
     Self: BodyDownloader + 'static,
 {
     /// Spawns the downloader task via [tokio::task::spawn]
@@ -270,7 +270,7 @@ where
 impl<B, DB> BodyDownloader for BodiesDownloader<B, DB>
 where
     B: BodiesClient + 'static,
-    DB: Database,
+    DB: Database + Unpin + 'static,
 {
     /// Set a new download range (exclusive).
     ///
@@ -315,7 +315,7 @@ where
 impl<B, DB> Stream for BodiesDownloader<B, DB>
 where
     B: BodiesClient + 'static,
-    DB: Database,
+    DB: Database + Unpin + 'static,
 {
     type Item = BodyDownloaderResult;

@@ -497,7 +497,7 @@ impl BodiesDownloaderBuilder {
         self,
         client: B,
         consensus: Arc<dyn Consensus>,
-        db: Arc<DB>,
+        db: DB,
     ) -> BodiesDownloader<B, DB>
     where
         B: BodiesClient + 'static,

@@ -21,7 +21,6 @@
 //! ```
 //! # use std::sync::Arc;
 //! # use reth_db::mdbx::test_utils::create_test_rw_db;
-//! # use reth_db::mdbx::{Env, WriteMap};
 //! # use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
 //! # use reth_downloaders::headers::reverse_headers::ReverseHeadersDownloaderBuilder;
 //! # use reth_interfaces::consensus::Consensus;
@@ -37,22 +36,23 @@
 //! #     Arc::new(TestHeadersClient::default()),
 //! #     consensus.clone()
 //! # );
+//! # let db = create_test_rw_db();
 //! # let bodies_downloader = BodiesDownloaderBuilder::default().build(
 //! #     Arc::new(TestBodiesClient { responder: |_| Ok((PeerId::zero(), vec![]).into()) }),
 //! #     consensus.clone(),
-//! #     create_test_rw_db()
+//! #     db.clone()
 //! # );
 //! # let (tip_tx, tip_rx) = watch::channel(H256::default());
 //! # let factory = Factory::new(Arc::new(MAINNET.clone()));
 //! # let (status_updater, _) = TestStatusUpdater::new();
 //! // Create a pipeline that can fully sync
-//! # let pipeline: Pipeline<Env<WriteMap>> =
+//! # let pipeline =
 //! Pipeline::builder()
 //!     .with_tip_sender(tip_tx)
 //!     .add_stages(
 //!         DefaultStages::new(HeaderSyncMode::Tip(tip_rx), consensus, headers_downloader, bodies_downloader, status_updater, factory)
 //!     )
-//!     .build();
+//!     .build(db);
 //! ```
 mod error;
 mod id;

@@ -1,23 +1,25 @@
-use crate::{Pipeline, Stage, StageSet};
+use crate::{pipeline::BoxedStage, Pipeline, Stage, StageId, StageSet};
 use reth_db::database::Database;
-use reth_interfaces::sync::SyncStateUpdater;
+use reth_interfaces::sync::{NoopSyncStateUpdate, SyncStateUpdater};
 use reth_primitives::{BlockNumber, H256};
 use tokio::sync::watch;

 /// Builds a [`Pipeline`].
-#[derive(Debug)]
 #[must_use = "call `build` to construct the pipeline"]
 pub struct PipelineBuilder<DB>
 where
     DB: Database,
 {
-    pipeline: Pipeline<DB>,
-}
-
-impl<DB: Database> Default for PipelineBuilder<DB> {
-    fn default() -> Self {
-        Self { pipeline: Pipeline::default() }
-    }
+    /// All configured stages in the order they will be executed.
+    stages: Vec<BoxedStage<DB>>,
+    /// The maximum block number to sync to.
+    max_block: Option<BlockNumber>,
+    /// Used for emitting updates about whether the pipeline is running or not.
+    sync_state_updater: Box<dyn SyncStateUpdater>,
+    /// A receiver for the current chain tip to sync to
+    ///
+    /// Note: this is only used for debugging purposes.
+    tip_tx: Option<watch::Sender<H256>>,
 }

 impl<DB> PipelineBuilder<DB>
@@ -29,7 +31,7 @@ where
     where
         S: Stage<DB> + 'static,
     {
-        self.pipeline.stages.push(Box::new(stage));
+        self.stages.push(Box::new(stage));
         self
     }

@@ -42,7 +44,7 @@ where
     /// [`StageSetBuilder`][crate::StageSetBuilder].
     pub fn add_stages<Set: StageSet<DB>>(mut self, set: Set) -> Self {
         for stage in set.builder().build() {
-            self.pipeline.stages.push(stage);
+            self.stages.push(stage);
         }
         self
     }
@@ -51,24 +53,57 @@ where
     ///
     /// Once this block is reached, the pipeline will stop.
     pub fn with_max_block(mut self, block: BlockNumber) -> Self {
-        self.pipeline.max_block = Some(block);
+        self.max_block = Some(block);
         self
     }

     /// Set the tip sender.
     pub fn with_tip_sender(mut self, tip_tx: watch::Sender<H256>) -> Self {
-        self.pipeline.tip_tx = Some(tip_tx);
+        self.tip_tx = Some(tip_tx);
         self
     }

     /// Set a [SyncStateUpdater].
     pub fn with_sync_state_updater<U: SyncStateUpdater>(mut self, updater: U) -> Self {
-        self.pipeline.sync_state_updater = Box::new(updater);
+        self.sync_state_updater = Box::new(updater);
         self
     }

-    /// Builds the final [`Pipeline`].
-    pub fn build(self) -> Pipeline<DB> {
-        self.pipeline
+    /// Builds the final [`Pipeline`] using the given database.
+    ///
+    /// Note: it's expected that this is either an [Arc](std::sync::Arc) or an Arc wrapper type.
+    pub fn build(self, db: DB) -> Pipeline<DB> {
+        let Self { stages, max_block, sync_state_updater, tip_tx } = self;
+        Pipeline {
+            db,
+            stages,
+            max_block,
+            sync_state_updater,
+            tip_tx,
+            listeners: Default::default(),
+            progress: Default::default(),
+            metrics: Default::default(),
+        }
     }
 }
+
+impl<DB: Database> Default for PipelineBuilder<DB> {
+    fn default() -> Self {
+        Self {
+            stages: Vec::new(),
+            max_block: None,
+            sync_state_updater: Box::<NoopSyncStateUpdate>::default(),
+            tip_tx: None,
+        }
+    }
+}
+
+impl<DB: Database> std::fmt::Debug for PipelineBuilder<DB> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PipelineBuilder")
+            .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
+            .field("max_block", &self.max_block)
+            .field("sync_state_updater", &self.sync_state_updater)
+            .finish()
+    }
+}

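Because `build` moves the database into the `Pipeline`, while the downloaders and the consensus engine keep handles of their own, the doc note above expects the `DB` value to be an `Arc` (or an Arc-like wrapper) so cloning it stays cheap. The tests in this commit use `Arc<Env<WriteMap>>`, which is what `create_test_rw_db()` returns; a small illustrative sketch (the function name is made up):

    use std::sync::Arc;

    use reth_db::mdbx::{test_utils::create_test_rw_db, Env, WriteMap};
    use reth_stages::Pipeline;

    fn empty_pipeline() -> Pipeline<Arc<Env<WriteMap>>> {
        // `Arc<Env<WriteMap>>` implements `Database`, so one handle can be
        // cloned into the downloaders, the engine, and the pipeline itself.
        let db = create_test_rw_db();
        Pipeline::builder().with_max_block(10).build(db)
    }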
@@ -1,15 +1,10 @@
 use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput};
 use futures_util::Future;
 use reth_db::database::Database;
-use reth_interfaces::sync::{NoopSyncStateUpdate, SyncState, SyncStateUpdater};
+use reth_interfaces::sync::{SyncState, SyncStateUpdater};
 use reth_primitives::{listener::EventListeners, BlockNumber, H256};
 use reth_provider::Transaction;
-use std::{
-    fmt::{Debug, Formatter},
-    ops::Deref,
-    pin::Pin,
-    sync::Arc,
-};
+use std::{ops::Deref, pin::Pin};
 use tokio::sync::watch;
 use tokio_stream::wrappers::UnboundedReceiverStream;
 use tracing::*;
@@ -28,6 +23,16 @@ use progress::*;
 pub use set::*;
 use sync_metrics::*;

+/// A container for a queued stage.
+pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;
+
+/// The future that returns the owned pipeline and the result of the pipeline run. See
+/// [Pipeline::run_as_fut].
+pub type PipelineFut<DB> = Pin<Box<dyn Future<Output = PipelineWithResult<DB>> + Send>>;
+
+/// The pipeline type itself with the result of [Pipeline::run_as_fut]
+pub type PipelineWithResult<DB> = (Pipeline<DB>, Result<ControlFlow, PipelineError>);
+
 #[cfg_attr(doc, aquamarine::aquamarine)]
 /// A staged sync pipeline.
 ///
@@ -79,6 +84,8 @@ use sync_metrics::*;
 /// pipeline will unwind the stages in reverse order of execution. It is also possible to
 /// request an unwind manually (see [Pipeline::unwind]).
 pub struct Pipeline<DB: Database> {
+    /// The Database
+    db: DB,
     /// All configured stages in the order they will be executed.
     stages: Vec<BoxedStage<DB>>,
     /// The maximum block number to sync to.
@@ -96,36 +103,6 @@ pub struct Pipeline<DB: Database> {
     metrics: Metrics,
 }

-/// The future that returns the owned pipeline and the result of the pipeline run. See
-/// [Pipeline::run_as_fut].
-pub type PipelineFut<DB> = Pin<Box<dyn Future<Output = PipelineWithResult<DB>> + Send>>;
-
-/// The pipeline type itself with the result of [Pipeline::run_as_fut]
-pub type PipelineWithResult<DB> = (Pipeline<DB>, Result<ControlFlow, PipelineError>);
-
-impl<DB: Database> Default for Pipeline<DB> {
-    fn default() -> Self {
-        Self {
-            stages: Vec::new(),
-            max_block: None,
-            listeners: EventListeners::default(),
-            sync_state_updater: Box::<NoopSyncStateUpdate>::default(),
-            progress: PipelineProgress::default(),
-            tip_tx: None,
-            metrics: Metrics::default(),
-        }
-    }
-}
-
-impl<DB: Database> Debug for Pipeline<DB> {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("Pipeline")
-            .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
-            .field("max_block", &self.max_block)
-            .finish()
-    }
-}
-
 impl<DB> Pipeline<DB>
 where
     DB: Database + 'static,
@@ -154,12 +131,13 @@ where
     }

     /// Registers progress metrics for each registered stage
-    pub fn register_metrics(&mut self, db: Arc<DB>) {
+    pub fn register_metrics(&mut self) {
         for stage in &self.stages {
             let stage_id = stage.id();
             self.metrics.stage_checkpoint(
                 stage_id,
-                db.view(|tx| stage_id.get_progress(tx).ok().flatten().unwrap_or_default())
+                self.db
+                    .view(|tx| stage_id.get_progress(tx).ok().flatten().unwrap_or_default())
                     .ok()
                     .unwrap_or_default(),
             );
@@ -169,16 +147,16 @@ where
     /// Consume the pipeline and run it until it reaches the provided tip, if set. Return the
     /// pipeline and its result as a future.
     #[track_caller]
-    pub fn run_as_fut(mut self, db: Arc<DB>, tip: Option<H256>) -> PipelineFut<DB> {
+    pub fn run_as_fut(mut self, tip: Option<H256>) -> PipelineFut<DB> {
         // TODO: fix this in a follow up PR. ideally, consensus engine would be responsible for
         // updating metrics.
-        self.register_metrics(db.clone());
+        self.register_metrics();
         Box::pin(async move {
             // NOTE: the tip should only be None if we are in continuous sync mode.
             if let Some(tip) = tip {
                 self.set_tip(tip);
             }
-            let result = self.run_loop(db).await;
+            let result = self.run_loop().await;
             trace!(target: "sync::pipeline", ?tip, ?result, "Pipeline finished");
             (self, result)
         })
@@ -186,11 +164,11 @@ where

     /// Run the pipeline in an infinite loop. Will terminate early if the user has specified
     /// a `max_block` in the pipeline.
-    pub async fn run(&mut self, db: Arc<DB>) -> Result<(), PipelineError> {
-        self.register_metrics(db.clone());
+    pub async fn run(&mut self) -> Result<(), PipelineError> {
+        self.register_metrics();

         loop {
-            let next_action = self.run_loop(db.clone()).await?;
+            let next_action = self.run_loop().await?;

             // Terminate the loop early if it's reached the maximum user
             // configured block.
@@ -218,7 +196,7 @@ where
     /// If any stage is unsuccessful at execution, we proceed to
     /// unwind. This will undo the progress across the entire pipeline
     /// up to the block that caused the error.
-    async fn run_loop(&mut self, db: Arc<DB>) -> Result<ControlFlow, PipelineError> {
+    async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
         let mut previous_stage = None;
         for stage_index in 0..self.stages.len() {
             let stage = &self.stages[stage_index];
@@ -233,7 +211,7 @@ where

             trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
             let next = self
-                .execute_stage_to_completion(db.as_ref(), previous_stage, stage_index)
+                .execute_stage_to_completion(previous_stage, stage_index)
                 .instrument(info_span!("execute", stage = %stage_id))
                 .await?;

@@ -249,13 +227,15 @@ where
                 ControlFlow::Unwind { target, bad_block } => {
                     // reset the sync state
                     self.sync_state_updater.update_sync_state(SyncState::Syncing);
-                    self.unwind(db.as_ref(), target, bad_block).await?;
+                    self.unwind(target, bad_block).await?;
                     return Ok(ControlFlow::Unwind { target, bad_block })
                 }
             }

-            previous_stage =
-                Some((stage_id, db.view(|tx| stage_id.get_progress(tx))??.unwrap_or_default()));
+            previous_stage = Some((
+                stage_id,
+                self.db.view(|tx| stage_id.get_progress(tx))??.unwrap_or_default(),
+            ));
         }

         Ok(self.progress.next_ctrl())
@@ -266,14 +246,13 @@ where
     /// If the unwind is due to a bad block the number of that block should be specified.
     pub async fn unwind(
         &mut self,
-        db: &DB,
         to: BlockNumber,
         bad_block: Option<BlockNumber>,
     ) -> Result<(), PipelineError> {
         // Unwind stages in reverse order of execution
         let unwind_pipeline = self.stages.iter_mut().rev();

-        let mut tx = Transaction::new(db)?;
+        let mut tx = Transaction::new(&self.db)?;

         for stage in unwind_pipeline {
             let stage_id = stage.id();
@@ -316,7 +295,6 @@ where

     async fn execute_stage_to_completion(
         &mut self,
-        db: &DB,
         previous_stage: Option<(StageId, BlockNumber)>,
         stage_index: usize,
     ) -> Result<ControlFlow, PipelineError> {
@@ -324,7 +302,7 @@ where
         let stage_id = stage.id();
         let mut made_progress = false;
         loop {
-            let mut tx = Transaction::new(db)?;
+            let mut tx = Transaction::new(&self.db)?;

             let prev_progress = stage_id.get_progress(tx.deref())?;

@@ -419,8 +397,16 @@ where
     }
 }

-/// A container for a queued stage.
-pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;
+impl<DB: Database> std::fmt::Debug for Pipeline<DB> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Pipeline")
+            .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
+            .field("max_block", &self.max_block)
+            .field("listeners", &self.listeners)
+            .field("sync_state_updater", &self.sync_state_updater)
+            .finish()
+    }
+}

 #[cfg(test)]
 mod tests {
@@ -463,7 +449,7 @@ mod tests {
     async fn run_pipeline() {
         let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);

-        let mut pipeline: Pipeline<_> = Pipeline::builder()
+        let mut pipeline = Pipeline::builder()
             .add_stage(
                 TestStage::new(StageId("A"))
                     .add_exec(Ok(ExecOutput { stage_progress: 20, done: true })),
@@ -473,12 +459,12 @@ mod tests {
                     .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
             )
             .with_max_block(10)
-            .build();
+            .build(db);
         let events = pipeline.events();

         // Run pipeline
         tokio::spawn(async move {
-            pipeline.run(db).await.unwrap();
+            pipeline.run().await.unwrap();
         });

         // Check that the stages were run in order
@@ -521,16 +507,16 @@ mod tests {
                     .add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
             )
             .with_max_block(10)
-            .build();
+            .build(db);
         let events = pipeline.events();

         // Run pipeline
         tokio::spawn(async move {
             // Sync first
-            pipeline.run(db.clone()).await.expect("Could not run pipeline");
+            pipeline.run().await.expect("Could not run pipeline");

             // Unwind
-            pipeline.unwind(&db, 1, None).await.expect("Could not unwind pipeline");
+            pipeline.unwind(1, None).await.expect("Could not unwind pipeline");
         });

         // Check that the stages were unwound in reverse order
@@ -598,16 +584,16 @@ mod tests {
                     .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
             )
             .with_max_block(10)
-            .build();
+            .build(db);
         let events = pipeline.events();

         // Run pipeline
         tokio::spawn(async move {
             // Sync first
-            pipeline.run(db.clone()).await.expect("Could not run pipeline");
+            pipeline.run().await.expect("Could not run pipeline");

             // Unwind
-            pipeline.unwind(&db, 50, None).await.expect("Could not unwind pipeline");
+            pipeline.unwind(50, None).await.expect("Could not unwind pipeline");
         });

         // Check that the stages were unwound in reverse order
@@ -673,12 +659,12 @@ mod tests {
                     .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
             )
             .with_max_block(10)
-            .build();
+            .build(db);
         let events = pipeline.events();

         // Run pipeline
         tokio::spawn(async move {
-            pipeline.run(db).await.expect("Could not run pipeline");
+            pipeline.run().await.expect("Could not run pipeline");
         });

         // Check that the stages were unwound in reverse order
@@ -726,8 +712,8 @@ mod tests {
                     .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
             )
             .with_max_block(10)
-            .build();
-        let result = pipeline.run(db).await;
+            .build(db);
+        let result = pipeline.run().await;
         assert_matches!(result, Ok(()));

         // Fatal
@@ -736,8 +722,8 @@ mod tests {
             .add_stage(TestStage::new(StageId("Fatal")).add_exec(Err(
                 StageError::DatabaseIntegrity(ProviderError::BlockBodyIndices { number: 5 }),
             )))
-            .build();
-        let result = pipeline.run(db).await;
+            .build(db);
+        let result = pipeline.run().await;
         assert_matches!(
             result,
             Err(PipelineError::Stage(StageError::DatabaseIntegrity(
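Since the pipeline now owns its database, `run_as_fut(tip)` only needs the optional tip: it consumes the pipeline and resolves to the `PipelineWithResult<DB>` alias above, i.e. `(Pipeline<DB>, Result<ControlFlow, PipelineError>)`, so the caller gets the pipeline back for the next run. The consensus-engine hunk earlier sends that tuple over a oneshot channel; the receiving side is not shown in this diff, but it would look roughly like this sketch (`rx` and the surrounding context are assumed):

    // Await the tuple sent back from the spawned pipeline task.
    let (pipeline, result) = rx.await.expect("pipeline task dropped the sender");
    // `pipeline` is the same pipeline, returned so a later sync can reuse it;
    // `result` is this run's Result<ControlFlow, PipelineError>.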
@@ -10,17 +10,18 @@
 //! # Examples
 //!
 //! ```no_run
-//! # use reth_db::mdbx::{Env, WriteMap};
 //! # use reth_stages::Pipeline;
 //! # use reth_stages::sets::{OfflineStages};
 //! # use reth_revm::Factory;
 //! # use reth_primitives::MAINNET;
 //! # use std::sync::Arc;
+//! use reth_db::mdbx::test_utils::create_test_rw_db;
 //!
 //! # let factory = Factory::new(Arc::new(MAINNET.clone()));
+//! # let db = create_test_rw_db();
 //! // Build a pipeline with all offline stages.
-//! # let pipeline: Pipeline<Env<WriteMap>> =
-//! Pipeline::builder().add_stages(OfflineStages::new(factory)).build();
+//! # let pipeline =
+//! Pipeline::builder().add_stages(OfflineStages::new(factory)).build(db);
 //! ```
 //!
 //! ```ignore