Revert "chore(stages): reduce the progress logging " (#11698)

Matthias Seitz
2024-10-14 10:29:58 +02:00
committed by GitHub
parent 661b260f61
commit 176496189d
5 changed files with 42 additions and 74 deletions
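
Across all five files the revert removes the wall-clock-throttled `log_progress!` macro (together with its `LOG_INTERVAL` and `Instant` plumbing) and restores the count-based progress logs: each stage derives an `interval` from the collector size and emits an `info!` line whenever `index` crosses a multiple of it. A minimal, self-contained sketch of the restored pattern; `insert_items` is a hypothetical stand-in for the stage loops below, and `println!` stands in for `tracing::info!`:

    // Minimal sketch of the count-based progress logging that this revert restores.
    // `insert_items` is a hypothetical stand-in for the stage loops shown below, and
    // `println!` stands in for `tracing::info!`; only the throttling arithmetic mirrors the diff.
    fn insert_items(items: &[u64]) {
        let total = items.len();
        // One progress line roughly every 10% of the entries.
        let interval = (total / 10).max(1);
        for (index, _item) in items.iter().enumerate() {
            if index > 0 && index % interval == 0 {
                println!(
                    "Inserting hashes: progress = {:.2}%",
                    (index as f64 / total as f64) * 100.0
                );
            }
            // ... write the entry to the database here ...
        }
    }

    fn main() {
        insert_items(&(0..50u64).collect::<Vec<_>>());
    }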

View File

@@ -1,4 +1,3 @@
-use crate::log_progress;
 use alloy_primitives::{keccak256, B256};
 use itertools::Itertools;
 use reth_config::config::{EtlConfig, HashingConfig};
@@ -19,7 +18,6 @@ use std::{
     fmt::Debug,
     ops::{Range, RangeInclusive},
     sync::mpsc::{self, Receiver},
-    time::Instant,
 };
 use tracing::*;
@@ -188,16 +186,16 @@ where
             let mut hashed_account_cursor =
                 tx.cursor_write::<RawTable<tables::HashedAccounts>>()?;
-            let total = collector.len();
-            let mut last_log = Instant::now();
+            let total_hashes = collector.len();
+            let interval = (total_hashes / 10).max(1);
             for (index, item) in collector.iter()?.enumerate() {
-                log_progress!(
-                    "sync::stages::hashing_account",
-                    index,
-                    total,
-                    last_log,
-                    "Inserting hashes"
-                );
+                if index > 0 && index % interval == 0 {
+                    info!(
+                        target: "sync::stages::hashing_account",
+                        progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
+                        "Inserting hashes"
+                    );
+                }
                 let (key, value) = item?;
                 hashed_account_cursor
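
The divisor controls the cadence: `(total_hashes / 10).max(1)` yields about ten progress lines per run, and `.max(1)` keeps the modulus non-zero when the collector holds fewer than ten entries. A small illustrative check of that arithmetic, under the assumption that only the expression itself matters:

    // Sketch of the interval expression used above; the assertions are illustrative.
    fn interval(total: usize) -> usize {
        (total / 10).max(1)
    }

    fn main() {
        // Fewer than 10 entries: without `.max(1)` the interval would be 0 and
        // `index % interval` would panic; with it, small runs just log more often.
        assert_eq!(interval(7), 1);
        // A large run gets on the order of ten progress lines.
        assert_eq!(interval(1_000_000), 100_000);
    }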

View File

@@ -1,4 +1,3 @@
-use crate::log_progress;
 use alloy_primitives::{bytes::BufMut, keccak256, B256};
 use itertools::Itertools;
 use reth_config::config::{EtlConfig, HashingConfig};
@@ -20,7 +19,6 @@ use reth_storage_errors::provider::ProviderResult;
 use std::{
     fmt::Debug,
     sync::mpsc::{self, Receiver},
-    time::Instant,
 };
 use tracing::*;
@@ -119,17 +117,17 @@ where
         collect(&mut channels, &mut collector)?;
-        let total = collector.len();
-        let mut last_log = Instant::now();
+        let total_hashes = collector.len();
+        let interval = (total_hashes / 10).max(1);
         let mut cursor = tx.cursor_dup_write::<tables::HashedStorages>()?;
         for (index, item) in collector.iter()?.enumerate() {
-            log_progress!(
-                "sync::stages::hashing_storage",
-                index,
-                total,
-                last_log,
-                "Inserting hashes"
-            );
+            if index > 0 && index % interval == 0 {
+                info!(
+                    target: "sync::stages::hashing_storage",
+                    progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
+                    "Inserting hashes"
+                );
+            }
             let (addr_key, value) = item?;
             cursor.append_dup(

View File

@@ -1,4 +1,3 @@
-use crate::log_progress;
 use alloy_primitives::{BlockHash, BlockNumber, Bytes, B256};
 use futures_util::StreamExt;
 use reth_config::config::EtlConfig;
@@ -26,7 +25,6 @@ use reth_storage_errors::provider::ProviderError;
 use std::{
     sync::Arc,
     task::{ready, Context, Poll},
-    time::Instant,
 };
 use tokio::sync::watch;
 use tracing::*;
@@ -97,9 +95,9 @@ where
         provider: &impl DBProvider<Tx: DbTxMut>,
         static_file_provider: StaticFileProvider,
     ) -> Result<BlockNumber, StageError> {
-        let total = self.header_collector.len();
+        let total_headers = self.header_collector.len();
-        info!(target: "sync::stages::headers", total, "Writing headers");
+        info!(target: "sync::stages::headers", total = total_headers, "Writing headers");
         // Consistency check of expected headers in static files vs DB is done on provider::sync_gap
         // when poll_execute_ready is polled.
@@ -115,11 +113,13 @@ where
         // Although headers were downloaded in reverse order, the collector iterates it in ascending
         // order
         let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers)?;
-        let mut last_log = Instant::now();
+        let interval = (total_headers / 10).max(1);
         for (index, header) in self.header_collector.iter()?.enumerate() {
            let (_, header_buf) = header?;
-            log_progress!("sync::stages::headers", index, total, last_log, "Writing headers");
+            if index > 0 && index % interval == 0 && total_headers > 100 {
+                info!(target: "sync::stages::headers", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers");
+            }
            let sealed_header: SealedHeader =
                bincode::deserialize::<serde_bincode_compat::SealedHeader<'_>>(&header_buf)
@@ -147,7 +147,7 @@ where
             writer.append_header(&header, td, &header_hash)?;
         }
-        info!(target: "sync::stages::headers", total, "Writing headers hash index");
+        info!(target: "sync::stages::headers", total = total_headers, "Writing headers hash index");
         let mut cursor_header_numbers =
             provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
@@ -168,13 +168,9 @@ where
         for (index, hash_to_number) in self.hash_collector.iter()?.enumerate() {
             let (hash, number) = hash_to_number?;
-            log_progress!(
-                "sync::stages::headers",
-                index,
-                total,
-                last_log,
-                "Writing headers hash index"
-            );
+            if index > 0 && index % interval == 0 && total_headers > 100 {
+                info!(target: "sync::stages::headers", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index");
+            }
             if first_sync {
                 cursor_header_numbers.append(
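
The headers stage adds one more guard than the hashing stages: progress lines are only emitted when `total_headers > 100`, so a short header batch produces just the start and end `info!` lines. A hedged sketch of that condition; `should_log_progress` is a hypothetical helper name, not a function in reth:

    // Sketch of the extra guard in the headers stage: progress lines are suppressed
    // entirely for small batches. `should_log_progress` is a hypothetical helper.
    fn should_log_progress(index: usize, interval: usize, total_headers: usize) -> bool {
        index > 0 && index % interval == 0 && total_headers > 100
    }

    fn main() {
        // Small batch near the chain tip: only the start/end info! lines appear.
        let (total, interval) = (50usize, (50usize / 10).max(1));
        assert!(!(0..total).any(|i| should_log_progress(i, interval, total)));

        // Full sync: a progress line about every 10% of the headers.
        let (total, interval) = (10_000usize, (10_000usize / 10).max(1));
        assert!(should_log_progress(1_000, interval, total));
    }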

View File

@@ -1,4 +1,3 @@
-use super::utils::LOG_INTERVAL;
 use alloy_primitives::{TxHash, TxNumber};
 use num_traits::Zero;
 use reth_config::config::{EtlConfig, TransactionLookupConfig};
@@ -18,7 +17,6 @@ use reth_stages_api::{
     UnwindInput, UnwindOutput,
 };
 use reth_storage_errors::provider::ProviderError;
-use std::time::Instant;
 use tracing::*;
 /// The transaction lookup stage.
@@ -149,18 +147,16 @@ where
             .cursor_write::<tables::RawTable<tables::TransactionHashNumbers>>()?;
         let total_hashes = hash_collector.len();
-        let mut last_log = Instant::now();
+        let interval = (total_hashes / 10).max(1);
         for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
             let (hash, number) = hash_to_number?;
-            let now = Instant::now();
-            if now.duration_since(last_log) >= LOG_INTERVAL {
+            if index > 0 && index % interval == 0 {
                 info!(
                     target: "sync::stages::transaction_lookup",
                     ?append_only,
                     progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                     "Inserting hashes"
                 );
-                last_log = now;
             }
             let key = RawKey::<TxHash>::from_vec(hash);
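
The restored condition in the transaction lookup stage keeps the existing structured fields: `?append_only` records the flag with its `Debug` representation, while `progress = %format!(...)` records the pre-formatted percentage with `Display`. That is standard `tracing` field syntax rather than anything stage-specific; a small standalone example, assuming the `tracing` and `tracing-subscriber` crates for the subscriber setup (which is not part of this diff):

    // Standalone sketch of the `?` (Debug) and `%` (Display) field sigils in the
    // restored info! call; assumes the `tracing` and `tracing-subscriber` crates.
    use tracing::info;

    fn main() {
        tracing_subscriber::fmt().init();

        let append_only = true;
        let (index, total_hashes) = (4_200usize, 10_000usize);

        info!(
            target: "sync::stages::transaction_lookup",
            ?append_only, // recorded with its Debug representation
            progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0), // Display
            "Inserting hashes"
        );
    }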

View File

@@ -12,36 +12,12 @@ use reth_db_api::{
 use reth_etl::Collector;
 use reth_provider::DBProvider;
 use reth_stages_api::StageError;
-use std::{
-    collections::HashMap,
-    hash::Hash,
-    ops::RangeBounds,
-    time::{Duration, Instant},
-};
+use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
 use tracing::info;
 /// Number of blocks before pushing indices from cache to [`Collector`]
 const DEFAULT_CACHE_THRESHOLD: u64 = 100_000;
-/// Log interval for progress.
-pub(crate) const LOG_INTERVAL: Duration = Duration::from_secs(5);
-/// Log progress at a regular interval.
-#[macro_export]
-macro_rules! log_progress {
-    ($target:expr, $index:expr, $total:expr, $last_log:expr, $message:expr) => {
-        let now = std::time::Instant::now();
-        if now.duration_since($last_log) >= $crate::stages::utils::LOG_INTERVAL {
-            info!(
-                target: $target,
-                progress = %format!("{:.2}%", ($index as f64 / $total as f64) * 100.0),
-                $message
-            );
-            $last_log = now;
-        }
-    }
-}
 /// Collects all history (`H`) indices for a range of changesets (`CS`) and stores them in a
 /// [`Collector`].
 ///
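
For contrast, the removed `log_progress!` macro above throttled by wall-clock time rather than by item count: it only logged when at least `LOG_INTERVAL` (5 s) had elapsed since `last_log`, and it advanced the caller's `last_log` binding. Roughly the same behaviour written as a plain function, with `println!` standing in for `tracing::info!`; only `LOG_INTERVAL` and the throttle logic come from the removed code:

    use std::time::{Duration, Instant};

    /// Log interval for progress (mirrors the constant removed above).
    const LOG_INTERVAL: Duration = Duration::from_secs(5);

    // Roughly what the removed macro expanded to, written as a plain function:
    // emit at most one progress line per LOG_INTERVAL, whatever the item count,
    // and advance the caller's `last_log`.
    fn log_progress(target: &str, index: usize, total: usize, last_log: &mut Instant, message: &str) {
        let now = Instant::now();
        if now.duration_since(*last_log) >= LOG_INTERVAL {
            println!(
                "{target}: {message}: progress = {:.2}%",
                (index as f64 / total as f64) * 100.0
            );
            *last_log = now;
        }
    }

    fn main() {
        let total = 1_000_000usize;
        let mut last_log = Instant::now();
        for index in 0..total {
            // A fast loop like this may never reach the 5 s threshold and so
            // may print nothing, which is exactly the throttling behaviour.
            log_progress("sync::stages::index_history", index, total, &mut last_log, "Collecting indices");
        }
    }
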
@@ -89,16 +65,18 @@ where
     };
     // observability
-    let total = provider.tx_ref().entries::<CS>()?;
-    let mut last_log = Instant::now();
+    let total_changesets = provider.tx_ref().entries::<CS>()?;
+    let interval = (total_changesets / 1000).max(1);
     let mut flush_counter = 0;
     let mut current_block_number = u64::MAX;
-    for (index, entry) in changeset_cursor.walk_range(range)?.enumerate() {
+    for (idx, entry) in changeset_cursor.walk_range(range)?.enumerate() {
         let (block_number, key) = partial_key_factory(entry?);
         cache.entry(key).or_default().push(block_number);
-        log_progress!("sync::stages::index_history", index, total, last_log, "Collecting indices");
+        if idx > 0 && idx % interval == 0 && total_changesets > 1000 {
+            info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
+        }
         // Make sure we only flush the cache every DEFAULT_CACHE_THRESHOLD blocks.
         if current_block_number != block_number {
@@ -142,15 +120,17 @@ where
     let mut current_list = Vec::<u64>::new();
     // observability
-    let total = collector.len();
-    let mut last_log = Instant::now();
+    let total_entries = collector.len();
+    let interval = (total_entries / 100).max(1);
     for (index, element) in collector.iter()?.enumerate() {
         let (k, v) = element?;
         let sharded_key = decode_key(k)?;
         let new_list = BlockNumberList::decompress_owned(v)?;
-        log_progress!("sync::stages::index_history", index, total, last_log, "Writing indices");
+        if index > 0 && index % interval == 0 && total_entries > 100 {
+            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
+        }
         // AccountsHistory: `Address`.
         // StorageHistory: `Address.StorageKey`.