From d7e29382967c16a7a4ce73d1672867523287402b Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Sun, 5 Apr 2020 13:59:14 +0530 Subject: [PATCH] Add snappy encoding to gossipsub messages (#984) * Add snappy encode/decode to gossip messages * Fix gossipsub tests --- Cargo.lock | 7 + beacon_node/eth2-libp2p/Cargo.toml | 1 + beacon_node/eth2-libp2p/src/behaviour.rs | 28 ++-- beacon_node/eth2-libp2p/src/config.rs | 4 +- beacon_node/eth2-libp2p/src/types/pubsub.rs | 138 +++++++++++------- beacon_node/eth2-libp2p/src/types/topics.rs | 11 ++ .../eth2-libp2p/tests/gossipsub_tests.rs | 4 +- 7 files changed, 128 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b3983b23bf..42304d508b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1207,6 +1207,7 @@ dependencies = [ "slog-stdlog 4.0.0", "slog-term", "smallvec 1.2.0", + "snap", "tempdir", "tokio", "tokio-io-timeout", @@ -4173,6 +4174,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c2fb2ec9bcd216a5b0d0ccf31ab17b5ed1d627960edff65bbe95d3ce221cefc" +[[package]] +name = "snap" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7fb9b0bb877b35a1cc1474a3b43d9c226a2625311760cdda2cbccbc0c7a8376" + [[package]] name = "snow" version = "0.6.2" diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 5cbf7dffb8..2432b34208 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -31,6 +31,7 @@ lru = "0.4.3" parking_lot = "0.9.0" sha2 = "0.8.0" base64 = "0.11.0" +snap = "1" [dev-dependencies] slog-stdlog = "4.0.0" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 94c847de84..4878106f55 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -105,16 +105,22 @@ impl Behaviour bool { - let gossip_topic = - GossipTopic::new(kind, GossipEncoding::SSZ, self.enr_fork_id.fork_digest); + let gossip_topic = GossipTopic::new( + kind, + GossipEncoding::default(), + self.enr_fork_id.fork_digest, + ); self.subscribe(gossip_topic) } /// Unsubscribes from a gossipsub topic kind, letting the network service determine the /// encoding and fork version. pub fn unsubscribe_kind(&mut self, kind: GossipKind) -> bool { - let gossip_topic = - GossipTopic::new(kind, GossipEncoding::SSZ, self.enr_fork_id.fork_digest); + let gossip_topic = GossipTopic::new( + kind, + GossipEncoding::default(), + self.enr_fork_id.fork_digest, + ); self.unsubscribe(gossip_topic) } @@ -122,7 +128,7 @@ impl Behaviour bool { let topic = GossipTopic::new( subnet_id.into(), - GossipEncoding::SSZ, + GossipEncoding::default(), self.enr_fork_id.fork_digest, ); self.subscribe(topic) @@ -132,7 +138,7 @@ impl Behaviour bool { let topic = GossipTopic::new( subnet_id.into(), - GossipEncoding::SSZ, + GossipEncoding::default(), self.enr_fork_id.fork_digest, ); self.unsubscribe(topic) @@ -165,9 +171,13 @@ impl Behaviour>) { for message in messages { - for topic in message.topics(GossipEncoding::SSZ, self.enr_fork_id.fork_digest) { - let message_data = message.encode(GossipEncoding::SSZ); - self.gossipsub.publish(&topic.into(), message_data); + for topic in message.topics(GossipEncoding::default(), self.enr_fork_id.fork_digest) { + match message.encode(GossipEncoding::default()) { + Ok(message_data) => { + self.gossipsub.publish(&topic.into(), message_data); + } + Err(e) => crit!(self.log, "Could not publish message"; "error" => e), + } } } } diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2-libp2p/src/config.rs index ef24c75f6a..cffe04e31c 100644 --- a/beacon_node/eth2-libp2p/src/config.rs +++ b/beacon_node/eth2-libp2p/src/config.rs @@ -8,6 +8,8 @@ use sha2::{Digest, Sha256}; use std::path::PathBuf; use std::time::Duration; +pub const GOSSIP_MAX_SIZE: usize = 1_048_576; + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] /// Network configuration for lighthouse. @@ -98,7 +100,7 @@ impl Default for Config { // Note: The topics by default are sent as plain strings. Hashes are an optional // parameter. let gs_config = GossipsubConfigBuilder::new() - .max_transmit_size(1_048_576) + .max_transmit_size(GOSSIP_MAX_SIZE) .heartbeat_interval(Duration::from_secs(20)) // TODO: Reduce for mainnet .manual_propagation() // require validation before propagation .no_source_id() diff --git a/beacon_node/eth2-libp2p/src/types/pubsub.rs b/beacon_node/eth2-libp2p/src/types/pubsub.rs index ee352e0349..8ed3e3ce23 100644 --- a/beacon_node/eth2-libp2p/src/types/pubsub.rs +++ b/beacon_node/eth2-libp2p/src/types/pubsub.rs @@ -1,7 +1,9 @@ //! Handles the encoding and decoding of pubsub messages. +use crate::config::GOSSIP_MAX_SIZE; use crate::types::{GossipEncoding, GossipKind, GossipTopic}; use crate::TopicHash; +use snap::raw::{decompress_len, Decoder, Encoder}; use ssz::{Decode, Encode}; use std::boxed::Box; use types::SubnetId; @@ -66,55 +68,71 @@ impl PubsubMessage { continue; } Ok(gossip_topic) => { - match gossip_topic.encoding() { + let mut decompressed_data: Vec = Vec::new(); + let data = match gossip_topic.encoding() { // group each part by encoding type - GossipEncoding::SSZ => { - // the ssz decoders - match gossip_topic.kind() { - GossipKind::BeaconAggregateAndProof => { - let agg_and_proof = - SignedAggregateAndProof::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?; - return Ok(PubsubMessage::AggregateAndProofAttestation( - Box::new(agg_and_proof), - )); + GossipEncoding::SSZSnappy => { + match decompress_len(data) { + Ok(n) if n > GOSSIP_MAX_SIZE => { + return Err("ssz_snappy decoded data > GOSSIP_MAX_SIZE".into()); } - GossipKind::CommitteeIndex(subnet_id) => { - let attestation = Attestation::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?; - return Ok(PubsubMessage::Attestation(Box::new(( - *subnet_id, - attestation, - )))); + Ok(n) => decompressed_data.resize(n, 0), + Err(e) => { + return Err(format!("{}", e)); } - GossipKind::BeaconBlock => { - let beacon_block = SignedBeaconBlock::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?; - return Ok(PubsubMessage::BeaconBlock(Box::new(beacon_block))); - } - GossipKind::VoluntaryExit => { - let voluntary_exit = VoluntaryExit::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?; - return Ok(PubsubMessage::VoluntaryExit(Box::new( - voluntary_exit, - ))); - } - GossipKind::ProposerSlashing => { - let proposer_slashing = ProposerSlashing::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?; - return Ok(PubsubMessage::ProposerSlashing(Box::new( - proposer_slashing, - ))); - } - GossipKind::AttesterSlashing => { - let attester_slashing = AttesterSlashing::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?; - return Ok(PubsubMessage::AttesterSlashing(Box::new( - attester_slashing, - ))); + }; + let mut decoder = Decoder::new(); + match decoder.decompress(data, &mut decompressed_data) { + Ok(n) => { + decompressed_data.truncate(n); + &decompressed_data } + Err(e) => return Err(format!("{}", e)), } } + GossipEncoding::SSZ => data, + }; + // the ssz decoders + match gossip_topic.kind() { + GossipKind::BeaconAggregateAndProof => { + let agg_and_proof = SignedAggregateAndProof::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?; + return Ok(PubsubMessage::AggregateAndProofAttestation(Box::new( + agg_and_proof, + ))); + } + GossipKind::CommitteeIndex(subnet_id) => { + let attestation = Attestation::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?; + return Ok(PubsubMessage::Attestation(Box::new(( + *subnet_id, + attestation, + )))); + } + GossipKind::BeaconBlock => { + let beacon_block = SignedBeaconBlock::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?; + return Ok(PubsubMessage::BeaconBlock(Box::new(beacon_block))); + } + GossipKind::VoluntaryExit => { + let voluntary_exit = VoluntaryExit::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?; + return Ok(PubsubMessage::VoluntaryExit(Box::new(voluntary_exit))); + } + GossipKind::ProposerSlashing => { + let proposer_slashing = ProposerSlashing::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?; + return Ok(PubsubMessage::ProposerSlashing(Box::new( + proposer_slashing, + ))); + } + GossipKind::AttesterSlashing => { + let attester_slashing = AttesterSlashing::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?; + return Ok(PubsubMessage::AttesterSlashing(Box::new( + attester_slashing, + ))); + } } } } @@ -124,18 +142,32 @@ impl PubsubMessage { /// Encodes a `PubsubMessage` based on the topic encodings. The first known encoding is used. If /// no encoding is known, and error is returned. - pub fn encode(&self, encoding: GossipEncoding) -> Vec { + pub fn encode(&self, encoding: GossipEncoding) -> Result, String> { + let data = match &self { + PubsubMessage::BeaconBlock(data) => data.as_ssz_bytes(), + PubsubMessage::AggregateAndProofAttestation(data) => data.as_ssz_bytes(), + PubsubMessage::VoluntaryExit(data) => data.as_ssz_bytes(), + PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(), + PubsubMessage::AttesterSlashing(data) => data.as_ssz_bytes(), + PubsubMessage::Attestation(data) => data.1.as_ssz_bytes(), + }; match encoding { GossipEncoding::SSZ => { - // SSZ Encodings - return match &self { - PubsubMessage::BeaconBlock(data) => data.as_ssz_bytes(), - PubsubMessage::AggregateAndProofAttestation(data) => data.as_ssz_bytes(), - PubsubMessage::VoluntaryExit(data) => data.as_ssz_bytes(), - PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(), - PubsubMessage::AttesterSlashing(data) => data.as_ssz_bytes(), - PubsubMessage::Attestation(data) => data.1.as_ssz_bytes(), - }; + if data.len() > GOSSIP_MAX_SIZE { + return Err("ssz encoded data > GOSSIP_MAX_SIZE".into()); + } else { + Ok(data) + } + } + GossipEncoding::SSZSnappy => { + let mut encoder = Encoder::new(); + match encoder.compress_vec(&data) { + Ok(compressed) if compressed.len() > GOSSIP_MAX_SIZE => { + Err("ssz_snappy Encoded data > GOSSIP_MAX_SIZE".into()) + } + Ok(compressed) => Ok(compressed), + Err(e) => Err(format!("{}", e)), + } } } } diff --git a/beacon_node/eth2-libp2p/src/types/topics.rs b/beacon_node/eth2-libp2p/src/types/topics.rs index d93e1572b7..ebb067e6b9 100644 --- a/beacon_node/eth2-libp2p/src/types/topics.rs +++ b/beacon_node/eth2-libp2p/src/types/topics.rs @@ -7,6 +7,7 @@ use types::SubnetId; // For example /eth2/beacon_block/ssz pub const TOPIC_PREFIX: &str = "eth2"; pub const SSZ_ENCODING_POSTFIX: &str = "ssz"; +pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy"; pub const BEACON_BLOCK_TOPIC: &str = "beacon_block"; pub const BEACON_AGGREGATE_AND_PROOF_TOPIC: &str = "beacon_aggregate_and_proof"; // for speed and easier string manipulation, committee topic index is split into a prefix and a @@ -65,6 +66,14 @@ impl std::fmt::Display for GossipKind { pub enum GossipEncoding { /// Messages are encoded with SSZ. SSZ, + /// Messages are encoded with SSZSnappy. + SSZSnappy, +} + +impl Default for GossipEncoding { + fn default() -> Self { + GossipEncoding::SSZSnappy + } } impl GossipTopic { @@ -109,6 +118,7 @@ impl GossipTopic { let encoding = match topic_parts[4] { SSZ_ENCODING_POSTFIX => GossipEncoding::SSZ, + SSZ_SNAPPY_ENCODING_POSTFIX => GossipEncoding::SSZSnappy, _ => return Err(format!("Unknown encoding: {}", topic)), }; let kind = match topic_parts[3] { @@ -144,6 +154,7 @@ impl Into for GossipTopic { fn into(self) -> String { let encoding = match self.encoding { GossipEncoding::SSZ => SSZ_ENCODING_POSTFIX, + GossipEncoding::SSZSnappy => SSZ_SNAPPY_ENCODING_POSTFIX, }; let kind = match self.kind { diff --git a/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs b/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs index ed18cfff40..49d0c538db 100644 --- a/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs +++ b/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs @@ -35,7 +35,7 @@ fn test_gossipsub_forward() { }; let pubsub_message = PubsubMessage::BeaconBlock(Box::new(signed_block)); let publishing_topic: String = pubsub_message - .topics(GossipEncoding::SSZ, [0, 0, 0, 0]) + .topics(GossipEncoding::default(), [0, 0, 0, 0]) .first() .unwrap() .clone() @@ -108,7 +108,7 @@ fn test_gossipsub_full_mesh_publish() { }; let pubsub_message = PubsubMessage::BeaconBlock(Box::new(signed_block)); let publishing_topic: String = pubsub_message - .topics(GossipEncoding::SSZ, [0, 0, 0, 0]) + .topics(GossipEncoding::default(), [0, 0, 0, 0]) .first() .unwrap() .clone()