diff --git a/Cargo.lock b/Cargo.lock index 56200dd4d..57d4a7dea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7084,6 +7084,7 @@ dependencies = [ "reth-rpc-types", "reth-stages", "reth-stages-api", + "reth-stages-types", "reth-static-file", "reth-tasks", "reth-tokio-util", diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index e2a1c462d..57adf06f1 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -33,6 +33,7 @@ reth-prune.workspace = true reth-prune-types.workspace = true reth-revm.workspace = true reth-rpc-types.workspace = true +reth-stages-types.workspace = true reth-stages-api.workspace = true reth-static-file.workspace = true reth-tasks.workspace = true diff --git a/crates/engine/tree/src/lib.rs b/crates/engine/tree/src/lib.rs index 8f40119b2..6540269f7 100644 --- a/crates/engine/tree/src/lib.rs +++ b/crates/engine/tree/src/lib.rs @@ -28,6 +28,8 @@ pub mod engine; pub mod metrics; /// The background writer task for batch db writes. pub mod persistence; +/// The background writer task for static file writes. +pub mod static_files; /// Support for interacting with the blockchain tree. pub mod tree; diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index 23b3a5827..5b5092727 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -1,6 +1,6 @@ #![allow(dead_code)] -use crate::tree::ExecutedBlock; +use crate::{static_files::StaticFileServiceHandle, tree::ExecutedBlock}; use reth_db::database::Database; use reth_errors::ProviderResult; use reth_primitives::B256; @@ -9,16 +9,17 @@ use reth_provider::{ ProviderFactory, StageCheckpointWriter, StateWriter, }; use reth_prune::{PruneProgress, Pruner}; +use reth_stages_types::{StageCheckpoint, StageId}; use std::sync::mpsc::{Receiver, SendError, Sender}; use tokio::sync::oneshot; use tracing::debug; /// Writes parts of reth's in memory tree state to the database. 
/// -/// This is meant to be a spawned task that listens for various incoming persistence operations, +/// This is meant to be a spawned service that listens for various incoming persistence operations, /// performing those actions on disk, and returning the result in a channel. /// -/// There are two types of operations this task can perform: +/// There are two types of operations this service can perform: /// - Writing executed blocks to disk, returning the hash of the latest block that was inserted. /// - Removing blocks from disk, returning the removed blocks. /// @@ -30,18 +31,21 @@ pub struct Persistence { provider: ProviderFactory, /// Incoming requests to persist stuff incoming: Receiver, + /// Handle for the static file service. + static_file_handle: StaticFileServiceHandle, /// The pruner pruner: Pruner, } impl Persistence { - /// Create a new persistence task + /// Create a new persistence service const fn new( provider: ProviderFactory, incoming: Receiver, + static_file_handle: StaticFileServiceHandle, pruner: Pruner, ) -> Self { - Self { provider, incoming, pruner } + Self { provider, incoming, static_file_handle, pruner } } /// Writes the cloned tree state to the database @@ -101,9 +105,18 @@ impl Persistence { Ok(()) } - /// Removes the blocks above the give block number from the database, returning them. - fn remove_blocks_above(&self, _block_number: u64) -> Vec { - todo!("implement this") + /// Removes block data above the given block number from the database. + /// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove + /// `block_number`. + /// + /// Returns the block hash for the lowest block removed from the database, which should be + /// the hash for `block_number + 1`. + /// + /// This will then send a command to the static file service, to remove the actual block data. 
+ fn remove_blocks_above(&self, block_number: u64) -> ProviderResult { + todo!("depends on PR") + // let mut provider_rw = self.provider.provider_rw()?; + // provider_rw.get_or_take_block_and_execution_range(range); } /// Prunes block data before the given block hash according to the configured prune @@ -113,10 +126,14 @@ impl Persistence { self.pruner.run(block_num).expect("todo: handle errors") } - /// Removes static file related data from the database, depending on the current block height in - /// existing static files. - fn clean_static_file_duplicates(&self) { - todo!("implement this") + /// Updates checkpoints related to block headers and bodies. This should be called by the static + /// file service, after new transactions have been successfully written to disk. + fn update_transaction_meta(&self, block_num: u64) -> ProviderResult<()> { + let provider_rw = self.provider.provider_rw()?; + provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block_num))?; + provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block_num))?; + provider_rw.commit()?; + Ok(()) } } @@ -124,13 +141,17 @@ impl Persistence where DB: Database + 'static, { - /// Create a new persistence task, spawning it, and returning a [`PersistenceHandle`]. - fn spawn_new(provider: ProviderFactory, pruner: Pruner) -> PersistenceHandle { + /// Create a new persistence service, spawning it, and returning a [`PersistenceHandle`]. 
+ fn spawn_new( + provider: ProviderFactory, + static_file_handle: StaticFileServiceHandle, + pruner: Pruner, + ) -> PersistenceHandle { let (tx, rx) = std::sync::mpsc::channel(); - let task = Self::new(provider, rx, pruner); + let service = Self::new(provider, rx, static_file_handle, pruner); std::thread::Builder::new() - .name("Persistence Task".to_string()) - .spawn(|| task.run()) + .name("Persistence Service".to_string()) + .spawn(|| service.run()) .unwrap(); PersistenceHandle::new(tx) @@ -148,7 +169,8 @@ where while let Ok(action) = self.incoming.recv() { match action { PersistenceAction::RemoveBlocksAbove((new_tip_num, sender)) => { - let output = self.remove_blocks_above(new_tip_num); + let output = + self.remove_blocks_above(new_tip_num).expect("todo: handle errors"); // we ignore the error because the caller may or may not care about the result let _ = sender.send(output); @@ -169,8 +191,8 @@ where // we ignore the error because the caller may or may not care about the result let _ = sender.send(res); } - PersistenceAction::CleanStaticFileDuplicates(sender) => { - self.clean_static_file_duplicates(); + PersistenceAction::UpdateTransactionMeta((block_num, sender)) => { + self.update_transaction_meta(block_num).expect("todo: handle errors"); // we ignore the error because the caller may or may not care about the result let _ = sender.send(()); @@ -180,29 +202,36 @@ where } } -/// A signal to the persistence task that part of the tree state can be persisted. +/// A signal to the persistence service that part of the tree state can be persisted. #[derive(Debug)] pub enum PersistenceAction { /// The section of tree state that should be persisted. These blocks are expected in order of /// increasing block number. + /// + /// This should just store the execution history-related data. Header, transaction, and + /// receipt-related data should already be written to static files. 
SaveBlocks((Vec, oneshot::Sender)), - /// Removes the blocks above the given block number from the database. - RemoveBlocksAbove((u64, oneshot::Sender>)), + /// Updates checkpoints related to block headers and bodies. This should be called by the + /// static file service, after new transactions have been successfully written to disk. + UpdateTransactionMeta((u64, oneshot::Sender<()>)), + + /// Removes block data above the given block number from the database. + /// + /// This will then send a command to the static file service, to remove the actual block data. + /// + /// Returns the block hash for the lowest block removed from the database. + RemoveBlocksAbove((u64, oneshot::Sender)), /// Prune associated block data before the given block number, according to already-configured /// prune modes. PruneBefore((u64, oneshot::Sender)), - - /// Trigger a read of static file data, and delete data depending on the highest block in each - /// static file segment. - CleanStaticFileDuplicates(oneshot::Sender<()>), } -/// A handle to the persistence task +/// A handle to the persistence service #[derive(Debug, Clone)] pub struct PersistenceHandle { - /// The channel used to communicate with the persistence task + /// The channel used to communicate with the persistence service sender: Sender, } @@ -221,7 +250,7 @@ impl PersistenceHandle { self.sender.send(action) } - /// Tells the persistence task to save a certain list of finalized blocks. The blocks are + /// Tells the persistence service to save a certain list of finalized blocks. The blocks are /// assumed to be ordered by block number. /// /// This returns the latest hash that has been saved, allowing removal of that block and any @@ -234,9 +263,9 @@ impl PersistenceHandle { rx.await.expect("todo: err handling") } - /// Tells the persistence task to remove blocks above a certain block number. The removed blocks - /// are returned by the task. 
- pub async fn remove_blocks_above(&self, block_num: u64) -> Vec { + /// Tells the persistence service to remove blocks above a certain block number. Returns the + /// hash of the lowest block removed from the database. + pub async fn remove_blocks_above(&self, block_num: u64) -> B256 { let (tx, rx) = oneshot::channel(); self.sender .send(PersistenceAction::RemoveBlocksAbove((block_num, tx))) @@ -244,7 +273,7 @@ impl PersistenceHandle { rx.await.expect("todo: err handling") } - /// Tells the persistence task to remove block data before the given hash, according to the + /// Tells the persistence service to remove block data before the given block number, per the /// configured prune config. pub async fn prune_before(&self, block_num: u64) -> PruneProgress { let (tx, rx) = oneshot::channel(); @@ -253,14 +282,4 @@ impl PersistenceHandle { .expect("should be able to send"); rx.await.expect("todo: err handling") } - - /// Tells the persistence task to read static file data, and delete data depending on the - /// highest block in each static file segment. 
- pub async fn clean_static_file_duplicates(&self) { - let (tx, rx) = oneshot::channel(); - self.sender - .send(PersistenceAction::CleanStaticFileDuplicates(tx)) - .expect("should be able to send"); - rx.await.expect("todo: err handling") - } } diff --git a/crates/engine/tree/src/static_files.rs b/crates/engine/tree/src/static_files.rs new file mode 100644 index 000000000..acc022a9a --- /dev/null +++ b/crates/engine/tree/src/static_files.rs @@ -0,0 +1,252 @@ +#![allow(dead_code)] + +use reth_db::database::Database; +use reth_errors::ProviderResult; +use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256, U256}; +use reth_provider::{ProviderFactory, StaticFileProviderFactory, StaticFileWriter}; +use reth_prune::PruneModes; +use std::sync::{ + mpsc::{Receiver, SendError, Sender}, + Arc, +}; +use tokio::sync::oneshot; + +use crate::{ + persistence::{PersistenceAction, PersistenceHandle}, + tree::ExecutedBlock, +}; + +/// Writes finalized blocks to reth's static files. +/// +/// This is meant to be a spawned service that listens for various incoming finalization operations, +/// and writes to or produces new static files. +/// +/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs +/// blocking file operations in an endless loop. +#[derive(Debug)] +pub struct StaticFileService { + /// The db / static file provider to use + provider: ProviderFactory, + /// Handle for the database service + database_handle: PersistenceHandle, + /// Incoming requests to write static files + incoming: Receiver, + /// The pruning configuration + pruning: PruneModes, +} + +impl StaticFileService +where + DB: Database + 'static, +{ + /// Create a new static file service, spawning it, and returning a [`StaticFileServiceHandle`]. 
+ fn spawn_new(provider: ProviderFactory) -> StaticFileServiceHandle { + todo!("implement initialization first"); + // let (tx, rx) = std::sync::mpsc::channel(); + // let service = Self::new(provider, rx); + // std::thread::Builder::new() + // .name("StaticFile Service".to_string()) + // .spawn(|| service.run()) + // .unwrap(); + + // StaticFileServiceHandle::new(tx) + } + + // TODO: some things about this are a bit weird, and just to make the underlying static file + // writes work - tx number, total difficulty inclusion. They require either additional in memory + // data or a db lookup. Maybe we can use a db read here + /// Writes the transactions to static files, to act as a log. + /// + /// This will then send a command to the db service, that it should update the checkpoints for + /// headers and block bodies. + fn log_transactions( + &self, + block: Arc, + start_tx_number: u64, + td: U256, + sender: oneshot::Sender<()>, + ) -> ProviderResult<()> { + let provider = self.provider.static_file_provider(); + let mut header_writer = provider.get_writer(block.number, StaticFileSegment::Headers)?; + let mut transactions_writer = + provider.get_writer(block.number, StaticFileSegment::Transactions)?; + + // TODO: does to_compact require ownership? + header_writer.append_header(block.header().clone(), td, block.hash())?; + let no_hash_transactions = + block.body.clone().into_iter().map(TransactionSignedNoHash::from); + + let mut tx_number = start_tx_number; + for tx in no_hash_transactions { + transactions_writer.append_transaction(tx_number, tx)?; + tx_number += 1; + } + + // increment block for both segments + header_writer.increment_block(StaticFileSegment::Headers, block.number)?; + transactions_writer.increment_block(StaticFileSegment::Transactions, block.number)?; + + // finally commit + header_writer.commit()?; + transactions_writer.commit()?; + + // TODO: do we care about the mpsc error here? 
+ // send a command to the db service to update the checkpoints for headers / bodies + let _ = self + .database_handle + .send_action(PersistenceAction::UpdateTransactionMeta((block.number, sender))); + + Ok(()) + } + + /// Write execution-related block data to static files. + /// + /// This will then send a command to the db service, that it should write new data, and update + /// the checkpoints for execution and beyond. + fn write_execution_data( + &self, + blocks: Vec, + sender: oneshot::Sender, + ) -> ProviderResult<()> { + if blocks.is_empty() { + return Ok(()) + } + let provider = self.provider.static_file_provider(); + + // NOTE: checked non-empty above + let first_block = blocks.first().unwrap().block(); + let last_block = blocks.last().unwrap().block(); + + let mut receipts_writer = + provider.get_writer(first_block.number, StaticFileSegment::Receipts)?; + for (num, receipts) in blocks + .iter() + .map(|block| (block.block().number, block.execution_outcome().receipts.clone())) + { + debug_assert!(receipts.len() == 1); + // TODO: should we also assert that the receipt is not None here, that means the + // receipt is pruned + for receipt in receipts.first().unwrap().iter().flatten() { + receipts_writer.append_receipt(num, receipt.clone())?; + } + } + + // TODO: do we care about the mpsc error here? + // send a command to the db service to update the checkpoints for execution etc. + let _ = self.database_handle.send_action(PersistenceAction::SaveBlocks((blocks, sender))); + + Ok(()) + } + + /// Removes the blocks above the given block number from static files. Also removes related + /// receipt and header data. + /// + /// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove + /// `block_number`. + /// + /// Returns the block hash for the lowest block removed from the database, which should be + /// the hash for `block_number + 1`. 
+ /// + /// This is meant to be called by the db service, as this should only be done after related data + /// is removed from the database, and checkpoints are updated. + /// + /// Returns the hash of the lowest removed block. + fn remove_blocks_above( + &self, + block_num: u64, + sender: oneshot::Sender, + ) -> ProviderResult<()> { + let provider = self.provider.static_file_provider(); + + // get the writers + let mut _header_writer = provider.get_writer(block_num, StaticFileSegment::Headers)?; + let mut _transactions_writer = + provider.get_writer(block_num, StaticFileSegment::Transactions)?; + let mut _receipts_writer = provider.get_writer(block_num, StaticFileSegment::Receipts)?; + + // TODO: how do we delete s.t. `block_num` is the start? Additionally, do we need to index + // by tx num for the transactions segment? + todo!("implement me") + } +} + +impl StaticFileService +where + DB: Database + 'static, +{ + /// This is the main loop, that will listen to static file actions, and write DB data to static + /// files. + fn run(self) { + // If the receiver errors then senders have disconnected, so the loop should then end. + while let Ok(action) = self.incoming.recv() { + match action { + StaticFileAction::LogTransactions(( + block, + start_tx_number, + td, + response_sender, + )) => { + self.log_transactions(block, start_tx_number, td, response_sender) + .expect("todo: handle errors"); + todo!("implement me") + } + StaticFileAction::RemoveBlocksAbove((block_num, response_sender)) => { + self.remove_blocks_above(block_num, response_sender) + .expect("todo: handle errors"); + todo!("implement me") + } + StaticFileAction::WriteExecutionData((blocks, response_sender)) => { + self.write_execution_data(blocks, response_sender) + .expect("todo: handle errors"); + todo!("implement me") + } + } + } + } +} + +/// A signal to the static file service that some data should be copied from the DB to static files. 
+#[derive(Debug)] +pub enum StaticFileAction { + /// The given block has been added to the canonical chain, its transactions and headers will be + /// persisted for durability. + /// + /// This will then send a command to the db service, that it should update the checkpoints for + /// headers and block bodies. + LogTransactions((Arc, u64, U256, oneshot::Sender<()>)), + + /// Write execution-related block data to static files. + /// + /// This will then send a command to the db service, that it should write new data, and update + /// the checkpoints for execution and beyond. + WriteExecutionData((Vec, oneshot::Sender)), + + /// Removes the blocks above the given block number from static files. Also removes related + /// receipt and header data. + /// + /// This is meant to be called by the db service, as this should only be done after related + /// data is removed from the database, and checkpoints are updated. + /// + /// Returns the hash of the lowest removed block. + RemoveBlocksAbove((u64, oneshot::Sender)), +} + +/// A handle to the static file service +#[derive(Debug, Clone)] +pub struct StaticFileServiceHandle { + /// The channel used to communicate with the static file service + sender: Sender, +} + +impl StaticFileServiceHandle { + /// Create a new [`StaticFileServiceHandle`] from a [`Sender`]. + pub const fn new(sender: Sender) -> Self { + Self { sender } + } + + /// Sends a specific [`StaticFileAction`] in the contained channel. The caller is responsible + /// for creating any channels for the given action. + pub fn send_action(&self, action: StaticFileAction) -> Result<(), SendError> { + self.sender.send(action) + } +}