From e1446ae49b5453020cf1e5a4510b83375acd42cc Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 1 May 2023 16:43:54 +0200 Subject: [PATCH] refactor: roll custom canon state notification stream (#2486) --- Cargo.lock | 1 + bin/reth/src/node/mod.rs | 2 -- crates/storage/provider/Cargo.toml | 1 + crates/storage/provider/src/traits/chain.rs | 40 ++++++++++++++++++--- 4 files changed, 38 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 86277781c..e06a080aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5158,6 +5158,7 @@ dependencies = [ "auto_impl", "itertools", "parking_lot 0.12.1", + "pin-project", "reth-db", "reth-interfaces", "reth-primitives", diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 23731dfcb..da14513e2 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -205,8 +205,6 @@ impl Command { ctx.task_executor.spawn_critical( "txpool maintenance task", Box::pin(async move { - let chain_events = chain_events.filter_map(|event| async move { event.ok() }); - pin_mut!(chain_events); reth_transaction_pool::maintain::maintain_transaction_pool( client, pool, diff --git a/crates/storage/provider/Cargo.toml b/crates/storage/provider/Cargo.toml index 7d1a7580f..c93219c5e 100644 --- a/crates/storage/provider/Cargo.toml +++ b/crates/storage/provider/Cargo.toml @@ -26,6 +26,7 @@ tracing = "0.1" thiserror = "1.0.37" auto_impl = "1.0" itertools = "0.10" +pin-project = "1.0" # test-utils reth-rlp = { path = "../../rlp", optional = true } diff --git a/crates/storage/provider/src/traits/chain.rs b/crates/storage/provider/src/traits/chain.rs index 8deb380d0..da1388eda 100644 --- a/crates/storage/provider/src/traits/chain.rs +++ b/crates/storage/provider/src/traits/chain.rs @@ -1,9 +1,14 @@ //! Canonical chain state notification trait and types. use crate::{chain::BlockReceipts, Chain}; use auto_impl::auto_impl; -use std::sync::Arc; +use std::{ + pin::Pin, + sync::Arc, + task::{ready, Context, Poll}, +}; use tokio::sync::broadcast; -use tokio_stream::wrappers::BroadcastStream; +use tokio_stream::{wrappers::BroadcastStream, Stream}; +use tracing::debug; /// Type alias for a receiver that receives [CanonStateNotification] pub type CanonStateNotifications = broadcast::Receiver; @@ -20,8 +25,35 @@ pub trait CanonStateSubscriptions: Send + Sync { fn subscribe_to_canonical_state(&self) -> CanonStateNotifications; /// Convenience method to get a stream of [`CanonStateNotification`]. - fn canonical_state_stream(&self) -> BroadcastStream { - BroadcastStream::new(self.subscribe_to_canonical_state()) + fn canonical_state_stream(&self) -> CanonStateNotificationStream { + CanonStateNotificationStream { + st: BroadcastStream::new(self.subscribe_to_canonical_state()), + } + } +} + +/// A Stream of [CanonStateNotification]. +#[derive(Debug)] +#[pin_project::pin_project] +pub struct CanonStateNotificationStream { + #[pin] + st: BroadcastStream, +} + +impl Stream for CanonStateNotificationStream { + type Item = CanonStateNotification; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + return match ready!(self.as_mut().project().st.poll_next(cx)) { + Some(Ok(notification)) => Poll::Ready(Some(notification)), + Some(Err(err)) => { + debug!(%err, "canonical state notification stream lagging behind"); + continue + } + None => Poll::Ready(None), + } + } } }