Mirror of https://github.com/hl-archive-node/nanoreth.git (synced 2025-12-06 10:59:55 +00:00)
test(sync): headers stage test runner (#163)

* refactor headers stage tests
* inline unwind util functions
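The refactor replaces the per-test `execute_stage` / `execute_stage_linear` / `unwind_stage` helpers (deleted below) with a shared `StageTestRunner` trait implemented by a `HeadersTestRunner`. Only the trait's required items are visible in the hunks, so the following is a reconstructed sketch, not the verbatim definition; the `execute`/`unwind` provided methods are an assumption inferred from calls like `runner.execute(ExecInput::default())` followed by `rx.await`:

// Sketch of util::test_utils::StageTestRunner, reconstructed from usage in this diff.
pub(crate) trait StageTestRunner {
    /// The stage under test (HeaderStage<..> for the tests in this file).
    type S;

    /// Handle to the shared test database.
    fn db(&self) -> &StageTestDB;

    /// Build a fresh instance of the stage under test.
    fn stage(&self) -> Self::S;

    // Assumed provided methods (not shown in the hunks): execute(input) and
    // unwind(input) presumably spawn stage() against db() on a tokio task and
    // hand the stage result back on a oneshot::Receiver, mirroring the
    // deleted helper functions further down in this diff.
}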
@@ -1,12 +1,13 @@
 use crate::{
+    util::unwind::{unwind_table_by_num, unwind_table_by_num_hash},
     DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput,
     UnwindOutput,
 };
 use reth_interfaces::{
     consensus::{Consensus, ForkchoiceState},
     db::{
-        self, models::blocks::BlockNumHash, tables, DBContainer, Database, DatabaseGAT, DbCursorRO,
-        DbCursorRW, DbTx, DbTxMut, Table,
+        models::blocks::BlockNumHash, tables, DBContainer, Database, DatabaseGAT, DbCursorRO,
+        DbCursorRW, DbTx, DbTxMut,
     },
     p2p::headers::{
         client::HeadersClient,
@@ -17,7 +18,7 @@ use reth_primitives::{rpc::BigEndianHash, BlockNumber, HeaderLocked, H256, U256}
 use std::{fmt::Debug, sync::Arc};
 use tracing::*;

-const HEADERS: StageId = StageId("HEADERS");
+const HEADERS: StageId = StageId("Headers");

 /// The headers stage implementation for staged sync
 #[derive(Debug)]
@@ -47,20 +48,17 @@ impl<DB: Database, D: Downloader, C: Consensus, H: HeadersClient> Stage<DB>
         input: ExecInput,
     ) -> Result<ExecOutput, StageError> {
         let tx = db.get_mut();
-        let last_block_num =
-            input.previous_stage.as_ref().map(|(_, block)| *block).unwrap_or_default();
+        let last_block_num = input.stage_progress.unwrap_or_default();
         self.update_head::<DB>(tx, last_block_num).await?;

         // download the headers
         // TODO: handle input.max_block
-        let last_hash =
-            tx.get::<tables::CanonicalHeaders>(last_block_num)?.ok_or_else(|| -> StageError {
-                DatabaseIntegrityError::CannonicalHash { number: last_block_num }.into()
-            })?;
-        let last_header = tx
-            .get::<tables::Headers>((last_block_num, last_hash).into())?
-            .ok_or_else(|| -> StageError {
-                DatabaseIntegrityError::Header { number: last_block_num, hash: last_hash }.into()
+        let last_hash = tx
+            .get::<tables::CanonicalHeaders>(last_block_num)?
+            .ok_or(DatabaseIntegrityError::CannonicalHash { number: last_block_num })?;
+        let last_header =
+            tx.get::<tables::Headers>((last_block_num, last_hash).into())?.ok_or({
+                DatabaseIntegrityError::Header { number: last_block_num, hash: last_hash }
             })?;
         let head = HeaderLocked::new(last_header, last_hash);

@@ -105,11 +103,11 @@ impl<DB: Database, D: Downloader, C: Consensus, H: HeadersClient> Stage<DB>
         input: UnwindInput,
     ) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
         // TODO: handle bad block
-        let tx = &mut db.get_mut();
-        self.unwind_table::<DB, tables::CanonicalHeaders, _>(tx, input.unwind_to, |num| num)?;
-        self.unwind_table::<DB, tables::HeaderNumbers, _>(tx, input.unwind_to, |key| key.0 .0)?;
-        self.unwind_table::<DB, tables::Headers, _>(tx, input.unwind_to, |key| key.0 .0)?;
-        self.unwind_table::<DB, tables::HeaderTD, _>(tx, input.unwind_to, |key| key.0 .0)?;
+        let tx = db.get_mut();
+        unwind_table_by_num::<DB, tables::CanonicalHeaders>(tx, input.unwind_to)?;
+        unwind_table_by_num_hash::<DB, tables::HeaderNumbers>(tx, input.unwind_to)?;
+        unwind_table_by_num_hash::<DB, tables::Headers>(tx, input.unwind_to)?;
+        unwind_table_by_num_hash::<DB, tables::HeaderTD>(tx, input.unwind_to)?;
         Ok(UnwindOutput { stage_progress: input.unwind_to })
     }
 }
@@ -120,9 +118,9 @@ impl<D: Downloader, C: Consensus, H: HeadersClient> HeaderStage<D, C, H> {
         tx: &mut <DB as DatabaseGAT<'_>>::TXMut,
         height: BlockNumber,
     ) -> Result<(), StageError> {
-        let hash = tx.get::<tables::CanonicalHeaders>(height)?.ok_or_else(|| -> StageError {
-            DatabaseIntegrityError::CannonicalHeader { number: height }.into()
-        })?;
+        let hash = tx
+            .get::<tables::CanonicalHeaders>(height)?
+            .ok_or(DatabaseIntegrityError::CannonicalHeader { number: height })?;
         let td: Vec<u8> = tx.get::<tables::HeaderTD>((height, hash).into())?.unwrap(); // TODO:
         self.client.update_status(height, hash, H256::from_slice(&td)).await;
         Ok(())
@@ -168,63 +166,30 @@ impl<D: Downloader, C: Consensus, H: HeadersClient> HeaderStage<D, C, H> {
             // TODO: investigate default write flags
             cursor_header_number.append(key, header.number)?;
             cursor_header.append(key, header)?;
-            cursor_canonical.append(key.0 .0, key.0 .1)?;
+            cursor_canonical.append(key.number(), key.hash())?;
             cursor_td.append(key, H256::from_uint(&td).as_bytes().to_vec())?;
         }

         Ok(latest)
     }

-    /// Unwind the table to a provided block
-    fn unwind_table<DB, T, F>(
-        &self,
-        tx: &mut <DB as DatabaseGAT<'_>>::TXMut,
-        block: BlockNumber,
-        mut selector: F,
-    ) -> Result<(), db::Error>
-    where
-        DB: Database,
-        T: Table,
-        F: FnMut(T::Key) -> BlockNumber,
-    {
-        let mut cursor = tx.cursor_mut::<T>()?;
-        let mut entry = cursor.last()?;
-        while let Some((key, _)) = entry {
-            if selector(key) <= block {
-                break
-            }
-            cursor.delete_current()?;
-            entry = cursor.prev()?;
-        }
-        Ok(())
-    }
 }

 #[cfg(test)]
-pub mod tests {
+mod tests {
     use super::*;
+    use crate::util::test_utils::StageTestRunner;
     use assert_matches::assert_matches;
-    use reth_db::{kv::Env, mdbx::WriteMap};
-    use reth_headers_downloaders::linear::LinearDownloadBuilder;
-    use reth_interfaces::{
-        db::DBContainer,
-        test_utils::{
-            gen_random_header, gen_random_header_range, TestConsensus, TestHeadersClient,
-        },
-    };
-    use std::{borrow::Borrow, sync::Arc};
-    use test_utils::HeadersDB;
-    use tokio::sync::oneshot;
+    use reth_interfaces::test_utils::{gen_random_header, gen_random_header_range};
+    use test_utils::{HeadersTestRunner, TestDownloader};

-    const TEST_STAGE: StageId = StageId("HEADERS");
+    const TEST_STAGE: StageId = StageId("Headers");

     #[tokio::test]
     // Check that the execution errors on empty database or
     // prev progress missing from the database.
-    async fn headers_execute_empty_db() {
-        let db = HeadersDB::default();
-        let input = ExecInput { previous_stage: None, stage_progress: None };
-        let rx = execute_stage(db.inner(), input, H256::zero(), Ok(vec![]));
+    async fn execute_empty_db() {
+        let runner = HeadersTestRunner::default();
+        let rx = runner.execute(ExecInput::default());
         assert_matches!(
             rx.await.unwrap(),
             Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CannonicalHeader { .. }))
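The rewritten tests below construct the stage with `TestDownloader::new(..)`, a canned downloader whose definition falls outside the hunks shown. A minimal sketch reconstructed from usage; the `Downloader` trait items are deliberately elided since their signatures do not appear in this diff:

// Assumed shape of the tests' TestDownloader: it stores one preset result
// and resolves every download attempt with a clone of it.
#[derive(Debug)]
pub(crate) struct TestDownloader {
    result: Result<Vec<HeaderLocked>, DownloadError>,
}

impl TestDownloader {
    pub(crate) fn new(result: Result<Vec<HeaderLocked>, DownloadError>) -> Self {
        Self { result }
    }
}

// The #[async_trait] Downloader impl would then return self.result.clone()
// regardless of the requested head/tip (assumption; not shown in this diff).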
@@ -233,301 +198,271 @@ pub mod tests {

     #[tokio::test]
     // Check that the execution exits on downloader timeout.
-    async fn headers_execute_timeout() {
+    async fn execute_timeout() {
         let head = gen_random_header(0, None);
-        let db = HeadersDB::default();
-        db.insert_header(&head).expect("failed to insert header");
-
-        let input = ExecInput { previous_stage: None, stage_progress: None };
-        let rx = execute_stage(
-            db.inner(),
-            input,
-            H256::from_low_u64_be(1),
-            Err(DownloadError::Timeout { request_id: 0 }),
-        );
+        let runner =
+            HeadersTestRunner::with_downloader(TestDownloader::new(Err(DownloadError::Timeout {
+                request_id: 0,
+            })));
+        runner.insert_header(&head).expect("failed to insert header");

+        let rx = runner.execute(ExecInput::default());
+        runner.consensus.update_tip(H256::from_low_u64_be(1));
         assert_matches!(rx.await.unwrap(), Ok(ExecOutput { done, .. }) if !done);
     }

     #[tokio::test]
     // Check that validation error is propagated during the execution.
-    async fn headers_execute_validation_error() {
+    async fn execute_validation_error() {
         let head = gen_random_header(0, None);
-        let db = HeadersDB::default();
-        db.insert_header(&head).expect("failed to insert header");
-
-        let input = ExecInput { previous_stage: None, stage_progress: None };
-        let rx = execute_stage(
-            db.inner(),
-            input,
-            H256::from_low_u64_be(1),
-            Err(DownloadError::HeaderValidation { hash: H256::zero(), details: "".to_owned() }),
-        );
+        let runner = HeadersTestRunner::with_downloader(TestDownloader::new(Err(
+            DownloadError::HeaderValidation { hash: H256::zero(), details: "".to_owned() },
+        )));
+        runner.insert_header(&head).expect("failed to insert header");

+        let rx = runner.execute(ExecInput::default());
+        runner.consensus.update_tip(H256::from_low_u64_be(1));
         assert_matches!(rx.await.unwrap(), Err(StageError::Validation { block }) if block == 0);
     }

     #[tokio::test]
     // Validate that all necessary tables are updated after the
     // header download on no previous progress.
-    async fn headers_execute_no_progress() {
+    async fn execute_no_progress() {
         let (start, end) = (0, 100);
         let head = gen_random_header(start, None);
         let headers = gen_random_header_range(start + 1..end, head.hash());
-        let db = HeadersDB::default();
-        db.insert_header(&head).expect("failed to insert header");

-        let result: Vec<_> = headers.iter().rev().cloned().collect();
-        let input = ExecInput { previous_stage: None, stage_progress: None };
+        let result = headers.iter().rev().cloned().collect::<Vec<_>>();
+        let runner = HeadersTestRunner::with_downloader(TestDownloader::new(Ok(result)));
+        runner.insert_header(&head).expect("failed to insert header");
+
+        let rx = runner.execute(ExecInput::default());
         let tip = headers.last().unwrap();
-        let rx = execute_stage(db.inner(), input, tip.hash(), Ok(result));
+        runner.consensus.update_tip(tip.hash());

-        let result = rx.await.unwrap();
-        assert_matches!(result, Ok(ExecOutput { .. }));
-        let result = result.unwrap();
-        assert!(result.done && result.reached_tip);
-        assert_eq!(result.stage_progress, tip.number);
-        for header in headers {
-            assert!(db.validate_db_header(&header).is_ok());
-        }
+        assert_matches!(
+            rx.await.unwrap(),
+            Ok(ExecOutput { done, reached_tip, stage_progress })
+                if done && reached_tip && stage_progress == tip.number
+        );
+        assert!(headers.iter().try_for_each(|h| runner.validate_db_header(&h)).is_ok());
     }

     #[tokio::test]
     // Validate that all necessary tables are updated after the
     // header download with some previous progress.
-    async fn headers_stage_prev_progress() {
+    async fn execute_prev_progress() {
         let (start, end) = (10000, 10241);
         let head = gen_random_header(start, None);
         let headers = gen_random_header_range(start + 1..end, head.hash());
-        let db = HeadersDB::default();
-        db.insert_header(&head).expect("failed to insert header");

-        let result: Vec<_> = headers.iter().rev().cloned().collect();
-        let input = ExecInput {
+        let result = headers.iter().rev().cloned().collect::<Vec<_>>();
+        let runner = HeadersTestRunner::with_downloader(TestDownloader::new(Ok(result)));
+        runner.insert_header(&head).expect("failed to insert header");
+
+        let rx = runner.execute(ExecInput {
             previous_stage: Some((TEST_STAGE, head.number)),
             stage_progress: Some(head.number),
-        };
+        });
         let tip = headers.last().unwrap();
-        let rx = execute_stage(db.inner(), input, tip.hash(), Ok(result));
+        runner.consensus.update_tip(tip.hash());

-        let result = rx.await.unwrap();
-        assert_matches!(result, Ok(ExecOutput { .. }));
-        let result = result.unwrap();
-        assert!(result.done && result.reached_tip);
-        assert_eq!(result.stage_progress, tip.number);
-        for header in headers {
-            assert!(db.validate_db_header(&header).is_ok());
-        }
+        assert_matches!(
+            rx.await.unwrap(),
+            Ok(ExecOutput { done, reached_tip, stage_progress })
+                if done && reached_tip && stage_progress == tip.number
+        );
+        assert!(headers.iter().try_for_each(|h| runner.validate_db_header(&h)).is_ok());
     }

     #[tokio::test]
     // Execute the stage with linear downloader
-    async fn headers_execute_linear() {
-        // TODO: set bigger range once `MDBX_EKEYMISMATCH` issue is resolved
-        let (start, end) = (1000, 1024);
+    async fn execute_with_linear_downloader() {
+        let (start, end) = (1000, 1200);
         let head = gen_random_header(start, None);
         let headers = gen_random_header_range(start + 1..end, head.hash());
-        let db = HeadersDB::default();
-        db.insert_header(&head).expect("failed to insert header");

-        let input = ExecInput {
+        let runner = HeadersTestRunner::with_linear_downloader();
+        runner.insert_header(&head).expect("failed to insert header");
+        let rx = runner.execute(ExecInput {
             previous_stage: Some((TEST_STAGE, head.number)),
             stage_progress: Some(head.number),
-        };
+        });

         let tip = headers.last().unwrap();
+        runner.consensus.update_tip(tip.hash());

         let mut download_result = headers.clone();
         download_result.insert(0, head);
-        let rx = execute_stage_linear(db.inner(), input, tip.hash(), download_result).await;
+        runner
+            .client
+            .on_header_request(1, |id, _| {
+                runner.client.send_header_response(
+                    id,
+                    download_result.clone().into_iter().map(|h| h.unlock()).collect(),
+                )
+            })
+            .await;

-        let result = rx.await.unwrap();
-        assert_matches!(result, Ok(ExecOutput { .. }));
-        let result = result.unwrap();
-        assert!(result.done && result.reached_tip);
-        assert_eq!(result.stage_progress, tip.number);
-        for header in headers {
-            assert!(db.validate_db_header(&header).is_ok());
-        }
+        assert_matches!(
+            rx.await.unwrap(),
+            Ok(ExecOutput { done, reached_tip, stage_progress })
+                if done && reached_tip && stage_progress == tip.number
+        );
+        assert!(headers.iter().try_for_each(|h| runner.validate_db_header(&h)).is_ok());
     }

     #[tokio::test]
     // Check that unwind does not panic on empty database.
-    async fn headers_unwind_empty_db() {
-        let db = HeadersDB::default();
-        let input = UnwindInput { bad_block: None, stage_progress: 100, unwind_to: 100 };
-        let rx = unwind_stage(db.inner(), input);
+    async fn unwind_empty_db() {
+        let unwind_to = 100;
+        let runner = HeadersTestRunner::default();
+        let rx =
+            runner.unwind(UnwindInput { bad_block: None, stage_progress: unwind_to, unwind_to });
         assert_matches!(
             rx.await.unwrap(),
-            Ok(UnwindOutput {stage_progress} ) if stage_progress == input.unwind_to
+            Ok(UnwindOutput {stage_progress} ) if stage_progress == unwind_to
         );
     }

     #[tokio::test]
     // Check that unwind can remove headers across gaps
-    async fn headers_unwind_db_gaps() {
+    async fn unwind_db_gaps() {
+        let runner = HeadersTestRunner::default();
         let head = gen_random_header(0, None);
         let first_range = gen_random_header_range(1..20, head.hash());
         let second_range = gen_random_header_range(50..100, H256::zero());
-        let db = HeadersDB::default();
-        db.insert_header(&head).expect("failed to insert header");
-        for header in first_range.iter().chain(second_range.iter()) {
-            db.insert_header(header).expect("failed to insert header");
-        }
+        runner.insert_header(&head).expect("failed to insert header");
+        runner
+            .insert_headers(first_range.iter().chain(second_range.iter()))
+            .expect("failed to insert headers");

-        let input = UnwindInput { bad_block: None, stage_progress: 15, unwind_to: 15 };
-        let rx = unwind_stage(db.inner(), input);
+        let unwind_to = 15;
+        let rx =
+            runner.unwind(UnwindInput { bad_block: None, stage_progress: unwind_to, unwind_to });
         assert_matches!(
             rx.await.unwrap(),
-            Ok(UnwindOutput {stage_progress} ) if stage_progress == input.unwind_to
+            Ok(UnwindOutput {stage_progress} ) if stage_progress == unwind_to
        );

-        db.check_no_entry_above::<tables::CanonicalHeaders, _>(input.unwind_to, |key| key)
+        runner
+            .db()
+            .check_no_entry_above::<tables::CanonicalHeaders, _>(unwind_to, |key| key)
             .expect("failed to check cannonical headers");
-        db.check_no_entry_above::<tables::HeaderNumbers, _>(input.unwind_to, |key| key.0 .0)
+        runner
+            .db()
+            .check_no_entry_above::<tables::HeaderNumbers, _>(unwind_to, |key| key.number())
             .expect("failed to check header numbers");
-        db.check_no_entry_above::<tables::Headers, _>(input.unwind_to, |key| key.0 .0)
+        runner
+            .db()
+            .check_no_entry_above::<tables::Headers, _>(unwind_to, |key| key.number())
             .expect("failed to check headers");
-        db.check_no_entry_above::<tables::HeaderTD, _>(input.unwind_to, |key| key.0 .0)
+        runner
+            .db()
+            .check_no_entry_above::<tables::HeaderTD, _>(unwind_to, |key| key.number())
             .expect("failed to check td");
     }

-    // A helper function to run [HeaderStage::execute]
-    // with default consensus, client & test downloader
-    fn execute_stage(
-        db: Arc<Env<WriteMap>>,
-        input: ExecInput,
-        tip: H256,
-        download_result: Result<Vec<HeaderLocked>, DownloadError>,
-    ) -> oneshot::Receiver<Result<ExecOutput, StageError>> {
-        let (tx, rx) = oneshot::channel();
-
-        let client = Arc::new(TestHeadersClient::default());
-        let consensus = Arc::new(TestConsensus::default());
-        let downloader = test_utils::TestDownloader::new(download_result);
-
-        let mut stage = HeaderStage { consensus: consensus.clone(), client, downloader };
-        tokio::spawn(async move {
-            let mut db = DBContainer::<Env<WriteMap>>::new(db.borrow()).unwrap();
-            let result = stage.execute(&mut db, input).await;
-            db.commit().expect("failed to commit");
-            tx.send(result).expect("failed to send result");
-        });
-        consensus.update_tip(tip);
-        rx
-    }
-
-    // A helper function to run [HeaderStage::execute]
-    // with linear downloader
-    async fn execute_stage_linear(
-        db: Arc<Env<WriteMap>>,
-        input: ExecInput,
-        tip: H256,
-        headers: Vec<HeaderLocked>,
-    ) -> oneshot::Receiver<Result<ExecOutput, StageError>> {
-        let (tx, rx) = oneshot::channel();
-
-        let consensus = Arc::new(TestConsensus::default());
-        let client = Arc::new(TestHeadersClient::default());
-        let downloader = LinearDownloadBuilder::new().build(consensus.clone(), client.clone());
-
-        let mut stage =
-            HeaderStage { consensus: consensus.clone(), client: client.clone(), downloader };
-        tokio::spawn(async move {
-            let mut db = DBContainer::<Env<WriteMap>>::new(db.borrow()).unwrap();
-            let result = stage.execute(&mut db, input).await;
-            db.commit().expect("failed to commit");
-            tx.send(result).expect("failed to send result");
-        });
-
-        consensus.update_tip(tip);
-        client
-            .on_header_request(1, |id, _| {
-                client.send_header_response(
-                    id,
-                    headers.clone().into_iter().map(|h| h.unlock()).collect(),
-                )
-            })
-            .await;
-        rx
-    }
-
-    // A helper function to run [HeaderStage::unwind]
-    fn unwind_stage(
-        db: Arc<Env<WriteMap>>,
-        input: UnwindInput,
-    ) -> oneshot::Receiver<Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>>> {
-        let (tx, rx) = oneshot::channel();
-        let mut stage = HeaderStage {
-            client: Arc::new(TestHeadersClient::default()),
-            consensus: Arc::new(TestConsensus::default()),
-            downloader: test_utils::TestDownloader::new(Ok(vec![])),
+    mod test_utils {
+        use crate::{
+            stages::headers::HeaderStage,
+            util::test_utils::{StageTestDB, StageTestRunner},
         };
-        tokio::spawn(async move {
-            let mut db = DBContainer::<Env<WriteMap>>::new(db.borrow()).unwrap();
-            let result = stage.unwind(&mut db, input).await;
-            db.commit().expect("failed to commit");
-            tx.send(result).expect("failed to send result");
-        });
-        rx
-    }
-
-    pub(crate) mod test_utils {
         use async_trait::async_trait;
-        use reth_db::{
-            kv::{test_utils::create_test_db, Env, EnvKind},
-            mdbx,
-            mdbx::WriteMap,
-        };
+        use reth_headers_downloaders::linear::{LinearDownloadBuilder, LinearDownloader};
         use reth_interfaces::{
             consensus::ForkchoiceState,
-            db::{
-                self, models::blocks::BlockNumHash, tables, DBContainer, DbCursorRO, DbCursorRW,
-                DbTx, DbTxMut, Table,
-            },
+            db::{self, models::blocks::BlockNumHash, tables, DbTx},
             p2p::headers::downloader::{DownloadError, Downloader},
             test_utils::{TestConsensus, TestHeadersClient},
         };
-        use reth_primitives::{rpc::BigEndianHash, BlockNumber, HeaderLocked, H256, U256};
-        use std::{borrow::Borrow, sync::Arc, time::Duration};
+        use reth_primitives::{rpc::BigEndianHash, HeaderLocked, H256, U256};
+        use std::{ops::Deref, sync::Arc, time::Duration};

-        pub(crate) struct HeadersDB {
-            db: Arc<Env<WriteMap>>,
+        pub(crate) struct HeadersTestRunner<D: Downloader> {
+            pub(crate) consensus: Arc<TestConsensus>,
+            pub(crate) client: Arc<TestHeadersClient>,
+            downloader: Arc<D>,
+            db: StageTestDB,
         }

-        impl Default for HeadersDB {
+        impl Default for HeadersTestRunner<TestDownloader> {
             fn default() -> Self {
-                Self { db: Arc::new(create_test_db::<mdbx::WriteMap>(EnvKind::RW)) }
+                Self {
+                    client: Arc::new(TestHeadersClient::default()),
+                    consensus: Arc::new(TestConsensus::default()),
+                    downloader: Arc::new(TestDownloader::new(Ok(Vec::default()))),
+                    db: StageTestDB::default(),
+                }
             }
         }

-        impl HeadersDB {
-            pub(crate) fn inner(&self) -> Arc<Env<WriteMap>> {
-                self.db.clone()
+        impl<D: Downloader + 'static> StageTestRunner for HeadersTestRunner<D> {
+            type S = HeaderStage<Arc<D>, TestConsensus, TestHeadersClient>;
+
+            fn db(&self) -> &StageTestDB {
+                &self.db
             }

-            fn container(&self) -> DBContainer<'_, Env<WriteMap>> {
-                DBContainer::new(self.db.borrow()).expect("failed to create db container")
+            fn stage(&self) -> Self::S {
+                HeaderStage {
+                    consensus: self.consensus.clone(),
+                    client: self.client.clone(),
+                    downloader: self.downloader.clone(),
+                }
+            }
+        }
+
+        impl HeadersTestRunner<LinearDownloader<TestConsensus, TestHeadersClient>> {
+            pub(crate) fn with_linear_downloader() -> Self {
+                let client = Arc::new(TestHeadersClient::default());
+                let consensus = Arc::new(TestConsensus::default());
+                let downloader =
+                    Arc::new(LinearDownloadBuilder::new().build(consensus.clone(), client.clone()));
+                Self { client, consensus, downloader, db: StageTestDB::default() }
+            }
+        }
+
+        impl<D: Downloader> HeadersTestRunner<D> {
+            pub(crate) fn with_downloader(downloader: D) -> Self {
+                HeadersTestRunner {
+                    client: Arc::new(TestHeadersClient::default()),
+                    consensus: Arc::new(TestConsensus::default()),
+                    downloader: Arc::new(downloader),
+                    db: StageTestDB::default(),
+                }
             }

             /// Insert header into tables
             pub(crate) fn insert_header(&self, header: &HeaderLocked) -> Result<(), db::Error> {
-                let mut db = self.container();
-                let tx = db.get_mut();
+                self.insert_headers(std::iter::once(header))
+            }

-                let key: BlockNumHash = (header.number, header.hash()).into();
-                tx.put::<tables::HeaderNumbers>(key, header.number)?;
-                tx.put::<tables::Headers>(key, header.clone().unlock())?;
-                tx.put::<tables::CanonicalHeaders>(header.number, header.hash())?;
+            /// Insert headers into tables
+            pub(crate) fn insert_headers<'a, I>(&self, headers: I) -> Result<(), db::Error>
+            where
+                I: Iterator<Item = &'a HeaderLocked>,
+            {
+                let headers = headers.collect::<Vec<_>>();
+                self.db.map_put::<tables::HeaderNumbers, _, _>(&headers, |h| {
+                    (BlockNumHash((h.number, h.hash())), h.number)
+                })?;
+                self.db.map_put::<tables::Headers, _, _>(&headers, |h| {
+                    (BlockNumHash((h.number, h.hash())), h.deref().clone().unlock())
+                })?;
+                self.db.map_put::<tables::CanonicalHeaders, _, _>(&headers, |h| {
+                    (h.number, h.hash())
+                })?;

-                let mut cursor_td = tx.cursor_mut::<tables::HeaderTD>()?;
-                let td =
-                    U256::from_big_endian(&cursor_td.last()?.map(|(_, v)| v).unwrap_or_default());
-                cursor_td
-                    .append(key, H256::from_uint(&(td + header.difficulty)).as_bytes().to_vec())?;
+                self.db.transform_append::<tables::HeaderTD, _, _>(&headers, |prev, h| {
+                    let prev_td = U256::from_big_endian(&prev.clone().unwrap_or_default());
+                    (
+                        BlockNumHash((h.number, h.hash())),
+                        H256::from_uint(&(prev_td + h.difficulty)).as_bytes().to_vec(),
+                    )
+                })?;

-                db.commit()?;
                 Ok(())
             }

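`map_put` maps each item to a key/value pair and inserts it, while `transform_append` additionally threads the previously appended value into each new one; that is how `insert_headers` accumulates total difficulty per header. A standalone illustration of that fold (simplified tuple keys in place of `BlockNumHash`; assumes the reth_primitives API used elsewhere in this diff):

use reth_primitives::{rpc::BigEndianHash, HeaderLocked, H256, U256};

/// Compute the HeaderTD entries insert_headers would append: each value is
/// the running total difficulty, seeded from the previously stored bytes
/// (an empty slice decodes to zero, mirroring `unwrap_or_default()`).
fn td_entries(headers: &[HeaderLocked]) -> Vec<((u64, H256), Vec<u8>)> {
    let mut prev: Vec<u8> = Vec::new();
    headers
        .iter()
        .map(|h| {
            let td = U256::from_big_endian(&prev) + h.difficulty;
            let bytes = H256::from_uint(&td).as_bytes().to_vec();
            prev = bytes.clone();
            ((h.number, h.hash()), bytes)
        })
        .collect()
}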
@@ -536,7 +471,7 @@ pub mod tests {
             &self,
             header: &HeaderLocked,
         ) -> Result<(), db::Error> {
-            let db = self.container();
+            let db = self.db.container();
             let tx = db.get();
             let key: BlockNumHash = (header.number, header.hash()).into();

@@ -561,27 +496,6 @@ pub mod tests {

                Ok(())
            }
-
-            /// Check there there is no table entry above given block
-            pub(crate) fn check_no_entry_above<T: Table, F>(
-                &self,
-                block: BlockNumber,
-                mut selector: F,
-            ) -> Result<(), db::Error>
-            where
-                T: Table,
-                F: FnMut(T::Key) -> BlockNumber,
-            {
-                let db = self.container();
-                let tx = db.get();
-
-                let mut cursor = tx.cursor::<T>()?;
-                if let Some((key, _)) = cursor.last()? {
-                    assert!(selector(key) <= block);
-                }
-
-                Ok(())
-            }
        }

        #[derive(Debug)]
@@ -68,7 +68,21 @@ pub(crate) mod unwind {
     };
     use reth_primitives::BlockNumber;

+    /// Unwind table by block number key
+    #[inline]
+    pub(crate) fn unwind_table_by_num<DB, T>(
+        tx: &mut <DB as DatabaseGAT<'_>>::TXMut,
+        block: BlockNumber,
+    ) -> Result<(), Error>
+    where
+        DB: Database,
+        T: Table<Key = BlockNumber>,
+    {
+        unwind_table::<DB, T, _>(tx, block, |key| key)
+    }
+
     /// Unwind table by composite block number hash key
+    #[inline]
     pub(crate) fn unwind_table_by_num_hash<DB, T>(
         tx: &mut <DB as DatabaseGAT<'_>>::TXMut,
         block: BlockNumber,
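For reference, the generic helper both wrappers delegate to. The body is the `HeaderStage::unwind_table` method deleted earlier in this commit; hosting it as a free function in `util::unwind`, and the `|key| key.number()` selector for the num-hash variant, are assumptions since those lines fall outside the hunks shown:

/// Unwind the table to the provided block (sketch: the deleted method body,
/// ported to a free function as the new call sites imply).
pub(crate) fn unwind_table<DB, T, F>(
    tx: &mut <DB as DatabaseGAT<'_>>::TXMut,
    block: BlockNumber,
    mut selector: F,
) -> Result<(), Error>
where
    DB: Database,
    T: Table,
    F: FnMut(T::Key) -> BlockNumber,
{
    // Walk backwards from the last entry, deleting rows whose selected
    // block number lies above the unwind target.
    let mut cursor = tx.cursor_mut::<T>()?;
    let mut entry = cursor.last()?;
    while let Some((key, _)) = entry {
        if selector(key) <= block {
            break
        }
        cursor.delete_current()?;
        entry = cursor.prev()?;
    }
    Ok(())
}

// unwind_table_by_num_hash would then be, per the pattern above:
//     unwind_table::<DB, T, _>(tx, block, |key| key.number())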