diff --git a/Cargo.lock b/Cargo.lock index 38fdbbf96..ba818364f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3344,6 +3344,7 @@ dependencies = [ "assert_matches", "async-trait", "metrics", + "rand 0.8.5", "reth-db", "reth-headers-downloaders", "reth-interfaces", diff --git a/crates/interfaces/src/db/models/blocks.rs b/crates/interfaces/src/db/models/blocks.rs index 1c8ab5fba..c24c0ca4b 100644 --- a/crates/interfaces/src/db/models/blocks.rs +++ b/crates/interfaces/src/db/models/blocks.rs @@ -32,6 +32,16 @@ impl BlockNumHash { pub fn take(self) -> (BlockNumber, BlockHash) { (self.0 .0, self.0 .1) } + + /// Return the block number + pub fn number(&self) -> BlockNumber { + self.0 .0 + } + + /// Return the block hash + pub fn hash(&self) -> BlockHash { + self.0 .1 + } } impl From<(u64, H256)> for BlockNumHash { diff --git a/crates/interfaces/src/p2p/headers/downloader.rs b/crates/interfaces/src/p2p/headers/downloader.rs index ccbc2630a..ef49e2ed9 100644 --- a/crates/interfaces/src/p2p/headers/downloader.rs +++ b/crates/interfaces/src/p2p/headers/downloader.rs @@ -53,6 +53,7 @@ impl DownloadError { /// The header downloading strategy #[async_trait] +#[auto_impl::auto_impl(&, Arc, Box)] pub trait Downloader: Sync + Send { /// The Consensus used to verify block validity when /// downloading diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml index ea8bc595f..099d64e8b 100644 --- a/crates/stages/Cargo.toml +++ b/crates/stages/Cargo.toml @@ -27,3 +27,4 @@ tokio = { version = "*", features = ["rt", "sync", "macros"] } tokio-stream = "0.1.10" tempfile = "3.3.0" assert_matches = "1.5.0" +rand = "0.8.5" diff --git a/crates/stages/src/error.rs b/crates/stages/src/error.rs index 546b22a4d..522457917 100644 --- a/crates/stages/src/error.rs +++ b/crates/stages/src/error.rs @@ -32,19 +32,27 @@ pub enum StageError { pub enum DatabaseIntegrityError { /// Cannonical hash is missing from db #[error("no cannonical hash for block #{number}")] - NoCannonicalHash { + 
CannonicalHash { /// The block number key number: BlockNumber, }, /// Cannonical header is missing from db #[error("no cannonical hash for block #{number}")] - NoCannonicalHeader { + CannonicalHeader { /// The block number key number: BlockNumber, }, /// Header is missing from db #[error("no header for block #{number} ({hash})")] - NoHeader { + Header { + /// The block number key + number: BlockNumber, + /// The block hash key + hash: H256, + }, + /// Cumulative transaction count is missing from db + #[error("no cumulative tx count for block #{number} ({hash})")] + CumulativeTxCount { /// The block number key number: BlockNumber, /// The block hash key diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index d962d4e55..cafa04e8b 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -4,7 +4,7 @@ use reth_interfaces::db::{DBContainer, Database}; use reth_primitives::BlockNumber; /// Stage execution input, see [Stage::execute]. -#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] pub struct ExecInput { /// The stage that was run before the current stage and the block number it reached. pub previous_stage: Option<(StageId, BlockNumber)>, @@ -13,7 +13,7 @@ pub struct ExecInput { } /// Stage unwind input, see [Stage::unwind]. -#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] pub struct UnwindInput { /// The current highest block of the stage. 
pub stage_progress: BlockNumber, diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 27616f23b..3ffa711c4 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -55,12 +55,12 @@ impl Stage // TODO: handle input.max_block let last_hash = tx.get::(last_block_num)?.ok_or_else(|| -> StageError { - DatabaseIntegrityError::NoCannonicalHash { number: last_block_num }.into() + DatabaseIntegrityError::CannonicalHash { number: last_block_num }.into() })?; let last_header = tx .get::((last_block_num, last_hash).into())? .ok_or_else(|| -> StageError { - DatabaseIntegrityError::NoHeader { number: last_block_num, hash: last_hash }.into() + DatabaseIntegrityError::Header { number: last_block_num, hash: last_hash }.into() })?; let head = HeaderLocked::new(last_header, last_hash); @@ -121,7 +121,7 @@ impl HeaderStage { height: BlockNumber, ) -> Result<(), StageError> { let hash = tx.get::(height)?.ok_or_else(|| -> StageError { - DatabaseIntegrityError::NoCannonicalHeader { number: height }.into() + DatabaseIntegrityError::CannonicalHeader { number: height }.into() })?; let td: Vec = tx.get::((height, hash).into())?.unwrap(); // TODO: self.client.update_status(height, hash, H256::from_slice(&td)).await; @@ -227,7 +227,7 @@ pub mod tests { let rx = execute_stage(db.inner(), input, H256::zero(), Ok(vec![])); assert_matches!( rx.await.unwrap(), - Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::NoCannonicalHeader { .. })) + Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CannonicalHeader { .. })) ); } diff --git a/crates/stages/src/stages/mod.rs b/crates/stages/src/stages/mod.rs index 9afee3d00..2eb1b3483 100644 --- a/crates/stages/src/stages/mod.rs +++ b/crates/stages/src/stages/mod.rs @@ -1,2 +1,4 @@ /// The headers stage. pub mod headers; +/// The cumulative transaction index stage. 
+pub mod tx_index; diff --git a/crates/stages/src/stages/tx_index.rs b/crates/stages/src/stages/tx_index.rs new file mode 100644 index 000000000..a2d986ba7 --- /dev/null +++ b/crates/stages/src/stages/tx_index.rs @@ -0,0 +1,242 @@ +use crate::{ + util::unwind::unwind_table_by_num_hash, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, + StageError, StageId, UnwindInput, UnwindOutput, +}; +use reth_interfaces::db::{tables, DBContainer, Database, DbCursorRO, DbCursorRW, DbTx, DbTxMut}; +use std::fmt::Debug; + +const TX_INDEX: StageId = StageId("TxIndex"); + +/// The cumulative transaction index stage +/// implementation for staged sync. This stage +/// updates the cumulative transaction count per block. +/// +/// e.g. [key, value] entries in [tables::CumulativeTxCount] +/// block #1 with 24 transactions - [1, 24] +/// block #2 with 42 transactions - [2, 66] +/// block #3 with 33 transactions - [3, 99] +#[derive(Debug)] +pub struct TxIndex; + +#[async_trait::async_trait] +impl Stage for TxIndex { + /// Return the id of the stage + fn id(&self) -> StageId { + TX_INDEX + } + + /// Execute the stage + async fn execute( + &mut self, + db: &mut DBContainer<'_, DB>, + input: ExecInput, + ) -> Result { + let tx = db.get_mut(); + + // The progress of this stage during last iteration + let last_block = input.stage_progress.unwrap_or_default(); + let last_hash = tx + .get::(last_block)? + .ok_or(DatabaseIntegrityError::CannonicalHeader { number: last_block })?; + + // The start block for this iteration + let start_block = last_block + 1; + let start_hash = tx + .get::(start_block)? 
+ .ok_or(DatabaseIntegrityError::CannonicalHeader { number: start_block })?; + + // The maximum block that this stage should insert to + let max_block = input.previous_stage.as_ref().map(|(_, block)| *block).unwrap_or_default(); + + // Get the cursor over the table + let mut cursor = tx.cursor_mut::()?; + // Find the last count that was inserted during previous iteration + let (_, mut count) = cursor.seek_exact((last_block, last_hash).into())?.ok_or( + DatabaseIntegrityError::CumulativeTxCount { number: last_block, hash: last_hash }, + )?; + + // Get the cursor over block bodies + let mut body_cursor = tx.cursor_mut::()?; + let walker = body_cursor.walk((start_block, start_hash).into())?; + + // Walk the block body entries up to maximum block (including) + let entries = walker + .take_while(|b| b.as_ref().map(|(k, _)| k.number() <= max_block).unwrap_or_default()); + + // Aggregate and insert cumulative transaction count for each block number + for entry in entries { + let (key, tx_count) = entry?; + count += tx_count as u64; + cursor.append(key, count)?; + } + + Ok(ExecOutput { done: true, reached_tip: true, stage_progress: max_block }) + } + + /// Unwind the stage. + async fn unwind( + &mut self, + db: &mut DBContainer<'_, DB>, + input: UnwindInput, + ) -> Result> { + unwind_table_by_num_hash::(db.get_mut(), input.unwind_to)?; + Ok(UnwindOutput { stage_progress: input.unwind_to }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::util::test_utils::{StageTestDB, StageTestRunner}; + use assert_matches::assert_matches; + use reth_interfaces::{db::models::BlockNumHash, test_utils::gen_random_header_range}; + use reth_primitives::H256; + + const TEST_STAGE: StageId = StageId("PrevStage"); + + #[tokio::test] + async fn execute_empty_db() { + let runner = TxIndexTestRunner::default(); + let rx = runner.execute(ExecInput::default()); + assert_matches!( + rx.await.unwrap(), + Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CannonicalHeader { .. 
})) + ); + } + + #[tokio::test] + async fn execute_no_prev_tx_count() { + let runner = TxIndexTestRunner::default(); + let headers = gen_random_header_range(0..10, H256::zero()); + runner + .db() + .map_put::(&headers, |h| (h.number, h.hash())) + .expect("failed to insert"); + + let (head, tail) = (headers.first().unwrap(), headers.last().unwrap()); + let input = ExecInput { + previous_stage: Some((TEST_STAGE, tail.number)), + stage_progress: Some(head.number), + }; + let rx = runner.execute(input); + assert_matches!( + rx.await.unwrap(), + Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CumulativeTxCount { .. })) + ); + } + + #[tokio::test] + async fn execute() { + let runner = TxIndexTestRunner::default(); + let (start, pivot, end) = (0, 100, 200); + let headers = gen_random_header_range(start..end, H256::zero()); + runner + .db() + .map_put::(&headers, |h| (h.number, h.hash())) + .expect("failed to insert"); + runner + .db() + .transform_append::(&headers[..=pivot], |prev, h| { + ( + BlockNumHash((h.number, h.hash())), + prev.unwrap_or_default() + (rand::random::() as u64), + ) + }) + .expect("failed to insert"); + + let (pivot, tail) = (headers.get(pivot).unwrap(), headers.last().unwrap()); + let input = ExecInput { + previous_stage: Some((TEST_STAGE, tail.number)), + stage_progress: Some(pivot.number), + }; + let rx = runner.execute(input); + assert_matches!( + rx.await.unwrap(), + Ok(ExecOutput { stage_progress, done, reached_tip }) + if done && reached_tip && stage_progress == tail.number + ); + } + + #[tokio::test] + async fn unwind_empty_db() { + let runner = TxIndexTestRunner::default(); + let rx = runner.unwind(UnwindInput::default()); + assert_matches!( + rx.await.unwrap(), + Ok(UnwindOutput { stage_progress }) if stage_progress == 0 + ); + } + + #[tokio::test] + async fn unwind_no_input() { + let runner = TxIndexTestRunner::default(); + let headers = gen_random_header_range(0..10, H256::zero()); + runner + .db() + 
.transform_append::(&headers, |prev, h| { + ( + BlockNumHash((h.number, h.hash())), + prev.unwrap_or_default() + (rand::random::() as u64), + ) + }) + .expect("failed to insert"); + + let rx = runner.unwind(UnwindInput::default()); + assert_matches!( + rx.await.unwrap(), + Ok(UnwindOutput { stage_progress }) if stage_progress == 0 + ); + runner + .db() + .check_no_entry_above::(0, |h| h.number()) + .expect("failed to check tx count"); + } + + #[tokio::test] + async fn unwind_with_db_gaps() { + let runner = TxIndexTestRunner::default(); + let first_range = gen_random_header_range(0..20, H256::zero()); + let second_range = gen_random_header_range(50..100, H256::zero()); + runner + .db() + .transform_append::( + &first_range.iter().chain(second_range.iter()).collect::>(), + |prev, h| { + ( + BlockNumHash((h.number, h.hash())), + prev.unwrap_or_default() + (rand::random::() as u64), + ) + }, + ) + .expect("failed to insert"); + + let unwind_to = 10; + let input = UnwindInput { unwind_to, ..Default::default() }; + let rx = runner.unwind(input); + assert_matches!( + rx.await.unwrap(), + Ok(UnwindOutput { stage_progress }) if stage_progress == unwind_to + ); + runner + .db() + .check_no_entry_above::(unwind_to, |h| h.number()) + .expect("failed to check tx count"); + } + + #[derive(Default)] + pub(crate) struct TxIndexTestRunner { + db: StageTestDB, + } + + impl StageTestRunner for TxIndexTestRunner { + type S = TxIndex; + + fn db(&self) -> &StageTestDB { + &self.db + } + + fn stage(&self) -> Self::S { + TxIndex {} + } + } +} diff --git a/crates/stages/src/util.rs b/crates/stages/src/util.rs index bb70011da..6ad50e0bd 100644 --- a/crates/stages/src/util.rs +++ b/crates/stages/src/util.rs @@ -61,3 +61,208 @@ pub(crate) mod opt { } } } + +pub(crate) mod unwind { + use reth_interfaces::db::{ + models::BlockNumHash, Database, DatabaseGAT, DbCursorRO, DbCursorRW, DbTxMut, Error, Table, + }; + use reth_primitives::BlockNumber; + + /// Unwind table by composite block number hash 
key + pub(crate) fn unwind_table_by_num_hash( + tx: &mut >::TXMut, + block: BlockNumber, + ) -> Result<(), Error> + where + DB: Database, + T: Table, + { + unwind_table::(tx, block, |key| key.number()) + } + + /// Unwind the table to a provided block + pub(crate) fn unwind_table( + tx: &mut >::TXMut, + block: BlockNumber, + mut selector: F, + ) -> Result<(), Error> + where + DB: Database, + T: Table, + F: FnMut(T::Key) -> BlockNumber, + { + let mut cursor = tx.cursor_mut::()?; + let mut entry = cursor.last()?; + while let Some((key, _)) = entry { + if selector(key) <= block { + break + } + cursor.delete_current()?; + entry = cursor.prev()?; + } + Ok(()) + } +} + +#[cfg(test)] +pub(crate) mod test_utils { + use reth_db::{ + kv::{test_utils::create_test_db, Env, EnvKind}, + mdbx::WriteMap, + }; + use reth_interfaces::db::{DBContainer, DbCursorRO, DbCursorRW, DbTx, DbTxMut, Error, Table}; + use reth_primitives::BlockNumber; + use std::{borrow::Borrow, sync::Arc}; + use tokio::sync::oneshot; + + use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; + + /// The [StageTestDB] is used as an internal + /// database for testing stage implementation. + /// + /// ```rust + /// let db = StageTestDB::default(); + /// stage.execute(&mut db.container(), input); + /// ``` + pub(crate) struct StageTestDB { + db: Arc>, + } + + impl Default for StageTestDB { + /// Create a new instance of [StageTestDB] + fn default() -> Self { + Self { db: Arc::new(create_test_db::(EnvKind::RW)) } + } + } + + impl StageTestDB { + /// Get a pointer to an internal database. + pub(crate) fn inner(&self) -> Arc> { + self.db.clone() + } + + /// Return a database wrapped in [DBContainer]. + pub(crate) fn container(&self) -> DBContainer<'_, Env> { + DBContainer::new(self.db.borrow()).expect("failed to create db container") + } + + /// Map a collection of values and store them in the database. + /// This function commits the transaction before exiting. 
+ /// + /// ```rust + /// let db = StageTestDB::default(); + /// db.map_put::(&items, |item| item)?; + /// ``` + pub(crate) fn map_put(&self, values: &[S], mut map: F) -> Result<(), Error> + where + T: Table, + S: Clone, + F: FnMut(&S) -> (T::Key, T::Value), + { + let mut db = self.container(); + let tx = db.get_mut(); + values.iter().try_for_each(|src| { + let (k, v) = map(src); + tx.put::(k, v) + })?; + db.commit()?; + Ok(()) + } + + /// Transform a collection of values using a callback and store + /// them in the database. The callback additionally accepts the + /// optional last element that was stored. + /// This function commits the transaction before exiting. + /// + /// ```rust + /// let db = StageTestDB::default(); + /// db.transform_append::(&items, |prev, item| prev.unwrap_or_default() + item)?; + /// ``` + pub(crate) fn transform_append( + &self, + values: &[S], + mut transform: F, + ) -> Result<(), Error> + where + T: Table, + ::Value: Clone, + S: Clone, + F: FnMut(&Option<::Value>, &S) -> (T::Key, T::Value), + { + let mut db = self.container(); + let tx = db.get_mut(); + let mut cursor = tx.cursor_mut::()?; + let mut last = cursor.last()?.map(|(_, v)| v); + values.iter().try_for_each(|src| { + let (k, v) = transform(&last, src); + last = Some(v.clone()); + cursor.append(k, v) + })?; + db.commit()?; + Ok(()) + } + + /// Check that there is no table entry above a given block + pub(crate) fn check_no_entry_above( + &self, + block: BlockNumber, + mut selector: F, + ) -> Result<(), Error> + where + T: Table, + F: FnMut(T::Key) -> BlockNumber, + { + let db = self.container(); + let tx = db.get(); + + let mut cursor = tx.cursor::()?; + if let Some((key, _)) = cursor.last()? { + assert!(selector(key) <= block); + } + + Ok(()) + } + } + + /// A generic test runner for stages. + #[async_trait::async_trait] + pub(crate) trait StageTestRunner { + type S: Stage> + 'static; + + /// Return a reference to the database. 
+ fn db(&self) -> &StageTestDB; + + /// Return an instance of a Stage. + fn stage(&self) -> Self::S; + + /// Run [Stage::execute] and return a receiver for the result. + fn execute(&self, input: ExecInput) -> oneshot::Receiver> { + let (tx, rx) = oneshot::channel(); + let (db, mut stage) = (self.db().inner(), self.stage()); + tokio::spawn(async move { + let mut db = DBContainer::new(db.borrow()).expect("failed to create db container"); + let result = stage.execute(&mut db, input).await; + db.commit().expect("failed to commit"); + tx.send(result).expect("failed to send message") + }); + rx + } + + /// Run [Stage::unwind] and return a receiver for the result. + fn unwind( + &self, + input: UnwindInput, + ) -> oneshot::Receiver>> + { + let (tx, rx) = oneshot::channel(); + let (db, mut stage) = (self.db().inner(), self.stage()); + tokio::spawn(async move { + let mut db = DBContainer::new(db.borrow()).expect("failed to create db container"); + let result = stage.unwind(&mut db, input).await; + db.commit().expect("failed to commit"); + tx.send(result).expect("failed to send result"); + }); + rx + } + } +}