feat: run pipeline only if missing range is large (#3059)

This commit is contained in:
Dan Cline
2023-06-19 14:21:38 -04:00
committed by GitHub
parent c481987558
commit 255780d381
2 changed files with 263 additions and 151 deletions

View File

@ -15,7 +15,7 @@ use fdlimit::raise_fd_limit;
use futures::{future::Either, pin_mut, stream, stream_select, StreamExt};
use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus};
use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig};
use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine};
use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine, MIN_BLOCKS_FOR_PIPELINE_RUN};
use reth_blockchain_tree::{
config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree,
};
@ -341,6 +341,7 @@ impl Command {
self.debug.continuous,
payload_builder.clone(),
initial_target,
MIN_BLOCKS_FOR_PIPELINE_RUN,
consensus_engine_tx,
consensus_engine_rx,
)?;

View File

@ -65,6 +65,10 @@ use reth_interfaces::blockchain_tree::InsertPayloadOk;
/// The maximum number of invalid headers that can be tracked by the engine.
const MAX_INVALID_HEADERS: u32 = 512u32;
/// The largest gap for which the tree will be used for sync. See docs for `pipeline_run_threshold`
/// for more information.
pub const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = 128;
/// A _shareable_ beacon consensus frontend. Used to interact with the spawned beacon consensus
/// engine.
///
@ -234,6 +238,18 @@ where
invalid_headers: InvalidHeaderCache,
/// Consensus engine metrics.
metrics: EngineMetrics,
/// After downloading a block corresponding to a recent forkchoice update, the engine will
/// check whether or not we can connect the block to the current canonical chain. If we can't,
/// we need to download and execute the missing parents of that block.
///
/// When the block can't be connected, its block number will be compared to the canonical head,
/// resulting in a heuristic for the number of missing blocks, or the size of the gap between
/// the new block and the canonical head.
///
/// If the gap is larger than this threshold, the engine will download and execute the missing
/// blocks using the pipeline. Otherwise, the engine, sync controller, and blockchain tree will
/// be used to download and execute the missing blocks.
pipeline_run_threshold: u64,
}
impl<DB, BT, Client> BeaconConsensusEngine<DB, BT, Client>
@ -254,6 +270,7 @@ where
run_pipeline_continuously: bool,
payload_builder: PayloadBuilderHandle,
target: Option<H256>,
pipeline_run_threshold: u64,
) -> Result<(Self, BeaconConsensusEngineHandle), reth_interfaces::Error> {
let (to_engine, rx) = mpsc::unbounded_channel();
Self::with_channel(
@ -266,6 +283,7 @@ where
run_pipeline_continuously,
payload_builder,
target,
pipeline_run_threshold,
to_engine,
rx,
)
@ -294,6 +312,7 @@ where
run_pipeline_continuously: bool,
payload_builder: PayloadBuilderHandle,
target: Option<H256>,
pipeline_run_threshold: u64,
to_engine: UnboundedSender<BeaconEngineMessage>,
rx: UnboundedReceiver<BeaconEngineMessage>,
) -> Result<(Self, BeaconConsensusEngineHandle), reth_interfaces::Error> {
@ -316,6 +335,7 @@ where
listeners: EventListeners::default(),
invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS),
metrics: EngineMetrics::default(),
pipeline_run_threshold,
};
let maybe_pipeline_target = match target {
@ -678,10 +698,7 @@ where
// we assume the FCU is valid and at least the head is missing, so we need to start syncing
// to it
// if this is the first FCU we received from the beacon node, then we start triggering the
// pipeline
if self.forkchoice_state_tracker.is_empty() {
let target = if self.forkchoice_state_tracker.is_empty() {
// find the appropriate target to sync to, if we don't have the safe block hash then we
// start syncing to the safe block via pipeline first
let target = if !state.safe_block_hash.is_zero() &&
@ -694,19 +711,24 @@ where
// we need to first check the buffer for the head and its ancestors
let lowest_unknown_hash = self.lowest_buffered_ancestor_or(target);
trace!(target: "consensus::engine", request=?lowest_unknown_hash, "Triggering pipeline with target instead of downloading");
self.sync.set_pipeline_sync_target(lowest_unknown_hash);
trace!(target: "consensus::engine", request=?lowest_unknown_hash, "Triggering full block download for missing ancestors of the new head");
lowest_unknown_hash
} else {
// we need to first check the buffer for the head and its ancestors
let lowest_unknown_hash = self.lowest_buffered_ancestor_or(state.head_block_hash);
trace!(target: "consensus::engine", request=?lowest_unknown_hash, "Triggering full block download for missing ancestors of the new head");
lowest_unknown_hash
};
// if the threshold is zero, we should not download the block first, and just use the
// pipeline. Otherwise we use the tree to insert the block first
if self.pipeline_run_threshold == 0 {
// use the pipeline to sync to the target
self.sync.set_pipeline_sync_target(target);
} else {
// trigger a full block download for missing hash, or the parent of its lowest buffered
// ancestor
self.sync.download_full_block(lowest_unknown_hash);
self.sync.download_full_block(target);
}
PayloadStatus::from_status(PayloadStatusEnum::Syncing)
@ -797,9 +819,7 @@ where
let block_hash = block.hash();
// now check the block itself
if let Some(status) = self.check_invalid_ancestor(block.parent_hash) {
// The parent is invalid, so this block is also invalid
self.invalid_headers.insert(block.header);
if let Some(status) = self.check_invalid_ancestor_with_head(block.parent_hash, block.hash) {
return Ok(status)
}
@ -1030,9 +1050,40 @@ where
self.try_make_sync_target_canonical(num_hash);
}
InsertPayloadOk::Inserted(BlockStatus::Disconnected { missing_parent }) => {
// compare the missing parent with the canonical tip
let canonical_tip_num = self.blockchain.canonical_tip().number;
// if the number of missing blocks is greater than the max, run the
// pipeline
if missing_parent.number >= canonical_tip_num &&
missing_parent.number - canonical_tip_num >
self.pipeline_run_threshold
{
if let Some(state) = self.forkchoice_state_tracker.sync_target_state() {
// if we have already canonicalized the finalized block, we should
// skip the pipeline run
if Ok(None) ==
self.blockchain.header_by_hash_or_number(
state.finalized_block_hash.into(),
)
{
self.sync.set_pipeline_sync_target(state.finalized_block_hash)
}
}
} else {
// continue downloading the missing parent
//
// this happens if either:
// * the missing parent block num < canonical tip num
// * this case represents a missing block on a fork that is shorter
// than the canonical chain
// * the missing parent block num >= canonical tip num, but the number
// of missing blocks is less than the pipeline threshold
// * this case represents a potentially long range of blocks to
// download and execute
self.sync.download_full_block(missing_parent.hash);
}
}
_ => (),
}
}
@ -1434,39 +1485,93 @@ mod tests {
}
}
fn setup_consensus_engine(
struct TestConsensusEngineBuilder {
chain_spec: Arc<ChainSpec>,
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
executor_results: Vec<PostState>,
) -> (TestBeaconConsensusEngine, TestEnv<Arc<Env<WriteMap>>>) {
pipeline_run_threshold: Option<u64>,
max_block: Option<BlockNumber>,
}
impl TestConsensusEngineBuilder {
/// Create a new `TestConsensusEngineBuilder` with the given `ChainSpec`.
fn new(chain_spec: Arc<ChainSpec>) -> Self {
Self {
chain_spec,
pipeline_exec_outputs: VecDeque::new(),
executor_results: Vec::new(),
pipeline_run_threshold: None,
max_block: None,
}
}
/// Set the pipeline execution outputs to use for the test consensus engine.
fn with_pipeline_exec_outputs(
mut self,
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
) -> Self {
self.pipeline_exec_outputs = pipeline_exec_outputs;
self
}
/// Set the executor results to use for the test consensus engine.
fn with_executor_results(mut self, executor_results: Vec<PostState>) -> Self {
self.executor_results = executor_results;
self
}
/// Sets the max block for the pipeline to run.
fn with_max_block(mut self, max_block: BlockNumber) -> Self {
self.max_block = Some(max_block);
self
}
/// Disables blockchain tree driven sync. This is the same as setting the pipeline run
/// threshold to 0.
fn disable_blockchain_tree_sync(mut self) -> Self {
self.pipeline_run_threshold = Some(0);
self
}
/// Builds the test consensus engine into a `TestConsensusEngine` and `TestEnv`.
fn build(self) -> (TestBeaconConsensusEngine, TestEnv<Arc<Env<WriteMap>>>) {
reth_tracing::init_test_tracing();
let db = create_test_rw_db();
let consensus = TestConsensus::default();
let payload_builder = spawn_test_payload_service();
let executor_factory = TestExecutorFactory::new(chain_spec.clone());
executor_factory.extend(executor_results);
let executor_factory = TestExecutorFactory::new(self.chain_spec.clone());
executor_factory.extend(self.executor_results);
// Setup pipeline
let (tip_tx, tip_rx) = watch::channel(H256::default());
let pipeline = Pipeline::builder()
.add_stages(TestStages::new(pipeline_exec_outputs, Default::default()))
.with_tip_sender(tip_tx)
.build(db.clone(), chain_spec.clone());
let mut pipeline = Pipeline::builder()
.add_stages(TestStages::new(self.pipeline_exec_outputs, Default::default()))
.with_tip_sender(tip_tx);
if let Some(max_block) = self.max_block {
pipeline = pipeline.with_max_block(max_block);
}
let pipeline = pipeline.build(db.clone(), self.chain_spec.clone());
// Setup blockchain tree
let externals =
TreeExternals::new(db.clone(), consensus, executor_factory, chain_spec.clone());
let externals = TreeExternals::new(
db.clone(),
consensus,
executor_factory,
self.chain_spec.clone(),
);
let config = BlockchainTreeConfig::new(1, 2, 3, 2);
let (canon_state_notification_sender, _) = tokio::sync::broadcast::channel(3);
let tree = ShareableBlockchainTree::new(
BlockchainTree::new(externals, canon_state_notification_sender, config)
.expect("failed to create tree"),
);
let factory = ProviderFactory::new(db.clone(), chain_spec.clone());
let latest = chain_spec.genesis_header().seal_slow();
let blockchain_provider = BlockchainProvider::with_latest(factory, tree, latest);
let (engine, handle) = BeaconConsensusEngine::new(
let shareable_db = ProviderFactory::new(db.clone(), self.chain_spec.clone());
let latest = self.chain_spec.genesis_header().seal_slow();
let blockchain_provider = BlockchainProvider::with_latest(shareable_db, tree, latest);
let (mut engine, handle) = BeaconConsensusEngine::new(
NoopFullBlockClient::default(),
pipeline,
blockchain_provider,
@ -1476,11 +1581,17 @@ mod tests {
false,
payload_builder,
None,
self.pipeline_run_threshold.unwrap_or(MIN_BLOCKS_FOR_PIPELINE_RUN),
)
.expect("failed to create consensus engine");
if let Some(max_block) = self.max_block {
engine.sync.set_max_block(max_block)
}
(engine, TestEnv::new(db, tip_rx, handle))
}
}
fn spawn_consensus_engine(
engine: TestBeaconConsensusEngine,
@ -1503,11 +1614,13 @@ mod tests {
.paris_activated()
.build(),
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Err(StageError::ChannelClosed)]),
Vec::default(),
);
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Err(StageError::ChannelClosed)]))
.disable_blockchain_tree_sync()
.with_max_block(1)
.build();
let res = spawn_consensus_engine(consensus_engine);
let _ = env
@ -1532,11 +1645,13 @@ mod tests {
.paris_activated()
.build(),
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Err(StageError::ChannelClosed)]),
Vec::default(),
);
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Err(StageError::ChannelClosed)]))
.disable_blockchain_tree_sync()
.with_max_block(1)
.build();
let mut rx = spawn_consensus_engine(consensus_engine);
// consensus engine is idle
@ -1572,14 +1687,16 @@ mod tests {
.paris_activated()
.build(),
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([
Ok(ExecOutput { checkpoint: StageCheckpoint::new(1), done: true }),
Err(StageError::ChannelClosed),
]),
Vec::default(),
);
]))
.disable_blockchain_tree_sync()
.with_max_block(2)
.build();
let rx = spawn_consensus_engine(consensus_engine);
let _ = env
@ -1605,15 +1722,16 @@ mod tests {
.paris_activated()
.build(),
);
let (mut consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(max_block),
done: true,
})]),
Vec::default(),
);
consensus_engine.sync.set_max_block(max_block);
})]))
.with_max_block(max_block)
.disable_blockchain_tree_sync()
.build();
let rx = spawn_consensus_engine(consensus_engine);
let _ = env
@ -1651,14 +1769,13 @@ mod tests {
.paris_activated()
.build(),
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
})]),
Vec::default(),
);
done: true,
})]))
.build();
let mut engine_rx = spawn_consensus_engine(consensus_engine);
@ -1682,14 +1799,13 @@ mod tests {
.paris_activated()
.build(),
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
})]),
Vec::default(),
);
done: true,
})]))
.build();
let genesis = random_block(0, None, None, Some(0));
let block1 = random_block(1, Some(genesis.hash), None, Some(0));
@ -1730,14 +1846,14 @@ mod tests {
.paris_activated()
.build(),
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
]),
Vec::default(),
);
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
]))
.disable_blockchain_tree_sync()
.build();
let genesis = random_block(0, None, None, Some(0));
let block1 = random_block(1, Some(genesis.hash), None, Some(0));
@ -1779,14 +1895,14 @@ mod tests {
.paris_activated()
.build(),
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
})]),
Vec::default(),
);
done: true,
})]))
.disable_blockchain_tree_sync()
.build();
let genesis = random_block(0, None, None, Some(0));
let block1 = random_block(1, Some(genesis.hash), None, Some(0));
@ -1816,14 +1932,13 @@ mod tests {
.paris_at_ttd(U256::from(3))
.build(),
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
]),
Vec::default(),
);
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
]))
.build();
let genesis = random_block(0, None, None, Some(0));
let mut block1 = random_block(1, Some(genesis.hash), None, Some(0));
@ -1869,14 +1984,13 @@ mod tests {
.london_activated()
.build(),
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
]),
Vec::default(),
);
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
]))
.build();
let genesis = random_block(0, None, None, Some(0));
let block1 = random_block(1, Some(genesis.hash), None, Some(0));
@ -1916,14 +2030,13 @@ mod tests {
.paris_activated()
.build(),
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
})]),
Vec::default(),
);
done: true,
})]))
.build();
let mut engine_rx = spawn_consensus_engine(consensus_engine);
@ -1949,14 +2062,13 @@ mod tests {
.paris_activated()
.build(),
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
})]),
Vec::default(),
);
done: true,
})]))
.build();
let genesis = random_block(0, None, None, Some(0));
let block1 = random_block(1, Some(genesis.hash), None, Some(0));
@ -1999,14 +2111,13 @@ mod tests {
.paris_activated()
.build(),
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
})]),
Vec::default(),
);
done: true,
})]))
.build();
let genesis = random_block(0, None, None, Some(0));
@ -2056,14 +2167,14 @@ mod tests {
.london_activated()
.build(),
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
})]),
Vec::from([exec_result2]),
);
done: true,
})]))
.with_executor_results(Vec::from([exec_result2]))
.build();
insert_blocks(
env.db.as_ref(),