chore(db): remove block numhash key (#1242)

This commit is contained in:
Roman Krasiuk
2023-02-10 23:43:00 +02:00
committed by GitHub
parent 23848df73a
commit 00a49f5ee7
20 changed files with 175 additions and 271 deletions

View File

@ -8,12 +8,10 @@ pub enum Error {
BlockNumber { block_number: BlockNumber },
#[error("Block hash {block_hash:?} does not exist in Headers table")]
BlockHash { block_hash: BlockHash },
#[error("Block body not exists #{block_number} ({block_hash:?})")]
BlockBody { block_number: BlockNumber, block_hash: BlockHash },
#[error("Block body not exists #{block_number}")]
BlockBody { block_number: BlockNumber },
#[error("Block transition id does not exist for block #{block_number}")]
BlockTransition { block_number: BlockNumber },
#[error("Block number {block_number} from block hash #{block_hash} does not exist in canonical chain")]
BlockCanonical { block_number: BlockNumber, block_hash: BlockHash },
#[error("Block number {block_number} with hash #{received_hash:?} is not canonical block. Canonical block hash is #{expected_hash:?}")]
NonCanonicalBlock {
block_number: BlockNumber,

View File

@ -134,7 +134,7 @@ where
.ok_or(DownloadError::MissingHeader { block_number: current_block_num })?;
// Find the block header.
let (_, header) = header_cursor
.seek_exact((number, hash).into())?
.seek_exact(number)?
.ok_or(DownloadError::MissingHeader { block_number: number })?;
// If the header is not empty, increment the counter

View File

@ -50,7 +50,7 @@ pub(crate) fn insert_headers(db: &Env<WriteMap>, headers: &[SealedHeader]) {
db.update(|tx| -> Result<(), db::Error> {
for header in headers {
tx.put::<tables::CanonicalHeaders>(header.number, header.hash())?;
tx.put::<tables::Headers>(header.num_hash().into(), header.clone().unseal())?;
tx.put::<tables::Headers>(header.number, header.clone().unseal())?;
}
Ok(())
})

View File

@ -53,10 +53,10 @@ pub fn init_genesis<DB: Database>(db: Arc<DB>, chain: ChainSpec) -> Result<H256,
let hash = header.hash_slow();
tx.put::<tables::CanonicalHeaders>(0, hash)?;
tx.put::<tables::HeaderNumbers>(hash, 0)?;
tx.put::<tables::BlockBodies>((0, hash).into(), Default::default())?;
tx.put::<tables::BlockBodies>(0, Default::default())?;
tx.put::<tables::BlockTransitionIndex>(0, 0)?;
tx.put::<tables::HeaderTD>((0, hash).into(), header.difficulty.into())?;
tx.put::<tables::Headers>((0, hash).into(), header)?;
tx.put::<tables::HeaderTD>(0, header.difficulty.into())?;
tx.put::<tables::Headers>(0, header)?;
tx.commit()?;
Ok(hash)

View File

@ -7,7 +7,7 @@ use std::{
use reth_db::{
cursor::DbCursorRO,
database::{Database, DatabaseGAT},
models::{BlockNumHash, StoredBlockBody},
models::StoredBlockBody,
table::Table,
tables,
transaction::{DbTx, DbTxMut},
@ -110,29 +110,15 @@ where
Ok(hash)
}
/// Query for block hash by block number and return it as [BlockNumHash] key
pub(crate) fn get_block_numhash(
&self,
number: BlockNumber,
) -> Result<BlockNumHash, StageError> {
Ok((number, self.get_block_hash(number)?).into())
}
/// Query the block body by [BlockNumHash] key
pub(crate) fn get_block_body(&self, key: BlockNumHash) -> Result<StoredBlockBody, StageError> {
let body = self
.get::<tables::BlockBodies>(key)?
.ok_or(DatabaseIntegrityError::BlockBody { number: key.number() })?;
Ok(body)
}
/// Query the block body by number
pub(crate) fn get_block_body_by_num(
/// Query the block body by number.
pub(crate) fn get_block_body(
&self,
number: BlockNumber,
) -> Result<StoredBlockBody, StageError> {
let key = self.get_block_numhash(number)?;
self.get_block_body(key)
let body = self
.get::<tables::BlockBodies>(number)?
.ok_or(DatabaseIntegrityError::BlockBody { number })?;
Ok(body)
}
/// Query the last transition of the block by [BlockNumber] key
@ -156,20 +142,19 @@ where
return Ok((0, 0))
}
let prev_key = self.get_block_numhash(block - 1)?;
let prev_body = self.get_block_body(prev_key)?;
let prev_number = block - 1;
let prev_body = self.get_block_body(prev_number)?;
let last_transition = self
.get::<tables::BlockTransitionIndex>(prev_key.number())?
.ok_or(DatabaseIntegrityError::BlockTransition { number: prev_key.number() })?;
.get::<tables::BlockTransitionIndex>(prev_number)?
.ok_or(DatabaseIntegrityError::BlockTransition { number: prev_number })?;
Ok((prev_body.start_tx_id + prev_body.tx_count, last_transition))
}
/// Query the block header by number
pub(crate) fn get_header_by_num(&self, block: BlockNumber) -> Result<Header, StageError> {
let key = self.get_block_numhash(block)?;
pub(crate) fn get_header(&self, number: BlockNumber) -> Result<Header, StageError> {
let header = self
.get::<tables::Headers>(key)?
.ok_or(DatabaseIntegrityError::Header { number: block, hash: key.hash() })?;
.get::<tables::Headers>(number)?
.ok_or(DatabaseIntegrityError::Header { number })?;
Ok(header)
}
@ -183,16 +168,6 @@ where
self.unwind_table::<T, _>(num, |key| key)
}
/// Unwind table by composite block number hash key
#[inline]
pub(crate) fn unwind_table_by_num_hash<T>(&self, block: BlockNumber) -> Result<(), Error>
where
DB: Database,
T: Table<Key = BlockNumHash>,
{
self.unwind_table::<T, _>(block, |key| key.number())
}
/// Unwind the table to a provided block
pub(crate) fn unwind_table<T, F>(
&self,

View File

@ -1,6 +1,6 @@
use crate::pipeline::PipelineEvent;
use reth_interfaces::{consensus, db::Error as DbError, executor, p2p::error::DownloadError};
use reth_primitives::{BlockNumber, TxNumber, H256};
use reth_primitives::{BlockNumber, TxNumber};
use thiserror::Error;
use tokio::sync::mpsc::error::SendError;
@ -78,12 +78,10 @@ pub enum DatabaseIntegrityError {
number: BlockNumber,
},
/// A header is missing from the database.
#[error("No header for block #{number} ({hash:?})")]
#[error("No header for block #{number}")]
Header {
/// The block number key
number: BlockNumber,
/// The block hash key
hash: H256,
},
/// A ommers are missing.
#[error("Block ommers not found for block #{number}")]

View File

@ -6,7 +6,7 @@ use futures_util::TryStreamExt;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
models::{BlockNumHash, StoredBlockBody, StoredBlockOmmers},
models::{StoredBlockBody, StoredBlockOmmers},
tables,
transaction::{DbTx, DbTxMut},
};
@ -110,20 +110,19 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
trace!(target: "sync::stages::bodies", bodies_len = downloaded_bodies.len(), "Writing blocks");
for response in downloaded_bodies {
// Write block
let block_header = response.header();
let numhash: BlockNumHash = block_header.num_hash().into();
let block_number = response.block_number();
match response {
BlockResponse::Full(block) => {
body_cursor.append(
numhash,
block_number,
StoredBlockBody {
start_tx_id: current_tx_id,
tx_count: block.body.len() as u64,
},
)?;
ommers_cursor.append(
numhash,
block_number,
StoredBlockOmmers {
ommers: block
.ommers
@ -146,7 +145,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
}
BlockResponse::Empty(_) => {
body_cursor.append(
numhash,
block_number,
StoredBlockBody { start_tx_id: current_tx_id, tx_count: 0 },
)?;
}
@ -157,16 +156,16 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// If the block does not have a reward, the transition will be the same as the
// transition at the last transaction of this block.
let td = td_cursor
.seek(numhash)?
.ok_or(DatabaseIntegrityError::TotalDifficulty { number: numhash.number() })?
.seek(block_number)?
.ok_or(DatabaseIntegrityError::TotalDifficulty { number: block_number })?
.1;
let has_reward = self.consensus.has_block_reward(td.into());
if has_reward {
transition_id += 1;
}
block_transition_cursor.append(numhash.number(), transition_id)?;
block_transition_cursor.append(block_number, transition_id)?;
highest_block = numhash.number();
highest_block = block_number;
}
// The stage is "done" if:
@ -193,18 +192,18 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
let mut tx_transition_cursor = tx.cursor_write::<tables::TxTransitionIndex>()?;
let mut rev_walker = body_cursor.walk_back(None)?;
while let Some((key, body)) = rev_walker.next().transpose()? {
if key.number() <= input.unwind_to {
while let Some((number, body)) = rev_walker.next().transpose()? {
if number <= input.unwind_to {
break
}
// Delete the ommers value if any
if ommers_cursor.seek_exact(key)?.is_some() {
if ommers_cursor.seek_exact(number)?.is_some() {
ommers_cursor.delete_current()?;
}
// Delete the block transition if any
if block_transition_cursor.seek_exact(key.number())?.is_some() {
if block_transition_cursor.seek_exact(number)?.is_some() {
block_transition_cursor.delete_current()?;
}
@ -221,7 +220,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
}
// Delete the current body value
tx.delete::<tables::BlockBodies>(key, None)?;
tx.delete::<tables::BlockBodies>(number, None)?;
}
Ok(UnwindOutput { stage_progress: input.unwind_to })
@ -411,7 +410,7 @@ mod tests {
cursor::DbCursorRO,
database::Database,
mdbx::{Env, WriteMap},
models::{BlockNumHash, StoredBlockBody, StoredBlockOmmers},
models::{StoredBlockBody, StoredBlockOmmers},
tables,
transaction::{DbTx, DbTxMut},
};
@ -516,7 +515,6 @@ mod tests {
if let Some(progress) = blocks.first() {
// Insert last progress data
self.tx.commit(|tx| {
let key: BlockNumHash = (progress.number, progress.hash()).into();
let body = StoredBlockBody {
start_tx_id: 0,
tx_count: progress.body.len() as u64,
@ -530,11 +528,14 @@ mod tests {
let last_transition_id = progress.body.len() as u64;
let block_transition_id = last_transition_id + 1; // for block reward
tx.put::<tables::BlockTransitionIndex>(key.number(), block_transition_id)?;
tx.put::<tables::BlockBodies>(key, body)?;
tx.put::<tables::BlockTransitionIndex>(
progress.number,
block_transition_id,
)?;
tx.put::<tables::BlockBodies>(progress.number, body)?;
if !progress.is_empty() {
tx.put::<tables::BlockOmmers>(
key,
progress.number,
StoredBlockOmmers { ommers: vec![] },
)?;
}
@ -561,13 +562,9 @@ mod tests {
impl UnwindStageTestRunner for BodyTestRunner {
fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
self.tx
.ensure_no_entry_above::<tables::BlockBodies, _>(input.unwind_to, |key| {
key.number()
})?;
.ensure_no_entry_above::<tables::BlockBodies, _>(input.unwind_to, |key| key)?;
self.tx
.ensure_no_entry_above::<tables::BlockOmmers, _>(input.unwind_to, |key| {
key.number()
})?;
.ensure_no_entry_above::<tables::BlockOmmers, _>(input.unwind_to, |key| key)?;
self.tx.ensure_no_entry_above::<tables::BlockTransitionIndex, _>(
input.unwind_to,
|key| key,
@ -620,30 +617,29 @@ mod tests {
None => return Ok(()),
};
let mut prev_key: Option<BlockNumHash> = None;
let mut prev_number: Option<BlockNumber> = None;
let mut expected_transition_id = 0;
for entry in bodies_cursor.walk(first_body_key)? {
let (key, body) = entry?;
let (number, body) = entry?;
// Validate sequentiality only after prev progress,
// since the data before is mocked and can contain gaps
if key.number() > prev_progress {
if let Some(prev_key) = prev_key {
assert_eq!(prev_key.number() + 1, key.number(), "Body entries must be sequential");
if number > prev_progress {
if let Some(prev_key) = prev_number {
assert_eq!(prev_key + 1, number, "Body entries must be sequential");
}
}
// Validate that the current entry is below or equals to the highest allowed block
assert!(
key.number() <= highest_block,
"We wrote a block body outside of our synced range. Found block with number {}, highest block according to stage is {}",
key.number(), highest_block
number <= highest_block,
"We wrote a block body outside of our synced range. Found block with number {number}, highest block according to stage is {highest_block}",
);
let (_, header) = headers_cursor.seek_exact(key)?.expect("to be present");
let (_, header) = headers_cursor.seek_exact(number)?.expect("to be present");
// Validate that ommers exist
assert_matches!(
ommers_cursor.seek_exact(key),
ommers_cursor.seek_exact(number),
Ok(ommers) => {
assert!(if header.is_empty() { ommers.is_none() } else { ommers.is_some() })
},
@ -662,7 +658,7 @@ mod tests {
// Increment expected id for block reward.
let td = td_cursor
.seek(key)?
.seek(number)?
.expect("Missing TD for header")
.1;
if self.consensus.has_block_reward(td.into()) {
@ -670,10 +666,10 @@ mod tests {
}
// Validate that block transition exists
assert_eq!(block_transition_cursor.seek_exact(key.number()).expect("To be okay").expect("Block transition to be present").1,expected_transition_id);
assert_eq!(block_transition_cursor.seek_exact(number).expect("To be okay").expect("Block transition to be present").1,expected_transition_id);
prev_key = Some(key);
prev_number = Some(number);
}
Ok(())
})?;
@ -738,9 +734,8 @@ mod tests {
let mut headers = Vec::default();
for entry in walker {
let (num, hash) = entry?;
let (_, header) = header_cursor
.seek_exact((num, hash).into())?
.expect("missing header");
let (_, header) =
header_cursor.seek_exact(num)?.expect("missing header");
headers.push(SealedHeader::new(header, hash));
}
Ok(headers)

View File

@ -5,7 +5,7 @@ use crate::{
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
database::Database,
models::{BlockNumHash, StoredBlockBody, TransitionIdAddress},
models::{StoredBlockBody, TransitionIdAddress},
tables,
transaction::{DbTx, DbTxMut},
};
@ -89,12 +89,10 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
exec_or_return!(input, self.commit_threshold, "sync::stages::execution");
let last_block = input.stage_progress.unwrap_or_default();
// Get next canonical block hashes to execute.
let mut canonicals = tx.cursor_read::<tables::CanonicalHeaders>()?;
// Get header with canonical hashes.
let mut headers = tx.cursor_read::<tables::Headers>()?;
let mut headers_cursor = tx.cursor_read::<tables::Headers>()?;
// Get total difficulty
let mut tds = tx.cursor_read::<tables::HeaderTD>()?;
let mut td_cursor = tx.cursor_read::<tables::HeaderTD>()?;
// Get bodies with canonical hashes.
let mut bodies_cursor = tx.cursor_read::<tables::BlockBodies>()?;
// Get ommers with canonical hashes.
@ -104,32 +102,18 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
// Skip sender recovery and load signer from database.
let mut tx_sender = tx.cursor_read::<tables::TxSenders>()?;
// get canonical blocks (num,hash)
let canonical_batch = canonicals
// Get block headers and bodies
let block_batch = headers_cursor
.walk_range(start_block..end_block + 1)?
.map(|i| i.map(BlockNumHash))
.collect::<Result<Vec<_>, _>>()?;
// Get block headers and bodies from canonical hashes
let block_batch = canonical_batch
.iter()
.map(|key| -> Result<(Header, U256, StoredBlockBody, Vec<Header>), StageError> {
// NOTE: It probably will be faster to fetch all items from one table with cursor,
// but to reduce complexity we are using `seek_exact` to skip some
// edge cases that can happen.
let (_, header) =
headers.seek_exact(*key)?.ok_or(DatabaseIntegrityError::Header {
number: key.number(),
hash: key.hash(),
})?;
let (_, td) = tds
.seek_exact(*key)?
.ok_or(DatabaseIntegrityError::TotalDifficulty { number: key.number() })?;
.map(|entry| -> Result<(Header, U256, StoredBlockBody, Vec<Header>), StageError> {
let (number, header) = entry?;
let (_, td) = td_cursor
.seek_exact(number)?
.ok_or(DatabaseIntegrityError::TotalDifficulty { number })?;
let (_, body) = bodies_cursor
.seek_exact(*key)?
.ok_or(DatabaseIntegrityError::BlockBody { number: key.number() })?;
let (_, stored_ommers) = ommers_cursor.seek_exact(*key)?.unwrap_or_default();
.seek_exact(number)?
.ok_or(DatabaseIntegrityError::BlockBody { number })?;
let (_, stored_ommers) = ommers_cursor.seek_exact(number)?.unwrap_or_default();
Ok((header, td.into(), body, stored_ommers.ommers))
})
.collect::<Result<Vec<_>, _>>()?;
@ -138,7 +122,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
let mut state_provider = SubState::new(State::new(LatestStateProviderRef::new(&**tx)));
// Fetch transactions, execute them and generate results
let mut block_change_patches = Vec::with_capacity(canonical_batch.len());
let mut block_change_patches = Vec::with_capacity(block_batch.len());
for (header, td, body, ommers) in block_batch.into_iter() {
let block_number = header.number;
tracing::trace!(target: "sync::stages::execution", ?block_number, "Execute block.");

View File

@ -226,7 +226,7 @@ mod tests {
use reth_db::{
cursor::DbCursorRW,
mdbx::{tx::Tx, WriteMap, RW},
models::{BlockNumHash, StoredBlockBody, TransitionIdAddress},
models::{StoredBlockBody, TransitionIdAddress},
};
use reth_interfaces::test_utils::generators::{
random_block_range, random_contract_account_range,
@ -313,8 +313,6 @@ mod tests {
for progress in iter {
// Insert last progress data
self.tx.commit(|tx| {
let key: BlockNumHash = (progress.number, progress.hash()).into();
let body = StoredBlockBody {
start_tx_id: tx_id,
tx_count: progress.body.len() as u64,
@ -359,8 +357,8 @@ mod tests {
transition_id += 1;
}
tx.put::<tables::BlockTransitionIndex>(key.number(), transition_id)?;
tx.put::<tables::BlockBodies>(key, body)
tx.put::<tables::BlockTransitionIndex>(progress.number, transition_id)?;
tx.put::<tables::BlockBodies>(progress.number, body)
})?;
}

View File

@ -6,7 +6,6 @@ use futures_util::StreamExt;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
models::blocks::BlockNumHash,
tables,
transaction::{DbTx, DbTxMut},
};
@ -14,7 +13,7 @@ use reth_interfaces::{
consensus::{Consensus, ForkchoiceState},
p2p::headers::downloader::{HeaderDownloader, SyncTarget},
};
use reth_primitives::{BlockNumber, Header, SealedHeader};
use reth_primitives::{BlockNumber, SealedHeader};
use std::sync::Arc;
use tracing::*;
@ -85,18 +84,18 @@ where
// Construct head
let (_, head) = header_cursor
.seek_exact((head_num, head_hash).into())?
.ok_or(DatabaseIntegrityError::Header { number: head_num, hash: head_hash })?;
.seek_exact(head_num)?
.ok_or(DatabaseIntegrityError::Header { number: head_num })?;
let local_head = SealedHeader::new(head, head_hash);
// Look up the next header
let next_header = cursor
.next()?
.map(|(next_num, next_hash)| -> Result<Header, StageError> {
.map(|(next_num, next_hash)| -> Result<SealedHeader, StageError> {
let (_, next) = header_cursor
.seek_exact((next_num, next_hash).into())?
.ok_or(DatabaseIntegrityError::Header { number: next_num, hash: next_hash })?;
Ok(next)
.seek_exact(next_num)?
.ok_or(DatabaseIntegrityError::Header { number: next_num })?;
Ok(SealedHeader::new(next, next_hash))
})
.transpose()?;
@ -105,7 +104,7 @@ where
// progress, then there is a gap in the database and we should start downloading in
// reverse from there. Else, it should use whatever the forkchoice state reports.
let target = match next_header {
Some(header) if stage_progress + 1 != header.number => SyncTarget::Gap(header.seal()),
Some(header) if stage_progress + 1 != header.number => SyncTarget::Gap(header),
None => SyncTarget::Tip(self.next_fork_choice_state().await.head_block_hash),
_ => return Err(StageError::StageProgress(stage_progress)),
};
@ -146,15 +145,15 @@ where
continue
}
let block_hash = header.hash();
let key: BlockNumHash = (header.number, block_hash).into();
let header_hash = header.hash();
let header_number = header.number;
let header = header.unseal();
latest = Some(header.number);
// NOTE: HeaderNumbers are not sorted and can't be inserted with cursor.
tx.put::<tables::HeaderNumbers>(block_hash, header.number)?;
cursor_header.insert(key, header)?;
cursor_canonical.insert(key.number(), key.hash())?;
tx.put::<tables::HeaderNumbers>(header_hash, header_number)?;
cursor_header.insert(header_number, header)?;
cursor_canonical.insert(header_number, header_hash)?;
}
Ok(latest)
}
@ -235,7 +234,7 @@ where
input.unwind_to + 1,
)?;
tx.unwind_table_by_num::<tables::CanonicalHeaders>(input.unwind_to)?;
tx.unwind_table_by_num_hash::<tables::Headers>(input.unwind_to)?;
tx.unwind_table_by_num::<tables::Headers>(input.unwind_to)?;
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
}
@ -279,7 +278,6 @@ mod tests {
ExecInput, ExecOutput, UnwindInput,
};
use reth_db::{
models::blocks::BlockNumHash,
tables,
transaction::{DbTx, DbTxMut},
};
@ -343,9 +341,7 @@ mod tests {
let head = random_header(start, None);
self.tx.insert_headers(std::iter::once(&head))?;
// patch td table for `update_head` call
self.tx.commit(|tx| {
tx.put::<tables::HeaderTD>(head.num_hash().into(), U256::ZERO.into())
})?;
self.tx.commit(|tx| tx.put::<tables::HeaderTD>(head.number, U256::ZERO.into()))?;
// use previous progress as seed size
let end = input.previous_stage.map(|(_, num)| num).unwrap_or_default() + 1;
@ -374,13 +370,12 @@ mod tests {
let hash = tx
.get::<tables::CanonicalHeaders>(block_num)?
.expect("no header hash");
let key: BlockNumHash = (block_num, hash).into();
// validate the header number
assert_eq!(tx.get::<tables::HeaderNumbers>(hash)?, Some(block_num));
// validate the header
let header = tx.get::<tables::Headers>(key)?;
let header = tx.get::<tables::Headers>(block_num)?;
assert!(header.is_some());
let header = header.unwrap().seal();
assert_eq!(header.hash(), hash);
@ -443,7 +438,7 @@ mod tests {
self.tx
.ensure_no_entry_above_by_value::<tables::HeaderNumbers, _>(block, |val| val)?;
self.tx.ensure_no_entry_above::<tables::CanonicalHeaders, _>(block, |key| key)?;
self.tx.ensure_no_entry_above::<tables::Headers, _>(block, |key| key.number())?;
self.tx.ensure_no_entry_above::<tables::Headers, _>(block, |key| key)?;
Ok(())
}
}
@ -512,7 +507,7 @@ mod tests {
// Checkpoint and no gap
tx.put::<tables::CanonicalHeaders>(head.number, head.hash())
.expect("failed to write canonical");
tx.put::<tables::Headers>(head.num_hash().into(), head.clone().unseal())
tx.put::<tables::Headers>(head.number, head.clone().unseal())
.expect("failed to write header");
let gap = stage.get_sync_gap(&tx, stage_progress).await.unwrap();
@ -522,7 +517,7 @@ mod tests {
// Checkpoint and gap
tx.put::<tables::CanonicalHeaders>(gap_tip.number, gap_tip.hash())
.expect("failed to write canonical");
tx.put::<tables::Headers>(gap_tip.num_hash().into(), gap_tip.clone().unseal())
tx.put::<tables::Headers>(gap_tip.number, gap_tip.clone().unseal())
.expect("failed to write header");
let gap = stage.get_sync_gap(&tx, stage_progress).await.unwrap();
@ -532,7 +527,7 @@ mod tests {
// Checkpoint and gap closed
tx.put::<tables::CanonicalHeaders>(gap_fill.number, gap_fill.hash())
.expect("failed to write canonical");
tx.put::<tables::Headers>(gap_fill.num_hash().into(), gap_fill.clone().unseal())
tx.put::<tables::Headers>(gap_fill.number, gap_fill.clone().unseal())
.expect("failed to write header");
assert_matches!(

View File

@ -98,7 +98,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
let from_transition = tx.get_block_transition(stage_progress)?;
let to_transition = tx.get_block_transition(previous_stage_progress)?;
let block_root = tx.get_header_by_num(previous_stage_progress)?.state_root;
let block_root = tx.get_header(previous_stage_progress)?.state_root;
let trie_root = if from_transition == to_transition {
block_root
@ -110,7 +110,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
} else {
debug!(target: "sync::stages::merkle::exec", current = ?stage_progress, target = ?previous_stage_progress, "Updating trie");
// Iterate over changeset (similar to Hashing stages) and take new values
let current_root = tx.get_header_by_num(stage_progress)?.state_root;
let current_root = tx.get_header(stage_progress)?.state_root;
let loader = DBTrieLoader::default();
loader
.update_root(tx, current_root, from_transition..to_transition)
@ -140,7 +140,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
return Ok(UnwindOutput { stage_progress: input.unwind_to })
}
let target_root = tx.get_header_by_num(input.unwind_to)?.state_root;
let target_root = tx.get_header(input.unwind_to)?.state_root;
// If the merkle stage fails to execute, the trie changes weren't commited
// and the root stayed the same
@ -150,7 +150,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
}
let loader = DBTrieLoader::default();
let current_root = tx.get_header_by_num(input.stage_progress)?.state_root;
let current_root = tx.get_header(input.stage_progress)?.state_root;
let from_transition = tx.get_block_transition(input.unwind_to)?;
let to_transition = tx.get_block_transition(input.stage_progress)?;
@ -174,7 +174,7 @@ mod tests {
use assert_matches::assert_matches;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
models::{AccountBeforeTx, BlockNumHash, StoredBlockBody},
models::{AccountBeforeTx, StoredBlockBody},
tables,
transaction::{DbTx, DbTxMut},
};
@ -297,8 +297,6 @@ mod tests {
for progress in blocks.iter() {
// Insert last progress data
self.tx.commit(|tx| {
let key: BlockNumHash = (progress.number, progress.hash()).into();
let body = StoredBlockBody {
start_tx_id: tx_id,
tx_count: progress.body.len() as u64,
@ -341,20 +339,20 @@ mod tests {
Ok(())
})?;
tx.put::<tables::BlockTransitionIndex>(key.number(), transition_id)?;
tx.put::<tables::BlockBodies>(key, body)
tx.put::<tables::BlockTransitionIndex>(progress.number, transition_id)?;
tx.put::<tables::BlockBodies>(progress.number, body)
})?;
}
self.insert_accounts(&accounts)?;
self.insert_storages(&storages)?;
let last_numhash = self.tx.inner().get_block_numhash(end - 1).unwrap();
let last_block_number = end - 1;
let root = self.state_root()?;
self.tx.commit(|tx| {
let mut last_header = tx.get::<tables::Headers>(last_numhash)?.unwrap();
let mut last_header = tx.get::<tables::Headers>(last_block_number)?.unwrap();
last_header.state_root = root;
tx.put::<tables::Headers>(last_numhash, last_header)
tx.put::<tables::Headers>(last_block_number, last_header)
})?;
Ok(blocks)
@ -540,7 +538,7 @@ mod tests {
fn check_root(&self, previous_stage_progress: u64) -> Result<(), TestRunnerError> {
if previous_stage_progress != 0 {
let block_root =
self.tx.inner().get_header_by_num(previous_stage_progress).unwrap().state_root;
self.tx.inner().get_header(previous_stage_progress).unwrap().state_root;
let root = DBTrieLoader::default().calculate_root(&self.tx.inner()).unwrap();
assert_eq!(block_root, root);
}

View File

@ -57,10 +57,10 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
exec_or_return!(input, self.commit_threshold, "sync::stages::sender_recovery");
// Look up the start index for the transaction range
let start_tx_index = tx.get_block_body_by_num(start_block)?.start_tx_id;
let start_tx_index = tx.get_block_body(start_block)?.start_tx_id;
// Look up the end index for transaction range (inclusive)
let end_tx_index = tx.get_block_body_by_num(end_block)?.last_tx_index();
let end_tx_index = tx.get_block_body(end_block)?.last_tx_index();
// No transactions to walk over
if start_tx_index > end_tx_index {
@ -121,7 +121,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
) -> Result<UnwindOutput, StageError> {
info!(target: "sync::stages::sender_recovery", to_block = input.unwind_to, "Unwinding");
// Lookup latest tx id that we should unwind to
let latest_tx_id = tx.get_block_body_by_num(input.unwind_to)?.last_tx_index();
let latest_tx_id = tx.get_block_body(input.unwind_to)?.last_tx_index();
tx.unwind_table_by_num::<tables::TxSenders>(latest_tx_id)?;
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
@ -252,7 +252,7 @@ mod tests {
/// 2. If the is no requested block entry in the bodies table,
/// but [tables::TxSenders] is not empty.
fn ensure_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> {
let body_result = self.tx.inner().get_block_body_by_num(block);
let body_result = self.tx.inner().get_block_body(block);
match body_result {
Ok(body) => self
.tx
@ -306,9 +306,8 @@ mod tests {
return Ok(())
}
let start_hash = tx.get::<tables::CanonicalHeaders>(start_block)?.unwrap();
let mut body_cursor = tx.cursor_read::<tables::BlockBodies>()?;
body_cursor.seek_exact((start_block, start_hash).into())?;
body_cursor.seek_exact(start_block)?;
while let Some((_, body)) = body_cursor.next()? {
for tx_id in body.tx_id_range() {

View File

@ -56,17 +56,16 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
let mut cursor_headers = tx.cursor_read::<tables::Headers>()?;
// Get latest total difficulty
let last_header_key = tx.get_block_numhash(input.stage_progress.unwrap_or_default())?;
let last_header_number = input.stage_progress.unwrap_or_default();
let last_entry = cursor_td
.seek_exact(last_header_key)?
.ok_or(DatabaseIntegrityError::TotalDifficulty { number: last_header_key.number() })?;
.seek_exact(last_header_number)?
.ok_or(DatabaseIntegrityError::TotalDifficulty { number: last_header_number })?;
let mut td: U256 = last_entry.1.into();
debug!(target: "sync::stages::total_difficulty", ?td, block_number = last_header_key.number(), "Last total difficulty entry");
debug!(target: "sync::stages::total_difficulty", ?td, block_number = last_header_number, "Last total difficulty entry");
let start_key = tx.get_block_numhash(start_block)?;
let walker = cursor_headers
.walk(start_key)?
.walk(start_block)?
.take_while(|e| e.as_ref().map(|(_, h)| h.number <= end_block).unwrap_or_default());
// Walk over newly inserted headers, update & insert td
for entry in walker {
@ -111,7 +110,7 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
info!(target: "sync::stages::total_difficulty", to_block = input.unwind_to, "Unwinding");
tx.unwind_table_by_num_hash::<tables::HeaderTD>(input.unwind_to)?;
tx.unwind_table_by_num::<tables::HeaderTD>(input.unwind_to)?;
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
}
@ -211,7 +210,7 @@ mod tests {
.map(|(_, v)| v)
.unwrap_or_default()
.into();
tx.put::<tables::HeaderTD>(head.num_hash().into(), (td + head.difficulty).into())
tx.put::<tables::HeaderTD>(head.number, (td + head.difficulty).into())
})?;
// use previous progress as seed size
@ -237,15 +236,14 @@ mod tests {
match output {
Some(output) if output.stage_progress > initial_stage_progress => {
self.tx.query(|tx| {
let start_hash = tx
.get::<tables::CanonicalHeaders>(initial_stage_progress)?
.expect("no initial header hash");
let start_key = (initial_stage_progress, start_hash).into();
let mut header_cursor = tx.cursor_read::<tables::Headers>()?;
let (_, mut current_header) =
header_cursor.seek_exact(start_key)?.expect("no initial header");
let mut td: U256 =
tx.get::<tables::HeaderTD>(start_key)?.expect("no initial td").into();
let (_, mut current_header) = header_cursor
.seek_exact(initial_stage_progress)?
.expect("no initial header");
let mut td: U256 = tx
.get::<tables::HeaderTD>(initial_stage_progress)?
.expect("no initial td")
.into();
while let Some((next_key, next_header)) = header_cursor.next()? {
assert_eq!(current_header.number + 1, next_header.number);
@ -273,7 +271,7 @@ mod tests {
impl TotalDifficultyTestRunner {
fn check_no_td_above(&self, block: BlockNumber) -> Result<(), TestRunnerError> {
self.tx.ensure_no_entry_above::<tables::HeaderTD, _>(block, |key| key.number())?;
self.tx.ensure_no_entry_above::<tables::HeaderTD, _>(block, |num| num)?;
Ok(())
}

View File

@ -56,14 +56,10 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
let mut cursor_bodies = tx.cursor_read::<tables::BlockBodies>()?;
let mut tx_cursor = tx.cursor_write::<tables::Transactions>()?;
let start_key = tx.get_block_numhash(start_block)?;
// Walk over block bodies within a specified range.
let bodies = cursor_bodies.walk(start_key)?.take_while(|entry| {
entry
.as_ref()
.map(|(block_num_hash, _)| block_num_hash.number() <= end_block)
.unwrap_or_default()
let bodies = cursor_bodies.walk(start_block)?.take_while(|entry| {
entry.as_ref().map(|(num, _)| *num <= end_block).unwrap_or_default()
});
// Collect transactions for each body
@ -117,8 +113,8 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
let mut tx_hash_number_cursor = tx.cursor_write::<tables::TxHashNumber>()?;
let mut transaction_cursor = tx.cursor_write::<tables::Transactions>()?;
let mut rev_walker = body_cursor.walk_back(None)?;
while let Some((key, body)) = rev_walker.next().transpose()? {
if key.number() <= input.unwind_to {
while let Some((number, body)) = rev_walker.next().transpose()? {
if number <= input.unwind_to {
break
}
@ -247,8 +243,8 @@ mod tests {
///
/// 2. If the is no requested block entry in the bodies table,
/// but [tables::TxHashNumber] is not empty.
fn ensure_no_hash_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> {
let body_result = self.tx.inner().get_block_body_by_num(block);
fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> {
let body_result = self.tx.inner().get_block_body(number);
match body_result {
Ok(body) => self.tx.ensure_no_entry_above_by_value::<tables::TxHashNumber, _>(
body.last_tx_index(),
@ -301,9 +297,8 @@ mod tests {
return Ok(())
}
let start_hash = tx.get::<tables::CanonicalHeaders>(start_block)?.unwrap();
let mut body_cursor = tx.cursor_read::<tables::BlockBodies>()?;
body_cursor.seek_exact((start_block, start_hash).into())?;
body_cursor.seek_exact(start_block)?;
while let Some((_, body)) = body_cursor.next()? {
for tx_id in body.tx_id_range() {

View File

@ -5,7 +5,7 @@ use reth_db::{
tx::Tx,
Env, EnvKind, WriteMap, RW,
},
models::{BlockNumHash, StoredBlockBody},
models::StoredBlockBody,
table::Table,
tables,
transaction::{DbTx, DbTxMut},
@ -186,11 +186,9 @@ impl TestTransaction {
let headers = headers.collect::<Vec<_>>();
for header in headers {
let key: BlockNumHash = header.num_hash().into();
tx.put::<tables::CanonicalHeaders>(header.number, header.hash())?;
tx.put::<tables::HeaderNumbers>(header.hash(), header.number)?;
tx.put::<tables::Headers>(key, header.clone().unseal())?;
tx.put::<tables::Headers>(header.number, header.clone().unseal())?;
}
Ok(())
@ -209,13 +207,11 @@ impl TestTransaction {
let mut td = U256::ZERO;
for header in headers {
let key: BlockNumHash = header.num_hash().into();
td += header.difficulty;
tx.put::<tables::HeaderTD>(header.num_hash().into(), td.into())?;
tx.put::<tables::HeaderTD>(header.number, td.into())?;
tx.put::<tables::CanonicalHeaders>(header.number, header.hash())?;
tx.put::<tables::HeaderNumbers>(header.hash(), header.number)?;
tx.put::<tables::Headers>(key, header.clone().unseal())?;
tx.put::<tables::Headers>(header.number, header.clone().unseal())?;
}
Ok(())
@ -232,16 +228,14 @@ impl TestTransaction {
let mut current_tx_id = tx_offset.unwrap_or_default();
for block in blocks {
let key: BlockNumHash = block.num_hash().into();
// Insert into header tables.
tx.put::<tables::CanonicalHeaders>(block.number, block.hash())?;
tx.put::<tables::HeaderNumbers>(block.hash(), block.number)?;
tx.put::<tables::Headers>(key, block.header.clone().unseal())?;
tx.put::<tables::Headers>(block.number, block.header.clone().unseal())?;
// Insert into body tables.
tx.put::<tables::BlockBodies>(
key,
block.number,
StoredBlockBody {
start_tx_id: current_tx_id,
tx_count: block.body.len() as u64,

View File

@ -177,16 +177,16 @@ mod tests {
let env = test_utils::create_test_db::<NoWriteMap>(EnvKind::RW);
let value = Header::default();
let key = (1u64, H256::zero());
let key = 1u64;
// PUT
let tx = env.tx_mut().expect(ERROR_INIT_TX);
tx.put::<Headers>(key.into(), value.clone()).expect(ERROR_PUT);
tx.put::<Headers>(key, value.clone()).expect(ERROR_PUT);
tx.commit().expect(ERROR_COMMIT);
// GET
let tx = env.tx().expect(ERROR_INIT_TX);
let result = tx.get::<Headers>(key.into()).expect(ERROR_GET);
let result = tx.get::<Headers>(key).expect(ERROR_GET);
assert!(result.expect(ERROR_RETURN_VALUE) == value);
tx.commit().expect(ERROR_COMMIT);
}
@ -196,11 +196,11 @@ mod tests {
let env = test_utils::create_test_db::<NoWriteMap>(EnvKind::RW);
let value = Header::default();
let key = (1u64, H256::zero());
let key = 1u64;
// PUT
let tx = env.tx_mut().expect(ERROR_INIT_TX);
tx.put::<Headers>(key.into(), value.clone()).expect(ERROR_PUT);
tx.put::<Headers>(key, value.clone()).expect(ERROR_PUT);
tx.commit().expect(ERROR_COMMIT);
// Cursor
@ -211,7 +211,7 @@ mod tests {
assert!(first.is_some(), "First should be our put");
// Walk
let walk = cursor.walk(key.into()).unwrap();
let walk = cursor.walk(key).unwrap();
let first = walk.into_iter().next().unwrap().unwrap();
assert_eq!(first.1, value, "First next should be put value");
}

View File

@ -12,7 +12,7 @@ use crate::{
models::{
accounts::{AccountBeforeTx, TransitionIdAddress},
blocks::{HeaderHash, StoredBlockOmmers},
BlockNumHash, ShardedKey,
ShardedKey,
},
},
};
@ -33,14 +33,13 @@ pub enum TableType {
}
/// Default tables that should be present inside database.
pub const TABLES: [(TableType, &str); 27] = [
pub const TABLES: [(TableType, &str); 26] = [
(TableType::Table, CanonicalHeaders::const_name()),
(TableType::Table, HeaderTD::const_name()),
(TableType::Table, HeaderNumbers::const_name()),
(TableType::Table, Headers::const_name()),
(TableType::Table, BlockBodies::const_name()),
(TableType::Table, BlockOmmers::const_name()),
(TableType::Table, NonCanonicalTransactions::const_name()),
(TableType::Table, Transactions::const_name()),
(TableType::Table, TxHashNumber::const_name()),
(TableType::Table, Receipts::const_name()),
@ -119,7 +118,7 @@ table!(
table!(
/// Stores the total difficulty from a block header.
( HeaderTD ) BlockNumHash | CompactU256
( HeaderTD ) BlockNumber | CompactU256
);
table!(
@ -129,22 +128,17 @@ table!(
table!(
/// Stores header bodies.
( Headers ) BlockNumHash | Header
( Headers ) BlockNumber | Header
);
table!(
/// Stores block bodies.
( BlockBodies ) BlockNumHash | StoredBlockBody
( BlockBodies ) BlockNumber | StoredBlockBody
);
table!(
/// Stores the uncles/ommers of the block.
( BlockOmmers ) BlockNumHash | StoredBlockOmmers
);
table!(
/// Stores the transaction body from non canonical transactions.
( NonCanonicalTransactions ) BlockNumHashTxNumber | TransactionSigned
( BlockOmmers ) BlockNumber | StoredBlockOmmers
);
table!(
@ -317,6 +311,4 @@ pub type ConfigKey = Vec<u8>;
/// Temporary placeholder type for DB.
pub type ConfigValue = Vec<u8>;
/// Temporary placeholder type for DB.
pub type BlockNumHashTxNumber = Vec<u8>;
/// Temporary placeholder type for DB.
pub type Bytecode = Vec<u8>;

View File

@ -37,7 +37,13 @@ impl<DB: Clone> Clone for ShareableDatabase<DB> {
impl<DB: Database> HeaderProvider for ShareableDatabase<DB> {
fn header(&self, block_hash: &BlockHash) -> Result<Option<Header>> {
self.db.view(|tx| tx.get::<tables::Headers>((0, *block_hash).into()))?.map_err(Into::into)
self.db.view(|tx| {
if let Some(num) = tx.get::<tables::HeaderNumbers>(*block_hash)? {
Ok(tx.get::<tables::Headers>(num)?)
} else {
Ok(None)
}
})?
}
fn header_by_number(&self, num: BlockNumber) -> Result<Option<Header>> {
@ -49,12 +55,13 @@ impl<DB: Database> HeaderProvider for ShareableDatabase<DB> {
}
fn header_td(&self, hash: &BlockHash) -> Result<Option<U256>> {
if let Some(num) = self.db.view(|tx| tx.get::<tables::HeaderNumbers>(*hash))?? {
let td = self.db.view(|tx| tx.get::<tables::HeaderTD>((num, *hash).into()))??;
Ok(td.map(|v| v.0))
} else {
Ok(None)
}
self.db.view(|tx| {
if let Some(num) = tx.get::<tables::HeaderNumbers>(*hash)? {
Ok(tx.get::<tables::HeaderTD>(num)?.map(|td| td.0))
} else {
Ok(None)
}
})?
}
}
@ -112,19 +119,6 @@ impl<DB: Database> StateProviderFactory for ShareableDatabase<DB> {
let block_number =
tx.get::<tables::HeaderNumbers>(block_hash)?.ok_or(Error::BlockHash { block_hash })?;
// check if block is canonical or not. Only canonical blocks have changesets.
let canonical_block_hash = tx
.get::<tables::CanonicalHeaders>(block_number)?
.ok_or(Error::BlockCanonical { block_number, block_hash })?;
if canonical_block_hash != block_hash {
return Err(Error::NonCanonicalBlock {
block_number,
received_hash: block_hash,
expected_hash: canonical_block_hash,
}
.into())
}
// get transition id
let transition = tx
.get::<tables::BlockTransitionIndex>(block_number)?

View File

@ -1,5 +1,5 @@
use reth_db::{
models::{BlockNumHash, StoredBlockBody, StoredBlockOmmers},
models::{StoredBlockBody, StoredBlockOmmers},
tables,
transaction::{DbTx, DbTxMut},
};
@ -19,20 +19,19 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>(
has_block_reward: bool,
parent_tx_num_transition_id: Option<(u64, u64)>,
) -> Result<()> {
let block_num_hash = BlockNumHash((block.number, block.hash()));
tx.put::<tables::CanonicalHeaders>(block.number, block.hash())?;
// Put header with canonical hashes.
tx.put::<tables::Headers>(block_num_hash, block.header.as_ref().clone())?;
tx.put::<tables::Headers>(block.number, block.header.as_ref().clone())?;
tx.put::<tables::HeaderNumbers>(block.hash(), block.number)?;
tx.put::<tables::HeaderTD>(
block_num_hash,
block.number,
if has_block_reward { U256::ZERO } else { U256::from(58_750_000_000_000_000_000_000u128) }
.into(),
)?;
// insert body ommers data
tx.put::<tables::BlockOmmers>(
block_num_hash,
block.number,
StoredBlockOmmers { ommers: block.ommers.iter().map(|h| h.as_ref().clone()).collect() },
)?;
@ -43,15 +42,9 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>(
(0, 0)
} else {
let prev_block_num = block.number - 1;
let prev_block_hash = tx
.get::<tables::CanonicalHeaders>(prev_block_num)?
.ok_or(ProviderError::BlockNumber { block_number: prev_block_num })?;
let prev_body = tx
.get::<tables::BlockBodies>((prev_block_num, prev_block_hash).into())?
.ok_or(ProviderError::BlockBody {
block_number: prev_block_num,
block_hash: prev_block_hash,
})?;
.get::<tables::BlockBodies>(prev_block_num)?
.ok_or(ProviderError::BlockBody { block_number: prev_block_num })?;
let last_transition_id = tx
.get::<tables::BlockTransitionIndex>(prev_block_num)?
.ok_or(ProviderError::BlockTransition { block_number: prev_block_num })?;
@ -60,7 +53,7 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>(
// insert body data
tx.put::<tables::BlockBodies>(
block_num_hash,
block.number,
StoredBlockBody { start_tx_id: current_tx_id, tx_count: block.body.len() as u64 },
)?;

View File

@ -40,7 +40,6 @@ There are many tables within the node, all used to store different types of data
- Headers
- BlockBodies
- BlockOmmers
- NonCanonicalTransactions
- Transactions
- TxHashNumber
- Receipts
@ -241,10 +240,9 @@ Lets take a look at a couple examples before moving on. In the snippet below, th
[File: crates/storage/provider/src/block.rs](https://github.com/paradigmxyz/reth/blob/main/crates/storage/provider/src/block.rs#L121-L125)
```rust ignore
let block_num_hash = BlockNumHash((block.number, block.hash()));
tx.put::<tables::CanonicalHeaders>(block.number, block.hash())?;
// Put header with canonical hashes.
tx.put::<tables::Headers>(block_num_hash, block.header.as_ref().clone())?;
tx.put::<tables::Headers>(block.number, block.header.as_ref().clone())?;
tx.put::<tables::HeaderNumbers>(block.hash(), block.number)?;
```