Add plumbing for PeerDAS supernodes (#5050, #5409, #5570, #5966) (#6216)

* Add plumbing for peerdas supernodes (#5050, #5409, #5570, #5966)
- add cli option `--subscribe-to-all-data-columns`
- add custody subnet count to ENR, only if PeerDAS is scheduled
- subscribe to data column topics, only if PeerDAS is scheduled

Co-authored-by: Jacob Kaufmann <jacobkaufmann18@gmail.com>

* Merge branch 'unstable' into das-supernode

* Update CLI docs.

* Merge branch 'unstable' into das-supernode

* Fix fork epoch comparison with `FAR_FUTURE_EPOCH`.

* Merge branch 'unstable' into das-supernode

* Hide `--subscribe-all-data-column-subnets` flag and update help.

* Fix docs only

* Merge branch 'unstable' into das-supernode
This commit is contained in:
Jimmy Chen
2024-08-12 12:31:21 +10:00
committed by GitHub
parent 781c5ecb1f
commit f2fdbe7fbe
13 changed files with 223 additions and 17 deletions

View File

@@ -16,6 +16,7 @@ use futures::prelude::*;
use futures::StreamExt;
use lighthouse_network::service::Network;
use lighthouse_network::types::GossipKind;
use lighthouse_network::Eth2Enr;
use lighthouse_network::{prometheus_client::registry::Registry, MessageAcceptance};
use lighthouse_network::{
rpc::{GoodbyeReason, RPCResponseErrorCode},
@@ -35,8 +36,8 @@ use task_executor::ShutdownReason;
use tokio::sync::mpsc;
use tokio::time::Sleep;
use types::{
ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId,
Unsigned, ValidatorSubscription,
ChainSpec, DataColumnSubnetId, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription,
SyncSubnetId, Unsigned, ValidatorSubscription,
};
mod tests;
@@ -183,6 +184,8 @@ pub struct NetworkService<T: BeaconChainTypes> {
next_fork_subscriptions: Pin<Box<OptionFuture<Sleep>>>,
/// A delay that expires when we need to unsubscribe from old fork topics.
next_unsubscribe: Pin<Box<OptionFuture<Sleep>>>,
/// Subscribe to all the data column subnets.
subscribe_all_data_column_subnets: bool,
/// Subscribe to all the subnets once synced.
subscribe_all_subnets: bool,
/// Shutdown beacon node after sync is complete.
@@ -349,6 +352,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
next_fork_update,
next_fork_subscriptions,
next_unsubscribe,
subscribe_all_data_column_subnets: config.subscribe_all_data_column_subnets,
subscribe_all_subnets: config.subscribe_all_subnets,
shutdown_after_sync: config.shutdown_after_sync,
metrics_enabled: config.metrics_enabled,
@@ -733,6 +737,15 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}
// TODO(das): This is added here for the purpose of testing, *without* having to
// activate Electra. This should happen as part of the Electra upgrade and we should
// move the subscription logic once it's ready to rebase PeerDAS on Electra, or if
// we decide to activate via the soft fork route:
// https://github.com/sigp/lighthouse/pull/5899
if self.fork_context.spec.is_peer_das_scheduled() {
self.subscribe_to_peer_das_topics(&mut subscribed_topics);
}
// If we are to subscribe to all subnets we do it here
if self.subscribe_all_subnets {
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
@@ -779,6 +792,45 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}
fn subscribe_to_peer_das_topics(&mut self, subscribed_topics: &mut Vec<GossipTopic>) {
if self.subscribe_all_data_column_subnets {
for column_subnet in 0..self.fork_context.spec.data_column_sidecar_subnet_count {
for fork_digest in self.required_gossip_fork_digests() {
let gossip_kind =
Subnet::DataColumn(DataColumnSubnetId::new(column_subnet)).into();
let topic =
GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest);
if self.libp2p.subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
}
} else {
for column_subnet in DataColumnSubnetId::compute_custody_subnets::<T::EthSpec>(
self.network_globals.local_enr().node_id().raw().into(),
self.network_globals
.local_enr()
.custody_subnet_count::<<T as BeaconChainTypes>::EthSpec>(
&self.fork_context.spec,
),
&self.fork_context.spec,
) {
for fork_digest in self.required_gossip_fork_digests() {
let gossip_kind = Subnet::DataColumn(column_subnet).into();
let topic =
GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest);
if self.libp2p.subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
}
}
}
/// Handle a message sent to the network service.
async fn on_validator_subscription_msg(&mut self, msg: ValidatorSubscriptionMessage) {
match msg {