From 8f1f386f5202c1099c002974d3b08bed79d73ce9 Mon Sep 17 00:00:00 2001
From: Emilia Hane
Date: Mon, 15 Apr 2024 13:46:34 +0200
Subject: [PATCH] feat(op): chunked chain import (#7574)

Co-authored-by: Atris
---
 Cargo.lock                                |   1 +
 bin/reth/src/commands/import.rs           |  95 +++++-----
 crates/net/downloaders/Cargo.toml         |   1 +
 crates/net/downloaders/src/file_client.rs | 201 ++++++++++++++++++++--
 crates/net/downloaders/src/file_codec.rs  |   5 +-
 5 files changed, 249 insertions(+), 54 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 166e66290..5a71369a2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6408,6 +6408,7 @@ dependencies = [
  "itertools 0.12.1",
  "metrics",
  "pin-project",
+ "rand 0.8.5",
  "rayon",
  "reth-config",
  "reth-db",
diff --git a/bin/reth/src/commands/import.rs b/bin/reth/src/commands/import.rs
index 49fdba1ee..74e694388 100644
--- a/bin/reth/src/commands/import.rs
+++ b/bin/reth/src/commands/import.rs
@@ -15,7 +15,8 @@ use reth_beacon_consensus::BeaconConsensus;
 use reth_config::Config;
 use reth_db::{database::Database, init_db};
 use reth_downloaders::{
-    bodies::bodies::BodiesDownloaderBuilder, file_client::FileClient,
+    bodies::bodies::BodiesDownloaderBuilder,
+    file_client::{ChunkedFileReader, FileClient, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE},
     headers::reverse_headers::ReverseHeadersDownloaderBuilder,
 };
 use reth_interfaces::{
@@ -76,6 +77,10 @@ pub struct ImportCommand {
     #[arg(long, verbatim_doc_comment, env = OP_RETH_MAINNET_BELOW_BEDROCK)]
     op_mainnet_below_bedrock: bool,
 
+    /// Chunk byte length.
+    #[arg(long, value_name = "CHUNK_LEN", verbatim_doc_comment)]
+    chunk_len: Option<u64>,
+
     #[command(flatten)]
     db: DatabaseArgs,
 
@@ -101,6 +106,10 @@ impl ImportCommand {
             debug!(target: "reth::cli", "Execution stage disabled");
         }
 
+        debug!(target: "reth::cli",
+            chunk_byte_len=self.chunk_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE), "Chunking chain import"
+        );
+
         // add network name to data dir
         let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain);
         let config_path = self.config.clone().unwrap_or_else(|| data_dir.config_path());
@@ -123,49 +132,55 @@ impl ImportCommand {
         let consensus = Arc::new(BeaconConsensus::new(self.chain.clone()));
         info!(target: "reth::cli", "Consensus engine initialized");
 
-        // create a new FileClient
-        info!(target: "reth::cli", "Importing chain file");
-        let file_client = Arc::new(FileClient::new(&self.path).await?);
+        // open file
+        let mut reader = ChunkedFileReader::new(&self.path, self.chunk_len).await?;
 
-        // override the tip
-        let tip = file_client.tip().expect("file client has no tip");
-        info!(target: "reth::cli", "Chain file read");
+        while let Some(file_client) = reader.next_chunk().await? {
+            // create a new FileClient from chunk read from file
+            info!(target: "reth::cli",
+                "Importing chain file chunk"
+            );
 
-        let (mut pipeline, events) = self
-            .build_import_pipeline(
-                config,
-                provider_factory.clone(),
-                &consensus,
-                file_client,
-                StaticFileProducer::new(
+            // override the tip
+            let tip = file_client.tip().expect("file client has no tip");
+            info!(target: "reth::cli", "Chain file chunk read");
+
+            let (mut pipeline, events) = self
+                .build_import_pipeline(
+                    &config,
                     provider_factory.clone(),
-                    provider_factory.static_file_provider(),
-                    PruneModes::default(),
-                ),
-                self.disable_execution,
-            )
-            .await?;
+                    &consensus,
+                    Arc::new(file_client),
+                    StaticFileProducer::new(
+                        provider_factory.clone(),
+                        provider_factory.static_file_provider(),
+                        PruneModes::default(),
+                    ),
+                    self.disable_execution,
+                )
+                .await?;
 
-        // override the tip
-        pipeline.set_tip(tip);
-        debug!(target: "reth::cli", ?tip, "Tip manually set");
+            // override the tip
+            pipeline.set_tip(tip);
+            debug!(target: "reth::cli", ?tip, "Tip manually set");
 
-        let provider = provider_factory.provider()?;
+            let provider = provider_factory.provider()?;
 
-        let latest_block_number =
-            provider.get_stage_checkpoint(StageId::Finish)?.map(|ch| ch.block_number);
-        tokio::spawn(reth_node_core::events::node::handle_events(
-            None,
-            latest_block_number,
-            events,
-            db.clone(),
-        ));
+            let latest_block_number =
+                provider.get_stage_checkpoint(StageId::Finish)?.map(|ch| ch.block_number);
+            tokio::spawn(reth_node_core::events::node::handle_events(
+                None,
+                latest_block_number,
+                events,
+                db.clone(),
+            ));
 
-        // Run pipeline
-        info!(target: "reth::cli", "Starting sync pipeline");
-        tokio::select! {
-            res = pipeline.run() => res?,
-            _ = tokio::signal::ctrl_c() => {},
+            // Run pipeline
+            info!(target: "reth::cli", "Starting sync pipeline");
+            tokio::select! {
+                res = pipeline.run() => res?,
+                _ = tokio::signal::ctrl_c() => {},
+            }
         }
 
         info!(target: "reth::cli", "Chain file imported");
@@ -174,7 +189,7 @@ impl ImportCommand {
 
     async fn build_import_pipeline<DB, C>(
         &self,
-        config: Config,
+        config: &Config,
         provider_factory: ProviderFactory<DB>,
         consensus: &Arc<C>,
         file_client: Arc<FileClient>,
@@ -220,7 +235,7 @@ impl ImportCommand {
                 header_downloader,
                 body_downloader,
                 factory.clone(),
-                config.stages.etl,
+                config.stages.etl.clone(),
             )
             .set(SenderRecoveryStage {
                 commit_threshold: config.stages.sender_recovery.commit_threshold,
@@ -239,7 +254,7 @@ impl ImportCommand {
                     .clean_threshold
                     .max(config.stages.account_hashing.clean_threshold)
                     .max(config.stages.storage_hashing.clean_threshold),
-                config.prune.map(|prune| prune.segments).unwrap_or_default(),
+                config.prune.clone().map(|prune| prune.segments).unwrap_or_default(),
             ))
             .disable_if(StageId::Execution, || disable_execution),
         )
diff --git a/crates/net/downloaders/Cargo.toml b/crates/net/downloaders/Cargo.toml
index 83aadb853..7ae6db8e6 100644
--- a/crates/net/downloaders/Cargo.toml
+++ b/crates/net/downloaders/Cargo.toml
@@ -52,6 +52,7 @@ assert_matches.workspace = true
 tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
 alloy-rlp.workspace = true
 itertools.workspace = true
+rand.workspace = true
 tempfile.workspace = true
 
diff --git a/crates/net/downloaders/src/file_client.rs b/crates/net/downloaders/src/file_client.rs
index a692bc825..ce830383f 100644
--- a/crates/net/downloaders/src/file_client.rs
+++ b/crates/net/downloaders/src/file_client.rs
@@ -8,8 +8,8 @@ use reth_interfaces::p2p::{
     priority::Priority,
 };
 use reth_primitives::{
-    BlockBody, BlockHash, BlockHashOrNumber, BlockNumber, Header, HeadersDirection, PeerId,
-    SealedHeader, B256,
+    BlockBody, BlockHash, BlockHashOrNumber, BlockNumber, BytesMut, Header, HeadersDirection,
+    PeerId, SealedHeader, B256,
 };
 use std::{collections::HashMap, path::Path};
 use thiserror::Error;
@@ -18,6 +18,9 @@ use tokio_stream::StreamExt;
 use tokio_util::codec::FramedRead;
 use tracing::{trace, warn};
 
+/// Byte length of chunk to read from chain file.
+pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000;
+
 /// Front-end API for fetching chain data from a file.
 ///
 /// Blocks are assumed to be written one after another in a file, as rlp bytes.
@@ -49,8 +52,8 @@ pub enum FileClientError {
     Io(#[from] std::io::Error),
 
     /// An error occurred when decoding blocks, headers, or rlp headers from the file.
-    #[error(transparent)]
-    Rlp(#[from] alloy_rlp::Error),
+    #[error("{0}")]
+    Rlp(alloy_rlp::Error, Vec<u8>),
 }
 
 impl FileClient {
@@ -66,23 +69,41 @@ impl FileClient {
         let metadata = file.metadata().await?;
         let file_len = metadata.len();
 
-        // todo: read chunks into memory. for op mainnet 1/8 th of blocks below bedrock can be
-        // decoded at once
         let mut reader = vec![];
         file.read_to_end(&mut reader).await.unwrap();
 
+        Ok(Self::from_reader(&reader[..], file_len).await?.0)
+    }
+
+    /// Initialize the [`FileClient`] from bytes that have been read from file.
+    pub(crate) async fn from_reader<B>(
+        reader: B,
+        num_bytes: u64,
+    ) -> Result<(Self, Vec<u8>), FileClientError>
+    where
+        B: AsyncReadExt + Unpin,
+    {
         let mut headers = HashMap::new();
         let mut hash_to_number = HashMap::new();
         let mut bodies = HashMap::new();
 
         // use with_capacity to make sure the internal buffer contains the entire file
-        let mut stream = FramedRead::with_capacity(&reader[..], BlockFileCodec, file_len as usize);
+        let mut stream = FramedRead::with_capacity(reader, BlockFileCodec, num_bytes as usize);
+
+        let mut remaining_bytes = vec![];
 
         let mut log_interval = 0;
         let mut log_interval_start_block = 0;
 
         while let Some(block_res) = stream.next().await {
-            let block = block_res?;
+            let block = match block_res {
+                Ok(block) => block,
+                Err(FileClientError::Rlp(_err, bytes)) => {
+                    remaining_bytes = bytes;
+                    break
+                }
+                Err(err) => return Err(err),
+            };
             let block_number = block.header.number;
             let block_hash = block.header.hash_slow();
 
@@ -99,11 +120,15 @@ impl FileClient {
             );
 
             if log_interval == 0 {
+                trace!(target: "downloaders::file",
+                    block_number,
+                    "read first block"
+                );
                 log_interval_start_block = block_number;
-            } else if log_interval % 100000 == 0 {
+            } else if log_interval % 100_000 == 0 {
                 trace!(target: "downloaders::file",
                     blocks=?log_interval_start_block..=block_number,
-                    "inserted blocks into db"
+                    "read blocks from file"
                 );
                 log_interval_start_block = block_number + 1;
             }
@@ -112,12 +137,12 @@ impl FileClient {
 
         trace!(blocks = headers.len(), "Initialized file client");
 
-        Ok(Self { headers, hash_to_number, bodies })
+        Ok((Self { headers, hash_to_number, bodies }, remaining_bytes))
     }
 
     /// Get the tip hash of the chain.
     pub fn tip(&self) -> Option<B256> {
-        self.headers.get(&((self.headers.len() - 1) as u64)).map(|h| h.hash_slow())
+        self.headers.get(&self.max_block()?).map(|h| h.hash_slow())
     }
 
     /// Get the start hash of the chain.
@@ -267,6 +292,96 @@ impl DownloadClient for FileClient {
     }
 }
 
+/// Chunks file into several [`FileClient`]s.
+#[derive(Debug)]
+pub struct ChunkedFileReader {
+    /// File to read from.
+    file: File,
+    /// Current file length.
+    file_len: u64,
+    /// Bytes that have been read.
+    chunk: Vec<u8>,
+    /// Max bytes per chunk.
+    chunk_byte_len: u64,
+}
+
+impl ChunkedFileReader {
+    /// Opens the file to import from given path. Returns a new instance. If no chunk byte
+    /// length is passed, chunks default to [`DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE`] (one static file).
+    pub async fn new<P: AsRef<Path>>(
+        path: P,
+        chunk_byte_len: Option<u64>,
+    ) -> Result<Self, FileClientError> {
+        let file = File::open(path).await?;
+        let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);
+
+        Self::from_file(file, chunk_byte_len).await
+    }
+
+    /// Wraps an already opened file. Returns a new instance.
+    pub async fn from_file(file: File, chunk_byte_len: u64) -> Result<Self, FileClientError> {
+        // get file len from metadata before reading
+        let metadata = file.metadata().await?;
+        let file_len = metadata.len();
+
+        Ok(Self { file, file_len, chunk: vec![], chunk_byte_len })
+    }
+
+    /// Calculates the number of bytes to read from the chain file: the configured chunk byte
+    /// length, or the remaining file length (plus any carried-over bytes) if that is smaller.
+    fn chunk_len(&self) -> u64 {
+        let Self { chunk_byte_len, file_len, .. } = *self;
+        let file_len = file_len + self.chunk.len() as u64;
+
+        if chunk_byte_len > file_len {
+            // last chunk
+            file_len
+        } else {
+            chunk_byte_len
+        }
+    }
+
+    /// Read the next chunk from the file. Returns a [`FileClient`] for the decoded chunk, or `None` at EOF.
+    pub async fn next_chunk(&mut self) -> Result<Option<FileClient>, FileClientError> {
+        if self.file_len == 0 && self.chunk.is_empty() {
+            // eof
+            return Ok(None)
+        }
+
+        let chunk_len = self.chunk_len();
+        let old_bytes_len = self.chunk.len() as u64;
+
+        // calculate reserved space in chunk
+        let new_bytes_len = chunk_len - old_bytes_len;
+
+        // read new bytes from file
+        let mut reader = BytesMut::with_capacity(new_bytes_len as usize);
+        self.file.read_buf(&mut reader).await?;
+
+        // update remaining file length
+        self.file_len -= new_bytes_len;
+
+        trace!(target: "downloaders::file",
+            max_chunk_byte_len=self.chunk_byte_len,
+            prev_read_bytes_len=self.chunk.len(),
+            new_bytes_len,
+            remaining_file_byte_len=self.file_len,
+            "new bytes were read from file"
+        );
+
+        // read new bytes from file into chunk
+        self.chunk.extend_from_slice(&reader[..]);
+
+        // make new file client from chunk
+        let (file_client, bytes) = FileClient::from_reader(&self.chunk[..], chunk_len).await?;
+
+        // save left over bytes
+        self.chunk = bytes;
+
+        Ok(Some(file_client))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -280,6 +395,7 @@ mod tests {
     };
     use assert_matches::assert_matches;
    use futures_util::stream::StreamExt;
+    use rand::Rng;
     use reth_interfaces::{
         p2p::{
             bodies::downloader::BodyDownloader,
@@ -288,7 +404,7 @@ mod tests {
         test_utils::TestConsensus,
     };
     use reth_provider::test_utils::create_test_provider_factory;
-    use std::sync::Arc;
+    use std::{mem, sync::Arc};
 
     #[tokio::test]
     async fn streams_bodies_from_buffer() {
@@ -350,6 +466,8 @@ mod tests {
 
     #[tokio::test]
     async fn test_download_headers_from_file() {
+        reth_tracing::init_test_tracing();
+
         // Generate some random blocks
         let (file, headers, _) = generate_bodies_file(0..=19).await;
         // now try to read them back
@@ -395,4 +513,61 @@ mod tests {
             Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
         );
     }
+
+    #[tokio::test]
+    async fn test_chunk_download_headers_from_file() {
+        reth_tracing::init_test_tracing();
+
+        // rig
+
+        const MAX_BYTE_SIZE_HEADER: usize = 720;
+
+        // Generate some random blocks
+        let (file, headers, bodies) = generate_bodies_file(0..=14).await;
+        // now try to read them back in chunks.
+        for header in &headers {
+            assert_eq!(MAX_BYTE_SIZE_HEADER, mem::size_of_val(header))
+        }
+
+        // calculate min for chunk byte length range
+        let mut bodies_sizes = bodies.values().map(|body| body.size()).collect::<Vec<usize>>();
+        bodies_sizes.sort();
+        let max_block_size = MAX_BYTE_SIZE_HEADER + bodies_sizes.last().unwrap();
+        let chunk_byte_len = rand::thread_rng().gen_range(max_block_size..=max_block_size + 10_000);
+
+        trace!(target: "downloaders::file::test", chunk_byte_len);
+
+        // init reader
+        let mut reader = ChunkedFileReader::from_file(file, chunk_byte_len as u64).await.unwrap();
+
+        let mut downloaded_headers: Vec<SealedHeader> = vec![];
+
+        let mut local_header = headers.first().unwrap().clone();
+
+        // test
+
+        while let Some(client) = reader.next_chunk().await.unwrap() {
+            let sync_target = client.tip_header().unwrap();
+            let sync_target_hash = sync_target.hash();
+
+            // construct headers downloader and use first header
+            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
+                .build(Arc::new(client), Arc::new(TestConsensus::default()));
+            header_downloader.update_local_head(local_header.clone());
+            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));
+
+            // get headers first
+            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();
+
+            // export new local header to outer scope
+            local_header = sync_target;
+
+            // reverse to make sure it's in the right order before comparing
+            downloaded_headers_chunk.reverse();
+            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
+        }
+
+        // the first header is not included in the response
+        assert_eq!(headers[1..], downloaded_headers);
+    }
 }
diff --git a/crates/net/downloaders/src/file_codec.rs b/crates/net/downloaders/src/file_codec.rs
index b73b15e88..156f3316c 100644
--- a/crates/net/downloaders/src/file_codec.rs
+++ b/crates/net/downloaders/src/file_codec.rs
@@ -31,9 +31,12 @@ impl Decoder for BlockFileCodec {
         if src.is_empty() {
             return Ok(None)
         }
+
         let buf_slice = &mut src.as_ref();
-        let body = Block::decode(buf_slice)?;
+        let body =
+            Block::decode(buf_slice).map_err(|err| FileClientError::Rlp(err, src.to_vec()))?;
         src.advance(src.len() - buf_slice.len());
+
         Ok(Some(body))
     }
 }
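
Note: the subtle part of the scheme above is the leftover-byte handoff. `FileClient::from_reader` stops at the first block that fails RLP decoding, hands the undecoded tail back through `FileClientError::Rlp`, and `next_chunk` prepends that tail to the bytes of the following read. The size arithmetic reduces to the dependency-free model below; this is a sketch, and the free-standing function and its parameter names are illustrative, not part of the patch:

    /// Mirrors `ChunkedFileReader::chunk_len`: read up to the configured chunk
    /// size, counting bytes carried over from a partially decoded block, and
    /// shrink to whatever is left once the file is nearly exhausted.
    fn chunk_len(chunk_byte_len: u64, remaining_file_len: u64, carried_over: u64) -> u64 {
        let total_left = remaining_file_len + carried_over;
        if chunk_byte_len > total_left {
            total_left // last chunk
        } else {
            chunk_byte_len
        }
    }

    fn main() {
        // plenty of file left: read a full 1 GB chunk
        assert_eq!(chunk_len(1_000_000_000, 2_500_000_000, 0), 1_000_000_000);
        // final chunk: remaining bytes plus the carried-over partial block
        assert_eq!(chunk_len(1_000_000_000, 400_000_000, 100), 400_000_100);
    }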
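
Note: a minimal sketch of how the new reader is driven, distilled from `ImportCommand::execute` above. `import_chunks` is a hypothetical helper name and `eyre::Result` is assumed as in the reth binary; pipeline construction is elided because it depends on the node's database and consensus setup:

    use reth_downloaders::file_client::ChunkedFileReader;
    use std::path::Path;

    /// Hypothetical driver: decode the chain file one chunk at a time and hand
    /// each resulting `FileClient` to a freshly built pipeline.
    async fn import_chunks(path: &Path, chunk_len: Option<u64>) -> eyre::Result<()> {
        // `None` falls back to `DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE` (1 GB)
        let mut reader = ChunkedFileReader::new(path, chunk_len).await?;

        while let Some(file_client) = reader.next_chunk().await? {
            // every chunk ends on a block boundary, so the tip is decodable
            let tip = file_client.tip().expect("file client has no tip");
            // ... build a pipeline around `Arc::new(file_client)`, set `tip`, run it
            let _ = tip;
        }
        Ok(())
    }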