//! One-shot migration of on-disk headers from the legacy `alloy_consensus::Header`
//! format to [`HlHeader`], covering both the mdbx `Headers` table and the
//! `Headers` static-file segment.

use alloy_consensus::Header;
use alloy_primitives::{B256, BlockHash, Bytes, U256, b256, hex::ToHexExt};
use reth::{
    api::NodeTypesWithDBAdapter,
    args::{DatabaseArgs, DatadirArgs},
    dirs::{ChainPath, DataDirPath},
};
use reth_chainspec::EthChainSpec;
use reth_db::{
    DatabaseEnv,
    mdbx::{RO, tx::Tx},
    models::CompactU256,
    static_file::iter_static_files,
    table::Decompress,
    tables,
};
use reth_db_api::{
    cursor::{DbCursorRO, DbCursorRW},
    transaction::{DbTx, DbTxMut},
};
use reth_errors::ProviderResult;
use reth_ethereum_primitives::EthereumReceipt;
use reth_provider::{
    DatabaseProvider, ProviderFactory, ReceiptProvider, StaticFileProviderFactory,
    StaticFileSegment, StaticFileWriter,
    providers::{NodeTypesForProvider, StaticFileProvider},
    static_file::SegmentRangeInclusive,
};
use std::{fs::File, io::Write, path::PathBuf, sync::Arc};
use tracing::{info, warn};

use crate::{HlHeader, HlPrimitives, chainspec::HlChainSpec};

pub(crate) trait HlNodeType:
    NodeTypesForProvider<ChainSpec = HlChainSpec, Primitives = HlPrimitives>
{
}
impl<N: NodeTypesForProvider<ChainSpec = HlChainSpec, Primitives = HlPrimitives>> HlNodeType for N {}

pub(super) struct Migrator<N: HlNodeType> {
    data_dir: ChainPath<DataDirPath>,
    provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
}

impl<N: HlNodeType> Migrator<N> {
    const MIGRATION_PATH_SUFFIX: &'static str = "migration-tmp";

    pub fn new(
        chain_spec: HlChainSpec,
        datadir: DatadirArgs,
        database_args: DatabaseArgs,
    ) -> eyre::Result<Self> {
        let data_dir = datadir.clone().resolve_datadir(chain_spec.chain());
        let provider_factory = Self::provider_factory(chain_spec, datadir, database_args)?;
        Ok(Self { data_dir, provider_factory })
    }

    pub fn sf_provider(&self) -> StaticFileProvider<HlPrimitives> {
        self.provider_factory.static_file_provider()
    }

    pub fn migrate_db(&self) -> eyre::Result<()> {
        let is_empty = Self::highest_block_number(&self.sf_provider()).is_none();
        if is_empty {
            return Ok(());
        }
        self.migrate_db_inner()
    }

    fn highest_block_number(sf_provider: &StaticFileProvider<HlPrimitives>) -> Option<u64> {
        sf_provider.get_highest_static_file_block(StaticFileSegment::Headers)
    }

    fn migrate_db_inner(&self) -> eyre::Result<()> {
        let migrated_mdbx = MigratorMdbx::<N>(self).migrate_mdbx()?;
        let migrated_static_files = MigrateStaticFiles::<N>(self).migrate_static_files()?;
        if migrated_mdbx || migrated_static_files {
            info!("Database migrated successfully");
        }
        Ok(())
    }

    fn conversion_tmp_dir(&self) -> PathBuf {
        self.data_dir.data_dir().join(Self::MIGRATION_PATH_SUFFIX)
    }

    fn provider_factory(
        chain_spec: HlChainSpec,
        datadir: DatadirArgs,
        database_args: DatabaseArgs,
    ) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>> {
        let data_dir = datadir.clone().resolve_datadir(chain_spec.chain());
        let db_env = reth_db::init_db(data_dir.db(), database_args.database_args())?;
        let static_file_provider = StaticFileProvider::read_only(data_dir.static_files(), false)?;
        let db = Arc::new(db_env);
        Ok(ProviderFactory::new(db, Arc::new(chain_spec), static_file_provider))
    }
}
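// Example wiring (illustrative sketch; the node type name `HlNode` and the
// argument values are assumptions, not part of this module):
//
//     let migrator = Migrator::<HlNode>::new(chain_spec, datadir_args, database_args)?;
//     migrator.migrate_db()?; // no-op while the datadir is still empty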
struct MigratorMdbx<'a, N: HlNodeType>(&'a Migrator<N>);

impl<'a, N: HlNodeType> MigratorMdbx<'a, N> {
    fn migrate_mdbx(&self) -> eyre::Result<bool> {
        // If any header is still in the old format we need to migrate, so probe
        // the first and the last entry of the table.
        let db_env = self.0.provider_factory.provider()?;
        let mut cursor = db_env.tx_ref().cursor_read::<tables::Headers<Bytes>>()?;
        let migration_needed = {
            let first_is_old = match cursor.first()? {
                Some((number, header)) => using_old_header(number, &header),
                None => false,
            };
            let last_is_old = match cursor.last()? {
                Some((number, header)) => using_old_header(number, &header),
                None => false,
            };
            first_is_old || last_is_old
        };
        if !migration_needed {
            return Ok(false);
        }
        check_if_migration_enabled()?;
        self.migrate_mdbx_inner()?;
        Ok(true)
    }

    fn migrate_mdbx_inner(&self) -> eyre::Result<()> {
        // There shouldn't be many headers in mdbx, but stage them through a file
        // for safety.
        info!("Old database detected, migrating mdbx...");
        let conversion_tmp = self.0.conversion_tmp_dir();
        let tmp_path = conversion_tmp.join("headers.rmp");
        if conversion_tmp.exists() {
            std::fs::remove_dir_all(&conversion_tmp)?;
        }
        std::fs::create_dir_all(&conversion_tmp)?;
        let count = self.export_old_headers(&tmp_path)?;
        self.import_new_headers(tmp_path, count)?;
        Ok(())
    }

    fn export_old_headers(&self, tmp_path: &PathBuf) -> Result<i32, eyre::Error> {
        let db_env = self.0.provider_factory.provider()?;
        let mut cursor_read = db_env.tx_ref().cursor_read::<tables::Headers<Bytes>>()?;
        let mut tmp_writer = File::create(tmp_path)?;
        let mut count = 0;
        let old_headers = cursor_read.walk(None)?.filter_map(|row| {
            let (block_number, header) = row.ok()?;
            if !using_old_header(block_number, &header) {
                None
            } else {
                Some((block_number, Header::decompress(&header).ok()?))
            }
        });
        for (block_number, header) in old_headers {
            let receipts =
                db_env.receipts_by_block(block_number.into())?.expect("Receipt not found");
            let new_header = to_hl_header(receipts, header);
            tmp_writer.write_all(&rmp_serde::to_vec(&(block_number, new_header))?)?;
            count += 1;
        }
        Ok(count)
    }

    fn import_new_headers(&self, tmp_path: PathBuf, count: i32) -> Result<(), eyre::Error> {
        let mut tmp_reader = File::open(tmp_path)?;
        let db_env = self.0.provider_factory.provider_rw()?;
        let mut cursor_write = db_env.tx_ref().cursor_write::<tables::Headers<Bytes>>()?;
        for _ in 0..count {
            let (number, header) = rmp_serde::from_read::<_, (u64, HlHeader)>(&mut tmp_reader)?;
            cursor_write.upsert(number, &rmp_serde::to_vec(&header)?.into())?;
        }
        db_env.commit()?;
        Ok(())
    }
}

fn check_if_migration_enabled() -> Result<(), eyre::Error> {
    if std::env::var("EXPERIMENTAL_MIGRATE_DB").is_err() {
        let err_msg = concat!(
            "Detected an old database format, but experimental database migration is currently disabled. ",
            "To enable migration, set EXPERIMENTAL_MIGRATE_DB=1, or alternatively resync your node (the safest option)."
        );
        warn!("{}", err_msg);
        return Err(eyre::eyre!("{}", err_msg));
    }
    Ok(())
}
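// The tmp file written by `export_old_headers` is a plain concatenation of
// MessagePack-encoded `(block_number, HlHeader)` tuples with no length framing:
// `rmp_serde::from_read` consumes exactly one value per call, which is why the
// exact `count` has to be threaded into `import_new_headers`. A minimal sketch
// of the same streaming pattern, using `u32` payloads instead of headers:
//
//     let mut buf = Vec::new();
//     for pair in [(1u64, 10u32), (2, 20)] {
//         buf.write_all(&rmp_serde::to_vec(&pair)?)?;
//     }
//     let mut reader = buf.as_slice();
//     for _ in 0..2 {
//         let (number, value): (u64, u32) = rmp_serde::from_read(&mut reader)?;
//         // `reader` now points at the start of the next tuple.
//     }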
struct MigrateStaticFiles<'a, N: HlNodeType>(&'a Migrator<N>);

impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
    fn iterate_files_for_segment(
        &self,
        block_range: SegmentRangeInclusive,
        dir: &PathBuf,
    ) -> eyre::Result<Vec<(PathBuf, String)>> {
        let prefix = StaticFileSegment::Headers.filename(&block_range);
        let entries = std::fs::read_dir(dir)?
            .map(|res| res.map(|e| e.path()))
            .collect::<Result<Vec<_>, _>>()?;
        Ok(entries
            .into_iter()
            .filter_map(|path| {
                let file_name = path.file_name().and_then(|f| f.to_str())?;
                if file_name.starts_with(&prefix) {
                    Some((path.clone(), file_name.to_string()))
                } else {
                    None
                }
            })
            .collect())
    }

    fn create_placeholder(&self, block_range: SegmentRangeInclusive) -> eyre::Result<()> {
        // Note that the direction is the opposite of `move_static_files_for_segment`:
        // symlinks are created in the tmp dir, pointing at the real static files.
        let src = self.0.data_dir.static_files();
        let dst = self.0.conversion_tmp_dir();
        for (src_path, file_name) in self.iterate_files_for_segment(block_range, &src)? {
            let dst_path = dst.join(file_name);
            if dst_path.exists() {
                std::fs::remove_file(&dst_path)?;
            }
            std::os::unix::fs::symlink(src_path, dst_path)?;
        }
        Ok(())
    }

    fn move_static_files_for_segment(
        &self,
        block_range: SegmentRangeInclusive,
    ) -> eyre::Result<()> {
        let src = self.0.conversion_tmp_dir();
        let dst = self.0.data_dir.static_files();
        for (src_path, file_name) in self.iterate_files_for_segment(block_range, &src)? {
            let dst_path = dst.join(file_name);
            std::fs::remove_file(&dst_path)?;
            std::fs::rename(&src_path, &dst_path)?;
        }
        // The tmp `StaticFileProvider` still needs the files to exist, so we
        // leave symlinks behind.
        self.create_placeholder(block_range)
    }

    fn migrate_static_files(&self) -> eyre::Result<bool> {
        let conversion_tmp = self.0.conversion_tmp_dir();
        let old_path = self.0.data_dir.static_files();
        if conversion_tmp.exists() {
            std::fs::remove_dir_all(&conversion_tmp)?;
        }
        std::fs::create_dir_all(&conversion_tmp)?;
        let mut all_static_files = iter_static_files(&old_path)?;
        let all_static_files =
            all_static_files.remove(&StaticFileSegment::Headers).unwrap_or_default();
        let mut first = true;
        for (block_range, _tx_ranges) in all_static_files {
            let migration_needed = self.using_old_header(block_range.start())?
                || self.using_old_header(block_range.end())?;
            if !migration_needed {
                // Create a placeholder symlink
                self.create_placeholder(block_range)?;
                continue;
            }
            if first {
                check_if_migration_enabled()?;
                info!("Old database detected, migrating static files...");
                first = false;
            }
            let sf_provider = self.0.sf_provider();
            let sf_tmp_provider = StaticFileProvider::<HlPrimitives>::read_write(&conversion_tmp)?;
            let provider = self.0.provider_factory.provider()?;
            let block_range_for_filename = sf_provider.find_fixed_range(block_range.start());
            migrate_single_static_file(&sf_tmp_provider, &sf_provider, &provider, block_range)?;
            self.move_static_files_for_segment(block_range_for_filename)?;
        }
        Ok(!first)
    }

    fn using_old_header(&self, number: u64) -> eyre::Result<bool> {
        let sf_provider = self.0.sf_provider();
        let content = old_headers_range(&sf_provider, number..=number)?;
        let [row] = content.as_slice() else {
            warn!("No header found for block {}", number);
            return Ok(false);
        };
        Ok(using_old_header(number, &row[0]))
    }
}
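// How one headers file is swapped out end to end (summary of the flow above;
// file names are illustrative):
//
//   1. `migrate_single_static_file` rewrites the range into a fresh file under
//      `migration-tmp/` through the read-write tmp provider;
//   2. `move_static_files_for_segment` renames the rewritten files over the
//      originals under `static_files/`;
//   3. `create_placeholder` leaves symlinks in `migration-tmp/` pointing back
//      at the real files, so the tmp provider keeps seeing every range it has
//      already handled.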
// `Decompress` just panics when the header bytes are not in the old format, so
// we need heuristics to tell the two formats apart instead.
fn is_old_header(header: &[u8]) -> bool {
    const SHA3_UNCLE_OFFSET: usize = 0x24;
    const SHA3_UNCLE_HASH: B256 =
        b256!("1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347");
    const GENESIS_PREFIX: [u8; 4] = [0x01, 0x20, 0x00, 0xf8];
    let Some(sha3_uncle_hash) = header.get(SHA3_UNCLE_OFFSET..SHA3_UNCLE_OFFSET + 32) else {
        return false;
    };
    if sha3_uncle_hash == SHA3_UNCLE_HASH {
        return true;
    }
    // The genesis block might be different
    if header.starts_with(&GENESIS_PREFIX) {
        return true;
    }
    false
}

fn is_new_header(header: &[u8]) -> bool {
    rmp_serde::from_slice::<HlHeader>(header).is_ok()
}

fn migrate_single_static_file<N: HlNodeType>(
    sf_out: &StaticFileProvider<HlPrimitives>,
    sf_in: &StaticFileProvider<HlPrimitives>,
    provider: &DatabaseProvider<Tx<RO>, NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
    block_range: SegmentRangeInclusive,
) -> Result<(), eyre::Error> {
    info!("Migrating block range {}...", block_range);
    // Split the block range into chunks of 50000 blocks.
    const CHUNK_SIZE: u64 = 50000;
    for chunk in (block_range.start()..=block_range.end()).step_by(CHUNK_SIZE as usize) {
        let end = std::cmp::min(chunk + CHUNK_SIZE - 1, block_range.end());
        let block_range = chunk..=end;
        let headers = old_headers_range(sf_in, block_range.clone())?;
        let receipts = provider.receipts_by_block_range(block_range.clone())?;
        assert_eq!(headers.len(), receipts.len());
        let mut writer = sf_out.get_writer(*block_range.start(), StaticFileSegment::Headers)?;
        let new_headers = std::iter::zip(headers, receipts)
            .map(|(header, receipts)| {
                let eth_header = Header::decompress(&header[0]).unwrap();
                let hl_header = to_hl_header(receipts, eth_header);
                let difficulty: U256 = CompactU256::decompress(&header[1]).unwrap().into();
                let hash = BlockHash::decompress(&header[2]).unwrap();
                (hl_header, difficulty, hash)
            })
            .collect::<Vec<_>>();
        for header in new_headers {
            writer.append_header(&header.0, header.1, &header.2)?;
        }
        writer.commit().unwrap();
        info!("Migrated block range {:?}...", block_range);
    }
    Ok(())
}

fn to_hl_header(receipts: Vec<EthereumReceipt>, eth_header: Header) -> HlHeader {
    // System transactions are the receipts with a cumulative gas used of zero.
    let system_tx_count = receipts.iter().filter(|r| r.cumulative_gas_used == 0).count();
    HlHeader::from_ethereum_header(eth_header, &receipts, system_tx_count as u64)
}

fn old_headers_range(
    provider: &StaticFileProvider<HlPrimitives>,
    block_range: impl std::ops::RangeBounds<u64>,
) -> ProviderResult<Vec<Vec<Vec<u8>>>> {
    Ok(provider
        .fetch_range_with_predicate(
            StaticFileSegment::Headers,
            to_range(block_range),
            |cursor, number| {
                // Mask 0b111 selects all three header columns: the header itself,
                // the total difficulty, and the block hash.
                cursor.get(number.into(), 0b111).map(|rows| {
                    rows.map(|columns| columns.into_iter().map(|column| column.to_vec()).collect())
                })
            },
            |_| true,
        )?
        .into_iter()
        .collect())
}

// Copied from reth
fn to_range<R: std::ops::RangeBounds<u64>>(bounds: R) -> std::ops::Range<u64> {
    let start = match bounds.start_bound() {
        std::ops::Bound::Included(&v) => v,
        std::ops::Bound::Excluded(&v) => v + 1,
        std::ops::Bound::Unbounded => 0,
    };
    let end = match bounds.end_bound() {
        std::ops::Bound::Included(&v) => v + 1,
        std::ops::Bound::Excluded(&v) => v,
        std::ops::Bound::Unbounded => u64::MAX,
    };
    start..end
}

fn using_old_header(number: u64, header: &[u8]) -> bool {
    let deserialized_old = is_old_header(header);
    let deserialized_new = is_new_header(header);
    assert!(
        deserialized_old ^ deserialized_new,
        "Header is not valid: {} {}\ndeserialized_old: {}\ndeserialized_new: {}",
        number,
        header.encode_hex(),
        deserialized_old,
        deserialized_new
    );
    deserialized_old && !deserialized_new
}
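// A couple of sanity checks for the helpers above. This test module is an added
// sketch, not part of the original file; `HlHeader`'s MessagePack shape is only
// exercised indirectly, via `is_new_header` on obviously invalid input.
#[cfg(test)]
mod tests {
    use super::*;

    // `to_range` normalizes any `RangeBounds<u64>` into a half-open `Range<u64>`.
    #[test]
    fn to_range_normalizes_bounds() {
        assert_eq!(to_range(3u64..=5), 3..6);
        assert_eq!(to_range(..5u64), 0..5);
        assert_eq!(to_range(2u64..), 2..u64::MAX);
    }

    // Inputs shorter than `SHA3_UNCLE_OFFSET + 32` bytes can never be classified
    // as old headers, and empty input is not a valid MessagePack `HlHeader`.
    #[test]
    fn short_input_is_neither_format() {
        assert!(!is_old_header(&[]));
        assert!(!is_new_header(&[]));
    }
}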