diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index ebcaca7f4a..79ff3ebe9c 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -1,6 +1,6 @@ use crate::peer_manager::{score::PeerAction, PeerManager, PeerManagerEvent}; use crate::rpc::*; -use crate::types::{EnrBitfield, GossipEncoding, GossipKind, GossipTopic}; +use crate::types::{EnrBitfield, GossipEncoding, GossipKind, GossipTopic, SubnetDiscovery}; use crate::Eth2Enr; use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; use futures::prelude::*; @@ -29,7 +29,6 @@ use std::{ marker::PhantomData, sync::Arc, task::{Context, Poll}, - time::Instant, }; use types::{EnrForkId, EthSpec, SignedBeaconBlock, SubnetId}; @@ -301,8 +300,9 @@ impl Behaviour { /// Attempts to discover new peers for a given subnet. The `min_ttl` gives the time at which we /// would like to retain the peers for. - pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option) { - self.peer_manager.discover_subnet_peers(subnet_id, min_ttl) + pub fn discover_subnet_peers(&mut self, subnet_subscriptions: Vec) { + self.peer_manager + .discover_subnet_peers(subnet_subscriptions) } /// Updates the local ENR's "eth2" field with the latest EnrForkId. diff --git a/beacon_node/eth2_libp2p/src/discovery/mod.rs b/beacon_node/eth2_libp2p/src/discovery/mod.rs index be2cd69432..1b2163410b 100644 --- a/beacon_node/eth2_libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2_libp2p/src/discovery/mod.rs @@ -8,7 +8,7 @@ pub use enr_ext::{CombinedKeyExt, EnrExt}; pub use libp2p::core::identity::Keypair; use crate::metrics; -use crate::{error, Enr, NetworkConfig, NetworkGlobals}; +use crate::{error, Enr, NetworkConfig, NetworkGlobals, SubnetDiscovery}; use discv5::{enr::NodeId, Discv5, Discv5Event}; use enr::{BITFIELD_ENR_KEY, ETH2_ENR_KEY}; use futures::prelude::*; @@ -305,12 +305,19 @@ impl Discovery { } /// Processes a request to search for more peers on a subnet. - pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option) { + pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec) { // If the discv5 service isn't running, ignore queries if !self.started { return; } - self.add_subnet_query(subnet_id, min_ttl, 0); + debug!( + self.log, + "Making discovery query for subnets"; + "subnets" => format!("{:?}", subnets_to_discover.iter().map(|s| s.subnet_id).collect::>()) + ); + for subnet in subnets_to_discover { + self.add_subnet_query(subnet.subnet_id, subnet.min_ttl, 0); + } } /// Add an ENR to the routing table of the discovery mechanism. @@ -514,6 +521,11 @@ impl Discovery { // This query is for searching for peers of a particular subnet // Drain subnet_queries so we can re-use it as we continue to process the queue let grouped_queries: Vec = subnet_queries.drain(..).collect(); + debug!( + self.log, + "Starting grouped subnet query"; + "subnets" => format!("{:?}", grouped_queries.iter().map(|q| q.subnet_id).collect::>()), + ); self.start_subnet_query(grouped_queries); } } diff --git a/beacon_node/eth2_libp2p/src/discovery/subnet_predicate.rs b/beacon_node/eth2_libp2p/src/discovery/subnet_predicate.rs index e38fd4cf89..cc38350623 100644 --- a/beacon_node/eth2_libp2p/src/discovery/subnet_predicate.rs +++ b/beacon_node/eth2_libp2p/src/discovery/subnet_predicate.rs @@ -1,5 +1,6 @@ ///! The subnet predicate used for searching for a particular subnet. use super::*; +use slog::{debug, trace}; use std::ops::Deref; /// Returns the predicate for a given subnet. @@ -30,7 +31,7 @@ where .collect(); if matches.is_empty() { - debug!( + trace!( log_clone, "Peer found but not on any of the desired subnets"; "peer_id" => format!("{}", enr.peer_id()) diff --git a/beacon_node/eth2_libp2p/src/lib.rs b/beacon_node/eth2_libp2p/src/lib.rs index f5c9e75a26..ee422c43d4 100644 --- a/beacon_node/eth2_libp2p/src/lib.rs +++ b/beacon_node/eth2_libp2p/src/lib.rs @@ -14,7 +14,7 @@ pub mod rpc; mod service; pub mod types; -pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage}; +pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage, SubnetDiscovery}; pub use behaviour::{BehaviourEvent, PeerRequestId, Request, Response}; pub use config::Config as NetworkConfig; pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr}; diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index a23994a258..861b27faac 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -4,7 +4,7 @@ pub use self::peerdb::*; use crate::discovery::{Discovery, DiscoveryEvent}; use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode}; use crate::{error, metrics}; -use crate::{EnrExt, NetworkConfig, NetworkGlobals, PeerId}; +use crate::{EnrExt, NetworkConfig, NetworkGlobals, PeerId, SubnetDiscovery}; use futures::prelude::*; use futures::Stream; use hashset_delay::HashSetDelay; @@ -19,7 +19,7 @@ use std::{ task::{Context, Poll}, time::{Duration, Instant}, }; -use types::{EthSpec, SubnetId}; +use types::EthSpec; pub use libp2p::core::{identity::Keypair, Multiaddr}; @@ -213,17 +213,19 @@ impl PeerManager { } /// A request to find peers on a given subnet. - pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option) { + pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec) { // Extend the time to maintain peers if required. - if let Some(min_ttl) = min_ttl { - self.network_globals - .peers - .write() - .extend_peers_on_subnet(subnet_id, min_ttl); + for s in subnets_to_discover.iter() { + if let Some(min_ttl) = s.min_ttl { + self.network_globals + .peers + .write() + .extend_peers_on_subnet(s.subnet_id, min_ttl); + } } // request the subnet query from discovery - self.discovery.discover_subnet_peers(subnet_id, min_ttl); + self.discovery.discover_subnet_peers(subnets_to_discover); } /// A STATUS message has been received from a peer. This resets the status timer. diff --git a/beacon_node/eth2_libp2p/src/types/mod.rs b/beacon_node/eth2_libp2p/src/types/mod.rs index 8f9b07fd33..ec6fcd4af4 100644 --- a/beacon_node/eth2_libp2p/src/types/mod.rs +++ b/beacon_node/eth2_libp2p/src/types/mod.rs @@ -1,6 +1,7 @@ pub mod error; mod globals; mod pubsub; +mod subnet; mod sync_state; mod topics; @@ -13,5 +14,6 @@ pub type Enr = discv5::enr::Enr; pub use globals::NetworkGlobals; pub use pubsub::PubsubMessage; +pub use subnet::SubnetDiscovery; pub use sync_state::SyncState; pub use topics::{GossipEncoding, GossipKind, GossipTopic}; diff --git a/beacon_node/eth2_libp2p/src/types/subnet.rs b/beacon_node/eth2_libp2p/src/types/subnet.rs new file mode 100644 index 0000000000..0136e63010 --- /dev/null +++ b/beacon_node/eth2_libp2p/src/types/subnet.rs @@ -0,0 +1,28 @@ +use std::time::{Duration, Instant}; +use types::SubnetId; + +const DURATION_DIFFERENCE: Duration = Duration::from_millis(1); + +/// A subnet to discover peers on along with the instant after which it's no longer useful. +#[derive(Debug, Clone)] +pub struct SubnetDiscovery { + pub subnet_id: SubnetId, + pub min_ttl: Option, +} + +impl PartialEq for SubnetDiscovery { + fn eq(&self, other: &SubnetDiscovery) -> bool { + self.subnet_id == other.subnet_id + && match (self.min_ttl, other.min_ttl) { + (Some(min_ttl_instant), Some(other_min_ttl_instant)) => { + min_ttl_instant.saturating_duration_since(other_min_ttl_instant) + < DURATION_DIFFERENCE + && other_min_ttl_instant.saturating_duration_since(min_ttl_instant) + < DURATION_DIFFERENCE + } + (None, None) => true, + (None, Some(_)) => true, + (Some(_), None) => true, + } + } +} diff --git a/beacon_node/network/src/attestation_service/mod.rs b/beacon_node/network/src/attestation_service/mod.rs index 8ba791c169..cf94c5cdd7 100644 --- a/beacon_node/network/src/attestation_service/mod.rs +++ b/beacon_node/network/src/attestation_service/mod.rs @@ -2,7 +2,7 @@ //! given time. It schedules subscriptions to shard subnets, requests peer discoveries and //! determines whether attestations should be aggregated and/or passed to the beacon node. -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -13,7 +13,7 @@ use rand::seq::SliceRandom; use slog::{crit, debug, error, o, trace, warn}; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use eth2_libp2p::{types::GossipKind, NetworkGlobals}; +use eth2_libp2p::{types::GossipKind, NetworkGlobals, SubnetDiscovery}; use hashset_delay::HashSetDelay; use rest_types::ValidatorSubscription; use slot_clock::SlotClock; @@ -25,11 +25,8 @@ mod tests; /// The minimum number of slots ahead that we attempt to discover peers for a subscription. If the /// slot is less than this number, skip the peer discovery process. -const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 1; -/// The number of slots ahead that we attempt to discover peers for a subscription. If the slot to -/// attest to is greater than this, we queue a discovery request for this many slots prior to -/// subscribing. -const TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 6; +/// Subnet discovery query takes atmost 30 secs, 2 slots take 24s. +const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2; /// The time (in slots) before a last seen validator is considered absent and we unsubscribe from the random /// gossip topics that we subscribed to due to the validator connection. const LAST_SEEN_VALIDATOR_TIMEOUT: u32 = 150; @@ -39,12 +36,10 @@ const LAST_SEEN_VALIDATOR_TIMEOUT: u32 = 150; /// Note: The time is calculated as `time = milliseconds_per_slot / ADVANCE_SUBSCRIPTION_TIME`. const ADVANCE_SUBSCRIBE_TIME: u32 = 3; /// The default number of slots before items in hash delay sets used by this class should expire. +/// 36s at 12s slot time const DEFAULT_EXPIRATION_TIMEOUT: u32 = 3; -// 36s at 12s slot time -/// The duration difference between two instance for them to be considered equal. -const DURATION_DIFFERENCE: Duration = Duration::from_millis(1); -#[derive(Debug, Eq, Clone)] +#[derive(Debug, PartialEq, Clone)] pub enum AttServiceMessage { /// Subscribe to the specified subnet id. Subscribe(SubnetId), @@ -54,44 +49,8 @@ pub enum AttServiceMessage { EnrAdd(SubnetId), /// Remove the `SubnetId` from the ENR bitfield. EnrRemove(SubnetId), - /// Discover peers for a particular subnet. - /// The includes the `Instant` we need the discovered peer until. - DiscoverPeers { - subnet_id: SubnetId, - min_ttl: Option, - }, -} - -impl PartialEq for AttServiceMessage { - fn eq(&self, other: &AttServiceMessage) -> bool { - match (self, other) { - (&AttServiceMessage::Subscribe(a), &AttServiceMessage::Subscribe(b)) => a == b, - (&AttServiceMessage::Unsubscribe(a), &AttServiceMessage::Unsubscribe(b)) => a == b, - (&AttServiceMessage::EnrAdd(a), &AttServiceMessage::EnrAdd(b)) => a == b, - (&AttServiceMessage::EnrRemove(a), &AttServiceMessage::EnrRemove(b)) => a == b, - ( - &AttServiceMessage::DiscoverPeers { subnet_id, min_ttl }, - &AttServiceMessage::DiscoverPeers { - subnet_id: other_subnet_id, - min_ttl: other_min_ttl, - }, - ) => { - subnet_id == other_subnet_id - && match (min_ttl, other_min_ttl) { - (Some(min_ttl_instant), Some(other_min_ttl_instant)) => { - min_ttl_instant.saturating_duration_since(other_min_ttl_instant) - < DURATION_DIFFERENCE - && other_min_ttl_instant.saturating_duration_since(min_ttl_instant) - < DURATION_DIFFERENCE - } - (None, None) => true, - (None, Some(_)) => true, - (Some(_), None) => true, - } - } - _ => false, - } - } + /// Discover peers for a list of `SubnetDiscovery`. + DiscoverPeers(Vec), } /// A particular subnet at a given slot. @@ -116,9 +75,6 @@ pub struct AttestationService { /// The collection of currently subscribed random subnets mapped to their expiry deadline. random_subnets: HashSetDelay, - /// A collection of timeouts for when to start searching for peers for a particular shard. - discover_peers: HashSetDelay, - /// A collection of timeouts for when to subscribe to a shard subnet. subscriptions: HashSetDelay, @@ -172,7 +128,6 @@ impl AttestationService { network_globals, beacon_chain, random_subnets: HashSetDelay::new(Duration::from_millis(random_subnet_duration_millis)), - discover_peers: HashSetDelay::new(default_timeout), subscriptions: HashSetDelay::new(default_timeout), unsubscriptions: HashSetDelay::new(default_timeout), aggregate_validators_on_subnet: HashSetDelay::new(default_timeout), @@ -198,6 +153,8 @@ impl AttestationService { &mut self, subscriptions: Vec, ) -> Result<(), String> { + // Maps each subnet_id subscription to it's highest slot + let mut subnets_to_discover: HashMap = HashMap::new(); for subscription in subscriptions { metrics::inc_counter(&metrics::SUBNET_SUBSCRIPTION_REQUESTS); //NOTE: We assume all subscriptions have been verified before reaching this service @@ -226,15 +183,20 @@ impl AttestationService { continue; } }; + // Ensure each subnet_id inserted into the map has the highest slot as it's value. + // Higher slot corresponds to higher min_ttl in the `SubnetDiscovery` entry. + if let Some(slot) = subnets_to_discover.get(&subnet_id) { + if subscription.slot > *slot { + subnets_to_discover.insert(subnet_id, subscription.slot); + } + } else { + subnets_to_discover.insert(subnet_id, subscription.slot); + } let exact_subnet = ExactSubnet { subnet_id, slot: subscription.slot, }; - // determine if we should run a discovery lookup request and request it if required - if let Err(e) = self.discover_peers_request(exact_subnet.clone()) { - warn!(self.log, "Discovery lookup request error"; "error" => e); - } // determine if the validator is an aggregator. If so, we subscribe to the subnet and // if successful add the validator to a mapping of known aggregators for that exact @@ -264,6 +226,14 @@ impl AttestationService { } } + if let Err(e) = self.discover_peers_request( + subnets_to_discover + .into_iter() + .map(|(subnet_id, slot)| ExactSubnet { subnet_id, slot }), + ) { + warn!(self.log, "Discovery lookup request error"; "error" => e); + }; + // pre-emptively wake the thread to check for new events if let Some(waker) = &self.waker { waker.wake_by_ref(); @@ -290,114 +260,55 @@ impl AttestationService { /// Checks if there are currently queued discovery requests and the time required to make the /// request. /// - /// If there is sufficient time and no other request exists, queues a peer discovery request - /// for the required subnet. - fn discover_peers_request(&mut self, exact_subnet: ExactSubnet) -> Result<(), &'static str> { + /// If there is sufficient time, queues a peer discovery request for all the required subnets. + fn discover_peers_request( + &mut self, + exact_subnets: impl Iterator, + ) -> Result<(), &'static str> { let current_slot = self .beacon_chain .slot_clock .now() .ok_or_else(|| "Could not get the current slot")?; - let slot_duration = self.beacon_chain.slot_clock.slot_duration(); - // if there is enough time to perform a discovery lookup - if exact_subnet.slot >= current_slot.saturating_add(MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD) { - // check if a discovery request already exists - if self.discover_peers.get(&exact_subnet).is_some() { - // already a request queued, end - return Ok(()); - } - - // if the slot is more than epoch away, add an event to start looking for peers - if exact_subnet.slot - < current_slot.saturating_add(TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD) - { - // add one slot to ensure we keep the peer for the subscription slot - let min_ttl = self - .beacon_chain - .slot_clock - .duration_to_slot(exact_subnet.slot + 1) - .map(|duration| std::time::Instant::now() + duration); - - self.send_or_update_discovery_event(exact_subnet.subnet_id, min_ttl); - } else { - // Queue the discovery event to be executed for - // TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD - - let duration_to_discover = { - let duration_to_next_slot = self + let discovery_subnets: Vec = exact_subnets + .filter_map(|exact_subnet| { + // check if there is enough time to perform a discovery lookup + if exact_subnet.slot + >= current_slot.saturating_add(MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD) + { + // if the slot is more than epoch away, add an event to start looking for peers + // add one slot to ensure we keep the peer for the subscription slot + let min_ttl = self .beacon_chain .slot_clock - .duration_to_next_slot() - .ok_or_else(|| "Unable to determine duration to next slot")?; - // The -1 is done here to exclude the current slot duration, as we will use - // `duration_to_next_slot`. - let slots_until_discover = exact_subnet - .slot - .saturating_sub(current_slot) - .saturating_sub(1u64) - .saturating_sub(TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD); + .duration_to_slot(exact_subnet.slot + 1) + .map(|duration| std::time::Instant::now() + duration); + Some(SubnetDiscovery { + subnet_id: exact_subnet.subnet_id, + min_ttl, + }) + } else { + // TODO: Send the time frame needed to have a peer connected, so that we can + // maintain peers for a least this duration. + // We may want to check the global PeerInfo to see estimated timeouts for each + // peer before they can be removed. + warn!(self.log, + "Not enough time for a discovery search"; + "subnet_id" => format!("{:?}", exact_subnet) + ); + None + } + }) + .collect(); - duration_to_next_slot + slot_duration * (slots_until_discover.as_u64() as u32) - }; - - self.discover_peers - .insert_at(exact_subnet, duration_to_discover); - } - } else { - // TODO: Send the time frame needed to have a peer connected, so that we can - // maintain peers for a least this duration. - // We may want to check the global PeerInfo to see estimated timeouts for each - // peer before they can be removed. - return Err("Not enough time for a discovery search"); + if !discovery_subnets.is_empty() { + self.events + .push_back(AttServiceMessage::DiscoverPeers(discovery_subnets)); } Ok(()) } - /// Checks if we have a discover peers event already and sends a new event if necessary - /// - /// If a message exists for the same subnet, compare the `min_ttl` of the current and - /// existing messages and extend the existing message as necessary. - fn send_or_update_discovery_event(&mut self, subnet_id: SubnetId, min_ttl: Option) { - // track whether this message already exists in the event queue - let mut is_duplicate = false; - - self.events.iter_mut().for_each(|event| { - if let AttServiceMessage::DiscoverPeers { - subnet_id: other_subnet_id, - min_ttl: other_min_ttl, - } = event - { - if subnet_id == *other_subnet_id { - let other_min_ttl_clone = *other_min_ttl; - match (min_ttl, other_min_ttl_clone) { - (Some(min_ttl_instant), Some(other_min_ttl_instant)) => - // only update the min_ttl if it is greater than the existing min_ttl and a DURATION_DIFFERENCE padding - { - if min_ttl_instant.saturating_duration_since(other_min_ttl_instant) - > DURATION_DIFFERENCE - { - *other_min_ttl = min_ttl; - } - } - (None, Some(_)) => {} // Keep the current one as it has an actual min_ttl - (Some(min_ttl), None) => { - // Update the request to include a min_ttl. - *other_min_ttl = Some(min_ttl); - } - (None, None) => {} // Duplicate message, do nothing. - } - is_duplicate = true; - return; - } - }; - }); - if !is_duplicate { - self.events - .push_back(AttServiceMessage::DiscoverPeers { subnet_id, min_ttl }); - } - } - /// Checks the current random subnets and subscriptions to determine if a new subscription for this /// subnet is required for the given slot. /// @@ -547,7 +458,11 @@ impl AttestationService { if !already_subscribed { // send a discovery request and a subscription - self.send_or_update_discovery_event(subnet_id, None); + self.events + .push_back(AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery { + subnet_id, + min_ttl: None, + }])); self.events .push_back(AttServiceMessage::Subscribe(subnet_id)); } @@ -558,20 +473,6 @@ impl AttestationService { /* A collection of functions that handle the various timeouts */ - /// Request a discovery query to find peers for a particular subnet. - fn handle_discover_peers(&mut self, exact_subnet: ExactSubnet) { - debug!(self.log, "Searching for peers for subnet"; "subnet" => *exact_subnet.subnet_id, "target_slot" => exact_subnet.slot); - - // add one slot to ensure we keep the peer for the subscription slot - let min_ttl = self - .beacon_chain - .slot_clock - .duration_to_slot(exact_subnet.slot + 1) - .map(|duration| std::time::Instant::now() + duration); - - self.send_or_update_discovery_event(exact_subnet.subnet_id, min_ttl) - } - /// A queued subscription is ready. /// /// We add subscriptions events even if we are already subscribed to a random subnet (as these @@ -731,15 +632,6 @@ impl Stream for AttestationService { self.waker = Some(cx.waker().clone()); } - // process any peer discovery events - match self.discover_peers.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(exact_subnet))) => self.handle_discover_peers(exact_subnet), - Poll::Ready(Some(Err(e))) => { - error!(self.log, "Failed to check for peer discovery requests"; "error"=> e); - } - Poll::Ready(None) | Poll::Pending => {} - } - // process any subscription events match self.subscriptions.poll_next_unpin(cx) { Poll::Ready(Some(Ok(exact_subnet))) => self.handle_subscriptions(exact_subnet), diff --git a/beacon_node/network/src/attestation_service/tests/mod.rs b/beacon_node/network/src/attestation_service/tests/mod.rs index 84d792c176..5e1ef76a29 100644 --- a/beacon_node/network/src/attestation_service/tests/mod.rs +++ b/beacon_node/network/src/attestation_service/tests/mod.rs @@ -8,7 +8,9 @@ mod tests { migrate::NullMigrator, }; use eth2_libp2p::discovery::{build_enr, Keypair}; - use eth2_libp2p::{discovery::CombinedKey, CombinedKeyExt, NetworkConfig, NetworkGlobals}; + use eth2_libp2p::{ + discovery::CombinedKey, CombinedKeyExt, NetworkConfig, NetworkGlobals, SubnetDiscovery, + }; use futures::Stream; use genesis::{generate_deterministic_keypairs, interop_genesis_state}; use lazy_static::lazy_static; @@ -120,23 +122,21 @@ mod tests { } } - fn _get_subscriptions( + fn get_subscriptions( validator_count: u64, slot: Slot, committee_count_at_slot: u64, ) -> Vec { - let mut subscriptions: Vec = Vec::new(); - for validator_index in 0..validator_count { - let is_aggregator = true; - subscriptions.push(ValidatorSubscription { - validator_index, - attestation_committee_index: validator_index, - slot, - committee_count_at_slot, - is_aggregator, - }); - } - subscriptions + (0..validator_count) + .map(|validator_index| { + get_subscription( + validator_index, + validator_index, + slot, + committee_count_at_slot, + ) + }) + .collect() } // gets a number of events from the subscription service, or returns none if it times out after a number @@ -210,14 +210,7 @@ mod tests { let events = get_events(attestation_service, no_events_expected, 1).await; assert_matches!( events[..3], - [ - AttServiceMessage::DiscoverPeers { - subnet_id: _any_subnet, - min_ttl: _any_instant - }, - AttServiceMessage::Subscribe(_any1), - AttServiceMessage::EnrAdd(_any3) - ] + [AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any1), AttServiceMessage::EnrAdd(_any3)] ); // if there are fewer events than expected, there's been a collision if events.len() == no_events_expected { @@ -270,14 +263,7 @@ mod tests { let events = get_events(attestation_service, no_events_expected, 2).await; assert_matches!( events[..3], - [ - AttServiceMessage::DiscoverPeers { - subnet_id: _any_subnet, - min_ttl: _any_instant - }, - AttServiceMessage::Subscribe(_any1), - AttServiceMessage::EnrAdd(_any3) - ] + [AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any1), AttServiceMessage::EnrAdd(_any3)] ); // if there are fewer events than expected, there's been a collision if events.len() == no_events_expected { @@ -330,19 +316,15 @@ mod tests { &attestation_service.beacon_chain.spec, ) .unwrap(); - let expected = vec![AttServiceMessage::DiscoverPeers { subnet_id, min_ttl }]; + let expected = vec![AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery { + subnet_id, + min_ttl, + }])]; let events = get_events(attestation_service, no_events_expected, 1).await; assert_matches!( events[..3], - [ - AttServiceMessage::DiscoverPeers { - subnet_id: _any_subnet, - min_ttl: _any_instant - }, - AttServiceMessage::Subscribe(_any2), - AttServiceMessage::EnrAdd(_any3) - ] + [AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any2), AttServiceMessage::EnrAdd(_any3)] ); // if there are fewer events than expected, there's been a collision if events.len() == no_events_expected { @@ -396,21 +378,14 @@ mod tests { ) .unwrap(); let expected = vec![ - AttServiceMessage::DiscoverPeers { subnet_id, min_ttl }, + AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery { subnet_id, min_ttl }]), AttServiceMessage::Subscribe(subnet_id), ]; let events = get_events(attestation_service, no_events_expected, 5).await; assert_matches!( events[..3], - [ - AttServiceMessage::DiscoverPeers { - subnet_id: _any_subnet, - min_ttl: _any_instant - }, - AttServiceMessage::Subscribe(_any2), - AttServiceMessage::EnrAdd(_any3) - ] + [AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any2), AttServiceMessage::EnrAdd(_any3)] ); // if there are fewer events than expected, there's been a collision if events.len() == no_events_expected { @@ -454,14 +429,7 @@ mod tests { assert_matches!( events[..3], - [ - AttServiceMessage::DiscoverPeers { - subnet_id: _any_subnet, - min_ttl: _any_instant - }, - AttServiceMessage::Subscribe(_any2), - AttServiceMessage::EnrAdd(_any3) - ] + [AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any2), AttServiceMessage::EnrAdd(_any3)] ); // if there are fewer events than expected, there's been a collision if events.len() == no_events_expected { @@ -517,20 +485,16 @@ mod tests { // expect discover peers because we will enter TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD range let expected: Vec = - vec![AttServiceMessage::DiscoverPeers { subnet_id, min_ttl }]; + vec![AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery { + subnet_id, + min_ttl, + }])]; let events = get_events(attestation_service, no_events_expected, 5).await; assert_matches!( events[..3], - [ - AttServiceMessage::DiscoverPeers { - subnet_id: _any_subnet, - min_ttl: _any_instant - }, - AttServiceMessage::Subscribe(_any2), - AttServiceMessage::EnrAdd(_any3) - ] + [AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any2), AttServiceMessage::EnrAdd(_any3)] ); // if there are fewer events than expected, there's been a collision if events.len() == no_events_expected { @@ -553,7 +517,7 @@ mod tests { .now() .expect("Could not get current slot"); - let subscriptions = _get_subscriptions( + let subscriptions = get_subscriptions( subscription_count, current_slot + subscription_slot, committee_count, @@ -572,10 +536,9 @@ mod tests { for event in events { match event { - AttServiceMessage::DiscoverPeers { - subnet_id: _any_subnet, - min_ttl: _any_instant, - } => discover_peer_count = discover_peer_count + 1, + AttServiceMessage::DiscoverPeers(_) => { + discover_peer_count = discover_peer_count + 1 + } AttServiceMessage::Subscribe(_any_subnet) => subscribe_count = subscribe_count + 1, AttServiceMessage::EnrAdd(_any_subnet) => enr_add_count = enr_add_count + 1, _ => unexpected_msg_count = unexpected_msg_count + 1, @@ -605,7 +568,7 @@ mod tests { .now() .expect("Could not get current slot"); - let subscriptions = _get_subscriptions( + let subscriptions = get_subscriptions( subscription_count, current_slot + subscription_slot, committee_count, @@ -624,10 +587,9 @@ mod tests { for event in events { match event { - AttServiceMessage::DiscoverPeers { - subnet_id: _any_subnet, - min_ttl: _any_instant, - } => discover_peer_count = discover_peer_count + 1, + AttServiceMessage::DiscoverPeers(_) => { + discover_peer_count = discover_peer_count + 1 + } AttServiceMessage::Subscribe(_any_subnet) => subscribe_count = subscribe_count + 1, AttServiceMessage::EnrAdd(_any_subnet) => enr_add_count = enr_add_count + 1, _ => unexpected_msg_count = unexpected_msg_count + 1, @@ -639,4 +601,40 @@ mod tests { assert_eq!(enr_add_count, 64); assert_eq!(unexpected_msg_count, 0); } + + #[tokio::test] + async fn test_discovery_peers_count() { + let subscription_slot = 10; + let validator_count = 32; + let committee_count = 1; + let expected_events = 97; + + // create the attestation service and subscriptions + let mut attestation_service = get_attestation_service(); + let current_slot = attestation_service + .beacon_chain + .slot_clock + .now() + .expect("Could not get current slot"); + + let subscriptions = get_subscriptions( + validator_count, + current_slot + subscription_slot, + committee_count, + ); + + // submit sthe subscriptions + attestation_service + .validator_subscriptions(subscriptions) + .unwrap(); + + let events = get_events(attestation_service, expected_events, 3).await; + + let event = events.get(96); + if let Some(AttServiceMessage::DiscoverPeers(d)) = event { + assert_eq!(d.len(), validator_count as usize); + } else { + panic!("Unexpected event {:?}", event); + } + } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 873354bbc7..3eab4401a4 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -272,8 +272,8 @@ fn spawn_service( AttServiceMessage::EnrRemove(subnet_id) => { service.libp2p.swarm.update_enr_subnet(subnet_id, false); } - AttServiceMessage::DiscoverPeers{subnet_id, min_ttl} => { - service.libp2p.swarm.discover_subnet_peers(subnet_id, min_ttl); + AttServiceMessage::DiscoverPeers(subnets_to_discover) => { + service.libp2p.swarm.discover_subnet_peers(subnets_to_discover); } } }