diff --git a/Cargo.lock b/Cargo.lock index f054a15ba..4b3305491 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6565,7 +6565,6 @@ dependencies = [ name = "reth-cli-runner" version = "0.2.0-beta.6" dependencies = [ - "futures", "reth-tasks", "tokio", "tracing", diff --git a/crates/cli/runner/Cargo.toml b/crates/cli/runner/Cargo.toml index 697621cee..3182b738b 100644 --- a/crates/cli/runner/Cargo.toml +++ b/crates/cli/runner/Cargo.toml @@ -15,7 +15,6 @@ workspace = true reth-tasks.workspace = true # async -futures.workspace = true tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } # misc diff --git a/crates/cli/runner/src/lib.rs b/crates/cli/runner/src/lib.rs index 31a1356c6..94536d0cb 100644 --- a/crates/cli/runner/src/lib.rs +++ b/crates/cli/runner/src/lib.rs @@ -10,9 +10,8 @@ //! Entrypoint for running commands. -use futures::pin_mut; use reth_tasks::{TaskExecutor, TaskManager}; -use std::future::Future; +use std::{future::Future, pin::pin}; use tracing::{debug, error, trace}; /// Executes CLI commands. @@ -141,7 +140,7 @@ where E: Send + Sync + From + 'static, { { - pin_mut!(fut); + let fut = pin!(fut); tokio::select! { err = tasks => { return Err(err.into()) @@ -166,7 +165,9 @@ where { let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?; let sigterm = stream.recv(); - pin_mut!(sigterm, ctrl_c, fut); + let sigterm = pin!(sigterm); + let ctrl_c = pin!(ctrl_c); + let fut = pin!(fut); tokio::select! { _ = ctrl_c => { @@ -181,7 +182,8 @@ where #[cfg(not(unix))] { - pin_mut!(ctrl_c, fut); + let ctrl_c = pin!(ctrl_c); + let fut = pin!(fut); tokio::select! { _ = ctrl_c => { diff --git a/crates/net/eth-wire/src/multiplex.rs b/crates/net/eth-wire/src/multiplex.rs index 82eccd5c8..04b7cda37 100644 --- a/crates/net/eth-wire/src/multiplex.rs +++ b/crates/net/eth-wire/src/multiplex.rs @@ -12,7 +12,7 @@ use std::{ fmt, future::Future, io, - pin::Pin, + pin::{pin, Pin}, task::{ready, Context, Poll}, }; @@ -23,7 +23,7 @@ use crate::{ CanDisconnect, DisconnectReason, EthStream, P2PStream, Status, UnauthedEthStream, }; use bytes::{Bytes, BytesMut}; -use futures::{pin_mut, Sink, SinkExt, Stream, StreamExt, TryStream, TryStreamExt}; +use futures::{Sink, SinkExt, Stream, StreamExt, TryStream, TryStreamExt}; use reth_primitives::ForkFilter; use tokio::sync::{mpsc, mpsc::UnboundedSender}; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -159,7 +159,7 @@ impl RlpxProtocolMultiplexer { }; let f = handshake(proxy); - pin_mut!(f); + let mut f = pin!(f); // this polls the connection and the primary stream concurrently until the handshake is // complete diff --git a/crates/net/network/src/listener.rs b/crates/net/network/src/listener.rs index 1575b3933..4cc219655 100644 --- a/crates/net/network/src/listener.rs +++ b/crates/net/network/src/listener.rs @@ -104,8 +104,10 @@ impl Stream for TcpListenerStream { #[cfg(test)] mod tests { use super::*; - use futures::pin_mut; - use std::net::{Ipv4Addr, SocketAddrV4}; + use std::{ + net::{Ipv4Addr, SocketAddrV4}, + pin::pin, + }; use tokio::macros::support::poll_fn; #[tokio::test(flavor = "multi_thread")] @@ -117,7 +119,7 @@ mod tests { let local_addr = listener.local_address(); tokio::task::spawn(async move { - pin_mut!(listener); + let mut listener = pin!(listener); match poll_fn(|cx| listener.as_mut().poll(cx)).await { ListenerEvent::Incoming { .. } => {} _ => { diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 0d2a33408..d516625c6 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -35,7 +35,7 @@ use crate::{ transactions::NetworkTransactionEvent, FetchClient, NetworkBuilder, }; -use futures::{pin_mut, Future, StreamExt}; +use futures::{Future, StreamExt}; use parking_lot::Mutex; use reth_eth_wire::{ capability::{Capabilities, CapabilityMessage}, @@ -53,7 +53,7 @@ use reth_tokio_util::EventListeners; use secp256k1::SecretKey; use std::{ net::SocketAddr, - pin::Pin, + pin::{pin, Pin}, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, @@ -902,7 +902,7 @@ where shutdown_hook: impl FnOnce(&mut Self), ) { let network = self; - pin_mut!(network, shutdown); + let mut network = pin!(network); let mut graceful_guard = None; tokio::select! { diff --git a/crates/rpc/ipc/src/server/mod.rs b/crates/rpc/ipc/src/server/mod.rs index ed0eadb4a..046087454 100644 --- a/crates/rpc/ipc/src/server/mod.rs +++ b/crates/rpc/ipc/src/server/mod.rs @@ -19,7 +19,7 @@ use jsonrpsee::{ use std::{ future::Future, io, - pin::Pin, + pin::{pin, Pin}, sync::Arc, task::{Context, Poll}, }; @@ -155,7 +155,7 @@ where let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize); let stopped = stop_handle.clone().shutdown(); - tokio::pin!(stopped); + let mut stopped = pin!(stopped); let (drop_on_completion, mut process_connection_awaiter) = mpsc::channel::<()>(1); @@ -223,7 +223,7 @@ where S: Future + Unpin, { let accept = listener.accept(); - tokio::pin!(accept); + let accept = pin!(accept); match futures_util::future::select(accept, stopped).await { Either::Left((res, stop)) => match res { @@ -506,11 +506,11 @@ async fn to_ipc_service( pending_calls: Default::default(), items: Default::default(), }; - tokio::pin!(conn, rx_item); - let stopped = stop_handle.shutdown(); - tokio::pin!(stopped); + let mut conn = pin!(conn); + let mut rx_item = pin!(rx_item); + let mut stopped = pin!(stopped); loop { tokio::select! { @@ -522,7 +522,7 @@ async fn to_ipc_service( conn.push_back(item); } } - _ = &mut stopped=> { + _ = &mut stopped => { // shutdown break } @@ -844,6 +844,7 @@ mod tests { PendingSubscriptionSink, RpcModule, SubscriptionMessage, }; use reth_tracing::init_test_tracing; + use std::pin::pin; use tokio::sync::broadcast; use tokio_stream::wrappers::BroadcastStream; @@ -854,7 +855,8 @@ mod tests { let sink = pending.accept().await.unwrap(); let closed = sink.closed(); - futures::pin_mut!(closed, stream); + let mut closed = pin!(closed); + let mut stream = pin!(stream); loop { match select(closed, stream.next()).await { diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 0f93e5bc5..3e526a344 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -19,12 +19,12 @@ use crate::{ use dyn_clone::DynClone; use futures_util::{ future::{select, BoxFuture}, - pin_mut, Future, FutureExt, TryFutureExt, + Future, FutureExt, TryFutureExt, }; use std::{ any::Any, fmt::{Display, Formatter}, - pin::Pin, + pin::{pin, Pin}, sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -334,7 +334,7 @@ impl TaskExecutor { async move { // Create an instance of IncCounterOnDrop with the counter to increment let _inc_counter_on_drop = IncCounterOnDrop::new(finished_regular_tasks_metrics); - pin_mut!(fut); + let fut = pin!(fut); let _ = select(on_shutdown, fut).await; } } @@ -409,7 +409,7 @@ impl TaskExecutor { let task = async move { // Create an instance of IncCounterOnDrop with the counter to increment let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_metrics); - pin_mut!(task); + let task = pin!(task); let _ = select(on_shutdown, task).await; };