mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-21 13:54:44 +00:00
Subscribe to subnets an epoch in advance (#1600)
## Issue Addressed N/A ## Proposed Changes Subscibe to subnet an epoch in advance of the attestation slot instead of 4 slots in advance.
This commit is contained in:
@@ -2,7 +2,7 @@
|
||||
//! given time. It schedules subscriptions to shard subnets, requests peer discoveries and
|
||||
//! determines whether attestations should be aggregated and/or passed to the beacon node.
|
||||
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
@@ -10,10 +10,10 @@ use std::time::{Duration, Instant};
|
||||
|
||||
use futures::prelude::*;
|
||||
use rand::seq::SliceRandom;
|
||||
use slog::{crit, debug, error, o, trace, warn};
|
||||
use slog::{debug, error, o, trace, warn};
|
||||
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use eth2_libp2p::{types::GossipKind, NetworkGlobals, SubnetDiscovery};
|
||||
use eth2_libp2p::SubnetDiscovery;
|
||||
use hashset_delay::HashSetDelay;
|
||||
use rest_types::ValidatorSubscription;
|
||||
use slot_clock::SlotClock;
|
||||
@@ -66,17 +66,14 @@ pub struct AttestationService<T: BeaconChainTypes> {
|
||||
/// Queued events to return to the driving service.
|
||||
events: VecDeque<AttServiceMessage>,
|
||||
|
||||
/// A collection of public network variables.
|
||||
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
|
||||
|
||||
/// A reference to the beacon chain to process received attestations.
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
|
||||
/// The collection of currently subscribed random subnets mapped to their expiry deadline.
|
||||
random_subnets: HashSetDelay<SubnetId>,
|
||||
|
||||
/// A collection of timeouts for when to subscribe to a shard subnet.
|
||||
subscriptions: HashSetDelay<ExactSubnet>,
|
||||
/// The collection of all currently subscribed subnets (long-lived **and** short-lived).
|
||||
subscriptions: HashSet<SubnetId>,
|
||||
|
||||
/// A collection of timeouts for when to unsubscribe from a shard subnet.
|
||||
unsubscriptions: HashSetDelay<ExactSubnet>,
|
||||
@@ -100,11 +97,7 @@ pub struct AttestationService<T: BeaconChainTypes> {
|
||||
impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
/* Public functions */
|
||||
|
||||
pub fn new(
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
|
||||
log: &slog::Logger,
|
||||
) -> Self {
|
||||
pub fn new(beacon_chain: Arc<BeaconChain<T>>, log: &slog::Logger) -> Self {
|
||||
let log = log.new(o!("service" => "attestation_service"));
|
||||
|
||||
// calculate the random subnet duration from the spec constants
|
||||
@@ -125,10 +118,9 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
|
||||
AttestationService {
|
||||
events: VecDeque::with_capacity(10),
|
||||
network_globals,
|
||||
beacon_chain,
|
||||
random_subnets: HashSetDelay::new(Duration::from_millis(random_subnet_duration_millis)),
|
||||
subscriptions: HashSetDelay::new(default_timeout),
|
||||
subscriptions: HashSet::new(),
|
||||
unsubscriptions: HashSetDelay::new(default_timeout),
|
||||
aggregate_validators_on_subnet: HashSetDelay::new(default_timeout),
|
||||
known_validators: HashSetDelay::new(last_seen_val_timeout),
|
||||
@@ -137,6 +129,12 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Return count of all currently subscribed subnets (long-lived **and** short-lived).
|
||||
#[cfg(test)]
|
||||
pub fn subscription_count(&self) -> usize {
|
||||
self.subscriptions.len()
|
||||
}
|
||||
|
||||
/// Processes a list of validator subscriptions.
|
||||
///
|
||||
/// This will:
|
||||
@@ -321,40 +319,23 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
.now()
|
||||
.ok_or_else(|| "Could not get the current slot")?;
|
||||
|
||||
// Calculate the duration to the subscription event and the duration to the end event.
|
||||
// Calculate the duration to the unsubscription event.
|
||||
// There are two main cases. Attempting to subscribe to the current slot and all others.
|
||||
let (duration_to_subscribe, expected_end_subscription_duration) = {
|
||||
let duration_to_next_slot = self
|
||||
.beacon_chain
|
||||
let expected_end_subscription_duration = if current_slot >= exact_subnet.slot {
|
||||
self.beacon_chain
|
||||
.slot_clock
|
||||
.duration_to_next_slot()
|
||||
.ok_or_else(|| "Unable to determine duration to next slot")?;
|
||||
.ok_or_else(|| "Unable to determine duration to next slot")?
|
||||
} else {
|
||||
let slot_duration = self.beacon_chain.slot_clock.slot_duration();
|
||||
|
||||
if current_slot >= exact_subnet.slot {
|
||||
(Duration::from_secs(0), duration_to_next_slot)
|
||||
} else {
|
||||
let slot_duration = self.beacon_chain.slot_clock.slot_duration();
|
||||
let advance_subscription_duration = slot_duration
|
||||
.checked_div(ADVANCE_SUBSCRIBE_TIME)
|
||||
.expect("ADVANCE_SUBSCRIPTION_TIME cannot be too large");
|
||||
|
||||
// calculate the time to subscribe to the subnet
|
||||
let duration_to_subscribe = self
|
||||
.beacon_chain
|
||||
.slot_clock
|
||||
.duration_to_slot(exact_subnet.slot)
|
||||
.ok_or_else(|| "Unable to determine duration to subscription slot")?
|
||||
.checked_sub(advance_subscription_duration)
|
||||
.unwrap_or_else(|| Duration::from_secs(0));
|
||||
|
||||
// the duration until we no longer need this subscription. We assume a single slot is
|
||||
// sufficient.
|
||||
let expected_end_subscription_duration = duration_to_subscribe
|
||||
+ slot_duration
|
||||
+ std::cmp::min(advance_subscription_duration, duration_to_next_slot);
|
||||
|
||||
(duration_to_subscribe, expected_end_subscription_duration)
|
||||
}
|
||||
// the duration until we no longer need this subscription. We assume a single slot is
|
||||
// sufficient.
|
||||
self.beacon_chain
|
||||
.slot_clock
|
||||
.duration_to_slot(exact_subnet.slot)
|
||||
.ok_or_else(|| "Unable to determine duration to subscription slot")?
|
||||
+ slot_duration
|
||||
};
|
||||
|
||||
// Regardless of whether or not we have already subscribed to a subnet, track the expiration
|
||||
@@ -370,13 +351,12 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
// in-active. This case is checked on the subscription event (see `handle_subscriptions`).
|
||||
|
||||
// Return if we already have a subscription for this subnet_id and slot
|
||||
if self.subscriptions.contains(&exact_subnet) {
|
||||
if self.unsubscriptions.contains(&exact_subnet) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// We are not currently subscribed and have no waiting subscription, create one
|
||||
self.subscriptions
|
||||
.insert_at(exact_subnet.clone(), duration_to_subscribe);
|
||||
self.handle_subscriptions(exact_subnet.clone());
|
||||
|
||||
// if there is an unsubscription event for the slot prior, we remove it to prevent
|
||||
// unsubscriptions immediately after the subscription. We also want to minimize
|
||||
@@ -437,35 +417,30 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
};
|
||||
|
||||
for subnet_id in to_subscribe_subnets {
|
||||
// remove this subnet from any immediate subscription/un-subscription events
|
||||
self.subscriptions
|
||||
.retain(|exact_subnet| exact_subnet.subnet_id != subnet_id);
|
||||
// remove this subnet from any immediate un-subscription events
|
||||
self.unsubscriptions
|
||||
.retain(|exact_subnet| exact_subnet.subnet_id != subnet_id);
|
||||
|
||||
// insert a new random subnet
|
||||
self.random_subnets.insert(subnet_id);
|
||||
|
||||
// send discovery request
|
||||
// Note: it's wasteful to send a DiscoverPeers request if we already have peers for this subnet.
|
||||
// However, subscribing to random subnets ideally shouldn't happen very often (once in ~27 hours) and
|
||||
// this makes it easier to deterministically test the attestations service.
|
||||
self.events
|
||||
.push_back(AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery {
|
||||
subnet_id,
|
||||
min_ttl: None,
|
||||
}]));
|
||||
|
||||
// if we are not already subscribed, then subscribe
|
||||
let topic_kind = &GossipKind::Attestation(subnet_id);
|
||||
|
||||
let already_subscribed = self
|
||||
.network_globals
|
||||
.gossipsub_subscriptions
|
||||
.read()
|
||||
.iter()
|
||||
.any(|topic| topic.kind() == topic_kind);
|
||||
|
||||
if !already_subscribed {
|
||||
// send a discovery request and a subscription
|
||||
self.events
|
||||
.push_back(AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery {
|
||||
subnet_id,
|
||||
min_ttl: None,
|
||||
}]));
|
||||
if !self.subscriptions.contains(&subnet_id) {
|
||||
self.subscriptions.insert(subnet_id);
|
||||
self.events
|
||||
.push_back(AttServiceMessage::Subscribe(subnet_id));
|
||||
}
|
||||
|
||||
// add the subnet to the ENR bitfield
|
||||
self.events.push_back(AttServiceMessage::EnrAdd(subnet_id));
|
||||
}
|
||||
@@ -499,17 +474,10 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
// we are also not un-subscribing from a subnet if the next slot requires us to be
|
||||
// subscribed. Therefore there could be the case that we are already still subscribed
|
||||
// to the required subnet. In which case we do not issue another subscription request.
|
||||
let topic_kind = &GossipKind::Attestation(exact_subnet.subnet_id);
|
||||
if self
|
||||
.network_globals
|
||||
.gossipsub_subscriptions
|
||||
.read()
|
||||
.iter()
|
||||
.find(|topic| topic.kind() == topic_kind)
|
||||
.is_none()
|
||||
{
|
||||
if !self.subscriptions.contains(&exact_subnet.subnet_id) {
|
||||
// we are not already subscribed
|
||||
debug!(self.log, "Subscribing to subnet"; "subnet" => *exact_subnet.subnet_id, "target_slot" => exact_subnet.slot.as_u64());
|
||||
self.subscriptions.insert(exact_subnet.subnet_id);
|
||||
self.events
|
||||
.push_back(AttServiceMessage::Subscribe(exact_subnet.subnet_id));
|
||||
}
|
||||
@@ -528,10 +496,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
|
||||
debug!(self.log, "Unsubscribing from subnet"; "subnet" => *exact_subnet.subnet_id, "processed_slot" => exact_subnet.slot.as_u64());
|
||||
|
||||
// various logic checks
|
||||
if self.subscriptions.contains(&exact_subnet) {
|
||||
crit!(self.log, "Unsubscribing from a subnet in subscriptions");
|
||||
}
|
||||
self.subscriptions.remove(&exact_subnet.subnet_id);
|
||||
self.events
|
||||
.push_back(AttServiceMessage::Unsubscribe(exact_subnet.subnet_id));
|
||||
}
|
||||
@@ -581,33 +546,17 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
&mut rand::thread_rng(),
|
||||
random_subnets_per_validator as usize,
|
||||
);
|
||||
let current_slot = self.beacon_chain.slot_clock.now().ok_or_else(|| {
|
||||
warn!(self.log, "Could not get the current slot");
|
||||
})?;
|
||||
|
||||
for subnet_id in to_remove_subnets {
|
||||
// If a subscription is queued for two slots in the future, it's associated unsubscription
|
||||
// will unsubscribe from the expired subnet.
|
||||
// If there is no unsubscription for this subnet,slot it is safe to add one, without
|
||||
// unsubscribing early from a required subnet
|
||||
let subnet = ExactSubnet {
|
||||
subnet_id: *subnet_id,
|
||||
slot: current_slot + 2,
|
||||
};
|
||||
if self.subscriptions.get(&subnet).is_none() {
|
||||
// set an unsubscribe event
|
||||
let duration_to_next_slot = self
|
||||
.beacon_chain
|
||||
.slot_clock
|
||||
.duration_to_next_slot()
|
||||
.ok_or_else(|| {
|
||||
warn!(self.log, "Unable to determine duration to next slot");
|
||||
})?;
|
||||
let slot_duration = self.beacon_chain.slot_clock.slot_duration();
|
||||
// Set the unsubscription timeout
|
||||
let unsubscription_duration = duration_to_next_slot + slot_duration * 2;
|
||||
self.unsubscriptions
|
||||
.insert_at(subnet, unsubscription_duration);
|
||||
// If there are no unsubscription events for `subnet_id`, we unsubscribe immediately.
|
||||
if self
|
||||
.unsubscriptions
|
||||
.keys()
|
||||
.find(|s| s.subnet_id == *subnet_id)
|
||||
.is_none()
|
||||
{
|
||||
self.events
|
||||
.push_back(AttServiceMessage::Unsubscribe(*subnet_id));
|
||||
}
|
||||
// as the long lasting subnet subscription is being removed, remove the subnet_id from
|
||||
// the ENR bitfield
|
||||
@@ -632,15 +581,6 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
|
||||
self.waker = Some(cx.waker().clone());
|
||||
}
|
||||
|
||||
// process any subscription events
|
||||
match self.subscriptions.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(Ok(exact_subnet))) => self.handle_subscriptions(exact_subnet),
|
||||
Poll::Ready(Some(Err(e))) => {
|
||||
error!(self.log, "Failed to check for subnet subscription times"; "error"=> e);
|
||||
}
|
||||
Poll::Ready(None) | Poll::Pending => {}
|
||||
}
|
||||
|
||||
// process any un-subscription events
|
||||
match self.unsubscriptions.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(Ok(exact_subnet))) => self.handle_unsubscriptions(exact_subnet),
|
||||
|
||||
Reference in New Issue
Block a user