feat(exex): subscribe to notifications explicitly (#10573)

This commit is contained in:
Alexey Shekhirin
2024-09-06 15:24:23 +01:00
committed by GitHub
parent 3ec5d373c1
commit a89de219c9
13 changed files with 209 additions and 45 deletions

1
Cargo.lock generated
View File

@ -7308,6 +7308,7 @@ name = "reth-exex-types"
version = "1.0.6" version = "1.0.6"
dependencies = [ dependencies = [
"alloy-primitives", "alloy-primitives",
"reth-primitives",
"reth-provider", "reth-provider",
"serde", "serde",
] ]

View File

@ -24,7 +24,9 @@ reth = { git = "https://github.com/paradigmxyz/reth.git" } # Reth
reth-exex = { git = "https://github.com/paradigmxyz/reth.git" } # Execution Extensions reth-exex = { git = "https://github.com/paradigmxyz/reth.git" } # Execution Extensions
reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth.git" } # Ethereum Node implementation reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth.git" } # Ethereum Node implementation
reth-tracing = { git = "https://github.com/paradigmxyz/reth.git" } # Logging reth-tracing = { git = "https://github.com/paradigmxyz/reth.git" } # Logging
eyre = "0.6" # Easy error handling eyre = "0.6" # Easy error handling
futures-util = "0.3" # Stream utilities for consuming notifications
``` ```
### Default Reth node ### Default Reth node
@ -101,13 +103,14 @@ If you try running a node with an ExEx that exits, the node will exit as well.
Now, let's extend our simplest ExEx and start actually listening to new notifications, log them, and send events back to the main node Now, let's extend our simplest ExEx and start actually listening to new notifications, log them, and send events back to the main node
```rust,norun,noplayground,ignore ```rust,norun,noplayground,ignore
use futures_util::StreamExt;
use reth::api::FullNodeComponents; use reth::api::FullNodeComponents;
use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_ethereum::EthereumNode; use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info; use reth_tracing::tracing::info;
async fn my_exex<Node: FullNodeComponents>(mut ctx: ExExContext<Node>) -> eyre::Result<()> { async fn my_exex<Node: FullNodeComponents>(mut ctx: ExExContext<Node>) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.recv().await { while let Some(notification) = ctx.notifications.next().await {
match &notification { match &notification {
ExExNotification::ChainCommitted { new } => { ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit"); info!(committed_chain = ?new.range(), "Received commit");

View File

@ -268,13 +268,15 @@ Don't forget to emit `ExExEvent::FinishedHeight`
```rust,norun,noplayground,ignore ```rust,norun,noplayground,ignore
// ... // ...
use futures_util::StreamExt;
use reth_exex::{ExExContext, ExExEvent}; use reth_exex::{ExExContext, ExExEvent};
async fn remote_exex<Node: FullNodeComponents>( async fn remote_exex<Node: FullNodeComponents>(
mut ctx: ExExContext<Node>, mut ctx: ExExContext<Node>,
notifications: Arc<broadcast::Sender<ExExNotification>>, notifications: Arc<broadcast::Sender<ExExNotification>>,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.recv().await { while let Some(notification) = ctx.notifications.next().await {
if let Some(committed_chain) = notification.committed_chain() { if let Some(committed_chain) = notification.committed_chain() {
ctx.events ctx.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?; .send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
@ -332,6 +334,9 @@ fn main() -> eyre::Result<()> {
<summary>Click to expand</summary> <summary>Click to expand</summary>
```rust,norun,noplayground,ignore ```rust,norun,noplayground,ignore
use std::sync::Arc;
use futures_util::StreamExt;
use remote_exex::proto::{ use remote_exex::proto::{
self, self,
remote_ex_ex_server::{RemoteExEx, RemoteExExServer}, remote_ex_ex_server::{RemoteExEx, RemoteExExServer},
@ -340,7 +345,6 @@ use reth::api::FullNodeComponents;
use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_ethereum::EthereumNode; use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info; use reth_tracing::tracing::info;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc}; use tokio::sync::{broadcast, mpsc};
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Server, Request, Response, Status}; use tonic::{transport::Server, Request, Response, Status};
@ -381,7 +385,7 @@ async fn remote_exex<Node: FullNodeComponents>(
mut ctx: ExExContext<Node>, mut ctx: ExExContext<Node>,
notifications: Arc<broadcast::Sender<ExExNotification>>, notifications: Arc<broadcast::Sender<ExExNotification>>,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.recv().await { while let Some(notification) = ctx.notifications.next().await {
if let Some(committed_chain) = notification.committed_chain() { if let Some(committed_chain) = notification.committed_chain() {
ctx.events ctx.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?; .send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;

View File

@ -25,6 +25,7 @@ use std::{
task::{ready, Context, Poll}, task::{ready, Context, Poll},
}; };
use futures_util::StreamExt;
use reth::api::FullNodeComponents; use reth::api::FullNodeComponents;
use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_ethereum::EthereumNode; use reth_node_ethereum::EthereumNode;
@ -40,7 +41,7 @@ impl<Node: FullNodeComponents> Future for MyExEx<Node> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut(); let this = self.get_mut();
while let Some(notification) = ready!(this.ctx.notifications.poll_recv(cx)) { while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) {
match &notification { match &notification {
ExExNotification::ChainCommitted { new } => { ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit"); info!(committed_chain = ?new.range(), "Received commit");
@ -101,6 +102,7 @@ use std::{
task::{ready, Context, Poll}, task::{ready, Context, Poll},
}; };
use futures_util::StreamExt;
use reth::{api::FullNodeComponents, primitives::BlockNumber}; use reth::{api::FullNodeComponents, primitives::BlockNumber};
use reth_exex::{ExExContext, ExExEvent}; use reth_exex::{ExExContext, ExExEvent};
use reth_node_ethereum::EthereumNode; use reth_node_ethereum::EthereumNode;
@ -130,7 +132,7 @@ impl<Node: FullNodeComponents> Future for MyExEx<Node> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut(); let this = self.get_mut();
while let Some(notification) = ready!(this.ctx.notifications.poll_recv(cx)) { while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) {
if let Some(reverted_chain) = notification.reverted_chain() { if let Some(reverted_chain) = notification.reverted_chain() {
this.transactions = this.transactions.saturating_sub( this.transactions = this.transactions.saturating_sub(
reverted_chain reverted_chain

View File

@ -32,7 +32,7 @@ operating system:
These are needed to build bindings for Reth's database. These are needed to build bindings for Reth's database.
The Minimum Supported Rust Version (MSRV) of this project is 1.81.0. If you already have a version of Rust installed, The Minimum Supported Rust Version (MSRV) of this project is 1.80.0. If you already have a version of Rust installed,
you can check your version by running `rustc --version`. To update your version of Rust, run `rustup update`. you can check your version by running `rustc --version`. To update your version of Rust, run `rustup update`.
## Build Reth ## Build Reth

View File

@ -20,8 +20,8 @@ reth-metrics.workspace = true
reth-node-api.workspace = true reth-node-api.workspace = true
reth-node-core.workspace = true reth-node-core.workspace = true
reth-payload-builder.workspace = true reth-payload-builder.workspace = true
reth-primitives-traits.workspace = true
reth-primitives = { workspace = true, features = ["secp256k1"] } reth-primitives = { workspace = true, features = ["secp256k1"] }
reth-primitives-traits.workspace = true
reth-provider.workspace = true reth-provider.workspace = true
reth-prune-types.workspace = true reth-prune-types.workspace = true
reth-revm.workspace = true reth-revm.workspace = true
@ -45,9 +45,9 @@ reth-db-api.workspace = true
reth-db-common.workspace = true reth-db-common.workspace = true
reth-evm-ethereum.workspace = true reth-evm-ethereum.workspace = true
reth-node-api.workspace = true reth-node-api.workspace = true
reth-primitives-traits = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] } reth-provider = { workspace = true, features = ["test-utils"] }
reth-testing-utils.workspace = true reth-testing-utils.workspace = true
reth-primitives-traits = { workspace = true, features = ["test-utils"] }
secp256k1.workspace = true secp256k1.workspace = true

View File

@ -4,9 +4,9 @@ use reth_node_api::{FullNodeComponents, NodeTypesWithEngine};
use reth_node_core::node_config::NodeConfig; use reth_node_core::node_config::NodeConfig;
use reth_primitives::Head; use reth_primitives::Head;
use reth_tasks::TaskExecutor; use reth_tasks::TaskExecutor;
use tokio::sync::mpsc::{Receiver, UnboundedSender}; use tokio::sync::mpsc::UnboundedSender;
use crate::{ExExEvent, ExExNotification}; use crate::{ExExEvent, ExExNotifications};
/// Captures the context that an `ExEx` has access to. /// Captures the context that an `ExEx` has access to.
pub struct ExExContext<Node: FullNodeComponents> { pub struct ExExContext<Node: FullNodeComponents> {
@ -24,13 +24,13 @@ pub struct ExExContext<Node: FullNodeComponents> {
/// Additionally, the exex can pre-emptively emit a `FinishedHeight` event to specify what /// Additionally, the exex can pre-emptively emit a `FinishedHeight` event to specify what
/// blocks to receive notifications for. /// blocks to receive notifications for.
pub events: UnboundedSender<ExExEvent>, pub events: UnboundedSender<ExExEvent>,
/// Channel to receive [`ExExNotification`]s. /// Channel to receive [`ExExNotification`](crate::ExExNotification)s.
/// ///
/// # Important /// # Important
/// ///
/// Once a an [`ExExNotification`] is sent over the channel, it is considered delivered by the /// Once an [`ExExNotification`](crate::ExExNotification) is sent over the channel, it is
/// node. /// considered delivered by the node.
pub notifications: Receiver<ExExNotification>, pub notifications: ExExNotifications<Node>,
/// node components /// node components
pub components: Node, pub components: Node,

View File

@ -1,10 +1,13 @@
use crate::{ExExEvent, ExExNotification, FinishedExExHeight}; use crate::{ExExEvent, ExExNotification, FinishedExExHeight};
use futures::Stream;
use metrics::Gauge; use metrics::Gauge;
use reth_exex_types::ExExHead;
use reth_metrics::{metrics::Counter, Metrics}; use reth_metrics::{metrics::Counter, Metrics};
use reth_primitives::BlockNumber; use reth_primitives::BlockNumber;
use reth_tracing::tracing::debug; use reth_tracing::tracing::debug;
use std::{ use std::{
collections::VecDeque, collections::VecDeque,
fmt::Debug,
future::{poll_fn, Future}, future::{poll_fn, Future},
pin::Pin, pin::Pin,
sync::{ sync::{
@ -40,14 +43,12 @@ pub struct ExExHandle {
id: String, id: String,
/// Metrics for an `ExEx`. /// Metrics for an `ExEx`.
metrics: ExExMetrics, metrics: ExExMetrics,
/// Channel to send [`ExExNotification`]s to the `ExEx`. /// Channel to send [`ExExNotification`]s to the `ExEx`.
sender: PollSender<ExExNotification>, sender: PollSender<ExExNotification>,
/// Channel to receive [`ExExEvent`]s from the `ExEx`. /// Channel to receive [`ExExEvent`]s from the `ExEx`.
receiver: UnboundedReceiver<ExExEvent>, receiver: UnboundedReceiver<ExExEvent>,
/// The ID of the next notification to send to this `ExEx`. /// The ID of the next notification to send to this `ExEx`.
next_notification_id: usize, next_notification_id: usize,
/// The finished block number of the `ExEx`. /// The finished block number of the `ExEx`.
/// ///
/// If this is `None`, the `ExEx` has not emitted a `FinishedHeight` event. /// If this is `None`, the `ExEx` has not emitted a `FinishedHeight` event.
@ -59,9 +60,13 @@ impl ExExHandle {
/// ///
/// Returns the handle, as well as a [`UnboundedSender`] for [`ExExEvent`]s and a /// Returns the handle, as well as a [`UnboundedSender`] for [`ExExEvent`]s and a
/// [`Receiver`] for [`ExExNotification`]s that should be given to the `ExEx`. /// [`Receiver`] for [`ExExNotification`]s that should be given to the `ExEx`.
pub fn new(id: String) -> (Self, UnboundedSender<ExExEvent>, Receiver<ExExNotification>) { pub fn new<Node>(
id: String,
components: Node,
) -> (Self, UnboundedSender<ExExEvent>, ExExNotifications<Node>) {
let (notification_tx, notification_rx) = mpsc::channel(1); let (notification_tx, notification_rx) = mpsc::channel(1);
let (event_tx, event_rx) = mpsc::unbounded_channel(); let (event_tx, event_rx) = mpsc::unbounded_channel();
let notifications = ExExNotifications { components, notifications: notification_rx };
( (
Self { Self {
@ -73,7 +78,7 @@ impl ExExHandle {
finished_height: None, finished_height: None,
}, },
event_tx, event_tx,
notification_rx, notifications,
) )
} }
@ -139,6 +144,133 @@ impl ExExHandle {
} }
} }
/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
pub struct ExExNotifications<Node> {
components: Node,
notifications: Receiver<ExExNotification>,
}
impl<Node> Debug for ExExNotifications<Node> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ExExNotifications")
.field("components", &"...")
.field("notifications", &self.notifications)
.finish()
}
}
impl<Node> ExExNotifications<Node> {
/// Creates a new instance of [`ExExNotifications`].
pub const fn new(components: Node, notifications: Receiver<ExExNotification>) -> Self {
Self { components, notifications }
}
/// Receives the next value for this receiver.
///
/// This method returns `None` if the channel has been closed and there are
/// no remaining messages in the channel's buffer. This indicates that no
/// further values can ever be received from this `Receiver`. The channel is
/// closed when all senders have been dropped, or when [`Receiver::close`] is called.
///
/// # Cancel safety
///
/// This method is cancel safe. If `recv` is used as the event in a
/// [`tokio::select!`] statement and some other branch
/// completes first, it is guaranteed that no messages were received on this
/// channel.
///
/// For full documentation, see [`Receiver::recv`].
#[deprecated(note = "use `ExExNotifications::next` and its `Stream` implementation instead")]
pub async fn recv(&mut self) -> Option<ExExNotification> {
self.notifications.recv().await
}
/// Polls to receive the next message on this channel.
///
/// This method returns:
///
/// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
/// spurious failure happens.
/// * `Poll::Ready(Some(message))` if a message is available.
/// * `Poll::Ready(None)` if the channel has been closed and all messages sent before it was
/// closed have been received.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// receiver, or when the channel is closed. Note that on multiple calls to
/// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
/// passed to the most recent call is scheduled to receive a wakeup.
///
/// If this method returns `Poll::Pending` due to a spurious failure, then
/// the `Waker` will be notified when the situation causing the spurious
/// failure has been resolved. Note that receiving such a wakeup does not
/// guarantee that the next call will succeed — it could fail with another
/// spurious failure.
///
/// For full documentation, see [`Receiver::poll_recv`].
#[deprecated(
note = "use `ExExNotifications::poll_next` and its `Stream` implementation instead"
)]
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<ExExNotification>> {
self.notifications.poll_recv(cx)
}
// TODO(alexey): make it public when backfill is implemented in [`ExExNotificationsWithHead`]
/// Subscribe to notifications with the given head.
///
/// Notifications will be sent starting from the head, not inclusive. For example, if
/// `head.number == 10`, then the first notification will be with `block.number == 11`.
#[allow(dead_code)]
fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead<Node> {
ExExNotificationsWithHead {
components: self.components,
notifications: self.notifications,
head,
}
}
}
impl<Node: Unpin> Stream for ExExNotifications<Node> {
type Item = ExExNotification;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.get_mut().notifications.poll_recv(cx)
}
}
/// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are
/// committed or reverted after the given head.
#[derive(Debug)]
pub struct ExExNotificationsWithHead<Node> {
#[allow(dead_code)]
components: Node,
notifications: Receiver<ExExNotification>,
head: ExExHead,
}
impl<Node: Unpin> Stream for ExExNotificationsWithHead<Node> {
type Item = ExExNotification;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
// TODO(alexey): backfill according to the head
loop {
let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
return Poll::Ready(None)
};
if notification
.committed_chain()
.or_else(|| notification.reverted_chain())
.map_or(false, |chain| chain.first().number > this.head.block.number)
{
return Poll::Ready(Some(notification))
}
}
}
}
/// Metrics for the `ExEx` manager. /// Metrics for the `ExEx` manager.
#[derive(Metrics)] #[derive(Metrics)]
#[metrics(scope = "exex_manager")] #[metrics(scope = "exex_manager")]
@ -473,13 +605,14 @@ impl Clone for ExExManagerHandle {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use futures::StreamExt;
use reth_primitives::{SealedBlockWithSenders, B256}; use reth_primitives::{SealedBlockWithSenders, B256};
use reth_provider::Chain; use reth_provider::Chain;
#[tokio::test] #[tokio::test]
async fn test_delivers_events() { async fn test_delivers_events() {
let (mut exex_handle, event_tx, mut _notification_rx) = let (mut exex_handle, event_tx, mut _notification_rx) =
ExExHandle::new("test_exex".to_string()); ExExHandle::new("test_exex".to_string(), ());
// Send an event and check that it's delivered correctly // Send an event and check that it's delivered correctly
event_tx.send(ExExEvent::FinishedHeight(42)).unwrap(); event_tx.send(ExExEvent::FinishedHeight(42)).unwrap();
@ -489,7 +622,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_has_exexs() { async fn test_has_exexs() {
let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string()); let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), ());
assert!(!ExExManager::new(vec![], 0).handle.has_exexs()); assert!(!ExExManager::new(vec![], 0).handle.has_exexs());
@ -498,7 +631,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_has_capacity() { async fn test_has_capacity() {
let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string()); let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), ());
assert!(!ExExManager::new(vec![], 0).handle.has_capacity()); assert!(!ExExManager::new(vec![], 0).handle.has_capacity());
@ -507,7 +640,7 @@ mod tests {
#[test] #[test]
fn test_push_notification() { fn test_push_notification() {
let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string()); let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), ());
// Create a mock ExExManager and add the exex_handle to it // Create a mock ExExManager and add the exex_handle to it
let mut exex_manager = ExExManager::new(vec![exex_handle], 10); let mut exex_manager = ExExManager::new(vec![exex_handle], 10);
@ -552,7 +685,7 @@ mod tests {
#[test] #[test]
fn test_update_capacity() { fn test_update_capacity() {
let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string()); let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), ());
// Create a mock ExExManager and add the exex_handle to it // Create a mock ExExManager and add the exex_handle to it
let max_capacity = 5; let max_capacity = 5;
@ -587,7 +720,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_updates_block_height() { async fn test_updates_block_height() {
let (exex_handle, event_tx, mut _notification_rx) = let (exex_handle, event_tx, mut _notification_rx) =
ExExHandle::new("test_exex".to_string()); ExExHandle::new("test_exex".to_string(), ());
// Check initial block height // Check initial block height
assert!(exex_handle.finished_height.is_none()); assert!(exex_handle.finished_height.is_none());
@ -624,8 +757,8 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_updates_block_height_lower() { async fn test_updates_block_height_lower() {
// Create two `ExExHandle` instances // Create two `ExExHandle` instances
let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string()); let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string(), ());
let (exex_handle2, event_tx2, _) = ExExHandle::new("test_exex2".to_string()); let (exex_handle2, event_tx2, _) = ExExHandle::new("test_exex2".to_string(), ());
// Send events to update the block heights of the two handles, with the second being lower // Send events to update the block heights of the two handles, with the second being lower
event_tx1.send(ExExEvent::FinishedHeight(42)).unwrap(); event_tx1.send(ExExEvent::FinishedHeight(42)).unwrap();
@ -655,8 +788,8 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_updates_block_height_greater() { async fn test_updates_block_height_greater() {
// Create two `ExExHandle` instances // Create two `ExExHandle` instances
let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string()); let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string(), ());
let (exex_handle2, event_tx2, _) = ExExHandle::new("test_exex2".to_string()); let (exex_handle2, event_tx2, _) = ExExHandle::new("test_exex2".to_string(), ());
// Assert that the initial block height is `None` for the first `ExExHandle`. // Assert that the initial block height is `None` for the first `ExExHandle`.
assert!(exex_handle1.finished_height.is_none()); assert!(exex_handle1.finished_height.is_none());
@ -692,7 +825,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_exex_manager_capacity() { async fn test_exex_manager_capacity() {
let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string()); let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), ());
// Create an ExExManager with a small max capacity // Create an ExExManager with a small max capacity
let max_capacity = 2; let max_capacity = 2;
@ -730,7 +863,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn exex_handle_new() { async fn exex_handle_new() {
let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string()); let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ());
// Check initial state // Check initial state
assert_eq!(exex_handle.id, "test_exex"); assert_eq!(exex_handle.id, "test_exex");
@ -759,7 +892,7 @@ mod tests {
// Send a notification and ensure it's received correctly // Send a notification and ensure it's received correctly
match exex_handle.send(&mut cx, &(22, notification.clone())) { match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => { Poll::Ready(Ok(())) => {
let received_notification = notification_rx.recv().await.unwrap(); let received_notification = notifications.next().await.unwrap();
assert_eq!(received_notification, notification); assert_eq!(received_notification, notification);
} }
Poll::Pending => panic!("Notification send is pending"), Poll::Pending => panic!("Notification send is pending"),
@ -772,7 +905,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_notification_if_finished_height_gt_chain_tip() { async fn test_notification_if_finished_height_gt_chain_tip() {
let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string()); let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ());
// Set finished_height to a value higher than the block tip // Set finished_height to a value higher than the block tip
exex_handle.finished_height = Some(15); exex_handle.finished_height = Some(15);
@ -790,9 +923,17 @@ mod tests {
// Send the notification // Send the notification
match exex_handle.send(&mut cx, &(22, notification)) { match exex_handle.send(&mut cx, &(22, notification)) {
Poll::Ready(Ok(())) => { Poll::Ready(Ok(())) => {
poll_fn(|cx| {
// The notification should be skipped, so nothing should be sent. // The notification should be skipped, so nothing should be sent.
// Check that the receiver channel is indeed empty // Check that the receiver channel is indeed empty
assert!(notification_rx.try_recv().is_err(), "Receiver channel should be empty"); assert_eq!(
notifications.poll_next_unpin(cx),
Poll::Pending,
"Receiver channel should be empty"
);
Poll::Ready(())
})
.await;
} }
Poll::Pending | Poll::Ready(Err(_)) => { Poll::Pending | Poll::Ready(Err(_)) => {
panic!("Notification should not be pending or fail"); panic!("Notification should not be pending or fail");
@ -805,7 +946,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_sends_chain_reorged_notification() { async fn test_sends_chain_reorged_notification() {
let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string()); let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ());
let notification = ExExNotification::ChainReorged { let notification = ExExNotification::ChainReorged {
old: Arc::new(Chain::default()), old: Arc::new(Chain::default()),
@ -821,7 +962,7 @@ mod tests {
// Send the notification // Send the notification
match exex_handle.send(&mut cx, &(22, notification.clone())) { match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => { Poll::Ready(Ok(())) => {
let received_notification = notification_rx.recv().await.unwrap(); let received_notification = notifications.next().await.unwrap();
assert_eq!(received_notification, notification); assert_eq!(received_notification, notification);
} }
Poll::Pending | Poll::Ready(Err(_)) => { Poll::Pending | Poll::Ready(Err(_)) => {
@ -835,7 +976,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_sends_chain_reverted_notification() { async fn test_sends_chain_reverted_notification() {
let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string()); let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ());
let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) }; let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) };
@ -848,7 +989,7 @@ mod tests {
// Send the notification // Send the notification
match exex_handle.send(&mut cx, &(22, notification.clone())) { match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => { Poll::Ready(Ok(())) => {
let received_notification = notification_rx.recv().await.unwrap(); let received_notification = notifications.next().await.unwrap();
assert_eq!(received_notification, notification); assert_eq!(received_notification, notification);
} }
Poll::Pending | Poll::Ready(Err(_)) => { Poll::Pending | Poll::Ready(Err(_)) => {

View File

@ -19,7 +19,7 @@ use reth_db::{
use reth_db_common::init::init_genesis; use reth_db_common::init::init_genesis;
use reth_evm::test_utils::MockExecutorProvider; use reth_evm::test_utils::MockExecutorProvider;
use reth_execution_types::Chain; use reth_execution_types::Chain;
use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_exex::{ExExContext, ExExEvent, ExExNotification, ExExNotifications};
use reth_network::{config::SecretKey, NetworkConfigBuilder, NetworkManager}; use reth_network::{config::SecretKey, NetworkConfigBuilder, NetworkManager};
use reth_node_api::{ use reth_node_api::{
FullNodeTypes, FullNodeTypesAdapter, NodeTypes, NodeTypesWithDBAdapter, NodeTypesWithEngine, FullNodeTypes, FullNodeTypesAdapter, NodeTypes, NodeTypesWithDBAdapter, NodeTypesWithEngine,
@ -296,13 +296,14 @@ pub async fn test_exex_context_with_chain_spec(
let (events_tx, events_rx) = tokio::sync::mpsc::unbounded_channel(); let (events_tx, events_rx) = tokio::sync::mpsc::unbounded_channel();
let (notifications_tx, notifications_rx) = tokio::sync::mpsc::channel(1); let (notifications_tx, notifications_rx) = tokio::sync::mpsc::channel(1);
let notifications = ExExNotifications::new(components.clone(), notifications_rx);
let ctx = ExExContext { let ctx = ExExContext {
head, head,
config: NodeConfig::test(), config: NodeConfig::test(),
reth_config: reth_config::Config::default(), reth_config: reth_config::Config::default(),
events: events_tx, events: events_tx,
notifications: notifications_rx, notifications,
components, components,
}; };

View File

@ -13,6 +13,7 @@ workspace = true
[dependencies] [dependencies]
# reth # reth
reth-primitives.workspace = true
reth-provider.workspace = true reth-provider.workspace = true
# reth # reth

View File

@ -0,0 +1,9 @@
use reth_primitives::BlockNumHash;
#[allow(clippy::doc_markdown)]
/// A head of the ExEx. It determines the highest block committed to the internal ExEx state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExExHead {
/// The head block.
pub block: BlockNumHash,
}

View File

@ -9,7 +9,9 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
mod finished_height; mod finished_height;
mod head;
mod notification; mod notification;
pub use finished_height::FinishedExExHeight; pub use finished_height::FinishedExExHeight;
pub use head::ExExHead;
pub use notification::ExExNotification; pub use notification::ExExNotification;

View File

@ -48,7 +48,7 @@ impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
for (id, exex) in extensions { for (id, exex) in extensions {
// create a new exex handle // create a new exex handle
let (handle, events, notifications) = ExExHandle::new(id.clone()); let (handle, events, notifications) = ExExHandle::new(id.clone(), components.clone());
exex_handles.push(handle); exex_handles.push(handle);
// create the launch context for the exex // create the launch context for the exex