More progress

This commit is contained in:
dapplion
2025-04-04 03:02:01 -03:00
parent 63a4e378ce
commit 614c01698d
20 changed files with 464 additions and 91 deletions

View File

@@ -32,6 +32,7 @@ use task_executor::ShutdownReason;
use tokio::sync::mpsc;
use tokio::time::Sleep;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use types::CGCUpdates;
use types::{
ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId,
Unsigned, ValidatorSubscription,
@@ -48,6 +49,8 @@ const UNSUBSCRIBE_DELAY_EPOCHS: u64 = 2;
/// Size of the queue for validator subnet subscriptions. The number is chosen so that we may be
/// able to run tens of thousands of validators on one BN.
const VALIDATOR_SUBSCRIPTION_MESSAGE_QUEUE_SIZE: usize = 65_536;
/// Seconds to expire a local validator registration
const LOCAL_VALIDATOR_REGISTRY_TTL_SEC: u64 = 60 * 60;
/// Types of messages that the network service can receive.
#[derive(Debug, IntoStaticStr)]
@@ -191,6 +194,8 @@ pub struct NetworkService<T: BeaconChainTypes> {
gossipsub_parameter_update: tokio::time::Interval,
/// Provides fork specific info.
fork_context: Arc<ForkContext>,
/// A timer to trigger CGC updates
cgc_update_interval: tokio::time::Interval,
}
impl<T: BeaconChainTypes> NetworkService<T> {
@@ -265,8 +270,29 @@ impl<T: BeaconChainTypes> NetworkService<T> {
libp2p_registry,
};
if let Some(_disk_custody_info) = store
.get_custody_info()
.map_err(|e| format!("Unable to read custody info from the DB: {e:?}"))?
{
// TODO(das): check that list of columns is compatible
}
let cgc_udpates = if let Some(disk_cgc_updates) = store
.get_cgc_updates()
.map_err(|e| format!("Unable to read cgc updates from the DB: {e:?}"))?
{
disk_cgc_updates
} else {
// Just return the new one
let initial_cgc = beacon_chain
.spec
.custody_group_count(config.subscribe_all_data_column_subnets);
CGCUpdates::new(initial_cgc)
};
// launch libp2p service
let (mut libp2p, network_globals) = Network::new(executor.clone(), service_context).await?;
let (mut libp2p, network_globals) =
Network::new(executor.clone(), service_context, Some(cgc_udpates)).await?;
// Repopulate the DHT with stored ENR's if discovery is not disabled.
if !config.disable_discovery {
@@ -318,6 +344,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
validator_subscription_recv,
} = network_receivers;
let epoch_in_seconds = fork_context.spec.seconds_per_slot * T::EthSpec::slots_per_epoch();
// create the network service and spawn the task
let network_service = NetworkService {
beacon_chain,
@@ -335,6 +363,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
metrics_enabled: config.metrics_enabled,
metrics_update,
gossipsub_parameter_update,
cgc_update_interval: tokio::time::interval(Duration::from_secs(epoch_in_seconds)),
fork_context,
};
@@ -427,6 +456,10 @@ impl<T: BeaconChainTypes> NetworkService<T> {
_ = self.gossipsub_parameter_update.tick() => self.update_gossipsub_parameters(),
// TODO(das) should align this tick to be in the middle of an epoch, and ideally
// outside a busy slot period
_ = self.cgc_update_interval.tick() => self.on_cgc_update_interval(),
// handle a message sent to the network
Some(msg) = self.network_recv.recv() => self.on_network_msg(msg, &mut shutdown_sender).await,
@@ -772,22 +805,107 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}
/// Periodic tick handler (driven by `cgc_update_interval`, roughly once per epoch):
/// recomputes this node's custody group count (CGC) from the effective balance of
/// locally registered validators and, when it changed, records the update in the
/// network globals, persists the update list to the store, and adjusts the
/// data-column gossip subscriptions to match the new sampling subnet set.
fn on_cgc_update_interval(&mut self) {
// Current CGC as of the far future (`Slot::max_value()`), i.e. the latest scheduled value.
let prev_cgc = self.network_globals.custody_group_count(Slot::max_value());
// If the slot clock cannot produce an epoch (e.g. before genesis), skip this tick.
let Ok(clock_epoch) = self.beacon_chain.epoch() else {
return;
};
let cached_head = self.beacon_chain.canonical_head.cached_head();
// Drop local validator registrations that have not been refreshed within the TTL,
// so stale validators stop contributing to the balance-derived CGC.
self.beacon_chain
.validator_monitor
.write()
.prune_registered_local_validators(Duration::from_secs(
LOCAL_VALIDATOR_REGISTRY_TTL_SEC,
));
// Sum the effective balances of the remaining registered local validators, read
// from the cached head state. Unknown indices contribute 0 rather than erroring.
let known_validators_balance = self
.beacon_chain
.validator_monitor
.read()
.get_registered_local_validators()
// TODO(das): should ignore non active validators?
.map(|validator_index| {
cached_head
.snapshot
.beacon_state
.get_effective_balance(*validator_index as usize)
.unwrap_or(0)
})
.sum::<u64>();
// TODO(das): track connected balance as a metric
// Map the aggregate balance to the custody group count required by the spec.
let next_cgc = self
.beacon_chain
.spec
.custody_group_by_balance(known_validators_balance);
// TODO(das): check the backfilled CGC and potentially update the network globals state
if next_cgc != prev_cgc {
// TODO(das): Should we consider the case where the clock is almost at the end of the epoch?
// If I/O is slow we may update the in-memory map for an epoch that's already
// progressing.
// Schedule the new CGC to take effect at the start of the next epoch.
let next_epoch = clock_epoch + 1;
info!(prev_cgc, next_cgc, %next_epoch, "Updating internal custody count");
// TODO(das): Add CGC metrics for updates, current internal value and announced value
// Add a new entry to the network globals
if let Err(e) = self.network_globals.add_cgc_update(
next_epoch.start_slot(T::EthSpec::slots_per_epoch()),
next_cgc,
) {
error!("List of CGC updates full: {e:?}");
return;
}
let cgc_updates = self.network_globals.dump_cgc_updates();
// Persist the entry to the store
if let Err(e) = self.beacon_chain.store.put_cgc_updates(cgc_updates) {
// NOTE(review): the in-memory update (`add_cgc_update` above) has already
// happened by this point, so on a persist failure memory and disk diverge
// until the next successful persist — confirm whether a rollback or retry
// is intended here.
error!(error = ?e, "Unable to persist CGC updates to disk");
}
// Update the gossip subscriptions
// Compute the sampling subnet sets implied by the old and new CGC values so we
// can subscribe/unsubscribe only the difference.
let next_sampling_subnets = self
.network_globals
.sampling_subnets_for_cgc(next_cgc)
.iter()
.copied()
.collect::<HashSet<_>>();
let prev_sampling_subnets = self
.network_globals
.sampling_subnets_for_cgc(prev_cgc)
.iter()
.copied()
.collect::<HashSet<_>>();
// CGC increased: join the data-column subnets that are newly required.
if next_cgc > prev_cgc {
for subnet in next_sampling_subnets.difference(&prev_sampling_subnets) {
self.subscribe(Subnet::DataColumn(*subnet).into());
}
}
// CGC decreased: leave the data-column subnets that are no longer required.
if next_cgc < prev_cgc {
for subnet in prev_sampling_subnets.difference(&next_sampling_subnets) {
self.unsubscribe(Subnet::DataColumn(*subnet).into());
}
}
}
// Schedule an advertise CGC update for later
// TODO(das): use min_epochs_for_data_columns
let last_pruned_epoch =
clock_epoch - Epoch::new(self.beacon_chain.spec.min_epochs_for_blob_sidecars_requests);
// CGC value safe to advertise externally: the one effective at the oldest
// still-unpruned epoch. Currently computed but not yet acted upon (see TODO below).
let cgc_to_announce = self
.network_globals
.custody_group_count(last_pruned_epoch.start_slot(T::EthSpec::slots_per_epoch()));
// TODO(das): Compare with current announced and update ENR
}
fn on_subnet_service_msg(&mut self, msg: SubnetServiceMessage) {
match msg {
SubnetServiceMessage::Subscribe(subnet) => {
for fork_digest in self.required_gossip_fork_digests() {
let topic =
GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
self.libp2p.subscribe(topic);
}
}
SubnetServiceMessage::Unsubscribe(subnet) => {
for fork_digest in self.required_gossip_fork_digests() {
let topic =
GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
self.libp2p.unsubscribe(topic);
}
}
SubnetServiceMessage::Subscribe(subnet) => self.subscribe(subnet.into()),
SubnetServiceMessage::Unsubscribe(subnet) => self.unsubscribe(subnet.into()),
SubnetServiceMessage::EnrAdd(subnet) => {
self.libp2p.update_enr_subnet(subnet, true);
}
@@ -801,6 +919,20 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}
/// Subscribe the libp2p service to the given gossip topic kind on every
/// currently-required fork digest.
fn subscribe(&mut self, kind: GossipKind) {
    let digests = self.required_gossip_fork_digests();
    for digest in digests {
        let topic = GossipTopic::new(kind.clone(), GossipEncoding::default(), digest);
        self.libp2p.subscribe(topic);
    }
}
/// Unsubscribe the libp2p service from the given gossip topic kind on every
/// currently-required fork digest.
fn unsubscribe(&mut self, kind: GossipKind) {
    let digests = self.required_gossip_fork_digests();
    for digest in digests {
        let topic = GossipTopic::new(kind.clone(), GossipEncoding::default(), digest);
        self.libp2p.unsubscribe(topic);
    }
}
fn update_next_fork(&mut self) {
let new_enr_fork_id = self.beacon_chain.enr_fork_id();
let new_fork_digest = new_enr_fork_id.fork_digest;
@@ -901,6 +1033,16 @@ impl<T: BeaconChainTypes> Drop for NetworkService<T> {
),
Ok(_) => info!("Saved DHT state"),
}
// Persist CGC updates
match self
.beacon_chain
.store
.put_cgc_updates(self.network_globals.dump_cgc_updates())
{
Ok(_) => info!("Saved CGC updates"),
Err(e) => error!(error = ?e, "Failed to persist CGC updates"),
}
info!("Network service shutdown");
}
}