diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs
index e6ab1a64c..3f22b8443 100644
--- a/bin/reth/src/commands/debug_cmd/execution.rs
+++ b/bin/reth/src/commands/debug_cmd/execution.rs
@@ -127,7 +127,7 @@ impl Command {
                 header_downloader,
                 body_downloader,
                 factory.clone(),
-            )?
+            )
             .set(SenderRecoveryStage {
                 commit_threshold: stage_conf.sender_recovery.commit_threshold,
             })
diff --git a/bin/reth/src/commands/import.rs b/bin/reth/src/commands/import.rs
index 56ff78127..73a316895 100644
--- a/bin/reth/src/commands/import.rs
+++ b/bin/reth/src/commands/import.rs
@@ -191,7 +191,7 @@ impl ImportCommand {
                 header_downloader,
                 body_downloader,
                 factory.clone(),
-            )?
+            )
             .set(SenderRecoveryStage {
                 commit_threshold: config.stages.sender_recovery.commit_threshold,
             })
diff --git a/crates/consensus/beacon/src/engine/test_utils.rs b/crates/consensus/beacon/src/engine/test_utils.rs
index e491b1fe3..bae48a7ce 100644
--- a/crates/consensus/beacon/src/engine/test_utils.rs
+++ b/crates/consensus/beacon/src/engine/test_utils.rs
@@ -399,17 +399,14 @@ where
                     .build(client.clone(), consensus.clone(), provider_factory.clone())
                     .into_task();
 
-                Pipeline::builder().add_stages(
-                    DefaultStages::new(
-                        provider_factory.clone(),
-                        HeaderSyncMode::Tip(tip_rx.clone()),
-                        Arc::clone(&consensus),
-                        header_downloader,
-                        body_downloader,
-                        executor_factory.clone(),
-                    )
-                    .expect("should build"),
-                )
+                Pipeline::builder().add_stages(DefaultStages::new(
+                    provider_factory.clone(),
+                    HeaderSyncMode::Tip(tip_rx.clone()),
+                    Arc::clone(&consensus),
+                    header_downloader,
+                    body_downloader,
+                    executor_factory.clone(),
+                ))
             }
         };
diff --git a/crates/etl/src/lib.rs b/crates/etl/src/lib.rs
index 0676267e0..5b4413171 100644
--- a/crates/etl/src/lib.rs
+++ b/crates/etl/src/lib.rs
@@ -17,9 +17,8 @@
 use std::{
     cmp::Reverse,
     collections::BinaryHeap,
-    io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write},
+    io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write},
     path::Path,
-    sync::Arc,
 };
 
 use rayon::prelude::*;
@@ -41,7 +40,7 @@ where
     <V as Compress>::Compressed: std::fmt::Debug,
 {
     /// Directory for temporary file storage
-    dir: Arc<TempDir>,
+    dir: Option<TempDir>,
     /// Collection of temporary ETL files
     files: Vec<EtlFile>,
     /// Current buffer size in bytes
@@ -61,12 +60,12 @@ where
     <K as Encode>::Encoded: Ord + std::fmt::Debug,
     <V as Compress>::Compressed: Ord + std::fmt::Debug,
 {
-    /// Create a new collector in a specific temporary directory with some capacity.
+    /// Create a new collector with some capacity.
     ///
     /// Once the capacity (in bytes) is reached, the data is sorted and flushed to disk.
-    pub fn new(dir: Arc<TempDir>, buffer_capacity_bytes: usize) -> Self {
+    pub fn new(buffer_capacity_bytes: usize) -> Self {
         Self {
-            dir,
+            dir: None,
             buffer_size_bytes: 0,
             files: Vec::new(),
             buffer_capacity_bytes,
@@ -85,24 +84,49 @@ where
         self.len == 0
     }
 
+    /// Clears the collector, removing all data, including the temporary directory.
+    pub fn clear(&mut self) {
+        self.dir = None;
+        // Clear vectors and free the allocated memory
+        self.files = Vec::new();
+        self.buffer = Vec::new();
+        self.buffer_size_bytes = 0;
+        self.len = 0;
+    }
+
     /// Insert an entry into the collector.
-    pub fn insert(&mut self, key: K, value: V) {
+    pub fn insert(&mut self, key: K, value: V) -> io::Result<()> {
         let key = key.encode();
         let value = value.compress();
         self.buffer_size_bytes += key.as_ref().len() + value.as_ref().len();
         self.buffer.push((key, value));
         if self.buffer_size_bytes > self.buffer_capacity_bytes {
-            self.flush();
+            self.flush()?;
         }
         self.len += 1;
+
+        Ok(())
     }
 
-    fn flush(&mut self) {
+    /// Returns a reference to the temporary directory used by the collector. If the directory
+    /// doesn't exist, it will be created.
+    fn dir(&mut self) -> io::Result<&TempDir> {
+        if self.dir.is_none() {
+            self.dir = Some(TempDir::new()?);
+        }
+        Ok(self.dir.as_ref().unwrap())
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
         self.buffer_size_bytes = 0;
         self.buffer.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
         let mut buf = Vec::with_capacity(self.buffer.len());
         std::mem::swap(&mut buf, &mut self.buffer);
-        self.files.push(EtlFile::new(self.dir.path(), buf).expect("could not flush data to disk"))
+
+        let path = self.dir()?.path().to_path_buf();
+        self.files.push(EtlFile::new(path.as_path(), buf)?);
+
+        Ok(())
     }
 
     /// Returns an iterator over the collector data.
@@ -116,7 +140,7 @@ where
     pub fn iter(&mut self) -> std::io::Result<EtlIter<'_>> {
         // Flush the remaining items to disk
         if self.buffer_size_bytes > 0 {
-            self.flush();
+            self.flush()?;
         }
 
         let mut heap = BinaryHeap::new();
@@ -246,9 +270,11 @@ mod tests {
         let mut entries: Vec<_> =
             (0..10_000).map(|id| (TxHash::random(), id as TxNumber)).collect();
 
-        let mut collector = Collector::new(Arc::new(TempDir::new().unwrap()), 1024);
+        let mut collector = Collector::new(1024);
+        assert!(collector.dir.is_none());
+
         for (k, v) in entries.clone() {
-            collector.insert(k, v);
+            collector.insert(k, v).unwrap();
         }
 
         entries.sort_unstable_by_key(|entry| entry.0);
@@ -259,5 +285,16 @@
                 (expected.0.encode().to_vec(), expected.1.compress().to_vec())
             );
         }
+
+        let temp_dir_path = collector.dir.as_ref().unwrap().path().to_path_buf();
+
+        collector.clear();
+        assert!(collector.dir.is_none());
+        assert!(collector.files.is_empty());
+        assert_eq!(collector.buffer_size_bytes, 0);
+        assert!(collector.buffer.is_empty());
+        assert_eq!(collector.len, 0);
+        assert!(collector.is_empty());
+        assert!(!temp_dir_path.exists());
     }
 }
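Taken together, the `crates/etl` changes make the collector self-managing: the temporary directory is created lazily on the first flush, `insert` surfaces I/O errors instead of panicking, and `clear` releases everything, tempdir included. A minimal usage sketch of the new API (assuming the crate is imported as `reth_etl` and reusing the `TxHash`/`TxNumber` key/value types from the test above; this snippet is not part of the diff):

```rust
use reth_etl::Collector;
use reth_primitives::{TxHash, TxNumber};

fn collect_hashes() -> std::io::Result<()> {
    // No tempdir argument anymore: a collector that never exceeds its buffer
    // capacity never touches the filesystem at all.
    let mut collector: Collector<TxHash, TxNumber> = Collector::new(1024);

    // insert() became fallible because it may flush a full buffer to disk.
    collector.insert(TxHash::random(), 1)?;

    // iter() flushes whatever is left and yields (encoded key, compressed
    // value) pairs in sorted key order, merged across all spill files.
    for entry in collector.iter()? {
        let (_key, _value) = entry?;
    }

    // clear() drops the buffer, the spill files, and the tempdir itself.
    collector.clear();
    Ok(())
}
```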
diff --git a/crates/node-core/src/node_config.rs b/crates/node-core/src/node_config.rs
index 95ebc044d..d0eb5a717 100644
--- a/crates/node-core/src/node_config.rs
+++ b/crates/node-core/src/node_config.rs
@@ -836,7 +836,7 @@ impl NodeConfig {
                 header_downloader,
                 body_downloader,
                 factory.clone(),
-            )?
+            )
             .set(SenderRecoveryStage {
                 commit_threshold: stage_config.sender_recovery.commit_threshold,
             })
diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs
index c2bceceee..568a63bba 100644
--- a/crates/stages/src/lib.rs
+++ b/crates/stages/src/lib.rs
@@ -60,7 +60,6 @@
 //!             bodies_downloader,
 //!             executor_factory,
 //!         )
-//!         .unwrap(),
 //!     )
 //!     .build(provider_factory, static_file_producer);
 //! ```
diff --git a/crates/stages/src/sets.rs b/crates/stages/src/sets.rs
index 1b029b11e..30fc169ac 100644
--- a/crates/stages/src/sets.rs
+++ b/crates/stages/src/sets.rs
@@ -52,7 +52,7 @@ use crate::{
         IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
         StorageHashingStage, TransactionLookupStage,
     },
-    StageError, StageSet, StageSetBuilder,
+    StageSet, StageSetBuilder,
 };
 use reth_db::database::Database;
 use reth_interfaces::{
@@ -61,7 +61,6 @@ use reth_interfaces::{
 };
 use reth_provider::{ExecutorFactory, HeaderSyncGapProvider, HeaderSyncMode};
 use std::sync::Arc;
-use tempfile::TempDir;
 
 /// A set containing all stages to run a fully syncing instance of reth.
 ///
@@ -101,21 +100,20 @@ impl<Provider, H, B, EF> DefaultStages<Provider, H, B, EF> {
         header_downloader: H,
         body_downloader: B,
         executor_factory: EF,
-    ) -> Result<Self, StageError>
+    ) -> Self
     where
         EF: ExecutorFactory,
     {
-        Ok(Self {
+        Self {
             online: OnlineStages::new(
                 provider,
                 header_mode,
                 consensus,
                 header_downloader,
                 body_downloader,
-                Arc::new(TempDir::new()?),
             ),
             executor_factory,
-        })
+        }
     }
 }
@@ -164,8 +162,6 @@ pub struct OnlineStages<Provider, H, B> {
     header_downloader: H,
     /// The block body downloader
     body_downloader: B,
-    /// Temporary directory for ETL usage on headers stage.
-    temp_dir: Arc<TempDir>,
 }
 
 impl<Provider, H, B> OnlineStages<Provider, H, B> {
@@ -176,9 +172,8 @@ impl<Provider, H, B> OnlineStages<Provider, H, B> {
     /// Create a new set of online stages with default values.
     pub fn new(
         provider: Provider,
         header_mode: HeaderSyncMode,
         consensus: Arc<dyn Consensus>,
         header_downloader: H,
         body_downloader: B,
-        temp_dir: Arc<TempDir>,
     ) -> Self {
-        Self { provider, header_mode, consensus, header_downloader, body_downloader, temp_dir }
+        Self { provider, header_mode, consensus, header_downloader, body_downloader }
     }
 }
@@ -203,16 +198,9 @@
         mode: HeaderSyncMode,
         header_downloader: H,
         consensus: Arc<dyn Consensus>,
-        temp_dir: Arc<TempDir>,
     ) -> StageSetBuilder<DB> {
         StageSetBuilder::default()
-            .add_stage(HeaderStage::new(
-                provider,
-                header_downloader,
-                mode,
-                consensus.clone(),
-                temp_dir.clone(),
-            ))
+            .add_stage(HeaderStage::new(provider, header_downloader, mode, consensus.clone()))
             .add_stage(bodies)
     }
 }
@@ -231,7 +219,6 @@ where
                 self.header_downloader,
                 self.header_mode,
                 self.consensus.clone(),
-                self.temp_dir.clone(),
             ))
             .add_stage(BodyStage::new(self.body_downloader))
     }
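Because the `TempDir` is no longer created during construction, `DefaultStages::new` is infallible, which is exactly why the call sites in `execution.rs`, `import.rs`, `node_config.rs`, and `test_utils.rs` above drop their `?` / `.expect`. A condensed sketch of the resulting call shape, not a standalone program (bindings such as `provider_factory` and `tip_rx` are assumed from the surrounding setup, as in the `test_utils.rs` hunk):

```rust
// DefaultStages::new now returns Self rather than Result<Self, StageError>;
// any tempdir I/O error surfaces later, from Collector::insert, while the
// pipeline is actually running.
let pipeline = Pipeline::builder()
    .add_stages(DefaultStages::new(
        provider_factory.clone(),
        HeaderSyncMode::Tip(tip_rx.clone()),
        Arc::clone(&consensus),
        header_downloader,
        body_downloader,
        executor_factory.clone(),
    ))
    .build(provider_factory, static_file_producer);
```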
diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs
index d34ffa46b..48e5bba80 100644
--- a/crates/stages/src/stages/headers.rs
+++ b/crates/stages/src/stages/headers.rs
@@ -29,7 +29,6 @@ use std::{
     sync::Arc,
     task::{ready, Context, Poll},
 };
-use tempfile::TempDir;
 use tracing::*;
 
 /// The headers stage.
@@ -77,7 +76,6 @@ where
         downloader: Downloader,
         mode: HeaderSyncMode,
         consensus: Arc<dyn Consensus>,
-        tempdir: Arc<TempDir>,
     ) -> Self {
         Self {
             provider: database,
@@ -85,8 +83,8 @@ where
             mode,
             consensus,
             sync_gap: None,
-            hash_collector: Collector::new(tempdir.clone(), 100 * (1024 * 1024)),
-            header_collector: Collector::new(tempdir, 100 * (1024 * 1024)),
+            hash_collector: Collector::new(100 * (1024 * 1024)),
+            header_collector: Collector::new(100 * (1024 * 1024)),
             is_etl_ready: false,
         }
     }
@@ -158,7 +156,7 @@ where
         // add it to the collector and use tx.append on all hashes.
         if let Some((hash, block_number)) = cursor_header_numbers.last()? {
             if block_number.value()? == 0 {
-                self.hash_collector.insert(hash.key()?, 0);
+                self.hash_collector.insert(hash.key()?, 0)?;
                 cursor_header_numbers.delete_current()?;
                 first_sync = true;
             }
@@ -244,8 +242,8 @@ where
         for header in headers {
             let header_number = header.number;
 
-            self.hash_collector.insert(header.hash(), header_number);
-            self.header_collector.insert(header_number, header);
+            self.hash_collector.insert(header.hash(), header_number)?;
+            self.header_collector.insert(header_number, header)?;
 
             // Headers are downloaded in reverse, so if we reach here, we know we have
             // filled the gap.
@@ -291,6 +289,10 @@ where
         let last_header_number = self
             .write_headers::<DB>(provider.tx_ref(), provider.static_file_provider().clone())?;
 
+        // Clear ETL collectors
+        self.hash_collector.clear();
+        self.header_collector.clear();
+
         Ok(ExecOutput {
             checkpoint: StageCheckpoint::new(last_header_number).with_headers_stage_checkpoint(
                 HeadersCheckpoint {
@@ -420,7 +422,6 @@ mod tests {
                 (*self.downloader_factory)(),
                 HeaderSyncMode::Tip(self.channel.1.clone()),
                 self.consensus.clone(),
-                Arc::new(TempDir::new().unwrap()),
             )
         }
     }
@@ -586,5 +587,7 @@ mod tests {
             processed == checkpoint + headers.len() as u64 - 1 && total == tip.number
         );
         assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
+        assert!(runner.stage().hash_collector.is_empty());
+        assert!(runner.stage().header_collector.is_empty());
     }
 }
diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs
index a619fe709..ac6899754 100644
--- a/crates/stages/src/stages/tx_lookup.rs
+++ b/crates/stages/src/stages/tx_lookup.rs
@@ -17,8 +17,6 @@ use reth_provider::{
     BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter, StatsReader,
     TransactionsProvider, TransactionsProviderExt,
 };
-use std::sync::Arc;
-use tempfile::TempDir;
 use tracing::*;
 
 /// The transaction lookup stage.
@@ -101,8 +99,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
         }
 
         // 500MB temporary files
-        let mut hash_collector: Collector<TxHash, TxNumber> =
-            Collector::new(Arc::new(TempDir::new()?), 500 * (1024 * 1024));
+        let mut hash_collector: Collector<TxHash, TxNumber> = Collector::new(500 * (1024 * 1024));
 
         debug!(
             target: "sync::stages::transaction_lookup",
@@ -119,7 +116,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
 
             debug!(target: "sync::stages::transaction_lookup", ?tx_range, "Calculating transaction hashes");
             for (key, value) in provider.transaction_hashes_by_range(tx_range)? {
-                hash_collector.insert(key, value);
+                hash_collector.insert(key, value)?;
             }
 
             input.checkpoint = Some(
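The headers stage now follows a fill/write/release lifecycle for its ETL state. A condensed sketch of that flow inside `HeaderStage::execute`, assembled from the hunks above (the `?` on `insert` presumes `StageError` can be built from `io::Error`, which this diff relies on but does not show):

```rust
// 1. Persist everything the collectors accumulated during download.
let last_header_number = self
    .write_headers::<DB>(provider.tx_ref(), provider.static_file_provider().clone())?;

// 2. Release the ETL buffers, spill files, and tempdirs immediately, rather
//    than holding them until the stage itself is dropped.
self.hash_collector.clear();
self.header_collector.clear();
```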