feat: ChainStorageReader (#12836)

This commit is contained in:
Arsenii Kulikov
2024-11-25 17:29:25 +04:00
committed by GitHub
parent 04dd005af9
commit 863c5233fc
13 changed files with 227 additions and 157 deletions

View File

@ -63,7 +63,7 @@ test-utils = [
"revm",
"reth-chainspec/test-utils",
"reth-primitives/test-utils",
"reth-primitives-traits/test-utils",
"reth-trie/test-utils",
"revm?/test-utils",
"reth-primitives-traits/test-utils"
]

View File

@ -15,6 +15,7 @@ use reth_primitives::{
BlockWithSenders, NodePrimitives, Receipts, SealedBlock, SealedBlockWithSenders, SealedHeader,
TransactionMeta, TransactionSigned,
};
use reth_primitives_traits::BlockBody as _;
use reth_storage_api::StateProviderBox;
use reth_trie::{updates::TrieUpdates, HashedPostState};
use std::{collections::BTreeMap, sync::Arc, time::Instant};
@ -547,8 +548,13 @@ where
/// Returns a `TransactionSigned` for the given `TxHash` if found.
pub fn transaction_by_hash(&self, hash: TxHash) -> Option<TransactionSigned> {
for block_state in self.canonical_chain() {
if let Some(tx) =
block_state.block_ref().block().body.transactions().find(|tx| tx.hash() == hash)
if let Some(tx) = block_state
.block_ref()
.block()
.body
.transactions()
.iter()
.find(|tx| tx.hash() == hash)
{
return Some(tx.clone())
}
@ -568,6 +574,7 @@ where
.block()
.body
.transactions()
.iter()
.enumerate()
.find(|(_, tx)| tx.hash() == tx_hash)
{
@ -748,6 +755,7 @@ impl<N: NodePrimitives> BlockState<N> {
.block()
.body
.transactions()
.iter()
.find(|tx| tx.hash() == hash)
.cloned()
})
@ -764,6 +772,7 @@ impl<N: NodePrimitives> BlockState<N> {
.block()
.body
.transactions()
.iter()
.enumerate()
.find(|(_, tx)| tx.hash() == tx_hash)
.map(|(index, tx)| {

View File

@ -4,7 +4,7 @@ use std::sync::Arc;
use alloy_consensus::Header;
use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig};
use reth_chainspec::{EthChainSpec, Hardforks};
use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
use reth_db::transaction::{DbTx, DbTxMut};
use reth_evm::{execute::BasicBlockExecutorProvider, ConfigureEvm};
use reth_network::{NetworkConfig, NetworkHandle, NetworkManager, PeersInfo};
@ -32,8 +32,8 @@ use reth_optimism_rpc::{
use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService};
use reth_primitives::BlockBody;
use reth_provider::{
providers::ChainStorage, BlockBodyWriter, CanonStateSubscriptions, DBProvider, EthStorage,
ProviderResult,
providers::ChainStorage, BlockBodyReader, BlockBodyWriter, CanonStateSubscriptions,
ChainSpecProvider, DBProvider, EthStorage, ProviderResult, ReadBodyInput,
};
use reth_rpc_server_types::RethRpcModule;
use reth_tracing::tracing::{debug, info};
@ -72,7 +72,31 @@ impl<Provider: DBProvider<Tx: DbTxMut>> BlockBodyWriter<Provider, BlockBody> for
}
}
impl<Provider: DBProvider + ChainSpecProvider<ChainSpec: EthereumHardforks>>
BlockBodyReader<Provider> for OpStorage
{
type Block = reth_primitives::Block;
fn read_block_bodies(
&self,
provider: &Provider,
inputs: Vec<ReadBodyInput<'_, Self::Block>>,
) -> ProviderResult<Vec<BlockBody>> {
self.0.read_block_bodies(provider, inputs)
}
}
impl ChainStorage<OpPrimitives> for OpStorage {
fn reader<TX, Types>(
&self,
) -> impl reth_provider::ChainStorageReader<reth_provider::DatabaseProvider<TX, Types>, OpPrimitives>
where
TX: DbTx + 'static,
Types: reth_provider::providers::NodeTypesForProvider<Primitives = OpPrimitives>,
{
self
}
fn writer<TX, Types>(
&self,
) -> impl reth_provider::ChainStorageWriter<reth_provider::DatabaseProvider<TX, Types>, OpPrimitives>
@ -83,6 +107,7 @@ impl ChainStorage<OpPrimitives> for OpStorage {
self
}
}
/// Type configuration for a regular Optimism node.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]

View File

@ -661,12 +661,6 @@ impl BlockBody {
pub fn blob_versioned_hashes(&self) -> Vec<&B256> {
self.blob_versioned_hashes_iter().collect()
}
/// Returns an iterator over all transactions.
#[inline]
pub fn transactions(&self) -> impl Iterator<Item = &TransactionSigned> + '_ {
self.transactions.iter()
}
}
impl InMemorySize for BlockBody {

View File

@ -148,6 +148,7 @@ impl PruneInput {
mod tests {
use super::*;
use alloy_primitives::B256;
use reth_primitives_traits::BlockBody;
use reth_provider::{
providers::BlockchainProvider2,
test_utils::{create_test_provider_factory, MockEthProvider},
@ -245,7 +246,7 @@ mod tests {
// Calculate the total number of transactions
let num_txs =
blocks.iter().map(|block| block.body.transactions().count() as u64).sum::<u64>();
blocks.iter().map(|block| block.body.transactions().len() as u64).sum::<u64>();
assert_eq!(range, 0..=num_txs - 1);
}
@ -292,7 +293,7 @@ mod tests {
// Calculate the total number of transactions
let num_txs =
blocks.iter().map(|block| block.body.transactions().count() as u64).sum::<u64>();
blocks.iter().map(|block| block.body.transactions().len() as u64).sum::<u64>();
assert_eq!(range, 0..=num_txs - 1,);
}
@ -327,7 +328,7 @@ mod tests {
// Get the last tx number
// Calculate the total number of transactions
let num_txs =
blocks.iter().map(|block| block.body.transactions().count() as u64).sum::<u64>();
blocks.iter().map(|block| block.body.transactions().len() as u64).sum::<u64>();
let max_range = num_txs - 1;
// Create a prune input with a previous checkpoint that is the last tx number

View File

@ -133,6 +133,8 @@ pub enum ProviderError {
StorageLockError(StorageLockError),
/// Storage writer error.
UnifiedStorageWriterError(UnifiedStorageWriterError),
/// Received invalid output from configured storage implementation.
InvalidStorageOutput,
}
impl From<DatabaseError> for ProviderError {

View File

@ -30,6 +30,7 @@ use reth_primitives::{
Account, Block, BlockWithSenders, Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader,
StorageEntry, TransactionMeta, TransactionSigned, TransactionSignedNoHash,
};
use reth_primitives_traits::BlockBody as _;
use reth_prune_types::{PruneCheckpoint, PruneSegment};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_storage_api::{DBProvider, StorageChangeSetReader};
@ -796,6 +797,7 @@ mod tests {
use reth_primitives::{
BlockExt, Receipt, SealedBlock, StaticFileSegment, TransactionSignedNoHash,
};
use reth_primitives_traits::BlockBody as _;
use reth_storage_api::{
BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt, BlockSource,
ChangeSetReader, DatabaseProviderFactory, HeaderProvider, ReceiptProvider,

View File

@ -977,7 +977,7 @@ impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
return Ok(Some(tx.into()))
return Ok(Some(tx))
}
self.storage_provider.transaction_by_hash(hash)
@ -990,7 +990,7 @@ impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
if let Some((tx, meta)) =
self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
{
return Ok(Some((tx.into(), meta)))
return Ok(Some((tx, meta)))
}
self.storage_provider.transaction_by_hash_with_meta(tx_hash)
@ -1011,18 +1011,7 @@ impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
self.get_in_memory_or_storage_by_block(
id,
|provider| provider.transactions_by_block(id),
|block_state| {
Ok(Some(
block_state
.block_ref()
.block()
.body
.transactions
.iter()
.map(|tx| tx.clone().into())
.collect(),
))
},
|block_state| Ok(Some(block_state.block_ref().block().body.transactions.clone())),
)
}
@ -1033,18 +1022,7 @@ impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
self.get_in_memory_or_storage_by_block_range_while(
range,
|db_provider, range, _| db_provider.transactions_by_block_range(range),
|block_state, _| {
Some(
block_state
.block_ref()
.block()
.body
.transactions
.iter()
.map(|tx| tx.clone().into())
.collect(),
)
},
|block_state, _| Some(block_state.block_ref().block().body.transactions.clone()),
|_| true,
)
}

View File

@ -1,25 +1,41 @@
use crate::{providers::NodeTypes, DatabaseProvider};
use crate::{providers::NodeTypesForProvider, DatabaseProvider};
use reth_db::transaction::{DbTx, DbTxMut};
use reth_node_types::FullNodePrimitives;
use reth_primitives::EthPrimitives;
use reth_storage_api::{ChainStorageWriter, EthStorage};
use reth_storage_api::{ChainStorageReader, ChainStorageWriter, EthStorage};
/// Trait that provides access to implementations of [`ChainStorage`]
pub trait ChainStorage<Primitives: FullNodePrimitives>: Send + Sync {
/// Provides access to the chain reader.
fn reader<TX, Types>(&self) -> impl ChainStorageReader<DatabaseProvider<TX, Types>, Primitives>
where
TX: DbTx + 'static,
Types: NodeTypesForProvider<Primitives = Primitives>;
/// Provides access to the chain writer.
fn writer<TX, Types>(&self) -> impl ChainStorageWriter<DatabaseProvider<TX, Types>, Primitives>
where
TX: DbTxMut + DbTx + 'static,
Types: NodeTypes<Primitives = Primitives>;
Types: NodeTypesForProvider<Primitives = Primitives>;
}
impl ChainStorage<EthPrimitives> for EthStorage {
fn reader<TX, Types>(
&self,
) -> impl ChainStorageReader<DatabaseProvider<TX, Types>, EthPrimitives>
where
TX: DbTx + 'static,
Types: NodeTypesForProvider<Primitives = EthPrimitives>,
{
self
}
fn writer<TX, Types>(
&self,
) -> impl ChainStorageWriter<DatabaseProvider<TX, Types>, EthPrimitives>
where
TX: DbTxMut + DbTx + 'static,
Types: NodeTypes<Primitives = EthPrimitives>,
Types: NodeTypesForProvider<Primitives = EthPrimitives>,
{
self
}

View File

@ -50,12 +50,14 @@ use reth_node_types::{BlockTy, NodeTypes, TxTy};
use reth_primitives::{
Account, Block, BlockBody, BlockExt, BlockWithSenders, Bytecode, GotExpected, Receipt,
SealedBlock, SealedBlockWithSenders, SealedHeader, StaticFileSegment, StorageEntry,
TransactionMeta, TransactionSigned, TransactionSignedNoHash,
TransactionMeta, TransactionSignedNoHash,
};
use reth_primitives_traits::{BlockBody as _, SignedTransaction};
use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_storage_api::{StateProvider, StorageChangeSetReader, TryIntoHistoricalStateProvider};
use reth_storage_api::{
BlockBodyReader, StateProvider, StorageChangeSetReader, TryIntoHistoricalStateProvider,
};
use reth_storage_errors::provider::{ProviderResult, RootMismatch};
use reth_trie::{
prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
@ -517,21 +519,11 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
N::ChainSpec: EthereumHardforks,
H: AsRef<Header>,
HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
BF: FnOnce(
H,
Vec<TransactionSigned>,
Vec<Address>,
Vec<Header>,
Option<Withdrawals>,
) -> ProviderResult<Option<B>>,
BF: FnOnce(H, BlockBody, Vec<Address>) -> ProviderResult<Option<B>>,
{
let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
let Some(header) = header_by_number(block_number)? else { return Ok(None) };
let ommers = self.ommers(block_number.into())?.unwrap_or_default();
let withdrawals =
self.withdrawals_by_block(block_number.into(), header.as_ref().timestamp)?;
// Get the block body
//
// If the body indices are not found, this means that the transactions either do not exist
@ -548,9 +540,14 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
(self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
};
let body = transactions.into_iter().map(Into::into).collect();
let body = self
.storage
.reader()
.read_block_bodies(self, vec![(header.as_ref(), transactions)])?
.pop()
.ok_or(ProviderError::InvalidStorageOutput)?;
construct_block(header, body, senders, ommers, withdrawals)
construct_block(header, body, senders)
}
/// Returns a range of blocks from the database.
@ -572,7 +569,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
N::ChainSpec: EthereumHardforks,
H: AsRef<Header>,
HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
F: FnMut(H, Range<TxNumber>, Vec<Header>, Option<Withdrawals>) -> ProviderResult<R>,
F: FnMut(H, BlockBody, Range<TxNumber>) -> ProviderResult<R>,
{
if range.is_empty() {
return Ok(Vec::new())
@ -582,48 +579,39 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
let mut blocks = Vec::with_capacity(len);
let headers = headers_range(range)?;
let mut ommers_cursor = self.tx.cursor_read::<tables::BlockOmmers>()?;
let mut withdrawals_cursor = self.tx.cursor_read::<tables::BlockWithdrawals>()?;
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
let mut block_body_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut present_headers = Vec::new();
for header in headers {
let header_ref = header.as_ref();
// If the body indices are not found, this means that the transactions either do
// not exist in the database yet, or they do exit but are
// not indexed. If they exist but are not indexed, we don't
// have enough information to return the block anyways, so
// we skip the block.
if let Some((_, block_body_indices)) =
block_body_cursor.seek_exact(header_ref.number)?
block_body_cursor.seek_exact(header.as_ref().number)?
{
let tx_range = block_body_indices.tx_num_range();
present_headers.push((header, tx_range));
}
}
// If we are past shanghai, then all blocks should have a withdrawal list,
// even if empty
let withdrawals =
if self.chain_spec.is_shanghai_active_at_timestamp(header_ref.timestamp) {
withdrawals_cursor
.seek_exact(header_ref.number)?
.map(|(_, w)| w.withdrawals)
.unwrap_or_default()
.into()
} else {
None
};
let ommers =
if self.chain_spec.final_paris_total_difficulty(header_ref.number).is_some() {
let mut inputs = Vec::new();
for (header, tx_range) in &present_headers {
let transactions = if tx_range.is_empty() {
Vec::new()
} else {
ommers_cursor
.seek_exact(header_ref.number)?
.map(|(_, o)| o.ommers)
.unwrap_or_default()
self.transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)?
};
if let Ok(b) = assemble_block(header, tx_range, ommers, withdrawals) {
blocks.push(b);
}
inputs.push((header.as_ref(), transactions));
}
let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
for ((header, tx_range), body) in present_headers.into_iter().zip(bodies) {
blocks.push(assemble_block(header, body, tx_range)?);
}
Ok(blocks)
@ -649,34 +637,22 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
N::ChainSpec: EthereumHardforks,
H: AsRef<Header>,
HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
BF: Fn(
H,
Vec<TransactionSigned>,
Vec<Header>,
Option<Withdrawals>,
Vec<Address>,
) -> ProviderResult<B>,
BF: Fn(H, BlockBody, Vec<Address>) -> ProviderResult<B>,
{
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
let mut senders_cursor = self.tx.cursor_read::<tables::TransactionSenders>()?;
self.block_range(range, headers_range, |header, tx_range, ommers, withdrawals| {
let (body, senders) = if tx_range.is_empty() {
(Vec::new(), Vec::new())
self.block_range(range, headers_range, |header, body, tx_range| {
let senders = if tx_range.is_empty() {
Vec::new()
} else {
let body = self
.transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)?
.into_iter()
.map(Into::into)
.collect::<Vec<TransactionSigned>>();
// fetch senders from the senders table
let known_senders =
senders_cursor
.walk_range(tx_range.clone())?
.collect::<Result<HashMap<_, _>, _>>()?;
let mut senders = Vec::with_capacity(body.len());
for (tx_num, tx) in tx_range.zip(body.iter()) {
let mut senders = Vec::with_capacity(body.transactions.len());
for (tx_num, tx) in tx_range.zip(body.transactions()) {
match known_senders.get(&tx_num) {
None => {
// recover the sender from the transaction if not found
@ -689,10 +665,10 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
}
}
(body, senders)
senders
};
assemble_block(header, body, ommers, withdrawals, senders)
assemble_block(header, body, senders)
})
}
@ -1230,21 +1206,22 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvid
fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Block>> {
if let Some(number) = self.convert_hash_or_number(id)? {
if let Some(header) = self.header_by_number(number)? {
let withdrawals = self.withdrawals_by_block(number.into(), header.timestamp)?;
let ommers = self.ommers(number.into())?.unwrap_or_default();
// If the body indices are not found, this means that the transactions either do not
// exist in the database yet, or they do exit but are not indexed.
// If they exist but are not indexed, we don't have enough
// information to return the block anyways, so we return `None`.
let transactions = match self.transactions_by_block(number.into())? {
Some(transactions) => transactions.into_iter().map(Into::into).collect(),
None => return Ok(None),
let Some(transactions) = self.transactions_by_block(number.into())? else {
return Ok(None)
};
return Ok(Some(Block {
header,
body: BlockBody { transactions, ommers, withdrawals },
}))
let body = self
.storage
.reader()
.read_block_bodies(self, vec![(&header, transactions)])?
.pop()
.ok_or(ProviderError::InvalidStorageOutput)?;
return Ok(Some(Block { header, body }))
}
}
@ -1303,8 +1280,8 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvid
id,
transaction_kind,
|block_number| self.header_by_number(block_number),
|header, transactions, senders, ommers, withdrawals| {
Block { header, body: BlockBody { transactions, ommers, withdrawals } }
|header, body, senders| {
Block { header, body }
// Note: we're using unchecked here because we know the block contains valid txs
// wrt to its height and can ignore the s value check so pre
// EIP-2 txs are allowed
@ -1324,8 +1301,8 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvid
id,
transaction_kind,
|block_number| self.sealed_header(block_number),
|header, transactions, senders, ommers, withdrawals| {
SealedBlock { header, body: BlockBody { transactions, ommers, withdrawals } }
|header, body, senders| {
SealedBlock { header, body }
// Note: we're using unchecked here because we know the block contains valid txs
// wrt to its height and can ignore the s value check so pre
// EIP-2 txs are allowed
@ -1337,21 +1314,10 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvid
}
fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
self.block_range(
range,
|range| self.headers_range(range),
|header, tx_range, ommers, withdrawals| {
let transactions = if tx_range.is_empty() {
Vec::new()
} else {
self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?
.into_iter()
.map(Into::into)
.collect()
};
Ok(Block { header, body: BlockBody { transactions, ommers, withdrawals } })
},
|header, body, _| Ok(Block { header, body }),
)
}
@ -1362,8 +1328,8 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvid
self.block_with_senders_range(
range,
|range| self.headers_range(range),
|header, transactions, ommers, withdrawals, senders| {
Block { header, body: BlockBody { transactions, ommers, withdrawals } }
|header, body, senders| {
Block { header, body }
.try_with_senders_unchecked(senders)
.map_err(|_| ProviderError::SenderRecoveryError)
},
@ -1377,11 +1343,8 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvid
self.block_with_senders_range(
range,
|range| self.sealed_headers_range(range),
|header, transactions, ommers, withdrawals, senders| {
SealedBlockWithSenders::new(
SealedBlock { header, body: BlockBody { transactions, ommers, withdrawals } },
senders,
)
|header, body, senders| {
SealedBlockWithSenders::new(SealedBlock { header, body }, senders)
.ok_or(ProviderError::SenderRecoveryError)
},
)

View File

@ -20,7 +20,6 @@ use reth_blockchain_tree_api::{
};
use reth_chain_state::{ChainInfoTracker, ForkChoiceNotifications, ForkChoiceSubscriptions};
use reth_chainspec::{ChainInfo, EthereumHardforks};
use reth_db::table::Value;
use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices};
use reth_evm::ConfigureEvmEnv;
use reth_node_types::{FullNodePrimitives, NodeTypes, NodeTypesWithDB, TxTy};
@ -77,8 +76,9 @@ where
ChainSpec: EthereumHardforks,
Storage: ChainStorage<Self::Primitives>,
Primitives: FullNodePrimitives<
SignedTx: Value + From<TransactionSigned> + Into<TransactionSigned>,
SignedTx = TransactionSigned,
BlockHeader = alloy_consensus::Header,
BlockBody = reth_primitives::BlockBody,
>,
>,
{
@ -89,8 +89,9 @@ impl<T> NodeTypesForProvider for T where
ChainSpec: EthereumHardforks,
Storage: ChainStorage<T::Primitives>,
Primitives: FullNodePrimitives<
SignedTx: Value + From<TransactionSigned> + Into<TransactionSigned>,
SignedTx = TransactionSigned,
BlockHeader = alloy_consensus::Header,
BlockBody = reth_primitives::BlockBody,
>,
>
{

View File

@ -1,10 +1,11 @@
use crate::DBProvider;
use alloy_primitives::BlockNumber;
use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
use reth_db::{
cursor::DbCursorRW,
cursor::{DbCursorRO, DbCursorRW},
models::{StoredBlockOmmers, StoredBlockWithdrawals},
tables,
transaction::DbTxMut,
transaction::{DbTx, DbTxMut},
DbTxUnwindExt,
};
use reth_primitives_traits::{Block, BlockBody, FullNodePrimitives};
@ -41,6 +42,38 @@ impl<T, Provider, Primitives: FullNodePrimitives> ChainStorageWriter<Provider, P
{
}
/// Input for reading a block body. Contains a header of block being read and a list of pre-fetched
/// transactions.
pub type ReadBodyInput<'a, B> =
(&'a <B as Block>::Header, Vec<<<B as Block>::Body as BlockBody>::Transaction>);
/// Trait that implements how block bodies are read from the storage.
///
/// Note: Within the current abstraction, transactions persistence is handled separately, thus this
/// trait is provided with transactions read beforehand and is expected to construct the block body
/// from those transactions and additional data read from elsewhere.
#[auto_impl::auto_impl(&, Arc)]
pub trait BlockBodyReader<Provider> {
/// The block type.
type Block: Block;
/// Receives a list of block headers along with block transactions and returns the block bodies.
fn read_block_bodies(
&self,
provider: &Provider,
inputs: Vec<ReadBodyInput<'_, Self::Block>>,
) -> ProviderResult<Vec<<Self::Block as Block>::Body>>;
}
/// Trait that implements how chain-specific types are read from storage.
pub trait ChainStorageReader<Provider, Primitives: FullNodePrimitives>:
BlockBodyReader<Provider, Block = Primitives::Block>
{
}
impl<T, Provider, Primitives: FullNodePrimitives> ChainStorageReader<Provider, Primitives> for T where
T: BlockBodyReader<Provider, Block = Primitives::Block>
{
}
/// Ethereum storage implementation.
#[derive(Debug, Default, Clone, Copy)]
pub struct EthStorage;
@ -89,3 +122,47 @@ where
Ok(())
}
}
impl<Provider> BlockBodyReader<Provider> for EthStorage
where
Provider: DBProvider + ChainSpecProvider<ChainSpec: EthereumHardforks>,
{
type Block = reth_primitives::Block;
fn read_block_bodies(
&self,
provider: &Provider,
inputs: Vec<ReadBodyInput<'_, Self::Block>>,
) -> ProviderResult<Vec<<Self::Block as Block>::Body>> {
// TODO: Ideally storage should hold its own copy of chain spec
let chain_spec = provider.chain_spec();
let mut ommers_cursor = provider.tx_ref().cursor_read::<tables::BlockOmmers>()?;
let mut withdrawals_cursor = provider.tx_ref().cursor_read::<tables::BlockWithdrawals>()?;
let mut bodies = Vec::with_capacity(inputs.len());
for (header, transactions) in inputs {
// If we are past shanghai, then all blocks should have a withdrawal list,
// even if empty
let withdrawals = if chain_spec.is_shanghai_active_at_timestamp(header.timestamp) {
withdrawals_cursor
.seek_exact(header.number)?
.map(|(_, w)| w.withdrawals)
.unwrap_or_default()
.into()
} else {
None
};
let ommers = if chain_spec.final_paris_total_difficulty(header.number).is_some() {
Vec::new()
} else {
ommers_cursor.seek_exact(header.number)?.map(|(_, o)| o.ommers).unwrap_or_default()
};
bodies.push(reth_primitives::BlockBody { transactions, ommers, withdrawals });
}
Ok(bodies)
}
}

View File

@ -2,6 +2,7 @@
use alloy_primitives::{BlockNumber, B256};
use reth_execution_types::ChainBlocks;
use reth_primitives_traits::BlockBody as _;
use std::collections::BTreeMap;
/// The type that is used to track canonical blob transactions.
@ -42,6 +43,7 @@ impl BlobStoreCanonTracker {
let iter = block
.body
.transactions()
.iter()
.filter(|tx| tx.transaction.is_eip4844())
.map(|tx| tx.hash());
(*num, iter)