feat(sync): stage tx index (#149)

* init

* feat(sync): tx index stage

* create test runner

* finish tests

* clippy

* cleanup & docs

* add more comments

* revert headers test refactor

* rm unused function
This commit is contained in:
Roman Krasiuk
2022-11-04 12:28:18 +02:00
committed by GitHub
parent 9b74d7d39d
commit f6d3a49d28
10 changed files with 479 additions and 9 deletions

View File

@ -32,6 +32,16 @@ impl BlockNumHash {
pub fn take(self) -> (BlockNumber, BlockHash) {
(self.0 .0, self.0 .1)
}
/// Return the block number
pub fn number(&self) -> BlockNumber {
self.0 .0
}
/// Return the block hash
pub fn hash(&self) -> BlockHash {
self.0 .1
}
}
impl From<(u64, H256)> for BlockNumHash {

View File

@ -53,6 +53,7 @@ impl DownloadError {
/// The header downloading strategy
#[async_trait]
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait Downloader: Sync + Send {
/// The Consensus used to verify block validity when
/// downloading

View File

@ -27,3 +27,4 @@ tokio = { version = "*", features = ["rt", "sync", "macros"] }
tokio-stream = "0.1.10"
tempfile = "3.3.0"
assert_matches = "1.5.0"
rand = "0.8.5"

View File

@ -32,19 +32,27 @@ pub enum StageError {
pub enum DatabaseIntegrityError {
/// Cannonical hash is missing from db
#[error("no cannonical hash for block #{number}")]
NoCannonicalHash {
CannonicalHash {
/// The block number key
number: BlockNumber,
},
/// Cannonical header is missing from db
#[error("no cannonical hash for block #{number}")]
NoCannonicalHeader {
CannonicalHeader {
/// The block number key
number: BlockNumber,
},
/// Header is missing from db
#[error("no header for block #{number} ({hash})")]
NoHeader {
Header {
/// The block number key
number: BlockNumber,
/// The block hash key
hash: H256,
},
/// Cumulative transaction count is missing from db
#[error("no cumulative tx count for ${number} ({hash})")]
CumulativeTxCount {
/// The block number key
number: BlockNumber,
/// The block hash key

View File

@ -4,7 +4,7 @@ use reth_interfaces::db::{DBContainer, Database};
use reth_primitives::BlockNumber;
/// Stage execution input, see [Stage::execute].
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct ExecInput {
/// The stage that was run before the current stage and the block number it reached.
pub previous_stage: Option<(StageId, BlockNumber)>,
@ -13,7 +13,7 @@ pub struct ExecInput {
}
/// Stage unwind input, see [Stage::unwind].
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct UnwindInput {
/// The current highest block of the stage.
pub stage_progress: BlockNumber,

View File

@ -55,12 +55,12 @@ impl<DB: Database, D: Downloader, C: Consensus, H: HeadersClient> Stage<DB>
// TODO: handle input.max_block
let last_hash =
tx.get::<tables::CanonicalHeaders>(last_block_num)?.ok_or_else(|| -> StageError {
DatabaseIntegrityError::NoCannonicalHash { number: last_block_num }.into()
DatabaseIntegrityError::CannonicalHash { number: last_block_num }.into()
})?;
let last_header = tx
.get::<tables::Headers>((last_block_num, last_hash).into())?
.ok_or_else(|| -> StageError {
DatabaseIntegrityError::NoHeader { number: last_block_num, hash: last_hash }.into()
DatabaseIntegrityError::Header { number: last_block_num, hash: last_hash }.into()
})?;
let head = HeaderLocked::new(last_header, last_hash);
@ -121,7 +121,7 @@ impl<D: Downloader, C: Consensus, H: HeadersClient> HeaderStage<D, C, H> {
height: BlockNumber,
) -> Result<(), StageError> {
let hash = tx.get::<tables::CanonicalHeaders>(height)?.ok_or_else(|| -> StageError {
DatabaseIntegrityError::NoCannonicalHeader { number: height }.into()
DatabaseIntegrityError::CannonicalHeader { number: height }.into()
})?;
let td: Vec<u8> = tx.get::<tables::HeaderTD>((height, hash).into())?.unwrap(); // TODO:
self.client.update_status(height, hash, H256::from_slice(&td)).await;
@ -227,7 +227,7 @@ pub mod tests {
let rx = execute_stage(db.inner(), input, H256::zero(), Ok(vec![]));
assert_matches!(
rx.await.unwrap(),
Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::NoCannonicalHeader { .. }))
Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CannonicalHeader { .. }))
);
}

View File

@ -1,2 +1,4 @@
/// The headers stage.
pub mod headers;
/// The cumulative transaction index stage.
pub mod tx_index;

View File

@ -0,0 +1,242 @@
use crate::{
util::unwind::unwind_table_by_num_hash, DatabaseIntegrityError, ExecInput, ExecOutput, Stage,
StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_interfaces::db::{tables, DBContainer, Database, DbCursorRO, DbCursorRW, DbTx, DbTxMut};
use std::fmt::Debug;
const TX_INDEX: StageId = StageId("TxIndex");
/// The cumulative transaction index stage
/// implementation for staged sync. This stage
/// updates the cumulative transaction count per block.
///
/// e.g. [key, value] entries in [tables::CumulativeTxCount]
/// block #1 with 24 transactions - [1, 24]
/// block #2 with 42 transactions - [2, 66]
/// block #3 with 33 transaction - [3, 99]
#[derive(Debug)]
pub struct TxIndex;
#[async_trait::async_trait]
impl<DB: Database> Stage<DB> for TxIndex {
/// Return the id of the stage
fn id(&self) -> StageId {
TX_INDEX
}
/// Execute the stage
async fn execute(
&mut self,
db: &mut DBContainer<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let tx = db.get_mut();
// The progress of this stage during last iteration
let last_block = input.stage_progress.unwrap_or_default();
let last_hash = tx
.get::<tables::CanonicalHeaders>(last_block)?
.ok_or(DatabaseIntegrityError::CannonicalHeader { number: last_block })?;
// The start block for this iteration
let start_block = last_block + 1;
let start_hash = tx
.get::<tables::CanonicalHeaders>(start_block)?
.ok_or(DatabaseIntegrityError::CannonicalHeader { number: start_block })?;
// The maximum block that this stage should insert to
let max_block = input.previous_stage.as_ref().map(|(_, block)| *block).unwrap_or_default();
// Get the cursor over the table
let mut cursor = tx.cursor_mut::<tables::CumulativeTxCount>()?;
// Find the last count that was inserted during previous iteration
let (_, mut count) = cursor.seek_exact((last_block, last_hash).into())?.ok_or(
DatabaseIntegrityError::CumulativeTxCount { number: last_block, hash: last_hash },
)?;
// Get the cursor over block bodies
let mut body_cursor = tx.cursor_mut::<tables::BlockBodies>()?;
let walker = body_cursor.walk((start_block, start_hash).into())?;
// Walk the block body entries up to maximum block (including)
let entries = walker
.take_while(|b| b.as_ref().map(|(k, _)| k.number() <= max_block).unwrap_or_default());
// Aggregate and insert cumulative transaction count for each block number
for entry in entries {
let (key, tx_count) = entry?;
count += tx_count as u64;
cursor.append(key, count)?;
}
Ok(ExecOutput { done: true, reached_tip: true, stage_progress: max_block })
}
/// Unwind the stage.
async fn unwind(
&mut self,
db: &mut DBContainer<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
unwind_table_by_num_hash::<DB, tables::CumulativeTxCount>(db.get_mut(), input.unwind_to)?;
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::util::test_utils::{StageTestDB, StageTestRunner};
use assert_matches::assert_matches;
use reth_interfaces::{db::models::BlockNumHash, test_utils::gen_random_header_range};
use reth_primitives::H256;
const TEST_STAGE: StageId = StageId("PrevStage");
#[tokio::test]
async fn execute_empty_db() {
let runner = TxIndexTestRunner::default();
let rx = runner.execute(ExecInput::default());
assert_matches!(
rx.await.unwrap(),
Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CannonicalHeader { .. }))
);
}
#[tokio::test]
async fn execute_no_prev_tx_count() {
let runner = TxIndexTestRunner::default();
let headers = gen_random_header_range(0..10, H256::zero());
runner
.db()
.map_put::<tables::CanonicalHeaders, _, _>(&headers, |h| (h.number, h.hash()))
.expect("failed to insert");
let (head, tail) = (headers.first().unwrap(), headers.last().unwrap());
let input = ExecInput {
previous_stage: Some((TEST_STAGE, tail.number)),
stage_progress: Some(head.number),
};
let rx = runner.execute(input);
assert_matches!(
rx.await.unwrap(),
Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CumulativeTxCount { .. }))
);
}
#[tokio::test]
async fn execute() {
let runner = TxIndexTestRunner::default();
let (start, pivot, end) = (0, 100, 200);
let headers = gen_random_header_range(start..end, H256::zero());
runner
.db()
.map_put::<tables::CanonicalHeaders, _, _>(&headers, |h| (h.number, h.hash()))
.expect("failed to insert");
runner
.db()
.transform_append::<tables::CumulativeTxCount, _, _>(&headers[..=pivot], |prev, h| {
(
BlockNumHash((h.number, h.hash())),
prev.unwrap_or_default() + (rand::random::<u8>() as u64),
)
})
.expect("failed to insert");
let (pivot, tail) = (headers.get(pivot).unwrap(), headers.last().unwrap());
let input = ExecInput {
previous_stage: Some((TEST_STAGE, tail.number)),
stage_progress: Some(pivot.number),
};
let rx = runner.execute(input);
assert_matches!(
rx.await.unwrap(),
Ok(ExecOutput { stage_progress, done, reached_tip })
if done && reached_tip && stage_progress == tail.number
);
}
#[tokio::test]
async fn unwind_empty_db() {
let runner = TxIndexTestRunner::default();
let rx = runner.unwind(UnwindInput::default());
assert_matches!(
rx.await.unwrap(),
Ok(UnwindOutput { stage_progress }) if stage_progress == 0
);
}
#[tokio::test]
async fn unwind_no_input() {
let runner = TxIndexTestRunner::default();
let headers = gen_random_header_range(0..10, H256::zero());
runner
.db()
.transform_append::<tables::CumulativeTxCount, _, _>(&headers, |prev, h| {
(
BlockNumHash((h.number, h.hash())),
prev.unwrap_or_default() + (rand::random::<u8>() as u64),
)
})
.expect("failed to insert");
let rx = runner.unwind(UnwindInput::default());
assert_matches!(
rx.await.unwrap(),
Ok(UnwindOutput { stage_progress }) if stage_progress == 0
);
runner
.db()
.check_no_entry_above::<tables::CumulativeTxCount, _>(0, |h| h.number())
.expect("failed to check tx count");
}
#[tokio::test]
async fn unwind_with_db_gaps() {
let runner = TxIndexTestRunner::default();
let first_range = gen_random_header_range(0..20, H256::zero());
let second_range = gen_random_header_range(50..100, H256::zero());
runner
.db()
.transform_append::<tables::CumulativeTxCount, _, _>(
&first_range.iter().chain(second_range.iter()).collect::<Vec<_>>(),
|prev, h| {
(
BlockNumHash((h.number, h.hash())),
prev.unwrap_or_default() + (rand::random::<u8>() as u64),
)
},
)
.expect("failed to insert");
let unwind_to = 10;
let input = UnwindInput { unwind_to, ..Default::default() };
let rx = runner.unwind(input);
assert_matches!(
rx.await.unwrap(),
Ok(UnwindOutput { stage_progress }) if stage_progress == unwind_to
);
runner
.db()
.check_no_entry_above::<tables::CumulativeTxCount, _>(unwind_to, |h| h.number())
.expect("failed to check tx count");
}
#[derive(Default)]
pub(crate) struct TxIndexTestRunner {
db: StageTestDB,
}
impl StageTestRunner for TxIndexTestRunner {
type S = TxIndex;
fn db(&self) -> &StageTestDB {
&self.db
}
fn stage(&self) -> Self::S {
TxIndex {}
}
}
}

View File

@ -61,3 +61,208 @@ pub(crate) mod opt {
}
}
}
pub(crate) mod unwind {
use reth_interfaces::db::{
models::BlockNumHash, Database, DatabaseGAT, DbCursorRO, DbCursorRW, DbTxMut, Error, Table,
};
use reth_primitives::BlockNumber;
/// Unwind table by composite block number hash key
pub(crate) fn unwind_table_by_num_hash<DB, T>(
tx: &mut <DB as DatabaseGAT<'_>>::TXMut,
block: BlockNumber,
) -> Result<(), Error>
where
DB: Database,
T: Table<Key = BlockNumHash>,
{
unwind_table::<DB, T, _>(tx, block, |key| key.number())
}
/// Unwind the table to a provided block
pub(crate) fn unwind_table<DB, T, F>(
tx: &mut <DB as DatabaseGAT<'_>>::TXMut,
block: BlockNumber,
mut selector: F,
) -> Result<(), Error>
where
DB: Database,
T: Table,
F: FnMut(T::Key) -> BlockNumber,
{
let mut cursor = tx.cursor_mut::<T>()?;
let mut entry = cursor.last()?;
while let Some((key, _)) = entry {
if selector(key) <= block {
break
}
cursor.delete_current()?;
entry = cursor.prev()?;
}
Ok(())
}
}
#[cfg(test)]
pub(crate) mod test_utils {
use reth_db::{
kv::{test_utils::create_test_db, Env, EnvKind},
mdbx::WriteMap,
};
use reth_interfaces::db::{DBContainer, DbCursorRO, DbCursorRW, DbTx, DbTxMut, Error, Table};
use reth_primitives::BlockNumber;
use std::{borrow::Borrow, sync::Arc};
use tokio::sync::oneshot;
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
/// The [StageTestDB] is used as an internal
/// database for testing stage implementation.
///
/// ```rust
/// let db = StageTestDB::default();
/// stage.execute(&mut db.container(), input);
/// ```
pub(crate) struct StageTestDB {
db: Arc<Env<WriteMap>>,
}
impl Default for StageTestDB {
/// Create a new instance of [StageTestDB]
fn default() -> Self {
Self { db: Arc::new(create_test_db::<WriteMap>(EnvKind::RW)) }
}
}
impl StageTestDB {
/// Get a pointer to an internal database.
pub(crate) fn inner(&self) -> Arc<Env<WriteMap>> {
self.db.clone()
}
/// Return a database wrapped in [DBContainer].
pub(crate) fn container(&self) -> DBContainer<'_, Env<WriteMap>> {
DBContainer::new(self.db.borrow()).expect("failed to create db container")
}
/// Map a collection of values and store them in the database.
/// This function commits the transaction before exiting.
///
/// ```rust
/// let db = StageTestDB::default();
/// db.map_put::<Table, _, _>(&items, |item| item)?;
/// ```
pub(crate) fn map_put<T, S, F>(&self, values: &[S], mut map: F) -> Result<(), Error>
where
T: Table,
S: Clone,
F: FnMut(&S) -> (T::Key, T::Value),
{
let mut db = self.container();
let tx = db.get_mut();
values.iter().try_for_each(|src| {
let (k, v) = map(src);
tx.put::<T>(k, v)
})?;
db.commit()?;
Ok(())
}
/// Transform a collection of values using a callback and store
/// them in the database. The callback additionally accepts the
/// optional last element that was stored.
/// This function commits the transaction before exiting.
///
/// ```rust
/// let db = StageTestDB::default();
/// db.transform_append::<Table, _, _>(&items, |prev, item| prev.unwrap_or_default() + item)?;
/// ```
pub(crate) fn transform_append<T, S, F>(
&self,
values: &[S],
mut transform: F,
) -> Result<(), Error>
where
T: Table,
<T as Table>::Value: Clone,
S: Clone,
F: FnMut(&Option<<T as Table>::Value>, &S) -> (T::Key, T::Value),
{
let mut db = self.container();
let tx = db.get_mut();
let mut cursor = tx.cursor_mut::<T>()?;
let mut last = cursor.last()?.map(|(_, v)| v);
values.iter().try_for_each(|src| {
let (k, v) = transform(&last, src);
last = Some(v.clone());
cursor.append(k, v)
})?;
db.commit()?;
Ok(())
}
/// Check there there is no table entry above a given block
pub(crate) fn check_no_entry_above<T: Table, F>(
&self,
block: BlockNumber,
mut selector: F,
) -> Result<(), Error>
where
T: Table,
F: FnMut(T::Key) -> BlockNumber,
{
let db = self.container();
let tx = db.get();
let mut cursor = tx.cursor::<T>()?;
if let Some((key, _)) = cursor.last()? {
assert!(selector(key) <= block);
}
Ok(())
}
}
/// A generic test runner for stages.
#[async_trait::async_trait]
pub(crate) trait StageTestRunner {
type S: Stage<Env<WriteMap>> + 'static;
/// Return a reference to the database.
fn db(&self) -> &StageTestDB;
/// Return an instance of a Stage.
fn stage(&self) -> Self::S;
/// Run [Stage::execute] and return a receiver for the result.
fn execute(&self, input: ExecInput) -> oneshot::Receiver<Result<ExecOutput, StageError>> {
let (tx, rx) = oneshot::channel();
let (db, mut stage) = (self.db().inner(), self.stage());
tokio::spawn(async move {
let mut db = DBContainer::new(db.borrow()).expect("failed to create db container");
let result = stage.execute(&mut db, input).await;
db.commit().expect("failed to commit");
tx.send(result).expect("failed to send message")
});
rx
}
/// Run [Stage::unwind] and return a receiver for the result.
fn unwind(
&self,
input: UnwindInput,
) -> oneshot::Receiver<Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>>>
{
let (tx, rx) = oneshot::channel();
let (db, mut stage) = (self.db().inner(), self.stage());
tokio::spawn(async move {
let mut db = DBContainer::new(db.borrow()).expect("failed to create db container");
let result = stage.unwind(&mut db, input).await;
db.commit().expect("failed to commit");
tx.send(result).expect("failed to send result");
});
rx
}
}
}