mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-30 04:37:13 +00:00
Implement PeerDAS subnet decoupling (aka custody groups) (#6736)
* Implement PeerDAS subnet decoupling (aka custody groups). * Merge branch 'unstable' into decouple-subnets * Refactor feature testing for spec tests (#6737) Squashed commit of the following: commit898d05ee17Merge:ffbd25e2b7e0cddef3Author: Jimmy Chen <jchen.tc@gmail.com> Date: Tue Dec 24 14:41:19 2024 +1100 Merge branch 'unstable' into refactor-ef-tests-features commitffbd25e2beAuthor: Jimmy Chen <jchen.tc@gmail.com> Date: Tue Dec 24 14:40:38 2024 +1100 Fix `SszStatic` tests for PeerDAS: exclude eip7594 test vectors when testing Electra types. commitaa593cf35cAuthor: Jimmy Chen <jchen.tc@gmail.com> Date: Fri Dec 20 12:08:54 2024 +1100 Refactor spec testing for features and simplify usage. * Fix build. * Add input validation and improve arithmetic handling when calculating custody groups. * Address review comments re code style consistency. * Merge branch 'unstable' into decouple-subnets # Conflicts: # beacon_node/beacon_chain/src/kzg_utils.rs # beacon_node/beacon_chain/src/observed_data_sidecars.rs # beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs # common/eth2_network_config/built_in_network_configs/chiado/config.yaml # common/eth2_network_config/built_in_network_configs/gnosis/config.yaml # common/eth2_network_config/built_in_network_configs/holesky/config.yaml # common/eth2_network_config/built_in_network_configs/mainnet/config.yaml # common/eth2_network_config/built_in_network_configs/sepolia/config.yaml # consensus/types/src/chain_spec.rs * Update consensus/types/src/chain_spec.rs Co-authored-by: Lion - dapplion <35266934+dapplion@users.noreply.github.com> * Merge remote-tracking branch 'origin/unstable' into decouple-subnets * Update error handling. * Address review comment. * Merge remote-tracking branch 'origin/unstable' into decouple-subnets # Conflicts: # consensus/types/src/chain_spec.rs * Update PeerDAS spec tests to `1.5.0-beta.0` and fix failing unit tests. * Merge remote-tracking branch 'origin/unstable' into decouple-subnets # Conflicts: # beacon_node/lighthouse_network/src/peer_manager/mod.rs
This commit is contained in:
@@ -25,8 +25,8 @@ pub const ETH2_ENR_KEY: &str = "eth2";
|
||||
pub const ATTESTATION_BITFIELD_ENR_KEY: &str = "attnets";
|
||||
/// The ENR field specifying the sync committee subnet bitfield.
|
||||
pub const SYNC_COMMITTEE_BITFIELD_ENR_KEY: &str = "syncnets";
|
||||
/// The ENR field specifying the peerdas custody subnet count.
|
||||
pub const PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY: &str = "csc";
|
||||
/// The ENR field specifying the peerdas custody group count.
|
||||
pub const PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY: &str = "cgc";
|
||||
|
||||
/// Extension trait for ENR's within Eth2.
|
||||
pub trait Eth2Enr {
|
||||
@@ -38,8 +38,8 @@ pub trait Eth2Enr {
|
||||
&self,
|
||||
) -> Result<EnrSyncCommitteeBitfield<E>, &'static str>;
|
||||
|
||||
/// The peerdas custody subnet count associated with the ENR.
|
||||
fn custody_subnet_count<E: EthSpec>(&self, spec: &ChainSpec) -> Result<u64, &'static str>;
|
||||
/// The peerdas custody group count associated with the ENR.
|
||||
fn custody_group_count<E: EthSpec>(&self, spec: &ChainSpec) -> Result<u64, &'static str>;
|
||||
|
||||
fn eth2(&self) -> Result<EnrForkId, &'static str>;
|
||||
}
|
||||
@@ -67,16 +67,16 @@ impl Eth2Enr for Enr {
|
||||
.map_err(|_| "Could not decode the ENR syncnets bitfield")
|
||||
}
|
||||
|
||||
fn custody_subnet_count<E: EthSpec>(&self, spec: &ChainSpec) -> Result<u64, &'static str> {
|
||||
let csc = self
|
||||
.get_decodable::<u64>(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY)
|
||||
.ok_or("ENR custody subnet count non-existent")?
|
||||
.map_err(|_| "Could not decode the ENR custody subnet count")?;
|
||||
fn custody_group_count<E: EthSpec>(&self, spec: &ChainSpec) -> Result<u64, &'static str> {
|
||||
let cgc = self
|
||||
.get_decodable::<u64>(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY)
|
||||
.ok_or("ENR custody group count non-existent")?
|
||||
.map_err(|_| "Could not decode the ENR custody group count")?;
|
||||
|
||||
if csc >= spec.custody_requirement && csc <= spec.data_column_sidecar_subnet_count {
|
||||
Ok(csc)
|
||||
if (spec.custody_requirement..=spec.number_of_custody_groups).contains(&cgc) {
|
||||
Ok(cgc)
|
||||
} else {
|
||||
Err("Invalid custody subnet count in ENR")
|
||||
Err("Invalid custody group count in ENR")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -253,14 +253,14 @@ pub fn build_enr<E: EthSpec>(
|
||||
&bitfield.as_ssz_bytes().into(),
|
||||
);
|
||||
|
||||
// only set `csc` if PeerDAS fork epoch has been scheduled
|
||||
// only set `cgc` if PeerDAS fork epoch has been scheduled
|
||||
if spec.is_peer_das_scheduled() {
|
||||
let custody_subnet_count = if config.subscribe_all_data_column_subnets {
|
||||
spec.data_column_sidecar_subnet_count
|
||||
let custody_group_count = if config.subscribe_all_data_column_subnets {
|
||||
spec.number_of_custody_groups
|
||||
} else {
|
||||
spec.custody_requirement
|
||||
};
|
||||
builder.add_value(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY, &custody_subnet_count);
|
||||
builder.add_value(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY, &custody_group_count);
|
||||
}
|
||||
|
||||
builder
|
||||
@@ -287,11 +287,11 @@ fn compare_enr(local_enr: &Enr, disk_enr: &Enr) -> bool {
|
||||
&& (local_enr.udp4().is_none() || local_enr.udp4() == disk_enr.udp4())
|
||||
&& (local_enr.udp6().is_none() || local_enr.udp6() == disk_enr.udp6())
|
||||
// we need the ATTESTATION_BITFIELD_ENR_KEY and SYNC_COMMITTEE_BITFIELD_ENR_KEY and
|
||||
// PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY key to match, otherwise we use a new ENR. This will
|
||||
// PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY key to match, otherwise we use a new ENR. This will
|
||||
// likely only be true for non-validating nodes.
|
||||
&& local_enr.get_decodable::<Bytes>(ATTESTATION_BITFIELD_ENR_KEY) == disk_enr.get_decodable(ATTESTATION_BITFIELD_ENR_KEY)
|
||||
&& local_enr.get_decodable::<Bytes>(SYNC_COMMITTEE_BITFIELD_ENR_KEY) == disk_enr.get_decodable(SYNC_COMMITTEE_BITFIELD_ENR_KEY)
|
||||
&& local_enr.get_decodable::<Bytes>(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) == disk_enr.get_decodable(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY)
|
||||
&& local_enr.get_decodable::<Bytes>(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY) == disk_enr.get_decodable(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY)
|
||||
}
|
||||
|
||||
/// Loads enr from the given directory
|
||||
@@ -348,7 +348,7 @@ mod test {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn custody_subnet_count_default() {
|
||||
fn custody_group_count_default() {
|
||||
let config = NetworkConfig {
|
||||
subscribe_all_data_column_subnets: false,
|
||||
..NetworkConfig::default()
|
||||
@@ -358,13 +358,13 @@ mod test {
|
||||
let enr = build_enr_with_config(config, &spec).0;
|
||||
|
||||
assert_eq!(
|
||||
enr.custody_subnet_count::<E>(&spec).unwrap(),
|
||||
enr.custody_group_count::<E>(&spec).unwrap(),
|
||||
spec.custody_requirement,
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn custody_subnet_count_all() {
|
||||
fn custody_group_count_all() {
|
||||
let config = NetworkConfig {
|
||||
subscribe_all_data_column_subnets: true,
|
||||
..NetworkConfig::default()
|
||||
@@ -373,8 +373,8 @@ mod test {
|
||||
let enr = build_enr_with_config(config, &spec).0;
|
||||
|
||||
assert_eq!(
|
||||
enr.custody_subnet_count::<E>(&spec).unwrap(),
|
||||
spec.data_column_sidecar_subnet_count,
|
||||
enr.custody_group_count::<E>(&spec).unwrap(),
|
||||
spec.number_of_custody_groups,
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
//! The subnet predicate used for searching for a particular subnet.
|
||||
use super::*;
|
||||
use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield};
|
||||
use itertools::Itertools;
|
||||
use slog::trace;
|
||||
use std::ops::Deref;
|
||||
use types::{ChainSpec, DataColumnSubnetId};
|
||||
use types::data_column_custody_group::compute_subnets_for_node;
|
||||
use types::ChainSpec;
|
||||
|
||||
/// Returns the predicate for a given subnet.
|
||||
pub fn subnet_predicate<E>(
|
||||
@@ -37,13 +37,9 @@ where
|
||||
.as_ref()
|
||||
.is_ok_and(|b| b.get(*s.deref() as usize).unwrap_or(false)),
|
||||
Subnet::DataColumn(s) => {
|
||||
if let Ok(custody_subnet_count) = enr.custody_subnet_count::<E>(&spec) {
|
||||
DataColumnSubnetId::compute_custody_subnets::<E>(
|
||||
enr.node_id().raw(),
|
||||
custody_subnet_count,
|
||||
&spec,
|
||||
)
|
||||
.is_ok_and(|mut subnets| subnets.contains(s))
|
||||
if let Ok(custody_group_count) = enr.custody_group_count::<E>(&spec) {
|
||||
compute_subnets_for_node(enr.node_id().raw(), custody_group_count, &spec)
|
||||
.is_ok_and(|subnets| subnets.contains(s))
|
||||
} else {
|
||||
false
|
||||
}
|
||||
|
||||
@@ -93,11 +93,11 @@ pub static PEERS_PER_CLIENT: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
|
||||
)
|
||||
});
|
||||
|
||||
pub static PEERS_PER_CUSTODY_SUBNET_COUNT: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
|
||||
pub static PEERS_PER_CUSTODY_GROUP_COUNT: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
|
||||
try_create_int_gauge_vec(
|
||||
"peers_per_custody_subnet_count",
|
||||
"The current count of peers by custody subnet count",
|
||||
&["custody_subnet_count"],
|
||||
"peers_per_custody_group_count",
|
||||
"The current count of peers by custody group count",
|
||||
&["custody_group_count"],
|
||||
)
|
||||
});
|
||||
|
||||
|
||||
@@ -34,6 +34,9 @@ pub use peerdb::sync_status::{SyncInfo, SyncStatus};
|
||||
use std::collections::{hash_map::Entry, HashMap, HashSet};
|
||||
use std::net::IpAddr;
|
||||
use strum::IntoEnumIterator;
|
||||
use types::data_column_custody_group::{
|
||||
compute_subnets_from_custody_group, get_custody_groups, CustodyIndex,
|
||||
};
|
||||
|
||||
pub mod config;
|
||||
mod network_behaviour;
|
||||
@@ -101,6 +104,8 @@ pub struct PeerManager<E: EthSpec> {
|
||||
/// discovery queries for subnet peers if we disconnect from existing sync
|
||||
/// committee subnet peers.
|
||||
sync_committee_subnets: HashMap<SyncSubnetId, Instant>,
|
||||
/// A mapping of all custody groups to column subnets to avoid re-computation.
|
||||
subnets_by_custody_group: HashMap<u64, Vec<DataColumnSubnetId>>,
|
||||
/// The heartbeat interval to perform routine maintenance.
|
||||
heartbeat: tokio::time::Interval,
|
||||
/// Keeps track of whether the discovery service is enabled or not.
|
||||
@@ -160,6 +165,21 @@ impl<E: EthSpec> PeerManager<E> {
|
||||
// Set up the peer manager heartbeat interval
|
||||
let heartbeat = tokio::time::interval(tokio::time::Duration::from_secs(HEARTBEAT_INTERVAL));
|
||||
|
||||
// Compute subnets for all custody groups
|
||||
let subnets_by_custody_group = if network_globals.spec.is_peer_das_scheduled() {
|
||||
(0..network_globals.spec.number_of_custody_groups)
|
||||
.map(|custody_index| {
|
||||
let subnets =
|
||||
compute_subnets_from_custody_group(custody_index, &network_globals.spec)
|
||||
.expect("Should compute subnets for all custody groups")
|
||||
.collect();
|
||||
(custody_index, subnets)
|
||||
})
|
||||
.collect::<HashMap<_, Vec<DataColumnSubnetId>>>()
|
||||
} else {
|
||||
HashMap::new()
|
||||
};
|
||||
|
||||
Ok(PeerManager {
|
||||
network_globals,
|
||||
events: SmallVec::new(),
|
||||
@@ -170,6 +190,7 @@ impl<E: EthSpec> PeerManager<E> {
|
||||
target_peers: target_peer_count,
|
||||
temporary_banned_peers: LRUTimeCache::new(PEER_RECONNECTION_TIMEOUT),
|
||||
sync_committee_subnets: Default::default(),
|
||||
subnets_by_custody_group,
|
||||
heartbeat,
|
||||
discovery_enabled,
|
||||
metrics_enabled,
|
||||
@@ -711,22 +732,39 @@ impl<E: EthSpec> PeerManager<E> {
|
||||
"peer_id" => %peer_id, "new_seq_no" => meta_data.seq_number());
|
||||
}
|
||||
|
||||
let custody_subnet_count_opt = meta_data.custody_subnet_count().copied().ok();
|
||||
let custody_group_count_opt = meta_data.custody_group_count().copied().ok();
|
||||
peer_info.set_meta_data(meta_data);
|
||||
|
||||
if self.network_globals.spec.is_peer_das_scheduled() {
|
||||
// Gracefully ignore metadata/v2 peers. Potentially downscore after PeerDAS to
|
||||
// prioritize PeerDAS peers.
|
||||
if let Some(custody_subnet_count) = custody_subnet_count_opt {
|
||||
match self.compute_peer_custody_subnets(peer_id, custody_subnet_count) {
|
||||
Ok(custody_subnets) => {
|
||||
if let Some(custody_group_count) = custody_group_count_opt {
|
||||
match self.compute_peer_custody_groups(peer_id, custody_group_count) {
|
||||
Ok(custody_groups) => {
|
||||
let custody_subnets = custody_groups
|
||||
.into_iter()
|
||||
.flat_map(|custody_index| {
|
||||
self.subnets_by_custody_group
|
||||
.get(&custody_index)
|
||||
.cloned()
|
||||
.unwrap_or_else(|| {
|
||||
warn!(
|
||||
self.log,
|
||||
"Custody group not found in subnet mapping";
|
||||
"custody_index" => custody_index,
|
||||
"peer_id" => %peer_id
|
||||
);
|
||||
vec![]
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
peer_info.set_custody_subnets(custody_subnets);
|
||||
}
|
||||
Err(err) => {
|
||||
debug!(self.log, "Unable to compute peer custody subnets from metadata";
|
||||
debug!(self.log, "Unable to compute peer custody groups from metadata";
|
||||
"info" => "Sending goodbye to peer",
|
||||
"peer_id" => %peer_id,
|
||||
"custody_subnet_count" => custody_subnet_count,
|
||||
"custody_group_count" => custody_group_count,
|
||||
"error" => ?err,
|
||||
);
|
||||
invalid_meta_data = true;
|
||||
@@ -1312,7 +1350,7 @@ impl<E: EthSpec> PeerManager<E> {
|
||||
let mut inbound_ipv4_peers_connected: usize = 0;
|
||||
let mut inbound_ipv6_peers_connected: usize = 0;
|
||||
let mut peers_connected_multi: HashMap<(&str, &str), i32> = HashMap::new();
|
||||
let mut peers_per_custody_subnet_count: HashMap<u64, i64> = HashMap::new();
|
||||
let mut peers_per_custody_group_count: HashMap<u64, i64> = HashMap::new();
|
||||
|
||||
for (_, peer_info) in self.network_globals.peers.read().connected_peers() {
|
||||
peers_connected += 1;
|
||||
@@ -1345,8 +1383,8 @@ impl<E: EthSpec> PeerManager<E> {
|
||||
.or_default() += 1;
|
||||
|
||||
if let Some(MetaData::V3(meta_data)) = peer_info.meta_data() {
|
||||
*peers_per_custody_subnet_count
|
||||
.entry(meta_data.custody_subnet_count)
|
||||
*peers_per_custody_group_count
|
||||
.entry(meta_data.custody_group_count)
|
||||
.or_default() += 1;
|
||||
}
|
||||
// Check if incoming peer is ipv4
|
||||
@@ -1377,11 +1415,11 @@ impl<E: EthSpec> PeerManager<E> {
|
||||
// PEERS_CONNECTED
|
||||
metrics::set_gauge(&metrics::PEERS_CONNECTED, peers_connected);
|
||||
|
||||
// CUSTODY_SUBNET_COUNT
|
||||
for (custody_subnet_count, peer_count) in peers_per_custody_subnet_count.into_iter() {
|
||||
// CUSTODY_GROUP_COUNT
|
||||
for (custody_group_count, peer_count) in peers_per_custody_group_count.into_iter() {
|
||||
metrics::set_gauge_vec(
|
||||
&metrics::PEERS_PER_CUSTODY_SUBNET_COUNT,
|
||||
&[&custody_subnet_count.to_string()],
|
||||
&metrics::PEERS_PER_CUSTODY_GROUP_COUNT,
|
||||
&[&custody_group_count.to_string()],
|
||||
peer_count,
|
||||
)
|
||||
}
|
||||
@@ -1410,43 +1448,27 @@ impl<E: EthSpec> PeerManager<E> {
|
||||
}
|
||||
}
|
||||
|
||||
fn compute_peer_custody_subnets(
|
||||
fn compute_peer_custody_groups(
|
||||
&self,
|
||||
peer_id: &PeerId,
|
||||
custody_subnet_count: u64,
|
||||
) -> Result<HashSet<DataColumnSubnetId>, String> {
|
||||
custody_group_count: u64,
|
||||
) -> Result<HashSet<CustodyIndex>, String> {
|
||||
// If we don't have a node id, we cannot compute the custody duties anyway
|
||||
let node_id = peer_id_to_node_id(peer_id)?;
|
||||
let spec = &self.network_globals.spec;
|
||||
|
||||
if !(spec.custody_requirement..=spec.data_column_sidecar_subnet_count)
|
||||
.contains(&custody_subnet_count)
|
||||
if !(spec.custody_requirement..=spec.number_of_custody_groups)
|
||||
.contains(&custody_group_count)
|
||||
{
|
||||
return Err("Invalid custody subnet count in metadata: out of range".to_string());
|
||||
return Err("Invalid custody group count in metadata: out of range".to_string());
|
||||
}
|
||||
|
||||
let custody_subnets = DataColumnSubnetId::compute_custody_subnets::<E>(
|
||||
node_id.raw(),
|
||||
custody_subnet_count,
|
||||
spec,
|
||||
)
|
||||
.map(|subnets| subnets.collect())
|
||||
.unwrap_or_else(|e| {
|
||||
// This is an unreachable scenario unless there's a bug, as we've validated the csc
|
||||
// just above.
|
||||
error!(
|
||||
self.log,
|
||||
"Computing peer custody subnets failed unexpectedly";
|
||||
"info" => "Falling back to default custody requirement subnets",
|
||||
"peer_id" => %peer_id,
|
||||
"custody_subnet_count" => custody_subnet_count,
|
||||
"error" => ?e
|
||||
);
|
||||
DataColumnSubnetId::compute_custody_requirement_subnets::<E>(node_id.raw(), spec)
|
||||
.collect()
|
||||
});
|
||||
|
||||
Ok(custody_subnets)
|
||||
get_custody_groups(node_id.raw(), custody_group_count, spec).map_err(|e| {
|
||||
format!(
|
||||
"Error computing peer custody groups for node {} with cgc={}: {:?}",
|
||||
node_id, custody_group_count, e
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY;
|
||||
use crate::discovery::enr::PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY;
|
||||
use crate::discovery::{peer_id_to_node_id, CombinedKey};
|
||||
use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Gossipsub, PeerId};
|
||||
use itertools::Itertools;
|
||||
@@ -13,6 +13,7 @@ use std::{
|
||||
fmt::Formatter,
|
||||
};
|
||||
use sync_status::SyncStatus;
|
||||
use types::data_column_custody_group::compute_subnets_for_node;
|
||||
use types::{ChainSpec, DataColumnSubnetId, EthSpec};
|
||||
|
||||
pub mod client;
|
||||
@@ -695,8 +696,8 @@ impl<E: EthSpec> PeerDB<E> {
|
||||
|
||||
if supernode {
|
||||
enr.insert(
|
||||
PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY,
|
||||
&spec.data_column_sidecar_subnet_count,
|
||||
PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY,
|
||||
&spec.number_of_custody_groups,
|
||||
&enr_key,
|
||||
)
|
||||
.expect("u64 can be encoded");
|
||||
@@ -714,19 +715,14 @@ impl<E: EthSpec> PeerDB<E> {
|
||||
if supernode {
|
||||
let peer_info = self.peers.get_mut(&peer_id).expect("peer exists");
|
||||
let all_subnets = (0..spec.data_column_sidecar_subnet_count)
|
||||
.map(|csc| csc.into())
|
||||
.map(|subnet_id| subnet_id.into())
|
||||
.collect();
|
||||
peer_info.set_custody_subnets(all_subnets);
|
||||
} else {
|
||||
let peer_info = self.peers.get_mut(&peer_id).expect("peer exists");
|
||||
let node_id = peer_id_to_node_id(&peer_id).expect("convert peer_id to node_id");
|
||||
let subnets = DataColumnSubnetId::compute_custody_subnets::<E>(
|
||||
node_id.raw(),
|
||||
spec.custody_requirement,
|
||||
spec,
|
||||
)
|
||||
.expect("should compute custody subnets")
|
||||
.collect();
|
||||
let subnets = compute_subnets_for_node(node_id.raw(), spec.custody_requirement, spec)
|
||||
.expect("should compute custody subnets");
|
||||
peer_info.set_custody_subnets(subnets);
|
||||
}
|
||||
|
||||
|
||||
@@ -89,7 +89,7 @@ impl<E: EthSpec> PeerInfo<E> {
|
||||
}
|
||||
|
||||
/// Returns if the peer is subscribed to a given `Subnet` from the metadata attnets/syncnets field.
|
||||
/// Also returns true if the peer is assigned to custody a given data column `Subnet` computed from the metadata `custody_column_count` field or ENR `csc` field.
|
||||
/// Also returns true if the peer is assigned to custody a given data column `Subnet` computed from the metadata `custody_group_count` field or ENR `cgc` field.
|
||||
pub fn on_subnet_metadata(&self, subnet: &Subnet) -> bool {
|
||||
if let Some(meta_data) = &self.meta_data {
|
||||
match subnet {
|
||||
@@ -101,7 +101,9 @@ impl<E: EthSpec> PeerInfo<E> {
|
||||
.syncnets()
|
||||
.is_ok_and(|s| s.get(**id as usize).unwrap_or(false))
|
||||
}
|
||||
Subnet::DataColumn(column) => return self.custody_subnets.contains(column),
|
||||
Subnet::DataColumn(subnet_id) => {
|
||||
return self.is_assigned_to_custody_subnet(subnet_id)
|
||||
}
|
||||
}
|
||||
}
|
||||
false
|
||||
|
||||
@@ -1139,7 +1139,7 @@ mod tests {
|
||||
seq_number: 1,
|
||||
attnets: EnrAttestationBitfield::<Spec>::default(),
|
||||
syncnets: EnrSyncCommitteeBitfield::<Spec>::default(),
|
||||
custody_subnet_count: 1,
|
||||
custody_group_count: 1,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -138,7 +138,7 @@ pub struct MetaData<E: EthSpec> {
|
||||
#[superstruct(only(V2, V3))]
|
||||
pub syncnets: EnrSyncCommitteeBitfield<E>,
|
||||
#[superstruct(only(V3))]
|
||||
pub custody_subnet_count: u64,
|
||||
pub custody_group_count: u64,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> MetaData<E> {
|
||||
@@ -181,13 +181,13 @@ impl<E: EthSpec> MetaData<E> {
|
||||
seq_number: metadata.seq_number,
|
||||
attnets: metadata.attnets.clone(),
|
||||
syncnets: Default::default(),
|
||||
custody_subnet_count: spec.custody_requirement,
|
||||
custody_group_count: spec.custody_requirement,
|
||||
}),
|
||||
MetaData::V2(metadata) => MetaData::V3(MetaDataV3 {
|
||||
seq_number: metadata.seq_number,
|
||||
attnets: metadata.attnets.clone(),
|
||||
syncnets: metadata.syncnets.clone(),
|
||||
custody_subnet_count: spec.custody_requirement,
|
||||
custody_group_count: spec.custody_requirement,
|
||||
}),
|
||||
md @ MetaData::V3(_) => md.clone(),
|
||||
}
|
||||
@@ -364,7 +364,7 @@ impl DataColumnsByRangeRequest {
|
||||
DataColumnsByRangeRequest {
|
||||
start_slot: 0,
|
||||
count: 0,
|
||||
columns: vec![0; spec.number_of_columns],
|
||||
columns: vec![0; spec.number_of_columns as usize],
|
||||
}
|
||||
.as_ssz_bytes()
|
||||
.len()
|
||||
|
||||
@@ -198,15 +198,12 @@ impl<E: EthSpec> Network<E> {
|
||||
)?;
|
||||
|
||||
// Construct the metadata
|
||||
let custody_subnet_count = ctx.chain_spec.is_peer_das_scheduled().then(|| {
|
||||
if config.subscribe_all_data_column_subnets {
|
||||
ctx.chain_spec.data_column_sidecar_subnet_count
|
||||
} else {
|
||||
ctx.chain_spec.custody_requirement
|
||||
}
|
||||
let custody_group_count = ctx.chain_spec.is_peer_das_scheduled().then(|| {
|
||||
ctx.chain_spec
|
||||
.custody_group_count(config.subscribe_all_data_column_subnets)
|
||||
});
|
||||
let meta_data =
|
||||
utils::load_or_build_metadata(&config.network_dir, custody_subnet_count, &log);
|
||||
utils::load_or_build_metadata(&config.network_dir, custody_group_count, &log);
|
||||
let seq_number = *meta_data.seq_number();
|
||||
let globals = NetworkGlobals::new(
|
||||
enr,
|
||||
|
||||
@@ -164,8 +164,8 @@ pub fn strip_peer_id(addr: &mut Multiaddr) {
|
||||
|
||||
/// Load metadata from persisted file. Return default metadata if loading fails.
|
||||
pub fn load_or_build_metadata<E: EthSpec>(
|
||||
network_dir: &std::path::Path,
|
||||
custody_subnet_count: Option<u64>,
|
||||
network_dir: &Path,
|
||||
custody_group_count_opt: Option<u64>,
|
||||
log: &slog::Logger,
|
||||
) -> MetaData<E> {
|
||||
// We load a V2 metadata version by default (regardless of current fork)
|
||||
@@ -216,12 +216,12 @@ pub fn load_or_build_metadata<E: EthSpec>(
|
||||
};
|
||||
|
||||
// Wrap the MetaData
|
||||
let meta_data = if let Some(custody_count) = custody_subnet_count {
|
||||
let meta_data = if let Some(custody_group_count) = custody_group_count_opt {
|
||||
MetaData::V3(MetaDataV3 {
|
||||
attnets: meta_data.attnets,
|
||||
seq_number: meta_data.seq_number,
|
||||
syncnets: meta_data.syncnets,
|
||||
custody_subnet_count: custody_count,
|
||||
custody_group_count,
|
||||
})
|
||||
} else {
|
||||
MetaData::V2(meta_data)
|
||||
@@ -286,8 +286,8 @@ pub(crate) fn save_metadata_to_disk<E: EthSpec>(
|
||||
) {
|
||||
let _ = std::fs::create_dir_all(dir);
|
||||
// We always store the metadata v2 to disk because
|
||||
// custody_subnet_count parameter doesn't need to be persisted across runs.
|
||||
// custody_subnet_count is what the user sets it for the current run.
|
||||
// custody_group_count parameter doesn't need to be persisted across runs.
|
||||
// custody_group_count is what the user sets it for the current run.
|
||||
// This is to prevent ugly branching logic when reading the metadata from disk.
|
||||
let metadata_bytes = metadata.metadata_v2().as_ssz_bytes();
|
||||
match File::create(dir.join(METADATA_FILENAME)).and_then(|mut f| f.write_all(&metadata_bytes)) {
|
||||
|
||||
@@ -3,10 +3,13 @@ use crate::peer_manager::peerdb::PeerDB;
|
||||
use crate::rpc::{MetaData, MetaDataV3};
|
||||
use crate::types::{BackFillState, SyncState};
|
||||
use crate::{Client, Enr, EnrExt, GossipTopic, Multiaddr, NetworkConfig, PeerId};
|
||||
use itertools::Itertools;
|
||||
use parking_lot::RwLock;
|
||||
use slog::error;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use types::data_column_custody_group::{
|
||||
compute_columns_for_custody_group, compute_subnets_from_custody_group, get_custody_groups,
|
||||
};
|
||||
use types::{ChainSpec, ColumnIndex, DataColumnSubnetId, EthSpec};
|
||||
|
||||
pub struct NetworkGlobals<E: EthSpec> {
|
||||
@@ -27,8 +30,8 @@ pub struct NetworkGlobals<E: EthSpec> {
|
||||
/// The current state of the backfill sync.
|
||||
pub backfill_state: RwLock<BackFillState>,
|
||||
/// The computed sampling subnets and columns is stored to avoid re-computing.
|
||||
pub sampling_subnets: Vec<DataColumnSubnetId>,
|
||||
pub sampling_columns: Vec<ColumnIndex>,
|
||||
pub sampling_subnets: HashSet<DataColumnSubnetId>,
|
||||
pub sampling_columns: HashSet<ColumnIndex>,
|
||||
/// Network-related configuration. Immutable after initialization.
|
||||
pub config: Arc<NetworkConfig>,
|
||||
/// Ethereum chain configuration. Immutable after initialization.
|
||||
@@ -48,30 +51,43 @@ impl<E: EthSpec> NetworkGlobals<E> {
|
||||
let (sampling_subnets, sampling_columns) = if spec.is_peer_das_scheduled() {
|
||||
let node_id = enr.node_id().raw();
|
||||
|
||||
let custody_subnet_count = local_metadata
|
||||
.custody_subnet_count()
|
||||
.copied()
|
||||
.expect("custody subnet count must be set if PeerDAS is scheduled");
|
||||
let custody_group_count = match local_metadata.custody_group_count() {
|
||||
Ok(&cgc) if cgc <= spec.number_of_custody_groups => cgc,
|
||||
_ => {
|
||||
error!(
|
||||
log,
|
||||
"custody_group_count from metadata is either invalid or not set. This is a bug!";
|
||||
"info" => "falling back to default custody requirement"
|
||||
);
|
||||
spec.custody_requirement
|
||||
}
|
||||
};
|
||||
|
||||
let subnet_sampling_size = std::cmp::max(custody_subnet_count, spec.samples_per_slot);
|
||||
// The below `expect` calls will panic on start up if the chain spec config values used
|
||||
// are invalid
|
||||
let sampling_size = spec
|
||||
.sampling_size(custody_group_count)
|
||||
.expect("should compute node sampling size from valid chain spec");
|
||||
let custody_groups = get_custody_groups(node_id, sampling_size, &spec)
|
||||
.expect("should compute node custody groups");
|
||||
|
||||
let sampling_subnets = DataColumnSubnetId::compute_custody_subnets::<E>(
|
||||
node_id,
|
||||
subnet_sampling_size,
|
||||
&spec,
|
||||
)
|
||||
.expect("sampling subnet count must be valid")
|
||||
.collect::<Vec<_>>();
|
||||
let mut sampling_subnets = HashSet::new();
|
||||
for custody_index in &custody_groups {
|
||||
let subnets = compute_subnets_from_custody_group(*custody_index, &spec)
|
||||
.expect("should compute custody subnets for node");
|
||||
sampling_subnets.extend(subnets);
|
||||
}
|
||||
|
||||
let sampling_columns = sampling_subnets
|
||||
.iter()
|
||||
.flat_map(|subnet| subnet.columns::<E>(&spec))
|
||||
.sorted()
|
||||
.collect();
|
||||
let mut sampling_columns = HashSet::new();
|
||||
for custody_index in &custody_groups {
|
||||
let columns = compute_columns_for_custody_group(*custody_index, &spec)
|
||||
.expect("should compute custody columns for node");
|
||||
sampling_columns.extend(columns);
|
||||
}
|
||||
|
||||
(sampling_subnets, sampling_columns)
|
||||
} else {
|
||||
(vec![], vec![])
|
||||
(HashSet::new(), HashSet::new())
|
||||
};
|
||||
|
||||
NetworkGlobals {
|
||||
@@ -159,8 +175,8 @@ impl<E: EthSpec> NetworkGlobals<E> {
|
||||
pub fn custody_peers_for_column(&self, column_index: ColumnIndex) -> Vec<PeerId> {
|
||||
self.peers
|
||||
.read()
|
||||
.good_custody_subnet_peer(DataColumnSubnetId::from_column_index::<E>(
|
||||
column_index as usize,
|
||||
.good_custody_subnet_peer(DataColumnSubnetId::from_column_index(
|
||||
column_index,
|
||||
&self.spec,
|
||||
))
|
||||
.cloned()
|
||||
@@ -178,7 +194,7 @@ impl<E: EthSpec> NetworkGlobals<E> {
|
||||
seq_number: 0,
|
||||
attnets: Default::default(),
|
||||
syncnets: Default::default(),
|
||||
custody_subnet_count: spec.custody_requirement,
|
||||
custody_group_count: spec.custody_requirement,
|
||||
});
|
||||
Self::new_test_globals_with_metadata(trusted_peers, metadata, log, config, spec)
|
||||
}
|
||||
@@ -209,9 +225,9 @@ mod test {
|
||||
let mut spec = E::default_spec();
|
||||
spec.eip7594_fork_epoch = Some(Epoch::new(0));
|
||||
|
||||
let custody_subnet_count = spec.data_column_sidecar_subnet_count / 2;
|
||||
let subnet_sampling_size = std::cmp::max(custody_subnet_count, spec.samples_per_slot);
|
||||
let metadata = get_metadata(custody_subnet_count);
|
||||
let custody_group_count = spec.number_of_custody_groups / 2;
|
||||
let subnet_sampling_size = spec.sampling_size(custody_group_count).unwrap();
|
||||
let metadata = get_metadata(custody_group_count);
|
||||
let config = Arc::new(NetworkConfig::default());
|
||||
|
||||
let globals = NetworkGlobals::<E>::new_test_globals_with_metadata(
|
||||
@@ -233,9 +249,9 @@ mod test {
|
||||
let mut spec = E::default_spec();
|
||||
spec.eip7594_fork_epoch = Some(Epoch::new(0));
|
||||
|
||||
let custody_subnet_count = spec.data_column_sidecar_subnet_count / 2;
|
||||
let subnet_sampling_size = std::cmp::max(custody_subnet_count, spec.samples_per_slot);
|
||||
let metadata = get_metadata(custody_subnet_count);
|
||||
let custody_group_count = spec.number_of_custody_groups / 2;
|
||||
let subnet_sampling_size = spec.sampling_size(custody_group_count).unwrap();
|
||||
let metadata = get_metadata(custody_group_count);
|
||||
let config = Arc::new(NetworkConfig::default());
|
||||
|
||||
let globals = NetworkGlobals::<E>::new_test_globals_with_metadata(
|
||||
@@ -251,12 +267,12 @@ mod test {
|
||||
);
|
||||
}
|
||||
|
||||
fn get_metadata(custody_subnet_count: u64) -> MetaData<E> {
|
||||
fn get_metadata(custody_group_count: u64) -> MetaData<E> {
|
||||
MetaData::V3(MetaDataV3 {
|
||||
seq_number: 0,
|
||||
attnets: Default::default(),
|
||||
syncnets: Default::default(),
|
||||
custody_subnet_count,
|
||||
custody_group_count,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user