feat: share SnapshotProvider through ProviderFactory (#5249)

Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
This commit is contained in:
joshieDo
2023-11-14 17:50:12 +00:00
committed by GitHub
parent 8ecd90b884
commit d21e346c04
8 changed files with 146 additions and 56 deletions

View File

@ -266,6 +266,11 @@ impl<D> ChainPath<D> {
self.0.join("db").into() self.0.join("db").into()
} }
/// Returns the path to the snapshots directory for this chain.
pub fn snapshots_path(&self) -> PathBuf {
self.0.join("snapshots").into()
}
/// Returns the path to the reth p2p secret key for this chain. /// Returns the path to the reth p2p secret key for this chain.
/// ///
/// `<DIR>/<CHAIN_ID>/discovery-secret` /// `<DIR>/<CHAIN_ID>/discovery-secret`

View File

@ -298,8 +298,16 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
// fetch the head block from the database // fetch the head block from the database
let head = self.lookup_head(Arc::clone(&db)).wrap_err("the head block is missing")?; let head = self.lookup_head(Arc::clone(&db)).wrap_err("the head block is missing")?;
// configure snapshotter
let snapshotter = reth_snapshot::Snapshotter::new(
db.clone(),
self.chain.clone(),
self.chain.snapshot_block_interval,
);
// setup the blockchain provider // setup the blockchain provider
let factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain)); let factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain))
.with_snapshots(snapshotter.highest_snapshot_receiver());
let blockchain_db = BlockchainProvider::new(factory, blockchain_tree.clone())?; let blockchain_db = BlockchainProvider::new(factory, blockchain_tree.clone())?;
let blob_store = InMemoryBlobStore::default(); let blob_store = InMemoryBlobStore::default();
let validator = TransactionValidationTaskExecutor::eth_builder(Arc::clone(&self.chain)) let validator = TransactionValidationTaskExecutor::eth_builder(Arc::clone(&self.chain))
@ -454,12 +462,14 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
None None
}; };
let (highest_snapshots_tx, highest_snapshots_rx) = watch::channel(None);
let mut hooks = EngineHooks::new(); let mut hooks = EngineHooks::new();
let pruner_events = if let Some(prune_config) = prune_config { let pruner_events = if let Some(prune_config) = prune_config {
let mut pruner = self.build_pruner(&prune_config, db.clone(), highest_snapshots_rx); let mut pruner = self.build_pruner(
&prune_config,
db.clone(),
snapshotter.highest_snapshot_receiver(),
);
let events = pruner.events(); let events = pruner.events();
hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor.clone()))); hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor.clone())));
@ -470,13 +480,6 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
Either::Right(stream::empty()) Either::Right(stream::empty())
}; };
let _snapshotter = reth_snapshot::Snapshotter::new(
db,
self.chain.clone(),
self.chain.snapshot_block_interval,
highest_snapshots_tx,
);
// Configure the consensus engine // Configure the consensus engine
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel( let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
client, client,

View File

@ -4,9 +4,35 @@ mod compression;
mod filters; mod filters;
mod segment; mod segment;
use alloy_primitives::BlockNumber;
pub use compression::Compression; pub use compression::Compression;
pub use filters::{Filters, InclusionFilter, PerfectHashingFunction}; pub use filters::{Filters, InclusionFilter, PerfectHashingFunction};
pub use segment::{SegmentHeader, SnapshotSegment}; pub use segment::{SegmentHeader, SnapshotSegment};
/// Default snapshot block count. /// Default snapshot block count.
pub const BLOCKS_PER_SNAPSHOT: u64 = 500_000; pub const BLOCKS_PER_SNAPSHOT: u64 = 500_000;
/// Highest snapshotted block numbers, per data part.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct HighestSnapshots {
/// Highest snapshotted block of headers, inclusive.
/// If [`None`], no snapshot is available.
pub headers: Option<BlockNumber>,
/// Highest snapshotted block of receipts, inclusive.
/// If [`None`], no snapshot is available.
pub receipts: Option<BlockNumber>,
/// Highest snapshotted block of transactions, inclusive.
/// If [`None`], no snapshot is available.
pub transactions: Option<BlockNumber>,
}
impl HighestSnapshots {
/// Returns the highest snapshot if it exists for a segment
pub fn highest(&self, segment: SnapshotSegment) -> Option<BlockNumber> {
match segment {
SnapshotSegment::Headers => self.headers,
SnapshotSegment::Transactions => self.transactions,
SnapshotSegment::Receipts => self.receipts,
}
}
}

View File

@ -15,6 +15,5 @@ mod snapshotter;
pub use error::SnapshotterError; pub use error::SnapshotterError;
pub use snapshotter::{ pub use snapshotter::{
HighestSnapshots, HighestSnapshotsTracker, SnapshotTargets, Snapshotter, SnapshotterResult, HighestSnapshotsTracker, SnapshotTargets, Snapshotter, SnapshotterResult, SnapshotterWithResult,
SnapshotterWithResult,
}; };

View File

@ -3,7 +3,7 @@
use crate::SnapshotterError; use crate::SnapshotterError;
use reth_db::database::Database; use reth_db::database::Database;
use reth_interfaces::{RethError, RethResult}; use reth_interfaces::{RethError, RethResult};
use reth_primitives::{BlockNumber, ChainSpec, TxNumber}; use reth_primitives::{snapshot::HighestSnapshots, BlockNumber, ChainSpec, TxNumber};
use reth_provider::{BlockReader, DatabaseProviderRO, ProviderFactory}; use reth_provider::{BlockReader, DatabaseProviderRO, ProviderFactory};
use std::{collections::HashMap, ops::RangeInclusive, sync::Arc}; use std::{collections::HashMap, ops::RangeInclusive, sync::Arc};
use tokio::sync::watch; use tokio::sync::watch;
@ -18,9 +18,14 @@ pub type SnapshotterWithResult<DB> = (Snapshotter<DB>, SnapshotterResult);
/// Snapshotting routine. Main snapshotting logic happens in [Snapshotter::run]. /// Snapshotting routine. Main snapshotting logic happens in [Snapshotter::run].
#[derive(Debug)] #[derive(Debug)]
pub struct Snapshotter<DB> { pub struct Snapshotter<DB> {
/// Provider factory
provider_factory: ProviderFactory<DB>, provider_factory: ProviderFactory<DB>,
/// Highest snapshotted block numbers for each segment
highest_snapshots: HighestSnapshots, highest_snapshots: HighestSnapshots,
highest_snapshots_tracker: watch::Sender<Option<HighestSnapshots>>, /// Channel sender to notify other components of the new highest snapshots
highest_snapshots_notifier: watch::Sender<Option<HighestSnapshots>>,
/// Channel receiver to be cloned and shared that already comes with the newest value
highest_snapshots_tracker: HighestSnapshotsTracker,
/// Block interval after which the snapshot is taken. /// Block interval after which the snapshot is taken.
block_interval: u64, block_interval: u64,
} }
@ -28,20 +33,6 @@ pub struct Snapshotter<DB> {
/// Tracker for the latest [`HighestSnapshots`] value. /// Tracker for the latest [`HighestSnapshots`] value.
pub type HighestSnapshotsTracker = watch::Receiver<Option<HighestSnapshots>>; pub type HighestSnapshotsTracker = watch::Receiver<Option<HighestSnapshots>>;
/// Highest snapshotted block numbers, per data part.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct HighestSnapshots {
/// Highest snapshotted block of headers, inclusive.
/// If [`None`], no snapshot is available.
pub headers: Option<BlockNumber>,
/// Highest snapshotted block of receipts, inclusive.
/// If [`None`], no snapshot is available.
pub receipts: Option<BlockNumber>,
/// Highest snapshotted block of transactions, inclusive.
/// If [`None`], no snapshot is available.
pub transactions: Option<BlockNumber>,
}
/// Snapshot targets, per data part, measured in [`BlockNumber`] and [`TxNumber`], if applicable. /// Snapshot targets, per data part, measured in [`BlockNumber`] and [`TxNumber`], if applicable.
#[derive(Debug, Clone, Eq, PartialEq)] #[derive(Debug, Clone, Eq, PartialEq)]
pub struct SnapshotTargets { pub struct SnapshotTargets {
@ -88,16 +79,14 @@ impl SnapshotTargets {
impl<DB: Database> Snapshotter<DB> { impl<DB: Database> Snapshotter<DB> {
/// Creates a new [Snapshotter]. /// Creates a new [Snapshotter].
pub fn new( pub fn new(db: DB, chain_spec: Arc<ChainSpec>, block_interval: u64) -> Self {
db: DB, let (highest_snapshots_notifier, highest_snapshots_tracker) = watch::channel(None);
chain_spec: Arc<ChainSpec>,
block_interval: u64,
highest_snapshots_tracker: watch::Sender<Option<HighestSnapshots>>,
) -> Self {
let snapshotter = Self { let snapshotter = Self {
provider_factory: ProviderFactory::new(db, chain_spec), provider_factory: ProviderFactory::new(db, chain_spec),
// TODO(alexey): fill from on-disk snapshot data // TODO(alexey): fill from on-disk snapshot data
highest_snapshots: HighestSnapshots::default(), highest_snapshots: HighestSnapshots::default(),
highest_snapshots_notifier,
highest_snapshots_tracker, highest_snapshots_tracker,
block_interval, block_interval,
}; };
@ -121,11 +110,16 @@ impl<DB: Database> Snapshotter<DB> {
} }
fn update_highest_snapshots_tracker(&self) { fn update_highest_snapshots_tracker(&self) {
let _ = self.highest_snapshots_tracker.send(Some(self.highest_snapshots)).map_err(|_| { let _ = self.highest_snapshots_notifier.send(Some(self.highest_snapshots)).map_err(|_| {
warn!(target: "snapshot", "Highest snapshots channel closed"); warn!(target: "snapshot", "Highest snapshots channel closed");
}); });
} }
/// Returns a new [`HighestSnapshotsTracker`].
pub fn highest_snapshot_receiver(&self) -> HighestSnapshotsTracker {
self.highest_snapshots_tracker.clone()
}
/// Run the snapshotter /// Run the snapshotter
pub fn run(&mut self, targets: SnapshotTargets) -> SnapshotterResult { pub fn run(&mut self, targets: SnapshotTargets) -> SnapshotterResult {
debug_assert!(targets.is_multiple_of_block_interval(self.block_interval)); debug_assert!(targets.is_multiple_of_block_interval(self.block_interval));
@ -240,25 +234,25 @@ impl<DB: Database> Snapshotter<DB> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::{snapshotter::SnapshotTargets, HighestSnapshots, Snapshotter}; use crate::{snapshotter::SnapshotTargets, Snapshotter};
use assert_matches::assert_matches; use assert_matches::assert_matches;
use reth_interfaces::{ use reth_interfaces::{
test_utils::{generators, generators::random_block_range}, test_utils::{generators, generators::random_block_range},
RethError, RethError,
}; };
use reth_primitives::{B256, MAINNET}; use reth_primitives::{snapshot::HighestSnapshots, B256, MAINNET};
use reth_stages::test_utils::TestTransaction; use reth_stages::test_utils::TestTransaction;
use tokio::sync::watch;
#[test] #[test]
fn new() { fn new() {
let tx = TestTransaction::default(); let tx = TestTransaction::default();
let (highest_snapshots_tx, highest_snapshots_rx) = watch::channel(None); let snapshotter = Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2);
assert_eq!(*highest_snapshots_rx.borrow(), None);
Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2, highest_snapshots_tx); assert_eq!(
assert_eq!(*highest_snapshots_rx.borrow(), Some(HighestSnapshots::default())); *snapshotter.highest_snapshot_receiver().borrow(),
Some(HighestSnapshots::default())
);
} }
#[test] #[test]
@ -269,8 +263,7 @@ mod tests {
let blocks = random_block_range(&mut rng, 0..=3, B256::ZERO, 2..3); let blocks = random_block_range(&mut rng, 0..=3, B256::ZERO, 2..3);
tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); tx.insert_blocks(blocks.iter(), None).expect("insert blocks");
let mut snapshotter = let mut snapshotter = Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2);
Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2, watch::channel(None).0);
// Snapshot targets has data per part up to the passed finalized block number, // Snapshot targets has data per part up to the passed finalized block number,
// respecting the block interval // respecting the block interval

View File

@ -1,5 +1,8 @@
use crate::{ use crate::{
providers::state::{historical::HistoricalStateProvider, latest::LatestStateProvider}, providers::{
state::{historical::HistoricalStateProvider, latest::LatestStateProvider},
SnapshotProvider,
},
traits::{BlockSource, ReceiptProvider}, traits::{BlockSource, ReceiptProvider},
BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, EvmEnvProvider, BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, EvmEnvProvider,
HeaderProvider, ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProviderBox, HeaderProvider, ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProviderBox,
@ -8,6 +11,7 @@ use crate::{
use reth_db::{database::Database, init_db, models::StoredBlockBodyIndices, DatabaseEnv}; use reth_db::{database::Database, init_db, models::StoredBlockBodyIndices, DatabaseEnv};
use reth_interfaces::{db::LogLevel, RethError, RethResult}; use reth_interfaces::{db::LogLevel, RethError, RethResult};
use reth_primitives::{ use reth_primitives::{
snapshot::HighestSnapshots,
stage::{StageCheckpoint, StageId}, stage::{StageCheckpoint, StageId},
Address, Block, BlockHash, BlockHashOrNumber, BlockNumber, BlockWithSenders, ChainInfo, Address, Block, BlockHash, BlockHashOrNumber, BlockNumber, BlockWithSenders, ChainInfo,
ChainSpec, Header, PruneCheckpoint, PruneSegment, Receipt, SealedBlock, SealedHeader, ChainSpec, Header, PruneCheckpoint, PruneSegment, Receipt, SealedBlock, SealedHeader,
@ -19,6 +23,7 @@ use std::{
ops::{RangeBounds, RangeInclusive}, ops::{RangeBounds, RangeInclusive},
sync::Arc, sync::Arc,
}; };
use tokio::sync::watch;
use tracing::trace; use tracing::trace;
mod metrics; mod metrics;
@ -35,6 +40,8 @@ pub struct ProviderFactory<DB> {
db: DB, db: DB,
/// Chain spec /// Chain spec
chain_spec: Arc<ChainSpec>, chain_spec: Arc<ChainSpec>,
/// Snapshot Provider
snapshot_provider: Option<Arc<SnapshotProvider>>,
} }
impl<DB: Database> ProviderFactory<DB> { impl<DB: Database> ProviderFactory<DB> {
@ -42,7 +49,13 @@ impl<DB: Database> ProviderFactory<DB> {
/// database using different types of providers. Example: [`HeaderProvider`] /// database using different types of providers. Example: [`HeaderProvider`]
/// [`BlockHashReader`]. This may fail if the inner read database transaction fails to open. /// [`BlockHashReader`]. This may fail if the inner read database transaction fails to open.
pub fn provider(&self) -> RethResult<DatabaseProviderRO<'_, DB>> { pub fn provider(&self) -> RethResult<DatabaseProviderRO<'_, DB>> {
Ok(DatabaseProvider::new(self.db.tx()?, self.chain_spec.clone())) let mut provider = DatabaseProvider::new(self.db.tx()?, self.chain_spec.clone());
if let Some(snapshot_provider) = &self.snapshot_provider {
provider = provider.with_snapshot_provider(snapshot_provider.clone());
}
Ok(provider)
} }
/// Returns a provider with a created `DbTxMut` inside, which allows fetching and updating /// Returns a provider with a created `DbTxMut` inside, which allows fetching and updating
@ -50,14 +63,31 @@ impl<DB: Database> ProviderFactory<DB> {
/// [`BlockHashReader`]. This may fail if the inner read/write database transaction fails to /// [`BlockHashReader`]. This may fail if the inner read/write database transaction fails to
/// open. /// open.
pub fn provider_rw(&self) -> RethResult<DatabaseProviderRW<'_, DB>> { pub fn provider_rw(&self) -> RethResult<DatabaseProviderRW<'_, DB>> {
Ok(DatabaseProviderRW(DatabaseProvider::new_rw(self.db.tx_mut()?, self.chain_spec.clone()))) let mut provider = DatabaseProvider::new_rw(self.db.tx_mut()?, self.chain_spec.clone());
if let Some(snapshot_provider) = &self.snapshot_provider {
provider = provider.with_snapshot_provider(snapshot_provider.clone());
}
Ok(DatabaseProviderRW(provider))
} }
} }
impl<DB> ProviderFactory<DB> { impl<DB> ProviderFactory<DB> {
/// create new database provider /// create new database provider
pub fn new(db: DB, chain_spec: Arc<ChainSpec>) -> Self { pub fn new(db: DB, chain_spec: Arc<ChainSpec>) -> Self {
Self { db, chain_spec } Self { db, chain_spec, snapshot_provider: None }
}
/// database provider comes with a shared snapshot provider
pub fn with_snapshots(
mut self,
highest_snapshot_tracker: watch::Receiver<Option<HighestSnapshots>>,
) -> Self {
self.snapshot_provider = Some(Arc::new(
SnapshotProvider::default().with_highest_tracker(Some(highest_snapshot_tracker)),
));
self
} }
} }
@ -72,13 +102,18 @@ impl<DB: Database> ProviderFactory<DB> {
Ok(ProviderFactory::<DatabaseEnv> { Ok(ProviderFactory::<DatabaseEnv> {
db: init_db(path, log_level).map_err(|e| RethError::Custom(e.to_string()))?, db: init_db(path, log_level).map_err(|e| RethError::Custom(e.to_string()))?,
chain_spec, chain_spec,
snapshot_provider: None,
}) })
} }
} }
impl<DB: Clone> Clone for ProviderFactory<DB> { impl<DB: Clone> Clone for ProviderFactory<DB> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { db: self.db.clone(), chain_spec: Arc::clone(&self.chain_spec) } Self {
db: self.db.clone(),
chain_spec: Arc::clone(&self.chain_spec),
snapshot_provider: self.snapshot_provider.clone(),
}
} }
} }

View File

@ -1,6 +1,6 @@
use crate::{ use crate::{
bundle_state::{BundleStateInit, BundleStateWithReceipts, RevertsInit}, bundle_state::{BundleStateInit, BundleStateWithReceipts, RevertsInit},
providers::database::metrics, providers::{database::metrics, SnapshotProvider},
traits::{ traits::{
AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter, AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
}, },
@ -100,12 +100,15 @@ pub struct DatabaseProvider<TX> {
tx: TX, tx: TX,
/// Chain spec /// Chain spec
chain_spec: Arc<ChainSpec>, chain_spec: Arc<ChainSpec>,
/// Snapshot provider
#[allow(unused)]
snapshot_provider: Option<Arc<SnapshotProvider>>,
} }
impl<TX: DbTxMut> DatabaseProvider<TX> { impl<TX: DbTxMut> DatabaseProvider<TX> {
/// Creates a provider with an inner read-write transaction. /// Creates a provider with an inner read-write transaction.
pub fn new_rw(tx: TX, chain_spec: Arc<ChainSpec>) -> Self { pub fn new_rw(tx: TX, chain_spec: Arc<ChainSpec>) -> Self {
Self { tx, chain_spec } Self { tx, chain_spec, snapshot_provider: None }
} }
} }
@ -160,7 +163,13 @@ where
impl<TX: DbTx> DatabaseProvider<TX> { impl<TX: DbTx> DatabaseProvider<TX> {
/// Creates a provider with an inner read-only transaction. /// Creates a provider with an inner read-only transaction.
pub fn new(tx: TX, chain_spec: Arc<ChainSpec>) -> Self { pub fn new(tx: TX, chain_spec: Arc<ChainSpec>) -> Self {
Self { tx, chain_spec } Self { tx, chain_spec, snapshot_provider: None }
}
/// Creates a new [`Self`] with access to a [`SnapshotProvider`].
pub fn with_snapshot_provider(mut self, snapshot_provider: Arc<SnapshotProvider>) -> Self {
self.snapshot_provider = Some(snapshot_provider);
self
} }
/// Consume `DbTx` or `DbTxMut`. /// Consume `DbTx` or `DbTxMut`.

View File

@ -4,11 +4,13 @@ use dashmap::DashMap;
use reth_interfaces::RethResult; use reth_interfaces::RethResult;
use reth_nippy_jar::NippyJar; use reth_nippy_jar::NippyJar;
use reth_primitives::{ use reth_primitives::{
snapshot::BLOCKS_PER_SNAPSHOT, Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, snapshot::{HighestSnapshots, BLOCKS_PER_SNAPSHOT},
Header, SealedHeader, SnapshotSegment, TransactionMeta, TransactionSigned, Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header, SealedHeader,
TransactionSignedNoHash, TxHash, TxNumber, B256, U256, SnapshotSegment, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber,
B256, U256,
}; };
use std::{ops::RangeBounds, path::PathBuf}; use std::{ops::RangeBounds, path::PathBuf};
use tokio::sync::watch;
/// SnapshotProvider /// SnapshotProvider
#[derive(Debug, Default)] #[derive(Debug, Default)]
@ -16,9 +18,20 @@ pub struct SnapshotProvider {
/// Maintains a map which allows for concurrent access to different `NippyJars`, over different /// Maintains a map which allows for concurrent access to different `NippyJars`, over different
/// segments and ranges. /// segments and ranges.
map: DashMap<(BlockNumber, SnapshotSegment), LoadedJar>, map: DashMap<(BlockNumber, SnapshotSegment), LoadedJar>,
/// Tracks the highest snapshot of every segment.
highest_tracker: Option<watch::Receiver<Option<HighestSnapshots>>>,
} }
impl SnapshotProvider { impl SnapshotProvider {
/// Adds a highest snapshot tracker to the provider
pub fn with_highest_tracker(
mut self,
highest_tracker: Option<watch::Receiver<Option<HighestSnapshots>>>,
) -> Self {
self.highest_tracker = highest_tracker;
self
}
/// Gets the provider of the requested segment and range. /// Gets the provider of the requested segment and range.
pub fn get_segment_provider( pub fn get_segment_provider(
&self, &self,
@ -44,6 +57,13 @@ impl SnapshotProvider {
self.get_segment_provider(segment, block, path) self.get_segment_provider(segment, block, path)
} }
/// Gets the highest snapshot if it exists for a snapshot segment.
pub fn get_highest_snapshot(&self, segment: SnapshotSegment) -> Option<BlockNumber> {
self.highest_tracker
.as_ref()
.and_then(|tracker| tracker.borrow().and_then(|highest| highest.highest(segment)))
}
} }
impl HeaderProvider for SnapshotProvider { impl HeaderProvider for SnapshotProvider {