From 755fac08ddeec0f4260fa5565bac72d6cb392348 Mon Sep 17 00:00:00 2001 From: Kaushik Donthi Date: Wed, 30 Oct 2024 09:06:37 -0700 Subject: [PATCH] Wrap sidecar in arcs (#11554) Co-authored-by: Matthias Seitz --- crates/ethereum/payload/src/lib.rs | 2 +- crates/transaction-pool/src/blobstore/disk.rs | 44 ++++++++++++------- crates/transaction-pool/src/blobstore/mem.rs | 21 ++++----- crates/transaction-pool/src/blobstore/mod.rs | 12 +++-- crates/transaction-pool/src/blobstore/noop.rs | 10 +++-- crates/transaction-pool/src/lib.rs | 9 ++-- crates/transaction-pool/src/maintain.rs | 2 + crates/transaction-pool/src/noop.rs | 9 ++-- crates/transaction-pool/src/pool/mod.rs | 4 +- crates/transaction-pool/src/traits.rs | 9 ++-- .../src/mined_sidecar.rs | 8 ++-- 11 files changed, 84 insertions(+), 46 deletions(-) diff --git a/crates/ethereum/payload/src/lib.rs b/crates/ethereum/payload/src/lib.rs index 27d9d98bc..8e92d0aa8 100644 --- a/crates/ethereum/payload/src/lib.rs +++ b/crates/ethereum/payload/src/lib.rs @@ -456,7 +456,7 @@ where EthBuiltPayload::new(attributes.id, sealed_block, total_fees, Some(executed), requests); // extend the payload with the blob sidecars from the executed txs - payload.extend_sidecars(blob_sidecars); + payload.extend_sidecars(blob_sidecars.into_iter().map(Arc::unwrap_or_clone)); Ok(BuildOutcome::Better { payload, cached_reads }) } diff --git a/crates/transaction-pool/src/blobstore/disk.rs b/crates/transaction-pool/src/blobstore/disk.rs index 787d4985f..987264853 100644 --- a/crates/transaction-pool/src/blobstore/disk.rs +++ b/crates/transaction-pool/src/blobstore/disk.rs @@ -103,7 +103,7 @@ impl BlobStore for DiskFileBlobStore { stat } - fn get(&self, tx: B256) -> Result, BlobStoreError> { + fn get(&self, tx: B256) -> Result>, BlobStoreError> { self.inner.get_one(tx) } @@ -114,14 +114,17 @@ impl BlobStore for DiskFileBlobStore { fn get_all( &self, txs: Vec, - ) -> Result, BlobStoreError> { + ) -> Result)>, BlobStoreError> { if txs.is_empty() { return Ok(Vec::new()) } self.inner.get_all(txs) } - fn get_exact(&self, txs: Vec) -> Result, BlobStoreError> { + fn get_exact( + &self, + txs: Vec, + ) -> Result>, BlobStoreError> { if txs.is_empty() { return Ok(Vec::new()) } @@ -164,7 +167,7 @@ impl BlobStore for DiskFileBlobStore { struct DiskFileBlobStoreInner { blob_dir: PathBuf, - blob_cache: Mutex>, + blob_cache: Mutex, ByLength>>, size_tracker: BlobStoreSize, file_lock: RwLock<()>, txs_to_delete: RwLock>, @@ -205,7 +208,7 @@ impl DiskFileBlobStoreInner { fn insert_one(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> { let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length()); data.rlp_encode_fields(&mut buf); - self.blob_cache.lock().insert(tx, data); + self.blob_cache.lock().insert(tx, Arc::new(data)); let size = self.write_one_encoded(tx, &buf)?; self.size_tracker.add_size(size); @@ -227,7 +230,7 @@ impl DiskFileBlobStoreInner { { let mut cache = self.blob_cache.lock(); for (tx, data) in txs { - cache.insert(tx, data); + cache.insert(tx, Arc::new(data)); } } let mut add = 0; @@ -278,15 +281,19 @@ impl DiskFileBlobStoreInner { } /// Retrieves the blob for the given transaction hash from the blob cache or disk. - fn get_one(&self, tx: B256) -> Result, BlobStoreError> { + fn get_one(&self, tx: B256) -> Result>, BlobStoreError> { if let Some(blob) = self.blob_cache.lock().get(&tx) { return Ok(Some(blob.clone())) } let blob = self.read_one(tx)?; + if let Some(blob) = &blob { - self.blob_cache.lock().insert(tx, blob.clone()); + let blob_arc = Arc::new(blob.clone()); + self.blob_cache.lock().insert(tx, blob_arc.clone()); + return Ok(Some(blob_arc)) } - Ok(blob) + + Ok(None) } /// Returns the path to the blob file for the given transaction hash. @@ -374,7 +381,7 @@ impl DiskFileBlobStoreInner { fn get_all( &self, txs: Vec, - ) -> Result, BlobStoreError> { + ) -> Result)>, BlobStoreError> { let mut res = Vec::with_capacity(txs.len()); let mut cache_miss = Vec::new(); { @@ -396,8 +403,9 @@ impl DiskFileBlobStoreInner { } let mut cache = self.blob_cache.lock(); for (tx, data) in from_disk { - cache.insert(tx, data.clone()); - res.push((tx, data)); + let arc = Arc::new(data.clone()); + cache.insert(tx, arc.clone()); + res.push((tx, arc.clone())); } Ok(res) @@ -407,7 +415,10 @@ impl DiskFileBlobStoreInner { /// /// Returns an error if there are any missing blobs. #[inline] - fn get_exact(&self, txs: Vec) -> Result, BlobStoreError> { + fn get_exact( + &self, + txs: Vec, + ) -> Result>, BlobStoreError> { txs.into_iter() .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx))) .collect() @@ -514,14 +525,17 @@ mod tests { let blobs = rng_blobs(10); let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::>(); store.insert_all(blobs.clone()).unwrap(); + // all cached for (tx, blob) in &blobs { assert!(store.is_cached(tx)); - assert_eq!(store.get(*tx).unwrap().unwrap(), *blob); + let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap(); + assert_eq!(b, *blob); } + let all = store.get_all(all_hashes.clone()).unwrap(); for (tx, blob) in all { - assert!(blobs.contains(&(tx, blob)), "missing blob {tx:?}"); + assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}"); } assert!(store.contains(all_hashes[0]).unwrap()); diff --git a/crates/transaction-pool/src/blobstore/mem.rs b/crates/transaction-pool/src/blobstore/mem.rs index c98a01b88..cea1837bd 100644 --- a/crates/transaction-pool/src/blobstore/mem.rs +++ b/crates/transaction-pool/src/blobstore/mem.rs @@ -15,7 +15,7 @@ pub struct InMemoryBlobStore { #[derive(Debug, Default)] struct InMemoryBlobStoreInner { /// Storage for all blob data. - store: RwLock>, + store: RwLock>>, size_tracker: BlobStoreSize, } @@ -75,7 +75,7 @@ impl BlobStore for InMemoryBlobStore { } // Retrieves the decoded blob data for the given transaction hash. - fn get(&self, tx: B256) -> Result, BlobStoreError> { + fn get(&self, tx: B256) -> Result>, BlobStoreError> { Ok(self.inner.store.read().get(&tx).cloned()) } @@ -86,16 +86,17 @@ impl BlobStore for InMemoryBlobStore { fn get_all( &self, txs: Vec, - ) -> Result, BlobStoreError> { + ) -> Result)>, BlobStoreError> { let store = self.inner.store.read(); Ok(txs.into_iter().filter_map(|tx| store.get(&tx).map(|item| (tx, item.clone()))).collect()) } - fn get_exact(&self, txs: Vec) -> Result, BlobStoreError> { + fn get_exact( + &self, + txs: Vec, + ) -> Result>, BlobStoreError> { let store = self.inner.store.read(); - txs.into_iter() - .map(|tx| store.get(&tx).cloned().ok_or_else(|| BlobStoreError::MissingSidecar(tx))) - .collect() + Ok(txs.into_iter().filter_map(|tx| store.get(&tx).cloned()).collect()) } fn get_by_versioned_hashes( @@ -134,7 +135,7 @@ impl BlobStore for InMemoryBlobStore { /// Removes the given blob from the store and returns the size of the blob that was removed. #[inline] -fn remove_size(store: &mut HashMap, tx: &B256) -> usize { +fn remove_size(store: &mut HashMap>, tx: &B256) -> usize { store.remove(tx).map(|rem| rem.size()).unwrap_or_default() } @@ -143,11 +144,11 @@ fn remove_size(store: &mut HashMap, tx: &B256) -> /// We don't need to handle the size updates for replacements because transactions are unique. #[inline] fn insert_size( - store: &mut HashMap, + store: &mut HashMap>, tx: B256, blob: BlobTransactionSidecar, ) -> usize { let add = blob.size(); - store.insert(tx, blob); + store.insert(tx, Arc::new(blob)); add } diff --git a/crates/transaction-pool/src/blobstore/mod.rs b/crates/transaction-pool/src/blobstore/mod.rs index ee98e3eed..f8d37bfcc 100644 --- a/crates/transaction-pool/src/blobstore/mod.rs +++ b/crates/transaction-pool/src/blobstore/mod.rs @@ -8,7 +8,10 @@ pub use noop::NoopBlobStore; use reth_primitives::BlobTransactionSidecar; use std::{ fmt, - sync::atomic::{AtomicUsize, Ordering}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, }; pub use tracker::{BlobStoreCanonTracker, BlobStoreUpdates}; @@ -44,7 +47,7 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static { fn cleanup(&self) -> BlobStoreCleanupStat; /// Retrieves the decoded blob data for the given transaction hash. - fn get(&self, tx: B256) -> Result, BlobStoreError>; + fn get(&self, tx: B256) -> Result>, BlobStoreError>; /// Checks if the given transaction hash is in the blob store. fn contains(&self, tx: B256) -> Result; @@ -58,13 +61,14 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static { fn get_all( &self, txs: Vec, - ) -> Result, BlobStoreError>; + ) -> Result)>, BlobStoreError>; /// Returns the exact [`BlobTransactionSidecar`] for the given transaction hashes in the exact /// order they were requested. /// /// Returns an error if any of the blobs are not found in the blob store. - fn get_exact(&self, txs: Vec) -> Result, BlobStoreError>; + fn get_exact(&self, txs: Vec) + -> Result>, BlobStoreError>; /// Return the [`BlobTransactionSidecar`]s for a list of blob versioned hashes. fn get_by_versioned_hashes( diff --git a/crates/transaction-pool/src/blobstore/noop.rs b/crates/transaction-pool/src/blobstore/noop.rs index 0e99858bd..0f2935735 100644 --- a/crates/transaction-pool/src/blobstore/noop.rs +++ b/crates/transaction-pool/src/blobstore/noop.rs @@ -1,6 +1,7 @@ use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobTransactionSidecar}; use alloy_eips::eip4844::BlobAndProofV1; use alloy_primitives::B256; +use std::sync::Arc; /// A blobstore implementation that does nothing #[derive(Clone, Copy, Debug, PartialOrd, PartialEq, Eq, Default)] @@ -28,7 +29,7 @@ impl BlobStore for NoopBlobStore { BlobStoreCleanupStat::default() } - fn get(&self, _tx: B256) -> Result, BlobStoreError> { + fn get(&self, _tx: B256) -> Result>, BlobStoreError> { Ok(None) } @@ -39,11 +40,14 @@ impl BlobStore for NoopBlobStore { fn get_all( &self, _txs: Vec, - ) -> Result, BlobStoreError> { + ) -> Result)>, BlobStoreError> { Ok(vec![]) } - fn get_exact(&self, txs: Vec) -> Result, BlobStoreError> { + fn get_exact( + &self, + txs: Vec, + ) -> Result>, BlobStoreError> { if txs.is_empty() { return Ok(vec![]) } diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 609ab987f..3a5e547ba 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -561,21 +561,24 @@ where self.pool.unique_senders() } - fn get_blob(&self, tx_hash: TxHash) -> Result, BlobStoreError> { + fn get_blob( + &self, + tx_hash: TxHash, + ) -> Result>, BlobStoreError> { self.pool.blob_store().get(tx_hash) } fn get_all_blobs( &self, tx_hashes: Vec, - ) -> Result, BlobStoreError> { + ) -> Result)>, BlobStoreError> { self.pool.blob_store().get_all(tx_hashes) } fn get_all_blobs_exact( &self, tx_hashes: Vec, - ) -> Result, BlobStoreError> { + ) -> Result>, BlobStoreError> { self.pool.blob_store().get_exact(tx_hashes) } diff --git a/crates/transaction-pool/src/maintain.rs b/crates/transaction-pool/src/maintain.rs index 09c042ae6..b62a6c18c 100644 --- a/crates/transaction-pool/src/maintain.rs +++ b/crates/transaction-pool/src/maintain.rs @@ -27,6 +27,7 @@ use std::{ collections::HashSet, hash::{Hash, Hasher}, path::{Path, PathBuf}, + sync::Arc, }; use tokio::sync::oneshot; use tracing::{debug, error, info, trace, warn}; @@ -328,6 +329,7 @@ pub async fn maintain_transaction_pool( pool.get_blob(tx.hash) .ok() .flatten() + .map(Arc::unwrap_or_clone) .and_then(|sidecar| { PooledTransactionsElementEcRecovered::try_from_blob_transaction( tx, sidecar, diff --git a/crates/transaction-pool/src/noop.rs b/crates/transaction-pool/src/noop.rs index 817ea7bad..4f4e5a381 100644 --- a/crates/transaction-pool/src/noop.rs +++ b/crates/transaction-pool/src/noop.rs @@ -275,21 +275,24 @@ impl TransactionPool for NoopTransactionPool { Default::default() } - fn get_blob(&self, _tx_hash: TxHash) -> Result, BlobStoreError> { + fn get_blob( + &self, + _tx_hash: TxHash, + ) -> Result>, BlobStoreError> { Ok(None) } fn get_all_blobs( &self, _tx_hashes: Vec, - ) -> Result, BlobStoreError> { + ) -> Result)>, BlobStoreError> { Ok(vec![]) } fn get_all_blobs_exact( &self, tx_hashes: Vec, - ) -> Result, BlobStoreError> { + ) -> Result>, BlobStoreError> { if tx_hashes.is_empty() { return Ok(vec![]) } diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index fef0fd0ee..2e7340954 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -307,7 +307,9 @@ where /// Caution: this assumes the given transaction is eip-4844 fn get_blob_transaction(&self, transaction: TransactionSigned) -> Option { if let Ok(Some(sidecar)) = self.blob_store.get(transaction.hash()) { - if let Ok(blob) = BlobTransaction::try_from_signed(transaction, sidecar) { + if let Ok(blob) = + BlobTransaction::try_from_signed(transaction, Arc::unwrap_or_clone(sidecar)) + { return Some(blob) } } diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index fbbddb98f..1b3004154 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -443,7 +443,10 @@ pub trait TransactionPool: Send + Sync + Clone { /// Returns the [BlobTransactionSidecar] for the given transaction hash if it exists in the blob /// store. - fn get_blob(&self, tx_hash: TxHash) -> Result, BlobStoreError>; + fn get_blob( + &self, + tx_hash: TxHash, + ) -> Result>, BlobStoreError>; /// Returns all [BlobTransactionSidecar] for the given transaction hashes if they exists in the /// blob store. @@ -453,7 +456,7 @@ pub trait TransactionPool: Send + Sync + Clone { fn get_all_blobs( &self, tx_hashes: Vec, - ) -> Result, BlobStoreError>; + ) -> Result)>, BlobStoreError>; /// Returns the exact [BlobTransactionSidecar] for the given transaction hashes in the order /// they were requested. @@ -462,7 +465,7 @@ pub trait TransactionPool: Send + Sync + Clone { fn get_all_blobs_exact( &self, tx_hashes: Vec, - ) -> Result, BlobStoreError>; + ) -> Result>, BlobStoreError>; /// Return the [`BlobTransactionSidecar`]s for a list of blob versioned hashes. fn get_blobs_for_versioned_hashes( diff --git a/examples/beacon-api-sidecar-fetcher/src/mined_sidecar.rs b/examples/beacon-api-sidecar-fetcher/src/mined_sidecar.rs index 1c53e4f41..2436ee021 100644 --- a/examples/beacon-api-sidecar-fetcher/src/mined_sidecar.rs +++ b/examples/beacon-api-sidecar-fetcher/src/mined_sidecar.rs @@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize}; use std::{ collections::VecDeque, pin::Pin, + sync::Arc, task::{Context, Poll}, }; use thiserror::Error; @@ -110,9 +111,10 @@ where match self.pool.get_all_blobs_exact(txs.iter().map(|(tx, _)| tx.hash()).collect()) { Ok(blobs) => { actions_to_queue.reserve_exact(txs.len()); - for ((tx, _), sidecar) in txs.iter().zip(blobs.iter()) { - let transaction = BlobTransaction::try_from_signed(tx.clone(), sidecar.clone()) - .expect("should not fail to convert blob tx if it is already eip4844"); + for ((tx, _), sidecar) in txs.iter().zip(blobs.into_iter()) { + let transaction = + BlobTransaction::try_from_signed(tx.clone(), Arc::unwrap_or_clone(sidecar)) + .expect("should not fail to convert blob tx if it is already eip4844"); let block_metadata = BlockMetadata { block_hash: block.hash(),