Discussion draft: change DB Writer to take value references (#13672)

This commit is contained in:
James Prestwich
2025-01-07 12:38:09 -05:00
committed by GitHub
parent 2b301aa102
commit 3e980e61d8
41 changed files with 149 additions and 137 deletions

View File

@ -1670,7 +1670,7 @@ impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N
let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
cursor.upsert(
stage_id.to_string(),
StageCheckpoint {
&StageCheckpoint {
block_number,
..if drop_stage_checkpoint { Default::default() } else { checkpoint }
},
@ -1751,7 +1751,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
fn write_state(
&self,
execution_outcome: ExecutionOutcome<Self::Receipt>,
execution_outcome: &ExecutionOutcome<Self::Receipt>,
is_value_known: OriginalValuesKnown,
write_receipts_to: StorageLocation,
) -> ProviderResult<()> {
@ -1785,7 +1785,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
})
.transpose()?;
for (idx, receipts) in execution_outcome.receipts.into_iter().enumerate() {
for (idx, receipts) in execution_outcome.receipts.iter().enumerate() {
let block_number = execution_outcome.first_block + idx as u64;
// Increment block number for receipts static file writer
@ -1798,11 +1798,11 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
.map(|(_, indices)| indices.first_tx_num())
.ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))?;
for (idx, receipt) in receipts.into_iter().enumerate() {
for (idx, receipt) in receipts.iter().enumerate() {
let receipt_idx = first_tx_index + idx as u64;
if let Some(receipt) = receipt {
if let Some(writer) = &mut receipts_static_writer {
writer.append_receipt(receipt_idx, &receipt)?;
writer.append_receipt(receipt_idx, receipt)?;
}
if let Some(cursor) = &mut receipts_cursor {
@ -1897,7 +1897,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
for (address, account) in changes.accounts {
if let Some(account) = account {
tracing::trace!(?address, "Updating plain state account");
accounts_cursor.upsert(address, account.into())?;
accounts_cursor.upsert(address, &account.into())?;
} else if accounts_cursor.seek_exact(address)?.is_some() {
tracing::trace!(?address, "Deleting plain state account");
accounts_cursor.delete_current()?;
@ -1908,7 +1908,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
for (hash, bytecode) in changes.contracts {
bytecodes_cursor.upsert(hash, Bytecode(bytecode))?;
bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
}
// Write new storage state and wipe storage if needed.
@ -1936,7 +1936,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
}
if !entry.value.is_zero() {
storages_cursor.upsert(address, entry)?;
storages_cursor.upsert(address, &entry)?;
}
}
}
@ -1949,7 +1949,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
if let Some(account) = account {
hashed_accounts_cursor.upsert(hashed_address, account)?;
hashed_accounts_cursor.upsert(hashed_address, &account)?;
} else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
hashed_accounts_cursor.delete_current()?;
}
@ -1975,7 +1975,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
}
if !entry.value.is_zero() {
hashed_storage_cursor.upsert(*hashed_address, entry)?;
hashed_storage_cursor.upsert(*hashed_address, &entry)?;
}
}
}
@ -2047,7 +2047,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
if old_account != new_account {
let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
if let Some(account) = old_account {
plain_accounts_cursor.upsert(*address, *account)?;
plain_accounts_cursor.upsert(*address, account)?;
} else if existing_entry.is_some() {
plain_accounts_cursor.delete_current()?;
}
@ -2068,7 +2068,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
// insert value if needed
if !old_storage_value.is_zero() {
plain_storage_cursor.upsert(*address, storage_entry)?;
plain_storage_cursor.upsert(*address, &storage_entry)?;
}
}
}
@ -2147,7 +2147,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
if old_account != new_account {
let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
if let Some(account) = old_account {
plain_accounts_cursor.upsert(*address, *account)?;
plain_accounts_cursor.upsert(*address, account)?;
} else if existing_entry.is_some() {
plain_accounts_cursor.delete_current()?;
}
@ -2168,7 +2168,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
// insert value if needed
if !old_storage_value.is_zero() {
plain_storage_cursor.upsert(*address, storage_entry)?;
plain_storage_cursor.upsert(*address, &storage_entry)?;
}
}
}
@ -2255,7 +2255,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider
Some(node) => {
if !nibbles.0.is_empty() {
num_entries += 1;
account_trie_cursor.upsert(nibbles, node.clone())?;
account_trie_cursor.upsert(nibbles, node)?;
}
}
None => {
@ -2330,7 +2330,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvi
let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
for (hashed_address, account) in &hashed_accounts {
if let Some(account) = account {
hashed_accounts_cursor.upsert(*hashed_address, *account)?;
hashed_accounts_cursor.upsert(*hashed_address, account)?;
} else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
hashed_accounts_cursor.delete_current()?;
}
@ -2360,7 +2360,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvi
changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
for (hashed_address, account) in &hashed_accounts {
if let Some(account) = account {
hashed_accounts_cursor.upsert(*hashed_address, *account)?;
hashed_accounts_cursor.upsert(*hashed_address, account)?;
} else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
hashed_accounts_cursor.delete_current()?;
}
@ -2397,7 +2397,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvi
}
if !value.is_zero() {
hashed_storage.upsert(hashed_address, StorageEntry { key, value })?;
hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
}
}
Ok(hashed_storage_keys)
@ -2449,7 +2449,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvi
}
if !value.is_zero() {
hashed_storage_cursor.upsert(hashed_address, StorageEntry { key, value })?;
hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
}
Ok(())
})
@ -2561,7 +2561,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi
if !partial_shard.is_empty() {
cursor.insert(
ShardedKey::last(address),
BlockNumberList::new_pre_sorted(partial_shard),
&BlockNumberList::new_pre_sorted(partial_shard),
)?;
}
}
@ -2619,7 +2619,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi
if !partial_shard.is_empty() {
cursor.insert(
StorageShardedKey::last(address, storage_key),
BlockNumberList::new_pre_sorted(partial_shard),
&BlockNumberList::new_pre_sorted(partial_shard),
)?;
}
}
@ -2864,7 +2864,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
let mut durations_recorder = metrics::DurationsRecorder::default();
// insert block meta
block_indices_cursor.append(*block_number, block_indices)?;
block_indices_cursor.append(*block_number, &block_indices)?;
durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
@ -2872,7 +2872,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
// write transaction block index
if !body.transactions().is_empty() {
tx_block_cursor.append(block_indices.last_tx_num(), *block_number)?;
tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
}
@ -2882,7 +2882,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
writer.append_transaction(next_tx_num, transaction)?;
}
if let Some(cursor) = tx_cursor.as_mut() {
cursor.append(next_tx_num, transaction.clone())?;
cursor.append(next_tx_num, transaction)?;
}
// Increment transaction id for each transaction.
@ -2992,7 +2992,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
fn append_blocks_with_state(
&self,
blocks: Vec<SealedBlockWithSenders<Self::Block>>,
execution_outcome: ExecutionOutcome<Self::Receipt>,
execution_outcome: &ExecutionOutcome<Self::Receipt>,
hashed_state: HashedPostStateSorted,
trie_updates: TrieUpdates,
) -> ProviderResult<()> {