mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat: introduce networkprimitives in transition fetcher (#12889)
This commit is contained in:
@ -45,6 +45,7 @@ use reth_eth_wire::{
|
||||
DedupPayload, EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
|
||||
PartiallyValidData, RequestTxHashes, ValidAnnouncementData,
|
||||
};
|
||||
use reth_eth_wire_types::{EthNetworkPrimitives, NetworkPrimitives};
|
||||
use reth_network_api::PeerRequest;
|
||||
use reth_network_p2p::error::{RequestError, RequestResult};
|
||||
use reth_network_peers::PeerId;
|
||||
@ -68,7 +69,7 @@ use validation::FilterOutcome;
|
||||
/// new requests on announced hashes.
|
||||
#[derive(Debug)]
|
||||
#[pin_project]
|
||||
pub struct TransactionFetcher {
|
||||
pub struct TransactionFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
/// All peers with to which a [`GetPooledTransactions`] request is inflight.
|
||||
pub active_peers: LruMap<PeerId, u8, ByLength>,
|
||||
/// All currently active [`GetPooledTransactions`] requests.
|
||||
@ -77,7 +78,7 @@ pub struct TransactionFetcher {
|
||||
/// It's disjoint from the set of hashes which are awaiting an idle fallback peer in order to
|
||||
/// be fetched.
|
||||
#[pin]
|
||||
pub inflight_requests: FuturesUnordered<GetPooledTxRequestFut>,
|
||||
pub inflight_requests: FuturesUnordered<GetPooledTxRequestFut<N::PooledTransaction>>,
|
||||
/// Hashes that are awaiting an idle fallback peer so they can be fetched.
|
||||
///
|
||||
/// This is a subset of all hashes in the fetcher, and is disjoint from the set of hashes for
|
||||
@ -93,9 +94,7 @@ pub struct TransactionFetcher {
|
||||
metrics: TransactionFetcherMetrics,
|
||||
}
|
||||
|
||||
// === impl TransactionFetcher ===
|
||||
|
||||
impl TransactionFetcher {
|
||||
impl<N: NetworkPrimitives> TransactionFetcher<N> {
|
||||
/// Removes the peer from the active set.
|
||||
pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
|
||||
self.active_peers.remove(peer_id);
|
||||
@ -429,7 +428,7 @@ impl TransactionFetcher {
|
||||
/// the request by checking the transactions seen by the peer against the buffer.
|
||||
pub fn on_fetch_pending_hashes(
|
||||
&mut self,
|
||||
peers: &HashMap<PeerId, PeerMetadata>,
|
||||
peers: &HashMap<PeerId, PeerMetadata<N>>,
|
||||
has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
|
||||
) {
|
||||
let init_capacity_req = approx_capacity_get_pooled_transactions_req_eth68(&self.info);
|
||||
@ -632,7 +631,7 @@ impl TransactionFetcher {
|
||||
pub fn request_transactions_from_peer(
|
||||
&mut self,
|
||||
new_announced_hashes: RequestTxHashes,
|
||||
peer: &PeerMetadata,
|
||||
peer: &PeerMetadata<N>,
|
||||
) -> Option<RequestTxHashes> {
|
||||
let peer_id: PeerId = peer.request_tx.peer_id;
|
||||
let conn_eth_version = peer.version;
|
||||
@ -896,7 +895,9 @@ impl TransactionFetcher {
|
||||
approx_capacity_get_pooled_transactions_req_eth66()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TransactionFetcher {
|
||||
/// Processes a resolved [`GetPooledTransactions`] request. Queues the outcome as a
|
||||
/// [`FetchEvent`], which will then be streamed by
|
||||
/// [`TransactionsManager`](super::TransactionsManager).
|
||||
@ -1044,7 +1045,7 @@ impl Stream for TransactionFetcher {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for TransactionFetcher {
|
||||
impl<T: NetworkPrimitives> Default for TransactionFetcher<T> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
active_peers: LruMap::new(DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS),
|
||||
@ -1091,13 +1092,13 @@ impl TxFetchMetadata {
|
||||
|
||||
/// Represents possible events from fetching transactions.
|
||||
#[derive(Debug)]
|
||||
pub enum FetchEvent {
|
||||
pub enum FetchEvent<T = PooledTransactionsElement> {
|
||||
/// Triggered when transactions are successfully fetched.
|
||||
TransactionsFetched {
|
||||
/// The ID of the peer from which transactions were fetched.
|
||||
peer_id: PeerId,
|
||||
/// The transactions that were fetched, if available.
|
||||
transactions: PooledTransactions,
|
||||
transactions: PooledTransactions<T>,
|
||||
},
|
||||
/// Triggered when there is an error in fetching transactions.
|
||||
FetchError {
|
||||
@ -1115,22 +1116,22 @@ pub enum FetchEvent {
|
||||
|
||||
/// An inflight request for [`PooledTransactions`] from a peer.
|
||||
#[derive(Debug)]
|
||||
pub struct GetPooledTxRequest {
|
||||
pub struct GetPooledTxRequest<T = PooledTransactionsElement> {
|
||||
peer_id: PeerId,
|
||||
/// Transaction hashes that were requested, for cleanup purposes
|
||||
requested_hashes: RequestTxHashes,
|
||||
response: oneshot::Receiver<RequestResult<PooledTransactions>>,
|
||||
response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
|
||||
}
|
||||
|
||||
/// Upon reception of a response, a [`GetPooledTxRequest`] is deconstructed to form a
|
||||
/// [`GetPooledTxResponse`].
|
||||
#[derive(Debug)]
|
||||
pub struct GetPooledTxResponse {
|
||||
pub struct GetPooledTxResponse<T = PooledTransactionsElement> {
|
||||
peer_id: PeerId,
|
||||
/// Transaction hashes that were requested, for cleanup purposes, since peer may only return a
|
||||
/// subset of requested hashes.
|
||||
requested_hashes: RequestTxHashes,
|
||||
result: Result<RequestResult<PooledTransactions>, RecvError>,
|
||||
result: Result<RequestResult<PooledTransactions<T>>, RecvError>,
|
||||
}
|
||||
|
||||
/// Stores the response receiver made by sending a [`GetPooledTransactions`] request to a peer's
|
||||
@ -1138,24 +1139,24 @@ pub struct GetPooledTxResponse {
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
#[pin_project::pin_project]
|
||||
#[derive(Debug)]
|
||||
pub struct GetPooledTxRequestFut {
|
||||
pub struct GetPooledTxRequestFut<T = PooledTransactionsElement> {
|
||||
#[pin]
|
||||
inner: Option<GetPooledTxRequest>,
|
||||
inner: Option<GetPooledTxRequest<T>>,
|
||||
}
|
||||
|
||||
impl GetPooledTxRequestFut {
|
||||
impl<T> GetPooledTxRequestFut<T> {
|
||||
#[inline]
|
||||
const fn new(
|
||||
peer_id: PeerId,
|
||||
requested_hashes: RequestTxHashes,
|
||||
response: oneshot::Receiver<RequestResult<PooledTransactions>>,
|
||||
response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
|
||||
) -> Self {
|
||||
Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) }
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for GetPooledTxRequestFut {
|
||||
type Output = GetPooledTxResponse;
|
||||
impl<T> Future for GetPooledTxRequestFut<T> {
|
||||
type Output = GetPooledTxResponse<T>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let mut req = self.as_mut().project().inner.take().expect("polled after completion");
|
||||
@ -1372,7 +1373,7 @@ mod test {
|
||||
|
||||
// RIG TEST
|
||||
|
||||
let tx_fetcher = &mut TransactionFetcher::default();
|
||||
let tx_fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();
|
||||
|
||||
let eth68_hashes = [
|
||||
B256::from_slice(&[1; 32]),
|
||||
|
||||
@ -212,7 +212,7 @@ pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives
|
||||
/// From which we get all new incoming transaction related messages.
|
||||
network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
|
||||
/// Transaction fetcher to handle inflight and missing transaction requests.
|
||||
transaction_fetcher: TransactionFetcher,
|
||||
transaction_fetcher: TransactionFetcher<N>,
|
||||
/// All currently pending transactions grouped by peers.
|
||||
///
|
||||
/// This way we can track incoming transactions and prevent multiple pool imports for the same
|
||||
@ -235,7 +235,7 @@ pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives
|
||||
/// Bad imports.
|
||||
bad_imports: LruCache<TxHash>,
|
||||
/// All the connected peers.
|
||||
peers: HashMap<PeerId, PeerMetadata>,
|
||||
peers: HashMap<PeerId, PeerMetadata<N>>,
|
||||
/// Send half for the command channel.
|
||||
///
|
||||
/// This is kept so that a new [`TransactionsHandle`] can be created at any time.
|
||||
@ -1731,23 +1731,23 @@ impl TransactionSource {
|
||||
|
||||
/// Tracks a single peer in the context of [`TransactionsManager`].
|
||||
#[derive(Debug)]
|
||||
pub struct PeerMetadata {
|
||||
pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
/// Optimistically keeps track of transactions that we know the peer has seen. Optimistic, in
|
||||
/// the sense that transactions are preemptively marked as seen by peer when they are sent to
|
||||
/// the peer.
|
||||
seen_transactions: LruCache<TxHash>,
|
||||
/// A communication channel directly to the peer's session task.
|
||||
request_tx: PeerRequestSender,
|
||||
request_tx: PeerRequestSender<PeerRequest<N>>,
|
||||
/// negotiated version of the session.
|
||||
version: EthVersion,
|
||||
/// The peer's client version.
|
||||
client_version: Arc<str>,
|
||||
}
|
||||
|
||||
impl PeerMetadata {
|
||||
impl<N: NetworkPrimitives> PeerMetadata<N> {
|
||||
/// Returns a new instance of [`PeerMetadata`].
|
||||
fn new(
|
||||
request_tx: PeerRequestSender,
|
||||
request_tx: PeerRequestSender<PeerRequest<N>>,
|
||||
version: EthVersion,
|
||||
client_version: Arc<str>,
|
||||
max_transactions_seen_by_peer: u32,
|
||||
|
||||
Reference in New Issue
Block a user