test(stage): headers linear downloader (#143)

* headers stage linear test

* cleanup unwind
This commit is contained in:
Roman Krasiuk
2022-11-03 02:28:44 +02:00
committed by GitHub
parent ac2f3fcd8a
commit c232a72338
3 changed files with 98 additions and 29 deletions

2
Cargo.lock generated
View File

@ -3320,8 +3320,8 @@ dependencies = [
"assert_matches",
"async-trait",
"metrics",
"once_cell",
"reth-db",
"reth-headers-downloaders",
"reth-interfaces",
"reth-primitives",
"tempfile",

View File

@ -22,8 +22,8 @@ metrics = "0.20.1"
[dev-dependencies]
reth-db = { path = "../db", features = ["test-utils"] }
reth-interfaces = { path = "../interfaces", features = ["test-utils"] }
reth-headers-downloaders = { path = "../net/headers-downloaders" }
tokio = { version = "*", features = ["rt", "sync", "macros"] }
tokio-stream = "0.1.10"
once_cell = "1.15.0"
tempfile = "3.3.0"
assert_matches = "1.5.0"

View File

@ -14,7 +14,7 @@ use reth_interfaces::{
},
};
use reth_primitives::{rpc::BigEndianHash, BlockNumber, HeaderLocked, H256, U256};
use std::fmt::Debug;
use std::{fmt::Debug, sync::Arc};
use tracing::*;
const HEADERS: StageId = StageId("HEADERS");
@ -25,9 +25,9 @@ pub struct HeaderStage<D: Downloader, C: Consensus, H: HeadersClient> {
/// Strategy for downloading the headers
pub downloader: D,
/// Consensus client implementation
pub consensus: C,
pub consensus: Arc<C>,
/// Downloader client implementation
pub client: H,
pub client: Arc<H>,
}
#[async_trait::async_trait]
@ -201,11 +201,11 @@ impl<D: Downloader, C: Consensus, H: HeadersClient> HeaderStage<D, C, H> {
}
#[cfg(test)]
pub(crate) mod tests {
pub mod tests {
use super::*;
use assert_matches::assert_matches;
use once_cell::sync::Lazy;
use reth_db::{kv::Env, mdbx::WriteMap};
use reth_headers_downloaders::linear::LinearDownloadBuilder;
use reth_interfaces::{
db::DBContainer,
test_utils::{
@ -217,8 +217,6 @@ pub(crate) mod tests {
use tokio::sync::oneshot;
const TEST_STAGE: StageId = StageId("HEADERS");
static CONSENSUS: Lazy<TestConsensus> = Lazy::new(|| TestConsensus::default());
static CLIENT: Lazy<TestHeadersClient> = Lazy::new(|| TestHeadersClient::default());
#[tokio::test]
// Check that the execution errors on empty database or
@ -226,7 +224,7 @@ pub(crate) mod tests {
async fn headers_execute_empty_db() {
let db = HeadersDB::default();
let input = ExecInput { previous_stage: None, stage_progress: None };
let rx = execute_stage(db.inner(), input, Ok(vec![]));
let rx = execute_stage(db.inner(), input, H256::zero(), Ok(vec![]));
assert_matches!(
rx.await.unwrap(),
Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::NoCannonicalHeader { .. }))
@ -241,8 +239,13 @@ pub(crate) mod tests {
db.insert_header(&head).expect("failed to insert header");
let input = ExecInput { previous_stage: None, stage_progress: None };
let rx = execute_stage(db.inner(), input, Err(DownloadError::Timeout { request_id: 0 }));
CONSENSUS.update_tip(H256::from_low_u64_be(1));
let rx = execute_stage(
db.inner(),
input,
H256::from_low_u64_be(1),
Err(DownloadError::Timeout { request_id: 0 }),
);
assert_matches!(rx.await.unwrap(), Ok(ExecOutput { done, .. }) if !done);
}
@ -257,9 +260,9 @@ pub(crate) mod tests {
let rx = execute_stage(
db.inner(),
input,
H256::from_low_u64_be(1),
Err(DownloadError::HeaderValidation { hash: H256::zero(), details: "".to_owned() }),
);
CONSENSUS.update_tip(H256::from_low_u64_be(1));
assert_matches!(rx.await.unwrap(), Err(StageError::Validation { block }) if block == 0);
}
@ -276,9 +279,8 @@ pub(crate) mod tests {
let result: Vec<_> = headers.iter().rev().cloned().collect();
let input = ExecInput { previous_stage: None, stage_progress: None };
let rx = execute_stage(db.inner(), input, Ok(result));
let tip = headers.last().unwrap();
CONSENSUS.update_tip(tip.hash());
let rx = execute_stage(db.inner(), input, tip.hash(), Ok(result));
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput { .. }));
@ -307,9 +309,38 @@ pub(crate) mod tests {
previous_stage: Some((TEST_STAGE, head.number)),
stage_progress: Some(head.number),
};
let rx = execute_stage(db.inner(), input, Ok(result));
let tip = headers.last().unwrap();
CONSENSUS.update_tip(tip.hash());
let rx = execute_stage(db.inner(), input, tip.hash(), Ok(result));
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput { .. }));
let result = result.unwrap();
assert!(result.done && result.reached_tip);
assert_eq!(result.stage_progress, tip.number);
for header in headers {
assert!(db.validate_db_header(&header).is_ok());
}
}
#[tokio::test]
// Execute the stage with linear downloader
async fn headers_execute_linear() {
// TODO: set bigger range once `MDBX_EKEYMISMATCH` issue is resolved
let (start, end) = (1000, 1024);
let head = gen_random_header(start, None);
let headers = gen_random_header_range(start + 1..end, head.hash());
let db = HeadersDB::default();
db.insert_header(&head).expect("failed to insert header");
let input = ExecInput {
previous_stage: Some((TEST_STAGE, head.number)),
stage_progress: Some(head.number),
};
let tip = headers.last().unwrap();
let mut download_result = headers.clone();
download_result.insert(0, head);
let rx = execute_stage_linear(db.inner(), input, tip.hash(), download_result).await;
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput { .. }));
@ -364,24 +395,63 @@ pub(crate) mod tests {
}
// A helper function to run [HeaderStage::execute]
// with default consensus, client & test downloader
fn execute_stage(
db: Arc<Env<WriteMap>>,
input: ExecInput,
tip: H256,
download_result: Result<Vec<HeaderLocked>, DownloadError>,
) -> oneshot::Receiver<Result<ExecOutput, StageError>> {
let (tx, rx) = oneshot::channel();
let client = Arc::new(TestHeadersClient::default());
let consensus = Arc::new(TestConsensus::default());
let downloader = test_utils::TestDownloader::new(download_result);
let mut stage =
HeaderStage { consensus: consensus.clone(), client: client.clone(), downloader };
tokio::spawn(async move {
let db = db.clone();
let mut db = DBContainer::<Env<WriteMap>>::new(db.borrow()).unwrap();
let mut stage = HeaderStage {
client: &*CLIENT,
consensus: &*CONSENSUS,
downloader: test_utils::TestDownloader::new(download_result),
};
let result = stage.execute(&mut db, input).await;
db.commit().expect("failed to commit");
tx.send(result).expect("failed to send result");
});
consensus.update_tip(tip);
rx
}
// A helper function to run [HeaderStage::execute]
// with linear downloader
async fn execute_stage_linear(
db: Arc<Env<WriteMap>>,
input: ExecInput,
tip: H256,
headers: Vec<HeaderLocked>,
) -> oneshot::Receiver<Result<ExecOutput, StageError>> {
let (tx, rx) = oneshot::channel();
let consensus = Arc::new(TestConsensus::default());
let client = Arc::new(TestHeadersClient::default());
let downloader = LinearDownloadBuilder::new().build(consensus.clone(), client.clone());
let mut stage =
HeaderStage { consensus: consensus.clone(), client: client.clone(), downloader };
tokio::spawn(async move {
let mut db = DBContainer::<Env<WriteMap>>::new(db.borrow()).unwrap();
let result = stage.execute(&mut db, input).await;
db.commit().expect("failed to commit");
tx.send(result).expect("failed to send result");
});
consensus.update_tip(tip);
client
.on_header_request(1, |id, _| {
client.send_header_response(
id,
headers.clone().into_iter().map(|h| h.unlock()).collect(),
)
})
.await;
rx
}
@ -391,14 +461,13 @@ pub(crate) mod tests {
input: UnwindInput,
) -> oneshot::Receiver<Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>>> {
let (tx, rx) = oneshot::channel();
let mut stage = HeaderStage {
client: Arc::new(TestHeadersClient::default()),
consensus: Arc::new(TestConsensus::default()),
downloader: test_utils::TestDownloader::new(Ok(vec![])),
};
tokio::spawn(async move {
let db = db.clone();
let mut db = DBContainer::<Env<WriteMap>>::new(db.borrow()).unwrap();
let mut stage = HeaderStage {
client: &*CLIENT,
consensus: &*CONSENSUS,
downloader: test_utils::TestDownloader::new(Ok(vec![])),
};
let result = stage.unwind(&mut db, input).await;
db.commit().expect("failed to commit");
tx.send(result).expect("failed to send result");