From f3fac56fd9c8a4737ca4f5ad505170a03ba05ba0 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Thu, 1 Aug 2024 17:36:33 +0100 Subject: [PATCH] chore: move `save_blocks` to `StorageWriter` (#9991) --- Cargo.lock | 1 - crates/engine/tree/Cargo.toml | 1 - crates/engine/tree/src/persistence.rs | 138 +------------ .../src/providers/database/provider.rs | 4 +- crates/storage/provider/src/writer/mod.rs | 183 ++++++++++++++++-- 5 files changed, 184 insertions(+), 143 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 05810f268..c85778c73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7022,7 +7022,6 @@ dependencies = [ "reth-rpc-types-compat", "reth-stages", "reth-stages-api", - "reth-stages-types", "reth-static-file", "reth-tasks", "reth-tracing", diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index a65397603..addcf12ae 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -32,7 +32,6 @@ reth-prune.workspace = true reth-revm.workspace = true reth-rpc-types.workspace = true reth-stages-api.workspace = true -reth-stages-types.workspace = true reth-tasks.workspace = true reth-trie.workspace = true diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index b9ab9145b..fc7f4818c 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -1,23 +1,21 @@ #![allow(dead_code)] use reth_chain_state::ExecutedBlock; -use reth_db::{models::CompactU256, tables, transaction::DbTxMut, Database}; +use reth_db::Database; use reth_errors::ProviderResult; -use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256}; +use reth_primitives::{SealedBlock, StaticFileSegment, B256}; use reth_provider::{ providers::StaticFileProvider, writer::StorageWriter, BlockExecutionWriter, BlockNumReader, - BlockWriter, DatabaseProviderRW, HistoryWriter, OriginalValuesKnown, ProviderFactory, - StageCheckpointWriter, StateChangeWriter, StateWriter, StaticFileProviderFactory, - StaticFileWriter, TransactionsProviderExt, TrieWriter, + DatabaseProviderRW, ProviderFactory, StaticFileProviderFactory, StaticFileWriter, + TransactionsProviderExt, }; use reth_prune::{Pruner, PrunerOutput}; -use reth_stages_types::{StageCheckpoint, StageId}; use std::sync::{ mpsc::{Receiver, SendError, Sender}, Arc, }; use tokio::sync::oneshot; -use tracing::{debug, instrument}; +use tracing::debug; /// Writes parts of reth's in memory tree state to the database and static files. /// @@ -95,123 +93,6 @@ impl PersistenceService { // TODO: doing this properly depends on pruner segment changes self.pruner.run(block_num).expect("todo: handle errors") } - - /// Writes the transactions to static files. - /// - /// Returns the block number and new total difficulty. - #[instrument(level = "trace", skip_all, fields(block = ?block.num_hash()) target = "engine")] - fn write_transactions( - &self, - block: Arc, - provider_rw: &DatabaseProviderRW, - ) -> ProviderResult<()> { - debug!(target: "tree::persistence", "Writing transactions"); - let provider = self.provider.static_file_provider(); - - let td = { - let header_writer = provider.get_writer(block.number, StaticFileSegment::Headers)?; - let mut storage_writer = StorageWriter::new(Some(provider_rw), Some(header_writer)); - let td = storage_writer.append_headers_from_blocks( - block.header().number, - std::iter::once(&(block.header(), block.hash())), - )?; - - let transactions_writer = - provider.get_writer(block.number, StaticFileSegment::Transactions)?; - let mut storage_writer = - StorageWriter::new(Some(provider_rw), Some(transactions_writer)); - let no_hash_transactions = - block.body.clone().into_iter().map(TransactionSignedNoHash::from).collect(); - storage_writer.append_transactions_from_blocks( - block.header().number, - std::iter::once(&no_hash_transactions), - )?; - - td - }; - - debug!(target: "tree::persistence", block_num=block.number, "Updating transaction metadata after writing"); - provider_rw - .tx_ref() - .put::(block.number, CompactU256(td))?; - provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block.number))?; - provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block.number))?; - - Ok(()) - } - - /// Writes the cloned tree state to database - fn save_blocks( - &self, - blocks: &[ExecutedBlock], - provider_rw: &DatabaseProviderRW, - static_file_provider: &StaticFileProvider, - ) -> ProviderResult<()> { - if blocks.is_empty() { - debug!(target: "tree::persistence", "Attempted to write empty block range"); - return Ok(()) - } - - // NOTE: checked non-empty above - let first_block = blocks.first().unwrap().block(); - let last_block = blocks.last().unwrap().block().clone(); - let first_number = first_block.number; - let last_block_number = last_block.number; - - // Only write receipts to static files if there is no receipt pruning configured. - let mut storage_writer = if provider_rw.prune_modes_ref().has_receipts_pruning() { - StorageWriter::new(Some(provider_rw), None) - } else { - StorageWriter::new( - Some(provider_rw), - Some( - static_file_provider - .get_writer(first_block.number, StaticFileSegment::Receipts)?, - ), - ) - }; - - debug!(target: "tree::persistence", block_count = %blocks.len(), "Writing blocks and execution data to storage"); - - // TODO: remove all the clones and do performant / batched writes for each type of object - // instead of a loop over all blocks, - // meaning: - // * blocks - // * state - // * hashed state - // * trie updates (cannot naively extend, need helper) - // * indices (already done basically) - // Insert the blocks - for block in blocks { - let sealed_block = - block.block().clone().try_with_senders_unchecked(block.senders().clone()).unwrap(); - provider_rw.insert_block(sealed_block)?; - self.write_transactions(block.block.clone(), provider_rw)?; - - // Write state and changesets to the database. - // Must be written after blocks because of the receipt lookup. - let execution_outcome = block.execution_outcome().clone(); - storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?; - - // insert hashes and intermediate merkle nodes - { - let trie_updates = block.trie_updates().clone(); - let hashed_state = block.hashed_state(); - provider_rw.write_hashed_state(&hashed_state.clone().into_sorted())?; - provider_rw.write_trie_updates(&trie_updates)?; - } - } - - // update history indices - provider_rw.update_history_indices(first_number..=last_block_number)?; - - // Update pipeline progress - provider_rw.update_pipeline_stages(last_block_number, false)?; - - debug!(target: "tree::persistence", range = ?first_number..=last_block_number, "Appended block data"); - - Ok(()) - } } impl PersistenceService @@ -245,11 +126,12 @@ where let provider_rw = self.provider.provider_rw().expect("todo: handle errors"); let static_file_provider = self.provider.static_file_provider(); - self.save_blocks(&blocks, &provider_rw, &static_file_provider) - .expect("todo: handle errors"); - static_file_provider.commit().expect("todo: handle errors"); - provider_rw.commit().expect("todo: handle errors"); + StorageWriter::from(&provider_rw, &static_file_provider) + .save_blocks(&blocks) + .expect("todo: handle errors"); + StorageWriter::commit(provider_rw, static_file_provider) + .expect("todo: handle errors"); // we ignore the error because the caller may or may not care about the result let _ = sender.send(last_block_hash); diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 67e5299fc..d4e3ce98b 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -3228,7 +3228,7 @@ impl BlockExecutionReader for DatabaseProvider { } } -impl BlockExecutionWriter for DatabaseProviderRW { +impl BlockExecutionWriter for DatabaseProvider { fn take_block_and_execution_range( &self, range: RangeInclusive, @@ -3406,7 +3406,7 @@ impl BlockExecutionWriter for DatabaseProviderRW { } } -impl BlockWriter for DatabaseProviderRW { +impl BlockWriter for DatabaseProvider { /// Inserts the block into the database, always modifying the following tables: /// * [`CanonicalHeaders`](tables::CanonicalHeaders) /// * [`Headers`](tables::Headers) diff --git a/crates/storage/provider/src/writer/mod.rs b/crates/storage/provider/src/writer/mod.rs index 1443a7c57..b43eb30ce 100644 --- a/crates/storage/provider/src/writer/mod.rs +++ b/crates/storage/provider/src/writer/mod.rs @@ -1,21 +1,28 @@ use crate::{ - providers::StaticFileProviderRWRefMut, DatabaseProvider, StateChangeWriter, StateWriter, + providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter as SfWriter}, + writer::static_file::StaticFileWriter, + BlockWriter, DatabaseProvider, DatabaseProviderRW, HistoryWriter, StateChangeWriter, + StateWriter, TrieWriter, }; +use reth_chain_state::ExecutedBlock; use reth_db::{ cursor::DbCursorRO, + models::CompactU256, tables, transaction::{DbTx, DbTxMut}, + Database, }; use reth_errors::{ProviderError, ProviderResult}; use reth_execution_types::ExecutionOutcome; use reth_primitives::{ - BlockNumber, Header, StaticFileSegment, TransactionSignedNoHash, B256, U256, + BlockNumber, Header, SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256, U256, }; -use reth_storage_api::{HeaderProvider, ReceiptWriter}; +use reth_stages_types::{StageCheckpoint, StageId}; +use reth_storage_api::{HeaderProvider, ReceiptWriter, StageCheckpointWriter}; use reth_storage_errors::writer::StorageWriterError; use revm::db::OriginalValuesKnown; -use static_file::StaticFileWriter; -use std::borrow::Borrow; +use std::{borrow::Borrow, sync::Arc}; +use tracing::{debug, instrument}; mod database; mod static_file; @@ -44,6 +51,12 @@ impl<'a, TX, SF> StorageWriter<'a, TX, SF> { Self { database, static_file } } + /// Creates a new instance of [`StorageWriter`] from a database provider and a static file + /// instance. + pub const fn from(database: &'a DatabaseProvider, static_file: SF) -> Self { + Self::new(Some(database), Some(static_file)) + } + /// Creates a new instance of [`StorageWriter`] from a static file instance. pub const fn from_static_file(static_file: SF) -> Self { Self::new(None, Some(static_file)) @@ -51,7 +64,7 @@ impl<'a, TX, SF> StorageWriter<'a, TX, SF> { /// Creates a new instance of [`StorageWriter`] from a database provider. pub const fn from_database(database: &'a DatabaseProvider) -> Self { - StorageWriter::new(Some(database), None) + Self::new(Some(database), None) } /// Returns a reference to the database writer. @@ -62,11 +75,19 @@ impl<'a, TX, SF> StorageWriter<'a, TX, SF> { self.database.as_ref().expect("should exist") } + /// Returns a reference to the static file instance. + /// + /// # Panics + /// If the static file instance is not set. + fn static_file(&self) -> &SF { + self.static_file.as_ref().expect("should exist") + } + /// Returns a mutable reference to the static file instance. /// /// # Panics /// If the static file instance is not set. - fn static_file(&mut self) -> &mut SF { + fn static_file_mut(&mut self) -> &mut SF { self.static_file.as_mut().expect("should exist") } @@ -96,6 +117,146 @@ impl<'a, TX, SF> StorageWriter<'a, TX, SF> { } } +impl StorageWriter<'_, (), ()> { + /// Commits both storage types in the right order. + /// + /// NOTE: If unwinding data from storage, use `commit_unwind` instead! + pub fn commit( + database: DatabaseProviderRW, + static_file: StaticFileProvider, + ) -> ProviderResult<()> { + static_file.commit()?; + database.commit()?; + Ok(()) + } + + /// Commits both storage types in the right order for an unwind operation. + /// + /// NOTE: Should only be used after unwinding data from storage! + pub fn commit_unwind( + database: DatabaseProviderRW, + static_file: StaticFileProvider, + ) -> ProviderResult<()> { + database.commit()?; + static_file.commit()?; + Ok(()) + } +} + +impl<'a, 'b, TX> StorageWriter<'a, TX, &'b StaticFileProvider> +where + TX: DbTxMut + DbTx, +{ + /// Writes executed blocks and receipts to storage. + pub fn save_blocks(&self, blocks: &[ExecutedBlock]) -> ProviderResult<()> { + if blocks.is_empty() { + debug!(target: "provider::storage_writer", "Attempted to write empty block range"); + return Ok(()) + } + + // NOTE: checked non-empty above + let first_block = blocks.first().unwrap().block(); + let last_block = blocks.last().unwrap().block().clone(); + let first_number = first_block.number; + let last_block_number = last_block.number; + + // Only write receipts to static files if there is no receipt pruning configured. + let mut state_writer = if self.database().prune_modes_ref().has_receipts_pruning() { + StorageWriter::from_database(self.database()) + } else { + StorageWriter::new( + Some(self.database()), + Some( + self.static_file() + .get_writer(first_block.number, StaticFileSegment::Receipts)?, + ), + ) + }; + + debug!(target: "tree::persistence", block_count = %blocks.len(), "Writing blocks and execution data to storage"); + + // TODO: remove all the clones and do performant / batched writes for each type of object + // instead of a loop over all blocks, + // meaning: + // * blocks + // * state + // * hashed state + // * trie updates (cannot naively extend, need helper) + // * indices (already done basically) + // Insert the blocks + for block in blocks { + let sealed_block = + block.block().clone().try_with_senders_unchecked(block.senders().clone()).unwrap(); + self.database().insert_block(sealed_block)?; + self.save_header_and_transactions(block.block.clone())?; + + // Write state and changesets to the database. + // Must be written after blocks because of the receipt lookup. + let execution_outcome = block.execution_outcome().clone(); + state_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?; + + // insert hashes and intermediate merkle nodes + { + let trie_updates = block.trie_updates().clone(); + let hashed_state = block.hashed_state(); + self.database().write_hashed_state(&hashed_state.clone().into_sorted())?; + self.database().write_trie_updates(&trie_updates)?; + } + } + + // update history indices + self.database().update_history_indices(first_number..=last_block_number)?; + + // Update pipeline progress + self.database().update_pipeline_stages(last_block_number, false)?; + + debug!(target: "tree::persistence", range = ?first_number..=last_block_number, "Appended block data"); + + Ok(()) + } + + /// Writes the header & transactions to static files, and updates their respective checkpoints + /// on database. + #[instrument(level = "trace", skip_all, fields(block = ?block.num_hash()) target = "storage")] + fn save_header_and_transactions(&self, block: Arc) -> ProviderResult<()> { + debug!(target: "tree::persistence", "Writing headers and transactions."); + + { + let header_writer = + self.static_file().get_writer(block.number, StaticFileSegment::Headers)?; + let mut storage_writer = StorageWriter::new(Some(self.database()), Some(header_writer)); + let td = storage_writer.append_headers_from_blocks( + block.header().number, + std::iter::once(&(block.header(), block.hash())), + )?; + + debug!(target: "tree::persistence", block_num=block.number, "Updating transaction metadata after writing"); + self.database() + .tx_ref() + .put::(block.number, CompactU256(td))?; + self.database() + .save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block.number))?; + } + + { + let transactions_writer = + self.static_file().get_writer(block.number, StaticFileSegment::Transactions)?; + let mut storage_writer = + StorageWriter::new(Some(self.database()), Some(transactions_writer)); + let no_hash_transactions = + block.body.clone().into_iter().map(TransactionSignedNoHash::from).collect(); + storage_writer.append_transactions_from_blocks( + block.header().number, + std::iter::once(&no_hash_transactions), + )?; + self.database() + .save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block.number))?; + } + + Ok(()) + } +} + impl<'a, 'b, TX> StorageWriter<'a, TX, StaticFileProviderRWRefMut<'b>> where TX: DbTx, @@ -151,7 +312,7 @@ where let (header, hash) = pair.borrow(); let header = header.borrow(); td += header.difficulty; - self.static_file().append_header(header, td, hash)?; + self.static_file_mut().append_header(header, td, hash)?; } Ok(td) @@ -196,11 +357,11 @@ where .ok_or_else(|| ProviderError::BlockBodyIndicesNotFound(block_number))?; for tx in transactions.borrow() { - self.static_file().append_transaction(tx_index, tx)?; + self.static_file_mut().append_transaction(tx_index, tx)?; tx_index += 1; } - self.static_file().increment_block(block_number)?; + self.static_file_mut().increment_block(block_number)?; // update index last_tx_idx = Some(tx_index); @@ -244,7 +405,7 @@ where StorageType::Database(self.database().tx_ref().cursor_write::()?) } else { self.ensure_static_file_segment(StaticFileSegment::Receipts)?; - StorageType::StaticFile(self.static_file()) + StorageType::StaticFile(self.static_file_mut()) }; let mut last_tx_idx = None;