Breaking changes (#5191)

Co-authored-by: Bjerg <onbjerg@users.noreply.github.com>
Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
Co-authored-by: joshieDo <ranriver@protonmail.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Oliver Nordbjerg <hi@notbjerg.me>
Co-authored-by: Thomas Coratger <thomas.coratger@gmail.com>
Commit 6b5b6f7a40 (parent 025fa5f038)
Authored by Alexey Shekhirin on 2024-02-29 12:37:28 +00:00, committed by GitHub.
252 changed files with 10154 additions and 6327 deletions


@@ -0,0 +1,43 @@
[package]
name = "reth-static-file"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Static file producer implementation"

[lints]
workspace = true

[dependencies]
# reth
reth-primitives.workspace = true
reth-db.workspace = true
reth-provider.workspace = true
reth-interfaces.workspace = true
reth-nippy-jar.workspace = true
reth-tokio-util.workspace = true
# async
tokio = { workspace = true, features = ["sync"] }
tokio-stream.workspace = true
# misc
thiserror.workspace = true
tracing.workspace = true
clap = { workspace = true, features = ["derive"], optional = true }
rayon.workspace = true

[dev-dependencies]
# reth
reth-db = { workspace = true, features = ["test-utils"] }
reth-stages = { workspace = true, features = ["test-utils"] }
# misc
tempfile.workspace = true
assert_matches.workspace = true

[features]
clap = ["dep:clap"]


@@ -0,0 +1,88 @@
# StaticFile
## Overview
Data that has reached a finalized state and won't undergo further changes (essentially frozen) can be read without any concern for modification, which makes a traditional mutable database a poor fit for storing it.
This crate copies this data from the current database to multiple static files, aggregated by block ranges: a new set of static files is created at every 500_000th block.
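To make the 500_000-block bucketing concrete, here is a minimal, self-contained sketch of the fixed-range computation (the real code uses `find_fixed_range` from `reth_primitives::static_file`; this standalone version only approximates it):

```rust
use std::ops::RangeInclusive;

/// Sketch: map a block number to its 500_000-block static file range.
fn fixed_range(block: u64) -> RangeInclusive<u64> {
    const BLOCKS_PER_STATIC_FILE: u64 = 500_000;
    let start = (block / BLOCKS_PER_STATIC_FILE) * BLOCKS_PER_STATIC_FILE;
    start..=start + BLOCKS_PER_STATIC_FILE - 1
}

fn main() {
    // Block 499_999 still belongs to the first file; 500_000 opens the next.
    assert_eq!(fixed_range(499_999), 0..=499_999);
    assert_eq!(fixed_range(500_000), 500_000..=999_999);
}
```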
Below are two diagrams illustrating the processes of creating static files (custom format: `NippyJar`) and querying them. A glossary is also provided to explain the different (linked) components involved in these processes.
<details>
<summary>Creation diagram (<code>StaticFileProducer</code>)</summary>
```mermaid
graph TD;
I("BLOCK_HEIGHT % 500_000 == 0")--triggers-->SP(StaticFileProducer)
SP --> |triggers| SH["create_static_file(block_range, StaticFileSegment::Headers)"]
SP --> |triggers| ST["create_static_file(block_range, StaticFileSegment::Transactions)"]
SP --> |triggers| SR["create_static_file(block_range, StaticFileSegment::Receipts)"]
SP --> |triggers| ETC["create_static_file(block_range, ...)"]
SH --> CS["create_static_file::&lt; T &gt;(DatabaseCursor)"]
ST --> CS
SR --> CS
ETC --> CS
CS --> |create| IF(NippyJar::InclusionFilters)
CS -- iterates --> DC(DatabaseCursor) -->HN{HasNext}
HN --> |true| NJC(NippyJar::Compression)
NJC --> HN
NJC --store--> NJ
HN --> |false| NJ
IF --store--> NJ(NippyJar)
NJ --freeze--> F(File)
F--"on success"--> SP1(StaticFileProducer)
SP1 --"sends BLOCK_HEIGHT"--> HST(HighestStaticFileTracker)
HST --"read by"-->Pruner
HST --"read by"-->DatabaseProvider
HST --"read by"-->SnapsotProvider
HST --"read by"-->ProviderFactory
```
</details>
<details>
<summary>Query diagram (<code>Provider</code>)</summary>
```mermaid
graph TD;
RPC-->P
P("Provider::header(block_number)")-->PF(ProviderFactory)
PF--shares-->SP1("Arc(StaticFileProvider)")
SP1--shares-->PD(DatabaseProvider)
PF--creates-->PD
PD--check `HighestStaticFileTracker`-->PD
PD-->DC1{block_number <br> > <br> highest static_file block}
DC1 --> |true| PD1("DatabaseProvider::header(block_number)")
DC1 --> |false| ASP("StaticFileProvider::header(block_number)")
PD1 --> MDBX
ASP --finds the correct jar and creates--> JP("StaticFileJarProvider::header(block_number)")
JP --"creates"-->SC(StaticFileCursor)
SC --".get_one&lt; HeaderMask&lt; Header &gt; &gt;(number)"--->NJC("NippyJarCursor")
NJC--".row_by_number(row_index, mask)"-->NJ[NippyJar]
NJ--"&[u8]"-->NJC
NJC--"&[u8]"-->SC
SC--"Header"--> JP
JP--"Header"--> ASP
```
</details>
### Glossary
In descending order of abstraction hierarchy:
[`StaticFileProducer`](../../crates/static_file/src/static_file_producer.rs#L20): A `reth` background service that **copies** data from the database to new static files when the block height reaches a certain threshold (e.g., every `500_000` blocks). Upon completion, it dispatches a notification about the highest static file block on the `HighestStaticFileTracker` channel. **It DOES NOT remove data from the database.**
[`HighestStaticFileTracker`](../../crates/static_file/src/static_file_producer.rs#L22): A channel utilized by `StaticFileProducer` to announce the newest static_file block to all components with a listener: `Pruner` (to know which additional tables can be pruned) and `DatabaseProvider` (to know which data can be queried from the static files).
[`StaticFileProvider`](../../crates/storage/provider/src/providers/static_file/manager.rs#L15): A provider similar to `DatabaseProvider`, **managing all existing static files** and selecting the optimal one (by range and segment type) to fulfill a request. **A single instance is shared across all components and should be instantiated only once within `ProviderFactory`**. An immutable reference is given out every time `ProviderFactory` creates a new `DatabaseProvider`.
[`StaticFileJarProvider`](../../crates/storage/provider/src/providers/static_file/jar.rs#L42): A provider similar to `DatabaseProvider` that provides access to a **single static file**.
[`StaticFileCursor`](../../crates/storage/db/src/static_file/cursor.rs#L12): An elevated abstraction of `NippyJarCursor` for simplified access. It associates the bitmasks with type decoding. For instance, `cursor.get_two::<TransactionMask<Tx, Signature>>(tx_number)` would yield `Tx` and `Signature`, eliminating the need to manage masks or invoke a decoder/decompressor.
[`StaticFileSegment`](../../crates/primitives/src/static_file/segment.rs#L10): Each static file only contains data of a specific segment, e.g., `Headers`, `Transactions`, or `Receipts`.
[`NippyJarCursor`](../../crates/storage/nippy-jar/src/cursor.rs#L12): Accessor of data in a `NippyJar` file. It enables queries either by row number (e.g., block number 1) or by a predefined key not part of the file (e.g., transaction hashes). If a file has multiple columns (e.g., `Tx | TxSender | Signature`), and one wishes to access only one of the column values, this can be accomplished with bitmasks (e.g., for `TxSender`, the mask would be `0b010`; see the toy example after this glossary).
[`NippyJar`](../../crates/storage/nippy-jar/src/lib.rs#L57): A create-only file format. No data can be appended after creation. It supports multiple columns, compression (e.g., Zstd (with and without dictionaries), lz4, uncompressed) and inclusion filters (e.g., cuckoo filter: `is hash X part of this dataset`). Static files are organized by block ranges (e.g., `TransactionStaticFile_499_999.jar` contains one transaction per row for all transactions from block `0` to block `499_999`). For more details, check the struct documentation.
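As a toy illustration of the `NippyJarCursor` column bitmask mentioned above (plain Rust, not the actual `NippyJar` API), selecting only `TxSender` out of `Tx | TxSender | Signature`:

```rust
fn main() {
    // One bit per column, in column order: Tx = 0b001, TxSender = 0b010,
    // Signature = 0b100.
    let columns = ["Tx", "TxSender", "Signature"];
    let mask: u8 = 0b010; // request only the TxSender column

    let selected: Vec<&str> = columns
        .into_iter()
        .enumerate()
        .filter(|&(i, _)| mask & (1 << i) != 0)
        .map(|(_, name)| name)
        .collect();
    assert_eq!(selected, vec!["TxSender"]);
}
```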


@@ -0,0 +1,19 @@
use crate::StaticFileTargets;
use std::time::Duration;
/// An event emitted by a [StaticFileProducer][crate::StaticFileProducer].
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum StaticFileProducerEvent {
/// Emitted when static file producer started running.
Started {
/// Targets that will be moved to static files
targets: StaticFileTargets,
},
/// Emitted when static file producer finished running.
Finished {
/// Targets that were moved to static files
targets: StaticFileTargets,
/// Time it took to run the static file producer
elapsed: Duration,
},
}
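A hedged sketch of how a consumer might subscribe to these events via the stream returned by `StaticFileProducer::events()` (the producer setup is assumed; module paths follow this crate's exports):

```rust
use reth_static_file::{StaticFileProducer, StaticFileProducerEvent};
use tokio_stream::StreamExt;

/// Sketch: log every event emitted by an already-constructed producer.
async fn log_producer_events<DB: reth_db::database::Database>(
    mut producer: StaticFileProducer<DB>,
) {
    let mut events = producer.events();
    while let Some(event) = events.next().await {
        match event {
            StaticFileProducerEvent::Started { targets } => {
                println!("static file run started, targets: {targets:?}");
            }
            StaticFileProducerEvent::Finished { targets, elapsed } => {
                println!("static file run finished in {elapsed:?}, targets: {targets:?}");
            }
        }
    }
}
```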


@@ -0,0 +1,17 @@
//! Static file producer implementation.
#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
mod event;
pub mod segments;
mod static_file_producer;
pub use event::StaticFileProducerEvent;
pub use static_file_producer::{
StaticFileProducer, StaticFileProducerResult, StaticFileProducerWithResult, StaticFileTargets,
};


@@ -0,0 +1,128 @@
use crate::segments::{dataset_for_compression, prepare_jar, Segment, SegmentHeader};
use reth_db::{
cursor::DbCursorRO, database::Database, static_file::create_static_file_T1_T2_T3, tables,
transaction::DbTx, RawKey, RawTable,
};
use reth_interfaces::provider::ProviderResult;
use reth_primitives::{static_file::SegmentConfig, BlockNumber, StaticFileSegment};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
DatabaseProviderRO,
};
use std::{ops::RangeInclusive, path::Path};
/// Static File segment responsible for the [StaticFileSegment::Headers] part of data.
#[derive(Debug, Default)]
pub struct Headers;
impl<DB: Database> Segment<DB> for Headers {
fn segment(&self) -> StaticFileSegment {
StaticFileSegment::Headers
}
fn copy_to_static_files(
&self,
provider: DatabaseProviderRO<DB>,
static_file_provider: StaticFileProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let mut static_file_writer =
static_file_provider.get_writer(*block_range.start(), StaticFileSegment::Headers)?;
let mut headers_cursor = provider.tx_ref().cursor_read::<tables::Headers>()?;
let headers_walker = headers_cursor.walk_range(block_range.clone())?;
let mut header_td_cursor =
provider.tx_ref().cursor_read::<tables::HeaderTerminalDifficulties>()?;
let header_td_walker = header_td_cursor.walk_range(block_range.clone())?;
let mut canonical_headers_cursor =
provider.tx_ref().cursor_read::<tables::CanonicalHeaders>()?;
let canonical_headers_walker = canonical_headers_cursor.walk_range(block_range)?;
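        // Walk the three tables in lockstep: each iteration yields one block's
        // header, its total difficulty, and its canonical hash, which are
        // appended to the static file as a single row.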
for ((header_entry, header_td_entry), canonical_header_entry) in
headers_walker.zip(header_td_walker).zip(canonical_headers_walker)
{
let (header_block, header) = header_entry?;
let (header_td_block, header_td) = header_td_entry?;
let (canonical_header_block, canonical_header) = canonical_header_entry?;
debug_assert_eq!(header_block, header_td_block);
debug_assert_eq!(header_td_block, canonical_header_block);
let _static_file_block =
static_file_writer.append_header(header, header_td.0, canonical_header)?;
debug_assert_eq!(_static_file_block, header_block);
}
Ok(())
}
fn create_static_file_file(
&self,
provider: &DatabaseProviderRO<DB>,
directory: &Path,
config: SegmentConfig,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let range_len = block_range.clone().count();
let jar = prepare_jar::<DB, 3>(
provider,
directory,
StaticFileSegment::Headers,
config,
block_range.clone(),
range_len,
|| {
Ok([
dataset_for_compression::<DB, tables::Headers>(
provider,
&block_range,
range_len,
)?,
dataset_for_compression::<DB, tables::HeaderTerminalDifficulties>(
provider,
&block_range,
range_len,
)?,
dataset_for_compression::<DB, tables::CanonicalHeaders>(
provider,
&block_range,
range_len,
)?,
])
},
)?;
// Generate list of hashes for filters & PHF
let mut cursor = provider.tx_ref().cursor_read::<RawTable<tables::CanonicalHeaders>>()?;
let mut hashes = None;
if config.filters.has_filters() {
hashes = Some(
cursor
.walk(Some(RawKey::from(*block_range.start())))?
.take(range_len)
.map(|row| row.map(|(_key, value)| value.into_value()).map_err(|e| e.into())),
);
}
create_static_file_T1_T2_T3::<
tables::Headers,
tables::HeaderTerminalDifficulties,
tables::CanonicalHeaders,
BlockNumber,
SegmentHeader,
>(
provider.tx_ref(),
block_range,
None,
// We already prepared the dictionary beforehand
None::<Vec<std::vec::IntoIter<Vec<u8>>>>,
hashes,
range_len,
jar,
)?;
Ok(())
}
}


@@ -0,0 +1,116 @@
//! StaticFile segment implementations and utilities.
mod transactions;
pub use transactions::Transactions;
mod headers;
pub use headers::Headers;
mod receipts;
pub use receipts::Receipts;
use reth_db::{
cursor::DbCursorRO, database::Database, table::Table, transaction::DbTx, RawKey, RawTable,
};
use reth_interfaces::provider::ProviderResult;
use reth_nippy_jar::NippyJar;
use reth_primitives::{
static_file::{
find_fixed_range, Compression, Filters, InclusionFilter, PerfectHashingFunction,
SegmentConfig, SegmentHeader,
},
BlockNumber, StaticFileSegment,
};
use reth_provider::{providers::StaticFileProvider, DatabaseProviderRO, TransactionsProviderExt};
use std::{ops::RangeInclusive, path::Path};
pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];
/// A segment represents moving some portion of the data to static files.
pub trait Segment<DB: Database>: Send + Sync {
/// Returns the [`StaticFileSegment`].
fn segment(&self) -> StaticFileSegment;
/// Move data to static files for the provided block range. [StaticFileProvider] will handle the
/// management of and writing to files.
fn copy_to_static_files(
&self,
provider: DatabaseProviderRO<DB>,
static_file_provider: StaticFileProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()>;
/// Create a static file of data for the provided block range. The `directory` parameter
/// determines the static file's save location.
fn create_static_file_file(
&self,
provider: &DatabaseProviderRO<DB>,
directory: &Path,
config: SegmentConfig,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()>;
}
/// Returns a [`NippyJar`] according to the desired configuration. The `directory` parameter
/// determines the static file's save location.
pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(
provider: &DatabaseProviderRO<DB>,
directory: impl AsRef<Path>,
segment: StaticFileSegment,
segment_config: SegmentConfig,
block_range: RangeInclusive<BlockNumber>,
total_rows: usize,
prepare_compression: impl Fn() -> ProviderResult<Rows<COLUMNS>>,
) -> ProviderResult<NippyJar<SegmentHeader>> {
let tx_range = match segment {
StaticFileSegment::Headers => None,
StaticFileSegment::Receipts | StaticFileSegment::Transactions => {
Some(provider.transaction_range_by_block_range(block_range.clone())?.into())
}
};
let mut nippy_jar = NippyJar::new(
COLUMNS,
&directory.as_ref().join(segment.filename(&find_fixed_range(*block_range.end())).as_str()),
SegmentHeader::new(block_range.clone().into(), Some(block_range.into()), tx_range, segment),
);
nippy_jar = match segment_config.compression {
Compression::Lz4 => nippy_jar.with_lz4(),
Compression::Zstd => nippy_jar.with_zstd(false, 0),
Compression::ZstdWithDictionary => {
let dataset = prepare_compression()?;
nippy_jar = nippy_jar.with_zstd(true, 5_000_000);
nippy_jar.prepare_compression(dataset.to_vec())?;
nippy_jar
}
Compression::Uncompressed => nippy_jar,
};
if let Filters::WithFilters(inclusion_filter, phf) = segment_config.filters {
nippy_jar = match inclusion_filter {
InclusionFilter::Cuckoo => nippy_jar.with_cuckoo_filter(total_rows),
};
nippy_jar = match phf {
PerfectHashingFunction::Fmph => nippy_jar.with_fmph(),
PerfectHashingFunction::GoFmph => nippy_jar.with_gofmph(),
};
}
Ok(nippy_jar)
}
/// Generates the dataset to train a zstd dictionary from the most recent rows (at most 1000).
pub(crate) fn dataset_for_compression<DB: Database, T: Table<Key = u64>>(
provider: &DatabaseProviderRO<DB>,
range: &RangeInclusive<u64>,
range_len: usize,
) -> ProviderResult<Vec<Vec<u8>>> {
let mut cursor = provider.tx_ref().cursor_read::<RawTable<T>>()?;
Ok(cursor
.walk_back(Some(RawKey::from(*range.end())))?
.take(range_len.min(1000))
.map(|row| row.map(|(_key, value)| value.into_value()).expect("should exist"))
.collect::<Vec<_>>())
}
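A hedged sketch of the configuration that drives `prepare_jar` (assuming `SegmentConfig` is a plain struct with the `filters` and `compression` fields consumed above; the exact construction is an assumption):

```rust
use reth_primitives::static_file::{
    Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentConfig,
};

fn main() {
    // With this config, `prepare_jar` takes the cuckoo-filter + fmph branch
    // for filters and the dictionary branch for compression, i.e. it invokes
    // the `prepare_compression` closure to collect zstd training data.
    let config = SegmentConfig {
        filters: Filters::WithFilters(InclusionFilter::Cuckoo, PerfectHashingFunction::Fmph),
        compression: Compression::ZstdWithDictionary,
    };
    let _ = config;
}
```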


@@ -0,0 +1,107 @@
use crate::segments::{dataset_for_compression, prepare_jar, Segment};
use reth_db::{
cursor::DbCursorRO, database::Database, static_file::create_static_file_T1, tables,
transaction::DbTx,
};
use reth_interfaces::provider::{ProviderError, ProviderResult};
use reth_primitives::{
static_file::{SegmentConfig, SegmentHeader},
BlockNumber, StaticFileSegment, TxNumber,
};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
BlockReader, DatabaseProviderRO, TransactionsProviderExt,
};
use std::{ops::RangeInclusive, path::Path};
/// Static File segment responsible for the [StaticFileSegment::Receipts] part of data.
#[derive(Debug, Default)]
pub struct Receipts;
impl<DB: Database> Segment<DB> for Receipts {
fn segment(&self) -> StaticFileSegment {
StaticFileSegment::Receipts
}
fn copy_to_static_files(
&self,
provider: DatabaseProviderRO<DB>,
static_file_provider: StaticFileProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let mut static_file_writer =
static_file_provider.get_writer(*block_range.start(), StaticFileSegment::Receipts)?;
for block in block_range {
let _static_file_block =
static_file_writer.increment_block(StaticFileSegment::Receipts)?;
debug_assert_eq!(_static_file_block, block);
let block_body_indices = provider
.block_body_indices(block)?
.ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
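            // Receipts are keyed by transaction number, so walk exactly the
            // transaction range that belongs to this block.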
let mut receipts_cursor = provider.tx_ref().cursor_read::<tables::Receipts>()?;
let receipts_walker = receipts_cursor.walk_range(block_body_indices.tx_num_range())?;
for entry in receipts_walker {
let (tx_number, receipt) = entry?;
static_file_writer.append_receipt(tx_number, receipt)?;
}
}
Ok(())
}
fn create_static_file_file(
&self,
provider: &DatabaseProviderRO<DB>,
directory: &Path,
config: SegmentConfig,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
let tx_range_len = tx_range.clone().count();
let jar = prepare_jar::<DB, 1>(
provider,
directory,
StaticFileSegment::Receipts,
config,
block_range,
tx_range_len,
|| {
Ok([dataset_for_compression::<DB, tables::Receipts>(
provider,
&tx_range,
tx_range_len,
)?])
},
)?;
// Generate list of hashes for filters & PHF
let mut hashes = None;
if config.filters.has_filters() {
hashes = Some(
provider
.transaction_hashes_by_range(*tx_range.start()..(*tx_range.end() + 1))?
.into_iter()
.map(|(tx, _)| Ok(tx)),
);
}
create_static_file_T1::<tables::Receipts, TxNumber, SegmentHeader>(
provider.tx_ref(),
tx_range,
None,
// We already prepared the dictionary beforehand
None::<Vec<std::vec::IntoIter<Vec<u8>>>>,
hashes,
tx_range_len,
jar,
)?;
Ok(())
}
}


@@ -0,0 +1,111 @@
use crate::segments::{dataset_for_compression, prepare_jar, Segment};
use reth_db::{
cursor::DbCursorRO, database::Database, static_file::create_static_file_T1, tables,
transaction::DbTx,
};
use reth_interfaces::provider::{ProviderError, ProviderResult};
use reth_primitives::{
static_file::{SegmentConfig, SegmentHeader},
BlockNumber, StaticFileSegment, TxNumber,
};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
BlockReader, DatabaseProviderRO, TransactionsProviderExt,
};
use std::{ops::RangeInclusive, path::Path};
/// Static File segment responsible for the [StaticFileSegment::Transactions] part of data.
#[derive(Debug, Default)]
pub struct Transactions;
impl<DB: Database> Segment<DB> for Transactions {
fn segment(&self) -> StaticFileSegment {
StaticFileSegment::Transactions
}
/// Write transactions from database table [tables::Transactions] to static files with segment
/// [StaticFileSegment::Transactions] for the provided block range.
fn copy_to_static_files(
&self,
provider: DatabaseProviderRO<DB>,
static_file_provider: StaticFileProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let mut static_file_writer = static_file_provider
.get_writer(*block_range.start(), StaticFileSegment::Transactions)?;
for block in block_range {
let _static_file_block =
static_file_writer.increment_block(StaticFileSegment::Transactions)?;
debug_assert_eq!(_static_file_block, block);
let block_body_indices = provider
.block_body_indices(block)?
.ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
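            // Transactions are keyed by transaction number, so walk exactly
            // the transaction range that belongs to this block.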
let mut transactions_cursor =
provider.tx_ref().cursor_read::<tables::Transactions>()?;
let transactions_walker =
transactions_cursor.walk_range(block_body_indices.tx_num_range())?;
for entry in transactions_walker {
let (tx_number, transaction) = entry?;
static_file_writer.append_transaction(tx_number, transaction)?;
}
}
Ok(())
}
fn create_static_file_file(
&self,
provider: &DatabaseProviderRO<DB>,
directory: &Path,
config: SegmentConfig,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
let tx_range_len = tx_range.clone().count();
let jar = prepare_jar::<DB, 1>(
provider,
directory,
StaticFileSegment::Transactions,
config,
block_range,
tx_range_len,
|| {
Ok([dataset_for_compression::<DB, tables::Transactions>(
provider,
&tx_range,
tx_range_len,
)?])
},
)?;
// Generate list of hashes for filters & PHF
let mut hashes = None;
if config.filters.has_filters() {
hashes = Some(
provider
.transaction_hashes_by_range(*tx_range.start()..(*tx_range.end() + 1))?
.into_iter()
.map(|(tx, _)| Ok(tx)),
);
}
create_static_file_T1::<tables::Transactions, TxNumber, SegmentHeader>(
provider.tx_ref(),
tx_range,
None,
// We already prepared the dictionary beforehand
None::<Vec<std::vec::IntoIter<Vec<u8>>>>,
hashes,
tx_range_len,
jar,
)?;
Ok(())
}
}


@@ -0,0 +1,327 @@
//! Support for producing static files.
use crate::{segments, segments::Segment, StaticFileProducerEvent};
use rayon::prelude::*;
use reth_db::database::Database;
use reth_interfaces::RethResult;
use reth_primitives::{static_file::HighestStaticFiles, BlockNumber, PruneModes};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
ProviderFactory,
};
use reth_tokio_util::EventListeners;
use std::{ops::RangeInclusive, time::Instant};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, trace};
/// Result of [StaticFileProducer::run] execution.
pub type StaticFileProducerResult = RethResult<StaticFileTargets>;
/// The [StaticFileProducer] instance itself with the result of [StaticFileProducer::run].
pub type StaticFileProducerWithResult<DB> = (StaticFileProducer<DB>, StaticFileProducerResult);
/// Static File producer routine. See [StaticFileProducer::run] for a more detailed description.
#[derive(Debug, Clone)]
pub struct StaticFileProducer<DB> {
/// Provider factory
provider_factory: ProviderFactory<DB>,
/// Static File provider
static_file_provider: StaticFileProvider,
/// Pruning configuration for every part of the data that can be pruned. Set by the user, and
/// needed in [StaticFileProducer] to prevent attempting to move prunable data to static files.
/// See [StaticFileProducer::get_static_file_targets].
prune_modes: PruneModes,
listeners: EventListeners<StaticFileProducerEvent>,
}
/// Static File targets, per data part, measured in [`BlockNumber`].
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct StaticFileTargets {
headers: Option<RangeInclusive<BlockNumber>>,
receipts: Option<RangeInclusive<BlockNumber>>,
transactions: Option<RangeInclusive<BlockNumber>>,
}
impl StaticFileTargets {
/// Returns `true` if any of the targets are [Some].
pub fn any(&self) -> bool {
self.headers.is_some() || self.receipts.is_some() || self.transactions.is_some()
}
// Returns `true` if all targets are either [`None`] or start right after the highest
// corresponding static file block (e.g., with the highest headers static file at block
// `499_999`, a headers target must start at `500_000`).
fn is_contiguous_to_highest_static_files(&self, static_files: HighestStaticFiles) -> bool {
[
(self.headers.as_ref(), static_files.headers),
(self.receipts.as_ref(), static_files.receipts),
(self.transactions.as_ref(), static_files.transactions),
]
.iter()
        .all(|(target_block_range, highest_static_file_block)| {
            target_block_range.map_or(true, |target_block_range| {
                *target_block_range.start() ==
                    highest_static_file_block.map_or(0, |highest_static_file_block| {
                        highest_static_file_block + 1
})
})
})
}
}
impl<DB: Database> StaticFileProducer<DB> {
/// Creates a new [StaticFileProducer].
pub fn new(
provider_factory: ProviderFactory<DB>,
static_file_provider: StaticFileProvider,
prune_modes: PruneModes,
) -> Self {
Self { provider_factory, static_file_provider, prune_modes, listeners: Default::default() }
}
/// Listen for events on the static_file_producer.
pub fn events(&mut self) -> UnboundedReceiverStream<StaticFileProducerEvent> {
self.listeners.new_listener()
}
/// Run the static_file_producer.
///
/// For each [Some] target in [StaticFileTargets], initializes a corresponding [Segment] and
/// runs it with the provided block range using [StaticFileProvider] and a read-only
/// database transaction from [ProviderFactory]. All segments are run in parallel.
///
/// NOTE: it doesn't delete the data from the database, and the actual deleting (aka pruning) logic
/// lives in the `prune` crate.
pub fn run(&mut self, targets: StaticFileTargets) -> StaticFileProducerResult {
debug_assert!(targets.is_contiguous_to_highest_static_files(
self.static_file_provider.get_highest_static_files()
));
self.listeners.notify(StaticFileProducerEvent::Started { targets: targets.clone() });
debug!(target: "static_file", ?targets, "StaticFileProducer started");
let start = Instant::now();
let mut segments = Vec::<(Box<dyn Segment<DB>>, RangeInclusive<BlockNumber>)>::new();
if let Some(block_range) = targets.transactions.clone() {
segments.push((Box::new(segments::Transactions), block_range));
}
if let Some(block_range) = targets.headers.clone() {
segments.push((Box::new(segments::Headers), block_range));
}
if let Some(block_range) = targets.receipts.clone() {
segments.push((Box::new(segments::Receipts), block_range));
}
segments.par_iter().try_for_each(|(segment, block_range)| -> RethResult<()> {
debug!(target: "static_file", segment = %segment.segment(), ?block_range, "StaticFileProducer segment");
let start = Instant::now();
// Create a new database transaction on every segment to prevent long-lived read-only
// transactions
let provider = self.provider_factory.provider()?.disable_long_read_transaction_safety();
segment.copy_to_static_files(provider, self.static_file_provider.clone(), block_range.clone())?;
let elapsed = start.elapsed(); // TODO(alexey): track in metrics
debug!(target: "static_file", segment = %segment.segment(), ?block_range, ?elapsed, "Finished StaticFileProducer segment");
Ok(())
})?;
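        // Commit the static file changes and update the in-memory indexes only
        // after every segment has finished successfully.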
self.static_file_provider.commit()?;
for (segment, block_range) in segments {
self.static_file_provider.update_index(segment.segment(), Some(*block_range.end()))?;
}
let elapsed = start.elapsed(); // TODO(alexey): track in metrics
debug!(target: "static_file", ?targets, ?elapsed, "StaticFileProducer finished");
self.listeners
.notify(StaticFileProducerEvent::Finished { targets: targets.clone(), elapsed });
Ok(targets)
}
    /// Returns the static file targets at the provided finalized block numbers per segment.
    /// The targets are determined by checking against the highest static files using
    /// [StaticFileProvider::get_highest_static_files].
pub fn get_static_file_targets(
&self,
finalized_block_numbers: HighestStaticFiles,
) -> RethResult<StaticFileTargets> {
let highest_static_files = self.static_file_provider.get_highest_static_files();
let targets = StaticFileTargets {
headers: finalized_block_numbers.headers.and_then(|finalized_block_number| {
self.get_static_file_target(highest_static_files.headers, finalized_block_number)
}),
            // Move receipts to static files only if they're not pruned, according to the user
            // configuration
receipts: if self.prune_modes.receipts.is_none() &&
self.prune_modes.receipts_log_filter.is_empty()
{
finalized_block_numbers.receipts.and_then(|finalized_block_number| {
self.get_static_file_target(
highest_static_files.receipts,
finalized_block_number,
)
})
} else {
None
},
transactions: finalized_block_numbers.transactions.and_then(|finalized_block_number| {
self.get_static_file_target(
highest_static_files.transactions,
finalized_block_number,
)
}),
};
trace!(
target: "static_file",
?finalized_block_numbers,
?highest_static_files,
?targets,
any = %targets.any(),
"StaticFile targets"
);
Ok(targets)
}
fn get_static_file_target(
&self,
highest_static_file: Option<BlockNumber>,
finalized_block_number: BlockNumber,
) -> Option<RangeInclusive<BlockNumber>> {
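        // Start right after the highest existing static file block, or at genesis
        // if there is none. An empty range means there is nothing to move, e.g.
        // highest = Some(9), finalized = 9 yields None.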
let range = highest_static_file.map_or(0, |block| block + 1)..=finalized_block_number;
(!range.is_empty()).then_some(range)
}
}
#[cfg(test)]
mod tests {
use crate::{static_file_producer::StaticFileTargets, StaticFileProducer};
use assert_matches::assert_matches;
use reth_db::{database::Database, transaction::DbTx};
use reth_interfaces::{
provider::ProviderError,
test_utils::{
generators,
generators::{random_block_range, random_receipt},
},
RethError,
};
use reth_primitives::{
static_file::HighestStaticFiles, PruneModes, StaticFileSegment, B256, U256,
};
use reth_provider::providers::StaticFileWriter;
use reth_stages::test_utils::{StorageKind, TestStageDB};
#[test]
fn run() {
let mut rng = generators::rng();
let db = TestStageDB::default();
let blocks = random_block_range(&mut rng, 0..=3, B256::ZERO, 2..3);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
// Unwind headers from static_files and manually insert them into the database, so we're
// able to check that static_file_producer works
db.factory
.static_file_provider()
.latest_writer(StaticFileSegment::Headers)
.expect("get static file writer for headers")
.prune_headers(blocks.len() as u64)
.expect("prune headers");
let tx = db.factory.db_ref().tx_mut().expect("init tx");
blocks.iter().for_each(|block| {
TestStageDB::insert_header(None, &tx, &block.header, U256::ZERO)
.expect("insert block header");
});
tx.commit().expect("commit tx");
let mut receipts = Vec::new();
for block in &blocks {
for transaction in &block.body {
receipts
.push((receipts.len() as u64, random_receipt(&mut rng, transaction, Some(0))));
}
}
db.insert_receipts(receipts).expect("insert receipts");
let provider_factory = db.factory;
let static_file_provider = provider_factory.static_file_provider();
let mut static_file_producer = StaticFileProducer::new(
provider_factory,
static_file_provider.clone(),
PruneModes::default(),
);
let targets = static_file_producer
.get_static_file_targets(HighestStaticFiles {
headers: Some(1),
receipts: Some(1),
transactions: Some(1),
})
.expect("get static file targets");
assert_eq!(
targets,
StaticFileTargets {
headers: Some(0..=1),
receipts: Some(0..=1),
transactions: Some(0..=1)
}
);
assert_matches!(static_file_producer.run(targets), Ok(_));
assert_eq!(
static_file_provider.get_highest_static_files(),
HighestStaticFiles { headers: Some(1), receipts: Some(1), transactions: Some(1) }
);
let targets = static_file_producer
.get_static_file_targets(HighestStaticFiles {
headers: Some(3),
receipts: Some(3),
transactions: Some(3),
})
.expect("get static file targets");
assert_eq!(
targets,
StaticFileTargets {
headers: Some(2..=3),
receipts: Some(2..=3),
transactions: Some(2..=3)
}
);
assert_matches!(static_file_producer.run(targets), Ok(_));
assert_eq!(
static_file_provider.get_highest_static_files(),
HighestStaticFiles { headers: Some(3), receipts: Some(3), transactions: Some(3) }
);
let targets = static_file_producer
.get_static_file_targets(HighestStaticFiles {
headers: Some(4),
receipts: Some(4),
transactions: Some(4),
})
.expect("get static file targets");
assert_eq!(
targets,
StaticFileTargets {
headers: Some(4..=4),
receipts: Some(4..=4),
transactions: Some(4..=4)
}
);
assert_matches!(
static_file_producer.run(targets),
Err(RethError::Provider(ProviderError::BlockBodyIndicesNotFound(4)))
);
assert_eq!(
static_file_provider.get_highest_static_files(),
HighestStaticFiles { headers: Some(3), receipts: Some(3), transactions: Some(3) }
);
}
}