refactor: replace futures_util pin and tokio_pin with std pin (#8109)

This commit is contained in:
jn
2024-05-06 03:14:54 -07:00
committed by GitHub
parent 199503531c
commit 8f8b29b3ce
8 changed files with 32 additions and 28 deletions

View File

@ -15,7 +15,6 @@ workspace = true
reth-tasks.workspace = true
# async
futures.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] }
# misc

View File

@ -10,9 +10,8 @@
//! Entrypoint for running commands.
use futures::pin_mut;
use reth_tasks::{TaskExecutor, TaskManager};
use std::future::Future;
use std::{future::Future, pin::pin};
use tracing::{debug, error, trace};
/// Executes CLI commands.
@ -141,7 +140,7 @@ where
E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
{
{
pin_mut!(fut);
let fut = pin!(fut);
tokio::select! {
err = tasks => {
return Err(err.into())
@ -166,7 +165,9 @@ where
{
let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
let sigterm = stream.recv();
pin_mut!(sigterm, ctrl_c, fut);
let sigterm = pin!(sigterm);
let ctrl_c = pin!(ctrl_c);
let fut = pin!(fut);
tokio::select! {
_ = ctrl_c => {
@ -181,7 +182,8 @@ where
#[cfg(not(unix))]
{
pin_mut!(ctrl_c, fut);
let ctrl_c = pin!(ctrl_c);
let fut = pin!(fut);
tokio::select! {
_ = ctrl_c => {

View File

@ -12,7 +12,7 @@ use std::{
fmt,
future::Future,
io,
pin::Pin,
pin::{pin, Pin},
task::{ready, Context, Poll},
};
@ -23,7 +23,7 @@ use crate::{
CanDisconnect, DisconnectReason, EthStream, P2PStream, Status, UnauthedEthStream,
};
use bytes::{Bytes, BytesMut};
use futures::{pin_mut, Sink, SinkExt, Stream, StreamExt, TryStream, TryStreamExt};
use futures::{Sink, SinkExt, Stream, StreamExt, TryStream, TryStreamExt};
use reth_primitives::ForkFilter;
use tokio::sync::{mpsc, mpsc::UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
@ -159,7 +159,7 @@ impl<St> RlpxProtocolMultiplexer<St> {
};
let f = handshake(proxy);
pin_mut!(f);
let mut f = pin!(f);
// this polls the connection and the primary stream concurrently until the handshake is
// complete

View File

@ -104,8 +104,10 @@ impl Stream for TcpListenerStream {
#[cfg(test)]
mod tests {
use super::*;
use futures::pin_mut;
use std::net::{Ipv4Addr, SocketAddrV4};
use std::{
net::{Ipv4Addr, SocketAddrV4},
pin::pin,
};
use tokio::macros::support::poll_fn;
#[tokio::test(flavor = "multi_thread")]
@ -117,7 +119,7 @@ mod tests {
let local_addr = listener.local_address();
tokio::task::spawn(async move {
pin_mut!(listener);
let mut listener = pin!(listener);
match poll_fn(|cx| listener.as_mut().poll(cx)).await {
ListenerEvent::Incoming { .. } => {}
_ => {

View File

@ -35,7 +35,7 @@ use crate::{
transactions::NetworkTransactionEvent,
FetchClient, NetworkBuilder,
};
use futures::{pin_mut, Future, StreamExt};
use futures::{Future, StreamExt};
use parking_lot::Mutex;
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
@ -53,7 +53,7 @@ use reth_tokio_util::EventListeners;
use secp256k1::SecretKey;
use std::{
net::SocketAddr,
pin::Pin,
pin::{pin, Pin},
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
@ -902,7 +902,7 @@ where
shutdown_hook: impl FnOnce(&mut Self),
) {
let network = self;
pin_mut!(network, shutdown);
let mut network = pin!(network);
let mut graceful_guard = None;
tokio::select! {

View File

@ -19,7 +19,7 @@ use jsonrpsee::{
use std::{
future::Future,
io,
pin::Pin,
pin::{pin, Pin},
sync::Arc,
task::{Context, Poll},
};
@ -155,7 +155,7 @@ where
let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize);
let stopped = stop_handle.clone().shutdown();
tokio::pin!(stopped);
let mut stopped = pin!(stopped);
let (drop_on_completion, mut process_connection_awaiter) = mpsc::channel::<()>(1);
@ -223,7 +223,7 @@ where
S: Future + Unpin,
{
let accept = listener.accept();
tokio::pin!(accept);
let accept = pin!(accept);
match futures_util::future::select(accept, stopped).await {
Either::Left((res, stop)) => match res {
@ -506,11 +506,11 @@ async fn to_ipc_service<S, T>(
pending_calls: Default::default(),
items: Default::default(),
};
tokio::pin!(conn, rx_item);
let stopped = stop_handle.shutdown();
tokio::pin!(stopped);
let mut conn = pin!(conn);
let mut rx_item = pin!(rx_item);
let mut stopped = pin!(stopped);
loop {
tokio::select! {
@ -522,7 +522,7 @@ async fn to_ipc_service<S, T>(
conn.push_back(item);
}
}
_ = &mut stopped=> {
_ = &mut stopped => {
// shutdown
break
}
@ -844,6 +844,7 @@ mod tests {
PendingSubscriptionSink, RpcModule, SubscriptionMessage,
};
use reth_tracing::init_test_tracing;
use std::pin::pin;
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
@ -854,7 +855,8 @@ mod tests {
let sink = pending.accept().await.unwrap();
let closed = sink.closed();
futures::pin_mut!(closed, stream);
let mut closed = pin!(closed);
let mut stream = pin!(stream);
loop {
match select(closed, stream.next()).await {

View File

@ -19,12 +19,12 @@ use crate::{
use dyn_clone::DynClone;
use futures_util::{
future::{select, BoxFuture},
pin_mut, Future, FutureExt, TryFutureExt,
Future, FutureExt, TryFutureExt,
};
use std::{
any::Any,
fmt::{Display, Formatter},
pin::Pin,
pin::{pin, Pin},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
@ -334,7 +334,7 @@ impl TaskExecutor {
async move {
// Create an instance of IncCounterOnDrop with the counter to increment
let _inc_counter_on_drop = IncCounterOnDrop::new(finished_regular_tasks_metrics);
pin_mut!(fut);
let fut = pin!(fut);
let _ = select(on_shutdown, fut).await;
}
}
@ -409,7 +409,7 @@ impl TaskExecutor {
let task = async move {
// Create an instance of IncCounterOnDrop with the counter to increment
let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_metrics);
pin_mut!(task);
let task = pin!(task);
let _ = select(on_shutdown, task).await;
};