feat(execution): state change size based commits (#2494)

This commit is contained in:
Roman Krasiuk
2023-05-05 19:57:16 +03:00
committed by GitHub
parent a00ba2ad7b
commit 09f1a5b640
16 changed files with 479 additions and 180 deletions

1
Cargo.lock generated
View File

@ -5160,6 +5160,7 @@ name = "reth-provider"
version = "0.1.0"
dependencies = [
"auto_impl",
"derive_more",
"itertools",
"parking_lot 0.12.1",
"pin-project",

View File

@ -23,7 +23,10 @@ use reth_staged_sync::{
};
use reth_stages::{
prelude::*,
stages::{ExecutionStage, HeaderSyncMode, SenderRecoveryStage, TotalDifficultyStage},
stages::{
ExecutionStage, ExecutionStageThresholds, HeaderSyncMode, SenderRecoveryStage,
TotalDifficultyStage,
},
};
use std::{path::PathBuf, sync::Arc};
use tokio::sync::watch;
@ -171,7 +174,14 @@ impl ImportCommand {
.set(SenderRecoveryStage {
commit_threshold: config.stages.sender_recovery.commit_threshold,
})
.set(ExecutionStage::new(factory, config.stages.execution.commit_threshold)),
.set(ExecutionStage::new(
factory,
ExecutionStageThresholds {
max_blocks: config.stages.execution.max_blocks,
max_changes: config.stages.execution.max_changes,
max_changesets: config.stages.execution.max_changesets,
},
)),
)
.build(db);

View File

@ -4,7 +4,10 @@ use reth_db::{database::Database, table::TableImporter, tables};
use reth_primitives::{BlockNumber, MAINNET};
use reth_provider::Transaction;
use reth_stages::{
stages::{AccountHashingStage, ExecutionStage, MerkleStage, StorageHashingStage},
stages::{
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, MerkleStage,
StorageHashingStage,
},
Stage, StageId, UnwindInput,
};
use std::{ops::DerefMut, path::PathBuf, sync::Arc};
@ -58,8 +61,14 @@ async fn unwind_and_copy<DB: Database>(
MerkleStage::default_unwind().unwind(&mut unwind_tx, unwind).await?;
// Bring Plainstate to TO (hashing stage execution requires it)
let mut exec_stage =
ExecutionStage::new(reth_revm::Factory::new(Arc::new(MAINNET.clone())), u64::MAX);
let mut exec_stage = ExecutionStage::new(
reth_revm::Factory::new(Arc::new(MAINNET.clone())),
ExecutionStageThresholds {
max_blocks: Some(u64::MAX),
max_changes: None,
max_changesets: None,
},
);
exec_stage
.unwind(

View File

@ -7,8 +7,9 @@ use reth_provider::Transaction;
use reth_staged_sync::utils::{chainspec::genesis_value_parser, init::init_db};
use reth_stages::{
stages::{
AccountHashingStage, ExecutionStage, MerkleStage, StorageHashingStage, ACCOUNT_HASHING,
EXECUTION, MERKLE_EXECUTION, SENDER_RECOVERY, STORAGE_HASHING,
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, MerkleStage,
StorageHashingStage, ACCOUNT_HASHING, EXECUTION, MERKLE_EXECUTION, SENDER_RECOVERY,
STORAGE_HASHING,
},
ExecInput, Stage,
};
@ -82,7 +83,14 @@ impl Command {
MERKLE_EXECUTION.get_progress(tx.deref())?.unwrap_or_default());
let factory = reth_revm::Factory::new(self.chain.clone());
let mut execution_stage = ExecutionStage::new(factory, 1);
let mut execution_stage = ExecutionStage::new(
factory,
ExecutionStageThresholds {
max_blocks: Some(1),
max_changes: None,
max_changesets: None,
},
);
let mut account_hashing_stage = AccountHashingStage::default();
let mut storage_hashing_stage = StorageHashingStage::default();

View File

@ -54,7 +54,10 @@ use reth_staged_sync::{
};
use reth_stages::{
prelude::*,
stages::{ExecutionStage, HeaderSyncMode, SenderRecoveryStage, TotalDifficultyStage, FINISH},
stages::{
ExecutionStage, ExecutionStageThresholds, HeaderSyncMode, SenderRecoveryStage,
TotalDifficultyStage, FINISH,
},
};
use reth_tasks::TaskExecutor;
use reth_transaction_pool::{EthTransactionValidator, TransactionPool};
@ -689,7 +692,14 @@ impl Command {
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
})
.set(ExecutionStage::new(factory, stage_conf.execution.commit_threshold))
.set(ExecutionStage::new(
factory,
ExecutionStageThresholds {
max_blocks: stage_conf.execution.max_blocks,
max_changes: stage_conf.execution.max_changes,
max_changesets: stage_conf.execution.max_changesets,
},
))
.disable_if(MERKLE_UNWIND, || self.auto_mine)
.disable_if(MERKLE_EXECUTION, || self.auto_mine),
)

View File

@ -16,7 +16,10 @@ use reth_staged_sync::{
Config,
};
use reth_stages::{
stages::{BodyStage, ExecutionStage, MerkleStage, SenderRecoveryStage, TransactionLookupStage},
stages::{
BodyStage, ExecutionStage, ExecutionStageThresholds, MerkleStage, SenderRecoveryStage,
TransactionLookupStage,
},
ExecInput, Stage, StageId, UnwindInput,
};
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
@ -181,7 +184,14 @@ impl Command {
}
StageEnum::Execution => {
let factory = reth_revm::Factory::new(self.chain.clone());
let mut stage = ExecutionStage::new(factory, num_blocks);
let mut stage = ExecutionStage::new(
factory,
ExecutionStageThresholds {
max_blocks: Some(num_blocks),
max_changes: None,
max_changesets: None,
},
);
if !self.skip_unwind {
stage.unwind(&mut tx, unwind).await?;
}

View File

@ -193,7 +193,7 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<TestOutcome> {
// Initialize the execution stage
// Hardcode the chain_id to Ethereum 1.
let factory = reth_revm::Factory::new(Arc::new(chain_spec));
let mut stage = ExecutionStage::new(factory, 1_000);
let mut stage = ExecutionStage::new_with_factory(factory);
// Call execution stage
let input = ExecInput {

View File

@ -50,6 +50,9 @@ pub const FINNEY_TO_WEI: u128 = (GWEI_TO_WEI as u128) * 1_000_000;
/// Multiplier for converting ether to wei.
pub const ETH_TO_WEI: u128 = FINNEY_TO_WEI * 1000;
/// Multiplier for converting mgas to gas.
pub const MGAS_TO_GAS: u64 = 1_000_000u64;
/// The Ethereum mainnet genesis hash.
pub const MAINNET_GENESIS: H256 =
H256(hex!("d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3"));

View File

@ -800,8 +800,8 @@ mod tests {
// Clone and sort to make the test deterministic
assert_eq!(
post_state.account_changes(),
&BTreeMap::from([(
post_state.account_changes().inner,
BTreeMap::from([(
block.number,
BTreeMap::from([
// New account
@ -815,8 +815,8 @@ mod tests {
"Account changeset did not match"
);
assert_eq!(
post_state.storage_changes(),
&BTreeMap::from([(
post_state.storage_changes().inner,
BTreeMap::from([(
block.number,
BTreeMap::from([(
account1,

View File

@ -154,13 +154,25 @@ impl Default for SenderRecoveryConfig {
/// Execution stage configuration.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
pub struct ExecutionConfig {
/// The maximum number of blocks to execution before committing progress to the database.
pub commit_threshold: u64,
/// The maximum number of blocks to process before the execution stage commits.
pub max_blocks: Option<u64>,
/// The maximum amount of state changes to keep in memory before the execution stage commits.
pub max_changes: Option<u64>,
/// The maximum amount of changesets to keep in memory before they are written to the pending
/// database transaction.
///
/// If this is lower than `max_changes`, then history is periodically flushed to the database
/// transaction, which frees up memory.
pub max_changesets: Option<u64>,
}
impl Default for ExecutionConfig {
fn default() -> Self {
Self { commit_threshold: 5_000 }
Self {
max_blocks: Some(500_000),
max_changes: Some(5_000_000),
max_changesets: Some(1_000_000),
}
}
}

View File

@ -247,7 +247,7 @@ impl<EF: ExecutorFactory, DB: Database> StageSet<DB> for ExecutionStages<EF> {
fn builder(self) -> StageSetBuilder<DB> {
StageSetBuilder::default()
.add_stage(SenderRecoveryStage::default())
.add_stage(ExecutionStage::new(self.executor_factory, 10_000))
.add_stage(ExecutionStage::new_with_factory(self.executor_factory))
}
}

View File

@ -8,7 +8,9 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
};
use reth_metrics_derive::Metrics;
use reth_primitives::{Block, BlockNumber, BlockWithSenders, U256};
use reth_primitives::{
constants::MGAS_TO_GAS, Block, BlockNumber, BlockWithSenders, TransactionSigned, U256,
};
use reth_provider::{
post_state::PostState, BlockExecutor, ExecutorFactory, LatestStateProviderRef, Transaction,
};
@ -60,25 +62,21 @@ pub struct ExecutionStage<EF: ExecutorFactory> {
metrics: ExecutionStageMetrics,
/// The stage's internal executor
executor_factory: EF,
/// Commit threshold
commit_threshold: u64,
/// The commit thresholds of the execution stage.
thresholds: ExecutionStageThresholds,
}
impl<EF: ExecutorFactory> ExecutionStage<EF> {
/// Create new execution stage with specified config.
pub fn new(executor_factory: EF, commit_threshold: u64) -> Self {
Self { metrics: ExecutionStageMetrics::default(), executor_factory, commit_threshold }
pub fn new(executor_factory: EF, thresholds: ExecutionStageThresholds) -> Self {
Self { metrics: ExecutionStageMetrics::default(), executor_factory, thresholds }
}
/// Create an execution stage with the provided executor factory.
///
/// The commit thresholds will be set to [`ExecutionStageThresholds::default`].
pub fn new_with_factory(executor_factory: EF) -> Self {
Self {
metrics: ExecutionStageMetrics::default(),
executor_factory,
commit_threshold: 10_000,
}
Self::new(executor_factory, ExecutionStageThresholds::default())
}
// TODO: This should be in the block provider trait once we consolidate
@ -92,14 +90,42 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
let ommers = tx.get::<tables::BlockOmmers>(block_number)?.unwrap_or_default().ommers;
let withdrawals = tx.get::<tables::BlockWithdrawals>(block_number)?.map(|v| v.withdrawals);
let (transactions, senders): (Vec<_>, Vec<_>) = tx
.get_block_transaction_range(block_number..=block_number)?
.into_iter()
.flat_map(|(_, txs)| txs.into_iter())
.map(|tx| tx.to_components())
.unzip();
// Get the block body
let body = tx.get::<tables::BlockBodyIndices>(block_number)?.unwrap();
let tx_range = body.tx_num_range();
Ok((Block { header, body: transactions, ommers, withdrawals }.with_senders(senders), td))
// Get the transactions in the body
let (transactions, senders) = if tx_range.is_empty() {
(Vec::new(), Vec::new())
} else {
let transactions = tx
.cursor_read::<tables::Transactions>()?
.walk_range(tx_range.clone())?
.map(|entry| entry.map(|tx| tx.1))
.collect::<Result<Vec<_>, _>>()?;
let senders = tx
.cursor_read::<tables::TxSenders>()?
.walk_range(tx_range)?
.map(|entry| entry.map(|sender| sender.1))
.collect::<Result<Vec<_>, _>>()?;
(transactions, senders)
};
let body = transactions
.into_iter()
.map(|tx| {
TransactionSigned {
// TODO: This is the fastest way right now to make everything just work with
// a dummy transaction hash.
hash: Default::default(),
signature: tx.signature,
transaction: tx.transaction,
}
})
.collect();
Ok((Block { header, body, ommers, withdrawals }.with_senders(senders), td))
}
/// Execute the stage.
@ -108,37 +134,59 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
let start_block = input.stage_progress() + 1;
let max_block = input.previous_stage_progress();
// Create state provider with cached state
// Build executor
let mut executor = self.executor_factory.with_sp(LatestStateProviderRef::new(&**tx));
// Fetch transactions, execute them and generate results
// Progress tracking
let mut progress = start_block;
// Execute block range
let mut state = PostState::default();
for block_number in range.clone() {
for block_number in start_block..=max_block {
let (block, td) = Self::read_block_with_senders(tx, block_number)?;
// Configure the executor to use the current state.
trace!(target: "sync::stages::execution", number = block_number, txs = block.body.len(), "Executing block");
// Execute the block
let (block, senders) = block.into_components();
let block_state = executor
.execute_and_verify_receipt(&block, td, Some(senders))
.map_err(|error| StageError::ExecutionError { block: block_number, error })?;
if let Some(last_receipt) = block_state.receipts().last() {
// Gas metrics
self.metrics
.mgas_processed_total
.increment(last_receipt.cumulative_gas_used as f64 / 1_000_000.);
}
state.extend(block_state);
.increment(block.header.gas_used as f64 / MGAS_TO_GAS as f64);
// Write history periodically to free up memory
if self.thresholds.should_write_history(state.changeset_size() as u64) {
info!(target: "sync::stages::execution", ?block_number, "Writing history.");
state.write_history_to_db(&**tx)?;
info!(target: "sync::stages::execution", ?block_number, "Wrote history.");
// gas_since_history_write = 0;
}
// Check if we should commit now
if self.thresholds.is_end_of_batch(block_number - start_block, state.size() as u64) {
info!(target: "sync::stages::execution", ?block_number, "Threshold hit, committing.");
break
}
// Merge state changes
state.extend(block_state);
progress = block_number;
}
// Write remaining changes
let start = Instant::now();
trace!(target: "sync::stages::execution", accounts = state.accounts().len(), "Writing updated state to database");
state.write_to_db(&**tx)?;
trace!(target: "sync::stages::execution", took = ?Instant::now().duration_since(start), "Wrote state");
info!(target: "sync::stages::execution", stage_progress = *range.end(), is_final_range, "Sync iteration finished");
Ok(ExecOutput { stage_progress: *range.end(), done: is_final_range })
Ok(ExecOutput { stage_progress: progress, done: progress == max_block })
}
}
@ -258,6 +306,52 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
}
}
/// The thresholds at which the execution stage writes state changes to the database.
///
/// If either of the batch thresholds (`max_blocks` or `max_changes`) is hit, the execution stage
/// commits all pending changes to the database.
///
/// A third threshold, `max_changesets`, can be set to periodically write changesets to the
/// current database transaction, which frees up memory.
///
/// A threshold that is set to `None` is disabled and is never considered hit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ExecutionStageThresholds {
    /// The maximum number of blocks to process before the execution stage commits.
    pub max_blocks: Option<u64>,
    /// The maximum amount of state changes to keep in memory before the execution stage commits.
    pub max_changes: Option<u64>,
    /// The maximum amount of changesets to keep in memory before they are written to the pending
    /// database transaction.
    ///
    /// If this is lower than `max_changes`, then history is periodically flushed to the database
    /// transaction, which frees up memory.
    pub max_changesets: Option<u64>,
}

impl Default for ExecutionStageThresholds {
    /// Returns thresholds of 500_000 blocks, 5_000_000 state changes and 1_000_000 changesets.
    fn default() -> Self {
        Self {
            max_blocks: Some(500_000),
            max_changes: Some(5_000_000),
            max_changesets: Some(1_000_000),
        }
    }
}

impl ExecutionStageThresholds {
    /// Check if the batch thresholds have been hit.
    ///
    /// Returns `true` if `blocks_processed` reached `max_blocks` or `changes_processed` reached
    /// `max_changes`. Disabled (`None`) thresholds never contribute.
    #[inline]
    pub fn is_end_of_batch(&self, blocks_processed: u64, changes_processed: u64) -> bool {
        Self::is_hit(self.max_blocks, blocks_processed) ||
            Self::is_hit(self.max_changes, changes_processed)
    }

    /// Check if the history write threshold has been hit.
    ///
    /// Returns `true` if `history_changes` reached `max_changesets`. Always `false` when
    /// `max_changesets` is `None`.
    #[inline]
    pub fn should_write_history(&self, history_changes: u64) -> bool {
        Self::is_hit(self.max_changesets, history_changes)
    }

    /// Returns `true` if `threshold` is enabled and `value` has reached it.
    ///
    /// Matching on the `Option` (instead of substituting `u64::MAX` for `None`) guarantees that a
    /// disabled threshold cannot be tripped, not even by a counter equal to `u64::MAX`.
    #[inline]
    fn is_hit(threshold: Option<u64>, value: u64) -> bool {
        matches!(threshold, Some(max) if value >= max)
    }
}
#[cfg(test)]
mod tests {
use super::*;
@ -281,7 +375,14 @@ mod tests {
fn stage() -> ExecutionStage<Factory> {
let factory =
Factory::new(Arc::new(ChainSpecBuilder::mainnet().berlin_activated().build()));
ExecutionStage::new(factory, 100)
ExecutionStage::new(
factory,
ExecutionStageThresholds {
max_blocks: Some(100),
max_changes: None,
max_changesets: None,
},
)
}
#[tokio::test]

View File

@ -27,6 +27,7 @@ thiserror = "1.0.37"
auto_impl = "1.0"
itertools = "0.10"
pin-project = "1.0"
derive_more = "0.99"
# test-utils
reth-rlp = { path = "../../rlp", optional = true }

View File

@ -0,0 +1,73 @@
use derive_more::Deref;
use reth_primitives::{Account, Address, BlockNumber};
use std::collections::{btree_map::Entry, BTreeMap};
/// A mapping of `block -> address -> account` that represents which accounts were changed, and
/// what their state was prior to that change.
///
/// If the prior state was `None`, then the account is new.
///
/// The type dereferences to the inner map (via the derived `Deref`), so read-only map methods can
/// be called on it directly.
#[derive(Default, Clone, Eq, PartialEq, Debug, Deref)]
pub struct AccountChanges {
/// The inner mapping of block changes.
#[deref]
pub inner: BTreeMap<BlockNumber, BTreeMap<Address, Option<Account>>>,
/// Hand tracked change size: the number of `(block, address)` entries stored in `inner`.
///
/// The methods of this type keep it up to date; it is a plain counter, so it is not adjusted
/// when `inner` is modified directly.
pub size: usize,
}
impl AccountChanges {
    /// Insert account change at specified block number. The value is **not** updated if it already
    /// exists.
    ///
    /// `size` is incremented only when a new `(block, address)` entry is created.
    pub fn insert(&mut self, block: BlockNumber, address: Address, account: Option<Account>) {
        if let Entry::Vacant(entry) = self.inner.entry(block).or_default().entry(address) {
            entry.insert(account);
            self.size += 1;
        }
    }

    /// Insert account changes at specified block number. The values are **not** updated if they
    /// already exist.
    ///
    /// An (empty) entry for `block` is created even if `changes` is empty.
    pub fn insert_for_block(
        &mut self,
        block: BlockNumber,
        changes: BTreeMap<Address, Option<Account>>,
    ) {
        let block_entry = self.inner.entry(block).or_default();
        for (address, account) in changes {
            if let Entry::Vacant(entry) = block_entry.entry(address) {
                entry.insert(account);
                self.size += 1;
            }
        }
    }

    /// Drain and return any entries above the target block number.
    ///
    /// The evicted per-block maps are moved out of `self` (no cloning) and `size` is reduced by
    /// the number of evicted account entries.
    pub fn drain_above(
        &mut self,
        target_block: BlockNumber,
    ) -> BTreeMap<BlockNumber, BTreeMap<Address, Option<Account>>> {
        // `split_off` takes the first key that should be evicted. If `target_block` is already the
        // largest representable block number, nothing can be above it.
        let first_evicted = match target_block.checked_add(1) {
            Some(block) => block,
            None => return BTreeMap::new(),
        };

        let evicted = self.inner.split_off(&first_evicted);
        self.size -= evicted.values().map(|accounts| accounts.len()).sum::<usize>();
        evicted
    }

    /// Retain entries only above specified block number.
    ///
    /// Entries at or below `target_block` are dropped and `size` is reduced accordingly.
    pub fn retain_above(&mut self, target_block: BlockNumber) {
        // Count into a local so the closure does not need to borrow `self` while `self.inner` is
        // mutably borrowed by `retain`.
        let mut removed = 0usize;
        self.inner.retain(|block_number, accounts| {
            let keep = *block_number > target_block;
            if !keep {
                removed += accounts.len();
            }
            keep
        });
        self.size -= removed;
    }
}

View File

@ -16,55 +16,11 @@ use reth_trie::{
};
use std::collections::BTreeMap;
/// Storage for an account with the old and new values for each slot: (slot -> (old, new)).
pub type StorageChangeset = BTreeMap<U256, (U256, U256)>;
mod account;
pub use account::AccountChanges;
/// A mapping of `block -> address -> account` that represents what accounts were changed, and what
/// their state were prior to that change.
///
/// If the prior state was `None`, then the account is new.
pub type AccountChanges = BTreeMap<BlockNumber, BTreeMap<Address, Option<Account>>>;
/// A mapping of `block -> account -> slot -> old value` that represents what slots were changed,
/// and what their values were prior to that change.
pub type StorageChanges = BTreeMap<BlockNumber, BTreeMap<Address, ChangedStorage>>;
/// Changed storage state for the account.
///
/// # Wiped Storage
///
/// The field `wiped` denotes whether the pre-existing storage in the database should be cleared or
/// not.
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct ChangedStorage {
/// Whether the storage was wiped or not.
pub wiped: bool,
/// The storage slots.
pub storage: BTreeMap<U256, U256>,
}
/// Latest storage state for the account.
///
/// # Wiped Storage
///
/// The `times_wiped` field indicates the number of times the storage was wiped in this poststate.
///
/// If `times_wiped` is greater than 0, then the account was selfdestructed at some point, and the
/// values contained in `storage` should be the only values written to the database.
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct Storage {
/// The number of times the storage was wiped.
pub times_wiped: u64,
/// The storage slots.
pub storage: BTreeMap<U256, U256>,
}
impl Storage {
/// Returns `true` if the storage was wiped at any point.
pub fn wiped(&self) -> bool {
self.times_wiped > 0
}
}
mod storage;
pub use storage::{ChangedStorage, Storage, StorageChanges, StorageChangeset};
// todo: rewrite all the docs for this
/// The state of accounts after execution of one or more transactions, including receipts and new
@ -135,6 +91,18 @@ impl PostState {
Self { receipts: Vec::with_capacity(txs), ..Default::default() }
}
/// Return the current size of the poststate.
///
/// This is the number of changed accounts, bytecodes and receipts held in the post-state, plus
/// the number of pending changeset entries (see `changeset_size`). Storage is accounted for
/// through its changeset entries.
pub fn size(&self) -> usize {
    let latest_state = self.accounts.len() + self.bytecode.len() + self.receipts.len();
    latest_state + self.changeset_size()
}
/// Return the current size of history changes in the poststate.
///
/// This is the sum of the hand tracked sizes of the account and storage changesets.
pub fn changeset_size(&self) -> usize {
    let account_entries = self.account_changes.size;
    let storage_entries = self.storage_changes.size;
    account_entries + storage_entries
}
/// Get the latest state of all changed accounts.
pub fn accounts(&self) -> &BTreeMap<Address, Option<Account>> {
&self.accounts
@ -289,32 +257,21 @@ impl PostState {
}
// Insert account change sets
for (block_number, account_changes) in std::mem::take(&mut other.account_changes) {
let block = self.account_changes.entry(block_number).or_default();
for (address, account) in account_changes {
if block.contains_key(&address) {
continue
}
block.insert(address, account);
}
for (block_number, account_changes) in std::mem::take(&mut other.account_changes).inner {
self.account_changes.insert_for_block(block_number, account_changes);
}
// Insert storage change sets
for (block_number, storage_changes) in std::mem::take(&mut other.storage_changes) {
for (block_number, storage_changes) in std::mem::take(&mut other.storage_changes).inner {
for (address, their_storage) in storage_changes {
let our_storage = self
.storage_changes
.entry(block_number)
.or_default()
.entry(address)
.or_default();
if their_storage.wiped {
our_storage.wiped = true;
}
for (slot, value) in their_storage.storage {
our_storage.storage.entry(slot).or_insert(value);
self.storage_changes.set_wiped(block_number, address);
}
self.storage_changes.insert_for_block_and_address(
block_number,
address,
their_storage.storage.into_iter(),
);
}
}
self.receipts.extend(other.receipts);
@ -325,28 +282,12 @@ impl PostState {
///
/// The reverted changes are removed from this post-state, and their effects are reverted.
pub fn revert_to(&mut self, target_block_number: BlockNumber) {
let mut account_changes_to_revert = BTreeMap::new();
self.account_changes.retain(|block_number, accounts| {
if *block_number > target_block_number {
account_changes_to_revert.insert(*block_number, accounts.clone());
false
} else {
true
}
});
let account_changes_to_revert = self.account_changes.drain_above(target_block_number);
for (_, accounts) in account_changes_to_revert.into_iter().rev() {
self.accounts.extend(accounts);
}
let mut storage_changes_to_revert = BTreeMap::new();
self.storage_changes.retain(|block_number, storages| {
if *block_number > target_block_number {
storage_changes_to_revert.insert(*block_number, storages.clone());
false
} else {
true
}
});
let storage_changes_to_revert = self.storage_changes.drain_above(target_block_number);
for (_, storages) in storage_changes_to_revert.into_iter().rev() {
for (address, storage) in storages {
self.storage.entry(address).and_modify(|head_storage| {
@ -379,12 +320,8 @@ impl PostState {
self.revert_to(revert_to_block);
// Remove all changes in the returned post-state that were not reverted
non_reverted_state
.storage_changes
.retain(|block_number, _| *block_number > revert_to_block);
non_reverted_state
.account_changes
.retain(|block_number, _| *block_number > revert_to_block);
non_reverted_state.storage_changes.retain_above(revert_to_block);
non_reverted_state.account_changes.retain_above(revert_to_block);
non_reverted_state
}
@ -397,7 +334,7 @@ impl PostState {
account: Account,
) {
self.accounts.insert(address, Some(account));
self.account_changes.entry(block_number).or_default().entry(address).or_insert(None);
self.account_changes.insert(block_number, address, None);
}
/// Add a changed account to the post-state.
@ -412,7 +349,7 @@ impl PostState {
new: Account,
) {
self.accounts.insert(address, Some(new));
self.account_changes.entry(block_number).or_default().entry(address).or_insert(Some(old));
self.account_changes.insert(block_number, address, Some(old));
}
/// Mark an account as destroyed.
@ -423,17 +360,12 @@ impl PostState {
account: Account,
) {
self.accounts.insert(address, None);
self.account_changes
.entry(block_number)
.or_default()
.entry(address)
.or_insert(Some(account));
self.account_changes.insert(block_number, address, Some(account));
let storage = self.storage.entry(address).or_default();
storage.times_wiped += 1;
storage.storage.clear();
let storage_changes =
self.storage_changes.entry(block_number).or_default().entry(address).or_default();
storage_changes.wiped = true;
self.storage_changes.set_wiped(block_number, address);
}
/// Add changed storage values to the post-state.
@ -448,11 +380,11 @@ impl PostState {
.or_default()
.storage
.extend(changeset.iter().map(|(slot, (_, new))| (*slot, *new)));
let storage_changes =
self.storage_changes.entry(block_number).or_default().entry(address).or_default();
for (slot, (old, _)) in changeset.into_iter() {
storage_changes.storage.entry(slot).or_insert(old);
}
self.storage_changes.insert_for_block_and_address(
block_number,
address,
changeset.into_iter().map(|(slot, (old, _))| (slot, old)),
);
}
/// Add new bytecode to the post-state.
@ -472,12 +404,17 @@ impl PostState {
self.receipts.push(receipt);
}
/// Write the post state to the database.
pub fn write_to_db<'a, TX: DbTxMut<'a> + DbTx<'a>>(self, tx: &TX) -> Result<(), DbError> {
/// Write changeset history to the database.
pub fn write_history_to_db<'a, TX: DbTxMut<'a> + DbTx<'a>>(
&mut self,
tx: &TX,
) -> Result<(), DbError> {
// Write account changes
tracing::trace!(target: "provider::post_state", "Writing account changes");
let mut account_changeset_cursor = tx.cursor_dup_write::<tables::AccountChangeSet>()?;
for (block_number, account_changes) in self.account_changes.into_iter() {
for (block_number, account_changes) in
std::mem::take(&mut self.account_changes).inner.into_iter()
{
for (address, info) in account_changes.into_iter() {
tracing::trace!(target: "provider::post_state", block_number, ?address, old = ?info, "Account changed");
account_changeset_cursor
@ -489,7 +426,9 @@ impl PostState {
tracing::trace!(target: "provider::post_state", "Writing storage changes");
let mut storages_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
let mut storage_changeset_cursor = tx.cursor_dup_write::<tables::StorageChangeSet>()?;
for (block_number, storage_changes) in self.storage_changes.into_iter() {
for (block_number, storage_changes) in
std::mem::take(&mut self.storage_changes).inner.into_iter()
{
for (address, mut storage) in storage_changes.into_iter() {
let storage_id = BlockNumberAddress((block_number, address));
@ -514,7 +453,15 @@ impl PostState {
}
}
Ok(())
}
/// Write the post state to the database.
pub fn write_to_db<'a, TX: DbTxMut<'a> + DbTx<'a>>(mut self, tx: &TX) -> Result<(), DbError> {
self.write_history_to_db(tx)?;
// Write new storage state
let mut storages_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
for (address, storage) in self.storage.into_iter() {
// If the storage was wiped, remove all previous entries from the database.
if storage.wiped() {
@ -526,7 +473,7 @@ impl PostState {
for (key, value) in storage.storage {
tracing::trace!(target: "provider::post_state", ?address, ?key, "Updating plain state storage");
let key = H256(key.to_be_bytes());
let key: H256 = key.into();
if let Some(entry) = storages_cursor.seek_by_key_subkey(address, key)? {
if entry.key == key {
storages_cursor.delete_current()?;
@ -968,8 +915,8 @@ mod tests {
// The value in the changeset for the account should be `None` since this was an account
// creation
assert_eq!(
state.account_changes(),
&BTreeMap::from([(block, BTreeMap::from([(address, None)]))]),
state.account_changes().inner,
BTreeMap::from([(block, BTreeMap::from([(address, None)]))]),
"The changeset for the account is incorrect"
);
@ -990,8 +937,8 @@ mod tests {
// The value in the changeset for the account should still be `None`
assert_eq!(
state.account_changes(),
&BTreeMap::from([(block, BTreeMap::from([(address, None)]))]),
state.account_changes().inner,
BTreeMap::from([(block, BTreeMap::from([(address, None)]))]),
"The changeset for the account is incorrect"
);
@ -1046,8 +993,8 @@ mod tests {
// Slot 0: 0 (the value before the first tx in the block)
// Slot 1: 3
assert_eq!(
state.storage_changes(),
&BTreeMap::from([(
state.storage_changes().inner,
BTreeMap::from([(
block,
BTreeMap::from([(
address,
@ -1113,13 +1060,13 @@ mod tests {
BTreeMap::from([(U256::from(0), (U256::from(0), U256::from(1)))]),
);
assert_eq!(
a.account_changes(),
&BTreeMap::from([(block, BTreeMap::from([(address, None)]))]),
a.account_changes().inner,
BTreeMap::from([(block, BTreeMap::from([(address, None)]))]),
"The changeset for the account is incorrect in state A"
);
assert_eq!(
a.storage_changes(),
&BTreeMap::from([(
a.storage_changes().inner,
BTreeMap::from([(
block,
BTreeMap::from([(
address,
@ -1150,8 +1097,8 @@ mod tests {
BTreeMap::from([(U256::from(0), (U256::from(1), U256::from(2)))]),
);
assert_eq!(
b.account_changes(),
&BTreeMap::from([(
b.account_changes().inner,
BTreeMap::from([(
block,
BTreeMap::from([(
address,
@ -1161,8 +1108,8 @@ mod tests {
"The changeset for the account is incorrect in state B"
);
assert_eq!(
b.storage_changes(),
&BTreeMap::from([(
b.storage_changes().inner,
BTreeMap::from([(
block,
BTreeMap::from([(
address,
@ -1190,13 +1137,13 @@ mod tests {
// Storage:
// - Slot 0: 2
assert_eq!(
a.account_changes(),
&BTreeMap::from([(block, BTreeMap::from([(address, None)]))]),
a.account_changes().inner,
BTreeMap::from([(block, BTreeMap::from([(address, None)]))]),
"The changeset for the account is incorrect in the merged state"
);
assert_eq!(
a.storage_changes(),
&BTreeMap::from([(
a.storage_changes().inner,
BTreeMap::from([(
block,
BTreeMap::from([(
address,

View File

@ -0,0 +1,114 @@
use derive_more::Deref;
use reth_primitives::{Address, BlockNumber, U256};
use std::collections::{btree_map::Entry, BTreeMap};
/// Storage changes for a single account, with the old and new values for each slot:
/// `slot -> (old, new)`.
pub type StorageChangeset = BTreeMap<U256, (U256, U256)>;
/// Changed storage state for the account.
///
/// # Wiped Storage
///
/// The field `wiped` denotes whether the pre-existing storage in the database should be cleared or
/// not.
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct ChangedStorage {
/// Whether the storage was wiped or not.
pub wiped: bool,
/// The storage slots, mapping each changed slot to the value it held before the change.
pub storage: BTreeMap<U256, U256>,
}
/// The most recent storage state of an account.
///
/// # Wiped Storage
///
/// `times_wiped` counts how often the storage was wiped within this poststate.
///
/// A value greater than 0 means the account was selfdestructed at some point; in that case the
/// values contained in `storage` should be the only values written to the database.
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct Storage {
    /// How many times the storage was wiped.
    pub times_wiped: u64,
    /// The storage slots and their latest values.
    pub storage: BTreeMap<U256, U256>,
}

impl Storage {
    /// Reports whether the storage has been wiped at least once.
    pub fn wiped(&self) -> bool {
        self.times_wiped != 0
    }
}
/// A mapping of `block -> account -> slot -> old value` that represents what slots were changed,
/// and what their values were prior to that change.
///
/// The type dereferences to the inner map (via the derived `Deref`), so read-only map methods can
/// be called on it directly.
#[derive(Default, Clone, Eq, PartialEq, Debug, Deref)]
pub struct StorageChanges {
/// The inner mapping of block changes.
#[deref]
pub inner: BTreeMap<BlockNumber, BTreeMap<Address, ChangedStorage>>,
/// Hand tracked change size: the number of storage slot entries stored in `inner`.
///
/// The methods of this type keep it up to date; it is a plain counter, so it is not adjusted
/// when `inner` is modified directly. Setting the `wiped` flag does not change it.
pub size: usize,
}
impl StorageChanges {
    /// Set storage `wiped` flag for specified block number and address.
    ///
    /// Missing block and address entries are created. `size` is not affected.
    pub fn set_wiped(&mut self, block: BlockNumber, address: Address) {
        self.inner.entry(block).or_default().entry(address).or_default().wiped = true;
    }

    /// Insert storage entries for specified block number and address.
    ///
    /// Slots that already have an entry are **not** updated. `size` is incremented once for every
    /// newly inserted slot. The block and address entries are created even if `storage` yields
    /// nothing.
    pub fn insert_for_block_and_address<I>(
        &mut self,
        block: BlockNumber,
        address: Address,
        storage: I,
    ) where
        I: Iterator<Item = (U256, U256)>,
    {
        let storage_entry = self.inner.entry(block).or_default().entry(address).or_default();
        for (slot, value) in storage {
            if let Entry::Vacant(entry) = storage_entry.storage.entry(slot) {
                entry.insert(value);
                self.size += 1;
            }
        }
    }

    /// Drain and return any entries above the target block number.
    ///
    /// The evicted per-block maps are moved out of `self` (no cloning) and `size` is reduced by
    /// the number of evicted slot entries.
    pub fn drain_above(
        &mut self,
        target_block: BlockNumber,
    ) -> BTreeMap<BlockNumber, BTreeMap<Address, ChangedStorage>> {
        // `split_off` takes the first key that should be evicted. If `target_block` is already the
        // largest representable block number, nothing can be above it.
        let first_evicted = match target_block.checked_add(1) {
            Some(block) => block,
            None => return BTreeMap::new(),
        };

        let evicted = self.inner.split_off(&first_evicted);
        // Walking every evicted account is fine, because this is called only on post state splits.
        self.size -= evicted.values().map(Self::slot_count).sum::<usize>();
        evicted
    }

    /// Retain entries only above specified block number.
    ///
    /// Entries at or below `target_block` are dropped and `size` is reduced accordingly.
    pub fn retain_above(&mut self, target_block: BlockNumber) {
        // Count into a local so the closure does not need to borrow `self` while `self.inner` is
        // mutably borrowed by `retain`.
        let mut removed = 0usize;
        self.inner.retain(|block_number, storages| {
            let keep = *block_number > target_block;
            if !keep {
                // Walking every dropped account is fine, because this is called only on post
                // state splits.
                removed += Self::slot_count(storages);
            }
            keep
        });
        self.size -= removed;
    }

    /// Number of slot entries recorded for all accounts of a single block.
    fn slot_count(storages: &BTreeMap<Address, ChangedStorage>) -> usize {
        storages.values().map(|changed| changed.storage.len()).sum()
    }
}