feat: remove finalized blobs (#4342)

This commit is contained in:
Matthias Seitz
2023-08-24 17:44:25 +02:00
committed by GitHub
parent f9b6a64ccb
commit 0beaf85f4b
3 changed files with 56 additions and 9 deletions

View File

@ -4,7 +4,7 @@ pub use mem::InMemoryBlobStore;
pub use noop::NoopBlobStore;
use reth_primitives::{BlobTransactionSidecar, H256};
use std::fmt;
pub use tracker::BlobStoreCanonTracker;
pub use tracker::{BlobStoreCanonTracker, BlobStoreUpdates};
mod mem;
mod noop;

View File

@ -13,7 +13,7 @@ pub struct BlobStoreCanonTracker {
impl BlobStoreCanonTracker {
/// Adds a block to the blob store maintenance.
pub(crate) fn add_block(
pub fn add_block(
&mut self,
block_number: BlockNumber,
blob_txs: impl IntoIterator<Item = H256>,
@ -22,7 +22,7 @@ impl BlobStoreCanonTracker {
}
/// Adds all blocks to the tracked list of blocks.
pub(crate) fn add_blocks(
pub fn add_blocks(
&mut self,
blocks: impl IntoIterator<Item = (BlockNumber, impl IntoIterator<Item = H256>)>,
) {
@ -32,7 +32,7 @@ impl BlobStoreCanonTracker {
}
/// Adds all blob transactions from the given chain to the tracker.
pub(crate) fn add_new_chain_blocks(&mut self, blocks: &ChainBlocks<'_>) {
pub fn add_new_chain_blocks(&mut self, blocks: &ChainBlocks<'_>) {
let blob_txs = blocks.iter().map(|(num, blocks)| {
let iter =
blocks.body.iter().filter(|tx| tx.transaction.is_eip4844()).map(|tx| tx.hash);
@ -42,8 +42,7 @@ impl BlobStoreCanonTracker {
}
/// Invoked when a block is finalized.
#[allow(unused)]
pub(crate) fn on_finalized_block(&mut self, number: BlockNumber) -> BlobStoreUpdates {
pub fn on_finalized_block(&mut self, number: BlockNumber) -> BlobStoreUpdates {
let mut finalized = Vec::new();
while let Some(entry) = self.blob_txs_in_blocks.first_entry() {
if *entry.key() <= number {
@ -63,7 +62,7 @@ impl BlobStoreCanonTracker {
/// Updates that should be applied to the blob store.
#[derive(Debug, Eq, PartialEq)]
pub(crate) enum BlobStoreUpdates {
pub enum BlobStoreUpdates {
/// No updates.
None,
/// Delete the given finalized transactions from the blob store.

View File

@ -1,7 +1,7 @@
//! Support for maintaining the state of the transaction pool
use crate::{
blobstore::BlobStoreCanonTracker,
blobstore::{BlobStoreCanonTracker, BlobStoreUpdates},
metrics::MaintainPoolMetrics,
traits::{CanonicalStateUpdate, ChangedAccount, TransactionPoolExt},
BlockInfo, TransactionPool,
@ -10,7 +10,9 @@ use futures_util::{
future::{BoxFuture, Fuse, FusedFuture},
FutureExt, Stream, StreamExt,
};
use reth_primitives::{Address, BlockHash, BlockNumberOrTag, FromRecoveredTransaction};
use reth_primitives::{
Address, BlockHash, BlockNumber, BlockNumberOrTag, FromRecoveredTransaction,
};
use reth_provider::{
BlockReaderIdExt, CanonStateNotification, ChainSpecProvider, PostState, StateProviderFactory,
};
@ -97,6 +99,10 @@ pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
// keeps track of mined blob transaction so we can clean finalized transactions
let mut blob_store_tracker = BlobStoreCanonTracker::default();
// keeps track of the latest finalized block
let mut last_finalized_block =
FinalizedBlockTracker::new(client.finalized_block_number().ok().flatten());
// keeps track of any dirty accounts that we know of are out of sync with the pool
let mut dirty_addresses = HashSet::new();
@ -154,6 +160,19 @@ pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
task_spawner.spawn_blocking(fut);
}
// check if we have a new finalized block
if let Some(finalized) =
last_finalized_block.update(client.finalized_block_number().ok().flatten())
{
match blob_store_tracker.on_finalized_block(finalized) {
BlobStoreUpdates::None => {}
BlobStoreUpdates::Finalized(blobs) => {
// remove all finalized blobs from the blob store
pool.delete_blobs(blobs);
}
}
}
// outcomes of the futures we are waiting on
let mut event = None;
let mut reloaded = None;
@ -360,6 +379,35 @@ pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
}
}
struct FinalizedBlockTracker {
last_finalized_block: Option<BlockNumber>,
}
impl FinalizedBlockTracker {
fn new(last_finalized_block: Option<BlockNumber>) -> Self {
Self { last_finalized_block }
}
/// Updates the tracked finalized block and returns the new finalized block if it changed
fn update(&mut self, finalized_block: Option<BlockNumber>) -> Option<BlockNumber> {
match (self.last_finalized_block, finalized_block) {
(Some(last), Some(finalized)) => {
self.last_finalized_block = Some(finalized);
if last < finalized {
Some(finalized)
} else {
None
}
}
(None, Some(finalized)) => {
self.last_finalized_block = Some(finalized);
Some(finalized)
}
_ => None,
}
}
}
/// Keeps track of the pool's state, whether the accounts in the pool are in sync with the actual
/// state.
#[derive(Debug, Eq, PartialEq)]