refactor(net): use shared objects on a per peer basis (#270)

This commit is contained in:
Matthias Seitz
2022-11-29 11:43:45 +01:00
committed by GitHub
parent d0b31bb074
commit e53ed8ffc2
8 changed files with 71 additions and 56 deletions

View File

@ -266,12 +266,11 @@ where
}
}
/// Handles a received Message from the peer.
/// Handles a received Message from the peer's session.
fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage) {
match msg {
PeerMessage::NewBlockHashes(hashes) => {
self.within_pow_or_disconnect(peer_id, |this| {
let hashes = Arc::try_unwrap(hashes).unwrap_or_else(|arc| (*arc).clone());
// update peer's state, to track what blocks this peer has seen
this.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0)
})
@ -289,14 +288,17 @@ where
msg,
});
}
PeerMessage::Transactions(msg) => {
PeerMessage::EthRequest(req) => {
self.on_eth_request(peer_id, req);
}
PeerMessage::ReceivedTransaction(msg) => {
self.notify_tx_manager(NetworkTransactionEvent::IncomingTransactions {
peer_id,
msg,
});
}
PeerMessage::EthRequest(req) => {
self.on_eth_request(peer_id, req);
PeerMessage::SendTransactions(_) => {
unreachable!("Not emitted by session")
}
PeerMessage::Other(_) => {}
}
@ -320,7 +322,7 @@ where
self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::EthRequest(request))
}
NetworkHandleMessage::SendTransaction { peer_id, msg } => {
self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::Transactions(msg))
self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::SendTransactions(msg))
}
NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg } => self
.swarm

View File

@ -8,7 +8,7 @@ use reth_eth_wire::{
capability::CapabilityMessage, message::RequestPair, BlockBodies, BlockBody, BlockHeaders,
EthMessage, GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts,
NewBlock, NewBlockHashes, NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts,
Transactions,
SharedTransactions, Transactions,
};
use reth_interfaces::p2p::error::{RequestError, RequestResult};
use reth_primitives::{Header, PeerId, Receipt, TransactionSigned, H256};
@ -36,17 +36,20 @@ impl NewBlockMessage {
}
}
/// Represents all messages that can be sent to a peer session
/// All Bi-directional eth-message variants that can be sent to a session or received from a
/// session.
#[derive(Debug)]
pub enum PeerMessage {
/// Announce new block hashes
NewBlockHashes(Arc<NewBlockHashes>),
NewBlockHashes(NewBlockHashes),
/// Broadcast new block.
NewBlock(NewBlockMessage),
/// Broadcast transactions.
Transactions(Arc<Transactions>),
/// Received transactions _from_ the peer
ReceivedTransaction(Transactions),
/// Broadcast transactions _from_ local _to_ a peer.
SendTransactions(SharedTransactions),
/// Send new pooled transactions
PooledTransactions(Arc<NewPooledTransactionHashes>),
PooledTransactions(NewPooledTransactionHashes),
/// All `eth` request variants.
EthRequest(PeerRequest),
/// Other than eth namespace message

View File

@ -5,8 +5,8 @@ use crate::{
peers::{PeersHandle, ReputationChangeKind},
};
use parking_lot::Mutex;
use reth_eth_wire::{NewBlock, NewPooledTransactionHashes, Transactions};
use reth_primitives::{PeerId, H256};
use reth_eth_wire::{NewBlock, NewPooledTransactionHashes, SharedTransactions};
use reth_primitives::{PeerId, TransactionSigned, H256};
use std::{
net::SocketAddr,
sync::{
@ -107,8 +107,11 @@ impl NetworkHandle {
}
/// Send full transactions to the peer
pub fn send_transactions(&self, peer_id: PeerId, msg: Arc<Transactions>) {
self.send_message(NetworkHandleMessage::SendTransaction { peer_id, msg })
pub fn send_transactions(&self, peer_id: PeerId, msg: Vec<Arc<TransactionSigned>>) {
self.send_message(NetworkHandleMessage::SendTransaction {
peer_id,
msg: SharedTransactions(msg),
})
}
}
@ -139,9 +142,9 @@ pub(crate) enum NetworkHandleMessage {
/// Broadcast event to announce a new block to all nodes.
AnnounceBlock(NewBlock, H256),
/// Sends the list of transactions to the given peer.
SendTransaction { peer_id: PeerId, msg: Arc<Transactions> },
SendTransaction { peer_id: PeerId, msg: SharedTransactions },
/// Sends the list of transactions hashes to the given peer.
SendPooledTransactionHashes { peer_id: PeerId, msg: Arc<NewPooledTransactionHashes> },
SendPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes },
/// Send an `eth` protocol request to the peer.
EthRequest {
/// The peer to send the request to.

View File

@ -137,7 +137,7 @@ impl ActiveSession {
return Some(EthStreamError::HandshakeError(HandshakeError::StatusNotInHandshake))
}
EthMessage::NewBlockHashes(msg) => {
self.emit_message(PeerMessage::NewBlockHashes(Arc::new(msg)));
self.emit_message(PeerMessage::NewBlockHashes(msg));
}
EthMessage::NewBlock(msg) => {
let block =
@ -145,10 +145,10 @@ impl ActiveSession {
self.emit_message(PeerMessage::NewBlock(block));
}
EthMessage::Transactions(msg) => {
self.emit_message(PeerMessage::Transactions(Arc::new(msg)));
self.emit_message(PeerMessage::ReceivedTransaction(msg));
}
EthMessage::NewPooledTransactionHashes(msg) => {
self.emit_message(PeerMessage::PooledTransactions(Arc::new(msg)));
self.emit_message(PeerMessage::PooledTransactions(msg));
}
EthMessage::GetBlockHeaders(req) => {
on_request!(req, BlockHeaders, GetBlockHeaders);
@ -198,22 +198,24 @@ impl ActiveSession {
fn on_peer_message(&mut self, msg: PeerMessage) {
match msg {
PeerMessage::NewBlockHashes(msg) => {
self.queued_outgoing.push_back(EthBroadcastMessage::NewBlockHashes(msg).into());
self.queued_outgoing.push_back(EthMessage::NewBlockHashes(msg).into());
}
PeerMessage::NewBlock(msg) => {
self.queued_outgoing.push_back(EthBroadcastMessage::NewBlock(msg.block).into());
}
PeerMessage::Transactions(msg) => {
self.queued_outgoing.push_back(EthBroadcastMessage::Transactions(msg).into());
}
PeerMessage::PooledTransactions(msg) => {
self.queued_outgoing
.push_back(EthBroadcastMessage::NewPooledTransactionHashes(msg).into());
self.queued_outgoing.push_back(EthMessage::NewPooledTransactionHashes(msg).into());
}
PeerMessage::EthRequest(req) => {
let deadline = self.request_deadline();
self.on_peer_request(req, deadline);
}
PeerMessage::SendTransactions(msg) => {
self.queued_outgoing.push_back(EthBroadcastMessage::Transactions(msg).into());
}
PeerMessage::ReceivedTransaction(_) => {
unreachable!("Not emitted by network")
}
PeerMessage::Other(_) => {}
}
}

View File

@ -173,7 +173,7 @@ where
/// but sending `NewBlockHash` broadcast to all peers that haven't seen it yet.
pub(crate) fn announce_new_block_hash(&mut self, msg: NewBlockMessage) {
let number = msg.block.block.header.number;
let hashes = Arc::new(NewBlockHashes(vec![BlockHashNumber { hash: msg.hash, number }]));
let hashes = NewBlockHashes(vec![BlockHashNumber { hash: msg.hash, number }]);
for (peer_id, peer) in self.connected_peers.iter_mut() {
if peer.blocks.contains(&msg.hash) {
// skip peers which already reported the block
@ -186,7 +186,7 @@ where
self.queued_messages.push_back(StateAction::NewBlockHashes {
peer_id: *peer_id,
hashes: Arc::clone(&hashes),
hashes: hashes.clone(),
});
}
}
@ -409,7 +409,7 @@ pub(crate) enum StateAction {
/// Target of the message
peer_id: PeerId,
/// `NewBlockHashes` message to send to the peer.
hashes: Arc<NewBlockHashes>,
hashes: NewBlockHashes,
},
/// Create a new connection to the given node.
Connect { remote_addr: SocketAddr, peer_id: PeerId },

View File

@ -172,7 +172,9 @@ where
self.pool
.get_all(hashes)
.into_iter()
.map(|tx| (*tx.hash(), tx.transaction.to_recovered_transaction().into_signed()))
.map(|tx| {
(*tx.hash(), Arc::new(tx.transaction.to_recovered_transaction().into_signed()))
})
.collect(),
);
@ -182,7 +184,7 @@ where
fn propagate_transactions(
&mut self,
txs: Vec<(TxHash, TransactionSigned)>,
txs: Vec<(TxHash, Arc<TransactionSigned>)>,
) -> PropagatedTransactions {
let mut propagated = PropagatedTransactions::default();
@ -192,7 +194,7 @@ where
if !full.is_empty() {
// TODO select peer for full or hash
self.network.send_transactions(*peer_id, Arc::new(Transactions(full)));
self.network.send_transactions(*peer_id, full);
for hash in hashes {
propagated.0.entry(hash).or_default().push(PropagateKind::Full(*peer_id));
@ -204,13 +206,9 @@ where
}
/// Request handler for an incoming `NewPooledTransactionHashes`
fn on_new_pooled_transactions(
&mut self,
peer_id: PeerId,
msg: Arc<NewPooledTransactionHashes>,
) {
fn on_new_pooled_transactions(&mut self, peer_id: PeerId, msg: NewPooledTransactionHashes) {
if let Some(peer) = self.peers.get_mut(&peer_id) {
let mut transactions = Arc::try_unwrap(msg).unwrap_or_else(|arc| (*arc).clone()).0;
let mut transactions = msg.0;
// keep track of the transactions the peer knows
peer.transactions.extend(transactions.clone());
@ -239,8 +237,7 @@ where
fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) {
match event {
NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
let transactions = Arc::try_unwrap(msg).unwrap_or_else(|arc| (*arc).clone());
self.import_transactions(peer_id, transactions.0);
self.import_transactions(peer_id, msg.0);
}
NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
self.on_new_pooled_transactions(peer_id, msg)
@ -272,7 +269,7 @@ where
// Send a `NewPooledTransactionHashes` to the peer with _all_ transactions in the
// pool
let msg = Arc::new(NewPooledTransactionHashes(self.pool.pooled_transactions()));
let msg = NewPooledTransactionHashes(self.pool.pooled_transactions());
self.network.send_message(NetworkHandleMessage::SendPooledTransactionHashes {
peer_id,
msg,
@ -434,9 +431,9 @@ enum TransactionsCommand {
#[derive(Debug)]
pub enum NetworkTransactionEvent {
/// Received list of transactions to the given peer.
IncomingTransactions { peer_id: PeerId, msg: Arc<Transactions> },
IncomingTransactions { peer_id: PeerId, msg: Transactions },
/// Received list of transactions hashes to the given peer.
IncomingPooledTransactionHashes { peer_id: PeerId, msg: Arc<NewPooledTransactionHashes> },
IncomingPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes },
/// Incoming `GetPooledTransactions` request from a peer.
GetPooledTransactions {
peer_id: PeerId,