fix: write transactions on save_blocks call (#9912)

This commit is contained in:
Dan Cline
2024-07-30 18:17:20 -04:00
committed by GitHub
parent 04986f2d04
commit f3ce077c8a
2 changed files with 32 additions and 16 deletions

View File

@ -1,9 +1,9 @@
#![allow(dead_code)]
use reth_chain_state::ExecutedBlock;
use reth_db::Database;
use reth_db::{models::CompactU256, tables, transaction::DbTxMut, Database};
use reth_errors::ProviderResult;
use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256};
use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256, U256};
use reth_provider::{
writer::StorageWriter, BlockExecutionWriter, BlockNumReader, BlockWriter, HistoryWriter,
OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateWriter,
@ -16,7 +16,7 @@ use std::sync::{
Arc,
};
use tokio::sync::oneshot;
use tracing::debug;
use tracing::{debug, instrument};
/// Writes parts of reth's in memory tree state to the database and static files.
///
@ -127,9 +127,12 @@ impl<DB: Database> PersistenceService<DB> {
/// Updates checkpoints related to block headers and bodies. This should be called after new
/// transactions have been successfully written to disk.
fn update_transaction_meta(&self, block_num: u64) -> ProviderResult<()> {
fn update_transaction_meta(&self, block_num: u64, td: U256) -> ProviderResult<()> {
debug!(target: "tree::persistence", ?block_num, "Updating transaction metadata after writing");
let provider_rw = self.provider.provider_rw()?;
provider_rw
.tx_ref()
.put::<tables::HeaderTerminalDifficulties>(block_num, CompactU256(td))?;
provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block_num))?;
provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block_num))?;
provider_rw.commit()?;
@ -138,17 +141,20 @@ impl<DB: Database> PersistenceService<DB> {
/// Writes the transactions to static files.
///
/// Returns the block number and new total difficulty.
///
/// The [`update_transaction_meta`](Self::update_transaction_meta) method should be called
/// after this, to update the checkpoints for headers and block bodies.
fn write_transactions(&self, block: Arc<SealedBlock>) -> ProviderResult<u64> {
#[instrument(level = "trace", skip(self), target = "engine")]
fn write_transactions(&self, block: Arc<SealedBlock>) -> ProviderResult<(u64, U256)> {
debug!(target: "tree::persistence", "Writing transactions");
let provider = self.provider.static_file_provider();
{
let new_td = {
let header_writer = provider.get_writer(block.number, StaticFileSegment::Headers)?;
let provider_ro = self.provider.provider()?;
let mut storage_writer = StorageWriter::new(Some(&provider_ro), Some(header_writer));
storage_writer.append_headers_from_blocks(
let new_td = storage_writer.append_headers_from_blocks(
block.header().number,
std::iter::once(&(block.header(), block.hash())),
)?;
@ -163,11 +169,13 @@ impl<DB: Database> PersistenceService<DB> {
block.header().number,
std::iter::once(&no_hash_transactions),
)?;
}
new_td
};
provider.commit()?;
Ok(block.number)
Ok((block.number, new_td))
}
/// Write execution-related block data to static files.
@ -281,6 +289,13 @@ where
self.write_execution_data(&blocks).expect("todo: handle errors");
// then write to db
self.write(&blocks).expect("todo: handle errors");
for block in &blocks {
// first write transactions
let (block_num, td) = self
.write_transactions(block.block.clone())
.expect("todo: handle errors");
self.update_transaction_meta(block_num, td).expect("todo: handle errors");
}
// we ignore the error because the caller may or may not care about the result
let _ = sender.send(last_block_hash);
@ -292,8 +307,9 @@ where
let _ = sender.send(res);
}
PersistenceAction::WriteTransactions((block, sender)) => {
let block_num = self.write_transactions(block).expect("todo: handle errors");
self.update_transaction_meta(block_num).expect("todo: handle errors");
let (block_num, td) =
self.write_transactions(block).expect("todo: handle errors");
self.update_transaction_meta(block_num, td).expect("todo: handle errors");
// we ignore the error because the caller may or may not care about the result
let _ = sender.send(());

View File

@ -133,7 +133,7 @@ where
&mut self,
initial_block_number: BlockNumber,
headers: impl Iterator<Item = I>,
) -> ProviderResult<()>
) -> ProviderResult<U256>
where
I: Borrow<(H, B256)>,
H: Borrow<Header>,
@ -143,7 +143,7 @@ where
let mut td_cursor =
self.database_writer().tx_ref().cursor_read::<tables::HeaderTerminalDifficulties>()?;
let first_td = if initial_block_number == 0 {
let mut first_td = if initial_block_number == 0 {
U256::ZERO
} else {
td_cursor
@ -155,11 +155,11 @@ where
for pair in headers {
let (header, hash) = pair.borrow();
let header = header.borrow();
let td = first_td + header.difficulty;
self.static_file_writer().append_header(header, td, hash)?;
first_td += header.difficulty;
self.static_file_writer().append_header(header, first_td, hash)?;
}
Ok(())
Ok(first_td)
}
/// Appends transactions to static files, using the