feat(stages): bodies & TD progress (#3010)

This commit is contained in:
Alexey Shekhirin
2023-06-07 00:58:51 +04:00
committed by GitHub
parent 0bf360e460
commit 5345d0c1b8
6 changed files with 164 additions and 36 deletions

View File

@ -5,15 +5,16 @@ use reth_db::{
database::Database,
models::{StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals},
tables,
transaction::DbTxMut,
transaction::{DbTx, DbTxMut},
DatabaseError,
};
use reth_interfaces::{
consensus::Consensus,
p2p::bodies::{downloader::BodyDownloader, response::BlockResponse},
};
use reth_primitives::stage::{StageCheckpoint, StageId};
use reth_primitives::stage::{EntitiesCheckpoint, StageCheckpoint, StageId};
use reth_provider::Transaction;
use std::sync::Arc;
use std::{ops::Deref, sync::Arc};
use tracing::*;
// TODO(onbjerg): Metrics and events (gradual status for e.g. CLI)
@ -154,7 +155,11 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// - We reached our target and the target was not limited by the batch size of the stage
let done = highest_block == to_block;
info!(target: "sync::stages::bodies", stage_progress = highest_block, target = to_block, is_final_range = done, "Stage iteration finished");
Ok(ExecOutput { checkpoint: StageCheckpoint::new(highest_block), done })
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(highest_block)
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
done,
})
}
/// Unwind the stage.
@ -207,10 +212,25 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
}
info!(target: "sync::stages::bodies", to_block = input.unwind_to, stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished");
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(input.unwind_to)
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
})
}
}
// TODO(alexey): ideally, we want to measure Bodies stage progress in bytes, but it's hard to know
// beforehand how many bytes we need to download. So the good solution would be to measure the
// progress in gas as a proxy to size. Execution stage uses a similar approach.
fn stage_checkpoint<DB: Database>(
tx: &Transaction<'_, DB>,
) -> Result<EntitiesCheckpoint, DatabaseError> {
Ok(EntitiesCheckpoint {
processed: tx.deref().entries::<tables::BlockBodyIndices>()? as u64,
total: tx.deref().entries::<tables::Headers>()? as u64,
})
}
#[cfg(test)]
mod tests {
use super::*;
@ -219,6 +239,7 @@ mod tests {
PREV_STAGE_ID,
};
use assert_matches::assert_matches;
use reth_primitives::stage::StageUnitCheckpoint;
use test_utils::*;
stage_test_suite_ext!(BodyTestRunner, body);
@ -238,7 +259,8 @@ mod tests {
// Set the batch size (max we sync per stage execution) to less than the number of blocks
// the previous stage synced (10 vs 20)
runner.set_batch_size(10);
let batch_size = 10;
runner.set_batch_size(batch_size);
// Run the stage
let rx = runner.execute(input);
@ -248,7 +270,14 @@ mod tests {
let output = rx.await.unwrap();
assert_matches!(
output,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, ..}, done: false }) if block_number < 200
Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed, // 1 seeded block body + batch size
total // seeded headers
}))
}, done: false }) if block_number < 200 &&
processed == 1 + batch_size && total == previous_stage
);
assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");
}
@ -278,9 +307,15 @@ mod tests {
assert_matches!(
output,
Ok(ExecOutput {
checkpoint: StageCheckpoint { block_number: 20, stage_checkpoint: None },
checkpoint: StageCheckpoint {
block_number: 20,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
},
done: true
})
}) if processed == total && total == previous_stage
);
assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");
}
@ -298,7 +333,8 @@ mod tests {
};
runner.seed_execution(input).expect("failed to seed execution");
runner.set_batch_size(10);
let batch_size = 10;
runner.set_batch_size(batch_size);
// Run the stage
let rx = runner.execute(input);
@ -307,7 +343,14 @@ mod tests {
let first_run = rx.await.unwrap();
assert_matches!(
first_run,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, ..}, done: false }) if block_number >= 10
Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
}, done: false }) if block_number >= 10 &&
processed == 1 + batch_size && total == previous_stage
);
let first_run_checkpoint = first_run.unwrap().checkpoint;
@ -322,7 +365,14 @@ mod tests {
let output = rx.await.unwrap();
assert_matches!(
output,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, ..}, done: true }) if block_number > first_run_checkpoint.block_number
Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
}, done: true }) if block_number > first_run_checkpoint.block_number &&
processed == total && total == previous_stage
);
assert_matches!(
runner.validate_execution(input, output.ok()),
@ -355,7 +405,14 @@ mod tests {
let output = rx.await.unwrap();
assert_matches!(
output,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, ..}, done: true }) if block_number == previous_stage
Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
}, done: true }) if block_number == previous_stage &&
processed == total && total == previous_stage
);
let checkpoint = output.unwrap().checkpoint;
runner
@ -379,7 +436,13 @@ mod tests {
let res = runner.unwind(input).await;
assert_matches!(
res,
Ok(UnwindOutput { checkpoint: StageCheckpoint { block_number: 1, .. } })
Ok(UnwindOutput { checkpoint: StageCheckpoint {
block_number: 1,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed: 1,
total
}))
}}) if total == previous_stage
);
assert_matches!(runner.validate_unwind(input), Ok(_), "unwind validation");

View File

@ -4,14 +4,15 @@ use reth_db::{
database::Database,
tables,
transaction::{DbTx, DbTxMut},
DatabaseError,
};
use reth_interfaces::{consensus::Consensus, provider::ProviderError};
use reth_primitives::{
stage::{StageCheckpoint, StageId},
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
U256,
};
use reth_provider::Transaction;
use std::sync::Arc;
use std::{ops::Deref, sync::Arc};
use tracing::*;
/// The total difficulty stage.
@ -81,8 +82,13 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
.map_err(|error| StageError::Validation { block: header.seal_slow(), error })?;
cursor_td.append(block_number, td.into())?;
}
info!(target: "sync::stages::total_difficulty", stage_progress = end_block, is_final_range, "Stage iteration finished");
Ok(ExecOutput { checkpoint: StageCheckpoint::new(end_block), done: is_final_range })
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(end_block)
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
done: is_final_range,
})
}
/// Unwind the stage.
@ -97,18 +103,31 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
tx.unwind_table_by_num::<tables::HeaderTD>(unwind_to)?;
info!(target: "sync::stages::total_difficulty", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) })
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_to)
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
})
}
}
fn stage_checkpoint<DB: Database>(
tx: &Transaction<'_, DB>,
) -> Result<EntitiesCheckpoint, DatabaseError> {
Ok(EntitiesCheckpoint {
processed: tx.deref().entries::<tables::HeaderTD>()? as u64,
total: tx.deref().entries::<tables::Headers>()? as u64,
})
}
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use reth_db::transaction::DbTx;
use reth_interfaces::test_utils::{
generators::{random_header, random_header_range},
TestConsensus,
};
use reth_primitives::{BlockNumber, SealedHeader};
use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedHeader};
use super::*;
use crate::test_utils::{
@ -137,11 +156,17 @@ mod tests {
// Execute first time
let result = runner.execute(first_input).await.unwrap();
let expected_progress = stage_progress + threshold;
assert!(matches!(
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, ..}, done: false })
if block_number == expected_progress
));
Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
}, done: false }) if block_number == expected_progress && processed == 1 + threshold &&
total == runner.tx.table::<tables::Headers>().unwrap().len() as u64
);
// Execute second time
let second_input = ExecInput {
@ -149,11 +174,17 @@ mod tests {
checkpoint: Some(StageCheckpoint::new(expected_progress)),
};
let result = runner.execute(second_input).await.unwrap();
assert!(matches!(
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, ..}, done: true })
if block_number == previous_stage
));
Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
}, done: true }) if block_number == previous_stage && processed == total &&
total == runner.tx.table::<tables::Headers>().unwrap().len() as u64
);
assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");
}