From 8248afa7932cb7eb89fd2f5cfded9a77078a6ed8 Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Wed, 14 Oct 2020 06:51:58 +0000 Subject: [PATCH] Updates the message-id according to the Networking Spec (#1752) ## Proposed Changes Implement the new message id function (see https://github.com/ethereum/eth2.0-specs/pull/2089) using an additional fast message id function for better performance + caching decompressed data. --- Cargo.lock | 26 +++---- beacon_node/eth2_libp2p/Cargo.toml | 2 +- .../src/behaviour/handler/delegate.rs | 2 +- .../eth2_libp2p/src/behaviour/handler/mod.rs | 2 +- beacon_node/eth2_libp2p/src/behaviour/mod.rs | 11 +-- beacon_node/eth2_libp2p/src/config.rs | 38 ++++++++-- beacon_node/eth2_libp2p/src/lib.rs | 5 +- beacon_node/eth2_libp2p/src/types/mod.rs | 2 +- beacon_node/eth2_libp2p/src/types/pubsub.rs | 69 +++++++++++++------ beacon_node/eth2_libp2p/tests/common/mod.rs | 3 +- beacon_node/network/src/service.rs | 4 +- 11 files changed, 113 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 17184a10f7..6c1ff4db64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2917,7 +2917,7 @@ checksum = "c7d73b3f436185384286bd8098d17ec07c9a7d2388a6599f824d8502b529702a" [[package]] name = "libp2p" version = "0.29.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4" +source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" dependencies = [ "atomic", "bytes 0.5.6", @@ -2978,7 +2978,7 @@ dependencies = [ [[package]] name = "libp2p-core" version = "0.22.2" -source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4" +source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" dependencies = [ "asn1_der", "bs58", @@ -3011,7 +3011,7 @@ dependencies = [ [[package]] name = "libp2p-core-derive" version = "0.20.2" -source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4" +source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" dependencies = [ "quote", "syn", @@ -3020,7 +3020,7 @@ dependencies = [ [[package]] name = "libp2p-dns" version = "0.22.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4" +source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" dependencies = [ "futures 0.3.6", "libp2p-core 0.22.2", @@ -3030,7 +3030,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" version = "0.22.1" -source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4" +source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" dependencies = [ "base64 0.12.3", "byteorder", @@ -3054,7 +3054,7 @@ dependencies = [ [[package]] name = "libp2p-identify" version = "0.22.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4" +source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" dependencies = [ "futures 0.3.6", "libp2p-core 0.22.2", @@ -3069,7 +3069,7 @@ dependencies = [ [[package]] name = "libp2p-mplex" version = "0.23.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4" +source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" dependencies = [ "bytes 0.5.6", "fnv", @@ -3084,7 +3084,7 @@ dependencies = [ [[package]] name = "libp2p-noise" version = "0.24.1" -source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4" +source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" dependencies = [ "bytes 0.5.6", "curve25519-dalek", @@ -3105,7 +3105,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" version = "0.22.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4" +source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" dependencies = [ "either", "futures 0.3.6", @@ -3120,7 +3120,7 @@ dependencies = [ [[package]] name = "libp2p-tcp" version = "0.22.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4" +source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" dependencies = [ "futures 0.3.6", "futures-timer", @@ -3135,7 +3135,7 @@ dependencies = [ [[package]] name = "libp2p-websocket" version = "0.23.1" -source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4" +source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" dependencies = [ "async-tls", "either", @@ -3552,7 +3552,7 @@ dependencies = [ [[package]] name = "multistream-select" version = "0.8.3" -source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4" +source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" dependencies = [ "bytes 0.5.6", "futures 0.3.6", @@ -3851,7 +3851,7 @@ dependencies = [ [[package]] name = "parity-multiaddr" version = "0.9.3" -source = "git+https://github.com/sigp/rust-libp2p?rev=5a9f0819af3990cfefad528e957297af596399b4#5a9f0819af3990cfefad528e957297af596399b4" +source = "git+https://github.com/sigp/rust-libp2p?rev=fb4fda2e393fc113577ef45f0ecdfe68e24f13dd#fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" dependencies = [ "arrayref", "bs58", diff --git a/beacon_node/eth2_libp2p/Cargo.toml b/beacon_node/eth2_libp2p/Cargo.toml index 82cf8cf79d..397ffde4f5 100644 --- a/beacon_node/eth2_libp2p/Cargo.toml +++ b/beacon_node/eth2_libp2p/Cargo.toml @@ -42,7 +42,7 @@ regex = "1.3.9" [dependencies.libp2p] #version = "0.23.0" git = "https://github.com/sigp/rust-libp2p" -rev = "5a9f0819af3990cfefad528e957297af596399b4" +rev = "fb4fda2e393fc113577ef45f0ecdfe68e24f13dd" default-features = false features = ["websocket", "identify", "mplex", "noise", "gossipsub", "dns", "tcp-tokio"] diff --git a/beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs b/beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs index 452686a5cf..f849114f31 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs @@ -1,8 +1,8 @@ +use crate::behaviour::Gossipsub; use crate::rpc::*; use libp2p::{ core::either::{EitherError, EitherOutput}, core::upgrade::{EitherUpgrade, InboundUpgrade, OutboundUpgrade, SelectUpgrade, UpgradeError}, - gossipsub::Gossipsub, identify::Identify, swarm::{ protocols_handler::{ diff --git a/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs index 538c122cc7..93a953e6a7 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs @@ -1,3 +1,4 @@ +use crate::behaviour::Gossipsub; use crate::rpc::*; use delegate::DelegatingHandler; pub(super) use delegate::{ @@ -5,7 +6,6 @@ pub(super) use delegate::{ }; use libp2p::{ core::upgrade::{InboundUpgrade, OutboundUpgrade}, - gossipsub::Gossipsub, identify::Identify, swarm::protocols_handler::{ KeepAlive, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index 4c3f8e05c5..87f73e1191 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -1,7 +1,7 @@ use crate::peer_manager::{score::PeerAction, PeerManager, PeerManagerEvent}; use crate::rpc::*; use crate::service::METADATA_FILENAME; -use crate::types::{GossipEncoding, GossipKind, GossipTopic, SubnetDiscovery}; +use crate::types::{GossipEncoding, GossipKind, GossipTopic, MessageData, SubnetDiscovery}; use crate::Eth2Enr; use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; use futures::prelude::*; @@ -13,8 +13,8 @@ use libp2p::{ Multiaddr, }, gossipsub::{ - Gossipsub, GossipsubEvent, IdentTopic as Topic, MessageAcceptance, MessageAuthenticity, - MessageId, + GenericGossipsub, GenericGossipsubEvent, IdentTopic as Topic, MessageAcceptance, + MessageAuthenticity, MessageId, }, identify::{Identify, IdentifyEvent}, swarm::{ @@ -43,6 +43,9 @@ const MAX_IDENTIFY_ADDRESSES: usize = 10; /// Identifier of requests sent by a peer. pub type PeerRequestId = (ConnectionId, SubstreamId); +pub type Gossipsub = GenericGossipsub; +pub type GossipsubEvent = GenericGossipsubEvent; + /// The types of events than can be obtained from polling the behaviour. #[derive(Debug)] pub enum BehaviourEvent { @@ -518,7 +521,7 @@ impl Behaviour { } => { // Note: We are keeping track here of the peer that sent us the message, not the // peer that originally published the message. - match PubsubMessage::decode(&gs_msg.topics, &gs_msg.data) { + match PubsubMessage::decode(&gs_msg.topics, gs_msg.data()) { Err(e) => { debug!(self.log, "Could not decode gossipsub message"; "error" => e); //reject the message diff --git a/beacon_node/eth2_libp2p/src/config.rs b/beacon_node/eth2_libp2p/src/config.rs index 93e0a423c7..e022f8c658 100644 --- a/beacon_node/eth2_libp2p/src/config.rs +++ b/beacon_node/eth2_libp2p/src/config.rs @@ -1,11 +1,12 @@ -use crate::types::GossipKind; +use crate::types::{GossipKind, MessageData}; use crate::{Enr, PeerIdSerialized}; use directory::{ DEFAULT_BEACON_NODE_DIR, DEFAULT_HARDCODED_TESTNET, DEFAULT_NETWORK_DIR, DEFAULT_ROOT_DIR, }; use discv5::{Discv5Config, Discv5ConfigBuilder}; use libp2p::gossipsub::{ - GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage, MessageId, ValidationMode, + FastMessageId, GenericGossipsubConfig, GenericGossipsubConfigBuilder, GenericGossipsubMessage, + MessageId, RawGossipsubMessage, ValidationMode, }; use libp2p::Multiaddr; use serde_derive::{Deserialize, Serialize}; @@ -14,6 +15,12 @@ use std::path::PathBuf; use std::time::Duration; pub const GOSSIP_MAX_SIZE: usize = 1_048_576; +const MESSAGE_DOMAIN_INVALID_SNAPPY: [u8; 4] = [0, 0, 0, 0]; +const MESSAGE_DOMAIN_VALID_SNAPPY: [u8; 4] = [1, 0, 0, 0]; + +pub type GossipsubConfig = GenericGossipsubConfig; +pub type GossipsubConfigBuilder = GenericGossipsubConfigBuilder; +pub type GossipsubMessage = GenericGossipsubMessage; #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] @@ -91,8 +98,30 @@ impl Default for Config { // The function used to generate a gossipsub message id // We use the first 8 bytes of SHA256(data) for content addressing - let gossip_message_id = - |message: &GossipsubMessage| MessageId::from(&Sha256::digest(&message.data)[..]); + let fast_gossip_message_id = + |message: &RawGossipsubMessage| FastMessageId::from(&Sha256::digest(&message.data)[..]); + + fn prefix(prefix: [u8; 4], data: &[u8]) -> Vec { + prefix + .to_vec() + .into_iter() + .chain(data.iter().cloned()) + .collect() + } + + let gossip_message_id = |message: &GossipsubMessage| { + MessageId::from( + &Sha256::digest( + { + match &message.data.decompressed { + Ok(decompressed) => prefix(MESSAGE_DOMAIN_VALID_SNAPPY, decompressed), + _ => prefix(MESSAGE_DOMAIN_INVALID_SNAPPY, &message.data.raw), + } + } + .as_slice(), + )[..20], + ) + }; // gossipsub configuration // Note: The topics by default are sent as plain strings. Hashes are an optional @@ -112,6 +141,7 @@ impl Default for Config { // prevent duplicates for 550 heartbeats(700millis * 550) = 385 secs .duplicate_cache_time(Duration::from_secs(385)) .message_id_fn(gossip_message_id) + .fast_message_id_fn(fast_gossip_message_id) .build() .expect("valid gossipsub configuration"); diff --git a/beacon_node/eth2_libp2p/src/lib.rs b/beacon_node/eth2_libp2p/src/lib.rs index 655e69d659..f5a989b73c 100644 --- a/beacon_node/eth2_libp2p/src/lib.rs +++ b/beacon_node/eth2_libp2p/src/lib.rs @@ -59,11 +59,12 @@ impl<'de> Deserialize<'de> for PeerIdSerialized { } pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage, SubnetDiscovery}; -pub use behaviour::{BehaviourEvent, PeerRequestId, Request, Response}; +pub use behaviour::{BehaviourEvent, Gossipsub, PeerRequestId, Request, Response}; pub use config::Config as NetworkConfig; +pub use config::{GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage}; pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr}; pub use discv5; -pub use libp2p::gossipsub::{Gossipsub, MessageAcceptance, MessageId, Topic, TopicHash}; +pub use libp2p::gossipsub::{MessageAcceptance, MessageId, Topic, TopicHash}; pub use libp2p::{core::ConnectedPoint, PeerId, Swarm}; pub use libp2p::{multiaddr, Multiaddr}; pub use metrics::scrape_discovery_metrics; diff --git a/beacon_node/eth2_libp2p/src/types/mod.rs b/beacon_node/eth2_libp2p/src/types/mod.rs index 762ea7d740..6d51a1abab 100644 --- a/beacon_node/eth2_libp2p/src/types/mod.rs +++ b/beacon_node/eth2_libp2p/src/types/mod.rs @@ -13,7 +13,7 @@ pub type EnrBitfield = BitVector; pub type Enr = discv5::enr::Enr; pub use globals::NetworkGlobals; -pub use pubsub::PubsubMessage; +pub use pubsub::{MessageData, PubsubMessage}; pub use subnet::SubnetDiscovery; pub use sync_state::SyncState; pub use topics::{GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS}; diff --git a/beacon_node/eth2_libp2p/src/types/pubsub.rs b/beacon_node/eth2_libp2p/src/types/pubsub.rs index 73e0dd710c..c27d3df4f5 100644 --- a/beacon_node/eth2_libp2p/src/types/pubsub.rs +++ b/beacon_node/eth2_libp2p/src/types/pubsub.rs @@ -12,6 +12,33 @@ use types::{ SignedBeaconBlock, SignedVoluntaryExit, }; +#[derive(Clone)] +pub struct MessageData { + pub raw: Vec, + pub decompressed: Result, String>, +} + +impl AsRef<[u8]> for MessageData { + fn as_ref(&self) -> &[u8] { + self.raw.as_ref() + } +} + +impl Into> for MessageData { + fn into(self) -> Vec { + self.raw + } +} + +impl From> for MessageData { + fn from(raw: Vec) -> Self { + Self { + decompressed: decompress_snappy(raw.as_ref()), + raw, + } + } +} + #[derive(Debug, Clone, PartialEq)] pub enum PubsubMessage { /// Gossipsub message providing notification of a new block. @@ -28,6 +55,24 @@ pub enum PubsubMessage { AttesterSlashing(Box>), } +fn decompress_snappy(data: &[u8]) -> Result, String> { + // Exit early if uncompressed data is > GOSSIP_MAX_SIZE + match decompress_len(data) { + Ok(n) if n > GOSSIP_MAX_SIZE => { + return Err("ssz_snappy decoded data > GOSSIP_MAX_SIZE".into()); + } + Ok(_) => {} + Err(e) => { + return Err(format!("{}", e)); + } + }; + let mut decoder = Decoder::new(); + match decoder.decompress_vec(data) { + Ok(decompressed_data) => Ok(decompressed_data), + Err(e) => Err(format!("{}", e)), + } +} + impl PubsubMessage { /// Returns the topics that each pubsub message will be sent across, given a supported /// gossipsub encoding and fork version. @@ -59,7 +104,7 @@ impl PubsubMessage { * Also note that a message can be associated with many topics. As soon as one of the topics is * known we match. If none of the topics are known we return an unknown state. */ - pub fn decode(topics: &[TopicHash], data: &[u8]) -> Result { + pub fn decode(topics: &[TopicHash], data: &MessageData) -> Result { let mut unknown_topics = Vec::new(); for topic in topics { match GossipTopic::decode(topic.as_str()) { @@ -68,25 +113,9 @@ impl PubsubMessage { continue; } Ok(gossip_topic) => { - let decompressed_data = &(match gossip_topic.encoding() { - GossipEncoding::SSZSnappy => { - // Exit early if uncompressed data is > GOSSIP_MAX_SIZE - match decompress_len(data) { - Ok(n) if n > GOSSIP_MAX_SIZE => { - return Err("ssz_snappy decoded data > GOSSIP_MAX_SIZE".into()); - } - Ok(_) => {} - Err(e) => { - return Err(format!("{}", e)); - } - }; - let mut decoder = Decoder::new(); - match decoder.decompress_vec(data) { - Ok(decompressed_data) => decompressed_data, - Err(e) => return Err(format!("{}", e)), - } - } - }); + let decompressed_data = match gossip_topic.encoding() { + GossipEncoding::SSZSnappy => data.decompressed.as_ref()?.as_slice(), + }; // the ssz decoders match gossip_topic.kind() { GossipKind::BeaconAggregateAndProof => { diff --git a/beacon_node/eth2_libp2p/tests/common/mod.rs b/beacon_node/eth2_libp2p/tests/common/mod.rs index 916f2f8411..608edd9e95 100644 --- a/beacon_node/eth2_libp2p/tests/common/mod.rs +++ b/beacon_node/eth2_libp2p/tests/common/mod.rs @@ -3,14 +3,13 @@ use eth2_libp2p::Enr; use eth2_libp2p::EnrExt; use eth2_libp2p::Multiaddr; use eth2_libp2p::Service as LibP2PService; -use eth2_libp2p::{Libp2pEvent, NetworkConfig}; +use eth2_libp2p::{GossipsubConfigBuilder, Libp2pEvent, NetworkConfig}; use slog::{debug, error, o, Drain}; use std::net::{TcpListener, UdpSocket}; use std::time::Duration; use types::{EnrForkId, MinimalEthSpec}; type E = MinimalEthSpec; -use libp2p::gossipsub::GossipsubConfigBuilder; use tempdir::TempDir; pub struct Libp2pInstance(LibP2PService, exit_future::Signal); diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 1188211efa..2ce6c086d4 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -8,7 +8,7 @@ use crate::{error, metrics}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::{ rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId}, - Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, Request, Response, + Gossipsub, Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, Request, Response, }; use eth2_libp2p::{ types::GossipKind, BehaviourEvent, GossipTopic, MessageId, NetworkGlobals, PeerId, TopicHash, @@ -537,7 +537,7 @@ fn expose_receive_metrics(message: &PubsubMessage) { } } -fn update_gossip_metrics(gossipsub: ð2_libp2p::Gossipsub) { +fn update_gossip_metrics(gossipsub: &Gossipsub) { // Clear the metrics let _ = metrics::PEERS_PER_PROTOCOL .as_ref()