chore: introduce network primitives to transactions handle (#12711)

This commit is contained in:
Matthias Seitz
2024-11-20 15:39:32 +01:00
committed by GitHub
parent 68abcb1fe9
commit 04729f3c66

View File

@ -82,44 +82,28 @@ pub type PoolImportFuture = Pin<Box<dyn Future<Output = Vec<PoolResult<TxHash>>>
/// For example [`TransactionsHandle::get_peer_transaction_hashes`] returns the transaction hashes /// For example [`TransactionsHandle::get_peer_transaction_hashes`] returns the transaction hashes
/// known by a specific peer. /// known by a specific peer.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct TransactionsHandle { pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
/// Command channel to the [`TransactionsManager`] /// Command channel to the [`TransactionsManager`]
manager_tx: mpsc::UnboundedSender<TransactionsCommand>, manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
} }
/// Implementation of the `TransactionsHandle` API for use in testnet via type /// Implementation of the `TransactionsHandle` API for use in testnet via type
/// [`PeerHandle`](crate::test_utils::PeerHandle). /// [`PeerHandle`](crate::test_utils::PeerHandle).
impl TransactionsHandle { impl<N: NetworkPrimitives> TransactionsHandle<N> {
fn send(&self, cmd: TransactionsCommand) { fn send(&self, cmd: TransactionsCommand<N>) {
let _ = self.manager_tx.send(cmd); let _ = self.manager_tx.send(cmd);
} }
/// Fetch the [`PeerRequestSender`] for the given peer. /// Fetch the [`PeerRequestSender`] for the given peer.
async fn peer_handle(&self, peer_id: PeerId) -> Result<Option<PeerRequestSender>, RecvError> { async fn peer_handle(
&self,
peer_id: PeerId,
) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx }); self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
rx.await rx.await
} }
/// Requests the transactions directly from the given peer.
///
/// Returns `None` if the peer is not connected.
///
/// **Note**: this returns the response from the peer as received.
pub async fn get_pooled_transactions_from(
&self,
peer_id: PeerId,
hashes: Vec<B256>,
) -> Result<Option<Vec<PooledTransactionsElement>>, RequestError> {
let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
let (tx, rx) = oneshot::channel();
let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
peer.try_send(request).ok();
rx.await?.map(|res| Some(res.0))
}
/// Manually propagate the transaction that belongs to the hash. /// Manually propagate the transaction that belongs to the hash.
pub fn propagate(&self, hash: TxHash) { pub fn propagate(&self, hash: TxHash) {
self.send(TransactionsCommand::PropagateHash(hash)) self.send(TransactionsCommand::PropagateHash(hash))
@ -179,6 +163,27 @@ impl TransactionsHandle {
} }
} }
impl TransactionsHandle {
/// Requests the transactions directly from the given peer.
///
/// Returns `None` if the peer is not connected.
///
/// **Note**: this returns the response from the peer as received.
pub async fn get_pooled_transactions_from(
&self,
peer_id: PeerId,
hashes: Vec<B256>,
) -> Result<Option<Vec<PooledTransactionsElement>>, RequestError> {
let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
let (tx, rx) = oneshot::channel();
let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
peer.try_send(request).ok();
rx.await?.map(|res| Some(res.0))
}
}
/// Manages transactions on top of the p2p network. /// Manages transactions on top of the p2p network.
/// ///
/// This can be spawned to another task and is supposed to be run as background service. /// This can be spawned to another task and is supposed to be run as background service.
@ -235,12 +240,12 @@ pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives
/// Send half for the command channel. /// Send half for the command channel.
/// ///
/// This is kept so that a new [`TransactionsHandle`] can be created at any time. /// This is kept so that a new [`TransactionsHandle`] can be created at any time.
command_tx: mpsc::UnboundedSender<TransactionsCommand>, command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
/// Incoming commands from [`TransactionsHandle`]. /// Incoming commands from [`TransactionsHandle`].
/// ///
/// This will only receive commands if a user manually sends a command to the manager through /// This will only receive commands if a user manually sends a command to the manager through
/// the [`TransactionsHandle`] to interact with this type directly. /// the [`TransactionsHandle`] to interact with this type directly.
command_rx: UnboundedReceiverStream<TransactionsCommand>, command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
/// A stream that yields new __pending__ transactions. /// A stream that yields new __pending__ transactions.
/// ///
/// A transaction is considered __pending__ if it is executable on the current state of the /// A transaction is considered __pending__ if it is executable on the current state of the
@ -312,7 +317,7 @@ impl<Pool: TransactionPool> TransactionsManager<Pool> {
impl<Pool, N: NetworkPrimitives> TransactionsManager<Pool, N> { impl<Pool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
/// Returns a new handle that can send commands to this type. /// Returns a new handle that can send commands to this type.
pub fn handle(&self) -> TransactionsHandle { pub fn handle(&self) -> TransactionsHandle<N> {
TransactionsHandle { manager_tx: self.command_tx.clone() } TransactionsHandle { manager_tx: self.command_tx.clone() }
} }
@ -1732,7 +1737,7 @@ impl PeerMetadata {
/// Commands to send to the [`TransactionsManager`] /// Commands to send to the [`TransactionsManager`]
#[derive(Debug)] #[derive(Debug)]
enum TransactionsCommand { enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
/// Propagate a transaction hash to the network. /// Propagate a transaction hash to the network.
PropagateHash(B256), PropagateHash(B256),
/// Propagate transaction hashes to a specific peer. /// Propagate transaction hashes to a specific peer.
@ -1751,7 +1756,7 @@ enum TransactionsCommand {
/// Requests a clone of the sender sender channel to the peer. /// Requests a clone of the sender sender channel to the peer.
GetPeerSender { GetPeerSender {
peer_id: PeerId, peer_id: PeerId,
peer_request_sender: oneshot::Sender<Option<PeerRequestSender>>, peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
}, },
} }