From 13b63ff136fee19191cd9c199fe81e9c31f536bb Mon Sep 17 00:00:00 2001
From: sprites0 <199826320+sprites0@users.noreply.github.com>
Date: Thu, 9 Oct 2025 06:35:56 +0000
Subject: [PATCH] feat: add migrator for mdbx as well

---
 src/node/migrate.rs | 209 +++++++++++++++++++++++++++++++-------------
 1 file changed, 148 insertions(+), 61 deletions(-)

diff --git a/src/node/migrate.rs b/src/node/migrate.rs
index 7d3634a68..650e098a6 100644
--- a/src/node/migrate.rs
+++ b/src/node/migrate.rs
@@ -1,7 +1,7 @@
 use alloy_consensus::Header;
-use alloy_primitives::{b256, hex::ToHexExt, BlockHash, B256, U256};
+use alloy_primitives::{b256, hex::ToHexExt, BlockHash, Bytes, B256, U256};
 use reth::{
-    api::{NodeTypes, NodeTypesWithDBAdapter},
+    api::NodeTypesWithDBAdapter,
     args::{DatabaseArgs, DatadirArgs},
     dirs::{ChainPath, DataDirPath},
 };
@@ -11,30 +11,37 @@ use reth_db::{
     models::CompactU256,
     static_file::iter_static_files,
     table::Decompress,
-    DatabaseEnv,
+    tables, DatabaseEnv,
+};
+use reth_db_api::{
+    cursor::{DbCursorRO, DbCursorRW},
+    transaction::{DbTx, DbTxMut},
 };
 use reth_errors::ProviderResult;
+use reth_ethereum_primitives::EthereumReceipt;
 use reth_provider::{
     providers::{NodeTypesForProvider, StaticFileProvider},
     static_file::SegmentRangeInclusive,
     DatabaseProvider, ProviderFactory, ReceiptProvider, StaticFileProviderFactory,
     StaticFileSegment, StaticFileWriter,
 };
-use std::{marker::PhantomData, path::PathBuf, sync::Arc};
+use std::{fs::File, io::Write, path::PathBuf, sync::Arc};
 use tracing::{info, warn};
 
 use crate::{chainspec::HlChainSpec, HlHeader, HlPrimitives};
 
-pub(super) struct Migrator<N> {
+pub(crate) trait HlNodeType:
+    NodeTypesForProvider<ChainSpec = HlChainSpec, Primitives = HlPrimitives>
+{
+}
+impl<N: NodeTypesForProvider<ChainSpec = HlChainSpec, Primitives = HlPrimitives>> HlNodeType for N {}
+
+pub(super) struct Migrator<N: HlNodeType> {
     data_dir: ChainPath<DataDirPath>,
     provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
-    _nt: PhantomData<N>,
 }
 
-impl<N> Migrator<N>
-where
-    N: NodeTypes<ChainSpec = HlChainSpec, Primitives = HlPrimitives>,
-{
+impl<N: HlNodeType> Migrator<N> {
     const MIGRATION_PATH_SUFFIX: &'static str = "migration-tmp";
 
     pub fn new(
@@ -44,7 +51,7 @@ where
     ) -> eyre::Result<Self> {
         let data_dir = datadir.clone().resolve_datadir(chain_spec.chain());
         let provider_factory = Self::provider_factory(chain_spec, datadir, database_args)?;
-        Ok(Self { data_dir, provider_factory, _nt: PhantomData })
+        Ok(Self { data_dir, provider_factory })
     }
 
     pub fn sf_provider(&self) -> StaticFileProvider<HlPrimitives> {
@@ -66,9 +73,12 @@ where
     }
 
     fn migrate_db_inner(&self) -> eyre::Result<()> {
-        self.migrate_static_files()?;
-        self.migrate_mdbx()?;
-        info!("Database migrated successfully");
+        let migrated_mdbx = MigratorMdbx::<N>(self).migrate_mdbx()?;
+        let migrated_static_files = MigrateStaticFiles::<N>(self).migrate_static_files()?;
+
+        if migrated_mdbx || migrated_static_files {
+            info!("Database migrated successfully");
+        }
         Ok(())
     }
 
@@ -76,6 +86,95 @@ where
         self.data_dir.data_dir().join(Self::MIGRATION_PATH_SUFFIX)
     }
 
+    fn provider_factory(
+        chain_spec: HlChainSpec,
+        datadir: DatadirArgs,
+        database_args: DatabaseArgs,
+    ) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>> {
+        let data_dir = datadir.clone().resolve_datadir(chain_spec.chain());
+        let db_env = reth_db::init_db(data_dir.db(), database_args.database_args())?;
+        let static_file_provider = StaticFileProvider::read_only(data_dir.static_files(), false)?;
+        let db = Arc::new(db_env);
+        Ok(ProviderFactory::new(db, Arc::new(chain_spec), static_file_provider))
+    }
+}
+
+struct MigratorMdbx<'a, N: HlNodeType>(&'a Migrator<N>);
+
+impl<'a, N: HlNodeType> MigratorMdbx<'a, N> {
+    fn migrate_mdbx(&self) -> eyre::Result<bool> {
+        // If any header is in the old format we must migrate, so probe the first and last entries.
+        let db_env = self.0.provider_factory.provider()?;
+        let mut cursor = db_env.tx_ref().cursor_read::<tables::Headers<Bytes>>()?;
+
+        let migration_needed = {
+            let first_is_old = match cursor.first()? {
+                Some((number, header)) => using_old_header(number, &header),
+                None => false,
+            };
+            let last_is_old = match cursor.last()? {
+                Some((number, header)) => using_old_header(number, &header),
+                None => false,
+            };
+            first_is_old || last_is_old
+        };
+
+        if !migration_needed {
+            return Ok(false);
+        }
+
+        self.migrate_mdbx_inner()?;
+        Ok(true)
+    }
+
+    fn migrate_mdbx_inner(&self) -> eyre::Result<()> {
+        // There shouldn't be many headers in mdbx, but stage them through a file for safety.
+        info!("Old database detected, migrating mdbx...");
+        let tmp_path = self.0.conversion_tmp_dir().join("headers.rmp");
+        let count = self.export_old_headers(&tmp_path)?;
+        self.import_new_headers(tmp_path, count)?;
+        Ok(())
+    }
+
+    fn export_old_headers(&self, tmp_path: &PathBuf) -> Result<i32, eyre::Error> {
+        let db_env = self.0.provider_factory.provider()?;
+        let mut cursor_read = db_env.tx_ref().cursor_read::<tables::Headers<Bytes>>()?;
+        let mut tmp_writer = File::create(tmp_path)?;
+        let mut count = 0;
+        let old_headers = cursor_read.walk(None)?.filter_map(|row| {
+            let (block_number, header) = row.ok()?;
+            if !using_old_header(block_number, &header) {
+                None
+            } else {
+                Some((block_number, Header::decompress(&header).ok()?))
+            }
+        });
+        for (block_number, header) in old_headers {
+            let receipts =
+                db_env.receipts_by_block(block_number.into())?.expect("Receipt not found");
+            let new_header = to_hl_header(receipts, header);
+            tmp_writer.write_all(&rmp_serde::to_vec(&(block_number, new_header))?)?;
+            count += 1;
+        }
+        Ok(count)
+    }
+
+    fn import_new_headers(&self, tmp_path: PathBuf, count: i32) -> Result<(), eyre::Error> {
+        let mut tmp_reader = File::open(tmp_path)?;
+        let db_env = self.0.provider_factory.provider_rw()?;
+        let mut cursor_write = db_env.tx_ref().cursor_write::<tables::Headers<Bytes>>()?;
+        for _ in 0..count {
+            let (number, header) = rmp_serde::from_read::<_, (u64, HlHeader)>(&mut tmp_reader)?;
+            cursor_write.upsert(number, &rmp_serde::to_vec(&header)?.into())?;
+        }
+        db_env.commit()?;
+        Ok(())
+    }
+}
+
+struct MigrateStaticFiles<'a, N: HlNodeType>(&'a Migrator<N>);
+
+impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
     fn iterate_files_for_segment(
         &self,
         block_range: SegmentRangeInclusive,
@@ -102,8 +201,8 @@ where
 
     fn create_placeholder(&self, block_range: SegmentRangeInclusive) -> eyre::Result<()> {
         // The direction is opposite here
-        let src = self.data_dir.static_files();
-        let dst = self.conversion_tmp_dir();
+        let src = self.0.data_dir.static_files();
+        let dst = self.0.conversion_tmp_dir();
 
         for (src_path, file_name) in self.iterate_files_for_segment(block_range, &src)? {
             let dst_path = dst.join(file_name);
@@ -120,8 +219,8 @@ where
         &self,
         block_range: SegmentRangeInclusive,
     ) -> eyre::Result<()> {
-        let src = self.conversion_tmp_dir();
-        let dst = self.data_dir.static_files();
+        let src = self.0.conversion_tmp_dir();
+        let dst = self.0.data_dir.static_files();
 
         for (src_path, file_name) in self.iterate_files_for_segment(block_range, &src)? {
             let dst_path = dst.join(file_name);
@@ -133,9 +232,9 @@ where
         self.create_placeholder(block_range)
     }
 
-    fn migrate_static_files(&self) -> eyre::Result<()> {
-        let conversion_tmp = self.conversion_tmp_dir();
-        let old_path = self.data_dir.static_files();
+    fn migrate_static_files(&self) -> eyre::Result<bool> {
+        let conversion_tmp = self.0.conversion_tmp_dir();
+        let old_path = self.0.data_dir.static_files();
 
         if conversion_tmp.exists() {
             std::fs::remove_dir_all(&conversion_tmp)?;
@@ -145,7 +244,7 @@ where
         let mut all_static_files = iter_static_files(&old_path)?;
         let all_static_files =
             all_static_files.remove(&StaticFileSegment::Headers).unwrap_or_default();
-        let provider = self.provider_factory.provider()?;
+        let provider = self.0.provider_factory.provider()?;
 
         let mut first = true;
 
@@ -159,11 +258,11 @@ where
         }
 
         if first {
-            info!("Old database detected, migrating database...");
+            info!("Old database detected, migrating static files...");
             first = false;
         }
 
-        let sf_provider = self.sf_provider();
+        let sf_provider = self.0.sf_provider();
         let sf_tmp_provider = StaticFileProvider::<HlPrimitives>::read_write(&conversion_tmp)?;
         let block_range_for_filename = sf_provider.find_fixed_range(block_range.start());
         migrate_single_static_file(&sf_tmp_provider, &sf_provider, &provider, block_range)?;
@@ -171,48 +270,19 @@ where
             self.move_static_files_for_segment(block_range_for_filename)?;
         }
 
-        Ok(())
-    }
-
-    fn provider_factory(
-        chain_spec: HlChainSpec,
-        datadir: DatadirArgs,
-        database_args: DatabaseArgs,
-    ) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>> {
-        let data_dir = datadir.clone().resolve_datadir(chain_spec.chain());
-        let db_env = reth_db::init_db(data_dir.db(), database_args.database_args())?;
-        let static_file_provider = StaticFileProvider::read_only(data_dir.static_files(), false)?;
-        let db = Arc::new(db_env);
-        Ok(ProviderFactory::new(db, Arc::new(chain_spec), static_file_provider))
-    }
-
-    fn migrate_mdbx(&self) -> eyre::Result<()> {
-        // Actually not much here, all of blocks should be in the static files
-        Ok(())
+        Ok(!first)
     }
 
     fn using_old_header(&self, number: u64) -> eyre::Result<bool> {
-        let sf_provider = self.sf_provider();
+        let sf_provider = self.0.sf_provider();
        let content = old_headers_range(&sf_provider, number..=number)?;
 
         let &[row] = &content.as_slice() else {
             warn!("No header found for block {}", number);
             return Ok(false);
         };
 
-        let header = &row[0];
-        let deserialized_old = is_old_header(header);
-        let deserialized_new = is_new_header(header);
-
-        assert!(
-            deserialized_old ^ deserialized_new,
-            "Header is not valid: {} {}\ndeserialized_old: {}\ndeserialized_new: {}",
-            number,
-            header.encode_hex(),
-            deserialized_old,
-            deserialized_new
-        );
-        Ok(deserialized_old && !deserialized_new)
+        Ok(using_old_header(number, &row[0]))
     }
 }
 
@@ -242,7 +312,7 @@ fn is_new_header(header: &[u8]) -> bool {
     rmp_serde::from_slice::<HlHeader>(header).is_ok()
 }
 
-fn migrate_single_static_file<N: NodeTypes<ChainSpec = HlChainSpec, Primitives = HlPrimitives>>(
+fn migrate_single_static_file<N: HlNodeType>(
     sf_out: &StaticFileProvider<HlPrimitives>,
     sf_in: &StaticFileProvider<HlPrimitives>,
     provider: &DatabaseProvider<Tx<RO>, NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
@@ -261,11 +331,8 @@ fn migrate_single_static_file<N: NodeTypes<ChainSpec = HlChainSpec, Primitives = HlPrimitives>>(
 
     let mut writer = sf_out.get_writer(*block_range.start(), StaticFileSegment::Headers)?;
     let new_headers = std::iter::zip(headers, receipts)
         .map(|(header, receipts)| {
-            let system_tx_count =
-                receipts.iter().filter(|r| r.cumulative_gas_used == 0).count();
             let eth_header = Header::decompress(&header[0]).unwrap();
-            let hl_header =
-                HlHeader::from_ethereum_header(eth_header, &receipts, system_tx_count as u64);
+            let hl_header = to_hl_header(receipts, eth_header);
             let difficulty: U256 = CompactU256::decompress(&header[1]).unwrap().into();
             let hash = BlockHash::decompress(&header[2]).unwrap();
@@ -281,6 +348,11 @@ fn migrate_single_static_file<N: NodeTypes<ChainSpec = HlChainSpec, Primitives = HlPrimitives>>(
     Ok(())
 }
 
+fn to_hl_header(receipts: Vec<EthereumReceipt>, eth_header: Header) -> HlHeader {
+    let system_tx_count = receipts.iter().filter(|r| r.cumulative_gas_used == 0).count();
+    HlHeader::from_ethereum_header(eth_header, &receipts, system_tx_count as u64)
+}
+
 fn old_headers_range(
     provider: &StaticFileProvider<HlPrimitives>,
     block_range: impl std::ops::RangeBounds<u64>,
@@ -316,3 +388,18 @@ fn to_range<R: std::ops::RangeBounds<u64>>(bounds: R) -> std::ops::Range<u64> {
     start..end
 }
+
+fn using_old_header(number: u64, header: &[u8]) -> bool {
+    let deserialized_old = is_old_header(header);
+    let deserialized_new = is_new_header(header);
+
+    assert!(
+        deserialized_old ^ deserialized_new,
+        "Header is not valid: {} {}\ndeserialized_old: {}\ndeserialized_new: {}",
+        number,
+        header.encode_hex(),
+        deserialized_old,
+        deserialized_new
+    );
+    deserialized_old && !deserialized_new
+}
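
Note (not part of the patch): export_old_headers/import_new_headers work because MessagePack values are self-delimiting, so the headers.rmp staging file is simply a concatenation of rmp_serde::to_vec outputs that import_new_headers replays with repeated rmp_serde::from_read calls. A minimal, self-contained sketch of that round trip, with illustrative (u64, String) rows standing in for the (u64, HlHeader) tuples above:

    // Round trip of several self-delimiting MessagePack values through one
    // buffer, mirroring the headers.rmp staging file used by the mdbx migrator.
    use std::io::{Cursor, Write};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let rows: Vec<(u64, String)> = vec![(1, "old".into()), (2, "new".into())];

        // Export: append each row as one MessagePack value, keeping a count.
        let mut staged = Vec::new();
        for row in &rows {
            staged.write_all(&rmp_serde::to_vec(row)?)?;
        }

        // Import: replay exactly `rows.len()` values from the same reader.
        // Each from_read call consumes one value and leaves the reader at the
        // start of the next, which is why the patch tracks an explicit count
        // rather than relying on EOF detection.
        let mut reader = Cursor::new(staged);
        for expected in &rows {
            let row: (u64, String) = rmp_serde::from_read(&mut reader)?;
            assert_eq!(&row, expected);
        }
        Ok(())
    }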