feat(op): chunked chain import (#7574)

Co-authored-by: Atris <vacekj@outlook.com>
Emilia Hane
2024-04-15 13:46:34 +02:00
committed by GitHub
parent 288b7fb37b
commit 8f1f386f52
5 changed files with 249 additions and 54 deletions

Cargo.lock (generated)

@@ -6408,6 +6408,7 @@ dependencies = [
"itertools 0.12.1",
"metrics",
"pin-project",
"rand 0.8.5",
"rayon",
"reth-config",
"reth-db",


@@ -15,7 +15,8 @@ use reth_beacon_consensus::BeaconConsensus;
use reth_config::Config;
use reth_db::{database::Database, init_db};
use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder, file_client::FileClient,
bodies::bodies::BodiesDownloaderBuilder,
file_client::{ChunkedFileReader, FileClient, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE},
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_interfaces::{
@@ -76,6 +77,10 @@ pub struct ImportCommand {
#[arg(long, verbatim_doc_comment, env = OP_RETH_MAINNET_BELOW_BEDROCK)]
op_mainnet_below_bedrock: bool,
/// Chunk byte length.
#[arg(long, value_name = "CHUNK_LEN", verbatim_doc_comment)]
chunk_len: Option<u64>,
#[command(flatten)]
db: DatabaseArgs,
@@ -101,6 +106,10 @@ impl ImportCommand {
debug!(target: "reth::cli", "Execution stage disabled");
}
debug!(target: "reth::cli",
chunk_byte_len=self.chunk_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE), "Chunking chain import"
);
// add network name to data dir
let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain);
let config_path = self.config.clone().unwrap_or_else(|| data_dir.config_path());
@@ -123,49 +132,55 @@ impl ImportCommand {
let consensus = Arc::new(BeaconConsensus::new(self.chain.clone()));
info!(target: "reth::cli", "Consensus engine initialized");
// create a new FileClient
info!(target: "reth::cli", "Importing chain file");
let file_client = Arc::new(FileClient::new(&self.path).await?);
// open file
let mut reader = ChunkedFileReader::new(&self.path, self.chunk_len).await?;
// override the tip
let tip = file_client.tip().expect("file client has no tip");
info!(target: "reth::cli", "Chain file read");
while let Some(file_client) = reader.next_chunk().await? {
// create a new FileClient from chunk read from file
info!(target: "reth::cli",
"Importing chain file chunk"
);
let (mut pipeline, events) = self
.build_import_pipeline(
config,
provider_factory.clone(),
&consensus,
file_client,
StaticFileProducer::new(
// override the tip
let tip = file_client.tip().expect("file client has no tip");
info!(target: "reth::cli", "Chain file chunk read");
let (mut pipeline, events) = self
.build_import_pipeline(
&config,
provider_factory.clone(),
provider_factory.static_file_provider(),
PruneModes::default(),
),
self.disable_execution,
)
.await?;
&consensus,
Arc::new(file_client),
StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
PruneModes::default(),
),
self.disable_execution,
)
.await?;
// override the tip
pipeline.set_tip(tip);
debug!(target: "reth::cli", ?tip, "Tip manually set");
// override the tip
pipeline.set_tip(tip);
debug!(target: "reth::cli", ?tip, "Tip manually set");
let provider = provider_factory.provider()?;
let provider = provider_factory.provider()?;
let latest_block_number =
provider.get_stage_checkpoint(StageId::Finish)?.map(|ch| ch.block_number);
tokio::spawn(reth_node_core::events::node::handle_events(
None,
latest_block_number,
events,
db.clone(),
));
let latest_block_number =
provider.get_stage_checkpoint(StageId::Finish)?.map(|ch| ch.block_number);
tokio::spawn(reth_node_core::events::node::handle_events(
None,
latest_block_number,
events,
db.clone(),
));
// Run pipeline
info!(target: "reth::cli", "Starting sync pipeline");
tokio::select! {
res = pipeline.run() => res?,
_ = tokio::signal::ctrl_c() => {},
// Run pipeline
info!(target: "reth::cli", "Starting sync pipeline");
tokio::select! {
res = pipeline.run() => res?,
_ = tokio::signal::ctrl_c() => {},
}
}
info!(target: "reth::cli", "Chain file imported");
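For orientation, the hunk above replaces the single FileClient construction with a loop that decodes one chunk at a time and runs a fresh pipeline per chunk. Below is a minimal, self-contained sketch of that control flow using toy stand-ins (`run_pipeline_for_chunk` and the integer chunk ids are hypothetical, not reth types); it assumes a tokio runtime with the macros, rt-multi-thread, time and signal features.

// Toy sketch of the per-chunk control flow: each chunk drives its own pipeline run,
// and Ctrl-C can interrupt the import between or during chunks.
use std::time::Duration;

async fn run_pipeline_for_chunk(chunk_id: usize) -> Result<(), String> {
    // stand-in for `pipeline.run()` on one decoded chunk
    tokio::time::sleep(Duration::from_millis(50)).await;
    println!("chunk {chunk_id} imported");
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // stand-in for `reader.next_chunk()` yielding one `FileClient` per chunk
    for chunk_id in 0..3 {
        tokio::select! {
            res = run_pipeline_for_chunk(chunk_id) => res?,
            _ = tokio::signal::ctrl_c() => break,
        }
    }
    println!("chain file imported");
    Ok(())
}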
@@ -174,7 +189,7 @@ impl ImportCommand {
async fn build_import_pipeline<DB, C>(
&self,
config: Config,
config: &Config,
provider_factory: ProviderFactory<DB>,
consensus: &Arc<C>,
file_client: Arc<FileClient>,
@@ -220,7 +235,7 @@ impl ImportCommand {
header_downloader,
body_downloader,
factory.clone(),
config.stages.etl,
config.stages.etl.clone(),
)
.set(SenderRecoveryStage {
commit_threshold: config.stages.sender_recovery.commit_threshold,
@@ -239,7 +254,7 @@ impl ImportCommand {
.clean_threshold
.max(config.stages.account_hashing.clean_threshold)
.max(config.stages.storage_hashing.clean_threshold),
config.prune.map(|prune| prune.segments).unwrap_or_default(),
config.prune.clone().map(|prune| prune.segments).unwrap_or_default(),
))
.disable_if(StageId::Execution, || disable_execution),
)
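The signature change from `config: Config` to `config: &Config` (and the added `.clone()` on the ETL and prune settings) follows from the loop above: the same configuration now builds a pipeline for every chunk, so it can no longer be consumed by value. A minimal sketch with hypothetical types (not reth's `Config`) illustrating the point:

// Hypothetical config/pipeline types: a config taken by value could build only one
// pipeline, so the per-chunk loop borrows it and clones the owned pieces it needs.
#[derive(Clone, Debug)]
struct EtlConfig {
    file_size: usize,
}

struct Config {
    etl: EtlConfig,
}

struct Pipeline {
    etl: EtlConfig,
}

// borrowing lets the caller reuse the same config for every chunk
fn build_pipeline(config: &Config) -> Pipeline {
    Pipeline { etl: config.etl.clone() }
}

fn main() {
    let config = Config { etl: EtlConfig { file_size: 500 * 1024 * 1024 } };
    for chunk_id in 0..3 {
        // with `fn build_pipeline(config: Config)` the config would be moved here on
        // the first iteration and the loop would not compile
        let pipeline = build_pipeline(&config);
        println!("chunk {chunk_id}: ETL file size {}", pipeline.etl.file_size);
    }
}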


@@ -52,6 +52,7 @@ assert_matches.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
alloy-rlp.workspace = true
itertools.workspace = true
rand.workspace = true
tempfile.workspace = true


@@ -8,8 +8,8 @@ use reth_interfaces::p2p::{
priority::Priority,
};
use reth_primitives::{
BlockBody, BlockHash, BlockHashOrNumber, BlockNumber, Header, HeadersDirection, PeerId,
SealedHeader, B256,
BlockBody, BlockHash, BlockHashOrNumber, BlockNumber, BytesMut, Header, HeadersDirection,
PeerId, SealedHeader, B256,
};
use std::{collections::HashMap, path::Path};
use thiserror::Error;
@@ -18,6 +18,9 @@ use tokio_stream::StreamExt;
use tokio_util::codec::FramedRead;
use tracing::{trace, warn};
/// Default byte length of a chunk to read from the chain file.
pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000;
/// Front-end API for fetching chain data from a file.
///
/// Blocks are assumed to be written one after another in a file, as rlp bytes.
@@ -49,8 +52,8 @@ pub enum FileClientError {
Io(#[from] std::io::Error),
/// An error occurred when decoding blocks, headers, or rlp headers from the file.
#[error(transparent)]
Rlp(#[from] alloy_rlp::Error),
#[error("{0}")]
Rlp(alloy_rlp::Error, Vec<u8>),
}
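The reworked `Rlp` variant carries the raw bytes that could not be decoded, which is what lets `FileClient::from_reader` stop at a block that straddles a chunk boundary and hand the tail back to `ChunkedFileReader`. A self-contained sketch of that pattern with a toy length-prefixed record format standing in for RLP blocks (`DecodeError`, `decode_one` and the byte layout are made up for illustration):

// Toy version of "decode until the data runs out mid-record, keep what was decoded,
// carry the undecoded tail over to the next chunk".
#[derive(Debug)]
enum DecodeError {
    /// The record is incomplete; carries every byte that was not decoded yet.
    Incomplete(Vec<u8>),
}

/// Decodes one length-prefixed record, returning it together with the unread tail.
fn decode_one(buf: &[u8]) -> Result<(Vec<u8>, &[u8]), DecodeError> {
    let len = *buf.first().ok_or_else(|| DecodeError::Incomplete(buf.to_vec()))? as usize;
    if buf.len() < 1 + len {
        return Err(DecodeError::Incomplete(buf.to_vec()));
    }
    Ok((buf[1..1 + len].to_vec(), &buf[1 + len..]))
}

fn main() {
    // two complete records followed by a truncated third one
    let chunk: &[u8] = &[2, 0xaa, 0xbb, 1, 0xcc, 3, 0xdd];

    let mut rest = chunk;
    let mut records = Vec::new();
    let mut carry_over = Vec::new();
    while !rest.is_empty() {
        match decode_one(rest) {
            Ok((record, tail)) => {
                records.push(record);
                rest = tail;
            }
            // mirrors matching on `FileClientError::Rlp(_, bytes)` in `from_reader`:
            // keep the decoded records and save the undecoded tail
            Err(DecodeError::Incomplete(bytes)) => {
                carry_over = bytes;
                break;
            }
        }
    }
    println!("decoded {} record(s), carrying over {} byte(s)", records.len(), carry_over.len());
}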
impl FileClient {
@@ -66,23 +69,41 @@ impl FileClient {
let metadata = file.metadata().await?;
let file_len = metadata.len();
// todo: read chunks into memory. for op mainnet 1/8 th of blocks below bedrock can be
// decoded at once
let mut reader = vec![];
file.read_to_end(&mut reader).await.unwrap();
Ok(Self::from_reader(&reader[..], file_len).await?.0)
}
/// Initialize the [`FileClient`] from bytes that have been read from file.
pub(crate) async fn from_reader<B>(
reader: B,
num_bytes: u64,
) -> Result<(Self, Vec<u8>), FileClientError>
where
B: AsyncReadExt + Unpin,
{
let mut headers = HashMap::new();
let mut hash_to_number = HashMap::new();
let mut bodies = HashMap::new();
// use with_capacity to make sure the internal buffer contains the entire file
let mut stream = FramedRead::with_capacity(&reader[..], BlockFileCodec, file_len as usize);
let mut stream = FramedRead::with_capacity(reader, BlockFileCodec, num_bytes as usize);
let mut remaining_bytes = vec![];
let mut log_interval = 0;
let mut log_interval_start_block = 0;
while let Some(block_res) = stream.next().await {
let block = block_res?;
let block = match block_res {
Ok(block) => block,
Err(FileClientError::Rlp(_err, bytes)) => {
remaining_bytes = bytes;
break
}
Err(err) => return Err(err),
};
let block_number = block.header.number;
let block_hash = block.header.hash_slow();
@@ -99,11 +120,15 @@ impl FileClient {
);
if log_interval == 0 {
trace!(target: "downloaders::file",
block_number,
"read first block"
);
log_interval_start_block = block_number;
} else if log_interval % 100000 == 0 {
} else if log_interval % 100_000 == 0 {
trace!(target: "downloaders::file",
blocks=?log_interval_start_block..=block_number,
"inserted blocks into db"
"read blocks from file"
);
log_interval_start_block = block_number + 1;
}
@@ -112,12 +137,12 @@ impl FileClient {
trace!(blocks = headers.len(), "Initialized file client");
Ok(Self { headers, hash_to_number, bodies })
Ok((Self { headers, hash_to_number, bodies }, remaining_bytes))
}
/// Get the tip hash of the chain.
pub fn tip(&self) -> Option<B256> {
self.headers.get(&((self.headers.len() - 1) as u64)).map(|h| h.hash_slow())
self.headers.get(&self.max_block()?).map(|h| h.hash_slow())
}
/// Get the start hash of the chain.
@@ -267,6 +292,96 @@ impl DownloadClient for FileClient {
}
}
/// Chunks file into several [`FileClient`]s.
#[derive(Debug)]
pub struct ChunkedFileReader {
/// File to read from.
file: File,
/// Current file length.
file_len: u64,
/// Bytes that have been read.
chunk: Vec<u8>,
/// Max bytes per chunk.
chunk_byte_len: u64,
}
impl ChunkedFileReader {
/// Opens the file to import from the given path and returns a new instance. If no chunk byte
/// length is passed, chunks default to [`DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE`] bytes (the size of
/// one static file).
pub async fn new<P: AsRef<Path>>(
path: P,
chunk_byte_len: Option<u64>,
) -> Result<Self, FileClientError> {
let file = File::open(path).await?;
let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);
Self::from_file(file, chunk_byte_len).await
}
/// Creates a new instance from an already opened file and a chunk byte length.
pub async fn from_file(file: File, chunk_byte_len: u64) -> Result<Self, FileClientError> {
// get file len from metadata before reading
let metadata = file.metadata().await?;
let file_len = metadata.len();
Ok(Self { file, file_len, chunk: vec![], chunk_byte_len })
}
/// Calculates the number of bytes to read for the next chunk: the configured chunk byte
/// length, capped by the bytes remaining (the rest of the file plus any leftover bytes from
/// the previous chunk).
fn chunk_len(&self) -> u64 {
let Self { chunk_byte_len, file_len, .. } = *self;
let file_len = file_len + self.chunk.len() as u64;
if chunk_byte_len > file_len {
// last chunk
file_len
} else {
chunk_byte_len
}
}
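In other words, the next chunk is the configured byte length, capped by what is actually left: the unread remainder of the file plus any leftover bytes carried over from the previous chunk. A standalone re-implementation with made-up numbers (not reth code) to make the arithmetic concrete:

// `next_chunk_len` mirrors `chunk_len` above: min(configured chunk size, bytes left).
fn next_chunk_len(chunk_byte_len: u64, remaining_file_len: u64, leftover_len: u64) -> u64 {
    chunk_byte_len.min(remaining_file_len + leftover_len)
}

fn main() {
    // mid-file: a full 1 GB chunk is read
    assert_eq!(next_chunk_len(1_000_000_000, 5_000_000_000, 120), 1_000_000_000);
    // near the end of the file: only ~300 MB plus 120 carried-over bytes remain
    assert_eq!(next_chunk_len(1_000_000_000, 300_000_000, 120), 300_000_120);
    println!("chunk sizing checks passed");
}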
/// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
pub async fn next_chunk(&mut self) -> Result<Option<FileClient>, FileClientError> {
if self.file_len == 0 && self.chunk.is_empty() {
// eof
return Ok(None)
}
let chunk_len = self.chunk_len();
let old_bytes_len = self.chunk.len() as u64;
// calculate reserved space in chunk
let new_bytes_len = chunk_len - old_bytes_len;
// read new bytes from file
let mut reader = BytesMut::with_capacity(new_bytes_len as usize);
self.file.read_buf(&mut reader).await?;
// update remaining file length
self.file_len -= new_bytes_len;
trace!(target: "downloaders::file",
max_chunk_byte_len=self.chunk_byte_len,
prev_read_bytes_len=self.chunk.len(),
new_bytes_len,
remaining_file_byte_len=self.file_len,
"new bytes were read from file"
);
// read new bytes from file into chunk
self.chunk.extend_from_slice(&reader[..]);
// make new file client from chunk
let (file_client, bytes) = FileClient::from_reader(&self.chunk[..], chunk_len).await?;
// save left over bytes
self.chunk = bytes;
Ok(Some(file_client))
}
}
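One detail worth spelling out in `next_chunk`: a single `read_buf` call may return fewer bytes than the buffer's spare capacity. If exactly `new_bytes_len` fresh bytes are wanted per chunk, the read can be looped until the target is reached or EOF is hit. A sketch of that with a hypothetical helper (`read_up_to`, not part of reth), assuming tokio with the macros, rt-multi-thread and io-util features:

use tokio::io::AsyncReadExt;

/// Reads up to `target_len` bytes, looping until the target is reached or EOF is hit.
async fn read_up_to<R: AsyncReadExt + Unpin>(
    reader: &mut R,
    target_len: usize,
) -> std::io::Result<Vec<u8>> {
    let mut buf = Vec::with_capacity(target_len);
    while buf.len() < target_len {
        // `read_buf` appends to the spare capacity and returns the number of bytes read
        let n = reader.read_buf(&mut buf).await?;
        if n == 0 {
            // EOF before the full chunk was available (the last chunk of the file)
            break;
        }
    }
    Ok(buf)
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // an in-memory reader stands in for the chain file
    let mut reader: &[u8] = &[0u8; 4096];
    let chunk = read_up_to(&mut reader, 1024).await?;
    println!("read {} of 1024 requested bytes", chunk.len());
    Ok(())
}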
#[cfg(test)]
mod tests {
use super::*;
@@ -280,6 +395,7 @@ mod tests {
};
use assert_matches::assert_matches;
use futures_util::stream::StreamExt;
use rand::Rng;
use reth_interfaces::{
p2p::{
bodies::downloader::BodyDownloader,
@@ -288,7 +404,7 @@ mod tests {
test_utils::TestConsensus,
};
use reth_provider::test_utils::create_test_provider_factory;
use std::sync::Arc;
use std::{mem, sync::Arc};
#[tokio::test]
async fn streams_bodies_from_buffer() {
@@ -350,6 +466,8 @@ mod tests {
#[tokio::test]
async fn test_download_headers_from_file() {
reth_tracing::init_test_tracing();
// Generate some random blocks
let (file, headers, _) = generate_bodies_file(0..=19).await;
// now try to read them back
@@ -395,4 +513,61 @@ mod tests {
Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
);
}
#[tokio::test]
async fn test_chunk_download_headers_from_file() {
reth_tracing::init_test_tracing();
// rig
const MAX_BYTE_SIZE_HEADER: usize = 720;
// Generate some random blocks
let (file, headers, bodies) = generate_bodies_file(0..=14).await;
// now try to read them back in chunks.
for header in &headers {
assert_eq!(MAX_BYTE_SIZE_HEADER, mem::size_of_val(header))
}
// calculate min for chunk byte length range
let mut bodies_sizes = bodies.values().map(|body| body.size()).collect::<Vec<_>>();
bodies_sizes.sort();
let max_block_size = MAX_BYTE_SIZE_HEADER + bodies_sizes.last().unwrap();
let chunk_byte_len = rand::thread_rng().gen_range(max_block_size..=max_block_size + 10_000);
trace!(target: "downloaders::file::test", chunk_byte_len);
// init reader
let mut reader = ChunkedFileReader::from_file(file, chunk_byte_len as u64).await.unwrap();
let mut downloaded_headers: Vec<SealedHeader> = vec![];
let mut local_header = headers.first().unwrap().clone();
// test
while let Some(client) = reader.next_chunk().await.unwrap() {
let sync_target = client.tip_header().unwrap();
let sync_target_hash = sync_target.hash();
// construct headers downloader and use first header
let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
.build(Arc::clone(&Arc::new(client)), Arc::new(TestConsensus::default()));
header_downloader.update_local_head(local_header.clone());
header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));
// get headers first
let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();
// export new local header to outer scope
local_header = sync_target;
// reverse to make sure it's in the right order before comparing
downloaded_headers_chunk.reverse();
downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
}
// the first header is not included in the response
assert_eq!(headers[1..], downloaded_headers);
}
}


@@ -31,9 +31,12 @@ impl Decoder for BlockFileCodec {
if src.is_empty() {
return Ok(None)
}
let buf_slice = &mut src.as_ref();
let body = Block::decode(buf_slice)?;
let body =
Block::decode(buf_slice).map_err(|err| FileClientError::Rlp(err, src.to_vec()))?;
src.advance(src.len() - buf_slice.len());
Ok(Some(body))
}
}
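The codec change keeps the same buffer bookkeeping as before: decoding works on a reborrowed `&mut &[u8]` view that shrinks as bytes are consumed, and the `BytesMut` is then advanced by exactly the consumed amount, so anything left over stays buffered. A small, self-contained illustration with a made-up length-prefixed record format in place of `Block::decode` (the `decode_record` helper is hypothetical):

use bytes::{Buf, BytesMut};

/// Reads one length-prefixed record, advancing the slice past the consumed bytes
/// (the same `&mut &[u8]` calling convention as `alloy_rlp::Decodable::decode`).
fn decode_record(buf: &mut &[u8]) -> Option<Vec<u8>> {
    let len = *buf.first()? as usize;
    if buf.len() < 1 + len {
        return None;
    }
    let record = buf[1..1 + len].to_vec();
    *buf = &buf[1 + len..];
    Some(record)
}

fn main() {
    let mut src = BytesMut::from(&[2u8, 0xaa, 0xbb, 1, 0xcc][..]);

    let buf_slice = &mut src.as_ref();
    let record = decode_record(buf_slice).unwrap();

    // consumed = original buffer length minus what is left in the shrunken slice view
    let consumed = src.len() - buf_slice.len();
    src.advance(consumed);

    assert_eq!(record, vec![0xaa_u8, 0xbb]);
    assert_eq!(&src[..], &[1u8, 0xcc][..]);
    println!("consumed {consumed} bytes, {} byte(s) still buffered", src.len());
}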