diff --git a/crates/exex/exex/src/wal/storage.rs b/crates/exex/exex/src/wal/storage.rs index 8d4e008b4..ad2307361 100644 --- a/crates/exex/exex/src/wal/storage.rs +++ b/crates/exex/exex/src/wal/storage.rs @@ -1,6 +1,5 @@ use std::{ fs::File, - io::{Read, Write}, ops::RangeInclusive, path::{Path, PathBuf}, }; @@ -95,7 +94,8 @@ impl Storage { debug!(?file_path, "Reading notification from WAL"); let mut file = File::open(&file_path)?; - read_notification(&mut file) + // TODO(alexey): use rmp-serde when Alloy and Reth serde issues are resolved + Ok(serde_json::from_reader(&mut file)?) } /// Writes the notification to the file with the given id. @@ -108,27 +108,13 @@ impl Storage { let file_path = self.file_path(file_id); debug!(?file_path, "Writing notification to WAL"); - let mut file = File::create_new(&file_path)?; - write_notification(&mut file, notification)?; - - Ok(()) + Ok(reth_fs_util::atomic_write_file(&file_path, |file| { + // TODO(alexey): use rmp-serde when Alloy and Reth serde issues are resolved + serde_json::to_writer(file, notification) + })?) } } -// TODO(alexey): use rmp-serde when Alloy and Reth serde issues are resolved - -fn write_notification(mut w: &mut impl Write, notification: &ExExNotification) -> eyre::Result<()> { - // rmp_serde::encode::write(w, notification)?; - serde_json::to_writer(&mut w, notification)?; - w.flush()?; - Ok(()) -} - -fn read_notification(r: &mut impl Read) -> eyre::Result { - // Ok(rmp_serde::from_read(r)?) - Ok(serde_json::from_reader(r)?) -} - #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/crates/fs-util/src/lib.rs b/crates/fs-util/src/lib.rs index 91e60c313..f77632cc8 100644 --- a/crates/fs-util/src/lib.rs +++ b/crates/fs-util/src/lib.rs @@ -8,8 +8,8 @@ #![cfg_attr(not(test), warn(unused_crate_dependencies))] use serde::{de::DeserializeOwned, Serialize}; use std::{ - fs::{self, File, ReadDir}, - io::{self, BufWriter, Write}, + fs::{self, File, OpenOptions, ReadDir}, + io::{self, BufWriter, Error, ErrorKind, Write}, path::{Path, PathBuf}, }; @@ -138,6 +138,14 @@ pub enum FsPathError { /// The path related to the operation. path: PathBuf, }, + /// Error variant for failed fsync operation with additional path context. + #[error("failed to sync path {path:?}: {source}")] + Fsync { + /// The source `io::Error`. + source: io::Error, + /// The path related to the operation. + path: PathBuf, + }, } impl FsPathError { @@ -195,6 +203,11 @@ impl FsPathError { pub fn metadata(source: io::Error, path: impl Into) -> Self { Self::Metadata { source, path: path.into() } } + + /// Returns the complementary error variant for `fsync`. + pub fn fsync(source: io::Error, path: impl Into) -> Self { + Self::Fsync { source, path: path.into() } + } } /// Wrapper for `std::fs::read_to_string` @@ -277,3 +290,61 @@ pub fn write_json_file(path: &Path, obj: &T) -> Result<()> { .map_err(|source| FsPathError::WriteJson { source, path: path.into() })?; writer.flush().map_err(|e| FsPathError::write(e, path)) } + +/// Writes atomically to file. +/// +/// 1. Creates a temporary file with a `.tmp` extension in the same file directory. +/// 2. Writes content with `write_fn`. +/// 3. Fsyncs the temp file to disk. +/// 4. Renames the temp file to the target path. +/// 5. Fsyncs the file directory. +/// +/// Atomic writes are hard: +/// * +/// * +pub fn atomic_write_file(file_path: &Path, write_fn: F) -> Result<()> +where + F: FnOnce(&mut File) -> std::result::Result<(), E>, + E: Into>, +{ + let mut tmp_path = file_path.to_path_buf(); + tmp_path.set_extension("tmp"); + + // Write to the temporary file + let mut file = + File::create(&tmp_path).map_err(|err| FsPathError::create_file(err, &tmp_path))?; + + write_fn(&mut file).map_err(|err| FsPathError::Write { + source: Error::new(ErrorKind::Other, err.into()), + path: tmp_path.clone(), + })?; + + // fsync() file + file.sync_all().map_err(|err| FsPathError::fsync(err, &tmp_path))?; + + // Rename file, not move + rename(&tmp_path, file_path)?; + + // fsync() directory + if let Some(parent) = file_path.parent() { + #[cfg(windows)] + OpenOptions::new() + .read(true) + .write(true) + .custom_flags(0x02000000) // FILE_FLAG_BACKUP_SEMANTICS + .open(parent) + .map_err(|err| FsPathError::open(err, parent))? + .sync_all() + .map_err(|err| FsPathError::fsync(err, parent))?; + + #[cfg(not(windows))] + OpenOptions::new() + .read(true) + .open(parent) + .map_err(|err| FsPathError::open(err, parent))? + .sync_all() + .map_err(|err| FsPathError::fsync(err, parent))?; + } + + Ok(()) +} diff --git a/crates/storage/nippy-jar/src/lib.rs b/crates/storage/nippy-jar/src/lib.rs index a720192d6..bdc950aa3 100644 --- a/crates/storage/nippy-jar/src/lib.rs +++ b/crates/storage/nippy-jar/src/lib.rs @@ -17,7 +17,7 @@ use memmap2::Mmap; use serde::{Deserialize, Serialize}; use std::{ error::Error as StdError, - fs::{File, OpenOptions}, + fs::File, ops::Range, path::{Path, PathBuf}, }; @@ -250,35 +250,9 @@ impl NippyJar { /// Writes all necessary configuration to file. fn freeze_config(&self) -> Result<(), NippyJarError> { - // Atomic writes are hard: - let mut tmp_path = self.config_path(); - tmp_path.set_extension(".tmp"); - - // Write to temporary file - let mut file = File::create(&tmp_path)?; - bincode::serialize_into(&mut file, &self)?; - - // fsync() file - file.sync_all()?; - - // Rename file, not move - reth_fs_util::rename(&tmp_path, self.config_path())?; - - // fsync() dir - if let Some(parent) = tmp_path.parent() { - //custom_flags() is only available on Windows - #[cfg(windows)] - OpenOptions::new() - .read(true) - .write(true) - .custom_flags(0x02000000) // FILE_FLAG_BACKUP_SEMANTICS - .open(parent)? - .sync_all()?; - - #[cfg(not(windows))] - OpenOptions::new().read(true).open(parent)?.sync_all()?; - } - Ok(()) + Ok(reth_fs_util::atomic_write_file(&self.config_path(), |file| { + bincode::serialize_into(file, &self) + })?) } }