From e379ad0f4ecc8fabf14c99fc4c11c79931a46800 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Fri, 19 Jun 2020 14:13:23 +1000 Subject: [PATCH] Silky smooth discovery (#1274) * Initial structural re-write * Improving discovery update and correcting attestation service logic * Rework discovery.mod * Handling lifetimes of query futures * Discovery update first draft * format fixes * Stabalise discv5 update * Formatting corrections * Limit FindPeers queries and bug correction * Update to stable release discv5 * Remove unnecessary pin * formatting --- Cargo.lock | 282 ++++---- Cargo.toml | 3 +- beacon_node/Cargo.toml | 2 +- beacon_node/client/Cargo.toml | 2 +- beacon_node/eth2-libp2p/src/discovery/mod.rs | 628 ----------------- .../{eth2-libp2p => eth2_libp2p}/Cargo.toml | 5 +- .../src/behaviour/handler/delegate.rs | 112 +--- .../src/behaviour/handler/mod.rs | 10 +- .../src/behaviour/mod.rs | 85 ++- .../src/config.rs | 5 +- .../{eth2-libp2p => eth2_libp2p}/src/lib.rs | 10 +- .../src/metrics.rs | 8 + .../src/peer_manager/client.rs | 0 .../src/peer_manager}/discovery/enr.rs | 4 +- .../src/peer_manager}/discovery/enr_ext.rs | 0 .../src/peer_manager/discovery/mod.rs | 632 ++++++++++++++++++ .../discovery/subnet_predicate.rs | 2 +- .../src/peer_manager/mod.rs | 352 +++++++--- .../src/peer_manager/peer_info.rs | 0 .../src/peer_manager/peer_sync_status.rs | 0 .../src/peer_manager/peerdb.rs | 8 + .../src/rpc/codec/base.rs | 0 .../src/rpc/codec/mod.rs | 0 .../src/rpc/codec/ssz.rs | 0 .../src/rpc/codec/ssz_snappy.rs | 0 .../src/rpc/handler.rs | 0 .../src/rpc/methods.rs | 0 .../src/rpc/mod.rs | 0 .../src/rpc/protocol.rs | 0 .../src/service.rs | 0 .../src/types/error.rs | 0 .../src/types/globals.rs | 2 +- .../src/types/mod.rs | 0 .../src/types/pubsub.rs | 0 .../src/types/sync_state.rs | 0 .../src/types/topics.rs | 0 .../tests/common/mod.rs | 4 +- .../tests/gossipsub_tests.rs | 0 .../tests/noise.rs | 0 .../tests/rpc_tests.rs | 0 beacon_node/network/Cargo.toml | 2 +- .../network/src/attestation_service/mod.rs | 35 +- beacon_node/network/src/service.rs | 4 +- beacon_node/rest_api/Cargo.toml | 2 +- beacon_node/src/lib.rs | 5 + lcli/Cargo.toml | 2 +- lighthouse/environment/Cargo.toml | 1 + lighthouse/environment/src/executor.rs | 6 + 48 files changed, 1168 insertions(+), 1045 deletions(-) delete mode 100644 beacon_node/eth2-libp2p/src/discovery/mod.rs rename beacon_node/{eth2-libp2p => eth2_libp2p}/Cargo.toml (91%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/behaviour/handler/delegate.rs (72%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/behaviour/handler/mod.rs (95%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/behaviour/mod.rs (94%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/config.rs (97%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/lib.rs (79%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/metrics.rs (71%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/peer_manager/client.rs (100%) rename beacon_node/{eth2-libp2p/src => eth2_libp2p/src/peer_manager}/discovery/enr.rs (98%) rename beacon_node/{eth2-libp2p/src => eth2_libp2p/src/peer_manager}/discovery/enr_ext.rs (100%) create mode 100644 beacon_node/eth2_libp2p/src/peer_manager/discovery/mod.rs rename beacon_node/{eth2-libp2p/src => eth2_libp2p/src/peer_manager}/discovery/subnet_predicate.rs (95%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/peer_manager/mod.rs (74%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/peer_manager/peer_info.rs (100%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/peer_manager/peer_sync_status.rs (100%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/peer_manager/peerdb.rs (98%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/rpc/codec/base.rs (100%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/rpc/codec/mod.rs (100%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/rpc/codec/ssz.rs (100%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/rpc/codec/ssz_snappy.rs (100%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/rpc/handler.rs (100%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/rpc/methods.rs (100%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/rpc/mod.rs (100%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/rpc/protocol.rs (100%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/service.rs (100%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/types/error.rs (100%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/types/globals.rs (98%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/types/mod.rs (100%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/types/pubsub.rs (100%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/types/sync_state.rs (100%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/src/types/topics.rs (100%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/tests/common/mod.rs (97%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/tests/gossipsub_tests.rs (100%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/tests/noise.rs (100%) rename beacon_node/{eth2-libp2p => eth2_libp2p}/tests/rpc_tests.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index fab4bc4489..59d4b66994 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -41,9 +41,9 @@ dependencies = [ [[package]] name = "adler32" -version = "1.0.4" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d2e7343e7fc9de883d1b0341e0b13970f764c14101234857d2ddafa1cb1cac2" +checksum = "567b077b825e468cc974f0020d4082ee6e03132512f207ef1a02fd5d00d1f32d" [[package]] name = "aes-ctr" @@ -132,9 +132,9 @@ dependencies = [ [[package]] name = "arc-swap" -version = "0.4.6" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b585a98a234c46fc563103e9278c9391fde1f4e6850334da895d27edb9580f62" +checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034" [[package]] name = "arrayref" @@ -225,13 +225,14 @@ checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" [[package]] name = "backtrace" -version = "0.3.48" +version = "0.3.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0df2f85c8a2abbe3b7d7e748052fdd9b76a0458fdeb16ad4223f5eca78c7c130" +checksum = "05100821de9e028f12ae3d189176b41ee198341eb8f369956407fea2f5cc666c" dependencies = [ "addr2line", "cfg-if", "libc", + "miniz_oxide", "object", "rustc-demangle", ] @@ -260,9 +261,9 @@ checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7" [[package]] name = "base64" -version = "0.12.1" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53d1ccbaf7d9ec9537465a97bf19edc1a4e158ecb49fc16178202238c569cc42" +checksum = "e223af0dc48c96d4f8342ec01a4974f139df863896b316681efd36742f22cc67" [[package]] name = "beacon_chain" @@ -320,8 +321,8 @@ dependencies = [ "ctrlc", "dirs", "environment", - "eth2-libp2p", "eth2_config", + "eth2_libp2p", "eth2_ssz", "eth2_testnet_config", "exit-future", @@ -621,8 +622,8 @@ dependencies = [ "environment", "error-chain", "eth1", - "eth2-libp2p", "eth2_config", + "eth2_libp2p", "eth2_ssz", "futures 0.3.5", "genesis", @@ -843,12 +844,13 @@ dependencies = [ [[package]] name = "crossbeam-queue" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab6bffe714b6bb07e42f201352c34f51fefd355ace793f9e638ebd52d23f98d2" +checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570" dependencies = [ "cfg-if", "crossbeam-utils", + "maybe-uninit", ] [[package]] @@ -990,9 +992,9 @@ dependencies = [ [[package]] name = "derive_more" -version = "0.99.7" +version = "0.99.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2127768764f1556535c01b5326ef94bd60ff08dcfbdc544d53e69ed155610f5d" +checksum = "bc655351f820d774679da6cdc23355a93de496867d8203496675162e17b1d671" dependencies = [ "proc-macro2", "quote", @@ -1020,11 +1022,10 @@ dependencies = [ [[package]] name = "dirs-sys" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afa0b23de8fd801745c471deffa6e12d248f962c9fd4b4c33787b055599bde7b" +checksum = "8e93d7f5705de3e49895a2b5e0b8855a1c27f080192ae9c32a6432d50741a57a" dependencies = [ - "cfg-if", "libc", "redox_users", "winapi 0.3.8", @@ -1038,8 +1039,9 @@ checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" [[package]] name = "discv5" -version = "0.1.0-alpha.2" -source = "git+https://github.com/sigp/discv5?rev=7b3bd40591b62b8c002ffdb85de008aa9f82e2e5#7b3bd40591b62b8c002ffdb85de008aa9f82e2e5" +version = "0.1.0-alpha.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66319abef3e2f4dc434bf0c9bcb5dee5907d7fece3327dfd7da82db905d02441" dependencies = [ "arrayvec 0.5.1", "digest", @@ -1048,10 +1050,15 @@ dependencies = [ "futures 0.3.5", "hex 0.4.2", "hkdf", + "lazy_static", + "libp2p-core", "libsecp256k1", "log 0.4.8", + "lru_time_cache", + "multihash", "net2", "openssl", + "parking_lot 0.10.2", "rand 0.7.3", "rlp", "sha2", @@ -1069,9 +1076,9 @@ checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" [[package]] name = "dtoa" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4358a9e11b9a09cf52383b451b49a169e8d797b68aa02301ff586d70d9661ea3" +checksum = "134951f4028bdadb9b84baf4232681efbf277da25144b9b0ad65df75946c422b" [[package]] name = "ed25519-dalek" @@ -1129,7 +1136,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca3cd1bccf1bd78eee44d89c0f81b60008b40153db2b99c0fc01abf353781e13" dependencies = [ - "base64 0.12.1", + "base64 0.12.2", "bs58", "ed25519-dalek", "hex 0.4.2", @@ -1160,6 +1167,7 @@ name = "environment" version = "0.1.2" dependencies = [ "ctrlc", + "discv5", "eth2_config", "eth2_testnet_config", "exit-future", @@ -1228,49 +1236,6 @@ dependencies = [ "web3", ] -[[package]] -name = "eth2-libp2p" -version = "0.1.2" -dependencies = [ - "base64 0.12.1", - "dirs", - "discv5", - "environment", - "error-chain", - "eth2_ssz", - "eth2_ssz_derive", - "eth2_ssz_types", - "exit-future", - "fnv", - "futures 0.3.5", - "hashset_delay", - "hex 0.4.2", - "lazy_static", - "libp2p", - "libp2p-tcp", - "lighthouse_metrics", - "lru 0.5.1", - "parking_lot 0.10.2", - "serde", - "serde_derive", - "sha2", - "slog", - "slog-async", - "slog-stdlog", - "slog-term", - "smallvec 1.4.0", - "snap", - "tempdir", - "tiny-keccak 2.0.2", - "tokio 0.2.21", - "tokio-io-timeout", - "tokio-util", - "types", - "unsigned-varint 0.3.3 (git+https://github.com/sigp/unsigned-varint?branch=latest-codecs)", - "version", - "void", -] - [[package]] name = "eth2_config" version = "0.2.0" @@ -1296,7 +1261,7 @@ dependencies = [ name = "eth2_interop_keypairs" version = "0.2.0" dependencies = [ - "base64 0.12.1", + "base64 0.12.2", "eth2_hashing", "hex 0.4.2", "lazy_static", @@ -1335,6 +1300,49 @@ dependencies = [ "zeroize", ] +[[package]] +name = "eth2_libp2p" +version = "0.1.2" +dependencies = [ + "base64 0.12.2", + "dirs", + "discv5", + "environment", + "error-chain", + "eth2_ssz", + "eth2_ssz_derive", + "eth2_ssz_types", + "exit-future", + "fnv", + "futures 0.3.5", + "hashset_delay", + "hex 0.4.2", + "lazy_static", + "libp2p", + "libp2p-tcp", + "lighthouse_metrics", + "lru 0.5.1", + "parking_lot 0.10.2", + "serde", + "serde_derive", + "sha2", + "slog", + "slog-async", + "slog-stdlog", + "slog-term", + "smallvec 1.4.0", + "snap", + "tempdir", + "tiny-keccak 2.0.2", + "tokio 0.2.21", + "tokio-io-timeout", + "tokio-util", + "types", + "unsigned-varint 0.3.3 (git+https://github.com/sigp/unsigned-varint?branch=latest-codecs)", + "version", + "void", +] + [[package]] name = "eth2_ssz" version = "0.1.2" @@ -1850,9 +1858,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.1.13" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91780f809e750b0a89f5544be56617ff6b1227ee485bcb06ebe10cdf89bd3b71" +checksum = "b9586eedd4ce6b3c498bc3b4dd92fc9f11166aa908a914071953768066c67909" dependencies = [ "libc", ] @@ -2166,9 +2174,9 @@ dependencies = [ [[package]] name = "itoa" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e" +checksum = "dc6f3ad7b9d11a0c00842ff8de1b60ee58661048eb8049ed33c73594f359d7e6" [[package]] name = "js-sys" @@ -2238,8 +2246,8 @@ dependencies = [ "deposit_contract", "dirs", "environment", - "eth2-libp2p", "eth2_keystore", + "eth2_libp2p", "eth2_ssz", "eth2_testnet_config", "futures 0.3.5", @@ -2708,6 +2716,12 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "lru_time_cache" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adb241df5c4caeb888755363fc95f8a896618dc0d435e9e775f7930cb099beab" + [[package]] name = "mach" version = "0.3.2" @@ -2795,9 +2809,9 @@ dependencies = [ [[package]] name = "miniz_oxide" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa679ff6578b1cddee93d7e82e263b94a575e0bfced07284eb0c037c1d2416a5" +checksum = "791daaae1ed6889560f8c4359194f56648355540573244a5448a83ba1ecc7435" dependencies = [ "adler32", ] @@ -2841,7 +2855,7 @@ checksum = "f5e374eff525ce1c5b7687c4cef63943e7686524a387933ad27ca7ec43779cb3" dependencies = [ "log 0.4.8", "mio", - "miow 0.3.4", + "miow 0.3.5", "winapi 0.3.8", ] @@ -2870,9 +2884,9 @@ dependencies = [ [[package]] name = "miow" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22dfdd1d51b2639a5abd17ed07005c3af05fb7a2a3b1a1d0d7af1000a520c1c7" +checksum = "07b88fb9795d4d36d62a012dfbf49a8f5cf12751f36d31a9dbe66d528e58979e" dependencies = [ "socket2", "winapi 0.3.8", @@ -2949,7 +2963,7 @@ dependencies = [ "beacon_chain", "environment", "error-chain", - "eth2-libp2p", + "eth2_libp2p", "eth2_ssz", "exit-future", "fnv", @@ -3058,9 +3072,9 @@ dependencies = [ [[package]] name = "num-integer" -version = "0.1.42" +version = "0.1.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f6ea62e9d81a77cd3ee9a2a5b9b609447857f3d358704331e4ef39eb247fcba" +checksum = "8d59457e662d541ba17869cf51cf177c0b5f0cbf476c66bdc90bf1edac4f875b" dependencies = [ "autocfg 1.0.0", "num-traits", @@ -3068,9 +3082,9 @@ dependencies = [ [[package]] name = "num-iter" -version = "0.1.40" +version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfb0800a0291891dd9f4fe7bd9c19384f98f7fbe0cd0f39a2c6b88b9868bbc00" +checksum = "7a6e6b7c748f995c4c29c5f5ae0248536e04a5739927c74ec0fa564805094b9f" dependencies = [ "autocfg 1.0.0", "num-integer", @@ -3079,9 +3093,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c62be47e61d1842b9170f0fdeec8eba98e60e90e5446449a0545e5152acd7096" +checksum = "ac267bcc07f48ee5f8935ab0d24f316fb722d7a1292e2913f0cc196b29ffd611" dependencies = [ "autocfg 1.0.0", ] @@ -3098,9 +3112,9 @@ dependencies = [ [[package]] name = "object" -version = "0.19.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cbca9424c482ee628fa549d9c812e2cd22f1180b9222c9200fdfa6eb31aecb2" +checksum = "1ab52be62400ca80aa00285d25253d7f7c437b7375c4de678f5405d3afe82ca5" [[package]] name = "once_cell" @@ -3113,9 +3127,9 @@ dependencies = [ [[package]] name = "oorandom" -version = "11.1.1" +version = "11.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94af325bc33c7f60191be4e2c984d48aaa21e2854f473b85398344b60c9b6358" +checksum = "a170cebd8021a008ea92e4db85a72f80b35df514ec664b296fdcbb654eac0b2c" [[package]] name = "opaque-debug" @@ -3145,9 +3159,9 @@ checksum = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de" [[package]] name = "openssl-sys" -version = "0.9.57" +version = "0.9.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7410fef80af8ac071d4f63755c0ab89ac3df0fd1ea91f1d1f37cf5cec4395990" +checksum = "a842db4709b604f0fe5d1170ae3565899be2ad3d9cbc72dedc789ac0511f78de" dependencies = [ "autocfg 1.0.0", "cc", @@ -3292,18 +3306,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "0.4.19" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba3a1acf4a3e70849f8a673497ef984f043f95d2d8252dcdf74d54e6a1e47e8a" +checksum = "12e3a6cdbfe94a5e4572812a0201f8c0ed98c1c452c7b8563ce2276988ef9c17" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "0.4.19" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "194e88048b71a3e02eb4ee36a6995fed9b8236c11a7bb9f7247a9d9835b3f265" +checksum = "6a0ffd45cf79d88737d7cc85bfd5d2894bee1139b356e616fe85dc389c61aaf7" dependencies = [ "proc-macro2", "quote", @@ -3373,9 +3387,9 @@ checksum = "7e0456befd48169b9f13ef0f0ad46d492cf9d2dbb918bcf38e01eed4ce3ec5e4" [[package]] name = "proc-macro-nested" -version = "0.1.4" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694" +checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a" [[package]] name = "proc-macro2" @@ -3543,9 +3557,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.6" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54a21852a652ad6f610c9510194f398ff6f8692e334fd1145fed931f7fbe44ea" +checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37" dependencies = [ "proc-macro2", ] @@ -3680,10 +3694,11 @@ dependencies = [ [[package]] name = "rayon" -version = "1.3.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db6ce3297f9c85e16621bb8cca38a06779ffc31bb8184e1be4bed2be4678a098" +checksum = "62f02856753d04e03e26929f820d0a0a337ebe71f849801eea335d464b349080" dependencies = [ + "autocfg 1.0.0", "crossbeam-deque", "either", "rayon-core", @@ -3691,9 +3706,9 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.7.0" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08a89b46efaf957e52b18062fb2f4660f8b8a4dde1807ca002690868ef2c85a9" +checksum = "e92e15d89083484e11353891f1af602cc661426deb9564c298b270c726973280" dependencies = [ "crossbeam-deque", "crossbeam-queue", @@ -3775,9 +3790,9 @@ dependencies = [ [[package]] name = "remove_dir_all" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a83fa3702a688b9359eccba92d153ac33fd2e8462f9e0e3fdf155239ea7792e" +checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" dependencies = [ "winapi 0.3.8", ] @@ -3788,7 +3803,7 @@ version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b82c9238b305f26f53443e3a4bc8528d64b8d0bee408ec949eb7bf5635ec680" dependencies = [ - "base64 0.12.1", + "base64 0.12.2", "bytes 0.5.4", "encoding_rs", "futures-core", @@ -3826,8 +3841,8 @@ dependencies = [ "bls", "bus", "environment", - "eth2-libp2p", "eth2_config", + "eth2_libp2p", "eth2_ssz", "eth2_ssz_derive", "futures 0.3.5", @@ -4142,18 +4157,18 @@ checksum = "a0eddf2e8f50ced781f288c19f18621fa72a3779e3cb58dbf23b07469b0abeb4" [[package]] name = "serde" -version = "1.0.111" +version = "1.0.112" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9124df5b40cbd380080b2cc6ab894c040a3070d995f5c9dc77e18c34a8ae37d" +checksum = "736aac72d1eafe8e5962d1d1c3d99b0df526015ba40915cb3c49d042e92ec243" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.111" +version = "1.0.112" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f2c3ac8e6ca1e9c80b8be1023940162bf81ae3cffbb1809474152f2ce1eb250" +checksum = "bf0343ce212ac0d3d6afd9391ac8e9c9efe06b533c8d33f660f6390cc4093f57" dependencies = [ "proc-macro2", "quote", @@ -4170,9 +4185,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.53" +version = "1.0.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "993948e75b189211a9b31a7528f950c6adc21f9720b6438ff80a7fa2f864cea2" +checksum = "ec2c5d7e739bc07a3e73381a39d61fdb5f671c60c1df26a130690665803d8226" dependencies = [ "itoa", "ryu", @@ -4181,9 +4196,9 @@ dependencies = [ [[package]] name = "serde_repr" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd02c7587ec314570041b2754829f84d873ced14a96d1fd1823531e11db40573" +checksum = "2dc6b7951b17b051f3210b063f12cc17320e2fe30ae05b0fe2a3abb068551c76" dependencies = [ "proc-macro2", "quote", @@ -4204,9 +4219,9 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.8.12" +version = "0.8.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16c7a592a1ec97c9c1c68d75b6e537dcbf60c7618e038e7841e00af1d9ccf0c4" +checksum = "ae3e2dd40a7cdc18ca80db804b7f461a39bb721160a85c9a1fa30134bf3c02a5" dependencies = [ "dtoa", "linked-hash-map", @@ -4697,9 +4712,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.30" +version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93a56fabc59dce20fe48b6c832cc249c713e7ed88fa28b0ee0a3bfcaae5fe4e2" +checksum = "b5304cfdf27365b7585c25d4af91b35016ed21ef88f17ced89c7093b43dba8b6" dependencies = [ "proc-macro2", "quote", @@ -4708,9 +4723,9 @@ dependencies = [ [[package]] name = "synstructure" -version = "0.12.3" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67656ea1dc1b41b1451851562ea232ec2e5a80242139f7e679ceccfb5d61f545" +checksum = "b834f2d66f734cb897113e34aaff2f1ab4719ca946f9a7358dba8f8064148701" dependencies = [ "proc-macro2", "quote", @@ -4792,18 +4807,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.19" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b13f926965ad00595dd129fa12823b04bbf866e9085ab0a5f2b05b850fbfc344" +checksum = "7dfdd070ccd8ccb78f4ad66bf1982dc37f620ef696c6b5028fe2ed83dd3d0d08" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.19" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "893582086c2f98cde18f906265a65b5030a074b1046c674ae898be6519a7f479" +checksum = "bd80fc12f73063ac132ac92aceea36734f04a1d93c1240c6944e23a3b8841793" dependencies = [ "proc-macro2", "quote", @@ -4925,6 +4940,12 @@ dependencies = [ "serde_json", ] +[[package]] +name = "tinyvec" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53953d2d3a5ad81d9f844a32f14ebb121f50b650cd59d0ee2a07cf13c617efed" + [[package]] name = "tokio" version = "0.1.22" @@ -5428,11 +5449,11 @@ dependencies = [ [[package]] name = "unicode-normalization" -version = "0.1.12" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5479532badd04e128284890390c1e876ef7a993d0570b3597ae43dfa1d59afa4" +checksum = "6fb19cf769fa8c6a80a162df694621ebeb4dafb606470b2b2fce0be40a98a977" dependencies = [ - "smallvec 1.4.0", + "tinyvec", ] [[package]] @@ -5566,9 +5587,9 @@ dependencies = [ [[package]] name = "vcpkg" -version = "0.2.9" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55d1e41d56121e07f1e223db0a4def204e45c85425f6a16d462fd07c8d10d74c" +checksum = "6454029bf181f092ad1b853286f23e2c507d8e8194d01d92da4a55c274a5508c" [[package]] name = "vec_map" @@ -5760,10 +5781,11 @@ dependencies = [ [[package]] name = "web3" version = "0.11.0" -source = "git+https://github.com/tomusdrw/rust-web3#69d5746f124033dee922d7d36acef9321c1df0b0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a681e8d15deced7c510db88c59133d2eafa7b6298b6e91b545e2a3fed93b3fe" dependencies = [ "arrayvec 0.5.1", - "base64 0.12.1", + "base64 0.12.2", "derive_more", "ethabi", "ethereum-types", diff --git a/Cargo.toml b/Cargo.toml index 8a4179ae8a..e8aab0ad08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ members = [ "beacon_node/beacon_chain", "beacon_node/client", "beacon_node/eth1", - "beacon_node/eth2-libp2p", + "beacon_node/eth2_libp2p", "beacon_node/network", "beacon_node/rest_api", "beacon_node/store", @@ -72,4 +72,3 @@ eth2_ssz = { path = "consensus/ssz" } eth2_ssz_derive = { path = "consensus/ssz_derive" } eth2_ssz_types = { path = "consensus/ssz_types" } eth2_hashing = { path = "crypto/eth2_hashing" } -web3 = { git = "https://github.com/tomusdrw/rust-web3" } diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index c8d3cf616f..4d70b84f30 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -35,7 +35,7 @@ futures = "0.3.5" environment = { path = "../lighthouse/environment" } genesis = { path = "genesis" } eth2_testnet_config = { path = "../common/eth2_testnet_config" } -eth2-libp2p = { path = "./eth2-libp2p" } +eth2_libp2p = { path = "./eth2_libp2p" } eth2_ssz = "0.1.2" toml = "0.5.6" serde = "1.0.110" diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index 1b9ef0ab8e..aad15cd9e0 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -13,7 +13,7 @@ beacon_chain = { path = "../beacon_chain" } store = { path = "../store" } network = { path = "../network" } timer = { path = "../timer" } -eth2-libp2p = { path = "../eth2-libp2p" } +eth2_libp2p = { path = "../eth2_libp2p" } rest_api = { path = "../rest_api" } parking_lot = "0.10.2" websocket_server = { path = "../websocket_server" } diff --git a/beacon_node/eth2-libp2p/src/discovery/mod.rs b/beacon_node/eth2-libp2p/src/discovery/mod.rs deleted file mode 100644 index 136b00a5d3..0000000000 --- a/beacon_node/eth2-libp2p/src/discovery/mod.rs +++ /dev/null @@ -1,628 +0,0 @@ -///! This manages the discovery and management of peers. -pub(crate) mod enr; -pub mod enr_ext; - -// Allow external use of the lighthouse ENR builder -pub use enr::{build_enr, CombinedKey, Keypair}; -pub use enr_ext::{CombinedKeyExt, EnrExt}; - -use crate::metrics; -use crate::{error, Enr, NetworkConfig, NetworkGlobals}; -use discv5::{enr::NodeId, Discv5, Discv5Event, QueryId}; -use enr::{Eth2Enr, BITFIELD_ENR_KEY, ETH2_ENR_KEY}; -use futures::prelude::*; -use libp2p::core::{connection::ConnectionId, Multiaddr, PeerId}; -use libp2p::multiaddr::Protocol; -use libp2p::swarm::{ - protocols_handler::DummyProtocolsHandler, DialPeerCondition, NetworkBehaviour, - NetworkBehaviourAction, PollParameters, ProtocolsHandler, -}; -use lru::LruCache; -use slog::{crit, debug, info, trace, warn}; -use ssz::{Decode, Encode}; -use ssz_types::BitVector; -use std::{ - collections::{HashMap, HashSet, VecDeque}, - net::SocketAddr, - path::Path, - sync::Arc, - task::{Context, Poll}, - time::{Duration, Instant}, -}; -use tokio::time::{delay_until, Delay}; -use types::{EnrForkId, EthSpec, SubnetId}; - -mod subnet_predicate; - -use subnet_predicate::subnet_predicate; - -/// Maximum seconds before searching for extra peers. -const MAX_TIME_BETWEEN_PEER_SEARCHES: u64 = 120; -/// Initial delay between peer searches. -const INITIAL_SEARCH_DELAY: u64 = 5; -/// The number of peers we must be connected to before increasing the discovery delay. -const MINIMUM_PEERS_BEFORE_DELAY_INCREASE: usize = 5; -/// Local ENR storage filename. -pub const ENR_FILENAME: &str = "enr.dat"; -/// Number of peers we'd like to have connected to a given long-lived subnet. -const TARGET_SUBNET_PEERS: usize = 3; -/// Number of times to attempt a discovery request -const MAX_DISCOVERY_RETRY: u64 = 3; - -/// A struct representing the information associated with a single discovery request, -/// which can be retried with multiple queries -#[derive(Clone, Debug)] -pub struct Request { - pub query_id: Option, - pub min_ttl: Option, - pub retries: u64, -} - -/// Lighthouse discovery behaviour. This provides peer management and discovery using the Discv5 -/// libp2p protocol. -pub struct Discovery { - /// Events to be processed by the behaviour. - events: VecDeque>, - - /// A collection of seen live ENRs for quick lookup and to map peer-id's to ENRs. - cached_enrs: LruCache, - - /// The currently banned peers. - banned_peers: HashSet, - - /// The target number of connected peers on the libp2p interface. - max_peers: usize, - - /// The directory where the ENR is stored. - enr_dir: String, - - /// The delay between peer discovery searches. - peer_discovery_delay: Delay, - - /// Tracks the last discovery delay. The delay is doubled each round until the max - /// time is reached. - past_discovery_delay: u64, - - /// The TCP port for libp2p. Used to convert an updated IP address to a multiaddr. Note: This - /// assumes that the external TCP port is the same as the internal TCP port if behind a NAT. - //TODO: Improve NAT handling limit the above restriction - tcp_port: u16, - - /// The discovery behaviour used to discover new peers. - discovery: Discv5, - - /// A collection of network constants that can be read from other threads. - network_globals: Arc>, - - /// A mapping of SubnetId that we are currently searching for to all information associated with each request. - subnet_queries: HashMap, - - /// Logger for the discovery behaviour. - log: slog::Logger, -} - -impl Discovery { - pub fn new( - local_key: &Keypair, - config: &NetworkConfig, - network_globals: Arc>, - log: &slog::Logger, - ) -> error::Result { - let log = log.clone(); - - let enr_dir = match config.network_dir.to_str() { - Some(path) => String::from(path), - None => String::from(""), - }; - - let local_enr = network_globals.local_enr.read().clone(); - - info!(log, "ENR Initialised"; "enr" => local_enr.to_base64(), "seq" => local_enr.seq(), "id"=> format!("{}",local_enr.node_id()), "ip" => format!("{:?}", local_enr.ip()), "udp"=> format!("{:?}", local_enr.udp()), "tcp" => format!("{:?}", local_enr.tcp())); - - let listen_socket = SocketAddr::new(config.listen_address, config.discovery_port); - - // convert the keypair into an ENR key - let enr_key: CombinedKey = CombinedKey::from_libp2p(&local_key)?; - - let mut discovery = Discv5::new( - local_enr, - enr_key, - config.discv5_config.clone(), - listen_socket, - ) - .map_err(|e| format!("Discv5 service failed. Error: {:?}", e))?; - - // Add bootnodes to routing table - for bootnode_enr in config.boot_nodes.clone() { - debug!( - log, - "Adding node to routing table"; - "node_id" => format!("{}", bootnode_enr.node_id()), - "peer_id" => format!("{}", bootnode_enr.peer_id()), - "ip" => format!("{:?}", bootnode_enr.ip()), - "udp" => format!("{:?}", bootnode_enr.udp()), - "tcp" => format!("{:?}", bootnode_enr.tcp()) - ); - let _ = discovery.add_enr(bootnode_enr).map_err(|e| { - warn!( - log, - "Could not add peer to the local routing table"; - "error" => format!("{}", e) - ) - }); - } - - Ok(Self { - events: VecDeque::with_capacity(16), - cached_enrs: LruCache::new(50), - banned_peers: HashSet::new(), - max_peers: config.max_peers, - peer_discovery_delay: delay_until(tokio::time::Instant::now()), - past_discovery_delay: INITIAL_SEARCH_DELAY, - tcp_port: config.libp2p_port, - discovery, - network_globals, - subnet_queries: HashMap::new(), - log, - enr_dir, - }) - } - - /// Return the nodes local ENR. - pub fn local_enr(&self) -> &Enr { - self.discovery.local_enr() - } - - /// Manually search for peers. This restarts the discovery round, sparking multiple rapid - /// queries. - pub fn discover_peers(&mut self) { - self.past_discovery_delay = INITIAL_SEARCH_DELAY; - self.find_peers(); - } - - /// Add an ENR to the routing table of the discovery mechanism. - pub fn add_enr(&mut self, enr: Enr) { - // add the enr to seen caches - self.cached_enrs.put(enr.peer_id(), enr.clone()); - - let _ = self.discovery.add_enr(enr).map_err(|e| { - warn!( - self.log, - "Could not add peer to the local routing table"; - "error" => format!("{}", e) - ) - }); - } - - /// The peer has been banned. Add this peer to the banned list to prevent any future - /// re-connections. - // TODO: Remove the peer from the DHT if present - pub fn peer_banned(&mut self, peer_id: PeerId) { - self.banned_peers.insert(peer_id); - } - - pub fn peer_unbanned(&mut self, peer_id: &PeerId) { - self.banned_peers.remove(peer_id); - } - - /// Returns an iterator over all enr entries in the DHT. - pub fn enr_entries(&mut self) -> impl Iterator { - self.discovery.enr_entries() - } - - /// Returns the ENR of a known peer if it exists. - pub fn enr_of_peer(&mut self, peer_id: &PeerId) -> Option { - // first search the local cache - if let Some(enr) = self.cached_enrs.get(peer_id) { - return Some(enr.clone()); - } - // not in the local cache, look in the routing table - if let Ok(_node_id) = enr_ext::peer_id_to_node_id(peer_id) { - // TODO: Need to update discv5 - // self.discovery.find_enr(&node_id) - return None; - } else { - return None; - } - } - - /// Adds/Removes a subnet from the ENR Bitfield - pub fn update_enr_bitfield(&mut self, subnet_id: SubnetId, value: bool) -> Result<(), String> { - let id = *subnet_id as usize; - - let local_enr = self.discovery.local_enr(); - let mut current_bitfield = local_enr.bitfield::()?; - - if id >= current_bitfield.len() { - return Err(format!( - "Subnet id: {} is outside the ENR bitfield length: {}", - id, - current_bitfield.len() - )); - } - - if current_bitfield - .get(id) - .map_err(|_| String::from("Subnet ID out of bounds"))? - == value - { - return Err(format!( - "Subnet id: {} already in the local ENR already has value: {}", - id, value - )); - } - - // set the subnet bitfield in the ENR - current_bitfield - .set(id, value) - .map_err(|_| String::from("Subnet ID out of bounds, could not set subnet ID"))?; - - // insert the bitfield into the ENR record - let _ = self - .discovery - .enr_insert(BITFIELD_ENR_KEY, current_bitfield.as_ssz_bytes()); - - // replace the global version - *self.network_globals.local_enr.write() = self.discovery.local_enr().clone(); - Ok(()) - } - - /// Updates the `eth2` field of our local ENR. - pub fn update_eth2_enr(&mut self, enr_fork_id: EnrForkId) { - // to avoid having a reference to the spec constant, for the logging we assume - // FAR_FUTURE_EPOCH is u64::max_value() - let next_fork_epoch_log = if enr_fork_id.next_fork_epoch == u64::max_value() { - String::from("No other fork") - } else { - format!("{:?}", enr_fork_id.next_fork_epoch) - }; - - info!(self.log, "Updating the ENR fork version"; - "fork_digest" => format!("{:?}", enr_fork_id.fork_digest), - "next_fork_version" => format!("{:?}", enr_fork_id.next_fork_version), - "next_fork_epoch" => next_fork_epoch_log, - ); - - let _ = self - .discovery - .enr_insert(ETH2_ENR_KEY.into(), enr_fork_id.as_ssz_bytes()) - .map_err(|e| { - warn!( - self.log, - "Could not update eth2 ENR field"; - "error" => format!("{:?}", e) - ) - }); - - // replace the global version with discovery version - *self.network_globals.local_enr.write() = self.discovery.local_enr().clone(); - } - - /// A request to find peers on a given subnet. - pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option) { - // TODO: Extend this to an event once discovery becomes a thread managed by the peer - // manager - if let Some(min_ttl) = min_ttl { - self.network_globals - .peers - .write() - .extend_peers_on_subnet(subnet_id, min_ttl); - } - - // If there is already a discovery request in process for this subnet, ignore this request, - // but update the min_ttl. - if let Some(request) = self.subnet_queries.get_mut(&subnet_id) { - // update the min_ttl if required - if let Some(min_ttl) = min_ttl { - if request.min_ttl < Some(min_ttl) { - request.min_ttl = Some(min_ttl); - } - } - return; - } - - // Insert a request and start a query for the subnet - self.subnet_queries.insert( - subnet_id.clone(), - Request { - query_id: None, - min_ttl, - retries: 0, - }, - ); - self.run_subnet_query(subnet_id); - } - - /// Runs a discovery request for a given subnet_id if one already exists. - fn run_subnet_query(&mut self, subnet_id: SubnetId) { - let mut request = match self.subnet_queries.remove(&subnet_id) { - Some(v) => v, - None => return, // request doesn't exist - }; - - // increment the retry count - request.retries += 1; - - let peers_on_subnet = self - .network_globals - .peers - .read() - .peers_on_subnet(subnet_id) - .count(); - - if peers_on_subnet > TARGET_SUBNET_PEERS { - trace!(self.log, "Discovery ignored"; - "reason" => "Already connected to desired peers", - "connected_peers_on_subnet" => peers_on_subnet, - "target_subnet_peers" => TARGET_SUBNET_PEERS, - ); - return; - } - - // remove the entry and complete the query if greater than the maximum search count - if request.retries >= MAX_DISCOVERY_RETRY { - debug!( - self.log, - "Subnet peer discovery did not find sufficient peers. Reached max retry limit" - ); - return; - } - - let target_peers = TARGET_SUBNET_PEERS - peers_on_subnet; - debug!(self.log, "Searching for peers for subnet"; - "subnet_id" => *subnet_id, - "connected_peers_on_subnet" => peers_on_subnet, - "target_subnet_peers" => TARGET_SUBNET_PEERS, - "peers_to_find" => target_peers, - "attempt" => request.retries, - ); - - // start the query, and update the queries map if necessary - let subnet_predicate = subnet_predicate::(subnet_id, &self.log); - if let Some(query_id) = self.start_query(subnet_predicate, target_peers) { - request.query_id = Some(query_id); - } else { - // ENR is not present remove the query - return; - } - self.subnet_queries.insert(subnet_id, request); - } - - /* Internal Functions */ - - /// Run a standard query to search for more peers. - /// - /// This searches for the standard kademlia bucket size (16) peers. - fn find_peers(&mut self) { - debug!(self.log, "Searching for peers"); - self.start_query(|_| true, 16); - } - - /// Search for a specified number of new peers using the underlying discovery mechanism. - /// - /// This can optionally search for peers for a given predicate. Regardless of the predicate - /// given, this will only search for peers on the same enr_fork_id as specified in the local - /// ENR. - fn start_query(&mut self, enr_predicate: F, num_nodes: usize) -> Option - where - F: Fn(&Enr) -> bool + Send + 'static + Clone, - { - // pick a random NodeId - let random_node = NodeId::random(); - - let enr_fork_id = match self.local_enr().eth2() { - Ok(v) => v, - Err(e) => { - crit!(self.log, "Local ENR has no fork id"; "error" => e); - return None; - } - }; - // predicate for finding nodes with a matching fork - let eth2_fork_predicate = move |enr: &Enr| enr.eth2() == Ok(enr_fork_id.clone()); - let predicate = move |enr: &Enr| eth2_fork_predicate(enr) && enr_predicate(enr); - - // general predicate - Some( - self.discovery - .find_enr_predicate(random_node, predicate, num_nodes), - ) - } - - /// Peers that are found during discovery are optionally dialed. - // TODO: Shift to peer manager. As its own service, discovery should spit out discovered nodes - // and the peer manager should decide about who to connect to. - fn dial_discovered_peers(&mut self, peers: Vec, min_ttl: Option) { - for enr in peers { - // cache known peers - let peer_id = enr.peer_id(); - self.cached_enrs.put(enr.peer_id(), enr); - - // if we need more peers, attempt a connection - if self.network_globals.connected_or_dialing_peers() < self.max_peers - && !self - .network_globals - .peers - .read() - .is_connected_or_dialing(&peer_id) - && !self.banned_peers.contains(&peer_id) - { - debug!(self.log, "Connecting to discovered peer"; "peer_id"=> peer_id.to_string()); - // TODO: Update output - // This should be updated with the peer dialing. In fact created once the peer is - // dialed - if let Some(min_ttl) = min_ttl { - self.network_globals - .peers - .write() - .update_min_ttl(&peer_id, min_ttl); - } - self.events.push_back(NetworkBehaviourAction::DialPeer { - peer_id, - condition: DialPeerCondition::Disconnected, - }); - } - } - } -} - -// Build a dummy Network behaviour around the discv5 server -impl NetworkBehaviour for Discovery { - type ProtocolsHandler = DummyProtocolsHandler; - type OutEvent = Discv5Event; - - fn new_handler(&mut self) -> Self::ProtocolsHandler { - DummyProtocolsHandler::default() - } - - fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - if let Some(enr) = self.enr_of_peer(peer_id) { - // ENR's may have multiple Multiaddrs. The multi-addr associated with the UDP - // port is removed, which is assumed to be associated with the discv5 protocol (and - // therefore irrelevant for other libp2p components). - let mut out_list = enr.multiaddr(); - out_list.retain(|addr| { - addr.iter() - .find(|v| match v { - Protocol::Udp(_) => true, - _ => false, - }) - .is_none() - }); - - out_list - } else { - // PeerId is not known - Vec::new() - } - } - - // ignore libp2p connections/streams - fn inject_connected(&mut self, _: &PeerId) {} - - // ignore libp2p connections/streams - fn inject_disconnected(&mut self, _: &PeerId) {} - - // no libp2p discv5 events - event originate from the session_service. - fn inject_event( - &mut self, - _: PeerId, - _: ConnectionId, - _event: ::OutEvent, - ) { - void::unreachable(_event) - } - - fn poll( - &mut self, - cx: &mut Context, - _: &mut impl PollParameters, - ) -> Poll< - NetworkBehaviourAction< - ::InEvent, - Self::OutEvent, - >, - > { - // search for peers if it is time - loop { - match self.peer_discovery_delay.poll_unpin(cx) { - Poll::Ready(_) => { - if self.network_globals.connected_peers() < self.max_peers { - self.find_peers(); - } - // Set to maximum, and update to earlier, once we get our results back. - self.peer_discovery_delay.reset( - tokio::time::Instant::now() - + Duration::from_secs(MAX_TIME_BETWEEN_PEER_SEARCHES), - ); - } - Poll::Pending => break, - } - } - - // Poll discovery - loop { - match self.discovery.poll_next_unpin(cx) { - Poll::Ready(Some(event)) => { - match event { - Discv5Event::Discovered(_enr) => { - // peers that get discovered during a query but are not contactable or - // don't match a predicate can end up here. For debugging purposes we - // log these to see if we are unnecessarily dropping discovered peers - /* - if enr.eth2() == self.local_enr().eth2() { - trace!(self.log, "Peer found in process of query"; "peer_id" => format!("{}", enr.peer_id()), "tcp_socket" => enr.tcp_socket()); - } else { - // this is temporary warning for debugging the DHT - warn!(self.log, "Found peer during discovery not on correct fork"; "peer_id" => format!("{}", enr.peer_id()), "tcp_socket" => enr.tcp_socket()); - } - */ - } - Discv5Event::SocketUpdated(socket) => { - info!(self.log, "Address updated"; "ip" => format!("{}",socket.ip()), "udp_port" => format!("{}", socket.port())); - metrics::inc_counter(&metrics::ADDRESS_UPDATE_COUNT); - let mut address = Multiaddr::from(socket.ip()); - address.push(Protocol::Tcp(self.tcp_port)); - let enr = self.discovery.local_enr(); - enr::save_enr_to_disk(Path::new(&self.enr_dir), enr, &self.log); - - return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { - address, - }); - } - Discv5Event::FindNodeResult { - closer_peers, - query_id, - .. - } => { - debug!(self.log, "Discovery query completed"; "peers_found" => closer_peers.len()); - // update the time to the next query - if self.past_discovery_delay < MAX_TIME_BETWEEN_PEER_SEARCHES - && self.network_globals.connected_or_dialing_peers() - > MINIMUM_PEERS_BEFORE_DELAY_INCREASE - { - self.past_discovery_delay *= 2; - } - let delay = std::cmp::max( - self.past_discovery_delay, - MAX_TIME_BETWEEN_PEER_SEARCHES, - ); - self.peer_discovery_delay - .reset(tokio::time::Instant::now() + Duration::from_secs(delay)); - - // if this is a subnet query, run it to completion - if let Some((subnet_id, min_ttl)) = self - .subnet_queries - .iter() - .find(|(_, request)| request.query_id == Some(query_id)) - .map(|(subnet_id, request)| { - (subnet_id.clone(), request.min_ttl.clone()) - }) - { - debug!(self.log, "Peer subnet discovery request completed"; "peers_found" => closer_peers.len(), "subnet_id" => *subnet_id); - self.dial_discovered_peers(closer_peers, min_ttl); - self.run_subnet_query(subnet_id); - } else { - if closer_peers.is_empty() { - debug!(self.log, "Peer Discovery request yielded no results."); - } else { - self.dial_discovered_peers(closer_peers, None); - } - } - } - _ => {} - } - } - // discv5 does not output any other NetworkBehaviourAction - Poll::Ready(_) => {} - Poll::Pending => break, - } - } - - // process any queued events - if let Some(event) = self.events.pop_front() { - return Poll::Ready(event); - } - - Poll::Pending - } -} diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2_libp2p/Cargo.toml similarity index 91% rename from beacon_node/eth2-libp2p/Cargo.toml rename to beacon_node/eth2_libp2p/Cargo.toml index 5adc2d2dbc..7aced93bb6 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2_libp2p/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "eth2-libp2p" +name = "eth2_libp2p" version = "0.1.2" authors = ["Age Manning "] edition = "2018" @@ -32,8 +32,7 @@ snap = "1.0.0" void = "1.0.2" tokio-io-timeout = "0.4.0" tokio-util = { version = "0.3.1", features = ["codec", "compat"] } -# Patched for quick updates -discv5 = { git = "https://github.com/sigp/discv5", rev = "7b3bd40591b62b8c002ffdb85de008aa9f82e2e5" } +discv5 = { version = "0.1.0-alpha.5", features = ["libp2p"] } tiny-keccak = "2.0.2" environment = { path = "../../lighthouse/environment" } libp2p-tcp = { version = "0.19.1", default-features = false, features = ["tokio"] } diff --git a/beacon_node/eth2-libp2p/src/behaviour/handler/delegate.rs b/beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs similarity index 72% rename from beacon_node/eth2-libp2p/src/behaviour/handler/delegate.rs rename to beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs index 8fb31feef9..b20fe5170d 100644 --- a/beacon_node/eth2-libp2p/src/behaviour/handler/delegate.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs @@ -1,4 +1,3 @@ -use crate::discovery::Discovery; use crate::rpc::*; use libp2p::{ core::either::{EitherError, EitherOutput}, @@ -19,7 +18,6 @@ use types::EthSpec; type GossipHandler = ::ProtocolsHandler; type RPCHandler = as NetworkBehaviour>::ProtocolsHandler; type IdentifyHandler = ::ProtocolsHandler; -type DiscoveryHandler = as NetworkBehaviour>::ProtocolsHandler; /// Handler that combines Lighthouse's Behaviours' handlers in a delegating manner. pub(super) struct DelegatingHandler { @@ -29,22 +27,14 @@ pub(super) struct DelegatingHandler { rpc_handler: RPCHandler, /// Handler for the Identify protocol. identify_handler: IdentifyHandler, - /// Handler for the Discovery protocol. - discovery_handler: DiscoveryHandler, } impl DelegatingHandler { - pub fn new( - gossipsub: &mut Gossipsub, - rpc: &mut RPC, - identify: &mut Identify, - discovery: &mut Discovery, - ) -> Self { + pub fn new(gossipsub: &mut Gossipsub, rpc: &mut RPC, identify: &mut Identify) -> Self { DelegatingHandler { gossip_handler: gossipsub.new_handler(), rpc_handler: rpc.new_handler(), identify_handler: identify.new_handler(), - discovery_handler: discovery.new_handler(), } } @@ -73,7 +63,6 @@ pub enum DelegateIn { Gossipsub(::InEvent), RPC( as ProtocolsHandler>::InEvent), Identify(::InEvent), - Discovery( as ProtocolsHandler>::InEvent), } /// Wrapper around the `ProtocolsHandler::OutEvent` types of the handlers. @@ -82,7 +71,6 @@ pub enum DelegateOut { Gossipsub(::OutEvent), RPC( as ProtocolsHandler>::OutEvent), Identify(::OutEvent), - Discovery( as ProtocolsHandler>::OutEvent), } /// Wrapper around the `ProtocolsHandler::Error` types of the handlers. @@ -92,7 +80,6 @@ pub enum DelegateError { Gossipsub(::Error), RPC( as ProtocolsHandler>::Error), Identify(::Error), - Discovery( as ProtocolsHandler>::Error), } impl std::error::Error for DelegateError {} @@ -106,7 +93,6 @@ impl std::fmt::Display for DelegateError { DelegateError::Gossipsub(err) => err.fmt(formater), DelegateError::RPC(err) => err.fmt(formater), DelegateError::Identify(err) => err.fmt(formater), - DelegateError::Discovery(err) => err.fmt(formater), } } } @@ -115,10 +101,7 @@ pub type DelegateInProto = SelectUpgrade< ::InboundProtocol, SelectUpgrade< as ProtocolsHandler>::InboundProtocol, - SelectUpgrade< - ::InboundProtocol, - as ProtocolsHandler>::InboundProtocol, - >, + ::InboundProtocol, >, >; @@ -126,10 +109,7 @@ pub type DelegateOutProto = EitherUpgrade< ::OutboundProtocol, EitherUpgrade< as ProtocolsHandler>::OutboundProtocol, - EitherUpgrade< - ::OutboundProtocol, - as ProtocolsHandler>::OutboundProtocol, - >, + ::OutboundProtocol, >, >; @@ -138,10 +118,7 @@ pub type DelegateOutInfo = EitherOutput< ::OutboundOpenInfo, EitherOutput< as ProtocolsHandler>::OutboundOpenInfo, - EitherOutput< - ::OutboundOpenInfo, - as ProtocolsHandler>::OutboundOpenInfo, - >, + ::OutboundOpenInfo, >, >; @@ -157,24 +134,16 @@ impl ProtocolsHandler for DelegatingHandler { let gossip_proto = self.gossip_handler.listen_protocol(); let rpc_proto = self.rpc_handler.listen_protocol(); let identify_proto = self.identify_handler.listen_protocol(); - let discovery_proto = self.discovery_handler.listen_protocol(); let timeout = gossip_proto .timeout() .max(rpc_proto.timeout()) .max(identify_proto.timeout()) - .max(discovery_proto.timeout()) .clone(); let select = SelectUpgrade::new( gossip_proto.into_upgrade().1, - SelectUpgrade::new( - rpc_proto.into_upgrade().1, - SelectUpgrade::new( - identify_proto.into_upgrade().1, - discovery_proto.into_upgrade().1, - ), - ), + SelectUpgrade::new(rpc_proto.into_upgrade().1, identify_proto.into_upgrade().1), ); SubstreamProtocol::new(select).with_timeout(timeout) @@ -192,13 +161,9 @@ impl ProtocolsHandler for DelegatingHandler { self.rpc_handler.inject_fully_negotiated_inbound(out) } // Identify - EitherOutput::Second(EitherOutput::Second(EitherOutput::First(out))) => { + EitherOutput::Second(EitherOutput::Second(out)) => { self.identify_handler.inject_fully_negotiated_inbound(out) } - // Discovery - EitherOutput::Second(EitherOutput::Second(EitherOutput::Second(out))) => { - self.discovery_handler.inject_fully_negotiated_inbound(out) - } } } @@ -221,18 +186,11 @@ impl ProtocolsHandler for DelegatingHandler { .inject_fully_negotiated_outbound(protocol, info), // Identify ( - EitherOutput::Second(EitherOutput::Second(EitherOutput::First(protocol))), - EitherOutput::Second(EitherOutput::Second(EitherOutput::First(info))), + EitherOutput::Second(EitherOutput::Second(protocol)), + EitherOutput::Second(EitherOutput::Second(info)), ) => self .identify_handler .inject_fully_negotiated_outbound(protocol, info), - // Discovery - ( - EitherOutput::Second(EitherOutput::Second(EitherOutput::Second(protocol))), - EitherOutput::Second(EitherOutput::Second(EitherOutput::Second(info))), - ) => self - .discovery_handler - .inject_fully_negotiated_outbound(protocol, info), // Reaching here means we got a protocol and info for different behaviours _ => unreachable!("output and protocol don't match"), } @@ -243,7 +201,6 @@ impl ProtocolsHandler for DelegatingHandler { DelegateIn::Gossipsub(ev) => self.gossip_handler.inject_event(ev), DelegateIn::RPC(ev) => self.rpc_handler.inject_event(ev), DelegateIn::Identify(ev) => self.identify_handler.inject_event(ev), - DelegateIn::Discovery(ev) => self.discovery_handler.inject_event(ev), } } @@ -305,7 +262,7 @@ impl ProtocolsHandler for DelegatingHandler { } }, // Identify - EitherOutput::Second(EitherOutput::Second(EitherOutput::First(info))) => match error { + EitherOutput::Second(EitherOutput::Second(info)) => match error { ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => { self.identify_handler.inject_dial_upgrade_error( info, @@ -319,7 +276,7 @@ impl ProtocolsHandler for DelegatingHandler { .identify_handler .inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timeout), ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B( - EitherError::B(EitherError::A(err)), + EitherError::B(err), ))) => self.identify_handler.inject_dial_upgrade_error( info, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)), @@ -328,30 +285,6 @@ impl ProtocolsHandler for DelegatingHandler { unreachable!("info and error don't match") } }, - // Discovery - EitherOutput::Second(EitherOutput::Second(EitherOutput::Second(info))) => match error { - ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => { - self.discovery_handler.inject_dial_upgrade_error( - info, - ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), - ) - } - ProtocolsHandlerUpgrErr::Timer => self - .discovery_handler - .inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timer), - ProtocolsHandlerUpgrErr::Timeout => self - .discovery_handler - .inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timeout), - ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B( - EitherError::B(EitherError::B(err)), - ))) => self.discovery_handler.inject_dial_upgrade_error( - info, - ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)), - ), - ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(_)) => { - unreachable!("info and error don't match") - } - }, } } @@ -360,7 +293,6 @@ impl ProtocolsHandler for DelegatingHandler { .connection_keep_alive() .max(self.rpc_handler.connection_keep_alive()) .max(self.identify_handler.connection_keep_alive()) - .max(self.discovery_handler.connection_keep_alive()) } fn poll( @@ -417,28 +349,8 @@ impl ProtocolsHandler for DelegatingHandler { } Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info }) => { return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: protocol - .map_upgrade(|u| EitherUpgrade::B(EitherUpgrade::B(EitherUpgrade::A(u)))), - info: EitherOutput::Second(EitherOutput::Second(EitherOutput::First(info))), - }); - } - Poll::Pending => (), - }; - - match self.discovery_handler.poll(cx) { - Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => { - return Poll::Ready(ProtocolsHandlerEvent::Custom(DelegateOut::Discovery(event))); - } - Poll::Ready(ProtocolsHandlerEvent::Close(event)) => { - return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::Discovery( - event, - ))); - } - Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info }) => { - return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: protocol - .map_upgrade(|u| EitherUpgrade::B(EitherUpgrade::B(EitherUpgrade::B(u)))), - info: EitherOutput::Second(EitherOutput::Second(EitherOutput::Second(info))), + protocol: protocol.map_upgrade(|u| EitherUpgrade::B(EitherUpgrade::B(u))), + info: EitherOutput::Second(EitherOutput::Second(info)), }); } Poll::Pending => (), diff --git a/beacon_node/eth2-libp2p/src/behaviour/handler/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs similarity index 95% rename from beacon_node/eth2-libp2p/src/behaviour/handler/mod.rs rename to beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs index c888131219..64d0bcaca8 100644 --- a/beacon_node/eth2-libp2p/src/behaviour/handler/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs @@ -1,4 +1,3 @@ -use crate::discovery::Discovery; use crate::rpc::*; use delegate::DelegatingHandler; pub(super) use delegate::{ @@ -27,14 +26,9 @@ pub struct BehaviourHandler { } impl BehaviourHandler { - pub fn new( - gossipsub: &mut Gossipsub, - rpc: &mut RPC, - identify: &mut Identify, - discovery: &mut Discovery, - ) -> Self { + pub fn new(gossipsub: &mut Gossipsub, rpc: &mut RPC, identify: &mut Identify) -> Self { BehaviourHandler { - delegate: DelegatingHandler::new(gossipsub, rpc, identify, discovery), + delegate: DelegatingHandler::new(gossipsub, rpc, identify), shutting_down: false, } } diff --git a/beacon_node/eth2-libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs similarity index 94% rename from beacon_node/eth2-libp2p/src/behaviour/mod.rs rename to beacon_node/eth2_libp2p/src/behaviour/mod.rs index d9534e2cc6..1aca8a5dd5 100644 --- a/beacon_node/eth2-libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -1,9 +1,8 @@ -use crate::discovery::{enr::Eth2Enr, Discovery}; use crate::peer_manager::{PeerManager, PeerManagerEvent}; use crate::rpc::*; use crate::types::{GossipEncoding, GossipKind, GossipTopic}; +use crate::Eth2Enr; use crate::{error, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; -use discv5::Discv5Event; use futures::prelude::*; use handler::{BehaviourHandler, BehaviourHandlerIn, BehaviourHandlerOut, DelegateIn, DelegateOut}; use libp2p::{ @@ -46,8 +45,6 @@ pub struct Behaviour { // TODO: Using id for initial interop. This will be removed by mainnet. /// Provides IP addresses and peer information. identify: Identify, - /// Discovery behaviour. - discovery: Discovery, /// The peer manager that keeps track of peer's reputation and status. peer_manager: PeerManager, /// The events generated by this behaviour to be consumed in the swarm poll. @@ -76,7 +73,6 @@ macro_rules! delegate_to_behaviours { $self.gossipsub.$fn($($arg),*); $self.eth2_rpc.$fn($($arg),*); $self.identify.$fn($($arg),*); - $self.discovery.$fn($($arg),*); }; } @@ -85,21 +81,11 @@ impl NetworkBehaviour for Behaviour { type OutEvent = BehaviourEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { - BehaviourHandler::new( - &mut self.gossipsub, - &mut self.eth2_rpc, - &mut self.identify, - &mut self.discovery, - ) + BehaviourHandler::new(&mut self.gossipsub, &mut self.eth2_rpc, &mut self.identify) } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - let mut out = Vec::new(); - out.extend(self.gossipsub.addresses_of_peer(peer_id)); - out.extend(self.eth2_rpc.addresses_of_peer(peer_id)); - out.extend(self.identify.addresses_of_peer(peer_id)); - out.extend(self.discovery.addresses_of_peer(peer_id)); - out + self.peer_manager.addresses_of_peer(peer_id) } fn inject_connected(&mut self, peer_id: &PeerId) { @@ -178,7 +164,6 @@ impl NetworkBehaviour for Behaviour { DelegateOut::Gossipsub(ev) => self.gossipsub.inject_event(peer_id, conn_id, ev), DelegateOut::RPC(ev) => self.eth2_rpc.inject_event(peer_id, conn_id, ev), DelegateOut::Identify(ev) => self.identify.inject_event(peer_id, conn_id, ev), - DelegateOut::Discovery(ev) => self.discovery.inject_event(peer_id, conn_id, ev), }, /* Custom events sent BY the handler */ BehaviourHandlerOut::Custom => { @@ -240,7 +225,6 @@ impl NetworkBehaviour for Behaviour { poll_behaviour!(gossipsub, on_gossip_event, DelegateIn::Gossipsub); poll_behaviour!(eth2_rpc, on_rpc_event, DelegateIn::RPC); poll_behaviour!(identify, on_identify_event, DelegateIn::Identify); - poll_behaviour!(discovery, on_discovery_event, DelegateIn::Discovery); self.custom_poll(cx) } @@ -264,14 +248,12 @@ impl Behaviour { ); let enr_fork_id = network_globals - .local_enr - .read() + .local_enr() .eth2() .expect("Local ENR must have a fork id"); let attnets = network_globals - .local_enr - .read() + .local_enr() .bitfield::() .expect("Local ENR must have subnet bitfield"); @@ -283,9 +265,8 @@ impl Behaviour { Ok(Behaviour { eth2_rpc: RPC::new(log.clone()), gossipsub: Gossipsub::new(local_peer_id, net_conf.gs_config.clone()), - discovery: Discovery::new(local_key, net_conf, network_globals.clone(), log)?, identify, - peer_manager: PeerManager::new(network_globals.clone(), log), + peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log)?, events: Vec::new(), peers_to_dc: Vec::new(), seen_gossip_messages: LruCache::new(100_000), @@ -296,9 +277,9 @@ impl Behaviour { }) } - /// Obtain a reference to the discovery protocol. - pub fn discovery(&self) -> &Discovery { - &self.discovery + /// Returns the local ENR of the node. + pub fn local_enr(&self) -> Enr { + self.network_globals.local_enr() } /// Obtain a reference to the gossipsub protocol. @@ -428,33 +409,35 @@ impl Behaviour { ) } - /* Discovery / Peer management functions */ + /* Peer management functions */ /// Notify discovery that the peer has been banned. - pub fn peer_banned(&mut self, peer_id: PeerId) { - self.discovery.peer_banned(peer_id); - } + // TODO: Remove this and integrate all disconnection/banning logic inside the peer manager. + pub fn peer_banned(&mut self, _peer_id: PeerId) {} /// Notify discovery that the peer has been unbanned. - pub fn peer_unbanned(&mut self, peer_id: &PeerId) { - self.discovery.peer_unbanned(peer_id); - } + // TODO: Remove this and integrate all disconnection/banning logic inside the peer manager. + pub fn peer_unbanned(&mut self, _peer_id: &PeerId) {} /// Returns an iterator over all enr entries in the DHT. - pub fn enr_entries(&mut self) -> impl Iterator { - self.discovery.enr_entries() + pub fn enr_entries(&mut self) -> Vec { + self.peer_manager.discovery_mut().table_entries_enr() } /// Add an ENR to the routing table of the discovery mechanism. pub fn add_enr(&mut self, enr: Enr) { - self.discovery.add_enr(enr); + self.peer_manager.discovery_mut().add_enr(enr); } /// Updates a subnet value to the ENR bitfield. /// /// The `value` is `true` if a subnet is being added and false otherwise. pub fn update_enr_subnet(&mut self, subnet_id: SubnetId, value: bool) { - if let Err(e) = self.discovery.update_enr_bitfield(subnet_id, value) { + if let Err(e) = self + .peer_manager + .discovery_mut() + .update_enr_bitfield(subnet_id, value) + { crit!(self.log, "Could not update ENR bitfield"; "error" => e); } // update the local meta data which informs our peers of the update during PINGS @@ -464,12 +447,14 @@ impl Behaviour { /// Attempts to discover new peers for a given subnet. The `min_ttl` gives the time at which we /// would like to retain the peers for. pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option) { - self.discovery.discover_subnet_peers(subnet_id, min_ttl) + self.peer_manager.discover_subnet_peers(subnet_id, min_ttl) } /// Updates the local ENR's "eth2" field with the latest EnrForkId. pub fn update_fork_version(&mut self, enr_fork_id: EnrForkId) { - self.discovery.update_eth2_enr(enr_fork_id.clone()); + self.peer_manager + .discovery_mut() + .update_eth2_enr(enr_fork_id.clone()); // unsubscribe from all gossip topics and re-subscribe to their new fork counterparts let subscribed_topics = self @@ -497,11 +482,12 @@ impl Behaviour { /* Private internal functions */ - /// Updates the current meta data of the node. + /// Updates the current meta data of the node to match the local ENR. fn update_metadata(&mut self) { self.meta_data.seq_number += 1; self.meta_data.attnets = self - .discovery + .peer_manager + .discovery() .local_enr() .bitfield::() .expect("Local discovery must have bitfield"); @@ -764,6 +750,15 @@ impl Behaviour { loop { match self.peer_manager.poll_next_unpin(cx) { Poll::Ready(Some(event)) => match event { + PeerManagerEvent::Dial(peer_id) => { + return Poll::Ready(NBAction::DialPeer { + peer_id, + condition: libp2p::swarm::DialPeerCondition::Disconnected, + }); + } + PeerManagerEvent::SocketUpdated(address) => { + return Poll::Ready(NBAction::ReportObservedAddr { address }); + } PeerManagerEvent::Status(peer_id) => { // it's time to status. We don't keep a beacon chain reference here, so we inform // the network to send a status to this peer @@ -835,10 +830,6 @@ impl Behaviour { IdentifyEvent::Error { .. } => {} } } - - fn on_discovery_event(&mut self, _event: Discv5Event) { - // discv5 has no events to inject - } } /* Public API types */ diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2_libp2p/src/config.rs similarity index 97% rename from beacon_node/eth2-libp2p/src/config.rs rename to beacon_node/eth2_libp2p/src/config.rs index 369d0477e9..08c451cbe2 100644 --- a/beacon_node/eth2-libp2p/src/config.rs +++ b/beacon_node/eth2_libp2p/src/config.rs @@ -109,14 +109,15 @@ impl Default for Config { // discv5 configuration let discv5_config = Discv5ConfigBuilder::new() + .enable_packet_filter() + .session_cache_capacity(100) .request_timeout(Duration::from_secs(4)) .request_retries(2) - .enr_update(true) // update IP based on PONG responses .enr_peer_update_min(2) // prevents NAT's should be raised for mainnet .query_parallelism(5) .query_timeout(Duration::from_secs(60)) .query_peer_timeout(Duration::from_secs(2)) - .ip_limit(false) // limits /24 IP's in buckets. Enable for mainnet + .ip_limit() // limits /24 IP's in buckets. .ping_interval(Duration::from_secs(300)) .build(); diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2_libp2p/src/lib.rs similarity index 79% rename from beacon_node/eth2-libp2p/src/lib.rs rename to beacon_node/eth2_libp2p/src/lib.rs index 9bdf6ef083..24a66e0a6d 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2_libp2p/src/lib.rs @@ -7,7 +7,6 @@ extern crate lazy_static; pub mod behaviour; mod config; -pub mod discovery; mod metrics; mod peer_manager; pub mod rpc; @@ -17,9 +16,14 @@ pub mod types; pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage}; pub use behaviour::{BehaviourEvent, PeerRequestId, Request, Response}; pub use config::Config as NetworkConfig; -pub use discovery::enr_ext::{CombinedKeyExt, EnrExt}; +pub use discv5; pub use libp2p::gossipsub::{MessageId, Topic, TopicHash}; pub use libp2p::{core::ConnectedPoint, PeerId, Swarm}; pub use libp2p::{multiaddr, Multiaddr}; -pub use peer_manager::{client::Client, PeerDB, PeerInfo, PeerSyncStatus, SyncInfo}; +pub use peer_manager::discovery; +pub use peer_manager::{ + client::Client, + discovery::{CombinedKeyExt, EnrExt, Eth2Enr}, + PeerDB, PeerInfo, PeerSyncStatus, SyncInfo, +}; pub use service::{Libp2pEvent, Service, NETWORK_KEY_FILENAME}; diff --git a/beacon_node/eth2-libp2p/src/metrics.rs b/beacon_node/eth2_libp2p/src/metrics.rs similarity index 71% rename from beacon_node/eth2-libp2p/src/metrics.rs rename to beacon_node/eth2_libp2p/src/metrics.rs index b678ef6b41..52caf9e929 100644 --- a/beacon_node/eth2-libp2p/src/metrics.rs +++ b/beacon_node/eth2_libp2p/src/metrics.rs @@ -17,4 +17,12 @@ lazy_static! { "libp2p_peer_disconnect_event_total", "Count of libp2p peer disconnect events" ); + pub static ref DISCOVERY_QUEUE: Result = try_create_int_gauge( + "discovery_queue_size", + "The number of discovery queries awaiting execution" + ); + pub static ref DISCOVERY_REQS: Result = try_create_int_gauge( + "discovery_requests", + "The number of unsolicited discovery requests per second" + ); } diff --git a/beacon_node/eth2-libp2p/src/peer_manager/client.rs b/beacon_node/eth2_libp2p/src/peer_manager/client.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/peer_manager/client.rs rename to beacon_node/eth2_libp2p/src/peer_manager/client.rs diff --git a/beacon_node/eth2-libp2p/src/discovery/enr.rs b/beacon_node/eth2_libp2p/src/peer_manager/discovery/enr.rs similarity index 98% rename from beacon_node/eth2-libp2p/src/discovery/enr.rs rename to beacon_node/eth2_libp2p/src/peer_manager/discovery/enr.rs index 3f3cbe12f7..798fbee485 100644 --- a/beacon_node/eth2-libp2p/src/discovery/enr.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/discovery/enr.rs @@ -1,12 +1,12 @@ //! Helper functions and an extension trait for Ethereum 2 ENRs. pub use discv5::enr::{self, CombinedKey, EnrBuilder}; -pub use libp2p::core::identity::Keypair; +use super::enr_ext::CombinedKeyExt; use super::ENR_FILENAME; use crate::types::{Enr, EnrBitfield}; -use crate::CombinedKeyExt; use crate::NetworkConfig; +use libp2p::core::identity::Keypair; use slog::{debug, warn}; use ssz::{Decode, Encode}; use ssz_types::BitVector; diff --git a/beacon_node/eth2-libp2p/src/discovery/enr_ext.rs b/beacon_node/eth2_libp2p/src/peer_manager/discovery/enr_ext.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/discovery/enr_ext.rs rename to beacon_node/eth2_libp2p/src/peer_manager/discovery/enr_ext.rs diff --git a/beacon_node/eth2_libp2p/src/peer_manager/discovery/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/discovery/mod.rs new file mode 100644 index 0000000000..2e813b5362 --- /dev/null +++ b/beacon_node/eth2_libp2p/src/peer_manager/discovery/mod.rs @@ -0,0 +1,632 @@ +///! This manages the discovery and management of peers. +pub(crate) mod enr; +pub mod enr_ext; + +// Allow external use of the lighthouse ENR builder +pub use enr::{build_enr, CombinedKey, Eth2Enr}; +pub use enr_ext::{CombinedKeyExt, EnrExt}; +pub use libp2p::core::identity::Keypair; + +use crate::metrics; +use crate::{error, Enr, NetworkConfig, NetworkGlobals}; +use discv5::{enr::NodeId, Discv5, Discv5Event}; +use enr::{BITFIELD_ENR_KEY, ETH2_ENR_KEY}; +use futures::prelude::*; +use libp2p::core::PeerId; +// use libp2p::multiaddr::Protocol; +use futures::stream::FuturesUnordered; +use lru::LruCache; +use slog::{crit, debug, info, trace, warn}; +use ssz::{Decode, Encode}; +use ssz_types::BitVector; +use std::{ + collections::VecDeque, + net::SocketAddr, + path::Path, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Instant, +}; +use tokio::sync::mpsc; +use types::{EnrForkId, EthSpec, SubnetId}; + +mod subnet_predicate; +use subnet_predicate::subnet_predicate; + +/// Local ENR storage filename. +pub const ENR_FILENAME: &str = "enr.dat"; +/// Target number of peers we'd like to have connected to a given long-lived subnet. +const TARGET_SUBNET_PEERS: usize = 3; +/// Number of times to attempt a discovery request +const MAX_DISCOVERY_RETRY: usize = 3; +/// The maximum number of concurrent discovery queries. +const MAX_CONCURRENT_QUERIES: usize = 1; +/// The number of closest peers to search for when doing a regular peer search. +/// +/// We could reduce this constant to speed up queries however at the cost of security. It will +/// make it easier to peers to eclipse this node. Kademlia suggests a value of 16. +const FIND_NODE_QUERY_CLOSEST_PEERS: usize = 16; + +/// The events emitted by polling discovery. +pub enum DiscoveryEvent { + /// A query has completed. The first parameter is the `min_ttl` of the peers if it is specified + /// and the second parameter are the discovered peers. + QueryResult(Option, Box>), + /// This indicates that our local UDP socketaddr has been updated and we should inform libp2p. + SocketUpdated(SocketAddr), +} + +#[derive(Debug, Clone, PartialEq)] +enum QueryType { + /// We are searching for subnet peers. + Subnet { + subnet_id: SubnetId, + min_ttl: Option, + retries: usize, + }, + /// We are searching for more peers without ENR or time constraints. + FindPeers, +} + +impl QueryType { + /// Returns true if this query has expired. + pub fn expired(&self) -> bool { + match self { + Self::FindPeers => false, + Self::Subnet { min_ttl, .. } => { + if let Some(ttl) = min_ttl { + ttl > &Instant::now() + } else { + true + } + } + } + } + + /// Returns the min_ttl of the query if one exists + /// + /// This is required for returning to the peer manager. The peer manager will update newly + /// connected peers with this `min_ttl` + pub fn min_ttl(&self) -> Option { + match self { + Self::FindPeers => None, + Self::Subnet { min_ttl, .. } => min_ttl.clone(), + } + } +} + +/// The result of a query. +struct QueryResult(QueryType, Result, discv5::QueryError>); + +// Awaiting the event stream future +enum EventStream { + /// Awaiting an event stream to be generated. This is required due to the poll nature of + /// `Discovery` + Awaiting( + Pin< + Box< + dyn Future, discv5::Discv5Error>> + + Send, + >, + >, + ), + /// The future has completed. + Present(mpsc::Receiver), + // The future has failed, there are no events from discv5. + Failed, +} + +pub struct Discovery { + /// A collection of seen live ENRs for quick lookup and to map peer-id's to ENRs. + cached_enrs: LruCache, + + /// The directory where the ENR is stored. + enr_dir: String, + + /// The handle for the underlying discv5 Server. + /// + /// This is behind a Reference counter to allow for futures to be spawned and polled with a + /// static lifetime. + discv5: Discv5, + + /// A collection of network constants that can be read from other threads. + network_globals: Arc>, + + /// Indicates if we are actively searching for peers. We only allow a single FindPeers query at + /// a time, regardless of the query concurrency. + find_peer_active: bool, + + /// A queue of discovery queries to be processed. + queued_queries: VecDeque, + + /// Active discovery queries. + active_queries: FuturesUnordered + Send>>>, + + /// The discv5 event stream. + event_stream: EventStream, + + /// Logger for the discovery behaviour. + log: slog::Logger, +} + +impl Discovery { + /// NOTE: Creating discovery requires running within a tokio execution environment. + pub fn new( + local_key: &Keypair, + config: &NetworkConfig, + network_globals: Arc>, + log: &slog::Logger, + ) -> error::Result { + let log = log.clone(); + + let enr_dir = match config.network_dir.to_str() { + Some(path) => String::from(path), + None => String::from(""), + }; + + let local_enr = network_globals.local_enr.read().clone(); + + info!(log, "ENR Initialised"; "enr" => local_enr.to_base64(), "seq" => local_enr.seq(), "id"=> format!("{}",local_enr.node_id()), "ip" => format!("{:?}", local_enr.ip()), "udp"=> format!("{:?}", local_enr.udp()), "tcp" => format!("{:?}", local_enr.tcp())); + + let listen_socket = SocketAddr::new(config.listen_address, config.discovery_port); + + // convert the keypair into an ENR key + let enr_key: CombinedKey = CombinedKey::from_libp2p(&local_key)?; + + let mut discv5 = Discv5::new(local_enr, enr_key, config.discv5_config.clone()) + .map_err(|e| format!("Discv5 service failed. Error: {:?}", e))?; + + // Add bootnodes to routing table + for bootnode_enr in config.boot_nodes.clone() { + debug!( + log, + "Adding node to routing table"; + "node_id" => format!("{}", bootnode_enr.node_id()), + "peer_id" => format!("{}", bootnode_enr.peer_id()), + "ip" => format!("{:?}", bootnode_enr.ip()), + "udp" => format!("{:?}", bootnode_enr.udp()), + "tcp" => format!("{:?}", bootnode_enr.tcp()) + ); + let _ = discv5.add_enr(bootnode_enr).map_err(|e| { + warn!( + log, + "Could not add peer to the local routing table"; + "error" => format!("{}", e) + ) + }); + } + + // start the discv5 service. + discv5.start(listen_socket); + debug!(log, "Discovery service started"); + + // obtain the event stream + let event_stream = EventStream::Awaiting(Box::pin(discv5.event_stream())); + + Ok(Self { + cached_enrs: LruCache::new(50), + network_globals, + find_peer_active: false, + queued_queries: VecDeque::with_capacity(10), + active_queries: FuturesUnordered::new(), + discv5, + event_stream, + log, + enr_dir, + }) + } + + /// Return the nodes local ENR. + pub fn local_enr(&self) -> Enr { + self.discv5.local_enr() + } + + /// This adds a new `FindPeers` query to the queue if one doesn't already exist. + pub fn discover_peers(&mut self) { + // If there is not already a find peer's query queued, add one + let query = QueryType::FindPeers; + if !self.queued_queries.contains(&query) { + trace!(self.log, "Queuing a peer discovery request"); + self.queued_queries.push_back(query); + // update the metrics + metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64); + } + } + + /// Processes a request to search for more peers on a subnet. + pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option) { + self.add_subnet_query(subnet_id, min_ttl, 0); + } + + /// Adds a subnet query if one doesn't exist. If a subnet query already exists, this + /// updates the min_ttl field. + fn add_subnet_query(&mut self, subnet_id: SubnetId, min_ttl: Option, retries: usize) { + // remove the entry and complete the query if greater than the maximum search count + if retries >= MAX_DISCOVERY_RETRY { + debug!( + self.log, + "Subnet peer discovery did not find sufficient peers. Reached max retry limit" + ); + return; + } + + // Search through any queued requests and update the timeout if a query for this subnet + // already exists + let mut found = false; + for query in self.queued_queries.iter_mut() { + if let QueryType::Subnet { + subnet_id: ref mut q_subnet_id, + min_ttl: ref mut q_min_ttl, + retries: ref mut q_retries, + } = query + { + if *q_subnet_id == subnet_id { + if *q_min_ttl < min_ttl { + *q_min_ttl = min_ttl; + } + // update the number of retries + *q_retries = retries; + // mimic an `Iter::Find()` and short-circuit the loop + found = true; + break; + } + } + } + if !found { + // Set up the query and add it to the queue + let query = QueryType::Subnet { + subnet_id, + min_ttl, + retries, + }; + // update the metrics and insert into the queue. + metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64); + self.queued_queries.push_back(query); + } + } + + /// Add an ENR to the routing table of the discovery mechanism. + pub fn add_enr(&mut self, enr: Enr) { + // add the enr to seen caches + self.cached_enrs.put(enr.peer_id(), enr.clone()); + + if let Err(e) = self.discv5.add_enr(enr) { + warn!( + self.log, + "Could not add peer to the local routing table"; + "error" => format!("{}", e) + ) + } + } + + /// Returns an iterator over all enr entries in the DHT. + pub fn table_entries_enr(&mut self) -> Vec { + self.discv5.table_entries_enr() + } + + /// Returns the ENR of a known peer if it exists. + pub fn enr_of_peer(&mut self, peer_id: &PeerId) -> Option { + // first search the local cache + if let Some(enr) = self.cached_enrs.get(peer_id) { + return Some(enr.clone()); + } + // not in the local cache, look in the routing table + if let Ok(node_id) = enr_ext::peer_id_to_node_id(peer_id) { + self.discv5.find_enr(&node_id) + } else { + None + } + } + + /// Adds/Removes a subnet from the ENR Bitfield + pub fn update_enr_bitfield(&mut self, subnet_id: SubnetId, value: bool) -> Result<(), String> { + let id = *subnet_id as usize; + + let local_enr = self.discv5.local_enr(); + let mut current_bitfield = local_enr.bitfield::()?; + + if id >= current_bitfield.len() { + return Err(format!( + "Subnet id: {} is outside the ENR bitfield length: {}", + id, + current_bitfield.len() + )); + } + + if current_bitfield + .get(id) + .map_err(|_| String::from("Subnet ID out of bounds"))? + == value + { + return Err(format!( + "Subnet id: {} already in the local ENR already has value: {}", + id, value + )); + } + + // set the subnet bitfield in the ENR + current_bitfield + .set(id, value) + .map_err(|_| String::from("Subnet ID out of bounds, could not set subnet ID"))?; + + // insert the bitfield into the ENR record + let _ = self + .discv5 + .enr_insert(BITFIELD_ENR_KEY, current_bitfield.as_ssz_bytes()); + + // replace the global version + *self.network_globals.local_enr.write() = self.discv5.local_enr().clone(); + Ok(()) + } + + /// Updates the `eth2` field of our local ENR. + pub fn update_eth2_enr(&mut self, enr_fork_id: EnrForkId) { + // to avoid having a reference to the spec constant, for the logging we assume + // FAR_FUTURE_EPOCH is u64::max_value() + let next_fork_epoch_log = if enr_fork_id.next_fork_epoch == u64::max_value() { + String::from("No other fork") + } else { + format!("{:?}", enr_fork_id.next_fork_epoch) + }; + + info!(self.log, "Updating the ENR fork version"; + "fork_digest" => format!("{:?}", enr_fork_id.fork_digest), + "next_fork_version" => format!("{:?}", enr_fork_id.next_fork_version), + "next_fork_epoch" => next_fork_epoch_log, + ); + + let _ = self + .discv5 + .enr_insert(ETH2_ENR_KEY.into(), enr_fork_id.as_ssz_bytes()) + .map_err(|e| { + warn!( + self.log, + "Could not update eth2 ENR field"; + "error" => format!("{:?}", e) + ) + }); + + // replace the global version with discovery version + *self.network_globals.local_enr.write() = self.discv5.local_enr().clone(); + } + + /* Internal Functions */ + + /// Consume the discovery queue and initiate queries when applicable. + /// + /// This also sanitizes the queue removing out-dated queries. + fn process_queue(&mut self) { + // Sanitize the queue, removing any out-dated subnet queries + self.queued_queries.retain(|query| !query.expired()); + + // Check that we are within our query concurrency limit + while !self.at_capacity() && !self.queued_queries.is_empty() { + // consume and process the query queue + match self.queued_queries.pop_front() { + Some(QueryType::FindPeers) => { + // Only permit one FindPeers query at a time + if self.find_peer_active { + self.queued_queries.push_back(QueryType::FindPeers); + continue; + } + // This is a regular request to find additional peers + debug!(self.log, "Searching for new peers"); + self.find_peer_active = true; + self.start_query(QueryType::FindPeers, FIND_NODE_QUERY_CLOSEST_PEERS); + } + Some(QueryType::Subnet { + subnet_id, + min_ttl, + retries, + }) => { + // This query is for searching for peers of a particular subnet + self.start_subnet_query(subnet_id, min_ttl, retries); + } + None => {} // Queue is empty + } + } + } + + // Returns a boolean indicating if we are currently processing the maximum number of + // concurrent queries or not. + fn at_capacity(&self) -> bool { + if self.active_queries.len() >= MAX_CONCURRENT_QUERIES { + true + } else { + false + } + } + + /// Runs a discovery request for a given subnet_id if one already exists. + fn start_subnet_query( + &mut self, + subnet_id: SubnetId, + min_ttl: Option, + retries: usize, + ) { + // Determine if we have sufficient peers, which may make this discovery unnecessary. + let peers_on_subnet = self + .network_globals + .peers + .read() + .peers_on_subnet(subnet_id) + .count(); + + if peers_on_subnet > TARGET_SUBNET_PEERS { + trace!(self.log, "Discovery ignored"; + "reason" => "Already connected to desired peers", + "connected_peers_on_subnet" => peers_on_subnet, + "target_subnet_peers" => TARGET_SUBNET_PEERS, + ); + return; + } + + let target_peers = TARGET_SUBNET_PEERS - peers_on_subnet; + debug!(self.log, "Searching for peers for subnet"; + "subnet_id" => *subnet_id, + "connected_peers_on_subnet" => peers_on_subnet, + "target_subnet_peers" => TARGET_SUBNET_PEERS, + "peers_to_find" => target_peers, + "attempt" => retries, + ); + + // start the query, and update the queries map if necessary + let query = QueryType::Subnet { + subnet_id, + min_ttl, + retries, + }; + self.start_query(query, target_peers); + } + + /// Search for a specified number of new peers using the underlying discovery mechanism. + /// + /// This can optionally search for peers for a given predicate. Regardless of the predicate + /// given, this will only search for peers on the same enr_fork_id as specified in the local + /// ENR. + fn start_query(&mut self, query: QueryType, target_peers: usize) { + // Generate a random target node id. + let random_node = NodeId::random(); + + let enr_fork_id = match self.local_enr().eth2() { + Ok(v) => v, + Err(e) => { + crit!(self.log, "Local ENR has no fork id"; "error" => e); + return; + } + }; + // predicate for finding nodes with a matching fork + let eth2_fork_predicate = move |enr: &Enr| enr.eth2() == Ok(enr_fork_id.clone()); + + // General predicate + let predicate: Box bool + Send> = match &query { + QueryType::FindPeers => Box::new(eth2_fork_predicate), + QueryType::Subnet { subnet_id, .. } => { + // build the subnet predicate as a combination of the eth2_fork_predicate and the + // subnet predicate + let subnet_predicate = subnet_predicate::(subnet_id.clone(), &self.log); + Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && subnet_predicate(enr)) + } + }; + + // Build the future + let query_future = self + .discv5 + .find_node_predicate(random_node, predicate, target_peers) + .map(|v| QueryResult(query, v)); + + // Add the future to active queries, to be executed. + self.active_queries.push(Box::pin(query_future)); + } + + /// Drives the queries returning any results from completed queries. + fn poll_queries(&mut self, cx: &mut Context) -> Option<(Option, Vec)> { + while let Poll::Ready(Some(query_future)) = self.active_queries.poll_next_unpin(cx) { + match query_future.0 { + QueryType::FindPeers => { + self.find_peer_active = false; + match query_future.1 { + Ok(r) if r.is_empty() => { + debug!(self.log, "Discovery query yielded no results."); + } + Ok(r) => { + debug!(self.log, "Discovery query completed"; "peers_found" => r.len()); + return Some((None, r)); + } + Err(e) => { + warn!(self.log, "Discovery query failed"; "error" => e.to_string()); + } + } + } + QueryType::Subnet { + subnet_id, + min_ttl, + retries, + } => { + match query_future.1 { + Ok(r) if r.is_empty() => { + debug!(self.log, "Subnet discovery query yielded no results."; "subnet_id" => *subnet_id, "retries" => retries); + } + Ok(r) => { + debug!(self.log, "Peer subnet discovery request completed"; "peers_found" => r.len(), "subnet_id" => *subnet_id); + // A subnet query has completed. Add back to the queue, incrementing retries. + self.add_subnet_query(subnet_id, min_ttl, retries + 1); + // Report the results back to the peer manager. + return Some((query_future.0.min_ttl(), r)); + } + Err(e) => { + warn!(self.log,"Subnet Discovery query failed"; "subnet_id" => *subnet_id, "error" => e.to_string()); + } + } + } + } + } + None + } + + // Main execution loop to be driven by the peer manager. + pub fn poll(&mut self, cx: &mut Context) -> Poll { + // Process the query queue + self.process_queue(); + + // Drive the queries and return any results from completed queries + if let Some((min_ttl, result)) = self.poll_queries(cx) { + // cache the found ENR's + for enr in result.iter().cloned() { + self.cached_enrs.put(enr.peer_id(), enr); + } + // return the result to the peer manager + return Poll::Ready(DiscoveryEvent::QueryResult(min_ttl, Box::new(result))); + } + + // Process the server event stream + match self.event_stream { + EventStream::Awaiting(ref mut fut) => { + // Still awaiting the event stream, poll it + if let Poll::Ready(event_stream) = fut.poll_unpin(cx) { + match event_stream { + Ok(stream) => self.event_stream = EventStream::Present(stream), + Err(e) => { + slog::crit!(self.log, "Discv5 event stream failed"; "error" => e.to_string()); + self.event_stream = EventStream::Failed; + } + } + } + } + EventStream::Failed => {} // ignore checking the stream + EventStream::Present(ref mut stream) => { + while let Ok(event) = stream.try_recv() { + match event { + // We filter out unwanted discv5 events here and only propagate useful results to + // the peer manager. + Discv5Event::Discovered(_enr) => { + // Peers that get discovered during a query but are not contactable or + // don't match a predicate can end up here. For debugging purposes we + // log these to see if we are unnecessarily dropping discovered peers + /* + if enr.eth2() == self.local_enr().eth2() { + trace!(self.log, "Peer found in process of query"; "peer_id" => format!("{}", enr.peer_id()), "tcp_socket" => enr.tcp_socket()); + } else { + // this is temporary warning for debugging the DHT + warn!(self.log, "Found peer during discovery not on correct fork"; "peer_id" => format!("{}", enr.peer_id()), "tcp_socket" => enr.tcp_socket()); + } + */ + } + Discv5Event::SocketUpdated(socket) => { + info!(self.log, "Address updated"; "ip" => format!("{}",socket.ip()), "udp_port" => format!("{}", socket.port())); + metrics::inc_counter(&metrics::ADDRESS_UPDATE_COUNT); + // Discv5 will have updated our local ENR. We save the updated version + // to disk. + let enr = self.discv5.local_enr(); + enr::save_enr_to_disk(Path::new(&self.enr_dir), &enr, &self.log); + return Poll::Ready(DiscoveryEvent::SocketUpdated(socket)); + } + _ => {} // Ignore all other discv5 server events + } + } + } + } + Poll::Pending + } +} diff --git a/beacon_node/eth2-libp2p/src/discovery/subnet_predicate.rs b/beacon_node/eth2_libp2p/src/peer_manager/discovery/subnet_predicate.rs similarity index 95% rename from beacon_node/eth2-libp2p/src/discovery/subnet_predicate.rs rename to beacon_node/eth2_libp2p/src/peer_manager/discovery/subnet_predicate.rs index 89451d7f6b..2e74ff32ef 100644 --- a/beacon_node/eth2-libp2p/src/discovery/subnet_predicate.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/discovery/subnet_predicate.rs @@ -5,7 +5,7 @@ use super::*; pub fn subnet_predicate( subnet_id: SubnetId, log: &slog::Logger, -) -> impl Fn(&Enr) -> bool + Send + 'static + Clone +) -> impl Fn(&Enr) -> bool + Send where TSpec: EthSpec, { diff --git a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs similarity index 74% rename from beacon_node/eth2-libp2p/src/peer_manager/mod.rs rename to beacon_node/eth2_libp2p/src/peer_manager/mod.rs index a051f9b77d..ac4bd939c5 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -1,27 +1,35 @@ //! Implementation of a Lighthouse's peer management system. pub use self::peerdb::*; -use crate::metrics; use crate::rpc::{MetaData, Protocol, RPCError, RPCResponseErrorCode}; -use crate::{NetworkGlobals, PeerId}; +use crate::{error, metrics}; +use crate::{Enr, EnrExt, NetworkConfig, NetworkGlobals, PeerId}; use futures::prelude::*; use futures::Stream; use hashset_delay::HashSetDelay; +use libp2p::core::multiaddr::Protocol as MProtocol; use libp2p::identify::IdentifyInfo; -use slog::{crit, debug, error, warn}; +use slog::{crit, debug, error}; use smallvec::SmallVec; -use std::convert::TryInto; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; -use types::EthSpec; +use std::{ + net::SocketAddr, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::{Duration, Instant}, +}; +use types::{EthSpec, SubnetId}; + +pub use libp2p::core::{identity::Keypair, Multiaddr}; pub mod client; +pub mod discovery; mod peer_info; mod peer_sync_status; mod peerdb; +use discovery::{Discovery, DiscoveryEvent}; + pub use peer_info::{PeerConnectionStatus::*, PeerInfo}; pub use peer_sync_status::{PeerSyncStatus, SyncInfo}; /// The minimum reputation before a peer is disconnected. @@ -33,18 +41,26 @@ const STATUS_INTERVAL: u64 = 300; /// this time frame (Seconds) const PING_INTERVAL: u64 = 30; +/// The heartbeat performs regular updates such as updating reputations and performing discovery +/// requests. This defines the interval in seconds. +const HEARTBEAT_INTERVAL: u64 = 30; + /// The main struct that handles peer's reputation and connection status. pub struct PeerManager { /// Storage of network globals to access the `PeerDB`. network_globals: Arc>, /// A queue of events that the `PeerManager` is waiting to produce. - events: SmallVec<[PeerManagerEvent; 5]>, + events: SmallVec<[PeerManagerEvent; 16]>, /// A collection of peers awaiting to be Ping'd. ping_peers: HashSetDelay, /// A collection of peers awaiting to be Status'd. status_peers: HashSetDelay, - /// Last updated moment. - _last_updated: Instant, + /// The target number of peers we would like to connect to. + target_peers: usize, + /// The discovery service. + discovery: Discovery, + /// The heartbeat interval to perform routine maintenance. + heartbeat: tokio::time::Interval, /// The logger associated with the `PeerManager`. log: slog::Logger, } @@ -89,6 +105,10 @@ impl PeerAction { /// The events that the `PeerManager` outputs (requests). pub enum PeerManagerEvent { + /// Dial a PeerId. + Dial(PeerId), + /// Inform libp2p that our external socket addr has been updated. + SocketUpdated(Multiaddr), /// Sends a STATUS to a peer. Status(PeerId), /// Sends a PING to a peer. @@ -100,99 +120,59 @@ pub enum PeerManagerEvent { } impl PeerManager { - pub fn new(network_globals: Arc>, log: &slog::Logger) -> Self { - PeerManager { + // NOTE: Must be run inside a tokio executor. + pub fn new( + local_key: &Keypair, + config: &NetworkConfig, + network_globals: Arc>, + log: &slog::Logger, + ) -> error::Result { + // start the discovery service + let mut discovery = Discovery::new(local_key, config, network_globals.clone(), log)?; + + // start searching for peers + discovery.discover_peers(); + + let heartbeat = tokio::time::interval(tokio::time::Duration::from_secs(HEARTBEAT_INTERVAL)); + + Ok(PeerManager { network_globals, events: SmallVec::new(), - _last_updated: Instant::now(), ping_peers: HashSetDelay::new(Duration::from_secs(PING_INTERVAL)), status_peers: HashSetDelay::new(Duration::from_secs(STATUS_INTERVAL)), + target_peers: config.max_peers, //TODO: Add support for target peers and max peers + discovery, + heartbeat, log: log.clone(), - } + }) } /* Public accessible functions */ - /// A ping request has been received. - // NOTE: The behaviour responds with a PONG automatically - // TODO: Update last seen - pub fn ping_request(&mut self, peer_id: &PeerId, seq: u64) { - if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) { - // received a ping - // reset the to-ping timer for this peer - debug!(self.log, "Received a ping request"; "peer_id" => peer_id.to_string(), "seq_no" => seq); - self.ping_peers.insert(peer_id.clone()); + /* Discovery Requests */ - // if the sequence number is unknown send an update the meta data of the peer. - if let Some(meta_data) = &peer_info.meta_data { - if meta_data.seq_number < seq { - debug!(self.log, "Requesting new metadata from peer"; - "peer_id" => peer_id.to_string(), "known_seq_no" => meta_data.seq_number, "ping_seq_no" => seq); - self.events - .push(PeerManagerEvent::MetaData(peer_id.clone())); - } - } else { - // if we don't know the meta-data, request it - debug!(self.log, "Requesting first metadata from peer"; - "peer_id" => peer_id.to_string()); - self.events - .push(PeerManagerEvent::MetaData(peer_id.clone())); - } - } else { - crit!(self.log, "Received a PING from an unknown peer"; - "peer_id" => peer_id.to_string()); - } + /// Provides a reference to the underlying discovery service. + pub fn discovery(&self) -> &Discovery { + &self.discovery } - /// A PONG has been returned from a peer. - // TODO: Update last seen - pub fn pong_response(&mut self, peer_id: &PeerId, seq: u64) { - if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) { - // received a pong - - // if the sequence number is unknown send update the meta data of the peer. - if let Some(meta_data) = &peer_info.meta_data { - if meta_data.seq_number < seq { - debug!(self.log, "Requesting new metadata from peer"; - "peer_id" => peer_id.to_string(), "known_seq_no" => meta_data.seq_number, "pong_seq_no" => seq); - self.events - .push(PeerManagerEvent::MetaData(peer_id.clone())); - } - } else { - // if we don't know the meta-data, request it - debug!(self.log, "Requesting first metadata from peer"; - "peer_id" => peer_id.to_string()); - self.events - .push(PeerManagerEvent::MetaData(peer_id.clone())); - } - } else { - crit!(self.log, "Received a PONG from an unknown peer"; "peer_id" => peer_id.to_string()); - } + /// Provides a mutable reference to the underlying discovery service. + pub fn discovery_mut(&mut self) -> &mut Discovery { + &mut self.discovery } - /// Received a metadata response from a peer. - // TODO: Update last seen - pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData) { - if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - if let Some(known_meta_data) = &peer_info.meta_data { - if known_meta_data.seq_number < meta_data.seq_number { - debug!(self.log, "Updating peer's metadata"; - "peer_id" => peer_id.to_string(), "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number); - peer_info.meta_data = Some(meta_data); - } else { - debug!(self.log, "Received old metadata"; - "peer_id" => peer_id.to_string(), "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number); - } - } else { - // we have no meta-data for this peer, update - debug!(self.log, "Obtained peer's metadata"; - "peer_id" => peer_id.to_string(), "new_seq_no" => meta_data.seq_number); - peer_info.meta_data = Some(meta_data); - } - } else { - crit!(self.log, "Received METADATA from an unknown peer"; - "peer_id" => peer_id.to_string()); + /// A request to find peers on a given subnet. + pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option) { + // Extend the time to maintain peers if required. + if let Some(min_ttl) = min_ttl { + self.network_globals + .peers + .write() + .extend_peers_on_subnet(subnet_id, min_ttl); } + + // request the subnet query from discovery + self.discovery.discover_subnet_peers(subnet_id, min_ttl); } /// A STATUS message has been received from a peer. This resets the status timer. @@ -320,8 +300,158 @@ impl PeerManager { self.report_peer(peer_id, peer_action); } + /// A ping request has been received. + // NOTE: The behaviour responds with a PONG automatically + // TODO: Update last seen + pub fn ping_request(&mut self, peer_id: &PeerId, seq: u64) { + if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) { + // received a ping + // reset the to-ping timer for this peer + debug!(self.log, "Received a ping request"; "peer_id" => peer_id.to_string(), "seq_no" => seq); + self.ping_peers.insert(peer_id.clone()); + + // if the sequence number is unknown send an update the meta data of the peer. + if let Some(meta_data) = &peer_info.meta_data { + if meta_data.seq_number < seq { + debug!(self.log, "Requesting new metadata from peer"; + "peer_id" => peer_id.to_string(), "known_seq_no" => meta_data.seq_number, "ping_seq_no" => seq); + self.events + .push(PeerManagerEvent::MetaData(peer_id.clone())); + } + } else { + // if we don't know the meta-data, request it + debug!(self.log, "Requesting first metadata from peer"; + "peer_id" => peer_id.to_string()); + self.events + .push(PeerManagerEvent::MetaData(peer_id.clone())); + } + } else { + crit!(self.log, "Received a PING from an unknown peer"; + "peer_id" => peer_id.to_string()); + } + } + + /// A PONG has been returned from a peer. + // TODO: Update last seen + pub fn pong_response(&mut self, peer_id: &PeerId, seq: u64) { + if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) { + // received a pong + + // if the sequence number is unknown send update the meta data of the peer. + if let Some(meta_data) = &peer_info.meta_data { + if meta_data.seq_number < seq { + debug!(self.log, "Requesting new metadata from peer"; + "peer_id" => peer_id.to_string(), "known_seq_no" => meta_data.seq_number, "pong_seq_no" => seq); + self.events + .push(PeerManagerEvent::MetaData(peer_id.clone())); + } + } else { + // if we don't know the meta-data, request it + debug!(self.log, "Requesting first metadata from peer"; + "peer_id" => peer_id.to_string()); + self.events + .push(PeerManagerEvent::MetaData(peer_id.clone())); + } + } else { + crit!(self.log, "Received a PONG from an unknown peer"; "peer_id" => peer_id.to_string()); + } + } + + /// Received a metadata response from a peer. + // TODO: Update last seen + pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData) { + if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { + if let Some(known_meta_data) = &peer_info.meta_data { + if known_meta_data.seq_number < meta_data.seq_number { + debug!(self.log, "Updating peer's metadata"; + "peer_id" => peer_id.to_string(), "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number); + peer_info.meta_data = Some(meta_data); + } else { + debug!(self.log, "Received old metadata"; + "peer_id" => peer_id.to_string(), "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number); + } + } else { + // we have no meta-data for this peer, update + debug!(self.log, "Obtained peer's metadata"; + "peer_id" => peer_id.to_string(), "new_seq_no" => meta_data.seq_number); + peer_info.meta_data = Some(meta_data); + } + } else { + crit!(self.log, "Received METADATA from an unknown peer"; + "peer_id" => peer_id.to_string()); + } + } + + // Handles the libp2p request to obtain multiaddrs for peer_id's in order to dial them. + pub fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + if let Some(enr) = self.discovery.enr_of_peer(peer_id) { + // ENR's may have multiple Multiaddrs. The multi-addr associated with the UDP + // port is removed, which is assumed to be associated with the discv5 protocol (and + // therefore irrelevant for other libp2p components). + let mut out_list = enr.multiaddr(); + out_list.retain(|addr| { + addr.iter() + .find(|v| match v { + MProtocol::Udp(_) => true, + _ => false, + }) + .is_none() + }); + + out_list + } else { + // PeerId is not known + Vec::new() + } + } + /* Internal functions */ + // The underlying discovery server has updated our external IP address. We send this up to + // notify libp2p. + fn socket_updated(&mut self, socket: SocketAddr) { + // Build a multiaddr to report to libp2p + let mut multiaddr = Multiaddr::from(socket.ip()); + // NOTE: This doesn't actually track the external TCP port. More sophisticated NAT handling + // should handle this. + multiaddr.push(MProtocol::Tcp(self.network_globals.listen_port_tcp())); + self.events.push(PeerManagerEvent::SocketUpdated(multiaddr)); + } + + /// Peers that have been returned by discovery requests are dialed here if they are suitable. + /// + /// NOTE: By dialing `PeerId`s and not multiaddrs, libp2p requests the multiaddr associated + /// with a new `PeerId` which involves a discovery routing table lookup. We could dial the + /// multiaddr here, however this could relate to duplicate PeerId's etc. If the lookup + /// proves resource constraining, we should switch to multiaddr dialling here. + fn peers_discovered(&mut self, peers: Vec, min_ttl: Option) { + for enr in peers { + let peer_id = enr.peer_id(); + + // if we need more peers, attempt a connection + if self.network_globals.connected_or_dialing_peers() < self.target_peers + && !self + .network_globals + .peers + .read() + .is_connected_or_dialing(&peer_id) + && !self.network_globals.peers.read().peer_banned(&peer_id) + { + debug!(self.log, "Dialing discovered peer"; "peer_id"=> peer_id.to_string()); + // TODO: Update output + // This should be updated with the peer dialing. In fact created once the peer is + // dialed + if let Some(min_ttl) = min_ttl { + self.network_globals + .peers + .write() + .update_min_ttl(&peer_id, min_ttl); + } + self.events.push(PeerManagerEvent::Dial(peer_id)); + } + } + } + /// Registers a peer as connected. The `ingoing` parameter determines if the peer is being /// dialed or connecting to us. /// @@ -376,12 +506,11 @@ impl PeerManager { /// /// A banned(disconnected) peer that gets its rep above(below) MIN_REP_BEFORE_BAN is /// now considered a disconnected(banned) peer. + // TODO: Implement when reputation is added. fn _update_reputations(&mut self) { + /* // avoid locking the peerdb too often // TODO: call this on a timer - if self._last_updated.elapsed().as_secs() < 30 { - return; - } let now = Instant::now(); @@ -457,6 +586,28 @@ impl PeerManager { } self._last_updated = Instant::now(); + */ + } + + /// The Peer manager's heartbeat maintains the peer count and maintains peer reputations. + /// + /// It will request discovery queries if the peer count has not reached the desired number of + /// peers. + /// + /// NOTE: Discovery will only add a new query if one isn't already queued. + fn heartbeat(&mut self) { + // TODO: Provide a back-off time for discovery queries. I.e Queue many initially, then only + // perform discoveries over a larger fixed interval. Perhaps one every 6 heartbeats + let peer_count = self.network_globals.connected_or_dialing_peers(); + if peer_count < self.target_peers { + // If we need more peers, queue a discovery lookup. + self.discovery.discover_peers(); + } + + // TODO: If we have too many peers, remove peers that are not required for subnet + // validation. + + // TODO: Perform peer reputation maintenance here } } @@ -464,6 +615,21 @@ impl Stream for PeerManager { type Item = PeerManagerEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // perform the heartbeat when necessary + while let Poll::Ready(Some(_)) = self.heartbeat.poll_next_unpin(cx) { + self.heartbeat(); + } + + // handle any discovery events + while let Poll::Ready(event) = self.discovery.poll(cx) { + match event { + DiscoveryEvent::SocketUpdated(socket_addr) => self.socket_updated(socket_addr), + DiscoveryEvent::QueryResult(min_ttl, peers) => { + self.peers_discovered(*peers, min_ttl) + } + } + } + // poll the timeouts for pings and status' loop { match self.ping_peers.poll_next_unpin(cx) { diff --git a/beacon_node/eth2-libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/peer_manager/peer_info.rs rename to beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs diff --git a/beacon_node/eth2-libp2p/src/peer_manager/peer_sync_status.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_sync_status.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/peer_manager/peer_sync_status.rs rename to beacon_node/eth2_libp2p/src/peer_manager/peer_sync_status.rs diff --git a/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs similarity index 98% rename from beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs rename to beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index b5aa2cb3d2..063d36db92 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -106,6 +106,14 @@ impl PeerDB { } } + /// Returns true if the Peer is banned. + pub fn peer_banned(&self, peer_id: &PeerId) -> bool { + match self.peers.get(peer_id).map(|info| &info.connection_status) { + Some(status) => status.is_banned(), + None => false, + } + } + /// Gives the ids of all known connected peers. pub fn connected_peers(&self) -> impl Iterator)> { self.peers diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/base.rs b/beacon_node/eth2_libp2p/src/rpc/codec/base.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/rpc/codec/base.rs rename to beacon_node/eth2_libp2p/src/rpc/codec/base.rs diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs b/beacon_node/eth2_libp2p/src/rpc/codec/mod.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/rpc/codec/mod.rs rename to beacon_node/eth2_libp2p/src/rpc/codec/mod.rs diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2_libp2p/src/rpc/codec/ssz.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs rename to beacon_node/eth2_libp2p/src/rpc/codec/ssz.rs diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs b/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs rename to beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2_libp2p/src/rpc/handler.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/rpc/handler.rs rename to beacon_node/eth2_libp2p/src/rpc/handler.rs diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2_libp2p/src/rpc/methods.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/rpc/methods.rs rename to beacon_node/eth2_libp2p/src/rpc/methods.rs diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2_libp2p/src/rpc/mod.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/rpc/mod.rs rename to beacon_node/eth2_libp2p/src/rpc/mod.rs diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2_libp2p/src/rpc/protocol.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/rpc/protocol.rs rename to beacon_node/eth2_libp2p/src/rpc/protocol.rs diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2_libp2p/src/service.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/service.rs rename to beacon_node/eth2_libp2p/src/service.rs diff --git a/beacon_node/eth2-libp2p/src/types/error.rs b/beacon_node/eth2_libp2p/src/types/error.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/types/error.rs rename to beacon_node/eth2_libp2p/src/types/error.rs diff --git a/beacon_node/eth2-libp2p/src/types/globals.rs b/beacon_node/eth2_libp2p/src/types/globals.rs similarity index 98% rename from beacon_node/eth2-libp2p/src/types/globals.rs rename to beacon_node/eth2_libp2p/src/types/globals.rs index d765d4240f..84e183b6bc 100644 --- a/beacon_node/eth2-libp2p/src/types/globals.rs +++ b/beacon_node/eth2_libp2p/src/types/globals.rs @@ -4,7 +4,7 @@ use crate::rpc::methods::MetaData; use crate::types::SyncState; use crate::Client; use crate::EnrExt; -use crate::{discovery::enr::Eth2Enr, Enr, GossipTopic, Multiaddr, PeerId}; +use crate::{Enr, Eth2Enr, GossipTopic, Multiaddr, PeerId}; use parking_lot::RwLock; use std::collections::HashSet; use std::sync::atomic::{AtomicU16, Ordering}; diff --git a/beacon_node/eth2-libp2p/src/types/mod.rs b/beacon_node/eth2_libp2p/src/types/mod.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/types/mod.rs rename to beacon_node/eth2_libp2p/src/types/mod.rs diff --git a/beacon_node/eth2-libp2p/src/types/pubsub.rs b/beacon_node/eth2_libp2p/src/types/pubsub.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/types/pubsub.rs rename to beacon_node/eth2_libp2p/src/types/pubsub.rs diff --git a/beacon_node/eth2-libp2p/src/types/sync_state.rs b/beacon_node/eth2_libp2p/src/types/sync_state.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/types/sync_state.rs rename to beacon_node/eth2_libp2p/src/types/sync_state.rs diff --git a/beacon_node/eth2-libp2p/src/types/topics.rs b/beacon_node/eth2_libp2p/src/types/topics.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/types/topics.rs rename to beacon_node/eth2_libp2p/src/types/topics.rs diff --git a/beacon_node/eth2-libp2p/tests/common/mod.rs b/beacon_node/eth2_libp2p/tests/common/mod.rs similarity index 97% rename from beacon_node/eth2-libp2p/tests/common/mod.rs rename to beacon_node/eth2_libp2p/tests/common/mod.rs index e26381f51c..ba88bf5031 100644 --- a/beacon_node/eth2-libp2p/tests/common/mod.rs +++ b/beacon_node/eth2_libp2p/tests/common/mod.rs @@ -115,7 +115,7 @@ pub fn build_libp2p_instance( #[allow(dead_code)] pub fn get_enr(node: &LibP2PService) -> Enr { - let enr = node.swarm.discovery().local_enr().clone(); + let enr = node.swarm.local_enr().clone(); enr } @@ -153,7 +153,7 @@ pub async fn build_node_pair(log: &slog::Logger) -> (Libp2pInstance, Libp2pInsta let mut sender = build_libp2p_instance(vec![], None, sender_log); let mut receiver = build_libp2p_instance(vec![], None, receiver_log); - let receiver_multiaddr = receiver.swarm.discovery().local_enr().clone().multiaddr()[1].clone(); + let receiver_multiaddr = receiver.swarm.local_enr().multiaddr()[1].clone(); // let the two nodes set up listeners let sender_fut = async { diff --git a/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs b/beacon_node/eth2_libp2p/tests/gossipsub_tests.rs similarity index 100% rename from beacon_node/eth2-libp2p/tests/gossipsub_tests.rs rename to beacon_node/eth2_libp2p/tests/gossipsub_tests.rs diff --git a/beacon_node/eth2-libp2p/tests/noise.rs b/beacon_node/eth2_libp2p/tests/noise.rs similarity index 100% rename from beacon_node/eth2-libp2p/tests/noise.rs rename to beacon_node/eth2_libp2p/tests/noise.rs diff --git a/beacon_node/eth2-libp2p/tests/rpc_tests.rs b/beacon_node/eth2_libp2p/tests/rpc_tests.rs similarity index 100% rename from beacon_node/eth2-libp2p/tests/rpc_tests.rs rename to beacon_node/eth2_libp2p/tests/rpc_tests.rs diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index b58a1a6ef7..47f248433f 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -15,7 +15,7 @@ exit-future = "0.2.0" [dependencies] beacon_chain = { path = "../beacon_chain" } store = { path = "../store" } -eth2-libp2p = { path = "../eth2-libp2p" } +eth2_libp2p = { path = "../eth2_libp2p" } hashset_delay = { path = "../../common/hashset_delay" } rest_types = { path = "../../common/rest_types" } types = { path = "../../consensus/types" } diff --git a/beacon_node/network/src/attestation_service/mod.rs b/beacon_node/network/src/attestation_service/mod.rs index 2fb8facea6..5626e32ce7 100644 --- a/beacon_node/network/src/attestation_service/mod.rs +++ b/beacon_node/network/src/attestation_service/mod.rs @@ -71,17 +71,20 @@ impl PartialEq for AttServiceMessage { subnet_id: other_subnet_id, min_ttl: other_min_ttl, }, - ) => match (min_ttl, other_min_ttl) { - (Some(min_ttl_instant), Some(other_min_ttl_instant)) => { - min_ttl_instant.saturating_duration_since(other_min_ttl_instant) - < DURATION_DIFFERENCE - && other_min_ttl_instant.saturating_duration_since(min_ttl_instant) - < DURATION_DIFFERENCE - && subnet_id == other_subnet_id - } - (None, None) => subnet_id == other_subnet_id, - _ => false, - }, + ) => { + subnet_id == other_subnet_id + && match (min_ttl, other_min_ttl) { + (Some(min_ttl_instant), Some(other_min_ttl_instant)) => { + min_ttl_instant.saturating_duration_since(other_min_ttl_instant) + < DURATION_DIFFERENCE + && other_min_ttl_instant.saturating_duration_since(min_ttl_instant) + < DURATION_DIFFERENCE + } + (None, None) => true, + (None, Some(_)) => true, + (Some(_), None) => true, + } + } _ => false, } } @@ -353,12 +356,12 @@ impl AttestationService { *other_min_ttl = min_ttl; } } - (None, Some(_)) => { - // Update the min_ttl to None, because the new message is longer-lived. - *other_min_ttl = None; + (None, Some(_)) => {} // Keep the current one as it has an actual min_ttl + (Some(min_ttl), None) => { + // Update the request to include a min_ttl. + *other_min_ttl = Some(min_ttl); } - (Some(_), None) => {} // Don't replace this because the existing message is for a longer-lived peer. - (None, None) => {} // Duplicate message, do nothing. + (None, None) => {} // Duplicate message, do nothing. } is_duplicate = true; return; diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 669f670372..1b29a0a2c0 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -11,7 +11,7 @@ use eth2_libp2p::{ rpc::{RPCResponseErrorCode, RequestId}, Libp2pEvent, PeerRequestId, PubsubMessage, Request, Response, }; -use eth2_libp2p::{BehaviourEvent, Enr, MessageId, NetworkGlobals, PeerId}; +use eth2_libp2p::{BehaviourEvent, MessageId, NetworkGlobals, PeerId}; use futures::prelude::*; use rest_types::ValidatorSubscription; use slog::{debug, error, info, o, trace}; @@ -136,7 +136,7 @@ fn spawn_service( // handle network shutdown _ = (&mut exit_rx) => { // network thread is terminating - let enrs: Vec = service.libp2p.swarm.enr_entries().cloned().collect(); + let enrs = service.libp2p.swarm.enr_entries(); debug!( service.log, "Persisting DHT to store"; diff --git a/beacon_node/rest_api/Cargo.toml b/beacon_node/rest_api/Cargo.toml index e226df87f0..bd9302f62a 100644 --- a/beacon_node/rest_api/Cargo.toml +++ b/beacon_node/rest_api/Cargo.toml @@ -10,7 +10,7 @@ bls = { path = "../../crypto/bls" } rest_types = { path = "../../common/rest_types" } beacon_chain = { path = "../beacon_chain" } network = { path = "../network" } -eth2-libp2p = { path = "../eth2-libp2p" } +eth2_libp2p = { path = "../eth2_libp2p" } store = { path = "../store" } version = { path = "../version" } serde = { version = "1.0.110", features = ["derive"] } diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index d8459b3c3a..fdf8855a4a 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -82,6 +82,8 @@ impl ProductionBeaconNode { let db_path = client_config.create_db_path()?; let freezer_db_path_res = client_config.create_freezer_db_path(); + let executor = context.executor.clone(); + let builder = ClientBuilder::new(context.eth_spec_instance.clone()) .runtime_context(context) .chain_spec(spec) @@ -119,6 +121,9 @@ impl ProductionBeaconNode { .system_time_slot_clock()? .tee_event_handler(client_config.websocket_server.clone())?; + // Inject the executor into the discv5 network config. + client_config.network.discv5_config.executor = Some(Box::new(executor)); + let builder = builder .build_beacon_chain()? .network(&mut client_config.network)? diff --git a/lcli/Cargo.toml b/lcli/Cargo.toml index a87cf88e7d..8e64c279cb 100644 --- a/lcli/Cargo.toml +++ b/lcli/Cargo.toml @@ -28,7 +28,7 @@ deposit_contract = { path = "../common/deposit_contract" } tree_hash = "0.1.0" tokio = { version = "0.2.21", features = ["full"] } clap_utils = { path = "../common/clap_utils" } -eth2-libp2p = { path = "../beacon_node/eth2-libp2p" } +eth2_libp2p = { path = "../beacon_node/eth2_libp2p" } validator_dir = { path = "../common/validator_dir", features = ["insecure_keys"] } rand = "0.7.2" eth2_keystore = { path = "../crypto/eth2_keystore" } diff --git a/lighthouse/environment/Cargo.toml b/lighthouse/environment/Cargo.toml index 7d36f7f987..4510b9bbae 100644 --- a/lighthouse/environment/Cargo.toml +++ b/lighthouse/environment/Cargo.toml @@ -21,3 +21,4 @@ slog-json = "2.3.0" exit-future = "0.2.0" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics" } +discv5 = "0.1.0-alpha.5" diff --git a/lighthouse/environment/src/executor.rs b/lighthouse/environment/src/executor.rs index feb95eeb14..78c9de3039 100644 --- a/lighthouse/environment/src/executor.rs +++ b/lighthouse/environment/src/executor.rs @@ -126,3 +126,9 @@ impl TaskExecutor { &self.log } } + +impl discv5::Executor for TaskExecutor { + fn spawn(&self, future: std::pin::Pin + Send>>) { + self.spawn(future, "discv5") + } +}