mirror of https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00

Compare commits: 1 commit on nb-2025100...

| Author | SHA1 | Date |
|---|---|---|
|  | 860fa666e3 |  |

```diff
@@ -1,7 +1,7 @@
 use alloy_consensus::Header;
-use alloy_primitives::{b256, hex::ToHexExt, BlockHash, Bytes, B256, U256};
+use alloy_primitives::{b256, hex::ToHexExt, BlockHash, B256, U256};
 use reth::{
-    api::NodeTypesWithDBAdapter,
+    api::{NodeTypes, NodeTypesWithDBAdapter},
     args::{DatabaseArgs, DatadirArgs},
     dirs::{ChainPath, DataDirPath},
 };
@@ -11,37 +11,30 @@ use reth_db::{
     models::CompactU256,
     static_file::iter_static_files,
     table::Decompress,
-    tables, DatabaseEnv,
-};
-use reth_db_api::{
-    cursor::{DbCursorRO, DbCursorRW},
-    transaction::{DbTx, DbTxMut},
+    DatabaseEnv,
 };
 use reth_errors::ProviderResult;
-use reth_ethereum_primitives::EthereumReceipt;
 use reth_provider::{
     providers::{NodeTypesForProvider, StaticFileProvider},
     static_file::SegmentRangeInclusive,
     DatabaseProvider, ProviderFactory, ReceiptProvider, StaticFileProviderFactory,
     StaticFileSegment, StaticFileWriter,
 };
-use std::{fs::File, io::Write, path::PathBuf, sync::Arc};
+use std::{marker::PhantomData, path::PathBuf, sync::Arc};
 use tracing::{info, warn};
 
 use crate::{chainspec::HlChainSpec, HlHeader, HlPrimitives};
 
-pub(crate) trait HlNodeType:
-    NodeTypesForProvider<ChainSpec = HlChainSpec, Primitives = HlPrimitives>
-{
-}
-impl<N: NodeTypesForProvider<ChainSpec = HlChainSpec, Primitives = HlPrimitives>> HlNodeType for N {}
-
-pub(super) struct Migrator<N: HlNodeType> {
+pub(super) struct Migrator<N: NodeTypesForProvider> {
     data_dir: ChainPath<DataDirPath>,
     provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
+    _nt: PhantomData<N>,
 }
 
-impl<N: HlNodeType> Migrator<N> {
+impl<N: NodeTypesForProvider> Migrator<N>
+where
+    N: NodeTypes<ChainSpec = HlChainSpec, Primitives = HlPrimitives>,
+{
     const MIGRATION_PATH_SUFFIX: &'static str = "migration-tmp";
 
     pub fn new(
@@ -51,7 +44,7 @@ impl<N: HlNodeType> Migrator<N> {
     ) -> eyre::Result<Self> {
         let data_dir = datadir.clone().resolve_datadir(chain_spec.chain());
         let provider_factory = Self::provider_factory(chain_spec, datadir, database_args)?;
-        Ok(Self { data_dir, provider_factory })
+        Ok(Self { data_dir, provider_factory, _nt: PhantomData })
     }
 
     pub fn sf_provider(&self) -> StaticFileProvider<HlPrimitives> {
@@ -73,12 +66,9 @@ impl<N: HlNodeType> Migrator<N> {
     }
 
     fn migrate_db_inner(&self) -> eyre::Result<()> {
-        let migrated_mdbx = MigratorMdbx::<N>(self).migrate_mdbx()?;
-        let migrated_static_files = MigrateStaticFiles::<N>(self).migrate_static_files()?;
-
-        if migrated_mdbx || migrated_static_files {
-            info!("Database migrated successfully");
-        }
+        self.migrate_static_files()?;
+        self.migrate_mdbx()?;
+        info!("Database migrated successfully");
         Ok(())
     }
 
@@ -86,95 +76,6 @@ impl<N: HlNodeType> Migrator<N> {
         self.data_dir.data_dir().join(Self::MIGRATION_PATH_SUFFIX)
     }
 
-    fn provider_factory(
-        chain_spec: HlChainSpec,
-        datadir: DatadirArgs,
-        database_args: DatabaseArgs,
-    ) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>> {
-        let data_dir = datadir.clone().resolve_datadir(chain_spec.chain());
-        let db_env = reth_db::init_db(data_dir.db(), database_args.database_args())?;
-        let static_file_provider = StaticFileProvider::read_only(data_dir.static_files(), false)?;
-        let db = Arc::new(db_env);
-        Ok(ProviderFactory::new(db, Arc::new(chain_spec), static_file_provider))
-    }
-}
-
-struct MigratorMdbx<'a, N: HlNodeType>(&'a Migrator<N>);
-
-impl<'a, N: HlNodeType> MigratorMdbx<'a, N> {
-    fn migrate_mdbx(&self) -> eyre::Result<bool> {
-        // if any header is in old format, we need to migrate it, so we pick the first and last one
-        let db_env = self.0.provider_factory.provider()?;
-        let mut cursor = db_env.tx_ref().cursor_read::<tables::Headers<Bytes>>()?;
-
-        let migration_needed = {
-            let first_is_old = match cursor.first()? {
-                Some((number, header)) => using_old_header(number, &header),
-                None => false,
-            };
-            let last_is_old = match cursor.last()? {
-                Some((number, header)) => using_old_header(number, &header),
-                None => false,
-            };
-            first_is_old || last_is_old
-        };
-
-        if !migration_needed {
-            return Ok(false);
-        }
-
-        self.migrate_mdbx_inner()?;
-        Ok(true)
-    }
-
-    fn migrate_mdbx_inner(&self) -> eyre::Result<()> {
-        // There shouldn't be many headers in mdbx, but using file for safety
-        info!("Old database detected, migrating mdbx...");
-        let tmp_path = self.0.conversion_tmp_dir().join("headers.rmp");
-        let count = self.export_old_headers(&tmp_path)?;
-        self.import_new_headers(tmp_path, count)?;
-        Ok(())
-    }
-
-    fn export_old_headers(&self, tmp_path: &PathBuf) -> Result<i32, eyre::Error> {
-        let db_env = self.0.provider_factory.provider()?;
-        let mut cursor_read = db_env.tx_ref().cursor_read::<tables::Headers<Bytes>>()?;
-        let mut tmp_writer = File::create(tmp_path)?;
-        let mut count = 0;
-        let old_headers = cursor_read.walk(None)?.filter_map(|row| {
-            let (block_number, header) = row.ok()?;
-            if !using_old_header(block_number, &header) {
-                None
-            } else {
-                Some((block_number, Header::decompress(&header).ok()?))
-            }
-        });
-        for (block_number, header) in old_headers {
-            let receipt =
-                db_env.receipts_by_block(block_number.into())?.expect("Receipt not found");
-            let new_header = to_hl_header(receipt, header);
-            tmp_writer.write_all(&rmp_serde::to_vec(&(block_number, new_header))?)?;
-            count += 1;
-        }
-        Ok(count)
-    }
-
-    fn import_new_headers(&self, tmp_path: PathBuf, count: i32) -> Result<(), eyre::Error> {
-        let mut tmp_reader = File::open(tmp_path)?;
-        let db_env = self.0.provider_factory.provider_rw()?;
-        let mut cursor_write = db_env.tx_ref().cursor_write::<tables::Headers<Bytes>>()?;
-        for _ in 0..count {
-            let (number, header) = rmp_serde::from_read::<_, (u64, HlHeader)>(&mut tmp_reader)?;
-            cursor_write.upsert(number, &rmp_serde::to_vec(&header)?.into())?;
-        }
-        db_env.commit()?;
-        Ok(())
-    }
-}
-
-struct MigrateStaticFiles<'a, N: HlNodeType>(&'a Migrator<N>);
-
-impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
     fn iterate_files_for_segment(
         &self,
         block_range: SegmentRangeInclusive,
@@ -201,8 +102,8 @@ impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
 
     fn create_placeholder(&self, block_range: SegmentRangeInclusive) -> eyre::Result<()> {
         // The direction is opposite here
-        let src = self.0.data_dir.static_files();
-        let dst = self.0.conversion_tmp_dir();
+        let src = self.data_dir.static_files();
+        let dst = self.conversion_tmp_dir();
 
         for (src_path, file_name) in self.iterate_files_for_segment(block_range, &src)? {
             let dst_path = dst.join(file_name);
@@ -219,8 +120,8 @@ impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
         &self,
         block_range: SegmentRangeInclusive,
     ) -> eyre::Result<()> {
-        let src = self.0.conversion_tmp_dir();
-        let dst = self.0.data_dir.static_files();
+        let src = self.conversion_tmp_dir();
+        let dst = self.data_dir.static_files();
 
         for (src_path, file_name) in self.iterate_files_for_segment(block_range, &src)? {
             let dst_path = dst.join(file_name);
@@ -232,9 +133,9 @@ impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
         self.create_placeholder(block_range)
     }
 
-    fn migrate_static_files(&self) -> eyre::Result<bool> {
-        let conversion_tmp = self.0.conversion_tmp_dir();
-        let old_path = self.0.data_dir.static_files();
+    fn migrate_static_files(&self) -> eyre::Result<()> {
+        let conversion_tmp = self.conversion_tmp_dir();
+        let old_path = self.data_dir.static_files();
 
         if conversion_tmp.exists() {
             std::fs::remove_dir_all(&conversion_tmp)?;
@@ -244,7 +145,7 @@ impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
         let mut all_static_files = iter_static_files(&old_path)?;
         let all_static_files =
             all_static_files.remove(&StaticFileSegment::Headers).unwrap_or_default();
-        let provider = self.0.provider_factory.provider()?;
+        let provider = self.provider_factory.provider()?;
 
         let mut first = true;
 
@@ -258,11 +159,11 @@ impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
             }
 
             if first {
-                info!("Old database detected, migrating static files...");
+                info!("Old database detected, migrating database...");
                 first = false;
             }
 
-            let sf_provider = self.0.sf_provider();
+            let sf_provider = self.sf_provider();
             let sf_tmp_provider = StaticFileProvider::<HlPrimitives>::read_write(&conversion_tmp)?;
             let block_range_for_filename = sf_provider.find_fixed_range(block_range.start());
             migrate_single_static_file(&sf_tmp_provider, &sf_provider, &provider, block_range)?;
@@ -270,19 +171,48 @@ impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
             self.move_static_files_for_segment(block_range_for_filename)?;
         }
 
-        Ok(!first)
+        Ok(())
     }
 
+    fn provider_factory(
+        chain_spec: HlChainSpec,
+        datadir: DatadirArgs,
+        database_args: DatabaseArgs,
+    ) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>> {
+        let data_dir = datadir.clone().resolve_datadir(chain_spec.chain());
+        let db_env = reth_db::init_db(data_dir.db(), database_args.database_args())?;
+        let static_file_provider = StaticFileProvider::read_only(data_dir.static_files(), false)?;
+        let db = Arc::new(db_env);
+        Ok(ProviderFactory::new(db, Arc::new(chain_spec), static_file_provider))
+    }
+
+    fn migrate_mdbx(&self) -> eyre::Result<()> {
+        // Actually not much here, all of blocks should be in the static files
+        Ok(())
+    }
+
     fn using_old_header(&self, number: u64) -> eyre::Result<bool> {
-        let sf_provider = self.0.sf_provider();
+        let sf_provider = self.sf_provider();
         let content = old_headers_range(&sf_provider, number..=number)?;
 
         let &[row] = &content.as_slice() else {
             warn!("No header found for block {}", number);
             return Ok(false);
         };
+        let header = &row[0];
 
-        Ok(using_old_header(number, &row[0]))
+        let deserialized_old = is_old_header(header);
+        let deserialized_new = is_new_header(header);
+
+        assert!(
+            deserialized_old ^ deserialized_new,
+            "Header is not valid: {} {}\ndeserialized_old: {}\ndeserialized_new: {}",
+            number,
+            header.encode_hex(),
+            deserialized_old,
+            deserialized_new
+        );
+        Ok(deserialized_old && !deserialized_new)
     }
 }
 
@@ -312,7 +242,7 @@ fn is_new_header(header: &[u8]) -> bool {
     rmp_serde::from_slice::<HlHeader>(header).is_ok()
 }
 
-fn migrate_single_static_file<N: HlNodeType>(
+fn migrate_single_static_file<N: NodeTypesForProvider<Primitives = HlPrimitives>>(
     sf_out: &StaticFileProvider<HlPrimitives>,
     sf_in: &StaticFileProvider<HlPrimitives>,
     provider: &DatabaseProvider<Tx<RO>, NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
@@ -331,8 +261,11 @@ fn migrate_single_static_file<N: HlNodeType>(
     let mut writer = sf_out.get_writer(*block_range.start(), StaticFileSegment::Headers)?;
     let new_headers = std::iter::zip(headers, receipts)
         .map(|(header, receipts)| {
+            let system_tx_count =
+                receipts.iter().filter(|r| r.cumulative_gas_used == 0).count();
             let eth_header = Header::decompress(&header[0]).unwrap();
-            let hl_header = to_hl_header(receipts, eth_header);
+            let hl_header =
+                HlHeader::from_ethereum_header(eth_header, &receipts, system_tx_count as u64);
 
             let difficulty: U256 = CompactU256::decompress(&header[1]).unwrap().into();
             let hash = BlockHash::decompress(&header[2]).unwrap();
@@ -348,11 +281,6 @@ fn migrate_single_static_file<N: HlNodeType>(
     Ok(())
 }
 
-fn to_hl_header(receipts: Vec<EthereumReceipt>, eth_header: Header) -> HlHeader {
-    let system_tx_count = receipts.iter().filter(|r| r.cumulative_gas_used == 0).count();
-    HlHeader::from_ethereum_header(eth_header, &receipts, system_tx_count as u64)
-}
-
 fn old_headers_range(
     provider: &StaticFileProvider<HlPrimitives>,
     block_range: impl std::ops::RangeBounds<u64>,
@@ -388,18 +316,3 @@ fn to_range<R: std::ops::RangeBounds<u64>>(bounds: R) -> std::ops::Range<u64> {
 
     start..end
 }
-
-fn using_old_header(number: u64, header: &[u8]) -> bool {
-    let deserialized_old = is_old_header(header);
-    let deserialized_new = is_new_header(header);
-
-    assert!(
-        deserialized_old ^ deserialized_new,
-        "Header is not valid: {} {}\ndeserialized_old: {}\ndeserialized_new: {}",
-        number,
-        header.encode_hex(),
-        deserialized_old,
-        deserialized_new
-    );
-    deserialized_old && !deserialized_new
-}
```
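
The format probe that this commit inlines into `Migrator::using_old_header` decides whether a stored header blob predates the migration by trying to decode it as each format and asserting that exactly one attempt succeeds. Below is a minimal, self-contained sketch of that XOR check; `OldHeader` and `NewHeader` are toy stand-ins for the real types, and both probes use rmp_serde purely for illustration (the real `is_old_header` decodes the legacy encoding instead).

```rust
use serde::{Deserialize, Serialize};

// Toy stand-ins for the two on-disk header formats; the real code
// distinguishes a legacy-encoded `Header` from an rmp-encoded `HlHeader`.
#[derive(Serialize, Deserialize)]
struct OldHeader {
    number: u64,
    gas_used: u64,
}

#[derive(Serialize, Deserialize)]
struct NewHeader {
    number: u64,
    gas_used: u64,
    system_tx_count: u64,
}

// Each probe asks one question: does this blob decode as that format?
fn is_old(bytes: &[u8]) -> bool {
    rmp_serde::from_slice::<OldHeader>(bytes).is_ok()
}

fn is_new(bytes: &[u8]) -> bool {
    rmp_serde::from_slice::<NewHeader>(bytes).is_ok()
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let blob = rmp_serde::to_vec(&OldHeader { number: 1, gas_used: 21_000 })?;
    let (old, new) = (is_old(&blob), is_new(&blob));
    // Exactly one decoder may accept the blob, mirroring the
    // `deserialized_old ^ deserialized_new` assert in the migration.
    assert!(old ^ new, "header blob matched neither or both formats");
    println!("old format: {old}, new format: {new}");
    Ok(())
}
```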
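For comparison, the mdbx path removed by this commit staged converted headers through a temporary file rather than rewriting the table in place: each `(block_number, header)` pair was appended as one MessagePack record via `rmp_serde::to_vec`, then read back one record at a time with `rmp_serde::from_read`, which consumes exactly one message per call. A self-contained sketch of that export/import round trip, with a toy `Record` type in place of the real `(u64, HlHeader)` rows:

```rust
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Write};

// Toy stand-in for the real (u64, HlHeader) records.
#[derive(Debug, Serialize, Deserialize)]
struct Record {
    number: u64,
    payload: Vec<u8>,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let path = std::env::temp_dir().join("headers.rmp");

    // Export: append one self-delimiting MessagePack record per row,
    // counting as we go, just as `export_old_headers` did.
    let mut writer = File::create(&path)?;
    let mut count = 0;
    for number in 0..3u64 {
        let rec = Record { number, payload: vec![0xAB; 4] };
        writer.write_all(&rmp_serde::to_vec(&rec)?)?;
        count += 1;
    }

    // Import: `from_read` pulls exactly one record off the stream per call,
    // so the exported count bounds the read loop.
    let mut reader = File::open(&path)?;
    for _ in 0..count {
        let rec: Record = rmp_serde::from_read(&mut reader)?;
        println!("re-read block {}", rec.number);
    }
    Ok(())
}
```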