refactor: move init_db to reth_db and add DatabaseEnv (#3450)

Authored by joshieDo on 2023-06-28 22:03:00 +01:00; committed by GitHub.
parent 6e2fa845d8
commit a53af3a0f2
35 changed files with 234 additions and 175 deletions
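
In practical terms, this commit moves `init_db` out of `reth_staged_sync::utils::init` and into the `reth_db` crate root, and introduces `DatabaseEnv` as an alias for `Env<WriteMap>` (behind the `mdbx` feature) so call sites stop naming the concrete MDBX types. A minimal sketch of the resulting call-site API, assuming `eyre` for error handling as the diff does; the helper name `open_datadir` is hypothetical:

    use reth_db::{init_db, DatabaseEnv};
    use std::{path::Path, sync::Arc};

    // Hypothetical helper: open (or create) a datadir-backed database.
    fn open_datadir(path: &Path) -> eyre::Result<Arc<DatabaseEnv>> {
        // init_db creates the directory and version file for an empty datadir,
        // validates the existing version file otherwise, then opens MDBX
        // read-write and creates any missing tables.
        let db: DatabaseEnv = init_db(path)?;
        Ok(Arc::new(db))
    }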

Cargo.lock (generated)

@@ -5108,6 +5108,7 @@ dependencies = [
  "bytes",
  "criterion",
  "derive_more",
+ "eyre",
  "futures",
  "heapless",
  "iai",
@@ -5539,6 +5540,7 @@ dependencies = [
  "reth-revm-primitives",
  "reth-rlp",
  "reth-trie",
+ "tempfile",
  "tokio",
  "tokio-stream",
  "tracing",


@@ -11,14 +11,14 @@ use reth_provider::{ProviderFactory, StageCheckpointReader};
 use crate::args::utils::genesis_value_parser;
 use reth_config::Config;
-use reth_db::database::Database;
+use reth_db::{database::Database, init_db};
 use reth_downloaders::{
     bodies::bodies::BodiesDownloaderBuilder,
     headers::reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::FileClient,
 };
 use reth_interfaces::consensus::Consensus;
 use reth_primitives::{stage::StageId, ChainSpec, H256};
-use reth_staged_sync::utils::init::{init_db, init_genesis};
+use reth_staged_sync::utils::init::init_genesis;
 use reth_stages::{
     prelude::*,
     stages::{


@@ -3,8 +3,9 @@ use crate::{
     dirs::{DataDirPath, MaybePlatformPath},
 };
 use clap::Parser;
+use reth_db::init_db;
 use reth_primitives::ChainSpec;
-use reth_staged_sync::utils::init::{init_db, init_genesis};
+use reth_staged_sync::utils::init::init_genesis;
 use std::sync::Arc;
 use tracing::info;


@@ -12,7 +12,7 @@ use reth_db::{
     database::Database,
     mdbx::{Env, NoWriteMap, WriteMap},
     version::{get_db_version, DatabaseVersionError, DB_VERSION},
-    Tables,
+    DatabaseEnv, Tables,
 };
 use reth_primitives::ChainSpec;
 use std::{path::Path, sync::Arc};
@@ -178,7 +178,7 @@ fn read_only_db(path: &Path) -> eyre::Result<Env<NoWriteMap>> {
         .with_context(|| format!("Could not open database at path: {}", path.display()))
 }

-fn read_write_db(path: &Path) -> eyre::Result<Env<WriteMap>> {
+fn read_write_db(path: &Path) -> eyre::Result<DatabaseEnv> {
     Env::<WriteMap>::open(path, reth_db::mdbx::EnvKind::RW)
         .with_context(|| format!("Could not open database at path: {}", path.display()))
 }


@@ -10,10 +10,7 @@ use clap::Parser;
 use futures::{stream::select as stream_select, StreamExt};
 use reth_beacon_consensus::BeaconConsensus;
 use reth_config::Config;
-use reth_db::{
-    database::Database,
-    mdbx::{Env, WriteMap},
-};
+use reth_db::{database::Database, init_db, DatabaseEnv};
 use reth_discv4::DEFAULT_DISCOVERY_PORT;
 use reth_downloaders::{
     bodies::bodies::BodiesDownloaderBuilder,
@@ -27,7 +24,7 @@ use reth_network::NetworkHandle;
 use reth_network_api::NetworkInfo;
 use reth_primitives::{stage::StageId, BlockHashOrNumber, BlockNumber, ChainSpec, H256};
 use reth_provider::{BlockExecutionWriter, ProviderFactory, StageCheckpointReader};
-use reth_staged_sync::utils::init::{init_db, init_genesis};
+use reth_staged_sync::utils::init::init_genesis;
 use reth_stages::{
     sets::DefaultStages,
     stages::{
@@ -153,7 +150,7 @@ impl Command {
         &self,
         config: &Config,
         task_executor: TaskExecutor,
-        db: Arc<Env<WriteMap>>,
+        db: Arc<DatabaseEnv>,
         network_secret_path: PathBuf,
         default_peers_path: PathBuf,
     ) -> eyre::Result<NetworkHandle> {


@@ -4,13 +4,12 @@ use crate::{
     dirs::{DataDirPath, MaybePlatformPath},
 };
 use clap::Parser;
-use reth_db::{cursor::DbCursorRO, tables, transaction::DbTx};
+use reth_db::{cursor::DbCursorRO, init_db, tables, transaction::DbTx};
 use reth_primitives::{
     stage::{StageCheckpoint, StageId},
     ChainSpec,
 };
 use reth_provider::{ProviderFactory, StageCheckpointReader};
-use reth_staged_sync::utils::init::init_db;
 use reth_stages::{
     stages::{
         AccountHashingStage, ExecutionStage, ExecutionStageThresholds, MerkleStage,


@@ -20,10 +20,7 @@ use reth_blockchain_tree::{
     config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree,
 };
 use reth_config::Config;
-use reth_db::{
-    database::Database,
-    mdbx::{Env, WriteMap},
-};
+use reth_db::{database::Database, init_db, DatabaseEnv};
 use reth_discv4::DEFAULT_DISCOVERY_PORT;
 use reth_downloaders::{
     bodies::bodies::BodiesDownloaderBuilder,
@@ -47,7 +44,7 @@ use reth_provider::{
 use reth_revm::Factory;
 use reth_revm_inspectors::stack::Hook;
 use reth_rpc_engine_api::EngineApi;
-use reth_staged_sync::utils::init::{init_db, init_genesis};
+use reth_staged_sync::utils::init::init_genesis;
 use reth_stages::{
     prelude::*,
     stages::{
@@ -478,7 +475,7 @@ impl Command {
         }
     }

-    async fn start_metrics_endpoint(&self, db: Arc<Env<WriteMap>>) -> eyre::Result<()> {
+    async fn start_metrics_endpoint(&self, db: Arc<DatabaseEnv>) -> eyre::Result<()> {
         if let Some(listen_addr) = self.metrics {
             info!(target: "reth::cli", addr = %listen_addr, "Starting metrics endpoint");
             prometheus_exporter::initialize(listen_addr, db, metrics_process::Collector::default())
@@ -519,7 +516,7 @@ impl Command {
         Ok(handle)
     }

-    fn lookup_head(&self, db: Arc<Env<WriteMap>>) -> Result<Head, reth_interfaces::Error> {
+    fn lookup_head(&self, db: Arc<DatabaseEnv>) -> Result<Head, reth_interfaces::Error> {
         let factory = ProviderFactory::new(db, self.chain.clone());
         let provider = factory.provider()?;
@@ -604,12 +601,12 @@ impl Command {
     fn load_network_config(
         &self,
         config: &Config,
-        db: Arc<Env<WriteMap>>,
+        db: Arc<DatabaseEnv>,
         executor: TaskExecutor,
         head: Head,
         secret_key: SecretKey,
         default_peers_path: PathBuf,
-    ) -> NetworkConfig<ProviderFactory<Arc<Env<WriteMap>>>> {
+    ) -> NetworkConfig<ProviderFactory<Arc<DatabaseEnv>>> {
         self.network
             .network_config(config, self.chain.clone(), secret_key, default_peers_path)
             .with_task_executor(Box::new(executor))


@@ -6,11 +6,7 @@ use hyper::{
 };
 use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
 use metrics_util::layers::{PrefixLayer, Stack};
-use reth_db::{
-    database::Database,
-    mdbx::{Env, WriteMap},
-    tables,
-};
+use reth_db::{database::Database, tables, DatabaseEnv};
 use reth_metrics::metrics::{self, absolute_counter, describe_counter, Unit};
 use std::{convert::Infallible, net::SocketAddr, sync::Arc};
@@ -73,7 +69,7 @@ async fn start_endpoint<F: Hook + 'static>(
 /// metrics.
 pub(crate) async fn initialize(
     listen_addr: SocketAddr,
-    db: Arc<Env<WriteMap>>,
+    db: Arc<DatabaseEnv>,
     process: metrics_process::Collector,
 ) -> eyre::Result<()> {
     let db_stats = move || {


@@ -10,6 +10,7 @@ use reth_db::{
     mdbx::{Env, WriteMap},
     tables,
     transaction::DbTxMut,
+    DatabaseEnv,
 };
 use reth_primitives::{stage::StageId, ChainSpec};
 use reth_staged_sync::utils::init::{insert_genesis_header, insert_genesis_state};
@@ -70,7 +71,7 @@ impl Command {
                 tx.clear::<tables::BlockOmmers>()?;
                 tx.clear::<tables::BlockWithdrawals>()?;
                 tx.put::<tables::SyncStage>(StageId::Bodies.to_string(), Default::default())?;
-                insert_genesis_header::<Env<WriteMap>>(tx, self.chain)?;
+                insert_genesis_header::<DatabaseEnv>(tx, self.chain)?;
             }
             StageEnum::Senders => {
                 tx.clear::<tables::TxSenders>()?;
@@ -90,7 +91,7 @@ impl Command {
                     StageId::Execution.to_string(),
                     Default::default(),
                 )?;
-                insert_genesis_state::<Env<WriteMap>>(tx, self.chain.genesis())?;
+                insert_genesis_state::<DatabaseEnv>(tx, self.chain.genesis())?;
             }
             StageEnum::AccountHashing => {
                 tx.clear::<tables::HashedAccount>()?;
@@ -155,7 +156,7 @@ impl Command {
                     StageId::TotalDifficulty.to_string(),
                     Default::default(),
                 )?;
-                insert_genesis_header::<Env<WriteMap>>(tx, self.chain)?;
+                insert_genesis_header::<DatabaseEnv>(tx, self.chain)?;
             }
             _ => {
                 info!("Nothing to do for stage {:?}", self.stage);


@@ -5,10 +5,10 @@ use crate::{
 };
 use clap::Parser;
 use reth_db::{
-    cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx,
+    cursor::DbCursorRO, database::Database, init_db, table::TableImporter, tables,
+    transaction::DbTx,
 };
 use reth_primitives::ChainSpec;
-use reth_staged_sync::utils::init::init_db;
 use std::{path::PathBuf, sync::Arc};
 use tracing::info;


@@ -10,10 +10,10 @@ use crate::{
 use clap::Parser;
 use reth_beacon_consensus::BeaconConsensus;
 use reth_config::Config;
+use reth_db::init_db;
 use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
 use reth_primitives::ChainSpec;
 use reth_provider::{ProviderFactory, StageCheckpointReader};
-use reth_staged_sync::utils::init::init_db;
 use reth_stages::{
     stages::{
         AccountHashingStage, BodyStage, ExecutionStage, ExecutionStageThresholds,


@@ -1085,10 +1085,7 @@ mod tests {
     use crate::block_buffer::BufferedBlocks;
     use assert_matches::assert_matches;
     use linked_hash_set::LinkedHashSet;
-    use reth_db::{
-        mdbx::{test_utils::create_test_rw_db, Env, WriteMap},
-        transaction::DbTxMut,
-    };
+    use reth_db::{mdbx::test_utils::create_test_rw_db, transaction::DbTxMut, DatabaseEnv};
     use reth_interfaces::test_utils::TestConsensus;
     use reth_primitives::{
         proofs::EMPTY_ROOT, stage::StageCheckpoint, ChainSpecBuilder, H256, MAINNET,
@@ -1102,7 +1099,7 @@ mod tests {
     fn setup_externals(
         exec_res: Vec<PostState>,
-    ) -> TreeExternals<Arc<Env<WriteMap>>, Arc<TestConsensus>, TestExecutorFactory> {
+    ) -> TreeExternals<Arc<DatabaseEnv>, Arc<TestConsensus>, TestExecutorFactory> {
         let db = create_test_rw_db();
         let consensus = Arc::new(TestConsensus::default());
         let chain_spec = Arc::new(


@@ -1361,7 +1361,7 @@ mod tests {
         config::BlockchainTreeConfig, externals::TreeExternals, post_state::PostState,
         BlockchainTree, ShareableBlockchainTree,
     };
-    use reth_db::mdbx::{test_utils::create_test_rw_db, Env, WriteMap};
+    use reth_db::{mdbx::test_utils::create_test_rw_db, DatabaseEnv};
     use reth_interfaces::{
         sync::NoopSyncStateUpdater,
         test_utils::{NoopFullBlockClient, TestConsensus},
@@ -1381,10 +1381,10 @@ mod tests {
     };

     type TestBeaconConsensusEngine = BeaconConsensusEngine<
-        Arc<Env<WriteMap>>,
+        Arc<DatabaseEnv>,
         BlockchainProvider<
-            Arc<Env<WriteMap>>,
+            Arc<DatabaseEnv>,
-            ShareableBlockchainTree<Arc<Env<WriteMap>>, TestConsensus, TestExecutorFactory>,
+            ShareableBlockchainTree<Arc<DatabaseEnv>, TestConsensus, TestExecutorFactory>,
         >,
         NoopFullBlockClient,
     >;
@@ -1498,7 +1498,7 @@ mod tests {
     }

     /// Builds the test consensus engine into a `TestConsensusEngine` and `TestEnv`.
-    fn build(self) -> (TestBeaconConsensusEngine, TestEnv<Arc<Env<WriteMap>>>) {
+    fn build(self) -> (TestBeaconConsensusEngine, TestEnv<Arc<DatabaseEnv>>) {
         reth_tracing::init_test_tracing();
         let db = create_test_rw_db();
         let consensus = TestConsensus::default();


@@ -272,6 +272,7 @@ impl From<crate::Error> for InsertBlockErrorKind {
             Error::Database(err) => InsertBlockErrorKind::Internal(Box::new(err)),
             Error::Provider(err) => InsertBlockErrorKind::Internal(Box::new(err)),
             Error::Network(err) => InsertBlockErrorKind::Internal(Box::new(err)),
+            Error::Custom(err) => InsertBlockErrorKind::Internal(err.into()),
         }
     }
 }


@@ -19,4 +19,7 @@ pub enum Error {
     #[error(transparent)]
     Network(#[from] reth_network_api::NetworkError),
+
+    #[error("{0}")]
+    Custom(std::string::String),
 }
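
For illustration, the new `Custom` variant gives downstream crates a way to surface one-off string errors through `reth_interfaces::Error`; this commit itself uses it in `ProviderFactory::new_with_database_path` further below. A minimal fragment of that pattern (assuming `init_db` is in scope):

    // Sketch: adapt an eyre error from init_db into the new Custom variant.
    let db = init_db(path).map_err(|e| reth_interfaces::Error::Custom(e.to_string()))?;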


@@ -5,6 +5,7 @@ use reth_db::{
     mdbx::{Env, WriteMap},
     tables,
     transaction::DbTxMut,
+    DatabaseEnv,
 };
 use reth_interfaces::{db, p2p::bodies::response::BlockResponse};
 use reth_primitives::{Block, BlockBody, SealedBlock, SealedHeader, H256};
@@ -46,7 +47,7 @@ pub(crate) fn create_raw_bodies<'a>(
 }

 #[inline]
-pub(crate) fn insert_headers(db: &Env<WriteMap>, headers: &[SealedHeader]) {
+pub(crate) fn insert_headers(db: &DatabaseEnv, headers: &[SealedHeader]) {
     db.update(|tx| -> Result<(), db::DatabaseError> {
         for header in headers {
             tx.put::<tables::CanonicalHeaders>(header.number, header.hash())?;


@@ -1,40 +1,14 @@
-use eyre::WrapErr;
 use reth_db::{
     cursor::DbCursorRO,
     database::{Database, DatabaseGAT},
-    is_database_empty,
-    mdbx::{Env, WriteMap},
     tables,
     transaction::{DbTx, DbTxMut},
-    version::{check_db_version_file, create_db_version_file, DatabaseVersionError},
 };
 use reth_primitives::{stage::StageId, Account, Bytecode, ChainSpec, StorageEntry, H256, U256};
 use reth_provider::{DatabaseProviderRW, HashingWriter, HistoryWriter, PostState, ProviderFactory};
-use std::{collections::BTreeMap, fs, path::Path, sync::Arc};
+use std::{collections::BTreeMap, sync::Arc};
 use tracing::debug;

-/// Opens up an existing database or creates a new one at the specified path.
-pub fn init_db<P: AsRef<Path>>(path: P) -> eyre::Result<Env<WriteMap>> {
-    if is_database_empty(&path) {
-        fs::create_dir_all(&path).wrap_err_with(|| {
-            format!("Could not create database directory {}", path.as_ref().display())
-        })?;
-        create_db_version_file(&path)?;
-    } else {
-        match check_db_version_file(&path) {
-            Ok(_) => (),
-            Err(DatabaseVersionError::MissingFile) => create_db_version_file(&path)?,
-            Err(err) => return Err(err.into()),
-        }
-    }
-
-    let db = Env::<WriteMap>::open(path.as_ref(), reth_db::mdbx::EnvKind::RW)?;
-    db.create_tables()?;
-
-    Ok(db)
-}
-
 /// Database initialization error type.
 #[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)]
 pub enum InitDatabaseError {
@@ -202,19 +176,18 @@ pub fn insert_genesis_header<DB: Database>(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use assert_matches::assert_matches;
     use reth_db::{
         mdbx::test_utils::create_test_rw_db,
         models::{storage_sharded_key::StorageShardedKey, ShardedKey},
         table::Table,
-        version::db_version_file_path,
+        DatabaseEnv,
     };
     use reth_primitives::{
         Address, Chain, ForkTimestamps, Genesis, GenesisAccount, IntegerList, GOERLI,
         GOERLI_GENESIS, MAINNET, MAINNET_GENESIS, SEPOLIA, SEPOLIA_GENESIS,
     };
     use std::collections::HashMap;
-    use tempfile::tempdir;

     fn collect_table_entries<DB, T>(
         tx: &<DB as DatabaseGAT<'_>>::TX,
@@ -305,7 +278,7 @@ mod tests {
         let tx = db.tx().expect("failed to init tx");

         assert_eq!(
-            collect_table_entries::<Arc<Env<WriteMap>>, tables::AccountHistory>(&tx)
+            collect_table_entries::<Arc<DatabaseEnv>, tables::AccountHistory>(&tx)
                 .expect("failed to collect"),
             vec![
                 (ShardedKey::new(address_with_balance, u64::MAX), IntegerList::new([0]).unwrap()),
@@ -314,7 +287,7 @@ mod tests {
         );

         assert_eq!(
-            collect_table_entries::<Arc<Env<WriteMap>>, tables::StorageHistory>(&tx)
+            collect_table_entries::<Arc<DatabaseEnv>, tables::StorageHistory>(&tx)
                 .expect("failed to collect"),
             vec![(
                 StorageShardedKey::new(address_with_storage, storage_key, u64::MAX),
@@ -322,43 +295,4 @@ mod tests {
             )],
         );
     }
-
-    #[test]
-    fn db_version() {
-        let path = tempdir().unwrap();
-
-        // Database is empty
-        {
-            let db = init_db(&path);
-            assert_matches!(db, Ok(_));
-        }
-
-        // Database is not empty, current version is the same as in the file
-        {
-            let db = init_db(&path);
-            assert_matches!(db, Ok(_));
-        }
-
-        // Database is not empty, version file is malformed
-        {
-            fs::write(path.path().join(db_version_file_path(&path)), "invalid-version").unwrap();
-            let db = init_db(&path);
-            assert!(db.is_err());
-            assert_matches!(
-                db.unwrap_err().downcast_ref::<DatabaseVersionError>(),
-                Some(DatabaseVersionError::MalformedFile)
-            )
-        }
-
-        // Database is not empty, version file contains not matching version
-        {
-            fs::write(path.path().join(db_version_file_path(&path)), "0").unwrap();
-            let db = init_db(&path);
-            assert!(db.is_err());
-            assert_matches!(
-                db.unwrap_err().downcast_ref::<DatabaseVersionError>(),
-                Some(DatabaseVersionError::VersionMismatch { version: 0 })
-            )
-        }
-    }
 }


@@ -3,7 +3,7 @@ use criterion::{
     BenchmarkGroup, Criterion,
 };
 use pprof::criterion::{Output, PProfProfiler};
-use reth_db::mdbx::{Env, WriteMap};
+use reth_db::DatabaseEnv;
 use reth_interfaces::test_utils::TestConsensus;
 use reth_primitives::{stage::StageCheckpoint, MAINNET};
 use reth_provider::ProviderFactory;
@@ -122,7 +122,7 @@ fn measure_stage_with_path<F, S>(
     stage_range: StageRange,
     label: String,
 ) where
-    S: Clone + Stage<Env<WriteMap>>,
+    S: Clone + Stage<DatabaseEnv>,
     F: Fn(S, &TestTransaction, StageRange),
 {
     let tx = TestTransaction::new(&path);
@@ -152,7 +152,7 @@ fn measure_stage<F, S>(
     block_interval: std::ops::Range<u64>,
     label: String,
 ) where
-    S: Clone + Stage<Env<WriteMap>>,
+    S: Clone + Stage<DatabaseEnv>,
     F: Fn(S, &TestTransaction, StageRange),
 {
     let path = setup::txs_testdata(block_interval.end);


@@ -1,9 +1,9 @@
 use itertools::concat;
 use reth_db::{
     cursor::DbCursorRO,
-    mdbx::{Env, WriteMap},
     tables,
     transaction::{DbTx, DbTxMut},
+    DatabaseEnv,
 };
 use reth_interfaces::test_utils::{
     generators,
@@ -32,7 +32,7 @@ pub use account_hashing::*;
 pub(crate) type StageRange = (ExecInput, UnwindInput);

-pub(crate) fn stage_unwind<S: Clone + Stage<Env<WriteMap>>>(
+pub(crate) fn stage_unwind<S: Clone + Stage<DatabaseEnv>>(
     stage: S,
     tx: &TestTransaction,
     range: StageRange,
@@ -60,7 +60,7 @@ pub(crate) fn stage_unwind<S: Clone + Stage<Env<WriteMap>>>(
     });
 }

-pub(crate) fn unwind_hashes<S: Clone + Stage<Env<WriteMap>>>(
+pub(crate) fn unwind_hashes<S: Clone + Stage<DatabaseEnv>>(
     stage: S,
     tx: &TestTransaction,
     range: StageRange,


@@ -5,7 +5,7 @@ use reth_primitives::{
     stage::{StageCheckpoint, StageId},
     BlockNumber, TxNumber,
 };
-use reth_provider::DatabaseProviderRW;
+use reth_provider::{BlockReader, DatabaseProviderRW, ProviderError};
 use std::{
     cmp::{max, min},
     ops::RangeInclusive,
@@ -79,7 +79,9 @@ impl ExecInput {
         tx_threshold: u64,
     ) -> Result<(RangeInclusive<TxNumber>, RangeInclusive<BlockNumber>, bool), StageError> {
         let start_block = self.next_block();
-        let start_block_body = provider.block_body_indices(start_block)?;
+        let start_block_body = provider
+            .block_body_indices(start_block)?
+            .ok_or(ProviderError::BlockBodyIndicesNotFound(start_block))?;
         let target_block = self.target();


@@ -456,10 +456,10 @@ mod tests {
     use reth_db::{
         cursor::DbCursorRO,
         database::Database,
-        mdbx::{Env, WriteMap},
         models::{StoredBlockBodyIndices, StoredBlockOmmers},
         tables,
         transaction::{DbTx, DbTxMut},
+        DatabaseEnv,
     };
     use reth_interfaces::{
         p2p::{
@@ -740,7 +740,7 @@ mod tests {
     /// A [BodyDownloader] that is backed by an internal [HashMap] for testing.
     #[derive(Debug)]
     pub(crate) struct TestBodyDownloader {
-        db: Arc<Env<WriteMap>>,
+        db: Arc<DatabaseEnv>,
         responses: HashMap<H256, BlockBody>,
         headers: VecDeque<SealedHeader>,
         batch_size: u64,
@@ -748,7 +748,7 @@ mod tests {
     impl TestBodyDownloader {
         pub(crate) fn new(
-            db: Arc<Env<WriteMap>>,
+            db: Arc<DatabaseEnv>,
             responses: HashMap<H256, BlockBody>,
             batch_size: u64,
         ) -> Self {


@@ -353,7 +353,10 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
         }

         // Look up the start index for the transaction range
-        let first_tx_num = provider.block_body_indices(*range.start())?.first_tx_num();
+        let first_tx_num = provider
+            .block_body_indices(*range.start())?
+            .ok_or(ProviderError::BlockBodyIndicesNotFound(*range.start()))?
+            .first_tx_num();

         let mut stage_checkpoint = input.checkpoint.execution_stage_checkpoint();


@@ -13,7 +13,7 @@ use reth_primitives::{
     stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
     TransactionSignedNoHash, TxNumber, H160,
 };
-use reth_provider::{DatabaseProviderRW, HeaderProvider, ProviderError};
+use reth_provider::{BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError};
 use std::fmt::Debug;
 use thiserror::Error;
 use tokio::sync::mpsc;
@@ -173,7 +173,10 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
         let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold);

         // Lookup latest tx id that we should unwind to
-        let latest_tx_id = provider.block_body_indices(unwind_to)?.last_tx_num();
+        let latest_tx_id = provider
+            .block_body_indices(unwind_to)?
+            .ok_or(ProviderError::BlockBodyIndicesNotFound(unwind_to))?
+            .last_tx_num();
         provider.unwind_table_by_num::<tables::TxSenders>(latest_tx_id)?;

         Ok(UnwindOutput {
@@ -386,7 +389,11 @@ mod tests {
     /// 2. If there is no requested block entry in the bodies table, but [tables::TxSenders] is
     ///    not empty.
     fn ensure_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> {
-        let body_result = self.tx.inner_rw().block_body_indices(block);
+        let body_result = self
+            .tx
+            .inner_rw()
+            .block_body_indices(block)?
+            .ok_or(ProviderError::BlockBodyIndicesNotFound(block));
         match body_result {
             Ok(body) => self
                 .tx


@@ -203,7 +203,7 @@ mod tests {
         generators::{random_block, random_block_range},
     };
     use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, H256};
-    use reth_provider::TransactionsProvider;
+    use reth_provider::{BlockReader, ProviderError, TransactionsProvider};

     // Implement stage test suite.
     stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);
@@ -345,7 +345,11 @@ mod tests {
     /// 2. If there is no requested block entry in the bodies table, but [tables::TxHashNumber] is
     ///    not empty.
     fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> {
-        let body_result = self.tx.inner_rw().block_body_indices(number);
+        let body_result = self
+            .tx
+            .inner_rw()
+            .block_body_indices(number)?
+            .ok_or(ProviderError::BlockBodyIndicesNotFound(number));
         match body_result {
             Ok(body) => self.tx.ensure_no_entry_above_by_value::<tables::TxHashNumber, _>(
                 body.last_tx_num(),


@@ -1,6 +1,9 @@
 use super::TestTransaction;
 use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
-use reth_db::mdbx::{Env, WriteMap};
+use reth_db::{
+    mdbx::{Env, WriteMap},
+    DatabaseEnv,
+};
 use reth_primitives::MAINNET;
 use reth_provider::ProviderFactory;
 use std::{borrow::Borrow, sync::Arc};
@@ -19,7 +22,7 @@ pub(crate) enum TestRunnerError {
 /// A generic test runner for stages.
 #[async_trait::async_trait]
 pub(crate) trait StageTestRunner {
-    type S: Stage<Env<WriteMap>> + 'static;
+    type S: Stage<DatabaseEnv> + 'static;

     /// Return a reference to the database.
     fn tx(&self) -> &TestTransaction;


@@ -10,7 +10,7 @@ use reth_db::{
     table::Table,
     tables,
     transaction::{DbTx, DbTxMut},
-    DatabaseError as DbError,
+    DatabaseEnv, DatabaseError as DbError,
 };
 use reth_primitives::{
     keccak256, Account, Address, BlockNumber, SealedBlock, SealedHeader, StorageEntry, H256,
@@ -35,9 +35,9 @@ use std::{
 #[derive(Debug)]
 pub struct TestTransaction {
     /// WriteMap DB
-    pub tx: Arc<Env<WriteMap>>,
+    pub tx: Arc<DatabaseEnv>,
     pub path: Option<PathBuf>,
-    pub factory: ProviderFactory<Arc<Env<WriteMap>>>,
+    pub factory: ProviderFactory<Arc<DatabaseEnv>>,
 }

 impl Default for TestTransaction {
@@ -59,17 +59,17 @@ impl TestTransaction {
     }

     /// Return a database wrapped in [DatabaseProviderRW].
-    pub fn inner_rw(&self) -> DatabaseProviderRW<'_, Arc<Env<WriteMap>>> {
+    pub fn inner_rw(&self) -> DatabaseProviderRW<'_, Arc<DatabaseEnv>> {
         self.factory.provider_rw().expect("failed to create db container")
     }

     /// Return a database wrapped in [DatabaseProviderRO].
-    pub fn inner(&self) -> DatabaseProviderRO<'_, Arc<Env<WriteMap>>> {
+    pub fn inner(&self) -> DatabaseProviderRO<'_, Arc<DatabaseEnv>> {
         self.factory.provider().expect("failed to create db container")
     }

     /// Get a pointer to an internal database.
-    pub fn inner_raw(&self) -> Arc<Env<WriteMap>> {
+    pub fn inner_raw(&self) -> Arc<DatabaseEnv> {
         self.tx.clone()
     }


@@ -38,6 +38,7 @@ thiserror = { workspace = true }
 tempfile = { version = "3.3.0", optional = true }
 parking_lot = "0.12"
 derive_more = "0.99"
+eyre = "0.6.8"

 # arbitrary utils
 arbitrary = { version = "1.1.7", features = ["derive"], optional = true }


@@ -162,10 +162,7 @@ where
     (preload, input)
 }

-fn append<T>(
-    db: Env<WriteMap>,
-    input: Vec<(<T as Table>::Key, <T as Table>::Value)>,
-) -> Env<WriteMap>
+fn append<T>(db: DatabaseEnv, input: Vec<(<T as Table>::Key, <T as Table>::Value)>) -> DatabaseEnv
 where
     T: Table + Default,
 {
@@ -183,10 +180,7 @@ where
     db
 }

-fn insert<T>(
-    db: Env<WriteMap>,
-    input: Vec<(<T as Table>::Key, <T as Table>::Value)>,
-) -> Env<WriteMap>
+fn insert<T>(db: DatabaseEnv, input: Vec<(<T as Table>::Key, <T as Table>::Value)>) -> DatabaseEnv
 where
     T: Table + Default,
 {
@@ -204,7 +198,7 @@ where
     db
 }

-fn put<T>(db: Env<WriteMap>, input: Vec<(<T as Table>::Key, <T as Table>::Value)>) -> Env<WriteMap>
+fn put<T>(db: DatabaseEnv, input: Vec<(<T as Table>::Key, <T as Table>::Value)>) -> DatabaseEnv
 where
     T: Table + Default,
 {
@@ -231,7 +225,7 @@ struct TableStats {
     size: usize,
 }

-fn get_table_stats<T>(db: Env<WriteMap>)
+fn get_table_stats<T>(db: DatabaseEnv)
 where
     T: Table + Default,
 {


@@ -1,3 +1,4 @@
+use reth_db::DatabaseEnv;
 #[allow(unused_imports)]
 use reth_db::{
     database::Database,
@@ -51,7 +52,7 @@ where
 fn set_up_db<T>(
     bench_db_path: &Path,
     pair: &Vec<(<T as Table>::Key, bytes::Bytes, <T as Table>::Value, bytes::Bytes)>,
-) -> reth_db::mdbx::Env<WriteMap>
+) -> DatabaseEnv
 where
     T: Table + Default,
     T::Key: Default + Clone,


@@ -87,3 +87,89 @@ pub use abstraction::*;
 pub use reth_interfaces::db::DatabaseError;
 pub use tables::*;
 pub use utils::is_database_empty;
+
+#[cfg(feature = "mdbx")]
+use mdbx::{Env, EnvKind, WriteMap};
+
+#[cfg(feature = "mdbx")]
+/// Alias type for the database engine in use.
+pub type DatabaseEnv = Env<WriteMap>;
+
+/// Opens up an existing database or creates a new one at the specified path.
+pub fn init_db<P: AsRef<std::path::Path>>(path: P) -> eyre::Result<DatabaseEnv> {
+    use crate::version::{check_db_version_file, create_db_version_file, DatabaseVersionError};
+    use eyre::WrapErr;
+
+    let rpath = path.as_ref();
+    if is_database_empty(rpath) {
+        std::fs::create_dir_all(rpath)
+            .wrap_err_with(|| format!("Could not create database directory {}", rpath.display()))?;
+        create_db_version_file(rpath)?;
+    } else {
+        match check_db_version_file(rpath) {
+            Ok(_) => (),
+            Err(DatabaseVersionError::MissingFile) => create_db_version_file(rpath)?,
+            Err(err) => return Err(err.into()),
+        }
+    }
+
+    #[cfg(feature = "mdbx")]
+    {
+        let db = Env::<WriteMap>::open(rpath, EnvKind::RW)?;
+        db.create_tables()?;
+        Ok(db)
+    }
+    #[cfg(not(feature = "mdbx"))]
+    {
+        unimplemented!();
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::{
+        init_db,
+        version::{db_version_file_path, DatabaseVersionError},
+    };
+    use assert_matches::assert_matches;
+    use tempfile::tempdir;
+
+    #[test]
+    fn db_version() {
+        let path = tempdir().unwrap();
+
+        // Database is empty
+        {
+            let db = init_db(&path);
+            assert_matches!(db, Ok(_));
+        }
+
+        // Database is not empty, current version is the same as in the file
+        {
+            let db = init_db(&path);
+            assert_matches!(db, Ok(_));
+        }
+
+        // Database is not empty, version file is malformed
+        {
+            std::fs::write(path.path().join(db_version_file_path(&path)), "invalid-version")
+                .unwrap();
+            let db = init_db(&path);
+            assert!(db.is_err());
+            assert_matches!(
+                db.unwrap_err().downcast_ref::<DatabaseVersionError>(),
+                Some(DatabaseVersionError::MalformedFile)
+            )
+        }
+
+        // Database is not empty, version file contains not matching version
+        {
+            std::fs::write(path.path().join(db_version_file_path(&path)), "0").unwrap();
+            let db = init_db(&path);
+            assert!(db.is_err());
+            assert_matches!(
+                db.unwrap_err().downcast_ref::<DatabaseVersionError>(),
+                Some(DatabaseVersionError::VersionMismatch { version: 0 })
+            )
+        }
+    }
+}
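
As a quick smoke test, the relocated function can be driven exactly like the old `reth_staged_sync` version; a minimal sketch, assuming the default `mdbx` feature and `tempfile` plus `eyre` available as (dev-)dependencies:

    use reth_db::{database::Database, init_db};
    use tempfile::tempdir;

    fn main() -> eyre::Result<()> {
        let dir = tempdir()?;
        // First open: creates the version file and the tables.
        let db = init_db(dir.path())?;
        let tx = db.tx()?; // DatabaseEnv implements reth_db's Database trait
        drop(tx);
        drop(db);
        // Re-opening the now non-empty directory re-checks the version file.
        let _db = init_db(dir.path())?;
        Ok(())
    }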


@@ -39,6 +39,7 @@ reth-primitives = { workspace = true, features = ["arbitrary", "test-utils"] }
 reth-rlp = { workspace = true }
 reth-trie = { path = "../../trie", features = ["test-utils"] }
 parking_lot = "0.12"
+tempfile = "3.3"

 [features]
 test-utils = ["reth-rlp"]


@@ -643,8 +643,9 @@ mod tests {
     use crate::{AccountReader, ProviderFactory};
     use reth_db::{
         database::Database,
-        mdbx::{test_utils, Env, EnvKind, WriteMap},
+        mdbx::{test_utils, EnvKind},
         transaction::DbTx,
+        DatabaseEnv,
     };
     use reth_primitives::{proofs::EMPTY_ROOT, MAINNET};
     use reth_trie::test_utils::state_root;
@@ -1066,7 +1067,7 @@ mod tests {
     #[test]
     fn write_to_db_account_info() {
-        let db: Arc<Env<WriteMap>> = test_utils::create_test_db(EnvKind::RW);
+        let db: Arc<DatabaseEnv> = test_utils::create_test_db(EnvKind::RW);
         let factory = ProviderFactory::new(db, MAINNET.clone());
         let provider = factory.provider_rw().unwrap();
@@ -1135,7 +1136,7 @@ mod tests {
     #[test]
     fn write_to_db_storage() {
-        let db: Arc<Env<WriteMap>> = test_utils::create_test_db(EnvKind::RW);
+        let db: Arc<DatabaseEnv> = test_utils::create_test_db(EnvKind::RW);
         let tx = db.tx_mut().expect("Could not get database tx");

         let mut post_state = PostState::new();
@@ -1271,7 +1272,7 @@ mod tests {
     #[test]
     fn write_to_db_multiple_selfdestructs() {
-        let db: Arc<Env<WriteMap>> = test_utils::create_test_db(EnvKind::RW);
+        let db: Arc<DatabaseEnv> = test_utils::create_test_db(EnvKind::RW);
         let tx = db.tx_mut().expect("Could not get database tx");

         let address1 = Address::random();
@@ -1820,7 +1821,7 @@ mod tests {
     #[test]
     fn empty_post_state_state_root() {
-        let db: Arc<Env<WriteMap>> = test_utils::create_test_db(EnvKind::RW);
+        let db: Arc<DatabaseEnv> = test_utils::create_test_db(EnvKind::RW);
         let tx = db.tx().unwrap();

         let post_state = PostState::new();
@@ -1839,7 +1840,7 @@ mod tests {
         })
         .collect();

-        let db: Arc<Env<WriteMap>> = test_utils::create_test_db(EnvKind::RW);
+        let db: Arc<DatabaseEnv> = test_utils::create_test_db(EnvKind::RW);

         // insert initial state to the database
         db.update(|tx| {


@@ -4,7 +4,7 @@ use crate::{
     BlockHashReader, BlockNumReader, BlockReader, EvmEnvProvider, HeaderProvider, ProviderError,
     StageCheckpointReader, StateProviderBox, TransactionsProvider, WithdrawalsProvider,
 };
-use reth_db::{database::Database, models::StoredBlockBodyIndices};
+use reth_db::{database::Database, init_db, models::StoredBlockBodyIndices, DatabaseEnv};
 use reth_interfaces::Result;
 use reth_primitives::{
     stage::{StageCheckpoint, StageId},
@@ -54,6 +54,20 @@ impl<DB> ProviderFactory<DB> {
     }
 }

+impl<DB: Database> ProviderFactory<DB> {
+    /// Create a new database provider by passing a path. [`ProviderFactory`] will own the
+    /// database instance.
+    pub fn new_with_database_path<P: AsRef<std::path::Path>>(
+        path: P,
+        chain_spec: Arc<ChainSpec>,
+    ) -> Result<ProviderFactory<DatabaseEnv>> {
+        Ok(ProviderFactory::<DatabaseEnv> {
+            db: init_db(path).map_err(|e| reth_interfaces::Error::Custom(e.to_string()))?,
+            chain_spec,
+        })
+    }
+}
+
 impl<DB: Clone> Clone for ProviderFactory<DB> {
     fn clone(&self) -> Self {
         Self { db: self.db.clone(), chain_spec: Arc::clone(&self.chain_spec) }
@@ -333,7 +347,13 @@ impl<DB: Database> EvmEnvProvider for ProviderFactory<DB> {
 mod tests {
     use super::ProviderFactory;
     use crate::{BlockHashReader, BlockNumReader};
-    use reth_db::mdbx::{test_utils::create_test_db, EnvKind, WriteMap};
+    use reth_db::{
+        mdbx::{
+            test_utils::{create_test_db, ERROR_TEMPDIR},
+            EnvKind, WriteMap,
+        },
+        DatabaseEnv,
+    };
     use reth_primitives::{ChainSpecBuilder, H256};
     use std::sync::Arc;
@@ -368,4 +388,20 @@ mod tests {
         provider_rw.block_hash(0).unwrap();
         provider.block_hash(0).unwrap();
     }
+
+    #[test]
+    fn provider_factory_with_database_path() {
+        let chain_spec = ChainSpecBuilder::mainnet().build();
+        let factory = ProviderFactory::<DatabaseEnv>::new_with_database_path(
+            tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path(),
+            Arc::new(chain_spec),
+        )
+        .unwrap();
+
+        let provider = factory.provider().unwrap();
+        provider.block_hash(0).unwrap();
+        let provider_rw = factory.provider_rw().unwrap();
+        provider_rw.block_hash(0).unwrap();
+        provider.block_hash(0).unwrap();
+    }
 }
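
A hedged usage sketch of the new constructor, mirroring the test above (the path literal and function name are hypothetical; `MAINNET.clone()` yields the `Arc<ChainSpec>` the signature expects):

    use reth_db::DatabaseEnv;
    use reth_primitives::MAINNET;
    use reth_provider::{BlockHashReader, ProviderFactory};

    fn open_mainnet_factory() -> eyre::Result<()> {
        // The factory owns the database it opens; init_db failures surface
        // as reth_interfaces::Error::Custom.
        let factory = ProviderFactory::<DatabaseEnv>::new_with_database_path(
            "/tmp/reth-example-db", // hypothetical path
            MAINNET.clone(),
        )?;
        let provider = factory.provider()?;
        let _genesis_hash = provider.block_hash(0)?; // Option<H256>
        Ok(())
    }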


@@ -560,15 +560,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
         Ok(blocks)
     }

-    /// Query the block body by number.
-    pub fn block_body_indices(&self, number: BlockNumber) -> Result<StoredBlockBodyIndices> {
-        let body = self
-            .tx
-            .get::<tables::BlockBodyIndices>(number)?
-            .ok_or(ProviderError::BlockBodyIndicesNotFound(number))?;
-        Ok(body)
-    }
-
     /// Unwind table by some number key.
     /// Returns number of rows unwound.
     ///
@@ -1695,9 +1686,8 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> BlockWriter for DatabaseProvider<'
             block.difficulty
         } else {
             let parent_block_number = block.number - 1;
-            let parent_ttd =
-                self.tx.get::<tables::HeaderTD>(parent_block_number)?.unwrap_or_default();
-            parent_ttd.0 + block.difficulty
+            let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
+            parent_ttd + block.difficulty
         };

         self.tx.put::<tables::HeaderTD>(block.number, ttd.into())?;


@@ -514,9 +514,10 @@ mod tests {
     use proptest::{prelude::ProptestConfig, proptest};
     use reth_db::{
         cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
-        mdbx::{test_utils::create_test_rw_db, Env, WriteMap},
+        mdbx::test_utils::create_test_rw_db,
         tables,
         transaction::DbTxMut,
+        DatabaseEnv,
     };
     use reth_primitives::{
         hex_literal::hex,
@@ -1278,7 +1279,7 @@ mod tests {
     }

     fn extension_node_storage_trie(
-        tx: &DatabaseProviderRW<'_, &Env<WriteMap>>,
+        tx: &DatabaseProviderRW<'_, &DatabaseEnv>,
         hashed_address: H256,
     ) -> (H256, HashMap<Nibbles, BranchNodeCompact>) {
         let value = U256::from(1);
@@ -1304,7 +1305,7 @@ mod tests {
         (root, updates)
     }

-    fn extension_node_trie(tx: &DatabaseProviderRW<'_, &Env<WriteMap>>) -> H256 {
+    fn extension_node_trie(tx: &DatabaseProviderRW<'_, &DatabaseEnv>) -> H256 {
         let a =
             Account { nonce: 0, balance: U256::from(1u64), bytecode_hash: Some(H256::random()) };
         let val = encode_account(a, None);