refactor: replace ForkChoiceStream type with generic stream type (#10458)

This commit is contained in:
Jennifer
2024-08-23 18:24:58 +01:00
committed by GitHub
parent 8ba7821a49
commit 0791b9eac9
2 changed files with 21 additions and 47 deletions

View File

@ -16,9 +16,9 @@ pub use chain_info::ChainInfoTracker;
mod notifications;
pub use notifications::{
BlockStateNotificationStream, CanonStateNotification, CanonStateNotificationSender,
CanonStateNotificationStream, CanonStateNotifications, CanonStateSubscriptions,
ForkChoiceNotifications, ForkChoiceStream, ForkChoiceSubscriptions,
CanonStateNotification, CanonStateNotificationSender, CanonStateNotificationStream,
CanonStateNotifications, CanonStateSubscriptions, ForkChoiceNotifications, ForkChoiceStream,
ForkChoiceSubscriptions,
};
mod memory_overlay;

View File

@ -152,26 +152,35 @@ pub trait ForkChoiceSubscriptions: Send + Sync {
fn subscribe_finalized_block(&self) -> ForkChoiceNotifications;
/// Convenience method to get a stream of the new safe blocks of the chain.
fn safe_block_stream(&self) -> ForkChoiceStream {
ForkChoiceStream { st: WatchStream::new(self.subscribe_safe_block().0) }
fn safe_block_stream(&self) -> ForkChoiceStream<SealedHeader> {
ForkChoiceStream::<SealedHeader> { st: WatchStream::new(self.subscribe_safe_block().0) }
}
/// Convenience method to get a stream of the new finalized blocks of the chain.
fn finalized_block_stream(&self) -> ForkChoiceStream {
ForkChoiceStream { st: WatchStream::new(self.subscribe_finalized_block().0) }
fn finalized_block_stream(&self) -> ForkChoiceStream<SealedHeader> {
ForkChoiceStream::<SealedHeader> {
st: WatchStream::new(self.subscribe_finalized_block().0),
}
}
}
/// A stream of the fork choices in the form of [`SealedHeader`].
/// A stream for fork choice watch channels (pending, safe or finalized watchers)
#[derive(Debug)]
#[pin_project::pin_project]
pub struct ForkChoiceStream {
pub struct ForkChoiceStream<T> {
#[pin]
st: WatchStream<Option<SealedHeader>>,
st: WatchStream<Option<T>>,
}
impl Stream for ForkChoiceStream {
type Item = SealedHeader;
impl<T: Clone + Sync + Send + 'static> ForkChoiceStream<T> {
/// Creates a new `ForkChoiceStream`
pub fn new(rx: watch::Receiver<Option<T>>) -> Self {
Self { st: WatchStream::new(rx) }
}
}
impl<T: Clone + Sync + Send + 'static> Stream for ForkChoiceStream<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
@ -183,38 +192,3 @@ impl Stream for ForkChoiceStream {
}
}
}
/// A stream for block state watch channels (pending, safe or finalized watchers)
#[derive(Debug)]
#[pin_project::pin_project]
pub struct BlockStateNotificationStream<T> {
#[pin]
st: WatchStream<Option<T>>,
}
impl<T: Clone + Sync + Send + 'static> BlockStateNotificationStream<T> {
/// Creates a new `BlockStateNotificationStream`
pub fn new(rx: watch::Receiver<Option<T>>) -> Self {
Self { st: WatchStream::new(rx) }
}
}
impl<T: Clone + Sync + Send + 'static> Stream for BlockStateNotificationStream<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
return match ready!(self.as_mut().project().st.poll_next(cx)) {
Some(notification) => {
if notification.is_some() {
return Poll::Ready(notification);
} else {
// skip None values
continue;
}
}
None => Poll::Ready(None),
}
}
}
}