diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index 5080b9b9e..bce56880a 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -146,8 +146,13 @@ impl Stage for BodyStage { // If static files are ahead, then we didn't reach the database commit in a previous // stage run. So, our only solution is to unwind the static files and proceed from the // database expected height. - Ordering::Greater => static_file_producer - .prune_transactions(next_static_file_tx_num - next_tx_num, from_block - 1)?, + Ordering::Greater => { + static_file_producer + .prune_transactions(next_static_file_tx_num - next_tx_num, from_block - 1)?; + // Since this is a database <-> static file inconsistency, we commit the change + // straight away. + static_file_producer.commit()?; + } // If static files are behind, then there was some corruption or loss of files. This // error will trigger an unwind, that will bring the database to the same height as the // static files. @@ -576,6 +581,7 @@ mod tests { let mut static_file_producer = static_file_provider.latest_writer(StaticFileSegment::Transactions).unwrap(); static_file_producer.prune_transactions(1, checkpoint.block_number).unwrap(); + static_file_producer.commit().unwrap(); } // Unwind all of it let unwind_to = 1; diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 0db907211..6d2eb2a5d 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -169,7 +169,11 @@ where let static_file_producer = if self.prune_modes.receipts.is_none() && self.prune_modes.receipts_log_filter.is_empty() { - Some(prepare_static_file_producer(provider, start_block)?) + let mut producer = prepare_static_file_producer(provider, start_block)?; + // Since there might be a database <-> static file inconsistency (read + // `prepare_static_file_producer` for context), we commit the change straight away. 
+ producer.commit()?; + Some(producer) } else { None }; diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 77fcf2e15..cdf33b40f 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -582,6 +582,7 @@ mod tests { let hash = last_header.hash_slow(); writer.prune_headers(1).unwrap(); + writer.commit().unwrap(); writer.append_header(last_header, U256::ZERO, hash).unwrap(); writer.commit().unwrap(); diff --git a/crates/static-file/src/static_file_producer.rs b/crates/static-file/src/static_file_producer.rs index 0b0720e21..c7a365c9a 100644 --- a/crates/static-file/src/static_file_producer.rs +++ b/crates/static-file/src/static_file_producer.rs @@ -272,12 +272,13 @@ mod tests { db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks"); // Unwind headers from static_files and manually insert them into the database, so we're // able to check that static_file_producer works - db.factory - .static_file_provider() + let static_file_provider = db.factory.static_file_provider(); + let mut static_file_writer = static_file_provider .latest_writer(StaticFileSegment::Headers) - .expect("get static file writer for headers") - .prune_headers(blocks.len() as u64) - .expect("prune headers"); + .expect("get static file writer for headers"); + static_file_writer.prune_headers(blocks.len() as u64).unwrap(); + static_file_writer.commit().expect("prune headers"); + let tx = db.factory.db_ref().tx_mut().expect("init tx"); blocks.iter().for_each(|block| { TestStageDB::insert_header(None, &tx, &block.header, U256::ZERO) diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 2cae000ce..428645f1a 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -1112,7 +1112,10 @@ impl HeaderSyncGapProvider for DatabaseProvider { 
Ordering::Greater => { let mut static_file_producer = static_file_provider.latest_writer(StaticFileSegment::Headers)?; - static_file_producer.prune_headers(next_static_file_block_num - next_block)? + static_file_producer.prune_headers(next_static_file_block_num - next_block)?; + // Since this is a database <-> static file inconsistency, we commit the change + // straight away. + static_file_producer.commit()? } Ordering::Less => { // There's either missing or corrupted files. diff --git a/crates/storage/provider/src/providers/static_file/writer.rs b/crates/storage/provider/src/providers/static_file/writer.rs index d1aa8560f..3a0f2d031 100644 --- a/crates/storage/provider/src/providers/static_file/writer.rs +++ b/crates/storage/provider/src/providers/static_file/writer.rs @@ -30,10 +30,17 @@ pub struct StaticFileProviderRW { /// stored in a [dashmap::DashMap] inside the parent [StaticFileProvider].which is an [Arc]. /// If we were to use an [Arc] here, we would create a reference cycle. reader: Weak, + /// A [`NippyJarWriter`] instance. writer: NippyJarWriter, + /// Path to opened file. data_path: PathBuf, + /// Reusable buffer for encoding appended data. buf: Vec, + /// Metrics. metrics: Option>, + /// On commit, does the instructed pruning: number of lines, and if it applies, the last block + /// it ends at. + prune_on_commit: Option<(u64, Option)>, } impl StaticFileProviderRW { @@ -45,7 +52,14 @@ impl StaticFileProviderRW { metrics: Option>, ) -> ProviderResult { let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?; - Ok(Self { writer, data_path, buf: Vec::with_capacity(100), reader, metrics }) + Ok(Self { + writer, + data_path, + buf: Vec::with_capacity(100), + reader, + metrics, + prune_on_commit: None, + }) } fn open( @@ -100,6 +114,18 @@ impl StaticFileProviderRW { pub fn commit(&mut self) -> ProviderResult<()> { let start = Instant::now(); + // Truncates the data file if instructed to. 
+ if let Some((to_delete, last_block_number)) = self.prune_on_commit.take() { + match self.writer.user_header().segment() { + StaticFileSegment::Headers => self.prune_header_data(to_delete)?, + StaticFileSegment::Transactions => self + .prune_transaction_data(to_delete, last_block_number.expect("should exist"))?, + StaticFileSegment::Receipts => { + self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))? + } + } + } + // Commits offsets and new user_header to disk self.writer.commit().map_err(|e| ProviderError::NippyJar(e.to_string()))?; @@ -372,6 +398,7 @@ impl StaticFileProviderRW { hash: BlockHash, ) -> ProviderResult { let start = Instant::now(); + self.ensure_no_queued_prune()?; debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers); @@ -404,6 +431,7 @@ impl StaticFileProviderRW { tx: TransactionSignedNoHash, ) -> ProviderResult { let start = Instant::now(); + self.ensure_no_queued_prune()?; let result = self.append_with_tx_number(StaticFileSegment::Transactions, tx_num, tx)?; @@ -430,6 +458,7 @@ impl StaticFileProviderRW { receipt: Receipt, ) -> ProviderResult { let start = Instant::now(); + self.ensure_no_queued_prune()?; let result = self.append_with_tx_number(StaticFileSegment::Receipts, tx_num, receipt)?; @@ -444,13 +473,64 @@ impl StaticFileProviderRW { Ok(result) } - /// Removes the last `number` of transactions from static files. + /// Adds an instruction to prune `to_delete` transactions during commit. /// - /// # Note - /// Commits to the configuration file at the end. + /// Note: `last_block` refers to the block the unwind ends at. pub fn prune_transactions( &mut self, - number: u64, + to_delete: u64, + last_block: BlockNumber, + ) -> ProviderResult<()> { + debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions); + self.queue_prune(to_delete, Some(last_block)) + } + + /// Adds an instruction to prune `to_delete` receipts during commit. 
+ /// + /// Note: `last_block` refers to the block the unwind ends at. + pub fn prune_receipts( + &mut self, + to_delete: u64, + last_block: BlockNumber, + ) -> ProviderResult<()> { + debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts); + self.queue_prune(to_delete, Some(last_block)) + } + + /// Adds an instruction to prune `to_delete` headers during commit. + pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> { + debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers); + self.queue_prune(to_delete, None) + } + + /// Adds an instruction to prune `to_delete` elements during commit. + /// + /// Note: `last_block` refers to the block the unwind ends at if dealing with transaction-based + /// data. + fn queue_prune( + &mut self, + to_delete: u64, + last_block: Option, + ) -> ProviderResult<()> { + self.ensure_no_queued_prune()?; + self.prune_on_commit = Some((to_delete, last_block)); + Ok(()) + } + + /// Returns Error if there is a pruning instruction that needs to be applied. + fn ensure_no_queued_prune(&self) -> ProviderResult<()> { + if self.prune_on_commit.is_some() { + return Err(ProviderError::NippyJar( + "Pruning should be comitted before appending or pruning more data".to_string(), + )); + } + Ok(()) + } + + /// Removes the last `to_delete` transactions from the data file. + fn prune_transaction_data( + &mut self, + to_delete: u64, last_block: BlockNumber, ) -> ProviderResult<()> { let start = Instant::now(); @@ -458,7 +538,7 @@ impl StaticFileProviderRW { let segment = StaticFileSegment::Transactions; debug_assert!(self.writer.user_header().segment() == segment); - self.truncate(segment, number, Some(last_block))?; + self.truncate(segment, to_delete, Some(last_block))?; if let Some(metrics) = &self.metrics { metrics.record_segment_operation( @@ -471,11 +551,8 @@ impl StaticFileProviderRW { Ok(()) } - /// Prunes `to_delete` number of receipts from static_files. 
- /// - /// # Note - /// Commits to the configuration file at the end. - pub fn prune_receipts( + /// Prunes the last `to_delete` receipts from the data file. + fn prune_receipt_data( &mut self, to_delete: u64, last_block: BlockNumber, @@ -498,11 +575,8 @@ impl StaticFileProviderRW { Ok(()) } - /// Prunes `to_delete` number of headers from static_files. - /// - /// # Note - /// Commits to the configuration file at the end. - pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> { + /// Prunes the last `to_delete` headers from the data file. + fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> { let start = Instant::now(); let segment = StaticFileSegment::Headers;