fix(p2p): network sync state (#2699)

This commit is contained in:
Roman Krasiuk
2023-05-16 23:24:40 +03:00
committed by GitHub
parent f37ee4b83b
commit 9b79218c18
15 changed files with 94 additions and 239 deletions

View File

@ -12,7 +12,7 @@ use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
headers::reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::FileClient,
};
use reth_interfaces::{consensus::Consensus, p2p::headers::client::NoopStatusUpdater};
use reth_interfaces::consensus::Consensus;
use reth_primitives::{ChainSpec, H256};
use reth_staged_sync::{
utils::{
@ -157,14 +157,12 @@ impl ImportCommand {
.with_tip_sender(tip_tx)
// we want to sync all blocks the file client provides or 0 if empty
.with_max_block(file_client.max_block().unwrap_or(0))
.with_sync_state_updater(file_client)
.add_stages(
DefaultStages::new(
HeaderSyncMode::Tip(tip_rx),
consensus.clone(),
header_downloader,
body_downloader,
NoopStatusUpdater::default(),
factory.clone(),
)
.set(

View File

@ -34,9 +34,8 @@ use reth_interfaces::{
p2p::{
bodies::{client::BodiesClient, downloader::BodyDownloader},
either::EitherDownloader,
headers::{client::StatusUpdater, downloader::HeaderDownloader},
headers::downloader::HeaderDownloader,
},
sync::SyncStateUpdater,
};
use reth_network::{error::NetworkError, NetworkConfig, NetworkHandle, NetworkManager};
use reth_network_api::NetworkInfo;
@ -311,7 +310,6 @@ impl Command {
let mut pipeline = self
.build_networked_pipeline(
&mut config,
network.clone(),
client.clone(),
Arc::clone(&consensus),
db.clone(),
@ -329,7 +327,6 @@ impl Command {
let pipeline = self
.build_networked_pipeline(
&mut config,
network.clone(),
network_client.clone(),
Arc::clone(&consensus),
db.clone(),
@ -348,6 +345,7 @@ impl Command {
pipeline,
blockchain_db.clone(),
Box::new(ctx.task_executor.clone()),
Box::new(network.clone()),
self.debug.max_block,
self.debug.continuous,
payload_builder.clone(),
@ -417,7 +415,6 @@ impl Command {
async fn build_networked_pipeline<DB, Client>(
&self,
config: &mut Config,
network: NetworkHandle,
client: Client,
consensus: Arc<dyn Consensus>,
db: DB,
@ -450,7 +447,6 @@ impl Command {
config,
header_downloader,
body_downloader,
network.clone(),
consensus,
max_block,
self.debug.continuous,
@ -625,13 +621,12 @@ impl Command {
}
#[allow(clippy::too_many_arguments)]
async fn build_pipeline<DB, H, B, U>(
async fn build_pipeline<DB, H, B>(
&self,
db: DB,
config: &Config,
header_downloader: H,
body_downloader: B,
updater: U,
consensus: Arc<dyn Consensus>,
max_block: Option<u64>,
continuous: bool,
@ -640,7 +635,6 @@ impl Command {
DB: Database + Clone + 'static,
H: HeaderDownloader + 'static,
B: BodyDownloader + 'static,
U: SyncStateUpdater + StatusUpdater + Clone + 'static,
{
let stage_conf = &config.stages;
@ -673,7 +667,6 @@ impl Command {
let header_mode =
if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) };
let pipeline = builder
.with_sync_state_updater(updater.clone())
.with_tip_sender(tip_tx)
.add_stages(
DefaultStages::new(
@ -681,7 +674,6 @@ impl Command {
Arc::clone(&consensus),
header_downloader,
body_downloader,
updater,
factory.clone(),
)
.set(

View File

@ -9,11 +9,12 @@ use reth_interfaces::{
consensus::ForkchoiceState,
executor::Error as ExecutorError,
p2p::{bodies::client::BodiesClient, headers::client::HeadersClient},
sync::{NetworkSyncUpdater, SyncState},
Error,
};
use reth_payload_builder::{PayloadBuilderAttributes, PayloadBuilderHandle};
use reth_primitives::{
listener::EventListeners, BlockNumber, Header, SealedBlock, SealedHeader, H256, U256,
listener::EventListeners, BlockNumber, Head, Header, SealedBlock, SealedHeader, H256, U256,
};
use reth_provider::{BlockProvider, BlockSource, CanonChainTracker, ProviderError};
use reth_rpc_types::engine::{
@ -151,6 +152,8 @@ where
sync: EngineSyncController<DB, Client>,
/// The type we can use to query both the database and the blockchain tree.
blockchain: BT,
/// Used for emitting updates about whether the engine is syncing or not.
sync_state_updater: Box<dyn NetworkSyncUpdater>,
/// The Engine API message receiver.
engine_message_rx: UnboundedReceiverStream<BeaconEngineMessage>,
/// A clone of the handle
@ -183,6 +186,7 @@ where
pipeline: Pipeline<DB>,
blockchain: BT,
task_spawner: Box<dyn TaskSpawner>,
sync_state_updater: Box<dyn NetworkSyncUpdater>,
max_block: Option<BlockNumber>,
run_pipeline_continuously: bool,
payload_builder: PayloadBuilderHandle,
@ -194,6 +198,7 @@ where
pipeline,
blockchain,
task_spawner,
sync_state_updater,
max_block,
run_pipeline_continuously,
payload_builder,
@ -211,6 +216,7 @@ where
pipeline: Pipeline<DB>,
blockchain: BT,
task_spawner: Box<dyn TaskSpawner>,
sync_state_updater: Box<dyn NetworkSyncUpdater>,
max_block: Option<BlockNumber>,
run_pipeline_continuously: bool,
payload_builder: PayloadBuilderHandle,
@ -229,6 +235,7 @@ where
db,
sync,
blockchain,
sync_state_updater,
engine_message_rx: UnboundedReceiverStream::new(rx),
handle: handle.clone(),
forkchoice_state: None,
@ -398,6 +405,8 @@ where
// properly updated
self.update_canon_chain(&state)?;
}
self.listeners.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state));
trace!(target: "consensus::engine", ?state, status = ?payload_response, "Returning forkchoice status");
return Ok(payload_response)
}
@ -428,6 +437,7 @@ where
}
/// Sets the state of the canon chain tracker based on the given forkchoice update.
/// Additionally, updates the head used for p2p handshakes.
///
/// This should be called before issuing a VALID forkchoice update.
fn update_canon_chain(&self, update: &ForkchoiceState) -> Result<(), reth_interfaces::Error> {
@ -458,8 +468,19 @@ where
.ok_or_else(|| {
Error::Provider(ProviderError::UnknownBlockHash(update.head_block_hash))
})?;
let head = head.header.seal(update.head_block_hash);
let head_td = self.blockchain.header_td_by_number(head.number)?.ok_or_else(|| {
Error::Provider(ProviderError::TotalDifficulty { number: head.number })
})?;
self.blockchain.set_canonical_head(head.header.seal(update.head_block_hash));
self.sync_state_updater.update_status(Head {
number: head.number,
hash: head.hash,
difficulty: head.difficulty,
timestamp: head.timestamp,
total_difficulty: head_td,
});
self.blockchain.set_canonical_head(head);
self.blockchain.on_forkchoice_update_received(update);
Ok(())
}
@ -678,6 +699,12 @@ where
block_number,
block_hash,
));
// Update the network sync state to `Idle`.
// Handles the edge case where the pipeline is never triggered, because we
// are sufficiently synced.
self.sync_state_updater.update_sync_state(SyncState::Idle);
PayloadStatusEnum::Valid
}
BlockStatus::Accepted => {
@ -753,11 +780,14 @@ where
if !self.try_insert_new_payload(block).is_valid() {
// if the payload is invalid, we run the pipeline
self.sync.set_pipeline_sync_target(hash);
} else {
self.sync_state_updater.update_sync_state(SyncState::Idle);
}
}
EngineSyncEvent::PipelineStarted(target) => {
trace!(target: "consensus::engine", ?target, continuous = target.is_none(), "Started the pipeline");
self.metrics.pipeline_runs.increment(1);
self.sync_state_updater.update_sync_state(SyncState::Syncing);
}
EngineSyncEvent::PipelineTaskDropped => {
error!(target: "consensus::engine", "Failed to receive spawned pipeline");
@ -774,11 +804,14 @@ where
return Some(Ok(()))
}
// Update the state and hashes of the blockchain tree if possible
if let Err(error) = self.restore_tree_if_possible(*current_state) {
error!(target: "consensus::engine", ?error, "Error restoring blockchain tree");
return Some(Err(error.into()))
}
// Update the state and hashes of the blockchain tree if possible.
match self.restore_tree_if_possible(*current_state) {
Ok(_) => self.sync_state_updater.update_sync_state(SyncState::Idle),
Err(error) => {
error!(target: "consensus::engine", ?error, "Error restoring blockchain tree");
return Some(Err(error.into()))
}
};
}
// Any pipeline error at this point is fatal.
Err(error) => return Some(Err(error.into())),
@ -877,7 +910,10 @@ mod tests {
BlockchainTree, ShareableBlockchainTree,
};
use reth_db::mdbx::{test_utils::create_test_rw_db, Env, WriteMap};
use reth_interfaces::test_utils::{NoopFullBlockClient, TestConsensus};
use reth_interfaces::{
sync::NoopSyncStateUpdater,
test_utils::{NoopFullBlockClient, TestConsensus},
};
use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_primitives::{ChainSpec, ChainSpecBuilder, SealedBlockWithSenders, H256, MAINNET};
use reth_provider::{
@ -999,6 +1035,7 @@ mod tests {
pipeline,
blockchain_provider,
Box::<TokioTaskExecutor>::default(),
Box::<NoopSyncStateUpdater>::default(),
None,
false,
payload_builder,

View File

@ -1,7 +1,7 @@
use crate::p2p::{download::DownloadClient, error::PeerRequestResult, priority::Priority};
use futures::{Future, FutureExt};
pub use reth_eth_wire::BlockHeaders;
use reth_primitives::{BlockHashOrNumber, Head, Header, HeadersDirection};
use reth_primitives::{BlockHashOrNumber, Header, HeadersDirection};
use std::{
fmt::Debug,
pin::Pin,
@ -84,18 +84,3 @@ where
Poll::Ready(resp)
}
}
/// The status updater for updating the status of the p2p node
pub trait StatusUpdater: Send + Sync {
/// Updates the status of the p2p node
fn update_status(&self, head: Head);
}
/// A [StatusUpdater] implementation that does nothing.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct NoopStatusUpdater;
impl StatusUpdater for NoopStatusUpdater {
fn update_status(&self, _: Head) {}
}

View File

@ -1,4 +1,5 @@
//! Traits used when interacting with the sync status of the network.
use reth_primitives::Head;
/// A type that provides information about whether the node is currently syncing and the network is
/// currently serving syncing related requests.
@ -8,7 +9,7 @@ pub trait SyncStateProvider: Send + Sync {
fn is_syncing(&self) -> bool;
}
/// An updater for updating the [SyncState] of the network.
/// An updater for updating the [SyncState] and status of the network.
///
/// The node is either syncing, or it is idle.
/// While syncing, the node will download data from the network and process it. The processing
@ -16,9 +17,12 @@ pub trait SyncStateProvider: Send + Sync {
/// Eventually the node reaches the `Finish` stage and will transition to [`SyncState::Idle`], it
/// which point the node is considered fully synced.
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait SyncStateUpdater: std::fmt::Debug + Send + Sync + 'static {
pub trait NetworkSyncUpdater: std::fmt::Debug + Send + Sync + 'static {
/// Notifies about an [SyncState] update.
fn update_sync_state(&self, state: SyncState);
/// Updates the status of the p2p node
fn update_status(&self, head: Head);
}
/// The state the network is currently in when it comes to synchronization.
@ -41,17 +45,18 @@ impl SyncState {
}
}
/// A [SyncStateUpdater] implementation that does nothing.
/// A [NetworkSyncUpdater] implementation that does nothing.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct NoopSyncStateUpdate;
pub struct NoopSyncStateUpdater;
impl SyncStateProvider for NoopSyncStateUpdate {
impl SyncStateProvider for NoopSyncStateUpdater {
fn is_syncing(&self) -> bool {
false
}
}
impl SyncStateUpdater for NoopSyncStateUpdate {
impl NetworkSyncUpdater for NoopSyncStateUpdater {
fn update_sync_state(&self, _state: SyncState) {}
fn update_status(&self, _: Head) {}
}

View File

@ -5,7 +5,7 @@ use crate::{
download::DownloadClient,
error::{DownloadError, DownloadResult, PeerRequestResult, RequestError},
headers::{
client::{HeadersClient, HeadersRequest, StatusUpdater},
client::{HeadersClient, HeadersRequest},
downloader::{validate_header_download, HeaderDownloader, SyncTarget},
},
priority::Priority,
@ -281,29 +281,6 @@ impl TestConsensus {
}
}
/// Status updater for testing.
///
/// [`TestStatusUpdater::new()`] creates a new [`TestStatusUpdater`] that is **not** shareable. This
/// struct wraps the sender side of a [`tokio::sync::watch`] channel. The receiving side of the
/// channel (which is shareable by cloning it) is also returned.
#[derive(Debug)]
pub struct TestStatusUpdater(tokio::sync::watch::Sender<Head>);
impl TestStatusUpdater {
/// Create a new test status updater and a receiver to listen to status updates on.
pub fn new() -> (Self, tokio::sync::watch::Receiver<Head>) {
let (tx, rx) = tokio::sync::watch::channel(Head::default());
(Self(tx), rx)
}
}
impl StatusUpdater for TestStatusUpdater {
fn update_status(&self, head: Head) {
self.0.send(head).expect("could not send status update");
}
}
#[async_trait::async_trait]
impl Consensus for TestConsensus {
fn validate_header(&self, _header: &SealedHeader) -> Result<(), ConsensusError> {

View File

@ -8,7 +8,7 @@ use reth_interfaces::{
headers::client::{HeadersClient, HeadersFut, HeadersRequest},
priority::Priority,
},
sync::{SyncState, SyncStateProvider, SyncStateUpdater},
sync::{NetworkSyncUpdater, SyncState, SyncStateProvider},
};
use reth_primitives::{
Block, BlockBody, BlockHash, BlockHashOrNumber, BlockNumber, Header, HeadersDirection, PeerId,
@ -54,9 +54,6 @@ pub struct FileClient {
/// The buffered bodies retrieved when fetching new headers.
bodies: HashMap<BlockHash, BlockBody>,
/// Represents if we are currently syncing.
is_syncing: Arc<AtomicBool>,
}
/// An error that can occur when constructing and using a [`FileClient`](FileClient).
@ -114,7 +111,7 @@ impl FileClient {
trace!(blocks = headers.len(), "Initialized file client");
Ok(Self { headers, hash_to_number, bodies, is_syncing: Arc::new(Default::default()) })
Ok(Self { headers, hash_to_number, bodies })
}
/// Get the tip hash of the chain.
@ -247,19 +244,6 @@ impl DownloadClient for FileClient {
}
}
impl SyncStateProvider for FileClient {
fn is_syncing(&self) -> bool {
self.is_syncing.load(Ordering::Relaxed)
}
}
impl SyncStateUpdater for FileClient {
fn update_sync_state(&self, state: SyncState) {
let is_syncing = state.is_syncing();
self.is_syncing.store(is_syncing, Ordering::Relaxed)
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -5,10 +5,7 @@ use crate::{
use async_trait::async_trait;
use parking_lot::Mutex;
use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes, SharedTransactions};
use reth_interfaces::{
p2p::headers::client::StatusUpdater,
sync::{SyncState, SyncStateProvider, SyncStateUpdater},
};
use reth_interfaces::sync::{NetworkSyncUpdater, SyncState, SyncStateProvider};
use reth_net_common::bandwidth_meter::BandwidthMeter;
use reth_network_api::{
NetworkError, NetworkInfo, PeerKind, Peers, PeersInfo, Reputation, ReputationChangeKind,
@ -243,24 +240,22 @@ impl NetworkInfo for NetworkHandle {
}
}
impl StatusUpdater for NetworkHandle {
/// Update the status of the node.
fn update_status(&self, head: Head) {
self.send_message(NetworkHandleMessage::StatusUpdate { head });
}
}
impl SyncStateProvider for NetworkHandle {
fn is_syncing(&self) -> bool {
self.inner.is_syncing.load(Ordering::Relaxed)
}
}
impl SyncStateUpdater for NetworkHandle {
impl NetworkSyncUpdater for NetworkHandle {
fn update_sync_state(&self, state: SyncState) {
let is_syncing = state.is_syncing();
self.inner.is_syncing.store(is_syncing, Ordering::Relaxed)
}
/// Update the status of the node.
fn update_status(&self, head: Head) {
self.send_message(NetworkHandleMessage::StatusUpdate { head });
}
}
#[derive(Debug)]

View File

@ -733,7 +733,7 @@ pub enum NetworkTransactionEvent {
mod tests {
use super::*;
use crate::{test_utils::Testnet, NetworkConfigBuilder, NetworkManager};
use reth_interfaces::sync::{SyncState, SyncStateUpdater};
use reth_interfaces::sync::{NetworkSyncUpdater, SyncState};
use reth_network_api::NetworkInfo;
use reth_provider::test_utils::NoopProvider;
use reth_rlp::Decodable;

View File

@ -7,7 +7,7 @@ use reth_discv4::Discv4Config;
use reth_eth_wire::DisconnectReason;
use reth_interfaces::{
p2p::headers::client::{HeadersClient, HeadersRequest},
sync::{SyncState, SyncStateUpdater},
sync::{NetworkSyncUpdater, SyncState},
};
use reth_net_common::ban_list::BanList;
use reth_network::{

View File

@ -24,7 +24,7 @@
//! # use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
//! # use reth_downloaders::headers::reverse_headers::ReverseHeadersDownloaderBuilder;
//! # use reth_interfaces::consensus::Consensus;
//! # use reth_interfaces::test_utils::{TestBodiesClient, TestConsensus, TestHeadersClient, TestStatusUpdater};
//! # use reth_interfaces::test_utils::{TestBodiesClient, TestConsensus, TestHeadersClient};
//! # use reth_revm::Factory;
//! # use reth_primitives::{PeerId, MAINNET, H256};
//! # use reth_stages::Pipeline;
@ -44,13 +44,12 @@
//! # );
//! # let (tip_tx, tip_rx) = watch::channel(H256::default());
//! # let factory = Factory::new(Arc::new(MAINNET.clone()));
//! # let (status_updater, _) = TestStatusUpdater::new();
//! // Create a pipeline that can fully sync
//! # let pipeline =
//! Pipeline::builder()
//! .with_tip_sender(tip_tx)
//! .add_stages(
//! DefaultStages::new(HeaderSyncMode::Tip(tip_rx), consensus, headers_downloader, bodies_downloader, status_updater, factory)
//! DefaultStages::new(HeaderSyncMode::Tip(tip_rx), consensus, headers_downloader, bodies_downloader, factory)
//! )
//! .build(db);
//! ```

View File

@ -1,6 +1,5 @@
use crate::{pipeline::BoxedStage, Pipeline, Stage, StageId, StageSet};
use reth_db::database::Database;
use reth_interfaces::sync::{NoopSyncStateUpdate, SyncStateUpdater};
use reth_primitives::{BlockNumber, H256};
use tokio::sync::watch;
@ -14,8 +13,6 @@ where
stages: Vec<BoxedStage<DB>>,
/// The maximum block number to sync to.
max_block: Option<BlockNumber>,
/// Used for emitting updates about whether the pipeline is running or not.
sync_state_updater: Box<dyn SyncStateUpdater>,
/// A receiver for the current chain tip to sync to
///
/// Note: this is only used for debugging purposes.
@ -63,22 +60,15 @@ where
self
}
/// Set a [SyncStateUpdater].
pub fn with_sync_state_updater<U: SyncStateUpdater>(mut self, updater: U) -> Self {
self.sync_state_updater = Box::new(updater);
self
}
/// Builds the final [`Pipeline`] using the given database.
///
/// Note: it's expected that this is either an [Arc](std::sync::Arc) or an Arc wrapper type.
pub fn build(self, db: DB) -> Pipeline<DB> {
let Self { stages, max_block, sync_state_updater, tip_tx } = self;
let Self { stages, max_block, tip_tx } = self;
Pipeline {
db,
stages,
max_block,
sync_state_updater,
tip_tx,
listeners: Default::default(),
progress: Default::default(),
@ -89,12 +79,7 @@ where
impl<DB: Database> Default for PipelineBuilder<DB> {
fn default() -> Self {
Self {
stages: Vec::new(),
max_block: None,
sync_state_updater: Box::<NoopSyncStateUpdate>::default(),
tip_tx: None,
}
Self { stages: Vec::new(), max_block: None, tip_tx: None }
}
}
@ -103,7 +88,6 @@ impl<DB: Database> std::fmt::Debug for PipelineBuilder<DB> {
f.debug_struct("PipelineBuilder")
.field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
.field("max_block", &self.max_block)
.field("sync_state_updater", &self.sync_state_updater)
.finish()
}
}

View File

@ -1,7 +1,6 @@
use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput};
use futures_util::Future;
use reth_db::database::Database;
use reth_interfaces::sync::{SyncState, SyncStateUpdater};
use reth_primitives::{listener::EventListeners, BlockNumber, H256};
use reth_provider::Transaction;
use std::{ops::Deref, pin::Pin};
@ -92,8 +91,6 @@ pub struct Pipeline<DB: Database> {
max_block: Option<BlockNumber>,
/// All listeners for events the pipeline emits.
listeners: EventListeners<PipelineEvent>,
/// Used for emitting updates about whether the pipeline is running or not.
sync_state_updater: Box<dyn SyncStateUpdater>,
/// Keeps track of the progress of the pipeline.
progress: PipelineProgress,
/// A receiver for the current chain tip to sync to
@ -202,13 +199,6 @@ where
let stage = &self.stages[stage_index];
let stage_id = stage.id();
// Update sync state
if stage_id.is_finish() {
self.sync_state_updater.update_sync_state(SyncState::Idle);
} else {
self.sync_state_updater.update_sync_state(SyncState::Syncing);
}
trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
let next = self
.execute_stage_to_completion(previous_stage, stage_index)
@ -225,8 +215,6 @@ where
}
ControlFlow::Continue { progress } => self.progress.update(progress),
ControlFlow::Unwind { target, bad_block } => {
// reset the sync state
self.sync_state_updater.update_sync_state(SyncState::Syncing);
self.unwind(target, bad_block).await?;
return Ok(ControlFlow::Unwind { target, bad_block })
}
@ -404,7 +392,6 @@ impl<DB: Database> std::fmt::Debug for Pipeline<DB> {
.field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
.field("max_block", &self.max_block)
.field("listeners", &self.listeners)
.field("sync_state_updater", &self.sync_state_updater)
.finish()
}
}

View File

@ -49,10 +49,7 @@ use crate::{
use reth_db::database::Database;
use reth_interfaces::{
consensus::Consensus,
p2p::{
bodies::downloader::BodyDownloader,
headers::{client::StatusUpdater, downloader::HeaderDownloader},
},
p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader},
};
use reth_provider::ExecutorFactory;
use std::sync::Arc;
@ -65,23 +62,20 @@ use std::sync::Arc;
/// - [`OfflineStages`]
/// - [`FinishStage`]
#[derive(Debug)]
pub struct DefaultStages<H, B, S, EF> {
pub struct DefaultStages<H, B, EF> {
/// Configuration for the online stages
online: OnlineStages<H, B>,
/// Executor factory needs for execution stage
executor_factory: EF,
/// Configuration for the [`FinishStage`] stage.
status_updater: S,
}
impl<H, B, S, EF> DefaultStages<H, B, S, EF> {
impl<H, B, EF> DefaultStages<H, B, EF> {
/// Create a new set of default stages with default values.
pub fn new(
header_mode: HeaderSyncMode,
consensus: Arc<dyn Consensus>,
header_downloader: H,
body_downloader: B,
status_updater: S,
executor_factory: EF,
) -> Self
where
@ -90,38 +84,32 @@ impl<H, B, S, EF> DefaultStages<H, B, S, EF> {
Self {
online: OnlineStages::new(header_mode, consensus, header_downloader, body_downloader),
executor_factory,
status_updater,
}
}
}
impl<H, B, S, EF> DefaultStages<H, B, S, EF>
impl<H, B, EF> DefaultStages<H, B, EF>
where
S: StatusUpdater + 'static,
EF: ExecutorFactory,
{
/// Appends the default offline stages and default finish stage to the given builder.
pub fn add_offline_stages<DB: Database>(
default_offline: StageSetBuilder<DB>,
status_updater: S,
executor_factory: EF,
) -> StageSetBuilder<DB> {
default_offline
.add_set(OfflineStages::new(executor_factory))
.add_stage(FinishStage::new(status_updater))
default_offline.add_set(OfflineStages::new(executor_factory)).add_stage(FinishStage)
}
}
impl<DB, H, B, S, EF> StageSet<DB> for DefaultStages<H, B, S, EF>
impl<DB, H, B, EF> StageSet<DB> for DefaultStages<H, B, EF>
where
DB: Database,
H: HeaderDownloader + 'static,
B: BodyDownloader + 'static,
S: StatusUpdater + 'static,
EF: ExecutorFactory,
{
fn builder(self) -> StageSetBuilder<DB> {
Self::add_offline_stages(self.online.builder(), self.status_updater, self.executor_factory)
Self::add_offline_stages(self.online.builder(), self.executor_factory)
}
}

View File

@ -1,7 +1,5 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use reth_db::database::Database;
use reth_interfaces::p2p::headers::client::StatusUpdater;
use reth_primitives::{BlockNumber, Head};
use reth_provider::Transaction;
/// The [`StageId`] of the finish stage.
@ -10,71 +8,29 @@ pub const FINISH: StageId = StageId("Finish");
/// The finish stage.
///
/// This stage does not write anything; it's checkpoint is used to denote the highest fully synced
/// block, which is communicated to the P2P networking component, as well as the RPC component.
///
/// The highest block is communicated via a [StatusUpdater] when the stage executes or unwinds.
///
/// When the node starts up, the checkpoint for this stage should be used send the initial status
/// update to the relevant components. Assuming the genesis block is written before this, it is safe
/// to default the stage checkpoint to block number 0 on startup.
/// block.
#[derive(Default, Debug, Clone)]
pub struct FinishStage<S> {
updater: S,
}
impl<S> FinishStage<S>
where
S: StatusUpdater,
{
/// Create a new [FinishStage] with the given [StatusUpdater].
pub fn new(updater: S) -> Self {
Self { updater }
}
/// Construct a [Head] from `block_number`.
fn fetch_head<DB: Database>(
&self,
tx: &mut Transaction<'_, DB>,
block_number: BlockNumber,
) -> Result<Head, StageError> {
let header = tx.get_header(block_number)?;
let hash = tx.get_block_hash(block_number)?;
let total_difficulty = tx.get_td(block_number)?;
Ok(Head {
number: block_number,
hash,
total_difficulty,
difficulty: header.difficulty,
timestamp: header.timestamp,
})
}
}
pub struct FinishStage;
#[async_trait::async_trait]
impl<DB: Database, S> Stage<DB> for FinishStage<S>
where
S: StatusUpdater,
{
impl<DB: Database> Stage<DB> for FinishStage {
fn id(&self) -> StageId {
FINISH
}
async fn execute(
&mut self,
tx: &mut Transaction<'_, DB>,
_tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
self.updater.update_status(self.fetch_head(tx, input.previous_stage_progress())?);
Ok(ExecOutput { done: true, stage_progress: input.previous_stage_progress() })
}
async fn unwind(
&mut self,
tx: &mut Transaction<'_, DB>,
_tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
self.updater.update_status(self.fetch_head(tx, input.unwind_to)?);
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
}
@ -86,52 +42,30 @@ mod tests {
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
TestTransaction, UnwindStageTestRunner,
};
use reth_interfaces::test_utils::{
generators::{random_header, random_header_range},
TestStatusUpdater,
};
use reth_interfaces::test_utils::generators::{random_header, random_header_range};
use reth_primitives::SealedHeader;
use std::sync::Mutex;
stage_test_suite_ext!(FinishTestRunner, finish);
struct FinishTestRunner {
tx: TestTransaction,
status: Mutex<Option<tokio::sync::watch::Receiver<Head>>>,
}
impl FinishTestRunner {
/// Gets the current status.
///
/// # Panics
///
/// Panics if multiple threads try to read the status at the same time, or if no status
/// receiver has been set.
fn current_status(&self) -> Head {
let status_lock = self.status.try_lock().expect("competing for status lock");
let status = status_lock.as_ref().expect("no status receiver set").borrow();
*status
}
}
impl Default for FinishTestRunner {
fn default() -> Self {
FinishTestRunner { tx: TestTransaction::default(), status: Mutex::new(None) }
FinishTestRunner { tx: TestTransaction::default() }
}
}
impl StageTestRunner for FinishTestRunner {
type S = FinishStage<TestStatusUpdater>;
type S = FinishStage;
fn tx(&self) -> &TestTransaction {
&self.tx
}
fn stage(&self) -> Self::S {
let (status_updater, status) = TestStatusUpdater::new();
let mut status_container = self.status.try_lock().expect("competing for status lock");
*status_container = Some(status);
FinishStage::new(status_updater)
FinishStage
}
}
@ -168,23 +102,13 @@ mod tests {
input.previous_stage_progress(),
"stage progress should always match progress of previous stage"
);
assert_eq!(
self.current_status().number,
output.stage_progress,
"incorrect block number in status update"
);
}
Ok(())
}
}
impl UnwindStageTestRunner for FinishTestRunner {
fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
assert_eq!(
self.current_status().number,
input.unwind_to,
"incorrect block number in status update"
);
fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
Ok(())
}
}