feat: initialize txpool maintenance (#2429)

Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
This commit is contained in:
Matthias Seitz
2023-04-27 15:53:55 +02:00
committed by GitHub
parent 4e1ae0ee95
commit 9b5a84acc8
5 changed files with 35 additions and 6 deletions

1
Cargo.lock generated
View File

@ -5156,6 +5156,7 @@ dependencies = [
"reth-trie",
"thiserror",
"tokio",
"tokio-stream",
"tracing",
]

View File

@ -40,7 +40,7 @@ use reth_interfaces::{
use reth_network::{error::NetworkError, NetworkConfig, NetworkHandle, NetworkManager};
use reth_network_api::NetworkInfo;
use reth_primitives::{BlockHashOrNumber, Chain, ChainSpec, Head, Header, SealedHeader, H256};
use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase};
use reth_provider::{BlockProvider, CanonStateSubscriptions, HeaderProvider, ShareableDatabase};
use reth_revm::Factory;
use reth_revm_inspectors::stack::Hook;
use reth_rpc_engine_api::EngineApi;
@ -197,6 +197,27 @@ impl Command {
);
info!(target: "reth::cli", "Transaction pool initialized");
// spawn txpool maintenance task
{
let pool = transaction_pool.clone();
let chain_events = blockchain_db.canonical_state_stream();
let client = blockchain_db.clone();
ctx.task_executor.spawn_critical(
"txpool maintenance task",
Box::pin(async move {
let chain_events = chain_events.filter_map(|event| async move { event.ok() });
pin_mut!(chain_events);
reth_transaction_pool::maintain::maintain_transaction_pool(
client,
pool,
chain_events,
)
.await
}),
);
debug!(target: "reth::cli", "Spawned txpool maintenance task");
}
info!(target: "reth::cli", "Connecting to P2P network");
let secret_key =
get_secret_key(self.p2p_secret_key.unwrap_or_chain_default(self.chain.chain))?;

View File

@ -17,6 +17,7 @@ reth-trie = { path = "../../trie" }
# async
tokio = { version = "1.21", features = ["sync", "macros", "rt-multi-thread"] }
tokio-stream = "0.1"
# tracing
tracing = "0.1"

View File

@ -2,13 +2,14 @@
use crate::{chain::BlockReceipts, Chain};
use auto_impl::auto_impl;
use std::sync::Arc;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
/// Type alias for a receiver that receives [CanonStateNotification]
pub type CanonStateNotifications = Receiver<CanonStateNotification>;
pub type CanonStateNotifications = broadcast::Receiver<CanonStateNotification>;
/// Type alias for a sender that sends [CanonStateNotification]
pub type CanonStateNotificationSender = Sender<CanonStateNotification>;
pub type CanonStateNotificationSender = broadcast::Sender<CanonStateNotification>;
/// A type that allows to register chain related event subscriptions.
#[auto_impl(&, Arc)]
@ -17,6 +18,11 @@ pub trait CanonStateSubscriptions: Send + Sync {
///
/// A canonical chain be one or more blocks, a reorg or a revert.
fn subscribe_to_canonical_state(&self) -> CanonStateNotifications;
/// Convenience method to get a stream of [`CanonStateNotification`].
fn canonical_state_stream(&self) -> BroadcastStream<CanonStateNotification> {
BroadcastStream::new(self.subscribe_to_canonical_state())
}
}
/// Chain action that is triggered when a new block is imported or old block is reverted.

View File

@ -22,10 +22,10 @@ pub async fn maintain_transaction_pool<Client, V, T, St>(
pool: Pool<V, T>,
mut events: St,
) where
Client: StateProviderFactory + BlockProvider + 'static,
Client: StateProviderFactory + BlockProvider,
V: TransactionValidator,
T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
St: Stream<Item = CanonStateNotification> + Unpin + 'static,
St: Stream<Item = CanonStateNotification> + Unpin,
{
// TODO set current head for the pool