diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 8b3e56363..f59c3e4c7 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -149,7 +149,8 @@ fn init_genesis(db: Arc, genesis: Genesis) -> Result(0, hash)?; tx.put::(hash, 0)?; - tx.put::((0, hash).into(), 0)?; + tx.put::((0, hash).into(), Default::default())?; + tx.put::((0, hash).into(), 0)?; tx.put::((0, hash).into(), header.difficulty.into())?; tx.put::((0, hash).into(), header)?; diff --git a/crates/executor/src/executor.rs b/crates/executor/src/executor.rs index a40ce436d..ed895b8ae 100644 --- a/crates/executor/src/executor.rs +++ b/crates/executor/src/executor.rs @@ -53,9 +53,9 @@ impl AccountInfoChangeSet { /// Apply the changes from the changeset to a database transaction. pub fn apply_to_db<'a, TX: DbTxMut<'a>>( self, + tx: &TX, address: Address, tx_index: u64, - tx: &TX, ) -> Result<(), DbError> { match self { AccountInfoChangeSet::Changed { old, new } => { @@ -105,6 +105,7 @@ pub struct AccountChangeSet { /// Execution Result containing vector of transaction changesets /// and block reward if present +#[derive(Debug)] pub struct ExecutionResult { /// Transaction changeest contraining [Receipt], changed [Accounts][Account] and Storages. pub changeset: Vec, @@ -586,7 +587,7 @@ mod tests { // check Changed changeset AccountInfoChangeSet::Changed { new: acc1, old: acc2 } - .apply_to_db(address, tx_num, &tx) + .apply_to_db(&tx, address, tx_num) .unwrap(); assert_eq!( tx.get::(tx_num), @@ -594,7 +595,7 @@ mod tests { ); assert_eq!(tx.get::(address), Ok(Some(acc1))); - AccountInfoChangeSet::Created { new: acc1 }.apply_to_db(address, tx_num, &tx).unwrap(); + AccountInfoChangeSet::Created { new: acc1 }.apply_to_db(&tx, address, tx_num).unwrap(); assert_eq!( tx.get::(tx_num), Ok(Some(AccountBeforeTx { address, info: None })) @@ -604,7 +605,7 @@ mod tests { // delete old value, as it is dupsorted tx.delete::(tx_num, None).unwrap(); - AccountInfoChangeSet::Destroyed { old: acc2 }.apply_to_db(address, tx_num, &tx).unwrap(); + AccountInfoChangeSet::Destroyed { old: acc2 }.apply_to_db(&tx, address, tx_num).unwrap(); assert_eq!(tx.get::(address), Ok(None)); assert_eq!( tx.get::(tx_num), diff --git a/crates/interfaces/src/executor.rs b/crates/interfaces/src/executor.rs index 848db251f..3aa925982 100644 --- a/crates/interfaces/src/executor.rs +++ b/crates/interfaces/src/executor.rs @@ -27,11 +27,11 @@ pub enum Error { ReceiptLogDiff, #[error("Receipt log is different.")] ExecutionSuccessDiff { got: bool, expected: bool }, - #[error("Receipt root {got:?} is different then expected {expected:?}.")] + #[error("Receipt root {got:?} is different than expected {expected:?}.")] ReceiptRootDiff { got: H256, expected: H256 }, - #[error("Header bloom filter {got:?} is different then expected {expected:?}.")] + #[error("Header bloom filter {got:?} is different than expected {expected:?}.")] BloomLogDiff { got: Box, expected: Box }, - #[error("Transaction gas limit {transaction_gas_limit} is more then blocks available gas {block_available_gas}")] + #[error("Transaction gas limit {transaction_gas_limit} is more than blocks available gas {block_available_gas}")] TransactionGasLimitMoreThenAvailableBlockGas { transaction_gas_limit: u64, block_available_gas: u64, diff --git a/crates/interfaces/src/p2p/bodies/client.rs b/crates/interfaces/src/p2p/bodies/client.rs index 69d440eff..78ee1375b 100644 --- a/crates/interfaces/src/p2p/bodies/client.rs +++ b/crates/interfaces/src/p2p/bodies/client.rs @@ -8,5 +8,5 @@ use reth_primitives::H256; #[auto_impl::auto_impl(&, Arc, Box)] pub trait BodiesClient: DownloadClient { /// Fetches the block body for the requested block. - async fn get_block_body(&self, hash: Vec) -> PeerRequestResult>; + async fn get_block_bodies(&self, hashes: Vec) -> PeerRequestResult>; } diff --git a/crates/interfaces/src/p2p/bodies/downloader.rs b/crates/interfaces/src/p2p/bodies/downloader.rs index 5308471e0..86c42ea38 100644 --- a/crates/interfaces/src/p2p/bodies/downloader.rs +++ b/crates/interfaces/src/p2p/bodies/downloader.rs @@ -1,6 +1,25 @@ use crate::p2p::downloader::{DownloadStream, Downloader}; use reth_primitives::{BlockLocked, SealedHeader}; +/// The block response +#[derive(PartialEq, Eq, Debug)] +pub enum BlockResponse { + /// Full block response (with transactions or ommers) + Full(BlockLocked), + /// The empty block response + Empty(SealedHeader), +} + +impl BlockResponse { + /// Return the reference to the response header + pub fn header(&self) -> &SealedHeader { + match self { + BlockResponse::Full(block) => &block.header, + BlockResponse::Empty(header) => header, + } + } +} + /// A downloader capable of fetching block bodies from header hashes. /// /// A downloader represents a distinct strategy for submitting requests to download block bodies, @@ -19,7 +38,7 @@ pub trait BodyDownloader: Downloader { /// /// It is *not* guaranteed that all the requested bodies are fetched: the downloader may close /// the stream before the entire range has been fetched for any reason - fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> DownloadStream<'a, BlockLocked> + fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> DownloadStream<'a, BlockResponse> where I: IntoIterator, ::IntoIter: Send + 'b, diff --git a/crates/interfaces/src/provider.rs b/crates/interfaces/src/provider.rs index 5ae00420e..7ea8c71a7 100644 --- a/crates/interfaces/src/provider.rs +++ b/crates/interfaces/src/provider.rs @@ -4,12 +4,12 @@ use reth_primitives::{BlockHash, BlockNumber}; #[allow(missing_docs)] #[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)] pub enum Error { - #[error("Block Number {block_number:?} does not exist in database")] - BlockNumberNotExists { block_number: BlockNumber }, - #[error("Block tx cumulative number for hash {block_hash:?} does not exist in database")] - BlockTxNumberNotExists { block_hash: BlockHash }, - #[error("Block hash {block_hash:?} does not exists in Headers table")] - BlockHashNotExist { block_hash: BlockHash }, - #[error("Block body not exists #{block_number} {block_hash}")] - BlockBodyNotExist { block_number: BlockNumber, block_hash: BlockHash }, + #[error("Block number {block_number} does not exist in database")] + BlockNumber { block_number: BlockNumber }, + #[error("Block hash {block_hash:?} does not exist in Headers table")] + BlockHash { block_hash: BlockHash }, + #[error("Block body not exists #{block_number} ({block_hash:?})")] + BlockBody { block_number: BlockNumber, block_hash: BlockHash }, + #[error("Block transition does not exist for block #{block_number} ({block_hash:?})")] + BlockTransition { block_number: BlockNumber, block_hash: BlockHash }, } diff --git a/crates/interfaces/src/test_utils/bodies.rs b/crates/interfaces/src/test_utils/bodies.rs index 360890454..aba455c55 100644 --- a/crates/interfaces/src/test_utils/bodies.rs +++ b/crates/interfaces/src/test_utils/bodies.rs @@ -29,7 +29,7 @@ impl BodiesClient for TestBodiesClient where F: Fn(Vec) -> PeerRequestResult> + Send + Sync, { - async fn get_block_body(&self, hashes: Vec) -> PeerRequestResult> { + async fn get_block_bodies(&self, hashes: Vec) -> PeerRequestResult> { (self.responder)(hashes) } } diff --git a/crates/net/downloaders/src/bodies/concurrent.rs b/crates/net/downloaders/src/bodies/concurrent.rs index 320341f2f..9d73aa1df 100644 --- a/crates/net/downloaders/src/bodies/concurrent.rs +++ b/crates/net/downloaders/src/bodies/concurrent.rs @@ -3,9 +3,12 @@ use futures_util::{stream, StreamExt, TryStreamExt}; use reth_interfaces::{ consensus::Consensus as ConsensusTrait, p2p::{ - bodies::{client::BodiesClient, downloader::BodyDownloader}, + bodies::{ + client::BodiesClient, + downloader::{BlockResponse, BodyDownloader}, + }, downloader::{DownloadStream, Downloader}, - error::{DownloadError, DownloadResult}, + error::{DownloadError, DownloadResult, RequestError}, }, }; use reth_primitives::{BlockLocked, SealedHeader}; @@ -50,7 +53,7 @@ where Client: BodiesClient, Consensus: ConsensusTrait, { - fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> DownloadStream<'a, BlockLocked> + fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> DownloadStream<'a, BlockResponse> where I: IntoIterator, ::IntoIter: Send + 'b, @@ -104,33 +107,75 @@ where self } - /// Fetch a batch of block bodies. - async fn fetch_bodies(&self, headers: Vec<&SealedHeader>) -> DownloadResult> { - let (peer_id, response) = self - .client - .get_block_body(headers.iter().map(|header| header.hash()).collect()) - .await? - .split(); + /// Given a batch of headers, this function proceeds to: + /// 1. Filter for all the non-empty headers + /// 2. Return early with the header values, if there were no non-empty headers, else.. + /// 3. Request the bodies for the non-empty headers from a peer chosen by the network client + /// 4. For any non-empty headers, it proceeds to validate the corresponding body from the peer + /// and return it as part of the response via the [`BlockResponse::Full`] variant. + /// + /// NB: This assumes that peers respond with bodies in the order that they were requested. + /// This is a reasonable assumption to make as that's [what Geth + /// does](https://github.com/ethereum/go-ethereum/blob/f53ff0ff4a68ffc56004ab1d5cc244bcb64d3277/les/server_requests.go#L245). + /// All errors regarding the response cause the peer to get penalized, meaning that adversaries + /// that try to give us bodies that do not match the requested order are going to be penalized + /// and eventually disconnected. + async fn fetch_bodies( + &self, + headers: Vec<&SealedHeader>, + ) -> DownloadResult> { + let headers_with_txs_and_ommers = + headers.iter().filter(|h| !h.is_empty()).map(|h| h.hash()).collect::>(); + if headers_with_txs_and_ommers.is_empty() { + return Ok(headers.into_iter().cloned().map(BlockResponse::Empty).collect()) + } + + let (peer_id, bodies) = + self.client.get_block_bodies(headers_with_txs_and_ommers).await?.split(); + + let mut bodies = bodies.into_iter(); + + let mut responses = vec![]; + for header in headers.into_iter().cloned() { + // If the header has no txs / ommers, just push it and continue + if header.is_empty() { + responses.push(BlockResponse::Empty(header)); + } else { + // If the header was not empty, and there is no body, then + // the peer gave us a bad resopnse and we should return + // error. + let body = match bodies.next() { + Some(body) => body, + None => { + self.client.report_bad_message(peer_id); + // TODO: We error always, this means that if we got no body from a peer + // and the header was not empty we will discard a bunch of progress. + // This will cause redundant bandwdith usage (due to how our retriable + // stream works via std::iter), but we will address this + // with a custom retriable stream that maintains state and is able to + // "resume" progress from the current state of `responses`. + return Err(DownloadError::RequestError(RequestError::BadResponse)) + } + }; - response - .into_iter() - .zip(headers) - .map(|(body, header)| { let block = BlockLocked { header: header.clone(), body: body.transactions, ommers: body.ommers.into_iter().map(|header| header.seal()).collect(), }; - match self.consensus.pre_validate_block(&block) { - Ok(_) => Ok(block), - Err(error) => { - self.client.report_bad_message(peer_id); - Err(DownloadError::BlockValidation { hash: header.hash(), error }) - } - } - }) - .collect() + // This ensures that the TxRoot and OmmersRoot from the header match the + // ones calculated manually from the block body. + self.consensus.pre_validate_block(&block).map_err(|error| { + self.client.report_bad_message(peer_id); + DownloadError::BlockValidation { hash: header.hash(), error } + })?; + + responses.push(BlockResponse::Full(block)); + } + } + + Ok(responses) } } @@ -146,7 +191,7 @@ mod tests { p2p::{bodies::downloader::BodyDownloader, error::RequestError}, test_utils::TestConsensus, }; - use reth_primitives::{PeerId, H256}; + use reth_primitives::{Header, PeerId, H256}; use std::{ sync::{ atomic::{AtomicUsize, Ordering}, @@ -190,7 +235,7 @@ mod tests { assert_matches!( downloader .bodies_stream(headers.clone().iter()) - .try_collect::>() + .try_collect::>() .await, Ok(responses) => { assert_eq!( @@ -199,13 +244,13 @@ mod tests { .into_iter() .map(| header | { let body = bodies .remove(&header.hash()).unwrap(); - BlockLocked { + BlockResponse::Full(BlockLocked { header, body: body.transactions, ommers: body.ommers.into_iter().map(|o| o.seal()).collect(), - } + }) }) - .collect::>() + .collect::>() ); } ); @@ -221,8 +266,10 @@ mod tests { Arc::new(TestConsensus::default()), ); + let headers = &[Header { ommers_hash: H256::default(), ..Default::default() }.seal()]; + let mut stream = downloader.bodies_stream(headers); assert_matches!( - downloader.bodies_stream(&[SealedHeader::default()]).next().await, + stream.next().await, Some(Err(DownloadError::RequestError(RequestError::ChannelClosed))) ); } @@ -251,10 +298,11 @@ mod tests { Arc::new(TestConsensus::default()), ); + let header = Header { ommers_hash: H256::default(), ..Default::default() }.seal(); assert_matches!( - downloader.bodies_stream(&[SealedHeader::default()]).next().await, - Some(Ok(body)) => { - assert_eq!(body, BlockLocked::default()); + downloader.bodies_stream(&[header.clone()]).next().await, + Some(Ok(BlockResponse::Full(block))) => { + assert_eq!(block.header, header); } ); assert_eq!(Arc::try_unwrap(retries_left).unwrap().into_inner(), 0); @@ -286,7 +334,12 @@ mod tests { .with_retries(0); assert_matches!( - downloader.bodies_stream(&[SealedHeader::default()]).next().await, + downloader + .bodies_stream(&[ + Header { ommers_hash: H256::default(), ..Default::default() }.seal() + ]) + .next() + .await, Some(Err(DownloadError::RequestError(RequestError::Timeout))) ); assert_eq!(Arc::try_unwrap(retries_left).unwrap().into_inner(), 2); diff --git a/crates/net/downloaders/src/test_utils.rs b/crates/net/downloaders/src/test_utils.rs index ecf2af5cd..38fb451c5 100644 --- a/crates/net/downloaders/src/test_utils.rs +++ b/crates/net/downloaders/src/test_utils.rs @@ -65,7 +65,7 @@ where F: FnMut(Vec) -> Fut + Send + Sync, Fut: Future>> + Send, { - async fn get_block_body(&self, hash: Vec) -> PeerRequestResult> { + async fn get_block_bodies(&self, hash: Vec) -> PeerRequestResult> { let f = &mut *self.0.lock().await; (f)(hash).await } diff --git a/crates/net/network/src/fetch/client.rs b/crates/net/network/src/fetch/client.rs index 0e3c57daa..1168b5b0c 100644 --- a/crates/net/network/src/fetch/client.rs +++ b/crates/net/network/src/fetch/client.rs @@ -41,7 +41,7 @@ impl HeadersClient for FetchClient { #[async_trait::async_trait] impl BodiesClient for FetchClient { - async fn get_block_body(&self, request: Vec) -> PeerRequestResult> { + async fn get_block_bodies(&self, request: Vec) -> PeerRequestResult> { let (response, rx) = oneshot::channel(); self.request_tx.send(DownloadRequest::GetBlockBodies { request, response })?; rx.await? diff --git a/crates/net/network/tests/it/requests.rs b/crates/net/network/tests/it/requests.rs index 400ee48d8..5bccc6ce0 100644 --- a/crates/net/network/tests/it/requests.rs +++ b/crates/net/network/tests/it/requests.rs @@ -64,7 +64,7 @@ async fn test_get_body() { mock_provider.add_block(block_hash, block.clone()); - let res = fetch0.get_block_body(vec![block_hash]).await; + let res = fetch0.get_block_bodies(vec![block_hash]).await; assert!(res.is_ok()); let blocks = res.unwrap().1; diff --git a/crates/primitives/src/header.rs b/crates/primitives/src/header.rs index 1b3dd5253..7a8d5eaf5 100644 --- a/crates/primitives/src/header.rs +++ b/crates/primitives/src/header.rs @@ -100,6 +100,11 @@ impl Header { H256::from_slice(keccak256(&out).as_slice()) } + /// Checks if the header is empty - has no transactions and no ommers + pub fn is_empty(&self) -> bool { + self.ommers_hash == EMPTY_LIST_HASH && self.transactions_root == EMPTY_ROOT + } + /// Calculate hash and seal the Header so that it can't be changed. pub fn seal(self) -> SealedHeader { let hash = self.hash_slow(); diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index dd3594370..055262152 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -70,6 +70,8 @@ pub type ChainId = u64; pub type StorageKey = H256; /// An account storage value. pub type StorageValue = U256; +/// The ID of block/transaction transition (represents state transition) +pub type TransitionId = u64; pub use ethers_core::{ types as rpc, diff --git a/crates/stages/src/db.rs b/crates/stages/src/db.rs index 13e5f1d33..6ca8c3ca9 100644 --- a/crates/stages/src/db.rs +++ b/crates/stages/src/db.rs @@ -1,3 +1,4 @@ +#![allow(dead_code)] use std::{ fmt::Debug, ops::{Deref, DerefMut}, @@ -6,13 +7,13 @@ use std::{ use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, database::{Database, DatabaseGAT}, - models::{BlockNumHash, NumTransactions}, + models::{BlockNumHash, StoredBlockBody}, table::Table, tables, transaction::{DbTx, DbTxMut}, Error, }; -use reth_primitives::{BlockHash, BlockNumber, TxNumber}; +use reth_primitives::{BlockHash, BlockNumber, TransitionId, TxNumber}; use crate::{DatabaseIntegrityError, StageError}; @@ -118,77 +119,62 @@ where Ok((number, self.get_block_hash(number)?).into()) } - /// Query [tables::CumulativeTxCount] table for total transaction - /// count block by [BlockNumHash] key - pub(crate) fn get_tx_count(&self, key: BlockNumHash) -> Result { - let count = self.get::(key)?.ok_or( - DatabaseIntegrityError::CumulativeTxCount { number: key.number(), hash: key.hash() }, - )?; - Ok(count) + /// Query the block body by [BlockNumHash] key + pub(crate) fn get_block_body(&self, key: BlockNumHash) -> Result { + let body = self + .get::(key)? + .ok_or(DatabaseIntegrityError::BlockBody { number: key.number() })?; + Ok(body) } - /// Get id for the first **potential** transaction in a block by looking up - /// the cumulative transaction count at the previous block. - /// - /// This function does not care whether the block is empty. - pub(crate) fn get_first_tx_id(&self, block: BlockNumber) -> Result { - // Handle genesis block + /// Query the block body by number + pub(crate) fn get_block_body_by_num( + &self, + number: BlockNumber, + ) -> Result { + let key = self.get_block_numhash(number)?; + self.get_block_body(key) + } + + /// Query the last transition of the block by [BlockNumHash] key + pub(crate) fn get_block_transition( + &self, + key: BlockNumHash, + ) -> Result { + let last_transition_id = self.get::(key)?.ok_or( + DatabaseIntegrityError::BlockTransition { number: key.number(), hash: key.hash() }, + )?; + Ok(last_transition_id) + } + + /// Query the last transition of the block by number + pub(crate) fn get_block_transition_by_num( + &self, + number: BlockNumber, + ) -> Result { + let key = self.get_block_numhash(number)?; + self.get_block_transition(key) + } + + /// Get the next start transaction id and transition for the `block` by looking at the previous + /// block. Returns Zero/Zero for Genesis. + pub(crate) fn get_next_block_ids( + &self, + block: BlockNumber, + ) -> Result<(TxNumber, TransitionId), StageError> { if block == 0 { - return Ok(0) + return Ok((0, 0)) } let prev_key = self.get_block_numhash(block - 1)?; - self.get_tx_count(prev_key) - } - - /// Get id of the last transaction in the block. - /// Returns [None] if the block is empty. - /// - /// The blocks must exist in the database. - #[allow(dead_code)] - pub(crate) fn get_last_tx_id( - &self, - block: BlockNumber, - ) -> Result, StageError> { - let key = self.get_block_numhash(block)?; - - let mut cursor = self.cursor::()?; - let (_, tx_count) = - cursor.seek_exact(key)?.ok_or(DatabaseIntegrityError::CumulativeTxCount { - number: key.number(), - hash: key.hash(), - })?; - - let is_empty = { - if block != 0 { - let (_, prev_tx_count) = - cursor.prev()?.ok_or(DatabaseIntegrityError::CumulativeTxCount { - number: key.number() + 1, - hash: self.get_block_hash(key.number() + 1)?, - })?; - tx_count != prev_tx_count - } else { - tx_count == 0 - } - }; - - Ok(if !is_empty { Some(tx_count - 1) } else { None }) - } - - /// Get id of the latest transaction observed before a given block (inclusive). - /// Returns error if there are no transactions in the database. - pub(crate) fn get_latest_tx_id( - &self, - up_to_block: BlockNumber, - ) -> Result { - let key = self.get_block_numhash(up_to_block)?; - let tx_count = self.get_tx_count(key)?; - if tx_count != 0 { - Ok(tx_count - 1) - } else { - // No transactions in the database - Err(DatabaseIntegrityError::Transaction { id: 0 }.into()) - } + let prev_body = self.get_block_body(prev_key)?; + let last_transition = self.get::(prev_key)?.ok_or( + DatabaseIntegrityError::BlockTransition { + number: prev_key.number(), + hash: prev_key.hash(), + }, + )?; + Ok((prev_body.start_tx_id + prev_body.tx_count, last_transition + 1)) } /// Unwind table by some number key diff --git a/crates/stages/src/error.rs b/crates/stages/src/error.rs index 8a7b2a04a..3b3ec20f1 100644 --- a/crates/stages/src/error.rs +++ b/crates/stages/src/error.rs @@ -1,6 +1,6 @@ use crate::pipeline::PipelineEvent; use reth_interfaces::{consensus, db::Error as DbError, executor}; -use reth_primitives::{BlockNumber, TxNumber, H256}; +use reth_primitives::{BlockHash, BlockNumber, TxNumber, H256}; use thiserror::Error; use tokio::sync::mpsc::error::SendError; @@ -76,27 +76,27 @@ pub enum DatabaseIntegrityError { number: BlockNumber, }, /// A header is missing from the database. - #[error("No header for block #{number} ({hash})")] + #[error("No header for block #{number} ({hash:?})")] Header { /// The block number key number: BlockNumber, /// The block hash key hash: H256, }, - /// The cumulative transaction count is missing from the database. - #[error("No cumulative tx count for #{number} ({hash})")] - CumulativeTxCount { - /// The block number key - number: BlockNumber, - /// The block hash key - hash: H256, - }, /// A block body is missing. #[error("Block body not found for block #{number}")] BlockBody { /// The block number key number: BlockNumber, }, + /// The transaction is missing + #[error("Transaction #{id} not found")] + Transaction { + /// The transaction id + id: TxNumber, + }, + #[error("Block transition not found for block #{number} ({hash:?})")] + BlockTransition { number: BlockNumber, hash: BlockHash }, #[error("Gap in transaction table. Missing tx number #{missing}.")] TransactionsGap { missing: TxNumber }, #[error("Gap in transaction signer table. Missing tx number #{missing}.")] @@ -111,12 +111,6 @@ pub enum DatabaseIntegrityError { /// The block number key number: BlockNumber, }, - /// The transaction is missing - #[error("Transaction #{id} not found")] - Transaction { - /// The transaction id - id: TxNumber, - }, } /// A pipeline execution error. diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index 3fa538235..f8dcbaa6a 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -6,15 +6,15 @@ use futures_util::StreamExt; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, database::{Database, DatabaseGAT}, - models::StoredBlockOmmers, + models::{StoredBlockBody, StoredBlockOmmers}, tables, transaction::{DbTx, DbTxMut}, }; -use reth_interfaces::{consensus::Consensus, p2p::bodies::downloader::BodyDownloader}; -use reth_primitives::{ - proofs::{EMPTY_LIST_HASH, EMPTY_ROOT}, - BlockNumber, SealedHeader, +use reth_interfaces::{ + consensus::Consensus, + p2p::bodies::downloader::{BlockResponse, BodyDownloader}, }; +use reth_primitives::{BlockNumber, SealedHeader}; use std::{fmt::Debug, sync::Arc}; use tracing::{error, warn}; @@ -87,31 +87,35 @@ impl Stage for BodyStage(db, starting_block, target)?; - // Cursors used to write bodies and transactions + // Cursors used to write bodies, ommers and transactions + let mut body_cursor = db.cursor_mut::()?; let mut ommers_cursor = db.cursor_mut::()?; let mut tx_cursor = db.cursor_mut::()?; - let mut tx_count_cursor = db.cursor_mut::()?; + + // Cursors used to write state transition mapping + let mut block_transition_cursor = db.cursor_mut::()?; + let mut tx_transition_cursor = db.cursor_mut::()?; // Get id for the first transaction in the block - let mut first_tx_id = db.get_first_tx_id(starting_block)?; + let (mut current_tx_id, mut transition_id) = db.get_next_block_ids(starting_block)?; // NOTE(onbjerg): The stream needs to live here otherwise it will just create a new iterator // on every iteration of the while loop -_- let mut bodies_stream = self.downloader.bodies_stream(bodies_to_download.iter()); - let mut highest_block = previous_block; + let mut highest_block = stage_progress; while let Some(result) = bodies_stream.next().await { - let Ok(block) = result else { + let Ok(response) = result else { error!( "Encountered an error downloading block {}: {:?}", highest_block + 1, @@ -123,34 +127,61 @@ impl Stage for BodyStage(transaction.hash(), first_tx_id)?; - // Append the transaction - tx_cursor.append(first_tx_id, transaction)?; - first_tx_id += 1; + // Write block + let block_header = response.header(); + let block_number = block_header.number; + let block_key = block_header.num_hash().into(); + + match response { + BlockResponse::Full(block) => { + body_cursor.append( + block_key, + StoredBlockBody { + start_tx_id: current_tx_id, + tx_count: block.body.len() as u64, + }, + )?; + ommers_cursor.append( + block_key, + StoredBlockOmmers { + ommers: block + .ommers + .into_iter() + .map(|header| header.unseal()) + .collect(), + }, + )?; + + // Write transactions + for transaction in block.body { + // Insert the transaction hash to number mapping + db.put::(transaction.hash(), current_tx_id)?; + // Append the transaction + tx_cursor.append(current_tx_id, transaction)?; + tx_transition_cursor.append(current_tx_id, transition_id)?; + current_tx_id += 1; + transition_id += 1; + } + } + BlockResponse::Empty(_) => { + body_cursor.append( + block_key, + StoredBlockBody { start_tx_id: current_tx_id, tx_count: 0 }, + )?; + } + }; + + // The block transition marks the final state at the end of the block. + // Increment the transition if the block contains an addition block reward. + // If the block does not have a reward, the transition will be the same as the + // transition at the last transaction of this block. + if self.consensus.has_block_reward(block_number) { + transition_id += 1; } + block_transition_cursor.append(block_key, transition_id)?; highest_block = block_number; - first_tx_id = this_tx_count; } // The stage is "done" if: @@ -168,38 +199,51 @@ impl Stage for BodyStage, input: UnwindInput, ) -> Result> { - let mut tx_count_cursor = db.cursor_mut::()?; - let mut block_ommers_cursor = db.cursor_mut::()?; + // Cursors to unwind bodies, ommers, transactions and tx hash to number + let mut body_cursor = db.cursor_mut::()?; + let mut ommers_cursor = db.cursor_mut::()?; let mut transaction_cursor = db.cursor_mut::()?; let mut tx_hash_number_cursor = db.cursor_mut::()?; + // Cursors to unwind transitions + let mut block_transition_cursor = db.cursor_mut::()?; + let mut tx_transition_cursor = db.cursor_mut::()?; - let mut entry = tx_count_cursor.last()?; - while let Some((key, count)) = entry { + // let mut entry = tx_count_cursor.last()?; + let mut entry = body_cursor.last()?; + while let Some((key, body)) = entry { if key.number() <= input.unwind_to { break } - // First delete the current and find the previous cum tx count value - tx_count_cursor.delete_current()?; - entry = tx_count_cursor.prev()?; - - if block_ommers_cursor.seek_exact(key)?.is_some() { - block_ommers_cursor.delete_current()?; + // Delete the ommers value if any + if ommers_cursor.seek_exact(key)?.is_some() { + ommers_cursor.delete_current()?; } - let prev_count = entry.map(|(_, v)| v).unwrap_or_default(); - for tx_id in prev_count..count { - // Block reward introduces gaps in transaction (Last tx number can be the gap) - // this is why we are checking if tx exist or not. - // NOTE: more performant way is probably to use `prev`/`next` fn. and reduce - // count by one if block has block reward. + // Delete the block transition if any + if block_transition_cursor.seek_exact(key)?.is_some() { + block_transition_cursor.delete_current()?; + } + + // Delete all transactions that belong to this block + for tx_id in body.tx_id_range() { + // First delete the transaction and hash to id mapping if let Some((_, transaction)) = transaction_cursor.seek_exact(tx_id)? { transaction_cursor.delete_current()?; if tx_hash_number_cursor.seek_exact(transaction.hash)?.is_some() { tx_hash_number_cursor.delete_current()?; } } + // Delete the transaction transition if any + if tx_transition_cursor.seek_exact(tx_id)?.is_some() { + tx_transition_cursor.delete_current()?; + } } + + // Delete the current body value + body_cursor.delete_current()?; + // Move the cursor to the previous value + entry = body_cursor.prev()?; } Ok(UnwindOutput { stage_progress: input.unwind_to }) @@ -228,14 +272,6 @@ impl BodyStage { let (_, header) = header_cursor.seek_exact((block_number, header_hash).into())?.ok_or( DatabaseIntegrityError::Header { number: block_number, hash: header_hash }, )?; - - if header.ommers_hash == EMPTY_LIST_HASH && header.transactions_root == EMPTY_ROOT { - // TODO: fix this - // If we indeed move to the new changeset structure let's not forget to add a note - // that the gaps issue with the returned empty bodies stream is no longer present - continue - } - bodies_to_download.push(SealedHeader::new(header, header_hash)); } @@ -503,14 +539,17 @@ mod tests { use assert_matches::assert_matches; use reth_db::{ cursor::DbCursorRO, - models::{BlockNumHash, NumTransactions, StoredBlockOmmers}, + models::{BlockNumHash, StoredBlockBody, StoredBlockOmmers}, tables, transaction::{DbTx, DbTxMut}, }; use reth_eth_wire::BlockBody; use reth_interfaces::{ p2p::{ - bodies::{client::BodiesClient, downloader::BodyDownloader}, + bodies::{ + client::BodiesClient, + downloader::{BlockResponse, BodyDownloader}, + }, downloader::{DownloadClient, DownloadStream, Downloader}, error::{DownloadResult, PeerRequestResult}, }, @@ -519,7 +558,7 @@ mod tests { TestConsensus, }, }; - use reth_primitives::{BlockLocked, BlockNumber, Header, SealedHeader, H256}; + use reth_primitives::{BlockLocked, BlockNumber, SealedHeader, TxNumber, H256}; use std::{collections::HashMap, sync::Arc}; /// The block hash of the genesis block. @@ -589,7 +628,6 @@ mod tests { type Seed = Vec; fn seed_execution(&mut self, input: ExecInput) -> Result { - self.insert_genesis()?; let start = input.stage_progress.unwrap_or_default(); let end = input.previous_stage_progress() + 1; let blocks = random_block_range(start..end, GENESIS_HASH); @@ -598,20 +636,27 @@ mod tests { // Insert last progress data self.db.commit(|tx| { let key = (progress.number, progress.hash()).into(); - let last_count = tx - .cursor::()? - .last()? - .map(|(_, v)| v) - .unwrap_or_default(); - // +1 for block reward, - let tx_count = last_count + progress.body.len() as u64 + 1; - tx.put::(key, tx_count)?; - tx.put::(key, StoredBlockOmmers { ommers: vec![] })?; - (last_count..tx_count).try_for_each(|idx| { + let body = StoredBlockBody { + start_tx_id: 0, + tx_count: progress.body.len() as u64, + }; + body.tx_id_range().try_for_each(|tx_id| { let transaction = random_signed_tx(); - tx.put::(transaction.hash(), idx)?; - tx.put::(idx, transaction) - }) + tx.put::(transaction.hash(), tx_id)?; + tx.put::(tx_id, transaction)?; + tx.put::(tx_id, tx_id) + })?; + + // Randomize rewards + let has_reward: bool = rand::random(); + let last_transition_id = progress.body.len().saturating_sub(1) as u64; + let block_transition_id = + last_transition_id + if has_reward { 1 } else { 0 }; + + tx.put::(key, block_transition_id)?; + tx.put::(key, body)?; + tx.put::(key, StoredBlockOmmers { ommers: vec![] })?; + Ok(()) })?; } self.set_responses(blocks.iter().map(body_by_hash).collect()); @@ -633,16 +678,23 @@ mod tests { impl UnwindStageTestRunner for BodyTestRunner { fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> { + self.db.check_no_entry_above::(input.unwind_to, |key| { + key.number() + })?; self.db.check_no_entry_above::(input.unwind_to, |key| { key.number() })?; - self.db.check_no_entry_above::( + self.db.check_no_entry_above::( input.unwind_to, |key| key.number(), )?; - if let Some(last_tx_id) = self.last_count() { + if let Some(last_tx_id) = self.get_last_tx_id()? { self.db .check_no_entry_above::(last_tx_id, |key| key)?; + self.db.check_no_entry_above::( + last_tx_id, + |key| key, + )?; self.db.check_no_entry_above_by_value::( last_tx_id, |value| value, @@ -653,28 +705,18 @@ mod tests { } impl BodyTestRunner { - /// Insert the genesis block into the appropriate tables - /// - /// The genesis block always has no transactions and no ommers, and it always has the - /// same hash. - pub(crate) fn insert_genesis(&self) -> Result<(), TestRunnerError> { - let header = SealedHeader::new(Header::default(), GENESIS_HASH); - self.db.insert_headers(std::iter::once(&header))?; - self.db.commit(|tx| { - let key = (0, GENESIS_HASH).into(); - tx.put::(key, 0)?; - tx.put::(key, StoredBlockOmmers { ommers: vec![] }) + /// Get the last available tx id if any + pub(crate) fn get_last_tx_id(&self) -> Result, TestRunnerError> { + let last_body = self.db.query(|tx| { + let v = tx.cursor::()?.last()?; + Ok(v) })?; - - Ok(()) - } - - /// Retrieve the last tx count from the database - pub(crate) fn last_count(&self) -> Option { - self.db - .query(|tx| Ok(tx.cursor::()?.last()?.map(|e| e.1))) - .ok() - .flatten() + Ok(match last_body { + Some((_, body)) if body.tx_count != 0 => { + Some(body.start_tx_id + body.tx_count - 1) + } + _ => None, + }) } /// Validate that the inserted block data is valid @@ -685,26 +727,27 @@ mod tests { ) -> Result<(), TestRunnerError> { self.db.query(|tx| { // Acquire cursors on body related tables + let mut bodies_cursor = tx.cursor::()?; let mut ommers_cursor = tx.cursor::()?; - let mut tx_count_cursor = tx.cursor::()?; + let mut block_transition_cursor = tx.cursor::()?; let mut transaction_cursor = tx.cursor::()?; let mut tx_hash_num_cursor = tx.cursor::()?; + let mut tx_transition_cursor = tx.cursor::()?; - let first_tx_count_key = match tx_count_cursor.first()? { + let first_body_key = match bodies_cursor.first()? { Some((key, _)) => key, None => return Ok(()), }; - let walker = tx_count_cursor.walk(first_tx_count_key)?.peekable(); - let mut prev_entry: Option<(BlockNumHash, NumTransactions)> = None; - for entry in walker { - let (key, count) = entry?; + let mut prev_key: Option = None; + for entry in bodies_cursor.walk(first_body_key)? { + let (key, body) = entry?; // Validate sequentiality only after prev progress, // since the data before is mocked and can contain gaps if key.number() > prev_progress { - if let Some((prev_key, _)) = prev_entry { - assert_eq!(prev_key.number() + 1, key.number(), "Tx count entries must be sequential"); + if let Some(prev_key) = prev_key { + assert_eq!(prev_key.number() + 1, key.number(), "Body entries must be sequential"); } } @@ -718,21 +761,23 @@ mod tests { // Validate that ommers exist assert_matches!(ommers_cursor.seek_exact(key), Ok(Some(_)), "Block ommers are missing"); - // Validate that block trasactions exist - let first_tx_id = prev_entry.map(|(_, v)| v).unwrap_or_default(); - // reduce by one for block_reward index - let tx_count = if count == 0 { 0 } else { count - 1 }; - for tx_id in first_tx_id..tx_count { + // Validate that block transition exists + assert_matches!(block_transition_cursor.seek_exact(key), Ok(Some(_)), "Block transition is missing"); + + for tx_id in body.tx_id_range() { let tx_entry = transaction_cursor.seek_exact(tx_id)?; - assert!(tx_entry.is_some(), "A transaction is missing."); + assert!(tx_entry.is_some(), "Transaction is missing."); + assert_matches!( + tx_transition_cursor.seek_exact(tx_id), Ok(Some(_)), "Transaction transition is missing" + ); assert_matches!( tx_hash_num_cursor.seek_exact(tx_entry.unwrap().1.hash), Ok(Some(_)), - "A transaction hash to index mapping is missing." + "Transaction hash to index mapping is missing." ); } - prev_entry = Some((key, count)); + prev_key = Some(key); } Ok(()) })?; @@ -753,7 +798,7 @@ mod tests { #[async_trait::async_trait] impl BodiesClient for NoopClient { - async fn get_block_body(&self, _: Vec) -> PeerRequestResult> { + async fn get_block_bodies(&self, _: Vec) -> PeerRequestResult> { panic!("Noop client should not be called") } } @@ -785,7 +830,7 @@ mod tests { } impl BodyDownloader for TestBodyDownloader { - fn bodies_stream<'a, 'b, I>(&'a self, hashes: I) -> DownloadStream<'a, BlockLocked> + fn bodies_stream<'a, 'b, I>(&'a self, hashes: I) -> DownloadStream<'a, BlockResponse> where I: IntoIterator, ::IntoIter: Send + 'b, @@ -797,11 +842,11 @@ mod tests { .get(&header.hash()) .expect("Stage tried downloading a block we do not have.") .clone()?; - Ok(BlockLocked { + Ok(BlockResponse::Full(BlockLocked { header: header.clone(), body: result.transactions, ommers: result.ommers.into_iter().map(|header| header.seal()).collect(), - }) + })) }))) } } diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index d39405bee..9f3caf72c 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -3,9 +3,9 @@ use crate::{ UnwindInput, UnwindOutput, }; use reth_db::{ - cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, + cursor::{DbCursorRO, DbCursorRW}, database::Database, - models::{BlockNumHash, TxNumberAddress}, + models::{BlockNumHash, StoredBlockBody, TransitionIdAddress}, tables, transaction::{DbTx, DbTxMut}, }; @@ -15,9 +15,9 @@ use reth_executor::{ revm_wrap::{State, SubState}, Config, }; -use reth_primitives::{Address, StorageEntry, TransactionSignedEcRecovered, H256, U256}; +use reth_primitives::{Address, Header, StorageEntry, TransactionSignedEcRecovered, H256, U256}; use reth_provider::StateProviderImplRefLatest; -use std::{fmt::Debug, ops::DerefMut}; +use std::fmt::Debug; const EXECUTION: StageId = StageId("Execution"); @@ -83,114 +83,61 @@ impl Stage for ExecutionStage { db: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - let db_tx = db.deref_mut(); // none and zero are same as for genesis block (zeroed block) we are making assumption to // not have transaction. let last_block = input.stage_progress.unwrap_or_default(); let start_block = last_block + 1; // Get next canonical block hashes to execute. - let mut canonicals = db_tx.cursor::()?; + let mut canonicals = db.cursor::()?; // Get header with canonical hashes. - let mut headers = db_tx.cursor::()?; - // Get bodies (to get tx index) with canonical hashes. - let mut cumulative_tx_count = db_tx.cursor::()?; + let mut headers = db.cursor::()?; + // Get bodies with canonical hashes. + let mut bodies_cursor = db.cursor::()?; // Get transaction of the block that we are executing. - let mut tx = db_tx.cursor::()?; + let mut tx = db.cursor::()?; // Skip sender recovery and load signer from database. - let mut tx_sender = db_tx.cursor::()?; + let mut tx_sender = db.cursor::()?; // get canonical blocks (num,hash) let canonical_batch = canonicals .walk(start_block)? - .take(BATCH_SIZE as usize) + .take(BATCH_SIZE as usize) // TODO: commit_threshold .map(|i| i.map(BlockNumHash)) .collect::, _>>()?; // no more canonical blocks, we are done with execution. if canonical_batch.is_empty() { - return Ok(ExecOutput { done: true, reached_tip: true, stage_progress: last_block }) + return Ok(ExecOutput { stage_progress: last_block, done: true, reached_tip: true }) } - // get headers from canonical numbers - let headers_batch = canonical_batch + // Get block headers and bodies from canonical hashes + let block_batch = canonical_batch .iter() - .map(|ch_index| { + .map(|key| -> Result<(Header, StoredBlockBody), StageError> { // TODO see if walker next has better performance then seek_exact calls. - headers.seek_exact(*ch_index).map_err(StageError::Database).and_then(|res| { - res.ok_or_else(|| { - DatabaseIntegrityError::Header { - number: ch_index.number(), - hash: ch_index.hash(), - } - .into() - }) - .map(|(_, header)| header) - }) - }) - .collect::, _>>()?; - - // get last tx count so that we can know amount of transaction in the block. - let mut last_tx_index = if last_block == 0 { - 0u64 - } else { - // headers_batch is not empty, - let parent_hash = headers_batch[0].parent_hash; - - let (_, tx_cnt) = cumulative_tx_count - .seek_exact(BlockNumHash((last_block, parent_hash)))? - .ok_or(DatabaseIntegrityError::CumulativeTxCount { - number: last_block, - hash: parent_hash, - })?; - tx_cnt - }; - - let tx_index_ranges = canonical_batch - .iter() - .map(|ch_index| { - // TODO see if walker next has better performance then seek_exact calls. - cumulative_tx_count - .seek_exact(*ch_index) - .map_err(StageError::Database) - .and_then(|res| { - res.ok_or_else(|| { - DatabaseIntegrityError::CumulativeTxCount { - number: ch_index.number(), - hash: ch_index.hash(), - } - .into() - }) - }) - .map(|(_, cumulative_tx_count)| { - let ret = if self.config.spec_upgrades.has_block_reward(ch_index.number()) { - // if there is block reward, cumulative tx count needs to remove block - // reward index. It is okay to subtract it, as - // block reward index is calculated in the block stage. - (last_tx_index, cumulative_tx_count - 1, Some(cumulative_tx_count - 1)) - } else { - // if there is no block reward we just need to use tx_count - (last_tx_index, cumulative_tx_count, None) - }; - last_tx_index = cumulative_tx_count; - ret - }) + let (_, header) = + headers.seek_exact(*key)?.ok_or(DatabaseIntegrityError::Header { + number: key.number(), + hash: key.hash(), + })?; + let (_, body) = bodies_cursor + .seek_exact(*key)? + .ok_or(DatabaseIntegrityError::BlockBody { number: key.number() })?; + Ok((header, body)) }) .collect::, _>>()?; // Fetch transactions, execute them and generate results let mut block_change_patches = Vec::with_capacity(canonical_batch.len()); - for (header, (start_tx_index, end_tx_index, block_reward_index)) in - headers_batch.iter().zip(tx_index_ranges.iter()) - { + for (header, body) in block_batch.iter() { let num = header.number; - tracing::trace!(target: "stages::execution",?num, "Execute block num."); - let body_tx_cnt = end_tx_index - start_tx_index; + tracing::trace!(target: "stages::execution", ?num, "Execute block num."); // iterate over all transactions - let mut tx_walker = tx.walk(*start_tx_index)?; - let mut transactions = Vec::with_capacity(body_tx_cnt as usize); + let mut tx_walker = tx.walk(body.start_tx_id)?; + let mut transactions = Vec::with_capacity(body.tx_count as usize); // get next N transactions. - for index in *start_tx_index..*end_tx_index { + for index in body.tx_id_range() { let (tx_index, tx) = tx_walker.next().ok_or(DatabaseIntegrityError::EndOfTransactionTable)??; if tx_index != index { @@ -200,9 +147,9 @@ impl Stage for ExecutionStage { } // take signers - let mut tx_sender_walker = tx_sender.walk(*start_tx_index)?; - let mut signers = Vec::with_capacity(body_tx_cnt as usize); - for index in *start_tx_index..*end_tx_index { + let mut tx_sender_walker = tx_sender.walk(body.start_tx_id)?; + let mut signers = Vec::with_capacity(body.tx_count as usize); + for index in body.tx_id_range() { let (tx_index, tx) = tx_sender_walker .next() .ok_or(DatabaseIntegrityError::EndOfTransactionSenderTable)??; @@ -223,8 +170,7 @@ impl Stage for ExecutionStage { .collect(); // for now use default eth config - - let state_provider = SubState::new(State::new(StateProviderImplRefLatest::new(db_tx))); + let state_provider = SubState::new(State::new(StateProviderImplRefLatest::new(&**db))); let change_set = std::thread::scope(|scope| { let handle = std::thread::Builder::new() @@ -244,59 +190,59 @@ impl Stage for ExecutionStage { handle.join().expect("Expects for thread to not panic") }) .map_err(|error| StageError::ExecutionError { block: header.number, error })?; - block_change_patches.push((change_set, start_tx_index, block_reward_index)); + block_change_patches.push(change_set); } + // Get last tx count so that we can know amount of transaction in the block. + let mut current_transition_id = db.get_block_transition_by_num(last_block)? + 1; + // apply changes to plain database. - for (results, start_tx_index, block_reward_index) in block_change_patches.into_iter() { + for results in block_change_patches.into_iter() { // insert state change set - for (index, result) in results.changeset.into_iter().enumerate() { - let tx_index = start_tx_index + index as u64; - for (address, AccountChangeSet { account, wipe_storage, storage }) in - result.state_diff.into_iter() - { + for result in results.changeset.into_iter() { + // TODO insert to transitionId to tx_index + for (address, account_change_set) in result.state_diff.into_iter() { + let AccountChangeSet { account, wipe_storage, storage } = account_change_set; // apply account change to db. Updates AccountChangeSet and PlainAccountState // tables. - account.apply_to_db(address, tx_index, db_tx)?; + account.apply_to_db(&**db, address, current_transition_id)?; // wipe storage if wipe_storage { // TODO insert all changes to StorageChangeSet - db_tx.delete::(address, None)?; + db.delete::(address, None)?; } // insert storage changeset - let storage_id = TxNumberAddress((tx_index, address)); + let storage_id = TransitionIdAddress((current_transition_id, address)); for (key, (old_value, new_value)) in storage { let mut hkey = H256::zero(); key.to_big_endian(&mut hkey.0); // insert into StorageChangeSet - db_tx.put::( + db.put::( storage_id.clone(), StorageEntry { key: hkey, value: old_value }, )?; if new_value.is_zero() { - db_tx.delete::( + db.delete::( address, Some(StorageEntry { key: hkey, value: old_value }), )?; } else { - db_tx.put::( + db.put::( address, StorageEntry { key: hkey, value: new_value }, )?; } } + current_transition_id += 1; } // insert bytecode for (hash, bytecode) in result.new_bytecodes.into_iter() { // make different types of bytecode. Checked and maybe even analyzed (needs to // be packed). Currently save only raw bytes. - db_tx.put::( - hash, - bytecode.bytes()[..bytecode.len()].to_vec(), - )?; + db.put::(hash, bytecode.bytes()[..bytecode.len()].to_vec())?; // NOTE: bytecode bytes are not inserted in change set and it stand in saparate // table @@ -307,10 +253,10 @@ impl Stage for ExecutionStage { // TODO add apply_block_reward_changeset to db tx fn which maybe takes an option. if let Some(block_reward_changeset) = results.block_reward { // we are sure that block reward index is present. - let block_reward_index = block_reward_index.unwrap(); for (address, changeset) in block_reward_changeset.into_iter() { - changeset.apply_to_db(address, block_reward_index, db_tx)?; + changeset.apply_to_db(&**db, address, current_transition_id)?; } + current_transition_id += 1; } } @@ -325,96 +271,74 @@ impl Stage for ExecutionStage { db: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result> { - let unwind_from = input.stage_progress; - let unwind_to = input.unwind_to; - let _bad_block = input.bad_block; + // Acquire changeset cursors + let mut account_changeset = db.cursor_dup_mut::()?; + let mut storage_changeset = db.cursor_dup_mut::()?; - // get block body tx indexes - let db_tx = db.deref_mut(); + let from_transition = db.get_block_transition_by_num(input.stage_progress)?; - // Get transaction of the block that we are executing. - let mut account_changeset = db_tx.cursor_dup_mut::()?; - // Skip sender recovery and load signer from database. - let mut storage_changeset = db_tx.cursor_dup_mut::()?; - - // get from tx_number - let unwind_from_hash = db_tx - .get::(unwind_from)? - .ok_or(DatabaseIntegrityError::CanonicalHeader { number: unwind_from })?; - - let from_tx_number = db_tx - .get::(BlockNumHash((unwind_from, unwind_from_hash)))? - .ok_or(DatabaseIntegrityError::CumulativeTxCount { - number: unwind_from, - hash: unwind_from_hash, - })?; - - // get to tx_number - let to_tx_number = if unwind_to == 0 { - 0 + let to_transition = if input.unwind_to != 0 { + db.get_block_transition_by_num(input.unwind_to - 1)? } else { - let parent_number = unwind_to - 1; - let parent_hash = db_tx - .get::(parent_number)? - .ok_or(DatabaseIntegrityError::CanonicalHeader { number: parent_number })?; - - db_tx - .get::(BlockNumHash((parent_number, parent_hash)))? - .ok_or(DatabaseIntegrityError::CumulativeTxCount { - number: parent_number, - hash: parent_hash, - })? + 0 }; - if to_tx_number > from_tx_number { - panic!("Parents CumulativeTxCount {to_tx_number} is higer then present block #{unwind_to} TxCount {from_tx_number}") + if to_transition > from_transition { + panic!("Unwind transition {} (stage progress block #{}) is higher than the transition {} of (unwind block #{})", to_transition, input.stage_progress, from_transition, input.unwind_to); } - let num_of_tx = (from_tx_number - to_tx_number) as usize; + let num_of_tx = (from_transition - to_transition) as usize; // if there is no transaction ids, this means blocks were empty and block reward change set // is not present. if num_of_tx == 0 { - return Ok(UnwindOutput { stage_progress: unwind_to }) + return Ok(UnwindOutput { stage_progress: input.unwind_to }) } // get all batches for account change // Check if walk and walk_dup would do the same thing + // TODO(dragan) test walking here let account_changeset_batch = account_changeset - .walk_dup(to_tx_number, Address::zero())? - .take_while(|item| item.as_ref().map_or(false, |(num, _)| *num <= from_tx_number)) + .walk(to_transition)? + .take_while(|item| { + item.as_ref().map(|(num, _)| *num <= from_transition).unwrap_or_default() + }) .collect::, _>>()?; // revert all changes to PlainState for (_, changeset) in account_changeset_batch.into_iter().rev() { // TODO refactor in db fn called tx.aplly_account_changeset if let Some(account_info) = changeset.info { - db_tx.put::(changeset.address, account_info)?; + db.put::(changeset.address, account_info)?; } else { - db_tx.delete::(changeset.address, None)?; + db.delete::(changeset.address, None)?; } } + // TODO(dragan) fix walking here // get all batches for storage change let storage_chageset_batch = storage_changeset - .walk_dup(TxNumberAddress((to_tx_number, Address::zero())), H256::zero())? + .walk((to_transition, Address::zero()).into())? .take_while(|item| { - item.as_ref().map_or(false, |(TxNumberAddress((num, _)), _)| *num <= from_tx_number) + item.as_ref() + .map(|(key, _)| key.transition_id() <= from_transition) + .unwrap_or_default() }) .collect::, _>>()?; // revert all changes to PlainStorage - for (TxNumberAddress((_, address)), storage) in storage_chageset_batch.into_iter().rev() { - db_tx.put::(address, storage.clone())?; + for (key, storage) in storage_chageset_batch.into_iter().rev() { + let address = key.address(); + db.put::(address, storage.clone())?; if storage.value == U256::zero() { // delete value that is zero - db_tx.delete::(address, Some(storage))?; + db.delete::(address, Some(storage))?; } } // Discard unwinded changesets let mut entry = account_changeset.last()?; - while let Some((tx_number, _)) = entry { - if tx_number < to_tx_number { + while let Some((transition_id, _)) = entry { + if transition_id < to_transition { break } account_changeset.delete_current()?; @@ -422,8 +346,8 @@ impl Stage for ExecutionStage { } let mut entry = storage_changeset.last()?; - while let Some((TxNumberAddress((tx_number, _)), _)) = entry { - if tx_number < to_tx_number { + while let Some((key, _)) = entry { + if key.transition_id() < to_transition { break } storage_changeset.delete_current()?; @@ -436,7 +360,7 @@ impl Stage for ExecutionStage { #[cfg(test)] mod tests { - use std::ops::Deref; + use std::ops::{Deref, DerefMut}; use super::*; use reth_db::mdbx::{test_utils::create_test_db, EnvKind, WriteMap}; diff --git a/crates/stages/src/stages/senders.rs b/crates/stages/src/stages/senders.rs index 2d49e2237..aa18c0d9b 100644 --- a/crates/stages/src/stages/senders.rs +++ b/crates/stages/src/stages/senders.rs @@ -59,24 +59,23 @@ impl Stage for SendersStage { input: ExecInput, ) -> Result { let stage_progress = input.stage_progress.unwrap_or_default(); - - // Look up the start index for the transaction range - let start_tx_index = db.get_first_tx_id(stage_progress + 1)?; - - // Look up the end index for transaction range (inclusive) let previous_stage_progress = input.previous_stage_progress(); let max_block_num = previous_stage_progress.min(stage_progress + self.commit_threshold); - let end_tx_index = match db.get_latest_tx_id(max_block_num) { - Ok(id) => id, - // No transactions in the database - Err(_) => { - return Ok(ExecOutput { - stage_progress: max_block_num, - done: true, - reached_tip: true, - }) - } - }; + + if max_block_num <= stage_progress { + return Ok(ExecOutput { stage_progress, reached_tip: true, done: true }) + } + + // Look up the start index for the transaction range + let start_tx_index = db.get_block_body_by_num(stage_progress + 1)?.start_tx_id; + + // Look up the end index for transaction range (inclusive) + let end_tx_index = db.get_block_body_by_num(max_block_num)?.last_tx_index(); + + // No transactions to walk over + if start_tx_index > end_tx_index { + return Ok(ExecOutput { stage_progress: max_block_num, done: true, reached_tip: true }) + } // Acquire the cursor for inserting elements let mut senders_cursor = db.cursor_mut::()?; @@ -117,7 +116,7 @@ impl Stage for SendersStage { input: UnwindInput, ) -> Result> { // Lookup latest tx id that we should unwind to - let latest_tx_id = db.get_latest_tx_id(input.unwind_to).unwrap_or_default(); + let latest_tx_id = db.get_block_body_by_num(input.unwind_to)?.last_tx_index(); db.unwind_table_by_num::(latest_tx_id)?; Ok(UnwindOutput { stage_progress: input.unwind_to }) } @@ -126,6 +125,7 @@ impl Stage for SendersStage { #[cfg(test)] mod tests { use assert_matches::assert_matches; + use reth_db::models::StoredBlockBody; use reth_interfaces::test_utils::generators::random_block_range; use reth_primitives::{BlockLocked, BlockNumber, H256}; @@ -214,25 +214,27 @@ mod tests { let blocks = random_block_range(stage_progress..end, H256::zero()); self.db.commit(|tx| { - let mut base_tx_id = 0; + let mut current_tx_id = 0; blocks.iter().try_for_each(|b| { let txs = b.body.clone(); - let tx_amount = txs.len() as u64; let num_hash = (b.number, b.hash()).into(); tx.put::(b.number, b.hash())?; - tx.put::(num_hash, base_tx_id + tx_amount)?; + tx.put::( + num_hash, + StoredBlockBody { start_tx_id: current_tx_id, tx_count: txs.len() as u64 }, + )?; for body_tx in txs { // Insert senders for previous stage progress if b.number == stage_progress { tx.put::( - base_tx_id, + current_tx_id, body_tx.recover_signer().expect("failed to recover sender"), )?; } - tx.put::(base_tx_id, body_tx)?; - base_tx_id += 1; + tx.put::(current_tx_id, body_tx)?; + current_tx_id += 1; } Ok(()) @@ -257,17 +259,12 @@ mod tests { return Ok(()) } - let mut tx_count_cursor = tx.cursor::()?; + let start_hash = tx.get::(start_block)?.unwrap(); + let mut body_cursor = tx.cursor::()?; + body_cursor.seek_exact((start_block, start_hash).into())?; - let last_block = start_block - 1; - let last_hash = tx.get::(start_block)?.unwrap(); - let mut last_tx_count = tx_count_cursor - .seek_exact((last_block, last_hash).into())? - .map(|(_, v)| v) - .unwrap_or_default(); - - while let Some((_, count)) = tx_count_cursor.next()? { - for tx_id in last_tx_count..count { + while let Some((_, body)) = body_cursor.next()? { + for tx_id in body.tx_id_range() { let transaction = tx .get::(tx_id)? .expect("no transaction entry"); @@ -275,7 +272,6 @@ mod tests { transaction.recover_signer().expect("failed to recover signer"); assert_eq!(Some(signer), tx.get::(tx_id)?); } - last_tx_count = count; } Ok(()) @@ -296,11 +292,13 @@ mod tests { impl SendersTestRunner { fn check_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> { - let latest_tx_id = self.db.inner().get_latest_tx_id(block); - match latest_tx_id { - Ok(last_index) => { - self.db.check_no_entry_above::(last_index, |key| key)? - } + let body_result = self.db.inner().get_block_body_by_num(block); + match body_result { + Ok(body) => self + .db + .check_no_entry_above::(body.last_tx_index(), |key| { + key + })?, Err(_) => { assert!(self.db.table_is_empty::()?); } diff --git a/crates/stages/src/test_utils/macros.rs b/crates/stages/src/test_utils/macros.rs index dc1dfb7b8..4a25ee186 100644 --- a/crates/stages/src/test_utils/macros.rs +++ b/crates/stages/src/test_utils/macros.rs @@ -10,14 +10,13 @@ macro_rules! stage_test_suite { let input = crate::stage::ExecInput::default(); // Run stage execution - let result = runner.execute(input).await.unwrap(); - assert_matches::assert_matches!( - result, - Err(crate::error::StageError::DatabaseIntegrity(_)) - ); + let result = runner.execute(input).await; + // Check that the result is returned and the stage does not panic. + // The return result with empty db is stage-specific. + assert_matches::assert_matches!(result, Ok(_)); // Validate the stage execution - assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation"); + assert!(runner.validate_execution(input, result.unwrap().ok()).is_ok(), "execution validation"); } // Run the complete stage execution flow. @@ -49,13 +48,16 @@ macro_rules! stage_test_suite { assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation"); } - // Check that unwind does not panic on empty database. + // Check that unwind does not panic on no new entries within the input range. #[tokio::test] - async fn unwind_empty_db() { + async fn unwind_no_new_entries() { // Set up the runner - let runner = $runner::default(); + let mut runner = $runner::default(); let input = crate::stage::UnwindInput::default(); + // Seed the database + runner.seed_execution(crate::stage::ExecInput::default()).expect("failed to seed"); + // Run stage unwind let rx = runner.unwind(input).await; assert_matches::assert_matches!( diff --git a/crates/storage/db/src/tables/codecs/compact.rs b/crates/storage/db/src/tables/codecs/compact.rs index a346ce23e..8a3b96eac 100644 --- a/crates/storage/db/src/tables/codecs/compact.rs +++ b/crates/storage/db/src/tables/codecs/compact.rs @@ -40,6 +40,7 @@ impl_compression_for_compact!( Receipt, TxType, StorageEntry, + StoredBlockBody, StoredBlockOmmers ); impl_compression_for_compact!(AccountBeforeTx, TransactionSigned); diff --git a/crates/storage/db/src/tables/codecs/fuzz/mod.rs b/crates/storage/db/src/tables/codecs/fuzz/mod.rs index 4c34a0cce..78e1b04c5 100644 --- a/crates/storage/db/src/tables/codecs/fuzz/mod.rs +++ b/crates/storage/db/src/tables/codecs/fuzz/mod.rs @@ -86,5 +86,5 @@ macro_rules! impl_fuzzer_value_with_input { }; } -impl_fuzzer_key!(BlockNumHash, TxNumberAddress); +impl_fuzzer_key!(BlockNumHash, TransitionIdAddress); impl_fuzzer_value_with_input!((IntegerList, IntegerListInput)); diff --git a/crates/storage/db/src/tables/mod.rs b/crates/storage/db/src/tables/mod.rs index 6a3ff12e6..b294c8d6d 100644 --- a/crates/storage/db/src/tables/mod.rs +++ b/crates/storage/db/src/tables/mod.rs @@ -10,17 +10,19 @@ use crate::{ tables::{ codecs::CompactU256, models::{ - accounts::{AccountBeforeTx, TxNumberAddress}, - blocks::{HeaderHash, NumTransactions, StoredBlockOmmers}, + accounts::{AccountBeforeTx, TransitionIdAddress}, + blocks::{HeaderHash, StoredBlockOmmers}, BlockNumHash, ShardedKey, }, }, }; use reth_primitives::{ Account, Address, BlockHash, BlockNumber, Header, IntegerList, Receipt, StorageEntry, - TransactionSigned, TxHash, TxNumber, H256, + TransactionSigned, TransitionId, TxHash, TxNumber, H256, }; +use self::models::StoredBlockBody; + /// Enum for the types of tables present in libmdbx. #[derive(Debug)] pub enum TableType { @@ -31,13 +33,13 @@ pub enum TableType { } /// Default tables that should be present inside database. -pub const TABLES: [(TableType, &str); 21] = [ +pub const TABLES: [(TableType, &str); 23] = [ (TableType::Table, CanonicalHeaders::const_name()), (TableType::Table, HeaderTD::const_name()), (TableType::Table, HeaderNumbers::const_name()), (TableType::Table, Headers::const_name()), + (TableType::Table, BlockBodies::const_name()), (TableType::Table, BlockOmmers::const_name()), - (TableType::Table, CumulativeTxCount::const_name()), (TableType::Table, NonCanonicalTransactions::const_name()), (TableType::Table, Transactions::const_name()), (TableType::Table, TxHashNumber::const_name()), @@ -46,6 +48,8 @@ pub const TABLES: [(TableType, &str); 21] = [ (TableType::Table, PlainAccountState::const_name()), (TableType::DupSort, PlainStorageState::const_name()), (TableType::Table, Bytecodes::const_name()), + (TableType::Table, BlockTransitionIndex::const_name()), + (TableType::Table, TxTransitionIndex::const_name()), (TableType::Table, AccountHistory::const_name()), (TableType::Table, StorageHistory::const_name()), (TableType::DupSort, AccountChangeSet::const_name()), @@ -123,18 +127,14 @@ table!( ); table!( - /// Stores the uncles/ommers of the block. - ( BlockOmmers ) BlockNumHash | StoredBlockOmmers + /// Stores block bodies. + ( BlockBodies ) BlockNumHash | StoredBlockBody ); table!( - /// Stores the maximum [`TxNumber`] from which this particular block starts. - /// - /// Used to collect transactions for the block. e.g. To collect transactions - /// for block `x` you would need to look at cumulative count at block `x` and - /// at block `x - 1`. - ( CumulativeTxCount ) BlockNumHash | NumTransactions -); // TODO U256? + /// Stores the uncles/ommers of the block. + ( BlockOmmers ) BlockNumHash | StoredBlockOmmers +); table!( /// Stores the transaction body from non canonical transactions. @@ -174,6 +174,16 @@ table!( ( Bytecodes ) H256 | Bytecode ); +table!( + /// Stores the mapping of block number to state transition id. + ( BlockTransitionIndex ) BlockNumHash | TransitionId +); + +table!( + /// Stores the mapping of transaction number to state transition id. + ( TxTransitionIndex ) TxNumber | TransitionId +); + dupsort!( /// Stores the current value of a storage key. ( PlainStorageState ) Address | [H256] StorageEntry @@ -234,14 +244,14 @@ dupsort!( /// Stores the state of an account before a certain transaction changed it. /// Change on state can be: account is created, selfdestructed, touched while empty /// or changed (balance,nonce). - ( AccountChangeSet ) TxNumber | [Address] AccountBeforeTx + ( AccountChangeSet ) TransitionId | [Address] AccountBeforeTx ); dupsort!( /// Stores the state of a storage key before a certain transaction changed it. /// If [`StorageEntry::value`] is zero, this means storage was not existing /// and needs to be removed. - ( StorageChangeSet ) TxNumberAddress | [H256] StorageEntry + ( StorageChangeSet ) TransitionIdAddress | [H256] StorageEntry ); table!( diff --git a/crates/storage/db/src/tables/models/accounts.rs b/crates/storage/db/src/tables/models/accounts.rs index fd199ebda..6d8b0f0aa 100644 --- a/crates/storage/db/src/tables/models/accounts.rs +++ b/crates/storage/db/src/tables/models/accounts.rs @@ -7,7 +7,7 @@ use crate::{ }; use bytes::Bytes; use reth_codecs::{main_codec, Compact}; -use reth_primitives::{Account, Address, TxNumber}; +use reth_primitives::{Account, Address, TransitionId}; use serde::{Deserialize, Serialize}; /// Account as it is saved inside [`AccountChangeSet`]. [`Address`] is the subkey. @@ -26,22 +26,32 @@ pub struct AccountBeforeTx { /// /// Since it's used as a key, it isn't compressed when encoding it. #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct TxNumberAddress(pub (TxNumber, Address)); +pub struct TransitionIdAddress(pub (TransitionId, Address)); + +impl TransitionIdAddress { + /// Return the transition id + pub fn transition_id(&self) -> TransitionId { + self.0 .0 + } + + /// Return the address + pub fn address(&self) -> Address { + self.0 .1 + } -impl TxNumberAddress { /// Consumes `Self` and returns [`TxNumber`], [`Address`] - pub fn take(self) -> (TxNumber, Address) { + pub fn take(self) -> (TransitionId, Address) { (self.0 .0, self.0 .1) } } -impl From<(u64, Address)> for TxNumberAddress { +impl From<(u64, Address)> for TransitionIdAddress { fn from(tpl: (u64, Address)) -> Self { - TxNumberAddress(tpl) + TransitionIdAddress(tpl) } } -impl Encode for TxNumberAddress { +impl Encode for TransitionIdAddress { type Encoded = [u8; 28]; fn encode(self) -> Self::Encoded { @@ -56,7 +66,7 @@ impl Encode for TxNumberAddress { } } -impl Decode for TxNumberAddress { +impl Decode for TransitionIdAddress { fn decode>(value: B) -> Result { let value: bytes::Bytes = value.into(); @@ -64,11 +74,11 @@ impl Decode for TxNumberAddress { u64::from_be_bytes(value.as_ref()[..8].try_into().map_err(|_| Error::DecodeError)?); let hash = Address::from_slice(&value.slice(8..)); - Ok(TxNumberAddress((num, hash))) + Ok(TransitionIdAddress((num, hash))) } } -impl_fixed_arbitrary!(TxNumberAddress, 28); +impl_fixed_arbitrary!(TransitionIdAddress, 28); #[cfg(test)] mod test { @@ -80,7 +90,7 @@ mod test { fn test_tx_number_address() { let num = 1u64; let hash = Address::from_str("ba5e000000000000000000000000000000000000").unwrap(); - let key = TxNumberAddress((num, hash)); + let key = TransitionIdAddress((num, hash)); let mut bytes = [0u8; 28]; bytes[..8].copy_from_slice(&num.to_be_bytes()); @@ -89,7 +99,7 @@ mod test { let encoded = Encode::encode(key.clone()); assert_eq!(encoded, bytes); - let decoded: TxNumberAddress = Decode::decode(encoded.to_vec()).unwrap(); + let decoded: TransitionIdAddress = Decode::decode(encoded.to_vec()).unwrap(); assert_eq!(decoded, key); } @@ -97,7 +107,7 @@ mod test { fn test_tx_number_address_rand() { let mut bytes = [0u8; 28]; thread_rng().fill(bytes.as_mut_slice()); - let key = TxNumberAddress::arbitrary(&mut Unstructured::new(&bytes)).unwrap(); + let key = TransitionIdAddress::arbitrary(&mut Unstructured::new(&bytes)).unwrap(); assert_eq!(bytes, Encode::encode(key)); } } diff --git a/crates/storage/db/src/tables/models/blocks.rs b/crates/storage/db/src/tables/models/blocks.rs index 57a509172..84bb47f47 100644 --- a/crates/storage/db/src/tables/models/blocks.rs +++ b/crates/storage/db/src/tables/models/blocks.rs @@ -1,5 +1,7 @@ //! Block related models and types. +use std::ops::Range; + use crate::{ impl_fixed_arbitrary, table::{Decode, Encode}, @@ -7,14 +9,44 @@ use crate::{ }; use bytes::Bytes; use reth_codecs::{main_codec, Compact}; -use reth_primitives::{BlockHash, BlockNumber, Header, H256}; +use reth_primitives::{BlockHash, BlockNumber, Header, TxNumber, H256}; use serde::{Deserialize, Serialize}; -/// Total chain number of transactions. Value for [`CumulativeTxCount`]. -/// -/// Used for collecting transactions for a block. +/// Total chain number of transactions. Value for [`CumulativeTxCount`]. // TODO: pub type NumTransactions = u64; +/// The storage representation of a block. +/// +/// It has the pointer to the transaction Number of the first +/// transaction in the block and the total number of transactions +#[derive(Debug, Default, Eq, PartialEq, Clone)] +#[main_codec] +pub struct StoredBlockBody { + /// The id of the first transaction in this block + pub start_tx_id: TxNumber, + /// The total number of transactions + pub tx_count: NumTransactions, +} + +impl StoredBlockBody { + /// Return the range of transaction ids for this body + pub fn tx_id_range(&self) -> Range { + self.start_tx_id..self.start_tx_id + self.tx_count + } + + /// Return the index of last transaction in this block unless the block + /// is empty in which case it refers to the last transaction in a previous + /// non-empty block + pub fn last_tx_index(&self) -> TxNumber { + self.start_tx_id.saturating_add(self.tx_count).saturating_sub(1) + } + + /// Return a flag whether the block is empty + pub fn is_empty(&self) -> bool { + self.tx_count == 0 + } +} + /// The storage representation of a block ommers. /// /// It is stored as the headers of the block's uncles. diff --git a/crates/storage/provider/src/block.rs b/crates/storage/provider/src/block.rs index b3d30ccf1..9decc095e 100644 --- a/crates/storage/provider/src/block.rs +++ b/crates/storage/provider/src/block.rs @@ -1,6 +1,6 @@ use auto_impl::auto_impl; use reth_db::{ - models::{BlockNumHash, StoredBlockOmmers}, + models::{BlockNumHash, StoredBlockBody, StoredBlockOmmers}, tables, transaction::{DbTx, DbTxMut}, }; @@ -105,28 +105,6 @@ pub struct ChainInfo { pub safe_finalized: Option, } -/// Get value from [tables::CumulativeTxCount] by block hash -/// as the table is indexed by NumHash key we are obtaining number from -/// [tables::HeaderNumbers] -pub fn get_cumulative_tx_count_by_hash<'a, TX: DbTxMut<'a> + DbTx<'a>>( - tx: &TX, - block_hash: H256, -) -> Result { - let block_number = tx - .get::(block_hash)? - .ok_or(ProviderError::BlockHashNotExist { block_hash })?; - - let block_num_hash = BlockNumHash((block_number, block_hash)); - - tx.get::(block_num_hash)?.ok_or_else(|| { - ProviderError::BlockBodyNotExist { - block_number: block_num_hash.number(), - block_hash: block_num_hash.hash(), - } - .into() - }) -} - /// Fill block to database. Useful for tests. /// Check parent dependency in [tables::HeaderNumbers] and in [tables::CumulativeTxCount] tables. /// Inserts blocks data to [tables::CanonicalHeaders], [tables::Headers], [tables::HeaderNumbers], @@ -149,22 +127,49 @@ pub fn insert_canonical_block<'a, TX: DbTxMut<'a> + DbTx<'a>>( StoredBlockOmmers { ommers: block.ommers.iter().map(|h| h.as_ref().clone()).collect() }, )?; - if block.number == 0 { - tx.put::(block_num_hash, 0)?; - } else { - let mut tx_number = get_cumulative_tx_count_by_hash(tx, block.parent_hash)?; - - for eth_tx in block.body.iter() { - let rec_tx = eth_tx.clone().into_ecrecovered().unwrap(); - tx.put::(tx_number, rec_tx.signer())?; - tx.put::(tx_number, rec_tx.into())?; - tx_number += 1; + let (mut current_tx_id, mut transition_id) = { + if block.number == 0 { + (0, 0) + } else { + let prev_block_num = block.number - 1; + let prev_block_hash = tx + .get::(prev_block_num)? + .ok_or(ProviderError::BlockNumber { block_number: prev_block_num })?; + let prev_body = tx + .get::((prev_block_num, prev_block_hash).into())? + .ok_or(ProviderError::BlockBody { + block_number: prev_block_num, + block_hash: prev_block_hash, + })?; + let last_transition_id = tx + .get::((prev_block_num, prev_block_hash).into())? + .ok_or(ProviderError::BlockTransition { + block_number: prev_block_num, + block_hash: prev_block_hash, + })?; + (prev_body.start_tx_id + prev_body.tx_count, last_transition_id + 1) } - tx.put::( - block_num_hash, - tx_number + if has_block_reward { 1 } else { 0 }, - )?; + }; + + // insert body data + tx.put::( + block_num_hash, + StoredBlockBody { start_tx_id: current_tx_id, tx_count: block.body.len() as u64 }, + )?; + + for transaction in block.body.iter() { + let rec_tx = transaction.clone().into_ecrecovered().unwrap(); + tx.put::(current_tx_id, rec_tx.signer())?; + tx.put::(current_tx_id, rec_tx.into())?; + tx.put::(current_tx_id, transition_id)?; + current_tx_id += 1; + transition_id += 1; } + if has_block_reward { + transition_id += 1; + } + tx.put::((block.number, block.hash()).into(), transition_id)?; + Ok(()) } diff --git a/crates/storage/provider/src/db_provider/storage.rs b/crates/storage/provider/src/db_provider/storage.rs index 84060aa4a..ab15671f6 100644 --- a/crates/storage/provider/src/db_provider/storage.rs +++ b/crates/storage/provider/src/db_provider/storage.rs @@ -9,7 +9,8 @@ use reth_db::{ use reth_interfaces::Result; use reth_primitives::{ - Account, Address, BlockHash, BlockNumber, Bytes, StorageKey, StorageValue, TxNumber, H256, U256, + Account, Address, BlockHash, BlockNumber, Bytes, StorageKey, StorageValue, TransitionId, H256, + U256, }; use std::marker::PhantomData; @@ -26,71 +27,68 @@ impl StateProviderFactory for ProviderImpl { // get block hash let block_hash = tx .get::(block_number)? - .ok_or(Error::BlockNumberNotExists { block_number })?; + .ok_or(Error::BlockNumber { block_number })?; - // get transaction number + // get transition id let block_num_hash = (block_number, block_hash); - let transaction_number = tx - .get::(block_num_hash.into())? - .ok_or(Error::BlockTxNumberNotExists { block_hash })?; + let transition = tx + .get::(block_num_hash.into())? + .ok_or(Error::BlockTransition { block_number, block_hash })?; - Ok(StateProviderImplHistory::new(tx, transaction_number)) + Ok(StateProviderImplHistory::new(tx, transition)) } fn history_by_block_hash(&self, block_hash: BlockHash) -> Result> { let tx = self.db.tx()?; // get block number - let block_number = tx - .get::(block_hash)? - .ok_or(Error::BlockHashNotExist { block_hash })?; + let block_number = + tx.get::(block_hash)?.ok_or(Error::BlockHash { block_hash })?; - // get transaction number + // get transition id let block_num_hash = (block_number, block_hash); - let transaction_number = tx - .get::(block_num_hash.into())? - .ok_or(Error::BlockTxNumberNotExists { block_hash })?; + let transition = tx + .get::(block_num_hash.into())? + .ok_or(Error::BlockTransition { block_number, block_hash })?; - Ok(StateProviderImplHistory::new(tx, transaction_number)) + Ok(StateProviderImplHistory::new(tx, transition)) } } -/// State provider for given transaction number +/// State provider for a given transition pub struct StateProviderImplHistory<'a, TX: DbTx<'a>> { /// Database transaction tx: TX, - /// Transaction number is main indexer of account and storage changes - transaction_number: TxNumber, + /// Transition is main indexer of account and storage changes + transition: TransitionId, /// Phantom lifetime `'a` _phantom: PhantomData<&'a TX>, } impl<'a, TX: DbTx<'a>> StateProviderImplHistory<'a, TX> { /// Create new StateProvider from history transaction number - pub fn new(tx: TX, transaction_number: TxNumber) -> Self { - Self { tx, transaction_number, _phantom: PhantomData {} } + pub fn new(tx: TX, transition: TransitionId) -> Self { + Self { tx, transition, _phantom: PhantomData {} } } } impl<'a, TX: DbTx<'a>> AccountProvider for StateProviderImplHistory<'a, TX> { /// Get basic account information. fn basic_account(&self, address: Address) -> Result> { - StateProviderImplRefHistory::new(&self.tx, self.transaction_number).basic_account(address) + StateProviderImplRefHistory::new(&self.tx, self.transition).basic_account(address) } } impl<'a, TX: DbTx<'a>> StateProvider for StateProviderImplHistory<'a, TX> { fn storage(&self, account: Address, storage_key: StorageKey) -> Result> { - StateProviderImplRefHistory::new(&self.tx, self.transaction_number) - .storage(account, storage_key) + StateProviderImplRefHistory::new(&self.tx, self.transition).storage(account, storage_key) } fn bytecode_by_hash(&self, code_hash: H256) -> Result> { - StateProviderImplRefHistory::new(&self.tx, self.transaction_number) - .bytecode_by_hash(code_hash) + StateProviderImplRefHistory::new(&self.tx, self.transition).bytecode_by_hash(code_hash) } fn block_hash(&self, number: U256) -> Result> { - StateProviderImplRefHistory::new(&self.tx, self.transaction_number).block_hash(number) + StateProviderImplRefHistory::new(&self.tx, self.transition).block_hash(number) } } /// State provider with given hash @@ -104,16 +102,16 @@ impl<'a, TX: DbTx<'a>> StateProvider for StateProviderImplHistory<'a, TX> { pub struct StateProviderImplRefHistory<'a, 'b, TX: DbTx<'a>> { /// Transaction tx: &'b TX, - /// Transaction number is main indexer of account and storage changes - transaction_number: TxNumber, + /// Transition is main indexer of account and storage changes + transition: TransitionId, /// Phantom lifetime `'a` _phantom: PhantomData<&'a TX>, } impl<'a, 'b, TX: DbTx<'a>> StateProviderImplRefHistory<'a, 'b, TX> { /// Create new StateProvider from history transaction number - pub fn new(tx: &'b TX, transaction_number: TxNumber) -> Self { - Self { tx, transaction_number, _phantom: PhantomData {} } + pub fn new(tx: &'b TX, transition: TransitionId) -> Self { + Self { tx, transition, _phantom: PhantomData {} } } } @@ -131,8 +129,8 @@ impl<'a, 'b, TX: DbTx<'a>> StateProvider for StateProviderImplRefHistory<'a, 'b, // TODO when StorageHistory is defined let transaction_number = self.tx.get::(Vec::new())?.map(|_integer_list| - // TODO select integer that is one less from transaction_number - self.transaction_number); + // TODO select integer that is one less from transaction_number <- // TODO: (rkrasiuk) not sure this comment is still relevant + self.transition); if transaction_number.is_none() { return Ok(None) diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs index e3d7352dd..2463c9388 100644 --- a/crates/storage/provider/src/lib.rs +++ b/crates/storage/provider/src/lib.rs @@ -16,10 +16,7 @@ mod state; /// Common test helpers for mocking the Provider. pub mod test_utils; -pub use block::{ - get_cumulative_tx_count_by_hash, insert_canonical_block, BlockProvider, ChainInfo, - HeaderProvider, -}; +pub use block::{insert_canonical_block, BlockProvider, ChainInfo, HeaderProvider}; pub use db_provider::{ self as db, ProviderImpl, StateProviderImplHistory, StateProviderImplLatest, StateProviderImplRefHistory, StateProviderImplRefLatest,