diff --git a/crates/stages/stages/src/test_utils/test_db.rs b/crates/stages/stages/src/test_utils/test_db.rs index 37fdacd28..f2d653c0b 100644 --- a/crates/stages/stages/src/test_utils/test_db.rs +++ b/crates/stages/stages/src/test_utils/test_db.rs @@ -349,9 +349,7 @@ impl TestStageDB { let mut writer = provider.latest_writer(StaticFileSegment::Receipts)?; let res = receipts.into_iter().try_for_each(|(block_num, receipts)| { writer.increment_block(StaticFileSegment::Receipts, block_num)?; - for (tx_num, receipt) in receipts { - writer.append_receipt(tx_num, receipt)?; - } + writer.append_receipts(receipts.into_iter().map(Ok))?; Ok(()) }); writer.commit_without_sync_all()?; diff --git a/crates/static-file/static-file/src/segments/receipts.rs b/crates/static-file/static-file/src/segments/receipts.rs index 06102a7d8..e0ed58086 100644 --- a/crates/static-file/static-file/src/segments/receipts.rs +++ b/crates/static-file/static-file/src/segments/receipts.rs @@ -42,11 +42,9 @@ impl Segment for Receipts { let mut receipts_cursor = provider.tx_ref().cursor_read::()?; let receipts_walker = receipts_cursor.walk_range(block_body_indices.tx_num_range())?; - for entry in receipts_walker { - let (tx_number, receipt) = entry?; - - static_file_writer.append_receipt(tx_number, receipt)?; - } + static_file_writer.append_receipts( + receipts_walker.map(|result| result.map_err(ProviderError::from)), + )?; } Ok(()) diff --git a/crates/storage/provider/src/bundle_state/bundle_state_with_receipts.rs b/crates/storage/provider/src/bundle_state/bundle_state_with_receipts.rs index 81198481e..de5aff11b 100644 --- a/crates/storage/provider/src/bundle_state/bundle_state_with_receipts.rs +++ b/crates/storage/provider/src/bundle_state/bundle_state_with_receipts.rs @@ -42,12 +42,14 @@ impl StateWriter for ExecutionOutcome { if let Some(static_file_producer) = &mut static_file_producer { // Increment block on static file header. static_file_producer.increment_block(StaticFileSegment::Receipts, block_number)?; - - for (tx_idx, receipt) in receipts.into_iter().enumerate() { - let receipt = receipt - .expect("receipt should not be filtered when saving to static files."); - static_file_producer.append_receipt(first_tx_index + tx_idx as u64, receipt)?; - } + let receipts = receipts.into_iter().enumerate().map(|(tx_idx, receipt)| { + Ok(( + first_tx_index + tx_idx as u64, + receipt + .expect("receipt should not be filtered when saving to static files."), + )) + }); + static_file_producer.append_receipts(receipts)?; } else if !receipts.is_empty() { for (tx_idx, receipt) in receipts.into_iter().enumerate() { if let Some(receipt) = receipt { diff --git a/crates/storage/provider/src/providers/static_file/metrics.rs b/crates/storage/provider/src/providers/static_file/metrics.rs index f1a4204a7..72589ca69 100644 --- a/crates/storage/provider/src/providers/static_file/metrics.rs +++ b/crates/storage/provider/src/providers/static_file/metrics.rs @@ -80,6 +80,28 @@ impl StaticFileProviderMetrics { .record(duration.as_secs_f64()); } } + + pub(crate) fn record_segment_operations( + &self, + segment: StaticFileSegment, + operation: StaticFileProviderOperation, + count: u64, + duration: Option, + ) { + self.segment_operations + .get(&(segment, operation)) + .expect("segment operation metrics should exist") + .calls_total + .increment(count); + + if let Some(duration) = duration { + self.segment_operations + .get(&(segment, operation)) + .expect("segment operation metrics should exist") + .write_duration_seconds + .record(duration.as_secs_f64() / count as f64); + } + } } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumIter)] diff --git a/crates/storage/provider/src/providers/static_file/writer.rs b/crates/storage/provider/src/providers/static_file/writer.rs index 147d812ce..304429a01 100644 --- a/crates/storage/provider/src/providers/static_file/writer.rs +++ b/crates/storage/provider/src/providers/static_file/writer.rs @@ -547,6 +547,44 @@ impl StaticFileProviderRW { Ok(result) } + /// Appends multiple receipts to the static file. + /// + /// Returns the current [`TxNumber`] as seen in the static file, if any. + pub fn append_receipts(&mut self, receipts: I) -> ProviderResult> + where + I: IntoIterator>, + { + let mut receipts_iter = receipts.into_iter().peekable(); + // If receipts are empty, we can simply return None + if receipts_iter.peek().is_none() { + return Ok(None); + } + + let start = Instant::now(); + self.ensure_no_queued_prune()?; + + // At this point receipts contains at least one receipt, so this would be overwritten. + let mut tx_number = 0; + let mut count: u64 = 0; + + for receipt_result in receipts_iter { + let (tx_num, receipt) = receipt_result?; + tx_number = self.append_with_tx_number(StaticFileSegment::Receipts, tx_num, receipt)?; + count += 1; + } + + if let Some(metrics) = &self.metrics { + metrics.record_segment_operations( + StaticFileSegment::Receipts, + StaticFileProviderOperation::Append, + count, + Some(start.elapsed()), + ); + } + + Ok(Some(tx_number)) + } + /// Adds an instruction to prune `to_delete`transactions during commit. /// /// Note: `last_block` refers to the block the unwinds ends at.