From b10517b3bf52fcb08026aabde56bcbf22f7fa6b3 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Thu, 1 Aug 2024 18:44:23 +0200 Subject: [PATCH] chore(net): extract `NetworkHandle` methods for launching node to traits (#9966) --- Cargo.lock | 168 +++++--- bin/reth/src/commands/debug_cmd/execution.rs | 8 +- .../commands/debug_cmd/in_memory_merkle.rs | 16 +- bin/reth/src/commands/debug_cmd/merkle.rs | 9 +- .../src/commands/debug_cmd/replay_engine.rs | 8 +- crates/cli/commands/src/p2p.rs | 2 +- crates/cli/commands/src/stage/run.rs | 8 +- crates/e2e-test-utils/src/network.rs | 2 +- crates/ethereum/node/src/launch.rs | 2 +- crates/net/network-api/src/lib.rs | 51 +-- crates/net/network-api/src/noop.rs | 10 +- crates/net/network-types/Cargo.toml | 5 + crates/net/network-types/src/lib.rs | 17 +- crates/net/network-types/src/peers/addr.rs | 45 ++ crates/net/network-types/src/peers/cmd.rs | 25 ++ crates/net/network-types/src/peers/config.rs | 8 +- crates/net/network-types/src/peers/handle.rs | 55 +++ crates/net/network-types/src/peers/kind.rs | 30 ++ crates/net/network-types/src/peers/mod.rs | 137 ++++++ crates/net/network-types/src/peers/state.rs | 57 +++ crates/net/network/src/builder.rs | 6 +- crates/net/network/src/cache.rs | 3 +- crates/net/network/src/config.rs | 16 +- crates/net/network/src/discovery.rs | 26 +- crates/net/network/src/error.rs | 6 +- crates/net/network/src/eth_requests.rs | 21 +- crates/net/network/src/fetch/client.rs | 15 +- crates/net/network/src/fetch/mod.rs | 29 +- crates/net/network/src/flattened_response.rs | 5 +- crates/net/network/src/import.rs | 6 +- crates/net/network/src/lib.rs | 50 +-- crates/net/network/src/listener.rs | 3 +- crates/net/network/src/manager.rs | 4 +- crates/net/network/src/message.rs | 11 +- crates/net/network/src/network.rs | 112 +++-- crates/net/network/src/peers.rs | 400 +++--------------- crates/net/network/src/protocol.rs | 13 +- crates/net/network/src/session/active.rs | 36 +- crates/net/network/src/session/conn.rs | 9 +- crates/net/network/src/session/handle.rs | 17 +- crates/net/network/src/session/mod.rs | 52 ++- crates/net/network/src/state.rs | 41 +- crates/net/network/src/swarm.rs | 31 +- crates/net/network/src/test_utils/init.rs | 3 +- crates/net/network/src/test_utils/testnet.rs | 36 +- crates/net/network/src/transactions/config.rs | 3 +- .../net/network/src/transactions/fetcher.rs | 26 +- crates/net/network/src/transactions/mod.rs | 60 +-- crates/net/network/tests/it/connect.rs | 6 +- crates/net/network/tests/it/multiplex.rs | 14 +- crates/net/network/tests/it/requests.rs | 5 +- crates/net/network/tests/it/startup.rs | 9 +- crates/net/network/tests/it/txgossip.rs | 3 +- crates/net/p2p/src/reputation.rs | 13 + crates/node/builder/src/launch/mod.rs | 2 +- examples/custom-rlpx-subprotocol/src/main.rs | 6 +- 56 files changed, 985 insertions(+), 776 deletions(-) create mode 100644 crates/net/network-types/src/peers/addr.rs create mode 100644 crates/net/network-types/src/peers/cmd.rs create mode 100644 crates/net/network-types/src/peers/handle.rs create mode 100644 crates/net/network-types/src/peers/kind.rs create mode 100644 crates/net/network-types/src/peers/state.rs diff --git a/Cargo.lock b/Cargo.lock index c85778c73..67fd9c2a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -74,7 +74,7 @@ dependencies = [ "getrandom 0.2.15", "once_cell", "version_check", - "zerocopy", + "zerocopy 0.7.35", ] [[package]] @@ -109,9 +109,9 @@ checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" [[package]] name = "alloy-chains" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1752d7d62e2665da650a36d84abbf239f812534475d51f072a49a533513b7cdd" +checksum = "47ff94ce0f141c2671c23d02c7b88990dd432856639595c5d010663d017c2c58" dependencies = [ "alloy-rlp", "arbitrary", @@ -151,7 +151,7 @@ dependencies = [ "itoa", "serde", "serde_json", - "winnow 0.6.16", + "winnow 0.6.18", ] [[package]] @@ -567,7 +567,7 @@ dependencies = [ "alloy-sol-macro-input", "const-hex", "heck 0.5.0", - "indexmap 2.2.6", + "indexmap 2.3.0", "proc-macro-error", "proc-macro2", "quote", @@ -598,7 +598,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cbcba3ca07cf7975f15d871b721fb18031eec8bce51103907f6dcce00b255d98" dependencies = [ "serde", - "winnow 0.6.16", + "winnow 0.6.18", ] [[package]] @@ -1302,7 +1302,7 @@ dependencies = [ "bitflags 2.6.0", "boa_interner", "boa_macros", - "indexmap 2.2.6", + "indexmap 2.3.0", "num-bigint", "rustc-hash 2.0.0", ] @@ -1328,7 +1328,7 @@ dependencies = [ "fast-float", "hashbrown 0.14.5", "icu_normalizer", - "indexmap 2.2.6", + "indexmap 2.3.0", "intrusive-collections", "itertools 0.13.0", "num-bigint", @@ -1374,7 +1374,7 @@ dependencies = [ "boa_gc", "boa_macros", "hashbrown 0.14.5", - "indexmap 2.2.6", + "indexmap 2.3.0", "once_cell", "phf", "rustc-hash 2.0.0", @@ -1496,9 +1496,9 @@ checksum = "c3ac9f8b63eca6fd385229b3675f6cc0dc5c8a5c8a54a59d4f52ffd670d87b0c" [[package]] name = "bytemuck" -version = "1.16.1" +version = "1.16.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b236fc92302c97ed75b38da1f4917b5cdda4984745740f153a5d3059e48d725e" +checksum = "102087e286b4677862ea56cf8fc58bb2cdfa8725c40ffb80fe3a008eb7f2fc83" dependencies = [ "bytemuck_derive", ] @@ -1522,9 +1522,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.6.1" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" +checksum = "fca2be1d5c43812bae364ee3f30b3afcb7877cf59f4aeb94c66f313a41d2fac9" dependencies = [ "serde", ] @@ -1598,9 +1598,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.6" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2aba8f4e9906c7ce3c73463f62a7f0c65183ada1a2d47e397cc8810827f9694f" +checksum = "26a5c3fd7bfa1ce3897a3a3501d362b2d87b7f2583ebcb4a949ec25911025cbc" dependencies = [ "jobserver", "libc", @@ -1701,9 +1701,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.11" +version = "4.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35723e6a11662c2afb578bcf0b88bf6ea8e21282a953428f240574fcc3a2b5b3" +checksum = "0fbb260a053428790f3de475e304ff84cdbc4face759ea7a3e64c1edd938a7fc" dependencies = [ "clap_builder", "clap_derive", @@ -1711,9 +1711,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.11" +version = "4.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49eb96cbfa7cfa35017b7cd548c75b14c3118c98b423041d70562665e07fb0fa" +checksum = "64b17d7ea74e9f833c7dbf2cbe4fb12ff26783eda4782a8975b72f895c9b4d99" dependencies = [ "anstream", "anstyle", @@ -1723,9 +1723,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.11" +version = "4.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d029b67f89d30bbb547c89fd5161293c0aec155fc691d7924b64550662db93e" +checksum = "501d359d5f3dcaf6ecdeee48833ae73ec6e42723a1e52419c79abf9507eec0a0" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -2660,9 +2660,9 @@ dependencies = [ [[package]] name = "enumn" -version = "0.1.13" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fd000fd6988e73bbe993ea3db9b1aa64906ab88766d654973924340c8cddb42" +checksum = "2f9ed6b3789237c8a0c1c505af1c7eb2c560df6186f01b098c3a1064ea532f38" dependencies = [ "proc-macro2", "quote", @@ -3399,7 +3399,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.1.0", - "indexmap 2.2.6", + "indexmap 2.3.0", "slab", "tokio", "tokio-util", @@ -3988,9 +3988,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.6" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" +checksum = "de3fc2e30ba82dd1b3911c8de1ffc143c74a914a14e99514d7637e3099df5ea0" dependencies = [ "equivalent", "hashbrown 0.14.5", @@ -4010,7 +4010,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c77a3ae7d4761b9c64d2c030f70746ceb8cfba32dce0325a56792e0a4816c31" dependencies = [ "ahash", - "indexmap 2.2.6", + "indexmap 2.3.0", "is-terminal", "itoa", "log", @@ -4701,9 +4701,9 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "lru" -version = "0.12.3" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" +checksum = "37ee39891760e7d94734f6f63fedc29a2e4a152f836120753a72503f09fcf904" dependencies = [ "hashbrown 0.14.5", ] @@ -4788,7 +4788,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4f0c8427b39666bf970460908b213ec09b3b350f20c0c2eabcbba51704a08e6" dependencies = [ "base64 0.22.1", - "indexmap 2.2.6", + "indexmap 2.3.0", "metrics", "metrics-util", "quanta", @@ -4820,7 +4820,7 @@ dependencies = [ "crossbeam-epoch", "crossbeam-utils", "hashbrown 0.14.5", - "indexmap 2.2.6", + "indexmap 2.3.0", "metrics", "num_cpus", "ordered-float", @@ -5164,18 +5164,18 @@ dependencies = [ [[package]] name = "num_enum" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02339744ee7253741199f897151b38e72257d13802d4ee837285cc2990a90845" +checksum = "4e613fc340b2220f734a8595782c551f1250e969d87d3be1ae0579e8d4065179" dependencies = [ "num_enum_derive", ] [[package]] name = "num_enum_derive" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "681030a937600a36906c185595136d26abfebb4aa9c65701cefcaf8578bb982b" +checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -5276,9 +5276,9 @@ checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" [[package]] name = "ordered-float" -version = "4.2.1" +version = "4.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19ff2cf528c6c03d9ed653d6c4ce1dc0582dc4af309790ad92f07c1cd551b0be" +checksum = "4a91171844676f8c7990ce64959210cd2eaef32c2612c50f9fae9f8aaa6065a6" dependencies = [ "num-traits", ] @@ -5648,9 +5648,12 @@ dependencies = [ [[package]] name = "ppv-lite86" -version = "0.2.17" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +checksum = "dee4364d9f3b902ef14fab8a1ddffb783a1cb6b4bba3bfc1fa3922732c7de97f" +dependencies = [ + "zerocopy 0.6.6", +] [[package]] name = "predicates" @@ -7432,7 +7435,7 @@ dependencies = [ "criterion", "dashmap 6.0.1", "derive_more", - "indexmap 2.2.6", + "indexmap 2.3.0", "parking_lot 0.12.3", "pprof", "rand 0.8.5", @@ -7608,12 +7611,15 @@ dependencies = [ name = "reth-network-types" version = "1.0.3" dependencies = [ + "derive_more", "humantime-serde", + "reth-ethereum-forks", "reth-net-banlist", "reth-network-p2p", "reth-network-peers", "serde", "serde_json", + "tokio", "tracing", ] @@ -9211,9 +9217,9 @@ checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" [[package]] name = "rustls-platform-verifier" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e3beb939bcd33c269f4bf946cc829fcd336370267c4a927ac0399c84a3151a1" +checksum = "93bda3f493b9abe5b93b3e7e3ecde0df292f2bd28c0296b90586ee0055ff5123" dependencies = [ "core-foundation", "core-foundation-sys", @@ -9232,9 +9238,9 @@ dependencies = [ [[package]] name = "rustls-platform-verifier-android" -version = "0.1.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84e217e7fdc8466b5b35d30f8c0a30febd29173df4a3a0c2115d306b9c4117ad" +checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" [[package]] name = "rustls-webpki" @@ -9299,9 +9305,9 @@ dependencies = [ [[package]] name = "scc" -version = "2.1.5" +version = "2.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fadf67e3cf23f8b11a6c8c48a16cb2437381503615acd91094ec7b4686a5a53" +checksum = "05ccfb12511cdb770157ace92d7dda771e498445b78f9886e8cdbc5140a4eced" dependencies = [ "sdd", ] @@ -9334,9 +9340,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "sdd" -version = "1.7.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85f05a494052771fc5bd0619742363b5e24e5ad72ab3111ec2e27925b8edc5f3" +checksum = "177258b64c0faaa9ffd3c65cd3262c2bc7e2588dbbd9c1641d0346145c1bbda8" [[package]] name = "sec1" @@ -9466,12 +9472,13 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.120" +version = "1.0.121" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e0d21c9a8cae1235ad58a00c11cb40d4b1e5c784f1ef2c537876ed6ffd8b7c5" +checksum = "4ab380d7d9f22ef3f21ad3e6c1ebe8e4fc7a2000ccba2e4d71fc96f15b2cb609" dependencies = [ - "indexmap 2.2.6", + "indexmap 2.3.0", "itoa", + "memchr", "ryu", "serde", ] @@ -9518,7 +9525,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.2.6", + "indexmap 2.3.0", "serde", "serde_derive", "serde_json", @@ -9544,7 +9551,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap 2.2.6", + "indexmap 2.3.0", "itoa", "ryu", "serde", @@ -9667,9 +9674,9 @@ dependencies = [ [[package]] name = "signal-hook-mio" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af" +checksum = "34db1a06d485c9142248b7a054f034b349b212551f3dfd19c94d45a754a217cd" dependencies = [ "libc", "mio 0.8.11", @@ -10338,21 +10345,21 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.16" +version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81967dd0dd2c1ab0bc3468bd7caecc32b8a4aa47d0c8c695d8c2b2108168d62c" +checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e" dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit 0.22.17", + "toml_edit 0.22.20", ] [[package]] name = "toml_datetime" -version = "0.6.7" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8fb9f64314842840f1d940ac544da178732128f1c78c21772e876579e0da1db" +checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" dependencies = [ "serde", ] @@ -10363,22 +10370,22 @@ version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" dependencies = [ - "indexmap 2.2.6", + "indexmap 2.3.0", "toml_datetime", "winnow 0.5.40", ] [[package]] name = "toml_edit" -version = "0.22.17" +version = "0.22.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d9f8729f5aea9562aac1cc0441f5d6de3cff1ee0c5d67293eeca5eb36ee7c16" +checksum = "583c44c02ad26b0c3f3066fe629275e50627026c51ac2e595cca4c230ce1ce1d" dependencies = [ - "indexmap 2.2.6", + "indexmap 2.3.0", "serde", "serde_spanned", "toml_datetime", - "winnow 0.6.16", + "winnow 0.6.18", ] [[package]] @@ -10629,9 +10636,9 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "trybuild" -version = "1.0.97" +version = "1.0.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b1e5645f2ee8025c2f1d75e1138f2dd034d74e6ba54620f3c569ba2a2a1ea06" +checksum = "207aa50d36c4be8d8c6ea829478be44a372c6a77669937bb39c698e52f1491e8" dependencies = [ "glob", "serde", @@ -11275,9 +11282,9 @@ dependencies = [ [[package]] name = "winnow" -version = "0.6.16" +version = "0.6.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b480ae9340fc261e6be3e95a1ba86d54ae3f9171132a73ce8d4bbaf68339507c" +checksum = "68a9bda4691f099d435ad181000724da8e5899daa10713c2d432552b9ccd3a6f" dependencies = [ "memchr", ] @@ -11375,13 +11382,34 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zerocopy" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "854e949ac82d619ee9a14c66a1b674ac730422372ccb759ce0c39cabcf2bf8e6" +dependencies = [ + "byteorder", + "zerocopy-derive 0.6.6", +] + [[package]] name = "zerocopy" version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ - "zerocopy-derive", + "zerocopy-derive 0.7.35", +] + +[[package]] +name = "zerocopy-derive" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "125139de3f6b9d625c39e2efdd73d41bdac468ccd556556440e322be0e1bbd91" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.72", ] [[package]] diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs index 3298fe5d6..9df22664f 100644 --- a/bin/reth/src/commands/debug_cmd/execution.rs +++ b/bin/reth/src/commands/debug_cmd/execution.rs @@ -1,6 +1,7 @@ //! Command for debugging execution. -use crate::{args::NetworkArgs, macros::block_executor, utils::get_single_header}; +use std::{path::PathBuf, sync::Arc}; + use clap::Parser; use futures::{stream::select as stream_select, StreamExt}; use reth_beacon_consensus::EthBeaconConsensus; @@ -16,7 +17,7 @@ use reth_downloaders::{ headers::reverse_headers::ReverseHeadersDownloaderBuilder, }; use reth_exex::ExExManagerHandle; -use reth_network::{NetworkEvents, NetworkHandle}; +use reth_network::{BlockDownloaderProvider, NetworkEvents, NetworkHandle}; use reth_network_api::NetworkInfo; use reth_network_p2p::{bodies::client::BodiesClient, headers::client::HeadersClient}; use reth_primitives::{BlockHashOrNumber, BlockNumber, B256}; @@ -30,10 +31,11 @@ use reth_stages::{ }; use reth_static_file::StaticFileProducer; use reth_tasks::TaskExecutor; -use std::{path::PathBuf, sync::Arc}; use tokio::sync::watch; use tracing::*; +use crate::{args::NetworkArgs, macros::block_executor, utils::get_single_header}; + /// `reth debug execution` command #[derive(Debug, Parser)] pub struct Command { diff --git a/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs b/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs index df288b5ef..3d5e20c8f 100644 --- a/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs +++ b/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs @@ -1,10 +1,7 @@ //! Command for debugging in-memory merkle trie calculation. -use crate::{ - args::NetworkArgs, - macros::block_executor, - utils::{get_single_body, get_single_header}, -}; +use std::{path::PathBuf, sync::Arc}; + use backon::{ConstantBuilder, Retryable}; use clap::Parser; use reth_cli_commands::common::{AccessRights, Environment, EnvironmentArgs}; @@ -15,7 +12,7 @@ use reth_db::DatabaseEnv; use reth_errors::BlockValidationError; use reth_evm::execute::{BlockExecutionOutput, BlockExecutorProvider, Executor}; use reth_execution_types::ExecutionOutcome; -use reth_network::NetworkHandle; +use reth_network::{BlockDownloaderProvider, NetworkHandle}; use reth_network_api::NetworkInfo; use reth_primitives::BlockHashOrNumber; use reth_provider::{ @@ -28,9 +25,14 @@ use reth_stages::StageId; use reth_tasks::TaskExecutor; use reth_trie::StateRoot; use reth_trie_db::DatabaseStateRoot; -use std::{path::PathBuf, sync::Arc}; use tracing::*; +use crate::{ + args::NetworkArgs, + macros::block_executor, + utils::{get_single_body, get_single_header}, +}; + /// `reth debug in-memory-merkle` command /// This debug routine requires that the node is positioned at the block before the target. /// The script will then download the block from p2p network and attempt to calculate and verify diff --git a/bin/reth/src/commands/debug_cmd/merkle.rs b/bin/reth/src/commands/debug_cmd/merkle.rs index 6d8667f82..9bfaf6412 100644 --- a/bin/reth/src/commands/debug_cmd/merkle.rs +++ b/bin/reth/src/commands/debug_cmd/merkle.rs @@ -1,5 +1,7 @@ //! Command for debugging merkle trie calculation. -use crate::{args::NetworkArgs, macros::block_executor, utils::get_single_header}; + +use std::{path::PathBuf, sync::Arc}; + use backon::{ConstantBuilder, Retryable}; use clap::Parser; use reth_beacon_consensus::EthBeaconConsensus; @@ -11,7 +13,7 @@ use reth_consensus::Consensus; use reth_db::{tables, DatabaseEnv}; use reth_db_api::{cursor::DbCursorRO, transaction::DbTx}; use reth_evm::execute::{BatchExecutor, BlockExecutorProvider}; -use reth_network::NetworkHandle; +use reth_network::{BlockDownloaderProvider, NetworkHandle}; use reth_network_api::NetworkInfo; use reth_network_p2p::full_block::FullBlockClient; use reth_primitives::BlockHashOrNumber; @@ -25,9 +27,10 @@ use reth_stages::{ ExecInput, Stage, StageCheckpoint, }; use reth_tasks::TaskExecutor; -use std::{path::PathBuf, sync::Arc}; use tracing::*; +use crate::{args::NetworkArgs, macros::block_executor, utils::get_single_header}; + /// `reth debug merkle` command #[derive(Debug, Parser)] pub struct Command { diff --git a/bin/reth/src/commands/debug_cmd/replay_engine.rs b/bin/reth/src/commands/debug_cmd/replay_engine.rs index 4339a0f76..0f0e14b4a 100644 --- a/bin/reth/src/commands/debug_cmd/replay_engine.rs +++ b/bin/reth/src/commands/debug_cmd/replay_engine.rs @@ -1,4 +1,5 @@ -use crate::{args::NetworkArgs, macros::block_executor}; +use std::{path::PathBuf, sync::Arc, time::Duration}; + use clap::Parser; use eyre::Context; use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig}; @@ -14,7 +15,7 @@ use reth_consensus::Consensus; use reth_db::DatabaseEnv; use reth_engine_util::engine_store::{EngineMessageStore, StoredEngineApiMessage}; use reth_fs_util as fs; -use reth_network::NetworkHandle; +use reth_network::{BlockDownloaderProvider, NetworkHandle}; use reth_network_api::NetworkInfo; use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService}; use reth_provider::{ @@ -25,10 +26,11 @@ use reth_stages::Pipeline; use reth_static_file::StaticFileProducer; use reth_tasks::TaskExecutor; use reth_transaction_pool::noop::NoopTransactionPool; -use std::{path::PathBuf, sync::Arc, time::Duration}; use tokio::sync::oneshot; use tracing::*; +use crate::{args::NetworkArgs, macros::block_executor}; + /// `reth debug replay-engine` command /// This script will read stored engine API messages and replay them by the timestamp. /// It does not require diff --git a/crates/cli/commands/src/p2p.rs b/crates/cli/commands/src/p2p.rs index c7c2c414d..67fe7ad11 100644 --- a/crates/cli/commands/src/p2p.rs +++ b/crates/cli/commands/src/p2p.rs @@ -5,7 +5,7 @@ use clap::{Parser, Subcommand}; use reth_chainspec::ChainSpec; use reth_cli_util::{get_secret_key, hash_or_num_value_parser}; use reth_config::Config; -use reth_network::NetworkConfigBuilder; +use reth_network::{BlockDownloaderProvider, NetworkConfigBuilder}; use reth_network_p2p::bodies::client::BodiesClient; use reth_node_core::{ args::{ diff --git a/crates/cli/commands/src/stage/run.rs b/crates/cli/commands/src/stage/run.rs index 6949d8aa5..ddcbdd13c 100644 --- a/crates/cli/commands/src/stage/run.rs +++ b/crates/cli/commands/src/stage/run.rs @@ -1,7 +1,9 @@ //! Main `stage` command //! //! Stage debugging tool -use crate::common::{AccessRights, Environment, EnvironmentArgs}; + +use std::{any::Any, net::SocketAddr, sync::Arc, time::Instant}; + use clap::Parser; use reth_beacon_consensus::EthBeaconConsensus; use reth_chainspec::ChainSpec; @@ -11,6 +13,7 @@ use reth_config::config::{HashingConfig, SenderRecoveryConfig, TransactionLookup use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; use reth_evm::execute::BlockExecutorProvider; use reth_exex::ExExManagerHandle; +use reth_network::BlockDownloaderProvider; use reth_node_core::{ args::{NetworkArgs, StageEnum}, version::{ @@ -35,9 +38,10 @@ use reth_stages::{ }, ExecInput, ExecOutput, ExecutionStageThresholds, Stage, StageExt, UnwindInput, UnwindOutput, }; -use std::{any::Any, net::SocketAddr, sync::Arc, time::Instant}; use tracing::*; +use crate::common::{AccessRights, Environment, EnvironmentArgs}; + /// `reth stage` command #[derive(Debug, Parser)] pub struct Command { diff --git a/crates/e2e-test-utils/src/network.rs b/crates/e2e-test-utils/src/network.rs index 057072299..fd4258efa 100644 --- a/crates/e2e-test-utils/src/network.rs +++ b/crates/e2e-test-utils/src/network.rs @@ -1,6 +1,6 @@ use futures_util::StreamExt; use reth::{ - network::{NetworkEvent, NetworkEvents, NetworkHandle, PeersInfo}, + network::{NetworkEvent, NetworkEvents, NetworkHandle, PeersHandleProvider, PeersInfo}, rpc::types::PeerId, }; use reth_network_peers::NodeRecord; diff --git a/crates/ethereum/node/src/launch.rs b/crates/ethereum/node/src/launch.rs index 2ecc57ac0..c2f36f619 100644 --- a/crates/ethereum/node/src/launch.rs +++ b/crates/ethereum/node/src/launch.rs @@ -10,7 +10,7 @@ use reth_engine_tree::tree::TreeConfig; use reth_ethereum_engine::service::{ChainEvent, EthService}; use reth_ethereum_engine_primitives::EthEngineTypes; use reth_exex::ExExManagerHandle; -use reth_network::{NetworkEvents, NetworkSyncUpdater, SyncState}; +use reth_network::{BlockDownloaderProvider, NetworkEvents, NetworkSyncUpdater, SyncState}; use reth_node_api::{FullNodeTypes, NodeAddOns}; use reth_node_builder::{ hooks::NodeHooks, diff --git a/crates/net/network-api/src/lib.rs b/crates/net/network-api/src/lib.rs index d973a43e5..fddba76df 100644 --- a/crates/net/network-api/src/lib.rs +++ b/crates/net/network-api/src/lib.rs @@ -13,24 +13,23 @@ )] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -use std::{future::Future, net::SocketAddr, sync::Arc, time::Instant}; +/// Network Error +pub mod error; +/// Implementation of network traits for that does nothing. +pub mod noop; pub use alloy_rpc_types_admin::EthProtocolInfo; pub use error::NetworkError; +pub use reth_network_types::{PeerKind, PeersHandle, Reputation, ReputationChangeKind}; + +use std::{future::Future, net::SocketAddr, sync::Arc, time::Instant}; use reth_eth_wire::{capability::Capabilities, DisconnectReason, EthVersion, Status}; use reth_network_peers::NodeRecord; -pub use reth_network_types::{Reputation, ReputationChangeKind}; /// The `PeerId` type. pub type PeerId = alloy_primitives::B512; -/// Network Error -pub mod error; - -/// Implementation of network traits for that does nothing. -pub mod noop; - /// Provides general purpose information about the network. #[auto_impl::auto_impl(&, Arc)] pub trait NetworkInfo: Send + Sync { @@ -51,6 +50,7 @@ pub trait NetworkInfo: Send + Sync { } /// Provides general purpose information about Peers in the network. +#[auto_impl::auto_impl(&, Arc)] pub trait PeersInfo: Send + Sync { /// Returns how many peers the network is currently connected to. /// @@ -65,6 +65,7 @@ pub trait PeersInfo: Send + Sync { } /// Provides an API for managing the peers of the network. +#[auto_impl::auto_impl(&, Arc)] pub trait Peers: PeersInfo { /// Adds a peer to the peer set with UDP `SocketAddr`. fn add_peer(&self, peer: PeerId, tcp_addr: SocketAddr) { @@ -157,33 +158,13 @@ pub trait Peers: PeersInfo { ) -> impl Future, NetworkError>> + Send; } -/// Represents the kind of peer -#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)] -pub enum PeerKind { - /// Basic peer kind. - #[default] - Basic, - /// Static peer, added via JSON-RPC. - Static, - /// Trusted peer. - Trusted, -} - -impl PeerKind { - /// Returns `true` if the peer is trusted. - pub const fn is_trusted(&self) -> bool { - matches!(self, Self::Trusted) - } - - /// Returns `true` if the peer is static. - pub const fn is_static(&self) -> bool { - matches!(self, Self::Static) - } - - /// Returns `true` if the peer is basic. - pub const fn is_basic(&self) -> bool { - matches!(self, Self::Basic) - } +/// Provides an API for managing the peers of the network. +#[auto_impl::auto_impl(&, Arc)] +pub trait PeersHandleProvider { + /// Returns the [`PeersHandle`] that can be cloned and shared. + /// + /// The [`PeersHandle`] can be used to interact with the network's peer set. + fn peers_handle(&self) -> &PeersHandle; } /// Info about an active peer session. diff --git a/crates/net/network-api/src/noop.rs b/crates/net/network-api/src/noop.rs index a74204a3f..1dd748448 100644 --- a/crates/net/network-api/src/noop.rs +++ b/crates/net/network-api/src/noop.rs @@ -3,15 +3,15 @@ //! This is useful for wiring components together that don't require network but still need to be //! generic over it. -use crate::{ - NetworkError, NetworkInfo, NetworkStatus, PeerId, PeerInfo, PeerKind, Peers, PeersInfo, - Reputation, ReputationChangeKind, -}; +use std::net::{IpAddr, SocketAddr}; + use alloy_rpc_types_admin::EthProtocolInfo; use enr::{secp256k1::SecretKey, Enr}; use reth_eth_wire::{DisconnectReason, ProtocolVersion}; use reth_network_peers::NodeRecord; -use std::net::{IpAddr, SocketAddr}; +use reth_network_types::{PeerKind, Reputation, ReputationChangeKind}; + +use crate::{NetworkError, NetworkInfo, NetworkStatus, PeerId, PeerInfo, Peers, PeersInfo}; /// A type that implements all network trait that does nothing. /// diff --git a/crates/net/network-types/Cargo.toml b/crates/net/network-types/Cargo.toml index 3cef30cf7..6db629207 100644 --- a/crates/net/network-types/Cargo.toml +++ b/crates/net/network-types/Cargo.toml @@ -15,6 +15,10 @@ workspace = true # reth reth-network-peers.workspace = true reth-net-banlist.workspace = true +reth-ethereum-forks.workspace = true + +# async +tokio = { workspace = true, features = ["sync"] } reth-network-p2p.workspace = true # io @@ -24,6 +28,7 @@ serde_json = { workspace = true } # misc tracing.workspace = true +derive_more.workspace = true [features] serde = ["dep:serde", "dep:humantime-serde", "reth-network-p2p/serde"] diff --git a/crates/net/network-types/src/lib.rs b/crates/net/network-types/src/lib.rs index a4b5f5b10..2a220a58a 100644 --- a/crates/net/network-types/src/lib.rs +++ b/crates/net/network-types/src/lib.rs @@ -14,13 +14,22 @@ /// Types related to peering. pub mod peers; -pub use peers::{ConnectionsConfig, PeersConfig}; - pub mod session; -pub use session::{SessionLimits, SessionsConfig}; /// [`BackoffKind`] definition. mod backoff; -pub use backoff::BackoffKind; pub use reth_network_p2p::reputation::{Reputation, ReputationChangeKind, ReputationChangeWeights}; + +pub use backoff::BackoffKind; +pub use peers::{ + addr::PeerAddr, + handle::PeersHandle, + kind::PeerKind, + reputation::{ + is_banned_reputation, ReputationChange, ReputationChangeOutcome, DEFAULT_REPUTATION, + }, + state::PeerConnectionState, + ConnectionsConfig, Peer, PeerCommand, PeersConfig, +}; +pub use session::{SessionLimits, SessionsConfig}; diff --git a/crates/net/network-types/src/peers/addr.rs b/crates/net/network-types/src/peers/addr.rs new file mode 100644 index 000000000..ae0ee2ab2 --- /dev/null +++ b/crates/net/network-types/src/peers/addr.rs @@ -0,0 +1,45 @@ +//! `RLPx` (TCP) and `Discovery` (UDP) sockets of a peer. + +use std::net::{IpAddr, SocketAddr}; + +/// Represents a peer's address information. +/// +/// # Fields +/// +/// - `tcp`: A `SocketAddr` representing the peer's data transfer address. +/// - `udp`: An optional `SocketAddr` representing the peer's discover address. `None` if the peer +/// is directly connecting to us or the port is the same to `tcp`'s +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct PeerAddr { + tcp: SocketAddr, + udp: Option, +} + +impl PeerAddr { + /// Returns the peer's TCP address. + pub const fn tcp(&self) -> SocketAddr { + self.tcp + } + + /// Returns the peer's UDP address. + pub const fn udp(&self) -> Option { + self.udp + } + + /// Returns a new `PeerAddr` with the given `tcp` and `udp` addresses. + pub const fn new(tcp: SocketAddr, udp: Option) -> Self { + Self { tcp, udp } + } + + /// Returns a new `PeerAddr` with a `tcp` address only. + pub const fn from_tcp(tcp: SocketAddr) -> Self { + Self { tcp, udp: None } + } + + /// Returns a new `PeerAddr` with the given `tcp` and `udp` ports. + pub fn new_with_ports(ip: IpAddr, tcp_port: u16, udp_port: Option) -> Self { + let tcp = SocketAddr::new(ip, tcp_port); + let udp = udp_port.map(|port| SocketAddr::new(ip, port)); + Self::new(tcp, udp) + } +} diff --git a/crates/net/network-types/src/peers/cmd.rs b/crates/net/network-types/src/peers/cmd.rs new file mode 100644 index 000000000..253b5a103 --- /dev/null +++ b/crates/net/network-types/src/peers/cmd.rs @@ -0,0 +1,25 @@ +//! Commands sent to peer manager. + +use std::net::SocketAddr; + +use reth_network_peers::{NodeRecord, PeerId}; +use tokio::sync::oneshot; + +use crate::{Peer, ReputationChangeKind}; + +/// Commands the `PeersManager` listens for. +#[derive(Debug)] +pub enum PeerCommand { + /// Command for manually add + Add(PeerId, SocketAddr), + /// Remove a peer from the set + /// + /// If currently connected this will disconnect the session + Remove(PeerId), + /// Apply a reputation change to the given peer. + ReputationChange(PeerId, ReputationChangeKind), + /// Get information about a peer + GetPeer(PeerId, oneshot::Sender>), + /// Get node information on all peers + GetPeers(oneshot::Sender>), +} diff --git a/crates/net/network-types/src/peers/config.rs b/crates/net/network-types/src/peers/config.rs index fcd363357..a28b9e602 100644 --- a/crates/net/network-types/src/peers/config.rs +++ b/crates/net/network-types/src/peers/config.rs @@ -1,16 +1,18 @@ //! Configuration for peering. -use crate::{BackoffKind, ReputationChangeWeights}; -use reth_net_banlist::BanList; -use reth_network_peers::{NodeRecord, TrustedPeer}; use std::{ collections::HashSet, io::{self, ErrorKind}, path::Path, time::Duration, }; + +use reth_net_banlist::BanList; +use reth_network_peers::{NodeRecord, TrustedPeer}; use tracing::info; +use crate::{BackoffKind, ReputationChangeWeights}; + /// Maximum number of available slots for outbound sessions. pub const DEFAULT_MAX_COUNT_PEERS_OUTBOUND: u32 = 100; diff --git a/crates/net/network-types/src/peers/handle.rs b/crates/net/network-types/src/peers/handle.rs new file mode 100644 index 000000000..1503fd1bd --- /dev/null +++ b/crates/net/network-types/src/peers/handle.rs @@ -0,0 +1,55 @@ +//! Async peer handle. + +use std::net::SocketAddr; + +use derive_more::Constructor; +use reth_network_peers::{NodeRecord, PeerId}; +use tokio::sync::{mpsc, oneshot}; + +use crate::{Peer, PeerCommand, ReputationChangeKind}; + +/// A communication channel to the `PeersManager` to apply manual changes to the peer set. +#[derive(Clone, Debug, Constructor)] +pub struct PeersHandle { + /// Sender half of command channel back to the `PeersManager` + manager_tx: mpsc::UnboundedSender, +} + +// === impl PeersHandle === + +impl PeersHandle { + fn send(&self, cmd: PeerCommand) { + let _ = self.manager_tx.send(cmd); + } + + /// Adds a peer to the set. + pub fn add_peer(&self, peer_id: PeerId, addr: SocketAddr) { + self.send(PeerCommand::Add(peer_id, addr)); + } + + /// Removes a peer from the set. + pub fn remove_peer(&self, peer_id: PeerId) { + self.send(PeerCommand::Remove(peer_id)); + } + + /// Send a reputation change for the given peer. + pub fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind) { + self.send(PeerCommand::ReputationChange(peer_id, kind)); + } + + /// Returns a peer by its [`PeerId`], or `None` if the peer is not in the peer set. + pub async fn peer_by_id(&self, peer_id: PeerId) -> Option { + let (tx, rx) = oneshot::channel(); + self.send(PeerCommand::GetPeer(peer_id, tx)); + + rx.await.unwrap_or(None) + } + + /// Returns all peers in the peerset. + pub async fn all_peers(&self) -> Vec { + let (tx, rx) = oneshot::channel(); + self.send(PeerCommand::GetPeers(tx)); + + rx.await.unwrap_or_default() + } +} diff --git a/crates/net/network-types/src/peers/kind.rs b/crates/net/network-types/src/peers/kind.rs new file mode 100644 index 000000000..420e967ea --- /dev/null +++ b/crates/net/network-types/src/peers/kind.rs @@ -0,0 +1,30 @@ +//! Classification of a peer based on trust. + +/// Represents the kind of peer +#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)] +pub enum PeerKind { + /// Basic peer kind. + #[default] + Basic, + /// Static peer, added via JSON-RPC. + Static, + /// Trusted peer. + Trusted, +} + +impl PeerKind { + /// Returns `true` if the peer is trusted. + pub const fn is_trusted(&self) -> bool { + matches!(self, Self::Trusted) + } + + /// Returns `true` if the peer is static. + pub const fn is_static(&self) -> bool { + matches!(self, Self::Static) + } + + /// Returns `true` if the peer is basic. + pub const fn is_basic(&self) -> bool { + matches!(self, Self::Basic) + } +} diff --git a/crates/net/network-types/src/peers/mod.rs b/crates/net/network-types/src/peers/mod.rs index b28f998ef..c80db89f7 100644 --- a/crates/net/network-types/src/peers/mod.rs +++ b/crates/net/network-types/src/peers/mod.rs @@ -1,5 +1,142 @@ +pub mod addr; +pub mod cmd; pub mod config; +pub mod handle; +pub mod kind; +pub mod state; pub use reth_network_p2p::reputation; +pub use cmd::PeerCommand; pub use config::{ConnectionsConfig, PeersConfig}; +pub use reputation::ReputationChangeWeights; + +use reth_ethereum_forks::ForkId; +use tracing::trace; + +use crate::{ + is_banned_reputation, PeerAddr, PeerConnectionState, PeerKind, ReputationChangeOutcome, + DEFAULT_REPUTATION, +}; + +/// Tracks info about a single peer. +#[derive(Debug, Clone)] +pub struct Peer { + /// Where to reach the peer. + pub addr: PeerAddr, + /// Reputation of the peer. + pub reputation: i32, + /// The state of the connection, if any. + pub state: PeerConnectionState, + /// The [`ForkId`] that the peer announced via discovery. + pub fork_id: Option, + /// Whether the entry should be removed after an existing session was terminated. + pub remove_after_disconnect: bool, + /// The kind of peer + pub kind: PeerKind, + /// Whether the peer is currently backed off. + pub backed_off: bool, + /// Counts number of times the peer was backed off due to a severe + /// [`BackoffKind`](crate::BackoffKind). + pub severe_backoff_counter: u8, +} + +// === impl Peer === + +impl Peer { + /// Returns a new peer for given [`PeerAddr`]. + pub fn new(addr: PeerAddr) -> Self { + Self::with_state(addr, Default::default()) + } + + /// Returns a new trusted peer for given [`PeerAddr`]. + pub fn trusted(addr: PeerAddr) -> Self { + Self { kind: PeerKind::Trusted, ..Self::new(addr) } + } + + /// Returns the reputation of the peer + pub const fn reputation(&self) -> i32 { + self.reputation + } + + /// Returns a new peer for given [`PeerAddr`] and [`PeerConnectionState`]. + pub fn with_state(addr: PeerAddr, state: PeerConnectionState) -> Self { + Self { + addr, + state, + reputation: DEFAULT_REPUTATION, + fork_id: None, + remove_after_disconnect: false, + kind: Default::default(), + backed_off: false, + severe_backoff_counter: 0, + } + } + + /// Returns a new peer for given [`PeerAddr`] and [`PeerKind`]. + pub fn with_kind(addr: PeerAddr, kind: PeerKind) -> Self { + Self { kind, ..Self::new(addr) } + } + + /// Resets the reputation of the peer to the default value. This always returns + /// [`ReputationChangeOutcome::None`]. + pub fn reset_reputation(&mut self) -> ReputationChangeOutcome { + self.reputation = DEFAULT_REPUTATION; + + ReputationChangeOutcome::None + } + + /// Applies a reputation change to the peer and returns what action should be taken. + pub fn apply_reputation(&mut self, reputation: i32) -> ReputationChangeOutcome { + let previous = self.reputation; + // we add reputation since negative reputation change decrease total reputation + self.reputation = previous.saturating_add(reputation); + + trace!(target: "net::peers", reputation=%self.reputation, banned=%self.is_banned(), "applied reputation change"); + + if self.state.is_connected() && self.is_banned() { + self.state.disconnect(); + return ReputationChangeOutcome::DisconnectAndBan + } + + if self.is_banned() && !is_banned_reputation(previous) { + return ReputationChangeOutcome::Ban + } + + if !self.is_banned() && is_banned_reputation(previous) { + return ReputationChangeOutcome::Unban + } + + ReputationChangeOutcome::None + } + + /// Returns true if the peer's reputation is below the banned threshold. + #[inline] + pub const fn is_banned(&self) -> bool { + is_banned_reputation(self.reputation) + } + + /// Returns `true` if peer is banned. + #[inline] + pub const fn is_backed_off(&self) -> bool { + self.backed_off + } + + /// Unbans the peer by resetting its reputation + #[inline] + pub fn unban(&mut self) { + self.reputation = DEFAULT_REPUTATION + } + + /// Returns whether this peer is trusted + #[inline] + pub const fn is_trusted(&self) -> bool { + matches!(self.kind, PeerKind::Trusted) + } + + /// Returns whether this peer is static + #[inline] + pub const fn is_static(&self) -> bool { + matches!(self.kind, PeerKind::Static) + } +} diff --git a/crates/net/network-types/src/peers/state.rs b/crates/net/network-types/src/peers/state.rs new file mode 100644 index 000000000..f6ab1a39f --- /dev/null +++ b/crates/net/network-types/src/peers/state.rs @@ -0,0 +1,57 @@ +//! State of connection to a peer. + +/// Represents the kind of connection established to the peer, if any +#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)] +pub enum PeerConnectionState { + /// Not connected currently. + #[default] + Idle, + /// Disconnect of an incoming connection in progress + DisconnectingIn, + /// Disconnect of an outgoing connection in progress + DisconnectingOut, + /// Connected via incoming connection. + In, + /// Connected via outgoing connection. + Out, + /// Pending outgoing connection. + PendingOut, +} + +// === impl PeerConnectionState === + +impl PeerConnectionState { + /// Sets the disconnect state + #[inline] + pub fn disconnect(&mut self) { + match self { + Self::In => *self = Self::DisconnectingIn, + Self::Out => *self = Self::DisconnectingOut, + _ => {} + } + } + + /// Returns true if this is an active incoming connection. + #[inline] + pub const fn is_incoming(&self) -> bool { + matches!(self, Self::In) + } + + /// Returns whether we're currently connected with this peer + #[inline] + pub const fn is_connected(&self) -> bool { + matches!(self, Self::In | Self::Out | Self::PendingOut) + } + + /// Returns if there's currently no connection to that peer. + #[inline] + pub const fn is_unconnected(&self) -> bool { + matches!(self, Self::Idle) + } + + /// Returns true if there's currently an outbound dial to that peer. + #[inline] + pub const fn is_pending_out(&self) -> bool { + matches!(self, Self::PendingOut) + } +} diff --git a/crates/net/network/src/builder.rs b/crates/net/network/src/builder.rs index ea03a00a2..8a9c01fab 100644 --- a/crates/net/network/src/builder.rs +++ b/crates/net/network/src/builder.rs @@ -1,12 +1,14 @@ //! Builder support for configuring the entire setup. +use reth_network_api::PeersHandleProvider; +use reth_transaction_pool::TransactionPool; +use tokio::sync::mpsc; + use crate::{ eth_requests::EthRequestHandler, transactions::{TransactionsManager, TransactionsManagerConfig}, NetworkHandle, NetworkManager, }; -use reth_transaction_pool::TransactionPool; -use tokio::sync::mpsc; /// We set the max channel capacity of the `EthRequestHandler` to 256 /// 256 requests with malicious 10MB body requests is 2.6GB which can be absorbed by the node. diff --git a/crates/net/network/src/cache.rs b/crates/net/network/src/cache.rs index 51337336f..f9f1a4da3 100644 --- a/crates/net/network/src/cache.rs +++ b/crates/net/network/src/cache.rs @@ -1,11 +1,12 @@ //! Network cache support use core::hash::BuildHasher; +use std::{fmt, hash::Hash}; + use derive_more::{Deref, DerefMut}; use itertools::Itertools; // use linked_hash_set::LinkedHashSet; use schnellru::{ByLength, Limiter, RandomState, Unlimited}; -use std::{fmt, hash::Hash}; /// A minimal LRU cache based on a [`LruMap`](schnellru::LruMap) with limited capacity. /// diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index a74694487..1020170e2 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -1,11 +1,7 @@ //! Network config support -use crate::{ - error::NetworkError, - import::{BlockImport, ProofOfStakeBlockImport}, - transactions::TransactionsManagerConfig, - NetworkHandle, NetworkManager, -}; +use std::{collections::HashSet, net::SocketAddr, sync::Arc}; + use reth_chainspec::{ChainSpec, MAINNET}; use reth_discv4::{Discv4Config, Discv4ConfigBuilder, NatResolver, DEFAULT_DISCOVERY_ADDRESS}; use reth_discv5::NetworkStackId; @@ -17,7 +13,13 @@ use reth_primitives::{ForkFilter, Head}; use reth_storage_api::{BlockNumReader, BlockReader, HeaderProvider}; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use secp256k1::SECP256K1; -use std::{collections::HashSet, net::SocketAddr, sync::Arc}; + +use crate::{ + error::NetworkError, + import::{BlockImport, ProofOfStakeBlockImport}, + transactions::TransactionsManagerConfig, + NetworkHandle, NetworkManager, +}; // re-export for convenience use crate::protocol::{IntoRlpxSubProtocol, RlpxSubProtocols}; diff --git a/crates/net/network/src/discovery.rs b/crates/net/network/src/discovery.rs index 2e51a6b71..366601c63 100644 --- a/crates/net/network/src/discovery.rs +++ b/crates/net/network/src/discovery.rs @@ -1,11 +1,13 @@ //! Discovery support for the network. -use crate::{ - cache::LruMap, - error::{NetworkError, ServiceKind}, - manager::DiscoveredEvent, - peers::PeerAddr, +use std::{ + collections::VecDeque, + net::{IpAddr, SocketAddr}, + pin::Pin, + sync::Arc, + task::{ready, Context, Poll}, }; + use enr::Enr; use futures::StreamExt; use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config}; @@ -14,19 +16,19 @@ use reth_dns_discovery::{ DnsDiscoveryConfig, DnsDiscoveryHandle, DnsDiscoveryService, DnsNodeRecordUpdate, DnsResolver, }; use reth_network_peers::{NodeRecord, PeerId}; +use reth_network_types::PeerAddr; use reth_primitives::{EnrForkIdEntry, ForkId}; use secp256k1::SecretKey; -use std::{ - collections::VecDeque, - net::{IpAddr, SocketAddr}, - pin::Pin, - sync::Arc, - task::{ready, Context, Poll}, -}; use tokio::{sync::mpsc, task::JoinHandle}; use tokio_stream::{wrappers::ReceiverStream, Stream}; use tracing::trace; +use crate::{ + cache::LruMap, + error::{NetworkError, ServiceKind}, + manager::DiscoveredEvent, +}; + /// Default max capacity for cache of discovered peers. /// /// Default is 10 000 peers. diff --git a/crates/net/network/src/error.rs b/crates/net/network/src/error.rs index d5e0f4537..d52e602d1 100644 --- a/crates/net/network/src/error.rs +++ b/crates/net/network/src/error.rs @@ -1,13 +1,15 @@ //! Possible errors when interacting with the network. -use crate::session::PendingSessionHandshakeError; +use std::{fmt, io, io::ErrorKind, net::SocketAddr}; + use reth_dns_discovery::resolver::ResolveError; use reth_eth_wire::{ errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError}, DisconnectReason, }; use reth_network_types::BackoffKind; -use std::{fmt, io, io::ErrorKind, net::SocketAddr}; + +use crate::session::PendingSessionHandshakeError; /// Service kind. #[derive(Debug, PartialEq, Eq, Copy, Clone)] diff --git a/crates/net/network/src/eth_requests.rs b/crates/net/network/src/eth_requests.rs index 8ee317554..a18f0656b 100644 --- a/crates/net/network/src/eth_requests.rs +++ b/crates/net/network/src/eth_requests.rs @@ -1,9 +1,12 @@ //! Blocks/Headers management for the p2p network. -use crate::{ - budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget, - metrics::EthRequestHandlerMetrics, peers::PeersHandle, +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + time::Duration, }; + use alloy_rlp::Encodable; use futures::StreamExt; use reth_eth_wire::{ @@ -12,17 +15,17 @@ use reth_eth_wire::{ }; use reth_network_p2p::error::RequestResult; use reth_network_peers::PeerId; +use reth_network_types::PeersHandle; use reth_primitives::{BlockBody, BlockHashOrNumber, Header}; use reth_storage_api::{BlockReader, HeaderProvider, ReceiptProvider}; -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, - time::Duration, -}; use tokio::sync::{mpsc::Receiver, oneshot}; use tokio_stream::wrappers::ReceiverStream; +use crate::{ + budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget, + metrics::EthRequestHandlerMetrics, +}; + // Limits: /// Maximum number of receipts to serve. diff --git a/crates/net/network/src/fetch/client.rs b/crates/net/network/src/fetch/client.rs index 32d54d77f..4644dc77c 100644 --- a/crates/net/network/src/fetch/client.rs +++ b/crates/net/network/src/fetch/client.rs @@ -1,8 +1,11 @@ //! A client implementation that can interact with the network and download data. -use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse, peers::PeersHandle}; -use futures::{future, future::Either}; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; +use futures::{future, future::Either}; use reth_network_p2p::{ bodies::client::{BodiesClient, BodiesFut}, download::DownloadClient, @@ -11,14 +14,12 @@ use reth_network_p2p::{ priority::Priority, }; use reth_network_peers::PeerId; -use reth_network_types::ReputationChangeKind; +use reth_network_types::{PeersHandle, ReputationChangeKind}; use reth_primitives::{Header, B256}; -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; use tokio::sync::{mpsc::UnboundedSender, oneshot}; +use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse}; + #[cfg_attr(doc, aquamarine::aquamarine)] /// Front-end API for fetching data from the network. /// diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index 5d0c6e1b1..7de7e27a1 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -1,16 +1,9 @@ //! Fetch data from the network. -use crate::{message::BlockRequest, peers::PeersHandle}; -use futures::StreamExt; -use reth_eth_wire::{GetBlockBodies, GetBlockHeaders}; -use reth_network_p2p::{ - error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult}, - headers::client::HeadersRequest, - priority::Priority, -}; -use reth_network_peers::PeerId; -use reth_network_types::ReputationChangeKind; -use reth_primitives::{BlockBody, Header, B256}; +mod client; + +pub use client::FetchClient; + use std::{ collections::{HashMap, VecDeque}, sync::{ @@ -19,11 +12,21 @@ use std::{ }, task::{Context, Poll}, }; + +use futures::StreamExt; +use reth_eth_wire::{GetBlockBodies, GetBlockHeaders}; +use reth_network_p2p::{ + error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult}, + headers::client::HeadersRequest, + priority::Priority, +}; +use reth_network_peers::PeerId; +use reth_network_types::{PeersHandle, ReputationChangeKind}; +use reth_primitives::{BlockBody, Header, B256}; use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; -mod client; -pub use client::FetchClient; +use crate::message::BlockRequest; /// Manages data fetching operations. /// diff --git a/crates/net/network/src/flattened_response.rs b/crates/net/network/src/flattened_response.rs index df2a9db78..78c3c35f5 100644 --- a/crates/net/network/src/flattened_response.rs +++ b/crates/net/network/src/flattened_response.rs @@ -1,9 +1,10 @@ -use futures::Future; -use pin_project::pin_project; use std::{ pin::Pin, task::{Context, Poll}, }; + +use futures::Future; +use pin_project::pin_project; use tokio::sync::oneshot::{error::RecvError, Receiver}; /// Flatten a [Receiver] message in order to get rid of the [RecvError] result diff --git a/crates/net/network/src/import.rs b/crates/net/network/src/import.rs index 11acfd03b..201dc3e4f 100644 --- a/crates/net/network/src/import.rs +++ b/crates/net/network/src/import.rs @@ -1,9 +1,11 @@ //! This module provides an abstraction over block import in the form of the `BlockImport` trait. -use crate::message::NewBlockMessage; -use reth_network_peers::PeerId; use std::task::{Context, Poll}; +use reth_network_peers::PeerId; + +use crate::message::NewBlockMessage; + /// Abstraction over block import. pub trait BlockImport: std::fmt::Debug + Send + Sync { /// Invoked for a received `NewBlock` broadcast message from the peer. diff --git a/crates/net/network/src/lib.rs b/crates/net/network/src/lib.rs index d37319723..7a2585eb7 100644 --- a/crates/net/network/src/lib.rs +++ b/crates/net/network/src/lib.rs @@ -114,47 +114,49 @@ /// Common helpers for network testing. pub mod test_utils; -mod budget; -mod builder; pub mod cache; pub mod config; -mod discovery; pub mod error; pub mod eth_requests; -mod fetch; -mod flattened_response; pub mod import; -mod listener; -mod manager; pub mod message; -mod metrics; -mod network; pub mod peers; pub mod protocol; +pub mod transactions; + +mod budget; +mod builder; +mod discovery; +mod fetch; +mod flattened_response; +mod listener; +mod manager; +mod metrics; +mod network; mod session; mod state; mod swarm; -pub mod transactions; + +pub use reth_eth_wire::{DisconnectReason, HelloMessageWithProtocols}; +pub use reth_network_api::{NetworkInfo, Peers, PeersHandleProvider, PeersInfo}; +pub use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState}; +pub use reth_network_types::{PeersConfig, SessionsConfig}; +pub use session::{ + ActiveSessionHandle, ActiveSessionMessage, Direction, EthRlpxConnection, PeerInfo, + PendingSessionEvent, PendingSessionHandle, PendingSessionHandshakeError, SessionCommand, + SessionEvent, SessionId, SessionManager, +}; pub use builder::NetworkBuilder; pub use config::{NetworkConfig, NetworkConfigBuilder}; pub use discovery::{Discovery, DiscoveryEvent}; pub use fetch::FetchClient; -pub use manager::{NetworkEvent, NetworkManager}; -pub use message::PeerRequest; -pub use network::{NetworkEvents, NetworkHandle, NetworkProtocols}; -pub use session::{ - ActiveSessionHandle, ActiveSessionMessage, Direction, PeerInfo, PendingSessionEvent, - PendingSessionHandle, PendingSessionHandshakeError, SessionCommand, SessionEvent, SessionId, - SessionManager, -}; - pub use flattened_response::FlattenedResponse; -pub use manager::DiscoveredEvent; +pub use manager::{DiscoveredEvent, NetworkEvent, NetworkManager}; +pub use message::PeerRequest; pub use metrics::TxTypesCounter; -pub use reth_eth_wire::{DisconnectReason, HelloMessageWithProtocols}; -pub use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState}; -pub use reth_network_types::{PeersConfig, SessionsConfig}; -pub use session::EthRlpxConnection; +pub use network::{ + BlockDownloaderProvider, FullNetwork, NetworkEvents, NetworkHandle, NetworkProtocols, +}; pub use swarm::NetworkConnectionState; pub use transactions::{FilterAnnouncement, MessageFilter, ValidateTx68}; diff --git a/crates/net/network/src/listener.rs b/crates/net/network/src/listener.rs index 9fcc15a10..e5094f689 100644 --- a/crates/net/network/src/listener.rs +++ b/crates/net/network/src/listener.rs @@ -1,12 +1,13 @@ //! Contains connection-oriented interfaces. -use futures::{ready, Stream}; use std::{ io, net::SocketAddr, pin::Pin, task::{Context, Poll}, }; + +use futures::{ready, Stream}; use tokio::net::{TcpListener, TcpStream}; /// A tcp connection listener. diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 7c378d9ef..603feca5d 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -37,7 +37,7 @@ use reth_fs_util::{self as fs, FsPathError}; use reth_metrics::common::mpsc::UnboundedMeteredSender; use reth_network_api::{EthProtocolInfo, NetworkStatus, PeerInfo}; use reth_network_peers::{NodeRecord, PeerId}; -use reth_network_types::ReputationChangeKind; +use reth_network_types::{PeerAddr, PeersHandle, ReputationChangeKind}; use reth_primitives::ForkId; use reth_storage_api::BlockNumReader; use reth_tasks::shutdown::GracefulShutdown; @@ -58,7 +58,7 @@ use crate::{ message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender}, metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE}, network::{NetworkHandle, NetworkHandleMessage}, - peers::{PeerAddr, PeersHandle, PeersManager}, + peers::PeersManager, poll_nested_stream_with_budget, protocol::IntoRlpxSubProtocol, session::SessionManager, diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index 9d55051e3..296f634f0 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -3,6 +3,12 @@ //! An `RLPx` stream is multiplexed via the prepended message-id of a framed message. //! Capabilities are exchanged via the `RLPx` `Hello` message as pairs of `(id, version)`, +use std::{ + fmt, + sync::Arc, + task::{ready, Context, Poll}, +}; + use futures::FutureExt; use reth_eth_wire::{ capability::RawCapabilityMessage, message::RequestPair, BlockBodies, BlockHeaders, EthMessage, @@ -15,11 +21,6 @@ use reth_network_peers::PeerId; use reth_primitives::{ BlockBody, Bytes, Header, PooledTransactionsElement, ReceiptWithBloom, B256, }; -use std::{ - fmt, - sync::Arc, - task::{ready, Context, Poll}, -}; use tokio::sync::{mpsc, mpsc::error::TrySendError, oneshot}; /// Internal form of a `NewBlock` message diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index cf696f138..d25618acf 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -1,27 +1,3 @@ -use crate::{ - config::NetworkMode, - discovery::DiscoveryEvent, - manager::NetworkEvent, - message::PeerRequest, - peers::{PeerAddr, PeersHandle}, - protocol::RlpxSubProtocol, - swarm::NetworkConnectionState, - transactions::TransactionsHandle, - FetchClient, -}; -use enr::Enr; -use parking_lot::Mutex; -use reth_discv4::Discv4; -use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes, SharedTransactions}; -use reth_network_api::{ - NetworkError, NetworkInfo, NetworkStatus, PeerInfo, PeerKind, Peers, PeersInfo, -}; -use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState, SyncStateProvider}; -use reth_network_peers::{NodeRecord, PeerId}; -use reth_network_types::{Reputation, ReputationChangeKind}; -use reth_primitives::{Head, TransactionSigned, B256}; -use reth_tokio_util::{EventSender, EventStream}; -use secp256k1::SecretKey; use std::{ net::SocketAddr, sync::{ @@ -29,12 +5,58 @@ use std::{ Arc, }, }; + +use enr::Enr; +use futures::Future; +use parking_lot::Mutex; +use reth_discv4::Discv4; +use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes, SharedTransactions}; +use reth_network_api::{ + NetworkError, NetworkInfo, NetworkStatus, PeerInfo, Peers, PeersHandleProvider, PeersInfo, +}; +use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState, SyncStateProvider}; +use reth_network_peers::{NodeRecord, PeerId}; +use reth_network_types::{PeerAddr, PeerKind, PeersHandle, Reputation, ReputationChangeKind}; +use reth_primitives::{Head, TransactionSigned, B256}; +use reth_tokio_util::{EventSender, EventStream}; +use secp256k1::SecretKey; use tokio::sync::{ mpsc::{self, UnboundedSender}, oneshot, }; use tokio_stream::wrappers::UnboundedReceiverStream; +use crate::{ + config::NetworkMode, discovery::DiscoveryEvent, manager::NetworkEvent, message::PeerRequest, + protocol::RlpxSubProtocol, swarm::NetworkConnectionState, transactions::TransactionsHandle, + FetchClient, +}; + +/// Helper trait that unifies network API needed to launch node. +pub trait FullNetwork: + BlockDownloaderProvider + + NetworkSyncUpdater + + NetworkInfo + + NetworkEvents + + PeersInfo + + Peers + + Clone + + 'static +{ +} + +impl FullNetwork for T where + T: BlockDownloaderProvider + + NetworkSyncUpdater + + NetworkInfo + + NetworkEvents + + PeersInfo + + Peers + + Clone + + 'static +{ +} + /// A _shareable_ network frontend. Used to interact with the network. /// /// See also [`NetworkManager`](crate::NetworkManager). @@ -85,26 +107,10 @@ impl NetworkHandle { &self.inner.local_peer_id } - /// Returns the [`PeersHandle`] that can be cloned and shared. - /// - /// The [`PeersHandle`] can be used to interact with the network's peer set. - pub fn peers_handle(&self) -> &PeersHandle { - &self.inner.peers - } - fn manager(&self) -> &UnboundedSender { &self.inner.to_manager_tx } - /// Returns a new [`FetchClient`] that can be cloned and shared. - /// - /// The [`FetchClient`] is the entrypoint for sending requests to the network. - pub async fn fetch_client(&self) -> Result { - let (tx, rx) = oneshot::channel(); - let _ = self.manager().send(NetworkHandleMessage::FetchClient(tx)); - rx.await - } - /// Returns the mode of the network, either pow, or pos pub fn mode(&self) -> &NetworkMode { &self.inner.network_mode @@ -328,6 +334,12 @@ impl Peers for NetworkHandle { } } +impl PeersHandleProvider for NetworkHandle { + fn peers_handle(&self) -> &PeersHandle { + &self.inner.peers + } +} + impl NetworkInfo for NetworkHandle { fn local_addr(&self) -> SocketAddr { *self.inner.listener_address.lock() @@ -381,6 +393,25 @@ impl NetworkSyncUpdater for NetworkHandle { } } +/// Provides [`FetchClient`] for downloading blocks. +#[auto_impl::auto_impl(&, Arc)] +pub trait BlockDownloaderProvider { + /// Returns a new [`FetchClient`] that can be cloned and shared. + /// + /// The [`FetchClient`] is the entrypoint for sending requests to the network. + fn fetch_client( + &self, + ) -> impl Future> + Send; +} + +impl BlockDownloaderProvider for NetworkHandle { + async fn fetch_client(&self) -> Result { + let (tx, rx) = oneshot::channel(); + let _ = self.manager().send(NetworkHandleMessage::FetchClient(tx)); + rx.await + } +} + #[derive(Debug)] struct NetworkInner { /// Number of active peer sessions the node's currently handling. @@ -412,6 +443,7 @@ struct NetworkInner { } /// Provides event subscription for the network. +#[auto_impl::auto_impl(&, Arc)] pub trait NetworkEvents: Send + Sync { /// Creates a new [`NetworkEvent`] listener channel. fn event_listener(&self) -> EventStream; diff --git a/crates/net/network/src/peers.rs b/crates/net/network/src/peers.rs index 9584594fc..dc05b96fc 100644 --- a/crates/net/network/src/peers.rs +++ b/crates/net/network/src/peers.rs @@ -1,25 +1,5 @@ //! Peer related implementations -use crate::{ - error::SessionError, - session::{Direction, PendingSessionHandshakeError}, - swarm::NetworkConnectionState, -}; -use futures::StreamExt; -use reth_eth_wire::{errors::EthStreamError, DisconnectReason}; -use reth_net_banlist::BanList; -use reth_network_api::PeerKind; -use reth_network_peers::{NodeRecord, PeerId}; -use reth_network_types::{ - peers::{ - config::PeerBackoffDurations, - reputation::{ - is_banned_reputation, DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE, - }, - }, - ConnectionsConfig, PeersConfig, ReputationChangeKind, ReputationChangeWeights, -}; -use reth_primitives::ForkId; use std::{ collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, fmt::Display, @@ -28,59 +8,33 @@ use std::{ task::{Context, Poll}, time::Duration, }; + +use futures::StreamExt; +use reth_eth_wire::{errors::EthStreamError, DisconnectReason}; +use reth_net_banlist::BanList; +use reth_network_peers::{NodeRecord, PeerId}; +use reth_network_types::{ + peers::{ + config::PeerBackoffDurations, + reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE}, + }, + ConnectionsConfig, Peer, PeerAddr, PeerCommand, PeerConnectionState, PeerKind, PeersConfig, + PeersHandle, ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights, +}; +use reth_primitives::ForkId; use thiserror::Error; use tokio::{ - sync::{mpsc, oneshot}, + sync::mpsc, time::{Instant, Interval}, }; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{trace, warn}; -/// A communication channel to the [`PeersManager`] to apply manual changes to the peer set. -#[derive(Clone, Debug)] -pub struct PeersHandle { - /// Sender half of command channel back to the [`PeersManager`] - manager_tx: mpsc::UnboundedSender, -} - -// === impl PeersHandle === - -impl PeersHandle { - fn send(&self, cmd: PeerCommand) { - let _ = self.manager_tx.send(cmd); - } - - /// Adds a peer to the set. - pub fn add_peer(&self, peer_id: PeerId, addr: SocketAddr) { - self.send(PeerCommand::Add(peer_id, addr)); - } - - /// Removes a peer from the set. - pub fn remove_peer(&self, peer_id: PeerId) { - self.send(PeerCommand::Remove(peer_id)); - } - - /// Send a reputation change for the given peer. - pub fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind) { - self.send(PeerCommand::ReputationChange(peer_id, kind)); - } - - /// Returns a peer by its [`PeerId`], or `None` if the peer is not in the peer set. - pub async fn peer_by_id(&self, peer_id: PeerId) -> Option { - let (tx, rx) = oneshot::channel(); - self.send(PeerCommand::GetPeer(peer_id, tx)); - - rx.await.unwrap_or(None) - } - - /// Returns all peers in the peerset. - pub async fn all_peers(&self) -> Vec { - let (tx, rx) = oneshot::channel(); - self.send(PeerCommand::GetPeers(tx)); - - rx.await.unwrap_or_default() - } -} +use crate::{ + error::SessionError, + session::{Direction, PendingSessionHandshakeError}, + swarm::NetworkConnectionState, +}; /// Maintains the state of _all_ the peers known to the network. /// @@ -198,7 +152,7 @@ impl PeersManager { /// Returns a new [`PeersHandle`] that can send commands to this type. pub(crate) fn handle(&self) -> PeersHandle { - PeersHandle { manager_tx: self.manager_tx.clone() } + PeersHandle::new(self.manager_tx.clone()) } /// Returns the number of peers in the peer set @@ -211,9 +165,9 @@ impl PeersManager { pub(crate) fn iter_peers(&self) -> impl Iterator + '_ { self.peers.iter().map(|(peer_id, v)| { NodeRecord::new_with_ports( - v.addr.tcp.ip(), - v.addr.tcp.port(), - v.addr.udp.map(|addr| addr.port()), + v.addr.tcp().ip(), + v.addr.tcp().port(), + v.addr.udp().map(|addr| addr.port()), *peer_id, ) }) @@ -224,9 +178,9 @@ impl PeersManager { self.peers.get(&peer_id).map(|v| { ( NodeRecord::new_with_ports( - v.addr.tcp.ip(), - v.addr.tcp.port(), - v.addr.udp.map(|addr| addr.port()), + v.addr.tcp().ip(), + v.addr.tcp().port(), + v.addr.udp().map(|addr| addr.port()), peer_id, ), v.kind, @@ -362,7 +316,7 @@ impl PeersManager { Entry::Vacant(entry) => { // peer is missing in the table, we add it but mark it as to be removed after // disconnect, because we only know the outgoing port - let mut peer = Peer::with_state(PeerAddr::tcp(addr), PeerConnectionState::In); + let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In); peer.remove_after_disconnect = true; entry.insert(peer); self.queued_actions.push_back(PeerAction::PeerAdded(peer_id)); @@ -715,7 +669,7 @@ impl PeersManager { addr: PeerAddr, fork_id: Option, ) { - if self.ban_list.is_banned(&peer_id, &addr.tcp.ip()) { + if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) { return } @@ -734,7 +688,7 @@ impl PeersManager { } } Entry::Vacant(entry) => { - trace!(target: "net::peers", ?peer_id, ?addr.tcp, "discovered new node"); + trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node"); let mut peer = Peer::with_kind(addr, kind); peer.fork_id = fork_id; entry.insert(peer); @@ -849,7 +803,7 @@ impl PeersManager { trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection"); peer.state = PeerConnectionState::PendingOut; - PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp } + PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() } }; self.connection_info.inc_pending_out(); @@ -887,7 +841,7 @@ impl PeersManager { while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) { match cmd { PeerCommand::Add(peer_id, addr) => { - self.add_peer(peer_id, PeerAddr::tcp(addr), None); + self.add_peer(peer_id, PeerAddr::from_tcp(addr), None); } PeerCommand::Remove(peer) => self.remove_peer(peer), PeerCommand::ReputationChange(peer_id, rep) => { @@ -1019,240 +973,6 @@ impl ConnectionInfo { } } -/// Represents a peer's address information. -/// -/// # Fields -/// -/// - `tcp`: A `SocketAddr` representing the peer's data transfer address. -/// - `udp`: An optional `SocketAddr` representing the peer's discover address. `None` if the peer -/// is directly connecting to us or the port is the same to `tcp`'s -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub struct PeerAddr { - tcp: SocketAddr, - udp: Option, -} - -impl PeerAddr { - /// Returns a new `PeerAddr` with the given `tcp` and `udp` addresses. - pub const fn new(tcp: SocketAddr, udp: Option) -> Self { - Self { tcp, udp } - } - - /// Returns a new `PeerAddr` with a `tcp` address only. - pub const fn tcp(tcp: SocketAddr) -> Self { - Self { tcp, udp: None } - } - - /// Returns a new `PeerAddr` with the given `tcp` and `udp` ports. - fn new_with_ports(ip: IpAddr, tcp_port: u16, udp_port: Option) -> Self { - let tcp = SocketAddr::new(ip, tcp_port); - let udp = udp_port.map(|port| SocketAddr::new(ip, port)); - Self::new(tcp, udp) - } -} - -/// Tracks info about a single peer. -#[derive(Debug, Clone)] -pub struct Peer { - /// Where to reach the peer. - addr: PeerAddr, - /// Reputation of the peer. - reputation: i32, - /// The state of the connection, if any. - state: PeerConnectionState, - /// The [`ForkId`] that the peer announced via discovery. - fork_id: Option, - /// Whether the entry should be removed after an existing session was terminated. - remove_after_disconnect: bool, - /// The kind of peer - kind: PeerKind, - /// Whether the peer is currently backed off. - backed_off: bool, - /// Counts number of times the peer was backed off due to a severe - /// [`reth_network_types::BackoffKind`]. - severe_backoff_counter: u8, -} - -// === impl Peer === - -impl Peer { - fn new(addr: PeerAddr) -> Self { - Self::with_state(addr, Default::default()) - } - - fn trusted(addr: PeerAddr) -> Self { - Self { kind: PeerKind::Trusted, ..Self::new(addr) } - } - - /// Returns the reputation of the peer - pub const fn reputation(&self) -> i32 { - self.reputation - } - - fn with_state(addr: PeerAddr, state: PeerConnectionState) -> Self { - Self { - addr, - state, - reputation: DEFAULT_REPUTATION, - fork_id: None, - remove_after_disconnect: false, - kind: Default::default(), - backed_off: false, - severe_backoff_counter: 0, - } - } - - fn with_kind(addr: PeerAddr, kind: PeerKind) -> Self { - Self { kind, ..Self::new(addr) } - } - - /// Resets the reputation of the peer to the default value. This always returns - /// [`ReputationChangeOutcome::None`]. - fn reset_reputation(&mut self) -> ReputationChangeOutcome { - self.reputation = DEFAULT_REPUTATION; - - ReputationChangeOutcome::None - } - - /// Applies a reputation change to the peer and returns what action should be taken. - fn apply_reputation(&mut self, reputation: i32) -> ReputationChangeOutcome { - let previous = self.reputation; - // we add reputation since negative reputation change decrease total reputation - self.reputation = previous.saturating_add(reputation); - - trace!(target: "net::peers", reputation=%self.reputation, banned=%self.is_banned(), "applied reputation change"); - - if self.state.is_connected() && self.is_banned() { - self.state.disconnect(); - return ReputationChangeOutcome::DisconnectAndBan - } - - if self.is_banned() && !is_banned_reputation(previous) { - return ReputationChangeOutcome::Ban - } - - if !self.is_banned() && is_banned_reputation(previous) { - return ReputationChangeOutcome::Unban - } - - ReputationChangeOutcome::None - } - - /// Returns true if the peer's reputation is below the banned threshold. - #[inline] - const fn is_banned(&self) -> bool { - is_banned_reputation(self.reputation) - } - - #[inline] - const fn is_backed_off(&self) -> bool { - self.backed_off - } - - /// Unbans the peer by resetting its reputation - #[inline] - fn unban(&mut self) { - self.reputation = DEFAULT_REPUTATION - } - - /// Returns whether this peer is trusted - #[inline] - const fn is_trusted(&self) -> bool { - matches!(self.kind, PeerKind::Trusted) - } - - /// Returns whether this peer is static - #[inline] - const fn is_static(&self) -> bool { - matches!(self.kind, PeerKind::Static) - } -} - -/// Outcomes when a reputation change is applied to a peer -enum ReputationChangeOutcome { - /// Nothing to do. - None, - /// Ban the peer. - Ban, - /// Ban and disconnect - DisconnectAndBan, - /// Unban the peer - Unban, -} - -/// Represents the kind of connection established to the peer, if any -#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)] -enum PeerConnectionState { - /// Not connected currently. - #[default] - Idle, - /// Disconnect of an incoming connection in progress - DisconnectingIn, - /// Disconnect of an outgoing connection in progress - DisconnectingOut, - /// Connected via incoming connection. - In, - /// Connected via outgoing connection. - Out, - /// Pending outgoing connection. - PendingOut, -} - -// === impl PeerConnectionState === - -impl PeerConnectionState { - /// Sets the disconnect state - #[inline] - fn disconnect(&mut self) { - match self { - Self::In => *self = Self::DisconnectingIn, - Self::Out => *self = Self::DisconnectingOut, - _ => {} - } - } - - /// Returns true if this is an active incoming connection. - #[inline] - const fn is_incoming(&self) -> bool { - matches!(self, Self::In) - } - - /// Returns whether we're currently connected with this peer - #[inline] - const fn is_connected(&self) -> bool { - matches!(self, Self::In | Self::Out | Self::PendingOut) - } - - /// Returns if there's currently no connection to that peer. - #[inline] - const fn is_unconnected(&self) -> bool { - matches!(self, Self::Idle) - } - - /// Returns true if there's currently an outbound dial to that peer. - #[inline] - const fn is_pending_out(&self) -> bool { - matches!(self, Self::PendingOut) - } -} - -/// Commands the [`PeersManager`] listens for. -#[derive(Debug)] -pub(crate) enum PeerCommand { - /// Command for manually add - Add(PeerId, SocketAddr), - /// Remove a peer from the set - /// - /// If currently connected this will disconnect the session - Remove(PeerId), - /// Apply a reputation change to the given peer. - ReputationChange(PeerId, ReputationChangeKind), - /// Get information about a peer - GetPeer(PeerId, oneshot::Sender>), - /// Get node information on all peers - GetPeers(oneshot::Sender>), -} - /// Actions the peer manager can trigger. #[derive(Debug)] pub enum PeerAction { @@ -1382,7 +1102,7 @@ mod tests { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); + peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1436,7 +1156,7 @@ mod tests { let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); peers.ban_peer(peer); - peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); + peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None); match event!(peers) { PeerAction::BanPeer { peer_id } => { @@ -1458,7 +1178,7 @@ mod tests { let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); peers.ban_peer(peer); - peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); + peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None); match event!(peers) { PeerAction::BanPeer { peer_id } => { @@ -1495,7 +1215,7 @@ mod tests { let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::new(PeersConfig::test()); - peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); + peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1554,7 +1274,7 @@ mod tests { let backoff_durations = PeerBackoffDurations::test(); let config = PeersConfig { backoff_durations, ..PeersConfig::test() }; let mut peers = PeersManager::new(config); - peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); + peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1611,7 +1331,7 @@ mod tests { let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let config = PeersConfig::test(); let mut peers = PeersManager::new(config); - peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); + peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None); let peer_struct = peers.peers.get_mut(&peer).unwrap(); let backoff_timestamp = peers @@ -1628,7 +1348,7 @@ mod tests { let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let config = PeersConfig::default(); let mut peers = PeersManager::new(config); - peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); + peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None); let peer_struct = peers.peers.get_mut(&peer).unwrap(); // Simulate a peer that was already backed off once @@ -1656,7 +1376,7 @@ mod tests { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); + peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1713,7 +1433,7 @@ mod tests { let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let config = PeersConfig::test(); let mut peers = PeersManager::new(config.clone()); - peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); + peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None); let peer_struct = peers.peers.get_mut(&peer).unwrap(); // Simulate a peer that was already backed off once @@ -1767,7 +1487,7 @@ mod tests { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); + peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1879,7 +1599,7 @@ mod tests { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); + peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1997,7 +1717,7 @@ mod tests { // to increase by 1 peers.on_incoming_session_established(peer, socket_addr); let p = peers.peers.get_mut(&peer).expect("peer not found"); - assert_eq!(p.addr.tcp, socket_addr); + assert_eq!(p.addr.tcp(), socket_addr); assert_eq!(peers.connection_info.num_pending_in, 0); assert_eq!(peers.connection_info.num_inbound, 1); @@ -2012,7 +1732,7 @@ mod tests { peers.on_already_connected(Direction::Incoming); let p = peers.peers.get_mut(&peer).expect("peer not found"); - assert_eq!(p.addr.tcp, socket_addr); + assert_eq!(p.addr.tcp(), socket_addr); assert_eq!(peers.connection_info.num_pending_in, 0); assert_eq!(peers.connection_info.num_inbound, 1); } @@ -2022,7 +1742,7 @@ mod tests { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_trusted_peer(peer, PeerAddr::tcp(socket_addr)); + peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr)); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -2074,7 +1794,7 @@ mod tests { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); + peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None); assert_eq!(peers.get_reputation(&peer), Some(0)); peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024)); @@ -2089,7 +1809,7 @@ mod tests { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); + peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -2126,7 +1846,7 @@ mod tests { let p = peers.peers.get(&peer).unwrap(); assert_eq!(p.state, PeerConnectionState::PendingOut); - peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); + peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None); let p = peers.peers.get(&peer).unwrap(); assert_eq!(p.state, PeerConnectionState::PendingOut); @@ -2139,7 +1859,7 @@ mod tests { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); + peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -2174,7 +1894,7 @@ mod tests { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); + peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -2208,7 +1928,7 @@ mod tests { let ban_list = BanList::new(HashSet::new(), vec![ip]); let config = PeersConfig::default().with_ban_list(ban_list); let mut peer_manager = PeersManager::new(config); - peer_manager.add_peer(B512::default(), PeerAddr::tcp(socket_addr), None); + peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None); assert!(peer_manager.peers.is_empty()); } @@ -2311,7 +2031,7 @@ mod tests { let basic_peer = PeerId::random(); let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009); - peers.add_peer(basic_peer, PeerAddr::tcp(basic_sock), None); + peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -2351,7 +2071,7 @@ mod tests { let basic_peer = PeerId::random(); let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009); - peers.add_peer(basic_peer, PeerAddr::tcp(basic_sock), None); + peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -2459,7 +2179,7 @@ mod tests { let config = PeersConfig::test(); let mut peer_manager = PeersManager::new(config); let peer_id = PeerId::random(); - peer_manager.add_peer(peer_id, PeerAddr::tcp(socket_addr), None); + peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None); tokio::time::sleep(Duration::from_secs(1)).await; peer_manager.tick(); @@ -2514,7 +2234,7 @@ mod tests { assert!(peer.remove_after_disconnect); // trigger discovery manually while the peer is still connected - peers.add_peer(peer_id, PeerAddr::tcp(addr), None); + peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None); peers.on_active_session_gracefully_closed(peer_id); @@ -2530,7 +2250,7 @@ mod tests { let mut peers = PeersManager::default(); peers.on_incoming_pending_session(addr.ip()).unwrap(); - peers.add_peer(peer_id, PeerAddr::tcp(addr), None); + peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None); match event!(peers) { PeerAction::PeerAdded(_) => {} @@ -2558,7 +2278,7 @@ mod tests { let mut peers = PeersManager::default(); peers.on_incoming_pending_session(addr.ip()).unwrap(); - peers.add_peer(peer_id, PeerAddr::tcp(addr), None); + peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None); match event!(peers) { PeerAction::PeerAdded(_) => {} @@ -2589,7 +2309,7 @@ mod tests { let config = PeersConfig::default(); let mut peer_manager = PeersManager::new(config); let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)); - let peer_addr = PeerAddr::tcp(SocketAddr::new(ip, 8008)); + let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008)); for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 { peer_manager.add_peer(PeerId::random(), peer_addr, None); } @@ -2608,7 +2328,7 @@ mod tests { let config = PeersConfig::default(); let mut peer_manager = PeersManager::new(config); let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)); - let peer_addr = PeerAddr::tcp(SocketAddr::new(ip, 8008)); + let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008)); // add more peers than allowed for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 { diff --git a/crates/net/network/src/protocol.rs b/crates/net/network/src/protocol.rs index 2ae1b132d..8ea34fff1 100644 --- a/crates/net/network/src/protocol.rs +++ b/crates/net/network/src/protocol.rs @@ -2,18 +2,19 @@ //! //! See also +use std::{ + fmt, + net::SocketAddr, + ops::{Deref, DerefMut}, + pin::Pin, +}; + use futures::Stream; use reth_eth_wire::{ capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol, }; use reth_network_api::{Direction, PeerId}; use reth_primitives::BytesMut; -use std::{ - fmt, - net::SocketAddr, - ops::{Deref, DerefMut}, - pin::Pin, -}; /// A trait that allows to offer additional RLPx-based application-level protocols when establishing /// a peer-to-peer connection. diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index 1cd090105..1aa1e6c26 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -1,14 +1,16 @@ //! Represents an established session. -use crate::{ - message::{NewBlockMessage, PeerMessage, PeerRequest, PeerResponse, PeerResponseResult}, - session::{ - conn::EthRlpxConnection, - handle::{ActiveSessionMessage, SessionCommand}, - SessionId, - }, -}; use core::sync::atomic::Ordering; +use std::{ + collections::VecDeque, + future::Future, + net::SocketAddr, + pin::Pin, + sync::{atomic::AtomicU64, Arc}, + task::{ready, Context, Poll}, + time::{Duration, Instant}, +}; + use futures::{stream::Fuse, SinkExt, StreamExt}; use reth_eth_wire::{ capability::Capabilities, @@ -21,15 +23,6 @@ use reth_network_p2p::error::RequestError; use reth_network_peers::PeerId; use reth_network_types::session::config::INITIAL_REQUEST_TIMEOUT; use rustc_hash::FxHashMap; -use std::{ - collections::VecDeque, - future::Future, - net::SocketAddr, - pin::Pin, - sync::{atomic::AtomicU64, Arc}, - task::{ready, Context, Poll}, - time::{Duration, Instant}, -}; use tokio::{ sync::{mpsc::error::TrySendError, oneshot}, time::Interval, @@ -38,6 +31,15 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::PollSender; use tracing::{debug, trace}; +use crate::{ + message::{NewBlockMessage, PeerMessage, PeerRequest, PeerResponse, PeerResponseResult}, + session::{ + conn::EthRlpxConnection, + handle::{ActiveSessionMessage, SessionCommand}, + SessionId, + }, +}; + // Constants for timeout updating. /// Minimum timeout value diff --git a/crates/net/network/src/session/conn.rs b/crates/net/network/src/session/conn.rs index fce7b3908..628c880c8 100644 --- a/crates/net/network/src/session/conn.rs +++ b/crates/net/network/src/session/conn.rs @@ -1,5 +1,10 @@ //! Connection types for a session +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + use futures::{Sink, Stream}; use reth_ecies::stream::ECIESStream; use reth_eth_wire::{ @@ -8,10 +13,6 @@ use reth_eth_wire::{ multiplex::{ProtocolProxy, RlpxSatelliteStream}, EthMessage, EthStream, EthVersion, P2PStream, }; -use std::{ - pin::Pin, - task::{Context, Poll}, -}; use tokio::net::TcpStream; /// The type of the underlying peer network connection. diff --git a/crates/net/network/src/session/handle.rs b/crates/net/network/src/session/handle.rs index 4c1a5e531..ad9c491a2 100644 --- a/crates/net/network/src/session/handle.rs +++ b/crates/net/network/src/session/handle.rs @@ -1,24 +1,27 @@ //! Session handles. -use crate::{ - message::PeerMessage, - session::{conn::EthRlpxConnection, Direction, SessionId}, - PendingSessionHandshakeError, -}; +use std::{io, net::SocketAddr, sync::Arc, time::Instant}; + use reth_ecies::ECIESError; use reth_eth_wire::{ capability::{Capabilities, CapabilityMessage}, errors::EthStreamError, DisconnectReason, EthVersion, Status, }; -use reth_network_api::{PeerInfo, PeerKind}; +use reth_network_api::PeerInfo; use reth_network_peers::{NodeRecord, PeerId}; -use std::{io, net::SocketAddr, sync::Arc, time::Instant}; +use reth_network_types::PeerKind; use tokio::sync::{ mpsc::{self, error::SendError}, oneshot, }; +use crate::{ + message::PeerMessage, + session::{conn::EthRlpxConnection, Direction, SessionId}, + PendingSessionHandshakeError, +}; + /// A handler attached to a peer session that's not authenticated yet, pending Handshake and hello /// message which exchanges the `capabilities` of the peer. /// diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 5f01b8dad..1dfb9a4ea 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -1,12 +1,36 @@ //! Support for handling peer sessions. -use crate::{message::PeerMessage, metrics::SessionManagerMetrics, session::active::ActiveSession}; +mod active; +mod conn; +mod counter; +mod handle; + +pub use conn::EthRlpxConnection; +pub use handle::{ + ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle, + SessionCommand, +}; + +pub use crate::message::PeerRequestSender; + +pub use reth_network_api::{Direction, PeerInfo}; + +use std::{ + collections::HashMap, + future::Future, + net::SocketAddr, + sync::{atomic::AtomicU64, Arc}, + task::{Context, Poll}, + time::{Duration, Instant}, +}; + use counter::SessionCounter; use futures::{future::Either, io, FutureExt, StreamExt}; use reth_ecies::{stream::ECIESStream, ECIESError}; use reth_eth_wire::{ capability::{Capabilities, CapabilityMessage}, errors::EthStreamError, + multiplex::RlpxProtocolMultiplexer, DisconnectReason, EthVersion, HelloMessageWithProtocols, Status, UnauthedEthStream, UnauthedP2PStream, }; @@ -17,14 +41,6 @@ use reth_primitives::{ForkFilter, ForkId, ForkTransition, Head}; use reth_tasks::TaskSpawner; use rustc_hash::FxHashMap; use secp256k1::SecretKey; -use std::{ - collections::HashMap, - future::Future, - net::SocketAddr, - sync::{atomic::AtomicU64, Arc}, - task::{Context, Poll}, - time::{Duration, Instant}, -}; use tokio::{ io::{AsyncRead, AsyncWrite}, net::TcpStream, @@ -34,19 +50,13 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::PollSender; use tracing::{debug, instrument, trace}; -mod active; -mod conn; -mod counter; -mod handle; -pub use crate::message::PeerRequestSender; -use crate::protocol::{IntoRlpxSubProtocol, RlpxSubProtocolHandlers, RlpxSubProtocols}; -pub use conn::EthRlpxConnection; -pub use handle::{ - ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle, - SessionCommand, +use crate::{ + message::PeerMessage, + metrics::SessionManagerMetrics, + protocol::{IntoRlpxSubProtocol, RlpxSubProtocolHandlers, RlpxSubProtocols}, + session::active::ActiveSession, }; -use reth_eth_wire::multiplex::RlpxProtocolMultiplexer; -pub use reth_network_api::{Direction, PeerInfo}; + /// Internal identifier for active sessions. #[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)] pub struct SessionId(usize); diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index 39802b426..8038deed5 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -1,25 +1,5 @@ //! Keeps track of the state of the network. -use crate::{ - cache::LruCache, - discovery::{Discovery, DiscoveryEvent}, - fetch::{BlockResponseOutcome, FetchAction, StateFetcher}, - manager::DiscoveredEvent, - message::{ - BlockRequest, NewBlockMessage, PeerRequest, PeerRequestSender, PeerResponse, - PeerResponseResult, - }, - peers::{PeerAction, PeerAddr, PeersManager}, - FetchClient, -}; -use rand::seq::SliceRandom; - -use reth_eth_wire::{ - capability::Capabilities, BlockHashNumber, DisconnectReason, NewBlockHashes, Status, -}; -use reth_network_api::PeerKind; -use reth_network_peers::PeerId; -use reth_primitives::{ForkId, B256}; use std::{ collections::{HashMap, VecDeque}, fmt, @@ -31,9 +11,30 @@ use std::{ }, task::{Context, Poll}, }; + +use rand::seq::SliceRandom; +use reth_eth_wire::{ + capability::Capabilities, BlockHashNumber, DisconnectReason, NewBlockHashes, Status, +}; +use reth_network_peers::PeerId; +use reth_network_types::{PeerAddr, PeerKind}; +use reth_primitives::{ForkId, B256}; use tokio::sync::oneshot; use tracing::{debug, trace}; +use crate::{ + cache::LruCache, + discovery::{Discovery, DiscoveryEvent}, + fetch::{BlockResponseOutcome, FetchAction, StateFetcher}, + manager::DiscoveredEvent, + message::{ + BlockRequest, NewBlockMessage, PeerRequest, PeerRequestSender, PeerResponse, + PeerResponseResult, + }, + peers::{PeerAction, PeersManager}, + FetchClient, +}; + /// Cache limit of blocks to keep track of for a single peer. const PEER_BLOCK_CACHE_LIMIT: u32 = 512; diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index 83d26a56b..b2c93dc42 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -1,18 +1,3 @@ -use crate::{ - listener::{ConnectionListener, ListenerEvent}, - message::{PeerMessage, PeerRequestSender}, - peers::InboundConnectionError, - protocol::IntoRlpxSubProtocol, - session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager}, - state::{NetworkState, StateAction}, -}; -use futures::Stream; -use reth_eth_wire::{ - capability::{Capabilities, CapabilityMessage}, - errors::EthStreamError, - EthVersion, Status, -}; -use reth_network_peers::PeerId; use std::{ io, net::SocketAddr, @@ -21,8 +6,24 @@ use std::{ task::{Context, Poll}, }; +use futures::Stream; +use reth_eth_wire::{ + capability::{Capabilities, CapabilityMessage}, + errors::EthStreamError, + EthVersion, Status, +}; +use reth_network_peers::PeerId; use tracing::trace; +use crate::{ + listener::{ConnectionListener, ListenerEvent}, + message::{PeerMessage, PeerRequestSender}, + peers::InboundConnectionError, + protocol::IntoRlpxSubProtocol, + session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager}, + state::{NetworkState, StateAction}, +}; + #[cfg_attr(doc, aquamarine::aquamarine)] /// Contains the connectivity related state of the network. /// diff --git a/crates/net/network/src/test_utils/init.rs b/crates/net/network/src/test_utils/init.rs index 87ccbb5f9..767f68180 100644 --- a/crates/net/network/src/test_utils/init.rs +++ b/crates/net/network/src/test_utils/init.rs @@ -1,6 +1,7 @@ +use std::{net::SocketAddr, time::Duration}; + use enr::{k256::ecdsa::SigningKey, Enr, EnrPublicKey}; use reth_network_peers::PeerId; -use std::{net::SocketAddr, time::Duration}; /// The timeout for tests that create a `GethInstance` pub const GETH_TIMEOUT: Duration = Duration::from_secs(60); diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index 1fa0603f8..2357ff324 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -1,21 +1,20 @@ //! A network implementation for testing purposes. -use crate::{ - builder::ETH_REQUEST_CHANNEL_CAPACITY, - error::NetworkError, - eth_requests::EthRequestHandler, - peers::PeersHandle, - protocol::IntoRlpxSubProtocol, - transactions::{TransactionsHandle, TransactionsManager, TransactionsManagerConfig}, - NetworkConfig, NetworkConfigBuilder, NetworkEvent, NetworkEvents, NetworkHandle, - NetworkManager, +use std::{ + fmt, + future::Future, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + pin::Pin, + task::{Context, Poll}, }; + use futures::{FutureExt, StreamExt}; use pin_project::pin_project; use reth_chainspec::MAINNET; use reth_eth_wire::{protocol::Protocol, DisconnectReason, HelloMessageWithProtocols}; -use reth_network_api::{NetworkInfo, Peers}; +use reth_network_api::{NetworkInfo, Peers, PeersHandleProvider}; use reth_network_peers::PeerId; +use reth_network_types::PeersHandle; use reth_provider::test_utils::NoopProvider; use reth_storage_api::{BlockReader, BlockReaderIdExt, HeaderProvider, StateProviderFactory}; use reth_tasks::TokioTaskExecutor; @@ -26,13 +25,6 @@ use reth_transaction_pool::{ EthTransactionPool, TransactionPool, TransactionValidationTaskExecutor, }; use secp256k1::SecretKey; -use std::{ - fmt, - future::Future, - net::{Ipv4Addr, SocketAddr, SocketAddrV4}, - pin::Pin, - task::{Context, Poll}, -}; use tokio::{ sync::{ mpsc::{channel, unbounded_channel}, @@ -41,6 +33,16 @@ use tokio::{ task::JoinHandle, }; +use crate::{ + builder::ETH_REQUEST_CHANNEL_CAPACITY, + error::NetworkError, + eth_requests::EthRequestHandler, + protocol::IntoRlpxSubProtocol, + transactions::{TransactionsHandle, TransactionsManager, TransactionsManagerConfig}, + NetworkConfig, NetworkConfigBuilder, NetworkEvent, NetworkEvents, NetworkHandle, + NetworkManager, +}; + /// A test network consisting of multiple peers. pub struct Testnet { /// All running peers in the network. diff --git a/crates/net/network/src/transactions/config.rs b/crates/net/network/src/transactions/config.rs index 9ddb9ce9c..083aefa8a 100644 --- a/crates/net/network/src/transactions/config.rs +++ b/crates/net/network/src/transactions/config.rs @@ -1,8 +1,9 @@ +use derive_more::Constructor; + use super::{ DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ, SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE, }; -use derive_more::Constructor; /// Configuration for managing transactions within the network. #[derive(Debug, Default, Clone)] diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index 41081b191..f5563d102 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -25,16 +25,15 @@ //! before it's re-tried. Nonetheless, the capacity of the buffered hashes cache must be large //! enough to buffer many hashes during network failure, to allow for recovery. -use crate::{ - cache::{LruCache, LruMap}, - duration_metered_exec, - message::PeerRequest, - metrics::TransactionFetcherMetrics, - transactions::{validation, PartiallyFilterMessage}, +use std::{ + collections::HashMap, + pin::Pin, + task::{ready, Context, Poll}, + time::Duration, }; + use derive_more::{Constructor, Deref}; use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt}; - use pin_project::pin_project; use reth_eth_wire::{ DedupPayload, EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData, @@ -46,12 +45,6 @@ use reth_primitives::{PooledTransactionsElement, TxHash}; use schnellru::ByLength; #[cfg(debug_assertions)] use smallvec::{smallvec, SmallVec}; -use std::{ - collections::HashMap, - pin::Pin, - task::{ready, Context, Poll}, - time::Duration, -}; use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError}; use tracing::{debug, trace}; use validation::FilterOutcome; @@ -62,6 +55,13 @@ use super::{ MessageFilter, PeerMetadata, PooledTransactions, SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE, }; +use crate::{ + cache::{LruCache, LruMap}, + duration_metered_exec, + message::PeerRequest, + metrics::TransactionFetcherMetrics, + transactions::{validation, PartiallyFilterMessage}, +}; /// The type responsible for fetching missing transactions from peers. /// diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 5abc9cf82..7e9c52f5f 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -1,5 +1,36 @@ //! Transactions management for the p2p network. +/// Aggregation on configurable parameters for [`TransactionsManager`]. +pub mod config; +/// Default and spec'd bounds. +pub mod constants; +/// Component responsible for fetching transactions from [`NewPooledTransactionHashes`]. +pub mod fetcher; +pub mod validation; + +pub use self::constants::{ + tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ, + SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE, +}; +pub use config::{TransactionFetcherConfig, TransactionsManagerConfig}; +pub use validation::*; + +pub(crate) use fetcher::{FetchEvent, TransactionFetcher}; + +use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE}; +use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE; + +use std::{ + collections::{hash_map::Entry, HashMap, HashSet}, + pin::Pin, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + task::{Context, Poll}, + time::{Duration, Instant}, +}; + use futures::{stream::FuturesUnordered, Future, StreamExt}; use reth_eth_wire::{ EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData, @@ -21,16 +52,6 @@ use reth_transaction_pool::{ GetPooledTransactionLimit, PoolTransaction, PropagateKind, PropagatedTransactions, TransactionPool, ValidPoolTransaction, }; -use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, - pin::Pin, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - task::{Context, Poll}, - time::{Duration, Instant}, -}; use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError}; use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; use tracing::{debug, trace}; @@ -50,25 +71,6 @@ use crate::{ NetworkEvents, NetworkHandle, }; -/// Aggregation on configurable parameters for [`TransactionsManager`]. -pub mod config; -/// Default and spec'd bounds. -pub mod constants; -/// Component responsible for fetching transactions from [`NewPooledTransactionHashes`]. -pub mod fetcher; -pub mod validation; -pub use config::{TransactionFetcherConfig, TransactionsManagerConfig}; - -use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE; -pub(crate) use fetcher::{FetchEvent, TransactionFetcher}; -pub use validation::*; - -pub use self::constants::{ - tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ, - SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE, -}; -use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE}; - /// The future for importing transactions into the pool. /// /// Resolves with the result of each transaction import. diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index 73dcd272e..9433d87c1 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -1,5 +1,7 @@ //! Connection tests +use std::{collections::HashSet, net::SocketAddr, time::Duration}; + use alloy_node_bindings::Geth; use alloy_provider::{ext::AdminApi, ProviderBuilder}; use futures::StreamExt; @@ -8,7 +10,8 @@ use reth_eth_wire::{DisconnectReason, HeadersDirection}; use reth_net_banlist::BanList; use reth_network::{ test_utils::{enr_to_peer_id, NetworkEventStream, PeerConfig, Testnet, GETH_TIMEOUT}, - NetworkConfigBuilder, NetworkEvent, NetworkEvents, NetworkManager, PeersConfig, + BlockDownloaderProvider, NetworkConfigBuilder, NetworkEvent, NetworkEvents, NetworkManager, + PeersConfig, }; use reth_network_api::{NetworkInfo, Peers, PeersInfo}; use reth_network_p2p::{ @@ -19,7 +22,6 @@ use reth_network_peers::{mainnet_nodes, NodeRecord, TrustedPeer}; use reth_provider::test_utils::NoopProvider; use reth_transaction_pool::test_utils::testing_pool; use secp256k1::SecretKey; -use std::{collections::HashSet, net::SocketAddr, time::Duration}; use tokio::task; use url::Host; diff --git a/crates/net/network/tests/it/multiplex.rs b/crates/net/network/tests/it/multiplex.rs index ea63ace95..5e3e8f40c 100644 --- a/crates/net/network/tests/it/multiplex.rs +++ b/crates/net/network/tests/it/multiplex.rs @@ -1,7 +1,12 @@ #![allow(unreachable_pub)] //! Testing gossiping of transactions. -use crate::multiplex::proto::{PingPongProtoMessage, PingPongProtoMessageKind}; +use std::{ + net::SocketAddr, + pin::Pin, + task::{ready, Context, Poll}, +}; + use futures::{Stream, StreamExt}; use reth_eth_wire::{ capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol, @@ -13,14 +18,11 @@ use reth_network::{ use reth_network_api::{Direction, PeerId}; use reth_primitives::BytesMut; use reth_provider::test_utils::MockEthProvider; -use std::{ - net::SocketAddr, - pin::Pin, - task::{ready, Context, Poll}, -}; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; +use crate::multiplex::proto::{PingPongProtoMessage, PingPongProtoMessageKind}; + /// A simple Rlpx subprotocol that sends pings and pongs mod proto { use super::*; diff --git a/crates/net/network/tests/it/requests.rs b/crates/net/network/tests/it/requests.rs index 85669dc9f..cca48186c 100644 --- a/crates/net/network/tests/it/requests.rs +++ b/crates/net/network/tests/it/requests.rs @@ -1,11 +1,13 @@ #![allow(unreachable_pub)] //! Tests for eth related requests +use std::sync::Arc; + use rand::Rng; use reth_eth_wire::HeadersDirection; use reth_network::{ test_utils::{NetworkEventStream, Testnet}, - NetworkEvents, + BlockDownloaderProvider, NetworkEvents, }; use reth_network_api::{NetworkInfo, Peers}; use reth_network_p2p::{ @@ -17,7 +19,6 @@ use reth_primitives::{ U256, }; use reth_provider::test_utils::MockEthProvider; -use std::sync::Arc; /// Returns a new [`TransactionSigned`] with some random parameters pub fn rng_transaction(rng: &mut impl rand::RngCore) -> TransactionSigned { diff --git a/crates/net/network/tests/it/startup.rs b/crates/net/network/tests/it/startup.rs index 9317bef0f..269b352e4 100644 --- a/crates/net/network/tests/it/startup.rs +++ b/crates/net/network/tests/it/startup.rs @@ -1,3 +1,8 @@ +use std::{ + io, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, +}; + use reth_discv4::Discv4Config; use reth_network::{ error::{NetworkError, ServiceKind}, @@ -6,10 +11,6 @@ use reth_network::{ use reth_network_api::{NetworkInfo, PeersInfo}; use reth_provider::test_utils::NoopProvider; use secp256k1::SecretKey; -use std::{ - io, - net::{Ipv4Addr, SocketAddr, SocketAddrV4}, -}; use tokio::net::TcpListener; fn is_addr_in_use_kind(err: &NetworkError, kind: ServiceKind) -> bool { diff --git a/crates/net/network/tests/it/txgossip.rs b/crates/net/network/tests/it/txgossip.rs index 68a7775f0..d5c5ccf63 100644 --- a/crates/net/network/tests/it/txgossip.rs +++ b/crates/net/network/tests/it/txgossip.rs @@ -1,5 +1,7 @@ //! Testing gossiping of transactions. +use std::sync::Arc; + use futures::StreamExt; use rand::thread_rng; use reth_network::{test_utils::Testnet, NetworkEvent, NetworkEvents}; @@ -7,7 +9,6 @@ use reth_network_api::PeersInfo; use reth_primitives::{TransactionSigned, TxLegacy, U256}; use reth_provider::test_utils::{ExtendedAccount, MockEthProvider}; use reth_transaction_pool::{test_utils::TransactionGenerator, PoolTransaction, TransactionPool}; -use std::sync::Arc; #[tokio::test(flavor = "multi_thread")] async fn test_tx_gossip() { diff --git a/crates/net/p2p/src/reputation.rs b/crates/net/p2p/src/reputation.rs index c2b2860a6..74f83ba4c 100644 --- a/crates/net/p2p/src/reputation.rs +++ b/crates/net/p2p/src/reputation.rs @@ -189,3 +189,16 @@ impl From for ReputationChange { Self(value) } } + +/// Outcomes when a reputation change is applied to a peer +#[derive(Debug, Clone, Copy)] +pub enum ReputationChangeOutcome { + /// Nothing to do. + None, + /// Ban the peer. + Ban, + /// Ban and disconnect + DisconnectAndBan, + /// Unban the peer + Unban, +} diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index 138403ed1..7791c7002 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -15,7 +15,7 @@ use reth_blockchain_tree::{noop::NoopBlockchainTree, BlockchainTreeConfig}; use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider, RpcBlockProvider}; use reth_engine_util::EngineMessageStreamExt; use reth_exex::ExExManagerHandle; -use reth_network::{NetworkEvents, NetworkHandle}; +use reth_network::{BlockDownloaderProvider, NetworkEvents, NetworkHandle}; use reth_node_api::{FullNodeComponents, FullNodeTypes, NodeAddOns}; use reth_node_core::{ dirs::{ChainPath, DataDirPath}, diff --git a/examples/custom-rlpx-subprotocol/src/main.rs b/examples/custom-rlpx-subprotocol/src/main.rs index 3a198c38d..d528a5a92 100644 --- a/examples/custom-rlpx-subprotocol/src/main.rs +++ b/examples/custom-rlpx-subprotocol/src/main.rs @@ -7,15 +7,17 @@ //! ``` //! //! This launch a regular reth node with a custom rlpx subprotocol. + +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; + use reth::builder::NodeHandle; use reth_network::{ config::SecretKey, protocol::IntoRlpxSubProtocol, NetworkConfig, NetworkManager, - NetworkProtocols, + NetworkProtocols, PeersHandleProvider, }; use reth_network_api::NetworkInfo; use reth_node_ethereum::EthereumNode; use reth_provider::test_utils::NoopProvider; -use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use subprotocol::{ connection::CustomCommand, protocol::{