mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat: integrate blobstore in pool (#4266)
This commit is contained in:
@ -65,7 +65,9 @@ use reth_stages::{
|
||||
MetricEventsSender, MetricsListener,
|
||||
};
|
||||
use reth_tasks::TaskExecutor;
|
||||
use reth_transaction_pool::{TransactionPool, TransactionValidationTaskExecutor};
|
||||
use reth_transaction_pool::{
|
||||
blobstore::InMemoryBlobStore, TransactionPool, TransactionValidationTaskExecutor,
|
||||
};
|
||||
use secp256k1::SecretKey;
|
||||
use std::{
|
||||
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
|
||||
@ -269,6 +271,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
|
||||
ctx.task_executor.clone(),
|
||||
1,
|
||||
),
|
||||
InMemoryBlobStore::default(),
|
||||
self.txpool.pool_config(),
|
||||
);
|
||||
info!(target: "reth::cli", "Transaction pool initialized");
|
||||
|
||||
@ -29,6 +29,7 @@
|
||||
//! - providing existing transactions
|
||||
//! - ordering and providing the best transactions for block production
|
||||
//! - monitoring memory footprint and enforce pool size limits
|
||||
//! - storing blob data for transactions in a separate blobstore on insertion
|
||||
//!
|
||||
//! ## Assumptions
|
||||
//!
|
||||
@ -86,6 +87,13 @@
|
||||
//! that provides the `TransactionPool` interface.
|
||||
//!
|
||||
//!
|
||||
//! ## Blob Transactions
|
||||
//!
|
||||
//! Blob transaction can be quite large hence they are stored in a separate blobstore. The pool is
|
||||
//! responsible for inserting blob data for new transactions into the blobstore.
|
||||
//! See also [ValidTransaction](validate::ValidTransaction)
|
||||
//!
|
||||
//!
|
||||
//! ## Examples
|
||||
//!
|
||||
//! Listen for new transactions and print them:
|
||||
@ -95,9 +103,11 @@
|
||||
//! use reth_provider::{ChainSpecProvider, StateProviderFactory};
|
||||
//! use reth_tasks::TokioTaskExecutor;
|
||||
//! use reth_transaction_pool::{TransactionValidationTaskExecutor, Pool, TransactionPool};
|
||||
//! use reth_transaction_pool::blobstore::InMemoryBlobStore;
|
||||
//! async fn t<C>(client: C) where C: StateProviderFactory + ChainSpecProvider + Clone + 'static{
|
||||
//! let pool = Pool::eth_pool(
|
||||
//! TransactionValidationTaskExecutor::eth(client, MAINNET.clone(), TokioTaskExecutor::default()),
|
||||
//! InMemoryBlobStore::default(),
|
||||
//! Default::default(),
|
||||
//! );
|
||||
//! let mut transactions = pool.pending_transactions_listener();
|
||||
@ -120,6 +130,7 @@
|
||||
//! use reth_provider::{BlockReaderIdExt, CanonStateNotification, ChainSpecProvider, StateProviderFactory};
|
||||
//! use reth_tasks::TokioTaskExecutor;
|
||||
//! use reth_transaction_pool::{TransactionValidationTaskExecutor, Pool};
|
||||
//! use reth_transaction_pool::blobstore::InMemoryBlobStore;
|
||||
//! use reth_transaction_pool::maintain::maintain_transaction_pool_future;
|
||||
//! async fn t<C, St>(client: C, stream: St)
|
||||
//! where C: StateProviderFactory + BlockReaderIdExt + ChainSpecProvider + Clone + 'static,
|
||||
@ -127,6 +138,7 @@
|
||||
//! {
|
||||
//! let pool = Pool::eth_pool(
|
||||
//! TransactionValidationTaskExecutor::eth(client.clone(), MAINNET.clone(), TokioTaskExecutor::default()),
|
||||
//! InMemoryBlobStore::default(),
|
||||
//! Default::default(),
|
||||
//! );
|
||||
//!
|
||||
@ -151,6 +163,7 @@ use std::{
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tracing::{instrument, trace};
|
||||
|
||||
use crate::blobstore::BlobStore;
|
||||
pub use crate::{
|
||||
config::{
|
||||
PoolConfig, PriceBumpConfig, SubPoolLimit, DEFAULT_PRICE_BUMP, REPLACE_BLOB_PRICE_BUMP,
|
||||
@ -194,25 +207,26 @@ pub mod test_utils;
|
||||
|
||||
/// A shareable, generic, customizable `TransactionPool` implementation.
|
||||
#[derive(Debug)]
|
||||
pub struct Pool<V: TransactionValidator, T: TransactionOrdering> {
|
||||
pub struct Pool<V, T: TransactionOrdering, S> {
|
||||
/// Arc'ed instance of the pool internals
|
||||
pool: Arc<PoolInner<V, T>>,
|
||||
pool: Arc<PoolInner<V, T, S>>,
|
||||
}
|
||||
|
||||
// === impl Pool ===
|
||||
|
||||
impl<V, T> Pool<V, T>
|
||||
impl<V, T, S> Pool<V, T, S>
|
||||
where
|
||||
V: TransactionValidator,
|
||||
T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
|
||||
S: BlobStore,
|
||||
{
|
||||
/// Create a new transaction pool instance.
|
||||
pub fn new(validator: V, ordering: T, config: PoolConfig) -> Self {
|
||||
Self { pool: Arc::new(PoolInner::new(validator, ordering, config)) }
|
||||
pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
|
||||
Self { pool: Arc::new(PoolInner::new(validator, ordering, blob_store, config)) }
|
||||
}
|
||||
|
||||
/// Returns the wrapped pool.
|
||||
pub(crate) fn inner(&self) -> &PoolInner<V, T> {
|
||||
pub(crate) fn inner(&self) -> &PoolInner<V, T, S> {
|
||||
&self.pool
|
||||
}
|
||||
|
||||
@ -261,13 +275,15 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<Client>
|
||||
impl<Client, S>
|
||||
Pool<
|
||||
TransactionValidationTaskExecutor<EthTransactionValidator<Client, EthPooledTransaction>>,
|
||||
CoinbaseTipOrdering<EthPooledTransaction>,
|
||||
S,
|
||||
>
|
||||
where
|
||||
Client: StateProviderFactory + Clone + 'static,
|
||||
S: BlobStore,
|
||||
{
|
||||
/// Returns a new [Pool] that uses the default [TransactionValidationTaskExecutor] when
|
||||
/// validating [EthPooledTransaction]s and ords via [CoinbaseTipOrdering]
|
||||
@ -279,9 +295,11 @@ where
|
||||
/// use reth_primitives::MAINNET;
|
||||
/// use reth_tasks::TokioTaskExecutor;
|
||||
/// use reth_transaction_pool::{TransactionValidationTaskExecutor, Pool};
|
||||
/// use reth_transaction_pool::blobstore::InMemoryBlobStore;
|
||||
/// # fn t<C>(client: C) where C: StateProviderFactory + Clone + 'static{
|
||||
/// let pool = Pool::eth_pool(
|
||||
/// TransactionValidationTaskExecutor::eth(client, MAINNET.clone(), TokioTaskExecutor::default()),
|
||||
/// InMemoryBlobStore::default(),
|
||||
/// Default::default(),
|
||||
/// );
|
||||
/// # }
|
||||
@ -290,18 +308,20 @@ where
|
||||
validator: TransactionValidationTaskExecutor<
|
||||
EthTransactionValidator<Client, EthPooledTransaction>,
|
||||
>,
|
||||
blob_store: S,
|
||||
config: PoolConfig,
|
||||
) -> Self {
|
||||
Self::new(validator, CoinbaseTipOrdering::default(), config)
|
||||
Self::new(validator, CoinbaseTipOrdering::default(), blob_store, config)
|
||||
}
|
||||
}
|
||||
|
||||
/// implements the `TransactionPool` interface for various transaction pool API consumers.
|
||||
#[async_trait::async_trait]
|
||||
impl<V, T> TransactionPool for Pool<V, T>
|
||||
impl<V, T, S> TransactionPool for Pool<V, T, S>
|
||||
where
|
||||
V: TransactionValidator,
|
||||
T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
|
||||
S: BlobStore,
|
||||
{
|
||||
type Transaction = T::Transaction;
|
||||
|
||||
@ -440,10 +460,11 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<V: TransactionValidator, T: TransactionOrdering> TransactionPoolExt for Pool<V, T>
|
||||
impl<V: TransactionValidator, T: TransactionOrdering, S> TransactionPoolExt for Pool<V, T, S>
|
||||
where
|
||||
V: TransactionValidator,
|
||||
T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
|
||||
S: BlobStore,
|
||||
{
|
||||
#[instrument(skip(self), target = "txpool")]
|
||||
fn set_block_info(&self, info: BlockInfo) {
|
||||
@ -460,7 +481,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<V: TransactionValidator, T: TransactionOrdering> Clone for Pool<V, T> {
|
||||
impl<V, T: TransactionOrdering, S> Clone for Pool<V, T, S> {
|
||||
fn clone(&self) -> Self {
|
||||
Self { pool: Arc::clone(&self.pool) }
|
||||
}
|
||||
|
||||
@ -38,6 +38,16 @@ pub struct TxPoolMetrics {
|
||||
pub(crate) performed_state_updates: Counter,
|
||||
}
|
||||
|
||||
/// Transaction pool blobstore metrics
|
||||
#[derive(Metrics)]
|
||||
#[metrics(scope = "transaction_pool")]
|
||||
pub struct BlobStoreMetrics {
|
||||
/// Number of failed inserts into the blobstore
|
||||
pub(crate) blobstore_failed_inserts: Counter,
|
||||
/// Number of failed deletes into the blobstore
|
||||
pub(crate) blobstore_failed_deletes: Counter,
|
||||
}
|
||||
|
||||
/// Transaction pool maintenance metrics
|
||||
#[derive(Metrics)]
|
||||
#[metrics(scope = "transaction_pool")]
|
||||
|
||||
@ -82,7 +82,7 @@ use crate::{
|
||||
};
|
||||
use best::BestTransactions;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use reth_primitives::{Address, TxHash, H256};
|
||||
use reth_primitives::{Address, BlobTransactionSidecar, TxHash, H256};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
fmt,
|
||||
@ -90,14 +90,15 @@ use std::{
|
||||
time::Instant,
|
||||
};
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::{debug, trace};
|
||||
use tracing::{debug, trace, warn};
|
||||
|
||||
mod events;
|
||||
pub use events::{FullTransactionEvent, TransactionEvent};
|
||||
|
||||
mod listener;
|
||||
use crate::{
|
||||
pool::txpool::UpdateOutcome, traits::PendingTransactionListenerKind, validate::ValidTransaction,
|
||||
blobstore::BlobStore, metrics::BlobStoreMetrics, pool::txpool::UpdateOutcome,
|
||||
traits::PendingTransactionListenerKind, validate::ValidTransaction,
|
||||
};
|
||||
pub use listener::{AllTransactionsEvents, TransactionEvents};
|
||||
|
||||
@ -110,11 +111,16 @@ pub mod txpool;
|
||||
mod update;
|
||||
|
||||
/// Transaction pool internals.
|
||||
pub struct PoolInner<V: TransactionValidator, T: TransactionOrdering> {
|
||||
pub struct PoolInner<V, T, S>
|
||||
where
|
||||
T: TransactionOrdering,
|
||||
{
|
||||
/// Internal mapping of addresses to plain ints.
|
||||
identifiers: RwLock<SenderIdentifiers>,
|
||||
/// Transaction validation.
|
||||
validator: V,
|
||||
/// Storage for blob transactions
|
||||
blob_store: S,
|
||||
/// The internal pool that manages all transactions.
|
||||
pool: RwLock<TxPool<T>>,
|
||||
/// Pool settings.
|
||||
@ -125,17 +131,20 @@ pub struct PoolInner<V: TransactionValidator, T: TransactionOrdering> {
|
||||
pending_transaction_listener: Mutex<Vec<PendingTransactionListener>>,
|
||||
/// Listeners for new transactions added to the pool.
|
||||
transaction_listener: Mutex<Vec<mpsc::Sender<NewTransactionEvent<T::Transaction>>>>,
|
||||
/// Metrics for the blob store
|
||||
blob_store_metrics: BlobStoreMetrics,
|
||||
}
|
||||
|
||||
// === impl PoolInner ===
|
||||
|
||||
impl<V, T> PoolInner<V, T>
|
||||
impl<V, T, S> PoolInner<V, T, S>
|
||||
where
|
||||
V: TransactionValidator,
|
||||
T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
|
||||
S: BlobStore,
|
||||
{
|
||||
/// Create a new transaction pool instance.
|
||||
pub(crate) fn new(validator: V, ordering: T, config: PoolConfig) -> Self {
|
||||
pub(crate) fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
|
||||
Self {
|
||||
identifiers: Default::default(),
|
||||
validator,
|
||||
@ -144,6 +153,8 @@ where
|
||||
pending_transaction_listener: Default::default(),
|
||||
transaction_listener: Default::default(),
|
||||
config,
|
||||
blob_store,
|
||||
blob_store_metrics: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -316,7 +327,8 @@ where
|
||||
let transaction_id = TransactionId::new(sender_id, transaction.nonce());
|
||||
let encoded_length = transaction.encoded_length();
|
||||
|
||||
let (transaction, _maybe_sidecar) = match transaction {
|
||||
// split the valid transaction and the blob sidecar if it has any
|
||||
let (transaction, maybe_sidecar) = match transaction {
|
||||
ValidTransaction::Valid(tx) => (tx, None),
|
||||
ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
|
||||
debug_assert!(
|
||||
@ -339,6 +351,16 @@ where
|
||||
let added = self.pool.write().add_transaction(tx, balance, state_nonce)?;
|
||||
let hash = *added.hash();
|
||||
|
||||
// transaction was successfully inserted into the pool
|
||||
if let Some(sidecar) = maybe_sidecar {
|
||||
// store the sidecar in the blob store
|
||||
self.insert_blob(hash, sidecar);
|
||||
}
|
||||
if let Some(replaced) = added.replaced_blob_transaction() {
|
||||
// delete the replaced transaction from the blob store
|
||||
self.delete_blob(replaced);
|
||||
}
|
||||
|
||||
// Notify about new pending transactions
|
||||
if let Some(pending) = added.as_pending() {
|
||||
self.on_new_pending_transaction(pending);
|
||||
@ -625,9 +647,25 @@ where
|
||||
pub(crate) fn discard_worst(&self) -> HashSet<TxHash> {
|
||||
self.pool.write().discard_worst().into_iter().map(|tx| *tx.hash()).collect()
|
||||
}
|
||||
|
||||
/// Inserts a blob transaction into the blob store
|
||||
fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecar) {
|
||||
if let Err(err) = self.blob_store.insert(hash, blob) {
|
||||
warn!(target: "txpool", ?err, "[{:?}] failed to insert blob", hash);
|
||||
self.blob_store_metrics.blobstore_failed_inserts.increment(1);
|
||||
}
|
||||
}
|
||||
|
||||
/// Delete a blob from the blob store
|
||||
fn delete_blob(&self, blob: TxHash) {
|
||||
if let Err(err) = self.blob_store.delete(blob) {
|
||||
warn!(target: "txpool", ?err, "[{:?}] failed to delete blobs", blob);
|
||||
self.blob_store_metrics.blobstore_failed_deletes.increment(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<V: TransactionValidator, T: TransactionOrdering> fmt::Debug for PoolInner<V, T> {
|
||||
impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
|
||||
}
|
||||
@ -723,6 +761,19 @@ impl<T: PoolTransaction> AddedTransaction<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the the replaced transaction if there was one
|
||||
pub(crate) fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
|
||||
match self {
|
||||
AddedTransaction::Pending(tx) => tx.replaced.as_ref(),
|
||||
AddedTransaction::Parked { replaced, .. } => replaced.as_ref(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the hash of the replaced transaction if it is a blob transaction.
|
||||
pub(crate) fn replaced_blob_transaction(&self) -> Option<H256> {
|
||||
self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
|
||||
}
|
||||
|
||||
/// Returns the hash of the transaction
|
||||
pub(crate) fn hash(&self) -> &TxHash {
|
||||
match self {
|
||||
|
||||
@ -5,15 +5,16 @@ mod mock;
|
||||
mod pool;
|
||||
|
||||
use crate::{
|
||||
noop::MockTransactionValidator, Pool, PoolTransaction, TransactionOrigin,
|
||||
TransactionValidationOutcome, TransactionValidator,
|
||||
blobstore::InMemoryBlobStore, noop::MockTransactionValidator, Pool, PoolTransaction,
|
||||
TransactionOrigin, TransactionValidationOutcome, TransactionValidator,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
pub use mock::*;
|
||||
use std::{marker::PhantomData, sync::Arc};
|
||||
|
||||
/// A [Pool] used for testing
|
||||
pub type TestPool = Pool<MockTransactionValidator<MockTransaction>, MockOrdering>;
|
||||
pub type TestPool =
|
||||
Pool<MockTransactionValidator<MockTransaction>, MockOrdering, InMemoryBlobStore>;
|
||||
|
||||
/// Returns a new [Pool] used for testing purposes
|
||||
pub fn testing_pool() -> TestPool {
|
||||
@ -23,5 +24,5 @@ pub fn testing_pool() -> TestPool {
|
||||
pub fn testing_pool_with_validator(
|
||||
validator: MockTransactionValidator<MockTransaction>,
|
||||
) -> TestPool {
|
||||
Pool::new(validator, MockOrdering::default(), Default::default())
|
||||
Pool::new(validator, MockOrdering::default(), InMemoryBlobStore::default(), Default::default())
|
||||
}
|
||||
|
||||
@ -10,8 +10,9 @@
|
||||
use reth_network::{config::rng_secret_key, NetworkConfig, NetworkManager};
|
||||
use reth_provider::test_utils::NoopProvider;
|
||||
use reth_transaction_pool::{
|
||||
validate::ValidTransaction, CoinbaseTipOrdering, EthPooledTransaction, PoolTransaction,
|
||||
TransactionOrigin, TransactionPool, TransactionValidationOutcome, TransactionValidator,
|
||||
blobstore::InMemoryBlobStore, validate::ValidTransaction, CoinbaseTipOrdering,
|
||||
EthPooledTransaction, PoolTransaction, TransactionOrigin, TransactionPool,
|
||||
TransactionValidationOutcome, TransactionValidator,
|
||||
};
|
||||
|
||||
#[tokio::main]
|
||||
@ -25,6 +26,7 @@ async fn main() -> eyre::Result<()> {
|
||||
let pool = reth_transaction_pool::Pool::new(
|
||||
OkValidator::default(),
|
||||
CoinbaseTipOrdering::default(),
|
||||
InMemoryBlobStore::default(),
|
||||
Default::default(),
|
||||
);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user