Assert DB compatibility

This commit is contained in:
dapplion
2025-04-04 18:32:19 -03:00
parent f6fa2380c7
commit 2384e9659b
6 changed files with 143 additions and 45 deletions

View File

@@ -9,7 +9,7 @@ use parking_lot::RwLock;
use std::collections::HashSet;
use std::sync::Arc;
use types::data_column_custody_group::{
compute_columns_for_custody_group, compute_subnets_from_custody_group, get_custody_groups,
compute_columns_from_custody_groups, compute_subnets_from_custody_groups, get_custody_groups,
};
use types::{CGCUpdates, ChainSpec, ColumnIndex, DataColumnSubnetId, EthSpec, Slot};
@@ -28,9 +28,10 @@ pub struct NetworkGlobals<E: EthSpec> {
pub sync_state: RwLock<SyncState>,
/// The current state of the backfill sync.
pub backfill_state: RwLock<BackFillState>,
/// The computed sampling subnets and columns is stored to avoid re-computing.
all_sampling_subnets: Vec<DataColumnSubnetId>,
all_sampling_columns: Vec<ColumnIndex>,
/// The computed custody groups cached to avoid re-computing.
custody_groups_max_cgc: Vec<u64>,
sampling_columns_max_cgc: Vec<ColumnIndex>,
sampling_subnets_max_cgc: Vec<DataColumnSubnetId>,
/// Dynamic custody group count (CGC)
cgc_updates: RwLock<CGCUpdates>,
/// Network-related configuration. Immutable after initialization.
@@ -52,22 +53,13 @@ impl<E: EthSpec> NetworkGlobals<E> {
// The below `expect` calls will panic on start up if the chain spec config values used
// are invalid
let custody_groups = get_custody_groups(node_id, spec.number_of_custody_groups, &spec)
.expect("should compute node custody groups");
let mut all_sampling_subnets = vec![];
for custody_index in &custody_groups {
let subnets = compute_subnets_from_custody_group(*custody_index, &spec)
.expect("should compute custody subnets for node");
all_sampling_subnets.extend(subnets);
}
let mut all_sampling_columns = vec![];
for custody_index in &custody_groups {
let columns = compute_columns_for_custody_group(*custody_index, &spec)
.expect("should compute custody columns for node");
all_sampling_columns.extend(columns);
}
let custody_groups_max_cgc =
get_custody_groups(node_id, spec.number_of_custody_groups, &spec)
.expect("should compute node custody groups");
let sampling_columns_max_cgc =
compute_columns_from_custody_groups(&custody_groups_max_cgc, &spec).collect::<Vec<_>>();
let sampling_subnets_max_cgc =
compute_subnets_from_custody_groups(&custody_groups_max_cgc, &spec).collect::<Vec<_>>();
Ok(NetworkGlobals {
local_enr: RwLock::new(LocalMetadata::new(enr.clone(), &spec)?),
@@ -77,8 +69,9 @@ impl<E: EthSpec> NetworkGlobals<E> {
gossipsub_subscriptions: RwLock::new(HashSet::new()),
sync_state: RwLock::new(SyncState::Stalled),
backfill_state: RwLock::new(BackFillState::Paused),
all_sampling_subnets,
all_sampling_columns,
custody_groups_max_cgc,
sampling_columns_max_cgc,
sampling_subnets_max_cgc,
cgc_updates: RwLock::new(cgc_updates),
config,
spec,
@@ -124,14 +117,24 @@ impl<E: EthSpec> NetworkGlobals<E> {
}
pub fn sampling_columns(&self, slot: Slot) -> &[ColumnIndex] {
let cgc = self.custody_group_count(slot) as usize;
// Returns as many elements as possible, can't panic as it's upper bounded by len
&self.all_sampling_columns[..self.all_sampling_columns.len().min(cgc)]
let cgc = self.custody_group_count(slot);
self.sampling_columns_for_cgc(cgc)
}
/// Returns the first `cgc` custody groups of this node, taken from the list
/// precomputed at startup for the maximum CGC. The slice is clamped to the
/// available length, so an oversized `cgc` cannot panic.
pub fn custody_groups_for_cgc(&self, cgc: u64) -> &[u64] {
let end = self.custody_groups_max_cgc.len().min(cgc as usize);
&self.custody_groups_max_cgc[..end]
}
pub fn sampling_subnets_for_cgc(&self, cgc: u64) -> &[DataColumnSubnetId] {
// Returns as many elements as possible, can't panic as it's upper bounded by len
&self.all_sampling_subnets[..self.all_sampling_subnets.len().min(cgc as usize)]
// TODO(das): scale this index if custody_groups != subnet_count != column_count
let index = cgc as usize;
&self.sampling_subnets_max_cgc[..self.sampling_subnets_max_cgc.len().min(index)]
}
/// Returns the first `cgc` sampling columns from the precomputed max-CGC
/// list. Clamped to the list length, so an oversized `cgc` cannot panic.
pub fn sampling_columns_for_cgc(&self, cgc: u64) -> &[ColumnIndex] {
// TODO(das): scale this index if custody_groups != subnet_count != column_count
let end = self.sampling_columns_max_cgc.len().min(cgc as usize);
&self.sampling_columns_max_cgc[..end]
}
/// Returns the custody group count (CGC)

View File

@@ -26,6 +26,7 @@ use lighthouse_network::{
use logging::crit;
use std::collections::BTreeSet;
use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration};
use store::metadata::CustodyInfo;
use store::HotColdDB;
use strum::IntoStaticStr;
use task_executor::ShutdownReason;
@@ -238,6 +239,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
// get a reference to the beacon chain store
let store = beacon_chain.store.clone();
let spec = beacon_chain.spec.clone();
// build the current enr_fork_id for adding to our local ENR
let enr_fork_id = beacon_chain.enr_fork_id();
@@ -247,15 +249,13 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let next_fork_subscriptions = Box::pin(next_fork_subscriptions_delay(&beacon_chain).into());
let next_unsubscribe = Box::pin(None.into());
let current_slot = beacon_chain
.slot()
.unwrap_or(beacon_chain.spec.genesis_slot);
let current_slot = beacon_chain.slot().unwrap_or(spec.genesis_slot);
// Create a fork context for the given config and genesis validators root
let fork_context = Arc::new(ForkContext::new::<T::EthSpec>(
current_slot,
beacon_chain.genesis_validators_root,
&beacon_chain.spec,
&spec,
));
debug!(fork_name = ?fork_context.current_fork(), "Current fork");
@@ -265,17 +265,10 @@ impl<T: BeaconChainTypes> NetworkService<T> {
config: config.clone(),
enr_fork_id,
fork_context: fork_context.clone(),
chain_spec: beacon_chain.spec.clone(),
chain_spec: spec.clone(),
libp2p_registry,
};
if let Some(_disk_custody_info) = store
.get_custody_info()
.map_err(|e| format!("Unable to read custody info from the DB: {e:?}"))?
{
// TODO(das): check that list of columns is compatible
}
// If there are no stored disk_cgc_updates `Network::new` will default to empty ones with
// the minimum CGC.
let initial_cgc_updates = store
@@ -286,6 +279,58 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let (mut libp2p, network_globals) =
Network::new(executor.clone(), service_context, initial_cgc_updates).await?;
// Assert compatibility of the previous DB against the new PeerID and CGC
if let Some(disk_custody_info) = store
.get_custody_info()
.map_err(|e| format!("Unable to read custody info from the DB: {e:?}"))?
{
// TODO(das): only consider the cgc steps within the DA window
for (slot, cgc) in network_globals.dump_cgc_updates().iter() {
let custody_groups_disk = disk_custody_info
.custody_groups_for_cgc(cgc)
.iter()
.copied()
.collect::<HashSet<_>>();
let custody_groups_now = network_globals
.custody_groups_for_cgc(cgc)
.iter()
.copied()
.collect::<HashSet<_>>();
if !custody_groups_disk.is_superset(&custody_groups_now) {
let error_message = format!(
"Incompatible database with current PeerID and config.
At slots > {slot} this node has a CGC of {cgc}.
Custody groups on disk {custody_groups_disk:?}
!=
Custody groups now {custody_groups_now:?}"
);
// Only hard error after PeerDAS activation
if spec
.fork_name_at_slot::<T::EthSpec>(beacon_chain.head().head_slot())
.fulu_enabled()
{
return Err(error_message);
} else {
warn!(
msg = error_message,
"Incompatible PeerDAS database, after Fulu activation this will result in an error"
);
}
}
}
} else {
// Write CustodyInfo only if there's none written before
let custody_info_with_new_peerid = CustodyInfo::new(
network_globals.custody_groups_for_cgc(spec.number_of_custody_groups),
&spec,
)
.map_err(|e| format!("Invalid custody groups for max CGC: {e:?}"))?;
store
.put_custody_info(&custody_info_with_new_peerid)
.map_err(|e| format!("Unable to write custody info from the DB: {e:?}"))?;
}
// Repopulate the DHT with stored ENR's if discovery is not disabled.
if !config.disable_discovery {
let enrs_to_load = load_dht::<T::EthSpec, T::HotStore, T::ColdStore>(store.clone());
@@ -336,7 +381,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
validator_subscription_recv,
} = network_receivers;
let epoch_in_seconds = fork_context.spec.seconds_per_slot * T::EthSpec::slots_per_epoch();
let epoch_in_seconds = spec.seconds_per_slot * T::EthSpec::slots_per_epoch();
// create the network service and spawn the task
let network_service = NetworkService {

View File

@@ -2466,7 +2466,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
/// Store the given `custody_info` to disk.
pub fn put_custody_info_in_batch(&self, custody_info: &CustodyInfo) -> Result<(), Error> {
pub fn put_custody_info(&self, custody_info: &CustodyInfo) -> Result<(), Error> {
let kv_store_op = custody_info.as_kv_store_op(CUSTODY_INFO_KEY);
self.hot_db.do_atomically(vec![kv_store_op])
}

View File

@@ -2,7 +2,7 @@ use crate::{DBColumn, Error, StoreItem};
use serde::{Deserialize, Serialize};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use types::{typenum::U4096, CGCUpdates, Checkpoint, Hash256, Slot, VariableList};
use types::{typenum::U4096, CGCUpdates, ChainSpec, Checkpoint, Hash256, Slot, VariableList};
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(22);
@@ -252,10 +252,29 @@ impl StoreItem for DataColumnInfo {
/// Database parameters relevant to data column sync.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize)]
pub struct CustodyInfo {
/// Given a PeerID, compute the set of custody columns with the maximum CGC value, then sort
/// them numerically.
/// Given a PeerID, compute the custody groups for the maximum CGC value.
///
/// 4096 is a random max limit that will never be reached
pub ordered_custody_columns: VariableList<u64, U4096>,
custody_groups_max_cgc: VariableList<u64, U4096>,
}
impl CustodyInfo {
/// Builds a `CustodyInfo` from the node's full custody-group list computed
/// for the maximum CGC.
///
/// Errors if the list length does not equal `spec.number_of_custody_groups`
/// or exceeds the `U4096` SSZ list bound.
pub fn new(custody_groups_max_cgc: &[u64], spec: &ChainSpec) -> Result<Self, String> {
let expected = spec.number_of_custody_groups as usize;
if custody_groups_max_cgc.len() != expected {
return Err(format!(
"custody_groups_max_cgc {} len != number_of_custody_groups",
custody_groups_max_cgc.len()
));
}
let custody_groups_max_cgc = VariableList::new(custody_groups_max_cgc.to_vec())
.map_err(|e| format!("Max CGC > 4096: {e:?}"))?;
Ok(Self {
custody_groups_max_cgc,
})
}
/// Returns the first `cgc` stored custody groups, clamped to the stored
/// length so an oversized `cgc` cannot panic.
pub fn custody_groups_for_cgc(&self, cgc: u64) -> &[u64] {
let end = self.custody_groups_max_cgc.len().min(cgc as usize);
&self.custody_groups_max_cgc[..end]
}
}
impl StoreItem for CustodyInfo {

View File

@@ -33,4 +33,8 @@ impl CGCUpdates {
.push(update)
.map_err(|e| format!("Updates list full: {e:?}"))
}
/// Iterates every `(activation_slot, cgc)` pair: the initial value is
/// reported at slot 0, followed by the recorded updates in stored order.
pub fn iter(&self) -> impl Iterator<Item = (Slot, u64)> + '_ {
let initial = (Slot::new(0), self.initial_value);
std::iter::once(initial).chain(self.updates.iter().copied())
}
}

View File

@@ -104,6 +104,33 @@ pub fn compute_subnets_from_custody_group(
Ok(result)
}
/// Lazily yields the deduplicated set of data-column subnets covered by
/// `custody_groups`: each group's columns are expanded and mapped to their
/// subnet id.
///
/// Panics (via `expect`) if any group index is out of range — callers are
/// expected to pass indices below `number_of_custody_groups`.
pub fn compute_subnets_from_custody_groups<'a>(
custody_groups: &'a [CustodyIndex],
spec: &'a ChainSpec,
) -> impl Iterator<Item = DataColumnSubnetId> + 'a {
custody_groups
.iter()
.flat_map(|&group| {
let columns = compute_columns_for_custody_group(group, spec)
.expect("max(custody_groups) < number_of_custody_groups");
columns.map(|column| DataColumnSubnetId::from_column_index(column, spec))
})
.unique()
}
/// Lazily yields the deduplicated set of column indices covered by
/// `custody_groups`, expanding each group into its columns.
///
/// Panics (via `expect`) if any group index is out of range — callers are
/// expected to pass indices below `number_of_custody_groups`.
pub fn compute_columns_from_custody_groups<'a>(
custody_groups: &'a [CustodyIndex],
spec: &'a ChainSpec,
) -> impl Iterator<Item = ColumnIndex> + 'a {
custody_groups
.iter()
.flat_map(|&group| {
compute_columns_for_custody_group(group, spec)
.expect("max(custody_groups) < number_of_custody_groups")
})
.unique()
}
#[cfg(test)]
mod test {
use super::*;