diff --git a/crates/interfaces/src/test_utils/api.rs b/crates/interfaces/src/test_utils/api.rs index fa9b6383d..dc6247605 100644 --- a/crates/interfaces/src/test_utils/api.rs +++ b/crates/interfaces/src/test_utils/api.rs @@ -1,5 +1,8 @@ -use crate::{provider, provider::BlockProvider}; -use reth_primitives::{rpc::BlockId, Block, BlockNumber, H256, U256}; +use crate::{ + provider, + provider::{BlockProvider, HeaderProvider}, +}; +use reth_primitives::{rpc::BlockId, Block, BlockHash, BlockNumber, Header, H256, U256}; /// Supports various api interfaces for testing purposes. #[derive(Debug, Clone, Default)] @@ -29,3 +32,13 @@ impl BlockProvider for TestApi { Ok(None) } } + +impl HeaderProvider for TestApi { + fn header(&self, _block_hash: &BlockHash) -> crate::Result> { + Ok(None) + } + + fn header_by_number(&self, _num: u64) -> crate::Result> { + Ok(None) + } +} diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index 6ec0b7d99..45b207ad2 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -108,46 +108,35 @@ impl StateFetcher { Some(FetchAction::BlockRequest { peer_id, request }) } - /// Received a request via a downloader - fn on_download_request(&mut self, request: DownloadRequest) -> Option { - match request { - DownloadRequest::GetBlockHeaders { request: _, response: _ } => {} - DownloadRequest::GetBlockBodies { .. } => {} - } - None - } - /// Advance the state the syncer pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { // drain buffered actions first - if let Some(action) = self.poll_action() { - return Poll::Ready(action) - } - - if let Poll::Ready(Some(status)) = self.status_rx.poll_next_unpin(cx) { - return Poll::Ready(FetchAction::StatusUpdate(status)) - } - loop { - // poll incoming requests - match self.download_requests_rx.poll_next_unpin(cx) { - Poll::Ready(Some(request)) => { - if let Some(action) = self.on_download_request(request) { - return Poll::Ready(action) + if let Some(action) = self.poll_action() { + return Poll::Ready(action) + } + + if let Poll::Ready(Some(status)) = self.status_rx.poll_next_unpin(cx) { + return Poll::Ready(FetchAction::StatusUpdate(status)) + } + + loop { + // poll incoming requests + match self.download_requests_rx.poll_next_unpin(cx) { + Poll::Ready(Some(request)) => { + self.queued_requests.push_back(request); } + Poll::Ready(None) => { + unreachable!("channel can't close") + } + Poll::Pending => break, } - Poll::Ready(None) => { - unreachable!("channel can't close") - } - Poll::Pending => break, + } + + if self.queued_requests.is_empty() { + return Poll::Pending } } - - if self.queued_requests.is_empty() { - return Poll::Pending - } - - Poll::Pending } /// Handles a new request to a peer. diff --git a/crates/net/network/tests/it/main.rs b/crates/net/network/tests/it/main.rs index da8117f5a..d219480a6 100644 --- a/crates/net/network/tests/it/main.rs +++ b/crates/net/network/tests/it/main.rs @@ -1,4 +1,5 @@ mod connect; +mod requests; mod testnet; pub use testnet::*; diff --git a/crates/net/network/tests/it/requests.rs b/crates/net/network/tests/it/requests.rs new file mode 100644 index 000000000..9c2b04bfa --- /dev/null +++ b/crates/net/network/tests/it/requests.rs @@ -0,0 +1,120 @@ +//! Tests for eth related requests + +use super::testnet::Testnet; +use crate::{MockEthProvider, NetworkEventStream}; +use rand::Rng; +use reth_eth_wire::BlockBody; +use reth_interfaces::p2p::{ + bodies::client::BodiesClient, + headers::client::{HeadersClient, HeadersRequest}, +}; +use reth_primitives::{ + Block, Bytes, Header, Signature, Transaction, TransactionKind, TransactionSigned, TxEip2930, + H256, U256, +}; +use std::sync::Arc; + +/// Returns a new [`TransactionSigned`] with some random parameters +pub fn rng_transaction(rng: &mut impl rand::RngCore) -> TransactionSigned { + let request = Transaction::Eip2930(TxEip2930 { + chain_id: rng.gen(), + nonce: rng.gen(), + gas_price: rng.gen(), + gas_limit: rng.gen(), + to: TransactionKind::Create, + value: rng.gen(), + input: Bytes::from(vec![1, 2]), + access_list: Default::default(), + }); + let signature = Signature { odd_y_parity: true, r: U256::default(), s: U256::default() }; + + TransactionSigned::from_transaction_and_signature(request, signature) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_get_body() { + reth_tracing::init_tracing(); + let mut rng = rand::thread_rng(); + let mock_provider = Arc::new(MockEthProvider::default()); + + let mut net = Testnet::create_with(2, mock_provider.clone()).await; + + // install request handlers + net.for_each_mut(|peer| peer.install_request_handler()); + + let handle0 = net.peers()[0].handle(); + let mut events0 = NetworkEventStream::new(handle0.event_listener()); + + let handle1 = net.peers()[1].handle(); + + let _handle = net.spawn(); + + let fetch0 = handle0.fetch_client().await.unwrap(); + + handle0.add_peer(*handle1.peer_id(), handle1.local_addr()); + let connected = events0.next_session_established().await.unwrap(); + assert_eq!(connected, *handle1.peer_id()); + + // request some blocks + for _ in 0..100 { + // Set a new random block to the mock storage and request it via the network + let block_hash = H256::random(); + let mut block = Block::default(); + block.body.push(rng_transaction(&mut rng)); + + mock_provider.add_block(block_hash, block.clone()); + + let res = fetch0.get_block_body(vec![block_hash]).await; + assert!(res.is_ok()); + + let blocks = res.unwrap(); + assert_eq!(blocks.len(), 1); + let expected = BlockBody { transactions: block.body, ommers: block.ommers }; + assert_eq!(blocks[0], expected); + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_get_header() { + reth_tracing::init_tracing(); + let mut rng = rand::thread_rng(); + let mock_provider = Arc::new(MockEthProvider::default()); + + let mut net = Testnet::create_with(2, mock_provider.clone()).await; + + // install request handlers + net.for_each_mut(|peer| peer.install_request_handler()); + + let handle0 = net.peers()[0].handle(); + let mut events0 = NetworkEventStream::new(handle0.event_listener()); + + let handle1 = net.peers()[1].handle(); + + let _handle = net.spawn(); + + let fetch0 = handle0.fetch_client().await.unwrap(); + + handle0.add_peer(*handle1.peer_id(), handle1.local_addr()); + let connected = events0.next_session_established().await.unwrap(); + assert_eq!(connected, *handle1.peer_id()); + + let start: u64 = rng.gen(); + let mut hash = H256::random(); + // request some headers + for idx in 0..100 { + // Set a new random header to the mock storage and request it via the network + let header = Header { number: start + idx, parent_hash: hash, ..Default::default() }; + hash = H256::random(); + + mock_provider.add_header(hash, header.clone()); + + let req = HeadersRequest { start: hash.into(), limit: 1, reverse: false }; + + let res = fetch0.get_headers(req).await; + assert!(res.is_ok()); + + let headers = res.unwrap().0; + assert_eq!(headers.len(), 1); + assert_eq!(headers[0], header); + } +} diff --git a/crates/net/network/tests/it/testnet.rs b/crates/net/network/tests/it/testnet.rs index 54af83682..7797def48 100644 --- a/crates/net/network/tests/it/testnet.rs +++ b/crates/net/network/tests/it/testnet.rs @@ -1,11 +1,23 @@ //! A network implementation for testing purposes. -use futures::FutureExt; +use futures::{FutureExt, StreamExt}; +use parking_lot::Mutex; use pin_project::pin_project; -use reth_interfaces::{provider::BlockProvider, test_utils::TestApi}; -use reth_network::{error::NetworkError, NetworkConfig, NetworkHandle, NetworkManager}; +use reth_interfaces::{ + provider::{BlockProvider, ChainInfo, HeaderProvider}, + test_utils::TestApi, +}; +use reth_network::{ + error::NetworkError, eth_requests::EthRequestHandler, NetworkConfig, NetworkEvent, + NetworkHandle, NetworkManager, +}; +use reth_primitives::{ + rpc::{BlockId, BlockNumber}, + Block, BlockHash, Header, PeerId, H256, U256, +}; use secp256k1::SecretKey; use std::{ + collections::HashMap, fmt, future::Future, net::{Ipv4Addr, SocketAddr, SocketAddrV4}, @@ -13,7 +25,11 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use tokio::{sync::oneshot, task::JoinHandle}; +use tokio::{ + sync::{mpsc::unbounded_channel, oneshot}, + task::JoinHandle, +}; +use tokio_stream::wrappers::UnboundedReceiverStream; /// A test network consisting of multiple peers. #[derive(Default)] @@ -26,8 +42,23 @@ pub struct Testnet { impl Testnet where - C: BlockProvider, + C: BlockProvider + HeaderProvider, { + /// Same as [`Self::try_create_with`] but panics on error + pub async fn create_with(num_peers: usize, provider: Arc) -> Self { + Self::try_create_with(num_peers, provider).await.unwrap() + } + + /// Creates a new [`Testnet`] with the given number of peers and the provider. + pub async fn try_create_with(num_peers: usize, provider: Arc) -> Result { + let mut this = Self { peers: Vec::with_capacity(num_peers) }; + for _ in 0..num_peers { + let config = PeerConfig::new(Arc::clone(&provider)); + this.add_peer_with_config(config).await?; + } + Ok(this) + } + pub fn peers_mut(&mut self) -> &mut [Peer] { &mut self.peers } @@ -51,7 +82,7 @@ where let PeerConfig { config, client, secret_key } = config; let network = NetworkManager::new(config).await?; - let peer = Peer { network, client, secret_key }; + let peer = Peer { network, client, secret_key, request_handler: None }; self.peers.push(peer); Ok(()) } @@ -68,11 +99,19 @@ where { self.peers.iter().for_each(f) } + + /// Apply a closure on each peer + pub fn for_each_mut(&mut self, f: F) + where + F: FnMut(&mut Peer), + { + self.peers.iter_mut().for_each(f) + } } impl Testnet where - C: BlockProvider + 'static, + C: BlockProvider + HeaderProvider + 'static, { /// Spawns the testnet to a separate task pub fn spawn(self) -> TestnetHandle { @@ -126,7 +165,7 @@ impl fmt::Debug for Testnet { impl Future for Testnet where - C: BlockProvider, + C: BlockProvider + HeaderProvider, { type Output = (); @@ -159,6 +198,8 @@ impl TestnetHandle { pub struct Peer { #[pin] network: NetworkManager, + #[pin] + request_handler: Option>, client: Arc, secret_key: SecretKey, } @@ -167,7 +208,7 @@ pub struct Peer { impl Peer where - C: BlockProvider, + C: BlockProvider + HeaderProvider, { pub fn num_peers(&self) -> usize { self.network.num_connected_peers() @@ -181,16 +222,31 @@ where pub fn handle(&self) -> NetworkHandle { self.network.handle().clone() } + + /// Set a new request handler that's connected tot the peer's network + pub fn install_request_handler(&mut self) { + let (tx, rx) = unbounded_channel(); + self.network.set_eth_request_handler(tx); + let peers = self.network.peers_handle(); + let request_handler = EthRequestHandler::new(Arc::clone(&self.client), peers, rx); + self.request_handler = Some(request_handler); + } } impl Future for Peer where - C: BlockProvider, + C: BlockProvider + HeaderProvider, { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().network.poll(cx) + let this = self.project(); + + if let Some(request) = this.request_handler.as_pin_mut() { + let _ = request.poll(cx); + } + + this.network.poll(cx) } } @@ -200,9 +256,13 @@ pub struct PeerConfig { secret_key: SecretKey, } -impl Default for PeerConfig { - fn default() -> Self { - let client = Arc::new(TestApi::default()); +// === impl PeerConfig === + +impl PeerConfig +where + C: BlockProvider + HeaderProvider, +{ + pub fn new(client: Arc) -> Self { let secret_key = SecretKey::new(&mut rand::thread_rng()); let config = NetworkConfig::builder(Arc::clone(&client), secret_key) .listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))) @@ -211,3 +271,120 @@ impl Default for PeerConfig { Self { config, client, secret_key } } } + +impl Default for PeerConfig { + fn default() -> Self { + Self::new(Arc::new(TestApi::default())) + } +} + +/// A helper type to await network events +/// +/// This makes it easier to await established connections +pub struct NetworkEventStream { + inner: UnboundedReceiverStream, +} + +// === impl NetworkEventStream === + +impl NetworkEventStream { + pub fn new(inner: UnboundedReceiverStream) -> Self { + Self { inner } + } + + /// Awaits the next event for an established session + pub async fn next_session_established(&mut self) -> Option { + while let Some(ev) = self.inner.next().await { + match ev { + NetworkEvent::SessionClosed { .. } => continue, + NetworkEvent::SessionEstablished { peer_id, .. } => return Some(peer_id), + } + } + None + } +} + +/// A mock implementation for Provider interfaces. +#[derive(Debug, Clone, Default)] +pub struct MockEthProvider { + pub blocks: Arc>>, + pub headers: Arc>>, +} + +impl MockEthProvider { + pub fn add_block(&self, hash: H256, block: Block) { + self.blocks.lock().insert(hash, block); + } + + pub fn extend_blocks(&self, iter: impl IntoIterator) { + for (hash, block) in iter.into_iter() { + self.add_block(hash, block) + } + } + + pub fn add_header(&self, hash: H256, header: Header) { + self.headers.lock().insert(hash, header); + } + + pub fn extend_headers(&self, iter: impl IntoIterator) { + for (hash, header) in iter.into_iter() { + self.add_header(hash, header) + } + } +} + +impl HeaderProvider for MockEthProvider { + fn header(&self, block_hash: &BlockHash) -> reth_interfaces::Result> { + let lock = self.headers.lock(); + Ok(lock.get(block_hash).cloned()) + } + + fn header_by_number(&self, num: u64) -> reth_interfaces::Result> { + let lock = self.headers.lock(); + Ok(lock.values().find(|h| h.number == num).cloned()) + } +} + +impl BlockProvider for MockEthProvider { + fn chain_info(&self) -> reth_interfaces::Result { + todo!() + } + + fn block(&self, id: BlockId) -> reth_interfaces::Result> { + let lock = self.blocks.lock(); + match id { + BlockId::Hash(hash) => Ok(lock.get(&hash).cloned()), + BlockId::Number(BlockNumber::Number(num)) => { + Ok(lock.values().find(|b| b.number == num.as_u64()).cloned()) + } + _ => { + unreachable!("unused in network tests") + } + } + } + + fn block_number( + &self, + hash: H256, + ) -> reth_interfaces::Result> { + let lock = self.blocks.lock(); + let num = lock.iter().find_map(|(h, b)| if *h == hash { Some(b.number) } else { None }); + Ok(num) + } + + fn block_hash(&self, number: U256) -> reth_interfaces::Result> { + let lock = self.blocks.lock(); + + let hash = + lock.iter().find_map( + |(hash, b)| { + if b.number == number.as_u64() { + Some(*hash) + } else { + None + } + }, + ); + Ok(hash) + } +} diff --git a/crates/primitives/src/transaction/mod.rs b/crates/primitives/src/transaction/mod.rs index bfaa81063..5238313cc 100644 --- a/crates/primitives/src/transaction/mod.rs +++ b/crates/primitives/src/transaction/mod.rs @@ -541,16 +541,16 @@ pub struct TransactionSigned { } impl Encodable for TransactionSigned { + fn encode(&self, out: &mut dyn bytes::BufMut) { + self.encode_inner(out, true); + } + fn length(&self) -> usize { let len = self.payload_len(); // add the length of the RLP header len + length_of_length(len) } - - fn encode(&self, out: &mut dyn bytes::BufMut) { - self.encode_inner(out, true); - } } /// This `Decodable` implementation only supports decoding the transaction format sent over p2p.