Wrap sidecar in arcs (#11554)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Kaushik Donthi
2024-10-30 09:06:37 -07:00
committed by GitHub
parent bb8da983b0
commit 755fac08dd
11 changed files with 84 additions and 46 deletions

View File

@ -456,7 +456,7 @@ where
EthBuiltPayload::new(attributes.id, sealed_block, total_fees, Some(executed), requests);
// extend the payload with the blob sidecars from the executed txs
payload.extend_sidecars(blob_sidecars);
payload.extend_sidecars(blob_sidecars.into_iter().map(Arc::unwrap_or_clone));
Ok(BuildOutcome::Better { payload, cached_reads })
}

View File

@ -103,7 +103,7 @@ impl BlobStore for DiskFileBlobStore {
stat
}
fn get(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
self.inner.get_one(tx)
}
@ -114,14 +114,17 @@ impl BlobStore for DiskFileBlobStore {
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
if txs.is_empty() {
return Ok(Vec::new())
}
self.inner.get_all(txs)
}
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
fn get_exact(
&self,
txs: Vec<B256>,
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
if txs.is_empty() {
return Ok(Vec::new())
}
@ -164,7 +167,7 @@ impl BlobStore for DiskFileBlobStore {
struct DiskFileBlobStoreInner {
blob_dir: PathBuf,
blob_cache: Mutex<LruMap<TxHash, BlobTransactionSidecar, ByLength>>,
blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecar>, ByLength>>,
size_tracker: BlobStoreSize,
file_lock: RwLock<()>,
txs_to_delete: RwLock<HashSet<B256>>,
@ -205,7 +208,7 @@ impl DiskFileBlobStoreInner {
fn insert_one(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
data.rlp_encode_fields(&mut buf);
self.blob_cache.lock().insert(tx, data);
self.blob_cache.lock().insert(tx, Arc::new(data));
let size = self.write_one_encoded(tx, &buf)?;
self.size_tracker.add_size(size);
@ -227,7 +230,7 @@ impl DiskFileBlobStoreInner {
{
let mut cache = self.blob_cache.lock();
for (tx, data) in txs {
cache.insert(tx, data);
cache.insert(tx, Arc::new(data));
}
}
let mut add = 0;
@ -278,15 +281,19 @@ impl DiskFileBlobStoreInner {
}
/// Retrieves the blob for the given transaction hash from the blob cache or disk.
fn get_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get_one(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
if let Some(blob) = self.blob_cache.lock().get(&tx) {
return Ok(Some(blob.clone()))
}
let blob = self.read_one(tx)?;
if let Some(blob) = &blob {
self.blob_cache.lock().insert(tx, blob.clone());
let blob_arc = Arc::new(blob.clone());
self.blob_cache.lock().insert(tx, blob_arc.clone());
return Ok(Some(blob_arc))
}
Ok(blob)
Ok(None)
}
/// Returns the path to the blob file for the given transaction hash.
@ -374,7 +381,7 @@ impl DiskFileBlobStoreInner {
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
let mut res = Vec::with_capacity(txs.len());
let mut cache_miss = Vec::new();
{
@ -396,8 +403,9 @@ impl DiskFileBlobStoreInner {
}
let mut cache = self.blob_cache.lock();
for (tx, data) in from_disk {
cache.insert(tx, data.clone());
res.push((tx, data));
let arc = Arc::new(data.clone());
cache.insert(tx, arc.clone());
res.push((tx, arc.clone()));
}
Ok(res)
@ -407,7 +415,10 @@ impl DiskFileBlobStoreInner {
///
/// Returns an error if there are any missing blobs.
#[inline]
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
fn get_exact(
&self,
txs: Vec<B256>,
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
txs.into_iter()
.map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
.collect()
@ -514,14 +525,17 @@ mod tests {
let blobs = rng_blobs(10);
let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
store.insert_all(blobs.clone()).unwrap();
// all cached
for (tx, blob) in &blobs {
assert!(store.is_cached(tx));
assert_eq!(store.get(*tx).unwrap().unwrap(), *blob);
let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
assert_eq!(b, *blob);
}
let all = store.get_all(all_hashes.clone()).unwrap();
for (tx, blob) in all {
assert!(blobs.contains(&(tx, blob)), "missing blob {tx:?}");
assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
}
assert!(store.contains(all_hashes[0]).unwrap());

View File

@ -15,7 +15,7 @@ pub struct InMemoryBlobStore {
#[derive(Debug, Default)]
struct InMemoryBlobStoreInner {
/// Storage for all blob data.
store: RwLock<HashMap<B256, BlobTransactionSidecar>>,
store: RwLock<HashMap<B256, Arc<BlobTransactionSidecar>>>,
size_tracker: BlobStoreSize,
}
@ -75,7 +75,7 @@ impl BlobStore for InMemoryBlobStore {
}
// Retrieves the decoded blob data for the given transaction hash.
fn get(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
Ok(self.inner.store.read().get(&tx).cloned())
}
@ -86,16 +86,17 @@ impl BlobStore for InMemoryBlobStore {
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
let store = self.inner.store.read();
Ok(txs.into_iter().filter_map(|tx| store.get(&tx).map(|item| (tx, item.clone()))).collect())
}
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
fn get_exact(
&self,
txs: Vec<B256>,
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
let store = self.inner.store.read();
txs.into_iter()
.map(|tx| store.get(&tx).cloned().ok_or_else(|| BlobStoreError::MissingSidecar(tx)))
.collect()
Ok(txs.into_iter().filter_map(|tx| store.get(&tx).cloned()).collect())
}
fn get_by_versioned_hashes(
@ -134,7 +135,7 @@ impl BlobStore for InMemoryBlobStore {
/// Removes the given blob from the store and returns the size of the blob that was removed.
#[inline]
fn remove_size(store: &mut HashMap<B256, BlobTransactionSidecar>, tx: &B256) -> usize {
fn remove_size(store: &mut HashMap<B256, Arc<BlobTransactionSidecar>>, tx: &B256) -> usize {
store.remove(tx).map(|rem| rem.size()).unwrap_or_default()
}
@ -143,11 +144,11 @@ fn remove_size(store: &mut HashMap<B256, BlobTransactionSidecar>, tx: &B256) ->
/// We don't need to handle the size updates for replacements because transactions are unique.
#[inline]
fn insert_size(
store: &mut HashMap<B256, BlobTransactionSidecar>,
store: &mut HashMap<B256, Arc<BlobTransactionSidecar>>,
tx: B256,
blob: BlobTransactionSidecar,
) -> usize {
let add = blob.size();
store.insert(tx, blob);
store.insert(tx, Arc::new(blob));
add
}

View File

@ -8,7 +8,10 @@ pub use noop::NoopBlobStore;
use reth_primitives::BlobTransactionSidecar;
use std::{
fmt,
sync::atomic::{AtomicUsize, Ordering},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
pub use tracker::{BlobStoreCanonTracker, BlobStoreUpdates};
@ -44,7 +47,7 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
fn cleanup(&self) -> BlobStoreCleanupStat;
/// Retrieves the decoded blob data for the given transaction hash.
fn get(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError>;
fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError>;
/// Checks if the given transaction hash is in the blob store.
fn contains(&self, tx: B256) -> Result<bool, BlobStoreError>;
@ -58,13 +61,14 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError>;
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError>;
/// Returns the exact [`BlobTransactionSidecar`] for the given transaction hashes in the exact
/// order they were requested.
///
/// Returns an error if any of the blobs are not found in the blob store.
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError>;
fn get_exact(&self, txs: Vec<B256>)
-> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError>;
/// Return the [`BlobTransactionSidecar`]s for a list of blob versioned hashes.
fn get_by_versioned_hashes(

View File

@ -1,6 +1,7 @@
use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobTransactionSidecar};
use alloy_eips::eip4844::BlobAndProofV1;
use alloy_primitives::B256;
use std::sync::Arc;
/// A blobstore implementation that does nothing
#[derive(Clone, Copy, Debug, PartialOrd, PartialEq, Eq, Default)]
@ -28,7 +29,7 @@ impl BlobStore for NoopBlobStore {
BlobStoreCleanupStat::default()
}
fn get(&self, _tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get(&self, _tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
Ok(None)
}
@ -39,11 +40,14 @@ impl BlobStore for NoopBlobStore {
fn get_all(
&self,
_txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
Ok(vec![])
}
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
fn get_exact(
&self,
txs: Vec<B256>,
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
if txs.is_empty() {
return Ok(vec![])
}

View File

@ -561,21 +561,24 @@ where
self.pool.unique_senders()
}
fn get_blob(&self, tx_hash: TxHash) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get_blob(
&self,
tx_hash: TxHash,
) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
self.pool.blob_store().get(tx_hash)
}
fn get_all_blobs(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<(TxHash, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
self.pool.blob_store().get_all(tx_hashes)
}
fn get_all_blobs_exact(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
self.pool.blob_store().get_exact(tx_hashes)
}

View File

@ -27,6 +27,7 @@ use std::{
collections::HashSet,
hash::{Hash, Hasher},
path::{Path, PathBuf},
sync::Arc,
};
use tokio::sync::oneshot;
use tracing::{debug, error, info, trace, warn};
@ -328,6 +329,7 @@ pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
pool.get_blob(tx.hash)
.ok()
.flatten()
.map(Arc::unwrap_or_clone)
.and_then(|sidecar| {
PooledTransactionsElementEcRecovered::try_from_blob_transaction(
tx, sidecar,

View File

@ -275,21 +275,24 @@ impl TransactionPool for NoopTransactionPool {
Default::default()
}
fn get_blob(&self, _tx_hash: TxHash) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get_blob(
&self,
_tx_hash: TxHash,
) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
Ok(None)
}
fn get_all_blobs(
&self,
_tx_hashes: Vec<TxHash>,
) -> Result<Vec<(TxHash, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
Ok(vec![])
}
fn get_all_blobs_exact(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
if tx_hashes.is_empty() {
return Ok(vec![])
}

View File

@ -307,7 +307,9 @@ where
/// Caution: this assumes the given transaction is eip-4844
fn get_blob_transaction(&self, transaction: TransactionSigned) -> Option<BlobTransaction> {
if let Ok(Some(sidecar)) = self.blob_store.get(transaction.hash()) {
if let Ok(blob) = BlobTransaction::try_from_signed(transaction, sidecar) {
if let Ok(blob) =
BlobTransaction::try_from_signed(transaction, Arc::unwrap_or_clone(sidecar))
{
return Some(blob)
}
}

View File

@ -443,7 +443,10 @@ pub trait TransactionPool: Send + Sync + Clone {
/// Returns the [BlobTransactionSidecar] for the given transaction hash if it exists in the blob
/// store.
fn get_blob(&self, tx_hash: TxHash) -> Result<Option<BlobTransactionSidecar>, BlobStoreError>;
fn get_blob(
&self,
tx_hash: TxHash,
) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError>;
/// Returns all [BlobTransactionSidecar] for the given transaction hashes if they exists in the
/// blob store.
@ -453,7 +456,7 @@ pub trait TransactionPool: Send + Sync + Clone {
fn get_all_blobs(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<(TxHash, BlobTransactionSidecar)>, BlobStoreError>;
) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecar>)>, BlobStoreError>;
/// Returns the exact [BlobTransactionSidecar] for the given transaction hashes in the order
/// they were requested.
@ -462,7 +465,7 @@ pub trait TransactionPool: Send + Sync + Clone {
fn get_all_blobs_exact(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError>;
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError>;
/// Return the [`BlobTransactionSidecar`]s for a list of blob versioned hashes.
fn get_blobs_for_versioned_hashes(