fix(provider): handle race on fn transaction_id (#11380)

This commit is contained in:
joshieDo
2024-10-01 19:42:29 +02:00
committed by GitHub
parent b5b7005741
commit 22ddc98c06

View File

@ -7,7 +7,7 @@ use crate::{
RequestsProvider, StageCheckpointReader, StateProviderBox, StateProviderFactory, StateReader,
StaticFileProviderFactory, TransactionVariant, TransactionsProvider, WithdrawalsProvider,
};
use alloy_eips::{BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag};
use alloy_eips::{BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag, HashOrNumber};
use alloy_primitives::{Address, BlockHash, BlockNumber, Sealable, TxHash, TxNumber, B256, U256};
use alloy_rpc_types_engine::ForkchoiceState;
use reth_chain_state::{
@ -241,16 +241,16 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
Ok(self.canonical_in_memory_state.state_provider_from_state(state, latest_historical))
}
/// Fetches data from either in-memory state or storage by [`TxNumber`].
fn get_in_memory_or_storage_by_tx_id<S, M, R>(
/// Fetches data from either in-memory state or storage by transaction [`HashOrNumber`].
fn get_in_memory_or_storage_by_tx<S, M, R>(
&self,
id: TxNumber,
id: HashOrNumber,
fetch_from_db: S,
fetch_from_block_state: M,
) -> ProviderResult<Option<R>>
where
S: FnOnce(DatabaseProviderRO<N::DB, N::ChainSpec>) -> ProviderResult<Option<R>>,
M: Fn(usize, Arc<BlockState>) -> ProviderResult<Option<R>>,
M: Fn(usize, TxNumber, Arc<BlockState>) -> ProviderResult<Option<R>>,
{
// Order of instantiation matters. More information on: `fetch_db_mem_range_while`.
let in_mem_chain = self.canonical_in_memory_state.canonical_chain().collect::<Vec<_>>();
@ -274,8 +274,10 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
// If the transaction number is less than the first in-memory transaction number, make a
// database lookup
if id < in_memory_tx_num {
return fetch_from_db(provider)
if let HashOrNumber::Number(id) = id {
if id < in_memory_tx_num {
return fetch_from_db(provider)
}
}
// Iterate from the lowest block to the highest
@ -284,14 +286,28 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
let block = executed_block.block();
for tx_index in 0..block.body.transactions.len() {
if id == in_memory_tx_num {
return fetch_from_block_state(tx_index, block_state)
match id {
HashOrNumber::Hash(tx_hash) => {
if tx_hash == block.body.transactions[tx_index].hash() {
return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
}
}
HashOrNumber::Number(id) => {
if id == in_memory_tx_num {
return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
}
}
}
in_memory_tx_num += 1;
}
}
// Not found in-memory, so check database.
if let HashOrNumber::Hash(_) = id {
return fetch_from_db(provider)
}
Ok(None)
}
}
@ -664,50 +680,18 @@ impl<N: ProviderNodeTypes> BlockReader for BlockchainProvider2<N> {
impl<N: ProviderNodeTypes> TransactionsProvider for BlockchainProvider2<N> {
fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
// First, check the database
if let Some(id) = self.database.transaction_id(tx_hash)? {
return Ok(Some(id))
}
// If the transaction is not found in the database, check the in-memory state
// Get the last transaction number stored in the database
let last_database_block_number = self.database.last_block_number()?;
let last_database_tx_id = self
.database
.block_body_indices(last_database_block_number)?
.ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?
.last_tx_num();
// Find the transaction in the in-memory state with the matching hash, and return its
// number
let mut in_memory_tx_id = last_database_tx_id + 1;
for block_number in last_database_block_number.saturating_add(1)..=
self.canonical_in_memory_state.get_canonical_block_number()
{
// TODO: there might be an update between loop iterations, we
// need to handle that situation.
let block_state = self
.canonical_in_memory_state
.state_by_number(block_number)
.ok_or(ProviderError::StateForNumberNotFound(block_number))?;
for tx in block_state.block().block().body.transactions() {
if tx.hash() == tx_hash {
return Ok(Some(in_memory_tx_id))
}
in_memory_tx_id += 1;
}
}
Ok(None)
self.get_in_memory_or_storage_by_tx(
tx_hash.into(),
|db_provider| db_provider.transaction_id(tx_hash),
|_, tx_number, _| Ok(Some(tx_number)),
)
}
fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<TransactionSigned>> {
self.get_in_memory_or_storage_by_tx_id(
id,
self.get_in_memory_or_storage_by_tx(
id.into(),
|provider| provider.transaction_by_id(id),
|tx_index, block_state| {
|tx_index, _, block_state| {
Ok(block_state.block().block().body.transactions.get(tx_index).cloned())
},
)
@ -717,10 +701,10 @@ impl<N: ProviderNodeTypes> TransactionsProvider for BlockchainProvider2<N> {
&self,
id: TxNumber,
) -> ProviderResult<Option<TransactionSignedNoHash>> {
self.get_in_memory_or_storage_by_tx_id(
id,
self.get_in_memory_or_storage_by_tx(
id.into(),
|provider| provider.transaction_by_id_no_hash(id),
|tx_index, block_state| {
|tx_index, _, block_state| {
Ok(block_state
.block()
.block()
@ -755,10 +739,10 @@ impl<N: ProviderNodeTypes> TransactionsProvider for BlockchainProvider2<N> {
}
fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
self.get_in_memory_or_storage_by_tx_id(
id,
self.get_in_memory_or_storage_by_tx(
id.into(),
|provider| provider.transaction_block(id),
|_, block_state| Ok(Some(block_state.block().block().number)),
|_, _, block_state| Ok(Some(block_state.block().block().number)),
)
}
@ -831,10 +815,10 @@ impl<N: ProviderNodeTypes> TransactionsProvider for BlockchainProvider2<N> {
}
fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
self.get_in_memory_or_storage_by_tx_id(
id,
self.get_in_memory_or_storage_by_tx(
id.into(),
|provider| provider.transaction_sender(id),
|tx_index, block_state| {
|tx_index, _, block_state| {
Ok(block_state
.block()
.block()
@ -849,10 +833,10 @@ impl<N: ProviderNodeTypes> TransactionsProvider for BlockchainProvider2<N> {
impl<N: ProviderNodeTypes> ReceiptProvider for BlockchainProvider2<N> {
fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Receipt>> {
self.get_in_memory_or_storage_by_tx_id(
id,
self.get_in_memory_or_storage_by_tx(
id.into(),
|provider| provider.receipt(id),
|tx_index, block_state| {
|tx_index, _, block_state| {
Ok(block_state.executed_block_receipts().get(tx_index).cloned())
},
)