fix(txpool/blob): make blob delete failed more accurate (#9846)

Signed-off-by: jsvisa <delweng@gmail.com>
This commit is contained in:
Delweng
2024-07-29 17:37:07 +08:00
committed by GitHub
parent fe2af8fa5c
commit f0975a5f02
3 changed files with 40 additions and 19 deletions

View File

@ -62,11 +62,14 @@ impl BlobStore for DiskFileBlobStore {
}
fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
self.inner.txs_to_delete.write().insert(tx);
if self.inner.contains(tx)? {
self.inner.txs_to_delete.write().insert(tx);
}
Ok(())
}
fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
let txs = self.inner.retain_existing(txs)?;
self.inner.txs_to_delete.write().extend(txs);
Ok(())
}
@ -231,6 +234,23 @@ impl DiskFileBlobStoreInner {
Ok(self.blob_disk_file(tx).is_file())
}
/// Returns all the blob transactions which are in the cache or on the disk.
fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
let mut cache = self.blob_cache.lock();
txs.into_iter().partition(|tx| cache.get(tx).is_some())
};
let mut existing = in_cache;
for tx in not_in_cache {
if self.blob_disk_file(tx).is_file() {
existing.push(tx);
}
}
Ok(existing)
}
/// Retrieves the blob for the given transaction hash from the blob cache or disk.
fn get_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
if let Some(blob) = self.blob_cache.lock().get(&tx) {

View File

@ -188,20 +188,19 @@ pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
if let Some(finalized) =
last_finalized_block.update(client.finalized_block_number().ok().flatten())
{
match blob_store_tracker.on_finalized_block(finalized) {
BlobStoreUpdates::None => {}
BlobStoreUpdates::Finalized(blobs) => {
metrics.inc_deleted_tracked_blobs(blobs.len());
// remove all finalized blobs from the blob store
pool.delete_blobs(blobs);
}
if let BlobStoreUpdates::Finalized(blobs) =
blob_store_tracker.on_finalized_block(finalized)
{
metrics.inc_deleted_tracked_blobs(blobs.len());
// remove all finalized blobs from the blob store
pool.delete_blobs(blobs);
// and also do periodic cleanup
let pool = pool.clone();
task_spawner.spawn_blocking(Box::pin(async move {
debug!(target: "txpool", finalized_block = %finalized, "cleaning up blob store");
pool.cleanup_blobs();
}));
}
// also do periodic cleanup of the blob store
let pool = pool.clone();
task_spawner.spawn_blocking(Box::pin(async move {
debug!(target: "txpool", finalized_block = %finalized, "cleaning up blob store");
pool.cleanup_blobs();
}));
}
// outcomes of the futures we are waiting on

View File

@ -61,14 +61,16 @@ pub struct BlobStoreMetrics {
#[derive(Metrics)]
#[metrics(scope = "transaction_pool")]
pub struct MaintainPoolMetrics {
/// Number of currently dirty addresses that need to be updated in the pool by fetching account
/// info
/// Gauge indicating the number of addresses with pending updates in the pool,
/// requiring their account information to be fetched.
pub(crate) dirty_accounts: Gauge,
/// How often the pool drifted from the canonical state.
/// Counter for the number of times the pool state diverged from the canonical blockchain
/// state.
pub(crate) drift_count: Counter,
/// Number of transaction reinserted into the pool after reorg.
/// Counter for the number of transactions reinserted into the pool following a blockchain
/// reorganization (reorg).
pub(crate) reinserted_transactions: Counter,
/// Number of transactions finalized blob transactions we were tracking.
/// Counter for the number of finalized blob transactions that have been removed from tracking.
pub(crate) deleted_tracked_finalized_blobs: Counter,
}