diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 2b7ae9e4d1..598d6e2640 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -19,7 +19,7 @@ use tracing::{debug, error, info_span, Instrument}; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; use types::{ BlobSidecarList, ChainSpec, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarList, - Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, + Epoch, EthSpec, ForkName, Hash256, RuntimeVariableList, SignedBeaconBlock, }; mod error; @@ -495,10 +495,23 @@ impl DataAvailabilityChecker { fork_epoch, current_slot .epoch(T::EthSpec::slots_per_epoch()) + // TODO(das): use min_epochs_for_data_columns .saturating_sub(self.spec.min_epochs_for_blob_sidecars_requests), )) } + pub fn oldest_epoch_with_data_columns(&self) -> Option { + let fulu_fork_epoch = self.spec.fork_epoch(ForkName::Fulu)?; + if fulu_fork_epoch == Epoch::max_value() { + return None; + } + let current_epoch = self.slot_clock.now()?.epoch(T::EthSpec::slots_per_epoch()); + Some(std::cmp::max( + fulu_fork_epoch, + current_epoch.saturating_sub(self.spec.min_epochs_for_blob_sidecars_requests), + )) + } + /// Returns true if the given epoch lies within the da boundary and false otherwise. 
pub fn da_check_required_for_epoch(&self, block_epoch: Epoch) -> bool { self.data_availability_boundary() diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 1c0867bf3f..605faa3b4e 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -931,8 +931,10 @@ impl Network { warn!(%topic, error = ?e, "Failed to subscribe to topic"); false } - Ok(_) => { - debug!(%topic, "Subscribed to topic"); + Ok(new_subscription) => { + if new_subscription { + debug!(%topic, "Subscribed to topic"); + } true } } diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index acbe5b51de..f719fc49da 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -7,6 +7,7 @@ use crate::{Client, Enr, EnrExt, GossipTopic, Multiaddr, NetworkConfig, PeerId}; use local_metadata::LocalMetadata; use parking_lot::RwLock; use std::collections::HashSet; +use std::ops::Range; use std::sync::Arc; use types::data_column_custody_group::{ compute_columns_from_custody_groups, compute_subnets_from_custody_groups, get_custody_groups, @@ -141,6 +142,12 @@ impl NetworkGlobals { self.cgc_updates.read().at_slot(slot) } + /// Returns the minimum CGC value in the range of slots `range`. If the range is empty, + /// i.e. `3..1` returns the CGC value at `range.start`. 
+ pub fn min_custody_group_count_at_range(&self, slot_range: Range) -> u64 { + self.cgc_updates.read().min_at_slot_range(slot_range) + } + /// Returns the count of custody columns this node must sample for block import pub fn custody_columns_count(&self, slot: Slot) -> u64 { // This only panics if the chain spec contains invalid values @@ -164,6 +171,10 @@ impl NetworkGlobals { self.cgc_updates.read().clone() } + pub fn cgc_updates_len(&self) -> usize { + self.cgc_updates.read().len() + } + /// Returns the number of libp2p connected peers. pub fn connected_peers(&self) -> usize { self.peers.read().connected_peer_ids().count() diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index 8a8eb6b849..480af1e223 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -195,6 +195,27 @@ impl std::fmt::Display for GossipKind { } } +impl GossipKind { + /// Returns the ForkName after which this GossipKind can form a valid topic + pub fn fork_activation(&self) -> ForkName { + match self { + Self::BeaconBlock => ForkName::Base, + Self::BeaconAggregateAndProof => ForkName::Base, + Self::BlobSidecar(_) => ForkName::Deneb, + Self::DataColumnSidecar(_) => ForkName::Fulu, + Self::Attestation(_) => ForkName::Base, + Self::VoluntaryExit => ForkName::Base, + Self::ProposerSlashing => ForkName::Base, + Self::AttesterSlashing => ForkName::Base, + Self::SignedContributionAndProof => ForkName::Altair, + Self::SyncCommitteeMessage(_) => ForkName::Altair, + Self::BlsToExecutionChange => ForkName::Capella, + Self::LightClientFinalityUpdate => ForkName::Altair, + Self::LightClientOptimisticUpdate => ForkName::Altair, + } + } +} + /// The known encoding types for gossipsub messages. 
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, Default)] pub enum GossipEncoding { diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 1e9763e97d..52f3fa3c38 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -655,10 +655,34 @@ pub static CGC_ANNOUNCED: LazyLock> = LazyLock::new(|| { "Current announced Custody Group Count CGC", ) }); -pub static CGC_UPDATES: LazyLock> = LazyLock::new(|| { +pub static CGC_MIN_BACKFILL_RANGE: LazyLock> = LazyLock::new(|| { + try_create_int_gauge( + "beacon_custody_cgc_min_backfill_range", + "Current min CGC value in backfill range", + ) +}); +pub static CGC_FINALIZED_SLOT: LazyLock> = LazyLock::new(|| { + try_create_int_gauge( + "beacon_custody_cgc_finalized_slot", + "Current CGC value at the finalized slot", + ) +}); +pub static CGC_UPDATES_LENGTH: LazyLock> = LazyLock::new(|| { + try_create_int_gauge( + "beacon_custody_cgc_updates_length", + "Current length of stored Custody Group Count CGC updates", + ) +}); +pub static CGC_UPDATE_EVENTS: LazyLock> = LazyLock::new(|| { try_create_int_counter( - "beacon_custody_cgc_updates", - "Total count of Custody Group Count CGC updates", + "beacon_custody_cgc_update_events", + "Total count of the Custody Group Count CGC is updated", + ) +}); +pub static BACKFILL_RESTARTED_FOR_CGC: LazyLock> = LazyLock::new(|| { + try_create_int_counter( + "beacon_custody_backfill_restarted_for_cgc", + "Total count of times backfill has restarted to backfill a higher CGC", ) }); diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 0a65c75c4a..eb3ccca462 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -34,7 +34,7 @@ use tokio::sync::mpsc; use tokio::time::Sleep; use tracing::{debug, error, info, info_span, trace, warn, Instrument}; use types::{ - ChainSpec, Epoch, EthSpec, ForkContext, ForkName, Slot, SubnetId, 
SyncCommitteeSubscription, + ChainSpec, Epoch, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, Unsigned, ValidatorSubscription, }; @@ -388,8 +388,6 @@ impl NetworkService { validator_subscription_recv, } = network_receivers; - let epoch_in_seconds = spec.seconds_per_slot * T::EthSpec::slots_per_epoch(); - // create the network service and spawn the task let network_service = NetworkService { beacon_chain, @@ -407,7 +405,7 @@ impl NetworkService { metrics_enabled: config.metrics_enabled, metrics_update, gossipsub_parameter_update, - cgc_update_interval: tokio::time::interval(Duration::from_secs(epoch_in_seconds)), + cgc_update_interval: tokio::time::interval(Duration::from_secs(spec.seconds_per_slot)), fork_context, }; @@ -442,37 +440,33 @@ impl NetworkService { /// /// For `current_slot < fork_slot`, this function returns both the pre-fork and post-fork /// digests since we should be subscribed to post fork topics before the fork. - pub fn required_gossip_fork_digests(&self) -> Vec<[u8; 4]> { + pub fn required_gossip_forks_to_subscribe(&self, topic: &GossipKind) -> Vec<[u8; 4]> { let fork_context = &self.fork_context; let spec = &self.beacon_chain.spec; let current_slot = self.beacon_chain.slot().unwrap_or(spec.genesis_slot); let current_fork = fork_context.current_fork(); - let mut result = vec![fork_context - .to_context_bytes(current_fork) - .unwrap_or_else(|| { - panic!( - "{} fork bytes should exist as it's initialized in ForkContext", - current_fork - ) - })]; + let mut required_fork_to_subscribe = vec![current_fork]; if let Some((next_fork, fork_epoch)) = spec.next_fork_epoch::(current_slot) { if current_slot.saturating_add(Slot::new(SUBSCRIBE_DELAY_SLOTS)) >= fork_epoch.start_slot(T::EthSpec::slots_per_epoch()) { - let next_fork_context_bytes = - fork_context.to_context_bytes(next_fork).unwrap_or_else(|| { - panic!( - "context bytes should exist as spec.next_fork_epoch({}) returned Some({})", - current_slot, next_fork - ) - 
}); - result.push(next_fork_context_bytes); + required_fork_to_subscribe.push(next_fork); } } - result + required_fork_to_subscribe + .into_iter() + // This filter prevents subscribing to Electra data_columns topic if we call + // `Self::subscribe` close enough to the Fulu fork. + .filter(|fork| *fork >= topic.fork_activation()) + .map(|fork| { + fork_context + .to_context_bytes(fork) + .unwrap_or_else(|| panic!("context bytes should exist for {fork}")) + }) + .collect() } fn send_to_router(&mut self, msg: RouterMessage) { @@ -770,7 +764,7 @@ impl NetworkService { &self.network_globals.as_topic_config(self.clock_slot()), &self.fork_context.spec, ) { - for fork_digest in self.required_gossip_fork_digests() { + for fork_digest in self.required_gossip_forks_to_subscribe(&topic_kind) { let topic = GossipTopic::new( topic_kind.clone(), GossipEncoding::default(), @@ -852,16 +846,47 @@ impl NetworkService { fn on_cgc_update_interval(&mut self) { // Skip running this function if Fulu is not scheduled. But run it before the fork to start // announcing the CGC ahead of the fork. - let fulu_fork_epoch = match self.beacon_chain.spec.fork_epoch(ForkName::Fulu) { - None => return, - Some(epoch) if epoch == Epoch::max_value() => return, - Some(epoch) => epoch, - }; + if !self.beacon_chain.spec.is_peer_das_scheduled() { + return; + } - let prev_cgc = self.network_globals.custody_group_count(Slot::max_value()); - let Ok(clock_epoch) = self.beacon_chain.epoch() else { + let Ok(clock_slot) = self.beacon_chain.slot() else { + // If the clock is faulty just ignore CGC updates return; }; + + // Runs every slot, but we only compute the next cgc halfway through the epoch. We want to + // make sure that we don't reach `clock_slot` before updating the CGCUpdates map. Half epoch + // is plenty of time. 
+ if clock_slot % T::EthSpec::slots_per_epoch() != T::EthSpec::slots_per_epoch() / 2 { + return; + } + + let next_cgc = self.compute_current_head_cgc(); + let prev_cgc = self.network_globals.custody_group_count(Slot::max_value()); + + // TODO(das): For now do not support a decrease in CGC. If we allow to decrease and increase + // we can have a scaled plot over time of `CGC(time)` where `min(CGC(time))` is not the + // oldest value. We would need to adjust some logic below to consider the minimum value + // through a range of slots, instead of the value at the oldest slot. + if next_cgc > prev_cgc { + self.on_cgc_increase(prev_cgc, next_cgc, clock_slot); + } + + // Prune updates that are older than the last slot with data columns + self.prune_old_cgc_updates(); + self.maybe_restart_backfill_sync_for_cgc_backfill(); + self.maybe_update_cgc_to_announce(); + + // Set next_cgc metric regardless if next value to track initial default value + let cgc_updates_len = self.network_globals.cgc_updates_len(); + metrics::set_gauge(&metrics::CGC_UPDATES_LENGTH, cgc_updates_len as i64); + metrics::set_gauge(&metrics::CGC_INTERNAL, next_cgc as i64); + } + + /// Computes the custody group count (CGC) based on the current set of connected local indices + /// and the head state balances. + fn compute_current_head_cgc(&self) -> u64 { let cached_head = self.beacon_chain.canonical_head.cached_head(); self.beacon_chain @@ -888,90 +913,138 @@ impl NetworkService { }) .sum::(); + metrics::set_gauge(&metrics::LOCAL_INDICES_COUNT, local_indices_count as i64); + metrics::set_gauge( + &metrics::LOCAL_INDICES_ETH_BALANCE, + known_validators_balance as i64 / 1_000_000_000, + ); + // TODO(das): Should persist the local indices here to dump to DB once in a while in case of // improper shutdown? Not sure if we do the same for other persisted data. It sounds // sensible but at the same time it will waste I/O. 
- let next_cgc = self - .beacon_chain + self.beacon_chain .spec - .custody_group_by_balance(known_validators_balance); + .custody_group_by_balance(known_validators_balance) + } - // TODO(das): For now do not support a decrease in CGC. If we allow to decrease and increase - // we can have a scaled plot over time of `CGC(time)` where `min(CGC(time))` is not the - // oldest value. We would need to adjust some logic below to consider the minimum value - // through a range of slots, instead of the value at the oldest slot. - if next_cgc > prev_cgc { - // TODO(das): Should we consider the case where the clock is almost at the end of the epoch? - // If I/O is slow we may update the in-memory map for an epoch that's already - // progressing. - let next_epoch = clock_epoch + 1; - info!(prev_cgc, next_cgc, %next_epoch, "Updating internal custody count"); - metrics::inc_counter(&metrics::CGC_UPDATES); + /// Handler for an increase of the internal CGC value. + fn on_cgc_increase(&mut self, prev_cgc: u64, next_cgc: u64, clock_slot: Slot) { + let clock_epoch = clock_slot.epoch(T::EthSpec::slots_per_epoch()); + let next_epoch = clock_epoch + Epoch::new(1); + let next_epoch_start_slot = next_epoch.start_slot(T::EthSpec::slots_per_epoch()); - // Add a new entry to the network globals - if let Err(e) = self.network_globals.add_cgc_update( - next_epoch.start_slot(T::EthSpec::slots_per_epoch()), - next_cgc, - ) { - error!("List of CGC updates full: {e:?}"); - return; - } - let cgc_updates = self.network_globals.dump_cgc_updates(); + metrics::inc_counter(&metrics::CGC_UPDATE_EVENTS); + info!(prev_cgc, next_cgc, slot = %next_epoch_start_slot, "Updating internal custody count"); - // Persist the entry to the store - if let Err(e) = self.beacon_chain.store.put_cgc_updates(cgc_updates) { - // Do not update the memory value unless it's persisted to disk - error!(error = ?e, "Unable to persist CGC updates to disk"); - } - - // Update the gossip subscriptions - let next_sampling_subnets = 
self - .network_globals - .sampling_subnets_for_cgc(next_cgc) - .iter() - .copied() - .collect::>(); - let prev_sampling_subnets = self - .network_globals - .sampling_subnets_for_cgc(prev_cgc) - .iter() - .copied() - .collect::>(); - if next_cgc > prev_cgc { - for subnet in next_sampling_subnets.difference(&prev_sampling_subnets) { - self.subscribe(Subnet::DataColumn(*subnet).into()); - } - } - if next_cgc < prev_cgc { - for subnet in prev_sampling_subnets.difference(&next_sampling_subnets) { - self.unsubscribe(Subnet::DataColumn(*subnet).into()); - } - } + // Add a new entry to the network globals + if let Err(e) = self + .network_globals + .add_cgc_update(next_epoch_start_slot, next_cgc) + { + error!("List of CGC updates full: {e:?}"); + return; + } else { + debug!(slot = %next_epoch_start_slot, cgc=next_cgc,"Added new CGC update entry"); } - // TODO(das): check the backfilled CGC and potentially update the network globals state - // IDEA: + // Persist the entry to the store + let cgc_updates = self.network_globals.dump_cgc_updates(); + if let Err(e) = self.beacon_chain.store.put_cgc_updates(cgc_updates) { + // Do not update the memory value unless it's persisted to disk + error!(error = ?e, "Unable to persist CGC updates to disk"); + } + + // Update the gossip subscriptions + // + // Note: Self::subscribe won't subscribe until we are close enough to the Fulu fork. So we + // can call this function eagerly every epoch. Topics for a higher CGC may be subscribed + // either here or in the topic fork transition function. Both will result in the same topic + // set, so we don't care about race conditions here. 
+ let sampling_subnets_for = |cgc| { + self.network_globals + .sampling_subnets_for_cgc(cgc) + .iter() + .copied() + .collect::>() + }; + let next_sampling_subnets = sampling_subnets_for(next_cgc); + let prev_sampling_subnets = sampling_subnets_for(prev_cgc); + + if next_cgc > prev_cgc { + for subnet in next_sampling_subnets.difference(&prev_sampling_subnets) { + self.subscribe(Subnet::DataColumn(*subnet).into()); + } + } + if next_cgc < prev_cgc { + for subnet in prev_sampling_subnets.difference(&next_sampling_subnets) { + self.unsubscribe(Subnet::DataColumn(*subnet).into()); + } + } + } + + /// As per spec when we announced a CGC we MUST be able to serve columns for that CGC for all + /// slots with data columns within the DA window. Given a variable CGC over time the value we + /// can announce is the `min(CGC(slot))` over that time period. + fn maybe_update_cgc_to_announce(&mut self) { + let Some(oldest_slot_with_data_columns) = self + .beacon_chain + .data_availability_checker + .oldest_epoch_with_data_columns() + .map(|epoch| epoch.start_slot(T::EthSpec::slots_per_epoch())) + else { + return; // None before Fulu and on clock error + }; + + let cgc_to_announce = self + .network_globals + .min_custody_group_count_at_range(oldest_slot_with_data_columns..Slot::max_value()); + + let updated_enr = match self.libp2p.update_enr_cgc(cgc_to_announce) { + Ok(updated) => updated, + Err(e) => { + crit!(error = ?e, "Error updating local ENR custody group count"); + false + } + }; + if updated_enr { + info!(cgc = cgc_to_announce, "Updated ENR custody group count"); + } + + metrics::set_gauge(&metrics::CGC_ANNOUNCED, cgc_to_announce as i64); + } + + /// Maybe restart block backfill to re-download batches with a higher CGC value. This strategy + /// prevents having to develop a new backfill sync algorithm just for custody columns. 
+ fn maybe_restart_backfill_sync_for_cgc_backfill(&mut self) { // When we forward sync and finalize a new block, we may restart backfill again from a later // block (the new finalized block). We will reset oldest_block to that block and fail // backfill sync to start over from it. Then make backfill sync use a higher CGC (say 128) // and when oldest_block is less than the oldest step with a value < 128 we can delete that // step such that `custody_group_count(clock - da_window)` returns 128. // + // Before restarting backfill a typical WS start results in the following plot. In the plot + // below the X axis is time and the Y axis is CGC value. CGCUpdates struct has two updates + // or steps: one at (Slot(0), 8) and a later (Slot(X), 128). Then backfill will download + // columns assuming a CGC = 8. + // // CGC 128 // : ___________:____________ // : | : // CGC 8 : | : - // ............:_____| : + // ____________:_____| : // : WS start : Finalized // <------ block block // backfill // with CGC 8 // // + // This function will remove from CGCUpdates all updates older than Slot(X), such that we + // get this `CGC(slot)` plot. Now backfill will fetch batches for CGC = 128. However, we + // need to re-download all batches since Slot(X). // // CGC 128 - // .............................. 
___________:____________ + // ______________________________:____________ // : // : // : @@ -980,68 +1053,104 @@ impl NetworkService { // backfill // with CGC 128 // - let oldest_block_slot = self.beacon_chain.store.get_anchor_info().oldest_block_slot; - // TODO(das): use min_epochs_for_data_columns - let last_pruned_epoch = clock_epoch.saturating_sub(Epoch::new( - self.beacon_chain.spec.min_epochs_for_blob_sidecars_requests, - )); - let last_pruned_slot = last_pruned_epoch.start_slot(T::EthSpec::slots_per_epoch()); - let fulu_fork_slot = fulu_fork_epoch.start_slot(T::EthSpec::slots_per_epoch()); - let oldest_relevant_slot = std::cmp::max( - oldest_block_slot, - std::cmp::max(last_pruned_slot, fulu_fork_slot), - ); - let finalized_slot = self.beacon_chain.finalized_slot(); + let Some(oldest_slot_with_data_columns) = self + .beacon_chain + .data_availability_checker + .oldest_epoch_with_data_columns() + .map(|epoch| epoch.start_slot(T::EthSpec::slots_per_epoch())) + else { + return; // None before Fulu and on clock error + }; - let cgc_at_oldest_relevant_slot = self + let min_cgc_data_columns_backfill_range = self .network_globals - .custody_group_count(oldest_relevant_slot); + .min_custody_group_count_at_range(oldest_slot_with_data_columns..finalized_slot); let cgc_at_finalized_slot = self.network_globals.custody_group_count(finalized_slot); + let oldest_block_slot = self.beacon_chain.store.get_anchor_info().oldest_block_slot; let backfill_started_recently = finalized_slot.saturating_sub(oldest_block_slot) < MAX_SLOT_DISTANCE_BACKFILL_RESTART; let backfill_finished = oldest_block_slot == Slot::new(0); - // TODO(das): If we support a decreasing CGC we must consider the min value between this two - // slots. + // Backfill sync may currently be active and downloading batches of columns. `oldest_block_slot` + // only represents the current latest imported batch of backfill blocks, but more could be + // downloaded and pending. 
So for completeness we ask: + // + // Q: What are the complete set of CGC values that backfill sync can possibly have ever + // downloaded columns from? + // A: The set of CGC values between `oldest_slot_with_data_columns..finalized_slot`. + // `oldest_slot_with_data_columns` is a dynamic but strictly increasing values. We will + // never download relevant columns less than this value. `finalized_slot` is a dynamic + // strictly increasing value, so we will reset backfill sync to that value. // // Skip if backfill has finished. State reconstruction may have already started and we could // mess with the DB. For real networks Fulu fork is way ahead of genesis so it won't affect - if cgc_at_oldest_relevant_slot < cgc_at_finalized_slot + let restart_backfill_sync = min_cgc_data_columns_backfill_range < cgc_at_finalized_slot && backfill_started_recently - && !backfill_finished - { + && !backfill_finished; + if restart_backfill_sync { // We need backfill sync to fetch batches with `CGC_f = cgc_at_finalized_slot`. Then // `custody_group_count(oldest_block_slot) should now return `CGC_f`. So we have to // delete the CGC updates with `update.slot < finalized_slot` + // + // TODO(das): Don't start from finalized_slot, but instead the closest step before + // `finalized_slot`. 
self.network_globals .prune_cgc_updates_older_than(finalized_slot); self.send_to_router(RouterMessage::BackfillSyncRestart(finalized_slot)); + + info!(slot = %finalized_slot, "Restarting backfill sync to fetch custody columns"); + metrics::inc_counter(&metrics::BACKFILL_RESTARTED_FOR_CGC); } - // Schedule an advertise CGC update for later - let cgc_to_announce = cgc_at_oldest_relevant_slot; + // `oldest_block_slot` already tracked with metric `store_beacon_oldest_block_slot` + // `finalized_slot` already tracked with metrics `beacon_head_state_finalized_epoch * 32` - // update_enr_cgc updates the NetworkGlobals ENR - match self.libp2p.update_enr_cgc(cgc_to_announce) { - Ok(updated) => { - if updated { - info!(cgc = cgc_to_announce, "Updated ENR custody group count"); - } - } - Err(e) => { - crit!(error = ?e, "Error updating local ENR custody group count"); - } - } - - metrics::set_gauge(&metrics::CGC_INTERNAL, next_cgc as i64); - metrics::set_gauge(&metrics::CGC_ANNOUNCED, cgc_to_announce as i64); - metrics::set_gauge(&metrics::LOCAL_INDICES_COUNT, local_indices_count as i64); metrics::set_gauge( - &metrics::LOCAL_INDICES_ETH_BALANCE, - known_validators_balance as i64 / 1_000_000_000, + &metrics::CGC_MIN_BACKFILL_RANGE, + min_cgc_data_columns_backfill_range as i64, ); + metrics::set_gauge(&metrics::CGC_FINALIZED_SLOT, cgc_at_finalized_slot as i64); + + debug!( + %oldest_block_slot, + %finalized_slot, + %oldest_slot_with_data_columns, + min_cgc_data_columns_backfill_range, + cgc_at_finalized_slot, + backfill_started_recently, + backfill_finished, + restart_backfill_sync, + "Maybe restarting backfill sync for CGC backfill" + ); + } + + /// Prunes CGC updates that are older than the oldest slot with non-pruned data columns + fn prune_old_cgc_updates(&self) { + let Some(oldest_slot_with_data_columns) = self + .beacon_chain + .data_availability_checker + .oldest_epoch_with_data_columns() + .map(|epoch| epoch.start_slot(T::EthSpec::slots_per_epoch())) + else { + return; 
// None before Fulu and on clock error + }; + + let len_before = self.network_globals.cgc_updates_len(); + self.network_globals + .prune_cgc_updates_older_than(oldest_slot_with_data_columns); + let len_after = self.network_globals.cgc_updates_len(); + if len_after < len_before { + debug!(len_after, len_before, %oldest_slot_with_data_columns, "Pruned old CGC updates"); + + // Persist the entry to the store + let cgc_updates = self.network_globals.dump_cgc_updates(); + if let Err(e) = self.beacon_chain.store.put_cgc_updates(cgc_updates) { + // Do not update the memory value unless it's persisted to disk + error!(error = ?e, "Unable to persist CGC updates to disk"); + } + } } fn on_subnet_service_msg(&mut self, msg: SubnetServiceMessage) { @@ -1062,14 +1171,14 @@ impl NetworkService { } fn subscribe(&mut self, kind: GossipKind) { - for fork_digest in self.required_gossip_fork_digests() { + for fork_digest in self.required_gossip_forks_to_subscribe(&kind) { let topic = GossipTopic::new(kind.clone(), GossipEncoding::default(), fork_digest); self.libp2p.subscribe(topic); } } fn unsubscribe(&mut self, kind: GossipKind) { - for fork_digest in self.required_gossip_fork_digests() { + for fork_digest in self.required_gossip_forks_to_subscribe(&kind) { let topic = GossipTopic::new(kind.clone(), GossipEncoding::default(), fork_digest); self.libp2p.unsubscribe(topic); } diff --git a/consensus/types/src/custody.rs b/consensus/types/src/custody.rs index 6614a65b3b..96cc68752a 100644 --- a/consensus/types/src/custody.rs +++ b/consensus/types/src/custody.rs @@ -14,10 +14,14 @@ pub struct CGCUpdates { updates: VariableList, } +#[allow(clippy::len_without_is_empty)] impl CGCUpdates { - pub fn new(initial_update: CGCUpdate) -> Self { + pub fn new(initial_cgc: u64) -> Self { + // The slot of the initial update doesn't matter. It's only relevant when pushing the next + // update if it has the same Slot. 
Otherwise, the result function `cgc(slot)` is independent + // of the value of initial_update.slot Self { - updates: VariableList::new(vec![initial_update]).expect("1 < 131072"), + updates: VariableList::new(vec![(Slot::new(0), initial_cgc)]).expect("1 < 131072"), } } @@ -51,8 +55,8 @@ impl CGCUpdates { } /// Returns the ordered list of CGC values in the range of slots `range`. If the range is empty, - /// i.e. `slot..slot` returns the CGC value at `slot`. The return vector will never be empty. - pub fn at_slot_range(&self, range: Range) -> Vec { + /// i.e. `3..1` returns the CGC value at `range.start`. The return vector will never be empty. + fn at_slot_range(&self, range: Range) -> Vec { let first_update_index = self.update_index_at_slot(range.start); let cgcs = self @@ -75,6 +79,16 @@ impl CGCUpdates { } } + /// Returns the minimum CGC value in the range of slots `range`. If the range is empty, + /// i.e. `slot..slot` returns the CGC value at `slot`. + pub fn min_at_slot_range(&self, range: Range) -> u64 { + *self + .at_slot_range(range) + .iter() + .min() + .expect("at_slot_range never returns empty Vec") + } + pub fn add_latest_update(&mut self, update: CGCUpdate) -> Result<(), String> { if let Some(last_update) = self.updates.last_mut() { match last_update.0.cmp(&update.0) { @@ -119,6 +133,10 @@ impl CGCUpdates { pub fn iter(&self) -> impl Iterator + '_ { self.updates.iter().copied() } + + pub fn len(&self) -> usize { + self.updates.len() + } } #[cfg(test)] @@ -126,8 +144,8 @@ mod tests { use super::*; fn new(updates: &[(u64, u64)]) -> CGCUpdates { - let first_update = *updates.get(0).expect("should have at least one update"); - let mut u = CGCUpdates::new(to(first_update)); + let first_update = *updates.first().expect("should have at least one update"); + let mut u = CGCUpdates::new(first_update.1); for update in updates.iter().skip(1) { u.add_latest_update(to(*update)).unwrap(); }