diff --git a/.gitignore b/.gitignore index 6db043d3d..ee44a9639 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ .idea -/target +target diff --git a/Cargo.lock b/Cargo.lock index 0e4ca0e4c..6a6dc2c52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3043,6 +3043,21 @@ dependencies = [ "walkdir", ] +[[package]] +name = "reth-bodies-downloaders" +version = "0.1.0" +dependencies = [ + "assert_matches", + "futures-util", + "once_cell", + "rand 0.8.5", + "reth-eth-wire", + "reth-interfaces", + "reth-primitives", + "serial_test", + "tokio", +] + [[package]] name = "reth-codecs" version = "0.1.0" @@ -3057,16 +3072,11 @@ dependencies = [ "async-trait", "auto_impl", "eyre", - "hash-db", - "plain_hasher", "reth-interfaces", "reth-primitives", "reth-rlp", - "rlp", - "sha3", "thiserror", "tokio", - "triehash", ] [[package]] @@ -3225,6 +3235,7 @@ dependencies = [ "rand 0.8.5", "reth-codecs", "reth-db", + "reth-eth-wire", "reth-primitives", "reth-rpc-types", "serde", @@ -3332,10 +3343,12 @@ dependencies = [ "crc", "derive_more", "ethers-core", + "hash-db", "hex", "hex-literal", "maplit", "parity-scale-codec", + "plain_hasher", "reth-codecs", "reth-rlp", "secp256k1", @@ -3344,6 +3357,7 @@ dependencies = [ "sucds", "thiserror", "tiny-keccak", + "triehash", ] [[package]] @@ -3416,9 +3430,12 @@ dependencies = [ "aquamarine", "assert_matches", "async-trait", + "futures-util", "metrics", "rand 0.8.5", + "reth-bodies-downloaders", "reth-db", + "reth-eth-wire", "reth-headers-downloaders", "reth-interfaces", "reth-primitives", diff --git a/Cargo.toml b/Cargo.toml index d1fa72558..d94f9eab3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ members = [ "crates/net/rpc-api", "crates/net/rpc-types", "crates/net/headers-downloaders", + "crates/net/bodies-downloaders", "crates/primitives", "crates/stages", "crates/transaction-pool", diff --git a/crates/.gitignore b/crates/.gitignore deleted file mode 100644 index 2f7896d1d..000000000 --- a/crates/.gitignore +++ /dev/null @@ -1 +0,0 @@ -target/ diff --git a/crates/consensus/Cargo.toml b/crates/consensus/Cargo.toml index aefc34afa..267f026ff 100644 --- a/crates/consensus/Cargo.toml +++ b/crates/consensus/Cargo.toml @@ -17,14 +17,4 @@ async-trait = "0.1.57" thiserror = "1.0.37" eyre = "0.6.8" auto_impl = "1.0" -tokio = { version = "1.21.2", features = ["sync"] } - -# proof related -triehash = "0.8" -# See to replace hashers to simplify libraries -plain_hasher = "0.2" -hash-db = "0.15" -# todo replace with faster rlp impl -rlp = { version = "0.5", default-features = false } -# replace with tiny-keccak (it is faster hasher) -sha3 = { version = "0.10", default-features = false } \ No newline at end of file +tokio = { version = "1.21.2", features = ["sync"] } \ No newline at end of file diff --git a/crates/consensus/src/consensus.rs b/crates/consensus/src/consensus.rs index d00088494..3d174df72 100644 --- a/crates/consensus/src/consensus.rs +++ b/crates/consensus/src/consensus.rs @@ -2,7 +2,7 @@ use crate::{verification, Config}; use reth_interfaces::consensus::{Consensus, Error, ForkchoiceState}; -use reth_primitives::{SealedHeader, H256}; +use reth_primitives::{BlockLocked, SealedHeader, H256}; use tokio::sync::watch; /// Ethereum consensus @@ -40,4 +40,8 @@ impl Consensus for EthConsensus { // * mix_hash & nonce PoW stuf // * extra_data } + + fn pre_validate_block(&self, block: &BlockLocked) -> Result<(), Error> { + verification::validate_block_standalone(block, false) + } } diff --git a/crates/consensus/src/lib.rs b/crates/consensus/src/lib.rs index 9d9a6e48c..a36c5abf4 100644 --- a/crates/consensus/src/lib.rs +++ b/crates/consensus/src/lib.rs @@ -10,9 +10,6 @@ pub mod config; pub mod consensus; pub mod verification; -/// Helper function for calculating Merkle proofs and hashes -pub mod proofs; - pub use config::Config; pub use consensus::EthConsensus; pub use reth_interfaces::consensus::Error; diff --git a/crates/consensus/src/verification.rs b/crates/consensus/src/verification.rs index 27428f716..50e357ada 100644 --- a/crates/consensus/src/verification.rs +++ b/crates/consensus/src/verification.rs @@ -115,10 +115,20 @@ pub fn validate_transaction_regarding_state( Ok(()) } -/// Validate block standalone -pub fn validate_block_standalone(block: &BlockLocked) -> Result<(), Error> { - // check ommers hash - let ommers_hash = crate::proofs::calculate_ommers_root(block.ommers.iter().map(|h| h.as_ref())); +/// Validate a block without regard for state: +/// +/// - Compares the ommer hash in the block header to the block body +/// - Compares the transactions root in the block header to the block body +/// - Pre-execution transaction validation +/// - (Optionally) Compares the receipts root in the block header to the block body +pub fn validate_block_standalone( + block: &BlockLocked, + validate_receipts: bool, +) -> Result<(), Error> { + // Check ommers hash + // TODO(onbjerg): This should probably be accessible directly on [Block] + let ommers_hash = + reth_primitives::proofs::calculate_ommers_root(block.ommers.iter().map(|h| h.as_ref())); if block.header.ommers_hash != ommers_hash { return Err(Error::BodyOmmersHashDiff { got: ommers_hash, @@ -126,8 +136,9 @@ pub fn validate_block_standalone(block: &BlockLocked) -> Result<(), Error> { }) } - // check transaction root - let transaction_root = crate::proofs::calculate_transaction_root(block.body.iter()); + // Check transaction root + // TODO(onbjerg): This should probably be accessible directly on [Block] + let transaction_root = reth_primitives::proofs::calculate_transaction_root(block.body.iter()); if block.header.transactions_root != transaction_root { return Err(Error::BodyTransactionRootDiff { got: transaction_root, @@ -135,18 +146,27 @@ pub fn validate_block_standalone(block: &BlockLocked) -> Result<(), Error> { }) } - // TODO transaction verification, Maybe make it configurable as in check only + // TODO: transaction verification,maybe make it configurable as in check only // signatures/limits/types + // Things to probably check: + // - Chain ID + // - Base fee per gas (if applicable) + // - Max priority fee per gas (if applicable) - // check if all transactions limit does not goes over block limit + // TODO: Check if all transaction gas total does not go over block limit - // check receipts root - let receipts_root = crate::proofs::calculate_receipt_root(block.receipts.iter()); - if block.header.receipts_root != receipts_root { - return Err(Error::BodyReceiptsRootDiff { - got: receipts_root, - expected: block.header.receipts_root, - }) + // Check receipts root + // TODO(onbjerg): This should probably be accessible directly on [Block] + // NOTE(onbjerg): Pre-validation does not validate the receipts root since we do not have the + // receipts yet (this validation is before execution). Maybe this should not be in here? + if validate_receipts { + let receipts_root = reth_primitives::proofs::calculate_receipt_root(block.receipts.iter()); + if block.header.receipts_root != receipts_root { + return Err(Error::BodyReceiptsRootDiff { + got: receipts_root, + expected: block.header.receipts_root, + }) + } } Ok(()) @@ -284,7 +304,7 @@ pub fn full_validation( config: &Config, ) -> RethResult<()> { validate_header_standalone(&block.header, config)?; - validate_block_standalone(block)?; + validate_block_standalone(block, true)?; let parent = validate_block_regarding_chain(block, &provider)?; validate_header_regarding_parent(&parent, &block.header, config)?; Ok(()) diff --git a/crates/db/src/kv/mod.rs b/crates/db/src/kv/mod.rs index fdad0808b..a5600702e 100644 --- a/crates/db/src/kv/mod.rs +++ b/crates/db/src/kv/mod.rs @@ -50,7 +50,7 @@ impl Database for Env { impl Env { /// Opens the database at the specified path with the given `EnvKind`. /// - /// It does not create the tables, for that call [`create_tables`]. + /// It does not create the tables, for that call [`Env::create_tables`]. pub fn open(path: &Path, kind: EnvKind) -> Result, Error> { let mode = match kind { EnvKind::RO => Mode::ReadOnly, diff --git a/crates/interfaces/Cargo.toml b/crates/interfaces/Cargo.toml index b754f5e75..752fc9efc 100644 --- a/crates/interfaces/Cargo.toml +++ b/crates/interfaces/Cargo.toml @@ -16,6 +16,9 @@ auto_impl = "1.0" tokio = { version = "1.21.2", features = ["sync"] } bytes = "1.2" +# TODO(onbjerg): We only need this for [BlockBody] +reth-eth-wire = { path = "../net/eth-wire" } + # codecs serde = { version = "1.0.*", default-features = false } postcard = { version = "1.0.2", features = ["alloc"] } diff --git a/crates/interfaces/src/consensus.rs b/crates/interfaces/src/consensus.rs index ac82fce85..0e98c015d 100644 --- a/crates/interfaces/src/consensus.rs +++ b/crates/interfaces/src/consensus.rs @@ -1,20 +1,30 @@ use async_trait::async_trait; -use reth_primitives::{BlockHash, BlockNumber, SealedHeader, H256}; +use reth_primitives::{BlockHash, BlockLocked, BlockNumber, SealedHeader, H256}; use tokio::sync::watch::Receiver; /// Re-export forkchoice state pub use reth_rpc_types::engine::ForkchoiceState; /// Consensus is a protocol that chooses canonical chain. -/// We are checking validity of block header here. #[async_trait] #[auto_impl::auto_impl(&, Arc)] pub trait Consensus: Send + Sync { /// Get a receiver for the fork choice state fn fork_choice_state(&self) -> Receiver; - /// Validate if header is correct and follows consensus specification + /// Validate if header is correct and follows consensus specification. + /// + /// **This should not be called for the genesis block**. fn validate_header(&self, header: &SealedHeader, parent: &SealedHeader) -> Result<(), Error>; + + /// Validate a block disregarding world state, i.e. things that can be checked before sender + /// recovery and execution. + /// + /// See the Yellow Paper sections 4.3.2 "Holistic Validity", 4.3.4 "Block Header Validity", and + /// 11.1 "Ommer Validation". + /// + /// **This should not be called for the genesis block**. + fn pre_validate_block(&self, block: &BlockLocked) -> Result<(), Error>; } /// Consensus Errors diff --git a/crates/interfaces/src/db/codecs/scale.rs b/crates/interfaces/src/db/codecs/scale.rs index 93e2d2374..ce8eaa13f 100644 --- a/crates/interfaces/src/db/codecs/scale.rs +++ b/crates/interfaces/src/db/codecs/scale.rs @@ -1,4 +1,7 @@ -use crate::db::{models::accounts::AccountBeforeTx, Compress, Decompress, Error}; +use crate::db::{ + models::{accounts::AccountBeforeTx, StoredBlockBody}, + Compress, Decompress, Error, +}; use parity_scale_codec::decode_from_bytes; use reth_primitives::*; @@ -53,7 +56,16 @@ impl ScaleValue for Vec {} impl sealed::Sealed for Vec {} impl_scale!(U256, H256, H160); -impl_scale!(Header, Account, Log, Receipt, TxType, StorageEntry, TransactionSigned); +impl_scale!( + Header, + Account, + Log, + Receipt, + TxType, + StorageEntry, + TransactionSigned, + StoredBlockBody +); impl_scale!(AccountBeforeTx); impl_scale_value!(u8, u32, u16, u64); diff --git a/crates/interfaces/src/db/mod.rs b/crates/interfaces/src/db/mod.rs index 30691c7cf..81d241a69 100644 --- a/crates/interfaces/src/db/mod.rs +++ b/crates/interfaces/src/db/mod.rs @@ -74,9 +74,9 @@ pub trait Database: for<'a> DatabaseGAT<'a> { /// Sealed trait which cannot be implemented by 3rd parties, exposed only for implementers pub trait DbTxGAT<'a, __ImplicitBounds: Sealed = Bounds<&'a Self>>: Send + Sync { /// Cursor GAT - type Cursor: DbCursorRO<'a, T>; + type Cursor: DbCursorRO<'a, T> + Send + Sync; /// DupCursor GAT - type DupCursor: DbDupCursorRO<'a, T> + DbCursorRO<'a, T>; + type DupCursor: DbDupCursorRO<'a, T> + DbCursorRO<'a, T> + Send + Sync; } /// Implements the GAT method from: @@ -85,12 +85,14 @@ pub trait DbTxGAT<'a, __ImplicitBounds: Sealed = Bounds<&'a Self>>: Send + Sync /// Sealed trait which cannot be implemented by 3rd parties, exposed only for implementers pub trait DbTxMutGAT<'a, __ImplicitBounds: Sealed = Bounds<&'a Self>>: Send + Sync { /// Cursor GAT - type CursorMut: DbCursorRW<'a, T> + DbCursorRO<'a, T>; + type CursorMut: DbCursorRW<'a, T> + DbCursorRO<'a, T> + Send + Sync; /// DupCursor GAT type DupCursorMut: DbDupCursorRW<'a, T> + DbCursorRW<'a, T> + DbDupCursorRO<'a, T> - + DbCursorRO<'a, T>; + + DbCursorRO<'a, T> + + Send + + Sync; } /// Read only transaction @@ -190,7 +192,9 @@ pub trait DbCursorRW<'tx, T: Table> { /// exists in a table, and insert a new row if the specified value doesn't already exist fn upsert(&mut self, key: T::Key, value: T::Value) -> Result<(), Error>; - /// Append value to next cursor item + /// Append value to next cursor item. + /// + /// This is efficient for pre-sorted data. If the data is not pre-sorted, use [`insert`]. fn append(&mut self, key: T::Key, value: T::Value) -> Result<(), Error>; /// Delete current value that cursor points to @@ -201,7 +205,10 @@ pub trait DbCursorRW<'tx, T: Table> { pub trait DbDupCursorRW<'tx, T: DupSort> { /// Append value to next cursor item fn delete_current_duplicates(&mut self) -> Result<(), Error>; - /// Append duplicate value + + /// Append duplicate value. + /// + /// This is efficient for pre-sorted data. If the data is not pre-sorted, use [`insert`]. fn append_dup(&mut self, key: T::Key, value: T::Value) -> Result<(), Error>; } diff --git a/crates/interfaces/src/db/models/blocks.rs b/crates/interfaces/src/db/models/blocks.rs index c24c0ca4b..a2b0c416e 100644 --- a/crates/interfaces/src/db/models/blocks.rs +++ b/crates/interfaces/src/db/models/blocks.rs @@ -8,14 +8,30 @@ use crate::{ impl_fixed_arbitrary, }; use bytes::Bytes; -use reth_primitives::{BlockHash, BlockNumber, H256}; +use reth_codecs::main_codec; +use reth_primitives::{BlockHash, BlockNumber, Header, TxNumber, H256}; use serde::{Deserialize, Serialize}; /// Total chain number of transactions. Key for [`CumulativeTxCount`]. pub type NumTransactions = u64; -/// Number of transactions in the block. Value for [`BlockBodies`]. -pub type NumTxesInBlock = u16; +/// The storage representation of a block body. +/// +/// A block body is stored as a pointer to the first transaction in the block (`base_tx_id`), a +/// count of how many transactions are in the block, and the headers of the block's uncles. +/// +/// The [TxNumber]s for all the transactions in the block are `base_tx_id..(base_tx_id + +/// tx_amount)`. +#[derive(Debug)] +#[main_codec] +pub struct StoredBlockBody { + /// The ID of the first transaction in the block. + pub base_tx_id: TxNumber, + /// The number of transactions in the block. + pub tx_amount: u64, + /// The block headers of this block's uncles. + pub ommers: Vec
, +} /// Hash of the block header. Value for [`CanonicalHeaders`] pub type HeaderHash = H256; diff --git a/crates/interfaces/src/db/tables.rs b/crates/interfaces/src/db/tables.rs index b9f0c1971..6b663f7c6 100644 --- a/crates/interfaces/src/db/tables.rs +++ b/crates/interfaces/src/db/tables.rs @@ -3,7 +3,7 @@ use crate::db::{ models::{ accounts::{AccountBeforeTx, TxNumberAddress}, - blocks::{BlockNumHash, HeaderHash, NumTransactions, NumTxesInBlock}, + blocks::{BlockNumHash, HeaderHash, NumTransactions, StoredBlockBody}, ShardedKey, }, DupSort, @@ -13,7 +13,7 @@ use reth_primitives::{ TransactionSigned, TxNumber, H256, }; -/// Enum for the type of table present in libmdbx. +/// Enum for the types of tables present in libmdbx. #[derive(Debug)] pub enum TableType { /// key value table @@ -119,8 +119,10 @@ table!( Headers => BlockNumHash => Header); table!( - /// Stores the number of transactions of a block. - BlockBodies => BlockNumHash => NumTxesInBlock); + /// Stores a pointer to the first transaction in the block, the number of transactions in the block, and the uncles/ommers of the block. + /// + /// The transaction IDs point to the [`Transactions`] table. + BlockBodies => BlockNumHash => StoredBlockBody); table!( /// Stores the maximum [`TxNumber`] from which this particular block starts. @@ -131,19 +133,19 @@ table!( NonCanonicalTransactions => BlockNumHashTxNumber => TransactionSigned); table!( - /// Stores the transaction body from canonical transactions. Canonical only + /// (Canonical only) Stores the transaction body for canonical transactions. Transactions => TxNumber => TransactionSigned); table!( - /// Stores transaction receipts. Canonical only + /// (Canonical only) Stores transaction receipts. Receipts => TxNumber => Receipt); table!( - /// Stores transaction logs. Canonical only + /// (Canonical only) Stores transaction logs. Logs => TxNumber => Receipt); table!( - /// Stores the current state of an Account. + /// Stores the current state of an [`Account`]. PlainAccountState => Address => Account); table!( @@ -200,27 +202,27 @@ table!( AccountHistory => ShardedKey
=> TxNumberList); table!( - /// Stores the transaction numbers that changed each storage key. + /// Stores pointers to transactions that changed each storage key. StorageHistory => AddressStorageKey => TxNumberList); dupsort!( - /// Stores state of an account before a certain transaction changed it. + /// Stores the state of an account before a certain transaction changed it. AccountChangeSet => TxNumber => [Address] AccountBeforeTx); dupsort!( - /// Stores state of a storage key before a certain transaction changed it. + /// Stores the state of a storage key before a certain transaction changed it. StorageChangeSet => TxNumberAddress => [H256] StorageEntry); table!( - /// Stores the transaction sender from each transaction. + /// Stores the transaction sender for each transaction. TxSenders => TxNumber => Address); // Is it necessary? if so, inverted index index so we dont repeat addresses? table!( - /// Config. + /// Configuration values. Config => ConfigKey => ConfigValue); table!( - /// Stores the block number of each stage id. + /// Stores the highest synced block number of each stage. SyncStage => StageId => BlockNumber); /// diff --git a/crates/interfaces/src/p2p/bodies/client.rs b/crates/interfaces/src/p2p/bodies/client.rs new file mode 100644 index 000000000..4e546fea9 --- /dev/null +++ b/crates/interfaces/src/p2p/bodies/client.rs @@ -0,0 +1,14 @@ +use reth_eth_wire::BlockBody; +use reth_primitives::H256; + +use crate::p2p::bodies::error::BodiesClientError; +use async_trait::async_trait; +use std::fmt::Debug; + +/// A client capable of downloading block bodies. +#[async_trait] +#[auto_impl::auto_impl(&, Arc, Box)] +pub trait BodiesClient: Send + Sync + Debug { + /// Fetches the block body for the requested block. + async fn get_block_body(&self, hash: H256) -> Result; +} diff --git a/crates/interfaces/src/p2p/bodies/downloader.rs b/crates/interfaces/src/p2p/bodies/downloader.rs new file mode 100644 index 000000000..677cbab0c --- /dev/null +++ b/crates/interfaces/src/p2p/bodies/downloader.rs @@ -0,0 +1,44 @@ +use super::client::BodiesClient; +use crate::p2p::bodies::error::DownloadError; +use reth_eth_wire::BlockBody; +use reth_primitives::{BlockNumber, H256}; +use std::{pin::Pin, time::Duration}; +use tokio_stream::Stream; + +/// A downloader capable of fetching block bodies from header hashes. +/// +/// A downloader represents a distinct strategy for submitting requests to download block bodies, +/// while a [BodiesClient] represents a client capable of fulfilling these requests. +pub trait BodyDownloader: Sync + Send { + /// The [BodiesClient] used to fetch the block bodies + type Client: BodiesClient; + + /// The request timeout duration + fn timeout(&self) -> Duration; + + /// The block bodies client + fn client(&self) -> &Self::Client; + + /// Download the bodies from `starting_block` (inclusive) up until `target_block` (inclusive). + /// + /// The returned stream will always emit bodies in the order they were requested, but multiple + /// requests may be in flight at the same time. + /// + /// The stream may exit early in some cases. Thus, a downloader can only at a minimum guarantee: + /// + /// - All emitted bodies map onto a request + /// - The emitted bodies are emitted in order: i.e. the body for the first block is emitted + /// first, even if it was not fetched first. + /// + /// It is *not* guaranteed that all the requested bodies are fetched: the downloader may close + /// the stream before the entire range has been fetched for any reason + fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> BodiesStream<'a> + where + I: IntoIterator, + ::IntoIter: Send + 'b, + 'b: 'a; +} + +/// A stream of block bodies. +pub type BodiesStream<'a> = + Pin> + Send + 'a>>; diff --git a/crates/interfaces/src/p2p/bodies/error.rs b/crates/interfaces/src/p2p/bodies/error.rs new file mode 100644 index 000000000..b6b11682f --- /dev/null +++ b/crates/interfaces/src/p2p/bodies/error.rs @@ -0,0 +1,51 @@ +use crate::p2p::error::RequestError; +use reth_primitives::H256; +use thiserror::Error; + +/// Body client errors. +#[derive(Error, Debug, Clone)] +pub enum BodiesClientError { + /// Timed out while waiting for a response. + #[error("Timed out while getting bodies for block {header_hash}.")] + Timeout { + /// The header hash of the block that timed out. + header_hash: H256, + }, + /// The client encountered an internal error. + #[error(transparent)] + Internal(#[from] RequestError), +} + +/// Body downloader errors. +#[derive(Error, Debug, Clone)] +pub enum DownloadError { + /// Timed out while waiting for a response. + #[error("Timed out while getting bodies for block {header_hash}.")] + Timeout { + /// The header hash of the block that timed out. + header_hash: H256, + }, + /// The [BodiesClient] used by the downloader experienced an error. + #[error("The downloader client encountered an error.")] + Client { + /// The underlying client error. + #[source] + source: BodiesClientError, + }, +} + +impl From for DownloadError { + fn from(error: BodiesClientError) -> Self { + match error { + BodiesClientError::Timeout { header_hash } => DownloadError::Timeout { header_hash }, + _ => DownloadError::Client { source: error }, + } + } +} + +impl DownloadError { + /// Indicates whether this error is retryable or fatal. + pub fn is_retryable(&self) -> bool { + matches!(self, DownloadError::Timeout { .. }) + } +} diff --git a/crates/interfaces/src/p2p/bodies/mod.rs b/crates/interfaces/src/p2p/bodies/mod.rs new file mode 100644 index 000000000..bc0c5df09 --- /dev/null +++ b/crates/interfaces/src/p2p/bodies/mod.rs @@ -0,0 +1,8 @@ +/// Traits and types for block body clients. +pub mod client; + +/// Block body downloaders. +pub mod downloader; + +/// Error types. +pub mod error; diff --git a/crates/interfaces/src/p2p/error.rs b/crates/interfaces/src/p2p/error.rs index 56e7bd340..856e5f65e 100644 --- a/crates/interfaces/src/p2p/error.rs +++ b/crates/interfaces/src/p2p/error.rs @@ -4,7 +4,7 @@ use tokio::sync::{mpsc, oneshot}; pub type RequestResult = Result; /// Error variants that can happen when sending requests to a session. -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, Clone)] #[allow(missing_docs)] pub enum RequestError { #[error("Closed channel to the peer.")] diff --git a/crates/interfaces/src/p2p/headers/downloader.rs b/crates/interfaces/src/p2p/headers/downloader.rs index f5b4c1f7e..2657b37be 100644 --- a/crates/interfaces/src/p2p/headers/downloader.rs +++ b/crates/interfaces/src/p2p/headers/downloader.rs @@ -1,60 +1,20 @@ use super::client::{HeadersClient, HeadersRequest, HeadersStream}; use crate::consensus::Consensus; +use crate::p2p::headers::error::DownloadError; use async_trait::async_trait; -use reth_primitives::{ - rpc::{BlockId, BlockNumber}, - Header, SealedHeader, H256, -}; +use reth_primitives::{rpc::BlockId, Header, SealedHeader}; use reth_rpc_types::engine::ForkchoiceState; -use std::{fmt::Debug, time::Duration}; -use thiserror::Error; +use std::time::Duration; use tokio_stream::StreamExt; -/// The downloader error type -#[derive(Error, Debug, Clone)] -pub enum DownloadError { - /// Header validation failed - #[error("Failed to validate header {hash}. Details: {details}.")] - HeaderValidation { - /// Hash of header failing validation - hash: H256, - /// The details of validation failure - details: String, - }, - /// Timed out while waiting for request id response. - #[error("Timed out while getting headers for request {request_id}.")] - Timeout { - /// The request id that timed out - request_id: u64, - }, - /// Error when checking that the current [`Header`] has the parent's hash as the parent_hash - /// field, and that they have sequential block numbers. - #[error("Headers did not match, current number: {header_number} / current hash: {header_hash}, parent number: {parent_number} / parent_hash: {parent_hash}")] - MismatchedHeaders { - /// The header number being evaluated - header_number: BlockNumber, - /// The header hash being evaluated - header_hash: H256, - /// The parent number being evaluated - parent_number: BlockNumber, - /// The parent hash being evaluated - parent_hash: H256, - }, -} - -impl DownloadError { - /// Returns bool indicating whether this error is retryable or fatal, in the cases - /// where the peer responds with no headers, or times out. - pub fn is_retryable(&self) -> bool { - matches!(self, DownloadError::Timeout { .. }) - } -} - -/// The header downloading strategy +/// A downloader capable of fetching block headers. +/// +/// A downloader represents a distinct strategy for submitting requests to download block headers, +/// while a [HeadersClient] represents a client capable of fulfilling these requests. #[async_trait] #[auto_impl::auto_impl(&, Arc, Box)] -pub trait Downloader: Sync + Send { +pub trait HeaderDownloader: Sync + Send { /// The Consensus used to verify block validity when /// downloading type Consensus: Consensus; @@ -118,9 +78,9 @@ pub trait Downloader: Sync + Send { }) } - self.consensus().validate_header(header, parent).map_err(|e| { - DownloadError::HeaderValidation { hash: parent.hash(), details: e.to_string() } - })?; + self.consensus() + .validate_header(header, parent) + .map_err(|error| DownloadError::HeaderValidation { hash: parent.hash(), error })?; Ok(()) } } diff --git a/crates/interfaces/src/p2p/headers/error.rs b/crates/interfaces/src/p2p/headers/error.rs new file mode 100644 index 000000000..035727f1a --- /dev/null +++ b/crates/interfaces/src/p2p/headers/error.rs @@ -0,0 +1,44 @@ +use crate::consensus; +use reth_primitives::{rpc::BlockNumber, H256}; +use thiserror::Error; + +/// The downloader error type +#[derive(Error, Debug, Clone)] +pub enum DownloadError { + /// Header validation failed + #[error("Failed to validate header {hash}. Details: {error}.")] + HeaderValidation { + /// Hash of header failing validation + hash: H256, + /// The details of validation failure + #[source] + error: consensus::Error, + }, + /// Timed out while waiting for request id response. + #[error("Timed out while getting headers for request {request_id}.")] + Timeout { + /// The request id that timed out + request_id: u64, + }, + /// Error when checking that the current [`Header`] has the parent's hash as the parent_hash + /// field, and that they have sequential block numbers. + #[error("Headers did not match, current number: {header_number} / current hash: {header_hash}, parent number: {parent_number} / parent_hash: {parent_hash}")] + MismatchedHeaders { + /// The header number being evaluated + header_number: BlockNumber, + /// The header hash being evaluated + header_hash: H256, + /// The parent number being evaluated + parent_number: BlockNumber, + /// The parent hash being evaluated + parent_hash: H256, + }, +} + +impl DownloadError { + /// Returns bool indicating whether this error is retryable or fatal, in the cases + /// where the peer responds with no headers, or times out. + pub fn is_retryable(&self) -> bool { + matches!(self, DownloadError::Timeout { .. }) + } +} diff --git a/crates/interfaces/src/p2p/headers/mod.rs b/crates/interfaces/src/p2p/headers/mod.rs index 915b28ff0..d85e6d42a 100644 --- a/crates/interfaces/src/p2p/headers/mod.rs +++ b/crates/interfaces/src/p2p/headers/mod.rs @@ -9,3 +9,6 @@ pub mod client; /// [`Consensus`]: crate::consensus::Consensus /// [`HeadersClient`]: client::HeadersClient pub mod downloader; + +/// Error types. +pub mod error; diff --git a/crates/interfaces/src/p2p/mod.rs b/crates/interfaces/src/p2p/mod.rs index fb351fdc2..ecdad5970 100644 --- a/crates/interfaces/src/p2p/mod.rs +++ b/crates/interfaces/src/p2p/mod.rs @@ -1,3 +1,6 @@ +/// Traits for implementing P2P block body clients. +pub mod bodies; + /// Traits for implementing P2P Header Clients. Also includes implementations /// of a Linear and a Parallel downloader generic over the [`Consensus`] and /// [`HeadersClient`]. diff --git a/crates/interfaces/src/test_utils/bodies.rs b/crates/interfaces/src/test_utils/bodies.rs new file mode 100644 index 000000000..802b21673 --- /dev/null +++ b/crates/interfaces/src/test_utils/bodies.rs @@ -0,0 +1,33 @@ +use crate::p2p::bodies::{client::BodiesClient, error::BodiesClientError}; +use async_trait::async_trait; +use reth_eth_wire::BlockBody; +use reth_primitives::H256; +use std::fmt::{Debug, Formatter}; + +/// A test client for fetching bodies +pub struct TestBodiesClient +where + F: Fn(H256) -> Result, +{ + /// The function that is called on each body request. + pub responder: F, +} + +impl Debug for TestBodiesClient +where + F: Fn(H256) -> Result, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TestBodiesClient").finish() + } +} + +#[async_trait] +impl BodiesClient for TestBodiesClient +where + F: Fn(H256) -> Result + Send + Sync, +{ + async fn get_block_body(&self, hash: H256) -> Result { + (self.responder)(hash) + } +} diff --git a/crates/interfaces/src/test_utils/generators.rs b/crates/interfaces/src/test_utils/generators.rs new file mode 100644 index 000000000..c8c8489f8 --- /dev/null +++ b/crates/interfaces/src/test_utils/generators.rs @@ -0,0 +1,142 @@ +use rand::{thread_rng, Rng}; +use reth_primitives::{ + proofs, Address, BlockLocked, Bytes, Header, SealedHeader, Signature, Transaction, + TransactionKind, TransactionSigned, H256, U256, +}; + +// TODO(onbjerg): Maybe we should split this off to its own crate, or move the helpers to the +// relevant crates? + +/// Generates a range of random [SealedHeader]s. +/// +/// The parent hash of the first header +/// in the result will be equal to `head`. +/// +/// The headers are assumed to not be correct if validated. +pub fn random_header_range(rng: std::ops::Range, head: H256) -> Vec { + let mut headers = Vec::with_capacity(rng.end.saturating_sub(rng.start) as usize); + for idx in rng { + headers.push(random_header( + idx, + Some(headers.last().map(|h: &SealedHeader| h.hash()).unwrap_or(head)), + )); + } + headers +} + +/// Generate a random [SealedHeader]. +/// +/// The header is assumed to not be correct if validated. +pub fn random_header(number: u64, parent: Option) -> SealedHeader { + let header = reth_primitives::Header { + number, + nonce: rand::random(), + difficulty: U256::from(rand::random::()), + parent_hash: parent.unwrap_or_default(), + ..Default::default() + }; + header.seal() +} + +/// Generates a random legacy [Transaction]. +/// +/// Every field is random, except: +/// +/// - The chain ID, which is always 1 +/// - The input, which is always nothing +pub fn random_tx() -> Transaction { + Transaction::Legacy { + chain_id: Some(1), + nonce: rand::random::().into(), + gas_price: rand::random::().into(), + gas_limit: rand::random::().into(), + to: TransactionKind::Call(Address::random()), + value: rand::random::().into(), + input: Bytes::default(), + } +} + +/// Generates a random legacy [Transaction] that is signed. +/// +/// On top of the considerations of [gen_random_tx], these apply as well: +/// +/// - There is no guarantee that the nonce is not used twice for the same account +pub fn random_signed_tx() -> TransactionSigned { + let tx = random_tx(); + let hash = tx.signature_hash(); + TransactionSigned { + transaction: tx, + hash, + signature: Signature { + // TODO + r: Default::default(), + s: Default::default(), + odd_y_parity: false, + }, + } +} + +/// Generate a random block filled with a random number of signed transactions (generated using +/// [random_signed_tx]). +/// +/// All fields use the default values (and are assumed to be invalid) except for: +/// +/// - `parent_hash` +/// - `transactions_root` +/// - `ommers_hash` +/// +/// Additionally, `gas_used` and `gas_limit` always exactly match the total `gas_limit` of all +/// transactions in the block. +/// +/// The ommer headers are not assumed to be valid. +pub fn random_block(number: u64, parent: Option) -> BlockLocked { + let mut rng = thread_rng(); + + // Generate transactions + let transactions: Vec = + (0..rand::random::()).into_iter().map(|_| random_signed_tx()).collect(); + let total_gas = transactions.iter().fold(0, |sum, tx| sum + tx.transaction.gas_limit()); + + // Generate ommers + let mut ommers = Vec::new(); + for _ in 0..rng.gen_range(0..2) { + ommers.push(random_header(number, parent).unseal()); + } + + // Calculate roots + let transactions_root = proofs::calculate_transaction_root(transactions.iter()); + let ommers_hash = proofs::calculate_ommers_root(ommers.iter()); + + BlockLocked { + header: Header { + parent_hash: parent.unwrap_or_default(), + number, + gas_used: total_gas, + gas_limit: total_gas, + transactions_root, + ommers_hash, + ..Default::default() + } + .seal(), + body: transactions, + ommers: ommers.into_iter().map(|ommer| ommer.seal()).collect(), + ..Default::default() + } +} + +/// Generate a range of random blocks. +/// +/// The parent hash of the first block +/// in the result will be equal to `head`. +/// +/// See [random_block] for considerations when validating the generated blocks. +pub fn random_block_range(rng: std::ops::Range, head: H256) -> Vec { + let mut blocks = Vec::with_capacity(rng.end.saturating_sub(rng.start) as usize); + for idx in rng { + blocks.push(random_block( + idx, + Some(blocks.last().map(|block: &BlockLocked| block.header.hash()).unwrap_or(head)), + )); + } + blocks +} diff --git a/crates/interfaces/src/test_utils/headers.rs b/crates/interfaces/src/test_utils/headers.rs index ddd9c0d13..e0f73fbe5 100644 --- a/crates/interfaces/src/test_utils/headers.rs +++ b/crates/interfaces/src/test_utils/headers.rs @@ -1,24 +1,25 @@ //! Testing support for headers related interfaces. use crate::{ - consensus::{self, Consensus}, + consensus::{self, Consensus, Error}, p2p::headers::{ client::{HeadersClient, HeadersRequest, HeadersResponse, HeadersStream}, - downloader::{DownloadError, Downloader}, + downloader::HeaderDownloader, + error::DownloadError, }, }; -use reth_primitives::{Header, SealedHeader, H256, H512, U256}; +use reth_primitives::{BlockLocked, Header, SealedHeader, H256, H512}; use reth_rpc_types::engine::ForkchoiceState; use std::{collections::HashSet, sync::Arc, time::Duration}; use tokio::sync::{broadcast, mpsc, watch}; use tokio_stream::{wrappers::BroadcastStream, StreamExt}; -#[derive(Debug)] /// A test downloader which just returns the values that have been pushed to it. -pub struct TestDownloader { +#[derive(Debug)] +pub struct TestHeaderDownloader { result: Result, DownloadError>, } -impl TestDownloader { +impl TestHeaderDownloader { /// Instantiates the downloader with the mock responses pub fn new(result: Result, DownloadError>) -> Self { Self { result } @@ -26,7 +27,7 @@ impl TestDownloader { } #[async_trait::async_trait] -impl Downloader for TestDownloader { +impl HeaderDownloader for TestHeaderDownloader { type Consensus = TestConsensus; type Client = TestHeadersClient; @@ -51,8 +52,8 @@ impl Downloader for TestDownloader { } } -#[derive(Debug)] /// A test client for fetching headers +#[derive(Debug)] pub struct TestHeadersClient { req_tx: mpsc::Sender<(u64, HeadersRequest)>, req_rx: Arc>>, @@ -109,7 +110,7 @@ impl HeadersClient for TestHeadersClient { } } -/// Consensus client impl for testing +/// Consensus engine implementation for testing #[derive(Debug)] pub struct TestConsensus { /// Watcher over the forkchoice state @@ -132,14 +133,14 @@ impl Default for TestConsensus { } impl TestConsensus { - /// Update the forkchoice state + /// Update the fork choice state pub fn update_tip(&self, tip: H256) { let state = ForkchoiceState { head_block_hash: tip, finalized_block_hash: H256::zero(), safe_block_hash: H256::zero(), }; - self.channel.0.send(state).expect("updating forkchoice state failed"); + self.channel.0.send(state).expect("updating fork choice state failed"); } /// Update the validation flag @@ -165,29 +166,12 @@ impl Consensus for TestConsensus { Ok(()) } } -} -/// Generate a range of random header. The parent hash of the first header -/// in the result will be equal to head -pub fn gen_random_header_range(rng: std::ops::Range, head: H256) -> Vec { - let mut headers = Vec::with_capacity(rng.end.saturating_sub(rng.start) as usize); - for idx in rng { - headers.push(gen_random_header( - idx, - Some(headers.last().map(|h: &SealedHeader| h.hash()).unwrap_or(head)), - )); + fn pre_validate_block(&self, _block: &BlockLocked) -> Result<(), Error> { + if self.fail_validation { + Err(consensus::Error::BaseFeeMissing) + } else { + Ok(()) + } } - headers -} - -/// Generate a random header -pub fn gen_random_header(number: u64, parent: Option) -> SealedHeader { - let header = reth_primitives::Header { - number, - nonce: rand::random(), - difficulty: U256::from(rand::random::()), - parent_hash: parent.unwrap_or_default(), - ..Default::default() - }; - header.seal() } diff --git a/crates/interfaces/src/test_utils/mod.rs b/crates/interfaces/src/test_utils/mod.rs index 7f034b659..415fccb74 100644 --- a/crates/interfaces/src/test_utils/mod.rs +++ b/crates/interfaces/src/test_utils/mod.rs @@ -1,5 +1,10 @@ mod api; +mod bodies; mod headers; +/// Generators for different data structures like block headers, block bodies and ranges of those. +pub mod generators; + pub use api::TestApi; +pub use bodies::*; pub use headers::*; diff --git a/crates/net/bodies-downloaders/Cargo.toml b/crates/net/bodies-downloaders/Cargo.toml new file mode 100644 index 000000000..c54b263f9 --- /dev/null +++ b/crates/net/bodies-downloaders/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "reth-bodies-downloaders" +version = "0.1.0" +edition = "2021" +license = "MIT OR Apache-2.0" +repository = "https://github.com/foundry-rs/reth" +readme = "README.md" +description = "Implementations of various block body downloaders" + +[dependencies] +futures-util = "0.3.25" +reth-interfaces = { path = "../../interfaces" } +reth-primitives = { path = "../../primitives" } +reth-eth-wire = { path= "../eth-wire" } +[dev-dependencies] +assert_matches = "1.5.0" +once_cell = "1.15.0" +rand = "0.8.5" +reth-interfaces = { path = "../../interfaces", features = ["test-utils"] } +tokio = { version = "1.21.2", features = ["full"] } +serial_test = "0.9.0" diff --git a/crates/net/bodies-downloaders/src/concurrent.rs b/crates/net/bodies-downloaders/src/concurrent.rs new file mode 100644 index 000000000..3fa66297c --- /dev/null +++ b/crates/net/bodies-downloaders/src/concurrent.rs @@ -0,0 +1,132 @@ +use futures_util::{stream, StreamExt, TryFutureExt}; +use reth_interfaces::p2p::bodies::{ + client::BodiesClient, + downloader::{BodiesStream, BodyDownloader}, + error::{BodiesClientError, DownloadError}, +}; +use reth_primitives::{BlockNumber, H256}; +use std::{sync::Arc, time::Duration}; + +/// Downloads bodies in batches. +/// +/// All blocks in a batch are fetched at the same time. +#[derive(Debug)] +pub struct ConcurrentDownloader { + /// The bodies client + client: Arc, + /// The batch size per one request + pub batch_size: usize, + /// A single request timeout + pub request_timeout: Duration, + /// The number of retries for downloading + pub request_retries: usize, +} + +impl BodyDownloader for ConcurrentDownloader { + type Client = C; + + /// The request timeout duration + fn timeout(&self) -> Duration { + self.request_timeout + } + + /// The block bodies client + fn client(&self) -> &Self::Client { + &self.client + } + + fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> BodiesStream<'a> + where + I: IntoIterator, + ::IntoIter: Send + 'b, + 'b: 'a, + { + // TODO: Retry + Box::pin( + stream::iter(headers.into_iter().map(|(block_number, header_hash)| { + { + self.client + .get_block_body(*header_hash) + .map_ok(move |body| (*block_number, *header_hash, body)) + .map_err(|err| match err { + BodiesClientError::Timeout { header_hash } => { + DownloadError::Timeout { header_hash } + } + err => DownloadError::Client { source: err }, + }) + } + })) + .buffered(self.batch_size), + ) + } +} + +/// A [ConcurrentDownloader] builder. +#[derive(Debug)] +pub struct ConcurrentDownloaderBuilder { + /// The batch size per one request + batch_size: usize, + /// A single request timeout + request_timeout: Duration, + /// The number of retries for downloading + request_retries: usize, +} + +impl Default for ConcurrentDownloaderBuilder { + fn default() -> Self { + Self { batch_size: 100, request_timeout: Duration::from_millis(100), request_retries: 5 } + } +} + +impl ConcurrentDownloaderBuilder { + /// Set the request batch size + pub fn batch_size(mut self, size: usize) -> Self { + self.batch_size = size; + self + } + + /// Set the request timeout + pub fn timeout(mut self, timeout: Duration) -> Self { + self.request_timeout = timeout; + self + } + + /// Set the number of retries per request + pub fn retries(mut self, retries: usize) -> Self { + self.request_retries = retries; + self + } + + /// Build [ConcurrentDownloader] with the provided client + pub fn build(self, client: Arc) -> ConcurrentDownloader { + ConcurrentDownloader { + client, + batch_size: self.batch_size, + request_timeout: self.request_timeout, + request_retries: self.request_retries, + } + } +} + +#[cfg(test)] +mod tests { + #[tokio::test] + #[ignore] + async fn emits_bodies_in_order() {} + + #[tokio::test] + #[ignore] + async fn header_iter_failure() {} + + #[tokio::test] + #[ignore] + async fn client_failure() {} + + #[tokio::test] + #[ignore] + async fn retries_requests() {} + + #[tokio::test] + #[ignore] + async fn timeout() {} +} diff --git a/crates/net/bodies-downloaders/src/lib.rs b/crates/net/bodies-downloaders/src/lib.rs new file mode 100644 index 000000000..e9f0fb6c3 --- /dev/null +++ b/crates/net/bodies-downloaders/src/lib.rs @@ -0,0 +1,11 @@ +#![warn(missing_docs, unreachable_pub)] +#![deny(unused_must_use, rust_2018_idioms)] +#![doc(test( + no_crate_inject, + attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) +))] + +//! Implements body downloader algorithms. + +/// A naive concurrent downloader. +pub mod concurrent; diff --git a/crates/net/ecies/src/lib.rs b/crates/net/ecies/src/lib.rs index ab53207c4..2b6ccbf9a 100644 --- a/crates/net/ecies/src/lib.rs +++ b/crates/net/ecies/src/lib.rs @@ -34,7 +34,7 @@ pub enum EgressECIESValue { #[derive(Clone, Debug, PartialEq, Eq)] /// Raw ingress values for an ECIES protocol pub enum IngressECIESValue { - /// Receiving a message from a [`peerId`] + /// Receiving a message from a [`PeerId`] AuthReceive(PeerId), /// Receiving an ACK message Ack, diff --git a/crates/net/eth-wire/src/types/blocks.rs b/crates/net/eth-wire/src/types/blocks.rs index 935b7f559..7f90bf6c9 100644 --- a/crates/net/eth-wire/src/types/blocks.rs +++ b/crates/net/eth-wire/src/types/blocks.rs @@ -108,6 +108,7 @@ impl From> for GetBlockBodies { } } +// TODO(onbjerg): We should have this type in primitives /// A response to [`GetBlockBodies`], containing bodies if any bodies were found. #[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable)] pub struct BlockBody { diff --git a/crates/net/headers-downloaders/src/linear.rs b/crates/net/headers-downloaders/src/linear.rs index cebdcdd67..b52718d0e 100644 --- a/crates/net/headers-downloaders/src/linear.rs +++ b/crates/net/headers-downloaders/src/linear.rs @@ -5,7 +5,8 @@ use reth_interfaces::{ consensus::Consensus, p2p::headers::{ client::{HeadersClient, HeadersStream}, - downloader::{DownloadError, Downloader}, + downloader::HeaderDownloader, + error::DownloadError, }, }; use reth_primitives::{rpc::BlockId, SealedHeader}; @@ -27,7 +28,7 @@ pub struct LinearDownloader { } #[async_trait] -impl Downloader for LinearDownloader { +impl HeaderDownloader for LinearDownloader { type Consensus = C; type Client = H; @@ -161,11 +162,6 @@ impl Default for LinearDownloadBuilder { } impl LinearDownloadBuilder { - /// Initialize a new builder - pub fn new() -> Self { - Self::default() - } - /// Set the request batch size pub fn batch_size(mut self, size: u64) -> Self { self.batch_size = size; @@ -207,7 +203,8 @@ mod tests { use reth_interfaces::{ p2p::headers::client::HeadersRequest, test_utils::{ - gen_random_header, gen_random_header_range, TestConsensus, TestHeadersClient, + generators::{random_header, random_header_range}, + TestConsensus, TestHeadersClient, }, }; use reth_primitives::{rpc::BlockId, SealedHeader}; @@ -233,7 +230,7 @@ mod tests { let retries = 5; let (tx, rx) = oneshot::channel(); tokio::spawn(async move { - let downloader = LinearDownloadBuilder::new() + let downloader = LinearDownloadBuilder::default() .retries(retries) .build(CONSENSUS.clone(), CLIENT.clone()); let result = @@ -257,7 +254,7 @@ mod tests { let retries = 5; let (tx, rx) = oneshot::channel(); tokio::spawn(async move { - let downloader = LinearDownloadBuilder::new() + let downloader = LinearDownloadBuilder::default() .retries(retries) .build(CONSENSUS.clone(), CLIENT.clone()); let result = @@ -286,14 +283,14 @@ mod tests { #[tokio::test] #[serial] async fn download_propagates_consensus_validation_error() { - let tip_parent = gen_random_header(1, None); - let tip = gen_random_header(2, Some(tip_parent.hash())); + let tip_parent = random_header(1, None); + let tip = random_header(2, Some(tip_parent.hash())); let tip_hash = tip.hash(); let (tx, rx) = oneshot::channel(); tokio::spawn(async move { let downloader = - LinearDownloadBuilder::new().build(CONSENSUS_FAIL.clone(), CLIENT.clone()); + LinearDownloadBuilder::default().build(CONSENSUS_FAIL.clone(), CLIENT.clone()); let forkchoice = ForkchoiceState { head_block_hash: tip_hash, ..Default::default() }; let result = downloader.download(&SealedHeader::default(), &forkchoice).await; tx.send(result).expect("failed to forward download response"); @@ -322,14 +319,15 @@ mod tests { #[tokio::test] #[serial] async fn download_starts_with_chain_tip() { - let head = gen_random_header(1, None); - let tip = gen_random_header(2, Some(head.hash())); + let head = random_header(1, None); + let tip = random_header(2, Some(head.hash())); let tip_hash = tip.hash(); let chain_head = head.clone(); let (tx, mut rx) = oneshot::channel(); tokio::spawn(async move { - let downloader = LinearDownloadBuilder::new().build(CONSENSUS.clone(), CLIENT.clone()); + let downloader = + LinearDownloadBuilder::default().build(CONSENSUS.clone(), CLIENT.clone()); let forkchoice = ForkchoiceState { head_block_hash: tip_hash, ..Default::default() }; let result = downloader.download(&chain_head, &forkchoice).await; tx.send(result).expect("failed to forward download response"); @@ -359,15 +357,16 @@ mod tests { #[serial] async fn download_returns_headers_desc() { let (start, end) = (100, 200); - let head = gen_random_header(start, None); - let mut headers = gen_random_header_range(start + 1..end, head.hash()); + let head = random_header(start, None); + let mut headers = random_header_range(start + 1..end, head.hash()); headers.reverse(); let tip_hash = headers.first().unwrap().hash(); let chain_head = head.clone(); let (tx, rx) = oneshot::channel(); tokio::spawn(async move { - let downloader = LinearDownloadBuilder::new().build(CONSENSUS.clone(), CLIENT.clone()); + let downloader = + LinearDownloadBuilder::default().build(CONSENSUS.clone(), CLIENT.clone()); let forkchoice = ForkchoiceState { head_block_hash: tip_hash, ..Default::default() }; let result = downloader.download(&chain_head, &forkchoice).await; tx.send(result).expect("failed to forward download response"); diff --git a/crates/primitives/Cargo.toml b/crates/primitives/Cargo.toml index 4418823a3..1406750d3 100644 --- a/crates/primitives/Cargo.toml +++ b/crates/primitives/Cargo.toml @@ -34,7 +34,11 @@ hex = "0.4" hex-literal = "0.3" derive_more = "0.99" - +# proof related +triehash = "0.8" +# See to replace hashers to simplify libraries +plain_hasher = "0.2" +hash-db = "0.15" [dev-dependencies] arbitrary = { version = "1.1.7", features = ["derive"]} diff --git a/crates/primitives/src/error.rs b/crates/primitives/src/error.rs index 2360eeb8c..191839916 100644 --- a/crates/primitives/src/error.rs +++ b/crates/primitives/src/error.rs @@ -4,8 +4,8 @@ use thiserror::Error; /// Primitives error type. #[derive(Debug, Error)] pub enum Error { - /// Input provided is invalid. - #[error("Input provided is invalid.")] + /// The provided input is invalid. + #[error("The provided input is invalid.")] InvalidInput, /// Failed to deserialize data into type. #[error("Failed to deserialize data into type.")] diff --git a/crates/primitives/src/header.rs b/crates/primitives/src/header.rs index 42da92604..4aa8c26a3 100644 --- a/crates/primitives/src/header.rs +++ b/crates/primitives/src/header.rs @@ -1,4 +1,7 @@ -use crate::{BlockHash, BlockNumber, Bloom, H160, H256, U256}; +use crate::{ + proofs::{EMPTY_LIST_HASH, EMPTY_ROOT}, + BlockHash, BlockNumber, Bloom, H160, H256, U256, +}; use bytes::{BufMut, BytesMut}; use ethers_core::{types::H64, utils::keccak256}; use reth_codecs::main_codec; @@ -7,7 +10,7 @@ use std::ops::Deref; /// Block header #[main_codec] -#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Header { /// The Keccak 256-bit hash of the parent /// block’s header, in its entirety; formally Hp. @@ -64,6 +67,29 @@ pub struct Header { pub base_fee_per_gas: Option, } +impl Default for Header { + fn default() -> Self { + Header { + parent_hash: Default::default(), + ommers_hash: EMPTY_LIST_HASH, + beneficiary: Default::default(), + state_root: EMPTY_ROOT, + transactions_root: EMPTY_ROOT, + receipts_root: EMPTY_ROOT, + logs_bloom: Default::default(), + difficulty: Default::default(), + number: 0, + gas_limit: 0, + gas_used: 0, + timestamp: 0, + extra_data: Default::default(), + mix_hash: Default::default(), + nonce: 0, + base_fee_per_gas: None, + } + } +} + impl Header { /// Heavy function that will calculate hash of data and will *not* save the change to metadata. /// Use [`Header::seal`], [`SealedHeader`] and unlock if you need hash to be persistent. @@ -239,6 +265,10 @@ mod tests { gas_used: 0x15b3_u64, timestamp: 0x1a0a_u64, extra_data: Bytes::from_str("7788").unwrap().0, + ommers_hash: H256::zero(), + state_root: H256::zero(), + transactions_root: H256::zero(), + receipts_root: H256::zero(), ..Default::default() }; let mut data = vec![]; @@ -285,6 +315,10 @@ mod tests { gas_used: 0x15b3u64, timestamp: 0x1a0au64, extra_data: Bytes::from_str("7788").unwrap().0, + ommers_hash: H256::zero(), + state_root: H256::zero(), + transactions_root: H256::zero(), + receipts_root: H256::zero(), ..Default::default() }; let header =
::decode(&mut data.as_slice()).unwrap(); diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index d70f9253a..4c172e4d3 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -6,6 +6,8 @@ ))] //! Commonly used types in reth. +//! +//! This crate contains Ethereum primitive types and helper functions. mod account; mod block; @@ -23,6 +25,9 @@ mod receipt; mod storage; mod transaction; +/// Helper function for calculating Merkle proofs and hashes +pub mod proofs; + pub use account::Account; pub use block::{Block, BlockLocked}; pub use chain::Chain; @@ -41,25 +46,24 @@ pub use transaction::{ TransactionSignedEcRecovered, TxType, }; -/// Block hash. +/// A block hash. pub type BlockHash = H256; -/// Block Number is height of chain +/// A block number. pub type BlockNumber = u64; -/// Ethereum address +/// An Ethereum address. pub type Address = H160; +// TODO(onbjerg): Is this not the same as [BlockHash]? /// BlockId is Keccak hash of the header pub type BlockID = H256; -/// TxHash is Kecack hash of rlp encoded signed transaction +/// A transaction hash is a kecack hash of an RLP encoded signed transaction. pub type TxHash = H256; -/// TxNumber is sequence number of all existing transactions +/// The sequence number of all existing transactions. pub type TxNumber = u64; -/// Chain identifier type, introduced in EIP-155 +/// Chain identifier type (introduced in EIP-155). pub type ChainId = u64; - -/// Storage Key +/// An account storage key. pub type StorageKey = H256; - -/// Storage value +/// An account storage value. pub type StorageValue = U256; // TODO: should we use `PublicKey` for this? Even when dealing with public keys we should try to diff --git a/crates/consensus/src/proofs.rs b/crates/primitives/src/proofs.rs similarity index 72% rename from crates/consensus/src/proofs.rs rename to crates/primitives/src/proofs.rs index 964cc8159..1c8fcb477 100644 --- a/crates/consensus/src/proofs.rs +++ b/crates/primitives/src/proofs.rs @@ -1,26 +1,38 @@ +use crate::{keccak256, Bytes, Header, Log, Receipt, TransactionSigned, H256}; +use ethers_core::utils::rlp::RlpStream; use hash_db::Hasher; +use hex_literal::hex; use plain_hasher::PlainHasher; -use reth_primitives::{Bytes, Header, Log, Receipt, TransactionSigned, H256}; use reth_rlp::Encodable; -use rlp::RlpStream; -use sha3::{Digest, Keccak256}; use triehash::sec_trie_root; +/// Keccak-256 hash of the RLP of an empty list, KEC("\xc0"). +pub const EMPTY_LIST_HASH: H256 = + H256(hex!("1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347")); + +/// Root hash of an empty trie. +pub const EMPTY_ROOT: H256 = + H256(hex!("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")); + +/// A [Hasher] that calculates a keccak256 hash of the given data. #[derive(Default, Debug, Clone, PartialEq, Eq)] struct KeccakHasher; + impl Hasher for KeccakHasher { type Out = H256; type StdHasher = PlainHasher; + const LENGTH: usize = 32; + fn hash(x: &[u8]) -> Self::Out { - let out = Keccak256::digest(x); - // TODO make more performant, H256 from slice is not good enought. - H256::from_slice(out.as_slice()) + keccak256(x) } } -/// Calculate Transaction root. Iterate over transaction and create merkle trie of -/// (rlp(index),encoded(tx)) pairs. +/// Calculate a transaction root. +/// +/// Iterates over the given transactions and the merkle merkle trie root of +/// `(rlp(index), encoded(tx))` pairs. pub fn calculate_transaction_root<'a>( transactions: impl IntoIterator, ) -> H256 { @@ -40,7 +52,7 @@ pub fn calculate_transaction_root<'a>( ) } -/// Create receipt root for header +/// Calculates the receipt root for a header. pub fn calculate_receipt_root<'a>(receipts: impl IntoIterator) -> H256 { sec_trie_root::( receipts @@ -57,7 +69,7 @@ pub fn calculate_receipt_root<'a>(receipts: impl IntoIterator(logs: impl IntoIterator) -> H256 { //https://github.com/ethereum/go-ethereum/blob/356bbe343a30789e77bb38f25983c8f2f2bfbb47/cmd/evm/internal/t8ntool/execution.go#L255 let mut stream = RlpStream::new(); @@ -71,11 +83,10 @@ pub fn calculate_log_root<'a>(logs: impl IntoIterator) -> H256 { stream.finalize_unbounded_list(); let out = stream.out().freeze(); - let out = Keccak256::digest(out); - H256::from_slice(out.as_slice()) + keccak256(out) } -/// Calculate hash for ommer/uncle headers +/// Calculates the root hash for ommer/uncle headers. pub fn calculate_ommers_root<'a>(_ommers: impl IntoIterator) -> H256 { // RLP Encode let mut stream = RlpStream::new(); @@ -87,8 +98,7 @@ pub fn calculate_ommers_root<'a>(_ommers: impl IntoIterator) */ stream.finalize_unbounded_list(); let bytes = stream.out().freeze(); - let out = Keccak256::digest(bytes); - H256::from_slice(out.as_slice()) + keccak256(bytes) } // TODO state root diff --git a/crates/primitives/src/transaction/mod.rs b/crates/primitives/src/transaction/mod.rs index 13c022a96..c6953f14c 100644 --- a/crates/primitives/src/transaction/mod.rs +++ b/crates/primitives/src/transaction/mod.rs @@ -13,8 +13,9 @@ use reth_rlp::{length_of_length, Decodable, DecodeError, Encodable, Header, EMPT pub use signature::Signature; pub use tx_type::TxType; -/// Raw Transaction. -/// Transaction type is introduced in EIP-2718: https://eips.ethereum.org/EIPS/eip-2718 +/// A raw transaction. +/// +/// Transaction types were introduced in [EIP-2718](https://eips.ethereum.org/EIPS/eip-2718). #[main_codec] #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum Transaction { @@ -49,7 +50,7 @@ pub enum Transaction { /// input data of the message call, formally Td. input: Bytes, }, - /// Transaction with AccessList. https://eips.ethereum.org/EIPS/eip-2930 + /// Transaction with an [`AccessList`] ([EIP-2930](https://eips.ethereum.org/EIPS/eip-2930)). Eip2930 { /// Added as EIP-155: Simple replay attack protection chain_id: ChainId, @@ -86,7 +87,7 @@ pub enum Transaction { /// accessing outside the list. access_list: AccessList, }, - /// Transaction with priority fee. https://eips.ethereum.org/EIPS/eip-1559 + /// A transaction with a priority fee ([EIP-1559](https://eips.ethereum.org/EIPS/eip-1559)). Eip1559 { /// Added as EIP-155: Simple replay attack protection chain_id: u64, @@ -175,6 +176,15 @@ impl Transaction { } } + /// Get the gas limit of the transaction. + pub fn gas_limit(&self) -> u64 { + match self { + Transaction::Legacy { gas_limit, .. } | + Transaction::Eip2930 { gas_limit, .. } | + Transaction::Eip1559 { gas_limit, .. } => *gas_limit, + } + } + /// Max fee per gas for eip1559 transaction, for legacy transactions this is gas_limit pub fn max_fee_per_gas(&self) -> u64 { match self { diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml index 099d64e8b..5dcb9688a 100644 --- a/crates/stages/Cargo.toml +++ b/crates/stages/Cargo.toml @@ -18,10 +18,14 @@ tracing-futures = "0.2.5" tokio = { version = "1.21.2", features = ["sync"] } aquamarine = "0.1.12" metrics = "0.20.1" +futures-util = "0.3.25" [dev-dependencies] reth-db = { path = "../db", features = ["test-utils"] } reth-interfaces = { path = "../interfaces", features = ["test-utils"] } +reth-bodies-downloaders = { path = "../net/bodies-downloaders" } +# TODO(onbjerg): We only need this for [BlockBody] +reth-eth-wire = { path = "../net/eth-wire" } reth-headers-downloaders = { path = "../net/headers-downloaders" } tokio = { version = "*", features = ["rt", "sync", "macros"] } tokio-stream = "0.1.10" diff --git a/crates/stages/src/error.rs b/crates/stages/src/error.rs index 522457917..9ab07435c 100644 --- a/crates/stages/src/error.rs +++ b/crates/stages/src/error.rs @@ -1,5 +1,5 @@ use crate::pipeline::PipelineEvent; -use reth_interfaces::db::Error as DbError; +use reth_interfaces::{consensus, db::Error as DbError}; use reth_primitives::{BlockNumber, H256}; use thiserror::Error; use tokio::sync::mpsc::error::SendError; @@ -8,12 +8,13 @@ use tokio::sync::mpsc::error::SendError; #[derive(Error, Debug)] pub enum StageError { /// The stage encountered a state validation error. - /// - /// TODO: This depends on the consensus engine and should include the validation failure reason - #[error("Stage encountered a validation error in block {block}.")] + #[error("Stage encountered a validation error in block {block}: {error}.")] Validation { /// The block that failed validation. block: BlockNumber, + /// The underlying consensus error. + #[source] + error: consensus::Error, }, /// The stage encountered a database error. #[error("An internal database error occurred.")] @@ -30,34 +31,41 @@ pub enum StageError { /// The sender stage error #[derive(Error, Debug)] pub enum DatabaseIntegrityError { - /// Cannonical hash is missing from db - #[error("no cannonical hash for block #{number}")] - CannonicalHash { + // TODO(onbjerg): What's the difference between this and the one below? + /// The canonical hash for a block is missing from the database. + #[error("No canonical hash for block #{number}")] + CanonicalHash { /// The block number key number: BlockNumber, }, - /// Cannonical header is missing from db - #[error("no cannonical hash for block #{number}")] - CannonicalHeader { + /// The canonical header for a block is missing from the database. + #[error("No canonical hash for block #{number}")] + CanonicalHeader { /// The block number key number: BlockNumber, }, - /// Header is missing from db - #[error("no header for block #{number} ({hash})")] + /// A header is missing from the database. + #[error("No header for block #{number} ({hash})")] Header { /// The block number key number: BlockNumber, /// The block hash key hash: H256, }, - /// Cumulative transaction count is missing from db - #[error("no cumulative tx count for ${number} ({hash})")] + /// The cumulative transaction count is missing from the database. + #[error("No cumulative tx count for ${number} ({hash})")] CumulativeTxCount { /// The block number key number: BlockNumber, /// The block hash key hash: H256, }, + /// A block body is missing. + #[error("Block body not found for block #{number}")] + BlockBody { + /// The block number key + number: BlockNumber, + }, } /// A pipeline execution error. diff --git a/crates/stages/src/pipeline.rs b/crates/stages/src/pipeline.rs index 74914d73a..f3581c2d2 100644 --- a/crates/stages/src/pipeline.rs +++ b/crates/stages/src/pipeline.rs @@ -341,8 +341,8 @@ impl QueuedStage { Err(err) => { state.events_sender.send(PipelineEvent::Error { stage_id }).await?; - return if let StageError::Validation { block } = err { - debug!(stage = %stage_id, bad_block = %block, "Stage encountered a validation error."); + return if let StageError::Validation { block, error } = err { + debug!(stage = %stage_id, bad_block = %block, "Stage encountered a validation error: {error}"); // We unwind because of a validation error. If the unwind itself fails, // we bail entirely, otherwise we restart the execution loop from the @@ -362,13 +362,13 @@ impl QueuedStage { #[cfg(test)] mod tests { - use super::*; use crate::{StageId, UnwindOutput}; use reth_db::{ kv::{test_utils, Env, EnvKind}, mdbx::{self, WriteMap}, }; + use reth_interfaces::consensus; use tokio::sync::mpsc::channel; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use utils::TestStage; @@ -520,7 +520,10 @@ mod tests { ) .push( TestStage::new(StageId("B")) - .add_exec(Err(StageError::Validation { block: 5 })) + .add_exec(Err(StageError::Validation { + block: 5, + error: consensus::Error::BaseFeeMissing, + })) .add_unwind(Ok(UnwindOutput { stage_progress: 0 })) .add_exec(Ok(ExecOutput { stage_progress: 10, diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs new file mode 100644 index 000000000..d7916e587 --- /dev/null +++ b/crates/stages/src/stages/bodies.rs @@ -0,0 +1,842 @@ +use crate::{ + DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, + UnwindOutput, +}; +use futures_util::TryStreamExt; +use reth_interfaces::{ + consensus::Consensus, + db::{ + models::StoredBlockBody, tables, DBContainer, Database, DatabaseGAT, DbCursorRO, + DbCursorRW, DbTx, DbTxMut, + }, + p2p::bodies::downloader::BodyDownloader, +}; +use reth_primitives::{ + proofs::{EMPTY_LIST_HASH, EMPTY_ROOT}, + BlockLocked, BlockNumber, SealedHeader, H256, +}; +use std::fmt::Debug; +use tracing::warn; + +const BODIES: StageId = StageId("Bodies"); + +// TODO(onbjerg): Metrics and events (gradual status for e.g. CLI) +/// The body stage downloads block bodies. +/// +/// The body stage downloads block bodies for all block headers stored locally in the database. +/// +/// # Empty blocks +/// +/// Blocks with an ommers hash corresponding to no ommers *and* a transaction root corresponding to +/// no transactions will not have a block body downloaded for them, since it would be meaningless to +/// do so. +/// +/// This also means that if there is no body for the block in the database (assuming the +/// block number <= the synced block of this stage), then the block can be considered empty. +/// +/// # Tables +/// +/// The bodies are processed and data is inserted into these tables: +/// +/// - [`BlockBodies`][reth_interfaces::db::tables::BlockBodies] +/// - [`Transactions`][reth_interfaces::db::tables::Transactions] +/// +/// # Genesis +/// +/// This stage expects that the genesis has been inserted into the appropriate tables: +/// +/// - The header tables (see [HeadersStage][crate::stages::headers::HeadersStage]) +/// - The various indexes (e.g. [TotalTxIndex][crate::stages::tx_index::TxIndex]) +/// - The [`BlockBodies`][reth_interfaces::db::tables::BlockBodies] table +#[derive(Debug)] +pub struct BodyStage { + /// The body downloader. + pub downloader: D, + /// The consensus engine. + pub consensus: C, + /// The maximum amount of block bodies to process in one stage execution. + /// + /// Smaller batch sizes result in less memory usage, but more disk I/O. Larger batch sizes + /// result in more memory usage, less disk I/O, and more infrequent checkpoints. + pub batch_size: u64, +} + +#[async_trait::async_trait] +impl Stage for BodyStage { + /// Return the id of the stage + fn id(&self) -> StageId { + BODIES + } + + /// Download block bodies from the last checkpoint for this stage up until the latest synced + /// header, limited by the stage's batch size. + async fn execute( + &mut self, + db: &mut DBContainer<'_, DB>, + input: ExecInput, + ) -> Result { + let tx = db.get_mut(); + + let previous_stage_progress = + input.previous_stage.as_ref().map(|(_, block)| *block).unwrap_or_default(); + if previous_stage_progress == 0 { + warn!("The body stage seems to be running first, no work can be completed."); + } + + // The block we ended at last sync, and the one we are starting on now + let previous_block = input.stage_progress.unwrap_or_default(); + let starting_block = previous_block + 1; + + // Short circuit in case we already reached the target block + let target = previous_stage_progress.min(starting_block + self.batch_size); + if target <= previous_block { + return Ok(ExecOutput { stage_progress: target, reached_tip: true, done: true }) + } + + let bodies_to_download = self.bodies_to_download::(tx, starting_block, target)?; + + // Cursors used to write bodies and transactions + let mut bodies_cursor = tx.cursor_mut::()?; + let mut tx_cursor = tx.cursor_mut::()?; + let mut base_tx_id = bodies_cursor + .last()? + .map(|(_, body)| body.base_tx_id + body.tx_amount) + .ok_or(DatabaseIntegrityError::BlockBody { number: starting_block })?; + + // Cursor used to look up headers for block pre-validation + let mut header_cursor = tx.cursor::()?; + + // NOTE(onbjerg): The stream needs to live here otherwise it will just create a new iterator + // on every iteration of the while loop -_- + let mut bodies_stream = self.downloader.bodies_stream(bodies_to_download.iter()); + let mut highest_block = previous_block; + while let Some((block_number, header_hash, body)) = + bodies_stream.try_next().await.map_err(|err| StageError::Internal(err.into()))? + { + // Fetch the block header for pre-validation + let block = BlockLocked { + header: SealedHeader::new( + header_cursor + .seek_exact((block_number, header_hash).into())? + .ok_or(DatabaseIntegrityError::Header { + number: block_number, + hash: header_hash, + })? + .1, + header_hash, + ), + body: body.transactions, + // TODO: We should have a type w/o receipts probably, no reason to allocate here + receipts: vec![], + ommers: body.ommers.into_iter().map(|header| header.seal()).collect(), + }; + + // Pre-validate the block and unwind if it is invalid + self.consensus + .pre_validate_block(&block) + .map_err(|err| StageError::Validation { block: block_number, error: err })?; + + // Write block + bodies_cursor.append( + (block_number, header_hash).into(), + StoredBlockBody { + base_tx_id, + tx_amount: block.body.len() as u64, + ommers: block.ommers.into_iter().map(|header| header.unseal()).collect(), + }, + )?; + + // Write transactions + for transaction in block.body { + tx_cursor.append(base_tx_id, transaction)?; + base_tx_id += 1; + } + + highest_block = block_number; + } + + // The stage is "done" if: + // - We got fewer blocks than our target + // - We reached our target and the target was not limited by the batch size of the stage + let capped = target < previous_stage_progress; + let done = highest_block < target || !capped; + + Ok(ExecOutput { stage_progress: highest_block, reached_tip: true, done }) + } + + /// Unwind the stage. + async fn unwind( + &mut self, + db: &mut DBContainer<'_, DB>, + input: UnwindInput, + ) -> Result> { + let tx = db.get_mut(); + let mut block_body_cursor = tx.cursor_mut::()?; + let mut transaction_cursor = tx.cursor_mut::()?; + + let mut entry = block_body_cursor.last()?; + while let Some((key, body)) = entry { + if key.number() <= input.unwind_to { + break + } + + for num in 0..body.tx_amount { + let tx_id = body.base_tx_id + num; + if transaction_cursor.seek_exact(tx_id)?.is_some() { + transaction_cursor.delete_current()?; + } + } + + block_body_cursor.delete_current()?; + entry = block_body_cursor.prev()?; + } + + Ok(UnwindOutput { stage_progress: input.unwind_to }) + } +} + +impl BodyStage { + /// Computes a list of `(block_number, header_hash)` for blocks that we need to download bodies + /// for. + /// + /// This skips empty blocks (i.e. no ommers, no transactions). + fn bodies_to_download( + &self, + tx: &mut >::TXMut, + starting_block: BlockNumber, + target: BlockNumber, + ) -> Result, StageError> { + let mut header_cursor = tx.cursor::()?; + let mut header_hashes_cursor = tx.cursor::()?; + let mut walker = header_hashes_cursor + .walk(starting_block)? + .take_while(|item| item.as_ref().map_or(false, |(num, _)| *num <= target)); + + let mut bodies_to_download = Vec::new(); + while let Some(Ok((block_number, header_hash))) = walker.next() { + let header = header_cursor + .seek_exact((block_number, header_hash).into())? + .ok_or(DatabaseIntegrityError::Header { number: block_number, hash: header_hash })? + .1; + if header.ommers_hash == EMPTY_LIST_HASH && header.transactions_root == EMPTY_ROOT { + continue + } + + bodies_to_download.push((block_number, header_hash)); + } + + Ok(bodies_to_download) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::util::test_utils::StageTestRunner; + use assert_matches::assert_matches; + use reth_eth_wire::BlockBody; + use reth_interfaces::{ + consensus, + p2p::bodies::error::DownloadError, + test_utils::generators::{random_block, random_block_range}, + }; + use reth_primitives::{BlockNumber, H256}; + use std::collections::HashMap; + use test_utils::*; + + /// Check that the execution is short-circuited if the database is empty. + #[tokio::test] + async fn empty_db() { + let runner = BodyTestRunner::new(TestBodyDownloader::default); + let rx = runner.execute(ExecInput::default()); + assert_matches!( + rx.await.unwrap(), + Ok(ExecOutput { stage_progress: 0, reached_tip: true, done: true }) + ) + } + + /// Check that the execution is short-circuited if the target was already reached. + #[tokio::test] + async fn already_reached_target() { + let runner = BodyTestRunner::new(TestBodyDownloader::default); + let rx = runner.execute(ExecInput { + previous_stage: Some((StageId("Headers"), 100)), + stage_progress: Some(100), + }); + assert_matches!( + rx.await.unwrap(), + Ok(ExecOutput { stage_progress: 100, reached_tip: true, done: true }) + ) + } + + /// Checks that the stage downloads at most `batch_size` blocks. + #[tokio::test] + async fn partial_body_download() { + // Generate blocks + let blocks = random_block_range(1..200, GENESIS_HASH); + let bodies: HashMap> = + blocks.iter().map(body_by_hash).collect(); + let mut runner = BodyTestRunner::new(|| TestBodyDownloader::new(bodies.clone())); + + // Set the batch size (max we sync per stage execution) to less than the number of blocks + // the previous stage synced (10 vs 20) + runner.set_batch_size(10); + + // Insert required state + runner.insert_genesis().expect("Could not insert genesis block"); + runner + .insert_headers(blocks.iter().map(|block| &block.header)) + .expect("Could not insert headers"); + + // Run the stage + let rx = runner.execute(ExecInput { + previous_stage: Some((StageId("Headers"), blocks.len() as BlockNumber)), + stage_progress: None, + }); + + // Check that we only synced around `batch_size` blocks even though the number of blocks + // synced by the previous stage is higher + let output = rx.await.unwrap(); + assert_matches!( + output, + Ok(ExecOutput { stage_progress, reached_tip: true, done: false }) if stage_progress < 200 + ); + runner + .validate_db_blocks(output.unwrap().stage_progress) + .expect("Written block data invalid"); + } + + /// Same as [partial_body_download] except the `batch_size` is not hit. + #[tokio::test] + async fn full_body_download() { + // Generate blocks #1-20 + let blocks = random_block_range(1..21, GENESIS_HASH); + let bodies: HashMap> = + blocks.iter().map(body_by_hash).collect(); + let mut runner = BodyTestRunner::new(|| TestBodyDownloader::new(bodies.clone())); + + // Set the batch size to more than what the previous stage synced (40 vs 20) + runner.set_batch_size(40); + + // Insert required state + runner.insert_genesis().expect("Could not insert genesis block"); + runner + .insert_headers(blocks.iter().map(|block| &block.header)) + .expect("Could not insert headers"); + + // Run the stage + let rx = runner.execute(ExecInput { + previous_stage: Some((StageId("Headers"), blocks.len() as BlockNumber)), + stage_progress: None, + }); + + // Check that we synced all blocks successfully, even though our `batch_size` allows us to + // sync more (if there were more headers) + let output = rx.await.unwrap(); + assert_matches!( + output, + Ok(ExecOutput { stage_progress: 20, reached_tip: true, done: true }) + ); + runner + .validate_db_blocks(output.unwrap().stage_progress) + .expect("Written block data invalid"); + } + + /// Same as [full_body_download] except we have made progress before + #[tokio::test] + async fn sync_from_previous_progress() { + // Generate blocks #1-20 + let blocks = random_block_range(1..21, GENESIS_HASH); + let bodies: HashMap> = + blocks.iter().map(body_by_hash).collect(); + let runner = BodyTestRunner::new(|| TestBodyDownloader::new(bodies.clone())); + + // Insert required state + runner.insert_genesis().expect("Could not insert genesis block"); + runner + .insert_headers(blocks.iter().map(|block| &block.header)) + .expect("Could not insert headers"); + + // Run the stage + let rx = runner.execute(ExecInput { + previous_stage: Some((StageId("Headers"), blocks.len() as BlockNumber)), + stage_progress: None, + }); + + // Check that we synced at least 10 blocks + let first_run = rx.await.unwrap(); + assert_matches!( + first_run, + Ok(ExecOutput { stage_progress, reached_tip: true, done: false }) if stage_progress >= 10 + ); + let first_run_progress = first_run.unwrap().stage_progress; + + // Execute again on top of the previous run + let rx = runner.execute(ExecInput { + previous_stage: Some((StageId("Headers"), blocks.len() as BlockNumber)), + stage_progress: Some(first_run_progress), + }); + + // Check that we synced more blocks + let output = rx.await.unwrap(); + assert_matches!( + output, + Ok(ExecOutput { stage_progress, reached_tip: true, done: true }) if stage_progress > first_run_progress + ); + runner + .validate_db_blocks(output.unwrap().stage_progress) + .expect("Written block data invalid"); + } + + /// Checks that the stage asks to unwind if pre-validation of the block fails. + #[tokio::test] + async fn pre_validation_failure() { + // Generate blocks #1-19 + let blocks = random_block_range(1..20, GENESIS_HASH); + let bodies: HashMap> = + blocks.iter().map(body_by_hash).collect(); + let mut runner = BodyTestRunner::new(|| TestBodyDownloader::new(bodies.clone())); + + // Fail validation + runner.set_fail_validation(true); + + // Insert required state + runner.insert_genesis().expect("Could not insert genesis block"); + runner + .insert_headers(blocks.iter().map(|block| &block.header)) + .expect("Could not insert headers"); + + // Run the stage + let rx = runner.execute(ExecInput { + previous_stage: Some((StageId("Headers"), blocks.len() as BlockNumber)), + stage_progress: None, + }); + + // Check that the error bubbles up + assert_matches!( + rx.await.unwrap(), + Err(StageError::Validation { block: 1, error: consensus::Error::BaseFeeMissing }) + ); + } + + /// Checks that the stage unwinds correctly with no data. + #[tokio::test] + async fn unwind_empty_db() { + let unwind_to = 10; + let runner = BodyTestRunner::new(TestBodyDownloader::default); + let rx = runner.unwind(UnwindInput { bad_block: None, stage_progress: 20, unwind_to }); + assert_matches!( + rx.await.unwrap(), + Ok(UnwindOutput { stage_progress }) if stage_progress == unwind_to + ) + } + + /// Checks that the stage unwinds correctly with data. + #[tokio::test] + async fn unwind() { + // Generate blocks #1-20 + let blocks = random_block_range(1..21, GENESIS_HASH); + let bodies: HashMap> = + blocks.iter().map(body_by_hash).collect(); + let mut runner = BodyTestRunner::new(|| TestBodyDownloader::new(bodies.clone())); + + // Set the batch size to more than what the previous stage synced (40 vs 20) + runner.set_batch_size(40); + + // Insert required state + runner.insert_genesis().expect("Could not insert genesis block"); + runner + .insert_headers(blocks.iter().map(|block| &block.header)) + .expect("Could not insert headers"); + + // Run the stage + let rx = runner.execute(ExecInput { + previous_stage: Some((StageId("Headers"), blocks.len() as BlockNumber)), + stage_progress: None, + }); + + // Check that we synced all blocks successfully, even though our `batch_size` allows us to + // sync more (if there were more headers) + let output = rx.await.unwrap(); + assert_matches!( + output, + Ok(ExecOutput { stage_progress: 20, reached_tip: true, done: true }) + ); + let stage_progress = output.unwrap().stage_progress; + runner.validate_db_blocks(stage_progress).expect("Written block data invalid"); + + // Unwind all of it + let unwind_to = 1; + let rx = runner.unwind(UnwindInput { bad_block: None, stage_progress, unwind_to }); + assert_matches!( + rx.await.unwrap(), + Ok(UnwindOutput { stage_progress }) if stage_progress == 1 + ); + + let last_body = runner.last_body().expect("Could not read last body"); + let last_tx_id = last_body.base_tx_id + last_body.tx_amount; + runner + .db() + .check_no_entry_above::(unwind_to, |key| key.number()) + .expect("Did not unwind block bodies correctly."); + runner + .db() + .check_no_entry_above::(last_tx_id, |key| key) + .expect("Did not unwind transactions correctly.") + } + + /// Checks that the stage unwinds correctly, even if a transaction in a block is missing. + #[tokio::test] + async fn unwind_missing_tx() { + // Generate blocks #1-20 + let blocks = random_block_range(1..21, GENESIS_HASH); + let bodies: HashMap> = + blocks.iter().map(body_by_hash).collect(); + let mut runner = BodyTestRunner::new(|| TestBodyDownloader::new(bodies.clone())); + + // Set the batch size to more than what the previous stage synced (40 vs 20) + runner.set_batch_size(40); + + // Insert required state + runner.insert_genesis().expect("Could not insert genesis block"); + runner + .insert_headers(blocks.iter().map(|block| &block.header)) + .expect("Could not insert headers"); + + // Run the stage + let rx = runner.execute(ExecInput { + previous_stage: Some((StageId("Headers"), blocks.len() as BlockNumber)), + stage_progress: None, + }); + + // Check that we synced all blocks successfully, even though our `batch_size` allows us to + // sync more (if there were more headers) + let output = rx.await.unwrap(); + assert_matches!( + output, + Ok(ExecOutput { stage_progress: 20, reached_tip: true, done: true }) + ); + let stage_progress = output.unwrap().stage_progress; + runner.validate_db_blocks(stage_progress).expect("Written block data invalid"); + + // Delete a transaction + { + let mut db = runner.db().container(); + let mut tx_cursor = db + .get_mut() + .cursor_mut::() + .expect("Could not get transaction cursor"); + tx_cursor + .last() + .expect("Could not read database") + .expect("Could not read last transaction"); + tx_cursor.delete_current().expect("Could not delete last transaction"); + db.commit().expect("Could not commit database"); + } + + // Unwind all of it + let unwind_to = 1; + let rx = runner.unwind(UnwindInput { bad_block: None, stage_progress, unwind_to }); + assert_matches!( + rx.await.unwrap(), + Ok(UnwindOutput { stage_progress }) if stage_progress == 1 + ); + + let last_body = runner.last_body().expect("Could not read last body"); + let last_tx_id = last_body.base_tx_id + last_body.tx_amount; + runner + .db() + .check_no_entry_above::(unwind_to, |key| key.number()) + .expect("Did not unwind block bodies correctly."); + runner + .db() + .check_no_entry_above::(last_tx_id, |key| key) + .expect("Did not unwind transactions correctly.") + } + + /// Checks that the stage exits if the downloader times out + /// TODO: We should probably just exit as "OK", commit the blocks we downloaded successfully and + /// try again? + #[tokio::test] + async fn downloader_timeout() { + // Generate a header + let header = random_block(1, Some(GENESIS_HASH)).header; + let runner = BodyTestRunner::new(|| { + TestBodyDownloader::new(HashMap::from([( + header.hash(), + Err(DownloadError::Timeout { header_hash: header.hash() }), + )])) + }); + + // Insert required state + runner.insert_genesis().expect("Could not insert genesis block"); + runner.insert_header(&header).expect("Could not insert header"); + + // Run the stage + let rx = runner.execute(ExecInput { + previous_stage: Some((StageId("Headers"), 1)), + stage_progress: None, + }); + + // Check that the error bubbles up + assert_matches!(rx.await.unwrap(), Err(StageError::Internal(_))); + } + + mod test_utils { + use crate::{ + stages::bodies::BodyStage, + util::test_utils::{StageTestDB, StageTestRunner}, + }; + use assert_matches::assert_matches; + use async_trait::async_trait; + use reth_eth_wire::BlockBody; + use reth_interfaces::{ + db, + db::{ + models::{BlockNumHash, StoredBlockBody}, + tables, DbCursorRO, DbTx, DbTxMut, + }, + p2p::bodies::{ + client::BodiesClient, + downloader::{BodiesStream, BodyDownloader}, + error::{BodiesClientError, DownloadError}, + }, + test_utils::TestConsensus, + }; + use reth_primitives::{ + BigEndianHash, BlockLocked, BlockNumber, Header, SealedHeader, H256, U256, + }; + use std::{collections::HashMap, ops::Deref, time::Duration}; + + /// The block hash of the genesis block. + pub(crate) const GENESIS_HASH: H256 = H256::zero(); + + /// A helper to create a collection of resulted-wrapped block bodies keyed by their hash. + pub(crate) fn body_by_hash( + block: &BlockLocked, + ) -> (H256, Result) { + ( + block.hash(), + Ok(BlockBody { + transactions: block.body.clone(), + ommers: block.ommers.iter().cloned().map(|ommer| ommer.unseal()).collect(), + }), + ) + } + + /// A helper struct for running the [BodyStage]. + pub(crate) struct BodyTestRunner + where + F: Fn() -> TestBodyDownloader, + { + downloader_builder: F, + db: StageTestDB, + batch_size: u64, + fail_validation: bool, + } + + impl BodyTestRunner + where + F: Fn() -> TestBodyDownloader, + { + /// Build a new test runner. + pub(crate) fn new(downloader_builder: F) -> Self { + BodyTestRunner { + downloader_builder, + db: StageTestDB::default(), + batch_size: 10, + fail_validation: false, + } + } + + pub(crate) fn set_batch_size(&mut self, batch_size: u64) { + self.batch_size = batch_size; + } + + pub(crate) fn set_fail_validation(&mut self, fail_validation: bool) { + self.fail_validation = fail_validation; + } + } + + impl StageTestRunner for BodyTestRunner + where + F: Fn() -> TestBodyDownloader, + { + type S = BodyStage; + + fn db(&self) -> &StageTestDB { + &self.db + } + + fn stage(&self) -> Self::S { + let mut consensus = TestConsensus::default(); + consensus.set_fail_validation(self.fail_validation); + + BodyStage { + downloader: (self.downloader_builder)(), + consensus, + batch_size: self.batch_size, + } + } + } + + impl BodyTestRunner + where + F: Fn() -> TestBodyDownloader, + { + /// Insert the genesis block into the appropriate tables + /// + /// The genesis block always has no transactions and no ommers, and it always has the + /// same hash. + pub(crate) fn insert_genesis(&self) -> Result<(), db::Error> { + self.insert_header(&SealedHeader::new(Header::default(), GENESIS_HASH))?; + let mut db = self.db.container(); + let tx = db.get_mut(); + tx.put::( + (0, GENESIS_HASH).into(), + StoredBlockBody { base_tx_id: 0, tx_amount: 0, ommers: vec![] }, + )?; + db.commit()?; + + Ok(()) + } + + /// Insert header into tables + pub(crate) fn insert_header(&self, header: &SealedHeader) -> Result<(), db::Error> { + self.insert_headers(std::iter::once(header)) + } + + /// Insert headers into tables + pub(crate) fn insert_headers<'a, I>(&self, headers: I) -> Result<(), db::Error> + where + I: Iterator, + { + let headers = headers.collect::>(); + self.db + .map_put::(&headers, |h| (h.hash(), h.number))?; + self.db.map_put::(&headers, |h| { + (BlockNumHash((h.number, h.hash())), h.deref().clone().unseal()) + })?; + self.db.map_put::(&headers, |h| { + (h.number, h.hash()) + })?; + + self.db.transform_append::(&headers, |prev, h| { + let prev_td = U256::from_big_endian(&prev.clone().unwrap_or_default()); + ( + BlockNumHash((h.number, h.hash())), + H256::from_uint(&(prev_td + h.difficulty)).as_bytes().to_vec(), + ) + })?; + + Ok(()) + } + + pub(crate) fn last_body(&self) -> Option { + Some( + self.db() + .container() + .get() + .cursor::() + .ok()? + .last() + .ok()?? + .1, + ) + } + + /// Validate that the inserted block data is valid + pub(crate) fn validate_db_blocks( + &self, + highest_block: BlockNumber, + ) -> Result<(), db::Error> { + let db = self.db.container(); + let tx = db.get(); + + let mut block_body_cursor = tx.cursor::()?; + let mut transaction_cursor = tx.cursor::()?; + + let mut entry = block_body_cursor.first()?; + let mut prev_max_tx_id = 0; + while let Some((key, body)) = entry { + assert!( + key.number() <= highest_block, + "We wrote a block body outside of our synced range. Found block with number {}, highest block according to stage is {}", + key.number(), highest_block + ); + + assert!(prev_max_tx_id == body.base_tx_id, "Transaction IDs are malformed."); + for num in 0..body.tx_amount { + let tx_id = body.base_tx_id + num; + assert_matches!( + transaction_cursor.seek_exact(tx_id), + Ok(Some(_)), + "A transaction is missing." + ); + } + prev_max_tx_id = body.base_tx_id + body.tx_amount; + entry = block_body_cursor.next()?; + } + + Ok(()) + } + } + + // TODO(onbjerg): Move + /// A [BodiesClient] that should not be called. + #[derive(Debug)] + pub(crate) struct NoopClient; + + #[async_trait] + impl BodiesClient for NoopClient { + async fn get_block_body(&self, _: H256) -> Result { + panic!("Noop client should not be called") + } + } + + // TODO(onbjerg): Move + /// A [BodyDownloader] that is backed by an internal [HashMap] for testing. + #[derive(Debug, Default)] + pub(crate) struct TestBodyDownloader { + responses: HashMap>, + } + + impl TestBodyDownloader { + pub(crate) fn new(responses: HashMap>) -> Self { + Self { responses } + } + } + + impl BodyDownloader for TestBodyDownloader { + type Client = NoopClient; + + fn timeout(&self) -> Duration { + unreachable!() + } + + fn client(&self) -> &Self::Client { + unreachable!() + } + + fn bodies_stream<'a, 'b, I>(&'a self, hashes: I) -> BodiesStream<'a> + where + I: IntoIterator, + ::IntoIter: Send + 'b, + 'b: 'a, + { + Box::pin(futures_util::stream::iter(hashes.into_iter().map( + |(block_number, hash)| { + Ok(( + *block_number, + *hash, + self.responses + .get(hash) + .expect("Stage tried downloading a block we do not have.") + .clone()?, + )) + }, + ))) + } + } + } +} diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index f24c81547..a20fed002 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -9,10 +9,7 @@ use reth_interfaces::{ models::blocks::BlockNumHash, tables, DBContainer, Database, DatabaseGAT, DbCursorRO, DbCursorRW, DbTx, DbTxMut, }, - p2p::headers::{ - client::HeadersClient, - downloader::{DownloadError, Downloader}, - }, + p2p::headers::{client::HeadersClient, downloader::HeaderDownloader, error::DownloadError}, }; use reth_primitives::{rpc::BigEndianHash, BlockNumber, SealedHeader, H256, U256}; use std::{fmt::Debug, sync::Arc}; @@ -20,9 +17,19 @@ use tracing::*; const HEADERS: StageId = StageId("Headers"); -/// The headers stage implementation for staged sync +/// The headers stage. +/// +/// The headers stage downloads all block headers from the highest block in the local database to +/// the perceived highest block on the network. +/// +/// The headers are processed and data is inserted into these tables: +/// +/// - [`HeaderNumbers`][reth_interfaces::db::tables::HeaderNumbers] +/// - [`Headers`][reth_interfaces::db::tables::Headers] +/// - [`CanonicalHeaders`][reth_interfaces::db::tables::CanonicalHeaders] +/// - [`HeaderTD`][reth_interfaces::db::tables::HeaderTD] #[derive(Debug)] -pub struct HeaderStage { +pub struct HeaderStage { /// Strategy for downloading the headers pub downloader: D, /// Consensus client implementation @@ -32,7 +39,7 @@ pub struct HeaderStage { } #[async_trait::async_trait] -impl Stage +impl Stage for HeaderStage { /// Return the id of the stage @@ -55,7 +62,7 @@ impl Stage // TODO: handle input.max_block let last_hash = tx .get::(last_block_num)? - .ok_or(DatabaseIntegrityError::CannonicalHash { number: last_block_num })?; + .ok_or(DatabaseIntegrityError::CanonicalHash { number: last_block_num })?; let last_header = tx.get::((last_block_num, last_hash).into())?.ok_or({ DatabaseIntegrityError::Header { number: last_block_num, hash: last_hash } @@ -81,14 +88,15 @@ impl Stage done: false, }) } - DownloadError::HeaderValidation { hash, details } => { - warn!("validation error for header {hash}: {details}"); - return Err(StageError::Validation { block: last_block_num }) + DownloadError::HeaderValidation { hash, error } => { + warn!("Validation error for header {hash}: {error}"); + return Err(StageError::Validation { block: last_block_num, error }) } // TODO: this error is never propagated, clean up - DownloadError::MismatchedHeaders { .. } => { - return Err(StageError::Validation { block: last_block_num }) - } + // DownloadError::MismatchedHeaders { .. } => { + // return Err(StageError::Validation { block: last_block_num }) + // } + _ => unreachable!(), }, }; let stage_progress = self.write_headers::(tx, headers).await?.unwrap_or(last_block_num); @@ -116,7 +124,7 @@ impl Stage } } -impl HeaderStage { +impl HeaderStage { async fn update_head( &self, tx: &mut >::TXMut, @@ -124,7 +132,7 @@ impl HeaderStage { ) -> Result<(), StageError> { let hash = tx .get::(height)? - .ok_or(DatabaseIntegrityError::CannonicalHeader { number: height })?; + .ok_or(DatabaseIntegrityError::CanonicalHeader { number: height })?; let td: Vec = tx.get::((height, hash).into())?.unwrap(); // TODO: self.client.update_status(height, hash, H256::from_slice(&td)).await; Ok(()) @@ -184,31 +192,36 @@ mod tests { use super::*; use crate::util::test_utils::StageTestRunner; use assert_matches::assert_matches; - use reth_interfaces::test_utils::{gen_random_header, gen_random_header_range}; - use test_utils::{HeadersTestRunner, TestDownloader}; + use reth_interfaces::{ + consensus, + test_utils::{ + generators::{random_header, random_header_range}, + TestHeaderDownloader, + }, + }; + use test_utils::HeadersTestRunner; const TEST_STAGE: StageId = StageId("Headers"); + /// Check that the execution errors on empty database or + /// prev progress missing from the database. #[tokio::test] - // Check that the execution errors on empty database or - // prev progress missing from the database. async fn execute_empty_db() { let runner = HeadersTestRunner::default(); let rx = runner.execute(ExecInput::default()); assert_matches!( rx.await.unwrap(), - Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CannonicalHeader { .. })) + Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CanonicalHeader { .. })) ); } + /// Check that the execution exits on downloader timeout. #[tokio::test] - // Check that the execution exits on downloader timeout. async fn execute_timeout() { - let head = gen_random_header(0, None); - let runner = - HeadersTestRunner::with_downloader(TestDownloader::new(Err(DownloadError::Timeout { - request_id: 0, - }))); + let head = random_header(0, None); + let runner = HeadersTestRunner::with_downloader(TestHeaderDownloader::new(Err( + DownloadError::Timeout { request_id: 0 }, + ))); runner.insert_header(&head).expect("failed to insert header"); let rx = runner.execute(ExecInput::default()); @@ -216,30 +229,33 @@ mod tests { assert_matches!(rx.await.unwrap(), Ok(ExecOutput { done, .. }) if !done); } + /// Check that validation error is propagated during the execution. #[tokio::test] - // Check that validation error is propagated during the execution. async fn execute_validation_error() { - let head = gen_random_header(0, None); - let runner = HeadersTestRunner::with_downloader(TestDownloader::new(Err( - DownloadError::HeaderValidation { hash: H256::zero(), details: "".to_owned() }, + let head = random_header(0, None); + let runner = HeadersTestRunner::with_downloader(TestHeaderDownloader::new(Err( + DownloadError::HeaderValidation { + hash: H256::zero(), + error: consensus::Error::BaseFeeMissing, + }, ))); runner.insert_header(&head).expect("failed to insert header"); let rx = runner.execute(ExecInput::default()); runner.consensus.update_tip(H256::from_low_u64_be(1)); - assert_matches!(rx.await.unwrap(), Err(StageError::Validation { block }) if block == 0); + assert_matches!(rx.await.unwrap(), Err(StageError::Validation { block, error: consensus::Error::BaseFeeMissing, }) if block == 0); } + /// Validate that all necessary tables are updated after the + /// header download on no previous progress. #[tokio::test] - // Validate that all necessary tables are updated after the - // header download on no previous progress. async fn execute_no_progress() { let (start, end) = (0, 100); - let head = gen_random_header(start, None); - let headers = gen_random_header_range(start + 1..end, head.hash()); + let head = random_header(start, None); + let headers = random_header_range(start + 1..end, head.hash()); let result = headers.iter().rev().cloned().collect::>(); - let runner = HeadersTestRunner::with_downloader(TestDownloader::new(Ok(result))); + let runner = HeadersTestRunner::with_downloader(TestHeaderDownloader::new(Ok(result))); runner.insert_header(&head).expect("failed to insert header"); let rx = runner.execute(ExecInput::default()); @@ -251,19 +267,19 @@ mod tests { Ok(ExecOutput { done, reached_tip, stage_progress }) if done && reached_tip && stage_progress == tip.number ); - assert!(headers.iter().try_for_each(|h| runner.validate_db_header(&h)).is_ok()); + assert!(headers.iter().try_for_each(|h| runner.validate_db_header(h)).is_ok()); } + /// Validate that all necessary tables are updated after the + /// header download with some previous progress. #[tokio::test] - // Validate that all necessary tables are updated after the - // header download with some previous progress. async fn execute_prev_progress() { let (start, end) = (10000, 10241); - let head = gen_random_header(start, None); - let headers = gen_random_header_range(start + 1..end, head.hash()); + let head = random_header(start, None); + let headers = random_header_range(start + 1..end, head.hash()); let result = headers.iter().rev().cloned().collect::>(); - let runner = HeadersTestRunner::with_downloader(TestDownloader::new(Ok(result))); + let runner = HeadersTestRunner::with_downloader(TestHeaderDownloader::new(Ok(result))); runner.insert_header(&head).expect("failed to insert header"); let rx = runner.execute(ExecInput { @@ -278,15 +294,15 @@ mod tests { Ok(ExecOutput { done, reached_tip, stage_progress }) if done && reached_tip && stage_progress == tip.number ); - assert!(headers.iter().try_for_each(|h| runner.validate_db_header(&h)).is_ok()); + assert!(headers.iter().try_for_each(|h| runner.validate_db_header(h)).is_ok()); } + /// Execute the stage with linear downloader #[tokio::test] - // Execute the stage with linear downloader async fn execute_with_linear_downloader() { let (start, end) = (1000, 1200); - let head = gen_random_header(start, None); - let headers = gen_random_header_range(start + 1..end, head.hash()); + let head = random_header(start, None); + let headers = random_header_range(start + 1..end, head.hash()); let runner = HeadersTestRunner::with_linear_downloader(); runner.insert_header(&head).expect("failed to insert header"); @@ -315,11 +331,11 @@ mod tests { Ok(ExecOutput { done, reached_tip, stage_progress }) if done && reached_tip && stage_progress == tip.number ); - assert!(headers.iter().try_for_each(|h| runner.validate_db_header(&h)).is_ok()); + assert!(headers.iter().try_for_each(|h| runner.validate_db_header(h)).is_ok()); } + /// Check that unwind does not panic on empty database. #[tokio::test] - // Check that unwind does not panic on empty database. async fn unwind_empty_db() { let unwind_to = 100; let runner = HeadersTestRunner::default(); @@ -331,13 +347,13 @@ mod tests { ); } + /// Check that unwind can remove headers across gaps #[tokio::test] - // Check that unwind can remove headers across gaps async fn unwind_db_gaps() { let runner = HeadersTestRunner::default(); - let head = gen_random_header(0, None); - let first_range = gen_random_header_range(1..20, head.hash()); - let second_range = gen_random_header_range(50..100, H256::zero()); + let head = random_header(0, None); + let first_range = random_header_range(1..20, head.hash()); + let second_range = random_header_range(50..100, H256::zero()); runner.insert_header(&head).expect("failed to insert header"); runner .insert_headers(first_range.iter().chain(second_range.iter())) @@ -374,36 +390,34 @@ mod tests { stages::headers::HeaderStage, util::test_utils::{StageTestDB, StageTestRunner}, }; - use async_trait::async_trait; use reth_headers_downloaders::linear::{LinearDownloadBuilder, LinearDownloader}; use reth_interfaces::{ - consensus::ForkchoiceState, db::{self, models::blocks::BlockNumHash, tables, DbTx}, - p2p::headers::downloader::{DownloadError, Downloader}, - test_utils::{TestConsensus, TestHeadersClient}, + p2p::headers::downloader::HeaderDownloader, + test_utils::{TestConsensus, TestHeaderDownloader, TestHeadersClient}, }; use reth_primitives::{rpc::BigEndianHash, SealedHeader, H256, U256}; - use std::{ops::Deref, sync::Arc, time::Duration}; + use std::{ops::Deref, sync::Arc}; - pub(crate) struct HeadersTestRunner { + pub(crate) struct HeadersTestRunner { pub(crate) consensus: Arc, pub(crate) client: Arc, downloader: Arc, db: StageTestDB, } - impl Default for HeadersTestRunner { + impl Default for HeadersTestRunner { fn default() -> Self { Self { client: Arc::new(TestHeadersClient::default()), consensus: Arc::new(TestConsensus::default()), - downloader: Arc::new(TestDownloader::new(Ok(Vec::default()))), + downloader: Arc::new(TestHeaderDownloader::new(Ok(Vec::default()))), db: StageTestDB::default(), } } } - impl StageTestRunner for HeadersTestRunner { + impl StageTestRunner for HeadersTestRunner { type S = HeaderStage, TestConsensus, TestHeadersClient>; fn db(&self) -> &StageTestDB { @@ -423,13 +437,14 @@ mod tests { pub(crate) fn with_linear_downloader() -> Self { let client = Arc::new(TestHeadersClient::default()); let consensus = Arc::new(TestConsensus::default()); - let downloader = - Arc::new(LinearDownloadBuilder::new().build(consensus.clone(), client.clone())); + let downloader = Arc::new( + LinearDownloadBuilder::default().build(consensus.clone(), client.clone()), + ); Self { client, consensus, downloader, db: StageTestDB::default() } } } - impl HeadersTestRunner { + impl HeadersTestRunner { pub(crate) fn with_downloader(downloader: D) -> Self { HeadersTestRunner { client: Arc::new(TestHeadersClient::default()), @@ -501,42 +516,5 @@ mod tests { Ok(()) } } - - #[derive(Debug)] - pub(crate) struct TestDownloader { - result: Result, DownloadError>, - } - - impl TestDownloader { - pub(crate) fn new(result: Result, DownloadError>) -> Self { - Self { result } - } - } - - #[async_trait] - impl Downloader for TestDownloader { - type Consensus = TestConsensus; - type Client = TestHeadersClient; - - fn timeout(&self) -> Duration { - Duration::from_secs(1) - } - - fn consensus(&self) -> &Self::Consensus { - unimplemented!() - } - - fn client(&self) -> &Self::Client { - unimplemented!() - } - - async fn download( - &self, - _: &SealedHeader, - _: &ForkchoiceState, - ) -> Result, DownloadError> { - self.result.clone() - } - } } } diff --git a/crates/stages/src/stages/mod.rs b/crates/stages/src/stages/mod.rs index 2eb1b3483..b75b4e269 100644 --- a/crates/stages/src/stages/mod.rs +++ b/crates/stages/src/stages/mod.rs @@ -1,3 +1,5 @@ +/// The bodies stage. +pub mod bodies; /// The headers stage. pub mod headers; /// The cumulative transaction index stage. diff --git a/crates/stages/src/stages/tx_index.rs b/crates/stages/src/stages/tx_index.rs index a2d986ba7..64ac97021 100644 --- a/crates/stages/src/stages/tx_index.rs +++ b/crates/stages/src/stages/tx_index.rs @@ -37,13 +37,13 @@ impl Stage for TxIndex { let last_block = input.stage_progress.unwrap_or_default(); let last_hash = tx .get::(last_block)? - .ok_or(DatabaseIntegrityError::CannonicalHeader { number: last_block })?; + .ok_or(DatabaseIntegrityError::CanonicalHeader { number: last_block })?; // The start block for this iteration let start_block = last_block + 1; let start_hash = tx .get::(start_block)? - .ok_or(DatabaseIntegrityError::CannonicalHeader { number: start_block })?; + .ok_or(DatabaseIntegrityError::CanonicalHeader { number: start_block })?; // The maximum block that this stage should insert to let max_block = input.previous_stage.as_ref().map(|(_, block)| *block).unwrap_or_default(); @@ -65,8 +65,8 @@ impl Stage for TxIndex { // Aggregate and insert cumulative transaction count for each block number for entry in entries { - let (key, tx_count) = entry?; - count += tx_count as u64; + let (key, body) = entry?; + count += body.tx_amount; cursor.append(key, count)?; } @@ -89,7 +89,7 @@ mod tests { use super::*; use crate::util::test_utils::{StageTestDB, StageTestRunner}; use assert_matches::assert_matches; - use reth_interfaces::{db::models::BlockNumHash, test_utils::gen_random_header_range}; + use reth_interfaces::{db::models::BlockNumHash, test_utils::generators::random_header_range}; use reth_primitives::H256; const TEST_STAGE: StageId = StageId("PrevStage"); @@ -100,14 +100,14 @@ mod tests { let rx = runner.execute(ExecInput::default()); assert_matches!( rx.await.unwrap(), - Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CannonicalHeader { .. })) + Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CanonicalHeader { .. })) ); } #[tokio::test] async fn execute_no_prev_tx_count() { let runner = TxIndexTestRunner::default(); - let headers = gen_random_header_range(0..10, H256::zero()); + let headers = random_header_range(0..10, H256::zero()); runner .db() .map_put::(&headers, |h| (h.number, h.hash())) @@ -129,7 +129,7 @@ mod tests { async fn execute() { let runner = TxIndexTestRunner::default(); let (start, pivot, end) = (0, 100, 200); - let headers = gen_random_header_range(start..end, H256::zero()); + let headers = random_header_range(start..end, H256::zero()); runner .db() .map_put::(&headers, |h| (h.number, h.hash())) @@ -170,7 +170,7 @@ mod tests { #[tokio::test] async fn unwind_no_input() { let runner = TxIndexTestRunner::default(); - let headers = gen_random_header_range(0..10, H256::zero()); + let headers = random_header_range(0..10, H256::zero()); runner .db() .transform_append::(&headers, |prev, h| { @@ -195,8 +195,8 @@ mod tests { #[tokio::test] async fn unwind_with_db_gaps() { let runner = TxIndexTestRunner::default(); - let first_range = gen_random_header_range(0..20, H256::zero()); - let second_range = gen_random_header_range(50..100, H256::zero()); + let first_range = random_header_range(0..20, H256::zero()); + let second_range = random_header_range(50..100, H256::zero()); runner .db() .transform_append::( diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index ffd23ded0..8a80d6d47 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -67,8 +67,8 @@ //! The final `TransactionPool` is made up of two layers: //! //! The lowest layer is the actual pool implementations that manages (validated) transactions: -//! [`TxPool`](crate::pool::TxPool). This is contained in a higher level pool type that guards the -//! low level pool and handles additional listeners or metrics: +//! [`TxPool`](crate::pool::txpool::TxPool). This is contained in a higher level pool type that +//! guards the low level pool and handles additional listeners or metrics: //! [`PoolInner`](crate::pool::PoolInner) //! //! The transaction pool will be used by separate consumers (RPC, P2P), to make sharing easier, the