mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
test: add EngineApiTreeHandlerImpl integration test (#9453)
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -7070,6 +7070,7 @@ dependencies = [
|
||||
"reth-engine-primitives",
|
||||
"reth-errors",
|
||||
"reth-ethereum-consensus",
|
||||
"reth-ethereum-engine-primitives",
|
||||
"reth-evm",
|
||||
"reth-exex-types",
|
||||
"reth-metrics",
|
||||
|
||||
@ -63,6 +63,8 @@ reth-tracing = { workspace = true, optional = true }
|
||||
[dev-dependencies]
|
||||
# reth
|
||||
reth-db = { workspace = true, features = ["test-utils"] }
|
||||
reth-ethereum-engine-primitives.workspace = true
|
||||
reth-evm = { workspace = true, features = ["test-utils"] }
|
||||
reth-exex-types.workspace = true
|
||||
reth-network-p2p = { workspace = true, features = ["test-utils"] }
|
||||
reth-prune.workspace = true
|
||||
|
||||
@ -286,17 +286,14 @@ impl PersistenceHandle {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::test_utils::get_executed_block_with_number;
|
||||
use reth_chainspec::MAINNET;
|
||||
use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir};
|
||||
use reth_exex_types::FinishedExExHeight;
|
||||
use reth_primitives::{
|
||||
Address, Block, Receipts, Requests, SealedBlockWithSenders, TransactionSigned, B256,
|
||||
};
|
||||
use reth_provider::{providers::StaticFileProvider, ExecutionOutcome, ProviderFactory};
|
||||
use reth_primitives::B256;
|
||||
use reth_provider::{providers::StaticFileProvider, ProviderFactory};
|
||||
use reth_prune::Pruner;
|
||||
use reth_trie::{updates::TrieUpdates, HashedPostState};
|
||||
use revm::db::BundleState;
|
||||
use std::sync::{mpsc::channel, Arc};
|
||||
use std::sync::mpsc::channel;
|
||||
|
||||
fn default_persistence_handle() -> PersistenceHandle {
|
||||
let db = create_test_rw_db();
|
||||
@ -340,29 +337,10 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_save_blocks_single_block() {
|
||||
let persistence_handle = default_persistence_handle();
|
||||
let block_number = 5;
|
||||
let executed = get_executed_block_with_number(block_number);
|
||||
let block_hash = executed.block().hash();
|
||||
|
||||
let mut block = Block::default();
|
||||
let sender = Address::random();
|
||||
let tx = TransactionSigned::default();
|
||||
block.body.push(tx);
|
||||
let block_hash = block.hash_slow();
|
||||
let block_number = block.number;
|
||||
let sealed = block.seal_slow();
|
||||
let sealed_with_senders =
|
||||
SealedBlockWithSenders::new(sealed.clone(), vec![sender]).unwrap();
|
||||
|
||||
let executed = ExecutedBlock::new(
|
||||
Arc::new(sealed),
|
||||
Arc::new(sealed_with_senders.senders),
|
||||
Arc::new(ExecutionOutcome::new(
|
||||
BundleState::default(),
|
||||
Receipts { receipt_vec: vec![] },
|
||||
block_number,
|
||||
vec![Requests::default()],
|
||||
)),
|
||||
Arc::new(HashedPostState::default()),
|
||||
Arc::new(TrieUpdates::default()),
|
||||
);
|
||||
let blocks = vec![executed];
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
|
||||
@ -1,12 +1,18 @@
|
||||
use crate::tree::ExecutedBlock;
|
||||
use reth_chainspec::ChainSpec;
|
||||
use reth_db::{mdbx::DatabaseEnv, test_utils::TempDatabase};
|
||||
use reth_network_p2p::test_utils::TestFullBlockClient;
|
||||
use reth_primitives::{BlockBody, SealedHeader, B256};
|
||||
use reth_primitives::{
|
||||
Address, Block, BlockBody, BlockNumber, Receipts, Requests, SealedBlockWithSenders,
|
||||
SealedHeader, TransactionSigned, B256,
|
||||
};
|
||||
use reth_provider::{test_utils::create_test_provider_factory_with_chain_spec, ExecutionOutcome};
|
||||
use reth_prune_types::PruneModes;
|
||||
use reth_stages::{test_utils::TestStages, ExecOutput, StageError};
|
||||
use reth_stages_api::Pipeline;
|
||||
use reth_static_file::StaticFileProducer;
|
||||
use reth_trie::{updates::TrieUpdates, HashedPostState};
|
||||
use revm::db::BundleState;
|
||||
use std::{collections::VecDeque, ops::Range, sync::Arc};
|
||||
use tokio::sync::watch;
|
||||
|
||||
@ -75,3 +81,33 @@ pub(crate) fn insert_headers_into_client(
|
||||
client.insert(sealed_header.clone(), body.clone());
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_executed_block_with_number(block_number: BlockNumber) -> ExecutedBlock {
|
||||
let mut block = Block::default();
|
||||
let mut header = block.header.clone();
|
||||
header.number = block_number;
|
||||
block.header = header;
|
||||
|
||||
let sender = Address::random();
|
||||
let tx = TransactionSigned::default();
|
||||
block.body.push(tx);
|
||||
let sealed = block.seal_slow();
|
||||
let sealed_with_senders = SealedBlockWithSenders::new(sealed.clone(), vec![sender]).unwrap();
|
||||
|
||||
ExecutedBlock::new(
|
||||
Arc::new(sealed),
|
||||
Arc::new(sealed_with_senders.senders),
|
||||
Arc::new(ExecutionOutcome::new(
|
||||
BundleState::default(),
|
||||
Receipts { receipt_vec: vec![] },
|
||||
block_number,
|
||||
vec![Requests::default()],
|
||||
)),
|
||||
Arc::new(HashedPostState::default()),
|
||||
Arc::new(TrieUpdates::default()),
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn get_executed_blocks(number: u64) -> Vec<ExecutedBlock> {
|
||||
(1..=number).map(get_executed_block_with_number).collect()
|
||||
}
|
||||
|
||||
@ -48,7 +48,7 @@ pub use memory_overlay::MemoryOverlayStateProvider;
|
||||
const PERSISTENCE_THRESHOLD: u64 = 256;
|
||||
|
||||
/// Represents an executed block stored in-memory.
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct ExecutedBlock {
|
||||
block: Arc<SealedBlock>,
|
||||
senders: Arc<Vec<Address>>,
|
||||
@ -326,44 +326,39 @@ where
|
||||
}
|
||||
|
||||
fn run(mut self) {
|
||||
loop {
|
||||
while let Ok(msg) = self.incoming.recv() {
|
||||
match msg {
|
||||
FromEngine::Event(event) => match event {
|
||||
FromOrchestrator::BackfillSyncFinished => {
|
||||
todo!()
|
||||
while let Ok(msg) = self.incoming.recv() {
|
||||
match msg {
|
||||
FromEngine::Event(event) => match event {
|
||||
FromOrchestrator::BackfillSyncFinished => {
|
||||
todo!()
|
||||
}
|
||||
FromOrchestrator::BackfillSyncStarted => {
|
||||
todo!()
|
||||
}
|
||||
},
|
||||
FromEngine::Request(request) => match request {
|
||||
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
|
||||
let output = self.on_forkchoice_updated(state, payload_attrs);
|
||||
if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(Into::into)) {
|
||||
error!("Failed to send event: {err:?}");
|
||||
}
|
||||
FromOrchestrator::BackfillSyncStarted => {
|
||||
todo!()
|
||||
}
|
||||
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => {
|
||||
let output = self.on_new_payload(payload, cancun_fields);
|
||||
if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(|e| {
|
||||
reth_beacon_consensus::BeaconOnNewPayloadError::Internal(Box::new(e))
|
||||
})) {
|
||||
error!("Failed to send event: {err:?}");
|
||||
}
|
||||
},
|
||||
FromEngine::Request(request) => match request {
|
||||
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
|
||||
let output = self.on_forkchoice_updated(state, payload_attrs);
|
||||
if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(Into::into))
|
||||
{
|
||||
error!("Failed to send event: {err:?}");
|
||||
}
|
||||
}
|
||||
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => {
|
||||
let output = self.on_new_payload(payload, cancun_fields);
|
||||
if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(|e| {
|
||||
reth_beacon_consensus::BeaconOnNewPayloadError::Internal(Box::new(
|
||||
e,
|
||||
))
|
||||
})) {
|
||||
error!("Failed to send event: {err:?}");
|
||||
}
|
||||
}
|
||||
BeaconEngineMessage::TransitionConfigurationExchanged => {
|
||||
todo!()
|
||||
}
|
||||
},
|
||||
FromEngine::DownloadedBlocks(blocks) => {
|
||||
if let Some(event) = self.on_downloaded(blocks) {
|
||||
if let Err(err) = self.outgoing.send(EngineApiEvent::FromTree(event)) {
|
||||
error!("Failed to send event: {err:?}");
|
||||
}
|
||||
}
|
||||
BeaconEngineMessage::TransitionConfigurationExchanged => {
|
||||
todo!()
|
||||
}
|
||||
},
|
||||
FromEngine::DownloadedBlocks(blocks) => {
|
||||
if let Some(event) = self.on_downloaded(blocks) {
|
||||
if let Err(err) = self.outgoing.send(EngineApiEvent::FromTree(event)) {
|
||||
error!("Failed to send event: {err:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -719,7 +714,8 @@ where
|
||||
type Engine = T;
|
||||
|
||||
fn on_downloaded(&mut self, _blocks: Vec<SealedBlockWithSenders>) -> Option<TreeEvent> {
|
||||
todo!()
|
||||
debug!("not implemented");
|
||||
None
|
||||
}
|
||||
|
||||
fn on_new_payload(
|
||||
@ -878,3 +874,102 @@ impl PersistenceState {
|
||||
self.last_persisted_block_hash = last_persisted_block_hash;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{persistence::PersistenceAction, test_utils::get_executed_blocks};
|
||||
|
||||
use reth_beacon_consensus::EthBeaconConsensus;
|
||||
use reth_chainspec::{ChainSpecBuilder, MAINNET};
|
||||
use reth_ethereum_engine_primitives::EthEngineTypes;
|
||||
use reth_evm::test_utils::MockExecutorProvider;
|
||||
use reth_provider::test_utils::MockEthProvider;
|
||||
use std::sync::mpsc::{channel, Sender};
|
||||
use tokio::sync::mpsc::unbounded_channel;
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn get_default_tree(
|
||||
persistence_handle: PersistenceHandle,
|
||||
tree_state: TreeState,
|
||||
) -> (
|
||||
EngineApiTreeHandlerImpl<MockEthProvider, MockExecutorProvider, EthEngineTypes>,
|
||||
Sender<FromEngine<BeaconEngineMessage<EthEngineTypes>>>,
|
||||
) {
|
||||
let chain_spec = Arc::new(
|
||||
ChainSpecBuilder::default()
|
||||
.chain(MAINNET.chain)
|
||||
.genesis(MAINNET.genesis.clone())
|
||||
.paris_activated()
|
||||
.build(),
|
||||
);
|
||||
let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
|
||||
|
||||
let provider = MockEthProvider::default();
|
||||
let executor_factory = MockExecutorProvider::default();
|
||||
executor_factory.extend(vec![ExecutionOutcome::default()]);
|
||||
|
||||
let payload_validator = ExecutionPayloadValidator::new(chain_spec);
|
||||
|
||||
let (to_tree_tx, to_tree_rx) = channel();
|
||||
let (from_tree_tx, from_tree_rx) = unbounded_channel();
|
||||
|
||||
let engine_api_tree_state = EngineApiTreeState {
|
||||
invalid_headers: InvalidHeaderCache::new(10),
|
||||
buffer: BlockBuffer::new(10),
|
||||
tree_state,
|
||||
forkchoice_state_tracker: ForkchoiceStateTracker::default(),
|
||||
};
|
||||
|
||||
(
|
||||
EngineApiTreeHandlerImpl::new(
|
||||
provider,
|
||||
executor_factory,
|
||||
consensus,
|
||||
payload_validator,
|
||||
to_tree_rx,
|
||||
from_tree_tx,
|
||||
engine_api_tree_state,
|
||||
persistence_handle,
|
||||
),
|
||||
to_tree_tx,
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_tree_persist_blocks() {
|
||||
// we need more than PERSISTENCE_THRESHOLD blocks to trigger the
|
||||
// persistence task.
|
||||
let mut blocks = get_executed_blocks(PERSISTENCE_THRESHOLD + 1);
|
||||
|
||||
let mut blocks_by_hash = HashMap::new();
|
||||
let mut blocks_by_number = BTreeMap::new();
|
||||
for block in &blocks {
|
||||
blocks_by_hash.insert(block.block().hash(), block.clone());
|
||||
blocks_by_number
|
||||
.entry(block.block().number)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(block.clone());
|
||||
}
|
||||
let tree_state = TreeState { blocks_by_hash, blocks_by_number };
|
||||
|
||||
let (action_tx, action_rx) = channel();
|
||||
let persistence_handle = PersistenceHandle::new(action_tx);
|
||||
|
||||
let (tree, to_tree_tx) = get_default_tree(persistence_handle, tree_state);
|
||||
std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| tree.run()).unwrap();
|
||||
|
||||
// send a message to the tree
|
||||
to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap();
|
||||
|
||||
let received_action = action_rx.recv().expect("Failed to receive saved blocks");
|
||||
if let PersistenceAction::SaveBlocks((saved_blocks, _)) = received_action {
|
||||
// only PERSISTENCE_THRESHOLD will be persisted
|
||||
blocks.pop();
|
||||
assert_eq!(saved_blocks, blocks);
|
||||
assert_eq!(saved_blocks.len() as u64, PERSISTENCE_THRESHOLD);
|
||||
} else {
|
||||
panic!("unexpected action received {received_action:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user