Make transaction manager param configurable from cli (#6594)

Co-authored-by: Emilia Hane <elsaemiliaevahane@gmail.com>
Co-authored-by: Bjerg <onbjerg@users.noreply.github.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
back
2024-02-16 14:25:55 -08:00
committed by GitHub
parent 468cbae89c
commit 208eb6d6a1
20 changed files with 265 additions and 47 deletions

1
Cargo.lock generated
View File

@ -6434,6 +6434,7 @@ dependencies = [
"reth-db",
"reth-discv4",
"reth-downloaders",
"reth-eth-wire",
"reth-interfaces",
"reth-metrics",
"reth-net-nat",

17
book/cli/reth/node.md vendored
View File

@ -129,6 +129,18 @@ Networking:
--max-inbound-peers <MAX_INBOUND_PEERS>
Maximum number of inbound requests. default: 30
--pooled-tx-response-soft-limit <BYTES>
Soft limit for the byte size of a [`PooledTransactions`](reth_eth_wire::PooledTransactions) response on assembling a [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) request. Spec'd at 2 MiB.
<https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages>.
[default: 2097152]
--pooled-tx-pack-soft-limit <BYTES>
Default soft limit for the byte size of a [`PooledTransactions`](reth_eth_wire::PooledTransactions) response on assembling a [`GetPooledTransactions`](reth_eth_wire::PooledTransactions) request. This defaults to less than the [`SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE`], at 2 MiB, used when assembling a [`PooledTransactions`](reth_eth_wire::PooledTransactions) response. Default is 128 KiB
[default: 131072]
RPC:
--http
Enable the HTTP-RPC server
@ -331,6 +343,11 @@ TxPool:
[default: 100]
--txpool.max_tx_input_bytes <MAX_TX_INPUT_BYTES>
Maximum size a single transaction can have
[default: 131072]
--txpool.nolocals
Flag to disable local transaction exemptions

View File

@ -1,8 +1,9 @@
//! Builder support for configuring the entire setup.
use crate::{
eth_requests::EthRequestHandler, transactions::TransactionsManager, NetworkHandle,
NetworkManager,
eth_requests::EthRequestHandler,
transactions::{TransactionsManager, TransactionsManagerConfig},
NetworkHandle, NetworkManager,
};
use reth_transaction_pool::TransactionPool;
use tokio::sync::mpsc;
@ -54,12 +55,13 @@ impl<C, Tx, Eth> NetworkBuilder<C, Tx, Eth> {
pub fn transactions<Pool: TransactionPool>(
self,
pool: Pool,
transactions_manager_config: TransactionsManagerConfig,
) -> NetworkBuilder<C, TransactionsManager<Pool>, Eth> {
let NetworkBuilder { mut network, request_handler, .. } = self;
let (tx, rx) = mpsc::unbounded_channel();
network.set_transactions(tx);
let handle = network.handle().clone();
let transactions = TransactionsManager::new(handle, pool, rx);
let transactions = TransactionsManager::new(handle, pool, rx, transactions_manager_config);
NetworkBuilder { network, request_handler, transactions }
}

View File

@ -5,6 +5,7 @@ use crate::{
import::{BlockImport, ProofOfStakeBlockImport},
peers::PeersConfig,
session::SessionsConfig,
transactions::TransactionsManagerConfig,
NetworkHandle, NetworkManager,
};
use reth_discv4::{Discv4Config, Discv4ConfigBuilder, DEFAULT_DISCOVERY_ADDRESS};
@ -74,6 +75,8 @@ pub struct NetworkConfig<C> {
pub extra_protocols: RlpxSubProtocols,
/// Whether to disable transaction gossip
pub tx_gossip_disabled: bool,
/// How to instantiate transactions manager.
pub transactions_manager_config: TransactionsManagerConfig,
/// Optimism Network Config
#[cfg(feature = "optimism")]
pub optimism_network_config: OptimismNetworkConfig,
@ -171,6 +174,8 @@ pub struct NetworkConfigBuilder {
/// The block importer type
#[serde(skip)]
block_import: Option<Box<dyn BlockImport>>,
/// How to instantiate transactions manager.
transactions_manager_config: TransactionsManagerConfig,
/// Optimism Network Config Builder
#[cfg(feature = "optimism")]
optimism_network_config: OptimismNetworkConfigBuilder,
@ -209,6 +214,7 @@ impl NetworkConfigBuilder {
block_import: None,
#[cfg(feature = "optimism")]
optimism_network_config: OptimismNetworkConfigBuilder::default(),
transactions_manager_config: Default::default(),
}
}
@ -274,6 +280,11 @@ impl NetworkConfigBuilder {
self
}
pub fn transactions_manager_config(mut self, config: TransactionsManagerConfig) -> Self {
self.transactions_manager_config = config;
self
}
/// Sets the discovery and listener address
///
/// This is a convenience function for both [NetworkConfigBuilder::listener_addr] and
@ -447,6 +458,7 @@ impl NetworkConfigBuilder {
block_import,
#[cfg(feature = "optimism")]
optimism_network_config: OptimismNetworkConfigBuilder { sequencer_endpoint },
transactions_manager_config,
} = self;
let listener_addr = listener_addr.unwrap_or(DEFAULT_DISCOVERY_ADDRESS);
@ -501,6 +513,7 @@ impl NetworkConfigBuilder {
tx_gossip_disabled,
#[cfg(feature = "optimism")]
optimism_network_config: OptimismNetworkConfig { sequencer_endpoint },
transactions_manager_config,
}
}
}

View File

@ -84,12 +84,13 @@
//!
//! let config =
//! NetworkConfig::builder(local_key).boot_nodes(mainnet_nodes()).build(client.clone());
//! let transactions_manager_config = config.transactions_manager_config.clone();
//!
//! // create the network instance
//! let (handle, network, transactions, request_handler) = NetworkManager::builder(config)
//! .await
//! .unwrap()
//! .transactions(pool)
//! .transactions(pool, transactions_manager_config)
//! .request_handler(client)
//! .split_with_handle();
//! }

View File

@ -179,6 +179,7 @@ where
tx_gossip_disabled,
#[cfg(feature = "optimism")]
optimism_network_config: crate::config::OptimismNetworkConfig { sequencer_endpoint },
..
} = config;
let peers_manager = PeersManager::new(peers_config);
@ -274,12 +275,13 @@ where
///
/// let config =
/// NetworkConfig::builder(local_key).boot_nodes(mainnet_nodes()).build(client.clone());
/// let transactions_manager_config = config.transactions_manager_config.clone();
///
/// // create the network instance
/// let (handle, network, transactions, request_handler) = NetworkManager::builder(config)
/// .await
/// .unwrap()
/// .transactions(pool)
/// .transactions(pool, transactions_manager_config)
/// .request_handler(client)
/// .split_with_handle();
/// }

View File

@ -6,7 +6,7 @@ use crate::{
eth_requests::EthRequestHandler,
peers::PeersHandle,
protocol::IntoRlpxSubProtocol,
transactions::{TransactionsHandle, TransactionsManager},
transactions::{TransactionsHandle, TransactionsManager, TransactionsManagerConfig},
NetworkConfig, NetworkConfigBuilder, NetworkEvent, NetworkEvents, NetworkHandle,
NetworkManager,
};
@ -394,7 +394,12 @@ where
pub fn install_transactions_manager(&mut self, pool: Pool) {
let (tx, rx) = unbounded_channel();
self.network.set_transactions(tx);
let transactions_manager = TransactionsManager::new(self.handle(), pool.clone(), rx);
let transactions_manager = TransactionsManager::new(
self.handle(),
pool.clone(),
rx,
TransactionsManagerConfig::default(),
);
self.transactions_manager = Some(transactions_manager);
self.pool = Some(pool);
}
@ -407,8 +412,12 @@ where
let Self { mut network, request_handler, client, secret_key, .. } = self;
let (tx, rx) = unbounded_channel();
network.set_transactions(tx);
let transactions_manager =
TransactionsManager::new(network.handle().clone(), pool.clone(), rx);
let transactions_manager = TransactionsManager::new(
network.handle().clone(),
pool.clone(),
rx,
TransactionsManagerConfig::default(),
);
Peer {
network,
request_handler,

View File

@ -0,0 +1,35 @@
use super::{
DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_PACK_GET_POOLED_TRANSACTIONS_REQUEST,
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
};
use derive_more::Constructor;
/// Configuration for managing transactions within the network.
#[derive(Debug, Default, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TransactionsManagerConfig {
/// Configuration for fetching transactions.
pub transaction_fetcher_config: TransactionFetcherConfig,
}
/// Configuration for fetching transactions.
#[derive(Debug, Constructor, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TransactionFetcherConfig {
/// Soft limit for the byte size of a
/// [`PooledTransactions`](reth_eth_wire::PooledTransactions) response on assembling a
/// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) request. Spec'd at 2
/// MiB.
pub soft_limit_byte_size_pooled_transactions_response: usize,
/// Soft limit for the byte size of the expected
/// [`PooledTransactions`](reth_eth_wire::PooledTransactions) response on packing a
/// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) request with hashes.
pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
}
impl Default for TransactionFetcherConfig {
fn default() -> Self {
Self { soft_limit_byte_size_pooled_transactions_response: SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE, soft_limit_byte_size_pooled_transactions_response_on_pack_request: DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_PACK_GET_POOLED_TRANSACTIONS_REQUEST
}
}
}

View File

@ -21,8 +21,10 @@ use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError}
use tracing::{debug, trace};
use super::{
config::TransactionFetcherConfig,
constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST},
AnnouncementFilter, Peer, PooledTransactions,
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
};
/// The type responsible for fetching missing transactions from peers.
@ -57,6 +59,15 @@ pub(crate) struct TransactionFetcher {
// === impl TransactionFetcher ===
impl TransactionFetcher {
/// Sets up transaction fetcher with config
pub fn with_transaction_fetcher_config(mut self, config: &TransactionFetcherConfig) -> Self {
self.info.soft_limit_byte_size_pooled_transactions_response =
config.soft_limit_byte_size_pooled_transactions_response;
self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request =
config.soft_limit_byte_size_pooled_transactions_response_on_pack_request;
self
}
/// Removes the specified hashes from inflight tracking.
#[inline]
pub fn remove_hashes_from_transaction_fetcher<I>(&mut self, hashes: I)
@ -191,8 +202,8 @@ impl TransactionFetcher {
hashes_to_request.push(hash);
// tx is really big, pack request with single tx
if size >= DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_PACK_GET_POOLED_TRANSACTIONS_REQUEST {
return hashes_from_announcement_iter.collect::<RequestTxHashes>()
if size >= self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request {
return hashes_from_announcement_iter.collect::<RequestTxHashes>();
} else {
acc_size_response = size;
}
@ -211,7 +222,9 @@ impl TransactionFetcher {
let next_acc_size = acc_size_response + size;
if next_acc_size <= DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_PACK_GET_POOLED_TRANSACTIONS_REQUEST {
if next_acc_size <=
self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request
{
// only update accumulated size of tx response if tx will fit in without exceeding
// soft limit
acc_size_response = next_acc_size;
@ -221,7 +234,8 @@ impl TransactionFetcher {
}
let free_space =
DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_PACK_GET_POOLED_TRANSACTIONS_REQUEST - acc_size_response;
self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request -
acc_size_response;
if free_space < MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED {
break
@ -966,17 +980,37 @@ impl Future for GetPooledTxRequestFut {
pub struct TransactionFetcherInfo {
/// Currently active outgoing [`GetPooledTransactions`] requests.
pub(super) max_inflight_requests: usize,
/// Soft limit for the byte size of the expected
/// [`PooledTransactions`] response on packing a
/// [`GetPooledTransactions`] request with hashes.
soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
/// Soft limit for the byte size of a [`PooledTransactions`]
/// response on assembling a [`GetPooledTransactions`]
/// request. Spec'd at 2 MiB.
pub(super) soft_limit_byte_size_pooled_transactions_response: usize,
}
impl TransactionFetcherInfo {
pub fn new(max_inflight_transaction_requests: usize) -> Self {
Self { max_inflight_requests: max_inflight_transaction_requests }
pub fn new(
max_inflight_transaction_requests: usize,
soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
soft_limit_byte_size_pooled_transactions_response: usize,
) -> Self {
Self {
max_inflight_requests: max_inflight_transaction_requests,
soft_limit_byte_size_pooled_transactions_response_on_pack_request,
soft_limit_byte_size_pooled_transactions_response,
}
}
}
impl Default for TransactionFetcherInfo {
fn default() -> Self {
Self::new(DEFAULT_MAX_COUNT_INFLIGHT_REQUESTS_ON_FETCH_PENDING_HASHES)
Self::new(
DEFAULT_MAX_COUNT_INFLIGHT_REQUESTS_ON_FETCH_PENDING_HASHES,
DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_PACK_GET_POOLED_TRANSACTIONS_REQUEST,
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE
)
}
}

View File

@ -69,18 +69,21 @@ use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use tracing::{debug, trace};
mod config;
mod constants;
mod fetcher;
mod validation;
pub use config::{TransactionFetcherConfig, TransactionsManagerConfig};
use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
pub use validation::*;
use self::constants::{
tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE,
pub use self::constants::{
tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_PACK_GET_POOLED_TRANSACTIONS_REQUEST,
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
};
use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
/// The future for inserting a function into the pool
pub type PoolImportFuture = Pin<Box<dyn Future<Output = Vec<PoolResult<TxHash>>> + Send + 'static>>;
@ -237,11 +240,14 @@ impl<Pool: TransactionPool> TransactionsManager<Pool> {
network: NetworkHandle,
pool: Pool,
from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent>,
transactions_manager_config: TransactionsManagerConfig,
) -> Self {
let network_events = network.event_listener();
let (command_tx, command_rx) = mpsc::unbounded_channel();
let transaction_fetcher = TransactionFetcher::default();
let transaction_fetcher = TransactionFetcher::default().with_transaction_fetcher_config(
&transactions_manager_config.transaction_fetcher_config,
);
// install a listener for new pending transactions that are allowed to be propagated over
// the network
@ -326,7 +332,7 @@ where
let transactions = self.pool.get_pooled_transaction_elements(
request.0,
GetPooledTransactionLimit::ResponseSizeSoftLimit(
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
),
);
@ -1503,11 +1509,12 @@ mod tests {
let pool = testing_pool();
let transactions_manager_config = config.transactions_manager_config.clone();
let (_network_handle, _network, transactions, _) = NetworkManager::new(config)
.await
.unwrap()
.into_builder()
.transactions(pool.clone())
.transactions(pool.clone(), transactions_manager_config)
.split_with_handle();
transactions
@ -1553,11 +1560,12 @@ mod tests {
.disable_discovery()
.listener_port(0)
.build(client);
let transactions_manager_config = config.transactions_manager_config.clone();
let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
.await
.unwrap()
.into_builder()
.transactions(pool.clone())
.transactions(pool.clone(), transactions_manager_config)
.split_with_handle();
tokio::task::spawn(network);
@ -1635,11 +1643,12 @@ mod tests {
.disable_discovery()
.listener_port(0)
.build(client);
let transactions_manager_config = config.transactions_manager_config.clone();
let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
.await
.unwrap()
.into_builder()
.transactions(pool.clone())
.transactions(pool.clone(), transactions_manager_config)
.split_with_handle();
tokio::task::spawn(network);
@ -1723,11 +1732,12 @@ mod tests {
.disable_discovery()
.listener_port(0)
.build(client);
let transactions_manager_config = config.transactions_manager_config.clone();
let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
.await
.unwrap()
.into_builder()
.transactions(pool.clone())
.transactions(pool.clone(), transactions_manager_config)
.split_with_handle();
tokio::task::spawn(network);
@ -1812,11 +1822,12 @@ mod tests {
.disable_discovery()
.listener_port(0)
.build(client);
let transactions_manager_config = config.transactions_manager_config.clone();
let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
.await
.unwrap()
.into_builder()
.transactions(pool.clone())
.transactions(pool.clone(), transactions_manager_config)
.split_with_handle();
tokio::task::spawn(network);

View File

@ -260,12 +260,13 @@ async fn test_connect_to_trusted_peer() {
let client = NoopProvider::default();
let config = NetworkConfigBuilder::new(secret_key).discovery(discv4).build(client);
let transactions_manager_config = config.transactions_manager_config.clone();
let (handle, network, transactions, requests) = NetworkManager::new(config)
.await
.unwrap()
.into_builder()
.request_handler(client)
.transactions(testing_pool())
.transactions(testing_pool(), transactions_manager_config)
.split_with_handle();
let mut events = handle.event_listener();

View File

@ -23,7 +23,10 @@ use reth_db::{
database_metrics::{DatabaseMetadata, DatabaseMetrics},
};
use reth_interfaces::p2p::either::EitherDownloader;
use reth_network::{NetworkBuilder, NetworkEvents, NetworkHandle};
use reth_network::{
transactions::{TransactionFetcherConfig, TransactionsManagerConfig},
NetworkBuilder, NetworkEvents, NetworkHandle,
};
use reth_node_core::{
cli::config::{PayloadBuilderConfig, RethRpcConfig, RethTransactionPoolConfig},
dirs::{ChainPath, DataDirPath},
@ -731,8 +734,20 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
where
Pool: TransactionPool + Unpin + 'static,
{
let (handle, network, txpool, eth) =
builder.transactions(pool).request_handler(self.provider().clone()).split_with_handle();
let (handle, network, txpool, eth) = builder
.transactions(
pool,
TransactionsManagerConfig {
transaction_fetcher_config: TransactionFetcherConfig::new(
self.config.network.soft_limit_byte_size_pooled_transactions_response,
self.config
.network
.soft_limit_byte_size_pooled_transactions_response_on_pack_request,
),
},
)
.request_handler(self.provider().clone())
.split_with_handle();
self.executor.spawn_critical("p2p txpool", txpool);
self.executor.spawn_critical("p2p eth request handler", eth);

View File

@ -46,6 +46,7 @@ reth-blockchain-tree.workspace = true
revm-inspectors.workspace = true
reth-snapshot.workspace = true
reth-optimism-payload-builder = { workspace = true, optional = true }
reth-eth-wire.workspace = true
# async
tokio.workspace = true

View File

@ -5,7 +5,14 @@ use clap::Args;
use reth_config::Config;
use reth_discv4::{DEFAULT_DISCOVERY_ADDR, DEFAULT_DISCOVERY_PORT};
use reth_net_nat::NatResolver;
use reth_network::{HelloMessageWithProtocols, NetworkConfigBuilder};
use reth_network::{
transactions::{
TransactionFetcherConfig, TransactionsManagerConfig,
DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_PACK_GET_POOLED_TRANSACTIONS_REQUEST,
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
},
HelloMessageWithProtocols, NetworkConfigBuilder,
};
use reth_primitives::{mainnet_nodes, ChainSpec, NodeRecord};
use secp256k1::SecretKey;
use std::{net::Ipv4Addr, path::PathBuf, sync::Arc};
@ -73,6 +80,23 @@ pub struct NetworkArgs {
/// Maximum number of inbound requests. default: 30
#[arg(long)]
pub max_inbound_peers: Option<usize>,
/// Soft limit for the byte size of a [`PooledTransactions`](reth_eth_wire::PooledTransactions)
/// response on assembling a [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions)
/// request. Spec'd at 2 MiB.
///
/// <https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages>.
#[arg(long = "pooled-tx-response-soft-limit", value_name = "BYTES", default_value_t = SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE, help = "Sets the soft limit for the byte size of pooled transactions response. Specified at 2 MiB by default. This is a spec'd value that should only be set for experimental purposes on a testnet.")]
pub soft_limit_byte_size_pooled_transactions_response: usize,
/// Default soft limit for the byte size of a
/// [`PooledTransactions`](reth_eth_wire::PooledTransactions) response on assembling a
/// [`GetPooledTransactions`](reth_eth_wire::PooledTransactions) request. This defaults to less
/// than the [`SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE`], at 2 MiB, used when
/// assembling a [`PooledTransactions`](reth_eth_wire::PooledTransactions) response. Default
/// is 128 KiB.
#[arg(long = "pooled-tx-pack-soft-limit", value_name = "BYTES", default_value_t = DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_PACK_GET_POOLED_TRANSACTIONS_REQUEST)]
pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
}
impl NetworkArgs {
@ -98,12 +122,21 @@ impl NetworkArgs {
.with_max_inbound_opt(self.max_inbound_peers)
.with_max_outbound_opt(self.max_outbound_peers);
// Configure transactions manager
let transactions_manager_config = TransactionsManagerConfig {
transaction_fetcher_config: TransactionFetcherConfig::new(
self.soft_limit_byte_size_pooled_transactions_response,
self.soft_limit_byte_size_pooled_transactions_response_on_pack_request,
),
};
// Configure basic network stack
let mut network_config_builder = config
.network_config(self.nat, self.persistent_peers_file(peers_file), secret_key)
.peer_config(peer_config)
.boot_nodes(self.bootnodes.clone().unwrap_or(chain_bootnodes))
.chain_spec(chain_spec);
.chain_spec(chain_spec)
.transactions_manager_config(transactions_manager_config);
// Configure node identity
let peer_id = network_config_builder.get_peer_id();
@ -155,6 +188,9 @@ impl Default for NetworkArgs {
port: DEFAULT_DISCOVERY_PORT,
max_outbound_peers: None,
max_inbound_peers: None,
soft_limit_byte_size_pooled_transactions_response:
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
soft_limit_byte_size_pooled_transactions_response_on_pack_request: DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_PACK_GET_POOLED_TRANSACTIONS_REQUEST,
}
}
}

View File

@ -4,8 +4,8 @@ use crate::cli::config::RethTransactionPoolConfig;
use clap::Args;
use reth_primitives::Address;
use reth_transaction_pool::{
LocalTransactionConfig, PoolConfig, PriceBumpConfig, SubPoolLimit, DEFAULT_PRICE_BUMP,
REPLACE_BLOB_PRICE_BUMP, TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER,
validate::DEFAULT_MAX_TX_INPUT_BYTES, LocalTransactionConfig, PoolConfig, PriceBumpConfig,
SubPoolLimit, DEFAULT_PRICE_BUMP, REPLACE_BLOB_PRICE_BUMP, TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER,
TXPOOL_SUBPOOL_MAX_SIZE_MB_DEFAULT, TXPOOL_SUBPOOL_MAX_TXS_DEFAULT,
};
/// Parameters for debugging purposes
@ -44,6 +44,11 @@ pub struct TxPoolArgs {
/// Price bump percentage to replace an already existing blob transaction
#[arg(long = "blobpool.pricebump", default_value_t = REPLACE_BLOB_PRICE_BUMP)]
pub blob_transaction_price_bump: u128,
/// Max size in bytes of a single transaction allowed to enter the pool
#[arg(long = "txpool.max_tx_input_bytes", default_value_t = DEFAULT_MAX_TX_INPUT_BYTES)]
pub max_tx_input_bytes: usize,
/// Flag to disable local transaction exemptions.
#[arg(long = "txpool.nolocals")]
pub no_locals: bool,
@ -67,6 +72,7 @@ impl Default for TxPoolArgs {
max_account_slots: TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER,
price_bump: DEFAULT_PRICE_BUMP,
blob_transaction_price_bump: REPLACE_BLOB_PRICE_BUMP,
max_tx_input_bytes: DEFAULT_MAX_TX_INPUT_BYTES,
no_locals: false,
locals: Default::default(),
no_local_transactions_propagation: false,

View File

@ -35,7 +35,10 @@ use reth_interfaces::{
},
RethResult,
};
use reth_network::{NetworkBuilder, NetworkConfig, NetworkHandle, NetworkManager};
use reth_network::{
transactions::{TransactionFetcherConfig, TransactionsManagerConfig},
NetworkBuilder, NetworkConfig, NetworkHandle, NetworkManager,
};
use reth_node_api::ConfigureEvmEnv;
use reth_primitives::{
constants::eip4844::{LoadKzgSettingsError, MAINNET_KZG_TRUSTED_SETUP},
@ -470,6 +473,8 @@ impl NodeConfig {
.kzg_settings(self.kzg_settings()?)
// use an additional validation task so we can validate transactions in parallel
.with_additional_tasks(1)
// set the max tx size in bytes allowed to enter the pool
.with_max_tx_input_bytes(self.txpool.max_tx_input_bytes)
.build_with_tasks(blockchain_db.clone(), executor.clone(), blob_store.clone());
let transaction_pool =
@ -625,8 +630,19 @@ impl NodeConfig {
C: BlockReader + HeaderProvider + Clone + Unpin + 'static,
Pool: TransactionPool + Unpin + 'static,
{
let (handle, network, txpool, eth) =
builder.transactions(pool).request_handler(client).split_with_handle();
let (handle, network, txpool, eth) = builder
.transactions(
pool, // Configure transactions manager
TransactionsManagerConfig {
transaction_fetcher_config: TransactionFetcherConfig::new(
self.network.soft_limit_byte_size_pooled_transactions_response,
self.network
.soft_limit_byte_size_pooled_transactions_response_on_pack_request,
),
},
)
.request_handler(client)
.split_with_handle();
task_executor.spawn_critical("p2p txpool", txpool);
task_executor.spawn_critical("p2p eth request handler", eth);

View File

@ -4,11 +4,12 @@
/// O(maxslots), where max slots are 4 currently).
pub const TX_SLOT_BYTE_SIZE: usize = 32 * 1024;
/// [`MAX_TX_INPUT_BYTES`] is the maximum size a single transaction can have. This field has
/// non-trivial consequences: larger transactions are significantly harder and
/// [`DEFAULT_MAX_TX_INPUT_BYTES`] is the default maximum size a single transaction can have. This
/// field has non-trivial consequences: larger transactions are significantly harder and
/// more expensive to propagate; larger transactions also take more resources
/// to validate whether they fit into the pool or not.
pub const MAX_TX_INPUT_BYTES: usize = 4 * TX_SLOT_BYTE_SIZE; // 128KB
/// to validate whether they fit into the pool or not. Default is 4 times [`TX_SLOT_BYTE_SIZE`],
/// which defaults to 32 KiB, so 128 KiB.
pub const DEFAULT_MAX_TX_INPUT_BYTES: usize = 4 * TX_SLOT_BYTE_SIZE; // 128KB
/// Maximum bytecode to permit for a contract.
pub const MAX_CODE_BYTE_SIZE: usize = 24576;

View File

@ -4,7 +4,7 @@ use crate::{
blobstore::BlobStore,
error::{Eip4844PoolTransactionError, InvalidPoolTransactionError},
traits::TransactionOrigin,
validate::{ValidTransaction, ValidationTask, MAX_INIT_CODE_BYTE_SIZE, MAX_TX_INPUT_BYTES},
validate::{ValidTransaction, ValidationTask, MAX_INIT_CODE_BYTE_SIZE},
EthBlobTransactionSidecar, EthPoolTransaction, LocalTransactionConfig, PoolTransaction,
TransactionValidationOutcome, TransactionValidationTaskExecutor, TransactionValidator,
};
@ -29,6 +29,8 @@ use tokio::sync::Mutex;
#[cfg(feature = "optimism")]
use reth_revm::optimism::RethL1BlockInfo;
use super::constants::DEFAULT_MAX_TX_INPUT_BYTES;
/// Validator for Ethereum transactions.
#[derive(Debug, Clone)]
pub struct EthTransactionValidator<Client, T> {
@ -118,6 +120,8 @@ pub(crate) struct EthTransactionValidatorInner<Client, T> {
kzg_settings: Arc<KzgSettings>,
/// How to handle [TransactionOrigin::Local](TransactionOrigin) transactions.
local_transactions_config: LocalTransactionConfig,
/// Maximum size in bytes a single transaction can have in order to be accepted into the pool.
max_tx_input_bytes: usize,
/// Marker for the transaction type
_marker: PhantomData<T>,
}
@ -192,11 +196,11 @@ where
};
// Reject transactions over defined size to prevent DOS attacks
if transaction.size() > MAX_TX_INPUT_BYTES {
if transaction.size() > self.max_tx_input_bytes {
let size = transaction.size();
return TransactionValidationOutcome::Invalid(
transaction,
InvalidPoolTransactionError::OversizedData(size, MAX_TX_INPUT_BYTES),
InvalidPoolTransactionError::OversizedData(size, self.max_tx_input_bytes),
)
}
@ -480,6 +484,8 @@ pub struct EthTransactionValidatorBuilder {
kzg_settings: Arc<KzgSettings>,
/// How to handle [TransactionOrigin::Local](TransactionOrigin) transactions.
local_transactions_config: LocalTransactionConfig,
/// Max size in bytes of a single transaction allowed
max_tx_input_bytes: usize,
}
impl EthTransactionValidatorBuilder {
@ -495,6 +501,7 @@ impl EthTransactionValidatorBuilder {
additional_tasks: 1,
kzg_settings: Arc::clone(&MAINNET_KZG_TRUSTED_SETUP),
local_transactions_config: Default::default(),
max_tx_input_bytes: DEFAULT_MAX_TX_INPUT_BYTES,
// by default all transaction types are allowed
eip2718: true,
@ -589,6 +596,12 @@ impl EthTransactionValidatorBuilder {
self
}
/// Sets a max size in bytes of a single transaction allowed into the pool
pub const fn with_max_tx_input_bytes(mut self, max_tx_input_bytes: usize) -> Self {
self.max_tx_input_bytes = max_tx_input_bytes;
self
}
/// Builds a the [EthTransactionValidator] without spawning validator tasks.
pub fn build<Client, Tx, S>(
self,
@ -609,6 +622,7 @@ impl EthTransactionValidatorBuilder {
minimum_priority_fee,
kzg_settings,
local_transactions_config,
max_tx_input_bytes,
..
} = self;
@ -627,6 +641,7 @@ impl EthTransactionValidatorBuilder {
blob_store: Box::new(blob_store),
kzg_settings,
local_transactions_config,
max_tx_input_bytes,
_marker: Default::default(),
};

View File

@ -23,7 +23,7 @@ pub use task::{TransactionValidationTaskExecutor, ValidationTask};
/// Validation constants.
pub use constants::{
MAX_CODE_BYTE_SIZE, MAX_INIT_CODE_BYTE_SIZE, MAX_TX_INPUT_BYTES, TX_SLOT_BYTE_SIZE,
DEFAULT_MAX_TX_INPUT_BYTES, MAX_CODE_BYTE_SIZE, MAX_INIT_CODE_BYTE_SIZE, TX_SLOT_BYTE_SIZE,
};
/// A Result type returned after checking a transaction's validity.

View File

@ -35,10 +35,12 @@ async fn main() -> eyre::Result<()> {
// Configure the network
let config = NetworkConfig::builder(local_key).mainnet_boot_nodes().build(client);
let transactions_manager_config = config.transactions_manager_config.clone();
// create the network instance
let (_handle, network, txpool, _) =
NetworkManager::builder(config).await?.transactions(pool.clone()).split_with_handle();
let (_handle, network, txpool, _) = NetworkManager::builder(config)
.await?
.transactions(pool.clone(), transactions_manager_config)
.split_with_handle();
// spawn the network task
tokio::task::spawn(network);