diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index a0a57ce8c6..c439583d55 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -9,7 +9,7 @@ use parking_lot::RwLock; use std::collections::HashSet; use std::sync::Arc; use types::data_column_custody_group::{ - compute_columns_for_custody_group, compute_subnets_from_custody_group, get_custody_groups, + compute_columns_from_custody_groups, compute_subnets_from_custody_groups, get_custody_groups, }; use types::{CGCUpdates, ChainSpec, ColumnIndex, DataColumnSubnetId, EthSpec, Slot}; @@ -28,9 +28,10 @@ pub struct NetworkGlobals<E: EthSpec> { pub sync_state: RwLock<SyncState>, /// The current state of the backfill sync. pub backfill_state: RwLock<BackFillState>, - /// The computed sampling subnets and columns is stored to avoid re-computing. - all_sampling_subnets: Vec<DataColumnSubnetId>, - all_sampling_columns: Vec<ColumnIndex>, + /// The computed custody groups cached to avoid re-computing. + custody_groups_max_cgc: Vec<u64>, + sampling_columns_max_cgc: Vec<ColumnIndex>, + sampling_subnets_max_cgc: Vec<DataColumnSubnetId>, /// Dynamic custody group count (CGC) cgc_updates: RwLock<CGCUpdates>, /// Network-related configuration. Immutable after initialization.
@@ -52,22 +53,13 @@ impl<E: EthSpec> NetworkGlobals<E> { // The below `expect` calls will panic on start up if the chain spec config values used // are invalid - let custody_groups = get_custody_groups(node_id, spec.number_of_custody_groups, &spec) - .expect("should compute node custody groups"); - - let mut all_sampling_subnets = vec![]; - for custody_index in &custody_groups { - let subnets = compute_subnets_from_custody_group(*custody_index, &spec) - .expect("should compute custody subnets for node"); - all_sampling_subnets.extend(subnets); - } - - let mut all_sampling_columns = vec![]; - for custody_index in &custody_groups { - let columns = compute_columns_for_custody_group(*custody_index, &spec) - .expect("should compute custody columns for node"); - all_sampling_columns.extend(columns); - } + let custody_groups_max_cgc = + get_custody_groups(node_id, spec.number_of_custody_groups, &spec) + .expect("should compute node custody groups"); + let sampling_columns_max_cgc = + compute_columns_from_custody_groups(&custody_groups_max_cgc, &spec).collect::<Vec<_>>(); + let sampling_subnets_max_cgc = + compute_subnets_from_custody_groups(&custody_groups_max_cgc, &spec).collect::<Vec<_>>(); Ok(NetworkGlobals { local_enr: RwLock::new(LocalMetadata::new(enr.clone(), &spec)?), @@ -77,8 +69,9 @@ impl<E: EthSpec> NetworkGlobals<E> { gossipsub_subscriptions: RwLock::new(HashSet::new()), sync_state: RwLock::new(SyncState::Stalled), backfill_state: RwLock::new(BackFillState::Paused), - all_sampling_subnets, - all_sampling_columns, + custody_groups_max_cgc, + sampling_columns_max_cgc, + sampling_subnets_max_cgc, cgc_updates: RwLock::new(cgc_updates), config, spec, @@ -124,14 +117,24 @@ impl<E: EthSpec> NetworkGlobals<E> { } pub fn sampling_columns(&self, slot: Slot) -> &[ColumnIndex] { - let cgc = self.custody_group_count(slot) as usize; - // Returns as many elements as possible, can't panic as it's upper bounded by len - &self.all_sampling_columns[..self.all_sampling_columns.len().min(cgc)] + let cgc = self.custody_group_count(slot); +
self.sampling_columns_for_cgc(cgc) + } + + pub fn custody_groups_for_cgc(&self, cgc: u64) -> &[u64] { + &self.custody_groups_max_cgc[..self.custody_groups_max_cgc.len().min(cgc as usize)] } pub fn sampling_subnets_for_cgc(&self, cgc: u64) -> &[DataColumnSubnetId] { - // Returns as many elements as possible, can't panic as it's upper bounded by len - &self.all_sampling_subnets[..self.all_sampling_subnets.len().min(cgc as usize)] + // TODO(das): scale this index if custody_groups != subnet_count != column_count + let index = cgc as usize; + &self.sampling_subnets_max_cgc[..self.sampling_subnets_max_cgc.len().min(index)] + } + + pub fn sampling_columns_for_cgc(&self, cgc: u64) -> &[ColumnIndex] { + // TODO(das): scale this index if custody_groups != subnet_count != column_count + let index = cgc as usize; + &self.sampling_columns_max_cgc[..self.sampling_columns_max_cgc.len().min(index)] } /// Returns the custody group count (CGC) diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 8873b8819f..9f98cda6e8 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -26,6 +26,7 @@ use lighthouse_network::{ use logging::crit; use std::collections::BTreeSet; use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration}; +use store::metadata::CustodyInfo; use store::HotColdDB; use strum::IntoStaticStr; use task_executor::ShutdownReason; @@ -238,6 +239,7 @@ impl NetworkService { // get a reference to the beacon chain store let store = beacon_chain.store.clone(); + let spec = beacon_chain.spec.clone(); // build the current enr_fork_id for adding to our local ENR let enr_fork_id = beacon_chain.enr_fork_id(); @@ -247,15 +249,13 @@ impl NetworkService { let next_fork_subscriptions = Box::pin(next_fork_subscriptions_delay(&beacon_chain).into()); let next_unsubscribe = Box::pin(None.into()); - let current_slot = beacon_chain - .slot() - .unwrap_or(beacon_chain.spec.genesis_slot); + let current_slot = 
beacon_chain.slot().unwrap_or(spec.genesis_slot); // Create a fork context for the given config and genesis validators root let fork_context = Arc::new(ForkContext::new::( current_slot, beacon_chain.genesis_validators_root, - &beacon_chain.spec, + &spec, )); debug!(fork_name = ?fork_context.current_fork(), "Current fork"); @@ -265,17 +265,10 @@ impl NetworkService { config: config.clone(), enr_fork_id, fork_context: fork_context.clone(), - chain_spec: beacon_chain.spec.clone(), + chain_spec: spec.clone(), libp2p_registry, }; - if let Some(_disk_custody_info) = store - .get_custody_info() - .map_err(|e| format!("Unable to read custody info from the DB: {e:?}"))? - { - // TODO(das): check that list of columns is compatible - } - // If there are no stored disk_cgc_updates `Network::new` will default to empty ones with // the minimum CGC. let initial_cgc_updates = store @@ -286,6 +279,58 @@ impl NetworkService { let (mut libp2p, network_globals) = Network::new(executor.clone(), service_context, initial_cgc_updates).await?; + // Assert compatibility of the previous DB against the new PeerID and CGC + if let Some(disk_custody_info) = store + .get_custody_info() + .map_err(|e| format!("Unable to read custody info from the DB: {e:?}"))? + { + // TODO(das): only consider the cgc steps within the DA window + for (slot, cgc) in network_globals.dump_cgc_updates().iter() { + let custody_groups_disk = disk_custody_info + .custody_groups_for_cgc(cgc) + .iter() + .copied() + .collect::>(); + let custody_groups_now = network_globals + .custody_groups_for_cgc(cgc) + .iter() + .copied() + .collect::>(); + + if !custody_groups_disk.is_superset(&custody_groups_now) { + let error_message = format!( + "Incompatible database with current PeerID and config. + At slots > {slot} this node has a CGC of {cgc}. 
+ Custody groups on disk {custody_groups_disk:?} + != + Custody groups now {custody_groups_now:?}" ); // Only hard error after PeerDAS activation if spec .fork_name_at_slot::<T::EthSpec>(beacon_chain.head().head_slot()) .fulu_enabled() { return Err(error_message); } else { warn!( msg = error_message, "Incompatible PeerDAS database, after Fulu activation this will result in an error" ); } } } } else { // Write CustodyInfo only if there's none written before let custody_info_with_new_peerid = CustodyInfo::new( network_globals.custody_groups_for_cgc(spec.number_of_custody_groups), &spec, ) .map_err(|e| format!("Invalid custody groups for max CGC: {e:?}"))?; store .put_custody_info(&custody_info_with_new_peerid) .map_err(|e| format!("Unable to write custody info to the DB: {e:?}"))?; + } + // Repopulate the DHT with stored ENR's if discovery is not disabled. if !config.disable_discovery { let enrs_to_load = load_dht::<T::EthSpec, T::HotStore, T::ColdStore>(store.clone()); @@ -336,7 +381,7 @@ impl<T: BeaconChainTypes> NetworkService<T> { validator_subscription_recv, } = network_receivers; - let epoch_in_seconds = fork_context.spec.seconds_per_slot * T::EthSpec::slots_per_epoch(); + let epoch_in_seconds = spec.seconds_per_slot * T::EthSpec::slots_per_epoch(); // create the network service and spawn the task let network_service = NetworkService { diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 2f737319a7..5ac8c19931 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -2466,7 +2466,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold> } /// Store the given `custody_info` to disk.
- pub fn put_custody_info_in_batch(&self, custody_info: &CustodyInfo) -> Result<(), Error> { + pub fn put_custody_info(&self, custody_info: &CustodyInfo) -> Result<(), Error> { let kv_store_op = custody_info.as_kv_store_op(CUSTODY_INFO_KEY); self.hot_db.do_atomically(vec![kv_store_op]) } diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 1f73983a5b..fa7e475ea2 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -2,7 +2,7 @@ use crate::{DBColumn, Error, StoreItem}; use serde::{Deserialize, Serialize}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; -use types::{typenum::U4096, CGCUpdates, Checkpoint, Hash256, Slot, VariableList}; +use types::{typenum::U4096, CGCUpdates, ChainSpec, Checkpoint, Hash256, Slot, VariableList}; pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(22); @@ -252,10 +252,29 @@ impl StoreItem for DataColumnInfo { /// Database parameters relevant to data column sync. #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize)] pub struct CustodyInfo { - /// Given a PeerID, compute the set of custody columns with the maximum CGC value, then sort - /// them numerically. + /// Given a PeerID, compute the custody groups for the maximum CGC value. 
+ /// /// 4096 is a random max limit that will never be reached - pub ordered_custody_columns: VariableList<u64, U4096>, + custody_groups_max_cgc: VariableList<u64, U4096>, +} + +impl CustodyInfo { + pub fn new(custody_groups_max_cgc: &[u64], spec: &ChainSpec) -> Result<Self, String> { + if custody_groups_max_cgc.len() != spec.number_of_custody_groups as usize { + return Err(format!( + "custody_groups_max_cgc {} len != number_of_custody_groups", + custody_groups_max_cgc.len() + )); + } + Ok(Self { + custody_groups_max_cgc: VariableList::new(custody_groups_max_cgc.to_vec()) + .map_err(|e| format!("Max CGC > 4096: {e:?}"))?, + }) + } + + pub fn custody_groups_for_cgc(&self, cgc: u64) -> &[u64] { + &self.custody_groups_max_cgc[..self.custody_groups_max_cgc.len().min(cgc as usize)] + } } impl StoreItem for CustodyInfo { diff --git a/consensus/types/src/custody.rs b/consensus/types/src/custody.rs index 4fc0642615..f038a8589d 100644 --- a/consensus/types/src/custody.rs +++ b/consensus/types/src/custody.rs @@ -33,4 +33,8 @@ impl CGCUpdates { .push(update) .map_err(|e| format!("Updates list full: {e:?}")) } + + pub fn iter(&self) -> impl Iterator<Item = (Slot, u64)> + '_ { + std::iter::once((Slot::new(0), self.initial_value)).chain(self.updates.iter().copied()) + } } diff --git a/consensus/types/src/data_column_custody_group.rs b/consensus/types/src/data_column_custody_group.rs index 931dfe096a..2d727de91c 100644 --- a/consensus/types/src/data_column_custody_group.rs +++ b/consensus/types/src/data_column_custody_group.rs @@ -104,6 +104,33 @@ pub fn compute_subnets_from_custody_group( Ok(result) } +pub fn compute_subnets_from_custody_groups<'a>( + custody_groups: &'a [CustodyIndex], + spec: &'a ChainSpec, +) -> impl Iterator<Item = DataColumnSubnetId> + 'a { + custody_groups + .iter() + .flat_map(|custody_group| { + compute_columns_for_custody_group(*custody_group, spec) + .expect("max(custody_groups) < number_of_custody_groups") + .map(|column_index| DataColumnSubnetId::from_column_index(column_index, spec)) + }) + .unique() +} + +pub fn
compute_columns_from_custody_groups<'a>( + custody_groups: &'a [CustodyIndex], + spec: &'a ChainSpec, +) -> impl Iterator<Item = ColumnIndex> + 'a { + custody_groups + .iter() + .flat_map(|custody_group| { + compute_columns_for_custody_group(*custody_group, spec) + .expect("max(custody_groups) < number_of_custody_groups") + }) + .unique() +} + #[cfg(test)] mod test { use super::*;