From a5b777a65fe13a212f30acd2d7ad8336fa202b5f Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 17 Aug 2023 18:38:59 +0200 Subject: [PATCH] feat: add blob store service (#4191) Co-authored-by: Bjerg --- crates/net/network/src/transactions.rs | 5 +- .../src/blobstore/maintain.rs | 27 ++++ crates/transaction-pool/src/blobstore/mem.rs | 126 ++++++++++++++++++ crates/transaction-pool/src/blobstore/mod.rs | 66 +++++++++ crates/transaction-pool/src/blobstore/noop.rs | 40 ++++++ crates/transaction-pool/src/lib.rs | 1 + crates/transaction-pool/src/traits.rs | 4 +- crates/transaction-pool/src/validate/mod.rs | 1 + 8 files changed, 268 insertions(+), 2 deletions(-) create mode 100644 crates/transaction-pool/src/blobstore/maintain.rs create mode 100644 crates/transaction-pool/src/blobstore/mem.rs create mode 100644 crates/transaction-pool/src/blobstore/mod.rs create mode 100644 crates/transaction-pool/src/blobstore/noop.rs diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index f5cc1487d..56d43dbb5 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -183,6 +183,7 @@ where response: oneshot::Sender>, ) { if let Some(peer) = self.peers.get_mut(&peer_id) { + // TODO softResponseLimit 2 * 1024 * 1024 let transactions = self .pool .get_all(request.0) @@ -237,6 +238,8 @@ where /// /// The message for new pooled hashes depends on the negotiated version of the stream. /// See [NewPooledTransactionHashes](NewPooledTransactionHashes) + /// + /// TODO add note that this never broadcasts full 4844 transactions fn propagate_transactions( &mut self, to_propagate: Vec, @@ -803,7 +806,7 @@ impl Future for GetPooledTxRequestFut { struct Peer { /// Keeps track of transactions that we know the peer has seen. transactions: LruCache, - /// A communication channel directly to the session task. + /// A communication channel directly to the peer's session task. request_tx: PeerRequestSender, /// negotiated version of the session. version: EthVersion, diff --git a/crates/transaction-pool/src/blobstore/maintain.rs b/crates/transaction-pool/src/blobstore/maintain.rs new file mode 100644 index 000000000..cfc4c8fc6 --- /dev/null +++ b/crates/transaction-pool/src/blobstore/maintain.rs @@ -0,0 +1,27 @@ +//! Support for maintaining the blob pool. + +use crate::blobstore::BlobStore; +use reth_primitives::H256; +use std::collections::BTreeMap; + +/// The type that is used to maintain the blob store and discard finalized transactions. +#[derive(Debug)] +#[allow(unused)] +pub struct BlobStoreMaintainer { + /// The blob store that holds all the blob data. + store: S, + /// Keeps track of the blob transactions that are in blocks. + blob_txs_in_blocks: BTreeMap>, +} + +impl BlobStoreMaintainer { + /// Creates a new blob store maintenance instance. + pub fn new(store: S) -> Self { + Self { store, blob_txs_in_blocks: Default::default() } + } +} + +impl BlobStoreMaintainer { + /// Invoked when a block is finalized. + pub fn on_finalized(&mut self, _block_number: u64) {} +} diff --git a/crates/transaction-pool/src/blobstore/mem.rs b/crates/transaction-pool/src/blobstore/mem.rs new file mode 100644 index 000000000..6d1dcb76a --- /dev/null +++ b/crates/transaction-pool/src/blobstore/mem.rs @@ -0,0 +1,126 @@ +use crate::blobstore::{BlobStore, BlobStoreError, BlobTransactionSidecar}; +use parking_lot::RwLock; +use reth_primitives::H256; +use std::{ + collections::HashMap, + sync::{atomic::AtomicUsize, Arc}, +}; + +/// An in-memory blob store. +#[derive(Clone, Debug, Default)] +pub struct InMemoryBlobStore { + inner: Arc, +} + +#[derive(Debug, Default)] +struct InMemoryBlobStoreInner { + /// Storage for all blob data. + store: RwLock>, + size: AtomicUsize, +} + +impl InMemoryBlobStoreInner { + fn add_size(&self, add: usize) { + self.size.fetch_add(add, std::sync::atomic::Ordering::Relaxed); + } + + fn sub_size(&self, sub: usize) { + self.size.fetch_sub(sub, std::sync::atomic::Ordering::Relaxed); + } + + fn update_size(&self, add: usize, sub: usize) { + if add > sub { + self.add_size(add - sub); + } else { + self.sub_size(sub - add); + } + } +} + +impl BlobStore for InMemoryBlobStore { + fn insert(&self, tx: H256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> { + let mut store = self.inner.store.write(); + let (add, sub) = insert_size(&mut store, tx, data); + self.inner.update_size(add, sub); + Ok(()) + } + + fn insert_all(&self, txs: Vec<(H256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError> { + if txs.is_empty() { + return Ok(()) + } + let mut store = self.inner.store.write(); + let mut total_add = 0; + let mut total_sub = 0; + for (tx, data) in txs { + let (add, sub) = insert_size(&mut store, tx, data); + total_add += add; + total_sub += sub; + } + self.inner.update_size(total_add, total_sub); + Ok(()) + } + + fn delete(&self, tx: H256) -> Result<(), BlobStoreError> { + let mut store = self.inner.store.write(); + let sub = remove_size(&mut store, &tx); + self.inner.sub_size(sub); + Ok(()) + } + + fn delete_all(&self, txs: Vec) -> Result<(), BlobStoreError> { + if txs.is_empty() { + return Ok(()) + } + let mut store = self.inner.store.write(); + let mut total_sub = 0; + for tx in txs { + total_sub += remove_size(&mut store, &tx); + } + self.inner.sub_size(total_sub); + Ok(()) + } + + // Retrieves the decoded blob data for the given transaction hash. + fn get(&self, tx: H256) -> Result, BlobStoreError> { + let store = self.inner.store.write(); + Ok(store.get(&tx).cloned()) + } + + fn get_all( + &self, + txs: Vec, + ) -> Result, BlobStoreError> { + let mut items = Vec::with_capacity(txs.len()); + let store = self.inner.store.write(); + for tx in txs { + if let Some(item) = store.get(&tx) { + items.push((tx, item.clone())); + } + } + + Ok(items) + } + + fn data_size_hint(&self) -> Option { + Some(self.inner.size.load(std::sync::atomic::Ordering::Relaxed)) + } +} + +/// Removes the given blob from the store and returns the size of the blob that was removed. +#[inline] +fn remove_size(store: &mut HashMap, tx: &H256) -> usize { + store.remove(tx).map(|rem| rem.size()).unwrap_or_default() +} + +/// Inserts the given blob into the store and returns the size of the blob that was (added,removed) +#[inline] +fn insert_size( + store: &mut HashMap, + tx: H256, + blob: BlobTransactionSidecar, +) -> (usize, usize) { + let add = blob.size(); + let sub = store.insert(tx, blob).map(|rem| rem.size()).unwrap_or_default(); + (add, sub) +} diff --git a/crates/transaction-pool/src/blobstore/mod.rs b/crates/transaction-pool/src/blobstore/mod.rs new file mode 100644 index 000000000..0bdd14218 --- /dev/null +++ b/crates/transaction-pool/src/blobstore/mod.rs @@ -0,0 +1,66 @@ +//! Storage for blob data of EIP4844 transactions. + +use reth_primitives::{BlobTransactionSidecar, H256}; +mod maintain; +mod mem; +mod noop; + +pub use maintain::BlobStoreMaintainer; +pub use mem::InMemoryBlobStore; +pub use noop::NoopBlobStore; + +/// A blob store that can be used to store blob data of EIP4844 transactions. +/// +/// This type is responsible for keeping track of blob data until it is no longer needed (after +/// finalization). +/// +/// Note: this is Clone because it is expected to be wrapped in an Arc. +pub trait BlobStore: Send + Sync + 'static { + /// Inserts the blob sidecar into the store + fn insert(&self, tx: H256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError>; + + /// Inserts multiple blob sidecars into the store + fn insert_all(&self, txs: Vec<(H256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError>; + + /// Deletes the blob sidecar from the store + fn delete(&self, tx: H256) -> Result<(), BlobStoreError>; + + /// Deletes multiple blob sidecars from the store + fn delete_all(&self, txs: Vec) -> Result<(), BlobStoreError>; + + /// Retrieves the decoded blob data for the given transaction hash. + fn get(&self, tx: H256) -> Result, BlobStoreError>; + + /// Retrieves all decoded blob data for the given transaction hashes. + /// + /// This only returns the blobs that were found in the store. + /// If there's no blob it will not be returned. + fn get_all( + &self, + txs: Vec, + ) -> Result, BlobStoreError>; + + /// Data size of all transactions in the blob store. + fn data_size_hint(&self) -> Option; +} + +/// Error variants that can occur when interacting with a blob store. +#[derive(Debug, thiserror::Error)] +pub enum BlobStoreError { + /// Failed to decode the stored blob data. + #[error("failed to decode blob data: {0}")] + DecodeError(#[from] reth_rlp::DecodeError), + /// Other implementation specific error. + #[error(transparent)] + Other(Box), +} + +#[cfg(test)] +mod tests { + use super::*; + + #[allow(unused)] + struct DynStore { + store: Box, + } +} diff --git a/crates/transaction-pool/src/blobstore/noop.rs b/crates/transaction-pool/src/blobstore/noop.rs new file mode 100644 index 000000000..d21bf59ef --- /dev/null +++ b/crates/transaction-pool/src/blobstore/noop.rs @@ -0,0 +1,40 @@ +use crate::blobstore::{BlobStore, BlobStoreError, BlobTransactionSidecar}; +use reth_primitives::H256; + +/// A blobstore implementation that does nothing +#[derive(Clone, Copy, Debug, PartialOrd, PartialEq, Default)] +#[non_exhaustive] +pub struct NoopBlobStore; + +impl BlobStore for NoopBlobStore { + fn insert(&self, _tx: H256, _data: BlobTransactionSidecar) -> Result<(), BlobStoreError> { + Ok(()) + } + + fn insert_all(&self, _txs: Vec<(H256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError> { + Ok(()) + } + + fn delete(&self, _tx: H256) -> Result<(), BlobStoreError> { + Ok(()) + } + + fn delete_all(&self, _txs: Vec) -> Result<(), BlobStoreError> { + Ok(()) + } + + fn get(&self, _tx: H256) -> Result, BlobStoreError> { + Ok(None) + } + + fn get_all( + &self, + _txs: Vec, + ) -> Result, BlobStoreError> { + Ok(vec![]) + } + + fn data_size_hint(&self) -> Option { + Some(0) + } +} diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 8d039f66a..4d78f6aba 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -182,6 +182,7 @@ pub mod noop; pub mod pool; pub mod validate; +pub mod blobstore; mod config; mod identifier; mod ordering; diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index 511ddc9a8..2e869e671 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -247,7 +247,8 @@ pub trait TransactionPool: Send + Sync + Clone { /// Returns all transactions objects for the given hashes. /// - /// This adheres to the expected behavior of [`GetPooledTransactions`](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getpooledtransactions-0x09): + /// TODO(mattsse): this will no longer be accurate and we need a new function specifically for + /// pooled txs This adheres to the expected behavior of [`GetPooledTransactions`](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getpooledtransactions-0x09): /// The transactions must be in same order as in the request, but it is OK to skip transactions /// which are not available. fn get_all(&self, txs: Vec) -> Vec>>; @@ -576,6 +577,7 @@ pub struct PooledTransaction { /// For EIP-1559 transactions: `max_fee_per_gas * gas_limit + tx_value`. /// For legacy transactions: `gas_price * gas_limit + tx_value`. pub(crate) cost: U256, + // TODO optional sidecar } impl PooledTransaction { diff --git a/crates/transaction-pool/src/validate/mod.rs b/crates/transaction-pool/src/validate/mod.rs index 3c758bd74..989f7cc78 100644 --- a/crates/transaction-pool/src/validate/mod.rs +++ b/crates/transaction-pool/src/validate/mod.rs @@ -33,6 +33,7 @@ pub enum TransactionValidationOutcome { /// Current nonce of the sender. state_nonce: u64, /// Validated transaction. + // TODO add enum type for blob,regular? transaction: T, /// Whether to propagate the transaction to the network. propagate: bool,