chore(consensus): crate refactor (#1094)

This commit is contained in:
Roman Krasiuk
2023-01-31 20:29:26 +02:00
committed by GitHub
parent e0dbcaece3
commit 5da6b07d9e
16 changed files with 172 additions and 117 deletions

20
Cargo.lock generated
View File

@ -4539,13 +4539,13 @@ dependencies = [
"async-trait",
"hex",
"jsonrpsee",
"reth-consensus",
"reth-interfaces",
"reth-network",
"reth-network-api",
"reth-primitives",
"reth-provider",
"reth-rpc-api",
"reth-rpc-engine-api",
"reth-rpc-types",
"reth-transaction-pool",
"serde",
@ -4578,6 +4578,24 @@ dependencies = [
"strum",
]
[[package]]
name = "reth-rpc-engine-api"
version = "0.1.0"
dependencies = [
"assert_matches",
"bytes",
"futures",
"reth-executor",
"reth-interfaces",
"reth-primitives",
"reth-provider",
"reth-rlp",
"reth-rpc-types",
"thiserror",
"tokio",
"tokio-stream",
]
[[package]]
name = "reth-rpc-types"
version = "0.1.0"

View File

@ -19,6 +19,7 @@ members = [
"crates/net/rpc",
"crates/net/rpc-api",
"crates/net/rpc-builder",
"crates/net/rpc-engine-api",
"crates/net/rpc-types",
"crates/net/downloaders",
"crates/primitives",

View File

@ -11,7 +11,7 @@ use clap::{crate_version, Parser};
use eyre::Context;
use fdlimit::raise_fd_limit;
use futures::{stream::select as stream_select, Stream, StreamExt};
use reth_consensus::BeaconConsensus;
use reth_consensus::beacon::BeaconConsensus;
use reth_db::mdbx::{Env, WriteMap};
use reth_downloaders::{bodies, headers};
use reth_interfaces::consensus::{Consensus, ForkchoiceState};

View File

@ -7,7 +7,7 @@ use crate::{
utils::{chainspec::chain_spec_value_parser, init::init_db},
NetworkOpts,
};
use reth_consensus::BeaconConsensus;
use reth_consensus::beacon::BeaconConsensus;
use reth_downloaders::bodies::concurrent::ConcurrentDownloaderBuilder;
use reth_net_nat::NatResolver;

View File

@ -1,5 +1,5 @@
//! Consensus for ethereum network
use crate::verification;
use crate::validation;
use reth_interfaces::consensus::{Consensus, Error, ForkchoiceState};
use reth_primitives::{BlockNumber, ChainSpec, SealedBlock, SealedHeader, H256};
use tokio::sync::{watch, watch::error::SendError};
@ -43,8 +43,8 @@ impl Consensus for BeaconConsensus {
}
fn validate_header(&self, header: &SealedHeader, parent: &SealedHeader) -> Result<(), Error> {
verification::validate_header_standalone(header, &self.chain_spec)?;
verification::validate_header_regarding_parent(parent, header, &self.chain_spec)?;
validation::validate_header_standalone(header, &self.chain_spec)?;
validation::validate_header_regarding_parent(parent, header, &self.chain_spec)?;
if Some(header.number) < self.chain_spec.paris_status().block_number() {
// TODO Consensus checks for old blocks:
@ -55,7 +55,7 @@ impl Consensus for BeaconConsensus {
}
fn pre_validate_block(&self, block: &SealedBlock) -> Result<(), Error> {
verification::validate_block_standalone(block)
validation::validate_block_standalone(block)
}
fn has_block_reward(&self, block_num: BlockNumber) -> bool {

View File

@ -0,0 +1,5 @@
//! Beacon consensus implementation.
mod beacon_consensus;
pub use beacon_consensus::BeaconConsensus;

View File

@ -1 +0,0 @@
//! Reth block execution/validation configuration and constants

View File

@ -9,12 +9,9 @@
//! # Features
//!
//! - `serde`: Enable serde support for configuration types.
pub mod consensus;
pub mod constants;
pub mod verification;
/// Engine API module.
pub mod engine;
/// Beacon consensus implementation.
pub mod beacon;
pub use consensus::BeaconConsensus;
pub use reth_interfaces::consensus::Error;
/// Collection of consensus validation methods.
pub mod validation;

View File

@ -1,4 +1,4 @@
//! ALl functions for verification of block
//! Collection of methods for block validation.
use reth_interfaces::{consensus::Error, Result as RethResult};
use reth_primitives::{
BlockNumber, ChainSpec, Hardfork, Header, SealedBlock, SealedHeader, Transaction,
@ -128,7 +128,7 @@ pub fn validate_transaction_regarding_header(
Ok(())
}
/// Iterate over all transactions, verify them against each other and against the block.
/// Iterate over all transactions, validate them against each other and against the block.
/// There is no gas check done as [REVM](https://github.com/bluealloy/revm/blob/fd0108381799662098b7ab2c429ea719d6dfbf28/crates/revm/src/evm_impl.rs#L113-L131) already checks that.
pub fn validate_all_transaction_regarding_block_and_nonces<
'a,

View File

@ -0,0 +1,30 @@
[package]
name = "reth-rpc-engine-api"
version = "0.1.0"
edition = "2021"
license = "MIT OR Apache-2.0"
repository = "https://github.com/paradigmxyz/reth"
description = "Implementation of Engine API"
[dependencies]
# reth
reth-primitives = { path = "../../primitives" }
reth-interfaces = { path = "../../interfaces" }
reth-provider = { path = "../../storage/provider" }
reth-rlp = { path = "../../common/rlp" }
reth-executor = { path = "../../executor" }
reth-rpc-types = { path = "../rpc-types" }
# async
futures = "0.3"
tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1"
# misc
thiserror = "1.0.37"
[dev-dependencies]
reth-interfaces = { path = "../../interfaces", features = ["test-utils"] }
reth-provider = { path = "../../storage/provider", features = ["test-utils"] }
assert_matches = "1.5.0"
bytes = "1.2"

View File

@ -1,3 +1,4 @@
use crate::{EngineApiError, EngineApiMessage, EngineApiResult};
use futures::StreamExt;
use reth_executor::{
executor,
@ -25,91 +26,38 @@ use std::{
use tokio::sync::oneshot;
use tokio_stream::wrappers::UnboundedReceiverStream;
mod error;
pub use error::{EngineApiError, EngineApiResult};
/// The Engine API response sender
pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
/// The Consensus Engine API is a trait that grants the Consensus layer access to data and functions
/// in the Execution layer that are crucial for the consensus process.
pub trait ConsensusEngine {
/// Called to retrieve the latest state of the network, validate new blocks, and maintain
/// consistency between the Consensus and Execution layers.
fn get_payload(&self, payload_id: H64) -> Option<ExecutionPayload>;
/// When the Consensus layer receives a new block via the consensus gossip protocol,
/// the transactions in the block are sent to the execution layer in the form of a
/// `ExecutionPayload`. The Execution layer executes the transactions and validates the
/// state in the block header, then passes validation data back to Consensus layer, that
/// adds the block to the head of its own blockchain and attests to it. The block is then
/// broadcasted over the consensus p2p network in the form of a "Beacon block".
fn new_payload(&mut self, payload: ExecutionPayload) -> EngineApiResult<PayloadStatus>;
/// Called to resolve chain forks and ensure that the Execution layer is working with the latest
/// valid chain.
fn fork_choice_updated(
&self,
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> EngineApiResult<ForkchoiceUpdated>;
/// Called to verify network configuration parameters and ensure that Consensus and Execution
/// layers are using the latest configuration.
fn exchange_transition_configuration(
&self,
config: TransitionConfiguration,
) -> EngineApiResult<TransitionConfiguration>;
}
/// Message type for communicating with [EthConsensusEngine]
#[derive(Debug)]
pub enum EngineMessage {
/// New payload message
NewPayload(ExecutionPayload, EngineApiSender<PayloadStatus>),
/// Get payload message
GetPayload(H64, EngineApiSender<ExecutionPayload>),
/// Forkchoice updated message
ForkchoiceUpdated(
ForkchoiceState,
Option<PayloadAttributes>,
EngineApiSender<ForkchoiceUpdated>,
),
/// Exchange transition configuration message
ExchangeTransitionConfiguration(
TransitionConfiguration,
EngineApiSender<TransitionConfiguration>,
),
}
/// The consensus engine API implementation
#[must_use = "EthConsensusEngine does nothing unless polled."]
pub struct EthConsensusEngine<Client> {
/// The Engine API implementation that grants the Consensus layer access to data and
/// functions in the Execution layer that are crucial for the consensus process.
#[must_use = "EngineApi does nothing unless polled."]
pub struct EngineApi<Client> {
client: Arc<Client>,
/// Consensus configuration
chain_spec: ChainSpec,
rx: UnboundedReceiverStream<EngineMessage>,
rx: UnboundedReceiverStream<EngineApiMessage>,
// TODO: Placeholder for storing future blocks. Make cache bounded.
// Use [lru](https://crates.io/crates/lru) crate
local_store: HashMap<H64, ExecutionPayload>,
// remote_store: HashMap<H64, ExecutionPayload>,
}
impl<Client: HeaderProvider + BlockProvider + StateProvider> EthConsensusEngine<Client> {
fn on_message(&mut self, msg: EngineMessage) {
impl<Client: HeaderProvider + BlockProvider + StateProvider> EngineApi<Client> {
fn on_message(&mut self, msg: EngineApiMessage) {
match msg {
EngineMessage::GetPayload(payload_id, tx) => {
EngineApiMessage::GetPayload(payload_id, tx) => {
// NOTE: Will always result in `PayloadUnknown` since we don't support block
// building for now.
let _ = tx.send(self.get_payload(payload_id).ok_or(EngineApiError::PayloadUnknown));
}
EngineMessage::NewPayload(payload, tx) => {
EngineApiMessage::NewPayload(payload, tx) => {
let _ = tx.send(self.new_payload(payload));
}
EngineMessage::ForkchoiceUpdated(state, attrs, tx) => {
EngineApiMessage::ForkchoiceUpdated(state, attrs, tx) => {
let _ = tx.send(self.fork_choice_updated(state, attrs));
}
EngineMessage::ExchangeTransitionConfiguration(config, tx) => {
EngineApiMessage::ExchangeTransitionConfiguration(config, tx) => {
let _ = tx.send(self.exchange_transition_configuration(config));
}
}
@ -168,16 +116,20 @@ impl<Client: HeaderProvider + BlockProvider + StateProvider> EthConsensusEngine<
Ok(SealedBlock { header, body: transactions, ommers: Default::default() })
}
}
impl<Client: HeaderProvider + BlockProvider + StateProvider> ConsensusEngine
for EthConsensusEngine<Client>
{
fn get_payload(&self, payload_id: H64) -> Option<ExecutionPayload> {
/// Called to retrieve the latest state of the network, validate new blocks, and maintain
/// consistency between the Consensus and Execution layers.
pub fn get_payload(&self, payload_id: H64) -> Option<ExecutionPayload> {
self.local_store.get(&payload_id).cloned()
}
fn new_payload(&mut self, payload: ExecutionPayload) -> EngineApiResult<PayloadStatus> {
/// When the Consensus layer receives a new block via the consensus gossip protocol,
/// the transactions in the block are sent to the execution layer in the form of a
/// `ExecutionPayload`. The Execution layer executes the transactions and validates the
/// state in the block header, then passes validation data back to Consensus layer, that
/// adds the block to the head of its own blockchain and attests to it. The block is then
/// broadcasted over the consensus p2p network in the form of a "Beacon block".
pub fn new_payload(&mut self, payload: ExecutionPayload) -> EngineApiResult<PayloadStatus> {
let block = match self.try_construct_block(payload) {
Ok(b) => b,
Err(err) => {
@ -239,7 +191,9 @@ impl<Client: HeaderProvider + BlockProvider + StateProvider> ConsensusEngine
}
}
fn fork_choice_updated(
/// Called to resolve chain forks and ensure that the Execution layer is working with the latest
/// valid chain.
pub fn fork_choice_updated(
&self,
fork_choice_state: ForkchoiceState,
_payload_attributes: Option<PayloadAttributes>,
@ -269,7 +223,9 @@ impl<Client: HeaderProvider + BlockProvider + StateProvider> ConsensusEngine
.with_latest_valid_hash(chain_info.best_hash))
}
fn exchange_transition_configuration(
/// Called to verify network configuration parameters and ensure that Consensus and Execution
/// layers are using the latest configuration.
pub fn exchange_transition_configuration(
&self,
config: TransitionConfiguration,
) -> EngineApiResult<TransitionConfiguration> {
@ -319,7 +275,7 @@ impl<Client: HeaderProvider + BlockProvider + StateProvider> ConsensusEngine
}
}
impl<Client> Future for EthConsensusEngine<Client>
impl<Client> Future for EngineApi<Client>
where
Client: HeaderProvider + BlockProvider + StateProvider + Unpin,
{
@ -373,7 +329,7 @@ mod tests {
#[tokio::test]
async fn payload_validation() {
let (_tx, rx) = unbounded_channel();
let engine = EthConsensusEngine {
let engine = EngineApi {
client: Arc::new(MockEthProvider::default()),
chain_spec: MAINNET.clone(),
local_store: Default::default(),
@ -462,7 +418,7 @@ mod tests {
async fn payload_known() {
let (tx, rx) = unbounded_channel();
let client = Arc::new(MockEthProvider::default());
let engine = EthConsensusEngine {
let engine = EngineApi {
client: client.clone(),
chain_spec: MAINNET.clone(),
local_store: Default::default(),
@ -478,7 +434,7 @@ mod tests {
client.add_header(block_hash, block.header.unseal());
let (result_tx, result_rx) = oneshot::channel();
tx.send(EngineMessage::NewPayload(execution_payload, result_tx))
tx.send(EngineApiMessage::NewPayload(execution_payload, result_tx))
.expect("failed to send engine msg");
let result = result_rx.await;
@ -490,7 +446,7 @@ mod tests {
#[tokio::test]
async fn payload_parent_unknown() {
let (tx, rx) = unbounded_channel();
let engine = EthConsensusEngine {
let engine = EngineApi {
client: Arc::new(MockEthProvider::default()),
chain_spec: MAINNET.clone(),
local_store: Default::default(),
@ -501,7 +457,7 @@ mod tests {
let (result_tx, result_rx) = oneshot::channel();
let block = random_block(100, Some(H256::random()), None, Some(0)); // payload must have no ommers
tx.send(EngineMessage::NewPayload(block.into(), result_tx))
tx.send(EngineApiMessage::NewPayload(block.into(), result_tx))
.expect("failed to send engine msg");
let result = result_rx.await;
@ -515,7 +471,7 @@ mod tests {
let (tx, rx) = unbounded_channel();
let chain_spec = MAINNET.clone();
let client = Arc::new(MockEthProvider::default());
let engine = EthConsensusEngine {
let engine = EngineApi {
client: client.clone(),
chain_spec: chain_spec.clone(),
local_store: Default::default(),
@ -534,7 +490,7 @@ mod tests {
client.add_block(parent.hash(), parent.clone().unseal());
tx.send(EngineMessage::NewPayload(block.clone().into(), result_tx))
tx.send(EngineApiMessage::NewPayload(block.clone().into(), result_tx))
.expect("failed to send engine msg");
let result = result_rx.await;
@ -550,7 +506,7 @@ mod tests {
let (tx, rx) = unbounded_channel();
let chain_spec = MAINNET.clone();
let client = Arc::new(MockEthProvider::default());
let engine = EthConsensusEngine {
let engine = EngineApi {
client: client.clone(),
chain_spec: chain_spec.clone(),
local_store: Default::default(),
@ -576,7 +532,7 @@ mod tests {
client.add_block(parent.hash(), parent.clone().unseal());
tx.send(EngineMessage::NewPayload(block.clone().into(), result_tx))
tx.send(EngineApiMessage::NewPayload(block.clone().into(), result_tx))
.expect("failed to send engine msg");
let result = result_rx.await;
@ -604,7 +560,7 @@ mod tests {
#[tokio::test]
async fn payload_unknown() {
let (tx, rx) = unbounded_channel();
let engine = EthConsensusEngine {
let engine = EngineApi {
client: Arc::new(MockEthProvider::default()),
chain_spec: MAINNET.clone(),
local_store: Default::default(),
@ -616,7 +572,7 @@ mod tests {
let payload_id = H64::random();
let (result_tx, result_rx) = oneshot::channel();
tx.send(EngineMessage::GetPayload(payload_id, result_tx))
tx.send(EngineApiMessage::GetPayload(payload_id, result_tx))
.expect("failed to send engine msg");
assert_matches!(result_rx.await, Ok(Err(EngineApiError::PayloadUnknown)));
@ -633,7 +589,7 @@ mod tests {
async fn terminal_td_mismatch() {
let (tx, rx) = unbounded_channel();
let chain_spec = MAINNET.clone();
let engine = EthConsensusEngine {
let engine = EngineApi {
client: Arc::new(MockEthProvider::default()),
chain_spec: chain_spec.clone(),
local_store: Default::default(),
@ -652,7 +608,7 @@ mod tests {
};
let (result_tx, result_rx) = oneshot::channel();
tx.send(EngineMessage::ExchangeTransitionConfiguration(
tx.send(EngineApiMessage::ExchangeTransitionConfiguration(
transition_config.clone(),
result_tx,
))
@ -671,7 +627,7 @@ mod tests {
let (tx, rx) = unbounded_channel();
let client = Arc::new(MockEthProvider::default());
let chain_spec = MAINNET.clone();
let engine = EthConsensusEngine {
let engine = EngineApi {
client: client.clone(),
chain_spec: chain_spec.clone(),
local_store: Default::default(),
@ -695,7 +651,7 @@ mod tests {
// Unknown block number
let (result_tx, result_rx) = oneshot::channel();
tx.send(EngineMessage::ExchangeTransitionConfiguration(
tx.send(EngineApiMessage::ExchangeTransitionConfiguration(
transition_config.clone(),
result_tx,
))
@ -715,7 +671,7 @@ mod tests {
);
let (result_tx, result_rx) = oneshot::channel();
tx.send(EngineMessage::ExchangeTransitionConfiguration(
tx.send(EngineApiMessage::ExchangeTransitionConfiguration(
transition_config.clone(),
result_tx,
))
@ -734,7 +690,7 @@ mod tests {
let (tx, rx) = unbounded_channel();
let client = Arc::new(MockEthProvider::default());
let chain_spec = MAINNET.clone();
let engine = EthConsensusEngine {
let engine = EngineApi {
client: client.clone(),
chain_spec: chain_spec.clone(),
local_store: Default::default(),
@ -758,7 +714,7 @@ mod tests {
client.add_block(terminal_block.hash(), terminal_block.clone().unseal());
let (result_tx, result_rx) = oneshot::channel();
tx.send(EngineMessage::ExchangeTransitionConfiguration(
tx.send(EngineApiMessage::ExchangeTransitionConfiguration(
transition_config.clone(),
result_tx,
))

View File

@ -4,7 +4,7 @@ use thiserror::Error;
/// The Engine API result type
pub type EngineApiResult<Ok> = Result<Ok, EngineApiError>;
/// Error returned by [`EthConsensusEngine`][crate::engine::EthConsensusEngine]
/// Error returned by [`EngineApi`][crate::EngineApi]
#[derive(Error, Debug)]
pub enum EngineApiError {
/// Invalid payload extra data.

View File

@ -0,0 +1,22 @@
#![warn(missing_docs, unreachable_pub)]
#![deny(unused_must_use, rust_2018_idioms)]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
//! The implementation of Engine API.
//! [Read more](https://github.com/ethereum/execution-apis/tree/main/src/engine).
/// The Engine API implementation.
pub mod engine_api;
/// The Engine API message type.
pub mod message;
/// Engine API error.
pub mod error;
pub use engine_api::{EngineApi, EngineApiSender};
pub use error::{EngineApiError, EngineApiResult};
pub use message::EngineApiMessage;

View File

@ -0,0 +1,26 @@
use crate::EngineApiSender;
use reth_interfaces::consensus::ForkchoiceState;
use reth_primitives::H64;
use reth_rpc_types::engine::{
ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, TransitionConfiguration,
};
/// Message type for communicating with [`EngineApi`][crate::EngineApi].
#[derive(Debug)]
pub enum EngineApiMessage {
/// New payload message
NewPayload(ExecutionPayload, EngineApiSender<PayloadStatus>),
/// Get payload message
GetPayload(H64, EngineApiSender<ExecutionPayload>),
/// Forkchoice updated message
ForkchoiceUpdated(
ForkchoiceState,
Option<PayloadAttributes>,
EngineApiSender<ForkchoiceUpdated>,
),
/// Exchange transition configuration message
ExchangeTransitionConfiguration(
TransitionConfiguration,
EngineApiSender<TransitionConfiguration>,
),
}

View File

@ -18,7 +18,7 @@ reth-provider = { path = "../../storage/provider" }
reth-transaction-pool = { path = "../../transaction-pool" }
reth-network = { path = "../network" }
reth-network-api = { path = "../network-api" }
reth-consensus = { path = "../../consensus", features = ["serde"] }
reth-rpc-engine-api = { path = "../rpc-engine-api" }
# rpc
jsonrpsee = { version = "0.16" }

View File

@ -1,10 +1,10 @@
use crate::result::rpc_err;
use async_trait::async_trait;
use jsonrpsee::core::{Error, RpcResult as Result};
use reth_consensus::engine::{EngineApiError, EngineApiResult, EngineMessage};
use reth_interfaces::consensus::ForkchoiceState;
use reth_primitives::H64;
use reth_rpc_api::EngineApiServer;
use reth_rpc_engine_api::{EngineApiError, EngineApiMessage, EngineApiResult};
use reth_rpc_types::engine::{
ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, TransitionConfiguration,
};
@ -16,7 +16,7 @@ use tokio::sync::{
/// The server implementation of Engine API
pub struct EngineApi {
/// Handle to the consensus engine
engine_tx: UnboundedSender<EngineMessage>,
engine_tx: UnboundedSender<EngineApiMessage>,
}
impl std::fmt::Debug for EngineApi {
@ -28,7 +28,7 @@ impl std::fmt::Debug for EngineApi {
impl EngineApi {
async fn delegate_request<T>(
&self,
msg: EngineMessage,
msg: EngineApiMessage,
rx: Receiver<EngineApiResult<T>>,
) -> Result<T> {
let _ = self.engine_tx.send(msg);
@ -49,7 +49,7 @@ impl EngineApiServer for EngineApi {
/// Caution: This should not accept the `withdrawals` field
async fn new_payload_v1(&self, payload: ExecutionPayload) -> Result<PayloadStatus> {
let (tx, rx) = oneshot::channel();
self.delegate_request(EngineMessage::NewPayload(payload, tx), rx).await
self.delegate_request(EngineApiMessage::NewPayload(payload, tx), rx).await
}
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_newpayloadv1>
@ -67,7 +67,7 @@ impl EngineApiServer for EngineApi {
) -> Result<ForkchoiceUpdated> {
let (tx, rx) = oneshot::channel();
self.delegate_request(
EngineMessage::ForkchoiceUpdated(fork_choice_state, payload_attributes, tx),
EngineApiMessage::ForkchoiceUpdated(fork_choice_state, payload_attributes, tx),
rx,
)
.await
@ -87,7 +87,7 @@ impl EngineApiServer for EngineApi {
/// Caution: This should not return the `withdrawals` field
async fn get_payload_v1(&self, payload_id: H64) -> Result<ExecutionPayload> {
let (tx, rx) = oneshot::channel();
self.delegate_request(EngineMessage::GetPayload(payload_id, tx), rx).await
self.delegate_request(EngineApiMessage::GetPayload(payload_id, tx), rx).await
}
/// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/specification.md#engine_getpayloadv2>
@ -101,6 +101,7 @@ impl EngineApiServer for EngineApi {
config: TransitionConfiguration,
) -> Result<TransitionConfiguration> {
let (tx, rx) = oneshot::channel();
self.delegate_request(EngineMessage::ExchangeTransitionConfiguration(config, tx), rx).await
self.delegate_request(EngineApiMessage::ExchangeTransitionConfiguration(config, tx), rx)
.await
}
}