From 88cecd6fb8495152ae7323bc6183ed10c43378a2 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 1 Apr 2020 17:20:32 +1100 Subject: [PATCH] V0.11.0 network update (#976) * Adjust RPC methods to match v0.11.1 * Adds fork handling for gossipsub topics * Update gossipsub topics to v0.11.0 --- beacon_node/eth2-libp2p/src/behaviour.rs | 97 ++++++++++---- beacon_node/eth2-libp2p/src/config.rs | 14 +-- beacon_node/eth2-libp2p/src/lib.rs | 2 +- beacon_node/eth2-libp2p/src/rpc/methods.rs | 11 +- beacon_node/eth2-libp2p/src/service.rs | 21 ++-- beacon_node/eth2-libp2p/src/types/mod.rs | 2 +- beacon_node/eth2-libp2p/src/types/pubsub.rs | 118 +++++++----------- beacon_node/eth2-libp2p/src/types/topics.rs | 62 +++++++-- .../eth2-libp2p/tests/gossipsub_tests.rs | 28 +++-- beacon_node/eth2-libp2p/tests/rpc_tests.rs | 6 +- beacon_node/network/src/router/mod.rs | 16 +-- beacon_node/network/src/router/processor.rs | 17 +-- beacon_node/network/src/service.rs | 16 ++- .../network/src/sync/network_context.rs | 2 +- .../network/src/sync/range_sync/batch.rs | 14 +-- .../network/src/sync/range_sync/chain.rs | 8 +- beacon_node/rest_api/src/helpers.rs | 20 +-- beacon_node/src/cli.rs | 8 -- beacon_node/src/config.rs | 11 +- 19 files changed, 247 insertions(+), 226 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 20a33e4e78..94c847de84 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -1,8 +1,7 @@ use crate::discovery::Discovery; use crate::rpc::{RPCEvent, RPCMessage, RPC}; -use crate::types::GossipEncoding; -use crate::Enr; -use crate::{error, GossipTopic, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; +use crate::types::{GossipEncoding, GossipKind, GossipTopic}; +use crate::{error, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; use futures::prelude::*; use libp2p::{ core::identity::Keypair, @@ -47,6 +46,9 @@ pub struct Behaviour { #[behaviour(ignore)] network_globals: Arc>, #[behaviour(ignore)] + /// Keeps track of the current EnrForkId for upgrading gossipsub topics. + enr_fork_id: EnrForkId, + #[behaviour(ignore)] /// Logger for behaviour actions. log: slog::Logger, } @@ -74,7 +76,7 @@ impl Behaviour Behaviour Behaviour Behaviour { /* Pubsub behaviour functions */ + /// Subscribes to a gossipsub topic kind, letting the network service determine the + /// encoding and fork version. + pub fn subscribe_kind(&mut self, kind: GossipKind) -> bool { + let gossip_topic = + GossipTopic::new(kind, GossipEncoding::SSZ, self.enr_fork_id.fork_digest); + self.subscribe(gossip_topic) + } + + /// Unsubscribes from a gossipsub topic kind, letting the network service determine the + /// encoding and fork version. + pub fn unsubscribe_kind(&mut self, kind: GossipKind) -> bool { + let gossip_topic = + GossipTopic::new(kind, GossipEncoding::SSZ, self.enr_fork_id.fork_digest); + self.unsubscribe(gossip_topic) + } + + /// Subscribes to a specific subnet id; + pub fn subscribe_to_subnet(&mut self, subnet_id: SubnetId) -> bool { + let topic = GossipTopic::new( + subnet_id.into(), + GossipEncoding::SSZ, + self.enr_fork_id.fork_digest, + ); + self.subscribe(topic) + } + + /// Un-Subscribes from a specific subnet id; + pub fn unsubscribe_from_subnet(&mut self, subnet_id: SubnetId) -> bool { + let topic = GossipTopic::new( + subnet_id.into(), + GossipEncoding::SSZ, + self.enr_fork_id.fork_digest, + ); + self.unsubscribe(topic) + } + /// Subscribes to a gossipsub topic. - pub fn subscribe(&mut self, topic: GossipTopic) -> bool { + fn subscribe(&mut self, topic: GossipTopic) -> bool { // update the network globals self.network_globals .gossipsub_subscriptions .write() .insert(topic.clone()); - // subscribe to the topic + + let topic_str: String = topic.clone().into(); + debug!(self.log, "Subscribed to topic"; "topic" => topic_str); self.gossipsub.subscribe(topic.into()) } - /// Subscribes to a specific subnet id; - pub fn subscribe_to_subnet(&mut self, subnet_id: SubnetId) { - let topic = GossipTopic::new(subnet_id.into(), GossipEncoding::SSZ); - self.subscribe(topic); - } - /// Unsubscribe from a gossipsub topic. - pub fn unsubscribe(&mut self, topic: GossipTopic) -> bool { + fn unsubscribe(&mut self, topic: GossipTopic) -> bool { // update the network globals self.network_globals .gossipsub_subscriptions @@ -127,17 +162,11 @@ impl Behaviour>) { for message in messages { - for topic in message.topics() { - let message_data = message.encode(); + for topic in message.topics(GossipEncoding::SSZ, self.enr_fork_id.fork_digest) { + let message_data = message.encode(GossipEncoding::SSZ); self.gossipsub.publish(&topic.into(), message_data); } } @@ -195,8 +224,30 @@ impl Behaviour>(); + + // unsubscribe from all topics + for topic in &subscribed_topics { + self.unsubscribe(topic.clone()); + } + + // re-subscribe modifying the fork version + for mut topic in subscribed_topics { + *topic.digest() = enr_fork_id.fork_digest; + self.subscribe(topic); + } + + // update the local reference + self.enr_fork_id = enr_fork_id; } } diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2-libp2p/src/config.rs index b10bd362e5..ef24c75f6a 100644 --- a/beacon_node/eth2-libp2p/src/config.rs +++ b/beacon_node/eth2-libp2p/src/config.rs @@ -1,4 +1,4 @@ -use crate::types::{GossipEncoding, GossipKind, GossipTopic}; +use crate::types::GossipKind; use crate::Enr; use libp2p::discv5::{Discv5Config, Discv5ConfigBuilder}; use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage, MessageId}; @@ -61,7 +61,7 @@ pub struct Config { pub client_version: String, /// List of extra topics to initially subscribe to as strings. - pub topics: Vec, + pub topics: Vec, /// Introduces randomization in network propagation of messages. This should only be set for /// testing purposes and will likely be removed in future versions. @@ -78,11 +78,11 @@ impl Default for Config { // The default topics that we will initially subscribe to let topics = vec![ - GossipTopic::new(GossipKind::BeaconBlock, GossipEncoding::SSZ), - GossipTopic::new(GossipKind::BeaconAggregateAndProof, GossipEncoding::SSZ), - GossipTopic::new(GossipKind::VoluntaryExit, GossipEncoding::SSZ), - GossipTopic::new(GossipKind::ProposerSlashing, GossipEncoding::SSZ), - GossipTopic::new(GossipKind::AttesterSlashing, GossipEncoding::SSZ), + GossipKind::BeaconBlock, + GossipKind::BeaconAggregateAndProof, + GossipKind::VoluntaryExit, + GossipKind::ProposerSlashing, + GossipKind::AttesterSlashing, ]; // The function used to generate a gossipsub message id diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index c630152b31..37e8aebfed 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -16,7 +16,7 @@ pub mod types; // shift this type into discv5 pub type Enr = libp2p::discv5::enr::Enr; -pub use crate::types::{error, GossipTopic, NetworkGlobals, PeerInfo, PubsubData, PubsubMessage}; +pub use crate::types::{error, GossipTopic, NetworkGlobals, PeerInfo, PubsubMessage}; pub use config::Config as NetworkConfig; pub use libp2p::gossipsub::{MessageId, Topic, TopicHash}; pub use libp2p::{multiaddr, Multiaddr}; diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index a98be96249..4f1258ef16 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -13,7 +13,7 @@ pub type RequestId = usize; #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct StatusMessage { /// The fork version of the chain we are broadcasting. - pub fork_version: [u8; 4], + pub fork_digest: [u8; 4], /// Latest finalized root. pub finalized_root: Hash256, @@ -101,9 +101,6 @@ impl ssz::Decode for GoodbyeReason { /// Request a number of beacon block roots from a peer. #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BlocksByRangeRequest { - /// The hash tree root of a block on the requested chain. - pub head_block_root: Hash256, - /// The starting slot to request blocks. pub start_slot: u64, @@ -238,7 +235,7 @@ impl ErrorMessage { impl std::fmt::Display for StatusMessage { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Status Message: Fork Version: {:?}, Finalized Root: {}, Finalized Epoch: {}, Head Root: {}, Head Slot: {}", self.fork_version, self.finalized_root, self.finalized_epoch, self.head_root, self.head_slot) + write!(f, "Status Message: Fork Digest: {:?}, Finalized Root: {}, Finalized Epoch: {}, Head Root: {}, Head Slot: {}", self.fork_digest, self.finalized_root, self.finalized_epoch, self.head_root, self.head_slot) } } @@ -283,8 +280,8 @@ impl std::fmt::Display for BlocksByRangeRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "Head Block Root: {}, Start Slot: {}, Count: {}, Step: {}", - self.head_block_root, self.start_slot, self.count, self.step + "Start Slot: {}, Count: {}, Step: {}", + self.start_slot, self.count, self.step ) } } diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 3aefcce950..9f265c2a93 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -1,9 +1,8 @@ use crate::behaviour::{Behaviour, BehaviourEvent}; use crate::multiaddr::Protocol; use crate::rpc::RPCEvent; -use crate::types::error; -use crate::NetworkConfig; -use crate::{NetworkGlobals, PubsubMessage, TopicHash}; +use crate::types::{error, GossipKind}; +use crate::{NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; use futures::prelude::*; use futures::Stream; use libp2p::core::{ @@ -144,18 +143,12 @@ impl Service { } } - let mut subscribed_topics: Vec = vec![]; - for topic in &config.topics { - let topic_string: String = topic.clone().into(); - if swarm.subscribe(topic.clone()) { - trace!(log, "Subscribed to topic"; "topic" => format!("{}", topic_string)); - subscribed_topics.push(topic_string); - network_globals - .gossipsub_subscriptions - .write() - .insert(topic.clone()); + let mut subscribed_topics: Vec = vec![]; + for topic_kind in &config.topics { + if swarm.subscribe_kind(topic_kind.clone()) { + subscribed_topics.push(topic_kind.clone()); } else { - warn!(log, "Could not subscribe to topic"; "topic" => format!("{}",topic_string)); + warn!(log, "Could not subscribe to topic"; "topic" => format!("{}",topic_kind)); } } info!(log, "Subscribed to topics"; "topics" => format!("{:?}", subscribed_topics)); diff --git a/beacon_node/eth2-libp2p/src/types/mod.rs b/beacon_node/eth2-libp2p/src/types/mod.rs index 5146ecdd44..13fa844a50 100644 --- a/beacon_node/eth2-libp2p/src/types/mod.rs +++ b/beacon_node/eth2-libp2p/src/types/mod.rs @@ -6,5 +6,5 @@ mod topics; pub use globals::NetworkGlobals; pub use peer_info::{EnrBitfield, PeerInfo}; -pub use pubsub::{PubsubData, PubsubMessage}; +pub use pubsub::PubsubMessage; pub use topics::{GossipEncoding, GossipKind, GossipTopic}; diff --git a/beacon_node/eth2-libp2p/src/types/pubsub.rs b/beacon_node/eth2-libp2p/src/types/pubsub.rs index efab328f22..ee352e0349 100644 --- a/beacon_node/eth2-libp2p/src/types/pubsub.rs +++ b/beacon_node/eth2-libp2p/src/types/pubsub.rs @@ -10,17 +10,8 @@ use types::{ SignedBeaconBlock, VoluntaryExit, }; -/// Messages that are passed to and from the pubsub (Gossipsub) behaviour. #[derive(Debug, Clone, PartialEq)] -pub struct PubsubMessage { - /// The encoding to be used to encode/decode the message - pub encoding: GossipEncoding, - /// The actual message being sent. - pub data: PubsubData, -} - -#[derive(Debug, Clone, PartialEq)] -pub enum PubsubData { +pub enum PubsubMessage { /// Gossipsub message providing notification of a new block. BeaconBlock(Box>), /// Gossipsub message providing notification of a Aggregate attestation and associated proof. @@ -36,36 +27,30 @@ pub enum PubsubData { } impl PubsubMessage { - pub fn new(encoding: GossipEncoding, data: PubsubData) -> Self { - PubsubMessage { encoding, data } + /// Returns the topics that each pubsub message will be sent across, given a supported + /// gossipsub encoding and fork version. + pub fn topics(&self, encoding: GossipEncoding, fork_version: [u8; 4]) -> Vec { + vec![GossipTopic::new(self.kind(), encoding, fork_version)] } - /// Returns the topics that each pubsub message will be sent across, given a supported - /// gossipsub encoding. - pub fn topics(&self) -> Vec { - let encoding = self.encoding.clone(); - match &self.data { - PubsubData::BeaconBlock(_) => vec![GossipTopic::new(GossipKind::BeaconBlock, encoding)], - PubsubData::AggregateAndProofAttestation(_) => vec![GossipTopic::new( - GossipKind::BeaconAggregateAndProof, - encoding, - )], - PubsubData::Attestation(attestation_data) => vec![GossipTopic::new( - GossipKind::CommitteeIndex(attestation_data.0), - encoding, - )], - PubsubData::VoluntaryExit(_) => { - vec![GossipTopic::new(GossipKind::VoluntaryExit, encoding)] - } - PubsubData::ProposerSlashing(_) => { - vec![GossipTopic::new(GossipKind::ProposerSlashing, encoding)] - } - PubsubData::AttesterSlashing(_) => { - vec![GossipTopic::new(GossipKind::AttesterSlashing, encoding)] + /// Returns the kind of gossipsub topic associated with the message. + pub fn kind(&self) -> GossipKind { + match self { + PubsubMessage::BeaconBlock(_) => GossipKind::BeaconBlock, + PubsubMessage::AggregateAndProofAttestation(_) => GossipKind::BeaconAggregateAndProof, + PubsubMessage::Attestation(attestation_data) => { + GossipKind::CommitteeIndex(attestation_data.0) } + PubsubMessage::VoluntaryExit(_) => GossipKind::VoluntaryExit, + PubsubMessage::ProposerSlashing(_) => GossipKind::ProposerSlashing, + PubsubMessage::AttesterSlashing(_) => GossipKind::AttesterSlashing, } } + /// This decodes `data` into a `PubsubMessage` given a list of topics. + /// + /// The topics are checked + /// in order and as soon as one topic matches the decoded data, we return the data. /* Note: This is assuming we are not hashing topics. If we choose to hash topics, these will * need to be modified. * @@ -85,61 +70,48 @@ impl PubsubMessage { // group each part by encoding type GossipEncoding::SSZ => { // the ssz decoders - let encoding = GossipEncoding::SSZ; match gossip_topic.kind() { GossipKind::BeaconAggregateAndProof => { let agg_and_proof = SignedAggregateAndProof::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; - return Ok(PubsubMessage::new( - encoding, - PubsubData::AggregateAndProofAttestation(Box::new( - agg_and_proof, - )), + return Ok(PubsubMessage::AggregateAndProofAttestation( + Box::new(agg_and_proof), )); } GossipKind::CommitteeIndex(subnet_id) => { let attestation = Attestation::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; - return Ok(PubsubMessage::new( - encoding, - PubsubData::Attestation(Box::new(( - *subnet_id, - attestation, - ))), - )); + return Ok(PubsubMessage::Attestation(Box::new(( + *subnet_id, + attestation, + )))); } GossipKind::BeaconBlock => { let beacon_block = SignedBeaconBlock::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; - return Ok(PubsubMessage::new( - encoding, - PubsubData::BeaconBlock(Box::new(beacon_block)), - )); + return Ok(PubsubMessage::BeaconBlock(Box::new(beacon_block))); } GossipKind::VoluntaryExit => { let voluntary_exit = VoluntaryExit::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; - return Ok(PubsubMessage::new( - encoding, - PubsubData::VoluntaryExit(Box::new(voluntary_exit)), - )); + return Ok(PubsubMessage::VoluntaryExit(Box::new( + voluntary_exit, + ))); } GossipKind::ProposerSlashing => { let proposer_slashing = ProposerSlashing::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; - return Ok(PubsubMessage::new( - encoding, - PubsubData::ProposerSlashing(Box::new(proposer_slashing)), - )); + return Ok(PubsubMessage::ProposerSlashing(Box::new( + proposer_slashing, + ))); } GossipKind::AttesterSlashing => { let attester_slashing = AttesterSlashing::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; - return Ok(PubsubMessage::new( - encoding, - PubsubData::AttesterSlashing(Box::new(attester_slashing)), - )); + return Ok(PubsubMessage::AttesterSlashing(Box::new( + attester_slashing, + ))); } } } @@ -150,19 +122,19 @@ impl PubsubMessage { Err(format!("Unknown gossipsub topics: {:?}", unknown_topics)) } - /// Encodes a pubsub message based on the topic encodings. The first known encoding is used. If + /// Encodes a `PubsubMessage` based on the topic encodings. The first known encoding is used. If /// no encoding is known, and error is returned. - pub fn encode(&self) -> Vec { - match self.encoding { + pub fn encode(&self, encoding: GossipEncoding) -> Vec { + match encoding { GossipEncoding::SSZ => { // SSZ Encodings - return match &self.data { - PubsubData::BeaconBlock(data) => data.as_ssz_bytes(), - PubsubData::AggregateAndProofAttestation(data) => data.as_ssz_bytes(), - PubsubData::VoluntaryExit(data) => data.as_ssz_bytes(), - PubsubData::ProposerSlashing(data) => data.as_ssz_bytes(), - PubsubData::AttesterSlashing(data) => data.as_ssz_bytes(), - PubsubData::Attestation(data) => data.1.as_ssz_bytes(), + return match &self { + PubsubMessage::BeaconBlock(data) => data.as_ssz_bytes(), + PubsubMessage::AggregateAndProofAttestation(data) => data.as_ssz_bytes(), + PubsubMessage::VoluntaryExit(data) => data.as_ssz_bytes(), + PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(), + PubsubMessage::AttesterSlashing(data) => data.as_ssz_bytes(), + PubsubMessage::Attestation(data) => data.1.as_ssz_bytes(), }; } } diff --git a/beacon_node/eth2-libp2p/src/types/topics.rs b/beacon_node/eth2-libp2p/src/types/topics.rs index 5ea0b1e762..d93e1572b7 100644 --- a/beacon_node/eth2-libp2p/src/types/topics.rs +++ b/beacon_node/eth2-libp2p/src/types/topics.rs @@ -23,11 +23,14 @@ pub const ATTESTER_SLASHING_TOPIC: &str = "attester_slashing"; pub struct GossipTopic { /// The encoding of the topic. encoding: GossipEncoding, + /// The fork digest of the topic, + fork_digest: [u8; 4], /// The kind of topic. kind: GossipKind, } /// Enum that brings these topics into the rust type system. +// NOTE: There is intentionally no unknown type here. We only allow known gossipsub topics. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] pub enum GossipKind { /// Topic for publishing beacon blocks. @@ -44,6 +47,19 @@ pub enum GossipKind { AttesterSlashing, } +impl std::fmt::Display for GossipKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + GossipKind::BeaconBlock => write!(f, "beacon_block"), + GossipKind::BeaconAggregateAndProof => write!(f, "beacon_aggregate_and_proof"), + GossipKind::CommitteeIndex(subnet_id) => write!(f, "committee_index_{}", **subnet_id), + GossipKind::VoluntaryExit => write!(f, "voluntary_exit"), + GossipKind::ProposerSlashing => write!(f, "proposer_slashing"), + GossipKind::AttesterSlashing => write!(f, "attester_slashing"), + } + } +} + /// The known encoding types for gossipsub messages. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] pub enum GossipEncoding { @@ -52,8 +68,12 @@ pub enum GossipEncoding { } impl GossipTopic { - pub fn new(kind: GossipKind, encoding: GossipEncoding) -> Self { - GossipTopic { encoding, kind } + pub fn new(kind: GossipKind, encoding: GossipEncoding, fork_digest: [u8; 4]) -> Self { + GossipTopic { + encoding, + kind, + fork_digest, + } } /// Returns the encoding type for the gossipsub topic. @@ -61,6 +81,11 @@ impl GossipTopic { &self.encoding } + /// Returns a mutable reference to the fork digest of the gossipsub topic. + pub fn digest(&mut self) -> &mut [u8; 4] { + &mut self.fork_digest + } + /// Returns the kind of message expected on the gossipsub topic. pub fn kind(&self) -> &GossipKind { &self.kind @@ -68,12 +93,25 @@ impl GossipTopic { pub fn decode(topic: &str) -> Result { let topic_parts: Vec<&str> = topic.split('/').collect(); - if topic_parts.len() == 4 && topic_parts[1] == TOPIC_PREFIX { - let encoding = match topic_parts[3] { + if topic_parts.len() == 5 && topic_parts[1] == TOPIC_PREFIX { + let digest_bytes = hex::decode(topic_parts[2]) + .map_err(|e| format!("Could not decode fork_digest hex: {}", e))?; + + if digest_bytes.len() != 4 { + return Err(format!( + "Invalid gossipsub fork digest size: {}", + digest_bytes.len() + )); + } + + let mut fork_digest = [0; 4]; + fork_digest.copy_from_slice(&digest_bytes); + + let encoding = match topic_parts[4] { SSZ_ENCODING_POSTFIX => GossipEncoding::SSZ, _ => return Err(format!("Unknown encoding: {}", topic)), }; - let kind = match topic_parts[2] { + let kind = match topic_parts[3] { BEACON_BLOCK_TOPIC => GossipKind::BeaconBlock, BEACON_AGGREGATE_AND_PROOF_TOPIC => GossipKind::BeaconAggregateAndProof, VOLUNTARY_EXIT_TOPIC => GossipKind::VoluntaryExit, @@ -85,7 +123,11 @@ impl GossipTopic { }, }; - return Ok(GossipTopic { encoding, kind }); + return Ok(GossipTopic { + encoding, + kind, + fork_digest, + }); } Err(format!("Unknown topic: {}", topic)) @@ -115,7 +157,13 @@ impl Into for GossipTopic { COMMITEE_INDEX_TOPIC_PREFIX, *index, COMMITEE_INDEX_TOPIC_POSTFIX ), }; - format!("/{}/{}/{}", TOPIC_PREFIX, kind, encoding) + format!( + "/{}/{}/{}/{}", + TOPIC_PREFIX, + hex::encode(self.fork_digest), + kind, + encoding + ) } } diff --git a/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs b/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs index f2f765b773..ed18cfff40 100644 --- a/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs +++ b/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs @@ -33,9 +33,13 @@ fn test_gossipsub_forward() { message: empty_block, signature: Signature::empty_signature(), }; - let data = PubsubData::BeaconBlock(Box::new(signed_block)); - let pubsub_message = PubsubMessage::new(GossipEncoding::SSZ, data); - let publishing_topic: String = "/eth2/beacon_block/ssz".into(); + let pubsub_message = PubsubMessage::BeaconBlock(Box::new(signed_block)); + let publishing_topic: String = pubsub_message + .topics(GossipEncoding::SSZ, [0, 0, 0, 0]) + .first() + .unwrap() + .clone() + .into(); let mut subscribed_count = 0; tokio::run(futures::future::poll_fn(move || -> Result<_, ()> { for node in nodes.iter_mut() { @@ -65,10 +69,8 @@ fn test_gossipsub_forward() { } } Async::Ready(Some(Libp2pEvent::PeerSubscribed(_, topic))) => { - // Received topics is one of subscribed eth2 topics - assert!(topic.clone().into_string().starts_with("/eth2/")); // Publish on beacon block topic - if topic == TopicHash::from_raw("/eth2/beacon_block/ssz") { + if topic == TopicHash::from_raw(publishing_topic.clone()) { subscribed_count += 1; // Every node except the corner nodes are connected to 2 nodes. if subscribed_count == (num_nodes * 2) - 2 { @@ -104,9 +106,13 @@ fn test_gossipsub_full_mesh_publish() { message: empty_block, signature: Signature::empty_signature(), }; - let data = PubsubData::BeaconBlock(Box::new(signed_block)); - let pubsub_message = PubsubMessage::new(GossipEncoding::SSZ, data); - let publishing_topic: String = "/eth2/beacon_block/ssz".into(); + let pubsub_message = PubsubMessage::BeaconBlock(Box::new(signed_block)); + let publishing_topic: String = pubsub_message + .topics(GossipEncoding::SSZ, [0, 0, 0, 0]) + .first() + .unwrap() + .clone() + .into(); let mut subscribed_count = 0; let mut received_count = 0; tokio::run(futures::future::poll_fn(move || -> Result<_, ()> { @@ -132,10 +138,8 @@ fn test_gossipsub_full_mesh_publish() { while let Async::Ready(Some(Libp2pEvent::PeerSubscribed(_, topic))) = publishing_node.poll().unwrap() { - // Received topics is one of subscribed eth2 topics - assert!(topic.clone().into_string().starts_with("/eth2/")); // Publish on beacon block topic - if topic == TopicHash::from_raw("/eth2/beacon_block/ssz") { + if topic == TopicHash::from_raw(publishing_topic.clone()) { subscribed_count += 1; if subscribed_count == num_nodes - 1 { publishing_node.swarm.publish(vec![pubsub_message.clone()]); diff --git a/beacon_node/eth2-libp2p/tests/rpc_tests.rs b/beacon_node/eth2-libp2p/tests/rpc_tests.rs index 1882e08446..48a57e31e5 100644 --- a/beacon_node/eth2-libp2p/tests/rpc_tests.rs +++ b/beacon_node/eth2-libp2p/tests/rpc_tests.rs @@ -29,7 +29,7 @@ fn test_status_rpc() { // Dummy STATUS RPC message let rpc_request = RPCRequest::Status(StatusMessage { - fork_version: [0; 4], + fork_digest: [0; 4], finalized_root: Hash256::from_low_u64_be(0), finalized_epoch: Epoch::new(1), head_root: Hash256::from_low_u64_be(0), @@ -38,7 +38,7 @@ fn test_status_rpc() { // Dummy STATUS RPC message let rpc_response = RPCResponse::Status(StatusMessage { - fork_version: [0; 4], + fork_digest: [0; 4], finalized_root: Hash256::from_low_u64_be(0), finalized_epoch: Epoch::new(1), head_root: Hash256::from_low_u64_be(0), @@ -142,7 +142,6 @@ fn test_blocks_by_range_chunked_rpc() { // BlocksByRange Request let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest { - head_block_root: Hash256::from_low_u64_be(0), start_slot: 0, count: messages_to_send, step: 0, @@ -275,7 +274,6 @@ fn test_blocks_by_range_single_empty_rpc() { // BlocksByRange Request let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest { - head_block_root: Hash256::from_low_u64_be(0), start_slot: 0, count: 10, step: 0, diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index de2727436d..df9a364b33 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -11,7 +11,7 @@ use crate::service::NetworkMessage; use beacon_chain::{AttestationType, BeaconChain, BeaconChainTypes}; use eth2_libp2p::{ rpc::{RPCError, RPCErrorResponse, RPCRequest, RPCResponse, RequestId, ResponseTermination}, - MessageId, PeerId, PubsubData, PubsubMessage, RPCEvent, + MessageId, PeerId, PubsubMessage, RPCEvent, }; use futures::future::Future; use futures::stream::Stream; @@ -217,9 +217,9 @@ impl Router { peer_id: PeerId, gossip_message: PubsubMessage, ) { - match gossip_message.data { + match gossip_message { // Attestations should never reach the router. - PubsubData::AggregateAndProofAttestation(aggregate_and_proof) => { + PubsubMessage::AggregateAndProofAttestation(aggregate_and_proof) => { if self .processor .should_forward_aggregate_attestation(&aggregate_and_proof) @@ -232,7 +232,7 @@ impl Router { AttestationType::Aggregated, ); } - PubsubData::Attestation(subnet_attestation) => { + PubsubMessage::Attestation(subnet_attestation) => { if self .processor .should_forward_attestation(&subnet_attestation.1) @@ -245,7 +245,7 @@ impl Router { AttestationType::Unaggregated { should_store: true }, ); } - PubsubData::BeaconBlock(block) => match self.processor.should_forward_block(block) { + PubsubMessage::BeaconBlock(block) => match self.processor.should_forward_block(block) { Ok(verified_block) => { self.propagate_message(id, peer_id.clone()); self.processor.on_block_gossip(peer_id, verified_block); @@ -255,19 +255,19 @@ impl Router { "error" => format!("{:?}", e)); } }, - PubsubData::VoluntaryExit(_exit) => { + PubsubMessage::VoluntaryExit(_exit) => { // TODO: Apply more sophisticated validation self.propagate_message(id, peer_id.clone()); // TODO: Handle exits debug!(self.log, "Received a voluntary exit"; "peer_id" => format!("{}", peer_id) ); } - PubsubData::ProposerSlashing(_proposer_slashing) => { + PubsubMessage::ProposerSlashing(_proposer_slashing) => { // TODO: Apply more sophisticated validation self.propagate_message(id, peer_id.clone()); // TODO: Handle proposer slashings debug!(self.log, "Received a proposer slashing"; "peer_id" => format!("{}", peer_id) ); } - PubsubData::AttesterSlashing(_attester_slashing) => { + PubsubMessage::AttesterSlashing(_attester_slashing) => { // TODO: Apply more sophisticated validation self.propagate_message(id, peer_id.clone()); // TODO: Handle attester slashings diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 4711cf4cea..c297bf5e18 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -25,7 +25,7 @@ pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; /// Keeps track of syncing information for known connected peers. #[derive(Clone, Copy, Debug)] pub struct PeerSyncInfo { - fork_version: [u8; 4], + fork_digest: [u8; 4], pub finalized_root: Hash256, pub finalized_epoch: Epoch, pub head_root: Hash256, @@ -35,7 +35,7 @@ pub struct PeerSyncInfo { impl From for PeerSyncInfo { fn from(status: StatusMessage) -> PeerSyncInfo { PeerSyncInfo { - fork_version: status.fork_version, + fork_digest: status.fork_digest, finalized_root: status.finalized_root, finalized_epoch: status.finalized_epoch, head_root: status.head_root, @@ -123,7 +123,7 @@ impl Processor { self.log, "Sending Status Request"; "peer" => format!("{:?}", peer_id), - "fork_version" => format!("{:?}", status_message.fork_version), + "fork_digest" => format!("{:?}", status_message.fork_digest), "finalized_root" => format!("{:?}", status_message.finalized_root), "finalized_epoch" => format!("{:?}", status_message.finalized_epoch), "head_root" => format!("{}", status_message.head_root), @@ -147,7 +147,7 @@ impl Processor { self.log, "Received Status Request"; "peer" => format!("{:?}", peer_id), - "fork_version" => format!("{:?}", status.fork_version), + "fork_digest" => format!("{:?}", status.fork_digest), "finalized_root" => format!("{:?}", status.finalized_root), "finalized_epoch" => format!("{:?}", status.finalized_epoch), "head_root" => format!("{}", status.head_root), @@ -193,12 +193,14 @@ impl Processor { let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()); - if local.fork_version != remote.fork_version { + if local.fork_digest != remote.fork_digest { // The node is on a different network/fork, disconnect them. debug!( self.log, "Handshake Failure"; "peer" => format!("{:?}", peer_id), - "reason" => "network_id" + "reason" => "incompatible forks", + "our_fork" => hex::encode(local.fork_digest), + "their_fork" => hex::encode(remote.fork_digest) ); self.network @@ -631,8 +633,9 @@ pub(crate) fn status_message( ) -> Option { let head_info = beacon_chain.head_info().ok()?; + // TODO: Update fork digest calculation Some(StatusMessage { - fork_version: head_info.fork.current_version, + fork_digest: head_info.fork.current_version, finalized_root: head_info.finalized_checkpoint.root, finalized_epoch: head_info.finalized_checkpoint.epoch, head_root: head_info.block_root, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index ab8c2d3a1f..409ea70ca9 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -8,7 +8,7 @@ use crate::{ use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::Service as LibP2PService; use eth2_libp2p::{rpc::RPCRequest, Enr, Libp2pEvent, MessageId, NetworkGlobals, PeerId, Swarm}; -use eth2_libp2p::{PubsubData, PubsubMessage, RPCEvent}; +use eth2_libp2p::{PubsubMessage, RPCEvent}; use futures::prelude::*; use futures::Stream; use rest_types::ValidatorSubscription; @@ -217,15 +217,13 @@ fn spawn_service( if !should_send { info!(log, "Random filter did not publish messages"); } else { - let mut unique_topics = Vec::new(); + let mut topic_kinds = Vec::new(); for message in &messages { - for topic in message.topics() { - if !unique_topics.contains(&topic) { - unique_topics.push(topic); + if !topic_kinds.contains(&message.kind()) { + topic_kinds.push(message.kind()); } } - } - debug!(log, "Sending pubsub messages"; "count" => messages.len(), "topics" => format!("{:?}", unique_topics)); + debug!(log, "Sending pubsub messages"; "count" => messages.len(), "topics" => format!("{:?}", topic_kinds)); service.libp2p.swarm.publish(messages); } } @@ -310,9 +308,9 @@ fn spawn_service( .. } => { - match message.data { + match message { // attestation information gets processed in the attestation service - PubsubData::Attestation(ref subnet_and_attestation) => { + PubsubMessage::Attestation(ref subnet_and_attestation) => { let subnet = &subnet_and_attestation.0; let attestation = &subnet_and_attestation.1; // checks if we have an aggregator for the slot. If so, we process diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 2d16391a0d..6b029e0a11 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -43,7 +43,7 @@ impl SyncNetworkContext { self.log, "Sending Status Request"; "peer" => format!("{:?}", peer_id), - "fork_version" => format!("{:?}", status_message.fork_version), + "fork_digest" => format!("{:?}", status_message.fork_digest), "finalized_root" => format!("{:?}", status_message.finalized_root), "finalized_epoch" => format!("{:?}", status_message.finalized_epoch), "head_root" => format!("{}", status_message.head_root), diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 3e516a93b0..58df693399 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -9,7 +9,7 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; use std::ops::Sub; -use types::{EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::{EthSpec, SignedBeaconBlock, Slot}; #[derive(Copy, Clone, Debug, PartialEq)] pub struct BatchId(pub u64); @@ -41,8 +41,6 @@ pub struct Batch { pub start_slot: Slot, /// The requested end slot of batch, exclusive. pub end_slot: Slot, - /// The hash of the chain root to requested from the peer. - pub head_root: Hash256, /// The peer that was originally assigned to the batch. pub original_peer: PeerId, /// The peer that is currently assigned to the batch. @@ -61,18 +59,11 @@ pub struct Batch { impl Eq for Batch {} impl Batch { - pub fn new( - id: BatchId, - start_slot: Slot, - end_slot: Slot, - head_root: Hash256, - peer_id: PeerId, - ) -> Self { + pub fn new(id: BatchId, start_slot: Slot, end_slot: Slot, peer_id: PeerId) -> Self { Batch { id, start_slot, end_slot, - head_root, original_peer: peer_id.clone(), current_peer: peer_id, retries: 0, @@ -84,7 +75,6 @@ impl Batch { pub fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest { BlocksByRangeRequest { - head_block_root: self.head_root, start_slot: self.start_slot.into(), count: std::cmp::min(BLOCKS_PER_BATCH, self.end_slot.sub(self.start_slot).into()), step: 1, diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 8516e8df34..dd64600ab1 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -449,7 +449,6 @@ impl SyncingChain { "end_slot" => batch.end_slot, "id" => *batch.id, "peer" => format!("{}", batch.current_peer), - "head_root"=> format!("{}", batch.head_root), "retries" => batch.retries, "re-processes" => batch.reprocess_retries); self.send_batch(network, batch); @@ -578,8 +577,7 @@ impl SyncingChain { "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => *batch.id, - "peer" => format!("{:?}", batch.current_peer), - "head_root"=> format!("{}", batch.head_root)); + "peer" => format!("{:?}", batch.current_peer)); self.send_batch(network, batch); ProcessingResult::KeepChain } @@ -603,8 +601,7 @@ impl SyncingChain { "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => *batch.id, - "peer" => format!("{}", batch.current_peer), - "head_root"=> format!("{}", batch.head_root)); + "peer" => format!("{}", batch.current_peer)); // send the batch self.send_batch(network, batch); return true; @@ -675,7 +672,6 @@ impl SyncingChain { batch_id, batch_start_slot, batch_end_slot, - self.target_head_root, peer_id, )) } diff --git a/beacon_node/rest_api/src/helpers.rs b/beacon_node/rest_api/src/helpers.rs index 573b471170..893d583c27 100644 --- a/beacon_node/rest_api/src/helpers.rs +++ b/beacon_node/rest_api/src/helpers.rs @@ -1,8 +1,7 @@ use crate::{ApiError, ApiResult, NetworkChannel}; use beacon_chain::{BeaconChain, BeaconChainTypes, StateSkipConfig}; use bls::PublicKeyBytes; -use eth2_libp2p::types::GossipEncoding; -use eth2_libp2p::{PubsubData, PubsubMessage}; +use eth2_libp2p::PubsubMessage; use hex; use http::header; use hyper::{Body, Request}; @@ -235,10 +234,7 @@ pub fn publish_beacon_block_to_network( block: SignedBeaconBlock, ) -> Result<(), ApiError> { // send the block via SSZ encoding - let messages = vec![PubsubMessage::new( - GossipEncoding::SSZ, - PubsubData::BeaconBlock(Box::new(block)), - )]; + let messages = vec![PubsubMessage::BeaconBlock(Box::new(block))]; // Publish the block to the p2p network via gossipsub. if let Err(e) = chan.try_send(NetworkMessage::Publish { messages }) { @@ -261,10 +257,7 @@ pub fn publish_raw_attestations_to_network( .map(|attestation| { // create the gossip message to send to the network let subnet_id = attestation.subnet_id(); - PubsubMessage::new( - GossipEncoding::SSZ, - PubsubData::Attestation(Box::new((subnet_id, attestation))), - ) + PubsubMessage::Attestation(Box::new((subnet_id, attestation))) }) .collect::>(); @@ -286,12 +279,7 @@ pub fn publish_aggregate_attestations_to_network( ) -> Result<(), ApiError> { let messages = signed_proofs .into_iter() - .map(|signed_proof| { - PubsubMessage::new( - GossipEncoding::SSZ, - PubsubData::AggregateAndProofAttestation(Box::new(signed_proof)), - ) - }) + .map(|signed_proof| PubsubMessage::AggregateAndProofAttestation(Box::new(signed_proof))) .collect::>(); // Publish the attestations to the p2p network via gossipsub. diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 94c92bbdd5..a8bed30032 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -125,14 +125,6 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { This disables this feature, fixing the ENR's IP/PORT to those specified on boot.") .takes_value(true), ) - .arg( - Arg::with_name("topics") - .long("topics") - .value_name("STRING") - .help("One or more comma-delimited gossipsub topic strings to subscribe to. Default \ - is determined automatically.") - .takes_value(true), - ) .arg( Arg::with_name("libp2p-addresses") .long("libp2p-addresses") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 4ff6be5fc0..d09f937991 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -1,7 +1,7 @@ use clap::ArgMatches; use client::{config::DEFAULT_DATADIR, ClientConfig, ClientGenesis, Eth2Config}; use eth2_config::{read_from_file, write_to_file}; -use eth2_libp2p::{Enr, GossipTopic, Multiaddr}; +use eth2_libp2p::{Enr, Multiaddr}; use eth2_testnet_config::Eth2TestnetConfig; use genesis::recent_genesis_time; use rand::{distributions::Alphanumeric, Rng}; @@ -141,15 +141,6 @@ pub fn get_configs( .collect::>>()?; } - if let Some(topics_str) = cli_args.value_of("topics") { - let mut topics = Vec::new(); - let topic_list = topics_str.split(',').collect::>(); - for topic_str in topic_list { - topics.push(GossipTopic::decode(topic_str)?); - } - client_config.network.topics = topics; - } - if let Some(enr_address_str) = cli_args.value_of("enr-address") { client_config.network.enr_address = Some( enr_address_str