mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat: add Primitives AT to BlockExecutorProvider (#12994)
This commit is contained in:
@ -9,7 +9,7 @@ use alloy_primitives::BlockNumber;
|
||||
use reth_evm::execute::{
|
||||
BatchExecutor, BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor,
|
||||
};
|
||||
use reth_node_api::{Block as _, BlockBody as _};
|
||||
use reth_node_api::{Block as _, BlockBody as _, NodePrimitives};
|
||||
use reth_primitives::{BlockExt, BlockWithSenders, Receipt};
|
||||
use reth_primitives_traits::{format_gas_throughput, SignedTransaction};
|
||||
use reth_provider::{
|
||||
@ -38,12 +38,10 @@ pub struct BackfillJob<E, P> {
|
||||
|
||||
impl<E, P> Iterator for BackfillJob<E, P>
|
||||
where
|
||||
E: BlockExecutorProvider,
|
||||
P: HeaderProvider
|
||||
+ BlockReader<Transaction: SignedTransaction, Block = reth_primitives::Block>
|
||||
+ StateProviderFactory,
|
||||
E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>,
|
||||
P: HeaderProvider + BlockReader<Transaction: SignedTransaction> + StateProviderFactory,
|
||||
{
|
||||
type Item = BackfillJobResult<Chain>;
|
||||
type Item = BackfillJobResult<Chain<E::Primitives>>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if self.range.is_empty() {
|
||||
@ -56,10 +54,8 @@ where
|
||||
|
||||
impl<E, P> BackfillJob<E, P>
|
||||
where
|
||||
E: BlockExecutorProvider,
|
||||
P: BlockReader<Transaction: SignedTransaction, Block = reth_primitives::Block>
|
||||
+ HeaderProvider
|
||||
+ StateProviderFactory,
|
||||
E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>,
|
||||
P: BlockReader<Transaction: SignedTransaction> + HeaderProvider + StateProviderFactory,
|
||||
{
|
||||
/// Converts the backfill job into a single block backfill job.
|
||||
pub fn into_single_blocks(self) -> SingleBlockBackfillJob<E, P> {
|
||||
@ -67,11 +63,11 @@ where
|
||||
}
|
||||
|
||||
/// Converts the backfill job into a stream.
|
||||
pub fn into_stream(self) -> StreamBackfillJob<E, P, Chain> {
|
||||
pub fn into_stream(self) -> StreamBackfillJob<E, P, Chain<E::Primitives>> {
|
||||
self.into()
|
||||
}
|
||||
|
||||
fn execute_range(&mut self) -> BackfillJobResult<Chain> {
|
||||
fn execute_range(&mut self) -> BackfillJobResult<Chain<E::Primitives>> {
|
||||
debug!(
|
||||
target: "exex::backfill",
|
||||
range = ?self.range,
|
||||
@ -169,10 +165,13 @@ pub struct SingleBlockBackfillJob<E, P> {
|
||||
|
||||
impl<E, P> Iterator for SingleBlockBackfillJob<E, P>
|
||||
where
|
||||
E: BlockExecutorProvider,
|
||||
P: HeaderProvider + BlockReader<Block = reth_primitives::Block> + StateProviderFactory,
|
||||
E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>,
|
||||
P: HeaderProvider + BlockReader + StateProviderFactory,
|
||||
{
|
||||
type Item = BackfillJobResult<(BlockWithSenders, BlockExecutionOutput<Receipt>)>;
|
||||
type Item = BackfillJobResult<(
|
||||
BlockWithSenders<P::Block>,
|
||||
BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
|
||||
)>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.range.next().map(|block_number| self.execute_block(block_number))
|
||||
@ -181,8 +180,8 @@ where
|
||||
|
||||
impl<E, P> SingleBlockBackfillJob<E, P>
|
||||
where
|
||||
E: BlockExecutorProvider,
|
||||
P: HeaderProvider + BlockReader<Block = reth_primitives::Block> + StateProviderFactory,
|
||||
E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>,
|
||||
P: HeaderProvider + BlockReader + StateProviderFactory,
|
||||
{
|
||||
/// Converts the single block backfill job into a stream.
|
||||
pub fn into_stream(
|
||||
@ -191,10 +190,14 @@ where
|
||||
self.into()
|
||||
}
|
||||
|
||||
#[expect(clippy::type_complexity)]
|
||||
pub(crate) fn execute_block(
|
||||
&self,
|
||||
block_number: u64,
|
||||
) -> BackfillJobResult<(BlockWithSenders<P::Block>, BlockExecutionOutput<Receipt>)> {
|
||||
) -> BackfillJobResult<(
|
||||
BlockWithSenders<P::Block>,
|
||||
BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
|
||||
)> {
|
||||
let td = self
|
||||
.provider
|
||||
.header_td_by_number(block_number)?
|
||||
|
||||
@ -11,7 +11,8 @@ use futures::{
|
||||
StreamExt,
|
||||
};
|
||||
use reth_evm::execute::{BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider};
|
||||
use reth_primitives::{BlockWithSenders, Receipt};
|
||||
use reth_node_api::NodePrimitives;
|
||||
use reth_primitives::{BlockWithSenders, EthPrimitives};
|
||||
use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
|
||||
use reth_prune_types::PruneModes;
|
||||
use reth_stages_api::ExecutionStageThresholds;
|
||||
@ -38,8 +39,11 @@ struct BackfillTaskOutput<T> {
|
||||
/// Ordered queue of [`JoinHandle`]s that yield [`BackfillTaskOutput`]s.
|
||||
type BackfillTasks<T> = FuturesOrdered<JoinHandle<BackfillTaskOutput<T>>>;
|
||||
|
||||
type SingleBlockStreamItem = (BlockWithSenders, BlockExecutionOutput<Receipt>);
|
||||
type BatchBlockStreamItem = Chain;
|
||||
type SingleBlockStreamItem<N = EthPrimitives> = (
|
||||
BlockWithSenders<<N as NodePrimitives>::Block>,
|
||||
BlockExecutionOutput<<N as NodePrimitives>::Receipt>,
|
||||
);
|
||||
type BatchBlockStreamItem<N = EthPrimitives> = Chain<N>;
|
||||
|
||||
/// Stream for processing backfill jobs asynchronously.
|
||||
///
|
||||
@ -100,18 +104,12 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<E, P> Stream for StreamBackfillJob<E, P, SingleBlockStreamItem>
|
||||
impl<E, P> Stream for StreamBackfillJob<E, P, SingleBlockStreamItem<E::Primitives>>
|
||||
where
|
||||
E: BlockExecutorProvider + Clone + Send + 'static,
|
||||
P: HeaderProvider
|
||||
+ BlockReader<Block = reth_primitives::Block>
|
||||
+ StateProviderFactory
|
||||
+ Clone
|
||||
+ Send
|
||||
+ Unpin
|
||||
+ 'static,
|
||||
E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>> + Clone + Send + 'static,
|
||||
P: HeaderProvider + BlockReader + StateProviderFactory + Clone + Send + Unpin + 'static,
|
||||
{
|
||||
type Item = BackfillJobResult<SingleBlockStreamItem>;
|
||||
type Item = BackfillJobResult<SingleBlockStreamItem<E::Primitives>>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = self.get_mut();
|
||||
@ -139,18 +137,12 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<E, P> Stream for StreamBackfillJob<E, P, BatchBlockStreamItem>
|
||||
impl<E, P> Stream for StreamBackfillJob<E, P, BatchBlockStreamItem<E::Primitives>>
|
||||
where
|
||||
E: BlockExecutorProvider + Clone + Send + 'static,
|
||||
P: HeaderProvider
|
||||
+ BlockReader<Block = reth_primitives::Block>
|
||||
+ StateProviderFactory
|
||||
+ Clone
|
||||
+ Send
|
||||
+ Unpin
|
||||
+ 'static,
|
||||
E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>> + Clone + Send + 'static,
|
||||
P: HeaderProvider + BlockReader + StateProviderFactory + Clone + Send + Unpin + 'static,
|
||||
{
|
||||
type Item = BackfillJobResult<BatchBlockStreamItem>;
|
||||
type Item = BackfillJobResult<BatchBlockStreamItem<E::Primitives>>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = self.get_mut();
|
||||
@ -200,7 +192,10 @@ impl<E, P> From<SingleBlockBackfillJob<E, P>> for StreamBackfillJob<E, P, Single
|
||||
}
|
||||
}
|
||||
|
||||
impl<E, P> From<BackfillJob<E, P>> for StreamBackfillJob<E, P, BatchBlockStreamItem> {
|
||||
impl<E, P> From<BackfillJob<E, P>> for StreamBackfillJob<E, P, BatchBlockStreamItem<E::Primitives>>
|
||||
where
|
||||
E: BlockExecutorProvider,
|
||||
{
|
||||
fn from(job: BackfillJob<E, P>) -> Self {
|
||||
let batch_size = job.thresholds.max_blocks.map_or(DEFAULT_BATCH_SIZE, |max| max as usize);
|
||||
Self {
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
use crate::{ExExContextDyn, ExExEvent, ExExNotifications, ExExNotificationsStream};
|
||||
use reth_exex_types::ExExHead;
|
||||
use reth_node_api::{FullNodeComponents, NodeTypes};
|
||||
use reth_node_api::{FullNodeComponents, NodePrimitives, NodeTypes};
|
||||
use reth_node_core::node_config::NodeConfig;
|
||||
use reth_primitives::Head;
|
||||
use reth_provider::BlockReader;
|
||||
@ -57,11 +57,12 @@ where
|
||||
impl<Node> ExExContext<Node>
|
||||
where
|
||||
Node: FullNodeComponents,
|
||||
Node::Provider: Debug + BlockReader<Block = reth_primitives::Block>,
|
||||
Node::Provider: Debug + BlockReader,
|
||||
Node::Executor: Debug,
|
||||
Node::Types: NodeTypes<Primitives: NodePrimitives>,
|
||||
{
|
||||
/// Returns dynamic version of the context
|
||||
pub fn into_dyn(self) -> ExExContextDyn {
|
||||
pub fn into_dyn(self) -> ExExContextDyn<<Node::Types as NodeTypes>::Primitives> {
|
||||
ExExContextDyn::from(self)
|
||||
}
|
||||
}
|
||||
@ -69,6 +70,7 @@ where
|
||||
impl<Node> ExExContext<Node>
|
||||
where
|
||||
Node: FullNodeComponents,
|
||||
Node::Types: NodeTypes<Primitives: NodePrimitives>,
|
||||
{
|
||||
/// Returns the transaction pool of the node.
|
||||
pub fn pool(&self) -> &Node::Pool {
|
||||
@ -107,19 +109,13 @@ where
|
||||
|
||||
/// Sets notifications stream to [`crate::ExExNotificationsWithoutHead`], a stream of
|
||||
/// notifications without a head.
|
||||
pub fn set_notifications_without_head(&mut self)
|
||||
where
|
||||
Node::Provider: BlockReader<Block = reth_primitives::Block>,
|
||||
{
|
||||
pub fn set_notifications_without_head(&mut self) {
|
||||
self.notifications.set_without_head();
|
||||
}
|
||||
|
||||
/// Sets notifications stream to [`crate::ExExNotificationsWithHead`], a stream of notifications
|
||||
/// with the provided head.
|
||||
pub fn set_notifications_with_head(&mut self, head: ExExHead)
|
||||
where
|
||||
Node::Provider: BlockReader<Block = reth_primitives::Block>,
|
||||
{
|
||||
pub fn set_notifications_with_head(&mut self, head: ExExHead) {
|
||||
self.notifications.set_with_head(head);
|
||||
}
|
||||
}
|
||||
@ -142,7 +138,7 @@ mod tests {
|
||||
|
||||
impl<Node: FullNodeComponents> ExEx<Node>
|
||||
where
|
||||
Node::Provider: BlockReader<Block = reth_primitives::Block>,
|
||||
Node::Provider: BlockReader,
|
||||
{
|
||||
async fn _test_bounds(mut self) -> eyre::Result<()> {
|
||||
self.ctx.pool();
|
||||
|
||||
@ -4,8 +4,9 @@
|
||||
use std::fmt::Debug;
|
||||
|
||||
use reth_chainspec::{EthChainSpec, Head};
|
||||
use reth_node_api::FullNodeComponents;
|
||||
use reth_node_api::{FullNodeComponents, NodePrimitives, NodeTypes};
|
||||
use reth_node_core::node_config::NodeConfig;
|
||||
use reth_primitives::EthPrimitives;
|
||||
use reth_provider::BlockReader;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
@ -13,7 +14,7 @@ use crate::{ExExContext, ExExEvent, ExExNotificationsStream};
|
||||
|
||||
// TODO(0xurb) - add `node` after abstractions
|
||||
/// Captures the context that an `ExEx` has access to.
|
||||
pub struct ExExContextDyn {
|
||||
pub struct ExExContextDyn<N: NodePrimitives = EthPrimitives> {
|
||||
/// The current head of the blockchain at launch.
|
||||
pub head: Head,
|
||||
/// The config of the node
|
||||
@ -34,10 +35,10 @@ pub struct ExExContextDyn {
|
||||
///
|
||||
/// Once an [`ExExNotification`](crate::ExExNotification) is sent over the channel, it is
|
||||
/// considered delivered by the node.
|
||||
pub notifications: Box<dyn ExExNotificationsStream>,
|
||||
pub notifications: Box<dyn ExExNotificationsStream<N>>,
|
||||
}
|
||||
|
||||
impl Debug for ExExContextDyn {
|
||||
impl<N: NodePrimitives> Debug for ExExContextDyn<N> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("ExExContext")
|
||||
.field("head", &self.head)
|
||||
@ -49,16 +50,16 @@ impl Debug for ExExContextDyn {
|
||||
}
|
||||
}
|
||||
|
||||
impl<Node> From<ExExContext<Node>> for ExExContextDyn
|
||||
impl<Node> From<ExExContext<Node>> for ExExContextDyn<<Node::Types as NodeTypes>::Primitives>
|
||||
where
|
||||
Node: FullNodeComponents,
|
||||
Node::Provider: Debug + BlockReader<Block = reth_primitives::Block>,
|
||||
Node: FullNodeComponents<Types: NodeTypes<Primitives: NodePrimitives>>,
|
||||
Node::Provider: Debug + BlockReader,
|
||||
Node::Executor: Debug,
|
||||
{
|
||||
fn from(ctx: ExExContext<Node>) -> Self {
|
||||
let config =
|
||||
ctx.config.map_chainspec(|chainspec| Box::new(chainspec) as Box<dyn EthChainSpec>);
|
||||
let notifications = Box::new(ctx.notifications) as Box<dyn ExExNotificationsStream>;
|
||||
let notifications = Box::new(ctx.notifications) as Box<_>;
|
||||
|
||||
Self {
|
||||
head: ctx.head,
|
||||
|
||||
@ -1,14 +1,17 @@
|
||||
use crate::{
|
||||
wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight, WalHandle,
|
||||
};
|
||||
use alloy_consensus::BlockHeader;
|
||||
use alloy_eips::BlockNumHash;
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools;
|
||||
use metrics::Gauge;
|
||||
use reth_chain_state::ForkChoiceStream;
|
||||
use reth_chainspec::Head;
|
||||
use reth_evm::execute::BlockExecutorProvider;
|
||||
use reth_metrics::{metrics::Counter, Metrics};
|
||||
use reth_primitives::SealedHeader;
|
||||
use reth_node_api::NodePrimitives;
|
||||
use reth_primitives::{EthPrimitives, SealedHeader};
|
||||
use reth_provider::HeaderProvider;
|
||||
use reth_tracing::tracing::{debug, warn};
|
||||
use std::{
|
||||
@ -69,13 +72,13 @@ struct ExExMetrics {
|
||||
/// [`ExExHandle::new`] should be given to the `ExEx`, while the handle itself should be given to
|
||||
/// the manager in [`ExExManager::new`].
|
||||
#[derive(Debug)]
|
||||
pub struct ExExHandle {
|
||||
pub struct ExExHandle<N: NodePrimitives = EthPrimitives> {
|
||||
/// The execution extension's ID.
|
||||
id: String,
|
||||
/// Metrics for an `ExEx`.
|
||||
metrics: ExExMetrics,
|
||||
/// Channel to send [`ExExNotification`]s to the `ExEx`.
|
||||
sender: PollSender<ExExNotification>,
|
||||
sender: PollSender<ExExNotification<N>>,
|
||||
/// Channel to receive [`ExExEvent`]s from the `ExEx`.
|
||||
receiver: UnboundedReceiver<ExExEvent>,
|
||||
/// The ID of the next notification to send to this `ExEx`.
|
||||
@ -86,17 +89,17 @@ pub struct ExExHandle {
|
||||
finished_height: Option<BlockNumHash>,
|
||||
}
|
||||
|
||||
impl ExExHandle {
|
||||
impl<N: NodePrimitives> ExExHandle<N> {
|
||||
/// Create a new handle for the given `ExEx`.
|
||||
///
|
||||
/// Returns the handle, as well as a [`UnboundedSender`] for [`ExExEvent`]s and a
|
||||
/// [`mpsc::Receiver`] for [`ExExNotification`]s that should be given to the `ExEx`.
|
||||
pub fn new<P, E>(
|
||||
pub fn new<P, E: BlockExecutorProvider<Primitives = N>>(
|
||||
id: String,
|
||||
node_head: Head,
|
||||
provider: P,
|
||||
executor: E,
|
||||
wal_handle: WalHandle,
|
||||
wal_handle: WalHandle<N>,
|
||||
) -> (Self, UnboundedSender<ExExEvent>, ExExNotifications<P, E>) {
|
||||
let (notification_tx, notification_rx) = mpsc::channel(1);
|
||||
let (event_tx, event_rx) = mpsc::unbounded_channel();
|
||||
@ -124,21 +127,21 @@ impl ExExHandle {
|
||||
fn send(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
(notification_id, notification): &(usize, ExExNotification),
|
||||
) -> Poll<Result<(), PollSendError<ExExNotification>>> {
|
||||
(notification_id, notification): &(usize, ExExNotification<N>),
|
||||
) -> Poll<Result<(), PollSendError<ExExNotification<N>>>> {
|
||||
if let Some(finished_height) = self.finished_height {
|
||||
match notification {
|
||||
ExExNotification::ChainCommitted { new } => {
|
||||
// Skip the chain commit notification if the finished height of the ExEx is
|
||||
// higher than or equal to the tip of the new notification.
|
||||
// I.e., the ExEx has already processed the notification.
|
||||
if finished_height.number >= new.tip().number {
|
||||
if finished_height.number >= new.tip().number() {
|
||||
debug!(
|
||||
target: "exex::manager",
|
||||
exex_id = %self.id,
|
||||
%notification_id,
|
||||
?finished_height,
|
||||
new_tip = %new.tip().number,
|
||||
new_tip = %new.tip().number(),
|
||||
"Skipping notification"
|
||||
);
|
||||
|
||||
@ -208,15 +211,15 @@ pub struct ExExManagerMetrics {
|
||||
/// - Error handling
|
||||
/// - Monitoring
|
||||
#[derive(Debug)]
|
||||
pub struct ExExManager<P> {
|
||||
pub struct ExExManager<P, N: NodePrimitives> {
|
||||
/// Provider for querying headers.
|
||||
provider: P,
|
||||
|
||||
/// Handles to communicate with the `ExEx`'s.
|
||||
exex_handles: Vec<ExExHandle>,
|
||||
exex_handles: Vec<ExExHandle<N>>,
|
||||
|
||||
/// [`ExExNotification`] channel from the [`ExExManagerHandle`]s.
|
||||
handle_rx: UnboundedReceiver<(ExExNotificationSource, ExExNotification)>,
|
||||
handle_rx: UnboundedReceiver<(ExExNotificationSource, ExExNotification<N>)>,
|
||||
|
||||
/// The minimum notification ID currently present in the buffer.
|
||||
min_id: usize,
|
||||
@ -226,7 +229,7 @@ pub struct ExExManager<P> {
|
||||
///
|
||||
/// The first element of the tuple is a monotonically increasing ID unique to the notification
|
||||
/// (the second element of the tuple).
|
||||
buffer: VecDeque<(usize, ExExNotification)>,
|
||||
buffer: VecDeque<(usize, ExExNotification<N>)>,
|
||||
/// Max size of the internal state notifications buffer.
|
||||
max_capacity: usize,
|
||||
/// Current state notifications buffer capacity.
|
||||
@ -241,17 +244,20 @@ pub struct ExExManager<P> {
|
||||
finished_height: watch::Sender<FinishedExExHeight>,
|
||||
|
||||
/// Write-Ahead Log for the [`ExExNotification`]s.
|
||||
wal: Wal,
|
||||
wal: Wal<N>,
|
||||
/// A stream of finalized headers.
|
||||
finalized_header_stream: ForkChoiceStream<SealedHeader>,
|
||||
|
||||
/// A handle to the `ExEx` manager.
|
||||
handle: ExExManagerHandle,
|
||||
handle: ExExManagerHandle<N>,
|
||||
/// Metrics for the `ExEx` manager.
|
||||
metrics: ExExManagerMetrics,
|
||||
}
|
||||
|
||||
impl<P> ExExManager<P> {
|
||||
impl<P, N> ExExManager<P, N>
|
||||
where
|
||||
N: NodePrimitives,
|
||||
{
|
||||
/// Create a new [`ExExManager`].
|
||||
///
|
||||
/// You must provide an [`ExExHandle`] for each `ExEx` and the maximum capacity of the
|
||||
@ -261,9 +267,9 @@ impl<P> ExExManager<P> {
|
||||
/// notifications over [`ExExManagerHandle`]s until there is capacity again.
|
||||
pub fn new(
|
||||
provider: P,
|
||||
handles: Vec<ExExHandle>,
|
||||
handles: Vec<ExExHandle<N>>,
|
||||
max_capacity: usize,
|
||||
wal: Wal,
|
||||
wal: Wal<N>,
|
||||
finalized_header_stream: ForkChoiceStream<SealedHeader>,
|
||||
) -> Self {
|
||||
let num_exexs = handles.len();
|
||||
@ -314,7 +320,7 @@ impl<P> ExExManager<P> {
|
||||
}
|
||||
|
||||
/// Returns the handle to the manager.
|
||||
pub fn handle(&self) -> ExExManagerHandle {
|
||||
pub fn handle(&self) -> ExExManagerHandle<N> {
|
||||
self.handle.clone()
|
||||
}
|
||||
|
||||
@ -333,16 +339,17 @@ impl<P> ExExManager<P> {
|
||||
|
||||
/// Pushes a new notification into the managers internal buffer, assigning the notification a
|
||||
/// unique ID.
|
||||
fn push_notification(&mut self, notification: ExExNotification) {
|
||||
fn push_notification(&mut self, notification: ExExNotification<N>) {
|
||||
let next_id = self.next_id;
|
||||
self.buffer.push_back((next_id, notification));
|
||||
self.next_id += 1;
|
||||
}
|
||||
}
|
||||
|
||||
impl<P> ExExManager<P>
|
||||
impl<P, N> ExExManager<P, N>
|
||||
where
|
||||
P: HeaderProvider,
|
||||
N: NodePrimitives,
|
||||
{
|
||||
/// Finalizes the WAL according to the passed finalized header.
|
||||
///
|
||||
@ -413,9 +420,10 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<P> Future for ExExManager<P>
|
||||
impl<P, N> Future for ExExManager<P, N>
|
||||
where
|
||||
P: HeaderProvider + Unpin + 'static,
|
||||
N: NodePrimitives,
|
||||
{
|
||||
type Output = eyre::Result<()>;
|
||||
|
||||
@ -456,8 +464,9 @@ where
|
||||
// Drain handle notifications
|
||||
while this.buffer.len() < this.max_capacity {
|
||||
if let Poll::Ready(Some((source, notification))) = this.handle_rx.poll_recv(cx) {
|
||||
let committed_tip = notification.committed_chain().map(|chain| chain.tip().number);
|
||||
let reverted_tip = notification.reverted_chain().map(|chain| chain.tip().number);
|
||||
let committed_tip =
|
||||
notification.committed_chain().map(|chain| chain.tip().number());
|
||||
let reverted_tip = notification.reverted_chain().map(|chain| chain.tip().number());
|
||||
debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Received new notification");
|
||||
|
||||
// Commit to WAL only notifications from blockchain tree. Pipeline notifications
|
||||
@ -524,9 +533,9 @@ where
|
||||
|
||||
/// A handle to communicate with the [`ExExManager`].
|
||||
#[derive(Debug)]
|
||||
pub struct ExExManagerHandle {
|
||||
pub struct ExExManagerHandle<N: NodePrimitives = EthPrimitives> {
|
||||
/// Channel to send notifications to the `ExEx` manager.
|
||||
exex_tx: UnboundedSender<(ExExNotificationSource, ExExNotification)>,
|
||||
exex_tx: UnboundedSender<(ExExNotificationSource, ExExNotification<N>)>,
|
||||
/// The number of `ExEx`'s running on the node.
|
||||
num_exexs: usize,
|
||||
/// A watch channel denoting whether the manager is ready for new notifications or not.
|
||||
@ -544,7 +553,7 @@ pub struct ExExManagerHandle {
|
||||
finished_height: watch::Receiver<FinishedExExHeight>,
|
||||
}
|
||||
|
||||
impl ExExManagerHandle {
|
||||
impl<N: NodePrimitives> ExExManagerHandle<N> {
|
||||
/// Creates an empty manager handle.
|
||||
///
|
||||
/// Use this if there is no manager present.
|
||||
@ -571,8 +580,8 @@ impl ExExManagerHandle {
|
||||
pub fn send(
|
||||
&self,
|
||||
source: ExExNotificationSource,
|
||||
notification: ExExNotification,
|
||||
) -> Result<(), SendError<(ExExNotificationSource, ExExNotification)>> {
|
||||
notification: ExExNotification<N>,
|
||||
) -> Result<(), SendError<(ExExNotificationSource, ExExNotification<N>)>> {
|
||||
self.exex_tx.send((source, notification))
|
||||
}
|
||||
|
||||
@ -583,8 +592,8 @@ impl ExExManagerHandle {
|
||||
pub async fn send_async(
|
||||
&mut self,
|
||||
source: ExExNotificationSource,
|
||||
notification: ExExNotification,
|
||||
) -> Result<(), SendError<(ExExNotificationSource, ExExNotification)>> {
|
||||
notification: ExExNotification<N>,
|
||||
) -> Result<(), SendError<(ExExNotificationSource, ExExNotification<N>)>> {
|
||||
self.ready().await;
|
||||
self.exex_tx.send((source, notification))
|
||||
}
|
||||
@ -633,7 +642,7 @@ async fn make_wait_future(mut rx: watch::Receiver<bool>) -> watch::Receiver<bool
|
||||
rx
|
||||
}
|
||||
|
||||
impl Clone for ExExManagerHandle {
|
||||
impl<N: NodePrimitives> Clone for ExExManagerHandle<N> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
exex_tx: self.exex_tx.clone(),
|
||||
@ -653,6 +662,7 @@ mod tests {
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use rand::Rng;
|
||||
use reth_db_common::init::init_genesis;
|
||||
use reth_evm::test_utils::MockExecutorProvider;
|
||||
use reth_evm_ethereum::execute::EthExecutorProvider;
|
||||
use reth_primitives::SealedBlockWithSenders;
|
||||
use reth_provider::{
|
||||
@ -673,8 +683,13 @@ mod tests {
|
||||
let temp_dir = tempfile::tempdir().unwrap();
|
||||
let wal = Wal::new(temp_dir.path()).unwrap();
|
||||
|
||||
let (mut exex_handle, event_tx, mut _notification_rx) =
|
||||
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
|
||||
let (mut exex_handle, event_tx, mut _notification_rx) = ExExHandle::new(
|
||||
"test_exex".to_string(),
|
||||
Head::default(),
|
||||
(),
|
||||
MockExecutorProvider::default(),
|
||||
wal.handle(),
|
||||
);
|
||||
|
||||
// Send an event and check that it's delivered correctly
|
||||
let event = ExExEvent::FinishedHeight(BlockNumHash::new(42, B256::random()));
|
||||
@ -688,8 +703,13 @@ mod tests {
|
||||
let temp_dir = tempfile::tempdir().unwrap();
|
||||
let wal = Wal::new(temp_dir.path()).unwrap();
|
||||
|
||||
let (exex_handle_1, _, _) =
|
||||
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle());
|
||||
let (exex_handle_1, _, _) = ExExHandle::new(
|
||||
"test_exex_1".to_string(),
|
||||
Head::default(),
|
||||
(),
|
||||
MockExecutorProvider::default(),
|
||||
wal.handle(),
|
||||
);
|
||||
|
||||
assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream())
|
||||
.handle
|
||||
@ -705,8 +725,13 @@ mod tests {
|
||||
let temp_dir = tempfile::tempdir().unwrap();
|
||||
let wal = Wal::new(temp_dir.path()).unwrap();
|
||||
|
||||
let (exex_handle_1, _, _) =
|
||||
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle());
|
||||
let (exex_handle_1, _, _) = ExExHandle::new(
|
||||
"test_exex_1".to_string(),
|
||||
Head::default(),
|
||||
(),
|
||||
MockExecutorProvider::default(),
|
||||
wal.handle(),
|
||||
);
|
||||
|
||||
assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream())
|
||||
.handle
|
||||
@ -728,8 +753,13 @@ mod tests {
|
||||
let temp_dir = tempfile::tempdir().unwrap();
|
||||
let wal = Wal::new(temp_dir.path()).unwrap();
|
||||
|
||||
let (exex_handle, _, _) =
|
||||
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
|
||||
let (exex_handle, _, _) = ExExHandle::new(
|
||||
"test_exex".to_string(),
|
||||
Head::default(),
|
||||
(),
|
||||
MockExecutorProvider::default(),
|
||||
wal.handle(),
|
||||
);
|
||||
|
||||
// Create a mock ExExManager and add the exex_handle to it
|
||||
let mut exex_manager =
|
||||
@ -778,8 +808,13 @@ mod tests {
|
||||
let temp_dir = tempfile::tempdir().unwrap();
|
||||
let wal = Wal::new(temp_dir.path()).unwrap();
|
||||
|
||||
let (exex_handle, _, _) =
|
||||
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
|
||||
let (exex_handle, _, _) = ExExHandle::new(
|
||||
"test_exex".to_string(),
|
||||
Head::default(),
|
||||
(),
|
||||
MockExecutorProvider::default(),
|
||||
wal.handle(),
|
||||
);
|
||||
|
||||
// Create a mock ExExManager and add the exex_handle to it
|
||||
let max_capacity = 5;
|
||||
@ -824,8 +859,13 @@ mod tests {
|
||||
|
||||
let provider_factory = create_test_provider_factory();
|
||||
|
||||
let (exex_handle, event_tx, mut _notification_rx) =
|
||||
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
|
||||
let (exex_handle, event_tx, mut _notification_rx) = ExExHandle::new(
|
||||
"test_exex".to_string(),
|
||||
Head::default(),
|
||||
(),
|
||||
MockExecutorProvider::default(),
|
||||
wal.handle(),
|
||||
);
|
||||
|
||||
// Check initial block height
|
||||
assert!(exex_handle.finished_height.is_none());
|
||||
@ -874,10 +914,20 @@ mod tests {
|
||||
let provider_factory = create_test_provider_factory();
|
||||
|
||||
// Create two `ExExHandle` instances
|
||||
let (exex_handle1, event_tx1, _) =
|
||||
ExExHandle::new("test_exex1".to_string(), Head::default(), (), (), wal.handle());
|
||||
let (exex_handle2, event_tx2, _) =
|
||||
ExExHandle::new("test_exex2".to_string(), Head::default(), (), (), wal.handle());
|
||||
let (exex_handle1, event_tx1, _) = ExExHandle::new(
|
||||
"test_exex1".to_string(),
|
||||
Head::default(),
|
||||
(),
|
||||
MockExecutorProvider::default(),
|
||||
wal.handle(),
|
||||
);
|
||||
let (exex_handle2, event_tx2, _) = ExExHandle::new(
|
||||
"test_exex2".to_string(),
|
||||
Head::default(),
|
||||
(),
|
||||
MockExecutorProvider::default(),
|
||||
wal.handle(),
|
||||
);
|
||||
|
||||
let block1 = BlockNumHash::new(42, B256::random());
|
||||
let block2 = BlockNumHash::new(10, B256::random());
|
||||
@ -921,10 +971,20 @@ mod tests {
|
||||
let provider_factory = create_test_provider_factory();
|
||||
|
||||
// Create two `ExExHandle` instances
|
||||
let (exex_handle1, event_tx1, _) =
|
||||
ExExHandle::new("test_exex1".to_string(), Head::default(), (), (), wal.handle());
|
||||
let (exex_handle2, event_tx2, _) =
|
||||
ExExHandle::new("test_exex2".to_string(), Head::default(), (), (), wal.handle());
|
||||
let (exex_handle1, event_tx1, _) = ExExHandle::new(
|
||||
"test_exex1".to_string(),
|
||||
Head::default(),
|
||||
(),
|
||||
MockExecutorProvider::default(),
|
||||
wal.handle(),
|
||||
);
|
||||
let (exex_handle2, event_tx2, _) = ExExHandle::new(
|
||||
"test_exex2".to_string(),
|
||||
Head::default(),
|
||||
(),
|
||||
MockExecutorProvider::default(),
|
||||
wal.handle(),
|
||||
);
|
||||
|
||||
// Assert that the initial block height is `None` for the first `ExExHandle`.
|
||||
assert!(exex_handle1.finished_height.is_none());
|
||||
@ -974,8 +1034,13 @@ mod tests {
|
||||
|
||||
let provider_factory = create_test_provider_factory();
|
||||
|
||||
let (exex_handle_1, _, _) =
|
||||
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle());
|
||||
let (exex_handle_1, _, _) = ExExHandle::new(
|
||||
"test_exex_1".to_string(),
|
||||
Head::default(),
|
||||
(),
|
||||
MockExecutorProvider::default(),
|
||||
wal.handle(),
|
||||
);
|
||||
|
||||
// Create an ExExManager with a small max capacity
|
||||
let max_capacity = 2;
|
||||
|
||||
@ -1,8 +1,11 @@
|
||||
use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle};
|
||||
use alloy_consensus::BlockHeader;
|
||||
use futures::{Stream, StreamExt};
|
||||
use reth_chainspec::Head;
|
||||
use reth_evm::execute::BlockExecutorProvider;
|
||||
use reth_exex_types::ExExHead;
|
||||
use reth_node_api::NodePrimitives;
|
||||
use reth_primitives::EthPrimitives;
|
||||
use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
|
||||
use reth_tracing::tracing::debug;
|
||||
use std::{
|
||||
@ -17,14 +20,19 @@ use tokio::sync::mpsc::Receiver;
|
||||
/// stream is configured with a head via [`ExExNotifications::set_with_head`] or
|
||||
/// [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head.
|
||||
#[derive(Debug)]
|
||||
pub struct ExExNotifications<P, E> {
|
||||
pub struct ExExNotifications<P, E>
|
||||
where
|
||||
E: BlockExecutorProvider,
|
||||
{
|
||||
inner: ExExNotificationsInner<P, E>,
|
||||
}
|
||||
|
||||
/// A trait, that represents a stream of [`ExExNotification`]s. The stream will emit notifications
|
||||
/// for all blocks. If the stream is configured with a head via [`ExExNotifications::set_with_head`]
|
||||
/// or [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head.
|
||||
pub trait ExExNotificationsStream: Stream<Item = eyre::Result<ExExNotification>> + Unpin {
|
||||
pub trait ExExNotificationsStream<N: NodePrimitives = EthPrimitives>:
|
||||
Stream<Item = eyre::Result<ExExNotification<N>>> + Unpin
|
||||
{
|
||||
/// Sets [`ExExNotificationsStream`] to a stream of [`ExExNotification`]s without a head.
|
||||
///
|
||||
/// It's a no-op if the stream has already been configured without a head.
|
||||
@ -56,7 +64,10 @@ pub trait ExExNotificationsStream: Stream<Item = eyre::Result<ExExNotification>>
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum ExExNotificationsInner<P, E> {
|
||||
enum ExExNotificationsInner<P, E>
|
||||
where
|
||||
E: BlockExecutorProvider,
|
||||
{
|
||||
/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
|
||||
WithoutHead(ExExNotificationsWithoutHead<P, E>),
|
||||
/// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that
|
||||
@ -67,14 +78,17 @@ enum ExExNotificationsInner<P, E> {
|
||||
Invalid,
|
||||
}
|
||||
|
||||
impl<P, E> ExExNotifications<P, E> {
|
||||
impl<P, E> ExExNotifications<P, E>
|
||||
where
|
||||
E: BlockExecutorProvider,
|
||||
{
|
||||
/// Creates a new stream of [`ExExNotifications`] without a head.
|
||||
pub const fn new(
|
||||
node_head: Head,
|
||||
provider: P,
|
||||
executor: E,
|
||||
notifications: Receiver<ExExNotification>,
|
||||
wal_handle: WalHandle,
|
||||
notifications: Receiver<ExExNotification<E::Primitives>>,
|
||||
wal_handle: WalHandle<E::Primitives>,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner: ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new(
|
||||
@ -88,15 +102,13 @@ impl<P, E> ExExNotifications<P, E> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<P, E> ExExNotificationsStream for ExExNotifications<P, E>
|
||||
impl<P, E> ExExNotificationsStream<E::Primitives> for ExExNotifications<P, E>
|
||||
where
|
||||
P: BlockReader<Block = reth_primitives::Block>
|
||||
+ HeaderProvider
|
||||
+ StateProviderFactory
|
||||
P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
|
||||
E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
|
||||
+ Clone
|
||||
+ Unpin
|
||||
+ 'static,
|
||||
E: BlockExecutorProvider + Clone + Unpin + 'static,
|
||||
{
|
||||
fn set_without_head(&mut self) {
|
||||
let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
|
||||
@ -144,15 +156,13 @@ where
|
||||
|
||||
impl<P, E> Stream for ExExNotifications<P, E>
|
||||
where
|
||||
P: BlockReader<Block = reth_primitives::Block>
|
||||
+ HeaderProvider
|
||||
+ StateProviderFactory
|
||||
P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
|
||||
E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
|
||||
+ Clone
|
||||
+ Unpin
|
||||
+ 'static,
|
||||
E: BlockExecutorProvider + Clone + Unpin + 'static,
|
||||
{
|
||||
type Item = eyre::Result<ExExNotification>;
|
||||
type Item = eyre::Result<ExExNotification<E::Primitives>>;
|
||||
|
||||
fn poll_next(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
@ -169,15 +179,21 @@ where
|
||||
}
|
||||
|
||||
/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
|
||||
pub struct ExExNotificationsWithoutHead<P, E> {
|
||||
pub struct ExExNotificationsWithoutHead<P, E>
|
||||
where
|
||||
E: BlockExecutorProvider,
|
||||
{
|
||||
node_head: Head,
|
||||
provider: P,
|
||||
executor: E,
|
||||
notifications: Receiver<ExExNotification>,
|
||||
wal_handle: WalHandle,
|
||||
notifications: Receiver<ExExNotification<E::Primitives>>,
|
||||
wal_handle: WalHandle<E::Primitives>,
|
||||
}
|
||||
|
||||
impl<P: Debug, E: Debug> Debug for ExExNotificationsWithoutHead<P, E> {
|
||||
impl<P: Debug, E> Debug for ExExNotificationsWithoutHead<P, E>
|
||||
where
|
||||
E: Debug + BlockExecutorProvider,
|
||||
{
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("ExExNotifications")
|
||||
.field("provider", &self.provider)
|
||||
@ -187,14 +203,17 @@ impl<P: Debug, E: Debug> Debug for ExExNotificationsWithoutHead<P, E> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<P, E> ExExNotificationsWithoutHead<P, E> {
|
||||
impl<P, E> ExExNotificationsWithoutHead<P, E>
|
||||
where
|
||||
E: BlockExecutorProvider,
|
||||
{
|
||||
/// Creates a new instance of [`ExExNotificationsWithoutHead`].
|
||||
const fn new(
|
||||
node_head: Head,
|
||||
provider: P,
|
||||
executor: E,
|
||||
notifications: Receiver<ExExNotification>,
|
||||
wal_handle: WalHandle,
|
||||
notifications: Receiver<ExExNotification<E::Primitives>>,
|
||||
wal_handle: WalHandle<E::Primitives>,
|
||||
) -> Self {
|
||||
Self { node_head, provider, executor, notifications, wal_handle }
|
||||
}
|
||||
@ -212,8 +231,11 @@ impl<P, E> ExExNotificationsWithoutHead<P, E> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<P: Unpin, E: Unpin> Stream for ExExNotificationsWithoutHead<P, E> {
|
||||
type Item = ExExNotification;
|
||||
impl<P: Unpin, E> Stream for ExExNotificationsWithoutHead<P, E>
|
||||
where
|
||||
E: Unpin + BlockExecutorProvider,
|
||||
{
|
||||
type Item = ExExNotification<E::Primitives>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
self.get_mut().notifications.poll_recv(cx)
|
||||
@ -229,12 +251,15 @@ impl<P: Unpin, E: Unpin> Stream for ExExNotificationsWithoutHead<P, E> {
|
||||
/// `exex_head.number` of 10 indicates that the ExEx has processed up to block 10, and is ready to
|
||||
/// process block 11.
|
||||
#[derive(Debug)]
|
||||
pub struct ExExNotificationsWithHead<P, E> {
|
||||
pub struct ExExNotificationsWithHead<P, E>
|
||||
where
|
||||
E: BlockExecutorProvider,
|
||||
{
|
||||
node_head: Head,
|
||||
provider: P,
|
||||
executor: E,
|
||||
notifications: Receiver<ExExNotification>,
|
||||
wal_handle: WalHandle,
|
||||
notifications: Receiver<ExExNotification<E::Primitives>>,
|
||||
wal_handle: WalHandle<E::Primitives>,
|
||||
exex_head: ExExHead,
|
||||
/// If true, then we need to check if the ExEx head is on the canonical chain and if not,
|
||||
/// revert its head.
|
||||
@ -243,17 +268,20 @@ pub struct ExExNotificationsWithHead<P, E> {
|
||||
/// the missing blocks.
|
||||
pending_check_backfill: bool,
|
||||
/// The backfill job to run before consuming any notifications.
|
||||
backfill_job: Option<StreamBackfillJob<E, P, Chain>>,
|
||||
backfill_job: Option<StreamBackfillJob<E, P, Chain<E::Primitives>>>,
|
||||
}
|
||||
|
||||
impl<P, E> ExExNotificationsWithHead<P, E> {
|
||||
impl<P, E> ExExNotificationsWithHead<P, E>
|
||||
where
|
||||
E: BlockExecutorProvider,
|
||||
{
|
||||
/// Creates a new [`ExExNotificationsWithHead`].
|
||||
const fn new(
|
||||
node_head: Head,
|
||||
provider: P,
|
||||
executor: E,
|
||||
notifications: Receiver<ExExNotification>,
|
||||
wal_handle: WalHandle,
|
||||
notifications: Receiver<ExExNotification<E::Primitives>>,
|
||||
wal_handle: WalHandle<E::Primitives>,
|
||||
exex_head: ExExHead,
|
||||
) -> Self {
|
||||
Self {
|
||||
@ -272,20 +300,18 @@ impl<P, E> ExExNotificationsWithHead<P, E> {
|
||||
|
||||
impl<P, E> ExExNotificationsWithHead<P, E>
|
||||
where
|
||||
P: BlockReader<Block = reth_primitives::Block>
|
||||
+ HeaderProvider
|
||||
+ StateProviderFactory
|
||||
P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
|
||||
E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
|
||||
+ Clone
|
||||
+ Unpin
|
||||
+ 'static,
|
||||
E: BlockExecutorProvider + Clone + Unpin + 'static,
|
||||
{
|
||||
/// Checks if the ExEx head is on the canonical chain.
|
||||
///
|
||||
/// If the head block is not found in the database or it's ahead of the node head, it means
|
||||
/// we're not on the canonical chain and we need to revert the notification with the ExEx
|
||||
/// head block.
|
||||
fn check_canonical(&mut self) -> eyre::Result<Option<ExExNotification>> {
|
||||
fn check_canonical(&mut self) -> eyre::Result<Option<ExExNotification<E::Primitives>>> {
|
||||
if self.provider.is_known(&self.exex_head.block.hash)? &&
|
||||
self.exex_head.block.number <= self.node_head.number
|
||||
{
|
||||
@ -309,7 +335,7 @@ where
|
||||
// Update the head block hash to the parent hash of the first committed block.
|
||||
let committed_chain = notification.committed_chain().unwrap();
|
||||
let new_exex_head =
|
||||
(committed_chain.first().parent_hash, committed_chain.first().number - 1).into();
|
||||
(committed_chain.first().parent_hash(), committed_chain.first().number() - 1).into();
|
||||
debug!(target: "exex::notifications", old_exex_head = ?self.exex_head.block, new_exex_head = ?new_exex_head, "ExEx head updated");
|
||||
self.exex_head.block = new_exex_head;
|
||||
|
||||
@ -354,15 +380,13 @@ where
|
||||
|
||||
impl<P, E> Stream for ExExNotificationsWithHead<P, E>
|
||||
where
|
||||
P: BlockReader<Block = reth_primitives::Block>
|
||||
+ HeaderProvider
|
||||
+ StateProviderFactory
|
||||
P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
|
||||
E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
|
||||
+ Clone
|
||||
+ Unpin
|
||||
+ 'static,
|
||||
E: BlockExecutorProvider + Clone + Unpin + 'static,
|
||||
{
|
||||
type Item = eyre::Result<ExExNotification>;
|
||||
type Item = eyre::Result<ExExNotification<E::Primitives>>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = self.get_mut();
|
||||
@ -402,7 +426,7 @@ where
|
||||
this.exex_head.block = committed_chain.tip().num_hash();
|
||||
} else if let Some(reverted_chain) = notification.reverted_chain() {
|
||||
let first_block = reverted_chain.first();
|
||||
this.exex_head.block = (first_block.parent_hash, first_block.number - 1).into();
|
||||
this.exex_head.block = (first_block.parent_hash(), first_block.number() - 1).into();
|
||||
}
|
||||
|
||||
Poll::Ready(Some(Ok(notification)))
|
||||
|
||||
@ -3,9 +3,11 @@ use std::{
|
||||
collections::{BinaryHeap, HashSet},
|
||||
};
|
||||
|
||||
use alloy_consensus::BlockHeader;
|
||||
use alloy_eips::BlockNumHash;
|
||||
use alloy_primitives::{map::FbHashMap, BlockNumber, B256};
|
||||
use reth_exex_types::ExExNotification;
|
||||
use reth_node_api::NodePrimitives;
|
||||
|
||||
/// The block cache of the WAL.
|
||||
///
|
||||
@ -91,16 +93,16 @@ impl BlockCache {
|
||||
}
|
||||
|
||||
/// Inserts the blocks from the notification into the cache with the given file ID.
|
||||
pub(super) fn insert_notification_blocks_with_file_id(
|
||||
pub(super) fn insert_notification_blocks_with_file_id<N: NodePrimitives>(
|
||||
&mut self,
|
||||
file_id: u32,
|
||||
notification: &ExExNotification,
|
||||
notification: &ExExNotification<N>,
|
||||
) {
|
||||
let reverted_chain = notification.reverted_chain();
|
||||
let committed_chain = notification.committed_chain();
|
||||
|
||||
let max_block =
|
||||
reverted_chain.iter().chain(&committed_chain).map(|chain| chain.tip().number).max();
|
||||
reverted_chain.iter().chain(&committed_chain).map(|chain| chain.tip().number()).max();
|
||||
if let Some(max_block) = max_block {
|
||||
self.notification_max_blocks.push(Reverse((max_block, file_id)));
|
||||
}
|
||||
@ -108,13 +110,13 @@ impl BlockCache {
|
||||
if let Some(committed_chain) = &committed_chain {
|
||||
for block in committed_chain.blocks().values() {
|
||||
let cached_block = CachedBlock {
|
||||
block: (block.number, block.hash()).into(),
|
||||
parent_hash: block.parent_hash,
|
||||
block: (block.number(), block.hash()).into(),
|
||||
parent_hash: block.parent_hash(),
|
||||
};
|
||||
self.committed_blocks.insert(block.hash(), (file_id, cached_block));
|
||||
}
|
||||
|
||||
self.highest_committed_block_height = Some(committed_chain.tip().number);
|
||||
self.highest_committed_block_height = Some(committed_chain.tip().number());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -3,6 +3,8 @@
|
||||
mod cache;
|
||||
pub use cache::BlockCache;
|
||||
mod storage;
|
||||
use reth_node_api::NodePrimitives;
|
||||
use reth_primitives::EthPrimitives;
|
||||
pub use storage::Storage;
|
||||
mod metrics;
|
||||
use metrics::Metrics;
|
||||
@ -32,23 +34,26 @@ use reth_tracing::tracing::{debug, instrument};
|
||||
/// 2. When the chain is finalized, call [`Wal::finalize`] to prevent the infinite growth of the
|
||||
/// WAL.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Wal {
|
||||
inner: Arc<WalInner>,
|
||||
pub struct Wal<N: NodePrimitives = EthPrimitives> {
|
||||
inner: Arc<WalInner<N>>,
|
||||
}
|
||||
|
||||
impl Wal {
|
||||
impl<N> Wal<N>
|
||||
where
|
||||
N: NodePrimitives,
|
||||
{
|
||||
/// Creates a new instance of [`Wal`].
|
||||
pub fn new(directory: impl AsRef<Path>) -> eyre::Result<Self> {
|
||||
Ok(Self { inner: Arc::new(WalInner::new(directory)?) })
|
||||
}
|
||||
|
||||
/// Returns a read-only handle to the WAL.
|
||||
pub fn handle(&self) -> WalHandle {
|
||||
pub fn handle(&self) -> WalHandle<N> {
|
||||
WalHandle { wal: self.inner.clone() }
|
||||
}
|
||||
|
||||
/// Commits the notification to WAL.
|
||||
pub fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> {
|
||||
pub fn commit(&self, notification: &ExExNotification<N>) -> eyre::Result<()> {
|
||||
self.inner.commit(notification)
|
||||
}
|
||||
|
||||
@ -63,7 +68,7 @@ impl Wal {
|
||||
/// Returns an iterator over all notifications in the WAL.
|
||||
pub fn iter_notifications(
|
||||
&self,
|
||||
) -> eyre::Result<Box<dyn Iterator<Item = eyre::Result<ExExNotification>> + '_>> {
|
||||
) -> eyre::Result<Box<dyn Iterator<Item = eyre::Result<ExExNotification<N>>> + '_>> {
|
||||
self.inner.iter_notifications()
|
||||
}
|
||||
|
||||
@ -75,16 +80,19 @@ impl Wal {
|
||||
|
||||
/// Inner type for the WAL.
|
||||
#[derive(Debug)]
|
||||
struct WalInner {
|
||||
struct WalInner<N: NodePrimitives> {
|
||||
next_file_id: AtomicU32,
|
||||
/// The underlying WAL storage backed by a file.
|
||||
storage: Storage,
|
||||
storage: Storage<N>,
|
||||
/// WAL block cache. See [`cache::BlockCache`] docs for more details.
|
||||
block_cache: RwLock<BlockCache>,
|
||||
metrics: Metrics,
|
||||
}
|
||||
|
||||
impl WalInner {
|
||||
impl<N> WalInner<N>
|
||||
where
|
||||
N: NodePrimitives,
|
||||
{
|
||||
fn new(directory: impl AsRef<Path>) -> eyre::Result<Self> {
|
||||
let mut wal = Self {
|
||||
next_file_id: AtomicU32::new(0),
|
||||
@ -137,7 +145,7 @@ impl WalInner {
|
||||
reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()),
|
||||
committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range())
|
||||
))]
|
||||
fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> {
|
||||
fn commit(&self, notification: &ExExNotification<N>) -> eyre::Result<()> {
|
||||
let mut block_cache = self.block_cache.write();
|
||||
|
||||
let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
|
||||
@ -187,7 +195,7 @@ impl WalInner {
|
||||
/// Returns an iterator over all notifications in the WAL.
|
||||
fn iter_notifications(
|
||||
&self,
|
||||
) -> eyre::Result<Box<dyn Iterator<Item = eyre::Result<ExExNotification>> + '_>> {
|
||||
) -> eyre::Result<Box<dyn Iterator<Item = eyre::Result<ExExNotification<N>>> + '_>> {
|
||||
let Some(range) = self.storage.files_range()? else {
|
||||
return Ok(Box::new(std::iter::empty()))
|
||||
};
|
||||
@ -198,16 +206,19 @@ impl WalInner {
|
||||
|
||||
/// A read-only handle to the WAL that can be shared.
|
||||
#[derive(Debug)]
|
||||
pub struct WalHandle {
|
||||
wal: Arc<WalInner>,
|
||||
pub struct WalHandle<N: NodePrimitives> {
|
||||
wal: Arc<WalInner<N>>,
|
||||
}
|
||||
|
||||
impl WalHandle {
|
||||
impl<N> WalHandle<N>
|
||||
where
|
||||
N: NodePrimitives,
|
||||
{
|
||||
/// Returns the notification for the given committed block hash if it exists.
|
||||
pub fn get_committed_notification_by_block_hash(
|
||||
&self,
|
||||
block_hash: &B256,
|
||||
) -> eyre::Result<Option<ExExNotification>> {
|
||||
) -> eyre::Result<Option<ExExNotification<N>>> {
|
||||
let Some(file_id) = self.wal.block_cache().get_file_id_by_committed_block_hash(block_hash)
|
||||
else {
|
||||
return Ok(None)
|
||||
|
||||
@ -6,6 +6,8 @@ use std::{
|
||||
|
||||
use eyre::OptionExt;
|
||||
use reth_exex_types::ExExNotification;
|
||||
use reth_node_api::NodePrimitives;
|
||||
use reth_primitives::EthPrimitives;
|
||||
use reth_tracing::tracing::debug;
|
||||
use tracing::instrument;
|
||||
|
||||
@ -16,18 +18,22 @@ static FILE_EXTENSION: &str = "wal";
|
||||
/// Each notification is represented by a single file that contains a MessagePack-encoded
|
||||
/// notification.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Storage {
|
||||
pub struct Storage<N: NodePrimitives = EthPrimitives> {
|
||||
/// The path to the WAL file.
|
||||
path: PathBuf,
|
||||
_pd: std::marker::PhantomData<N>,
|
||||
}
|
||||
|
||||
impl Storage {
|
||||
impl<N> Storage<N>
|
||||
where
|
||||
N: NodePrimitives,
|
||||
{
|
||||
/// Creates a new instance of [`Storage`] backed by the file at the given path and creates
|
||||
/// it doesn't exist.
|
||||
pub(super) fn new(path: impl AsRef<Path>) -> eyre::Result<Self> {
|
||||
reth_fs_util::create_dir_all(&path)?;
|
||||
|
||||
Ok(Self { path: path.as_ref().to_path_buf() })
|
||||
Ok(Self { path: path.as_ref().to_path_buf(), _pd: std::marker::PhantomData })
|
||||
}
|
||||
|
||||
fn file_path(&self, id: u32) -> PathBuf {
|
||||
@ -110,7 +116,7 @@ impl Storage {
|
||||
pub(super) fn iter_notifications(
|
||||
&self,
|
||||
range: RangeInclusive<u32>,
|
||||
) -> impl Iterator<Item = eyre::Result<(u32, u64, ExExNotification)>> + '_ {
|
||||
) -> impl Iterator<Item = eyre::Result<(u32, u64, ExExNotification<N>)>> + '_ {
|
||||
range.map(move |id| {
|
||||
let (notification, size) =
|
||||
self.read_notification(id)?.ok_or_eyre("notification {id} not found")?;
|
||||
@ -124,7 +130,7 @@ impl Storage {
|
||||
pub(super) fn read_notification(
|
||||
&self,
|
||||
file_id: u32,
|
||||
) -> eyre::Result<Option<(ExExNotification, u64)>> {
|
||||
) -> eyre::Result<Option<(ExExNotification<N>, u64)>> {
|
||||
let file_path = self.file_path(file_id);
|
||||
debug!(target: "exex::wal::storage", ?file_path, "Reading notification from WAL");
|
||||
|
||||
@ -136,7 +142,7 @@ impl Storage {
|
||||
let size = file.metadata()?.len();
|
||||
|
||||
// Deserialize using the bincode- and msgpack-compatible serde wrapper
|
||||
let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_> =
|
||||
let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_, N> =
|
||||
rmp_serde::decode::from_read(&mut file).map_err(|err| {
|
||||
eyre::eyre!("failed to decode notification from {file_path:?}: {err:?}")
|
||||
})?;
|
||||
@ -153,14 +159,14 @@ impl Storage {
|
||||
pub(super) fn write_notification(
|
||||
&self,
|
||||
file_id: u32,
|
||||
notification: &ExExNotification,
|
||||
notification: &ExExNotification<N>,
|
||||
) -> eyre::Result<u64> {
|
||||
let file_path = self.file_path(file_id);
|
||||
debug!(target: "exex::wal::storage", ?file_path, "Writing notification to WAL");
|
||||
|
||||
// Serialize using the bincode- and msgpack-compatible serde wrapper
|
||||
let notification =
|
||||
reth_exex_types::serde_bincode_compat::ExExNotification::from(notification);
|
||||
reth_exex_types::serde_bincode_compat::ExExNotification::<N>::from(notification);
|
||||
|
||||
reth_fs_util::atomic_write_file(&file_path, |file| {
|
||||
rmp_serde::encode::write(file, ¬ification)
|
||||
@ -186,7 +192,7 @@ mod tests {
|
||||
let mut rng = generators::rng();
|
||||
|
||||
let temp_dir = tempfile::tempdir()?;
|
||||
let storage = Storage::new(&temp_dir)?;
|
||||
let storage: Storage = Storage::new(&temp_dir)?;
|
||||
|
||||
let old_block = random_block(&mut rng, 0, Default::default())
|
||||
.seal_with_senders()
|
||||
@ -215,7 +221,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_files_range() -> eyre::Result<()> {
|
||||
let temp_dir = tempfile::tempdir()?;
|
||||
let storage = Storage::new(&temp_dir)?;
|
||||
let storage: Storage = Storage::new(&temp_dir)?;
|
||||
|
||||
// Create WAL files
|
||||
File::create(storage.file_path(1))?;
|
||||
|
||||
@ -80,7 +80,7 @@ pub struct TestExecutorBuilder;
|
||||
|
||||
impl<Node> ExecutorBuilder<Node> for TestExecutorBuilder
|
||||
where
|
||||
Node: FullNodeTypes<Types: NodeTypes<ChainSpec = ChainSpec>>,
|
||||
Node: FullNodeTypes<Types: NodeTypes<ChainSpec = ChainSpec, Primitives = EthPrimitives>>,
|
||||
{
|
||||
type EVM = EthEvmConfig;
|
||||
type Executor = MockExecutorProvider;
|
||||
|
||||
@ -7,30 +7,30 @@ use reth_primitives_traits::NodePrimitives;
|
||||
/// Notifications sent to an `ExEx`.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
pub enum ExExNotification<P: NodePrimitives = reth_chain_state::EthPrimitives> {
|
||||
pub enum ExExNotification<N: NodePrimitives = reth_chain_state::EthPrimitives> {
|
||||
/// Chain got committed without a reorg, and only the new chain is returned.
|
||||
ChainCommitted {
|
||||
/// The new chain after commit.
|
||||
new: Arc<Chain<P>>,
|
||||
new: Arc<Chain<N>>,
|
||||
},
|
||||
/// Chain got reorged, and both the old and the new chains are returned.
|
||||
ChainReorged {
|
||||
/// The old chain before reorg.
|
||||
old: Arc<Chain<P>>,
|
||||
old: Arc<Chain<N>>,
|
||||
/// The new chain after reorg.
|
||||
new: Arc<Chain<P>>,
|
||||
new: Arc<Chain<N>>,
|
||||
},
|
||||
/// Chain got reverted, and only the old chain is returned.
|
||||
ChainReverted {
|
||||
/// The old chain before reversion.
|
||||
old: Arc<Chain<P>>,
|
||||
old: Arc<Chain<N>>,
|
||||
},
|
||||
}
|
||||
|
||||
impl ExExNotification {
|
||||
impl<N: NodePrimitives> ExExNotification<N> {
|
||||
/// Returns the committed chain from the [`Self::ChainCommitted`] and [`Self::ChainReorged`]
|
||||
/// variants, if any.
|
||||
pub fn committed_chain(&self) -> Option<Arc<Chain>> {
|
||||
pub fn committed_chain(&self) -> Option<Arc<Chain<N>>> {
|
||||
match self {
|
||||
Self::ChainCommitted { new } | Self::ChainReorged { old: _, new } => Some(new.clone()),
|
||||
Self::ChainReverted { .. } => None,
|
||||
@ -39,7 +39,7 @@ impl ExExNotification {
|
||||
|
||||
/// Returns the reverted chain from the [`Self::ChainReorged`] and [`Self::ChainReverted`]
|
||||
/// variants, if any.
|
||||
pub fn reverted_chain(&self) -> Option<Arc<Chain>> {
|
||||
pub fn reverted_chain(&self) -> Option<Arc<Chain<N>>> {
|
||||
match self {
|
||||
Self::ChainReorged { old, new: _ } | Self::ChainReverted { old } => Some(old.clone()),
|
||||
Self::ChainCommitted { .. } => None,
|
||||
|
||||
Reference in New Issue
Block a user