feat(exex): write notification files atomically (#11264)

This commit is contained in:
joshieDo
2024-09-26 23:23:09 +02:00
committed by GitHub
parent 13a3c2c8cc
commit da6b1e7c64
3 changed files with 83 additions and 52 deletions

View File

@ -1,6 +1,5 @@
use std::{
fs::File,
io::{Read, Write},
ops::RangeInclusive,
path::{Path, PathBuf},
};
@ -95,7 +94,8 @@ impl Storage {
debug!(?file_path, "Reading notification from WAL");
let mut file = File::open(&file_path)?;
read_notification(&mut file)
// TODO(alexey): use rmp-serde when Alloy and Reth serde issues are resolved
Ok(serde_json::from_reader(&mut file)?)
}
/// Writes the notification to the file with the given id.
@ -108,27 +108,13 @@ impl Storage {
let file_path = self.file_path(file_id);
debug!(?file_path, "Writing notification to WAL");
let mut file = File::create_new(&file_path)?;
write_notification(&mut file, notification)?;
Ok(())
Ok(reth_fs_util::atomic_write_file(&file_path, |file| {
// TODO(alexey): use rmp-serde when Alloy and Reth serde issues are resolved
serde_json::to_writer(file, notification)
})?)
}
}
// TODO(alexey): use rmp-serde when Alloy and Reth serde issues are resolved
fn write_notification(mut w: &mut impl Write, notification: &ExExNotification) -> eyre::Result<()> {
// rmp_serde::encode::write(w, notification)?;
serde_json::to_writer(&mut w, notification)?;
w.flush()?;
Ok(())
}
fn read_notification(r: &mut impl Read) -> eyre::Result<ExExNotification> {
// Ok(rmp_serde::from_read(r)?)
Ok(serde_json::from_reader(r)?)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@ -8,8 +8,8 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
use serde::{de::DeserializeOwned, Serialize};
use std::{
fs::{self, File, ReadDir},
io::{self, BufWriter, Write},
fs::{self, File, OpenOptions, ReadDir},
io::{self, BufWriter, Error, ErrorKind, Write},
path::{Path, PathBuf},
};
@ -138,6 +138,14 @@ pub enum FsPathError {
/// The path related to the operation.
path: PathBuf,
},
/// Error variant for failed fsync operation with additional path context.
#[error("failed to sync path {path:?}: {source}")]
Fsync {
/// The source `io::Error`.
source: io::Error,
/// The path related to the operation.
path: PathBuf,
},
}
impl FsPathError {
@ -195,6 +203,11 @@ impl FsPathError {
pub fn metadata(source: io::Error, path: impl Into<PathBuf>) -> Self {
Self::Metadata { source, path: path.into() }
}
/// Returns the complementary error variant for `fsync`.
pub fn fsync(source: io::Error, path: impl Into<PathBuf>) -> Self {
Self::Fsync { source, path: path.into() }
}
}
/// Wrapper for `std::fs::read_to_string`
@ -277,3 +290,61 @@ pub fn write_json_file<T: Serialize>(path: &Path, obj: &T) -> Result<()> {
.map_err(|source| FsPathError::WriteJson { source, path: path.into() })?;
writer.flush().map_err(|e| FsPathError::write(e, path))
}
/// Writes atomically to file.
///
/// 1. Creates a temporary file with a `.tmp` extension in the same file directory.
/// 2. Writes content with `write_fn`.
/// 3. Fsyncs the temp file to disk.
/// 4. Renames the temp file to the target path.
/// 5. Fsyncs the file directory.
///
/// Atomic writes are hard:
/// * <https://github.com/paradigmxyz/reth/issues/8622>
/// * <https://users.rust-lang.org/t/how-to-write-replace-files-atomically/42821/13>
pub fn atomic_write_file<F, E>(file_path: &Path, write_fn: F) -> Result<()>
where
F: FnOnce(&mut File) -> std::result::Result<(), E>,
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
let mut tmp_path = file_path.to_path_buf();
tmp_path.set_extension("tmp");
// Write to the temporary file
let mut file =
File::create(&tmp_path).map_err(|err| FsPathError::create_file(err, &tmp_path))?;
write_fn(&mut file).map_err(|err| FsPathError::Write {
source: Error::new(ErrorKind::Other, err.into()),
path: tmp_path.clone(),
})?;
// fsync() file
file.sync_all().map_err(|err| FsPathError::fsync(err, &tmp_path))?;
// Rename file, not move
rename(&tmp_path, file_path)?;
// fsync() directory
if let Some(parent) = file_path.parent() {
#[cfg(windows)]
OpenOptions::new()
.read(true)
.write(true)
.custom_flags(0x02000000) // FILE_FLAG_BACKUP_SEMANTICS
.open(parent)
.map_err(|err| FsPathError::open(err, parent))?
.sync_all()
.map_err(|err| FsPathError::fsync(err, parent))?;
#[cfg(not(windows))]
OpenOptions::new()
.read(true)
.open(parent)
.map_err(|err| FsPathError::open(err, parent))?
.sync_all()
.map_err(|err| FsPathError::fsync(err, parent))?;
}
Ok(())
}

View File

@ -17,7 +17,7 @@ use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use std::{
error::Error as StdError,
fs::{File, OpenOptions},
fs::File,
ops::Range,
path::{Path, PathBuf},
};
@ -250,35 +250,9 @@ impl<H: NippyJarHeader> NippyJar<H> {
/// Writes all necessary configuration to file.
fn freeze_config(&self) -> Result<(), NippyJarError> {
// Atomic writes are hard: <https://github.com/paradigmxyz/reth/issues/8622>
let mut tmp_path = self.config_path();
tmp_path.set_extension(".tmp");
// Write to temporary file
let mut file = File::create(&tmp_path)?;
bincode::serialize_into(&mut file, &self)?;
// fsync() file
file.sync_all()?;
// Rename file, not move
reth_fs_util::rename(&tmp_path, self.config_path())?;
// fsync() dir
if let Some(parent) = tmp_path.parent() {
//custom_flags() is only available on Windows
#[cfg(windows)]
OpenOptions::new()
.read(true)
.write(true)
.custom_flags(0x02000000) // FILE_FLAG_BACKUP_SEMANTICS
.open(parent)?
.sync_all()?;
#[cfg(not(windows))]
OpenOptions::new().read(true).open(parent)?.sync_all()?;
}
Ok(())
Ok(reth_fs_util::atomic_write_file(&self.config_path(), |file| {
bincode::serialize_into(file, &self)
})?)
}
}