feat: let consensus tests configure pipeline, executor, and client (#3839)

This commit is contained in:
Dan Cline
2023-07-18 14:16:17 -04:00
committed by GitHub
parent 34c9abe249
commit 5ad9b32cbc
3 changed files with 292 additions and 116 deletions

2
Cargo.lock generated
View File

@ -5050,12 +5050,14 @@ dependencies = [
"reth-blockchain-tree",
"reth-consensus-common",
"reth-db",
"reth-downloaders",
"reth-interfaces",
"reth-metrics",
"reth-payload-builder",
"reth-primitives",
"reth-provider",
"reth-prune",
"reth-revm",
"reth-rpc-types",
"reth-stages",
"reth-tasks",

View File

@ -40,5 +40,7 @@ reth-blockchain-tree = { path = "../../blockchain-tree", features = ["test-utils
reth-db = { path = "../../storage/db", features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
reth-tracing = { path = "../../tracing" }
reth-revm = { path = "../../revm" }
reth-downloaders = { path = "../../net/downloaders" }
assert_matches = "1.5"

View File

@ -1716,20 +1716,30 @@ mod tests {
BlockchainTree, ShareableBlockchainTree,
};
use reth_db::{test_utils::create_test_rw_db, DatabaseEnv};
use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_interfaces::{
consensus::Consensus,
p2p::either::EitherDownloader,
sync::NoopSyncStateUpdater,
test_utils::{NoopFullBlockClient, TestConsensus},
};
use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_primitives::{stage::StageCheckpoint, ChainSpec, ChainSpecBuilder, H256, MAINNET};
use reth_provider::{
providers::BlockchainProvider, test_utils::TestExecutorFactory, BlockWriter,
ProviderFactory,
providers::BlockchainProvider, test_utils::TestExecutorFactory, BlockExecutor, BlockWriter,
ExecutorFactory, ProviderFactory, StateProvider,
};
use reth_revm::Factory;
use reth_rpc_types::engine::{
ExecutionPayload, ForkchoiceState, ForkchoiceUpdated, PayloadStatus,
};
use reth_stages::{test_utils::TestStages, ExecOutput, PipelineError, StageError};
use reth_stages::{
sets::DefaultStages, stages::HeaderSyncMode, test_utils::TestStages, ExecOutput,
PipelineError, StageError,
};
use reth_tasks::TokioTaskExecutor;
use std::{collections::VecDeque, sync::Arc, time::Duration};
use tokio::sync::{
@ -1737,13 +1747,17 @@ mod tests {
watch,
};
type TestBeaconConsensusEngine = BeaconConsensusEngine<
type TestBeaconConsensusEngine<Client> = BeaconConsensusEngine<
Arc<DatabaseEnv>,
BlockchainProvider<
Arc<DatabaseEnv>,
ShareableBlockchainTree<Arc<DatabaseEnv>, TestConsensus, TestExecutorFactory>,
ShareableBlockchainTree<
Arc<DatabaseEnv>,
Arc<TestConsensus>,
EitherExecutorFactory<TestExecutorFactory, Factory>,
>,
>,
NoopFullBlockClient,
Arc<EitherDownloader<Client, NoopFullBlockClient>>,
>;
struct TestEnv<DB> {
@ -1806,22 +1820,124 @@ mod tests {
}
}
struct TestConsensusEngineBuilder {
chain_spec: Arc<ChainSpec>,
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
executor_results: Vec<PostState>,
pipeline_run_threshold: Option<u64>,
max_block: Option<BlockNumber>,
/// Represents either test pipeline outputs, or real pipeline configuration.
#[derive(Default)]
enum TestPipelineConfig {
/// Test pipeline outputs.
Test(VecDeque<Result<ExecOutput, StageError>>),
/// Real pipeline configuration.
#[default]
Real,
}
impl TestConsensusEngineBuilder {
/// Represents either test executor results, or real executor configuration.
#[derive(Default)]
enum TestExecutorConfig {
/// Test executor results.
Test(Vec<PostState>),
/// Real executor configuration.
#[default]
Real,
}
/// A type that represents one of two possible executor factories.
#[derive(Debug, Clone)]
enum EitherExecutorFactory<A: ExecutorFactory, B: ExecutorFactory> {
/// The first factory variant
Left(A),
/// The second factory variant
Right(B),
}
// A type that represents one of two possible BlockExecutor types.
#[derive(Debug)]
enum EitherBlockExecutor<A, B> {
/// The first executor variant
Left(A),
/// The second executor variant
Right(B),
}
impl<A, B, SP> BlockExecutor<SP> for EitherBlockExecutor<A, B>
where
A: BlockExecutor<SP>,
B: BlockExecutor<SP>,
SP: StateProvider,
{
fn execute(
&mut self,
block: &reth_primitives::Block,
total_difficulty: U256,
senders: Option<Vec<reth_primitives::Address>>,
) -> Result<PostState, BlockExecutionError> {
match self {
EitherBlockExecutor::Left(a) => a.execute(block, total_difficulty, senders),
EitherBlockExecutor::Right(b) => b.execute(block, total_difficulty, senders),
}
}
fn execute_and_verify_receipt(
&mut self,
block: &reth_primitives::Block,
total_difficulty: U256,
senders: Option<Vec<reth_primitives::Address>>,
) -> Result<PostState, BlockExecutionError> {
match self {
EitherBlockExecutor::Left(a) => {
a.execute_and_verify_receipt(block, total_difficulty, senders)
}
EitherBlockExecutor::Right(b) => {
b.execute_and_verify_receipt(block, total_difficulty, senders)
}
}
}
}
impl<A, B> ExecutorFactory for EitherExecutorFactory<A, B>
where
A: ExecutorFactory,
B: ExecutorFactory,
{
type Executor<T: StateProvider> = EitherBlockExecutor<A::Executor<T>, B::Executor<T>>;
fn chain_spec(&self) -> &ChainSpec {
match self {
EitherExecutorFactory::Left(a) => a.chain_spec(),
EitherExecutorFactory::Right(b) => b.chain_spec(),
}
}
fn with_sp<SP: reth_provider::StateProvider>(&self, sp: SP) -> Self::Executor<SP> {
match self {
EitherExecutorFactory::Left(a) => EitherBlockExecutor::Left(a.with_sp(sp)),
EitherExecutorFactory::Right(b) => EitherBlockExecutor::Right(b.with_sp(sp)),
}
}
}
/// A builder for `TestConsensusEngine`, allows configuration of mocked pipeline outputs and
/// mocked executor results.
struct TestConsensusEngineBuilder<Client> {
chain_spec: Arc<ChainSpec>,
pipeline_config: TestPipelineConfig,
executor_config: TestExecutorConfig,
pipeline_run_threshold: Option<u64>,
max_block: Option<BlockNumber>,
client: Option<Client>,
}
impl<Client> TestConsensusEngineBuilder<Client>
where
Client: HeadersClient + BodiesClient + 'static,
{
/// Create a new `TestConsensusEngineBuilder` with the given `ChainSpec`.
fn new(chain_spec: Arc<ChainSpec>) -> Self {
Self {
chain_spec,
pipeline_exec_outputs: VecDeque::new(),
executor_results: Vec::new(),
pipeline_config: Default::default(),
executor_config: Default::default(),
pipeline_run_threshold: None,
client: None,
max_block: None,
}
}
@ -1831,13 +1947,13 @@ mod tests {
mut self,
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
) -> Self {
self.pipeline_exec_outputs = pipeline_exec_outputs;
self.pipeline_config = TestPipelineConfig::Test(pipeline_exec_outputs);
self
}
/// Set the executor results to use for the test consensus engine.
fn with_executor_results(mut self, executor_results: Vec<PostState>) -> Self {
self.executor_results = executor_results;
self.executor_config = TestExecutorConfig::Test(executor_results);
self
}
@ -1847,6 +1963,13 @@ mod tests {
self
}
/// Sets the client to use for network operations.
#[allow(dead_code)]
fn with_client(mut self, client: Client) -> Self {
self.client = Some(client);
self
}
/// Disables blockchain tree driven sync. This is the same as setting the pipeline run
/// threshold to 0.
fn disable_blockchain_tree_sync(mut self) -> Self {
@ -1855,20 +1978,55 @@ mod tests {
}
/// Builds the test consensus engine into a `TestConsensusEngine` and `TestEnv`.
fn build(self) -> (TestBeaconConsensusEngine, TestEnv<Arc<DatabaseEnv>>) {
fn build(self) -> (TestBeaconConsensusEngine<Client>, TestEnv<Arc<DatabaseEnv>>) {
reth_tracing::init_test_tracing();
let db = create_test_rw_db();
let consensus = TestConsensus::default();
let consensus = Arc::new(TestConsensus::default());
let payload_builder = spawn_test_payload_service();
let executor_factory = TestExecutorFactory::new(self.chain_spec.clone());
executor_factory.extend(self.executor_results);
// use either noop client or a user provided client (for example TestFullBlockClient)
let client = Arc::new(
self.client
.map(EitherDownloader::Left)
.unwrap_or_else(|| EitherDownloader::Right(NoopFullBlockClient::default())),
);
// use either test executor or real executor
let executor_factory = match self.executor_config {
TestExecutorConfig::Test(results) => {
let executor_factory = TestExecutorFactory::new(self.chain_spec.clone());
executor_factory.extend(results);
EitherExecutorFactory::Left(executor_factory)
}
TestExecutorConfig::Real => {
EitherExecutorFactory::Right(Factory::new(self.chain_spec.clone()))
}
};
// Setup pipeline
let (tip_tx, tip_rx) = watch::channel(H256::default());
let mut pipeline = Pipeline::builder()
.add_stages(TestStages::new(self.pipeline_exec_outputs, Default::default()))
.with_tip_sender(tip_tx);
let mut pipeline = match self.pipeline_config {
TestPipelineConfig::Test(outputs) => Pipeline::builder()
.add_stages(TestStages::new(outputs, Default::default()))
.with_tip_sender(tip_tx),
TestPipelineConfig::Real => {
let header_downloader = ReverseHeadersDownloaderBuilder::default()
.build(client.clone(), consensus.clone())
.into_task();
let body_downloader = BodiesDownloaderBuilder::default()
.build(client.clone(), consensus.clone(), db.clone())
.into_task();
Pipeline::builder().add_stages(DefaultStages::new(
HeaderSyncMode::Tip(tip_rx.clone()),
Arc::clone(&consensus) as Arc<dyn Consensus>,
header_downloader,
body_downloader,
executor_factory.clone(),
))
}
};
if let Some(max_block) = self.max_block {
pipeline = pipeline.with_max_block(max_block);
@ -1896,7 +2054,7 @@ mod tests {
let pruner = Pruner::new(5, 0);
let (mut engine, handle) = BeaconConsensusEngine::new(
NoopFullBlockClient::default(),
client,
pipeline,
blockchain_provider,
Box::<TokioTaskExecutor>::default(),
@ -1918,8 +2076,8 @@ mod tests {
}
}
fn spawn_consensus_engine(
engine: TestBeaconConsensusEngine,
fn spawn_consensus_engine<Client: HeadersClient + BodiesClient + 'static>(
engine: TestBeaconConsensusEngine<Client>,
) -> oneshot::Receiver<Result<(), BeaconConsensusEngineError>> {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
@ -1940,11 +2098,12 @@ mod tests {
.build(),
);
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Err(StageError::ChannelClosed)]))
.disable_blockchain_tree_sync()
.with_max_block(1)
.build();
let (consensus_engine, env) =
TestConsensusEngineBuilder::<NoopFullBlockClient>::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Err(StageError::ChannelClosed)]))
.disable_blockchain_tree_sync()
.with_max_block(1)
.build();
let res = spawn_consensus_engine(consensus_engine);
@ -1971,11 +2130,12 @@ mod tests {
.build(),
);
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Err(StageError::ChannelClosed)]))
.disable_blockchain_tree_sync()
.with_max_block(1)
.build();
let (consensus_engine, env) =
TestConsensusEngineBuilder::<NoopFullBlockClient>::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Err(StageError::ChannelClosed)]))
.disable_blockchain_tree_sync()
.with_max_block(1)
.build();
let mut rx = spawn_consensus_engine(consensus_engine);
@ -2033,14 +2193,15 @@ mod tests {
.build(),
);
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([
Ok(ExecOutput { checkpoint: StageCheckpoint::new(1), done: true }),
Err(StageError::ChannelClosed),
]))
.disable_blockchain_tree_sync()
.with_max_block(2)
.build();
let (consensus_engine, env) =
TestConsensusEngineBuilder::<NoopFullBlockClient>::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([
Ok(ExecOutput { checkpoint: StageCheckpoint::new(1), done: true }),
Err(StageError::ChannelClosed),
]))
.disable_blockchain_tree_sync()
.with_max_block(2)
.build();
let rx = spawn_consensus_engine(consensus_engine);
@ -2068,14 +2229,15 @@ mod tests {
.build(),
);
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(max_block),
done: true,
})]))
.with_max_block(max_block)
.disable_blockchain_tree_sync()
.build();
let (consensus_engine, env) =
TestConsensusEngineBuilder::<NoopFullBlockClient>::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(max_block),
done: true,
})]))
.with_max_block(max_block)
.disable_blockchain_tree_sync()
.build();
let rx = spawn_consensus_engine(consensus_engine);
@ -2117,12 +2279,13 @@ mod tests {
.build(),
);
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
done: true,
})]))
.build();
let (consensus_engine, env) =
TestConsensusEngineBuilder::<NoopFullBlockClient>::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
done: true,
})]))
.build();
let mut engine_rx = spawn_consensus_engine(consensus_engine);
@ -2148,12 +2311,13 @@ mod tests {
.build(),
);
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
done: true,
})]))
.build();
let (consensus_engine, env) =
TestConsensusEngineBuilder::<NoopFullBlockClient>::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
done: true,
})]))
.build();
let genesis = random_block(&mut rng, 0, None, None, Some(0));
let block1 = random_block(&mut rng, 1, Some(genesis.hash), None, Some(0));
@ -2197,13 +2361,14 @@ mod tests {
.build(),
);
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
]))
.disable_blockchain_tree_sync()
.build();
let (consensus_engine, env) =
TestConsensusEngineBuilder::<NoopFullBlockClient>::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
]))
.disable_blockchain_tree_sync()
.build();
let genesis = random_block(&mut rng, 0, None, None, Some(0));
let block1 = random_block(&mut rng, 1, Some(genesis.hash), None, Some(0));
@ -2247,13 +2412,14 @@ mod tests {
.build(),
);
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
done: true,
})]))
.disable_blockchain_tree_sync()
.build();
let (consensus_engine, env) =
TestConsensusEngineBuilder::<NoopFullBlockClient>::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
done: true,
})]))
.disable_blockchain_tree_sync()
.build();
let genesis = random_block(&mut rng, 0, None, None, Some(0));
let block1 = random_block(&mut rng, 1, Some(genesis.hash), None, Some(0));
@ -2285,12 +2451,13 @@ mod tests {
.build(),
);
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
]))
.build();
let (consensus_engine, env) =
TestConsensusEngineBuilder::<NoopFullBlockClient>::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
]))
.build();
let genesis = random_block(&mut rng, 0, None, None, Some(0));
let mut block1 = random_block(&mut rng, 1, Some(genesis.hash), None, Some(0));
@ -2338,12 +2505,13 @@ mod tests {
.build(),
);
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
]))
.build();
let (consensus_engine, env) =
TestConsensusEngineBuilder::<NoopFullBlockClient>::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
]))
.build();
let genesis = random_block(&mut rng, 0, None, None, Some(0));
let block1 = random_block(&mut rng, 1, Some(genesis.hash), None, Some(0));
@ -2385,12 +2553,13 @@ mod tests {
.build(),
);
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
done: true,
})]))
.build();
let (consensus_engine, env) =
TestConsensusEngineBuilder::<NoopFullBlockClient>::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
done: true,
})]))
.build();
let mut engine_rx = spawn_consensus_engine(consensus_engine);
@ -2420,12 +2589,13 @@ mod tests {
.build(),
);
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
done: true,
})]))
.build();
let (consensus_engine, env) =
TestConsensusEngineBuilder::<NoopFullBlockClient>::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
done: true,
})]))
.build();
let genesis = random_block(&mut rng, 0, None, None, Some(0));
let block1 = random_block(&mut rng, 1, Some(genesis.hash), None, Some(0));
@ -2470,12 +2640,13 @@ mod tests {
.build(),
);
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
done: true,
})]))
.build();
let (consensus_engine, env) =
TestConsensusEngineBuilder::<NoopFullBlockClient>::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
done: true,
})]))
.build();
let genesis = random_block(&mut rng, 0, None, None, Some(0));
@ -2526,13 +2697,14 @@ mod tests {
.build(),
);
let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
done: true,
})]))
.with_executor_results(Vec::from([exec_result2]))
.build();
let (consensus_engine, env) =
TestConsensusEngineBuilder::<NoopFullBlockClient>::new(chain_spec.clone())
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(0),
done: true,
})]))
.with_executor_results(Vec::from([exec_result2]))
.build();
insert_blocks(
env.db.as_ref(),