From c05a90054263f6da4ff581a5a7d933261767f0c5 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Mon, 14 Oct 2024 21:44:28 +0900 Subject: [PATCH] perf(rpc): use `Arc` on cache and rpc (#11635) --- Cargo.lock | 1 + crates/ethereum/payload/src/lib.rs | 2 +- crates/evm/src/lib.rs | 10 +- crates/optimism/evm/src/l1.rs | 13 +- crates/optimism/node/src/txpool.rs | 2 +- crates/optimism/payload/src/builder.rs | 4 +- crates/optimism/rpc/src/eth/block.rs | 2 +- crates/optimism/rpc/src/eth/receipt.rs | 5 +- crates/primitives/src/transaction/mod.rs | 5 + crates/rpc/rpc-eth-api/src/helpers/block.rs | 25 +-- crates/rpc/rpc-eth-api/src/helpers/call.rs | 27 +-- crates/rpc/rpc-eth-api/src/helpers/fee.rs | 14 +- .../rpc-eth-api/src/helpers/pending_block.rs | 2 +- crates/rpc/rpc-eth-api/src/helpers/trace.rs | 10 +- .../rpc-eth-api/src/helpers/transaction.rs | 30 +-- crates/rpc/rpc-eth-types/Cargo.toml | 1 + crates/rpc/rpc-eth-types/src/cache/mod.rs | 182 +++++------------- crates/rpc/rpc-eth-types/src/fee_history.rs | 17 +- crates/rpc/rpc-eth-types/src/gas_oracle.rs | 14 +- crates/rpc/rpc-eth-types/src/logs_utils.rs | 7 +- crates/rpc/rpc/src/debug.rs | 17 +- crates/rpc/rpc/src/eth/filter.rs | 5 +- crates/rpc/rpc/src/trace.rs | 10 +- 23 files changed, 169 insertions(+), 236 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3e3fd18a8..3c5639ead 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8745,6 +8745,7 @@ dependencies = [ "alloy-sol-types", "derive_more 1.0.0", "futures", + "itertools 0.13.0", "jsonrpsee-core", "jsonrpsee-types", "metrics", diff --git a/crates/ethereum/payload/src/lib.rs b/crates/ethereum/payload/src/lib.rs index 5d2fcde2f..97237efa8 100644 --- a/crates/ethereum/payload/src/lib.rs +++ b/crates/ethereum/payload/src/lib.rs @@ -228,7 +228,7 @@ where let env = EnvWithHandlerCfg::new_with_cfg_env( initialized_cfg.clone(), initialized_block_env.clone(), - evm_config.tx_env(&tx), + evm_config.tx_env(tx.as_signed(), tx.signer()), ); // Configure the environment for the block. diff --git a/crates/evm/src/lib.rs b/crates/evm/src/lib.rs index 6fcb3d9f8..e0d45f04c 100644 --- a/crates/evm/src/lib.rs +++ b/crates/evm/src/lib.rs @@ -11,11 +11,9 @@ extern crate alloc; -use core::ops::Deref; - use crate::builder::RethEvmBuilder; use alloy_primitives::{Address, Bytes, B256, U256}; -use reth_primitives::{TransactionSigned, TransactionSignedEcRecovered}; +use reth_primitives::TransactionSigned; use reth_primitives_traits::BlockHeader; use revm::{Database, Evm, GetInspector}; use revm_primitives::{BlockEnv, CfgEnvWithHandlerCfg, Env, EnvWithHandlerCfg, SpecId, TxEnv}; @@ -111,10 +109,10 @@ pub trait ConfigureEvmEnv: Send + Sync + Unpin + Clone + 'static { /// The header type used by the EVM. type Header: BlockHeader; - /// Returns a [`TxEnv`] from a [`TransactionSignedEcRecovered`]. - fn tx_env(&self, transaction: &TransactionSignedEcRecovered) -> TxEnv { + /// Returns a [`TxEnv`] from a [`TransactionSigned`] and [`Address`]. + fn tx_env(&self, transaction: &TransactionSigned, signer: Address) -> TxEnv { let mut tx_env = TxEnv::default(); - self.fill_tx_env(&mut tx_env, transaction.deref(), transaction.signer()); + self.fill_tx_env(&mut tx_env, transaction, signer); tx_env } diff --git a/crates/optimism/evm/src/l1.rs b/crates/optimism/evm/src/l1.rs index 18ccbed95..3412501eb 100644 --- a/crates/optimism/evm/src/l1.rs +++ b/crates/optimism/evm/src/l1.rs @@ -6,7 +6,7 @@ use reth_chainspec::ChainSpec; use reth_execution_errors::BlockExecutionError; use reth_optimism_chainspec::OpChainSpec; use reth_optimism_forks::OptimismHardfork; -use reth_primitives::Block; +use reth_primitives::BlockBody; use revm::{ primitives::{Bytecode, HashMap, SpecId}, DatabaseCommit, L1BlockInfo, @@ -31,9 +31,8 @@ const L1_BLOCK_ECOTONE_SELECTOR: [u8; 4] = hex!("440a5e20"); /// transaction in the L2 block. /// /// Returns an error if the L1 info transaction is not found, if the block is empty. -pub fn extract_l1_info(block: &Block) -> Result { - let l1_info_tx_data = block - .body +pub fn extract_l1_info(body: &BlockBody) -> Result { + let l1_info_tx_data = body .transactions .first() .ok_or_else(|| OptimismBlockExecutionError::L1BlockInfoError { @@ -302,7 +301,7 @@ mod tests { use alloy_eips::eip2718::Decodable2718; use reth_optimism_chainspec::OP_MAINNET; use reth_optimism_forks::OptimismHardforks; - use reth_primitives::{BlockBody, TransactionSigned}; + use reth_primitives::{Block, BlockBody, TransactionSigned}; use super::*; @@ -318,7 +317,7 @@ mod tests { body: BlockBody { transactions: vec![l1_info_tx], ..Default::default() }, }; - let l1_info: L1BlockInfo = extract_l1_info(&mock_block).unwrap(); + let l1_info: L1BlockInfo = extract_l1_info(&mock_block.body).unwrap(); assert_eq!(l1_info.l1_base_fee, U256::from(652_114)); assert_eq!(l1_info.l1_fee_overhead, Some(U256::from(2100))); assert_eq!(l1_info.l1_base_fee_scalar, U256::from(1_000_000)); @@ -358,7 +357,7 @@ mod tests { // test - let l1_block_info: L1BlockInfo = extract_l1_info(&block).unwrap(); + let l1_block_info: L1BlockInfo = extract_l1_info(&block.body).unwrap(); assert_eq!(l1_block_info.l1_base_fee, expected_l1_base_fee); assert_eq!(l1_block_info.l1_base_fee_scalar, expected_l1_base_fee_scalar); diff --git a/crates/optimism/node/src/txpool.rs b/crates/optimism/node/src/txpool.rs index 7ed2a161d..09aa76fef 100644 --- a/crates/optimism/node/src/txpool.rs +++ b/crates/optimism/node/src/txpool.rs @@ -99,7 +99,7 @@ where /// Update the L1 block info. fn update_l1_block_info(&self, block: &Block) { self.block_info.timestamp.store(block.timestamp, Ordering::Relaxed); - if let Ok(cost_addition) = reth_optimism_evm::extract_l1_info(block) { + if let Ok(cost_addition) = reth_optimism_evm::extract_l1_info(&block.body) { *self.block_info.l1_block_info.write() = cost_addition; } } diff --git a/crates/optimism/payload/src/builder.rs b/crates/optimism/payload/src/builder.rs index 5e9c1b5d1..2ff3d3342 100644 --- a/crates/optimism/payload/src/builder.rs +++ b/crates/optimism/payload/src/builder.rs @@ -275,7 +275,7 @@ where let env = EnvWithHandlerCfg::new_with_cfg_env( initialized_cfg.clone(), initialized_block_env.clone(), - evm_config.tx_env(&sequencer_tx), + evm_config.tx_env(sequencer_tx.as_signed(), sequencer_tx.signer()), ); let mut evm = evm_config.evm_with_env(&mut db, env); @@ -356,7 +356,7 @@ where let env = EnvWithHandlerCfg::new_with_cfg_env( initialized_cfg.clone(), initialized_block_env.clone(), - evm_config.tx_env(&tx), + evm_config.tx_env(tx.as_signed(), tx.signer()), ); // Configure the environment for the block. diff --git a/crates/optimism/rpc/src/eth/block.rs b/crates/optimism/rpc/src/eth/block.rs index d5066be0c..dfdd09608 100644 --- a/crates/optimism/rpc/src/eth/block.rs +++ b/crates/optimism/rpc/src/eth/block.rs @@ -45,7 +45,7 @@ where let block = block.unseal(); let l1_block_info = - reth_optimism_evm::extract_l1_info(&block).map_err(OpEthApiError::from)?; + reth_optimism_evm::extract_l1_info(&block.body).map_err(OpEthApiError::from)?; return block .body diff --git a/crates/optimism/rpc/src/eth/receipt.rs b/crates/optimism/rpc/src/eth/receipt.rs index fac8d220c..cc6851041 100644 --- a/crates/optimism/rpc/src/eth/receipt.rs +++ b/crates/optimism/rpc/src/eth/receipt.rs @@ -40,9 +40,8 @@ where meta.block_hash.into(), )))?; - let block = block.unseal(); let l1_block_info = - reth_optimism_evm::extract_l1_info(&block).map_err(OpEthApiError::from)?; + reth_optimism_evm::extract_l1_info(&block.body).map_err(OpEthApiError::from)?; Ok(OpReceiptBuilder::new( &self.inner.provider().chain_spec(), @@ -353,7 +352,7 @@ mod test { }; let l1_block_info = - reth_optimism_evm::extract_l1_info(&block).expect("should extract l1 info"); + reth_optimism_evm::extract_l1_info(&block.body).expect("should extract l1 info"); // test assert!(OP_MAINNET.is_fjord_active_at_timestamp(BLOCK_124665056_TIMESTAMP)); diff --git a/crates/primitives/src/transaction/mod.rs b/crates/primitives/src/transaction/mod.rs index 7ef2c0c1f..3622b9531 100644 --- a/crates/primitives/src/transaction/mod.rs +++ b/crates/primitives/src/transaction/mod.rs @@ -1472,6 +1472,11 @@ impl TransactionSignedEcRecovered { self.signer } + /// Returns a reference to [`TransactionSigned`] + pub const fn as_signed(&self) -> &TransactionSigned { + &self.signed_transaction + } + /// Transform back to [`TransactionSigned`] pub fn into_signed(self) -> TransactionSigned { self.signed_transaction diff --git a/crates/rpc/rpc-eth-api/src/helpers/block.rs b/crates/rpc/rpc-eth-api/src/helpers/block.rs index 898037f4a..5d8496abd 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/block.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/block.rs @@ -64,7 +64,7 @@ pub trait EthBlocks: LoadBlock { } let block = from_block::( - block.unseal(), + (*block).clone().unseal(), total_difficulty.unwrap_or_default(), full.into(), Some(block_hash), @@ -100,10 +100,10 @@ pub trait EthBlocks: LoadBlock { Ok(self .cache() - .get_block_transactions(block_hash) + .get_sealed_block_with_senders(block_hash) .await .map_err(Self::Error::from_eth_err)? - .map(|txs| txs.len())) + .map(|b| b.body.transactions.len())) } } @@ -150,6 +150,7 @@ pub trait EthBlocks: LoadBlock { .get_block_and_receipts(block_hash) .await .map_err(Self::Error::from_eth_err) + .map(|b| b.map(|(b, r)| (b.block.clone(), r))) } Ok(None) @@ -208,23 +209,11 @@ pub trait LoadBlock: LoadPendingBlock + SpawnBlocking { /// Data access in default (L1) trait method implementations. fn cache(&self) -> &EthStateCache; - /// Returns the block object for the given block id. - fn block( - &self, - block_id: BlockId, - ) -> impl Future, Self::Error>> + Send { - async move { - self.block_with_senders(block_id) - .await - .map(|maybe_block| maybe_block.map(|block| block.block)) - } - } - /// Returns the block object for the given block id. fn block_with_senders( &self, block_id: BlockId, - ) -> impl Future, Self::Error>> + Send { + ) -> impl Future>, Self::Error>> + Send { async move { if block_id.is_pending() { // Pending block can be fetched directly without need for caching @@ -232,11 +221,11 @@ pub trait LoadBlock: LoadPendingBlock + SpawnBlocking { .pending_block_with_senders() .map_err(Self::Error::from_eth_err)?; return if maybe_pending.is_some() { - Ok(maybe_pending) + Ok(maybe_pending.map(Arc::new)) } else { // If no pending block from provider, try to get local pending block return match self.local_pending_block().await? { - Some((block, _)) => Ok(Some(block)), + Some((block, _)) => Ok(Some(Arc::new(block))), None => Ok(None), }; }; diff --git a/crates/rpc/rpc-eth-api/src/helpers/call.rs b/crates/rpc/rpc-eth-api/src/helpers/call.rs index 5f6f3d77f..ff6e93006 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/call.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/call.rs @@ -5,7 +5,7 @@ use crate::{ AsEthApiError, FromEthApiError, FromEvmError, FullEthApiTypes, IntoEthApiError, RpcBlock, }; use alloy_eips::{eip1559::calc_next_block_base_fee, eip2930::AccessListResult}; -use alloy_primitives::{Bytes, TxKind, B256, U256}; +use alloy_primitives::{Address, Bytes, TxKind, B256, U256}; use alloy_rpc_types::{ simulate::{SimBlock, SimulatePayload, SimulatedBlock}, state::{EvmOverrides, StateOverride}, @@ -20,7 +20,7 @@ use reth_primitives::{ BlockEnv, CfgEnvWithHandlerCfg, EnvWithHandlerCfg, ExecutionResult, HaltReason, ResultAndState, TransactTo, TxEnv, }, - Header, TransactionSignedEcRecovered, + Header, TransactionSigned, }; use reth_provider::{BlockIdReader, ChainSpecProvider, HeaderProvider, StateProvider}; use reth_revm::{database::StateProviderDatabase, db::CacheDB, DatabaseRef}; @@ -92,7 +92,8 @@ pub trait EthCall: Call + LoadPendingBlock { // Gas cap for entire operation let total_gas_limit = self.call_gas_limit(); - let base_block = self.block(block).await?.ok_or(EthApiError::HeaderNotFound(block))?; + let base_block = + self.block_with_senders(block).await?.ok_or(EthApiError::HeaderNotFound(block))?; let mut parent_hash = base_block.header.hash(); let total_difficulty = LoadPendingBlock::provider(self) .header_td_by_number(block_env.number.to()) @@ -292,12 +293,12 @@ pub trait EthCall: Call + LoadPendingBlock { if replay_block_txs { // only need to replay the transactions in the block if not all transactions are // to be replayed - let transactions = block.into_transactions_ecrecovered().take(num_txs); - for tx in transactions { + let transactions = block.transactions_with_sender().take(num_txs); + for (signer, tx) in transactions { let env = EnvWithHandlerCfg::new_with_cfg_env( cfg.clone(), block_env.clone(), - Call::evm_config(&this).tx_env(&tx), + Call::evm_config(&this).tx_env(tx, *signer), ); let (res, _) = this.transact(&mut db, env)?; db.commit(res.state); @@ -605,11 +606,11 @@ pub trait Call: LoadState + SpawnBlocking { // we need to get the state of the parent block because we're essentially replaying the // block the transaction is included in let parent_block = block.parent_hash; - let block_txs = block.into_transactions_ecrecovered(); let this = self.clone(); self.spawn_with_state_at_block(parent_block.into(), move |state| { let mut db = CacheDB::new(StateProviderDatabase::new(state)); + let block_txs = block.transactions_with_sender(); // replay all transactions prior to the targeted transaction this.replay_transactions_until( @@ -623,7 +624,7 @@ pub trait Call: LoadState + SpawnBlocking { let env = EnvWithHandlerCfg::new_with_cfg_env( cfg, block_env, - Call::evm_config(&this).tx_env(&tx), + Call::evm_config(&this).tx_env(tx.as_signed(), tx.signer()), ); let (res, _) = this.transact(&mut db, env)?; @@ -641,30 +642,30 @@ pub trait Call: LoadState + SpawnBlocking { /// /// Note: This assumes the target transaction is in the given iterator. /// Returns the index of the target transaction in the given iterator. - fn replay_transactions_until( + fn replay_transactions_until<'a, DB, I>( &self, db: &mut CacheDB, cfg: CfgEnvWithHandlerCfg, block_env: BlockEnv, - transactions: impl IntoIterator, + transactions: I, target_tx_hash: B256, ) -> Result where DB: DatabaseRef, EthApiError: From, + I: IntoIterator, { let env = EnvWithHandlerCfg::new_with_cfg_env(cfg, block_env, Default::default()); let mut evm = self.evm_config().evm_with_env(db, env); let mut index = 0; - for tx in transactions { + for (sender, tx) in transactions { if tx.hash() == target_tx_hash { // reached the target transaction break } - let sender = tx.signer(); - self.evm_config().fill_tx_env(evm.tx_mut(), &tx.into_signed(), sender); + self.evm_config().fill_tx_env(evm.tx_mut(), tx, *sender); evm.transact_commit().map_err(Self::Error::from_evm_err)?; index += 1; } diff --git a/crates/rpc/rpc-eth-api/src/helpers/fee.rs b/crates/rpc/rpc-eth-api/src/helpers/fee.rs index 52ccabebf..34ba6dc7e 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/fee.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/fee.rs @@ -172,8 +172,8 @@ pub trait EthFees: LoadFee { // Percentiles were specified, so we need to collect reward percentile ino if let Some(percentiles) = &reward_percentiles { - let (transactions, receipts) = LoadFee::cache(self) - .get_transactions_and_receipts(header.hash()) + let (block, receipts) = LoadFee::cache(self) + .get_block_and_receipts(header.hash()) .await .map_err(Self::Error::from_eth_err)? .ok_or(EthApiError::InvalidBlockRange)?; @@ -182,7 +182,7 @@ pub trait EthFees: LoadFee { percentiles, header.gas_used, header.base_fee_per_gas.unwrap_or_default(), - &transactions, + &block.body.transactions, &receipts, ) .unwrap_or_default(), @@ -296,7 +296,7 @@ pub trait LoadFee: LoadBlock { None => { // fetch pending base fee let base_fee = self - .block(BlockNumberOrTag::Pending.into()) + .block_with_senders(BlockNumberOrTag::Pending.into()) .await? .ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Pending.into()))? .base_fee_per_gas @@ -332,7 +332,7 @@ pub trait LoadFee: LoadBlock { /// /// See also: fn gas_price(&self) -> impl Future> + Send { - let header = self.block(BlockNumberOrTag::Latest.into()); + let header = self.block_with_senders(BlockNumberOrTag::Latest.into()); let suggested_tip = self.suggested_priority_fee(); async move { let (header, suggested_tip) = futures::try_join!(header, suggested_tip)?; @@ -344,9 +344,9 @@ pub trait LoadFee: LoadBlock { /// Returns a suggestion for a base fee for blob transactions. fn blob_base_fee(&self) -> impl Future> + Send { async move { - self.block(BlockNumberOrTag::Latest.into()) + self.block_with_senders(BlockNumberOrTag::Latest.into()) .await? - .and_then(|h: reth_primitives::SealedBlock| h.next_block_blob_fee()) + .and_then(|h| h.next_block_blob_fee()) .ok_or(EthApiError::ExcessBlobGasNotSet.into()) .map(U256::from) } diff --git a/crates/rpc/rpc-eth-api/src/helpers/pending_block.rs b/crates/rpc/rpc-eth-api/src/helpers/pending_block.rs index 8014851e3..03597289f 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/pending_block.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/pending_block.rs @@ -323,7 +323,7 @@ pub trait LoadPendingBlock: EthApiTypes { let env = Env::boxed( cfg.cfg_env.clone(), block_env.clone(), - Self::evm_config(self).tx_env(&tx), + Self::evm_config(self).tx_env(tx.as_signed(), tx.signer()), ); let mut evm = revm::Evm::builder().with_env(env).with_db(&mut db).build(); diff --git a/crates/rpc/rpc-eth-api/src/helpers/trace.rs b/crates/rpc/rpc-eth-api/src/helpers/trace.rs index 7b5d13b2a..2dc1c5b47 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/trace.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/trace.rs @@ -191,11 +191,11 @@ pub trait Trace: LoadState { // block the transaction is included in let parent_block = block.parent_hash; let parent_beacon_block_root = block.parent_beacon_block_root; - let block_txs = block.into_transactions_ecrecovered(); let this = self.clone(); self.spawn_with_state_at_block(parent_block.into(), move |state| { let mut db = CacheDB::new(StateProviderDatabase::new(state)); + let block_txs = block.transactions_with_sender(); // apply relevant system calls let mut system_caller = SystemCaller::new( @@ -227,7 +227,7 @@ pub trait Trace: LoadState { let env = EnvWithHandlerCfg::new_with_cfg_env( cfg, block_env, - Call::evm_config(&this).tx_env(&tx), + Call::evm_config(&this).tx_env(tx.as_signed(), tx.signer()), ); let (res, _) = this.inspect(StateCacheDbRefMutWrapper(&mut db), env, &mut inspector)?; @@ -356,10 +356,10 @@ pub trait Trace: LoadState { let mut results = Vec::with_capacity(max_transactions); let mut transactions = block - .into_transactions_ecrecovered() + .transactions_with_sender() .take(max_transactions) .enumerate() - .map(|(idx, tx)| { + .map(|(idx, (signer, tx))| { let tx_info = TransactionInfo { hash: Some(tx.hash()), index: Some(idx as u64), @@ -367,7 +367,7 @@ pub trait Trace: LoadState { block_number: Some(block_number), base_fee: Some(base_fee), }; - let tx_env = Trace::evm_config(&this).tx_env(&tx); + let tx_env = Trace::evm_config(&this).tx_env(tx, *signer); (tx_info, tx_env) }) .peekable(); diff --git a/crates/rpc/rpc-eth-api/src/helpers/transaction.rs b/crates/rpc/rpc-eth-api/src/helpers/transaction.rs index 94dd04414..4c2493717 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/transaction.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/transaction.rs @@ -18,6 +18,7 @@ use reth_rpc_eth_types::{ }; use reth_rpc_types_compat::transaction::{from_recovered, from_recovered_with_block_context}; use reth_transaction_pool::{PoolTransaction, TransactionOrigin, TransactionPool}; +use std::sync::Arc; use crate::{FromEthApiError, FullEthApiTypes, IntoEthApiError, RpcReceipt, RpcTransaction}; @@ -79,7 +80,11 @@ pub trait EthTransactions: LoadTransaction { block: B256, ) -> impl Future>, Self::Error>> + Send { async move { - self.cache().get_block_transactions(block).await.map_err(Self::Error::from_eth_err) + self.cache() + .get_sealed_block_with_senders(block) + .await + .map(|b| b.map(|b| b.body.transactions.clone())) + .map_err(Self::Error::from_eth_err) } } @@ -194,7 +199,7 @@ pub trait EthTransactions: LoadTransaction { let block_hash = block.hash(); let block_number = block.number; let base_fee_per_gas = block.base_fee_per_gas; - if let Some(tx) = block.into_transactions_ecrecovered().nth(index) { + if let Some((signer, tx)) = block.transactions_with_sender().nth(index) { let tx_info = TransactionInfo { hash: Some(tx.hash()), block_hash: Some(block_hash), @@ -204,7 +209,8 @@ pub trait EthTransactions: LoadTransaction { }; return Ok(Some(from_recovered_with_block_context::( - tx, tx_info, + tx.clone().with_signer(*signer), + tx_info, ))) } } @@ -270,10 +276,10 @@ pub trait EthTransactions: LoadTransaction { let base_fee_per_gas = block.base_fee_per_gas; block - .into_transactions_ecrecovered() + .transactions_with_sender() .enumerate() - .find(|(_, tx)| tx.signer() == sender && tx.nonce() == nonce) - .map(|(index, tx)| { + .find(|(_, (signer, tx))| **signer == sender && tx.nonce() == nonce) + .map(|(index, (signer, tx))| { let tx_info = TransactionInfo { hash: Some(tx.hash()), block_hash: Some(block_hash), @@ -282,7 +288,8 @@ pub trait EthTransactions: LoadTransaction { index: Some(index as u64), }; from_recovered_with_block_context::( - tx, tx_info, + tx.clone().with_signer(*signer), + tx_info, ) }) }) @@ -544,8 +551,9 @@ pub trait LoadTransaction: SpawnBlocking + FullEthApiTypes { fn transaction_and_block( &self, hash: B256, - ) -> impl Future, Self::Error>> - + Send { + ) -> impl Future< + Output = Result)>, Self::Error>, + > + Send { async move { let (transaction, at) = match self.transaction_by_hash_at(hash).await? { None => return Ok(None), @@ -559,10 +567,10 @@ pub trait LoadTransaction: SpawnBlocking + FullEthApiTypes { }; let block = self .cache() - .get_block_with_senders(block_hash) + .get_sealed_block_with_senders(block_hash) .await .map_err(Self::Error::from_eth_err)?; - Ok(block.map(|block| (transaction, (*block).clone().seal(block_hash)))) + Ok(block.map(|block| (transaction, block))) } } } diff --git a/crates/rpc/rpc-eth-types/Cargo.toml b/crates/rpc/rpc-eth-types/Cargo.toml index 2c6f51ff4..46a0d7b5c 100644 --- a/crates/rpc/rpc-eth-types/Cargo.toml +++ b/crates/rpc/rpc-eth-types/Cargo.toml @@ -58,6 +58,7 @@ derive_more.workspace = true schnellru.workspace = true rand.workspace = true tracing.workspace = true +itertools.workspace = true [dev-dependencies] serde_json.workspace = true diff --git a/crates/rpc/rpc-eth-types/src/cache/mod.rs b/crates/rpc/rpc-eth-types/src/cache/mod.rs index e8353ecc4..cbf05f276 100644 --- a/crates/rpc/rpc-eth-types/src/cache/mod.rs +++ b/crates/rpc/rpc-eth-types/src/cache/mod.rs @@ -7,8 +7,7 @@ use reth_errors::{ProviderError, ProviderResult}; use reth_evm::{provider::EvmEnvProvider, ConfigureEvm}; use reth_execution_types::Chain; use reth_primitives::{ - Block, BlockHashOrNumber, BlockWithSenders, Header, Receipt, SealedBlock, - SealedBlockWithSenders, TransactionSigned, TransactionSignedEcRecovered, + BlockHashOrNumber, Header, Receipt, SealedBlockWithSenders, TransactionSigned, }; use reth_storage_api::{BlockReader, StateProviderFactory, TransactionVariant}; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; @@ -33,13 +32,13 @@ pub mod db; pub mod metrics; pub mod multi_consumer; -/// The type that can send the response to a requested [`Block`] +/// The type that can send the response to a requested [`SealedBlockWithSenders`] type BlockTransactionsResponseSender = oneshot::Sender>>>; -/// The type that can send the response to a requested [`BlockWithSenders`] +/// The type that can send the response to a requested [`SealedBlockWithSenders`] type BlockWithSendersResponseSender = - oneshot::Sender>>>; + oneshot::Sender>>>; /// The type that can send the response to the requested receipts of a block. type ReceiptsResponseSender = oneshot::Sender>>>>; @@ -49,7 +48,7 @@ type EnvResponseSender = oneshot::Sender = MultiConsumerLruCache< B256, - Arc, + Arc, L, Either, >; @@ -142,90 +141,16 @@ impl EthStateCache { this } - /// Requests the [`Block`] for the block hash - /// - /// Returns `None` if the block does not exist. - pub async fn get_block(&self, block_hash: B256) -> ProviderResult> { - let (response_tx, rx) = oneshot::channel(); - let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx }); - let block_with_senders_res = - rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?; - - if let Ok(Some(block_with_senders)) = block_with_senders_res { - Ok(Some(block_with_senders.block.clone())) - } else { - Ok(None) - } - } - - /// Requests the [`Block`] for the block hash, sealed with the given block hash. - /// - /// Returns `None` if the block does not exist. - pub async fn get_sealed_block(&self, block_hash: B256) -> ProviderResult> { - Ok(self.get_block(block_hash).await?.map(|block| block.seal(block_hash))) - } - - /// Requests the transactions of the [`Block`] - /// - /// Returns `None` if the block does not exist. - pub async fn get_block_transactions( - &self, - block_hash: B256, - ) -> ProviderResult>> { - let (response_tx, rx) = oneshot::channel(); - let _ = self.to_service.send(CacheAction::GetBlockTransactions { block_hash, response_tx }); - rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)? - } - - /// Requests the ecrecovered transactions of the [`Block`] - /// - /// Returns `None` if the block does not exist. - pub async fn get_block_transactions_ecrecovered( - &self, - block_hash: B256, - ) -> ProviderResult>> { - Ok(self - .get_block_with_senders(block_hash) - .await? - .map(|block| (*block).clone().into_transactions_ecrecovered().collect())) - } - - /// Fetches both transactions and receipts for the given block hash. - pub async fn get_transactions_and_receipts( - &self, - block_hash: B256, - ) -> ProviderResult, Arc>)>> { - let transactions = self.get_block_transactions(block_hash); - let receipts = self.get_receipts(block_hash); - - let (transactions, receipts) = futures::try_join!(transactions, receipts)?; - - Ok(transactions.zip(receipts)) - } - - /// Requests the [`BlockWithSenders`] for the block hash - /// - /// Returns `None` if the block does not exist. - pub async fn get_block_with_senders( - &self, - block_hash: B256, - ) -> ProviderResult>> { - let (response_tx, rx) = oneshot::channel(); - let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx }); - rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)? - } - /// Requests the [`SealedBlockWithSenders`] for the block hash /// /// Returns `None` if the block does not exist. pub async fn get_sealed_block_with_senders( &self, block_hash: B256, - ) -> ProviderResult> { - Ok(self - .get_block_with_senders(block_hash) - .await? - .map(|block| (*block).clone().seal(block_hash))) + ) -> ProviderResult>> { + let (response_tx, rx) = oneshot::channel(); + let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx }); + rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)? } /// Requests the [Receipt] for the block hash @@ -244,8 +169,8 @@ impl EthStateCache { pub async fn get_block_and_receipts( &self, block_hash: B256, - ) -> ProviderResult>)>> { - let block = self.get_sealed_block(block_hash); + ) -> ProviderResult, Arc>)>> { + let block = self.get_sealed_block_with_senders(block_hash); let receipts = self.get_receipts(block_hash); let (block, receipts) = futures::try_join!(block, receipts)?; @@ -292,7 +217,7 @@ pub(crate) struct EthStateCacheService< LimitReceipts = ByLength, LimitEnvs = ByLength, > where - LimitBlocks: Limiter>, + LimitBlocks: Limiter>, LimitReceipts: Limiter>>, LimitEnvs: Limiter, { @@ -325,7 +250,7 @@ where fn on_new_block( &mut self, block_hash: B256, - res: ProviderResult>>, + res: ProviderResult>>, ) { if let Some(queued) = self.full_block_cache.remove(&block_hash) { // send the response to queued senders @@ -367,7 +292,11 @@ where } } - fn on_reorg_block(&mut self, block_hash: B256, res: ProviderResult>) { + fn on_reorg_block( + &mut self, + block_hash: B256, + res: ProviderResult>, + ) { let res = res.map(|b| b.map(Arc::new)); if let Some(queued) = self.full_block_cache.remove(&block_hash) { // send the response to queued senders @@ -441,7 +370,7 @@ where // Only look in the database to prevent situations where we // looking up the tree is blocking let block_sender = provider - .block_with_senders( + .sealed_block_with_senders( BlockHashOrNumber::Hash(block_hash), TransactionVariant::WithHash, ) @@ -453,36 +382,6 @@ where })); } } - CacheAction::GetBlockTransactions { block_hash, response_tx } => { - // check if block is cached - if let Some(block) = this.full_block_cache.get(&block_hash) { - let _ = response_tx.send(Ok(Some(block.body.transactions.clone()))); - continue - } - - // block is not in the cache, request it if this is the first consumer - if this.full_block_cache.queue(block_hash, Either::Right(response_tx)) { - let provider = this.provider.clone(); - let action_tx = this.action_tx.clone(); - let rate_limiter = this.rate_limiter.clone(); - this.action_task_spawner.spawn_blocking(Box::pin(async move { - // Acquire permit - let _permit = rate_limiter.acquire().await; - // Only look in the database to prevent situations where we - // looking up the tree is blocking - let res = provider - .block_with_senders( - BlockHashOrNumber::Hash(block_hash), - TransactionVariant::WithHash, - ) - .map(|b| b.map(Arc::new)); - let _ = action_tx.send(CacheAction::BlockWithSendersResult { - block_hash, - res, - }); - })); - } - } CacheAction::GetReceipts { block_hash, response_tx } => { // check if block is cached if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() { @@ -574,7 +473,7 @@ where } CacheAction::CacheNewCanonicalChain { chain_change } => { for block in chain_change.blocks { - this.on_new_block(block.hash(), Ok(Some(Arc::new(block.unseal())))); + this.on_new_block(block.hash(), Ok(Some(Arc::new(block)))); } for block_receipts in chain_change.receipts { @@ -588,7 +487,7 @@ where } CacheAction::RemoveReorgedChain { chain_change } => { for block in chain_change.blocks { - this.on_reorg_block(block.hash(), Ok(Some(block.unseal()))); + this.on_reorg_block(block.hash(), Ok(Some(block))); } for block_receipts in chain_change.receipts { @@ -610,15 +509,36 @@ where /// All message variants sent through the channel enum CacheAction { - GetBlockWithSenders { block_hash: B256, response_tx: BlockWithSendersResponseSender }, - GetBlockTransactions { block_hash: B256, response_tx: BlockTransactionsResponseSender }, - GetEnv { block_hash: B256, response_tx: EnvResponseSender }, - GetReceipts { block_hash: B256, response_tx: ReceiptsResponseSender }, - BlockWithSendersResult { block_hash: B256, res: ProviderResult>> }, - ReceiptsResult { block_hash: B256, res: ProviderResult>>> }, - EnvResult { block_hash: B256, res: Box> }, - CacheNewCanonicalChain { chain_change: ChainChange }, - RemoveReorgedChain { chain_change: ChainChange }, + GetBlockWithSenders { + block_hash: B256, + response_tx: BlockWithSendersResponseSender, + }, + GetEnv { + block_hash: B256, + response_tx: EnvResponseSender, + }, + GetReceipts { + block_hash: B256, + response_tx: ReceiptsResponseSender, + }, + BlockWithSendersResult { + block_hash: B256, + res: ProviderResult>>, + }, + ReceiptsResult { + block_hash: B256, + res: ProviderResult>>>, + }, + EnvResult { + block_hash: B256, + res: Box>, + }, + CacheNewCanonicalChain { + chain_change: ChainChange, + }, + RemoveReorgedChain { + chain_change: ChainChange, + }, } struct BlockReceipts { diff --git a/crates/rpc/rpc-eth-types/src/fee_history.rs b/crates/rpc/rpc-eth-types/src/fee_history.rs index 08ac56845..57dd276e5 100644 --- a/crates/rpc/rpc-eth-types/src/fee_history.rs +++ b/crates/rpc/rpc-eth-types/src/fee_history.rs @@ -73,16 +73,16 @@ impl FeeHistoryCache { } /// Insert block data into the cache. - async fn insert_blocks(&self, blocks: I) + async fn insert_blocks<'a, I>(&self, blocks: I) where - I: IntoIterator>)>, + I: IntoIterator>)>, { let mut entries = self.inner.entries.write().await; let percentiles = self.predefined_percentiles(); // Insert all new blocks and calculate approximated rewards for (block, receipts) in blocks { - let mut fee_history_entry = FeeHistoryEntry::new(&block); + let mut fee_history_entry = FeeHistoryEntry::new(block); fee_history_entry.rewards = calculate_reward_percentiles_for_block( &percentiles, fee_history_entry.gas_used, @@ -237,7 +237,9 @@ pub async fn fee_history_cache_new_blocks_task( tokio::select! { res = &mut fetch_missing_block => { if let Ok(res) = res { - fee_history_cache.insert_blocks(res.into_iter()).await; + fee_history_cache.insert_blocks(res.as_ref() + .map(|(b, r)| (&b.block, r.clone())) + .into_iter()).await; } } event = events.next() => { @@ -245,11 +247,12 @@ pub async fn fee_history_cache_new_blocks_task( // the stream ended, we are done break; }; - let (blocks, receipts): (Vec<_>, Vec<_>) = event - .committed() + + let committed = event .committed(); + let (blocks, receipts): (Vec<_>, Vec<_>) = committed .blocks_and_receipts() .map(|(block, receipts)| { - (block.block.clone(), Arc::new(receipts.iter().flatten().cloned().collect::>())) + (&block.block, Arc::new(receipts.iter().flatten().cloned().collect::>())) }) .unzip(); fee_history_cache.insert_blocks(blocks.into_iter().zip(receipts)).await; diff --git a/crates/rpc/rpc-eth-types/src/gas_oracle.rs b/crates/rpc/rpc-eth-types/src/gas_oracle.rs index 01591bc4d..84e7ab830 100644 --- a/crates/rpc/rpc-eth-types/src/gas_oracle.rs +++ b/crates/rpc/rpc-eth-types/src/gas_oracle.rs @@ -1,16 +1,16 @@ //! An implementation of the eth gas price oracle, used for providing gas price estimates based on //! previous blocks. -use std::fmt::{self, Debug, Formatter}; - use alloy_primitives::{B256, U256}; use alloy_rpc_types::BlockId; use derive_more::{Deref, DerefMut, From, Into}; +use itertools::Itertools; use reth_primitives::{constants::GWEI_TO_WEI, BlockNumberOrTag}; use reth_rpc_server_types::constants; use reth_storage_api::BlockReaderIdExt; use schnellru::{ByLength, LruMap}; use serde::{Deserialize, Serialize}; +use std::fmt::{self, Debug, Formatter}; use tokio::sync::Mutex; use tracing::warn; @@ -212,7 +212,7 @@ where limit: usize, ) -> EthResult)>> { // check the cache (this will hit the disk if the block is not cached) - let mut block = match self.cache.get_block(block_hash).await? { + let block = match self.cache.get_sealed_block_with_senders(block_hash).await? { Some(block) => block, None => return Ok(None), }; @@ -221,11 +221,15 @@ where let parent_hash = block.parent_hash; // sort the functions by ascending effective tip first - block.body.transactions.sort_by_cached_key(|tx| tx.effective_tip_per_gas(base_fee_per_gas)); + let sorted_transactions = block + .body + .transactions + .iter() + .sorted_by_cached_key(|tx| tx.effective_tip_per_gas(base_fee_per_gas)); let mut prices = Vec::with_capacity(limit); - for tx in block.body.transactions() { + for tx in sorted_transactions { let mut effective_gas_tip = None; // ignore transactions with a tip under the configured threshold if let Some(ignore_under) = self.ignore_price { diff --git a/crates/rpc/rpc-eth-types/src/logs_utils.rs b/crates/rpc/rpc-eth-types/src/logs_utils.rs index c64bbe055..205e2bba3 100644 --- a/crates/rpc/rpc-eth-types/src/logs_utils.rs +++ b/crates/rpc/rpc-eth-types/src/logs_utils.rs @@ -6,8 +6,9 @@ use alloy_primitives::TxHash; use alloy_rpc_types::{FilteredParams, Log}; use reth_chainspec::ChainInfo; use reth_errors::ProviderError; -use reth_primitives::{BlockNumHash, Receipt, SealedBlock}; +use reth_primitives::{BlockNumHash, Receipt, SealedBlockWithSenders}; use reth_storage_api::BlockReader; +use std::sync::Arc; /// Returns all matching of a block's receipts when the transaction hashes are known. pub fn matching_block_logs_with_tx_hashes<'a, I>( @@ -50,8 +51,8 @@ where pub enum ProviderOrBlock<'a, P: BlockReader> { /// Provider Provider(&'a P), - /// [`SealedBlock`] - Block(SealedBlock), + /// [`SealedBlockWithSenders`] + Block(Arc), } /// Appends all matching logs of a block's receipts. diff --git a/crates/rpc/rpc/src/debug.rs b/crates/rpc/rpc/src/debug.rs index fbef6f7a7..acf215b3b 100644 --- a/crates/rpc/rpc/src/debug.rs +++ b/crates/rpc/rpc/src/debug.rs @@ -119,7 +119,7 @@ where env: Env::boxed( cfg.cfg_env.clone(), block_env.clone(), - Call::evm_config(this.eth_api()).tx_env(&tx), + Call::evm_config(this.eth_api()).tx_env(tx.as_signed(), tx.signer()), ), handler_cfg: cfg.handler_cfg, }; @@ -219,7 +219,7 @@ where self.trace_block( state_at.into(), - block.into_transactions_ecrecovered().collect(), + (*block).clone().into_transactions_ecrecovered().collect(), cfg, block_env, opts, @@ -245,11 +245,12 @@ where // block the transaction is included in let state_at: BlockId = block.parent_hash.into(); let block_hash = block.hash(); - let block_txs = block.into_transactions_ecrecovered(); let this = self.clone(); self.eth_api() .spawn_with_state_at_block(state_at, move |state| { + let block_txs = block.transactions_with_sender(); + // configure env for the target transaction let tx = transaction.into_recovered(); @@ -267,7 +268,7 @@ where env: Env::boxed( cfg.cfg_env.clone(), block_env, - Call::evm_config(this.eth_api()).tx_env(&tx), + Call::evm_config(this.eth_api()).tx_env(tx.as_signed(), tx.signer()), ), handler_cfg: cfg.handler_cfg, }; @@ -527,15 +528,15 @@ where if replay_block_txs { // only need to replay the transactions in the block if not all transactions are // to be replayed - let transactions = block.into_transactions_ecrecovered().take(num_txs); + let transactions = block.transactions_with_sender().take(num_txs); // Execute all transactions until index - for tx in transactions { + for (signer, tx) in transactions { let env = EnvWithHandlerCfg { env: Env::boxed( cfg.cfg_env.clone(), block_env.clone(), - Call::evm_config(this.eth_api()).tx_env(&tx), + Call::evm_config(this.eth_api()).tx_env(tx, *signer), ), handler_cfg: cfg.handler_cfg, }; @@ -613,7 +614,7 @@ where let _ = block_executor .execute_with_state_closure( - (&block.clone().unseal(), block.difficulty).into(), + (&(*block).clone().unseal(), block.difficulty).into(), |statedb: &State<_>| { codes = statedb .cache diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index 9efecf3da..b136861c7 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -19,7 +19,7 @@ use async_trait::async_trait; use jsonrpsee::{core::RpcResult, server::IdProvider}; use reth_chainspec::ChainInfo; use reth_node_api::EthApiTypes; -use reth_primitives::{Receipt, SealedBlock, TransactionSignedEcRecovered}; +use reth_primitives::{Receipt, SealedBlockWithSenders, TransactionSignedEcRecovered}; use reth_provider::{BlockIdReader, BlockReader, EvmEnvProvider, ProviderError}; use reth_rpc_eth_api::{EthFilterApiServer, FullEthApiTypes, RpcTransaction, TransactionCompat}; use reth_rpc_eth_types::{ @@ -534,7 +534,8 @@ where &self, block_num_hash: &BlockNumHash, best_number: u64, - ) -> Result>, Option)>, EthFilterError> { + ) -> Result>, Option>)>, EthFilterError> + { // The last 4 blocks are most likely cached, so we can just fetch them let cached_range = best_number.saturating_sub(4)..=best_number; let receipts_block = if cached_range.contains(&block_num_hash.number) { diff --git a/crates/rpc/rpc/src/trace.rs b/crates/rpc/rpc/src/trace.rs index f7b2f7ab7..05588d4da 100644 --- a/crates/rpc/rpc/src/trace.rs +++ b/crates/rpc/rpc/src/trace.rs @@ -118,14 +118,14 @@ where trace_types: HashSet, block_id: Option, ) -> Result { - let tx = recover_raw_transaction(tx)?; + let tx = recover_raw_transaction(tx)?.into_ecrecovered_transaction(); let (cfg, block, at) = self.eth_api().evm_env_at(block_id.unwrap_or_default()).await?; let env = EnvWithHandlerCfg::new_with_cfg_env( cfg, block, - Call::evm_config(self.eth_api()).tx_env(&tx.into_ecrecovered_transaction()), + Call::evm_config(self.eth_api()).tx_env(tx.as_signed(), tx.signer()), ); let config = TracingInspectorConfig::from_parity_config(&trace_types); @@ -372,7 +372,7 @@ where }, ); - let block = self.eth_api().block(block_id); + let block = self.eth_api().block_with_senders(block_id); let (maybe_traces, maybe_block) = futures::try_join!(traces, block)?; let mut maybe_traces = @@ -468,7 +468,9 @@ where let Some(transactions) = res else { return Ok(None) }; - let Some(block) = self.eth_api().block(block_id).await? else { return Ok(None) }; + let Some(block) = self.eth_api().block_with_senders(block_id).await? else { + return Ok(None) + }; Ok(Some(BlockOpcodeGas { block_hash: block.hash(),