chore: some blob reinject improvements (#5441)

This commit is contained in:
Matthias Seitz
2023-11-15 20:04:03 +01:00
committed by GitHub
parent 8459fb0ada
commit 187f6faa09
8 changed files with 47 additions and 5 deletions

View File

@ -75,6 +75,10 @@ impl BlobStore for DiskFileBlobStore {
self.inner.get_one(tx)
}
fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
self.inner.contains(tx)
}
fn get_all(
&self,
txs: Vec<B256>,
@ -183,6 +187,15 @@ impl DiskFileBlobStoreInner {
Ok(())
}
/// Returns true if the blob for the given transaction hash is in the blob cache or on disk.
fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
if self.blob_cache.lock().get(&tx).is_some() {
return Ok(true)
}
// we only check if the file exists and assume it's valid
Ok(self.blob_disk_file(tx).is_file())
}
/// Retrieves the blob for the given transaction hash from the blob cache or disk.
fn get_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
if let Some(blob) = self.blob_cache.lock().get(&tx) {
@ -438,6 +451,7 @@ mod tests {
assert!(blobs.contains(&(tx, blob)), "missing blob {:?}", tx);
}
assert!(store.contains(all_hashes[0]).unwrap());
store.delete_all(all_hashes.clone()).unwrap();
store.clear_cache();
@ -446,6 +460,7 @@ mod tests {
let all = store.get_all(all_hashes.clone()).unwrap();
assert!(all.is_empty());
assert!(!store.contains(all_hashes[0]).unwrap());
assert!(store.get_exact(all_hashes).is_err());
}
}

View File

@ -67,6 +67,11 @@ impl BlobStore for InMemoryBlobStore {
Ok(store.get(&tx).cloned())
}
fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
let store = self.inner.store.read();
Ok(store.contains_key(&tx))
}
fn get_all(
&self,
txs: Vec<B256>,

View File

@ -34,6 +34,9 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
/// Retrieves the decoded blob data for the given transaction hash.
fn get(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError>;
/// Checks if the given transaction hash is in the blob store.
fn contains(&self, tx: B256) -> Result<bool, BlobStoreError>;
/// Retrieves all decoded blob data for the given transaction hashes.
///
/// This only returns the blobs that were found in the store.

View File

@ -27,6 +27,10 @@ impl BlobStore for NoopBlobStore {
Ok(None)
}
fn contains(&self, _tx: B256) -> Result<bool, BlobStoreError> {
Ok(false)
}
fn get_all(
&self,
_txs: Vec<B256>,

View File

@ -22,6 +22,8 @@ impl BlobStoreCanonTracker {
}
/// Adds all blocks to the tracked list of blocks.
///
/// Replaces any previously tracked blocks with the set of transactions.
pub fn add_blocks(
&mut self,
blocks: impl IntoIterator<Item = (BlockNumber, impl IntoIterator<Item = B256>)>,
@ -32,6 +34,9 @@ impl BlobStoreCanonTracker {
}
/// Adds all blob transactions from the given chain to the tracker.
///
/// Note: In case this is a chain that's part of a reorg, this replaces previously tracked
/// blocks.
pub fn add_new_chain_blocks(&mut self, blocks: &ChainBlocks<'_>) {
let blob_txs = blocks.iter().map(|(num, blocks)| {
let iter =
@ -42,10 +47,12 @@ impl BlobStoreCanonTracker {
}
/// Invoked when a block is finalized.
pub fn on_finalized_block(&mut self, number: BlockNumber) -> BlobStoreUpdates {
///
/// This returns all blob transactions that were included in blocks that are now finalized.
pub fn on_finalized_block(&mut self, finalized_block: BlockNumber) -> BlobStoreUpdates {
let mut finalized = Vec::new();
while let Some(entry) = self.blob_txs_in_blocks.first_entry() {
if *entry.key() <= number {
if *entry.key() <= finalized_block {
finalized.extend(entry.remove_entry().1);
} else {
break

View File

@ -351,6 +351,9 @@ where
origin: TransactionOrigin,
transactions: Vec<Self::Transaction>,
) -> PoolResult<Vec<PoolResult<TxHash>>> {
if transactions.is_empty() {
return Ok(Vec::new())
}
let validated = self.validate_all(origin, transactions).await?;
let transactions =

View File

@ -304,11 +304,12 @@ pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
// to be re-injected
//
// Note: we no longer know if the tx was local or external
// Because the transactions are not finalized, the corresponding blobs are still in
// blob store (if we previously received them from the network)
metrics.inc_reinserted_transactions(pruned_old_transactions.len());
let _ = pool.add_external_transactions(pruned_old_transactions).await;
// keep track of mined blob transactions
// TODO(mattsse): handle reorged transactions
// keep track of new mined blob transactions
blob_store_tracker.add_new_chain_blocks(&new_blocks);
}
CanonStateNotification::Commit { new } => {

View File

@ -307,7 +307,11 @@ where
)
}
EthBlobTransactionSidecar::Missing => {
if let Ok(Some(_)) = self.blob_store.get(*transaction.hash()) {
// This can happen for re-injected blob transactions (on re-org), since the blob
// is stripped from the transaction and not included in a block.
// check if the blob is in the store, if it's included we previously validated
// it and inserted it
if let Ok(true) = self.blob_store.contains(*transaction.hash()) {
// validated transaction is already in the store
} else {
return TransactionValidationOutcome::Invalid(