feat: add get_in_memory_or_storage_by_tx_range (#11414)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
joshieDo
2024-10-02 19:45:06 +02:00
committed by GitHub
parent c4ce9977f2
commit 5ec448ef94
2 changed files with 158 additions and 18 deletions

View File

@ -618,6 +618,11 @@ impl BlockState {
self.block.clone()
}
/// Returns a reference to the executed block that determines the state.
pub const fn block_ref(&self) -> &ExecutedBlock {
&self.block
}
/// Returns the block with senders for the state.
pub fn block_with_senders(&self) -> BlockWithSenders {
let block = self.block.block().clone();

View File

@ -168,6 +168,10 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
.unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
});
if start > end {
return Ok(vec![])
}
// Split range into storage_range and in-memory range. If the in-memory range is not
// necessary drop it early.
//
@ -241,6 +245,103 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
Ok(self.canonical_in_memory_state.state_provider_from_state(state, latest_historical))
}
/// Fetches data from either in-memory state or persistent storage for a range of transactions.
///
/// * `fetch_from_db`: has a [`DatabaseProviderRO`] and the storage specific range.
/// * `fetch_from_block_state`: has a [`RangeInclusive`] of elements that should be fetched from
/// [`BlockState`]. [`RangeInclusive`] is necessary to handle partial look-ups of a block.
fn get_in_memory_or_storage_by_tx_range<S, M, R>(
&self,
range: impl RangeBounds<BlockNumber>,
fetch_from_db: S,
fetch_from_block_state: M,
) -> ProviderResult<Vec<R>>
where
S: FnOnce(
DatabaseProviderRO<N::DB, N::ChainSpec>,
RangeInclusive<TxNumber>,
) -> ProviderResult<Vec<R>>,
M: Fn(RangeInclusive<usize>, Arc<BlockState>) -> ProviderResult<Vec<R>>,
{
let in_mem_chain = self.canonical_in_memory_state.canonical_chain().collect::<Vec<_>>();
let provider = self.database.provider()?;
// Get the last block number stored in the storage which does NOT overlap with in-memory
// chain.
let mut last_database_block_number = provider.last_block_number()?;
if let Some(lowest_in_mem_block) = in_mem_chain.last() {
if lowest_in_mem_block.number() <= last_database_block_number {
last_database_block_number = lowest_in_mem_block.number().saturating_sub(1);
}
}
// Get the next tx number for the last block stored in the storage, which marks the start of
// the in-memory state.
let last_block_body_index = provider
.block_body_indices(last_database_block_number)?
.ok_or_else(|| ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
let mut in_memory_tx_num = last_block_body_index.next_tx_num();
let (start, end) = self.convert_range_bounds(range, || {
in_mem_chain
.iter()
.map(|b| b.block_ref().block().body.transactions.len() as u64)
.sum::<u64>() +
last_block_body_index.last_tx_num()
});
if start > end {
return Ok(vec![])
}
let mut tx_range = start..=end;
// If the range is entirely before the first in-memory transaction number, fetch from
// storage
if *tx_range.end() < in_memory_tx_num {
return fetch_from_db(provider, tx_range);
}
let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
// If the range spans storage and memory, get elements from storage first.
if *tx_range.start() < in_memory_tx_num {
// Determine the range that needs to be fetched from storage.
let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
// Set the remaining transaction range for in-memory
tx_range = in_memory_tx_num..=*tx_range.end();
items.extend(fetch_from_db(provider, db_range)?);
}
// Iterate from the lowest block to the highest in-memory chain
for block_state in in_mem_chain.into_iter().rev() {
let block_tx_count = block_state.block_ref().block().body.transactions.len();
let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
// This should only be more than 0 in the first iteration, in case of a partial range
let skip = (tx_range.start() - in_memory_tx_num) as usize;
items.extend(fetch_from_block_state(
skip..=(remaining.min(block_tx_count) - 1),
block_state,
)?);
in_memory_tx_num += block_tx_count as u64;
// Break if the range has been fully processed
if in_memory_tx_num > *tx_range.end() {
break
}
// Set updated range
tx_range = in_memory_tx_num..=*tx_range.end();
}
Ok(items)
}
/// Fetches data from either in-memory state or persistent storage by transaction
/// [`HashOrNumber`].
fn get_in_memory_or_storage_by_tx<S, M, R>(
@ -807,14 +908,28 @@ impl<N: ProviderNodeTypes> TransactionsProvider for BlockchainProvider2<N> {
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<TransactionSignedNoHash>> {
self.database.transactions_by_tx_range(range)
self.get_in_memory_or_storage_by_tx_range(
range,
|db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
|index_range, block_state| {
Ok(block_state.block_ref().block().body.transactions[index_range]
.iter()
.cloned()
.map(Into::into)
.collect())
},
)
}
fn senders_by_tx_range(
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<Address>> {
self.database.senders_by_tx_range(range)
self.get_in_memory_or_storage_by_tx_range(
range,
|db_provider, db_range| db_provider.senders_by_tx_range(db_range),
|index_range, block_state| Ok(block_state.block_ref().senders[index_range].to_vec()),
)
}
fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
@ -872,7 +987,13 @@ impl<N: ProviderNodeTypes> ReceiptProvider for BlockchainProvider2<N> {
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<Receipt>> {
self.database.receipts_by_tx_range(range)
self.get_in_memory_or_storage_by_tx_range(
range,
|db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
|index_range, block_state| {
Ok(block_state.executed_block_receipts().drain(index_range).collect())
},
)
}
}
@ -4058,34 +4179,48 @@ mod tests {
#[test]
fn test_senders_by_tx_range() -> eyre::Result<()> {
let mut rng = generators::rng();
let (provider, database_blocks, _, _) = provider_with_random_blocks(
let (provider, database_blocks, in_memory_blocks, _) = provider_with_random_blocks(
&mut rng,
TEST_BLOCKS_COUNT,
0,
TEST_BLOCKS_COUNT,
BlockRangeParams {
tx_count: TEST_TRANSACTIONS_COUNT..TEST_TRANSACTIONS_COUNT,
..Default::default()
},
)?;
// Define a valid transaction range within the database
let start_tx_num = 0;
let end_tx_num = 1;
let db_tx_count =
database_blocks.iter().map(|b| b.body.transactions.len()).sum::<usize>() as u64;
let in_mem_tx_count =
in_memory_blocks.iter().map(|b| b.body.transactions.len()).sum::<usize>() as u64;
// Retrieve the senders for this transaction number range
let result = provider.senders_by_tx_range(start_tx_num..=end_tx_num)?;
let db_range = 0..=(db_tx_count - 1);
let in_mem_range = db_tx_count..=(in_mem_tx_count + db_range.end());
// Ensure the sender addresses match the expected addresses in the database
assert_eq!(result.len(), 2);
// Retrieve the senders for the whole database range
let database_senders =
database_blocks.iter().flat_map(|b| b.senders().unwrap()).collect::<Vec<_>>();
assert_eq!(provider.senders_by_tx_range(db_range)?, database_senders);
// Retrieve the senders for the whole in-memory range
let in_memory_senders =
in_memory_blocks.iter().flat_map(|b| b.senders().unwrap()).collect::<Vec<_>>();
assert_eq!(provider.senders_by_tx_range(in_mem_range.clone())?, in_memory_senders);
// Retrieve the senders for a partial in-memory range
assert_eq!(
result[0],
database_blocks[0].senders().unwrap()[0],
"The sender address should match the expected sender address"
&provider.senders_by_tx_range(in_mem_range.start() + 1..=in_mem_range.end() - 1)?,
&in_memory_senders[1..in_memory_senders.len() - 1]
);
// Retrieve the senders for a range that spans database and in-memory
assert_eq!(
result[1],
database_blocks[0].senders().unwrap()[1],
"The sender address should match the expected sender address"
provider.senders_by_tx_range(in_mem_range.start() - 2..=in_mem_range.end() - 1)?,
database_senders[database_senders.len() - 2..]
.iter()
.chain(&in_memory_senders[..in_memory_senders.len() - 1])
.copied()
.collect::<Vec<_>>()
);
// Define an empty range that should return no sender addresses