diff --git a/Cargo.lock b/Cargo.lock index 659ad061c..a040a58b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3320,8 +3320,8 @@ dependencies = [ "assert_matches", "async-trait", "metrics", - "once_cell", "reth-db", + "reth-headers-downloaders", "reth-interfaces", "reth-primitives", "tempfile", diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml index 1ac219b20..ea8bc595f 100644 --- a/crates/stages/Cargo.toml +++ b/crates/stages/Cargo.toml @@ -22,8 +22,8 @@ metrics = "0.20.1" [dev-dependencies] reth-db = { path = "../db", features = ["test-utils"] } reth-interfaces = { path = "../interfaces", features = ["test-utils"] } +reth-headers-downloaders = { path = "../net/headers-downloaders" } tokio = { version = "*", features = ["rt", "sync", "macros"] } tokio-stream = "0.1.10" -once_cell = "1.15.0" tempfile = "3.3.0" assert_matches = "1.5.0" diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 49efe843a..2a7798b13 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -14,7 +14,7 @@ use reth_interfaces::{ }, }; use reth_primitives::{rpc::BigEndianHash, BlockNumber, HeaderLocked, H256, U256}; -use std::fmt::Debug; +use std::{fmt::Debug, sync::Arc}; use tracing::*; const HEADERS: StageId = StageId("HEADERS"); @@ -25,9 +25,9 @@ pub struct HeaderStage { /// Strategy for downloading the headers pub downloader: D, /// Consensus client implementation - pub consensus: C, + pub consensus: Arc, /// Downloader client implementation - pub client: H, + pub client: Arc, } #[async_trait::async_trait] @@ -201,11 +201,11 @@ impl HeaderStage { } #[cfg(test)] -pub(crate) mod tests { +pub mod tests { use super::*; use assert_matches::assert_matches; - use once_cell::sync::Lazy; use reth_db::{kv::Env, mdbx::WriteMap}; + use reth_headers_downloaders::linear::LinearDownloadBuilder; use reth_interfaces::{ db::DBContainer, test_utils::{ @@ -217,8 +217,6 @@ pub(crate) mod tests { use tokio::sync::oneshot; const TEST_STAGE: StageId = StageId("HEADERS"); - static CONSENSUS: Lazy = Lazy::new(|| TestConsensus::default()); - static CLIENT: Lazy = Lazy::new(|| TestHeadersClient::default()); #[tokio::test] // Check that the execution errors on empty database or @@ -226,7 +224,7 @@ pub(crate) mod tests { async fn headers_execute_empty_db() { let db = HeadersDB::default(); let input = ExecInput { previous_stage: None, stage_progress: None }; - let rx = execute_stage(db.inner(), input, Ok(vec![])); + let rx = execute_stage(db.inner(), input, H256::zero(), Ok(vec![])); assert_matches!( rx.await.unwrap(), Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::NoCannonicalHeader { .. })) @@ -241,8 +239,13 @@ pub(crate) mod tests { db.insert_header(&head).expect("failed to insert header"); let input = ExecInput { previous_stage: None, stage_progress: None }; - let rx = execute_stage(db.inner(), input, Err(DownloadError::Timeout { request_id: 0 })); - CONSENSUS.update_tip(H256::from_low_u64_be(1)); + let rx = execute_stage( + db.inner(), + input, + H256::from_low_u64_be(1), + Err(DownloadError::Timeout { request_id: 0 }), + ); + assert_matches!(rx.await.unwrap(), Ok(ExecOutput { done, .. }) if !done); } @@ -257,9 +260,9 @@ pub(crate) mod tests { let rx = execute_stage( db.inner(), input, + H256::from_low_u64_be(1), Err(DownloadError::HeaderValidation { hash: H256::zero(), details: "".to_owned() }), ); - CONSENSUS.update_tip(H256::from_low_u64_be(1)); assert_matches!(rx.await.unwrap(), Err(StageError::Validation { block }) if block == 0); } @@ -276,9 +279,8 @@ pub(crate) mod tests { let result: Vec<_> = headers.iter().rev().cloned().collect(); let input = ExecInput { previous_stage: None, stage_progress: None }; - let rx = execute_stage(db.inner(), input, Ok(result)); let tip = headers.last().unwrap(); - CONSENSUS.update_tip(tip.hash()); + let rx = execute_stage(db.inner(), input, tip.hash(), Ok(result)); let result = rx.await.unwrap(); assert_matches!(result, Ok(ExecOutput { .. })); @@ -307,9 +309,38 @@ pub(crate) mod tests { previous_stage: Some((TEST_STAGE, head.number)), stage_progress: Some(head.number), }; - let rx = execute_stage(db.inner(), input, Ok(result)); let tip = headers.last().unwrap(); - CONSENSUS.update_tip(tip.hash()); + let rx = execute_stage(db.inner(), input, tip.hash(), Ok(result)); + + let result = rx.await.unwrap(); + assert_matches!(result, Ok(ExecOutput { .. })); + let result = result.unwrap(); + assert!(result.done && result.reached_tip); + assert_eq!(result.stage_progress, tip.number); + + for header in headers { + assert!(db.validate_db_header(&header).is_ok()); + } + } + + #[tokio::test] + // Execute the stage with linear downloader + async fn headers_execute_linear() { + // TODO: set bigger range once `MDBX_EKEYMISMATCH` issue is resolved + let (start, end) = (1000, 1024); + let head = gen_random_header(start, None); + let headers = gen_random_header_range(start + 1..end, head.hash()); + let db = HeadersDB::default(); + db.insert_header(&head).expect("failed to insert header"); + + let input = ExecInput { + previous_stage: Some((TEST_STAGE, head.number)), + stage_progress: Some(head.number), + }; + let tip = headers.last().unwrap(); + let mut download_result = headers.clone(); + download_result.insert(0, head); + let rx = execute_stage_linear(db.inner(), input, tip.hash(), download_result).await; let result = rx.await.unwrap(); assert_matches!(result, Ok(ExecOutput { .. })); @@ -364,24 +395,63 @@ pub(crate) mod tests { } // A helper function to run [HeaderStage::execute] + // with default consensus, client & test downloader fn execute_stage( db: Arc>, input: ExecInput, + tip: H256, download_result: Result, DownloadError>, ) -> oneshot::Receiver> { let (tx, rx) = oneshot::channel(); + + let client = Arc::new(TestHeadersClient::default()); + let consensus = Arc::new(TestConsensus::default()); + let downloader = test_utils::TestDownloader::new(download_result); + + let mut stage = + HeaderStage { consensus: consensus.clone(), client: client.clone(), downloader }; tokio::spawn(async move { - let db = db.clone(); let mut db = DBContainer::>::new(db.borrow()).unwrap(); - let mut stage = HeaderStage { - client: &*CLIENT, - consensus: &*CONSENSUS, - downloader: test_utils::TestDownloader::new(download_result), - }; let result = stage.execute(&mut db, input).await; db.commit().expect("failed to commit"); tx.send(result).expect("failed to send result"); }); + consensus.update_tip(tip); + rx + } + + // A helper function to run [HeaderStage::execute] + // with linear downloader + async fn execute_stage_linear( + db: Arc>, + input: ExecInput, + tip: H256, + headers: Vec, + ) -> oneshot::Receiver> { + let (tx, rx) = oneshot::channel(); + + let consensus = Arc::new(TestConsensus::default()); + let client = Arc::new(TestHeadersClient::default()); + let downloader = LinearDownloadBuilder::new().build(consensus.clone(), client.clone()); + + let mut stage = + HeaderStage { consensus: consensus.clone(), client: client.clone(), downloader }; + tokio::spawn(async move { + let mut db = DBContainer::>::new(db.borrow()).unwrap(); + let result = stage.execute(&mut db, input).await; + db.commit().expect("failed to commit"); + tx.send(result).expect("failed to send result"); + }); + + consensus.update_tip(tip); + client + .on_header_request(1, |id, _| { + client.send_header_response( + id, + headers.clone().into_iter().map(|h| h.unlock()).collect(), + ) + }) + .await; rx } @@ -391,14 +461,13 @@ pub(crate) mod tests { input: UnwindInput, ) -> oneshot::Receiver>> { let (tx, rx) = oneshot::channel(); + let mut stage = HeaderStage { + client: Arc::new(TestHeadersClient::default()), + consensus: Arc::new(TestConsensus::default()), + downloader: test_utils::TestDownloader::new(Ok(vec![])), + }; tokio::spawn(async move { - let db = db.clone(); let mut db = DBContainer::>::new(db.borrow()).unwrap(); - let mut stage = HeaderStage { - client: &*CLIENT, - consensus: &*CONSENSUS, - downloader: test_utils::TestDownloader::new(Ok(vec![])), - }; let result = stage.unwind(&mut db, input).await; db.commit().expect("failed to commit"); tx.send(result).expect("failed to send result");