feat(txpool): add a few more maintain checks (#2593)

This commit is contained in:
Matthias Seitz
2023-05-06 22:45:39 +02:00
committed by GitHub
parent 3c14fdab25
commit d067e762ba

View File

@ -12,11 +12,16 @@ use std::{
collections::HashSet,
hash::{Hash, Hasher},
};
use tracing::warn;
use tracing::{debug, warn};
/// Maximum (reorg) depth we handle when updating the transaction pool: `new.number -
/// last_seen.number`
const MAX_UPDATE_DEPTH: u64 = 64;
/// Maintains the state of the transaction pool by handling new blocks and reorgs.
///
/// This listens for any new blocks and reorgs and updates the transaction pool's state accordingly
#[allow(unused)]
pub async fn maintain_transaction_pool<Client, V, T, St>(
client: Client,
pool: Pool<V, T>,
@ -38,20 +43,34 @@ pub async fn maintain_transaction_pool<Client, V, T, St>(
pool.set_block_info(info);
}
// keeps track of any dirty accounts that we failed to fetch the state for and need to retry
let mut dirty = HashSet::new();
// keeps track of any dirty accounts that we know of are out of sync with the pool
let mut dirty_addresses = HashSet::new();
// keeps track of the state of the pool wrt to blocks
let mut maintained_state = MaintainedPoolState::InSync;
// Listen for new chain events and derive the update action for the pool
while let Some(event) = events.next().await {
let pool_info = pool.block_info();
// TODO check dirty accounts from time to time
// TODO from time to time re-check the unique accounts in the pool and remove and resync
// based on the tracked state
match event {
CanonStateNotification::Reorg { old, new } => {
let (old_blocks, old_state) = old.inner();
let (new_blocks, new_state) = new.inner();
let new_tip = new_blocks.tip();
let new_first = new_blocks.first();
let old_first = old_blocks.first();
// check if the reorg is not canonical with the pool's block
if !(old_first.parent_hash == pool_info.last_seen_block_hash ||
new_first.parent_hash == pool_info.last_seen_block_hash)
{
// the new block points to a higher block than the oldest block in the old chain
maintained_state = MaintainedPoolState::Drift;
}
// base fee for the next block: `new_tip+1`
let pending_block_base_fee =
@ -65,15 +84,15 @@ pub async fn maintain_transaction_pool<Client, V, T, St>(
let missing_changed_acc = old_state
.accounts()
.keys()
.filter(|addr| !new_changed_accounts.contains(*addr))
.copied();
.copied()
.filter(|addr| !new_changed_accounts.contains(addr));
// for these we need to fetch the nonce+balance from the db at the new tip
let mut changed_accounts =
match load_accounts(&client, new_tip.hash, missing_changed_acc) {
Ok(LoadedAccounts { accounts, failed_to_load }) => {
// extend accounts we failed to load from database
dirty.extend(failed_to_load);
dirty_addresses.extend(failed_to_load);
accounts
}
@ -84,7 +103,7 @@ pub async fn maintain_transaction_pool<Client, V, T, St>(
"failed to load missing changed accounts at new tip: {:?}",
new_tip.hash
);
dirty.extend(addresses);
dirty_addresses.extend(addresses);
vec![]
}
};
@ -97,7 +116,7 @@ pub async fn maintain_transaction_pool<Client, V, T, St>(
let new_mined_transactions: HashSet<_> =
new_blocks.transactions().map(|tx| tx.hash).collect();
// update the pool then reinject the pruned transactions
// update the pool then re-inject the pruned transactions
// find all transactions that were mined in the old chain but not in the new chain
let pruned_old_transactions = old_blocks
.transactions()
@ -132,6 +151,7 @@ pub async fn maintain_transaction_pool<Client, V, T, St>(
let (blocks, state) = old.inner();
let first_block = blocks.first();
if first_block.hash == pool_info.last_seen_block_hash {
// nothing to update
continue
@ -140,7 +160,14 @@ pub async fn maintain_transaction_pool<Client, V, T, St>(
// base fee for the next block: `first_block+1`
let pending_block_base_fee =
first_block.next_block_base_fee().unwrap_or_default() as u128;
let changed_accounts = changed_accounts_iter(state).collect();
let mut changed_accounts = Vec::with_capacity(state.accounts().len());
for acc in changed_accounts_iter(state) {
// we can always clear the dirty flag for this account
dirty_addresses.remove(&acc.address);
changed_accounts.push(acc);
}
let update = CanonicalStateUpdate {
hash: first_block.hash,
number: first_block.number,
@ -166,18 +193,46 @@ pub async fn maintain_transaction_pool<Client, V, T, St>(
// TODO: metrics
}
CanonStateNotification::Commit { new } => {
// TODO skip large commits?
let (blocks, state) = new.inner();
let tip = blocks.tip();
// base fee for the next block: `tip+1`
let pending_block_base_fee = tip.next_block_base_fee().unwrap_or_default() as u128;
let first_block = blocks.first();
// check if the range of the commit is canonical
if first_block.parent_hash == pool_info.last_seen_block_hash {
let changed_accounts = changed_accounts_iter(state).collect();
// check if the depth is too large and should be skipped, this could happen after
// initial sync or long re-sync
let depth = tip.number.abs_diff(pool_info.last_seen_block_number);
if depth > MAX_UPDATE_DEPTH {
maintained_state = MaintainedPoolState::Drift;
debug!(?depth, "skipping deep canonical update");
let info = BlockInfo {
last_seen_block_hash: tip.hash,
last_seen_block_number: tip.number,
pending_basefee: pending_block_base_fee,
};
pool.set_block_info(info);
continue
}
let mut changed_accounts = Vec::with_capacity(state.accounts().len());
for acc in changed_accounts_iter(state) {
// we can always clear the dirty flag for this account
dirty_addresses.remove(&acc.address);
changed_accounts.push(acc);
}
let mined_transactions = blocks.transactions().map(|tx| tx.hash).collect();
// check if the range of the commit is canonical with the pool's block
if first_block.parent_hash != pool_info.last_seen_block_hash {
// we received a new canonical chain commit but the commit is not canonical with
// the pool's block, this could happen after initial sync or
// long re-sync
maintained_state = MaintainedPoolState::Drift;
}
// Canonical update
let update = CanonicalStateUpdate {
hash: tip.hash,
@ -187,15 +242,21 @@ pub async fn maintain_transaction_pool<Client, V, T, St>(
mined_transactions,
};
pool.on_canonical_state_change(update);
} else {
// TODO is this even reachable, because all commits are canonical?
// this a canonical
}
}
}
}
}
/// Keeps track of the pool's state, whether the accounts in the pool are in sync with the actual
/// state.
#[derive(Eq, PartialEq)]
enum MaintainedPoolState {
/// Pool is assumed to be in sync with the state
InSync,
/// Pool could be out of sync with the state
Drift,
}
/// A unique ChangedAccount identified by its address that can be used for deduplication
#[derive(Eq)]
struct ChangedAccountEntry(ChangedAccount);
@ -229,6 +290,8 @@ struct LoadedAccounts {
/// Loads all accounts at the given state
///
/// Returns an error with all given addresses if the state is not available.
///
/// Note: this expects _unique_ addresses
fn load_accounts<Client, I>(
client: &Client,
at: BlockHash,
@ -258,6 +321,7 @@ where
Ok(res)
}
/// Extracts all changed accounts from the PostState
fn changed_accounts_iter(state: &PostState) -> impl Iterator<Item = ChangedAccount> + '_ {
state.accounts().iter().filter_map(|(addr, acc)| acc.map(|acc| (addr, acc))).map(
|(address, acc)| ChangedAccount {