Verify tx response data against request (#6439)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Oliver Nordbjerg <onbjerg@users.noreply.github.com>
This commit is contained in:
Emilia Hane
2024-02-27 18:09:27 +01:00
committed by GitHub
parent a47c62c84b
commit 0007c9a4d2
14 changed files with 663 additions and 476 deletions

View File

@ -5,7 +5,7 @@ use alloy_rlp::{
Decodable, Encodable, RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper,
};
use derive_more::{Constructor, Deref, DerefMut, IntoIterator};
use derive_more::{Constructor, Deref, DerefMut, From, IntoIterator};
use reth_codecs::derive_arbitrary;
use reth_primitives::{
Block, Bytes, PooledTransactionsElement, TransactionSigned, TxHash, B256, U128,
@ -441,6 +441,95 @@ impl Decodable for NewPooledTransactionHashes68 {
}
}
/// Validation pass that checks for unique transaction hashes.
pub trait DedupPayload {
/// Value type in [`PartiallyValidData`] map.
type Value;
/// The payload contains no entries.
fn is_empty(&self) -> bool;
/// Returns the number of entries.
fn len(&self) -> usize;
/// Consumes self, returning an iterator over hashes in payload.
fn dedup(self) -> PartiallyValidData<Self::Value>;
}
/// Value in [`PartiallyValidData`] map obtained from an announcement.
pub type Eth68TxMetadata = Option<(u8, usize)>;
impl DedupPayload for NewPooledTransactionHashes {
type Value = Eth68TxMetadata;
fn is_empty(&self) -> bool {
self.is_empty()
}
fn len(&self) -> usize {
self.len()
}
fn dedup(self) -> PartiallyValidData<Self::Value> {
match self {
NewPooledTransactionHashes::Eth66(msg) => msg.dedup(),
NewPooledTransactionHashes::Eth68(msg) => msg.dedup(),
}
}
}
impl DedupPayload for NewPooledTransactionHashes68 {
type Value = Eth68TxMetadata;
fn is_empty(&self) -> bool {
self.hashes.is_empty()
}
fn len(&self) -> usize {
self.hashes.len()
}
fn dedup(self) -> PartiallyValidData<Self::Value> {
let Self { hashes, mut sizes, mut types } = self;
let mut deduped_data = HashMap::with_capacity(hashes.len());
for hash in hashes.into_iter().rev() {
if let (Some(ty), Some(size)) = (types.pop(), sizes.pop()) {
deduped_data.insert(hash, Some((ty, size)));
}
}
PartiallyValidData::from_raw_data_eth68(deduped_data)
}
}
impl DedupPayload for NewPooledTransactionHashes66 {
type Value = Eth68TxMetadata;
fn is_empty(&self) -> bool {
self.0.is_empty()
}
fn len(&self) -> usize {
self.0.len()
}
fn dedup(self) -> PartiallyValidData<Self::Value> {
let Self(hashes) = self;
let mut deduped_data = HashMap::with_capacity(hashes.len());
let noop_value: Eth68TxMetadata = None;
for hash in hashes.into_iter().rev() {
deduped_data.insert(hash, noop_value);
}
PartiallyValidData::from_raw_data_eth66(deduped_data)
}
}
/// Interface for handling mempool message data. Used in various filters in pipelines in
/// `TransactionsManager` and in queries to `TransactionPool`.
pub trait HandleMempoolData {
@ -450,9 +539,8 @@ pub trait HandleMempoolData {
/// Returns the number of entries.
fn len(&self) -> usize;
/// Retain only entries for which the hash in the entry satisfies a given predicate, return
/// the rest.
fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool) -> Self;
/// Retain only entries for which the hash in the entry satisfies a given predicate.
fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool);
}
/// Extension of [`HandleMempoolData`] interface, for mempool messages that are versioned.
@ -462,7 +550,7 @@ pub trait HandleVersionedMempoolData {
fn msg_version(&self) -> EthVersion;
}
impl HandleMempoolData for NewPooledTransactionHashes {
impl HandleMempoolData for Vec<PooledTransactionsElement> {
fn is_empty(&self) -> bool {
self.is_empty()
}
@ -471,163 +559,119 @@ impl HandleMempoolData for NewPooledTransactionHashes {
self.len()
}
fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool) -> Self {
match self {
NewPooledTransactionHashes::Eth66(msg) => Self::Eth66(msg.retain_by_hash(f)),
NewPooledTransactionHashes::Eth68(msg) => Self::Eth68(msg.retain_by_hash(f)),
}
fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
self.retain(|tx| f(tx.hash()))
}
}
impl HandleVersionedMempoolData for NewPooledTransactionHashes {
fn msg_version(&self) -> EthVersion {
self.version()
}
}
macro_rules! handle_mempool_data_map_impl {
($data_ty:ty, $(<$generic:ident>)?) => {
impl$(<$generic>)? HandleMempoolData for $data_ty {
fn is_empty(&self) -> bool {
self.data.is_empty()
}
impl HandleMempoolData for NewPooledTransactionHashes68 {
fn is_empty(&self) -> bool {
self.hashes.is_empty()
}
fn len(&self) -> usize {
self.data.len()
}
fn len(&self) -> usize {
self.hashes.len()
}
fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self {
let mut indices_to_remove = vec![];
for (i, hash) in self.hashes.iter().enumerate() {
if !f(hash) {
indices_to_remove.push(i);
fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
self.data.retain(|hash, _| f(hash));
}
}
};
}
let mut removed_hashes = Vec::with_capacity(indices_to_remove.len());
let mut removed_types = Vec::with_capacity(indices_to_remove.len());
let mut removed_sizes = Vec::with_capacity(indices_to_remove.len());
/// Data that has passed an initial validation pass that is not specific to any mempool message
/// type.
#[derive(Debug, Deref, DerefMut, IntoIterator)]
pub struct PartiallyValidData<V> {
#[deref]
#[deref_mut]
#[into_iterator]
data: HashMap<TxHash, V>,
version: Option<EthVersion>,
}
for index in indices_to_remove.into_iter().rev() {
let hash = self.hashes.remove(index);
removed_hashes.push(hash);
let ty = self.types.remove(index);
removed_types.push(ty);
let size = self.sizes.remove(index);
removed_sizes.push(size);
}
handle_mempool_data_map_impl!(PartiallyValidData<V>, <V>);
Self { hashes: removed_hashes, types: removed_types, sizes: removed_sizes }
impl<V> PartiallyValidData<V> {
/// Wraps raw data.
pub fn from_raw_data(data: HashMap<TxHash, V>, version: Option<EthVersion>) -> Self {
Self { data, version }
}
/// Wraps raw data with version [`EthVersion::Eth68`].
pub fn from_raw_data_eth68(data: HashMap<TxHash, V>) -> Self {
Self::from_raw_data(data, Some(EthVersion::Eth68))
}
/// Wraps raw data with version [`EthVersion::Eth66`].
pub fn from_raw_data_eth66(data: HashMap<TxHash, V>) -> Self {
Self::from_raw_data(data, Some(EthVersion::Eth66))
}
/// Returns a new [`PartiallyValidData`] with empty data from an [`Eth68`](EthVersion::Eth68)
/// announcement.
pub fn empty_eth68() -> Self {
Self::from_raw_data_eth68(HashMap::new())
}
/// Returns a new [`PartiallyValidData`] with empty data from an [`Eth66`](EthVersion::Eth66)
/// announcement.
pub fn empty_eth66() -> Self {
Self::from_raw_data_eth66(HashMap::new())
}
/// Returns the version of the message this data was received in if different versions of the
/// message exists, either [`Eth66`](EthVersion::Eth66) or [`Eth68`](EthVersion::Eth68).
pub fn msg_version(&self) -> Option<EthVersion> {
self.version
}
/// Destructs returning the validated data.
pub fn into_data(self) -> HashMap<TxHash, V> {
self.data
}
}
impl HandleVersionedMempoolData for NewPooledTransactionHashes68 {
fn msg_version(&self) -> EthVersion {
EthVersion::Eth68
}
}
impl HandleMempoolData for NewPooledTransactionHashes66 {
fn is_empty(&self) -> bool {
self.0.is_empty()
}
fn len(&self) -> usize {
self.0.len()
}
fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self {
let mut indices_to_remove = vec![];
for (i, hash) in self.0.iter().enumerate() {
if !f(hash) {
indices_to_remove.push(i);
}
}
let mut removed_hashes = Vec::with_capacity(indices_to_remove.len());
for index in indices_to_remove.into_iter().rev() {
let hash = self.0.remove(index);
removed_hashes.push(hash);
}
Self(removed_hashes)
}
}
impl HandleVersionedMempoolData for NewPooledTransactionHashes66 {
fn msg_version(&self) -> EthVersion {
EthVersion::Eth66
}
}
/// Announcement data that has been validated according to the configured network. For an eth68
/// announcement, values of the map are `Some((u8, usize))` - the tx metadata. For an eth66
/// announcement, values of the map are `None`.
#[derive(Debug, Deref, DerefMut, IntoIterator, Constructor)]
/// Partially validated data from an announcement or a
/// [`PooledTransactions`](crate::PooledTransactions) response.
#[derive(Debug, Deref, DerefMut, IntoIterator, From)]
#[from(PartiallyValidData<Eth68TxMetadata>)]
pub struct ValidAnnouncementData {
#[deref]
#[deref_mut]
#[into_iterator]
data: HashMap<TxHash, Option<(u8, usize)>>,
data: HashMap<TxHash, Eth68TxMetadata>,
version: EthVersion,
}
handle_mempool_data_map_impl!(ValidAnnouncementData,);
impl ValidAnnouncementData {
/// Returns a new [`ValidAnnouncementData`] wrapper around validated
/// [`Eth68`](EthVersion::Eth68) announcement data.
pub fn new_eth68(data: HashMap<TxHash, Option<(u8, usize)>>) -> Self {
Self::new(data, EthVersion::Eth68)
}
/// Returns a new [`ValidAnnouncementData`] wrapper around validated
/// [`Eth68`](EthVersion::Eth68) announcement data.
pub fn new_eth66(data: HashMap<TxHash, Option<(u8, usize)>>) -> Self {
Self::new(data, EthVersion::Eth66)
}
/// Returns a new [`ValidAnnouncementData`] with empty data from an [`Eth68`](EthVersion::Eth68)
/// announcement.
pub fn empty_eth68() -> Self {
Self::new_eth68(HashMap::new())
}
/// Returns a new [`ValidAnnouncementData`] with empty data from an [`Eth66`](EthVersion::Eth66)
/// announcement.
pub fn empty_eth66() -> Self {
Self::new_eth66(HashMap::new())
}
/// Destructs returning the validated data.
pub fn into_data(self) -> HashMap<TxHash, Option<(u8, usize)>> {
self.data
}
/// Destructs returning only the valid hashes and the announcement message version. Caution! If
/// this is [`Eth68`](EthVersion::Eth68)announcement data, the metadata must be cached
/// before call.
/// this is [`Eth68`](EthVersion::Eth68) announcement data, this drops the metadata.
pub fn into_request_hashes(self) -> (RequestTxHashes, EthVersion) {
let hashes = self.data.into_keys().collect::<Vec<_>>();
(RequestTxHashes::new(hashes), self.version)
}
}
impl HandleMempoolData for ValidAnnouncementData {
fn is_empty(&self) -> bool {
self.data.is_empty()
/// Conversion from [`PartiallyValidData`] from an announcement. Note! [`PartiallyValidData`]
/// from an announcement, should have some [`EthVersion`]. Panics if [`PartiallyValidData`] has
/// version set to `None`.
pub fn from_partially_valid_data(data: PartiallyValidData<Eth68TxMetadata>) -> Self {
let PartiallyValidData { data, version } = data;
let version = version.expect("should have eth version for conversion");
Self { data, version }
}
fn len(&self) -> usize {
self.data.len()
}
fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self {
let data = std::mem::take(&mut self.data);
let (keep, rest) = data.into_iter().partition(|(hash, _)| f(hash));
self.data = keep;
ValidAnnouncementData::new(rest, self.version)
/// Destructs returning the validated data.
pub fn into_data(self) -> HashMap<TxHash, Eth68TxMetadata> {
self.data
}
}
@ -656,8 +700,8 @@ impl RequestTxHashes {
}
}
impl FromIterator<(TxHash, Option<(u8, usize)>)> for RequestTxHashes {
fn from_iter<I: IntoIterator<Item = (TxHash, Option<(u8, usize)>)>>(iter: I) -> Self {
impl FromIterator<(TxHash, Eth68TxMetadata)> for RequestTxHashes {
fn from_iter<I: IntoIterator<Item = (TxHash, Eth68TxMetadata)>>(iter: I) -> Self {
let mut hashes = Vec::with_capacity(32);
for (hash, _) in iter {
@ -670,34 +714,6 @@ impl FromIterator<(TxHash, Option<(u8, usize)>)> for RequestTxHashes {
}
}
impl HandleMempoolData for Vec<PooledTransactionsElement> {
fn is_empty(&self) -> bool {
self.is_empty()
}
fn len(&self) -> usize {
self.len()
}
fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self {
let mut indices_to_remove = vec![];
for (i, tx) in self.iter().enumerate() {
if !f(tx.hash()) {
indices_to_remove.push(i);
}
}
let mut removed_txns = Vec::with_capacity(indices_to_remove.len());
for index in indices_to_remove.into_iter().rev() {
let hash = self.remove(index);
removed_txns.push(hash);
}
removed_txns
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -1,7 +1,7 @@
//! Implements the `GetPooledTransactions` and `PooledTransactions` message types.
use alloy_rlp::{RlpDecodableWrapper, RlpEncodableWrapper};
use derive_more::Deref;
use derive_more::{Constructor, Deref, IntoIterator};
use reth_codecs::derive_arbitrary;
use reth_primitives::{PooledTransactionsElement, TransactionSigned, B256};
@ -34,7 +34,18 @@ where
/// corresponds to a requested hash. Hashes may need to be re-requested if the bodies are not
/// included in the response.
// #[derive_arbitrary(rlp, 10)]
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default, Deref)]
#[derive(
Clone,
Debug,
PartialEq,
Eq,
RlpEncodableWrapper,
RlpDecodableWrapper,
Default,
IntoIterator,
Deref,
Constructor,
)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct PooledTransactions(
/// The transaction bodies, each of which should correspond to a requested hash.
@ -54,6 +65,12 @@ impl From<Vec<TransactionSigned>> for PooledTransactions {
}
}
impl FromIterator<PooledTransactionsElement> for PooledTransactions {
fn from_iter<I: IntoIterator<Item = PooledTransactionsElement>>(iter: I) -> Self {
PooledTransactions(iter.into_iter().collect())
}
}
#[cfg(test)]
mod tests {
use crate::{message::RequestPair, GetPooledTransactions, PooledTransactions};

View File

@ -68,6 +68,7 @@ derive_more.workspace = true
schnellru.workspace = true
itertools.workspace = true
tempfile = { workspace = true, optional = true }
smallvec.workspace = true
[dev-dependencies]
# reth

View File

@ -148,6 +148,6 @@ pub use session::{
PendingSessionHandle, PendingSessionHandshakeError, SessionCommand, SessionEvent, SessionId,
SessionLimits, SessionManager, SessionsConfig,
};
pub use transactions::{AnnouncementFilter, FilterAnnouncement, ValidateTx68};
pub use transactions::{FilterAnnouncement, MessageFilter, ValidateTx68};
pub use reth_eth_wire::{DisconnectReason, HelloMessageWithProtocols};

View File

@ -9,10 +9,10 @@ pub use reputation::ReputationChangeWeights;
pub use reth_network_api::PeerKind;
/// Maximum number of available slots for outbound sessions.
pub(crate) const DEFAULT_MAX_COUNT_PEERS_OUTBOUND: u32 = 100;
pub const DEFAULT_MAX_COUNT_PEERS_OUTBOUND: u32 = 100;
/// Maximum number of available slots for inbound sessions.
pub(crate) const DEFAULT_MAX_COUNT_PEERS_INBOUND: u32 = 30;
pub const DEFAULT_MAX_COUNT_PEERS_INBOUND: u32 = 30;
/// Maximum number of available slots concurrent outgoing dials.
pub(crate) const DEFAULT_MAX_COUNT_CONCURRENT_DIALS: usize = 10;
pub const DEFAULT_MAX_COUNT_CONCURRENT_DIALS: usize = 10;

View File

@ -33,6 +33,7 @@ pub const SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST: usize = 25
/// <https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages>.
pub const SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE: usize = 2 * 1024 * 1024;
/// Constants used by [`TransactionsManager`](super::TransactionsManager).
pub mod tx_manager {
use super::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
@ -54,6 +55,7 @@ pub mod tx_manager {
pub const DEFAULT_CAPACITY_CACHE_BAD_IMPORTS: usize = 100 * 1024;
}
/// Constants used by [`TransactionFetcher`](super::TransactionFetcher).
pub mod tx_fetcher {
use crate::{
peers::{DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND},
@ -100,7 +102,7 @@ pub mod tx_fetcher {
/// once from each individual peer.
///
/// Default is 1 peer.
const DEFAULT_MARGINAL_COUNT_FALLBACK_PEERS: u8 = 1;
pub const DEFAULT_MARGINAL_COUNT_FALLBACK_PEERS: u8 = 1;
/* ==================== CONCURRENCY ==================== */

View File

@ -1,31 +1,37 @@
use crate::{
cache::{LruCache, LruMap},
message::PeerRequest,
transactions::{validation, PartiallyFilterMessage},
};
use derive_more::Constructor;
use derive_more::{Constructor, Deref};
use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
use pin_project::pin_project;
use reth_eth_wire::{
EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
RequestTxHashes, ValidAnnouncementData,
DedupPayload, EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
PartiallyValidData, RequestTxHashes, ValidAnnouncementData,
};
use reth_interfaces::p2p::error::{RequestError, RequestResult};
use reth_metrics::common::mpsc::{
metered_unbounded_channel, UnboundedMeteredReceiver, UnboundedMeteredSender,
};
use reth_primitives::{PeerId, PooledTransactionsElement, TxHash};
use schnellru::{ByLength, Unlimited};
use smallvec::{smallvec, SmallVec};
use std::{
collections::HashMap,
num::NonZeroUsize,
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
use tracing::{debug, trace};
use validation::FilterOutcome;
use super::{
config::TransactionFetcherConfig,
constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST},
AnnouncementFilter, Peer, PooledTransactions,
MessageFilter, PeerMetadata, PooledTransactions,
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
};
@ -35,27 +41,33 @@ use super::{
/// new requests on announced hashes.
#[derive(Debug)]
#[pin_project]
pub(crate) struct TransactionFetcher {
pub struct TransactionFetcher {
/// All peers with to which a [`GetPooledTransactions`] request is inflight.
pub(super) active_peers: LruMap<PeerId, u8, ByLength>,
pub active_peers: LruMap<PeerId, u8, ByLength>,
/// All currently active [`GetPooledTransactions`] requests.
///
/// The set of hashes encompassed by these requests are a subset of all hashes in the fetcher.
/// It's disjoint from the set of hashes which are awaiting an idle fallback peer in order to
/// be fetched.
#[pin]
pub(super) inflight_requests: FuturesUnordered<GetPooledTxRequestFut>,
pub inflight_requests: FuturesUnordered<GetPooledTxRequestFut>,
/// Hashes that are awaiting an idle fallback peer so they can be fetched.
///
/// This is a subset of all hashes in the fetcher, and is disjoint from the set of hashes for
/// which a [`GetPooledTransactions`] request is inflight.
pub(super) hashes_pending_fetch: LruCache<TxHash>,
pub hashes_pending_fetch: LruCache<TxHash>,
/// Tracks all hashes in the transaction fetcher.
pub(super) hashes_fetch_inflight_and_pending_fetch: LruMap<TxHash, TxFetchMetadata, Unlimited>,
/// Filter for valid eth68 announcements.
pub(super) filter_valid_hashes: AnnouncementFilter,
/// Filter for valid announcement and response data.
pub(super) filter_valid_message: MessageFilter,
/// Info on capacity of the transaction fetcher.
pub(super) info: TransactionFetcherInfo,
pub info: TransactionFetcherInfo,
/// [`FetchEvent`]s as a result of advancing inflight requests. This is an intermediary ¨
/// storage, before [`TransactionsManager`](super::TransactionsManager) streams them.
#[pin]
pub fetch_events_head: UnboundedMeteredReceiver<FetchEvent>,
/// Handle for queueing [`FetchEvent`]s as a result of advancing inflight requests.
pub fetch_events_tail: UnboundedMeteredSender<FetchEvent>,
}
// === impl TransactionFetcher ===
@ -100,7 +112,7 @@ impl TransactionFetcher {
}
/// Returns `true` if peer is idle with respect to `self.inflight_requests`.
pub(super) fn is_idle(&self, peer_id: &PeerId) -> bool {
pub fn is_idle(&self, peer_id: &PeerId) -> bool {
let Some(inflight_count) = self.active_peers.peek(peer_id) else { return true };
if *inflight_count < DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER {
return true
@ -109,7 +121,7 @@ impl TransactionFetcher {
}
/// Returns any idle peer for the given hash.
pub(super) fn get_idle_peer_for(
pub fn get_idle_peer_for(
&self,
hash: TxHash,
is_session_active: impl Fn(&PeerId) -> bool,
@ -131,7 +143,7 @@ impl TransactionFetcher {
///
/// Loops through the hashes pending fetch in lru order until one is found with an idle
/// fallback peer, or the budget passed as parameter is depleted, whatever happens first.
pub(super) fn find_any_idle_fallback_peer_for_any_pending_hash(
pub fn find_any_idle_fallback_peer_for_any_pending_hash(
&mut self,
hashes_to_request: &mut RequestTxHashes,
is_session_active: impl Fn(&PeerId) -> bool,
@ -169,7 +181,7 @@ impl TransactionFetcher {
/// a [`RequestTxHashes`] buffer as parameter for filling with hashes to request.
///
/// Returns left over hashes.
pub(super) fn pack_request(
pub fn pack_request(
&mut self,
hashes_to_request: &mut RequestTxHashes,
hashes_from_announcement: ValidAnnouncementData,
@ -190,7 +202,7 @@ impl TransactionFetcher {
/// Loops through hashes passed as parameter and checks if a hash fits in the expected
/// response. If no, it's added to surplus hashes. If yes, it's added to hashes to the request
/// and expected response size is accumulated.
pub(super) fn pack_request_eth68(
pub fn pack_request_eth68(
&mut self,
hashes_to_request: &mut RequestTxHashes,
hashes_from_announcement: impl HandleMempoolData
@ -258,7 +270,7 @@ impl TransactionFetcher {
/// hashes to request.
///
/// Returns left over hashes.
pub(super) fn pack_request_eth66(
pub fn pack_request_eth66(
&mut self,
hashes_to_request: &mut RequestTxHashes,
hashes_from_announcement: ValidAnnouncementData,
@ -280,7 +292,7 @@ impl TransactionFetcher {
}
/// Tries to buffer hashes for retry.
pub(super) fn buffer_hashes_for_retry(
pub fn try_buffer_hashes_for_retry(
&mut self,
mut hashes: RequestTxHashes,
peer_failed_to_serve: &PeerId,
@ -301,9 +313,9 @@ impl TransactionFetcher {
/// Buffers hashes. Note: Only peers that haven't yet tried to request the hashes should be
/// passed as `fallback_peer` parameter! For re-buffering hashes on failed request, use
/// [`TransactionFetcher::buffer_hashes_for_retry`]. Hashes that have been re-requested
/// [`TransactionFetcher::try_buffer_hashes_for_retry`]. Hashes that have been re-requested
/// [`DEFAULT_MAX_RETRIES`], are dropped.
pub(super) fn buffer_hashes(&mut self, hashes: RequestTxHashes, fallback_peer: Option<PeerId>) {
pub fn buffer_hashes(&mut self, hashes: RequestTxHashes, fallback_peer: Option<PeerId>) {
let mut max_retried_and_evicted_hashes = vec![];
for hash in hashes.into_iter() {
@ -349,15 +361,14 @@ impl TransactionFetcher {
///
/// Finds the first buffered hash with a fallback peer that is idle, if any. Fills the rest of
/// the request by checking the transactions seen by the peer against the buffer.
pub(super) fn on_fetch_pending_hashes(
pub fn on_fetch_pending_hashes(
&mut self,
peers: &HashMap<PeerId, Peer>,
peers: &HashMap<PeerId, PeerMetadata>,
has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
metrics_increment_egress_peer_channel_full: impl FnOnce(),
) {
let init_capacity_req = approx_capacity_get_pooled_transactions_req_eth68(&self.info);
let mut hashes_to_request = RequestTxHashes::with_capacity(init_capacity_req);
let is_session_active = |peer_id: &PeerId| peers.contains_key(peer_id);
// budget to look for an idle peer before giving up
@ -420,7 +431,7 @@ impl TransactionFetcher {
/// Filters out hashes that have been seen before. For hashes that have already been seen, the
/// peer is added as fallback peer.
pub(super) fn filter_unseen_and_pending_hashes(
pub fn filter_unseen_and_pending_hashes(
&mut self,
new_announced_hashes: &mut ValidAnnouncementData,
is_tx_bad_import: impl Fn(&TxHash) -> bool,
@ -504,7 +515,7 @@ impl TransactionFetcher {
debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%hash,
msg_version=%msg_version,
msg_version=?msg_version,
client_version=%client_version,
"failed to cache new announced hash from peer in schnellru::LruMap, dropping hash"
);
@ -518,7 +529,7 @@ impl TransactionFetcher {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
previously_unseen_hashes_count=previously_unseen_hashes_count,
msg_version=%msg_version,
msg_version=?msg_version,
client_version=%client_version,
"received previously unseen hashes in announcement from peer"
);
@ -526,7 +537,7 @@ impl TransactionFetcher {
#[cfg(debug_assertions)]
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
msg_version=%msg_version,
msg_version=?msg_version,
client_version=%client_version,
previously_unseen_hashes_len=?previously_unseen_hashes.len(),
previously_unseen_hashes=?previously_unseen_hashes,
@ -541,10 +552,10 @@ impl TransactionFetcher {
/// This filters all announced hashes that are already in flight, and requests the missing,
/// while marking the given peer as an alternative peer for the hashes that are already in
/// flight.
pub(super) fn request_transactions_from_peer(
pub fn request_transactions_from_peer(
&mut self,
new_announced_hashes: RequestTxHashes,
peer: &Peer,
peer: &PeerMetadata,
metrics_increment_egress_peer_channel_full: impl FnOnce(),
) -> Option<RequestTxHashes> {
let peer_id: PeerId = peer.request_tx.peer_id;
@ -648,7 +659,7 @@ impl TransactionFetcher {
/// 3. Accumulate expected total response size.
/// 4. Check if acc size and hashes count is at limit, if so stop looping.
/// 5. Remove hashes to request from cache of hashes pending fetch.
pub(super) fn fill_request_from_hashes_pending_fetch(
pub fn fill_request_from_hashes_pending_fetch(
&mut self,
hashes_to_request: &mut RequestTxHashes,
seen_hashes: &LruCache<TxHash>,
@ -716,7 +727,7 @@ impl TransactionFetcher {
/// Returns `true` if [`TransactionFetcher`] has capacity to request pending hashes. Returns
/// `false` if [`TransactionFetcher`] is operating close to full capacity.
pub(super) fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
pub fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
let info = &self.info;
self.has_capacity(info.max_inflight_requests)
@ -732,7 +743,7 @@ impl TransactionFetcher {
/// Returns `Some(limit)` if [`TransactionFetcher`] and the
/// [`TransactionPool`](reth_transaction_pool::TransactionPool) are operating close to full
/// capacity. Returns `None`, unlimited, if they are not that busy.
pub(super) fn search_breadth_budget_find_idle_fallback_peer(
pub fn search_breadth_budget_find_idle_fallback_peer(
&self,
has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
) -> Option<usize> {
@ -771,7 +782,7 @@ impl TransactionFetcher {
/// Returns `Some(limit)` if [`TransactionFetcher`] and the
/// [`TransactionPool`](reth_transaction_pool::TransactionPool) are operating close to full
/// capacity. Returns `None`, unlimited, if they are not that busy.
pub(super) fn search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
pub fn search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
&self,
has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
) -> Option<usize> {
@ -816,22 +827,19 @@ impl TransactionFetcher {
approx_capacity_get_pooled_transactions_req_eth66()
}
}
}
impl Stream for TransactionFetcher {
type Item = FetchEvent;
/// Processes a resolved [`GetPooledTransactions`] request. Queues the outcome as a
/// [`FetchEvent`], which will then be streamed by
/// [`TransactionsManager`](super::TransactionsManager).
pub fn on_resolved_get_pooled_transactions_request_fut(
&mut self,
response: GetPooledTxResponse,
) -> FetchEvent {
// update peer activity, requests for buffered hashes can only be made to idle
// fallback peers
let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response;
/// Advances all inflight requests and returns the next event.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
let res = this.inflight_requests.poll_next_unpin(cx);
if let Poll::Ready(Some(response)) = res {
// update peer activity, requests for buffered hashes can only be made to idle
// fallback peers
let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response;
debug_assert!(
debug_assert!(
self.active_peers.get(&peer_id).is_some(),
"`%peer_id` has been removed from `@active_peers` before inflight request(s) resolved, broken invariant `@active_peers` and `@inflight_requests`,
`%peer_id`: {},
@ -839,45 +847,90 @@ impl Stream for TransactionFetcher {
peer_id, self
);
self.decrement_inflight_request_count_for(&peer_id);
self.decrement_inflight_request_count_for(&peer_id);
return match result {
Ok(Ok(transactions)) => {
// clear received hashes
let mut fetched = Vec::with_capacity(transactions.len());
requested_hashes.retain(|requested_hash| {
if transactions.hashes().any(|hash| hash == requested_hash) {
// hash is now known, stop tracking
fetched.push(*requested_hash);
return false
}
true
});
fetched.shrink_to_fit();
match result {
Ok(Ok(transactions)) => {
let payload = UnverifiedPooledTransactions::new(transactions);
self.remove_hashes_from_transaction_fetcher(fetched);
let unverified_len = payload.len();
let (verification_outcome, verified_payload) =
payload.verify(&requested_hashes, &peer_id);
// buffer left over hashes
self.buffer_hashes_for_retry(requested_hashes, &peer_id);
Poll::Ready(Some(FetchEvent::TransactionsFetched {
peer_id,
transactions: transactions.0,
}))
if let VerificationOutcome::ReportPeer = verification_outcome {
// todo: report peer for sending hashes that weren't requested
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
unverified_len=unverified_len,
verified_payload_len=verified_payload.len(),
"received `PooledTransactions` response from peer with entries that didn't verify against request, filtered out transactions"
);
}
Ok(Err(req_err)) => {
self.buffer_hashes_for_retry(requested_hashes, &peer_id);
Poll::Ready(Some(FetchEvent::FetchError { peer_id, error: req_err }))
}
Err(_) => {
self.buffer_hashes_for_retry(requested_hashes, &peer_id);
// request channel closed/dropped
Poll::Ready(Some(FetchEvent::FetchError {
peer_id,
error: RequestError::ChannelClosed,
}))
let unvalidated_payload_len = verified_payload.len();
// todo: report peer for sending invalid response
// <https://github.com/paradigmxyz/reth/issues/6529>
let (validation_outcome, valid_payload) =
self.filter_valid_message.partially_filter_valid_entries(verified_payload);
if let FilterOutcome::ReportPeer = validation_outcome {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
unvalidated_payload_len=unvalidated_payload_len,
valid_payload_len=valid_payload.len(),
"received invalid `PooledTransactions` response from peer, filtered out invalid entries"
);
}
// clear received hashes
let mut fetched = Vec::with_capacity(valid_payload.len());
requested_hashes.retain(|requested_hash| {
if valid_payload.contains_key(requested_hash) {
// hash is now known, stop tracking
fetched.push(*requested_hash);
return false
}
true
});
fetched.shrink_to_fit();
self.remove_hashes_from_transaction_fetcher(fetched);
// buffer left over hashes
self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
let transactions =
valid_payload.into_data().into_values().collect::<PooledTransactions>();
FetchEvent::TransactionsFetched { peer_id, transactions }
}
Ok(Err(req_err)) => {
self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
FetchEvent::FetchError { peer_id, error: req_err }
}
Err(_) => {
self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
// request channel closed/dropped
FetchEvent::FetchError { peer_id, error: RequestError::ChannelClosed }
}
}
}
}
impl Stream for TransactionFetcher {
type Item = FetchEvent;
/// Advances all inflight requests and returns the next event.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// `FuturesUnordered` doesn't close when `None` is returned. so just return pending.
// <https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=815be2b6c8003303757c3ced135f363e>
if self.inflight_requests.is_empty() {
return Poll::Pending
}
if let Some(resp) = ready!(self.inflight_requests.poll_next_unpin(cx)) {
return Poll::Ready(Some(self.on_resolved_get_pooled_transactions_request_fut(resp)))
}
Poll::Pending
@ -886,6 +939,8 @@ impl Stream for TransactionFetcher {
impl Default for TransactionFetcher {
fn default() -> Self {
let (fetch_events_tail, fetch_events_head) = metered_unbounded_channel("net::tx");
Self {
active_peers: LruMap::new(DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS),
inflight_requests: Default::default(),
@ -894,15 +949,17 @@ impl Default for TransactionFetcher {
.expect("buffered cache limit should be non-zero"),
),
hashes_fetch_inflight_and_pending_fetch: LruMap::new_unlimited(),
filter_valid_hashes: Default::default(),
filter_valid_message: Default::default(),
info: TransactionFetcherInfo::default(),
fetch_events_head,
fetch_events_tail,
}
}
}
/// Metadata of a transaction hash that is yet to be fetched.
#[derive(Debug, Constructor)]
pub(super) struct TxFetchMetadata {
pub struct TxFetchMetadata {
/// The number of times a request attempt has been made for the hash.
retries: u8,
/// Peers that have announced the hash, but to which a request attempt has not yet been made.
@ -915,10 +972,15 @@ pub(super) struct TxFetchMetadata {
}
impl TxFetchMetadata {
/// Returns a mutable reference to the fallback peers cache for this transaction hash.
pub fn fallback_peers_mut(&mut self) -> &mut LruCache<PeerId> {
&mut self.fallback_peers
}
/// Returns the size of the transaction, if its hash has been received in any
/// [`Eth68`](reth_eth_wire::EthVersion::Eth68) announcement. If the transaction hash has only
/// been seen in [`Eth66`](reth_eth_wire::EthVersion::Eth66) announcements so far, this will
/// return `None`.
pub fn tx_encoded_len(&self) -> Option<usize> {
self.tx_encoded_length
}
@ -926,13 +988,13 @@ impl TxFetchMetadata {
/// Represents possible events from fetching transactions.
#[derive(Debug)]
pub(crate) enum FetchEvent {
pub enum FetchEvent {
/// Triggered when transactions are successfully fetched.
TransactionsFetched {
/// The ID of the peer from which transactions were fetched.
peer_id: PeerId,
/// The transactions that were fetched, if available.
transactions: Vec<PooledTransactionsElement>,
transactions: PooledTransactions,
},
/// Triggered when there is an error in fetching transactions.
FetchError {
@ -943,15 +1005,19 @@ pub(crate) enum FetchEvent {
},
}
/// An inflight request for [`PooledTransactions`] from a peer
pub(super) struct GetPooledTxRequest {
/// An inflight request for [`PooledTransactions`] from a peer.
#[derive(Debug)]
pub struct GetPooledTxRequest {
peer_id: PeerId,
/// Transaction hashes that were requested, for cleanup purposes
requested_hashes: RequestTxHashes,
response: oneshot::Receiver<RequestResult<PooledTransactions>>,
}
pub(super) struct GetPooledTxResponse {
/// Upon reception of a response, a [`GetPooledTxRequest`] is deconstructed to form a
/// [`GetPooledTxResponse`].
#[derive(Debug)]
pub struct GetPooledTxResponse {
peer_id: PeerId,
/// Transaction hashes that were requested, for cleanup purposes, since peer may only return a
/// subset of requested hashes.
@ -959,9 +1025,12 @@ pub(super) struct GetPooledTxResponse {
result: Result<RequestResult<PooledTransactions>, RecvError>,
}
/// Stores the response receiver made by sending a [`GetPooledTransactions`] request to a peer's
/// session.
#[must_use = "futures do nothing unless polled"]
#[pin_project::pin_project]
pub(super) struct GetPooledTxRequestFut {
#[derive(Debug)]
pub struct GetPooledTxRequestFut {
#[pin]
inner: Option<GetPooledTxRequest>,
}
@ -996,11 +1065,100 @@ impl Future for GetPooledTxRequestFut {
}
}
/// Wrapper of unverified [`PooledTransactions`].
#[derive(Debug, Constructor, Deref)]
pub struct UnverifiedPooledTransactions {
txns: PooledTransactions,
}
/// [`PooledTransactions`] that have been successfully verified.
#[derive(Debug, Constructor)]
pub struct VerifiedPooledTransactions {
txns: PooledTransactions,
}
impl DedupPayload for VerifiedPooledTransactions {
type Value = PooledTransactionsElement;
fn is_empty(&self) -> bool {
self.txns.is_empty()
}
fn len(&self) -> usize {
self.txns.len()
}
fn dedup(self) -> PartiallyValidData<Self::Value> {
let Self { txns } = self;
let unique_fetched = txns
.into_iter()
.map(|tx| (*tx.hash(), tx))
.collect::<HashMap<TxHash, PooledTransactionsElement>>();
PartiallyValidData::from_raw_data(unique_fetched, None)
}
}
trait VerifyPooledTransactionsResponse {
fn verify(
self,
requested_hashes: &[TxHash],
peer_id: &PeerId,
) -> (VerificationOutcome, VerifiedPooledTransactions);
}
impl VerifyPooledTransactionsResponse for UnverifiedPooledTransactions {
fn verify(
self,
requested_hashes: &[TxHash],
peer_id: &PeerId,
) -> (VerificationOutcome, VerifiedPooledTransactions) {
let mut verification_outcome = VerificationOutcome::Ok;
let Self { mut txns } = self;
#[cfg(debug_assertions)]
let mut tx_hashes_not_requested: SmallVec<[TxHash; 16]> = smallvec!();
txns.0.retain(|tx| {
if !requested_hashes.contains(tx.hash()) {
verification_outcome = VerificationOutcome::ReportPeer;
#[cfg(debug_assertions)]
tx_hashes_not_requested.push(*tx.hash());
return false
}
true
});
#[cfg(debug_assertions)]
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
tx_hashes_not_requested=?tx_hashes_not_requested,
"transactions in `PooledTransactions` response from peer were not requested"
);
(verification_outcome, VerifiedPooledTransactions::new(txns))
}
}
/// Outcome from verifying a [`PooledTransactions`] response. Signals to caller whether to penalize
/// the sender of the response or not.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerificationOutcome {
/// Peer behaves appropriately.
Ok,
/// A penalty should be flagged for the peer. Peer sent a response with unacceptably
/// invalid entries.
ReportPeer,
}
/// Tracks stats about the [`TransactionFetcher`].
#[derive(Debug)]
pub struct TransactionFetcherInfo {
/// Currently active outgoing [`GetPooledTransactions`] requests.
pub(super) max_inflight_requests: usize,
pub max_inflight_requests: usize,
/// Soft limit for the byte size of the expected
/// [`PooledTransactions`] response on packing a
/// [`GetPooledTransactions`] request with hashes.
@ -1008,10 +1166,11 @@ pub struct TransactionFetcherInfo {
/// Soft limit for the byte size of a [`PooledTransactions`]
/// response on assembling a [`GetPooledTransactions`]
/// request. Spec'd at 2 MiB.
pub(super) soft_limit_byte_size_pooled_transactions_response: usize,
pub soft_limit_byte_size_pooled_transactions_response: usize,
}
impl TransactionFetcherInfo {
/// Creates a new max
pub fn new(
max_inflight_transaction_requests: usize,
soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
@ -1058,22 +1217,8 @@ mod test {
self.0.len()
}
fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self {
let mut indices_to_remove = vec![];
for (i, (hash, _)) in self.0.iter().enumerate() {
if !f(hash) {
indices_to_remove.push(i);
}
}
let mut removed_hashes = Vec::with_capacity(indices_to_remove.len());
for index in indices_to_remove.into_iter().rev() {
let entry = self.0.remove(index);
removed_hashes.push(entry);
}
TestValidAnnouncementData(removed_hashes)
fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
self.0.retain(|(hash, _)| f(hash))
}
}
@ -1111,6 +1256,7 @@ mod test {
let expected_surplus_hashes = [eth68_hashes[1], eth68_hashes[3], eth68_hashes[4]];
let mut eth68_hashes_to_request = RequestTxHashes::with_capacity(3);
let valid_announcement_data = TestValidAnnouncementData(
eth68_hashes
.into_iter()

View File

@ -69,10 +69,13 @@ use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use tracing::{debug, trace};
mod config;
mod constants;
mod fetcher;
mod validation;
/// Aggregation on configurable parameters for [`TransactionsManager`].
pub mod config;
/// Default and spec'd bounds.
pub mod constants;
/// Component responsible for fetching transactions from [`NewPooledTransactionHashes`].
pub mod fetcher;
pub mod validation;
pub use config::{TransactionFetcherConfig, TransactionsManagerConfig};
use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
@ -239,7 +242,7 @@ pub struct TransactionsManager<Pool> {
/// Bad imports.
bad_imports: LruCache<TxHash>,
/// All the connected peers.
peers: HashMap<PeerId, Peer>,
peers: HashMap<PeerId, PeerMetadata>,
/// Send half for the command channel.
///
/// This is kept so that a new [TransactionsHandle] can be created at any time.
@ -630,7 +633,7 @@ where
fn on_new_pooled_transaction_hashes(
&mut self,
peer_id: PeerId,
mut msg: NewPooledTransactionHashes,
msg: NewPooledTransactionHashes,
) {
// If the node is initially syncing, ignore transactions
if self.network.is_initially_syncing() {
@ -679,58 +682,66 @@ where
self.report_already_seen(peer_id);
}
// 1. filter out known hashes
// 1. filter out spam
let (validation_outcome, mut partially_valid_msg) =
self.transaction_fetcher.filter_valid_message.partially_filter_valid_entries(msg);
if let FilterOutcome::ReportPeer = validation_outcome {
self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
}
// 2. filter out known hashes
//
// known txns have already been successfully fetched.
// known txns have already been successfully fetched or received over gossip.
//
// most hashes will be filtered out here since this the mempool protocol is a gossip
// protocol, healthy peers will send many of the same hashes.
//
let already_known_by_pool = self.pool.retain_unknown(&mut msg);
if let Some(intersection) = already_known_by_pool {
self.metrics.occurrences_hashes_already_in_pool.increment(intersection.len() as u64);
let hashes_count_pre_pool_filter = partially_valid_msg.len();
self.pool.retain_unknown(&mut partially_valid_msg);
if hashes_count_pre_pool_filter > partially_valid_msg.len() {
let already_known_hashes_count =
hashes_count_pre_pool_filter - partially_valid_msg.len();
self.metrics
.occurrences_hashes_already_in_pool
.increment(already_known_hashes_count as u64);
}
if msg.is_empty() {
if partially_valid_msg.is_empty() {
// nothing to request
return
}
// 2. filter out invalid entries
// 3. filter out invalid entries (spam)
//
// validates messages with respect to the given network, e.g. allowed tx types
//
let mut valid_announcement_data = match msg {
NewPooledTransactionHashes::Eth68(eth68_msg) => {
// validate eth68 announcement data
let (outcome, valid_data) =
self.transaction_fetcher.filter_valid_hashes.filter_valid_entries_68(eth68_msg);
if let FilterOutcome::ReportPeer = outcome {
self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
}
valid_data
}
NewPooledTransactionHashes::Eth66(eth66_msg) => {
// validate eth66 announcement data
let (outcome, valid_data) =
self.transaction_fetcher.filter_valid_hashes.filter_valid_entries_66(eth66_msg);
if let FilterOutcome::ReportPeer = outcome {
self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
}
valid_data
}
let (validation_outcome, mut valid_announcement_data) = if partially_valid_msg
.msg_version()
.expect("partially valid announcement should have version")
.is_eth68()
{
// validate eth68 announcement data
self.transaction_fetcher
.filter_valid_message
.filter_valid_entries_68(partially_valid_msg)
} else {
// validate eth66 announcement data
self.transaction_fetcher
.filter_valid_message
.filter_valid_entries_66(partially_valid_msg)
};
if let FilterOutcome::ReportPeer = validation_outcome {
self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
}
if valid_announcement_data.is_empty() {
// no valid announcement data
return
}
// 3. filter out already seen unknown hashes
// 4. filter out already seen unknown hashes
//
// seen hashes are already in the tx fetcher, pending fetch.
//
@ -849,7 +860,7 @@ where
.into_iter()
.map(PooledTransactionsElement::try_from_broadcast)
.filter_map(Result::ok)
.collect::<Vec<_>>();
.collect::<PooledTransactions>();
// mark the transactions as received
self.transaction_fetcher.remove_hashes_from_transaction_fetcher(
@ -923,7 +934,7 @@ where
peer_id, client_version, messages, version, ..
} => {
// Insert a new peer into the peerset.
let peer = Peer::new(messages, version, client_version);
let peer = PeerMetadata::new(messages, version, client_version);
let peer = match self.peers.entry(peer_id) {
Entry::Occupied(mut entry) => {
entry.insert(peer);
@ -964,7 +975,7 @@ where
fn import_transactions(
&mut self,
peer_id: PeerId,
mut transactions: Vec<PooledTransactionsElement>,
transactions: PooledTransactions,
source: TransactionSource,
) {
// If the node is pipeline syncing, ignore transactions
@ -975,6 +986,8 @@ where
return
}
let mut transactions = transactions.0;
let Some(peer) = self.peers.get_mut(&peer_id) else { return };
// track that the peer knows these transaction, but only if this is a new broadcast.
@ -988,12 +1001,14 @@ where
}
}
// 1. filter out already imported txns
let already_known_by_pool = self.pool.retain_unknown(&mut transactions);
if let Some(intersection) = already_known_by_pool {
// 1. filter out txns already inserted into pool
let txns_count_pre_pool_filter = transactions.len();
self.pool.retain_unknown(&mut transactions);
if txns_count_pre_pool_filter > transactions.len() {
let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
self.metrics
.occurrences_transactions_already_in_pool
.increment(intersection.len() as u64);
.increment(already_known_txns_count as u64);
}
// tracks the quality of the given transactions
@ -1485,13 +1500,13 @@ impl TransactionSource {
}
}
/// Tracks a single peer
/// Tracks a single peer in the context of [`TransactionsManager`].
#[derive(Debug)]
struct Peer {
pub struct PeerMetadata {
/// Optimistically keeps track of transactions that we know the peer has seen. Optimistic, in
/// the sense that transactions are preemptively marked as seen by peer when they are sent to
/// the peer.
seen_transactions: LruCache<B256>,
seen_transactions: LruCache<TxHash>,
/// A communication channel directly to the peer's session task.
request_tx: PeerRequestSender,
/// negotiated version of the session.
@ -1500,7 +1515,8 @@ struct Peer {
client_version: Arc<str>,
}
impl Peer {
impl PeerMetadata {
/// Returns a new instance of [`PeerMetadata`].
fn new(request_tx: PeerRequestSender, version: EthVersion, client_version: Arc<str>) -> Self {
Self {
seen_transactions: LruCache::new(
@ -1570,14 +1586,15 @@ pub enum NetworkTransactionEvent {
/// Tracks stats about the [`TransactionsManager`].
#[derive(Debug)]
struct PendingPoolImportsInfo {
/// Number of transactions that are currently being imported into pool.
pub struct PendingPoolImportsInfo {
/// Number of transactions about to be inserted into the pool.
pending_pool_imports: Arc<AtomicUsize>,
/// Max number of transactions allowed to be imported concurrently.
max_pending_pool_imports: usize,
}
impl PendingPoolImportsInfo {
/// Returns a new [`PendingPoolImportsInfo`].
pub fn new(max_pending_pool_imports: usize) -> Self {
Self { pending_pool_imports: Arc::new(AtomicUsize::default()), max_pending_pool_imports }
}
@ -1642,11 +1659,15 @@ mod tests {
pub(super) fn new_mock_session(
peer_id: PeerId,
version: EthVersion,
) -> (Peer, mpsc::Receiver<PeerRequest>) {
) -> (PeerMetadata, mpsc::Receiver<PeerRequest>) {
let (to_mock_session_tx, to_mock_session_rx) = mpsc::channel(1);
(
Peer::new(PeerRequestSender::new(peer_id, to_mock_session_tx), version, Arc::from("")),
PeerMetadata::new(
PeerRequestSender::new(peer_id, to_mock_session_tx),
version,
Arc::from(""),
),
to_mock_session_rx,
)
}

View File

@ -1,12 +1,12 @@
//! Validation of [`NewPooledTransactionHashes66`] and [`NewPooledTransactionHashes68`]
//! Validation of [`NewPooledTransactionHashes66`](reth_eth_wire::NewPooledTransactionHashes66)
//! and [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68)
//! announcements. Validation and filtering of announcements is network dependent.
use std::{collections::HashMap, mem};
use std::{fmt, mem};
use derive_more::{Deref, DerefMut, Display};
use itertools::izip;
use reth_eth_wire::{
NewPooledTransactionHashes66, NewPooledTransactionHashes68, ValidAnnouncementData,
DedupPayload, Eth68TxMetadata, HandleMempoolData, PartiallyValidData, ValidAnnouncementData,
MAX_MESSAGE_SIZE,
};
use reth_primitives::{Signature, TxHash, TxType};
@ -15,12 +15,14 @@ use tracing::{debug, trace};
/// The size of a decoded signature in bytes.
pub const SIGNATURE_DECODED_SIZE_BYTES: usize = mem::size_of::<Signature>();
/// Interface for validating a `(ty, size, hash)` tuple from a [`NewPooledTransactionHashes68`].
/// Interface for validating a `(ty, size, hash)` tuple from a
/// [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68)..
pub trait ValidateTx68 {
/// Validates a [`NewPooledTransactionHashes68`] entry. Returns [`ValidationOutcome`] which
/// signals to the caller wether to fetch the transaction or wether to drop it, and wether the
/// sender of the announcement should be penalized.
fn should_fetch(&self, ty: u8, hash: TxHash, size: usize) -> ValidationOutcome;
/// Validates a [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68)
/// entry. Returns [`ValidationOutcome`] which signals to the caller wether to fetch the
/// transaction or wether to drop it, and wether the sender of the announcement should be
/// penalized.
fn should_fetch(&self, ty: u8, hash: &TxHash, size: usize) -> ValidationOutcome;
/// Returns the reasonable maximum encoded transaction length configured for this network, if
/// any. This property is not spec'ed out but can be inferred by looking how much data can be
@ -42,9 +44,9 @@ pub trait ValidateTx68 {
fn strict_min_encoded_tx_length(&self, ty: TxType) -> Option<usize>;
}
/// Outcomes from validating a `(ty, hash, size)` entry from a [`NewPooledTransactionHashes68`].
/// Signals to the caller how to deal with an announcement entry and the peer who sent the
/// announcement.
/// Outcomes from validating a `(ty, hash, size)` entry from a
/// [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68). Signals to the
/// caller how to deal with an announcement entry and the peer who sent the announcement.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ValidationOutcome {
/// Tells the caller to keep the entry in the announcement for fetch.
@ -56,30 +58,68 @@ pub enum ValidationOutcome {
ReportPeer,
}
/// Filters valid entries in [`NewPooledTransactionHashes68`] and [`NewPooledTransactionHashes66`]
/// in place, and flags misbehaving peers.
/// Generic filter for announcements and responses. Checks for empty message and unique hashes/
/// transactions in message.
pub trait PartiallyFilterMessage {
/// Removes duplicate entries from a mempool message. Returns [`FilterOutcome::ReportPeer`] if
/// the caller should penalize the peer, otherwise [`FilterOutcome::Ok`].
fn partially_filter_valid_entries<V>(
&self,
msg: impl DedupPayload<Value = V> + fmt::Debug,
) -> (FilterOutcome, PartiallyValidData<V>) {
// 1. checks if the announcement is empty
if msg.is_empty() {
debug!(target: "net::tx",
msg=?msg,
"empty payload"
);
return (FilterOutcome::ReportPeer, PartiallyValidData::empty_eth66())
}
// 2. checks if announcement is spam packed with duplicate hashes
let original_len = msg.len();
let partially_valid_data = msg.dedup();
(
if partially_valid_data.len() != original_len {
FilterOutcome::ReportPeer
} else {
FilterOutcome::Ok
},
partially_valid_data,
)
}
}
/// Filters valid entries in
/// [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68) and
/// [`NewPooledTransactionHashes66`](reth_eth_wire::NewPooledTransactionHashes66) in place, and
/// flags misbehaving peers.
pub trait FilterAnnouncement {
/// Removes invalid entries from a [`NewPooledTransactionHashes68`] announcement. Returns
/// [`FilterOutcome::ReportPeer`] if the caller should penalize the peer, otherwise
/// Removes invalid entries from a
/// [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68) announcement.
/// Returns [`FilterOutcome::ReportPeer`] if the caller should penalize the peer, otherwise
/// [`FilterOutcome::Ok`].
fn filter_valid_entries_68(
&self,
msg: NewPooledTransactionHashes68,
msg: PartiallyValidData<Eth68TxMetadata>,
) -> (FilterOutcome, ValidAnnouncementData)
where
Self: ValidateTx68;
/// Removes invalid entries from a [`NewPooledTransactionHashes66`] announcement. Returns
/// [`FilterOutcome::ReportPeer`] if the caller should penalize the peer, otherwise
/// Removes invalid entries from a
/// [`NewPooledTransactionHashes66`](reth_eth_wire::NewPooledTransactionHashes66) announcement.
/// Returns [`FilterOutcome::ReportPeer`] if the caller should penalize the peer, otherwise
/// [`FilterOutcome::Ok`].
fn filter_valid_entries_66(
&self,
msg: NewPooledTransactionHashes66,
msg: PartiallyValidData<Eth68TxMetadata>,
) -> (FilterOutcome, ValidAnnouncementData);
}
/// Outcome from filtering [`NewPooledTransactionHashes68`]. Signals to caller whether to penalize
/// the sender of the announcement or not.
/// Outcome from filtering
/// [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68). Signals to caller
/// whether to penalize the sender of the announcement or not.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterOutcome {
/// Peer behaves appropriately.
@ -90,18 +130,20 @@ pub enum FilterOutcome {
}
/// Wrapper for types that implement [`FilterAnnouncement`]. The definition of a valid
/// announcement is network dependent. For example, different networks support different [`TxType`]
/// s, and different [`TxType`]s have different transaction size constraints. Defaults to
/// [`EthAnnouncementFilter`].
/// announcement is network dependent. For example, different networks support different
/// [`TxType`]s, and different [`TxType`]s have different transaction size constraints. Defaults to
/// [`EthMessageFilter`].
#[derive(Debug, Default, Deref, DerefMut)]
pub struct AnnouncementFilter<N = EthAnnouncementFilter>(N);
pub struct MessageFilter<N = EthMessageFilter>(N);
/// Filter for announcements containing EIP [`TxType`]s.
#[derive(Debug, Display, Default)]
pub struct EthAnnouncementFilter;
pub struct EthMessageFilter;
impl ValidateTx68 for EthAnnouncementFilter {
fn should_fetch(&self, ty: u8, hash: TxHash, size: usize) -> ValidationOutcome {
impl PartiallyFilterMessage for EthMessageFilter {}
impl ValidateTx68 for EthMessageFilter {
fn should_fetch(&self, ty: u8, hash: &TxHash, size: usize) -> ValidationOutcome {
//
// 1. checks if tx type is valid value for this network
//
@ -204,140 +246,74 @@ impl ValidateTx68 for EthAnnouncementFilter {
}
}
impl FilterAnnouncement for EthAnnouncementFilter {
impl FilterAnnouncement for EthMessageFilter {
fn filter_valid_entries_68(
&self,
msg: NewPooledTransactionHashes68,
mut msg: PartiallyValidData<Eth68TxMetadata>,
) -> (FilterOutcome, ValidAnnouncementData)
where
Self: ValidateTx68,
{
trace!(target: "net::tx::validation",
types=?msg.types,
sizes=?msg.sizes,
hashes=?msg.hashes,
msg=?*msg,
network=%Self,
"validating eth68 announcement data.."
);
let NewPooledTransactionHashes68 { mut hashes, mut types, mut sizes } = msg;
debug_assert!(
hashes.len() == types.len() && hashes.len() == sizes.len(), "`%hashes`, `%types` and `%sizes` should all be the same length, decoding of `NewPooledTransactionHashes68` should handle this,
`%hashes`: {hashes:?},
`%types`: {types:?},
`%sizes: {sizes:?}`"
);
//
// 1. checks if the announcement is empty
//
if hashes.is_empty() {
debug!(target: "net::tx",
network=%Self,
"empty eth68 announcement"
);
return (FilterOutcome::ReportPeer, ValidAnnouncementData::empty_eth68())
}
let mut should_report_peer = false;
let mut indices_to_remove = vec![];
//
// 2. checks if eth68 announcement metadata is valid
// checks if eth68 announcement metadata is valid
//
// transactions that are filtered out here, may not be spam, rather from benevolent peers
// that are unknowingly sending announcements with invalid data.
//
for (i, (&ty, &hash, &size)) in izip!(&types, &hashes, &sizes).enumerate() {
match self.should_fetch(ty, hash, size) {
ValidationOutcome::Fetch => (),
ValidationOutcome::Ignore => indices_to_remove.push(i),
msg.retain(|hash, metadata| {
debug_assert!(
metadata.is_some(),
"metadata should exist for `%hash` in eth68 announcement passed to `%filter_valid_entries_68`,
`%hash`: {hash}"
);
let Some((ty, size)) = metadata else {
return false
};
match self.should_fetch(*ty, hash, *size) {
ValidationOutcome::Fetch => true,
ValidationOutcome::Ignore => false,
ValidationOutcome::ReportPeer => {
indices_to_remove.push(i);
should_report_peer = true;
false
}
}
}
for index in indices_to_remove.into_iter().rev() {
hashes.remove(index);
types.remove(index);
sizes.remove(index);
}
//
// 3. checks if announcement is spam packed with duplicate hashes
//
let original_len = hashes.len();
let mut deduped_data = HashMap::with_capacity(hashes.len());
for hash in hashes.into_iter().rev() {
if let (Some(ty), Some(size)) = (types.pop(), sizes.pop()) {
deduped_data.insert(hash, Some((ty, size)));
}
}
deduped_data.shrink_to_fit();
if deduped_data.len() != original_len {
should_report_peer = true
}
});
(
if should_report_peer { FilterOutcome::ReportPeer } else { FilterOutcome::Ok },
ValidAnnouncementData::new_eth68(deduped_data),
ValidAnnouncementData::from_partially_valid_data(msg),
)
}
fn filter_valid_entries_66(
&self,
msg: NewPooledTransactionHashes66,
partially_valid_data: PartiallyValidData<Option<(u8, usize)>>,
) -> (FilterOutcome, ValidAnnouncementData) {
trace!(target: "net::tx::validation",
hashes=?msg.0,
hashes=?*partially_valid_data,
network=%Self,
"validating eth66 announcement data.."
);
let NewPooledTransactionHashes66(hashes) = msg;
//
// 1. checks if the announcement is empty
//
if hashes.is_empty() {
debug!(target: "net::tx",
network=%Self,
"empty eth66 announcement"
);
return (FilterOutcome::ReportPeer, ValidAnnouncementData::empty_eth66())
}
//
// 2. checks if announcement is spam packed with duplicate hashes
//
let original_len = hashes.len();
let mut deduped_data = HashMap::with_capacity(hashes.len());
for hash in hashes.into_iter().rev() {
deduped_data.insert(hash, None);
}
deduped_data.shrink_to_fit();
(
if deduped_data.len() != original_len {
FilterOutcome::ReportPeer
} else {
FilterOutcome::Ok
},
ValidAnnouncementData::new_eth66(deduped_data),
)
(FilterOutcome::Ok, ValidAnnouncementData::from_partially_valid_data(partially_valid_data))
}
}
#[cfg(test)]
mod test {
use super::*;
use reth_eth_wire::{NewPooledTransactionHashes66, NewPooledTransactionHashes68};
use reth_primitives::B256;
use std::str::FromStr;
use std::{collections::HashMap, str::FromStr};
#[test]
fn eth68_empty_announcement() {
@ -347,9 +323,9 @@ mod test {
let announcement = NewPooledTransactionHashes68 { types, sizes, hashes };
let filter = EthAnnouncementFilter;
let filter = EthMessageFilter;
let (outcome, _data) = filter.filter_valid_entries_68(announcement);
let (outcome, _partially_valid_data) = filter.partially_filter_valid_entries(announcement);
assert_eq!(outcome, FilterOutcome::ReportPeer);
}
@ -374,16 +350,20 @@ mod test {
hashes: hashes.clone(),
};
let filter = EthAnnouncementFilter;
let filter = EthMessageFilter;
let (outcome, data) = filter.filter_valid_entries_68(announcement);
let (outcome, partially_valid_data) = filter.partially_filter_valid_entries(announcement);
assert_eq!(outcome, FilterOutcome::Ok);
let (outcome, valid_data) = filter.filter_valid_entries_68(partially_valid_data);
assert_eq!(outcome, FilterOutcome::ReportPeer);
let mut expected_data = HashMap::new();
expected_data.insert(hashes[1], Some((types[1], sizes[1])));
assert_eq!(expected_data, data.into_data())
assert_eq!(expected_data, valid_data.into_data())
}
#[test]
@ -410,16 +390,20 @@ mod test {
hashes: hashes.clone(),
};
let filter = EthAnnouncementFilter;
let filter = EthMessageFilter;
let (outcome, data) = filter.filter_valid_entries_68(announcement);
let (outcome, partially_valid_data) = filter.partially_filter_valid_entries(announcement);
assert_eq!(outcome, FilterOutcome::Ok);
let (outcome, valid_data) = filter.filter_valid_entries_68(partially_valid_data);
assert_eq!(outcome, FilterOutcome::Ok);
let mut expected_data = HashMap::new();
expected_data.insert(hashes[2], Some((types[2], sizes[2])));
assert_eq!(expected_data, data.into_data())
assert_eq!(expected_data, valid_data.into_data())
}
#[test]
@ -449,9 +433,9 @@ mod test {
hashes: hashes.clone(),
};
let filter = EthAnnouncementFilter;
let filter = EthMessageFilter;
let (outcome, data) = filter.filter_valid_entries_68(announcement);
let (outcome, partially_valid_data) = filter.partially_filter_valid_entries(announcement);
assert_eq!(outcome, FilterOutcome::ReportPeer);
@ -459,7 +443,7 @@ mod test {
expected_data.insert(hashes[3], Some((types[3], sizes[3])));
expected_data.insert(hashes[0], Some((types[0], sizes[0])));
assert_eq!(expected_data, data.into_data())
assert_eq!(expected_data, partially_valid_data.into_data())
}
#[test]
@ -468,9 +452,9 @@ mod test {
let announcement = NewPooledTransactionHashes66(hashes);
let filter: AnnouncementFilter = AnnouncementFilter::default();
let filter: MessageFilter = MessageFilter::default();
let (outcome, _data) = filter.filter_valid_entries_66(announcement);
let (outcome, _partially_valid_data) = filter.partially_filter_valid_entries(announcement);
assert_eq!(outcome, FilterOutcome::ReportPeer);
}
@ -493,9 +477,9 @@ mod test {
let announcement = NewPooledTransactionHashes66(hashes.clone());
let filter: AnnouncementFilter = AnnouncementFilter::default();
let filter: MessageFilter = MessageFilter::default();
let (outcome, data) = filter.filter_valid_entries_66(announcement);
let (outcome, partially_valid_data) = filter.partially_filter_valid_entries(announcement);
assert_eq!(outcome, FilterOutcome::ReportPeer);
@ -503,12 +487,12 @@ mod test {
expected_data.insert(hashes[1], None);
expected_data.insert(hashes[0], None);
assert_eq!(expected_data, data.into_data())
assert_eq!(expected_data, partially_valid_data.into_data())
}
#[test]
fn test_derive_more_display_for_zst() {
let filter = EthAnnouncementFilter;
assert_eq!("EthAnnouncementFilter", &filter.to_string());
let filter = EthMessageFilter;
assert_eq!("EthMessageFilter", &filter.to_string());
}
}