Fix bug hashes buffered for busy peer gave false-positive on eth68 check (#6427)

This commit is contained in:
Emilia Hane
2024-02-05 23:39:45 +01:00
committed by GitHub
parent 5b847cba4f
commit 75132dabba
4 changed files with 243 additions and 95 deletions

View File

@ -4,6 +4,7 @@ use crate::{EthMessage, EthVersion};
use alloy_rlp::{ use alloy_rlp::{
Decodable, Encodable, RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper, Decodable, Encodable, RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper,
}; };
use derive_more::{Deref, DerefMut, IntoIterator};
use reth_codecs::derive_arbitrary; use reth_codecs::derive_arbitrary;
use reth_primitives::{Block, Bytes, TransactionSigned, TxHash, B256, U128}; use reth_primitives::{Block, Bytes, TransactionSigned, TxHash, B256, U128};
@ -447,8 +448,14 @@ pub trait HandleAnnouncement {
/// The announcement contains no entries. /// The announcement contains no entries.
fn is_empty(&self) -> bool; fn is_empty(&self) -> bool;
/// Returns the number of entries.
fn len(&self) -> usize;
/// Retain only entries for which the hash in the entry, satisfies a given predicate. /// Retain only entries for which the hash in the entry, satisfies a given predicate.
fn retain_by_hash(&mut self, f: impl FnMut(TxHash) -> bool); fn retain_by_hash(&mut self, f: impl FnMut(TxHash) -> bool);
/// Returns the announcement version, either [`EthVersion::Eth66`] or [`EthVersion::Eth68`].
fn msg_version(&self) -> EthVersion;
} }
impl HandleAnnouncement for NewPooledTransactionHashes { impl HandleAnnouncement for NewPooledTransactionHashes {
@ -456,12 +463,20 @@ impl HandleAnnouncement for NewPooledTransactionHashes {
self.is_empty() self.is_empty()
} }
fn len(&self) -> usize {
self.len()
}
fn retain_by_hash(&mut self, f: impl FnMut(TxHash) -> bool) { fn retain_by_hash(&mut self, f: impl FnMut(TxHash) -> bool) {
match self { match self {
NewPooledTransactionHashes::Eth66(msg) => msg.retain_by_hash(f), NewPooledTransactionHashes::Eth66(msg) => msg.retain_by_hash(f),
NewPooledTransactionHashes::Eth68(msg) => msg.retain_by_hash(f), NewPooledTransactionHashes::Eth68(msg) => msg.retain_by_hash(f),
} }
} }
fn msg_version(&self) -> EthVersion {
self.version()
}
} }
impl HandleAnnouncement for NewPooledTransactionHashes68 { impl HandleAnnouncement for NewPooledTransactionHashes68 {
@ -469,6 +484,10 @@ impl HandleAnnouncement for NewPooledTransactionHashes68 {
self.hashes.is_empty() self.hashes.is_empty()
} }
fn len(&self) -> usize {
self.hashes.len()
}
fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) { fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) {
let mut indices_to_remove = vec![]; let mut indices_to_remove = vec![];
for (i, &hash) in self.hashes.iter().enumerate() { for (i, &hash) in self.hashes.iter().enumerate() {
@ -483,6 +502,10 @@ impl HandleAnnouncement for NewPooledTransactionHashes68 {
self.sizes.remove(index); self.sizes.remove(index);
} }
} }
fn msg_version(&self) -> EthVersion {
EthVersion::Eth68
}
} }
impl HandleAnnouncement for NewPooledTransactionHashes66 { impl HandleAnnouncement for NewPooledTransactionHashes66 {
@ -490,6 +513,10 @@ impl HandleAnnouncement for NewPooledTransactionHashes66 {
self.0.is_empty() self.0.is_empty()
} }
fn len(&self) -> usize {
self.0.len()
}
fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) { fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) {
let mut indices_to_remove = vec![]; let mut indices_to_remove = vec![];
for (i, &hash) in self.0.iter().enumerate() { for (i, &hash) in self.0.iter().enumerate() {
@ -502,34 +529,139 @@ impl HandleAnnouncement for NewPooledTransactionHashes66 {
self.0.remove(index); self.0.remove(index);
} }
} }
fn msg_version(&self) -> EthVersion {
EthVersion::Eth66
}
} }
/// Announcement data that has been validated according to the configured network. For an eth68 /// Announcement data that has been validated according to the configured network. For an eth68
/// announcement, values of the map are `Some((u8, usize))` - the tx metadata. For an eth66 /// announcement, values of the map are `Some((u8, usize))` - the tx metadata. For an eth66
/// announcement, values of the map are `None`. /// announcement, values of the map are `None`.
pub type ValidAnnouncementData = HashMap<TxHash, Option<(u8, usize)>>; #[derive(Debug, IntoIterator)]
pub struct ValidAnnouncementData {
#[into_iterator]
data: HashMap<TxHash, Option<(u8, usize)>>,
version: EthVersion,
}
impl ValidAnnouncementData {
/// Returns a new [`ValidAnnouncementData`] wrapper around validated [`EthVersion::Eth68`]
/// announcement data.
pub fn new_eth68(data: HashMap<TxHash, Option<(u8, usize)>>) -> Self {
Self { data, version: EthVersion::Eth68 }
}
/// Returns a new [`ValidAnnouncementData`] wrapper around validated [`EthVersion::Eth68`]
/// announcement data.
pub fn new_eth66(data: HashMap<TxHash, Option<(u8, usize)>>) -> Self {
Self { data, version: EthVersion::Eth66 }
}
/// Returns a new [`ValidAnnouncementData`] with empty data for an [`EthVersion::Eth68`]
/// announcement.
pub fn empty_eth68() -> Self {
Self { data: HashMap::new(), version: EthVersion::Eth68 }
}
/// Returns a new [`ValidAnnouncementData`] with empty data for an [`EthVersion::Eth66`]
/// announcement.
pub fn empty_eth66() -> Self {
Self { data: HashMap::new(), version: EthVersion::Eth66 }
}
/// Destructs returning the validated data.
pub fn into_data(self) -> HashMap<TxHash, Option<(u8, usize)>> {
self.data
}
}
impl HandleAnnouncement for ValidAnnouncementData { impl HandleAnnouncement for ValidAnnouncementData {
fn is_empty(&self) -> bool { fn is_empty(&self) -> bool {
self.is_empty() self.data.is_empty()
}
fn len(&self) -> usize {
self.data.len()
} }
fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) { fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) {
self.retain(|&hash, _| f(hash)) self.data.retain(|&hash, _| f(hash))
}
fn msg_version(&self) -> EthVersion {
self.version
} }
} }
/// Hashes extracted from valid announcement data. For an eth68 announcement, this means the eth68 /// Hashes extracted from valid announcement data. For an eth68 announcement, this means the eth68
/// metadata should have been cached already. /// metadata should have been cached already.
pub type ValidTxHashes = Vec<TxHash>; #[derive(Debug, Deref, DerefMut, IntoIterator)]
pub struct ValidTxHashes {
#[deref]
#[deref_mut]
#[into_iterator]
hashes: Vec<TxHash>,
version: EthVersion,
}
impl ValidTxHashes {
/// Returns a new [`ValidTxHashes`] wrapper around validated hashes. Takes a list of validated
/// hashes as parameter along with the eth version.
pub fn new(hashes: Vec<TxHash>, version: EthVersion) -> Self {
Self { hashes, version }
}
/// Returns a new [`ValidTxHashes`] wrapper around validated hashes from valid
/// [`EthVersion::Eth68`] announcement data. Takes a list of validated hashes as parameter.
pub fn new_eth68(hashes: Vec<TxHash>) -> Self {
Self::new(hashes, EthVersion::Eth68)
}
/// Returns a new [`ValidTxHashes`] wrapper around validated hashes from valid
/// [`EthVersion::Eth66`] announcement data. Takes a list of validated hashes as parameter.
pub fn new_eth66(hashes: Vec<TxHash>) -> Self {
Self::new(hashes, EthVersion::Eth66)
}
/// Returns a new [`ValidTxHashes`] with empty hashes.
pub fn empty(version: EthVersion) -> Self {
Self { hashes: vec![], version }
}
/// Returns a new [`ValidTxHashes`] with empty hashes for an [`EthVersion::Eth68`]
/// announcement.
pub fn empty_eth68() -> Self {
Self::empty(EthVersion::Eth68)
}
/// Returns a new [`ValidTxHashes`] with empty hashes for an [`EthVersion::Eth66`]
/// announcement.
pub fn empty_eth66() -> Self {
Self::empty(EthVersion::Eth66)
}
/// Destructs returning the validated hashes.
pub fn into_hashes(self) -> Vec<TxHash> {
self.hashes
}
}
impl HandleAnnouncement for ValidTxHashes { impl HandleAnnouncement for ValidTxHashes {
fn is_empty(&self) -> bool { fn is_empty(&self) -> bool {
self.is_empty() self.hashes.is_empty()
}
fn len(&self) -> usize {
self.hashes.len()
} }
fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) { fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) {
self.retain(|&hash| f(hash)) self.hashes.retain(|&hash| f(hash))
}
fn msg_version(&self) -> EthVersion {
self.version
} }
} }

View File

@ -4,7 +4,7 @@ use crate::{
}; };
use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt}; use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
use pin_project::pin_project; use pin_project::pin_project;
use reth_eth_wire::{EthVersion, GetPooledTransactions, HandleAnnouncement}; use reth_eth_wire::{GetPooledTransactions, HandleAnnouncement, ValidTxHashes};
use reth_interfaces::p2p::error::{RequestError, RequestResult}; use reth_interfaces::p2p::error::{RequestError, RequestResult};
use reth_primitives::{PeerId, PooledTransactionsElement, TxHash}; use reth_primitives::{PeerId, PooledTransactionsElement, TxHash};
use schnellru::{ByLength, Unlimited}; use schnellru::{ByLength, Unlimited};
@ -138,10 +138,16 @@ impl TransactionFetcher {
} }
/// Packages hashes for [`GetPooledTxRequest`] up to limit. Returns left over hashes. /// Packages hashes for [`GetPooledTxRequest`] up to limit. Returns left over hashes.
pub(super) fn pack_hashes(&mut self, hashes: &mut Vec<TxHash>, peer_id: PeerId) -> Vec<TxHash> { pub(super) fn pack_hashes(
let Some(hash) = hashes.first() else { return vec![] }; &mut self,
hashes: &mut ValidTxHashes,
peer_id: PeerId,
) -> ValidTxHashes {
if hashes.is_empty() {
return ValidTxHashes::empty(hashes.msg_version())
};
if self.eth68_meta.get(hash).is_some() { if hashes.msg_version().is_eth68() {
return self.pack_hashes_eth68(hashes, peer_id) return self.pack_hashes_eth68(hashes, peer_id)
} }
self.pack_hashes_eth66(hashes) self.pack_hashes_eth66(hashes)
@ -151,11 +157,13 @@ impl TransactionFetcher {
/// If necessary, takes hashes from buffer for which peer is listed as fallback peer. /// If necessary, takes hashes from buffer for which peer is listed as fallback peer.
/// ///
/// Returns left over hashes. /// Returns left over hashes.
pub(super) fn pack_hashes_eth66(&mut self, hashes: &mut Vec<TxHash>) -> Vec<TxHash> { pub(super) fn pack_hashes_eth66(&mut self, hashes: &mut ValidTxHashes) -> ValidTxHashes {
if hashes.len() <= GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES { if hashes.len() <= GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES {
return vec![] return ValidTxHashes::empty_eth66()
} }
hashes.split_off(GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES - 1) let surplus_hashes = hashes.split_off(GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES - 1);
ValidTxHashes::new_eth66(surplus_hashes)
} }
/// Evaluates wether or not to include a hash in a `GetPooledTransactions` version eth68 /// Evaluates wether or not to include a hash in a `GetPooledTransactions` version eth68
@ -201,13 +209,14 @@ impl TransactionFetcher {
/// 4. Return surplus hashes. /// 4. Return surplus hashes.
pub(super) fn pack_hashes_eth68( pub(super) fn pack_hashes_eth68(
&mut self, &mut self,
hashes: &mut Vec<TxHash>, hashes: &mut ValidTxHashes,
peer_id: PeerId, peer_id: PeerId,
) -> Vec<TxHash> { ) -> ValidTxHashes {
if let Some(hash) = hashes.first() { if let Some(hash) = hashes.first() {
if let Some(size) = self.eth68_meta.get(hash) { if let Some(size) = self.eth68_meta.get(hash) {
if *size >= SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE { if *size >= SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE {
return hashes.split_off(1) let surplus_hashes = hashes.split_off(1);
return ValidTxHashes::new_eth68(surplus_hashes)
} }
} }
} }
@ -233,10 +242,10 @@ impl TransactionFetcher {
} }
}); });
surplus_hashes ValidTxHashes::new_eth68(surplus_hashes)
} }
pub(super) fn buffer_hashes_for_retry(&mut self, mut hashes: Vec<TxHash>) { pub(super) fn buffer_hashes_for_retry(&mut self, mut hashes: ValidTxHashes) {
// It could be that the txns have been received over broadcast in the time being. // It could be that the txns have been received over broadcast in the time being.
hashes.retain(|hash| self.unknown_hashes.get(hash).is_some()); hashes.retain(|hash| self.unknown_hashes.get(hash).is_some());
@ -246,10 +255,12 @@ impl TransactionFetcher {
/// Buffers hashes. Note: Only peers that haven't yet tried to request the hashes should be /// Buffers hashes. Note: Only peers that haven't yet tried to request the hashes should be
/// passed as `fallback_peer` parameter! Hashes that have been re-requested /// passed as `fallback_peer` parameter! Hashes that have been re-requested
/// [`MAX_REQUEST_RETRIES_PER_TX_HASH`], are dropped. /// [`MAX_REQUEST_RETRIES_PER_TX_HASH`], are dropped.
pub(super) fn buffer_hashes(&mut self, hashes: Vec<TxHash>, fallback_peer: Option<PeerId>) { pub(super) fn buffer_hashes(&mut self, hashes: ValidTxHashes, fallback_peer: Option<PeerId>) {
let mut max_retried_and_evicted_hashes = vec![]; let mut max_retried_and_evicted_hashes = vec![];
for hash in hashes { let msg_version = hashes.msg_version();
for hash in hashes.into_iter() {
// todo: enforce by adding new types UnknownTxHash66 and UnknownTxHash68 // todo: enforce by adding new types UnknownTxHash66 and UnknownTxHash68
debug_assert!( debug_assert!(
self.unknown_hashes.peek(&hash).is_some(), self.unknown_hashes.peek(&hash).is_some(),
@ -267,17 +278,10 @@ impl TransactionFetcher {
// peer in caller's context has requested hash and is hence not eligible as // peer in caller's context has requested hash and is hence not eligible as
// fallback peer. // fallback peer.
if *retries >= MAX_REQUEST_RETRIES_PER_TX_HASH { if *retries >= MAX_REQUEST_RETRIES_PER_TX_HASH {
let msg_version = || {
self.eth68_meta
.peek(&hash)
.map(|_| EthVersion::Eth68)
.unwrap_or(EthVersion::Eth66)
};
debug!(target: "net::tx", debug!(target: "net::tx",
hash=%hash, hash=%hash,
retries=retries, retries=retries,
msg_version=%msg_version(), msg_version=%msg_version,
"retry limit for `GetPooledTransactions` requests reached for hash, dropping hash" "retry limit for `GetPooledTransactions` requests reached for hash, dropping hash"
); );
@ -305,12 +309,16 @@ impl TransactionFetcher {
self.remove_from_unknown_hashes(hashes) self.remove_from_unknown_hashes(hashes)
} }
pub(super) fn filter_unseen_hashes<T: HandleAnnouncement>( pub(super) fn filter_unseen_and_pending_hashes<T: HandleAnnouncement>(
&mut self, &mut self,
new_announced_hashes: &mut T, new_announced_hashes: &mut T,
peer_id: PeerId, peer_id: PeerId,
is_session_active: impl Fn(PeerId) -> bool, is_session_active: impl Fn(PeerId) -> bool,
) { ) {
#[cfg(debug_assertions)]
let mut previously_unseen_hashes = Vec::with_capacity(new_announced_hashes.len() / 4);
let msg_version = new_announced_hashes.msg_version();
// filter out inflight hashes, and register the peer as fallback for all inflight hashes // filter out inflight hashes, and register the peer as fallback for all inflight hashes
new_announced_hashes.retain_by_hash(|hash| { new_announced_hashes.retain_by_hash(|hash| {
// occupied entry // occupied entry
@ -337,12 +345,13 @@ impl TransactionFetcher {
} }
// vacant entry // vacant entry
let msg_version = || self.eth68_meta.peek(&hash).map(|_| EthVersion::Eth68).unwrap_or(EthVersion::Eth66); #[cfg(debug_assertions)]
previously_unseen_hashes.push(hash);
trace!(target: "net::tx", trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"), peer_id=format!("{peer_id:#}"),
hash=%hash, hash=%hash,
msg_version=%msg_version(), msg_version=%msg_version,
"new hash seen in announcement by peer" "new hash seen in announcement by peer"
); );
@ -356,7 +365,7 @@ impl TransactionFetcher {
debug!(target: "net::tx", debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"), peer_id=format!("{peer_id:#}"),
hash=%hash, hash=%hash,
msg_version=%msg_version(), msg_version=%msg_version,
"failed to cache new announced hash from peer in schnellru::LruMap, dropping hash" "failed to cache new announced hash from peer in schnellru::LruMap, dropping hash"
); );
@ -364,6 +373,14 @@ impl TransactionFetcher {
} }
true true
}); });
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
previously_unseen_hashes_len=previously_unseen_hashes.len(),
previously_unseen_hashes=?previously_unseen_hashes,
msg_version=%msg_version,
"received previously unseen hashes in announcement from peer"
);
} }
/// Requests the missing transactions from the announced hashes of the peer. Returns the /// Requests the missing transactions from the announced hashes of the peer. Returns the
@ -375,28 +392,17 @@ impl TransactionFetcher {
/// flight. /// flight.
pub(super) fn request_transactions_from_peer( pub(super) fn request_transactions_from_peer(
&mut self, &mut self,
new_announced_hashes: Vec<TxHash>, new_announced_hashes: ValidTxHashes,
peer: &Peer, peer: &Peer,
metrics_increment_egress_peer_channel_full: impl FnOnce(), metrics_increment_egress_peer_channel_full: impl FnOnce(),
) -> Option<Vec<TxHash>> { ) -> Option<ValidTxHashes> {
let peer_id: PeerId = peer.request_tx.peer_id; let peer_id: PeerId = peer.request_tx.peer_id;
let msg_version = || {
new_announced_hashes
.first()
.map(|hash| {
self.eth68_meta
.peek(hash)
.map(|_| EthVersion::Eth68)
.unwrap_or(EthVersion::Eth66)
})
.expect("`new_announced_hashes` shouldn't be empty")
};
if self.active_peers.len() as u32 >= MAX_CONCURRENT_TX_REQUESTS { if self.active_peers.len() as u32 >= MAX_CONCURRENT_TX_REQUESTS {
debug!(target: "net::tx", debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"), peer_id=format!("{peer_id:#}"),
new_announced_hashes=?new_announced_hashes, new_announced_hashes=?*new_announced_hashes,
msg_version=%msg_version(), msg_version=%new_announced_hashes.msg_version(),
limit=MAX_CONCURRENT_TX_REQUESTS, limit=MAX_CONCURRENT_TX_REQUESTS,
"limit for concurrent `GetPooledTransactions` requests reached, dropping request for hashes to peer" "limit for concurrent `GetPooledTransactions` requests reached, dropping request for hashes to peer"
); );
@ -406,8 +412,8 @@ impl TransactionFetcher {
let Some(inflight_count) = self.active_peers.get_or_insert(peer_id, || 0) else { let Some(inflight_count) = self.active_peers.get_or_insert(peer_id, || 0) else {
debug!(target: "net::tx", debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"), peer_id=format!("{peer_id:#}"),
new_announced_hashes=?new_announced_hashes, new_announced_hashes=?*new_announced_hashes,
msg_version=%msg_version(), msg_version=%new_announced_hashes.msg_version(),
"failed to cache active peer in schnellru::LruMap, dropping request to peer" "failed to cache active peer in schnellru::LruMap, dropping request to peer"
); );
return Some(new_announced_hashes) return Some(new_announced_hashes)
@ -416,8 +422,8 @@ impl TransactionFetcher {
if *inflight_count >= MAX_CONCURRENT_TX_REQUESTS_PER_PEER { if *inflight_count >= MAX_CONCURRENT_TX_REQUESTS_PER_PEER {
debug!(target: "net::tx", debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"), peer_id=format!("{peer_id:#}"),
new_announced_hashes=?new_announced_hashes, new_announced_hashes=?*new_announced_hashes,
msg_version=%msg_version(), msg_version=%new_announced_hashes.msg_version(),
limit=MAX_CONCURRENT_TX_REQUESTS_PER_PEER, limit=MAX_CONCURRENT_TX_REQUESTS_PER_PEER,
"limit for concurrent `GetPooledTransactions` requests per peer reached" "limit for concurrent `GetPooledTransactions` requests per peer reached"
); );
@ -428,7 +434,7 @@ impl TransactionFetcher {
debug_assert!( debug_assert!(
|| -> bool { || -> bool {
for hash in &new_announced_hashes { for hash in new_announced_hashes.iter() {
if self.buffered_hashes.contains(hash) { if self.buffered_hashes.contains(hash) {
return false return false
} }
@ -456,7 +462,7 @@ impl TransactionFetcher {
let req = req.into_get_pooled_transactions().expect("is get pooled tx"); let req = req.into_get_pooled_transactions().expect("is get pooled tx");
metrics_increment_egress_peer_channel_full(); metrics_increment_egress_peer_channel_full();
return Some(req.0) return Some(ValidTxHashes::new(req.0, new_announced_hashes.msg_version()))
} }
} }
} else { } else {
@ -753,14 +759,14 @@ pub(super) enum FetchEvent {
pub(super) struct GetPooledTxRequest { pub(super) struct GetPooledTxRequest {
peer_id: PeerId, peer_id: PeerId,
/// Transaction hashes that were requested, for cleanup purposes /// Transaction hashes that were requested, for cleanup purposes
requested_hashes: Vec<TxHash>, requested_hashes: ValidTxHashes,
response: oneshot::Receiver<RequestResult<PooledTransactions>>, response: oneshot::Receiver<RequestResult<PooledTransactions>>,
} }
pub(super) struct GetPooledTxResponse { pub(super) struct GetPooledTxResponse {
peer_id: PeerId, peer_id: PeerId,
/// Transaction hashes that were requested, for cleanup purposes /// Transaction hashes that were requested, for cleanup purposes
requested_hashes: Vec<TxHash>, requested_hashes: ValidTxHashes,
result: Result<RequestResult<PooledTransactions>, RecvError>, result: Result<RequestResult<PooledTransactions>, RecvError>,
} }
@ -775,7 +781,7 @@ impl GetPooledTxRequestFut {
#[inline] #[inline]
fn new( fn new(
peer_id: PeerId, peer_id: PeerId,
requested_hashes: Vec<TxHash>, requested_hashes: ValidTxHashes,
response: oneshot::Receiver<RequestResult<PooledTransactions>>, response: oneshot::Receiver<RequestResult<PooledTransactions>>,
) -> Self { ) -> Self {
Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) } Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) }
@ -846,13 +852,16 @@ mod test {
tx_fetcher.eth68_meta.insert(eth68_hashes[i], eth68_hashes_sizes[i]); tx_fetcher.eth68_meta.insert(eth68_hashes[i], eth68_hashes_sizes[i]);
} }
let mut eth68_hashes_to_request = eth68_hashes.clone().to_vec(); let mut eth68_hashes_to_request = ValidTxHashes::new_eth68(eth68_hashes.clone().to_vec());
let surplus_eth68_hashes = let surplus_eth68_hashes =
tx_fetcher.pack_hashes_eth68(&mut eth68_hashes_to_request, peer_id); tx_fetcher.pack_hashes_eth68(&mut eth68_hashes_to_request, peer_id);
assert_eq!(surplus_eth68_hashes, vec!(eth68_hashes[1], eth68_hashes[3], eth68_hashes[5]));
assert_eq!( assert_eq!(
eth68_hashes_to_request, surplus_eth68_hashes.into_hashes(),
vec!(eth68_hashes[1], eth68_hashes[3], eth68_hashes[5])
);
assert_eq!(
eth68_hashes_to_request.into_hashes(),
vec!(eth68_hashes[0], eth68_hashes[2], eth68_hashes[4]) vec!(eth68_hashes[0], eth68_hashes[2], eth68_hashes[4])
); );
} }

View File

@ -36,8 +36,9 @@ use crate::{
}; };
use futures::{stream::FuturesUnordered, Future, StreamExt}; use futures::{stream::FuturesUnordered, Future, StreamExt};
use reth_eth_wire::{ use reth_eth_wire::{
EthVersion, GetPooledTransactions, NewPooledTransactionHashes, NewPooledTransactionHashes66, EthVersion, GetPooledTransactions, HandleAnnouncement, NewPooledTransactionHashes,
NewPooledTransactionHashes68, PooledTransactions, Transactions, NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions, Transactions,
ValidTxHashes,
}; };
use reth_interfaces::{ use reth_interfaces::{
p2p::error::{RequestError, RequestResult}, p2p::error::{RequestError, RequestResult},
@ -590,9 +591,6 @@ where
// 2. filter out invalid entries // 2. filter out invalid entries
// //
// first get the message version, because this will destruct the message
let msg_version = msg.version();
//
// validates messages with respect to the given network, e.g. allowed tx types // validates messages with respect to the given network, e.g. allowed tx types
// //
let mut hashes = match msg { let mut hashes = match msg {
@ -604,7 +602,7 @@ where
if let FilterOutcome::ReportPeer = outcome { if let FilterOutcome::ReportPeer = outcome {
self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement); self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
} }
valid_data.into_iter().map(|(hash, metadata)| { let hashes = valid_data.into_iter().map(|(hash, metadata)| {
// cache eth68 metadata // cache eth68 metadata
if let Some((_ty, size)) = metadata { if let Some((_ty, size)) = metadata {
// check if this peer is announcing a different size for an already seen // check if this peer is announcing a different size for an already seen
@ -624,7 +622,9 @@ where
self.transaction_fetcher.eth68_meta.insert(hash, size); self.transaction_fetcher.eth68_meta.insert(hash, size);
} }
hash hash
}).collect::<Vec<_>>() }).collect::<Vec<_>>();
ValidTxHashes::new_eth68(hashes)
} }
NewPooledTransactionHashes::Eth66(eth66_msg) => { NewPooledTransactionHashes::Eth66(eth66_msg) => {
// validate eth66 announcement data // validate eth66 announcement data
@ -635,7 +635,9 @@ where
self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement); self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
} }
valid_data.into_keys().collect::<Vec<_>>() let valid_hashes = valid_data.into_data().into_keys().collect::<Vec<_>>();
ValidTxHashes::new_eth66(valid_hashes)
} }
}; };
@ -646,9 +648,11 @@ where
// for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx // for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx
// fetcher, hence they should be valid at this point. // fetcher, hence they should be valid at this point.
// //
self.transaction_fetcher.filter_unseen_hashes(&mut hashes, peer_id, |peer_id| { self.transaction_fetcher.filter_unseen_and_pending_hashes(
self.peers.contains_key(&peer_id) &mut hashes,
}); peer_id,
|peer_id| self.peers.contains_key(&peer_id),
);
if hashes.is_empty() { if hashes.is_empty() {
// nothing to request // nothing to request
@ -657,9 +661,10 @@ where
debug!(target: "net::tx", debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"), peer_id=format!("{peer_id:#}"),
hashes=?hashes, hashes_len=hashes.len(),
msg_version=%msg_version, hashes=?*hashes,
"received previously unseen hashes in announcement from peer" msg_version=%hashes.msg_version(),
"received previously unseen and pending hashes in announcement from peer"
); );
// only send request for hashes to idle peer, otherwise buffer hashes storing peer as // only send request for hashes to idle peer, otherwise buffer hashes storing peer as
@ -667,8 +672,8 @@ where
if !self.transaction_fetcher.is_idle(peer_id) { if !self.transaction_fetcher.is_idle(peer_id) {
trace!(target: "net::tx", trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"), peer_id=format!("{peer_id:#}"),
hashes=?hashes, hashes=?*hashes,
msg_version=%msg_version, msg_version=%hashes.msg_version(),
"buffering hashes announced by busy peer" "buffering hashes announced by busy peer"
); );
@ -682,8 +687,8 @@ where
if !surplus_hashes.is_empty() { if !surplus_hashes.is_empty() {
trace!(target: "net::tx", trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"), peer_id=format!("{peer_id:#}"),
surplus_hashes=?surplus_hashes, surplus_hashes=?*surplus_hashes,
msg_version=%msg_version, msg_version=%surplus_hashes.msg_version(),
"some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes" "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
); );
@ -692,8 +697,8 @@ where
trace!(target: "net::tx", trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"), peer_id=format!("{peer_id:#}"),
hashes=?hashes, hashes=?*hashes,
msg_version=%msg_version, msg_version=%hashes.msg_version(),
"sending hashes in `GetPooledTransactions` request to peer's session" "sending hashes in `GetPooledTransactions` request to peer's session"
); );
@ -709,8 +714,8 @@ where
{ {
debug!(target: "net::tx", debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"), peer_id=format!("{peer_id:#}"),
failed_to_request_hashes=?failed_to_request_hashes, failed_to_request_hashes=?*failed_to_request_hashes,
msg_version=%msg_version, msg_version=%failed_to_request_hashes.msg_version(),
"sending `GetPooledTransactions` request to peer's session failed, buffering hashes" "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
); );
self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id)); self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
@ -755,26 +760,28 @@ where
self.transaction_fetcher.fill_eth66_request_for_peer(&mut hashes, peer_id); self.transaction_fetcher.fill_eth66_request_for_peer(&mut hashes, peer_id);
} }
let msg_version = || eth68_size.map(|_| EthVersion::Eth68).unwrap_or(EthVersion::Eth66); let msg_version = peer.version;
trace!(target: "net::tx", trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"), peer_id=format!("{peer_id:#}"),
hashes=?hashes, hashes=?hashes,
msg_version=%msg_version(), msg_version=%msg_version,
"requesting buffered hashes from idle peer" "requesting buffered hashes from idle peer"
); );
// request the buffered missing transactions // request the buffered missing transactions
let metrics = &self.metrics; let metrics = &self.metrics;
if let Some(failed_to_request_hashes) = if let Some(failed_to_request_hashes) =
self.transaction_fetcher.request_transactions_from_peer(hashes, peer, || { self.transaction_fetcher.request_transactions_from_peer(
metrics.egress_peer_channel_full.increment(1) ValidTxHashes::new(hashes, msg_version),
}) peer,
|| metrics.egress_peer_channel_full.increment(1),
)
{ {
debug!(target: "net::tx", debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"), peer_id=format!("{peer_id:#}"),
failed_to_request_hashes=?failed_to_request_hashes, failed_to_request_hashes=?failed_to_request_hashes,
msg_version=%msg_version(), msg_version=%msg_version,
"failed sending request to peer's session, buffering hashes" "failed sending request to peer's session, buffering hashes"
); );

View File

@ -237,7 +237,7 @@ impl FilterAnnouncement for EthAnnouncementFilter {
network=%Self, network=%Self,
"empty eth68 announcement" "empty eth68 announcement"
); );
return (FilterOutcome::ReportPeer, HashMap::new()) return (FilterOutcome::ReportPeer, ValidAnnouncementData::empty_eth68())
} }
let mut should_report_peer = false; let mut should_report_peer = false;
@ -284,7 +284,7 @@ impl FilterAnnouncement for EthAnnouncementFilter {
( (
if should_report_peer { FilterOutcome::ReportPeer } else { FilterOutcome::Ok }, if should_report_peer { FilterOutcome::ReportPeer } else { FilterOutcome::Ok },
deduped_data, ValidAnnouncementData::new_eth68(deduped_data),
) )
} }
@ -306,7 +306,7 @@ impl FilterAnnouncement for EthAnnouncementFilter {
network=%Self, network=%Self,
"empty eth66 announcement" "empty eth66 announcement"
); );
return (FilterOutcome::ReportPeer, HashMap::new()) return (FilterOutcome::ReportPeer, ValidAnnouncementData::empty_eth66())
} }
// 2. checks if announcement is spam packed with duplicate hashes // 2. checks if announcement is spam packed with duplicate hashes
@ -324,7 +324,7 @@ impl FilterAnnouncement for EthAnnouncementFilter {
} else { } else {
FilterOutcome::Ok FilterOutcome::Ok
}, },
deduped_data, ValidAnnouncementData::new_eth66(deduped_data),
) )
} }
} }
@ -380,7 +380,7 @@ mod test {
let mut expected_data = HashMap::new(); let mut expected_data = HashMap::new();
expected_data.insert(hashes[1], Some((types[1], sizes[1]))); expected_data.insert(hashes[1], Some((types[1], sizes[1])));
assert_eq!(expected_data, data,) assert_eq!(expected_data, data.into_data())
} }
#[test] #[test]
@ -416,7 +416,7 @@ mod test {
let mut expected_data = HashMap::new(); let mut expected_data = HashMap::new();
expected_data.insert(hashes[2], Some((types[2], sizes[2]))); expected_data.insert(hashes[2], Some((types[2], sizes[2])));
assert_eq!(expected_data, data,) assert_eq!(expected_data, data.into_data())
} }
#[test] #[test]
@ -456,7 +456,7 @@ mod test {
expected_data.insert(hashes[3], Some((types[3], sizes[3]))); expected_data.insert(hashes[3], Some((types[3], sizes[3])));
expected_data.insert(hashes[0], Some((types[0], sizes[0]))); expected_data.insert(hashes[0], Some((types[0], sizes[0])));
assert_eq!(expected_data, data,) assert_eq!(expected_data, data.into_data())
} }
#[test] #[test]
@ -500,7 +500,7 @@ mod test {
expected_data.insert(hashes[1], None); expected_data.insert(hashes[1], None);
expected_data.insert(hashes[0], None); expected_data.insert(hashes[0], None);
assert_eq!(expected_data, data) assert_eq!(expected_data, data.into_data())
} }
#[test] #[test]