feat(p2p): integrate txpool in p2p (#208)

This commit is contained in:
Matthias Seitz
2022-11-15 22:27:41 +01:00
committed by GitHub
parent f0388e4032
commit f8fddcdfa4
11 changed files with 305 additions and 61 deletions

View File

@ -50,7 +50,7 @@ impl NetworkHandle {
}
/// Sends a [`NetworkHandleMessage`] to the manager
fn send_message(&self, msg: NetworkHandleMessage) {
pub(crate) fn send_message(&self, msg: NetworkHandleMessage) {
let _ = self.inner.to_manager_tx.send(msg);
}

View File

@ -1,24 +1,35 @@
//! Transaction management for the p2p network.
use crate::{cache::LruCache, manager::NetworkEvent, message::PeerRequestSender, NetworkHandle};
use futures::stream::FuturesUnordered;
use reth_primitives::{PeerId, Transaction, H256};
use reth_transaction_pool::TransactionPool;
use crate::{
cache::LruCache,
manager::NetworkEvent,
message::{PeerRequest, PeerRequestSender},
network::NetworkHandleMessage,
NetworkHandle,
};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use reth_eth_wire::{GetPooledTransactions, NewPooledTransactionHashes, PooledTransactions};
use reth_interfaces::p2p::error::RequestResult;
use reth_primitives::{
FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, TransactionSigned, TxHash, H256,
};
use reth_transaction_pool::{error::PoolResult, TransactionPool};
use std::{
collections::{hash_map::Entry, HashMap},
future::Future,
num::NonZeroUsize,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio::sync::{mpsc, oneshot, oneshot::Sender};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
/// Cache limit of transactions to keep track of for a single peer.
const PEER_TRANSACTION_CACHE_LIMIT: usize = 1024;
const PEER_TRANSACTION_CACHE_LIMIT: usize = 1024 * 10;
/// The future for inserting a function into the pool
pub type PoolImportFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
pub type PoolImportFuture = Pin<Box<dyn Future<Output = PoolResult<TxHash>> + Send + 'static>>;
/// Api to interact with [`TransactionsManager`] task.
pub struct TransactionsHandle {
@ -52,11 +63,13 @@ pub struct TransactionsManager<Pool> {
///
/// From which we get all new incoming transaction related messages.
network_events: UnboundedReceiverStream<NetworkEvent>,
/// All currently active requests for pooled transactions.
inflight_requests: Vec<GetPooledTxRequest>,
/// All currently pending transactions grouped by peers.
///
/// This way we can track incoming transactions and prevent multiple pool imports for the same
/// transaction
transactions_by_peers: HashMap<H256, Vec<PeerId>>,
transactions_by_peers: HashMap<TxHash, Vec<PeerId>>,
/// Transactions that are currently imported into the `Pool`
pool_imports: FuturesUnordered<PoolImportFuture>,
/// All the connected peers.
@ -65,28 +78,36 @@ pub struct TransactionsManager<Pool> {
command_tx: mpsc::UnboundedSender<TransactionsCommand>,
/// Incoming commands from [`TransactionsHandle`].
command_rx: UnboundedReceiverStream<TransactionsCommand>,
/// Incoming commands from [`TransactionsHandle`].
pending_transactions: ReceiverStream<TxHash>,
}
// === impl TransactionsManager ===
impl<Pool> TransactionsManager<Pool>
where
Pool: TransactionPool<Transaction = Transaction>,
Pool: TransactionPool + Clone,
<Pool as TransactionPool>::Transaction: IntoRecoveredTransaction,
{
/// Sets up a new instance.
pub fn new(network: NetworkHandle, pool: Pool) -> Self {
let network_events = network.event_listener();
let (command_tx, command_rx) = mpsc::unbounded_channel();
// install a listener for new transactions
let pending = pool.pending_transactions_listener();
Self {
pool,
network,
network_events: UnboundedReceiverStream::new(network_events),
inflight_requests: Default::default(),
transactions_by_peers: Default::default(),
pool_imports: Default::default(),
peers: Default::default(),
command_tx,
command_rx: UnboundedReceiverStream::new(command_rx),
pending_transactions: ReceiverStream::new(pending),
}
}
@ -95,8 +116,63 @@ where
TransactionsHandle { manager_tx: self.command_tx.clone() }
}
/// Request handler for an incoming request for transactions
fn on_get_pooled_transactions(
&mut self,
peer_id: PeerId,
request: GetPooledTransactions,
response: Sender<RequestResult<PooledTransactions>>,
) {
if let Some(peer) = self.peers.get_mut(&peer_id) {
let transactions = self
.pool
.get_all(request.0)
.into_iter()
.map(|tx| tx.transaction.to_recovered_transaction().into_signed())
.collect::<Vec<_>>();
// we sent a response at which point we assume that the peer is aware of the transaction
peer.transactions.extend(transactions.iter().map(|tx| tx.hash()));
let resp = PooledTransactions(transactions);
let _ = response.send(Ok(resp));
}
}
/// Request handler for an incoming `NewPooledTransactionHashes`
fn on_new_pooled_transactions(
&mut self,
peer_id: PeerId,
msg: Arc<NewPooledTransactionHashes>,
) {
if let Some(peer) = self.peers.get_mut(&peer_id) {
let mut transactions = Arc::try_unwrap(msg).unwrap_or_else(|arc| (*arc).clone()).0;
// keep track of the transactions the peer knows
peer.transactions.extend(transactions.clone());
self.pool.retain_unknown(&mut transactions);
if transactions.is_empty() {
// nothing to request
return
}
// request the missing transactions
let (response, rx) = oneshot::channel();
let req = PeerRequest::GetPooledTransactions {
request: GetPooledTransactions(transactions),
response,
};
if peer.request_tx.try_send(req).is_ok() {
self.inflight_requests.push(GetPooledTxRequest { peer_id, response: rx })
}
}
}
/// Handles a received event
async fn on_event(&mut self, event: NetworkEvent) {
fn on_event(&mut self, event: NetworkEvent) {
match event {
NetworkEvent::SessionClosed { peer_id } => {
// remove the peer
@ -114,35 +190,140 @@ where
},
);
// TODO send `NewPooledTransactionHashes
// Send a `NewPooledTransactionHashes` to the peer with _all_ transactions in the
// pool
let msg = Arc::new(NewPooledTransactionHashes(self.pool.pooled_transactions()));
self.network.send_message(NetworkHandleMessage::SendPooledTransactionHashes {
peer_id,
msg,
})
}
NetworkEvent::IncomingTransactions { peer_id, msg } => {
let transactions = Arc::try_unwrap(msg).unwrap_or_else(|arc| (*arc).clone());
if let Some(peer) = self.peers.get_mut(&peer_id) {
for tx in transactions.0 {
// track that the peer knows this transaction
peer.transactions.insert(tx.hash);
match self.transactions_by_peers.entry(tx.hash) {
Entry::Occupied(mut entry) => {
// transaction was already inserted
entry.get_mut().push(peer_id);
}
Entry::Vacant(_) => {
// TODO import into the pool
}
}
}
self.import_transactions(peer_id, transactions.0);
}
NetworkEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
self.on_new_pooled_transactions(peer_id, msg)
}
NetworkEvent::GetPooledTransactions { peer_id, request, response } => {
if let Ok(response) = Arc::try_unwrap(response) {
// TODO(mattsse): there should be a dedicated channel for the transaction
// manager instead
self.on_get_pooled_transactions(peer_id, request, response)
}
}
NetworkEvent::IncomingPooledTransactionHashes { .. } => {}
NetworkEvent::GetPooledTransactions { .. } => {}
}
}
/// Executes an endless future
pub async fn run(self) {}
/// Starts the import process for the given transactions.
fn import_transactions(&mut self, peer_id: PeerId, transactions: Vec<TransactionSigned>) {
if let Some(peer) = self.peers.get_mut(&peer_id) {
for tx in transactions {
// recover transaction
let tx = if let Some(tx) = tx.into_ecrecovered() {
tx
} else {
// TODO: report peer?
continue
};
// track that the peer knows this transaction
peer.transactions.insert(tx.hash);
match self.transactions_by_peers.entry(tx.hash) {
Entry::Occupied(mut entry) => {
// transaction was already inserted
entry.get_mut().push(peer_id);
}
Entry::Vacant(entry) => {
// this is a new transaction that should be imported into the pool
let pool_transaction = <Pool::Transaction as FromRecoveredTransaction>::from_recovered_transaction(tx);
let pool = self.pool.clone();
let import = Box::pin(async move {
pool.add_external_transaction(pool_transaction).await
});
self.pool_imports.push(import);
entry.insert(vec![peer_id]);
}
}
}
}
}
fn on_good_import(&mut self, hash: TxHash) {
if let Some(_peers) = self.transactions_by_peers.remove(&hash) {
// TODO report good peer?
}
}
fn on_bad_import(&mut self, hash: TxHash) {
if let Some(_peers) = self.transactions_by_peers.remove(&hash) {
// TODO report bad peer?
}
}
}
/// An endless future.
///
/// This should be spawned or used as part of `tokio::select!`.
impl<Pool> Future for TransactionsManager<Pool>
where
Pool: TransactionPool + Clone + Unpin,
<Pool as TransactionPool>::Transaction: IntoRecoveredTransaction,
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
// Advance all imports
while let Poll::Ready(Some(import_res)) = this.pool_imports.poll_next_unpin(cx) {
match import_res {
Ok(hash) => {
this.on_good_import(hash);
}
Err(err) => {
this.on_bad_import(*err.hash());
}
}
}
// handle new transactions
while let Poll::Ready(Some(_hash)) = this.pending_transactions.poll_next_unpin(cx) {
// TODO(mattsse): propagate new transactions
}
// Advance all requests.
// We remove each request one by one and add them back.
for idx in (0..this.inflight_requests.len()).rev() {
let mut req = this.inflight_requests.swap_remove(idx);
match req.response.poll_unpin(cx) {
Poll::Pending => {
this.inflight_requests.push(req);
}
Poll::Ready(Ok(Ok(txs))) => {
this.import_transactions(req.peer_id, txs.0);
}
Poll::Ready(Ok(Err(_))) => {
// TODO report bad peer
}
Poll::Ready(Err(_)) => {
// TODO report bad peer
}
}
}
Poll::Pending
}
}
/// An inflight request for `PooledTransactions` from a peer
#[allow(missing_docs)]
struct GetPooledTxRequest {
peer_id: PeerId,
response: oneshot::Receiver<RequestResult<PooledTransactions>>,
}
/// Tracks a single peer

View File

@ -42,8 +42,8 @@ pub use log::Log;
pub use receipt::Receipt;
pub use storage::StorageEntry;
pub use transaction::{
AccessList, AccessListItem, FromRecoveredTransaction, Signature, Transaction, TransactionKind,
TransactionSigned, TransactionSignedEcRecovered, TxType,
AccessList, AccessListItem, FromRecoveredTransaction, IntoRecoveredTransaction, Signature,
Transaction, TransactionKind, TransactionSigned, TransactionSignedEcRecovered, TxType,
};
/// A block hash.

View File

@ -571,6 +571,8 @@ impl TransactionSigned {
}
/// Recover signer from signature and hash.
///
/// Returns `None` if the transaction's signature is invalid.
pub fn recover_signer(&self) -> Option<Address> {
let signature_hash = self.signature_hash();
self.signature.recover_signer(signature_hash)
@ -724,6 +726,22 @@ impl FromRecoveredTransaction for TransactionSignedEcRecovered {
}
}
/// The inverse of [`FromRecoveredTransaction`] that ensure the transaction can be sent over the
/// network
pub trait IntoRecoveredTransaction {
/// Converts to this type into a [`TransactionSignedEcRecovered`].
///
/// Note: this takes `&self` since indented usage is via `Arc<Self>`.
fn to_recovered_transaction(&self) -> TransactionSignedEcRecovered;
}
impl IntoRecoveredTransaction for TransactionSignedEcRecovered {
#[inline]
fn to_recovered_transaction(&self) -> TransactionSignedEcRecovered {
self.clone()
}
}
#[cfg(test)]
mod tests {
use crate::{

View File

@ -16,8 +16,9 @@ reth-primitives = { path = "../primitives" }
# async/futures
async-trait = "0.1"
futures = "0.3"
futures-util = "0.3"
parking_lot = "0.12"
tokio = { version = "1", default-features = false, features = ["sync"] }
# misc
aquamarine = "0.1" # docs

View File

@ -87,9 +87,9 @@ use crate::{
traits::{NewTransactionEvent, PoolStatus, TransactionOrigin},
validate::ValidPoolTransaction,
};
use futures::channel::mpsc::Receiver;
use reth_primitives::{BlockID, TxHash, U256, U64};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::mpsc::Receiver;
mod config;
pub mod error;
@ -131,11 +131,12 @@ where
origin: TransactionOrigin,
transactions: impl IntoIterator<Item = V::Transaction>,
) -> PoolResult<HashMap<TxHash, TransactionValidationOutcome<V::Transaction>>> {
let outcome =
futures::future::join_all(transactions.into_iter().map(|tx| self.validate(origin, tx)))
.await
.into_iter()
.collect::<HashMap<_, _>>();
let outcome = futures_util::future::join_all(
transactions.into_iter().map(|tx| self.validate(origin, tx)),
)
.await
.into_iter()
.collect::<HashMap<_, _>>();
Ok(outcome)
}
@ -209,6 +210,10 @@ where
self.pool.add_transaction_listener()
}
fn pooled_transactions(&self) -> Vec<TxHash> {
self.pool.pooled_transactions()
}
fn best_transactions(
&self,
) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
@ -222,6 +227,10 @@ where
todo!()
}
fn retain_unknown(&self, hashes: &mut Vec<TxHash>) {
self.pool.retain_unknown(hashes)
}
fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.inner().get(tx_hash)
}

View File

@ -1,9 +1,9 @@
//! Listeners for the transaction-pool
use crate::pool::events::TransactionEvent;
use futures::channel::mpsc::UnboundedSender;
use reth_primitives::H256;
use std::{collections::HashMap, hash};
use tokio::sync::mpsc::UnboundedSender;
type EventSink<Hash> = UnboundedSender<TransactionEvent<Hash>>;
@ -75,7 +75,7 @@ struct PoolEventNotifier<Hash> {
impl<Hash: Clone> PoolEventNotifier<Hash> {
fn notify(&mut self, event: TransactionEvent<Hash>) {
self.senders.retain(|sender| sender.unbounded_send(event.clone()).is_ok())
self.senders.retain(|sender| sender.send(event.clone()).is_ok())
}
fn is_done(&self) -> bool {

View File

@ -73,7 +73,6 @@ use crate::{
};
use best::BestTransactions;
pub use events::TransactionEvent;
use futures::channel::mpsc::{channel, Receiver, Sender};
use parking_lot::{Mutex, RwLock};
use reth_primitives::{Address, TxHash, H256};
use std::{
@ -81,6 +80,7 @@ use std::{
sync::Arc,
time::Instant,
};
use tokio::sync::mpsc;
use tracing::warn;
mod best;
@ -107,9 +107,9 @@ pub struct PoolInner<V: TransactionValidator, T: TransactionOrdering> {
/// Manages listeners for transaction state change events.
event_listener: RwLock<PoolEventListener<TxHash>>,
/// Listeners for new ready transactions.
pending_transaction_listener: Mutex<Vec<Sender<TxHash>>>,
pending_transaction_listener: Mutex<Vec<mpsc::Sender<TxHash>>>,
/// Listeners for new transactions added to the pool.
transaction_listener: Mutex<Vec<Sender<NewTransactionEvent<T::Transaction>>>>,
transaction_listener: Mutex<Vec<mpsc::Sender<NewTransactionEvent<T::Transaction>>>>,
}
// === impl PoolInner ===
@ -149,17 +149,23 @@ where
/// Adds a new transaction listener to the pool that gets notified about every new _ready_
/// transaction
pub fn add_pending_listener(&self) -> Receiver<TxHash> {
pub fn add_pending_listener(&self) -> mpsc::Receiver<TxHash> {
const TX_LISTENER_BUFFER_SIZE: usize = 2048;
let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE);
let (tx, rx) = mpsc::channel(TX_LISTENER_BUFFER_SIZE);
self.pending_transaction_listener.lock().push(tx);
rx
}
/// Returns hashes of _all_ transactions in the pool.
pub(crate) fn pooled_transactions(&self) -> Vec<TxHash> {
let pool = self.pool.read();
pool.all().hashes_iter().collect()
}
/// Adds a new transaction listener to the pool that gets notified about every new transaction
pub fn add_transaction_listener(&self) -> Receiver<NewTransactionEvent<T::Transaction>> {
pub fn add_transaction_listener(&self) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
const TX_LISTENER_BUFFER_SIZE: usize = 1024;
let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE);
let (tx, rx) = mpsc::channel(TX_LISTENER_BUFFER_SIZE);
self.transaction_listener.lock().push(tx);
rx
}
@ -256,8 +262,8 @@ where
let mut transaction_listeners = self.pending_transaction_listener.lock();
transaction_listeners.retain_mut(|listener| match listener.try_send(*ready) {
Ok(()) => true,
Err(e) => {
if e.is_full() {
Err(err) => {
if matches!(err, mpsc::error::TrySendError::Full(_)) {
warn!(
target: "txpool",
"[{:?}] dropping full ready transaction listener",
@ -277,8 +283,8 @@ where
transaction_listeners.retain_mut(|listener| match listener.try_send(event.clone()) {
Ok(()) => true,
Err(e) => {
if e.is_full() {
Err(err) => {
if matches!(err, mpsc::error::TrySendError::Full(_)) {
warn!(
target: "txpool",
"dropping full transaction listener",
@ -325,6 +331,12 @@ where
self.pool.read().best_transactions()
}
/// Removes all transactions transactions that are missing in the pool.
pub(crate) fn retain_unknown(&self, hashes: &mut Vec<TxHash>) {
let pool = self.pool.read();
hashes.retain(|tx| !pool.contains(tx))
}
/// Returns the transaction by hash.
pub(crate) fn get(
&self,

View File

@ -102,6 +102,11 @@ impl<T: TransactionOrdering> TxPool<T> {
}
}
/// Returns access to the [`AllTransactions`] container.
pub(crate) fn all(&self) -> &AllTransactions<T::Transaction> {
&self.all_transactions
}
/// Returns stats about the pool.
pub(crate) fn status(&self) -> PoolStatus {
PoolStatus {
@ -417,10 +422,6 @@ impl<T: TransactionOrdering> TxPool<T> {
#[cfg(test)]
#[allow(missing_docs)]
impl<T: TransactionOrdering> TxPool<T> {
pub(crate) fn all(&self) -> &AllTransactions<T::Transaction> {
&self.all_transactions
}
pub(crate) fn pending(&self) -> &PendingPool<T> {
&self.pending_pool
}
@ -463,6 +464,11 @@ impl<T: PoolTransaction> AllTransactions<T> {
Self { max_account_slots, ..Default::default() }
}
/// Returns an iterator over all _unique_ hashes in the pool
pub(crate) fn hashes_iter(&self) -> impl Iterator<Item = TxHash> + '_ {
self.by_hash.keys().copied()
}
/// Returns if the transaction for the given hash is already included in this pool
pub(crate) fn contains(&self, tx_hash: &TxHash) -> bool {
self.by_hash.contains_key(tx_hash)

View File

@ -1,7 +1,7 @@
use crate::{error::PoolResult, pool::state::SubPool, validate::ValidPoolTransaction, BlockID};
use futures::{channel::mpsc::Receiver, future::Shared};
use reth_primitives::{Address, FromRecoveredTransaction, TxHash, H256, U256};
use std::{fmt, sync::Arc};
use tokio::sync::mpsc::Receiver;
/// General purpose abstraction fo a transaction-pool.
///
@ -27,6 +27,8 @@ pub trait TransactionPool: Send + Sync + 'static {
///
/// This is intended to be used by the network to insert incoming transactions received over the
/// p2p network.
///
/// Consumer: P2P
async fn add_external_transaction(&self, transaction: Self::Transaction) -> PoolResult<TxHash> {
self.add_transaction(TransactionOrigin::External, transaction).await
}
@ -59,6 +61,13 @@ pub trait TransactionPool: Send + Sync + 'static {
/// Returns a new stream that yields new valid transactions added to the pool.
fn transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>>;
/// Returns hashes of all transactions in the pool.
///
/// Note: This returns a `Vec` but should guarantee that all hashes are unique.
///
/// Consumer: P2P
fn pooled_transactions(&self) -> Vec<TxHash>;
/// Returns an iterator that yields transactions that are ready for block production.
///
/// Consumer: Block production
@ -76,6 +85,13 @@ pub trait TransactionPool: Send + Sync + 'static {
tx_hashes: &[TxHash],
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
/// Retains only those hashes that are unknown to the pool.
/// In other words, removes all transactions from the given set that are currently present in
/// the pool.
///
/// Consumer: P2P
fn retain_unknown(&self, hashes: &mut Vec<TxHash>);
/// Returns if the transaction for the given hash is already included in this pool.
fn contains(&self, tx_hash: &TxHash) -> bool {
self.get(tx_hash).is_some()