perf: chunkify block ranges

This commit is contained in:
sprites0
2025-10-08 13:54:16 +00:00
parent 7e169d409d
commit 233026871f

View File

@ -20,7 +20,7 @@ use reth_provider::{
DatabaseProvider, ProviderFactory, ReceiptProvider, StaticFileProviderFactory, DatabaseProvider, ProviderFactory, ReceiptProvider, StaticFileProviderFactory,
StaticFileSegment, StaticFileWriter, StaticFileSegment, StaticFileWriter,
}; };
use std::{marker::PhantomData, ops::RangeInclusive, path::PathBuf, sync::Arc}; use std::{marker::PhantomData, path::PathBuf, sync::Arc};
use tracing::{info, warn}; use tracing::{info, warn};
use crate::{chainspec::HlChainSpec, HlHeader, HlPrimitives}; use crate::{chainspec::HlChainSpec, HlHeader, HlPrimitives};
@ -166,12 +166,7 @@ where
let sf_provider = self.sf_provider(); let sf_provider = self.sf_provider();
let sf_tmp_provider = StaticFileProvider::<HlPrimitives>::read_write(&conversion_tmp)?; let sf_tmp_provider = StaticFileProvider::<HlPrimitives>::read_write(&conversion_tmp)?;
let block_range_for_filename = sf_provider.find_fixed_range(block_range.start()); let block_range_for_filename = sf_provider.find_fixed_range(block_range.start());
migrate_single_static_file( migrate_single_static_file(&sf_tmp_provider, &sf_provider, &provider, block_range)?;
&sf_tmp_provider,
&sf_provider,
&provider,
block_range,
)?;
self.move_static_files_for_segment(block_range_for_filename)?; self.move_static_files_for_segment(block_range_for_filename)?;
} }
@ -253,17 +248,21 @@ fn migrate_single_static_file<N: NodeTypesForProvider<Primitives = HlPrimitives>
provider: &DatabaseProvider<Tx<RO>, NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>, provider: &DatabaseProvider<Tx<RO>, NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
block_range: SegmentRangeInclusive, block_range: SegmentRangeInclusive,
) -> Result<(), eyre::Error> { ) -> Result<(), eyre::Error> {
let block_range: RangeInclusive<u64> = block_range.into(); info!("Migrating block range {}...", block_range);
info!("Migrating block range {:?}...", block_range);
info!("Loading headers"); // block_ranges into chunks of 100000 blocks
const CHUNK_SIZE: u64 = 100000;
for chunk in (0..=block_range.end()).step_by(CHUNK_SIZE as usize) {
let end = std::cmp::min(chunk + CHUNK_SIZE - 1, block_range.end());
let block_range = chunk..=end;
let headers = old_headers_range(sf_in, block_range.clone())?; let headers = old_headers_range(sf_in, block_range.clone())?;
info!("Loading receipts");
let receipts = provider.receipts_by_block_range(block_range.clone())?; let receipts = provider.receipts_by_block_range(block_range.clone())?;
assert_eq!(headers.len(), receipts.len()); assert_eq!(headers.len(), receipts.len());
let mut writer = sf_out.get_writer(*block_range.start(), StaticFileSegment::Headers)?; let mut writer = sf_out.get_writer(*block_range.start(), StaticFileSegment::Headers)?;
let new_headers = std::iter::zip(headers, receipts) let new_headers = std::iter::zip(headers, receipts)
.map(|(header, receipts)| { .map(|(header, receipts)| {
let system_tx_count = receipts.iter().filter(|r| r.cumulative_gas_used == 0).count(); let system_tx_count =
receipts.iter().filter(|r| r.cumulative_gas_used == 0).count();
let eth_header = Header::decompress(&header[0]).unwrap(); let eth_header = Header::decompress(&header[0]).unwrap();
let hl_header = let hl_header =
HlHeader::from_ethereum_header(eth_header, &receipts, system_tx_count as u64); HlHeader::from_ethereum_header(eth_header, &receipts, system_tx_count as u64);
@ -277,6 +276,8 @@ fn migrate_single_static_file<N: NodeTypesForProvider<Primitives = HlPrimitives>
writer.append_header(&header.0, header.1, &header.2)?; writer.append_header(&header.0, header.1, &header.2)?;
} }
writer.commit().unwrap(); writer.commit().unwrap();
info!("Migrated block range {:?}...", block_range);
}
Ok(()) Ok(())
} }