refactor: roll custom canon state notification stream (#2486)

This commit is contained in:
Matthias Seitz
2023-05-01 16:43:54 +02:00
committed by GitHub
parent 7ccede7e2b
commit e1446ae49b
4 changed files with 38 additions and 6 deletions

1
Cargo.lock generated
View File

@ -5158,6 +5158,7 @@ dependencies = [
"auto_impl", "auto_impl",
"itertools", "itertools",
"parking_lot 0.12.1", "parking_lot 0.12.1",
"pin-project",
"reth-db", "reth-db",
"reth-interfaces", "reth-interfaces",
"reth-primitives", "reth-primitives",

View File

@ -205,8 +205,6 @@ impl Command {
ctx.task_executor.spawn_critical( ctx.task_executor.spawn_critical(
"txpool maintenance task", "txpool maintenance task",
Box::pin(async move { Box::pin(async move {
let chain_events = chain_events.filter_map(|event| async move { event.ok() });
pin_mut!(chain_events);
reth_transaction_pool::maintain::maintain_transaction_pool( reth_transaction_pool::maintain::maintain_transaction_pool(
client, client,
pool, pool,

View File

@ -26,6 +26,7 @@ tracing = "0.1"
thiserror = "1.0.37" thiserror = "1.0.37"
auto_impl = "1.0" auto_impl = "1.0"
itertools = "0.10" itertools = "0.10"
pin-project = "1.0"
# test-utils # test-utils
reth-rlp = { path = "../../rlp", optional = true } reth-rlp = { path = "../../rlp", optional = true }

View File

@ -1,9 +1,14 @@
//! Canonical chain state notification trait and types. //! Canonical chain state notification trait and types.
use crate::{chain::BlockReceipts, Chain}; use crate::{chain::BlockReceipts, Chain};
use auto_impl::auto_impl; use auto_impl::auto_impl;
use std::sync::Arc; use std::{
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream; use tokio_stream::{wrappers::BroadcastStream, Stream};
use tracing::debug;
/// Type alias for a receiver that receives [CanonStateNotification] /// Type alias for a receiver that receives [CanonStateNotification]
pub type CanonStateNotifications = broadcast::Receiver<CanonStateNotification>; pub type CanonStateNotifications = broadcast::Receiver<CanonStateNotification>;
@ -20,8 +25,35 @@ pub trait CanonStateSubscriptions: Send + Sync {
fn subscribe_to_canonical_state(&self) -> CanonStateNotifications; fn subscribe_to_canonical_state(&self) -> CanonStateNotifications;
/// Convenience method to get a stream of [`CanonStateNotification`]. /// Convenience method to get a stream of [`CanonStateNotification`].
fn canonical_state_stream(&self) -> BroadcastStream<CanonStateNotification> { fn canonical_state_stream(&self) -> CanonStateNotificationStream {
BroadcastStream::new(self.subscribe_to_canonical_state()) CanonStateNotificationStream {
st: BroadcastStream::new(self.subscribe_to_canonical_state()),
}
}
}
/// A Stream of [CanonStateNotification].
#[derive(Debug)]
#[pin_project::pin_project]
pub struct CanonStateNotificationStream {
#[pin]
st: BroadcastStream<CanonStateNotification>,
}
impl Stream for CanonStateNotificationStream {
type Item = CanonStateNotification;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
return match ready!(self.as_mut().project().st.poll_next(cx)) {
Some(Ok(notification)) => Poll::Ready(Some(notification)),
Some(Err(err)) => {
debug!(%err, "canonical state notification stream lagging behind");
continue
}
None => Poll::Ready(None),
}
}
} }
} }