mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat(exex): commit notifications to WAL before sending to ExExes (#11354)
This commit is contained in:
@ -115,6 +115,7 @@ impl ExExHandle {
|
||||
// I.e., the ExEx has already processed the notification.
|
||||
if finished_height.number >= new.tip().number {
|
||||
debug!(
|
||||
target: "exex::manager",
|
||||
exex_id = %self.id,
|
||||
%notification_id,
|
||||
?finished_height,
|
||||
@ -135,6 +136,7 @@ impl ExExHandle {
|
||||
}
|
||||
|
||||
debug!(
|
||||
target: "exex::manager",
|
||||
exex_id = %self.id,
|
||||
%notification_id,
|
||||
"Reserving slot for notification"
|
||||
@ -145,6 +147,7 @@ impl ExExHandle {
|
||||
}
|
||||
|
||||
debug!(
|
||||
target: "exex::manager",
|
||||
exex_id = %self.id,
|
||||
%notification_id,
|
||||
"Sending notification"
|
||||
@ -327,7 +330,7 @@ where
|
||||
/// This function checks if all ExExes are on the canonical chain and finalizes the WAL if
|
||||
/// necessary.
|
||||
fn finalize_wal(&self, finalized_header: SealedHeader) -> eyre::Result<()> {
|
||||
debug!(header = ?finalized_header.num_hash(), "Received finalized header");
|
||||
debug!(target: "exex::manager", header = ?finalized_header.num_hash(), "Received finalized header");
|
||||
|
||||
// Check if all ExExes are on the canonical chain
|
||||
let exex_finished_heights = self
|
||||
@ -368,9 +371,13 @@ where
|
||||
is_canonical.not().then_some((exex_id, num_hash))
|
||||
})
|
||||
.format_with(", ", |(exex_id, num_hash), f| {
|
||||
f(&format_args!("{exex_id:?} = {num_hash:?}"))
|
||||
});
|
||||
f(&format_args!("{exex_id} = {num_hash:?}"))
|
||||
})
|
||||
// We need this because `debug!` uses the argument twice when formatting the final
|
||||
// log message, but the result of `format_with` can only be used once
|
||||
.to_string();
|
||||
debug!(
|
||||
target: "exex::manager",
|
||||
%unfinalized_exexes,
|
||||
"Not all ExExes are on the canonical chain, can't finalize the WAL"
|
||||
);
|
||||
@ -403,7 +410,7 @@ where
|
||||
// Handle incoming ExEx events
|
||||
for exex in &mut this.exex_handles {
|
||||
while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) {
|
||||
debug!(exex_id = %exex.id, ?event, "Received event from ExEx");
|
||||
debug!(target: "exex::manager", exex_id = %exex.id, ?event, "Received event from ExEx");
|
||||
exex.metrics.events_sent_total.increment(1);
|
||||
match event {
|
||||
ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height),
|
||||
@ -424,10 +431,12 @@ where
|
||||
while this.buffer.len() < this.max_capacity {
|
||||
if let Poll::Ready(Some(notification)) = this.handle_rx.poll_recv(cx) {
|
||||
debug!(
|
||||
target: "exex::manager",
|
||||
committed_tip = ?notification.committed_chain().map(|chain| chain.tip().number),
|
||||
reverted_tip = ?notification.reverted_chain().map(|chain| chain.tip().number),
|
||||
"Received new notification"
|
||||
);
|
||||
this.wal.commit(¬ification)?;
|
||||
this.push_notification(notification);
|
||||
continue
|
||||
}
|
||||
@ -459,7 +468,7 @@ where
|
||||
}
|
||||
|
||||
// Remove processed buffered notifications
|
||||
debug!(%min_id, "Updating lowest notification id in buffer");
|
||||
debug!(target: "exex::manager", %min_id, "Updating lowest notification id in buffer");
|
||||
this.buffer.retain(|&(id, _)| id >= min_id);
|
||||
this.min_id = min_id;
|
||||
|
||||
@ -602,7 +611,7 @@ mod tests {
|
||||
use super::*;
|
||||
use alloy_primitives::B256;
|
||||
use eyre::OptionExt;
|
||||
use futures::StreamExt;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use rand::Rng;
|
||||
use reth_primitives::SealedBlockWithSenders;
|
||||
use reth_provider::{test_utils::create_test_provider_factory, BlockWriter, Chain};
|
||||
@ -1121,7 +1130,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_exex_wal_finalize() -> eyre::Result<()> {
|
||||
async fn test_exex_wal() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let mut rng = generators::rng();
|
||||
@ -1141,12 +1150,11 @@ mod tests {
|
||||
let notification = ExExNotification::ChainCommitted {
|
||||
new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)),
|
||||
};
|
||||
wal.commit(¬ification)?;
|
||||
|
||||
let (finalized_headers_tx, rx) = watch::channel(None);
|
||||
let finalized_header_stream = ForkChoiceStream::new(rx);
|
||||
|
||||
let (exex_handle, events_tx, _) =
|
||||
let (exex_handle, events_tx, mut notifications) =
|
||||
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
|
||||
|
||||
let mut exex_manager = std::pin::pin!(ExExManager::new(
|
||||
@ -1159,7 +1167,13 @@ mod tests {
|
||||
|
||||
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
|
||||
|
||||
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
|
||||
exex_manager.handle().send(notification.clone())?;
|
||||
|
||||
assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
|
||||
assert_eq!(
|
||||
notifications.next().poll_unpin(&mut cx),
|
||||
Poll::Ready(Some(notification.clone()))
|
||||
);
|
||||
assert_eq!(
|
||||
exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
|
||||
[notification.clone()]
|
||||
|
||||
@ -223,21 +223,19 @@ where
|
||||
/// - ExEx is at the same block number as the node head (`node_head.number ==
|
||||
/// exex_head.number`). Nothing to do.
|
||||
fn check_backfill(&mut self) -> eyre::Result<()> {
|
||||
debug!(target: "exex::manager", "Synchronizing ExEx head");
|
||||
|
||||
let backfill_job_factory =
|
||||
BackfillJobFactory::new(self.executor.clone(), self.provider.clone());
|
||||
match self.exex_head.block.number.cmp(&self.node_head.number) {
|
||||
std::cmp::Ordering::Less => {
|
||||
// ExEx is behind the node head, start backfill
|
||||
debug!(target: "exex::manager", "ExEx is behind the node head and on the canonical chain, starting backfill");
|
||||
debug!(target: "exex::notifications", "ExEx is behind the node head and on the canonical chain, starting backfill");
|
||||
let backfill = backfill_job_factory
|
||||
.backfill(self.exex_head.block.number + 1..=self.node_head.number)
|
||||
.into_stream();
|
||||
self.backfill_job = Some(backfill);
|
||||
}
|
||||
std::cmp::Ordering::Equal => {
|
||||
debug!(target: "exex::manager", "ExEx is at the node head");
|
||||
debug!(target: "exex::notifications", "ExEx is at the node head");
|
||||
}
|
||||
std::cmp::Ordering::Greater => {
|
||||
return Err(eyre::eyre!("ExEx is ahead of the node head"))
|
||||
|
||||
@ -96,7 +96,7 @@ impl WalInner {
|
||||
}
|
||||
|
||||
/// Fills the block cache with the notifications from the storage.
|
||||
#[instrument(target = "exex::wal", skip(self))]
|
||||
#[instrument(skip(self))]
|
||||
fn fill_block_cache(&mut self) -> eyre::Result<()> {
|
||||
let Some(files_range) = self.storage.files_range()? else { return Ok(()) };
|
||||
self.next_file_id.store(files_range.end() + 1, Ordering::Relaxed);
|
||||
@ -128,7 +128,7 @@ impl WalInner {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(target = "exex::wal", skip_all, fields(
|
||||
#[instrument(skip_all, fields(
|
||||
reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()),
|
||||
committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range())
|
||||
))]
|
||||
@ -138,7 +138,7 @@ impl WalInner {
|
||||
let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
|
||||
let size = self.storage.write_notification(file_id, notification)?;
|
||||
|
||||
debug!(?file_id, "Inserting notification blocks into the block cache");
|
||||
debug!(target: "exex::wal", ?file_id, "Inserting notification blocks into the block cache");
|
||||
block_cache.insert_notification_blocks_with_file_id(file_id, notification);
|
||||
|
||||
self.update_metrics(&block_cache, size as i64);
|
||||
@ -146,19 +146,19 @@ impl WalInner {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(target = "exex::wal", skip(self))]
|
||||
#[instrument(skip(self))]
|
||||
fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> {
|
||||
let mut block_cache = self.block_cache.write();
|
||||
let file_ids = block_cache.remove_before(to_block.number);
|
||||
|
||||
// Remove notifications from the storage.
|
||||
if file_ids.is_empty() {
|
||||
debug!("No notifications were finalized from the storage");
|
||||
debug!(target: "exex::wal", "No notifications were finalized from the storage");
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
let (removed_notifications, removed_size) = self.storage.remove_notifications(file_ids)?;
|
||||
debug!(?removed_notifications, ?removed_size, "Storage was finalized");
|
||||
debug!(target: "exex::wal", ?removed_notifications, ?removed_size, "Storage was finalized");
|
||||
|
||||
self.update_metrics(&block_cache, -(removed_size as i64));
|
||||
|
||||
|
||||
@ -44,18 +44,18 @@ impl Storage {
|
||||
/// # Returns
|
||||
///
|
||||
/// The size of the file that was removed in bytes, if any.
|
||||
#[instrument(target = "exex::wal::storage", skip(self))]
|
||||
#[instrument(skip(self))]
|
||||
fn remove_notification(&self, file_id: u32) -> Option<u64> {
|
||||
let path = self.file_path(file_id);
|
||||
let size = path.metadata().ok()?.len();
|
||||
|
||||
match reth_fs_util::remove_file(self.file_path(file_id)) {
|
||||
Ok(()) => {
|
||||
debug!("Notification was removed from the storage");
|
||||
debug!(target: "exex::wal::storage", "Notification was removed from the storage");
|
||||
Some(size)
|
||||
}
|
||||
Err(err) => {
|
||||
debug!(?err, "Failed to remove notification from the storage");
|
||||
debug!(target: "exex::wal::storage", ?err, "Failed to remove notification from the storage");
|
||||
None
|
||||
}
|
||||
}
|
||||
@ -108,31 +108,33 @@ impl Storage {
|
||||
) -> impl Iterator<Item = eyre::Result<(u32, u64, ExExNotification)>> + '_ {
|
||||
range.map(move |id| {
|
||||
let (notification, size) =
|
||||
self.read_notification(id)?.ok_or_eyre("notification not found")?;
|
||||
self.read_notification(id)?.ok_or_eyre("notification {id} not found")?;
|
||||
|
||||
Ok((id, size, notification))
|
||||
})
|
||||
}
|
||||
|
||||
/// Reads the notification from the file with the given ID.
|
||||
#[instrument(target = "exex::wal::storage", skip(self))]
|
||||
#[instrument(skip(self))]
|
||||
pub(super) fn read_notification(
|
||||
&self,
|
||||
file_id: u32,
|
||||
) -> eyre::Result<Option<(ExExNotification, u64)>> {
|
||||
let file_path = self.file_path(file_id);
|
||||
debug!(?file_path, "Reading notification from WAL");
|
||||
debug!(target: "exex::wal::storage", ?file_path, "Reading notification from WAL");
|
||||
|
||||
let mut file = match File::open(&file_path) {
|
||||
Ok(file) => file,
|
||||
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
|
||||
Err(err) => return Err(err.into()),
|
||||
Err(err) => return Err(reth_fs_util::FsPathError::open(err, &file_path).into()),
|
||||
};
|
||||
let size = file.metadata()?.len();
|
||||
|
||||
// Deserialize using the bincode- and msgpack-compatible serde wrapper
|
||||
let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_> =
|
||||
rmp_serde::decode::from_read(&mut file)?;
|
||||
rmp_serde::decode::from_read(&mut file).map_err(|err| {
|
||||
eyre::eyre!("failed to decode notification from {file_path:?}: {err:?}")
|
||||
})?;
|
||||
|
||||
Ok(Some((notification.into(), size)))
|
||||
}
|
||||
@ -142,14 +144,14 @@ impl Storage {
|
||||
/// # Returns
|
||||
///
|
||||
/// The size of the file that was written in bytes.
|
||||
#[instrument(target = "exex::wal::storage", skip(self, notification))]
|
||||
#[instrument(skip(self, notification))]
|
||||
pub(super) fn write_notification(
|
||||
&self,
|
||||
file_id: u32,
|
||||
notification: &ExExNotification,
|
||||
) -> eyre::Result<u64> {
|
||||
let file_path = self.file_path(file_id);
|
||||
debug!(?file_path, "Writing notification to WAL");
|
||||
debug!(target: "exex::wal::storage", ?file_path, "Writing notification to WAL");
|
||||
|
||||
// Serialize using the bincode- and msgpack-compatible serde wrapper
|
||||
let notification =
|
||||
|
||||
Reference in New Issue
Block a user