feat: tree hook for persisting blocks (#9365)

This commit is contained in:
Federico Gimenez
2024-07-11 19:46:05 +02:00
committed by GitHub
parent add3725f83
commit 11c5e3121d
4 changed files with 219 additions and 7 deletions

1
Cargo.lock generated
View File

@ -7071,6 +7071,7 @@ dependencies = [
"reth-errors",
"reth-ethereum-consensus",
"reth-evm",
"reth-exex-types",
"reth-metrics",
"reth-network-p2p",
"reth-payload-builder",

View File

@ -63,7 +63,9 @@ reth-tracing = { workspace = true, optional = true }
[dev-dependencies]
# reth
reth-db = { workspace = true, features = ["test-utils"] }
reth-exex-types.workspace = true
reth-network-p2p = { workspace = true, features = ["test-utils"] }
reth-prune.workspace = true
reth-prune-types.workspace = true
reth-stages = { workspace = true, features = ["test-utils"] }
reth-tracing.workspace = true

View File

@ -229,7 +229,7 @@ pub enum PersistenceAction {
}
/// A handle to the persistence service
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct PersistenceHandle {
/// The channel used to communicate with the persistence service
sender: Sender<PersistenceAction>,
@ -254,13 +254,16 @@ impl PersistenceHandle {
/// assumed to be ordered by block number.
///
/// This returns the latest hash that has been saved, allowing removal of that block and any
/// previous blocks from in-memory data structures.
pub async fn save_blocks(&self, blocks: Vec<ExecutedBlock>) -> B256 {
let (tx, rx) = oneshot::channel();
/// previous blocks from in-memory data structures. This value is returned in the receiver end
/// of the sender argument.
pub fn save_blocks(&self, blocks: Vec<ExecutedBlock>, tx: oneshot::Sender<B256>) {
if blocks.is_empty() {
let _ = tx.send(B256::default());
return;
}
self.sender
.send(PersistenceAction::SaveBlocks((blocks, tx)))
.expect("should be able to send");
rx.await.expect("todo: err handling")
}
/// Tells the persistence service to remove blocks above a certain block number. The removed
@ -283,3 +286,86 @@ impl PersistenceHandle {
rx.await.expect("todo: err handling")
}
}
#[cfg(test)]
mod tests {
use super::*;
use reth_chainspec::MAINNET;
use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir};
use reth_exex_types::FinishedExExHeight;
use reth_primitives::{
Address, Block, Receipts, Requests, SealedBlockWithSenders, TransactionSigned, B256,
};
use reth_provider::{providers::StaticFileProvider, ExecutionOutcome, ProviderFactory};
use reth_prune::Pruner;
use reth_trie::{updates::TrieUpdates, HashedPostState};
use revm::db::BundleState;
use std::sync::{mpsc::channel, Arc};
fn default_persistence_handle() -> PersistenceHandle {
let db = create_test_rw_db();
let (_static_dir, static_dir_path) = create_test_static_files_dir();
let provider = ProviderFactory::new(
db,
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
);
let (finished_exex_height_tx, finished_exex_height_rx) =
tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
let pruner = Pruner::new(provider.clone(), vec![], 5, 0, 5, None, finished_exex_height_rx);
let (static_file_sender, _static_file_receiver) = channel();
let static_file_handle = StaticFileServiceHandle::new(static_file_sender);
Persistence::spawn_new(provider, static_file_handle, pruner)
}
#[tokio::test]
async fn test_save_blocks_empty() {
let persistence_handle = default_persistence_handle();
let blocks = vec![];
let (tx, rx) = oneshot::channel();
persistence_handle.save_blocks(blocks, tx);
let hash = rx.await.unwrap();
assert_eq!(hash, B256::default());
}
#[tokio::test]
async fn test_save_blocks_single_block() {
let persistence_handle = default_persistence_handle();
let mut block = Block::default();
let sender = Address::random();
let tx = TransactionSigned::default();
block.body.push(tx);
let block_hash = block.hash_slow();
let block_number = block.number;
let sealed = block.seal_slow();
let sealed_with_senders =
SealedBlockWithSenders::new(sealed.clone(), vec![sender]).unwrap();
let executed = ExecutedBlock::new(
Arc::new(sealed),
Arc::new(sealed_with_senders.senders),
Arc::new(ExecutionOutcome::new(
BundleState::default(),
Receipts { receipt_vec: vec![] },
block_number,
vec![Requests::default()],
)),
Arc::new(HashedPostState::default()),
Arc::new(TrieUpdates::default()),
);
let blocks = vec![executed];
let (tx, rx) = oneshot::channel();
persistence_handle.save_blocks(blocks, tx);
let actual_hash = rx.await.unwrap();
assert_eq!(block_hash, actual_hash);
}
}

View File

@ -2,6 +2,7 @@ use crate::{
backfill::BackfillAction,
chain::FromOrchestrator,
engine::{DownloadRequest, EngineApiEvent, FromEngine},
persistence::PersistenceHandle,
};
use reth_beacon_consensus::{
BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated,
@ -37,12 +38,15 @@ use std::{
marker::PhantomData,
sync::{mpsc::Receiver, Arc},
};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tracing::*;
mod memory_overlay;
pub use memory_overlay::MemoryOverlayStateProvider;
/// Maximum number of blocks to be kept in memory without triggering persistence.
const PERSISTENCE_THRESHOLD: u64 = 256;
/// Represents an executed block stored in-memory.
#[derive(Clone, Debug)]
pub struct ExecutedBlock {
@ -54,6 +58,16 @@ pub struct ExecutedBlock {
}
impl ExecutedBlock {
pub(crate) const fn new(
block: Arc<SealedBlock>,
senders: Arc<Vec<Address>>,
execution_output: Arc<ExecutionOutcome>,
hashed_state: Arc<HashedPostState>,
trie: Arc<TrieUpdates>,
) -> Self {
Self { block, senders, execution_output, hashed_state, trie }
}
/// Returns a reference to the executed block.
pub(crate) fn block(&self) -> &SealedBlock {
&self.block
@ -120,6 +134,11 @@ impl TreeState {
}
}
}
/// Returns the maximum block number stored.
pub(crate) fn max_block_number(&self) -> BlockNumber {
*self.blocks_by_number.last_key_value().unwrap_or((&BlockNumber::default(), &vec![])).0
}
}
/// Tracks the state of the engine api internals.
@ -129,7 +148,7 @@ impl TreeState {
pub struct EngineApiTreeState {
/// Tracks the state of the blockchain tree.
tree_state: TreeState,
/// Tracks the received forkchoice state updates received by the CL.
/// Tracks the forkchoice state updates received by the CL.
forkchoice_state_tracker: ForkchoiceStateTracker,
/// Buffer of detached blocks.
buffer: BlockBuffer,
@ -242,6 +261,8 @@ pub struct EngineApiTreeHandlerImpl<P, E, T: EngineTypes> {
state: EngineApiTreeState,
incoming: Receiver<FromEngine<BeaconEngineMessage<T>>>,
outgoing: UnboundedSender<EngineApiEvent>,
persistence: PersistenceHandle,
persistence_state: PersistenceState,
/// (tmp) The flag indicating whether the pipeline is active.
is_pipeline_active: bool,
_marker: PhantomData<T>,
@ -262,6 +283,7 @@ where
incoming: Receiver<FromEngine<BeaconEngineMessage<T>>>,
outgoing: UnboundedSender<EngineApiEvent>,
state: EngineApiTreeState,
persistence: PersistenceHandle,
) -> Self {
Self {
provider,
@ -270,6 +292,8 @@ where
payload_validator,
incoming,
outgoing,
persistence,
persistence_state: PersistenceState::default(),
is_pipeline_active: false,
state,
_marker: PhantomData,
@ -284,6 +308,7 @@ where
payload_validator: ExecutionPayloadValidator,
incoming: Receiver<FromEngine<BeaconEngineMessage<T>>>,
state: EngineApiTreeState,
persistence: PersistenceHandle,
) -> UnboundedSender<EngineApiEvent> {
let (outgoing, rx) = tokio::sync::mpsc::unbounded_channel();
let task = Self::new(
@ -294,6 +319,7 @@ where
incoming,
outgoing.clone(),
state,
persistence,
);
std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
outgoing
@ -342,6 +368,71 @@ where
}
}
}
if self.should_persist() && !self.persistence_state.in_progress() {
let blocks_to_persist = self.get_blocks_to_persist();
let (tx, rx) = oneshot::channel();
self.persistence.save_blocks(blocks_to_persist, tx);
self.persistence_state.start(rx);
}
if self.persistence_state.in_progress() {
let rx = self
.persistence_state
.rx
.as_mut()
.expect("if a persistence task is in progress Receiver must be Some");
// Check if persistence has completed
if let Ok(last_persisted_block_hash) = rx.try_recv() {
if let Some(block) =
self.state.tree_state.block_by_hash(last_persisted_block_hash)
{
self.persistence_state.finish(last_persisted_block_hash, block.number);
self.remove_persisted_blocks_from_memory();
} else {
error!("could not find persisted block with hash {last_persisted_block_hash} in memory");
}
}
}
}
}
/// Returns true if the canonical chain length minus the last persisted
/// block is more than the persistence threshold.
fn should_persist(&self) -> bool {
self.state.tree_state.max_block_number() -
self.persistence_state.last_persisted_block_number >
PERSISTENCE_THRESHOLD
}
fn get_blocks_to_persist(&self) -> Vec<ExecutedBlock> {
let start = self.persistence_state.last_persisted_block_number + 1;
let end = start + PERSISTENCE_THRESHOLD;
self.state
.tree_state
.blocks_by_number
.range(start..end)
.flat_map(|(_, blocks)| blocks.iter().cloned())
.collect()
}
fn remove_persisted_blocks_from_memory(&mut self) {
let keys_to_remove: Vec<BlockNumber> = self
.state
.tree_state
.blocks_by_number
.range(..=self.persistence_state.last_persisted_block_number)
.map(|(&k, _)| k)
.collect();
for key in keys_to_remove {
if let Some(blocks) = self.state.tree_state.blocks_by_number.remove(&key) {
// Remove corresponding blocks from blocks_by_hash
for block in blocks {
self.state.tree_state.blocks_by_hash.remove(&block.block().hash());
}
}
}
}
@ -755,3 +846,35 @@ where
todo!()
}
}
/// The state of the persistence task.
#[derive(Default, Debug)]
struct PersistenceState {
/// Hash of the last block persisted.
last_persisted_block_hash: B256,
/// Receiver end of channel where the result of the persistence task will be
/// sent when done. A None value means there's no persistence task in progress.
rx: Option<oneshot::Receiver<B256>>,
/// The last persisted block number.
last_persisted_block_number: u64,
}
impl PersistenceState {
/// Determines if there is a persistence task in progress by checking if the
/// receiver is set.
const fn in_progress(&self) -> bool {
self.rx.is_some()
}
/// Sets state for a started persistence task.
fn start(&mut self, rx: oneshot::Receiver<B256>) {
self.rx = Some(rx);
}
/// Sets state for a finished persistence task.
fn finish(&mut self, last_persisted_block_hash: B256, last_persisted_block_number: u64) {
self.rx = None;
self.last_persisted_block_number = last_persisted_block_number;
self.last_persisted_block_hash = last_persisted_block_hash;
}
}