feat: add blob store service (#4191)

Co-authored-by: Bjerg <onbjerg@users.noreply.github.com>
This commit is contained in:
Matthias Seitz
2023-08-17 18:38:59 +02:00
committed by GitHub
parent ca99ee2ec9
commit a5b777a65f
8 changed files with 268 additions and 2 deletions

View File

@ -183,6 +183,7 @@ where
response: oneshot::Sender<RequestResult<PooledTransactions>>,
) {
if let Some(peer) = self.peers.get_mut(&peer_id) {
// TODO softResponseLimit 2 * 1024 * 1024
let transactions = self
.pool
.get_all(request.0)
@ -237,6 +238,8 @@ where
///
/// The message for new pooled hashes depends on the negotiated version of the stream.
/// See [NewPooledTransactionHashes](NewPooledTransactionHashes)
///
/// TODO add note that this never broadcasts full 4844 transactions
fn propagate_transactions(
&mut self,
to_propagate: Vec<PropagateTransaction>,
@ -803,7 +806,7 @@ impl Future for GetPooledTxRequestFut {
struct Peer {
/// Keeps track of transactions that we know the peer has seen.
transactions: LruCache<H256>,
/// A communication channel directly to the session task.
/// A communication channel directly to the peer's session task.
request_tx: PeerRequestSender,
/// negotiated version of the session.
version: EthVersion,

View File

@ -0,0 +1,27 @@
//! Support for maintaining the blob pool.
use crate::blobstore::BlobStore;
use reth_primitives::H256;
use std::collections::BTreeMap;
/// The type that is used to maintain the blob store and discard finalized transactions.
#[derive(Debug)]
#[allow(unused)]
pub struct BlobStoreMaintainer<S> {
/// The blob store that holds all the blob data.
store: S,
/// Keeps track of the blob transactions that are in blocks.
blob_txs_in_blocks: BTreeMap<u64, Vec<H256>>,
}
impl<S> BlobStoreMaintainer<S> {
/// Creates a new blob store maintenance instance.
pub fn new(store: S) -> Self {
Self { store, blob_txs_in_blocks: Default::default() }
}
}
impl<S: BlobStore> BlobStoreMaintainer<S> {
/// Invoked when a block is finalized.
pub fn on_finalized(&mut self, _block_number: u64) {}
}

View File

@ -0,0 +1,126 @@
use crate::blobstore::{BlobStore, BlobStoreError, BlobTransactionSidecar};
use parking_lot::RwLock;
use reth_primitives::H256;
use std::{
collections::HashMap,
sync::{atomic::AtomicUsize, Arc},
};
/// An in-memory blob store.
#[derive(Clone, Debug, Default)]
pub struct InMemoryBlobStore {
inner: Arc<InMemoryBlobStoreInner>,
}
#[derive(Debug, Default)]
struct InMemoryBlobStoreInner {
/// Storage for all blob data.
store: RwLock<HashMap<H256, BlobTransactionSidecar>>,
size: AtomicUsize,
}
impl InMemoryBlobStoreInner {
fn add_size(&self, add: usize) {
self.size.fetch_add(add, std::sync::atomic::Ordering::Relaxed);
}
fn sub_size(&self, sub: usize) {
self.size.fetch_sub(sub, std::sync::atomic::Ordering::Relaxed);
}
fn update_size(&self, add: usize, sub: usize) {
if add > sub {
self.add_size(add - sub);
} else {
self.sub_size(sub - add);
}
}
}
impl BlobStore for InMemoryBlobStore {
fn insert(&self, tx: H256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
let mut store = self.inner.store.write();
let (add, sub) = insert_size(&mut store, tx, data);
self.inner.update_size(add, sub);
Ok(())
}
fn insert_all(&self, txs: Vec<(H256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError> {
if txs.is_empty() {
return Ok(())
}
let mut store = self.inner.store.write();
let mut total_add = 0;
let mut total_sub = 0;
for (tx, data) in txs {
let (add, sub) = insert_size(&mut store, tx, data);
total_add += add;
total_sub += sub;
}
self.inner.update_size(total_add, total_sub);
Ok(())
}
fn delete(&self, tx: H256) -> Result<(), BlobStoreError> {
let mut store = self.inner.store.write();
let sub = remove_size(&mut store, &tx);
self.inner.sub_size(sub);
Ok(())
}
fn delete_all(&self, txs: Vec<H256>) -> Result<(), BlobStoreError> {
if txs.is_empty() {
return Ok(())
}
let mut store = self.inner.store.write();
let mut total_sub = 0;
for tx in txs {
total_sub += remove_size(&mut store, &tx);
}
self.inner.sub_size(total_sub);
Ok(())
}
// Retrieves the decoded blob data for the given transaction hash.
fn get(&self, tx: H256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
let store = self.inner.store.write();
Ok(store.get(&tx).cloned())
}
fn get_all(
&self,
txs: Vec<H256>,
) -> Result<Vec<(H256, BlobTransactionSidecar)>, BlobStoreError> {
let mut items = Vec::with_capacity(txs.len());
let store = self.inner.store.write();
for tx in txs {
if let Some(item) = store.get(&tx) {
items.push((tx, item.clone()));
}
}
Ok(items)
}
fn data_size_hint(&self) -> Option<usize> {
Some(self.inner.size.load(std::sync::atomic::Ordering::Relaxed))
}
}
/// Removes the given blob from the store and returns the size of the blob that was removed.
#[inline]
fn remove_size(store: &mut HashMap<H256, BlobTransactionSidecar>, tx: &H256) -> usize {
store.remove(tx).map(|rem| rem.size()).unwrap_or_default()
}
/// Inserts the given blob into the store and returns the size of the blob that was (added,removed)
#[inline]
fn insert_size(
store: &mut HashMap<H256, BlobTransactionSidecar>,
tx: H256,
blob: BlobTransactionSidecar,
) -> (usize, usize) {
let add = blob.size();
let sub = store.insert(tx, blob).map(|rem| rem.size()).unwrap_or_default();
(add, sub)
}

View File

@ -0,0 +1,66 @@
//! Storage for blob data of EIP4844 transactions.
use reth_primitives::{BlobTransactionSidecar, H256};
mod maintain;
mod mem;
mod noop;
pub use maintain::BlobStoreMaintainer;
pub use mem::InMemoryBlobStore;
pub use noop::NoopBlobStore;
/// A blob store that can be used to store blob data of EIP4844 transactions.
///
/// This type is responsible for keeping track of blob data until it is no longer needed (after
/// finalization).
///
/// Note: this is Clone because it is expected to be wrapped in an Arc.
pub trait BlobStore: Send + Sync + 'static {
/// Inserts the blob sidecar into the store
fn insert(&self, tx: H256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError>;
/// Inserts multiple blob sidecars into the store
fn insert_all(&self, txs: Vec<(H256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError>;
/// Deletes the blob sidecar from the store
fn delete(&self, tx: H256) -> Result<(), BlobStoreError>;
/// Deletes multiple blob sidecars from the store
fn delete_all(&self, txs: Vec<H256>) -> Result<(), BlobStoreError>;
/// Retrieves the decoded blob data for the given transaction hash.
fn get(&self, tx: H256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError>;
/// Retrieves all decoded blob data for the given transaction hashes.
///
/// This only returns the blobs that were found in the store.
/// If there's no blob it will not be returned.
fn get_all(
&self,
txs: Vec<H256>,
) -> Result<Vec<(H256, BlobTransactionSidecar)>, BlobStoreError>;
/// Data size of all transactions in the blob store.
fn data_size_hint(&self) -> Option<usize>;
}
/// Error variants that can occur when interacting with a blob store.
#[derive(Debug, thiserror::Error)]
pub enum BlobStoreError {
/// Failed to decode the stored blob data.
#[error("failed to decode blob data: {0}")]
DecodeError(#[from] reth_rlp::DecodeError),
/// Other implementation specific error.
#[error(transparent)]
Other(Box<dyn std::error::Error + Send + Sync>),
}
#[cfg(test)]
mod tests {
use super::*;
#[allow(unused)]
struct DynStore {
store: Box<dyn BlobStore>,
}
}

View File

@ -0,0 +1,40 @@
use crate::blobstore::{BlobStore, BlobStoreError, BlobTransactionSidecar};
use reth_primitives::H256;
/// A blobstore implementation that does nothing
#[derive(Clone, Copy, Debug, PartialOrd, PartialEq, Default)]
#[non_exhaustive]
pub struct NoopBlobStore;
impl BlobStore for NoopBlobStore {
fn insert(&self, _tx: H256, _data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
Ok(())
}
fn insert_all(&self, _txs: Vec<(H256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError> {
Ok(())
}
fn delete(&self, _tx: H256) -> Result<(), BlobStoreError> {
Ok(())
}
fn delete_all(&self, _txs: Vec<H256>) -> Result<(), BlobStoreError> {
Ok(())
}
fn get(&self, _tx: H256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
Ok(None)
}
fn get_all(
&self,
_txs: Vec<H256>,
) -> Result<Vec<(H256, BlobTransactionSidecar)>, BlobStoreError> {
Ok(vec![])
}
fn data_size_hint(&self) -> Option<usize> {
Some(0)
}
}

View File

@ -182,6 +182,7 @@ pub mod noop;
pub mod pool;
pub mod validate;
pub mod blobstore;
mod config;
mod identifier;
mod ordering;

View File

@ -247,7 +247,8 @@ pub trait TransactionPool: Send + Sync + Clone {
/// Returns all transactions objects for the given hashes.
///
/// This adheres to the expected behavior of [`GetPooledTransactions`](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getpooledtransactions-0x09):
/// TODO(mattsse): this will no longer be accurate and we need a new function specifically for
/// pooled txs This adheres to the expected behavior of [`GetPooledTransactions`](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getpooledtransactions-0x09):
/// The transactions must be in same order as in the request, but it is OK to skip transactions
/// which are not available.
fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
@ -576,6 +577,7 @@ pub struct PooledTransaction {
/// For EIP-1559 transactions: `max_fee_per_gas * gas_limit + tx_value`.
/// For legacy transactions: `gas_price * gas_limit + tx_value`.
pub(crate) cost: U256,
// TODO optional sidecar
}
impl PooledTransaction {

View File

@ -33,6 +33,7 @@ pub enum TransactionValidationOutcome<T: PoolTransaction> {
/// Current nonce of the sender.
state_nonce: u64,
/// Validated transaction.
// TODO add enum type for blob,regular?
transaction: T,
/// Whether to propagate the transaction to the network.
propagate: bool,