exex: add unit tests for exex manager (#10380)

This commit is contained in:
Thomas Coratger
2024-08-20 00:08:47 -07:00
committed by GitHub
parent 6898c30a65
commit 32fb61337f
2 changed files with 300 additions and 7 deletions

View File

@ -47,6 +47,7 @@ reth-evm-ethereum.workspace = true
reth-node-api.workspace = true
reth-provider = { workspace = true, features = ["test-utils"] }
reth-testing-utils.workspace = true
reth-primitives-traits = { workspace = true, features = ["test-utils"] }
secp256k1.workspace = true

View File

@ -432,7 +432,7 @@ impl ExExManagerHandle {
/// If this returns `false`, the owner of the handle should **NOT** send new notifications over
/// the channel until the manager is ready again, as this can lead to unbounded memory growth.
pub fn has_capacity(&self) -> bool {
self.current_capacity.load(Ordering::Relaxed) > 0
self.capacity() > 0
}
/// Returns `true` if there are `ExEx`'s installed in the node.
@ -481,18 +481,310 @@ impl Clone for ExExManagerHandle {
#[cfg(test)]
mod tests {
#[tokio::test]
async fn delivers_events() {}
use super::*;
use reth_primitives::{SealedBlockWithSenders, B256};
use reth_provider::Chain;
#[tokio::test]
async fn capacity() {}
async fn test_delivers_events() {
let (mut exex_handle, event_tx, mut _notification_rx) =
ExExHandle::new("test_exex".to_string());
// Send an event and check that it's delivered correctly
event_tx.send(ExExEvent::FinishedHeight(42)).unwrap();
let received_event = exex_handle.receiver.recv().await.unwrap();
assert_eq!(received_event, ExExEvent::FinishedHeight(42));
}
#[tokio::test]
async fn updates_block_height() {}
async fn test_has_exexs() {
let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string());
assert!(!ExExManager::new(vec![], 0).handle.has_exexs());
assert!(ExExManager::new(vec![exex_handle_1], 0).handle.has_exexs());
}
#[tokio::test]
async fn slow_exex() {}
async fn test_has_capacity() {
let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string());
assert!(!ExExManager::new(vec![], 0).handle.has_capacity());
assert!(ExExManager::new(vec![exex_handle_1], 10).handle.has_capacity());
}
#[test]
fn test_push_notification() {
let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string());
// Create a mock ExExManager and add the exex_handle to it
let mut exex_manager = ExExManager::new(vec![exex_handle], 10);
// Define the notification for testing
let mut block1 = SealedBlockWithSenders::default();
block1.block.header.set_hash(B256::new([0x01; 32]));
block1.block.header.set_block_number(10);
let notification1 = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
};
// Push the first notification
exex_manager.push_notification(notification1.clone());
// Verify the buffer contains the notification with the correct ID
assert_eq!(exex_manager.buffer.len(), 1);
assert_eq!(exex_manager.buffer.front().unwrap().0, 0);
assert_eq!(exex_manager.buffer.front().unwrap().1, notification1);
assert_eq!(exex_manager.next_id, 1);
// Push another notification
let mut block2 = SealedBlockWithSenders::default();
block2.block.header.set_hash(B256::new([0x02; 32]));
block2.block.header.set_block_number(20);
let notification2 = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block2.clone()], Default::default(), Default::default())),
};
exex_manager.push_notification(notification2.clone());
// Verify the buffer contains both notifications with correct IDs
assert_eq!(exex_manager.buffer.len(), 2);
assert_eq!(exex_manager.buffer.front().unwrap().0, 0);
assert_eq!(exex_manager.buffer.front().unwrap().1, notification1);
assert_eq!(exex_manager.buffer.get(1).unwrap().0, 1);
assert_eq!(exex_manager.buffer.get(1).unwrap().1, notification2);
assert_eq!(exex_manager.next_id, 2);
}
#[test]
fn test_update_capacity() {
let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string());
// Create a mock ExExManager and add the exex_handle to it
let max_capacity = 5;
let mut exex_manager = ExExManager::new(vec![exex_handle], max_capacity);
// Push some notifications to fill part of the buffer
let mut block1 = SealedBlockWithSenders::default();
block1.block.header.set_hash(B256::new([0x01; 32]));
block1.block.header.set_block_number(10);
let notification1 = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
};
exex_manager.push_notification(notification1.clone());
exex_manager.push_notification(notification1);
// Update capacity
exex_manager.update_capacity();
// Verify current capacity and metrics
assert_eq!(exex_manager.current_capacity.load(Ordering::Relaxed), max_capacity - 2);
// Clear the buffer and update capacity
exex_manager.buffer.clear();
exex_manager.update_capacity();
// Verify current capacity
assert_eq!(exex_manager.current_capacity.load(Ordering::Relaxed), max_capacity);
}
#[tokio::test]
async fn is_ready() {}
async fn test_updates_block_height() {
let (exex_handle, event_tx, mut _notification_rx) =
ExExHandle::new("test_exex".to_string());
// Check initial block height
assert!(exex_handle.finished_height.is_none());
// Update the block height via an event
event_tx.send(ExExEvent::FinishedHeight(42)).unwrap();
// Create a mock ExExManager and add the exex_handle to it
let exex_manager = ExExManager::new(vec![exex_handle], 10);
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
// Pin the ExExManager to call the poll method
let mut pinned_manager = std::pin::pin!(exex_manager);
let _ = pinned_manager.as_mut().poll(&mut cx);
// Check that the block height was updated
let updated_exex_handle = &pinned_manager.exex_handles[0];
assert_eq!(updated_exex_handle.finished_height, Some(42));
}
#[tokio::test]
async fn test_exex_manager_capacity() {
let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string());
// Create an ExExManager with a small max capacity
let max_capacity = 2;
let mut exex_manager = ExExManager::new(vec![exex_handle_1], max_capacity);
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
// Setup a notification
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![Default::default()],
Default::default(),
Default::default(),
)),
};
// Send notifications to go over the max capacity
exex_manager.handle.exex_tx.send(notification.clone()).unwrap();
exex_manager.handle.exex_tx.send(notification.clone()).unwrap();
exex_manager.handle.exex_tx.send(notification).unwrap();
// Pin the ExExManager to call the poll method
let mut pinned_manager = std::pin::pin!(exex_manager);
// Before polling, the next notification ID should be 0 and the buffer should be empty
assert_eq!(pinned_manager.next_id, 0);
assert_eq!(pinned_manager.buffer.len(), 0);
let _ = pinned_manager.as_mut().poll(&mut cx);
// After polling, the next notification ID and buffer size should be updated
assert_eq!(pinned_manager.next_id, 2);
assert_eq!(pinned_manager.buffer.len(), 2);
}
#[tokio::test]
async fn exex_handle_new() {
let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string());
// Check initial state
assert_eq!(exex_handle.id, "test_exex");
assert_eq!(exex_handle.next_notification_id, 0);
// Setup two blocks for the chain commit notification
let mut block1 = SealedBlockWithSenders::default();
block1.block.header.set_hash(B256::new([0x01; 32]));
block1.block.header.set_block_number(10);
let mut block2 = SealedBlockWithSenders::default();
block2.block.header.set_hash(B256::new([0x02; 32]));
block2.block.header.set_block_number(11);
// Setup a notification
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block1.clone(), block2.clone()],
Default::default(),
Default::default(),
)),
};
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
// Send a notification and ensure it's received correctly
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notification_rx.recv().await.unwrap();
assert_eq!(received_notification, notification);
}
Poll::Pending => panic!("Notification send is pending"),
Poll::Ready(Err(e)) => panic!("Failed to send notification: {:?}", e),
}
// Ensure the notification ID was incremented
assert_eq!(exex_handle.next_notification_id, 23);
}
#[tokio::test]
async fn test_notification_if_finished_height_gt_chain_tip() {
let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string());
// Set finished_height to a value higher than the block tip
exex_handle.finished_height = Some(15);
let mut block1 = SealedBlockWithSenders::default();
block1.block.header.set_hash(B256::new([0x01; 32]));
block1.block.header.set_block_number(10);
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
};
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
// Send the notification
match exex_handle.send(&mut cx, &(22, notification)) {
Poll::Ready(Ok(())) => {
// The notification should be skipped, so nothing should be sent.
// Check that the receiver channel is indeed empty
assert!(notification_rx.try_recv().is_err(), "Receiver channel should be empty");
}
Poll::Pending | Poll::Ready(Err(_)) => {
panic!("Notification should not be pending or fail");
}
}
// Ensure the notification ID was still incremented
assert_eq!(exex_handle.next_notification_id, 23);
}
#[tokio::test]
async fn test_sends_chain_reorged_notification() {
let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string());
let notification = ExExNotification::ChainReorged {
old: Arc::new(Chain::default()),
new: Arc::new(Chain::default()),
};
// Even if the finished height is higher than the tip of the new chain, the reorg
// notification should be received
exex_handle.finished_height = Some(u64::MAX);
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
// Send the notification
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notification_rx.recv().await.unwrap();
assert_eq!(received_notification, notification);
}
Poll::Pending | Poll::Ready(Err(_)) => {
panic!("Notification should not be pending or fail")
}
}
// Ensure the notification ID was incremented
assert_eq!(exex_handle.next_notification_id, 23);
}
#[tokio::test]
async fn test_sends_chain_reverted_notification() {
let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string());
let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) };
// Even if the finished height is higher than the tip of the new chain, the reorg
// notification should be received
exex_handle.finished_height = Some(u64::MAX);
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
// Send the notification
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notification_rx.recv().await.unwrap();
assert_eq!(received_notification, notification);
}
Poll::Pending | Poll::Ready(Err(_)) => {
panic!("Notification should not be pending or fail")
}
}
// Ensure the notification ID was incremented
assert_eq!(exex_handle.next_notification_id, 23);
}
}