diff --git a/crates/chain-state/src/lib.rs b/crates/chain-state/src/lib.rs index 48d996881..50a103111 100644 --- a/crates/chain-state/src/lib.rs +++ b/crates/chain-state/src/lib.rs @@ -16,9 +16,9 @@ pub use chain_info::ChainInfoTracker; mod notifications; pub use notifications::{ - BlockStateNotificationStream, CanonStateNotification, CanonStateNotificationSender, - CanonStateNotificationStream, CanonStateNotifications, CanonStateSubscriptions, - ForkChoiceNotifications, ForkChoiceStream, ForkChoiceSubscriptions, + CanonStateNotification, CanonStateNotificationSender, CanonStateNotificationStream, + CanonStateNotifications, CanonStateSubscriptions, ForkChoiceNotifications, ForkChoiceStream, + ForkChoiceSubscriptions, }; mod memory_overlay; diff --git a/crates/chain-state/src/notifications.rs b/crates/chain-state/src/notifications.rs index f754bf613..c1ab009cf 100644 --- a/crates/chain-state/src/notifications.rs +++ b/crates/chain-state/src/notifications.rs @@ -152,26 +152,35 @@ pub trait ForkChoiceSubscriptions: Send + Sync { fn subscribe_finalized_block(&self) -> ForkChoiceNotifications; /// Convenience method to get a stream of the new safe blocks of the chain. - fn safe_block_stream(&self) -> ForkChoiceStream { - ForkChoiceStream { st: WatchStream::new(self.subscribe_safe_block().0) } + fn safe_block_stream(&self) -> ForkChoiceStream { + ForkChoiceStream:: { st: WatchStream::new(self.subscribe_safe_block().0) } } /// Convenience method to get a stream of the new finalized blocks of the chain. - fn finalized_block_stream(&self) -> ForkChoiceStream { - ForkChoiceStream { st: WatchStream::new(self.subscribe_finalized_block().0) } + fn finalized_block_stream(&self) -> ForkChoiceStream { + ForkChoiceStream:: { + st: WatchStream::new(self.subscribe_finalized_block().0), + } } } -/// A stream of the fork choices in the form of [`SealedHeader`]. +/// A stream for fork choice watch channels (pending, safe or finalized watchers) #[derive(Debug)] #[pin_project::pin_project] -pub struct ForkChoiceStream { +pub struct ForkChoiceStream { #[pin] - st: WatchStream>, + st: WatchStream>, } -impl Stream for ForkChoiceStream { - type Item = SealedHeader; +impl ForkChoiceStream { + /// Creates a new `ForkChoiceStream` + pub fn new(rx: watch::Receiver>) -> Self { + Self { st: WatchStream::new(rx) } + } +} + +impl Stream for ForkChoiceStream { + type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { @@ -183,38 +192,3 @@ impl Stream for ForkChoiceStream { } } } - -/// A stream for block state watch channels (pending, safe or finalized watchers) -#[derive(Debug)] -#[pin_project::pin_project] -pub struct BlockStateNotificationStream { - #[pin] - st: WatchStream>, -} - -impl BlockStateNotificationStream { - /// Creates a new `BlockStateNotificationStream` - pub fn new(rx: watch::Receiver>) -> Self { - Self { st: WatchStream::new(rx) } - } -} - -impl Stream for BlockStateNotificationStream { - type Item = T; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - loop { - return match ready!(self.as_mut().project().st.poll_next(cx)) { - Some(notification) => { - if notification.is_some() { - return Poll::Ready(notification); - } else { - // skip None values - continue; - } - } - None => Poll::Ready(None), - } - } - } -}