feat(sync): state transition indexes (#449)

* introduce state transitions and revert/modify block bodies table

* init refactor

* revamp transaction iteration based on bodies and add state transition mappings

* change expected return on empty db execution

* interim commit

* fix body downloader & stage

* refactor(bodies/dl): make fetch bodies fn more clear

* chore: disable unused vars/fns temporarily until exec is back

* chore: fmt

* test: fix tests

* use transitions in execution stage

* clarify empty unwind test

* remove last_tx_index fn

* rename fn and var names

* fix full block response comment

* rename fetcher`s get_block_body to get_block_bodies

* Update crates/stages/src/db.rs

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>

* fmt

* fix index overlap check error

* uncomment eth chain command

* fix doc comment

* typos

* cleanup

* any_last_tx_index -> last_tx_index

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
Co-authored-by: Oliver Nordbjerg <hi@notbjerg.me>
This commit is contained in:
Roman Krasiuk
2022-12-16 18:37:49 +02:00
committed by GitHub
parent 345b2c35b9
commit daaf039fbf
27 changed files with 658 additions and 575 deletions

View File

@ -149,7 +149,8 @@ fn init_genesis<DB: Database>(db: Arc<DB>, genesis: Genesis) -> Result<H256, ret
let hash = header.hash_slow(); let hash = header.hash_slow();
tx.put::<tables::CanonicalHeaders>(0, hash)?; tx.put::<tables::CanonicalHeaders>(0, hash)?;
tx.put::<tables::HeaderNumbers>(hash, 0)?; tx.put::<tables::HeaderNumbers>(hash, 0)?;
tx.put::<tables::CumulativeTxCount>((0, hash).into(), 0)?; tx.put::<tables::BlockBodies>((0, hash).into(), Default::default())?;
tx.put::<tables::BlockTransitionIndex>((0, hash).into(), 0)?;
tx.put::<tables::HeaderTD>((0, hash).into(), header.difficulty.into())?; tx.put::<tables::HeaderTD>((0, hash).into(), header.difficulty.into())?;
tx.put::<tables::Headers>((0, hash).into(), header)?; tx.put::<tables::Headers>((0, hash).into(), header)?;

View File

@ -53,9 +53,9 @@ impl AccountInfoChangeSet {
/// Apply the changes from the changeset to a database transaction. /// Apply the changes from the changeset to a database transaction.
pub fn apply_to_db<'a, TX: DbTxMut<'a>>( pub fn apply_to_db<'a, TX: DbTxMut<'a>>(
self, self,
tx: &TX,
address: Address, address: Address,
tx_index: u64, tx_index: u64,
tx: &TX,
) -> Result<(), DbError> { ) -> Result<(), DbError> {
match self { match self {
AccountInfoChangeSet::Changed { old, new } => { AccountInfoChangeSet::Changed { old, new } => {
@ -105,6 +105,7 @@ pub struct AccountChangeSet {
/// Execution Result containing vector of transaction changesets /// Execution Result containing vector of transaction changesets
/// and block reward if present /// and block reward if present
#[derive(Debug)]
pub struct ExecutionResult { pub struct ExecutionResult {
/// Transaction changeest contraining [Receipt], changed [Accounts][Account] and Storages. /// Transaction changeest contraining [Receipt], changed [Accounts][Account] and Storages.
pub changeset: Vec<TransactionChangeSet>, pub changeset: Vec<TransactionChangeSet>,
@ -586,7 +587,7 @@ mod tests {
// check Changed changeset // check Changed changeset
AccountInfoChangeSet::Changed { new: acc1, old: acc2 } AccountInfoChangeSet::Changed { new: acc1, old: acc2 }
.apply_to_db(address, tx_num, &tx) .apply_to_db(&tx, address, tx_num)
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
tx.get::<tables::AccountChangeSet>(tx_num), tx.get::<tables::AccountChangeSet>(tx_num),
@ -594,7 +595,7 @@ mod tests {
); );
assert_eq!(tx.get::<tables::PlainAccountState>(address), Ok(Some(acc1))); assert_eq!(tx.get::<tables::PlainAccountState>(address), Ok(Some(acc1)));
AccountInfoChangeSet::Created { new: acc1 }.apply_to_db(address, tx_num, &tx).unwrap(); AccountInfoChangeSet::Created { new: acc1 }.apply_to_db(&tx, address, tx_num).unwrap();
assert_eq!( assert_eq!(
tx.get::<tables::AccountChangeSet>(tx_num), tx.get::<tables::AccountChangeSet>(tx_num),
Ok(Some(AccountBeforeTx { address, info: None })) Ok(Some(AccountBeforeTx { address, info: None }))
@ -604,7 +605,7 @@ mod tests {
// delete old value, as it is dupsorted // delete old value, as it is dupsorted
tx.delete::<tables::AccountChangeSet>(tx_num, None).unwrap(); tx.delete::<tables::AccountChangeSet>(tx_num, None).unwrap();
AccountInfoChangeSet::Destroyed { old: acc2 }.apply_to_db(address, tx_num, &tx).unwrap(); AccountInfoChangeSet::Destroyed { old: acc2 }.apply_to_db(&tx, address, tx_num).unwrap();
assert_eq!(tx.get::<tables::PlainAccountState>(address), Ok(None)); assert_eq!(tx.get::<tables::PlainAccountState>(address), Ok(None));
assert_eq!( assert_eq!(
tx.get::<tables::AccountChangeSet>(tx_num), tx.get::<tables::AccountChangeSet>(tx_num),

View File

@ -27,11 +27,11 @@ pub enum Error {
ReceiptLogDiff, ReceiptLogDiff,
#[error("Receipt log is different.")] #[error("Receipt log is different.")]
ExecutionSuccessDiff { got: bool, expected: bool }, ExecutionSuccessDiff { got: bool, expected: bool },
#[error("Receipt root {got:?} is different then expected {expected:?}.")] #[error("Receipt root {got:?} is different than expected {expected:?}.")]
ReceiptRootDiff { got: H256, expected: H256 }, ReceiptRootDiff { got: H256, expected: H256 },
#[error("Header bloom filter {got:?} is different then expected {expected:?}.")] #[error("Header bloom filter {got:?} is different than expected {expected:?}.")]
BloomLogDiff { got: Box<Bloom>, expected: Box<Bloom> }, BloomLogDiff { got: Box<Bloom>, expected: Box<Bloom> },
#[error("Transaction gas limit {transaction_gas_limit} is more then blocks available gas {block_available_gas}")] #[error("Transaction gas limit {transaction_gas_limit} is more than blocks available gas {block_available_gas}")]
TransactionGasLimitMoreThenAvailableBlockGas { TransactionGasLimitMoreThenAvailableBlockGas {
transaction_gas_limit: u64, transaction_gas_limit: u64,
block_available_gas: u64, block_available_gas: u64,

View File

@ -8,5 +8,5 @@ use reth_primitives::H256;
#[auto_impl::auto_impl(&, Arc, Box)] #[auto_impl::auto_impl(&, Arc, Box)]
pub trait BodiesClient: DownloadClient { pub trait BodiesClient: DownloadClient {
/// Fetches the block body for the requested block. /// Fetches the block body for the requested block.
async fn get_block_body(&self, hash: Vec<H256>) -> PeerRequestResult<Vec<BlockBody>>; async fn get_block_bodies(&self, hashes: Vec<H256>) -> PeerRequestResult<Vec<BlockBody>>;
} }

View File

@ -1,6 +1,25 @@
use crate::p2p::downloader::{DownloadStream, Downloader}; use crate::p2p::downloader::{DownloadStream, Downloader};
use reth_primitives::{BlockLocked, SealedHeader}; use reth_primitives::{BlockLocked, SealedHeader};
/// The block response
#[derive(PartialEq, Eq, Debug)]
pub enum BlockResponse {
/// Full block response (with transactions or ommers)
Full(BlockLocked),
/// The empty block response
Empty(SealedHeader),
}
impl BlockResponse {
/// Return the reference to the response header
pub fn header(&self) -> &SealedHeader {
match self {
BlockResponse::Full(block) => &block.header,
BlockResponse::Empty(header) => header,
}
}
}
/// A downloader capable of fetching block bodies from header hashes. /// A downloader capable of fetching block bodies from header hashes.
/// ///
/// A downloader represents a distinct strategy for submitting requests to download block bodies, /// A downloader represents a distinct strategy for submitting requests to download block bodies,
@ -19,7 +38,7 @@ pub trait BodyDownloader: Downloader {
/// ///
/// It is *not* guaranteed that all the requested bodies are fetched: the downloader may close /// It is *not* guaranteed that all the requested bodies are fetched: the downloader may close
/// the stream before the entire range has been fetched for any reason /// the stream before the entire range has been fetched for any reason
fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> DownloadStream<'a, BlockLocked> fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> DownloadStream<'a, BlockResponse>
where where
I: IntoIterator<Item = &'b SealedHeader>, I: IntoIterator<Item = &'b SealedHeader>,
<I as IntoIterator>::IntoIter: Send + 'b, <I as IntoIterator>::IntoIter: Send + 'b,

View File

@ -4,12 +4,12 @@ use reth_primitives::{BlockHash, BlockNumber};
#[allow(missing_docs)] #[allow(missing_docs)]
#[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)] #[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)]
pub enum Error { pub enum Error {
#[error("Block Number {block_number:?} does not exist in database")] #[error("Block number {block_number} does not exist in database")]
BlockNumberNotExists { block_number: BlockNumber }, BlockNumber { block_number: BlockNumber },
#[error("Block tx cumulative number for hash {block_hash:?} does not exist in database")] #[error("Block hash {block_hash:?} does not exist in Headers table")]
BlockTxNumberNotExists { block_hash: BlockHash }, BlockHash { block_hash: BlockHash },
#[error("Block hash {block_hash:?} does not exists in Headers table")] #[error("Block body not exists #{block_number} ({block_hash:?})")]
BlockHashNotExist { block_hash: BlockHash }, BlockBody { block_number: BlockNumber, block_hash: BlockHash },
#[error("Block body not exists #{block_number} {block_hash}")] #[error("Block transition does not exist for block #{block_number} ({block_hash:?})")]
BlockBodyNotExist { block_number: BlockNumber, block_hash: BlockHash }, BlockTransition { block_number: BlockNumber, block_hash: BlockHash },
} }

View File

@ -29,7 +29,7 @@ impl<F> BodiesClient for TestBodiesClient<F>
where where
F: Fn(Vec<H256>) -> PeerRequestResult<Vec<BlockBody>> + Send + Sync, F: Fn(Vec<H256>) -> PeerRequestResult<Vec<BlockBody>> + Send + Sync,
{ {
async fn get_block_body(&self, hashes: Vec<H256>) -> PeerRequestResult<Vec<BlockBody>> { async fn get_block_bodies(&self, hashes: Vec<H256>) -> PeerRequestResult<Vec<BlockBody>> {
(self.responder)(hashes) (self.responder)(hashes)
} }
} }

View File

@ -3,9 +3,12 @@ use futures_util::{stream, StreamExt, TryStreamExt};
use reth_interfaces::{ use reth_interfaces::{
consensus::Consensus as ConsensusTrait, consensus::Consensus as ConsensusTrait,
p2p::{ p2p::{
bodies::{client::BodiesClient, downloader::BodyDownloader}, bodies::{
client::BodiesClient,
downloader::{BlockResponse, BodyDownloader},
},
downloader::{DownloadStream, Downloader}, downloader::{DownloadStream, Downloader},
error::{DownloadError, DownloadResult}, error::{DownloadError, DownloadResult, RequestError},
}, },
}; };
use reth_primitives::{BlockLocked, SealedHeader}; use reth_primitives::{BlockLocked, SealedHeader};
@ -50,7 +53,7 @@ where
Client: BodiesClient, Client: BodiesClient,
Consensus: ConsensusTrait, Consensus: ConsensusTrait,
{ {
fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> DownloadStream<'a, BlockLocked> fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> DownloadStream<'a, BlockResponse>
where where
I: IntoIterator<Item = &'b SealedHeader>, I: IntoIterator<Item = &'b SealedHeader>,
<I as IntoIterator>::IntoIter: Send + 'b, <I as IntoIterator>::IntoIter: Send + 'b,
@ -104,33 +107,75 @@ where
self self
} }
/// Fetch a batch of block bodies. /// Given a batch of headers, this function proceeds to:
async fn fetch_bodies(&self, headers: Vec<&SealedHeader>) -> DownloadResult<Vec<BlockLocked>> { /// 1. Filter for all the non-empty headers
let (peer_id, response) = self /// 2. Return early with the header values, if there were no non-empty headers, else..
.client /// 3. Request the bodies for the non-empty headers from a peer chosen by the network client
.get_block_body(headers.iter().map(|header| header.hash()).collect()) /// 4. For any non-empty headers, it proceeds to validate the corresponding body from the peer
.await? /// and return it as part of the response via the [`BlockResponse::Full`] variant.
.split(); ///
/// NB: This assumes that peers respond with bodies in the order that they were requested.
/// This is a reasonable assumption to make as that's [what Geth
/// does](https://github.com/ethereum/go-ethereum/blob/f53ff0ff4a68ffc56004ab1d5cc244bcb64d3277/les/server_requests.go#L245).
/// All errors regarding the response cause the peer to get penalized, meaning that adversaries
/// that try to give us bodies that do not match the requested order are going to be penalized
/// and eventually disconnected.
async fn fetch_bodies(
&self,
headers: Vec<&SealedHeader>,
) -> DownloadResult<Vec<BlockResponse>> {
let headers_with_txs_and_ommers =
headers.iter().filter(|h| !h.is_empty()).map(|h| h.hash()).collect::<Vec<_>>();
if headers_with_txs_and_ommers.is_empty() {
return Ok(headers.into_iter().cloned().map(BlockResponse::Empty).collect())
}
let (peer_id, bodies) =
self.client.get_block_bodies(headers_with_txs_and_ommers).await?.split();
let mut bodies = bodies.into_iter();
let mut responses = vec![];
for header in headers.into_iter().cloned() {
// If the header has no txs / ommers, just push it and continue
if header.is_empty() {
responses.push(BlockResponse::Empty(header));
} else {
// If the header was not empty, and there is no body, then
// the peer gave us a bad resopnse and we should return
// error.
let body = match bodies.next() {
Some(body) => body,
None => {
self.client.report_bad_message(peer_id);
// TODO: We error always, this means that if we got no body from a peer
// and the header was not empty we will discard a bunch of progress.
// This will cause redundant bandwdith usage (due to how our retriable
// stream works via std::iter), but we will address this
// with a custom retriable stream that maintains state and is able to
// "resume" progress from the current state of `responses`.
return Err(DownloadError::RequestError(RequestError::BadResponse))
}
};
response
.into_iter()
.zip(headers)
.map(|(body, header)| {
let block = BlockLocked { let block = BlockLocked {
header: header.clone(), header: header.clone(),
body: body.transactions, body: body.transactions,
ommers: body.ommers.into_iter().map(|header| header.seal()).collect(), ommers: body.ommers.into_iter().map(|header| header.seal()).collect(),
}; };
match self.consensus.pre_validate_block(&block) { // This ensures that the TxRoot and OmmersRoot from the header match the
Ok(_) => Ok(block), // ones calculated manually from the block body.
Err(error) => { self.consensus.pre_validate_block(&block).map_err(|error| {
self.client.report_bad_message(peer_id); self.client.report_bad_message(peer_id);
Err(DownloadError::BlockValidation { hash: header.hash(), error }) DownloadError::BlockValidation { hash: header.hash(), error }
} })?;
}
}) responses.push(BlockResponse::Full(block));
.collect() }
}
Ok(responses)
} }
} }
@ -146,7 +191,7 @@ mod tests {
p2p::{bodies::downloader::BodyDownloader, error::RequestError}, p2p::{bodies::downloader::BodyDownloader, error::RequestError},
test_utils::TestConsensus, test_utils::TestConsensus,
}; };
use reth_primitives::{PeerId, H256}; use reth_primitives::{Header, PeerId, H256};
use std::{ use std::{
sync::{ sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
@ -190,7 +235,7 @@ mod tests {
assert_matches!( assert_matches!(
downloader downloader
.bodies_stream(headers.clone().iter()) .bodies_stream(headers.clone().iter())
.try_collect::<Vec<BlockLocked>>() .try_collect::<Vec<BlockResponse>>()
.await, .await,
Ok(responses) => { Ok(responses) => {
assert_eq!( assert_eq!(
@ -199,13 +244,13 @@ mod tests {
.into_iter() .into_iter()
.map(| header | { .map(| header | {
let body = bodies .remove(&header.hash()).unwrap(); let body = bodies .remove(&header.hash()).unwrap();
BlockLocked { BlockResponse::Full(BlockLocked {
header, header,
body: body.transactions, body: body.transactions,
ommers: body.ommers.into_iter().map(|o| o.seal()).collect(), ommers: body.ommers.into_iter().map(|o| o.seal()).collect(),
} })
}) })
.collect::<Vec<BlockLocked>>() .collect::<Vec<BlockResponse>>()
); );
} }
); );
@ -221,8 +266,10 @@ mod tests {
Arc::new(TestConsensus::default()), Arc::new(TestConsensus::default()),
); );
let headers = &[Header { ommers_hash: H256::default(), ..Default::default() }.seal()];
let mut stream = downloader.bodies_stream(headers);
assert_matches!( assert_matches!(
downloader.bodies_stream(&[SealedHeader::default()]).next().await, stream.next().await,
Some(Err(DownloadError::RequestError(RequestError::ChannelClosed))) Some(Err(DownloadError::RequestError(RequestError::ChannelClosed)))
); );
} }
@ -251,10 +298,11 @@ mod tests {
Arc::new(TestConsensus::default()), Arc::new(TestConsensus::default()),
); );
let header = Header { ommers_hash: H256::default(), ..Default::default() }.seal();
assert_matches!( assert_matches!(
downloader.bodies_stream(&[SealedHeader::default()]).next().await, downloader.bodies_stream(&[header.clone()]).next().await,
Some(Ok(body)) => { Some(Ok(BlockResponse::Full(block))) => {
assert_eq!(body, BlockLocked::default()); assert_eq!(block.header, header);
} }
); );
assert_eq!(Arc::try_unwrap(retries_left).unwrap().into_inner(), 0); assert_eq!(Arc::try_unwrap(retries_left).unwrap().into_inner(), 0);
@ -286,7 +334,12 @@ mod tests {
.with_retries(0); .with_retries(0);
assert_matches!( assert_matches!(
downloader.bodies_stream(&[SealedHeader::default()]).next().await, downloader
.bodies_stream(&[
Header { ommers_hash: H256::default(), ..Default::default() }.seal()
])
.next()
.await,
Some(Err(DownloadError::RequestError(RequestError::Timeout))) Some(Err(DownloadError::RequestError(RequestError::Timeout)))
); );
assert_eq!(Arc::try_unwrap(retries_left).unwrap().into_inner(), 2); assert_eq!(Arc::try_unwrap(retries_left).unwrap().into_inner(), 2);

View File

@ -65,7 +65,7 @@ where
F: FnMut(Vec<H256>) -> Fut + Send + Sync, F: FnMut(Vec<H256>) -> Fut + Send + Sync,
Fut: Future<Output = PeerRequestResult<Vec<BlockBody>>> + Send, Fut: Future<Output = PeerRequestResult<Vec<BlockBody>>> + Send,
{ {
async fn get_block_body(&self, hash: Vec<H256>) -> PeerRequestResult<Vec<BlockBody>> { async fn get_block_bodies(&self, hash: Vec<H256>) -> PeerRequestResult<Vec<BlockBody>> {
let f = &mut *self.0.lock().await; let f = &mut *self.0.lock().await;
(f)(hash).await (f)(hash).await
} }

View File

@ -41,7 +41,7 @@ impl HeadersClient for FetchClient {
#[async_trait::async_trait] #[async_trait::async_trait]
impl BodiesClient for FetchClient { impl BodiesClient for FetchClient {
async fn get_block_body(&self, request: Vec<H256>) -> PeerRequestResult<Vec<BlockBody>> { async fn get_block_bodies(&self, request: Vec<H256>) -> PeerRequestResult<Vec<BlockBody>> {
let (response, rx) = oneshot::channel(); let (response, rx) = oneshot::channel();
self.request_tx.send(DownloadRequest::GetBlockBodies { request, response })?; self.request_tx.send(DownloadRequest::GetBlockBodies { request, response })?;
rx.await? rx.await?

View File

@ -64,7 +64,7 @@ async fn test_get_body() {
mock_provider.add_block(block_hash, block.clone()); mock_provider.add_block(block_hash, block.clone());
let res = fetch0.get_block_body(vec![block_hash]).await; let res = fetch0.get_block_bodies(vec![block_hash]).await;
assert!(res.is_ok()); assert!(res.is_ok());
let blocks = res.unwrap().1; let blocks = res.unwrap().1;

View File

@ -100,6 +100,11 @@ impl Header {
H256::from_slice(keccak256(&out).as_slice()) H256::from_slice(keccak256(&out).as_slice())
} }
/// Checks if the header is empty - has no transactions and no ommers
pub fn is_empty(&self) -> bool {
self.ommers_hash == EMPTY_LIST_HASH && self.transactions_root == EMPTY_ROOT
}
/// Calculate hash and seal the Header so that it can't be changed. /// Calculate hash and seal the Header so that it can't be changed.
pub fn seal(self) -> SealedHeader { pub fn seal(self) -> SealedHeader {
let hash = self.hash_slow(); let hash = self.hash_slow();

View File

@ -70,6 +70,8 @@ pub type ChainId = u64;
pub type StorageKey = H256; pub type StorageKey = H256;
/// An account storage value. /// An account storage value.
pub type StorageValue = U256; pub type StorageValue = U256;
/// The ID of block/transaction transition (represents state transition)
pub type TransitionId = u64;
pub use ethers_core::{ pub use ethers_core::{
types as rpc, types as rpc,

View File

@ -1,3 +1,4 @@
#![allow(dead_code)]
use std::{ use std::{
fmt::Debug, fmt::Debug,
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
@ -6,13 +7,13 @@ use std::{
use reth_db::{ use reth_db::{
cursor::{DbCursorRO, DbCursorRW}, cursor::{DbCursorRO, DbCursorRW},
database::{Database, DatabaseGAT}, database::{Database, DatabaseGAT},
models::{BlockNumHash, NumTransactions}, models::{BlockNumHash, StoredBlockBody},
table::Table, table::Table,
tables, tables,
transaction::{DbTx, DbTxMut}, transaction::{DbTx, DbTxMut},
Error, Error,
}; };
use reth_primitives::{BlockHash, BlockNumber, TxNumber}; use reth_primitives::{BlockHash, BlockNumber, TransitionId, TxNumber};
use crate::{DatabaseIntegrityError, StageError}; use crate::{DatabaseIntegrityError, StageError};
@ -118,77 +119,62 @@ where
Ok((number, self.get_block_hash(number)?).into()) Ok((number, self.get_block_hash(number)?).into())
} }
/// Query [tables::CumulativeTxCount] table for total transaction /// Query the block body by [BlockNumHash] key
/// count block by [BlockNumHash] key pub(crate) fn get_block_body(&self, key: BlockNumHash) -> Result<StoredBlockBody, StageError> {
pub(crate) fn get_tx_count(&self, key: BlockNumHash) -> Result<NumTransactions, StageError> { let body = self
let count = self.get::<tables::CumulativeTxCount>(key)?.ok_or( .get::<tables::BlockBodies>(key)?
DatabaseIntegrityError::CumulativeTxCount { number: key.number(), hash: key.hash() }, .ok_or(DatabaseIntegrityError::BlockBody { number: key.number() })?;
)?; Ok(body)
Ok(count)
} }
/// Get id for the first **potential** transaction in a block by looking up /// Query the block body by number
/// the cumulative transaction count at the previous block. pub(crate) fn get_block_body_by_num(
/// &self,
/// This function does not care whether the block is empty. number: BlockNumber,
pub(crate) fn get_first_tx_id(&self, block: BlockNumber) -> Result<TxNumber, StageError> { ) -> Result<StoredBlockBody, StageError> {
// Handle genesis block let key = self.get_block_numhash(number)?;
self.get_block_body(key)
}
/// Query the last transition of the block by [BlockNumHash] key
pub(crate) fn get_block_transition(
&self,
key: BlockNumHash,
) -> Result<TransitionId, StageError> {
let last_transition_id = self.get::<tables::BlockTransitionIndex>(key)?.ok_or(
DatabaseIntegrityError::BlockTransition { number: key.number(), hash: key.hash() },
)?;
Ok(last_transition_id)
}
/// Query the last transition of the block by number
pub(crate) fn get_block_transition_by_num(
&self,
number: BlockNumber,
) -> Result<TransitionId, StageError> {
let key = self.get_block_numhash(number)?;
self.get_block_transition(key)
}
/// Get the next start transaction id and transition for the `block` by looking at the previous
/// block. Returns Zero/Zero for Genesis.
pub(crate) fn get_next_block_ids(
&self,
block: BlockNumber,
) -> Result<(TxNumber, TransitionId), StageError> {
if block == 0 { if block == 0 {
return Ok(0) return Ok((0, 0))
} }
let prev_key = self.get_block_numhash(block - 1)?; let prev_key = self.get_block_numhash(block - 1)?;
self.get_tx_count(prev_key) let prev_body = self.get_block_body(prev_key)?;
} let last_transition = self.get::<tables::BlockTransitionIndex>(prev_key)?.ok_or(
DatabaseIntegrityError::BlockTransition {
/// Get id of the last transaction in the block. number: prev_key.number(),
/// Returns [None] if the block is empty. hash: prev_key.hash(),
/// },
/// The blocks must exist in the database. )?;
#[allow(dead_code)] Ok((prev_body.start_tx_id + prev_body.tx_count, last_transition + 1))
pub(crate) fn get_last_tx_id(
&self,
block: BlockNumber,
) -> Result<Option<TxNumber>, StageError> {
let key = self.get_block_numhash(block)?;
let mut cursor = self.cursor::<tables::CumulativeTxCount>()?;
let (_, tx_count) =
cursor.seek_exact(key)?.ok_or(DatabaseIntegrityError::CumulativeTxCount {
number: key.number(),
hash: key.hash(),
})?;
let is_empty = {
if block != 0 {
let (_, prev_tx_count) =
cursor.prev()?.ok_or(DatabaseIntegrityError::CumulativeTxCount {
number: key.number() + 1,
hash: self.get_block_hash(key.number() + 1)?,
})?;
tx_count != prev_tx_count
} else {
tx_count == 0
}
};
Ok(if !is_empty { Some(tx_count - 1) } else { None })
}
/// Get id of the latest transaction observed before a given block (inclusive).
/// Returns error if there are no transactions in the database.
pub(crate) fn get_latest_tx_id(
&self,
up_to_block: BlockNumber,
) -> Result<TxNumber, StageError> {
let key = self.get_block_numhash(up_to_block)?;
let tx_count = self.get_tx_count(key)?;
if tx_count != 0 {
Ok(tx_count - 1)
} else {
// No transactions in the database
Err(DatabaseIntegrityError::Transaction { id: 0 }.into())
}
} }
/// Unwind table by some number key /// Unwind table by some number key

View File

@ -1,6 +1,6 @@
use crate::pipeline::PipelineEvent; use crate::pipeline::PipelineEvent;
use reth_interfaces::{consensus, db::Error as DbError, executor}; use reth_interfaces::{consensus, db::Error as DbError, executor};
use reth_primitives::{BlockNumber, TxNumber, H256}; use reth_primitives::{BlockHash, BlockNumber, TxNumber, H256};
use thiserror::Error; use thiserror::Error;
use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::error::SendError;
@ -76,27 +76,27 @@ pub enum DatabaseIntegrityError {
number: BlockNumber, number: BlockNumber,
}, },
/// A header is missing from the database. /// A header is missing from the database.
#[error("No header for block #{number} ({hash})")] #[error("No header for block #{number} ({hash:?})")]
Header { Header {
/// The block number key /// The block number key
number: BlockNumber, number: BlockNumber,
/// The block hash key /// The block hash key
hash: H256, hash: H256,
}, },
/// The cumulative transaction count is missing from the database.
#[error("No cumulative tx count for #{number} ({hash})")]
CumulativeTxCount {
/// The block number key
number: BlockNumber,
/// The block hash key
hash: H256,
},
/// A block body is missing. /// A block body is missing.
#[error("Block body not found for block #{number}")] #[error("Block body not found for block #{number}")]
BlockBody { BlockBody {
/// The block number key /// The block number key
number: BlockNumber, number: BlockNumber,
}, },
/// The transaction is missing
#[error("Transaction #{id} not found")]
Transaction {
/// The transaction id
id: TxNumber,
},
#[error("Block transition not found for block #{number} ({hash:?})")]
BlockTransition { number: BlockNumber, hash: BlockHash },
#[error("Gap in transaction table. Missing tx number #{missing}.")] #[error("Gap in transaction table. Missing tx number #{missing}.")]
TransactionsGap { missing: TxNumber }, TransactionsGap { missing: TxNumber },
#[error("Gap in transaction signer table. Missing tx number #{missing}.")] #[error("Gap in transaction signer table. Missing tx number #{missing}.")]
@ -111,12 +111,6 @@ pub enum DatabaseIntegrityError {
/// The block number key /// The block number key
number: BlockNumber, number: BlockNumber,
}, },
/// The transaction is missing
#[error("Transaction #{id} not found")]
Transaction {
/// The transaction id
id: TxNumber,
},
} }
/// A pipeline execution error. /// A pipeline execution error.

View File

@ -6,15 +6,15 @@ use futures_util::StreamExt;
use reth_db::{ use reth_db::{
cursor::{DbCursorRO, DbCursorRW}, cursor::{DbCursorRO, DbCursorRW},
database::{Database, DatabaseGAT}, database::{Database, DatabaseGAT},
models::StoredBlockOmmers, models::{StoredBlockBody, StoredBlockOmmers},
tables, tables,
transaction::{DbTx, DbTxMut}, transaction::{DbTx, DbTxMut},
}; };
use reth_interfaces::{consensus::Consensus, p2p::bodies::downloader::BodyDownloader}; use reth_interfaces::{
use reth_primitives::{ consensus::Consensus,
proofs::{EMPTY_LIST_HASH, EMPTY_ROOT}, p2p::bodies::downloader::{BlockResponse, BodyDownloader},
BlockNumber, SealedHeader,
}; };
use reth_primitives::{BlockNumber, SealedHeader};
use std::{fmt::Debug, sync::Arc}; use std::{fmt::Debug, sync::Arc};
use tracing::{error, warn}; use tracing::{error, warn};
@ -87,31 +87,35 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
} }
// The block we ended at last sync, and the one we are starting on now // The block we ended at last sync, and the one we are starting on now
let previous_block = input.stage_progress.unwrap_or_default(); let stage_progress = input.stage_progress.unwrap_or_default();
let starting_block = previous_block + 1; let starting_block = stage_progress + 1;
// Short circuit in case we already reached the target block // Short circuit in case we already reached the target block
let target = previous_stage_progress.min(starting_block + self.commit_threshold); let target = previous_stage_progress.min(starting_block + self.commit_threshold);
if target <= previous_block { if target <= stage_progress {
return Ok(ExecOutput { stage_progress: target, reached_tip: true, done: true }) return Ok(ExecOutput { stage_progress, reached_tip: true, done: true })
} }
let bodies_to_download = self.bodies_to_download::<DB>(db, starting_block, target)?; let bodies_to_download = self.bodies_to_download::<DB>(db, starting_block, target)?;
// Cursors used to write bodies and transactions // Cursors used to write bodies, ommers and transactions
let mut body_cursor = db.cursor_mut::<tables::BlockBodies>()?;
let mut ommers_cursor = db.cursor_mut::<tables::BlockOmmers>()?; let mut ommers_cursor = db.cursor_mut::<tables::BlockOmmers>()?;
let mut tx_cursor = db.cursor_mut::<tables::Transactions>()?; let mut tx_cursor = db.cursor_mut::<tables::Transactions>()?;
let mut tx_count_cursor = db.cursor_mut::<tables::CumulativeTxCount>()?;
// Cursors used to write state transition mapping
let mut block_transition_cursor = db.cursor_mut::<tables::BlockTransitionIndex>()?;
let mut tx_transition_cursor = db.cursor_mut::<tables::TxTransitionIndex>()?;
// Get id for the first transaction in the block // Get id for the first transaction in the block
let mut first_tx_id = db.get_first_tx_id(starting_block)?; let (mut current_tx_id, mut transition_id) = db.get_next_block_ids(starting_block)?;
// NOTE(onbjerg): The stream needs to live here otherwise it will just create a new iterator // NOTE(onbjerg): The stream needs to live here otherwise it will just create a new iterator
// on every iteration of the while loop -_- // on every iteration of the while loop -_-
let mut bodies_stream = self.downloader.bodies_stream(bodies_to_download.iter()); let mut bodies_stream = self.downloader.bodies_stream(bodies_to_download.iter());
let mut highest_block = previous_block; let mut highest_block = stage_progress;
while let Some(result) = bodies_stream.next().await { while let Some(result) = bodies_stream.next().await {
let Ok(block) = result else { let Ok(response) = result else {
error!( error!(
"Encountered an error downloading block {}: {:?}", "Encountered an error downloading block {}: {:?}",
highest_block + 1, highest_block + 1,
@ -123,34 +127,61 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
reached_tip: false, reached_tip: false,
}) })
}; };
let block_number = block.number;
// Write block
let key = (block_number, block.hash()).into();
// Additional +1, increments tx count to allow indexing of ChangeSet that contains block
// reward. This can't be added to last transaction ChangeSet as it would
// break if block is empty.
let this_tx_count = first_tx_id +
block.body.len() as u64 +
if self.consensus.has_block_reward(block.number) { 1 } else { 0 };
tx_count_cursor.append(key, this_tx_count)?;
ommers_cursor.append(
key,
StoredBlockOmmers {
ommers: block.ommers.into_iter().map(|header| header.unseal()).collect(),
},
)?;
// Write transactions // Write block
for transaction in block.body { let block_header = response.header();
// Insert the transaction hash to number mapping let block_number = block_header.number;
db.put::<tables::TxHashNumber>(transaction.hash(), first_tx_id)?; let block_key = block_header.num_hash().into();
// Append the transaction
tx_cursor.append(first_tx_id, transaction)?; match response {
first_tx_id += 1; BlockResponse::Full(block) => {
body_cursor.append(
block_key,
StoredBlockBody {
start_tx_id: current_tx_id,
tx_count: block.body.len() as u64,
},
)?;
ommers_cursor.append(
block_key,
StoredBlockOmmers {
ommers: block
.ommers
.into_iter()
.map(|header| header.unseal())
.collect(),
},
)?;
// Write transactions
for transaction in block.body {
// Insert the transaction hash to number mapping
db.put::<tables::TxHashNumber>(transaction.hash(), current_tx_id)?;
// Append the transaction
tx_cursor.append(current_tx_id, transaction)?;
tx_transition_cursor.append(current_tx_id, transition_id)?;
current_tx_id += 1;
transition_id += 1;
}
}
BlockResponse::Empty(_) => {
body_cursor.append(
block_key,
StoredBlockBody { start_tx_id: current_tx_id, tx_count: 0 },
)?;
}
};
// The block transition marks the final state at the end of the block.
// Increment the transition if the block contains an addition block reward.
// If the block does not have a reward, the transition will be the same as the
// transition at the last transaction of this block.
if self.consensus.has_block_reward(block_number) {
transition_id += 1;
} }
block_transition_cursor.append(block_key, transition_id)?;
highest_block = block_number; highest_block = block_number;
first_tx_id = this_tx_count;
} }
// The stage is "done" if: // The stage is "done" if:
@ -168,38 +199,51 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
db: &mut Transaction<'_, DB>, db: &mut Transaction<'_, DB>,
input: UnwindInput, input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> { ) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
let mut tx_count_cursor = db.cursor_mut::<tables::CumulativeTxCount>()?; // Cursors to unwind bodies, ommers, transactions and tx hash to number
let mut block_ommers_cursor = db.cursor_mut::<tables::BlockOmmers>()?; let mut body_cursor = db.cursor_mut::<tables::BlockBodies>()?;
let mut ommers_cursor = db.cursor_mut::<tables::BlockOmmers>()?;
let mut transaction_cursor = db.cursor_mut::<tables::Transactions>()?; let mut transaction_cursor = db.cursor_mut::<tables::Transactions>()?;
let mut tx_hash_number_cursor = db.cursor_mut::<tables::TxHashNumber>()?; let mut tx_hash_number_cursor = db.cursor_mut::<tables::TxHashNumber>()?;
// Cursors to unwind transitions
let mut block_transition_cursor = db.cursor_mut::<tables::BlockTransitionIndex>()?;
let mut tx_transition_cursor = db.cursor_mut::<tables::TxTransitionIndex>()?;
let mut entry = tx_count_cursor.last()?; // let mut entry = tx_count_cursor.last()?;
while let Some((key, count)) = entry { let mut entry = body_cursor.last()?;
while let Some((key, body)) = entry {
if key.number() <= input.unwind_to { if key.number() <= input.unwind_to {
break break
} }
// First delete the current and find the previous cum tx count value // Delete the ommers value if any
tx_count_cursor.delete_current()?; if ommers_cursor.seek_exact(key)?.is_some() {
entry = tx_count_cursor.prev()?; ommers_cursor.delete_current()?;
if block_ommers_cursor.seek_exact(key)?.is_some() {
block_ommers_cursor.delete_current()?;
} }
let prev_count = entry.map(|(_, v)| v).unwrap_or_default(); // Delete the block transition if any
for tx_id in prev_count..count { if block_transition_cursor.seek_exact(key)?.is_some() {
// Block reward introduces gaps in transaction (Last tx number can be the gap) block_transition_cursor.delete_current()?;
// this is why we are checking if tx exist or not. }
// NOTE: more performant way is probably to use `prev`/`next` fn. and reduce
// count by one if block has block reward. // Delete all transactions that belong to this block
for tx_id in body.tx_id_range() {
// First delete the transaction and hash to id mapping
if let Some((_, transaction)) = transaction_cursor.seek_exact(tx_id)? { if let Some((_, transaction)) = transaction_cursor.seek_exact(tx_id)? {
transaction_cursor.delete_current()?; transaction_cursor.delete_current()?;
if tx_hash_number_cursor.seek_exact(transaction.hash)?.is_some() { if tx_hash_number_cursor.seek_exact(transaction.hash)?.is_some() {
tx_hash_number_cursor.delete_current()?; tx_hash_number_cursor.delete_current()?;
} }
} }
// Delete the transaction transition if any
if tx_transition_cursor.seek_exact(tx_id)?.is_some() {
tx_transition_cursor.delete_current()?;
}
} }
// Delete the current body value
body_cursor.delete_current()?;
// Move the cursor to the previous value
entry = body_cursor.prev()?;
} }
Ok(UnwindOutput { stage_progress: input.unwind_to }) Ok(UnwindOutput { stage_progress: input.unwind_to })
@ -228,14 +272,6 @@ impl<D: BodyDownloader, C: Consensus> BodyStage<D, C> {
let (_, header) = header_cursor.seek_exact((block_number, header_hash).into())?.ok_or( let (_, header) = header_cursor.seek_exact((block_number, header_hash).into())?.ok_or(
DatabaseIntegrityError::Header { number: block_number, hash: header_hash }, DatabaseIntegrityError::Header { number: block_number, hash: header_hash },
)?; )?;
if header.ommers_hash == EMPTY_LIST_HASH && header.transactions_root == EMPTY_ROOT {
// TODO: fix this
// If we indeed move to the new changeset structure let's not forget to add a note
// that the gaps issue with the returned empty bodies stream is no longer present
continue
}
bodies_to_download.push(SealedHeader::new(header, header_hash)); bodies_to_download.push(SealedHeader::new(header, header_hash));
} }
@ -503,14 +539,17 @@ mod tests {
use assert_matches::assert_matches; use assert_matches::assert_matches;
use reth_db::{ use reth_db::{
cursor::DbCursorRO, cursor::DbCursorRO,
models::{BlockNumHash, NumTransactions, StoredBlockOmmers}, models::{BlockNumHash, StoredBlockBody, StoredBlockOmmers},
tables, tables,
transaction::{DbTx, DbTxMut}, transaction::{DbTx, DbTxMut},
}; };
use reth_eth_wire::BlockBody; use reth_eth_wire::BlockBody;
use reth_interfaces::{ use reth_interfaces::{
p2p::{ p2p::{
bodies::{client::BodiesClient, downloader::BodyDownloader}, bodies::{
client::BodiesClient,
downloader::{BlockResponse, BodyDownloader},
},
downloader::{DownloadClient, DownloadStream, Downloader}, downloader::{DownloadClient, DownloadStream, Downloader},
error::{DownloadResult, PeerRequestResult}, error::{DownloadResult, PeerRequestResult},
}, },
@ -519,7 +558,7 @@ mod tests {
TestConsensus, TestConsensus,
}, },
}; };
use reth_primitives::{BlockLocked, BlockNumber, Header, SealedHeader, H256}; use reth_primitives::{BlockLocked, BlockNumber, SealedHeader, TxNumber, H256};
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
/// The block hash of the genesis block. /// The block hash of the genesis block.
@ -589,7 +628,6 @@ mod tests {
type Seed = Vec<BlockLocked>; type Seed = Vec<BlockLocked>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> { fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
self.insert_genesis()?;
let start = input.stage_progress.unwrap_or_default(); let start = input.stage_progress.unwrap_or_default();
let end = input.previous_stage_progress() + 1; let end = input.previous_stage_progress() + 1;
let blocks = random_block_range(start..end, GENESIS_HASH); let blocks = random_block_range(start..end, GENESIS_HASH);
@ -598,20 +636,27 @@ mod tests {
// Insert last progress data // Insert last progress data
self.db.commit(|tx| { self.db.commit(|tx| {
let key = (progress.number, progress.hash()).into(); let key = (progress.number, progress.hash()).into();
let last_count = tx let body = StoredBlockBody {
.cursor::<tables::CumulativeTxCount>()? start_tx_id: 0,
.last()? tx_count: progress.body.len() as u64,
.map(|(_, v)| v) };
.unwrap_or_default(); body.tx_id_range().try_for_each(|tx_id| {
// +1 for block reward,
let tx_count = last_count + progress.body.len() as u64 + 1;
tx.put::<tables::CumulativeTxCount>(key, tx_count)?;
tx.put::<tables::BlockOmmers>(key, StoredBlockOmmers { ommers: vec![] })?;
(last_count..tx_count).try_for_each(|idx| {
let transaction = random_signed_tx(); let transaction = random_signed_tx();
tx.put::<tables::TxHashNumber>(transaction.hash(), idx)?; tx.put::<tables::TxHashNumber>(transaction.hash(), tx_id)?;
tx.put::<tables::Transactions>(idx, transaction) tx.put::<tables::Transactions>(tx_id, transaction)?;
}) tx.put::<tables::TxTransitionIndex>(tx_id, tx_id)
})?;
// Randomize rewards
let has_reward: bool = rand::random();
let last_transition_id = progress.body.len().saturating_sub(1) as u64;
let block_transition_id =
last_transition_id + if has_reward { 1 } else { 0 };
tx.put::<tables::BlockTransitionIndex>(key, block_transition_id)?;
tx.put::<tables::BlockBodies>(key, body)?;
tx.put::<tables::BlockOmmers>(key, StoredBlockOmmers { ommers: vec![] })?;
Ok(())
})?; })?;
} }
self.set_responses(blocks.iter().map(body_by_hash).collect()); self.set_responses(blocks.iter().map(body_by_hash).collect());
@ -633,16 +678,23 @@ mod tests {
impl UnwindStageTestRunner for BodyTestRunner { impl UnwindStageTestRunner for BodyTestRunner {
fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> { fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
self.db.check_no_entry_above::<tables::BlockBodies, _>(input.unwind_to, |key| {
key.number()
})?;
self.db.check_no_entry_above::<tables::BlockOmmers, _>(input.unwind_to, |key| { self.db.check_no_entry_above::<tables::BlockOmmers, _>(input.unwind_to, |key| {
key.number() key.number()
})?; })?;
self.db.check_no_entry_above::<tables::CumulativeTxCount, _>( self.db.check_no_entry_above::<tables::BlockTransitionIndex, _>(
input.unwind_to, input.unwind_to,
|key| key.number(), |key| key.number(),
)?; )?;
if let Some(last_tx_id) = self.last_count() { if let Some(last_tx_id) = self.get_last_tx_id()? {
self.db self.db
.check_no_entry_above::<tables::Transactions, _>(last_tx_id, |key| key)?; .check_no_entry_above::<tables::Transactions, _>(last_tx_id, |key| key)?;
self.db.check_no_entry_above::<tables::TxTransitionIndex, _>(
last_tx_id,
|key| key,
)?;
self.db.check_no_entry_above_by_value::<tables::TxHashNumber, _>( self.db.check_no_entry_above_by_value::<tables::TxHashNumber, _>(
last_tx_id, last_tx_id,
|value| value, |value| value,
@ -653,28 +705,18 @@ mod tests {
} }
impl BodyTestRunner { impl BodyTestRunner {
/// Insert the genesis block into the appropriate tables /// Get the last available tx id if any
/// pub(crate) fn get_last_tx_id(&self) -> Result<Option<TxNumber>, TestRunnerError> {
/// The genesis block always has no transactions and no ommers, and it always has the let last_body = self.db.query(|tx| {
/// same hash. let v = tx.cursor::<tables::BlockBodies>()?.last()?;
pub(crate) fn insert_genesis(&self) -> Result<(), TestRunnerError> { Ok(v)
let header = SealedHeader::new(Header::default(), GENESIS_HASH);
self.db.insert_headers(std::iter::once(&header))?;
self.db.commit(|tx| {
let key = (0, GENESIS_HASH).into();
tx.put::<tables::CumulativeTxCount>(key, 0)?;
tx.put::<tables::BlockOmmers>(key, StoredBlockOmmers { ommers: vec![] })
})?; })?;
Ok(match last_body {
Ok(()) Some((_, body)) if body.tx_count != 0 => {
} Some(body.start_tx_id + body.tx_count - 1)
}
/// Retrieve the last tx count from the database _ => None,
pub(crate) fn last_count(&self) -> Option<NumTransactions> { })
self.db
.query(|tx| Ok(tx.cursor::<tables::CumulativeTxCount>()?.last()?.map(|e| e.1)))
.ok()
.flatten()
} }
/// Validate that the inserted block data is valid /// Validate that the inserted block data is valid
@ -685,26 +727,27 @@ mod tests {
) -> Result<(), TestRunnerError> { ) -> Result<(), TestRunnerError> {
self.db.query(|tx| { self.db.query(|tx| {
// Acquire cursors on body related tables // Acquire cursors on body related tables
let mut bodies_cursor = tx.cursor::<tables::BlockBodies>()?;
let mut ommers_cursor = tx.cursor::<tables::BlockOmmers>()?; let mut ommers_cursor = tx.cursor::<tables::BlockOmmers>()?;
let mut tx_count_cursor = tx.cursor::<tables::CumulativeTxCount>()?; let mut block_transition_cursor = tx.cursor::<tables::BlockTransitionIndex>()?;
let mut transaction_cursor = tx.cursor::<tables::Transactions>()?; let mut transaction_cursor = tx.cursor::<tables::Transactions>()?;
let mut tx_hash_num_cursor = tx.cursor::<tables::TxHashNumber>()?; let mut tx_hash_num_cursor = tx.cursor::<tables::TxHashNumber>()?;
let mut tx_transition_cursor = tx.cursor::<tables::TxTransitionIndex>()?;
let first_tx_count_key = match tx_count_cursor.first()? { let first_body_key = match bodies_cursor.first()? {
Some((key, _)) => key, Some((key, _)) => key,
None => return Ok(()), None => return Ok(()),
}; };
let walker = tx_count_cursor.walk(first_tx_count_key)?.peekable();
let mut prev_entry: Option<(BlockNumHash, NumTransactions)> = None; let mut prev_key: Option<BlockNumHash> = None;
for entry in walker { for entry in bodies_cursor.walk(first_body_key)? {
let (key, count) = entry?; let (key, body) = entry?;
// Validate sequentiality only after prev progress, // Validate sequentiality only after prev progress,
// since the data before is mocked and can contain gaps // since the data before is mocked and can contain gaps
if key.number() > prev_progress { if key.number() > prev_progress {
if let Some((prev_key, _)) = prev_entry { if let Some(prev_key) = prev_key {
assert_eq!(prev_key.number() + 1, key.number(), "Tx count entries must be sequential"); assert_eq!(prev_key.number() + 1, key.number(), "Body entries must be sequential");
} }
} }
@ -718,21 +761,23 @@ mod tests {
// Validate that ommers exist // Validate that ommers exist
assert_matches!(ommers_cursor.seek_exact(key), Ok(Some(_)), "Block ommers are missing"); assert_matches!(ommers_cursor.seek_exact(key), Ok(Some(_)), "Block ommers are missing");
// Validate that block trasactions exist // Validate that block transition exists
let first_tx_id = prev_entry.map(|(_, v)| v).unwrap_or_default(); assert_matches!(block_transition_cursor.seek_exact(key), Ok(Some(_)), "Block transition is missing");
// reduce by one for block_reward index
let tx_count = if count == 0 { 0 } else { count - 1 }; for tx_id in body.tx_id_range() {
for tx_id in first_tx_id..tx_count {
let tx_entry = transaction_cursor.seek_exact(tx_id)?; let tx_entry = transaction_cursor.seek_exact(tx_id)?;
assert!(tx_entry.is_some(), "A transaction is missing."); assert!(tx_entry.is_some(), "Transaction is missing.");
assert_matches!(
tx_transition_cursor.seek_exact(tx_id), Ok(Some(_)), "Transaction transition is missing"
);
assert_matches!( assert_matches!(
tx_hash_num_cursor.seek_exact(tx_entry.unwrap().1.hash), tx_hash_num_cursor.seek_exact(tx_entry.unwrap().1.hash),
Ok(Some(_)), Ok(Some(_)),
"A transaction hash to index mapping is missing." "Transaction hash to index mapping is missing."
); );
} }
prev_entry = Some((key, count)); prev_key = Some(key);
} }
Ok(()) Ok(())
})?; })?;
@ -753,7 +798,7 @@ mod tests {
#[async_trait::async_trait] #[async_trait::async_trait]
impl BodiesClient for NoopClient { impl BodiesClient for NoopClient {
async fn get_block_body(&self, _: Vec<H256>) -> PeerRequestResult<Vec<BlockBody>> { async fn get_block_bodies(&self, _: Vec<H256>) -> PeerRequestResult<Vec<BlockBody>> {
panic!("Noop client should not be called") panic!("Noop client should not be called")
} }
} }
@ -785,7 +830,7 @@ mod tests {
} }
impl BodyDownloader for TestBodyDownloader { impl BodyDownloader for TestBodyDownloader {
fn bodies_stream<'a, 'b, I>(&'a self, hashes: I) -> DownloadStream<'a, BlockLocked> fn bodies_stream<'a, 'b, I>(&'a self, hashes: I) -> DownloadStream<'a, BlockResponse>
where where
I: IntoIterator<Item = &'b SealedHeader>, I: IntoIterator<Item = &'b SealedHeader>,
<I as IntoIterator>::IntoIter: Send + 'b, <I as IntoIterator>::IntoIter: Send + 'b,
@ -797,11 +842,11 @@ mod tests {
.get(&header.hash()) .get(&header.hash())
.expect("Stage tried downloading a block we do not have.") .expect("Stage tried downloading a block we do not have.")
.clone()?; .clone()?;
Ok(BlockLocked { Ok(BlockResponse::Full(BlockLocked {
header: header.clone(), header: header.clone(),
body: result.transactions, body: result.transactions,
ommers: result.ommers.into_iter().map(|header| header.seal()).collect(), ommers: result.ommers.into_iter().map(|header| header.seal()).collect(),
}) }))
}))) })))
} }
} }

View File

@ -3,9 +3,9 @@ use crate::{
UnwindInput, UnwindOutput, UnwindInput, UnwindOutput,
}; };
use reth_db::{ use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, cursor::{DbCursorRO, DbCursorRW},
database::Database, database::Database,
models::{BlockNumHash, TxNumberAddress}, models::{BlockNumHash, StoredBlockBody, TransitionIdAddress},
tables, tables,
transaction::{DbTx, DbTxMut}, transaction::{DbTx, DbTxMut},
}; };
@ -15,9 +15,9 @@ use reth_executor::{
revm_wrap::{State, SubState}, revm_wrap::{State, SubState},
Config, Config,
}; };
use reth_primitives::{Address, StorageEntry, TransactionSignedEcRecovered, H256, U256}; use reth_primitives::{Address, Header, StorageEntry, TransactionSignedEcRecovered, H256, U256};
use reth_provider::StateProviderImplRefLatest; use reth_provider::StateProviderImplRefLatest;
use std::{fmt::Debug, ops::DerefMut}; use std::fmt::Debug;
const EXECUTION: StageId = StageId("Execution"); const EXECUTION: StageId = StageId("Execution");
@ -83,114 +83,61 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
db: &mut Transaction<'_, DB>, db: &mut Transaction<'_, DB>,
input: ExecInput, input: ExecInput,
) -> Result<ExecOutput, StageError> { ) -> Result<ExecOutput, StageError> {
let db_tx = db.deref_mut();
// none and zero are same as for genesis block (zeroed block) we are making assumption to // none and zero are same as for genesis block (zeroed block) we are making assumption to
// not have transaction. // not have transaction.
let last_block = input.stage_progress.unwrap_or_default(); let last_block = input.stage_progress.unwrap_or_default();
let start_block = last_block + 1; let start_block = last_block + 1;
// Get next canonical block hashes to execute. // Get next canonical block hashes to execute.
let mut canonicals = db_tx.cursor::<tables::CanonicalHeaders>()?; let mut canonicals = db.cursor::<tables::CanonicalHeaders>()?;
// Get header with canonical hashes. // Get header with canonical hashes.
let mut headers = db_tx.cursor::<tables::Headers>()?; let mut headers = db.cursor::<tables::Headers>()?;
// Get bodies (to get tx index) with canonical hashes. // Get bodies with canonical hashes.
let mut cumulative_tx_count = db_tx.cursor::<tables::CumulativeTxCount>()?; let mut bodies_cursor = db.cursor::<tables::BlockBodies>()?;
// Get transaction of the block that we are executing. // Get transaction of the block that we are executing.
let mut tx = db_tx.cursor::<tables::Transactions>()?; let mut tx = db.cursor::<tables::Transactions>()?;
// Skip sender recovery and load signer from database. // Skip sender recovery and load signer from database.
let mut tx_sender = db_tx.cursor::<tables::TxSenders>()?; let mut tx_sender = db.cursor::<tables::TxSenders>()?;
// get canonical blocks (num,hash) // get canonical blocks (num,hash)
let canonical_batch = canonicals let canonical_batch = canonicals
.walk(start_block)? .walk(start_block)?
.take(BATCH_SIZE as usize) .take(BATCH_SIZE as usize) // TODO: commit_threshold
.map(|i| i.map(BlockNumHash)) .map(|i| i.map(BlockNumHash))
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?;
// no more canonical blocks, we are done with execution. // no more canonical blocks, we are done with execution.
if canonical_batch.is_empty() { if canonical_batch.is_empty() {
return Ok(ExecOutput { done: true, reached_tip: true, stage_progress: last_block }) return Ok(ExecOutput { stage_progress: last_block, done: true, reached_tip: true })
} }
// get headers from canonical numbers // Get block headers and bodies from canonical hashes
let headers_batch = canonical_batch let block_batch = canonical_batch
.iter() .iter()
.map(|ch_index| { .map(|key| -> Result<(Header, StoredBlockBody), StageError> {
// TODO see if walker next has better performance then seek_exact calls. // TODO see if walker next has better performance then seek_exact calls.
headers.seek_exact(*ch_index).map_err(StageError::Database).and_then(|res| { let (_, header) =
res.ok_or_else(|| { headers.seek_exact(*key)?.ok_or(DatabaseIntegrityError::Header {
DatabaseIntegrityError::Header { number: key.number(),
number: ch_index.number(), hash: key.hash(),
hash: ch_index.hash(), })?;
} let (_, body) = bodies_cursor
.into() .seek_exact(*key)?
}) .ok_or(DatabaseIntegrityError::BlockBody { number: key.number() })?;
.map(|(_, header)| header) Ok((header, body))
})
})
.collect::<Result<Vec<_>, _>>()?;
// get last tx count so that we can know amount of transaction in the block.
let mut last_tx_index = if last_block == 0 {
0u64
} else {
// headers_batch is not empty,
let parent_hash = headers_batch[0].parent_hash;
let (_, tx_cnt) = cumulative_tx_count
.seek_exact(BlockNumHash((last_block, parent_hash)))?
.ok_or(DatabaseIntegrityError::CumulativeTxCount {
number: last_block,
hash: parent_hash,
})?;
tx_cnt
};
let tx_index_ranges = canonical_batch
.iter()
.map(|ch_index| {
// TODO see if walker next has better performance then seek_exact calls.
cumulative_tx_count
.seek_exact(*ch_index)
.map_err(StageError::Database)
.and_then(|res| {
res.ok_or_else(|| {
DatabaseIntegrityError::CumulativeTxCount {
number: ch_index.number(),
hash: ch_index.hash(),
}
.into()
})
})
.map(|(_, cumulative_tx_count)| {
let ret = if self.config.spec_upgrades.has_block_reward(ch_index.number()) {
// if there is block reward, cumulative tx count needs to remove block
// reward index. It is okay to subtract it, as
// block reward index is calculated in the block stage.
(last_tx_index, cumulative_tx_count - 1, Some(cumulative_tx_count - 1))
} else {
// if there is no block reward we just need to use tx_count
(last_tx_index, cumulative_tx_count, None)
};
last_tx_index = cumulative_tx_count;
ret
})
}) })
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?;
// Fetch transactions, execute them and generate results // Fetch transactions, execute them and generate results
let mut block_change_patches = Vec::with_capacity(canonical_batch.len()); let mut block_change_patches = Vec::with_capacity(canonical_batch.len());
for (header, (start_tx_index, end_tx_index, block_reward_index)) in for (header, body) in block_batch.iter() {
headers_batch.iter().zip(tx_index_ranges.iter())
{
let num = header.number; let num = header.number;
tracing::trace!(target: "stages::execution",?num, "Execute block num."); tracing::trace!(target: "stages::execution", ?num, "Execute block num.");
let body_tx_cnt = end_tx_index - start_tx_index;
// iterate over all transactions // iterate over all transactions
let mut tx_walker = tx.walk(*start_tx_index)?; let mut tx_walker = tx.walk(body.start_tx_id)?;
let mut transactions = Vec::with_capacity(body_tx_cnt as usize); let mut transactions = Vec::with_capacity(body.tx_count as usize);
// get next N transactions. // get next N transactions.
for index in *start_tx_index..*end_tx_index { for index in body.tx_id_range() {
let (tx_index, tx) = let (tx_index, tx) =
tx_walker.next().ok_or(DatabaseIntegrityError::EndOfTransactionTable)??; tx_walker.next().ok_or(DatabaseIntegrityError::EndOfTransactionTable)??;
if tx_index != index { if tx_index != index {
@ -200,9 +147,9 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
} }
// take signers // take signers
let mut tx_sender_walker = tx_sender.walk(*start_tx_index)?; let mut tx_sender_walker = tx_sender.walk(body.start_tx_id)?;
let mut signers = Vec::with_capacity(body_tx_cnt as usize); let mut signers = Vec::with_capacity(body.tx_count as usize);
for index in *start_tx_index..*end_tx_index { for index in body.tx_id_range() {
let (tx_index, tx) = tx_sender_walker let (tx_index, tx) = tx_sender_walker
.next() .next()
.ok_or(DatabaseIntegrityError::EndOfTransactionSenderTable)??; .ok_or(DatabaseIntegrityError::EndOfTransactionSenderTable)??;
@ -223,8 +170,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
.collect(); .collect();
// for now use default eth config // for now use default eth config
let state_provider = SubState::new(State::new(StateProviderImplRefLatest::new(&**db)));
let state_provider = SubState::new(State::new(StateProviderImplRefLatest::new(db_tx)));
let change_set = std::thread::scope(|scope| { let change_set = std::thread::scope(|scope| {
let handle = std::thread::Builder::new() let handle = std::thread::Builder::new()
@ -244,59 +190,59 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
handle.join().expect("Expects for thread to not panic") handle.join().expect("Expects for thread to not panic")
}) })
.map_err(|error| StageError::ExecutionError { block: header.number, error })?; .map_err(|error| StageError::ExecutionError { block: header.number, error })?;
block_change_patches.push((change_set, start_tx_index, block_reward_index)); block_change_patches.push(change_set);
} }
// Get last tx count so that we can know amount of transaction in the block.
let mut current_transition_id = db.get_block_transition_by_num(last_block)? + 1;
// apply changes to plain database. // apply changes to plain database.
for (results, start_tx_index, block_reward_index) in block_change_patches.into_iter() { for results in block_change_patches.into_iter() {
// insert state change set // insert state change set
for (index, result) in results.changeset.into_iter().enumerate() { for result in results.changeset.into_iter() {
let tx_index = start_tx_index + index as u64; // TODO insert to transitionId to tx_index
for (address, AccountChangeSet { account, wipe_storage, storage }) in for (address, account_change_set) in result.state_diff.into_iter() {
result.state_diff.into_iter() let AccountChangeSet { account, wipe_storage, storage } = account_change_set;
{
// apply account change to db. Updates AccountChangeSet and PlainAccountState // apply account change to db. Updates AccountChangeSet and PlainAccountState
// tables. // tables.
account.apply_to_db(address, tx_index, db_tx)?; account.apply_to_db(&**db, address, current_transition_id)?;
// wipe storage // wipe storage
if wipe_storage { if wipe_storage {
// TODO insert all changes to StorageChangeSet // TODO insert all changes to StorageChangeSet
db_tx.delete::<tables::PlainStorageState>(address, None)?; db.delete::<tables::PlainStorageState>(address, None)?;
} }
// insert storage changeset // insert storage changeset
let storage_id = TxNumberAddress((tx_index, address)); let storage_id = TransitionIdAddress((current_transition_id, address));
for (key, (old_value, new_value)) in storage { for (key, (old_value, new_value)) in storage {
let mut hkey = H256::zero(); let mut hkey = H256::zero();
key.to_big_endian(&mut hkey.0); key.to_big_endian(&mut hkey.0);
// insert into StorageChangeSet // insert into StorageChangeSet
db_tx.put::<tables::StorageChangeSet>( db.put::<tables::StorageChangeSet>(
storage_id.clone(), storage_id.clone(),
StorageEntry { key: hkey, value: old_value }, StorageEntry { key: hkey, value: old_value },
)?; )?;
if new_value.is_zero() { if new_value.is_zero() {
db_tx.delete::<tables::PlainStorageState>( db.delete::<tables::PlainStorageState>(
address, address,
Some(StorageEntry { key: hkey, value: old_value }), Some(StorageEntry { key: hkey, value: old_value }),
)?; )?;
} else { } else {
db_tx.put::<tables::PlainStorageState>( db.put::<tables::PlainStorageState>(
address, address,
StorageEntry { key: hkey, value: new_value }, StorageEntry { key: hkey, value: new_value },
)?; )?;
} }
} }
current_transition_id += 1;
} }
// insert bytecode // insert bytecode
for (hash, bytecode) in result.new_bytecodes.into_iter() { for (hash, bytecode) in result.new_bytecodes.into_iter() {
// make different types of bytecode. Checked and maybe even analyzed (needs to // make different types of bytecode. Checked and maybe even analyzed (needs to
// be packed). Currently save only raw bytes. // be packed). Currently save only raw bytes.
db_tx.put::<tables::Bytecodes>( db.put::<tables::Bytecodes>(hash, bytecode.bytes()[..bytecode.len()].to_vec())?;
hash,
bytecode.bytes()[..bytecode.len()].to_vec(),
)?;
// NOTE: bytecode bytes are not inserted in change set and it stand in saparate // NOTE: bytecode bytes are not inserted in change set and it stand in saparate
// table // table
@ -307,10 +253,10 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
// TODO add apply_block_reward_changeset to db tx fn which maybe takes an option. // TODO add apply_block_reward_changeset to db tx fn which maybe takes an option.
if let Some(block_reward_changeset) = results.block_reward { if let Some(block_reward_changeset) = results.block_reward {
// we are sure that block reward index is present. // we are sure that block reward index is present.
let block_reward_index = block_reward_index.unwrap();
for (address, changeset) in block_reward_changeset.into_iter() { for (address, changeset) in block_reward_changeset.into_iter() {
changeset.apply_to_db(address, block_reward_index, db_tx)?; changeset.apply_to_db(&**db, address, current_transition_id)?;
} }
current_transition_id += 1;
} }
} }
@ -325,96 +271,74 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
db: &mut Transaction<'_, DB>, db: &mut Transaction<'_, DB>,
input: UnwindInput, input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> { ) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
let unwind_from = input.stage_progress; // Acquire changeset cursors
let unwind_to = input.unwind_to; let mut account_changeset = db.cursor_dup_mut::<tables::AccountChangeSet>()?;
let _bad_block = input.bad_block; let mut storage_changeset = db.cursor_dup_mut::<tables::StorageChangeSet>()?;
// get block body tx indexes let from_transition = db.get_block_transition_by_num(input.stage_progress)?;
let db_tx = db.deref_mut();
// Get transaction of the block that we are executing. let to_transition = if input.unwind_to != 0 {
let mut account_changeset = db_tx.cursor_dup_mut::<tables::AccountChangeSet>()?; db.get_block_transition_by_num(input.unwind_to - 1)?
// Skip sender recovery and load signer from database.
let mut storage_changeset = db_tx.cursor_dup_mut::<tables::StorageChangeSet>()?;
// get from tx_number
let unwind_from_hash = db_tx
.get::<tables::CanonicalHeaders>(unwind_from)?
.ok_or(DatabaseIntegrityError::CanonicalHeader { number: unwind_from })?;
let from_tx_number = db_tx
.get::<tables::CumulativeTxCount>(BlockNumHash((unwind_from, unwind_from_hash)))?
.ok_or(DatabaseIntegrityError::CumulativeTxCount {
number: unwind_from,
hash: unwind_from_hash,
})?;
// get to tx_number
let to_tx_number = if unwind_to == 0 {
0
} else { } else {
let parent_number = unwind_to - 1; 0
let parent_hash = db_tx
.get::<tables::CanonicalHeaders>(parent_number)?
.ok_or(DatabaseIntegrityError::CanonicalHeader { number: parent_number })?;
db_tx
.get::<tables::CumulativeTxCount>(BlockNumHash((parent_number, parent_hash)))?
.ok_or(DatabaseIntegrityError::CumulativeTxCount {
number: parent_number,
hash: parent_hash,
})?
}; };
if to_tx_number > from_tx_number { if to_transition > from_transition {
panic!("Parents CumulativeTxCount {to_tx_number} is higer then present block #{unwind_to} TxCount {from_tx_number}") panic!("Unwind transition {} (stage progress block #{}) is higher than the transition {} of (unwind block #{})", to_transition, input.stage_progress, from_transition, input.unwind_to);
} }
let num_of_tx = (from_tx_number - to_tx_number) as usize; let num_of_tx = (from_transition - to_transition) as usize;
// if there is no transaction ids, this means blocks were empty and block reward change set // if there is no transaction ids, this means blocks were empty and block reward change set
// is not present. // is not present.
if num_of_tx == 0 { if num_of_tx == 0 {
return Ok(UnwindOutput { stage_progress: unwind_to }) return Ok(UnwindOutput { stage_progress: input.unwind_to })
} }
// get all batches for account change // get all batches for account change
// Check if walk and walk_dup would do the same thing // Check if walk and walk_dup would do the same thing
// TODO(dragan) test walking here
let account_changeset_batch = account_changeset let account_changeset_batch = account_changeset
.walk_dup(to_tx_number, Address::zero())? .walk(to_transition)?
.take_while(|item| item.as_ref().map_or(false, |(num, _)| *num <= from_tx_number)) .take_while(|item| {
item.as_ref().map(|(num, _)| *num <= from_transition).unwrap_or_default()
})
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?;
// revert all changes to PlainState // revert all changes to PlainState
for (_, changeset) in account_changeset_batch.into_iter().rev() { for (_, changeset) in account_changeset_batch.into_iter().rev() {
// TODO refactor in db fn called tx.aplly_account_changeset // TODO refactor in db fn called tx.aplly_account_changeset
if let Some(account_info) = changeset.info { if let Some(account_info) = changeset.info {
db_tx.put::<tables::PlainAccountState>(changeset.address, account_info)?; db.put::<tables::PlainAccountState>(changeset.address, account_info)?;
} else { } else {
db_tx.delete::<tables::PlainAccountState>(changeset.address, None)?; db.delete::<tables::PlainAccountState>(changeset.address, None)?;
} }
} }
// TODO(dragan) fix walking here
// get all batches for storage change // get all batches for storage change
let storage_chageset_batch = storage_changeset let storage_chageset_batch = storage_changeset
.walk_dup(TxNumberAddress((to_tx_number, Address::zero())), H256::zero())? .walk((to_transition, Address::zero()).into())?
.take_while(|item| { .take_while(|item| {
item.as_ref().map_or(false, |(TxNumberAddress((num, _)), _)| *num <= from_tx_number) item.as_ref()
.map(|(key, _)| key.transition_id() <= from_transition)
.unwrap_or_default()
}) })
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?;
// revert all changes to PlainStorage // revert all changes to PlainStorage
for (TxNumberAddress((_, address)), storage) in storage_chageset_batch.into_iter().rev() { for (key, storage) in storage_chageset_batch.into_iter().rev() {
db_tx.put::<tables::PlainStorageState>(address, storage.clone())?; let address = key.address();
db.put::<tables::PlainStorageState>(address, storage.clone())?;
if storage.value == U256::zero() { if storage.value == U256::zero() {
// delete value that is zero // delete value that is zero
db_tx.delete::<tables::PlainStorageState>(address, Some(storage))?; db.delete::<tables::PlainStorageState>(address, Some(storage))?;
} }
} }
// Discard unwinded changesets // Discard unwinded changesets
let mut entry = account_changeset.last()?; let mut entry = account_changeset.last()?;
while let Some((tx_number, _)) = entry { while let Some((transition_id, _)) = entry {
if tx_number < to_tx_number { if transition_id < to_transition {
break break
} }
account_changeset.delete_current()?; account_changeset.delete_current()?;
@ -422,8 +346,8 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
} }
let mut entry = storage_changeset.last()?; let mut entry = storage_changeset.last()?;
while let Some((TxNumberAddress((tx_number, _)), _)) = entry { while let Some((key, _)) = entry {
if tx_number < to_tx_number { if key.transition_id() < to_transition {
break break
} }
storage_changeset.delete_current()?; storage_changeset.delete_current()?;
@ -436,7 +360,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::ops::Deref; use std::ops::{Deref, DerefMut};
use super::*; use super::*;
use reth_db::mdbx::{test_utils::create_test_db, EnvKind, WriteMap}; use reth_db::mdbx::{test_utils::create_test_db, EnvKind, WriteMap};

View File

@ -59,24 +59,23 @@ impl<DB: Database> Stage<DB> for SendersStage {
input: ExecInput, input: ExecInput,
) -> Result<ExecOutput, StageError> { ) -> Result<ExecOutput, StageError> {
let stage_progress = input.stage_progress.unwrap_or_default(); let stage_progress = input.stage_progress.unwrap_or_default();
// Look up the start index for the transaction range
let start_tx_index = db.get_first_tx_id(stage_progress + 1)?;
// Look up the end index for transaction range (inclusive)
let previous_stage_progress = input.previous_stage_progress(); let previous_stage_progress = input.previous_stage_progress();
let max_block_num = previous_stage_progress.min(stage_progress + self.commit_threshold); let max_block_num = previous_stage_progress.min(stage_progress + self.commit_threshold);
let end_tx_index = match db.get_latest_tx_id(max_block_num) {
Ok(id) => id, if max_block_num <= stage_progress {
// No transactions in the database return Ok(ExecOutput { stage_progress, reached_tip: true, done: true })
Err(_) => { }
return Ok(ExecOutput {
stage_progress: max_block_num, // Look up the start index for the transaction range
done: true, let start_tx_index = db.get_block_body_by_num(stage_progress + 1)?.start_tx_id;
reached_tip: true,
}) // Look up the end index for transaction range (inclusive)
} let end_tx_index = db.get_block_body_by_num(max_block_num)?.last_tx_index();
};
// No transactions to walk over
if start_tx_index > end_tx_index {
return Ok(ExecOutput { stage_progress: max_block_num, done: true, reached_tip: true })
}
// Acquire the cursor for inserting elements // Acquire the cursor for inserting elements
let mut senders_cursor = db.cursor_mut::<tables::TxSenders>()?; let mut senders_cursor = db.cursor_mut::<tables::TxSenders>()?;
@ -117,7 +116,7 @@ impl<DB: Database> Stage<DB> for SendersStage {
input: UnwindInput, input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> { ) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
// Lookup latest tx id that we should unwind to // Lookup latest tx id that we should unwind to
let latest_tx_id = db.get_latest_tx_id(input.unwind_to).unwrap_or_default(); let latest_tx_id = db.get_block_body_by_num(input.unwind_to)?.last_tx_index();
db.unwind_table_by_num::<tables::TxSenders>(latest_tx_id)?; db.unwind_table_by_num::<tables::TxSenders>(latest_tx_id)?;
Ok(UnwindOutput { stage_progress: input.unwind_to }) Ok(UnwindOutput { stage_progress: input.unwind_to })
} }
@ -126,6 +125,7 @@ impl<DB: Database> Stage<DB> for SendersStage {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use assert_matches::assert_matches; use assert_matches::assert_matches;
use reth_db::models::StoredBlockBody;
use reth_interfaces::test_utils::generators::random_block_range; use reth_interfaces::test_utils::generators::random_block_range;
use reth_primitives::{BlockLocked, BlockNumber, H256}; use reth_primitives::{BlockLocked, BlockNumber, H256};
@ -214,25 +214,27 @@ mod tests {
let blocks = random_block_range(stage_progress..end, H256::zero()); let blocks = random_block_range(stage_progress..end, H256::zero());
self.db.commit(|tx| { self.db.commit(|tx| {
let mut base_tx_id = 0; let mut current_tx_id = 0;
blocks.iter().try_for_each(|b| { blocks.iter().try_for_each(|b| {
let txs = b.body.clone(); let txs = b.body.clone();
let tx_amount = txs.len() as u64;
let num_hash = (b.number, b.hash()).into(); let num_hash = (b.number, b.hash()).into();
tx.put::<tables::CanonicalHeaders>(b.number, b.hash())?; tx.put::<tables::CanonicalHeaders>(b.number, b.hash())?;
tx.put::<tables::CumulativeTxCount>(num_hash, base_tx_id + tx_amount)?; tx.put::<tables::BlockBodies>(
num_hash,
StoredBlockBody { start_tx_id: current_tx_id, tx_count: txs.len() as u64 },
)?;
for body_tx in txs { for body_tx in txs {
// Insert senders for previous stage progress // Insert senders for previous stage progress
if b.number == stage_progress { if b.number == stage_progress {
tx.put::<tables::TxSenders>( tx.put::<tables::TxSenders>(
base_tx_id, current_tx_id,
body_tx.recover_signer().expect("failed to recover sender"), body_tx.recover_signer().expect("failed to recover sender"),
)?; )?;
} }
tx.put::<tables::Transactions>(base_tx_id, body_tx)?; tx.put::<tables::Transactions>(current_tx_id, body_tx)?;
base_tx_id += 1; current_tx_id += 1;
} }
Ok(()) Ok(())
@ -257,17 +259,12 @@ mod tests {
return Ok(()) return Ok(())
} }
let mut tx_count_cursor = tx.cursor::<tables::CumulativeTxCount>()?; let start_hash = tx.get::<tables::CanonicalHeaders>(start_block)?.unwrap();
let mut body_cursor = tx.cursor::<tables::BlockBodies>()?;
body_cursor.seek_exact((start_block, start_hash).into())?;
let last_block = start_block - 1; while let Some((_, body)) = body_cursor.next()? {
let last_hash = tx.get::<tables::CanonicalHeaders>(start_block)?.unwrap(); for tx_id in body.tx_id_range() {
let mut last_tx_count = tx_count_cursor
.seek_exact((last_block, last_hash).into())?
.map(|(_, v)| v)
.unwrap_or_default();
while let Some((_, count)) = tx_count_cursor.next()? {
for tx_id in last_tx_count..count {
let transaction = tx let transaction = tx
.get::<tables::Transactions>(tx_id)? .get::<tables::Transactions>(tx_id)?
.expect("no transaction entry"); .expect("no transaction entry");
@ -275,7 +272,6 @@ mod tests {
transaction.recover_signer().expect("failed to recover signer"); transaction.recover_signer().expect("failed to recover signer");
assert_eq!(Some(signer), tx.get::<tables::TxSenders>(tx_id)?); assert_eq!(Some(signer), tx.get::<tables::TxSenders>(tx_id)?);
} }
last_tx_count = count;
} }
Ok(()) Ok(())
@ -296,11 +292,13 @@ mod tests {
impl SendersTestRunner { impl SendersTestRunner {
fn check_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> { fn check_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> {
let latest_tx_id = self.db.inner().get_latest_tx_id(block); let body_result = self.db.inner().get_block_body_by_num(block);
match latest_tx_id { match body_result {
Ok(last_index) => { Ok(body) => self
self.db.check_no_entry_above::<tables::TxSenders, _>(last_index, |key| key)? .db
} .check_no_entry_above::<tables::TxSenders, _>(body.last_tx_index(), |key| {
key
})?,
Err(_) => { Err(_) => {
assert!(self.db.table_is_empty::<tables::TxSenders>()?); assert!(self.db.table_is_empty::<tables::TxSenders>()?);
} }

View File

@ -10,14 +10,13 @@ macro_rules! stage_test_suite {
let input = crate::stage::ExecInput::default(); let input = crate::stage::ExecInput::default();
// Run stage execution // Run stage execution
let result = runner.execute(input).await.unwrap(); let result = runner.execute(input).await;
assert_matches::assert_matches!( // Check that the result is returned and the stage does not panic.
result, // The return result with empty db is stage-specific.
Err(crate::error::StageError::DatabaseIntegrity(_)) assert_matches::assert_matches!(result, Ok(_));
);
// Validate the stage execution // Validate the stage execution
assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation"); assert!(runner.validate_execution(input, result.unwrap().ok()).is_ok(), "execution validation");
} }
// Run the complete stage execution flow. // Run the complete stage execution flow.
@ -49,13 +48,16 @@ macro_rules! stage_test_suite {
assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation"); assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
} }
// Check that unwind does not panic on empty database. // Check that unwind does not panic on no new entries within the input range.
#[tokio::test] #[tokio::test]
async fn unwind_empty_db() { async fn unwind_no_new_entries() {
// Set up the runner // Set up the runner
let runner = $runner::default(); let mut runner = $runner::default();
let input = crate::stage::UnwindInput::default(); let input = crate::stage::UnwindInput::default();
// Seed the database
runner.seed_execution(crate::stage::ExecInput::default()).expect("failed to seed");
// Run stage unwind // Run stage unwind
let rx = runner.unwind(input).await; let rx = runner.unwind(input).await;
assert_matches::assert_matches!( assert_matches::assert_matches!(

View File

@ -40,6 +40,7 @@ impl_compression_for_compact!(
Receipt, Receipt,
TxType, TxType,
StorageEntry, StorageEntry,
StoredBlockBody,
StoredBlockOmmers StoredBlockOmmers
); );
impl_compression_for_compact!(AccountBeforeTx, TransactionSigned); impl_compression_for_compact!(AccountBeforeTx, TransactionSigned);

View File

@ -86,5 +86,5 @@ macro_rules! impl_fuzzer_value_with_input {
}; };
} }
impl_fuzzer_key!(BlockNumHash, TxNumberAddress); impl_fuzzer_key!(BlockNumHash, TransitionIdAddress);
impl_fuzzer_value_with_input!((IntegerList, IntegerListInput)); impl_fuzzer_value_with_input!((IntegerList, IntegerListInput));

View File

@ -10,17 +10,19 @@ use crate::{
tables::{ tables::{
codecs::CompactU256, codecs::CompactU256,
models::{ models::{
accounts::{AccountBeforeTx, TxNumberAddress}, accounts::{AccountBeforeTx, TransitionIdAddress},
blocks::{HeaderHash, NumTransactions, StoredBlockOmmers}, blocks::{HeaderHash, StoredBlockOmmers},
BlockNumHash, ShardedKey, BlockNumHash, ShardedKey,
}, },
}, },
}; };
use reth_primitives::{ use reth_primitives::{
Account, Address, BlockHash, BlockNumber, Header, IntegerList, Receipt, StorageEntry, Account, Address, BlockHash, BlockNumber, Header, IntegerList, Receipt, StorageEntry,
TransactionSigned, TxHash, TxNumber, H256, TransactionSigned, TransitionId, TxHash, TxNumber, H256,
}; };
use self::models::StoredBlockBody;
/// Enum for the types of tables present in libmdbx. /// Enum for the types of tables present in libmdbx.
#[derive(Debug)] #[derive(Debug)]
pub enum TableType { pub enum TableType {
@ -31,13 +33,13 @@ pub enum TableType {
} }
/// Default tables that should be present inside database. /// Default tables that should be present inside database.
pub const TABLES: [(TableType, &str); 21] = [ pub const TABLES: [(TableType, &str); 23] = [
(TableType::Table, CanonicalHeaders::const_name()), (TableType::Table, CanonicalHeaders::const_name()),
(TableType::Table, HeaderTD::const_name()), (TableType::Table, HeaderTD::const_name()),
(TableType::Table, HeaderNumbers::const_name()), (TableType::Table, HeaderNumbers::const_name()),
(TableType::Table, Headers::const_name()), (TableType::Table, Headers::const_name()),
(TableType::Table, BlockBodies::const_name()),
(TableType::Table, BlockOmmers::const_name()), (TableType::Table, BlockOmmers::const_name()),
(TableType::Table, CumulativeTxCount::const_name()),
(TableType::Table, NonCanonicalTransactions::const_name()), (TableType::Table, NonCanonicalTransactions::const_name()),
(TableType::Table, Transactions::const_name()), (TableType::Table, Transactions::const_name()),
(TableType::Table, TxHashNumber::const_name()), (TableType::Table, TxHashNumber::const_name()),
@ -46,6 +48,8 @@ pub const TABLES: [(TableType, &str); 21] = [
(TableType::Table, PlainAccountState::const_name()), (TableType::Table, PlainAccountState::const_name()),
(TableType::DupSort, PlainStorageState::const_name()), (TableType::DupSort, PlainStorageState::const_name()),
(TableType::Table, Bytecodes::const_name()), (TableType::Table, Bytecodes::const_name()),
(TableType::Table, BlockTransitionIndex::const_name()),
(TableType::Table, TxTransitionIndex::const_name()),
(TableType::Table, AccountHistory::const_name()), (TableType::Table, AccountHistory::const_name()),
(TableType::Table, StorageHistory::const_name()), (TableType::Table, StorageHistory::const_name()),
(TableType::DupSort, AccountChangeSet::const_name()), (TableType::DupSort, AccountChangeSet::const_name()),
@ -123,18 +127,14 @@ table!(
); );
table!( table!(
/// Stores the uncles/ommers of the block. /// Stores block bodies.
( BlockOmmers ) BlockNumHash | StoredBlockOmmers ( BlockBodies ) BlockNumHash | StoredBlockBody
); );
table!( table!(
/// Stores the maximum [`TxNumber`] from which this particular block starts. /// Stores the uncles/ommers of the block.
/// ( BlockOmmers ) BlockNumHash | StoredBlockOmmers
/// Used to collect transactions for the block. e.g. To collect transactions );
/// for block `x` you would need to look at cumulative count at block `x` and
/// at block `x - 1`.
( CumulativeTxCount ) BlockNumHash | NumTransactions
); // TODO U256?
table!( table!(
/// Stores the transaction body from non canonical transactions. /// Stores the transaction body from non canonical transactions.
@ -174,6 +174,16 @@ table!(
( Bytecodes ) H256 | Bytecode ( Bytecodes ) H256 | Bytecode
); );
table!(
/// Stores the mapping of block number to state transition id.
( BlockTransitionIndex ) BlockNumHash | TransitionId
);
table!(
/// Stores the mapping of transaction number to state transition id.
( TxTransitionIndex ) TxNumber | TransitionId
);
dupsort!( dupsort!(
/// Stores the current value of a storage key. /// Stores the current value of a storage key.
( PlainStorageState ) Address | [H256] StorageEntry ( PlainStorageState ) Address | [H256] StorageEntry
@ -234,14 +244,14 @@ dupsort!(
/// Stores the state of an account before a certain transaction changed it. /// Stores the state of an account before a certain transaction changed it.
/// Change on state can be: account is created, selfdestructed, touched while empty /// Change on state can be: account is created, selfdestructed, touched while empty
/// or changed (balance,nonce). /// or changed (balance,nonce).
( AccountChangeSet ) TxNumber | [Address] AccountBeforeTx ( AccountChangeSet ) TransitionId | [Address] AccountBeforeTx
); );
dupsort!( dupsort!(
/// Stores the state of a storage key before a certain transaction changed it. /// Stores the state of a storage key before a certain transaction changed it.
/// If [`StorageEntry::value`] is zero, this means storage was not existing /// If [`StorageEntry::value`] is zero, this means storage was not existing
/// and needs to be removed. /// and needs to be removed.
( StorageChangeSet ) TxNumberAddress | [H256] StorageEntry ( StorageChangeSet ) TransitionIdAddress | [H256] StorageEntry
); );
table!( table!(

View File

@ -7,7 +7,7 @@ use crate::{
}; };
use bytes::Bytes; use bytes::Bytes;
use reth_codecs::{main_codec, Compact}; use reth_codecs::{main_codec, Compact};
use reth_primitives::{Account, Address, TxNumber}; use reth_primitives::{Account, Address, TransitionId};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
/// Account as it is saved inside [`AccountChangeSet`]. [`Address`] is the subkey. /// Account as it is saved inside [`AccountChangeSet`]. [`Address`] is the subkey.
@ -26,22 +26,32 @@ pub struct AccountBeforeTx {
/// ///
/// Since it's used as a key, it isn't compressed when encoding it. /// Since it's used as a key, it isn't compressed when encoding it.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TxNumberAddress(pub (TxNumber, Address)); pub struct TransitionIdAddress(pub (TransitionId, Address));
impl TransitionIdAddress {
/// Return the transition id
pub fn transition_id(&self) -> TransitionId {
self.0 .0
}
/// Return the address
pub fn address(&self) -> Address {
self.0 .1
}
impl TxNumberAddress {
/// Consumes `Self` and returns [`TxNumber`], [`Address`] /// Consumes `Self` and returns [`TxNumber`], [`Address`]
pub fn take(self) -> (TxNumber, Address) { pub fn take(self) -> (TransitionId, Address) {
(self.0 .0, self.0 .1) (self.0 .0, self.0 .1)
} }
} }
impl From<(u64, Address)> for TxNumberAddress { impl From<(u64, Address)> for TransitionIdAddress {
fn from(tpl: (u64, Address)) -> Self { fn from(tpl: (u64, Address)) -> Self {
TxNumberAddress(tpl) TransitionIdAddress(tpl)
} }
} }
impl Encode for TxNumberAddress { impl Encode for TransitionIdAddress {
type Encoded = [u8; 28]; type Encoded = [u8; 28];
fn encode(self) -> Self::Encoded { fn encode(self) -> Self::Encoded {
@ -56,7 +66,7 @@ impl Encode for TxNumberAddress {
} }
} }
impl Decode for TxNumberAddress { impl Decode for TransitionIdAddress {
fn decode<B: Into<Bytes>>(value: B) -> Result<Self, Error> { fn decode<B: Into<Bytes>>(value: B) -> Result<Self, Error> {
let value: bytes::Bytes = value.into(); let value: bytes::Bytes = value.into();
@ -64,11 +74,11 @@ impl Decode for TxNumberAddress {
u64::from_be_bytes(value.as_ref()[..8].try_into().map_err(|_| Error::DecodeError)?); u64::from_be_bytes(value.as_ref()[..8].try_into().map_err(|_| Error::DecodeError)?);
let hash = Address::from_slice(&value.slice(8..)); let hash = Address::from_slice(&value.slice(8..));
Ok(TxNumberAddress((num, hash))) Ok(TransitionIdAddress((num, hash)))
} }
} }
impl_fixed_arbitrary!(TxNumberAddress, 28); impl_fixed_arbitrary!(TransitionIdAddress, 28);
#[cfg(test)] #[cfg(test)]
mod test { mod test {
@ -80,7 +90,7 @@ mod test {
fn test_tx_number_address() { fn test_tx_number_address() {
let num = 1u64; let num = 1u64;
let hash = Address::from_str("ba5e000000000000000000000000000000000000").unwrap(); let hash = Address::from_str("ba5e000000000000000000000000000000000000").unwrap();
let key = TxNumberAddress((num, hash)); let key = TransitionIdAddress((num, hash));
let mut bytes = [0u8; 28]; let mut bytes = [0u8; 28];
bytes[..8].copy_from_slice(&num.to_be_bytes()); bytes[..8].copy_from_slice(&num.to_be_bytes());
@ -89,7 +99,7 @@ mod test {
let encoded = Encode::encode(key.clone()); let encoded = Encode::encode(key.clone());
assert_eq!(encoded, bytes); assert_eq!(encoded, bytes);
let decoded: TxNumberAddress = Decode::decode(encoded.to_vec()).unwrap(); let decoded: TransitionIdAddress = Decode::decode(encoded.to_vec()).unwrap();
assert_eq!(decoded, key); assert_eq!(decoded, key);
} }
@ -97,7 +107,7 @@ mod test {
fn test_tx_number_address_rand() { fn test_tx_number_address_rand() {
let mut bytes = [0u8; 28]; let mut bytes = [0u8; 28];
thread_rng().fill(bytes.as_mut_slice()); thread_rng().fill(bytes.as_mut_slice());
let key = TxNumberAddress::arbitrary(&mut Unstructured::new(&bytes)).unwrap(); let key = TransitionIdAddress::arbitrary(&mut Unstructured::new(&bytes)).unwrap();
assert_eq!(bytes, Encode::encode(key)); assert_eq!(bytes, Encode::encode(key));
} }
} }

View File

@ -1,5 +1,7 @@
//! Block related models and types. //! Block related models and types.
use std::ops::Range;
use crate::{ use crate::{
impl_fixed_arbitrary, impl_fixed_arbitrary,
table::{Decode, Encode}, table::{Decode, Encode},
@ -7,14 +9,44 @@ use crate::{
}; };
use bytes::Bytes; use bytes::Bytes;
use reth_codecs::{main_codec, Compact}; use reth_codecs::{main_codec, Compact};
use reth_primitives::{BlockHash, BlockNumber, Header, H256}; use reth_primitives::{BlockHash, BlockNumber, Header, TxNumber, H256};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
/// Total chain number of transactions. Value for [`CumulativeTxCount`]. /// Total chain number of transactions. Value for [`CumulativeTxCount`]. // TODO:
///
/// Used for collecting transactions for a block.
pub type NumTransactions = u64; pub type NumTransactions = u64;
/// The storage representation of a block.
///
/// It has the pointer to the transaction Number of the first
/// transaction in the block and the total number of transactions
#[derive(Debug, Default, Eq, PartialEq, Clone)]
#[main_codec]
pub struct StoredBlockBody {
/// The id of the first transaction in this block
pub start_tx_id: TxNumber,
/// The total number of transactions
pub tx_count: NumTransactions,
}
impl StoredBlockBody {
/// Return the range of transaction ids for this body
pub fn tx_id_range(&self) -> Range<u64> {
self.start_tx_id..self.start_tx_id + self.tx_count
}
/// Return the index of last transaction in this block unless the block
/// is empty in which case it refers to the last transaction in a previous
/// non-empty block
pub fn last_tx_index(&self) -> TxNumber {
self.start_tx_id.saturating_add(self.tx_count).saturating_sub(1)
}
/// Return a flag whether the block is empty
pub fn is_empty(&self) -> bool {
self.tx_count == 0
}
}
/// The storage representation of a block ommers. /// The storage representation of a block ommers.
/// ///
/// It is stored as the headers of the block's uncles. /// It is stored as the headers of the block's uncles.

View File

@ -1,6 +1,6 @@
use auto_impl::auto_impl; use auto_impl::auto_impl;
use reth_db::{ use reth_db::{
models::{BlockNumHash, StoredBlockOmmers}, models::{BlockNumHash, StoredBlockBody, StoredBlockOmmers},
tables, tables,
transaction::{DbTx, DbTxMut}, transaction::{DbTx, DbTxMut},
}; };
@ -105,28 +105,6 @@ pub struct ChainInfo {
pub safe_finalized: Option<reth_primitives::BlockNumber>, pub safe_finalized: Option<reth_primitives::BlockNumber>,
} }
/// Get value from [tables::CumulativeTxCount] by block hash
/// as the table is indexed by NumHash key we are obtaining number from
/// [tables::HeaderNumbers]
pub fn get_cumulative_tx_count_by_hash<'a, TX: DbTxMut<'a> + DbTx<'a>>(
tx: &TX,
block_hash: H256,
) -> Result<u64> {
let block_number = tx
.get::<tables::HeaderNumbers>(block_hash)?
.ok_or(ProviderError::BlockHashNotExist { block_hash })?;
let block_num_hash = BlockNumHash((block_number, block_hash));
tx.get::<tables::CumulativeTxCount>(block_num_hash)?.ok_or_else(|| {
ProviderError::BlockBodyNotExist {
block_number: block_num_hash.number(),
block_hash: block_num_hash.hash(),
}
.into()
})
}
/// Fill block to database. Useful for tests. /// Fill block to database. Useful for tests.
/// Check parent dependency in [tables::HeaderNumbers] and in [tables::CumulativeTxCount] tables. /// Check parent dependency in [tables::HeaderNumbers] and in [tables::CumulativeTxCount] tables.
/// Inserts blocks data to [tables::CanonicalHeaders], [tables::Headers], [tables::HeaderNumbers], /// Inserts blocks data to [tables::CanonicalHeaders], [tables::Headers], [tables::HeaderNumbers],
@ -149,22 +127,49 @@ pub fn insert_canonical_block<'a, TX: DbTxMut<'a> + DbTx<'a>>(
StoredBlockOmmers { ommers: block.ommers.iter().map(|h| h.as_ref().clone()).collect() }, StoredBlockOmmers { ommers: block.ommers.iter().map(|h| h.as_ref().clone()).collect() },
)?; )?;
if block.number == 0 { let (mut current_tx_id, mut transition_id) = {
tx.put::<tables::CumulativeTxCount>(block_num_hash, 0)?; if block.number == 0 {
} else { (0, 0)
let mut tx_number = get_cumulative_tx_count_by_hash(tx, block.parent_hash)?; } else {
let prev_block_num = block.number - 1;
for eth_tx in block.body.iter() { let prev_block_hash = tx
let rec_tx = eth_tx.clone().into_ecrecovered().unwrap(); .get::<tables::CanonicalHeaders>(prev_block_num)?
tx.put::<tables::TxSenders>(tx_number, rec_tx.signer())?; .ok_or(ProviderError::BlockNumber { block_number: prev_block_num })?;
tx.put::<tables::Transactions>(tx_number, rec_tx.into())?; let prev_body = tx
tx_number += 1; .get::<tables::BlockBodies>((prev_block_num, prev_block_hash).into())?
.ok_or(ProviderError::BlockBody {
block_number: prev_block_num,
block_hash: prev_block_hash,
})?;
let last_transition_id = tx
.get::<tables::BlockTransitionIndex>((prev_block_num, prev_block_hash).into())?
.ok_or(ProviderError::BlockTransition {
block_number: prev_block_num,
block_hash: prev_block_hash,
})?;
(prev_body.start_tx_id + prev_body.tx_count, last_transition_id + 1)
} }
tx.put::<tables::CumulativeTxCount>( };
block_num_hash,
tx_number + if has_block_reward { 1 } else { 0 }, // insert body data
)?; tx.put::<tables::BlockBodies>(
block_num_hash,
StoredBlockBody { start_tx_id: current_tx_id, tx_count: block.body.len() as u64 },
)?;
for transaction in block.body.iter() {
let rec_tx = transaction.clone().into_ecrecovered().unwrap();
tx.put::<tables::TxSenders>(current_tx_id, rec_tx.signer())?;
tx.put::<tables::Transactions>(current_tx_id, rec_tx.into())?;
tx.put::<tables::TxTransitionIndex>(current_tx_id, transition_id)?;
current_tx_id += 1;
transition_id += 1;
} }
if has_block_reward {
transition_id += 1;
}
tx.put::<tables::BlockTransitionIndex>((block.number, block.hash()).into(), transition_id)?;
Ok(()) Ok(())
} }

View File

@ -9,7 +9,8 @@ use reth_db::{
use reth_interfaces::Result; use reth_interfaces::Result;
use reth_primitives::{ use reth_primitives::{
Account, Address, BlockHash, BlockNumber, Bytes, StorageKey, StorageValue, TxNumber, H256, U256, Account, Address, BlockHash, BlockNumber, Bytes, StorageKey, StorageValue, TransitionId, H256,
U256,
}; };
use std::marker::PhantomData; use std::marker::PhantomData;
@ -26,71 +27,68 @@ impl<DB: Database> StateProviderFactory for ProviderImpl<DB> {
// get block hash // get block hash
let block_hash = tx let block_hash = tx
.get::<tables::CanonicalHeaders>(block_number)? .get::<tables::CanonicalHeaders>(block_number)?
.ok_or(Error::BlockNumberNotExists { block_number })?; .ok_or(Error::BlockNumber { block_number })?;
// get transaction number // get transition id
let block_num_hash = (block_number, block_hash); let block_num_hash = (block_number, block_hash);
let transaction_number = tx let transition = tx
.get::<tables::CumulativeTxCount>(block_num_hash.into())? .get::<tables::BlockTransitionIndex>(block_num_hash.into())?
.ok_or(Error::BlockTxNumberNotExists { block_hash })?; .ok_or(Error::BlockTransition { block_number, block_hash })?;
Ok(StateProviderImplHistory::new(tx, transaction_number)) Ok(StateProviderImplHistory::new(tx, transition))
} }
fn history_by_block_hash(&self, block_hash: BlockHash) -> Result<Self::HistorySP<'_>> { fn history_by_block_hash(&self, block_hash: BlockHash) -> Result<Self::HistorySP<'_>> {
let tx = self.db.tx()?; let tx = self.db.tx()?;
// get block number // get block number
let block_number = tx let block_number =
.get::<tables::HeaderNumbers>(block_hash)? tx.get::<tables::HeaderNumbers>(block_hash)?.ok_or(Error::BlockHash { block_hash })?;
.ok_or(Error::BlockHashNotExist { block_hash })?;
// get transaction number // get transition id
let block_num_hash = (block_number, block_hash); let block_num_hash = (block_number, block_hash);
let transaction_number = tx let transition = tx
.get::<tables::CumulativeTxCount>(block_num_hash.into())? .get::<tables::BlockTransitionIndex>(block_num_hash.into())?
.ok_or(Error::BlockTxNumberNotExists { block_hash })?; .ok_or(Error::BlockTransition { block_number, block_hash })?;
Ok(StateProviderImplHistory::new(tx, transaction_number)) Ok(StateProviderImplHistory::new(tx, transition))
} }
} }
/// State provider for given transaction number /// State provider for a given transition
pub struct StateProviderImplHistory<'a, TX: DbTx<'a>> { pub struct StateProviderImplHistory<'a, TX: DbTx<'a>> {
/// Database transaction /// Database transaction
tx: TX, tx: TX,
/// Transaction number is main indexer of account and storage changes /// Transition is main indexer of account and storage changes
transaction_number: TxNumber, transition: TransitionId,
/// Phantom lifetime `'a` /// Phantom lifetime `'a`
_phantom: PhantomData<&'a TX>, _phantom: PhantomData<&'a TX>,
} }
impl<'a, TX: DbTx<'a>> StateProviderImplHistory<'a, TX> { impl<'a, TX: DbTx<'a>> StateProviderImplHistory<'a, TX> {
/// Create new StateProvider from history transaction number /// Create new StateProvider from history transaction number
pub fn new(tx: TX, transaction_number: TxNumber) -> Self { pub fn new(tx: TX, transition: TransitionId) -> Self {
Self { tx, transaction_number, _phantom: PhantomData {} } Self { tx, transition, _phantom: PhantomData {} }
} }
} }
impl<'a, TX: DbTx<'a>> AccountProvider for StateProviderImplHistory<'a, TX> { impl<'a, TX: DbTx<'a>> AccountProvider for StateProviderImplHistory<'a, TX> {
/// Get basic account information. /// Get basic account information.
fn basic_account(&self, address: Address) -> Result<Option<Account>> { fn basic_account(&self, address: Address) -> Result<Option<Account>> {
StateProviderImplRefHistory::new(&self.tx, self.transaction_number).basic_account(address) StateProviderImplRefHistory::new(&self.tx, self.transition).basic_account(address)
} }
} }
impl<'a, TX: DbTx<'a>> StateProvider for StateProviderImplHistory<'a, TX> { impl<'a, TX: DbTx<'a>> StateProvider for StateProviderImplHistory<'a, TX> {
fn storage(&self, account: Address, storage_key: StorageKey) -> Result<Option<StorageValue>> { fn storage(&self, account: Address, storage_key: StorageKey) -> Result<Option<StorageValue>> {
StateProviderImplRefHistory::new(&self.tx, self.transaction_number) StateProviderImplRefHistory::new(&self.tx, self.transition).storage(account, storage_key)
.storage(account, storage_key)
} }
fn bytecode_by_hash(&self, code_hash: H256) -> Result<Option<Bytes>> { fn bytecode_by_hash(&self, code_hash: H256) -> Result<Option<Bytes>> {
StateProviderImplRefHistory::new(&self.tx, self.transaction_number) StateProviderImplRefHistory::new(&self.tx, self.transition).bytecode_by_hash(code_hash)
.bytecode_by_hash(code_hash)
} }
fn block_hash(&self, number: U256) -> Result<Option<H256>> { fn block_hash(&self, number: U256) -> Result<Option<H256>> {
StateProviderImplRefHistory::new(&self.tx, self.transaction_number).block_hash(number) StateProviderImplRefHistory::new(&self.tx, self.transition).block_hash(number)
} }
} }
/// State provider with given hash /// State provider with given hash
@ -104,16 +102,16 @@ impl<'a, TX: DbTx<'a>> StateProvider for StateProviderImplHistory<'a, TX> {
pub struct StateProviderImplRefHistory<'a, 'b, TX: DbTx<'a>> { pub struct StateProviderImplRefHistory<'a, 'b, TX: DbTx<'a>> {
/// Transaction /// Transaction
tx: &'b TX, tx: &'b TX,
/// Transaction number is main indexer of account and storage changes /// Transition is main indexer of account and storage changes
transaction_number: TxNumber, transition: TransitionId,
/// Phantom lifetime `'a` /// Phantom lifetime `'a`
_phantom: PhantomData<&'a TX>, _phantom: PhantomData<&'a TX>,
} }
impl<'a, 'b, TX: DbTx<'a>> StateProviderImplRefHistory<'a, 'b, TX> { impl<'a, 'b, TX: DbTx<'a>> StateProviderImplRefHistory<'a, 'b, TX> {
/// Create new StateProvider from history transaction number /// Create new StateProvider from history transaction number
pub fn new(tx: &'b TX, transaction_number: TxNumber) -> Self { pub fn new(tx: &'b TX, transition: TransitionId) -> Self {
Self { tx, transaction_number, _phantom: PhantomData {} } Self { tx, transition, _phantom: PhantomData {} }
} }
} }
@ -131,8 +129,8 @@ impl<'a, 'b, TX: DbTx<'a>> StateProvider for StateProviderImplRefHistory<'a, 'b,
// TODO when StorageHistory is defined // TODO when StorageHistory is defined
let transaction_number = let transaction_number =
self.tx.get::<tables::StorageHistory>(Vec::new())?.map(|_integer_list| self.tx.get::<tables::StorageHistory>(Vec::new())?.map(|_integer_list|
// TODO select integer that is one less from transaction_number // TODO select integer that is one less from transaction_number <- // TODO: (rkrasiuk) not sure this comment is still relevant
self.transaction_number); self.transition);
if transaction_number.is_none() { if transaction_number.is_none() {
return Ok(None) return Ok(None)

View File

@ -16,10 +16,7 @@ mod state;
/// Common test helpers for mocking the Provider. /// Common test helpers for mocking the Provider.
pub mod test_utils; pub mod test_utils;
pub use block::{ pub use block::{insert_canonical_block, BlockProvider, ChainInfo, HeaderProvider};
get_cumulative_tx_count_by_hash, insert_canonical_block, BlockProvider, ChainInfo,
HeaderProvider,
};
pub use db_provider::{ pub use db_provider::{
self as db, ProviderImpl, StateProviderImplHistory, StateProviderImplLatest, self as db, ProviderImpl, StateProviderImplHistory, StateProviderImplLatest,
StateProviderImplRefHistory, StateProviderImplRefLatest, StateProviderImplRefHistory, StateProviderImplRefLatest,