diff --git a/src/node/cli.rs b/src/node/cli.rs
index fec922ef1..71072a222 100644
--- a/src/node/cli.rs
+++ b/src/node/cli.rs
@@ -1,12 +1,15 @@
 use crate::{
     chainspec::{HlChainSpec, parser::HlChainSpecParser},
-    node::{HlNode, consensus::HlConsensus, evm::config::HlEvmConfig, storage::tables::Tables},
+    node::{
+        HlNode, consensus::HlConsensus, evm::config::HlEvmConfig, migrate::Migrator,
+        storage::tables::Tables,
+    },
     pseudo_peer::BlockSourceArgs,
 };
 use clap::{Args, Parser};
 use reth::{
     CliRunner,
-    args::LogArgs,
+    args::{DatabaseArgs, DatadirArgs, LogArgs},
     builder::{NodeBuilder, WithLaunchContext},
     cli::Commands,
     prometheus_exporter::install_prometheus_recorder,
@@ -142,6 +145,8 @@ where
 
         match self.command {
             Commands::Node(command) => runner.run_command_until_exit(|ctx| {
+                Self::migrate_db(&command.chain, &command.datadir, &command.db)
+                    .expect("Failed to migrate database");
                 command.execute(ctx, FnLauncher::new::<C, Ext>(launcher))
             }),
             Commands::Init(command) => {
@@ -188,4 +193,13 @@ where
         init_db_for::<_, Tables>(db_path, env.db.database_args())?;
         Ok(())
     }
+
+    fn migrate_db(
+        chain: &HlChainSpec,
+        datadir: &DatadirArgs,
+        db: &DatabaseArgs,
+    ) -> eyre::Result<()> {
+        Migrator::<HlNode>::new(chain.clone(), datadir.clone(), *db)?.migrate_db()?;
+        Ok(())
+    }
 }
diff --git a/src/node/migrate.rs b/src/node/migrate.rs
new file mode 100644
index 000000000..ca96fbf80
--- /dev/null
+++ b/src/node/migrate.rs
@@ -0,0 +1,317 @@
+use alloy_consensus::Header;
+use alloy_primitives::{b256, hex::ToHexExt, BlockHash, B256, U256};
+use reth::{
+    api::{NodeTypes, NodeTypesWithDBAdapter},
+    args::{DatabaseArgs, DatadirArgs},
+    dirs::{ChainPath, DataDirPath},
+};
+use reth_chainspec::EthChainSpec;
+use reth_db::{
+    mdbx::{tx::Tx, RO},
+    models::CompactU256,
+    static_file::iter_static_files,
+    table::Decompress,
+    DatabaseEnv,
+};
+use reth_errors::ProviderResult;
+use reth_provider::{
+    providers::{NodeTypesForProvider, StaticFileProvider},
+    static_file::SegmentRangeInclusive,
+    DatabaseProvider, ProviderFactory, ReceiptProvider, StaticFileProviderFactory,
+    StaticFileSegment, StaticFileWriter,
+};
+use std::{marker::PhantomData, ops::RangeInclusive, path::PathBuf, sync::Arc};
+use tracing::{info, warn};
+
+use crate::{chainspec::HlChainSpec, HlHeader, HlPrimitives};
+
+pub(super) struct Migrator<N: NodeTypes> {
+    data_dir: ChainPath<DataDirPath>,
+    provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
+    _nt: PhantomData<N>,
+}
+
+impl<N> Migrator<N>
+where
+    N: NodeTypes<ChainSpec = HlChainSpec, Primitives = HlPrimitives>,
+{
+    const MIGRATION_PATH_SUFFIX: &'static str = "migration-tmp";
+
+    pub fn new(
+        chain_spec: HlChainSpec,
+        datadir: DatadirArgs,
+        database_args: DatabaseArgs,
+    ) -> eyre::Result<Self> {
+        let data_dir = datadir.clone().resolve_datadir(chain_spec.chain());
+        let provider_factory = Self::provider_factory(chain_spec, datadir, database_args)?;
+        Ok(Self { data_dir, provider_factory, _nt: PhantomData })
+    }
+
+    pub fn sf_provider(&self) -> StaticFileProvider<HlPrimitives> {
+        self.provider_factory.static_file_provider()
+    }
+
+    pub fn migrate_db(&self) -> eyre::Result<()> {
+        let is_empty = Self::highest_block_number(&self.sf_provider()).is_none();
+
+        if is_empty {
+            return Ok(());
+        }
+
+        self.migrate_db_inner()
+    }
+
+    fn highest_block_number(sf_provider: &StaticFileProvider<HlPrimitives>) -> Option<u64> {
+        sf_provider.get_highest_static_file_block(StaticFileSegment::Headers)
+    }
+
+    fn migrate_db_inner(&self) -> eyre::Result<()> {
+        self.migrate_static_files()?;
+        self.migrate_mdbx()?;
+        info!("Database migrated successfully");
+        Ok(())
+    }
+
+    fn conversion_tmp_dir(&self) -> PathBuf {
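+        // Staging directory for rewritten static files, resolved to `<datadir>/migration-tmp`
+        // (typically alongside the node's regular `static_files` directory).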
+        self.data_dir.data_dir().join(Self::MIGRATION_PATH_SUFFIX)
+    }
+
+    fn iterate_files_for_segment(
+        &self,
+        block_range: SegmentRangeInclusive,
+        dir: &PathBuf,
+    ) -> eyre::Result<Vec<(PathBuf, String)>> {
+        let prefix = StaticFileSegment::Headers.filename(&block_range);
+
+        let entries = std::fs::read_dir(dir)?
+            .map(|res| res.map(|e| e.path()))
+            .collect::<Result<Vec<_>, _>>()?;
+
+        Ok(entries
+            .into_iter()
+            .filter_map(|path| {
+                let file_name = path.file_name().and_then(|f| f.to_str())?;
+                if file_name.starts_with(&prefix) {
+                    Some((path.clone(), file_name.to_string()))
+                } else {
+                    None
+                }
+            })
+            .collect())
+    }
+
+    fn create_placeholder(&self, block_range: SegmentRangeInclusive) -> eyre::Result<()> {
+        // Note: src and dst point the opposite way from `move_static_files_for_segment`:
+        // here we symlink the original files from the static_files dir into the staging dir.
+        let src = self.data_dir.static_files();
+        let dst = self.conversion_tmp_dir();
+
+        for (src_path, file_name) in self.iterate_files_for_segment(block_range, &src)? {
+            let dst_path = dst.join(file_name);
+            if dst_path.exists() {
+                std::fs::remove_file(&dst_path)?;
+            }
+            std::os::unix::fs::symlink(src_path, dst_path)?;
+        }
+
+        Ok(())
+    }
+
+    fn move_static_files_for_segment(
+        &self,
+        block_range: SegmentRangeInclusive,
+    ) -> eyre::Result<()> {
+        let src = self.conversion_tmp_dir();
+        let dst = self.data_dir.static_files();
+
+        for (src_path, file_name) in self.iterate_files_for_segment(block_range, &src)? {
+            let dst_path = dst.join(file_name);
+            std::fs::remove_file(&dst_path)?;
+            std::fs::rename(&src_path, &dst_path)?;
+        }
+
+        // `StaticFileProvider` still expects the files to exist at this path, so leave
+        // symlinks behind.
+        self.create_placeholder(block_range)
+    }
+
+    fn migrate_static_files(&self) -> eyre::Result<()> {
+        let conversion_tmp = self.conversion_tmp_dir();
+        let old_path = self.data_dir.static_files();
+
+        if conversion_tmp.exists() {
+            std::fs::remove_dir_all(&conversion_tmp)?;
+        }
+        std::fs::create_dir_all(&conversion_tmp)?;
+
+        let mut all_static_files = iter_static_files(&old_path)?;
+        let all_static_files =
+            all_static_files.remove(&StaticFileSegment::Headers).unwrap_or_default();
+        let provider = self.provider_factory.provider()?;
+
+        let mut first = true;
+
+        for (block_range, _tx_ranges) in all_static_files {
+            let migration_needed = self.using_old_header(block_range.start())?
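+                // A segment is rewritten if either endpoint of its block range still holds a
+                // legacy header.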
+                || self.using_old_header(block_range.end())?;
+            if !migration_needed {
+                // Create a placeholder symlink
+                self.create_placeholder(block_range)?;
+                continue;
+            }
+
+            if first {
+                info!("Old database detected, migrating database...");
+                first = false;
+            }
+
+            let sf_provider = self.sf_provider();
+            let sf_tmp_provider = StaticFileProvider::<HlPrimitives>::read_write(&conversion_tmp)?;
+            let block_range_for_filename = sf_provider.find_fixed_range(block_range.start());
+            migrate_single_static_file(
+                &sf_tmp_provider,
+                &sf_provider,
+                &provider,
+                block_range,
+            )?;
+
+            self.move_static_files_for_segment(block_range_for_filename)?;
+        }
+
+        Ok(())
+    }
+
+    fn provider_factory(
+        chain_spec: HlChainSpec,
+        datadir: DatadirArgs,
+        database_args: DatabaseArgs,
+    ) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>> {
+        let data_dir = datadir.clone().resolve_datadir(chain_spec.chain());
+        let db_env = reth_db::init_db(data_dir.db(), database_args.database_args())?;
+        let static_file_provider = StaticFileProvider::read_only(data_dir.static_files(), false)?;
+        let db = Arc::new(db_env);
+        Ok(ProviderFactory::new(db, Arc::new(chain_spec), static_file_provider))
+    }
+
+    fn migrate_mdbx(&self) -> eyre::Result<()> {
+        // Nothing to migrate here yet; all of the block data is expected to live in the
+        // static files.
+        Ok(())
+    }
+
+    fn using_old_header(&self, number: u64) -> eyre::Result<bool> {
+        let sf_provider = self.sf_provider();
+        let content = old_headers_range(&sf_provider, number..=number)?;
+
+        let &[row] = &content.as_slice() else {
+            warn!("No header found for block {}", number);
+            return Ok(false);
+        };
+        let header = &row[0];
+
+        let deserialized_old = is_old_header(header);
+        let deserialized_new = is_new_header(header);
+
+        assert!(
+            deserialized_old ^ deserialized_new,
+            "Header is not valid: {} {}\ndeserialized_old: {}\ndeserialized_new: {}",
+            number,
+            header.encode_hex(),
+            deserialized_old,
+            deserialized_new
+        );
+        Ok(deserialized_old && !deserialized_new)
+    }
+}
+
+// `Decompress` simply panics when the stored bytes are not in the expected format, so the
+// old/new check relies on heuristics instead of trial decoding.
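+//
+// Assumption behind the probes: a legacy row holds an `alloy_consensus::Header` in its
+// `Decompress` representation, where the well-known empty-ommers keccak hash sits at byte
+// offset 0x24, while a migrated row is an rmp-serde encoded `HlHeader`. Neither probe fully
+// decodes the bytes; they only sniff the layout.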
+fn is_old_header(header: &[u8]) -> bool {
+    const SHA3_UNCLE_OFFSET: usize = 0x24;
+    const SHA3_UNCLE_HASH: B256 =
+        b256!("1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347");
+    const GENESIS_PREFIX: [u8; 4] = [0x01, 0x20, 0x00, 0xf8];
+    let Some(sha3_uncle_hash) = header.get(SHA3_UNCLE_OFFSET..SHA3_UNCLE_OFFSET + 32) else {
+        return false;
+    };
+    if sha3_uncle_hash == SHA3_UNCLE_HASH {
+        return true;
+    }
+
+    // the genesis block might be laid out differently
+    if header.starts_with(&GENESIS_PREFIX) {
+        return true;
+    }
+
+    false
+}
+
+fn is_new_header(header: &[u8]) -> bool {
+    rmp_serde::from_slice::<HlHeader>(header).is_ok()
+}
+
+fn migrate_single_static_file<N: NodeTypesForProvider<Primitives = HlPrimitives>>(
+    sf_out: &StaticFileProvider<HlPrimitives>,
+    sf_in: &StaticFileProvider<HlPrimitives>,
+    provider: &DatabaseProvider<Tx<RO>, NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
+    block_range: SegmentRangeInclusive,
+) -> Result<(), eyre::Error> {
+    let block_range: RangeInclusive<u64> = block_range.into();
+    info!("Migrating block range {:?}...", block_range);
+    info!("Loading headers");
+    let headers = old_headers_range(sf_in, block_range.clone())?;
+    info!("Loading receipts");
+    let receipts = provider.receipts_by_block_range(block_range.clone())?;
+    assert_eq!(headers.len(), receipts.len());
+    let mut writer = sf_out.get_writer(*block_range.start(), StaticFileSegment::Headers)?;
+    let new_headers = std::iter::zip(headers, receipts)
+        .map(|(header, receipts)| {
+            let system_tx_count = receipts.iter().filter(|r| r.cumulative_gas_used == 0).count();
+            let eth_header = Header::decompress(&header[0]).unwrap();
+            let hl_header =
+                HlHeader::from_ethereum_header(eth_header, &receipts, system_tx_count as u64);
+
+            let difficulty: U256 = CompactU256::decompress(&header[1]).unwrap().into();
+            let hash = BlockHash::decompress(&header[2]).unwrap();
+            (hl_header, difficulty, hash)
+        })
+        .collect::<Vec<_>>();
+    for header in new_headers {
+        writer.append_header(&header.0, header.1, &header.2)?;
+    }
+    writer.commit().unwrap();
+    Ok(())
+}
+
+fn old_headers_range(
+    provider: &StaticFileProvider<HlPrimitives>,
+    block_range: impl std::ops::RangeBounds<u64>,
+) -> ProviderResult<Vec<Vec<Vec<u8>>>> {
+    Ok(provider
+        .fetch_range_with_predicate(
+            StaticFileSegment::Headers,
+            to_range(block_range),
+            |cursor, number| {
+                cursor.get(number.into(), 0b111).map(|rows| {
+                    rows.map(|columns| columns.into_iter().map(|column| column.to_vec()).collect())
+                })
+            },
+            |_| true,
+        )?
+        // one row per block; each row carries the three header columns selected by the
+        // 0b111 mask above (header, difficulty, block hash)
+        .into_iter()
+        .collect())
+}
+
+// Copied from reth
+fn to_range<R: std::ops::RangeBounds<u64>>(bounds: R) -> std::ops::Range<u64> {
+    let start = match bounds.start_bound() {
+        std::ops::Bound::Included(&v) => v,
+        std::ops::Bound::Excluded(&v) => v + 1,
+        std::ops::Bound::Unbounded => 0,
+    };
+
+    let end = match bounds.end_bound() {
+        std::ops::Bound::Included(&v) => v + 1,
+        std::ops::Bound::Excluded(&v) => v,
+        std::ops::Bound::Unbounded => u64::MAX,
+    };
+
+    start..end
+}
diff --git a/src/node/mod.rs b/src/node/mod.rs
index 51a1c213e..48761ccb9 100644
--- a/src/node/mod.rs
+++ b/src/node/mod.rs
@@ -33,6 +33,7 @@ pub mod cli;
 pub mod consensus;
 pub mod engine;
 pub mod evm;
+pub mod migrate;
 pub mod network;
 pub mod primitives;
 pub mod rpc;
diff --git a/src/node/primitives/rlp.rs b/src/node/primitives/rlp.rs
index fb9c9975a..43cdaf640 100644
--- a/src/node/primitives/rlp.rs
+++ b/src/node/primitives/rlp.rs
@@ -95,7 +95,7 @@ impl Decodable for HlBlockBody {
         Ok(Self {
             inner: BlockBody {
                 transactions: transactions.into_owned(),
-                ommers: ommers.into_owned().into_iter().map(Into::into).collect(),
+                ommers: ommers.into_owned(),
                 withdrawals: withdrawals.map(|w| w.into_owned()),
             },
             sidecars: sidecars.map(|s| s.into_owned()),
diff --git a/src/node/primitives/transaction.rs b/src/node/primitives/transaction.rs
index 30bf1ffc8..cb4710281 100644
--- a/src/node/primitives/transaction.rs
+++ b/src/node/primitives/transaction.rs
@@ -209,22 +209,6 @@ impl Decompress for TransactionSigned {
     }
 }
 
-pub fn convert_to_eth_block_body(value: BlockBody) -> alloy_consensus::BlockBody {
-    alloy_consensus::BlockBody {
-        transactions: value.transactions.into_iter().map(|tx| tx.into_inner()).collect(),
-        ommers: value.ommers.into_iter().map(|ommer| ommer.into()).collect(),
-        withdrawals: value.withdrawals,
-    }
-}
-
-pub fn convert_to_hl_block_body(value: alloy_consensus::BlockBody) -> BlockBody {
-    BlockBody {
-        transactions: value.transactions.into_iter().map(TransactionSigned::Default).collect(),
-        ommers: value.ommers,
-        withdrawals: value.withdrawals,
-    }
-}
-
 impl TryIntoSimTx for TransactionRequest {
     fn try_into_sim_tx(self) -> Result> {
         let tx = self
diff --git a/src/node/storage/mod.rs b/src/node/storage/mod.rs
index b492bff1b..d6549cf31 100644
--- a/src/node/storage/mod.rs
+++ b/src/node/storage/mod.rs
@@ -1,9 +1,6 @@
 use crate::{
     HlBlock, HlBlockBody, HlHeader, HlPrimitives,
-    node::{
-        primitives::transaction::{convert_to_eth_block_body, convert_to_hl_block_body},
-        types::HlExtras,
-    },
+    node::{primitives::TransactionSigned, types::HlExtras},
 };
 use alloy_consensus::BlockHeader;
 use alloy_primitives::Bytes;
@@ -13,7 +10,6 @@ use reth_db::{
     cursor::{DbCursorRO, DbCursorRW},
     transaction::{DbTx, DbTxMut},
 };
-use reth_primitives::TransactionSigned;
 use reth_primitives_traits::Block;
 use reth_provider::{
     BlockBodyReader, BlockBodyWriter, ChainSpecProvider, ChainStorageReader, ChainStorageWriter,
@@ -91,30 +87,17 @@ where
         let mut read_precompile_calls = Vec::with_capacity(bodies.len());
 
         for (block_number, body) in bodies {
-            match body {
+            let (inner_opt, extras) = match body {
                 Some(HlBlockBody {
                     inner,
                     sidecars: _,
-                    read_precompile_calls: rpc,
+                    read_precompile_calls,
                     highest_precompile_address,
-                }) => {
-                    eth_bodies.push((block_number, Some(convert_to_eth_block_body(inner))));
-                    read_precompile_calls.push((
-                        block_number,
-                        HlExtras { read_precompile_calls: rpc, highest_precompile_address },
-                    ));
-                }
-                None => {
-                    eth_bodies.push((block_number, None));
-                    read_precompile_calls.push((
-                        block_number,
-                        HlExtras {
-                            read_precompile_calls: Default::default(),
-                            highest_precompile_address: None,
-                        },
-                    ));
-                }
-            }
+                }) => (Some(inner), HlExtras { read_precompile_calls, highest_precompile_address }),
+                None => Default::default(),
+            };
+            eth_bodies.push((block_number, inner_opt));
+            read_precompile_calls.push((block_number, extras));
         }
 
         self.0.write_block_bodies(provider, eth_bodies, write_to)?;
@@ -148,12 +131,6 @@ where
         inputs: Vec>,
     ) -> ProviderResult> {
         let read_precompile_calls = self.read_precompile_calls(provider, &inputs)?;
-        let inputs: Vec<(&HlHeader, _)> = inputs
-            .into_iter()
-            .map(|(header, transactions)| {
-                (header, transactions.into_iter().map(|tx| tx.into_inner()).collect())
-            })
-            .collect();
         let inputs: Vec<(&::Header, _)> = inputs;
         let eth_bodies = self.0.read_block_bodies(provider, inputs)?;
         let eth_bodies: Vec> = eth_bodies;
@@ -163,7 +140,7 @@
             .into_iter()
             .zip(read_precompile_calls)
             .map(|(inner, extra)| HlBlockBody {
-                inner: convert_to_hl_block_body(inner),
+                inner,
                 sidecars: None,
                 read_precompile_calls: extra.read_precompile_calls,
                 highest_precompile_address: extra.highest_precompile_address,
diff --git a/src/node/types/mod.rs b/src/node/types/mod.rs
index bdfcf572c..aaf112b31 100644
--- a/src/node/types/mod.rs
+++ b/src/node/types/mod.rs
@@ -19,10 +19,7 @@ pub struct ReadPrecompileCalls(pub Vec<ReadPrecompileCall>);
 
 pub(crate) mod reth_compat;
 
-#[derive(
-    Debug, Clone, Serialize, Deserialize, Default, RlpEncodable, RlpDecodable, Eq, PartialEq, Hash,
-)]
-#[rlp(trailing)]
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
 pub struct HlExtras {
     pub read_precompile_calls: Option<ReadPrecompileCalls>,
     pub highest_precompile_address: Option<Address>,
@@ -30,8 +27,8 @@ pub struct HlExtras {
 
 impl InMemorySize for HlExtras {
     fn size(&self) -> usize {
-        self.read_precompile_calls.as_ref().map_or(0, |s| s.0.len())
-            + self.highest_precompile_address.as_ref().map_or(0, |_| 20)
+        self.read_precompile_calls.as_ref().map_or(0, |s| s.0.len())
+            + self.highest_precompile_address.as_ref().map_or(0, |_| 20)
     }
 }
 