Merge remote-tracking branch 'sigp/peerdas-devnet-7' into peerdas-rangesync

This commit is contained in:
dapplion
2025-06-12 15:40:00 +02:00
45 changed files with 1945 additions and 378 deletions

View File

@@ -49,6 +49,7 @@ use tracing::{debug, error, info, trace, warn};
use types::{ChainSpec, EnrForkId, EthSpec};
mod subnet_predicate;
use crate::discovery::enr::PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY;
pub use subnet_predicate::subnet_predicate;
use types::non_zero_usize::new_non_zero_usize;
@@ -476,6 +477,15 @@ impl<E: EthSpec> Discovery<E> {
Ok(())
}
pub fn update_enr_cgc(&mut self, custody_group_count: u64) -> Result<(), String> {
self.discv5
.enr_insert(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY, &custody_group_count)
.map_err(|e| format!("{:?}", e))?;
enr::save_enr_to_disk(Path::new(&self.enr_dir), &self.local_enr());
*self.network_globals.local_enr.write() = self.discv5.local_enr();
Ok(())
}
/// Adds/Removes a subnet from the ENR attnets/syncnets Bitfield
pub fn update_enr_bitfield(&mut self, subnet: Subnet, value: bool) -> Result<(), String> {
let local_enr = self.discv5.local_enr();

View File

@@ -177,6 +177,7 @@ impl<E: EthSpec> Network<E> {
pub async fn new(
executor: task_executor::TaskExecutor,
mut ctx: ServiceContext<'_>,
custody_group_count: u64,
) -> Result<(Self, Arc<NetworkGlobals<E>>), String> {
let config = ctx.config.clone();
trace!("Libp2p Service starting");
@@ -201,11 +202,12 @@ impl<E: EthSpec> Network<E> {
)?;
// Construct the metadata
let custody_group_count = ctx.chain_spec.is_peer_das_scheduled().then(|| {
ctx.chain_spec
.custody_group_count(config.subscribe_all_data_column_subnets)
});
let meta_data = utils::load_or_build_metadata(&config.network_dir, custody_group_count);
let custody_group_count_metadata = ctx
.chain_spec
.is_peer_das_scheduled()
.then_some(custody_group_count);
let meta_data =
utils::load_or_build_metadata(&config.network_dir, custody_group_count_metadata);
let seq_number = *meta_data.seq_number();
let globals = NetworkGlobals::new(
enr,
@@ -885,6 +887,23 @@ impl<E: EthSpec> Network<E> {
}
}
/// Subscribe to all data columns determined by the cgc.
#[instrument(parent = None,
level = "trace",
fields(service = "libp2p"),
name = "libp2p",
skip_all
)]
pub fn subscribe_new_data_column_subnets(&mut self, custody_column_count: u64) {
self.network_globals
.update_data_column_subnets(custody_column_count);
for column in self.network_globals.sampling_subnets() {
let kind = GossipKind::DataColumnSidecar(column);
self.subscribe_kind(kind);
}
}
/// Returns the scoring parameters for a topic if set.
#[instrument(parent = None,
level = "trace",
@@ -1254,6 +1273,21 @@ impl<E: EthSpec> Network<E> {
self.update_metadata_bitfields();
}
/// Updates the cgc value in the ENR.
#[instrument(parent = None,
level = "trace",
fields(service = "libp2p"),
name = "libp2p",
skip_all
)]
pub fn update_enr_cgc(&mut self, new_custody_group_count: u64) {
if let Err(e) = self.discovery_mut().update_enr_cgc(new_custody_group_count) {
crit!(error = e, "Could not update cgc in ENR");
}
// update the local meta data which informs our peers of the update during PINGS
self.update_metadata_cgc(new_custody_group_count);
}
/// Attempts to discover new peers for a given subnet. The `min_ttl` gives the time at which we
/// would like to retain the peers for.
#[instrument(parent = None,
@@ -1368,6 +1402,28 @@ impl<E: EthSpec> Network<E> {
utils::save_metadata_to_disk(&self.network_dir, meta_data);
}
#[instrument(parent = None,
level = "trace",
fields(service = "libp2p"),
name = "libp2p",
skip_all
)]
fn update_metadata_cgc(&mut self, custody_group_count: u64) {
let mut meta_data_w = self.network_globals.local_metadata.write();
*meta_data_w.seq_number_mut() += 1;
if let Ok(cgc) = meta_data_w.custody_group_count_mut() {
*cgc = custody_group_count;
}
let seq_number = *meta_data_w.seq_number();
let meta_data = meta_data_w.clone();
drop(meta_data_w);
self.eth2_rpc_mut().update_seq_number(seq_number);
// Save the updated metadata to disk
utils::save_metadata_to_disk(&self.network_dir, meta_data);
}
/// Sends a Ping request to the peer.
#[instrument(parent = None,
level = "trace",

View File

@@ -31,10 +31,8 @@ pub struct NetworkGlobals<E: EthSpec> {
/// The current state of the backfill sync.
pub backfill_state: RwLock<BackFillState>,
/// The computed sampling subnets and columns is stored to avoid re-computing.
pub sampling_subnets: HashSet<DataColumnSubnetId>,
pub sampling_columns: HashSet<ColumnIndex>,
/// Constant custody group count (CGC) set at startup
custody_group_count: u64,
pub sampling_subnets: RwLock<HashSet<DataColumnSubnetId>>,
pub sampling_columns: RwLock<HashSet<ColumnIndex>>,
/// Network-related configuration. Immutable after initialization.
pub config: Arc<NetworkConfig>,
/// Ethereum chain configuration. Immutable after initialization.
@@ -87,6 +85,13 @@ impl<E: EthSpec> NetworkGlobals<E> {
sampling_columns.extend(columns);
}
tracing::debug!(
cgc = custody_group_count,
?sampling_columns,
?sampling_subnets,
"Starting node with custody params"
);
NetworkGlobals {
local_enr: RwLock::new(enr.clone()),
peer_id: RwLock::new(enr.peer_id()),
@@ -96,14 +101,40 @@ impl<E: EthSpec> NetworkGlobals<E> {
gossipsub_subscriptions: RwLock::new(HashSet::new()),
sync_state: RwLock::new(SyncState::Stalled),
backfill_state: RwLock::new(BackFillState::Paused),
sampling_subnets,
sampling_columns,
custody_group_count,
sampling_subnets: RwLock::new(sampling_subnets),
sampling_columns: RwLock::new(sampling_columns),
config,
spec,
}
}
/// Update the sampling subnets based on an updated cgc.
pub fn update_data_column_subnets(&self, custody_group_count: u64) {
// The below `expect` calls will panic on start up if the chain spec config values used
// are invalid
let sampling_size = self
.spec
.sampling_size(custody_group_count)
.expect("should compute node sampling size from valid chain spec");
let custody_groups =
get_custody_groups(self.local_enr().node_id().raw(), sampling_size, &self.spec)
.expect("should compute node custody groups");
let mut sampling_subnets = self.sampling_subnets.write();
for custody_index in &custody_groups {
let subnets = compute_subnets_from_custody_group(*custody_index, &self.spec)
.expect("should compute custody subnets for node");
sampling_subnets.extend(subnets);
}
let mut sampling_columns = self.sampling_columns.write();
for custody_index in &custody_groups {
let columns = compute_columns_for_custody_group(*custody_index, &self.spec)
.expect("should compute custody columns for node");
sampling_columns.extend(columns);
}
}
/// Returns the local ENR from the underlying Discv5 behaviour that external peers may connect
/// to.
pub fn local_enr(&self) -> Enr {
@@ -120,19 +151,6 @@ impl<E: EthSpec> NetworkGlobals<E> {
self.listen_multiaddrs.read().clone()
}
/// Returns true if this node is configured as a PeerDAS supernode
pub fn is_supernode(&self) -> bool {
self.custody_group_count == self.spec.number_of_custody_groups
}
/// Returns the count of custody columns this node must sample for block import
pub fn custody_columns_count(&self) -> u64 {
// This only panics if the chain spec contains invalid values
self.spec
.sampling_size(self.custody_group_count)
.expect("should compute node sampling size from valid chain spec")
}
/// Returns the number of libp2p connected peers.
pub fn connected_peers(&self) -> usize {
self.peers.read().connected_peer_ids().count()
@@ -226,10 +244,18 @@ impl<E: EthSpec> NetworkGlobals<E> {
enable_light_client_server: self.config.enable_light_client_server,
subscribe_all_subnets: self.config.subscribe_all_subnets,
subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets,
sampling_subnets: &self.sampling_subnets,
sampling_subnets: self.sampling_subnets.read().clone(),
}
}
pub fn sampling_columns(&self) -> HashSet<ColumnIndex> {
self.sampling_columns.read().clone()
}
pub fn sampling_subnets(&self) -> HashSet<DataColumnSubnetId> {
self.sampling_subnets.read().clone()
}
/// TESTING ONLY. Build a dummy NetworkGlobals instance.
pub fn new_test_globals(
trusted_peers: Vec<PeerId>,
@@ -302,7 +328,7 @@ mod test {
Arc::new(spec),
);
assert_eq!(
globals.sampling_subnets.len(),
globals.sampling_subnets.read().len(),
subnet_sampling_size as usize
);
}
@@ -325,7 +351,7 @@ mod test {
Arc::new(spec),
);
assert_eq!(
globals.sampling_columns.len(),
globals.sampling_columns.read().len(),
subnet_sampling_size as usize
);
}

View File

@@ -26,11 +26,11 @@ pub const LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update";
pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update";
#[derive(Debug)]
pub struct TopicConfig<'a> {
pub struct TopicConfig {
pub enable_light_client_server: bool,
pub subscribe_all_subnets: bool,
pub subscribe_all_data_column_subnets: bool,
pub sampling_subnets: &'a HashSet<DataColumnSubnetId>,
pub sampling_subnets: HashSet<DataColumnSubnetId>,
}
/// Returns all the topics the node should subscribe at `fork_name`
@@ -85,7 +85,7 @@ pub fn core_topics_to_subscribe<E: EthSpec>(
topics.push(GossipKind::DataColumnSidecar(i.into()));
}
} else {
for subnet in opts.sampling_subnets {
for subnet in &opts.sampling_subnets {
topics.push(GossipKind::DataColumnSidecar(*subnet));
}
}
@@ -126,7 +126,7 @@ pub fn all_topics_at_fork<E: EthSpec>(fork: ForkName, spec: &ChainSpec) -> Vec<G
enable_light_client_server: true,
subscribe_all_subnets: true,
subscribe_all_data_column_subnets: true,
sampling_subnets: &sampling_subnets,
sampling_subnets,
};
core_topics_to_subscribe::<E>(fork, &opts, spec)
}
@@ -521,7 +521,7 @@ mod tests {
enable_light_client_server: false,
subscribe_all_subnets: false,
subscribe_all_data_column_subnets: false,
sampling_subnets,
sampling_subnets: sampling_subnets.clone(),
}
}

View File

@@ -118,6 +118,7 @@ pub async fn build_libp2p_instance(
let (signal, exit) = async_channel::bounded(1);
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = task_executor::TaskExecutor::new(rt, exit, shutdown_tx, service_name);
let custody_group_count = chain_spec.custody_requirement;
let libp2p_context = lighthouse_network::Context {
config,
enr_fork_id: EnrForkId::default(),
@@ -126,7 +127,7 @@ pub async fn build_libp2p_instance(
libp2p_registry: None,
};
Libp2pInstance(
LibP2PService::new(executor, libp2p_context)
LibP2PService::new(executor, libp2p_context, custody_group_count)
.await
.expect("should build libp2p instance")
.0,