From 3d0864bbb9699d60cc4ee36ba6c2eb01ea6674a1 Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Sat, 11 Feb 2023 00:25:26 -0500 Subject: [PATCH] chore: move Transaction container to reth_provider (#1238) Co-authored-by: Georgios Konstantopoulos --- bin/reth/src/stage/mod.rs | 6 +- bin/reth/src/test_eth_chain/runner.rs | 3 +- crates/interfaces/src/provider.rs | 70 ++++++++++++++--- crates/stages/src/error.rs | 66 +++------------- crates/stages/src/lib.rs | 2 - crates/stages/src/pipeline/mod.rs | 16 ++-- crates/stages/src/stage.rs | 3 +- crates/stages/src/stages/bodies.rs | 8 +- crates/stages/src/stages/execution.rs | 27 +++---- crates/stages/src/stages/hashing_account.rs | 5 +- crates/stages/src/stages/hashing_storage.rs | 5 +- crates/stages/src/stages/headers.rs | 19 +++-- .../src/stages/index_account_history.rs | 5 +- .../src/stages/index_storage_history.rs | 5 +- crates/stages/src/stages/merkle.rs | 5 +- crates/stages/src/stages/sender_recovery.rs | 5 +- crates/stages/src/stages/total_difficulty.rs | 9 ++- crates/stages/src/stages/tx_lookup.rs | 5 +- crates/stages/src/test_utils/runner.rs | 6 +- crates/stages/src/test_utils/test_db.rs | 3 +- crates/stages/src/trie/mod.rs | 2 +- crates/storage/provider/src/lib.rs | 4 + .../provider/src/transaction.rs} | 75 ++++++++++--------- crates/storage/provider/src/utils.rs | 2 +- docs/repo/crates/db.md | 4 +- 25 files changed, 181 insertions(+), 179 deletions(-) rename crates/{stages/src/db.rs => storage/provider/src/transaction.rs} (74%) diff --git a/bin/reth/src/stage/mod.rs b/bin/reth/src/stage/mod.rs index aeca6717a..eb2c56730 100644 --- a/bin/reth/src/stage/mod.rs +++ b/bin/reth/src/stage/mod.rs @@ -6,20 +6,20 @@ use crate::{ dirs::{ConfigPath, DbPath, PlatformPath}, prometheus_exporter, }; +use clap::{Parser, ValueEnum}; use reth_consensus::beacon::BeaconConsensus; use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; use reth_net_nat::NatResolver; use reth_primitives::ChainSpec; +use reth_provider::Transaction; use reth_staged_sync::{ utils::{chainspec::chain_spec_value_parser, init::init_db}, Config, }; use reth_stages::{ stages::{BodyStage, ExecutionStage, SenderRecoveryStage}, - ExecInput, Stage, StageId, Transaction, UnwindInput, + ExecInput, Stage, StageId, UnwindInput, }; - -use clap::{Parser, ValueEnum}; use std::{net::SocketAddr, sync::Arc}; use tracing::*; diff --git a/bin/reth/src/test_eth_chain/runner.rs b/bin/reth/src/test_eth_chain/runner.rs index be2f1c64a..b6f479f24 100644 --- a/bin/reth/src/test_eth_chain/runner.rs +++ b/bin/reth/src/test_eth_chain/runner.rs @@ -13,8 +13,9 @@ use reth_primitives::{ keccak256, Account as RethAccount, Address, ChainSpec, ForkCondition, Hardfork, JsonU256, SealedBlock, SealedHeader, StorageEntry, H256, U256, }; +use reth_provider::Transaction; use reth_rlp::Decodable; -use reth_stages::{stages::ExecutionStage, ExecInput, Stage, StageId, Transaction}; +use reth_stages::{stages::ExecutionStage, ExecInput, Stage, StageId}; use std::{ collections::HashMap, ffi::OsStr, diff --git a/crates/interfaces/src/provider.rs b/crates/interfaces/src/provider.rs index a89d6ad70..102b56eee 100644 --- a/crates/interfaces/src/provider.rs +++ b/crates/interfaces/src/provider.rs @@ -1,25 +1,71 @@ -use reth_primitives::{Address, BlockHash, BlockNumber, TransitionId, H256}; +use reth_primitives::{Address, BlockHash, BlockNumber, TransitionId, TxNumber, H256}; /// KV error type. They are using u32 to represent error code. #[allow(missing_docs)] #[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)] pub enum Error { + /// The header hash is missing from the database. #[error("Block number {block_number} does not exist in database")] - BlockNumber { block_number: BlockNumber }, + CanonicalHeader { block_number: BlockNumber }, + /// A header body is missing from the database. + #[error("No header for block #{number}")] + Header { + /// The block number key + number: BlockNumber, + }, + /// The header number was not found for the given block hash. #[error("Block hash {block_hash:?} does not exist in Headers table")] BlockHash { block_hash: BlockHash }, - #[error("Block body not exists #{block_number}")] - BlockBody { block_number: BlockNumber }, + /// A block body is missing. + #[error("Block body not found for block #{number}")] + BlockBody { number: BlockNumber }, + /// The block transition id for a certain block number is missing. #[error("Block transition id does not exist for block #{block_number}")] BlockTransition { block_number: BlockNumber }, - #[error("Block number {block_number} with hash #{received_hash:?} is not canonical block. Canonical block hash is #{expected_hash:?}")] - NonCanonicalBlock { - block_number: BlockNumber, - expected_hash: BlockHash, - received_hash: BlockHash, - }, + /// The transition id was found for the given address and storage key, but the changeset was + /// not found. #[error("Storage ChangeSet address: ({address:?} key: {storage_key:?}) for transition:#{transition_id} does not exist")] - StorageChangeset { transition_id: TransitionId, address: Address, storage_key: H256 }, + StorageChangeset { + /// The transition id found for the address and storage key + transition_id: TransitionId, + /// The account address + address: Address, + /// The storage key + storage_key: H256, + }, + /// The transition id was found for the given address, but the changeset was not found. #[error("Account {address:?} ChangeSet for transition #{transition_id} does not exist")] - AccountChangeset { transition_id: TransitionId, address: Address }, + AccountChangeset { + /// Transition id found for the address + transition_id: TransitionId, + /// The account address + address: Address, + }, + /// The total difficulty for a block is missing. + #[error("Total difficulty not found for block #{number}")] + TotalDifficulty { number: BlockNumber }, + /// The transaction is missing + #[error("Transaction #{id} not found")] + Transaction { + /// The transaction id + id: TxNumber, + }, + /// A ommers are missing. + #[error("Block ommers not found for block #{number}")] + Ommers { + /// The block number key + number: BlockNumber, + }, + /// There is a gap in the transaction table, at a missing transaction number. + #[error("Gap in transaction table. Missing tx number #{missing}.")] + TransactionsGap { missing: TxNumber }, + /// There is a gap in the senders table, at a missing transaction number. + #[error("Gap in transaction signer table. Missing tx number #{missing}.")] + TransactionsSignerGap { missing: TxNumber }, + /// Reached the end of the transaction table. + #[error("Got to the end of transaction table")] + EndOfTransactionTable, + /// Reached the end of the transaction sender table. + #[error("Got to the end of the transaction sender table")] + EndOfTransactionSenderTable, } diff --git a/crates/stages/src/error.rs b/crates/stages/src/error.rs index f7bda51f1..06b543709 100644 --- a/crates/stages/src/error.rs +++ b/crates/stages/src/error.rs @@ -1,6 +1,10 @@ use crate::pipeline::PipelineEvent; -use reth_interfaces::{consensus, db::Error as DbError, executor, p2p::error::DownloadError}; -use reth_primitives::{BlockNumber, TxNumber}; +use reth_interfaces::{ + consensus, db::Error as DbError, executor, p2p::error::DownloadError, + provider::Error as ProviderError, +}; +use reth_primitives::BlockNumber; +use reth_provider::TransactionError; use thiserror::Error; use tokio::sync::mpsc::error::SendError; @@ -31,7 +35,10 @@ pub enum StageError { }, /// The stage encountered a database integrity error. #[error("A database integrity error occurred: {0}")] - DatabaseIntegrity(#[from] DatabaseIntegrityError), + DatabaseIntegrity(#[from] ProviderError), + /// The stage encountered an error related to the current database transaction. + #[error("A database transaction error occurred: {0}")] + Transaction(#[from] TransactionError), /// Invalid download response. Applicable for stages which /// rely on external downloaders #[error("Invalid download response: {0}")] @@ -66,59 +73,6 @@ impl StageError { } } -/// A database integrity error. -/// The sender stage error -#[derive(Error, Debug)] -#[allow(missing_docs)] -pub enum DatabaseIntegrityError { - /// The canonical header for a block is missing from the database. - #[error("No canonical header for block #{number}")] - CanonicalHeader { - /// The block number key - number: BlockNumber, - }, - /// A header is missing from the database. - #[error("No header for block #{number}")] - Header { - /// The block number key - number: BlockNumber, - }, - /// A ommers are missing. - #[error("Block ommers not found for block #{number}")] - Ommers { - /// The block number key - number: BlockNumber, - }, - /// A block body is missing. - #[error("Block body not found for block #{number}")] - BlockBody { - /// The block number key - number: BlockNumber, - }, - /// The transaction is missing - #[error("Transaction #{id} not found")] - Transaction { - /// The transaction id - id: TxNumber, - }, - #[error("Block transition not found for block #{number}")] - BlockTransition { number: BlockNumber }, - #[error("Gap in transaction table. Missing tx number #{missing}.")] - TransactionsGap { missing: TxNumber }, - #[error("Gap in transaction signer table. Missing tx number #{missing}.")] - TransactionsSignerGap { missing: TxNumber }, - #[error("Got to the end of transaction table")] - EndOfTransactionTable, - #[error("Got to the end of the transaction sender table")] - EndOfTransactionSenderTable, - /// The total difficulty from the block header is missing. - #[error("Total difficulty not found for block #{number}")] - TotalDifficulty { - /// The block number key - number: BlockNumber, - }, -} - /// A pipeline execution error. #[derive(Error, Debug)] pub enum PipelineError { diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 70b32da83..df4fefb12 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -49,7 +49,6 @@ //! .build(); //! # //! ``` -mod db; mod error; mod id; mod pipeline; @@ -69,7 +68,6 @@ pub mod stages; pub mod sets; -pub use db::Transaction; pub use error::*; pub use id::*; pub use pipeline::*; diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 834cc958a..797c9b336 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -1,12 +1,10 @@ -use crate::{ - db::Transaction, error::*, util::opt, ExecInput, ExecOutput, Stage, StageError, StageId, - UnwindInput, -}; +use crate::{error::*, util::opt, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput}; use metrics::Gauge; use reth_db::database::Database; use reth_interfaces::sync::{SyncState, SyncStateUpdater}; use reth_metrics_derive::Metrics; use reth_primitives::BlockNumber; +use reth_provider::Transaction; use std::{ collections::HashMap, fmt::{Debug, Formatter}, @@ -421,7 +419,7 @@ mod tests { use crate::{StageId, UnwindOutput}; use assert_matches::assert_matches; use reth_db::mdbx::{self, test_utils, EnvKind}; - use reth_interfaces::{consensus, sync::NoopSyncStateUpdate}; + use reth_interfaces::{consensus, provider::Error as ProviderError, sync::NoopSyncStateUpdate}; use tokio_stream::StreamExt; use utils::TestStage; @@ -682,15 +680,15 @@ mod tests { let db = test_utils::create_test_db::(EnvKind::RW); let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder() .add_stage(TestStage::new(StageId("Fatal")).add_exec(Err( - StageError::DatabaseIntegrity(DatabaseIntegrityError::BlockBody { number: 5 }), + StageError::DatabaseIntegrity(ProviderError::BlockBody { number: 5 }), ))) .build(); let result = pipeline.run(db).await; assert_matches!( result, - Err(PipelineError::Stage(StageError::DatabaseIntegrity( - DatabaseIntegrityError::BlockBody { number: 5 } - ))) + Err(PipelineError::Stage(StageError::DatabaseIntegrity(ProviderError::BlockBody { + number: 5 + }))) ); } diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index 0d5580ed3..dc3cd117e 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -1,9 +1,10 @@ use std::ops::RangeInclusive; -use crate::{db::Transaction, error::StageError, id::StageId}; +use crate::{error::StageError, id::StageId}; use async_trait::async_trait; use reth_db::database::Database; use reth_primitives::BlockNumber; +use reth_provider::Transaction; /// Stage execution input, see [Stage::execute]. #[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index dc230d896..bbb0f77fa 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -1,6 +1,6 @@ use crate::{ - db::Transaction, exec_or_return, DatabaseIntegrityError, ExecAction, ExecInput, ExecOutput, - Stage, StageError, StageId, UnwindInput, UnwindOutput, + exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, + UnwindOutput, }; use futures_util::TryStreamExt; use reth_db::{ @@ -13,7 +13,9 @@ use reth_db::{ use reth_interfaces::{ consensus::Consensus, p2p::bodies::{downloader::BodyDownloader, response::BlockResponse}, + provider::Error as ProviderError, }; +use reth_provider::Transaction; use std::sync::Arc; use tracing::*; @@ -157,7 +159,7 @@ impl Stage for BodyStage { // transition at the last transaction of this block. let td = td_cursor .seek(block_number)? - .ok_or(DatabaseIntegrityError::TotalDifficulty { number: block_number })? + .ok_or(ProviderError::TotalDifficulty { number: block_number })? .1; let has_reward = self.consensus.has_block_reward(td.into()); if has_reward { diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index edab82fb8..5631fdc0b 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -1,6 +1,6 @@ use crate::{ - db::Transaction, exec_or_return, DatabaseIntegrityError, ExecAction, ExecInput, ExecOutput, - Stage, StageError, StageId, UnwindInput, UnwindOutput, + exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, + UnwindOutput, }; use reth_db::{ cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, @@ -13,10 +13,11 @@ use reth_executor::{ execution_result::AccountChangeSet, revm_wrap::{State, SubState}, }; +use reth_interfaces::provider::Error as ProviderError; use reth_primitives::{ Address, Block, ChainSpec, Hardfork, Header, StorageEntry, H256, MAINNET, U256, }; -use reth_provider::LatestStateProviderRef; +use reth_provider::{LatestStateProviderRef, Transaction}; use std::fmt::Debug; use tracing::*; @@ -96,10 +97,9 @@ impl ExecutionStage { let (number, header) = entry?; let (_, td) = td_cursor .seek_exact(number)? - .ok_or(DatabaseIntegrityError::TotalDifficulty { number })?; - let (_, body) = bodies_cursor - .seek_exact(number)? - .ok_or(DatabaseIntegrityError::BlockBody { number })?; + .ok_or(ProviderError::TotalDifficulty { number })?; + let (_, body) = + bodies_cursor.seek_exact(number)?.ok_or(ProviderError::BlockBody { number })?; let (_, stored_ommers) = ommers_cursor.seek_exact(number)?.unwrap_or_default(); Ok((header, td.into(), body, stored_ommers.ommers)) }) @@ -120,10 +120,10 @@ impl ExecutionStage { // get next N transactions. for index in body.tx_id_range() { let (tx_index, tx) = - tx_walker.next().ok_or(DatabaseIntegrityError::EndOfTransactionTable)??; + tx_walker.next().ok_or(ProviderError::EndOfTransactionTable)??; if tx_index != index { error!(target: "sync::stages::execution", block = block_number, expected = index, found = tx_index, ?body, "Transaction gap"); - return Err(DatabaseIntegrityError::TransactionsGap { missing: tx_index }.into()) + return Err(ProviderError::TransactionsGap { missing: tx_index }.into()) } transactions.push(tx); } @@ -132,14 +132,11 @@ impl ExecutionStage { let mut tx_sender_walker = tx_sender.walk(Some(body.start_tx_id))?; let mut signers = Vec::with_capacity(body.tx_count as usize); for index in body.tx_id_range() { - let (tx_index, tx) = tx_sender_walker - .next() - .ok_or(DatabaseIntegrityError::EndOfTransactionSenderTable)??; + let (tx_index, tx) = + tx_sender_walker.next().ok_or(ProviderError::EndOfTransactionSenderTable)??; if tx_index != index { error!(target: "sync::stages::execution", block = block_number, expected = index, found = tx_index, ?body, "Signer gap"); - return Err( - DatabaseIntegrityError::TransactionsSignerGap { missing: tx_index }.into() - ) + return Err(ProviderError::TransactionsSignerGap { missing: tx_index }.into()) } signers.push(tx); } diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index 3911c0500..8fba0a2ea 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -1,6 +1,4 @@ -use crate::{ - db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput, -}; +use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, database::Database, @@ -8,6 +6,7 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, }; use reth_primitives::{keccak256, Account, Address}; +use reth_provider::Transaction; use std::{ collections::{BTreeMap, BTreeSet}, fmt::Debug, diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index 76289d2bb..be2312c77 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -1,6 +1,4 @@ -use crate::{ - db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput, -}; +use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; use reth_db::{ cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, database::Database, @@ -9,6 +7,7 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, }; use reth_primitives::{keccak256, Address, StorageEntry, H256, U256}; +use reth_provider::Transaction; use std::{ collections::{BTreeMap, BTreeSet}, fmt::Debug, diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 0dc9019af..eda0ccb1d 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -1,7 +1,4 @@ -use crate::{ - db::Transaction, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId, - UnwindInput, UnwindOutput, -}; +use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; use futures_util::StreamExt; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, @@ -12,8 +9,10 @@ use reth_db::{ use reth_interfaces::{ consensus::{Consensus, ForkchoiceState}, p2p::headers::downloader::{HeaderDownloader, SyncTarget}, + provider::Error as ProviderError, }; use reth_primitives::{BlockNumber, SealedHeader}; +use reth_provider::Transaction; use std::sync::Arc; use tracing::*; @@ -60,7 +59,7 @@ where let mut header_cursor = tx.cursor_read::()?; let (head_num, _) = header_cursor .seek_exact(stage_progress)? - .ok_or(DatabaseIntegrityError::CanonicalHeader { number: stage_progress })?; + .ok_or(ProviderError::CanonicalHeader { block_number: stage_progress })?; // Check if the next entry is congruent Ok(header_cursor.next()?.map(|(next_num, _)| head_num + 1 == next_num).unwrap_or_default()) } @@ -80,12 +79,12 @@ where // Get head hash and reposition the cursor let (head_num, head_hash) = cursor .seek_exact(stage_progress)? - .ok_or(DatabaseIntegrityError::CanonicalHeader { number: stage_progress })?; + .ok_or(ProviderError::CanonicalHeader { block_number: stage_progress })?; // Construct head let (_, head) = header_cursor .seek_exact(head_num)? - .ok_or(DatabaseIntegrityError::Header { number: head_num })?; + .ok_or(ProviderError::Header { number: head_num })?; let local_head = SealedHeader::new(head, head_hash); // Look up the next header @@ -94,7 +93,7 @@ where .map(|(next_num, next_hash)| -> Result { let (_, next) = header_cursor .seek_exact(next_num)? - .ok_or(DatabaseIntegrityError::Header { number: next_num })?; + .ok_or(ProviderError::Header { number: next_num })?; Ok(SealedHeader::new(next, next_hash)) }) .transpose()?; @@ -500,8 +499,8 @@ mod tests { // Empty database assert_matches!( stage.get_sync_gap(&tx, stage_progress).await, - Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CanonicalHeader { number })) - if number == stage_progress + Err(StageError::DatabaseIntegrity(ProviderError::CanonicalHeader { block_number })) + if block_number == stage_progress ); // Checkpoint and no gap diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index b240a95b9..0f91fc3eb 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -1,6 +1,4 @@ -use crate::{ - db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput, -}; +use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; use itertools::Itertools; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, @@ -10,6 +8,7 @@ use reth_db::{ transaction::{DbTx, DbTxMut, DbTxMutGAT}, TransitionList, }; +use reth_provider::Transaction; use reth_primitives::{Address, TransitionId}; use std::{collections::BTreeMap, fmt::Debug}; diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index 2afde1a47..c938ac64c 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -1,6 +1,4 @@ -use crate::{ - db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput, -}; +use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; use itertools::Itertools; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, @@ -11,6 +9,7 @@ use reth_db::{ TransitionList, }; use reth_primitives::{Address, TransitionId, H256}; +use reth_provider::Transaction; use std::{collections::BTreeMap, fmt::Debug}; use tracing::*; diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 2525608d3..f13f91535 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -1,9 +1,10 @@ use crate::{ - db::Transaction, trie::DBTrieLoader, ExecInput, ExecOutput, Stage, StageError, StageId, - UnwindInput, UnwindOutput, + trie::DBTrieLoader, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, + UnwindOutput, }; use reth_db::{database::Database, tables, transaction::DbTx}; use reth_interfaces::consensus; +use reth_provider::Transaction; use std::fmt::Debug; use tracing::*; diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index b96ec641f..5e700da47 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -1,6 +1,6 @@ use crate::{ - db::Transaction, exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, - UnwindInput, UnwindOutput, + exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, + UnwindOutput, }; use futures_util::StreamExt; @@ -12,6 +12,7 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, }; use reth_primitives::TxNumber; +use reth_provider::Transaction; use std::fmt::Debug; use thiserror::Error; use tokio::sync::mpsc; diff --git a/crates/stages/src/stages/total_difficulty.rs b/crates/stages/src/stages/total_difficulty.rs index 24cd27ed6..fb07d551c 100644 --- a/crates/stages/src/stages/total_difficulty.rs +++ b/crates/stages/src/stages/total_difficulty.rs @@ -1,6 +1,6 @@ use crate::{ - db::Transaction, exec_or_return, DatabaseIntegrityError, ExecAction, ExecInput, ExecOutput, - Stage, StageError, StageId, UnwindInput, UnwindOutput, + exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, + UnwindOutput, }; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, @@ -8,8 +8,9 @@ use reth_db::{ tables, transaction::{DbTx, DbTxMut}, }; -use reth_interfaces::consensus::Error; +use reth_interfaces::{consensus::Error, provider::Error as ProviderError}; use reth_primitives::{ChainSpec, Hardfork, EMPTY_OMMER_ROOT, MAINNET, U256}; +use reth_provider::Transaction; use tracing::*; const TOTAL_DIFFICULTY: StageId = StageId("TotalDifficulty"); @@ -59,7 +60,7 @@ impl Stage for TotalDifficultyStage { let last_header_number = input.stage_progress.unwrap_or_default(); let last_entry = cursor_td .seek_exact(last_header_number)? - .ok_or(DatabaseIntegrityError::TotalDifficulty { number: last_header_number })?; + .ok_or(ProviderError::TotalDifficulty { number: last_header_number })?; let mut td: U256 = last_entry.1.into(); debug!(target: "sync::stages::total_difficulty", ?td, block_number = last_header_number, "Last total difficulty entry"); diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 7263a8a91..978a959b9 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -1,6 +1,6 @@ use crate::{ - db::Transaction, exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, - UnwindInput, UnwindOutput, + exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, + UnwindOutput, }; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, @@ -8,6 +8,7 @@ use reth_db::{ tables, transaction::{DbTx, DbTxMut}, }; +use reth_provider::Transaction; use tracing::*; const TRANSACTION_LOOKUP: StageId = StageId("TransactionLookup"); diff --git a/crates/stages/src/test_utils/runner.rs b/crates/stages/src/test_utils/runner.rs index b63d0dcc6..7e4d6f7ed 100644 --- a/crates/stages/src/test_utils/runner.rs +++ b/crates/stages/src/test_utils/runner.rs @@ -1,10 +1,10 @@ +use super::TestTransaction; +use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_db::mdbx::{Env, WriteMap}; +use reth_provider::Transaction; use std::borrow::Borrow; use tokio::sync::oneshot; -use super::TestTransaction; -use crate::{db::Transaction, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; - #[derive(thiserror::Error, Debug)] pub(crate) enum TestRunnerError { #[error("Database error occurred.")] diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 25aac890e..a040c6513 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -12,10 +12,9 @@ use reth_db::{ Error as DbError, }; use reth_primitives::{BlockNumber, SealedBlock, SealedHeader, U256}; +use reth_provider::Transaction; use std::{borrow::Borrow, path::Path, sync::Arc}; -use crate::db::Transaction; - /// The [TestTransaction] is used as an internal /// database for testing stage implementation. /// diff --git a/crates/stages/src/trie/mod.rs b/crates/stages/src/trie/mod.rs index be1f30501..1530fb04d 100644 --- a/crates/stages/src/trie/mod.rs +++ b/crates/stages/src/trie/mod.rs @@ -1,4 +1,3 @@ -use crate::Transaction; use cita_trie::{PatriciaTrie, Trie}; use hasher::HasherKeccak; use reth_db::{ @@ -12,6 +11,7 @@ use reth_primitives::{ keccak256, proofs::EMPTY_ROOT, Account, Address, StorageEntry, StorageTrieEntry, TransitionId, H256, KECCAK_EMPTY, U256, }; +use reth_provider::Transaction; use reth_rlp::{ encode_fixed_size, Decodable, DecodeError, Encodable, RlpDecodable, RlpEncodable, EMPTY_STRING_CODE, diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs index 8227a46a5..d054cb32f 100644 --- a/crates/storage/provider/src/lib.rs +++ b/crates/storage/provider/src/lib.rs @@ -22,6 +22,10 @@ pub use providers::{ LatestStateProviderRef, ShareableDatabase, }; +/// Helper types for interacting with the database +mod transaction; +pub use transaction::{Transaction, TransactionError}; + /// Common database utilities. mod utils; pub use utils::{insert_block, insert_canonical_block}; diff --git a/crates/stages/src/db.rs b/crates/storage/provider/src/transaction.rs similarity index 74% rename from crates/stages/src/db.rs rename to crates/storage/provider/src/transaction.rs index 7152b0038..798b2c6fe 100644 --- a/crates/stages/src/db.rs +++ b/crates/storage/provider/src/transaction.rs @@ -1,9 +1,4 @@ #![allow(dead_code)] -use std::{ - fmt::Debug, - ops::{Deref, DerefMut}, -}; - use reth_db::{ cursor::DbCursorRO, database::{Database, DatabaseGAT}, @@ -11,11 +6,13 @@ use reth_db::{ table::Table, tables, transaction::{DbTx, DbTxMut}, - Error, }; +use reth_interfaces::{db::Error as DbError, provider::Error as ProviderError}; use reth_primitives::{BlockHash, BlockNumber, Header, TransitionId, TxNumber}; - -use crate::{DatabaseIntegrityError, StageError}; +use std::{ + fmt::Debug, + ops::{Deref, DerefMut}, +}; /// A container for any DB transaction that will open a new inner transaction when the current /// one is committed. @@ -70,7 +67,7 @@ where /// Create a new container with the given database handle. /// /// A new inner transaction will be opened. - pub fn new(db: &'this DB) -> Result { + pub fn new(db: &'this DB) -> Result { Ok(Self { db, tx: Some(db.tx_mut()?) }) } @@ -85,14 +82,14 @@ where /// /// Panics if an inner transaction does not exist. This should never be the case unless /// [Transaction::close] was called without following up with a call to [Transaction::open]. - pub fn commit(&mut self) -> Result { + pub fn commit(&mut self) -> Result { let success = if let Some(tx) = self.tx.take() { tx.commit()? } else { false }; self.tx = Some(self.db.tx_mut()?); Ok(success) } /// Open a new inner transaction. - pub fn open(&mut self) -> Result<(), Error> { + pub fn open(&mut self) -> Result<(), DbError> { self.tx = Some(self.db.tx_mut()?); Ok(()) } @@ -103,41 +100,37 @@ where } /// Query [tables::CanonicalHeaders] table for block hash by block number - pub(crate) fn get_block_hash(&self, number: BlockNumber) -> Result { + pub(crate) fn get_block_hash( + &self, + block_number: BlockNumber, + ) -> Result { let hash = self - .get::(number)? - .ok_or(DatabaseIntegrityError::CanonicalHeader { number })?; + .get::(block_number)? + .ok_or(ProviderError::CanonicalHeader { block_number })?; Ok(hash) } /// Query the block body by number. - pub(crate) fn get_block_body( - &self, - number: BlockNumber, - ) -> Result { - let body = self - .get::(number)? - .ok_or(DatabaseIntegrityError::BlockBody { number })?; + pub fn get_block_body(&self, number: BlockNumber) -> Result { + let body = + self.get::(number)?.ok_or(ProviderError::BlockBody { number })?; Ok(body) } /// Query the last transition of the block by [BlockNumber] key - pub(crate) fn get_block_transition( - &self, - key: BlockNumber, - ) -> Result { + pub fn get_block_transition(&self, key: BlockNumber) -> Result { let last_transition_id = self .get::(key)? - .ok_or(DatabaseIntegrityError::BlockTransition { number: key })?; + .ok_or(ProviderError::BlockTransition { block_number: key })?; Ok(last_transition_id) } /// Get the next start transaction id and transition for the `block` by looking at the previous /// block. Returns Zero/Zero for Genesis. - pub(crate) fn get_next_block_ids( + pub fn get_next_block_ids( &self, block: BlockNumber, - ) -> Result<(TxNumber, TransitionId), StageError> { + ) -> Result<(TxNumber, TransitionId), TransactionError> { if block == 0 { return Ok((0, 0)) } @@ -146,21 +139,20 @@ where let prev_body = self.get_block_body(prev_number)?; let last_transition = self .get::(prev_number)? - .ok_or(DatabaseIntegrityError::BlockTransition { number: prev_number })?; + .ok_or(ProviderError::BlockTransition { block_number: prev_number })?; Ok((prev_body.start_tx_id + prev_body.tx_count, last_transition)) } /// Query the block header by number - pub(crate) fn get_header(&self, number: BlockNumber) -> Result { - let header = self - .get::(number)? - .ok_or(DatabaseIntegrityError::Header { number })?; + pub fn get_header(&self, number: BlockNumber) -> Result { + let header = + self.get::(number)?.ok_or(ProviderError::Header { number })?; Ok(header) } /// Unwind table by some number key #[inline] - pub(crate) fn unwind_table_by_num(&self, num: u64) -> Result<(), Error> + pub fn unwind_table_by_num(&self, num: u64) -> Result<(), DbError> where DB: Database, T: Table, @@ -173,7 +165,7 @@ where &self, block: BlockNumber, mut selector: F, - ) -> Result<(), Error> + ) -> Result<(), DbError> where DB: Database, T: Table, @@ -192,7 +184,7 @@ where } /// Unwind a table forward by a [Walker][reth_db::abstraction::cursor::Walker] on another table - pub(crate) fn unwind_table_by_walker(&self, start_at: T1::Key) -> Result<(), Error> + pub fn unwind_table_by_walker(&self, start_at: T1::Key) -> Result<(), DbError> where DB: Database, T1: Table, @@ -206,3 +198,14 @@ where Ok(()) } } + +/// An error that can occur when using the transaction container +#[derive(Debug, thiserror::Error)] +pub enum TransactionError { + /// The transaction encountered a database error. + #[error("Database error: {0}")] + Database(#[from] DbError), + /// The transaction encountered a database integrity error. + #[error("A database integrity error occurred: {0}")] + DatabaseIntegrity(#[from] ProviderError), +} diff --git a/crates/storage/provider/src/utils.rs b/crates/storage/provider/src/utils.rs index c53d693f8..3c6e5d60b 100644 --- a/crates/storage/provider/src/utils.rs +++ b/crates/storage/provider/src/utils.rs @@ -44,7 +44,7 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>( let prev_block_num = block.number - 1; let prev_body = tx .get::(prev_block_num)? - .ok_or(ProviderError::BlockBody { block_number: prev_block_num })?; + .ok_or(ProviderError::BlockBody { number: prev_block_num })?; let last_transition_id = tx .get::(prev_block_num)? .ok_or(ProviderError::BlockTransition { block_number: prev_block_num })?; diff --git a/docs/repo/crates/db.md b/docs/repo/crates/db.md index 27b2de013..a505f28ff 100644 --- a/docs/repo/crates/db.md +++ b/docs/repo/crates/db.md @@ -194,7 +194,7 @@ pub trait DbTxMut<'tx>: for<'a> DbTxMutGAT<'a> { Lets take a look at the `DbTx` and `DbTxMut` traits in action. Revisiting the `Transaction` struct as an example, the `Transaction::get_block_hash()` method uses the `DbTx::get()` function to get a block header hash in the form of `self.get::(number)`. -[File: crates/stages/src/db.rs](https://github.com/paradigmxyz/reth/blob/main/crates/stages/src/db.rs#L106) +[File: crates/storage/provider/src/transaction.rs](https://github.com/paradigmxyz/reth/blob/main/crates/storage/provider/src/transaction.rs#L106) ```rust ignore @@ -208,7 +208,7 @@ where pub(crate) fn get_block_hash(&self, number: BlockNumber) -> Result { let hash = self .get::(number)? - .ok_or(DatabaseIntegrityError::CanonicalHash { number })?; + .ok_or(ProviderError::CanonicalHash { number })?; Ok(hash) } //--snip--