feat: implement table range checksums for reth db checksum (#7623)

Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
This commit is contained in:
Abner Zheng
2024-05-23 05:47:20 +08:00
committed by GitHub
parent bc914a64d9
commit 3eddaf31d0
2 changed files with 74 additions and 9 deletions

View File

@ -1,12 +1,15 @@
use crate::utils::DbTool; use crate::{
use ahash::AHasher; commands::db::get::{maybe_json_value_parser, table_key},
utils::DbTool,
};
use ahash::RandomState;
use clap::Parser; use clap::Parser;
use reth_db::{ use reth_db::{
cursor::DbCursorRO, database::Database, table::Table, transaction::DbTx, DatabaseEnv, RawKey, cursor::DbCursorRO, database::Database, table::Table, transaction::DbTx, DatabaseEnv, RawKey,
RawTable, RawValue, TableViewer, Tables, RawTable, RawValue, TableViewer, Tables,
}; };
use std::{ use std::{
hash::Hasher, hash::{BuildHasher, Hasher},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use tracing::{info, warn}; use tracing::{info, warn};
@ -16,35 +19,81 @@ use tracing::{info, warn};
pub struct Command { pub struct Command {
/// The table name /// The table name
table: Tables, table: Tables,
/// The start of the range to checksum.
#[arg(long, value_parser = maybe_json_value_parser)]
start_key: Option<String>,
/// The end of the range to checksum.
#[arg(long, value_parser = maybe_json_value_parser)]
end_key: Option<String>,
/// The maximum number of records that are queried and used to compute the
/// checksum.
#[arg(long)]
limit: Option<usize>,
} }
impl Command { impl Command {
/// Execute `db checksum` command /// Execute `db checksum` command
pub fn execute(self, tool: &DbTool<DatabaseEnv>) -> eyre::Result<()> { pub fn execute(self, tool: &DbTool<DatabaseEnv>) -> eyre::Result<()> {
warn!("This command should be run without the node running!"); warn!("This command should be run without the node running!");
self.table.view(&ChecksumViewer { tool }) self.table.view(&ChecksumViewer {
tool,
start_key: self.start_key,
end_key: self.end_key,
limit: self.limit,
})
} }
} }
pub(crate) struct ChecksumViewer<'a, DB: Database> { pub(crate) struct ChecksumViewer<'a, DB: Database> {
tool: &'a DbTool<DB>, tool: &'a DbTool<DB>,
start_key: Option<String>,
end_key: Option<String>,
limit: Option<usize>,
} }
impl<DB: Database> ChecksumViewer<'_, DB> { impl<DB: Database> ChecksumViewer<'_, DB> {
pub(crate) fn new(tool: &'_ DbTool<DB>) -> ChecksumViewer<'_, DB> { pub(crate) fn new(tool: &'_ DbTool<DB>) -> ChecksumViewer<'_, DB> {
ChecksumViewer { tool } ChecksumViewer { tool, start_key: None, end_key: None, limit: None }
} }
pub(crate) fn get_checksum<T: Table>(&self) -> Result<(u64, Duration), eyre::Report> { pub(crate) fn get_checksum<T: Table>(&self) -> Result<(u64, Duration), eyre::Report> {
let provider = let provider =
self.tool.provider_factory.provider()?.disable_long_read_transaction_safety(); self.tool.provider_factory.provider()?.disable_long_read_transaction_safety();
let tx = provider.tx_ref(); let tx = provider.tx_ref();
info!(
"Start computing checksum, start={:?}, end={:?}, limit={:?}",
self.start_key, self.end_key, self.limit
);
let mut cursor = tx.cursor_read::<RawTable<T>>()?; let mut cursor = tx.cursor_read::<RawTable<T>>()?;
let walker = cursor.walk(None)?; let walker = match (self.start_key.as_deref(), self.end_key.as_deref()) {
(Some(start), Some(end)) => {
let start_key = table_key::<T>(start).map(RawKey::<T::Key>::new)?;
let end_key = table_key::<T>(end).map(RawKey::<T::Key>::new)?;
cursor.walk_range(start_key..=end_key)?
}
(None, Some(end)) => {
let end_key = table_key::<T>(end).map(RawKey::<T::Key>::new)?;
cursor.walk_range(..=end_key)?
}
(Some(start), None) => {
let start_key = table_key::<T>(start).map(RawKey::<T::Key>::new)?;
cursor.walk_range(start_key..)?
}
(None, None) => cursor.walk_range(..)?,
};
let start_time = Instant::now(); let start_time = Instant::now();
let mut hasher = AHasher::default(); let mut hasher = RandomState::with_seeds(1, 2, 3, 4).build_hasher();
let mut total = 0;
let limit = self.limit.unwrap_or(usize::MAX);
let mut enumerate_start_key = None;
let mut enumerate_end_key = None;
for (index, entry) in walker.enumerate() { for (index, entry) in walker.enumerate() {
let (k, v): (RawKey<T::Key>, RawValue<T::Value>) = entry?; let (k, v): (RawKey<T::Key>, RawValue<T::Value>) = entry?;
@ -54,6 +103,22 @@ impl<DB: Database> ChecksumViewer<'_, DB> {
hasher.write(k.raw_key()); hasher.write(k.raw_key());
hasher.write(v.raw_value()); hasher.write(v.raw_value());
if enumerate_start_key.is_none() {
enumerate_start_key = Some(k.clone());
}
enumerate_end_key = Some(k);
total = index + 1;
if total >= limit {
break
}
}
info!("Hashed {total} entries.");
if let (Some(s), Some(e)) = (enumerate_start_key, enumerate_end_key) {
info!("start-key: {}", serde_json::to_string(&s.key()?).unwrap_or_default());
info!("end-key: {}", serde_json::to_string(&e.key()?).unwrap_or_default());
} }
let checksum = hasher.finish(); let checksum = hasher.finish();

View File

@ -125,7 +125,7 @@ impl Command {
} }
/// Get an instance of key for given table /// Get an instance of key for given table
fn table_key<T: Table>(key: &str) -> Result<T::Key, eyre::Error> { pub(crate) fn table_key<T: Table>(key: &str) -> Result<T::Key, eyre::Error> {
serde_json::from_str::<T::Key>(key).map_err(|e| eyre::eyre!(e)) serde_json::from_str::<T::Key>(key).map_err(|e| eyre::eyre!(e))
} }
@ -188,7 +188,7 @@ impl<DB: Database> TableViewer<()> for GetValueViewer<'_, DB> {
} }
/// Map the user input value to json /// Map the user input value to json
fn maybe_json_value_parser(value: &str) -> Result<String, eyre::Error> { pub(crate) fn maybe_json_value_parser(value: &str) -> Result<String, eyre::Error> {
if serde_json::from_str::<serde::de::IgnoredAny>(value).is_ok() { if serde_json::from_str::<serde::de::IgnoredAny>(value).is_ok() {
Ok(value.to_string()) Ok(value.to_string())
} else { } else {