From a53af3a0f28a1cfeceb1d5dd5bc5440fd6f2ae69 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Wed, 28 Jun 2023 22:03:00 +0100 Subject: [PATCH] refactor: move `init_db` to `reth_db` and add `DatabaseEnv` (#3450) --- Cargo.lock | 2 + bin/reth/src/chain/import.rs | 4 +- bin/reth/src/chain/init.rs | 3 +- bin/reth/src/db/mod.rs | 4 +- bin/reth/src/debug_cmd/execution.rs | 9 +- bin/reth/src/debug_cmd/merkle.rs | 3 +- bin/reth/src/node/mod.rs | 15 ++-- bin/reth/src/prometheus_exporter.rs | 8 +- bin/reth/src/stage/drop.rs | 7 +- bin/reth/src/stage/dump/mod.rs | 4 +- bin/reth/src/stage/run.rs | 2 +- crates/blockchain-tree/src/blockchain_tree.rs | 7 +- crates/consensus/beacon/src/engine/mod.rs | 10 +-- .../interfaces/src/blockchain_tree/error.rs | 1 + crates/interfaces/src/error.rs | 3 + .../net/downloaders/src/bodies/test_utils.rs | 3 +- crates/staged-sync/src/utils/init.rs | 76 ++-------------- crates/stages/benches/criterion.rs | 6 +- crates/stages/benches/setup/mod.rs | 6 +- crates/stages/src/stage.rs | 6 +- crates/stages/src/stages/bodies.rs | 6 +- crates/stages/src/stages/execution.rs | 5 +- crates/stages/src/stages/sender_recovery.rs | 13 ++- crates/stages/src/stages/tx_lookup.rs | 8 +- crates/stages/src/test_utils/runner.rs | 7 +- crates/stages/src/test_utils/test_db.rs | 12 +-- crates/storage/db/Cargo.toml | 1 + crates/storage/db/benches/hash_keys.rs | 14 +-- crates/storage/db/benches/utils.rs | 3 +- crates/storage/db/src/lib.rs | 86 +++++++++++++++++++ crates/storage/provider/Cargo.toml | 1 + crates/storage/provider/src/post_state/mod.rs | 13 +-- .../provider/src/providers/database/mod.rs | 40 ++++++++- .../src/providers/database/provider.rs | 14 +-- crates/trie/src/trie.rs | 7 +- 35 files changed, 234 insertions(+), 175 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a17d38d14..06036c4a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5108,6 +5108,7 @@ dependencies = [ "bytes", "criterion", "derive_more", + "eyre", "futures", "heapless", "iai", @@ -5539,6 +5540,7 @@ dependencies = [ "reth-revm-primitives", "reth-rlp", "reth-trie", + "tempfile", "tokio", "tokio-stream", "tracing", diff --git a/bin/reth/src/chain/import.rs b/bin/reth/src/chain/import.rs index 6a1a79e69..ef430a81d 100644 --- a/bin/reth/src/chain/import.rs +++ b/bin/reth/src/chain/import.rs @@ -11,14 +11,14 @@ use reth_provider::{ProviderFactory, StageCheckpointReader}; use crate::args::utils::genesis_value_parser; use reth_config::Config; -use reth_db::database::Database; +use reth_db::{database::Database, init_db}; use reth_downloaders::{ bodies::bodies::BodiesDownloaderBuilder, headers::reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::FileClient, }; use reth_interfaces::consensus::Consensus; use reth_primitives::{stage::StageId, ChainSpec, H256}; -use reth_staged_sync::utils::init::{init_db, init_genesis}; +use reth_staged_sync::utils::init::init_genesis; use reth_stages::{ prelude::*, stages::{ diff --git a/bin/reth/src/chain/init.rs b/bin/reth/src/chain/init.rs index 9be1ae163..1a920f780 100644 --- a/bin/reth/src/chain/init.rs +++ b/bin/reth/src/chain/init.rs @@ -3,8 +3,9 @@ use crate::{ dirs::{DataDirPath, MaybePlatformPath}, }; use clap::Parser; +use reth_db::init_db; use reth_primitives::ChainSpec; -use reth_staged_sync::utils::init::{init_db, init_genesis}; +use reth_staged_sync::utils::init::init_genesis; use std::sync::Arc; use tracing::info; diff --git a/bin/reth/src/db/mod.rs b/bin/reth/src/db/mod.rs index 8940a1b54..d44a977ab 100644 --- a/bin/reth/src/db/mod.rs +++ b/bin/reth/src/db/mod.rs @@ -12,7 +12,7 @@ use reth_db::{ database::Database, mdbx::{Env, NoWriteMap, WriteMap}, version::{get_db_version, DatabaseVersionError, DB_VERSION}, - Tables, + DatabaseEnv, Tables, }; use reth_primitives::ChainSpec; use std::{path::Path, sync::Arc}; @@ -178,7 +178,7 @@ fn read_only_db(path: &Path) -> eyre::Result> { .with_context(|| format!("Could not open database at path: {}", path.display())) } -fn read_write_db(path: &Path) -> eyre::Result> { +fn read_write_db(path: &Path) -> eyre::Result { Env::::open(path, reth_db::mdbx::EnvKind::RW) .with_context(|| format!("Could not open database at path: {}", path.display())) } diff --git a/bin/reth/src/debug_cmd/execution.rs b/bin/reth/src/debug_cmd/execution.rs index 76fef4a28..fa3b8502a 100644 --- a/bin/reth/src/debug_cmd/execution.rs +++ b/bin/reth/src/debug_cmd/execution.rs @@ -10,10 +10,7 @@ use clap::Parser; use futures::{stream::select as stream_select, StreamExt}; use reth_beacon_consensus::BeaconConsensus; use reth_config::Config; -use reth_db::{ - database::Database, - mdbx::{Env, WriteMap}, -}; +use reth_db::{database::Database, init_db, DatabaseEnv}; use reth_discv4::DEFAULT_DISCOVERY_PORT; use reth_downloaders::{ bodies::bodies::BodiesDownloaderBuilder, @@ -27,7 +24,7 @@ use reth_network::NetworkHandle; use reth_network_api::NetworkInfo; use reth_primitives::{stage::StageId, BlockHashOrNumber, BlockNumber, ChainSpec, H256}; use reth_provider::{BlockExecutionWriter, ProviderFactory, StageCheckpointReader}; -use reth_staged_sync::utils::init::{init_db, init_genesis}; +use reth_staged_sync::utils::init::init_genesis; use reth_stages::{ sets::DefaultStages, stages::{ @@ -153,7 +150,7 @@ impl Command { &self, config: &Config, task_executor: TaskExecutor, - db: Arc>, + db: Arc, network_secret_path: PathBuf, default_peers_path: PathBuf, ) -> eyre::Result { diff --git a/bin/reth/src/debug_cmd/merkle.rs b/bin/reth/src/debug_cmd/merkle.rs index d1aaf291d..9b186506b 100644 --- a/bin/reth/src/debug_cmd/merkle.rs +++ b/bin/reth/src/debug_cmd/merkle.rs @@ -4,13 +4,12 @@ use crate::{ dirs::{DataDirPath, MaybePlatformPath}, }; use clap::Parser; -use reth_db::{cursor::DbCursorRO, tables, transaction::DbTx}; +use reth_db::{cursor::DbCursorRO, init_db, tables, transaction::DbTx}; use reth_primitives::{ stage::{StageCheckpoint, StageId}, ChainSpec, }; use reth_provider::{ProviderFactory, StageCheckpointReader}; -use reth_staged_sync::utils::init::init_db; use reth_stages::{ stages::{ AccountHashingStage, ExecutionStage, ExecutionStageThresholds, MerkleStage, diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index d7436e152..aaecdda51 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -20,10 +20,7 @@ use reth_blockchain_tree::{ config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree, }; use reth_config::Config; -use reth_db::{ - database::Database, - mdbx::{Env, WriteMap}, -}; +use reth_db::{database::Database, init_db, DatabaseEnv}; use reth_discv4::DEFAULT_DISCOVERY_PORT; use reth_downloaders::{ bodies::bodies::BodiesDownloaderBuilder, @@ -47,7 +44,7 @@ use reth_provider::{ use reth_revm::Factory; use reth_revm_inspectors::stack::Hook; use reth_rpc_engine_api::EngineApi; -use reth_staged_sync::utils::init::{init_db, init_genesis}; +use reth_staged_sync::utils::init::init_genesis; use reth_stages::{ prelude::*, stages::{ @@ -478,7 +475,7 @@ impl Command { } } - async fn start_metrics_endpoint(&self, db: Arc>) -> eyre::Result<()> { + async fn start_metrics_endpoint(&self, db: Arc) -> eyre::Result<()> { if let Some(listen_addr) = self.metrics { info!(target: "reth::cli", addr = %listen_addr, "Starting metrics endpoint"); prometheus_exporter::initialize(listen_addr, db, metrics_process::Collector::default()) @@ -519,7 +516,7 @@ impl Command { Ok(handle) } - fn lookup_head(&self, db: Arc>) -> Result { + fn lookup_head(&self, db: Arc) -> Result { let factory = ProviderFactory::new(db, self.chain.clone()); let provider = factory.provider()?; @@ -604,12 +601,12 @@ impl Command { fn load_network_config( &self, config: &Config, - db: Arc>, + db: Arc, executor: TaskExecutor, head: Head, secret_key: SecretKey, default_peers_path: PathBuf, - ) -> NetworkConfig>>> { + ) -> NetworkConfig>> { self.network .network_config(config, self.chain.clone(), secret_key, default_peers_path) .with_task_executor(Box::new(executor)) diff --git a/bin/reth/src/prometheus_exporter.rs b/bin/reth/src/prometheus_exporter.rs index 2dba64b12..f5635c88d 100644 --- a/bin/reth/src/prometheus_exporter.rs +++ b/bin/reth/src/prometheus_exporter.rs @@ -6,11 +6,7 @@ use hyper::{ }; use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; use metrics_util::layers::{PrefixLayer, Stack}; -use reth_db::{ - database::Database, - mdbx::{Env, WriteMap}, - tables, -}; +use reth_db::{database::Database, tables, DatabaseEnv}; use reth_metrics::metrics::{self, absolute_counter, describe_counter, Unit}; use std::{convert::Infallible, net::SocketAddr, sync::Arc}; @@ -73,7 +69,7 @@ async fn start_endpoint( /// metrics. pub(crate) async fn initialize( listen_addr: SocketAddr, - db: Arc>, + db: Arc, process: metrics_process::Collector, ) -> eyre::Result<()> { let db_stats = move || { diff --git a/bin/reth/src/stage/drop.rs b/bin/reth/src/stage/drop.rs index 8b4bf2836..4c2ed1585 100644 --- a/bin/reth/src/stage/drop.rs +++ b/bin/reth/src/stage/drop.rs @@ -10,6 +10,7 @@ use reth_db::{ mdbx::{Env, WriteMap}, tables, transaction::DbTxMut, + DatabaseEnv, }; use reth_primitives::{stage::StageId, ChainSpec}; use reth_staged_sync::utils::init::{insert_genesis_header, insert_genesis_state}; @@ -70,7 +71,7 @@ impl Command { tx.clear::()?; tx.clear::()?; tx.put::(StageId::Bodies.to_string(), Default::default())?; - insert_genesis_header::>(tx, self.chain)?; + insert_genesis_header::(tx, self.chain)?; } StageEnum::Senders => { tx.clear::()?; @@ -90,7 +91,7 @@ impl Command { StageId::Execution.to_string(), Default::default(), )?; - insert_genesis_state::>(tx, self.chain.genesis())?; + insert_genesis_state::(tx, self.chain.genesis())?; } StageEnum::AccountHashing => { tx.clear::()?; @@ -155,7 +156,7 @@ impl Command { StageId::TotalDifficulty.to_string(), Default::default(), )?; - insert_genesis_header::>(tx, self.chain)?; + insert_genesis_header::(tx, self.chain)?; } _ => { info!("Nothing to do for stage {:?}", self.stage); diff --git a/bin/reth/src/stage/dump/mod.rs b/bin/reth/src/stage/dump/mod.rs index 4ceffba6f..38add81da 100644 --- a/bin/reth/src/stage/dump/mod.rs +++ b/bin/reth/src/stage/dump/mod.rs @@ -5,10 +5,10 @@ use crate::{ }; use clap::Parser; use reth_db::{ - cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx, + cursor::DbCursorRO, database::Database, init_db, table::TableImporter, tables, + transaction::DbTx, }; use reth_primitives::ChainSpec; -use reth_staged_sync::utils::init::init_db; use std::{path::PathBuf, sync::Arc}; use tracing::info; diff --git a/bin/reth/src/stage/run.rs b/bin/reth/src/stage/run.rs index 02d90514e..95760bd72 100644 --- a/bin/reth/src/stage/run.rs +++ b/bin/reth/src/stage/run.rs @@ -10,10 +10,10 @@ use crate::{ use clap::Parser; use reth_beacon_consensus::BeaconConsensus; use reth_config::Config; +use reth_db::init_db; use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; use reth_primitives::ChainSpec; use reth_provider::{ProviderFactory, StageCheckpointReader}; -use reth_staged_sync::utils::init::init_db; use reth_stages::{ stages::{ AccountHashingStage, BodyStage, ExecutionStage, ExecutionStageThresholds, diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs index 6b52ec098..d951939f4 100644 --- a/crates/blockchain-tree/src/blockchain_tree.rs +++ b/crates/blockchain-tree/src/blockchain_tree.rs @@ -1085,10 +1085,7 @@ mod tests { use crate::block_buffer::BufferedBlocks; use assert_matches::assert_matches; use linked_hash_set::LinkedHashSet; - use reth_db::{ - mdbx::{test_utils::create_test_rw_db, Env, WriteMap}, - transaction::DbTxMut, - }; + use reth_db::{mdbx::test_utils::create_test_rw_db, transaction::DbTxMut, DatabaseEnv}; use reth_interfaces::test_utils::TestConsensus; use reth_primitives::{ proofs::EMPTY_ROOT, stage::StageCheckpoint, ChainSpecBuilder, H256, MAINNET, @@ -1102,7 +1099,7 @@ mod tests { fn setup_externals( exec_res: Vec, - ) -> TreeExternals>, Arc, TestExecutorFactory> { + ) -> TreeExternals, Arc, TestExecutorFactory> { let db = create_test_rw_db(); let consensus = Arc::new(TestConsensus::default()); let chain_spec = Arc::new( diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 70e9fc007..96fc9ca0c 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -1361,7 +1361,7 @@ mod tests { config::BlockchainTreeConfig, externals::TreeExternals, post_state::PostState, BlockchainTree, ShareableBlockchainTree, }; - use reth_db::mdbx::{test_utils::create_test_rw_db, Env, WriteMap}; + use reth_db::{mdbx::test_utils::create_test_rw_db, DatabaseEnv}; use reth_interfaces::{ sync::NoopSyncStateUpdater, test_utils::{NoopFullBlockClient, TestConsensus}, @@ -1381,10 +1381,10 @@ mod tests { }; type TestBeaconConsensusEngine = BeaconConsensusEngine< - Arc>, + Arc, BlockchainProvider< - Arc>, - ShareableBlockchainTree>, TestConsensus, TestExecutorFactory>, + Arc, + ShareableBlockchainTree, TestConsensus, TestExecutorFactory>, >, NoopFullBlockClient, >; @@ -1498,7 +1498,7 @@ mod tests { } /// Builds the test consensus engine into a `TestConsensusEngine` and `TestEnv`. - fn build(self) -> (TestBeaconConsensusEngine, TestEnv>>) { + fn build(self) -> (TestBeaconConsensusEngine, TestEnv>) { reth_tracing::init_test_tracing(); let db = create_test_rw_db(); let consensus = TestConsensus::default(); diff --git a/crates/interfaces/src/blockchain_tree/error.rs b/crates/interfaces/src/blockchain_tree/error.rs index c8913911c..edcdb569b 100644 --- a/crates/interfaces/src/blockchain_tree/error.rs +++ b/crates/interfaces/src/blockchain_tree/error.rs @@ -272,6 +272,7 @@ impl From for InsertBlockErrorKind { Error::Database(err) => InsertBlockErrorKind::Internal(Box::new(err)), Error::Provider(err) => InsertBlockErrorKind::Internal(Box::new(err)), Error::Network(err) => InsertBlockErrorKind::Internal(Box::new(err)), + Error::Custom(err) => InsertBlockErrorKind::Internal(err.into()), } } } diff --git a/crates/interfaces/src/error.rs b/crates/interfaces/src/error.rs index 7e6e2d618..b8b72fc92 100644 --- a/crates/interfaces/src/error.rs +++ b/crates/interfaces/src/error.rs @@ -19,4 +19,7 @@ pub enum Error { #[error(transparent)] Network(#[from] reth_network_api::NetworkError), + + #[error("{0}")] + Custom(std::string::String), } diff --git a/crates/net/downloaders/src/bodies/test_utils.rs b/crates/net/downloaders/src/bodies/test_utils.rs index 1a77745e4..6aefe2e06 100644 --- a/crates/net/downloaders/src/bodies/test_utils.rs +++ b/crates/net/downloaders/src/bodies/test_utils.rs @@ -5,6 +5,7 @@ use reth_db::{ mdbx::{Env, WriteMap}, tables, transaction::DbTxMut, + DatabaseEnv, }; use reth_interfaces::{db, p2p::bodies::response::BlockResponse}; use reth_primitives::{Block, BlockBody, SealedBlock, SealedHeader, H256}; @@ -46,7 +47,7 @@ pub(crate) fn create_raw_bodies<'a>( } #[inline] -pub(crate) fn insert_headers(db: &Env, headers: &[SealedHeader]) { +pub(crate) fn insert_headers(db: &DatabaseEnv, headers: &[SealedHeader]) { db.update(|tx| -> Result<(), db::DatabaseError> { for header in headers { tx.put::(header.number, header.hash())?; diff --git a/crates/staged-sync/src/utils/init.rs b/crates/staged-sync/src/utils/init.rs index aadf2d6d9..b80554e14 100644 --- a/crates/staged-sync/src/utils/init.rs +++ b/crates/staged-sync/src/utils/init.rs @@ -1,40 +1,14 @@ -use eyre::WrapErr; use reth_db::{ cursor::DbCursorRO, database::{Database, DatabaseGAT}, - is_database_empty, - mdbx::{Env, WriteMap}, tables, transaction::{DbTx, DbTxMut}, - version::{check_db_version_file, create_db_version_file, DatabaseVersionError}, }; use reth_primitives::{stage::StageId, Account, Bytecode, ChainSpec, StorageEntry, H256, U256}; use reth_provider::{DatabaseProviderRW, HashingWriter, HistoryWriter, PostState, ProviderFactory}; -use std::{collections::BTreeMap, fs, path::Path, sync::Arc}; +use std::{collections::BTreeMap, sync::Arc}; use tracing::debug; -/// Opens up an existing database or creates a new one at the specified path. -pub fn init_db>(path: P) -> eyre::Result> { - if is_database_empty(&path) { - fs::create_dir_all(&path).wrap_err_with(|| { - format!("Could not create database directory {}", path.as_ref().display()) - })?; - create_db_version_file(&path)?; - } else { - match check_db_version_file(&path) { - Ok(_) => (), - Err(DatabaseVersionError::MissingFile) => create_db_version_file(&path)?, - Err(err) => return Err(err.into()), - } - } - - let db = Env::::open(path.as_ref(), reth_db::mdbx::EnvKind::RW)?; - - db.create_tables()?; - - Ok(db) -} - /// Database initialization error type. #[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)] pub enum InitDatabaseError { @@ -202,19 +176,18 @@ pub fn insert_genesis_header( #[cfg(test)] mod tests { use super::*; - use assert_matches::assert_matches; + use reth_db::{ mdbx::test_utils::create_test_rw_db, models::{storage_sharded_key::StorageShardedKey, ShardedKey}, table::Table, - version::db_version_file_path, + DatabaseEnv, }; use reth_primitives::{ Address, Chain, ForkTimestamps, Genesis, GenesisAccount, IntegerList, GOERLI, GOERLI_GENESIS, MAINNET, MAINNET_GENESIS, SEPOLIA, SEPOLIA_GENESIS, }; use std::collections::HashMap; - use tempfile::tempdir; fn collect_table_entries( tx: &>::TX, @@ -305,7 +278,7 @@ mod tests { let tx = db.tx().expect("failed to init tx"); assert_eq!( - collect_table_entries::>, tables::AccountHistory>(&tx) + collect_table_entries::, tables::AccountHistory>(&tx) .expect("failed to collect"), vec![ (ShardedKey::new(address_with_balance, u64::MAX), IntegerList::new([0]).unwrap()), @@ -314,7 +287,7 @@ mod tests { ); assert_eq!( - collect_table_entries::>, tables::StorageHistory>(&tx) + collect_table_entries::, tables::StorageHistory>(&tx) .expect("failed to collect"), vec![( StorageShardedKey::new(address_with_storage, storage_key, u64::MAX), @@ -322,43 +295,4 @@ mod tests { )], ); } - - #[test] - fn db_version() { - let path = tempdir().unwrap(); - - // Database is empty - { - let db = init_db(&path); - assert_matches!(db, Ok(_)); - } - - // Database is not empty, current version is the same as in the file - { - let db = init_db(&path); - assert_matches!(db, Ok(_)); - } - - // Database is not empty, version file is malformed - { - fs::write(path.path().join(db_version_file_path(&path)), "invalid-version").unwrap(); - let db = init_db(&path); - assert!(db.is_err()); - assert_matches!( - db.unwrap_err().downcast_ref::(), - Some(DatabaseVersionError::MalformedFile) - ) - } - - // Database is not empty, version file contains not matching version - { - fs::write(path.path().join(db_version_file_path(&path)), "0").unwrap(); - let db = init_db(&path); - assert!(db.is_err()); - assert_matches!( - db.unwrap_err().downcast_ref::(), - Some(DatabaseVersionError::VersionMismatch { version: 0 }) - ) - } - } } diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index 2e9fca48d..8fce2e370 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -3,7 +3,7 @@ use criterion::{ BenchmarkGroup, Criterion, }; use pprof::criterion::{Output, PProfProfiler}; -use reth_db::mdbx::{Env, WriteMap}; +use reth_db::DatabaseEnv; use reth_interfaces::test_utils::TestConsensus; use reth_primitives::{stage::StageCheckpoint, MAINNET}; use reth_provider::ProviderFactory; @@ -122,7 +122,7 @@ fn measure_stage_with_path( stage_range: StageRange, label: String, ) where - S: Clone + Stage>, + S: Clone + Stage, F: Fn(S, &TestTransaction, StageRange), { let tx = TestTransaction::new(&path); @@ -152,7 +152,7 @@ fn measure_stage( block_interval: std::ops::Range, label: String, ) where - S: Clone + Stage>, + S: Clone + Stage, F: Fn(S, &TestTransaction, StageRange), { let path = setup::txs_testdata(block_interval.end); diff --git a/crates/stages/benches/setup/mod.rs b/crates/stages/benches/setup/mod.rs index f3af8b075..0c2a6dc19 100644 --- a/crates/stages/benches/setup/mod.rs +++ b/crates/stages/benches/setup/mod.rs @@ -1,9 +1,9 @@ use itertools::concat; use reth_db::{ cursor::DbCursorRO, - mdbx::{Env, WriteMap}, tables, transaction::{DbTx, DbTxMut}, + DatabaseEnv, }; use reth_interfaces::test_utils::{ generators, @@ -32,7 +32,7 @@ pub use account_hashing::*; pub(crate) type StageRange = (ExecInput, UnwindInput); -pub(crate) fn stage_unwind>>( +pub(crate) fn stage_unwind>( stage: S, tx: &TestTransaction, range: StageRange, @@ -60,7 +60,7 @@ pub(crate) fn stage_unwind>>( }); } -pub(crate) fn unwind_hashes>>( +pub(crate) fn unwind_hashes>( stage: S, tx: &TestTransaction, range: StageRange, diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index 09519d34b..7580c6bab 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -5,7 +5,7 @@ use reth_primitives::{ stage::{StageCheckpoint, StageId}, BlockNumber, TxNumber, }; -use reth_provider::DatabaseProviderRW; +use reth_provider::{BlockReader, DatabaseProviderRW, ProviderError}; use std::{ cmp::{max, min}, ops::RangeInclusive, @@ -79,7 +79,9 @@ impl ExecInput { tx_threshold: u64, ) -> Result<(RangeInclusive, RangeInclusive, bool), StageError> { let start_block = self.next_block(); - let start_block_body = provider.block_body_indices(start_block)?; + let start_block_body = provider + .block_body_indices(start_block)? + .ok_or(ProviderError::BlockBodyIndicesNotFound(start_block))?; let target_block = self.target(); diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index 14f6302d8..e1a003dce 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -456,10 +456,10 @@ mod tests { use reth_db::{ cursor::DbCursorRO, database::Database, - mdbx::{Env, WriteMap}, models::{StoredBlockBodyIndices, StoredBlockOmmers}, tables, transaction::{DbTx, DbTxMut}, + DatabaseEnv, }; use reth_interfaces::{ p2p::{ @@ -740,7 +740,7 @@ mod tests { /// A [BodyDownloader] that is backed by an internal [HashMap] for testing. #[derive(Debug)] pub(crate) struct TestBodyDownloader { - db: Arc>, + db: Arc, responses: HashMap, headers: VecDeque, batch_size: u64, @@ -748,7 +748,7 @@ mod tests { impl TestBodyDownloader { pub(crate) fn new( - db: Arc>, + db: Arc, responses: HashMap, batch_size: u64, ) -> Self { diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 68038cc73..06440410c 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -353,7 +353,10 @@ impl Stage for ExecutionStage { } // Look up the start index for the transaction range - let first_tx_num = provider.block_body_indices(*range.start())?.first_tx_num(); + let first_tx_num = provider + .block_body_indices(*range.start())? + .ok_or(ProviderError::BlockBodyIndicesNotFound(*range.start()))? + .first_tx_num(); let mut stage_checkpoint = input.checkpoint.execution_stage_checkpoint(); diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 26def0568..482ed1eb4 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -13,7 +13,7 @@ use reth_primitives::{ stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, TransactionSignedNoHash, TxNumber, H160, }; -use reth_provider::{DatabaseProviderRW, HeaderProvider, ProviderError}; +use reth_provider::{BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError}; use std::fmt::Debug; use thiserror::Error; use tokio::sync::mpsc; @@ -173,7 +173,10 @@ impl Stage for SenderRecoveryStage { let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold); // Lookup latest tx id that we should unwind to - let latest_tx_id = provider.block_body_indices(unwind_to)?.last_tx_num(); + let latest_tx_id = provider + .block_body_indices(unwind_to)? + .ok_or(ProviderError::BlockBodyIndicesNotFound(unwind_to))? + .last_tx_num(); provider.unwind_table_by_num::(latest_tx_id)?; Ok(UnwindOutput { @@ -386,7 +389,11 @@ mod tests { /// 2. If the is no requested block entry in the bodies table, but [tables::TxSenders] is /// not empty. fn ensure_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> { - let body_result = self.tx.inner_rw().block_body_indices(block); + let body_result = self + .tx + .inner_rw() + .block_body_indices(block)? + .ok_or(ProviderError::BlockBodyIndicesNotFound(block)); match body_result { Ok(body) => self .tx diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 2d32db03d..f572a7f27 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -203,7 +203,7 @@ mod tests { generators::{random_block, random_block_range}, }; use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, H256}; - use reth_provider::TransactionsProvider; + use reth_provider::{BlockReader, ProviderError, TransactionsProvider}; // Implement stage test suite. stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup); @@ -345,7 +345,11 @@ mod tests { /// 2. If the is no requested block entry in the bodies table, but [tables::TxHashNumber] is /// not empty. fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> { - let body_result = self.tx.inner_rw().block_body_indices(number); + let body_result = self + .tx + .inner_rw() + .block_body_indices(number)? + .ok_or(ProviderError::BlockBodyIndicesNotFound(number)); match body_result { Ok(body) => self.tx.ensure_no_entry_above_by_value::( body.last_tx_num(), diff --git a/crates/stages/src/test_utils/runner.rs b/crates/stages/src/test_utils/runner.rs index 83d97a038..e726948e1 100644 --- a/crates/stages/src/test_utils/runner.rs +++ b/crates/stages/src/test_utils/runner.rs @@ -1,6 +1,9 @@ use super::TestTransaction; use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; -use reth_db::mdbx::{Env, WriteMap}; +use reth_db::{ + mdbx::{Env, WriteMap}, + DatabaseEnv, +}; use reth_primitives::MAINNET; use reth_provider::ProviderFactory; use std::{borrow::Borrow, sync::Arc}; @@ -19,7 +22,7 @@ pub(crate) enum TestRunnerError { /// A generic test runner for stages. #[async_trait::async_trait] pub(crate) trait StageTestRunner { - type S: Stage> + 'static; + type S: Stage + 'static; /// Return a reference to the database. fn tx(&self) -> &TestTransaction; diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 6d4a62595..ef2c04c5a 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -10,7 +10,7 @@ use reth_db::{ table::Table, tables, transaction::{DbTx, DbTxMut}, - DatabaseError as DbError, + DatabaseEnv, DatabaseError as DbError, }; use reth_primitives::{ keccak256, Account, Address, BlockNumber, SealedBlock, SealedHeader, StorageEntry, H256, @@ -35,9 +35,9 @@ use std::{ #[derive(Debug)] pub struct TestTransaction { /// WriteMap DB - pub tx: Arc>, + pub tx: Arc, pub path: Option, - pub factory: ProviderFactory>>, + pub factory: ProviderFactory>, } impl Default for TestTransaction { @@ -59,17 +59,17 @@ impl TestTransaction { } /// Return a database wrapped in [DatabaseProviderRW]. - pub fn inner_rw(&self) -> DatabaseProviderRW<'_, Arc>> { + pub fn inner_rw(&self) -> DatabaseProviderRW<'_, Arc> { self.factory.provider_rw().expect("failed to create db container") } /// Return a database wrapped in [DatabaseProviderRO]. - pub fn inner(&self) -> DatabaseProviderRO<'_, Arc>> { + pub fn inner(&self) -> DatabaseProviderRO<'_, Arc> { self.factory.provider().expect("failed to create db container") } /// Get a pointer to an internal database. - pub fn inner_raw(&self) -> Arc> { + pub fn inner_raw(&self) -> Arc { self.tx.clone() } diff --git a/crates/storage/db/Cargo.toml b/crates/storage/db/Cargo.toml index 5254d69bb..5b5987b31 100644 --- a/crates/storage/db/Cargo.toml +++ b/crates/storage/db/Cargo.toml @@ -38,6 +38,7 @@ thiserror = { workspace = true } tempfile = { version = "3.3.0", optional = true } parking_lot = "0.12" derive_more = "0.99" +eyre = "0.6.8" # arbitrary utils arbitrary = { version = "1.1.7", features = ["derive"], optional = true } diff --git a/crates/storage/db/benches/hash_keys.rs b/crates/storage/db/benches/hash_keys.rs index 078ae5bee..7b8fe3dc1 100644 --- a/crates/storage/db/benches/hash_keys.rs +++ b/crates/storage/db/benches/hash_keys.rs @@ -162,10 +162,7 @@ where (preload, input) } -fn append( - db: Env, - input: Vec<(::Key, ::Value)>, -) -> Env +fn append(db: DatabaseEnv, input: Vec<(::Key, ::Value)>) -> DatabaseEnv where T: Table + Default, { @@ -183,10 +180,7 @@ where db } -fn insert( - db: Env, - input: Vec<(::Key, ::Value)>, -) -> Env +fn insert(db: DatabaseEnv, input: Vec<(::Key, ::Value)>) -> DatabaseEnv where T: Table + Default, { @@ -204,7 +198,7 @@ where db } -fn put(db: Env, input: Vec<(::Key, ::Value)>) -> Env +fn put(db: DatabaseEnv, input: Vec<(::Key, ::Value)>) -> DatabaseEnv where T: Table + Default, { @@ -231,7 +225,7 @@ struct TableStats { size: usize, } -fn get_table_stats(db: Env) +fn get_table_stats(db: DatabaseEnv) where T: Table + Default, { diff --git a/crates/storage/db/benches/utils.rs b/crates/storage/db/benches/utils.rs index 8f159ec1d..30f8f6ac4 100644 --- a/crates/storage/db/benches/utils.rs +++ b/crates/storage/db/benches/utils.rs @@ -1,3 +1,4 @@ +use reth_db::DatabaseEnv; #[allow(unused_imports)] use reth_db::{ database::Database, @@ -51,7 +52,7 @@ where fn set_up_db( bench_db_path: &Path, pair: &Vec<(::Key, bytes::Bytes, ::Value, bytes::Bytes)>, -) -> reth_db::mdbx::Env +) -> DatabaseEnv where T: Table + Default, T::Key: Default + Clone, diff --git a/crates/storage/db/src/lib.rs b/crates/storage/db/src/lib.rs index 49bdad206..c2b5d539d 100644 --- a/crates/storage/db/src/lib.rs +++ b/crates/storage/db/src/lib.rs @@ -87,3 +87,89 @@ pub use abstraction::*; pub use reth_interfaces::db::DatabaseError; pub use tables::*; pub use utils::is_database_empty; + +#[cfg(feature = "mdbx")] +use mdbx::{Env, EnvKind, WriteMap}; + +#[cfg(feature = "mdbx")] +/// Alias type for the database engine in use. +pub type DatabaseEnv = Env; + +/// Opens up an existing database or creates a new one at the specified path. +pub fn init_db>(path: P) -> eyre::Result { + use crate::version::{check_db_version_file, create_db_version_file, DatabaseVersionError}; + use eyre::WrapErr; + + let rpath = path.as_ref(); + if is_database_empty(rpath) { + std::fs::create_dir_all(rpath) + .wrap_err_with(|| format!("Could not create database directory {}", rpath.display()))?; + create_db_version_file(rpath)?; + } else { + match check_db_version_file(rpath) { + Ok(_) => (), + Err(DatabaseVersionError::MissingFile) => create_db_version_file(rpath)?, + Err(err) => return Err(err.into()), + } + } + #[cfg(feature = "mdbx")] + { + let db = Env::::open(rpath, EnvKind::RW)?; + db.create_tables()?; + Ok(db) + } + #[cfg(not(feature = "mdbx"))] + { + unimplemented!(); + } +} + +#[cfg(test)] +mod tests { + use crate::{ + init_db, + version::{db_version_file_path, DatabaseVersionError}, + }; + use assert_matches::assert_matches; + use tempfile::tempdir; + + #[test] + fn db_version() { + let path = tempdir().unwrap(); + + // Database is empty + { + let db = init_db(&path); + assert_matches!(db, Ok(_)); + } + + // Database is not empty, current version is the same as in the file + { + let db = init_db(&path); + assert_matches!(db, Ok(_)); + } + + // Database is not empty, version file is malformed + { + std::fs::write(path.path().join(db_version_file_path(&path)), "invalid-version") + .unwrap(); + let db = init_db(&path); + assert!(db.is_err()); + assert_matches!( + db.unwrap_err().downcast_ref::(), + Some(DatabaseVersionError::MalformedFile) + ) + } + + // Database is not empty, version file contains not matching version + { + std::fs::write(path.path().join(db_version_file_path(&path)), "0").unwrap(); + let db = init_db(&path); + assert!(db.is_err()); + assert_matches!( + db.unwrap_err().downcast_ref::(), + Some(DatabaseVersionError::VersionMismatch { version: 0 }) + ) + } + } +} diff --git a/crates/storage/provider/Cargo.toml b/crates/storage/provider/Cargo.toml index 25b7e9f4c..07a2a9a02 100644 --- a/crates/storage/provider/Cargo.toml +++ b/crates/storage/provider/Cargo.toml @@ -39,6 +39,7 @@ reth-primitives = { workspace = true, features = ["arbitrary", "test-utils"] } reth-rlp = { workspace = true } reth-trie = { path = "../../trie", features = ["test-utils"] } parking_lot = "0.12" +tempfile = "3.3" [features] test-utils = ["reth-rlp"] diff --git a/crates/storage/provider/src/post_state/mod.rs b/crates/storage/provider/src/post_state/mod.rs index e9eb216f1..1ab52b1b9 100644 --- a/crates/storage/provider/src/post_state/mod.rs +++ b/crates/storage/provider/src/post_state/mod.rs @@ -643,8 +643,9 @@ mod tests { use crate::{AccountReader, ProviderFactory}; use reth_db::{ database::Database, - mdbx::{test_utils, Env, EnvKind, WriteMap}, + mdbx::{test_utils, EnvKind}, transaction::DbTx, + DatabaseEnv, }; use reth_primitives::{proofs::EMPTY_ROOT, MAINNET}; use reth_trie::test_utils::state_root; @@ -1066,7 +1067,7 @@ mod tests { #[test] fn write_to_db_account_info() { - let db: Arc> = test_utils::create_test_db(EnvKind::RW); + let db: Arc = test_utils::create_test_db(EnvKind::RW); let factory = ProviderFactory::new(db, MAINNET.clone()); let provider = factory.provider_rw().unwrap(); @@ -1135,7 +1136,7 @@ mod tests { #[test] fn write_to_db_storage() { - let db: Arc> = test_utils::create_test_db(EnvKind::RW); + let db: Arc = test_utils::create_test_db(EnvKind::RW); let tx = db.tx_mut().expect("Could not get database tx"); let mut post_state = PostState::new(); @@ -1271,7 +1272,7 @@ mod tests { #[test] fn write_to_db_multiple_selfdestructs() { - let db: Arc> = test_utils::create_test_db(EnvKind::RW); + let db: Arc = test_utils::create_test_db(EnvKind::RW); let tx = db.tx_mut().expect("Could not get database tx"); let address1 = Address::random(); @@ -1820,7 +1821,7 @@ mod tests { #[test] fn empty_post_state_state_root() { - let db: Arc> = test_utils::create_test_db(EnvKind::RW); + let db: Arc = test_utils::create_test_db(EnvKind::RW); let tx = db.tx().unwrap(); let post_state = PostState::new(); @@ -1839,7 +1840,7 @@ mod tests { }) .collect(); - let db: Arc> = test_utils::create_test_db(EnvKind::RW); + let db: Arc = test_utils::create_test_db(EnvKind::RW); // insert initial state to the database db.update(|tx| { diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index ebfefd57c..acfdff010 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -4,7 +4,7 @@ use crate::{ BlockHashReader, BlockNumReader, BlockReader, EvmEnvProvider, HeaderProvider, ProviderError, StageCheckpointReader, StateProviderBox, TransactionsProvider, WithdrawalsProvider, }; -use reth_db::{database::Database, models::StoredBlockBodyIndices}; +use reth_db::{database::Database, init_db, models::StoredBlockBodyIndices, DatabaseEnv}; use reth_interfaces::Result; use reth_primitives::{ stage::{StageCheckpoint, StageId}, @@ -54,6 +54,20 @@ impl ProviderFactory { } } +impl ProviderFactory { + /// create new database provider by passing a path. [`ProviderFactory`] will own the database + /// instance. + pub fn new_with_database_path>( + path: P, + chain_spec: Arc, + ) -> Result> { + Ok(ProviderFactory:: { + db: init_db(path).map_err(|e| reth_interfaces::Error::Custom(e.to_string()))?, + chain_spec, + }) + } +} + impl Clone for ProviderFactory { fn clone(&self) -> Self { Self { db: self.db.clone(), chain_spec: Arc::clone(&self.chain_spec) } @@ -333,7 +347,13 @@ impl EvmEnvProvider for ProviderFactory { mod tests { use super::ProviderFactory; use crate::{BlockHashReader, BlockNumReader}; - use reth_db::mdbx::{test_utils::create_test_db, EnvKind, WriteMap}; + use reth_db::{ + mdbx::{ + test_utils::{create_test_db, ERROR_TEMPDIR}, + EnvKind, WriteMap, + }, + DatabaseEnv, + }; use reth_primitives::{ChainSpecBuilder, H256}; use std::sync::Arc; @@ -368,4 +388,20 @@ mod tests { provider_rw.block_hash(0).unwrap(); provider.block_hash(0).unwrap(); } + + #[test] + fn provider_factory_with_database_path() { + let chain_spec = ChainSpecBuilder::mainnet().build(); + let factory = ProviderFactory::::new_with_database_path( + tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path(), + Arc::new(chain_spec), + ) + .unwrap(); + + let provider = factory.provider().unwrap(); + provider.block_hash(0).unwrap(); + let provider_rw = factory.provider_rw().unwrap(); + provider_rw.block_hash(0).unwrap(); + provider.block_hash(0).unwrap(); + } } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index d0842f6c5..8ce2855e9 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -560,15 +560,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { Ok(blocks) } - /// Query the block body by number. - pub fn block_body_indices(&self, number: BlockNumber) -> Result { - let body = self - .tx - .get::(number)? - .ok_or(ProviderError::BlockBodyIndicesNotFound(number))?; - Ok(body) - } - /// Unwind table by some number key. /// Returns number of rows unwound. /// @@ -1695,9 +1686,8 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> BlockWriter for DatabaseProvider<' block.difficulty } else { let parent_block_number = block.number - 1; - let parent_ttd = - self.tx.get::(parent_block_number)?.unwrap_or_default(); - parent_ttd.0 + block.difficulty + let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default(); + parent_ttd + block.difficulty }; self.tx.put::(block.number, ttd.into())?; diff --git a/crates/trie/src/trie.rs b/crates/trie/src/trie.rs index ccf9b6ebe..679d98a76 100644 --- a/crates/trie/src/trie.rs +++ b/crates/trie/src/trie.rs @@ -514,9 +514,10 @@ mod tests { use proptest::{prelude::ProptestConfig, proptest}; use reth_db::{ cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, - mdbx::{test_utils::create_test_rw_db, Env, WriteMap}, + mdbx::test_utils::create_test_rw_db, tables, transaction::DbTxMut, + DatabaseEnv, }; use reth_primitives::{ hex_literal::hex, @@ -1278,7 +1279,7 @@ mod tests { } fn extension_node_storage_trie( - tx: &DatabaseProviderRW<'_, &Env>, + tx: &DatabaseProviderRW<'_, &DatabaseEnv>, hashed_address: H256, ) -> (H256, HashMap) { let value = U256::from(1); @@ -1304,7 +1305,7 @@ mod tests { (root, updates) } - fn extension_node_trie(tx: &DatabaseProviderRW<'_, &Env>) -> H256 { + fn extension_node_trie(tx: &DatabaseProviderRW<'_, &DatabaseEnv>) -> H256 { let a = Account { nonce: 0, balance: U256::from(1u64), bytecode_hash: Some(H256::random()) }; let val = encode_account(a, None);