mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat(txpool) modify txpool guard to be for pipeline syncs only (#4075)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
@ -49,6 +49,9 @@ pub trait NetworkInfo: Send + Sync {
|
||||
|
||||
/// Returns `true` if the network is undergoing sync.
|
||||
fn is_syncing(&self) -> bool;
|
||||
|
||||
/// Returns `true` when the node is undergoing the very first Pipeline sync.
|
||||
fn is_initially_syncing(&self) -> bool;
|
||||
}
|
||||
|
||||
/// Provides general purpose information about Peers in the network.
|
||||
|
||||
@ -45,6 +45,10 @@ impl NetworkInfo for NoopNetwork {
|
||||
fn is_syncing(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
fn is_initially_syncing(&self) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
impl PeersInfo for NoopNetwork {
|
||||
|
||||
@ -55,6 +55,7 @@ impl NetworkHandle {
|
||||
network_mode,
|
||||
bandwidth_meter,
|
||||
is_syncing: Arc::new(AtomicBool::new(false)),
|
||||
initial_sync_done: Arc::new(AtomicBool::new(false)),
|
||||
chain_id,
|
||||
};
|
||||
Self { inner: Arc::new(inner) }
|
||||
@ -247,18 +248,33 @@ impl NetworkInfo for NetworkHandle {
|
||||
fn is_syncing(&self) -> bool {
|
||||
SyncStateProvider::is_syncing(self)
|
||||
}
|
||||
|
||||
fn is_initially_syncing(&self) -> bool {
|
||||
SyncStateProvider::is_initially_syncing(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl SyncStateProvider for NetworkHandle {
|
||||
fn is_syncing(&self) -> bool {
|
||||
self.inner.is_syncing.load(Ordering::Relaxed)
|
||||
}
|
||||
// used to guard the txpool
|
||||
fn is_initially_syncing(&self) -> bool {
|
||||
if self.inner.initial_sync_done.load(Ordering::Relaxed) {
|
||||
return false
|
||||
}
|
||||
self.inner.is_syncing.load(Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
impl NetworkSyncUpdater for NetworkHandle {
|
||||
fn update_sync_state(&self, state: SyncState) {
|
||||
let is_syncing = state.is_syncing();
|
||||
self.inner.is_syncing.store(is_syncing, Ordering::Relaxed)
|
||||
let future_state = state.is_syncing();
|
||||
let prev_state = self.inner.is_syncing.swap(future_state, Ordering::Relaxed);
|
||||
let syncing_to_idle_state_transition = prev_state && !future_state;
|
||||
if syncing_to_idle_state_transition {
|
||||
self.inner.initial_sync_done.store(true, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the status of the node.
|
||||
@ -285,6 +301,8 @@ struct NetworkInner {
|
||||
bandwidth_meter: BandwidthMeter,
|
||||
/// Represents if the network is currently syncing.
|
||||
is_syncing: Arc<AtomicBool>,
|
||||
/// Used to differentiate between an initial pipeline sync or a live sync
|
||||
initial_sync_done: Arc<AtomicBool>,
|
||||
/// The chain id
|
||||
chain_id: Arc<AtomicU64>,
|
||||
}
|
||||
|
||||
@ -7,7 +7,7 @@ use crate::{
|
||||
metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
|
||||
NetworkHandle,
|
||||
};
|
||||
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
|
||||
use futures::{stream::FuturesUnordered, Future, FutureExt, StreamExt};
|
||||
use reth_eth_wire::{
|
||||
EthVersion, GetPooledTransactions, NewPooledTransactionHashes, NewPooledTransactionHashes66,
|
||||
NewPooledTransactionHashes68, PooledTransactions, Transactions,
|
||||
@ -29,7 +29,6 @@ use reth_transaction_pool::{
|
||||
};
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
future::Future,
|
||||
num::NonZeroUsize,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
@ -206,8 +205,8 @@ where
|
||||
/// transactions to a fraction of peers usually ensures that all nodes receive the transaction
|
||||
/// and won't need to request it.
|
||||
fn on_new_transactions(&mut self, hashes: impl IntoIterator<Item = TxHash>) {
|
||||
// Nothing to propagate while syncing
|
||||
if self.network.is_syncing() {
|
||||
// Nothing to propagate while initially syncing
|
||||
if self.network.is_initially_syncing() {
|
||||
return
|
||||
}
|
||||
|
||||
@ -310,8 +309,8 @@ where
|
||||
peer_id: PeerId,
|
||||
msg: NewPooledTransactionHashes,
|
||||
) {
|
||||
// If the node is currently syncing, ignore transactions
|
||||
if self.network.is_syncing() {
|
||||
// If the node is initially syncing, ignore transactions
|
||||
if self.network.is_initially_syncing() {
|
||||
return
|
||||
}
|
||||
|
||||
@ -405,7 +404,7 @@ where
|
||||
// Send a `NewPooledTransactionHashes` to the peer with up to
|
||||
// `NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT` transactions in the
|
||||
// pool
|
||||
if !self.network.is_syncing() {
|
||||
if !self.network.is_initially_syncing() {
|
||||
let peer = self.peers.get_mut(&peer_id).expect("is present; qed");
|
||||
|
||||
let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
|
||||
@ -437,8 +436,8 @@ where
|
||||
transactions: Vec<TransactionSigned>,
|
||||
source: TransactionSource,
|
||||
) {
|
||||
// If the node is currently syncing, ignore transactions
|
||||
if self.network.is_syncing() {
|
||||
// If the node is pipeline syncing, ignore transactions
|
||||
if self.network.is_initially_syncing() {
|
||||
return
|
||||
}
|
||||
|
||||
@ -595,9 +594,11 @@ where
|
||||
this.on_good_import(hash);
|
||||
}
|
||||
Err(err) => {
|
||||
// if we're syncing and the transaction is bad we ignore it, otherwise we
|
||||
// penalize the peer that sent the bad transaction with the assumption that the
|
||||
// peer should have known that this transaction is bad. (e.g. consensus rules)
|
||||
// if we're _currently_ syncing and the transaction is bad we ignore it,
|
||||
// otherwise we penalize the peer that sent the bad
|
||||
// transaction with the assumption that the peer should have
|
||||
// known that this transaction is bad. (e.g. consensus
|
||||
// rules)
|
||||
if err.is_bad_transaction() && !this.network.is_syncing() {
|
||||
trace!(target: "net::tx", ?err, "Bad transaction import");
|
||||
this.on_bad_import(*err.hash());
|
||||
@ -794,12 +795,23 @@ mod tests {
|
||||
use reth_rlp::Decodable;
|
||||
use reth_transaction_pool::test_utils::{testing_pool, MockTransaction};
|
||||
use secp256k1::SecretKey;
|
||||
use std::future::poll_fn;
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
#[cfg_attr(not(feature = "geth-tests"), ignore)]
|
||||
async fn test_ignored_tx_broadcasts_while_syncing() {
|
||||
async fn test_ignored_tx_broadcasts_while_initially_syncing() {
|
||||
reth_tracing::init_test_tracing();
|
||||
let net = Testnet::create(3).await;
|
||||
|
||||
let mut handles = net.handles();
|
||||
let handle0 = handles.next().unwrap();
|
||||
let handle1 = handles.next().unwrap();
|
||||
|
||||
drop(handles);
|
||||
let handle = net.spawn();
|
||||
|
||||
let listener0 = handle0.event_listener();
|
||||
handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
|
||||
let secret_key = SecretKey::new(&mut rand::thread_rng());
|
||||
|
||||
let client = NoopProvider::default();
|
||||
@ -808,7 +820,7 @@ mod tests {
|
||||
.disable_discovery()
|
||||
.listener_port(0)
|
||||
.build(client);
|
||||
let (handle, network, mut transactions, _) = NetworkManager::new(config)
|
||||
let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
|
||||
.await
|
||||
.unwrap()
|
||||
.into_builder()
|
||||
@ -817,17 +829,143 @@ mod tests {
|
||||
|
||||
tokio::task::spawn(network);
|
||||
|
||||
handle.update_sync_state(SyncState::Syncing);
|
||||
assert!(NetworkInfo::is_syncing(&handle));
|
||||
|
||||
let peer_id = PeerId::random();
|
||||
// go to syncing (pipeline sync)
|
||||
network_handle.update_sync_state(SyncState::Syncing);
|
||||
assert!(NetworkInfo::is_syncing(&network_handle));
|
||||
assert!(NetworkInfo::is_initially_syncing(&network_handle));
|
||||
|
||||
// wait for all initiator connections
|
||||
let mut established = listener0.take(2);
|
||||
while let Some(ev) = established.next().await {
|
||||
match ev {
|
||||
NetworkEvent::SessionEstablished {
|
||||
peer_id,
|
||||
remote_addr,
|
||||
client_version,
|
||||
capabilities,
|
||||
messages,
|
||||
status,
|
||||
version,
|
||||
} => {
|
||||
// to insert a new peer in transactions peerset
|
||||
transactions.on_network_event(NetworkEvent::SessionEstablished {
|
||||
peer_id,
|
||||
remote_addr,
|
||||
client_version,
|
||||
capabilities,
|
||||
messages,
|
||||
status,
|
||||
version,
|
||||
})
|
||||
}
|
||||
NetworkEvent::PeerAdded(_peer_id) => continue,
|
||||
ev => {
|
||||
panic!("unexpected event {ev:?}")
|
||||
}
|
||||
}
|
||||
}
|
||||
// random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
|
||||
let input = hex::decode("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76").unwrap();
|
||||
let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
|
||||
transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
|
||||
peer_id,
|
||||
msg: Transactions(vec![TransactionSigned::default()]),
|
||||
peer_id: *handle1.peer_id(),
|
||||
msg: Transactions(vec![signed_tx.clone()]),
|
||||
});
|
||||
|
||||
poll_fn(|cx| {
|
||||
let _ = transactions.poll_unpin(cx);
|
||||
Poll::Ready(())
|
||||
})
|
||||
.await;
|
||||
assert!(pool.is_empty());
|
||||
handle.terminate().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
#[cfg_attr(not(feature = "geth-tests"), ignore)]
|
||||
async fn test_tx_broadcasts_through_two_syncs() {
|
||||
reth_tracing::init_test_tracing();
|
||||
let net = Testnet::create(3).await;
|
||||
|
||||
let mut handles = net.handles();
|
||||
let handle0 = handles.next().unwrap();
|
||||
let handle1 = handles.next().unwrap();
|
||||
|
||||
drop(handles);
|
||||
let handle = net.spawn();
|
||||
|
||||
let listener0 = handle0.event_listener();
|
||||
handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
|
||||
let secret_key = SecretKey::new(&mut rand::thread_rng());
|
||||
|
||||
let client = NoopProvider::default();
|
||||
let pool = testing_pool();
|
||||
let config = NetworkConfigBuilder::new(secret_key)
|
||||
.disable_discovery()
|
||||
.listener_port(0)
|
||||
.build(client);
|
||||
let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
|
||||
.await
|
||||
.unwrap()
|
||||
.into_builder()
|
||||
.transactions(pool.clone())
|
||||
.split_with_handle();
|
||||
|
||||
tokio::task::spawn(network);
|
||||
|
||||
// go to syncing (pipeline sync) to idle and then to syncing (live)
|
||||
network_handle.update_sync_state(SyncState::Syncing);
|
||||
assert!(NetworkInfo::is_syncing(&network_handle));
|
||||
network_handle.update_sync_state(SyncState::Idle);
|
||||
assert!(!NetworkInfo::is_syncing(&network_handle));
|
||||
network_handle.update_sync_state(SyncState::Syncing);
|
||||
assert!(NetworkInfo::is_syncing(&network_handle));
|
||||
|
||||
// wait for all initiator connections
|
||||
let mut established = listener0.take(2);
|
||||
while let Some(ev) = established.next().await {
|
||||
match ev {
|
||||
NetworkEvent::SessionEstablished {
|
||||
peer_id,
|
||||
remote_addr,
|
||||
client_version,
|
||||
capabilities,
|
||||
messages,
|
||||
status,
|
||||
version,
|
||||
} => {
|
||||
// to insert a new peer in transactions peerset
|
||||
transactions.on_network_event(NetworkEvent::SessionEstablished {
|
||||
peer_id,
|
||||
remote_addr,
|
||||
client_version,
|
||||
capabilities,
|
||||
messages,
|
||||
status,
|
||||
version,
|
||||
})
|
||||
}
|
||||
NetworkEvent::PeerAdded(_peer_id) => continue,
|
||||
ev => {
|
||||
panic!("unexpected event {ev:?}")
|
||||
}
|
||||
}
|
||||
}
|
||||
// random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
|
||||
let input = hex::decode("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76").unwrap();
|
||||
let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
|
||||
transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
|
||||
peer_id: *handle1.peer_id(),
|
||||
msg: Transactions(vec![signed_tx.clone()]),
|
||||
});
|
||||
poll_fn(|cx| {
|
||||
let _ = transactions.poll_unpin(cx);
|
||||
Poll::Ready(())
|
||||
})
|
||||
.await;
|
||||
assert!(!NetworkInfo::is_initially_syncing(&network_handle));
|
||||
assert!(NetworkInfo::is_syncing(&network_handle));
|
||||
assert!(!pool.is_empty());
|
||||
handle.terminate().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
@ -906,6 +1044,16 @@ mod tests {
|
||||
*handle1.peer_id(),
|
||||
transactions.transactions_by_peers.get(&signed_tx.hash()).unwrap()[0]
|
||||
);
|
||||
|
||||
// advance the transaction manager future
|
||||
poll_fn(|cx| {
|
||||
let _ = transactions.poll_unpin(cx);
|
||||
Poll::Ready(())
|
||||
})
|
||||
.await;
|
||||
|
||||
assert!(!pool.is_empty());
|
||||
assert!(pool.get(&signed_tx.hash).is_some());
|
||||
handle.terminate().await;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user