Bug fix tx fetcher RUST_LOG=trace & add state dump to debug_assert error (#6146)

This commit is contained in:
Emilia Hane
2024-01-22 16:37:56 +01:00
committed by GitHub
parent 7ceec05e63
commit b713408331
5 changed files with 169 additions and 86 deletions

View File

@ -124,6 +124,14 @@ pub enum NewPooledTransactionHashes {
// === impl NewPooledTransactionHashes ===
impl NewPooledTransactionHashes {
/// Returns the message [`EthVersion`].
pub fn version(&self) -> EthVersion {
match self {
NewPooledTransactionHashes::Eth66(_) => EthVersion::Eth66,
NewPooledTransactionHashes::Eth68(_) => EthVersion::Eth68,
}
}
/// Returns `true` if the payload is valid for the given version
pub fn is_valid_for_version(&self, version: EthVersion) -> bool {
match self {

View File

@ -3,6 +3,8 @@
use std::str::FromStr;
use derive_more::Display;
/// Error thrown when failed to parse a valid [`EthVersion`].
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[error("Unknown eth protocol version: {0}")]
@ -10,7 +12,7 @@ pub struct ParseVersionError(String);
/// The `eth` protocol version.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Display)]
pub enum EthVersion {
/// The `eth` protocol version 66.
Eth66 = 66,

View File

@ -1,13 +1,9 @@
use core::hash::BuildHasher;
use derive_more::{Deref, DerefMut};
use itertools::Itertools;
use linked_hash_set::LinkedHashSet;
use schnellru::{self, ByLength, Limiter, RandomState, Unlimited};
use std::{
borrow::Borrow,
fmt::{self, Write},
hash::Hash,
num::NonZeroUsize,
};
use std::{borrow::Borrow, fmt, hash::Hash, num::NonZeroUsize};
/// A minimal LRU cache based on a `LinkedHashSet` with limited capacity.
///
@ -115,16 +111,22 @@ impl<K, V, L, S> fmt::Debug for LruMap<K, V, L, S>
where
K: Hash + PartialEq + fmt::Display,
V: fmt::Debug,
L: Limiter<K, V>,
L: Limiter<K, V> + fmt::Debug,
S: BuildHasher,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug_struct = f.debug_struct("LruMap");
for (k, v) in self.0.iter() {
let mut key_str = String::new();
write!(&mut key_str, "{k}")?;
debug_struct.field(&key_str, &v);
}
debug_struct.field("limiter", self.limiter());
debug_struct.field(
"inner",
&format_args!(
"Iter: {{{}}}",
self.0.iter().map(|(k, v)| format!(" {k}: {v:?}")).format(",")
),
);
debug_struct.finish()
}
}
@ -214,6 +216,6 @@ mod test {
let value_2 = Value(22);
cache.insert(key_2, value_2);
assert_eq!("LruMap { 2: Value(22), 1: Value(11) }", format!("{cache:?}"))
assert_eq!("LruMap { limiter: ByLength { max_length: 2 }, inner: Iter: { 2: Value(22), 1: Value(11)} }", format!("{cache:?}"))
}
}

View File

@ -3,9 +3,8 @@ use crate::{
message::PeerRequest,
};
use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
use itertools::Itertools;
use pin_project::pin_project;
use reth_eth_wire::GetPooledTransactions;
use reth_eth_wire::{EthVersion, GetPooledTransactions};
use reth_interfaces::p2p::error::{RequestError, RequestResult};
use reth_primitives::{PeerId, PooledTransactionsElement, TxHash};
use schnellru::{ByLength, Unlimited};
@ -83,17 +82,10 @@ impl TransactionFetcher {
}
/// Updates peer's activity status upon a resolved [`GetPooledTxRequest`].
fn update_peer_activity(&mut self, resp: &GetPooledTxResponse) {
let GetPooledTxResponse { peer_id, .. } = resp;
debug_assert!(
self.active_peers.get(peer_id).is_some(),
"broken invariant `active-peers` and `inflight-requests`"
);
fn decrement_inflight_request_count_for(&mut self, peer_id: PeerId) {
let remove = || -> bool {
if let Some(inflight_count) = self.active_peers.get(peer_id) {
if *inflight_count <= 1 {
if let Some(inflight_count) = self.active_peers.get(&peer_id) {
if *inflight_count <= MAX_CONCURRENT_TX_REQUESTS_PER_PEER {
return true
}
*inflight_count -= 1;
@ -102,7 +94,7 @@ impl TransactionFetcher {
}();
if remove {
self.active_peers.remove(peer_id);
self.active_peers.remove(&peer_id);
}
}
@ -170,13 +162,16 @@ impl TransactionFetcher {
///
/// Returns `true` if hash is included in request. If there is still space in the respective
/// response but not enough for the transaction of given hash, `false` is returned.
fn include_eth68_hash(&self, acc_size_response: &mut usize, eth68_hash: TxHash) -> bool {
fn include_eth68_hash(&self, acc_size_response: &mut usize, hash: TxHash) -> bool {
debug_assert!(
self.eth68_meta.peek(&eth68_hash).is_some(),
"broken invariant `eth68-hash` and `eth68-meta`"
self.eth68_meta.peek(&hash).is_some(),
"can't find eth68 metadata for `%hash` that should be of version eth68, broken invariant `@eth68_meta` and `@self`,
`%hash`: {},
`@self`: {:?}",
hash, self
);
if let Some(size) = self.eth68_meta.peek(&eth68_hash) {
if let Some(size) = self.eth68_meta.peek(&hash) {
let next_acc_size = *acc_size_response + size;
if next_acc_size <= MAX_FULL_TRANSACTIONS_PACKET_SIZE {
@ -212,11 +207,10 @@ impl TransactionFetcher {
hashes.retain(|&hash| match self.include_eth68_hash(&mut acc_size_response, hash) {
true => true,
false => {
trace!(
target: "net::tx",
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=format!("{hash:#}"),
size=self.eth68_meta.get(&hash).expect("should find size in `eth68-meta`"),
hash=%hash,
size=self.eth68_meta.peek(&hash).expect("should find size in `eth68-meta`"),
acc_size_response=acc_size_response,
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
"no space for hash in `GetPooledTransactions` request to peer"
@ -251,10 +245,13 @@ impl TransactionFetcher {
let mut max_retried_hashes = vec![];
for hash in hashes {
// todo: enforce by adding new type UnknownTxHash
// todo: enforce by adding new types UnknownTxHash66 and UnknownTxHash68
debug_assert!(
self.unknown_hashes.peek(&hash).is_some(),
"only hashes that are confirmed as unknown should be buffered"
"`%hash` in `@buffered_hashes` that's not in `@unknown_hashes`, `@buffered_hashes` should be a subset of keys in `@unknown_hashes`, broken invariant `@buffered_hashes` and `@unknown_hashes`,
`%hash`: {},
`@self`: {:?}",
hash, self
);
let Some((retries, peers)) = self.unknown_hashes.get(&hash) else { return };
@ -266,11 +263,20 @@ impl TransactionFetcher {
// peer in caller's context has requested hash and is hence not eligible as
// fallback peer.
if *retries >= MAX_REQUEST_RETRIES_PER_TX_HASH {
let msg_version = || -> EthVersion {
self.eth68_meta
.peek(&hash)
.map(|_| EthVersion::Eth68)
.unwrap_or(EthVersion::Eth66)
};
debug!(target: "net::tx",
hash=format!("{hash:#}"),
hash=%hash,
retries=retries,
msg_version=%msg_version(),
"retry limit for `GetPooledTransactions` requests reached for hash, dropping hash"
);
max_retried_hashes.push(hash);
continue;
}
@ -326,11 +332,14 @@ impl TransactionFetcher {
backups.insert(peer_id);
return false
}
let msg_version = || -> EthVersion { self.eth68_meta.peek(hash).map(|_| EthVersion::Eth68).unwrap_or(EthVersion::Eth66) };
// vacant entry
trace!(
target: "net::tx",
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=format!("{hash:#}"),
hash=%hash,
msg_version=%msg_version(),
"new hash seen in announcement by peer"
);
@ -343,7 +352,8 @@ impl TransactionFetcher {
debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=format!("{hash:#}"),
hash=%hash,
msg_version=%msg_version(),
"failed to cache new announced hash from peer in schnellru::LruMap, dropping hash"
);
@ -368,10 +378,23 @@ impl TransactionFetcher {
) -> Option<Vec<TxHash>> {
let peer_id: PeerId = peer.request_tx.peer_id;
let msg_version = || -> EthVersion {
new_announced_hashes
.first()
.map(|hash| {
self.eth68_meta
.peek(hash)
.map(|_| EthVersion::Eth68)
.unwrap_or(EthVersion::Eth66)
})
.expect("`new_announced_hashes` shouldn't be empty")
};
if self.active_peers.len() as u32 >= MAX_CONCURRENT_TX_REQUESTS {
debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=format!("[{:#}]", new_announced_hashes.iter().format(", ")),
new_announced_hashes=?new_announced_hashes,
msg_version=%msg_version(),
limit=MAX_CONCURRENT_TX_REQUESTS,
"limit for concurrent `GetPooledTransactions` requests reached, dropping request for hashes to peer"
);
@ -381,7 +404,8 @@ impl TransactionFetcher {
let Some(inflight_count) = self.active_peers.get_or_insert(peer_id, || 0) else {
debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=format!("[{:#}]", new_announced_hashes.iter().format(", ")),
new_announced_hashes=?new_announced_hashes,
msg_version=%msg_version(),
"failed to cache active peer in schnellru::LruMap, dropping request to peer"
);
return Some(new_announced_hashes)
@ -390,7 +414,8 @@ impl TransactionFetcher {
if *inflight_count >= MAX_CONCURRENT_TX_REQUESTS_PER_PEER {
debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=format!("[{:#}]", new_announced_hashes.iter().format(", ")),
new_announced_hashes=?new_announced_hashes,
msg_version=%msg_version(),
limit=MAX_CONCURRENT_TX_REQUESTS_PER_PEER,
"limit for concurrent `GetPooledTransactions` requests per peer reached"
);
@ -420,7 +445,6 @@ impl TransactionFetcher {
metrics_increment_egress_peer_channel_full();
return Some(new_announced_hashes)
} else {
// remove requested hashes from buffered hashes
debug_assert!(
|| -> bool {
for hash in &new_announced_hashes {
@ -430,7 +454,10 @@ impl TransactionFetcher {
}
true
}(),
"broken invariant `buffered-hashes` and `unknown-hashes`"
"`%new_announced_hashes` should been taken out of buffer before packing in a request, breaks invariant `@buffered_hashes` and `@inflight_requests`,
`%new_announced_hashes`: {:?},
`@self`: {:?}",
new_announced_hashes, self
);
// stores a new request future for the request
@ -452,45 +479,46 @@ impl TransactionFetcher {
&mut self,
hashes: &mut Vec<TxHash>,
peer_id: PeerId,
mut acc_eth68_size: Option<usize>,
mut acc_size_eth68_response: Option<usize>,
) {
debug_assert!(
acc_eth68_size.is_none() || {
acc_size_eth68_response.is_none() || {
let mut acc_size = 0;
for &hash in hashes.iter() {
_ = self.include_eth68_hash(&mut acc_size, hash);
}
Some(acc_size) == acc_eth68_size
Some(acc_size) == acc_size_eth68_response
},
"broken invariant `acc-eth68-size` and `hashes`"
"an eth68 request is being assembled and caller has miscalculated accumulated size of corresponding transactions response, broken invariant `%acc_size_eth68_response` and `%hashes`,
`%acc_size_eth68_response`: {:?},
`%hashes`: {:?},
`@self`: {:?}",
acc_size_eth68_response, hashes, self
);
for hash in self.buffered_hashes.iter() {
// if this request is for eth68 txns...
if let Some(acc_size_response) = acc_eth68_size.as_mut() {
if *acc_size_response >= MAX_FULL_TRANSACTIONS_PACKET_SIZE {
trace!(
target: "net::tx",
if let Some(acc_size_eth68_response) = acc_size_eth68_response.as_mut() {
if *acc_size_eth68_response >= MAX_FULL_TRANSACTIONS_PACKET_SIZE {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=format!("{hash:#}"),
size=self.eth68_meta.get(hash).expect("should find size in `eth68-meta`"),
acc_size_response=acc_size_response,
hash=%hash,
acc_size_eth68_response=acc_size_eth68_response,
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
"found buffered hash for peer but can't fit it into request"
"request to peer full"
);
break
}
// ...and this buffered hash is for an eth68 tx, check the size metadata
if self.eth68_meta.get(hash).is_some() &&
!self.include_eth68_hash(acc_size_response, *hash)
!self.include_eth68_hash(acc_size_eth68_response, *hash)
{
trace!(
target: "net::tx",
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=format!("{hash:#}"),
size=self.eth68_meta.get(hash).expect("should find size in `eth68-meta`"),
acc_size_response=acc_size_response,
hash=%hash,
size=self.eth68_meta.peek(hash).expect("should find size in `eth68-meta`"),
acc_size_eth68_response=acc_size_eth68_response,
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
"found buffered hash for peer but can't fit it into request"
);
@ -502,9 +530,21 @@ impl TransactionFetcher {
break
}
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%hash,
size=self.eth68_meta.peek(hash),
acc_size_eth68_response=acc_size_eth68_response,
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
"found buffered hash for request to peer"
);
debug_assert!(
self.unknown_hashes.peek(hash).is_some(),
"broken invariant `buffered-hashes` and `unknown-hashes`"
self.unknown_hashes.get(hash).is_some(),
"can't find buffered `%hash` in `@unknown_hashes`, `@buffered_hashes` should be a subset of keys in `@unknown_hashes`, broken invariant `@buffered_hashes` and `@unknown_hashes`,
`%hash`: {},
`@self`: {:?}",
hash, self
);
if let Some((_, fallback_peers)) = self.unknown_hashes.get(hash) {
@ -532,7 +572,17 @@ impl Stream for TransactionFetcher {
if let Poll::Ready(Some(response)) = res {
// update peer activity, requests for buffered hashes can only be made to idle
// fallback peers
self.update_peer_activity(&response);
let GetPooledTxResponse { peer_id, .. } = response;
debug_assert!(
self.active_peers.get(&peer_id).is_some(),
"`%peer_id` has been removed from `@active_peers` before inflight request(s) resolved, broken invariant `@active_peers` and `@inflight_requests`,
`%peer_id`: {},
`@self`: {:?}",
peer_id, self
);
self.decrement_inflight_request_count_for(peer_id);
let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response;

View File

@ -35,7 +35,6 @@ use crate::{
NetworkEvents, NetworkHandle,
};
use futures::{stream::FuturesUnordered, Future, StreamExt};
use itertools::Itertools;
use reth_eth_wire::{
EthVersion, GetPooledTransactions, NewPooledTransactionHashes, NewPooledTransactionHashes66,
NewPooledTransactionHashes68, PooledTransactions, Transactions,
@ -559,13 +558,14 @@ where
};
// message version decides how hashes are packed
// if this is a eth68 message, store eth68 tx metadata
if let Some(eth68_msg) = msg.as_eth68() {
for (&hash, (_type, size)) in eth68_msg.metadata_iter() {
self.transaction_fetcher.eth68_meta.insert(hash, size);
}
}
// extract hashes payload
let msg_version = msg.version();
// extract hashes payload, and sizes if version eth68
let sizes = msg.as_eth68().map(|eth68_msg| {
eth68_msg
.metadata_iter()
.map(|(&hash, (_type, size))| (hash, size))
.collect::<HashMap<_, _>>()
});
let mut hashes = msg.into_hashes();
// keep track of the transactions the peer knows
@ -593,16 +593,27 @@ where
debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=format!("[{:#}]", hashes.iter().format(", ")),
hashes=?hashes,
msg_version=?msg_version,
"received previously unseen hashes in announcement from peer"
);
if msg_version == EthVersion::Eth68 {
// cache size metadata of unseen hashes
for (hash, size) in sizes.expect("should be at least empty map") {
if hashes.contains(&hash) {
self.transaction_fetcher.eth68_meta.insert(hash, size);
}
}
}
// only send request for hashes to idle peer, otherwise buffer hashes storing peer as
// fallback
if !self.transaction_fetcher.is_idle(peer_id) {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=format!("[{:#}]", hashes.iter().format(", ")),
hashes=?hashes,
msg_version=?msg_version,
"buffering hashes announced by busy peer"
);
@ -616,7 +627,8 @@ where
if !surplus_hashes.is_empty() {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
surplus_hashes=format!("{surplus_hashes:#?}"),
surplus_hashes=?surplus_hashes,
msg_version=?msg_version,
"some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
);
@ -625,7 +637,8 @@ where
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=format!("[{:#}]", hashes.iter().format(", ")),
hashes=?hashes,
msg_version=?msg_version,
"sending hashes in `GetPooledTransactions` request to peer's session"
);
@ -641,7 +654,8 @@ where
{
debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=format!("[{:#}]", failed_to_request_hashes.iter().format(", ")),
failed_to_request_hashes=?failed_to_request_hashes,
msg_version=?msg_version,
"sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
);
self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
@ -668,7 +682,11 @@ where
debug_assert!(
self.peers.contains_key(&peer_id),
"broken invariant `peers` and `transaction-fetcher`"
"a dead peer has been returned as idle by `@pop_any_idle_peer`, broken invariant `@peers` and `@transaction_fetcher`,
`%peer_id`: {:?},
`@peers`: {:?},
`@transaction_fetcher`: {:?}",
peer_id, self.peers, self.transaction_fetcher
);
// fill the request with other buffered hashes that have been announced by the peer
@ -676,12 +694,14 @@ where
let Some(hash) = hashes.first() else { return };
let acc_eth68_size = self.transaction_fetcher.eth68_meta.get(hash).copied();
let msg_version =
acc_eth68_size.map(|_| EthVersion::Eth68).unwrap_or(EthVersion::Eth66);
self.transaction_fetcher.fill_request_for_peer(&mut hashes, peer_id, acc_eth68_size);
trace!(
target: "net::tx",
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=format!("[{:#}]", hashes.iter().format(", ")),
hashes=?hashes,
msg_version=?msg_version,
"requesting buffered hashes from idle peer"
);
@ -694,7 +714,8 @@ where
{
debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=format!("[{:#}]", failed_to_request_hashes.iter().format(", ")),
failed_to_request_hashes=?failed_to_request_hashes,
msg_version=?msg_version,
"failed sending request to peer's session, buffering hashes"
);