fix(exex): skipping logic of the notifications (#7919)

This commit is contained in:
Alexey Shekhirin
2024-04-26 16:14:35 +01:00
committed by GitHub
parent 953ba043ad
commit 51bdc6afe8

View File

@ -84,31 +84,53 @@ impl ExExHandle {
fn send(
&mut self,
cx: &mut Context<'_>,
(event_id, notification): &(usize, ExExNotification),
(notification_id, notification): &(usize, ExExNotification),
) -> Poll<Result<(), PollSendError<ExExNotification>>> {
// check that this notification is above the finished height of the exex if the exex has set
// one
if let Some(finished_height) = self.finished_height {
match notification {
ExExNotification::ChainCommitted { new } |
ExExNotification::ChainReorged { old: _, new }
if finished_height >= new.tip().number =>
{
self.next_notification_id = event_id + 1;
return Poll::Ready(Ok(()))
ExExNotification::ChainCommitted { new } => {
// Skip the chain commit notification if the finished height of the ExEx is
// higher than or equal to the tip of the new notification.
// I.e., the ExEx has already processed the notification.
if finished_height >= new.tip().number {
debug!(
exex_id = %self.id,
%notification_id,
%finished_height,
new_tip = %new.tip().number,
"Skipping notification"
);
self.next_notification_id = notification_id + 1;
return Poll::Ready(Ok(()))
}
}
_ => (),
// Do not handle [ExExNotification::ChainReorged] and
// [ExExNotification::ChainReverted] cases and always send the
// notification, because the ExEx should be aware of the reorgs and reverts lower
// than its finished height
ExExNotification::ChainReorged { .. } | ExExNotification::ChainReverted { .. } => {}
}
}
debug!(
exex_id = %self.id,
%notification_id,
"Reserving slot for notification"
);
match self.sender.poll_reserve(cx) {
Poll::Ready(Ok(())) => (),
other => return other,
}
debug!(
exex_id = %self.id,
%notification_id,
"Sending notification"
);
match self.sender.send_item(notification.clone()) {
Ok(()) => {
self.next_notification_id = event_id + 1;
self.next_notification_id = notification_id + 1;
self.metrics.notifications_sent_total.increment(1);
Poll::Ready(Ok(()))
}
@ -263,7 +285,11 @@ impl Future for ExExManager {
// drain handle notifications
while self.buffer.len() < self.max_capacity {
if let Poll::Ready(Some(notification)) = self.handle_rx.poll_recv(cx) {
debug!("received new notification");
debug!(
committed_tip = ?notification.committed_chain().map(|chain| chain.tip().number),
reverted_tip = ?notification.reverted_chain().map(|chain| chain.tip().number),
"Received new notification"
);
self.push_notification(notification);
continue
}
@ -285,11 +311,6 @@ impl Future for ExExManager {
.checked_sub(self.min_id)
.expect("exex expected notification ID outside the manager's range");
if let Some(notification) = self.buffer.get(notification_index) {
debug!(
exex.id,
notification_id = exex.next_notification_id,
"sent notification to exex"
);
if let Poll::Ready(Err(err)) = exex.send(cx, notification) {
// the channel was closed, which is irrecoverable for the manager
return Poll::Ready(Err(err.into()))
@ -300,9 +321,9 @@ impl Future for ExExManager {
}
// remove processed buffered notifications
debug!(%min_id, "Updating lowest notification id in buffer");
self.buffer.retain(|&(id, _)| id >= min_id);
self.min_id = min_id;
debug!(min_id, "lowest notification id in buffer updated");
// update capacity
self.update_capacity();
@ -310,7 +331,7 @@ impl Future for ExExManager {
// handle incoming exex events
for exex in self.exex_handles.iter_mut() {
while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) {
debug!(?event, id = exex.id, "received event from exex");
debug!(exex_id = exex.id, ?event, "Received event from exex");
exex.metrics.events_sent_total.increment(1);
match event {
ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height),