chore: move Transaction container to reth_provider (#1238)

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
Dan Cline
2023-02-11 00:25:26 -05:00
committed by GitHub
parent b23bd7c609
commit 3d0864bbb9
25 changed files with 181 additions and 179 deletions

View File

@ -6,20 +6,20 @@ use crate::{
dirs::{ConfigPath, DbPath, PlatformPath},
prometheus_exporter,
};
use clap::{Parser, ValueEnum};
use reth_consensus::beacon::BeaconConsensus;
use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
use reth_net_nat::NatResolver;
use reth_primitives::ChainSpec;
use reth_provider::Transaction;
use reth_staged_sync::{
utils::{chainspec::chain_spec_value_parser, init::init_db},
Config,
};
use reth_stages::{
stages::{BodyStage, ExecutionStage, SenderRecoveryStage},
ExecInput, Stage, StageId, Transaction, UnwindInput,
ExecInput, Stage, StageId, UnwindInput,
};
use clap::{Parser, ValueEnum};
use std::{net::SocketAddr, sync::Arc};
use tracing::*;

View File

@ -13,8 +13,9 @@ use reth_primitives::{
keccak256, Account as RethAccount, Address, ChainSpec, ForkCondition, Hardfork, JsonU256,
SealedBlock, SealedHeader, StorageEntry, H256, U256,
};
use reth_provider::Transaction;
use reth_rlp::Decodable;
use reth_stages::{stages::ExecutionStage, ExecInput, Stage, StageId, Transaction};
use reth_stages::{stages::ExecutionStage, ExecInput, Stage, StageId};
use std::{
collections::HashMap,
ffi::OsStr,

View File

@ -1,25 +1,71 @@
use reth_primitives::{Address, BlockHash, BlockNumber, TransitionId, H256};
use reth_primitives::{Address, BlockHash, BlockNumber, TransitionId, TxNumber, H256};
/// KV error type. They are using u32 to represent error code.
#[allow(missing_docs)]
#[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)]
pub enum Error {
/// The header hash is missing from the database.
#[error("Block number {block_number} does not exist in database")]
BlockNumber { block_number: BlockNumber },
CanonicalHeader { block_number: BlockNumber },
/// A header body is missing from the database.
#[error("No header for block #{number}")]
Header {
/// The block number key
number: BlockNumber,
},
/// The header number was not found for the given block hash.
#[error("Block hash {block_hash:?} does not exist in Headers table")]
BlockHash { block_hash: BlockHash },
#[error("Block body not exists #{block_number}")]
BlockBody { block_number: BlockNumber },
/// A block body is missing.
#[error("Block body not found for block #{number}")]
BlockBody { number: BlockNumber },
/// The block transition id for a certain block number is missing.
#[error("Block transition id does not exist for block #{block_number}")]
BlockTransition { block_number: BlockNumber },
#[error("Block number {block_number} with hash #{received_hash:?} is not canonical block. Canonical block hash is #{expected_hash:?}")]
NonCanonicalBlock {
block_number: BlockNumber,
expected_hash: BlockHash,
received_hash: BlockHash,
},
/// The transition id was found for the given address and storage key, but the changeset was
/// not found.
#[error("Storage ChangeSet address: ({address:?} key: {storage_key:?}) for transition:#{transition_id} does not exist")]
StorageChangeset { transition_id: TransitionId, address: Address, storage_key: H256 },
StorageChangeset {
/// The transition id found for the address and storage key
transition_id: TransitionId,
/// The account address
address: Address,
/// The storage key
storage_key: H256,
},
/// The transition id was found for the given address, but the changeset was not found.
#[error("Account {address:?} ChangeSet for transition #{transition_id} does not exist")]
AccountChangeset { transition_id: TransitionId, address: Address },
AccountChangeset {
/// Transition id found for the address
transition_id: TransitionId,
/// The account address
address: Address,
},
/// The total difficulty for a block is missing.
#[error("Total difficulty not found for block #{number}")]
TotalDifficulty { number: BlockNumber },
/// The transaction is missing
#[error("Transaction #{id} not found")]
Transaction {
/// The transaction id
id: TxNumber,
},
/// A ommers are missing.
#[error("Block ommers not found for block #{number}")]
Ommers {
/// The block number key
number: BlockNumber,
},
/// There is a gap in the transaction table, at a missing transaction number.
#[error("Gap in transaction table. Missing tx number #{missing}.")]
TransactionsGap { missing: TxNumber },
/// There is a gap in the senders table, at a missing transaction number.
#[error("Gap in transaction signer table. Missing tx number #{missing}.")]
TransactionsSignerGap { missing: TxNumber },
/// Reached the end of the transaction table.
#[error("Got to the end of transaction table")]
EndOfTransactionTable,
/// Reached the end of the transaction sender table.
#[error("Got to the end of the transaction sender table")]
EndOfTransactionSenderTable,
}

View File

@ -1,6 +1,10 @@
use crate::pipeline::PipelineEvent;
use reth_interfaces::{consensus, db::Error as DbError, executor, p2p::error::DownloadError};
use reth_primitives::{BlockNumber, TxNumber};
use reth_interfaces::{
consensus, db::Error as DbError, executor, p2p::error::DownloadError,
provider::Error as ProviderError,
};
use reth_primitives::BlockNumber;
use reth_provider::TransactionError;
use thiserror::Error;
use tokio::sync::mpsc::error::SendError;
@ -31,7 +35,10 @@ pub enum StageError {
},
/// The stage encountered a database integrity error.
#[error("A database integrity error occurred: {0}")]
DatabaseIntegrity(#[from] DatabaseIntegrityError),
DatabaseIntegrity(#[from] ProviderError),
/// The stage encountered an error related to the current database transaction.
#[error("A database transaction error occurred: {0}")]
Transaction(#[from] TransactionError),
/// Invalid download response. Applicable for stages which
/// rely on external downloaders
#[error("Invalid download response: {0}")]
@ -66,59 +73,6 @@ impl StageError {
}
}
/// A database integrity error.
/// The sender stage error
#[derive(Error, Debug)]
#[allow(missing_docs)]
pub enum DatabaseIntegrityError {
/// The canonical header for a block is missing from the database.
#[error("No canonical header for block #{number}")]
CanonicalHeader {
/// The block number key
number: BlockNumber,
},
/// A header is missing from the database.
#[error("No header for block #{number}")]
Header {
/// The block number key
number: BlockNumber,
},
/// A ommers are missing.
#[error("Block ommers not found for block #{number}")]
Ommers {
/// The block number key
number: BlockNumber,
},
/// A block body is missing.
#[error("Block body not found for block #{number}")]
BlockBody {
/// The block number key
number: BlockNumber,
},
/// The transaction is missing
#[error("Transaction #{id} not found")]
Transaction {
/// The transaction id
id: TxNumber,
},
#[error("Block transition not found for block #{number}")]
BlockTransition { number: BlockNumber },
#[error("Gap in transaction table. Missing tx number #{missing}.")]
TransactionsGap { missing: TxNumber },
#[error("Gap in transaction signer table. Missing tx number #{missing}.")]
TransactionsSignerGap { missing: TxNumber },
#[error("Got to the end of transaction table")]
EndOfTransactionTable,
#[error("Got to the end of the transaction sender table")]
EndOfTransactionSenderTable,
/// The total difficulty from the block header is missing.
#[error("Total difficulty not found for block #{number}")]
TotalDifficulty {
/// The block number key
number: BlockNumber,
},
}
/// A pipeline execution error.
#[derive(Error, Debug)]
pub enum PipelineError {

View File

@ -49,7 +49,6 @@
//! .build();
//! #
//! ```
mod db;
mod error;
mod id;
mod pipeline;
@ -69,7 +68,6 @@ pub mod stages;
pub mod sets;
pub use db::Transaction;
pub use error::*;
pub use id::*;
pub use pipeline::*;

View File

@ -1,12 +1,10 @@
use crate::{
db::Transaction, error::*, util::opt, ExecInput, ExecOutput, Stage, StageError, StageId,
UnwindInput,
};
use crate::{error::*, util::opt, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput};
use metrics::Gauge;
use reth_db::database::Database;
use reth_interfaces::sync::{SyncState, SyncStateUpdater};
use reth_metrics_derive::Metrics;
use reth_primitives::BlockNumber;
use reth_provider::Transaction;
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
@ -421,7 +419,7 @@ mod tests {
use crate::{StageId, UnwindOutput};
use assert_matches::assert_matches;
use reth_db::mdbx::{self, test_utils, EnvKind};
use reth_interfaces::{consensus, sync::NoopSyncStateUpdate};
use reth_interfaces::{consensus, provider::Error as ProviderError, sync::NoopSyncStateUpdate};
use tokio_stream::StreamExt;
use utils::TestStage;
@ -682,15 +680,15 @@ mod tests {
let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);
let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder()
.add_stage(TestStage::new(StageId("Fatal")).add_exec(Err(
StageError::DatabaseIntegrity(DatabaseIntegrityError::BlockBody { number: 5 }),
StageError::DatabaseIntegrity(ProviderError::BlockBody { number: 5 }),
)))
.build();
let result = pipeline.run(db).await;
assert_matches!(
result,
Err(PipelineError::Stage(StageError::DatabaseIntegrity(
DatabaseIntegrityError::BlockBody { number: 5 }
)))
Err(PipelineError::Stage(StageError::DatabaseIntegrity(ProviderError::BlockBody {
number: 5
})))
);
}

View File

@ -1,9 +1,10 @@
use std::ops::RangeInclusive;
use crate::{db::Transaction, error::StageError, id::StageId};
use crate::{error::StageError, id::StageId};
use async_trait::async_trait;
use reth_db::database::Database;
use reth_primitives::BlockNumber;
use reth_provider::Transaction;
/// Stage execution input, see [Stage::execute].
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]

View File

@ -1,6 +1,6 @@
use crate::{
db::Transaction, exec_or_return, DatabaseIntegrityError, ExecAction, ExecInput, ExecOutput,
Stage, StageError, StageId, UnwindInput, UnwindOutput,
exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput,
UnwindOutput,
};
use futures_util::TryStreamExt;
use reth_db::{
@ -13,7 +13,9 @@ use reth_db::{
use reth_interfaces::{
consensus::Consensus,
p2p::bodies::{downloader::BodyDownloader, response::BlockResponse},
provider::Error as ProviderError,
};
use reth_provider::Transaction;
use std::sync::Arc;
use tracing::*;
@ -157,7 +159,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// transition at the last transaction of this block.
let td = td_cursor
.seek(block_number)?
.ok_or(DatabaseIntegrityError::TotalDifficulty { number: block_number })?
.ok_or(ProviderError::TotalDifficulty { number: block_number })?
.1;
let has_reward = self.consensus.has_block_reward(td.into());
if has_reward {

View File

@ -1,6 +1,6 @@
use crate::{
db::Transaction, exec_or_return, DatabaseIntegrityError, ExecAction, ExecInput, ExecOutput,
Stage, StageError, StageId, UnwindInput, UnwindOutput,
exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput,
UnwindOutput,
};
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
@ -13,10 +13,11 @@ use reth_executor::{
execution_result::AccountChangeSet,
revm_wrap::{State, SubState},
};
use reth_interfaces::provider::Error as ProviderError;
use reth_primitives::{
Address, Block, ChainSpec, Hardfork, Header, StorageEntry, H256, MAINNET, U256,
};
use reth_provider::LatestStateProviderRef;
use reth_provider::{LatestStateProviderRef, Transaction};
use std::fmt::Debug;
use tracing::*;
@ -96,10 +97,9 @@ impl ExecutionStage {
let (number, header) = entry?;
let (_, td) = td_cursor
.seek_exact(number)?
.ok_or(DatabaseIntegrityError::TotalDifficulty { number })?;
let (_, body) = bodies_cursor
.seek_exact(number)?
.ok_or(DatabaseIntegrityError::BlockBody { number })?;
.ok_or(ProviderError::TotalDifficulty { number })?;
let (_, body) =
bodies_cursor.seek_exact(number)?.ok_or(ProviderError::BlockBody { number })?;
let (_, stored_ommers) = ommers_cursor.seek_exact(number)?.unwrap_or_default();
Ok((header, td.into(), body, stored_ommers.ommers))
})
@ -120,10 +120,10 @@ impl ExecutionStage {
// get next N transactions.
for index in body.tx_id_range() {
let (tx_index, tx) =
tx_walker.next().ok_or(DatabaseIntegrityError::EndOfTransactionTable)??;
tx_walker.next().ok_or(ProviderError::EndOfTransactionTable)??;
if tx_index != index {
error!(target: "sync::stages::execution", block = block_number, expected = index, found = tx_index, ?body, "Transaction gap");
return Err(DatabaseIntegrityError::TransactionsGap { missing: tx_index }.into())
return Err(ProviderError::TransactionsGap { missing: tx_index }.into())
}
transactions.push(tx);
}
@ -132,14 +132,11 @@ impl ExecutionStage {
let mut tx_sender_walker = tx_sender.walk(Some(body.start_tx_id))?;
let mut signers = Vec::with_capacity(body.tx_count as usize);
for index in body.tx_id_range() {
let (tx_index, tx) = tx_sender_walker
.next()
.ok_or(DatabaseIntegrityError::EndOfTransactionSenderTable)??;
let (tx_index, tx) =
tx_sender_walker.next().ok_or(ProviderError::EndOfTransactionSenderTable)??;
if tx_index != index {
error!(target: "sync::stages::execution", block = block_number, expected = index, found = tx_index, ?body, "Signer gap");
return Err(
DatabaseIntegrityError::TransactionsSignerGap { missing: tx_index }.into()
)
return Err(ProviderError::TransactionsSignerGap { missing: tx_index }.into())
}
signers.push(tx);
}

View File

@ -1,6 +1,4 @@
use crate::{
db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput,
};
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
@ -8,6 +6,7 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
};
use reth_primitives::{keccak256, Account, Address};
use reth_provider::Transaction;
use std::{
collections::{BTreeMap, BTreeSet},
fmt::Debug,

View File

@ -1,6 +1,4 @@
use crate::{
db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput,
};
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
database::Database,
@ -9,6 +7,7 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
};
use reth_primitives::{keccak256, Address, StorageEntry, H256, U256};
use reth_provider::Transaction;
use std::{
collections::{BTreeMap, BTreeSet},
fmt::Debug,

View File

@ -1,7 +1,4 @@
use crate::{
db::Transaction, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId,
UnwindInput, UnwindOutput,
};
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use futures_util::StreamExt;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
@ -12,8 +9,10 @@ use reth_db::{
use reth_interfaces::{
consensus::{Consensus, ForkchoiceState},
p2p::headers::downloader::{HeaderDownloader, SyncTarget},
provider::Error as ProviderError,
};
use reth_primitives::{BlockNumber, SealedHeader};
use reth_provider::Transaction;
use std::sync::Arc;
use tracing::*;
@ -60,7 +59,7 @@ where
let mut header_cursor = tx.cursor_read::<tables::CanonicalHeaders>()?;
let (head_num, _) = header_cursor
.seek_exact(stage_progress)?
.ok_or(DatabaseIntegrityError::CanonicalHeader { number: stage_progress })?;
.ok_or(ProviderError::CanonicalHeader { block_number: stage_progress })?;
// Check if the next entry is congruent
Ok(header_cursor.next()?.map(|(next_num, _)| head_num + 1 == next_num).unwrap_or_default())
}
@ -80,12 +79,12 @@ where
// Get head hash and reposition the cursor
let (head_num, head_hash) = cursor
.seek_exact(stage_progress)?
.ok_or(DatabaseIntegrityError::CanonicalHeader { number: stage_progress })?;
.ok_or(ProviderError::CanonicalHeader { block_number: stage_progress })?;
// Construct head
let (_, head) = header_cursor
.seek_exact(head_num)?
.ok_or(DatabaseIntegrityError::Header { number: head_num })?;
.ok_or(ProviderError::Header { number: head_num })?;
let local_head = SealedHeader::new(head, head_hash);
// Look up the next header
@ -94,7 +93,7 @@ where
.map(|(next_num, next_hash)| -> Result<SealedHeader, StageError> {
let (_, next) = header_cursor
.seek_exact(next_num)?
.ok_or(DatabaseIntegrityError::Header { number: next_num })?;
.ok_or(ProviderError::Header { number: next_num })?;
Ok(SealedHeader::new(next, next_hash))
})
.transpose()?;
@ -500,8 +499,8 @@ mod tests {
// Empty database
assert_matches!(
stage.get_sync_gap(&tx, stage_progress).await,
Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CanonicalHeader { number }))
if number == stage_progress
Err(StageError::DatabaseIntegrity(ProviderError::CanonicalHeader { block_number }))
if block_number == stage_progress
);
// Checkpoint and no gap

View File

@ -1,6 +1,4 @@
use crate::{
db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput,
};
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use itertools::Itertools;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
@ -10,6 +8,7 @@ use reth_db::{
transaction::{DbTx, DbTxMut, DbTxMutGAT},
TransitionList,
};
use reth_provider::Transaction;
use reth_primitives::{Address, TransitionId};
use std::{collections::BTreeMap, fmt::Debug};

View File

@ -1,6 +1,4 @@
use crate::{
db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput,
};
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use itertools::Itertools;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
@ -11,6 +9,7 @@ use reth_db::{
TransitionList,
};
use reth_primitives::{Address, TransitionId, H256};
use reth_provider::Transaction;
use std::{collections::BTreeMap, fmt::Debug};
use tracing::*;

View File

@ -1,9 +1,10 @@
use crate::{
db::Transaction, trie::DBTrieLoader, ExecInput, ExecOutput, Stage, StageError, StageId,
UnwindInput, UnwindOutput,
trie::DBTrieLoader, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput,
UnwindOutput,
};
use reth_db::{database::Database, tables, transaction::DbTx};
use reth_interfaces::consensus;
use reth_provider::Transaction;
use std::fmt::Debug;
use tracing::*;

View File

@ -1,6 +1,6 @@
use crate::{
db::Transaction, exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId,
UnwindInput, UnwindOutput,
exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput,
UnwindOutput,
};
use futures_util::StreamExt;
@ -12,6 +12,7 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
};
use reth_primitives::TxNumber;
use reth_provider::Transaction;
use std::fmt::Debug;
use thiserror::Error;
use tokio::sync::mpsc;

View File

@ -1,6 +1,6 @@
use crate::{
db::Transaction, exec_or_return, DatabaseIntegrityError, ExecAction, ExecInput, ExecOutput,
Stage, StageError, StageId, UnwindInput, UnwindOutput,
exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput,
UnwindOutput,
};
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
@ -8,8 +8,9 @@ use reth_db::{
tables,
transaction::{DbTx, DbTxMut},
};
use reth_interfaces::consensus::Error;
use reth_interfaces::{consensus::Error, provider::Error as ProviderError};
use reth_primitives::{ChainSpec, Hardfork, EMPTY_OMMER_ROOT, MAINNET, U256};
use reth_provider::Transaction;
use tracing::*;
const TOTAL_DIFFICULTY: StageId = StageId("TotalDifficulty");
@ -59,7 +60,7 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
let last_header_number = input.stage_progress.unwrap_or_default();
let last_entry = cursor_td
.seek_exact(last_header_number)?
.ok_or(DatabaseIntegrityError::TotalDifficulty { number: last_header_number })?;
.ok_or(ProviderError::TotalDifficulty { number: last_header_number })?;
let mut td: U256 = last_entry.1.into();
debug!(target: "sync::stages::total_difficulty", ?td, block_number = last_header_number, "Last total difficulty entry");

View File

@ -1,6 +1,6 @@
use crate::{
db::Transaction, exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId,
UnwindInput, UnwindOutput,
exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput,
UnwindOutput,
};
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
@ -8,6 +8,7 @@ use reth_db::{
tables,
transaction::{DbTx, DbTxMut},
};
use reth_provider::Transaction;
use tracing::*;
const TRANSACTION_LOOKUP: StageId = StageId("TransactionLookup");

View File

@ -1,10 +1,10 @@
use super::TestTransaction;
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_db::mdbx::{Env, WriteMap};
use reth_provider::Transaction;
use std::borrow::Borrow;
use tokio::sync::oneshot;
use super::TestTransaction;
use crate::{db::Transaction, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
#[derive(thiserror::Error, Debug)]
pub(crate) enum TestRunnerError {
#[error("Database error occurred.")]

View File

@ -12,10 +12,9 @@ use reth_db::{
Error as DbError,
};
use reth_primitives::{BlockNumber, SealedBlock, SealedHeader, U256};
use reth_provider::Transaction;
use std::{borrow::Borrow, path::Path, sync::Arc};
use crate::db::Transaction;
/// The [TestTransaction] is used as an internal
/// database for testing stage implementation.
///

View File

@ -1,4 +1,3 @@
use crate::Transaction;
use cita_trie::{PatriciaTrie, Trie};
use hasher::HasherKeccak;
use reth_db::{
@ -12,6 +11,7 @@ use reth_primitives::{
keccak256, proofs::EMPTY_ROOT, Account, Address, StorageEntry, StorageTrieEntry, TransitionId,
H256, KECCAK_EMPTY, U256,
};
use reth_provider::Transaction;
use reth_rlp::{
encode_fixed_size, Decodable, DecodeError, Encodable, RlpDecodable, RlpEncodable,
EMPTY_STRING_CODE,

View File

@ -22,6 +22,10 @@ pub use providers::{
LatestStateProviderRef, ShareableDatabase,
};
/// Helper types for interacting with the database
mod transaction;
pub use transaction::{Transaction, TransactionError};
/// Common database utilities.
mod utils;
pub use utils::{insert_block, insert_canonical_block};

View File

@ -1,9 +1,4 @@
#![allow(dead_code)]
use std::{
fmt::Debug,
ops::{Deref, DerefMut},
};
use reth_db::{
cursor::DbCursorRO,
database::{Database, DatabaseGAT},
@ -11,11 +6,13 @@ use reth_db::{
table::Table,
tables,
transaction::{DbTx, DbTxMut},
Error,
};
use reth_interfaces::{db::Error as DbError, provider::Error as ProviderError};
use reth_primitives::{BlockHash, BlockNumber, Header, TransitionId, TxNumber};
use crate::{DatabaseIntegrityError, StageError};
use std::{
fmt::Debug,
ops::{Deref, DerefMut},
};
/// A container for any DB transaction that will open a new inner transaction when the current
/// one is committed.
@ -70,7 +67,7 @@ where
/// Create a new container with the given database handle.
///
/// A new inner transaction will be opened.
pub fn new(db: &'this DB) -> Result<Self, Error> {
pub fn new(db: &'this DB) -> Result<Self, DbError> {
Ok(Self { db, tx: Some(db.tx_mut()?) })
}
@ -85,14 +82,14 @@ where
///
/// Panics if an inner transaction does not exist. This should never be the case unless
/// [Transaction::close] was called without following up with a call to [Transaction::open].
pub fn commit(&mut self) -> Result<bool, Error> {
pub fn commit(&mut self) -> Result<bool, DbError> {
let success = if let Some(tx) = self.tx.take() { tx.commit()? } else { false };
self.tx = Some(self.db.tx_mut()?);
Ok(success)
}
/// Open a new inner transaction.
pub fn open(&mut self) -> Result<(), Error> {
pub fn open(&mut self) -> Result<(), DbError> {
self.tx = Some(self.db.tx_mut()?);
Ok(())
}
@ -103,41 +100,37 @@ where
}
/// Query [tables::CanonicalHeaders] table for block hash by block number
pub(crate) fn get_block_hash(&self, number: BlockNumber) -> Result<BlockHash, StageError> {
pub(crate) fn get_block_hash(
&self,
block_number: BlockNumber,
) -> Result<BlockHash, TransactionError> {
let hash = self
.get::<tables::CanonicalHeaders>(number)?
.ok_or(DatabaseIntegrityError::CanonicalHeader { number })?;
.get::<tables::CanonicalHeaders>(block_number)?
.ok_or(ProviderError::CanonicalHeader { block_number })?;
Ok(hash)
}
/// Query the block body by number.
pub(crate) fn get_block_body(
&self,
number: BlockNumber,
) -> Result<StoredBlockBody, StageError> {
let body = self
.get::<tables::BlockBodies>(number)?
.ok_or(DatabaseIntegrityError::BlockBody { number })?;
pub fn get_block_body(&self, number: BlockNumber) -> Result<StoredBlockBody, TransactionError> {
let body =
self.get::<tables::BlockBodies>(number)?.ok_or(ProviderError::BlockBody { number })?;
Ok(body)
}
/// Query the last transition of the block by [BlockNumber] key
pub(crate) fn get_block_transition(
&self,
key: BlockNumber,
) -> Result<TransitionId, StageError> {
pub fn get_block_transition(&self, key: BlockNumber) -> Result<TransitionId, TransactionError> {
let last_transition_id = self
.get::<tables::BlockTransitionIndex>(key)?
.ok_or(DatabaseIntegrityError::BlockTransition { number: key })?;
.ok_or(ProviderError::BlockTransition { block_number: key })?;
Ok(last_transition_id)
}
/// Get the next start transaction id and transition for the `block` by looking at the previous
/// block. Returns Zero/Zero for Genesis.
pub(crate) fn get_next_block_ids(
pub fn get_next_block_ids(
&self,
block: BlockNumber,
) -> Result<(TxNumber, TransitionId), StageError> {
) -> Result<(TxNumber, TransitionId), TransactionError> {
if block == 0 {
return Ok((0, 0))
}
@ -146,21 +139,20 @@ where
let prev_body = self.get_block_body(prev_number)?;
let last_transition = self
.get::<tables::BlockTransitionIndex>(prev_number)?
.ok_or(DatabaseIntegrityError::BlockTransition { number: prev_number })?;
.ok_or(ProviderError::BlockTransition { block_number: prev_number })?;
Ok((prev_body.start_tx_id + prev_body.tx_count, last_transition))
}
/// Query the block header by number
pub(crate) fn get_header(&self, number: BlockNumber) -> Result<Header, StageError> {
let header = self
.get::<tables::Headers>(number)?
.ok_or(DatabaseIntegrityError::Header { number })?;
pub fn get_header(&self, number: BlockNumber) -> Result<Header, TransactionError> {
let header =
self.get::<tables::Headers>(number)?.ok_or(ProviderError::Header { number })?;
Ok(header)
}
/// Unwind table by some number key
#[inline]
pub(crate) fn unwind_table_by_num<T>(&self, num: u64) -> Result<(), Error>
pub fn unwind_table_by_num<T>(&self, num: u64) -> Result<(), DbError>
where
DB: Database,
T: Table<Key = u64>,
@ -173,7 +165,7 @@ where
&self,
block: BlockNumber,
mut selector: F,
) -> Result<(), Error>
) -> Result<(), DbError>
where
DB: Database,
T: Table,
@ -192,7 +184,7 @@ where
}
/// Unwind a table forward by a [Walker][reth_db::abstraction::cursor::Walker] on another table
pub(crate) fn unwind_table_by_walker<T1, T2>(&self, start_at: T1::Key) -> Result<(), Error>
pub fn unwind_table_by_walker<T1, T2>(&self, start_at: T1::Key) -> Result<(), DbError>
where
DB: Database,
T1: Table,
@ -206,3 +198,14 @@ where
Ok(())
}
}
/// An error that can occur when using the transaction container
#[derive(Debug, thiserror::Error)]
pub enum TransactionError {
/// The transaction encountered a database error.
#[error("Database error: {0}")]
Database(#[from] DbError),
/// The transaction encountered a database integrity error.
#[error("A database integrity error occurred: {0}")]
DatabaseIntegrity(#[from] ProviderError),
}

View File

@ -44,7 +44,7 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>(
let prev_block_num = block.number - 1;
let prev_body = tx
.get::<tables::BlockBodies>(prev_block_num)?
.ok_or(ProviderError::BlockBody { block_number: prev_block_num })?;
.ok_or(ProviderError::BlockBody { number: prev_block_num })?;
let last_transition_id = tx
.get::<tables::BlockTransitionIndex>(prev_block_num)?
.ok_or(ProviderError::BlockTransition { block_number: prev_block_num })?;

View File

@ -194,7 +194,7 @@ pub trait DbTxMut<'tx>: for<'a> DbTxMutGAT<'a> {
Lets take a look at the `DbTx` and `DbTxMut` traits in action. Revisiting the `Transaction` struct as an example, the `Transaction::get_block_hash()` method uses the `DbTx::get()` function to get a block header hash in the form of `self.get::<tables::CanonicalHeaders>(number)`.
[File: crates/stages/src/db.rs](https://github.com/paradigmxyz/reth/blob/main/crates/stages/src/db.rs#L106)
[File: crates/storage/provider/src/transaction.rs](https://github.com/paradigmxyz/reth/blob/main/crates/storage/provider/src/transaction.rs#L106)
```rust ignore
@ -208,7 +208,7 @@ where
pub(crate) fn get_block_hash(&self, number: BlockNumber) -> Result<BlockHash, StageError> {
let hash = self
.get::<tables::CanonicalHeaders>(number)?
.ok_or(DatabaseIntegrityError::CanonicalHash { number })?;
.ok_or(ProviderError::CanonicalHash { number })?;
Ok(hash)
}
//--snip--