feat: extend BlockBodyIndicesProvider with block_body_indices_range (#13829)

This commit is contained in:
joshieDo
2025-01-17 11:21:05 +00:00
committed by GitHub
parent 43bd94ac4e
commit a8c883c6b6
12 changed files with 158 additions and 77 deletions

View File

@ -618,27 +618,23 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
let len = range.end().saturating_sub(*range.start()) as usize;
let mut blocks = Vec::with_capacity(len);
let headers = headers_range(range)?;
let headers = headers_range(range.clone())?;
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
let mut block_body_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut present_headers = Vec::new();
for header in headers {
// If the body indices are not found, this means that the transactions either do
// not exist in the database yet, or they do exit but are
// not indexed. If they exist but are not indexed, we don't
// have enough information to return the block anyways, so
// we skip the block.
if let Some((_, block_body_indices)) =
block_body_cursor.seek_exact(header.as_ref().number())?
{
let tx_range = block_body_indices.tx_num_range();
present_headers.push((header, tx_range));
}
}
// If the body indices are not found, this means that the transactions either do
// not exist in the database yet, or they do exit but are
// not indexed. If they exist but are not indexed, we don't
// have enough information to return the block anyways, so
// we skip the block.
let present_headers = self
.block_body_indices_range(range)?
.into_iter()
.map(|b| b.tx_num_range())
.zip(headers)
.collect::<Vec<_>>();
let mut inputs = Vec::new();
for (header, tx_range) in &present_headers {
for (tx_range, header) in &present_headers {
let transactions = if tx_range.is_empty() {
Vec::new()
} else {
@ -650,7 +646,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
for ((header, tx_range), body) in present_headers.into_iter().zip(bodies) {
for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
blocks.push(assemble_block(header, body, tx_range)?);
}
@ -1476,23 +1472,23 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for Datab
&self,
range: impl RangeBounds<BlockNumber>,
) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
let range = to_range(range);
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
let mut results = Vec::new();
let mut body_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;
for entry in body_cursor.walk_range(range)? {
let (_, body) = entry?;
let tx_num_range = body.tx_num_range();
if tx_num_range.is_empty() {
results.push(Vec::new());
} else {
results.push(
self.transactions_by_tx_range_with_cursor(tx_num_range, &mut tx_cursor)?
self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
.into_iter()
.map(|body| {
let tx_num_range = body.tx_num_range();
if tx_num_range.is_empty() {
Ok(Vec::new())
} else {
Ok(self
.transactions_by_tx_range_with_cursor(tx_num_range, &mut tx_cursor)?
.into_iter()
.collect(),
);
}
}
Ok(results)
.collect())
}
})
.collect()
}
fn transactions_by_tx_range(
@ -1620,6 +1616,18 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
}
fn block_body_indices_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
Ok(self
.tx_ref()
.cursor_read::<tables::BlockBodyIndices>()?
.walk_range(range)?
.map(|r| r.map(|(_, b)| b))
.collect::<Result<_, _>>()?)
}
}
impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
@ -1756,13 +1764,31 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
is_value_known: OriginalValuesKnown,
write_receipts_to: StorageLocation,
) -> ProviderResult<()> {
let first_block = execution_outcome.first_block();
let block_count = execution_outcome.receipts.len() as u64;
let block_range = first_block..=first_block.saturating_add(block_count).saturating_sub(1);
let last_block = *block_range.end();
let (plain_state, reverts) =
execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
self.write_state_reverts(reverts, execution_outcome.first_block)?;
self.write_state_reverts(reverts, first_block)?;
self.write_state_changes(plain_state)?;
let mut bodies_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;
// Fetch the first transaction number for each block in the range
let block_indices: Vec<_> = self
.block_body_indices_range(block_range)?
.into_iter()
.map(|b| b.first_tx_num)
.collect();
// Ensure all expected blocks are present.
if block_indices.len() < block_count as usize {
let missing_blocks = block_count - block_indices.len() as u64;
return Err(ProviderError::BlockBodyIndicesNotFound(
last_block.saturating_sub(missing_blocks - 1),
));
}
let has_receipts_pruning = self.prune_modes.has_receipts_pruning() ||
execution_outcome.receipts.iter().flatten().any(|receipt| receipt.is_none());
@ -1780,25 +1806,19 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
// We are writing to static files if requested and if there's no receipt pruning configured
let mut receipts_static_writer = (write_receipts_to.static_files() &&
!has_receipts_pruning)
.then(|| {
self.static_file_provider
.get_writer(execution_outcome.first_block, StaticFileSegment::Receipts)
})
.then(|| self.static_file_provider.get_writer(first_block, StaticFileSegment::Receipts))
.transpose()?;
for (idx, receipts) in execution_outcome.receipts.iter().enumerate() {
let block_number = execution_outcome.first_block + idx as u64;
for (idx, (receipts, first_tx_index)) in
execution_outcome.receipts.iter().zip(block_indices).enumerate()
{
let block_number = first_block + idx as u64;
// Increment block number for receipts static file writer
if let Some(writer) = receipts_static_writer.as_mut() {
writer.increment_block(block_number)?;
}
let first_tx_index = bodies_cursor
.seek_exact(block_number)?
.map(|(_, indices)| indices.first_tx_num())
.ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))?;
for (idx, receipt) in receipts.iter().enumerate() {
let receipt_idx = first_tx_index + idx as u64;
if let Some(receipt) = receipt {
@ -2017,11 +2037,11 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
}
// We are not removing block meta as it is used to get block changesets.
let block_bodies = self.get::<tables::BlockBodyIndices>(range.clone())?;
let block_bodies = self.block_body_indices_range(range.clone())?;
// get transaction receipts
let from_transaction_num =
block_bodies.first().expect("already checked if there are blocks").1.first_tx_num();
block_bodies.first().expect("already checked if there are blocks").first_tx_num();
let storage_range = BlockNumberAddress::range(range.clone());
@ -2113,13 +2133,13 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
let start_block_number = *range.start();
// We are not removing block meta as it is used to get block changesets.
let block_bodies = self.get::<tables::BlockBodyIndices>(range.clone())?;
let block_bodies = self.block_body_indices_range(range.clone())?;
// get transaction receipts
let from_transaction_num =
block_bodies.first().expect("already checked if there are blocks").1.first_tx_num();
block_bodies.first().expect("already checked if there are blocks").first_tx_num();
let to_transaction_num =
block_bodies.last().expect("already checked if there are blocks").1.last_tx_num();
block_bodies.last().expect("already checked if there are blocks").last_tx_num();
let storage_range = BlockNumberAddress::range(range.clone());
@ -2199,7 +2219,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
let mut receipts = Vec::with_capacity(block_bodies.len());
// loop break if we are at the end of the blocks.
for (_, block_body) in block_bodies {
for block_body in block_bodies {
let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
for num in block_body.tx_num_range() {
if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
@ -2920,8 +2940,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
// First transaction to be removed
let unwind_tx_from = self
.tx
.get::<tables::BlockBodyIndices>(block)?
.block_body_indices(block)?
.map(|b| b.next_tx_num())
.ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
@ -2957,8 +2976,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
// First transaction to be removed
let unwind_tx_from = self
.tx
.get::<tables::BlockBodyIndices>(block)?
.block_body_indices(block)?
.map(|b| b.next_tx_num())
.ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;