refactor(prune): segment trait, receipts impl (#4887)

Alexey Shekhirin
2023-10-05 12:56:47 +01:00
committed by GitHub
parent 4dceabf06b
commit 1e7d028d53
43 changed files with 633 additions and 428 deletions

View File

@ -23,7 +23,7 @@ impl PruningArgs {
Ok(if self.full {
Some(PruneConfig {
block_interval: 5,
parts: PruneModes {
segments: PruneModes {
sender_recovery: Some(PruneMode::Full),
transaction_lookup: None,
receipts: chain_spec

View File

@ -193,7 +193,7 @@ impl ImportCommand {
.clean_threshold
.max(config.stages.account_hashing.clean_threshold)
.max(config.stages.storage_hashing.clean_threshold),
config.prune.map(|prune| prune.parts).unwrap_or_default(),
config.prune.map(|prune| prune.segments).unwrap_or_default(),
)),
)
.build(db, self.chain.clone());

View File

@ -145,7 +145,7 @@ impl Command {
.clean_threshold
.max(stage_conf.account_hashing.clean_threshold)
.max(stage_conf.storage_hashing.clean_threshold),
config.prune.as_ref().map(|prune| prune.parts.clone()).unwrap_or_default(),
config.prune.as_ref().map(|prune| prune.segments.clone()).unwrap_or_default(),
)),
)
.build(db, self.chain.clone());

View File

@ -163,8 +163,8 @@ impl NodeState {
fn handle_pruner_event(&self, event: PrunerEvent) {
match event {
PrunerEvent::Finished { tip_block_number, elapsed, parts } => {
info!(tip_block_number, ?elapsed, ?parts, "Pruner finished");
PrunerEvent::Finished { tip_block_number, elapsed, segments } => {
info!(tip_block_number, ?elapsed, ?segments, "Pruner finished");
}
}
}

View File

@ -286,7 +286,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
let tree = BlockchainTree::new(
tree_externals,
BlockchainTreeConfig::default(),
prune_config.clone().map(|config| config.parts),
prune_config.clone().map(|config| config.segments),
)?
.with_sync_metrics_tx(metrics_tx.clone());
let canon_state_notification_sender = tree.canon_state_notification_sender();
@ -461,7 +461,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
db.clone(),
self.chain.clone(),
prune_config.block_interval,
prune_config.parts,
prune_config.segments,
self.chain.prune_delete_limit,
highest_snapshots_rx,
);
@ -829,7 +829,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
let factory = factory.with_stack_config(stack_config);
let prune_modes = prune_config.map(|prune| prune.parts).unwrap_or_default();
let prune_modes = prune_config.map(|prune| prune.segments).unwrap_or_default();
let header_mode =
if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) };

View File

@ -205,7 +205,7 @@ impl Command {
max_cumulative_gas: None,
},
config.stages.merkle.clean_threshold,
config.prune.map(|prune| prune.parts).unwrap_or_default(),
config.prune.map(|prune| prune.segments).unwrap_or_default(),
)),
None,
)

View File

@ -335,8 +335,8 @@ nanos = 0
The prune section configures pruning.
You can configure the pruning of different parts of the data independently of others.
For any unspecified parts, the default setting is no pruning.
You can configure the pruning of different segments of the data independently of others.
For any unspecified segments, the default setting is no pruning.
### Default config

View File

@ -57,10 +57,10 @@ You can track the growth of Reth archive node size with our
### Pruned Node
Different parts take up different amounts of disk space.
If pruned fully, this is the total freed space you'll get, per part:
Different segments take up different amounts of disk space.
If pruned fully, this is the total freed space you'll get, per segment:
| Part | Size |
| Segment | Size |
|--------------------|-------|
| Sender Recovery | 70GB |
| Transaction Lookup | 140GB |
@ -96,7 +96,7 @@ Meaning, it prunes:
is completed, so the disk space is reclaimed slowly.
- Receipts up to the last 128 blocks, preserving all receipts with the logs from Beacon Deposit Contract
Given the aforementioned part sizes, we get the following full node size:
Given the aforementioned segment sizes, we get the following full node size:
```text
Archive Node - Receipts - AccountHistory - StorageHistory = Full Node
```
@ -106,15 +106,15 @@ Archive Node - Receipts - AccountHistory - StorageHistory = Full Node
## RPC support
As it was mentioned in the [pruning configuration chapter](./config.md#the-prune-section), there are several parts
which can be pruned independently of each other:
As mentioned in the [pruning configuration chapter](./config.md#the-prune-section), there are several segments which can be pruned
independently of each other:
- Sender Recovery
- Transaction Lookup
- Receipts
- Account History
- Storage History
Pruning of each of these parts disables different RPC methods, because the historical data or lookup indexes
Pruning of each of these segments disables different RPC methods, because the historical data or lookup indexes
become unavailable.
### Full Node
@ -140,7 +140,7 @@ The following tables describe RPC methods available in the full node.
#### `eth` namespace
| RPC / Part | Note |
| RPC / Segment | Note |
|-------------------------------------------|----------------------------------------------------------|
| `eth_accounts` | |
| `eth_blockNumber` | |
@ -188,7 +188,7 @@ The following tables describe RPC methods available in the full node.
#### `net` namespace
| RPC / Part |
| RPC / Segment |
|-----------------|
| `net_listening` |
| `net_peerCount` |
@ -196,7 +196,7 @@ The following tables describe RPC methods available in the full node.
#### `trace` namespace
| RPC / Part | Note |
| RPC / Segment | Note |
|---------------------------------|------------------------------|
| `trace_block` | Only for the last 128 blocks |
| `trace_call` | Only for the last 128 blocks |
@ -209,7 +209,7 @@ The following tables describe RPC methods available in the full node.
#### `txpool` namespace
| RPC / Part |
| RPC / Segment |
|----------------------|
| `txpool_content` |
| `txpool_contentFrom` |
@ -219,13 +219,13 @@ The following tables describe RPC methods available in the full node.
### Pruned Node
The following tables describe the requirements for prune parts, per RPC method:
- ✅ - if the part is pruned, the RPC method still works
- ❌ - if the part is pruned, the RPC method doesn't work anymore
The following tables describe the requirements for prune segments, per RPC method:
- ✅ - if the segment is pruned, the RPC method still works
- ❌ - if the segment is pruned, the RPC method doesn't work anymore
#### `debug` namespace
| RPC / Part | Sender Recovery | Transaction Lookup | Receipts | Account History | Storage History |
| RPC / Segment | Sender Recovery | Transaction Lookup | Receipts | Account History | Storage History |
|----------------------------|-----------------|--------------------|----------|-----------------|-----------------|
| `debug_getRawBlock` | ✅ | ✅ | ✅ | ✅ | ✅ |
| `debug_getRawHeader` | ✅ | ✅ | ✅ | ✅ | ✅ |
@ -241,7 +241,7 @@ The following tables describe the requirements for prune parts, per RPC method:
#### `eth` namespace
| RPC / Part | Sender Recovery | Transaction Lookup | Receipts | Account History | Storage History |
| RPC / Segment | Sender Recovery | Transaction Lookup | Receipts | Account History | Storage History |
|-------------------------------------------|-----------------|--------------------|----------|-----------------|-----------------|
| `eth_accounts` | ✅ | ✅ | ✅ | ✅ | ✅ |
| `eth_blockNumber` | ✅ | ✅ | ✅ | ✅ | ✅ |
@ -289,7 +289,7 @@ The following tables describe the requirements for prune parts, per RPC method:
#### `net` namespace
| RPC / Part | Sender Recovery | Transaction Lookup | Receipts | Account History | Storage History |
| RPC / Segment | Sender Recovery | Transaction Lookup | Receipts | Account History | Storage History |
|-----------------|-----------------|--------------------|----------|-----------------|-----------------|
| `net_listening` | ✅ | ✅ | ✅ | ✅ | ✅ |
| `net_peerCount` | ✅ | ✅ | ✅ | ✅ | ✅ |
@ -297,7 +297,7 @@ The following tables describe the requirements for prune parts, per RPC method:
#### `trace` namespace
| RPC / Part | Sender Recovery | Transaction Lookup | Receipts | Account History | Storage History |
| RPC / Segment | Sender Recovery | Transaction Lookup | Receipts | Account History | Storage History |
|---------------------------------|-----------------|--------------------|----------|-----------------|-----------------|
| `trace_block` | ✅ | ✅ | ✅ | ❌ | ❌ |
| `trace_call` | ✅ | ✅ | ✅ | ❌ | ❌ |
@ -310,7 +310,7 @@ The following tables describe the requirements for prune parts, per RPC method:
#### `txpool` namespace
| RPC / Part | Sender Recovery | Transaction Lookup | Receipts | Account History | Storage History |
| RPC / Segment | Sender Recovery | Transaction Lookup | Receipts | Account History | Storage History |
|----------------------|-----------------|--------------------|----------|-----------------|-----------------|
| `txpool_content` | ✅ | ✅ | ✅ | ✅ | ✅ |
| `txpool_contentFrom` | ✅ | ✅ | ✅ | ✅ | ✅ |

View File

@ -292,12 +292,13 @@ pub struct PruneConfig {
/// Minimum pruning interval measured in blocks.
pub block_interval: usize,
/// Pruning configuration for every part of the data that can be pruned.
pub parts: PruneModes,
#[serde(alias = "parts")]
pub segments: PruneModes,
}
impl Default for PruneConfig {
fn default() -> Self {
Self { block_interval: 5, parts: PruneModes::none() }
Self { block_interval: 5, segments: PruneModes::none() }
}
}
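
The `#[serde(alias = "parts")]` attribute is what keeps configs written before this rename loading unchanged. Below is a minimal sketch of the mechanism using simplified stand-in types rather than reth's real ones; only `serde` and `toml` are assumed as dependencies:

```rust
use serde::Deserialize;

// Simplified stand-ins for the real `PruneConfig`/`PruneModes` types.
#[derive(Debug, Default, Deserialize, PartialEq)]
#[serde(default)]
struct PruneConfig {
    block_interval: usize,
    #[serde(alias = "parts")] // the pre-rename key is still accepted on read
    segments: PruneModes,
}

#[derive(Debug, Default, Deserialize, PartialEq)]
#[serde(default)]
struct PruneModes {
    transaction_lookup: Option<String>,
}

fn main() {
    let old = "block_interval = 5\n[parts]\ntransaction_lookup = 'full'";
    let new = "block_interval = 5\n[segments]\ntransaction_lookup = 'full'";
    let a: PruneConfig = toml::from_str(old).unwrap();
    let b: PruneConfig = toml::from_str(new).unwrap();
    assert_eq!(a, b); // both spellings deserialize to the same config
}
```

The alias only affects deserialization; serialization emits the new `segments` key, which is why the backwards-compatibility test below exercises both the `alpha.0.0.8` (`[prune.parts]`) and `alpha.0.0.11` (`[prune.segments]`) shapes.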
@ -338,7 +339,8 @@ mod tests {
// ensures config deserialization is backwards compatible
#[test]
fn test_backwards_compatibility() {
let alpha_0_0_8 = r"#[stages.headers]
let alpha_0_0_8 = r"#
[stages.headers]
downloader_max_concurrent_requests = 100
downloader_min_concurrent_requests = 5
downloader_max_buffered_responses = 100
@ -422,8 +424,33 @@ nanos = 0
[sessions.protocol_breach_request_timeout]
secs = 120
nanos = 0
#";
[prune]
block_interval = 5
[prune.parts]
sender_recovery = { distance = 16384 }
transaction_lookup = 'full'
receipts = { before = 1920000 }
account_history = { distance = 16384 }
storage_history = { distance = 16384 }
[prune.parts.receipts_log_filter]
'0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48' = { before = 17000000 }
'0xdac17f958d2ee523a2206206994597c13d831ec7' = { distance = 1000 }
#";
let _conf: Config = toml::from_str(alpha_0_0_8).unwrap();
let alpha_0_0_11 = r"#
[prune.segments]
sender_recovery = { distance = 16384 }
transaction_lookup = 'full'
receipts = { before = 1920000 }
account_history = { distance = 16384 }
storage_history = { distance = 16384 }
[prune.segments.receipts_log_filter]
'0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48' = { before = 17000000 }
'0xdac17f958d2ee523a2206206994597c13d831ec7' = { distance = 1000 }
#";
let _conf: Config = toml::from_str(alpha_0_0_11).unwrap();
}
}

View File

@ -174,7 +174,7 @@ struct Metrics {
impl From<PrunerError> for EngineHookError {
fn from(err: PrunerError) -> Self {
match err {
PrunerError::PrunePart(_) | PrunerError::InconsistentData(_) => {
PrunerError::PruneSegment(_) | PrunerError::InconsistentData(_) => {
EngineHookError::Internal(Box::new(err))
}
PrunerError::Interface(err) => err.into(),

View File

@ -84,7 +84,7 @@ impl<DB: Database + 'static> SnapshotHook<DB> {
let targets = snapshotter.get_snapshot_targets(finalized_block_number)?;
// Check if the snapshotting of any parts has been requested.
// Check if the snapshotting of any data has been requested.
if targets.any() {
let (tx, rx) = oneshot::channel();
self.task_spawner.spawn_critical_blocking(

View File

@ -1,4 +1,4 @@
use reth_primitives::{BlockNumHash, Bloom, PrunePartError, B256};
use reth_primitives::{BlockNumHash, Bloom, PruneSegmentError, B256};
use thiserror::Error;
/// Transaction validation errors
@ -76,9 +76,9 @@ pub enum BlockExecutionError {
/// Validation error, transparently wrapping `BlockValidationError`
#[error(transparent)]
Validation(#[from] BlockValidationError),
/// Pruning error, transparently wrapping `PrunePartError`
/// Pruning error, transparently wrapping `PruneSegmentError`
#[error(transparent)]
Pruning(#[from] PrunePartError),
Pruning(#[from] PruneSegmentError),
/// Error representing a provider error
#[error("Provider error")]
ProviderError,

View File

@ -78,7 +78,7 @@ pub use net::{
};
pub use peer::{PeerId, WithPeerId};
pub use prune::{
PruneCheckpoint, PruneMode, PruneModes, PrunePart, PrunePartError, PruneProgress,
PruneCheckpoint, PruneMode, PruneModes, PruneProgress, PruneSegment, PruneSegmentError,
ReceiptsLogPruneConfig, MINIMUM_PRUNING_DISTANCE,
};
pub use receipt::{Receipt, ReceiptWithBloom, ReceiptWithBloomRef, Receipts};

View File

@ -6,8 +6,7 @@ use reth_codecs::{main_codec, Compact};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[cfg_attr(test, derive(Default))]
pub struct PruneCheckpoint {
/// Highest pruned block number.
/// If it's [None], the pruning for block `0` is not finished yet.
/// Highest pruned block number. If it's [None], the pruning for block `0` is not finished yet.
pub block_number: Option<BlockNumber>,
/// Highest pruned transaction number, if applicable.
pub tx_number: Option<TxNumber>,

View File

@ -1,12 +1,12 @@
mod checkpoint;
mod mode;
mod part;
mod segment;
mod target;
use crate::{Address, BlockNumber};
pub use checkpoint::PruneCheckpoint;
pub use mode::PruneMode;
pub use part::{PrunePart, PrunePartError};
pub use segment::{PruneSegment, PruneSegmentError};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
pub use target::{PruneModes, MINIMUM_PRUNING_DISTANCE};
@ -40,7 +40,7 @@ impl ReceiptsLogPruneConfig {
&self,
tip: BlockNumber,
pruned_block: Option<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<&Address>>, PrunePartError> {
) -> Result<BTreeMap<BlockNumber, Vec<&Address>>, PruneSegmentError> {
let mut map = BTreeMap::new();
let pruned_block = pruned_block.unwrap_or_default();
@ -53,7 +53,7 @@ impl ReceiptsLogPruneConfig {
// Reminder, that we increment because the [`BlockNumber`] key of the new map should be
// viewed as `PruneMode::Before(block)`
let block = (pruned_block + 1).max(
mode.prune_target_block(tip, MINIMUM_PRUNING_DISTANCE, PrunePart::ContractLogs)?
mode.prune_target_block(tip, MINIMUM_PRUNING_DISTANCE, PruneSegment::ContractLogs)?
.map(|(block, _)| block)
.unwrap_or_default() +
1,
@ -69,15 +69,17 @@ impl ReceiptsLogPruneConfig {
&self,
tip: BlockNumber,
pruned_block: Option<BlockNumber>,
) -> Result<Option<BlockNumber>, PrunePartError> {
) -> Result<Option<BlockNumber>, PruneSegmentError> {
let pruned_block = pruned_block.unwrap_or_default();
let mut lowest = None;
for (_, mode) in self.0.iter() {
if let PruneMode::Distance(_) = mode {
if let Some((block, _)) =
mode.prune_target_block(tip, MINIMUM_PRUNING_DISTANCE, PrunePart::ContractLogs)?
{
if let Some((block, _)) = mode.prune_target_block(
tip,
MINIMUM_PRUNING_DISTANCE,
PruneSegment::ContractLogs,
)? {
lowest = Some(lowest.unwrap_or(u64::MAX).min(block));
}
}

View File

@ -1,4 +1,4 @@
use crate::{BlockNumber, PrunePart, PrunePartError};
use crate::{BlockNumber, PruneSegment, PruneSegmentError};
use reth_codecs::{main_codec, Compact};
/// Prune mode.
@ -21,8 +21,8 @@ impl PruneMode {
&self,
tip: BlockNumber,
min_blocks: u64,
prune_part: PrunePart,
) -> Result<Option<(BlockNumber, PruneMode)>, PrunePartError> {
segment: PruneSegment,
) -> Result<Option<(BlockNumber, PruneMode)>, PruneSegmentError> {
let result = match self {
PruneMode::Full if min_blocks == 0 => Some((tip, *self)),
PruneMode::Distance(distance) if *distance > tip => None, // Nothing to prune yet
@ -31,7 +31,7 @@ impl PruneMode {
}
PruneMode::Before(n) if *n > tip => None, // Nothing to prune yet
PruneMode::Before(n) if tip - n >= min_blocks => Some((n - 1, *self)),
_ => return Err(PrunePartError::Configuration(prune_part)),
_ => return Err(PruneSegmentError::Configuration(segment)),
};
Ok(result)
}
@ -65,7 +65,7 @@ impl Default for PruneMode {
#[cfg(test)]
mod tests {
use crate::{prune::PruneMode, PrunePart, PrunePartError, MINIMUM_PRUNING_DISTANCE};
use crate::{prune::PruneMode, PruneSegment, PruneSegmentError, MINIMUM_PRUNING_DISTANCE};
use assert_matches::assert_matches;
use serde::Deserialize;
@ -73,11 +73,11 @@ mod tests {
fn test_prune_target_block() {
let tip = 20000;
let min_blocks = MINIMUM_PRUNING_DISTANCE;
let prune_part = PrunePart::Receipts;
let segment = PruneSegment::Receipts;
let tests = vec![
// MINIMUM_PRUNING_DISTANCE makes this impossible
(PruneMode::Full, Err(PrunePartError::Configuration(prune_part))),
(PruneMode::Full, Err(PruneSegmentError::Configuration(segment))),
// Nothing to prune
(PruneMode::Distance(tip + 1), Ok(None)),
(PruneMode::Distance(min_blocks + 1), Ok(Some(tip - (min_blocks + 1)))),
@ -91,12 +91,12 @@ mod tests {
PruneMode::Before(tip - MINIMUM_PRUNING_DISTANCE - 1),
Ok(Some(tip - MINIMUM_PRUNING_DISTANCE - 2)),
),
(PruneMode::Before(tip - 1), Err(PrunePartError::Configuration(prune_part))),
(PruneMode::Before(tip - 1), Err(PruneSegmentError::Configuration(segment))),
];
for (index, (mode, expected_result)) in tests.into_iter().enumerate() {
assert_eq!(
mode.prune_target_block(tip, min_blocks, prune_part),
mode.prune_target_block(tip, min_blocks, segment),
expected_result.map(|r| r.map(|b| (b, mode))),
"Test {} failed",
index + 1,
@ -105,7 +105,7 @@ mod tests {
// Test for a scenario where there are no minimum blocks and Full can be used
assert_eq!(
PruneMode::Full.prune_target_block(tip, 0, prune_part),
PruneMode::Full.prune_target_block(tip, 0, segment),
Ok(Some((tip, PruneMode::Full))),
);
}

View File

@ -1,39 +0,0 @@
use derive_more::Display;
use reth_codecs::{main_codec, Compact};
use thiserror::Error;
/// Part of the data that can be pruned.
#[main_codec]
#[derive(Debug, Display, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum PrunePart {
/// Prune part responsible for the `TxSenders` table.
SenderRecovery,
/// Prune part responsible for the `TxHashNumber` table.
TransactionLookup,
/// Prune part responsible for all `Receipts`.
Receipts,
/// Prune part responsible for some `Receipts` filtered by logs.
ContractLogs,
/// Prune part responsible for the `AccountChangeSet` and `AccountHistory` tables.
AccountHistory,
/// Prune part responsible for the `StorageChangeSet` and `StorageHistory` tables.
StorageHistory,
}
/// PrunePart error type.
#[derive(Debug, Error, PartialEq, Eq, Clone)]
pub enum PrunePartError {
/// Invalid configuration of a prune part.
#[error("The configuration provided for {0} is invalid.")]
Configuration(PrunePart),
/// Receipts have been pruned
#[error("Receipts have been pruned")]
ReceiptsPruned,
}
#[cfg(test)]
impl Default for PrunePart {
fn default() -> Self {
Self::SenderRecovery
}
}

View File

@ -0,0 +1,39 @@
use derive_more::Display;
use reth_codecs::{main_codec, Compact};
use thiserror::Error;
/// Segment of the data that can be pruned.
#[main_codec]
#[derive(Debug, Display, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum PruneSegment {
/// Prune segment responsible for the `TxSenders` table.
SenderRecovery,
/// Prune segment responsible for the `TxHashNumber` table.
TransactionLookup,
/// Prune segment responsible for all `Receipts`.
Receipts,
/// Prune segment responsible for some `Receipts` filtered by logs.
ContractLogs,
/// Prune segment responsible for the `AccountChangeSet` and `AccountHistory` tables.
AccountHistory,
/// Prune segment responsible for the `StorageChangeSet` and `StorageHistory` tables.
StorageHistory,
}
/// PruneSegment error type.
#[derive(Debug, Error, PartialEq, Eq, Clone)]
pub enum PruneSegmentError {
/// Invalid configuration of a prune segment.
#[error("The configuration provided for {0} is invalid.")]
Configuration(PruneSegment),
/// Receipts have been pruned
#[error("Receipts have been pruned")]
ReceiptsPruned,
}
#[cfg(test)]
impl Default for PruneSegment {
fn default() -> Self {
Self::SenderRecovery
}
}

View File

@ -1,6 +1,6 @@
use crate::{
prune::PrunePartError, serde_helper::deserialize_opt_prune_mode_with_min_blocks, BlockNumber,
PruneMode, PrunePart, ReceiptsLogPruneConfig,
prune::PruneSegmentError, serde_helper::deserialize_opt_prune_mode_with_min_blocks,
BlockNumber, PruneMode, PruneSegment, ReceiptsLogPruneConfig,
};
use paste::paste;
use serde::{Deserialize, Serialize};
@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize};
/// unwind is required.
pub const MINIMUM_PRUNING_DISTANCE: u64 = 32 * 2 + 10_000;
/// Pruning configuration for every part of the data that can be pruned.
/// Pruning configuration for every segment of the data that can be pruned.
#[derive(Debug, Clone, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(default)]
pub struct PruneModes {
@ -49,8 +49,8 @@ pub struct PruneModes {
pub receipts_log_filter: ReceiptsLogPruneConfig,
}
macro_rules! impl_prune_parts {
($(($part:ident, $variant:ident, $min_blocks:expr)),+) => {
macro_rules! impl_prune_segments {
($(($segment:ident, $variant:ident, $min_blocks:expr)),+) => {
$(
paste! {
#[doc = concat!(
@ -58,8 +58,8 @@ macro_rules! impl_prune_parts {
stringify!($variant),
" should be pruned at the target block according to the provided tip."
)]
pub fn [<should_prune_ $part>](&self, block: BlockNumber, tip: BlockNumber) -> bool {
if let Some(mode) = &self.$part {
pub fn [<should_prune_ $segment>](&self, block: BlockNumber, tip: BlockNumber) -> bool {
if let Some(mode) = &self.$segment {
return mode.should_prune(block, tip)
}
false
@ -74,9 +74,9 @@ macro_rules! impl_prune_parts {
stringify!($variant),
" pruning needs to be done, inclusive, according to the provided tip."
)]
pub fn [<prune_target_block_ $part>](&self, tip: BlockNumber) -> Result<Option<(BlockNumber, PruneMode)>, PrunePartError> {
match self.$part {
Some(mode) => mode.prune_target_block(tip, $min_blocks.unwrap_or_default(), PrunePart::$variant),
pub fn [<prune_target_block_ $segment>](&self, tip: BlockNumber) -> Result<Option<(BlockNumber, PruneMode)>, PruneSegmentError> {
match self.$segment {
Some(mode) => mode.prune_target_block(tip, $min_blocks.unwrap_or_default(), PruneSegment::$variant),
None => Ok(None)
}
}
@ -87,7 +87,7 @@ macro_rules! impl_prune_parts {
pub fn all() -> Self {
Self {
$(
$part: Some(PruneMode::Full),
$segment: Some(PruneMode::Full),
)+
receipts_log_filter: Default::default()
}
@ -102,7 +102,7 @@ impl PruneModes {
PruneModes::default()
}
impl_prune_parts!(
impl_prune_segments!(
(sender_recovery, SenderRecovery, None),
(transaction_lookup, TransactionLookup, None),
(receipts, Receipts, Some(MINIMUM_PRUNING_DISTANCE)),
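
For orientation, here is a hedged sketch of the generated API from the caller's side, following the `(receipts, Receipts, Some(MINIMUM_PRUNING_DISTANCE))` tuple above; the expected values mirror the `prune_target_block` test expectations in `mode.rs`:

```rust
use reth_primitives::{PruneMode, PruneModes};

fn main() {
    // Only receipts pruning is enabled, keeping the last 90k blocks.
    let modes = PruneModes {
        receipts: Some(PruneMode::Distance(90_000)),
        ..PruneModes::none()
    };
    let tip = 1_000_000;

    // Generated as `should_prune_$segment`: is this block prunable at the given tip?
    assert!(modes.should_prune_receipts(100, tip));

    // Generated as `prune_target_block_$segment`: highest block to prune, inclusive,
    // together with the mode that produced it. A mode that violates the segment's
    // minimum distance would instead return `PruneSegmentError::Configuration`.
    assert_eq!(
        modes.prune_target_block_receipts(tip),
        Ok(Some((tip - 90_000, PruneMode::Distance(90_000))))
    );
}
```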

View File

@ -2,7 +2,7 @@ use crate::{
compression::{RECEIPT_COMPRESSOR, RECEIPT_DECOMPRESSOR},
logs_bloom,
proofs::calculate_receipt_root_ref,
Bloom, Log, PrunePartError, TxType, B256,
Bloom, Log, PruneSegmentError, TxType, B256,
};
use alloy_rlp::{length_of_length, Decodable, Encodable};
use bytes::{Buf, BufMut, BytesMut};
@ -94,7 +94,7 @@ impl Receipts {
}
/// Retrieves gas spent by transactions as a vector of tuples (transaction index, gas used).
pub fn gas_spent_by_tx(&self) -> Result<Vec<(u64, u64)>, PrunePartError> {
pub fn gas_spent_by_tx(&self) -> Result<Vec<(u64, u64)>, PruneSegmentError> {
self.last()
.map(|block_r| {
block_r
@ -104,10 +104,10 @@ impl Receipts {
if let Some(receipt) = tx_r.as_ref() {
Ok((id as u64, receipt.cumulative_gas_used))
} else {
Err(PrunePartError::ReceiptsPruned)
Err(PruneSegmentError::ReceiptsPruned)
}
})
.collect::<Result<Vec<_>, PrunePartError>>()
.collect::<Result<Vec<_>, PruneSegmentError>>()
})
.unwrap_or(Ok(vec![]))
}

View File

@ -1,13 +1,13 @@
use reth_db::DatabaseError;
use reth_interfaces::RethError;
use reth_primitives::PrunePartError;
use reth_primitives::PruneSegmentError;
use reth_provider::ProviderError;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum PrunerError {
#[error(transparent)]
PrunePart(#[from] PrunePartError),
PruneSegment(#[from] PruneSegmentError),
#[error("Inconsistent data: {0}")]
InconsistentData(&'static str),

View File

@ -1,4 +1,4 @@
use reth_primitives::{BlockNumber, PrunePart, PruneProgress};
use reth_primitives::{BlockNumber, PruneProgress, PruneSegment};
use std::{collections::BTreeMap, time::Duration};
/// An event emitted by a [Pruner][crate::Pruner].
@ -8,6 +8,6 @@ pub enum PrunerEvent {
Finished {
tip_block_number: BlockNumber,
elapsed: Duration,
parts: BTreeMap<PrunePart, (PruneProgress, usize)>,
segments: BTreeMap<PruneSegment, (PruneProgress, usize)>,
},
}

View File

@ -13,6 +13,7 @@ mod error;
mod event;
mod metrics;
mod pruner;
mod segments;
use crate::metrics::Metrics;
pub use error::PrunerError;

View File

@ -1,5 +1,5 @@
use reth_metrics::{metrics, metrics::Histogram, Metrics};
use reth_primitives::PrunePart;
use reth_primitives::PruneSegment;
use std::collections::HashMap;
#[derive(Metrics)]
@ -8,25 +8,25 @@ pub(crate) struct Metrics {
/// Pruning duration
pub(crate) duration_seconds: Histogram,
#[metric(skip)]
prune_parts: HashMap<PrunePart, PrunerPartMetrics>,
prune_segments: HashMap<PruneSegment, PrunerSegmentMetrics>,
}
impl Metrics {
/// Returns existing or initializes a new instance of [PrunerPartMetrics] for the provided
/// [PrunePart].
pub(crate) fn get_prune_part_metrics(
/// Returns existing or initializes a new instance of [PrunerSegmentMetrics] for the provided
/// [PruneSegment].
pub(crate) fn get_prune_segment_metrics(
&mut self,
prune_part: PrunePart,
) -> &mut PrunerPartMetrics {
self.prune_parts.entry(prune_part).or_insert_with(|| {
PrunerPartMetrics::new_with_labels(&[("part", prune_part.to_string())])
segment: PruneSegment,
) -> &mut PrunerSegmentMetrics {
self.prune_segments.entry(segment).or_insert_with(|| {
PrunerSegmentMetrics::new_with_labels(&[("segment", segment.to_string())])
})
}
}
#[derive(Metrics)]
#[metrics(scope = "pruner.parts")]
pub(crate) struct PrunerPartMetrics {
/// Pruning duration for this part
#[metrics(scope = "pruner.segments")]
pub(crate) struct PrunerSegmentMetrics {
/// Pruning duration for this segment
pub(crate) duration_seconds: Histogram,
}

View File

@ -1,6 +1,10 @@
//! Support for pruning.
use crate::{Metrics, PrunerError, PrunerEvent};
use crate::{
segments,
segments::{PruneInput, Segment},
Metrics, PrunerError, PrunerEvent,
};
use rayon::prelude::*;
use reth_db::{
abstraction::cursor::{DbCursorRO, DbCursorRW},
@ -14,7 +18,7 @@ use reth_db::{
use reth_interfaces::RethResult;
use reth_primitives::{
listener::EventListeners, BlockNumber, ChainSpec, PruneCheckpoint, PruneMode, PruneModes,
PrunePart, PruneProgress, TxNumber, MINIMUM_PRUNING_DISTANCE,
PruneProgress, PruneSegment, TxNumber, MINIMUM_PRUNING_DISTANCE,
};
use reth_provider::{
BlockReader, DatabaseProviderRW, ProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
@ -28,8 +32,8 @@ use tracing::{debug, error, instrument, trace};
/// Result of [Pruner::run] execution.
pub type PrunerResult = Result<PruneProgress, PrunerError>;
/// Result of part pruning.
type PrunePartResult = Result<(PruneProgress, usize), PrunerError>;
/// Result of segment pruning.
type PruneSegmentResult = Result<(PruneProgress, usize), PrunerError>;
/// The pruner type itself with the result of [Pruner::run]
pub type PrunerWithResult<DB> = (Pruner<DB>, PrunerResult);
@ -39,7 +43,7 @@ pub type PrunerWithResult<DB> = (Pruner<DB>, PrunerResult);
pub struct Pruner<DB> {
metrics: Metrics,
provider_factory: ProviderFactory<DB>,
/// Minimum pruning interval measured in blocks. All prune parts are checked and, if needed,
/// Minimum pruning interval measured in blocks. All prune segments are checked and, if needed,
/// pruned, when the chain advances by the specified number of blocks.
min_block_interval: usize,
/// Previous tip block number when the pruner was run. Even if no data was pruned, this block
@ -96,9 +100,9 @@ impl<DB: Database> Pruner<DB> {
let provider = self.provider_factory.provider_rw()?;
let mut done = true;
let mut parts = BTreeMap::new();
let mut segments = BTreeMap::new();
// TODO(alexey): prune snapshot parts of data (headers, transactions)
// TODO(alexey): prune snapshotted segments of data (headers, transactions)
// let highest_snapshots = *self.highest_snapshots_tracker.borrow();
// Multiply `delete_limit` (number of rows to delete per block) by number of blocks since
@ -115,41 +119,47 @@ impl<DB: Database> Pruner<DB> {
{
trace!(
target: "pruner",
prune_part = ?PrunePart::Receipts,
segment = ?PruneSegment::Receipts,
%to_block,
?prune_mode,
"Got target block to prune"
);
let part_start = Instant::now();
let (part_progress, deleted) =
self.prune_receipts(&provider, to_block, prune_mode, delete_limit)?;
let segment_start = Instant::now();
let segment = segments::Receipts::default();
let output = segment.prune(&provider, PruneInput { to_block, delete_limit })?;
if let Some(checkpoint) = output.checkpoint {
segment.save_checkpoint(&provider, checkpoint.as_prune_checkpoint(prune_mode))?;
}
self.metrics
.get_prune_part_metrics(PrunePart::Receipts)
.get_prune_segment_metrics(PruneSegment::Receipts)
.duration_seconds
.record(part_start.elapsed());
.record(segment_start.elapsed());
done = done && part_progress.is_finished();
delete_limit = delete_limit.saturating_sub(deleted);
parts.insert(PrunePart::Receipts, (part_progress, deleted));
done = done && output.done;
delete_limit = delete_limit.saturating_sub(output.pruned);
segments.insert(
PruneSegment::Receipts,
(PruneProgress::from_done(output.done), output.pruned),
);
} else {
trace!(target: "pruner", prune_part = ?PrunePart::Receipts, "No target block to prune");
trace!(target: "pruner", prune_segment = ?PruneSegment::Receipts, "No target block to prune");
}
if !self.modes.receipts_log_filter.is_empty() {
let part_start = Instant::now();
let (part_progress, deleted) =
let segment_start = Instant::now();
let (segment_progress, deleted) =
self.prune_receipts_by_logs(&provider, tip_block_number, delete_limit)?;
self.metrics
.get_prune_part_metrics(PrunePart::ContractLogs)
.get_prune_segment_metrics(PruneSegment::ContractLogs)
.duration_seconds
.record(part_start.elapsed());
.record(segment_start.elapsed());
done = done && part_progress.is_finished();
done = done && segment_progress.is_finished();
delete_limit = delete_limit.saturating_sub(deleted);
parts.insert(PrunePart::ContractLogs, (part_progress, deleted));
segments.insert(PruneSegment::ContractLogs, (segment_progress, deleted));
} else {
trace!(target: "pruner", prune_part = ?PrunePart::ContractLogs, "No filter to prune");
trace!(target: "pruner", prune_segment = ?PruneSegment::ContractLogs, "No filter to prune");
}
if let (Some((to_block, prune_mode)), true) =
@ -157,27 +167,27 @@ impl<DB: Database> Pruner<DB> {
{
trace!(
target: "pruner",
prune_part = ?PrunePart::TransactionLookup,
prune_segment = ?PruneSegment::TransactionLookup,
%to_block,
?prune_mode,
"Got target block to prune"
);
let part_start = Instant::now();
let (part_progress, deleted) =
let segment_start = Instant::now();
let (segment_progress, deleted) =
self.prune_transaction_lookup(&provider, to_block, prune_mode, delete_limit)?;
self.metrics
.get_prune_part_metrics(PrunePart::TransactionLookup)
.get_prune_segment_metrics(PruneSegment::TransactionLookup)
.duration_seconds
.record(part_start.elapsed());
.record(segment_start.elapsed());
done = done && part_progress.is_finished();
done = done && segment_progress.is_finished();
delete_limit = delete_limit.saturating_sub(deleted);
parts.insert(PrunePart::TransactionLookup, (part_progress, deleted));
segments.insert(PruneSegment::TransactionLookup, (segment_progress, deleted));
} else {
trace!(
target: "pruner",
prune_part = ?PrunePart::TransactionLookup,
prune_segment = ?PruneSegment::TransactionLookup,
"No target block to prune"
);
}
@ -187,27 +197,27 @@ impl<DB: Database> Pruner<DB> {
{
trace!(
target: "pruner",
prune_part = ?PrunePart::SenderRecovery,
prune_segment = ?PruneSegment::SenderRecovery,
%to_block,
?prune_mode,
"Got target block to prune"
);
let part_start = Instant::now();
let (part_progress, deleted) =
let segment_start = Instant::now();
let (segment_progress, deleted) =
self.prune_transaction_senders(&provider, to_block, prune_mode, delete_limit)?;
self.metrics
.get_prune_part_metrics(PrunePart::SenderRecovery)
.get_prune_segment_metrics(PruneSegment::SenderRecovery)
.duration_seconds
.record(part_start.elapsed());
.record(segment_start.elapsed());
done = done && part_progress.is_finished();
done = done && segment_progress.is_finished();
delete_limit = delete_limit.saturating_sub(deleted);
parts.insert(PrunePart::SenderRecovery, (part_progress, deleted));
segments.insert(PruneSegment::SenderRecovery, (segment_progress, deleted));
} else {
trace!(
target: "pruner",
prune_part = ?PrunePart::SenderRecovery,
prune_segment = ?PruneSegment::SenderRecovery,
"No target block to prune"
);
}
@ -217,27 +227,27 @@ impl<DB: Database> Pruner<DB> {
{
trace!(
target: "pruner",
prune_part = ?PrunePart::AccountHistory,
prune_segment = ?PruneSegment::AccountHistory,
%to_block,
?prune_mode,
"Got target block to prune"
);
let part_start = Instant::now();
let (part_progress, deleted) =
let segment_start = Instant::now();
let (segment_progress, deleted) =
self.prune_account_history(&provider, to_block, prune_mode, delete_limit)?;
self.metrics
.get_prune_part_metrics(PrunePart::AccountHistory)
.get_prune_segment_metrics(PruneSegment::AccountHistory)
.duration_seconds
.record(part_start.elapsed());
.record(segment_start.elapsed());
done = done && part_progress.is_finished();
done = done && segment_progress.is_finished();
delete_limit = delete_limit.saturating_sub(deleted);
parts.insert(PrunePart::AccountHistory, (part_progress, deleted));
segments.insert(PruneSegment::AccountHistory, (segment_progress, deleted));
} else {
trace!(
target: "pruner",
prune_part = ?PrunePart::AccountHistory,
prune_segment = ?PruneSegment::AccountHistory,
"No target block to prune"
);
}
@ -247,27 +257,27 @@ impl<DB: Database> Pruner<DB> {
{
trace!(
target: "pruner",
prune_part = ?PrunePart::StorageHistory,
prune_segment = ?PruneSegment::StorageHistory,
%to_block,
?prune_mode,
"Got target block to prune"
);
let part_start = Instant::now();
let (part_progress, deleted) =
let segment_start = Instant::now();
let (segment_progress, deleted) =
self.prune_storage_history(&provider, to_block, prune_mode, delete_limit)?;
self.metrics
.get_prune_part_metrics(PrunePart::StorageHistory)
.get_prune_segment_metrics(PruneSegment::StorageHistory)
.duration_seconds
.record(part_start.elapsed());
.record(segment_start.elapsed());
done = done && part_progress.is_finished();
done = done && segment_progress.is_finished();
delete_limit = delete_limit.saturating_sub(deleted);
parts.insert(PrunePart::StorageHistory, (part_progress, deleted));
segments.insert(PruneSegment::StorageHistory, (segment_progress, deleted));
} else {
trace!(
target: "pruner",
prune_part = ?PrunePart::StorageHistory,
prune_segment = ?PruneSegment::StorageHistory,
"No target block to prune"
);
}
@ -284,11 +294,11 @@ impl<DB: Database> Pruner<DB> {
?elapsed,
%delete_limit,
%done,
?parts,
?segments,
"Pruner finished"
);
self.listeners.notify(PrunerEvent::Finished { tip_block_number, elapsed, parts });
self.listeners.notify(PrunerEvent::Finished { tip_block_number, elapsed, segments });
Ok(PruneProgress::from_done(done))
}
@ -326,11 +336,11 @@ impl<DB: Database> Pruner<DB> {
fn get_next_block_range_from_checkpoint(
&self,
provider: &DatabaseProviderRW<'_, DB>,
prune_part: PrunePart,
prune_segment: PruneSegment,
to_block: BlockNumber,
) -> RethResult<Option<RangeInclusive<BlockNumber>>> {
let from_block = provider
.get_prune_checkpoint(prune_part)?
.get_prune_checkpoint(prune_segment)?
.and_then(|checkpoint| checkpoint.block_number)
// Checkpoint exists, prune from the next block after the highest pruned one
.map(|block_number| block_number + 1)
@ -356,16 +366,16 @@ impl<DB: Database> Pruner<DB> {
fn get_next_tx_num_range_from_checkpoint(
&self,
provider: &DatabaseProviderRW<'_, DB>,
prune_part: PrunePart,
prune_segment: PruneSegment,
to_block: BlockNumber,
) -> RethResult<Option<RangeInclusive<TxNumber>>> {
let from_tx_number = provider
.get_prune_checkpoint(prune_part)?
.get_prune_checkpoint(prune_segment)?
// Checkpoint exists, prune from the next transaction after the highest pruned one
.and_then(|checkpoint| match checkpoint.tx_number {
Some(tx_number) => Some(tx_number + 1),
_ => {
error!(target: "pruner", %prune_part, ?checkpoint, "Expected transaction number in prune checkpoint, found None");
error!(target: "pruner", %prune_segment, ?checkpoint, "Expected transaction number in prune checkpoint, found None");
None
},
})
@ -386,59 +396,6 @@ impl<DB: Database> Pruner<DB> {
Ok(Some(range))
}
/// Prune receipts up to the provided block, inclusive, respecting the batch size.
#[instrument(level = "trace", skip(self, provider), target = "pruner")]
fn prune_receipts(
&self,
provider: &DatabaseProviderRW<'_, DB>,
to_block: BlockNumber,
prune_mode: PruneMode,
delete_limit: usize,
) -> PrunePartResult {
let tx_range = match self.get_next_tx_num_range_from_checkpoint(
provider,
PrunePart::Receipts,
to_block,
)? {
Some(range) => range,
None => {
trace!(target: "pruner", "No receipts to prune");
return Ok((PruneProgress::Finished, 0))
}
};
let tx_range_end = *tx_range.end();
let mut last_pruned_transaction = tx_range_end;
let (deleted, done) = provider.prune_table_with_range::<tables::Receipts>(
tx_range,
delete_limit,
|_| false,
|row| last_pruned_transaction = row.0,
)?;
trace!(target: "pruner", %deleted, %done, "Pruned receipts");
let last_pruned_block = provider
.transaction_block(last_pruned_transaction)?
.ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
// If there's more receipts to prune, set the checkpoint block number to previous,
// so we could finish pruning its receipts on the next run.
.checked_sub(if done { 0 } else { 1 });
let prune_checkpoint = PruneCheckpoint {
block_number: last_pruned_block,
tx_number: Some(last_pruned_transaction),
prune_mode,
};
provider.save_prune_checkpoint(PrunePart::Receipts, prune_checkpoint)?;
// `PrunePart::Receipts` overrides `PrunePart::ContractLogs`, so we can preemptively
// limit their pruning start point.
provider.save_prune_checkpoint(PrunePart::ContractLogs, prune_checkpoint)?;
Ok((PruneProgress::from_done(done), deleted))
}
/// Prune receipts up to the provided block, inclusive, by filtering logs. Works as in inclusion
/// list, and removes every receipt not belonging to it. Respects the batch size.
#[instrument(level = "trace", skip(self, provider), target = "pruner")]
@ -447,7 +404,7 @@ impl<DB: Database> Pruner<DB> {
provider: &DatabaseProviderRW<'_, DB>,
tip_block_number: BlockNumber,
delete_limit: usize,
) -> PrunePartResult {
) -> PruneSegmentResult {
// Contract log filtering removes every receipt possible except the ones in the list. So,
// for the other receipts it's as if they had a `PruneMode::Distance()` of
// `MINIMUM_PRUNING_DISTANCE`.
@ -455,14 +412,14 @@ impl<DB: Database> Pruner<DB> {
.prune_target_block(
tip_block_number,
MINIMUM_PRUNING_DISTANCE,
PrunePart::ContractLogs,
PruneSegment::ContractLogs,
)?
.map(|(bn, _)| bn)
.unwrap_or_default();
// Get status checkpoint from latest run
let mut last_pruned_block = provider
.get_prune_checkpoint(PrunePart::ContractLogs)?
.get_prune_checkpoint(PruneSegment::ContractLogs)?
.and_then(|checkpoint| checkpoint.block_number);
let initial_last_pruned_block = last_pruned_block;
@ -613,7 +570,7 @@ impl<DB: Database> Pruner<DB> {
.unwrap_or(to_block);
provider.save_prune_checkpoint(
PrunePart::ContractLogs,
PruneSegment::ContractLogs,
PruneCheckpoint {
block_number: Some(prune_mode_block.min(last_pruned_block.unwrap_or(u64::MAX))),
tx_number: last_pruned_transaction,
@ -632,10 +589,10 @@ impl<DB: Database> Pruner<DB> {
to_block: BlockNumber,
prune_mode: PruneMode,
delete_limit: usize,
) -> PrunePartResult {
) -> PruneSegmentResult {
let (start, end) = match self.get_next_tx_num_range_from_checkpoint(
provider,
PrunePart::TransactionLookup,
PruneSegment::TransactionLookup,
to_block,
)? {
Some(range) => range,
@ -685,7 +642,7 @@ impl<DB: Database> Pruner<DB> {
.checked_sub(if done { 0 } else { 1 });
provider.save_prune_checkpoint(
PrunePart::TransactionLookup,
PruneSegment::TransactionLookup,
PruneCheckpoint {
block_number: last_pruned_block,
tx_number: Some(last_pruned_transaction),
@ -704,10 +661,10 @@ impl<DB: Database> Pruner<DB> {
to_block: BlockNumber,
prune_mode: PruneMode,
delete_limit: usize,
) -> PrunePartResult {
) -> PruneSegmentResult {
let tx_range = match self.get_next_tx_num_range_from_checkpoint(
provider,
PrunePart::SenderRecovery,
PruneSegment::SenderRecovery,
to_block,
)? {
Some(range) => range,
@ -735,7 +692,7 @@ impl<DB: Database> Pruner<DB> {
.checked_sub(if done { 0 } else { 1 });
provider.save_prune_checkpoint(
PrunePart::SenderRecovery,
PruneSegment::SenderRecovery,
PruneCheckpoint {
block_number: last_pruned_block,
tx_number: Some(last_pruned_transaction),
@ -754,10 +711,10 @@ impl<DB: Database> Pruner<DB> {
to_block: BlockNumber,
prune_mode: PruneMode,
delete_limit: usize,
) -> PrunePartResult {
) -> PruneSegmentResult {
let range = match self.get_next_block_range_from_checkpoint(
provider,
PrunePart::AccountHistory,
PruneSegment::AccountHistory,
to_block,
)? {
Some(range) => range,
@ -801,7 +758,7 @@ impl<DB: Database> Pruner<DB> {
}
provider.save_prune_checkpoint(
PrunePart::AccountHistory,
PruneSegment::AccountHistory,
PruneCheckpoint {
block_number: last_changeset_pruned_block,
tx_number: None,
@ -820,10 +777,10 @@ impl<DB: Database> Pruner<DB> {
to_block: BlockNumber,
prune_mode: PruneMode,
delete_limit: usize,
) -> PrunePartResult {
) -> PruneSegmentResult {
let range = match self.get_next_block_range_from_checkpoint(
provider,
PrunePart::StorageHistory,
PruneSegment::StorageHistory,
to_block,
)? {
Some(range) => range,
@ -867,7 +824,7 @@ impl<DB: Database> Pruner<DB> {
}
provider.save_prune_checkpoint(
PrunePart::StorageHistory,
PruneSegment::StorageHistory,
PruneCheckpoint {
block_number: last_changeset_pruned_block,
tx_number: None,
@ -1002,7 +959,7 @@ mod tests {
},
};
use reth_primitives::{
BlockNumber, PruneCheckpoint, PruneMode, PruneModes, PrunePart, PruneProgress,
BlockNumber, PruneCheckpoint, PruneMode, PruneModes, PruneProgress, PruneSegment,
ReceiptsLogPruneConfig, TxNumber, B256, MAINNET,
};
use reth_provider::{PruneCheckpointReader, TransactionsProvider};
@ -1034,105 +991,6 @@ mod tests {
assert!(!pruner.is_pruning_needed(third_block_number));
}
#[test]
fn prune_receipts() {
let tx = TestTransaction::default();
let mut rng = generators::rng();
let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3);
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");
let mut receipts = Vec::new();
for block in &blocks {
for transaction in &block.body {
receipts
.push((receipts.len() as u64, random_receipt(&mut rng, transaction, Some(0))));
}
}
tx.insert_receipts(receipts.clone()).expect("insert receipts");
assert_eq!(
tx.table::<tables::Transactions>().unwrap().len(),
blocks.iter().map(|block| block.body.len()).sum::<usize>()
);
assert_eq!(
tx.table::<tables::Transactions>().unwrap().len(),
tx.table::<tables::Receipts>().unwrap().len()
);
let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let pruner = Pruner::new(
tx.inner_raw(),
MAINNET.clone(),
1,
PruneModes { receipts: Some(prune_mode), ..Default::default() },
// Less than total amount of receipts to prune to test the batching logic
10,
watch::channel(None).1,
);
let next_tx_number_to_prune = tx
.inner()
.get_prune_checkpoint(PrunePart::Receipts)
.unwrap()
.and_then(|checkpoint| checkpoint.tx_number)
.map(|tx_number| tx_number + 1)
.unwrap_or_default();
let last_pruned_tx_number = blocks
.iter()
.take(to_block as usize)
.map(|block| block.body.len())
.sum::<usize>()
.min(next_tx_number_to_prune as usize + pruner.delete_limit)
.sub(1);
let last_pruned_block_number = blocks
.iter()
.fold_while((0, 0), |(_, mut tx_count), block| {
tx_count += block.body.len();
if tx_count > last_pruned_tx_number {
Done((block.number, tx_count))
} else {
Continue((block.number, tx_count))
}
})
.into_inner()
.0;
let provider = tx.inner_rw();
let result =
pruner.prune_receipts(&provider, to_block, prune_mode, pruner.delete_limit);
provider.commit().expect("commit");
assert_matches!(result, Ok(_));
let result = result.unwrap();
assert_eq!(result, expected_result);
let last_pruned_block_number =
last_pruned_block_number.checked_sub(if result.0.is_finished() { 0 } else { 1 });
assert_eq!(
tx.table::<tables::Receipts>().unwrap().len(),
receipts.len() - (last_pruned_tx_number + 1)
);
assert_eq!(
tx.inner().get_prune_checkpoint(PrunePart::Receipts).unwrap(),
Some(PruneCheckpoint {
block_number: last_pruned_block_number,
tx_number: Some(last_pruned_tx_number as TxNumber),
prune_mode
})
);
};
test_prune(6, (PruneProgress::HasMoreData, 10));
test_prune(6, (PruneProgress::Finished, 2));
test_prune(10, (PruneProgress::Finished, 8));
}
#[test]
fn prune_transaction_lookup() {
let tx = TestTransaction::default();
@ -1173,7 +1031,7 @@ mod tests {
let next_tx_number_to_prune = tx
.inner()
.get_prune_checkpoint(PrunePart::TransactionLookup)
.get_prune_checkpoint(PruneSegment::TransactionLookup)
.unwrap()
.and_then(|checkpoint| checkpoint.tx_number)
.map(|tx_number| tx_number + 1)
@ -1222,7 +1080,7 @@ mod tests {
tx_hash_numbers.len() - (last_pruned_tx_number + 1)
);
assert_eq!(
tx.inner().get_prune_checkpoint(PrunePart::TransactionLookup).unwrap(),
tx.inner().get_prune_checkpoint(PruneSegment::TransactionLookup).unwrap(),
Some(PruneCheckpoint {
block_number: last_pruned_block_number,
tx_number: Some(last_pruned_tx_number as TxNumber),
@ -1280,7 +1138,7 @@ mod tests {
let next_tx_number_to_prune = tx
.inner()
.get_prune_checkpoint(PrunePart::SenderRecovery)
.get_prune_checkpoint(PruneSegment::SenderRecovery)
.unwrap()
.and_then(|checkpoint| checkpoint.tx_number)
.map(|tx_number| tx_number + 1)
@ -1329,7 +1187,7 @@ mod tests {
transaction_senders.len() - (last_pruned_tx_number + 1)
);
assert_eq!(
tx.inner().get_prune_checkpoint(PrunePart::SenderRecovery).unwrap(),
tx.inner().get_prune_checkpoint(PruneSegment::SenderRecovery).unwrap(),
Some(PruneCheckpoint {
block_number: last_pruned_block_number,
tx_number: Some(last_pruned_tx_number as TxNumber),
@ -1467,7 +1325,7 @@ mod tests {
assert_eq!(actual_shards, expected_shards);
assert_eq!(
tx.inner().get_prune_checkpoint(PrunePart::AccountHistory).unwrap(),
tx.inner().get_prune_checkpoint(PruneSegment::AccountHistory).unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
@ -1607,7 +1465,7 @@ mod tests {
assert_eq!(actual_shards, expected_shards);
assert_eq!(
tx.inner().get_prune_checkpoint(PrunePart::StorageHistory).unwrap(),
tx.inner().get_prune_checkpoint(PruneSegment::StorageHistory).unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
@ -1688,7 +1546,7 @@ mod tests {
let (pruned_block, pruned_tx) = tx
.inner()
.get_prune_checkpoint(PrunePart::ContractLogs)
.get_prune_checkpoint(PruneSegment::ContractLogs)
.unwrap()
.map(|checkpoint| (checkpoint.block_number.unwrap(), checkpoint.tx_number.unwrap()))
.unwrap_or_default();

View File

@ -0,0 +1,126 @@
mod receipts;
pub(crate) use receipts::Receipts;
use crate::PrunerError;
use reth_db::database::Database;
use reth_interfaces::RethResult;
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber};
use reth_provider::{
BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter,
};
use std::ops::RangeInclusive;
use tracing::error;
/// A segment represents a pruning of some portion of the data.
///
/// Segments are called from [Pruner](crate::Pruner) with the following lifecycle:
/// 1. Call [Segment::prune] with `delete_limit` of [PruneInput].
/// 2. If [Segment::prune] returned a [Some] in `checkpoint` of [PruneOutput], call
/// [Segment::save_checkpoint].
/// 3. Subtract `pruned` of [PruneOutput] from `delete_limit` of next [PruneInput].
pub(crate) trait Segment {
/// Segment of the data that's pruned.
const SEGMENT: PruneSegment;
/// Prune data for [Self::SEGMENT] using the provided input.
fn prune<DB: Database>(
&self,
provider: &DatabaseProviderRW<'_, DB>,
input: PruneInput,
) -> Result<PruneOutput, PrunerError>;
/// Save checkpoint for [Self::SEGMENT] to the database.
fn save_checkpoint<DB: Database>(
&self,
provider: &DatabaseProviderRW<'_, DB>,
checkpoint: PruneCheckpoint,
) -> RethResult<()> {
provider.save_prune_checkpoint(Self::SEGMENT, checkpoint)
}
}
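
To make the lifecycle above concrete, here is a hedged sketch of how a single segment might be driven, modeled directly on the receipts handling in `Pruner::run` in this commit; it assumes it lives inside the pruner crate and that the same error conversions used there are available:

```rust
use crate::{
    segments::{self, PruneInput, Segment},
    PrunerError,
};
use reth_db::database::Database;
use reth_primitives::{BlockNumber, PruneMode};
use reth_provider::DatabaseProviderRW;

/// Sketch: run the receipts segment once and return the remaining delete budget.
fn run_receipts_segment<DB: Database>(
    provider: &DatabaseProviderRW<'_, DB>,
    to_block: BlockNumber,
    prune_mode: PruneMode,
    delete_limit: usize,
) -> Result<usize, PrunerError> {
    let segment = segments::Receipts::default();

    // 1. Prune with the current delete budget.
    let output = segment.prune(provider, PruneInput { to_block, delete_limit })?;

    // 2. Persist the checkpoint, if the segment produced one.
    if let Some(checkpoint) = output.checkpoint {
        segment.save_checkpoint(provider, checkpoint.as_prune_checkpoint(prune_mode))?;
    }

    // 3. Subtract what was pruned from the budget handed to the next segment.
    Ok(delete_limit.saturating_sub(output.pruned))
}
```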
/// Segment pruning input, see [Segment::prune].
#[derive(Debug, Clone, Copy)]
pub(crate) struct PruneInput {
/// Target block up to which the pruning needs to be done, inclusive.
pub(crate) to_block: BlockNumber,
/// Maximum entries to delete from the database.
pub(crate) delete_limit: usize,
}
impl PruneInput {
/// Get next inclusive tx number range to prune according to the checkpoint and `to_block` block
/// number.
///
/// To get the range start:
/// 1. If checkpoint exists, get next block body and return its first tx number.
/// 2. If checkpoint doesn't exist, return 0.
///
/// To get the range end: get last tx number for `to_block`.
pub(crate) fn get_next_tx_num_range_from_checkpoint<DB: Database>(
&self,
provider: &DatabaseProviderRW<'_, DB>,
segment: PruneSegment,
) -> RethResult<Option<RangeInclusive<TxNumber>>> {
let from_tx_number = provider
.get_prune_checkpoint(segment)?
// Checkpoint exists, prune from the next transaction after the highest pruned one
.and_then(|checkpoint| match checkpoint.tx_number {
Some(tx_number) => Some(tx_number + 1),
_ => {
error!(target: "pruner", %segment, ?checkpoint, "Expected transaction number in prune checkpoint, found None");
None
},
})
// No checkpoint exists, prune from genesis
.unwrap_or(0);
let to_tx_number = match provider.block_body_indices(self.to_block)? {
Some(body) => body,
None => return Ok(None),
}
.last_tx_num();
let range = from_tx_number..=to_tx_number;
if range.is_empty() {
return Ok(None)
}
Ok(Some(range))
}
}
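
As a toy illustration of the range rule described above, with plain integers standing in for the checkpoint and `block_body_indices` lookups (no database involved):

```rust
fn main() {
    // Highest tx number already pruned, if a checkpoint exists.
    let checkpoint_tx: Option<u64> = Some(100);
    // Last tx number in `to_block`'s body, as the block body indices would report.
    let last_tx_of_to_block: u64 = 250;

    // Start right after the checkpoint, or from tx 0 (genesis) without one.
    let from = checkpoint_tx.map(|tx| tx + 1).unwrap_or(0);
    let range = from..=last_tx_of_to_block;

    // 101..=250 still has work to do; an empty range means nothing left to prune.
    assert!(!range.is_empty());
    assert_eq!(range.count(), 150);
}
```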
/// Segment pruning output, see [Segment::prune].
#[derive(Debug, Clone, Copy)]
pub(crate) struct PruneOutput {
/// `true` if pruning has been completed up to the target block, and `false` if there's more
/// data to prune in further runs.
pub(crate) done: bool,
/// Number of entries pruned, i.e. deleted from the database.
pub(crate) pruned: usize,
/// Pruning checkpoint to save to database, if any.
pub(crate) checkpoint: Option<PruneOutputCheckpoint>,
}
impl PruneOutput {
/// Returns a [PruneOutput] with `done = true`, `pruned = 0` and `checkpoint = None`.
/// Use when no pruning is needed.
pub(crate) fn done() -> Self {
Self { done: true, pruned: 0, checkpoint: None }
}
}
#[derive(Debug, Clone, Copy)]
pub(crate) struct PruneOutputCheckpoint {
/// Highest pruned block number. If it's [None], the pruning for block `0` is not finished yet.
pub(crate) block_number: Option<BlockNumber>,
/// Highest pruned transaction number, if applicable.
pub(crate) tx_number: Option<TxNumber>,
}
impl PruneOutputCheckpoint {
/// Converts [PruneOutputCheckpoint] to [PruneCheckpoint] with the provided [PruneMode]
pub(crate) fn as_prune_checkpoint(&self, prune_mode: PruneMode) -> PruneCheckpoint {
PruneCheckpoint { block_number: self.block_number, tx_number: self.tx_number, prune_mode }
}
}

View File

@ -0,0 +1,189 @@
use crate::{
segments::{PruneInput, PruneOutput, PruneOutputCheckpoint, Segment},
PrunerError,
};
use reth_db::{database::Database, tables};
use reth_interfaces::RethResult;
use reth_primitives::{PruneCheckpoint, PruneSegment};
use reth_provider::{DatabaseProviderRW, PruneCheckpointWriter, TransactionsProvider};
use tracing::{instrument, trace};
#[derive(Default)]
#[non_exhaustive]
pub(crate) struct Receipts;
impl Segment for Receipts {
const SEGMENT: PruneSegment = PruneSegment::Receipts;
#[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
fn prune<DB: Database>(
&self,
provider: &DatabaseProviderRW<'_, DB>,
input: PruneInput,
) -> Result<PruneOutput, PrunerError> {
let tx_range = match input.get_next_tx_num_range_from_checkpoint(provider, Self::SEGMENT)? {
Some(range) => range,
None => {
trace!(target: "pruner", "No receipts to prune");
return Ok(PruneOutput::done())
}
};
let tx_range_end = *tx_range.end();
let mut last_pruned_transaction = tx_range_end;
let (pruned, done) = provider.prune_table_with_range::<tables::Receipts>(
tx_range,
input.delete_limit,
|_| false,
|row| last_pruned_transaction = row.0,
)?;
trace!(target: "pruner", %pruned, %done, "Pruned receipts");
let last_pruned_block = provider
.transaction_block(last_pruned_transaction)?
.ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
// If there's more receipts to prune, set the checkpoint block number to previous,
// so we could finish pruning its receipts on the next run.
.checked_sub(if done { 0 } else { 1 });
Ok(PruneOutput {
done,
pruned,
checkpoint: Some(PruneOutputCheckpoint {
block_number: last_pruned_block,
tx_number: Some(last_pruned_transaction),
}),
})
}
fn save_checkpoint<DB: Database>(
&self,
provider: &DatabaseProviderRW<'_, DB>,
checkpoint: PruneCheckpoint,
) -> RethResult<()> {
provider.save_prune_checkpoint(Self::SEGMENT, checkpoint)?;
// `PruneSegment::Receipts` overrides `PruneSegment::ContractLogs`, so we can preemptively
// limit their pruning start point.
provider.save_prune_checkpoint(PruneSegment::ContractLogs, checkpoint)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::segments::{PruneInput, PruneOutput, Receipts, Segment};
use assert_matches::assert_matches;
use itertools::{
FoldWhile::{Continue, Done},
Itertools,
};
use reth_db::tables;
use reth_interfaces::test_utils::{
generators,
generators::{random_block_range, random_receipt},
};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, TxNumber, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestTransaction;
use std::ops::Sub;
#[test]
fn prune() {
let tx = TestTransaction::default();
let mut rng = generators::rng();
let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3);
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");
let mut receipts = Vec::new();
for block in &blocks {
for transaction in &block.body {
receipts
.push((receipts.len() as u64, random_receipt(&mut rng, transaction, Some(0))));
}
}
tx.insert_receipts(receipts.clone()).expect("insert receipts");
assert_eq!(
tx.table::<tables::Transactions>().unwrap().len(),
blocks.iter().map(|block| block.body.len()).sum::<usize>()
);
assert_eq!(
tx.table::<tables::Transactions>().unwrap().len(),
tx.table::<tables::Receipts>().unwrap().len()
);
let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| {
let prune_mode = PruneMode::Before(to_block);
let input = PruneInput { to_block, delete_limit: 10 };
let segment = Receipts::default();
let next_tx_number_to_prune = tx
.inner()
.get_prune_checkpoint(Receipts::SEGMENT)
.unwrap()
.and_then(|checkpoint| checkpoint.tx_number)
.map(|tx_number| tx_number + 1)
.unwrap_or_default();
let last_pruned_tx_number = blocks
.iter()
.take(to_block as usize)
.map(|block| block.body.len())
.sum::<usize>()
.min(next_tx_number_to_prune as usize + input.delete_limit)
.sub(1);
let last_pruned_block_number = blocks
.iter()
.fold_while((0, 0), |(_, mut tx_count), block| {
tx_count += block.body.len();
if tx_count > last_pruned_tx_number {
Done((block.number, tx_count))
} else {
Continue((block.number, tx_count))
}
})
.into_inner()
.0;
let provider = tx.inner_rw();
let result = segment.prune(&provider, input).unwrap();
assert_matches!(
result,
PruneOutput {done, pruned, checkpoint: Some(_)}
if (done, pruned) == expected_result
);
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
let last_pruned_block_number =
last_pruned_block_number.checked_sub(if result.done { 0 } else { 1 });
assert_eq!(
tx.table::<tables::Receipts>().unwrap().len(),
receipts.len() - (last_pruned_tx_number + 1)
);
assert_eq!(
tx.inner().get_prune_checkpoint(Receipts::SEGMENT).unwrap(),
Some(PruneCheckpoint {
block_number: last_pruned_block_number,
tx_number: Some(last_pruned_tx_number as TxNumber),
prune_mode
})
);
};
test_prune(6, (false, 10));
test_prune(6, (true, 2));
test_prune(10, (true, 8));
}
}

View File

@ -12,7 +12,7 @@ use reth_interfaces::{
};
use reth_primitives::{
Address, Block, BlockNumber, Bloom, ChainSpec, Hardfork, Header, PruneMode, PruneModes,
PrunePartError, Receipt, ReceiptWithBloom, Receipts, TransactionSigned, B256,
PruneSegmentError, Receipt, ReceiptWithBloom, Receipts, TransactionSigned, B256,
MINIMUM_PRUNING_DISTANCE, U256,
};
use reth_provider::{
@ -394,7 +394,7 @@ impl<'a> EVMProcessor<'a> {
fn prune_receipts(
&mut self,
receipts: &mut Vec<Option<Receipt>>,
) -> Result<(), PrunePartError> {
) -> Result<(), PruneSegmentError> {
let (first_block, tip) = match self.first_block.zip(self.tip) {
Some((block, tip)) => (block, tip),
_ => return Ok(()),
@ -404,7 +404,7 @@ impl<'a> EVMProcessor<'a> {
// Block receipts should not be retained
if self.prune_modes.receipts == Some(PruneMode::Full) ||
// [`PrunePart::Receipts`] takes priority over [`PrunePart::ContractLogs`]
// [`PruneSegment::Receipts`] takes priority over [`PruneSegment::ContractLogs`]
self.prune_modes.should_prune_receipts(block_number, tip)
{
receipts.clear();
@ -412,7 +412,7 @@ impl<'a> EVMProcessor<'a> {
}
// All receipts from the last 128 blocks are required for blockchain tree, even with
// [`PrunePart::ContractLogs`].
// [`PruneSegment::ContractLogs`].
let prunable_receipts =
PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(block_number, tip);
if !prunable_receipts {
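The hunk above keeps receipts from roughly the last 128 blocks for the blockchain tree regardless of the configured mode, by asking `PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(block_number, tip)`. A standalone approximation of that distance gate (the exact boundary handling lives in `PruneMode::should_prune`, so treat the helper below as illustrative rather than a reproduction of reth's semantics):

```rust
/// Illustrative stand-in for `PruneMode::Distance(d).should_prune(block, tip)`:
/// a block's receipts become prunable only once it is at least `min_distance`
/// blocks behind the tip.
fn receipts_prunable(block_number: u64, tip: u64, min_distance: u64) -> bool {
    tip.checked_sub(min_distance)
        .map_or(false, |cutoff| block_number < cutoff)
}

fn main() {
    let tip = 1_000_000;
    let min_distance = 128; // the hunk above protects the last 128 blocks of receipts
    assert!(receipts_prunable(999_000, tip, min_distance));
    assert!(!receipts_prunable(999_950, tip, min_distance)); // inside the protected window
}
```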

View File

@ -51,12 +51,12 @@ pub struct SnapshotTargets {
}
impl SnapshotTargets {
/// Returns `true` if any of the data parts has targets, i.e. is [`Some`].
/// Returns `true` if any of the targets are [Some].
pub fn any(&self) -> bool {
self.headers.is_some() || self.receipts.is_some() || self.transactions.is_some()
}
/// Returns `true` if all targets are either [`None`] or multiple of `block_interval`.
/// Returns `true` if all targets are either [None] or multiple of `block_interval`.
fn is_multiple_of_block_interval(&self, block_interval: u64) -> bool {
[
self.headers.as_ref(),
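The `SnapshotTargets` hunk above only touches doc wording, but the two checks it documents are worth spelling out: `any` reports whether any segment has a target at all, and `is_multiple_of_block_interval` requires every populated target to line up with the configured interval. A simplified standalone model (the real target types and the precise meaning of "multiple" are reth-specific; each target is reduced to a single end block here purely for illustration):

```rust
/// Simplified stand-in: each target reduced to its end block, if a snapshot is requested.
struct Targets {
    headers: Option<u64>,
    receipts: Option<u64>,
    transactions: Option<u64>,
}

impl Targets {
    /// Mirrors `SnapshotTargets::any` above.
    fn any(&self) -> bool {
        self.headers.is_some() || self.receipts.is_some() || self.transactions.is_some()
    }

    /// Simplified reading of `is_multiple_of_block_interval`: every populated
    /// target must be a multiple of `block_interval`.
    fn is_multiple_of_block_interval(&self, block_interval: u64) -> bool {
        [self.headers, self.receipts, self.transactions]
            .into_iter()
            .flatten()
            .all(|end_block| end_block % block_interval == 0)
    }
}

fn main() {
    let targets = Targets { headers: Some(500_000), receipts: Some(500_000), transactions: None };
    assert!(targets.any());
    assert!(targets.is_multiple_of_block_interval(250_000));
    assert!(!targets.is_multiple_of_block_interval(300_000));
}
```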

View File

@ -51,7 +51,7 @@ pub enum StageError {
},
/// Invalid pruning configuration
#[error(transparent)]
PruningConfiguration(#[from] reth_primitives::PrunePartError),
PruningConfiguration(#[from] reth_primitives::PruneSegmentError),
/// Invalid checkpoint passed to the stage
#[error("Invalid stage checkpoint: {0}")]
StageCheckpoint(u64),

View File

@ -2,7 +2,7 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}
use reth_db::database::Database;
use reth_primitives::{
stage::{StageCheckpoint, StageId},
PruneCheckpoint, PruneModes, PrunePart,
PruneCheckpoint, PruneModes, PruneSegment,
};
use reth_provider::{
AccountExtReader, DatabaseProviderRW, HistoryWriter, PruneCheckpointReader,
@ -56,9 +56,9 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
// Save prune checkpoint only if we don't have one already.
// Otherwise, pruner may skip the unpruned range of blocks.
if provider.get_prune_checkpoint(PrunePart::AccountHistory)?.is_none() {
if provider.get_prune_checkpoint(PruneSegment::AccountHistory)?.is_none() {
provider.save_prune_checkpoint(
PrunePart::AccountHistory,
PruneSegment::AccountHistory,
PruneCheckpoint {
block_number: Some(target_prunable_block),
tx_number: None,

View File

@ -2,7 +2,7 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}
use reth_db::{database::Database, models::BlockNumberAddress};
use reth_primitives::{
stage::{StageCheckpoint, StageId},
PruneCheckpoint, PruneModes, PrunePart,
PruneCheckpoint, PruneModes, PruneSegment,
};
use reth_provider::{
DatabaseProviderRW, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, StorageReader,
@ -55,9 +55,9 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
// Save prune checkpoint only if we don't have one already.
// Otherwise, pruner may skip the unpruned range of blocks.
if provider.get_prune_checkpoint(PrunePart::StorageHistory)?.is_none() {
if provider.get_prune_checkpoint(PruneSegment::StorageHistory)?.is_none() {
provider.save_prune_checkpoint(
PrunePart::StorageHistory,
PruneSegment::StorageHistory,
PruneCheckpoint {
block_number: Some(target_prunable_block),
tx_number: None,
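Both index-history hunks above (accounts and storages) follow the same rule: the stage writes a prune checkpoint only when none exists yet, otherwise the pruner could treat an earlier, still-unpruned range as already handled. The pattern, pulled out into a hedged generic helper over the checkpoint traits changed later in this commit (trait, type, and field names are taken from the hunks; the helper itself is not part of the commit):

```rust
use reth_interfaces::RethResult;
use reth_primitives::{PruneCheckpoint, PruneMode, PruneSegment};
use reth_provider::{PruneCheckpointReader, PruneCheckpointWriter};

/// Save a prune checkpoint for `segment` only if none has been recorded yet,
/// mirroring the guard used by the index-history stages above.
fn save_initial_prune_checkpoint<P>(
    provider: &P,
    segment: PruneSegment,
    target_prunable_block: u64,
    prune_mode: PruneMode,
) -> RethResult<()>
where
    P: PruneCheckpointReader + PruneCheckpointWriter,
{
    if provider.get_prune_checkpoint(segment)?.is_none() {
        provider.save_prune_checkpoint(
            segment,
            PruneCheckpoint {
                block_number: Some(target_prunable_block),
                tx_number: None,
                prune_mode,
            },
        )?;
    }
    Ok(())
}
```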

View File

@ -11,7 +11,7 @@ use reth_interfaces::consensus;
use reth_primitives::{
keccak256,
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
Address, PrunePart, TransactionSignedNoHash, TxNumber,
Address, PruneSegment, TransactionSignedNoHash, TxNumber,
};
use reth_provider::{
BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError, PruneCheckpointReader,
@ -211,7 +211,7 @@ fn stage_checkpoint<DB: Database>(
provider: &DatabaseProviderRW<'_, &DB>,
) -> Result<EntitiesCheckpoint, StageError> {
let pruned_entries = provider
.get_prune_checkpoint(PrunePart::SenderRecovery)?
.get_prune_checkpoint(PruneSegment::SenderRecovery)?
.and_then(|checkpoint| checkpoint.tx_number)
.unwrap_or_default();
Ok(EntitiesCheckpoint {
@ -403,7 +403,7 @@ mod tests {
let provider = tx.inner_rw();
provider
.save_prune_checkpoint(
PrunePart::SenderRecovery,
PruneSegment::SenderRecovery,
PruneCheckpoint {
block_number: Some(max_pruned_block),
tx_number: Some(

View File

@ -12,7 +12,7 @@ use reth_interfaces::provider::ProviderError;
use reth_primitives::{
keccak256,
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
PruneCheckpoint, PruneModes, PrunePart, TransactionSignedNoHash, TxNumber, B256,
PruneCheckpoint, PruneModes, PruneSegment, TransactionSignedNoHash, TxNumber, B256,
};
use reth_provider::{
BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter,
@ -66,14 +66,14 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
// Save prune checkpoint only if we don't have one already.
// Otherwise, pruner may skip the unpruned range of blocks.
if provider.get_prune_checkpoint(PrunePart::TransactionLookup)?.is_none() {
if provider.get_prune_checkpoint(PruneSegment::TransactionLookup)?.is_none() {
let target_prunable_tx_number = provider
.block_body_indices(target_prunable_block)?
.ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))?
.last_tx_num();
provider.save_prune_checkpoint(
PrunePart::TransactionLookup,
PruneSegment::TransactionLookup,
PruneCheckpoint {
block_number: Some(target_prunable_block),
tx_number: Some(target_prunable_tx_number),
@ -213,7 +213,7 @@ fn stage_checkpoint<DB: Database>(
provider: &DatabaseProviderRW<'_, &DB>,
) -> Result<EntitiesCheckpoint, StageError> {
let pruned_entries = provider
.get_prune_checkpoint(PrunePart::TransactionLookup)?
.get_prune_checkpoint(PruneSegment::TransactionLookup)?
.and_then(|checkpoint| checkpoint.tx_number)
// `+1` is needed because `TxNumber` is 0-indexed
.map(|tx_number| tx_number + 1)
@ -434,7 +434,7 @@ mod tests {
let provider = tx.inner_rw();
provider
.save_prune_checkpoint(
PrunePart::TransactionLookup,
PruneSegment::TransactionLookup,
PruneCheckpoint {
block_number: Some(max_pruned_block),
tx_number: Some(
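The transaction-lookup hunk above turns the prune checkpoint into an entry count for the stage checkpoint and calls out the off-by-one in a comment: `TxNumber` is 0-indexed, so a checkpoint at tx `N` means `N + 1` transactions are already gone. A standalone restatement of that computation:

```rust
/// Number of entries the stage can treat as already pruned, given the highest
/// pruned transaction number recorded in the checkpoint (if any).
fn pruned_entries(checkpoint_tx_number: Option<u64>) -> u64 {
    checkpoint_tx_number
        // `+ 1` because `TxNumber` is 0-indexed: a checkpoint at tx 41 covers txs 0..=41.
        .map(|tx_number| tx_number + 1)
        .unwrap_or_default()
}

fn main() {
    assert_eq!(pruned_entries(None), 0);      // nothing pruned yet
    assert_eq!(pruned_entries(Some(41)), 42); // txs 0..=41 are gone
}
```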

View File

@ -38,7 +38,7 @@ use reth_primitives::{
stage::StageCheckpoint,
trie::{BranchNodeCompact, StorageTrieEntry, StoredNibbles, StoredNibblesSubKey},
Account, Address, BlockHash, BlockNumber, Bytecode, Header, IntegerList, PruneCheckpoint,
PrunePart, Receipt, StorageEntry, TransactionSignedNoHash, TxHash, TxNumber, B256,
PruneSegment, Receipt, StorageEntry, TransactionSignedNoHash, TxHash, TxNumber, B256,
};
/// Enum for the types of tables present in libmdbx.
@ -417,8 +417,8 @@ table!(
);
table!(
/// Stores the highest pruned block number and prune mode of each prune part.
( PruneCheckpoints ) PrunePart | PruneCheckpoint
/// Stores the highest pruned block number and prune mode of each prune segment.
( PruneCheckpoints ) PruneSegment | PruneCheckpoint
);
/// Alias Types

View File

@ -6,7 +6,7 @@ use crate::{
use reth_codecs::Compact;
use reth_primitives::{
trie::{StoredNibbles, StoredNibblesSubKey},
Address, PrunePart, B256,
Address, PruneSegment, B256,
};
pub mod accounts;
@ -136,7 +136,7 @@ impl Decode for StoredNibblesSubKey {
}
}
impl Encode for PrunePart {
impl Encode for PruneSegment {
type Encoded = [u8; 1];
fn encode(self) -> Self::Encoded {
@ -146,7 +146,7 @@ impl Encode for PrunePart {
}
}
impl Decode for PrunePart {
impl Decode for PruneSegment {
fn decode<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError> {
let buf = value.as_ref();
Ok(Self::from_compact(buf, buf.len()).0)
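The codec hunk above funnels `PruneSegment` through its `Compact` representation so the whole table key fits in a single byte. A standalone illustration of the same one-byte round-trip, without the reth codec machinery (the variant set and byte values below are invented for the example and do not reproduce reth's actual encoding):

```rust
/// Stand-in for the one-byte key encoding used for `PruneSegment` above.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SegmentKey {
    SenderRecovery = 0,
    TransactionLookup = 1,
    Receipts = 2,
}

fn encode(segment: SegmentKey) -> [u8; 1] {
    [segment as u8]
}

fn decode(buf: &[u8]) -> Option<SegmentKey> {
    match buf.first()? {
        0 => Some(SegmentKey::SenderRecovery),
        1 => Some(SegmentKey::TransactionLookup),
        2 => Some(SegmentKey::Receipts),
        _ => None,
    }
}

fn main() {
    for segment in [SegmentKey::SenderRecovery, SegmentKey::TransactionLookup, SegmentKey::Receipts] {
        assert_eq!(decode(&encode(segment)), Some(segment));
    }
}
```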

View File

@ -10,7 +10,7 @@ use reth_interfaces::{db::LogLevel, RethError, RethResult};
use reth_primitives::{
stage::{StageCheckpoint, StageId},
Address, Block, BlockHash, BlockHashOrNumber, BlockNumber, BlockWithSenders, ChainInfo,
ChainSpec, Header, PruneCheckpoint, PrunePart, Receipt, SealedBlock, SealedHeader,
ChainSpec, Header, PruneCheckpoint, PruneSegment, Receipt, SealedBlock, SealedHeader,
TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, Withdrawal,
B256, U256,
};
@ -104,14 +104,14 @@ impl<DB: Database> ProviderFactory<DB> {
block_number += 1;
let account_history_prune_checkpoint =
provider.get_prune_checkpoint(PrunePart::AccountHistory)?;
provider.get_prune_checkpoint(PruneSegment::AccountHistory)?;
let storage_history_prune_checkpoint =
provider.get_prune_checkpoint(PrunePart::StorageHistory)?;
provider.get_prune_checkpoint(PruneSegment::StorageHistory)?;
let mut state_provider = HistoricalStateProvider::new(provider.into_tx(), block_number);
// If we pruned account or storage history, we can't return state on every historical block.
// Instead, we should cap it at the latest prune checkpoint for corresponding prune part.
// Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
if let Some(prune_checkpoint_block_number) =
account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
{
@ -404,8 +404,8 @@ where
}
impl<DB: Database> PruneCheckpointReader for ProviderFactory<DB> {
fn get_prune_checkpoint(&self, part: PrunePart) -> RethResult<Option<PruneCheckpoint>> {
self.provider()?.get_prune_checkpoint(part)
fn get_prune_checkpoint(&self, segment: PruneSegment) -> RethResult<Option<PruneCheckpoint>> {
self.provider()?.get_prune_checkpoint(segment)
}
}

View File

@ -31,8 +31,8 @@ use reth_primitives::{
stage::{StageCheckpoint, StageId},
trie::Nibbles,
Account, Address, Block, BlockHash, BlockHashOrNumber, BlockNumber, BlockWithSenders,
ChainInfo, ChainSpec, Hardfork, Head, Header, PruneCheckpoint, PruneModes, PrunePart, Receipt,
SealedBlock, SealedBlockWithSenders, SealedHeader, StorageEntry, TransactionMeta,
ChainInfo, ChainSpec, Hardfork, Head, Header, PruneCheckpoint, PruneModes, PruneSegment,
Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader, StorageEntry, TransactionMeta,
TransactionSigned, TransactionSignedEcRecovered, TransactionSignedNoHash, TxHash, TxNumber,
Withdrawal, B256, U256,
};
@ -2130,17 +2130,17 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> BlockWriter for DatabaseProvider<'
}
impl<'this, TX: DbTx<'this>> PruneCheckpointReader for DatabaseProvider<'this, TX> {
fn get_prune_checkpoint(&self, part: PrunePart) -> RethResult<Option<PruneCheckpoint>> {
Ok(self.tx.get::<tables::PruneCheckpoints>(part)?)
fn get_prune_checkpoint(&self, segment: PruneSegment) -> RethResult<Option<PruneCheckpoint>> {
Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
}
}
impl<'this, TX: DbTxMut<'this>> PruneCheckpointWriter for DatabaseProvider<'this, TX> {
fn save_prune_checkpoint(
&self,
part: PrunePart,
segment: PruneSegment,
checkpoint: PruneCheckpoint,
) -> RethResult<()> {
Ok(self.tx.put::<tables::PruneCheckpoints>(part, checkpoint)?)
Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
}
}

View File

@ -15,9 +15,9 @@ use reth_interfaces::{
use reth_primitives::{
stage::{StageCheckpoint, StageId},
Account, Address, Block, BlockHash, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumber,
BlockNumberOrTag, BlockWithSenders, ChainInfo, ChainSpec, Header, PruneCheckpoint, PrunePart,
Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader, TransactionMeta, TransactionSigned,
TransactionSignedNoHash, TxHash, TxNumber, Withdrawal, B256, U256,
BlockNumberOrTag, BlockWithSenders, ChainInfo, ChainSpec, Header, PruneCheckpoint,
PruneSegment, Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader, TransactionMeta,
TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, Withdrawal, B256, U256,
};
use revm::primitives::{BlockEnv, CfgEnv};
use std::{
@ -466,8 +466,8 @@ where
DB: Database,
Tree: Send + Sync,
{
fn get_prune_checkpoint(&self, part: PrunePart) -> RethResult<Option<PruneCheckpoint>> {
self.database.provider()?.get_prune_checkpoint(part)
fn get_prune_checkpoint(&self, segment: PruneSegment) -> RethResult<Option<PruneCheckpoint>> {
self.database.provider()?.get_prune_checkpoint(segment)
}
}

View File

@ -319,11 +319,11 @@ delegate_provider_impls!(HistoricalStateProvider<'a, TX> where [TX: DbTx<'a>]);
#[derive(Clone, Copy, Debug, Default)]
pub struct LowestAvailableBlocks {
/// Lowest block number at which the account history is available. It may not be available if
/// [reth_primitives::PrunePart::AccountHistory] was pruned.
/// [reth_primitives::PruneSegment::AccountHistory] was pruned.
/// [Option::None] means all history is available.
pub account_history_block_number: Option<BlockNumber>,
/// Lowest block number at which the storage history is available. It may not be available if
/// [reth_primitives::PrunePart::StorageHistory] was pruned.
/// [reth_primitives::PruneSegment::StorageHistory] was pruned.
/// [Option::None] means all history is available.
pub storage_history_block_number: Option<BlockNumber>,
}
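`LowestAvailableBlocks` above encodes availability per segment as an `Option<BlockNumber>`, with `None` meaning that segment was never pruned. The check a historical state provider then needs is a single comparison; a standalone sketch of that check (the helper name is invented, and the real struct may expose it differently):

```rust
/// `None` means the segment was never pruned, so every historical block is available.
fn is_history_available(lowest_available_block: Option<u64>, requested_block: u64) -> bool {
    lowest_available_block.map_or(true, |lowest| requested_block >= lowest)
}

fn main() {
    assert!(is_history_available(None, 0)); // nothing pruned: all history served
    assert!(is_history_available(Some(17_000_000), 17_500_000));
    assert!(!is_history_available(Some(17_000_000), 16_999_999)); // below the prune point
}
```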

View File

@ -11,9 +11,9 @@ use reth_interfaces::RethResult;
use reth_primitives::{
stage::{StageCheckpoint, StageId},
Account, Address, Block, BlockHash, BlockHashOrNumber, BlockId, BlockNumber, Bytecode, Bytes,
ChainInfo, ChainSpec, Header, PruneCheckpoint, PrunePart, Receipt, SealedBlock, SealedHeader,
StorageKey, StorageValue, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash,
TxNumber, B256, KECCAK_EMPTY, MAINNET, U256,
ChainInfo, ChainSpec, Header, PruneCheckpoint, PruneSegment, Receipt, SealedBlock,
SealedHeader, StorageKey, StorageValue, TransactionMeta, TransactionSigned,
TransactionSignedNoHash, TxHash, TxNumber, B256, KECCAK_EMPTY, MAINNET, U256,
};
use revm::primitives::{BlockEnv, CfgEnv};
use std::{
@ -388,7 +388,7 @@ impl WithdrawalsProvider for NoopProvider {
}
impl PruneCheckpointReader for NoopProvider {
fn get_prune_checkpoint(&self, _part: PrunePart) -> RethResult<Option<PruneCheckpoint>> {
fn get_prune_checkpoint(&self, _segment: PruneSegment) -> RethResult<Option<PruneCheckpoint>> {
Ok(None)
}
}

View File

@ -1,17 +1,20 @@
use reth_interfaces::RethResult;
use reth_primitives::{PruneCheckpoint, PrunePart};
use reth_primitives::{PruneCheckpoint, PruneSegment};
/// The trait for fetching prune checkpoint related data.
#[auto_impl::auto_impl(&, Arc)]
pub trait PruneCheckpointReader: Send + Sync {
/// Fetch the checkpoint for the given prune part.
fn get_prune_checkpoint(&self, part: PrunePart) -> RethResult<Option<PruneCheckpoint>>;
/// Fetch the checkpoint for the given prune segment.
fn get_prune_checkpoint(&self, segment: PruneSegment) -> RethResult<Option<PruneCheckpoint>>;
}
/// The trait for updating prune checkpoint related data.
#[auto_impl::auto_impl(&, Arc)]
pub trait PruneCheckpointWriter: Send + Sync {
/// Save prune checkpoint.
fn save_prune_checkpoint(&self, part: PrunePart, checkpoint: PruneCheckpoint)
-> RethResult<()>;
fn save_prune_checkpoint(
&self,
segment: PruneSegment,
checkpoint: PruneCheckpoint,
) -> RethResult<()>;
}
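These two traits are the shared surface for checkpoint state in this commit, and the `auto_impl(&, Arc)` attributes let them be called through plain references or `Arc`-wrapped providers alike. A hedged sketch of a caller plus a trivial test double (the double mirrors the `NoopProvider` impl above; the caller function and its name are not part of the commit):

```rust
use std::sync::Arc;

use reth_interfaces::RethResult;
use reth_primitives::{PruneCheckpoint, PruneSegment};
use reth_provider::PruneCheckpointReader;

/// Minimal test double: pretends nothing has been pruned, like `NoopProvider` above.
struct NothingPruned;

impl PruneCheckpointReader for NothingPruned {
    fn get_prune_checkpoint(&self, _segment: PruneSegment) -> RethResult<Option<PruneCheckpoint>> {
        Ok(None)
    }
}

/// Highest receipt block already removed by the pruner, if any. Works for `&T`
/// and `Arc<T>` as well, thanks to the `auto_impl(&, Arc)` attribute on the trait.
fn highest_pruned_receipt_block(reader: impl PruneCheckpointReader) -> RethResult<Option<u64>> {
    Ok(reader
        .get_prune_checkpoint(PruneSegment::Receipts)?
        .and_then(|checkpoint| checkpoint.block_number))
}

fn main() {
    let provider = Arc::new(NothingPruned);
    // A `None` checkpoint means receipts were never pruned.
    assert_eq!(highest_pruned_receipt_block(Arc::clone(&provider)).unwrap(), None);
    assert_eq!(highest_pruned_receipt_block(&*provider).unwrap(), None);
}
```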

View File

@ -5511,14 +5511,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "rate(reth_pruner_parts_duration_seconds_sum{instance=~\"$instance\"}[$__rate_interval]) / rate(reth_pruner_parts_duration_seconds_count{instance=~\"$instance\"}[$__rate_interval])",
"expr": "rate(reth_pruner_segments_duration_seconds_sum{instance=~\"$instance\"}[$__rate_interval]) / rate(reth_pruner_segments_duration_seconds_count{instance=~\"$instance\"}[$__rate_interval])",
"instant": false,
"legendFormat": "{{part}}",
"legendFormat": "{{segment}}",
"range": true,
"refId": "A"
}
],
"title": "Pruner duration, per part",
"title": "Pruner duration, per segment",
"type": "timeseries"
},
{