feat(db): Don't store TxHash when writing to the table (#2279)

This commit is contained in:
joshieDo
2023-04-21 17:15:52 +08:00
committed by GitHub
parent 2e7139c1ae
commit 9452b3658b
25 changed files with 358 additions and 118 deletions

View File

@ -176,7 +176,7 @@ where
let Block { mut header, body, .. } = block;
// clear all transactions from pool
pool.remove_transactions(body.iter().map(|tx| tx.hash));
pool.remove_transactions(body.iter().map(|tx| tx.hash()));
header.receipts_root = if post_state.receipts().is_empty() {
EMPTY_RECEIPTS

View File

@ -420,11 +420,11 @@ where
// If we received the transactions as the response to our GetPooledTransactions
// requests (based on received `NewPooledTransactionHashes`) then we already
// recorded the hashes in [`Self::on_new_pooled_transaction_hashes`]
if source.is_broadcast() && !peer.transactions.insert(tx.hash) {
if source.is_broadcast() && !peer.transactions.insert(tx.hash()) {
num_already_seen += 1;
}
match self.transactions_by_peers.entry(tx.hash) {
match self.transactions_by_peers.entry(tx.hash()) {
Entry::Occupied(mut entry) => {
// transaction was already inserted
entry.get_mut().push(peer_id);
@ -566,7 +566,7 @@ struct PropagateTransaction {
impl PropagateTransaction {
fn hash(&self) -> TxHash {
self.transaction.hash
self.transaction.hash()
}
fn new(transaction: Arc<TransactionSigned>) -> Self {

View File

@ -72,8 +72,8 @@ pub use transaction::{
util::secp256k1::sign_message, AccessList, AccessListItem, AccessListWithGasUsed,
FromRecoveredTransaction, IntoRecoveredTransaction, InvalidTransactionError, Signature,
Transaction, TransactionKind, TransactionMeta, TransactionSigned, TransactionSignedEcRecovered,
TxEip1559, TxEip2930, TxLegacy, TxType, EIP1559_TX_TYPE_ID, EIP2930_TX_TYPE_ID,
LEGACY_TX_TYPE_ID,
TransactionSignedNoHash, TxEip1559, TxEip2930, TxLegacy, TxType, EIP1559_TX_TYPE_ID,
EIP2930_TX_TYPE_ID, LEGACY_TX_TYPE_ID,
};
pub use withdrawal::Withdrawal;

View File

@ -4,10 +4,11 @@ use bytes::{Buf, BytesMut};
use derive_more::{AsRef, Deref};
pub use error::InvalidTransactionError;
pub use meta::TransactionMeta;
use reth_codecs::{add_arbitrary_tests, main_codec, Compact};
use reth_codecs::{add_arbitrary_tests, derive_arbitrary, main_codec, Compact};
use reth_rlp::{
length_of_length, Decodable, DecodeError, Encodable, Header, EMPTY_LIST_CODE, EMPTY_STRING_CODE,
};
use serde::{Deserialize, Serialize};
pub use signature::Signature;
pub use tx_type::{TxType, EIP1559_TX_TYPE_ID, EIP2930_TX_TYPE_ID, LEGACY_TX_TYPE_ID};
@ -167,8 +168,8 @@ pub struct TxEip1559 {
/// A raw transaction.
///
/// Transaction types were introduced in [EIP-2718](https://eips.ethereum.org/EIPS/eip-2718).
#[main_codec]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[derive_arbitrary(compact)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Transaction {
/// Legacy transaction.
Legacy(TxLegacy),
@ -178,6 +179,90 @@ pub enum Transaction {
Eip1559(TxEip1559),
}
impl Transaction {
/// This encodes the transaction _without_ the signature, and is only suitable for creating a
/// hash intended for signing.
pub fn encode_without_signature(&self, out: &mut dyn bytes::BufMut) {
Encodable::encode(self, out);
}
/// Inner encoding function that is used for both rlp [`Encodable`] trait and for calculating
/// hash that for eip2718 does not require rlp header
pub fn encode_with_signature(
&self,
signature: &Signature,
out: &mut dyn bytes::BufMut,
with_header: bool,
) {
match self {
Transaction::Legacy(TxLegacy { chain_id, .. }) => {
// do nothing w/ with_header
let payload_length =
self.fields_len() + signature.payload_len_with_eip155_chain_id(*chain_id);
let header = Header { list: true, payload_length };
header.encode(out);
self.encode_fields(out);
signature.encode_with_eip155_chain_id(out, *chain_id);
}
_ => {
let payload_length = self.fields_len() + signature.payload_len();
if with_header {
Header {
list: false,
payload_length: 1 + length_of_length(payload_length) + payload_length,
}
.encode(out);
}
out.put_u8(self.tx_type() as u8);
let header = Header { list: true, payload_length };
header.encode(out);
self.encode_fields(out);
signature.encode(out);
}
}
}
}
impl Compact for Transaction {
fn to_compact<B>(self, buf: &mut B) -> usize
where
B: bytes::BufMut + AsMut<[u8]>,
{
match self {
Transaction::Legacy(tx) => {
tx.to_compact(buf);
0
}
Transaction::Eip2930(tx) => {
tx.to_compact(buf);
1
}
Transaction::Eip1559(tx) => {
tx.to_compact(buf);
2
}
}
}
fn from_compact(buf: &[u8], identifier: usize) -> (Self, &[u8]) {
match identifier {
0 => {
let (tx, buf) = TxLegacy::from_compact(buf, buf.len());
(Transaction::Legacy(tx), buf)
}
1 => {
let (tx, buf) = TxEip2930::from_compact(buf, buf.len());
(Transaction::Eip2930(tx), buf)
}
2 => {
let (tx, buf) = TxEip1559::from_compact(buf, buf.len());
(Transaction::Eip1559(tx), buf)
}
_ => unreachable!(),
}
}
}
// === impl Transaction ===
impl Transaction {
@ -549,8 +634,8 @@ impl Encodable for Transaction {
}
/// Whether or not the transaction is a contract creation.
#[main_codec]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
#[derive_arbitrary(compact, rlp)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
pub enum TransactionKind {
/// A transaction that creates a contract.
#[default]
@ -559,6 +644,32 @@ pub enum TransactionKind {
Call(Address),
}
impl Compact for TransactionKind {
fn to_compact<B>(self, buf: &mut B) -> usize
where
B: bytes::BufMut + AsMut<[u8]>,
{
match self {
TransactionKind::Create => 0,
TransactionKind::Call(address) => {
address.to_compact(buf);
1
}
}
}
fn from_compact(buf: &[u8], identifier: usize) -> (Self, &[u8]) {
match identifier {
0 => (TransactionKind::Create, buf),
1 => {
let (addr, buf) = Address::from_compact(buf, buf.len());
(TransactionKind::Call(addr), buf)
}
_ => unreachable!(),
}
}
}
impl Encodable for TransactionKind {
fn encode(&self, out: &mut dyn reth_rlp::BufMut) {
match self {
@ -590,10 +701,77 @@ impl Decodable for TransactionKind {
}
}
/// Signed transaction without its Hash. Used type for inserting into the DB.
#[derive_arbitrary(compact)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, AsRef, Deref, Default, Serialize, Deserialize)]
pub struct TransactionSignedNoHash {
/// The transaction signature values
pub signature: Signature,
/// Raw transaction info
#[deref]
#[as_ref]
pub transaction: Transaction,
}
impl TransactionSignedNoHash {
/// Calculates the transaction hash. If used more than once, it's better to convert it to
/// [`TransactionSigned`] first.
pub fn hash(&self) -> H256 {
let mut buf = Vec::new();
self.transaction.encode_with_signature(&self.signature, &mut buf, false);
keccak256(&buf)
}
/// Converts into a transaction type with its hash: [`TransactionSigned`].
pub fn with_hash(self) -> TransactionSigned {
self.into()
}
}
impl Compact for TransactionSignedNoHash {
fn to_compact<B>(self, buf: &mut B) -> usize
where
B: bytes::BufMut + AsMut<[u8]>,
{
let before = buf.as_mut().len();
// placeholder for bitflags
buf.put_u8(0);
let sig_bit = self.signature.to_compact(buf) as u8;
let tx_bit = self.transaction.to_compact(buf) as u8;
// replace with actual flags
buf.as_mut()[before] = sig_bit | (tx_bit << 1);
buf.as_mut().len() - before
}
fn from_compact(mut buf: &[u8], _: usize) -> (Self, &[u8]) {
let prefix = buf.get_u8() as usize;
let (signature, buf) = Signature::from_compact(buf, prefix & 1);
let (transaction, buf) = Transaction::from_compact(buf, prefix >> 1);
(TransactionSignedNoHash { signature, transaction }, buf)
}
}
impl From<TransactionSignedNoHash> for TransactionSigned {
fn from(tx: TransactionSignedNoHash) -> Self {
TransactionSigned::from_transaction_and_signature(tx.transaction, tx.signature)
}
}
impl From<TransactionSigned> for TransactionSignedNoHash {
fn from(tx: TransactionSigned) -> Self {
TransactionSignedNoHash { signature: tx.signature, transaction: tx.transaction }
}
}
/// Signed transaction.
#[main_codec(no_arbitrary)]
#[add_arbitrary_tests(rlp, compact)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, AsRef, Deref, Default)]
#[add_arbitrary_tests(rlp)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, AsRef, Deref, Default, Serialize, Deserialize)]
pub struct TransactionSigned {
/// Transaction hash
pub hash: TxHash,
@ -624,6 +802,11 @@ impl TransactionSigned {
self.hash
}
/// Reference to transaction hash. Used to identify transaction.
pub fn hash_ref(&self) -> &TxHash {
&self.hash
}
/// Recover signer from signature and hash.
///
/// Returns `None` if the transaction's signature is invalid, see also [Self::recover_signer].
@ -668,32 +851,7 @@ impl TransactionSigned {
/// Inner encoding function that is used for both rlp [`Encodable`] trait and for calculating
/// hash that for eip2718 does not require rlp header
pub(crate) fn encode_inner(&self, out: &mut dyn bytes::BufMut, with_header: bool) {
match self.transaction {
Transaction::Legacy(TxLegacy { chain_id, .. }) => {
// do nothing w/ with_header
let payload_length = self.transaction.fields_len() +
self.signature.payload_len_with_eip155_chain_id(chain_id);
let header = Header { list: true, payload_length };
header.encode(out);
self.transaction.encode_fields(out);
self.signature.encode_with_eip155_chain_id(out, chain_id);
}
_ => {
let payload_length = self.transaction.fields_len() + self.signature.payload_len();
if with_header {
Header {
list: false,
payload_length: 1 + length_of_length(payload_length) + payload_length,
}
.encode(out);
}
out.put_u8(self.transaction.tx_type() as u8);
let header = Header { list: true, payload_length };
header.encode(out);
self.transaction.encode_fields(out);
self.signature.encode(out);
}
}
self.transaction.encode_with_signature(&self.signature, out, with_header);
}
/// Output the length of the encode_inner(out, true). Note to assume that `with_header` is only
@ -920,7 +1078,6 @@ impl<'a> arbitrary::Arbitrary<'a> for TransactionSigned {
}
/// Signed transaction with recovered signer.
#[main_codec]
#[derive(Debug, Clone, PartialEq, Eq, Hash, AsRef, Deref, Default)]
pub struct TransactionSignedEcRecovered {
/// Signer of the transaction

View File

@ -1,12 +1,14 @@
use crate::{transaction::util::secp256k1, Address, H256, U256};
use reth_codecs::{main_codec, Compact};
use bytes::Buf;
use reth_codecs::{derive_arbitrary, Compact};
use reth_rlp::{Decodable, DecodeError, Encodable};
use serde::{Deserialize, Serialize};
/// r, s: Values corresponding to the signature of the
/// transaction and used to determine the sender of
/// the transaction; formally Tr and Ts. This is expanded in Appendix F of yellow paper.
#[main_codec]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Default)]
#[derive_arbitrary(compact)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
pub struct Signature {
/// The R field of the signature; the point on the curve.
pub r: U256,
@ -16,6 +18,27 @@ pub struct Signature {
pub odd_y_parity: bool,
}
impl Compact for Signature {
fn to_compact<B>(self, buf: &mut B) -> usize
where
B: bytes::BufMut + AsMut<[u8]>,
{
buf.put_slice(self.r.as_le_bytes().as_ref());
buf.put_slice(self.s.as_le_bytes().as_ref());
self.odd_y_parity as usize
}
fn from_compact(mut buf: &[u8], identifier: usize) -> (Self, &[u8]) {
let r = U256::try_from_le_slice(&buf[..32]).expect("qed");
buf.advance(32);
let s = U256::try_from_le_slice(&buf[..32]).expect("qed");
buf.advance(32);
(Signature { r, s, odd_y_parity: identifier != 0 }, buf)
}
}
impl Signature {
/// Output the length of the signature without the length of the RLP header, using the legacy
/// scheme with EIP-155 support depends on chain_id.
@ -91,7 +114,7 @@ impl Signature {
}
/// Recover signature from hash.
pub(crate) fn recover_signer(&self, hash: H256) -> Option<Address> {
pub fn recover_signer(&self, hash: H256) -> Option<Address> {
let mut sig: [u8; 65] = [0; 65];
sig[0..32].copy_from_slice(&self.r.to_be_bytes::<32>());

View File

@ -115,7 +115,7 @@ impl Block {
block_hash: Option<H256>,
) -> Self {
let block_hash = block_hash.unwrap_or_else(|| block.header.hash_slow());
let transactions = block.body.iter().map(|tx| tx.hash).collect();
let transactions = block.body.iter().map(|tx| tx.hash()).collect();
Self::from_block_with_transactions(
block_hash,

View File

@ -132,7 +132,7 @@ impl Transaction {
};
Self {
hash: signed_tx.hash,
hash: signed_tx.hash(),
nonce: U256::from(signed_tx.nonce()),
block_hash: None,
block_number: None,

View File

@ -195,7 +195,7 @@ where
&mut all_logs,
&filter,
(block_hash, block.number).into(),
block.body.into_iter().map(|tx| tx.hash).zip(receipts),
block.body.into_iter().map(|tx| tx.hash()).zip(receipts),
false,
);
}
@ -269,7 +269,7 @@ where
&mut all_logs,
&filter_params,
(block_number, block_hash).into(),
block.body.into_iter().map(|tx| tx.hash).zip(receipts),
block.body.into_iter().map(|tx| tx.hash()).zip(receipts),
false,
);

View File

@ -233,7 +233,7 @@ where
for (idx, tx) in transactions.into_iter().enumerate() {
let tx = tx.into_ecrecovered().ok_or(BlockError::InvalidSignature)?;
let tx_info = TransactionInfo {
hash: Some(tx.hash),
hash: Some(tx.hash()),
index: Some(idx as u64),
block_hash: Some(block_hash),
block_number: Some(block_env.number.try_into().unwrap_or(u64::MAX)),

View File

@ -119,7 +119,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// Write transactions
for transaction in block.body {
// Append the transaction
tx_cursor.append(next_tx_num, transaction)?;
tx_cursor.append(next_tx_num, transaction.into())?;
// Increment transaction id for each transaction.
next_tx_num += 1;
}
@ -510,7 +510,7 @@ mod tests {
};
body.tx_num_range().try_for_each(|tx_num| {
let transaction = random_signed_tx();
tx.put::<tables::Transactions>(tx_num, transaction)
tx.put::<tables::Transactions>(tx_num, transaction.into())
})?;
if body.tx_count != 0 {

View File

@ -346,7 +346,7 @@ mod tests {
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if done == false && stage_progress == 10);
assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if !done && stage_progress == 10);
assert_eq!(runner.tx.table::<tables::HashedAccount>().unwrap().len(), 5);
let fifth_address = runner
.tx
@ -354,8 +354,7 @@ mod tests {
let (address, _) = tx
.cursor_read::<tables::PlainAccountState>()?
.walk(None)?
.skip(5)
.next()
.nth(5)
.unwrap()
.unwrap();
Ok(address)
@ -373,7 +372,7 @@ mod tests {
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if done == true && stage_progress == 20);
assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if done && stage_progress == 20);
assert_eq!(runner.tx.table::<tables::HashedAccount>().unwrap().len(), 10);
// Validate the stage execution

View File

@ -293,7 +293,7 @@ mod tests {
// first run, hash first half of storages.
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if done == false && stage_progress == 100);
assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if !done && stage_progress == 100);
assert_eq!(runner.tx.table::<tables::HashedStorage>().unwrap().len(), 500);
let (progress_address, progress_key) = runner
.tx
@ -301,8 +301,7 @@ mod tests {
let (address, entry) = tx
.cursor_read::<tables::PlainStorageState>()?
.walk(None)?
.skip(500)
.next()
.nth(500)
.unwrap()
.unwrap();
Ok((address, entry.key))
@ -325,7 +324,7 @@ mod tests {
runner.set_commit_threshold(2);
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if done == false && stage_progress == 100);
assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if !done && stage_progress == 100);
assert_eq!(runner.tx.table::<tables::HashedStorage>().unwrap().len(), 502);
let (progress_address, progress_key) = runner
.tx
@ -333,8 +332,7 @@ mod tests {
let (address, entry) = tx
.cursor_read::<tables::PlainStorageState>()?
.walk(None)?
.skip(502)
.next()
.nth(502)
.unwrap()
.unwrap();
Ok((address, entry.key))
@ -359,7 +357,7 @@ mod tests {
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if done == true && stage_progress == 500);
assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if done && stage_progress == 500);
assert_eq!(
runner.tx.table::<tables::HashedStorage>().unwrap().len(),
runner.tx.table::<tables::PlainStorageState>().unwrap().len()
@ -420,7 +418,7 @@ mod tests {
self.tx.commit(|tx| {
progress.body.iter().try_for_each(|transaction| {
tx.put::<tables::TxHashNumber>(transaction.hash(), next_tx_num)?;
tx.put::<tables::Transactions>(next_tx_num, transaction.clone())?;
tx.put::<tables::Transactions>(next_tx_num, transaction.clone().into())?;
let (addr, _) = accounts
.get_mut(rand::random::<usize>() % n_accounts as usize)

View File

@ -7,7 +7,7 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
RawKey, RawTable, RawValue,
};
use reth_primitives::{TransactionSigned, TxNumber, H160};
use reth_primitives::{keccak256, TransactionSignedNoHash, TxNumber, H160};
use reth_provider::Transaction;
use std::fmt::Debug;
use thiserror::Error;
@ -100,16 +100,20 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
// closure that would recover signer. Used as utility to wrap result
let recover = |entry: Result<
(RawKey<TxNumber>, RawValue<TransactionSigned>),
(RawKey<TxNumber>, RawValue<TransactionSignedNoHash>),
reth_db::Error,
>|
>,
rlp_buf: &mut Vec<u8>|
-> Result<(u64, H160), Box<StageError>> {
let (tx_id, transaction) = entry.map_err(|e| Box::new(e.into()))?;
let tx_id = tx_id.key().expect("key to be formated");
let transaction = transaction.value().expect("value to be formated");
let sender = transaction.recover_signer().ok_or(StageError::from(
SenderRecoveryStageError::SenderRecovery { tx: tx_id },
))?;
let tx = transaction.value().expect("value to be formated");
tx.transaction.encode_without_signature(rlp_buf);
let sender = tx.signature.recover_signer(keccak256(rlp_buf)).ok_or(
StageError::from(SenderRecoveryStageError::SenderRecovery { tx: tx_id }),
)?;
Ok((tx_id, sender))
};
@ -117,8 +121,10 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
// Spawn the sender recovery task onto the global rayon pool
// This task will send the results through the channel after it recovered the senders.
rayon::spawn(move || {
let mut rlp_buf = Vec::with_capacity(128);
for entry in chunk {
let _ = tx.send(recover(entry));
rlp_buf.truncate(0);
let _ = tx.send(recover(entry, &mut rlp_buf));
}
});
}
@ -165,7 +171,7 @@ impl From<SenderRecoveryStageError> for StageError {
mod tests {
use assert_matches::assert_matches;
use reth_interfaces::test_utils::generators::{random_block, random_block_range};
use reth_primitives::{BlockNumber, SealedBlock, H256};
use reth_primitives::{BlockNumber, SealedBlock, TransactionSigned, H256};
use super::*;
use crate::test_utils::{
@ -330,9 +336,10 @@ mod tests {
while let Some((_, body)) = body_cursor.next()? {
for tx_id in body.tx_num_range() {
let transaction = tx
let transaction: TransactionSigned = tx
.get::<tables::Transactions>(tx_id)?
.expect("no transaction entry");
.expect("no transaction entry")
.into();
let signer =
transaction.recover_signer().expect("failed to recover signer");
assert_eq!(Some(signer), tx.get::<tables::TxSenders>(tx_id)?);

View File

@ -1,4 +1,5 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use itertools::Itertools;
use rayon::prelude::*;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
@ -6,7 +7,10 @@ use reth_db::{
tables,
transaction::{DbTx, DbTxMut},
};
use reth_primitives::{rpc_utils::keccak256, BlockNumber, TransactionSignedNoHash, TxNumber, H256};
use reth_provider::Transaction;
use thiserror::Error;
use tokio::sync::mpsc;
use tracing::*;
/// The [`StageId`] of the transaction lookup stage.
@ -19,7 +23,7 @@ pub const TRANSACTION_LOOKUP: StageId = StageId("TransactionLookup");
/// [`tables::TxHashNumber`] This is used for looking up changesets via the transaction hash.
#[derive(Debug, Clone)]
pub struct TransactionLookupStage {
/// The number of table entries to commit at once
/// The number of blocks to commit at once
commit_threshold: u64,
}
@ -43,7 +47,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
TRANSACTION_LOOKUP
}
/// Write total difficulty entries
/// Write transaction hash -> id entries
async fn execute(
&mut self,
tx: &mut Transaction<'_, DB>,
@ -58,26 +62,57 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
debug!(target: "sync::stages::transaction_lookup", start_block, end_block, "Commencing sync");
let mut block_meta_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let (_, first_block) = block_meta_cursor.seek_exact(start_block)?.ok_or(
StageError::from(TransactionLookupStageError::TransactionLookup { block: start_block }),
)?;
let (_, last_block) = block_meta_cursor.seek_exact(end_block)?.ok_or(StageError::from(
TransactionLookupStageError::TransactionLookup { block: end_block },
))?;
let mut tx_cursor = tx.cursor_read::<tables::Transactions>()?;
let tx_walker =
tx_cursor.walk_range(first_block.first_tx_num()..=last_block.last_tx_num())?;
// Walk over block bodies within a specified range.
let bodies = block_meta_cursor.walk_range(start_block..=end_block)?;
let mut channels = Vec::new();
let mut total_transactions = 0;
let mut tx_ranges = Vec::with_capacity((end_block - start_block) as usize);
for chunk in &tx_walker.chunks(100_000 / rayon::current_num_threads()) {
let (tx, rx) = mpsc::unbounded_channel();
channels.push(rx);
for block_meta_entry in bodies {
let (_, block_meta) = block_meta_entry?;
total_transactions += block_meta.tx_count;
tx_ranges.push(block_meta.tx_num_range());
// Note: Unfortunate side-effect of how chunk is designed in itertools (it is not Send)
let chunk: Vec<_> = chunk.collect();
// closure that will calculate the TxHash
let calculate_hash =
|entry: Result<(TxNumber, TransactionSignedNoHash), reth_db::Error>,
rlp_buf: &mut Vec<u8>|
-> Result<(H256, u64), Box<StageError>> {
let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
tx.transaction.encode_with_signature(&tx.signature, rlp_buf, false);
Ok((H256(keccak256(rlp_buf)), tx_id))
};
// Spawn the task onto the global rayon pool
// This task will send the results through the channel after it has calculated the hash.
rayon::spawn(move || {
let mut rlp_buf = Vec::with_capacity(128);
for entry in chunk {
rlp_buf.truncate(0);
let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
}
});
}
// Collect transactions for each body
let mut tx_list = Vec::with_capacity(total_transactions as usize);
for tx_num_range in tx_ranges {
for tx_entry in tx_cursor.walk_range(tx_num_range)? {
let (id, transaction) = tx_entry?;
tx_list.push((transaction.hash(), id));
let mut tx_list =
Vec::with_capacity((last_block.last_tx_num() - first_block.first_tx_num()) as usize);
// Iterate over channels and append the tx hashes to be sorted out later
for mut channel in channels {
while let Some(tx) = channel.recv().await {
let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
tx_list.push((tx_hash, tx_id));
}
}
@ -131,7 +166,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
for tx_id in body.tx_num_range() {
// First delete the transaction and hash to id mapping
if let Some((_, transaction)) = transaction_cursor.seek_exact(tx_id)? {
if tx_hash_number_cursor.seek_exact(transaction.hash)?.is_some() {
if tx_hash_number_cursor.seek_exact(transaction.hash())?.is_some() {
tx_hash_number_cursor.delete_current()?;
}
}
@ -142,6 +177,18 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
}
}
#[derive(Error, Debug)]
enum TransactionLookupStageError {
#[error("Transaction lookup failed to find block {block}.")]
TransactionLookup { block: BlockNumber },
}
impl From<TransactionLookupStageError> for StageError {
fn from(error: TransactionLookupStageError) -> Self {
StageError::Fatal(Box::new(error))
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -316,7 +363,7 @@ mod tests {
.expect("no transaction entry");
assert_eq!(
Some(tx_id),
tx.get::<tables::TxHashNumber>(transaction.hash)?,
tx.get::<tables::TxHashNumber>(transaction.hash())?,
);
}
}

View File

@ -243,7 +243,7 @@ impl TestTransaction {
},
)?;
block.body.iter().try_for_each(|body_tx| {
tx.put::<tables::Transactions>(next_tx_num, body_tx.clone())?;
tx.put::<tables::Transactions>(next_tx_num, body_tx.clone().into())?;
next_tx_num += 1;
Ok(())
})

View File

@ -53,7 +53,7 @@ pub fn generate_from_to(ident: &Ident, fields: &FieldList) -> TokenStream2 {
/// Generates code to implement the `Compact` trait method `to_compact`.
fn generate_from_compact(fields: &FieldList, ident: &Ident) -> Vec<TokenStream2> {
let mut lines = vec![];
let mut known_types = vec!["H256", "H160", "Address", "Bloom", "Vec"];
let mut known_types = vec!["H256", "H160", "Address", "Bloom", "Vec", "TxHash"];
// Only types without `bytes::Bytes` should be added here. It's currently manually added, since
// it's hard to figure out with derive_macro which types have bytes::Bytes fields.

View File

@ -144,7 +144,7 @@ fn should_use_alt_impl(ftype: &String, segment: &syn::PathSegment) -> bool {
if let (Some(path), 1) =
(arg_path.path.segments.first(), arg_path.path.segments.len())
{
if ["H256", "H160", "Address", "Bloom"]
if ["H256", "H160", "Address", "Bloom", "TxHash"]
.contains(&path.ident.to_string().as_str())
{
return true
@ -160,11 +160,11 @@ fn should_use_alt_impl(ftype: &String, segment: &syn::PathSegment) -> bool {
/// length.
pub fn get_bit_size(ftype: &str) -> u8 {
match ftype {
"bool" | "Option" => 1,
"TransactionKind" | "bool" | "Option" | "Signature" => 1,
"TxType" => 2,
"u64" | "BlockNumber" | "TxNumber" | "ChainId" | "NumTransactions" => 4,
"u128" => 5,
"U256" | "TxHash" => 6,
"U256" => 6,
_ => 0,
}
}

View File

@ -48,7 +48,7 @@ impl_compression_for_compact!(
Bytecode,
ProofCheckpoint
);
impl_compression_for_compact!(AccountBeforeTx, TransactionSigned);
impl_compression_for_compact!(AccountBeforeTx, TransactionSignedNoHash);
impl_compression_for_compact!(CompactU256);
macro_rules! impl_compression_fixed_compact {

View File

@ -35,7 +35,7 @@ use crate::{
use reth_primitives::{
trie::{BranchNodeCompact, StorageTrieEntry, StoredNibbles, StoredNibblesSubKey},
Account, Address, BlockHash, BlockNumber, Bytecode, Header, IntegerList, Receipt, StorageEntry,
TransactionSigned, TxHash, TxNumber, H256,
TransactionSignedNoHash, TxHash, TxNumber, H256,
};
/// Enum for the types of tables present in libmdbx.
@ -169,7 +169,7 @@ table!(
table!(
/// (Canonical only) Stores the transaction body for canonical transactions.
( Transactions ) TxNumber | TransactionSigned
( Transactions ) TxNumber | TransactionSignedNoHash
);
table!(

View File

@ -169,7 +169,10 @@ impl<DB: Database> TransactionsProvider for ShareableDatabase<DB> {
}
fn transaction_by_id(&self, id: TxNumber) -> Result<Option<TransactionSigned>> {
self.db.view(|tx| tx.get::<tables::Transactions>(id))?.map_err(Into::into)
self.db
.view(|tx| tx.get::<tables::Transactions>(id))?
.map_err(Into::into)
.map(|tx| tx.map(Into::into))
}
fn transaction_by_hash(&self, hash: TxHash) -> Result<Option<TransactionSigned>> {
@ -182,6 +185,7 @@ impl<DB: Database> TransactionsProvider for ShareableDatabase<DB> {
}
})?
.map_err(Into::into)
.map(|tx| tx.map(Into::into))
}
fn transaction_by_hash_with_meta(
@ -216,7 +220,7 @@ impl<DB: Database> TransactionsProvider for ShareableDatabase<DB> {
block_number,
};
return Ok(Some((transaction, meta)))
return Ok(Some((transaction.into(), meta)))
}
}
}
@ -248,7 +252,7 @@ impl<DB: Database> TransactionsProvider for ShareableDatabase<DB> {
let mut tx_cursor = tx.cursor_read::<tables::Transactions>()?;
let transactions = tx_cursor
.walk_range(tx_range)?
.map(|result| result.map(|(_, tx)| tx))
.map(|result| result.map(|(_, tx)| tx.into()))
.collect::<std::result::Result<Vec<_>, _>>()?;
Ok(Some(transactions))
}
@ -274,7 +278,7 @@ impl<DB: Database> TransactionsProvider for ShareableDatabase<DB> {
results.push(
tx_cursor
.walk_range(tx_num_range)?
.map(|result| result.map(|(_, tx)| tx))
.map(|result| result.map(|(_, tx)| tx.into()))
.collect::<std::result::Result<Vec<_>, _>>()?,
);
}

View File

@ -152,7 +152,7 @@ impl TransactionsProvider for MockEthProvider {
.blocks
.lock()
.iter()
.find_map(|(_, block)| block.body.iter().find(|tx| tx.hash == hash).cloned()))
.find_map(|(_, block)| block.body.iter().find(|tx| tx.hash() == hash).cloned()))
}
fn transaction_by_hash_with_meta(

View File

@ -20,7 +20,8 @@ use reth_db::{
use reth_interfaces::{db::Error as DbError, provider::ProviderError};
use reth_primitives::{
keccak256, Account, Address, BlockHash, BlockNumber, ChainSpec, Hardfork, Header, SealedBlock,
SealedBlockWithSenders, StorageEntry, TransactionSignedEcRecovered, TxNumber, H256, U256,
SealedBlockWithSenders, StorageEntry, TransactionSigned, TransactionSignedEcRecovered,
TxNumber, H256, U256,
};
use reth_trie::{StateRoot, StateRootError};
use std::{
@ -637,8 +638,12 @@ where
}
// Get transactions and senders
let transactions =
self.get_or_take::<tables::Transactions, TAKE>(first_transaction..=last_transaction)?;
let transactions = self
.get_or_take::<tables::Transactions, TAKE>(first_transaction..=last_transaction)?
.into_iter()
.map(|(id, tx)| (id, tx.into()))
.collect::<Vec<(u64, TransactionSigned)>>();
let senders =
self.get_or_take::<tables::TxSenders, TAKE>(first_transaction..=last_transaction)?;
@ -1454,8 +1459,8 @@ mod test {
let get = tx.get_block_and_execution_range(&chain_spec, 1..=1).unwrap();
let get_block = get[0].0.clone();
let get_state = get[0].1.clone();
assert_eq!(get_block, block1.clone());
assert_eq!(get_state, exec_res1.clone());
assert_eq!(get_block, block1);
assert_eq!(get_state, exec_res1);
// take one block
let take = tx.take_block_and_execution_range(&chain_spec, 1..=1).unwrap();
@ -1492,10 +1497,10 @@ mod test {
// get two blocks
let get = tx.get_block_and_execution_range(&chain_spec, 1..=2).unwrap();
assert_eq!(get[0].0, block1.clone());
assert_eq!(get[1].0, block2.clone());
assert_eq!(get[0].1, exec_res1.clone());
assert_eq!(get[1].1, exec_res2.clone());
assert_eq!(get[0].0, block1);
assert_eq!(get[1].0, block2);
assert_eq!(get[0].1, exec_res1);
assert_eq!(get[1].1, exec_res2);
// take two blocks
let get = tx.take_block_and_execution_range(&chain_spec, 1..=2).unwrap();

View File

@ -71,7 +71,7 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>(
for (transaction, sender) in tx_iter {
let hash = transaction.hash();
tx.put::<tables::TxSenders>(next_tx_num, sender)?;
tx.put::<tables::Transactions>(next_tx_num, transaction)?;
tx.put::<tables::Transactions>(next_tx_num, transaction.into())?;
tx.put::<tables::TxHashNumber>(hash, next_tx_num)?;
next_tx_num += 1;
}

View File

@ -367,7 +367,7 @@ impl FromRecoveredTransaction for MockTransaction {
fn from_recovered_transaction(tx: TransactionSignedEcRecovered) -> Self {
let sender = tx.signer();
let transaction = tx.into_signed();
let hash = transaction.hash;
let hash = transaction.hash();
match transaction.transaction {
Transaction::Legacy(TxLegacy {
chain_id,

View File

@ -352,7 +352,7 @@ impl PooledTransaction {
impl PoolTransaction for PooledTransaction {
/// Returns hash of the transaction.
fn hash(&self) -> &TxHash {
&self.transaction.hash
self.transaction.hash_ref()
}
/// Returns the Sender of the transaction.