Improve get_custody_columns validation, caching and error handling (#6308)

* Improve `get_custody_columns` validation, caching and error handling.

* Merge branch 'unstable' into get-custody-columns-error-handing

* Fix failing test and add more test.

* Fix failing test and add more test.

* Merge branch 'unstable' into get-custody-columns-error-handing

# Conflicts:
#	beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs
#	beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
#	beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs
#	beacon_node/lighthouse_network/src/types/globals.rs
#	beacon_node/network/src/service.rs
#	consensus/types/src/data_column_subnet_id.rs

* Add unit test to make sure the default specs won't panic on the `compute_custody_requirement_subnets` function.

* Add condition when calling `compute_custody_subnets_from_metadata` and update logs.

* Validate `csc` when returning from enr. Remove `csc` computation on connection since we get them on metadata anyway.

* Add `peers_per_custody_subnet_count` to track peer csc and supernodes.

* Disconnect peers with invalid metadata and find other peers instead.

* Fix sampling tests.

* Merge branch 'unstable' into get-custody-columns-error-handing

* Merge branch 'unstable' into get-custody-columns-error-handing
This commit is contained in:
Jimmy Chen
2024-09-06 17:39:16 +10:00
committed by GitHub
parent 0c5e25b62a
commit c0b4f01cf3
13 changed files with 292 additions and 235 deletions

View File

@@ -19,7 +19,7 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
use types::{EthSpec, SyncSubnetId};
use types::{DataColumnSubnetId, EthSpec, SyncSubnetId};
pub use libp2p::core::Multiaddr;
pub use libp2p::identity::Keypair;
@@ -33,7 +33,7 @@ pub use peerdb::peer_info::{
};
use peerdb::score::{PeerAction, ReportSource};
pub use peerdb::sync_status::{SyncInfo, SyncStatus};
use std::collections::{hash_map::Entry, HashMap};
use std::collections::{hash_map::Entry, HashMap, HashSet};
use std::net::IpAddr;
use strum::IntoEnumIterator;
@@ -701,6 +701,8 @@ impl<E: EthSpec> PeerManager<E> {
/// Received a metadata response from a peer.
pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData<E>) {
let mut invalid_meta_data = false;
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
if let Some(known_meta_data) = &peer_info.meta_data() {
if *known_meta_data.seq_number() < *meta_data.seq_number() {
@@ -717,12 +719,39 @@ impl<E: EthSpec> PeerManager<E> {
debug!(self.log, "Obtained peer's metadata";
"peer_id" => %peer_id, "new_seq_no" => meta_data.seq_number());
}
let node_id_opt = peer_id_to_node_id(peer_id).ok();
peer_info.set_meta_data(meta_data, node_id_opt, &self.network_globals.spec);
let custody_subnet_count_opt = meta_data.custody_subnet_count().copied().ok();
peer_info.set_meta_data(meta_data);
if self.network_globals.spec.is_peer_das_scheduled() {
// Gracefully ignore metadata/v2 peers. Potentially downscore after PeerDAS to
// prioritize PeerDAS peers.
if let Some(custody_subnet_count) = custody_subnet_count_opt {
match self.compute_peer_custody_subnets(peer_id, custody_subnet_count) {
Ok(custody_subnets) => {
peer_info.set_custody_subnets(custody_subnets);
}
Err(err) => {
debug!(self.log, "Unable to compute peer custody subnets from metadata";
"info" => "Sending goodbye to peer",
"peer_id" => %peer_id,
"custody_subnet_count" => custody_subnet_count,
"error" => ?err,
);
invalid_meta_data = true;
}
};
}
}
} else {
error!(self.log, "Received METADATA from an unknown peer";
"peer_id" => %peer_id);
}
// Disconnect peers with invalid metadata and find other peers instead.
if invalid_meta_data {
self.goodbye_peer(peer_id, GoodbyeReason::Fault, ReportSource::PeerManager)
}
}
/// Updates the gossipsub scores for all known peers in gossipsub.
@@ -1290,6 +1319,7 @@ impl<E: EthSpec> PeerManager<E> {
let mut peers_connected = 0;
let mut clients_per_peer = HashMap::new();
let mut peers_connected_mutli: HashMap<(&str, &str), i32> = HashMap::new();
let mut peers_per_custody_subnet_count: HashMap<u64, i64> = HashMap::new();
for (_, peer_info) in self.network_globals.peers.read().connected_peers() {
peers_connected += 1;
@@ -1320,11 +1350,26 @@ impl<E: EthSpec> PeerManager<E> {
*peers_connected_mutli
.entry((direction, transport))
.or_default() += 1;
if let Some(MetaData::V3(meta_data)) = peer_info.meta_data() {
*peers_per_custody_subnet_count
.entry(meta_data.custody_subnet_count)
.or_default() += 1;
}
}
// PEERS_CONNECTED
metrics::set_gauge(&metrics::PEERS_CONNECTED, peers_connected);
// CUSTODY_SUBNET_COUNT
for (custody_subnet_count, peer_count) in peers_per_custody_subnet_count.into_iter() {
metrics::set_gauge_vec(
&metrics::PEERS_PER_CUSTODY_SUBNET_COUNT,
&[&custody_subnet_count.to_string()],
peer_count,
)
}
// PEERS_PER_CLIENT
for client_kind in ClientKind::iter() {
let value = clients_per_peer.get(&client_kind.to_string()).unwrap_or(&0);
@@ -1348,6 +1393,45 @@ impl<E: EthSpec> PeerManager<E> {
}
}
}
fn compute_peer_custody_subnets(
&self,
peer_id: &PeerId,
custody_subnet_count: u64,
) -> Result<HashSet<DataColumnSubnetId>, String> {
// If we don't have a node id, we cannot compute the custody duties anyway
let node_id = peer_id_to_node_id(peer_id)?;
let spec = &self.network_globals.spec;
if !(spec.custody_requirement..=spec.data_column_sidecar_subnet_count)
.contains(&custody_subnet_count)
{
return Err("Invalid custody subnet count in metadata: out of range".to_string());
}
let custody_subnets = DataColumnSubnetId::compute_custody_subnets::<E>(
node_id.raw(),
custody_subnet_count,
spec,
)
.map(|subnets| subnets.collect())
.unwrap_or_else(|e| {
// This is an unreachable scenario unless there's a bug, as we've validated the csc
// just above.
error!(
self.log,
"Computing peer custody subnets failed unexpectedly";
"info" => "Falling back to default custody requirement subnets",
"peer_id" => %peer_id,
"custody_subnet_count" => custody_subnet_count,
"error" => ?e
);
DataColumnSubnetId::compute_custody_requirement_subnets::<E>(node_id.raw(), spec)
.collect()
});
Ok(custody_subnets)
}
}
enum ConnectingType {
@@ -1680,11 +1764,7 @@ mod tests {
.write()
.peer_info_mut(&peer0)
.unwrap()
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
.set_meta_data(MetaData::V2(metadata));
peer_manager
.network_globals
.peers
@@ -1704,11 +1784,7 @@ mod tests {
.write()
.peer_info_mut(&peer2)
.unwrap()
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
.set_meta_data(MetaData::V2(metadata));
peer_manager
.network_globals
.peers
@@ -1728,11 +1804,7 @@ mod tests {
.write()
.peer_info_mut(&peer4)
.unwrap()
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
.set_meta_data(MetaData::V2(metadata));
peer_manager
.network_globals
.peers
@@ -1806,11 +1878,7 @@ mod tests {
.write()
.peer_info_mut(&peer)
.unwrap()
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
.set_meta_data(MetaData::V2(metadata));
peer_manager
.network_globals
.peers
@@ -1934,11 +2002,7 @@ mod tests {
.write()
.peer_info_mut(&peer)
.unwrap()
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
.set_meta_data(MetaData::V2(metadata));
let long_lived_subnets = peer_manager
.network_globals
.peers
@@ -2047,11 +2111,7 @@ mod tests {
.write()
.peer_info_mut(&peer)
.unwrap()
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
.set_meta_data(MetaData::V2(metadata));
let long_lived_subnets = peer_manager
.network_globals
.peers
@@ -2217,11 +2277,7 @@ mod tests {
.write()
.peer_info_mut(&peer)
.unwrap()
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
.set_meta_data(MetaData::V2(metadata));
let long_lived_subnets = peer_manager
.network_globals
.peers
@@ -2378,11 +2434,7 @@ mod tests {
let mut peer_db = peer_manager.network_globals.peers.write();
let peer_info = peer_db.peer_info_mut(&condition.peer_id).unwrap();
peer_info.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
peer_info.set_meta_data(MetaData::V2(metadata));
peer_info.set_gossipsub_score(condition.gossipsub_score);
peer_info.add_to_score(condition.score);

View File

@@ -1,8 +1,6 @@
use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY;
use crate::discovery::CombinedKey;
use crate::{
metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Eth2Enr, Gossipsub, PeerId,
};
use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Gossipsub, PeerId};
use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
use rand::seq::SliceRandom;
use score::{PeerAction, ReportSource, Score, ScoreState};
@@ -47,16 +45,10 @@ pub struct PeerDB<E: EthSpec> {
disable_peer_scoring: bool,
/// PeerDB's logger
log: slog::Logger,
spec: ChainSpec,
}
impl<E: EthSpec> PeerDB<E> {
pub fn new(
trusted_peers: Vec<PeerId>,
disable_peer_scoring: bool,
log: &slog::Logger,
spec: ChainSpec,
) -> Self {
pub fn new(trusted_peers: Vec<PeerId>, disable_peer_scoring: bool, log: &slog::Logger) -> Self {
// Initialize the peers hashmap with trusted peers
let peers = trusted_peers
.into_iter()
@@ -68,7 +60,6 @@ impl<E: EthSpec> PeerDB<E> {
banned_peers_count: BannedPeersCount::default(),
disable_peer_scoring,
peers,
spec,
}
}
@@ -726,6 +717,14 @@ impl<E: EthSpec> PeerDB<E> {
},
);
if supernode {
let peer_info = self.peers.get_mut(&peer_id).expect("peer exists");
let all_subnets = (0..spec.data_column_sidecar_subnet_count)
.map(|csc| csc.into())
.collect();
peer_info.set_custody_subnets(all_subnets);
}
peer_id
}
@@ -791,14 +790,6 @@ impl<E: EthSpec> PeerDB<E> {
) => {
// Update the ENR if one exists, and compute the custody subnets
if let Some(enr) = enr {
let custody_subnet_count = enr.custody_subnet_count::<E>(&self.spec);
let custody_subnets = DataColumnSubnetId::compute_custody_subnets::<E>(
enr.node_id().raw(),
custody_subnet_count,
&self.spec,
)
.collect::<HashSet<_>>();
info.set_custody_subnets(custody_subnets);
info.set_enr(enr);
}
@@ -1349,8 +1340,7 @@ mod tests {
fn get_db() -> PeerDB<M> {
let log = build_log(slog::Level::Debug, false);
let spec = M::default_spec();
PeerDB::new(vec![], false, &log, spec)
PeerDB::new(vec![], false, &log)
}
#[test]
@@ -2049,8 +2039,7 @@ mod tests {
fn test_trusted_peers_score() {
let trusted_peer = PeerId::random();
let log = build_log(slog::Level::Debug, false);
let spec = M::default_spec();
let mut pdb: PeerDB<M> = PeerDB::new(vec![trusted_peer], false, &log, spec);
let mut pdb: PeerDB<M> = PeerDB::new(vec![trusted_peer], false, &log);
pdb.connect_ingoing(&trusted_peer, "/ip4/0.0.0.0".parse().unwrap(), None);
@@ -2074,8 +2063,7 @@ mod tests {
fn test_disable_peer_scoring() {
let peer = PeerId::random();
let log = build_log(slog::Level::Debug, false);
let spec = M::default_spec();
let mut pdb: PeerDB<M> = PeerDB::new(vec![], true, &log, spec);
let mut pdb: PeerDB<M> = PeerDB::new(vec![], true, &log);
pdb.connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None);

View File

@@ -3,7 +3,6 @@ use super::score::{PeerAction, Score, ScoreState};
use super::sync_status::SyncStatus;
use crate::discovery::Eth2Enr;
use crate::{rpc::MetaData, types::Subnet};
use discv5::enr::NodeId;
use discv5::Enr;
use libp2p::core::multiaddr::{Multiaddr, Protocol};
use serde::{
@@ -14,7 +13,7 @@ use std::collections::HashSet;
use std::net::IpAddr;
use std::time::Instant;
use strum::AsRefStr;
use types::{ChainSpec, DataColumnSubnetId, EthSpec};
use types::{DataColumnSubnetId, EthSpec};
use PeerConnectionStatus::*;
/// Information about a given connected peer.
@@ -358,31 +357,7 @@ impl<E: EthSpec> PeerInfo<E> {
/// Sets an explicit value for the meta data.
// VISIBILITY: The peer manager is able to adjust the meta_data
pub(in crate::peer_manager) fn set_meta_data(
&mut self,
meta_data: MetaData<E>,
node_id_opt: Option<NodeId>,
spec: &ChainSpec,
) {
// If we don't have a node id, we cannot compute the custody duties anyway
let Some(node_id) = node_id_opt else {
self.meta_data = Some(meta_data);
return;
};
// Already set by enr if custody_subnets is non empty
if self.custody_subnets.is_empty() {
if let Ok(custody_subnet_count) = meta_data.custody_subnet_count() {
let custody_subnets = DataColumnSubnetId::compute_custody_subnets::<E>(
node_id.raw(),
std::cmp::min(*custody_subnet_count, spec.data_column_sidecar_subnet_count),
spec,
)
.collect::<HashSet<_>>();
self.set_custody_subnets(custody_subnets);
}
}
pub(in crate::peer_manager) fn set_meta_data(&mut self, meta_data: MetaData<E>) {
self.meta_data = Some(meta_data);
}
@@ -391,7 +366,10 @@ impl<E: EthSpec> PeerInfo<E> {
self.connection_status = connection_status
}
pub(super) fn set_custody_subnets(&mut self, custody_subnets: HashSet<DataColumnSubnetId>) {
pub(in crate::peer_manager) fn set_custody_subnets(
&mut self,
custody_subnets: HashSet<DataColumnSubnetId>,
) {
self.custody_subnets = custody_subnets
}