fix(stages, etl): clear ETL collectors in Headers stage when done (#6964)

This commit is contained in:
Alexey Shekhirin
2024-03-05 14:49:06 +00:00
committed by GitHub
parent ec401aa781
commit 024c217564
9 changed files with 80 additions and 60 deletions

View File

@ -17,9 +17,8 @@
use std::{
cmp::Reverse,
collections::BinaryHeap,
io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write},
io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write},
path::Path,
sync::Arc,
};
use rayon::prelude::*;
@ -41,7 +40,7 @@ where
<V as Compress>::Compressed: std::fmt::Debug,
{
/// Directory for temporary file storage
dir: Arc<TempDir>,
dir: Option<TempDir>,
/// Collection of temporary ETL files
files: Vec<EtlFile>,
/// Current buffer size in bytes
@ -61,12 +60,12 @@ where
<K as Encode>::Encoded: Ord + std::fmt::Debug,
<V as Compress>::Compressed: Ord + std::fmt::Debug,
{
/// Create a new collector in a specific temporary directory with some capacity.
/// Create a new collector with some capacity.
///
/// Once the capacity (in bytes) is reached, the data is sorted and flushed to disk.
pub fn new(dir: Arc<TempDir>, buffer_capacity_bytes: usize) -> Self {
pub fn new(buffer_capacity_bytes: usize) -> Self {
Self {
dir,
dir: None,
buffer_size_bytes: 0,
files: Vec::new(),
buffer_capacity_bytes,
@ -85,24 +84,49 @@ where
self.len == 0
}
/// Resets the collector to its empty state, removing all data, including the temporary
/// directory.
///
/// The file list and the in-memory buffer are replaced by fresh vectors so that their
/// allocations are released, and the size and entry counters go back to zero.
pub fn clear(&mut self) {
    // Let go of the temporary directory handle first, then the collections.
    drop(self.dir.take());
    // `mem::take` leaves an empty, unallocated `Vec` behind and hands back the old one to drop.
    drop(std::mem::take(&mut self.files));
    drop(std::mem::take(&mut self.buffer));
    self.len = 0;
    self.buffer_size_bytes = 0;
}
/// Insert an entry into the collector.
///
/// The key is encoded and the value is compressed before the pair is pushed onto the in-memory
/// buffer. The combined byte length of both is added to the running buffer size; once that size
/// exceeds `buffer_capacity_bytes`, the buffer is flushed.
///
/// # Errors
///
/// Propagates any error returned by `flush`.
pub fn insert(&mut self, key: K, value: V) {
pub fn insert(&mut self, key: K, value: V) -> io::Result<()> {
let key = key.encode();
let value = value.compress();
// Account for the encoded/compressed sizes, not the sizes of the original `K`/`V`.
self.buffer_size_bytes += key.as_ref().len() + value.as_ref().len();
self.buffer.push((key, value));
// Strictly greater-than: a buffer that is exactly at capacity is not flushed yet.
if self.buffer_size_bytes > self.buffer_capacity_bytes {
self.flush();
self.flush()?;
}
// `len` counts every inserted entry, whether it is still buffered or already flushed.
self.len += 1;
Ok(())
}
fn flush(&mut self) {
/// Returns a reference to the temporary directory used by the collector. If the directory
/// doesn't exist, it will be created.
///
/// # Errors
///
/// Returns the error from `TempDir::new` when the directory has to be created and creation
/// fails. In that case `self.dir` is left as `None`.
fn dir(&mut self) -> io::Result<&TempDir> {
    // Move the existing directory out (or create a new one) and store it straight back.
    // `Option::insert` returns a reference to the stored value, so no `unwrap` is needed
    // and there is no panic path.
    let current = match self.dir.take() {
        Some(existing) => existing,
        None => TempDir::new()?,
    };
    let stored: &TempDir = self.dir.insert(current);
    Ok(stored)
}
/// Sorts the buffered entries by key and hands them to a new `EtlFile` created in the
/// collector's temporary directory, leaving the in-memory buffer empty.
///
/// # Errors
///
/// Propagates the error if `dir()` or `EtlFile::new` fails.
fn flush(&mut self) -> io::Result<()> {
// The buffer is about to be emptied, so the byte counter starts over.
self.buffer_size_bytes = 0;
// Order by the encoded key only (`.0`); values take no part in the comparison.
self.buffer.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
// Swap the sorted entries out, leaving behind an empty vector pre-sized to the number of
// entries just flushed.
let mut buf = Vec::with_capacity(self.buffer.len());
std::mem::swap(&mut buf, &mut self.buffer);
self.files.push(EtlFile::new(self.dir.path(), buf).expect("could not flush data to disk"))
// Take an owned copy of the path so that no borrow of `self` obtained through `dir()` is
// still held while `self.files` is mutated.
let path = self.dir()?.path().to_path_buf();
self.files.push(EtlFile::new(path.as_path(), buf)?);
Ok(())
}
/// Returns an iterator over the collector data.
@ -116,7 +140,7 @@ where
pub fn iter(&mut self) -> std::io::Result<EtlIter<'_>> {
// Flush the remaining items to disk
if self.buffer_size_bytes > 0 {
self.flush();
self.flush()?;
}
let mut heap = BinaryHeap::new();
@ -246,9 +270,11 @@ mod tests {
let mut entries: Vec<_> =
(0..10_000).map(|id| (TxHash::random(), id as TxNumber)).collect();
let mut collector = Collector::new(Arc::new(TempDir::new().unwrap()), 1024);
let mut collector = Collector::new(1024);
assert!(collector.dir.is_none());
for (k, v) in entries.clone() {
collector.insert(k, v);
collector.insert(k, v).unwrap();
}
entries.sort_unstable_by_key(|entry| entry.0);
@ -259,5 +285,16 @@ mod tests {
(expected.0.encode().to_vec(), expected.1.compress().to_vec())
);
}
let temp_dir_path = collector.dir.as_ref().unwrap().path().to_path_buf();
collector.clear();
assert!(collector.dir.is_none());
assert!(collector.files.is_empty());
assert_eq!(collector.buffer_size_bytes, 0);
assert!(collector.buffer.is_empty());
assert_eq!(collector.len, 0);
assert!(collector.is_empty());
assert!(!temp_dir_path.exists());
}
}