mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-06 10:11:44 +00:00
Fix subnet unsubscription time (#6890)
Hopefully fixes https://github.com/sigp/lighthouse/issues/6732 In our `scheduled_subscriptions`, we were setting unsubscription slot to be `current_slot + 1`. Given that we were subscribing to the subnet at `duty.slot - 1`, the unsubscription slot ended up being `duty.slot`. So we were unsubscribing to the subnet at the beginning of the duty slot which is insane. Fixes the `scheduled_subscriptions` to unsubscribe at `duty.slot + 1`.
This commit is contained in:
@@ -216,6 +216,12 @@ impl<T: BeaconChainTypes> SubnetService<T> {
|
||||
|| self.permanent_attestation_subscriptions.contains(subnet)
|
||||
}
|
||||
|
||||
/// Returns whether we are subscribed to a permanent subnet for testing purposes.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn is_subscribed_permanent(&self, subnet: &Subnet) -> bool {
|
||||
self.permanent_attestation_subscriptions.contains(subnet)
|
||||
}
|
||||
|
||||
/// Processes a list of validator subscriptions.
|
||||
///
|
||||
/// This is fundamentally called form the HTTP API when a validator requests duties from us
|
||||
@@ -629,9 +635,10 @@ impl<T: BeaconChainTypes> Stream for SubnetService<T> {
|
||||
// expire subscription.
|
||||
match self.scheduled_subscriptions.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(Ok(exact_subnet))) => {
|
||||
let ExactSubnet { subnet, .. } = exact_subnet;
|
||||
let current_slot = self.beacon_chain.slot_clock.now().unwrap_or_default();
|
||||
if let Err(e) = self.subscribe_to_subnet_immediately(subnet, current_slot + 1) {
|
||||
let ExactSubnet { subnet, slot } = exact_subnet;
|
||||
// Set the `end_slot` for the subscription to be `duty.slot + 1` so that we unsubscribe
|
||||
// only at the end of the duty slot.
|
||||
if let Err(e) = self.subscribe_to_subnet_immediately(subnet, slot + 1) {
|
||||
debug!(self.log, "Failed to subscribe to short lived subnet"; "subnet" => ?subnet, "err" => e);
|
||||
}
|
||||
self.waker
|
||||
|
||||
@@ -7,9 +7,6 @@ use beacon_chain::{
|
||||
};
|
||||
use genesis::{generate_deterministic_keypairs, interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH};
|
||||
use lighthouse_network::NetworkConfig;
|
||||
use logging::test_logger;
|
||||
use slog::{o, Drain, Logger};
|
||||
use sloggers::{null::NullLoggerBuilder, Build};
|
||||
use slot_clock::{SlotClock, SystemTimeSlotClock};
|
||||
use std::sync::{Arc, LazyLock};
|
||||
use std::time::{Duration, SystemTime};
|
||||
@@ -21,10 +18,6 @@ use types::{
|
||||
SyncCommitteeSubscription, SyncSubnetId, ValidatorSubscription,
|
||||
};
|
||||
|
||||
// Set to enable/disable logging
|
||||
// const TEST_LOG_LEVEL: Option<slog::Level> = Some(slog::Level::Debug);
|
||||
const TEST_LOG_LEVEL: Option<slog::Level> = None;
|
||||
|
||||
const SLOT_DURATION_MILLIS: u64 = 400;
|
||||
|
||||
type TestBeaconChainType = Witness<
|
||||
@@ -46,7 +39,7 @@ impl TestBeaconChain {
|
||||
|
||||
let keypairs = generate_deterministic_keypairs(1);
|
||||
|
||||
let log = get_logger(TEST_LOG_LEVEL);
|
||||
let log = logging::test_logger();
|
||||
let store =
|
||||
HotColdDB::open_ephemeral(StoreConfig::default(), spec.clone(), log.clone()).unwrap();
|
||||
|
||||
@@ -98,28 +91,10 @@ pub fn recent_genesis_time() -> u64 {
|
||||
.as_secs()
|
||||
}
|
||||
|
||||
fn get_logger(log_level: Option<slog::Level>) -> Logger {
|
||||
if let Some(level) = log_level {
|
||||
let drain = {
|
||||
let decorator = slog_term::TermDecorator::new().build();
|
||||
let decorator =
|
||||
logging::AlignedTermDecorator::new(decorator, logging::MAX_MESSAGE_WIDTH);
|
||||
let drain = slog_term::FullFormat::new(decorator).build().fuse();
|
||||
let drain = slog_async::Async::new(drain).chan_size(2048).build();
|
||||
drain.filter_level(level)
|
||||
};
|
||||
|
||||
Logger::root(drain.fuse(), o!())
|
||||
} else {
|
||||
let builder = NullLoggerBuilder;
|
||||
builder.build().expect("should build logger")
|
||||
}
|
||||
}
|
||||
|
||||
static CHAIN: LazyLock<TestBeaconChain> = LazyLock::new(TestBeaconChain::new_with_system_clock);
|
||||
|
||||
fn get_subnet_service() -> SubnetService<TestBeaconChainType> {
|
||||
let log = test_logger();
|
||||
let log = logging::test_logger();
|
||||
let config = NetworkConfig::default();
|
||||
|
||||
let beacon_chain = CHAIN.chain.clone();
|
||||
@@ -501,8 +476,6 @@ mod test {
|
||||
let committee_count = 1;
|
||||
|
||||
// Makes 3 validator subscriptions to the same subnet but at different slots.
|
||||
// There should be just 1 unsubscription event for each of the later slots subscriptions
|
||||
// (subscription_slot2 and subscription_slot3).
|
||||
let subscription_slot1 = 0;
|
||||
let subscription_slot2 = MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD + 4;
|
||||
let subscription_slot3 = subscription_slot2 * 2;
|
||||
@@ -585,7 +558,7 @@ mod test {
|
||||
let expected_unsubscription =
|
||||
SubnetServiceMessage::Unsubscribe(Subnet::Attestation(subnet_id1));
|
||||
|
||||
if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) {
|
||||
if !subnet_service.is_subscribed_permanent(&Subnet::Attestation(subnet_id1)) {
|
||||
assert_eq!(expected_subscription, events[0]);
|
||||
assert_eq!(expected_unsubscription, events[2]);
|
||||
}
|
||||
@@ -607,9 +580,18 @@ mod test {
|
||||
|
||||
assert_eq!(no_events, []);
|
||||
|
||||
let second_subscribe_event = get_events(&mut subnet_service, None, 2).await;
|
||||
let subscription_end_slot = current_slot + subscription_slot2 + 2; // +1 to get to the end of the duty slot, +1 for the slot to complete
|
||||
let wait_slots = subnet_service
|
||||
.beacon_chain
|
||||
.slot_clock
|
||||
.duration_to_slot(subscription_end_slot)
|
||||
.unwrap()
|
||||
.as_millis() as u64
|
||||
/ SLOT_DURATION_MILLIS;
|
||||
|
||||
let second_subscribe_event = get_events(&mut subnet_service, None, wait_slots as u32).await;
|
||||
// If the permanent and short lived subnets are different, we should get an unsubscription event.
|
||||
if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) {
|
||||
if !subnet_service.is_subscribed_permanent(&Subnet::Attestation(subnet_id1)) {
|
||||
assert_eq!(
|
||||
[
|
||||
expected_subscription.clone(),
|
||||
@@ -633,9 +615,18 @@ mod test {
|
||||
|
||||
assert_eq!(no_events, []);
|
||||
|
||||
let third_subscribe_event = get_events(&mut subnet_service, None, 2).await;
|
||||
let subscription_end_slot = current_slot + subscription_slot3 + 2; // +1 to get to the end of the duty slot, +1 for the slot to complete
|
||||
let wait_slots = subnet_service
|
||||
.beacon_chain
|
||||
.slot_clock
|
||||
.duration_to_slot(subscription_end_slot)
|
||||
.unwrap()
|
||||
.as_millis() as u64
|
||||
/ SLOT_DURATION_MILLIS;
|
||||
|
||||
if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) {
|
||||
let third_subscribe_event = get_events(&mut subnet_service, None, wait_slots as u32).await;
|
||||
|
||||
if !subnet_service.is_subscribed_permanent(&Subnet::Attestation(subnet_id1)) {
|
||||
assert_eq!(
|
||||
[expected_subscription, expected_unsubscription],
|
||||
third_subscribe_event[..]
|
||||
|
||||
Reference in New Issue
Block a user