fix: atomically write to static file configuration file (#8666)

This commit is contained in:
joshieDo
2024-06-07 22:37:20 +02:00
committed by GitHub
parent 314da27567
commit dd9e417d31
3 changed files with 66 additions and 20 deletions

View File

@ -17,7 +17,7 @@ use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use std::{
error::Error as StdError,
fs::File,
fs::{File, OpenOptions},
ops::Range,
path::{Path, PathBuf},
};
@ -432,7 +432,25 @@ impl<H: NippyJarHeader> NippyJar<H> {
/// Writes all necessary configuration to file.
fn freeze_config(&self) -> Result<(), NippyJarError> {
Ok(bincode::serialize_into(File::create(self.config_path())?, &self)?)
// Atomic writes are hard: <https://github.com/paradigmxyz/reth/issues/8622>
let mut tmp_path = self.config_path();
tmp_path.set_extension(".tmp");
// Write to temporary file
let mut file = File::create(&tmp_path)?;
bincode::serialize_into(&mut file, &self)?;
// fsync() file
file.sync_all()?;
// Rename file, not move
reth_fs_util::rename(&tmp_path, self.config_path())?;
// fsync() dir
if let Some(parent) = tmp_path.parent() {
OpenOptions::new().read(true).open(parent)?.sync_all()?;
}
Ok(())
}
}
@ -1215,8 +1233,10 @@ mod tests {
writer.append_column(Some(Ok(&col1[0]))).unwrap();
assert_eq!(writer.column(), 1);
assert!(writer.is_dirty());
writer.append_column(Some(Ok(&col2[0]))).unwrap();
assert!(writer.is_dirty());
// Adding last column of a row resets writer and updates jar config
assert_eq!(writer.column(), 0);
@ -1225,6 +1245,7 @@ mod tests {
assert_eq!(writer.offsets().len(), 3);
let expected_data_file_size = *writer.offsets().last().unwrap();
writer.commit().unwrap();
assert!(!writer.is_dirty());
assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
assert_eq!(writer.rows(), 1);
@ -1281,6 +1302,7 @@ mod tests {
// Appends a third row, so we have an offset list in memory, which is not flushed to disk
writer.append_column(Some(Ok(&col1[2]))).unwrap();
writer.append_column(Some(Ok(&col2[2]))).unwrap();
assert!(writer.is_dirty());
// This should prune from the on-memory offset list and ondisk offset list
writer.prune_rows(2).unwrap();
@ -1308,6 +1330,8 @@ mod tests {
// This should prune from the ondisk offset list and clear the jar.
let mut writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap();
writer.prune_rows(1).unwrap();
assert!(writer.is_dirty());
assert_eq!(writer.rows(), 0);
assert_eq!(writer.max_row_size(), 0);
assert_eq!(File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize, 0);
@ -1316,6 +1340,8 @@ mod tests {
File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize,
1
);
writer.commit().unwrap();
assert!(!writer.is_dirty());
}
fn simulate_interrupted_prune(

View File

@ -39,6 +39,8 @@ pub struct NippyJarWriter<H: NippyJarHeader = ()> {
offsets: Vec<u64>,
/// Column where writer is going to write next.
column: usize,
/// Whether the writer has changed data that needs to be committed.
dirty: bool,
}
impl<H: NippyJarHeader> NippyJarWriter<H> {
@ -64,6 +66,7 @@ impl<H: NippyJarHeader> NippyJarWriter<H> {
uncompressed_row_size: 0,
offsets: Vec::with_capacity(1_000_000),
column: 0,
dirty: false,
};
// If we are opening a previously created jar, we need to check its consistency, and make
@ -83,11 +86,20 @@ impl<H: NippyJarHeader> NippyJarWriter<H> {
&self.jar.user_header
}
/// Returns a mutable reference to `H` of [`NippyJar`]
/// Returns a mutable reference to `H` of [`NippyJar`].
///
/// Since there's no way of knowing if `H` has been actually changed, this sets `self.dirty` to
/// true.
pub fn user_header_mut(&mut self) -> &mut H {
self.dirty = true;
&mut self.jar.user_header
}
/// Returns whether there are changes that need to be committed.
pub const fn is_dirty(&self) -> bool {
self.dirty
}
/// Gets total writer rows in jar.
pub const fn rows(&self) -> usize {
self.jar.rows()
@ -264,6 +276,8 @@ impl<H: NippyJarHeader> NippyJarWriter<H> {
&mut self,
column: Option<ColumnResult<impl AsRef<[u8]>>>,
) -> Result<(), NippyJarError> {
self.dirty = true;
match column {
Some(Ok(value)) => {
if self.offsets.is_empty() {
@ -313,6 +327,8 @@ impl<H: NippyJarHeader> NippyJarWriter<H> {
/// Prunes rows from data and offsets file and updates its configuration on disk
pub fn prune_rows(&mut self, num_rows: usize) -> Result<(), NippyJarError> {
self.dirty = true;
self.offsets_file.flush()?;
self.data_file.flush()?;
@ -412,6 +428,7 @@ impl<H: NippyJarHeader> NippyJarWriter<H> {
// Flushes `max_row_size` and total `rows` to disk.
self.jar.freeze_config()?;
self.dirty = false;
Ok(())
}
@ -424,6 +441,7 @@ impl<H: NippyJarHeader> NippyJarWriter<H> {
// Flushes `max_row_size` and total `rows` to disk.
self.jar.freeze_config()?;
self.dirty = false;
Ok(())
}

View File

@ -181,27 +181,29 @@ impl StaticFileProviderRW {
}
}
// Commits offsets and new user_header to disk
self.writer.commit().map_err(|e| ProviderError::NippyJar(e.to_string()))?;
if self.writer.is_dirty() {
// Commits offsets and new user_header to disk
self.writer.commit().map_err(|e| ProviderError::NippyJar(e.to_string()))?;
if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
self.writer.user_header().segment(),
StaticFileProviderOperation::CommitWriter,
Some(start.elapsed()),
if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
self.writer.user_header().segment(),
StaticFileProviderOperation::CommitWriter,
Some(start.elapsed()),
);
}
debug!(
target: "provider::static_file",
segment = ?self.writer.user_header().segment(),
path = ?self.data_path,
duration = ?start.elapsed(),
"Commit"
);
self.update_index()?;
}
debug!(
target: "provider::static_file",
segment = ?self.writer.user_header().segment(),
path = ?self.data_path,
duration = ?start.elapsed(),
"Commit"
);
self.update_index()?;
Ok(())
}