Simple Subnet Management (#6146)

* Initial temp commit

* Merge latest unstable

* First draft without tests

* Update tests for new version

* Correct comments and reviewers' comments

* Merge latest unstable

* Fix errors

* Missed a comment, corrected it

* Fix lints

* Merge latest unstable

* Fix tests

* Merge latest unstable

* Reviewers' comments

* Remove sync subnets from ENR on unsubscribe

* Merge branch 'unstable' into simple-peer-mapping

* Merge branch 'unstable' into simple-peer-mapping

* Merge branch 'unstable' into simple-peer-mapping

* Merge latest unstable

* Prevent clash with pin of rust_eth_kzg
This commit is contained in:
Age Manning
2024-11-26 12:48:07 +11:00
committed by GitHub
parent 6e1945fc5d
commit 08e8b92e50
10 changed files with 1606 additions and 1934 deletions

View File

@@ -2,12 +2,9 @@ use crate::nat;
use crate::network_beacon_processor::InvalidBlockStorage;
use crate::persisted_dht::{clear_dht, load_dht, persist_dht};
use crate::router::{Router, RouterMessage};
use crate::subnet_service::SyncCommitteeService;
use crate::subnet_service::{SubnetService, SubnetServiceMessage, Subscription};
use crate::NetworkConfig;
use crate::{error, metrics};
use crate::{
subnet_service::{AttestationService, SubnetServiceMessage},
NetworkConfig,
};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend};
use futures::channel::mpsc::Sender;
@@ -165,10 +162,8 @@ pub struct NetworkService<T: BeaconChainTypes> {
beacon_chain: Arc<BeaconChain<T>>,
/// The underlying libp2p service that drives all the network interactions.
libp2p: Network<T::EthSpec>,
/// An attestation and subnet manager service.
attestation_service: AttestationService<T>,
/// A sync committeee subnet manager service.
sync_committee_service: SyncCommitteeService<T>,
/// An attestation and sync committee subnet manager service.
subnet_service: SubnetService<T>,
/// The receiver channel for lighthouse to communicate with the network service.
network_recv: mpsc::UnboundedReceiver<NetworkMessage<T::EthSpec>>,
/// The receiver channel for lighthouse to send validator subscription requests.
@@ -317,16 +312,13 @@ impl<T: BeaconChainTypes> NetworkService<T> {
network_log.clone(),
)?;
// attestation subnet service
let attestation_service = AttestationService::new(
// attestation and sync committee subnet service
let subnet_service = SubnetService::new(
beacon_chain.clone(),
network_globals.local_enr().node_id(),
&config,
&network_log,
);
// sync committee subnet service
let sync_committee_service =
SyncCommitteeService::new(beacon_chain.clone(), &config, &network_log);
// create a timer for updating network metrics
let metrics_update = tokio::time::interval(Duration::from_secs(METRIC_UPDATE_INTERVAL));
@@ -344,8 +336,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let network_service = NetworkService {
beacon_chain,
libp2p,
attestation_service,
sync_committee_service,
subnet_service,
network_recv,
validator_subscription_recv,
router_send,
@@ -460,11 +451,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
// handle a message from a validator requesting a subscription to a subnet
Some(msg) = self.validator_subscription_recv.recv() => self.on_validator_subscription_msg(msg).await,
// process any attestation service events
Some(msg) = self.attestation_service.next() => self.on_attestation_service_msg(msg),
// process any sync committee service events
Some(msg) = self.sync_committee_service.next() => self.on_sync_committee_service_message(msg),
// process any subnet service events
Some(msg) = self.subnet_service.next() => self.on_subnet_service_msg(msg),
event = self.libp2p.next_event() => self.on_libp2p_event(event, &mut shutdown_sender).await,
@@ -552,13 +540,14 @@ impl<T: BeaconChainTypes> NetworkService<T> {
match message {
// attestation information gets processed in the attestation service
PubsubMessage::Attestation(ref subnet_and_attestation) => {
let subnet = subnet_and_attestation.0;
let subnet_id = subnet_and_attestation.0;
let attestation = &subnet_and_attestation.1;
// checks if we have an aggregator for the slot. If so, we should process
// the attestation, else we just just propagate the Attestation.
let should_process = self
.attestation_service
.should_process_attestation(subnet, attestation);
let should_process = self.subnet_service.should_process_attestation(
Subnet::Attestation(subnet_id),
attestation,
);
self.send_to_router(RouterMessage::PubsubMessage(
id,
source,
@@ -832,20 +821,12 @@ impl<T: BeaconChainTypes> NetworkService<T> {
async fn on_validator_subscription_msg(&mut self, msg: ValidatorSubscriptionMessage) {
match msg {
ValidatorSubscriptionMessage::AttestationSubscribe { subscriptions } => {
if let Err(e) = self
.attestation_service
.validator_subscriptions(subscriptions.into_iter())
{
warn!(self.log, "Attestation validator subscription failed"; "error" => e);
}
let subscriptions = subscriptions.into_iter().map(Subscription::Attestation);
self.subnet_service.validator_subscriptions(subscriptions)
}
ValidatorSubscriptionMessage::SyncCommitteeSubscribe { subscriptions } => {
if let Err(e) = self
.sync_committee_service
.validator_subscriptions(subscriptions)
{
warn!(self.log, "Sync committee calidator subscription failed"; "error" => e);
}
let subscriptions = subscriptions.into_iter().map(Subscription::SyncCommittee);
self.subnet_service.validator_subscriptions(subscriptions)
}
}
}
@@ -881,7 +862,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}
fn on_attestation_service_msg(&mut self, msg: SubnetServiceMessage) {
fn on_subnet_service_msg(&mut self, msg: SubnetServiceMessage) {
match msg {
SubnetServiceMessage::Subscribe(subnet) => {
for fork_digest in self.required_gossip_fork_digests() {
@@ -900,36 +881,9 @@ impl<T: BeaconChainTypes> NetworkService<T> {
SubnetServiceMessage::EnrAdd(subnet) => {
self.libp2p.update_enr_subnet(subnet, true);
}
SubnetServiceMessage::EnrRemove(subnet) => {
self.libp2p.update_enr_subnet(subnet, false);
}
SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => {
self.libp2p.discover_subnet_peers(subnets_to_discover);
}
}
}
fn on_sync_committee_service_message(&mut self, msg: SubnetServiceMessage) {
match msg {
SubnetServiceMessage::Subscribe(subnet) => {
for fork_digest in self.required_gossip_fork_digests() {
let topic =
GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
self.libp2p.subscribe(topic);
}
}
SubnetServiceMessage::Unsubscribe(subnet) => {
for fork_digest in self.required_gossip_fork_digests() {
let topic =
GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
self.libp2p.unsubscribe(topic);
}
}
SubnetServiceMessage::EnrAdd(subnet) => {
self.libp2p.update_enr_subnet(subnet, true);
}
SubnetServiceMessage::EnrRemove(subnet) => {
self.libp2p.update_enr_subnet(subnet, false);
SubnetServiceMessage::EnrRemove(sync_subnet_id) => {
self.libp2p
.update_enr_subnet(Subnet::SyncCommittee(sync_subnet_id), false);
}
SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => {
self.libp2p.discover_subnet_peers(subnets_to_discover);

View File

@@ -169,21 +169,18 @@ mod tests {
// Subscribe to the topics.
runtime.block_on(async {
while network_globals.gossipsub_subscriptions.read().len() < 2 {
if let Some(msg) = network_service.attestation_service.next().await {
network_service.on_attestation_service_msg(msg);
if let Some(msg) = network_service.subnet_service.next().await {
network_service.on_subnet_service_msg(msg);
}
}
});
// Make sure the service is subscribed to the topics.
let (old_topic1, old_topic2) = {
let mut subnets = SubnetId::compute_subnets_for_epoch::<MinimalEthSpec>(
let mut subnets = SubnetId::compute_attestation_subnets(
network_globals.local_enr().node_id().raw(),
beacon_chain.epoch().unwrap(),
&spec,
)
.unwrap()
.0
.collect::<Vec<_>>();
assert_eq!(2, subnets.len());

View File

@@ -1,687 +0,0 @@
//! This service keeps track of which shard subnet the beacon node should be subscribed to at any
//! given time. It schedules subscriptions to shard subnets, requests peer discoveries and
//! determines whether attestations should be aggregated and/or passed to the beacon node.
use super::SubnetServiceMessage;
use std::collections::HashSet;
use std::collections::{HashMap, VecDeque};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use delay_map::{HashMapDelay, HashSetDelay};
use futures::prelude::*;
use lighthouse_network::{discv5::enr::NodeId, NetworkConfig, Subnet, SubnetDiscovery};
use slog::{debug, error, info, o, trace, warn};
use slot_clock::SlotClock;
use types::{Attestation, EthSpec, Slot, SubnetId, ValidatorSubscription};
use crate::metrics;
/// The minimum number of slots ahead that we attempt to discover peers for a subscription. If the
/// slot is less than this number, skip the peer discovery process.
/// Subnet discovery query takes at most 30 secs, 2 slots take 24s.
pub(crate) const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2;
/// The fraction of a slot that we subscribe to a subnet before the required slot.
///
/// Currently a whole slot ahead.
const ADVANCE_SUBSCRIBE_SLOT_FRACTION: u32 = 1;
/// The number of slots after an aggregator duty where we remove the entry from
/// `aggregate_validators_on_subnet` delay map.
const UNSUBSCRIBE_AFTER_AGGREGATOR_DUTY: u32 = 2;
/// Classifies why this node holds a subscription to an attestation subnet.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub(crate) enum SubscriptionKind {
    /// Long lived subscriptions.
    ///
    /// These have a longer duration and are advertised in our ENR.
    LongLived,
    /// Short lived subscriptions.
    ///
    /// Subscribing to these subnets has a short duration and we don't advertise it in our ENR.
    ShortLived,
}
/// A particular subnet at a given slot.
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub struct ExactSubnet {
    /// The `SubnetId` associated with this subnet.
    pub subnet_id: SubnetId,
    /// The `Slot` associated with this subnet.
    pub slot: Slot,
}
/// Tracks attestation-subnet subscriptions: schedules them, drives peer discovery for
/// them, and records which subnets have aggregator duties pending.
pub struct AttestationService<T: BeaconChainTypes> {
    /// Queued events to return to the driving service.
    events: VecDeque<SubnetServiceMessage>,
    /// A reference to the beacon chain to process received attestations.
    pub(crate) beacon_chain: Arc<BeaconChain<T>>,
    /// Subnets we are currently subscribed to as short lived subscriptions.
    ///
    /// Once they expire, we unsubscribe from these.
    /// We subscribe to subnets when we are an aggregator for an exact subnet.
    short_lived_subscriptions: HashMapDelay<SubnetId, Slot>,
    /// Subnets we are currently subscribed to as long lived subscriptions.
    ///
    /// We advertise these in our ENR. When these expire, the subnet is removed from our ENR.
    /// These are required of all beacon nodes. The exact number is determined by the chain
    /// specification.
    long_lived_subscriptions: HashSet<SubnetId>,
    /// Short lived subscriptions that need to be executed in the future.
    scheduled_short_lived_subscriptions: HashSetDelay<ExactSubnet>,
    /// A collection of timeouts to track the existence of aggregate validator subscriptions at an
    /// `ExactSubnet`.
    aggregate_validators_on_subnet: Option<HashSetDelay<ExactSubnet>>,
    /// The waker for the current thread.
    waker: Option<std::task::Waker>,
    /// The discovery mechanism of lighthouse is disabled.
    discovery_disabled: bool,
    /// We are always subscribed to all subnets.
    subscribe_all_subnets: bool,
    /// Our Discv5 node_id.
    node_id: NodeId,
    /// Future used to manage subscribing and unsubscribing from long lived subnets.
    next_long_lived_subscription_event: Pin<Box<tokio::time::Sleep>>,
    /// Whether this node is a block proposer-only node.
    proposer_only: bool,
    /// The logger for the attestation service.
    log: slog::Logger,
}
impl<T: BeaconChainTypes> AttestationService<T> {
    /* Public functions */

    /// Establish the service based on the passed configuration.
    pub fn new(
        beacon_chain: Arc<BeaconChain<T>>,
        node_id: NodeId,
        config: &NetworkConfig,
        log: &slog::Logger,
    ) -> Self {
        let log = log.new(o!("service" => "attestation_service"));
        let slot_duration = beacon_chain.slot_clock.slot_duration();

        if config.subscribe_all_subnets {
            slog::info!(log, "Subscribing to all subnets");
        } else {
            slog::info!(log, "Deterministic long lived subnets enabled"; "subnets_per_node" => beacon_chain.spec.subnets_per_node, "subscription_duration_in_epochs" => beacon_chain.spec.epochs_per_subnet_subscription);
        }

        // Only track per-validator aggregator duties when we are not importing all
        // attestations unconditionally; otherwise the delay map is `None`.
        let track_validators = !config.import_all_attestations;
        let aggregate_validators_on_subnet =
            track_validators.then(|| HashSetDelay::new(slot_duration));
        let mut service = AttestationService {
            events: VecDeque::with_capacity(10),
            beacon_chain,
            short_lived_subscriptions: HashMapDelay::new(slot_duration),
            long_lived_subscriptions: HashSet::default(),
            scheduled_short_lived_subscriptions: HashSetDelay::default(),
            aggregate_validators_on_subnet,
            waker: None,
            discovery_disabled: config.disable_discovery,
            subscribe_all_subnets: config.subscribe_all_subnets,
            node_id,
            next_long_lived_subscription_event: {
                // Set a dummy sleep. Calculating the current subnet subscriptions will update this
                // value with a smarter timing
                Box::pin(tokio::time::sleep(Duration::from_secs(1)))
            },
            proposer_only: config.proposer_only,
            log,
        };

        // If we are not subscribed to all subnets, handle the deterministic set of subnets
        if !config.subscribe_all_subnets {
            service.recompute_long_lived_subnets();
        }

        service
    }

    /// Return count of all currently subscribed subnets (long-lived **and** short-lived).
    #[cfg(test)]
    pub fn subscription_count(&self) -> usize {
        if self.subscribe_all_subnets {
            self.beacon_chain.spec.attestation_subnet_count as usize
        } else {
            // Collect into a set so subnets subscribed as both kinds are counted once.
            let count = self
                .short_lived_subscriptions
                .keys()
                .chain(self.long_lived_subscriptions.iter())
                .collect::<HashSet<_>>()
                .len();
            count
        }
    }

    /// Returns whether we are subscribed to a subnet for testing purposes.
    #[cfg(test)]
    pub(crate) fn is_subscribed(
        &self,
        subnet_id: &SubnetId,
        subscription_kind: SubscriptionKind,
    ) -> bool {
        match subscription_kind {
            SubscriptionKind::LongLived => self.long_lived_subscriptions.contains(subnet_id),
            SubscriptionKind::ShortLived => self.short_lived_subscriptions.contains_key(subnet_id),
        }
    }

    /// Test-only accessor for the current long-lived subscription set.
    #[cfg(test)]
    pub(crate) fn long_lived_subscriptions(&self) -> &HashSet<SubnetId> {
        &self.long_lived_subscriptions
    }

    /// Processes a list of validator subscriptions.
    ///
    /// This will:
    /// - Register new validators as being known.
    /// - Search for peers for required subnets.
    /// - Request subscriptions for subnets on specific slots when required.
    /// - Build the timeouts for each of these events.
    ///
    /// This returns a result simply for the ergonomics of using ?. The result can be
    /// safely dropped.
    pub fn validator_subscriptions(
        &mut self,
        subscriptions: impl Iterator<Item = ValidatorSubscription>,
    ) -> Result<(), String> {
        // If the node is in a proposer-only state, we ignore all subnet subscriptions.
        if self.proposer_only {
            return Ok(());
        }

        // Maps each subnet_id subscription to its highest slot
        let mut subnets_to_discover: HashMap<SubnetId, Slot> = HashMap::new();

        // Registers the validator with the attestation service.
        for subscription in subscriptions {
            metrics::inc_counter(&metrics::SUBNET_SUBSCRIPTION_REQUESTS);

            trace!(self.log,
                "Validator subscription";
                "subscription" => ?subscription,
            );

            // Compute the subnet that is associated with this subscription
            let subnet_id = match SubnetId::compute_subnet::<T::EthSpec>(
                subscription.slot,
                subscription.attestation_committee_index,
                subscription.committee_count_at_slot,
                &self.beacon_chain.spec,
            ) {
                Ok(subnet_id) => subnet_id,
                Err(e) => {
                    // A bad subscription is skipped rather than failing the whole batch.
                    warn!(self.log,
                        "Failed to compute subnet id for validator subscription";
                        "error" => ?e,
                    );
                    continue;
                }
            };

            // Ensure each subnet_id inserted into the map has the highest slot as its value.
            // Higher slot corresponds to higher min_ttl in the `SubnetDiscovery` entry.
            if let Some(slot) = subnets_to_discover.get(&subnet_id) {
                if subscription.slot > *slot {
                    subnets_to_discover.insert(subnet_id, subscription.slot);
                }
            } else if !self.discovery_disabled {
                subnets_to_discover.insert(subnet_id, subscription.slot);
            }

            let exact_subnet = ExactSubnet {
                subnet_id,
                slot: subscription.slot,
            };

            // Determine if the validator is an aggregator. If so, we subscribe to the subnet and
            // if successful add the validator to a mapping of known aggregators for that exact
            // subnet.
            if subscription.is_aggregator {
                metrics::inc_counter(&metrics::SUBNET_SUBSCRIPTION_AGGREGATOR_REQUESTS);
                if let Err(e) = self.subscribe_to_short_lived_subnet(exact_subnet) {
                    warn!(self.log,
                        "Subscription to subnet error";
                        "error" => e,
                    );
                } else {
                    trace!(self.log,
                        "Subscribed to subnet for aggregator duties";
                        "exact_subnet" => ?exact_subnet,
                    );
                }
            }
        }

        // If the discovery mechanism isn't disabled, attempt to set up a peer discovery for the
        // required subnets.
        if !self.discovery_disabled {
            if let Err(e) = self.discover_peers_request(
                subnets_to_discover
                    .into_iter()
                    .map(|(subnet_id, slot)| ExactSubnet { subnet_id, slot }),
            ) {
                warn!(self.log, "Discovery lookup request error"; "error" => e);
            };
        }

        Ok(())
    }

    /// Recomputes the deterministic long lived subnets and re-arms the timer for the next
    /// recomputation, waking the task so new events are polled.
    fn recompute_long_lived_subnets(&mut self) {
        // Ensure the next computation is scheduled even if assigning subnets fails.
        let next_subscription_event = self
            .recompute_long_lived_subnets_inner()
            .unwrap_or_else(|_| self.beacon_chain.slot_clock.slot_duration());

        debug!(self.log, "Recomputing deterministic long lived subnets");
        self.next_long_lived_subscription_event =
            Box::pin(tokio::time::sleep(next_subscription_event));

        if let Some(waker) = self.waker.as_ref() {
            waker.wake_by_ref();
        }
    }

    /// Gets the long lived subnets the node should be subscribed to during the current epoch and
    /// the remaining duration for which they remain valid.
    fn recompute_long_lived_subnets_inner(&mut self) -> Result<Duration, ()> {
        let current_epoch = self.beacon_chain.epoch().map_err(|e| {
            // Before genesis a missing epoch is expected, so only log an error afterwards.
            if !self
                .beacon_chain
                .slot_clock
                .is_prior_to_genesis()
                .unwrap_or(false)
            {
                error!(self.log, "Failed to get the current epoch from clock"; "err" => ?e)
            }
        })?;

        let (subnets, next_subscription_epoch) = SubnetId::compute_subnets_for_epoch::<T::EthSpec>(
            self.node_id.raw(),
            current_epoch,
            &self.beacon_chain.spec,
        )
        .map_err(|e| error!(self.log, "Could not compute subnets for current epoch"; "err" => e))?;

        let next_subscription_slot =
            next_subscription_epoch.start_slot(T::EthSpec::slots_per_epoch());
        let next_subscription_event = self
            .beacon_chain
            .slot_clock
            .duration_to_slot(next_subscription_slot)
            .ok_or_else(|| {
                error!(
                    self.log,
                    "Failed to compute duration to next to long lived subscription event"
                )
            })?;

        self.update_long_lived_subnets(subnets.collect());

        Ok(next_subscription_event)
    }

    /// Updates the long lived subnets.
    ///
    /// New subnets are registered as subscribed, removed subnets as unsubscribed and the Enr
    /// updated accordingly.
    fn update_long_lived_subnets(&mut self, mut subnets: HashSet<SubnetId>) {
        info!(self.log, "Subscribing to long-lived subnets"; "subnets" => ?subnets.iter().collect::<Vec<_>>());
        for subnet in &subnets {
            // Add the events for those subnets that are new as long lived subscriptions.
            if !self.long_lived_subscriptions.contains(subnet) {
                // Check if this subnet is new and send the subscription event if needed.
                if !self.short_lived_subscriptions.contains_key(subnet) {
                    debug!(self.log, "Subscribing to subnet";
                        "subnet" => ?subnet,
                        "subscription_kind" => ?SubscriptionKind::LongLived,
                    );
                    self.queue_event(SubnetServiceMessage::Subscribe(Subnet::Attestation(
                        *subnet,
                    )));
                }
                self.queue_event(SubnetServiceMessage::EnrAdd(Subnet::Attestation(*subnet)));
                if !self.discovery_disabled {
                    self.queue_event(SubnetServiceMessage::DiscoverPeers(vec![SubnetDiscovery {
                        subnet: Subnet::Attestation(*subnet),
                        min_ttl: None,
                    }]))
                }
            }
        }

        // Update the long_lived_subnets set and check for subnets that are being removed
        std::mem::swap(&mut self.long_lived_subscriptions, &mut subnets);
        for subnet in subnets {
            if !self.long_lived_subscriptions.contains(&subnet) {
                self.handle_removed_subnet(subnet, SubscriptionKind::LongLived);
            }
        }
    }

    /// Checks if we have subscribed aggregate validators for the subnet. If not, checks the gossip
    /// verification, re-propagates and returns false.
    pub fn should_process_attestation(
        &self,
        subnet: SubnetId,
        attestation: &Attestation<T::EthSpec>,
    ) -> bool {
        // Proposer-only mode does not need to process attestations
        if self.proposer_only {
            return false;
        }
        // When aggregator tracking is disabled, the map is `None` and we process everything.
        self.aggregate_validators_on_subnet
            .as_ref()
            .map(|tracked_vals| {
                tracked_vals.contains_key(&ExactSubnet {
                    subnet_id: subnet,
                    slot: attestation.data().slot,
                })
            })
            .unwrap_or(true)
    }

    /* Internal private functions */

    /// Adds an event to the event queue and notifies that this service is ready to be polled
    /// again.
    fn queue_event(&mut self, ev: SubnetServiceMessage) {
        self.events.push_back(ev);
        if let Some(waker) = &self.waker {
            waker.wake_by_ref()
        }
    }

    /// Checks if there are currently queued discovery requests and the time required to make the
    /// request.
    ///
    /// If there is sufficient time, queues a peer discovery request for all the required subnets.
    fn discover_peers_request(
        &mut self,
        exact_subnets: impl Iterator<Item = ExactSubnet>,
    ) -> Result<(), &'static str> {
        let current_slot = self
            .beacon_chain
            .slot_clock
            .now()
            .ok_or("Could not get the current slot")?;

        let discovery_subnets: Vec<SubnetDiscovery> = exact_subnets
            .filter_map(|exact_subnet| {
                // Check if there is enough time to perform a discovery lookup.
                if exact_subnet.slot
                    >= current_slot.saturating_add(MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD)
                {
                    // Send out an event to start looking for peers.
                    // Require the peer for an additional slot to ensure we keep the peer for the
                    // duration of the subscription.
                    let min_ttl = self
                        .beacon_chain
                        .slot_clock
                        .duration_to_slot(exact_subnet.slot + 1)
                        .map(|duration| std::time::Instant::now() + duration);
                    Some(SubnetDiscovery {
                        subnet: Subnet::Attestation(exact_subnet.subnet_id),
                        min_ttl,
                    })
                } else {
                    // We may want to check the global PeerInfo to see estimated timeouts for each
                    // peer before they can be removed.
                    warn!(self.log,
                        "Not enough time for a discovery search";
                        "subnet_id" => ?exact_subnet
                    );
                    None
                }
            })
            .collect();

        if !discovery_subnets.is_empty() {
            self.queue_event(SubnetServiceMessage::DiscoverPeers(discovery_subnets));
        }
        Ok(())
    }

    // Subscribes to the subnet if it should be done immediately, or schedules it if required.
    fn subscribe_to_short_lived_subnet(
        &mut self,
        ExactSubnet { subnet_id, slot }: ExactSubnet,
    ) -> Result<(), &'static str> {
        let slot_duration = self.beacon_chain.slot_clock.slot_duration();

        // The short time we schedule the subscription before it's actually required. This
        // ensures we are subscribed on time, and allows consecutive subscriptions to the same
        // subnet to overlap, reducing subnet churn.
        let advance_subscription_duration = slot_duration / ADVANCE_SUBSCRIBE_SLOT_FRACTION;
        // The time to the required slot.
        let time_to_subscription_slot = self
            .beacon_chain
            .slot_clock
            .duration_to_slot(slot)
            .unwrap_or_default(); // If this is a past slot we will just get a 0 duration.

        // Calculate how long before we need to subscribe to the subnet.
        let time_to_subscription_start =
            time_to_subscription_slot.saturating_sub(advance_subscription_duration);

        // The time after a duty slot where we no longer need it in the `aggregate_validators_on_subnet`
        // delay map.
        let time_to_unsubscribe =
            time_to_subscription_slot + UNSUBSCRIBE_AFTER_AGGREGATOR_DUTY * slot_duration;
        if let Some(tracked_vals) = self.aggregate_validators_on_subnet.as_mut() {
            tracked_vals.insert_at(ExactSubnet { subnet_id, slot }, time_to_unsubscribe);
        }

        // If the subscription should be done in the future, schedule it. Otherwise subscribe
        // immediately.
        if time_to_subscription_start.is_zero() {
            // This is a current or past slot, we subscribe immediately.
            self.subscribe_to_short_lived_subnet_immediately(subnet_id, slot + 1)?;
        } else {
            // This is a future slot, schedule subscribing.
            trace!(self.log, "Scheduling subnet subscription"; "subnet" => ?subnet_id, "time_to_subscription_start" => ?time_to_subscription_start);
            self.scheduled_short_lived_subscriptions
                .insert_at(ExactSubnet { subnet_id, slot }, time_to_subscription_start);
        }

        Ok(())
    }

    /* A collection of functions that handle the various timeouts */

    /// Registers a subnet as subscribed.
    ///
    /// Checks that the time in which the subscription would end is not in the past. If we are
    /// already subscribed, extends the timeout if necessary. If this is a new subscription, we send
    /// out the appropriate events.
    ///
    /// On deterministic long lived subnets, this is only used for short lived subscriptions.
    fn subscribe_to_short_lived_subnet_immediately(
        &mut self,
        subnet_id: SubnetId,
        end_slot: Slot,
    ) -> Result<(), &'static str> {
        if self.subscribe_all_subnets {
            // Case not handled by this service.
            return Ok(());
        }

        let time_to_subscription_end = self
            .beacon_chain
            .slot_clock
            .duration_to_slot(end_slot)
            .unwrap_or_default();

        // First check this is worth doing.
        if time_to_subscription_end.is_zero() {
            return Err("Time when subscription would end has already passed.");
        }

        let subscription_kind = SubscriptionKind::ShortLived;

        // We need to check and add a subscription for the right kind, regardless of the presence
        // of the subnet as a subscription of the other kind. This is mainly since long lived
        // subscriptions can be removed at any time when a validator goes offline.
        let (subscriptions, already_subscribed_as_other_kind) = (
            &mut self.short_lived_subscriptions,
            self.long_lived_subscriptions.contains(&subnet_id),
        );

        match subscriptions.get(&subnet_id) {
            Some(current_end_slot) => {
                // We are already subscribed. Check if we need to extend the subscription.
                if &end_slot > current_end_slot {
                    trace!(self.log, "Extending subscription to subnet";
                        "subnet" => ?subnet_id,
                        "prev_end_slot" => current_end_slot,
                        "new_end_slot" => end_slot,
                        "subscription_kind" => ?subscription_kind,
                    );
                    subscriptions.insert_at(subnet_id, end_slot, time_to_subscription_end);
                }
            }
            None => {
                // This is a new subscription. Add with the corresponding timeout and send the
                // notification.
                subscriptions.insert_at(subnet_id, end_slot, time_to_subscription_end);

                // Inform of the subscription.
                if !already_subscribed_as_other_kind {
                    debug!(self.log, "Subscribing to subnet";
                        "subnet" => ?subnet_id,
                        "end_slot" => end_slot,
                        "subscription_kind" => ?subscription_kind,
                    );
                    self.queue_event(SubnetServiceMessage::Subscribe(Subnet::Attestation(
                        subnet_id,
                    )));
                }
            }
        }

        Ok(())
    }

    // Unsubscribes from a subnet that was removed if it does not continue to exist as a
    // subscription of the other kind. For long lived subscriptions, it also removes the
    // advertisement from our ENR.
    fn handle_removed_subnet(&mut self, subnet_id: SubnetId, subscription_kind: SubscriptionKind) {
        let exists_in_other_subscriptions = match subscription_kind {
            SubscriptionKind::LongLived => self.short_lived_subscriptions.contains_key(&subnet_id),
            SubscriptionKind::ShortLived => self.long_lived_subscriptions.contains(&subnet_id),
        };

        if !exists_in_other_subscriptions {
            // Subscription no longer exists as short lived or long lived.
            debug!(self.log, "Unsubscribing from subnet"; "subnet" => ?subnet_id, "subscription_kind" => ?subscription_kind);
            self.queue_event(SubnetServiceMessage::Unsubscribe(Subnet::Attestation(
                subnet_id,
            )));
        }

        if subscription_kind == SubscriptionKind::LongLived {
            // Remove from our ENR even if we remain subscribed in other way.
            self.queue_event(SubnetServiceMessage::EnrRemove(Subnet::Attestation(
                subnet_id,
            )));
        }
    }
}
impl<T: BeaconChainTypes> Stream for AttestationService<T> {
    type Item = SubnetServiceMessage;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Update the waker if needed.
        // NOTE(review): this replaces the stored waker only when `will_wake` is true; the
        // conventional guard is `!waker.will_wake(cx.waker())`. Verify this condition is
        // intentional — as written, a stored waker for a *different* task is never replaced.
        if let Some(waker) = &self.waker {
            if waker.will_wake(cx.waker()) {
                self.waker = Some(cx.waker().clone());
            }
        } else {
            self.waker = Some(cx.waker().clone());
        }

        // Send out any generated events.
        if let Some(event) = self.events.pop_front() {
            return Poll::Ready(Some(event));
        }

        // If we aren't subscribed to all subnets, handle the deterministic long-lived subnets
        if !self.subscribe_all_subnets {
            match self.next_long_lived_subscription_event.as_mut().poll(cx) {
                Poll::Ready(_) => {
                    self.recompute_long_lived_subnets();
                    // We re-wake the task as there could be other subscriptions to process
                    self.waker
                        .as_ref()
                        .expect("Waker has been set")
                        .wake_by_ref();
                }
                Poll::Pending => {}
            }
        }

        // Process scheduled subscriptions that might be ready, since those can extend a soon to
        // expire subscription.
        match self.scheduled_short_lived_subscriptions.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok(ExactSubnet { subnet_id, slot }))) => {
                if let Err(e) =
                    self.subscribe_to_short_lived_subnet_immediately(subnet_id, slot + 1)
                {
                    debug!(self.log, "Failed to subscribe to short lived subnet"; "subnet" => ?subnet_id, "err" => e);
                }
                self.waker
                    .as_ref()
                    .expect("Waker has been set")
                    .wake_by_ref();
            }
            Poll::Ready(Some(Err(e))) => {
                error!(self.log, "Failed to check for scheduled subnet subscriptions"; "error"=> e);
            }
            Poll::Ready(None) | Poll::Pending => {}
        }

        // Finally process any expired subscriptions.
        match self.short_lived_subscriptions.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok((subnet_id, _end_slot)))) => {
                self.handle_removed_subnet(subnet_id, SubscriptionKind::ShortLived);
                // We re-wake the task as there could be other subscriptions to process
                self.waker
                    .as_ref()
                    .expect("Waker has been set")
                    .wake_by_ref();
            }
            Poll::Ready(Some(Err(e))) => {
                error!(self.log, "Failed to check for subnet unsubscription times"; "error"=> e);
            }
            Poll::Ready(None) | Poll::Pending => {}
        }

        // Poll to remove entries on expiration, no need to act on expiration events.
        if let Some(tracked_vals) = self.aggregate_validators_on_subnet.as_mut() {
            if let Poll::Ready(Some(Err(e))) = tracked_vals.poll_next_unpin(cx) {
                error!(self.log, "Failed to check for aggregate validator on subnet expirations"; "error"=> e);
            }
        }

        Poll::Pending
    }
}

View File

@@ -1,10 +1,25 @@
pub mod attestation_subnets;
pub mod sync_subnets;
//! This service keeps track of which shard subnet the beacon node should be subscribed to at any
//! given time. It schedules subscriptions to shard subnets, requests peer discoveries and
//! determines whether attestations should be aggregated and/or passed to the beacon node.
use lighthouse_network::{Subnet, SubnetDiscovery};
use std::collections::HashSet;
use std::collections::{HashMap, VecDeque};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::Instant;
pub use attestation_subnets::AttestationService;
pub use sync_subnets::SyncCommitteeService;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use delay_map::HashSetDelay;
use futures::prelude::*;
use lighthouse_network::{discv5::enr::NodeId, NetworkConfig, Subnet, SubnetDiscovery};
use slog::{debug, error, o, warn};
use slot_clock::SlotClock;
use types::{
Attestation, EthSpec, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId,
ValidatorSubscription,
};
#[cfg(test)]
mod tests;
@@ -17,12 +32,642 @@ pub enum SubnetServiceMessage {
Unsubscribe(Subnet),
/// Add the `SubnetId` to the ENR bitfield.
EnrAdd(Subnet),
/// Remove the `SubnetId` from the ENR bitfield.
EnrRemove(Subnet),
/// Remove a sync committee subnet from the ENR.
EnrRemove(SyncSubnetId),
/// Discover peers for a list of `SubnetDiscovery`.
DiscoverPeers(Vec<SubnetDiscovery>),
}
use crate::metrics;
/// The minimum number of slots ahead that we attempt to discover peers for a subscription. If the
/// slot is less than this number, skip the peer discovery process.
/// Subnet discovery query takes at most 30 secs, 2 slots take 24s.
pub(crate) const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2;
/// The fraction of a slot that we subscribe to a subnet before the required slot.
///
/// Currently a whole slot ahead.
const ADVANCE_SUBSCRIBE_SLOT_FRACTION: u32 = 1;
/// The number of slots after an aggregator duty where we remove the entry from
/// `aggregate_validators_on_subnet` delay map.
const UNSUBSCRIBE_AFTER_AGGREGATOR_DUTY: u32 = 2;
/// A particular subnet at a given slot. This is used for attestation subnets and not for sync
/// committee subnets because the logic for handling subscriptions between these types is different.
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub struct ExactSubnet {
    /// The subnet associated with this subscription.
    pub subnet: Subnet,
    /// The duty slot for this subscription; we aim to be subscribed to `subnet` by the start of
    /// this slot.
    pub slot: Slot,
}
/// The enum used to group all kinds of validator subscriptions.
#[derive(Debug, Clone, PartialEq)]
pub enum Subscription {
    /// A subscription for attestation duties.
    Attestation(ValidatorSubscription),
    /// A subscription for sync committee duties.
    SyncCommittee(SyncCommitteeSubscription),
}
pub struct SubnetService<T: BeaconChainTypes> {
    /// Queued events to return to the driving service.
    events: VecDeque<SubnetServiceMessage>,
    /// A reference to the beacon chain to process received attestations.
    pub(crate) beacon_chain: Arc<BeaconChain<T>>,
    /// Subnets we are currently subscribed to as short lived subscriptions.
    ///
    /// Once they expire, we unsubscribe from these.
    /// We subscribe to subnets when we are an aggregator for an exact subnet.
    // NOTE: At setup, the default timeout is set for sync committee subscriptions; attestation
    // entries always supply an explicit timeout on insertion.
    subscriptions: HashSetDelay<Subnet>,
    /// Subscriptions that need to be executed in the future.
    scheduled_subscriptions: HashSetDelay<Subnet>,
    /// A list of permanent subnets that this node is subscribed to.
    // TODO: Shift this to a dynamic bitfield
    permanent_attestation_subscriptions: HashSet<Subnet>,
    /// A collection of timeouts tracking the existence of aggregate validator subscriptions at an
    /// `ExactSubnet`. `None` when `import_all_attestations` is set, in which case all
    /// attestations are processed regardless of tracked aggregators.
    aggregate_validators_on_subnet: Option<HashSetDelay<ExactSubnet>>,
    /// The waker for the current thread.
    waker: Option<std::task::Waker>,
    /// The discovery mechanism of lighthouse is disabled.
    discovery_disabled: bool,
    /// We are always subscribed to all subnets.
    subscribe_all_subnets: bool,
    /// Whether this node is a block proposer-only node.
    proposer_only: bool,
    /// The logger for the subnet service.
    log: slog::Logger,
}
impl<T: BeaconChainTypes> SubnetService<T> {
/* Public functions */
/// Establish the service based on the passed configuration.
///
/// Computes the node's permanent attestation subnets (all subnets when
/// `subscribe_all_subnets` is set, otherwise derived from `node_id`), pre-populates the event
/// queue with `Subscribe`/`EnrAdd` events for them and — unless discovery is disabled — a
/// single `DiscoverPeers` request covering them all.
pub fn new(
    beacon_chain: Arc<BeaconChain<T>>,
    node_id: NodeId,
    config: &NetworkConfig,
    log: &slog::Logger,
) -> Self {
    let log = log.new(o!("service" => "subnet_service"));

    let slot_duration = beacon_chain.slot_clock.slot_duration();

    // Build the list of known permanent subscriptions, so that we know not to subscribe or
    // discover them.
    let mut permanent_attestation_subscriptions = HashSet::default();
    if config.subscribe_all_subnets {
        slog::info!(log, "Subscribing to all subnets");
        // We are subscribed to all subnets, set all the bits to true.
        for index in 0..beacon_chain.spec.attestation_subnet_count {
            permanent_attestation_subscriptions
                .insert(Subnet::Attestation(SubnetId::from(index)));
        }
    } else {
        // Not subscribed to all subnets, so just calculate the required subnets from the
        // node id.
        for subnet_id in
            SubnetId::compute_attestation_subnets(node_id.raw(), &beacon_chain.spec)
        {
            permanent_attestation_subscriptions.insert(Subnet::Attestation(subnet_id));
        }
    }

    // Set up the sync committee subscription default timeout: one full sync committee period.
    // Re-use the slot duration fetched above rather than querying the slot clock again.
    let spec = &beacon_chain.spec;
    let epoch_duration_secs = slot_duration.as_secs() * T::EthSpec::slots_per_epoch();
    let default_sync_committee_duration = Duration::from_secs(
        epoch_duration_secs.saturating_mul(spec.epochs_per_sync_committee_period.as_u64()),
    );

    // Tracking individual aggregators is only needed when we do not import all attestations.
    let track_validators = !config.import_all_attestations;
    let aggregate_validators_on_subnet =
        track_validators.then(|| HashSetDelay::new(slot_duration));

    let mut events = VecDeque::with_capacity(10);

    // Queue discovery queries for the permanent attestation subnets.
    if !config.disable_discovery {
        events.push_back(SubnetServiceMessage::DiscoverPeers(
            permanent_attestation_subscriptions
                .iter()
                .cloned()
                .map(|subnet| SubnetDiscovery {
                    subnet,
                    min_ttl: None,
                })
                .collect(),
        ));
    }

    // Pre-populate the events with permanent subscriptions.
    for subnet in permanent_attestation_subscriptions.iter() {
        events.push_back(SubnetServiceMessage::Subscribe(*subnet));
        events.push_back(SubnetServiceMessage::EnrAdd(*subnet));
    }

    SubnetService {
        events,
        beacon_chain,
        subscriptions: HashSetDelay::new(default_sync_committee_duration),
        permanent_attestation_subscriptions,
        scheduled_subscriptions: HashSetDelay::default(),
        aggregate_validators_on_subnet,
        waker: None,
        discovery_disabled: config.disable_discovery,
        subscribe_all_subnets: config.subscribe_all_subnets,
        proposer_only: config.proposer_only,
        log,
    }
}
/// Return an iterator over all currently subscribed short-lived subnets (test only).
#[cfg(test)]
pub fn subscriptions(&self) -> impl Iterator<Item = &Subnet> {
    self.subscriptions.iter()
}

/// Return an iterator over the node's permanent attestation subnets (test only).
#[cfg(test)]
pub fn permanent_subscriptions(&self) -> impl Iterator<Item = &Subnet> {
    self.permanent_attestation_subscriptions.iter()
}

/// Returns whether we currently hold a short-lived subscription to `subnet` (test only).
#[cfg(test)]
pub(crate) fn is_subscribed(&self, subnet: &Subnet) -> bool {
    self.subscriptions.contains_key(subnet)
}
/// Processes a list of validator subscriptions.
///
/// This is fundamentally called from the HTTP API when a validator requests duties from us.
/// This will:
/// - Register new validators as being known.
/// - Search for peers for required subnets.
/// - Request subscriptions for subnets on specific slots when required.
/// - Build the timeouts for each of these events.
///
/// This returns a result simply for the ergonomics of using ?. The result can be
/// safely dropped.
pub fn validator_subscriptions(&mut self, subscriptions: impl Iterator<Item = Subscription>) {
    // If the node is in a proposer-only state, we ignore all subnet subscriptions.
    if self.proposer_only {
        return;
    }

    // Maps each subnet subscription to its highest slot.
    let mut subnets_to_discover: HashMap<Subnet, Slot> = HashMap::new();

    // Registers the validator with the attestation service.
    for general_subscription in subscriptions {
        match general_subscription {
            Subscription::Attestation(subscription) => {
                metrics::inc_counter(&metrics::SUBNET_SUBSCRIPTION_REQUESTS);

                // Compute the subnet that is associated with this subscription.
                let subnet = match SubnetId::compute_subnet::<T::EthSpec>(
                    subscription.slot,
                    subscription.attestation_committee_index,
                    subscription.committee_count_at_slot,
                    &self.beacon_chain.spec,
                ) {
                    Ok(subnet_id) => Subnet::Attestation(subnet_id),
                    Err(e) => {
                        warn!(self.log,
                            "Failed to compute subnet id for validator subscription";
                            "error" => ?e,
                        );
                        continue;
                    }
                };

                // Ensure each subnet inserted into the map has the highest slot as its value.
                // A higher slot corresponds to a higher min_ttl in the `SubnetDiscovery`
                // entry.
                if let Some(slot) = subnets_to_discover.get(&subnet) {
                    if subscription.slot > *slot {
                        subnets_to_discover.insert(subnet, subscription.slot);
                    }
                } else if !self.discovery_disabled {
                    subnets_to_discover.insert(subnet, subscription.slot);
                }

                let exact_subnet = ExactSubnet {
                    subnet,
                    slot: subscription.slot,
                };

                // Determine if the validator is an aggregator. If so, we subscribe to the
                // subnet and if successful add the validator to a mapping of known
                // aggregators for that exact subnet.
                if subscription.is_aggregator {
                    metrics::inc_counter(&metrics::SUBNET_SUBSCRIPTION_AGGREGATOR_REQUESTS);
                    if let Err(e) = self.subscribe_to_subnet(exact_subnet) {
                        warn!(self.log,
                            "Subscription to subnet error";
                            "error" => e,
                        );
                    }
                }
            }
            Subscription::SyncCommittee(subscription) => {
                metrics::inc_counter(&metrics::SYNC_COMMITTEE_SUBSCRIPTION_REQUESTS);
                // NOTE: We assume all subscriptions have been verified before reaching this
                // service.
                // Registers the validator with the subnet service.
                let subnet_ids =
                    match SyncSubnetId::compute_subnets_for_sync_committee::<T::EthSpec>(
                        &subscription.sync_committee_indices,
                    ) {
                        Ok(subnet_ids) => subnet_ids,
                        Err(e) => {
                            warn!(self.log,
                                "Failed to compute subnet id for sync committee subscription";
                                "error" => ?e,
                                "validator_index" => subscription.validator_index
                            );
                            continue;
                        }
                    };

                for subnet_id in subnet_ids {
                    let subnet = Subnet::SyncCommittee(subnet_id);
                    // Stay subscribed (and keep discovered peers) until the start slot of the
                    // epoch at which the subscription expires.
                    let slot_required_until = subscription
                        .until_epoch
                        .start_slot(T::EthSpec::slots_per_epoch());
                    subnets_to_discover.insert(subnet, slot_required_until);

                    let Some(duration_to_unsubscribe) = self
                        .beacon_chain
                        .slot_clock
                        .duration_to_slot(slot_required_until)
                    else {
                        warn!(self.log, "Subscription to sync subnet error"; "error" => "Unable to determine duration to unsubscription slot", "validator_index" => subscription.validator_index);
                        continue;
                    };

                    // A zero duration means the expiry slot has already passed; skip.
                    if duration_to_unsubscribe == Duration::from_secs(0) {
                        let current_slot = self
                            .beacon_chain
                            .slot_clock
                            .now()
                            .unwrap_or(Slot::from(0u64));
                        warn!(
                            self.log,
                            "Sync committee subscription is past expiration";
                            "subnet" => ?subnet,
                            "current_slot" => ?current_slot,
                            "unsubscribe_slot" => ?slot_required_until,
                        );
                        continue;
                    }

                    self.subscribe_to_sync_subnet(
                        subnet,
                        duration_to_unsubscribe,
                        slot_required_until,
                    );
                }
            }
        }
    }
    // If the discovery mechanism isn't disabled, attempt to set up a peer discovery for the
    // required subnets.
    if !self.discovery_disabled {
        if let Err(e) = self.discover_peers_request(subnets_to_discover.into_iter()) {
            warn!(self.log, "Discovery lookup request error"; "error" => e);
        };
    }
}
/// Returns whether an attestation arriving on `subnet` should be fully processed.
///
/// When aggregator tracking is enabled, an attestation is only processed if we hold a tracked
/// aggregator entry for its exact (subnet, slot) pair; with tracking disabled every
/// attestation is processed. A proposer-only node processes none.
pub fn should_process_attestation(
    &self,
    subnet: Subnet,
    attestation: &Attestation<T::EthSpec>,
) -> bool {
    // Proposer-only mode does not need to process attestations.
    if self.proposer_only {
        return false;
    }

    match self.aggregate_validators_on_subnet.as_ref() {
        // No tracking map means we import everything.
        None => true,
        Some(tracked_vals) => {
            let key = ExactSubnet {
                subnet,
                slot: attestation.data().slot,
            };
            tracked_vals.contains_key(&key)
        }
    }
}
/* Internal private functions */
/// Queues an event for the driving service and wakes the polling task so it is delivered
/// promptly.
fn queue_event(&mut self, ev: SubnetServiceMessage) {
    self.events.push_back(ev);
    // Notify the task driving this stream (if it has polled us before).
    if let Some(waker) = self.waker.as_ref() {
        waker.wake_by_ref();
    }
}
/// Checks if there are currently queued discovery requests and the time required to make the
/// request.
///
/// If there is sufficient time, queues a peer discovery request for all the required subnets.
// NOTE: Sending early subscriptions results in early searching for peers on subnets.
fn discover_peers_request(
    &mut self,
    subnets_to_discover: impl Iterator<Item = (Subnet, Slot)>,
) -> Result<(), &'static str> {
    let current_slot = self
        .beacon_chain
        .slot_clock
        .now()
        .ok_or("Could not get the current slot")?;

    let discovery_subnets: Vec<SubnetDiscovery> = subnets_to_discover
        .filter_map(|(subnet, relevant_slot)| {
            // We generate discovery requests for all subnets (even ones we are permanently
            // subscribed to) in order to ensure our peer counts are satisfactory to perform
            // the necessary duties.

            // Check if there is enough time to perform a discovery lookup.
            if relevant_slot >= current_slot.saturating_add(MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD)
            {
                // Send out an event to start looking for peers.
                // Require the peer for an additional slot to ensure we keep the peer for the
                // duration of the subscription.
                let min_ttl = self
                    .beacon_chain
                    .slot_clock
                    .duration_to_slot(relevant_slot + 1)
                    .map(|duration| std::time::Instant::now() + duration);
                Some(SubnetDiscovery { subnet, min_ttl })
            } else {
                // We may want to check the global PeerInfo to see estimated timeouts for each
                // peer before they can be removed.
                warn!(self.log,
                    "Not enough time for a discovery search";
                    "subnet_id" => ?subnet,
                );
                None
            }
        })
        .collect();

    if !discovery_subnets.is_empty() {
        self.queue_event(SubnetServiceMessage::DiscoverPeers(discovery_subnets));
    }
    Ok(())
}
// Subscribes to the subnet if it should be done immediately, or schedules it if required.
fn subscribe_to_subnet(
    &mut self,
    ExactSubnet { subnet, slot }: ExactSubnet,
) -> Result<(), &'static str> {
    // If the subnet is one of our permanent subnets, we do not need to subscribe.
    if self.subscribe_all_subnets || self.permanent_attestation_subscriptions.contains(&subnet)
    {
        return Ok(());
    }

    let slot_duration = self.beacon_chain.slot_clock.slot_duration();

    // The short time we schedule the subscription before it's actually required. This
    // ensures we are subscribed on time, and allows consecutive subscriptions to the same
    // subnet to overlap, reducing subnet churn.
    let advance_subscription_duration = slot_duration / ADVANCE_SUBSCRIBE_SLOT_FRACTION;
    // The time to the required slot.
    let time_to_subscription_slot = self
        .beacon_chain
        .slot_clock
        .duration_to_slot(slot)
        .unwrap_or_default(); // If this is a past slot we will just get a 0 duration.

    // Calculate how long before we need to subscribe to the subnet.
    let time_to_subscription_start =
        time_to_subscription_slot.saturating_sub(advance_subscription_duration);

    // The time after a duty slot at which we no longer need it in the
    // `aggregate_validators_on_subnet` delay map.
    let time_to_unsubscribe =
        time_to_subscription_slot + UNSUBSCRIBE_AFTER_AGGREGATOR_DUTY * slot_duration;
    if let Some(tracked_vals) = self.aggregate_validators_on_subnet.as_mut() {
        tracked_vals.insert_at(ExactSubnet { subnet, slot }, time_to_unsubscribe);
    }

    // If the subscription should be done in the future, schedule it. Otherwise subscribe
    // immediately.
    if time_to_subscription_start.is_zero() {
        // This is a current or past slot, we subscribe immediately.
        self.subscribe_to_subnet_immediately(subnet, slot + 1)?;
    } else {
        // This is a future slot, schedule subscribing.
        self.scheduled_subscriptions
            .insert_at(subnet, time_to_subscription_start);
    }
    Ok(())
}
/// Adds a subscription event to the sync subnet.
///
/// If a subscription for `subnet` already exists, its deadline is extended when
/// `duration_to_unsubscribe` outlives it; otherwise a new subscription is created and
/// `Subscribe`/`EnrAdd` events are queued.
fn subscribe_to_sync_subnet(
    &mut self,
    subnet: Subnet,
    duration_to_unsubscribe: Duration,
    slot_required_until: Slot,
) {
    // Return if we have subscribed to all subnets.
    if self.subscribe_all_subnets {
        return;
    }

    // Update the unsubscription duration if we already have a subscription for the subnet.
    if let Some(current_instant_to_unsubscribe) = self.subscriptions.deadline(&subnet) {
        // The extra 500ms in the comparison accounts for the inaccuracy of the underlying
        // DelayQueue inside the delaymap struct.
        let current_duration_to_unsubscribe = (current_instant_to_unsubscribe
            + Duration::from_millis(500))
        .checked_duration_since(Instant::now())
        .unwrap_or(Duration::from_secs(0));
        if duration_to_unsubscribe > current_duration_to_unsubscribe {
            self.subscriptions
                .update_timeout(&subnet, duration_to_unsubscribe);
        }
    } else {
        // We are not currently subscribed and have no waiting subscription, create one.
        self.subscriptions
            .insert_at(subnet, duration_to_unsubscribe);

        debug!(self.log, "Subscribing to subnet"; "subnet" => ?subnet, "until" => ?slot_required_until);
        // Use `queue_event` (rather than pushing onto `self.events` directly) so the polling
        // task is woken and the events are delivered promptly, matching the behaviour of the
        // other subscription paths.
        self.queue_event(SubnetServiceMessage::Subscribe(subnet));
        // Add the sync subnet to the ENR bitfield.
        self.queue_event(SubnetServiceMessage::EnrAdd(subnet));
    }
}
/* A collection of functions that handle the various timeouts */

/// Registers a subnet as subscribed.
///
/// Checks that the time at which the subscription would end is not in the past. If we are
/// already subscribed, extends the timeout if necessary. If this is a new subscription, we
/// send out the appropriate events.
fn subscribe_to_subnet_immediately(
    &mut self,
    subnet: Subnet,
    end_slot: Slot,
) -> Result<(), &'static str> {
    if self.subscribe_all_subnets {
        // Case not handled by this service.
        return Ok(());
    }

    let time_to_subscription_end = self
        .beacon_chain
        .slot_clock
        .duration_to_slot(end_slot)
        .unwrap_or_default();

    // First check this is worth doing.
    if time_to_subscription_end.is_zero() {
        return Err("Time when subscription would end has already passed.");
    }

    // Check if we already have this subscription. If we do, optionally update the timeout of
    // when we need the subscription, otherwise leave as is.
    // If this is a new subscription simply add it to our mapping and subscribe.
    match self.subscriptions.deadline(&subnet) {
        Some(current_end_slot_time) => {
            // We are already subscribed. Check if we need to extend the subscription.
            if current_end_slot_time
                .checked_duration_since(Instant::now())
                .unwrap_or(Duration::from_secs(0))
                < time_to_subscription_end
            {
                self.subscriptions
                    .update_timeout(&subnet, time_to_subscription_end);
            }
        }
        None => {
            // This is a new subscription. Add with the corresponding timeout and send the
            // notification.
            self.subscriptions
                .insert_at(subnet, time_to_subscription_end);

            // Inform of the subscription.
            debug!(self.log, "Subscribing to subnet";
                "subnet" => ?subnet,
                "end_slot" => end_slot,
            );
            self.queue_event(SubnetServiceMessage::Subscribe(subnet));
        }
    }
    Ok(())
}
/// Handles a short-lived subscription expiring: unsubscribes from the subnet unless a fresh
/// subscription has since replaced it, and (for sync committee subnets) withdraws the ENR
/// advertisement.
fn handle_removed_subnet(&mut self, subnet: Subnet) {
    if self.subscriptions.contains_key(&subnet) {
        // A live short-lived subscription still exists for this subnet; nothing to do.
        return;
    }

    debug!(self.log, "Unsubscribing from subnet"; "subnet" => ?subnet);
    self.queue_event(SubnetServiceMessage::Unsubscribe(subnet));
    // Sync committee subnets are advertised in the ENR, so remove the entry there too.
    if let Subnet::SyncCommittee(sync_subnet_id) = subnet {
        self.queue_event(SubnetServiceMessage::EnrRemove(sync_subnet_id));
    }
}
}
impl<T: BeaconChainTypes> Stream for SubnetService<T> {
    type Item = SubnetServiceMessage;

    /// Drives the subnet service: delivers queued events, executes scheduled subscriptions and
    /// processes expired ones.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Update the stored waker if the current task's waker differs from it. The previous
        // check was inverted (it refreshed the waker only when the stored one already woke
        // this task), which would leave a stale waker in place if a different task started
        // polling this stream.
        if let Some(waker) = &self.waker {
            if !waker.will_wake(cx.waker()) {
                self.waker = Some(cx.waker().clone());
            }
        } else {
            self.waker = Some(cx.waker().clone());
        }

        // Send out any generated events.
        if let Some(event) = self.events.pop_front() {
            return Poll::Ready(Some(event));
        }

        // Process scheduled subscriptions that might be ready, since those can extend a soon to
        // expire subscription.
        match self.scheduled_subscriptions.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok(subnet))) => {
                let current_slot = self.beacon_chain.slot_clock.now().unwrap_or_default();
                if let Err(e) = self.subscribe_to_subnet_immediately(subnet, current_slot + 1) {
                    debug!(self.log, "Failed to subscribe to short lived subnet"; "subnet" => ?subnet, "err" => e);
                }
                // Re-wake the task as the subscription may have queued events to deliver.
                self.waker
                    .as_ref()
                    .expect("Waker has been set")
                    .wake_by_ref();
            }
            Poll::Ready(Some(Err(e))) => {
                error!(self.log, "Failed to check for scheduled subnet subscriptions"; "error"=> e);
            }
            Poll::Ready(None) | Poll::Pending => {}
        }

        // Process any expired subscriptions.
        match self.subscriptions.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok(subnet))) => {
                self.handle_removed_subnet(subnet);
                // We re-wake the task as there could be other subscriptions to process.
                self.waker
                    .as_ref()
                    .expect("Waker has been set")
                    .wake_by_ref();
            }
            Poll::Ready(Some(Err(e))) => {
                error!(self.log, "Failed to check for subnet unsubscription times"; "error"=> e);
            }
            Poll::Ready(None) | Poll::Pending => {}
        }

        // Poll to remove entries on expiration; no need to act on expiration events.
        if let Some(tracked_vals) = self.aggregate_validators_on_subnet.as_mut() {
            if let Poll::Ready(Some(Err(e))) = tracked_vals.poll_next_unpin(cx) {
                error!(self.log, "Failed to check for aggregate validator on subnet expirations"; "error"=> e);
            }
        }

        Poll::Pending
    }
}
/// Note: This `PartialEq` impl is for use only in tests.
/// The `DiscoverPeers` comparison is good enough for testing only.
#[cfg(test)]
@@ -32,7 +677,6 @@ impl PartialEq for SubnetServiceMessage {
(SubnetServiceMessage::Subscribe(a), SubnetServiceMessage::Subscribe(b)) => a == b,
(SubnetServiceMessage::Unsubscribe(a), SubnetServiceMessage::Unsubscribe(b)) => a == b,
(SubnetServiceMessage::EnrAdd(a), SubnetServiceMessage::EnrAdd(b)) => a == b,
(SubnetServiceMessage::EnrRemove(a), SubnetServiceMessage::EnrRemove(b)) => a == b,
(SubnetServiceMessage::DiscoverPeers(a), SubnetServiceMessage::DiscoverPeers(b)) => {
if a.len() != b.len() {
return false;

View File

@@ -1,359 +0,0 @@
//! This service keeps track of which sync committee subnet the beacon node should be subscribed to at any
//! given time. It schedules subscriptions to sync committee subnets and requests peer discoveries.
use std::collections::{hash_map::Entry, HashMap, VecDeque};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use futures::prelude::*;
use slog::{debug, error, o, trace, warn};
use super::SubnetServiceMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use delay_map::HashSetDelay;
use lighthouse_network::{NetworkConfig, Subnet, SubnetDiscovery};
use slot_clock::SlotClock;
use types::{Epoch, EthSpec, SyncCommitteeSubscription, SyncSubnetId};
use crate::metrics;
/// The minimum number of slots ahead that we attempt to discover peers for a subscription. If the
/// slot is less than this number, skip the peer discovery process.
/// Subnet discovery query takes at most 30 secs, 2 slots take 24s.
const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2;
/// A particular sync committee subnet together with the epoch until which the subscription is
/// required.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub struct ExactSubnet {
    /// The `SyncSubnetId` associated with this subnet.
    pub subnet_id: SyncSubnetId,
    /// The epoch until which we need to stay subscribed to the subnet.
    pub until_epoch: Epoch,
}
pub struct SyncCommitteeService<T: BeaconChainTypes> {
    /// Queued events to return to the driving service.
    events: VecDeque<SubnetServiceMessage>,
    /// A reference to the beacon chain, used here for slot-clock timing and chain spec access.
    pub(crate) beacon_chain: Arc<BeaconChain<T>>,
    /// The collection of all currently subscribed subnets, mapped to the epoch until which the
    /// subscription is required.
    subscriptions: HashMap<SyncSubnetId, Epoch>,
    /// A collection of timeouts for when to unsubscribe from a subnet.
    unsubscriptions: HashSetDelay<SyncSubnetId>,
    /// The waker for the current thread.
    waker: Option<std::task::Waker>,
    /// The discovery mechanism of lighthouse is disabled.
    discovery_disabled: bool,
    /// We are always subscribed to all subnets.
    subscribe_all_subnets: bool,
    /// Whether this node is a block proposer-only node.
    proposer_only: bool,
    /// The logger for the sync committee service.
    log: slog::Logger,
}
impl<T: BeaconChainTypes> SyncCommitteeService<T> {
/* Public functions */
/// Establish the service from the beacon chain and network configuration.
///
/// The `unsubscriptions` delay map's default timeout is one full sync committee period
/// (`epochs_per_sync_committee_period` epochs' worth of seconds).
pub fn new(
    beacon_chain: Arc<BeaconChain<T>>,
    config: &NetworkConfig,
    log: &slog::Logger,
) -> Self {
    let log = log.new(o!("service" => "sync_committee_service"));

    let spec = &beacon_chain.spec;
    let epoch_duration_secs =
        beacon_chain.slot_clock.slot_duration().as_secs() * T::EthSpec::slots_per_epoch();
    let default_timeout =
        epoch_duration_secs.saturating_mul(spec.epochs_per_sync_committee_period.as_u64());

    SyncCommitteeService {
        events: VecDeque::with_capacity(10),
        beacon_chain,
        subscriptions: HashMap::new(),
        unsubscriptions: HashSetDelay::new(Duration::from_secs(default_timeout)),
        waker: None,
        subscribe_all_subnets: config.subscribe_all_subnets,
        discovery_disabled: config.disable_discovery,
        proposer_only: config.proposer_only,
        log,
    }
}
/// Return the count of all currently subscribed subnets (test only).
#[cfg(test)]
pub fn subscription_count(&self) -> usize {
    use types::consts::altair::SYNC_COMMITTEE_SUBNET_COUNT;
    if !self.subscribe_all_subnets {
        self.subscriptions.len()
    } else {
        // When subscribed to everything, report the full subnet count.
        SYNC_COMMITTEE_SUBNET_COUNT as usize
    }
}
/// Processes a list of sync committee subscriptions.
///
/// This will:
/// - Search for peers for required subnets.
/// - Request subscriptions for required subnets.
/// - Build the timeouts for each of these events.
///
/// This returns a result simply for the ergonomics of using ?. The result can be
/// safely dropped.
pub fn validator_subscriptions(
    &mut self,
    subscriptions: Vec<SyncCommitteeSubscription>,
) -> Result<(), String> {
    // A proposer-only node does not subscribe to any sync-committees.
    if self.proposer_only {
        return Ok(());
    }

    let mut subnets_to_discover = Vec::new();
    for subscription in subscriptions {
        metrics::inc_counter(&metrics::SYNC_COMMITTEE_SUBSCRIPTION_REQUESTS);
        // NOTE: We assume all subscriptions have been verified before reaching this service.
        // Registers the validator with the subnet service.
        // This will subscribe to long-lived random subnets if required.
        trace!(self.log,
            "Sync committee subscription";
            "subscription" => ?subscription,
        );
        let subnet_ids = match SyncSubnetId::compute_subnets_for_sync_committee::<T::EthSpec>(
            &subscription.sync_committee_indices,
        ) {
            Ok(subnet_ids) => subnet_ids,
            Err(e) => {
                warn!(self.log,
                    "Failed to compute subnet id for sync committee subscription";
                    "error" => ?e,
                    "validator_index" => subscription.validator_index
                );
                continue;
            }
        };

        for subnet_id in subnet_ids {
            let exact_subnet = ExactSubnet {
                subnet_id,
                until_epoch: subscription.until_epoch,
            };
            subnets_to_discover.push(exact_subnet.clone());
            if let Err(e) = self.subscribe_to_subnet(exact_subnet.clone()) {
                warn!(self.log,
                    "Subscription to sync subnet error";
                    "error" => e,
                    "validator_index" => subscription.validator_index,
                );
            } else {
                trace!(self.log,
                    "Subscribed to subnet for sync committee duties";
                    "exact_subnet" => ?exact_subnet,
                    "validator_index" => subscription.validator_index
                );
            }
        }
    }
    // If the discovery mechanism isn't disabled, attempt to set up a peer discovery for the
    // required subnets.
    if !self.discovery_disabled {
        if let Err(e) = self.discover_peers_request(subnets_to_discover.iter()) {
            warn!(self.log, "Discovery lookup request error"; "error" => e);
        };
    }

    // Pre-emptively wake the thread to check for new events.
    if let Some(waker) = &self.waker {
        waker.wake_by_ref();
    }
    Ok(())
}
/* Internal private functions */

/// Checks if there are currently queued discovery requests and the time required to make the
/// request.
///
/// If there is sufficient time, queues a peer discovery request for all the required subnets.
fn discover_peers_request<'a>(
    &mut self,
    exact_subnets: impl Iterator<Item = &'a ExactSubnet>,
) -> Result<(), &'static str> {
    let current_slot = self
        .beacon_chain
        .slot_clock
        .now()
        .ok_or("Could not get the current slot")?;
    let slots_per_epoch = T::EthSpec::slots_per_epoch();

    let discovery_subnets: Vec<SubnetDiscovery> = exact_subnets
        .filter_map(|exact_subnet| {
            // Keep discovered peers until the end of the epoch the subscription expires in.
            let until_slot = exact_subnet.until_epoch.end_slot(slots_per_epoch);
            // Check if there is enough time to perform a discovery lookup.
            if until_slot >= current_slot.saturating_add(MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD) {
                // Add an event to start looking for peers; add one slot to ensure we keep the
                // peer for the subscription slot.
                let min_ttl = self
                    .beacon_chain
                    .slot_clock
                    .duration_to_slot(until_slot + 1)
                    .map(|duration| std::time::Instant::now() + duration);
                Some(SubnetDiscovery {
                    subnet: Subnet::SyncCommittee(exact_subnet.subnet_id),
                    min_ttl,
                })
            } else {
                // We may want to check the global PeerInfo to see estimated timeouts for each
                // peer before they can be removed.
                warn!(self.log,
                    "Not enough time for a discovery search";
                    "subnet_id" => ?exact_subnet
                );
                None
            }
        })
        .collect();

    if !discovery_subnets.is_empty() {
        self.events
            .push_back(SubnetServiceMessage::DiscoverPeers(discovery_subnets));
    }
    Ok(())
}
/// Adds a subscription event and an associated unsubscription event if required.
fn subscribe_to_subnet(&mut self, exact_subnet: ExactSubnet) -> Result<(), &'static str> {
    // Return if we have subscribed to all subnets.
    if self.subscribe_all_subnets {
        return Ok(());
    }

    // Return if we already have a subscription for exact_subnet.
    if self.subscriptions.get(&exact_subnet.subnet_id) == Some(&exact_subnet.until_epoch) {
        return Ok(());
    }

    // Return if we already have a subscription set to expire later than the current request.
    if let Some(until_epoch) = self.subscriptions.get(&exact_subnet.subnet_id) {
        if *until_epoch >= exact_subnet.until_epoch {
            return Ok(());
        }
    }

    // Initialise timing variables.
    let current_slot = self
        .beacon_chain
        .slot_clock
        .now()
        .ok_or("Could not get the current slot")?;

    let slots_per_epoch = T::EthSpec::slots_per_epoch();
    let until_slot = exact_subnet.until_epoch.end_slot(slots_per_epoch);
    // Calculate the duration to the unsubscription event.
    let expected_end_subscription_duration = if current_slot >= until_slot {
        warn!(
            self.log,
            "Sync committee subscription is past expiration";
            "current_slot" => current_slot,
            "exact_subnet" => ?exact_subnet,
        );
        return Ok(());
    } else {
        let slot_duration = self.beacon_chain.slot_clock.slot_duration();

        // The duration until we no longer need this subscription. We assume a single slot is
        // sufficient.
        self.beacon_chain
            .slot_clock
            .duration_to_slot(until_slot)
            .ok_or("Unable to determine duration to unsubscription slot")?
            + slot_duration
    };

    if let Entry::Vacant(e) = self.subscriptions.entry(exact_subnet.subnet_id) {
        // We are not currently subscribed and have no waiting subscription, create one.
        debug!(self.log, "Subscribing to subnet"; "subnet" => *exact_subnet.subnet_id, "until_epoch" => ?exact_subnet.until_epoch);
        e.insert(exact_subnet.until_epoch);
        self.events
            .push_back(SubnetServiceMessage::Subscribe(Subnet::SyncCommittee(
                exact_subnet.subnet_id,
            )));

        // Add the subnet to the ENR bitfield.
        self.events
            .push_back(SubnetServiceMessage::EnrAdd(Subnet::SyncCommittee(
                exact_subnet.subnet_id,
            )));

        // Add an unsubscription event to remove ourselves from the subnet once completed.
        self.unsubscriptions
            .insert_at(exact_subnet.subnet_id, expected_end_subscription_duration);
    } else {
        // We are already subscribed, extend the unsubscription duration.
        self.unsubscriptions
            .update_timeout(&exact_subnet.subnet_id, expected_end_subscription_duration);
    }
    Ok(())
}
/// Processes a queued unsubscription that has become ready: drops the local subscription and
/// emits the corresponding `Unsubscribe` and `EnrRemove` events.
fn handle_unsubscriptions(&mut self, subnet_id: SyncSubnetId) {
    debug!(self.log, "Unsubscribing from subnet"; "subnet" => *subnet_id);

    self.subscriptions.remove(&subnet_id);
    // Leave the gossip topic and withdraw the subnet from our ENR advertisement.
    self.events.extend([
        SubnetServiceMessage::Unsubscribe(Subnet::SyncCommittee(subnet_id)),
        SubnetServiceMessage::EnrRemove(Subnet::SyncCommittee(subnet_id)),
    ]);
}
}
impl<T: BeaconChainTypes> Stream for SyncCommitteeService<T> {
    type Item = SubnetServiceMessage;

    /// Drives the service: fires due unsubscriptions, then delivers queued events.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Update the waker if needed.
        // NOTE(review): this refreshes the stored waker only when it already wakes the
        // current task (`will_wake` == true); a stale waker from a *different* task is never
        // replaced. The condition looks inverted — confirm intent.
        if let Some(waker) = &self.waker {
            if waker.will_wake(cx.waker()) {
                self.waker = Some(cx.waker().clone());
            }
        } else {
            self.waker = Some(cx.waker().clone());
        }

        // Process any un-subscription events first, so any events they generate are queued
        // before we drain the queue below.
        match self.unsubscriptions.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok(exact_subnet))) => self.handle_unsubscriptions(exact_subnet),
            Poll::Ready(Some(Err(e))) => {
                error!(self.log, "Failed to check for subnet unsubscription times"; "error"=> e);
            }
            Poll::Ready(None) | Poll::Pending => {}
        }

        // Process any generated events.
        if let Some(event) = self.events.pop_front() {
            return Poll::Ready(Some(event));
        }

        Poll::Pending
    }
}

View File

@@ -5,9 +5,9 @@ use beacon_chain::{
test_utils::get_kzg,
BeaconChain,
};
use futures::prelude::*;
use genesis::{generate_deterministic_keypairs, interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH};
use lighthouse_network::NetworkConfig;
use logging::test_logger;
use slog::{o, Drain, Logger};
use sloggers::{null::NullLoggerBuilder, Build};
use slot_clock::{SlotClock, SystemTimeSlotClock};
@@ -21,6 +21,10 @@ use types::{
SyncCommitteeSubscription, SyncSubnetId, ValidatorSubscription,
};
// Set to enable/disable logging
// const TEST_LOG_LEVEL: Option<slog::Level> = Some(slog::Level::Debug);
const TEST_LOG_LEVEL: Option<slog::Level> = None;
const SLOT_DURATION_MILLIS: u64 = 400;
type TestBeaconChainType = Witness<
@@ -42,7 +46,7 @@ impl TestBeaconChain {
let keypairs = generate_deterministic_keypairs(1);
let log = get_logger(None);
let log = get_logger(TEST_LOG_LEVEL);
let store =
HotColdDB::open_ephemeral(StoreConfig::default(), spec.clone(), log.clone()).unwrap();
@@ -114,15 +118,13 @@ fn get_logger(log_level: Option<slog::Level>) -> Logger {
static CHAIN: LazyLock<TestBeaconChain> = LazyLock::new(TestBeaconChain::new_with_system_clock);
fn get_attestation_service(
log_level: Option<slog::Level>,
) -> AttestationService<TestBeaconChainType> {
let log = get_logger(log_level);
fn get_subnet_service() -> SubnetService<TestBeaconChainType> {
let log = test_logger();
let config = NetworkConfig::default();
let beacon_chain = CHAIN.chain.clone();
AttestationService::new(
SubnetService::new(
beacon_chain,
lighthouse_network::discv5::enr::NodeId::random(),
&config,
@@ -130,15 +132,6 @@ fn get_attestation_service(
)
}
fn get_sync_committee_service() -> SyncCommitteeService<TestBeaconChainType> {
let log = get_logger(None);
let config = NetworkConfig::default();
let beacon_chain = CHAIN.chain.clone();
SyncCommitteeService::new(beacon_chain, &config, &log)
}
// gets a number of events from the subscription service, or returns none if it times out after a number
// of slots
async fn get_events<S: Stream<Item = SubnetServiceMessage> + Unpin>(
@@ -172,10 +165,10 @@ async fn get_events<S: Stream<Item = SubnetServiceMessage> + Unpin>(
events
}
mod attestation_service {
mod test {
#[cfg(not(windows))]
use crate::subnet_service::attestation_subnets::MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD;
use crate::subnet_service::MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD;
use super::*;
@@ -184,13 +177,13 @@ mod attestation_service {
slot: Slot,
committee_count_at_slot: u64,
is_aggregator: bool,
) -> ValidatorSubscription {
ValidatorSubscription {
) -> Subscription {
Subscription::Attestation(ValidatorSubscription {
attestation_committee_index,
slot,
committee_count_at_slot,
is_aggregator,
}
})
}
fn get_subscriptions(
@@ -198,7 +191,7 @@ mod attestation_service {
slot: Slot,
committee_count_at_slot: u64,
is_aggregator: bool,
) -> Vec<ValidatorSubscription> {
) -> Vec<Subscription> {
(0..validator_count)
.map(|validator_index| {
get_subscription(
@@ -215,72 +208,77 @@ mod attestation_service {
async fn subscribe_current_slot_wait_for_unsubscribe() {
// subscription config
let committee_index = 1;
// Keep a low subscription slot so that there are no additional subnet discovery events.
let subscription_slot = 0;
let committee_count = 1;
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service(None);
let current_slot = attestation_service
let mut subnet_service = get_subnet_service();
let _events = get_events(&mut subnet_service, None, 1).await;
let current_slot = subnet_service
.beacon_chain
.slot_clock
.now()
.expect("Could not get current slot");
// Generate a subnet that isn't in our permanent subnet collection
let subscription_slot = current_slot + 1;
let mut committee_count = 1;
let mut subnet = Subnet::Attestation(
SubnetId::compute_subnet::<MainnetEthSpec>(
current_slot,
committee_index,
committee_count,
&subnet_service.beacon_chain.spec,
)
.unwrap(),
);
while subnet_service
.permanent_subscriptions()
.any(|x| *x == subnet)
{
committee_count += 1;
subnet = Subnet::Attestation(
SubnetId::compute_subnet::<MainnetEthSpec>(
subscription_slot,
committee_index,
committee_count,
&subnet_service.beacon_chain.spec,
)
.unwrap(),
);
}
let subscriptions = vec![get_subscription(
committee_index,
current_slot + Slot::new(subscription_slot),
current_slot,
committee_count,
true,
)];
// submit the subscriptions
attestation_service
.validator_subscriptions(subscriptions.into_iter())
.unwrap();
subnet_service.validator_subscriptions(subscriptions.into_iter());
// not enough time for peer discovery, just subscribe, unsubscribe
let subnet_id = SubnetId::compute_subnet::<MainnetEthSpec>(
current_slot + Slot::new(subscription_slot),
committee_index,
committee_count,
&attestation_service.beacon_chain.spec,
)
.unwrap();
let expected = [
SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id)),
SubnetServiceMessage::Unsubscribe(Subnet::Attestation(subnet_id)),
SubnetServiceMessage::Subscribe(subnet),
SubnetServiceMessage::Unsubscribe(subnet),
];
// Wait for 1 slot duration to get the unsubscription event
let events = get_events(
&mut attestation_service,
Some(subnets_per_node * 3 + 2),
(MainnetEthSpec::slots_per_epoch() * 3) as u32,
&mut subnet_service,
Some(2),
(MainnetEthSpec::slots_per_epoch()) as u32,
)
.await;
matches::assert_matches!(
events[..6],
[
SubnetServiceMessage::Subscribe(_any1),
SubnetServiceMessage::EnrAdd(_any3),
SubnetServiceMessage::DiscoverPeers(_),
SubnetServiceMessage::Subscribe(_),
SubnetServiceMessage::EnrAdd(_),
SubnetServiceMessage::DiscoverPeers(_),
]
);
assert_eq!(events, expected);
// If the long lived and short lived subnets are the same, there should be no more events
// as we don't resubscribe already subscribed subnets.
if !attestation_service
.is_subscribed(&subnet_id, attestation_subnets::SubscriptionKind::LongLived)
{
assert_eq!(expected[..], events[subnets_per_node * 3..]);
}
// Should be subscribed to only subnets_per_node long lived subnet after unsubscription.
assert_eq!(attestation_service.subscription_count(), subnets_per_node);
// Should be subscribed to only subnets_per_node permanent subnets after unsubscription.
assert_eq!(
subnet_service.permanent_subscriptions().count(),
subnets_per_node
);
assert_eq!(subnet_service.subscriptions().count(), 0);
}
/// Test to verify that we are not unsubscribing to a subnet before a required subscription.
@@ -289,7 +287,6 @@ mod attestation_service {
async fn test_same_subnet_unsubscription() {
// subscription config
let committee_count = 1;
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
// Makes 2 validator subscriptions to the same subnet but at different slots.
// There should be just 1 unsubscription event for the later slot subscription (subscription_slot2).
@@ -298,9 +295,10 @@ mod attestation_service {
let com1 = 1;
let com2 = 0;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service(None);
let current_slot = attestation_service
// create the subnet service and subscriptions
let mut subnet_service = get_subnet_service();
let _events = get_events(&mut subnet_service, None, 0).await;
let current_slot = subnet_service
.beacon_chain
.slot_clock
.now()
@@ -324,7 +322,7 @@ mod attestation_service {
current_slot + Slot::new(subscription_slot1),
com1,
committee_count,
&attestation_service.beacon_chain.spec,
&subnet_service.beacon_chain.spec,
)
.unwrap();
@@ -332,7 +330,7 @@ mod attestation_service {
current_slot + Slot::new(subscription_slot2),
com2,
committee_count,
&attestation_service.beacon_chain.spec,
&subnet_service.beacon_chain.spec,
)
.unwrap();
@@ -341,110 +339,80 @@ mod attestation_service {
assert_eq!(subnet_id1, subnet_id2);
// submit the subscriptions
attestation_service
.validator_subscriptions(vec![sub1, sub2].into_iter())
.unwrap();
subnet_service.validator_subscriptions(vec![sub1, sub2].into_iter());
// Unsubscription event should happen at slot 2 (since subnet id's are the same, unsubscription event should be at higher slot + 1)
// Get all events for 1 slot duration (unsubscription event should happen after 2 slot durations).
let events = get_events(&mut attestation_service, None, 1).await;
matches::assert_matches!(
events[..3],
[
SubnetServiceMessage::Subscribe(_any1),
SubnetServiceMessage::EnrAdd(_any3),
SubnetServiceMessage::DiscoverPeers(_),
]
);
let expected = SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id1));
// Should be still subscribed to 2 long lived and up to 1 short lived subnet if both are
// different.
if !attestation_service.is_subscribed(
&subnet_id1,
attestation_subnets::SubscriptionKind::LongLived,
) {
// The index is 3*subnets_per_node (because we subscribe + discover + enr per long lived
// subnet) + 1
let index = 3 * subnets_per_node;
assert_eq!(expected, events[index]);
assert_eq!(
attestation_service.subscription_count(),
subnets_per_node + 1
);
if subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) {
// If we are permanently subscribed to this subnet, we won't see a subscribe message
let _ = get_events(&mut subnet_service, None, 1).await;
} else {
assert!(attestation_service.subscription_count() == subnets_per_node);
let subscription = get_events(&mut subnet_service, None, 1).await;
assert_eq!(subscription, [expected]);
}
// Get event for 1 more slot duration, we should get the unsubscribe event now.
let unsubscribe_event = get_events(&mut attestation_service, None, 1).await;
let unsubscribe_event = get_events(&mut subnet_service, None, 1).await;
// If the long lived and short lived subnets are different, we should get an unsubscription
// event.
if !attestation_service.is_subscribed(
&subnet_id1,
attestation_subnets::SubscriptionKind::LongLived,
) {
assert_eq!(
[SubnetServiceMessage::Unsubscribe(Subnet::Attestation(
subnet_id1
))],
unsubscribe_event[..]
);
let expected = SubnetServiceMessage::Unsubscribe(Subnet::Attestation(subnet_id1));
if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) {
assert_eq!([expected], unsubscribe_event[..]);
}
// Should be subscribed 2 long lived subnet after unsubscription.
assert_eq!(attestation_service.subscription_count(), subnets_per_node);
// Should no longer be subscribed to any short lived subnets after unsubscription.
assert_eq!(subnet_service.subscriptions().count(), 0);
}
#[tokio::test]
async fn subscribe_all_subnets() {
let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count;
let subscription_slot = 3;
let subscription_count = attestation_subnet_count;
let subscriptions_count = attestation_subnet_count;
let committee_count = 1;
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service(None);
let current_slot = attestation_service
let mut subnet_service = get_subnet_service();
let current_slot = subnet_service
.beacon_chain
.slot_clock
.now()
.expect("Could not get current slot");
let subscriptions = get_subscriptions(
subscription_count,
subscriptions_count,
current_slot + subscription_slot,
committee_count,
true,
);
// submit the subscriptions
attestation_service
.validator_subscriptions(subscriptions.into_iter())
.unwrap();
subnet_service.validator_subscriptions(subscriptions.into_iter());
let events = get_events(&mut attestation_service, Some(131), 10).await;
let events = get_events(&mut subnet_service, Some(130), 10).await;
let mut discover_peer_count = 0;
let mut enr_add_count = 0;
let mut unexpected_msg_count = 0;
let mut unsubscribe_event_count = 0;
let mut subscription_event_count = 0;
for event in &events {
match event {
SubnetServiceMessage::DiscoverPeers(_) => discover_peer_count += 1,
SubnetServiceMessage::Subscribe(_any_subnet) => {}
SubnetServiceMessage::Subscribe(_any_subnet) => subscription_event_count += 1,
SubnetServiceMessage::EnrAdd(_any_subnet) => enr_add_count += 1,
SubnetServiceMessage::Unsubscribe(_) => unsubscribe_event_count += 1,
_ => unexpected_msg_count += 1,
SubnetServiceMessage::EnrRemove(_) => {}
}
}
// There should be a Subscribe Event, and Enr Add event and a DiscoverPeers event for each
// long-lived subnet initially. The next event should be a bulk discovery event.
let bulk_discovery_index = 3 * subnets_per_node;
// There should be a Subscribe Event, an Enr Add event for each
// permanent subnet initially. There is a single discovery event for the permanent
// subnets.
// The next event should be a bulk discovery event.
let bulk_discovery_index = subnets_per_node * 2 + 1;
// The bulk discovery request length should be equal to validator_count
let bulk_discovery_event = &events[bulk_discovery_index];
if let SubnetServiceMessage::DiscoverPeers(d) = bulk_discovery_event {
@@ -455,14 +423,13 @@ mod attestation_service {
// 64 `DiscoverPeer` requests of length 1 corresponding to deterministic subnets
// and 1 `DiscoverPeer` request corresponding to bulk subnet discovery.
assert_eq!(discover_peer_count, subnets_per_node + 1);
assert_eq!(attestation_service.subscription_count(), subnets_per_node);
assert_eq!(discover_peer_count, 1 + 1);
assert_eq!(subscription_event_count, attestation_subnet_count);
assert_eq!(enr_add_count, subnets_per_node);
assert_eq!(
unsubscribe_event_count,
attestation_subnet_count - subnets_per_node as u64
);
assert_eq!(unexpected_msg_count, 0);
// test completed successfully
}
@@ -473,30 +440,28 @@ mod attestation_service {
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
// the 65th subscription should result in no more messages than the previous scenario
let subscription_count = attestation_subnet_count + 1;
let subscriptions_count = attestation_subnet_count + 1;
let committee_count = 1;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service(None);
let current_slot = attestation_service
let mut subnet_service = get_subnet_service();
let current_slot = subnet_service
.beacon_chain
.slot_clock
.now()
.expect("Could not get current slot");
let subscriptions = get_subscriptions(
subscription_count,
subscriptions_count,
current_slot + subscription_slot,
committee_count,
true,
);
// submit the subscriptions
attestation_service
.validator_subscriptions(subscriptions.into_iter())
.unwrap();
subnet_service.validator_subscriptions(subscriptions.into_iter());
let events = get_events(&mut attestation_service, None, 3).await;
let events = get_events(&mut subnet_service, None, 3).await;
let mut discover_peer_count = 0;
let mut enr_add_count = 0;
let mut unexpected_msg_count = 0;
@@ -506,7 +471,10 @@ mod attestation_service {
SubnetServiceMessage::DiscoverPeers(_) => discover_peer_count += 1,
SubnetServiceMessage::Subscribe(_any_subnet) => {}
SubnetServiceMessage::EnrAdd(_any_subnet) => enr_add_count += 1,
_ => unexpected_msg_count += 1,
_ => {
unexpected_msg_count += 1;
println!("{:?}", event);
}
}
}
@@ -520,8 +488,8 @@ mod attestation_service {
// subnets_per_node `DiscoverPeer` requests of length 1 corresponding to long-lived subnets
// and 1 `DiscoverPeer` request corresponding to the bulk subnet discovery.
assert_eq!(discover_peer_count, subnets_per_node + 1);
assert_eq!(attestation_service.subscription_count(), subnets_per_node);
assert_eq!(discover_peer_count, 1 + 1); // Generates a single discovery for permanent
// subscriptions and 1 for the subscription
assert_eq!(enr_add_count, subnets_per_node);
assert_eq!(unexpected_msg_count, 0);
}
@@ -531,7 +499,6 @@ mod attestation_service {
async fn test_subscribe_same_subnet_several_slots_apart() {
// subscription config
let committee_count = 1;
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
// Makes 2 validator subscriptions to the same subnet but at different slots.
// There should be just 1 unsubscription event for the later slot subscription (subscription_slot2).
@@ -541,8 +508,11 @@ mod attestation_service {
let com2 = 0;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service(None);
let current_slot = attestation_service
let mut subnet_service = get_subnet_service();
// Remove permanent events
let _events = get_events(&mut subnet_service, None, 0).await;
let current_slot = subnet_service
.beacon_chain
.slot_clock
.now()
@@ -566,7 +536,7 @@ mod attestation_service {
current_slot + Slot::new(subscription_slot1),
com1,
committee_count,
&attestation_service.beacon_chain.spec,
&subnet_service.beacon_chain.spec,
)
.unwrap();
@@ -574,7 +544,7 @@ mod attestation_service {
current_slot + Slot::new(subscription_slot2),
com2,
committee_count,
&attestation_service.beacon_chain.spec,
&subnet_service.beacon_chain.spec,
)
.unwrap();
@@ -583,39 +553,26 @@ mod attestation_service {
assert_eq!(subnet_id1, subnet_id2);
// submit the subscriptions
attestation_service
.validator_subscriptions(vec![sub1, sub2].into_iter())
.unwrap();
subnet_service.validator_subscriptions(vec![sub1, sub2].into_iter());
// Unsubscription event should happen at the end of the slot.
let events = get_events(&mut attestation_service, None, 1).await;
matches::assert_matches!(
events[..3],
[
SubnetServiceMessage::Subscribe(_any1),
SubnetServiceMessage::EnrAdd(_any3),
SubnetServiceMessage::DiscoverPeers(_),
]
);
let events = get_events(&mut subnet_service, None, 1).await;
let expected_subscription =
SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id1));
let expected_unsubscription =
SubnetServiceMessage::Unsubscribe(Subnet::Attestation(subnet_id1));
if !attestation_service.is_subscribed(
&subnet_id1,
attestation_subnets::SubscriptionKind::LongLived,
) {
assert_eq!(expected_subscription, events[subnets_per_node * 3]);
assert_eq!(expected_unsubscription, events[subnets_per_node * 3 + 2]);
if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) {
assert_eq!(expected_subscription, events[0]);
assert_eq!(expected_unsubscription, events[2]);
}
assert_eq!(attestation_service.subscription_count(), 2);
assert_eq!(subnet_service.subscriptions().count(), 0);
println!("{events:?}");
let subscription_slot = current_slot + subscription_slot2 - 1; // one less due to the
// advance subscription time
let wait_slots = attestation_service
let wait_slots = subnet_service
.beacon_chain
.slot_clock
.duration_to_slot(subscription_slot)
@@ -623,90 +580,42 @@ mod attestation_service {
.as_millis() as u64
/ SLOT_DURATION_MILLIS;
let no_events = dbg!(get_events(&mut attestation_service, None, wait_slots as u32).await);
let no_events = dbg!(get_events(&mut subnet_service, None, wait_slots as u32).await);
assert_eq!(no_events, []);
let second_subscribe_event = get_events(&mut attestation_service, None, 2).await;
// If the long lived and short lived subnets are different, we should get an unsubscription event.
if !attestation_service.is_subscribed(
&subnet_id1,
attestation_subnets::SubscriptionKind::LongLived,
) {
let second_subscribe_event = get_events(&mut subnet_service, None, 2).await;
// If the permanent and short lived subnets are different, we should get an unsubscription event.
if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) {
assert_eq!(
[SubnetServiceMessage::Subscribe(Subnet::Attestation(
subnet_id1
))],
[expected_subscription, expected_unsubscription],
second_subscribe_event[..]
);
}
}
#[tokio::test]
async fn test_update_deterministic_long_lived_subnets() {
let mut attestation_service = get_attestation_service(None);
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
let current_slot = attestation_service
.beacon_chain
.slot_clock
.now()
.expect("Could not get current slot");
let subscriptions = get_subscriptions(20, current_slot, 30, false);
// submit the subscriptions
attestation_service
.validator_subscriptions(subscriptions.into_iter())
.unwrap();
// There should only be the same subscriptions as there are in the specification,
// regardless of subscriptions
assert_eq!(
attestation_service.long_lived_subscriptions().len(),
subnets_per_node
);
let events = get_events(&mut attestation_service, None, 4).await;
// Check that we attempt to subscribe and register ENRs
matches::assert_matches!(
events[..6],
[
SubnetServiceMessage::Subscribe(_),
SubnetServiceMessage::EnrAdd(_),
SubnetServiceMessage::DiscoverPeers(_),
SubnetServiceMessage::Subscribe(_),
SubnetServiceMessage::EnrAdd(_),
SubnetServiceMessage::DiscoverPeers(_),
]
);
}
}
mod sync_committee_service {
use super::*;
#[tokio::test]
async fn subscribe_and_unsubscribe() {
async fn subscribe_and_unsubscribe_sync_committee() {
// subscription config
let validator_index = 1;
let until_epoch = Epoch::new(1);
let sync_committee_indices = vec![1];
// create the attestation service and subscriptions
let mut sync_committee_service = get_sync_committee_service();
let mut subnet_service = get_subnet_service();
let _events = get_events(&mut subnet_service, None, 0).await;
let subscriptions = vec![SyncCommitteeSubscription {
validator_index,
sync_committee_indices: sync_committee_indices.clone(),
until_epoch,
}];
let subscriptions =
std::iter::once(Subscription::SyncCommittee(SyncCommitteeSubscription {
validator_index,
sync_committee_indices: sync_committee_indices.clone(),
until_epoch,
}));
// submit the subscriptions
sync_committee_service
.validator_subscriptions(subscriptions)
.unwrap();
subnet_service.validator_subscriptions(subscriptions);
// Remove permanent subscription events
let subnet_ids = SyncSubnetId::compute_subnets_for_sync_committee::<MainnetEthSpec>(
&sync_committee_indices,
@@ -716,7 +625,7 @@ mod sync_committee_service {
// Note: the unsubscription event takes 2 epochs (8 * 2 * 0.4 secs = 3.2 secs)
let events = get_events(
&mut sync_committee_service,
&mut subnet_service,
Some(5),
(MainnetEthSpec::slots_per_epoch() * 3) as u32, // Have some buffer time before getting 5 events
)
@@ -738,7 +647,7 @@ mod sync_committee_service {
);
// Should be unsubscribed at the end.
assert_eq!(sync_committee_service.subscription_count(), 0);
assert_eq!(subnet_service.subscriptions().count(), 0);
}
#[tokio::test]
@@ -749,21 +658,22 @@ mod sync_committee_service {
let sync_committee_indices = vec![1];
// create the attestation service and subscriptions
let mut sync_committee_service = get_sync_committee_service();
let mut subnet_service = get_subnet_service();
// Get the initial events from permanent subnet subscriptions
let _events = get_events(&mut subnet_service, None, 1).await;
let subscriptions = vec![SyncCommitteeSubscription {
validator_index,
sync_committee_indices: sync_committee_indices.clone(),
until_epoch,
}];
let subscriptions =
std::iter::once(Subscription::SyncCommittee(SyncCommitteeSubscription {
validator_index,
sync_committee_indices: sync_committee_indices.clone(),
until_epoch,
}));
// submit the subscriptions
sync_committee_service
.validator_subscriptions(subscriptions)
.unwrap();
subnet_service.validator_subscriptions(subscriptions);
// Get all immediate events (won't include unsubscriptions)
let events = get_events(&mut sync_committee_service, None, 1).await;
let events = get_events(&mut subnet_service, None, 1).await;
matches::assert_matches!(
events[..],
[
@@ -777,28 +687,30 @@ mod sync_committee_service {
// Event 1 is a duplicate of an existing subscription
// Event 2 is the same subscription with lower `until_epoch` than the existing subscription
let subscriptions = vec![
SyncCommitteeSubscription {
Subscription::SyncCommittee(SyncCommitteeSubscription {
validator_index,
sync_committee_indices: sync_committee_indices.clone(),
until_epoch,
},
SyncCommitteeSubscription {
}),
Subscription::SyncCommittee(SyncCommitteeSubscription {
validator_index,
sync_committee_indices: sync_committee_indices.clone(),
until_epoch: until_epoch - 1,
},
}),
];
// submit the subscriptions
sync_committee_service
.validator_subscriptions(subscriptions)
.unwrap();
subnet_service.validator_subscriptions(subscriptions.into_iter());
// Get all immediate events (won't include unsubscriptions)
let events = get_events(&mut sync_committee_service, None, 1).await;
let events = get_events(&mut subnet_service, None, 1).await;
matches::assert_matches!(events[..], [SubnetServiceMessage::DiscoverPeers(_),]);
// Should be unsubscribed at the end.
assert_eq!(sync_committee_service.subscription_count(), 1);
let sync_committee_subscriptions = subnet_service
.subscriptions()
.filter(|s| matches!(s, Subnet::SyncCommittee(_)))
.count();
assert_eq!(sync_committee_subscriptions, 1);
}
}