mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-17 12:58:31 +00:00
merge upstream
This commit is contained in:
@@ -49,7 +49,5 @@ operation_pool = { path = "../operation_pool" }
|
||||
execution_layer = { path = "../execution_layer" }
|
||||
|
||||
[features]
|
||||
deterministic_long_lived_attnets = [ "ethereum-types" ]
|
||||
spec-minimal = ["beacon_chain/spec-minimal"]
|
||||
fork_from_env = ["beacon_chain/fork_from_env"]
|
||||
# default = ["deterministic_long_lived_attnets"]
|
||||
|
||||
@@ -7,8 +7,9 @@ use crate::beacon_processor::DuplicateCache;
|
||||
use crate::metrics;
|
||||
use crate::sync::manager::{BlockProcessType, ResponseType, SyncMessage};
|
||||
use crate::sync::{BatchProcessResult, ChainId};
|
||||
use beacon_chain::blob_verification::AsBlock;
|
||||
use beacon_chain::blob_verification::BlockWrapper;
|
||||
use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock};
|
||||
use beacon_chain::data_availability_checker::AvailabilityCheckError;
|
||||
use beacon_chain::{
|
||||
observed_block_producers::Error as ObserveError, validator_monitor::get_block_delay_ms,
|
||||
BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError,
|
||||
@@ -18,7 +19,6 @@ use beacon_chain::{AvailabilityProcessingStatus, CountUnrealized};
|
||||
use lighthouse_network::PeerAction;
|
||||
use slog::{debug, error, info, warn};
|
||||
use slot_clock::SlotClock;
|
||||
use std::sync::Arc;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use tokio::sync::mpsc;
|
||||
use types::blob_sidecar::FixedBlobSidecarList;
|
||||
@@ -311,20 +311,19 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64());
|
||||
let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64());
|
||||
let sent_blocks = downloaded_blocks.len();
|
||||
let n_blobs = downloaded_blocks
|
||||
.iter()
|
||||
.map(|wrapped| wrapped.n_blobs())
|
||||
.sum::<usize>();
|
||||
|
||||
let unwrapped = downloaded_blocks
|
||||
.into_iter()
|
||||
//FIXME(sean) handle blobs in backfill
|
||||
.map(|block| block.block_cloned())
|
||||
.collect();
|
||||
|
||||
match self.process_backfill_blocks(unwrapped) {
|
||||
match self.process_backfill_blocks(downloaded_blocks) {
|
||||
(_, Ok(_)) => {
|
||||
debug!(self.log, "Backfill batch processed";
|
||||
"batch_epoch" => epoch,
|
||||
"first_block_slot" => start_slot,
|
||||
"last_block_slot" => end_slot,
|
||||
"processed_blocks" => sent_blocks,
|
||||
"processed_blobs" => n_blobs,
|
||||
"service"=> "sync");
|
||||
BatchProcessResult::Success {
|
||||
was_non_empty: sent_blocks > 0,
|
||||
@@ -335,6 +334,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
"batch_epoch" => epoch,
|
||||
"first_block_slot" => start_slot,
|
||||
"last_block_slot" => end_slot,
|
||||
"processed_blobs" => n_blobs,
|
||||
"error" => %e.message,
|
||||
"service" => "sync");
|
||||
match e.peer_action {
|
||||
@@ -424,19 +424,67 @@ impl<T: BeaconChainTypes> Worker<T> {
|
||||
/// Helper function to process backfill block batches which only consumes the chain and blocks to process.
|
||||
fn process_backfill_blocks(
|
||||
&self,
|
||||
blocks: Vec<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||
downloaded_blocks: Vec<BlockWrapper<T::EthSpec>>,
|
||||
) -> (usize, Result<(), ChainSegmentFailed>) {
|
||||
let blinded_blocks = blocks
|
||||
.iter()
|
||||
.map(|full_block| full_block.clone_as_blinded())
|
||||
.map(Arc::new)
|
||||
.collect();
|
||||
match self.chain.import_historical_block_batch(blinded_blocks) {
|
||||
let total_blocks = downloaded_blocks.len();
|
||||
let available_blocks = match downloaded_blocks
|
||||
.into_iter()
|
||||
.map(|block| {
|
||||
self.chain
|
||||
.data_availability_checker
|
||||
.check_availability(block)
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
{
|
||||
Ok(blocks) => blocks
|
||||
.into_iter()
|
||||
.filter_map(|maybe_available| match maybe_available {
|
||||
MaybeAvailableBlock::Available(block) => Some(block),
|
||||
MaybeAvailableBlock::AvailabilityPending(_) => None,
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
Err(e) => match e {
|
||||
AvailabilityCheckError::StoreError(_)
|
||||
| AvailabilityCheckError::KzgNotInitialized => {
|
||||
return (
|
||||
0,
|
||||
Err(ChainSegmentFailed {
|
||||
peer_action: None,
|
||||
message: "Failed to check block availability".into(),
|
||||
}),
|
||||
);
|
||||
}
|
||||
e => {
|
||||
return (
|
||||
0,
|
||||
Err(ChainSegmentFailed {
|
||||
peer_action: Some(PeerAction::LowToleranceError),
|
||||
message: format!("Failed to check block availability : {:?}", e),
|
||||
}),
|
||||
)
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
if available_blocks.len() != total_blocks {
|
||||
return (
|
||||
0,
|
||||
Err(ChainSegmentFailed {
|
||||
peer_action: Some(PeerAction::LowToleranceError),
|
||||
message: format!(
|
||||
"{} out of {} blocks were unavailable",
|
||||
(total_blocks - available_blocks.len()),
|
||||
total_blocks
|
||||
),
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
match self.chain.import_historical_block_batch(available_blocks) {
|
||||
Ok(imported_blocks) => {
|
||||
metrics::inc_counter(
|
||||
&metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_SUCCESS_TOTAL,
|
||||
);
|
||||
|
||||
(imported_blocks, Ok(()))
|
||||
}
|
||||
Err(error) => {
|
||||
|
||||
@@ -317,8 +317,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
// attestation subnet service
|
||||
let attestation_service = AttestationService::new(
|
||||
beacon_chain.clone(),
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
network_globals.local_enr().node_id().raw().into(),
|
||||
network_globals.local_enr().node_id(),
|
||||
config,
|
||||
&network_log,
|
||||
);
|
||||
|
||||
@@ -3,10 +3,7 @@
|
||||
//! determines whether attestations should be aggregated and/or passed to the beacon node.
|
||||
|
||||
use super::SubnetServiceMessage;
|
||||
#[cfg(any(
|
||||
all(test, feature = "spec-mainnet"),
|
||||
feature = "deterministic_long_lived_attnets"
|
||||
))]
|
||||
#[cfg(all(test, feature = "spec-mainnet"))]
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::pin::Pin;
|
||||
@@ -17,10 +14,8 @@ use std::time::Duration;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use delay_map::{HashMapDelay, HashSetDelay};
|
||||
use futures::prelude::*;
|
||||
use lighthouse_network::{NetworkConfig, Subnet, SubnetDiscovery};
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
use rand::seq::SliceRandom;
|
||||
use slog::{debug, error, o, trace, warn};
|
||||
use lighthouse_network::{discv5::enr::NodeId, NetworkConfig, Subnet, SubnetDiscovery};
|
||||
use slog::{debug, error, info, o, trace, warn};
|
||||
use slot_clock::SlotClock;
|
||||
use types::{Attestation, EthSpec, Slot, SubnetId, ValidatorSubscription};
|
||||
|
||||
@@ -30,10 +25,6 @@ use crate::metrics;
|
||||
/// slot is less than this number, skip the peer discovery process.
|
||||
/// Subnet discovery query takes at most 30 secs, 2 slots take 24s.
|
||||
pub(crate) const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2;
|
||||
/// The time (in slots) before a last seen validator is considered absent and we unsubscribe from
|
||||
/// the random gossip topics that we subscribed to due to the validator connection.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
const LAST_SEEN_VALIDATOR_TIMEOUT_SLOTS: u32 = 150;
|
||||
/// The fraction of a slot that we subscribe to a subnet before the required slot.
|
||||
///
|
||||
/// Currently a whole slot ahead.
|
||||
@@ -70,30 +61,23 @@ pub struct AttestationService<T: BeaconChainTypes> {
|
||||
/// Subnets we are currently subscribed to as short lived subscriptions.
|
||||
///
|
||||
/// Once they expire, we unsubscribe from these.
|
||||
/// We subscribe to subnets when we are an aggregator for an exact subnet.
|
||||
short_lived_subscriptions: HashMapDelay<SubnetId, Slot>,
|
||||
|
||||
/// Subnets we are currently subscribed to as long lived subscriptions.
|
||||
///
|
||||
/// We advertise these in our ENR. When these expire, the subnet is removed from our ENR.
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
/// These are required of all beacon nodes. The exact number is determined by the chain
|
||||
/// specification.
|
||||
long_lived_subscriptions: HashSet<SubnetId>,
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
long_lived_subscriptions: HashMapDelay<SubnetId, Slot>,
|
||||
|
||||
/// Short lived subscriptions that need to be done in the future.
|
||||
/// Short lived subscriptions that need to be executed in the future.
|
||||
scheduled_short_lived_subscriptions: HashSetDelay<ExactSubnet>,
|
||||
|
||||
/// A collection timeouts to track the existence of aggregate validator subscriptions at an
|
||||
/// `ExactSubnet`.
|
||||
aggregate_validators_on_subnet: Option<HashSetDelay<ExactSubnet>>,
|
||||
|
||||
/// A collection of seen validators. These dictate how many random subnets we should be
|
||||
/// subscribed to. As these time out, we unsubscribe for the required random subnets and update
|
||||
/// our ENR.
|
||||
/// This is a set of validator indices.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
known_validators: HashSetDelay<u64>,
|
||||
|
||||
/// The waker for the current thread.
|
||||
waker: Option<std::task::Waker>,
|
||||
|
||||
@@ -103,16 +87,10 @@ pub struct AttestationService<T: BeaconChainTypes> {
|
||||
/// We are always subscribed to all subnets.
|
||||
subscribe_all_subnets: bool,
|
||||
|
||||
/// For how many slots we subscribe to long lived subnets.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
long_lived_subnet_subscription_slots: u64,
|
||||
|
||||
/// Our Discv5 node_id.
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
node_id: ethereum_types::U256,
|
||||
node_id: NodeId,
|
||||
|
||||
/// Future used to manage subscribing and unsubscribing from long lived subnets.
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
next_long_lived_subscription_event: Pin<Box<tokio::time::Sleep>>,
|
||||
|
||||
/// Whether this node is a block proposer-only node.
|
||||
@@ -125,62 +103,22 @@ pub struct AttestationService<T: BeaconChainTypes> {
|
||||
impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
/* Public functions */
|
||||
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
/// Establish the service based on the passed configuration.
|
||||
pub fn new(
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
node_id: NodeId,
|
||||
config: &NetworkConfig,
|
||||
log: &slog::Logger,
|
||||
) -> Self {
|
||||
let log = log.new(o!("service" => "attestation_service"));
|
||||
|
||||
// Calculate the random subnet duration from the spec constants.
|
||||
let spec = &beacon_chain.spec;
|
||||
let slot_duration = beacon_chain.slot_clock.slot_duration();
|
||||
let long_lived_subnet_subscription_slots = spec
|
||||
.epochs_per_random_subnet_subscription
|
||||
.saturating_mul(T::EthSpec::slots_per_epoch());
|
||||
let long_lived_subscription_duration = Duration::from_millis(
|
||||
slot_duration.as_millis() as u64 * long_lived_subnet_subscription_slots,
|
||||
);
|
||||
|
||||
// Panics on overflow. Ensure LAST_SEEN_VALIDATOR_TIMEOUT_SLOTS is not too large.
|
||||
let last_seen_val_timeout = slot_duration
|
||||
.checked_mul(LAST_SEEN_VALIDATOR_TIMEOUT_SLOTS)
|
||||
.expect("LAST_SEEN_VALIDATOR_TIMEOUT must not be ridiculously large");
|
||||
|
||||
let track_validators = !config.import_all_attestations;
|
||||
let aggregate_validators_on_subnet =
|
||||
track_validators.then(|| HashSetDelay::new(slot_duration));
|
||||
AttestationService {
|
||||
events: VecDeque::with_capacity(10),
|
||||
beacon_chain,
|
||||
short_lived_subscriptions: HashMapDelay::new(slot_duration),
|
||||
long_lived_subscriptions: HashMapDelay::new(long_lived_subscription_duration),
|
||||
scheduled_short_lived_subscriptions: HashSetDelay::default(),
|
||||
aggregate_validators_on_subnet,
|
||||
known_validators: HashSetDelay::new(last_seen_val_timeout),
|
||||
waker: None,
|
||||
discovery_disabled: config.disable_discovery,
|
||||
proposer_only: config.proposer_only,
|
||||
subscribe_all_subnets: config.subscribe_all_subnets,
|
||||
long_lived_subnet_subscription_slots,
|
||||
log,
|
||||
if config.subscribe_all_subnets {
|
||||
slog::info!(log, "Subscribing to all subnets");
|
||||
} else {
|
||||
slog::info!(log, "Deterministic long lived subnets enabled"; "subnets_per_node" => beacon_chain.spec.subnets_per_node, "subscription_duration_in_epochs" => beacon_chain.spec.epochs_per_subnet_subscription);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
pub fn new(
|
||||
beacon_chain: Arc<BeaconChain<T>>,
|
||||
node_id: ethereum_types::U256,
|
||||
config: &NetworkConfig,
|
||||
log: &slog::Logger,
|
||||
) -> Self {
|
||||
let log = log.new(o!("service" => "attestation_service"));
|
||||
|
||||
// Calculate the random subnet duration from the spec constants.
|
||||
let slot_duration = beacon_chain.slot_clock.slot_duration();
|
||||
|
||||
slog::info!(log, "Deterministic long lived subnets enabled"; "subnets_per_node" => beacon_chain.spec.subnets_per_node);
|
||||
|
||||
let track_validators = !config.import_all_attestations;
|
||||
let aggregate_validators_on_subnet =
|
||||
@@ -201,9 +139,15 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
// value with a smarter timing
|
||||
Box::pin(tokio::time::sleep(Duration::from_secs(1)))
|
||||
},
|
||||
proposer_only: config.proposer_only,
|
||||
log,
|
||||
};
|
||||
service.recompute_long_lived_subnets();
|
||||
|
||||
// If we are not subscribed to all subnets, handle the deterministic set of subnets
|
||||
if !config.subscribe_all_subnets {
|
||||
service.recompute_long_lived_subnets();
|
||||
}
|
||||
|
||||
service
|
||||
}
|
||||
|
||||
@@ -213,20 +157,12 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
if self.subscribe_all_subnets {
|
||||
self.beacon_chain.spec.attestation_subnet_count as usize
|
||||
} else {
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
let count = self
|
||||
.short_lived_subscriptions
|
||||
.keys()
|
||||
.chain(self.long_lived_subscriptions.iter())
|
||||
.collect::<HashSet<_>>()
|
||||
.len();
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
let count = self
|
||||
.short_lived_subscriptions
|
||||
.keys()
|
||||
.chain(self.long_lived_subscriptions.keys())
|
||||
.collect::<HashSet<_>>()
|
||||
.len();
|
||||
count
|
||||
}
|
||||
}
|
||||
@@ -239,20 +175,20 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
subscription_kind: SubscriptionKind,
|
||||
) -> bool {
|
||||
match subscription_kind {
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
SubscriptionKind::LongLived => self.long_lived_subscriptions.contains(subnet_id),
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
SubscriptionKind::LongLived => self.long_lived_subscriptions.contains_key(subnet_id),
|
||||
SubscriptionKind::ShortLived => self.short_lived_subscriptions.contains_key(subnet_id),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn long_lived_subscriptions(&self) -> &HashSet<SubnetId> {
|
||||
&self.long_lived_subscriptions
|
||||
}
|
||||
|
||||
/// Processes a list of validator subscriptions.
|
||||
///
|
||||
/// This will:
|
||||
/// - Register new validators as being known.
|
||||
/// - Subscribe to the required number of random subnets.
|
||||
/// - Update the local ENR for new random subnets due to seeing new validators.
|
||||
/// - Search for peers for required subnets.
|
||||
/// - Request subscriptions for subnets on specific slots when required.
|
||||
/// - Build the timeouts for each of these events.
|
||||
@@ -270,18 +206,17 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
|
||||
// Maps each subnet_id subscription to it's highest slot
|
||||
let mut subnets_to_discover: HashMap<SubnetId, Slot> = HashMap::new();
|
||||
|
||||
// Registers the validator with the attestation service.
|
||||
for subscription in subscriptions {
|
||||
metrics::inc_counter(&metrics::SUBNET_SUBSCRIPTION_REQUESTS);
|
||||
|
||||
// Registers the validator with the attestation service.
|
||||
// This will subscribe to long-lived random subnets if required.
|
||||
trace!(self.log,
|
||||
"Validator subscription";
|
||||
"subscription" => ?subscription,
|
||||
);
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
self.add_known_validator(subscription.validator_index);
|
||||
|
||||
// Compute the subnet that is associated with this subscription
|
||||
let subnet_id = match SubnetId::compute_subnet::<T::EthSpec>(
|
||||
subscription.slot,
|
||||
subscription.attestation_committee_index,
|
||||
@@ -319,7 +254,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
|
||||
if subscription.is_aggregator {
|
||||
metrics::inc_counter(&metrics::SUBNET_SUBSCRIPTION_AGGREGATOR_REQUESTS);
|
||||
if let Err(e) = self.subscribe_to_subnet(exact_subnet) {
|
||||
if let Err(e) = self.subscribe_to_short_lived_subnet(exact_subnet) {
|
||||
warn!(self.log,
|
||||
"Subscription to subnet error";
|
||||
"error" => e,
|
||||
@@ -350,14 +285,13 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
fn recompute_long_lived_subnets(&mut self) {
|
||||
// Ensure the next computation is scheduled even if assigning subnets fails.
|
||||
let next_subscription_event = self
|
||||
.recompute_long_lived_subnets_inner()
|
||||
.unwrap_or_else(|_| self.beacon_chain.slot_clock.slot_duration());
|
||||
|
||||
debug!(self.log, "Recomputing deterministic long lived attnets");
|
||||
debug!(self.log, "Recomputing deterministic long lived subnets");
|
||||
self.next_long_lived_subscription_event =
|
||||
Box::pin(tokio::time::sleep(next_subscription_event));
|
||||
|
||||
@@ -368,14 +302,13 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
|
||||
/// Gets the long lived subnets the node should be subscribed to during the current epoch and
|
||||
/// the remaining duration for which they remain valid.
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
fn recompute_long_lived_subnets_inner(&mut self) -> Result<Duration, ()> {
|
||||
let current_epoch = self.beacon_chain.epoch().map_err(
|
||||
|e| error!(self.log, "Failed to get the current epoch from clock"; "err" => ?e),
|
||||
)?;
|
||||
|
||||
let (subnets, next_subscription_epoch) = SubnetId::compute_subnets_for_epoch::<T::EthSpec>(
|
||||
self.node_id,
|
||||
self.node_id.raw().into(),
|
||||
current_epoch,
|
||||
&self.beacon_chain.spec,
|
||||
)
|
||||
@@ -399,17 +332,12 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
Ok(next_subscription_event)
|
||||
}
|
||||
|
||||
#[cfg(all(test, feature = "deterministic_long_lived_attnets"))]
|
||||
pub fn update_long_lived_subnets_testing(&mut self, subnets: HashSet<SubnetId>) {
|
||||
self.update_long_lived_subnets(subnets)
|
||||
}
|
||||
|
||||
/// Updates the long lived subnets.
|
||||
///
|
||||
/// New subnets are registered as subscribed, removed subnets as unsubscribed and the Enr
|
||||
/// updated accordingly.
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
fn update_long_lived_subnets(&mut self, mut subnets: HashSet<SubnetId>) {
|
||||
info!(self.log, "Subscribing to long-lived subnets"; "subnets" => ?subnets.iter().collect::<Vec<_>>());
|
||||
for subnet in &subnets {
|
||||
// Add the events for those subnets that are new as long lived subscriptions.
|
||||
if !self.long_lived_subscriptions.contains(subnet) {
|
||||
@@ -433,28 +361,15 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
}
|
||||
}
|
||||
|
||||
// Check for subnets that are being removed
|
||||
// Update the long_lived_subnets set and check for subnets that are being removed
|
||||
std::mem::swap(&mut self.long_lived_subscriptions, &mut subnets);
|
||||
for subnet in subnets {
|
||||
if !self.long_lived_subscriptions.contains(&subnet) {
|
||||
if !self.short_lived_subscriptions.contains_key(&subnet) {
|
||||
debug!(self.log, "Unsubscribing from subnet"; "subnet" => ?subnet, "subscription_kind" => ?SubscriptionKind::LongLived);
|
||||
self.queue_event(SubnetServiceMessage::Unsubscribe(Subnet::Attestation(
|
||||
subnet,
|
||||
)));
|
||||
}
|
||||
|
||||
self.queue_event(SubnetServiceMessage::EnrRemove(Subnet::Attestation(subnet)));
|
||||
self.handle_removed_subnet(subnet, SubscriptionKind::LongLived);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Overwrites the long lived subscriptions for testing.
|
||||
#[cfg(all(test, feature = "deterministic_long_lived_attnets"))]
|
||||
pub fn set_long_lived_subscriptions(&mut self, subnets: HashSet<SubnetId>) {
|
||||
self.long_lived_subscriptions = subnets
|
||||
}
|
||||
|
||||
/// Checks if we have subscribed aggregate validators for the subnet. If not, checks the gossip
|
||||
/// verification, re-propagates and returns false.
|
||||
pub fn should_process_attestation(
|
||||
@@ -538,7 +453,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
}
|
||||
|
||||
// Subscribes to the subnet if it should be done immediately, or schedules it if required.
|
||||
fn subscribe_to_subnet(
|
||||
fn subscribe_to_short_lived_subnet(
|
||||
&mut self,
|
||||
ExactSubnet { subnet_id, slot }: ExactSubnet,
|
||||
) -> Result<(), &'static str> {
|
||||
@@ -567,12 +482,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
// immediately.
|
||||
if time_to_subscription_start.is_zero() {
|
||||
// This is a current or past slot, we subscribe immediately.
|
||||
self.subscribe_to_subnet_immediately(
|
||||
subnet_id,
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
SubscriptionKind::ShortLived,
|
||||
slot + 1,
|
||||
)?;
|
||||
self.subscribe_to_short_lived_subnet_immediately(subnet_id, slot + 1)?;
|
||||
} else {
|
||||
// This is a future slot, schedule subscribing.
|
||||
trace!(self.log, "Scheduling subnet subscription"; "subnet" => ?subnet_id, "time_to_subscription_start" => ?time_to_subscription_start);
|
||||
@@ -583,79 +493,6 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Updates the `known_validators` mapping and subscribes to long lived subnets if required.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
fn add_known_validator(&mut self, validator_index: u64) {
|
||||
let previously_known = self.known_validators.contains_key(&validator_index);
|
||||
// Add the new validator or update the current timeout for a known validator.
|
||||
self.known_validators.insert(validator_index);
|
||||
if !previously_known {
|
||||
// New validator has subscribed.
|
||||
// Subscribe to random topics and update the ENR if needed.
|
||||
self.subscribe_to_random_subnets();
|
||||
}
|
||||
}
|
||||
|
||||
/// Subscribe to long-lived random subnets and update the local ENR bitfield.
|
||||
/// The number of subnets to subscribe depends on the number of active validators and number of
|
||||
/// current subscriptions.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
fn subscribe_to_random_subnets(&mut self) {
|
||||
if self.subscribe_all_subnets {
|
||||
// This case is not handled by this service.
|
||||
return;
|
||||
}
|
||||
|
||||
let max_subnets = self.beacon_chain.spec.attestation_subnet_count;
|
||||
// Calculate how many subnets we need,
|
||||
let required_long_lived_subnets = {
|
||||
let subnets_for_validators = self
|
||||
.known_validators
|
||||
.len()
|
||||
.saturating_mul(self.beacon_chain.spec.random_subnets_per_validator as usize);
|
||||
subnets_for_validators // How many subnets we need
|
||||
.min(max_subnets as usize) // Capped by the max
|
||||
.saturating_sub(self.long_lived_subscriptions.len()) // Minus those we have
|
||||
};
|
||||
|
||||
if required_long_lived_subnets == 0 {
|
||||
// Nothing to do.
|
||||
return;
|
||||
}
|
||||
|
||||
// Build a list of the subnets that we are not currently advertising.
|
||||
let available_subnets = (0..max_subnets)
|
||||
.map(SubnetId::new)
|
||||
.filter(|subnet_id| !self.long_lived_subscriptions.contains_key(subnet_id))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let subnets_to_subscribe: Vec<_> = available_subnets
|
||||
.choose_multiple(&mut rand::thread_rng(), required_long_lived_subnets)
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
// Calculate in which slot does this subscription end.
|
||||
let end_slot = match self.beacon_chain.slot_clock.now() {
|
||||
Some(slot) => slot + self.long_lived_subnet_subscription_slots,
|
||||
None => {
|
||||
return debug!(
|
||||
self.log,
|
||||
"Failed to calculate end slot of long lived subnet subscriptions."
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
for subnet_id in &subnets_to_subscribe {
|
||||
if let Err(e) = self.subscribe_to_subnet_immediately(
|
||||
*subnet_id,
|
||||
SubscriptionKind::LongLived,
|
||||
end_slot,
|
||||
) {
|
||||
debug!(self.log, "Failed to subscribe to long lived subnet"; "subnet" => ?subnet_id, "err" => e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* A collection of functions that handle the various timeouts */
|
||||
|
||||
/// Registers a subnet as subscribed.
|
||||
@@ -665,11 +502,9 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
/// out the appropriate events.
|
||||
///
|
||||
/// On determinist long lived subnets, this is only used for short lived subscriptions.
|
||||
fn subscribe_to_subnet_immediately(
|
||||
fn subscribe_to_short_lived_subnet_immediately(
|
||||
&mut self,
|
||||
subnet_id: SubnetId,
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
subscription_kind: SubscriptionKind,
|
||||
end_slot: Slot,
|
||||
) -> Result<(), &'static str> {
|
||||
if self.subscribe_all_subnets {
|
||||
@@ -688,25 +523,12 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
return Err("Time when subscription would end has already passed.");
|
||||
}
|
||||
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
let subscription_kind = SubscriptionKind::ShortLived;
|
||||
|
||||
// We need to check and add a subscription for the right kind, regardless of the presence
|
||||
// of the subnet as a subscription of the other kind. This is mainly since long lived
|
||||
// subscriptions can be removed at any time when a validator goes offline.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
let (subscriptions, already_subscribed_as_other_kind) = match subscription_kind {
|
||||
SubscriptionKind::ShortLived => (
|
||||
&mut self.short_lived_subscriptions,
|
||||
self.long_lived_subscriptions.contains_key(&subnet_id),
|
||||
),
|
||||
SubscriptionKind::LongLived => (
|
||||
&mut self.long_lived_subscriptions,
|
||||
self.short_lived_subscriptions.contains_key(&subnet_id),
|
||||
),
|
||||
};
|
||||
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
let (subscriptions, already_subscribed_as_other_kind) = (
|
||||
&mut self.short_lived_subscriptions,
|
||||
self.long_lived_subscriptions.contains(&subnet_id),
|
||||
@@ -741,57 +563,19 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
subnet_id,
|
||||
)));
|
||||
}
|
||||
|
||||
// If this is a new long lived subscription, send out the appropriate events.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
if SubscriptionKind::LongLived == subscription_kind {
|
||||
let subnet = Subnet::Attestation(subnet_id);
|
||||
// Advertise this subnet in our ENR.
|
||||
self.long_lived_subscriptions.insert_at(
|
||||
subnet_id,
|
||||
end_slot,
|
||||
time_to_subscription_end,
|
||||
);
|
||||
self.queue_event(SubnetServiceMessage::EnrAdd(subnet));
|
||||
|
||||
if !self.discovery_disabled {
|
||||
self.queue_event(SubnetServiceMessage::DiscoverPeers(vec![
|
||||
SubnetDiscovery {
|
||||
subnet,
|
||||
min_ttl: None,
|
||||
},
|
||||
]))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// A random subnet has expired.
|
||||
///
|
||||
/// This function selects a new subnet to join, or extends the expiry if there are no more
|
||||
/// available subnets to choose from.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
fn handle_random_subnet_expiry(&mut self, subnet_id: SubnetId) {
|
||||
self.handle_removed_subnet(subnet_id, SubscriptionKind::LongLived);
|
||||
|
||||
// Remove the ENR bitfield bit and choose a new random on from the available subnets
|
||||
// Subscribe to a new random subnet.
|
||||
self.subscribe_to_random_subnets();
|
||||
}
|
||||
|
||||
// Unsubscribes from a subnet that was removed if it does not continue to exist as a
|
||||
// subscription of the other kind. For long lived subscriptions, it also removes the
|
||||
// advertisement from our ENR.
|
||||
fn handle_removed_subnet(&mut self, subnet_id: SubnetId, subscription_kind: SubscriptionKind) {
|
||||
let exists_in_other_subscriptions = match subscription_kind {
|
||||
SubscriptionKind::LongLived => self.short_lived_subscriptions.contains_key(&subnet_id),
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
SubscriptionKind::ShortLived => self.long_lived_subscriptions.contains(&subnet_id),
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
SubscriptionKind::ShortLived => self.long_lived_subscriptions.contains_key(&subnet_id),
|
||||
};
|
||||
|
||||
if !exists_in_other_subscriptions {
|
||||
@@ -809,48 +593,6 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
/// A known validator has not sent a subscription in a while. They are considered offline and the
|
||||
/// beacon node no longer needs to be subscribed to the allocated random subnets.
|
||||
///
|
||||
/// We don't keep track of a specific validator to random subnet, rather the ratio of active
|
||||
/// validators to random subnets. So when a validator goes offline, we can simply remove the
|
||||
/// allocated amount of random subnets.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
fn handle_known_validator_expiry(&mut self) {
|
||||
// Calculate how many subnets should we remove.
|
||||
let extra_subnet_count = {
|
||||
let max_subnets = self.beacon_chain.spec.attestation_subnet_count;
|
||||
let subnets_for_validators = self
|
||||
.known_validators
|
||||
.len()
|
||||
.saturating_mul(self.beacon_chain.spec.random_subnets_per_validator as usize)
|
||||
.min(max_subnets as usize);
|
||||
|
||||
self.long_lived_subscriptions
|
||||
.len()
|
||||
.saturating_sub(subnets_for_validators)
|
||||
};
|
||||
|
||||
if extra_subnet_count == 0 {
|
||||
// Nothing to do
|
||||
return;
|
||||
}
|
||||
|
||||
let advertised_subnets = self
|
||||
.long_lived_subscriptions
|
||||
.keys()
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
let to_remove_subnets = advertised_subnets
|
||||
.choose_multiple(&mut rand::thread_rng(), extra_subnet_count)
|
||||
.cloned();
|
||||
|
||||
for subnet_id in to_remove_subnets {
|
||||
self.long_lived_subscriptions.remove(&subnet_id);
|
||||
self.handle_removed_subnet(subnet_id, SubscriptionKind::LongLived);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> Stream for AttestationService<T> {
|
||||
@@ -871,37 +613,34 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
|
||||
return Poll::Ready(Some(event));
|
||||
}
|
||||
|
||||
// Process first any known validator expiries, since these affect how many long lived
|
||||
// subnets we need.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
match self.known_validators.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(Ok(_validator_index))) => {
|
||||
self.handle_known_validator_expiry();
|
||||
// If we aren't subscribed to all subnets, handle the deterministic long-lived subnets
|
||||
if !self.subscribe_all_subnets {
|
||||
match self.next_long_lived_subscription_event.as_mut().poll(cx) {
|
||||
Poll::Ready(_) => {
|
||||
self.recompute_long_lived_subnets();
|
||||
// We re-wake the task as there could be other subscriptions to process
|
||||
self.waker
|
||||
.as_ref()
|
||||
.expect("Waker has been set")
|
||||
.wake_by_ref();
|
||||
}
|
||||
Poll::Pending => {}
|
||||
}
|
||||
Poll::Ready(Some(Err(e))) => {
|
||||
error!(self.log, "Failed to check for random subnet cycles"; "error"=> e);
|
||||
}
|
||||
Poll::Ready(None) | Poll::Pending => {}
|
||||
}
|
||||
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
match self.next_long_lived_subscription_event.as_mut().poll(cx) {
|
||||
Poll::Ready(_) => self.recompute_long_lived_subnets(),
|
||||
Poll::Pending => {}
|
||||
}
|
||||
|
||||
// Process scheduled subscriptions that might be ready, since those can extend a soon to
|
||||
// expire subscription.
|
||||
match self.scheduled_short_lived_subscriptions.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(Ok(ExactSubnet { subnet_id, slot }))) => {
|
||||
if let Err(e) = self.subscribe_to_subnet_immediately(
|
||||
subnet_id,
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
SubscriptionKind::ShortLived,
|
||||
slot + 1,
|
||||
) {
|
||||
if let Err(e) =
|
||||
self.subscribe_to_short_lived_subnet_immediately(subnet_id, slot + 1)
|
||||
{
|
||||
debug!(self.log, "Failed to subscribe to short lived subnet"; "subnet" => ?subnet_id, "err" => e);
|
||||
}
|
||||
self.waker
|
||||
.as_ref()
|
||||
.expect("Waker has been set")
|
||||
.wake_by_ref();
|
||||
}
|
||||
Poll::Ready(Some(Err(e))) => {
|
||||
error!(self.log, "Failed to check for scheduled subnet subscriptions"; "error"=> e);
|
||||
@@ -913,6 +652,11 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
|
||||
match self.short_lived_subscriptions.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(Ok((subnet_id, _end_slot)))) => {
|
||||
self.handle_removed_subnet(subnet_id, SubscriptionKind::ShortLived);
|
||||
// We re-wake the task as there could be other subscriptions to process
|
||||
self.waker
|
||||
.as_ref()
|
||||
.expect("Waker has been set")
|
||||
.wake_by_ref();
|
||||
}
|
||||
Poll::Ready(Some(Err(e))) => {
|
||||
error!(self.log, "Failed to check for subnet unsubscription times"; "error"=> e);
|
||||
@@ -920,18 +664,6 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
|
||||
Poll::Ready(None) | Poll::Pending => {}
|
||||
}
|
||||
|
||||
// Process any random subnet expiries.
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
match self.long_lived_subscriptions.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(Ok((subnet_id, _end_slot)))) => {
|
||||
self.handle_random_subnet_expiry(subnet_id)
|
||||
}
|
||||
Poll::Ready(Some(Err(e))) => {
|
||||
error!(self.log, "Failed to check for random subnet cycles"; "error"=> e);
|
||||
}
|
||||
Poll::Ready(None) | Poll::Pending => {}
|
||||
}
|
||||
|
||||
// Poll to remove entries on expiration, no need to act on expiration events.
|
||||
if let Some(tracked_vals) = self.aggregate_validators_on_subnet.as_mut() {
|
||||
if let Poll::Ready(Some(Err(e))) = tracked_vals.poll_next_unpin(cx) {
|
||||
|
||||
@@ -127,10 +127,7 @@ fn get_attestation_service(
|
||||
|
||||
AttestationService::new(
|
||||
beacon_chain,
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
lighthouse_network::discv5::enr::NodeId::random()
|
||||
.raw()
|
||||
.into(),
|
||||
lighthouse_network::discv5::enr::NodeId::random(),
|
||||
&config,
|
||||
&log,
|
||||
)
|
||||
@@ -180,9 +177,6 @@ async fn get_events<S: Stream<Item = SubnetServiceMessage> + Unpin>(
|
||||
|
||||
mod attestation_service {
|
||||
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
use std::collections::HashSet;
|
||||
|
||||
#[cfg(not(windows))]
|
||||
use crate::subnet_service::attestation_subnets::MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD;
|
||||
|
||||
@@ -193,8 +187,8 @@ mod attestation_service {
|
||||
attestation_committee_index: CommitteeIndex,
|
||||
slot: Slot,
|
||||
committee_count_at_slot: u64,
|
||||
is_aggregator: bool,
|
||||
) -> ValidatorSubscription {
|
||||
let is_aggregator = true;
|
||||
ValidatorSubscription {
|
||||
validator_index,
|
||||
attestation_committee_index,
|
||||
@@ -204,11 +198,11 @@ mod attestation_service {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
fn get_subscriptions(
|
||||
validator_count: u64,
|
||||
slot: Slot,
|
||||
committee_count_at_slot: u64,
|
||||
is_aggregator: bool,
|
||||
) -> Vec<ValidatorSubscription> {
|
||||
(0..validator_count)
|
||||
.map(|validator_index| {
|
||||
@@ -217,6 +211,7 @@ mod attestation_service {
|
||||
validator_index,
|
||||
slot,
|
||||
committee_count_at_slot,
|
||||
is_aggregator,
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
@@ -230,6 +225,7 @@ mod attestation_service {
|
||||
// Keep a low subscription slot so that there are no additional subnet discovery events.
|
||||
let subscription_slot = 0;
|
||||
let committee_count = 1;
|
||||
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
|
||||
|
||||
// create the attestation service and subscriptions
|
||||
let mut attestation_service = get_attestation_service(None);
|
||||
@@ -244,6 +240,7 @@ mod attestation_service {
|
||||
committee_index,
|
||||
current_slot + Slot::new(subscription_slot),
|
||||
committee_count,
|
||||
true,
|
||||
)];
|
||||
|
||||
// submit the subscriptions
|
||||
@@ -267,16 +264,19 @@ mod attestation_service {
|
||||
// Wait for 1 slot duration to get the unsubscription event
|
||||
let events = get_events(
|
||||
&mut attestation_service,
|
||||
Some(5),
|
||||
Some(subnets_per_node * 3 + 2),
|
||||
(MainnetEthSpec::slots_per_epoch() * 3) as u32,
|
||||
)
|
||||
.await;
|
||||
matches::assert_matches!(
|
||||
events[..3],
|
||||
events[..6],
|
||||
[
|
||||
SubnetServiceMessage::Subscribe(_any1),
|
||||
SubnetServiceMessage::EnrAdd(_any3),
|
||||
SubnetServiceMessage::DiscoverPeers(_),
|
||||
SubnetServiceMessage::Subscribe(_),
|
||||
SubnetServiceMessage::EnrAdd(_),
|
||||
SubnetServiceMessage::DiscoverPeers(_),
|
||||
]
|
||||
);
|
||||
|
||||
@@ -285,10 +285,10 @@ mod attestation_service {
|
||||
if !attestation_service
|
||||
.is_subscribed(&subnet_id, attestation_subnets::SubscriptionKind::LongLived)
|
||||
{
|
||||
assert_eq!(expected[..], events[3..]);
|
||||
assert_eq!(expected[..], events[subnets_per_node * 3..]);
|
||||
}
|
||||
// Should be subscribed to only 1 long lived subnet after unsubscription.
|
||||
assert_eq!(attestation_service.subscription_count(), 1);
|
||||
// Should be subscribed to only subnets_per_node long lived subnet after unsubscription.
|
||||
assert_eq!(attestation_service.subscription_count(), subnets_per_node);
|
||||
}
|
||||
|
||||
/// Test to verify that we are not unsubscribing to a subnet before a required subscription.
|
||||
@@ -298,6 +298,7 @@ mod attestation_service {
|
||||
// subscription config
|
||||
let validator_index = 1;
|
||||
let committee_count = 1;
|
||||
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
|
||||
|
||||
// Makes 2 validator subscriptions to the same subnet but at different slots.
|
||||
// There should be just 1 unsubscription event for the later slot subscription (subscription_slot2).
|
||||
@@ -319,6 +320,7 @@ mod attestation_service {
|
||||
com1,
|
||||
current_slot + Slot::new(subscription_slot1),
|
||||
committee_count,
|
||||
true,
|
||||
);
|
||||
|
||||
let sub2 = get_subscription(
|
||||
@@ -326,6 +328,7 @@ mod attestation_service {
|
||||
com2,
|
||||
current_slot + Slot::new(subscription_slot2),
|
||||
committee_count,
|
||||
true,
|
||||
);
|
||||
|
||||
let subnet_id1 = SubnetId::compute_subnet::<MainnetEthSpec>(
|
||||
@@ -367,16 +370,22 @@ mod attestation_service {
|
||||
|
||||
let expected = SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id1));
|
||||
|
||||
// Should be still subscribed to 1 long lived and 1 short lived subnet if both are
|
||||
// Should be still subscribed to 2 long lived and up to 1 short lived subnet if both are
|
||||
// different.
|
||||
if !attestation_service.is_subscribed(
|
||||
&subnet_id1,
|
||||
attestation_subnets::SubscriptionKind::LongLived,
|
||||
) {
|
||||
assert_eq!(expected, events[3]);
|
||||
assert_eq!(attestation_service.subscription_count(), 2);
|
||||
// The index is 3*subnets_per_node (because we subscribe + discover + enr per long lived
|
||||
// subnet) + 1
|
||||
let index = 3 * subnets_per_node;
|
||||
assert_eq!(expected, events[index]);
|
||||
assert_eq!(
|
||||
attestation_service.subscription_count(),
|
||||
subnets_per_node + 1
|
||||
);
|
||||
} else {
|
||||
assert_eq!(attestation_service.subscription_count(), 1);
|
||||
assert!(attestation_service.subscription_count() == subnets_per_node);
|
||||
}
|
||||
|
||||
// Get event for 1 more slot duration, we should get the unsubscribe event now.
|
||||
@@ -396,17 +405,17 @@ mod attestation_service {
|
||||
);
|
||||
}
|
||||
|
||||
// Should be subscribed to only 1 long lived subnet after unsubscription.
|
||||
assert_eq!(attestation_service.subscription_count(), 1);
|
||||
// Should be subscribed 2 long lived subnet after unsubscription.
|
||||
assert_eq!(attestation_service.subscription_count(), subnets_per_node);
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
#[tokio::test]
|
||||
async fn subscribe_all_random_subnets() {
|
||||
async fn subscribe_all_subnets() {
|
||||
let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count;
|
||||
let subscription_slot = 10;
|
||||
let subscription_slot = 3;
|
||||
let subscription_count = attestation_subnet_count;
|
||||
let committee_count = 1;
|
||||
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
|
||||
|
||||
// create the attestation service and subscriptions
|
||||
let mut attestation_service = get_attestation_service(None);
|
||||
@@ -420,6 +429,7 @@ mod attestation_service {
|
||||
subscription_count,
|
||||
current_slot + subscription_slot,
|
||||
committee_count,
|
||||
true,
|
||||
);
|
||||
|
||||
// submit the subscriptions
|
||||
@@ -427,42 +437,52 @@ mod attestation_service {
|
||||
.validator_subscriptions(subscriptions)
|
||||
.unwrap();
|
||||
|
||||
let events = get_events(&mut attestation_service, None, 3).await;
|
||||
let events = get_events(&mut attestation_service, Some(131), 10).await;
|
||||
let mut discover_peer_count = 0;
|
||||
let mut enr_add_count = 0;
|
||||
let mut unexpected_msg_count = 0;
|
||||
let mut unsubscribe_event_count = 0;
|
||||
|
||||
for event in &events {
|
||||
match event {
|
||||
SubnetServiceMessage::DiscoverPeers(_) => discover_peer_count += 1,
|
||||
SubnetServiceMessage::Subscribe(_any_subnet) => {}
|
||||
SubnetServiceMessage::EnrAdd(_any_subnet) => enr_add_count += 1,
|
||||
SubnetServiceMessage::Unsubscribe(_) => unsubscribe_event_count += 1,
|
||||
_ => unexpected_msg_count += 1,
|
||||
}
|
||||
}
|
||||
|
||||
// There should be a Subscribe Event, and Enr Add event and a DiscoverPeers event for each
|
||||
// long-lived subnet initially. The next event should be a bulk discovery event.
|
||||
let bulk_discovery_index = 3 * subnets_per_node;
|
||||
// The bulk discovery request length should be equal to validator_count
|
||||
let bulk_discovery_event = events.last().unwrap();
|
||||
let bulk_discovery_event = &events[bulk_discovery_index];
|
||||
if let SubnetServiceMessage::DiscoverPeers(d) = bulk_discovery_event {
|
||||
assert_eq!(d.len(), attestation_subnet_count as usize);
|
||||
} else {
|
||||
panic!("Unexpected event {:?}", bulk_discovery_event);
|
||||
}
|
||||
|
||||
// 64 `DiscoverPeer` requests of length 1 corresponding to random subnets
|
||||
// 64 `DiscoverPeer` requests of length 1 corresponding to deterministic subnets
|
||||
// and 1 `DiscoverPeer` request corresponding to bulk subnet discovery.
|
||||
assert_eq!(discover_peer_count, subscription_count + 1);
|
||||
assert_eq!(attestation_service.subscription_count(), 64);
|
||||
assert_eq!(enr_add_count, 64);
|
||||
assert_eq!(discover_peer_count, subnets_per_node + 1);
|
||||
assert_eq!(attestation_service.subscription_count(), subnets_per_node);
|
||||
assert_eq!(enr_add_count, subnets_per_node);
|
||||
assert_eq!(
|
||||
unsubscribe_event_count,
|
||||
attestation_subnet_count - subnets_per_node as u64
|
||||
);
|
||||
assert_eq!(unexpected_msg_count, 0);
|
||||
// test completed successfully
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
|
||||
#[tokio::test]
|
||||
async fn subscribe_all_random_subnets_plus_one() {
|
||||
async fn subscribe_correct_number_of_subnets() {
|
||||
let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count;
|
||||
let subscription_slot = 10;
|
||||
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
|
||||
|
||||
// the 65th subscription should result in no more messages than the previous scenario
|
||||
let subscription_count = attestation_subnet_count + 1;
|
||||
let committee_count = 1;
|
||||
@@ -479,6 +499,7 @@ mod attestation_service {
|
||||
subscription_count,
|
||||
current_slot + subscription_slot,
|
||||
committee_count,
|
||||
true,
|
||||
);
|
||||
|
||||
// submit the subscriptions
|
||||
@@ -507,12 +528,12 @@ mod attestation_service {
|
||||
} else {
|
||||
panic!("Unexpected event {:?}", bulk_discovery_event);
|
||||
}
|
||||
// 64 `DiscoverPeer` requests of length 1 corresponding to random subnets
|
||||
// subnets_per_node `DiscoverPeer` requests of length 1 corresponding to long-lived subnets
|
||||
// and 1 `DiscoverPeer` request corresponding to the bulk subnet discovery.
|
||||
// For the 65th subscription, the call to `subscribe_to_random_subnets` is not made because we are at capacity.
|
||||
assert_eq!(discover_peer_count, 64 + 1);
|
||||
assert_eq!(attestation_service.subscription_count(), 64);
|
||||
assert_eq!(enr_add_count, 64);
|
||||
|
||||
assert_eq!(discover_peer_count, subnets_per_node + 1);
|
||||
assert_eq!(attestation_service.subscription_count(), subnets_per_node);
|
||||
assert_eq!(enr_add_count, subnets_per_node);
|
||||
assert_eq!(unexpected_msg_count, 0);
|
||||
}
|
||||
|
||||
@@ -522,6 +543,7 @@ mod attestation_service {
|
||||
// subscription config
|
||||
let validator_index = 1;
|
||||
let committee_count = 1;
|
||||
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
|
||||
|
||||
// Makes 2 validator subscriptions to the same subnet but at different slots.
|
||||
// There should be just 1 unsubscription event for the later slot subscription (subscription_slot2).
|
||||
@@ -543,6 +565,7 @@ mod attestation_service {
|
||||
com1,
|
||||
current_slot + Slot::new(subscription_slot1),
|
||||
committee_count,
|
||||
true,
|
||||
);
|
||||
|
||||
let sub2 = get_subscription(
|
||||
@@ -550,6 +573,7 @@ mod attestation_service {
|
||||
com2,
|
||||
current_slot + Slot::new(subscription_slot2),
|
||||
committee_count,
|
||||
true,
|
||||
);
|
||||
|
||||
let subnet_id1 = SubnetId::compute_subnet::<MainnetEthSpec>(
|
||||
@@ -597,11 +621,10 @@ mod attestation_service {
|
||||
&subnet_id1,
|
||||
attestation_subnets::SubscriptionKind::LongLived,
|
||||
) {
|
||||
assert_eq!(expected_subscription, events[3]);
|
||||
// fourth is a discovery event
|
||||
assert_eq!(expected_unsubscription, events[5]);
|
||||
assert_eq!(expected_subscription, events[subnets_per_node * 3]);
|
||||
assert_eq!(expected_unsubscription, events[subnets_per_node * 3 + 2]);
|
||||
}
|
||||
assert_eq!(attestation_service.subscription_count(), 1);
|
||||
assert_eq!(attestation_service.subscription_count(), 2);
|
||||
|
||||
println!("{events:?}");
|
||||
let subscription_slot = current_slot + subscription_slot2 - 1; // one less do to the
|
||||
@@ -634,40 +657,44 @@ mod attestation_service {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[cfg(feature = "deterministic_long_lived_attnets")]
|
||||
async fn test_update_deterministic_long_lived_subnets() {
|
||||
let mut attestation_service = get_attestation_service(None);
|
||||
let new_subnet = SubnetId::new(1);
|
||||
let maintained_subnet = SubnetId::new(2);
|
||||
let removed_subnet = SubnetId::new(3);
|
||||
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
|
||||
|
||||
let current_slot = attestation_service
|
||||
.beacon_chain
|
||||
.slot_clock
|
||||
.now()
|
||||
.expect("Could not get current slot");
|
||||
|
||||
let subscriptions = get_subscriptions(20, current_slot, 30, false);
|
||||
|
||||
// submit the subscriptions
|
||||
attestation_service
|
||||
.set_long_lived_subscriptions(HashSet::from([removed_subnet, maintained_subnet]));
|
||||
// clear initial events
|
||||
let _events = get_events(&mut attestation_service, None, 1).await;
|
||||
.validator_subscriptions(subscriptions)
|
||||
.unwrap();
|
||||
|
||||
attestation_service
|
||||
.update_long_lived_subnets_testing(HashSet::from([maintained_subnet, new_subnet]));
|
||||
|
||||
let events = get_events(&mut attestation_service, None, 1).await;
|
||||
let new_subnet = Subnet::Attestation(new_subnet);
|
||||
let removed_subnet = Subnet::Attestation(removed_subnet);
|
||||
// There should only be the same subscriptions as there are in the specification,
|
||||
// regardless of subscriptions
|
||||
assert_eq!(
|
||||
events,
|
||||
attestation_service.long_lived_subscriptions().len(),
|
||||
subnets_per_node
|
||||
);
|
||||
|
||||
let events = get_events(&mut attestation_service, None, 4).await;
|
||||
|
||||
// Check that we attempt to subscribe and register ENRs
|
||||
matches::assert_matches!(
|
||||
events[..6],
|
||||
[
|
||||
// events for the new subnet
|
||||
SubnetServiceMessage::Subscribe(new_subnet),
|
||||
SubnetServiceMessage::EnrAdd(new_subnet),
|
||||
SubnetServiceMessage::DiscoverPeers(vec![SubnetDiscovery {
|
||||
subnet: new_subnet,
|
||||
min_ttl: None
|
||||
}]),
|
||||
// events for the removed subnet
|
||||
SubnetServiceMessage::Unsubscribe(removed_subnet),
|
||||
SubnetServiceMessage::EnrRemove(removed_subnet),
|
||||
SubnetServiceMessage::Subscribe(_),
|
||||
SubnetServiceMessage::EnrAdd(_),
|
||||
SubnetServiceMessage::DiscoverPeers(_),
|
||||
SubnetServiceMessage::Subscribe(_),
|
||||
SubnetServiceMessage::EnrAdd(_),
|
||||
SubnetServiceMessage::DiscoverPeers(_),
|
||||
]
|
||||
);
|
||||
println!("{events:?}")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user