From ce64fefd78310407317eae860cd6983ba3b6902a Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 25 Oct 2022 14:23:24 +0200 Subject: [PATCH] feat(net): add discv4 crate (#113) * port kad * feat: port kad bucket * feat: add discv4 * chore: rustfmt * cargo update * just reuse discv5 table * test: add rlp tests * message encoding * feat: impl codec roundtrip testing * more work in message handling * implement ping * feat: impl commands * cleanup * more cleanup * trim config * more docs * feat: implement recursive lookup * docs * cleanup config * feat: implement update stream * chore: config cleanup * docs: add crate docs * feat: more testing * fix deny * clarify ring * docs: more docs * use discv5 master * docs: address review and add comments * update readme * rustmft * chore(clippy): make clippy happy --- Cargo.lock | 937 ++++++++++++++++++-- Cargo.toml | 1 + crates/net/discv4/Cargo.toml | 42 + crates/net/discv4/README.md | 22 + crates/net/discv4/src/bootnodes.rs | 61 ++ crates/net/discv4/src/config.rs | 128 +++ crates/net/discv4/src/error.rs | 36 + crates/net/discv4/src/lib.rs | 1273 ++++++++++++++++++++++++++++ crates/net/discv4/src/node.rs | 211 +++++ crates/net/discv4/src/proto.rs | 578 +++++++++++++ crates/primitives/src/lib.rs | 2 +- 11 files changed, 3230 insertions(+), 61 deletions(-) create mode 100644 crates/net/discv4/Cargo.toml create mode 100644 crates/net/discv4/README.md create mode 100644 crates/net/discv4/src/bootnodes.rs create mode 100644 crates/net/discv4/src/config.rs create mode 100644 crates/net/discv4/src/error.rs create mode 100644 crates/net/discv4/src/lib.rs create mode 100644 crates/net/discv4/src/node.rs create mode 100644 crates/net/discv4/src/proto.rs diff --git a/Cargo.lock b/Cargo.lock index e6c693508..26f1f74ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,28 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "aead" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b613b8e1e3cf911a086f53f03bf286f52fd7a7258e4fa606f0ef220d39d8877" +dependencies = [ + "generic-array", +] + +[[package]] +name = "aes" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8" +dependencies = [ + "cfg-if", + "cipher 0.3.0", + "cpufeatures", + "ctr 0.8.0", + "opaque-debug", +] + [[package]] name = "aes" version = "0.8.1" @@ -9,17 +31,31 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfe0133578c0986e1fe3dfcd4af1cc5b2dd6c3dbf534d69916ce16a2701d40ba" dependencies = [ "cfg-if", - "cipher", + "cipher 0.4.3", "cpufeatures", ] +[[package]] +name = "aes-gcm" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df5f85a83a7d8b0442b6aa7b504b8212c1733da07b98aae43d4bc21b2cb3cdf6" +dependencies = [ + "aead", + "aes 0.7.5", + "cipher 0.3.0", + "ctr 0.8.0", + "ghash", + "subtle", +] + [[package]] name = "ahash" version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" dependencies = [ - "getrandom", + "getrandom 0.2.8", "once_cell", "version_check", ] @@ -429,6 +465,15 @@ dependencies = [ "half", ] +[[package]] +name = "cipher" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ee52072ec15386f770805afd189a01c8841be8696bed250fa2f13c4c0d6dfb7" +dependencies = [ + "generic-array", +] + [[package]] name = "cipher" version = "0.4.3" @@ -473,7 +518,7 @@ dependencies = [ "clap_derive", "clap_lex 0.3.0", "once_cell", - "strsim", + "strsim 0.10.0", "termcolor", ] @@ -687,7 +732,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef2b4b23cddf68b89b8f8069890e8c270d54e2d5fe1b143820234805e4cb17ef" dependencies = [ "generic-array", - "rand_core", + "rand_core 0.6.4", "subtle", "zeroize", ] @@ -702,13 +747,45 @@ dependencies = [ "typenum", ] +[[package]] +name = "ctr" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "049bb91fb4aaf0e3c7efa6cd5ef877dbbbd15b39dad06d9948de4ec8a75761ea" +dependencies = [ + "cipher 0.3.0", +] + [[package]] name = "ctr" version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835" dependencies = [ - "cipher", + "cipher 0.4.3", +] + +[[package]] +name = "curve25519-dalek" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b9fdf9972b2bd6af2d913799d9ebc165ea4d2e65878e329d9c6b372c4491b61" +dependencies = [ + "byteorder", + "digest 0.9.0", + "rand_core 0.5.1", + "subtle", + "zeroize", +] + +[[package]] +name = "darling" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d706e75d87e35569db781a9b5e2416cff1236a47ed380831f959382ccd5f858" +dependencies = [ + "darling_core 0.10.2", + "darling_macro 0.10.2", ] [[package]] @@ -717,8 +794,22 @@ version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4529658bdda7fd6769b8614be250cdcfc3aeb0ee72fe66f9e41e5e5eb73eac02" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.14.1", + "darling_macro 0.14.1", +] + +[[package]] +name = "darling_core" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0c960ae2da4de88a91b2d920c2a7233b400bc33cb28453a2987822d8392519b" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.9.3", + "syn", ] [[package]] @@ -731,7 +822,18 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim", + "strsim 0.10.0", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b5a2f4ac4969822c62224815d069952656cadc7084fdca9751e6d959189b72" +dependencies = [ + "darling_core 0.10.2", + "quote", "syn", ] @@ -741,7 +843,7 @@ version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddfc69c5bfcbd2fc09a0f38451d2daf0e372e367986a83906d1b0dbc88134fb5" dependencies = [ - "darling_core", + "darling_core 0.14.1", "quote", "syn", ] @@ -753,10 +855,26 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" dependencies = [ "cfg-if", - "hashbrown", + "hashbrown 0.12.3", "lock_api", "once_cell", - "parking_lot_core", + "parking_lot_core 0.9.4", +] + +[[package]] +name = "data-encoding" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57" + +[[package]] +name = "delay_map" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6716ce9729be9628979ae1ff63e8bc8b7ad53b5472a2633bf079607a55328d36" +dependencies = [ + "futures", + "tokio-util 0.6.10", ] [[package]] @@ -780,6 +898,31 @@ dependencies = [ "syn", ] +[[package]] +name = "derive_builder" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2658621297f2cf68762a6f7dc0bb7e1ff2cfd6583daef8ee0fed6f7ec468ec0" +dependencies = [ + "darling 0.10.2", + "derive_builder_core", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "derive_builder_core" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2791ea3e372c8495c0bc2033991d76b512cd799d07491fbd6890124db9458bef" +dependencies = [ + "darling 0.10.2", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "derive_more" version = "0.99.17" @@ -813,6 +956,50 @@ dependencies = [ "subtle", ] +[[package]] +name = "discv5" +version = "0.1.0" +source = "git+https://github.com/sigp/discv5#7d8c1ce072de384a472beebaf03d36fb463b9f7a" +dependencies = [ + "aes 0.7.5", + "aes-gcm", + "arrayvec", + "delay_map", + "enr 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", + "fnv", + "futures", + "hashlink", + "hex", + "hkdf", + "lazy_static", + "lru", + "more-asserts", + "parking_lot 0.11.2", + "rand 0.8.5", + "rlp", + "smallvec", + "socket2", + "tokio", + "tokio-stream", + "tokio-util 0.6.10", + "tracing", + "tracing-subscriber", + "uint", + "zeroize", +] + +[[package]] +name = "dns-lookup" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53ecafc952c4528d9b51a458d1a8904b81783feff9fde08ab6ed2545ff396872" +dependencies = [ + "cfg-if", + "libc", + "socket2", + "winapi", +] + [[package]] name = "ecdsa" version = "0.14.8" @@ -825,6 +1012,29 @@ dependencies = [ "signature", ] +[[package]] +name = "ed25519" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9c280362032ea4203659fc489832d0204ef09f247a0506f170dafcac08c369" +dependencies = [ + "signature", +] + +[[package]] +name = "ed25519-dalek" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c762bae6dcaf24c4c84667b8579785430908723d5c889f469d76a41d59cc7a9d" +dependencies = [ + "curve25519-dalek", + "ed25519", + "rand 0.7.3", + "serde", + "sha2 0.9.9", + "zeroize", +] + [[package]] name = "educe" version = "0.4.19" @@ -857,7 +1067,7 @@ dependencies = [ "generic-array", "group", "pkcs8", - "rand_core", + "rand_core 0.6.4", "sec1", "subtle", "zeroize", @@ -873,6 +1083,32 @@ dependencies = [ "void", ] +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + +[[package]] +name = "enr" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26fa0a0be8915790626d5759eb51fe47435a8eac92c2f212bd2da9aa7f30ea56" +dependencies = [ + "base64", + "bs58", + "bytes", + "ed25519-dalek", + "hex", + "k256", + "log", + "rand 0.8.5", + "rlp", + "serde", + "sha3", + "zeroize", +] + [[package]] name = "enr" version = "0.6.2" @@ -884,7 +1120,7 @@ dependencies = [ "hex", "k256", "log", - "rand", + "rand 0.8.5", "rlp", "secp256k1", "serde", @@ -892,6 +1128,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "enum-as-inner" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "570d109b813e904becc80d8d5da38376818a143348413f7149f1340fe04754d4" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "enum-ordinalize" version = "3.1.11" @@ -968,7 +1216,7 @@ dependencies = [ "generic-array", "hex", "k256", - "rand", + "rand 0.8.5", "rlp", "rlp-derive", "rust_decimal", @@ -1042,7 +1290,7 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df689201f395c6b90dfe87127685f8dbfc083a5e779e613575d8bd7314300c3e" dependencies = [ - "rand_core", + "rand_core 0.6.4", "subtle", ] @@ -1053,7 +1301,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfcf0ed7fe52a17a03854ec54a9f76d6d84508d1c0e66bc1793301c73fc8493c" dependencies = [ "byteorder", - "rand", + "rand 0.8.5", "rustc-hex", "static_assertions", ] @@ -1064,6 +1312,15 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "form_urlencoded" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8" +dependencies = [ + "percent-encoding", +] + [[package]] name = "funty" version = "2.0.0" @@ -1179,6 +1436,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "getrandom" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.9.0+wasi-snapshot-preview1", +] + [[package]] name = "getrandom" version = "0.2.8" @@ -1187,7 +1455,17 @@ checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" dependencies = [ "cfg-if", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", +] + +[[package]] +name = "ghash" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1583cc1656d7839fd3732b80cf4f38850336cdb9b8ded1cd399ca62958de3c99" +dependencies = [ + "opaque-debug", + "polyval", ] [[package]] @@ -1261,7 +1539,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dfbfb3a6cfbd390d5c9564ab283a0349b9b9fcd46a706c1eb10e0db70bfbac7" dependencies = [ "ff", - "rand_core", + "rand_core 0.6.4", "subtle", ] @@ -1280,7 +1558,7 @@ dependencies = [ "indexmap", "slab", "tokio", - "tokio-util", + "tokio-util 0.7.4", "tracing", ] @@ -1299,6 +1577,15 @@ dependencies = [ "byteorder", ] +[[package]] +name = "hashbrown" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" +dependencies = [ + "ahash", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1308,6 +1595,15 @@ dependencies = [ "ahash", ] +[[package]] +name = "hashlink" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf" +dependencies = [ + "hashbrown 0.11.2", +] + [[package]] name = "heapless" version = "0.7.16" @@ -1349,6 +1645,15 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ebdb29d2ea9ed0083cd8cece49bbd968021bd99b0849edb4a9a7ee0fdf6a4e0" +[[package]] +name = "hkdf" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "791a029f6b9fc27657f6f188ec6e5e43f6911f6f878e0dc5501396e09809d437" +dependencies = [ + "hmac", +] + [[package]] name = "hmac" version = "0.12.1" @@ -1441,6 +1746,20 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "hyper-system-resolver" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6eea26c5d0b6ab9d72219f65000af310f042a740926f7b2fa3553e774036e2e7" +dependencies = [ + "derive_builder", + "dns-lookup", + "hyper", + "tokio", + "tower-service", + "tracing", +] + [[package]] name = "iai" version = "0.1.1" @@ -1453,6 +1772,27 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" +[[package]] +name = "idna" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8" +dependencies = [ + "matches", + "unicode-bidi", + "unicode-normalization", +] + +[[package]] +name = "idna" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "if_chain" version = "1.0.2" @@ -1510,7 +1850,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.12.3", ] [[package]] @@ -1532,6 +1872,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "ipnet" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b" + [[package]] name = "itertools" version = "0.9.0" @@ -1602,7 +1948,7 @@ dependencies = [ "thiserror", "tokio", "tokio-rustls", - "tokio-util", + "tokio-util 0.7.4", "tracing", "webpki-roots", ] @@ -1626,8 +1972,8 @@ dependencies = [ "hyper", "jsonrpsee-types", "lazy_static", - "parking_lot", - "rand", + "parking_lot 0.12.1", + "rand 0.8.5", "rustc-hash", "serde", "serde_json", @@ -1742,7 +2088,7 @@ dependencies = [ "soketto", "tokio", "tokio-stream", - "tokio-util", + "tokio-util 0.7.4", "tracing", "tracing-futures", ] @@ -1756,7 +2102,7 @@ dependencies = [ "cfg-if", "ecdsa", "elliptic-curve", - "sha2", + "sha2 0.10.6", "sha3", ] @@ -1831,12 +2177,36 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "lru" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999beba7b6e8345721bd280141ed958096a2e4abdf74f67ff4ce49b4b54e47a" +dependencies = [ + "hashbrown 0.12.3", +] + [[package]] name = "maplit" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata", +] + +[[package]] +name = "matches" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" + [[package]] name = "memchr" version = "2.5.0" @@ -1888,10 +2258,16 @@ checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf" dependencies = [ "libc", "log", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.36.1", ] +[[package]] +name = "more-asserts" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7843ec2de400bcbc6a6328c958dc38e5359da6e93e72e37bc5246bf1ae776389" + [[package]] name = "nb" version = "0.1.3" @@ -1907,6 +2283,15 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "546c37ac5d9e56f55e73b677106873d9d9f5190605e41a856503623648488cae" +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "nom" version = "7.1.1" @@ -1917,6 +2302,16 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num" version = "0.4.0" @@ -2024,6 +2419,15 @@ dependencies = [ "syn", ] +[[package]] +name = "num_threads" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" +dependencies = [ + "libc", +] + [[package]] name = "once_cell" version = "1.15.0" @@ -2054,6 +2458,12 @@ version = "6.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ff7415e9ae3fff1225851df9e0d9e4e5479f947619774677a63572e55e80eff" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "page_size" version = "0.4.2" @@ -2091,6 +2501,17 @@ dependencies = [ "syn", ] +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.5", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -2098,7 +2519,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.4", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi", ] [[package]] @@ -2126,6 +2561,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +[[package]] +name = "percent-encoding" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" + [[package]] name = "pest" version = "2.4.0" @@ -2206,6 +2647,18 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "polyval" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8419d2b623c7c0896ff2d5d96e2cb4ede590fed28fcc34934f4c33c036e620a1" +dependencies = [ + "cfg-if", + "cpufeatures", + "opaque-debug", + "universal-hash", +] + [[package]] name = "portable-atomic" version = "0.3.15" @@ -2299,14 +2752,35 @@ dependencies = [ "lazy_static", "num-traits", "quick-error 2.0.1", - "rand", - "rand_chacha", + "rand 0.8.5", + "rand_chacha 0.3.1", "rand_xorshift", "regex-syntax", "rusty-fork", "tempfile", ] +[[package]] +name = "public-ip" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4c40db5262d93298c363a299f8bc1b3a956a78eecddba3bc0e58b76e2f419a" +dependencies = [ + "dns-lookup", + "futures-core", + "futures-util", + "http", + "hyper", + "hyper-system-resolver", + "pin-project-lite", + "thiserror", + "tokio", + "tracing", + "tracing-futures", + "trust-dns-client", + "trust-dns-proto", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -2334,6 +2808,29 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + +[[package]] +name = "rand" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" +dependencies = [ + "getrandom 0.1.16", + "libc", + "rand_chacha 0.2.2", + "rand_core 0.5.1", + "rand_hc", +] + [[package]] name = "rand" version = "0.8.5" @@ -2341,8 +2838,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +dependencies = [ + "ppv-lite86", + "rand_core 0.5.1", ] [[package]] @@ -2352,7 +2859,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +dependencies = [ + "getrandom 0.1.16", ] [[package]] @@ -2361,7 +2877,16 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom", + "getrandom 0.2.8", +] + +[[package]] +name = "rand_hc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" +dependencies = [ + "rand_core 0.5.1", ] [[package]] @@ -2370,7 +2895,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d25bf25ec5ae4a3f1b92f929810509a2f53d7dca2f50b794ff57e3face536c8f" dependencies = [ - "rand_core", + "rand_core 0.6.4", ] [[package]] @@ -2417,6 +2942,15 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax", +] + [[package]] name = "regex-syntax" version = "0.6.27" @@ -2481,16 +3015,38 @@ dependencies = [ "tokio", ] +[[package]] +name = "reth-discv4" +version = "0.1.0" +dependencies = [ + "bytes", + "discv5", + "generic-array", + "hex", + "public-ip", + "rand 0.8.5", + "reth-primitives", + "reth-rlp", + "reth-rlp-derive", + "secp256k1", + "thiserror", + "tokio", + "tokio-stream", + "tracing", + "tracing-test", + "url", +] + [[package]] name = "reth-ecies" version = "0.1.0" dependencies = [ - "aes", + "aes 0.8.1", "block-padding", "byteorder", "bytes", - "cipher", - "ctr", + "cipher 0.4.3", + "ctr 0.9.2", "digest 0.10.5", "educe", "futures", @@ -2499,16 +3055,16 @@ dependencies = [ "hex-literal", "hmac", "proptest", - "rand", + "rand 0.8.5", "reth-primitives", "reth-rlp", "secp256k1", - "sha2", + "sha2 0.10.6", "sha3", "thiserror", "tokio", "tokio-stream", - "tokio-util", + "tokio-util 0.7.4", "tracing", "typenum", ] @@ -2525,7 +3081,7 @@ dependencies = [ "hex-literal", "maplit", "pin-project", - "rand", + "rand 0.8.5", "reth-ecies", "reth-primitives", "reth-rlp", @@ -2533,7 +3089,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", - "tokio-util", + "tokio-util 0.7.4", "tracing", ] @@ -2556,7 +3112,7 @@ dependencies = [ "assert_matches", "async-trait", "once_cell", - "rand", + "rand 0.8.5", "reth-interfaces", "reth-primitives", "reth-rpc-types", @@ -2577,7 +3133,7 @@ dependencies = [ "heapless", "parity-scale-codec", "postcard", - "rand", + "rand 0.8.5", "reth-codecs", "reth-db", "reth-primitives", @@ -2600,8 +3156,8 @@ dependencies = [ "indexmap", "libc", "lifetimed-bytes", - "parking_lot", - "rand", + "parking_lot 0.12.1", + "rand 0.8.5", "rand_xorshift", "reth-mdbx-sys", "tempfile", @@ -2621,7 +3177,7 @@ dependencies = [ name = "reth-p2p" version = "0.1.0" dependencies = [ - "enr", + "enr 0.6.2 (git+https://github.com/sigp/enr)", "secp256k1", "serde", "serde_derive", @@ -2742,9 +3298,9 @@ dependencies = [ "fnv", "futures", "linked-hash-map", - "parking_lot", + "parking_lot 0.12.1", "paste", - "rand", + "rand 0.8.5", "reth-primitives", "serde", "thiserror", @@ -2760,7 +3316,7 @@ dependencies = [ "arrayref", "auto_impl", "bytes", - "hashbrown", + "hashbrown 0.12.3", "num_enum", "primitive-types", "revm_precompiles", @@ -2775,13 +3331,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00e68901326fe20437526cb6d64a2898d2976383b7d222329dfce1717902da50" dependencies = [ "bytes", - "hashbrown", + "hashbrown 0.12.3", "num", "once_cell", "primitive-types", "ripemd", "secp256k1", - "sha2", + "sha2 0.10.6", "sha3", "substrate-bn", ] @@ -3040,7 +3596,7 @@ version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7649a0b3ffb32636e60c7ce0d70511eda9c52c658cd0634e194d5a19943aeff" dependencies = [ - "rand", + "rand 0.8.5", "secp256k1-sys", ] @@ -3165,7 +3721,7 @@ dependencies = [ "futures", "lazy_static", "log", - "parking_lot", + "parking_lot 0.12.1", "serial_test_derive", ] @@ -3205,6 +3761,19 @@ dependencies = [ "digest 0.10.5", ] +[[package]] +name = "sha2" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800" +dependencies = [ + "block-buffer 0.9.0", + "cfg-if", + "cpufeatures", + "digest 0.9.0", + "opaque-debug", +] + [[package]] name = "sha2" version = "0.10.6" @@ -3226,6 +3795,15 @@ dependencies = [ "keccak", ] +[[package]] +name = "sharded-slab" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.1.0" @@ -3248,7 +3826,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" dependencies = [ "digest 0.10.5", - "rand_core", + "rand_core 0.6.4", ] [[package]] @@ -3287,7 +3865,7 @@ dependencies = [ "futures", "httparse", "log", - "rand", + "rand 0.8.5", "sha-1 0.9.8", ] @@ -3328,6 +3906,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "strsim" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6446ced80d6c486436db5c078dde11a9f73d42b57fb273121e160b84f63d894c" + [[package]] name = "strsim" version = "0.10.0" @@ -3375,7 +3959,7 @@ dependencies = [ "byteorder", "crunchy", "lazy_static", - "rand", + "rand 0.8.5", "rustc-hex", ] @@ -3405,6 +3989,18 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "synstructure" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "unicode-xid", +] + [[package]] name = "tap" version = "1.0.1" @@ -3465,7 +4061,7 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "856bbca0314c328004691b9c0639fb198ca764d1ce0e20d4dd8b78f2697c2a6f" dependencies = [ - "darling", + "darling 0.14.1", "if_chain", "lazy_static", "proc-macro2", @@ -3517,6 +4113,33 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180" +dependencies = [ + "once_cell", +] + +[[package]] +name = "time" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fab5c8b9980850e06d92ddbe3ab839c062c801f3927c0fb8abd6fc8e918fbca" +dependencies = [ + "libc", + "num_threads", + "serde", + "time-core", +] + +[[package]] +name = "time-core" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd" + [[package]] name = "tiny-keccak" version = "2.0.2" @@ -3536,6 +4159,21 @@ dependencies = [ "serde_json", ] +[[package]] +name = "tinyvec" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" + [[package]] name = "tokio" version = "1.21.2" @@ -3548,7 +4186,7 @@ dependencies = [ "memchr", "mio", "num_cpus", - "parking_lot", + "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", "socket2", @@ -3587,7 +4225,22 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", - "tokio-util", + "tokio-util 0.7.4", +] + +[[package]] +name = "tokio-util" +version = "0.6.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite", + "slab", + "tokio", ] [[package]] @@ -3640,6 +4293,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -3663,6 +4317,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" dependencies = [ "once_cell", + "valuable", ] [[package]] @@ -3671,10 +4326,109 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" dependencies = [ + "futures", + "futures-task", "pin-project", "tracing", ] +[[package]] +name = "tracing-log" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", +] + +[[package]] +name = "tracing-test" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3d272c44878d2bbc9f4a20ad463724f03e19dbc667c6e84ac433ab7ffcc70b" +dependencies = [ + "lazy_static", + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "744324b12d69a9fc1edea4b38b7b1311295b662d161ad5deac17bb1358224a08" +dependencies = [ + "lazy_static", + "quote", + "syn", +] + +[[package]] +name = "trust-dns-client" +version = "0.20.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b4ef9b9bde0559b78a4abb00339143750085f05e5a453efb7b8bef1061f09dc" +dependencies = [ + "cfg-if", + "data-encoding", + "futures-channel", + "futures-util", + "lazy_static", + "log", + "radix_trie", + "rand 0.8.5", + "thiserror", + "time", + "tokio", + "trust-dns-proto", +] + +[[package]] +name = "trust-dns-proto" +version = "0.20.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca94d4e9feb6a181c690c4040d7a24ef34018d8313ac5044a61d21222ae24e31" +dependencies = [ + "async-trait", + "cfg-if", + "data-encoding", + "enum-as-inner", + "futures-channel", + "futures-io", + "futures-util", + "idna 0.2.3", + "ipnet", + "lazy_static", + "log", + "rand 0.8.5", + "smallvec", + "thiserror", + "tinyvec", + "tokio", + "url", +] + [[package]] name = "try-lock" version = "0.2.3" @@ -3714,18 +4468,43 @@ dependencies = [ "version_check", ] +[[package]] +name = "unicode-bidi" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992" + [[package]] name = "unicode-ident" version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3" +[[package]] +name = "unicode-normalization" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921" +dependencies = [ + "tinyvec", +] + [[package]] name = "unicode-xid" version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" +[[package]] +name = "universal-hash" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f214e8f697e925001e66ec2c6e37a4ef93f0f78c2eed7814394e10c62025b05" +dependencies = [ + "generic-array", + "subtle", +] + [[package]] name = "untrusted" version = "0.7.1" @@ -3743,6 +4522,23 @@ dependencies = [ "syn", ] +[[package]] +name = "url" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643" +dependencies = [ + "form_urlencoded", + "idna 0.3.0", + "percent-encoding", +] + +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcell" version = "0.1.3" @@ -3800,6 +4596,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.9.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -4048,3 +4850,18 @@ name = "zeroize" version = "1.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c394b5bd0c6f669e7275d9c20aa90ae064cb22e75a1cad54e1b34088034b149f" +dependencies = [ + "zeroize_derive", +] + +[[package]] +name = "zeroize_derive" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f8f187641dad4f680d25c4bfc4225b418165984179f26ca76ec4fb6441d3a17" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] diff --git a/Cargo.toml b/Cargo.toml index 263f7839a..d6b599152 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "crates/net/p2p", "crates/net/ecies", "crates/net/eth-wire", + "crates/net/discv4", "crates/net/rpc", "crates/net/rpc-api", "crates/net/rpc-types", diff --git a/crates/net/discv4/Cargo.toml b/crates/net/discv4/Cargo.toml new file mode 100644 index 000000000..6cb678e22 --- /dev/null +++ b/crates/net/discv4/Cargo.toml @@ -0,0 +1,42 @@ +[package] +name = "reth-discv4" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" +repository = "https://github.com/foundry-rs/reth" +readme = "README.md" +description = """ +Ethereum network support +""" + +[dependencies] +# reth +reth-primitives = { path = "../../primitives" } +reth-rlp = { path = "../../common/rlp" } +reth-rlp-derive = { path = "../../common/rlp-derive" } + +# ethereum +discv5 = { git = "https://github.com/sigp/discv5" } +secp256k1 = { version = "0.24", features = [ + "global-context", + "rand-std", + "recovery", +] } + +# async/futures +tokio = { version = "1", features = ["io-util", "net", "time"] } +tokio-stream = "0.1" + +# misc +generic-array = "0.14" +tracing = "0.1" +bytes = "1.2" +thiserror = "1.0" +url = "2.3" +hex = "0.4" +public-ip = "0.2" + +[dev-dependencies] +rand = "0.8" +tokio = { version = "1", features = ["full"] } +tracing-test = "0.2" diff --git a/crates/net/discv4/README.md b/crates/net/discv4/README.md new file mode 100644 index 000000000..8e941e41c --- /dev/null +++ b/crates/net/discv4/README.md @@ -0,0 +1,22 @@ +#

discv4

+ +This is a rust implementation of +the [Discovery v4](https://github.com/ethereum/devp2p/blob/40ab248bf7e017e83cc9812a4e048446709623e8/discv4.md) +peer discovery protocol. + +For comparison to Discovery v5, +see [discv5#comparison-with-node-discovery-v4](https://github.com/ethereum/devp2p/blob/40ab248bf7e017e83cc9812a4e048446709623e8/discv5/discv5.md#comparison-with-node-discovery-v4) + +This is inspired by the [discv5](https://github.com/sigp/discv5) crate and reuses its kademlia implementation. + +## Finding peers + +The discovery service continuously attempts to connect to other nodes on the network until it has found enough peers. +If UPnP (Universal Plug and Play) is supported by the router the service is running on, it will also accept connections +from external nodes. In the discovery protocol, nodes exchange information about where the node can be reached to +eventually establish RLPx sessions. + +## Trouble Shooting + +The discv4 protocol depends on the local system clock. If the clock is not accurate it can cause connectivity issues +because the expiration timestamps might be wrong. \ No newline at end of file diff --git a/crates/net/discv4/src/bootnodes.rs b/crates/net/discv4/src/bootnodes.rs new file mode 100644 index 000000000..ca4b4cdf7 --- /dev/null +++ b/crates/net/discv4/src/bootnodes.rs @@ -0,0 +1,61 @@ +//! Various known bootstrap nodes for networks + +// + +use crate::node::NodeRecord; + +/// Ethereum Foundation Go Bootnodes +pub static MAINNET_BOOTNODES : [&str; 8] = [ + "enode://d860a01f9722d78051619d1e2351aba3f43f943f6f00718d1b9baa4101932a1f5011f16bb2b1bb35db20d6fe28fa0bf09636d26a87d31de9ec6203eeedb1f666@18.138.108.67:30303", // bootnode-aws-ap-southeast-1-001 + "enode://22a8232c3abc76a16ae9d6c3b164f98775fe226f0917b0ca871128a74a8e9630b458460865bab457221f1d448dd9791d24c4e5d88786180ac185df813a68d4de@3.209.45.79:30303", // bootnode-aws-us-east-1-001 + "enode://8499da03c47d637b20eee24eec3c356c9a2e6148d6fe25ca195c7949ab8ec2c03e3556126b0d7ed644675e78c4318b08691b7b57de10e5f0d40d05b09238fa0a@52.187.207.27:30303", // bootnode-azure-australiaeast-001 + "enode://103858bdb88756c71f15e9b5e09b56dc1be52f0a5021d46301dbbfb7e130029cc9d0d6f73f693bc29b665770fff7da4d34f3c6379fe12721b5d7a0bcb5ca1fc1@191.234.162.198:30303", // bootnode-azure-brazilsouth-001 + "enode://715171f50508aba88aecd1250af392a45a330af91d7b90701c436b618c86aaa1589c9184561907bebbb56439b8f8787bc01f49a7c77276c58c1b09822d75e8e8@52.231.165.108:30303", // bootnode-azure-koreasouth-001 + "enode://5d6d7cd20d6da4bb83a1d28cadb5d409b64edf314c0335df658c1a54e32c7c4a7ab7823d57c39b6a757556e68ff1df17c748b698544a55cb488b52479a92b60f@104.42.217.25:30303", // bootnode-azure-westus-001 + "enode://2b252ab6a1d0f971d9722cb839a42cb81db019ba44c08754628ab4a823487071b5695317c8ccd085219c3a03af063495b2f1da8d18218da2d6a82981b45e6ffc@65.108.70.101:30303", // bootnode-hetzner-hel + "enode://4aeb4ab6c14b23e2c4cfdce879c04b0748a20d8e9b59e25ded2a08143e265c6c25936e74cbc8e641e3312ca288673d91f2f93f8e277de3cfa444ecdaaf982052@157.90.35.166:30303", // bootnode-hetzner-fsn +]; + +/// SEPOLIA BOOTNODES +pub static SEPOLIA_BOOTNODES : [&str; 2] = [ + // geth + "enode://9246d00bc8fd1742e5ad2428b80fc4dc45d786283e05ef6edbd9002cbc335d40998444732fbe921cb88e1d2c73d1b1de53bae6a2237996e9bfe14f871baf7066@18.168.182.86:30303", + // besu + "enode://ec66ddcf1a974950bd4c782789a7e04f8aa7110a72569b6e65fcd51e937e74eed303b1ea734e4d19cfaec9fbff9b6ee65bf31dcb50ba79acce9dd63a6aca61c7@52.14.151.177:30303", +]; + +/// GOERLI bootnodes +pub static GOERLI_BOOTNODES : [&str; 7] = [ +// Upstream bootnodes +"enode://011f758e6552d105183b1761c5e2dea0111bc20fd5f6422bc7f91e0fabbec9a6595caf6239b37feb773dddd3f87240d99d859431891e4a642cf2a0a9e6cbb98a@51.141.78.53:30303", +"enode://176b9417f511d05b6b2cf3e34b756cf0a7096b3094572a8f6ef4cdcb9d1f9d00683bf0f83347eebdf3b81c3521c2332086d9592802230bf528eaf606a1d9677b@13.93.54.137:30303", +"enode://46add44b9f13965f7b9875ac6b85f016f341012d84f975377573800a863526f4da19ae2c620ec73d11591fa9510e992ecc03ad0751f53cc02f7c7ed6d55c7291@94.237.54.114:30313", +"enode://b5948a2d3e9d486c4d75bf32713221c2bd6cf86463302339299bd227dc2e276cd5a1c7ca4f43a0e9122fe9af884efed563bd2a1fd28661f3b5f5ad7bf1de5949@18.218.250.66:30303", + +// Ethereum Foundation bootnode +"enode://a61215641fb8714a373c80edbfa0ea8878243193f57c96eeb44d0bc019ef295abd4e044fd619bfc4c59731a73fb79afe84e9ab6da0c743ceb479cbb6d263fa91@3.11.147.67:30303", + +// Goerli Initiative bootnodes +"enode://d4f764a48ec2a8ecf883735776fdefe0a3949eb0ca476bd7bc8d0954a9defe8fea15ae5da7d40b5d2d59ce9524a99daedadf6da6283fca492cc80b53689fb3b3@46.4.99.122:32109", +"enode://d2b720352e8216c9efc470091aa91ddafc53e222b32780f505c817ceef69e01d5b0b0797b69db254c586f493872352f5a022b4d8479a00fc92ec55f9ad46a27e@88.99.70.182:30303", +]; + +/// Returns parsed mainnet nodes +pub fn mainnet_nodes() -> Vec { + parse_nodes(&MAINNET_BOOTNODES[..]) +} + +/// Returns parsed goerli nodes +pub fn goerli_nodes() -> Vec { + parse_nodes(&GOERLI_BOOTNODES[..]) +} + +/// Returns parsed sepolia nodes +pub fn sepolia_nodes() -> Vec { + parse_nodes(&SEPOLIA_BOOTNODES[..]) +} + +/// Parses all the nodes +pub fn parse_nodes(nodes: impl IntoIterator>) -> Vec { + nodes.into_iter().map(|s| s.as_ref().parse().unwrap()).collect() +} diff --git a/crates/net/discv4/src/config.rs b/crates/net/discv4/src/config.rs new file mode 100644 index 000000000..f594d44d5 --- /dev/null +++ b/crates/net/discv4/src/config.rs @@ -0,0 +1,128 @@ +use crate::node::NodeRecord; +use discv5::PermitBanList; +///! A set of configuration parameters to tune the discovery protocol. +// This basis of this file has been taken from the discv5 codebase: +// https://github.com/sigp/discv5 +use std::collections::HashSet; +use std::time::Duration; + +/// Configuration parameters that define the performance of the discovery network. +#[derive(Clone, Debug)] +pub struct Discv4Config { + /// Whether to enable the incoming packet filter. Default: false. + pub enable_packet_filter: bool, + /// The number of retries for each UDP request. Default: 1. + pub request_retries: u8, + /// The time between pings to ensure connectivity amongst connected nodes. Default: 300 + /// seconds. + pub ping_interval: Duration, + /// The duration of we consider a ping timed out. + pub ping_timeout: Duration, + /// The rate at which lookups should be triggered. + pub lookup_interval: Duration, + /// The duration of we consider a FindNode request timed out. + pub find_node_timeout: Duration, + /// The duration we set for neighbours responses + pub neighbours_timeout: Duration, + /// A set of lists that permit or ban IP's or NodeIds from the server. See + /// `crate::PermitBanList`. + pub permit_ban_list: PermitBanList, + /// Set the default duration for which nodes are banned for. This timeouts are checked every 5 + /// minutes, so the precision will be to the nearest 5 minutes. If set to `None`, bans from + /// the filter will last indefinitely. Default is 1 hour. + pub ban_duration: Option, + /// Nodes to boot from. + pub bootstrap_nodes: HashSet, +} + +impl Discv4Config { + /// Returns a new default builder instance + pub fn builder() -> Discv4ConfigBuilder { + Default::default() + } +} + +impl Default for Discv4Config { + fn default() -> Self { + Self { + enable_packet_filter: false, + request_retries: 1, + ping_interval: Duration::from_secs(300), + ping_timeout: Duration::from_secs(5), + find_node_timeout: Duration::from_secs(2), + neighbours_timeout: Duration::from_secs(30), + lookup_interval: Duration::from_secs(20), + permit_ban_list: PermitBanList::default(), + ban_duration: Some(Duration::from_secs(3600)), // 1 hour + bootstrap_nodes: Default::default(), + } + } +} + +#[derive(Debug, Default)] +pub struct Discv4ConfigBuilder { + config: Discv4Config, +} + +impl Discv4ConfigBuilder { + /// Whether to enable the incoming packet filter. + pub fn enable_packet_filter(&mut self) -> &mut Self { + self.config.enable_packet_filter = true; + self + } + + /// The number of retries for each UDP request. + pub fn request_retries(&mut self, retries: u8) -> &mut Self { + self.config.request_retries = retries; + self + } + + /// The time between pings to ensure connectivity amongst connected nodes. + pub fn ping_interval(&mut self, interval: Duration) -> &mut Self { + self.config.ping_interval = interval; + self + } + + /// Sets the timeout for pings + pub fn ping_timeout(&mut self, duration: Duration) -> &mut Self { + self.config.ping_timeout = duration; + self + } + + /// A set of lists that permit or ban IP's or NodeIds from the server. See + /// `crate::PermitBanList`. + pub fn permit_ban_list(&mut self, list: PermitBanList) -> &mut Self { + self.config.permit_ban_list = list; + self + } + + /// Sets the lookup interval duration. + pub fn lookup_interval(&mut self, lookup_interval: Duration) -> &mut Self { + self.config.lookup_interval = lookup_interval; + self + } + + /// Set the default duration for which nodes are banned for. This timeouts are checked every 5 + /// minutes, so the precision will be to the nearest 5 minutes. If set to `None`, bans from + /// the filter will last indefinitely. Default is 1 hour. + pub fn ban_duration(&mut self, ban_duration: Option) -> &mut Self { + self.config.ban_duration = ban_duration; + self + } + + /// Adds a boot node + pub fn add_boot_node(&mut self, node: NodeRecord) -> &mut Self { + self.config.bootstrap_nodes.insert(node); + self + } + + /// Adds multiple boot nodes + pub fn add_boot_nodes(&mut self, nodes: impl IntoIterator) -> &mut Self { + self.config.bootstrap_nodes.extend(nodes); + self + } + + pub fn build(&mut self) -> Discv4Config { + self.config.clone() + } +} diff --git a/crates/net/discv4/src/error.rs b/crates/net/discv4/src/error.rs new file mode 100644 index 000000000..804e084ba --- /dev/null +++ b/crates/net/discv4/src/error.rs @@ -0,0 +1,36 @@ +//! Error types that can occur in this crate. + +use tokio::sync::{mpsc::error::SendError, oneshot::error::RecvError}; + +/// Error thrown when decoding a UDP packet. +#[derive(Debug, thiserror::Error)] +#[allow(missing_docs)] +pub enum DecodePacketError { + #[error("Failed to rlp decode: {0:?}")] + Rlp(#[from] reth_rlp::DecodeError), + #[error("Received packet len too short.")] + PacketTooShort, + #[error("Hash of the header not equals to the hash of the data.")] + HashMismatch, + #[error("Message id {0} is not supported.")] + UnknownMessage(u8), + #[error("Failed to recover public key: {0:?}")] + Secp256k1(#[from] secp256k1::Error), +} + +/// High level errors that can occur when interacting with the discovery service +#[derive(Debug, thiserror::Error)] +pub enum Discv4Error { + /// Failed to send a command over the channel + #[error("Failed to send on a closed channel")] + Send, + /// Failed to receive a command response + #[error(transparent)] + Receive(#[from] RecvError), +} + +impl From> for Discv4Error { + fn from(_: SendError) -> Self { + Discv4Error::Send + } +} diff --git a/crates/net/discv4/src/lib.rs b/crates/net/discv4/src/lib.rs new file mode 100644 index 000000000..ffb45819b --- /dev/null +++ b/crates/net/discv4/src/lib.rs @@ -0,0 +1,1273 @@ +#![warn(missing_docs, unused_crate_dependencies)] +#![deny(unused_must_use, rust_2018_idioms)] +#![doc(test( + no_crate_inject, + attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) +))] + +//! Discovery v4 implementation: +//! +//! Discv4 employs a kademlia-like routing table to store and manage discovered peers and topics. +//! The protocol allows for external IP discovery in NAT environments through regular PING/PONG's +//! with discovered nodes. Nodes return the external IP address that they have received and a simple +//! majority is chosen as our external IP address. If an external IP address is updated, this is +//! produced as an event to notify the swarm (if one is used for this behaviour). +//! +//! This implementation consists of a [`Discv4`] and [`Discv4Service`] pair. The service manages the +//! state and drives the UDP socket. The (optional) [`Discv4`] serves as the frontend to interact +//! with the service via a channel. Whenever the underlying table changes service produces a +//! [`TableUpdate`] that listeners will receive. +use crate::{ + error::{DecodePacketError, Discv4Error}, + node::{kad_key, NodeKey}, + proto::{FindNode, Message, Neighbours, Packet, Ping, Pong}, +}; +use bytes::Bytes; +use discv5::{ + kbucket::{ + Distance, Entry as BucketEntry, InsertResult, KBucketsTable, NodeStatus, + MAX_NODES_PER_BUCKET, + }, + ConnectionDirection, ConnectionState, +}; +use reth_primitives::{H256, H512}; +use secp256k1::SecretKey; +use std::{ + cell::RefCell, + collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque}, + future::Future, + io, + net::SocketAddr, + pin::Pin, + rc::Rc, + sync::Arc, + task::{Context, Poll}, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, +}; +use tokio::{ + net::UdpSocket, + sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::Sender as OneshotSender}, + task::{JoinHandle, JoinSet}, + time::Interval, +}; +use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use tracing::{debug, instrument, trace, warn}; + +pub mod bootnodes; +pub mod error; +mod proto; + +mod config; +pub use config::Discv4Config; +mod node; +pub use node::NodeRecord; + +/// reexport to get public ip. +pub use public_ip; + +/// Identifier for nodes. +pub type NodeId = H512; + +/// The default port for discv4 via UDP +/// +/// Note: the default TCP port is the same. +pub const DEFAULT_DISCOVERY_PORT: u16 = 30303; + +/// The maximum size of any packet is 1280 bytes. +const MAX_PACKET_SIZE: usize = 1280; + +/// Length of the UDP datagram packet-header: Hash(32b) + Signature(65b) + Packet Type(1b) +const MIN_PACKET_SIZE: usize = 32 + 65 + 1; + +/// Concurrency factor for `FindNode` requests to pick `ALPHA` closest nodes, +const ALPHA: usize = 3; + +/// Maximum number of nodes to ping at concurrently. 2 full `Neighbours` responses with 16 _new_ +/// nodes. This will apply some backpressure in recursive lookups. +const MAX_NODES_PING: usize = 2 * MAX_NODES_PER_BUCKET; + +/// The size of the datagram is limited [`MAX_PACKET_SIZE`], 16 nodes, as the discv4 specifies don't +/// fit in one datagram. The safe number of nodes that always fit in a datagram is 12, with worst +/// case all of them being IPv6 nodes. This is calculated by `(MAX_PACKET_SIZE - (header + expire + +/// rlp overhead) / size(rlp(Node_IPv6))` +const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91; + +/// The timeout used to identify expired nodes +const NODE_LAST_SEEN_TIMEOUT: Duration = Duration::from_secs(24 * 60 * 60); + +type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>; +type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>; + +type IngressSender = mpsc::Sender; +type IngressReceiver = mpsc::Receiver; + +type NodeRecordSender = OneshotSender>; + +/// The Discv4 frontend +#[derive(Debug, Clone)] +pub struct Discv4 { + /// The address of the udp socket + local_addr: SocketAddr, + /// channel to send commands over to the service + to_service: mpsc::Sender, +} + +// === impl Discv4 === + +impl Discv4 { + /// Same as [`Self::bind`] but also spawns the service onto a new task, + /// [`Discv4Service::spawn()`] + pub async fn spawn( + local_address: SocketAddr, + local_enr: NodeRecord, + secret_key: SecretKey, + config: Discv4Config, + ) -> io::Result { + let (discv4, service) = Self::bind(local_address, local_enr, secret_key, config).await?; + + let _ = service.spawn(); + + Ok(discv4) + } + + /// Binds a new UdpSocket and creates the service + /// + /// ``` + /// # use std::io; + /// use std::net::SocketAddr; + /// use std::str::FromStr; + /// use rand::thread_rng; + /// use secp256k1::SECP256K1; + /// use reth_discv4::{Discv4, Discv4Config, NodeId, NodeRecord}; + /// # async fn t() -> io::Result<()> { + /// // generate a (random) keypair + /// let mut rng = thread_rng(); + /// let (secret_key, pk) = SECP256K1.generate_keypair(&mut rng); + /// let id = NodeId::from_slice(&pk.serialize_uncompressed()[1..]); + /// + /// let socket = SocketAddr::from_str("0.0.0.0:0").unwrap(); + /// let local_enr = NodeRecord { + /// address: socket.ip(), + /// tcp_port: socket.port(), + /// udp_port: socket.port(), + /// id, + /// }; + /// let config = Discv4Config::default(); + /// + /// let(discv4, mut service) = Discv4::bind(socket, local_enr, secret_key, config).await.unwrap(); + /// + /// // get an update strea + /// let mut updates = service.update_stream(); + /// + /// let _handle = service.spawn(); + /// + /// // lookup the local node in the DHT + /// let _discovered = discv4.lookup_self().await.unwrap(); + /// + /// # Ok(()) + /// # } + /// ``` + pub async fn bind( + local_address: SocketAddr, + mut local_enr: NodeRecord, + secret_key: SecretKey, + config: Discv4Config, + ) -> io::Result<(Self, Discv4Service)> { + let socket = UdpSocket::bind(local_address).await?; + let local_addr = socket.local_addr()?; + local_enr.udp_port = local_addr.port(); + trace!(?local_addr, target = "net::disc", "opened UDP socket"); + + // We don't expect many commands, so the buffer can be quite small here. + let (to_service, rx) = mpsc::channel(5); + let service = + Discv4Service::new(socket, local_addr, local_enr, secret_key, config, Some(rx)); + let discv4 = Discv4 { local_addr, to_service }; + Ok((discv4, service)) + } + + /// Returns the address of the UDP socket. + pub fn local_addr(&self) -> SocketAddr { + self.local_addr + } + + /// Starts a `FindNode` recursive lookup that locates the closest nodes to the given node id. See also: + /// + /// The lookup initiator starts by picking α closest nodes to the target it knows of. The + /// initiator then sends concurrent FindNode packets to those nodes. α is a system-wide + /// concurrency parameter, such as 3. In the recursive step, the initiator resends FindNode to + /// nodes it has learned about from previous queries. Of the k nodes the initiator has heard of + /// closest to the target, it picks α that it has not yet queried and resends FindNode to them. + /// Nodes that fail to respond quickly are removed from consideration until and unless they do + /// respond. + // + // If a round of FindNode queries fails to return a node any closer than the closest already + // seen, the initiator resends the find node to all of the k closest nodes it has not already + // queried. The lookup terminates when the initiator has queried and gotten responses from the k + // closest nodes it has seen. + pub async fn lookup_self(&self) -> Result, Discv4Error> { + self.lookup_node(None).await + } + + /// Looks up the given node id + pub async fn lookup(&self, node_id: NodeId) -> Result, Discv4Error> { + self.lookup_node(Some(node_id)).await + } + + async fn lookup_node(&self, node_id: Option) -> Result, Discv4Error> { + let (tx, rx) = oneshot::channel(); + let cmd = Discv4Command::Lookup { node_id, tx }; + self.to_service.send(cmd).await?; + Ok(rx.await?) + } + + /// Returns the receiver half of new listener channel that streams [`TableUpdate`]s. + pub async fn update_stream(&self) -> Result, Discv4Error> { + let (tx, rx) = oneshot::channel(); + let cmd = Discv4Command::Updates(tx); + self.to_service.send(cmd).await?; + Ok(rx.await?) + } +} + +/// Manages discv4 peer discovery over UDP. +#[must_use = "Stream does nothing unless polled"] +pub struct Discv4Service { + /// Local address of the UDP socket. + local_address: SocketAddr, + /// Local ENR of the server. + local_enr: NodeRecord, + /// The secret key used to sign payloads + secret_key: SecretKey, + /// The UDP socket for sending and receiving messages. + _socket: Arc, + /// The spawned UDP tasks. + /// + /// Note: If dropped, the spawned send+receive tasks are aborted. + _tasks: JoinSet<()>, + /// The routing table. + kbuckets: KBucketsTable, + /// Whether to respect timestamps + check_timestamps: bool, + /// Receiver for incoming messages + ingress: IngressReceiver, + /// Sender for sending outgoing messages + egress: EgressSender, + /// Buffered pending pings to apply backpressure. + /// + /// Lookups behave like bursts of requests: Endpoint proof followed by `FindNode` request. [Recursive lookups](https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup) can trigger multiple followup Pings+FindNode requests. + /// A cap on concurrent `Ping` prevents escalation where: A large number of new nodes + /// discovered via `FindNode` in a recursive lookup triggers a large number of `Ping`s, and + /// followup `FindNode` requests.... Buffering them effectively prevents high `Ping` peaks. + queued_pings: VecDeque<(NodeRecord, PingReason)>, + /// Currently active pings to specific nodes. + pending_pings: HashMap, + /// Currently active FindNode requests + pending_find_nodes: HashMap, + /// Commands listener + commands_rx: Option>, + /// All subscribers for table updates + update_listeners: Vec>, + /// The interval when to trigger lookups + lookup_interval: Interval, + /// Used to rotate targets to lookup + lookup_rotator: LookupTargetRotator, + /// Interval when to recheck active requests + evict_expired_requests_interval: Interval, + /// Interval when to resend pings. + ping_interval: Interval, + /// How this services is configured + config: Discv4Config, +} + +impl Discv4Service { + /// Create a new instance for a bound [`UdpSocket`]. + pub(crate) fn new( + socket: UdpSocket, + local_address: SocketAddr, + local_enr: NodeRecord, + secret_key: SecretKey, + config: Discv4Config, + commands_rx: Option>, + ) -> Self { + // Heuristic limit for channel buffer size, which is correlated with the number of + // concurrent requests and bucket size. This should be large enough to cover multiple + // lookups while also anticipating incoming requests. + const UDP_CHANNEL_BUFFER: usize = MAX_NODES_PER_BUCKET * ALPHA * (ALPHA * 2); + let socket = Arc::new(socket); + let (ingress_tx, ingress_rx) = mpsc::channel(UDP_CHANNEL_BUFFER); + let (egress_tx, egress_rx) = mpsc::channel(UDP_CHANNEL_BUFFER); + let mut tasks = JoinSet::<()>::new(); + + let udp = Arc::clone(&socket); + tasks.spawn(async move { receive_loop(udp, ingress_tx, local_enr.id).await }); + + let udp = Arc::clone(&socket); + tasks.spawn(async move { send_loop(udp, egress_rx).await }); + + let kbuckets = KBucketsTable::new( + local_enr.key(), + Duration::from_secs(60), + MAX_NODES_PER_BUCKET, + None, + None, + ); + + // delay the first lookup for a bit to give the bootstrap process a chance to resolve + // entries first + let self_lookup_interval = tokio::time::interval_at( + tokio::time::Instant::now() + config.ping_timeout / 2, + config.lookup_interval, + ); + + let ping_interval = tokio::time::interval(config.ping_interval); + + let evict_expired_requests_interval = tokio::time::interval(config.find_node_timeout); + + Discv4Service { + local_address, + local_enr, + _socket: socket, + kbuckets, + secret_key, + _tasks: tasks, + ingress: ingress_rx, + egress: egress_tx, + queued_pings: Default::default(), + pending_pings: Default::default(), + pending_find_nodes: Default::default(), + check_timestamps: false, + commands_rx, + update_listeners: Vec::with_capacity(1), + lookup_interval: self_lookup_interval, + ping_interval, + evict_expired_requests_interval, + config, + lookup_rotator: Default::default(), + } + } + + /// Returns the address of the UDP socket + pub fn local_address(&self) -> SocketAddr { + self.local_address + } + + /// Bootstraps the local node to join the DHT. + /// + /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's + /// own ID in the DHT. This introduces the local node to the other nodes + /// in the DHT and populates its routing table with the closest proven neighbours. + /// + /// This is equivalent to adding all bootnodes via [`Self::add_node()`]. + /// + /// **Note:** This is a noop if there are no bootnodes. + pub fn bootstrap(&mut self) { + for node in self.config.bootstrap_nodes.clone() { + debug!(?node, target = "net::disc", "Adding bootstrap node"); + self.add_node(node); + } + } + + /// Spawns this services onto a new task + /// + /// Note: requires a running runtime + pub fn spawn(mut self) -> JoinHandle<()> { + tokio::task::spawn(async move { + self.bootstrap(); + self.await + }) + } + + /// Creates a new channel for [`TableUpdate`]s + pub fn update_stream(&mut self) -> ReceiverStream { + let (tx, rx) = mpsc::channel(512); + self.update_listeners.push(tx); + ReceiverStream::new(rx) + } + + /// Looks up the local node in the DHT. + pub fn lookup_self(&mut self) { + self.lookup(self.local_enr.id) + } + + /// Looks up the given node in the DHT + /// + /// A FindNode packet requests information about nodes close to target. The target is a 64-byte + /// secp256k1 public key. When FindNode is received, the recipient should reply with Neighbors + /// packets containing the closest 16 nodes to target found in its local table. + // + // To guard against traffic amplification attacks, Neighbors replies should only be sent if the + // sender of FindNode has been verified by the endpoint proof procedure. + pub fn lookup(&mut self, target: NodeId) { + self.lookup_with(target, None) + } + + /// Starts the recursive lookup process for the given target, . + /// + /// At first the `ALPHA` (==3, defined concurrency factor) nodes that are closest to the target + /// in the underlying DHT are selected to seed the lookup via `FindNode` requests. In the + /// recursive step, the initiator resends FindNode to nodes it has learned about from previous + /// queries. + /// + /// This takes an optional Sender through which all successfully discovered nodes are sent once + /// the request has finished. + #[instrument(skip_all, fields(?target), target = "net::discv4")] + fn lookup_with(&mut self, target: NodeId, tx: Option) { + trace!("Starting lookup"); + let key = kad_key(target); + + // Start a lookup context with the 16 (MAX_NODES_PER_BUCKET) closest nodes + let ctx = LookupContext::new( + target, + self.kbuckets + .closest_values(&key) + .take(MAX_NODES_PER_BUCKET) + .map(|n| (key.distance(&n.key), n.value.record)), + tx, + ); + + // From those 16, pick the 3 closest to start the lookup. + let closest = ctx.closest(ALPHA); + + trace!(num = closest.len(), "Start lookup closest nodes"); + + for node in closest { + self.find_node(&node, ctx.clone()); + } + } + + /// Sends a new `FindNode` packet to the node with `target` as the lookup target. + fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) { + trace!(?node, lookup=?ctx.target(), target = "net::disc", "Sending FindNode"); + ctx.mark_queried(node.id); + let id = ctx.target(); + let msg = Message::FindNode(FindNode { id, expire: self.find_node_timeout() }); + self.send_packet(msg, node.udp_addr()); + self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx)); + } + + /// Gets the number of entries that are considered connected. + pub fn num_connected(&self) -> usize { + self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected()) + } + + /// Notifies all listeners + fn notify(&mut self, update: TableUpdate) { + self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) { + Ok(()) => true, + Err(err) => match err { + TrySendError::Full(_) => true, + TrySendError::Closed(_) => false, + }, + }); + } + + /// Removes a `node_id` from the routing table. + /// + /// This allows applications, for whatever reason, to remove nodes from the local routing + /// table. Returns `true` if the node was in the table and `false` otherwise. + pub fn remove_node(&mut self, node_id: NodeId) -> bool { + let key = kad_key(node_id); + let removed = self.kbuckets.remove(&key); + if removed { + self.notify(TableUpdate::Removed(node_id)); + } + removed + } + + /// Updates the node entry + fn update_node(&mut self, record: NodeRecord) { + if record.id == self.local_enr.id { + return + } + let key = kad_key(record.id); + let entry = NodeEntry { record, last_seen: Instant::now() }; + + match self.kbuckets.insert_or_update( + &key, + entry, + NodeStatus { + state: ConnectionState::Connected, + direction: ConnectionDirection::Outgoing, + }, + ) { + InsertResult::Inserted => { + trace!(?record, target = "net::disc", "inserted new record to table"); + self.notify(TableUpdate::Added(record)); + } + InsertResult::ValueUpdated { .. } | InsertResult::Updated { .. } => { + trace!(?record, target = "net::disc", "updated record"); + } + res => { + debug!(?record, ?res, target = "net::disc", "failed to insert"); + } + } + } + + /// If the node's not in the table yet, this will start a ping to get it added on ping. + pub fn add_node(&mut self, record: NodeRecord) { + let key = kad_key(record.id); + + #[allow(clippy::single_match)] + match self.kbuckets.entry(&key) { + BucketEntry::Absent(_) => self.try_ping(record, PingReason::Normal), + _ => { + // is already in the table + } + } + } + + /// Encodes the packet, sends it and returns the hash. + fn send_packet(&mut self, msg: Message, to: SocketAddr) -> H256 { + let (payload, hash) = msg.encode(&self.secret_key); + trace!(r#type=?msg.msg_type(), ?to, ?hash, target = "net::disc", "sending packet"); + let _ = self.egress.try_send((payload, to)); + hash + } + + /// Message handler for an incoming `Ping` + fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: NodeId, hash: H256) { + // update the record + let record = NodeRecord { + address: ping.from.address, + tcp_port: ping.from.tcp_port, + udp_port: ping.from.udp_port, + id: remote_id, + }; + + self.add_node(record); + + // send the pong + let msg = Message::Pong(Pong { to: ping.from, echo: hash, expire: ping.expire }); + self.send_packet(msg, remote_addr); + } + + // Guarding function for [`Self::send_ping`] that applies pre-checks + fn try_ping(&mut self, node: NodeRecord, reason: PingReason) { + if self.pending_pings.contains_key(&node.id) || + self.pending_find_nodes.contains_key(&node.id) + { + return + } + + if self.queued_pings.iter().any(|(n, _)| n.id == node.id) { + return + } + + if self.pending_pings.len() < MAX_NODES_PING { + self.send_ping(node, reason) + } else { + self.queued_pings.push_back((node, reason)) + } + } + + /// Sends a ping message to the node's UDP address. + fn send_ping(&mut self, node: NodeRecord, reason: PingReason) { + let remote_addr = node.udp_addr(); + let id = node.id; + let ping = + Ping { from: self.local_enr.into(), to: node.into(), expire: self.ping_timeout() }; + trace!(?ping, target = "net::disc", "sending ping"); + let echo_hash = self.send_packet(Message::Ping(ping), remote_addr); + + self.pending_pings + .insert(id, PingRequest { sent_at: Instant::now(), node, echo_hash, reason }); + } + + /// Message handler for an incoming `Pong`. + fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: NodeId) { + if self.is_expired(pong.expire) { + return + } + + let PingRequest { node, reason, .. } = match self.pending_pings.entry(remote_id) { + Entry::Occupied(entry) => { + { + let request = entry.get(); + if request.echo_hash != pong.echo { + debug!(from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo, target = "net::disc", "Got unexpected Pong"); + return + } + } + entry.remove() + } + Entry::Vacant(_) => return, + }; + + match reason { + PingReason::Normal => { + self.update_node(node); + } + PingReason::FindNode(target, status) => { + // update the status of the node + match status { + NodeEntryStatus::Expired | NodeEntryStatus::Valid => { + // update node in the table + self.update_node(node) + } + NodeEntryStatus::IsLocal | NodeEntryStatus::Unknown => {} + } + // Received a pong for a discovery request + self.respond_closest(target, remote_addr); + } + PingReason::Lookup(node, ctx) => { + self.update_node(node); + self.find_node(&node, ctx); + } + } + } + + /// Handler for incoming `FindNode` message + fn on_find_node(&mut self, msg: FindNode, remote_addr: SocketAddr, node_id: NodeId) { + match self.node_status(node_id, remote_addr) { + NodeEntryStatus::IsLocal => { + // received address from self + } + NodeEntryStatus::Valid => self.respond_closest(msg.id, remote_addr), + status => { + // try to ping again + let node = NodeRecord { + address: remote_addr.ip(), + tcp_port: remote_addr.port(), + udp_port: remote_addr.port(), + id: node_id, + }; + self.try_ping(node, PingReason::FindNode(msg.id, status)) + } + } + } + + /// Handler for incoming `Neighbours` messages that are handled if they're responses to + /// `FindNode` requests + fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: NodeId) { + // check if this request was expected + let ctx = match self.pending_find_nodes.entry(node_id) { + Entry::Occupied(mut entry) => { + { + let request = entry.get_mut(); + // Mark the request as answered + request.answered = true; + let total = request.response_count + msg.nodes.len(); + // Neighbours response is exactly 1 bucket (16 entries). + if total <= MAX_NODES_PER_BUCKET { + request.response_count = total; + } else { + debug!(total, from=?remote_addr, target = "net::disc", "Got oversized Neighbors packet"); + return + } + }; + + if entry.get().response_count == MAX_NODES_PER_BUCKET { + let ctx = entry.remove().lookup_context; + ctx.mark_responded(node_id); + ctx + } else { + entry.get().lookup_context.clone() + } + } + Entry::Vacant(_) => { + debug!(from=?remote_addr, target = "net::disc", "Received unsolicited Neighbours"); + return + } + }; + + let our_key = kad_key(self.local_enr.id); + + // This is the recursive lookup step where we initiate new FindNode requests for new nodes + // that where discovered. + for node in msg.nodes { + let key = kad_key(node.id); + let distance = our_key.distance(&key); + ctx.add_node(distance, node); + } + + // get the next closest nodes, not yet queried nodes and start over. + let closest = ctx.closest(ALPHA); + + for closest in closest { + let key = kad_key(closest.id); + match self.kbuckets.entry(&key) { + BucketEntry::Absent(_) => { + self.try_ping(closest, PingReason::Lookup(closest, ctx.clone())) + } + _ => self.find_node(&closest, ctx.clone()), + } + } + } + + /// Sends a Neighbours packet for `target` to the given addr + fn respond_closest(&mut self, target: NodeId, to: SocketAddr) { + let key = kad_key(target); + let expire = self.send_neighbours_timeout(); + let all_nodes = self.kbuckets.closest_values(&key).collect::>(); + + for nodes in all_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) { + let nodes = nodes.iter().map(|node| node.value.record).collect::>(); + trace!(len = nodes.len(), to=?to, target = "net::disc", "Sent neighbours packet"); + let msg = Message::Neighbours(Neighbours { nodes, expire }); + self.send_packet(msg, to); + } + } + + /// Returns the current status of the node + fn node_status(&mut self, node: NodeId, addr: SocketAddr) -> NodeEntryStatus { + if node == self.local_enr.id { + debug!(?node, target = "net::disc", "Got an incoming discovery request from self"); + return NodeEntryStatus::IsLocal + } + let key = kad_key(node); + + // Determines the status of the node based on the given address and the last observed + // timestamp + + if let Some(node) = self.kbuckets.get_bucket(&key).and_then(|bucket| bucket.get(&key)) { + match (node.value.is_expired(), node.value.record.udp_addr() == addr) { + (false, true) => { + // valid node + NodeEntryStatus::Valid + } + (true, true) => { + // expired + NodeEntryStatus::Expired + } + _ => NodeEntryStatus::Unknown, + } + } else { + NodeEntryStatus::Unknown + } + } + + fn evict_expired_requests(&mut self, now: Instant) { + let mut nodes_to_expire = Vec::new(); + self.pending_pings.retain(|node_id, ping_request| { + if now.duration_since(ping_request.sent_at) > self.config.ping_timeout { + nodes_to_expire.push(*node_id); + return false + } + true + }); + self.pending_find_nodes.retain(|node_id, find_node_request| { + if now.duration_since(find_node_request.sent_at) > self.config.find_node_timeout { + if !find_node_request.answered { + nodes_to_expire.push(*node_id); + } + return false + } + true + }); + for node_id in nodes_to_expire { + self.expire_node_request(node_id); + } + } + + /// Send some pings + fn reping_oldest(&mut self) { + let mut nodes = self.kbuckets.iter_ref().map(|n| n.node.value).collect::>(); + nodes.sort_by(|a, b| a.last_seen.cmp(&b.last_seen)); + let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::>(); + for node in to_ping { + self.try_ping(node, PingReason::Normal) + } + } + + /// Removes the node from the table + fn expire_node_request(&mut self, node_id: NodeId) { + let key = kad_key(node_id); + self.kbuckets.remove(&key); + } + + /// Returns true if the expiration timestamp is considered invalid. + fn is_expired(&self, expiration: u64) -> bool { + self.ensure_timestamp(expiration).is_err() + } + + /// Validate that given timestamp is not expired. + fn ensure_timestamp(&self, expiration: u64) -> Result<(), ()> { + let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(); + if self.check_timestamps && expiration < now { + debug!(target: "net::disc", "Expired packet"); + return Err(()) + } + Ok(()) + } + + /// Pops buffered ping requests and sends them. + fn ping_buffered(&mut self) { + while self.pending_pings.len() < MAX_NODES_PING { + match self.queued_pings.pop_front() { + Some((next, reason)) => self.try_ping(next, reason), + None => break, + } + } + } + + fn ping_timeout(&self) -> u64 { + (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.ping_timeout).as_secs() + } + + fn find_node_timeout(&self) -> u64 { + (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.find_node_timeout) + .as_secs() + } + + fn send_neighbours_timeout(&self) -> u64 { + (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.neighbours_timeout) + .as_secs() + } + + /// Polls the socket and advances the state. + /// + /// To prevent traffic amplification attacks, implementations must verify that the sender of a + /// query participates in the discovery protocol. The sender of a packet is considered verified + /// if it has sent a valid Pong response with matching ping hash within the last 12 hours. + pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> { + // trigger self lookup + if self.lookup_interval.poll_tick(cx).is_ready() { + let target = self.lookup_rotator.next(&self.local_enr.id); + self.lookup_with(target, None); + } + + // evict expired nodes + if self.evict_expired_requests_interval.poll_tick(cx).is_ready() { + self.evict_expired_requests(Instant::now()) + } + + // reping some peers + if self.ping_interval.poll_tick(cx).is_ready() { + self.reping_oldest(); + } + + // process all incoming commands + if let Some(mut rx) = self.commands_rx.take() { + let mut is_done = false; + while let Poll::Ready(cmd) = rx.poll_recv(cx) { + if let Some(cmd) = cmd { + match cmd { + Discv4Command::Lookup { node_id, tx } => { + let node_id = node_id.unwrap_or(self.local_enr.id); + self.lookup_with(node_id, Some(tx)); + } + Discv4Command::Updates(tx) => { + let rx = self.update_stream(); + let _ = tx.send(rx); + } + } + } else { + is_done = true; + break + } + } + if !is_done { + self.commands_rx = Some(rx); + } + } + + // process all incoming datagrams + while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) { + match event { + IngressEvent::RecvError(_) => {} + IngressEvent::BadPacket(from, err, data) => { + warn!(?from, ?err, packet=?hex::encode(&data), target = "net::disc", "bad packet"); + } + IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => { + trace!(r#type=?msg.msg_type(), from=?remote_addr, target = "net::disc", "received packet"); + match msg { + Message::Ping(ping) => { + self.on_ping(ping, remote_addr, node_id, hash); + } + Message::Pong(pong) => { + self.on_pong(pong, remote_addr, node_id); + } + Message::FindNode(msg) => { + self.on_find_node(msg, remote_addr, node_id); + } + Message::Neighbours(msg) => { + self.on_neighbours(msg, remote_addr, node_id); + } + } + } + } + } + + // try resending buffered pings + self.ping_buffered(); + + Poll::Pending + } +} + +/// Endless future impl +impl Future for Discv4Service { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.get_mut().poll(cx) + } +} + +/// Continuously reads new messages from the channel and writes them to the socket +async fn send_loop(udp: Arc, rx: EgressReceiver) { + let mut stream = ReceiverStream::new(rx); + while let Some((payload, to)) = stream.next().await { + match udp.send_to(&payload, to).await { + Ok(size) => { + trace!(?to, ?size, target = "net::disc", "sent payload"); + } + Err(err) => { + warn!(?to, ?err, target = "net::disc", "Failed to send datagram."); + } + } + } +} + +/// Continuously awaits new incoming messages and sends them back through the channel. +async fn receive_loop(udp: Arc, tx: IngressSender, local_id: NodeId) { + loop { + let mut buf = [0; MAX_PACKET_SIZE]; + let res = udp.recv_from(&mut buf).await; + match res { + Err(err) => { + warn!(?err, target = "net::disc", "Failed to read datagram."); + let _ = tx.send(IngressEvent::RecvError(err)).await; + } + Ok((read, remote_addr)) => { + let packet = &buf[..read]; + match Message::decode(packet) { + Ok(packet) => { + if packet.node_id == local_id { + // received our own message + warn!(?remote_addr, target = "net::disc", "Received own packet."); + continue + } + let _ = tx.send(IngressEvent::Packet(remote_addr, packet)).await; + } + Err(err) => { + warn!(?err, target = "net::disc", "Failed to decode packet"); + let _ = tx + .send(IngressEvent::BadPacket(remote_addr, err, packet.to_vec())) + .await; + } + } + } + } + } +} + +/// The commands sent from the frontend to the service +enum Discv4Command { + Lookup { node_id: Option, tx: NodeRecordSender }, + Updates(OneshotSender>), +} + +/// Event type receiver produces +pub(crate) enum IngressEvent { + /// Encountered an error when reading a datagram message. + RecvError(io::Error), + /// Received a bad message + BadPacket(SocketAddr, DecodePacketError, Vec), + /// Received a datagram from an address. + Packet(SocketAddr, Packet), +} + +/// Tracks a sent ping +struct PingRequest { + // Timestamp when the request was sent. + sent_at: Instant, + // Node to which the request was sent. + node: NodeRecord, + // Hash sent in the Ping request + echo_hash: H256, + /// Why this ping was sent. + reason: PingReason, +} + +/// Rotates the NodeId that is periodically looked up. +/// +/// By selecting different targets, the lookups will be seeded with different ALPHA seed nodes. +#[derive(Debug)] +struct LookupTargetRotator { + interval: usize, + counter: usize, +} + +impl Default for LookupTargetRotator { + fn default() -> Self { + Self { + // every 4th lookup is our own node + interval: 4, + counter: 3, + } + } +} + +impl LookupTargetRotator { + /// this will return the next node id to lookup + fn next(&mut self, local: &NodeId) -> NodeId { + self.counter += 1; + self.counter %= self.interval; + if self.counter == 0 { + return *local + } + NodeId::random() + } +} + +/// Tracks lookups across multiple `FindNode` requests. +/// +/// If this type is dropped by all +#[derive(Clone)] +struct LookupContext { + inner: Rc, +} + +impl LookupContext { + /// Create new context for a recursive lookup + fn new( + target: NodeId, + nearest_nodes: impl IntoIterator, + listener: Option, + ) -> Self { + let closest_nodes = nearest_nodes + .into_iter() + .map(|(distance, record)| { + (distance, QueryNode { record, queried: false, responded: false }) + }) + .collect(); + + let inner = Rc::new(LookupContextInner { + target, + closest_nodes: RefCell::new(closest_nodes), + listener, + }); + Self { inner } + } + + /// Returns the target of this lookup + fn target(&self) -> NodeId { + self.inner.target + } + + /// Returns an iterator over the closest nodes. + fn closest(&self, num: usize) -> Vec { + self.inner + .closest_nodes + .borrow() + .iter() + .filter(|(_, node)| !node.queried) + .map(|(_, n)| n.record) + .take(num) + .collect() + } + + /// Inserts the node if it's missing + fn add_node(&self, distance: Distance, record: NodeRecord) { + let mut closest = self.inner.closest_nodes.borrow_mut(); + if let btree_map::Entry::Vacant(entry) = closest.entry(distance) { + entry.insert(QueryNode { record, queried: false, responded: false }); + } + } + + /// Marks the node as queried + fn mark_queried(&self, id: NodeId) { + if let Some((_, node)) = + self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id) + { + node.queried = true; + } + } + + /// Marks the node as responded + fn mark_responded(&self, id: NodeId) { + if let Some((_, node)) = + self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id) + { + node.responded = true; + } + } +} + +// SAFETY: The [`Discv4Service`] is intended to be spawned as task which requires `Send`. +// The `LookupContext` is shared by all active `FindNode` requests that are part of the lookup step. +// Which can modify the context. The shared context is only ever accessed mutably when a `Neighbour` +// response is processed and all Clones are stored inside [`Discv4Service`], in other words it is +// guaranteed that there's only 1 owner ([`Discv4Service`]) of all possible [`Rc`] clones of +// [`LookupContext`]. +unsafe impl Send for LookupContext {} + +struct LookupContextInner { + target: NodeId, + /// The closest nodes + closest_nodes: RefCell>, + /// A listener for all the nodes retrieved in this lookup + listener: Option, +} + +impl Drop for LookupContextInner { + fn drop(&mut self) { + if let Some(tx) = self.listener.take() { + // there's only 1 instance shared across `FindNode` requests, if this is dropped then + // all requests finished, and we can send all results back + let nodes = self + .closest_nodes + .take() + .into_values() + .filter(|node| node.responded) + .map(|node| node.record) + .collect(); + let _ = tx.send(nodes); + } + } +} + +/// Tracks the state of a recursive lookup step +#[derive(Debug, Clone, Copy)] +struct QueryNode { + record: NodeRecord, + queried: bool, + responded: bool, +} + +struct FindNodeRequest { + // Timestamp when the request was sent. + sent_at: Instant, + // Number of items sent by the node + response_count: usize, + // Whether the request has been answered yet. + answered: bool, + /// Response buffer + lookup_context: LookupContext, +} + +// === impl FindNodeRequest === + +impl FindNodeRequest { + fn new(resp: LookupContext) -> Self { + Self { sent_at: Instant::now(), response_count: 0, answered: false, lookup_context: resp } + } +} + +/// Stored node info. +#[derive(Debug, Clone, Eq, PartialEq)] +struct NodeEntry { + /// Node record info. + record: NodeRecord, + /// Timestamp of last pong. + last_seen: Instant, +} + +/// The status ofs a specific node +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum NodeEntryStatus { + /// Node is the local node, ourselves + IsLocal, + /// Node is missing in the table + Unknown, + /// Expired node, last seen timestamp is too long in the past + Expired, + /// Valid node, ready to interact with + Valid, +} + +// === impl NodeEntry === + +impl NodeEntry { + /// Returns true if the node is considered expired. + fn is_expired(&self) -> bool { + self.last_seen.elapsed() > NODE_LAST_SEEN_TIMEOUT + } +} + +/// Represents why a ping is issued +enum PingReason { + /// Standard ping + Normal, + /// Ping issued to adhere to endpoint proof procedure + /// + /// Once the expected PONG is received, the endpoint proof is complete and the find node can be + /// answered. + FindNode(NodeId, NodeEntryStatus), + /// Part of a lookup to ensure endpoint is proven. + Lookup(NodeRecord, LookupContext), +} + +/// Represents state changes in the underlying node table +#[derive(Debug, Clone)] +pub enum TableUpdate { + /// A new node was inserted to the table. + Added(NodeRecord), + /// Node that was removed from the table + Removed(NodeId), + /// A series of updates + Batch(Vec), +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::bootnodes::mainnet_nodes; + use rand::thread_rng; + use secp256k1::SECP256K1; + use std::str::FromStr; + use tracing_test::traced_test; + + async fn create() -> (Discv4, Discv4Service) { + create_with_config(Default::default()).await + } + + async fn create_with_config(config: Discv4Config) -> (Discv4, Discv4Service) { + let mut rng = thread_rng(); + let socket = SocketAddr::from_str("0.0.0.0:30303").unwrap(); + let (secret_key, pk) = SECP256K1.generate_keypair(&mut rng); + let id = NodeId::from_slice(&pk.serialize_uncompressed()[1..]); + let external_addr = public_ip::addr().await.unwrap_or_else(|| socket.ip()); + let local_enr = NodeRecord { + address: external_addr, + tcp_port: socket.port(), + udp_port: socket.port(), + id, + }; + Discv4::bind(socket, local_enr, secret_key, config).await.unwrap() + } + + #[tokio::test] + #[traced_test] + async fn test_pending_ping() { + let (_, mut service) = create().await; + + let local_addr = service.local_address(); + + for idx in 0..MAX_NODES_PING { + let node = NodeRecord::new(local_addr, NodeId::random()); + service.add_node(node); + assert!(service.pending_pings.contains_key(&node.id)); + assert_eq!(service.pending_pings.len(), idx + 1); + } + } + + #[tokio::test(flavor = "multi_thread")] + #[traced_test] + #[ignore] + async fn test_lookup() { + let all_nodes = mainnet_nodes(); + let config = Discv4Config::builder().add_boot_nodes(all_nodes).build(); + let (_discv4, mut service) = create_with_config(config).await; + + let mut updates = service.update_stream(); + + let _handle = service.spawn(); + + let mut table = HashMap::new(); + while let Some(update) = updates.next().await { + match update { + TableUpdate::Added(record) => { + table.insert(record.id, record); + } + TableUpdate::Removed(id) => { + table.remove(&id); + } + TableUpdate::Batch(_) => {} + } + println!("total peers {}", table.len()); + } + } +} diff --git a/crates/net/discv4/src/node.rs b/crates/net/discv4/src/node.rs new file mode 100644 index 000000000..3a7f4016b --- /dev/null +++ b/crates/net/discv4/src/node.rs @@ -0,0 +1,211 @@ +use crate::{proto::Octets, NodeId}; +use bytes::{Buf, BufMut}; +use generic_array::GenericArray; +use reth_primitives::keccak256; +use reth_rlp::{Decodable, DecodeError, Encodable}; +use reth_rlp_derive::RlpEncodable; +use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + str::FromStr, +}; +use url::{Host, Url}; + +/// The key type for the table. +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub(crate) struct NodeKey(pub(crate) NodeId); + +impl From for NodeKey { + fn from(value: NodeId) -> Self { + NodeKey(value) + } +} + +impl From for discv5::Key { + fn from(value: NodeKey) -> Self { + let hash = keccak256(value.0.as_bytes()); + let hash = *GenericArray::from_slice(hash.as_bytes()); + discv5::Key::new_raw(value, hash) + } +} + +/// Converts a `NodeId` into the required `Key` type for the table +#[inline] +pub(crate) fn kad_key(node: NodeId) -> discv5::Key { + discv5::kbucket::Key::from(NodeKey::from(node)) +} + +/// Represents a ENR in discv4. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] +pub struct NodeRecord { + /// The Address of a node. + pub address: IpAddr, + /// TCP port of the port that accepts connections. + pub tcp_port: u16, + /// UDP discovery port. + pub udp_port: u16, + /// Public key of the discovery service + pub id: NodeId, +} + +impl NodeRecord { + /// Creates a new record + #[allow(unused)] + pub(crate) fn new(addr: SocketAddr, id: NodeId) -> Self { + Self { address: addr.ip(), tcp_port: addr.port(), udp_port: addr.port(), id } + } + + /// The TCP socket address of this node + #[must_use] + pub fn tcp_addr(&self) -> SocketAddr { + SocketAddr::new(self.address, self.tcp_port) + } + + /// The UDP socket address of this node + #[must_use] + pub fn udp_addr(&self) -> SocketAddr { + SocketAddr::new(self.address, self.udp_port) + } + + /// Returns the key type for the kademlia table + #[must_use] + #[inline] + pub(crate) fn key(&self) -> discv5::Key { + NodeKey(self.id).into() + } +} + +/// Possible error types when parsing a `NodeRecord` +#[derive(Debug, thiserror::Error)] +pub enum NodeRecordParseError { + #[error("Failed to parse url: {0}")] + InvalidUrl(String), + #[error("Failed to parse id")] + InvalidId(String), +} + +impl FromStr for NodeRecord { + type Err = NodeRecordParseError; + + fn from_str(s: &str) -> Result { + let url = Url::parse(s).map_err(|e| NodeRecordParseError::InvalidUrl(e.to_string()))?; + + let address = match url.host() { + Some(Host::Ipv4(ip)) => IpAddr::V4(ip), + Some(Host::Ipv6(ip)) => IpAddr::V6(ip), + Some(Host::Domain(ip)) => IpAddr::V4( + Ipv4Addr::from_str(ip) + .map_err(|e| NodeRecordParseError::InvalidUrl(e.to_string()))?, + ), + _ => return Err(NodeRecordParseError::InvalidUrl(format!("invalid host: {url:?}"))), + }; + let port = url + .port() + .ok_or_else(|| NodeRecordParseError::InvalidUrl("no port specified".to_string()))?; + + let id = url + .username() + .parse::() + .map_err(|e| NodeRecordParseError::InvalidId(e.to_string()))?; + + Ok(Self { address, id, tcp_port: port, udp_port: port }) + } +} + +impl Encodable for NodeRecord { + fn encode(&self, out: &mut dyn BufMut) { + #[derive(RlpEncodable)] + struct EncodeNode { + octets: Octets, + udp_port: u16, + tcp_port: u16, + id: NodeId, + } + + let octets = match self.address { + IpAddr::V4(addr) => Octets::V4(addr.octets()), + IpAddr::V6(addr) => Octets::V6(addr.octets()), + }; + let node = + EncodeNode { octets, udp_port: self.udp_port, tcp_port: self.tcp_port, id: self.id }; + node.encode(out) + } +} + +impl Decodable for NodeRecord { + fn decode(buf: &mut &[u8]) -> Result { + let b = &mut &**buf; + let rlp_head = reth_rlp::Header::decode(b)?; + if !rlp_head.list { + return Err(DecodeError::UnexpectedString) + } + let started_len = b.len(); + let octets = Octets::decode(b)?; + let this = Self { + address: octets.into(), + udp_port: Decodable::decode(b)?, + tcp_port: Decodable::decode(b)?, + id: Decodable::decode(b)?, + }; + // the ENR record can contain additional entries that we skip + let consumed = started_len - b.len(); + if consumed > rlp_head.payload_length { + return Err(DecodeError::ListLengthMismatch { + expected: rlp_head.payload_length, + got: consumed, + }) + } + let rem = rlp_head.payload_length - consumed; + b.advance(rem); + *buf = *b; + Ok(this) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::BytesMut; + use rand::{thread_rng, Rng, RngCore}; + + #[test] + fn test_noderecord_codec_ipv4() { + let mut rng = thread_rng(); + for _ in 0..100 { + let mut ip = [0u8; 4]; + rng.fill_bytes(&mut ip); + let record = NodeRecord { + address: IpAddr::V4(ip.into()), + tcp_port: rng.gen(), + udp_port: rng.gen(), + id: NodeId::random(), + }; + + let mut buf = BytesMut::new(); + record.encode(&mut buf); + + let decoded = NodeRecord::decode(&mut buf.as_ref()).unwrap(); + assert_eq!(record, decoded); + } + } + + #[test] + fn test_noderecord_codec_ipv6() { + let mut rng = thread_rng(); + for _ in 0..100 { + let mut ip = [0u8; 16]; + rng.fill_bytes(&mut ip); + let record = NodeRecord { + address: IpAddr::V6(ip.into()), + tcp_port: rng.gen(), + udp_port: rng.gen(), + id: NodeId::random(), + }; + + let mut buf = BytesMut::new(); + record.encode(&mut buf); + + let decoded = NodeRecord::decode(&mut buf.as_ref()).unwrap(); + assert_eq!(record, decoded); + } + } +} diff --git a/crates/net/discv4/src/proto.rs b/crates/net/discv4/src/proto.rs new file mode 100644 index 000000000..ede7aa8fc --- /dev/null +++ b/crates/net/discv4/src/proto.rs @@ -0,0 +1,578 @@ +#![allow(missing_docs)] + +use crate::{error::DecodePacketError, node::NodeRecord, NodeId, MAX_PACKET_SIZE, MIN_PACKET_SIZE}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use reth_primitives::{keccak256, H256}; +use reth_rlp::{Decodable, DecodeError, Encodable, Header}; +use reth_rlp_derive::{RlpDecodable, RlpEncodable}; +use secp256k1::{ + ecdsa::{RecoverableSignature, RecoveryId}, + SecretKey, SECP256K1, +}; +use std::net::{IpAddr, Ipv6Addr}; + +// Note: this is adapted from https://github.com/vorot93/discv4 + +/// Id for message variants. +#[derive(Debug)] +#[repr(u8)] +pub enum MessageId { + Ping = 1, + Pong = 2, + FindNode = 3, + Neighbours = 4, +} + +impl MessageId { + /// Converts the byte that represents the message id to the enum. + fn from_u8(msg: u8) -> Result { + let msg = match msg { + 1 => MessageId::Ping, + 2 => MessageId::Pong, + 3 => MessageId::FindNode, + 4 => MessageId::Neighbours, + _ => return Err(msg), + }; + Ok(msg) + } +} + +/// All message variants +#[derive(Debug, Eq, PartialEq)] +pub enum Message { + Ping(Ping), + Pong(Pong), + FindNode(FindNode), + Neighbours(Neighbours), +} + +// === impl Message === + +impl Message { + /// Returns the id for this type + pub fn msg_type(&self) -> MessageId { + match self { + Message::Ping(_) => MessageId::Ping, + Message::Pong(_) => MessageId::Pong, + Message::FindNode(_) => MessageId::FindNode, + Message::Neighbours(_) => MessageId::Neighbours, + } + } + + /// Encodes the UDP datagram, See + /// + /// The datagram is `header || payload` + /// where header is `hash || signature || packet-type` + pub fn encode(&self, secret_key: &SecretKey) -> (Bytes, H256) { + // allocate max packet size + let mut datagram = BytesMut::with_capacity(MAX_PACKET_SIZE); + + // since signature has fixed len, we can split and fill the datagram buffer at fixed + // positions, this way we can encode the message directly in the datagram buffer + let mut sig_bytes = datagram.split_off(H256::len_bytes()); + let mut payload = sig_bytes.split_off(secp256k1::constants::COMPACT_SIGNATURE_SIZE + 1); + + match self { + Message::Ping(message) => { + payload.put_u8(1); + message.encode(&mut payload); + } + Message::Pong(message) => { + payload.put_u8(2); + message.encode(&mut payload); + } + Message::FindNode(message) => { + payload.put_u8(3); + message.encode(&mut payload); + } + Message::Neighbours(message) => { + payload.put_u8(4); + message.encode(&mut payload); + } + } + + let signature: RecoverableSignature = SECP256K1.sign_ecdsa_recoverable( + &secp256k1::Message::from_slice(keccak256(&payload).as_ref()) + .expect("is correct MESSAGE_SIZE; qed"), + secret_key, + ); + + let (rec, sig) = signature.serialize_compact(); + sig_bytes.extend_from_slice(&sig); + sig_bytes.put_u8(rec.to_i32() as u8); + sig_bytes.unsplit(payload); + + let hash = keccak256(&sig_bytes); + datagram.extend_from_slice(hash.as_bytes()); + + datagram.unsplit(sig_bytes); + (datagram.freeze(), hash) + } + + /// Decodes the [`Message`] from the given buffer. + /// + /// Returns the decoded message and the public key of the sender. + pub fn decode(packet: &[u8]) -> Result { + if packet.len() < MIN_PACKET_SIZE { + return Err(DecodePacketError::PacketTooShort) + } + + // parses the wire-protocol, every packet starts with a header: + // packet-header = hash || signature || packet-type + // hash = keccak256(signature || packet-type || packet-data) + // signature = sign(packet-type || packet-data) + + let header_hash = keccak256(&packet[32..]); + let data_hash = H256::from_slice(&packet[..32]); + if data_hash != header_hash { + return Err(DecodePacketError::HashMismatch) + } + + let signature = &packet[32..96]; + let recovery_id = RecoveryId::from_i32(packet[96] as i32)?; + let recoverable_sig = RecoverableSignature::from_compact(signature, recovery_id)?; + + // recover the public key + let msg = secp256k1::Message::from_slice(keccak256(&packet[97..]).as_bytes())?; + + let pk = SECP256K1.recover_ecdsa(&msg, &recoverable_sig)?; + let node_id = NodeId::from_slice(&pk.serialize_uncompressed()[1..]); + + let msg_type = packet[97]; + let payload = &mut &packet[98..]; + + let msg = match MessageId::from_u8(msg_type).map_err(DecodePacketError::UnknownMessage)? { + MessageId::Ping => Message::Ping(Ping::decode(payload)?), + MessageId::Pong => Message::Pong(Pong::decode(payload)?), + MessageId::FindNode => Message::FindNode(FindNode::decode(payload)?), + MessageId::Neighbours => Message::Neighbours(Neighbours::decode(payload)?), + }; + + Ok(Packet { msg, node_id, hash: header_hash }) + } +} + +/// Decoded packet +#[derive(Debug)] +pub struct Packet { + pub msg: Message, + pub node_id: NodeId, + pub hash: H256, +} + +/// Represents the `from`, `to` fields in the packets +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub struct NodeEndpoint { + pub address: IpAddr, + pub udp_port: u16, + pub tcp_port: u16, +} +impl Decodable for NodeEndpoint { + fn decode(buf: &mut &[u8]) -> Result { + let b = &mut &**buf; + let rlp_head = Header::decode(b)?; + if !rlp_head.list { + return Err(DecodeError::UnexpectedString) + } + let started_len = b.len(); + let octets = Octets::decode(b)?; + let this = Self { + address: octets.into(), + udp_port: Decodable::decode(b)?, + tcp_port: Decodable::decode(b)?, + }; + // the ENR record can contain additional entries that we skip + let consumed = started_len - b.len(); + if consumed > rlp_head.payload_length { + return Err(DecodeError::ListLengthMismatch { + expected: rlp_head.payload_length, + got: consumed, + }) + } + let rem = rlp_head.payload_length - consumed; + b.advance(rem); + *buf = *b; + Ok(this) + } +} + +impl Encodable for NodeEndpoint { + fn encode(&self, out: &mut dyn BufMut) { + #[derive(RlpEncodable)] + struct RlpEndpoint { + octets: Octets, + udp_port: u16, + tcp_port: u16, + } + + let octets = match self.address { + IpAddr::V4(addr) => Octets::V4(addr.octets()), + IpAddr::V6(addr) => Octets::V6(addr.octets()), + }; + let p = RlpEndpoint { octets, udp_port: self.udp_port, tcp_port: self.tcp_port }; + p.encode(out) + } +} + +impl From for NodeEndpoint { + fn from(NodeRecord { address, tcp_port, udp_port, .. }: NodeRecord) -> Self { + Self { address, tcp_port, udp_port } + } +} + +/// A [FindNode packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#findnode-packet-0x03).). +#[derive(Clone, Copy, Debug, Eq, PartialEq, RlpEncodable, RlpDecodable)] +pub struct FindNode { + pub id: NodeId, + pub expire: u64, +} + +/// A [Neighbours packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#neighbors-packet-0x04). +#[derive(Clone, Debug, Eq, PartialEq, RlpEncodable, RlpDecodable)] +pub struct Neighbours { + pub nodes: Vec, + pub expire: u64, +} + +/// A [Ping packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#ping-packet-0x01). +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct Ping { + pub from: NodeEndpoint, + pub to: NodeEndpoint, + pub expire: u64, +} + +impl Encodable for Ping { + fn encode(&self, out: &mut dyn BufMut) { + #[derive(RlpEncodable)] + struct V4PingMessage<'a> { + version: u32, + from: &'a NodeEndpoint, + to: &'a NodeEndpoint, + expire: u64, + } + V4PingMessage { + version: 4, // version 4 + from: &self.from, + to: &self.to, + expire: self.expire, + } + .encode(out) + } +} + +impl Decodable for Ping { + fn decode(buf: &mut &[u8]) -> Result { + let b = &mut &**buf; + let rlp_head = Header::decode(b)?; + if !rlp_head.list { + return Err(DecodeError::UnexpectedString) + } + let started_len = b.len(); + let _version = u32::decode(b)?; + let this = Self { + from: Decodable::decode(b)?, + to: Decodable::decode(b)?, + expire: Decodable::decode(b)?, + }; + let consumed = started_len - b.len(); + if consumed > rlp_head.payload_length { + return Err(DecodeError::ListLengthMismatch { + expected: rlp_head.payload_length, + got: consumed, + }) + } + let rem = rlp_head.payload_length - consumed; + b.advance(rem); + *buf = *b; + Ok(this) + } +} + +/// A [Pong packet](https://github.com/ethereum/devp2p/blob/master/discv4.md#pong-packet-0x02). +// #[derive(Clone, Debug, Eq, PartialEq, RlpEncodable, RlpDecodable)] +#[derive(Clone, Debug, Eq, PartialEq, RlpEncodable)] +pub struct Pong { + pub to: NodeEndpoint, + pub echo: H256, + pub expire: u64, +} + +impl Decodable for Pong { + fn decode(buf: &mut &[u8]) -> Result { + let b = &mut &**buf; + let rlp_head = Header::decode(b)?; + if !rlp_head.list { + return Err(DecodeError::UnexpectedString) + } + let started_len = b.len(); + let this = Self { + to: Decodable::decode(b)?, + echo: Decodable::decode(b)?, + expire: Decodable::decode(b)?, + }; + let consumed = started_len - b.len(); + if consumed > rlp_head.payload_length { + return Err(DecodeError::ListLengthMismatch { + expected: rlp_head.payload_length, + got: consumed, + }) + } + let rem = rlp_head.payload_length - consumed; + b.advance(rem); + *buf = *b; + Ok(this) + } +} + +/// IpAddr octets +#[derive(Debug, Copy, Clone)] +pub(crate) enum Octets { + V4([u8; 4]), + V6([u8; 16]), +} + +impl From for IpAddr { + fn from(value: Octets) -> Self { + match value { + Octets::V4(o) => IpAddr::from(o), + Octets::V6(o) => { + let ipv6 = Ipv6Addr::from(o); + // If the ipv6 is ipv4 compatible/mapped, simply return the ipv4. + if let Some(ipv4) = ipv6.to_ipv4() { + IpAddr::V4(ipv4) + } else { + IpAddr::V6(ipv6) + } + } + } + } +} + +impl Encodable for Octets { + fn encode(&self, out: &mut dyn BufMut) { + let octets = match self { + Octets::V4(ref o) => &o[..], + Octets::V6(ref o) => &o[..], + }; + octets.encode(out) + } +} + +impl Decodable for Octets { + fn decode(buf: &mut &[u8]) -> Result { + let h = Header::decode(buf)?; + if h.list { + return Err(DecodeError::UnexpectedList) + } + let o = match h.payload_length { + 4 => { + let mut to = [0_u8; 4]; + to.copy_from_slice(&buf[..4]); + Octets::V4(to) + } + 16 => { + let mut to = [0u8; 16]; + to.copy_from_slice(&buf[..16]); + Octets::V6(to) + } + _ => return Err(DecodeError::UnexpectedLength), + }; + buf.advance(h.payload_length); + Ok(o) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS; + use bytes::BytesMut; + use rand::{thread_rng, Rng, RngCore}; + + fn rng_endpoint(rng: &mut impl Rng) -> NodeEndpoint { + let address = if rng.gen() { + let mut ip = [0u8; 4]; + rng.fill_bytes(&mut ip); + IpAddr::V4(ip.into()) + } else { + let mut ip = [0u8; 16]; + rng.fill_bytes(&mut ip); + IpAddr::V6(ip.into()) + }; + NodeEndpoint { address, tcp_port: rng.gen(), udp_port: rng.gen() } + } + + fn rng_record(rng: &mut impl RngCore) -> NodeRecord { + let NodeEndpoint { address, udp_port, tcp_port } = rng_endpoint(rng); + NodeRecord { address, tcp_port, udp_port, id: NodeId::random() } + } + + fn rng_ipv6_record(rng: &mut impl RngCore) -> NodeRecord { + let mut ip = [0u8; 16]; + rng.fill_bytes(&mut ip); + let address = IpAddr::V6(ip.into()); + NodeRecord { address, tcp_port: rng.gen(), udp_port: rng.gen(), id: NodeId::random() } + } + + fn rng_ipv4_record(rng: &mut impl RngCore) -> NodeRecord { + let mut ip = [0u8; 4]; + rng.fill_bytes(&mut ip); + let address = IpAddr::V4(ip.into()); + NodeRecord { address, tcp_port: rng.gen(), udp_port: rng.gen(), id: NodeId::random() } + } + + fn rng_message(rng: &mut impl RngCore) -> Message { + match rng.gen_range(1..=4) { + 1 => Message::Ping(Ping { + from: rng_endpoint(rng), + to: rng_endpoint(rng), + expire: rng.gen(), + }), + 2 => Message::Pong(Pong { + to: rng_endpoint(rng), + echo: H256::random(), + expire: rng.gen(), + }), + 3 => Message::FindNode(FindNode { id: NodeId::random(), expire: rng.gen() }), + 4 => { + let num: usize = rng.gen_range(1..=SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS); + Message::Neighbours(Neighbours { + nodes: std::iter::repeat_with(|| rng_record(rng)).take(num).collect(), + expire: rng.gen(), + }) + } + _ => unreachable!(), + } + } + + #[test] + fn test_endpoint_ipv_v4() { + let mut rng = thread_rng(); + for _ in 0..100 { + let mut ip = [0u8; 4]; + rng.fill_bytes(&mut ip); + let msg = NodeEndpoint { + address: IpAddr::V4(ip.into()), + tcp_port: rng.gen(), + udp_port: rng.gen(), + }; + + let mut buf = BytesMut::new(); + msg.encode(&mut buf); + + let decoded = NodeEndpoint::decode(&mut buf.as_ref()).unwrap(); + assert_eq!(msg, decoded); + } + } + + #[test] + fn test_endpoint_ipv_64() { + let mut rng = thread_rng(); + for _ in 0..100 { + let mut ip = [0u8; 16]; + rng.fill_bytes(&mut ip); + let msg = NodeEndpoint { + address: IpAddr::V6(ip.into()), + tcp_port: rng.gen(), + udp_port: rng.gen(), + }; + + let mut buf = BytesMut::new(); + msg.encode(&mut buf); + + let decoded = NodeEndpoint::decode(&mut buf.as_ref()).unwrap(); + assert_eq!(msg, decoded); + } + } + + #[test] + fn test_ping_message() { + let mut rng = thread_rng(); + for _ in 0..100 { + let mut ip = [0u8; 16]; + rng.fill_bytes(&mut ip); + let msg = Ping { from: rng_endpoint(&mut rng), to: rng_endpoint(&mut rng), expire: 0 }; + + let mut buf = BytesMut::new(); + msg.encode(&mut buf); + + let decoded = Ping::decode(&mut buf.as_ref()).unwrap(); + assert_eq!(msg, decoded); + } + } + + #[test] + fn test_hash_mismatch() { + let mut rng = thread_rng(); + let msg = rng_message(&mut rng); + let (secret_key, _) = SECP256K1.generate_keypair(&mut rng); + let (buf, _) = msg.encode(&secret_key); + let mut buf = BytesMut::from(buf.as_ref()); + buf.put_u8(0); + match Message::decode(buf.as_ref()).unwrap_err() { + DecodePacketError::HashMismatch => {} + err => { + unreachable!("unexpected err {}", err) + } + } + } + + #[test] + fn neighbours_max_nodes() { + let mut rng = thread_rng(); + for _ in 0..1000 { + let msg = Message::Neighbours(Neighbours { + nodes: std::iter::repeat_with(|| rng_ipv6_record(&mut rng)) + .take(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) + .collect(), + expire: rng.gen(), + }); + let (secret_key, _) = SECP256K1.generate_keypair(&mut rng); + + let (encoded, _) = msg.encode(&secret_key); + assert!(encoded.len() <= MAX_PACKET_SIZE, "{} {:?}", encoded.len(), msg); + + let mut neighbours = Neighbours { + nodes: std::iter::repeat_with(|| rng_ipv6_record(&mut rng)) + .take(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS - 1) + .collect(), + expire: rng.gen(), + }; + neighbours.nodes.push(rng_ipv4_record(&mut rng)); + let msg = Message::Neighbours(neighbours); + let (encoded, _) = msg.encode(&secret_key); + assert!(encoded.len() <= MAX_PACKET_SIZE, "{} {:?}", encoded.len(), msg); + } + } + + #[test] + fn test_encode_decode_message() { + let mut rng = thread_rng(); + for _ in 0..100 { + let msg = rng_message(&mut rng); + let (secret_key, pk) = SECP256K1.generate_keypair(&mut rng); + let sender_id = NodeId::from_slice(&pk.serialize_uncompressed()[1..]); + + let (buf, _) = msg.encode(&secret_key); + + let packet = Message::decode(buf.as_ref()).unwrap(); + + assert_eq!(msg, packet.msg); + assert_eq!(sender_id, packet.node_id); + } + } + + #[test] + fn decode_pong_packet() { + let packet = "2ad84c37327a06c2522cf7bc039621da89f68907441b755935bb308dc4cd17d6fe550e90329ad6a516ca7db18e08900067928a0dfa3b5c75d55a42c984497373698d98616662c048983ea85895ea2da765eabeb15525478384e106337bfd8ed50002f3c9843ed8cae682fd1c80a008ad4dead0922211df47593e7d837b2b23d13954285871ca23250ea594993ded84635690e5829670"; + let data = hex::decode(packet).unwrap(); + Message::decode(&data).unwrap(); + } + #[test] + fn decode_ping_packet() { + let packet = "05ae5bf922cf2a93f97632a4ab0943dc252a0dab0c42d86dd62e5d91e1a0966e9b628fbf4763fdfbb928540460b797e6be2e7058a82f6083f6d2e7391bb021741459976d4152aa16bbee0c3609dcfac6668db1ef78b7ee9f8b4ced10dd5ae2900101df04cb8403d12d4f82765f82765fc9843ed8cae6828aa6808463569916829670"; + let data = hex::decode(packet).unwrap(); + Message::decode(&data).unwrap(); + } +} diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index d23326299..04737ebfc 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -54,7 +54,7 @@ pub type StorageValue = H256; // NOTE: There is a benefit of using wrapped Bytes as it gives us serde and debug pub use ethers_core::{ types as rpc, - types::{Bloom, Bytes, H128, H160, H256, H512, H64, U128, U256, U64}, + types::{BigEndianHash, Bloom, Bytes, H128, H160, H256, H512, H64, U128, U256, U64}, }; #[doc(hidden)]