fix: Add more observability on stdout during ETL stages execution (#7377)

joshieDo
2024-04-04 16:44:48 +02:00
committed by GitHub
parent 2ebbd38127
commit 9c11961488
4 changed files with 30 additions and 10 deletions

View File

@@ -200,7 +200,17 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
         let mut hashed_account_cursor =
             tx.cursor_write::<RawTable<tables::HashedAccounts>>()?;
-        for item in collector.iter()? {
+        let total_hashes = collector.len();
+        let interval = (total_hashes / 10).max(1);
+        for (index, item) in collector.iter()?.enumerate() {
+            if index > 0 && index % interval == 0 {
+                info!(
+                    target: "sync::stages::hashing_account",
+                    progress = format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
+                    "Inserting hashes"
+                );
+            }
             let (key, value) = item?;
             hashed_account_cursor
                 .append(RawKey::<B256>::from_vec(key), RawValue::<Account>::from_vec(value))?;
@@ -262,7 +272,7 @@ fn collect(
             collector.insert(key, v)?;
         }
     }
-    debug!(target: "sync::stages::hashing_account", "Hashed {} entries", collector.len());
+    info!(target: "sync::stages::hashing_account", "Hashed {} entries", collector.len());
    channels.clear();
    Ok(())
}
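Both hashing stages now follow the same throttled-progress pattern: capture the collector's length once, derive an interval of about a tenth of it (clamped with `.max(1)` so a tiny or empty collector never yields a zero modulus), and emit one `info!` line per interval while inserting. A minimal, self-contained sketch of that pattern — the `insert_with_progress` helper is hypothetical, and `println!` stands in for `tracing::info!`:

```rust
// Sketch of the throttled progress logging added above.
// `items` stands in for the sorted ETL collector.
fn insert_with_progress(items: Vec<(u64, u64)>) {
    let total = items.len();
    // Roughly 10 progress lines per run; .max(1) keeps the modulus valid
    // even when total < 10.
    let interval = (total / 10).max(1);
    for (index, (key, value)) in items.into_iter().enumerate() {
        if index > 0 && index % interval == 0 {
            println!(
                "progress={:.2}% Inserting hashes",
                (index as f64 / total as f64) * 100.0
            );
        }
        // ... append (key, value) through the database cursor here ...
        let _ = (key, value);
    }
}

fn main() {
    insert_with_progress((0..100u64).map(|i| (i, i * 2)).collect());
}
```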

View File

@@ -116,8 +116,18 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
         collect(&mut channels, &mut collector)?;
+        let total_hashes = collector.len();
+        let interval = (total_hashes / 10).max(1);
         let mut cursor = tx.cursor_dup_write::<tables::HashedStorages>()?;
-        for item in collector.iter()? {
+        for (index, item) in collector.iter()?.enumerate() {
+            if index > 0 && index % interval == 0 {
+                info!(
+                    target: "sync::stages::hashing_storage",
+                    progress = format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
+                    "Inserting hashes"
+                );
+            }
             let (addr_key, value) = item?;
             cursor.append_dup(
                 B256::from_slice(&addr_key[..32]),
@@ -182,7 +192,7 @@ fn collect(
             collector.insert(key, v)?;
         }
     }
-    debug!(target: "sync::stages::hashing_storage", "Hashed {} entries", collector.len());
+    info!(target: "sync::stages::hashing_storage", "Hashed {} entries", collector.len());
    channels.clear();
    Ok(())
}

View File

@@ -104,7 +104,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
         let mut hash_collector: Collector<TxHash, TxNumber> =
             Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
-        debug!(
+        info!(
             target: "sync::stages::transaction_lookup",
             tx_range = ?input.checkpoint().block_number..=input.target(),
             "Updating transaction lookup"
@@ -116,7 +116,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
         let end_block = *block_range.end();
-        debug!(target: "sync::stages::transaction_lookup", ?tx_range, "Calculating transaction hashes");
+        info!(target: "sync::stages::transaction_lookup", ?tx_range, "Calculating transaction hashes");
         for (key, value) in provider.transaction_hashes_by_range(tx_range)? {
             hash_collector.insert(key, value)?;
@@ -139,7 +139,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
         for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
             let (hash, number) = hash_to_number?;
             if index > 0 && index % interval == 0 {
-                debug!(
+                info!(
                     target: "sync::stages::transaction_lookup",
                     ?append_only,
                     progress = format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
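The `transaction_lookup` changes are pure level promotions: the messages already existed at `debug!`, which an info-level stdout filter drops, so they never appeared at normal verbosity. A sketch of why the promotion matters, assuming a plain `tracing_subscriber` setup with an `info` `EnvFilter` (not reth's actual logging wiring, and requiring the crate's `env-filter` feature):

```rust
use tracing::{debug, info};
use tracing_subscriber::EnvFilter;

fn main() {
    // With an info-level filter, debug! events are discarded before
    // they are ever formatted or written to stdout.
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::new("info"))
        .init();

    debug!(target: "sync::stages::transaction_lookup", "dropped at the default level");
    info!(target: "sync::stages::transaction_lookup", "printed to stdout");
}
```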

View File

@@ -64,7 +64,7 @@ where
    // observability
    let total_changesets = tx.entries::<CS>()?;
-   let interval = (total_changesets / 100).max(1);
+   let interval = (total_changesets / 1000).max(1);
    let mut flush_counter = 0;
    let mut current_block_number = u64::MAX;
@@ -72,8 +72,8 @@ where
        let (block_number, key) = partial_key_factory(entry?);
        cache.entry(key).or_default().push(block_number);
-       if idx > 0 && idx % interval == 0 && total_changesets > 100 {
-           info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
+       if idx > 0 && idx % interval == 0 && total_changesets > 1000 {
+           info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
        }
        // Make sure we only flush the cache every DEFAULT_CACHE_THRESHOLD blocks.
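The history-indexing change tunes an existing progress line rather than adding one. Dividing by 1000 instead of 100 makes logging roughly ten times more frequent: for 2,000,000 changeset entries the interval drops from 20,000 (about one line per 1% of progress) to 2,000 (about one per 0.1%), which is presumably why the displayed precision widens from `{:.2}` to `{:.4}`. Raising the guard from `total_changesets > 100` to `> 1000` keeps small tables quiet: below that threshold, `(total_changesets / 1000).max(1)` clamps to an interval of 1 and would otherwise log on every entry.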