feat: make TransactionsManager Future impl generic over NetworkPrimitives (#13115)

This commit is contained in:
Dan Cline
2024-12-03 17:06:29 -05:00
committed by GitHub
parent e9484b2437
commit 601e8b9147
3 changed files with 33 additions and 35 deletions

View File

@ -1,7 +1,7 @@
//! Abstraction over primitive types in network messages. //! Abstraction over primitive types in network messages.
use alloy_rlp::{Decodable, Encodable}; use alloy_rlp::{Decodable, Encodable};
use reth_primitives_traits::{Block, BlockHeader}; use reth_primitives_traits::{Block, BlockHeader, SignedTransaction};
use std::fmt::Debug; use std::fmt::Debug;
/// Abstraction over primitive types which might appear in network messages. See /// Abstraction over primitive types which might appear in network messages. See
@ -62,17 +62,7 @@ pub trait NetworkPrimitives:
+ 'static; + 'static;
/// The transaction type which peers return in `PooledTransactions` messages. /// The transaction type which peers return in `PooledTransactions` messages.
type PooledTransaction: TryFrom<Self::BroadcastedTransaction> type PooledTransaction: SignedTransaction + TryFrom<Self::BroadcastedTransaction> + 'static;
+ Encodable
+ Decodable
+ Send
+ Sync
+ Unpin
+ Clone
+ Debug
+ PartialEq
+ Eq
+ 'static;
/// The transaction type which peers return in `GetReceipts` messages. /// The transaction type which peers return in `GetReceipts` messages.
type Receipt: Encodable type Receipt: Encodable

View File

@ -50,6 +50,7 @@ use reth_network_api::PeerRequest;
use reth_network_p2p::error::{RequestError, RequestResult}; use reth_network_p2p::error::{RequestError, RequestResult};
use reth_network_peers::PeerId; use reth_network_peers::PeerId;
use reth_primitives::PooledTransactionsElement; use reth_primitives::PooledTransactionsElement;
use reth_primitives_traits::SignedTransaction;
use schnellru::ByLength; use schnellru::ByLength;
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
use smallvec::{smallvec, SmallVec}; use smallvec::{smallvec, SmallVec};
@ -895,16 +896,14 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
approx_capacity_get_pooled_transactions_req_eth66() approx_capacity_get_pooled_transactions_req_eth66()
} }
} }
}
impl TransactionFetcher {
/// Processes a resolved [`GetPooledTransactions`] request. Queues the outcome as a /// Processes a resolved [`GetPooledTransactions`] request. Queues the outcome as a
/// [`FetchEvent`], which will then be streamed by /// [`FetchEvent`], which will then be streamed by
/// [`TransactionsManager`](super::TransactionsManager). /// [`TransactionsManager`](super::TransactionsManager).
pub fn on_resolved_get_pooled_transactions_request_fut( pub fn on_resolved_get_pooled_transactions_request_fut(
&mut self, &mut self,
response: GetPooledTxResponse, response: GetPooledTxResponse<N::PooledTransaction>,
) -> FetchEvent { ) -> FetchEvent<N::PooledTransaction> {
// update peer activity, requests for buffered hashes can only be made to idle // update peer activity, requests for buffered hashes can only be made to idle
// fallback peers // fallback peers
let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response; let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response;
@ -1026,8 +1025,8 @@ impl TransactionFetcher {
} }
} }
impl Stream for TransactionFetcher { impl<N: NetworkPrimitives> Stream for TransactionFetcher<N> {
type Item = FetchEvent; type Item = FetchEvent<N::PooledTransaction>;
/// Advances all inflight requests and returns the next event. /// Advances all inflight requests and returns the next event.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@ -1176,18 +1175,18 @@ impl<T> Future for GetPooledTxRequestFut<T> {
/// Wrapper of unverified [`PooledTransactions`]. /// Wrapper of unverified [`PooledTransactions`].
#[derive(Debug, Constructor, Deref)] #[derive(Debug, Constructor, Deref)]
pub struct UnverifiedPooledTransactions { pub struct UnverifiedPooledTransactions<T> {
txns: PooledTransactions, txns: PooledTransactions<T>,
} }
/// [`PooledTransactions`] that have been successfully verified. /// [`PooledTransactions`] that have been successfully verified.
#[derive(Debug, Constructor, Deref)] #[derive(Debug, Constructor, Deref)]
pub struct VerifiedPooledTransactions { pub struct VerifiedPooledTransactions<T> {
txns: PooledTransactions, txns: PooledTransactions<T>,
} }
impl DedupPayload for VerifiedPooledTransactions { impl<T: SignedTransaction> DedupPayload for VerifiedPooledTransactions<T> {
type Value = PooledTransactionsElement; type Value = T;
fn is_empty(&self) -> bool { fn is_empty(&self) -> bool {
self.txns.is_empty() self.txns.is_empty()
@ -1199,26 +1198,30 @@ impl DedupPayload for VerifiedPooledTransactions {
fn dedup(self) -> PartiallyValidData<Self::Value> { fn dedup(self) -> PartiallyValidData<Self::Value> {
PartiallyValidData::from_raw_data( PartiallyValidData::from_raw_data(
self.txns.into_iter().map(|tx| (*tx.hash(), tx)).collect(), self.txns.into_iter().map(|tx| (*tx.tx_hash(), tx)).collect(),
None, None,
) )
} }
} }
trait VerifyPooledTransactionsResponse { trait VerifyPooledTransactionsResponse {
type Transaction: SignedTransaction;
fn verify( fn verify(
self, self,
requested_hashes: &RequestTxHashes, requested_hashes: &RequestTxHashes,
peer_id: &PeerId, peer_id: &PeerId,
) -> (VerificationOutcome, VerifiedPooledTransactions); ) -> (VerificationOutcome, VerifiedPooledTransactions<Self::Transaction>);
} }
impl VerifyPooledTransactionsResponse for UnverifiedPooledTransactions { impl<T: SignedTransaction> VerifyPooledTransactionsResponse for UnverifiedPooledTransactions<T> {
type Transaction = T;
fn verify( fn verify(
self, self,
requested_hashes: &RequestTxHashes, requested_hashes: &RequestTxHashes,
_peer_id: &PeerId, _peer_id: &PeerId,
) -> (VerificationOutcome, VerifiedPooledTransactions) { ) -> (VerificationOutcome, VerifiedPooledTransactions<T>) {
let mut verification_outcome = VerificationOutcome::Ok; let mut verification_outcome = VerificationOutcome::Ok;
let Self { mut txns } = self; let Self { mut txns } = self;
@ -1229,11 +1232,11 @@ impl VerifyPooledTransactionsResponse for UnverifiedPooledTransactions {
let mut tx_hashes_not_requested_count = 0; let mut tx_hashes_not_requested_count = 0;
txns.0.retain(|tx| { txns.0.retain(|tx| {
if !requested_hashes.contains(tx.hash()) { if !requested_hashes.contains(tx.tx_hash()) {
verification_outcome = VerificationOutcome::ReportPeer; verification_outcome = VerificationOutcome::ReportPeer;
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
tx_hashes_not_requested.push(*tx.hash()); tx_hashes_not_requested.push(*tx.tx_hash());
#[cfg(not(debug_assertions))] #[cfg(not(debug_assertions))]
{ {
tx_hashes_not_requested_count += 1; tx_hashes_not_requested_count += 1;

View File

@ -49,8 +49,7 @@ use reth_network_p2p::{
use reth_network_peers::PeerId; use reth_network_peers::PeerId;
use reth_network_types::ReputationChangeKind; use reth_network_types::ReputationChangeKind;
use reth_primitives::{ use reth_primitives::{
transaction::SignedTransactionIntoRecoveredExt, PooledTransactionsElement, RecoveredTx, transaction::SignedTransactionIntoRecoveredExt, RecoveredTx, TransactionSigned,
TransactionSigned,
}; };
use reth_primitives_traits::{SignedTransaction, TxType}; use reth_primitives_traits::{SignedTransaction, TxType};
use reth_tokio_util::EventStream; use reth_tokio_util::EventStream;
@ -1307,11 +1306,17 @@ where
// //
// spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and // spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and
// `NetworkConfig::start_network`(reth_network::NetworkConfig) // `NetworkConfig::start_network`(reth_network::NetworkConfig)
impl<Pool> Future for TransactionsManager<Pool> impl<Pool, N> Future for TransactionsManager<Pool, N>
where where
Pool: TransactionPool + Unpin + 'static, Pool: TransactionPool + Unpin + 'static,
Pool::Transaction: N: NetworkPrimitives<
PoolTransaction<Consensus = TransactionSigned, Pooled: Into<PooledTransactionsElement>>, BroadcastedTransaction: SignedTransaction,
PooledTransaction: SignedTransaction,
>,
Pool::Transaction: PoolTransaction<
Consensus = N::BroadcastedTransaction,
Pooled: Into<N::PooledTransaction> + From<RecoveredTx<N::PooledTransaction>>,
>,
{ {
type Output = (); type Output = ();