fix: share DatabaseProviderRW on PersistenceAction::SaveBlocks (#9929)

This commit is contained in:
joshieDo
2024-07-31 17:27:47 +01:00
committed by GitHub
parent 245284d62f
commit 78c461ea4e

View File

@ -5,9 +5,9 @@ use reth_db::{models::CompactU256, tables, transaction::DbTxMut, Database};
use reth_errors::ProviderResult;
use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256, U256};
use reth_provider::{
writer::StorageWriter, BlockExecutionWriter, BlockNumReader, BlockWriter, HistoryWriter,
OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateChangeWriter, StateWriter,
StaticFileProviderFactory, StaticFileWriter, TransactionsProviderExt, TrieWriter,
writer::StorageWriter, BlockExecutionWriter, BlockNumReader, BlockWriter, DatabaseProviderRW,
HistoryWriter, OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateChangeWriter,
StateWriter, StaticFileProviderFactory, StaticFileWriter, TransactionsProviderExt, TrieWriter,
};
use reth_prune::{Pruner, PrunerOutput};
use reth_stages_types::{StageCheckpoint, StageId};
@ -46,14 +46,17 @@ impl<DB: Database> PersistenceService<DB> {
}
/// Writes the cloned tree state to database
fn write(&self, blocks: &[ExecutedBlock]) -> ProviderResult<()> {
fn write(
&self,
blocks: &[ExecutedBlock],
provider_rw: &DatabaseProviderRW<DB>,
) -> ProviderResult<()> {
if blocks.is_empty() {
debug!(target: "tree::persistence", "Attempted to write empty block range");
return Ok(())
}
debug!(target: "tree::persistence", block_count = %blocks.len(), "Writing blocks to database");
let provider_rw = self.provider.provider_rw()?;
let first_number = blocks.first().unwrap().block().number;
let last = blocks.last().unwrap().block();
@ -77,7 +80,7 @@ impl<DB: Database> PersistenceService<DB> {
// Must be written after blocks because of the receipt lookup.
let execution_outcome = block.execution_outcome().clone();
// TODO: do we provide a static file producer here?
let mut storage_writer = StorageWriter::new(Some(&provider_rw), None);
let mut storage_writer = StorageWriter::new(Some(provider_rw), None);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;
// insert hashes and intermediate merkle nodes
@ -95,8 +98,6 @@ impl<DB: Database> PersistenceService<DB> {
// Update pipeline progress
provider_rw.update_pipeline_stages(last_block_number, false)?;
provider_rw.commit()?;
debug!(target: "tree::persistence", range = ?first_number..=last_block_number, "Appended block data");
Ok(())
@ -127,15 +128,18 @@ impl<DB: Database> PersistenceService<DB> {
/// Updates checkpoints related to block headers and bodies. This should be called after new
/// transactions have been successfully written to disk.
fn update_transaction_meta(&self, block_num: u64, td: U256) -> ProviderResult<()> {
fn update_transaction_meta(
&self,
block_num: u64,
td: U256,
provider_rw: &DatabaseProviderRW<DB>,
) -> ProviderResult<()> {
debug!(target: "tree::persistence", ?block_num, "Updating transaction metadata after writing");
let provider_rw = self.provider.provider_rw()?;
provider_rw
.tx_ref()
.put::<tables::HeaderTerminalDifficulties>(block_num, CompactU256(td))?;
provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block_num))?;
provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block_num))?;
provider_rw.commit()?;
Ok(())
}
@ -145,15 +149,18 @@ impl<DB: Database> PersistenceService<DB> {
///
/// The [`update_transaction_meta`](Self::update_transaction_meta) method should be called
/// after this, to update the checkpoints for headers and block bodies.
#[instrument(level = "trace", skip(self), target = "engine")]
fn write_transactions(&self, block: Arc<SealedBlock>) -> ProviderResult<(u64, U256)> {
#[instrument(level = "trace", skip_all, fields(block = ?block.num_hash()) target = "engine")]
fn write_transactions(
&self,
block: Arc<SealedBlock>,
provider_rw: &DatabaseProviderRW<DB>,
) -> ProviderResult<(u64, U256)> {
debug!(target: "tree::persistence", "Writing transactions");
let provider = self.provider.static_file_provider();
let new_td = {
let header_writer = provider.get_writer(block.number, StaticFileSegment::Headers)?;
let provider_ro = self.provider.provider()?;
let mut storage_writer = StorageWriter::new(Some(&provider_ro), Some(header_writer));
let mut storage_writer = StorageWriter::new(Some(provider_rw), Some(header_writer));
let new_td = storage_writer.append_headers_from_blocks(
block.header().number,
std::iter::once(&(block.header(), block.hash())),
@ -162,7 +169,7 @@ impl<DB: Database> PersistenceService<DB> {
let transactions_writer =
provider.get_writer(block.number, StaticFileSegment::Transactions)?;
let mut storage_writer =
StorageWriter::new(Some(&provider_ro), Some(transactions_writer));
StorageWriter::new(Some(provider_rw), Some(transactions_writer));
let no_hash_transactions =
block.body.clone().into_iter().map(TransactionSignedNoHash::from).collect();
storage_writer.append_transactions_from_blocks(
@ -173,20 +180,18 @@ impl<DB: Database> PersistenceService<DB> {
new_td
};
provider.commit()?;
Ok((block.number, new_td))
}
/// Write execution-related block data to static files.
///
/// This will then send a command to the db service, that it should write new data, and update
/// the checkpoints for execution and beyond.
fn write_execution_data(&self, blocks: &[ExecutedBlock]) -> ProviderResult<()> {
/// Write execution-related block data to database and/or static files.
fn write_execution_data(
&self,
blocks: &[ExecutedBlock],
provider_rw: &DatabaseProviderRW<DB>,
) -> ProviderResult<()> {
if blocks.is_empty() {
return Ok(())
}
let provider_rw = self.provider.provider_rw()?;
let provider = self.provider.static_file_provider();
// NOTE: checked non-empty above
@ -201,7 +206,7 @@ impl<DB: Database> PersistenceService<DB> {
provider.get_writer(first_block.number, StaticFileSegment::Receipts)?;
{
let mut storage_writer = StorageWriter::new(Some(&provider_rw), Some(receipts_writer));
let mut storage_writer = StorageWriter::new(Some(provider_rw), Some(receipts_writer));
let receipts_iter = blocks.iter().map(|block| {
let receipts = block.execution_outcome().receipts().receipt_vec.clone();
debug_assert!(receipts.len() == 1);
@ -210,9 +215,6 @@ impl<DB: Database> PersistenceService<DB> {
storage_writer.append_receipts_from_blocks(current_block, receipts_iter)?;
}
provider.commit()?;
provider_rw.commit()?;
Ok(())
}
@ -285,18 +287,23 @@ where
todo!("return error or something");
}
let last_block_hash = blocks.last().unwrap().block().hash();
// first write to static files
self.write_execution_data(&blocks).expect("todo: handle errors");
// then write to db
self.write(&blocks).expect("todo: handle errors");
let provider_rw = self.provider.provider_rw().expect("todo: handle errors");
self.write_execution_data(&blocks, &provider_rw).expect("todo: handle errors");
self.write(&blocks, &provider_rw).expect("todo: handle errors");
for block in &blocks {
// first write transactions
let (block_num, td) = self
.write_transactions(block.block.clone())
.write_transactions(block.block.clone(), &provider_rw)
.expect("todo: handle errors");
self.update_transaction_meta(block_num, td, &provider_rw)
.expect("todo: handle errors");
self.update_transaction_meta(block_num, td).expect("todo: handle errors");
}
self.provider.static_file_provider().commit().expect("todo: handle errors");
provider_rw.commit().expect("todo: handle errors");
// we ignore the error because the caller may or may not care about the result
let _ = sender.send(last_block_hash);
}
@ -307,12 +314,13 @@ where
let _ = sender.send(res);
}
PersistenceAction::WriteTransactions((block, sender)) => {
let (block_num, td) =
self.write_transactions(block).expect("todo: handle errors");
self.update_transaction_meta(block_num, td).expect("todo: handle errors");
unimplemented!()
// let (block_num, td) =
// self.write_transactions(block).expect("todo: handle errors");
// self.update_transaction_meta(block_num, td).expect("todo: handle errors");
// we ignore the error because the caller may or may not care about the result
let _ = sender.send(());
// // we ignore the error because the caller may or may not care about the
// result let _ = sender.send(());
}
}
}