fix: ensures that pruning data from static files only happens on calling commit() (#8101)

This commit is contained in:
joshieDo
2024-05-07 16:46:11 +01:00
committed by GitHub
parent 05e434eae3
commit a2623e8364
6 changed files with 114 additions and 25 deletions

View File

@ -146,8 +146,13 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// If static files are ahead, then we didn't reach the database commit in a previous
// stage run. So, our only solution is to unwind the static files and proceed from the
// database expected height.
Ordering::Greater => static_file_producer
.prune_transactions(next_static_file_tx_num - next_tx_num, from_block - 1)?,
Ordering::Greater => {
static_file_producer
.prune_transactions(next_static_file_tx_num - next_tx_num, from_block - 1)?;
// Since this is a database <-> static file inconsistency, we commit the change
// straight away.
static_file_producer.commit()?;
}
// If static files are behind, then there was some corruption or loss of files. This
// error will trigger an unwind, that will bring the database to the same height as the
// static files.
@ -576,6 +581,7 @@ mod tests {
let mut static_file_producer =
static_file_provider.latest_writer(StaticFileSegment::Transactions).unwrap();
static_file_producer.prune_transactions(1, checkpoint.block_number).unwrap();
static_file_producer.commit().unwrap();
}
// Unwind all of it
let unwind_to = 1;

View File

@ -169,7 +169,11 @@ where
let static_file_producer = if self.prune_modes.receipts.is_none() &&
self.prune_modes.receipts_log_filter.is_empty()
{
Some(prepare_static_file_producer(provider, start_block)?)
let mut producer = prepare_static_file_producer(provider, start_block)?;
// Since there might be a database <-> static file inconsistency (read
// `prepare_static_file_producer` for context), we commit the change straight away.
producer.commit()?;
Some(producer)
} else {
None
};

View File

@ -582,6 +582,7 @@ mod tests {
let hash = last_header.hash_slow();
writer.prune_headers(1).unwrap();
writer.commit().unwrap();
writer.append_header(last_header, U256::ZERO, hash).unwrap();
writer.commit().unwrap();

View File

@ -272,12 +272,13 @@ mod tests {
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
// Unwind headers from static_files and manually insert them into the database, so we're
// able to check that static_file_producer works
db.factory
.static_file_provider()
let static_file_provider = db.factory.static_file_provider();
let mut static_file_writer = static_file_provider
.latest_writer(StaticFileSegment::Headers)
.expect("get static file writer for headers")
.prune_headers(blocks.len() as u64)
.expect("prune headers");
.expect("get static file writer for headers");
static_file_writer.prune_headers(blocks.len() as u64).unwrap();
static_file_writer.commit().expect("prune headers");
let tx = db.factory.db_ref().tx_mut().expect("init tx");
blocks.iter().for_each(|block| {
TestStageDB::insert_header(None, &tx, &block.header, U256::ZERO)

View File

@ -1112,7 +1112,10 @@ impl<TX: DbTx> HeaderSyncGapProvider for DatabaseProvider<TX> {
Ordering::Greater => {
let mut static_file_producer =
static_file_provider.latest_writer(StaticFileSegment::Headers)?;
static_file_producer.prune_headers(next_static_file_block_num - next_block)?
static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
// Since this is a database <-> static file inconsistency, we commit the change
// straight away.
static_file_producer.commit()?
}
Ordering::Less => {
// There's either missing or corrupted files.

View File

@ -30,10 +30,17 @@ pub struct StaticFileProviderRW {
/// stored in a [dashmap::DashMap] inside the parent [StaticFileProvider], which is an [Arc].
/// If we were to use an [Arc] here, we would create a reference cycle.
reader: Weak<StaticFileProviderInner>,
/// A [`NippyJarWriter`] instance.
writer: NippyJarWriter<SegmentHeader>,
/// Path to opened file.
data_path: PathBuf,
/// Reusable buffer for encoding appended data.
buf: Vec<u8>,
/// Metrics.
metrics: Option<Arc<StaticFileProviderMetrics>>,
/// On commit, does the instructed pruning: number of lines, and if it applies, the last block
/// it ends at.
prune_on_commit: Option<(u64, Option<BlockNumber>)>,
}
impl StaticFileProviderRW {
@ -45,7 +52,14 @@ impl StaticFileProviderRW {
metrics: Option<Arc<StaticFileProviderMetrics>>,
) -> ProviderResult<Self> {
let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
Ok(Self { writer, data_path, buf: Vec::with_capacity(100), reader, metrics })
Ok(Self {
writer,
data_path,
buf: Vec::with_capacity(100),
reader,
metrics,
prune_on_commit: None,
})
}
fn open(
@ -100,6 +114,18 @@ impl StaticFileProviderRW {
pub fn commit(&mut self) -> ProviderResult<()> {
let start = Instant::now();
// Truncates the data file if instructed to.
if let Some((to_delete, last_block_number)) = self.prune_on_commit.take() {
match self.writer.user_header().segment() {
StaticFileSegment::Headers => self.prune_header_data(to_delete)?,
StaticFileSegment::Transactions => self
.prune_transaction_data(to_delete, last_block_number.expect("should exist"))?,
StaticFileSegment::Receipts => {
self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
}
}
}
// Commits offsets and new user_header to disk
self.writer.commit().map_err(|e| ProviderError::NippyJar(e.to_string()))?;
@ -372,6 +398,7 @@ impl StaticFileProviderRW {
hash: BlockHash,
) -> ProviderResult<BlockNumber> {
let start = Instant::now();
self.ensure_no_queued_prune()?;
debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
@ -404,6 +431,7 @@ impl StaticFileProviderRW {
tx: TransactionSignedNoHash,
) -> ProviderResult<TxNumber> {
let start = Instant::now();
self.ensure_no_queued_prune()?;
let result = self.append_with_tx_number(StaticFileSegment::Transactions, tx_num, tx)?;
@ -430,6 +458,7 @@ impl StaticFileProviderRW {
receipt: Receipt,
) -> ProviderResult<TxNumber> {
let start = Instant::now();
self.ensure_no_queued_prune()?;
let result = self.append_with_tx_number(StaticFileSegment::Receipts, tx_num, receipt)?;
@ -444,13 +473,64 @@ impl StaticFileProviderRW {
Ok(result)
}
/// Removes the last `number` of transactions from static files.
/// Adds an instruction to prune `to_delete` transactions during commit.
///
/// # Note
/// Commits to the configuration file at the end.
/// Note: `last_block` refers to the block the unwind ends at.
pub fn prune_transactions(
&mut self,
number: u64,
to_delete: u64,
last_block: BlockNumber,
) -> ProviderResult<()> {
debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
self.queue_prune(to_delete, Some(last_block))
}
/// Adds an instruction to prune `to_delete` receipts during commit.
///
/// No data is removed here: the prune is queued in `prune_on_commit` and applied by
/// `commit()` before offsets are flushed to disk.
///
/// Note: `last_block` refers to the block the unwind ends at.
pub fn prune_receipts(
&mut self,
to_delete: u64,
last_block: BlockNumber,
) -> ProviderResult<()> {
// Queueing a prune on a writer for the wrong segment is a programmer error.
debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
self.queue_prune(to_delete, Some(last_block))
}
/// Adds an instruction to prune `to_delete` headers during commit.
///
/// No data is removed here: the prune is queued in `prune_on_commit` and applied by
/// `commit()` before offsets are flushed to disk. Headers carry no `last_block`,
/// hence `None` is queued.
pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
// Queueing a prune on a writer for the wrong segment is a programmer error.
debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
self.queue_prune(to_delete, None)
}
/// Adds an instruction to prune `to_delete` elements during commit.
///
/// Returns an error if a prune instruction is already queued: only one prune may be
/// pending at a time, and it must be committed before another is queued.
///
/// Note: `last_block` refers to the block the unwind ends at if dealing with transaction-based
/// data.
fn queue_prune(
&mut self,
to_delete: u64,
last_block: Option<BlockNumber>,
) -> ProviderResult<()> {
// Refuse to overwrite a pending prune — it would silently drop the earlier instruction.
self.ensure_no_queued_prune()?;
self.prune_on_commit = Some((to_delete, last_block));
Ok(())
}
/// Ensures that no pruning instruction is pending.
///
/// Returns a `ProviderError::NippyJar` error if `prune_on_commit` holds a queued prune,
/// since a queued prune must be applied via `commit()` before appending or queueing
/// more data.
fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
    if self.prune_on_commit.is_some() {
        // Fix: error message previously misspelled "committed" as "comitted".
        return Err(ProviderError::NippyJar(
            "Pruning should be committed before appending or pruning more data".to_string(),
        ))
    }
    Ok(())
}
/// Removes the last `to_delete` transactions from the data file.
fn prune_transaction_data(
&mut self,
to_delete: u64,
last_block: BlockNumber,
) -> ProviderResult<()> {
let start = Instant::now();
@ -458,7 +538,7 @@ impl StaticFileProviderRW {
let segment = StaticFileSegment::Transactions;
debug_assert!(self.writer.user_header().segment() == segment);
self.truncate(segment, number, Some(last_block))?;
self.truncate(segment, to_delete, Some(last_block))?;
if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
@ -471,11 +551,8 @@ impl StaticFileProviderRW {
Ok(())
}
/// Prunes `to_delete` number of receipts from static_files.
///
/// # Note
/// Commits to the configuration file at the end.
pub fn prune_receipts(
/// Prunes the last `to_delete` receipts from the data file.
fn prune_receipt_data(
&mut self,
to_delete: u64,
last_block: BlockNumber,
@ -498,11 +575,8 @@ impl StaticFileProviderRW {
Ok(())
}
/// Prunes `to_delete` number of headers from static_files.
///
/// # Note
/// Commits to the configuration file at the end.
pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
/// Prunes the last `to_delete` headers from the data file.
fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
let start = Instant::now();
let segment = StaticFileSegment::Headers;