feat: CanonStateNotification for commit and reorg (#2156)

rakita
2023-04-10 13:11:15 +02:00
committed by GitHub
parent cbbd834575
commit 087d0a6317
40 changed files with 801 additions and 617 deletions

Cargo.lock generated
View File

@ -4734,6 +4734,7 @@ name = "reth-executor"
version = "0.1.0"
dependencies = [
"aquamarine",
"assert_matches",
"auto_impl",
"hash-db",
"parking_lot 0.12.1",
@ -5005,8 +5006,8 @@ dependencies = [
"reth-revm-primitives",
"reth-rlp",
"reth-tracing",
"revm-primitives",
"thiserror",
"tokio",
"tracing",
"triehash",
]
@ -5109,6 +5110,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-stream",
"tokio-util 0.7.7",
"tower",
"tracing",
]

View File

@ -3,10 +3,11 @@
use crate::dirs::{JwtSecretPath, PlatformPath};
use clap::Args;
use jsonrpsee::server::ServerHandle;
use reth_interfaces::events::ChainEventSubscriptions;
use reth_network_api::{NetworkInfo, Peers};
use reth_primitives::ChainSpec;
use reth_provider::{BlockProvider, EvmEnvProvider, HeaderProvider, StateProviderFactory};
use reth_provider::{
BlockProvider, CanonStateSubscriptions, EvmEnvProvider, HeaderProvider, StateProviderFactory,
};
use reth_rpc::{JwtError, JwtSecret};
use reth_rpc_builder::{
constants, error::RpcError, IpcServerBuilder, RethRpcModule, RpcModuleSelection,
@ -134,7 +135,7 @@ impl RpcServerArgs {
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: ChainEventSubscriptions + Clone + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
{
reth_rpc_builder::launch(
client,

View File

@ -30,7 +30,6 @@ use reth_executor::blockchain_tree::{
};
use reth_interfaces::{
consensus::{Consensus, ForkchoiceState},
events::NewBlockNotificationSink,
p2p::{
bodies::{client::BodiesClient, downloader::BodyDownloader},
headers::{client::StatusUpdater, downloader::HeaderDownloader},
@ -231,11 +230,11 @@ impl Command {
let tree_config = BlockchainTreeConfig::default();
// The size of the broadcast is twice the maximum reorg depth, because at maximum reorg
// depth at least N blocks must be sent at once.
let new_block_notification_sender =
NewBlockNotificationSink::new(tree_config.max_reorg_depth() as usize * 2);
let (canon_state_notification_sender, _receiver) =
tokio::sync::broadcast::channel(tree_config.max_reorg_depth() as usize * 2);
let blockchain_tree = ShareableBlockchainTree::new(BlockchainTree::new(
tree_externals,
new_block_notification_sender.clone(),
canon_state_notification_sender.clone(),
tree_config,
)?);
@ -246,7 +245,7 @@ impl Command {
shareable_db.clone(),
transaction_pool.clone(),
consensus_engine_tx.clone(),
new_block_notification_sender.clone(),
canon_state_notification_sender,
)
.build();
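The comment above explains the sizing choice: the broadcast capacity is twice the maximum reorg depth so that the deepest possible reorg can be reported in one batch. A minimal sketch of the wiring, assuming tokio with the sync, macros and rt-multi-thread features, and using simplified stand-ins (the `CanonStateNotification` and `Chain` below are placeholders, not the real reth_provider definitions):

```rust
use std::sync::Arc;
use tokio::sync::broadcast;

// Simplified stand-in for reth_provider::CanonStateNotification.
#[derive(Clone, Debug)]
enum CanonStateNotification {
    Commit { new: Arc<Chain> },
    Reorg { old: Arc<Chain>, new: Arc<Chain> },
}

// Placeholder for reth_provider::Chain.
#[derive(Debug)]
struct Chain;

#[tokio::main]
async fn main() {
    // Illustrative value; in the node this comes from tree_config.max_reorg_depth().
    let max_reorg_depth = 64usize;
    let (sender, mut receiver) = broadcast::channel(max_reorg_depth * 2);

    // The sender is cloned into both the blockchain tree and the auto-seal miner;
    // here a spawned task plays that role.
    let miner_sender = sender.clone();
    tokio::spawn(async move {
        let _ = miner_sender.send(CanonStateNotification::Commit { new: Arc::new(Chain) });
    });

    match receiver.recv().await {
        Ok(CanonStateNotification::Commit { new }) => println!("committed chain: {new:?}"),
        Ok(CanonStateNotification::Reorg { old, new }) => {
            println!("reorg: reverted {old:?}, applied {new:?}")
        }
        Err(err) => eprintln!("channel lagged or closed: {err}"),
    }
}
```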

View File

@ -20,6 +20,7 @@ use reth_primitives::{
BlockBody, BlockHash, BlockHashOrNumber, BlockNumber, ChainSpec, Header, SealedBlock,
SealedHeader, H256, U256,
};
use reth_provider::CanonStateNotificationSender;
use reth_transaction_pool::TransactionPool;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{mpsc::UnboundedSender, RwLock, RwLockReadGuard, RwLockWriteGuard};
@ -31,7 +32,6 @@ mod task;
pub use crate::client::AutoSealClient;
pub use mode::{FixedBlockTimeMiner, MiningMode, ReadyTransactionMiner};
use reth_interfaces::events::NewBlockNotificationSink;
pub use task::MiningTask;
/// A consensus implementation intended for local development and testing purposes.
@ -83,7 +83,7 @@ pub struct AutoSealBuilder<Client, Pool> {
mode: MiningMode,
storage: Storage,
to_engine: UnboundedSender<BeaconEngineMessage>,
new_block_notification_sender: NewBlockNotificationSink,
canon_state_notification: CanonStateNotificationSender,
}
// === impl AutoSealBuilder ===
@ -95,7 +95,7 @@ impl<Client, Pool: TransactionPool> AutoSealBuilder<Client, Pool> {
client: Client,
pool: Pool,
to_engine: UnboundedSender<BeaconEngineMessage>,
new_block_notification_sender: NewBlockNotificationSink,
canon_state_notification: CanonStateNotificationSender,
) -> Self {
let mode = MiningMode::interval(std::time::Duration::from_secs(1));
Self {
@ -105,7 +105,7 @@ impl<Client, Pool: TransactionPool> AutoSealBuilder<Client, Pool> {
pool,
mode,
to_engine,
new_block_notification_sender,
canon_state_notification,
}
}
@ -117,21 +117,14 @@ impl<Client, Pool: TransactionPool> AutoSealBuilder<Client, Pool> {
/// Consumes the type and returns all components
pub fn build(self) -> (AutoSealConsensus, AutoSealClient, MiningTask<Client, Pool>) {
let Self {
client,
consensus,
pool,
mode,
storage,
to_engine,
new_block_notification_sender,
} = self;
let Self { client, consensus, pool, mode, storage, to_engine, canon_state_notification } =
self;
let auto_client = AutoSealClient::new(storage.clone());
let task = MiningTask::new(
Arc::clone(&consensus.chain_spec),
mode,
to_engine,
new_block_notification_sender,
canon_state_notification,
storage,
client,
pool,

View File

@ -1,13 +1,13 @@
use crate::{mode::MiningMode, Storage};
use futures_util::{future::BoxFuture, FutureExt, StreamExt};
use reth_beacon_consensus::BeaconEngineMessage;
use reth_interfaces::{consensus::ForkchoiceState, events::NewBlockNotificationSink};
use reth_interfaces::consensus::ForkchoiceState;
use reth_primitives::{
constants::{EMPTY_RECEIPTS, EMPTY_TRANSACTIONS},
proofs, Block, BlockBody, ChainSpec, Header, IntoRecoveredTransaction, ReceiptWithBloom,
EMPTY_OMMER_ROOT, U256,
SealedBlockWithSenders, EMPTY_OMMER_ROOT, U256,
};
use reth_provider::StateProviderFactory;
use reth_provider::{CanonStateNotificationSender, Chain, StateProviderFactory};
use reth_revm::{
database::{State, SubState},
executor::Executor,
@ -45,7 +45,7 @@ pub struct MiningTask<Client, Pool: TransactionPool> {
/// TODO: ideally this would just be a sender of hashes
to_engine: UnboundedSender<BeaconEngineMessage>,
/// Used to notify consumers of new blocks
new_block_notification_sender: NewBlockNotificationSink,
canon_state_notification: CanonStateNotificationSender,
/// The pipeline events to listen on
pipe_line_events: Option<UnboundedReceiverStream<PipelineEvent>>,
}
@ -58,7 +58,7 @@ impl<Client, Pool: TransactionPool> MiningTask<Client, Pool> {
chain_spec: Arc<ChainSpec>,
miner: MiningMode,
to_engine: UnboundedSender<BeaconEngineMessage>,
new_block_notification_sender: NewBlockNotificationSink,
canon_state_notification: CanonStateNotificationSender,
storage: Storage,
client: Client,
pool: Pool,
@ -71,7 +71,7 @@ impl<Client, Pool: TransactionPool> MiningTask<Client, Pool> {
storage,
pool,
to_engine,
new_block_notification_sender,
canon_state_notification,
queued: Default::default(),
pipe_line_events: None,
}
@ -116,7 +116,7 @@ where
let chain_spec = Arc::clone(&this.chain_spec);
let pool = this.pool.clone();
let mut events = this.pipe_line_events.take();
let new_block_notification_sender = this.new_block_notification_sender.clone();
let canon_state_notification = this.canon_state_notification.clone();
// Create the mining future that creates a block, notifies the engine that drives
// the pipeline
@ -165,24 +165,30 @@ where
trace!(target: "consensus::auto", transactions=?&block.body, "executing transactions");
match executor.execute_transactions(&block, U256::ZERO, None) {
Ok((res, gas_used)) => {
let senders = block
.body
.iter()
.map(|tx| tx.recover_signer())
.collect::<Option<Vec<_>>>()?;
match executor.execute_transactions(&block, U256::ZERO, Some(senders.clone())) {
Ok((post_state, gas_used)) => {
let Block { mut header, body, .. } = block;
// clear all transactions from pool
pool.remove_transactions(body.iter().map(|tx| tx.hash));
header.receipts_root = if res.receipts().is_empty() {
header.receipts_root = if post_state.receipts().is_empty() {
EMPTY_RECEIPTS
} else {
let receipts_with_bloom = res
let receipts_with_bloom = post_state
.receipts()
.iter()
.map(|r| r.clone().into())
.collect::<Vec<ReceiptWithBloom>>();
proofs::calculate_receipt_root(receipts_with_bloom.iter())
};
let transactions = body.clone();
let body =
BlockBody { transactions: body, ommers: vec![], withdrawals: None };
header.gas_used = gas_used;
@ -223,11 +229,25 @@ where
}
}
let header = header.seal_slow();
debug!(target: "consensus::auto", header=?header.hash(), "sending block notification");
// seal the block
let block = Block {
header,
body: transactions,
ommers: vec![],
withdrawals: None,
};
let sealed_block = block.seal_slow();
let sealed_block_with_senders =
SealedBlockWithSenders::new(sealed_block, senders)
.expect("senders are valid");
debug!(target: "consensus::auto", header=?sealed_block_with_senders.hash(), "sending block notification");
let chain =
Arc::new(Chain::new(vec![(sealed_block_with_senders, post_state)]));
// send block notification
let _ = new_block_notification_sender.send(Arc::new(header));
let _ = canon_state_notification
.send(reth_provider::CanonStateNotification::Commit { new: chain });
}
Err(err) => {
warn!(target: "consensus::auto", ?err, "failed to execute block")

View File

@ -31,12 +31,19 @@ pub enum BeaconEngineError {
PayloadBuilderError(#[from] PayloadBuilderError),
/// Pipeline error.
#[error(transparent)]
Pipeline(#[from] PipelineError),
Pipeline(#[from] Box<PipelineError>),
/// Common error. Wrapper around [reth_interfaces::Error].
#[error(transparent)]
Common(#[from] reth_interfaces::Error),
}
// box the pipeline error as it is a large enum.
impl From<PipelineError> for BeaconEngineError {
fn from(e: PipelineError) -> Self {
Self::Pipeline(Box::new(e))
}
}
// for convenience in the beacon engine
impl From<reth_interfaces::db::Error> for BeaconEngineError {
fn from(e: reth_interfaces::db::Error) -> Self {
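As the comment notes, the large `PipelineError` enum is boxed so the `Pipeline` variant does not inflate the size of `BeaconEngineError`, and a manual `From<PipelineError>` impl keeps `?` ergonomic at call sites. A self-contained sketch of the same pattern, assuming the thiserror crate and using simplified stand-in enums rather than the real reth error types:

```rust
use thiserror::Error;

// Stand-in for a large nested error enum.
#[derive(Debug, Error)]
enum PipelineError {
    #[error("stage failed: {0}")]
    Stage(String),
}

#[derive(Debug, Error)]
enum BeaconEngineError {
    // Boxing the payload keeps BeaconEngineError itself small.
    #[error(transparent)]
    Pipeline(#[from] Box<PipelineError>),
}

// Convenience conversion so `?` still works on a bare PipelineError.
impl From<PipelineError> for BeaconEngineError {
    fn from(e: PipelineError) -> Self {
        Self::Pipeline(Box::new(e))
    }
}

fn run() -> Result<(), BeaconEngineError> {
    let res: Result<(), PipelineError> = Err(PipelineError::Stage("channel closed".into()));
    res?; // converted through the manual From impl
    Ok(())
}

fn main() {
    // Matching now goes through the box, as the updated engine tests below do with assert_matches.
    assert!(matches!(
        run(),
        Err(BeaconEngineError::Pipeline(e)) if matches!(*e, PipelineError::Stage(_))
    ));
}
```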

View File

@ -568,9 +568,7 @@ mod tests {
post_state::PostState,
test_utils::TestExecutorFactory,
};
use reth_interfaces::{
events::NewBlockNotificationSink, sync::NoopSyncStateUpdate, test_utils::TestConsensus,
};
use reth_interfaces::{sync::NoopSyncStateUpdate, test_utils::TestConsensus};
use reth_miner::TestPayloadStore;
use reth_primitives::{ChainSpec, ChainSpecBuilder, SealedBlockWithSenders, H256, MAINNET};
use reth_provider::Transaction;
@ -653,8 +651,9 @@ mod tests {
// Setup blockchain tree
let externals = TreeExternals::new(db.clone(), consensus, executor_factory, chain_spec);
let config = BlockchainTreeConfig::new(1, 2, 3);
let (canon_state_notification_sender, _) = tokio::sync::broadcast::channel(3);
let tree = ShareableBlockchainTree::new(
BlockchainTree::new(externals, NewBlockNotificationSink::new(2), config)
BlockchainTree::new(externals, canon_state_notification_sender, config)
.expect("failed to create tree"),
);
@ -709,7 +708,7 @@ mod tests {
.await;
assert_matches!(
rx.await,
Ok(Err(BeaconEngineError::Pipeline(PipelineError::Stage(StageError::ChannelClosed))))
Ok(Err(BeaconEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed))
);
}
@ -747,7 +746,7 @@ mod tests {
.await;
assert_matches!(
rx.await,
Ok(Err(BeaconEngineError::Pipeline(PipelineError::Stage(StageError::ChannelClosed))))
Ok(Err(BeaconEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed))
);
}
@ -782,7 +781,7 @@ mod tests {
assert_matches!(
rx.await,
Ok(Err(BeaconEngineError::Pipeline(PipelineError::Stage(StageError::ChannelClosed))))
Ok(Err(BeaconEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed))
);
}

View File

@ -46,6 +46,7 @@ reth-interfaces = { path = "../interfaces", features = ["test-utils"] }
reth-primitives = { path = "../primitives", features = ["test-utils"] }
reth-provider = { path = "../storage/provider", features = ["test-utils"] }
parking_lot = "0.12"
assert_matches = "1.5"
[features]
test-utils = []

View File

@ -1,7 +1,8 @@
//! Implementation of [`BlockIndices`] related to [`super::BlockchainTree`]
use super::chain::{BlockChainId, Chain};
use super::chain::BlockChainId;
use reth_primitives::{BlockHash, BlockNumHash, BlockNumber, SealedBlockWithSenders};
use reth_provider::Chain;
use std::collections::{btree_map, hash_map, BTreeMap, BTreeSet, HashMap, HashSet};
/// Internal indices of the blocks and chains.

View File

@ -6,122 +6,52 @@ use crate::{blockchain_tree::PostStateDataRef, post_state::PostState};
use reth_db::database::Database;
use reth_interfaces::{consensus::Consensus, executor::Error as ExecError, Error};
use reth_primitives::{
BlockHash, BlockNumHash, BlockNumber, SealedBlockWithSenders, SealedHeader, U256,
BlockHash, BlockNumber, ForkBlock, SealedBlockWithSenders, SealedHeader, U256,
};
use reth_provider::{
providers::PostStateProvider, BlockExecutor, ExecutorFactory, PostStateDataProvider,
providers::PostStateProvider, BlockExecutor, Chain, ExecutorFactory, PostStateDataProvider,
StateProviderFactory,
};
use std::collections::BTreeMap;
use std::{
collections::BTreeMap,
ops::{Deref, DerefMut},
};
use super::externals::TreeExternals;
/// The ID of a sidechain internally in a [`BlockchainTree`][super::BlockchainTree].
pub(crate) type BlockChainId = u64;
/// A side chain.
///
/// The sidechain contains the state of accounts after execution of its blocks,
/// changesets for those blocks (and their transactions), as well as the blocks themselves.
///
/// Each chain in the tree are identified using a unique ID.
/// A chain in the blockchain tree that has the functionality to execute blocks and append them
/// to itself.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Chain {
/// The state of accounts after execution of the blocks in this chain.
///
/// This state also contains the individual changes that lead to the current state.
state: PostState,
/// The blocks in this chain.
blocks: BTreeMap<BlockNumber, SealedBlockWithSenders>,
/// A mapping of each block number in the chain to the highest transition ID in the chain's
/// state after execution of the block.
///
/// This is used to revert changes in the state until a certain block number when the chain is
/// split.
block_transitions: BTreeMap<BlockNumber, usize>,
pub struct AppendableChain {
chain: Chain,
}
/// Block number and hash of the forked block.
pub type ForkBlock = BlockNumHash;
impl Deref for AppendableChain {
type Target = Chain;
impl Chain {
/// Get the blocks in this chain.
pub fn blocks(&self) -> &BTreeMap<BlockNumber, SealedBlockWithSenders> {
&self.blocks
fn deref(&self) -> &Self::Target {
&self.chain
}
}
impl DerefMut for AppendableChain {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.chain
}
}
impl AppendableChain {
/// Create a new appendable chain from a given chain.
pub fn new(chain: Chain) -> Self {
Self { chain }
}
/// Get post state of this chain
pub fn state(&self) -> &PostState {
&self.state
}
/// Return block number of the block hash.
pub fn block_number(&self, block_hash: BlockHash) -> Option<BlockNumber> {
self.blocks.iter().find_map(|(num, block)| (block.hash() == block_hash).then_some(*num))
}
/// Return post state of the block at the `block_number` or None if block is not known
pub fn state_at_block(&self, block_number: BlockNumber) -> Option<PostState> {
let mut state = self.state.clone();
if self.tip().number == block_number {
return Some(state)
}
if let Some(&transition_id) = self.block_transitions.get(&block_number) {
state.revert_to(transition_id);
return Some(state)
}
None
}
/// Destructure the chain into its inner components, the blocks and the state.
pub fn into_inner(self) -> (BTreeMap<BlockNumber, SealedBlockWithSenders>, PostState) {
(self.blocks, self.state)
}
/// Get the block at which this chain forked.
pub fn fork_block(&self) -> ForkBlock {
let tip = self.first();
ForkBlock { number: tip.number.saturating_sub(1), hash: tip.parent_hash }
}
/// Get the block number at which this chain forked.
pub fn fork_block_number(&self) -> BlockNumber {
self.first().number.saturating_sub(1)
}
/// Get the block hash at which this chain forked.
pub fn fork_block_hash(&self) -> BlockHash {
self.first().parent_hash
}
/// Get the first block in this chain.
pub fn first(&self) -> &SealedBlockWithSenders {
self.blocks.first_key_value().expect("Chain has at least one block for first").1
}
/// Get the tip of the chain.
///
/// # Note
///
/// Chains always have at least one block.
pub fn tip(&self) -> &SealedBlockWithSenders {
self.blocks.last_key_value().expect("Chain should have at least one block").1
}
/// Create new chain with given blocks and post state.
pub fn new(blocks: Vec<(SealedBlockWithSenders, PostState)>) -> Self {
let mut state = PostState::default();
let mut block_transitions = BTreeMap::new();
let mut block_num_hash = BTreeMap::new();
for (block, block_state) in blocks.into_iter() {
state.extend(block_state);
block_transitions.insert(block.number, state.transitions_count());
block_num_hash.insert(block.number, block);
}
Self { state, block_transitions, blocks: block_num_hash }
/// Consume the type and return the inner chain.
pub fn into_inner(self) -> Chain {
self.chain
}
/// Create a new chain that forks off of the canonical chain.
@ -155,7 +85,7 @@ impl Chain {
externals,
)?;
Ok(Self::new(vec![(block.clone(), changeset)]))
Ok(Self { chain: Chain::new(vec![(block.clone(), changeset)]) })
}
/// Create a new chain that forks off of an existing sidechain.
@ -174,15 +104,15 @@ impl Chain {
{
let parent_number = block.number - 1;
let parent = self
.blocks
.blocks()
.get(&parent_number)
.ok_or(ExecError::BlockNumberNotFoundInChain { block_number: parent_number })?;
let revert_to_transition_id = self
.block_transitions
.block_transitions()
.get(&parent.number)
.expect("Should have the transition ID for the parent block");
let mut state = self.state.clone();
let mut state = self.chain.state().clone();
// Revert state to the state after execution of the parent block
state.revert_to(*revert_to_transition_id);
@ -204,9 +134,11 @@ impl Chain {
state.extend(block_state);
let chain = Self {
block_transitions: BTreeMap::from([(block.number, state.transitions_count())]),
state,
blocks: BTreeMap::from([(block.number, block)]),
chain: Chain {
block_transitions: BTreeMap::from([(block.number, state.transitions_count())]),
state,
blocks: BTreeMap::from([(block.number, block)]),
},
};
// If all is okay, return new chain back. Present chain is not modified.
@ -277,238 +209,9 @@ impl Chain {
externals,
)?;
self.state.extend(block_state);
self.block_transitions.insert(block.number, self.state.transitions_count());
let transition_count = self.state.transitions_count();
self.block_transitions.insert(block.number, transition_count);
self.blocks.insert(block.number, block);
Ok(())
}
/// Merge two chains by appending the given chain into the current one.
///
/// The state of accounts for this chain is set to the state of the newest chain.
pub fn append_chain(&mut self, chain: Chain) -> Result<(), Error> {
let chain_tip = self.tip();
if chain_tip.hash != chain.fork_block_hash() {
return Err(ExecError::AppendChainDoesntConnect {
chain_tip: chain_tip.num_hash(),
other_chain_fork: chain.fork_block().into_components(),
}
.into())
}
// Insert blocks from other chain
self.blocks.extend(chain.blocks.into_iter());
let current_transition_count = self.state.transitions_count();
self.state.extend(chain.state);
// Update the block transition mapping, shifting the transition ID by the current number of
// transitions in *this* chain
for (block_number, transition_id) in chain.block_transitions.iter() {
self.block_transitions.insert(*block_number, transition_id + current_transition_count);
}
Ok(())
}
/// Split this chain at the given block.
///
/// The given block will be the first block in the first returned chain.
///
/// If the given block is not found, [`ChainSplit::NoSplitPending`] is returned.
/// Split chain at the number or hash, block with given number will be included at first chain.
/// If any chain is empty (Does not have blocks) None will be returned.
///
/// # Note
///
/// The block number to transition ID mapping is only found in the second chain, making it
/// impossible to perform any state reverts on the first chain.
///
/// The second chain only contains the changes that were reverted on the first chain; however,
/// it retains the up to date state as if the chains were one, i.e. the second chain is an
/// extension of the first.
pub fn split(mut self, split_at: SplitAt) -> ChainSplit {
let chain_tip = *self.blocks.last_entry().expect("chain is never empty").key();
let block_number = match split_at {
SplitAt::Hash(block_hash) => {
let Some(block_number) = self.block_number(block_hash) else { return ChainSplit::NoSplitPending(self)};
// If block number is same as tip whole chain is becoming canonical.
if block_number == chain_tip {
return ChainSplit::NoSplitCanonical(self)
}
block_number
}
SplitAt::Number(block_number) => {
if block_number >= chain_tip {
return ChainSplit::NoSplitCanonical(self)
}
if block_number < *self.blocks.first_entry().expect("chain is never empty").key() {
return ChainSplit::NoSplitPending(self)
}
block_number
}
};
let higher_number_blocks = self.blocks.split_off(&(block_number + 1));
let mut canonical_state = std::mem::take(&mut self.state);
let new_state = canonical_state.split_at(
*self.block_transitions.get(&block_number).expect("Unknown block transition ID"),
);
self.state = new_state;
ChainSplit::Split {
canonical: Chain {
state: canonical_state,
block_transitions: BTreeMap::new(),
blocks: self.blocks,
},
pending: Chain {
state: self.state,
block_transitions: self.block_transitions,
blocks: higher_number_blocks,
},
}
}
}
/// Used in spliting the chain.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SplitAt {
/// Split at block number.
Number(BlockNumber),
/// Split at block hash.
Hash(BlockHash),
}
/// Result of spliting chain.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ChainSplit {
/// Chain is not splited. Pending chain is returned.
/// Given block split is higher than last block.
/// Or in case of split by hash when hash is unknown.
NoSplitPending(Chain),
/// Chain is not splited. Canonical chain is returned.
/// Given block split is lower than first block.
NoSplitCanonical(Chain),
/// Chain is splited in two.
/// Given block split is contained in first chain.
Split {
/// Left contains lower block number that get canonicalized.
/// And substate is empty and not usable.
canonical: Chain,
/// Right contains higher block number, that is still pending.
/// And substate from original chain is moved here.
pending: Chain,
},
}
#[cfg(test)]
mod tests {
use super::*;
use reth_primitives::{Account, H160, H256};
#[test]
fn chain_append() {
let block = SealedBlockWithSenders::default();
let block1_hash = H256([0x01; 32]);
let block2_hash = H256([0x02; 32]);
let block3_hash = H256([0x03; 32]);
let block4_hash = H256([0x04; 32]);
let mut block1 = block.clone();
let mut block2 = block.clone();
let mut block3 = block.clone();
let mut block4 = block;
block1.block.header.hash = block1_hash;
block2.block.header.hash = block2_hash;
block3.block.header.hash = block3_hash;
block4.block.header.hash = block4_hash;
block3.block.header.header.parent_hash = block2_hash;
let mut chain1 =
Chain { blocks: BTreeMap::from([(1, block1), (2, block2)]), ..Default::default() };
let chain2 =
Chain { blocks: BTreeMap::from([(3, block3), (4, block4)]), ..Default::default() };
assert_eq!(chain1.append_chain(chain2.clone()), Ok(()));
// chain1 got changed so this will fail
assert!(chain1.append_chain(chain2).is_err());
}
#[test]
fn test_number_split() {
let mut base_state = PostState::default();
let account = Account { nonce: 10, ..Default::default() };
base_state.create_account(H160([1; 20]), account);
base_state.finish_transition();
let mut block_state1 = PostState::default();
block_state1.create_account(H160([2; 20]), Account::default());
block_state1.finish_transition();
let mut block_state2 = PostState::default();
block_state2.create_account(H160([3; 20]), Account::default());
block_state2.finish_transition();
let mut block1 = SealedBlockWithSenders::default();
let block1_hash = H256([15; 32]);
block1.number = 1;
block1.hash = block1_hash;
block1.senders.push(H160([4; 20]));
let mut block2 = SealedBlockWithSenders::default();
let block2_hash = H256([16; 32]);
block2.number = 2;
block2.hash = block2_hash;
block2.senders.push(H160([4; 20]));
let chain = Chain::new(vec![
(block1.clone(), block_state1.clone()),
(block2.clone(), block_state2.clone()),
]);
let mut split1_state = chain.state.clone();
let split2_state = split1_state.split_at(*chain.block_transitions.get(&1).unwrap());
let chain_split1 = Chain {
state: split1_state,
block_transitions: BTreeMap::new(),
blocks: BTreeMap::from([(1, block1.clone())]),
};
let chain_split2 = Chain {
state: split2_state,
block_transitions: chain.block_transitions.clone(),
blocks: BTreeMap::from([(2, block2.clone())]),
};
// return tip state
assert_eq!(chain.state_at_block(block2.number), Some(chain.state.clone()));
assert_eq!(chain.state_at_block(block1.number), Some(chain_split1.state.clone()));
// state at unknown block
assert_eq!(chain.state_at_block(100), None);
// split in two
assert_eq!(
chain.clone().split(SplitAt::Hash(block1_hash)),
ChainSplit::Split { canonical: chain_split1, pending: chain_split2 }
);
// split at unknown block hash
assert_eq!(
chain.clone().split(SplitAt::Hash(H256([100; 32]))),
ChainSplit::NoSplitPending(chain.clone())
);
// split at higher number
assert_eq!(
chain.clone().split(SplitAt::Number(10)),
ChainSplit::NoSplitCanonical(chain.clone())
);
// split at lower number
assert_eq!(chain.clone().split(SplitAt::Number(0)), ChainSplit::NoSplitPending(chain));
}
}
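The executor-side `AppendableChain` introduced above is a thin newtype over the `Chain` that moves to reth_provider: execution and append logic stays in this crate, while everything else is delegated through `Deref`/`DerefMut`. A generic sketch of that wrapper pattern with simplified placeholder types:

```rust
use std::ops::{Deref, DerefMut};

// Placeholder for the data-only chain type that moved to reth_provider.
#[derive(Default, Debug)]
struct Chain {
    blocks: Vec<u64>,
}

// Wrapper that would host the executor-specific methods (new_canonical_fork,
// append_block, ...) while delegating the rest to the inner chain.
struct AppendableChain {
    chain: Chain,
}

impl Deref for AppendableChain {
    type Target = Chain;
    fn deref(&self) -> &Self::Target {
        &self.chain
    }
}

impl DerefMut for AppendableChain {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.chain
    }
}

impl AppendableChain {
    fn new(chain: Chain) -> Self {
        Self { chain }
    }
    fn into_inner(self) -> Chain {
        self.chain
    }
}

fn main() {
    let mut appendable = AppendableChain::new(Chain::default());
    appendable.blocks.push(1); // DerefMut exposes the inner chain for mutation
    assert_eq!(appendable.blocks.len(), 1); // Deref exposes it read-only
    let _plain: Chain = appendable.into_inner(); // hand the data-only chain back out
}
```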

View File

@ -1,14 +1,18 @@
//! Implementation of [`BlockchainTree`]
use chain::{BlockChainId, Chain, ForkBlock};
use chain::BlockChainId;
use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx};
use reth_interfaces::{
blockchain_tree::BlockStatus, consensus::Consensus, events::NewBlockNotifications,
executor::Error as ExecError, Error,
blockchain_tree::BlockStatus, consensus::Consensus, executor::Error as ExecError, Error,
};
use reth_primitives::{
BlockHash, BlockNumber, Hardfork, SealedBlock, SealedBlockWithSenders, SealedHeader, U256,
BlockHash, BlockNumber, ForkBlock, Hardfork, SealedBlock, SealedBlockWithSenders, U256,
};
use reth_provider::{
chain::{ChainSplit, SplitAt},
post_state::PostState,
CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, Chain,
ExecutorFactory, HeaderProvider, Transaction,
};
use reth_provider::{post_state::PostState, ExecutorFactory, HeaderProvider, Transaction};
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
@ -18,7 +22,7 @@ pub mod block_indices;
use block_indices::BlockIndices;
pub mod chain;
use chain::{ChainSplit, SplitAt};
pub use chain::AppendableChain;
pub mod config;
use config::BlockchainTreeConfig;
@ -31,7 +35,6 @@ pub use shareable::ShareableBlockchainTree;
pub mod post_state_data;
pub use post_state_data::{PostStateData, PostStateDataRef};
use reth_interfaces::events::NewBlockNotificationSink;
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Tree of chains and its identifications.
@ -76,7 +79,7 @@ use reth_interfaces::events::NewBlockNotificationSink;
#[derive(Debug)]
pub struct BlockchainTree<DB: Database, C: Consensus, EF: ExecutorFactory> {
/// The tracked chains and their current data.
chains: HashMap<BlockChainId, Chain>,
chains: HashMap<BlockChainId, AppendableChain>,
/// Static blockchain ID generator
block_chain_id_generator: u64,
/// Indices to block and their connection to the canonical chain.
@ -85,15 +88,15 @@ pub struct BlockchainTree<DB: Database, C: Consensus, EF: ExecutorFactory> {
externals: TreeExternals<DB, C, EF>,
/// Tree configuration
config: BlockchainTreeConfig,
/// Unbounded channel for sending new block notifications.
new_block_notification_sender: NewBlockNotificationSink,
/// Broadcast channel for canon state changes notifications.
canon_state_notification_sender: CanonStateNotificationSender,
}
/// A container that wraps chains and block indices to allow searching for block hashes across all
/// sidechains.
pub struct BlockHashes<'a> {
/// The current tracked chains.
pub chains: &'a mut HashMap<BlockChainId, Chain>,
pub chains: &'a mut HashMap<BlockChainId, AppendableChain>,
/// The block indices for all chains.
pub indices: &'a BlockIndices,
}
@ -102,7 +105,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
/// Create a new blockchain tree.
pub fn new(
externals: TreeExternals<DB, C, EF>,
new_block_notification_sender: NewBlockNotificationSink,
canon_state_notification_sender: CanonStateNotificationSender,
config: BlockchainTreeConfig,
) -> Result<Self, Error> {
let max_reorg_depth = config.max_reorg_depth();
@ -135,7 +138,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
BTreeMap::from_iter(last_canonical_hashes.into_iter()),
),
config,
new_block_notification_sender,
canon_state_notification_sender,
})
}
@ -279,7 +282,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
.header(&block.parent_hash)?
.ok_or(ExecError::CanonicalChain { block_hash: block.parent_hash })?
.seal(block.parent_hash);
let chain = Chain::new_canonical_fork(
let chain = AppendableChain::new_canonical_fork(
&block,
&parent_header,
canonical_block_hashes,
@ -348,7 +351,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
/// Insert a chain into the tree.
///
/// Inserts a chain into the tree and builds the block indices.
fn insert_chain(&mut self, chain: Chain) -> BlockChainId {
fn insert_chain(&mut self, chain: AppendableChain) -> BlockChainId {
let chain_id = self.block_chain_id_generator;
self.block_chain_id_generator += 1;
self.block_indices.insert_chain(chain_id, &chain);
@ -488,12 +491,18 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
/// Split a sidechain at the given point, and return the canonical part of it.
///
/// The pending part of the chain is reinserted into the tree with the same `chain_id`.
fn split_chain(&mut self, chain_id: BlockChainId, chain: Chain, split_at: SplitAt) -> Chain {
fn split_chain(
&mut self,
chain_id: BlockChainId,
chain: AppendableChain,
split_at: SplitAt,
) -> Chain {
let chain = chain.into_inner();
match chain.split(split_at) {
ChainSplit::Split { canonical, pending } => {
// the rest of the split chain is inserted back with the same chain_id.
self.block_indices.insert_chain(chain_id, &pending);
self.chains.insert(chain_id, pending);
self.chains.insert(chain_id, AppendableChain::new(pending));
canonical
}
ChainSplit::NoSplitCanonical(canonical) => canonical,
@ -558,10 +567,12 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
// update canonical index
self.block_indices.canonicalize_blocks(new_canon_chain.blocks());
let headers: Vec<Arc<SealedHeader>> =
new_canon_chain.blocks().iter().map(|(_, b)| Arc::new(b.header.clone())).collect();
// if joins to the tip
let chain_action;
// if the new chain connects to the current canonical tip
if new_canon_chain.fork_block_hash() == old_tip.hash {
chain_action =
CanonStateNotification::Commit { new: Arc::new(new_canon_chain.clone()) };
// append to database
self.commit_canonical(new_canon_chain)?;
} else {
@ -574,18 +585,22 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
}
let old_canon_chain = self.revert_canonical(canon_fork.number)?;
// state action
chain_action = CanonStateNotification::Reorg {
old: Arc::new(old_canon_chain.clone()),
new: Arc::new(new_canon_chain.clone()),
};
// commit new canonical chain.
self.commit_canonical(new_canon_chain)?;
// insert old canon chain
self.insert_chain(old_canon_chain);
self.insert_chain(AppendableChain::new(old_canon_chain));
}
// Broadcast new canonical blocks.
headers.into_iter().for_each(|header| {
// ignore if receiver is dropped.
let _ = self.new_block_notification_sender.send(header);
});
// send notification
let _ = self.canon_state_notification_sender.send(chain_action);
Ok(())
}
@ -593,8 +608,8 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
/// Subscribe to new block events.
///
/// Note: Only canonical blocks are sent.
pub fn subscribe_new_blocks(&self) -> NewBlockNotifications {
self.new_block_notification_sender.subscribe()
pub fn subscribe_canon_state(&self) -> CanonStateNotifications {
self.canon_state_notification_sender.subscribe()
}
/// Canonicalize the given chain and commit it to the database.
@ -626,7 +641,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
}
self.block_indices.unwind_canonical_chain(unwind_to);
// insert old canonical chain to BlockchainTree.
self.insert_chain(old_canon_chain);
self.insert_chain(AppendableChain::new(old_canon_chain));
Ok(())
}
@ -657,6 +672,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
mod tests {
use super::*;
use crate::test_utils::TestExecutorFactory;
use assert_matches::assert_matches;
use reth_db::{
mdbx::{test_utils::create_test_rw_db, Env, WriteMap},
transaction::DbTxMut,
@ -777,9 +793,9 @@ mod tests {
// make tree
let config = BlockchainTreeConfig::new(1, 2, 3);
let mut tree = BlockchainTree::new(externals, NewBlockNotificationSink::new(10), config)
.expect("failed to create tree");
let mut new_block_notification = tree.subscribe_new_blocks();
let (sender, mut canon_notif) = tokio::sync::broadcast::channel(10);
let mut tree =
BlockchainTree::new(externals, sender, config).expect("failed to create tree");
// genesis block 10 is already canonical
assert_eq!(tree.make_canonical(&H256::zero()), Ok(()));
@ -828,12 +844,12 @@ mod tests {
// make block1 canonical
assert_eq!(tree.make_canonical(&block1.hash()), Ok(()));
// check notification
assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block1.header.clone())));
assert_matches!(canon_notif.try_recv(), Ok(CanonStateNotification::Commit{ new}) if *new.blocks() == BTreeMap::from([(block1.number,block1.clone())]));
// make block2 canonical
// make block2 canonicals
assert_eq!(tree.make_canonical(&block2.hash()), Ok(()));
// check notification.
assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block2.header.clone())));
assert_matches!(canon_notif.try_recv(), Ok(CanonStateNotification::Commit{ new}) if *new.blocks() == BTreeMap::from([(block2.number,block2.clone())]));
// Trie state:
// b2 (canonical block)
@ -893,7 +909,10 @@ mod tests {
// make b2a canonical
assert_eq!(tree.make_canonical(&block2a_hash), Ok(()));
// check notification.
assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block2a.header.clone())));
assert_matches!(canon_notif.try_recv(),
Ok(CanonStateNotification::Reorg{ old, new})
if *old.blocks() == BTreeMap::from([(block2.number,block2.clone())])
&& *new.blocks() == BTreeMap::from([(block2a.number,block2a.clone())]));
// Trie state:
// b2a b2 (side chain)
@ -915,9 +934,6 @@ mod tests {
.assert(&tree);
assert_eq!(tree.make_canonical(&block1a_hash), Ok(()));
// check notification.
assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block1a.header.clone())));
// Trie state:
// b2a b2 (side chain)
// | /
@ -941,13 +957,15 @@ mod tests {
.with_pending_blocks((block1a.number + 1, HashSet::new()))
.assert(&tree);
// check notification.
assert_matches!(canon_notif.try_recv(),
Ok(CanonStateNotification::Reorg{ old, new})
if *old.blocks() == BTreeMap::from([(block1.number,block1.clone()),(block2a.number,block2a.clone())])
&& *new.blocks() == BTreeMap::from([(block1a.number,block1a.clone())]));
// make b2 canonical
assert_eq!(tree.make_canonical(&block2.hash()), Ok(()));
// check notification.
assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block1.header.clone())));
assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block2.header.clone())));
// Trie state:
// b2 b2a (side chain)
// | /
@ -967,6 +985,12 @@ mod tests {
.with_pending_blocks((block2.number + 1, HashSet::new()))
.assert(&tree);
// check notification.
assert_matches!(canon_notif.try_recv(),
Ok(CanonStateNotification::Reorg{ old, new})
if *old.blocks() == BTreeMap::from([(block1a.number,block1a.clone())])
&& *new.blocks() == BTreeMap::from([(block1.number,block1.clone()),(block2.number,block2.clone())]));
// finalize b1 that would make b1a removed from tree
tree.finalize_block(11);
// Trie state:
@ -1008,8 +1032,6 @@ mod tests {
// commit b2a
assert_eq!(tree.make_canonical(&block2.hash), Ok(()));
// check notification.
assert_eq!(new_block_notification.try_recv(), Ok(Arc::new(block2.header.clone())));
// Trie state:
// b2 b2a (side chain)
@ -1026,6 +1048,11 @@ mod tests {
.with_pending_blocks((block2.number + 1, HashSet::new()))
.assert(&tree);
// check notification.
assert_matches!(canon_notif.try_recv(),
Ok(CanonStateNotification::Commit{ new})
if *new.blocks() == BTreeMap::from([(block2.number,block2.clone())]));
// update canonical block to b2, this would make b2a be removed
assert_eq!(tree.restore_canonical_hashes(12), Ok(()));
// Trie state:

View File

@ -1,7 +1,6 @@
//! Substate for blockchain trees
use crate::blockchain_tree::chain::ForkBlock;
use reth_primitives::{BlockHash, BlockNumber};
use reth_primitives::{BlockHash, BlockNumber, ForkBlock};
use reth_provider::{post_state::PostState, PostStateDataProvider};
use std::collections::BTreeMap;

View File

@ -4,12 +4,14 @@ use reth_db::database::Database;
use reth_interfaces::{
blockchain_tree::{BlockStatus, BlockchainTreeEngine, BlockchainTreeViewer},
consensus::Consensus,
events::{ChainEventSubscriptions, NewBlockNotifications},
provider::ProviderError,
Error,
};
use reth_primitives::{BlockHash, BlockNumHash, BlockNumber, SealedBlockWithSenders};
use reth_provider::{BlockchainTreePendingStateProvider, ExecutorFactory, PostStateDataProvider};
use reth_provider::{
BlockchainTreePendingStateProvider, CanonStateSubscriptions, ExecutorFactory,
PostStateDataProvider,
};
use std::{
collections::{BTreeMap, HashSet},
sync::Arc,
@ -100,10 +102,10 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTreePendingState
}
}
impl<DB: Database, C: Consensus, EF: ExecutorFactory> ChainEventSubscriptions
impl<DB: Database, C: Consensus, EF: ExecutorFactory> CanonStateSubscriptions
for ShareableBlockchainTree<DB, C, EF>
{
fn subscribe_new_blocks(&self) -> NewBlockNotifications {
self.tree.read().subscribe_new_blocks()
fn subscribe_canon_state(&self) -> reth_provider::CanonStateNotifications {
self.tree.read().subscribe_canon_state()
}
}
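`ShareableBlockchainTree` satisfies the new `CanonStateSubscriptions` trait by handing out a fresh broadcast receiver on every call. A condensed sketch of that shape, assuming tokio's sync feature and using simplified stand-ins for the reth_provider trait and notification type:

```rust
use tokio::sync::broadcast::{self, Receiver, Sender};

// Simplified stand-ins; the real definitions live in reth_provider.
#[derive(Clone, Debug, PartialEq)]
enum CanonStateNotification {
    Commit,
    Reorg,
}

type CanonStateNotifications = Receiver<CanonStateNotification>;

trait CanonStateSubscriptions: Send + Sync {
    /// Get notified when the canonical chain is committed or reorged.
    fn subscribe_canon_state(&self) -> CanonStateNotifications;
}

// The component owns the sender and hands out receivers on demand,
// mirroring how the shareable tree wraps the inner BlockchainTree.
struct Notifier {
    sender: Sender<CanonStateNotification>,
}

impl CanonStateSubscriptions for Notifier {
    fn subscribe_canon_state(&self) -> CanonStateNotifications {
        self.sender.subscribe()
    }
}

fn main() {
    let (sender, _initial_rx) = broadcast::channel(16);
    let notifier = Notifier { sender };

    let mut rx = notifier.subscribe_canon_state();
    notifier.sender.send(CanonStateNotification::Commit).unwrap();
    assert_eq!(rx.try_recv().unwrap(), CanonStateNotification::Commit);
}
```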

View File

@ -1,54 +0,0 @@
use parking_lot::Mutex;
use reth_primitives::SealedHeader;
use std::sync::Arc;
use tokio::sync::broadcast::{error::SendError, Receiver, Sender};
/// New block notification that is Arc around [SealedHeader].
pub type NewBlockNotification = Arc<SealedHeader>;
/// Type alias for a receiver that receives [NewBlockNotification]
pub type NewBlockNotifications = Receiver<NewBlockNotification>;
/// Type alias for a sender that sends [NewBlockNotification]
pub type NewBlockNotificationsSender = Sender<NewBlockNotification>;
/// A type that allows to register chain related event subscriptions.
pub trait ChainEventSubscriptions: Send + Sync {
/// Get notified when a new block was imported.
fn subscribe_new_blocks(&self) -> NewBlockNotifications;
}
/// A shareable Sender that allows to send [NewBlockNotification] to all receivers.
#[derive(Debug, Clone)]
pub struct NewBlockNotificationSink {
inner: Arc<Mutex<Sender<NewBlockNotification>>>,
}
// === impl NewBlockNotificationSink ===
impl NewBlockNotificationSink {
/// Creates a new NewBlockNotificationSink with the given capacity.
// // size of the broadcast is double of max reorg depth because at max reorg depth we can have
// // send at least N block at the time.
pub fn new(capacity: usize) -> Self {
let inner = tokio::sync::broadcast::channel(capacity);
Self { inner: Arc::new(Mutex::new(inner.0)) }
}
/// Attempts to send a value to all active Receiver handles, returning it back if it could not
/// be sent.
pub fn send(
&self,
header: NewBlockNotification,
) -> Result<usize, SendError<NewBlockNotification>> {
let sender = self.inner.lock();
sender.send(header)
}
/// Creates a new Receiver handle that will receive notifications sent after this call to
/// subscribe.
pub fn subscribe(&self) -> Receiver<NewBlockNotification> {
let sender = self.inner.lock();
sender.subscribe()
}
}

View File

@ -20,9 +20,6 @@ pub mod executor;
mod error;
pub use error::{Error, Result};
/// Traits for subscribing to events.
pub mod events;
/// P2P traits.
pub mod p2p;

View File

@ -1,29 +0,0 @@
use crate::events::{ChainEventSubscriptions, NewBlockNotification, NewBlockNotifications};
use async_trait::async_trait;
use reth_primitives::{Header, SealedHeader, H256};
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast::{self, Receiver, Sender};
/// A test ChainEventSubscriptions
#[derive(Clone, Default)]
pub struct TestChainEventSubscriptions {
new_blocks_txs: Arc<Mutex<Vec<Sender<NewBlockNotification>>>>,
}
impl TestChainEventSubscriptions {
/// Adds new block to the queue that can be consumed with
/// [`TestChainEventSubscriptions::subscribe_new_blocks`]
pub fn add_new_block(&mut self, header: SealedHeader) {
let header = Arc::new(header);
self.new_blocks_txs.lock().as_mut().unwrap().retain(|tx| tx.send(header.clone()).is_ok())
}
}
impl ChainEventSubscriptions for TestChainEventSubscriptions {
fn subscribe_new_blocks(&self) -> NewBlockNotifications {
let (new_blocks_tx, new_blocks_rx) = broadcast::channel(100);
self.new_blocks_txs.lock().as_mut().unwrap().push(new_blocks_tx);
new_blocks_rx
}
}

View File

@ -1,12 +1,10 @@
#![allow(unused)]
mod bodies;
mod events;
mod headers;
/// Generators for different data structures like block headers, block bodies and ranges of those.
pub mod generators;
pub use bodies::*;
pub use events::*;
pub use headers::*;

View File

@ -32,7 +32,7 @@ impl BuiltPayload {
}
/// Container type for all components required to build a payload.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PayloadBuilderAttributes {
/// Parent block to build the payload on top
pub(crate) parent: H256,

View File

@ -652,6 +652,9 @@ pub struct BlockNumHash {
pub hash: BlockHash,
}
/// Block number and hash of the forked block.
pub type ForkBlock = BlockNumHash;
impl std::fmt::Debug for BlockNumHash {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("").field(&self.number).field(&self.hash).finish()
@ -659,6 +662,11 @@ impl std::fmt::Debug for BlockNumHash {
}
impl BlockNumHash {
/// Creates a new `BlockNumHash` from a block number and hash.
pub fn new(number: BlockNumber, hash: BlockHash) -> Self {
Self { number, hash }
}
/// Consumes `Self` and returns [`BlockNumber`], [`BlockHash`]
pub fn into_components(self) -> (BlockNumber, BlockHash) {
(self.number, self.hash)
@ -671,6 +679,12 @@ impl From<(BlockNumber, BlockHash)> for BlockNumHash {
}
}
impl From<(BlockHash, BlockNumber)> for BlockNumHash {
fn from(val: (BlockHash, BlockNumber)) -> Self {
BlockNumHash { hash: val.0, number: val.1 }
}
}
/// A response to `GetBlockBodies`, containing bodies if any bodies were found.
///
/// Withdrawals can be optionally included at the end of the RLP encoded message.
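With `ForkBlock` now an alias for `BlockNumHash` and `From` implemented for tuples in both orders, call sites can build the pair from whichever order they already have, which is why the filter changes later in this diff can write both `(block_hash, block.number).into()` and `(block_number, block_hash).into()`. A tiny sketch with simplified stand-in types rather than the real reth_primitives definitions:

```rust
// Simplified stand-ins for the reth_primitives types.
type BlockNumber = u64;
type BlockHash = [u8; 32];

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct BlockNumHash {
    number: BlockNumber,
    hash: BlockHash,
}

/// Block number and hash of the forked block.
type ForkBlock = BlockNumHash;

impl From<(BlockNumber, BlockHash)> for BlockNumHash {
    fn from((number, hash): (BlockNumber, BlockHash)) -> Self {
        Self { number, hash }
    }
}

impl From<(BlockHash, BlockNumber)> for BlockNumHash {
    fn from((hash, number): (BlockHash, BlockNumber)) -> Self {
        Self { number, hash }
    }
}

fn main() {
    let hash = [0u8; 32];
    // Both tuple orders convert into the same value.
    let a: BlockNumHash = (7u64, hash).into();
    let b: ForkBlock = (hash, 7u64).into();
    assert_eq!(a, b);
}
```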

View File

@ -40,7 +40,7 @@ pub use account::{Account, Bytecode};
pub use bits::H512;
pub use block::{
Block, BlockBody, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag, BlockWithSenders,
SealedBlock, SealedBlockWithSenders,
ForkBlock, SealedBlock, SealedBlockWithSenders,
};
pub use bloom::Bloom;
pub use chain::{
@ -86,6 +86,8 @@ pub type Address = H160;
pub type TxHash = H256;
/// The sequence number of all existing transactions.
pub type TxNumber = u64;
/// The index of transaction in a block.
pub type TxIndex = u64;
/// Chain identifier type (introduced in EIP-155).
pub type ChainId = u64;
/// An account storage key.

View File

@ -14,3 +14,6 @@ pub mod env;
/// Helpers for type compatibility between reth and revm types
mod compat;
pub use compat::*;
/// Re-exports revm types.
pub use revm::*;

View File

@ -24,9 +24,8 @@
//! Configure only a http server with a selection of [RethRpcModule]s
//!
//! ```
//! use reth_interfaces::events::ChainEventSubscriptions;
//! use reth_network_api::{NetworkInfo, Peers};
//! use reth_provider::{BlockProvider, StateProviderFactory, EvmEnvProvider};
//! use reth_provider::{BlockProvider, CanonStateSubscriptions, StateProviderFactory, EvmEnvProvider};
//! use reth_rpc_builder::{RethRpcModule, RpcModuleBuilder, RpcServerConfig, ServerBuilder, TransportRpcModuleConfig};
//! use reth_tasks::TokioTaskExecutor;
//! use reth_transaction_pool::TransactionPool;
@ -35,7 +34,7 @@
//! Client: BlockProvider + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static,
//! Pool: TransactionPool + Clone + 'static,
//! Network: NetworkInfo + Peers + Clone + 'static,
//! Events: ChainEventSubscriptions + Clone + 'static,
//! Events: CanonStateSubscriptions + Clone + 'static,
//! {
//! // configure the rpc module per transport
//! let transports = TransportRpcModuleConfig::default().with_http(vec![
@ -60,10 +59,9 @@ use jsonrpsee::{
server::{IdProvider, Server, ServerHandle},
RpcModule,
};
use reth_interfaces::events::ChainEventSubscriptions;
use reth_ipc::server::IpcServer;
use reth_network_api::{NetworkInfo, Peers};
use reth_provider::{BlockProvider, EvmEnvProvider, StateProviderFactory};
use reth_provider::{BlockProvider, CanonStateSubscriptions, EvmEnvProvider, StateProviderFactory};
use reth_rpc::{
eth::cache::EthStateCache, AdminApi, DebugApi, EthApi, EthFilter, EthPubSub,
EthSubscriptionIdProvider, NetApi, TraceApi, TracingCallGuard, Web3Api,
@ -119,7 +117,7 @@ where
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: ChainEventSubscriptions + Clone + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
{
let module_config = module_config.into();
let server_config = server_config.into();
@ -199,7 +197,7 @@ impl<Client, Pool, Network, Tasks, Events> RpcModuleBuilder<Client, Pool, Networ
/// Configure the event subscriber instance
pub fn with_events<E>(self, events: E) -> RpcModuleBuilder<Client, Pool, Network, Tasks, E>
where
E: ChainEventSubscriptions + 'static,
E: CanonStateSubscriptions + 'static,
{
let Self { client, pool, executor, network, .. } = self;
RpcModuleBuilder { client, network, pool, executor, events }
@ -212,7 +210,7 @@ where
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: ChainEventSubscriptions + Clone + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
{
/// Configures all [RpcModule]s specific to the given [TransportRpcModuleConfig] which can be
/// used to start the transport server(s).
@ -364,7 +362,7 @@ impl RpcModuleSelection {
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: ChainEventSubscriptions + Clone + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
{
let mut registry = RethModuleRegistry::new(client, pool, network, executor, events, config);
registry.module_for(self)
@ -529,7 +527,7 @@ where
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: ChainEventSubscriptions + Clone + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
{
/// Register Eth Namespace
pub fn register_eth(&mut self) -> &mut Self {

View File

@ -1,6 +1,5 @@
use reth_interfaces::test_utils::TestChainEventSubscriptions;
use reth_network_api::test_utils::NoopNetwork;
use reth_provider::test_utils::NoopProvider;
use reth_provider::test_utils::{NoopProvider, TestCanonStateSubscriptions};
use reth_rpc_builder::{
RpcModuleBuilder, RpcModuleSelection, RpcServerConfig, RpcServerHandle,
TransportRpcModuleConfig,
@ -57,12 +56,12 @@ pub fn test_rpc_builder() -> RpcModuleBuilder<
TestPool,
NoopNetwork,
TokioTaskExecutor,
TestChainEventSubscriptions,
TestCanonStateSubscriptions,
> {
RpcModuleBuilder::default()
.with_client(NoopProvider::default())
.with_pool(testing_pool())
.with_network(NoopNetwork::default())
.with_executor(TokioTaskExecutor::default())
.with_events(TestChainEventSubscriptions::default())
.with_events(TestCanonStateSubscriptions::default())
}
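`with_events` swaps only the `Events` type parameter of the builder while carrying the other components over, which is what lets `test_rpc_builder` plug in `TestCanonStateSubscriptions` at the end of the chain. A stripped-down sketch of that type-state builder move, with placeholder types standing in for the real builder components:

```rust
// Placeholder component types; the real builder also carries pool, network and executor.
struct NoEvents;

#[derive(Debug)]
struct TestCanonStateSubscriptions;

struct RpcModuleBuilder<Events> {
    client: &'static str,
    events: Events,
}

impl<Events> RpcModuleBuilder<Events> {
    /// Configure the event subscriber instance, changing only the Events slot.
    fn with_events<E>(self, events: E) -> RpcModuleBuilder<E> {
        let Self { client, .. } = self;
        RpcModuleBuilder { client, events }
    }
}

fn main() {
    let builder = RpcModuleBuilder { client: "noop-provider", events: NoEvents }
        .with_events(TestCanonStateSubscriptions);
    println!("client = {}, events = {:?}", builder.client, builder.events);
}
```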

View File

@ -42,6 +42,7 @@ async-trait = "0.1"
tokio = { version = "1", features = ["sync"] }
tower = "0.4"
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-util = "0.7"
pin-project = "1.0"
bytes = "1.4"

View File

@ -194,9 +194,9 @@ where
logs_utils::append_matching_block_logs(
&mut all_logs,
&filter,
block_hash,
block.number,
(block_hash, block.number).into(),
block.body.into_iter().map(|tx| tx.hash).zip(receipts),
false,
);
}
}
@ -268,9 +268,9 @@ where
logs_utils::append_matching_block_logs(
&mut all_logs,
&filter_params,
block_hash,
block_number,
(block_number, block_hash).into(),
block.body.into_iter().map(|tx| tx.hash).zip(receipts),
false,
);
// size check

View File

@ -1,19 +1,20 @@
use reth_primitives::{filter::FilteredParams, BlockNumberOrTag, ChainInfo, Receipt, TxHash, U256};
use reth_primitives::{
filter::FilteredParams, BlockNumHash, BlockNumberOrTag, ChainInfo, Receipt, TxHash, U256,
};
use reth_rpc_types::Log;
use revm::primitives::B256 as H256;
/// Returns all matching logs of a block's receipts grouped with the hash of their transaction.
pub(crate) fn matching_block_logs<I>(
filter: &FilteredParams,
block_hash: H256,
block_number: u64,
block: BlockNumHash,
tx_and_receipts: I,
removed: bool,
) -> Vec<Log>
where
I: IntoIterator<Item = (TxHash, Receipt)>,
{
let mut all_logs = Vec::new();
append_matching_block_logs(&mut all_logs, filter, block_hash, block_number, tx_and_receipts);
append_matching_block_logs(&mut all_logs, filter, block, tx_and_receipts, removed);
all_logs
}
@ -21,30 +22,30 @@ where
pub(crate) fn append_matching_block_logs<I>(
all_logs: &mut Vec<Log>,
filter: &FilteredParams,
block_hash: H256,
block_number: u64,
block: BlockNumHash,
tx_and_receipts: I,
removed: bool,
) where
I: IntoIterator<Item = (TxHash, Receipt)>,
{
let block_number_u256 = U256::from(block_number);
let block_number_u256 = U256::from(block.number);
// tracks the index of a log in the entire block
let mut log_index: u32 = 0;
for (transaction_idx, (transaction_hash, receipt)) in tx_and_receipts.into_iter().enumerate() {
let logs = receipt.logs;
for (transaction_log_idx, log) in logs.into_iter().enumerate() {
if log_matches_filter(block_hash, block_number, &log, filter) {
if log_matches_filter(block, &log, filter) {
let log = Log {
address: log.address,
topics: log.topics,
data: log.data,
block_hash: Some(block_hash),
block_hash: Some(block.hash),
block_number: Some(block_number_u256),
transaction_hash: Some(transaction_hash),
transaction_index: Some(U256::from(transaction_idx)),
log_index: Some(U256::from(log_index)),
transaction_log_index: Some(U256::from(transaction_log_idx)),
removed: false,
removed,
};
all_logs.push(log);
}
@ -55,14 +56,13 @@ pub(crate) fn append_matching_block_logs<I>(
/// Returns true if the log matches the filter and should be included
pub(crate) fn log_matches_filter(
block_hash: H256,
block_number: u64,
block: BlockNumHash,
log: &reth_primitives::Log,
params: &FilteredParams,
) -> bool {
if params.filter.is_some() &&
(!params.filter_block_range(block_number) ||
!params.filter_block_hash(block_hash) ||
(!params.filter_block_range(block.number) ||
!params.filter_block_hash(block.hash) ||
!params.filter_address(log) ||
!params.filter_topics(log))
{

View File

@ -1,12 +1,10 @@
//! `eth_` PubSub RPC handler implementation
use crate::eth::{cache::EthStateCache, logs_utils};
use futures::StreamExt;
use jsonrpsee::{types::SubscriptionResult, SubscriptionSink};
use reth_interfaces::events::{ChainEventSubscriptions, NewBlockNotification};
use reth_network_api::NetworkInfo;
use reth_primitives::{filter::FilteredParams, Receipt, TransactionSigned, TxHash};
use reth_provider::{BlockProvider, EvmEnvProvider};
use reth_primitives::{filter::FilteredParams, TxHash};
use reth_provider::{BlockProvider, CanonStateSubscriptions, EvmEnvProvider};
use reth_rpc_api::EthPubSubApiServer;
use reth_rpc_types::{
pubsub::{
@ -74,7 +72,7 @@ impl<Client, Pool, Events, Network> EthPubSubApiServer for EthPubSub<Client, Poo
where
Client: BlockProvider + EvmEnvProvider + Clone + 'static,
Pool: TransactionPool + 'static,
Events: ChainEventSubscriptions + Clone + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
Network: NetworkInfo + Clone + 'static,
{
/// Handler for `eth_subscribe`
@ -104,7 +102,7 @@ async fn handle_accepted<Client, Pool, Events, Network>(
) where
Client: BlockProvider + EvmEnvProvider + Clone + 'static,
Pool: TransactionPool + 'static,
Events: ChainEventSubscriptions + Clone + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
Network: NetworkInfo + Clone + 'static,
{
match kind {
@ -132,7 +130,7 @@ async fn handle_accepted<Client, Pool, Events, Network>(
}
SubscriptionKind::Syncing => {
// get new block subscription
let mut new_blocks = BroadcastStream::new(pubsub.chain_events.subscribe_new_blocks());
let mut canon_state = BroadcastStream::new(pubsub.chain_events.subscribe_canon_state());
// get current sync status
let mut initial_sync_status = pubsub.network.is_syncing();
let current_sub_res = pubsub.sync_status(initial_sync_status).await;
@ -140,7 +138,7 @@ async fn handle_accepted<Client, Pool, Events, Network>(
// send the current status immediately
let _ = accepted_sink.send(&current_sub_res);
while (new_blocks.next().await).is_some() {
while (canon_state.next().await).is_some() {
let current_syncing = pubsub.network.is_syncing();
// Only send a new response if the sync status has changed
if current_syncing != initial_sync_status {
@ -213,50 +211,45 @@ where
impl<Client, Pool, Events, Network> EthPubSubInner<Client, Pool, Events, Network>
where
Client: BlockProvider + EvmEnvProvider + 'static,
Events: ChainEventSubscriptions + 'static,
Events: CanonStateSubscriptions + 'static,
Network: NetworkInfo + 'static,
Pool: 'static,
{
/// Returns a stream that yields all new RPC blocks.
fn into_new_headers_stream(self) -> impl Stream<Item = Header> {
BroadcastStream::new(self.chain_events.subscribe_new_blocks()).map(|new_block| {
let new_block = new_block.expect("new block subscription never ends; qed");
Header::from_primitive_with_hash(new_block.as_ref().clone())
})
BroadcastStream::new(self.chain_events.subscribe_canon_state())
.map(|new_block| {
let new_chain = new_block.expect("new block subscription never ends; qed");
new_chain
.commited()
.map(|c| {
c.blocks()
.iter()
.map(|(_, block)| {
Header::from_primitive_with_hash(block.header.clone())
})
.collect::<Vec<_>>()
})
.unwrap_or_default()
})
.flat_map(futures::stream::iter)
}
/// Returns a stream that yields all logs that match the given filter.
fn into_log_stream(self, filter: FilteredParams) -> impl Stream<Item = Log> {
BroadcastStream::new(self.chain_events.subscribe_new_blocks())
.filter_map(move |new_block| {
Box::pin(get_block_receipts(self.eth_cache.clone(), new_block.ok()))
BroadcastStream::new(self.chain_events.subscribe_canon_state())
.map(move |canon_state| {
canon_state.expect("new block subscription never ends; qed").block_receipts()
})
.flat_map(move |(new_block, transactions, receipts)| {
let block_hash = new_block.hash;
let block_number = new_block.header.number;
.flat_map(futures::stream::iter)
.flat_map(move |(block_receipts, removed)| {
let all_logs = logs_utils::matching_block_logs(
&filter,
block_hash,
block_number,
transactions.into_iter().map(|tx| tx.hash).zip(receipts),
block_receipts.block,
block_receipts.tx_receipts.into_iter(),
removed,
);
futures::stream::iter(all_logs)
})
}
}
/// Helper function for getting block receipts and transactions
async fn get_block_receipts(
eth_cache: EthStateCache,
new_block: Option<NewBlockNotification>,
) -> Option<(NewBlockNotification, Vec<TransactionSigned>, Vec<Receipt>)> {
let Some(new_block) = new_block else { return None; };
let (txs, receipts) = futures::join!(
eth_cache.get_block_transactions(new_block.hash),
eth_cache.get_receipts(new_block.hash)
);
match (txs.ok().flatten(), receipts.ok().flatten()) {
(Some(txs), Some(receipts)) => Some((new_block, txs, receipts)),
_ => None,
}
}
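The pubsub handlers now wrap the canon-state receiver in a `BroadcastStream` and `flat_map` each notification, which may carry several blocks, into a flat stream of items. A minimal sketch of that stream shape, assuming the futures, tokio and tokio-stream (sync feature) crates and a simplified notification type in place of `CanonStateNotification`:

```rust
use futures::StreamExt;
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;

// Simplified notification carrying committed block numbers; the real type is
// reth_provider::CanonStateNotification.
#[derive(Clone, Debug)]
struct Notification {
    committed_blocks: Vec<u64>,
}

#[tokio::main]
async fn main() {
    let (tx, rx) = broadcast::channel(8);

    // Mirror into_new_headers_stream: each notification may contain several
    // blocks, so flatten them into a stream of individual items.
    let mut blocks = BroadcastStream::new(rx)
        .map(|notif| notif.expect("subscription never lags in this sketch").committed_blocks)
        .flat_map(futures::stream::iter);

    tx.send(Notification { committed_blocks: vec![1, 2] }).unwrap();
    drop(tx); // close the channel so the stream terminates

    while let Some(number) = blocks.next().await {
        println!("new canonical block: {number}");
    }
}
```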

View File

@ -14,10 +14,11 @@ reth-interfaces = { path = "../../interfaces" }
reth-revm-primitives = { path = "../../revm/revm-primitives" }
reth-db = { path = "../db" }
reth-codecs = { path = "../codecs" }
reth-tracing = {path = "../../tracing"}
reth-rlp = {path = "../../rlp"}
reth-tracing = { path = "../../tracing" }
reth-rlp = { path = "../../rlp" }
revm-primitives = "1.1.0"
# async
tokio = { version = "1.21", features = ["sync", "macros", "rt-multi-thread"] }
# trie
cita_trie = "4.0.0"
@ -30,7 +31,6 @@ tracing = "0.1"
thiserror = "1.0.37"
auto_impl = "1.0"
itertools = "0.10"
parking_lot = "0.12"
[dev-dependencies]

View File

@ -0,0 +1,375 @@
//! Contains [Chain], a chain of blocks and their final state.
use crate::PostState;
use reth_interfaces::{executor::Error as ExecError, Error};
use reth_primitives::{
BlockHash, BlockNumHash, BlockNumber, ForkBlock, Receipt, SealedBlockWithSenders, TransitionId,
TxHash,
};
use std::collections::BTreeMap;
/// A chain of blocks and their final state.
///
/// The chain contains the state of accounts after execution of its blocks,
/// changesets for those blocks (and their transactions), as well as the blocks themselves.
///
/// Used inside the BlockchainTree.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Chain {
/// The state of accounts after execution of the blocks in this chain.
///
/// This state also contains the individual changes that lead to the current state.
pub state: PostState,
/// The blocks in this chain.
pub blocks: BTreeMap<BlockNumber, SealedBlockWithSenders>,
/// A mapping of each block number in the chain to the highest transition ID in the chain's
/// state after execution of the block.
///
/// This is used to revert changes in the state until a certain block number when the chain is
/// split.
pub block_transitions: BTreeMap<BlockNumber, TransitionId>,
}
impl Chain {
/// Get the blocks in this chain.
pub fn blocks(&self) -> &BTreeMap<BlockNumber, SealedBlockWithSenders> {
&self.blocks
}
/// Get post state of this chain
pub fn state(&self) -> &PostState {
&self.state
}
/// Return the block number of the given block hash, if the block is part of this chain.
pub fn block_number(&self, block_hash: BlockHash) -> Option<BlockNumber> {
self.blocks.iter().find_map(|(num, block)| (block.hash() == block_hash).then_some(*num))
}
/// Return the mapping of block numbers to the transition IDs of this chain's state.
pub fn block_transitions(&self) -> &BTreeMap<BlockNumber, TransitionId> {
&self.block_transitions
}
/// Return the post-state at the given `block_number`, or `None` if the block is not part of this chain.
pub fn state_at_block(&self, block_number: BlockNumber) -> Option<PostState> {
let mut state = self.state.clone();
if self.tip().number == block_number {
return Some(state)
}
if let Some(&transition_id) = self.block_transitions.get(&block_number) {
state.revert_to(transition_id);
return Some(state)
}
None
}
/// Destructure the chain into its inner components, the blocks and the state.
pub fn into_inner(self) -> (BTreeMap<BlockNumber, SealedBlockWithSenders>, PostState) {
(self.blocks, self.state)
}
/// Get the block at which this chain forked.
pub fn fork_block(&self) -> ForkBlock {
let tip = self.first();
ForkBlock { number: tip.number.saturating_sub(1), hash: tip.parent_hash }
}
/// Get the block number at which this chain forked.
pub fn fork_block_number(&self) -> BlockNumber {
self.first().number.saturating_sub(1)
}
/// Get the block hash at which this chain forked.
pub fn fork_block_hash(&self) -> BlockHash {
self.first().parent_hash
}
/// Get the first block in this chain.
pub fn first(&self) -> &SealedBlockWithSenders {
self.blocks.first_key_value().expect("Chain has at least one block for first").1
}
/// Get the tip of the chain.
///
/// # Note
///
/// Chains always have at least one block.
pub fn tip(&self) -> &SealedBlockWithSenders {
self.blocks.last_key_value().expect("Chain should have at least one block").1
}
/// Create new chain with given blocks and post state.
pub fn new(blocks: Vec<(SealedBlockWithSenders, PostState)>) -> Self {
let mut state = PostState::default();
let mut block_transitions = BTreeMap::new();
let mut block_num_hash = BTreeMap::new();
for (block, block_state) in blocks.into_iter() {
state.extend(block_state);
block_transitions.insert(block.number, state.transitions_count());
block_num_hash.insert(block.number, block);
}
Self { state, block_transitions, blocks: block_num_hash }
}
/// Merge two chains by appending the given chain into the current one.
///
/// The state of accounts for this chain is set to the state of the newest chain.
pub fn append_chain(&mut self, chain: Chain) -> Result<(), Error> {
let chain_tip = self.tip();
if chain_tip.hash != chain.fork_block_hash() {
return Err(ExecError::AppendChainDoesntConnect {
chain_tip: chain_tip.num_hash(),
other_chain_fork: chain.fork_block().into_components(),
}
.into())
}
// Insert blocks from other chain
self.blocks.extend(chain.blocks.into_iter());
let current_transition_count = self.state.transitions_count();
self.state.extend(chain.state);
// Update the block transition mapping, shifting the transition ID by the current number of
// transitions in *this* chain
for (block_number, transition_id) in chain.block_transitions.iter() {
self.block_transitions.insert(*block_number, transition_id + current_transition_count);
}
Ok(())
}
/// Get all receipts with attachment.
///
/// The attachment includes the block number, block hash and transaction hash.
pub fn receipts_with_attachment(&self) -> Vec<BlockReceipts> {
let mut receipt_attch = Vec::new();
let mut receipts = self.state().receipts().iter();
for (block_num, block) in self.blocks().iter() {
let block_num_hash = BlockNumHash::new(*block_num, block.hash());
let mut tx_receipts = Vec::new();
for tx in block.body.iter() {
if let Some(receipt) = receipts.next() {
tx_receipts.push((tx.hash(), receipt.clone()));
}
}
receipt_attch.push(BlockReceipts { block: block_num_hash, tx_receipts });
}
receipt_attch
}
/// Split this chain at the given block.
///
/// The block at the given number or hash is included in the first (canonical) chain.
///
/// If the split point is at or above the tip of the chain, [`ChainSplit::NoSplitCanonical`] is
/// returned; if it is otherwise not part of this chain, [`ChainSplit::NoSplitPending`] is
/// returned.
///
/// # Note
///
/// The block number to transition ID mapping is only found in the second chain, making it
/// impossible to perform any state reverts on the first chain.
///
/// The second chain only contains the changes that were reverted on the first chain; however,
/// it retains the up to date state as if the chains were one, i.e. the second chain is an
/// extension of the first.
pub fn split(mut self, split_at: SplitAt) -> ChainSplit {
let chain_tip = *self.blocks.last_entry().expect("chain is never empty").key();
let block_number = match split_at {
SplitAt::Hash(block_hash) => {
let Some(block_number) = self.block_number(block_hash) else { return ChainSplit::NoSplitPending(self)};
// If block number is same as tip whole chain is becoming canonical.
if block_number == chain_tip {
return ChainSplit::NoSplitCanonical(self)
}
block_number
}
SplitAt::Number(block_number) => {
if block_number >= chain_tip {
return ChainSplit::NoSplitCanonical(self)
}
if block_number < *self.blocks.first_entry().expect("chain is never empty").key() {
return ChainSplit::NoSplitPending(self)
}
block_number
}
};
let higher_number_blocks = self.blocks.split_off(&(block_number + 1));
let mut canonical_state = std::mem::take(&mut self.state);
let new_state = canonical_state.split_at(
*self.block_transitions.get(&block_number).expect("Unknown block transition ID"),
);
self.state = new_state;
ChainSplit::Split {
canonical: Chain {
state: canonical_state,
block_transitions: BTreeMap::new(),
blocks: self.blocks,
},
pending: Chain {
state: self.state,
block_transitions: self.block_transitions,
blocks: higher_number_blocks,
},
}
}
}
/// Used to hold receipts and their attachment.
#[derive(Default, Clone, Debug)]
pub struct BlockReceipts {
/// Block identifier
pub block: BlockNumHash,
/// Transaction identifier and receipt.
pub tx_receipts: Vec<(TxHash, Receipt)>,
}
/// Used when splitting a chain.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SplitAt {
/// Split at block number.
Number(BlockNumber),
/// Split at block hash.
Hash(BlockHash),
}
/// The result of splitting a chain.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ChainSplit {
/// The chain was not split; it is returned unchanged and stays pending.
/// Returned when the split block number is below the first block of the chain, or when the
/// given block hash is not part of the chain.
NoSplitPending(Chain),
/// The chain was not split; the whole chain is returned to be canonicalized.
/// Returned when the split point is at (or above) the tip of the chain.
NoSplitCanonical(Chain),
/// The chain was split in two.
/// The block at the split point is included in the first (canonical) chain.
Split {
/// Contains the lower-numbered blocks, which get canonicalized.
/// Its substate is empty and not usable.
canonical: Chain,
/// Contains the higher-numbered blocks, which remain pending.
/// The substate of the original chain is moved here.
pending: Chain,
},
}
#[cfg(test)]
mod tests {
use super::*;
use reth_primitives::{Account, H160, H256};
#[test]
fn chain_append() {
let block = SealedBlockWithSenders::default();
let block1_hash = H256([0x01; 32]);
let block2_hash = H256([0x02; 32]);
let block3_hash = H256([0x03; 32]);
let block4_hash = H256([0x04; 32]);
let mut block1 = block.clone();
let mut block2 = block.clone();
let mut block3 = block.clone();
let mut block4 = block;
block1.block.header.hash = block1_hash;
block2.block.header.hash = block2_hash;
block3.block.header.hash = block3_hash;
block4.block.header.hash = block4_hash;
block3.block.header.header.parent_hash = block2_hash;
let mut chain1 =
Chain { blocks: BTreeMap::from([(1, block1), (2, block2)]), ..Default::default() };
let chain2 =
Chain { blocks: BTreeMap::from([(3, block3), (4, block4)]), ..Default::default() };
assert_eq!(chain1.append_chain(chain2.clone()), Ok(()));
// chain1 got changed so this will fail
assert!(chain1.append_chain(chain2).is_err());
}
#[test]
fn test_number_split() {
let mut base_state = PostState::default();
let account = Account { nonce: 10, ..Default::default() };
base_state.create_account(H160([1; 20]), account);
base_state.finish_transition();
let mut block_state1 = PostState::default();
block_state1.create_account(H160([2; 20]), Account::default());
block_state1.finish_transition();
let mut block_state2 = PostState::default();
block_state2.create_account(H160([3; 20]), Account::default());
block_state2.finish_transition();
let mut block1 = SealedBlockWithSenders::default();
let block1_hash = H256([15; 32]);
block1.number = 1;
block1.hash = block1_hash;
block1.senders.push(H160([4; 20]));
let mut block2 = SealedBlockWithSenders::default();
let block2_hash = H256([16; 32]);
block2.number = 2;
block2.hash = block2_hash;
block2.senders.push(H160([4; 20]));
let chain = Chain::new(vec![
(block1.clone(), block_state1.clone()),
(block2.clone(), block_state2.clone()),
]);
let mut split1_state = chain.state.clone();
let split2_state = split1_state.split_at(*chain.block_transitions.get(&1).unwrap());
let chain_split1 = Chain {
state: split1_state,
block_transitions: BTreeMap::new(),
blocks: BTreeMap::from([(1, block1.clone())]),
};
let chain_split2 = Chain {
state: split2_state,
block_transitions: chain.block_transitions.clone(),
blocks: BTreeMap::from([(2, block2.clone())]),
};
// return tip state
assert_eq!(chain.state_at_block(block2.number), Some(chain.state.clone()));
assert_eq!(chain.state_at_block(block1.number), Some(chain_split1.state.clone()));
// state at unknown block
assert_eq!(chain.state_at_block(100), None);
// split in two
assert_eq!(
chain.clone().split(SplitAt::Hash(block1_hash)),
ChainSplit::Split { canonical: chain_split1, pending: chain_split2 }
);
// split at unknown block hash
assert_eq!(
chain.clone().split(SplitAt::Hash(H256([100; 32]))),
ChainSplit::NoSplitPending(chain.clone())
);
// split at higher number
assert_eq!(
chain.clone().split(SplitAt::Number(10)),
ChainSplit::NoSplitCanonical(chain.clone())
);
// split at lower number
assert_eq!(chain.clone().split(SplitAt::Number(0)), ChainSplit::NoSplitPending(chain));
}
}
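As a hedged usage sketch (not part of this change), the snippet below builds a small Chain the same way the tests above do and exercises the accessors defined in this file. `chain_usage_sketch` is a hypothetical function; the dummy blocks carry no transactions.
use reth_primitives::SealedBlockWithSenders;
// `Chain`, `PostState`, `SplitAt` and `ChainSplit` are the types defined in this file.
fn chain_usage_sketch() {
    // Two dummy blocks with distinct numbers, set up as in `test_number_split`.
    let mut block1 = SealedBlockWithSenders::default();
    block1.number = 1;
    let mut block2 = SealedBlockWithSenders::default();
    block2.number = 2;
    let mut state1 = PostState::default();
    state1.finish_transition();
    let mut state2 = PostState::default();
    state2.finish_transition();
    let chain = Chain::new(vec![(block1, state1), (block2, state2)]);
    // The fork block is the parent of the first block in the chain.
    assert_eq!(chain.fork_block_number(), 0);
    // The tip state is returned as-is; unknown blocks yield `None`.
    assert!(chain.state_at_block(2).is_some());
    assert!(chain.state_at_block(100).is_none());
    // Splitting at (or above) the tip keeps the whole chain canonical.
    assert!(matches!(chain.split(SplitAt::Number(2)), ChainSplit::NoSplitCanonical(_)));
}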

View File

@ -12,9 +12,10 @@
mod traits;
pub use traits::{
AccountProvider, BlockExecutor, BlockHashProvider, BlockIdProvider, BlockProvider,
BlockchainTreePendingStateProvider, EvmEnvProvider, ExecutorFactory, HeaderProvider,
PostStateDataProvider, ReceiptProvider, StateProvider, StateProviderBox, StateProviderFactory,
TransactionsProvider, WithdrawalsProvider,
BlockchainTreePendingStateProvider, CanonStateNotification, CanonStateNotificationSender,
CanonStateNotifications, CanonStateSubscriptions, EvmEnvProvider, ExecutorFactory,
HeaderProvider, PostStateDataProvider, ReceiptProvider, StateProvider, StateProviderBox,
StateProviderFactory, TransactionsProvider, WithdrawalsProvider,
};
/// Provider trait implementations.
@ -45,3 +46,6 @@ pub mod test_utils;
/// Re-export provider error.
pub use reth_interfaces::provider::ProviderError;
pub mod chain;
pub use chain::Chain;

View File

@ -263,8 +263,8 @@ impl PostState {
}
/// Get the number of transitions causing this [PostState]
pub fn transitions_count(&self) -> usize {
self.current_transition_id as usize
pub fn transitions_count(&self) -> TransitionId {
self.current_transition_id
}
/// Extend this [PostState] with the changes in another [PostState].
@ -291,10 +291,10 @@ impl PostState {
/// The reverted changes are removed from this post-state, and their effects are reverted.
///
/// The reverted changes are returned.
pub fn revert_to(&mut self, transition_id: usize) -> Vec<Change> {
pub fn revert_to(&mut self, transition_id: TransitionId) -> Vec<Change> {
let mut changes_to_revert = Vec::new();
self.changes.retain(|change| {
if change.transition_id() >= transition_id as u64 {
if change.transition_id() >= transition_id {
changes_to_revert.push(change.clone());
false
} else {
@ -322,7 +322,7 @@ impl PostState {
/// 1. This post-state has the changes reverted
/// 2. The returned post-state does *not* have the changes reverted, but only contains the
/// descriptions of the changes that were reverted in the first post-state.
pub fn split_at(&mut self, transition_id: usize) -> Self {
pub fn split_at(&mut self, transition_id: TransitionId) -> Self {
// Clone ourselves
let mut non_reverted_state = self.clone();
@ -913,7 +913,7 @@ mod tests {
assert_eq!(state.transitions_count(), 2);
assert_eq!(state.accounts().len(), 2);
let reverted_changes = state.revert_to(revert_to as usize);
let reverted_changes = state.revert_to(revert_to);
assert_eq!(state.accounts().len(), 1);
assert_eq!(state.transitions_count(), 1);
assert_eq!(reverted_changes.len(), 1);
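A brief sketch of the adjusted API (not part of this change): transition checkpoints are now plain TransitionId values, so callers pass them around without casting to and from usize. Only calls visible in this PR are used; `checkpoint_and_revert` is a hypothetical function and the import paths are assumed.
use reth_primitives::{Account, TransitionId, H160};
// `PostState` is the type defined in this file.
fn checkpoint_and_revert() {
    let mut state = PostState::default();
    state.create_account(H160([1; 20]), Account::default());
    state.finish_transition();
    state.create_account(H160([2; 20]), Account::default());
    state.finish_transition();
    // Two transitions were recorded; remember the first one as a checkpoint.
    let checkpoint: TransitionId = state.transitions_count() - 1;
    // Reverting drops every change at or after the checkpoint and returns those changes.
    let reverted = state.revert_to(checkpoint);
    assert_eq!(reverted.len(), 1);
    assert_eq!(state.transitions_count(), checkpoint);
}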

View File

@ -12,8 +12,8 @@ use reth_primitives::{
use reth_revm_primitives::{
config::revm_spec,
env::{fill_block_env, fill_cfg_and_block_env, fill_cfg_env},
primitives::{BlockEnv, CfgEnv, SpecId},
};
use revm_primitives::{BlockEnv, CfgEnv, SpecId};
use std::{ops::RangeBounds, sync::Arc};
mod state;

View File

@ -0,0 +1,35 @@
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast::{self, Sender};
use crate::{CanonStateNotification, CanonStateNotifications, CanonStateSubscriptions, Chain};
/// A test helper that implements [`CanonStateSubscriptions`].
#[derive(Clone, Default)]
pub struct TestCanonStateSubscriptions {
canon_notif_tx: Arc<Mutex<Vec<Sender<CanonStateNotification>>>>,
}
impl TestCanonStateSubscriptions {
/// Broadcasts a new commit notification to every subscriber registered via
/// [`TestCanonStateSubscriptions::subscribe_canon_state`]
pub fn add_next_commit(&mut self, new: Arc<Chain>) {
let event = CanonStateNotification::Commit { new };
self.canon_notif_tx.lock().unwrap().retain(|tx| tx.send(event.clone()).is_ok())
}
/// Broadcasts a reorg notification to every subscriber registered via
/// [`TestCanonStateSubscriptions::subscribe_canon_state`]
pub fn add_next_reorg(&mut self, old: Arc<Chain>, new: Arc<Chain>) {
let event = CanonStateNotification::Reorg { old, new };
self.canon_notif_tx.lock().unwrap().retain(|tx| tx.send(event.clone()).is_ok())
}
}
impl CanonStateSubscriptions for TestCanonStateSubscriptions {
fn subscribe_canon_state(&self) -> CanonStateNotifications {
let (canon_notif_tx, canon_notif_rx) = broadcast::channel(100);
self.canon_notif_tx.lock().as_mut().unwrap().push(canon_notif_tx);
canon_notif_rx
}
}
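A hedged example of how this helper might be driven in a test (not part of this change; it assumes a tokio test runtime is available to this crate). The subscriber must be registered before the notification is pushed, because the broadcast only reaches already-registered receivers.
#[tokio::test]
async fn subscriber_sees_commit() {
    let mut events = TestCanonStateSubscriptions::default();
    // Subscribe first, then push the notification.
    let mut canon_state = events.subscribe_canon_state();
    events.add_next_commit(Arc::new(Chain::default()));
    match canon_state.recv().await.unwrap() {
        CanonStateNotification::Commit { new } => assert!(new.blocks().is_empty()),
        other => panic!("unexpected notification: {:?}", other),
    }
}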

View File

@ -10,7 +10,7 @@ use reth_primitives::{
Bytecode, Bytes, ChainInfo, Header, Receipt, StorageKey, StorageValue, TransactionMeta,
TransactionSigned, TxHash, TxNumber, H256, U256,
};
use revm_primitives::{BlockEnv, CfgEnv};
use reth_revm_primitives::primitives::{BlockEnv, CfgEnv};
use std::{
collections::{BTreeMap, HashMap},
ops::RangeBounds,

View File

@ -1,6 +1,8 @@
pub mod blocks;
mod events;
mod mock;
mod noop;
pub use events::TestCanonStateSubscriptions;
pub use mock::{ExtendedAccount, MockEthProvider};
pub use noop::NoopProvider;

View File

@ -9,7 +9,7 @@ use reth_primitives::{
Receipt, StorageKey, StorageValue, TransactionMeta, TransactionSigned, TxHash, TxNumber, H256,
KECCAK_EMPTY, U256,
};
use revm_primitives::{BlockEnv, CfgEnv};
use reth_revm_primitives::primitives::{BlockEnv, CfgEnv};
use std::ops::RangeBounds;
/// Supports various api interfaces for testing purposes.

View File

@ -0,0 +1,86 @@
//! Canonical chain state notification trait and types.
use crate::{chain::BlockReceipts, Chain};
use auto_impl::auto_impl;
use std::sync::Arc;
use tokio::sync::broadcast::{Receiver, Sender};
/// Type alias for a receiver that receives [CanonStateNotification]
pub type CanonStateNotifications = Receiver<CanonStateNotification>;
/// Type alias for a sender that sends [CanonStateNotification]
pub type CanonStateNotificationSender = Sender<CanonStateNotification>;
/// A type that allows registering subscriptions to canonical chain state notifications.
#[auto_impl(&, Arc)]
pub trait CanonStateSubscriptions: Send + Sync {
/// Get notified when the canonical chain state changes, i.e. when blocks are committed or the chain is reorged.
fn subscribe_canon_state(&self) -> CanonStateNotifications;
}
/// A notification that is emitted when blocks are committed to, or reverted from, the canonical
/// chain. It carries the [`crate::PostState`] and [`reth_primitives::SealedBlockWithSenders`] of
/// both the reverted and the committed blocks.
#[derive(Clone, Debug)]
#[allow(missing_docs)]
pub enum CanonStateNotification {
/// The chain was reorged: both the old and the new chain are returned.
Reorg { old: Arc<Chain>, new: Arc<Chain> },
/// The chain was reverted without a reorg: only the old chain is returned.
Revert { old: Arc<Chain> },
/// The chain was extended without a reorg: only the new chain is returned.
Commit { new: Arc<Chain> },
}
// The compiler can't derive `PartialEq` for `CanonStateNotification` here, so it is implemented
// manually.
impl PartialEq for CanonStateNotification {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::Reorg { old: old1, new: new1 }, Self::Reorg { old: old2, new: new2 }) => {
old1 == old2 && new1 == new2
}
(Self::Revert { old: old1 }, Self::Revert { old: old2 }) => old1 == old2,
(Self::Commit { new: new1 }, Self::Commit { new: new2 }) => new1 == new2,
_ => false,
}
}
}
impl CanonStateNotification {
/// Get the reverted (old) chain, if any.
pub fn reverted(&self) -> Option<Arc<Chain>> {
match self {
Self::Reorg { old, .. } => Some(old.clone()),
Self::Revert { old } => Some(old.clone()),
Self::Commit { .. } => None,
}
}
/// Get the committed (new) chain, if any.
pub fn commited(&self) -> Option<Arc<Chain>> {
match self {
Self::Reorg { new, .. } => Some(new.clone()),
Self::Revert { .. } => None,
Self::Commit { new } => Some(new.clone()),
}
}
/// Return all receipts together with their block identifier and transaction hash.
///
/// The boolean flag is `true` if the receipt came from a reverted block.
pub fn block_receipts(&self) -> Vec<(BlockReceipts, bool)> {
let mut receipts = Vec::new();
// get old receipts
if let Some(old) = self.reverted() {
receipts
.extend(old.receipts_with_attachment().into_iter().map(|receipt| (receipt, true)));
}
// get new receipts
if let Some(new) = self.commited() {
receipts
.extend(new.receipts_with_attachment().into_iter().map(|receipt| (receipt, false)));
}
receipts
}
}
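To illustrate the intended consumption pattern (not part of this change): a subscriber typically handles the reverted chain before the committed one, so a reorg is processed as a revert followed by a commit. `undo_chain` and `apply_chain` below are hypothetical callbacks, not part of this PR.
/// Sketch of a consumer: handle the reverted side first, then the committed side.
fn handle_notification(notification: CanonStateNotification) {
    if let Some(old) = notification.reverted() {
        // Hypothetical: unwind indexes, re-inject transactions, etc.
        undo_chain(&old);
    }
    if let Some(new) = notification.commited() {
        // Hypothetical: index the new blocks, e.g. via `Chain::receipts_with_attachment`.
        apply_chain(&new);
    }
}
fn undo_chain(_chain: &Chain) {}
fn apply_chain(_chain: &Chain) {}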

View File

@ -1,9 +1,9 @@
use reth_interfaces::Result;
use reth_primitives::{BlockId, Header};
use revm_primitives::{BlockEnv, CfgEnv};
use reth_revm_primitives::primitives::{BlockEnv, CfgEnv};
/// A provider type that knows chain specific information required to configure an
/// [Env](revm_primitives::Env)
/// [Env](reth_revm_primitives::primitives::Env)
///
/// This type is mainly used to provide required data to configure the EVM environment.
#[auto_impl::auto_impl(&, Arc)]

View File

@ -35,3 +35,9 @@ pub use withdrawals::WithdrawalsProvider;
mod executor;
pub use executor::{BlockExecutor, ExecutorFactory};
mod chain;
pub use chain::{
CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications,
CanonStateSubscriptions,
};

View File

@ -543,7 +543,7 @@ where
self.insert_hashes(
fork_block_number,
first_transition_id,
first_transition_id + num_transitions as u64,
first_transition_id + num_transitions,
new_tip_number,
new_tip_hash,
expected_state_root,