From 255780d3810d8e35e012d4bdb0df5b1c3fdf6e9e Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Mon, 19 Jun 2023 14:21:38 -0400 Subject: [PATCH] feat: run pipeline only if missing range is large (#3059) --- bin/reth/src/node/mod.rs | 3 +- crates/consensus/beacon/src/engine/mod.rs | 411 ++++++++++++++-------- 2 files changed, 263 insertions(+), 151 deletions(-) diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 3e698af27..c6cf5cc94 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -15,7 +15,7 @@ use fdlimit::raise_fd_limit; use futures::{future::Either, pin_mut, stream, stream_select, StreamExt}; use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus}; use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig}; -use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine}; +use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine, MIN_BLOCKS_FOR_PIPELINE_RUN}; use reth_blockchain_tree::{ config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree, }; @@ -341,6 +341,7 @@ impl Command { self.debug.continuous, payload_builder.clone(), initial_target, + MIN_BLOCKS_FOR_PIPELINE_RUN, consensus_engine_tx, consensus_engine_rx, )?; diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index b31dea409..b4c77e761 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -65,6 +65,10 @@ use reth_interfaces::blockchain_tree::InsertPayloadOk; /// The maximum number of invalid headers that can be tracked by the engine. const MAX_INVALID_HEADERS: u32 = 512u32; +/// The largest gap for which the tree will be used for sync. See docs for `pipeline_run_threshold` +/// for more information. +pub const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = 128; + /// A _shareable_ beacon consensus frontend. Used to interact with the spawned beacon consensus /// engine. /// @@ -234,6 +238,18 @@ where invalid_headers: InvalidHeaderCache, /// Consensus engine metrics. metrics: EngineMetrics, + /// After downloading a block corresponding to a recent forkchoice update, the engine will + /// check whether or not we can connect the block to the current canonical chain. If we can't, + /// we need to download and execute the missing parents of that block. + /// + /// When the block can't be connected, its block number will be compared to the canonical head, + /// resulting in a heuristic for the number of missing blocks, or the size of the gap between + /// the new block and the canonical head. + /// + /// If the gap is larger than this threshold, the engine will download and execute the missing + /// blocks using the pipeline. Otherwise, the engine, sync controller, and blockchain tree will + /// be used to download and execute the missing blocks. + pipeline_run_threshold: u64, } impl BeaconConsensusEngine @@ -254,6 +270,7 @@ where run_pipeline_continuously: bool, payload_builder: PayloadBuilderHandle, target: Option, + pipeline_run_threshold: u64, ) -> Result<(Self, BeaconConsensusEngineHandle), reth_interfaces::Error> { let (to_engine, rx) = mpsc::unbounded_channel(); Self::with_channel( @@ -266,6 +283,7 @@ where run_pipeline_continuously, payload_builder, target, + pipeline_run_threshold, to_engine, rx, ) @@ -294,6 +312,7 @@ where run_pipeline_continuously: bool, payload_builder: PayloadBuilderHandle, target: Option, + pipeline_run_threshold: u64, to_engine: UnboundedSender, rx: UnboundedReceiver, ) -> Result<(Self, BeaconConsensusEngineHandle), reth_interfaces::Error> { @@ -316,6 +335,7 @@ where listeners: EventListeners::default(), invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS), metrics: EngineMetrics::default(), + pipeline_run_threshold, }; let maybe_pipeline_target = match target { @@ -678,10 +698,7 @@ where // we assume the FCU is valid and at least the head is missing, so we need to start syncing // to it - - // if this is the first FCU we received from the beacon node, then we start triggering the - // pipeline - if self.forkchoice_state_tracker.is_empty() { + let target = if self.forkchoice_state_tracker.is_empty() { // find the appropriate target to sync to, if we don't have the safe block hash then we // start syncing to the safe block via pipeline first let target = if !state.safe_block_hash.is_zero() && @@ -694,19 +711,24 @@ where // we need to first check the buffer for the head and its ancestors let lowest_unknown_hash = self.lowest_buffered_ancestor_or(target); - - trace!(target: "consensus::engine", request=?lowest_unknown_hash, "Triggering pipeline with target instead of downloading"); - - self.sync.set_pipeline_sync_target(lowest_unknown_hash); + trace!(target: "consensus::engine", request=?lowest_unknown_hash, "Triggering full block download for missing ancestors of the new head"); + lowest_unknown_hash } else { // we need to first check the buffer for the head and its ancestors let lowest_unknown_hash = self.lowest_buffered_ancestor_or(state.head_block_hash); - trace!(target: "consensus::engine", request=?lowest_unknown_hash, "Triggering full block download for missing ancestors of the new head"); + lowest_unknown_hash + }; + // if the threshold is zero, we should not download the block first, and just use the + // pipeline. Otherwise we use the tree to insert the block first + if self.pipeline_run_threshold == 0 { + // use the pipeline to sync to the target + self.sync.set_pipeline_sync_target(target); + } else { // trigger a full block download for missing hash, or the parent of its lowest buffered // ancestor - self.sync.download_full_block(lowest_unknown_hash); + self.sync.download_full_block(target); } PayloadStatus::from_status(PayloadStatusEnum::Syncing) @@ -797,9 +819,7 @@ where let block_hash = block.hash(); // now check the block itself - if let Some(status) = self.check_invalid_ancestor(block.parent_hash) { - // The parent is invalid, so this block is also invalid - self.invalid_headers.insert(block.header); + if let Some(status) = self.check_invalid_ancestor_with_head(block.parent_hash, block.hash) { return Ok(status) } @@ -1030,8 +1050,39 @@ where self.try_make_sync_target_canonical(num_hash); } InsertPayloadOk::Inserted(BlockStatus::Disconnected { missing_parent }) => { - // continue downloading the missing parent - self.sync.download_full_block(missing_parent.hash); + // compare the missing parent with the canonical tip + let canonical_tip_num = self.blockchain.canonical_tip().number; + + // if the number of missing blocks is greater than the max, run the + // pipeline + if missing_parent.number >= canonical_tip_num && + missing_parent.number - canonical_tip_num > + self.pipeline_run_threshold + { + if let Some(state) = self.forkchoice_state_tracker.sync_target_state() { + // if we have already canonicalized the finalized block, we should + // skip the pipeline run + if Ok(None) == + self.blockchain.header_by_hash_or_number( + state.finalized_block_hash.into(), + ) + { + self.sync.set_pipeline_sync_target(state.finalized_block_hash) + } + } + } else { + // continue downloading the missing parent + // + // this happens if either: + // * the missing parent block num < canonical tip num + // * this case represents a missing block on a fork that is shorter + // than the canonical chain + // * the missing parent block num >= canonical tip num, but the number + // of missing blocks is less than the pipeline threshold + // * this case represents a potentially long range of blocks to + // download and execute + self.sync.download_full_block(missing_parent.hash); + } } _ => (), } @@ -1434,52 +1485,112 @@ mod tests { } } - fn setup_consensus_engine( + struct TestConsensusEngineBuilder { chain_spec: Arc, pipeline_exec_outputs: VecDeque>, executor_results: Vec, - ) -> (TestBeaconConsensusEngine, TestEnv>>) { - reth_tracing::init_test_tracing(); - let db = create_test_rw_db(); - let consensus = TestConsensus::default(); - let payload_builder = spawn_test_payload_service(); + pipeline_run_threshold: Option, + max_block: Option, + } - let executor_factory = TestExecutorFactory::new(chain_spec.clone()); - executor_factory.extend(executor_results); + impl TestConsensusEngineBuilder { + /// Create a new `TestConsensusEngineBuilder` with the given `ChainSpec`. + fn new(chain_spec: Arc) -> Self { + Self { + chain_spec, + pipeline_exec_outputs: VecDeque::new(), + executor_results: Vec::new(), + pipeline_run_threshold: None, + max_block: None, + } + } - // Setup pipeline - let (tip_tx, tip_rx) = watch::channel(H256::default()); - let pipeline = Pipeline::builder() - .add_stages(TestStages::new(pipeline_exec_outputs, Default::default())) - .with_tip_sender(tip_tx) - .build(db.clone(), chain_spec.clone()); + /// Set the pipeline execution outputs to use for the test consensus engine. + fn with_pipeline_exec_outputs( + mut self, + pipeline_exec_outputs: VecDeque>, + ) -> Self { + self.pipeline_exec_outputs = pipeline_exec_outputs; + self + } - // Setup blockchain tree - let externals = - TreeExternals::new(db.clone(), consensus, executor_factory, chain_spec.clone()); - let config = BlockchainTreeConfig::new(1, 2, 3, 2); - let (canon_state_notification_sender, _) = tokio::sync::broadcast::channel(3); - let tree = ShareableBlockchainTree::new( - BlockchainTree::new(externals, canon_state_notification_sender, config) - .expect("failed to create tree"), - ); - let factory = ProviderFactory::new(db.clone(), chain_spec.clone()); - let latest = chain_spec.genesis_header().seal_slow(); - let blockchain_provider = BlockchainProvider::with_latest(factory, tree, latest); - let (engine, handle) = BeaconConsensusEngine::new( - NoopFullBlockClient::default(), - pipeline, - blockchain_provider, - Box::::default(), - Box::::default(), - None, - false, - payload_builder, - None, - ) - .expect("failed to create consensus engine"); + /// Set the executor results to use for the test consensus engine. + fn with_executor_results(mut self, executor_results: Vec) -> Self { + self.executor_results = executor_results; + self + } - (engine, TestEnv::new(db, tip_rx, handle)) + /// Sets the max block for the pipeline to run. + fn with_max_block(mut self, max_block: BlockNumber) -> Self { + self.max_block = Some(max_block); + self + } + + /// Disables blockchain tree driven sync. This is the same as setting the pipeline run + /// threshold to 0. + fn disable_blockchain_tree_sync(mut self) -> Self { + self.pipeline_run_threshold = Some(0); + self + } + + /// Builds the test consensus engine into a `TestConsensusEngine` and `TestEnv`. + fn build(self) -> (TestBeaconConsensusEngine, TestEnv>>) { + reth_tracing::init_test_tracing(); + let db = create_test_rw_db(); + let consensus = TestConsensus::default(); + let payload_builder = spawn_test_payload_service(); + + let executor_factory = TestExecutorFactory::new(self.chain_spec.clone()); + executor_factory.extend(self.executor_results); + + // Setup pipeline + let (tip_tx, tip_rx) = watch::channel(H256::default()); + let mut pipeline = Pipeline::builder() + .add_stages(TestStages::new(self.pipeline_exec_outputs, Default::default())) + .with_tip_sender(tip_tx); + + if let Some(max_block) = self.max_block { + pipeline = pipeline.with_max_block(max_block); + } + + let pipeline = pipeline.build(db.clone(), self.chain_spec.clone()); + + // Setup blockchain tree + let externals = TreeExternals::new( + db.clone(), + consensus, + executor_factory, + self.chain_spec.clone(), + ); + let config = BlockchainTreeConfig::new(1, 2, 3, 2); + let (canon_state_notification_sender, _) = tokio::sync::broadcast::channel(3); + let tree = ShareableBlockchainTree::new( + BlockchainTree::new(externals, canon_state_notification_sender, config) + .expect("failed to create tree"), + ); + let shareable_db = ProviderFactory::new(db.clone(), self.chain_spec.clone()); + let latest = self.chain_spec.genesis_header().seal_slow(); + let blockchain_provider = BlockchainProvider::with_latest(shareable_db, tree, latest); + let (mut engine, handle) = BeaconConsensusEngine::new( + NoopFullBlockClient::default(), + pipeline, + blockchain_provider, + Box::::default(), + Box::::default(), + None, + false, + payload_builder, + None, + self.pipeline_run_threshold.unwrap_or(MIN_BLOCKS_FOR_PIPELINE_RUN), + ) + .expect("failed to create consensus engine"); + + if let Some(max_block) = self.max_block { + engine.sync.set_max_block(max_block) + } + + (engine, TestEnv::new(db, tip_rx, handle)) + } } fn spawn_consensus_engine( @@ -1503,11 +1614,13 @@ mod tests { .paris_activated() .build(), ); - let (consensus_engine, env) = setup_consensus_engine( - chain_spec.clone(), - VecDeque::from([Err(StageError::ChannelClosed)]), - Vec::default(), - ); + + let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([Err(StageError::ChannelClosed)])) + .disable_blockchain_tree_sync() + .with_max_block(1) + .build(); + let res = spawn_consensus_engine(consensus_engine); let _ = env @@ -1532,11 +1645,13 @@ mod tests { .paris_activated() .build(), ); - let (consensus_engine, env) = setup_consensus_engine( - chain_spec.clone(), - VecDeque::from([Err(StageError::ChannelClosed)]), - Vec::default(), - ); + + let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([Err(StageError::ChannelClosed)])) + .disable_blockchain_tree_sync() + .with_max_block(1) + .build(); + let mut rx = spawn_consensus_engine(consensus_engine); // consensus engine is idle @@ -1572,14 +1687,16 @@ mod tests { .paris_activated() .build(), ); - let (consensus_engine, env) = setup_consensus_engine( - chain_spec.clone(), - VecDeque::from([ + + let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([ Ok(ExecOutput { checkpoint: StageCheckpoint::new(1), done: true }), Err(StageError::ChannelClosed), - ]), - Vec::default(), - ); + ])) + .disable_blockchain_tree_sync() + .with_max_block(2) + .build(); + let rx = spawn_consensus_engine(consensus_engine); let _ = env @@ -1605,15 +1722,16 @@ mod tests { .paris_activated() .build(), ); - let (mut consensus_engine, env) = setup_consensus_engine( - chain_spec.clone(), - VecDeque::from([Ok(ExecOutput { + + let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(max_block), done: true, - })]), - Vec::default(), - ); - consensus_engine.sync.set_max_block(max_block); + })])) + .with_max_block(max_block) + .disable_blockchain_tree_sync() + .build(); + let rx = spawn_consensus_engine(consensus_engine); let _ = env @@ -1651,14 +1769,13 @@ mod tests { .paris_activated() .build(), ); - let (consensus_engine, env) = setup_consensus_engine( - chain_spec.clone(), - VecDeque::from([Ok(ExecOutput { - done: true, + + let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), - })]), - Vec::default(), - ); + done: true, + })])) + .build(); let mut engine_rx = spawn_consensus_engine(consensus_engine); @@ -1682,14 +1799,13 @@ mod tests { .paris_activated() .build(), ); - let (consensus_engine, env) = setup_consensus_engine( - chain_spec.clone(), - VecDeque::from([Ok(ExecOutput { - done: true, + + let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), - })]), - Vec::default(), - ); + done: true, + })])) + .build(); let genesis = random_block(0, None, None, Some(0)); let block1 = random_block(1, Some(genesis.hash), None, Some(0)); @@ -1730,14 +1846,14 @@ mod tests { .paris_activated() .build(), ); - let (consensus_engine, env) = setup_consensus_engine( - chain_spec.clone(), - VecDeque::from([ - Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), - Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), - ]), - Vec::default(), - ); + + let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([ + Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }), + Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }), + ])) + .disable_blockchain_tree_sync() + .build(); let genesis = random_block(0, None, None, Some(0)); let block1 = random_block(1, Some(genesis.hash), None, Some(0)); @@ -1779,14 +1895,14 @@ mod tests { .paris_activated() .build(), ); - let (consensus_engine, env) = setup_consensus_engine( - chain_spec.clone(), - VecDeque::from([Ok(ExecOutput { - done: true, + + let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), - })]), - Vec::default(), - ); + done: true, + })])) + .disable_blockchain_tree_sync() + .build(); let genesis = random_block(0, None, None, Some(0)); let block1 = random_block(1, Some(genesis.hash), None, Some(0)); @@ -1816,14 +1932,13 @@ mod tests { .paris_at_ttd(U256::from(3)) .build(), ); - let (consensus_engine, env) = setup_consensus_engine( - chain_spec.clone(), - VecDeque::from([ - Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), - Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), - ]), - Vec::default(), - ); + + let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([ + Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }), + Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }), + ])) + .build(); let genesis = random_block(0, None, None, Some(0)); let mut block1 = random_block(1, Some(genesis.hash), None, Some(0)); @@ -1869,14 +1984,13 @@ mod tests { .london_activated() .build(), ); - let (consensus_engine, env) = setup_consensus_engine( - chain_spec.clone(), - VecDeque::from([ - Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), - Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), - ]), - Vec::default(), - ); + + let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([ + Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }), + Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }), + ])) + .build(); let genesis = random_block(0, None, None, Some(0)); let block1 = random_block(1, Some(genesis.hash), None, Some(0)); @@ -1916,14 +2030,13 @@ mod tests { .paris_activated() .build(), ); - let (consensus_engine, env) = setup_consensus_engine( - chain_spec.clone(), - VecDeque::from([Ok(ExecOutput { - done: true, + + let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), - })]), - Vec::default(), - ); + done: true, + })])) + .build(); let mut engine_rx = spawn_consensus_engine(consensus_engine); @@ -1949,14 +2062,13 @@ mod tests { .paris_activated() .build(), ); - let (consensus_engine, env) = setup_consensus_engine( - chain_spec.clone(), - VecDeque::from([Ok(ExecOutput { - done: true, + + let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), - })]), - Vec::default(), - ); + done: true, + })])) + .build(); let genesis = random_block(0, None, None, Some(0)); let block1 = random_block(1, Some(genesis.hash), None, Some(0)); @@ -1999,14 +2111,13 @@ mod tests { .paris_activated() .build(), ); - let (consensus_engine, env) = setup_consensus_engine( - chain_spec.clone(), - VecDeque::from([Ok(ExecOutput { - done: true, + + let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), - })]), - Vec::default(), - ); + done: true, + })])) + .build(); let genesis = random_block(0, None, None, Some(0)); @@ -2056,14 +2167,14 @@ mod tests { .london_activated() .build(), ); - let (consensus_engine, env) = setup_consensus_engine( - chain_spec.clone(), - VecDeque::from([Ok(ExecOutput { - done: true, + + let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), - })]), - Vec::from([exec_result2]), - ); + done: true, + })])) + .with_executor_results(Vec::from([exec_result2])) + .build(); insert_blocks( env.db.as_ref(),