refactor: remove SyncstateUpdater generics and use boxed instead (#2534)

This commit is contained in:
Matthias Seitz
2023-05-03 13:57:28 +02:00
committed by GitHub
parent 3be635406f
commit 90fa586ced
9 changed files with 51 additions and 68 deletions

View File

@ -11,9 +11,7 @@ use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
headers::reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::FileClient,
};
use reth_interfaces::{
consensus::Consensus, p2p::headers::client::NoopStatusUpdater, sync::SyncStateUpdater,
};
use reth_interfaces::{consensus::Consensus, p2p::headers::client::NoopStatusUpdater};
use reth_primitives::{ChainSpec, H256};
use reth_staged_sync::{
utils::{
@ -131,7 +129,7 @@ impl ImportCommand {
db: Arc<Env<WriteMap>>,
consensus: &Arc<C>,
file_client: Arc<FileClient>,
) -> eyre::Result<(Pipeline<Env<WriteMap>, impl SyncStateUpdater>, impl Stream<Item = NodeEvent>)>
) -> eyre::Result<(Pipeline<Env<WriteMap>>, impl Stream<Item = NodeEvent>)>
where
C: Consensus + 'static,
{

View File

@ -425,7 +425,7 @@ impl Command {
consensus: Arc<dyn Consensus>,
db: Arc<Env<WriteMap>>,
task_executor: &TaskExecutor,
) -> eyre::Result<Pipeline<Env<WriteMap>, NetworkHandle>>
) -> eyre::Result<Pipeline<Env<WriteMap>>>
where
Client: HeadersClient + BodiesClient + Clone + 'static,
{
@ -633,7 +633,7 @@ impl Command {
consensus: Arc<dyn Consensus>,
max_block: Option<u64>,
continuous: bool,
) -> eyre::Result<Pipeline<Env<WriteMap>, U>>
) -> eyre::Result<Pipeline<Env<WriteMap>>>
where
H: HeaderDownloader + 'static,
B: BodyDownloader + 'static,

View File

@ -5,7 +5,6 @@ use reth_interfaces::{
blockchain_tree::{BlockStatus, BlockchainTreeEngine},
consensus::ForkchoiceState,
executor::Error as ExecutorError,
sync::SyncStateUpdater,
Error,
};
use reth_payload_builder::{PayloadBuilderAttributes, PayloadBuilderHandle};
@ -135,11 +134,10 @@ impl BeaconConsensusEngineHandle {
///
/// If the future is polled more than once. Leads to undefined state.
#[must_use = "Future does nothing unless polled"]
pub struct BeaconConsensusEngine<DB, TS, U, BT>
pub struct BeaconConsensusEngine<DB, TS, BT>
where
DB: Database,
TS: TaskSpawner,
U: SyncStateUpdater,
BT: BlockchainTreeEngine,
{
/// The database handle.
@ -149,7 +147,7 @@ where
/// The current state of the pipeline.
/// Must always be [Some] unless the state is being reevaluated.
/// The pipeline is used for historical sync by setting the current forkchoice head.
pipeline_state: Option<PipelineState<DB, U>>,
pipeline_state: Option<PipelineState<DB>>,
/// The blockchain tree used for live sync and reorg tracking.
blockchain_tree: BT,
/// The Engine API message receiver.
@ -178,18 +176,17 @@ where
metrics: Metrics,
}
impl<DB, TS, U, BT> BeaconConsensusEngine<DB, TS, U, BT>
impl<DB, TS, BT> BeaconConsensusEngine<DB, TS, BT>
where
DB: Database + Unpin + 'static,
TS: TaskSpawner,
U: SyncStateUpdater + 'static,
BT: BlockchainTreeEngine + 'static,
{
/// Create a new instance of the [BeaconConsensusEngine].
pub fn new(
db: Arc<DB>,
task_spawner: TS,
pipeline: Pipeline<DB, U>,
pipeline: Pipeline<DB>,
blockchain_tree: BT,
max_block: Option<BlockNumber>,
continuous: bool,
@ -215,7 +212,7 @@ where
pub fn with_channel(
db: Arc<DB>,
task_spawner: TS,
pipeline: Pipeline<DB, U>,
pipeline: Pipeline<DB>,
blockchain_tree: BT,
max_block: Option<BlockNumber>,
continuous: bool,
@ -553,9 +550,9 @@ where
/// Resets the next action to the default value.
fn next_pipeline_state(
&mut self,
pipeline: Pipeline<DB, U>,
pipeline: Pipeline<DB>,
forkchoice_state: ForkchoiceState,
) -> PipelineState<DB, U> {
) -> PipelineState<DB> {
let next_action = std::mem::take(&mut self.next_action);
let (tip, should_run_pipeline) = match next_action {
@ -639,11 +636,10 @@ where
/// local forkchoice state, it will launch the pipeline to sync to the head hash.
/// While the pipeline is syncing, the consensus engine will keep processing messages from the
/// receiver and forwarding them to the blockchain tree.
impl<DB, TS, U, BT> Future for BeaconConsensusEngine<DB, TS, U, BT>
impl<DB, TS, BT> Future for BeaconConsensusEngine<DB, TS, BT>
where
DB: Database + Unpin + 'static,
TS: TaskSpawner + Unpin,
U: SyncStateUpdater + Unpin + 'static,
BT: BlockchainTreeEngine + Unpin + 'static,
{
type Output = Result<(), BeaconEngineError>;
@ -805,7 +801,7 @@ mod tests {
BlockchainTree, ShareableBlockchainTree,
};
use reth_db::mdbx::{test_utils::create_test_rw_db, Env, WriteMap};
use reth_interfaces::{sync::NoopSyncStateUpdate, test_utils::TestConsensus};
use reth_interfaces::test_utils::TestConsensus;
use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_primitives::{ChainSpec, ChainSpecBuilder, SealedBlockWithSenders, H256, MAINNET};
use reth_provider::{test_utils::TestExecutorFactory, Transaction};
@ -820,7 +816,6 @@ mod tests {
type TestBeaconConsensusEngine = BeaconConsensusEngine<
Env<WriteMap>,
TokioTaskExecutor,
NoopSyncStateUpdate,
ShareableBlockchainTree<Arc<Env<WriteMap>>, TestConsensus, TestExecutorFactory>,
>;

View File

@ -1,5 +1,4 @@
use reth_db::database::Database;
use reth_interfaces::sync::SyncStateUpdater;
use reth_stages::{Pipeline, PipelineWithResult};
use tokio::sync::oneshot;
@ -12,14 +11,14 @@ use tokio::sync::oneshot;
/// running, it acquires the write lock over the database. This means that we cannot forward to the
/// blockchain tree any messages that would result in database writes, since it would result in a
/// deadlock.
pub enum PipelineState<DB: Database, U: SyncStateUpdater> {
pub enum PipelineState<DB: Database> {
/// Pipeline is idle.
Idle(Pipeline<DB, U>),
Idle(Pipeline<DB>),
/// Pipeline is running.
Running(oneshot::Receiver<PipelineWithResult<DB, U>>),
Running(oneshot::Receiver<PipelineWithResult<DB>>),
}
impl<DB: Database, U: SyncStateUpdater> PipelineState<DB, U> {
impl<DB: Database> PipelineState<DB> {
/// Returns `true` if the state matches idle.
pub fn is_idle(&self) -> bool {
matches!(self, PipelineState::Idle(_))

View File

@ -16,7 +16,7 @@ pub trait SyncStateProvider: Send + Sync {
/// Eventually the node reaches the `Finish` stage and will transition to [`SyncState::Idle`], it
/// which point the node is considered fully synced.
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait SyncStateUpdater: SyncStateProvider {
pub trait SyncStateUpdater: Send + Sync + 'static {
/// Notifies about an [SyncState] update.
fn update_sync_state(&self, state: SyncState);
}

View File

@ -25,7 +25,6 @@
//! # use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
//! # use reth_downloaders::headers::reverse_headers::ReverseHeadersDownloaderBuilder;
//! # use reth_interfaces::consensus::Consensus;
//! # use reth_interfaces::sync::NoopSyncStateUpdate;
//! # use reth_interfaces::test_utils::{TestBodiesClient, TestConsensus, TestHeadersClient, TestStatusUpdater};
//! # use reth_revm::Factory;
//! # use reth_primitives::{PeerId, MAINNET, H256};
@ -47,7 +46,7 @@
//! # let factory = Factory::new(Arc::new(MAINNET.clone()));
//! # let (status_updater, _) = TestStatusUpdater::new();
//! // Create a pipeline that can fully sync
//! # let pipeline: Pipeline<Env<WriteMap>, NoopSyncStateUpdate> =
//! # let pipeline: Pipeline<Env<WriteMap>> =
//! Pipeline::builder()
//! .with_tip_sender(tip_tx)
//! .add_stages(

View File

@ -1,30 +1,28 @@
use crate::{Pipeline, Stage, StageSet};
use reth_db::database::Database;
use reth_interfaces::sync::{NoopSyncStateUpdate, SyncStateUpdater};
use reth_interfaces::sync::SyncStateUpdater;
use reth_primitives::{BlockNumber, H256};
use tokio::sync::watch;
/// Builds a [`Pipeline`].
#[derive(Debug)]
#[must_use = "call `build` to construct the pipeline"]
pub struct PipelineBuilder<DB, U = NoopSyncStateUpdate>
pub struct PipelineBuilder<DB>
where
DB: Database,
U: SyncStateUpdater,
{
pipeline: Pipeline<DB, U>,
pipeline: Pipeline<DB>,
}
impl<DB: Database, U: SyncStateUpdater> Default for PipelineBuilder<DB, U> {
impl<DB: Database> Default for PipelineBuilder<DB> {
fn default() -> Self {
Self { pipeline: Pipeline::default() }
}
}
impl<DB, U> PipelineBuilder<DB, U>
impl<DB> PipelineBuilder<DB>
where
DB: Database,
U: SyncStateUpdater,
{
/// Add a stage to the pipeline.
pub fn add_stage<S>(mut self, stage: S) -> Self
@ -71,13 +69,13 @@ where
}
/// Set a [SyncStateUpdater].
pub fn with_sync_state_updater(mut self, updater: U) -> Self {
self.pipeline.sync_state_updater = Some(updater);
pub fn with_sync_state_updater<U: SyncStateUpdater>(mut self, updater: U) -> Self {
self.pipeline.sync_state_updater = Box::new(updater);
self
}
/// Builds the final [`Pipeline`].
pub fn build(self) -> Pipeline<DB, U> {
pub fn build(self) -> Pipeline<DB> {
self.pipeline
}
}

View File

@ -1,7 +1,7 @@
use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput};
use futures_util::Future;
use reth_db::database::Database;
use reth_interfaces::sync::{SyncState, SyncStateUpdater};
use reth_interfaces::sync::{NoopSyncStateUpdate, SyncState, SyncStateUpdater};
use reth_primitives::{listener::EventListeners, BlockNumber, H256};
use reth_provider::Transaction;
use std::{
@ -78,12 +78,12 @@ use sync_metrics::*;
/// In case of a validation error (as determined by the consensus engine) in one of the stages, the
/// pipeline will unwind the stages in reverse order of execution. It is also possible to
/// request an unwind manually (see [Pipeline::unwind]).
pub struct Pipeline<DB: Database, U: SyncStateUpdater> {
pub struct Pipeline<DB: Database> {
stages: Vec<BoxedStage<DB>>,
max_block: Option<BlockNumber>,
continuous: bool,
listeners: EventListeners<PipelineEvent>,
sync_state_updater: Option<U>,
sync_state_updater: Box<dyn SyncStateUpdater>,
progress: PipelineProgress,
tip_tx: Option<watch::Sender<H256>>,
metrics: Metrics,
@ -91,19 +91,19 @@ pub struct Pipeline<DB: Database, U: SyncStateUpdater> {
/// The future that returns the owned pipeline and the result of the pipeline run. See
/// [Pipeline::run_as_fut].
pub type PipelineFut<DB, U> = Pin<Box<dyn Future<Output = PipelineWithResult<DB, U>> + Send>>;
pub type PipelineFut<DB> = Pin<Box<dyn Future<Output = PipelineWithResult<DB>> + Send>>;
/// The pipeline type itself with the result of [Pipeline::run_as_fut]
pub type PipelineWithResult<DB, U> = (Pipeline<DB, U>, Result<ControlFlow, PipelineError>);
pub type PipelineWithResult<DB> = (Pipeline<DB>, Result<ControlFlow, PipelineError>);
impl<DB: Database, U: SyncStateUpdater> Default for Pipeline<DB, U> {
impl<DB: Database> Default for Pipeline<DB> {
fn default() -> Self {
Self {
stages: Vec::new(),
max_block: None,
continuous: false,
listeners: EventListeners::default(),
sync_state_updater: None,
sync_state_updater: Box::<NoopSyncStateUpdate>::default(),
progress: PipelineProgress::default(),
tip_tx: None,
metrics: Metrics::default(),
@ -111,7 +111,7 @@ impl<DB: Database, U: SyncStateUpdater> Default for Pipeline<DB, U> {
}
}
impl<DB: Database, U: SyncStateUpdater> Debug for Pipeline<DB, U> {
impl<DB: Database> Debug for Pipeline<DB> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Pipeline")
.field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
@ -120,13 +120,12 @@ impl<DB: Database, U: SyncStateUpdater> Debug for Pipeline<DB, U> {
}
}
impl<DB, U> Pipeline<DB, U>
impl<DB> Pipeline<DB>
where
DB: Database + 'static,
U: SyncStateUpdater + 'static,
{
/// Construct a pipeline using a [`PipelineBuilder`].
pub fn builder() -> PipelineBuilder<DB, U> {
pub fn builder() -> PipelineBuilder<DB> {
PipelineBuilder::default()
}
@ -164,7 +163,7 @@ where
/// Consume the pipeline and run it until it reaches the provided tip, if set. Return the
/// pipeline and its result as a future.
#[track_caller]
pub fn run_as_fut(mut self, db: Arc<DB>, tip: Option<H256>) -> PipelineFut<DB, U> {
pub fn run_as_fut(mut self, db: Arc<DB>, tip: Option<H256>) -> PipelineFut<DB> {
// TODO: fix this in a follow up PR. ideally, consensus engine would be responsible for
// updating metrics.
self.register_metrics(db.clone());
@ -220,12 +219,10 @@ where
let stage_id = stage.id();
// Update sync state
if let Some(ref updater) = self.sync_state_updater {
if stage_id.is_finish() {
updater.update_sync_state(SyncState::Idle);
} else {
updater.update_sync_state(SyncState::Syncing);
}
if stage_id.is_finish() {
self.sync_state_updater.update_sync_state(SyncState::Idle);
} else {
self.sync_state_updater.update_sync_state(SyncState::Syncing);
}
trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
@ -245,9 +242,7 @@ where
ControlFlow::Continue { progress } => self.progress.update(progress),
ControlFlow::Unwind { target, bad_block } => {
// reset the sync state
if let Some(ref updater) = self.sync_state_updater {
updater.update_sync_state(SyncState::Syncing);
}
self.sync_state_updater.update_sync_state(SyncState::Syncing);
self.unwind(db.as_ref(), target, bad_block).await?;
return Ok(ControlFlow::Unwind { target, bad_block })
}
@ -427,7 +422,7 @@ mod tests {
use crate::{test_utils::TestStage, StageId, UnwindOutput};
use assert_matches::assert_matches;
use reth_db::mdbx::{self, test_utils, EnvKind};
use reth_interfaces::{consensus, provider::ProviderError, sync::NoopSyncStateUpdate};
use reth_interfaces::{consensus, provider::ProviderError};
use tokio_stream::StreamExt;
#[test]
@ -462,7 +457,7 @@ mod tests {
async fn run_pipeline() {
let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);
let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder()
let mut pipeline: Pipeline<_> = Pipeline::builder()
.add_stage(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput { stage_progress: 20, done: true })),
@ -503,7 +498,7 @@ mod tests {
async fn unwind_pipeline() {
let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);
let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder()
let mut pipeline = Pipeline::builder()
.add_stage(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput { stage_progress: 100, done: true }))
@ -586,7 +581,7 @@ mod tests {
async fn unwind_pipeline_with_intermediate_progress() {
let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);
let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder()
let mut pipeline = Pipeline::builder()
.add_stage(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput { stage_progress: 100, done: true }))
@ -655,7 +650,7 @@ mod tests {
async fn run_pipeline_with_unwind() {
let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);
let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder()
let mut pipeline = Pipeline::builder()
.add_stage(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true }))
@ -718,7 +713,7 @@ mod tests {
async fn pipeline_error_handling() {
// Non-fatal
let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);
let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder()
let mut pipeline = Pipeline::builder()
.add_stage(
TestStage::new(StageId("NonFatal"))
.add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
@ -731,7 +726,7 @@ mod tests {
// Fatal
let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);
let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder()
let mut pipeline = Pipeline::builder()
.add_stage(TestStage::new(StageId("Fatal")).add_exec(Err(
StageError::DatabaseIntegrity(ProviderError::BlockBodyIndices { number: 5 }),
)))

View File

@ -11,7 +11,6 @@
//!
//! ```no_run
//! # use reth_db::mdbx::{Env, WriteMap};
//! # use reth_interfaces::sync::NoopSyncStateUpdate;
//! # use reth_stages::Pipeline;
//! # use reth_stages::sets::{OfflineStages};
//! # use reth_revm::Factory;
@ -20,7 +19,7 @@
//!
//! # let factory = Factory::new(Arc::new(MAINNET.clone()));
//! // Build a pipeline with all offline stages.
//! # let pipeline: Pipeline<Env<WriteMap>, NoopSyncStateUpdate> =
//! # let pipeline: Pipeline<Env<WriteMap>> =
//! Pipeline::builder().add_stages(OfflineStages::new(factory)).build();
//! ```
//!