From d599ce46835891a1afe7970ac8b0cab4efe8b391 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 19 Apr 2024 23:18:33 +0200 Subject: [PATCH] fix: dont await changes (#7760) --- crates/exex/src/manager.rs | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/crates/exex/src/manager.rs b/crates/exex/src/manager.rs index a89b09171..9345e2180 100644 --- a/crates/exex/src/manager.rs +++ b/crates/exex/src/manager.rs @@ -1,3 +1,9 @@ +use crate::ExExEvent; +use metrics::Gauge; +use reth_metrics::{metrics::Counter, Metrics}; +use reth_primitives::{BlockNumber, FinishedExExHeight}; +use reth_provider::CanonStateNotification; +use reth_tracing::tracing::debug; use std::{ collections::VecDeque, future::{poll_fn, Future}, @@ -8,14 +14,6 @@ use std::{ }, task::{Context, Poll}, }; - -use crate::ExExEvent; -use futures::StreamExt; -use metrics::Gauge; -use reth_metrics::{metrics::Counter, Metrics}; -use reth_primitives::{BlockNumber, FinishedExExHeight}; -use reth_provider::CanonStateNotification; -use reth_tracing::tracing::debug; use tokio::sync::{ mpsc::{self, error::SendError, Receiver, UnboundedReceiver, UnboundedSender}, watch, @@ -347,6 +345,7 @@ pub struct ExExManagerHandle { /// otherwise unused. is_ready_receiver: watch::Receiver, /// A stream of bools denoting whether the manager is ready for new notifications. + #[allow(unused)] is_ready: WatchStream, /// The current capacity of the manager's internal notification buffer. current_capacity: Arc, @@ -426,15 +425,14 @@ impl ExExManagerHandle { } /// Wait until the manager is ready for new notifications. + #[allow(clippy::needless_pass_by_ref_mut)] pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> { - // if this returns `Poll::Ready(None)` the stream is exhausted, which means the underlying - // channel is closed. - // - // this can only happen if the manager died, and the node is shutting down, so we ignore it - let mut pinned = std::pin::pin!(&mut self.is_ready); - if pinned.poll_next_unpin(cx) == Poll::Ready(Some(true)) { + use futures as _; + // FIXME: if not ready this must be polled + if *self.is_ready_receiver.borrow() { Poll::Ready(()) } else { + cx.waker().wake_by_ref(); Poll::Pending } }