feat: add reth dump-stage command (#1328)

This commit is contained in:
joshieDo
2023-02-15 01:23:33 +08:00
committed by GitHub
parent 6e89af9e8e
commit 313bf28501
14 changed files with 459 additions and 9 deletions

View File

@ -4,7 +4,7 @@ use std::str::FromStr;
use crate::{
chain, db,
dirs::{LogsDir, PlatformPath},
node, p2p,
dump_stage, node, p2p,
runner::CliRunner,
stage, test_eth_chain, test_vectors,
};
@ -30,6 +30,7 @@ pub fn run() -> eyre::Result<()> {
Commands::Import(command) => runner.run_until_ctrl_c(command.execute()),
Commands::Db(command) => runner.run_until_ctrl_c(command.execute()),
Commands::Stage(command) => runner.run_until_ctrl_c(command.execute()),
Commands::DumpStage(command) => runner.run_until_ctrl_c(command.execute()),
Commands::P2P(command) => runner.run_until_ctrl_c(command.execute()),
Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()),
Commands::TestEthChain(command) => runner.run_until_ctrl_c(command.execute()),
@ -59,6 +60,9 @@ pub enum Commands {
/// a lot of memory to store all the data.
#[command(name = "stage")]
Stage(stage::Command),
/// Dumps a stage from a range into a new database.
#[command(name = "dump-stage")]
DumpStage(dump_stage::Command),
/// P2P Debugging utilities
#[command(name = "p2p")]
P2P(p2p::Command),

View File

@ -189,13 +189,13 @@ impl Command {
}
/// Wrapper over DB that implements many useful DB queries.
struct DbTool<'a, DB: Database> {
pub(crate) struct DbTool<'a, DB: Database> {
pub(crate) db: &'a DB,
}
impl<'a, DB: Database> DbTool<'a, DB> {
/// Takes a DB where the tables have already been created.
fn new(db: &'a DB) -> eyre::Result<Self> {
pub(crate) fn new(db: &'a DB) -> eyre::Result<Self> {
Ok(Self { db })
}

View File

@ -0,0 +1,145 @@
use crate::{
db::DbTool,
dirs::{DbPath, PlatformPath},
dump_stage::setup,
};
use eyre::Result;
use reth_db::{
cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx,
};
use reth_provider::Transaction;
use reth_stages::{stages::ExecutionStage, Stage, StageId, UnwindInput};
use std::ops::DerefMut;
use tracing::info;
/// Dumps the [`ExecutionStage`] input data for the block range `[from, to]` into a new
/// database at `output_db`, optionally dry-running the stage on the copy afterwards.
///
/// `should_run` — when `true`, re-executes the stage against the new database without
/// committing the results.
pub(crate) async fn dump_execution_stage<DB: Database>(
    db_tool: &mut DbTool<'_, DB>,
    from: u64,
    to: u64,
    output_db: &PlatformPath<DbPath>,
    should_run: bool,
) -> Result<()> {
    // Create the output database and fetch the source database's tip block number.
    let (output_db, tip_block_number) = setup::<DB>(from, to, output_db, db_tool)?;

    // Copy the range-addressable tables (headers, bodies, transactions, ...) verbatim.
    import_tables_with_range::<DB>(&output_db, db_tool, from, to)?;

    // Unwind the source down to `from` so the plain state tables can be copied safely.
    unwind_and_copy::<DB>(db_tool, from, tip_block_number, &output_db).await?;

    if should_run {
        dry_run(output_db, to, from).await?;
    }

    Ok(())
}
/// Imports all the tables that can be copied over a range.
///
/// Block-indexed tables are copied directly over `[from, to]`; the transaction-indexed
/// tables are copied over the tx-id range derived from the boundary block bodies.
fn import_tables_with_range<DB: Database>(
    output_db: &reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
    db_tool: &mut DbTool<'_, DB>,
    from: u64,
    to: u64,
) -> eyre::Result<()> {
    // We're not sharing the transaction in case the memory grows too much.
    output_db.update(|tx| {
        tx.import_table_with_range::<tables::CanonicalHeaders, _>(&db_tool.db.tx()?, Some(from), to)
    })??;
    output_db.update(|tx| {
        tx.import_table_with_range::<tables::HeaderTD, _>(&db_tool.db.tx()?, Some(from), to)
    })??;
    output_db.update(|tx| {
        tx.import_table_with_range::<tables::Headers, _>(&db_tool.db.tx()?, Some(from), to)
    })??;
    output_db.update(|tx| {
        tx.import_table_with_range::<tables::BlockBodies, _>(&db_tool.db.tx()?, Some(from), to)
    })??;
    output_db.update(|tx| {
        tx.import_table_with_range::<tables::BlockOmmers, _>(&db_tool.db.tx()?, Some(from), to)
    })??;

    // Find range of transactions that need to be copied over:
    // first tx id of block `from` through the last tx id of block `to`.
    let (from_tx, to_tx) = db_tool.db.view(|read_tx| {
        let mut read_cursor = read_tx.cursor_read::<tables::BlockBodies>()?;
        let (_, from_block) =
            read_cursor.seek(from)?.ok_or(eyre::eyre!("BlockBody {from} does not exist."))?;
        let (_, to_block) =
            read_cursor.seek(to)?.ok_or(eyre::eyre!("BlockBody {to} does not exist."))?;
        Ok::<(u64, u64), eyre::ErrReport>((
            from_block.start_tx_id,
            to_block.start_tx_id + to_block.tx_count,
        ))
    })??;

    // Copy the transaction-indexed tables over the computed tx-id range.
    output_db.update(|tx| {
        tx.import_table_with_range::<tables::Transactions, _>(
            &db_tool.db.tx()?,
            Some(from_tx),
            to_tx,
        )
    })??;
    output_db.update(|tx| {
        tx.import_table_with_range::<tables::TxSenders, _>(&db_tool.db.tx()?, Some(from_tx), to_tx)
    })??;

    Ok(())
}
/// Dry-run an unwind to FROM block, so we can get the PlainStorageState and
/// PlainAccountState safely. There might be some state dependency from an address
/// which hasn't been changed in the given range.
async fn unwind_and_copy<DB: Database>(
    db_tool: &mut DbTool<'_, DB>,
    from: u64,
    tip_block_number: u64,
    output_db: &reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
) -> eyre::Result<()> {
    let mut unwind_tx = Transaction::new(db_tool.db)?;
    let mut exec_stage = ExecutionStage::default();

    // Unwind the source database from the tip down to `from` (kept uncommitted).
    exec_stage
        .unwind(
            &mut unwind_tx,
            UnwindInput { unwind_to: from, stage_progress: tip_block_number, bad_block: None },
        )
        .await?;

    let unwind_inner_tx = unwind_tx.deref_mut();

    // Copy the unwound state tables into the output database.
    output_db.update(|tx| tx.import_dupsort::<tables::PlainStorageState, _>(unwind_inner_tx))??;
    output_db.update(|tx| tx.import_table::<tables::PlainAccountState, _>(unwind_inner_tx))??;
    output_db.update(|tx| tx.import_table::<tables::Bytecodes, _>(unwind_inner_tx))??;

    // Discard the unwind transaction so the source database is left untouched.
    unwind_tx.drop()?;

    Ok(())
}
/// Try to re-execute the stage without committing
async fn dry_run(
    output_db: reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
    to: u64,
    from: u64,
) -> eyre::Result<()> {
    info!(target: "reth::cli", "Executing stage. [dry-run]");

    let mut txn = Transaction::new(&output_db)?;
    let input = reth_stages::ExecInput {
        previous_stage: Some((StageId("Another"), to)),
        stage_progress: Some(from),
    };
    ExecutionStage::default().execute(&mut txn, input).await?;

    // Discard the inner transaction so nothing is committed to the output database.
    txn.drop()?;

    info!(target: "reth::cli", "Success.");
    Ok(())
}

View File

@ -0,0 +1,85 @@
use crate::{
db::DbTool,
dirs::{DbPath, PlatformPath},
dump_stage::setup,
};
use eyre::Result;
use reth_db::{database::Database, table::TableImporter, tables};
use reth_provider::Transaction;
use reth_stages::{stages::StorageHashingStage, Stage, StageId, UnwindInput};
use std::ops::DerefMut;
use tracing::info;
/// Dumps the [`StorageHashingStage`] input data for the block range `[from, to]` into a
/// new database at `output_db`, optionally dry-running the stage on the copy afterwards.
///
/// `should_run` — when `true`, re-executes the stage against the new database without
/// committing the results.
pub(crate) async fn dump_hashing_storage_stage<DB: Database>(
    db_tool: &mut DbTool<'_, DB>,
    from: u64,
    to: u64,
    output_db: &PlatformPath<DbPath>,
    should_run: bool,
) -> Result<()> {
    // Create the output database and fetch the source database's tip block number.
    let (output_db, tip_block_number) = setup::<DB>(from, to, output_db, db_tool)?;

    // Unwind the source down to `from` and copy the storage tables into the new db.
    unwind_and_copy::<DB>(db_tool, from, tip_block_number, &output_db).await?;

    // Try to re-execute the stage without committing
    if should_run {
        dry_run(output_db, to, from).await?;
    }

    Ok(())
}
/// Dry-run an unwind to FROM block and copy the necessary table data to the new database.
async fn unwind_and_copy<DB: Database>(
    db_tool: &mut DbTool<'_, DB>,
    from: u64,
    tip_block_number: u64,
    output_db: &reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
) -> eyre::Result<()> {
    let mut unwind_tx = Transaction::new(db_tool.db)?;
    let mut exec_stage = StorageHashingStage::default();

    // Unwind the source database from the tip down to `from` (kept uncommitted).
    exec_stage
        .unwind(
            &mut unwind_tx,
            UnwindInput { unwind_to: from, stage_progress: tip_block_number, bad_block: None },
        )
        .await?;

    let unwind_inner_tx = unwind_tx.deref_mut();

    // TODO optimize we can actually just get the entries we need for both these tables
    output_db.update(|tx| tx.import_dupsort::<tables::PlainStorageState, _>(unwind_inner_tx))??;
    output_db.update(|tx| tx.import_dupsort::<tables::StorageChangeSet, _>(unwind_inner_tx))??;

    // Discard the unwind transaction so the source database is left untouched.
    unwind_tx.drop()?;

    Ok(())
}
/// Try to re-execute the stage without committing
async fn dry_run(
    output_db: reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
    to: u64,
    from: u64,
) -> eyre::Result<()> {
    info!(target: "reth::cli", "Executing stage. [dry-run]");

    let mut txn = Transaction::new(&output_db)?;
    // NOTE(review): `clean_threshold: 1` presumably forces the incremental hashing
    // path instead of a full rebuild — confirm against the stage implementation.
    let mut stage = StorageHashingStage { clean_threshold: 1, ..Default::default() };

    let input = reth_stages::ExecInput {
        previous_stage: Some((StageId("Another"), to)),
        stage_progress: Some(from),
    };
    stage.execute(&mut txn, input).await?;

    // Discard the inner transaction so nothing is committed to the output database.
    txn.drop()?;

    info!(target: "reth::cli", "Success.");
    Ok(())
}

View File

@ -0,0 +1,123 @@
//! Database debugging tool
mod hashing_storage;
use hashing_storage::dump_hashing_storage_stage;
mod execution;
use execution::dump_execution_stage;
use crate::{
db::DbTool,
dirs::{DbPath, PlatformPath},
};
use clap::Parser;
use reth_db::{
cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx,
};
use reth_staged_sync::utils::init::init_db;
use tracing::info;
/// `reth dump-stage` command
#[derive(Debug, Parser)]
pub struct Command {
    /// The path to the database folder.
    ///
    /// Defaults to the OS-specific data directory:
    ///
    /// - Linux: `$XDG_DATA_HOME/reth/db` or `$HOME/.local/share/reth/db`
    /// - Windows: `{FOLDERID_RoamingAppData}/reth/db`
    /// - macOS: `$HOME/Library/Application Support/reth/db`
    #[arg(long, value_name = "PATH", verbatim_doc_comment, default_value_t)]
    db: PlatformPath<DbPath>,

    // Which stage to dump, together with its range/output options.
    // (Regular comment on purpose: a doc comment here would become clap help text.)
    #[clap(subcommand)]
    command: Stages,
}
/// Supported stages to be dumped
// Both variants carry the same `StageCommand` payload (range, output path, dry-run flag).
#[derive(Debug, Clone, Parser)]
pub enum Stages {
    /// Execution stage.
    Execution(StageCommand),
    /// StorageHashing stage.
    StorageHashing(StageCommand),
}
/// Stage command that takes a range
#[derive(Debug, Clone, Parser)]
pub struct StageCommand {
    /// The path to the new database folder.
    ///
    /// Defaults to the OS-specific data directory:
    ///
    /// - Linux: `$XDG_DATA_HOME/reth/db` or `$HOME/.local/share/reth/db`
    /// - Windows: `{FOLDERID_RoamingAppData}/reth/db`
    /// - macOS: `$HOME/Library/Application Support/reth/db`
    #[arg(long, value_name = "OUTPUT_PATH", verbatim_doc_comment, default_value_t)]
    output_db: PlatformPath<DbPath>,

    /// From which block.
    #[arg(long, short)]
    from: u64,

    /// To which block.
    #[arg(long, short)]
    to: u64,

    /// If passed, it will dry-run a stage execution from the newly created database right after
    /// dumping.
    #[arg(long, short, default_value = "false")]
    dry_run: bool,
}
impl Command {
    /// Execute `dump-stage` command
    pub async fn execute(&self) -> eyre::Result<()> {
        // Make sure the source db directory exists before opening the environment.
        std::fs::create_dir_all(&self.db)?;

        // TODO: Auto-impl for Database trait
        let db = reth_db::mdbx::Env::<reth_db::mdbx::WriteMap>::open(
            self.db.as_ref(),
            reth_db::mdbx::EnvKind::RW,
        )?;

        let mut tool = DbTool::new(&db)?;

        // Dispatch to the per-stage dumper with the parsed range and flags.
        match &self.command {
            Stages::Execution(StageCommand { output_db, from, to, dry_run, .. }) => {
                dump_execution_stage(&mut tool, *from, *to, output_db, *dry_run).await?
            }
            Stages::StorageHashing(StageCommand { output_db, from, to, dry_run, .. }) => {
                dump_hashing_storage_stage(&mut tool, *from, *to, output_db, *dry_run).await?
            }
        }

        Ok(())
    }
}
/// Sets up the database and initial state on `BlockTransitionIndex`. Also returns the tip block
/// number.
///
/// Creates a fresh database at `output_db`, copies the `BlockTransitionIndex` entries
/// bracketing the `[from, to]` range from the source, and returns the new environment
/// together with the source database's tip block number.
///
/// # Panics
///
/// Panics if `from >= to`, or if the source `BlockTransitionIndex` table is empty.
pub(crate) fn setup<DB: Database>(
    from: u64,
    to: u64,
    output_db: &PlatformPath<DbPath>,
    db_tool: &mut DbTool<'_, DB>,
) -> eyre::Result<(reth_db::mdbx::Env<reth_db::mdbx::WriteMap>, u64)> {
    // Fix: the original message was inverted — the assertion requires `from < to`,
    // i.e. the TO block must be bigger than the FROM block.
    assert!(from < to, "FROM block should be lower than TO block.");

    info!(target: "reth::cli", "Creating separate db at {}", output_db);

    let output_db = init_db(output_db)?;

    // Copy one extra entry on each side of the range.
    // NOTE(review): `from - 1` underflows when `from == 0` — confirm callers never
    // pass the genesis block as `from`.
    output_db.update(|tx| {
        tx.import_table_with_range::<tables::BlockTransitionIndex, _>(
            &db_tool.db.tx()?,
            Some(from - 1),
            to + 1,
        )
    })??;

    let (tip_block_number, _) = db_tool
        .db
        .view(|tx| tx.cursor_read::<tables::BlockTransitionIndex>()?.last())??
        // A real message instead of the original `.expect("some")`.
        .expect("source database should contain at least one BlockTransitionIndex entry");

    Ok((output_db, tip_block_number))
}

View File

@ -12,6 +12,7 @@ pub mod chain;
pub mod cli;
pub mod db;
pub mod dirs;
pub mod dump_stage;
pub mod node;
pub mod p2p;
pub mod prometheus_exporter;

View File

@ -360,8 +360,10 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
for (key, storage) in storage_changeset_batch.into_iter().rev() {
let address = key.address();
if plain_storage_cursor.seek_by_key_subkey(address, storage.key)?.is_some() {
plain_storage_cursor.delete_current()?;
if let Some(v) = plain_storage_cursor.seek_by_key_subkey(address, storage.key)? {
if v.key == storage.key {
plain_storage_cursor.delete_current()?;
}
}
if storage.value != U256::ZERO {
plain_storage_cursor.upsert(address, storage)?;

View File

@ -1,5 +1,6 @@
use crate::{
common::{Bounds, Sealed},
table::TableImporter,
transaction::{DbTx, DbTxMut},
Error,
};
@ -13,7 +14,7 @@ pub trait DatabaseGAT<'a, __ImplicitBounds: Sealed = Bounds<&'a Self>>: Send + S
/// RO database transaction
type TX: DbTx<'a> + Send + Sync;
/// RW database transaction
type TXMut: DbTxMut<'a> + DbTx<'a> + Send + Sync;
type TXMut: DbTxMut<'a> + DbTx<'a> + TableImporter<'a> + Send + Sync;
}
/// Main Database trait that spawns transactions to be executed.

View File

@ -8,7 +8,7 @@ use crate::{
ReverseWalker, Walker,
},
database::{Database, DatabaseGAT},
table::{DupSort, Table},
table::{DupSort, Table, TableImporter},
transaction::{DbTx, DbTxGAT, DbTxMut, DbTxMutGAT},
Error,
};
@ -63,6 +63,10 @@ impl<'a> DbTx<'a> for TxMock {
todo!()
}
fn drop(self) {
todo!()
}
fn cursor_read<T: Table>(&self) -> Result<<Self as DbTxGAT<'_>>::Cursor<T>, Error> {
todo!()
}
@ -96,6 +100,8 @@ impl<'a> DbTxMut<'a> for TxMock {
}
}
impl<'a> TableImporter<'a> for TxMock {}
/// Cursor that iterates over a table
pub struct CursorMock {
_cursor: u32,

View File

@ -1,4 +1,8 @@
use crate::Error;
use crate::{
abstraction::cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
transaction::{DbTx, DbTxMut},
Error,
};
use bytes::Bytes;
use serde::Serialize;
use std::{
@ -76,3 +80,57 @@ pub trait DupSort: Table {
/// Upstream docs: <https://libmdbx.dqdkfa.ru/usage.html#autotoc_md48>
type SubKey: Key;
}
/// Allows duplicating tables across databases
pub trait TableImporter<'tx>: for<'a> DbTxMut<'a> {
    /// Imports all table data from another transaction.
    fn import_table<T: Table, R: DbTx<'tx>>(&self, source_tx: &R) -> Result<(), Error> {
        let mut destination_cursor = self.cursor_write::<T>()?;

        // NOTE(review): `append` presumably requires keys in ascending order, which a
        // full walk of a sorted table should provide — confirm cursor semantics.
        for kv in source_tx.cursor_read::<T>()?.walk(None)? {
            let (k, v) = kv?;
            destination_cursor.append(k, v)?;
        }

        Ok(())
    }

    /// Imports table data from another transaction within a range.
    ///
    /// `from` is the starting key (the table's first key when `None`); `to` is
    /// inclusive — the row with key equal to `to` is copied before iteration stops.
    fn import_table_with_range<T: Table, R: DbTx<'tx>>(
        &self,
        source_tx: &R,
        from: Option<<T as Table>::Key>,
        to: <T as Table>::Key,
    ) -> Result<(), Error> {
        let mut destination_cursor = self.cursor_write::<T>()?;
        let mut source_cursor = source_tx.cursor_read::<T>()?;

        for row in source_cursor.walk(from)? {
            let (key, value) = row?;
            // Copy the boundary row itself, then stop.
            let finished = key == to;
            destination_cursor.append(key, value)?;
            if finished {
                break
            }
        }

        Ok(())
    }

    /// Imports all dupsort data from another transaction.
    fn import_dupsort<T: DupSort, R: DbTx<'tx>>(&self, source_tx: &R) -> Result<(), Error> {
        let mut destination_cursor = self.cursor_dup_write::<T>()?;
        let mut cursor = source_tx.cursor_dup_read::<T>()?;

        // NOTE(review): this relies on `next_no_dup` yielding the FIRST key from a
        // freshly created cursor — confirm against the cursor implementation,
        // otherwise the first key's duplicates would be skipped.
        while let Some((k, _)) = cursor.next_no_dup()? {
            // Copy every duplicate value stored under this key.
            for kv in cursor.walk_dup(Some(k), None)? {
                let (k, v) = kv?;
                destination_cursor.append_dup(k, v)?;
            }
        }

        Ok(())
    }
}

View File

@ -39,6 +39,8 @@ pub trait DbTx<'tx>: for<'a> DbTxGAT<'a> {
/// Commit for read only transaction will consume and free transaction and allows
/// freeing of memory pages
fn commit(self) -> Result<bool, Error>;
/// Drops transaction
fn drop(self);
/// Iterate over read only values in table.
fn cursor_read<T: Table>(&self) -> Result<<Self as DbTxGAT<'_>>::Cursor<T>, Error>;
/// Iterate over read only values in dup sorted table.

View File

@ -2,7 +2,7 @@
use super::cursor::Cursor;
use crate::{
table::{Compress, DupSort, Encode, Table},
table::{Compress, DupSort, Encode, Table, TableImporter},
tables::utils::decode_one,
transaction::{DbTx, DbTxGAT, DbTxMut, DbTxMutGAT},
Error,
@ -57,6 +57,8 @@ impl<'a, K: TransactionKind, E: EnvironmentKind> DbTxMutGAT<'a> for Tx<'_, K, E>
type DupCursorMut<T: DupSort> = Cursor<'a, RW, T>;
}
impl<'a, E: EnvironmentKind> TableImporter<'a> for Tx<'_, RW, E> {}
impl<'tx, K: TransactionKind, E: EnvironmentKind> DbTx<'tx> for Tx<'tx, K, E> {
// Iterate over read only values in database.
fn cursor_read<T: Table>(&self) -> Result<<Self as DbTxGAT<'_>>::Cursor<T>, Error> {
@ -75,6 +77,10 @@ impl<'tx, K: TransactionKind, E: EnvironmentKind> DbTx<'tx> for Tx<'tx, K, E> {
result
}
fn drop(self) {
drop(self.inner)
}
fn get<T: Table>(&self, key: T::Key) -> Result<Option<<T as Table>::Value>, Error> {
self.inner
.get(

View File

@ -97,6 +97,12 @@ impl From<(u64, H256)> for BlockNumHash {
}
}
impl From<u64> for BlockNumHash {
fn from(tpl: u64) -> Self {
BlockNumHash((tpl, H256::default()))
}
}
impl Encode for BlockNumHash {
type Encoded = [u8; 40];

View File

@ -88,6 +88,17 @@ where
Ok(success)
}
/// Drops the current inner transaction and open a new one.
pub fn drop(&mut self) -> Result<(), DbError> {
    // Explicitly release the old transaction (discarding any uncommitted changes)
    // before opening the replacement.
    drop(self.tx.take());
    self.tx = Some(self.db.tx_mut()?);
    Ok(())
}
/// Open a new inner transaction.
pub fn open(&mut self) -> Result<(), DbError> {
self.tx = Some(self.db.tx_mut()?);