feat: add append_receipts function (#8718)

Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
This commit is contained in:
Krishang
2024-06-13 19:38:57 +05:30
committed by GitHub
parent 41933b76de
commit e5111f0339
5 changed files with 72 additions and 14 deletions

View File

@ -349,9 +349,7 @@ impl TestStageDB {
let mut writer = provider.latest_writer(StaticFileSegment::Receipts)?;
let res = receipts.into_iter().try_for_each(|(block_num, receipts)| {
writer.increment_block(StaticFileSegment::Receipts, block_num)?;
for (tx_num, receipt) in receipts {
writer.append_receipt(tx_num, receipt)?;
}
writer.append_receipts(receipts.into_iter().map(Ok))?;
Ok(())
});
writer.commit_without_sync_all()?;

View File

@ -42,11 +42,9 @@ impl<DB: Database> Segment<DB> for Receipts {
let mut receipts_cursor = provider.tx_ref().cursor_read::<tables::Receipts>()?;
let receipts_walker = receipts_cursor.walk_range(block_body_indices.tx_num_range())?;
for entry in receipts_walker {
let (tx_number, receipt) = entry?;
static_file_writer.append_receipt(tx_number, receipt)?;
}
static_file_writer.append_receipts(
receipts_walker.map(|result| result.map_err(ProviderError::from)),
)?;
}
Ok(())

View File

@ -42,12 +42,14 @@ impl StateWriter for ExecutionOutcome {
if let Some(static_file_producer) = &mut static_file_producer {
// Increment block on static file header.
static_file_producer.increment_block(StaticFileSegment::Receipts, block_number)?;
for (tx_idx, receipt) in receipts.into_iter().enumerate() {
let receipt = receipt
.expect("receipt should not be filtered when saving to static files.");
static_file_producer.append_receipt(first_tx_index + tx_idx as u64, receipt)?;
}
let receipts = receipts.into_iter().enumerate().map(|(tx_idx, receipt)| {
Ok((
first_tx_index + tx_idx as u64,
receipt
.expect("receipt should not be filtered when saving to static files."),
))
});
static_file_producer.append_receipts(receipts)?;
} else if !receipts.is_empty() {
for (tx_idx, receipt) in receipts.into_iter().enumerate() {
if let Some(receipt) = receipt {

View File

@ -80,6 +80,28 @@ impl StaticFileProviderMetrics {
.record(duration.as_secs_f64());
}
}
pub(crate) fn record_segment_operations(
&self,
segment: StaticFileSegment,
operation: StaticFileProviderOperation,
count: u64,
duration: Option<Duration>,
) {
self.segment_operations
.get(&(segment, operation))
.expect("segment operation metrics should exist")
.calls_total
.increment(count);
if let Some(duration) = duration {
self.segment_operations
.get(&(segment, operation))
.expect("segment operation metrics should exist")
.write_duration_seconds
.record(duration.as_secs_f64() / count as f64);
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumIter)]

View File

@ -547,6 +547,44 @@ impl StaticFileProviderRW {
Ok(result)
}
/// Appends multiple receipts to the static file.
///
/// Returns the current [`TxNumber`] as seen in the static file, if any.
pub fn append_receipts<I>(&mut self, receipts: I) -> ProviderResult<Option<TxNumber>>
where
I: IntoIterator<Item = Result<(TxNumber, Receipt), ProviderError>>,
{
let mut receipts_iter = receipts.into_iter().peekable();
// If receipts are empty, we can simply return None
if receipts_iter.peek().is_none() {
return Ok(None);
}
let start = Instant::now();
self.ensure_no_queued_prune()?;
// At this point receipts contains at least one receipt, so this would be overwritten.
let mut tx_number = 0;
let mut count: u64 = 0;
for receipt_result in receipts_iter {
let (tx_num, receipt) = receipt_result?;
tx_number = self.append_with_tx_number(StaticFileSegment::Receipts, tx_num, receipt)?;
count += 1;
}
if let Some(metrics) = &self.metrics {
metrics.record_segment_operations(
StaticFileSegment::Receipts,
StaticFileProviderOperation::Append,
count,
Some(start.elapsed()),
);
}
Ok(Some(tx_number))
}
/// Adds an instruction to prune `to_delete`transactions during commit.
///
/// Note: `last_block` refers to the block the unwinds ends at.