feat: introduce static file task (#9234)

Dan Cline
2024-07-10 15:30:22 -04:00
committed by GitHub
parent f384b8e175
commit 797a4a1a38
5 changed files with 319 additions and 44 deletions

Cargo.lock generated

@@ -7084,6 +7084,7 @@ dependencies = [
"reth-rpc-types",
"reth-stages",
"reth-stages-api",
"reth-stages-types",
"reth-static-file",
"reth-tasks",
"reth-tokio-util",


@@ -33,6 +33,7 @@ reth-prune.workspace = true
reth-prune-types.workspace = true
reth-revm.workspace = true
reth-rpc-types.workspace = true
reth-stages-types.workspace = true
reth-stages-api.workspace = true
reth-static-file.workspace = true
reth-tasks.workspace = true


@@ -28,6 +28,8 @@ pub mod engine;
pub mod metrics;
/// The background writer task for batch db writes.
pub mod persistence;
/// The background writer task for static file writes.
pub mod static_files;
/// Support for interacting with the blockchain tree.
pub mod tree;


@@ -1,6 +1,6 @@
#![allow(dead_code)]
use crate::tree::ExecutedBlock;
use crate::{static_files::StaticFileServiceHandle, tree::ExecutedBlock};
use reth_db::database::Database;
use reth_errors::ProviderResult;
use reth_primitives::B256;
@@ -9,16 +9,17 @@ use reth_provider::{
ProviderFactory, StageCheckpointWriter, StateWriter,
};
use reth_prune::{PruneProgress, Pruner};
use reth_stages_types::{StageCheckpoint, StageId};
use std::sync::mpsc::{Receiver, SendError, Sender};
use tokio::sync::oneshot;
use tracing::debug;
/// Writes parts of reth's in memory tree state to the database.
///
/// This is meant to be a spawned task that listens for various incoming persistence operations,
/// This is meant to be a spawned service that listens for various incoming persistence operations,
/// performing those actions on disk, and returning the result in a channel.
///
/// There are two types of operations this task can perform:
/// There are two types of operations this service can perform:
/// - Writing executed blocks to disk, returning the hash of the latest block that was inserted.
/// - Removing blocks from disk, returning the removed blocks.
///
@@ -30,18 +31,21 @@ pub struct Persistence<DB> {
provider: ProviderFactory<DB>,
/// Incoming requests to persist stuff
incoming: Receiver<PersistenceAction>,
/// Handle for the static file service.
static_file_handle: StaticFileServiceHandle,
/// The pruner
pruner: Pruner<DB>,
}
impl<DB: Database> Persistence<DB> {
/// Create a new persistence task
/// Create a new persistence service
const fn new(
provider: ProviderFactory<DB>,
incoming: Receiver<PersistenceAction>,
static_file_handle: StaticFileServiceHandle,
pruner: Pruner<DB>,
) -> Self {
Self { provider, incoming, pruner }
Self { provider, incoming, static_file_handle, pruner }
}
/// Writes the cloned tree state to the database
@@ -101,9 +105,18 @@ impl<DB: Database> Persistence<DB> {
Ok(())
}
/// Removes the blocks above the given block number from the database, returning them.
fn remove_blocks_above(&self, _block_number: u64) -> Vec<ExecutedBlock> {
todo!("implement this")
/// Removes block data above the given block number from the database.
/// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove
/// `block_number`.
///
/// Returns the block hash for the lowest block removed from the database, which should be
/// the hash for `block_number + 1`.
///
/// This will then send a command to the static file service, to remove the actual block data.
fn remove_blocks_above(&self, block_number: u64) -> ProviderResult<B256> {
todo!("depends on PR")
// let mut provider_rw = self.provider.provider_rw()?;
// provider_rw.get_or_take_block_and_execution_range(range);
}
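The body above is left as `todo!("depends on PR")`, but the doc comment pins down its contract: unwind everything strictly above `block_number`, return the hash of `block_number + 1`, then tell the static file service to drop its copy of that data. A minimal sketch of that shape, assuming a `block_hash` lookup (e.g. `reth_provider::BlockHashReader`) and leaving the actual range removal to the pending provider call from the commented-out line:

```rust
// Sketch only, not part of this commit; reuses the imports at the top of this file
// (Database, ProviderResult, B256, oneshot).
use reth_provider::BlockHashReader; // assumed trait providing `block_hash`

use crate::static_files::StaticFileAction;

impl<DB: Database> Persistence<DB> {
    fn remove_blocks_above_sketch(&self, block_number: u64) -> ProviderResult<B256> {
        let provider_rw = self.provider.provider_rw()?;

        // Hash of the lowest block that will be removed, i.e. `block_number + 1`.
        let lowest_removed = provider_rw
            .block_hash(block_number + 1)?
            .expect("todo: handle a missing block hash");

        // provider_rw.get_or_take_block_and_execution_range(..); // pending upstream PR
        provider_rw.commit()?;

        // Ask the static file service to remove the matching static file data. The
        // internal receiver is dropped here; a real implementation may want to wait
        // for (or cross-check) the static-file-side result.
        let (tx, _rx) = oneshot::channel();
        let _ = self
            .static_file_handle
            .send_action(StaticFileAction::RemoveBlocksAbove((block_number, tx)));

        Ok(lowest_removed)
    }
}
```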
/// Prunes block data before the given block hash according to the configured prune
@@ -113,10 +126,14 @@ impl<DB: Database> Persistence<DB> {
self.pruner.run(block_num).expect("todo: handle errors")
}
/// Removes static file related data from the database, depending on the current block height in
/// existing static files.
fn clean_static_file_duplicates(&self) {
todo!("implement this")
/// Updates checkpoints related to block headers and bodies. This should be called by the static
/// file service, after new transactions have been successfully written to disk.
fn update_transaction_meta(&self, block_num: u64) -> ProviderResult<()> {
let provider_rw = self.provider.provider_rw()?;
provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block_num))?;
provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block_num))?;
provider_rw.commit()?;
Ok(())
}
}
@@ -124,13 +141,17 @@ impl<DB> Persistence<DB>
where
DB: Database + 'static,
{
/// Create a new persistence task, spawning it, and returning a [`PersistenceHandle`].
fn spawn_new(provider: ProviderFactory<DB>, pruner: Pruner<DB>) -> PersistenceHandle {
/// Create a new persistence service, spawning it, and returning a [`PersistenceHandle`].
fn spawn_new(
provider: ProviderFactory<DB>,
static_file_handle: StaticFileServiceHandle,
pruner: Pruner<DB>,
) -> PersistenceHandle {
let (tx, rx) = std::sync::mpsc::channel();
let task = Self::new(provider, rx, pruner);
let service = Self::new(provider, rx, static_file_handle, pruner);
std::thread::Builder::new()
.name("Persistence Task".to_string())
.spawn(|| task.run())
.name("Persistence Service".to_string())
.spawn(|| service.run())
.unwrap();
PersistenceHandle::new(tx)
@@ -148,7 +169,8 @@ where
while let Ok(action) = self.incoming.recv() {
match action {
PersistenceAction::RemoveBlocksAbove((new_tip_num, sender)) => {
let output = self.remove_blocks_above(new_tip_num);
let output =
self.remove_blocks_above(new_tip_num).expect("todo: handle errors");
// we ignore the error because the caller may or may not care about the result
let _ = sender.send(output);
@@ -169,8 +191,8 @@ where
// we ignore the error because the caller may or may not care about the result
let _ = sender.send(res);
}
PersistenceAction::CleanStaticFileDuplicates(sender) => {
self.clean_static_file_duplicates();
PersistenceAction::UpdateTransactionMeta((block_num, sender)) => {
self.update_transaction_meta(block_num).expect("todo: handle errors");
// we ignore the error because the caller may or may not care about the result
let _ = sender.send(());
@@ -180,29 +202,36 @@ where
}
}
/// A signal to the persistence task that part of the tree state can be persisted.
/// A signal to the persistence service that part of the tree state can be persisted.
#[derive(Debug)]
pub enum PersistenceAction {
/// The section of tree state that should be persisted. These blocks are expected in order of
/// increasing block number.
///
/// This should just store the execution history-related data. Header, transaction, and
/// receipt-related data should already be written to static files.
SaveBlocks((Vec<ExecutedBlock>, oneshot::Sender<B256>)),
/// Removes the blocks above the given block number from the database.
RemoveBlocksAbove((u64, oneshot::Sender<Vec<ExecutedBlock>>)),
/// Updates checkpoints related to block headers and bodies. This should be called by the
/// static file service, after new transactions have been successfully written to disk.
UpdateTransactionMeta((u64, oneshot::Sender<()>)),
/// Removes block data above the given block number from the database.
///
/// This will then send a command to the static file service, to remove the actual block data.
///
/// Returns the block hash for the lowest block removed from the database.
RemoveBlocksAbove((u64, oneshot::Sender<B256>)),
/// Prune associated block data before the given block number, according to already-configured
/// prune modes.
PruneBefore((u64, oneshot::Sender<PruneProgress>)),
/// Trigger a read of static file data, and delete data depending on the highest block in each
/// static file segment.
CleanStaticFileDuplicates(oneshot::Sender<()>),
}
/// A handle to the persistence task
/// A handle to the persistence service
#[derive(Debug, Clone)]
pub struct PersistenceHandle {
/// The channel used to communicate with the persistence task
/// The channel used to communicate with the persistence service
sender: Sender<PersistenceAction>,
}
@@ -221,7 +250,7 @@ impl PersistenceHandle {
self.sender.send(action)
}
/// Tells the persistence task to save a certain list of finalized blocks. The blocks are
/// Tells the persistence service to save a certain list of finalized blocks. The blocks are
/// assumed to be ordered by block number.
///
/// This returns the latest hash that has been saved, allowing removal of that block and any
@@ -234,9 +263,9 @@ impl PersistenceHandle {
rx.await.expect("todo: err handling")
}
/// Tells the persistence task to remove blocks above a certain block number. The removed blocks
/// are returned by the task.
pub async fn remove_blocks_above(&self, block_num: u64) -> Vec<ExecutedBlock> {
/// Tells the persistence service to remove blocks above a certain block number. The removed
/// blocks are returned by the service.
pub async fn remove_blocks_above(&self, block_num: u64) -> B256 {
let (tx, rx) = oneshot::channel();
self.sender
.send(PersistenceAction::RemoveBlocksAbove((block_num, tx)))
@@ -244,7 +273,7 @@ impl PersistenceHandle {
rx.await.expect("todo: err handling")
}
/// Tells the persistence task to remove block data before the given hash, according to the
/// Tells the persistence service to remove block data before the given hash, according to the
/// configured prune config.
pub async fn prune_before(&self, block_num: u64) -> PruneProgress {
let (tx, rx) = oneshot::channel();
@@ -253,14 +282,4 @@ impl PersistenceHandle {
.expect("should be able to send");
rx.await.expect("todo: err handling")
}
/// Tells the persistence task to read static file data, and delete data depending on the
/// highest block in each static file segment.
pub async fn clean_static_file_duplicates(&self) {
let (tx, rx) = oneshot::channel();
self.sender
.send(PersistenceAction::CleanStaticFileDuplicates(tx))
.expect("should be able to send");
rx.await.expect("todo: err handling")
}
}
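For orientation, a small usage sketch of the handle as it stands after this diff. The `save_blocks` signature is not visible in the hunks above and is inferred from the `SaveBlocks` action, so treat it as an assumption:

```rust
use reth_primitives::B256;

use crate::{persistence::PersistenceHandle, tree::ExecutedBlock};

/// Sketch: persist a batch of executed blocks, then unwind above a new tip.
async fn example_flow(handle: PersistenceHandle, blocks: Vec<ExecutedBlock>) -> B256 {
    // Save the finalized blocks; the returned hash is the latest block written.
    // (`save_blocks(Vec<ExecutedBlock>) -> B256` is inferred, not shown verbatim above.)
    let latest = handle.save_blocks(blocks).await;

    // Unwind everything above block 100. With the new `RemoveBlocksAbove` semantics this
    // returns the hash of the lowest removed block, i.e. block 101's hash.
    let _lowest_removed = handle.remove_blocks_above(100).await;

    latest
}
```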


@@ -0,0 +1,252 @@
#![allow(dead_code)]
use reth_db::database::Database;
use reth_errors::ProviderResult;
use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256, U256};
use reth_provider::{ProviderFactory, StaticFileProviderFactory, StaticFileWriter};
use reth_prune::PruneModes;
use std::sync::{
mpsc::{Receiver, SendError, Sender},
Arc,
};
use tokio::sync::oneshot;
use crate::{
persistence::{PersistenceAction, PersistenceHandle},
tree::ExecutedBlock,
};
/// Writes finalized blocks to reth's static files.
///
/// This is meant to be a spawned service that listens for various incoming finalization operations,
/// and writes to or produces new static files.
///
/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs
/// blocking file operations in an endless loop.
#[derive(Debug)]
pub struct StaticFileService<DB> {
/// The db / static file provider to use
provider: ProviderFactory<DB>,
/// Handle for the database service
database_handle: PersistenceHandle,
/// Incoming requests to write static files
incoming: Receiver<StaticFileAction>,
/// The pruning configuration
pruning: PruneModes,
}
impl<DB> StaticFileService<DB>
where
DB: Database + 'static,
{
/// Create a new static file service, spawning it, and returning a [`StaticFileServiceHandle`].
fn spawn_new(provider: ProviderFactory<DB>) -> StaticFileServiceHandle {
todo!("implement initialization first");
// let (tx, rx) = std::sync::mpsc::channel();
// let service = Self::new(provider, rx);
// std::thread::Builder::new()
// .name("StaticFile Service".to_string())
// .spawn(|| service.run())
// .unwrap();
// StaticFileServiceHandle::new(tx)
}
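The spawn path is stubbed out because the two services need each other's handles before either can be constructed. A sketch of what the commented-out code above could become, assuming the persistence handle (or both channels) is created first and injected; field names follow the struct definition above:

```rust
// Sketch only, not part of this commit.
fn spawn_new_sketch<DB: Database + 'static>(
    provider: ProviderFactory<DB>,
    database_handle: PersistenceHandle,
    pruning: PruneModes,
) -> StaticFileServiceHandle {
    let (tx, rx) = std::sync::mpsc::channel();

    // Fill the fields declared on `StaticFileService` directly; a real `new`
    // constructor would do the same.
    let service = StaticFileService { provider, database_handle, incoming: rx, pruning };

    std::thread::Builder::new()
        .name("StaticFile Service".to_string())
        .spawn(|| service.run())
        .unwrap();

    StaticFileServiceHandle::new(tx)
}
```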
// TODO: some of these inputs exist only to make the underlying static file writes work,
// namely the starting tx number and the total difficulty. They require either additional
// in-memory data or a db lookup. Maybe we can use a db read here instead.
/// Writes the transactions to static files, to act as a log.
///
/// This will then send a command to the db service, that it should update the checkpoints for
/// headers and block bodies.
fn log_transactions(
&self,
block: Arc<SealedBlock>,
start_tx_number: u64,
td: U256,
sender: oneshot::Sender<()>,
) -> ProviderResult<()> {
let provider = self.provider.static_file_provider();
let mut header_writer = provider.get_writer(block.number, StaticFileSegment::Headers)?;
let mut transactions_writer =
provider.get_writer(block.number, StaticFileSegment::Transactions)?;
// TODO: does to_compact require ownership?
header_writer.append_header(block.header().clone(), td, block.hash())?;
let no_hash_transactions =
block.body.clone().into_iter().map(TransactionSignedNoHash::from);
let mut tx_number = start_tx_number;
for tx in no_hash_transactions {
transactions_writer.append_transaction(tx_number, tx)?;
tx_number += 1;
}
// increment block for both segments
header_writer.increment_block(StaticFileSegment::Headers, block.number)?;
transactions_writer.increment_block(StaticFileSegment::Transactions, block.number)?;
// finally commit
header_writer.commit()?;
transactions_writer.commit()?;
// TODO: do we care about the mpsc error here?
// send a command to the db service to update the checkpoints for headers / bodies
let _ = self
.database_handle
.send_action(PersistenceAction::UpdateTransactionMeta((block.number, sender)));
Ok(())
}
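Note the cross-service detail here: the `oneshot::Sender<()>` passed to `log_transactions` is not completed by this service; it is forwarded through `UpdateTransactionMeta`, so the caller's receiver only resolves after the persistence service has committed the Headers/Bodies checkpoints. A hedged caller-side sketch of that intended flow:

```rust
use std::sync::Arc;

use reth_primitives::{SealedBlock, U256};
use tokio::sync::oneshot;

use crate::static_files::{StaticFileAction, StaticFileServiceHandle};

/// Sketch: ask the static file service to log a block's header and transactions,
/// then wait until the checkpoint update has landed in the database.
async fn log_block(
    handle: &StaticFileServiceHandle,
    block: Arc<SealedBlock>,
    start_tx_number: u64,
    td: U256,
) {
    let (tx, rx) = oneshot::channel();
    handle
        .send_action(StaticFileAction::LogTransactions((block, start_tx_number, td, tx)))
        .expect("static file service should be running");

    // Resolves once the persistence service has saved the Headers/Bodies checkpoints.
    rx.await.expect("todo: err handling");
}
```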
/// Write execution-related block data to static files.
///
/// This will then send a command to the db service, that it should write new data, and update
/// the checkpoints for execution and beyond.
fn write_execution_data(
&self,
blocks: Vec<ExecutedBlock>,
sender: oneshot::Sender<B256>,
) -> ProviderResult<()> {
if blocks.is_empty() {
return Ok(())
}
let provider = self.provider.static_file_provider();
// NOTE: checked non-empty above
let first_block = blocks.first().unwrap().block();
let last_block = blocks.last().unwrap().block();
let mut receipts_writer =
provider.get_writer(first_block.number, StaticFileSegment::Receipts)?;
for (num, receipts) in blocks
.iter()
.map(|block| (block.block().number, block.execution_outcome().receipts.clone()))
{
debug_assert!(receipts.len() == 1);
// TODO: should we also assert that the receipt is not None here? A None receipt means
// the receipt was pruned
for receipt in receipts.first().unwrap().iter().flatten() {
receipts_writer.append_receipt(num, receipt.clone())?;
}
}
// TODO: do we care about the mpsc error here?
// send a command to the db service to update the checkpoints for execution etc.
let _ = self.database_handle.send_action(PersistenceAction::SaveBlocks((blocks, sender)));
Ok(())
}
/// Removes the blocks above the given block number from static files. Also removes related
/// receipt and header data.
///
/// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove
/// `block_number`.
///
/// Returns the block hash for the lowest block removed from the database, which should be
/// the hash for `block_number + 1`.
///
/// This is meant to be called by the db service, as this should only be done after related data
/// is removed from the database, and checkpoints are updated.
///
/// Returns the hash of the lowest removed block.
fn remove_blocks_above(
&self,
block_num: u64,
sender: oneshot::Sender<B256>,
) -> ProviderResult<()> {
let provider = self.provider.static_file_provider();
// get the writers
let mut _header_writer = provider.get_writer(block_num, StaticFileSegment::Headers)?;
let mut _transactions_writer =
provider.get_writer(block_num, StaticFileSegment::Transactions)?;
let mut _receipts_writer = provider.get_writer(block_num, StaticFileSegment::Receipts)?;
// TODO: how do we delete s.t. `block_num` is the start? Additionally, do we need to index
// by tx num for the transactions segment?
todo!("implement me")
}
}
impl<DB> StaticFileService<DB>
where
DB: Database + 'static,
{
/// This is the main loop that listens for static file actions and writes DB data to static
/// files.
fn run(self) {
// If the receiver errors then senders have disconnected, so the loop should then end.
while let Ok(action) = self.incoming.recv() {
match action {
StaticFileAction::LogTransactions((
block,
start_tx_number,
td,
response_sender,
)) => {
self.log_transactions(block, start_tx_number, td, response_sender)
.expect("todo: handle errors");
todo!("implement me")
}
StaticFileAction::RemoveBlocksAbove((block_num, response_sender)) => {
self.remove_blocks_above(block_num, response_sender)
.expect("todo: handle errors");
todo!("implement me")
}
StaticFileAction::WriteExecutionData((blocks, response_sender)) => {
self.write_execution_data(blocks, response_sender)
.expect("todo: handle errors");
todo!("implement me")
}
}
}
}
}
/// A signal to the static file service that some data should be copied from the DB to static files.
#[derive(Debug)]
pub enum StaticFileAction {
/// The given block has been added to the canonical chain; its transactions and headers will be
/// persisted for durability.
///
/// This will then send a command to the db service, that it should update the checkpoints for
/// headers and block bodies.
LogTransactions((Arc<SealedBlock>, u64, U256, oneshot::Sender<()>)),
/// Write execution-related block data to static files.
///
/// This will then send a command to the db service, that it should write new data, and update
/// the checkpoints for execution and beyond.
WriteExecutionData((Vec<ExecutedBlock>, oneshot::Sender<B256>)),
/// Removes the blocks above the given block number from static files. Also removes related
/// receipt and header data.
///
/// This is meant to be called by the db service, as this should only be done after related
/// data is removed from the database, and checkpoints are updated.
///
/// Returns the hash of the lowest removed block.
RemoveBlocksAbove((u64, oneshot::Sender<B256>)),
}
/// A handle to the static file service
#[derive(Debug, Clone)]
pub struct StaticFileServiceHandle {
/// The channel used to communicate with the static file service
sender: Sender<StaticFileAction>,
}
impl StaticFileServiceHandle {
/// Create a new [`StaticFileServiceHandle`] from a [`Sender<StaticFileAction>`].
pub const fn new(sender: Sender<StaticFileAction>) -> Self {
Self { sender }
}
/// Sends a specific [`StaticFileAction`] in the contained channel. The caller is responsible
/// for creating any channels for the given action.
pub fn send_action(&self, action: StaticFileAction) -> Result<(), SendError<StaticFileAction>> {
self.sender.send(action)
}
}
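Finally, a hedged sketch of driving the handle for execution data. The design choice worth noting from `write_execution_data` above: the `B256` result is produced by the persistence service, because the same oneshot sender is forwarded through `PersistenceAction::SaveBlocks` once the receipts have been written to static files:

```rust
use reth_primitives::B256;
use tokio::sync::oneshot;

use crate::{
    static_files::{StaticFileAction, StaticFileServiceHandle},
    tree::ExecutedBlock,
};

/// Sketch: write receipts for a batch of blocks to static files, then let the
/// persistence service finish the database writes and report the latest hash.
async fn write_blocks(handle: &StaticFileServiceHandle, blocks: Vec<ExecutedBlock>) -> B256 {
    let (tx, rx) = oneshot::channel();
    handle
        .send_action(StaticFileAction::WriteExecutionData((blocks, tx)))
        .expect("static file service should be running");

    // The hash of the latest saved block arrives once `SaveBlocks` completes.
    rx.await.expect("todo: err handling")
}
```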