mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
chore(stages): reduce the progress logging (#11653)
Signed-off-by: jsvisa <delweng@gmail.com>
This commit is contained in:
@ -1,3 +1,4 @@
|
||||
use crate::log_progress;
|
||||
use alloy_primitives::{keccak256, B256};
|
||||
use itertools::Itertools;
|
||||
use reth_config::config::{EtlConfig, HashingConfig};
|
||||
@ -18,6 +19,7 @@ use std::{
|
||||
fmt::Debug,
|
||||
ops::{Range, RangeInclusive},
|
||||
sync::mpsc::{self, Receiver},
|
||||
time::Instant,
|
||||
};
|
||||
use tracing::*;
|
||||
|
||||
@ -186,16 +188,16 @@ where
|
||||
let mut hashed_account_cursor =
|
||||
tx.cursor_write::<RawTable<tables::HashedAccounts>>()?;
|
||||
|
||||
let total_hashes = collector.len();
|
||||
let interval = (total_hashes / 10).max(1);
|
||||
let total = collector.len();
|
||||
let mut last_log = Instant::now();
|
||||
for (index, item) in collector.iter()?.enumerate() {
|
||||
if index > 0 && index % interval == 0 {
|
||||
info!(
|
||||
target: "sync::stages::hashing_account",
|
||||
progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
|
||||
"Inserting hashes"
|
||||
);
|
||||
}
|
||||
log_progress!(
|
||||
"sync::stages::hashing_account",
|
||||
index,
|
||||
total,
|
||||
last_log,
|
||||
"Inserting hashes"
|
||||
);
|
||||
|
||||
let (key, value) = item?;
|
||||
hashed_account_cursor
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
use crate::log_progress;
|
||||
use alloy_primitives::{bytes::BufMut, keccak256, B256};
|
||||
use itertools::Itertools;
|
||||
use reth_config::config::{EtlConfig, HashingConfig};
|
||||
@ -19,6 +20,7 @@ use reth_storage_errors::provider::ProviderResult;
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
sync::mpsc::{self, Receiver},
|
||||
time::Instant,
|
||||
};
|
||||
use tracing::*;
|
||||
|
||||
@ -117,17 +119,17 @@ where
|
||||
|
||||
collect(&mut channels, &mut collector)?;
|
||||
|
||||
let total_hashes = collector.len();
|
||||
let interval = (total_hashes / 10).max(1);
|
||||
let total = collector.len();
|
||||
let mut last_log = Instant::now();
|
||||
let mut cursor = tx.cursor_dup_write::<tables::HashedStorages>()?;
|
||||
for (index, item) in collector.iter()?.enumerate() {
|
||||
if index > 0 && index % interval == 0 {
|
||||
info!(
|
||||
target: "sync::stages::hashing_storage",
|
||||
progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
|
||||
"Inserting hashes"
|
||||
);
|
||||
}
|
||||
log_progress!(
|
||||
"sync::stages::hashing_storage",
|
||||
index,
|
||||
total,
|
||||
last_log,
|
||||
"Inserting hashes"
|
||||
);
|
||||
|
||||
let (addr_key, value) = item?;
|
||||
cursor.append_dup(
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
use crate::log_progress;
|
||||
use alloy_primitives::{BlockHash, BlockNumber, Bytes, B256};
|
||||
use futures_util::StreamExt;
|
||||
use reth_config::config::EtlConfig;
|
||||
@ -25,6 +26,7 @@ use reth_storage_errors::provider::ProviderError;
|
||||
use std::{
|
||||
sync::Arc,
|
||||
task::{ready, Context, Poll},
|
||||
time::Instant,
|
||||
};
|
||||
use tokio::sync::watch;
|
||||
use tracing::*;
|
||||
@ -95,9 +97,9 @@ where
|
||||
provider: &impl DBProvider<Tx: DbTxMut>,
|
||||
static_file_provider: StaticFileProvider,
|
||||
) -> Result<BlockNumber, StageError> {
|
||||
let total_headers = self.header_collector.len();
|
||||
let total = self.header_collector.len();
|
||||
|
||||
info!(target: "sync::stages::headers", total = total_headers, "Writing headers");
|
||||
info!(target: "sync::stages::headers", total, "Writing headers");
|
||||
|
||||
// Consistency check of expected headers in static files vs DB is done on provider::sync_gap
|
||||
// when poll_execute_ready is polled.
|
||||
@ -113,13 +115,11 @@ where
|
||||
// Although headers were downloaded in reverse order, the collector iterates it in ascending
|
||||
// order
|
||||
let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers)?;
|
||||
let interval = (total_headers / 10).max(1);
|
||||
let mut last_log = Instant::now();
|
||||
for (index, header) in self.header_collector.iter()?.enumerate() {
|
||||
let (_, header_buf) = header?;
|
||||
|
||||
if index > 0 && index % interval == 0 && total_headers > 100 {
|
||||
info!(target: "sync::stages::headers", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers");
|
||||
}
|
||||
log_progress!("sync::stages::headers", index, total, last_log, "Writing headers");
|
||||
|
||||
let sealed_header: SealedHeader =
|
||||
bincode::deserialize::<serde_bincode_compat::SealedHeader<'_>>(&header_buf)
|
||||
@ -147,7 +147,7 @@ where
|
||||
writer.append_header(&header, td, &header_hash)?;
|
||||
}
|
||||
|
||||
info!(target: "sync::stages::headers", total = total_headers, "Writing headers hash index");
|
||||
info!(target: "sync::stages::headers", total, "Writing headers hash index");
|
||||
|
||||
let mut cursor_header_numbers =
|
||||
provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
|
||||
@ -168,9 +168,13 @@ where
|
||||
for (index, hash_to_number) in self.hash_collector.iter()?.enumerate() {
|
||||
let (hash, number) = hash_to_number?;
|
||||
|
||||
if index > 0 && index % interval == 0 && total_headers > 100 {
|
||||
info!(target: "sync::stages::headers", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index");
|
||||
}
|
||||
log_progress!(
|
||||
"sync::stages::headers",
|
||||
index,
|
||||
total,
|
||||
last_log,
|
||||
"Writing headers hash index"
|
||||
);
|
||||
|
||||
if first_sync {
|
||||
cursor_header_numbers.append(
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
use super::utils::LOG_INTERVAL;
|
||||
use alloy_primitives::{TxHash, TxNumber};
|
||||
use num_traits::Zero;
|
||||
use reth_config::config::{EtlConfig, TransactionLookupConfig};
|
||||
@ -17,6 +18,7 @@ use reth_stages_api::{
|
||||
UnwindInput, UnwindOutput,
|
||||
};
|
||||
use reth_storage_errors::provider::ProviderError;
|
||||
use std::time::Instant;
|
||||
use tracing::*;
|
||||
|
||||
/// The transaction lookup stage.
|
||||
@ -147,16 +149,18 @@ where
|
||||
.cursor_write::<tables::RawTable<tables::TransactionHashNumbers>>()?;
|
||||
|
||||
let total_hashes = hash_collector.len();
|
||||
let interval = (total_hashes / 10).max(1);
|
||||
let mut last_log = Instant::now();
|
||||
for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
|
||||
let (hash, number) = hash_to_number?;
|
||||
if index > 0 && index % interval == 0 {
|
||||
let now = Instant::now();
|
||||
if now.duration_since(last_log) >= LOG_INTERVAL {
|
||||
info!(
|
||||
target: "sync::stages::transaction_lookup",
|
||||
?append_only,
|
||||
progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
|
||||
"Inserting hashes"
|
||||
);
|
||||
last_log = now;
|
||||
}
|
||||
|
||||
let key = RawKey::<TxHash>::from_vec(hash);
|
||||
|
||||
@ -12,12 +12,36 @@ use reth_db_api::{
|
||||
use reth_etl::Collector;
|
||||
use reth_provider::DBProvider;
|
||||
use reth_stages_api::StageError;
|
||||
use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
hash::Hash,
|
||||
ops::RangeBounds,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tracing::info;
|
||||
|
||||
/// Number of blocks before pushing indices from cache to [`Collector`]
|
||||
const DEFAULT_CACHE_THRESHOLD: u64 = 100_000;
|
||||
|
||||
/// Log interval for progress.
|
||||
pub(crate) const LOG_INTERVAL: Duration = Duration::from_secs(5);
|
||||
|
||||
/// Log progress at a regular interval.
|
||||
#[macro_export]
|
||||
macro_rules! log_progress {
|
||||
($target:expr, $index:expr, $total:expr, $last_log:expr, $message:expr) => {
|
||||
let now = std::time::Instant::now();
|
||||
if now.duration_since($last_log) >= $crate::stages::utils::LOG_INTERVAL {
|
||||
info!(
|
||||
target: $target,
|
||||
progress = %format!("{:.2}%", ($index as f64 / $total as f64) * 100.0),
|
||||
$message
|
||||
);
|
||||
$last_log = now;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Collects all history (`H`) indices for a range of changesets (`CS`) and stores them in a
|
||||
/// [`Collector`].
|
||||
///
|
||||
@ -65,18 +89,16 @@ where
|
||||
};
|
||||
|
||||
// observability
|
||||
let total_changesets = provider.tx_ref().entries::<CS>()?;
|
||||
let interval = (total_changesets / 1000).max(1);
|
||||
let total = provider.tx_ref().entries::<CS>()?;
|
||||
let mut last_log = Instant::now();
|
||||
|
||||
let mut flush_counter = 0;
|
||||
let mut current_block_number = u64::MAX;
|
||||
for (idx, entry) in changeset_cursor.walk_range(range)?.enumerate() {
|
||||
for (index, entry) in changeset_cursor.walk_range(range)?.enumerate() {
|
||||
let (block_number, key) = partial_key_factory(entry?);
|
||||
cache.entry(key).or_default().push(block_number);
|
||||
|
||||
if idx > 0 && idx % interval == 0 && total_changesets > 1000 {
|
||||
info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
|
||||
}
|
||||
log_progress!("sync::stages::index_history", index, total, last_log, "Collecting indices");
|
||||
|
||||
// Make sure we only flush the cache every DEFAULT_CACHE_THRESHOLD blocks.
|
||||
if current_block_number != block_number {
|
||||
@ -120,17 +142,15 @@ where
|
||||
let mut current_list = Vec::<u64>::new();
|
||||
|
||||
// observability
|
||||
let total_entries = collector.len();
|
||||
let interval = (total_entries / 100).max(1);
|
||||
let total = collector.len();
|
||||
let mut last_log = Instant::now();
|
||||
|
||||
for (index, element) in collector.iter()?.enumerate() {
|
||||
let (k, v) = element?;
|
||||
let sharded_key = decode_key(k)?;
|
||||
let new_list = BlockNumberList::decompress_owned(v)?;
|
||||
|
||||
if index > 0 && index % interval == 0 && total_entries > 100 {
|
||||
info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
|
||||
}
|
||||
log_progress!("sync::stages::index_history", index, total, last_log, "Writing indices");
|
||||
|
||||
// AccountsHistory: `Address`.
|
||||
// StorageHistory: `Address.StorageKey`.
|
||||
|
||||
Reference in New Issue
Block a user