feat(exex): send ExExNotification instead of CanonStateNotification (#7803)

This commit is contained in:
Alexey Shekhirin
2024-04-23 11:05:46 +01:00
committed by GitHub
parent c499797a6c
commit d6b861ea5d
10 changed files with 148 additions and 89 deletions

2
Cargo.lock generated
View File

@ -4723,7 +4723,7 @@ dependencies = [
"reth-node-core",
"reth-node-ethereum",
"reth-primitives",
"reth-provider",
"reth-tracing",
"tokio",
]

View File

@ -4,11 +4,10 @@ use reth_node_core::{
node_config::NodeConfig,
};
use reth_primitives::Head;
use reth_provider::CanonStateNotification;
use reth_tasks::TaskExecutor;
use tokio::sync::mpsc::{Receiver, UnboundedSender};
use crate::ExExEvent;
use crate::{ExExEvent, ExExNotification};
/// Captures the context that an ExEx has access to.
#[derive(Debug)]
@ -35,12 +34,11 @@ pub struct ExExContext<Node: FullNodeComponents> {
/// Additionally, the exex can pre-emptively emit a `FinishedHeight` event to specify what
/// blocks to receive notifications for.
pub events: UnboundedSender<ExExEvent>,
/// Channel to receive [`CanonStateNotification`]s on state transitions.
/// Channel to receive [`ExExNotification`]s.
///
/// # Important
///
/// Once a `CanonStateNotification` is sent over the channel, it is considered delivered by the
/// Once a an [`ExExNotification`] is sent over the channel, it is considered delivered by the
/// node.
pub notifications: Receiver<CanonStateNotification>,
// TODO(alexey): add pool, payload builder, anything else?
pub notifications: Receiver<ExExNotification>,
}

View File

@ -42,3 +42,6 @@ pub use event::*;
mod manager;
pub use manager::*;
mod notification;
pub use notification::*;

View File

@ -1,8 +1,7 @@
use crate::ExExEvent;
use crate::{ExExEvent, ExExNotification};
use metrics::Gauge;
use reth_metrics::{metrics::Counter, Metrics};
use reth_primitives::{BlockNumber, FinishedExExHeight};
use reth_provider::CanonStateNotification;
use reth_tracing::tracing::debug;
use std::{
collections::VecDeque,
@ -24,7 +23,7 @@ use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture};
#[derive(Metrics)]
#[metrics(scope = "exex")]
struct ExExMetrics {
/// The total number of canonical state notifications sent to an ExEx.
/// The total number of notifications sent to an ExEx.
notifications_sent_total: Counter,
/// The total number of events an ExEx has sent to the manager.
events_sent_total: Counter,
@ -42,8 +41,8 @@ pub struct ExExHandle {
/// Metrics for an ExEx.
metrics: ExExMetrics,
/// Channel to send [`CanonStateNotification`]s to the ExEx.
sender: PollSender<CanonStateNotification>,
/// Channel to send [`ExExNotification`]s to the ExEx.
sender: PollSender<ExExNotification>,
/// Channel to receive [`ExExEvent`]s from the ExEx.
receiver: UnboundedReceiver<ExExEvent>,
/// The ID of the next notification to send to this ExEx.
@ -59,22 +58,22 @@ impl ExExHandle {
/// Create a new handle for the given ExEx.
///
/// Returns the handle, as well as a [`UnboundedSender`] for [`ExExEvent`]s and a
/// [`Receiver`] for [`CanonStateNotification`]s that should be given to the ExEx.
pub fn new(id: String) -> (Self, UnboundedSender<ExExEvent>, Receiver<CanonStateNotification>) {
let (canon_tx, canon_rx) = mpsc::channel(1);
/// [`Receiver`] for [`ExExNotification`]s that should be given to the ExEx.
pub fn new(id: String) -> (Self, UnboundedSender<ExExEvent>, Receiver<ExExNotification>) {
let (notification_tx, notification_rx) = mpsc::channel(1);
let (event_tx, event_rx) = mpsc::unbounded_channel();
(
Self {
id: id.clone(),
metrics: ExExMetrics::new_with_labels(&[("exex", id)]),
sender: PollSender::new(canon_tx),
sender: PollSender::new(notification_tx),
receiver: event_rx,
next_notification_id: 0,
finished_height: None,
},
event_tx,
canon_rx,
notification_rx,
)
}
@ -85,14 +84,20 @@ impl ExExHandle {
fn send(
&mut self,
cx: &mut Context<'_>,
(event_id, notification): &(usize, CanonStateNotification),
) -> Poll<Result<(), PollSendError<CanonStateNotification>>> {
(event_id, notification): &(usize, ExExNotification),
) -> Poll<Result<(), PollSendError<ExExNotification>>> {
// check that this notification is above the finished height of the exex if the exex has set
// one
if let Some(finished_height) = self.finished_height {
if finished_height >= notification.tip().number {
self.next_notification_id = event_id + 1;
return Poll::Ready(Ok(()))
match notification {
ExExNotification::ChainCommitted { new } |
ExExNotification::ChainReorged { old: _, new }
if finished_height >= new.tip().number =>
{
self.next_notification_id = event_id + 1;
return Poll::Ready(Ok(()))
}
_ => (),
}
}
@ -142,18 +147,18 @@ pub struct ExExManager {
/// Handles to communicate with the ExEx's.
exex_handles: Vec<ExExHandle>,
/// [`CanonStateNotification`] channel from the [`ExExManagerHandle`]s.
handle_rx: UnboundedReceiver<CanonStateNotification>,
/// [`ExExNotification`] channel from the [`ExExManagerHandle`]s.
handle_rx: UnboundedReceiver<ExExNotification>,
/// The minimum notification ID currently present in the buffer.
min_id: usize,
/// Monotonically increasing ID for [`CanonStateNotification`]s.
/// Monotonically increasing ID for [`ExExNotification`]s.
next_id: usize,
/// Internal buffer of [`CanonStateNotification`]s.
/// Internal buffer of [`ExExNotification`]s.
///
/// The first element of the tuple is a monotonically increasing ID unique to the notification
/// (the second element of the tuple).
buffer: VecDeque<(usize, CanonStateNotification)>,
buffer: VecDeque<(usize, ExExNotification)>,
/// Max size of the internal state notifications buffer.
max_capacity: usize,
/// Current state notifications buffer capacity.
@ -244,7 +249,7 @@ impl ExExManager {
/// Pushes a new notification into the managers internal buffer, assigning the notification a
/// unique ID.
fn push_notification(&mut self, notification: CanonStateNotification) {
fn push_notification(&mut self, notification: ExExNotification) {
let next_id = self.next_id;
self.buffer.push_back((next_id, notification));
self.next_id += 1;
@ -334,7 +339,7 @@ impl Future for ExExManager {
#[derive(Debug)]
pub struct ExExManagerHandle {
/// Channel to send notifications to the ExEx manager.
exex_tx: UnboundedSender<CanonStateNotification>,
exex_tx: UnboundedSender<ExExNotification>,
/// The number of ExEx's running on the node.
num_exexs: usize,
/// A watch channel denoting whether the manager is ready for new notifications or not.
@ -376,10 +381,7 @@ impl ExExManagerHandle {
/// Synchronously send a notification over the channel to all execution extensions.
///
/// Senders should call [`Self::has_capacity`] first.
pub fn send(
&self,
notification: CanonStateNotification,
) -> Result<(), SendError<CanonStateNotification>> {
pub fn send(&self, notification: ExExNotification) -> Result<(), SendError<ExExNotification>> {
self.exex_tx.send(notification)
}
@ -389,8 +391,8 @@ impl ExExManagerHandle {
/// capacity in the channel, the future will wait.
pub async fn send_async(
&mut self,
notification: CanonStateNotification,
) -> Result<(), SendError<CanonStateNotification>> {
notification: ExExNotification,
) -> Result<(), SendError<ExExNotification>> {
self.ready().await;
self.exex_tx.send(notification)
}

View File

@ -0,0 +1,54 @@
use std::sync::Arc;
use reth_provider::{CanonStateNotification, Chain};
/// Notifications sent to an ExEx.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExExNotification {
/// Chain got committed without a reorg, and only the new chain is returned.
ChainCommitted {
/// The new chain after commit.
new: Arc<Chain>,
},
/// Chain got reorged, and both the old and the new chains are returned.
ChainReorged {
/// The old chain before reorg.
old: Arc<Chain>,
/// The new chain after reorg.
new: Arc<Chain>,
},
/// Chain got reverted, and only the old chain is returned.
ChainReverted {
/// The old chain before reversion.
old: Arc<Chain>,
},
}
impl ExExNotification {
/// Returns the committed chain from the [Self::ChainCommitted] and [Self::ChainReorged]
/// variants, if any.
pub fn committed_chain(&self) -> Option<Arc<Chain>> {
match self {
Self::ChainCommitted { new } | Self::ChainReorged { old: _, new } => Some(new.clone()),
Self::ChainReverted { .. } => None,
}
}
/// Returns the reverted chain from the [Self::ChainReorged] and [Self::ChainReverted] variants,
/// if any.
pub fn reverted_chain(&self) -> Option<Arc<Chain>> {
match self {
Self::ChainReorged { old, new: _ } | Self::ChainReverted { old } => Some(old.clone()),
Self::ChainCommitted { .. } => None,
}
}
}
impl From<CanonStateNotification> for ExExNotification {
fn from(notification: CanonStateNotification) -> Self {
match notification {
CanonStateNotification::Commit { new } => Self::ChainCommitted { new },
CanonStateNotification::Reorg { old, new } => Self::ChainReorged { old, new },
}
}
}

View File

@ -661,7 +661,7 @@ where
executor.spawn_critical("exex manager blockchain tree notifications", async move {
while let Ok(notification) = canon_state_notifications.recv().await {
handle
.send_async(notification)
.send_async(notification.into())
.await
.expect("blockchain tree notification could not be sent to exex manager");
}

View File

@ -3,7 +3,7 @@ use num_traits::Zero;
use reth_db::{
cursor::DbCursorRO, database::Database, static_file::HeaderMask, tables, transaction::DbTx,
};
use reth_exex::ExExManagerHandle;
use reth_exex::{ExExManagerHandle, ExExNotification};
use reth_primitives::{
stage::{
CheckpointBlockRange, EntitiesCheckpoint, ExecutionCheckpoint, StageCheckpoint, StageId,
@ -12,9 +12,8 @@ use reth_primitives::{
};
use reth_provider::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter},
BlockReader, CanonStateNotification, Chain, DatabaseProviderRW, ExecutorFactory,
HeaderProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError, StatsReader,
TransactionVariant,
BlockReader, Chain, DatabaseProviderRW, ExecutorFactory, HeaderProvider,
LatestStateProviderRef, OriginalValuesKnown, ProviderError, StatsReader, TransactionVariant,
};
use reth_stages_api::{
BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, Stage, StageError,
@ -265,7 +264,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
// NOTE: We can ignore the error here, since an error means that the channel is closed,
// which means the manager has died, which then in turn means the node is shutting down.
let _ = self.exex_manager_handle.send(CanonStateNotification::Commit { new: chain });
let _ = self.exex_manager_handle.send(ExExNotification::ChainCommitted { new: chain });
}
let time = Instant::now();
@ -436,7 +435,7 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
// NOTE: We can ignore the error here, since an error means that the channel is closed,
// which means the manager has died, which then in turn means the node is shutting down.
let _ = self.exex_manager_handle.send(CanonStateNotification::Reorg {
let _ = self.exex_manager_handle.send(ExExNotification::ChainReorged {
old: Arc::new(chain),
new: Arc::new(Chain::default()),
});

View File

@ -12,7 +12,7 @@ reth-node-api.workspace = true
reth-node-core.workspace = true
reth-node-ethereum.workspace = true
reth-primitives.workspace = true
reth-provider.workspace = true
reth-tracing.workspace = true
eyre.workspace = true
tokio.workspace = true

View File

@ -1,8 +1,8 @@
use futures::Future;
use reth_exex::{ExExContext, ExExEvent};
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;
use reth_provider::CanonStateNotification;
use reth_tracing::tracing::info;
/// The initialization logic of the ExEx is just an async function.
///
@ -21,19 +21,20 @@ async fn exex_init<Node: FullNodeComponents>(
async fn exex<Node: FullNodeComponents>(mut ctx: ExExContext<Node>) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.recv().await {
match &notification {
CanonStateNotification::Commit { new } => {
println!("Received commit: {:?}", new.first().number..=new.tip().number);
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
}
CanonStateNotification::Reorg { old, new } => {
println!(
"Received reorg: {:?} -> {:?}",
old.first().number..=old.tip().number,
new.first().number..=new.tip().number
);
ExExNotification::ChainReorged { old, new } => {
info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
}
ExExNotification::ChainReverted { old } => {
info!(reverted_chain = ?old.range(), "Received revert");
}
};
ctx.events.send(ExExEvent::FinishedHeight(notification.tip().number))?;
if let Some(committed_chain) = notification.committed_chain() {
ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
}
}
Ok(())
}

View File

@ -94,7 +94,8 @@ async fn op_bridge_exex<Node: FullNodeComponents>(
) -> eyre::Result<()> {
// Process all new chain state notifications
while let Some(notification) = ctx.notifications.recv().await {
if let Some(reverted_chain) = notification.reverted() {
// Revert all deposits and withdrawals
if let Some(reverted_chain) = notification.reverted_chain() {
let events = decode_chain_into_events(&reverted_chain);
let mut deposits = 0;
@ -126,22 +127,22 @@ async fn op_bridge_exex<Node: FullNodeComponents>(
}
// Insert all new deposits and withdrawals
let committed_chain = notification.committed();
let events = decode_chain_into_events(&committed_chain);
if let Some(committed_chain) = notification.committed_chain() {
let events = decode_chain_into_events(&committed_chain);
let mut deposits = 0;
let mut withdrawals = 0;
let mut deposits = 0;
let mut withdrawals = 0;
for (block, tx, log, event) in events {
match event {
// L1 -> L2 deposit
L1StandardBridgeEvents::ETHBridgeInitiated(ETHBridgeInitiated {
amount,
from,
to,
..
}) => {
let inserted = connection.execute(
for (block, tx, log, event) in events {
match event {
// L1 -> L2 deposit
L1StandardBridgeEvents::ETHBridgeInitiated(ETHBridgeInitiated {
amount,
from,
to,
..
}) => {
let inserted = connection.execute(
r#"
INSERT INTO deposits (block_number, tx_hash, contract_address, "from", "to", amount)
VALUES (?, ?, ?, ?, ?, ?)
@ -155,16 +156,16 @@ async fn op_bridge_exex<Node: FullNodeComponents>(
amount.to_string(),
),
)?;
deposits += inserted;
}
// L2 -> L1 withdrawal
L1StandardBridgeEvents::ETHBridgeFinalized(ETHBridgeFinalized {
amount,
from,
to,
..
}) => {
let inserted = connection.execute(
deposits += inserted;
}
// L2 -> L1 withdrawal
L1StandardBridgeEvents::ETHBridgeFinalized(ETHBridgeFinalized {
amount,
from,
to,
..
}) => {
let inserted = connection.execute(
r#"
INSERT INTO withdrawals (block_number, tx_hash, contract_address, "from", "to", amount)
VALUES (?, ?, ?, ?, ?, ?)
@ -178,17 +179,18 @@ async fn op_bridge_exex<Node: FullNodeComponents>(
amount.to_string(),
),
)?;
withdrawals += inserted;
}
_ => continue,
};
withdrawals += inserted;
}
_ => continue,
};
}
info!(block_range = ?committed_chain.range(), %deposits, %withdrawals, "Committed chain events");
// Send a finished height event, signaling the node that we don't need any blocks below
// this height anymore
ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
}
info!(block_range = ?committed_chain.range(), %deposits, %withdrawals, "Committed chain events");
// Send a finished height event, signaling the node that we don't need any blocks below
// this height anymore
ctx.events.send(ExExEvent::FinishedHeight(notification.tip().number))?;
}
Ok(())