From d21e346c04b8f3893aadc8d60d0e573528ba4d0e Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Tue, 14 Nov 2023 17:50:12 +0000 Subject: [PATCH] feat: share `SnapshotProvider` through `ProviderFactory` (#5249) Co-authored-by: Alexey Shekhirin --- bin/reth/src/dirs.rs | 5 ++ bin/reth/src/node/mod.rs | 25 ++++---- crates/primitives/src/snapshot/mod.rs | 26 +++++++++ crates/snapshot/src/lib.rs | 3 +- crates/snapshot/src/snapshotter.rs | 57 ++++++++----------- .../provider/src/providers/database/mod.rs | 45 +++++++++++++-- .../src/providers/database/provider.rs | 15 ++++- .../src/providers/snapshot/manager.rs | 26 ++++++++- 8 files changed, 146 insertions(+), 56 deletions(-) diff --git a/bin/reth/src/dirs.rs b/bin/reth/src/dirs.rs index 7f3f14232..9d1a1f552 100644 --- a/bin/reth/src/dirs.rs +++ b/bin/reth/src/dirs.rs @@ -266,6 +266,11 @@ impl ChainPath { self.0.join("db").into() } + /// Returns the path to the snapshots directory for this chain. + pub fn snapshots_path(&self) -> PathBuf { + self.0.join("snapshots").into() + } + /// Returns the path to the reth p2p secret key for this chain. /// /// `//discovery-secret` diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index e83684dd6..0dc2cd1ea 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -298,8 +298,16 @@ impl NodeCommand { // fetch the head block from the database let head = self.lookup_head(Arc::clone(&db)).wrap_err("the head block is missing")?; + // configure snapshotter + let snapshotter = reth_snapshot::Snapshotter::new( + db.clone(), + self.chain.clone(), + self.chain.snapshot_block_interval, + ); + // setup the blockchain provider - let factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain)); + let factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain)) + .with_snapshots(snapshotter.highest_snapshot_receiver()); let blockchain_db = BlockchainProvider::new(factory, blockchain_tree.clone())?; let blob_store = InMemoryBlobStore::default(); let validator = TransactionValidationTaskExecutor::eth_builder(Arc::clone(&self.chain)) @@ -454,12 +462,14 @@ impl NodeCommand { None }; - let (highest_snapshots_tx, highest_snapshots_rx) = watch::channel(None); - let mut hooks = EngineHooks::new(); let pruner_events = if let Some(prune_config) = prune_config { - let mut pruner = self.build_pruner(&prune_config, db.clone(), highest_snapshots_rx); + let mut pruner = self.build_pruner( + &prune_config, + db.clone(), + snapshotter.highest_snapshot_receiver(), + ); let events = pruner.events(); hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor.clone()))); @@ -470,13 +480,6 @@ impl NodeCommand { Either::Right(stream::empty()) }; - let _snapshotter = reth_snapshot::Snapshotter::new( - db, - self.chain.clone(), - self.chain.snapshot_block_interval, - highest_snapshots_tx, - ); - // Configure the consensus engine let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel( client, diff --git a/crates/primitives/src/snapshot/mod.rs b/crates/primitives/src/snapshot/mod.rs index d8fc8db53..f73ba9874 100644 --- a/crates/primitives/src/snapshot/mod.rs +++ b/crates/primitives/src/snapshot/mod.rs @@ -4,9 +4,35 @@ mod compression; mod filters; mod segment; +use alloy_primitives::BlockNumber; pub use compression::Compression; pub use filters::{Filters, InclusionFilter, PerfectHashingFunction}; pub use segment::{SegmentHeader, SnapshotSegment}; /// Default snapshot block count. pub const BLOCKS_PER_SNAPSHOT: u64 = 500_000; + +/// Highest snapshotted block numbers, per data part. +#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)] +pub struct HighestSnapshots { + /// Highest snapshotted block of headers, inclusive. + /// If [`None`], no snapshot is available. + pub headers: Option, + /// Highest snapshotted block of receipts, inclusive. + /// If [`None`], no snapshot is available. + pub receipts: Option, + /// Highest snapshotted block of transactions, inclusive. + /// If [`None`], no snapshot is available. + pub transactions: Option, +} + +impl HighestSnapshots { + /// Returns the highest snapshot if it exists for a segment + pub fn highest(&self, segment: SnapshotSegment) -> Option { + match segment { + SnapshotSegment::Headers => self.headers, + SnapshotSegment::Transactions => self.transactions, + SnapshotSegment::Receipts => self.receipts, + } + } +} diff --git a/crates/snapshot/src/lib.rs b/crates/snapshot/src/lib.rs index 18b22bdb5..82f42b2d4 100644 --- a/crates/snapshot/src/lib.rs +++ b/crates/snapshot/src/lib.rs @@ -15,6 +15,5 @@ mod snapshotter; pub use error::SnapshotterError; pub use snapshotter::{ - HighestSnapshots, HighestSnapshotsTracker, SnapshotTargets, Snapshotter, SnapshotterResult, - SnapshotterWithResult, + HighestSnapshotsTracker, SnapshotTargets, Snapshotter, SnapshotterResult, SnapshotterWithResult, }; diff --git a/crates/snapshot/src/snapshotter.rs b/crates/snapshot/src/snapshotter.rs index b850f68a1..6bc722f0f 100644 --- a/crates/snapshot/src/snapshotter.rs +++ b/crates/snapshot/src/snapshotter.rs @@ -3,7 +3,7 @@ use crate::SnapshotterError; use reth_db::database::Database; use reth_interfaces::{RethError, RethResult}; -use reth_primitives::{BlockNumber, ChainSpec, TxNumber}; +use reth_primitives::{snapshot::HighestSnapshots, BlockNumber, ChainSpec, TxNumber}; use reth_provider::{BlockReader, DatabaseProviderRO, ProviderFactory}; use std::{collections::HashMap, ops::RangeInclusive, sync::Arc}; use tokio::sync::watch; @@ -18,9 +18,14 @@ pub type SnapshotterWithResult = (Snapshotter, SnapshotterResult); /// Snapshotting routine. Main snapshotting logic happens in [Snapshotter::run]. #[derive(Debug)] pub struct Snapshotter { + /// Provider factory provider_factory: ProviderFactory, + /// Highest snapshotted block numbers for each segment highest_snapshots: HighestSnapshots, - highest_snapshots_tracker: watch::Sender>, + /// Channel sender to notify other components of the new highest snapshots + highest_snapshots_notifier: watch::Sender>, + /// Channel receiver to be cloned and shared that already comes with the newest value + highest_snapshots_tracker: HighestSnapshotsTracker, /// Block interval after which the snapshot is taken. block_interval: u64, } @@ -28,20 +33,6 @@ pub struct Snapshotter { /// Tracker for the latest [`HighestSnapshots`] value. pub type HighestSnapshotsTracker = watch::Receiver>; -/// Highest snapshotted block numbers, per data part. -#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)] -pub struct HighestSnapshots { - /// Highest snapshotted block of headers, inclusive. - /// If [`None`], no snapshot is available. - pub headers: Option, - /// Highest snapshotted block of receipts, inclusive. - /// If [`None`], no snapshot is available. - pub receipts: Option, - /// Highest snapshotted block of transactions, inclusive. - /// If [`None`], no snapshot is available. - pub transactions: Option, -} - /// Snapshot targets, per data part, measured in [`BlockNumber`] and [`TxNumber`], if applicable. #[derive(Debug, Clone, Eq, PartialEq)] pub struct SnapshotTargets { @@ -88,16 +79,14 @@ impl SnapshotTargets { impl Snapshotter { /// Creates a new [Snapshotter]. - pub fn new( - db: DB, - chain_spec: Arc, - block_interval: u64, - highest_snapshots_tracker: watch::Sender>, - ) -> Self { + pub fn new(db: DB, chain_spec: Arc, block_interval: u64) -> Self { + let (highest_snapshots_notifier, highest_snapshots_tracker) = watch::channel(None); + let snapshotter = Self { provider_factory: ProviderFactory::new(db, chain_spec), // TODO(alexey): fill from on-disk snapshot data highest_snapshots: HighestSnapshots::default(), + highest_snapshots_notifier, highest_snapshots_tracker, block_interval, }; @@ -121,11 +110,16 @@ impl Snapshotter { } fn update_highest_snapshots_tracker(&self) { - let _ = self.highest_snapshots_tracker.send(Some(self.highest_snapshots)).map_err(|_| { + let _ = self.highest_snapshots_notifier.send(Some(self.highest_snapshots)).map_err(|_| { warn!(target: "snapshot", "Highest snapshots channel closed"); }); } + /// Returns a new [`HighestSnapshotsTracker`]. + pub fn highest_snapshot_receiver(&self) -> HighestSnapshotsTracker { + self.highest_snapshots_tracker.clone() + } + /// Run the snapshotter pub fn run(&mut self, targets: SnapshotTargets) -> SnapshotterResult { debug_assert!(targets.is_multiple_of_block_interval(self.block_interval)); @@ -240,25 +234,25 @@ impl Snapshotter { #[cfg(test)] mod tests { - use crate::{snapshotter::SnapshotTargets, HighestSnapshots, Snapshotter}; + use crate::{snapshotter::SnapshotTargets, Snapshotter}; use assert_matches::assert_matches; use reth_interfaces::{ test_utils::{generators, generators::random_block_range}, RethError, }; - use reth_primitives::{B256, MAINNET}; + use reth_primitives::{snapshot::HighestSnapshots, B256, MAINNET}; use reth_stages::test_utils::TestTransaction; - use tokio::sync::watch; #[test] fn new() { let tx = TestTransaction::default(); - let (highest_snapshots_tx, highest_snapshots_rx) = watch::channel(None); - assert_eq!(*highest_snapshots_rx.borrow(), None); + let snapshotter = Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2); - Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2, highest_snapshots_tx); - assert_eq!(*highest_snapshots_rx.borrow(), Some(HighestSnapshots::default())); + assert_eq!( + *snapshotter.highest_snapshot_receiver().borrow(), + Some(HighestSnapshots::default()) + ); } #[test] @@ -269,8 +263,7 @@ mod tests { let blocks = random_block_range(&mut rng, 0..=3, B256::ZERO, 2..3); tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); - let mut snapshotter = - Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2, watch::channel(None).0); + let mut snapshotter = Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2); // Snapshot targets has data per part up to the passed finalized block number, // respecting the block interval diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index e3764845c..5336de0bf 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -1,5 +1,8 @@ use crate::{ - providers::state::{historical::HistoricalStateProvider, latest::LatestStateProvider}, + providers::{ + state::{historical::HistoricalStateProvider, latest::LatestStateProvider}, + SnapshotProvider, + }, traits::{BlockSource, ReceiptProvider}, BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, EvmEnvProvider, HeaderProvider, ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProviderBox, @@ -8,6 +11,7 @@ use crate::{ use reth_db::{database::Database, init_db, models::StoredBlockBodyIndices, DatabaseEnv}; use reth_interfaces::{db::LogLevel, RethError, RethResult}; use reth_primitives::{ + snapshot::HighestSnapshots, stage::{StageCheckpoint, StageId}, Address, Block, BlockHash, BlockHashOrNumber, BlockNumber, BlockWithSenders, ChainInfo, ChainSpec, Header, PruneCheckpoint, PruneSegment, Receipt, SealedBlock, SealedHeader, @@ -19,6 +23,7 @@ use std::{ ops::{RangeBounds, RangeInclusive}, sync::Arc, }; +use tokio::sync::watch; use tracing::trace; mod metrics; @@ -35,6 +40,8 @@ pub struct ProviderFactory { db: DB, /// Chain spec chain_spec: Arc, + /// Snapshot Provider + snapshot_provider: Option>, } impl ProviderFactory { @@ -42,7 +49,13 @@ impl ProviderFactory { /// database using different types of providers. Example: [`HeaderProvider`] /// [`BlockHashReader`]. This may fail if the inner read database transaction fails to open. pub fn provider(&self) -> RethResult> { - Ok(DatabaseProvider::new(self.db.tx()?, self.chain_spec.clone())) + let mut provider = DatabaseProvider::new(self.db.tx()?, self.chain_spec.clone()); + + if let Some(snapshot_provider) = &self.snapshot_provider { + provider = provider.with_snapshot_provider(snapshot_provider.clone()); + } + + Ok(provider) } /// Returns a provider with a created `DbTxMut` inside, which allows fetching and updating @@ -50,14 +63,31 @@ impl ProviderFactory { /// [`BlockHashReader`]. This may fail if the inner read/write database transaction fails to /// open. pub fn provider_rw(&self) -> RethResult> { - Ok(DatabaseProviderRW(DatabaseProvider::new_rw(self.db.tx_mut()?, self.chain_spec.clone()))) + let mut provider = DatabaseProvider::new_rw(self.db.tx_mut()?, self.chain_spec.clone()); + + if let Some(snapshot_provider) = &self.snapshot_provider { + provider = provider.with_snapshot_provider(snapshot_provider.clone()); + } + + Ok(DatabaseProviderRW(provider)) } } impl ProviderFactory { /// create new database provider pub fn new(db: DB, chain_spec: Arc) -> Self { - Self { db, chain_spec } + Self { db, chain_spec, snapshot_provider: None } + } + + /// database provider comes with a shared snapshot provider + pub fn with_snapshots( + mut self, + highest_snapshot_tracker: watch::Receiver>, + ) -> Self { + self.snapshot_provider = Some(Arc::new( + SnapshotProvider::default().with_highest_tracker(Some(highest_snapshot_tracker)), + )); + self } } @@ -72,13 +102,18 @@ impl ProviderFactory { Ok(ProviderFactory:: { db: init_db(path, log_level).map_err(|e| RethError::Custom(e.to_string()))?, chain_spec, + snapshot_provider: None, }) } } impl Clone for ProviderFactory { fn clone(&self) -> Self { - Self { db: self.db.clone(), chain_spec: Arc::clone(&self.chain_spec) } + Self { + db: self.db.clone(), + chain_spec: Arc::clone(&self.chain_spec), + snapshot_provider: self.snapshot_provider.clone(), + } } } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index c082e8d4e..8da8df839 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -1,6 +1,6 @@ use crate::{ bundle_state::{BundleStateInit, BundleStateWithReceipts, RevertsInit}, - providers::database::metrics, + providers::{database::metrics, SnapshotProvider}, traits::{ AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter, }, @@ -100,12 +100,15 @@ pub struct DatabaseProvider { tx: TX, /// Chain spec chain_spec: Arc, + /// Snapshot provider + #[allow(unused)] + snapshot_provider: Option>, } impl DatabaseProvider { /// Creates a provider with an inner read-write transaction. pub fn new_rw(tx: TX, chain_spec: Arc) -> Self { - Self { tx, chain_spec } + Self { tx, chain_spec, snapshot_provider: None } } } @@ -160,7 +163,13 @@ where impl DatabaseProvider { /// Creates a provider with an inner read-only transaction. pub fn new(tx: TX, chain_spec: Arc) -> Self { - Self { tx, chain_spec } + Self { tx, chain_spec, snapshot_provider: None } + } + + /// Creates a new [`Self`] with access to a [`SnapshotProvider`]. + pub fn with_snapshot_provider(mut self, snapshot_provider: Arc) -> Self { + self.snapshot_provider = Some(snapshot_provider); + self } /// Consume `DbTx` or `DbTxMut`. diff --git a/crates/storage/provider/src/providers/snapshot/manager.rs b/crates/storage/provider/src/providers/snapshot/manager.rs index a065f6d1d..1b26f1db6 100644 --- a/crates/storage/provider/src/providers/snapshot/manager.rs +++ b/crates/storage/provider/src/providers/snapshot/manager.rs @@ -4,11 +4,13 @@ use dashmap::DashMap; use reth_interfaces::RethResult; use reth_nippy_jar::NippyJar; use reth_primitives::{ - snapshot::BLOCKS_PER_SNAPSHOT, Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, - Header, SealedHeader, SnapshotSegment, TransactionMeta, TransactionSigned, - TransactionSignedNoHash, TxHash, TxNumber, B256, U256, + snapshot::{HighestSnapshots, BLOCKS_PER_SNAPSHOT}, + Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header, SealedHeader, + SnapshotSegment, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, + B256, U256, }; use std::{ops::RangeBounds, path::PathBuf}; +use tokio::sync::watch; /// SnapshotProvider #[derive(Debug, Default)] @@ -16,9 +18,20 @@ pub struct SnapshotProvider { /// Maintains a map which allows for concurrent access to different `NippyJars`, over different /// segments and ranges. map: DashMap<(BlockNumber, SnapshotSegment), LoadedJar>, + /// Tracks the highest snapshot of every segment. + highest_tracker: Option>>, } impl SnapshotProvider { + /// Adds a highest snapshot tracker to the provider + pub fn with_highest_tracker( + mut self, + highest_tracker: Option>>, + ) -> Self { + self.highest_tracker = highest_tracker; + self + } + /// Gets the provider of the requested segment and range. pub fn get_segment_provider( &self, @@ -44,6 +57,13 @@ impl SnapshotProvider { self.get_segment_provider(segment, block, path) } + + /// Gets the highest snapshot if it exists for a snapshot segment. + pub fn get_highest_snapshot(&self, segment: SnapshotSegment) -> Option { + self.highest_tracker + .as_ref() + .and_then(|tracker| tracker.borrow().and_then(|highest| highest.highest(segment))) + } } impl HeaderProvider for SnapshotProvider {