More progress

This commit is contained in:
dapplion
2025-04-04 14:16:28 -03:00
parent 614c01698d
commit 661bfebdf0
10 changed files with 148 additions and 101 deletions

View File

@@ -7165,7 +7165,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Register a local validator that has connected to the Beacon REST API
pub fn register_local_validator(&self, validator_index: u64) {
// TODO(das): persist `last_seen_local_validators` to disk on shutdown and once in a while
self.validator_monitor
.write()
.auto_register_local_validator(validator_index);

View File

@@ -430,11 +430,14 @@ impl<E: EthSpec> ValidatorMonitor<E> {
let mut s = Self {
validators: <_>::default(),
indices: <_>::default(),
// Don't persist the last seen time of indices, and reset them to just seen on start-up.
// Otherwise if the node crashes for more than `ttl` it will forget that it had indices.
// We will only prune an index if it is not seen during runtime for longer than `ttl`.
last_seen_local_validators: HashMap::from_iter(
persisted_local_indices
.indices
.iter()
.map(|e| (e.index, Duration::from_secs(e.last_seen_timestamp_sec))),
.into_iter()
.map(|index| (index, timestamp_now())),
),
auto_register,
individual_tracking_threshold,
@@ -465,7 +468,6 @@ impl<E: EthSpec> ValidatorMonitor<E> {
/// Prunes all validators not seen for longer than `ttl` (time to live)
pub fn prune_registered_local_validators(&mut self, ttl: Duration) {
let now = timestamp_now();
// Prune expired keys
self.last_seen_local_validators
.retain(|_, last_seen| now - *last_seen < ttl);
}
@@ -479,12 +481,8 @@ impl<E: EthSpec> ValidatorMonitor<E> {
pub fn to_persisted_local_validators(&self) -> Result<PersistedLocalIndices, ssz_types::Error> {
Ok(PersistedLocalIndices {
indices: VariableList::new(
self.last_seen_local_validators
.iter()
.map(|(index, last_seen)| PersistedLocalIndex {
index: *index,
last_seen_timestamp_sec: last_seen.as_secs(),
})
self.get_registered_local_validators()
.copied()
.collect::<Vec<_>>(),
)?,
})
@@ -2452,13 +2450,7 @@ fn min_opt<T: Ord>(a: Option<T>, b: Option<T>) -> Option<T> {
// Using 524288 as a really high limit that's never meant to be reached
#[derive(Encode, Decode, Default)]
pub struct PersistedLocalIndices {
indices: VariableList<PersistedLocalIndex, U524288>,
}
#[derive(Encode, Decode)]
pub struct PersistedLocalIndex {
index: u64,
last_seen_timestamp_sec: u64,
indices: VariableList<u64, U524288>,
}
impl StoreItem for PersistedLocalIndices {

View File

@@ -87,11 +87,11 @@ use tokio_stream::{
use tracing::{debug, error, info, warn};
use types::{
fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId,
AttesterSlashing, BeaconStateError, ChainSpec, Checkpoint, CommitteeCache, ConfigAndPreset,
Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData,
ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock,
SignedBlsToExecutionChange, SignedContributionAndProof, SignedValidatorRegistrationData,
SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData,
AttesterSlashing, BeaconStateError, Checkpoint, CommitteeCache, ConfigAndPreset, Epoch,
EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing,
RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
SyncCommitteeMessage, SyncContributionData,
};
use validator::pubkey_to_validator_index;
use version::{
@@ -2907,11 +2907,9 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(network_globals.clone())
.and(chain_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
chain: Arc<BeaconChain<T>>| {
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let enr = network_globals.local_enr();
let p2p_addresses = enr.multiaddr_p2p_tcp();
@@ -2921,10 +2919,7 @@ pub fn serve<T: BeaconChainTypes>(
enr,
p2p_addresses,
discovery_addresses,
metadata: from_meta_data::<T::EthSpec>(
&network_globals.local_metadata(),
&chain.spec,
),
metadata: from_meta_data::<T::EthSpec>(&network_globals.local_metadata()),
}))
})
},
@@ -4815,32 +4810,26 @@ pub fn serve<T: BeaconChainTypes>(
Ok(http_server)
}
fn from_meta_data<E: EthSpec>(meta_data: &MetaData<E>, spec: &ChainSpec) -> api_types::MetaData {
fn from_meta_data<E: EthSpec>(meta_data: &MetaData<E>) -> api_types::MetaData {
let format_hex = |bytes: &[u8]| format!("0x{}", hex::encode(bytes));
let seq_number = *meta_data.seq_number();
let attnets = format_hex(&meta_data.attnets().clone().into_bytes());
let syncnets = format_hex(
&meta_data
.syncnets()
.cloned()
.unwrap_or_default()
.into_bytes(),
);
if spec.is_peer_das_scheduled() {
api_types::MetaData::V3(api_types::MetaDataV3 {
seq_number,
attnets,
syncnets,
custody_group_count: meta_data.custody_group_count().cloned().unwrap_or_default(),
})
} else {
api_types::MetaData::V2(api_types::MetaDataV2 {
seq_number,
attnets,
syncnets,
})
match meta_data {
MetaData::V1(meta_data) => api_types::MetaData::V2(api_types::MetaDataV2 {
seq_number: meta_data.seq_number,
attnets: format_hex(meta_data.attnets.as_slice()),
syncnets: format_hex(<_>::default()),
}),
MetaData::V2(meta_data) => api_types::MetaData::V2(api_types::MetaDataV2 {
seq_number: meta_data.seq_number,
attnets: format_hex(meta_data.attnets.as_slice()),
syncnets: format_hex(meta_data.syncnets.as_slice()),
}),
MetaData::V3(meta_data) => api_types::MetaData::V3(api_types::MetaDataV3 {
seq_number: meta_data.seq_number,
attnets: format_hex(meta_data.attnets.as_slice()),
syncnets: format_hex(meta_data.syncnets.as_slice()),
custody_group_count: meta_data.custody_group_count,
}),
}
}

View File

@@ -40,7 +40,7 @@ pub trait Eth2Enr {
) -> Result<EnrSyncCommitteeBitfield<E>, &'static str>;
/// The peerdas custody group count associated with the ENR.
fn custody_group_count<E: EthSpec>(&self, spec: &ChainSpec) -> Result<u64, &'static str>;
fn custody_group_count(&self, spec: &ChainSpec) -> Result<u64, &'static str>;
fn eth2(&self) -> Result<EnrForkId, &'static str>;
}
@@ -68,17 +68,19 @@ impl Eth2Enr for Enr {
.map_err(|_| "Could not decode the ENR syncnets bitfield")
}
fn custody_group_count<E: EthSpec>(&self, spec: &ChainSpec) -> Result<u64, &'static str> {
fn custody_group_count(&self, spec: &ChainSpec) -> Result<u64, &'static str> {
let cgc = self
.get_decodable::<u64>(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY)
.ok_or("ENR custody group count non-existent")?
.map_err(|_| "Could not decode the ENR custody group count")?;
if (spec.custody_requirement..=spec.number_of_custody_groups).contains(&cgc) {
Ok(cgc)
} else {
Err("Invalid custody group count in ENR")
if cgc < spec.custody_requirement {
return Err("ENR CGC < custody_requirement");
}
if cgc > spec.number_of_custody_groups {
return Err("ENR CGC > number_of_custody_groups");
}
Ok(cgc)
}
fn eth2(&self) -> Result<EnrForkId, &'static str> {
@@ -363,7 +365,7 @@ mod test {
let enr = build_enr_with_config(config, &spec).0;
assert_eq!(
enr.custody_group_count::<E>(&spec).unwrap(),
enr.custody_group_count(&spec).unwrap(),
spec.custody_requirement,
);
}
@@ -378,7 +380,7 @@ mod test {
let enr = build_enr_with_config(config, &spec).0;
assert_eq!(
enr.custody_group_count::<E>(&spec).unwrap(),
enr.custody_group_count(&spec).unwrap(),
spec.number_of_custody_groups,
);
}

View File

@@ -594,6 +594,26 @@ impl<E: EthSpec> Discovery<E> {
enr::save_enr_to_disk(Path::new(&self.enr_dir), &self.local_enr());
}
/// Updates the `cgc` field of our local ENR.
pub fn update_cgc_enr(&mut self, cgc: u64) -> Result<bool, String> {
if let Ok(current_cgc) = self.local_enr().custody_group_count(&self.spec) {
if current_cgc == cgc {
return Ok(false);
}
}
self.discv5
.enr_insert(ETH2_ENR_KEY, &cgc)
.map_err(|e| format!("{:?}", e))?;
// replace the global version with discovery version
self.network_globals.set_enr(self.discv5.local_enr());
// persist modified enr to disk
enr::save_enr_to_disk(Path::new(&self.enr_dir), &self.local_enr());
Ok(true)
}
// Bans a peer and it's associated seen IP addresses.
pub fn ban_peer(&mut self, peer_id: &PeerId, ip_addresses: Vec<IpAddr>) {
// first try and convert the peer_id to a node_id.

View File

@@ -34,7 +34,7 @@ where
.as_ref()
.is_ok_and(|b| b.get(*s.deref() as usize).unwrap_or(false)),
Subnet::DataColumn(s) => {
if let Ok(custody_group_count) = enr.custody_group_count::<E>(&spec) {
if let Ok(custody_group_count) = enr.custody_group_count(&spec) {
compute_subnets_for_node(enr.node_id().raw(), custody_group_count, &spec)
.is_ok_and(|subnets| subnets.contains(s))
} else {

View File

@@ -173,7 +173,7 @@ impl<E: EthSpec> Network<E> {
pub async fn new(
executor: task_executor::TaskExecutor,
mut ctx: ServiceContext<'_>,
cgc_updates: Option<CGCUpdates>,
initial_cgc_updates: Option<CGCUpdates>,
) -> Result<(Self, Arc<NetworkGlobals<E>>), String> {
let config = ctx.config.clone();
trace!("Libp2p Service starting");
@@ -197,8 +197,8 @@ impl<E: EthSpec> Network<E> {
&ctx.chain_spec,
)?;
// TODO: Load from disk, and check consistency with DB somewhere
let cgc_updates = cgc_updates.unwrap_or_else(|| {
// Load initial CGC updates from persisted source (DB) or default to minimum CGC
let cgc_updates = initial_cgc_updates.unwrap_or_else(|| {
CGCUpdates::new(
ctx.chain_spec
.custody_group_count(config.subscribe_all_data_column_subnets),
@@ -1259,12 +1259,23 @@ impl<E: EthSpec> Network<E> {
crit!(error = e, "Could not update ENR bitfield");
}
// TODO: Can we deprecate this for a single source of truth?
// TODO(das): Can we deprecate this for a single source of truth?
let metadata = self.network_globals.local_metadata();
self.eth2_rpc_mut()
.update_seq_number(*metadata.seq_number());
}
/// Updates the CGC value in our local ENR
#[instrument(parent = None,
level = "trace",
fields(service = "libp2p"),
name = "libp2p",
skip_all
)]
pub fn update_enr_cgc(&mut self, cgc: u64) -> Result<bool, String> {
self.discovery_mut().update_cgc_enr(cgc)
}
/// Attempts to discover new peers for a given subnet. The `min_ttl` gives the time at which we
/// would like to retain the peers for.
#[instrument(parent = None,

View File

@@ -628,6 +628,40 @@ pub static SAMPLING_REQUEST_RESULT: LazyLock<Result<IntCounterVec>> = LazyLock::
)
});
/*
* PeerDAS custody metrics
*/
pub static LOCAL_INDICES_COUNT: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
try_create_int_gauge(
"beacon_custody_local_indices_count",
"Current count of local indices that recently connected to the beacon API",
)
});
pub static LOCAL_INDICES_ETH_BALANCE: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
try_create_int_gauge(
"beacon_custody_local_indices_eth_balance",
"Current sum of local indices balance that recently connected to the beacon API",
)
});
pub static CGC_INTERNAL: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
try_create_int_gauge(
"beacon_custody_cgc_internal",
"Current internal Custody Group Count CGC",
)
});
pub static CGC_ANNOUNCED: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
try_create_int_gauge(
"beacon_custody_cgc_announced",
"Current announced Custody Group Count CGC",
)
});
pub static CGC_UPDATES: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
try_create_int_counter(
"beacon_custody_cgc_updates",
"Total count of Custody Group Count CGC updates",
)
});
pub fn register_finality_update_error(error: &LightClientFinalityUpdateError) {
inc_counter_vec(&GOSSIP_FINALITY_UPDATE_ERRORS_PER_TYPE, &[error.as_ref()]);
}

View File

@@ -195,28 +195,13 @@ impl TestRig {
let (sync_tx, _sync_rx) = mpsc::unbounded_channel();
// Default metadata
let meta_data = if spec.is_peer_das_scheduled() {
MetaData::V3(MetaDataV3 {
seq_number: SEQ_NUMBER,
attnets: EnrAttestationBitfield::<MainnetEthSpec>::default(),
syncnets: EnrSyncCommitteeBitfield::<MainnetEthSpec>::default(),
custody_group_count: spec.custody_requirement,
})
} else {
MetaData::V2(MetaDataV2 {
seq_number: SEQ_NUMBER,
attnets: EnrAttestationBitfield::<MainnetEthSpec>::default(),
syncnets: EnrSyncCommitteeBitfield::<MainnetEthSpec>::default(),
})
};
let enr_key = CombinedKey::generate_secp256k1();
let enr = enr::Enr::builder().build(&enr_key).unwrap();
let cgc_updates = CGCUpdates::new(spec.custody_requirement);
let network_config = Arc::new(NetworkConfig::default());
let network_globals = Arc::new(NetworkGlobals::new(
enr,
meta_data,
cgc_updates,
vec![],
false,
network_config,

View File

@@ -32,10 +32,9 @@ use task_executor::ShutdownReason;
use tokio::sync::mpsc;
use tokio::time::Sleep;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use types::CGCUpdates;
use types::{
ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId,
Unsigned, ValidatorSubscription,
ChainSpec, Epoch, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription,
SyncSubnetId, Unsigned, ValidatorSubscription,
};
mod tests;
@@ -277,22 +276,15 @@ impl<T: BeaconChainTypes> NetworkService<T> {
// TODO(das): check that list of columns is compatible
}
let cgc_udpates = if let Some(disk_cgc_updates) = store
// If there are no stored disk_cgc_updates `Network::new` will default to empty ones with
// the minimum CGC.
let initial_cgc_updates = store
.get_cgc_updates()
.map_err(|e| format!("Unable to read cgc updates from the DB: {e:?}"))?
{
disk_cgc_updates
} else {
// Just return the new one
let initial_cgc = beacon_chain
.spec
.custody_group_count(config.subscribe_all_data_column_subnets);
CGCUpdates::new(initial_cgc)
};
.map_err(|e| format!("Unable to read cgc updates from the DB: {e:?}"))?;
// launch libp2p service
let (mut libp2p, network_globals) =
Network::new(executor.clone(), service_context, Some(cgc_udpates)).await?;
Network::new(executor.clone(), service_context, initial_cgc_updates).await?;
// Repopulate the DHT with stored ENR's if discovery is not disabled.
if !config.disable_discovery {
@@ -819,6 +811,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
LOCAL_VALIDATOR_REGISTRY_TTL_SEC,
));
let mut local_indices_count = 0;
let known_validators_balance = self
.beacon_chain
.validator_monitor
@@ -826,6 +819,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
.get_registered_local_validators()
// TODO(das): should ignore non active validators?
.map(|validator_index| {
local_indices_count += 1;
cached_head
.snapshot
.beacon_state
@@ -834,7 +828,9 @@ impl<T: BeaconChainTypes> NetworkService<T> {
})
.sum::<u64>();
// TODO(das): track connected balance as a metric
// TODO(das): Should persist the local indices here to dump to DB once in a while in case of
// improper shutdown? Not sure if we do the same for other persisted data. It sounds
// sensible but at the same time it will waste I/O.
let next_cgc = self
.beacon_chain
@@ -849,7 +845,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
// progressing.
let next_epoch = clock_epoch + 1;
info!(prev_cgc, next_cgc, %next_epoch, "Updating internal custody count");
// TODO(das): Add CGC metrics for updates, current internal value and announced value
metrics::inc_counter(&metrics::CGC_UPDATES);
// Add a new entry to the network globals
if let Err(e) = self.network_globals.add_cgc_update(
@@ -899,7 +895,26 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let cgc_to_announce = self
.network_globals
.custody_group_count(last_pruned_epoch.start_slot(T::EthSpec::slots_per_epoch()));
// TODO(das): Compare with current announced and update ENR
// update_enr_cgc updates the NetworkGlobals ENR
match self.libp2p.update_enr_cgc(cgc_to_announce) {
Ok(updated) => {
if updated {
info!(cgc = cgc_to_announce, "Updated ENR custody group count");
}
}
Err(e) => {
crit!(error = ?e, "Error updating local ENR custody group count");
}
}
metrics::set_gauge(&metrics::CGC_INTERNAL, next_cgc as i64);
metrics::set_gauge(&metrics::CGC_ANNOUNCED, cgc_to_announce as i64);
metrics::set_gauge(&metrics::LOCAL_INDICES_COUNT, local_indices_count as i64);
metrics::set_gauge(
&metrics::LOCAL_INDICES_ETH_BALANCE,
known_validators_balance as i64 / 1_000_000_000,
);
}
fn on_subnet_service_msg(&mut self, msg: SubnetServiceMessage) {