fix(exex): properly check ready state in poll_ready (#7772)

This commit is contained in:
Oliver Nordbjerg
2024-04-20 11:00:11 +02:00
committed by GitHub
parent db5bd06851
commit 615e90b0f8
3 changed files with 22 additions and 25 deletions

2
Cargo.lock generated
View File

@ -6626,7 +6626,6 @@ name = "reth-exex"
version = "0.2.0-beta.5" version = "0.2.0-beta.5"
dependencies = [ dependencies = [
"eyre", "eyre",
"futures",
"metrics", "metrics",
"reth-config", "reth-config",
"reth-metrics", "reth-metrics",
@ -6637,7 +6636,6 @@ dependencies = [
"reth-tasks", "reth-tasks",
"reth-tracing", "reth-tracing",
"tokio", "tokio",
"tokio-stream",
"tokio-util", "tokio-util",
] ]

View File

@ -23,9 +23,7 @@ reth-tasks.workspace = true
reth-tracing.workspace = true reth-tracing.workspace = true
## async ## async
futures.workspace = true
tokio.workspace = true tokio.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true tokio-util.workspace = true
## misc ## misc

View File

@ -12,14 +12,13 @@ use std::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
Arc, Arc,
}, },
task::{Context, Poll}, task::{ready, Context, Poll},
}; };
use tokio::sync::{ use tokio::sync::{
mpsc::{self, error::SendError, Receiver, UnboundedReceiver, UnboundedSender}, mpsc::{self, error::SendError, Receiver, UnboundedReceiver, UnboundedSender},
watch, watch,
}; };
use tokio_stream::wrappers::WatchStream; use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture};
use tokio_util::sync::{PollSendError, PollSender};
/// Metrics for an ExEx. /// Metrics for an ExEx.
#[derive(Metrics)] #[derive(Metrics)]
@ -217,7 +216,7 @@ impl ExExManager {
exex_tx: handle_tx, exex_tx: handle_tx,
num_exexs, num_exexs,
is_ready_receiver: is_ready_rx.clone(), is_ready_receiver: is_ready_rx.clone(),
is_ready: WatchStream::new(is_ready_rx), is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)),
current_capacity, current_capacity,
finished_height: finished_height_rx, finished_height: finished_height_rx,
}, },
@ -340,13 +339,13 @@ pub struct ExExManagerHandle {
num_exexs: usize, num_exexs: usize,
/// A watch channel denoting whether the manager is ready for new notifications or not. /// A watch channel denoting whether the manager is ready for new notifications or not.
/// ///
/// This is stored internally alongside a `WatchStream` representation of the same value. This /// This is stored internally alongside a `ReusableBoxFuture` representation of the same value.
/// field is only used to create a new `WatchStream` when the handle is cloned, but is /// This field is only used to create a new `ReusableBoxFuture` when the handle is cloned,
/// otherwise unused. /// but is otherwise unused.
is_ready_receiver: watch::Receiver<bool>, is_ready_receiver: watch::Receiver<bool>,
/// A stream of bools denoting whether the manager is ready for new notifications. /// A reusable future that resolves when the manager is ready for new
#[allow(unused)] /// notifications.
is_ready: WatchStream<bool>, is_ready: ReusableBoxFuture<'static, watch::Receiver<bool>>,
/// The current capacity of the manager's internal notification buffer. /// The current capacity of the manager's internal notification buffer.
current_capacity: Arc<AtomicUsize>, current_capacity: Arc<AtomicUsize>,
/// The finished height of all ExEx's. /// The finished height of all ExEx's.
@ -368,7 +367,7 @@ impl ExExManagerHandle {
exex_tx, exex_tx,
num_exexs: 0, num_exexs: 0,
is_ready_receiver: is_ready_rx.clone(), is_ready_receiver: is_ready_rx.clone(),
is_ready: WatchStream::new(is_ready_rx), is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)),
current_capacity: Arc::new(AtomicUsize::new(0)), current_capacity: Arc::new(AtomicUsize::new(0)),
finished_height: finished_height_rx, finished_height: finished_height_rx,
} }
@ -425,26 +424,28 @@ impl ExExManagerHandle {
} }
/// Wait until the manager is ready for new notifications. /// Wait until the manager is ready for new notifications.
#[allow(clippy::needless_pass_by_ref_mut)]
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> { pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
use futures as _; let rx = ready!(self.is_ready.poll(cx));
// FIXME: if not ready this must be polled self.is_ready.set(make_wait_future(rx));
if *self.is_ready_receiver.borrow() { Poll::Ready(())
Poll::Ready(())
} else {
cx.waker().wake_by_ref();
Poll::Pending
}
} }
} }
/// Creates a future that resolves once the given watch channel receiver is true.
async fn make_wait_future(mut rx: watch::Receiver<bool>) -> watch::Receiver<bool> {
// NOTE(onbjerg): We can ignore the error here, because if the channel is closed, the node
// is shutting down.
let _ = rx.wait_for(|ready| *ready).await;
rx
}
impl Clone for ExExManagerHandle { impl Clone for ExExManagerHandle {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
exex_tx: self.exex_tx.clone(), exex_tx: self.exex_tx.clone(),
num_exexs: self.num_exexs, num_exexs: self.num_exexs,
is_ready_receiver: self.is_ready_receiver.clone(), is_ready_receiver: self.is_ready_receiver.clone(),
is_ready: WatchStream::new(self.is_ready_receiver.clone()), is_ready: ReusableBoxFuture::new(make_wait_future(self.is_ready_receiver.clone())),
current_capacity: self.current_capacity.clone(), current_capacity: self.current_capacity.clone(),
finished_height: self.finished_height.clone(), finished_height: self.finished_height.clone(),
} }