chore: use primitives::BlockNumber (#17)

This commit is contained in:
Bjerg
2022-10-06 19:30:32 +02:00
committed by GitHub
parent c749658fd4
commit 8317cd2b63
2 changed files with 49 additions and 85 deletions

View File

@ -10,7 +10,7 @@
use async_trait::async_trait;
use reth_db::mdbx;
use reth_primitives::U64;
use reth_primitives::BlockNumber;
use std::fmt::Display;
use thiserror::Error;
@ -21,27 +21,27 @@ pub use pipeline::*;
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct ExecInput {
/// The stage that was run before the current stage and the block number it reached.
pub previous_stage: Option<(StageId, U64)>,
pub previous_stage: Option<(StageId, BlockNumber)>,
/// The progress of this stage the last time it was executed.
pub stage_progress: Option<U64>,
pub stage_progress: Option<BlockNumber>,
}
/// Stage unwind input, see [Stage::unwind].
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct UnwindInput {
/// The current highest block of the stage.
pub stage_progress: U64,
pub stage_progress: BlockNumber,
/// The block to unwind to.
pub unwind_to: U64,
pub unwind_to: BlockNumber,
/// The bad block that caused the unwind, if any.
pub bad_block: Option<U64>,
pub bad_block: Option<BlockNumber>,
}
/// The output of a stage execution.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ExecOutput {
/// How far the stage got.
pub stage_progress: U64,
pub stage_progress: BlockNumber,
/// Whether or not the stage is done.
pub done: bool,
/// Whether or not the stage reached the tip of the chain.
@ -52,7 +52,7 @@ pub struct ExecOutput {
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct UnwindOutput {
/// The block at which the stage has unwound to.
pub stage_progress: U64,
pub stage_progress: BlockNumber,
}
/// A stage execution error.
@ -64,7 +64,7 @@ pub enum StageError {
#[error("Stage encountered a validation error in block {block}.")]
Validation {
/// The block that failed validation.
block: U64,
block: BlockNumber,
},
/// The stage encountered an internal error.
#[error(transparent)]
@ -126,7 +126,7 @@ impl StageId {
pub fn get_progress<'db, K, E>(
&self,
tx: &mdbx::Transaction<'db, K, E>,
) -> Result<Option<U64>, mdbx::Error>
) -> Result<Option<BlockNumber>, mdbx::Error>
where
K: mdbx::TransactionKind,
E: mdbx::EnvironmentKind,
@ -134,14 +134,14 @@ impl StageId {
// TODO: Clean up when we get better database abstractions
let bytes: Option<Vec<u8>> = tx.get(&tx.open_db(Some("SyncStage"))?, self.0.as_ref())?;
Ok(bytes.map(|b| U64::from_big_endian(b.as_ref())))
Ok(bytes.map(|b| BlockNumber::from_be_bytes(b.try_into().expect("Database corrupt"))))
}
/// Save the progress of this stage.
pub fn save_progress<'db, E>(
&self,
tx: &mdbx::Transaction<'db, mdbx::RW, E>,
block: U64,
block: BlockNumber,
) -> Result<(), mdbx::Error>
where
E: mdbx::EnvironmentKind,
@ -150,7 +150,7 @@ impl StageId {
tx.put(
&tx.open_db(Some("SyncStage"))?,
self.0,
block.0[0].to_be_bytes(),
block.to_be_bytes(),
mdbx::WriteFlags::UPSERT,
)
}

View File

@ -1,6 +1,6 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use reth_db::mdbx;
use reth_primitives::U64;
use reth_primitives::BlockNumber;
use std::fmt::{Debug, Formatter};
use tokio::sync::mpsc::Sender;
use tracing::*;
@ -41,7 +41,7 @@ where
E: mdbx::EnvironmentKind,
{
stages: Vec<QueuedStage<'db, E>>,
max_block: Option<U64>,
max_block: Option<BlockNumber>,
events_sender: Option<Sender<PipelineEvent>>,
}
@ -106,7 +106,7 @@ where
/// Set the target block.
///
/// Once this block is reached, syncing will stop.
pub fn set_max_block(mut self, block: Option<U64>) -> Self {
pub fn set_max_block(mut self, block: Option<BlockNumber>) -> Self {
self.max_block = block;
self
}
@ -123,8 +123,8 @@ where
db: &'db mdbx::Environment<E>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut previous_stage = None;
let mut minimum_progress: Option<U64> = None;
let mut maximum_progress: Option<U64> = None;
let mut minimum_progress: Option<BlockNumber> = None;
let mut maximum_progress: Option<BlockNumber> = None;
let mut reached_tip_flag = true;
'run: loop {
@ -265,8 +265,8 @@ where
pub async fn unwind(
&mut self,
db: &'db mdbx::Environment<E>,
to: U64,
bad_block: Option<U64>,
to: BlockNumber,
bad_block: Option<BlockNumber>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Sort stages by unwind priority
let mut unwind_pipeline = {
@ -353,7 +353,7 @@ pub enum PipelineEvent {
/// The stage that is about to be run.
stage_id: StageId,
/// The previous checkpoint of the stage.
stage_progress: Option<U64>,
stage_progress: Option<BlockNumber>,
},
/// Emitted when a stage has run a single time.
///
@ -407,7 +407,7 @@ mod tests {
Pipeline::<mdbx::WriteMap>::new_with_channel(tx)
.push(
TestStage::new(StageId("A")).add_exec(Ok(ExecOutput {
stage_progress: 20.into(),
stage_progress: 20,
done: true,
reached_tip: true,
})),
@ -415,13 +415,13 @@ mod tests {
)
.push(
TestStage::new(StageId("B")).add_exec(Ok(ExecOutput {
stage_progress: 10.into(),
stage_progress: 10,
done: true,
reached_tip: true,
})),
false,
)
.set_max_block(Some(10.into()))
.set_max_block(Some(10))
.run(&db)
.await
});
@ -433,20 +433,12 @@ mod tests {
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("A"),
result: Some(ExecOutput {
stage_progress: 20.into(),
done: true,
reached_tip: true,
}),
result: Some(ExecOutput { stage_progress: 20, done: true, reached_tip: true }),
},
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("B"),
result: Some(ExecOutput {
stage_progress: 10.into(),
done: true,
reached_tip: true,
}),
result: Some(ExecOutput { stage_progress: 10, done: true, reached_tip: true }),
},
]
);
@ -464,34 +456,30 @@ mod tests {
.push(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput {
stage_progress: 100.into(),
stage_progress: 100,
done: true,
reached_tip: true,
}))
.add_unwind(Ok(UnwindOutput { stage_progress: 1.into() })),
.add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
false,
)
.push(
TestStage::new(StageId("B"))
.add_exec(Ok(ExecOutput {
stage_progress: 10.into(),
stage_progress: 10,
done: true,
reached_tip: true,
}))
.add_unwind(Ok(UnwindOutput { stage_progress: 1.into() })),
.add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
false,
)
.set_max_block(Some(10.into()));
.set_max_block(Some(10));
// Sync first
pipeline.run(&db).await.expect("Could not run pipeline");
// Unwind
pipeline
.set_channel(tx)
.unwind(&db, 1.into(), None)
.await
.expect("Could not unwind pipeline");
pipeline.set_channel(tx).unwind(&db, 1, None).await.expect("Could not unwind pipeline");
});
// Check that the stages were unwound in reverse order
@ -500,27 +488,19 @@ mod tests {
vec![
PipelineEvent::Unwinding {
stage_id: StageId("B"),
input: UnwindInput {
stage_progress: 10.into(),
unwind_to: 1.into(),
bad_block: None
}
input: UnwindInput { stage_progress: 10, unwind_to: 1, bad_block: None }
},
PipelineEvent::Unwound {
stage_id: StageId("B"),
result: Some(UnwindOutput { stage_progress: 1.into() }),
result: Some(UnwindOutput { stage_progress: 1 }),
},
PipelineEvent::Unwinding {
stage_id: StageId("A"),
input: UnwindInput {
stage_progress: 100.into(),
unwind_to: 1.into(),
bad_block: None
}
input: UnwindInput { stage_progress: 100, unwind_to: 1, bad_block: None }
},
PipelineEvent::Unwound {
stage_id: StageId("A"),
result: Some(UnwindOutput { stage_progress: 1.into() }),
result: Some(UnwindOutput { stage_progress: 1 }),
},
]
);
@ -549,13 +529,13 @@ mod tests {
.push(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput {
stage_progress: 10.into(),
stage_progress: 10,
done: true,
reached_tip: true,
}))
.add_unwind(Ok(UnwindOutput { stage_progress: 0.into() }))
.add_unwind(Ok(UnwindOutput { stage_progress: 0 }))
.add_exec(Ok(ExecOutput {
stage_progress: 10.into(),
stage_progress: 10,
done: true,
reached_tip: true,
})),
@ -563,16 +543,16 @@ mod tests {
)
.push(
TestStage::new(StageId("B"))
.add_exec(Err(StageError::Validation { block: 5.into() }))
.add_unwind(Ok(UnwindOutput { stage_progress: 0.into() }))
.add_exec(Err(StageError::Validation { block: 5 }))
.add_unwind(Ok(UnwindOutput { stage_progress: 0 }))
.add_exec(Ok(ExecOutput {
stage_progress: 10.into(),
stage_progress: 10,
done: true,
reached_tip: true,
})),
false,
)
.set_max_block(Some(10.into()))
.set_max_block(Some(10))
.set_channel(tx)
.run(&db)
.await
@ -586,43 +566,27 @@ mod tests {
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("A"),
result: Some(ExecOutput {
stage_progress: 10.into(),
done: true,
reached_tip: true,
}),
result: Some(ExecOutput { stage_progress: 10, done: true, reached_tip: true }),
},
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Ran { stage_id: StageId("B"), result: None },
PipelineEvent::Unwinding {
stage_id: StageId("A"),
input: UnwindInput {
stage_progress: 10.into(),
unwind_to: 0.into(),
bad_block: Some(5.into())
}
input: UnwindInput { stage_progress: 10, unwind_to: 0, bad_block: Some(5) }
},
PipelineEvent::Unwound {
stage_id: StageId("A"),
result: Some(UnwindOutput { stage_progress: 0.into() }),
result: Some(UnwindOutput { stage_progress: 0 }),
},
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: Some(0.into()) },
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: Some(0) },
PipelineEvent::Ran {
stage_id: StageId("A"),
result: Some(ExecOutput {
stage_progress: 10.into(),
done: true,
reached_tip: true,
}),
result: Some(ExecOutput { stage_progress: 10, done: true, reached_tip: true }),
},
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("B"),
result: Some(ExecOutput {
stage_progress: 10.into(),
done: true,
reached_tip: true,
}),
result: Some(ExecOutput { stage_progress: 10, done: true, reached_tip: true }),
},
]
);