From c23c65fc3b1760bcfbf4848885ea3bb02991b52f Mon Sep 17 00:00:00 2001 From: mempirate Date: Thu, 15 Dec 2022 08:59:28 +0100 Subject: [PATCH] feat(net): draft for sending status updates through `NetworkHandle` (#436) * feat(net): draft for sending status updates through `NetworkHandle` * feat(net): draft for sending status updates through `NetworkHandle` * fix(net): remove unused import * feat(net): implement getters for status update sender half * docs(net): document methods * chore: cargo fmt * feat(net): move status updating logic to NetworkManager and NetworkHandle * feat(net): move status updating logic to NetworkManager and NetworkHandle * fix(net): fix headers stage testing * fix: derive default * fix: remove StatusUpdate struct Co-authored-by: Matthias Seitz --- Cargo.lock | 1 + bin/reth/src/node/mod.rs | 1 + crates/interfaces/src/p2p/headers/client.rs | 11 +++---- crates/interfaces/src/test_utils/headers.rs | 14 ++++++--- crates/net/network/src/fetch/client.rs | 10 ++----- crates/net/network/src/fetch/mod.rs | 28 ++---------------- crates/net/network/src/manager.rs | 11 ++----- crates/net/network/src/network.rs | 20 +++++++++++-- crates/net/network/src/peers/manager.rs | 2 +- crates/net/network/src/session/mod.rs | 11 ++++--- crates/net/network/src/state.rs | 8 +----- crates/net/network/src/swarm.rs | 4 --- crates/stages/Cargo.toml | 5 ++-- crates/stages/src/stages/headers.rs | 32 +++++++++++++++------ 14 files changed, 76 insertions(+), 82 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 04ec0d89f..86d7f8c94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3635,6 +3635,7 @@ dependencies = [ "reth-eth-wire", "reth-executor", "reth-interfaces", + "reth-network", "reth-primitives", "reth-provider", "reth-rlp", diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index da6bd5efc..b42694336 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -63,6 +63,7 @@ impl Command { .build(consensus.clone(), fetch_client.clone()), consensus: consensus.clone(), client: fetch_client.clone(), + network_handle: network.clone(), commit_threshold: 100, }, false, diff --git a/crates/interfaces/src/p2p/headers/client.rs b/crates/interfaces/src/p2p/headers/client.rs index a5f525044..3f60c7708 100644 --- a/crates/interfaces/src/p2p/headers/client.rs +++ b/crates/interfaces/src/p2p/headers/client.rs @@ -20,12 +20,13 @@ pub struct HeadersRequest { #[async_trait] #[auto_impl::auto_impl(&, Arc, Box)] pub trait HeadersClient: Send + Sync + Debug { - /// Update the node's Status message. - /// - /// The updated Status message will be used during any new eth/65 handshakes. - fn update_status(&self, height: u64, hash: H256, td: U256); - /// Sends the header request to the p2p network and returns the header response received from a /// peer. async fn get_headers(&self, request: HeadersRequest) -> PeerRequestResult; } + +/// The status updater for updating the status of the p2p node +pub trait StatusUpdater: Send + Sync { + /// Updates the status of the p2p node + fn update_status(&self, height: u64, hash: H256, total_difficulty: U256); +} diff --git a/crates/interfaces/src/test_utils/headers.rs b/crates/interfaces/src/test_utils/headers.rs index 796c44c5b..9ad8ddfdd 100644 --- a/crates/interfaces/src/test_utils/headers.rs +++ b/crates/interfaces/src/test_utils/headers.rs @@ -5,7 +5,7 @@ use crate::{ downloader::{DownloadStream, Downloader}, error::{PeerRequestResult, RequestError}, headers::{ - client::{HeadersClient, HeadersRequest}, + client::{HeadersClient, HeadersRequest, StatusUpdater}, downloader::HeaderDownloader, error::DownloadError, }, @@ -14,7 +14,7 @@ use crate::{ use futures::{Future, FutureExt, Stream}; use reth_eth_wire::BlockHeaders; use reth_primitives::{ - BlockLocked, BlockNumber, Header, HeadersDirection, PeerId, SealedHeader, H256, U256, + BlockLocked, BlockNumber, Header, HeadersDirection, PeerId, SealedHeader, H256, }; use reth_rpc_types::engine::ForkchoiceState; use std::{ @@ -170,8 +170,6 @@ impl TestHeadersClient { #[async_trait::async_trait] impl HeadersClient for TestHeadersClient { - fn update_status(&self, _height: u64, _hash: H256, _td: U256) {} - async fn get_headers(&self, request: HeadersRequest) -> PeerRequestResult { if let Some(err) = &mut *self.error.lock().await { return Err(err.clone()) @@ -228,6 +226,14 @@ impl TestConsensus { } } +/// Nil status updater for testing +#[derive(Debug, Clone, Default)] +pub struct TestStatusUpdater; + +impl StatusUpdater for TestStatusUpdater { + fn update_status(&self, _height: u64, _hash: H256, _total_difficulty: reth_primitives::U256) {} +} + #[async_trait::async_trait] impl Consensus for TestConsensus { fn fork_choice_state(&self) -> watch::Receiver { diff --git a/crates/net/network/src/fetch/client.rs b/crates/net/network/src/fetch/client.rs index 341de0ec6..3977bebac 100644 --- a/crates/net/network/src/fetch/client.rs +++ b/crates/net/network/src/fetch/client.rs @@ -1,13 +1,13 @@ //! A client implementation that can interact with the network and download data. -use crate::fetch::{DownloadRequest, StatusUpdate}; +use crate::fetch::DownloadRequest; use reth_eth_wire::{BlockBody, BlockHeaders}; use reth_interfaces::p2p::{ bodies::client::BodiesClient, error::PeerRequestResult, headers::client::{HeadersClient, HeadersRequest}, }; -use reth_primitives::{WithPeerId, H256, U256}; +use reth_primitives::{WithPeerId, H256}; use tokio::sync::{mpsc::UnboundedSender, oneshot}; /// Front-end API for fetching data from the network. @@ -15,16 +15,10 @@ use tokio::sync::{mpsc::UnboundedSender, oneshot}; pub struct FetchClient { /// Sender half of the request channel. pub(crate) request_tx: UnboundedSender, - /// Sender for sending Status updates - pub(crate) status_tx: UnboundedSender, } #[async_trait::async_trait] impl HeadersClient for FetchClient { - fn update_status(&self, height: u64, hash: H256, total_difficulty: U256) { - let _ = self.status_tx.send(StatusUpdate { height, hash, total_difficulty }); - } - /// Sends a `GetBlockHeaders` request to an available peer. async fn get_headers(&self, request: HeadersRequest) -> PeerRequestResult { let (response, rx) = oneshot::channel(); diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index 2a92f23f9..21aa312e8 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -7,7 +7,7 @@ use reth_interfaces::p2p::{ error::{PeerRequestResult, RequestError, RequestResult}, headers::client::HeadersRequest, }; -use reth_primitives::{Header, PeerId, H256, U256}; +use reth_primitives::{Header, PeerId, H256}; use std::{ collections::{HashMap, VecDeque}, task::{Context, Poll}, @@ -38,10 +38,6 @@ pub struct StateFetcher { download_requests_rx: UnboundedReceiverStream, /// Sender for download requests, used to detach a [`FetchClient`] download_requests_tx: UnboundedSender, - /// Receiver for new incoming [`StatusUpdate`] requests. - status_rx: UnboundedReceiverStream, - /// Sender for updating the status, used to detach a [`FetchClient`] - status_tx: UnboundedSender, } // === impl StateSyncer === @@ -120,10 +116,6 @@ impl StateFetcher { PollAction::NoPeersAvailable => true, }; - if let Poll::Ready(Some(status)) = self.status_rx.poll_next_unpin(cx) { - return Poll::Ready(FetchAction::StatusUpdate(status)) - } - loop { // poll incoming requests match self.download_requests_rx.poll_next_unpin(cx) { @@ -229,17 +221,13 @@ impl StateFetcher { /// Returns a new [`FetchClient`] that can send requests to this type. pub(crate) fn client(&self) -> FetchClient { - FetchClient { - request_tx: self.download_requests_tx.clone(), - status_tx: self.status_tx.clone(), - } + FetchClient { request_tx: self.download_requests_tx.clone() } } } impl Default for StateFetcher { fn default() -> Self { let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel(); - let (status_tx, status_rx) = mpsc::unbounded_channel(); Self { inflight_headers_requests: Default::default(), inflight_bodies_requests: Default::default(), @@ -247,8 +235,6 @@ impl Default for StateFetcher { queued_requests: Default::default(), download_requests_rx: UnboundedReceiverStream::new(download_requests_rx), download_requests_tx, - status_rx: UnboundedReceiverStream::new(status_rx), - status_tx, } } } @@ -314,14 +300,6 @@ struct Request { response: oneshot::Sender, } -/// A message to update the status. -#[derive(Debug, Clone)] -pub(crate) struct StatusUpdate { - pub(crate) height: u64, - pub(crate) hash: H256, - pub(crate) total_difficulty: U256, -} - /// Requests that can be sent to the Syncer from a [`FetchClient`] pub(crate) enum DownloadRequest { /// Download the requested headers and send response through channel @@ -357,8 +335,6 @@ pub(crate) enum FetchAction { /// The request to send request: BlockRequest, }, - /// Propagate a received status update for the node - StatusUpdate(StatusUpdate), } /// Outcome of a processed response. diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 65ee18bf0..bfbd8045b 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -456,6 +456,9 @@ where NetworkHandleMessage::FetchClient(tx) => { let _ = tx.send(self.fetch_client()); } + NetworkHandleMessage::StatusUpdate { height, hash, total_difficulty } => { + self.swarm.sessions_mut().on_status_update(height, hash, total_difficulty); + } } } } @@ -612,14 +615,6 @@ where .peers_mut() .apply_reputation_change(&peer_id, ReputationChangeKind::FailedToConnect); } - SwarmEvent::StatusUpdate(status) => { - trace!( - target : "net", - ?status, - "Status Update received" - ); - this.swarm.sessions_mut().on_status_update(status.clone()) - } } } diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 12427e21d..8981da8d9 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -7,7 +7,8 @@ use crate::{ }; use parking_lot::Mutex; use reth_eth_wire::{NewBlock, NewPooledTransactionHashes, SharedTransactions}; -use reth_primitives::{PeerId, TransactionSigned, TxHash, H256}; +use reth_interfaces::p2p::headers::client::StatusUpdater; +use reth_primitives::{PeerId, TransactionSigned, TxHash, H256, U256}; use std::{ net::SocketAddr, sync::{ @@ -21,7 +22,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream; /// A _shareable_ network frontend. Used to interact with the network. /// /// See also [`NetworkManager`](crate::NetworkManager). -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct NetworkHandle { /// The Arc'ed delegate that contains the state. inner: Arc, @@ -102,6 +103,11 @@ impl NetworkHandle { let _ = self.inner.to_manager_tx.send(msg); } + /// Update the status of the node. + pub fn update_status(&self, height: u64, hash: H256, total_difficulty: U256) { + self.send_message(NetworkHandleMessage::StatusUpdate { height, hash, total_difficulty }); + } + /// Announce a block over devp2p /// /// Caution: in PoS this is a noop, since new block propagation will happen over devp2p @@ -148,6 +154,14 @@ impl NetworkHandle { } } +impl StatusUpdater for NetworkHandle { + /// Update the status of the node. + fn update_status(&self, height: u64, hash: H256, total_difficulty: U256) { + self.send_message(NetworkHandleMessage::StatusUpdate { height, hash, total_difficulty }); + } +} + +#[derive(Debug)] struct NetworkInner { /// Number of active peer sessions the node's currently handling. num_active_peers: Arc, @@ -189,4 +203,6 @@ pub(crate) enum NetworkHandleMessage { ReputationChange(PeerId, ReputationChangeKind), /// Returns the client that can be used to interact with the network. FetchClient(oneshot::Sender), + /// Apply a status update. + StatusUpdate { height: u64, hash: H256, total_difficulty: U256 }, } diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index b59ab1a66..48f1dd2ed 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -22,7 +22,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::trace; /// A communication channel to the [`PeersManager`] to apply manual changes to the peer set. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct PeersHandle { /// Sender half of command channel back to the [`PeersManager`] manager_tx: mpsc::UnboundedSender, diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index e4739b824..721fbbd4c 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -1,7 +1,6 @@ //! Support for handling peer sessions. pub use crate::message::PeerRequestSender; use crate::{ - fetch::StatusUpdate, message::PeerMessage, session::{ active::ActiveSession, @@ -20,7 +19,7 @@ use reth_eth_wire::{ error::EthStreamError, DisconnectReason, HelloMessage, Status, UnauthedEthStream, UnauthedP2PStream, }; -use reth_primitives::{ForkFilter, Hardfork, PeerId}; +use reth_primitives::{ForkFilter, Hardfork, PeerId, H256, U256}; use secp256k1::SecretKey; use std::{ collections::HashMap, @@ -150,10 +149,10 @@ impl SessionManager { } /// Invoked on a received status update - pub(crate) fn on_status_update(&mut self, status: StatusUpdate) { - self.status.blockhash = status.hash; - self.status.total_difficulty = status.total_difficulty; - self.fork_filter.set_head(status.height); + pub(crate) fn on_status_update(&mut self, height: u64, hash: H256, total_difficulty: U256) { + self.status.blockhash = hash; + self.status.total_difficulty = total_difficulty; + self.fork_filter.set_head(height); } /// An incoming TCP connection was received. This starts the authentication process to turn this diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index 844d6f66b..7b41b464d 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -3,7 +3,7 @@ use crate::{ cache::LruCache, discovery::{Discovery, DiscoveryEvent}, - fetch::{BlockResponseOutcome, FetchAction, StateFetcher, StatusUpdate}, + fetch::{BlockResponseOutcome, FetchAction, StateFetcher}, message::{ BlockRequest, NewBlockMessage, PeerRequest, PeerRequestSender, PeerResponse, PeerResponseResult, @@ -353,10 +353,6 @@ where FetchAction::BlockRequest { peer_id, request } => { self.handle_block_request(peer_id, request) } - FetchAction::StatusUpdate(status) => { - // we want to return this directly - return Poll::Ready(StateAction::StatusUpdate(status)) - } } } @@ -427,8 +423,6 @@ pub(crate) struct ActivePeer { /// Message variants triggered by the [`State`] pub(crate) enum StateAction { - /// Received a node status update. - StatusUpdate(StatusUpdate), /// Dispatch a `NewBlock` message to the peer NewBlock { /// Target of the message diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index d73c72d69..dc5efe58f 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -1,5 +1,4 @@ use crate::{ - fetch::StatusUpdate, listener::{ConnectionListener, ListenerEvent}, message::{PeerMessage, PeerRequestSender}, peers::InboundConnectionError, @@ -221,7 +220,6 @@ where let msg = PeerMessage::NewBlockHashes(hashes); self.sessions.send_message(&peer_id, msg); } - StateAction::StatusUpdate(status) => return Some(SwarmEvent::StatusUpdate(status)), } None } @@ -279,8 +277,6 @@ where /// All events created or delegated by the [`Swarm`] that represents changes to the state of the /// network. pub(crate) enum SwarmEvent { - /// Received a node status update. - StatusUpdate(StatusUpdate), /// Events related to the actual network protocol. ValidMessage { /// The peer that sent the message diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml index 9481adb88..574ec2afc 100644 --- a/crates/stages/Cargo.toml +++ b/crates/stages/Cargo.toml @@ -14,7 +14,8 @@ reth-interfaces = { path = "../interfaces" } reth-executor = { path = "../executor" } reth-rlp = { path = "../common/rlp" } reth-db = { path = "../storage/db" } -reth-provider = { path = "../storage/provider"} +reth-provider = { path = "../storage/provider" } +reth-network = { path = "../net/network" } #async tokio = { version = "1.21.2", features = ["sync"] } @@ -34,7 +35,7 @@ rayon = "1.6.0" reth-db = { path = "../storage/db", features = ["test-utils", "mdbx"] } reth-interfaces = { path = "../interfaces", features = ["test-utils"] } reth-downloaders = { path = "../net/downloaders" } -reth-eth-wire = { path = "../net/eth-wire" } # TODO(onbjerg): We only need this for [BlockBody] +reth-eth-wire = { path = "../net/eth-wire" } # TODO(onbjerg): We only need this for [BlockBody] tokio = { version = "*", features = ["rt", "sync", "macros"] } tokio-stream = "0.1.10" tempfile = "3.3.0" diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 1a3ccdfb9..7e59fd82a 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -13,7 +13,7 @@ use reth_db::{ use reth_interfaces::{ consensus::{Consensus, ForkchoiceState}, p2p::headers::{ - client::HeadersClient, + client::{HeadersClient, StatusUpdater}, downloader::{ensure_parent, HeaderDownloader}, error::DownloadError, }, @@ -40,20 +40,22 @@ const HEADERS: StageId = StageId("Headers"); /// [`HeaderTD`][reth_interfaces::db::tables::HeaderTD] table). The stage does not return the /// control flow to the pipeline in order to preserve the context of the chain tip. #[derive(Debug)] -pub struct HeaderStage { +pub struct HeaderStage { /// Strategy for downloading the headers pub downloader: D, /// Consensus client implementation pub consensus: Arc, /// Downloader client implementation pub client: Arc, + /// Network handle for updating status + pub network_handle: S, /// The number of block headers to commit at once pub commit_threshold: usize, } #[async_trait::async_trait] -impl Stage - for HeaderStage +impl Stage + for HeaderStage { /// Return the id of the stage fn id(&self) -> StageId { @@ -146,7 +148,9 @@ impl Stage HeaderStage { +impl + HeaderStage +{ async fn update_head( &self, db: &StageDB<'_, DB>, @@ -156,7 +160,8 @@ impl HeaderStage { let td: U256 = *db .get::(block_key)? .ok_or(DatabaseIntegrityError::TotalDifficulty { number: height })?; - self.client.update_status(height, block_key.hash(), td); + // self.client.update_status(height, block_key.hash(), td); + self.network_handle.update_status(height, block_key.hash(), td); Ok(()) } @@ -366,7 +371,7 @@ mod tests { p2p::headers::downloader::HeaderDownloader, test_utils::{ generators::{random_header, random_header_range}, - TestConsensus, TestHeaderDownloader, TestHeadersClient, + TestConsensus, TestHeaderDownloader, TestHeadersClient, TestStatusUpdater, }, }; use reth_primitives::{BlockNumber, SealedHeader, U256}; @@ -376,6 +381,7 @@ mod tests { pub(crate) consensus: Arc, pub(crate) client: Arc, downloader: Arc, + network_handle: TestStatusUpdater, db: TestStageDB, } @@ -387,13 +393,14 @@ mod tests { client: client.clone(), consensus: consensus.clone(), downloader: Arc::new(TestHeaderDownloader::new(client, consensus, 1000)), + network_handle: TestStatusUpdater::default(), db: TestStageDB::default(), } } } impl StageTestRunner for HeadersTestRunner { - type S = HeaderStage, TestConsensus, TestHeadersClient>; + type S = HeaderStage, TestConsensus, TestHeadersClient, TestStatusUpdater>; fn db(&self) -> &TestStageDB { &self.db @@ -404,6 +411,7 @@ mod tests { consensus: self.consensus.clone(), client: self.client.clone(), downloader: self.downloader.clone(), + network_handle: self.network_handle.clone(), commit_threshold: 100, } } @@ -504,7 +512,13 @@ mod tests { let downloader = Arc::new( LinearDownloadBuilder::default().build(consensus.clone(), client.clone()), ); - Self { client, consensus, downloader, db: TestStageDB::default() } + Self { + client, + consensus, + downloader, + network_handle: TestStatusUpdater::default(), + db: TestStageDB::default(), + } } }