refactor(db): separate transaction indexes from block body (#326)

This commit is contained in:
Roman Krasiuk
2022-12-06 17:46:25 +02:00
committed by Georgios Konstantopoulos
parent 925001e1ee
commit 449321ddc4
11 changed files with 260 additions and 354 deletions

View File

@ -1,5 +1,5 @@
use crate::db::{ use crate::db::{
models::{accounts::AccountBeforeTx, StoredBlockBody}, models::{accounts::AccountBeforeTx, StoredBlockOmmers},
Compress, Decompress, Error, Compress, Decompress, Error,
}; };
use reth_codecs::{main_codec, Compact}; use reth_codecs::{main_codec, Compact};
@ -32,7 +32,15 @@ macro_rules! impl_compression_for_compact {
}; };
} }
impl_compression_for_compact!(Header, Account, Log, Receipt, TxType, StorageEntry, StoredBlockBody); impl_compression_for_compact!(
Header,
Account,
Log,
Receipt,
TxType,
StorageEntry,
StoredBlockOmmers
);
impl_compression_for_compact!(AccountBeforeTx, TransactionSigned); impl_compression_for_compact!(AccountBeforeTx, TransactionSigned);
impl_compression_for_compact!(CompactU256); impl_compression_for_compact!(CompactU256);

View File

@ -9,37 +9,25 @@ use crate::{
}; };
use bytes::Bytes; use bytes::Bytes;
use reth_codecs::{main_codec, Compact}; use reth_codecs::{main_codec, Compact};
use reth_primitives::{BlockHash, BlockNumber, Header, TxNumber, H256}; use reth_primitives::{BlockHash, BlockNumber, Header, H256};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
/// Total chain number of transactions. Key for [`CumulativeTxCount`]. /// Total chain number of transactions. Value for [`CumulativeTxCount`].
///
/// Used for collecting transactions for a block.
pub type NumTransactions = u64; pub type NumTransactions = u64;
/// The storage representation of a block body. /// The storage representation of a block ommers.
/// ///
/// A block body is stored as a pointer to the first transaction in the block (`base_tx_id`), a /// It is stored as the headers of the block's uncles.
/// count of how many transactions are in the block, and the headers of the block's uncles.
///
/// The [TxNumber]s for all the transactions in the block are `base_tx_id..(base_tx_id +
/// tx_amount)`. /// tx_amount)`.
#[derive(Debug, Default, Eq, PartialEq, Clone)] #[derive(Debug, Default, Eq, PartialEq, Clone)]
#[main_codec] #[main_codec]
pub struct StoredBlockBody { pub struct StoredBlockOmmers {
/// The ID of the first transaction in the block.
pub base_tx_id: TxNumber,
/// The number of transactions in the block.
pub tx_amount: u64,
/// The block headers of this block's uncles. /// The block headers of this block's uncles.
pub ommers: Vec<Header>, pub ommers: Vec<Header>,
} }
impl StoredBlockBody {
/// Return next block tx id.
pub fn next_block_tx_id(&self) -> TxNumber {
self.base_tx_id + self.tx_amount
}
}
/// Hash of the block header. Value for [`CanonicalHeaders`] /// Hash of the block header. Value for [`CanonicalHeaders`]
pub type HeaderHash = H256; pub type HeaderHash = H256;

View File

@ -4,7 +4,7 @@ use crate::db::{
codecs::CompactU256, codecs::CompactU256,
models::{ models::{
accounts::{AccountBeforeTx, TxNumberAddress}, accounts::{AccountBeforeTx, TxNumberAddress},
blocks::{BlockNumHash, HeaderHash, NumTransactions, StoredBlockBody}, blocks::{BlockNumHash, HeaderHash, NumTransactions, StoredBlockOmmers},
ShardedKey, ShardedKey,
}, },
DupSort, DupSort,
@ -29,7 +29,7 @@ pub const TABLES: [(TableType, &str); 20] = [
(TableType::Table, HeaderTD::const_name()), (TableType::Table, HeaderTD::const_name()),
(TableType::Table, HeaderNumbers::const_name()), (TableType::Table, HeaderNumbers::const_name()),
(TableType::Table, Headers::const_name()), (TableType::Table, Headers::const_name()),
(TableType::Table, BlockBodies::const_name()), (TableType::Table, BlockOmmers::const_name()),
(TableType::Table, CumulativeTxCount::const_name()), (TableType::Table, CumulativeTxCount::const_name()),
(TableType::Table, NonCanonicalTransactions::const_name()), (TableType::Table, NonCanonicalTransactions::const_name()),
(TableType::Table, Transactions::const_name()), (TableType::Table, Transactions::const_name()),
@ -122,15 +122,16 @@ table!(
); );
table!( table!(
/// Stores a pointer to the first transaction in the block, the number of transactions in the /// Stores the uncles/ommers of the block.
/// block, and the uncles/ommers of the block. ( BlockOmmers ) BlockNumHash | StoredBlockOmmers
///
/// The transaction IDs point to the [`Transactions`] table.
( BlockBodies ) BlockNumHash | StoredBlockBody
); );
table!( table!(
/// Stores the maximum [`TxNumber`] from which this particular block starts. /// Stores the maximum [`TxNumber`] from which this particular block starts.
///
/// Used to collect transactions for the block. e.g. To collect transactions
/// for block `x` you would need to look at cumulative count at block `x` and
/// at block `x - 1`.
( CumulativeTxCount ) BlockNumHash | NumTransactions ( CumulativeTxCount ) BlockNumHash | NumTransactions
); // TODO U256? ); // TODO U256?

View File

@ -1,6 +1,6 @@
use crate::{ use crate::{
db::{ db::{
models::{BlockNumHash, StoredBlockBody}, models::{BlockNumHash, StoredBlockOmmers},
tables, DbTx, DbTxMut, tables, DbTx, DbTxMut,
}, },
provider::Error as ProviderError, provider::Error as ProviderError,
@ -142,14 +142,10 @@ pub fn insert_canonical_block<'a, TX: DbTxMut<'a> + DbTx<'a>>(
let start_tx_number = let start_tx_number =
if block.number == 0 { 0 } else { get_cumulative_tx_count_by_hash(tx, block.parent_hash)? }; if block.number == 0 { 0 } else { get_cumulative_tx_count_by_hash(tx, block.parent_hash)? };
// insert body // insert body ommers data
tx.put::<tables::BlockBodies>( tx.put::<tables::BlockOmmers>(
block_num_hash, block_num_hash,
StoredBlockBody { StoredBlockOmmers { ommers: block.ommers.iter().map(|h| h.as_ref().clone()).collect() },
base_tx_id: start_tx_number,
tx_amount: block.body.len() as u64,
ommers: block.ommers.iter().map(|h| h.as_ref().clone()).collect(),
},
)?; )?;
let mut tx_number = start_tx_number; let mut tx_number = start_tx_number;

View File

@ -3,12 +3,11 @@ use std::{
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
}; };
use reth_db::kv::cursor::PairResult;
use reth_interfaces::db::{ use reth_interfaces::db::{
models::{BlockNumHash, NumTransactions}, models::{BlockNumHash, NumTransactions},
tables, Database, DatabaseGAT, DbCursorRO, DbCursorRW, DbTx, DbTxMut, Error, Table, tables, Database, DatabaseGAT, DbCursorRO, DbCursorRW, DbTx, DbTxMut, Error, Table,
}; };
use reth_primitives::{BlockHash, BlockNumber}; use reth_primitives::{BlockHash, BlockNumber, TxNumber};
use crate::{DatabaseIntegrityError, StageError}; use crate::{DatabaseIntegrityError, StageError};
@ -90,12 +89,6 @@ where
self.tx.take(); self.tx.take();
} }
/// Get exact or previous value from the database
pub(crate) fn get_exact_or_prev<T: Table>(&self, key: T::Key) -> PairResult<T> {
let mut cursor = self.cursor::<T>()?;
Ok(cursor.seek_exact(key)?.or(cursor.prev()?))
}
/// Query [tables::CanonicalHeaders] table for block hash by block number /// Query [tables::CanonicalHeaders] table for block hash by block number
pub(crate) fn get_block_hash(&self, number: BlockNumber) -> Result<BlockHash, StageError> { pub(crate) fn get_block_hash(&self, number: BlockNumber) -> Result<BlockHash, StageError> {
let hash = self let hash = self
@ -121,6 +114,70 @@ where
Ok(count) Ok(count)
} }
/// Get id for the first **potential** transaction in a block by looking up
/// the cumulative transaction count at the previous block.
///
/// This function does not care whether the block is empty.
pub(crate) fn get_first_tx_id(&self, block: BlockNumber) -> Result<TxNumber, StageError> {
// Handle genesis block
if block == 0 {
return Ok(0)
}
let prev_key = self.get_block_numhash(block - 1)?;
self.get_tx_count(prev_key)
}
/// Get id of the last transaction in the block.
/// Returns [None] if the block is empty.
///
/// The blocks must exist in the database.
#[allow(dead_code)]
pub(crate) fn get_last_tx_id(
&self,
block: BlockNumber,
) -> Result<Option<TxNumber>, StageError> {
let key = self.get_block_numhash(block)?;
let mut cursor = self.cursor::<tables::CumulativeTxCount>()?;
let (_, tx_count) =
cursor.seek_exact(key)?.ok_or(DatabaseIntegrityError::CumulativeTxCount {
number: key.number(),
hash: key.hash(),
})?;
let is_empty = {
if block != 0 {
let (_, prev_tx_count) =
cursor.prev()?.ok_or(DatabaseIntegrityError::CumulativeTxCount {
number: key.number() + 1,
hash: self.get_block_hash(key.number() + 1)?,
})?;
tx_count != prev_tx_count
} else {
tx_count == 0
}
};
Ok(if !is_empty { Some(tx_count - 1) } else { None })
}
/// Get id of the latest transaction observed before a given block (inclusive).
/// Returns error if there are no transactions in the database.
pub(crate) fn get_latest_tx_id(
&self,
up_to_block: BlockNumber,
) -> Result<TxNumber, StageError> {
let key = self.get_block_numhash(up_to_block)?;
let tx_count = self.get_tx_count(key)?;
if tx_count != 0 {
Ok(tx_count - 1)
} else {
// No transactions in the database
Err(DatabaseIntegrityError::Transaction { id: 0 }.into())
}
}
/// Unwind table by some number key /// Unwind table by some number key
#[inline] #[inline]
pub(crate) fn unwind_table_by_num<T>(&self, num: u64) -> Result<(), Error> pub(crate) fn unwind_table_by_num<T>(&self, num: u64) -> Result<(), Error>

View File

@ -94,6 +94,12 @@ pub enum DatabaseIntegrityError {
/// The block number key /// The block number key
number: BlockNumber, number: BlockNumber,
}, },
/// The transaction is missing
#[error("Transaction #{id} not found")]
Transaction {
/// The transaction id
id: TxNumber,
},
} }
/// A pipeline execution error. /// A pipeline execution error.

View File

@ -6,7 +6,7 @@ use futures_util::TryStreamExt;
use reth_interfaces::{ use reth_interfaces::{
consensus::Consensus, consensus::Consensus,
db::{ db::{
models::StoredBlockBody, tables, Database, DatabaseGAT, DbCursorRO, DbCursorRW, DbTx, models::StoredBlockOmmers, tables, Database, DatabaseGAT, DbCursorRO, DbCursorRW, DbTx,
DbTxMut, DbTxMut,
}, },
p2p::bodies::downloader::BodyDownloader, p2p::bodies::downloader::BodyDownloader,
@ -38,16 +38,17 @@ const BODIES: StageId = StageId("Bodies");
/// ///
/// The bodies are processed and data is inserted into these tables: /// The bodies are processed and data is inserted into these tables:
/// ///
/// - [`BlockBodies`][reth_interfaces::db::tables::BlockBodies] /// - [`BlockOmmers`][reth_interfaces::db::tables::BlockOmmers]
/// - [`Transactions`][reth_interfaces::db::tables::Transactions] /// - [`Transactions`][reth_interfaces::db::tables::Transactions]
/// ///
/// # Genesis /// # Genesis
/// ///
/// This stage expects that the genesis has been inserted into the appropriate tables: /// This stage expects that the genesis has been inserted into the appropriate tables:
/// ///
/// - The header tables (see [HeadersStage][crate::stages::headers::HeadersStage]) /// - The header tables (see [`HeaderStage`][crate::stages::headers::HeaderStage])
/// - The various indexes (e.g. [TotalTxIndex][crate::stages::tx_index::TxIndex]) /// - The [`BlockOmmers`][reth_interfaces::db::tables::BlockOmmers] table
/// - The [`BlockBodies`][reth_interfaces::db::tables::BlockBodies] table /// - The [`CumulativeTxCount`][reth_interfaces::db::tables::CumulativeTxCount] table
/// - The [`Transactions`][reth_interfaces::db::tables::Transactions] table
#[derive(Debug)] #[derive(Debug)]
pub struct BodyStage<D: BodyDownloader, C: Consensus> { pub struct BodyStage<D: BodyDownloader, C: Consensus> {
/// The body downloader. /// The body downloader.
@ -96,12 +97,12 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
let bodies_to_download = self.bodies_to_download::<DB>(db, starting_block, target)?; let bodies_to_download = self.bodies_to_download::<DB>(db, starting_block, target)?;
// Cursors used to write bodies and transactions // Cursors used to write bodies and transactions
let mut bodies_cursor = db.cursor_mut::<tables::BlockBodies>()?; let mut ommers_cursor = db.cursor_mut::<tables::BlockOmmers>()?;
let mut tx_cursor = db.cursor_mut::<tables::Transactions>()?; let mut tx_cursor = db.cursor_mut::<tables::Transactions>()?;
let mut base_tx_id = bodies_cursor let mut tx_count_cursor = db.cursor_mut::<tables::CumulativeTxCount>()?;
.last()?
.map(|(_, body)| body.base_tx_id + body.tx_amount) // Get id for the first transaction in the block
.ok_or(DatabaseIntegrityError::BlockBody { number: starting_block })?; let mut first_tx_id = db.get_first_tx_id(starting_block)?;
// Cursor used to look up headers for block pre-validation // Cursor used to look up headers for block pre-validation
let mut header_cursor = db.cursor::<tables::Headers>()?; let mut header_cursor = db.cursor::<tables::Headers>()?;
@ -135,19 +136,19 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
.map_err(|err| StageError::Validation { block: block_number, error: err })?; .map_err(|err| StageError::Validation { block: block_number, error: err })?;
// Write block // Write block
bodies_cursor.append( let key = (block_number, header_hash).into();
(block_number, header_hash).into(), tx_count_cursor.append(key, first_tx_id + block.body.len() as u64)?;
StoredBlockBody { ommers_cursor.append(
base_tx_id, key,
tx_amount: block.body.len() as u64, StoredBlockOmmers {
ommers: block.ommers.into_iter().map(|header| header.unseal()).collect(), ommers: block.ommers.into_iter().map(|header| header.unseal()).collect(),
}, },
)?; )?;
// Write transactions // Write transactions
for transaction in block.body { for transaction in block.body {
tx_cursor.append(base_tx_id, transaction)?; tx_cursor.append(first_tx_id, transaction)?;
base_tx_id += 1; first_tx_id += 1;
} }
highest_block = block_number; highest_block = block_number;
@ -168,24 +169,30 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
db: &mut StageDB<'_, DB>, db: &mut StageDB<'_, DB>,
input: UnwindInput, input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> { ) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
let mut block_body_cursor = db.cursor_mut::<tables::BlockBodies>()?; let mut tx_count_cursor = db.cursor_mut::<tables::CumulativeTxCount>()?;
let mut block_ommers_cursor = db.cursor_mut::<tables::BlockOmmers>()?;
let mut transaction_cursor = db.cursor_mut::<tables::Transactions>()?; let mut transaction_cursor = db.cursor_mut::<tables::Transactions>()?;
let mut entry = block_body_cursor.last()?; let mut entry = tx_count_cursor.last()?;
while let Some((key, body)) = entry { while let Some((key, count)) = entry {
if key.number() <= input.unwind_to { if key.number() <= input.unwind_to {
break break
} }
for num in 0..body.tx_amount { // First delete the current and find the previous cum tx count value
let tx_id = body.base_tx_id + num; tx_count_cursor.delete_current()?;
entry = tx_count_cursor.prev()?;
if block_ommers_cursor.seek_exact(key)?.is_some() {
block_ommers_cursor.delete_current()?;
}
let prev_count = entry.map(|(_, v)| v).unwrap_or_default();
for tx_id in prev_count..count {
if transaction_cursor.seek_exact(tx_id)?.is_some() { if transaction_cursor.seek_exact(tx_id)?.is_some() {
transaction_cursor.delete_current()?; transaction_cursor.delete_current()?;
} }
} }
block_body_cursor.delete_current()?;
entry = block_body_cursor.prev()?;
} }
Ok(UnwindOutput { stage_progress: input.unwind_to }) Ok(UnwindOutput { stage_progress: input.unwind_to })
@ -395,7 +402,9 @@ mod tests {
Ok(ExecOutput { stage_progress, reached_tip: true, done: true }) if stage_progress == previous_stage Ok(ExecOutput { stage_progress, reached_tip: true, done: true }) if stage_progress == previous_stage
); );
let stage_progress = output.unwrap().stage_progress; let stage_progress = output.unwrap().stage_progress;
runner.validate_db_blocks(stage_progress).expect("Written block data invalid"); runner
.validate_db_blocks(input.stage_progress.unwrap_or_default(), stage_progress)
.expect("Written block data invalid");
// Delete a transaction // Delete a transaction
runner runner
@ -459,7 +468,10 @@ mod tests {
use assert_matches::assert_matches; use assert_matches::assert_matches;
use reth_eth_wire::BlockBody; use reth_eth_wire::BlockBody;
use reth_interfaces::{ use reth_interfaces::{
db::{models::StoredBlockBody, tables, DbCursorRO, DbTx, DbTxMut}, db::{
models::{BlockNumHash, NumTransactions, StoredBlockOmmers},
tables, DbCursorRO, DbTx, DbTxMut,
},
p2p::{ p2p::{
bodies::{ bodies::{
client::BodiesClient, client::BodiesClient,
@ -467,7 +479,10 @@ mod tests {
}, },
error::RequestResult, error::RequestResult,
}, },
test_utils::{generators::random_block_range, TestConsensus}, test_utils::{
generators::{random_block_range, random_signed_tx},
TestConsensus,
},
}; };
use reth_primitives::{BlockLocked, BlockNumber, Header, SealedHeader, H256}; use reth_primitives::{BlockLocked, BlockNumber, Header, SealedHeader, H256};
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
@ -539,11 +554,28 @@ mod tests {
type Seed = Vec<BlockLocked>; type Seed = Vec<BlockLocked>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> { fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
self.insert_genesis()?;
let start = input.stage_progress.unwrap_or_default(); let start = input.stage_progress.unwrap_or_default();
let end = input.previous_stage_progress() + 1; let end = input.previous_stage_progress() + 1;
let blocks = random_block_range(start..end, GENESIS_HASH); let blocks = random_block_range(start..end, GENESIS_HASH);
self.insert_genesis()?;
self.db.insert_headers(blocks.iter().map(|block| &block.header))?; self.db.insert_headers(blocks.iter().map(|block| &block.header))?;
if let Some(progress) = blocks.first() {
// Insert last progress data
self.db.commit(|tx| {
let key = (progress.number, progress.hash()).into();
let last_count = tx
.cursor::<tables::CumulativeTxCount>()?
.last()?
.map(|(_, v)| v)
.unwrap_or_default();
let tx_count = last_count + progress.body.len() as u64;
tx.put::<tables::CumulativeTxCount>(key, tx_count)?;
tx.put::<tables::BlockOmmers>(key, StoredBlockOmmers { ommers: vec![] })?;
(last_count..tx_count).try_for_each(|idx| {
tx.put::<tables::Transactions>(idx, random_signed_tx())
})
})?;
}
self.set_responses(blocks.iter().map(body_by_hash).collect()); self.set_responses(blocks.iter().map(body_by_hash).collect());
Ok(blocks) Ok(blocks)
} }
@ -557,17 +589,20 @@ mod tests {
Some(output) => output.stage_progress, Some(output) => output.stage_progress,
None => input.stage_progress.unwrap_or_default(), None => input.stage_progress.unwrap_or_default(),
}; };
self.validate_db_blocks(highest_block) self.validate_db_blocks(highest_block, highest_block)
} }
} }
impl UnwindStageTestRunner for BodyTestRunner { impl UnwindStageTestRunner for BodyTestRunner {
fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> { fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
self.db.check_no_entry_above::<tables::BlockBodies, _>(input.unwind_to, |key| { self.db.check_no_entry_above::<tables::BlockOmmers, _>(input.unwind_to, |key| {
key.number() key.number()
})?; })?;
if let Some(last_body) = self.last_body() { self.db.check_no_entry_above::<tables::CumulativeTxCount, _>(
let last_tx_id = last_body.base_tx_id + last_body.tx_amount; input.unwind_to,
|key| key.number(),
)?;
if let Some(last_tx_id) = self.last_count() {
self.db self.db
.check_no_entry_above::<tables::Transactions, _>(last_tx_id, |key| key)?; .check_no_entry_above::<tables::Transactions, _>(last_tx_id, |key| key)?;
} }
@ -584,19 +619,19 @@ mod tests {
let header = SealedHeader::new(Header::default(), GENESIS_HASH); let header = SealedHeader::new(Header::default(), GENESIS_HASH);
self.db.insert_headers(std::iter::once(&header))?; self.db.insert_headers(std::iter::once(&header))?;
self.db.commit(|tx| { self.db.commit(|tx| {
tx.put::<tables::BlockBodies>( let key = (0, GENESIS_HASH).into();
(0, GENESIS_HASH).into(), tx.put::<tables::CumulativeTxCount>(key, 1)?;
StoredBlockBody { base_tx_id: 0, tx_amount: 0, ommers: vec![] }, tx.put::<tables::BlockOmmers>(key, StoredBlockOmmers { ommers: vec![] })?;
) tx.put::<tables::Transactions>(0, random_signed_tx())
})?; })?;
Ok(()) Ok(())
} }
/// Retrieve the last body from the database /// Retrieve the last tx count from the database
pub(crate) fn last_body(&self) -> Option<StoredBlockBody> { pub(crate) fn last_count(&self) -> Option<NumTransactions> {
self.db self.db
.query(|tx| Ok(tx.cursor::<tables::BlockBodies>()?.last()?.map(|e| e.1))) .query(|tx| Ok(tx.cursor::<tables::CumulativeTxCount>()?.last()?.map(|e| e.1)))
.ok() .ok()
.flatten() .flatten()
} }
@ -604,34 +639,55 @@ mod tests {
/// Validate that the inserted block data is valid /// Validate that the inserted block data is valid
pub(crate) fn validate_db_blocks( pub(crate) fn validate_db_blocks(
&self, &self,
prev_progress: BlockNumber,
highest_block: BlockNumber, highest_block: BlockNumber,
) -> Result<(), TestRunnerError> { ) -> Result<(), TestRunnerError> {
self.db.query(|tx| { self.db.query(|tx| {
let mut block_body_cursor = tx.cursor::<tables::BlockBodies>()?; // Acquire cursors on body related tables
let mut ommers_cursor = tx.cursor::<tables::BlockOmmers>()?;
let mut tx_count_cursor = tx.cursor::<tables::CumulativeTxCount>()?;
let mut transaction_cursor = tx.cursor::<tables::Transactions>()?; let mut transaction_cursor = tx.cursor::<tables::Transactions>()?;
let mut entry = block_body_cursor.first()?; let first_tx_count_key = match tx_count_cursor.first()? {
let mut prev_max_tx_id = 0; Some((key, _)) => key,
while let Some((key, body)) = entry { None => return Ok(()),
};
let mut walker = tx_count_cursor.walk(first_tx_count_key)?.peekable();
let mut prev_entry: Option<(BlockNumHash, NumTransactions)> = None;
while let Some(entry) = walker.next() {
let (key, count) = entry?;
// Validate sequentiality only after prev progress,
// since the data before is mocked and can contain gaps
if key.number() > prev_progress {
if let Some((prev_key, _)) = prev_entry {
assert_eq!(prev_key.number() + 1, key.number(), "Tx count entries must be sequential");
}
}
// Validate that the current entry is below or equals to the highest allowed block
assert!( assert!(
key.number() <= highest_block, key.number() <= highest_block,
"We wrote a block body outside of our synced range. Found block with number {}, highest block according to stage is {}", "We wrote a block body outside of our synced range. Found block with number {}, highest block according to stage is {}",
key.number(), highest_block key.number(), highest_block
); );
assert!(prev_max_tx_id == body.base_tx_id, "Transaction IDs are malformed."); // Validate that ommers exist
for num in 0..body.tx_amount { assert_matches!(ommers_cursor.seek_exact(key), Ok(Some(_)), "Block ommers are missing");
let tx_id = body.base_tx_id + num;
// Validate that block trasactions exist
let first_tx_id = prev_entry.map(|(_, v)| v).unwrap_or_default();
for tx_id in first_tx_id..count {
assert_matches!( assert_matches!(
transaction_cursor.seek_exact(tx_id), transaction_cursor.seek_exact(tx_id),
Ok(Some(_)), Ok(Some(_)),
"A transaction is missing." "A transaction is missing."
); );
} }
prev_max_tx_id = body.base_tx_id + body.tx_amount;
entry = block_body_cursor.next()?;
}
prev_entry = Some((key, count));
}
Ok(()) Ok(())
})?; })?;
Ok(()) Ok(())

View File

@ -6,5 +6,3 @@ pub mod execution;
pub mod headers; pub mod headers;
/// The sender recovery stage. /// The sender recovery stage.
pub mod senders; pub mod senders;
/// The cumulative transaction index stage.
pub mod tx_index;

View File

@ -48,23 +48,34 @@ impl<DB: Database> Stage<DB> for SendersStage {
db: &mut StageDB<'_, DB>, db: &mut StageDB<'_, DB>,
input: ExecInput, input: ExecInput,
) -> Result<ExecOutput, StageError> { ) -> Result<ExecOutput, StageError> {
// Look up the start index for transaction range let stage_progress = input.stage_progress.unwrap_or_default();
let last_block = db.get_block_numhash(input.stage_progress.unwrap_or_default())?;
let start_tx_index = db.get_tx_count(last_block)?;
// Look up the end index for transaction range (exclusive) // Look up the start index for the transaction range
let max_block = db.get_block_numhash(input.previous_stage_progress())?; let start_tx_index = db.get_first_tx_id(stage_progress + 1)?;
let end_tx_index = db.get_tx_count(max_block)?;
// Look up the end index for transaction range (inclusive)
let max_block_num = input.previous_stage_progress();
let end_tx_index = match db.get_latest_tx_id(max_block_num) {
Ok(id) => id,
// No transactions in the database
Err(_) => {
return Ok(ExecOutput {
stage_progress: max_block_num,
done: true,
reached_tip: true,
})
}
};
// Acquire the cursor for inserting elements // Acquire the cursor for inserting elements
let mut senders_cursor = db.cursor_mut::<tables::TxSenders>()?; let mut senders_cursor = db.cursor_mut::<tables::TxSenders>()?;
// Acquire the cursor over the transactions // Acquire the cursor over the transactions
let mut tx_cursor = db.cursor::<tables::Transactions>()?; let mut tx_cursor = db.cursor::<tables::Transactions>()?;
// Walk the transactions from start to end index (exclusive) // Walk the transactions from start to end index (inclusive)
let entries = tx_cursor let entries = tx_cursor
.walk(start_tx_index)? .walk(start_tx_index)?
.take_while(|res| res.as_ref().map(|(k, _)| *k < end_tx_index).unwrap_or_default()); .take_while(|res| res.as_ref().map(|(k, _)| *k <= end_tx_index).unwrap_or_default());
// Iterate over transactions in chunks // Iterate over transactions in chunks
for chunk in &entries.chunks(self.batch_size) { for chunk in &entries.chunks(self.batch_size) {
@ -84,7 +95,7 @@ impl<DB: Database> Stage<DB> for SendersStage {
recovered.into_iter().try_for_each(|(id, sender)| senders_cursor.append(id, sender))?; recovered.into_iter().try_for_each(|(id, sender)| senders_cursor.append(id, sender))?;
} }
Ok(ExecOutput { stage_progress: max_block.number(), done: true, reached_tip: true }) Ok(ExecOutput { stage_progress: max_block_num, done: true, reached_tip: true })
} }
/// Unwind the stage. /// Unwind the stage.
@ -93,29 +104,16 @@ impl<DB: Database> Stage<DB> for SendersStage {
db: &mut StageDB<'_, DB>, db: &mut StageDB<'_, DB>,
input: UnwindInput, input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> { ) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
// Look up the hash of the unwind block // Lookup latest tx id that we should unwind to
if let Some((_, unwind_hash)) = let latest_tx_id = db.get_latest_tx_id(input.unwind_to).unwrap_or_default();
db.get_exact_or_prev::<tables::CanonicalHeaders>(input.unwind_to)? db.unwind_table_by_num::<tables::TxSenders>(latest_tx_id)?;
{
// Look up the cumulative tx count at unwind block
let key = (input.unwind_to, unwind_hash).into();
if let Some((_, unwind_tx_count)) =
db.get_exact_or_prev::<tables::CumulativeTxCount>(key)?
{
// The last remaining tx_id should be at `cum_tx_count - 1`
db.unwind_table_by_num::<tables::TxSenders>(unwind_tx_count - 1)?;
}
}
Ok(UnwindOutput { stage_progress: input.unwind_to }) Ok(UnwindOutput { stage_progress: input.unwind_to })
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use reth_interfaces::{ use reth_interfaces::test_utils::generators::random_block_range;
db::models::StoredBlockBody, test_utils::generators::random_block_range,
};
use reth_primitives::{BlockLocked, BlockNumber, H256}; use reth_primitives::{BlockLocked, BlockNumber, H256};
use super::*; use super::*;
@ -155,19 +153,21 @@ mod tests {
self.db.commit(|tx| { self.db.commit(|tx| {
let mut base_tx_id = 0; let mut base_tx_id = 0;
blocks.iter().try_for_each(|b| { blocks.iter().try_for_each(|b| {
let ommers = b.ommers.iter().map(|o| o.clone().unseal()).collect::<Vec<_>>();
let txs = b.body.clone(); let txs = b.body.clone();
let tx_amount = txs.len() as u64; let tx_amount = txs.len() as u64;
let num_hash = (b.number, b.hash()).into(); let num_hash = (b.number, b.hash()).into();
tx.put::<tables::CanonicalHeaders>(b.number, b.hash())?; tx.put::<tables::CanonicalHeaders>(b.number, b.hash())?;
tx.put::<tables::BlockBodies>(
num_hash,
StoredBlockBody { base_tx_id, tx_amount, ommers },
)?;
tx.put::<tables::CumulativeTxCount>(num_hash, base_tx_id + tx_amount)?; tx.put::<tables::CumulativeTxCount>(num_hash, base_tx_id + tx_amount)?;
for body_tx in txs { for body_tx in txs {
// Insert senders for previous stage progress
if b.number == stage_progress {
tx.put::<tables::TxSenders>(
base_tx_id,
body_tx.recover_signer().expect("failed to recover sender"),
)?;
}
tx.put::<tables::Transactions>(base_tx_id, body_tx)?; tx.put::<tables::Transactions>(base_tx_id, body_tx)?;
base_tx_id += 1; base_tx_id += 1;
} }
@ -194,13 +194,17 @@ mod tests {
return Ok(()) return Ok(())
} }
let start_hash = tx.get::<tables::CanonicalHeaders>(start_block)?.unwrap(); let mut tx_count_cursor = tx.cursor::<tables::CumulativeTxCount>()?;
let mut body_cursor = tx.cursor::<tables::BlockBodies>()?;
let body_walker = body_cursor.walk((start_block, start_hash).into())?;
for entry in body_walker { let last_block = start_block - 1;
let (_, body) = entry?; let last_hash = tx.get::<tables::CanonicalHeaders>(start_block)?.unwrap();
for tx_id in body.base_tx_id..body.base_tx_id + body.tx_amount { let mut last_tx_count = tx_count_cursor
.seek_exact((last_block, last_hash).into())?
.map(|(_, v)| v)
.unwrap_or_default();
while let Some((_, count)) = tx_count_cursor.next()? {
for tx_id in last_tx_count..count {
let transaction = tx let transaction = tx
.get::<tables::Transactions>(tx_id)? .get::<tables::Transactions>(tx_id)?
.expect("no transaction entry"); .expect("no transaction entry");
@ -208,12 +212,13 @@ mod tests {
transaction.recover_signer().expect("failed to recover signer"); transaction.recover_signer().expect("failed to recover signer");
assert_eq!(Some(signer), tx.get::<tables::TxSenders>(tx_id)?); assert_eq!(Some(signer), tx.get::<tables::TxSenders>(tx_id)?);
} }
last_tx_count = count;
} }
Ok(()) Ok(())
})?; })?;
} else { } else {
self.check_no_transaction_by_block(input.stage_progress.unwrap_or_default())?; self.check_no_senders_by_block(input.stage_progress.unwrap_or_default())?;
} }
Ok(()) Ok(())
@ -222,42 +227,23 @@ mod tests {
impl UnwindStageTestRunner for SendersTestRunner { impl UnwindStageTestRunner for SendersTestRunner {
fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> { fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
self.check_no_transaction_by_block(input.unwind_to) self.check_no_senders_by_block(input.unwind_to)
} }
} }
impl SendersTestRunner { impl SendersTestRunner {
fn check_no_transaction_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> { fn check_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> {
match self.get_block_body_entry(block)? { let latest_tx_id = self.db.inner().get_latest_tx_id(block);
Some(body) => { match latest_tx_id {
let last_index = body.base_tx_id + body.tx_amount; Ok(last_index) => {
self.db.check_no_entry_above::<tables::TxSenders, _>(last_index, |key| key)?; self.db.check_no_entry_above::<tables::TxSenders, _>(last_index, |key| key)?
} }
None => { Err(_) => {
assert!(self.db.table_is_empty::<tables::TxSenders>()?); assert!(self.db.table_is_empty::<tables::TxSenders>()?);
} }
}; };
Ok(())
}
/// Get the block body entry at block number. If it doesn't exist, Ok(())
/// fallback to the previous entry.
fn get_block_body_entry(
&self,
block: BlockNumber,
) -> Result<Option<StoredBlockBody>, TestRunnerError> {
let entry = self.db.query(|tx| match tx.get::<tables::CanonicalHeaders>(block)? {
Some(hash) => {
let mut body_cursor = tx.cursor::<tables::BlockBodies>()?;
let entry = match body_cursor.seek_exact((block, hash).into())? {
Some((_, block)) => Some(block),
_ => body_cursor.prev()?.map(|(_, block)| block),
};
Ok(entry)
}
None => Ok(None),
})?;
Ok(entry)
} }
} }
} }

View File

@@ -1,192 +0,0 @@
use crate::{
db::StageDB, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId,
UnwindInput, UnwindOutput,
};
use reth_interfaces::db::{tables, Database, DbCursorRO, DbCursorRW, DbTxMut};
use std::fmt::Debug;
/// Unique identifier of the [TxIndex] stage within the pipeline.
const TX_INDEX: StageId = StageId("TxIndex");

/// The cumulative transaction index stage
/// implementation for staged sync. This stage
/// updates the cumulative transaction count per block.
///
/// e.g. [key, value] entries in [tables::CumulativeTxCount]
/// block #1 with 24 transactions - [1, 24]
/// block #2 with 42 transactions - [2, 66]
/// block #3 with 33 transactions - [3, 99]
#[derive(Debug)]
pub struct TxIndex;
#[async_trait::async_trait]
impl<DB: Database> Stage<DB> for TxIndex {
    /// Return the id of the stage
    fn id(&self) -> StageId {
        TX_INDEX
    }

    /// Execute the stage.
    ///
    /// Walks block bodies from the block after this stage's previous progress up to and
    /// including the previous stage's progress, accumulating each body's `tx_amount` into a
    /// running total and appending one cumulative-count entry per block to
    /// [tables::CumulativeTxCount].
    async fn execute(
        &mut self,
        db: &mut StageDB<'_, DB>,
        input: ExecInput,
    ) -> Result<ExecOutput, StageError> {
        // The progress of this stage during last iteration
        let last_block = db.get_block_numhash(input.stage_progress.unwrap_or_default())?;
        // The start block for this iteration
        let start_block = db.get_block_numhash(last_block.number() + 1)?;
        // The maximum block that this stage should insert to
        let max_block = input.previous_stage_progress();

        // Get the cursor over the table
        let mut cursor = db.cursor_mut::<tables::CumulativeTxCount>()?;
        // Find the last count that was inserted during the previous iteration.
        // Its absence indicates database corruption.
        let (_, mut count) =
            cursor.seek_exact(last_block)?.ok_or(DatabaseIntegrityError::CumulativeTxCount {
                number: last_block.number(),
                hash: last_block.hash(),
            })?;

        // Get the cursor over block bodies
        let mut body_cursor = db.cursor_mut::<tables::BlockBodies>()?;
        let walker = body_cursor.walk(start_block)?;

        // Aggregate and insert the cumulative transaction count for each block number.
        //
        // NOTE: this was previously written with
        // `take_while(|b| b.as_ref().map(|(k, _)| k.number() <= max_block).unwrap_or_default())`,
        // which silently swallowed walker errors: an `Err` entry made the predicate `false`,
        // ending the walk and letting the stage report success. Errors are now propagated.
        for entry in walker {
            let (key, body) = entry?;
            // Stop once we are past the maximum block (the bound is inclusive).
            if key.number() > max_block {
                break
            }
            count += body.tx_amount;
            cursor.append(key, count)?;
        }

        Ok(ExecOutput { done: true, reached_tip: true, stage_progress: max_block })
    }

    /// Unwind the stage by removing all [tables::CumulativeTxCount] entries above the
    /// `unwind_to` block.
    async fn unwind(
        &mut self,
        db: &mut StageDB<'_, DB>,
        input: UnwindInput,
    ) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
        db.unwind_table_by_num_hash::<tables::CumulativeTxCount>(input.unwind_to)?;
        Ok(UnwindOutput { stage_progress: input.unwind_to })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestStageDB,
        UnwindStageTestRunner,
    };
    use reth_interfaces::{
        db::{
            models::{BlockNumHash, StoredBlockBody},
            DbTx,
        },
        test_utils::generators::random_header_range,
    };
    use reth_primitives::H256;

    // Instantiate the shared stage test suite for this runner
    // (presumably generates the common execute/unwind cases — see the macro).
    stage_test_suite!(TxIndexTestRunner);

    /// Test runner wiring the [TxIndex] stage into the shared stage test harness.
    #[derive(Default)]
    pub(crate) struct TxIndexTestRunner {
        // In-memory test database shared by seeding and validation.
        db: TestStageDB,
    }

    impl StageTestRunner for TxIndexTestRunner {
        type S = TxIndex;

        fn db(&self) -> &TestStageDB {
            &self.db
        }

        fn stage(&self) -> Self::S {
            TxIndex {}
        }
    }

    impl ExecuteStageTestRunner for TxIndexTestRunner {
        type Seed = ();

        /// Seed canonical headers, block bodies with random transaction counts, and the
        /// pre-existing cumulative counts up to the pivot (current stage progress).
        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let pivot = input.stage_progress.unwrap_or_default();
            let start = pivot.saturating_sub(100);
            let mut end = input.previous_stage_progress();
            end += 2; // generate 2 additional headers to account for start header lookup

            let headers = random_header_range(start..end, H256::zero());
            // Pair each header with a random per-block transaction count (u8 keeps totals small).
            let headers =
                headers.into_iter().map(|h| (h, rand::random::<u8>())).collect::<Vec<_>>();

            self.db.map_put::<tables::CanonicalHeaders, _, _>(&headers, |(h, _)| {
                (h.number, h.hash())
            })?;
            self.db.map_put::<tables::BlockBodies, _, _>(&headers, |(h, count)| {
                (
                    BlockNumHash((h.number, h.hash())),
                    StoredBlockBody { base_tx_id: 0, tx_amount: *count as u64, ommers: vec![] },
                )
            })?;

            // Pre-populate cumulative counts through the pivot so `execute` can resume from it.
            let slice_up_to =
                std::cmp::min(pivot.saturating_sub(start) as usize, headers.len() - 1);
            self.db.transform_append::<tables::CumulativeTxCount, _, _>(
                &headers[..=slice_up_to],
                |prev, (h, count)| {
                    (BlockNumHash((h.number, h.hash())), prev.unwrap_or_default() + (*count as u64))
                },
            )?;
            Ok(())
        }

        /// Walk the cumulative counts from `start` and re-derive the running total from the
        /// block bodies, asserting they agree for every block up to `end`.
        fn validate_execution(
            &self,
            input: ExecInput,
            _output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            // TODO: validate that base_tx_index of next block equals the cum count at current
            self.db.query(|tx| {
                let (start, end) =
                    (input.stage_progress.unwrap_or_default(), input.previous_stage_progress());
                if start >= end {
                    return Ok(())
                }

                let start_hash =
                    tx.get::<tables::CanonicalHeaders>(start)?.expect("no canonical found");
                let mut tx_count_cursor = tx.cursor::<tables::CumulativeTxCount>()?;
                let mut tx_count_walker = tx_count_cursor.walk((start, start_hash).into())?;

                // Start the running total from the first (seeded) entry at `start`.
                let mut count = tx_count_walker.next().unwrap()?.1;
                let mut last_num = start;
                for entry in tx_count_walker {
                    let (key, db_count) = entry?;
                    count += tx.get::<tables::BlockBodies>(key)?.unwrap().tx_amount;
                    assert_eq!(db_count, count);
                    last_num = key.number();
                }
                // The stage must have progressed exactly to the previous stage's progress.
                assert_eq!(last_num, end);
                Ok(())
            })?;
            Ok(())
        }
    }

    impl UnwindStageTestRunner for TxIndexTestRunner {
        /// No cumulative count entries may remain above the unwind target.
        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            self.db.check_no_entry_above::<tables::CumulativeTxCount, _>(input.unwind_to, |h| {
                h.number()
            })?;
            Ok(())
        }
    }
}

View File

@@ -30,7 +30,7 @@ impl Default for TestStageDB {
impl TestStageDB { impl TestStageDB {
/// Return a database wrapped in [StageDB]. /// Return a database wrapped in [StageDB].
fn inner(&self) -> StageDB<'_, Env<WriteMap>> { pub(crate) fn inner(&self) -> StageDB<'_, Env<WriteMap>> {
StageDB::new(self.db.borrow()).expect("failed to create db container") StageDB::new(self.db.borrow()).expect("failed to create db container")
} }
@@ -73,6 +73,7 @@ impl TestStageDB {
/// let db = StageTestDB::default(); /// let db = StageTestDB::default();
/// db.map_put::<Table, _, _>(&items, |item| item)?; /// db.map_put::<Table, _, _>(&items, |item| item)?;
/// ``` /// ```
#[allow(dead_code)]
pub(crate) fn map_put<T, S, F>(&self, values: &[S], mut map: F) -> Result<(), db::Error> pub(crate) fn map_put<T, S, F>(&self, values: &[S], mut map: F) -> Result<(), db::Error>
where where
T: Table, T: Table,
@@ -96,6 +97,7 @@ impl TestStageDB {
/// let db = StageTestDB::default(); /// let db = StageTestDB::default();
/// db.transform_append::<Table, _, _>(&items, |prev, item| prev.unwrap_or_default() + item)?; /// db.transform_append::<Table, _, _>(&items, |prev, item| prev.unwrap_or_default() + item)?;
/// ``` /// ```
#[allow(dead_code)]
pub(crate) fn transform_append<T, S, F>( pub(crate) fn transform_append<T, S, F>(
&self, &self,
values: &[S], values: &[S],