Fix subscribing to attestation subnets for aggregating (#6681) (#6682)

* Fix subscribing to attestation subnets for aggregating (#6681)

* Prevent scheduled subnet subscriptions from being overwritten by other subscriptions from same subnet with additional scoping by slot
This commit is contained in:
Povilas Liubauskas
2024-12-12 10:36:34 +02:00
committed by GitHub
parent fc0e0ae613
commit 4946343990
2 changed files with 57 additions and 7 deletions

View File

@@ -86,7 +86,7 @@ pub struct SubnetService<T: BeaconChainTypes> {
subscriptions: HashSetDelay<Subnet>,
/// Subscriptions that need to be executed in the future.
scheduled_subscriptions: HashSetDelay<Subnet>,
scheduled_subscriptions: HashSetDelay<ExactSubnet>,
/// A list of permanent subnets that this node is subscribed to.
// TODO: Shift this to a dynamic bitfield
@@ -484,8 +484,10 @@ impl<T: BeaconChainTypes> SubnetService<T> {
self.subscribe_to_subnet_immediately(subnet, slot + 1)?;
} else {
// This is a future slot, schedule subscribing.
// We need to include the slot to make the key unique to prevent overwriting the entry
// for the same subnet.
self.scheduled_subscriptions
.insert_at(subnet, time_to_subscription_start);
.insert_at(ExactSubnet { subnet, slot }, time_to_subscription_start);
}
Ok(())
@@ -626,7 +628,8 @@ impl<T: BeaconChainTypes> Stream for SubnetService<T> {
// Process scheduled subscriptions that might be ready, since those can extend a soon to
// expire subscription.
match self.scheduled_subscriptions.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(subnet))) => {
Poll::Ready(Some(Ok(exact_subnet))) => {
let ExactSubnet { subnet, .. } = exact_subnet;
let current_slot = self.beacon_chain.slot_clock.now().unwrap_or_default();
if let Err(e) = self.subscribe_to_subnet_immediately(subnet, current_slot + 1) {
debug!(self.log, "Failed to subscribe to short lived subnet"; "subnet" => ?subnet, "err" => e);