Revert "feat: add reth dump-stage command" (#1327)

This commit is contained in:
Georgios Konstantopoulos
2023-02-13 19:10:58 -08:00
committed by GitHub
parent b3ff45229a
commit 49292091dd
12 changed files with 24 additions and 327 deletions

View File

@ -4,7 +4,7 @@ use std::str::FromStr;
use crate::{
chain, db,
dirs::{LogsDir, PlatformPath},
dump_stage, node, p2p,
node, p2p,
runner::CliRunner,
stage, test_eth_chain, test_vectors,
};
@ -30,7 +30,6 @@ pub fn run() -> eyre::Result<()> {
Commands::Import(command) => runner.run_until_ctrl_c(command.execute()),
Commands::Db(command) => runner.run_until_ctrl_c(command.execute()),
Commands::Stage(command) => runner.run_until_ctrl_c(command.execute()),
Commands::DumpStage(command) => runner.run_until_ctrl_c(command.execute()),
Commands::P2P(command) => runner.run_until_ctrl_c(command.execute()),
Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()),
Commands::TestEthChain(command) => runner.run_until_ctrl_c(command.execute()),
@ -60,9 +59,6 @@ pub enum Commands {
/// a lot of memory to store all the data.
#[command(name = "stage")]
Stage(stage::Command),
/// Dumps a stage from a range into a new database.
#[command(name = "dump-stage")]
DumpStage(dump_stage::Command),
/// P2P Debugging utilities
#[command(name = "p2p")]
P2P(p2p::Command),

View File

@ -189,13 +189,13 @@ impl Command {
}
/// Wrapper over DB that implements many useful DB queries.
pub(crate) struct DbTool<'a, DB: Database> {
struct DbTool<'a, DB: Database> {
pub(crate) db: &'a DB,
}
impl<'a, DB: Database> DbTool<'a, DB> {
/// Takes a DB where the tables have already been created.
pub(crate) fn new(db: &'a DB) -> eyre::Result<Self> {
fn new(db: &'a DB) -> eyre::Result<Self> {
Ok(Self { db })
}

View File

@ -1,203 +0,0 @@
//! Database debugging tool
use crate::dirs::{DbPath, PlatformPath};
use clap::Parser;
use eyre::Result;
use reth_db::{
cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx,
};
use reth_provider::Transaction;
use reth_staged_sync::utils::init::init_db;
use reth_stages::{stages::ExecutionStage, Stage, StageId, UnwindInput};
use std::ops::DerefMut;
use tracing::info;
use crate::db::DbTool;
/// `reth dump-stage` command
///
/// Copies the data a stage needs, for a given block range, out of an existing
/// reth database and into a new standalone database for debugging.
#[derive(Debug, Parser)]
pub struct Command {
    /// The path to the database folder.
    ///
    /// Defaults to the OS-specific data directory:
    ///
    /// - Linux: `$XDG_DATA_HOME/reth/db` or `$HOME/.local/share/reth/db`
    /// - Windows: `{FOLDERID_RoamingAppData}/reth/db`
    /// - macOS: `$HOME/Library/Application Support/reth/db`
    #[arg(long, value_name = "PATH", verbatim_doc_comment, default_value_t)]
    db: PlatformPath<DbPath>,

    /// Which stage to dump, together with its range/output arguments.
    #[clap(subcommand)]
    command: Stages,
}
/// Supported stages to be dumped
///
/// Currently only the execution stage is supported; each variant carries the
/// shared [`StageCommand`] range arguments.
#[derive(Debug, Clone, Parser)]
pub enum Stages {
    /// Execution stage.
    Execution(StageCommand),
}
/// Stage command that takes a range
///
/// Common arguments for every dumpable stage: the output database location and
/// the inclusive block range to copy.
#[derive(Debug, Clone, Parser)]
pub struct StageCommand {
    /// The path to the new database folder.
    ///
    /// Defaults to the OS-specific data directory:
    ///
    /// - Linux: `$XDG_DATA_HOME/reth/db` or `$HOME/.local/share/reth/db`
    /// - Windows: `{FOLDERID_RoamingAppData}/reth/db`
    /// - macOS: `$HOME/Library/Application Support/reth/db`
    #[arg(long, value_name = "OUTPUT_PATH", verbatim_doc_comment, default_value_t)]
    output_db: PlatformPath<DbPath>,

    /// From which block.
    #[arg(long, short)]
    from: u64,

    /// To which block.
    // NOTE(review): `dump_execution_stage` asserts `from < to`, so the range must be
    // non-empty and ordered.
    #[arg(long, short)]
    to: u64,

    /// If passed, it will dry-run a stage execution from the newly created database right after
    /// dumping.
    #[arg(long, short, default_value = "false")]
    dry_run: bool,
}
impl Command {
    /// Execute `dump-stage` command
    ///
    /// Opens the source database read-write, then dispatches to the dump routine
    /// for the selected stage.
    pub async fn execute(&self) -> eyre::Result<()> {
        // Make sure the source database directory exists before opening the env.
        std::fs::create_dir_all(&self.db)?;

        // TODO: Auto-impl for Database trait
        let db = reth_db::mdbx::Env::<reth_db::mdbx::WriteMap>::open(
            self.db.as_ref(),
            reth_db::mdbx::EnvKind::RW,
        )?;
        let mut tool = DbTool::new(&db)?;

        match &self.command {
            Stages::Execution(cmd) => {
                dump_execution_stage(&mut tool, cmd.from, cmd.to, &cmd.output_db, cmd.dry_run)
                    .await?
            }
        }

        Ok(())
    }
}
async fn dump_execution_stage<DB: Database>(
db_tool: &mut DbTool<'_, DB>,
from: u64,
to: u64,
output_db: &PlatformPath<DbPath>,
dry_run: bool,
) -> Result<()> {
assert!(from < to, "FROM block should be bigger than TO block.");
info!(target: "reth::cli", "Creating separate db at {}", output_db);
let output_db = init_db(output_db)?;
// Copy input tables. We're not sharing the transaction in case the memory grows too much.
output_db.update(|tx| {
tx.import_table_with_range::<tables::CanonicalHeaders, _>(&db_tool.db.tx()?, Some(from), to)
})??;
output_db.update(|tx| {
tx.import_table_with_range::<tables::HeaderTD, _>(&db_tool.db.tx()?, Some(from), to)
})??;
output_db.update(|tx| {
tx.import_table_with_range::<tables::Headers, _>(&db_tool.db.tx()?, Some(from), to)
})??;
output_db.update(|tx| {
tx.import_table_with_range::<tables::BlockBodies, _>(&db_tool.db.tx()?, Some(from), to)
})??;
output_db.update(|tx| {
tx.import_table_with_range::<tables::BlockOmmers, _>(&db_tool.db.tx()?, Some(from), to)
})??;
output_db.update(|tx| {
tx.import_table_with_range::<tables::BlockTransitionIndex, _>(
&db_tool.db.tx()?,
Some(from - 1),
to + 1,
)
})??;
// Find range of transactions that need to be copied over
let (from_tx, to_tx) = db_tool.db.view(|read_tx| {
let mut read_cursor = read_tx.cursor_read::<tables::BlockBodies>()?;
let (_, from_block) =
read_cursor.seek(from)?.ok_or(eyre::eyre!("BlockBody {from} does not exist."))?;
let (_, to_block) =
read_cursor.seek(to)?.ok_or(eyre::eyre!("BlockBody {to} does not exist."))?;
Ok::<(u64, u64), eyre::ErrReport>((
from_block.start_tx_id,
to_block.start_tx_id + to_block.tx_count,
))
})??;
output_db.update(|tx| {
tx.import_table_with_range::<tables::Transactions, _>(
&db_tool.db.tx()?,
Some(from_tx),
to_tx,
)
})??;
output_db.update(|tx| {
tx.import_table_with_range::<tables::TxSenders, _>(&db_tool.db.tx()?, Some(from_tx), to_tx)
})??;
// Find the latest block to unwind from
let (tip_block_number, _) = db_tool
.db
.view(|tx| tx.cursor_read::<tables::BlockTransitionIndex>()?.last())??
.expect("some");
// Dry-run an unwind to FROM block, so we can get the PlainStorageState and
// PlainAccountState safely. There might be some state dependency from an address
// which hasn't been changed in the given range.
{
let mut unwind_tx = Transaction::new(db_tool.db)?;
let mut exec_stage = ExecutionStage::default();
exec_stage
.unwind(
&mut unwind_tx,
UnwindInput { unwind_to: from, stage_progress: tip_block_number, bad_block: None },
)
.await?;
let unwind_inner_tx = unwind_tx.deref_mut();
output_db
.update(|tx| tx.import_dupsort::<tables::PlainStorageState, _>(unwind_inner_tx))??;
output_db
.update(|tx| tx.import_table::<tables::PlainAccountState, _>(unwind_inner_tx))??;
output_db.update(|tx| tx.import_table::<tables::Bytecodes, _>(unwind_inner_tx))??;
// We don't want to actually commit these changes to our original database.
unwind_tx.drop()?;
}
// Try to re-execute the stage without committing
if dry_run {
info!(target: "reth::cli", "Executing stage. [dry-run]");
let mut tx = Transaction::new(&output_db)?;
let mut exec_stage = ExecutionStage::default();
exec_stage
.execute(
&mut tx,
reth_stages::ExecInput {
previous_stage: Some((StageId("Another"), to)),
stage_progress: Some(from),
},
)
.await?;
tx.drop()?;
info!(target: "reth::cli", "Success.");
}
Ok(())
}

View File

@ -12,7 +12,6 @@ pub mod chain;
pub mod cli;
pub mod db;
pub mod dirs;
pub mod dump_stage;
pub mod node;
pub mod p2p;
pub mod prometheus_exporter;

View File

@ -332,6 +332,21 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
return Ok(UnwindOutput { stage_progress: input.unwind_to })
}
// get all batches for account change
// Check if walk and walk_dup would do the same thing
let account_changeset_batch = account_changeset
.walk_range(from_transition_rev..to_transition_rev)?
.collect::<Result<Vec<_>, _>>()?;
// revert all changes to PlainState
for (_, changeset) in account_changeset_batch.into_iter().rev() {
if let Some(account_info) = changeset.info {
tx.put::<tables::PlainAccountState>(changeset.address, account_info)?;
} else {
tx.delete::<tables::PlainAccountState>(changeset.address, None)?;
}
}
// get all batches for storage change
let storage_changeset_batch = storage_changeset
.walk_range(
@ -345,34 +360,14 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
for (key, storage) in storage_changeset_batch.into_iter().rev() {
let address = key.address();
if let Some(v) = plain_storage_cursor.seek_by_key_subkey(address, storage.key)? {
if v.key == storage.key {
plain_storage_cursor.delete_current()?;
}
if plain_storage_cursor.seek_by_key_subkey(address, storage.key)?.is_some() {
plain_storage_cursor.delete_current()?;
}
if storage.value != U256::ZERO {
plain_storage_cursor.upsert(address, storage)?;
}
}
// Get all batches for account change
// Check if walk and walk_dup would do the same thing
let account_changeset_batch = account_changeset
.walk_range(from_transition_rev..to_transition_rev)?
.collect::<Result<Vec<_>, _>>()?;
// revert all changes to PlainState
// Needs to happen after the storage unwind, so we don't end up inserting a storage value
// into a deleted key (eg. contract creation)
for (_, changeset) in account_changeset_batch.into_iter().rev() {
if let Some(account_info) = changeset.info {
tx.put::<tables::PlainAccountState>(changeset.address, account_info)?;
} else {
tx.delete::<tables::PlainAccountState>(changeset.address, None)?;
tx.delete::<tables::PlainStorageState>(changeset.address, None)?;
}
}
// Discard unwinded changesets
let mut rev_acc_changeset_walker = account_changeset.walk_back(None)?;
while let Some((transition_id, _)) = rev_acc_changeset_walker.next().transpose()? {

View File

@ -1,6 +1,5 @@
use crate::{
common::{Bounds, Sealed},
table::TableImporter,
transaction::{DbTx, DbTxMut},
Error,
};
@ -14,7 +13,7 @@ pub trait DatabaseGAT<'a, __ImplicitBounds: Sealed = Bounds<&'a Self>>: Send + S
/// RO database transaction
type TX: DbTx<'a> + Send + Sync;
/// RW database transaction
type TXMut: DbTxMut<'a> + DbTx<'a> + TableImporter<'a> + Send + Sync;
type TXMut: DbTxMut<'a> + DbTx<'a> + Send + Sync;
}
/// Main Database trait that spawns transactions to be executed.

View File

@ -8,7 +8,7 @@ use crate::{
ReverseWalker, Walker,
},
database::{Database, DatabaseGAT},
table::{DupSort, Table, TableImporter},
table::{DupSort, Table},
transaction::{DbTx, DbTxGAT, DbTxMut, DbTxMutGAT},
Error,
};
@ -63,10 +63,6 @@ impl<'a> DbTx<'a> for TxMock {
todo!()
}
fn drop(self) {
todo!()
}
fn cursor_read<T: Table>(&self) -> Result<<Self as DbTxGAT<'_>>::Cursor<T>, Error> {
todo!()
}
@ -100,8 +96,6 @@ impl<'a> DbTxMut<'a> for TxMock {
}
}
impl<'a> TableImporter<'a> for TxMock {}
/// Cursor that iterates over table
pub struct CursorMock {
_cursor: u32,

View File

@ -1,8 +1,4 @@
use crate::{
abstraction::cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
transaction::{DbTx, DbTxMut},
Error,
};
use crate::Error;
use bytes::Bytes;
use serde::Serialize;
use std::{
@ -80,57 +76,3 @@ pub trait DupSort: Table {
/// Upstream docs: <https://libmdbx.dqdkfa.ru/usage.html#autotoc_md48>
type SubKey: Key;
}
/// Allows duplicating tables across databases
///
/// Blanket helpers for copying table contents from a read transaction (`source_tx`)
/// into `self` (a write transaction). All methods use cursor `append`, which
/// presumably requires rows to arrive in key order — walks over the source provide
/// that ordering. NOTE(review): confirm `append` ordering requirements against the
/// cursor implementation.
pub trait TableImporter<'tx>: for<'a> DbTxMut<'a> {
    /// Imports all table data from another transaction.
    fn import_table<T: Table, R: DbTx<'tx>>(&self, source_tx: &R) -> Result<(), Error> {
        let mut destination_cursor = self.cursor_write::<T>()?;

        // Walk the entire source table from the first key and append each row.
        for kv in source_tx.cursor_read::<T>()?.walk(None)? {
            let (k, v) = kv?;
            destination_cursor.append(k, v)?;
        }

        Ok(())
    }

    /// Imports table data from another transaction within a range.
    ///
    /// Copies rows from `from` (or the table start when `None`) up to and
    /// including `to`. NOTE(review): if `to` is not present as a key, the walk
    /// continues to the end of the table — verify callers always pass existing keys.
    fn import_table_with_range<T: Table, R: DbTx<'tx>>(
        &self,
        source_tx: &R,
        from: Option<<T as Table>::Key>,
        to: <T as Table>::Key,
    ) -> Result<(), Error> {
        let mut destination_cursor = self.cursor_write::<T>()?;
        let mut source_cursor = source_tx.cursor_read::<T>()?;

        for row in source_cursor.walk(from)? {
            let (key, value) = row?;
            // Check before appending so the `to` row itself is still copied.
            let finished = key == to;
            destination_cursor.append(key, value)?;
            if finished {
                break
            }
        }

        Ok(())
    }

    /// Imports all dupsort data from another transaction.
    fn import_dupsort<T: DupSort, R: DbTx<'tx>>(&self, source_tx: &R) -> Result<(), Error> {
        let mut destination_cursor = self.cursor_dup_write::<T>()?;
        let mut cursor = source_tx.cursor_dup_read::<T>()?;

        // Advance one primary key at a time, then copy every duplicate value under it.
        while let Some((k, _)) = cursor.next_no_dup()? {
            for kv in cursor.walk_dup(Some(k), None)? {
                let (k, v) = kv?;

                destination_cursor.append_dup(k, v)?;
            }
        }

        Ok(())
    }
}

View File

@ -39,8 +39,6 @@ pub trait DbTx<'tx>: for<'a> DbTxGAT<'a> {
/// Commit for read only transaction will consume and free transaction and allows
/// freeing of memory pages
fn commit(self) -> Result<bool, Error>;
/// Drops transaction
fn drop(self);
/// Iterate over read only values in table.
fn cursor_read<T: Table>(&self) -> Result<<Self as DbTxGAT<'_>>::Cursor<T>, Error>;
/// Iterate over read only values in dup sorted table.

View File

@ -2,7 +2,7 @@
use super::cursor::Cursor;
use crate::{
table::{Compress, DupSort, Encode, Table, TableImporter},
table::{Compress, DupSort, Encode, Table},
tables::utils::decode_one,
transaction::{DbTx, DbTxGAT, DbTxMut, DbTxMutGAT},
Error,
@ -57,8 +57,6 @@ impl<'a, K: TransactionKind, E: EnvironmentKind> DbTxMutGAT<'a> for Tx<'_, K, E>
type DupCursorMut<T: DupSort> = Cursor<'a, RW, T>;
}
impl<'a, E: EnvironmentKind> TableImporter<'a> for Tx<'_, RW, E> {}
impl<'tx, K: TransactionKind, E: EnvironmentKind> DbTx<'tx> for Tx<'tx, K, E> {
// Iterate over read only values in database.
fn cursor_read<T: Table>(&self) -> Result<<Self as DbTxGAT<'_>>::Cursor<T>, Error> {
@ -77,10 +75,6 @@ impl<'tx, K: TransactionKind, E: EnvironmentKind> DbTx<'tx> for Tx<'tx, K, E> {
result
}
fn drop(self) {
drop(self.inner)
}
fn get<T: Table>(&self, key: T::Key) -> Result<Option<<T as Table>::Value>, Error> {
self.inner
.get(

View File

@ -97,12 +97,6 @@ impl From<(u64, H256)> for BlockNumHash {
}
}
impl From<u64> for BlockNumHash {
fn from(tpl: u64) -> Self {
BlockNumHash((tpl, H256::default()))
}
}
impl Encode for BlockNumHash {
type Encoded = [u8; 40];

View File

@ -88,17 +88,6 @@ where
Ok(success)
}
/// Drops the current inner transaction and open a new one.
pub fn drop(&mut self) -> Result<(), DbError> {
if let Some(tx) = self.tx.take() {
drop(tx);
}
self.tx = Some(self.db.tx_mut()?);
Ok(())
}
/// Open a new inner transaction.
pub fn open(&mut self) -> Result<(), DbError> {
self.tx = Some(self.db.tx_mut()?);