mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 19:09:54 +00:00
feat(pruner): prune receipts based on log emitters during live sync (#4140)
This commit is contained in:
@ -24,24 +24,61 @@ impl ContractLogsPruneConfig {
|
||||
/// Given the `tip` block number, consolidates the structure so it can easily be queried for
|
||||
/// filtering across a range of blocks.
|
||||
///
|
||||
/// The [`BlockNumber`] key of the map should be viewed as `PruneMode::Before(block)`.
|
||||
/// Example:
|
||||
///
|
||||
/// `{ addrA: Before(872), addrB: Before(500), addrC: Distance(128) }`
|
||||
///
|
||||
/// for `tip: 1000`, gets transformed to a map such as:
|
||||
///
|
||||
/// `{ 500: [addrB], 872: [addrA, addrC] }`
|
||||
///
|
||||
/// The [`BlockNumber`] key of the new map should be viewed as `PruneMode::Before(block)`, which
|
||||
/// makes the previous result equivalent to
|
||||
///
|
||||
/// `{ Before(500): [addrB], Before(872): [addrA, addrC] }`
|
||||
pub fn group_by_block(
|
||||
&self,
|
||||
tip: BlockNumber,
|
||||
pruned_block: Option<BlockNumber>,
|
||||
) -> Result<BTreeMap<BlockNumber, Vec<&Address>>, PrunePartError> {
|
||||
let mut map = BTreeMap::new();
|
||||
let pruned_block = pruned_block.unwrap_or_default();
|
||||
|
||||
for (address, mode) in self.0.iter() {
|
||||
// Getting `None`, means that there is nothing to prune yet, so we need it to include in
|
||||
// the BTreeMap (block = 0), otherwise it will be excluded.
|
||||
// Reminder that this BTreeMap works as an inclusion list that excludes (prunes) all
|
||||
// other receipts.
|
||||
let block = mode
|
||||
.prune_target_block(tip, MINIMUM_PRUNING_DISTANCE, PrunePart::ContractLogs)?
|
||||
.map(|(block, _)| block)
|
||||
.unwrap_or_default();
|
||||
let block = (pruned_block + 1).max(
|
||||
mode.prune_target_block(tip, MINIMUM_PRUNING_DISTANCE, PrunePart::ContractLogs)?
|
||||
.map(|(block, _)| block)
|
||||
.unwrap_or_default(),
|
||||
);
|
||||
|
||||
map.entry(block).or_insert_with(Vec::new).push(address)
|
||||
}
|
||||
Ok(map)
|
||||
}
|
||||
|
||||
/// Returns the lowest block where we start filtering logs which use `PruneMode::Distance(_)`.
|
||||
pub fn lowest_block_with_distance(
|
||||
&self,
|
||||
tip: BlockNumber,
|
||||
pruned_block: Option<BlockNumber>,
|
||||
) -> Result<Option<BlockNumber>, PrunePartError> {
|
||||
let pruned_block = pruned_block.unwrap_or_default();
|
||||
let mut lowest = None;
|
||||
|
||||
for (_, mode) in self.0.iter() {
|
||||
if let PruneMode::Distance(_) = mode {
|
||||
if let Some((block, _)) =
|
||||
mode.prune_target_block(tip, MINIMUM_PRUNING_DISTANCE, PrunePart::ContractLogs)?
|
||||
{
|
||||
lowest = Some(lowest.unwrap_or(u64::MAX).min(block));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(lowest.map(|lowest| lowest.max(pruned_block)))
|
||||
}
|
||||
}
|
||||
|
||||
@ -13,6 +13,7 @@ use reth_db::{
|
||||
};
|
||||
use reth_primitives::{
|
||||
BlockNumber, ChainSpec, PruneCheckpoint, PruneMode, PruneModes, PrunePart, TxNumber,
|
||||
MINIMUM_PRUNING_DISTANCE,
|
||||
};
|
||||
use reth_provider::{
|
||||
BlockReader, DatabaseProviderRW, ProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
|
||||
@ -102,6 +103,15 @@ impl<DB: Database> Pruner<DB> {
|
||||
.record(part_start.elapsed())
|
||||
}
|
||||
|
||||
if !self.modes.contract_logs_filter.is_empty() {
|
||||
let part_start = Instant::now();
|
||||
self.prune_receipts_by_logs(&provider, tip_block_number)?;
|
||||
self.metrics
|
||||
.get_prune_part_metrics(PrunePart::ContractLogs)
|
||||
.duration_seconds
|
||||
.record(part_start.elapsed())
|
||||
}
|
||||
|
||||
if let Some((to_block, prune_mode)) =
|
||||
self.modes.prune_target_block_transaction_lookup(tip_block_number)?
|
||||
{
|
||||
@ -251,6 +261,7 @@ impl<DB: Database> Pruner<DB> {
|
||||
"Pruned receipts"
|
||||
);
|
||||
},
|
||||
|_| false,
|
||||
)?;
|
||||
|
||||
provider.save_prune_checkpoint(
|
||||
@ -258,6 +269,160 @@ impl<DB: Database> Pruner<DB> {
|
||||
PruneCheckpoint { block_number: to_block, prune_mode },
|
||||
)?;
|
||||
|
||||
// `PrunePart::Receipts` overrides `PrunePart::ContractLogs`, so we can preemptively
|
||||
// limit their pruning start point.
|
||||
provider.save_prune_checkpoint(
|
||||
PrunePart::ContractLogs,
|
||||
PruneCheckpoint { block_number: to_block, prune_mode },
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Prune receipts up to the provided block by filtering logs. Works as in inclusion list, and
|
||||
/// removes every receipt not belonging to it.
|
||||
#[instrument(level = "trace", skip(self, provider), target = "pruner")]
|
||||
fn prune_receipts_by_logs(
|
||||
&self,
|
||||
provider: &DatabaseProviderRW<'_, DB>,
|
||||
tip_block_number: BlockNumber,
|
||||
) -> PrunerResult {
|
||||
// Contract log filtering removes every receipt possible except the ones in the list. So,
|
||||
// for the other receipts it's as if they had a `PruneMode::Distance()` of 128.
|
||||
let to_block = PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)
|
||||
.prune_target_block(
|
||||
tip_block_number,
|
||||
MINIMUM_PRUNING_DISTANCE,
|
||||
PrunePart::ContractLogs,
|
||||
)?
|
||||
.map(|(bn, _)| bn)
|
||||
.unwrap_or_default();
|
||||
|
||||
// Figure out what receipts have already been pruned, so we can have an accurate
|
||||
// `address_filter`
|
||||
let pruned = provider
|
||||
.get_prune_checkpoint(PrunePart::ContractLogs)?
|
||||
.map(|checkpoint| checkpoint.block_number);
|
||||
|
||||
let address_filter =
|
||||
self.modes.contract_logs_filter.group_by_block(tip_block_number, pruned)?;
|
||||
|
||||
// Splits all transactions in different block ranges. Each block range will have its own
|
||||
// filter address list and will check it while going through the table
|
||||
//
|
||||
// Example:
|
||||
// For an `address_filter` such as:
|
||||
// { block9: [a1, a2], block20: [a3, a4, a5] }
|
||||
//
|
||||
// The following structures will be created in the exact order as showed:
|
||||
// `block_ranges`: [
|
||||
// (block0, block8, 0 addresses),
|
||||
// (block9, block19, 2 addresses),
|
||||
// (block20, to_block, 5 addresses)
|
||||
// ]
|
||||
// `filtered_addresses`: [a1, a2, a3, a4, a5]
|
||||
//
|
||||
// The first range will delete all receipts between block0 - block8
|
||||
// The second range will delete all receipts between block9 - 19, except the ones with
|
||||
// emitter logs from these addresses: [a1, a2].
|
||||
// The third range will delete all receipts between block20 - to_block, except the ones with
|
||||
// emitter logs from these addresses: [a1, a2, a3, a4, a5]
|
||||
let mut block_ranges = vec![];
|
||||
let mut blocks_iter = address_filter.iter().peekable();
|
||||
let mut filtered_addresses = vec![];
|
||||
|
||||
while let Some((start_block, addresses)) = blocks_iter.next() {
|
||||
filtered_addresses.extend_from_slice(addresses);
|
||||
|
||||
// This will clear all receipts before the first appearance of a contract log
|
||||
if block_ranges.is_empty() {
|
||||
block_ranges.push((0, *start_block - 1, 0));
|
||||
}
|
||||
|
||||
let end_block =
|
||||
blocks_iter.peek().map(|(next_block, _)| *next_block - 1).unwrap_or(to_block);
|
||||
|
||||
// Addresses in lower block ranges, are still included in the inclusion list for future
|
||||
// ranges.
|
||||
block_ranges.push((*start_block, end_block, filtered_addresses.len()));
|
||||
}
|
||||
|
||||
for (start_block, end_block, num_addresses) in block_ranges {
|
||||
let range = match self.get_next_tx_num_range_from_checkpoint(
|
||||
provider,
|
||||
PrunePart::ContractLogs,
|
||||
end_block,
|
||||
)? {
|
||||
Some(range) => range,
|
||||
None => {
|
||||
trace!(
|
||||
target: "pruner",
|
||||
block_range = format!("{start_block}..={end_block}"),
|
||||
"No receipts to prune."
|
||||
);
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
let total = range.clone().count();
|
||||
let mut processed = 0;
|
||||
|
||||
provider.prune_table_with_iterator_in_batches::<tables::Receipts>(
|
||||
range,
|
||||
self.batch_sizes.receipts,
|
||||
|rows| {
|
||||
processed += rows;
|
||||
trace!(
|
||||
target: "pruner",
|
||||
%rows,
|
||||
block_range = format!("{start_block}..={end_block}"),
|
||||
progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64),
|
||||
"Pruned receipts"
|
||||
);
|
||||
},
|
||||
|receipt| {
|
||||
num_addresses > 0 &&
|
||||
receipt.logs.iter().any(|log| {
|
||||
filtered_addresses[..num_addresses].contains(&&log.address)
|
||||
})
|
||||
},
|
||||
)?;
|
||||
|
||||
// If this is the last block range, avoid writing an unused checkpoint
|
||||
if end_block != to_block {
|
||||
// This allows us to query for the transactions in the next block range with
|
||||
// [`get_next_tx_num_range_from_checkpoint`]. It's just a temporary intermediate
|
||||
// checkpoint, which should be adjusted in the end.
|
||||
provider.save_prune_checkpoint(
|
||||
PrunePart::ContractLogs,
|
||||
PruneCheckpoint {
|
||||
block_number: end_block,
|
||||
prune_mode: PruneMode::Before(end_block + 1),
|
||||
},
|
||||
)?;
|
||||
}
|
||||
}
|
||||
|
||||
// If there are contracts using `PruneMode::Distance(_)` there will be receipts before
|
||||
// `to_block` that become eligible to be pruned in future runs. Therefore, our
|
||||
// checkpoint is not actually `to_block`, but the `lowest_block_with_distance` from any
|
||||
// contract. This ensures that in future pruner runs we can
|
||||
// prune all these receipts between the previous `lowest_block_with_distance` and the new
|
||||
// one using `get_next_tx_num_range_from_checkpoint`.
|
||||
let checkpoint_block = self
|
||||
.modes
|
||||
.contract_logs_filter
|
||||
.lowest_block_with_distance(tip_block_number, pruned)?
|
||||
.unwrap_or(to_block);
|
||||
|
||||
provider.save_prune_checkpoint(
|
||||
PrunePart::ContractLogs,
|
||||
PruneCheckpoint {
|
||||
block_number: checkpoint_block - 1,
|
||||
prune_mode: PruneMode::Before(checkpoint_block),
|
||||
},
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@ -662,7 +662,7 @@ impl PostState {
|
||||
let contract_log_pruner = self
|
||||
.prune_modes
|
||||
.contract_logs_filter
|
||||
.group_by_block(tip)
|
||||
.group_by_block(tip, None)
|
||||
.map_err(|e| Error::Custom(e.to_string()))?;
|
||||
|
||||
// Empty implies that there is going to be
|
||||
|
||||
@ -629,27 +629,32 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
|
||||
&self,
|
||||
keys: impl IntoIterator<Item = T::Key>,
|
||||
) -> std::result::Result<usize, DatabaseError> {
|
||||
self.prune_table_with_iterator_in_batches::<T>(keys, usize::MAX, |_| {})
|
||||
self.prune_table_with_iterator_in_batches::<T>(keys, usize::MAX, |_| {}, |_| false)
|
||||
}
|
||||
|
||||
/// Prune the table for the specified pre-sorted key iterator, calling `chunk_callback` after
|
||||
/// every `batch_size` pruned rows with number of total rows pruned.
|
||||
///
|
||||
/// `skip_filter` can be used to skip pruning certain elements.
|
||||
///
|
||||
/// Returns number of rows pruned.
|
||||
pub fn prune_table_with_iterator_in_batches<T: Table>(
|
||||
&self,
|
||||
keys: impl IntoIterator<Item = T::Key>,
|
||||
batch_size: usize,
|
||||
mut batch_callback: impl FnMut(usize),
|
||||
skip_filter: impl Fn(&T::Value) -> bool,
|
||||
) -> std::result::Result<usize, DatabaseError> {
|
||||
let mut cursor = self.tx.cursor_write::<T>()?;
|
||||
let mut deleted = 0;
|
||||
|
||||
for key in keys {
|
||||
if cursor.seek_exact(key)?.is_some() {
|
||||
cursor.delete_current()?;
|
||||
if let Some((_, value)) = cursor.seek_exact(key)? {
|
||||
if !skip_filter(&value) {
|
||||
cursor.delete_current()?;
|
||||
deleted += 1;
|
||||
}
|
||||
}
|
||||
deleted += 1;
|
||||
|
||||
if deleted % batch_size == 0 {
|
||||
batch_callback(deleted);
|
||||
|
||||
Reference in New Issue
Block a user