From 494634399027b94f31759ba5bb4d3a5d2aaff503 Mon Sep 17 00:00:00 2001 From: Povilas Liubauskas Date: Thu, 12 Dec 2024 10:36:34 +0200 Subject: [PATCH] Fix subscribing to attestation subnets for aggregating (#6681) (#6682) * Fix subscribing to attestation subnets for aggregating (#6681) * Prevent scheduled subnet subscriptions from being overwritten by other subscriptions from same subnet with additional scoping by slot --- beacon_node/network/src/subnet_service/mod.rs | 9 ++- .../network/src/subnet_service/tests/mod.rs | 55 +++++++++++++++++-- 2 files changed, 57 insertions(+), 7 deletions(-) diff --git a/beacon_node/network/src/subnet_service/mod.rs b/beacon_node/network/src/subnet_service/mod.rs index ec6f3b10a3..da1f220f04 100644 --- a/beacon_node/network/src/subnet_service/mod.rs +++ b/beacon_node/network/src/subnet_service/mod.rs @@ -86,7 +86,7 @@ pub struct SubnetService { subscriptions: HashSetDelay, /// Subscriptions that need to be executed in the future. - scheduled_subscriptions: HashSetDelay, + scheduled_subscriptions: HashSetDelay, /// A list of permanent subnets that this node is subscribed to. // TODO: Shift this to a dynamic bitfield @@ -484,8 +484,10 @@ impl SubnetService { self.subscribe_to_subnet_immediately(subnet, slot + 1)?; } else { // This is a future slot, schedule subscribing. + // We need to include the slot to make the key unique to prevent overwriting the entry + // for the same subnet. self.scheduled_subscriptions - .insert_at(subnet, time_to_subscription_start); + .insert_at(ExactSubnet { subnet, slot }, time_to_subscription_start); } Ok(()) @@ -626,7 +628,8 @@ impl Stream for SubnetService { // Process scheduled subscriptions that might be ready, since those can extend a soon to // expire subscription. match self.scheduled_subscriptions.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(subnet))) => { + Poll::Ready(Some(Ok(exact_subnet))) => { + let ExactSubnet { subnet, .. } = exact_subnet; let current_slot = self.beacon_chain.slot_clock.now().unwrap_or_default(); if let Err(e) = self.subscribe_to_subnet_immediately(subnet, current_slot + 1) { debug!(self.log, "Failed to subscribe to short lived subnet"; "subnet" => ?subnet, "err" => e); diff --git a/beacon_node/network/src/subnet_service/tests/mod.rs b/beacon_node/network/src/subnet_service/tests/mod.rs index 91e4841b26..7283b4af31 100644 --- a/beacon_node/network/src/subnet_service/tests/mod.rs +++ b/beacon_node/network/src/subnet_service/tests/mod.rs @@ -500,12 +500,15 @@ mod test { // subscription config let committee_count = 1; - // Makes 2 validator subscriptions to the same subnet but at different slots. - // There should be just 1 unsubscription event for the later slot subscription (subscription_slot2). + // Makes 3 validator subscriptions to the same subnet but at different slots. + // There should be just 1 unsubscription event for each of the later slots subscriptions + // (subscription_slot2 and subscription_slot3). let subscription_slot1 = 0; let subscription_slot2 = MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD + 4; + let subscription_slot3 = subscription_slot2 * 2; let com1 = MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD + 4; let com2 = 0; + let com3 = CHAIN.chain.spec.attestation_subnet_count - com1; // create the attestation service and subscriptions let mut subnet_service = get_subnet_service(); @@ -532,6 +535,13 @@ mod test { true, ); + let sub3 = get_subscription( + com3, + current_slot + Slot::new(subscription_slot3), + committee_count, + true, + ); + let subnet_id1 = SubnetId::compute_subnet::( current_slot + Slot::new(subscription_slot1), com1, @@ -548,12 +558,23 @@ mod test { ) .unwrap(); + let subnet_id3 = SubnetId::compute_subnet::( + current_slot + Slot::new(subscription_slot3), + com3, + committee_count, + &subnet_service.beacon_chain.spec, + ) + .unwrap(); + // Assert that subscriptions are different but their subnet is the same assert_ne!(sub1, sub2); + assert_ne!(sub1, sub3); + assert_ne!(sub2, sub3); assert_eq!(subnet_id1, subnet_id2); + assert_eq!(subnet_id1, subnet_id3); // submit the subscriptions - subnet_service.validator_subscriptions(vec![sub1, sub2].into_iter()); + subnet_service.validator_subscriptions(vec![sub1, sub2, sub3].into_iter()); // Unsubscription event should happen at the end of the slot. // We wait for 2 slots, to avoid timeout issues @@ -590,10 +611,36 @@ mod test { // If the permanent and short lived subnets are different, we should get an unsubscription event. if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) { assert_eq!( - [expected_subscription, expected_unsubscription], + [ + expected_subscription.clone(), + expected_unsubscription.clone(), + ], second_subscribe_event[..] ); } + + let subscription_slot = current_slot + subscription_slot3 - 1; + + let wait_slots = subnet_service + .beacon_chain + .slot_clock + .duration_to_slot(subscription_slot) + .unwrap() + .as_millis() as u64 + / SLOT_DURATION_MILLIS; + + let no_events = dbg!(get_events(&mut subnet_service, None, wait_slots as u32).await); + + assert_eq!(no_events, []); + + let third_subscribe_event = get_events(&mut subnet_service, None, 2).await; + + if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) { + assert_eq!( + [expected_subscription, expected_unsubscription], + third_subscribe_event[..] + ); + } } #[tokio::test]