diff --git a/Cargo.lock b/Cargo.lock index ec7315c6ca..d742276687 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1250,6 +1250,16 @@ version = "0.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b72465f46d518f6015d9cf07f7f3013a95dd6b9c2747c3d65ae0cce43929d14f" +[[package]] +name = "delay_map" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6716ce9729be9628979ae1ff63e8bc8b7ad53b5472a2633bf079607a55328d36" +dependencies = [ + "futures", + "tokio-util 0.6.10", +] + [[package]] name = "deposit_contract" version = "0.2.0" @@ -2554,15 +2564,6 @@ dependencies = [ "hashbrown 0.11.2", ] -[[package]] -name = "hashset_delay" -version = "0.2.0" -dependencies = [ - "futures", - "tokio", - "tokio-util 0.6.10", -] - [[package]] name = "headers" version = "0.3.7" @@ -3656,6 +3657,7 @@ dependencies = [ name = "lighthouse_network" version = "0.2.0" dependencies = [ + "delay_map", "directory", "dirs", "discv5", @@ -3666,7 +3668,6 @@ dependencies = [ "exit-future", "fnv", "futures", - "hashset_delay", "hex", "lazy_static", "libp2p", @@ -4146,6 +4147,7 @@ name = "network" version = "0.2.0" dependencies = [ "beacon_chain", + "delay_map", "derivative", "environment", "error-chain", @@ -4155,7 +4157,6 @@ dependencies = [ "fnv", "futures", "genesis", - "hashset_delay", "hex", "if-addrs 0.6.7", "igd", diff --git a/Cargo.toml b/Cargo.toml index 819f92d99e..a71a97a959 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,6 @@ members = [ "common/eth2_interop_keypairs", "common/eth2_network_config", "common/eth2_wallet_manager", - "common/hashset_delay", "common/lighthouse_metrics", "common/lighthouse_version", "common/lockfile", diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml index bbef8a301b..c6ba530508 100644 --- a/beacon_node/lighthouse_network/Cargo.toml +++ b/beacon_node/lighthouse_network/Cargo.toml @@ -8,7 +8,6 @@ edition = "2021" discv5 = { version = "0.1.0-beta.13", features = ["libp2p"] } unsigned-varint = { version = "0.6.0", features = ["codec"] } types = { path = "../../consensus/types" } -hashset_delay = { path = "../../common/hashset_delay" } eth2_ssz_types = "0.2.2" serde = { version = "1.0.116", features = ["derive"] } serde_derive = "1.0.116" @@ -40,6 +39,7 @@ strum = { version = "0.24.0", features = ["derive"] } superstruct = "0.5.0" prometheus-client = "0.16.0" unused_port = { path = "../../common/unused_port" } +delay_map = "0.1.1" [dependencies.libp2p] version = "0.45.1" diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 55b3884454..63d0816604 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -5,8 +5,8 @@ use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCo use crate::{error, metrics, Gossipsub}; use crate::{NetworkGlobals, PeerId}; use crate::{Subnet, SubnetDiscovery}; +use delay_map::HashSetDelay; use discv5::Enr; -use hashset_delay::HashSetDelay; use libp2p::identify::IdentifyInfo; use peerdb::{client::ClientKind, BanOperation, BanResult, ScoreUpdateResult}; use rand::seq::SliceRandom; diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 5aae8652e7..87c7650fb5 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -17,7 +17,6 @@ environment = { path = "../../lighthouse/environment" } beacon_chain = { path = "../beacon_chain" } store = { path = "../store" } lighthouse_network = { path = "../lighthouse_network" } -hashset_delay = { path = "../../common/hashset_delay" } types = { path = "../../consensus/types" } slot_clock = { path = "../../common/slot_clock" } slog = { version = "2.5.2", features = ["max_level_trace"] } @@ -44,3 +43,4 @@ if-addrs = "0.6.4" strum = "0.24.0" tokio-util = { version = "0.6.3", features = ["time"] } derivative = "2.2.0" +delay_map = "0.1.1" diff --git a/beacon_node/network/src/subnet_service/attestation_subnets.rs b/beacon_node/network/src/subnet_service/attestation_subnets.rs index 475bd7f17d..ecca3c9682 100644 --- a/beacon_node/network/src/subnet_service/attestation_subnets.rs +++ b/beacon_node/network/src/subnet_service/attestation_subnets.rs @@ -3,19 +3,20 @@ //! determines whether attestations should be aggregated and/or passed to the beacon node. use super::SubnetServiceMessage; -use std::collections::{HashMap, HashSet, VecDeque}; +#[cfg(test)] +use std::collections::HashSet; +use std::collections::{HashMap, VecDeque}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; - -use futures::prelude::*; -use rand::seq::SliceRandom; -use slog::{debug, error, o, trace, warn}; +use std::time::Duration; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use hashset_delay::HashSetDelay; +use delay_map::{HashMapDelay, HashSetDelay}; +use futures::prelude::*; use lighthouse_network::{NetworkConfig, Subnet, SubnetDiscovery}; +use rand::seq::SliceRandom; +use slog::{debug, error, o, trace, warn}; use slot_clock::SlotClock; use types::{Attestation, EthSpec, Slot, SubnetId, ValidatorSubscription}; @@ -24,20 +25,29 @@ use crate::metrics; /// The minimum number of slots ahead that we attempt to discover peers for a subscription. If the /// slot is less than this number, skip the peer discovery process. /// Subnet discovery query takes at most 30 secs, 2 slots take 24s. -const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2; -/// The time (in slots) before a last seen validator is considered absent and we unsubscribe from the random -/// gossip topics that we subscribed to due to the validator connection. -const LAST_SEEN_VALIDATOR_TIMEOUT: u32 = 150; +pub(crate) const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2; +/// The time (in slots) before a last seen validator is considered absent and we unsubscribe from +/// the random gossip topics that we subscribed to due to the validator connection. +const LAST_SEEN_VALIDATOR_TIMEOUT_SLOTS: u32 = 150; /// The fraction of a slot that we subscribe to a subnet before the required slot. /// -/// Note: The time is calculated as `time = seconds_per_slot / ADVANCE_SUBSCRIPTION_TIME`. -const ADVANCE_SUBSCRIBE_TIME: u32 = 3; -/// The default number of slots before items in hash delay sets used by this class should expire. -/// 36s at 12s slot time -const DEFAULT_EXPIRATION_TIMEOUT: u32 = 3; +/// Currently a whole slot ahead. +const ADVANCE_SUBSCRIBE_SLOT_FRACTION: u32 = 1; + +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +pub(crate) enum SubscriptionKind { + /// Long lived subscriptions. + /// + /// These have a longer duration and are advertised in our ENR. + LongLived, + /// Short lived subscriptions. + /// + /// Subscribing to these subnets has a short duration and we don't advertise it in our ENR. + ShortLived, +} /// A particular subnet at a given slot. -#[derive(PartialEq, Eq, Hash, Clone, Debug)] +#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)] pub struct ExactSubnet { /// The `SubnetId` associated with this subnet. pub subnet_id: SubnetId, @@ -52,17 +62,22 @@ pub struct AttestationService { /// A reference to the beacon chain to process received attestations. pub(crate) beacon_chain: Arc>, - /// The collection of currently subscribed random subnets mapped to their expiry deadline. - pub(crate) random_subnets: HashSetDelay, + /// Subnets we are currently subscribed to as short lived subscriptions. + /// + /// Once they expire, we unsubscribe from these. + short_lived_subscriptions: HashMapDelay, - /// The collection of all currently subscribed subnets (long-lived **and** short-lived). - subscriptions: HashSet, + /// Subnets we are currently subscribed to as long lived subscriptions. + /// + /// We advertise these in our ENR. When these expire, the subnet is removed from our ENR. + long_lived_subscriptions: HashMapDelay, - /// A collection of timeouts for when to unsubscribe from a shard subnet. - unsubscriptions: HashSetDelay, + /// Short lived subscriptions that need to be done in the future. + scheduled_short_lived_subscriptions: HashSetDelay, - /// A collection timeouts to track the existence of aggregate validator subscriptions at an `ExactSubnet`. - aggregate_validators_on_subnet: HashSetDelay, + /// A collection timeouts to track the existence of aggregate validator subscriptions at an + /// `ExactSubnet`. + aggregate_validators_on_subnet: Option>, /// A collection of seen validators. These dictate how many random subnets we should be /// subscribed to. As these time out, we unsubscribe for the required random subnets and update @@ -79,8 +94,8 @@ pub struct AttestationService { /// We are always subscribed to all subnets. subscribe_all_subnets: bool, - /// We process and aggregate all attestations on subscribed subnets. - import_all_attestations: bool, + /// For how many slots we subscribe to long lived subnets. + long_lived_subnet_subscription_slots: u64, /// The logger for the attestation service. log: slog::Logger, @@ -96,34 +111,36 @@ impl AttestationService { ) -> Self { let log = log.new(o!("service" => "attestation_service")); - // calculate the random subnet duration from the spec constants + // Calculate the random subnet duration from the spec constants. let spec = &beacon_chain.spec; let slot_duration = beacon_chain.slot_clock.slot_duration(); - let random_subnet_duration_millis = spec + let long_lived_subnet_subscription_slots = spec .epochs_per_random_subnet_subscription - .saturating_mul(T::EthSpec::slots_per_epoch()) - .saturating_mul(slot_duration.as_millis() as u64); + .saturating_mul(T::EthSpec::slots_per_epoch()); + let long_lived_subscription_duration = Duration::from_millis( + slot_duration.as_millis() as u64 * long_lived_subnet_subscription_slots, + ); - // Panics on overflow. Ensure LAST_SEEN_VALIDATOR_TIMEOUT is not too large. + // Panics on overflow. Ensure LAST_SEEN_VALIDATOR_TIMEOUT_SLOTS is not too large. let last_seen_val_timeout = slot_duration - .checked_mul(LAST_SEEN_VALIDATOR_TIMEOUT) + .checked_mul(LAST_SEEN_VALIDATOR_TIMEOUT_SLOTS) .expect("LAST_SEEN_VALIDATOR_TIMEOUT must not be ridiculously large"); - let default_timeout = slot_duration - .checked_mul(DEFAULT_EXPIRATION_TIMEOUT) - .expect("DEFAULT_EXPIRATION_TIMEOUT must not be ridiculoustly large"); + let track_validators = !config.import_all_attestations; + let aggregate_validators_on_subnet = + track_validators.then(|| HashSetDelay::new(slot_duration)); AttestationService { events: VecDeque::with_capacity(10), beacon_chain, - random_subnets: HashSetDelay::new(Duration::from_millis(random_subnet_duration_millis)), - subscriptions: HashSet::new(), - unsubscriptions: HashSetDelay::new(default_timeout), - aggregate_validators_on_subnet: HashSetDelay::new(default_timeout), + short_lived_subscriptions: HashMapDelay::new(slot_duration), + long_lived_subscriptions: HashMapDelay::new(long_lived_subscription_duration), + scheduled_short_lived_subscriptions: HashSetDelay::default(), + aggregate_validators_on_subnet, known_validators: HashSetDelay::new(last_seen_val_timeout), waker: None, - subscribe_all_subnets: config.subscribe_all_subnets, - import_all_attestations: config.import_all_attestations, discovery_disabled: config.disable_discovery, + subscribe_all_subnets: config.subscribe_all_subnets, + long_lived_subnet_subscription_slots, log, } } @@ -134,10 +151,25 @@ impl AttestationService { if self.subscribe_all_subnets { self.beacon_chain.spec.attestation_subnet_count as usize } else { - self.subscriptions.len() + self.short_lived_subscriptions + .keys() + .chain(self.long_lived_subscriptions.keys()) + .collect::>() + .len() } } + /// Give access to the current subscriptions for testing purposes. + #[cfg(test)] + pub(crate) fn subscriptions( + &self, + subscription_kind: SubscriptionKind, + ) -> &HashMapDelay { + match subscription_kind { + SubscriptionKind::LongLived => &self.long_lived_subscriptions, + SubscriptionKind::ShortLived => &self.short_lived_subscriptions, + } + } /// Processes a list of validator subscriptions. /// /// This will: @@ -158,7 +190,6 @@ impl AttestationService { let mut subnets_to_discover: HashMap = HashMap::new(); for subscription in subscriptions { metrics::inc_counter(&metrics::SUBNET_SUBSCRIPTION_REQUESTS); - //NOTE: We assume all subscriptions have been verified before reaching this service // Registers the validator with the attestation service. // This will subscribe to long-lived random subnets if required. @@ -205,8 +236,7 @@ impl AttestationService { if subscription.is_aggregator { metrics::inc_counter(&metrics::SUBNET_SUBSCRIPTION_AGGREGATOR_REQUESTS); - // set the subscription timer to subscribe to the next subnet if required - if let Err(e) = self.subscribe_to_subnet(exact_subnet.clone()) { + if let Err(e) = self.subscribe_to_subnet(exact_subnet) { warn!(self.log, "Subscription to subnet error"; "error" => e, @@ -234,10 +264,6 @@ impl AttestationService { }; } - // pre-emptively wake the thread to check for new events - if let Some(waker) = &self.waker { - waker.wake_by_ref(); - } Ok(()) } @@ -248,19 +274,27 @@ impl AttestationService { subnet: SubnetId, attestation: &Attestation, ) -> bool { - if self.import_all_attestations { - return true; - } - - let exact_subnet = ExactSubnet { - subnet_id: subnet, - slot: attestation.data.slot, - }; - self.aggregate_validators_on_subnet.contains(&exact_subnet) + self.aggregate_validators_on_subnet + .as_ref() + .map(|tracked_vals| { + tracked_vals.contains_key(&ExactSubnet { + subnet_id: subnet, + slot: attestation.data.slot, + }) + }) + .unwrap_or(true) } /* Internal private functions */ + /// Adds an event to the event queue and notifies that this service is ready to be polled + /// again. + fn queue_event(&mut self, ev: SubnetServiceMessage) { + self.events.push_back(ev); + if let Some(waker) = &self.waker { + waker.wake_by_ref() + } + } /// Checks if there are currently queued discovery requests and the time required to make the /// request. /// @@ -277,12 +311,13 @@ impl AttestationService { let discovery_subnets: Vec = exact_subnets .filter_map(|exact_subnet| { - // check if there is enough time to perform a discovery lookup + // Check if there is enough time to perform a discovery lookup. if exact_subnet.slot >= current_slot.saturating_add(MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD) { - // if the slot is more than epoch away, add an event to start looking for peers - // add one slot to ensure we keep the peer for the subscription slot + // Send out an event to start looking for peers. + // Require the peer for an additional slot to ensure we keep the peer for the + // duration of the subscription. let min_ttl = self .beacon_chain .slot_clock @@ -305,244 +340,279 @@ impl AttestationService { .collect(); if !discovery_subnets.is_empty() { - self.events - .push_back(SubnetServiceMessage::DiscoverPeers(discovery_subnets)); + self.queue_event(SubnetServiceMessage::DiscoverPeers(discovery_subnets)); } Ok(()) } - /// Checks the current random subnets and subscriptions to determine if a new subscription for this - /// subnet is required for the given slot. - /// - /// If required, adds a subscription event and an associated unsubscription event. - fn subscribe_to_subnet(&mut self, exact_subnet: ExactSubnet) -> Result<(), &'static str> { - // initialise timing variables - let current_slot = self - .beacon_chain - .slot_clock - .now() - .ok_or("Could not get the current slot")?; + // Subscribes to the subnet if it should be done immediately, or schedules it if required. + fn subscribe_to_subnet( + &mut self, + ExactSubnet { subnet_id, slot }: ExactSubnet, + ) -> Result<(), &'static str> { + let slot_duration = self.beacon_chain.slot_clock.slot_duration(); - // Calculate the duration to the unsubscription event. - // There are two main cases. Attempting to subscribe to the current slot and all others. - let expected_end_subscription_duration = if current_slot >= exact_subnet.slot { - self.beacon_chain + // Calculate how long before we need to subscribe to the subnet. + let time_to_subscription_start = { + // The short time we schedule the subscription before it's actually required. This + // ensures we are subscribed on time, and allows consecutive subscriptions to the same + // subnet to overlap, reducing subnet churn. + let advance_subscription_duration = slot_duration / ADVANCE_SUBSCRIBE_SLOT_FRACTION; + // The time to the required slot. + let time_to_subscription_slot = self + .beacon_chain .slot_clock - .duration_to_next_slot() - .ok_or("Unable to determine duration to next slot")? - } else { - let slot_duration = self.beacon_chain.slot_clock.slot_duration(); - - // the duration until we no longer need this subscription. We assume a single slot is - // sufficient. - self.beacon_chain - .slot_clock - .duration_to_slot(exact_subnet.slot) - .ok_or("Unable to determine duration to subscription slot")? - + slot_duration + .duration_to_slot(slot) + .unwrap_or_default(); // If this is a past slot we will just get a 0 duration. + time_to_subscription_slot.saturating_sub(advance_subscription_duration) }; - // Regardless of whether or not we have already subscribed to a subnet, track the expiration - // of aggregate validator subscriptions to exact subnets so we know whether or not to drop - // attestations for a given subnet + slot - self.aggregate_validators_on_subnet - .insert_at(exact_subnet.clone(), expected_end_subscription_duration); - - // Checks on current subscriptions - // Note: We may be connected to a long-lived random subnet. In this case we still add the - // subscription timeout and check this case when the timeout fires. This is because a - // long-lived random subnet can be unsubscribed at any time when a validator becomes - // in-active. This case is checked on the subscription event (see `handle_subscriptions`). - - // Return if we already have a subscription for this subnet_id and slot - if self.unsubscriptions.contains(&exact_subnet) || self.subscribe_all_subnets { - return Ok(()); + if let Some(tracked_vals) = self.aggregate_validators_on_subnet.as_mut() { + tracked_vals.insert(ExactSubnet { subnet_id, slot }); } - // We are not currently subscribed and have no waiting subscription, create one - self.handle_subscriptions(exact_subnet.clone()); + // If the subscription should be done in the future, schedule it. Otherwise subscribe + // immediately. + if time_to_subscription_start.is_zero() { + // This is a current or past slot, we subscribe immediately. + self.subscribe_to_subnet_immediately( + subnet_id, + SubscriptionKind::ShortLived, + slot + 1, + )?; + } else { + // This is a future slot, schedule subscribing. + trace!(self.log, "Scheduling subnet subscription"; "subnet" => ?subnet_id, "time_to_subscription_start" => ?time_to_subscription_start); + self.scheduled_short_lived_subscriptions + .insert_at(ExactSubnet { subnet_id, slot }, time_to_subscription_start); + } - // if there is an unsubscription event for the slot prior, we remove it to prevent - // unsubscriptions immediately after the subscription. We also want to minimize - // subscription churn and maintain a consecutive subnet subscriptions. - self.unsubscriptions.retain(|subnet| { - !(subnet.subnet_id == exact_subnet.subnet_id && subnet.slot <= exact_subnet.slot) - }); - // add an unsubscription event to remove ourselves from the subnet once completed - self.unsubscriptions - .insert_at(exact_subnet, expected_end_subscription_duration); Ok(()) } - /// Updates the `known_validators` mapping and subscribes to a set of random subnets if required. - /// - /// This also updates the ENR to indicate our long-lived subscription to the subnet + /// Updates the `known_validators` mapping and subscribes to long lived subnets if required. fn add_known_validator(&mut self, validator_index: u64) { - if self.known_validators.get(&validator_index).is_none() && !self.subscribe_all_subnets { - // New validator has subscribed - // Subscribe to random topics and update the ENR if needed. - - let spec = &self.beacon_chain.spec; - - if self.random_subnets.len() < spec.attestation_subnet_count as usize { - // Still room for subscriptions - self.subscribe_to_random_subnets( - self.beacon_chain.spec.random_subnets_per_validator as usize, - ); - } - } - // add the new validator or update the current timeout for a known validator + let previously_known = self.known_validators.contains_key(&validator_index); + // Add the new validator or update the current timeout for a known validator. self.known_validators.insert(validator_index); + if !previously_known { + // New validator has subscribed. + // Subscribe to random topics and update the ENR if needed. + self.subscribe_to_random_subnets(); + } } /// Subscribe to long-lived random subnets and update the local ENR bitfield. - fn subscribe_to_random_subnets(&mut self, no_subnets_to_subscribe: usize) { - let subnet_count = self.beacon_chain.spec.attestation_subnet_count; + /// The number of subnets to subscribe depends on the number of active validators and number of + /// current subscriptions. + fn subscribe_to_random_subnets(&mut self) { + if self.subscribe_all_subnets { + // This case is not handled by this service. + return; + } - // Build a list of random subnets that we are not currently subscribed to. - let available_subnets = (0..subnet_count) + let max_subnets = self.beacon_chain.spec.attestation_subnet_count; + // Calculate how many subnets we need, + let required_long_lived_subnets = { + let subnets_for_validators = self + .known_validators + .len() + .saturating_mul(self.beacon_chain.spec.random_subnets_per_validator as usize); + subnets_for_validators // How many subnets we need + .min(max_subnets as usize) // Capped by the max + .saturating_sub(self.long_lived_subscriptions.len()) // Minus those we have + }; + + if required_long_lived_subnets == 0 { + // Nothing to do. + return; + } + + // Build a list of the subnets that we are not currently advertising. + let available_subnets = (0..max_subnets) .map(SubnetId::new) - .filter(|subnet_id| self.random_subnets.get(subnet_id).is_none()) + .filter(|subnet_id| !self.long_lived_subscriptions.contains_key(subnet_id)) .collect::>(); - let to_subscribe_subnets = { - if available_subnets.len() < no_subnets_to_subscribe { - debug!(self.log, "Reached maximum random subnet subscriptions"); - available_subnets - } else { - // select a random sample of available subnets - available_subnets - .choose_multiple(&mut rand::thread_rng(), no_subnets_to_subscribe) - .cloned() - .collect::>() + let subnets_to_subscribe: Vec<_> = available_subnets + .choose_multiple(&mut rand::thread_rng(), required_long_lived_subnets) + .cloned() + .collect(); + + // Calculate in which slot does this subscription end. + let end_slot = match self.beacon_chain.slot_clock.now() { + Some(slot) => slot + self.long_lived_subnet_subscription_slots, + None => { + return debug!( + self.log, + "Failed to calculate end slot of long lived subnet subscriptions." + ) } }; - for subnet_id in to_subscribe_subnets { - // remove this subnet from any immediate un-subscription events - self.unsubscriptions - .retain(|exact_subnet| exact_subnet.subnet_id != subnet_id); - - // insert a new random subnet - self.random_subnets.insert(subnet_id); - - // send discovery request - // Note: it's wasteful to send a DiscoverPeers request if we already have peers for this subnet. - // However, subscribing to random subnets ideally shouldn't happen very often (once in ~27 hours) and - // this makes it easier to deterministically test the attestations service. - self.events - .push_back(SubnetServiceMessage::DiscoverPeers(vec![SubnetDiscovery { - subnet: Subnet::Attestation(subnet_id), - min_ttl: None, - }])); - - // if we are not already subscribed, then subscribe - if !self.subscriptions.contains(&subnet_id) { - self.subscriptions.insert(subnet_id); - debug!(self.log, "Subscribing to random subnet"; "subnet_id" => ?subnet_id); - self.events - .push_back(SubnetServiceMessage::Subscribe(Subnet::Attestation( - subnet_id, - ))); + for subnet_id in &subnets_to_subscribe { + if let Err(e) = self.subscribe_to_subnet_immediately( + *subnet_id, + SubscriptionKind::LongLived, + end_slot, + ) { + debug!(self.log, "Failed to subscribe to long lived subnet"; "subnet" => ?subnet_id, "err" => e); } - - // add the subnet to the ENR bitfield - self.events - .push_back(SubnetServiceMessage::EnrAdd(Subnet::Attestation(subnet_id))); } } /* A collection of functions that handle the various timeouts */ - /// A queued subscription is ready. + /// Registers a subnet as subscribed. /// - /// We add subscriptions events even if we are already subscribed to a random subnet (as these - /// can be unsubscribed at any time by inactive validators). If we are - /// still subscribed at the time the event fires, we don't re-subscribe. - fn handle_subscriptions(&mut self, exact_subnet: ExactSubnet) { - // Check if the subnet currently exists as a long-lasting random subnet - if let Some(expiry) = self.random_subnets.get(&exact_subnet.subnet_id) { - // we are subscribed via a random subnet, if this is to expire during the time we need - // to be subscribed, just extend the expiry - let slot_duration = self.beacon_chain.slot_clock.slot_duration(); - let advance_subscription_duration = slot_duration - .checked_div(ADVANCE_SUBSCRIBE_TIME) - .expect("ADVANCE_SUBSCRIPTION_TIME cannot be too large"); - // we require the subnet subscription for at least a slot on top of the initial - // subscription time - let expected_end_subscription_duration = slot_duration + advance_subscription_duration; + /// Checks that the time in which the subscription would end is not in the past. If we are + /// already subscribed, extends the timeout if necessary. If this is a new subscription, we send + /// out the appropriate events. + fn subscribe_to_subnet_immediately( + &mut self, + subnet_id: SubnetId, + subscription_kind: SubscriptionKind, + end_slot: Slot, + ) -> Result<(), &'static str> { + if self.subscribe_all_subnets { + // Case not handled by this service. + return Ok(()); + } - if expiry < &(Instant::now() + expected_end_subscription_duration) { - self.random_subnets - .update_timeout(&exact_subnet.subnet_id, expected_end_subscription_duration); + let time_to_subscription_end = self + .beacon_chain + .slot_clock + .duration_to_slot(end_slot) + .unwrap_or_default(); + + // First check this is worth doing. + if time_to_subscription_end.is_zero() { + return Err("Time when subscription would end has already passed."); + } + + // We need to check and add a subscription for the right kind, regardless of the presence + // of the subnet as a subscription of the other kind. This is mainly since long lived + // subscriptions can be removed at any time when a validator goes offline. + let (subscriptions, already_subscribed_as_other_kind) = match subscription_kind { + SubscriptionKind::ShortLived => ( + &mut self.short_lived_subscriptions, + self.long_lived_subscriptions.contains_key(&subnet_id), + ), + SubscriptionKind::LongLived => ( + &mut self.long_lived_subscriptions, + self.short_lived_subscriptions.contains_key(&subnet_id), + ), + }; + + match subscriptions.get(&subnet_id) { + Some(current_end_slot) => { + // We are already subscribed. Check if we need to extend the subscription. + if &end_slot > current_end_slot { + trace!(self.log, "Extending subscription to subnet"; + "subnet" => ?subnet_id, + "prev_end_slot" => current_end_slot, + "new_end_slot" => end_slot, + "subscription_kind" => ?subscription_kind, + ); + subscriptions.insert_at(subnet_id, end_slot, time_to_subscription_end); + } } - } else { - // we are also not un-subscribing from a subnet if the next slot requires us to be - // subscribed. Therefore there could be the case that we are already still subscribed - // to the required subnet. In which case we do not issue another subscription request. - if !self.subscriptions.contains(&exact_subnet.subnet_id) { - // we are not already subscribed - debug!(self.log, "Subscribing to subnet"; "subnet" => *exact_subnet.subnet_id, "target_slot" => exact_subnet.slot.as_u64()); - self.subscriptions.insert(exact_subnet.subnet_id); - self.events - .push_back(SubnetServiceMessage::Subscribe(Subnet::Attestation( - exact_subnet.subnet_id, + None => { + // This is a new subscription. Add with the corresponding timeout and send the + // notification. + subscriptions.insert_at(subnet_id, end_slot, time_to_subscription_end); + + // Inform of the subscription. + if !already_subscribed_as_other_kind { + debug!(self.log, "Subscribing to subnet"; + "subnet" => ?subnet_id, + "end_slot" => end_slot, + "subscription_kind" => ?subscription_kind, + ); + self.queue_event(SubnetServiceMessage::Subscribe(Subnet::Attestation( + subnet_id, ))); + } + + // If this is a new long lived subscription, send out the appropriate events. + if SubscriptionKind::LongLived == subscription_kind { + let subnet = Subnet::Attestation(subnet_id); + // Advertise this subnet in our ENR. + self.long_lived_subscriptions.insert_at( + subnet_id, + end_slot, + time_to_subscription_end, + ); + self.queue_event(SubnetServiceMessage::EnrAdd(subnet)); + + if !self.discovery_disabled { + self.queue_event(SubnetServiceMessage::DiscoverPeers(vec![ + SubnetDiscovery { + subnet, + min_ttl: None, + }, + ])) + } + } } } - } - /// A queued unsubscription is ready. - /// - /// Unsubscription events are added, even if we are subscribed to long-lived random subnets. If - /// a random subnet is present, we do not unsubscribe from it. - fn handle_unsubscriptions(&mut self, exact_subnet: ExactSubnet) { - // Check if the subnet currently exists as a long-lasting random subnet - if self.random_subnets.contains(&exact_subnet.subnet_id) { - return; - } - - debug!(self.log, "Unsubscribing from subnet"; "subnet" => *exact_subnet.subnet_id, "processed_slot" => exact_subnet.slot.as_u64()); - - self.subscriptions.remove(&exact_subnet.subnet_id); - self.events - .push_back(SubnetServiceMessage::Unsubscribe(Subnet::Attestation( - exact_subnet.subnet_id, - ))); + Ok(()) } /// A random subnet has expired. /// /// This function selects a new subnet to join, or extends the expiry if there are no more /// available subnets to choose from. - fn handle_random_subnet_expiry(&mut self, subnet_id: SubnetId) { + fn handle_random_subnet_expiry(&mut self, subnet_id: SubnetId, end_slot: Slot) { let subnet_count = self.beacon_chain.spec.attestation_subnet_count; - if self.random_subnets.len() == (subnet_count - 1) as usize { - // We are at capacity, simply increase the timeout of the current subnet - self.random_subnets.insert(subnet_id); - return; - } - // If there are no unsubscription events for `subnet_id`, we unsubscribe immediately. - if !self - .unsubscriptions - .keys() - .any(|s| s.subnet_id == subnet_id) - { - // we are not at capacity, unsubscribe from the current subnet. - debug!(self.log, "Unsubscribing from random subnet"; "subnet_id" => *subnet_id); - self.events - .push_back(SubnetServiceMessage::Unsubscribe(Subnet::Attestation( + if self.long_lived_subscriptions.len() == (subnet_count - 1) as usize { + let end_slot = end_slot + self.long_lived_subnet_subscription_slots; + // This is just an extra accuracy precaution, we could use the default timeout if + // needed. + if let Some(time_to_subscription_end) = + self.beacon_chain.slot_clock.duration_to_slot(end_slot) + { + // We are at capacity, simply increase the timeout of the current subnet. + self.long_lived_subscriptions.insert_at( subnet_id, - ))); + end_slot + 1, + time_to_subscription_end, + ); + } else { + self.long_lived_subscriptions.insert(subnet_id, end_slot); + } + return; } // Remove the ENR bitfield bit and choose a new random on from the available subnets - self.events - .push_back(SubnetServiceMessage::EnrRemove(Subnet::Attestation( + // Subscribe to a new random subnet. + self.subscribe_to_random_subnets(); + } + + // Unsubscribes from a subnet that was removed if it does not continue to exist as a + // subscription of the other kind. For long lived subscriptions, it also removes the + // advertisement from our ENR. + fn handle_removed_subnet(&mut self, subnet_id: SubnetId, subscription_kind: SubscriptionKind) { + let other_subscriptions = match subscription_kind { + SubscriptionKind::LongLived => &self.short_lived_subscriptions, + SubscriptionKind::ShortLived => &self.long_lived_subscriptions, + }; + + if !other_subscriptions.contains_key(&subnet_id) { + // Subscription no longer exists as short lived or long lived. + debug!(self.log, "Unsubscribing from subnet"; "subnet" => ?subnet_id, "subscription_kind" => ?subscription_kind); + self.queue_event(SubnetServiceMessage::Unsubscribe(Subnet::Attestation( subnet_id, ))); - // Subscribe to a new random subnet - self.subscribe_to_random_subnets(1); + } + + if subscription_kind == SubscriptionKind::LongLived { + // Remove from our ENR even if we remain subscribed in other way. + self.queue_event(SubnetServiceMessage::EnrRemove(Subnet::Attestation( + subnet_id, + ))); + } } /// A known validator has not sent a subscription in a while. They are considered offline and the @@ -552,39 +622,37 @@ impl AttestationService { /// validators to random subnets. So when a validator goes offline, we can simply remove the /// allocated amount of random subnets. fn handle_known_validator_expiry(&mut self) { - let spec = &self.beacon_chain.spec; - let subnet_count = spec.attestation_subnet_count; - let random_subnets_per_validator = spec.random_subnets_per_validator; - if self.known_validators.len() as u64 * random_subnets_per_validator >= subnet_count { - // have too many validators, ignore + // Calculate how many subnets should we remove. + let extra_subnet_count = { + let max_subnets = self.beacon_chain.spec.attestation_subnet_count; + let subnets_for_validators = self + .known_validators + .len() + .saturating_mul(self.beacon_chain.spec.random_subnets_per_validator as usize) + .min(max_subnets as usize); + + self.long_lived_subscriptions + .len() + .saturating_sub(subnets_for_validators) + }; + + if extra_subnet_count == 0 { + // Nothing to do return; } - let subscribed_subnets = self.random_subnets.keys().cloned().collect::>(); - let to_remove_subnets = subscribed_subnets.choose_multiple( - &mut rand::thread_rng(), - random_subnets_per_validator as usize, - ); + let advertised_subnets = self + .long_lived_subscriptions + .keys() + .cloned() + .collect::>(); + let to_remove_subnets = advertised_subnets + .choose_multiple(&mut rand::thread_rng(), extra_subnet_count) + .cloned(); for subnet_id in to_remove_subnets { - // If there are no unsubscription events for `subnet_id`, we unsubscribe immediately. - if !self - .unsubscriptions - .keys() - .any(|s| s.subnet_id == *subnet_id) - { - self.events - .push_back(SubnetServiceMessage::Unsubscribe(Subnet::Attestation( - *subnet_id, - ))); - } - // as the long lasting subnet subscription is being removed, remove the subnet_id from - // the ENR bitfield - self.events - .push_back(SubnetServiceMessage::EnrRemove(Subnet::Attestation( - *subnet_id, - ))); - self.random_subnets.remove(subnet_id); + self.long_lived_subscriptions.remove(&subnet_id); + self.handle_removed_subnet(subnet_id, SubscriptionKind::LongLived); } } } @@ -593,7 +661,7 @@ impl Stream for AttestationService { type Item = SubnetServiceMessage; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // update the waker if needed + // Update the waker if needed. if let Some(waker) = &self.waker { if waker.will_wake(cx.waker()) { self.waker = Some(cx.waker().clone()); @@ -602,25 +670,13 @@ impl Stream for AttestationService { self.waker = Some(cx.waker().clone()); } - // process any un-subscription events - match self.unsubscriptions.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(exact_subnet))) => self.handle_unsubscriptions(exact_subnet), - Poll::Ready(Some(Err(e))) => { - error!(self.log, "Failed to check for subnet unsubscription times"; "error"=> e); - } - Poll::Ready(None) | Poll::Pending => {} + // Send out any generated events. + if let Some(event) = self.events.pop_front() { + return Poll::Ready(Some(event)); } - // process any random subnet expiries - match self.random_subnets.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(subnet))) => self.handle_random_subnet_expiry(subnet), - Poll::Ready(Some(Err(e))) => { - error!(self.log, "Failed to check for random subnet cycles"; "error"=> e); - } - Poll::Ready(None) | Poll::Pending => {} - } - - // process any known validator expiries + // Process first any known validator expiries, since these affect how many long lived + // subnets we need. match self.known_validators.poll_next_unpin(cx) { Poll::Ready(Some(Ok(_validator_index))) => { self.handle_known_validator_expiry(); @@ -630,14 +686,52 @@ impl Stream for AttestationService { } Poll::Ready(None) | Poll::Pending => {} } - // poll to remove entries on expiration, no need to act on expiration events - if let Poll::Ready(Some(Err(e))) = self.aggregate_validators_on_subnet.poll_next_unpin(cx) { - error!(self.log, "Failed to check for aggregate validator on subnet expirations"; "error"=> e); + + // Process scheduled subscriptions that might be ready, since those can extend a soon to + // expire subscription. + match self.scheduled_short_lived_subscriptions.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(ExactSubnet { subnet_id, slot }))) => { + if let Err(e) = self.subscribe_to_subnet_immediately( + subnet_id, + SubscriptionKind::ShortLived, + slot + 1, + ) { + debug!(self.log, "Failed to subscribe to short lived subnet"; "subnet" => ?subnet_id, "err" => e); + } + } + Poll::Ready(Some(Err(e))) => { + error!(self.log, "Failed to check for scheduled subnet subscriptions"; "error"=> e); + } + Poll::Ready(None) | Poll::Pending => {} } - // process any generated events - if let Some(event) = self.events.pop_front() { - return Poll::Ready(Some(event)); + // Finally process any expired subscriptions. + match self.short_lived_subscriptions.poll_next_unpin(cx) { + Poll::Ready(Some(Ok((subnet_id, _end_slot)))) => { + self.handle_removed_subnet(subnet_id, SubscriptionKind::ShortLived); + } + Poll::Ready(Some(Err(e))) => { + error!(self.log, "Failed to check for subnet unsubscription times"; "error"=> e); + } + Poll::Ready(None) | Poll::Pending => {} + } + + // Process any random subnet expiries. + match self.long_lived_subscriptions.poll_next_unpin(cx) { + Poll::Ready(Some(Ok((subnet_id, end_slot)))) => { + self.handle_random_subnet_expiry(subnet_id, end_slot) + } + Poll::Ready(Some(Err(e))) => { + error!(self.log, "Failed to check for random subnet cycles"; "error"=> e); + } + Poll::Ready(None) | Poll::Pending => {} + } + + // Poll to remove entries on expiration, no need to act on expiration events. + if let Some(tracked_vals) = self.aggregate_validators_on_subnet.as_mut() { + if let Poll::Ready(Some(Err(e))) = tracked_vals.poll_next_unpin(cx) { + error!(self.log, "Failed to check for aggregate validator on subnet expirations"; "error"=> e); + } } Poll::Pending diff --git a/beacon_node/network/src/subnet_service/sync_subnets.rs b/beacon_node/network/src/subnet_service/sync_subnets.rs index 9e92f62250..0b27ff527f 100644 --- a/beacon_node/network/src/subnet_service/sync_subnets.rs +++ b/beacon_node/network/src/subnet_service/sync_subnets.rs @@ -12,7 +12,7 @@ use slog::{debug, error, o, trace, warn}; use super::SubnetServiceMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use hashset_delay::HashSetDelay; +use delay_map::HashSetDelay; use lighthouse_network::{NetworkConfig, Subnet, SubnetDiscovery}; use slot_clock::SlotClock; use types::{Epoch, EthSpec, SyncCommitteeSubscription, SyncSubnetId}; diff --git a/beacon_node/network/src/subnet_service/tests/mod.rs b/beacon_node/network/src/subnet_service/tests/mod.rs index 778eb63263..65ca9f2194 100644 --- a/beacon_node/network/src/subnet_service/tests/mod.rs +++ b/beacon_node/network/src/subnet_service/tests/mod.rs @@ -8,7 +8,7 @@ use futures::prelude::*; use genesis::{generate_deterministic_keypairs, interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH}; use lazy_static::lazy_static; use lighthouse_network::NetworkConfig; -use slog::Logger; +use slog::{o, Drain, Logger}; use sloggers::{null::NullLoggerBuilder, Build}; use slot_clock::{SlotClock, SystemTimeSlotClock}; use std::sync::Arc; @@ -42,7 +42,7 @@ impl TestBeaconChain { let keypairs = generate_deterministic_keypairs(1); - let log = get_logger(); + let log = get_logger(None); let store = HotColdDB::open_ephemeral(StoreConfig::default(), spec.clone(), log.clone()).unwrap(); @@ -93,16 +93,32 @@ pub fn recent_genesis_time() -> u64 { .as_secs() } -fn get_logger() -> Logger { - NullLoggerBuilder.build().expect("logger should build") +fn get_logger(log_level: Option) -> Logger { + if let Some(level) = log_level { + let drain = { + let decorator = slog_term::TermDecorator::new().build(); + let decorator = + logging::AlignedTermDecorator::new(decorator, logging::MAX_MESSAGE_WIDTH); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).chan_size(2048).build(); + drain.filter_level(level) + }; + + Logger::root(drain.fuse(), o!()) + } else { + let builder = NullLoggerBuilder; + builder.build().expect("should build logger") + } } lazy_static! { static ref CHAIN: TestBeaconChain = TestBeaconChain::new_with_system_clock(); } -fn get_attestation_service() -> AttestationService { - let log = get_logger(); +fn get_attestation_service( + log_level: Option, +) -> AttestationService { + let log = get_logger(log_level); let config = NetworkConfig::default(); let beacon_chain = CHAIN.chain.clone(); @@ -111,7 +127,7 @@ fn get_attestation_service() -> AttestationService { } fn get_sync_committee_service() -> SyncCommitteeService { - let log = get_logger(); + let log = get_logger(None); let config = NetworkConfig::default(); let beacon_chain = CHAIN.chain.clone(); @@ -128,28 +144,34 @@ async fn get_events + Unpin>( ) -> Vec { let mut events = Vec::new(); - let collect_stream_fut = async { - loop { - if let Some(result) = stream.next().await { - events.push(result); + let timeout = + tokio::time::sleep(Duration::from_millis(SLOT_DURATION_MILLIS) * num_slots_before_timeout); + futures::pin_mut!(timeout); + + loop { + tokio::select! { + Some(event) = stream.next() => { + events.push(event); if let Some(num) = num_events { if events.len() == num { - return; + break; } } } - } - }; + _ = timeout.as_mut() => { + break; + } - tokio::select! { - _ = collect_stream_fut => events, - _ = tokio::time::sleep( - Duration::from_millis(SLOT_DURATION_MILLIS) * num_slots_before_timeout, - ) => events + } } + + events } mod attestation_service { + + use crate::subnet_service::attestation_subnets::MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD; + use super::*; fn get_subscription( @@ -195,7 +217,7 @@ mod attestation_service { let committee_count = 1; // create the attestation service and subscriptions - let mut attestation_service = get_attestation_service(); + let mut attestation_service = get_attestation_service(None); let current_slot = attestation_service .beacon_chain .slot_clock @@ -237,15 +259,18 @@ mod attestation_service { matches::assert_matches!( events[..3], [ - SubnetServiceMessage::DiscoverPeers(_), SubnetServiceMessage::Subscribe(_any1), - SubnetServiceMessage::EnrAdd(_any3) + SubnetServiceMessage::EnrAdd(_any3), + SubnetServiceMessage::DiscoverPeers(_), ] ); // If the long lived and short lived subnets are the same, there should be no more events // as we don't resubscribe already subscribed subnets. - if !attestation_service.random_subnets.contains(&subnet_id) { + if !attestation_service + .subscriptions(attestation_subnets::SubscriptionKind::LongLived) + .contains_key(&subnet_id) + { assert_eq!(expected[..], events[3..]); } // Should be subscribed to only 1 long lived subnet after unsubscription. @@ -267,7 +292,7 @@ mod attestation_service { let com2 = 0; // create the attestation service and subscriptions - let mut attestation_service = get_attestation_service(); + let mut attestation_service = get_attestation_service(None); let current_slot = attestation_service .beacon_chain .slot_clock @@ -319,16 +344,19 @@ mod attestation_service { matches::assert_matches!( events[..3], [ - SubnetServiceMessage::DiscoverPeers(_), SubnetServiceMessage::Subscribe(_any1), - SubnetServiceMessage::EnrAdd(_any3) + SubnetServiceMessage::EnrAdd(_any3), + SubnetServiceMessage::DiscoverPeers(_), ] ); let expected = SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id1)); // Should be still subscribed to 1 long lived and 1 short lived subnet if both are different. - if !attestation_service.random_subnets.contains(&subnet_id1) { + if !attestation_service + .subscriptions(attestation_subnets::SubscriptionKind::LongLived) + .contains_key(&subnet_id1) + { assert_eq!(expected, events[3]); assert_eq!(attestation_service.subscription_count(), 2); } else { @@ -339,7 +367,10 @@ mod attestation_service { let unsubscribe_event = get_events(&mut attestation_service, None, 1).await; // If the long lived and short lived subnets are different, we should get an unsubscription event. - if !attestation_service.random_subnets.contains(&subnet_id1) { + if !attestation_service + .subscriptions(attestation_subnets::SubscriptionKind::LongLived) + .contains_key(&subnet_id1) + { assert_eq!( [SubnetServiceMessage::Unsubscribe(Subnet::Attestation( subnet_id1 @@ -360,7 +391,7 @@ mod attestation_service { let committee_count = 1; // create the attestation service and subscriptions - let mut attestation_service = get_attestation_service(); + let mut attestation_service = get_attestation_service(None); let current_slot = attestation_service .beacon_chain .slot_clock @@ -418,7 +449,7 @@ mod attestation_service { let committee_count = 1; // create the attestation service and subscriptions - let mut attestation_service = get_attestation_service(); + let mut attestation_service = get_attestation_service(None); let current_slot = attestation_service .beacon_chain .slot_clock @@ -465,6 +496,122 @@ mod attestation_service { assert_eq!(enr_add_count, 64); assert_eq!(unexpected_msg_count, 0); } + + #[tokio::test] + async fn test_subscribe_same_subnet_several_slots_apart() { + // subscription config + let validator_index = 1; + let committee_count = 1; + + // Makes 2 validator subscriptions to the same subnet but at different slots. + // There should be just 1 unsubscription event for the later slot subscription (subscription_slot2). + let subscription_slot1 = 0; + let subscription_slot2 = MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD + 4; + let com1 = MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD + 4; + let com2 = 0; + + // create the attestation service and subscriptions + let mut attestation_service = get_attestation_service(None); + let current_slot = attestation_service + .beacon_chain + .slot_clock + .now() + .expect("Could not get current slot"); + + let sub1 = get_subscription( + validator_index, + com1, + current_slot + Slot::new(subscription_slot1), + committee_count, + ); + + let sub2 = get_subscription( + validator_index, + com2, + current_slot + Slot::new(subscription_slot2), + committee_count, + ); + + let subnet_id1 = SubnetId::compute_subnet::( + current_slot + Slot::new(subscription_slot1), + com1, + committee_count, + &attestation_service.beacon_chain.spec, + ) + .unwrap(); + + let subnet_id2 = SubnetId::compute_subnet::( + current_slot + Slot::new(subscription_slot2), + com2, + committee_count, + &attestation_service.beacon_chain.spec, + ) + .unwrap(); + + // Assert that subscriptions are different but their subnet is the same + assert_ne!(sub1, sub2); + assert_eq!(subnet_id1, subnet_id2); + + // submit the subscriptions + attestation_service + .validator_subscriptions(vec![sub1, sub2]) + .unwrap(); + + // Unsubscription event should happen at the end of the slot. + let events = get_events(&mut attestation_service, None, 1).await; + matches::assert_matches!( + events[..3], + [ + SubnetServiceMessage::Subscribe(_any1), + SubnetServiceMessage::EnrAdd(_any3), + SubnetServiceMessage::DiscoverPeers(_), + ] + ); + + let expected_subscription = + SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id1)); + let expected_unsubscription = + SubnetServiceMessage::Unsubscribe(Subnet::Attestation(subnet_id1)); + + if !attestation_service + .subscriptions(attestation_subnets::SubscriptionKind::LongLived) + .contains_key(&subnet_id1) + { + assert_eq!(expected_subscription, events[3]); + // fourth is a discovery event + assert_eq!(expected_unsubscription, events[5]); + } + assert_eq!(attestation_service.subscription_count(), 1); + + println!("{events:?}"); + let subscription_slot = current_slot + subscription_slot2 - 1; // one less do to the + // advance subscription time + let wait_slots = attestation_service + .beacon_chain + .slot_clock + .duration_to_slot(subscription_slot) + .unwrap() + .as_millis() as u64 + / SLOT_DURATION_MILLIS; + + let no_events = dbg!(get_events(&mut attestation_service, None, wait_slots as u32).await); + + assert_eq!(no_events, []); + + let second_subscribe_event = get_events(&mut attestation_service, None, 2).await; + // If the long lived and short lived subnets are different, we should get an unsubscription event. + if !attestation_service + .subscriptions(attestation_subnets::SubscriptionKind::LongLived) + .contains_key(&subnet_id1) + { + assert_eq!( + [SubnetServiceMessage::Subscribe(Subnet::Attestation( + subnet_id1 + ))], + second_subscribe_event[..] + ); + } + } } mod sync_committee_service { diff --git a/common/hashset_delay/Cargo.toml b/common/hashset_delay/Cargo.toml deleted file mode 100644 index 1aa525a115..0000000000 --- a/common/hashset_delay/Cargo.toml +++ /dev/null @@ -1,12 +0,0 @@ -[package] -name = "hashset_delay" -version = "0.2.0" -authors = ["Sigma Prime "] -edition = "2021" - -[dependencies] -futures = "0.3.7" -tokio-util = { version = "0.6.2", features = ["time"] } - -[dev-dependencies] -tokio = { version = "1.14.0", features = ["time", "rt-multi-thread", "macros"] } diff --git a/common/hashset_delay/src/hashset_delay.rs b/common/hashset_delay/src/hashset_delay.rs deleted file mode 100644 index 052d71fe3b..0000000000 --- a/common/hashset_delay/src/hashset_delay.rs +++ /dev/null @@ -1,197 +0,0 @@ -//NOTE: This is just a specific case of a HashMapDelay. -// The code has been copied to make unique `insert` and `insert_at` functions. - -/// The default delay for entries, in seconds. This is only used when `insert()` is used to add -/// entries. -const DEFAULT_DELAY: u64 = 30; - -use futures::prelude::*; -use std::{ - collections::HashMap, - pin::Pin, - task::{Context, Poll}, - time::{Duration, Instant}, -}; -use tokio_util::time::delay_queue::{self, DelayQueue}; - -pub struct HashSetDelay -where - K: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Unpin, -{ - /// The given entries. - entries: HashMap, - /// A queue holding the timeouts of each entry. - expirations: DelayQueue, - /// The default expiration timeout of an entry. - default_entry_timeout: Duration, -} - -/// A wrapping around entries that adds the link to the entry's expiration, via a `delay_queue` key. -struct MapEntry { - /// The expiration key for the entry. - key: delay_queue::Key, - /// The actual entry. - value: Instant, -} - -impl Default for HashSetDelay -where - K: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Unpin, -{ - fn default() -> Self { - HashSetDelay::new(Duration::from_secs(DEFAULT_DELAY)) - } -} - -impl HashSetDelay -where - K: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Unpin, -{ - /// Creates a new instance of `HashSetDelay`. - pub fn new(default_entry_timeout: Duration) -> Self { - HashSetDelay { - entries: HashMap::new(), - expirations: DelayQueue::new(), - default_entry_timeout, - } - } - - /// Insert an entry into the mapping. Entries will expire after the `default_entry_timeout`. - pub fn insert(&mut self, key: K) { - self.insert_at(key, self.default_entry_timeout); - } - - /// Inserts an entry that will expire at a given instant. If the entry already exists, the - /// timeout is updated. - pub fn insert_at(&mut self, key: K, entry_duration: Duration) { - if self.contains(&key) { - // update the timeout - self.update_timeout(&key, entry_duration); - } else { - let delay_key = self.expirations.insert(key.clone(), entry_duration); - let entry = MapEntry { - key: delay_key, - value: Instant::now() + entry_duration, - }; - self.entries.insert(key, entry); - } - } - - /// Gets a reference to an entry if it exists. - /// - /// Returns None if the entry does not exist. - pub fn get(&self, key: &K) -> Option<&Instant> { - self.entries.get(key).map(|entry| &entry.value) - } - - /// Returns true if the key exists, false otherwise. - pub fn contains(&self, key: &K) -> bool { - self.entries.contains_key(key) - } - - /// Returns the length of the mapping. - pub fn len(&self) -> usize { - self.entries.len() - } - - /// Checks if the mapping is empty. - pub fn is_empty(&self) -> bool { - self.entries.is_empty() - } - - /// Updates the timeout for a given key. Returns true if the key existed, false otherwise. - /// - /// Panics if the duration is too far in the future. - pub fn update_timeout(&mut self, key: &K, timeout: Duration) -> bool { - if let Some(entry) = self.entries.get(key) { - self.expirations.reset(&entry.key, timeout); - true - } else { - false - } - } - - /// Removes a key from the map returning the value associated with the key that was in the map. - /// - /// Return false if the key was not in the map. - pub fn remove(&mut self, key: &K) -> bool { - if let Some(entry) = self.entries.remove(key) { - self.expirations.remove(&entry.key); - return true; - } - false - } - - /// Retains only the elements specified by the predicate. - /// - /// In other words, remove all pairs `(k, v)` such that `f(&k,&mut v)` returns false. - pub fn retain bool>(&mut self, mut f: F) { - let expiration = &mut self.expirations; - self.entries.retain(|key, entry| { - let result = f(key); - if !result { - expiration.remove(&entry.key); - } - result - }) - } - - /// Removes all entries from the map. - pub fn clear(&mut self) { - self.entries.clear(); - self.expirations.clear(); - } - - /// Returns a vector of referencing all keys in the map. - pub fn keys(&self) -> impl Iterator { - self.entries.keys() - } -} - -impl Stream for HashSetDelay -where - K: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Unpin, -{ - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match self.expirations.poll_expired(cx) { - Poll::Ready(Some(Ok(key))) => match self.entries.remove(key.get_ref()) { - Some(_) => Poll::Ready(Some(Ok(key.into_inner()))), - None => Poll::Ready(Some(Err("Value no longer exists in expirations".into()))), - }, - Poll::Ready(Some(Err(e))) => { - Poll::Ready(Some(Err(format!("delay queue error: {:?}", e)))) - } - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - } - } -} - -#[cfg(test)] - -mod tests { - use super::*; - - #[tokio::test] - async fn should_not_panic() { - let key = 2u8; - - let mut map = HashSetDelay::default(); - - map.insert(key); - map.update_timeout(&key, Duration::from_secs(100)); - - let fut = |cx: &mut Context| { - let _ = map.poll_next_unpin(cx); - let _ = map.poll_next_unpin(cx); - Poll::Ready(()) - }; - - future::poll_fn(fut).await; - - map.insert(key); - map.update_timeout(&key, Duration::from_secs(100)); - } -} diff --git a/common/hashset_delay/src/lib.rs b/common/hashset_delay/src/lib.rs deleted file mode 100644 index 175ad72cfa..0000000000 --- a/common/hashset_delay/src/lib.rs +++ /dev/null @@ -1,12 +0,0 @@ -//! This crate provides a single type (its counter-part HashMapDelay has been removed as it -//! currently is not in use in lighthouse): -//! - `HashSetDelay` -//! -//! # HashSetDelay -//! -//! This is similar to a `HashMapDelay` except the mapping maps to the expiry time. This -//! allows users to add objects and check their expiry deadlines before the `Stream` -//! consumes them. - -mod hashset_delay; -pub use crate::hashset_delay::HashSetDelay;