feat(txpool): add pool manage task (#2298)

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
Matthias Seitz
2023-04-26 11:10:02 +02:00
committed by GitHub
parent 607186de71
commit 4763aad11e
10 changed files with 450 additions and 53 deletions

2
Cargo.lock generated
View File

@ -5447,6 +5447,8 @@ dependencies = [
"parking_lot 0.12.1", "parking_lot 0.12.1",
"paste", "paste",
"rand 0.8.5", "rand 0.8.5",
"reth-consensus-common",
"reth-interfaces",
"reth-metrics-derive", "reth-metrics-derive",
"reth-primitives", "reth-primitives",
"reth-provider", "reth-provider",

View File

@ -4,9 +4,9 @@ use crate::PostState;
use reth_interfaces::{executor::Error as ExecError, Error}; use reth_interfaces::{executor::Error as ExecError, Error};
use reth_primitives::{ use reth_primitives::{
BlockHash, BlockNumHash, BlockNumber, ForkBlock, Receipt, SealedBlock, SealedBlockWithSenders, BlockHash, BlockNumHash, BlockNumber, ForkBlock, Receipt, SealedBlock, SealedBlockWithSenders,
TxHash, TransactionSigned, TxHash,
}; };
use std::collections::BTreeMap; use std::{borrow::Cow, collections::BTreeMap};
/// A chain of blocks and their final state. /// A chain of blocks and their final state.
/// ///
@ -64,8 +64,14 @@ impl Chain {
/// Destructure the chain into its inner components, the blocks and the state at the tip of the /// Destructure the chain into its inner components, the blocks and the state at the tip of the
/// chain. /// chain.
pub fn into_inner(self) -> (ChainBlocks, PostState) { pub fn into_inner(self) -> (ChainBlocks<'static>, PostState) {
(ChainBlocks { blocks: self.blocks }, self.state) (ChainBlocks { blocks: Cow::Owned(self.blocks) }, self.state)
}
/// Destructure the chain into its inner components, the blocks and the state at the tip of the
/// chain.
pub fn inner(&self) -> (ChainBlocks<'_>, &PostState) {
(ChainBlocks { blocks: Cow::Borrowed(&self.blocks) }, &self.state)
} }
/// Get the block at which this chain forked. /// Get the block at which this chain forked.
@ -202,16 +208,16 @@ impl Chain {
/// All blocks in the chain /// All blocks in the chain
#[derive(Clone, Debug, Default, PartialEq, Eq)] #[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ChainBlocks { pub struct ChainBlocks<'a> {
blocks: BTreeMap<BlockNumber, SealedBlockWithSenders>, blocks: Cow<'a, BTreeMap<BlockNumber, SealedBlockWithSenders>>,
} }
impl ChainBlocks { impl<'a> ChainBlocks<'a> {
/// Creates a consuming iterator over all blocks in the chain with increasing block number. /// Creates a consuming iterator over all blocks in the chain with increasing block number.
/// ///
/// Note: this always yields at least one block. /// Note: this always yields at least one block.
pub fn into_blocks(self) -> impl Iterator<Item = SealedBlockWithSenders> { pub fn into_blocks(self) -> impl Iterator<Item = SealedBlockWithSenders> {
self.blocks.into_values() self.blocks.into_owned().into_values()
} }
/// Creates an iterator over all blocks in the chain with increasing block number. /// Creates an iterator over all blocks in the chain with increasing block number.
@ -227,14 +233,29 @@ impl ChainBlocks {
pub fn tip(&self) -> &SealedBlockWithSenders { pub fn tip(&self) -> &SealedBlockWithSenders {
self.blocks.last_key_value().expect("Chain should have at least one block").1 self.blocks.last_key_value().expect("Chain should have at least one block").1
} }
/// Get the _first_ block of the chain.
///
/// # Note
///
/// Chains always have at least one block.
pub fn first(&self) -> &SealedBlockWithSenders {
self.blocks.first_key_value().expect("Chain should have at least one block").1
}
/// Returns an iterator over all transactions in the chain.
pub fn transactions(&self) -> impl Iterator<Item = &TransactionSigned> + '_ {
self.blocks.values().flat_map(|block| block.body.iter())
}
} }
impl IntoIterator for ChainBlocks { impl<'a> IntoIterator for ChainBlocks<'a> {
type Item = (BlockNumber, SealedBlockWithSenders); type Item = (BlockNumber, SealedBlockWithSenders);
type IntoIter = std::collections::btree_map::IntoIter<BlockNumber, SealedBlockWithSenders>; type IntoIter = std::collections::btree_map::IntoIter<BlockNumber, SealedBlockWithSenders>;
fn into_iter(self) -> Self::IntoIter { fn into_iter(self) -> Self::IntoIter {
self.blocks.into_iter() #[allow(clippy::unnecessary_to_owned)]
self.blocks.into_owned().into_iter()
} }
} }

View File

@ -6,5 +6,7 @@ use reth_primitives::{Account, Address};
#[auto_impl(&, Arc, Box)] #[auto_impl(&, Arc, Box)]
pub trait AccountProvider: Send + Sync { pub trait AccountProvider: Send + Sync {
/// Get basic account information. /// Get basic account information.
///
/// Returns `None` if the account doesn't exist.
fn basic_account(&self, address: Address) -> Result<Option<Account>>; fn basic_account(&self, address: Address) -> Result<Option<Account>>;
} }

View File

@ -28,6 +28,8 @@ pub enum CanonStateNotification {
/// Chain reorgs and both old and new chain are returned. /// Chain reorgs and both old and new chain are returned.
Reorg { old: Arc<Chain>, new: Arc<Chain> }, Reorg { old: Arc<Chain>, new: Arc<Chain> },
/// Chain got reverted without reorg and only old chain is returned. /// Chain got reverted without reorg and only old chain is returned.
///
/// This reverts the chain's tip to the first block of the chain.
Revert { old: Arc<Chain> }, Revert { old: Arc<Chain> },
/// Chain got extended without reorg and only new chain is returned. /// Chain got extended without reorg and only new chain is returned.
Commit { new: Arc<Chain> }, Commit { new: Arc<Chain> },

View File

@ -18,8 +18,10 @@ normal = [
[dependencies] [dependencies]
# reth # reth
reth-consensus-common = { path = "../consensus/common" }
reth-primitives = { path = "../primitives" } reth-primitives = { path = "../primitives" }
reth-provider = { path = "../storage/provider" } reth-provider = { path = "../storage/provider" }
reth-interfaces = { path = "../interfaces" }
reth-rlp = { path = "../rlp" } reth-rlp = { path = "../rlp" }
# async/futures # async/futures

View File

@ -83,8 +83,9 @@ pub use crate::{
config::PoolConfig, config::PoolConfig,
ordering::{CostOrdering, TransactionOrdering}, ordering::{CostOrdering, TransactionOrdering},
traits::{ traits::{
BestTransactions, OnNewBlockEvent, PoolTransaction, PooledTransaction, PropagateKind, BestTransactions, BlockInfo, CanonicalStateUpdate, ChangedAccount, PoolTransaction,
PropagatedTransactions, TransactionOrigin, TransactionPool, PooledTransaction, PropagateKind, PropagatedTransactions, TransactionOrigin,
TransactionPool,
}, },
validate::{ validate::{
EthTransactionValidator, TransactionValidationOutcome, TransactionValidator, EthTransactionValidator, TransactionValidationOutcome, TransactionValidator,
@ -104,6 +105,7 @@ use tokio::sync::mpsc::Receiver;
mod config; mod config;
pub mod error; pub mod error;
mod identifier; mod identifier;
pub mod maintain;
pub mod metrics; pub mod metrics;
mod ordering; mod ordering;
pub mod pool; pub mod pool;
@ -225,12 +227,16 @@ where
{ {
type Transaction = T::Transaction; type Transaction = T::Transaction;
fn status(&self) -> PoolSize { fn pool_size(&self) -> PoolSize {
self.pool.size() self.pool.size()
} }
fn on_new_block(&self, event: OnNewBlockEvent) { fn block_info(&self) -> BlockInfo {
self.pool.on_new_block(event); self.pool.block_info()
}
fn on_canonical_state_change(&self, update: CanonicalStateUpdate) {
self.pool.on_canonical_state_change(update);
} }
async fn add_transaction( async fn add_transaction(

View File

@ -0,0 +1,284 @@
//! Support for maintaining the state of the transaction pool
use crate::{
traits::{CanonicalStateUpdate, ChangedAccount},
Pool, TransactionOrdering, TransactionPool, TransactionValidator,
};
use futures_util::{Stream, StreamExt};
use reth_consensus_common::validation::calculate_next_block_base_fee;
use reth_primitives::{Address, BlockHash, FromRecoveredTransaction};
use reth_provider::{BlockProvider, CanonStateNotification, PostState, StateProviderFactory};
use std::{
borrow::Borrow,
collections::HashSet,
hash::{Hash, Hasher},
};
use tracing::warn;
/// Maintains the state of the transaction pool by handling new blocks and reorgs.
///
/// This listens for any new blocks and reorgs and updates the transaction pool's state accordingly
pub async fn maintain_transaction_pool<Client, V, T, St>(
client: Client,
pool: Pool<V, T>,
mut events: St,
) where
Client: StateProviderFactory + BlockProvider + 'static,
V: TransactionValidator,
T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
St: Stream<Item = CanonStateNotification> + Unpin + 'static,
{
// TODO set current head for the pool
// keeps track of any dirty accounts that we failed to fetch the state for and need to retry
let mut dirty = HashSet::new();
// Listen for new chain events and derive the update action for the pool
while let Some(event) = events.next().await {
let pool_info = pool.block_info();
// TODO check dirty accounts from time to time
match event {
CanonStateNotification::Reorg { old, new } => {
let (old_blocks, old_state) = old.inner();
let (new_blocks, new_state) = new.inner();
let new_tip = new_blocks.tip();
// base fee for the next block: `new_tip+1`
let pending_block_base_fee = calculate_next_block_base_fee(
new_tip.gas_used,
new_tip.gas_limit,
new_tip.base_fee_per_gas.unwrap_or_default(),
) as u128;
// we know all changed account in the new chain
let new_changed_accounts: HashSet<_> =
changed_accounts_iter(new_state).map(ChangedAccountEntry).collect();
// find all accounts that were changed in the old chain but _not_ in the new chain
let missing_changed_acc = old_state
.accounts()
.keys()
.filter(|addr| !new_changed_accounts.contains(*addr))
.copied();
// for these we need to fetch the nonce+balance from the db at the new tip
let mut changed_accounts =
match load_accounts(&client, new_tip.hash, missing_changed_acc) {
Ok(LoadedAccounts { accounts, failed_to_load }) => {
// extend accounts we failed to load from database
dirty.extend(failed_to_load);
accounts
}
Err(err) => {
let (addresses, err) = *err;
warn!(
?err,
"failed to load missing changed accounts at new tip: {:?}",
new_tip.hash
);
dirty.extend(addresses);
vec![]
}
};
// also include all accounts from new chain
// we can use extend here because they are unique
changed_accounts.extend(new_changed_accounts.into_iter().map(|entry| entry.0));
// all transactions mined in the new chain
let new_mined_transactions: HashSet<_> =
new_blocks.transactions().map(|tx| tx.hash).collect();
// update the pool then reinject the pruned transactions
// find all transactions that were mined in the old chain but not in the new chain
let pruned_old_transactions = old_blocks
.transactions()
.filter(|tx| !new_mined_transactions.contains(&tx.hash))
.filter_map(|tx| tx.clone().into_ecrecovered())
.map(|tx| {
<V as TransactionValidator>::Transaction::from_recovered_transaction(tx)
})
.collect();
// update the pool first
let update = CanonicalStateUpdate {
hash: new_tip.hash,
number: new_tip.number,
pending_block_base_fee,
changed_accounts,
// all transactions mined in the new chain need to be removed from the pool
mined_transactions: new_mined_transactions.into_iter().collect(),
};
pool.on_canonical_state_change(update);
// all transactions that were mined in the old chain but not in the new chain need
// to be re-injected
//
// Note: we no longer know if the tx was local or external
let _ = pool.add_external_transactions(pruned_old_transactions).await;
// TODO: metrics
}
CanonStateNotification::Revert { old } => {
// this similar to the inverse of a commit where we need to insert the transactions
// back into the pool and update the pool's state accordingly
let (blocks, state) = old.inner();
let first_block = blocks.first();
if first_block.hash == pool_info.last_seen_block_hash {
// nothing to update
continue
}
// base fee for the next block: `first_block+1`
let pending_block_base_fee = calculate_next_block_base_fee(
first_block.gas_used,
first_block.gas_limit,
first_block.base_fee_per_gas.unwrap_or_default(),
) as u128;
let changed_accounts = changed_accounts_iter(state).collect();
let update = CanonicalStateUpdate {
hash: first_block.hash,
number: first_block.number,
pending_block_base_fee,
changed_accounts,
// no tx to prune in the reverted chain
mined_transactions: vec![],
};
pool.on_canonical_state_change(update);
let pruned_old_transactions = blocks
.transactions()
.filter_map(|tx| tx.clone().into_ecrecovered())
.map(|tx| {
<V as TransactionValidator>::Transaction::from_recovered_transaction(tx)
})
.collect();
// all transactions that were mined in the old chain need to be re-injected
//
// Note: we no longer know if the tx was local or external
let _ = pool.add_external_transactions(pruned_old_transactions).await;
// TODO: metrics
}
CanonStateNotification::Commit { new } => {
// TODO skip large commits?
let (blocks, state) = new.inner();
let tip = blocks.tip();
// base fee for the next block: `tip+1`
let pending_block_base_fee = calculate_next_block_base_fee(
tip.gas_used,
tip.gas_limit,
tip.base_fee_per_gas.unwrap_or_default(),
) as u128;
let first_block = blocks.first();
// check if the range of the commit is canonical
if first_block.parent_hash == pool_info.last_seen_block_hash {
let changed_accounts = changed_accounts_iter(state).collect();
let mined_transactions = blocks.transactions().map(|tx| tx.hash).collect();
// Canonical update
let update = CanonicalStateUpdate {
hash: tip.hash,
number: tip.number,
pending_block_base_fee,
changed_accounts,
mined_transactions,
};
pool.on_canonical_state_change(update);
} else {
// TODO is this even reachable, because all commits are canonical?
// this a canonical
}
}
}
}
}
/// A unique ChangedAccount identified by its address that can be used for deduplication
#[derive(Eq)]
struct ChangedAccountEntry(ChangedAccount);
impl PartialEq for ChangedAccountEntry {
fn eq(&self, other: &Self) -> bool {
self.0.address == other.0.address
}
}
impl Hash for ChangedAccountEntry {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.address.hash(state);
}
}
impl Borrow<Address> for ChangedAccountEntry {
fn borrow(&self) -> &Address {
&self.0.address
}
}
#[derive(Default)]
struct LoadedAccounts {
/// All accounts that were loaded
accounts: Vec<ChangedAccount>,
/// All accounts that failed to load
failed_to_load: Vec<Address>,
}
/// Loads all accounts at the given state
///
/// Returns an error with all given addresses if the state is not available.
fn load_accounts<Client, I>(
client: &Client,
at: BlockHash,
addresses: I,
) -> Result<LoadedAccounts, Box<(HashSet<Address>, reth_interfaces::Error)>>
where
I: Iterator<Item = Address>,
Client: StateProviderFactory,
{
let mut res = LoadedAccounts::default();
let state = match client.history_by_block_hash(at) {
Ok(state) => state,
Err(err) => return Err(Box::new((addresses.collect(), err))),
};
for addr in addresses {
if let Ok(maybe_acc) = state.basic_account(addr) {
let acc = maybe_acc
.map(|acc| ChangedAccount { address: addr, nonce: acc.nonce, balance: acc.balance })
.unwrap_or_else(|| ChangedAccount::empty(addr));
res.accounts.push(acc)
} else {
// failed to load account.
res.failed_to_load.push(addr);
}
}
Ok(res)
}
fn changed_accounts_iter(state: &PostState) -> impl Iterator<Item = ChangedAccount> + '_ {
state.accounts().iter().filter_map(|(addr, acc)| acc.map(|acc| (addr, acc))).map(
|(address, acc)| ChangedAccount {
address: *address,
nonce: acc.nonce,
balance: acc.balance,
},
)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn changed_acc_entry() {
let changed_acc = ChangedAccountEntry(ChangedAccount::empty(Address::random()));
let mut copy = changed_acc.0;
copy.nonce = 10;
assert!(changed_acc.eq(&ChangedAccountEntry(copy)));
}
}

View File

@ -72,10 +72,11 @@ use crate::{
identifier::{SenderId, SenderIdentifiers, TransactionId}, identifier::{SenderId, SenderIdentifiers, TransactionId},
pool::{listener::PoolEventBroadcast, state::SubPool, txpool::TxPool}, pool::{listener::PoolEventBroadcast, state::SubPool, txpool::TxPool},
traits::{ traits::{
NewTransactionEvent, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin, BlockInfo, NewTransactionEvent, PoolSize, PoolTransaction, PropagatedTransactions,
TransactionOrigin,
}, },
validate::{TransactionValidationOutcome, ValidPoolTransaction}, validate::{TransactionValidationOutcome, ValidPoolTransaction},
OnNewBlockEvent, PoolConfig, TransactionOrdering, TransactionValidator, CanonicalStateUpdate, PoolConfig, TransactionOrdering, TransactionValidator,
}; };
use best::BestTransactions; use best::BestTransactions;
pub use events::TransactionEvent; pub use events::TransactionEvent;
@ -138,6 +139,11 @@ where
self.pool.read().size() self.pool.read().size()
} }
/// Returns the currently tracked block
pub(crate) fn block_info(&self) -> BlockInfo {
self.pool.read().block_info()
}
/// Returns the internal `SenderId` for this address /// Returns the internal `SenderId` for this address
pub(crate) fn get_sender_id(&self, addr: Address) -> SenderId { pub(crate) fn get_sender_id(&self, addr: Address) -> SenderId {
self.identifiers.write().sender_id_or_create(addr) self.identifiers.write().sender_id_or_create(addr)
@ -183,9 +189,9 @@ where
} }
/// Updates the entire pool after a new block was executed. /// Updates the entire pool after a new block was executed.
pub(crate) fn on_new_block(&self, block: OnNewBlockEvent) { pub(crate) fn on_canonical_state_change(&self, update: CanonicalStateUpdate) {
let outcome = self.pool.write().on_new_block(block); let outcome = self.pool.write().on_canonical_state_change(update);
self.notify_on_new_block(outcome); self.notify_on_new_state(outcome);
} }
/// Add a single validated transaction into the pool. /// Add a single validated transaction into the pool.
@ -313,8 +319,8 @@ where
} }
/// Notifies transaction listeners about changes after a block was processed. /// Notifies transaction listeners about changes after a block was processed.
fn notify_on_new_block(&self, outcome: OnNewBlockOutcome) { fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome) {
let OnNewBlockOutcome { mined, promoted, discarded, block_hash } = outcome; let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
let mut listener = self.event_listener.write(); let mut listener = self.event_listener.write();
@ -486,9 +492,9 @@ impl<T: PoolTransaction> AddedTransaction<T> {
} }
} }
/// Contains all state changes after a [`OnNewBlockEvent`] was processed /// Contains all state changes after a [`CanonicalStateUpdate`] was processed
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct OnNewBlockOutcome { pub(crate) struct OnNewCanonicalStateOutcome {
/// Hash of the block. /// Hash of the block.
pub(crate) block_hash: H256, pub(crate) block_hash: H256,
/// All mined transactions. /// All mined transactions.

View File

@ -10,11 +10,11 @@ use crate::{
pending::PendingPool, pending::PendingPool,
state::{SubPool, TxState}, state::{SubPool, TxState},
update::{Destination, PoolUpdate}, update::{Destination, PoolUpdate},
AddedPendingTransaction, AddedTransaction, OnNewBlockOutcome, AddedPendingTransaction, AddedTransaction, OnNewCanonicalStateOutcome,
}, },
traits::{PoolSize, StateDiff}, traits::{BlockInfo, PoolSize},
OnNewBlockEvent, PoolConfig, PoolResult, PoolTransaction, TransactionOrdering, CanonicalStateUpdate, ChangedAccount, PoolConfig, PoolResult, PoolTransaction,
ValidPoolTransaction, U256, TransactionOrdering, ValidPoolTransaction, U256,
}; };
use fnv::FnvHashMap; use fnv::FnvHashMap;
use reth_primitives::{constants::MIN_PROTOCOL_BASE_FEE, TxHash, H256}; use reth_primitives::{constants::MIN_PROTOCOL_BASE_FEE, TxHash, H256};
@ -120,6 +120,15 @@ impl<T: TransactionOrdering> TxPool<T> {
} }
} }
/// Returns the currently tracked block values
pub(crate) fn block_info(&self) -> BlockInfo {
BlockInfo {
last_seen_block_hash: self.all_transactions.last_seen_block_hash,
last_seen_block_number: self.all_transactions.last_seen_block_number,
pending_basefee: self.all_transactions.pending_basefee,
}
}
/// Updates the pool based on the changed base fee. /// Updates the pool based on the changed base fee.
/// ///
/// This enforces the dynamic fee requirement. /// This enforces the dynamic fee requirement.
@ -166,22 +175,26 @@ impl<T: TransactionOrdering> TxPool<T> {
/// ///
/// This removes all mined transactions, updates according to the new base fee and rechecks /// This removes all mined transactions, updates according to the new base fee and rechecks
/// sender allowance. /// sender allowance.
pub(crate) fn on_new_block(&mut self, event: OnNewBlockEvent) -> OnNewBlockOutcome { pub(crate) fn on_canonical_state_change(
&mut self,
event: CanonicalStateUpdate,
) -> OnNewCanonicalStateOutcome {
// Remove all transaction that were included in the block // Remove all transaction that were included in the block
for tx_hash in &event.mined_transactions { for tx_hash in &event.mined_transactions {
self.remove_transaction_by_hash(tx_hash); if self.remove_transaction_by_hash(tx_hash).is_some() {
// Update removed transactions metric // Update removed transactions metric
self.metrics.removed_transactions.increment(1); self.metrics.removed_transactions.increment(1);
}
} }
// Apply the state changes to the total set of transactions which triggers sub-pool updates. // Apply the state changes to the total set of transactions which triggers sub-pool updates.
let updates = let updates =
self.all_transactions.update(event.pending_block_base_fee, &event.state_changes); self.all_transactions.update(event.pending_block_base_fee, &event.changed_accounts);
// Process the sub-pool updates // Process the sub-pool updates
let UpdateOutcome { promoted, discarded } = self.process_updates(updates); let UpdateOutcome { promoted, discarded } = self.process_updates(updates);
OnNewBlockOutcome { OnNewCanonicalStateOutcome {
block_hash: event.hash, block_hash: event.hash,
mined: event.mined_transactions, mined: event.mined_transactions,
promoted, promoted,
@ -489,8 +502,6 @@ impl<T: TransactionOrdering> fmt::Debug for TxPool<T> {
/// This is the sole entrypoint that's guarding all sub-pools, all sub-pool actions are always /// This is the sole entrypoint that's guarding all sub-pools, all sub-pool actions are always
/// derived from this set. Updates returned from this type must be applied to the sub-pools. /// derived from this set. Updates returned from this type must be applied to the sub-pools.
pub(crate) struct AllTransactions<T: PoolTransaction> { pub(crate) struct AllTransactions<T: PoolTransaction> {
/// Expected base fee for the pending block.
pending_basefee: u128,
/// Minimum base fee required by the protocol. /// Minimum base fee required by the protocol.
/// ///
/// Transactions with a lower base fee will never be included by the chain /// Transactions with a lower base fee will never be included by the chain
@ -505,6 +516,12 @@ pub(crate) struct AllTransactions<T: PoolTransaction> {
txs: BTreeMap<TransactionId, PoolInternalTransaction<T>>, txs: BTreeMap<TransactionId, PoolInternalTransaction<T>>,
/// Tracks the number of transactions by sender that are currently in the pool. /// Tracks the number of transactions by sender that are currently in the pool.
tx_counter: FnvHashMap<SenderId, usize>, tx_counter: FnvHashMap<SenderId, usize>,
/// The current block number the pool keeps track of.
last_seen_block_number: u64,
/// The current block hash the pool keeps track of.
last_seen_block_hash: H256,
/// Expected base fee for the pending block.
pending_basefee: u128,
} }
impl<T: PoolTransaction> AllTransactions<T> { impl<T: PoolTransaction> AllTransactions<T> {
@ -572,7 +589,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
pub(crate) fn update( pub(crate) fn update(
&mut self, &mut self,
pending_block_base_fee: u128, pending_block_base_fee: u128,
_state_diffs: &StateDiff, _changed_accounts: &[ChangedAccount],
) -> Vec<PoolUpdate> { ) -> Vec<PoolUpdate> {
// update new basefee // update new basefee
self.pending_basefee = pending_block_base_fee; self.pending_basefee = pending_block_base_fee;
@ -1010,12 +1027,14 @@ impl<T: PoolTransaction> Default for AllTransactions<T> {
fn default() -> Self { fn default() -> Self {
Self { Self {
max_account_slots: MAX_ACCOUNT_SLOTS_PER_SENDER, max_account_slots: MAX_ACCOUNT_SLOTS_PER_SENDER,
pending_basefee: Default::default(),
minimal_protocol_basefee: MIN_PROTOCOL_BASE_FEE, minimal_protocol_basefee: MIN_PROTOCOL_BASE_FEE,
block_gas_limit: 30_000_000, block_gas_limit: 30_000_000,
by_hash: Default::default(), by_hash: Default::default(),
txs: Default::default(), txs: Default::default(),
tx_counter: Default::default(), tx_counter: Default::default(),
last_seen_block_number: 0,
last_seen_block_hash: Default::default(),
pending_basefee: Default::default(),
} }
} }
} }

View File

@ -24,15 +24,20 @@ pub trait TransactionPool: Send + Sync + Clone {
/// The transaction type of the pool /// The transaction type of the pool
type Transaction: PoolTransaction; type Transaction: PoolTransaction;
/// Returns stats about the pool. /// Returns stats about the pool and all sub-pools.
fn status(&self) -> PoolSize; fn pool_size(&self) -> PoolSize;
/// Event listener for when a new block was mined. /// Returns the block the pool is currently tracking.
///
/// This tracks the block that the pool has last seen.
fn block_info(&self) -> BlockInfo;
/// Event listener for when the pool needs to be updated
/// ///
/// Implementers need to update the pool accordingly. /// Implementers need to update the pool accordingly.
/// For example the base fee of the pending block is determined after a block is mined which /// For example the base fee of the pending block is determined after a block is mined which
/// affects the dynamic fee requirement of pending transactions in the pool. /// affects the dynamic fee requirement of pending transactions in the pool.
fn on_new_block(&self, event: OnNewBlockEvent); fn on_canonical_state_change(&self, update: CanonicalStateUpdate);
/// Imports an _external_ transaction. /// Imports an _external_ transaction.
/// ///
@ -44,6 +49,17 @@ pub trait TransactionPool: Send + Sync + Clone {
self.add_transaction(TransactionOrigin::External, transaction).await self.add_transaction(TransactionOrigin::External, transaction).await
} }
/// Imports all _external_ transactions
///
///
/// Consumer: Utility
async fn add_external_transactions(
&self,
transactions: Vec<Self::Transaction>,
) -> PoolResult<Vec<PoolResult<TxHash>>> {
self.add_transactions(TransactionOrigin::External, transactions).await
}
/// Adds an _unvalidated_ transaction into the pool. /// Adds an _unvalidated_ transaction into the pool.
/// ///
/// Consumer: RPC /// Consumer: RPC
@ -108,7 +124,7 @@ pub trait TransactionPool: Send + Sync + Clone {
/// Removes all transactions corresponding to the given hashes. /// Removes all transactions corresponding to the given hashes.
/// ///
/// Also removes all dependent transactions. /// Also removes all _dependent_ transactions.
/// ///
/// Consumer: Block production /// Consumer: Block production
fn remove_transactions( fn remove_transactions(
@ -229,25 +245,48 @@ impl TransactionOrigin {
} }
} }
/// Event fired when a new block was mined /// Represents changes after a new canonical block or range of canonical blocks was added to the
/// chain.
///
/// It is expected that this is only used if the added blocks are canonical to the pool's last known
/// block hash. In other words, the first added block of the range must be the child of the last
/// known block hash.
///
/// This is used to update the pool state accordingly.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct OnNewBlockEvent { pub struct CanonicalStateUpdate {
/// Hash of the added block. /// Hash of the tip block.
pub hash: H256, pub hash: H256,
/// Number of the tip block.
pub number: u64,
/// EIP-1559 Base fee of the _next_ (pending) block /// EIP-1559 Base fee of the _next_ (pending) block
/// ///
/// The base fee of a block depends on the utilization of the last block and its base fee. /// The base fee of a block depends on the utilization of the last block and its base fee.
pub pending_block_base_fee: u128, pub pending_block_base_fee: u128,
/// Provides a set of state changes that affected the accounts. /// A set of changed accounts across a range of blocks.
pub state_changes: StateDiff, pub changed_accounts: Vec<ChangedAccount>,
/// All mined transactions in the block /// All mined transactions in the block range.
pub mined_transactions: Vec<H256>, pub mined_transactions: Vec<H256>,
} }
/// Contains a list of changed state /// Represents a changed account
#[derive(Debug, Clone)] #[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct StateDiff { pub struct ChangedAccount {
// TODO(mattsse) this could be an `Arc<revm::State>>` /// The address of the account.
pub address: Address,
/// Account nonce.
pub nonce: u64,
/// Account balance.
pub balance: U256,
}
// === impl ChangedAccount ===
impl ChangedAccount {
/// Creates a new `ChangedAccount` with the given address and 0 balance and nonce.
pub(crate) fn empty(address: Address) -> Self {
Self { address, nonce: 0, balance: U256::ZERO }
}
} }
/// An `Iterator` that only returns transactions that are ready to be executed. /// An `Iterator` that only returns transactions that are ready to be executed.
@ -480,3 +519,17 @@ pub struct PoolSize {
/// Reported size of transactions in the _queued_ sub-pool. /// Reported size of transactions in the _queued_ sub-pool.
pub queued_size: usize, pub queued_size: usize,
} }
/// Represents the current status of the pool.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct BlockInfo {
/// Hash for the currently tracked block.
pub last_seen_block_hash: H256,
/// Current the currently tracked block.
pub last_seen_block_number: u64,
/// Currently enforced base fee: the threshold for the basefee sub-pool.
///
/// Note: this is the derived base fee of the _next_ block that builds on the clock the pool is
/// currently tracking.
pub pending_basefee: u128,
}