dev: update NodeExitFuture (#9153)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
greged93
2024-06-28 10:13:40 +02:00
committed by GitHub
parent ce8bcd8f1c
commit c12ca99438
4 changed files with 31 additions and 29 deletions

1
Cargo.lock generated
View File

@ -7597,7 +7597,6 @@ dependencies = [
"procfs", "procfs",
"proptest", "proptest",
"rand 0.8.5", "rand 0.8.5",
"reth-beacon-consensus",
"reth-chainspec", "reth-chainspec",
"reth-config", "reth-config",
"reth-consensus-common", "reth-consensus-common",

View File

@ -36,7 +36,6 @@ reth-net-nat.workspace = true
reth-network-peers.workspace = true reth-network-peers.workspace = true
reth-tasks.workspace = true reth-tasks.workspace = true
reth-consensus-common.workspace = true reth-consensus-common.workspace = true
reth-beacon-consensus.workspace = true
reth-prune-types.workspace = true reth-prune-types.workspace = true
reth-stages-types.workspace = true reth-stages-types.workspace = true
@ -102,7 +101,6 @@ optimism = [
"reth-primitives/optimism", "reth-primitives/optimism",
"reth-provider/optimism", "reth-provider/optimism",
"reth-rpc-types-compat/optimism", "reth-rpc-types-compat/optimism",
"reth-beacon-consensus/optimism",
"reth-rpc-eth-api/optimism", "reth-rpc-eth-api/optimism",
"reth-rpc-eth-types/optimism" "reth-rpc-eth-types/optimism"
] ]

View File

@ -1,32 +1,39 @@
//! Helper types for waiting for the node to exit. //! Helper types for waiting for the node to exit.
use futures::FutureExt; use futures::{future::BoxFuture, FutureExt};
use reth_beacon_consensus::BeaconConsensusEngineError;
use std::{ use std::{
fmt,
future::Future, future::Future,
pin::Pin, pin::Pin,
task::{ready, Context, Poll}, task::{ready, Context, Poll},
}; };
use tokio::sync::oneshot;
/// A Future which resolves when the node exits /// A Future which resolves when the node exits
#[derive(Debug)]
pub struct NodeExitFuture { pub struct NodeExitFuture {
/// The receiver half of the channel for the consensus engine. /// The consensus engine future.
/// This can be used to wait for the consensus engine to exit. /// This can be polled to wait for the consensus engine to exit.
consensus_engine_rx: Option<oneshot::Receiver<Result<(), BeaconConsensusEngineError>>>, consensus_engine_fut: Option<BoxFuture<'static, eyre::Result<()>>>,
/// Flag indicating whether the node should be terminated after the pipeline sync. /// Flag indicating whether the node should be terminated after the pipeline sync.
terminate: bool, terminate: bool,
} }
impl fmt::Debug for NodeExitFuture {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NodeExitFuture")
.field("consensus_engine_fut", &"...")
.field("terminate", &self.terminate)
.finish()
}
}
impl NodeExitFuture { impl NodeExitFuture {
/// Create a new `NodeExitFuture`. /// Create a new `NodeExitFuture`.
pub const fn new( pub fn new<F>(consensus_engine_fut: F, terminate: bool) -> Self
consensus_engine_rx: oneshot::Receiver<Result<(), BeaconConsensusEngineError>>, where
terminate: bool, F: Future<Output = eyre::Result<()>> + 'static + Send,
) -> Self { {
Self { consensus_engine_rx: Some(consensus_engine_rx), terminate } Self { consensus_engine_fut: Some(Box::pin(consensus_engine_fut)), terminate }
} }
} }
@ -35,18 +42,17 @@ impl Future for NodeExitFuture {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut(); let this = self.get_mut();
if let Some(rx) = this.consensus_engine_rx.as_mut() { if let Some(rx) = this.consensus_engine_fut.as_mut() {
match ready!(rx.poll_unpin(cx)) { match ready!(rx.poll_unpin(cx)) {
Ok(res) => { Ok(_) => {
this.consensus_engine_rx.take(); this.consensus_engine_fut.take();
res?;
if this.terminate { if this.terminate {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} else { } else {
Poll::Pending Poll::Pending
} }
} }
Err(err) => Poll::Ready(Err(err.into())), Err(err) => Poll::Ready(Err(err)),
} }
} else { } else {
Poll::Pending Poll::Pending
@ -61,11 +67,9 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_node_exit_future_terminate_true() { async fn test_node_exit_future_terminate_true() {
let (tx, rx) = oneshot::channel::<Result<(), BeaconConsensusEngineError>>(); let fut = async { Ok(()) };
let _ = tx.send(Ok(())); let node_exit_future = NodeExitFuture::new(fut, true);
let node_exit_future = NodeExitFuture::new(rx, true);
let res = node_exit_future.await; let res = node_exit_future.await;
@ -74,11 +78,9 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_node_exit_future_terminate_false() { async fn test_node_exit_future_terminate_false() {
let (tx, rx) = oneshot::channel::<Result<(), BeaconConsensusEngineError>>(); let fut = async { Ok(()) };
let _ = tx.send(Ok(())); let mut node_exit_future = NodeExitFuture::new(fut, false);
let mut node_exit_future = NodeExitFuture::new(rx, false);
poll_fn(|cx| { poll_fn(|cx| {
assert!(node_exit_future.poll_unpin(cx).is_pending()); assert!(node_exit_future.poll_unpin(cx).is_pending());
Poll::Ready(()) Poll::Ready(())

View File

@ -389,7 +389,10 @@ where
on_node_started.on_event(full_node.clone())?; on_node_started.on_event(full_node.clone())?;
let handle = NodeHandle { let handle = NodeHandle {
node_exit_future: NodeExitFuture::new(rx, full_node.config.debug.terminate), node_exit_future: NodeExitFuture::new(
async { Ok(rx.await??) },
full_node.config.debug.terminate,
),
node: full_node, node: full_node,
}; };