mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-08 17:26:04 +00:00
Modularize CGC logic in network service
This commit is contained in:
@@ -19,7 +19,7 @@ use tracing::{debug, error, info_span, Instrument};
|
|||||||
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
|
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
|
||||||
use types::{
|
use types::{
|
||||||
BlobSidecarList, ChainSpec, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarList,
|
BlobSidecarList, ChainSpec, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarList,
|
||||||
Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock,
|
Epoch, EthSpec, ForkName, Hash256, RuntimeVariableList, SignedBeaconBlock,
|
||||||
};
|
};
|
||||||
|
|
||||||
mod error;
|
mod error;
|
||||||
@@ -495,10 +495,23 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
|||||||
fork_epoch,
|
fork_epoch,
|
||||||
current_slot
|
current_slot
|
||||||
.epoch(T::EthSpec::slots_per_epoch())
|
.epoch(T::EthSpec::slots_per_epoch())
|
||||||
|
// TODO(das): use min_epochs_for_data_columns
|
||||||
.saturating_sub(self.spec.min_epochs_for_blob_sidecars_requests),
|
.saturating_sub(self.spec.min_epochs_for_blob_sidecars_requests),
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn oldest_epoch_with_data_columns(&self) -> Option<Epoch> {
|
||||||
|
let fulu_fork_epoch = self.spec.fork_epoch(ForkName::Fulu)?;
|
||||||
|
if fulu_fork_epoch == Epoch::max_value() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let current_epoch = self.slot_clock.now()?.epoch(T::EthSpec::slots_per_epoch());
|
||||||
|
Some(std::cmp::max(
|
||||||
|
fulu_fork_epoch,
|
||||||
|
current_epoch.saturating_sub(self.spec.min_epochs_for_blob_sidecars_requests),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns true if the given epoch lies within the da boundary and false otherwise.
|
/// Returns true if the given epoch lies within the da boundary and false otherwise.
|
||||||
pub fn da_check_required_for_epoch(&self, block_epoch: Epoch) -> bool {
|
pub fn da_check_required_for_epoch(&self, block_epoch: Epoch) -> bool {
|
||||||
self.data_availability_boundary()
|
self.data_availability_boundary()
|
||||||
|
|||||||
@@ -931,8 +931,10 @@ impl<E: EthSpec> Network<E> {
|
|||||||
warn!(%topic, error = ?e, "Failed to subscribe to topic");
|
warn!(%topic, error = ?e, "Failed to subscribe to topic");
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
Ok(_) => {
|
Ok(new_subscription) => {
|
||||||
debug!(%topic, "Subscribed to topic");
|
if new_subscription {
|
||||||
|
debug!(%topic, "Subscribed to topic");
|
||||||
|
}
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ use crate::{Client, Enr, EnrExt, GossipTopic, Multiaddr, NetworkConfig, PeerId};
|
|||||||
use local_metadata::LocalMetadata;
|
use local_metadata::LocalMetadata;
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
use std::ops::Range;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use types::data_column_custody_group::{
|
use types::data_column_custody_group::{
|
||||||
compute_columns_from_custody_groups, compute_subnets_from_custody_groups, get_custody_groups,
|
compute_columns_from_custody_groups, compute_subnets_from_custody_groups, get_custody_groups,
|
||||||
@@ -141,6 +142,12 @@ impl<E: EthSpec> NetworkGlobals<E> {
|
|||||||
self.cgc_updates.read().at_slot(slot)
|
self.cgc_updates.read().at_slot(slot)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the minimum CGC value in the range of slots `slot_range`. If the range is empty,
|
||||||
|
/// e.g. `3..1`, returns the CGC value at `slot_range.start`.
|
||||||
|
pub fn min_custody_group_count_at_range(&self, slot_range: Range<Slot>) -> u64 {
|
||||||
|
self.cgc_updates.read().min_at_slot_range(slot_range)
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the count of custody columns this node must sample for block import
|
/// Returns the count of custody columns this node must sample for block import
|
||||||
pub fn custody_columns_count(&self, slot: Slot) -> u64 {
|
pub fn custody_columns_count(&self, slot: Slot) -> u64 {
|
||||||
// This only panics if the chain spec contains invalid values
|
// This only panics if the chain spec contains invalid values
|
||||||
@@ -164,6 +171,10 @@ impl<E: EthSpec> NetworkGlobals<E> {
|
|||||||
self.cgc_updates.read().clone()
|
self.cgc_updates.read().clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn cgc_updates_len(&self) -> usize {
|
||||||
|
self.cgc_updates.read().len()
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the number of libp2p connected peers.
|
/// Returns the number of libp2p connected peers.
|
||||||
pub fn connected_peers(&self) -> usize {
|
pub fn connected_peers(&self) -> usize {
|
||||||
self.peers.read().connected_peer_ids().count()
|
self.peers.read().connected_peer_ids().count()
|
||||||
|
|||||||
@@ -195,6 +195,27 @@ impl std::fmt::Display for GossipKind {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl GossipKind {
|
||||||
|
/// Returns the ForkName after which this GossipKind can form a valid topic
|
||||||
|
pub fn fork_activation(&self) -> ForkName {
|
||||||
|
match self {
|
||||||
|
Self::BeaconBlock => ForkName::Base,
|
||||||
|
Self::BeaconAggregateAndProof => ForkName::Base,
|
||||||
|
Self::BlobSidecar(_) => ForkName::Deneb,
|
||||||
|
Self::DataColumnSidecar(_) => ForkName::Fulu,
|
||||||
|
Self::Attestation(_) => ForkName::Base,
|
||||||
|
Self::VoluntaryExit => ForkName::Base,
|
||||||
|
Self::ProposerSlashing => ForkName::Base,
|
||||||
|
Self::AttesterSlashing => ForkName::Base,
|
||||||
|
Self::SignedContributionAndProof => ForkName::Altair,
|
||||||
|
Self::SyncCommitteeMessage(_) => ForkName::Altair,
|
||||||
|
Self::BlsToExecutionChange => ForkName::Capella,
|
||||||
|
Self::LightClientFinalityUpdate => ForkName::Altair,
|
||||||
|
Self::LightClientOptimisticUpdate => ForkName::Altair,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// The known encoding types for gossipsub messages.
|
/// The known encoding types for gossipsub messages.
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]
|
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]
|
||||||
pub enum GossipEncoding {
|
pub enum GossipEncoding {
|
||||||
|
|||||||
@@ -655,10 +655,34 @@ pub static CGC_ANNOUNCED: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
|
|||||||
"Current announced Custody Group Count CGC",
|
"Current announced Custody Group Count CGC",
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
pub static CGC_UPDATES: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
|
pub static CGC_MIN_BACKFILL_RANGE: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
|
||||||
|
try_create_int_gauge(
|
||||||
|
"beacon_custody_cgc_min_backfill_range",
|
||||||
|
"Current min CGC value in backfill range",
|
||||||
|
)
|
||||||
|
});
|
||||||
|
pub static CGC_FINALIZED_SLOT: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
|
||||||
|
try_create_int_gauge(
|
||||||
|
"beacon_custody_cgc_finalized_slot",
|
||||||
|
"Current CGC value at the finalized slot",
|
||||||
|
)
|
||||||
|
});
|
||||||
|
pub static CGC_UPDATES_LENGTH: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
|
||||||
|
try_create_int_gauge(
|
||||||
|
"beacon_custody_cgc_updates_length",
|
||||||
|
"Current length of stored Custody Group Count CGC updates",
|
||||||
|
)
|
||||||
|
});
|
||||||
|
pub static CGC_UPDATE_EVENTS: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
|
||||||
try_create_int_counter(
|
try_create_int_counter(
|
||||||
"beacon_custody_cgc_updates",
|
"beacon_custody_cgc_update_events",
|
||||||
"Total count of Custody Group Count CGC updates",
|
"Total count of the Custody Group Count CGC is updated",
|
||||||
|
)
|
||||||
|
});
|
||||||
|
pub static BACKFILL_RESTARTED_FOR_CGC: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
|
||||||
|
try_create_int_counter(
|
||||||
|
"beacon_custody_backfill_restarted_for_cgc",
|
||||||
|
"Total count of times backfill has restarted to backfill a higher CGC",
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ use tokio::sync::mpsc;
|
|||||||
use tokio::time::Sleep;
|
use tokio::time::Sleep;
|
||||||
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
|
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
|
||||||
use types::{
|
use types::{
|
||||||
ChainSpec, Epoch, EthSpec, ForkContext, ForkName, Slot, SubnetId, SyncCommitteeSubscription,
|
ChainSpec, Epoch, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription,
|
||||||
SyncSubnetId, Unsigned, ValidatorSubscription,
|
SyncSubnetId, Unsigned, ValidatorSubscription,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -388,8 +388,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
|||||||
validator_subscription_recv,
|
validator_subscription_recv,
|
||||||
} = network_receivers;
|
} = network_receivers;
|
||||||
|
|
||||||
let epoch_in_seconds = spec.seconds_per_slot * T::EthSpec::slots_per_epoch();
|
|
||||||
|
|
||||||
// create the network service and spawn the task
|
// create the network service and spawn the task
|
||||||
let network_service = NetworkService {
|
let network_service = NetworkService {
|
||||||
beacon_chain,
|
beacon_chain,
|
||||||
@@ -407,7 +405,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
|||||||
metrics_enabled: config.metrics_enabled,
|
metrics_enabled: config.metrics_enabled,
|
||||||
metrics_update,
|
metrics_update,
|
||||||
gossipsub_parameter_update,
|
gossipsub_parameter_update,
|
||||||
cgc_update_interval: tokio::time::interval(Duration::from_secs(epoch_in_seconds)),
|
cgc_update_interval: tokio::time::interval(Duration::from_secs(spec.seconds_per_slot)),
|
||||||
fork_context,
|
fork_context,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -442,37 +440,33 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
|||||||
///
|
///
|
||||||
/// For `current_slot < fork_slot`, this function returns both the pre-fork and post-fork
|
/// For `current_slot < fork_slot`, this function returns both the pre-fork and post-fork
|
||||||
/// digests since we should be subscribed to post fork topics before the fork.
|
/// digests since we should be subscribed to post fork topics before the fork.
|
||||||
pub fn required_gossip_fork_digests(&self) -> Vec<[u8; 4]> {
|
pub fn required_gossip_forks_to_subscribe(&self, topic: &GossipKind) -> Vec<[u8; 4]> {
|
||||||
let fork_context = &self.fork_context;
|
let fork_context = &self.fork_context;
|
||||||
let spec = &self.beacon_chain.spec;
|
let spec = &self.beacon_chain.spec;
|
||||||
let current_slot = self.beacon_chain.slot().unwrap_or(spec.genesis_slot);
|
let current_slot = self.beacon_chain.slot().unwrap_or(spec.genesis_slot);
|
||||||
let current_fork = fork_context.current_fork();
|
let current_fork = fork_context.current_fork();
|
||||||
|
|
||||||
let mut result = vec![fork_context
|
let mut required_fork_to_subscribe = vec![current_fork];
|
||||||
.to_context_bytes(current_fork)
|
|
||||||
.unwrap_or_else(|| {
|
|
||||||
panic!(
|
|
||||||
"{} fork bytes should exist as it's initialized in ForkContext",
|
|
||||||
current_fork
|
|
||||||
)
|
|
||||||
})];
|
|
||||||
|
|
||||||
if let Some((next_fork, fork_epoch)) = spec.next_fork_epoch::<T::EthSpec>(current_slot) {
|
if let Some((next_fork, fork_epoch)) = spec.next_fork_epoch::<T::EthSpec>(current_slot) {
|
||||||
if current_slot.saturating_add(Slot::new(SUBSCRIBE_DELAY_SLOTS))
|
if current_slot.saturating_add(Slot::new(SUBSCRIBE_DELAY_SLOTS))
|
||||||
>= fork_epoch.start_slot(T::EthSpec::slots_per_epoch())
|
>= fork_epoch.start_slot(T::EthSpec::slots_per_epoch())
|
||||||
{
|
{
|
||||||
let next_fork_context_bytes =
|
required_fork_to_subscribe.push(next_fork);
|
||||||
fork_context.to_context_bytes(next_fork).unwrap_or_else(|| {
|
|
||||||
panic!(
|
|
||||||
"context bytes should exist as spec.next_fork_epoch({}) returned Some({})",
|
|
||||||
current_slot, next_fork
|
|
||||||
)
|
|
||||||
});
|
|
||||||
result.push(next_fork_context_bytes);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
result
|
required_fork_to_subscribe
|
||||||
|
.into_iter()
|
||||||
|
// This filter prevents subscribing to Electra data_columns topic if we call
|
||||||
|
// `Self::subscribe` close enough to the Fulu fork.
|
||||||
|
.filter(|fork| topic.fork_activation() >= *fork)
|
||||||
|
.map(|fork| {
|
||||||
|
fork_context
|
||||||
|
.to_context_bytes(fork)
|
||||||
|
.unwrap_or_else(|| panic!("context bytes should exist for {fork}"))
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_to_router(&mut self, msg: RouterMessage<T::EthSpec>) {
|
fn send_to_router(&mut self, msg: RouterMessage<T::EthSpec>) {
|
||||||
@@ -770,7 +764,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
|||||||
&self.network_globals.as_topic_config(self.clock_slot()),
|
&self.network_globals.as_topic_config(self.clock_slot()),
|
||||||
&self.fork_context.spec,
|
&self.fork_context.spec,
|
||||||
) {
|
) {
|
||||||
for fork_digest in self.required_gossip_fork_digests() {
|
for fork_digest in self.required_gossip_forks_to_subscribe(&topic_kind) {
|
||||||
let topic = GossipTopic::new(
|
let topic = GossipTopic::new(
|
||||||
topic_kind.clone(),
|
topic_kind.clone(),
|
||||||
GossipEncoding::default(),
|
GossipEncoding::default(),
|
||||||
@@ -852,16 +846,47 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
|||||||
fn on_cgc_update_interval(&mut self) {
|
fn on_cgc_update_interval(&mut self) {
|
||||||
// Skip running this function if Fulu is not scheduled. But run it before the fork to start
|
// Skip running this function if Fulu is not scheduled. But run it before the fork to start
|
||||||
// announcing the CGC ahead of the fork.
|
// announcing the CGC ahead of the fork.
|
||||||
let fulu_fork_epoch = match self.beacon_chain.spec.fork_epoch(ForkName::Fulu) {
|
if self.beacon_chain.spec.is_peer_das_scheduled() {
|
||||||
None => return,
|
return;
|
||||||
Some(epoch) if epoch == Epoch::max_value() => return,
|
}
|
||||||
Some(epoch) => epoch,
|
|
||||||
};
|
|
||||||
|
|
||||||
let prev_cgc = self.network_globals.custody_group_count(Slot::max_value());
|
let Ok(clock_slot) = self.beacon_chain.slot() else {
|
||||||
let Ok(clock_epoch) = self.beacon_chain.epoch() else {
|
// If the clock is faulty, just ignore CGC updates
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Runs every slot, but we only compute the next cgc halfway through the epoch. We want to
|
||||||
|
// make sure that we don't reach `clock_slot` before updating the CGCUpdates map. Half epoch
|
||||||
|
// is plenty of time.
|
||||||
|
if clock_slot % T::EthSpec::slots_per_epoch() == T::EthSpec::slots_per_epoch() / 2 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let next_cgc = self.compute_current_head_cgc();
|
||||||
|
let prev_cgc = self.network_globals.custody_group_count(Slot::max_value());
|
||||||
|
|
||||||
|
// TODO(das): For now do not support a decrease in CGC. If we allow to decrease and increase
|
||||||
|
// we can have a scaled plot over time of `CGC(time)` where `min(CGC(time))` is not the
|
||||||
|
// oldest value. We would need to adjust some logic below to consider the minimum value
|
||||||
|
// through a range of slots, instead of the value at the oldest slot.
|
||||||
|
if next_cgc > prev_cgc {
|
||||||
|
self.on_cgc_increase(prev_cgc, next_cgc, clock_slot);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prune updates that are older than the last slot with data columns
|
||||||
|
self.prune_old_cgc_updates();
|
||||||
|
self.maybe_restart_backfill_sync_for_cgc_backfill();
|
||||||
|
self.maybe_update_cgc_to_announce();
|
||||||
|
|
||||||
|
// Set the next_cgc metric even if the value did not change, to track the initial default value
|
||||||
|
let cgc_updates_len = self.network_globals.cgc_updates_len();
|
||||||
|
metrics::set_gauge(&metrics::CGC_UPDATES_LENGTH, cgc_updates_len as i64);
|
||||||
|
metrics::set_gauge(&metrics::CGC_INTERNAL, next_cgc as i64);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Computes the custody group count (CGC) based on the current set of connected local indices
|
||||||
|
/// and the head state balances.
|
||||||
|
fn compute_current_head_cgc(&self) -> u64 {
|
||||||
let cached_head = self.beacon_chain.canonical_head.cached_head();
|
let cached_head = self.beacon_chain.canonical_head.cached_head();
|
||||||
|
|
||||||
self.beacon_chain
|
self.beacon_chain
|
||||||
@@ -888,90 +913,138 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
|||||||
})
|
})
|
||||||
.sum::<u64>();
|
.sum::<u64>();
|
||||||
|
|
||||||
|
metrics::set_gauge(&metrics::LOCAL_INDICES_COUNT, local_indices_count as i64);
|
||||||
|
metrics::set_gauge(
|
||||||
|
&metrics::LOCAL_INDICES_ETH_BALANCE,
|
||||||
|
known_validators_balance as i64 / 1_000_000_000,
|
||||||
|
);
|
||||||
|
|
||||||
// TODO(das): Should persist the local indices here to dump to DB once in a while in case of
|
// TODO(das): Should persist the local indices here to dump to DB once in a while in case of
|
||||||
// improper shutdown? Not sure if we do the same for other persisted data. It sounds
|
// improper shutdown? Not sure if we do the same for other persisted data. It sounds
|
||||||
// sensible but at the same time it will waste I/O.
|
// sensible but at the same time it will waste I/O.
|
||||||
|
|
||||||
let next_cgc = self
|
self.beacon_chain
|
||||||
.beacon_chain
|
|
||||||
.spec
|
.spec
|
||||||
.custody_group_by_balance(known_validators_balance);
|
.custody_group_by_balance(known_validators_balance)
|
||||||
|
}
|
||||||
|
|
||||||
// TODO(das): For now do not support a decrease in CGC. If we allow to decrease and increase
|
/// Handler for an increase of the internal CGC value.
|
||||||
// we can have a scaled plot over time of `CGC(time)` where `min(CGC(time))` is not the
|
fn on_cgc_increase(&mut self, prev_cgc: u64, next_cgc: u64, clock_slot: Slot) {
|
||||||
// oldest value. We would need to adjust some logic below to consider the minimum value
|
let clock_epoch = clock_slot.epoch(T::EthSpec::slots_per_epoch());
|
||||||
// through a range of slots, instead of the value at the oldest slot.
|
let next_epoch = clock_epoch + Epoch::new(1);
|
||||||
if next_cgc > prev_cgc {
|
let next_epoch_start_slot = next_epoch.start_slot(T::EthSpec::slots_per_epoch());
|
||||||
// TODO(das): Should we consider the case where the clock is almost at the end of the epoch?
|
|
||||||
// If I/O is slow we may update the in-memory map for an epoch that's already
|
|
||||||
// progressing.
|
|
||||||
let next_epoch = clock_epoch + 1;
|
|
||||||
info!(prev_cgc, next_cgc, %next_epoch, "Updating internal custody count");
|
|
||||||
metrics::inc_counter(&metrics::CGC_UPDATES);
|
|
||||||
|
|
||||||
// Add a new entry to the network globals
|
metrics::inc_counter(&metrics::CGC_UPDATE_EVENTS);
|
||||||
if let Err(e) = self.network_globals.add_cgc_update(
|
info!(prev_cgc, next_cgc, slot = %next_epoch_start_slot, "Updating internal custody count");
|
||||||
next_epoch.start_slot(T::EthSpec::slots_per_epoch()),
|
|
||||||
next_cgc,
|
|
||||||
) {
|
|
||||||
error!("List of CGC updates full: {e:?}");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
let cgc_updates = self.network_globals.dump_cgc_updates();
|
|
||||||
|
|
||||||
// Persist the entry to the store
|
// Add a new entry to the network globals
|
||||||
if let Err(e) = self.beacon_chain.store.put_cgc_updates(cgc_updates) {
|
if let Err(e) = self
|
||||||
// Do not update the memory value unless it's persisted to disk
|
.network_globals
|
||||||
error!(error = ?e, "Unable to persist CGC updates to disk");
|
.add_cgc_update(next_epoch_start_slot, next_cgc)
|
||||||
}
|
{
|
||||||
|
error!("List of CGC updates full: {e:?}");
|
||||||
// Update the gossip subscriptions
|
return;
|
||||||
let next_sampling_subnets = self
|
} else {
|
||||||
.network_globals
|
debug!(slot = %next_epoch_start_slot, cgc=next_cgc,"Added new CGC update entry");
|
||||||
.sampling_subnets_for_cgc(next_cgc)
|
|
||||||
.iter()
|
|
||||||
.copied()
|
|
||||||
.collect::<HashSet<_>>();
|
|
||||||
let prev_sampling_subnets = self
|
|
||||||
.network_globals
|
|
||||||
.sampling_subnets_for_cgc(prev_cgc)
|
|
||||||
.iter()
|
|
||||||
.copied()
|
|
||||||
.collect::<HashSet<_>>();
|
|
||||||
if next_cgc > prev_cgc {
|
|
||||||
for subnet in next_sampling_subnets.difference(&prev_sampling_subnets) {
|
|
||||||
self.subscribe(Subnet::DataColumn(*subnet).into());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if next_cgc < prev_cgc {
|
|
||||||
for subnet in prev_sampling_subnets.difference(&next_sampling_subnets) {
|
|
||||||
self.unsubscribe(Subnet::DataColumn(*subnet).into());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(das): check the backfilled CGC and potentially update the network globals state
|
// Persist the entry to the store
|
||||||
// IDEA:
|
let cgc_updates = self.network_globals.dump_cgc_updates();
|
||||||
|
if let Err(e) = self.beacon_chain.store.put_cgc_updates(cgc_updates) {
|
||||||
|
// Do not update the memory value unless it's persisted to disk
|
||||||
|
error!(error = ?e, "Unable to persist CGC updates to disk");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the gossip subscriptions
|
||||||
|
//
|
||||||
|
// Note: Self::subscribe won't subscribe until we are close enough to the Fulu fork. So we
|
||||||
|
// can call this function eagerly every epoch. Topics for a higher CGC may be subscribed
|
||||||
|
// either here or in the topic fork transition function. Both will result in the same topic
|
||||||
|
// set, so we don't care about race conditions here.
|
||||||
|
let sampling_subnets_for = |cgc| {
|
||||||
|
self.network_globals
|
||||||
|
.sampling_subnets_for_cgc(cgc)
|
||||||
|
.iter()
|
||||||
|
.copied()
|
||||||
|
.collect::<HashSet<_>>()
|
||||||
|
};
|
||||||
|
let next_sampling_subnets = sampling_subnets_for(next_cgc);
|
||||||
|
let prev_sampling_subnets = sampling_subnets_for(prev_cgc);
|
||||||
|
|
||||||
|
if next_cgc > prev_cgc {
|
||||||
|
for subnet in next_sampling_subnets.difference(&prev_sampling_subnets) {
|
||||||
|
self.subscribe(Subnet::DataColumn(*subnet).into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if next_cgc < prev_cgc {
|
||||||
|
for subnet in prev_sampling_subnets.difference(&next_sampling_subnets) {
|
||||||
|
self.unsubscribe(Subnet::DataColumn(*subnet).into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// As per spec, when we announce a CGC we MUST be able to serve columns for that CGC for all
|
||||||
|
/// slots with data columns within the DA window. Given a variable CGC over time the value we
|
||||||
|
/// can announce is the `min(CGC(slot))` over that time period.
|
||||||
|
fn maybe_update_cgc_to_announce(&mut self) {
|
||||||
|
let Some(oldest_slot_with_data_columns) = self
|
||||||
|
.beacon_chain
|
||||||
|
.data_availability_checker
|
||||||
|
.oldest_epoch_with_data_columns()
|
||||||
|
.map(|epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()))
|
||||||
|
else {
|
||||||
|
return; // None before Fulu and on clock error
|
||||||
|
};
|
||||||
|
|
||||||
|
let cgc_to_announce = self
|
||||||
|
.network_globals
|
||||||
|
.min_custody_group_count_at_range(oldest_slot_with_data_columns..Slot::max_value());
|
||||||
|
|
||||||
|
let updated_enr = match self.libp2p.update_enr_cgc(cgc_to_announce) {
|
||||||
|
Ok(updated) => updated,
|
||||||
|
Err(e) => {
|
||||||
|
crit!(error = ?e, "Error updating local ENR custody group count");
|
||||||
|
false
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if updated_enr {
|
||||||
|
info!(cgc = cgc_to_announce, "Updated ENR custody group count");
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics::set_gauge(&metrics::CGC_ANNOUNCED, cgc_to_announce as i64);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Maybe restart block backfill to re-download batches with a higher CGC value. This strategy
|
||||||
|
/// prevents having to develop a new backfill sync algorithm just for custody columns.
|
||||||
|
fn maybe_restart_backfill_sync_for_cgc_backfill(&mut self) {
|
||||||
// When we forward sync and finalize a new block, we may restart backfill again from a later
|
// When we forward sync and finalize a new block, we may restart backfill again from a later
|
||||||
// block (the new finalized block). We will reset oldest_block to that block and fail
|
// block (the new finalized block). We will reset oldest_block to that block and fail
|
||||||
// backfill sync to start over from it. Then make backfill sync use a higher CGC (say 128)
|
// backfill sync to start over from it. Then make backfill sync use a higher CGC (say 128)
|
||||||
// and when oldest_block is less than the oldest step with a value < 128 we can delete that
|
// and when oldest_block is less than the oldest step with a value < 128 we can delete that
|
||||||
// step such that `custody_group_count(clock - da_window)` returns 128.
|
// step such that `custody_group_count(clock - da_window)` returns 128.
|
||||||
//
|
//
|
||||||
|
// Before restarting backfill a typical WS start results in the following plot. In the plot
|
||||||
|
// below the X axis is time and the Y axis is CGC value. CGCUpdates struct has two updates
|
||||||
|
// or steps: one at (Slot(0), 8) and a later (Slot(X), 128). Then backfill will download
|
||||||
|
// columns assuming a CGC = 8.
|
||||||
|
//
|
||||||
// CGC 128
|
// CGC 128
|
||||||
// : ___________:____________
|
// : ___________:____________
|
||||||
// : | :
|
// : | :
|
||||||
// CGC 8 : | :
|
// CGC 8 : | :
|
||||||
// ............:_____| :
|
// ____________:_____| :
|
||||||
// : WS start : Finalized
|
// : WS start : Finalized
|
||||||
// <------ block block
|
// <------ block block
|
||||||
// backfill
|
// backfill
|
||||||
// with CGC 8
|
// with CGC 8
|
||||||
//
|
//
|
||||||
//
|
//
|
||||||
|
// This function will remove from CGCUpdates all updates older than Slot(X), such that we
|
||||||
|
// get this `CGC(slot)` plot. Now backfill will fetch batches for CGC = 128. However, we
|
||||||
|
// need to re-download all batches since Slot(X).
|
||||||
//
|
//
|
||||||
// CGC 128
|
// CGC 128
|
||||||
// .............................. ___________:____________
|
// ______________________________:____________
|
||||||
// :
|
// :
|
||||||
// :
|
// :
|
||||||
// :
|
// :
|
||||||
@@ -980,68 +1053,104 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
|||||||
// backfill
|
// backfill
|
||||||
// with CGC 128
|
// with CGC 128
|
||||||
//
|
//
|
||||||
let oldest_block_slot = self.beacon_chain.store.get_anchor_info().oldest_block_slot;
|
|
||||||
// TODO(das): use min_epochs_for_data_columns
|
|
||||||
let last_pruned_epoch = clock_epoch.saturating_sub(Epoch::new(
|
|
||||||
self.beacon_chain.spec.min_epochs_for_blob_sidecars_requests,
|
|
||||||
));
|
|
||||||
let last_pruned_slot = last_pruned_epoch.start_slot(T::EthSpec::slots_per_epoch());
|
|
||||||
let fulu_fork_slot = fulu_fork_epoch.start_slot(T::EthSpec::slots_per_epoch());
|
|
||||||
let oldest_relevant_slot = std::cmp::max(
|
|
||||||
oldest_block_slot,
|
|
||||||
std::cmp::max(last_pruned_slot, fulu_fork_slot),
|
|
||||||
);
|
|
||||||
|
|
||||||
let finalized_slot = self.beacon_chain.finalized_slot();
|
let finalized_slot = self.beacon_chain.finalized_slot();
|
||||||
|
let Some(oldest_slot_with_data_columns) = self
|
||||||
|
.beacon_chain
|
||||||
|
.data_availability_checker
|
||||||
|
.oldest_epoch_with_data_columns()
|
||||||
|
.map(|epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()))
|
||||||
|
else {
|
||||||
|
return; // None before Fulu and on clock error
|
||||||
|
};
|
||||||
|
|
||||||
let cgc_at_oldest_relevant_slot = self
|
let min_cgc_data_columns_backfill_range = self
|
||||||
.network_globals
|
.network_globals
|
||||||
.custody_group_count(oldest_relevant_slot);
|
.min_custody_group_count_at_range(oldest_slot_with_data_columns..finalized_slot);
|
||||||
let cgc_at_finalized_slot = self.network_globals.custody_group_count(finalized_slot);
|
let cgc_at_finalized_slot = self.network_globals.custody_group_count(finalized_slot);
|
||||||
|
|
||||||
|
let oldest_block_slot = self.beacon_chain.store.get_anchor_info().oldest_block_slot;
|
||||||
let backfill_started_recently =
|
let backfill_started_recently =
|
||||||
finalized_slot.saturating_sub(oldest_block_slot) < MAX_SLOT_DISTANCE_BACKFILL_RESTART;
|
finalized_slot.saturating_sub(oldest_block_slot) < MAX_SLOT_DISTANCE_BACKFILL_RESTART;
|
||||||
let backfill_finished = oldest_block_slot == Slot::new(0);
|
let backfill_finished = oldest_block_slot == Slot::new(0);
|
||||||
|
|
||||||
// TODO(das): If we support a decreasing CGC we must consider the min value between this two
|
// Backfill sync may currently be active and downloading batches of columns. `oldest_block_slot`
|
||||||
// slots.
|
// only represents the current latest imported batch of backfill blocks, but more could be
|
||||||
|
// downloaded and pending. So for completeness we ask:
|
||||||
|
//
|
||||||
|
// Q: What is the complete set of CGC values that backfill sync can possibly have ever
|
||||||
|
// downloaded columns from?
|
||||||
|
// A: The set of CGC values between `oldest_slot_with_data_columns..finalized_slot`.
|
||||||
|
// `oldest_slot_with_data_columns` is a dynamic but strictly increasing value. We will
|
||||||
|
// never download relevant columns less than this value. `finalized_slot` is a dynamic
|
||||||
|
// strictly increasing value, so we will reset backfill sync to that value.
|
||||||
//
|
//
|
||||||
// Skip if backfill has finished. State reconstruction may have already started and we could
|
// Skip if backfill has finished. State reconstruction may have already started and we could
|
||||||
// mess with the DB. For real networks Fulu fork is way ahead of genesis so it won't affect
|
// mess with the DB. For real networks Fulu fork is way ahead of genesis so it won't affect
|
||||||
if cgc_at_oldest_relevant_slot < cgc_at_finalized_slot
|
let restart_backfill_sync = min_cgc_data_columns_backfill_range < cgc_at_finalized_slot
|
||||||
&& backfill_started_recently
|
&& backfill_started_recently
|
||||||
&& !backfill_finished
|
&& !backfill_finished;
|
||||||
{
|
if restart_backfill_sync {
|
||||||
// We need backfill sync to fetch batches with `CGC_f = cgc_at_finalized_slot`. Then
|
// We need backfill sync to fetch batches with `CGC_f = cgc_at_finalized_slot`. Then
|
||||||
// `custody_group_count(oldest_block_slot)` should now return `CGC_f`. So we have to
|
// `custody_group_count(oldest_block_slot)` should now return `CGC_f`. So we have to
|
||||||
// delete the CGC updates with `update.slot < finalized_slot`
|
// delete the CGC updates with `update.slot < finalized_slot`
|
||||||
|
//
|
||||||
|
// TODO(das): Don't start from finalized_slot, but instead the closest step before
|
||||||
|
// `finalized_slot`.
|
||||||
self.network_globals
|
self.network_globals
|
||||||
.prune_cgc_updates_older_than(finalized_slot);
|
.prune_cgc_updates_older_than(finalized_slot);
|
||||||
self.send_to_router(RouterMessage::BackfillSyncRestart(finalized_slot));
|
self.send_to_router(RouterMessage::BackfillSyncRestart(finalized_slot));
|
||||||
|
|
||||||
|
info!(slot = %finalized_slot, "Restarting backfill sync to fetch custody columns");
|
||||||
|
metrics::inc_counter(&metrics::BACKFILL_RESTARTED_FOR_CGC);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Schedule an advertise CGC update for later
|
// `oldest_block_slot` already tracked with metric `store_beacon_oldest_block_slot`
|
||||||
let cgc_to_announce = cgc_at_oldest_relevant_slot;
|
// `finalized_slot` already tracked with metrics `beacon_head_state_finalized_epoch * 32`
|
||||||
|
|
||||||
// update_enr_cgc updates the NetworkGlobals ENR
|
|
||||||
match self.libp2p.update_enr_cgc(cgc_to_announce) {
|
|
||||||
Ok(updated) => {
|
|
||||||
if updated {
|
|
||||||
info!(cgc = cgc_to_announce, "Updated ENR custody group count");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
crit!(error = ?e, "Error updating local ENR custody group count");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
metrics::set_gauge(&metrics::CGC_INTERNAL, next_cgc as i64);
|
|
||||||
metrics::set_gauge(&metrics::CGC_ANNOUNCED, cgc_to_announce as i64);
|
|
||||||
metrics::set_gauge(&metrics::LOCAL_INDICES_COUNT, local_indices_count as i64);
|
|
||||||
metrics::set_gauge(
|
metrics::set_gauge(
|
||||||
&metrics::LOCAL_INDICES_ETH_BALANCE,
|
&metrics::CGC_MIN_BACKFILL_RANGE,
|
||||||
known_validators_balance as i64 / 1_000_000_000,
|
min_cgc_data_columns_backfill_range as i64,
|
||||||
);
|
);
|
||||||
|
metrics::set_gauge(&metrics::CGC_FINALIZED_SLOT, cgc_at_finalized_slot as i64);
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
%oldest_block_slot,
|
||||||
|
%finalized_slot,
|
||||||
|
%oldest_slot_with_data_columns,
|
||||||
|
min_cgc_data_columns_backfill_range,
|
||||||
|
cgc_at_finalized_slot,
|
||||||
|
backfill_started_recently,
|
||||||
|
backfill_finished,
|
||||||
|
restart_backfill_sync,
|
||||||
|
"Maybe restarting backfill sync for CGC backfill"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Prunes CGC updates that are older than the oldest slot with non-pruned data columns
|
||||||
|
fn prune_old_cgc_updates(&self) {
|
||||||
|
let Some(oldest_slot_with_data_columns) = self
|
||||||
|
.beacon_chain
|
||||||
|
.data_availability_checker
|
||||||
|
.oldest_epoch_with_data_columns()
|
||||||
|
.map(|epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()))
|
||||||
|
else {
|
||||||
|
return; // None before Fulu and on clock error
|
||||||
|
};
|
||||||
|
|
||||||
|
let len_before = self.network_globals.cgc_updates_len();
|
||||||
|
self.network_globals
|
||||||
|
.prune_cgc_updates_older_than(oldest_slot_with_data_columns);
|
||||||
|
let len_after = self.network_globals.cgc_updates_len();
|
||||||
|
if len_after < len_before {
|
||||||
|
debug!(len_after, len_before, %oldest_slot_with_data_columns, "Pruned old CGC updates");
|
||||||
|
|
||||||
|
// Persist the entry to the store
|
||||||
|
let cgc_updates = self.network_globals.dump_cgc_updates();
|
||||||
|
if let Err(e) = self.beacon_chain.store.put_cgc_updates(cgc_updates) {
|
||||||
|
// Do not update the memory value unless it's persisted to disk
|
||||||
|
error!(error = ?e, "Unable to persist CGC updates to disk");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_subnet_service_msg(&mut self, msg: SubnetServiceMessage) {
|
fn on_subnet_service_msg(&mut self, msg: SubnetServiceMessage) {
|
||||||
@@ -1062,14 +1171,14 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn subscribe(&mut self, kind: GossipKind) {
|
fn subscribe(&mut self, kind: GossipKind) {
|
||||||
for fork_digest in self.required_gossip_fork_digests() {
|
for fork_digest in self.required_gossip_forks_to_subscribe(&kind) {
|
||||||
let topic = GossipTopic::new(kind.clone(), GossipEncoding::default(), fork_digest);
|
let topic = GossipTopic::new(kind.clone(), GossipEncoding::default(), fork_digest);
|
||||||
self.libp2p.subscribe(topic);
|
self.libp2p.subscribe(topic);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn unsubscribe(&mut self, kind: GossipKind) {
|
fn unsubscribe(&mut self, kind: GossipKind) {
|
||||||
for fork_digest in self.required_gossip_fork_digests() {
|
for fork_digest in self.required_gossip_forks_to_subscribe(&kind) {
|
||||||
let topic = GossipTopic::new(kind.clone(), GossipEncoding::default(), fork_digest);
|
let topic = GossipTopic::new(kind.clone(), GossipEncoding::default(), fork_digest);
|
||||||
self.libp2p.unsubscribe(topic);
|
self.libp2p.unsubscribe(topic);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,10 +14,14 @@ pub struct CGCUpdates {
|
|||||||
updates: VariableList<CGCUpdate, ssz_types::typenum::U131072>,
|
updates: VariableList<CGCUpdate, ssz_types::typenum::U131072>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::len_without_is_empty)]
|
||||||
impl CGCUpdates {
|
impl CGCUpdates {
|
||||||
pub fn new(initial_update: CGCUpdate) -> Self {
|
pub fn new(initial_cgc: u64) -> Self {
|
||||||
|
// The slot of the initial update doesn't matter. It's only relevant when pushing the next
|
||||||
|
// update if it has the same Slot. Otherwise, the result function `cgc(slot)` is independent
|
||||||
|
// of the value of initial_update.slot
|
||||||
Self {
|
Self {
|
||||||
updates: VariableList::new(vec![initial_update]).expect("1 < 131072"),
|
updates: VariableList::new(vec![(Slot::new(0), initial_cgc)]).expect("1 < 131072"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -51,8 +55,8 @@ impl CGCUpdates {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the ordered list of CGC values in the range of slots `range`. If the range is empty,
|
/// Returns the ordered list of CGC values in the range of slots `range`. If the range is empty,
|
||||||
/// i.e. `slot..slot` returns the CGC value at `slot`. The return vector will never be empty.
|
/// i.e. `3..1` returns the CGC value at `range.start`. The return vector will never be empty.
|
||||||
pub fn at_slot_range(&self, range: Range<Slot>) -> Vec<u64> {
|
fn at_slot_range(&self, range: Range<Slot>) -> Vec<u64> {
|
||||||
let first_update_index = self.update_index_at_slot(range.start);
|
let first_update_index = self.update_index_at_slot(range.start);
|
||||||
|
|
||||||
let cgcs = self
|
let cgcs = self
|
||||||
@@ -75,6 +79,16 @@ impl CGCUpdates {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the minimum CGC value in the range of slots `range`. If the range is empty,
|
||||||
|
/// i.e. `slot..slot` returns the CGC value at `slot`.
|
||||||
|
pub fn min_at_slot_range(&self, range: Range<Slot>) -> u64 {
|
||||||
|
*self
|
||||||
|
.at_slot_range(range)
|
||||||
|
.iter()
|
||||||
|
.min()
|
||||||
|
.expect("at_slot_range never returns empty Vec")
|
||||||
|
}
|
||||||
|
|
||||||
pub fn add_latest_update(&mut self, update: CGCUpdate) -> Result<(), String> {
|
pub fn add_latest_update(&mut self, update: CGCUpdate) -> Result<(), String> {
|
||||||
if let Some(last_update) = self.updates.last_mut() {
|
if let Some(last_update) = self.updates.last_mut() {
|
||||||
match last_update.0.cmp(&update.0) {
|
match last_update.0.cmp(&update.0) {
|
||||||
@@ -119,6 +133,10 @@ impl CGCUpdates {
|
|||||||
/// Iterates over the stored updates in storage order, yielding each one by value.
pub fn iter(&self) -> impl Iterator<Item = CGCUpdate> + '_ {
    let updates = &self.updates;
    updates.iter().copied()
}
|
||||||
|
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
self.updates.len()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -126,8 +144,8 @@ mod tests {
|
|||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
fn new(updates: &[(u64, u64)]) -> CGCUpdates {
|
fn new(updates: &[(u64, u64)]) -> CGCUpdates {
|
||||||
let first_update = *updates.get(0).expect("should have at least one update");
|
let first_update = *updates.first().expect("should have at least one update");
|
||||||
let mut u = CGCUpdates::new(to(first_update));
|
let mut u = CGCUpdates::new(first_update.1);
|
||||||
for update in updates.iter().skip(1) {
|
for update in updates.iter().skip(1) {
|
||||||
u.add_latest_update(to(*update)).unwrap();
|
u.add_latest_update(to(*update)).unwrap();
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user