mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat(snapshot, prune): highest snapshots tracker (#4721)
This commit is contained in:
@ -17,8 +17,12 @@ reth-db.workspace = true
|
||||
reth-provider.workspace = true
|
||||
reth-interfaces.workspace = true
|
||||
|
||||
# async
|
||||
tokio = { workspace = true, features = ["sync"] }
|
||||
|
||||
# misc
|
||||
thiserror.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
# reth
|
||||
|
||||
@ -13,4 +13,7 @@ mod error;
|
||||
mod snapshotter;
|
||||
|
||||
pub use error::SnapshotterError;
|
||||
pub use snapshotter::{SnapshotTargets, Snapshotter, SnapshotterResult, SnapshotterWithResult};
|
||||
pub use snapshotter::{
|
||||
HighestSnapshots, HighestSnapshotsTracker, SnapshotTargets, Snapshotter, SnapshotterResult,
|
||||
SnapshotterWithResult,
|
||||
};
|
||||
|
||||
@ -6,6 +6,8 @@ use reth_interfaces::{RethError, RethResult};
|
||||
use reth_primitives::{BlockNumber, ChainSpec, TxNumber};
|
||||
use reth_provider::{BlockReader, DatabaseProviderRO, ProviderFactory};
|
||||
use std::{collections::HashMap, ops::RangeInclusive, sync::Arc};
|
||||
use tokio::sync::watch;
|
||||
use tracing::warn;
|
||||
|
||||
/// Result of [Snapshotter::run] execution.
|
||||
pub type SnapshotterResult = Result<SnapshotTargets, SnapshotterError>;
|
||||
@ -18,16 +20,26 @@ pub type SnapshotterWithResult<DB> = (Snapshotter<DB>, SnapshotterResult);
|
||||
pub struct Snapshotter<DB> {
|
||||
provider_factory: ProviderFactory<DB>,
|
||||
highest_snapshots: HighestSnapshots,
|
||||
highest_snapshots_tracker: watch::Sender<Option<HighestSnapshots>>,
|
||||
/// Block interval after which the snapshot is taken.
|
||||
block_interval: u64,
|
||||
}
|
||||
|
||||
/// Tracker for the latest [`HighestSnapshots`] value.
|
||||
pub type HighestSnapshotsTracker = watch::Receiver<Option<HighestSnapshots>>;
|
||||
|
||||
/// Highest snapshotted block numbers, per data part.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
struct HighestSnapshots {
|
||||
headers: Option<BlockNumber>,
|
||||
receipts: Option<BlockNumber>,
|
||||
transactions: Option<BlockNumber>,
|
||||
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
|
||||
pub struct HighestSnapshots {
|
||||
/// Highest snapshotted block of headers, inclusive.
|
||||
/// If [`None`], no snapshot is available.
|
||||
pub headers: Option<BlockNumber>,
|
||||
/// Highest snapshotted block of receipts, inclusive.
|
||||
/// If [`None`], no snapshot is available.
|
||||
pub receipts: Option<BlockNumber>,
|
||||
/// Highest snapshotted block of transactions, inclusive.
|
||||
/// If [`None`], no snapshot is available.
|
||||
pub transactions: Option<BlockNumber>,
|
||||
}
|
||||
|
||||
/// Snapshot targets, per data part, measured in [`BlockNumber`] and [`TxNumber`], if applicable.
|
||||
@ -45,6 +57,7 @@ impl SnapshotTargets {
|
||||
}
|
||||
|
||||
/// Returns `true` if all targets are either [`None`] or multiple of `block_interval`.
|
||||
#[cfg(debug_assertions)]
|
||||
fn is_multiple_of_block_interval(&self, block_interval: u64) -> bool {
|
||||
[
|
||||
self.headers.as_ref(),
|
||||
@ -57,6 +70,7 @@ impl SnapshotTargets {
|
||||
|
||||
// Returns `true` if all targets are either [`None`] or has beginning of the range equal to the
|
||||
// highest snapshot.
|
||||
#[cfg(debug_assertions)]
|
||||
fn is_contiguous_to_highest_snapshots(&self, snapshots: HighestSnapshots) -> bool {
|
||||
[
|
||||
(self.headers.as_ref(), snapshots.headers),
|
||||
@ -76,17 +90,23 @@ impl SnapshotTargets {
|
||||
|
||||
impl<DB: Database> Snapshotter<DB> {
|
||||
/// Creates a new [Snapshotter].
|
||||
pub fn new(db: DB, chain_spec: Arc<ChainSpec>, block_interval: u64) -> Self {
|
||||
Self {
|
||||
pub fn new(
|
||||
db: DB,
|
||||
chain_spec: Arc<ChainSpec>,
|
||||
block_interval: u64,
|
||||
highest_snapshots_tracker: watch::Sender<Option<HighestSnapshots>>,
|
||||
) -> Self {
|
||||
let snapshotter = Self {
|
||||
provider_factory: ProviderFactory::new(db, chain_spec),
|
||||
// TODO(alexey): fill from on-disk snapshot data
|
||||
highest_snapshots: HighestSnapshots {
|
||||
headers: None,
|
||||
receipts: None,
|
||||
transactions: None,
|
||||
},
|
||||
highest_snapshots: HighestSnapshots::default(),
|
||||
highest_snapshots_tracker,
|
||||
block_interval,
|
||||
}
|
||||
};
|
||||
|
||||
snapshotter.update_highest_snapshots_tracker();
|
||||
|
||||
snapshotter
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@ -102,6 +122,12 @@ impl<DB: Database> Snapshotter<DB> {
|
||||
}
|
||||
}
|
||||
|
||||
fn update_highest_snapshots_tracker(&self) {
|
||||
let _ = self.highest_snapshots_tracker.send(Some(self.highest_snapshots)).map_err(|_| {
|
||||
warn!(target: "snapshot", "Highest snapshots channel closed");
|
||||
});
|
||||
}
|
||||
|
||||
/// Run the snapshotter
|
||||
pub fn run(&mut self, targets: SnapshotTargets) -> SnapshotterResult {
|
||||
debug_assert!(targets.is_multiple_of_block_interval(self.block_interval));
|
||||
@ -109,6 +135,8 @@ impl<DB: Database> Snapshotter<DB> {
|
||||
|
||||
// TODO(alexey): snapshot logic
|
||||
|
||||
self.update_highest_snapshots_tracker();
|
||||
|
||||
Ok(targets)
|
||||
}
|
||||
|
||||
@ -214,7 +242,7 @@ impl<DB: Database> Snapshotter<DB> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{snapshotter::SnapshotTargets, Snapshotter};
|
||||
use crate::{snapshotter::SnapshotTargets, HighestSnapshots, Snapshotter};
|
||||
use assert_matches::assert_matches;
|
||||
use reth_interfaces::{
|
||||
test_utils::{generators, generators::random_block_range},
|
||||
@ -222,6 +250,18 @@ mod tests {
|
||||
};
|
||||
use reth_primitives::{B256, MAINNET};
|
||||
use reth_stages::test_utils::TestTransaction;
|
||||
use tokio::sync::watch;
|
||||
|
||||
#[test]
|
||||
fn new() {
|
||||
let tx = TestTransaction::default();
|
||||
|
||||
let (highest_snapshots_tx, highest_snapshots_rx) = watch::channel(None);
|
||||
assert_eq!(*highest_snapshots_rx.borrow(), None);
|
||||
|
||||
Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2, highest_snapshots_tx);
|
||||
assert_eq!(*highest_snapshots_rx.borrow(), Some(HighestSnapshots::default()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn get_snapshot_targets() {
|
||||
@ -231,7 +271,8 @@ mod tests {
|
||||
let blocks = random_block_range(&mut rng, 0..=3, B256::ZERO, 2..3);
|
||||
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");
|
||||
|
||||
let mut snapshotter = Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2);
|
||||
let mut snapshotter =
|
||||
Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2, watch::channel(None).0);
|
||||
|
||||
// Snapshot targets has data per part up to the passed finalized block number,
|
||||
// respecting the block interval
|
||||
|
||||
Reference in New Issue
Block a user