feat: Refactor TransitionId tables, BlockBodyIndices table. (#2039)

This commit is contained in:
rakita
2023-04-05 13:10:18 +02:00
committed by GitHub
parent 9a011b4900
commit 633d84ded0
31 changed files with 476 additions and 367 deletions

View File

@ -164,7 +164,7 @@ impl Command {
HeaderTD,
HeaderNumbers,
Headers,
BlockBodies,
BlockBodyIndices,
BlockOmmers,
BlockWithdrawals,
TransactionBlock,
@ -174,8 +174,6 @@ impl Command {
PlainStorageState,
PlainAccountState,
Bytecodes,
BlockTransitionIndex,
TxTransitionIndex,
AccountHistory,
StorageHistory,
AccountChangeSet,

View File

@ -52,7 +52,7 @@ fn import_tables_with_range<DB: Database>(
tx.import_table_with_range::<tables::Headers, _>(&db_tool.db.tx()?, Some(from), to)
})??;
output_db.update(|tx| {
tx.import_table_with_range::<tables::BlockBodies, _>(&db_tool.db.tx()?, Some(from), to)
tx.import_table_with_range::<tables::BlockBodyIndices, _>(&db_tool.db.tx()?, Some(from), to)
})??;
output_db.update(|tx| {
tx.import_table_with_range::<tables::BlockOmmers, _>(&db_tool.db.tx()?, Some(from), to)
@ -60,15 +60,15 @@ fn import_tables_with_range<DB: Database>(
// Find range of transactions that need to be copied over
let (from_tx, to_tx) = db_tool.db.view(|read_tx| {
let mut read_cursor = read_tx.cursor_read::<tables::BlockBodies>()?;
let mut read_cursor = read_tx.cursor_read::<tables::BlockBodyIndices>()?;
let (_, from_block) =
read_cursor.seek(from)?.ok_or(eyre::eyre!("BlockBody {from} does not exist."))?;
let (_, to_block) =
read_cursor.seek(to)?.ok_or(eyre::eyre!("BlockBody {to} does not exist."))?;
Ok::<(u64, u64), eyre::ErrReport>((
from_block.start_tx_id,
to_block.start_tx_id + to_block.tx_count,
from_block.first_tx_num,
to_block.first_tx_num + to_block.tx_count,
))
})??;

View File

@ -21,10 +21,14 @@ pub(crate) async fn dump_hashing_account_stage<DB: Database>(
// Import relevant AccountChangeSets
let tx = db_tool.db.tx()?;
let from_transition_rev =
tx.get::<tables::BlockTransitionIndex>(from)?.expect("there should be at least one.");
let to_transition_rev =
tx.get::<tables::BlockTransitionIndex>(to)?.expect("there should be at least one.");
let from_transition_rev = tx
.get::<tables::BlockBodyIndices>(from)?
.expect("there should be at least one.")
.transition_at_block();
let to_transition_rev = tx
.get::<tables::BlockBodyIndices>(to)?
.expect("there should be at least one.")
.transition_after_block();
output_db.update(|tx| {
tx.import_table_with_range::<tables::AccountChangeSet, _>(
&db_tool.db.tx()?,

View File

@ -28,10 +28,14 @@ pub(crate) async fn dump_merkle_stage<DB: Database>(
})??;
let tx = db_tool.db.tx()?;
let from_transition_rev =
tx.get::<tables::BlockTransitionIndex>(from)?.expect("there should be at least one.");
let to_transition_rev =
tx.get::<tables::BlockTransitionIndex>(to)?.expect("there should be at least one.");
let from_transition_rev = tx
.get::<tables::BlockBodyIndices>(from)?
.expect("there should be at least one.")
.transition_at_block();
let to_transition_rev = tx
.get::<tables::BlockBodyIndices>(to)?
.expect("there should be at least one.")
.transition_after_block();
output_db.update(|tx| {
tx.import_table_with_range::<tables::AccountChangeSet, _>(

View File

@ -108,8 +108,8 @@ impl Command {
}
}
/// Sets up the database and initial state on `BlockTransitionIndex`. Also returns the tip block
/// number.
/// Sets up the database and initial state on [`tables::BlockBodyIndices`]. Also returns the tip
/// block number.
pub(crate) fn setup<DB: Database>(
from: u64,
to: u64,
@ -123,17 +123,15 @@ pub(crate) fn setup<DB: Database>(
let output_db = init_db(output_db)?;
output_db.update(|tx| {
tx.import_table_with_range::<tables::BlockTransitionIndex, _>(
tx.import_table_with_range::<tables::BlockBodyIndices, _>(
&db_tool.db.tx()?,
Some(from - 1),
to + 1,
)
})??;
let (tip_block_number, _) = db_tool
.db
.view(|tx| tx.cursor_read::<tables::BlockTransitionIndex>()?.last())??
.expect("some");
let (tip_block_number, _) =
db_tool.db.view(|tx| tx.cursor_read::<tables::BlockBodyIndices>()?.last())??.expect("some");
Ok((output_db, tip_block_number))
}

View File

@ -60,11 +60,9 @@ pub(crate) fn generate_vectors(mut tables: Vec<String>) -> Result<()> {
(HeaderTD, PER_TABLE, TABLE),
(HeaderNumbers, PER_TABLE, TABLE),
(Headers, PER_TABLE, TABLE),
(BlockBodies, PER_TABLE, TABLE),
(BlockBodyIndices, PER_TABLE, TABLE),
(BlockOmmers, 100, TABLE),
(TxHashNumber, PER_TABLE, TABLE),
(BlockTransitionIndex, PER_TABLE, TABLE),
(TxTransitionIndex, PER_TABLE, TABLE),
(Transactions, 100, TABLE),
(PlainStorageState, PER_TABLE, DUPSORT),
(PlainAccountState, PER_TABLE, TABLE)

View File

@ -17,8 +17,8 @@ pub enum ProviderError {
#[error("Block hash {block_hash:?} does not exist in Headers table")]
BlockHash { block_hash: BlockHash },
/// A block body is missing.
#[error("Block body not found for block #{number}")]
BlockBody { number: BlockNumber },
#[error("Block meta not found for block #{number}")]
BlockBodyIndices { number: BlockNumber },
/// The block transition id for a certain block number is missing.
#[error("Block transition id does not exist for block #{block_number}")]
BlockTransition { block_number: BlockNumber },

View File

@ -64,8 +64,7 @@ pub fn init_genesis<DB: Database>(
// Insert header
tx.put::<tables::CanonicalHeaders>(0, hash)?;
tx.put::<tables::HeaderNumbers>(hash, 0)?;
tx.put::<tables::BlockBodies>(0, Default::default())?;
tx.put::<tables::BlockTransitionIndex>(0, 0)?;
tx.put::<tables::BlockBodyIndices>(0, Default::default())?;
tx.put::<tables::HeaderTD>(0, header.difficulty.into())?;
tx.put::<tables::Headers>(0, header)?;

View File

@ -98,7 +98,7 @@ fn merkle(c: &mut Criterion) {
&mut group,
setup::unwind_hashes,
stage,
1..DEFAULT_NUM_BLOCKS + 1,
1..DEFAULT_NUM_BLOCKS,
"Merkle-incremental".to_string(),
);
@ -107,7 +107,7 @@ fn merkle(c: &mut Criterion) {
&mut group,
setup::unwind_hashes,
stage,
1..DEFAULT_NUM_BLOCKS + 1,
1..DEFAULT_NUM_BLOCKS,
"Merkle-fullhash".to_string(),
);
}

View File

@ -11,7 +11,7 @@ use std::path::{Path, PathBuf};
/// Prepares a database for [`AccountHashingStage`]
/// If the environment variable [`constants::ACCOUNT_HASHING_DB`] is set, it will use that one and
/// will get the stage execution range from [`tables::BlockTransitionIndex`]. Otherwise, it will
/// will get the stage execution range from [`tables::BlockBodyIndices`]. Otherwise, it will
/// generate its own random data.
///
/// Returns the path to the database file, stage and range of stage execution if it exists.
@ -33,7 +33,7 @@ fn find_stage_range(db: &Path) -> StageRange {
TestTransaction::new(db)
.tx
.view(|tx| {
let mut cursor = tx.cursor_read::<tables::BlockTransitionIndex>()?;
let mut cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let from = cursor.first()?.unwrap().0;
let to = cursor.last()?.unwrap().0;
@ -55,7 +55,7 @@ fn find_stage_range(db: &Path) -> StageRange {
fn generate_testdata_db(num_blocks: u64) -> (PathBuf, StageRange) {
let opts = SeedOpts {
blocks: 0..num_blocks + 1,
accounts: 0..10_000,
accounts: 0..100_000,
txs: 100..150,
transitions: 10_000 + 1,
};

View File

@ -730,15 +730,15 @@ mod tests {
let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);
let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder()
.add_stage(TestStage::new(StageId("Fatal")).add_exec(Err(
StageError::DatabaseIntegrity(ProviderError::BlockBody { number: 5 }),
StageError::DatabaseIntegrity(ProviderError::BlockBodyIndices { number: 5 }),
)))
.build();
let result = pipeline.run(db).await;
assert_matches!(
result,
Err(PipelineError::Stage(StageError::DatabaseIntegrity(ProviderError::BlockBody {
number: 5
})))
Err(PipelineError::Stage(StageError::DatabaseIntegrity(
ProviderError::BlockBodyIndices { number: 5 }
)))
);
}
}

View File

@ -6,7 +6,7 @@ use futures_util::TryStreamExt;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
models::{StoredBlockBody, StoredBlockOmmers, StoredBlockWithdrawals},
models::{StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals},
tables,
transaction::{DbTx, DbTxMut},
};
@ -42,11 +42,9 @@ pub const BODIES: StageId = StageId("Bodies");
/// The bodies are processed and data is inserted into these tables:
///
/// - [`BlockOmmers`][reth_db::tables::BlockOmmers]
/// - [`BlockBodies`][reth_db::tables::BlockBodies]
/// - [`BlockBodies`][reth_db::tables::BlockBodyIndices]
/// - [`Transactions`][reth_db::tables::Transactions]
/// - [`TransactionBlock`][reth_db::tables::TransactionBlock]
/// - [`BlockTransitionIndex`][reth_db::tables::BlockTransitionIndex]
/// - [`TxTransitionIndex`][reth_db::tables::TxTransitionIndex]
///
/// # Genesis
///
@ -54,10 +52,8 @@ pub const BODIES: StageId = StageId("Bodies");
///
/// - The header tables (see [`HeaderStage`][crate::stages::HeaderStage])
/// - The [`BlockOmmers`][reth_db::tables::BlockOmmers] table
/// - The [`BlockBodies`][reth_db::tables::BlockBodies] table
/// - The [`BlockBodies`][reth_db::tables::BlockBodyIndices] table
/// - The [`Transactions`][reth_db::tables::Transactions] table
/// - The [`BlockTransitionIndex`][reth_db::tables::BlockTransitionIndex] table
/// - The [`TxTransitionIndex`][reth_db::tables::TxTransitionIndex] table
#[derive(Debug)]
pub struct BodyStage<D: BodyDownloader> {
/// The body downloader.
@ -89,21 +85,17 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
let mut td_cursor = tx.cursor_read::<tables::HeaderTD>()?;
// Cursors used to write bodies, ommers and transactions
let mut body_cursor = tx.cursor_write::<tables::BlockBodies>()?;
let mut block_meta_cursor = tx.cursor_write::<tables::BlockBodyIndices>()?;
let mut tx_cursor = tx.cursor_write::<tables::Transactions>()?;
let mut tx_block_cursor = tx.cursor_write::<tables::TransactionBlock>()?;
let mut ommers_cursor = tx.cursor_write::<tables::BlockOmmers>()?;
let mut withdrawals_cursor = tx.cursor_write::<tables::BlockWithdrawals>()?;
// Cursors used to write state transition mapping
let mut block_transition_cursor = tx.cursor_write::<tables::BlockTransitionIndex>()?;
let mut tx_transition_cursor = tx.cursor_write::<tables::TxTransitionIndex>()?;
// Get id for the first transaction and first transition in the block
let (mut current_tx_id, mut transition_id) = tx.get_next_block_ids(start_block)?;
let (mut next_tx_num, mut next_transition_id) = tx.get_next_block_ids(start_block)?;
let mut highest_block = input.stage_progress.unwrap_or_default();
debug!(target: "sync::stages::bodies", stage_progress = highest_block, target = end_block, start_tx_id = current_tx_id, transition_id, "Commencing sync");
debug!(target: "sync::stages::bodies", stage_progress = highest_block, target = end_block, start_tx_id = next_tx_num, next_transition_id, "Commencing sync");
// Task downloader can return `None` only if the response relaying channel was closed. This
// is a fatal error to prevent the pipeline from running forever.
@ -116,29 +108,24 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
let block_number = response.block_number();
let difficulty = response.difficulty();
let first_tx_num = next_tx_num;
let first_transition_id = next_transition_id;
let mut tx_count = 0;
let mut has_withdrawals = false;
match response {
BlockResponse::Full(block) => {
let body = StoredBlockBody {
start_tx_id: current_tx_id,
tx_count: block.body.len() as u64,
};
body_cursor.append(block_number, body.clone())?;
tx_count = block.body.len() as u64;
// write transaction block index
if !body.is_empty() {
tx_block_cursor.append(body.last_tx_index(), block.number)?;
if !block.body.is_empty() {
tx_block_cursor.append(first_tx_num, block.number)?;
}
// Write transactions
for transaction in block.body {
// Append the transaction
tx_cursor.append(current_tx_id, transaction)?;
tx_transition_cursor.append(current_tx_id, transition_id)?;
tx_cursor.append(next_tx_num, transaction)?;
// Increment transaction id for each transaction.
current_tx_id += 1;
// Increment transition id for each transaction.
transition_id += 1;
next_tx_num += 1;
}
// Write ommers if any
@ -164,12 +151,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
}
}
}
BlockResponse::Empty(_) => {
body_cursor.append(
block_number,
StoredBlockBody { start_tx_id: current_tx_id, tx_count: 0 },
)?;
}
BlockResponse::Empty(_) => {}
};
// The block transition marks the final state at the end of the block.
@ -181,11 +163,22 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
.ok_or(ProviderError::TotalDifficulty { number: block_number })?
.1;
let has_reward = self.consensus.has_block_reward(td.into(), difficulty);
let has_post_block_transition = has_reward || has_withdrawals;
if has_post_block_transition {
transition_id += 1;
}
block_transition_cursor.append(block_number, transition_id)?;
let has_block_change = has_reward || has_withdrawals;
// Increment transition id for each transaction,
// and by +1 if the block has its own state change (an block reward or withdrawals).
next_transition_id += tx_count + has_block_change as u64;
// insert block meta
block_meta_cursor.append(
block_number,
StoredBlockBodyIndices {
first_tx_num,
first_transition_id,
has_block_change,
tx_count,
},
)?;
highest_block = block_number;
}
@ -206,17 +199,15 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
) -> Result<UnwindOutput, StageError> {
info!(target: "sync::stages::bodies", to_block = input.unwind_to, "Unwinding");
// Cursors to unwind bodies, ommers
let mut body_cursor = tx.cursor_write::<tables::BlockBodies>()?;
let mut body_cursor = tx.cursor_write::<tables::BlockBodyIndices>()?;
let mut transaction_cursor = tx.cursor_write::<tables::Transactions>()?;
let mut ommers_cursor = tx.cursor_write::<tables::BlockOmmers>()?;
let mut withdrawals_cursor = tx.cursor_write::<tables::BlockWithdrawals>()?;
// Cursors to unwind transitions
let mut block_transition_cursor = tx.cursor_write::<tables::BlockTransitionIndex>()?;
let mut tx_transition_cursor = tx.cursor_write::<tables::TxTransitionIndex>()?;
let mut tx_block_cursor = tx.cursor_write::<tables::TransactionBlock>()?;
let mut rev_walker = body_cursor.walk_back(None)?;
while let Some((number, body)) = rev_walker.next().transpose()? {
while let Some((number, block_meta)) = rev_walker.next().transpose()? {
if number <= input.unwind_to {
break
}
@ -231,30 +222,23 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
withdrawals_cursor.delete_current()?;
}
// Delete the block transition if any
if block_transition_cursor.seek_exact(number)?.is_some() {
block_transition_cursor.delete_current()?;
}
// Delete all transaction to block values.
if !body.is_empty() && tx_block_cursor.seek_exact(body.last_tx_index())?.is_some() {
if !block_meta.is_empty() &&
tx_block_cursor.seek_exact(block_meta.last_tx_num())?.is_some()
{
tx_block_cursor.delete_current()?;
}
// Delete all transactions that belong to this block
for tx_id in body.tx_id_range() {
for tx_id in block_meta.tx_num_range() {
// First delete the transaction
if transaction_cursor.seek_exact(tx_id)?.is_some() {
transaction_cursor.delete_current()?;
}
// Delete the transaction transition if any
if tx_transition_cursor.seek_exact(tx_id)?.is_some() {
tx_transition_cursor.delete_current()?;
}
}
// Delete the current body value
tx.delete::<tables::BlockBodies>(number, None)?;
tx.delete::<tables::BlockBodyIndices>(number, None)?;
}
Ok(UnwindOutput { stage_progress: input.unwind_to })
@ -443,7 +427,7 @@ mod tests {
cursor::DbCursorRO,
database::Database,
mdbx::{Env, WriteMap},
models::{StoredBlockBody, StoredBlockOmmers},
models::{StoredBlockBodyIndices, StoredBlockOmmers},
tables,
transaction::{DbTx, DbTxMut},
};
@ -548,30 +532,26 @@ mod tests {
if let Some(progress) = blocks.first() {
// Insert last progress data
self.tx.commit(|tx| {
let body = StoredBlockBody {
start_tx_id: 0,
let body = StoredBlockBodyIndices {
first_tx_num: 0,
first_transition_id: 0,
tx_count: progress.body.len() as u64,
has_block_change: true,
};
body.tx_id_range().try_for_each(|tx_id| {
body.tx_num_range().try_for_each(|tx_num| {
let transaction = random_signed_tx();
tx.put::<tables::Transactions>(tx_id, transaction)?;
tx.put::<tables::TxTransitionIndex>(tx_id, tx_id)
tx.put::<tables::Transactions>(tx_num, transaction)
})?;
let last_transition_id = progress.body.len() as u64;
let block_transition_id = last_transition_id + 1; // for block reward
tx.put::<tables::BlockTransitionIndex>(
progress.number,
block_transition_id,
)?;
if body.tx_count != 0 {
tx.put::<tables::TransactionBlock>(
body.first_tx_index(),
body.first_tx_num(),
progress.number,
)?;
}
tx.put::<tables::BlockBodies>(progress.number, body)?;
tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
if !progress.ommers_hash_is_empty() {
tx.put::<tables::BlockOmmers>(
progress.number,
@ -606,21 +586,15 @@ mod tests {
impl UnwindStageTestRunner for BodyTestRunner {
fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
self.tx
.ensure_no_entry_above::<tables::BlockBodies, _>(input.unwind_to, |key| key)?;
self.tx
.ensure_no_entry_above::<tables::BlockOmmers, _>(input.unwind_to, |key| key)?;
self.tx.ensure_no_entry_above::<tables::BlockTransitionIndex, _>(
self.tx.ensure_no_entry_above::<tables::BlockBodyIndices, _>(
input.unwind_to,
|key| key,
)?;
self.tx
.ensure_no_entry_above::<tables::BlockOmmers, _>(input.unwind_to, |key| key)?;
if let Some(last_tx_id) = self.get_last_tx_id()? {
self.tx
.ensure_no_entry_above::<tables::Transactions, _>(last_tx_id, |key| key)?;
self.tx.ensure_no_entry_above::<tables::TxTransitionIndex, _>(
last_tx_id,
|key| key,
)?;
self.tx.ensure_no_entry_above::<tables::TransactionBlock, _>(
last_tx_id,
|key| key,
@ -634,12 +608,12 @@ mod tests {
/// Get the last available tx id if any
pub(crate) fn get_last_tx_id(&self) -> Result<Option<TxNumber>, TestRunnerError> {
let last_body = self.tx.query(|tx| {
let v = tx.cursor_read::<tables::BlockBodies>()?.last()?;
let v = tx.cursor_read::<tables::BlockBodyIndices>()?.last()?;
Ok(v)
})?;
Ok(match last_body {
Some((_, body)) if body.tx_count != 0 => {
Some(body.start_tx_id + body.tx_count - 1)
Some(body.first_tx_num + body.tx_count - 1)
}
_ => None,
})
@ -655,11 +629,9 @@ mod tests {
// Acquire cursors on body related tables
let mut headers_cursor = tx.cursor_read::<tables::Headers>()?;
let mut td_cursor = tx.cursor_read::<tables::HeaderTD>()?;
let mut bodies_cursor = tx.cursor_read::<tables::BlockBodies>()?;
let mut bodies_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut ommers_cursor = tx.cursor_read::<tables::BlockOmmers>()?;
let mut block_transition_cursor = tx.cursor_read::<tables::BlockTransitionIndex>()?;
let mut transaction_cursor = tx.cursor_read::<tables::Transactions>()?;
let mut tx_transition_cursor = tx.cursor_read::<tables::TxTransitionIndex>()?;
let mut tx_block_cursor = tx.cursor_read::<tables::TransactionBlock>()?;
let first_body_key = match bodies_cursor.first()? {
@ -696,19 +668,18 @@ mod tests {
assert!(stored_ommers.is_some(), "Missing ommers entry");
}
let tx_block_id = tx_block_cursor.seek_exact(body.last_tx_index())?.map(|(_,b)| b);
let tx_block_id = tx_block_cursor.seek_exact(body.last_tx_num())?.map(|(_,b)| b);
if body.tx_count == 0 {
assert_ne!(tx_block_id,Some(number));
} else {
assert_eq!(tx_block_id, Some(number));
}
for tx_id in body.tx_id_range() {
assert_eq!(body.first_transition_id, expected_transition_id);
for tx_id in body.tx_num_range() {
let tx_entry = transaction_cursor.seek_exact(tx_id)?;
assert!(tx_entry.is_some(), "Transaction is missing.");
assert_eq!(
tx_transition_cursor.seek_exact(tx_id).expect("to be okay").expect("to be present").1, expected_transition_id
);
// Increment expected id for each transaction transition.
expected_transition_id += 1;
}
@ -722,10 +693,6 @@ mod tests {
expected_transition_id += 1;
}
// Validate that block transition exists
assert_eq!(block_transition_cursor.seek_exact(number).expect("To be okay").expect("Block transition to be present").1,expected_transition_id);
prev_number = Some(number);
}
Ok(())

View File

@ -36,7 +36,7 @@ pub struct ExecutionStageMetrics {
/// - [tables::CanonicalHeaders] get next block to execute.
/// - [tables::Headers] get for revm environment variables.
/// - [tables::HeaderTD]
/// - [tables::BlockBodies] to get tx number
/// - [tables::BlockBodyIndices] to get tx number
/// - [tables::Transactions] to execute
///
/// For state access [LatestStateProviderRef] provides us latest state and history state
@ -53,7 +53,7 @@ pub struct ExecutionStageMetrics {
/// - [tables::StorageChangeSet]
///
/// For unwinds we are accessing:
/// - [tables::BlockBodies] get tx index to know what needs to be unwinded
/// - [tables::BlockBodyIndices] get tx index to know what needs to be unwinded
/// - [tables::AccountHistory] to remove change set and apply old values to
/// - [tables::PlainAccountState] [tables::StorageHistory] to remove change set and apply old values
/// to [tables::PlainStorageState]

View File

@ -1,4 +1,6 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use itertools::Itertools;
use rayon::slice::ParallelSliceMut;
use reth_codecs::Compact;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
@ -8,7 +10,8 @@ use reth_db::{
};
use reth_primitives::{keccak256, AccountHashingCheckpoint};
use reth_provider::Transaction;
use std::{collections::BTreeMap, fmt::Debug, ops::Range};
use std::{fmt::Debug, ops::Range};
use tokio::sync::mpsc;
use tracing::*;
/// The [`StageId`] of the account hashing stage.
@ -21,7 +24,7 @@ pub struct AccountHashingStage {
/// The threshold (in number of state transitions) for switching between incremental
/// hashing and full storage hashing.
pub clean_threshold: u64,
/// The maximum number of blocks to process before committing.
/// The maximum number of accounts to process before committing.
pub commit_threshold: u64,
}
@ -126,8 +129,12 @@ impl AccountHashingStage {
}
// seed account changeset
let (_, last_transition) =
tx.cursor_read::<tables::BlockTransitionIndex>()?.last()?.unwrap();
let last_transition = tx
.cursor_read::<tables::BlockBodyIndices>()?
.last()?
.unwrap()
.1
.transition_after_block();
let first_transition = last_transition.checked_sub(transitions).unwrap_or_default();
@ -192,13 +199,38 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
let start_address = checkpoint.address.take();
let next_address = {
let mut accounts = tx.cursor_read::<tables::PlainAccountState>()?;
let mut accounts_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
let hashed_batch = accounts
// channels used to return result of account hashing
let mut channels = Vec::new();
for chunk in &accounts_cursor
.walk(start_address)?
.take(self.commit_threshold as usize)
.map(|res| res.map(|(address, account)| (keccak256(address), account)))
.collect::<Result<BTreeMap<_, _>, _>>()?;
.chunks(self.commit_threshold as usize / rayon::current_num_threads())
{
// An _unordered_ channel to receive results from a rayon job
let (tx, rx) = mpsc::unbounded_channel();
channels.push(rx);
let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
// Spawn the hashing task onto the global rayon pool
rayon::spawn(move || {
for (address, account) in chunk.into_iter() {
let _ = tx.send((keccak256(address), account));
}
});
}
let mut hashed_batch = Vec::with_capacity(self.commit_threshold as usize);
// Iterate over channels and append the hashed accounts.
for mut channel in channels {
while let Some(hashed) = channel.recv().await {
hashed_batch.push(hashed);
}
}
// sort it all in parallel
hashed_batch.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
let mut hashed_account_cursor = tx.cursor_write::<tables::HashedAccount>()?;
@ -214,7 +246,7 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
}
// next key of iterator
accounts.next()?
accounts_cursor.next()?
};
if let Some((next_address, _)) = &next_address {

View File

@ -226,7 +226,7 @@ mod tests {
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
mdbx::{tx::Tx, WriteMap, RW},
models::{StoredBlockBody, TransitionIdAddress},
models::{StoredBlockBodyIndices, TransitionIdAddress},
};
use reth_interfaces::test_utils::generators::{
random_block_range, random_contract_account_range,
@ -321,20 +321,15 @@ mod tests {
self.tx.insert_headers(blocks.iter().map(|block| &block.header))?;
let iter = blocks.iter();
let (mut transition_id, mut tx_id) = (0, 0);
let (mut next_transition_id, mut next_tx_num) = (0, 0);
let mut first_transition_id = next_transition_id;
let mut first_tx_num = next_tx_num;
for progress in iter {
// Insert last progress data
self.tx.commit(|tx| {
let body = StoredBlockBody {
start_tx_id: tx_id,
tx_count: progress.body.len() as u64,
};
progress.body.iter().try_for_each(|transaction| {
tx.put::<tables::TxHashNumber>(transaction.hash(), tx_id)?;
tx.put::<tables::Transactions>(tx_id, transaction.clone())?;
tx.put::<tables::TxTransitionIndex>(tx_id, transition_id)?;
tx.put::<tables::TxHashNumber>(transaction.hash(), next_tx_num)?;
tx.put::<tables::Transactions>(next_tx_num, transaction.clone())?;
let (addr, _) = accounts
.get_mut(rand::random::<usize>() % n_accounts as usize)
@ -347,14 +342,14 @@ mod tests {
};
self.insert_storage_entry(
tx,
(transition_id, *addr).into(),
(next_transition_id, *addr).into(),
new_entry,
progress.header.number == stage_progress,
)?;
}
tx_id += 1;
transition_id += 1;
next_tx_num += 1;
next_transition_id += 1;
Ok(())
})?;
@ -363,18 +358,27 @@ mod tests {
if has_reward {
self.insert_storage_entry(
tx,
(transition_id, Address::random()).into(),
(next_transition_id, Address::random()).into(),
StorageEntry {
key: keccak256("mining"),
value: U256::from(rand::random::<u32>()),
},
progress.header.number == stage_progress,
)?;
transition_id += 1;
next_transition_id += 1;
}
tx.put::<tables::BlockTransitionIndex>(progress.number, transition_id)?;
tx.put::<tables::BlockBodies>(progress.number, body)
let body = StoredBlockBodyIndices {
first_tx_num,
first_transition_id,
tx_count: progress.body.len() as u64,
has_block_change: has_reward,
};
first_transition_id = next_transition_id;
first_tx_num = next_tx_num;
tx.put::<tables::BlockBodyIndices>(progress.number, body)
})?;
}

View File

@ -80,7 +80,10 @@ mod tests {
use super::*;
use crate::test_utils::{TestTransaction, PREV_STAGE_ID};
use reth_db::{
models::{sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx, ShardedKey},
models::{
sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx, ShardedKey,
StoredBlockBodyIndices,
},
tables,
transaction::DbTxMut,
TransitionList,
@ -118,8 +121,25 @@ mod tests {
// setup
tx.commit(|tx| {
// we just need first and last
tx.put::<tables::BlockTransitionIndex>(0, 3).unwrap();
tx.put::<tables::BlockTransitionIndex>(5, 7).unwrap();
tx.put::<tables::BlockBodyIndices>(
0,
StoredBlockBodyIndices {
first_transition_id: 0,
tx_count: 3,
..Default::default()
},
)
.unwrap();
tx.put::<tables::BlockBodyIndices>(
5,
StoredBlockBodyIndices {
first_transition_id: 3,
tx_count: 5,
..Default::default()
},
)
.unwrap();
// setup changeset that are going to be applied to history index
tx.put::<tables::AccountChangeSet>(4, acc()).unwrap();

View File

@ -86,7 +86,7 @@ mod tests {
use reth_db::{
models::{
storage_sharded_key::{StorageShardedKey, NUM_OF_INDICES_IN_SHARD},
ShardedKey, TransitionIdAddress,
ShardedKey, StoredBlockBodyIndices, TransitionIdAddress,
},
tables,
transaction::DbTxMut,
@ -135,8 +135,25 @@ mod tests {
// setup
tx.commit(|tx| {
// we just need first and last
tx.put::<tables::BlockTransitionIndex>(0, 3).unwrap();
tx.put::<tables::BlockTransitionIndex>(5, 7).unwrap();
tx.put::<tables::BlockBodyIndices>(
0,
StoredBlockBodyIndices {
first_transition_id: 0,
tx_count: 3,
..Default::default()
},
)
.unwrap();
tx.put::<tables::BlockBodyIndices>(
5,
StoredBlockBodyIndices {
first_transition_id: 3,
tx_count: 5,
..Default::default()
},
)
.unwrap();
// setup changeset that are going to be applied to history index
tx.put::<tables::StorageChangeSet>(trns(4), storage(STORAGE_KEY)).unwrap();

View File

@ -43,7 +43,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
}
/// Retrieve the range of transactions to iterate over by querying
/// [`BlockBodies`][reth_db::tables::BlockBodies],
/// [`BlockBodyIndices`][reth_db::tables::BlockBodyIndices],
/// collect transactions within that range,
/// recover signer for each transaction and store entries in
/// the [`TxSenders`][reth_db::tables::TxSenders] table.
@ -57,14 +57,14 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
let done = !capped;
// Look up the start index for the transaction range
let start_tx_index = tx.get_block_body(start_block)?.start_tx_id;
let first_tx_num = tx.get_block_meta(start_block)?.first_tx_num();
// Look up the end index for transaction range (inclusive)
let end_tx_index = tx.get_block_body(end_block)?.last_tx_index();
let last_tx_num = tx.get_block_meta(end_block)?.last_tx_num();
// No transactions to walk over
if start_tx_index > end_tx_index {
info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Target transaction already reached");
if first_tx_num > last_tx_num {
info!(target: "sync::stages::sender_recovery", first_tx_num, last_tx_num, "Target transaction already reached");
return Ok(ExecOutput { stage_progress: end_block, done })
}
@ -74,10 +74,10 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
// Acquire the cursor over the transactions
let mut tx_cursor = tx.cursor_read::<tables::Transactions>()?;
// Walk the transactions from start to end index (inclusive)
let tx_walker = tx_cursor.walk_range(start_tx_index..=end_tx_index)?;
let tx_walker = tx_cursor.walk_range(first_tx_num..=last_tx_num)?;
// Iterate over transactions in chunks
info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Recovering senders");
info!(target: "sync::stages::sender_recovery", first_tx_num, last_tx_num, "Recovering senders");
// channels used to return result of sender recovery.
let mut channels = Vec::new();
@ -136,7 +136,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
) -> Result<UnwindOutput, StageError> {
info!(target: "sync::stages::sender_recovery", to_block = input.unwind_to, "Unwinding");
// Lookup latest tx id that we should unwind to
let latest_tx_id = tx.get_block_body(input.unwind_to)?.last_tx_index();
let latest_tx_id = tx.get_block_meta(input.unwind_to)?.last_tx_num();
tx.unwind_table_by_num::<tables::TxSenders>(latest_tx_id)?;
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
@ -267,13 +267,11 @@ mod tests {
/// 2. If the is no requested block entry in the bodies table,
/// but [tables::TxSenders] is not empty.
fn ensure_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> {
let body_result = self.tx.inner().get_block_body(block);
let body_result = self.tx.inner().get_block_meta(block);
match body_result {
Ok(body) => self
.tx
.ensure_no_entry_above::<tables::TxSenders, _>(body.last_tx_index(), |key| {
key
})?,
.ensure_no_entry_above::<tables::TxSenders, _>(body.last_tx_num(), |key| key)?,
Err(_) => {
assert!(self.tx.table_is_empty::<tables::TxSenders>()?);
}
@ -321,11 +319,11 @@ mod tests {
return Ok(())
}
let mut body_cursor = tx.cursor_read::<tables::BlockBodies>()?;
let mut body_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
body_cursor.seek_exact(start_block)?;
while let Some((_, body)) = body_cursor.next()? {
for tx_id in body.tx_id_range() {
for tx_id in body.tx_num_range() {
let transaction = tx
.get::<tables::Transactions>(tx_id)?
.expect("no transaction entry");

View File

@ -56,19 +56,17 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
debug!(target: "sync::stages::transaction_lookup", start_block, end_block, "Commencing sync");
let mut cursor_bodies = tx.cursor_read::<tables::BlockBodies>()?;
let mut tx_cursor = tx.cursor_write::<tables::Transactions>()?;
let mut block_meta_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut tx_cursor = tx.cursor_read::<tables::Transactions>()?;
// Walk over block bodies within a specified range.
let bodies = cursor_bodies.walk(Some(start_block))?.take_while(|entry| {
entry.as_ref().map(|(num, _)| *num <= end_block).unwrap_or_default()
});
let bodies = block_meta_cursor.walk_range(start_block..=end_block)?;
// Collect transactions for each body
let mut tx_list = vec![];
for body_entry in bodies {
let (_, body) = body_entry?;
let transactions = tx_cursor.walk(Some(body.start_tx_id))?.take(body.tx_count as usize);
for block_meta_entry in bodies {
let (_, block_meta) = block_meta_entry?;
let transactions = tx_cursor.walk_range(block_meta.tx_num_range())?;
for tx_entry in transactions {
let (id, transaction) = tx_entry?;
@ -81,20 +79,23 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
let mut txhash_cursor = tx.cursor_write::<tables::TxHashNumber>()?;
// If the last inserted element in the database is smaller than the first in our set, then
// we can just append into the DB. This probably only ever happens during sync, on
// the first table insertion.
let append = tx_list
// If the last inserted element in the database is equal or bigger than the first
// in our set, then we need to insert inside the DB. If it is smaller then last
// element in the DB, we can append to the DB.
// Append probably only ever happens during sync, on the first table insertion.
let insert = tx_list
.first()
.zip(txhash_cursor.last()?)
.map(|((first, _), (last, _))| &last < first)
.map(|((first, _), (last, _))| first <= &last)
.unwrap_or_default();
// if txhash_cursor.last() is None we will do insert. `zip` would return none if any item is
// none. if it is some and if first is smaller than last, we will do append.
for (tx_hash, id) in tx_list {
if append {
txhash_cursor.append(tx_hash, id)?;
} else {
if insert {
txhash_cursor.insert(tx_hash, id)?;
} else {
txhash_cursor.append(tx_hash, id)?;
}
}
@ -111,7 +112,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
) -> Result<UnwindOutput, StageError> {
info!(target: "sync::stages::transaction_lookup", to_block = input.unwind_to, "Unwinding");
// Cursors to unwind tx hash to number
let mut body_cursor = tx.cursor_read::<tables::BlockBodies>()?;
let mut body_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut tx_hash_number_cursor = tx.cursor_write::<tables::TxHashNumber>()?;
let mut transaction_cursor = tx.cursor_read::<tables::Transactions>()?;
let mut rev_walker = body_cursor.walk_back(None)?;
@ -121,7 +122,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
}
// Delete all transactions that belong to this block
for tx_id in body.tx_id_range() {
for tx_id in body.tx_num_range() {
// First delete the transaction and hash to id mapping
if let Some((_, transaction)) = transaction_cursor.seek_exact(tx_id)? {
if tx_hash_number_cursor.seek_exact(transaction.hash)?.is_some() {
@ -162,7 +163,7 @@ mod tests {
// Insert blocks with a single transaction at block `stage_progress + 10`
let non_empty_block_number = stage_progress + 10;
let blocks = (stage_progress..input.previous_stage_progress() + 1)
let blocks = (stage_progress..=input.previous_stage_progress())
.map(|number| {
random_block(number, None, Some((number == non_empty_block_number) as u8), None)
})
@ -246,10 +247,10 @@ mod tests {
/// 2. If the is no requested block entry in the bodies table,
/// but [tables::TxHashNumber] is not empty.
fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> {
let body_result = self.tx.inner().get_block_body(number);
let body_result = self.tx.inner().get_block_meta(number);
match body_result {
Ok(body) => self.tx.ensure_no_entry_above_by_value::<tables::TxHashNumber, _>(
body.last_tx_index(),
body.last_tx_num(),
|key| key,
)?,
Err(_) => {
@ -299,11 +300,11 @@ mod tests {
return Ok(())
}
let mut body_cursor = tx.cursor_read::<tables::BlockBodies>()?;
let mut body_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
body_cursor.seek_exact(start_block)?;
while let Some((_, body)) = body_cursor.next()? {
for tx_id in body.tx_id_range() {
for tx_id in body.tx_num_range() {
let transaction = tx
.get::<tables::Transactions>(tx_id)?
.expect("no transaction entry");

View File

@ -6,7 +6,7 @@ use reth_db::{
tx::Tx,
Env, EnvKind, WriteMap, RW,
},
models::{AccountBeforeTx, StoredBlockBody},
models::{AccountBeforeTx, StoredBlockBodyIndices},
table::Table,
tables,
transaction::{DbTx, DbTxMut},
@ -230,25 +230,25 @@ impl TestTransaction {
I: Iterator<Item = &'a SealedBlock>,
{
self.commit(|tx| {
let mut current_tx_id = tx_offset.unwrap_or_default();
let mut next_tx_num = tx_offset.unwrap_or_default();
blocks.into_iter().try_for_each(|block| {
Self::insert_header(tx, &block.header)?;
// Insert into body tables.
tx.put::<tables::BlockBodies>(
tx.put::<tables::BlockBodyIndices>(
block.number,
StoredBlockBody {
start_tx_id: current_tx_id,
StoredBlockBodyIndices {
first_tx_num: next_tx_num,
first_transition_id: next_tx_num,
tx_count: block.body.len() as u64,
has_block_change: false,
},
)?;
block.body.iter().try_for_each(|body_tx| {
tx.put::<tables::TxTransitionIndex>(current_tx_id, current_tx_id)?;
tx.put::<tables::Transactions>(current_tx_id, body_tx.clone())?;
current_tx_id += 1;
tx.put::<tables::Transactions>(next_tx_num, body_tx.clone())?;
next_tx_num += 1;
Ok(())
})?;
tx.put::<tables::BlockTransitionIndex>(block.number, current_tx_id)
})
})
})
}

View File

@ -162,7 +162,7 @@ pub fn get_bit_size(ftype: &str) -> u8 {
match ftype {
"bool" | "Option" => 1,
"TxType" => 2,
"u64" | "BlockNumber" | "TxNumber" | "ChainId" => 4,
"u64" | "BlockNumber" | "TxNumber" | "ChainId" | "TransitionId" | "NumTransactions" => 4,
"u128" => 5,
"U256" | "TxHash" => 6,
_ => 0,

View File

@ -26,11 +26,9 @@ pub fn db(c: &mut Criterion) {
measure_table_db::<HeaderTD>(&mut group);
measure_table_db::<HeaderNumbers>(&mut group);
measure_table_db::<Headers>(&mut group);
measure_table_db::<BlockBodies>(&mut group);
measure_table_db::<BlockBodyIndices>(&mut group);
measure_table_db::<BlockOmmers>(&mut group);
measure_table_db::<TxHashNumber>(&mut group);
measure_table_db::<BlockTransitionIndex>(&mut group);
measure_table_db::<TxTransitionIndex>(&mut group);
measure_table_db::<Transactions>(&mut group);
measure_dupsort_db::<PlainStorageState>(&mut group);
measure_table_db::<PlainAccountState>(&mut group);
@ -45,11 +43,9 @@ pub fn serialization(c: &mut Criterion) {
measure_table_serialization::<HeaderTD>(&mut group);
measure_table_serialization::<HeaderNumbers>(&mut group);
measure_table_serialization::<Headers>(&mut group);
measure_table_serialization::<BlockBodies>(&mut group);
measure_table_serialization::<BlockBodyIndices>(&mut group);
measure_table_serialization::<BlockOmmers>(&mut group);
measure_table_serialization::<TxHashNumber>(&mut group);
measure_table_serialization::<BlockTransitionIndex>(&mut group);
measure_table_serialization::<TxTransitionIndex>(&mut group);
measure_table_serialization::<Transactions>(&mut group);
measure_table_serialization::<PlainStorageState>(&mut group);
measure_table_serialization::<PlainAccountState>(&mut group);

View File

@ -68,11 +68,9 @@ impl_iai!(
HeaderTD,
HeaderNumbers,
Headers,
BlockBodies,
BlockBodyIndices,
BlockOmmers,
TxHashNumber,
BlockTransitionIndex,
TxTransitionIndex,
Transactions,
PlainStorageState,
PlainAccountState

View File

@ -41,7 +41,7 @@ impl_compression_for_compact!(
TxType,
StorageEntry,
StorageTrieEntry,
StoredBlockBody,
StoredBlockBodyIndices,
StoredBlockOmmers,
StoredBlockWithdrawals,
Bytecode,

View File

@ -25,7 +25,7 @@ use crate::{
accounts::{AccountBeforeTx, TransitionIdAddress},
blocks::{HeaderHash, StoredBlockOmmers},
storage_sharded_key::StorageShardedKey,
ShardedKey, StoredBlockBody, StoredBlockWithdrawals,
ShardedKey, StoredBlockBodyIndices, StoredBlockWithdrawals,
},
},
};
@ -44,12 +44,12 @@ pub enum TableType {
}
/// Default tables that should be present inside database.
pub const TABLES: [(TableType, &str); 27] = [
pub const TABLES: [(TableType, &str); 25] = [
(TableType::Table, CanonicalHeaders::const_name()),
(TableType::Table, HeaderTD::const_name()),
(TableType::Table, HeaderNumbers::const_name()),
(TableType::Table, Headers::const_name()),
(TableType::Table, BlockBodies::const_name()),
(TableType::Table, BlockBodyIndices::const_name()),
(TableType::Table, BlockOmmers::const_name()),
(TableType::Table, BlockWithdrawals::const_name()),
(TableType::Table, TransactionBlock::const_name()),
@ -59,8 +59,6 @@ pub const TABLES: [(TableType, &str); 27] = [
(TableType::Table, PlainAccountState::const_name()),
(TableType::DupSort, PlainStorageState::const_name()),
(TableType::Table, Bytecodes::const_name()),
(TableType::Table, BlockTransitionIndex::const_name()),
(TableType::Table, TxTransitionIndex::const_name()),
(TableType::Table, AccountHistory::const_name()),
(TableType::Table, StorageHistory::const_name()),
(TableType::DupSort, AccountChangeSet::const_name()),
@ -146,8 +144,11 @@ table!(
);
table!(
/// Stores block bodies.
( BlockBodies ) BlockNumber | StoredBlockBody
/// Stores block indices that contains indexes of transaction and transitions,
/// number of transactions and if block has a block change (block reward or withdrawals).
///
/// More information about stored indices can be found in the [`StoredBlockBodyIndices`] struct.
( BlockBodyIndices ) BlockNumber | StoredBlockBodyIndices
);
table!(
@ -190,20 +191,6 @@ table!(
( Bytecodes ) H256 | Bytecode
);
table!(
/// Stores the mapping of block number to state transition id.
/// The block transition marks the final state at the end of the block.
/// Increment the transition if the block contains an addition block reward.
/// If the block does not have a reward and transaction, the transition will be the same as the
/// transition at the last transaction of this block.
( BlockTransitionIndex ) BlockNumber | TransitionId
);
table!(
/// Stores the mapping of transaction number to state transition id.
( TxTransitionIndex ) TxNumber | TransitionId
);
table!(
/// Stores the current state of an [`Account`].
( PlainAccountState ) Address | Account

View File

@ -1,40 +1,89 @@
//! Block related models and types.
use reth_codecs::{main_codec, Compact};
use reth_primitives::{Header, TxNumber, Withdrawal, H256};
use reth_primitives::{Header, TransitionId, TxNumber, Withdrawal, H256};
use std::ops::Range;
/// Total number of transactions.
pub type NumTransactions = u64;
/// The storage representation of a block.
/// The storage of the block body indices
///
/// It has the pointer to the transaction Number of the first
/// transaction in the block and the total number of transactions
#[derive(Debug, Default, Eq, PartialEq, Clone)]
#[main_codec]
pub struct StoredBlockBody {
/// The id of the first transaction in this block
pub start_tx_id: TxNumber,
pub struct StoredBlockBodyIndices {
/// The number of the first transaction in this block
///
/// Note: If the block is empty, this is the number of the first transaction
/// in the next non-empty block.
pub first_tx_num: TxNumber,
/// The id of the first transition in this block.
///
/// NOTE: If the block is empty, this is the id of the first transition
/// in the next non-empty block.
pub first_transition_id: TransitionId,
/// The total number of transactions in the block
///
/// NOTE: Number of transitions is equal to number of transactions with
/// additional transition for block change if block has block reward or withdrawal.
pub tx_count: NumTransactions,
/// Flags if there is additional transition changeset of the withdrawal or block reward.
pub has_block_change: bool,
}
impl StoredBlockBody {
/// Return the range of transaction ids for this body
pub fn tx_id_range(&self) -> Range<u64> {
self.start_tx_id..self.start_tx_id + self.tx_count
impl StoredBlockBodyIndices {
/// Return the range of transaction ids for this block.
pub fn tx_num_range(&self) -> Range<TxNumber> {
self.first_tx_num..self.first_tx_num + self.tx_count
}
/// Return the range of transition ids for this block.
pub fn transition_range(&self) -> Range<TransitionId> {
self.transition_at_block()..self.transition_after_block()
}
/// Return transition id of the state after block executed.
/// This transitions is used with the history index to represent the state after this
/// block execution.
///
/// Because we are storing old values of the changeset in the history index, we need
/// transition of one after, to fetch correct values of the past state
///
/// NOTE: This is the same as the first transition id of the next block.
pub fn transition_after_block(&self) -> TransitionId {
self.first_transition_id + self.tx_count + (self.has_block_change as u64)
}
/// Return transition id of the state at the block execution.
/// This transitions is used with the history index to represent the state
/// before the block execution.
///
/// Because we are storing old values of the changeset in the history index, we need
/// transition of one after, to fetch correct values of the past state
///
/// NOTE: If block does not have transitions (empty block) then this is the same
/// as the first transition id of the next block.
pub fn transition_at_block(&self) -> TransitionId {
self.first_transition_id
}
/// Return the index of last transaction in this block unless the block
/// is empty in which case it refers to the last transaction in a previous
/// non-empty block
pub fn last_tx_index(&self) -> TxNumber {
self.start_tx_id.saturating_add(self.tx_count).saturating_sub(1)
pub fn last_tx_num(&self) -> TxNumber {
self.first_tx_num.saturating_add(self.tx_count).saturating_sub(1)
}
/// First transaction index.
pub fn first_tx_index(&self) -> TxNumber {
self.start_tx_id
pub fn first_tx_num(&self) -> TxNumber {
self.first_tx_num
}
/// Return the index of the next transaction after this block.
pub fn next_tx_num(&self) -> TxNumber {
self.first_tx_num + self.tx_count
}
/// Return a flag whether the block is empty
@ -43,9 +92,17 @@ impl StoredBlockBody {
}
/// Return number of transaction inside block
///
/// NOTE: This is not the same as the number of transitions.
pub fn tx_count(&self) -> NumTransactions {
self.tx_count
}
/// Return flag signifying whether the block has additional
/// transition changeset (withdrawal or uncle/block rewards).
pub fn has_block_change(&self) -> bool {
self.has_block_change
}
}
/// The storage representation of a block ommers.
@ -72,9 +129,8 @@ pub type HeaderHash = H256;
#[cfg(test)]
mod test {
use crate::table::{Compress, Decompress};
use super::*;
use crate::table::{Compress, Decompress};
#[test]
fn test_ommer() {
@ -85,4 +141,33 @@ mod test {
ommer.clone() == StoredBlockOmmers::decompress::<Vec<_>>(ommer.compress()).unwrap()
);
}
#[test]
fn block_meta_indices() {
let first_tx_num = 10;
let first_transition_id = 14;
let tx_count = 6;
let has_block_change = true;
let mut block_meta = StoredBlockBodyIndices {
first_tx_num,
first_transition_id,
tx_count,
has_block_change,
};
assert_eq!(block_meta.first_tx_num(), first_tx_num);
assert_eq!(block_meta.last_tx_num(), first_tx_num + tx_count - 1);
assert_eq!(block_meta.next_tx_num(), first_tx_num + tx_count);
assert_eq!(block_meta.tx_count(), tx_count);
assert!(block_meta.has_block_change());
assert_eq!(block_meta.transition_at_block(), first_transition_id);
assert_eq!(block_meta.transition_after_block(), first_transition_id + tx_count + 1);
assert_eq!(block_meta.tx_num_range(), first_tx_num..first_tx_num + tx_count);
assert_eq!(
block_meta.transition_range(),
first_transition_id..first_transition_id + tx_count + 1
);
block_meta.has_block_change = false;
assert_eq!(block_meta.transition_after_block(), first_transition_id + tx_count);
}
}

View File

@ -233,12 +233,11 @@ fn test_concurrent_readers_single_writer() {
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
println!("wait2");
barrier.wait();
txn.put(&db, key, val, WriteFlags::empty()).unwrap();
txn.commit().unwrap();
println!("wait1");
barrier.wait();
assert!(threads.into_iter().all(|b| b.join().unwrap()))

View File

@ -132,8 +132,9 @@ impl<DB: Database> BlockProvider for ShareableDatabase<DB> {
if let Some(header) = self.header_by_number(number)? {
let id = BlockId::Number(number.into());
let tx = self.db.tx()?;
let transactions =
self.transactions_by_block(id)?.ok_or(ProviderError::BlockBody { number })?;
let transactions = self
.transactions_by_block(id)?
.ok_or(ProviderError::BlockBodyIndices { number })?;
let ommers = tx.get::<tables::BlockOmmers>(header.number)?.map(|o| o.ommers);
let withdrawals = self.withdrawals_by_block(id, header.timestamp)?;
@ -200,13 +201,13 @@ impl<DB: Database> TransactionsProvider for ShareableDatabase<DB> {
tx.get::<tables::CanonicalHeaders>(block_number)?
{
if let Some(block_body) =
tx.get::<tables::BlockBodies>(block_number)?
tx.get::<tables::BlockBodyIndices>(block_number)?
{
// the index of the tx in the block is the offset:
// len([start..tx_id])
// SAFETY: `transaction_id` is always `>=` the block's first
// index
let index = transaction_id - block_body.first_tx_index();
let index = transaction_id - block_body.first_tx_num();
let meta = TransactionMeta {
tx_hash,
@ -239,8 +240,8 @@ impl<DB: Database> TransactionsProvider for ShareableDatabase<DB> {
fn transactions_by_block(&self, id: BlockId) -> Result<Option<Vec<TransactionSigned>>> {
if let Some(number) = self.block_number_for_id(id)? {
let tx = self.db.tx()?;
if let Some(body) = tx.get::<tables::BlockBodies>(number)? {
let tx_range = body.tx_id_range();
if let Some(body) = tx.get::<tables::BlockBodyIndices>(number)? {
let tx_range = body.tx_num_range();
return if tx_range.is_empty() {
Ok(Some(Vec::new()))
} else {
@ -262,17 +263,17 @@ impl<DB: Database> TransactionsProvider for ShareableDatabase<DB> {
) -> Result<Vec<Vec<TransactionSigned>>> {
let tx = self.db.tx()?;
let mut results = Vec::default();
let mut body_cursor = tx.cursor_read::<tables::BlockBodies>()?;
let mut body_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut tx_cursor = tx.cursor_read::<tables::Transactions>()?;
for entry in body_cursor.walk_range(range)? {
let (_, body) = entry?;
let tx_range = body.tx_id_range();
if body.tx_id_range().is_empty() {
let tx_num_range = body.tx_num_range();
if tx_num_range.is_empty() {
results.push(Vec::default());
} else {
results.push(
tx_cursor
.walk_range(tx_range)?
.walk_range(tx_num_range)?
.map(|result| result.map(|(_, tx)| tx))
.collect::<std::result::Result<Vec<_>, _>>()?,
);
@ -302,8 +303,8 @@ impl<DB: Database> ReceiptProvider for ShareableDatabase<DB> {
fn receipts_by_block(&self, block: BlockId) -> Result<Option<Vec<Receipt>>> {
if let Some(number) = self.block_number_for_id(block)? {
let tx = self.db.tx()?;
if let Some(body) = tx.get::<tables::BlockBodies>(number)? {
let tx_range = body.tx_id_range();
if let Some(body) = tx.get::<tables::BlockBodyIndices>(number)? {
let tx_range = body.tx_num_range();
return if tx_range.is_empty() {
Ok(Some(Vec::new()))
} else {
@ -419,8 +420,9 @@ impl<DB: Database> StateProviderFactory for ShareableDatabase<DB> {
// get transition id
let transition = tx
.get::<tables::BlockTransitionIndex>(block_number)?
.ok_or(ProviderError::BlockTransition { block_number })?;
.get::<tables::BlockBodyIndices>(block_number)?
.ok_or(ProviderError::BlockTransition { block_number })?
.transition_after_block();
Ok(Box::new(HistoricalStateProvider::new(tx, transition)))
}
@ -434,8 +436,9 @@ impl<DB: Database> StateProviderFactory for ShareableDatabase<DB> {
// get transition id
let transition = tx
.get::<tables::BlockTransitionIndex>(block_number)?
.ok_or(ProviderError::BlockTransition { block_number })?;
.get::<tables::BlockBodyIndices>(block_number)?
.ok_or(ProviderError::BlockTransition { block_number })?
.transition_after_block();
Ok(Box::new(HistoricalStateProvider::new(tx, transition)))
}

View File

@ -1,7 +1,7 @@
//! Dummy blocks and data for tests
use crate::{post_state::PostState, Transaction};
use reth_db::{database::Database, models::StoredBlockBody, tables};
use reth_db::{database::Database, models::StoredBlockBodyIndices, tables};
use reth_primitives::{
hex_literal::hex, proofs::EMPTY_ROOT, Account, Header, SealedBlock, SealedBlockWithSenders,
Withdrawal, H160, H256, U256,
@ -19,7 +19,10 @@ pub fn assert_genesis_block<DB: Database>(tx: &Transaction<'_, DB>, g: SealedBlo
assert_eq!(tx.table::<tables::HeaderNumbers>().unwrap(), vec![(h, n)]);
assert_eq!(tx.table::<tables::CanonicalHeaders>().unwrap(), vec![(n, h)]);
assert_eq!(tx.table::<tables::HeaderTD>().unwrap(), vec![(n, g.difficulty.into())]);
assert_eq!(tx.table::<tables::BlockBodies>().unwrap(), vec![(0, StoredBlockBody::default())]);
assert_eq!(
tx.table::<tables::BlockBodyIndices>().unwrap(),
vec![(0, StoredBlockBodyIndices::default())]
);
assert_eq!(tx.table::<tables::BlockOmmers>().unwrap(), vec![]);
assert_eq!(tx.table::<tables::BlockWithdrawals>().unwrap(), vec![]);
assert_eq!(tx.table::<tables::Transactions>().unwrap(), vec![]);
@ -32,8 +35,6 @@ pub fn assert_genesis_block<DB: Database>(tx: &Transaction<'_, DB>, g: SealedBlo
assert_eq!(tx.table::<tables::StorageHistory>().unwrap(), vec![]);
// TODO check after this gets done: https://github.com/paradigmxyz/reth/issues/1588
// Bytecodes are not reverted assert_eq!(tx.table::<tables::Bytecodes>().unwrap(), vec![]);
assert_eq!(tx.table::<tables::BlockTransitionIndex>().unwrap(), vec![(n, 0)]);
assert_eq!(tx.table::<tables::TxTransitionIndex>().unwrap(), vec![]);
assert_eq!(tx.table::<tables::AccountChangeSet>().unwrap(), vec![]);
assert_eq!(tx.table::<tables::StorageChangeSet>().unwrap(), vec![]);
assert_eq!(tx.table::<tables::HashedAccount>().unwrap(), vec![]);

View File

@ -11,7 +11,7 @@ use reth_db::{
models::{
sharded_key,
storage_sharded_key::{self, StorageShardedKey},
AccountBeforeTx, ShardedKey, StoredBlockBody, TransitionIdAddress,
AccountBeforeTx, ShardedKey, StoredBlockBodyIndices, TransitionIdAddress,
},
table::Table,
tables,
@ -140,17 +140,22 @@ where
}
/// Query the block body by number.
pub fn get_block_body(&self, number: BlockNumber) -> Result<StoredBlockBody, TransactionError> {
let body =
self.get::<tables::BlockBodies>(number)?.ok_or(ProviderError::BlockBody { number })?;
pub fn get_block_meta(
&self,
number: BlockNumber,
) -> Result<StoredBlockBodyIndices, TransactionError> {
let body = self
.get::<tables::BlockBodyIndices>(number)?
.ok_or(ProviderError::BlockBodyIndices { number })?;
Ok(body)
}
/// Query the last transition of the block by [BlockNumber] key
pub fn get_block_transition(&self, key: BlockNumber) -> Result<TransitionId, TransactionError> {
let last_transition_id = self
.get::<tables::BlockTransitionIndex>(key)?
.ok_or(ProviderError::BlockTransition { block_number: key })?;
.get::<tables::BlockBodyIndices>(key)?
.ok_or(ProviderError::BlockTransition { block_number: key })?
.transition_after_block();
Ok(last_transition_id)
}
@ -165,11 +170,8 @@ where
}
let prev_number = block - 1;
let prev_body = self.get_block_body(prev_number)?;
let last_transition = self
.get::<tables::BlockTransitionIndex>(prev_number)?
.ok_or(ProviderError::BlockTransition { block_number: prev_number })?;
Ok((prev_body.start_tx_id + prev_body.tx_count, last_transition))
let prev_body = self.get_block_meta(prev_number)?;
Ok((prev_body.first_tx_num + prev_body.tx_count, prev_body.transition_after_block()))
}
/// Query the block header by number
@ -567,9 +569,11 @@ where
// Header, Body, SenderRecovery, TD, TxLookup stages
let (block, senders) = block.into_components();
let (from, to) =
let block_meta =
insert_canonical_block(self.deref_mut(), block, Some(senders), false).unwrap();
let from = block_meta.transition_at_block();
let to = block_meta.transition_after_block();
// account history stage
{
let indices = self.get_account_transition_ids_from_changeset(from, to)?;
@ -664,16 +668,16 @@ where
&self,
range: impl RangeBounds<BlockNumber> + Clone,
) -> Result<Vec<(BlockNumber, Vec<TransactionSignedEcRecovered>)>, TransactionError> {
// Get the transaction ID ranges for the blocks
let block_bodies = self.get_or_take::<tables::BlockBodies, false>(range)?;
// Raad range of block bodies to get all transactions id's of this range.
let block_bodies = self.get_or_take::<tables::BlockBodyIndices, false>(range)?;
if block_bodies.is_empty() {
return Ok(Vec::new())
}
// Compute the first and last tx ID in the range
let first_transaction =
block_bodies.first().expect("If we have headers").1.first_tx_index();
let last_transaction = block_bodies.last().expect("Not empty").1.last_tx_index();
let first_transaction = block_bodies.first().expect("If we have headers").1.first_tx_num();
let last_transaction = block_bodies.last().expect("Not empty").1.last_tx_num();
// If this is the case then all of the blocks in the range are empty
if last_transaction < first_transaction {
@ -694,10 +698,6 @@ where
tx_hash_cursor.delete_current()?;
}
}
// Remove TxTransitionId
self.get_or_take::<tables::TxTransitionIndex, TAKE>(
first_transaction..=last_transaction,
)?;
// Remove TransactionBlock index if there are transaction present
if !transactions.is_empty() {
@ -712,7 +712,7 @@ where
let mut transactions = transactions.into_iter();
for (block_number, block_body) in block_bodies {
let mut one_block_tx = Vec::with_capacity(block_body.tx_count as usize);
for _ in block_body.tx_id_range() {
for _ in block_body.tx_num_range() {
let tx = transactions.next();
let sender = senders.next();
@ -835,8 +835,8 @@ where
/// Traverse over changesets and plain state and recreate the [`PostState`]s for the given range
/// of blocks.
///
/// 1. Iterate over the [BlockTransitionIndex][tables::BlockTransitionIndex] table to get all
/// the transitions
/// 1. Iterate over the [BlockBodyIndices][tables::BlockBodyIndices] table to get all
/// the transition indices.
/// 2. Iterate over the [StorageChangeSet][tables::StorageChangeSet] table
/// and the [AccountChangeSet][tables::AccountChangeSet] tables in reverse order to reconstruct
/// the changesets.
@ -859,29 +859,35 @@ where
&self,
range: impl RangeBounds<BlockNumber> + Clone,
) -> Result<Vec<PostState>, TransactionError> {
// We are not removing block meta as it is used to get block transitions.
let block_transition =
self.get_or_take::<tables::BlockTransitionIndex, TAKE>(range.clone())?;
self.get_or_take::<tables::BlockBodyIndices, false>(range.clone())?;
if block_transition.is_empty() {
return Ok(Vec::new())
}
// get block transitions
let first_block_number =
block_transition.first().expect("Check for empty is already done").0;
// get block transition of parent block.
let from = self.get_block_transition(first_block_number.saturating_sub(1))?;
let to = block_transition.last().expect("Check for empty is already done").1;
let from = block_transition
.first()
.expect("Check for empty is already done")
.1
.transition_at_block();
let to = block_transition
.last()
.expect("Check for empty is already done")
.1
.transition_after_block();
// NOTE: Just get block bodies dont remove them
// it is connection point for bodies getter and execution result getter.
let block_bodies = self.get_or_take::<tables::BlockBodies, false>(range)?;
let block_bodies = self.get_or_take::<tables::BlockBodyIndices, false>(range)?;
// get transaction receipts
let from_transaction_num =
block_bodies.first().expect("already checked if there are blocks").1.first_tx_index();
block_bodies.first().expect("already checked if there are blocks").1.first_tx_num();
let to_transaction_num =
block_bodies.last().expect("already checked if there are blocks").1.last_tx_index();
block_bodies.last().expect("already checked if there are blocks").1.last_tx_num();
let receipts =
self.get_or_take::<tables::Receipts, TAKE>(from_transaction_num..=to_transaction_num)?;
@ -1031,7 +1037,7 @@ where
// loop break if we are at the end of the blocks.
for (_, block_body) in block_bodies.into_iter() {
let mut block_post_state = PostState::new();
for tx_num in block_body.tx_id_range() {
for tx_num in block_body.tx_num_range() {
if let Some(changes) = all_changesets.remove(&next_transition_id) {
for mut change in changes.into_iter() {
change
@ -1048,10 +1054,10 @@ where
next_transition_id += 1;
}
let Some((_,block_transition)) = block_transition_iter.next() else { break};
let Some((_,block_meta)) = block_transition_iter.next() else { break};
// if block transition points to 1+next transition id it means that there is block
// changeset.
if block_transition == next_transition_id + 1 {
if block_meta.has_block_change() {
if let Some(changes) = all_changesets.remove(&next_transition_id) {
for mut change in changes.into_iter() {
change
@ -1139,7 +1145,7 @@ where
// that is why it is deleted afterwards.
if TAKE {
// rm block bodies
self.get_or_take::<tables::BlockBodies, TAKE>(range)?;
self.get_or_take::<tables::BlockBodyIndices, TAKE>(range)?;
// Update pipeline progress
if let Some(fork_number) = unwind_to {

View File

@ -1,28 +1,29 @@
use reth_db::{
models::{StoredBlockBody, StoredBlockOmmers, StoredBlockWithdrawals},
models::{StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals},
tables,
transaction::{DbTx, DbTxMut},
};
use reth_interfaces::{provider::ProviderError, Result};
use reth_primitives::{Address, SealedBlock, TransitionId};
use reth_primitives::{Address, SealedBlock};
/// Insert block data into corresponding tables. Used mainly for testing & internal tooling.
///
///
/// Check parent dependency in [tables::HeaderNumbers] and in [tables::BlockBodies] tables.
/// Check parent dependency in [tables::HeaderNumbers] and in [tables::BlockBodyIndices] tables.
/// Inserts header data to [tables::CanonicalHeaders], [tables::Headers], [tables::HeaderNumbers].
/// and transactions data to [tables::TxSenders], [tables::Transactions], [tables::TxHashNumber].
/// and transition indexes to [tables::BlockTransitionIndex] and [tables::TxTransitionIndex].
/// And block data [tables::BlockBodies], [tables::BlockBodies] and [tables::BlockWithdrawals].
/// and transition/transaction meta data to [tables::BlockBodyIndices]
/// and block data to [tables::BlockOmmers] and [tables::BlockWithdrawals].
///
/// Return [TransitionId] `(from,to)`
/// Return [StoredBlockBodyIndices] that contains indices of the first and last transactions and
/// transition in the block.
pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>(
tx: &TX,
block: SealedBlock,
senders: Option<Vec<Address>>,
has_block_reward: bool,
parent_tx_num_transition_id: Option<(u64, u64)>,
) -> Result<(TransitionId, TransitionId)> {
) -> Result<StoredBlockBodyIndices> {
let block_number = block.number;
tx.put::<tables::CanonicalHeaders>(block.number, block.hash())?;
// Put header with canonical hashes.
@ -48,35 +49,22 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>(
)?;
}
let (mut current_tx_id, mut transition_id) =
if let Some(parent_tx_num_transition_id) = parent_tx_num_transition_id {
parent_tx_num_transition_id
} else if block.number == 0 {
(0, 0)
} else {
let prev_block_num = block.number - 1;
let prev_body = tx
.get::<tables::BlockBodies>(prev_block_num)?
.ok_or(ProviderError::BlockBody { number: prev_block_num })?;
let last_transition_id = tx
.get::<tables::BlockTransitionIndex>(prev_block_num)?
.ok_or(ProviderError::BlockTransition { block_number: prev_block_num })?;
(prev_body.start_tx_id + prev_body.tx_count, last_transition_id)
};
let from_transition = transition_id;
// insert body data
tx.put::<tables::BlockBodies>(
block.number,
StoredBlockBody { start_tx_id: current_tx_id, tx_count: block.body.len() as u64 },
)?;
if !block.body.is_empty() {
// -1 is here as current_tx_id points to the next transaction.
tx.put::<tables::TransactionBlock>(
current_tx_id + block.body.len() as u64 - 1,
block.number,
)?;
}
let parent_block_meta = if let Some(parent_tx_num_transition_id) = parent_tx_num_transition_id {
StoredBlockBodyIndices {
first_transition_id: parent_tx_num_transition_id.1,
first_tx_num: parent_tx_num_transition_id.0,
tx_count: 0,
has_block_change: false,
}
} else if block.number == 0 {
StoredBlockBodyIndices::default()
} else {
let prev_block_num = block.number - 1;
tx.get::<tables::BlockBodyIndices>(prev_block_num)?
.ok_or(ProviderError::BlockBodyIndices { number: prev_block_num })?
};
let tx_count = block.body.len() as u64;
let mut next_tx_num = parent_block_meta.next_tx_num();
let senders_len = senders.as_ref().map(|s| s.len());
let tx_iter = if Some(block.body.len()) == senders_len {
@ -94,12 +82,10 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>(
for (transaction, sender) in tx_iter {
let hash = transaction.hash();
tx.put::<tables::TxSenders>(current_tx_id, sender)?;
tx.put::<tables::Transactions>(current_tx_id, transaction)?;
tx.put::<tables::TxTransitionIndex>(current_tx_id, transition_id)?;
tx.put::<tables::TxHashNumber>(hash, current_tx_id)?;
transition_id += 1;
current_tx_id += 1;
tx.put::<tables::TxSenders>(next_tx_num, sender)?;
tx.put::<tables::Transactions>(next_tx_num, transaction)?;
tx.put::<tables::TxHashNumber>(hash, next_tx_num)?;
next_tx_num += 1;
}
let mut has_withdrawals = false;
@ -113,13 +99,21 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>(
}
}
if has_block_reward || has_withdrawals {
transition_id += 1;
}
tx.put::<tables::BlockTransitionIndex>(block_number, transition_id)?;
let has_block_change = has_block_reward || has_withdrawals;
let to_transition = transition_id;
Ok((from_transition, to_transition))
let block_meta = StoredBlockBodyIndices {
first_transition_id: parent_block_meta.transition_after_block(),
first_tx_num: parent_block_meta.next_tx_num(),
tx_count,
has_block_change,
};
tx.put::<tables::BlockBodyIndices>(block_number, block_meta.clone())?;
if !block_meta.is_empty() {
tx.put::<tables::TransactionBlock>(block_meta.last_tx_num(), block_number)?;
}
Ok(block_meta)
}
/// Inserts canonical block in blockchain. Parent tx num and transition id is taken from
@ -129,6 +123,6 @@ pub fn insert_canonical_block<'a, TX: DbTxMut<'a> + DbTx<'a>>(
block: SealedBlock,
senders: Option<Vec<Address>>,
has_block_reward: bool,
) -> Result<(TransitionId, TransitionId)> {
) -> Result<StoredBlockBodyIndices> {
insert_block(tx, block, senders, has_block_reward, None)
}