mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat(engine, snapshot): snapshot hook (#4690)
This commit is contained in:
14
Cargo.lock
generated
14
Cargo.lock
generated
@ -5523,6 +5523,7 @@ dependencies = [
|
||||
"reth-revm",
|
||||
"reth-rpc-types",
|
||||
"reth-rpc-types-compat",
|
||||
"reth-snapshot",
|
||||
"reth-stages",
|
||||
"reth-tasks",
|
||||
"reth-tracing",
|
||||
@ -6289,6 +6290,19 @@ dependencies = [
|
||||
"reth-rpc-types",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "reth-snapshot"
|
||||
version = "0.1.0-alpha.10"
|
||||
dependencies = [
|
||||
"assert_matches",
|
||||
"reth-db",
|
||||
"reth-interfaces",
|
||||
"reth-primitives",
|
||||
"reth-provider",
|
||||
"reth-stages",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "reth-stages"
|
||||
version = "0.1.0-alpha.10"
|
||||
|
||||
@ -32,6 +32,7 @@ members = [
|
||||
"crates/rpc/rpc-engine-api",
|
||||
"crates/rpc/rpc-types",
|
||||
"crates/rpc/rpc-testing-util",
|
||||
"crates/snapshot",
|
||||
"crates/stages",
|
||||
"crates/storage/codecs",
|
||||
"crates/storage/db",
|
||||
|
||||
@ -19,6 +19,7 @@ reth-rpc-types.workspace = true
|
||||
reth-tasks.workspace = true
|
||||
reth-payload-builder.workspace = true
|
||||
reth-prune = { path = "../../prune" }
|
||||
reth-snapshot = { path = "../../snapshot" }
|
||||
reth-rpc-types-compat.workspace = true
|
||||
# async
|
||||
tokio = { workspace = true, features = ["sync"] }
|
||||
|
||||
@ -54,7 +54,7 @@ impl EngineHooksController {
|
||||
) -> Poll<Result<PolledHook, EngineHookError>> {
|
||||
let Some(mut hook) = self.running_hook_with_db_write.take() else { return Poll::Pending };
|
||||
|
||||
match hook.poll(cx, args) {
|
||||
match hook.poll(cx, args)? {
|
||||
Poll::Ready((event, action)) => {
|
||||
let result = PolledHook { event, action, db_access_level: hook.db_access_level() };
|
||||
|
||||
@ -109,7 +109,7 @@ impl EngineHooksController {
|
||||
return Poll::Pending
|
||||
}
|
||||
|
||||
if let Poll::Ready((event, action)) = hook.poll(cx, args) {
|
||||
if let Poll::Ready((event, action)) = hook.poll(cx, args)? {
|
||||
let result = PolledHook { event, action, db_access_level: hook.db_access_level() };
|
||||
|
||||
debug!(
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
use reth_interfaces::RethError;
|
||||
use reth_interfaces::{RethError, RethResult};
|
||||
use reth_primitives::BlockNumber;
|
||||
use std::{
|
||||
fmt,
|
||||
@ -11,6 +11,9 @@ pub(crate) use controller::{EngineHooksController, PolledHook};
|
||||
mod prune;
|
||||
pub use prune::PruneHook;
|
||||
|
||||
mod snapshot;
|
||||
pub use snapshot::SnapshotHook;
|
||||
|
||||
/// Collection of [engine hooks][`EngineHook`].
|
||||
#[derive(Default)]
|
||||
pub struct EngineHooks {
|
||||
@ -47,7 +50,7 @@ pub trait EngineHook: Send + Sync + 'static {
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
ctx: EngineContext,
|
||||
) -> Poll<(EngineHookEvent, Option<EngineHookAction>)>;
|
||||
) -> Poll<RethResult<(EngineHookEvent, Option<EngineHookAction>)>>;
|
||||
|
||||
/// Returns [db access level][`EngineHookDBAccessLevel`] the hook needs.
|
||||
fn db_access_level(&self) -> EngineHookDBAccessLevel;
|
||||
@ -58,6 +61,8 @@ pub trait EngineHook: Send + Sync + 'static {
|
||||
pub struct EngineContext {
|
||||
/// Tip block number.
|
||||
pub tip_block_number: BlockNumber,
|
||||
/// Finalized block number, if known.
|
||||
pub finalized_block_number: Option<BlockNumber>,
|
||||
}
|
||||
|
||||
/// An event emitted when [hook][`EngineHook`] is polled.
|
||||
|
||||
@ -9,7 +9,7 @@ use crate::{
|
||||
use futures::FutureExt;
|
||||
use metrics::Counter;
|
||||
use reth_db::database::Database;
|
||||
use reth_interfaces::RethError;
|
||||
use reth_interfaces::{RethError, RethResult};
|
||||
use reth_primitives::BlockNumber;
|
||||
use reth_prune::{Pruner, PrunerError, PrunerWithResult};
|
||||
use reth_tasks::TaskSpawner;
|
||||
@ -55,7 +55,7 @@ impl<DB: Database + 'static> PruneHook<DB> {
|
||||
fn poll_pruner(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<(EngineHookEvent, Option<EngineHookAction>)> {
|
||||
) -> Poll<RethResult<(EngineHookEvent, Option<EngineHookAction>)>> {
|
||||
let result = match self.pruner_state {
|
||||
PrunerState::Idle(_) => return Poll::Pending,
|
||||
PrunerState::Running(ref mut fut) => {
|
||||
@ -69,14 +69,7 @@ impl<DB: Database + 'static> PruneHook<DB> {
|
||||
|
||||
match result {
|
||||
Ok(_) => EngineHookEvent::Finished(Ok(())),
|
||||
Err(err) => EngineHookEvent::Finished(Err(match err {
|
||||
PrunerError::PrunePart(_) | PrunerError::InconsistentData(_) => {
|
||||
EngineHookError::Internal(Box::new(err))
|
||||
}
|
||||
PrunerError::Interface(err) => err.into(),
|
||||
PrunerError::Database(err) => RethError::Database(err).into(),
|
||||
PrunerError::Provider(err) => RethError::Provider(err).into(),
|
||||
})),
|
||||
Err(err) => EngineHookEvent::Finished(Err(err.into())),
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
@ -85,14 +78,15 @@ impl<DB: Database + 'static> PruneHook<DB> {
|
||||
}
|
||||
};
|
||||
|
||||
Poll::Ready((event, None))
|
||||
Poll::Ready(Ok((event, None)))
|
||||
}
|
||||
|
||||
/// This will try to spawn the pruner if it is idle:
|
||||
/// 1. Check if pruning is needed through [Pruner::is_pruning_needed].
|
||||
/// 2a. If pruning is needed, pass tip block number to the [Pruner::run] and spawn it in a
|
||||
/// 2.
|
||||
/// 1. If pruning is needed, pass tip block number to the [Pruner::run] and spawn it in a
|
||||
/// separate task. Set pruner state to [PrunerState::Running].
|
||||
/// 2b. If pruning is not needed, set pruner state back to [PrunerState::Idle].
|
||||
/// 2. If pruning is not needed, set pruner state back to [PrunerState::Idle].
|
||||
///
|
||||
/// If pruner is already running, do nothing.
|
||||
fn try_spawn_pruner(
|
||||
@ -136,11 +130,11 @@ impl<DB: Database + 'static> EngineHook for PruneHook<DB> {
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
ctx: EngineContext,
|
||||
) -> Poll<(EngineHookEvent, Option<EngineHookAction>)> {
|
||||
) -> Poll<RethResult<(EngineHookEvent, Option<EngineHookAction>)>> {
|
||||
// Try to spawn a pruner
|
||||
match self.try_spawn_pruner(ctx.tip_block_number) {
|
||||
Some((EngineHookEvent::NotReady, _)) => return Poll::Pending,
|
||||
Some((event, action)) => return Poll::Ready((event, action)),
|
||||
Some((event, action)) => return Poll::Ready(Ok((event, action))),
|
||||
None => (),
|
||||
}
|
||||
|
||||
@ -176,3 +170,16 @@ struct Metrics {
|
||||
/// The number of times the pruner was run.
|
||||
runs: Counter,
|
||||
}
|
||||
|
||||
impl From<PrunerError> for EngineHookError {
|
||||
fn from(err: PrunerError) -> Self {
|
||||
match err {
|
||||
PrunerError::PrunePart(_) | PrunerError::InconsistentData(_) => {
|
||||
EngineHookError::Internal(Box::new(err))
|
||||
}
|
||||
PrunerError::Interface(err) => err.into(),
|
||||
PrunerError::Database(err) => RethError::Database(err).into(),
|
||||
PrunerError::Provider(err) => RethError::Provider(err).into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
161
crates/consensus/beacon/src/engine/hooks/snapshot.rs
Normal file
161
crates/consensus/beacon/src/engine/hooks/snapshot.rs
Normal file
@ -0,0 +1,161 @@
|
||||
//! Snapshot hook for the engine implementation.
|
||||
|
||||
use crate::{
|
||||
engine::hooks::{
|
||||
EngineContext, EngineHook, EngineHookAction, EngineHookError, EngineHookEvent,
|
||||
},
|
||||
hooks::EngineHookDBAccessLevel,
|
||||
};
|
||||
use futures::FutureExt;
|
||||
use reth_db::database::Database;
|
||||
use reth_interfaces::{RethError, RethResult};
|
||||
use reth_primitives::BlockNumber;
|
||||
use reth_snapshot::{Snapshotter, SnapshotterError, SnapshotterWithResult};
|
||||
use reth_tasks::TaskSpawner;
|
||||
use std::task::{ready, Context, Poll};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
/// Manages snapshotting under the control of the engine.
|
||||
///
|
||||
/// This type controls the [Snapshotter].
|
||||
#[derive(Debug)]
|
||||
pub struct SnapshotHook<DB> {
|
||||
/// The current state of the snapshotter.
|
||||
state: SnapshotterState<DB>,
|
||||
/// The type that can spawn the snapshotter task.
|
||||
task_spawner: Box<dyn TaskSpawner>,
|
||||
}
|
||||
|
||||
impl<DB: Database + 'static> SnapshotHook<DB> {
|
||||
/// Create a new instance
|
||||
pub fn new(snapshotter: Snapshotter<DB>, task_spawner: Box<dyn TaskSpawner>) -> Self {
|
||||
Self { state: SnapshotterState::Idle(Some(snapshotter)), task_spawner }
|
||||
}
|
||||
|
||||
/// Advances the snapshotter state.
|
||||
///
|
||||
/// This checks for the result in the channel, or returns pending if the snapshotter is idle.
|
||||
fn poll_snapshotter(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<RethResult<(EngineHookEvent, Option<EngineHookAction>)>> {
|
||||
let result = match self.state {
|
||||
SnapshotterState::Idle(_) => return Poll::Pending,
|
||||
SnapshotterState::Running(ref mut fut) => {
|
||||
ready!(fut.poll_unpin(cx))
|
||||
}
|
||||
};
|
||||
|
||||
let event = match result {
|
||||
Ok((snapshotter, result)) => {
|
||||
self.state = SnapshotterState::Idle(Some(snapshotter));
|
||||
|
||||
match result {
|
||||
Ok(_) => EngineHookEvent::Finished(Ok(())),
|
||||
Err(err) => EngineHookEvent::Finished(Err(err.into())),
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
// failed to receive the snapshotter
|
||||
EngineHookEvent::Finished(Err(EngineHookError::ChannelClosed))
|
||||
}
|
||||
};
|
||||
|
||||
Poll::Ready(Ok((event, None)))
|
||||
}
|
||||
|
||||
/// This will try to spawn the snapshotter if it is idle:
|
||||
/// 1. Check if snapshotting is needed through [Snapshotter::get_snapshot_targets] and then
|
||||
/// [SnapshotTargets::any](reth_snapshot::SnapshotTargets::any).
|
||||
/// 2.
|
||||
/// 1. If snapshotting is needed, pass snapshot request to the [Snapshotter::run] and spawn
|
||||
/// it in a separate task. Set snapshotter state to [SnapshotterState::Running].
|
||||
/// 2. If snapshotting is not needed, set snapshotter state back to
|
||||
/// [SnapshotterState::Idle].
|
||||
///
|
||||
/// If snapshotter is already running, do nothing.
|
||||
fn try_spawn_snapshotter(
|
||||
&mut self,
|
||||
finalized_block_number: BlockNumber,
|
||||
) -> RethResult<Option<(EngineHookEvent, Option<EngineHookAction>)>> {
|
||||
Ok(match &mut self.state {
|
||||
SnapshotterState::Idle(snapshotter) => {
|
||||
let Some(mut snapshotter) = snapshotter.take() else { return Ok(None) };
|
||||
|
||||
let targets = snapshotter.get_snapshot_targets(finalized_block_number)?;
|
||||
|
||||
// Check if the snapshotting of any parts has been requested.
|
||||
if targets.any() {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.task_spawner.spawn_critical_blocking(
|
||||
"snapshotter task",
|
||||
Box::pin(async move {
|
||||
let result = snapshotter.run(targets);
|
||||
let _ = tx.send((snapshotter, result));
|
||||
}),
|
||||
);
|
||||
self.state = SnapshotterState::Running(rx);
|
||||
|
||||
Some((EngineHookEvent::Started, None))
|
||||
} else {
|
||||
self.state = SnapshotterState::Idle(Some(snapshotter));
|
||||
Some((EngineHookEvent::NotReady, None))
|
||||
}
|
||||
}
|
||||
SnapshotterState::Running(_) => None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<DB: Database + 'static> EngineHook for SnapshotHook<DB> {
|
||||
fn name(&self) -> &'static str {
|
||||
"Snapshot"
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
ctx: EngineContext,
|
||||
) -> Poll<RethResult<(EngineHookEvent, Option<EngineHookAction>)>> {
|
||||
let Some(finalized_block_number) = ctx.finalized_block_number else {
|
||||
return Poll::Ready(Ok((EngineHookEvent::NotReady, None)))
|
||||
};
|
||||
|
||||
// Try to spawn a snapshotter
|
||||
match self.try_spawn_snapshotter(finalized_block_number)? {
|
||||
Some((EngineHookEvent::NotReady, _)) => return Poll::Pending,
|
||||
Some((event, action)) => return Poll::Ready(Ok((event, action))),
|
||||
None => (),
|
||||
}
|
||||
|
||||
// Poll snapshotter and check its status
|
||||
self.poll_snapshotter(cx)
|
||||
}
|
||||
|
||||
fn db_access_level(&self) -> EngineHookDBAccessLevel {
|
||||
EngineHookDBAccessLevel::ReadOnly
|
||||
}
|
||||
}
|
||||
|
||||
/// The possible snapshotter states within the sync controller.
|
||||
///
|
||||
/// [SnapshotterState::Idle] means that the snapshotter is currently idle.
|
||||
/// [SnapshotterState::Running] means that the snapshotter is currently running.
|
||||
#[derive(Debug)]
|
||||
enum SnapshotterState<DB> {
|
||||
/// Snapshotter is idle.
|
||||
Idle(Option<Snapshotter<DB>>),
|
||||
/// Snapshotter is running and waiting for a response
|
||||
Running(oneshot::Receiver<SnapshotterWithResult<DB>>),
|
||||
}
|
||||
|
||||
impl From<SnapshotterError> for EngineHookError {
|
||||
fn from(err: SnapshotterError) -> Self {
|
||||
match err {
|
||||
SnapshotterError::InconsistentData(_) => EngineHookError::Internal(Box::new(err)),
|
||||
SnapshotterError::Interface(err) => err.into(),
|
||||
SnapshotterError::Database(err) => RethError::Database(err).into(),
|
||||
SnapshotterError::Provider(err) => RethError::Provider(err).into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1796,7 +1796,10 @@ where
|
||||
// any engine messages until it's finished.
|
||||
if let Poll::Ready(result) = this.hooks.poll_running_hook_with_db_write(
|
||||
cx,
|
||||
EngineContext { tip_block_number: this.blockchain.canonical_tip().number },
|
||||
EngineContext {
|
||||
tip_block_number: this.blockchain.canonical_tip().number,
|
||||
finalized_block_number: this.blockchain.finalized_block_number()?,
|
||||
},
|
||||
)? {
|
||||
this.on_hook_result(result)?;
|
||||
}
|
||||
@ -1856,7 +1859,10 @@ where
|
||||
if !this.forkchoice_state_tracker.is_latest_invalid() {
|
||||
if let Poll::Ready(result) = this.hooks.poll_next_hook(
|
||||
cx,
|
||||
EngineContext { tip_block_number: this.blockchain.canonical_tip().number },
|
||||
EngineContext {
|
||||
tip_block_number: this.blockchain.canonical_tip().number,
|
||||
finalized_block_number: this.blockchain.finalized_block_number()?,
|
||||
},
|
||||
this.sync.is_pipeline_active(),
|
||||
)? {
|
||||
this.on_hook_result(result)?;
|
||||
|
||||
30
crates/snapshot/Cargo.toml
Normal file
30
crates/snapshot/Cargo.toml
Normal file
@ -0,0 +1,30 @@
|
||||
[package]
|
||||
name = "reth-snapshot"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
rust-version.workspace = true
|
||||
license.workspace = true
|
||||
homepage.workspace = true
|
||||
repository.workspace = true
|
||||
description = """
|
||||
Snapshotting implementation
|
||||
"""
|
||||
|
||||
[dependencies]
|
||||
# reth
|
||||
reth-primitives.workspace = true
|
||||
reth-db.workspace = true
|
||||
reth-provider.workspace = true
|
||||
reth-interfaces.workspace = true
|
||||
|
||||
# misc
|
||||
thiserror.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
# reth
|
||||
reth-db = { workspace = true, features = ["test-utils"] }
|
||||
reth-stages = { path = "../stages", features = ["test-utils"] }
|
||||
|
||||
# misc
|
||||
|
||||
assert_matches.workspace = true
|
||||
21
crates/snapshot/src/error.rs
Normal file
21
crates/snapshot/src/error.rs
Normal file
@ -0,0 +1,21 @@
|
||||
use reth_db::DatabaseError;
|
||||
use reth_interfaces::RethError;
|
||||
use reth_provider::ProviderError;
|
||||
use thiserror::Error;
|
||||
|
||||
/// Error returned by [crate::Snapshotter::run]
|
||||
#[derive(Error, Debug)]
|
||||
#[allow(missing_docs)]
|
||||
pub enum SnapshotterError {
|
||||
#[error("Inconsistent data: {0}")]
|
||||
InconsistentData(&'static str),
|
||||
|
||||
#[error("An interface error occurred.")]
|
||||
Interface(#[from] RethError),
|
||||
|
||||
#[error(transparent)]
|
||||
Database(#[from] DatabaseError),
|
||||
|
||||
#[error(transparent)]
|
||||
Provider(#[from] ProviderError),
|
||||
}
|
||||
16
crates/snapshot/src/lib.rs
Normal file
16
crates/snapshot/src/lib.rs
Normal file
@ -0,0 +1,16 @@
|
||||
//! Snapshotting implementation.
|
||||
|
||||
#![doc(
|
||||
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
|
||||
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
|
||||
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
|
||||
)]
|
||||
#![warn(missing_debug_implementations, missing_docs, unreachable_pub, rustdoc::all)]
|
||||
#![deny(unused_must_use, rust_2018_idioms)]
|
||||
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
|
||||
|
||||
mod error;
|
||||
mod snapshotter;
|
||||
|
||||
pub use error::SnapshotterError;
|
||||
pub use snapshotter::{SnapshotTargets, Snapshotter, SnapshotterResult, SnapshotterWithResult};
|
||||
279
crates/snapshot/src/snapshotter.rs
Normal file
279
crates/snapshot/src/snapshotter.rs
Normal file
@ -0,0 +1,279 @@
|
||||
//! Support for snapshotting.
|
||||
|
||||
use crate::SnapshotterError;
|
||||
use reth_db::database::Database;
|
||||
use reth_interfaces::{RethError, RethResult};
|
||||
use reth_primitives::{BlockNumber, ChainSpec, TxNumber};
|
||||
use reth_provider::{BlockReader, DatabaseProviderRO, ProviderFactory};
|
||||
use std::{collections::HashMap, ops::RangeInclusive, sync::Arc};
|
||||
|
||||
/// Result of [Snapshotter::run] execution.
|
||||
pub type SnapshotterResult = Result<SnapshotTargets, SnapshotterError>;
|
||||
|
||||
/// The snapshotter type itself with the result of [Snapshotter::run]
|
||||
pub type SnapshotterWithResult<DB> = (Snapshotter<DB>, SnapshotterResult);
|
||||
|
||||
/// Snapshotting routine. Main snapshotting logic happens in [Snapshotter::run].
|
||||
#[derive(Debug)]
|
||||
pub struct Snapshotter<DB> {
|
||||
provider_factory: ProviderFactory<DB>,
|
||||
highest_snapshots: HighestSnapshots,
|
||||
/// Block interval after which the snapshot is taken.
|
||||
block_interval: u64,
|
||||
}
|
||||
|
||||
/// Highest snapshotted block numbers, per data part.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
struct HighestSnapshots {
|
||||
headers: Option<BlockNumber>,
|
||||
receipts: Option<BlockNumber>,
|
||||
transactions: Option<BlockNumber>,
|
||||
}
|
||||
|
||||
/// Snapshot targets, per data part, measured in [`BlockNumber`] and [`TxNumber`], if applicable.
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
pub struct SnapshotTargets {
|
||||
headers: Option<RangeInclusive<BlockNumber>>,
|
||||
receipts: Option<(RangeInclusive<BlockNumber>, RangeInclusive<TxNumber>)>,
|
||||
transactions: Option<(RangeInclusive<BlockNumber>, RangeInclusive<TxNumber>)>,
|
||||
}
|
||||
|
||||
impl SnapshotTargets {
|
||||
/// Returns `true` if any of the data parts has targets, i.e. is [`Some`].
|
||||
pub fn any(&self) -> bool {
|
||||
self.headers.is_some() || self.receipts.is_some() || self.transactions.is_some()
|
||||
}
|
||||
|
||||
/// Returns `true` if all targets are either [`None`] or multiple of `block_interval`.
|
||||
#[cfg(debug_assertions)]
|
||||
fn is_multiple_of_block_interval(&self, block_interval: u64) -> bool {
|
||||
[
|
||||
self.headers.as_ref(),
|
||||
self.receipts.as_ref().map(|(blocks, _)| blocks),
|
||||
self.transactions.as_ref().map(|(blocks, _)| blocks),
|
||||
]
|
||||
.iter()
|
||||
.all(|blocks| blocks.map_or(true, |blocks| (blocks.end() + 1) % block_interval == 0))
|
||||
}
|
||||
|
||||
// Returns `true` if all targets are either [`None`] or has beginning of the range equal to the
|
||||
// highest snapshot.
|
||||
#[cfg(debug_assertions)]
|
||||
fn is_contiguous_to_highest_snapshots(&self, snapshots: HighestSnapshots) -> bool {
|
||||
[
|
||||
(self.headers.as_ref(), snapshots.headers),
|
||||
(self.receipts.as_ref().map(|(blocks, _)| blocks), snapshots.receipts),
|
||||
(self.transactions.as_ref().map(|(blocks, _)| blocks), snapshots.transactions),
|
||||
]
|
||||
.iter()
|
||||
.all(|(target, highest)| {
|
||||
target.map_or(true, |block_number| {
|
||||
highest.map_or(*block_number.start() == 0, |previous_block_number| {
|
||||
*block_number.start() == previous_block_number + 1
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<DB: Database> Snapshotter<DB> {
|
||||
/// Creates a new [Snapshotter].
|
||||
pub fn new(db: DB, chain_spec: Arc<ChainSpec>, block_interval: u64) -> Self {
|
||||
Self {
|
||||
provider_factory: ProviderFactory::new(db, chain_spec),
|
||||
// TODO(alexey): fill from on-disk snapshot data
|
||||
highest_snapshots: HighestSnapshots {
|
||||
headers: None,
|
||||
receipts: None,
|
||||
transactions: None,
|
||||
},
|
||||
block_interval,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn set_highest_snapshots_from_targets(&mut self, targets: &SnapshotTargets) {
|
||||
if let Some(block_number) = &targets.headers {
|
||||
self.highest_snapshots.headers = Some(*block_number.end());
|
||||
}
|
||||
if let Some((block_number, _)) = &targets.receipts {
|
||||
self.highest_snapshots.receipts = Some(*block_number.end());
|
||||
}
|
||||
if let Some((block_number, _)) = &targets.transactions {
|
||||
self.highest_snapshots.transactions = Some(*block_number.end());
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the snapshotter
|
||||
pub fn run(&mut self, targets: SnapshotTargets) -> SnapshotterResult {
|
||||
debug_assert!(targets.is_multiple_of_block_interval(self.block_interval));
|
||||
debug_assert!(targets.is_contiguous_to_highest_snapshots(self.highest_snapshots));
|
||||
|
||||
// TODO(alexey): snapshot logic
|
||||
|
||||
Ok(targets)
|
||||
}
|
||||
|
||||
/// Returns a snapshot targets at the provided finalized block number, respecting the block
|
||||
/// interval. The target is determined by the check against last snapshots.
|
||||
pub fn get_snapshot_targets(
|
||||
&self,
|
||||
finalized_block_number: BlockNumber,
|
||||
) -> RethResult<SnapshotTargets> {
|
||||
let provider = self.provider_factory.provider()?;
|
||||
|
||||
// Round down `finalized_block_number` to a multiple of `block_interval`
|
||||
let to_block_number = finalized_block_number.saturating_sub(
|
||||
// Adjust for 0-indexed block numbers
|
||||
(finalized_block_number + 1) % self.block_interval,
|
||||
);
|
||||
|
||||
// Calculate block ranges to snapshot
|
||||
let headers_block_range =
|
||||
self.get_snapshot_target_block_range(to_block_number, self.highest_snapshots.headers);
|
||||
let receipts_block_range =
|
||||
self.get_snapshot_target_block_range(to_block_number, self.highest_snapshots.receipts);
|
||||
let transactions_block_range = self
|
||||
.get_snapshot_target_block_range(to_block_number, self.highest_snapshots.transactions);
|
||||
|
||||
// Calculate transaction ranges to snapshot
|
||||
let mut block_to_tx_number_cache = HashMap::default();
|
||||
let receipts_tx_range = self.get_snapshot_target_tx_range(
|
||||
&provider,
|
||||
&mut block_to_tx_number_cache,
|
||||
self.highest_snapshots.receipts,
|
||||
&receipts_block_range,
|
||||
)?;
|
||||
let transactions_tx_range = self.get_snapshot_target_tx_range(
|
||||
&provider,
|
||||
&mut block_to_tx_number_cache,
|
||||
self.highest_snapshots.transactions,
|
||||
&transactions_block_range,
|
||||
)?;
|
||||
|
||||
Ok(SnapshotTargets {
|
||||
headers: headers_block_range
|
||||
.size_hint()
|
||||
.1
|
||||
.expect("finalized block should be >= last headers snapshot")
|
||||
.ge(&(self.block_interval as usize))
|
||||
.then_some(headers_block_range),
|
||||
receipts: receipts_block_range
|
||||
.size_hint()
|
||||
.1
|
||||
.expect("finalized block should be >= last receipts snapshot")
|
||||
.ge(&(self.block_interval as usize))
|
||||
.then_some((receipts_block_range, receipts_tx_range)),
|
||||
transactions: transactions_block_range
|
||||
.size_hint()
|
||||
.1
|
||||
.expect("finalized block should be >= last transactions snapshot")
|
||||
.ge(&(self.block_interval as usize))
|
||||
.then_some((transactions_block_range, transactions_tx_range)),
|
||||
})
|
||||
}
|
||||
|
||||
fn get_snapshot_target_block_range(
|
||||
&self,
|
||||
to_block_number: BlockNumber,
|
||||
highest_snapshot: Option<BlockNumber>,
|
||||
) -> RangeInclusive<BlockNumber> {
|
||||
let highest_snapshot = highest_snapshot.map_or(0, |block_number| block_number + 1);
|
||||
highest_snapshot..=(highest_snapshot + self.block_interval - 1).min(to_block_number)
|
||||
}
|
||||
|
||||
fn get_snapshot_target_tx_range(
|
||||
&self,
|
||||
provider: &DatabaseProviderRO<'_, DB>,
|
||||
block_to_tx_number_cache: &mut HashMap<BlockNumber, TxNumber>,
|
||||
highest_snapshot: Option<BlockNumber>,
|
||||
block_range: &RangeInclusive<BlockNumber>,
|
||||
) -> RethResult<RangeInclusive<TxNumber>> {
|
||||
let from_tx_number = if let Some(block_number) = highest_snapshot {
|
||||
*block_to_tx_number_cache.entry(block_number).or_insert(
|
||||
provider
|
||||
.block_body_indices(block_number)?
|
||||
.ok_or(RethError::Custom(
|
||||
"Block body indices for highest snapshot not found".to_string(),
|
||||
))?
|
||||
.next_tx_num(),
|
||||
)
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
let to_tx_number = *block_to_tx_number_cache.entry(*block_range.end()).or_insert(
|
||||
provider
|
||||
.block_body_indices(*block_range.end())?
|
||||
.ok_or(RethError::Custom(
|
||||
"Block body indices for block range end not found".to_string(),
|
||||
))?
|
||||
.last_tx_num(),
|
||||
);
|
||||
Ok(from_tx_number..=to_tx_number)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{snapshotter::SnapshotTargets, Snapshotter};
|
||||
use assert_matches::assert_matches;
|
||||
use reth_interfaces::{
|
||||
test_utils::{generators, generators::random_block_range},
|
||||
RethError,
|
||||
};
|
||||
use reth_primitives::{B256, MAINNET};
|
||||
use reth_stages::test_utils::TestTransaction;
|
||||
|
||||
#[test]
|
||||
fn get_snapshot_targets() {
|
||||
let tx = TestTransaction::default();
|
||||
let mut rng = generators::rng();
|
||||
|
||||
let blocks = random_block_range(&mut rng, 0..=3, B256::ZERO, 2..3);
|
||||
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");
|
||||
|
||||
let mut snapshotter = Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2);
|
||||
|
||||
// Snapshot targets has data per part up to the passed finalized block number,
|
||||
// respecting the block interval
|
||||
let targets = snapshotter.get_snapshot_targets(1).expect("get snapshot targets");
|
||||
assert_eq!(
|
||||
targets,
|
||||
SnapshotTargets {
|
||||
headers: Some(0..=1),
|
||||
receipts: Some((0..=1, 0..=3)),
|
||||
transactions: Some((0..=1, 0..=3))
|
||||
}
|
||||
);
|
||||
assert!(targets.is_multiple_of_block_interval(snapshotter.block_interval));
|
||||
assert!(targets.is_contiguous_to_highest_snapshots(snapshotter.highest_snapshots));
|
||||
// Imitate snapshotter run according to the targets which updates the last snapshots state
|
||||
snapshotter.set_highest_snapshots_from_targets(&targets);
|
||||
|
||||
// Nothing to snapshot, last snapshots state of snapshotter doesn't pass the thresholds
|
||||
assert_eq!(
|
||||
snapshotter.get_snapshot_targets(2),
|
||||
Ok(SnapshotTargets { headers: None, receipts: None, transactions: None })
|
||||
);
|
||||
|
||||
// Snapshot targets has data per part up to the passed finalized block number,
|
||||
// respecting the block interval
|
||||
let targets = snapshotter.get_snapshot_targets(5).expect("get snapshot targets");
|
||||
assert_eq!(
|
||||
targets,
|
||||
SnapshotTargets {
|
||||
headers: Some(2..=3),
|
||||
receipts: Some((2..=3, 4..=7)),
|
||||
transactions: Some((2..=3, 4..=7))
|
||||
}
|
||||
);
|
||||
assert!(targets.is_multiple_of_block_interval(snapshotter.block_interval));
|
||||
assert!(targets.is_contiguous_to_highest_snapshots(snapshotter.highest_snapshots));
|
||||
// Imitate snapshotter run according to the targets which updates the last snapshots state
|
||||
snapshotter.set_highest_snapshots_from_targets(&targets);
|
||||
|
||||
// Block body indices not found
|
||||
assert_matches!(snapshotter.get_snapshot_targets(5), Err(RethError::Custom(_)));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user