Track CGC updates

This commit is contained in:
dapplion
2025-03-28 14:47:19 -03:00
parent ced8910c09
commit 64bb7a4f55
5 changed files with 61 additions and 11 deletions

View File

@@ -14,7 +14,7 @@ use lighthouse_network::{
behaviour::{ConnectionEstablished, FromSwarm}, behaviour::{ConnectionEstablished, FromSwarm},
ConnectionId, NetworkBehaviour, ConnectionId, NetworkBehaviour,
}, },
types::SyncState, types::{CGCUpdates, SyncState},
ConnectedPoint, Enr, NetworkConfig, NetworkGlobals, PeerId, PeerManager, ConnectedPoint, Enr, NetworkConfig, NetworkGlobals, PeerId, PeerManager,
}; };
use network::{NetworkReceivers, NetworkSenders}; use network::{NetworkReceivers, NetworkSenders};
@@ -140,8 +140,10 @@ pub async fn create_api_server_with_config<T: BeaconChainTypes>(
let enr_key = CombinedKey::generate_secp256k1(); let enr_key = CombinedKey::generate_secp256k1();
let enr = Enr::builder().build(&enr_key).unwrap(); let enr = Enr::builder().build(&enr_key).unwrap();
let network_config = Arc::new(NetworkConfig::default()); let network_config = Arc::new(NetworkConfig::default());
let cgc_updates = CGCUpdates::new(chain.spec.custody_requirement);
let network_globals = Arc::new(NetworkGlobals::new( let network_globals = Arc::new(NetworkGlobals::new(
enr.clone(), enr.clone(),
cgc_updates,
vec![], vec![],
false, false,
network_config, network_config,

View File

@@ -1194,6 +1194,7 @@ impl<E: EthSpec> Discovery<E> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::types::CGCUpdates;
use libp2p::identity::secp256k1; use libp2p::identity::secp256k1;
use types::{BitVector, MinimalEthSpec, SubnetId}; use types::{BitVector, MinimalEthSpec, SubnetId};
@@ -1207,7 +1208,15 @@ mod tests {
let config = Arc::new(config); let config = Arc::new(config);
let enr_key: CombinedKey = CombinedKey::from_secp256k1(&keypair); let enr_key: CombinedKey = CombinedKey::from_secp256k1(&keypair);
let enr: Enr = build_enr::<E>(&enr_key, &config, &EnrForkId::default(), &spec).unwrap(); let enr: Enr = build_enr::<E>(&enr_key, &config, &EnrForkId::default(), &spec).unwrap();
let globals = NetworkGlobals::new(enr, vec![], false, config.clone(), spec.clone()); let cgc_updates = CGCUpdates::new(spec.custody_requirement);
let globals = NetworkGlobals::new(
enr,
cgc_updates,
vec![],
false,
config.clone(),
spec.clone(),
);
let keypair = keypair.into(); let keypair = keypair.into();
Discovery::new(keypair, &config, Arc::new(globals), &spec) Discovery::new(keypair, &config, Arc::new(globals), &spec)
.await .await

View File

@@ -15,7 +15,7 @@ use crate::rpc::{
}; };
use crate::types::{ use crate::types::{
all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash, all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, CGCUpdates, GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery,
}; };
use crate::EnrExt; use crate::EnrExt;
use crate::Eth2Enr; use crate::Eth2Enr;
@@ -196,9 +196,16 @@ impl<E: EthSpec> Network<E> {
&ctx.chain_spec, &ctx.chain_spec,
)?; )?;
// TODO: Load from disk, and check consistency with DB somewhere
let initial_cgc = ctx
.chain_spec
.custody_group_count(config.subscribe_all_data_column_subnets);
let cgc_updates = CGCUpdates::new(initial_cgc);
// Construct the metadata // Construct the metadata
let globals = NetworkGlobals::new( let globals = NetworkGlobals::new(
enr, enr,
cgc_updates,
trusted_peers, trusted_peers,
config.disable_peer_scoring, config.disable_peer_scoring,
config.clone(), config.clone(),

View File

@@ -32,20 +32,23 @@ pub struct NetworkGlobals<E: EthSpec> {
all_sampling_subnets: Vec<DataColumnSubnetId>, all_sampling_subnets: Vec<DataColumnSubnetId>,
all_sampling_columns: Vec<ColumnIndex>, all_sampling_columns: Vec<ColumnIndex>,
/// Dynamic custody group count (CGC) /// Dynamic custody group count (CGC)
custody_group_count: RwLock<CustodyGroupCount>, cgc_updates: RwLock<CGCUpdates>,
/// Network-related configuration. Immutable after initialization. /// Network-related configuration. Immutable after initialization.
pub config: Arc<NetworkConfig>, pub config: Arc<NetworkConfig>,
/// Ethereum chain configuration. Immutable after initialization. /// Ethereum chain configuration. Immutable after initialization.
pub spec: Arc<ChainSpec>, pub spec: Arc<ChainSpec>,
} }
struct CustodyGroupCount { pub struct CGCUpdates {
value: u64, initial_value: u64,
updates: Vec<(Slot, u64)>,
// TODO(das): Track backfilled CGC
} }
impl<E: EthSpec> NetworkGlobals<E> { impl<E: EthSpec> NetworkGlobals<E> {
pub fn new( pub fn new(
enr: Enr, enr: Enr,
cgc_updates: CGCUpdates,
trusted_peers: Vec<PeerId>, trusted_peers: Vec<PeerId>,
disable_peer_scoring: bool, disable_peer_scoring: bool,
config: Arc<NetworkConfig>, config: Arc<NetworkConfig>,
@@ -82,7 +85,7 @@ impl<E: EthSpec> NetworkGlobals<E> {
backfill_state: RwLock::new(BackFillState::Paused), backfill_state: RwLock::new(BackFillState::Paused),
all_sampling_subnets, all_sampling_subnets,
all_sampling_columns, all_sampling_columns,
custody_group_count: RwLock::new(CustodyGroupCount { value: 0 }), cgc_updates: RwLock::new(cgc_updates),
config, config,
spec, spec,
} }
@@ -157,8 +160,7 @@ impl<E: EthSpec> NetworkGlobals<E> {
/// Returns the custody group count (CGC) /// Returns the custody group count (CGC)
fn custody_group_count(&self, slot: Slot) -> u64 { fn custody_group_count(&self, slot: Slot) -> u64 {
let cgc = self.custody_group_count.read().value; self.cgc_updates.read().at_slot(slot)
todo!("CGC at slot {slot} {cgc}");
} }
/// Returns the count of custody columns this node must sample for block import /// Returns the count of custody columns this node must sample for block import
@@ -169,6 +171,11 @@ impl<E: EthSpec> NetworkGlobals<E> {
.expect("should compute node sampling size from valid chain spec") .expect("should compute node sampling size from valid chain spec")
} }
/// Adds a new CGC value update
pub fn add_cgc_update(&self, update: (Slot, u64)) {
self.cgc_updates.write().add_latest_update(update);
}
/// Returns the number of libp2p connected peers. /// Returns the number of libp2p connected peers.
pub fn connected_peers(&self) -> usize { pub fn connected_peers(&self) -> usize {
self.peers.read().connected_peer_ids().count() self.peers.read().connected_peer_ids().count()
@@ -266,7 +273,32 @@ impl<E: EthSpec> NetworkGlobals<E> {
let keypair = libp2p::identity::secp256k1::Keypair::generate(); let keypair = libp2p::identity::secp256k1::Keypair::generate();
let enr_key: discv5::enr::CombinedKey = discv5::enr::CombinedKey::from_secp256k1(&keypair); let enr_key: discv5::enr::CombinedKey = discv5::enr::CombinedKey::from_secp256k1(&keypair);
let enr = discv5::enr::Enr::builder().build(&enr_key).unwrap(); let enr = discv5::enr::Enr::builder().build(&enr_key).unwrap();
NetworkGlobals::new(enr, trusted_peers, false, config, spec) let cgc_updates = CGCUpdates::new(spec.custody_requirement);
NetworkGlobals::new(enr, cgc_updates, trusted_peers, false, config, spec)
}
}
impl CGCUpdates {
pub fn new(initial_value: u64) -> Self {
Self {
initial_value,
updates: vec![],
}
}
fn at_slot(&self, slot: Slot) -> u64 {
// TODO: Test and fix logic
for (update_slot, cgc) in &self.updates {
if slot > *update_slot {
return *cgc;
}
}
self.initial_value
}
fn add_latest_update(&mut self, update: (Slot, u64)) {
self.updates.push(update);
} }
} }

View File

@@ -11,7 +11,7 @@ pub type EnrSyncCommitteeBitfield<E> = BitVector<<E as EthSpec>::SyncCommitteeSu
pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>; pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;
pub use eth2::lighthouse::sync_state::{BackFillState, SyncState}; pub use eth2::lighthouse::sync_state::{BackFillState, SyncState};
pub use globals::NetworkGlobals; pub use globals::{CGCUpdates, NetworkGlobals};
pub use pubsub::{PubsubMessage, SnappyTransform}; pub use pubsub::{PubsubMessage, SnappyTransform};
pub use subnet::{Subnet, SubnetDiscovery}; pub use subnet::{Subnet, SubnetDiscovery};
pub use topics::{ pub use topics::{