feat: Snapshotter triggers segment snapshots (#5287)

Author: joshieDo
Date: 2023-11-14 21:38:54 +00:00
Committed by: GitHub
Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Parent: 7b781eb602
Commit: 6ee481b817

10 changed files with 171 additions and 55 deletions

View File

@@ -13,7 +13,10 @@ use reth_provider::{
providers::SnapshotProvider, DatabaseProviderRO, HeaderProvider, ProviderError, ProviderFactory,
};
use reth_snapshot::{segments, segments::Segment};
-use std::{path::Path, sync::Arc};
+use std::{
+path::{Path, PathBuf},
+sync::Arc,
+};
impl Command {
pub(crate) fn generate_headers_snapshot<DB: Database>(
@@ -32,7 +35,7 @@ impl Command {
let segment = segments::Headers::new(compression, filters);
-segment.snapshot::<DB>(provider, range.clone())?;
+segment.snapshot::<DB>(provider, PathBuf::default(), range.clone())?;
// Default name doesn't have any configuration
reth_primitives::fs::rename(

View File

@@ -14,7 +14,10 @@ use reth_provider::{
ReceiptProvider, TransactionsProvider, TransactionsProviderExt,
};
use reth_snapshot::{segments, segments::Segment};
-use std::{path::Path, sync::Arc};
+use std::{
+path::{Path, PathBuf},
+sync::Arc,
+};
impl Command {
pub(crate) fn generate_receipts_snapshot<DB: Database>(
@@ -33,7 +36,7 @@ impl Command {
let segment = segments::Receipts::new(compression, filters);
-segment.snapshot::<DB>(provider, range.clone())?;
+segment.snapshot::<DB>(provider, PathBuf::default(), range.clone())?;
// Default name doesn't have any configuration
reth_primitives::fs::rename(

View File

@@ -14,7 +14,10 @@ use reth_provider::{
TransactionsProvider, TransactionsProviderExt,
};
use reth_snapshot::{segments, segments::Segment};
-use std::{path::Path, sync::Arc};
+use std::{
+path::{Path, PathBuf},
+sync::Arc,
+};
impl Command {
pub(crate) fn generate_transactions_snapshot<DB: Database>(
@@ -33,7 +36,7 @@ impl Command {
let segment = segments::Transactions::new(compression, filters);
-segment.snapshot::<DB>(provider, range.clone())?;
+segment.snapshot::<DB>(provider, PathBuf::default(), range.clone())?;
// Default name doesn't have any configuration
reth_primitives::fs::rename(

View File

@@ -7,7 +7,7 @@ mod segment;
use alloy_primitives::BlockNumber;
pub use compression::Compression;
pub use filters::{Filters, InclusionFilter, PerfectHashingFunction};
-pub use segment::{SegmentHeader, SnapshotSegment};
+pub use segment::{SegmentConfig, SegmentHeader, SnapshotSegment};
/// Default snapshot block count.
pub const BLOCKS_PER_SNAPSHOT: u64 = 500_000;

View File

@@ -36,11 +36,14 @@ pub enum SnapshotSegment {
impl SnapshotSegment {
/// Returns the default configuration of the segment.
-pub const fn config(&self) -> (Filters, Compression) {
-let default_config = (
-Filters::WithFilters(InclusionFilter::Cuckoo, super::PerfectHashingFunction::Fmph),
-Compression::Lz4,
-);
+pub const fn config(&self) -> SegmentConfig {
+let default_config = SegmentConfig {
+filters: Filters::WithFilters(
+InclusionFilter::Cuckoo,
+super::PerfectHashingFunction::Fmph,
+),
+compression: Compression::Lz4,
+};
match self {
SnapshotSegment::Headers => default_config,
@@ -133,3 +136,12 @@ impl SegmentHeader {
}
}
}
+/// Configuration used on the segment.
+#[derive(Debug, Clone, Copy)]
+pub struct SegmentConfig {
+/// Inclusion filters used on the segment
+pub filters: Filters,
+/// Compression used on the segment
+pub compression: Compression,
+}
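
For orientation, not part of the diff: a minimal sketch of how the new struct-based API might be consumed once `SnapshotSegment::config()` returns `SegmentConfig` instead of the old `(Filters, Compression)` tuple. The `describe_segment` helper and the `main` wrapper are hypothetical; the sketch assumes the involved types implement `Debug` (the new `SegmentConfig` derives it per the hunk above).

// Hypothetical illustration, not part of this commit.
use reth_primitives::{snapshot::SegmentConfig, SnapshotSegment};

fn describe_segment(segment: SnapshotSegment) -> String {
    // `config()` now returns a named struct rather than a `(Filters, Compression)` tuple,
    // so its fields can be destructured (or accessed) by name.
    let SegmentConfig { filters, compression } = segment.config();
    // Assumes SnapshotSegment, Filters, and Compression derive `Debug`.
    format!("{segment:?}: filters={filters:?}, compression={compression:?}")
}

fn main() {
    // Prints the default filter/compression choice for the headers segment.
    println!("{}", describe_segment(SnapshotSegment::Headers));
}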

View File

@@ -5,38 +5,48 @@ use reth_db::{
};
use reth_interfaces::RethResult;
use reth_primitives::{
-snapshot::{Compression, Filters},
+snapshot::{Compression, Filters, SegmentConfig},
BlockNumber, SnapshotSegment,
};
use reth_provider::DatabaseProviderRO;
-use std::ops::RangeInclusive;
+use std::{ops::RangeInclusive, path::Path};
/// Snapshot segment responsible for [SnapshotSegment::Headers] part of data.
#[derive(Debug)]
pub struct Headers {
-compression: Compression,
-filters: Filters,
+config: SegmentConfig,
}
impl Headers {
/// Creates new instance of [Headers] snapshot segment.
pub fn new(compression: Compression, filters: Filters) -> Self {
-Self { compression, filters }
+Self { config: SegmentConfig { compression, filters } }
}
}
+impl Default for Headers {
+fn default() -> Self {
+Self { config: SnapshotSegment::Headers.config() }
+}
+}
impl Segment for Headers {
+fn segment() -> SnapshotSegment {
+SnapshotSegment::Headers
+}
fn snapshot<DB: Database>(
&self,
provider: &DatabaseProviderRO<'_, DB>,
+directory: impl AsRef<Path>,
range: RangeInclusive<BlockNumber>,
) -> RethResult<()> {
let range_len = range.clone().count();
let mut jar = prepare_jar::<DB, 3>(
provider,
-SnapshotSegment::Headers,
-self.filters,
-self.compression,
+directory,
+Self::segment(),
+self.config,
range.clone(),
range_len,
|| {
@@ -57,7 +67,7 @@ impl Segment for Headers {
// Generate list of hashes for filters & PHF
let mut cursor = provider.tx_ref().cursor_read::<RawTable<tables::CanonicalHeaders>>()?;
let mut hashes = None;
-if self.filters.has_filters() {
+if self.config.filters.has_filters() {
hashes = Some(
cursor
.walk(Some(RawKey::from(*range.start())))?

View File

@@ -15,7 +15,9 @@ use reth_db::{
use reth_interfaces::RethResult;
use reth_nippy_jar::NippyJar;
use reth_primitives::{
-snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentHeader},
+snapshot::{
+Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentConfig, SegmentHeader,
+},
BlockNumber, SnapshotSegment,
};
use reth_provider::{DatabaseProviderRO, TransactionsProviderExt};
@@ -24,14 +26,19 @@ use std::{ops::RangeInclusive, path::Path};
pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];
/// A segment represents a snapshotting of some portion of the data.
-pub trait Segment {
-/// Snapshot data using the provided range.
+pub trait Segment: Default {
+/// Snapshot data using the provided range. The `directory` parameter determines the snapshot
+/// file's save location.
fn snapshot<DB: Database>(
&self,
provider: &DatabaseProviderRO<'_, DB>,
+directory: impl AsRef<Path>,
range: RangeInclusive<BlockNumber>,
) -> RethResult<()>;
+/// Returns this struct's [`SnapshotSegment`].
+fn segment() -> SnapshotSegment;
/// Generates the dataset to train a zstd dictionary with the most recent rows (at most 1000).
fn dataset_for_compression<DB: Database, T: Table<Key = u64>>(
&self,
@@ -48,12 +55,13 @@
}
}
-/// Returns a [`NippyJar`] according to the desired configuration.
+/// Returns a [`NippyJar`] according to the desired configuration. The `directory` parameter
+/// determines the snapshot file's save location.
pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(
provider: &DatabaseProviderRO<'_, DB>,
+directory: impl AsRef<Path>,
segment: SnapshotSegment,
-filters: Filters,
-compression: Compression,
+segment_config: SegmentConfig,
block_range: RangeInclusive<BlockNumber>,
total_rows: usize,
prepare_compression: impl Fn() -> RethResult<Rows<COLUMNS>>,
@@ -61,11 +69,11 @@ pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
let mut nippy_jar = NippyJar::new(
COLUMNS,
-Path::new(segment.filename(&block_range).as_str()),
+&directory.as_ref().join(segment.filename(&block_range).as_str()),
SegmentHeader::new(block_range, tx_range, segment),
);
-nippy_jar = match compression {
+nippy_jar = match segment_config.compression {
Compression::Lz4 => nippy_jar.with_lz4(),
Compression::Zstd => nippy_jar.with_zstd(false, 0),
Compression::ZstdWithDictionary => {
@@ -78,7 +86,7 @@ pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(
Compression::Uncompressed => nippy_jar,
};
-if let Filters::WithFilters(inclusion_filter, phf) = filters {
+if let Filters::WithFilters(inclusion_filter, phf) = segment_config.filters {
nippy_jar = match inclusion_filter {
InclusionFilter::Cuckoo => nippy_jar.with_cuckoo_filter(total_rows),
};
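
As context for the `directory` parameter added to `Segment::snapshot` and `prepare_jar` above, a hedged sketch (not part of the diff) of the caller-side pattern this commit introduces in the snapshotter further below: build the file in a temporary directory, then move it next to the finished snapshots. The `snapshot_into` helper and its parameter names are illustrative assumptions, not reth API.

// Illustrative only: drive any `Segment` into a caller-chosen directory, then move the
// finished file to its final location (mirroring `Snapshotter::run_segment` later in this diff).
use reth_db::database::Database;
use reth_interfaces::RethResult;
use reth_primitives::BlockNumber;
use reth_provider::DatabaseProviderRO;
use reth_snapshot::segments::Segment;
use std::{ops::RangeInclusive, path::Path};

fn snapshot_into<S: Segment, DB: Database>(
    provider: &DatabaseProviderRO<'_, DB>,
    temp_dir: &Path,
    final_dir: &Path,
    range: RangeInclusive<BlockNumber>,
) -> RethResult<()> {
    // The file name is derived from the segment kind and the block range.
    let filename = S::segment().filename(&range);
    // Build the snapshot inside the temporary directory first...
    S::default().snapshot::<DB>(provider, temp_dir, range)?;
    // ...then move the finished file next to the other snapshots.
    reth_primitives::fs::rename(temp_dir.join(&filename), final_dir.join(filename))?;
    Ok(())
}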

View File

@@ -2,30 +2,40 @@ use crate::segments::{prepare_jar, Segment};
use reth_db::{database::Database, snapshot::create_snapshot_T1, tables};
use reth_interfaces::RethResult;
use reth_primitives::{
-snapshot::{Compression, Filters, SegmentHeader},
+snapshot::{Compression, Filters, SegmentConfig, SegmentHeader},
BlockNumber, SnapshotSegment, TxNumber,
};
use reth_provider::{DatabaseProviderRO, TransactionsProviderExt};
-use std::ops::RangeInclusive;
+use std::{ops::RangeInclusive, path::Path};
/// Snapshot segment responsible for [SnapshotSegment::Receipts] part of data.
#[derive(Debug)]
pub struct Receipts {
-compression: Compression,
-filters: Filters,
+config: SegmentConfig,
}
impl Receipts {
/// Creates new instance of [Receipts] snapshot segment.
pub fn new(compression: Compression, filters: Filters) -> Self {
-Self { compression, filters }
+Self { config: SegmentConfig { compression, filters } }
}
}
+impl Default for Receipts {
+fn default() -> Self {
+Self { config: SnapshotSegment::Receipts.config() }
+}
+}
impl Segment for Receipts {
+fn segment() -> SnapshotSegment {
+SnapshotSegment::Receipts
+}
fn snapshot<DB: Database>(
&self,
provider: &DatabaseProviderRO<'_, DB>,
+directory: impl AsRef<Path>,
block_range: RangeInclusive<BlockNumber>,
) -> RethResult<()> {
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
@@ -33,9 +43,9 @@ impl Segment for Receipts {
let mut jar = prepare_jar::<DB, 1>(
provider,
-SnapshotSegment::Receipts,
-self.filters,
-self.compression,
+directory,
+Self::segment(),
+self.config,
block_range,
tx_range_len,
|| {
@@ -49,7 +59,7 @@ impl Segment for Receipts {
// Generate list of hashes for filters & PHF
let mut hashes = None;
-if self.filters.has_filters() {
+if self.config.filters.has_filters() {
hashes = Some(
provider
.transaction_hashes_by_range(*tx_range.start()..(*tx_range.end() + 1))?

View File

@@ -2,30 +2,40 @@ use crate::segments::{prepare_jar, Segment};
use reth_db::{database::Database, snapshot::create_snapshot_T1, tables};
use reth_interfaces::RethResult;
use reth_primitives::{
-snapshot::{Compression, Filters, SegmentHeader},
+snapshot::{Compression, Filters, SegmentConfig, SegmentHeader},
BlockNumber, SnapshotSegment, TxNumber,
};
use reth_provider::{DatabaseProviderRO, TransactionsProviderExt};
-use std::ops::RangeInclusive;
+use std::{ops::RangeInclusive, path::Path};
/// Snapshot segment responsible for [SnapshotSegment::Transactions] part of data.
#[derive(Debug)]
pub struct Transactions {
-compression: Compression,
-filters: Filters,
+config: SegmentConfig,
}
impl Transactions {
/// Creates new instance of [Transactions] snapshot segment.
pub fn new(compression: Compression, filters: Filters) -> Self {
-Self { compression, filters }
+Self { config: SegmentConfig { compression, filters } }
}
}
+impl Default for Transactions {
+fn default() -> Self {
+Self { config: SnapshotSegment::Transactions.config() }
+}
+}
impl Segment for Transactions {
+fn segment() -> SnapshotSegment {
+SnapshotSegment::Transactions
+}
fn snapshot<DB: Database>(
&self,
provider: &DatabaseProviderRO<'_, DB>,
+directory: impl AsRef<Path>,
block_range: RangeInclusive<BlockNumber>,
) -> RethResult<()> {
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
@@ -33,9 +43,9 @@ impl Segment for Transactions {
let mut jar = prepare_jar::<DB, 1>(
provider,
-SnapshotSegment::Transactions,
-self.filters,
-self.compression,
+directory,
+Self::segment(),
+self.config,
block_range,
tx_range_len,
|| {
@@ -49,7 +59,7 @@ impl Segment for Transactions {
// Generate list of hashes for filters & PHF
let mut hashes = None;
-if self.filters.has_filters() {
+if self.config.filters.has_filters() {
hashes = Some(
provider
.transaction_hashes_by_range(*tx_range.start()..(*tx_range.end() + 1))?

View File

@@ -1,6 +1,6 @@
//! Support for snapshotting.
-use crate::SnapshotterError;
+use crate::{segments, segments::Segment, SnapshotterError};
use reth_db::database::Database;
use reth_interfaces::{RethError, RethResult};
use reth_primitives::{
@@ -17,6 +17,10 @@ pub type SnapshotterResult = Result<SnapshotTargets, SnapshotterError>;
/// The snapshotter type itself with the result of [Snapshotter::run]
pub type SnapshotterWithResult<DB> = (Snapshotter<DB>, SnapshotterResult);
+/// Snapshots are initially created in `{...}/datadir/snapshots/temp` and moved once finished. This
+/// directory is cleaned up on every booting up of the node.
+const TEMPORARY_SUBDIRECTORY: &str = "temp";
/// Snapshotting routine. Main snapshotting logic happens in [Snapshotter::run].
#[derive(Debug)]
pub struct Snapshotter<DB> {
@@ -89,11 +93,6 @@ impl<DB: Database> Snapshotter<DB> {
chain_spec: Arc<ChainSpec>,
block_interval: u64,
) -> RethResult<Self> {
-// Create directory for snapshots if it doesn't exist.
-if !snapshots_path.exists() {
-reth_primitives::fs::create_dir_all(&snapshots_path)?;
-}
let (highest_snapshots_notifier, highest_snapshots_tracker) = watch::channel(None);
let mut snapshotter = Self {
@@ -106,11 +105,34 @@ impl<DB: Database> Snapshotter<DB> {
block_interval,
};
+snapshotter.create_directory()?;
snapshotter.update_highest_snapshots_tracker()?;
Ok(snapshotter)
}
+/// Ensures the snapshots directory and its temporary subdirectory are properly set up.
+///
+/// This function performs the following actions:
+/// 1. If `datadir/snapshots` does not exist, it creates it.
+/// 2. Ensures `datadir/snapshots/temp` exists and is empty.
+///
+/// The `temp` subdirectory is where snapshots are initially created before being
+/// moved to their final location within `datadir/snapshots`.
+fn create_directory(&self) -> RethResult<()> {
+let temporary_path = self.snapshots_path.join(TEMPORARY_SUBDIRECTORY);
+if !self.snapshots_path.exists() {
+reth_primitives::fs::create_dir_all(&self.snapshots_path)?;
+} else if temporary_path.exists() {
+reth_primitives::fs::remove_dir_all(&temporary_path)?;
+}
+reth_primitives::fs::create_dir_all(temporary_path)?;
+Ok(())
+}
#[cfg(test)]
fn set_highest_snapshots_from_targets(&mut self, targets: &SnapshotTargets) {
if let Some(block_number) = &targets.headers {
@@ -167,13 +189,48 @@ impl<DB: Database> Snapshotter<DB> {
debug_assert!(targets.is_multiple_of_block_interval(self.block_interval));
debug_assert!(targets.is_contiguous_to_highest_snapshots(self.highest_snapshots));
-// TODO(alexey): snapshot logic
+self.run_segment::<segments::Receipts>(
+targets.receipts.as_ref().map(|(range, _)| range.clone()),
+)?;
+self.run_segment::<segments::Transactions>(
+targets.transactions.as_ref().map(|(range, _)| range.clone()),
+)?;
+self.run_segment::<segments::Headers>(targets.headers.clone())?;
self.update_highest_snapshots_tracker()?;
Ok(targets)
}
+/// Run the snapshotter for one segment.
+///
+/// It first builds the snapshot in a **temporary directory** inside the snapshots directory. If
+/// for some reason the node is terminated during the snapshot process, it will be cleaned
+/// up on boot (on [`Snapshotter::new`]) and the snapshot process restarted from scratch for
+/// this block range and segment.
+///
+/// If it succeeds, then we move the snapshot file from the temporary directory to its main one.
+fn run_segment<S: Segment>(
+&self,
+block_range: Option<RangeInclusive<BlockNumber>>,
+) -> RethResult<()> {
+if let Some(block_range) = block_range {
+let temp = self.snapshots_path.join(TEMPORARY_SUBDIRECTORY);
+let filename = S::segment().filename(&block_range);
+S::default().snapshot::<DB>(
+&self.provider_factory.provider()?,
+temp.clone(),
+block_range.clone(),
+)?;
+reth_primitives::fs::rename(temp.join(&filename), self.snapshots_path.join(filename))?;
+}
+Ok(())
+}
/// Returns a snapshot targets at the provided finalized block number, respecting the block
/// interval. The target is determined by the check against last snapshots.
pub fn get_snapshot_targets(