5 Commits

Author SHA1 Message Date
f83326059f chore: clippy 2025-10-09 08:55:40 +00:00
ca8c374116 feat: Mark migrator as experimental 2025-10-09 08:49:29 +00:00
5ba12a4850 perf: adjust chunk size, do not hold tx too long 2025-10-09 08:20:22 +00:00
8a179a6d9e perf: Use smaller chunks 2025-10-09 08:13:53 +00:00
d570cf3e8d fix: Create directory before migration 2025-10-09 08:13:45 +00:00

View File

@ -1,5 +1,5 @@
use alloy_consensus::Header; use alloy_consensus::Header;
use alloy_primitives::{b256, hex::ToHexExt, BlockHash, Bytes, B256, U256}; use alloy_primitives::{B256, BlockHash, Bytes, U256, b256, hex::ToHexExt};
use reth::{ use reth::{
api::NodeTypesWithDBAdapter, api::NodeTypesWithDBAdapter,
args::{DatabaseArgs, DatadirArgs}, args::{DatabaseArgs, DatadirArgs},
@ -7,11 +7,12 @@ use reth::{
}; };
use reth_chainspec::EthChainSpec; use reth_chainspec::EthChainSpec;
use reth_db::{ use reth_db::{
mdbx::{tx::Tx, RO}, DatabaseEnv,
mdbx::{RO, tx::Tx},
models::CompactU256, models::CompactU256,
static_file::iter_static_files, static_file::iter_static_files,
table::Decompress, table::Decompress,
tables, DatabaseEnv, tables,
}; };
use reth_db_api::{ use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW}, cursor::{DbCursorRO, DbCursorRW},
@ -20,15 +21,15 @@ use reth_db_api::{
use reth_errors::ProviderResult; use reth_errors::ProviderResult;
use reth_ethereum_primitives::EthereumReceipt; use reth_ethereum_primitives::EthereumReceipt;
use reth_provider::{ use reth_provider::{
providers::{NodeTypesForProvider, StaticFileProvider},
static_file::SegmentRangeInclusive,
DatabaseProvider, ProviderFactory, ReceiptProvider, StaticFileProviderFactory, DatabaseProvider, ProviderFactory, ReceiptProvider, StaticFileProviderFactory,
StaticFileSegment, StaticFileWriter, StaticFileSegment, StaticFileWriter,
providers::{NodeTypesForProvider, StaticFileProvider},
static_file::SegmentRangeInclusive,
}; };
use std::{fs::File, io::Write, path::PathBuf, sync::Arc}; use std::{fs::File, io::Write, path::PathBuf, sync::Arc};
use tracing::{info, warn}; use tracing::{info, warn};
use crate::{chainspec::HlChainSpec, HlHeader, HlPrimitives}; use crate::{HlHeader, HlPrimitives, chainspec::HlChainSpec};
pub(crate) trait HlNodeType: pub(crate) trait HlNodeType:
NodeTypesForProvider<ChainSpec = HlChainSpec, Primitives = HlPrimitives> NodeTypesForProvider<ChainSpec = HlChainSpec, Primitives = HlPrimitives>
@ -123,6 +124,8 @@ impl<'a, N: HlNodeType> MigratorMdbx<'a, N> {
return Ok(false); return Ok(false);
} }
check_if_migration_enabled()?;
self.migrate_mdbx_inner()?; self.migrate_mdbx_inner()?;
Ok(true) Ok(true)
} }
@ -130,7 +133,14 @@ impl<'a, N: HlNodeType> MigratorMdbx<'a, N> {
fn migrate_mdbx_inner(&self) -> eyre::Result<()> { fn migrate_mdbx_inner(&self) -> eyre::Result<()> {
// There shouldn't be many headers in mdbx, but using file for safety // There shouldn't be many headers in mdbx, but using file for safety
info!("Old database detected, migrating mdbx..."); info!("Old database detected, migrating mdbx...");
let tmp_path = self.0.conversion_tmp_dir().join("headers.rmp"); let conversion_tmp = self.0.conversion_tmp_dir();
let tmp_path = conversion_tmp.join("headers.rmp");
if conversion_tmp.exists() {
std::fs::remove_dir_all(&conversion_tmp)?;
}
std::fs::create_dir_all(&conversion_tmp)?;
let count = self.export_old_headers(&tmp_path)?; let count = self.export_old_headers(&tmp_path)?;
self.import_new_headers(tmp_path, count)?; self.import_new_headers(tmp_path, count)?;
Ok(()) Ok(())
@ -172,6 +182,18 @@ impl<'a, N: HlNodeType> MigratorMdbx<'a, N> {
} }
} }
fn check_if_migration_enabled() -> Result<(), eyre::Error> {
if std::env::var("EXPERIMENTAL_MIGRATE_DB").is_err() {
let err_msg = concat!(
"Detected an old database format but experimental database migration is currently disabled. ",
"To enable migration, set EXPERIMENTAL_MIGRATE_DB=1, or alternatively, resync your node (safest option)."
);
warn!("{}", err_msg);
return Err(eyre::eyre!("{}", err_msg));
}
Ok(())
}
struct MigrateStaticFiles<'a, N: HlNodeType>(&'a Migrator<N>); struct MigrateStaticFiles<'a, N: HlNodeType>(&'a Migrator<N>);
impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> { impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
@ -244,13 +266,12 @@ impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
let mut all_static_files = iter_static_files(&old_path)?; let mut all_static_files = iter_static_files(&old_path)?;
let all_static_files = let all_static_files =
all_static_files.remove(&StaticFileSegment::Headers).unwrap_or_default(); all_static_files.remove(&StaticFileSegment::Headers).unwrap_or_default();
let provider = self.0.provider_factory.provider()?;
let mut first = true; let mut first = true;
for (block_range, _tx_ranges) in all_static_files { for (block_range, _tx_ranges) in all_static_files {
let migration_needed = self.using_old_header(block_range.start())? || let migration_needed = self.using_old_header(block_range.start())?
self.using_old_header(block_range.end())?; || self.using_old_header(block_range.end())?;
if !migration_needed { if !migration_needed {
// Create a placeholder symlink // Create a placeholder symlink
self.create_placeholder(block_range)?; self.create_placeholder(block_range)?;
@ -258,12 +279,15 @@ impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
} }
if first { if first {
check_if_migration_enabled()?;
info!("Old database detected, migrating static files..."); info!("Old database detected, migrating static files...");
first = false; first = false;
} }
let sf_provider = self.0.sf_provider(); let sf_provider = self.0.sf_provider();
let sf_tmp_provider = StaticFileProvider::<HlPrimitives>::read_write(&conversion_tmp)?; let sf_tmp_provider = StaticFileProvider::<HlPrimitives>::read_write(&conversion_tmp)?;
let provider = self.0.provider_factory.provider()?;
let block_range_for_filename = sf_provider.find_fixed_range(block_range.start()); let block_range_for_filename = sf_provider.find_fixed_range(block_range.start());
migrate_single_static_file(&sf_tmp_provider, &sf_provider, &provider, block_range)?; migrate_single_static_file(&sf_tmp_provider, &sf_provider, &provider, block_range)?;
@ -320,8 +344,8 @@ fn migrate_single_static_file<N: HlNodeType>(
) -> Result<(), eyre::Error> { ) -> Result<(), eyre::Error> {
info!("Migrating block range {}...", block_range); info!("Migrating block range {}...", block_range);
// block_ranges into chunks of 100000 blocks // block_ranges into chunks of 50000 blocks
const CHUNK_SIZE: u64 = 100000; const CHUNK_SIZE: u64 = 50000;
for chunk in (0..=block_range.end()).step_by(CHUNK_SIZE as usize) { for chunk in (0..=block_range.end()).step_by(CHUNK_SIZE as usize) {
let end = std::cmp::min(chunk + CHUNK_SIZE - 1, block_range.end()); let end = std::cmp::min(chunk + CHUNK_SIZE - 1, block_range.end());
let block_range = chunk..=end; let block_range = chunk..=end;