feat: bodies stage (#190)

* chore: clean up `.gitignore`

* fix: make RO cursors `Send + Sync`

* feat(wip): bodies stage

* driveby: improve docs

* chore: don't panic if we're the first stage

* chore: use `Vec` for ommers

* feat: error handling in bodies downloader

* chore: remove stale comment

* chore: pascal-case stage id

* refactor: remove unused new fns

* refactor: distinguish downloaders with prefix

* refactor: move downloader errs to own module

* refactor: `stream_bodies` -> `bodies_stream`

* test: fix borked imports in header stage

* test: clean up header tests

* test: add basic body stage tests

* test: add 2 more body stage test skeletons

* test: move generator test utils to own module

* refactor: move proof functions to primitives crate

* feat: add block generator test utils

* test: more body stage tests

* chore: fix typo (`Cannonical*` -> `Canonical`)

* docs: document `bodies_to_download`

* test: more body stage tests

* test: more body stage tests

* refactor: clean up body stage tests a bit

* test: fix broken tests

* refactor: clean up body stage tests

* test: more body stage tests
Commit 92a7818512 (parent 651eed1086), authored by Bjerg on 2022-11-15 08:49:11 +01:00, committed by GitHub.
48 changed files with 1772 additions and 340 deletions.

.gitignore

@ -1,2 +1,2 @@
.idea
/target
target

Cargo.lock

@ -3043,6 +3043,21 @@ dependencies = [
"walkdir",
]
[[package]]
name = "reth-bodies-downloaders"
version = "0.1.0"
dependencies = [
"assert_matches",
"futures-util",
"once_cell",
"rand 0.8.5",
"reth-eth-wire",
"reth-interfaces",
"reth-primitives",
"serial_test",
"tokio",
]
[[package]]
name = "reth-codecs"
version = "0.1.0"
@ -3057,16 +3072,11 @@ dependencies = [
"async-trait",
"auto_impl",
"eyre",
"hash-db",
"plain_hasher",
"reth-interfaces",
"reth-primitives",
"reth-rlp",
"rlp",
"sha3",
"thiserror",
"tokio",
"triehash",
]
[[package]]
@ -3225,6 +3235,7 @@ dependencies = [
"rand 0.8.5",
"reth-codecs",
"reth-db",
"reth-eth-wire",
"reth-primitives",
"reth-rpc-types",
"serde",
@ -3332,10 +3343,12 @@ dependencies = [
"crc",
"derive_more",
"ethers-core",
"hash-db",
"hex",
"hex-literal",
"maplit",
"parity-scale-codec",
"plain_hasher",
"reth-codecs",
"reth-rlp",
"secp256k1",
@ -3344,6 +3357,7 @@ dependencies = [
"sucds",
"thiserror",
"tiny-keccak",
"triehash",
]
[[package]]
@ -3416,9 +3430,12 @@ dependencies = [
"aquamarine",
"assert_matches",
"async-trait",
"futures-util",
"metrics",
"rand 0.8.5",
"reth-bodies-downloaders",
"reth-db",
"reth-eth-wire",
"reth-headers-downloaders",
"reth-interfaces",
"reth-primitives",


@ -18,6 +18,7 @@ members = [
"crates/net/rpc-api",
"crates/net/rpc-types",
"crates/net/headers-downloaders",
"crates/net/bodies-downloaders",
"crates/primitives",
"crates/stages",
"crates/transaction-pool",

crates/.gitignore

@ -1 +0,0 @@
target/


@ -18,13 +18,3 @@ thiserror = "1.0.37"
eyre = "0.6.8"
auto_impl = "1.0"
tokio = { version = "1.21.2", features = ["sync"] }
# proof related
triehash = "0.8"
# See to replace hashers to simplify libraries
plain_hasher = "0.2"
hash-db = "0.15"
# todo replace with faster rlp impl
rlp = { version = "0.5", default-features = false }
# replace with tiny-keccak (it is faster hasher)
sha3 = { version = "0.10", default-features = false }


@ -2,7 +2,7 @@
use crate::{verification, Config};
use reth_interfaces::consensus::{Consensus, Error, ForkchoiceState};
use reth_primitives::{SealedHeader, H256};
use reth_primitives::{BlockLocked, SealedHeader, H256};
use tokio::sync::watch;
/// Ethereum consensus
@ -40,4 +40,8 @@ impl Consensus for EthConsensus {
// * mix_hash & nonce PoW stuff
// * extra_data
}
fn pre_validate_block(&self, block: &BlockLocked) -> Result<(), Error> {
verification::validate_block_standalone(block, false)
}
}


@ -10,9 +10,6 @@ pub mod config;
pub mod consensus;
pub mod verification;
/// Helper function for calculating Merkle proofs and hashes
pub mod proofs;
pub use config::Config;
pub use consensus::EthConsensus;
pub use reth_interfaces::consensus::Error;


@ -115,10 +115,20 @@ pub fn validate_transaction_regarding_state<AP: AccountProvider>(
Ok(())
}
/// Validate block standalone
pub fn validate_block_standalone(block: &BlockLocked) -> Result<(), Error> {
// check ommers hash
let ommers_hash = crate::proofs::calculate_ommers_root(block.ommers.iter().map(|h| h.as_ref()));
/// Validate a block without regard for state:
///
/// - Compares the ommer hash in the block header to the block body
/// - Compares the transactions root in the block header to the block body
/// - Pre-execution transaction validation
/// - (Optionally) Compares the receipts root in the block header to the block body
pub fn validate_block_standalone(
block: &BlockLocked,
validate_receipts: bool,
) -> Result<(), Error> {
// Check ommers hash
// TODO(onbjerg): This should probably be accessible directly on [Block]
let ommers_hash =
reth_primitives::proofs::calculate_ommers_root(block.ommers.iter().map(|h| h.as_ref()));
if block.header.ommers_hash != ommers_hash {
return Err(Error::BodyOmmersHashDiff {
got: ommers_hash,
@ -126,8 +136,9 @@ pub fn validate_block_standalone(block: &BlockLocked) -> Result<(), Error> {
})
}
// check transaction root
let transaction_root = crate::proofs::calculate_transaction_root(block.body.iter());
// Check transaction root
// TODO(onbjerg): This should probably be accessible directly on [Block]
let transaction_root = reth_primitives::proofs::calculate_transaction_root(block.body.iter());
if block.header.transactions_root != transaction_root {
return Err(Error::BodyTransactionRootDiff {
got: transaction_root,
@ -135,18 +146,27 @@ pub fn validate_block_standalone(block: &BlockLocked) -> Result<(), Error> {
})
}
// TODO transaction verification, Maybe make it configurable as in check only
// TODO: transaction verification, maybe make it configurable as in check only
// signatures/limits/types
// Things to probably check:
// - Chain ID
// - Base fee per gas (if applicable)
// - Max priority fee per gas (if applicable)
// check if all transactions limit does not goes over block limit
// TODO: Check if all transaction gas total does not go over block limit
// check receipts root
let receipts_root = crate::proofs::calculate_receipt_root(block.receipts.iter());
if block.header.receipts_root != receipts_root {
return Err(Error::BodyReceiptsRootDiff {
got: receipts_root,
expected: block.header.receipts_root,
})
// Check receipts root
// TODO(onbjerg): This should probably be accessible directly on [Block]
// NOTE(onbjerg): Pre-validation does not validate the receipts root since we do not have the
// receipts yet (this validation is before execution). Maybe this should not be in here?
if validate_receipts {
let receipts_root = reth_primitives::proofs::calculate_receipt_root(block.receipts.iter());
if block.header.receipts_root != receipts_root {
return Err(Error::BodyReceiptsRootDiff {
got: receipts_root,
expected: block.header.receipts_root,
})
}
}
Ok(())
@ -284,7 +304,7 @@ pub fn full_validation<PROV: HeaderProvider>(
config: &Config,
) -> RethResult<()> {
validate_header_standalone(&block.header, config)?;
validate_block_standalone(block)?;
validate_block_standalone(block, true)?;
let parent = validate_block_regarding_chain(block, &provider)?;
validate_header_regarding_parent(&parent, &block.header, config)?;
Ok(())
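
For orientation, here is a minimal sketch of how the two call sites in this diff use the new `validate_receipts` flag (the wrapper function names are illustrative, not part of the PR):

```rust
// Sketch: the two modes of validate_block_standalone.
fn pre_execution_check(block: &BlockLocked) -> Result<(), Error> {
    // Before execution there are no receipts, so the receipts root check is skipped.
    validate_block_standalone(block, false)
}

fn post_execution_check(block: &BlockLocked) -> Result<(), Error> {
    // After execution the receipts exist, so their root is verified as well.
    validate_block_standalone(block, true)
}
```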


@ -50,7 +50,7 @@ impl<E: EnvironmentKind> Database for Env<E> {
impl<E: EnvironmentKind> Env<E> {
/// Opens the database at the specified path with the given `EnvKind`.
///
/// It does not create the tables, for that call [`create_tables`].
/// It does not create the tables, for that call [`Env::create_tables`].
pub fn open(path: &Path, kind: EnvKind) -> Result<Env<E>, Error> {
let mode = match kind {
EnvKind::RO => Mode::ReadOnly,


@ -16,6 +16,9 @@ auto_impl = "1.0"
tokio = { version = "1.21.2", features = ["sync"] }
bytes = "1.2"
# TODO(onbjerg): We only need this for [BlockBody]
reth-eth-wire = { path = "../net/eth-wire" }
# codecs
serde = { version = "1.0.*", default-features = false }
postcard = { version = "1.0.2", features = ["alloc"] }


@ -1,20 +1,30 @@
use async_trait::async_trait;
use reth_primitives::{BlockHash, BlockNumber, SealedHeader, H256};
use reth_primitives::{BlockHash, BlockLocked, BlockNumber, SealedHeader, H256};
use tokio::sync::watch::Receiver;
/// Re-export forkchoice state
pub use reth_rpc_types::engine::ForkchoiceState;
/// Consensus is a protocol that chooses the canonical chain.
/// We check the validity of block headers here.
#[async_trait]
#[auto_impl::auto_impl(&, Arc)]
pub trait Consensus: Send + Sync {
/// Get a receiver for the fork choice state
fn fork_choice_state(&self) -> Receiver<ForkchoiceState>;
/// Validate if header is correct and follows consensus specification
/// Validate if header is correct and follows consensus specification.
///
/// **This should not be called for the genesis block**.
fn validate_header(&self, header: &SealedHeader, parent: &SealedHeader) -> Result<(), Error>;
/// Validate a block disregarding world state, i.e. things that can be checked before sender
/// recovery and execution.
///
/// See the Yellow Paper sections 4.3.2 "Holistic Validity", 4.3.4 "Block Header Validity", and
/// 11.1 "Ommer Validation".
///
/// **This should not be called for the genesis block**.
fn pre_validate_block(&self, block: &BlockLocked) -> Result<(), Error>;
}
/// Consensus Errors


@ -1,4 +1,7 @@
use crate::db::{models::accounts::AccountBeforeTx, Compress, Decompress, Error};
use crate::db::{
models::{accounts::AccountBeforeTx, StoredBlockBody},
Compress, Decompress, Error,
};
use parity_scale_codec::decode_from_bytes;
use reth_primitives::*;
@ -53,7 +56,16 @@ impl ScaleValue for Vec<u8> {}
impl sealed::Sealed for Vec<u8> {}
impl_scale!(U256, H256, H160);
impl_scale!(Header, Account, Log, Receipt, TxType, StorageEntry, TransactionSigned);
impl_scale!(
Header,
Account,
Log,
Receipt,
TxType,
StorageEntry,
TransactionSigned,
StoredBlockBody
);
impl_scale!(AccountBeforeTx);
impl_scale_value!(u8, u32, u16, u64);


@ -74,9 +74,9 @@ pub trait Database: for<'a> DatabaseGAT<'a> {
/// Sealed trait which cannot be implemented by 3rd parties, exposed only for implementers
pub trait DbTxGAT<'a, __ImplicitBounds: Sealed = Bounds<&'a Self>>: Send + Sync {
/// Cursor GAT
type Cursor<T: Table>: DbCursorRO<'a, T>;
type Cursor<T: Table>: DbCursorRO<'a, T> + Send + Sync;
/// DupCursor GAT
type DupCursor<T: DupSort>: DbDupCursorRO<'a, T> + DbCursorRO<'a, T>;
type DupCursor<T: DupSort>: DbDupCursorRO<'a, T> + DbCursorRO<'a, T> + Send + Sync;
}
/// Implements the GAT method from:
@ -85,12 +85,14 @@ pub trait DbTxGAT<'a, __ImplicitBounds: Sealed = Bounds<&'a Self>>: Send + Sync
/// Sealed trait which cannot be implemented by 3rd parties, exposed only for implementers
pub trait DbTxMutGAT<'a, __ImplicitBounds: Sealed = Bounds<&'a Self>>: Send + Sync {
/// Cursor GAT
type CursorMut<T: Table>: DbCursorRW<'a, T> + DbCursorRO<'a, T>;
type CursorMut<T: Table>: DbCursorRW<'a, T> + DbCursorRO<'a, T> + Send + Sync;
/// DupCursor GAT
type DupCursorMut<T: DupSort>: DbDupCursorRW<'a, T>
+ DbCursorRW<'a, T>
+ DbDupCursorRO<'a, T>
+ DbCursorRO<'a, T>;
+ DbCursorRO<'a, T>
+ Send
+ Sync;
}
/// Read only transaction
@ -190,7 +192,9 @@ pub trait DbCursorRW<'tx, T: Table> {
/// exists in a table, and insert a new row if the specified value doesn't already exist
fn upsert(&mut self, key: T::Key, value: T::Value) -> Result<(), Error>;
/// Append value to next cursor item
/// Append value to next cursor item.
///
/// This is efficient for pre-sorted data. If the data is not pre-sorted, use [`insert`].
fn append(&mut self, key: T::Key, value: T::Value) -> Result<(), Error>;
/// Delete current value that cursor points to
@ -201,7 +205,10 @@ pub trait DbCursorRW<'tx, T: Table> {
pub trait DbDupCursorRW<'tx, T: DupSort> {
/// Append value to next cursor item
fn delete_current_duplicates(&mut self) -> Result<(), Error>;
/// Append duplicate value
/// Append duplicate value.
///
/// This is efficient for pre-sorted data. If the data is not pre-sorted, use [`insert`].
fn append_dup(&mut self, key: T::Key, value: T::Value) -> Result<(), Error>;
}
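
To make the `append` contract concrete, here is a hedged fragment of bulk-loading pre-sorted rows with a write cursor; `tx` (an open mutable transaction) and `sorted_bodies` (an iterator yielding keys in ascending order) are assumed:

```rust
// Sketch: append is only valid when keys arrive in ascending order.
let mut cursor = tx.cursor_mut::<tables::BlockBodies>()?;
for (key, body) in sorted_bodies {
    // Out-of-order keys would require an insert-style write instead.
    cursor.append(key, body)?;
}
```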


@ -8,14 +8,30 @@ use crate::{
impl_fixed_arbitrary,
};
use bytes::Bytes;
use reth_primitives::{BlockHash, BlockNumber, H256};
use reth_codecs::main_codec;
use reth_primitives::{BlockHash, BlockNumber, Header, TxNumber, H256};
use serde::{Deserialize, Serialize};
/// Total chain number of transactions. Key for [`CumulativeTxCount`].
pub type NumTransactions = u64;
/// Number of transactions in the block. Value for [`BlockBodies`].
pub type NumTxesInBlock = u16;
/// The storage representation of a block body.
///
/// A block body is stored as a pointer to the first transaction in the block (`base_tx_id`), a
/// count of how many transactions are in the block, and the headers of the block's uncles.
///
/// The [TxNumber]s for all the transactions in the block are `base_tx_id..(base_tx_id +
/// tx_amount)`.
#[derive(Debug)]
#[main_codec]
pub struct StoredBlockBody {
/// The ID of the first transaction in the block.
pub base_tx_id: TxNumber,
/// The number of transactions in the block.
pub tx_amount: u64,
/// The block headers of this block's uncles.
pub ommers: Vec<Header>,
}
/// Hash of the block header. Value for [`CanonicalHeaders`]
pub type HeaderHash = H256;
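
Because a stored body is just a pointer plus a count, the transaction IDs of a block are recovered as a half-open range. A small sketch (the helper name is hypothetical):

```rust
// Sketch: the TxNumber range covered by a StoredBlockBody, per the doc comment above.
fn body_tx_range(body: &StoredBlockBody) -> std::ops::Range<TxNumber> {
    body.base_tx_id..(body.base_tx_id + body.tx_amount)
}
```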


@ -3,7 +3,7 @@
use crate::db::{
models::{
accounts::{AccountBeforeTx, TxNumberAddress},
blocks::{BlockNumHash, HeaderHash, NumTransactions, NumTxesInBlock},
blocks::{BlockNumHash, HeaderHash, NumTransactions, StoredBlockBody},
ShardedKey,
},
DupSort,
@ -13,7 +13,7 @@ use reth_primitives::{
TransactionSigned, TxNumber, H256,
};
/// Enum for the type of table present in libmdbx.
/// Enum for the types of tables present in libmdbx.
#[derive(Debug)]
pub enum TableType {
/// key value table
@ -119,8 +119,10 @@ table!(
Headers => BlockNumHash => Header);
table!(
/// Stores the number of transactions of a block.
BlockBodies => BlockNumHash => NumTxesInBlock);
/// Stores a pointer to the first transaction in the block, the number of transactions in the block, and the uncles/ommers of the block.
///
/// The transaction IDs point to the [`Transactions`] table.
BlockBodies => BlockNumHash => StoredBlockBody);
table!(
/// Stores the maximum [`TxNumber`] from which this particular block starts.
@ -131,19 +133,19 @@ table!(
NonCanonicalTransactions => BlockNumHashTxNumber => TransactionSigned);
table!(
/// Stores the transaction body from canonical transactions. Canonical only
/// (Canonical only) Stores the transaction body for canonical transactions.
Transactions => TxNumber => TransactionSigned);
table!(
/// Stores transaction receipts. Canonical only
/// (Canonical only) Stores transaction receipts.
Receipts => TxNumber => Receipt);
table!(
/// Stores transaction logs. Canonical only
/// (Canonical only) Stores transaction logs.
Logs => TxNumber => Receipt);
table!(
/// Stores the current state of an Account.
/// Stores the current state of an [`Account`].
PlainAccountState => Address => Account);
table!(
@ -200,27 +202,27 @@ table!(
AccountHistory => ShardedKey<Address> => TxNumberList);
table!(
/// Stores the transaction numbers that changed each storage key.
/// Stores pointers to transactions that changed each storage key.
StorageHistory => AddressStorageKey => TxNumberList);
dupsort!(
/// Stores state of an account before a certain transaction changed it.
/// Stores the state of an account before a certain transaction changed it.
AccountChangeSet => TxNumber => [Address] AccountBeforeTx);
dupsort!(
/// Stores state of a storage key before a certain transaction changed it.
/// Stores the state of a storage key before a certain transaction changed it.
StorageChangeSet => TxNumberAddress => [H256] StorageEntry);
table!(
/// Stores the transaction sender from each transaction.
/// Stores the transaction sender for each transaction.
TxSenders => TxNumber => Address); // Is it necessary? If so, use an inverted index so we don't repeat addresses?
table!(
/// Config.
/// Configuration values.
Config => ConfigKey => ConfigValue);
table!(
/// Stores the block number of each stage id.
/// Stores the highest synced block number of each stage.
SyncStage => StageId => BlockNumber);
///


@ -0,0 +1,14 @@
use reth_eth_wire::BlockBody;
use reth_primitives::H256;
use crate::p2p::bodies::error::BodiesClientError;
use async_trait::async_trait;
use std::fmt::Debug;
/// A client capable of downloading block bodies.
#[async_trait]
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait BodiesClient: Send + Sync + Debug {
/// Fetches the block body for the requested block.
async fn get_block_body(&self, hash: H256) -> Result<BlockBody, BodiesClientError>;
}


@ -0,0 +1,44 @@
use super::client::BodiesClient;
use crate::p2p::bodies::error::DownloadError;
use reth_eth_wire::BlockBody;
use reth_primitives::{BlockNumber, H256};
use std::{pin::Pin, time::Duration};
use tokio_stream::Stream;
/// A downloader capable of fetching block bodies from header hashes.
///
/// A downloader represents a distinct strategy for submitting requests to download block bodies,
/// while a [BodiesClient] represents a client capable of fulfilling these requests.
pub trait BodyDownloader: Sync + Send {
/// The [BodiesClient] used to fetch the block bodies
type Client: BodiesClient;
/// The request timeout duration
fn timeout(&self) -> Duration;
/// The block bodies client
fn client(&self) -> &Self::Client;
/// Download the bodies from `starting_block` (inclusive) up until `target_block` (inclusive).
///
/// The returned stream will always emit bodies in the order they were requested, but multiple
/// requests may be in flight at the same time.
///
/// The stream may exit early in some cases. Thus, at a minimum, a downloader can only guarantee:
///
/// - All emitted bodies map onto a request
/// - The emitted bodies are emitted in order: i.e. the body for the first block is emitted
/// first, even if it was not fetched first.
///
/// It is *not* guaranteed that all the requested bodies are fetched: the downloader may close
/// the stream before the entire range has been fetched, for any reason.
fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> BodiesStream<'a>
where
I: IntoIterator<Item = &'b (BlockNumber, H256)>,
<I as IntoIterator>::IntoIter: Send + 'b,
'b: 'a;
}
/// A stream of block bodies.
pub type BodiesStream<'a> =
Pin<Box<dyn Stream<Item = Result<(BlockNumber, H256, BlockBody), DownloadError>> + Send + 'a>>;
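
To make the ordering and early-exit guarantees concrete, here is a hedged sketch of a consumer draining a `BodiesStream`; the helper is illustrative, and `try_next` comes from `futures_util::TryStreamExt`:

```rust
use futures_util::TryStreamExt;

// Sketch: collect bodies from a downloader. The result may be shorter than
// `headers`, since the stream is allowed to end early.
async fn collect_bodies<D: BodyDownloader>(
    downloader: &D,
    headers: &[(BlockNumber, H256)],
) -> Result<Vec<(BlockNumber, H256, BlockBody)>, DownloadError> {
    let mut stream = downloader.bodies_stream(headers.iter());
    let mut bodies = Vec::new();
    while let Some(item) = stream.try_next().await? {
        bodies.push(item); // items arrive in request order
    }
    Ok(bodies)
}
```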


@ -0,0 +1,51 @@
use crate::p2p::error::RequestError;
use reth_primitives::H256;
use thiserror::Error;
/// Body client errors.
#[derive(Error, Debug, Clone)]
pub enum BodiesClientError {
/// Timed out while waiting for a response.
#[error("Timed out while getting bodies for block {header_hash}.")]
Timeout {
/// The header hash of the block that timed out.
header_hash: H256,
},
/// The client encountered an internal error.
#[error(transparent)]
Internal(#[from] RequestError),
}
/// Body downloader errors.
#[derive(Error, Debug, Clone)]
pub enum DownloadError {
/// Timed out while waiting for a response.
#[error("Timed out while getting bodies for block {header_hash}.")]
Timeout {
/// The header hash of the block that timed out.
header_hash: H256,
},
/// The [BodiesClient] used by the downloader experienced an error.
#[error("The downloader client encountered an error.")]
Client {
/// The underlying client error.
#[source]
source: BodiesClientError,
},
}
impl From<BodiesClientError> for DownloadError {
fn from(error: BodiesClientError) -> Self {
match error {
BodiesClientError::Timeout { header_hash } => DownloadError::Timeout { header_hash },
_ => DownloadError::Client { source: error },
}
}
}
impl DownloadError {
/// Indicates whether this error is retryable or fatal.
pub fn is_retryable(&self) -> bool {
matches!(self, DownloadError::Timeout { .. })
}
}
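
A hedged sketch of how a caller might act on `is_retryable`; the wrapper and its parameters are hypothetical and not part of this PR:

```rust
// Sketch: retry a fallible fetch while the error is retryable.
async fn with_retries<F, Fut, T>(mut fetch: F, max_retries: usize) -> Result<T, DownloadError>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, DownloadError>>,
{
    let mut attempts = 0;
    loop {
        match fetch().await {
            Ok(value) => return Ok(value),
            // Timeouts are retryable; client errors are treated as fatal.
            Err(err) if err.is_retryable() && attempts < max_retries => attempts += 1,
            Err(err) => return Err(err),
        }
    }
}
```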


@ -0,0 +1,8 @@
/// Traits and types for block body clients.
pub mod client;
/// Block body downloaders.
pub mod downloader;
/// Error types.
pub mod error;


@ -4,7 +4,7 @@ use tokio::sync::{mpsc, oneshot};
pub type RequestResult<T> = Result<T, RequestError>;
/// Error variants that can happen when sending requests to a session.
#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, Clone)]
#[allow(missing_docs)]
pub enum RequestError {
#[error("Closed channel to the peer.")]


@ -1,60 +1,20 @@
use super::client::{HeadersClient, HeadersRequest, HeadersStream};
use crate::consensus::Consensus;
use crate::p2p::headers::error::DownloadError;
use async_trait::async_trait;
use reth_primitives::{
rpc::{BlockId, BlockNumber},
Header, SealedHeader, H256,
};
use reth_primitives::{rpc::BlockId, Header, SealedHeader};
use reth_rpc_types::engine::ForkchoiceState;
use std::{fmt::Debug, time::Duration};
use thiserror::Error;
use std::time::Duration;
use tokio_stream::StreamExt;
/// The downloader error type
#[derive(Error, Debug, Clone)]
pub enum DownloadError {
/// Header validation failed
#[error("Failed to validate header {hash}. Details: {details}.")]
HeaderValidation {
/// Hash of header failing validation
hash: H256,
/// The details of validation failure
details: String,
},
/// Timed out while waiting for request id response.
#[error("Timed out while getting headers for request {request_id}.")]
Timeout {
/// The request id that timed out
request_id: u64,
},
/// Error when checking that the current [`Header`] has the parent's hash as the parent_hash
/// field, and that they have sequential block numbers.
#[error("Headers did not match, current number: {header_number} / current hash: {header_hash}, parent number: {parent_number} / parent_hash: {parent_hash}")]
MismatchedHeaders {
/// The header number being evaluated
header_number: BlockNumber,
/// The header hash being evaluated
header_hash: H256,
/// The parent number being evaluated
parent_number: BlockNumber,
/// The parent hash being evaluated
parent_hash: H256,
},
}
impl DownloadError {
/// Returns bool indicating whether this error is retryable or fatal, in the cases
/// where the peer responds with no headers, or times out.
pub fn is_retryable(&self) -> bool {
matches!(self, DownloadError::Timeout { .. })
}
}
/// The header downloading strategy
/// A downloader capable of fetching block headers.
///
/// A downloader represents a distinct strategy for submitting requests to download block headers,
/// while a [HeadersClient] represents a client capable of fulfilling these requests.
#[async_trait]
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait Downloader: Sync + Send {
pub trait HeaderDownloader: Sync + Send {
/// The Consensus used to verify block validity when
/// downloading
type Consensus: Consensus;
@ -118,9 +78,9 @@ pub trait Downloader: Sync + Send {
})
}
self.consensus().validate_header(header, parent).map_err(|e| {
DownloadError::HeaderValidation { hash: parent.hash(), details: e.to_string() }
})?;
self.consensus()
.validate_header(header, parent)
.map_err(|error| DownloadError::HeaderValidation { hash: parent.hash(), error })?;
Ok(())
}
}


@ -0,0 +1,44 @@
use crate::consensus;
use reth_primitives::{rpc::BlockNumber, H256};
use thiserror::Error;
/// The downloader error type
#[derive(Error, Debug, Clone)]
pub enum DownloadError {
/// Header validation failed
#[error("Failed to validate header {hash}. Details: {error}.")]
HeaderValidation {
/// Hash of header failing validation
hash: H256,
/// The details of validation failure
#[source]
error: consensus::Error,
},
/// Timed out while waiting for request id response.
#[error("Timed out while getting headers for request {request_id}.")]
Timeout {
/// The request id that timed out
request_id: u64,
},
/// Error when checking that the current [`Header`] has the parent's hash as the parent_hash
/// field, and that they have sequential block numbers.
#[error("Headers did not match, current number: {header_number} / current hash: {header_hash}, parent number: {parent_number} / parent_hash: {parent_hash}")]
MismatchedHeaders {
/// The header number being evaluated
header_number: BlockNumber,
/// The header hash being evaluated
header_hash: H256,
/// The parent number being evaluated
parent_number: BlockNumber,
/// The parent hash being evaluated
parent_hash: H256,
},
}
impl DownloadError {
/// Returns a bool indicating whether this error is retryable or fatal, for the cases
/// where the peer responds with no headers or times out.
pub fn is_retryable(&self) -> bool {
matches!(self, DownloadError::Timeout { .. })
}
}


@ -9,3 +9,6 @@ pub mod client;
/// [`Consensus`]: crate::consensus::Consensus
/// [`HeadersClient`]: client::HeadersClient
pub mod downloader;
/// Error types.
pub mod error;


@ -1,3 +1,6 @@
/// Traits for implementing P2P block body clients.
pub mod bodies;
/// Traits for implementing P2P Header Clients. Also includes implementations
/// of a Linear and a Parallel downloader generic over the [`Consensus`] and
/// [`HeadersClient`].


@ -0,0 +1,33 @@
use crate::p2p::bodies::{client::BodiesClient, error::BodiesClientError};
use async_trait::async_trait;
use reth_eth_wire::BlockBody;
use reth_primitives::H256;
use std::fmt::{Debug, Formatter};
/// A test client for fetching bodies
pub struct TestBodiesClient<F>
where
F: Fn(H256) -> Result<BlockBody, BodiesClientError>,
{
/// The function that is called on each body request.
pub responder: F,
}
impl<F> Debug for TestBodiesClient<F>
where
F: Fn(H256) -> Result<BlockBody, BodiesClientError>,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TestBodiesClient").finish()
}
}
#[async_trait]
impl<F> BodiesClient for TestBodiesClient<F>
where
F: Fn(H256) -> Result<BlockBody, BodiesClientError> + Send + Sync,
{
async fn get_block_body(&self, hash: H256) -> Result<BlockBody, BodiesClientError> {
(self.responder)(hash)
}
}
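
For reference, a minimal sketch of wiring up the test client with a closure responder; the `BlockBody` field names match how the body stage uses them later in this diff:

```rust
// Sketch: a test client that serves an empty body for every hash.
fn empty_client() -> impl BodiesClient {
    TestBodiesClient {
        responder: |_hash: H256| Ok(BlockBody { transactions: vec![], ommers: vec![] }),
    }
}
```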


@ -0,0 +1,142 @@
use rand::{thread_rng, Rng};
use reth_primitives::{
proofs, Address, BlockLocked, Bytes, Header, SealedHeader, Signature, Transaction,
TransactionKind, TransactionSigned, H256, U256,
};
// TODO(onbjerg): Maybe we should split this off to its own crate, or move the helpers to the
// relevant crates?
/// Generates a range of random [SealedHeader]s.
///
/// The parent hash of the first header
/// in the result will be equal to `head`.
///
/// The headers are not expected to pass validation.
pub fn random_header_range(rng: std::ops::Range<u64>, head: H256) -> Vec<SealedHeader> {
let mut headers = Vec::with_capacity(rng.end.saturating_sub(rng.start) as usize);
for idx in rng {
headers.push(random_header(
idx,
Some(headers.last().map(|h: &SealedHeader| h.hash()).unwrap_or(head)),
));
}
headers
}
/// Generate a random [SealedHeader].
///
/// The header is not expected to pass validation.
pub fn random_header(number: u64, parent: Option<H256>) -> SealedHeader {
let header = reth_primitives::Header {
number,
nonce: rand::random(),
difficulty: U256::from(rand::random::<u32>()),
parent_hash: parent.unwrap_or_default(),
..Default::default()
};
header.seal()
}
/// Generates a random legacy [Transaction].
///
/// Every field is random, except:
///
/// - The chain ID, which is always 1
/// - The input, which is always nothing
pub fn random_tx() -> Transaction {
Transaction::Legacy {
chain_id: Some(1),
nonce: rand::random::<u16>().into(),
gas_price: rand::random::<u16>().into(),
gas_limit: rand::random::<u16>().into(),
to: TransactionKind::Call(Address::random()),
value: rand::random::<u16>().into(),
input: Bytes::default(),
}
}
/// Generates a random legacy [Transaction] that is signed.
///
/// On top of the considerations of [random_tx], these apply as well:
///
/// - There is no guarantee that the nonce is not used twice for the same account
pub fn random_signed_tx() -> TransactionSigned {
let tx = random_tx();
let hash = tx.signature_hash();
TransactionSigned {
transaction: tx,
hash,
signature: Signature {
// TODO
r: Default::default(),
s: Default::default(),
odd_y_parity: false,
},
}
}
/// Generate a random block filled with a random number of signed transactions (generated using
/// [random_signed_tx]).
///
/// All fields use the default values (and are assumed to be invalid) except for:
///
/// - `parent_hash`
/// - `transactions_root`
/// - `ommers_hash`
///
/// Additionally, `gas_used` and `gas_limit` always exactly match the total `gas_limit` of all
/// transactions in the block.
///
/// The ommer headers are not assumed to be valid.
pub fn random_block(number: u64, parent: Option<H256>) -> BlockLocked {
let mut rng = thread_rng();
// Generate transactions
let transactions: Vec<TransactionSigned> =
(0..rand::random::<u8>()).into_iter().map(|_| random_signed_tx()).collect();
let total_gas = transactions.iter().fold(0, |sum, tx| sum + tx.transaction.gas_limit());
// Generate ommers
let mut ommers = Vec::new();
for _ in 0..rng.gen_range(0..2) {
ommers.push(random_header(number, parent).unseal());
}
// Calculate roots
let transactions_root = proofs::calculate_transaction_root(transactions.iter());
let ommers_hash = proofs::calculate_ommers_root(ommers.iter());
BlockLocked {
header: Header {
parent_hash: parent.unwrap_or_default(),
number,
gas_used: total_gas,
gas_limit: total_gas,
transactions_root,
ommers_hash,
..Default::default()
}
.seal(),
body: transactions,
ommers: ommers.into_iter().map(|ommer| ommer.seal()).collect(),
..Default::default()
}
}
/// Generate a range of random blocks.
///
/// The parent hash of the first block
/// in the result will be equal to `head`.
///
/// See [random_block] for considerations when validating the generated blocks.
pub fn random_block_range(rng: std::ops::Range<u64>, head: H256) -> Vec<BlockLocked> {
let mut blocks = Vec::with_capacity(rng.end.saturating_sub(rng.start) as usize);
for idx in rng {
blocks.push(random_block(
idx,
Some(blocks.last().map(|block: &BlockLocked| block.header.hash()).unwrap_or(head)),
));
}
blocks
}
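
A quick sketch of the linkage guarantee these generators provide, written as a hypothetical test (assuming `SealedHeader` derefs to `Header` for field access, as elsewhere in this diff):

```rust
#[test]
fn generated_blocks_are_linked() {
    let head = H256::zero();
    let blocks = random_block_range(1..10, head);
    // The first block points at `head`; every later block points at its predecessor.
    assert_eq!(blocks[0].header.parent_hash, head);
    for pair in blocks.windows(2) {
        assert_eq!(pair[1].header.parent_hash, pair[0].header.hash());
    }
}
```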


@ -1,24 +1,25 @@
//! Testing support for headers related interfaces.
use crate::{
consensus::{self, Consensus},
consensus::{self, Consensus, Error},
p2p::headers::{
client::{HeadersClient, HeadersRequest, HeadersResponse, HeadersStream},
downloader::{DownloadError, Downloader},
downloader::HeaderDownloader,
error::DownloadError,
},
};
use reth_primitives::{Header, SealedHeader, H256, H512, U256};
use reth_primitives::{BlockLocked, Header, SealedHeader, H256, H512};
use reth_rpc_types::engine::ForkchoiceState;
use std::{collections::HashSet, sync::Arc, time::Duration};
use tokio::sync::{broadcast, mpsc, watch};
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
#[derive(Debug)]
/// A test downloader which just returns the values that have been pushed to it.
pub struct TestDownloader {
#[derive(Debug)]
pub struct TestHeaderDownloader {
result: Result<Vec<SealedHeader>, DownloadError>,
}
impl TestDownloader {
impl TestHeaderDownloader {
/// Instantiates the downloader with the mock responses
pub fn new(result: Result<Vec<SealedHeader>, DownloadError>) -> Self {
Self { result }
@ -26,7 +27,7 @@ impl TestDownloader {
}
#[async_trait::async_trait]
impl Downloader for TestDownloader {
impl HeaderDownloader for TestHeaderDownloader {
type Consensus = TestConsensus;
type Client = TestHeadersClient;
@ -51,8 +52,8 @@ impl Downloader for TestDownloader {
}
}
#[derive(Debug)]
/// A test client for fetching headers
#[derive(Debug)]
pub struct TestHeadersClient {
req_tx: mpsc::Sender<(u64, HeadersRequest)>,
req_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<(u64, HeadersRequest)>>>,
@ -109,7 +110,7 @@ impl HeadersClient for TestHeadersClient {
}
}
/// Consensus client impl for testing
/// Consensus engine implementation for testing
#[derive(Debug)]
pub struct TestConsensus {
/// Watcher over the forkchoice state
@ -132,14 +133,14 @@ impl Default for TestConsensus {
}
impl TestConsensus {
/// Update the forkchoice state
/// Update the fork choice state
pub fn update_tip(&self, tip: H256) {
let state = ForkchoiceState {
head_block_hash: tip,
finalized_block_hash: H256::zero(),
safe_block_hash: H256::zero(),
};
self.channel.0.send(state).expect("updating forkchoice state failed");
self.channel.0.send(state).expect("updating fork choice state failed");
}
/// Update the validation flag
@ -165,29 +166,12 @@ impl Consensus for TestConsensus {
Ok(())
}
}
}
/// Generate a range of random header. The parent hash of the first header
/// in the result will be equal to head
pub fn gen_random_header_range(rng: std::ops::Range<u64>, head: H256) -> Vec<SealedHeader> {
let mut headers = Vec::with_capacity(rng.end.saturating_sub(rng.start) as usize);
for idx in rng {
headers.push(gen_random_header(
idx,
Some(headers.last().map(|h: &SealedHeader| h.hash()).unwrap_or(head)),
));
fn pre_validate_block(&self, _block: &BlockLocked) -> Result<(), Error> {
if self.fail_validation {
Err(consensus::Error::BaseFeeMissing)
} else {
Ok(())
}
}
headers
}
/// Generate a random header
pub fn gen_random_header(number: u64, parent: Option<H256>) -> SealedHeader {
let header = reth_primitives::Header {
number,
nonce: rand::random(),
difficulty: U256::from(rand::random::<u32>()),
parent_hash: parent.unwrap_or_default(),
..Default::default()
};
header.seal()
}


@ -1,5 +1,10 @@
mod api;
mod bodies;
mod headers;
/// Generators for different data structures like block headers, block bodies and ranges of those.
pub mod generators;
pub use api::TestApi;
pub use bodies::*;
pub use headers::*;


@ -0,0 +1,21 @@
[package]
name = "reth-bodies-downloaders"
version = "0.1.0"
edition = "2021"
license = "MIT OR Apache-2.0"
repository = "https://github.com/foundry-rs/reth"
readme = "README.md"
description = "Implementations of various block body downloaders"
[dependencies]
futures-util = "0.3.25"
reth-interfaces = { path = "../../interfaces" }
reth-primitives = { path = "../../primitives" }
reth-eth-wire = { path = "../eth-wire" }
[dev-dependencies]
assert_matches = "1.5.0"
once_cell = "1.15.0"
rand = "0.8.5"
reth-interfaces = { path = "../../interfaces", features = ["test-utils"] }
tokio = { version = "1.21.2", features = ["full"] }
serial_test = "0.9.0"


@ -0,0 +1,132 @@
use futures_util::{stream, StreamExt, TryFutureExt};
use reth_interfaces::p2p::bodies::{
client::BodiesClient,
downloader::{BodiesStream, BodyDownloader},
error::{BodiesClientError, DownloadError},
};
use reth_primitives::{BlockNumber, H256};
use std::{sync::Arc, time::Duration};
/// Downloads bodies in batches.
///
/// All blocks in a batch are fetched at the same time.
#[derive(Debug)]
pub struct ConcurrentDownloader<C> {
/// The bodies client
client: Arc<C>,
/// The batch size per one request
pub batch_size: usize,
/// A single request timeout
pub request_timeout: Duration,
/// The number of retries for downloading
pub request_retries: usize,
}
impl<C: BodiesClient> BodyDownloader for ConcurrentDownloader<C> {
type Client = C;
/// The request timeout duration
fn timeout(&self) -> Duration {
self.request_timeout
}
/// The block bodies client
fn client(&self) -> &Self::Client {
&self.client
}
fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> BodiesStream<'a>
where
I: IntoIterator<Item = &'b (BlockNumber, H256)>,
<I as IntoIterator>::IntoIter: Send + 'b,
'b: 'a,
{
// TODO: Retry
Box::pin(
stream::iter(headers.into_iter().map(|(block_number, header_hash)| {
{
self.client
.get_block_body(*header_hash)
.map_ok(move |body| (*block_number, *header_hash, body))
.map_err(|err| match err {
BodiesClientError::Timeout { header_hash } => {
DownloadError::Timeout { header_hash }
}
err => DownloadError::Client { source: err },
})
}
}))
.buffered(self.batch_size),
)
}
}
/// A [ConcurrentDownloader] builder.
#[derive(Debug)]
pub struct ConcurrentDownloaderBuilder {
/// The batch size per one request
batch_size: usize,
/// A single request timeout
request_timeout: Duration,
/// The number of retries for downloading
request_retries: usize,
}
impl Default for ConcurrentDownloaderBuilder {
fn default() -> Self {
Self { batch_size: 100, request_timeout: Duration::from_millis(100), request_retries: 5 }
}
}
impl ConcurrentDownloaderBuilder {
/// Set the request batch size
pub fn batch_size(mut self, size: usize) -> Self {
self.batch_size = size;
self
}
/// Set the request timeout
pub fn timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = timeout;
self
}
/// Set the number of retries per request
pub fn retries(mut self, retries: usize) -> Self {
self.request_retries = retries;
self
}
/// Build [ConcurrentDownloader] with the provided client
pub fn build<C: BodiesClient>(self, client: Arc<C>) -> ConcurrentDownloader<C> {
ConcurrentDownloader {
client,
batch_size: self.batch_size,
request_timeout: self.request_timeout,
request_retries: self.request_retries,
}
}
}
#[cfg(test)]
mod tests {
#[tokio::test]
#[ignore]
async fn emits_bodies_in_order() {}
#[tokio::test]
#[ignore]
async fn header_iter_failure() {}
#[tokio::test]
#[ignore]
async fn client_failure() {}
#[tokio::test]
#[ignore]
async fn retries_requests() {}
#[tokio::test]
#[ignore]
async fn timeout() {}
}
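
For orientation, a hedged sketch of constructing the downloader through its builder; `client` is any `BodiesClient`, and the parameter values are arbitrary:

```rust
// Sketch: build a ConcurrentDownloader with explicit settings.
fn make_downloader<C: BodiesClient>(client: Arc<C>) -> ConcurrentDownloader<C> {
    ConcurrentDownloaderBuilder::default()
        .batch_size(64)                  // bodies fetched concurrently
        .timeout(Duration::from_secs(1)) // per-request timeout
        .retries(3)                      // note: retries are still a TODO above
        .build(client)
}
```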


@ -0,0 +1,11 @@
#![warn(missing_docs, unreachable_pub)]
#![deny(unused_must_use, rust_2018_idioms)]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
//! Implements body downloader algorithms.
/// A naive concurrent downloader.
pub mod concurrent;


@ -34,7 +34,7 @@ pub enum EgressECIESValue {
#[derive(Clone, Debug, PartialEq, Eq)]
/// Raw ingress values for an ECIES protocol
pub enum IngressECIESValue {
/// Receiving a message from a [`peerId`]
/// Receiving a message from a [`PeerId`]
AuthReceive(PeerId),
/// Receiving an ACK message
Ack,


@ -108,6 +108,7 @@ impl From<Vec<H256>> for GetBlockBodies {
}
}
// TODO(onbjerg): We should have this type in primitives
/// A response to [`GetBlockBodies`], containing bodies if any bodies were found.
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable)]
pub struct BlockBody {


@ -5,7 +5,8 @@ use reth_interfaces::{
consensus::Consensus,
p2p::headers::{
client::{HeadersClient, HeadersStream},
downloader::{DownloadError, Downloader},
downloader::HeaderDownloader,
error::DownloadError,
},
};
use reth_primitives::{rpc::BlockId, SealedHeader};
@ -27,7 +28,7 @@ pub struct LinearDownloader<C, H> {
}
#[async_trait]
impl<C: Consensus, H: HeadersClient> Downloader for LinearDownloader<C, H> {
impl<C: Consensus, H: HeadersClient> HeaderDownloader for LinearDownloader<C, H> {
type Consensus = C;
type Client = H;
@ -161,11 +162,6 @@ impl Default for LinearDownloadBuilder {
}
impl LinearDownloadBuilder {
/// Initialize a new builder
pub fn new() -> Self {
Self::default()
}
/// Set the request batch size
pub fn batch_size(mut self, size: u64) -> Self {
self.batch_size = size;
@ -207,7 +203,8 @@ mod tests {
use reth_interfaces::{
p2p::headers::client::HeadersRequest,
test_utils::{
gen_random_header, gen_random_header_range, TestConsensus, TestHeadersClient,
generators::{random_header, random_header_range},
TestConsensus, TestHeadersClient,
},
};
use reth_primitives::{rpc::BlockId, SealedHeader};
@ -233,7 +230,7 @@ mod tests {
let retries = 5;
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let downloader = LinearDownloadBuilder::new()
let downloader = LinearDownloadBuilder::default()
.retries(retries)
.build(CONSENSUS.clone(), CLIENT.clone());
let result =
@ -257,7 +254,7 @@ mod tests {
let retries = 5;
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let downloader = LinearDownloadBuilder::new()
let downloader = LinearDownloadBuilder::default()
.retries(retries)
.build(CONSENSUS.clone(), CLIENT.clone());
let result =
@ -286,14 +283,14 @@ mod tests {
#[tokio::test]
#[serial]
async fn download_propagates_consensus_validation_error() {
let tip_parent = gen_random_header(1, None);
let tip = gen_random_header(2, Some(tip_parent.hash()));
let tip_parent = random_header(1, None);
let tip = random_header(2, Some(tip_parent.hash()));
let tip_hash = tip.hash();
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let downloader =
LinearDownloadBuilder::new().build(CONSENSUS_FAIL.clone(), CLIENT.clone());
LinearDownloadBuilder::default().build(CONSENSUS_FAIL.clone(), CLIENT.clone());
let forkchoice = ForkchoiceState { head_block_hash: tip_hash, ..Default::default() };
let result = downloader.download(&SealedHeader::default(), &forkchoice).await;
tx.send(result).expect("failed to forward download response");
@ -322,14 +319,15 @@ mod tests {
#[tokio::test]
#[serial]
async fn download_starts_with_chain_tip() {
let head = gen_random_header(1, None);
let tip = gen_random_header(2, Some(head.hash()));
let head = random_header(1, None);
let tip = random_header(2, Some(head.hash()));
let tip_hash = tip.hash();
let chain_head = head.clone();
let (tx, mut rx) = oneshot::channel();
tokio::spawn(async move {
let downloader = LinearDownloadBuilder::new().build(CONSENSUS.clone(), CLIENT.clone());
let downloader =
LinearDownloadBuilder::default().build(CONSENSUS.clone(), CLIENT.clone());
let forkchoice = ForkchoiceState { head_block_hash: tip_hash, ..Default::default() };
let result = downloader.download(&chain_head, &forkchoice).await;
tx.send(result).expect("failed to forward download response");
@ -359,15 +357,16 @@ mod tests {
#[serial]
async fn download_returns_headers_desc() {
let (start, end) = (100, 200);
let head = gen_random_header(start, None);
let mut headers = gen_random_header_range(start + 1..end, head.hash());
let head = random_header(start, None);
let mut headers = random_header_range(start + 1..end, head.hash());
headers.reverse();
let tip_hash = headers.first().unwrap().hash();
let chain_head = head.clone();
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let downloader = LinearDownloadBuilder::new().build(CONSENSUS.clone(), CLIENT.clone());
let downloader =
LinearDownloadBuilder::default().build(CONSENSUS.clone(), CLIENT.clone());
let forkchoice = ForkchoiceState { head_block_hash: tip_hash, ..Default::default() };
let result = downloader.download(&chain_head, &forkchoice).await;
tx.send(result).expect("failed to forward download response");


@ -34,7 +34,11 @@ hex = "0.4"
hex-literal = "0.3"
derive_more = "0.99"
# proof related
triehash = "0.8"
# See if we can replace these hashers to simplify the libraries
plain_hasher = "0.2"
hash-db = "0.15"
[dev-dependencies]
arbitrary = { version = "1.1.7", features = ["derive"]}


@ -4,8 +4,8 @@ use thiserror::Error;
/// Primitives error type.
#[derive(Debug, Error)]
pub enum Error {
/// Input provided is invalid.
#[error("Input provided is invalid.")]
/// The provided input is invalid.
#[error("The provided input is invalid.")]
InvalidInput,
/// Failed to deserialize data into type.
#[error("Failed to deserialize data into type.")]


@ -1,4 +1,7 @@
use crate::{BlockHash, BlockNumber, Bloom, H160, H256, U256};
use crate::{
proofs::{EMPTY_LIST_HASH, EMPTY_ROOT},
BlockHash, BlockNumber, Bloom, H160, H256, U256,
};
use bytes::{BufMut, BytesMut};
use ethers_core::{types::H64, utils::keccak256};
use reth_codecs::main_codec;
@ -7,7 +10,7 @@ use std::ops::Deref;
/// Block header
#[main_codec]
#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Header {
/// The Keccak 256-bit hash of the parent
/// blocks header, in its entirety; formally Hp.
@ -64,6 +67,29 @@ pub struct Header {
pub base_fee_per_gas: Option<u64>,
}
impl Default for Header {
fn default() -> Self {
Header {
parent_hash: Default::default(),
ommers_hash: EMPTY_LIST_HASH,
beneficiary: Default::default(),
state_root: EMPTY_ROOT,
transactions_root: EMPTY_ROOT,
receipts_root: EMPTY_ROOT,
logs_bloom: Default::default(),
difficulty: Default::default(),
number: 0,
gas_limit: 0,
gas_used: 0,
timestamp: 0,
extra_data: Default::default(),
mix_hash: Default::default(),
nonce: 0,
base_fee_per_gas: None,
}
}
}
impl Header {
/// Heavy function that will calculate hash of data and will *not* save the change to metadata.
/// Use [`Header::seal`], [`SealedHeader`] and unlock if you need hash to be persistent.
@ -239,6 +265,10 @@ mod tests {
gas_used: 0x15b3_u64,
timestamp: 0x1a0a_u64,
extra_data: Bytes::from_str("7788").unwrap().0,
ommers_hash: H256::zero(),
state_root: H256::zero(),
transactions_root: H256::zero(),
receipts_root: H256::zero(),
..Default::default()
};
let mut data = vec![];
@ -285,6 +315,10 @@ mod tests {
gas_used: 0x15b3u64,
timestamp: 0x1a0au64,
extra_data: Bytes::from_str("7788").unwrap().0,
ommers_hash: H256::zero(),
state_root: H256::zero(),
transactions_root: H256::zero(),
receipts_root: H256::zero(),
..Default::default()
};
let header = <Header as Decodable>::decode(&mut data.as_slice()).unwrap();


@ -6,6 +6,8 @@
))]
//! Commonly used types in reth.
//!
//! This crate contains Ethereum primitive types and helper functions.
mod account;
mod block;
@ -23,6 +25,9 @@ mod receipt;
mod storage;
mod transaction;
/// Helper function for calculating Merkle proofs and hashes
pub mod proofs;
pub use account::Account;
pub use block::{Block, BlockLocked};
pub use chain::Chain;
@ -41,25 +46,24 @@ pub use transaction::{
TransactionSignedEcRecovered, TxType,
};
/// Block hash.
/// A block hash.
pub type BlockHash = H256;
/// Block Number is height of chain
/// A block number.
pub type BlockNumber = u64;
/// Ethereum address
/// An Ethereum address.
pub type Address = H160;
// TODO(onbjerg): Is this not the same as [BlockHash]?
/// BlockId is Keccak hash of the header
pub type BlockID = H256;
/// TxHash is Kecack hash of rlp encoded signed transaction
/// A transaction hash is a Keccak hash of an RLP encoded signed transaction.
pub type TxHash = H256;
/// TxNumber is sequence number of all existing transactions
/// The sequence number of all existing transactions.
pub type TxNumber = u64;
/// Chain identifier type, introduced in EIP-155
/// Chain identifier type (introduced in EIP-155).
pub type ChainId = u64;
/// Storage Key
/// An account storage key.
pub type StorageKey = H256;
/// Storage value
/// An account storage value.
pub type StorageValue = U256;
// TODO: should we use `PublicKey` for this? Even when dealing with public keys we should try to


@ -1,26 +1,38 @@
use crate::{keccak256, Bytes, Header, Log, Receipt, TransactionSigned, H256};
use ethers_core::utils::rlp::RlpStream;
use hash_db::Hasher;
use hex_literal::hex;
use plain_hasher::PlainHasher;
use reth_primitives::{Bytes, Header, Log, Receipt, TransactionSigned, H256};
use reth_rlp::Encodable;
use rlp::RlpStream;
use sha3::{Digest, Keccak256};
use triehash::sec_trie_root;
/// Keccak-256 hash of the RLP of an empty list, KEC("\xc0").
pub const EMPTY_LIST_HASH: H256 =
H256(hex!("1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"));
/// Root hash of an empty trie.
pub const EMPTY_ROOT: H256 =
H256(hex!("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"));
/// A [Hasher] that calculates a keccak256 hash of the given data.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
struct KeccakHasher;
impl Hasher for KeccakHasher {
type Out = H256;
type StdHasher = PlainHasher;
const LENGTH: usize = 32;
fn hash(x: &[u8]) -> Self::Out {
let out = Keccak256::digest(x);
// TODO make more performant, H256 from slice is not good enought.
H256::from_slice(out.as_slice())
keccak256(x)
}
}
/// Calculate Transaction root. Iterate over transaction and create merkle trie of
/// (rlp(index),encoded(tx)) pairs.
/// Calculate a transaction root.
///
/// Iterates over the given transactions and computes the merkle trie root of
/// `(rlp(index), encoded(tx))` pairs.
pub fn calculate_transaction_root<'a>(
transactions: impl IntoIterator<Item = &'a TransactionSigned>,
) -> H256 {
@ -40,7 +52,7 @@ pub fn calculate_transaction_root<'a>(
)
}
/// Create receipt root for header
/// Calculates the receipt root for a header.
pub fn calculate_receipt_root<'a>(receipts: impl IntoIterator<Item = &'a Receipt>) -> H256 {
sec_trie_root::<KeccakHasher, _, _, _>(
receipts
@ -57,7 +69,7 @@ pub fn calculate_receipt_root<'a>(receipts: impl IntoIterator<Item = &'a Receipt
)
}
/// Create log hash for header
/// Calculates the log root for a header.
pub fn calculate_log_root<'a>(logs: impl IntoIterator<Item = &'a Log>) -> H256 {
//https://github.com/ethereum/go-ethereum/blob/356bbe343a30789e77bb38f25983c8f2f2bfbb47/cmd/evm/internal/t8ntool/execution.go#L255
let mut stream = RlpStream::new();
@ -71,11 +83,10 @@ pub fn calculate_log_root<'a>(logs: impl IntoIterator<Item = &'a Log>) -> H256 {
stream.finalize_unbounded_list();
let out = stream.out().freeze();
let out = Keccak256::digest(out);
H256::from_slice(out.as_slice())
keccak256(out)
}
/// Calculate hash for ommer/uncle headers
/// Calculates the root hash for ommer/uncle headers.
pub fn calculate_ommers_root<'a>(_ommers: impl IntoIterator<Item = &'a Header>) -> H256 {
// RLP Encode
let mut stream = RlpStream::new();
@ -87,8 +98,7 @@ pub fn calculate_ommers_root<'a>(_ommers: impl IntoIterator<Item = &'a Header>)
*/
stream.finalize_unbounded_list();
let bytes = stream.out().freeze();
let out = Keccak256::digest(bytes);
H256::from_slice(out.as_slice())
keccak256(bytes)
}
// TODO state root
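
Taken together, these helpers let a caller cross-check a header against a body, which is what `validate_block_standalone` does earlier in this diff. A minimal sketch with an illustrative helper name:

```rust
// Sketch: verify that a block's header commits to its body.
fn header_matches_body(block: &BlockLocked) -> bool {
    let tx_root = calculate_transaction_root(block.body.iter());
    let ommers_root = calculate_ommers_root(block.ommers.iter().map(|h| h.as_ref()));
    block.header.transactions_root == tx_root && block.header.ommers_hash == ommers_root
}
```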


@ -13,8 +13,9 @@ use reth_rlp::{length_of_length, Decodable, DecodeError, Encodable, Header, EMPT
pub use signature::Signature;
pub use tx_type::TxType;
/// Raw Transaction.
/// Transaction type is introduced in EIP-2718: https://eips.ethereum.org/EIPS/eip-2718
/// A raw transaction.
///
/// Transaction types were introduced in [EIP-2718](https://eips.ethereum.org/EIPS/eip-2718).
#[main_codec]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Transaction {
@ -49,7 +50,7 @@ pub enum Transaction {
/// input data of the message call, formally Td.
input: Bytes,
},
/// Transaction with AccessList. https://eips.ethereum.org/EIPS/eip-2930
/// Transaction with an [`AccessList`] ([EIP-2930](https://eips.ethereum.org/EIPS/eip-2930)).
Eip2930 {
/// Added as EIP-155: Simple replay attack protection
chain_id: ChainId,
@ -86,7 +87,7 @@ pub enum Transaction {
/// accessing outside the list.
access_list: AccessList,
},
/// Transaction with priority fee. https://eips.ethereum.org/EIPS/eip-1559
/// A transaction with a priority fee ([EIP-1559](https://eips.ethereum.org/EIPS/eip-1559)).
Eip1559 {
/// Added as EIP-155: Simple replay attack protection
chain_id: u64,
@ -175,6 +176,15 @@ impl Transaction {
}
}
/// Get the gas limit of the transaction.
pub fn gas_limit(&self) -> u64 {
match self {
Transaction::Legacy { gas_limit, .. } |
Transaction::Eip2930 { gas_limit, .. } |
Transaction::Eip1559 { gas_limit, .. } => *gas_limit,
}
}
/// Max fee per gas for eip1559 transaction, for legacy transactions this is gas_limit
pub fn max_fee_per_gas(&self) -> u64 {
match self {


@ -18,10 +18,14 @@ tracing-futures = "0.2.5"
tokio = { version = "1.21.2", features = ["sync"] }
aquamarine = "0.1.12"
metrics = "0.20.1"
futures-util = "0.3.25"
[dev-dependencies]
reth-db = { path = "../db", features = ["test-utils"] }
reth-interfaces = { path = "../interfaces", features = ["test-utils"] }
reth-bodies-downloaders = { path = "../net/bodies-downloaders" }
# TODO(onbjerg): We only need this for [BlockBody]
reth-eth-wire = { path = "../net/eth-wire" }
reth-headers-downloaders = { path = "../net/headers-downloaders" }
tokio = { version = "*", features = ["rt", "sync", "macros"] }
tokio-stream = "0.1.10"


@ -1,5 +1,5 @@
use crate::pipeline::PipelineEvent;
use reth_interfaces::db::Error as DbError;
use reth_interfaces::{consensus, db::Error as DbError};
use reth_primitives::{BlockNumber, H256};
use thiserror::Error;
use tokio::sync::mpsc::error::SendError;
@ -8,12 +8,13 @@ use tokio::sync::mpsc::error::SendError;
#[derive(Error, Debug)]
pub enum StageError {
/// The stage encountered a state validation error.
///
/// TODO: This depends on the consensus engine and should include the validation failure reason
#[error("Stage encountered a validation error in block {block}.")]
#[error("Stage encountered a validation error in block {block}: {error}.")]
Validation {
/// The block that failed validation.
block: BlockNumber,
/// The underlying consensus error.
#[source]
error: consensus::Error,
},
/// The stage encountered a database error.
#[error("An internal database error occurred.")]
@ -30,34 +31,41 @@ pub enum StageError {
/// The sender stage error
#[derive(Error, Debug)]
pub enum DatabaseIntegrityError {
/// Cannonical hash is missing from db
#[error("no cannonical hash for block #{number}")]
CannonicalHash {
// TODO(onbjerg): What's the difference between this and the one below?
/// The canonical hash for a block is missing from the database.
#[error("No canonical hash for block #{number}")]
CanonicalHash {
/// The block number key
number: BlockNumber,
},
/// Cannonical header is missing from db
#[error("no cannonical hash for block #{number}")]
CannonicalHeader {
/// The canonical header for a block is missing from the database.
#[error("No canonical hash for block #{number}")]
CanonicalHeader {
/// The block number key
number: BlockNumber,
},
/// Header is missing from db
#[error("no header for block #{number} ({hash})")]
/// A header is missing from the database.
#[error("No header for block #{number} ({hash})")]
Header {
/// The block number key
number: BlockNumber,
/// The block hash key
hash: H256,
},
/// Cumulative transaction count is missing from db
#[error("no cumulative tx count for ${number} ({hash})")]
/// The cumulative transaction count is missing from the database.
#[error("No cumulative tx count for ${number} ({hash})")]
CumulativeTxCount {
/// The block number key
number: BlockNumber,
/// The block hash key
hash: H256,
},
/// A block body is missing.
#[error("Block body not found for block #{number}")]
BlockBody {
/// The block number key
number: BlockNumber,
},
}
/// A pipeline execution error.


@ -341,8 +341,8 @@ impl<DB: Database> QueuedStage<DB> {
Err(err) => {
state.events_sender.send(PipelineEvent::Error { stage_id }).await?;
return if let StageError::Validation { block } = err {
debug!(stage = %stage_id, bad_block = %block, "Stage encountered a validation error.");
return if let StageError::Validation { block, error } = err {
debug!(stage = %stage_id, bad_block = %block, "Stage encountered a validation error: {error}");
// We unwind because of a validation error. If the unwind itself fails,
// we bail entirely, otherwise we restart the execution loop from the
@ -362,13 +362,13 @@ impl<DB: Database> QueuedStage<DB> {
#[cfg(test)]
mod tests {
use super::*;
use crate::{StageId, UnwindOutput};
use reth_db::{
kv::{test_utils, Env, EnvKind},
mdbx::{self, WriteMap},
};
use reth_interfaces::consensus;
use tokio::sync::mpsc::channel;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use utils::TestStage;
@ -520,7 +520,10 @@ mod tests {
)
.push(
TestStage::new(StageId("B"))
.add_exec(Err(StageError::Validation { block: 5 }))
.add_exec(Err(StageError::Validation {
block: 5,
error: consensus::Error::BaseFeeMissing,
}))
.add_unwind(Ok(UnwindOutput { stage_progress: 0 }))
.add_exec(Ok(ExecOutput {
stage_progress: 10,


@ -0,0 +1,842 @@
use crate::{
DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput,
UnwindOutput,
};
use futures_util::TryStreamExt;
use reth_interfaces::{
consensus::Consensus,
db::{
models::StoredBlockBody, tables, DBContainer, Database, DatabaseGAT, DbCursorRO,
DbCursorRW, DbTx, DbTxMut,
},
p2p::bodies::downloader::BodyDownloader,
};
use reth_primitives::{
proofs::{EMPTY_LIST_HASH, EMPTY_ROOT},
BlockLocked, BlockNumber, SealedHeader, H256,
};
use std::fmt::Debug;
use tracing::warn;
const BODIES: StageId = StageId("Bodies");
// TODO(onbjerg): Metrics and events (gradual status for e.g. CLI)
/// The body stage downloads block bodies.
///
/// The body stage downloads block bodies for all block headers stored locally in the database.
///
/// # Empty blocks
///
/// Blocks with an ommers hash corresponding to no ommers *and* a transaction root corresponding to
/// no transactions will not have a block body downloaded for them, since it would be meaningless to
/// do so.
///
/// This also means that if there is no body for the block in the database (assuming the
/// block number <= the synced block of this stage), then the block can be considered empty.
///
/// # Tables
///
/// The bodies are processed and data is inserted into these tables:
///
/// - [`BlockBodies`][reth_interfaces::db::tables::BlockBodies]
/// - [`Transactions`][reth_interfaces::db::tables::Transactions]
///
/// # Genesis
///
/// This stage expects that the genesis has been inserted into the appropriate tables:
///
/// - The header tables (see [HeadersStage][crate::stages::headers::HeadersStage])
/// - The various indexes (e.g. [TotalTxIndex][crate::stages::tx_index::TxIndex])
/// - The [`BlockBodies`][reth_interfaces::db::tables::BlockBodies] table
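///
/// # Example
///
/// A minimal construction sketch; `downloader` and `consensus` stand in for any
/// [`BodyDownloader`] and [`Consensus`] implementations, and the batch size is arbitrary:
///
/// ```ignore
/// let stage = BodyStage { downloader, consensus, batch_size: 100 };
/// ```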
#[derive(Debug)]
pub struct BodyStage<D: BodyDownloader, C: Consensus> {
/// The body downloader.
pub downloader: D,
/// The consensus engine.
pub consensus: C,
/// The maximum amount of block bodies to process in one stage execution.
///
/// Smaller batch sizes result in less memory usage but more disk I/O; larger batch sizes
/// result in more memory usage, less disk I/O, and less frequent checkpoints.
pub batch_size: u64,
}
#[async_trait::async_trait]
impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C> {
/// Return the id of the stage
fn id(&self) -> StageId {
BODIES
}
/// Download block bodies from the last checkpoint for this stage up until the latest synced
/// header, limited by the stage's batch size.
async fn execute(
&mut self,
db: &mut DBContainer<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let tx = db.get_mut();
let previous_stage_progress =
input.previous_stage.as_ref().map(|(_, block)| *block).unwrap_or_default();
if previous_stage_progress == 0 {
warn!("The body stage seems to be running first, no work can be completed.");
}
// The block we ended at last sync, and the one we are starting on now
let previous_block = input.stage_progress.unwrap_or_default();
let starting_block = previous_block + 1;
// Short circuit in case we already reached the target block
let target = previous_stage_progress.min(starting_block + self.batch_size);
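// E.g. with a stage progress of 100 and a batch size of 10, the target is block 111
// (or lower, if the previous stage has not synced that far).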
if target <= previous_block {
return Ok(ExecOutput { stage_progress: target, reached_tip: true, done: true })
}
let bodies_to_download = self.bodies_to_download::<DB>(tx, starting_block, target)?;
// Cursors used to write bodies and transactions
let mut bodies_cursor = tx.cursor_mut::<tables::BlockBodies>()?;
let mut tx_cursor = tx.cursor_mut::<tables::Transactions>()?;
let mut base_tx_id = bodies_cursor
.last()?
.map(|(_, body)| body.base_tx_id + body.tx_amount)
.ok_or(DatabaseIntegrityError::BlockBody { number: starting_block })?;
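// E.g. if the last stored body has `base_tx_id = 10` and `tx_amount = 5`, the next
// transaction is inserted with id 15.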
// Cursor used to look up headers for block pre-validation
let mut header_cursor = tx.cursor::<tables::Headers>()?;
// NOTE(onbjerg): The stream needs to live here, otherwise a new iterator would be created
// on every iteration of the while loop -_-
let mut bodies_stream = self.downloader.bodies_stream(bodies_to_download.iter());
let mut highest_block = previous_block;
while let Some((block_number, header_hash, body)) =
bodies_stream.try_next().await.map_err(|err| StageError::Internal(err.into()))?
{
// Fetch the block header for pre-validation
let block = BlockLocked {
header: SealedHeader::new(
header_cursor
.seek_exact((block_number, header_hash).into())?
.ok_or(DatabaseIntegrityError::Header {
number: block_number,
hash: header_hash,
})?
.1,
header_hash,
),
body: body.transactions,
// TODO: We should have a type w/o receipts probably, no reason to allocate here
receipts: vec![],
ommers: body.ommers.into_iter().map(|header| header.seal()).collect(),
};
// Pre-validate the block and unwind if it is invalid
self.consensus
.pre_validate_block(&block)
.map_err(|err| StageError::Validation { block: block_number, error: err })?;
// Write block
bodies_cursor.append(
(block_number, header_hash).into(),
StoredBlockBody {
base_tx_id,
tx_amount: block.body.len() as u64,
ommers: block.ommers.into_iter().map(|header| header.unseal()).collect(),
},
)?;
// Write transactions
for transaction in block.body {
tx_cursor.append(base_tx_id, transaction)?;
base_tx_id += 1;
}
highest_block = block_number;
}
// The stage is "done" if:
// - We got fewer blocks than our target
// - We reached our target and the target was not limited by the batch size of the stage
let capped = target < previous_stage_progress;
let done = highest_block < target || !capped;
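// E.g. if the previous stage synced to block 1000 but this batch was capped at block
// 111, `capped` is true and `done` is false, so the pipeline can run this stage again.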
Ok(ExecOutput { stage_progress: highest_block, reached_tip: true, done })
}
/// Unwind the stage.
async fn unwind(
&mut self,
db: &mut DBContainer<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
let tx = db.get_mut();
let mut block_body_cursor = tx.cursor_mut::<tables::BlockBodies>()?;
let mut transaction_cursor = tx.cursor_mut::<tables::Transactions>()?;
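// Walk the bodies table backwards from the highest block, deleting each body and its
// transactions until we reach the unwind target.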
let mut entry = block_body_cursor.last()?;
while let Some((key, body)) = entry {
if key.number() <= input.unwind_to {
break
}
for num in 0..body.tx_amount {
let tx_id = body.base_tx_id + num;
if transaction_cursor.seek_exact(tx_id)?.is_some() {
transaction_cursor.delete_current()?;
}
}
block_body_cursor.delete_current()?;
entry = block_body_cursor.prev()?;
}
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
}
impl<D: BodyDownloader, C: Consensus> BodyStage<D, C> {
/// Computes a list of `(block_number, header_hash)` for blocks that we need to download bodies
/// for.
///
/// This skips empty blocks (i.e. no ommers, no transactions).
fn bodies_to_download<DB: Database>(
&self,
tx: &mut <DB as DatabaseGAT<'_>>::TXMut,
starting_block: BlockNumber,
target: BlockNumber,
) -> Result<Vec<(BlockNumber, H256)>, StageError> {
let mut header_cursor = tx.cursor::<tables::Headers>()?;
let mut header_hashes_cursor = tx.cursor::<tables::CanonicalHeaders>()?;
let mut walker = header_hashes_cursor
.walk(starting_block)?
.take_while(|item| item.as_ref().map_or(false, |(num, _)| *num <= target));
let mut bodies_to_download = Vec::new();
while let Some(Ok((block_number, header_hash))) = walker.next() {
let header = header_cursor
.seek_exact((block_number, header_hash).into())?
.ok_or(DatabaseIntegrityError::Header { number: block_number, hash: header_hash })?
.1;
if header.ommers_hash == EMPTY_LIST_HASH && header.transactions_root == EMPTY_ROOT {
continue
}
bodies_to_download.push((block_number, header_hash));
}
Ok(bodies_to_download)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::util::test_utils::StageTestRunner;
use assert_matches::assert_matches;
use reth_eth_wire::BlockBody;
use reth_interfaces::{
consensus,
p2p::bodies::error::DownloadError,
test_utils::generators::{random_block, random_block_range},
};
use reth_primitives::{BlockNumber, H256};
use std::collections::HashMap;
use test_utils::*;
/// Check that the execution is short-circuited if the database is empty.
#[tokio::test]
async fn empty_db() {
let runner = BodyTestRunner::new(TestBodyDownloader::default);
let rx = runner.execute(ExecInput::default());
assert_matches!(
rx.await.unwrap(),
Ok(ExecOutput { stage_progress: 0, reached_tip: true, done: true })
)
}
/// Check that the execution is short-circuited if the target was already reached.
#[tokio::test]
async fn already_reached_target() {
let runner = BodyTestRunner::new(TestBodyDownloader::default);
let rx = runner.execute(ExecInput {
previous_stage: Some((StageId("Headers"), 100)),
stage_progress: Some(100),
});
assert_matches!(
rx.await.unwrap(),
Ok(ExecOutput { stage_progress: 100, reached_tip: true, done: true })
)
}
/// Checks that the stage downloads at most `batch_size` blocks.
#[tokio::test]
async fn partial_body_download() {
// Generate blocks #1-199
let blocks = random_block_range(1..200, GENESIS_HASH);
let bodies: HashMap<H256, Result<BlockBody, DownloadError>> =
blocks.iter().map(body_by_hash).collect();
let mut runner = BodyTestRunner::new(|| TestBodyDownloader::new(bodies.clone()));
// Set the batch size (max we sync per stage execution) to less than the number of blocks
// the previous stage synced (10 vs 199)
runner.set_batch_size(10);
// Insert required state
runner.insert_genesis().expect("Could not insert genesis block");
runner
.insert_headers(blocks.iter().map(|block| &block.header))
.expect("Could not insert headers");
// Run the stage
let rx = runner.execute(ExecInput {
previous_stage: Some((StageId("Headers"), blocks.len() as BlockNumber)),
stage_progress: None,
});
// Check that we only synced around `batch_size` blocks even though the number of blocks
// synced by the previous stage is higher
let output = rx.await.unwrap();
assert_matches!(
output,
Ok(ExecOutput { stage_progress, reached_tip: true, done: false }) if stage_progress < 200
);
runner
.validate_db_blocks(output.unwrap().stage_progress)
.expect("Written block data invalid");
}
/// Same as [partial_body_download] except the `batch_size` is not hit.
#[tokio::test]
async fn full_body_download() {
// Generate blocks #1-20
let blocks = random_block_range(1..21, GENESIS_HASH);
let bodies: HashMap<H256, Result<BlockBody, DownloadError>> =
blocks.iter().map(body_by_hash).collect();
let mut runner = BodyTestRunner::new(|| TestBodyDownloader::new(bodies.clone()));
// Set the batch size to more than what the previous stage synced (40 vs 20)
runner.set_batch_size(40);
// Insert required state
runner.insert_genesis().expect("Could not insert genesis block");
runner
.insert_headers(blocks.iter().map(|block| &block.header))
.expect("Could not insert headers");
// Run the stage
let rx = runner.execute(ExecInput {
previous_stage: Some((StageId("Headers"), blocks.len() as BlockNumber)),
stage_progress: None,
});
// Check that we synced all blocks successfully, even though our `batch_size` allows us to
// sync more (if there were more headers)
let output = rx.await.unwrap();
assert_matches!(
output,
Ok(ExecOutput { stage_progress: 20, reached_tip: true, done: true })
);
runner
.validate_db_blocks(output.unwrap().stage_progress)
.expect("Written block data invalid");
}
/// Same as [full_body_download], except the stage resumes from progress made in a previous run.
#[tokio::test]
async fn sync_from_previous_progress() {
// Generate blocks #1-20
let blocks = random_block_range(1..21, GENESIS_HASH);
let bodies: HashMap<H256, Result<BlockBody, DownloadError>> =
blocks.iter().map(body_by_hash).collect();
let runner = BodyTestRunner::new(|| TestBodyDownloader::new(bodies.clone()));
// Insert required state
runner.insert_genesis().expect("Could not insert genesis block");
runner
.insert_headers(blocks.iter().map(|block| &block.header))
.expect("Could not insert headers");
// Run the stage
let rx = runner.execute(ExecInput {
previous_stage: Some((StageId("Headers"), blocks.len() as BlockNumber)),
stage_progress: None,
});
// Check that we synced at least 10 blocks
let first_run = rx.await.unwrap();
assert_matches!(
first_run,
Ok(ExecOutput { stage_progress, reached_tip: true, done: false }) if stage_progress >= 10
);
let first_run_progress = first_run.unwrap().stage_progress;
// Execute again on top of the previous run
let rx = runner.execute(ExecInput {
previous_stage: Some((StageId("Headers"), blocks.len() as BlockNumber)),
stage_progress: Some(first_run_progress),
});
// Check that we synced more blocks
let output = rx.await.unwrap();
assert_matches!(
output,
Ok(ExecOutput { stage_progress, reached_tip: true, done: true }) if stage_progress > first_run_progress
);
runner
.validate_db_blocks(output.unwrap().stage_progress)
.expect("Written block data invalid");
}
/// Checks that the stage asks to unwind if pre-validation of the block fails.
#[tokio::test]
async fn pre_validation_failure() {
// Generate blocks #1-19
let blocks = random_block_range(1..20, GENESIS_HASH);
let bodies: HashMap<H256, Result<BlockBody, DownloadError>> =
blocks.iter().map(body_by_hash).collect();
let mut runner = BodyTestRunner::new(|| TestBodyDownloader::new(bodies.clone()));
// Fail validation
runner.set_fail_validation(true);
// Insert required state
runner.insert_genesis().expect("Could not insert genesis block");
runner
.insert_headers(blocks.iter().map(|block| &block.header))
.expect("Could not insert headers");
// Run the stage
let rx = runner.execute(ExecInput {
previous_stage: Some((StageId("Headers"), blocks.len() as BlockNumber)),
stage_progress: None,
});
// Check that the error bubbles up
assert_matches!(
rx.await.unwrap(),
Err(StageError::Validation { block: 1, error: consensus::Error::BaseFeeMissing })
);
}
/// Checks that the stage unwinds correctly with no data.
#[tokio::test]
async fn unwind_empty_db() {
let unwind_to = 10;
let runner = BodyTestRunner::new(TestBodyDownloader::default);
let rx = runner.unwind(UnwindInput { bad_block: None, stage_progress: 20, unwind_to });
assert_matches!(
rx.await.unwrap(),
Ok(UnwindOutput { stage_progress }) if stage_progress == unwind_to
)
}
/// Checks that the stage unwinds correctly with data.
#[tokio::test]
async fn unwind() {
// Generate blocks #1-20
let blocks = random_block_range(1..21, GENESIS_HASH);
let bodies: HashMap<H256, Result<BlockBody, DownloadError>> =
blocks.iter().map(body_by_hash).collect();
let mut runner = BodyTestRunner::new(|| TestBodyDownloader::new(bodies.clone()));
// Set the batch size to more than what the previous stage synced (40 vs 20)
runner.set_batch_size(40);
// Insert required state
runner.insert_genesis().expect("Could not insert genesis block");
runner
.insert_headers(blocks.iter().map(|block| &block.header))
.expect("Could not insert headers");
// Run the stage
let rx = runner.execute(ExecInput {
previous_stage: Some((StageId("Headers"), blocks.len() as BlockNumber)),
stage_progress: None,
});
// Check that we synced all blocks successfully, even though our `batch_size` allows us to
// sync more (if there were more headers)
let output = rx.await.unwrap();
assert_matches!(
output,
Ok(ExecOutput { stage_progress: 20, reached_tip: true, done: true })
);
let stage_progress = output.unwrap().stage_progress;
runner.validate_db_blocks(stage_progress).expect("Written block data invalid");
// Unwind all of it
let unwind_to = 1;
let rx = runner.unwind(UnwindInput { bad_block: None, stage_progress, unwind_to });
assert_matches!(
rx.await.unwrap(),
Ok(UnwindOutput { stage_progress }) if stage_progress == 1
);
let last_body = runner.last_body().expect("Could not read last body");
let last_tx_id = last_body.base_tx_id + last_body.tx_amount;
runner
.db()
.check_no_entry_above::<tables::BlockBodies, _>(unwind_to, |key| key.number())
.expect("Did not unwind block bodies correctly.");
runner
.db()
.check_no_entry_above::<tables::Transactions, _>(last_tx_id, |key| key)
.expect("Did not unwind transactions correctly.")
}
/// Checks that the stage unwinds correctly, even if a transaction in a block is missing.
#[tokio::test]
async fn unwind_missing_tx() {
// Generate blocks #1-20
let blocks = random_block_range(1..21, GENESIS_HASH);
let bodies: HashMap<H256, Result<BlockBody, DownloadError>> =
blocks.iter().map(body_by_hash).collect();
let mut runner = BodyTestRunner::new(|| TestBodyDownloader::new(bodies.clone()));
// Set the batch size to more than what the previous stage synced (40 vs 20)
runner.set_batch_size(40);
// Insert required state
runner.insert_genesis().expect("Could not insert genesis block");
runner
.insert_headers(blocks.iter().map(|block| &block.header))
.expect("Could not insert headers");
// Run the stage
let rx = runner.execute(ExecInput {
previous_stage: Some((StageId("Headers"), blocks.len() as BlockNumber)),
stage_progress: None,
});
// Check that we synced all blocks successfully, even though our `batch_size` allows us to
// sync more (if there were more headers)
let output = rx.await.unwrap();
assert_matches!(
output,
Ok(ExecOutput { stage_progress: 20, reached_tip: true, done: true })
);
let stage_progress = output.unwrap().stage_progress;
runner.validate_db_blocks(stage_progress).expect("Written block data invalid");
// Delete a transaction
{
let mut db = runner.db().container();
let mut tx_cursor = db
.get_mut()
.cursor_mut::<tables::Transactions>()
.expect("Could not get transaction cursor");
tx_cursor
.last()
.expect("Could not read database")
.expect("Could not read last transaction");
tx_cursor.delete_current().expect("Could not delete last transaction");
db.commit().expect("Could not commit database");
}
// Unwind all of it
let unwind_to = 1;
let rx = runner.unwind(UnwindInput { bad_block: None, stage_progress, unwind_to });
assert_matches!(
rx.await.unwrap(),
Ok(UnwindOutput { stage_progress }) if stage_progress == 1
);
let last_body = runner.last_body().expect("Could not read last body");
let last_tx_id = last_body.base_tx_id + last_body.tx_amount;
runner
.db()
.check_no_entry_above::<tables::BlockBodies, _>(unwind_to, |key| key.number())
.expect("Did not unwind block bodies correctly.");
runner
.db()
.check_no_entry_above::<tables::Transactions, _>(last_tx_id, |key| key)
.expect("Did not unwind transactions correctly.")
}
/// Checks that the stage exits if the downloader times out.
/// TODO: We should probably just exit as "OK", commit the blocks we downloaded successfully and
/// try again?
#[tokio::test]
async fn downloader_timeout() {
// Generate a header
let header = random_block(1, Some(GENESIS_HASH)).header;
let runner = BodyTestRunner::new(|| {
TestBodyDownloader::new(HashMap::from([(
header.hash(),
Err(DownloadError::Timeout { header_hash: header.hash() }),
)]))
});
// Insert required state
runner.insert_genesis().expect("Could not insert genesis block");
runner.insert_header(&header).expect("Could not insert header");
// Run the stage
let rx = runner.execute(ExecInput {
previous_stage: Some((StageId("Headers"), 1)),
stage_progress: None,
});
// Check that the error bubbles up
assert_matches!(rx.await.unwrap(), Err(StageError::Internal(_)));
}
mod test_utils {
use crate::{
stages::bodies::BodyStage,
util::test_utils::{StageTestDB, StageTestRunner},
};
use assert_matches::assert_matches;
use async_trait::async_trait;
use reth_eth_wire::BlockBody;
use reth_interfaces::{
db,
db::{
models::{BlockNumHash, StoredBlockBody},
tables, DbCursorRO, DbTx, DbTxMut,
},
p2p::bodies::{
client::BodiesClient,
downloader::{BodiesStream, BodyDownloader},
error::{BodiesClientError, DownloadError},
},
test_utils::TestConsensus,
};
use reth_primitives::{
BigEndianHash, BlockLocked, BlockNumber, Header, SealedHeader, H256, U256,
};
use std::{collections::HashMap, ops::Deref, time::Duration};
/// The block hash of the genesis block.
pub(crate) const GENESIS_HASH: H256 = H256::zero();
/// A helper that maps a block to its hash and a `Result`-wrapped body, for collecting into
/// a downloader response map.
pub(crate) fn body_by_hash(
block: &BlockLocked,
) -> (H256, Result<BlockBody, DownloadError>) {
(
block.hash(),
Ok(BlockBody {
transactions: block.body.clone(),
ommers: block.ommers.iter().cloned().map(|ommer| ommer.unseal()).collect(),
}),
)
}
/// A helper struct for running the [BodyStage].
pub(crate) struct BodyTestRunner<F>
where
F: Fn() -> TestBodyDownloader,
{
downloader_builder: F,
db: StageTestDB,
batch_size: u64,
fail_validation: bool,
}
impl<F> BodyTestRunner<F>
where
F: Fn() -> TestBodyDownloader,
{
/// Build a new test runner.
pub(crate) fn new(downloader_builder: F) -> Self {
BodyTestRunner {
downloader_builder,
db: StageTestDB::default(),
batch_size: 10,
fail_validation: false,
}
}
pub(crate) fn set_batch_size(&mut self, batch_size: u64) {
self.batch_size = batch_size;
}
pub(crate) fn set_fail_validation(&mut self, fail_validation: bool) {
self.fail_validation = fail_validation;
}
}
impl<F> StageTestRunner for BodyTestRunner<F>
where
F: Fn() -> TestBodyDownloader,
{
type S = BodyStage<TestBodyDownloader, TestConsensus>;
fn db(&self) -> &StageTestDB {
&self.db
}
fn stage(&self) -> Self::S {
let mut consensus = TestConsensus::default();
consensus.set_fail_validation(self.fail_validation);
BodyStage {
downloader: (self.downloader_builder)(),
consensus,
batch_size: self.batch_size,
}
}
}
impl<F> BodyTestRunner<F>
where
F: Fn() -> TestBodyDownloader,
{
/// Insert the genesis block into the appropriate tables
///
/// The genesis block always has no transactions and no ommers, and it always has the
/// same hash ([`GENESIS_HASH`]).
pub(crate) fn insert_genesis(&self) -> Result<(), db::Error> {
self.insert_header(&SealedHeader::new(Header::default(), GENESIS_HASH))?;
let mut db = self.db.container();
let tx = db.get_mut();
tx.put::<tables::BlockBodies>(
(0, GENESIS_HASH).into(),
StoredBlockBody { base_tx_id: 0, tx_amount: 0, ommers: vec![] },
)?;
db.commit()?;
Ok(())
}
/// Insert header into tables
pub(crate) fn insert_header(&self, header: &SealedHeader) -> Result<(), db::Error> {
self.insert_headers(std::iter::once(header))
}
/// Insert headers into tables
pub(crate) fn insert_headers<'a, I>(&self, headers: I) -> Result<(), db::Error>
where
I: Iterator<Item = &'a SealedHeader>,
{
let headers = headers.collect::<Vec<_>>();
self.db
.map_put::<tables::HeaderNumbers, _, _>(&headers, |h| (h.hash(), h.number))?;
self.db.map_put::<tables::Headers, _, _>(&headers, |h| {
(BlockNumHash((h.number, h.hash())), h.deref().clone().unseal())
})?;
self.db.map_put::<tables::CanonicalHeaders, _, _>(&headers, |h| {
(h.number, h.hash())
})?;
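// Total difficulty is cumulative: each entry stores the previous entry's TD plus
// this header's difficulty, encoded as big-endian bytes.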
self.db.transform_append::<tables::HeaderTD, _, _>(&headers, |prev, h| {
let prev_td = U256::from_big_endian(&prev.clone().unwrap_or_default());
(
BlockNumHash((h.number, h.hash())),
H256::from_uint(&(prev_td + h.difficulty)).as_bytes().to_vec(),
)
})?;
Ok(())
}
pub(crate) fn last_body(&self) -> Option<StoredBlockBody> {
Some(
self.db()
.container()
.get()
.cursor::<tables::BlockBodies>()
.ok()?
.last()
.ok()??
.1,
)
}
/// Validate that the inserted block data is valid
pub(crate) fn validate_db_blocks(
&self,
highest_block: BlockNumber,
) -> Result<(), db::Error> {
let db = self.db.container();
let tx = db.get();
let mut block_body_cursor = tx.cursor::<tables::BlockBodies>()?;
let mut transaction_cursor = tx.cursor::<tables::Transactions>()?;
let mut entry = block_body_cursor.first()?;
let mut prev_max_tx_id = 0;
while let Some((key, body)) = entry {
assert!(
key.number() <= highest_block,
"We wrote a block body outside of our synced range. Found block with number {}, highest block according to stage is {}",
key.number(), highest_block
);
assert_eq!(prev_max_tx_id, body.base_tx_id, "Transaction IDs are malformed.");
for num in 0..body.tx_amount {
let tx_id = body.base_tx_id + num;
assert_matches!(
transaction_cursor.seek_exact(tx_id),
Ok(Some(_)),
"A transaction is missing."
);
}
prev_max_tx_id = body.base_tx_id + body.tx_amount;
entry = block_body_cursor.next()?;
}
Ok(())
}
}
// TODO(onbjerg): Move
/// A [BodiesClient] that should not be called.
#[derive(Debug)]
pub(crate) struct NoopClient;
#[async_trait]
impl BodiesClient for NoopClient {
async fn get_block_body(&self, _: H256) -> Result<BlockBody, BodiesClientError> {
panic!("Noop client should not be called")
}
}
// TODO(onbjerg): Move
/// A [BodyDownloader] that is backed by an internal [HashMap] for testing.
#[derive(Debug, Default)]
pub(crate) struct TestBodyDownloader {
responses: HashMap<H256, Result<BlockBody, DownloadError>>,
}
impl TestBodyDownloader {
pub(crate) fn new(responses: HashMap<H256, Result<BlockBody, DownloadError>>) -> Self {
Self { responses }
}
}
impl BodyDownloader for TestBodyDownloader {
type Client = NoopClient;
fn timeout(&self) -> Duration {
unreachable!()
}
fn client(&self) -> &Self::Client {
unreachable!()
}
fn bodies_stream<'a, 'b, I>(&'a self, hashes: I) -> BodiesStream<'a>
where
I: IntoIterator<Item = &'b (BlockNumber, H256)>,
<I as IntoIterator>::IntoIter: Send + 'b,
'b: 'a,
{
Box::pin(futures_util::stream::iter(hashes.into_iter().map(
|(block_number, hash)| {
Ok((
*block_number,
*hash,
self.responses
.get(hash)
.expect("Stage tried downloading a block we do not have.")
.clone()?,
))
},
)))
}
}
}
}


@ -9,10 +9,7 @@ use reth_interfaces::{
models::blocks::BlockNumHash, tables, DBContainer, Database, DatabaseGAT, DbCursorRO,
DbCursorRW, DbTx, DbTxMut,
},
p2p::headers::{
client::HeadersClient,
downloader::{DownloadError, Downloader},
},
p2p::headers::{client::HeadersClient, downloader::HeaderDownloader, error::DownloadError},
};
use reth_primitives::{rpc::BigEndianHash, BlockNumber, SealedHeader, H256, U256};
use std::{fmt::Debug, sync::Arc};
@ -20,9 +17,19 @@ use tracing::*;
const HEADERS: StageId = StageId("Headers");
/// The headers stage implementation for staged sync
/// The headers stage.
///
/// The headers stage downloads all block headers from the highest block in the local database to
/// the perceived highest block on the network.
///
/// The headers are processed and data is inserted into these tables:
///
/// - [`HeaderNumbers`][reth_interfaces::db::tables::HeaderNumbers]
/// - [`Headers`][reth_interfaces::db::tables::Headers]
/// - [`CanonicalHeaders`][reth_interfaces::db::tables::CanonicalHeaders]
/// - [`HeaderTD`][reth_interfaces::db::tables::HeaderTD]
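///
/// A minimal construction sketch; `downloader`, `consensus` and `client` stand in for any
/// implementations of the respective traits:
///
/// ```ignore
/// let stage = HeaderStage { downloader, consensus, client };
/// ```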
#[derive(Debug)]
pub struct HeaderStage<D: Downloader, C: Consensus, H: HeadersClient> {
pub struct HeaderStage<D: HeaderDownloader, C: Consensus, H: HeadersClient> {
/// Strategy for downloading the headers
pub downloader: D,
/// Consensus client implementation
@ -32,7 +39,7 @@ pub struct HeaderStage<D: Downloader, C: Consensus, H: HeadersClient> {
}
#[async_trait::async_trait]
impl<DB: Database, D: Downloader, C: Consensus, H: HeadersClient> Stage<DB>
impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient> Stage<DB>
for HeaderStage<D, C, H>
{
/// Return the id of the stage
@ -55,7 +62,7 @@ impl<DB: Database, D: Downloader, C: Consensus, H: HeadersClient> Stage<DB>
// TODO: handle input.max_block
let last_hash = tx
.get::<tables::CanonicalHeaders>(last_block_num)?
.ok_or(DatabaseIntegrityError::CannonicalHash { number: last_block_num })?;
.ok_or(DatabaseIntegrityError::CanonicalHash { number: last_block_num })?;
let last_header =
tx.get::<tables::Headers>((last_block_num, last_hash).into())?.ok_or({
DatabaseIntegrityError::Header { number: last_block_num, hash: last_hash }
@ -81,14 +88,15 @@ impl<DB: Database, D: Downloader, C: Consensus, H: HeadersClient> Stage<DB>
done: false,
})
}
DownloadError::HeaderValidation { hash, details } => {
warn!("validation error for header {hash}: {details}");
return Err(StageError::Validation { block: last_block_num })
DownloadError::HeaderValidation { hash, error } => {
warn!("Validation error for header {hash}: {error}");
return Err(StageError::Validation { block: last_block_num, error })
}
// TODO: this error is never propagated, clean up
DownloadError::MismatchedHeaders { .. } => {
return Err(StageError::Validation { block: last_block_num })
}
// DownloadError::MismatchedHeaders { .. } => {
// return Err(StageError::Validation { block: last_block_num })
// }
_ => unreachable!(),
},
};
let stage_progress = self.write_headers::<DB>(tx, headers).await?.unwrap_or(last_block_num);
@ -116,7 +124,7 @@ impl<DB: Database, D: Downloader, C: Consensus, H: HeadersClient> Stage<DB>
}
}
impl<D: Downloader, C: Consensus, H: HeadersClient> HeaderStage<D, C, H> {
impl<D: HeaderDownloader, C: Consensus, H: HeadersClient> HeaderStage<D, C, H> {
async fn update_head<DB: Database>(
&self,
tx: &mut <DB as DatabaseGAT<'_>>::TXMut,
@ -124,7 +132,7 @@ impl<D: Downloader, C: Consensus, H: HeadersClient> HeaderStage<D, C, H> {
) -> Result<(), StageError> {
let hash = tx
.get::<tables::CanonicalHeaders>(height)?
.ok_or(DatabaseIntegrityError::CannonicalHeader { number: height })?;
.ok_or(DatabaseIntegrityError::CanonicalHeader { number: height })?;
let td: Vec<u8> = tx.get::<tables::HeaderTD>((height, hash).into())?.unwrap(); // TODO:
self.client.update_status(height, hash, H256::from_slice(&td)).await;
Ok(())
@ -184,31 +192,36 @@ mod tests {
use super::*;
use crate::util::test_utils::StageTestRunner;
use assert_matches::assert_matches;
use reth_interfaces::test_utils::{gen_random_header, gen_random_header_range};
use test_utils::{HeadersTestRunner, TestDownloader};
use reth_interfaces::{
consensus,
test_utils::{
generators::{random_header, random_header_range},
TestHeaderDownloader,
},
};
use test_utils::HeadersTestRunner;
const TEST_STAGE: StageId = StageId("Headers");
/// Check that the execution errors on empty database or
/// prev progress missing from the database.
#[tokio::test]
// Check that the execution errors on empty database or
// prev progress missing from the database.
async fn execute_empty_db() {
let runner = HeadersTestRunner::default();
let rx = runner.execute(ExecInput::default());
assert_matches!(
rx.await.unwrap(),
Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CannonicalHeader { .. }))
Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CanonicalHeader { .. }))
);
}
/// Check that the execution exits on downloader timeout.
#[tokio::test]
// Check that the execution exits on downloader timeout.
async fn execute_timeout() {
let head = gen_random_header(0, None);
let runner =
HeadersTestRunner::with_downloader(TestDownloader::new(Err(DownloadError::Timeout {
request_id: 0,
})));
let head = random_header(0, None);
let runner = HeadersTestRunner::with_downloader(TestHeaderDownloader::new(Err(
DownloadError::Timeout { request_id: 0 },
)));
runner.insert_header(&head).expect("failed to insert header");
let rx = runner.execute(ExecInput::default());
@ -216,30 +229,33 @@ mod tests {
assert_matches!(rx.await.unwrap(), Ok(ExecOutput { done, .. }) if !done);
}
/// Check that validation error is propagated during the execution.
#[tokio::test]
// Check that validation error is propagated during the execution.
async fn execute_validation_error() {
let head = gen_random_header(0, None);
let runner = HeadersTestRunner::with_downloader(TestDownloader::new(Err(
DownloadError::HeaderValidation { hash: H256::zero(), details: "".to_owned() },
let head = random_header(0, None);
let runner = HeadersTestRunner::with_downloader(TestHeaderDownloader::new(Err(
DownloadError::HeaderValidation {
hash: H256::zero(),
error: consensus::Error::BaseFeeMissing,
},
)));
runner.insert_header(&head).expect("failed to insert header");
let rx = runner.execute(ExecInput::default());
runner.consensus.update_tip(H256::from_low_u64_be(1));
assert_matches!(rx.await.unwrap(), Err(StageError::Validation { block }) if block == 0);
assert_matches!(rx.await.unwrap(), Err(StageError::Validation { block, error: consensus::Error::BaseFeeMissing }) if block == 0);
}
/// Validate that all necessary tables are updated after the
/// header download on no previous progress.
#[tokio::test]
// Validate that all necessary tables are updated after the
// header download on no previous progress.
async fn execute_no_progress() {
let (start, end) = (0, 100);
let head = gen_random_header(start, None);
let headers = gen_random_header_range(start + 1..end, head.hash());
let head = random_header(start, None);
let headers = random_header_range(start + 1..end, head.hash());
let result = headers.iter().rev().cloned().collect::<Vec<_>>();
let runner = HeadersTestRunner::with_downloader(TestDownloader::new(Ok(result)));
let runner = HeadersTestRunner::with_downloader(TestHeaderDownloader::new(Ok(result)));
runner.insert_header(&head).expect("failed to insert header");
let rx = runner.execute(ExecInput::default());
@ -251,19 +267,19 @@ mod tests {
Ok(ExecOutput { done, reached_tip, stage_progress })
if done && reached_tip && stage_progress == tip.number
);
assert!(headers.iter().try_for_each(|h| runner.validate_db_header(&h)).is_ok());
assert!(headers.iter().try_for_each(|h| runner.validate_db_header(h)).is_ok());
}
/// Validate that all necessary tables are updated after the
/// header download with some previous progress.
#[tokio::test]
// Validate that all necessary tables are updated after the
// header download with some previous progress.
async fn execute_prev_progress() {
let (start, end) = (10000, 10241);
let head = gen_random_header(start, None);
let headers = gen_random_header_range(start + 1..end, head.hash());
let head = random_header(start, None);
let headers = random_header_range(start + 1..end, head.hash());
let result = headers.iter().rev().cloned().collect::<Vec<_>>();
let runner = HeadersTestRunner::with_downloader(TestDownloader::new(Ok(result)));
let runner = HeadersTestRunner::with_downloader(TestHeaderDownloader::new(Ok(result)));
runner.insert_header(&head).expect("failed to insert header");
let rx = runner.execute(ExecInput {
@ -278,15 +294,15 @@ mod tests {
Ok(ExecOutput { done, reached_tip, stage_progress })
if done && reached_tip && stage_progress == tip.number
);
assert!(headers.iter().try_for_each(|h| runner.validate_db_header(&h)).is_ok());
assert!(headers.iter().try_for_each(|h| runner.validate_db_header(h)).is_ok());
}
/// Execute the stage with linear downloader
#[tokio::test]
// Execute the stage with linear downloader
async fn execute_with_linear_downloader() {
let (start, end) = (1000, 1200);
let head = gen_random_header(start, None);
let headers = gen_random_header_range(start + 1..end, head.hash());
let head = random_header(start, None);
let headers = random_header_range(start + 1..end, head.hash());
let runner = HeadersTestRunner::with_linear_downloader();
runner.insert_header(&head).expect("failed to insert header");
@ -315,11 +331,11 @@ mod tests {
Ok(ExecOutput { done, reached_tip, stage_progress })
if done && reached_tip && stage_progress == tip.number
);
assert!(headers.iter().try_for_each(|h| runner.validate_db_header(&h)).is_ok());
assert!(headers.iter().try_for_each(|h| runner.validate_db_header(h)).is_ok());
}
/// Check that unwind does not panic on empty database.
#[tokio::test]
// Check that unwind does not panic on empty database.
async fn unwind_empty_db() {
let unwind_to = 100;
let runner = HeadersTestRunner::default();
@ -331,13 +347,13 @@ mod tests {
);
}
/// Check that unwind can remove headers across gaps
#[tokio::test]
// Check that unwind can remove headers across gaps
async fn unwind_db_gaps() {
let runner = HeadersTestRunner::default();
let head = gen_random_header(0, None);
let first_range = gen_random_header_range(1..20, head.hash());
let second_range = gen_random_header_range(50..100, H256::zero());
let head = random_header(0, None);
let first_range = random_header_range(1..20, head.hash());
let second_range = random_header_range(50..100, H256::zero());
runner.insert_header(&head).expect("failed to insert header");
runner
.insert_headers(first_range.iter().chain(second_range.iter()))
@ -374,36 +390,34 @@ mod tests {
stages::headers::HeaderStage,
util::test_utils::{StageTestDB, StageTestRunner},
};
use async_trait::async_trait;
use reth_headers_downloaders::linear::{LinearDownloadBuilder, LinearDownloader};
use reth_interfaces::{
consensus::ForkchoiceState,
db::{self, models::blocks::BlockNumHash, tables, DbTx},
p2p::headers::downloader::{DownloadError, Downloader},
test_utils::{TestConsensus, TestHeadersClient},
p2p::headers::downloader::HeaderDownloader,
test_utils::{TestConsensus, TestHeaderDownloader, TestHeadersClient},
};
use reth_primitives::{rpc::BigEndianHash, SealedHeader, H256, U256};
use std::{ops::Deref, sync::Arc, time::Duration};
use std::{ops::Deref, sync::Arc};
pub(crate) struct HeadersTestRunner<D: Downloader> {
pub(crate) struct HeadersTestRunner<D: HeaderDownloader> {
pub(crate) consensus: Arc<TestConsensus>,
pub(crate) client: Arc<TestHeadersClient>,
downloader: Arc<D>,
db: StageTestDB,
}
impl Default for HeadersTestRunner<TestDownloader> {
impl Default for HeadersTestRunner<TestHeaderDownloader> {
fn default() -> Self {
Self {
client: Arc::new(TestHeadersClient::default()),
consensus: Arc::new(TestConsensus::default()),
downloader: Arc::new(TestDownloader::new(Ok(Vec::default()))),
downloader: Arc::new(TestHeaderDownloader::new(Ok(Vec::default()))),
db: StageTestDB::default(),
}
}
}
impl<D: Downloader + 'static> StageTestRunner for HeadersTestRunner<D> {
impl<D: HeaderDownloader + 'static> StageTestRunner for HeadersTestRunner<D> {
type S = HeaderStage<Arc<D>, TestConsensus, TestHeadersClient>;
fn db(&self) -> &StageTestDB {
@ -423,13 +437,14 @@ mod tests {
pub(crate) fn with_linear_downloader() -> Self {
let client = Arc::new(TestHeadersClient::default());
let consensus = Arc::new(TestConsensus::default());
let downloader =
Arc::new(LinearDownloadBuilder::new().build(consensus.clone(), client.clone()));
let downloader = Arc::new(
LinearDownloadBuilder::default().build(consensus.clone(), client.clone()),
);
Self { client, consensus, downloader, db: StageTestDB::default() }
}
}
impl<D: Downloader> HeadersTestRunner<D> {
impl<D: HeaderDownloader> HeadersTestRunner<D> {
pub(crate) fn with_downloader(downloader: D) -> Self {
HeadersTestRunner {
client: Arc::new(TestHeadersClient::default()),
@ -501,42 +516,5 @@ mod tests {
Ok(())
}
}
#[derive(Debug)]
pub(crate) struct TestDownloader {
result: Result<Vec<SealedHeader>, DownloadError>,
}
impl TestDownloader {
pub(crate) fn new(result: Result<Vec<SealedHeader>, DownloadError>) -> Self {
Self { result }
}
}
#[async_trait]
impl Downloader for TestDownloader {
type Consensus = TestConsensus;
type Client = TestHeadersClient;
fn timeout(&self) -> Duration {
Duration::from_secs(1)
}
fn consensus(&self) -> &Self::Consensus {
unimplemented!()
}
fn client(&self) -> &Self::Client {
unimplemented!()
}
async fn download(
&self,
_: &SealedHeader,
_: &ForkchoiceState,
) -> Result<Vec<SealedHeader>, DownloadError> {
self.result.clone()
}
}
}
}


@ -1,3 +1,5 @@
/// The bodies stage.
pub mod bodies;
/// The headers stage.
pub mod headers;
/// The cumulative transaction index stage.


@ -37,13 +37,13 @@ impl<DB: Database> Stage<DB> for TxIndex {
let last_block = input.stage_progress.unwrap_or_default();
let last_hash = tx
.get::<tables::CanonicalHeaders>(last_block)?
.ok_or(DatabaseIntegrityError::CannonicalHeader { number: last_block })?;
.ok_or(DatabaseIntegrityError::CanonicalHeader { number: last_block })?;
// The start block for this iteration
let start_block = last_block + 1;
let start_hash = tx
.get::<tables::CanonicalHeaders>(start_block)?
.ok_or(DatabaseIntegrityError::CannonicalHeader { number: start_block })?;
.ok_or(DatabaseIntegrityError::CanonicalHeader { number: start_block })?;
// The maximum block that this stage should insert to
let max_block = input.previous_stage.as_ref().map(|(_, block)| *block).unwrap_or_default();
@ -65,8 +65,8 @@ impl<DB: Database> Stage<DB> for TxIndex {
// Aggregate and insert cumulative transaction count for each block number
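// (e.g. bodies with `tx_amount` 2, 0 and 3 yield cumulative counts 2, 2 and 5)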
for entry in entries {
let (key, tx_count) = entry?;
count += tx_count as u64;
let (key, body) = entry?;
count += body.tx_amount;
cursor.append(key, count)?;
}
@ -89,7 +89,7 @@ mod tests {
use super::*;
use crate::util::test_utils::{StageTestDB, StageTestRunner};
use assert_matches::assert_matches;
use reth_interfaces::{db::models::BlockNumHash, test_utils::gen_random_header_range};
use reth_interfaces::{db::models::BlockNumHash, test_utils::generators::random_header_range};
use reth_primitives::H256;
const TEST_STAGE: StageId = StageId("PrevStage");
@ -100,14 +100,14 @@ mod tests {
let rx = runner.execute(ExecInput::default());
assert_matches!(
rx.await.unwrap(),
Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CannonicalHeader { .. }))
Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CanonicalHeader { .. }))
);
}
#[tokio::test]
async fn execute_no_prev_tx_count() {
let runner = TxIndexTestRunner::default();
let headers = gen_random_header_range(0..10, H256::zero());
let headers = random_header_range(0..10, H256::zero());
runner
.db()
.map_put::<tables::CanonicalHeaders, _, _>(&headers, |h| (h.number, h.hash()))
@ -129,7 +129,7 @@ mod tests {
async fn execute() {
let runner = TxIndexTestRunner::default();
let (start, pivot, end) = (0, 100, 200);
let headers = gen_random_header_range(start..end, H256::zero());
let headers = random_header_range(start..end, H256::zero());
runner
.db()
.map_put::<tables::CanonicalHeaders, _, _>(&headers, |h| (h.number, h.hash()))
@ -170,7 +170,7 @@ mod tests {
#[tokio::test]
async fn unwind_no_input() {
let runner = TxIndexTestRunner::default();
let headers = gen_random_header_range(0..10, H256::zero());
let headers = random_header_range(0..10, H256::zero());
runner
.db()
.transform_append::<tables::CumulativeTxCount, _, _>(&headers, |prev, h| {
@ -195,8 +195,8 @@ mod tests {
#[tokio::test]
async fn unwind_with_db_gaps() {
let runner = TxIndexTestRunner::default();
let first_range = gen_random_header_range(0..20, H256::zero());
let second_range = gen_random_header_range(50..100, H256::zero());
let first_range = random_header_range(0..20, H256::zero());
let second_range = random_header_range(50..100, H256::zero());
runner
.db()
.transform_append::<tables::CumulativeTxCount, _, _>(


@ -67,8 +67,8 @@
//! The final `TransactionPool` is made up of two layers:
//!
//! The lowest layer is the actual pool implementation that manages (validated) transactions:
//! [`TxPool`](crate::pool::TxPool). This is contained in a higher level pool type that guards the
//! low level pool and handles additional listeners or metrics:
//! [`TxPool`](crate::pool::txpool::TxPool). This is contained in a higher level pool type that
//! guards the low level pool and handles additional listeners or metrics:
//! [`PoolInner`](crate::pool::PoolInner)
//!
//! The transaction pool will be used by separate consumers (RPC, P2P), to make sharing easier, the