diff --git a/Cargo.lock b/Cargo.lock index f4a29bf6b5..33df82b01c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2983,7 +2983,7 @@ checksum = "c7d73b3f436185384286bd8098d17ec07c9a7d2388a6599f824d8502b529702a" [[package]] name = "libp2p" version = "0.30.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=de104a80c48f6e61bd7bdac8e17f809477fb4c4a#de104a80c48f6e61bd7bdac8e17f809477fb4c4a" +source = "git+https://github.com/sigp/rust-libp2p?rev=b6278e1ba7b6bcfad1eef300f72148705da5d8d2#b6278e1ba7b6bcfad1eef300f72148705da5d8d2" dependencies = [ "atomic", "bytes 0.5.6", @@ -3001,7 +3001,7 @@ dependencies = [ "libp2p-websocket", "libp2p-yamux", "multihash", - "parity-multiaddr 0.9.3 (git+https://github.com/sigp/rust-libp2p?rev=de104a80c48f6e61bd7bdac8e17f809477fb4c4a)", + "parity-multiaddr 0.9.3 (git+https://github.com/sigp/rust-libp2p?rev=b6278e1ba7b6bcfad1eef300f72148705da5d8d2)", "parking_lot 0.11.0", "pin-project 1.0.1", "smallvec 1.4.2", @@ -3045,7 +3045,7 @@ dependencies = [ [[package]] name = "libp2p-core" version = "0.24.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=de104a80c48f6e61bd7bdac8e17f809477fb4c4a#de104a80c48f6e61bd7bdac8e17f809477fb4c4a" +source = "git+https://github.com/sigp/rust-libp2p?rev=b6278e1ba7b6bcfad1eef300f72148705da5d8d2#b6278e1ba7b6bcfad1eef300f72148705da5d8d2" dependencies = [ "asn1_der", "bs58", @@ -3058,8 +3058,8 @@ dependencies = [ "libsecp256k1", "log 0.4.11", "multihash", - "multistream-select 0.8.4 (git+https://github.com/sigp/rust-libp2p?rev=de104a80c48f6e61bd7bdac8e17f809477fb4c4a)", - "parity-multiaddr 0.9.3 (git+https://github.com/sigp/rust-libp2p?rev=de104a80c48f6e61bd7bdac8e17f809477fb4c4a)", + "multistream-select 0.8.4 (git+https://github.com/sigp/rust-libp2p?rev=b6278e1ba7b6bcfad1eef300f72148705da5d8d2)", + "parity-multiaddr 0.9.3 (git+https://github.com/sigp/rust-libp2p?rev=b6278e1ba7b6bcfad1eef300f72148705da5d8d2)", "parking_lot 0.11.0", "pin-project 1.0.1", "prost", @@ -3078,7 +3078,7 @@ dependencies = [ [[package]] name = "libp2p-core-derive" version = "0.20.2" -source = "git+https://github.com/sigp/rust-libp2p?rev=de104a80c48f6e61bd7bdac8e17f809477fb4c4a#de104a80c48f6e61bd7bdac8e17f809477fb4c4a" +source = "git+https://github.com/sigp/rust-libp2p?rev=b6278e1ba7b6bcfad1eef300f72148705da5d8d2#b6278e1ba7b6bcfad1eef300f72148705da5d8d2" dependencies = [ "quote", "syn", @@ -3087,7 +3087,7 @@ dependencies = [ [[package]] name = "libp2p-dns" version = "0.24.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=de104a80c48f6e61bd7bdac8e17f809477fb4c4a#de104a80c48f6e61bd7bdac8e17f809477fb4c4a" +source = "git+https://github.com/sigp/rust-libp2p?rev=b6278e1ba7b6bcfad1eef300f72148705da5d8d2#b6278e1ba7b6bcfad1eef300f72148705da5d8d2" dependencies = [ "futures 0.3.7", "libp2p-core 0.24.0", @@ -3097,7 +3097,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" version = "0.24.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=de104a80c48f6e61bd7bdac8e17f809477fb4c4a#de104a80c48f6e61bd7bdac8e17f809477fb4c4a" +source = "git+https://github.com/sigp/rust-libp2p?rev=b6278e1ba7b6bcfad1eef300f72148705da5d8d2#b6278e1ba7b6bcfad1eef300f72148705da5d8d2" dependencies = [ "base64 0.13.0", "byteorder", @@ -3121,7 +3121,7 @@ dependencies = [ [[package]] name = "libp2p-identify" version = "0.24.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=de104a80c48f6e61bd7bdac8e17f809477fb4c4a#de104a80c48f6e61bd7bdac8e17f809477fb4c4a" +source = "git+https://github.com/sigp/rust-libp2p?rev=b6278e1ba7b6bcfad1eef300f72148705da5d8d2#b6278e1ba7b6bcfad1eef300f72148705da5d8d2" dependencies = [ "futures 0.3.7", "libp2p-core 0.24.0", @@ -3136,7 +3136,7 @@ dependencies = [ [[package]] name = "libp2p-mplex" version = "0.24.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=de104a80c48f6e61bd7bdac8e17f809477fb4c4a#de104a80c48f6e61bd7bdac8e17f809477fb4c4a" +source = "git+https://github.com/sigp/rust-libp2p?rev=b6278e1ba7b6bcfad1eef300f72148705da5d8d2#b6278e1ba7b6bcfad1eef300f72148705da5d8d2" dependencies = [ "bytes 0.5.6", "futures 0.3.7", @@ -3153,7 +3153,7 @@ dependencies = [ [[package]] name = "libp2p-noise" version = "0.26.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=de104a80c48f6e61bd7bdac8e17f809477fb4c4a#de104a80c48f6e61bd7bdac8e17f809477fb4c4a" +source = "git+https://github.com/sigp/rust-libp2p?rev=b6278e1ba7b6bcfad1eef300f72148705da5d8d2#b6278e1ba7b6bcfad1eef300f72148705da5d8d2" dependencies = [ "bytes 0.5.6", "curve25519-dalek", @@ -3174,7 +3174,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" version = "0.24.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=de104a80c48f6e61bd7bdac8e17f809477fb4c4a#de104a80c48f6e61bd7bdac8e17f809477fb4c4a" +source = "git+https://github.com/sigp/rust-libp2p?rev=b6278e1ba7b6bcfad1eef300f72148705da5d8d2#b6278e1ba7b6bcfad1eef300f72148705da5d8d2" dependencies = [ "either", "futures 0.3.7", @@ -3189,7 +3189,7 @@ dependencies = [ [[package]] name = "libp2p-tcp" version = "0.24.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=de104a80c48f6e61bd7bdac8e17f809477fb4c4a#de104a80c48f6e61bd7bdac8e17f809477fb4c4a" +source = "git+https://github.com/sigp/rust-libp2p?rev=b6278e1ba7b6bcfad1eef300f72148705da5d8d2#b6278e1ba7b6bcfad1eef300f72148705da5d8d2" dependencies = [ "futures 0.3.7", "futures-timer", @@ -3204,7 +3204,7 @@ dependencies = [ [[package]] name = "libp2p-websocket" version = "0.25.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=de104a80c48f6e61bd7bdac8e17f809477fb4c4a#de104a80c48f6e61bd7bdac8e17f809477fb4c4a" +source = "git+https://github.com/sigp/rust-libp2p?rev=b6278e1ba7b6bcfad1eef300f72148705da5d8d2#b6278e1ba7b6bcfad1eef300f72148705da5d8d2" dependencies = [ "async-tls", "either", @@ -3223,7 +3223,7 @@ dependencies = [ [[package]] name = "libp2p-yamux" version = "0.27.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=de104a80c48f6e61bd7bdac8e17f809477fb4c4a#de104a80c48f6e61bd7bdac8e17f809477fb4c4a" +source = "git+https://github.com/sigp/rust-libp2p?rev=b6278e1ba7b6bcfad1eef300f72148705da5d8d2#b6278e1ba7b6bcfad1eef300f72148705da5d8d2" dependencies = [ "futures 0.3.7", "libp2p-core 0.24.0", @@ -3656,7 +3656,7 @@ dependencies = [ [[package]] name = "multistream-select" version = "0.8.4" -source = "git+https://github.com/sigp/rust-libp2p?rev=de104a80c48f6e61bd7bdac8e17f809477fb4c4a#de104a80c48f6e61bd7bdac8e17f809477fb4c4a" +source = "git+https://github.com/sigp/rust-libp2p?rev=b6278e1ba7b6bcfad1eef300f72148705da5d8d2#b6278e1ba7b6bcfad1eef300f72148705da5d8d2" dependencies = [ "bytes 0.5.6", "futures 0.3.7", @@ -3973,7 +3973,7 @@ dependencies = [ [[package]] name = "parity-multiaddr" version = "0.9.3" -source = "git+https://github.com/sigp/rust-libp2p?rev=de104a80c48f6e61bd7bdac8e17f809477fb4c4a#de104a80c48f6e61bd7bdac8e17f809477fb4c4a" +source = "git+https://github.com/sigp/rust-libp2p?rev=b6278e1ba7b6bcfad1eef300f72148705da5d8d2#b6278e1ba7b6bcfad1eef300f72148705da5d8d2" dependencies = [ "arrayref", "bs58", diff --git a/beacon_node/eth2_libp2p/Cargo.toml b/beacon_node/eth2_libp2p/Cargo.toml index f027105a4d..cf539512a0 100644 --- a/beacon_node/eth2_libp2p/Cargo.toml +++ b/beacon_node/eth2_libp2p/Cargo.toml @@ -42,7 +42,7 @@ regex = "1.3.9" [dependencies.libp2p] #version = "0.23.0" git = "https://github.com/sigp/rust-libp2p" -rev = "de104a80c48f6e61bd7bdac8e17f809477fb4c4a" +rev = "b6278e1ba7b6bcfad1eef300f72148705da5d8d2" default-features = false features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns", "tcp-tokio"] diff --git a/beacon_node/eth2_libp2p/src/behaviour/gossipsub_scoring_parameters.rs b/beacon_node/eth2_libp2p/src/behaviour/gossipsub_scoring_parameters.rs new file mode 100644 index 0000000000..ea4527033b --- /dev/null +++ b/beacon_node/eth2_libp2p/src/behaviour/gossipsub_scoring_parameters.rs @@ -0,0 +1,346 @@ +use crate::types::{GossipEncoding, GossipKind, GossipTopic}; +use crate::{error, TopicHash}; +use libp2p::gossipsub::{ + GenericGossipsubConfig, IdentTopic as Topic, PeerScoreParams, PeerScoreThresholds, + TopicScoreParams, +}; +use std::cmp::max; +use std::collections::HashMap; +use std::marker::PhantomData; +use std::time::Duration; +use types::{ChainSpec, EnrForkId, EthSpec, Slot, SubnetId}; + +const MAX_IN_MESH_SCORE: f64 = 10.0; +const MAX_FIRST_MESSAGE_DELIVERIES_SCORE: f64 = 40.0; +const BEACON_BLOCK_WEIGHT: f64 = 0.5; +const BEACON_AGGREGATE_PROOF_WEIGHT: f64 = 0.5; +const VOLUNTARY_EXIT_WEIGHT: f64 = 0.05; +const PROPOSER_SLASHING_WEIGHT: f64 = 0.05; +const ATTESTER_SLASHING_WEIGHT: f64 = 0.05; + +pub struct PeerScoreSettings { + slot: Duration, + epoch: Duration, + + beacon_attestation_subnet_weight: f64, + max_positive_score: f64, + + decay_interval: Duration, + decay_to_zero: f64, + + mesh_n: usize, + max_committees_per_slot: usize, + target_committee_size: usize, + target_aggregators_per_committee: usize, + attestation_subnet_count: u64, + phantom: PhantomData, +} + +impl PeerScoreSettings { + pub fn new( + chain_spec: &ChainSpec, + gs_config: &GenericGossipsubConfig, + ) -> PeerScoreSettings { + let slot = Duration::from_millis(chain_spec.milliseconds_per_slot); + let beacon_attestation_subnet_weight = 1.0 / chain_spec.attestation_subnet_count as f64; + let max_positive_score = (MAX_IN_MESH_SCORE + MAX_FIRST_MESSAGE_DELIVERIES_SCORE) + * (BEACON_BLOCK_WEIGHT + + BEACON_AGGREGATE_PROOF_WEIGHT + + beacon_attestation_subnet_weight * chain_spec.attestation_subnet_count as f64 + + VOLUNTARY_EXIT_WEIGHT + + PROPOSER_SLASHING_WEIGHT + + ATTESTER_SLASHING_WEIGHT); + + PeerScoreSettings { + slot, + epoch: slot * TSpec::slots_per_epoch() as u32, + beacon_attestation_subnet_weight, + max_positive_score, + decay_interval: slot, + decay_to_zero: 0.01, + mesh_n: gs_config.mesh_n(), + max_committees_per_slot: chain_spec.max_committees_per_slot, + target_committee_size: chain_spec.target_committee_size, + target_aggregators_per_committee: chain_spec.target_aggregators_per_committee as usize, + attestation_subnet_count: chain_spec.attestation_subnet_count, + phantom: PhantomData, + } + } + + pub fn get_peer_score_params( + &self, + active_validators: usize, + thresholds: &PeerScoreThresholds, + enr_fork_id: &EnrForkId, + current_slot: Slot, + ) -> error::Result { + let mut params = PeerScoreParams::default(); + + params.decay_interval = self.decay_interval; + params.decay_to_zero = self.decay_to_zero; + params.retain_score = self.epoch * 100; + params.app_specific_weight = 1.0; + params.ip_colocation_factor_threshold = 3.0; + params.behaviour_penalty_threshold = 6.0; + + params.behaviour_penalty_decay = self.score_parameter_decay(self.epoch * 10); + + let target_value = Self::decay_convergence( + params.behaviour_penalty_decay, + 10.0 / TSpec::slots_per_epoch() as f64, + ) - params.behaviour_penalty_threshold; + params.behaviour_penalty_weight = thresholds.gossip_threshold / target_value.powi(2); + + params.topic_score_cap = self.max_positive_score * 0.5; + params.ip_colocation_factor_weight = -params.topic_score_cap; + + params.topics = HashMap::new(); + + let get_hash = |kind: GossipKind| -> TopicHash { + let topic: Topic = + GossipTopic::new(kind, GossipEncoding::default(), enr_fork_id.fork_digest).into(); + topic.hash() + }; + + //first all fixed topics + params.topics.insert( + get_hash(GossipKind::VoluntaryExit), + Self::get_topic_params( + self, + VOLUNTARY_EXIT_WEIGHT, + 4.0 / TSpec::slots_per_epoch() as f64, + self.epoch * 100, + None, + ), + ); + params.topics.insert( + get_hash(GossipKind::AttesterSlashing), + Self::get_topic_params( + self, + ATTESTER_SLASHING_WEIGHT, + 1.0 / 5.0 / TSpec::slots_per_epoch() as f64, + self.epoch * 100, + None, + ), + ); + params.topics.insert( + get_hash(GossipKind::ProposerSlashing), + Self::get_topic_params( + self, + PROPOSER_SLASHING_WEIGHT, + 1.0 / 5.0 / TSpec::slots_per_epoch() as f64, + self.epoch * 100, + None, + ), + ); + + //dynamic topics + let (beacon_block_params, beacon_aggregate_proof_params, beacon_attestation_subnet_params) = + self.get_dynamic_topic_params(active_validators, current_slot)?; + + params + .topics + .insert(get_hash(GossipKind::BeaconBlock), beacon_block_params); + + params.topics.insert( + get_hash(GossipKind::BeaconAggregateAndProof), + beacon_aggregate_proof_params, + ); + + for i in 0..self.attestation_subnet_count { + params.topics.insert( + get_hash(GossipKind::Attestation(SubnetId::new(i))), + beacon_attestation_subnet_params.clone(), + ); + } + + Ok(params) + } + + pub fn get_dynamic_topic_params( + &self, + active_validators: usize, + current_slot: Slot, + ) -> error::Result<(TopicScoreParams, TopicScoreParams, TopicScoreParams)> { + let (aggregators_per_slot, committees_per_slot) = + self.expected_aggregator_count_per_slot(active_validators)?; + let multiple_bursts_per_subnet_per_epoch = committees_per_slot as u64 + >= 2 * self.attestation_subnet_count / TSpec::slots_per_epoch(); + + let beacon_block_params = Self::get_topic_params( + self, + BEACON_BLOCK_WEIGHT, + 1.0, + self.epoch * 20, + Some((TSpec::slots_per_epoch() * 5, 3.0, self.epoch, current_slot)), + ); + + let beacon_aggregate_proof_params = Self::get_topic_params( + self, + BEACON_AGGREGATE_PROOF_WEIGHT, + aggregators_per_slot, + self.epoch, + Some((TSpec::slots_per_epoch() * 2, 4.0, self.epoch, current_slot)), + ); + let beacon_attestation_subnet_params = Self::get_topic_params( + self, + self.beacon_attestation_subnet_weight, + active_validators as f64 + / self.attestation_subnet_count as f64 + / TSpec::slots_per_epoch() as f64, + self.epoch + * (if multiple_bursts_per_subnet_per_epoch { + 1 + } else { + 4 + }), + Some(( + TSpec::slots_per_epoch() + * (if multiple_bursts_per_subnet_per_epoch { + 4 + } else { + 16 + }), + 16.0, + if multiple_bursts_per_subnet_per_epoch { + self.slot * (TSpec::slots_per_epoch() as u32 / 2 + 1) + } else { + self.epoch * 3 + }, + current_slot, + )), + ); + + Ok(( + beacon_block_params, + beacon_aggregate_proof_params, + beacon_attestation_subnet_params, + )) + } + + pub fn attestation_subnet_count(&self) -> u64 { + self.attestation_subnet_count + } + + fn score_parameter_decay_with_base( + decay_time: Duration, + decay_interval: Duration, + decay_to_zero: f64, + ) -> f64 { + let ticks = decay_time.as_secs_f64() / decay_interval.as_secs_f64(); + decay_to_zero.powf(1.0 / ticks) + } + + fn decay_convergence(decay: f64, rate: f64) -> f64 { + rate / (1.0 - decay) + } + + fn threshold(decay: f64, rate: f64) -> f64 { + Self::decay_convergence(decay, rate) * decay + } + + fn expected_aggregator_count_per_slot( + &self, + active_validators: usize, + ) -> error::Result<(f64, usize)> { + let committees_per_slot = TSpec::get_committee_count_per_slot_with( + active_validators, + self.max_committees_per_slot, + self.target_committee_size, + ) + .map_err(|e| format!("Could not get committee count from spec: {:?}", e))?; + + let committees = committees_per_slot * TSpec::slots_per_epoch() as usize; + + let smaller_committee_size = active_validators / committees; + let num_larger_committees = active_validators - smaller_committee_size * committees; + + let modulo_smaller = max( + 1, + smaller_committee_size / self.target_aggregators_per_committee as usize, + ); + let modulo_larger = max( + 1, + (smaller_committee_size + 1) / self.target_aggregators_per_committee as usize, + ); + + Ok(( + (((committees - num_larger_committees) * smaller_committee_size) as f64 + / modulo_smaller as f64 + + (num_larger_committees * (smaller_committee_size + 1)) as f64 + / modulo_larger as f64) + / TSpec::slots_per_epoch() as f64, + committees_per_slot, + )) + } + + fn score_parameter_decay(&self, decay_time: Duration) -> f64 { + Self::score_parameter_decay_with_base(decay_time, self.decay_interval, self.decay_to_zero) + } + + fn get_topic_params( + &self, + topic_weight: f64, + expected_message_rate: f64, + first_message_decay_time: Duration, + // decay slots (decay time in slots), cap factor, activation window, current slot + mesh_message_info: Option<(u64, f64, Duration, Slot)>, + ) -> TopicScoreParams { + let mut t_params = TopicScoreParams::default(); + + t_params.topic_weight = topic_weight; + + t_params.time_in_mesh_quantum = self.slot; + t_params.time_in_mesh_cap = 3600.0 / t_params.time_in_mesh_quantum.as_secs_f64(); + t_params.time_in_mesh_weight = 10.0 / t_params.time_in_mesh_cap; + + t_params.first_message_deliveries_decay = + self.score_parameter_decay(first_message_decay_time); + t_params.first_message_deliveries_cap = Self::decay_convergence( + t_params.first_message_deliveries_decay, + 2.0 * expected_message_rate / self.mesh_n as f64, + ); + t_params.first_message_deliveries_weight = 40.0 / t_params.first_message_deliveries_cap; + + if let Some((decay_slots, cap_factor, activation_window, current_slot)) = mesh_message_info + { + let decay_time = self.slot * decay_slots as u32; + t_params.mesh_message_deliveries_decay = self.score_parameter_decay(decay_time); + t_params.mesh_message_deliveries_threshold = Self::threshold( + t_params.mesh_message_deliveries_decay, + expected_message_rate / 50.0, + ); + t_params.mesh_message_deliveries_cap = + if cap_factor * t_params.mesh_message_deliveries_threshold < 2.0 { + 2.0 + } else { + cap_factor * t_params.mesh_message_deliveries_threshold + }; + t_params.mesh_message_deliveries_activation = activation_window; + t_params.mesh_message_deliveries_window = Duration::from_secs(2); + t_params.mesh_failure_penalty_decay = t_params.mesh_message_deliveries_decay; + t_params.mesh_message_deliveries_weight = -self.max_positive_score + / (t_params.topic_weight * t_params.mesh_message_deliveries_threshold.powi(2)); + t_params.mesh_failure_penalty_weight = t_params.mesh_message_deliveries_weight; + if decay_slots >= current_slot.as_u64() { + t_params.mesh_message_deliveries_threshold = 0.0; + t_params.mesh_message_deliveries_weight = 0.0; + } + } else { + t_params.mesh_message_deliveries_weight = 0.0; + t_params.mesh_message_deliveries_threshold = 0.0; + t_params.mesh_message_deliveries_decay = 0.0; + t_params.mesh_message_deliveries_cap = 0.0; + t_params.mesh_message_deliveries_window = Duration::from_secs(0); + t_params.mesh_message_deliveries_activation = Duration::from_secs(0); + t_params.mesh_failure_penalty_decay = 0.0; + t_params.mesh_failure_penalty_weight = 0.0; + } + + t_params.invalid_message_deliveries_weight = + -self.max_positive_score / t_params.topic_weight; + t_params.invalid_message_deliveries_decay = self.score_parameter_decay(self.epoch * 50); + + t_params + } +} diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index f39fcb768c..bcb5914962 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -1,3 +1,4 @@ +use crate::behaviour::gossipsub_scoring_parameters::PeerScoreSettings; use crate::peer_manager::{score::PeerAction, PeerManager, PeerManagerEvent}; use crate::rpc::*; use crate::service::METADATA_FILENAME; @@ -9,6 +10,7 @@ use handler::{BehaviourHandler, BehaviourHandlerIn, DelegateIn, DelegateOut}; use libp2p::gossipsub::subscription_filter::{ MaxCountSubscriptionFilter, WhitelistSubscriptionFilter, }; +use libp2p::gossipsub::PeerScoreThresholds; use libp2p::{ core::{ connection::{ConnectedPoint, ConnectionId, ListenerId}, @@ -38,11 +40,13 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use types::{EnrForkId, EthSpec, SignedBeaconBlock, SubnetId}; +use types::{ChainSpec, EnrForkId, EthSpec, SignedBeaconBlock, Slot, SubnetId}; +mod gossipsub_scoring_parameters; mod handler; const MAX_IDENTIFY_ADDRESSES: usize = 10; +pub const GOSSIPSUB_GREYLIST_THRESHOLD: f64 = -16000.0; /// Identifier of requests sent by a peer. pub type PeerRequestId = (ConnectionId, SubstreamId); @@ -131,6 +135,11 @@ pub struct Behaviour { network_dir: PathBuf, /// Logger for behaviour actions. log: slog::Logger, + + score_settings: PeerScoreSettings, + + /// The interval for updating gossipsub scores + update_gossipsub_scores: tokio::time::Interval, } /// Implements the combined behaviour for the libp2p service. @@ -140,6 +149,7 @@ impl Behaviour { net_conf: &NetworkConfig, network_globals: Arc>, log: &slog::Logger, + chain_spec: &ChainSpec, ) -> error::Result { let behaviour_log = log.new(o!()); @@ -161,19 +171,42 @@ impl Behaviour { max_subscriptions_per_request: 100, //this is according to the current go implementation }; - let gossipsub = Gossipsub::new_with_subscription_filter( + let mut gossipsub = Gossipsub::new_with_subscription_filter( MessageAuthenticity::Anonymous, net_conf.gs_config.clone(), filter, ) .map_err(|e| format!("Could not construct gossipsub: {:?}", e))?; - // Temporarily disable scoring until parameters are tested. - /* + //we don't know the number of active validators and the current slot yet + let active_validators = TSpec::minimum_validator_count(); + let current_slot = Slot::new(0); + + let thresholds = PeerScoreThresholds { + gossip_threshold: -4000.0, + publish_threshold: -8000.0, + graylist_threshold: GOSSIPSUB_GREYLIST_THRESHOLD, + accept_px_threshold: 100.0, + opportunistic_graft_threshold: 5.0, + }; + + let score_settings = PeerScoreSettings::new(chain_spec, &net_conf.gs_config); + + //Prepare scoring parameters + let params = score_settings.get_peer_score_params( + active_validators, + &thresholds, + &enr_fork_id, + current_slot, + )?; + + trace!(behaviour_log, "Using peer score params"; "params" => format!("{:?}", params)); + + let update_gossipsub_scores = tokio::time::interval(params.decay_interval); + gossipsub - .with_peer_score(PeerScoreParams::default(), PeerScoreThresholds::default()) + .with_peer_score(params.clone(), thresholds) .expect("Valid score params and thresholds"); - */ Ok(Behaviour { eth2_rpc: RPC::new(log.clone()), @@ -188,9 +221,51 @@ impl Behaviour { waker: None, network_dir: net_conf.network_dir.clone(), log: behaviour_log, + score_settings, + update_gossipsub_scores, }) } + pub fn update_gossipsub_parameters( + &mut self, + active_validators: usize, + current_slot: Slot, + ) -> error::Result<()> { + let (beacon_block_params, beacon_aggregate_proof_params, beacon_attestation_subnet_params) = + self.score_settings + .get_dynamic_topic_params(active_validators, current_slot)?; + + let fork_digest = self.enr_fork_id.fork_digest; + let get_topic = |kind: GossipKind| -> Topic { + GossipTopic::new(kind, GossipEncoding::default(), fork_digest).into() + }; + + debug!(self.log, "Updating gossipsub score parameters"; + "active_validators" => active_validators); + trace!(self.log, "Updated gossipsub score parameters"; + "beacon_block_params" => format!("{:?}", beacon_block_params), + "beacon_aggregate_proof_params" => format!("{:?}", beacon_aggregate_proof_params), + "beacon_attestation_subnet_params" => format!("{:?}", beacon_attestation_subnet_params), + ); + + self.gossipsub + .set_topic_params(get_topic(GossipKind::BeaconBlock), beacon_block_params)?; + + self.gossipsub.set_topic_params( + get_topic(GossipKind::BeaconAggregateAndProof), + beacon_aggregate_proof_params, + )?; + + for i in 0..self.score_settings.attestation_subnet_count() { + self.gossipsub.set_topic_params( + get_topic(GossipKind::Attestation(SubnetId::new(i))), + beacon_attestation_subnet_params.clone(), + )?; + } + + Ok(()) + } + /// Attempts to connect to a libp2p peer. /// /// This MUST be used over Swarm::dial() as this keeps track of the peer in the peer manager. @@ -308,7 +383,8 @@ impl Behaviour { match message.encode(GossipEncoding::default()) { Ok(message_data) => { if let Err(e) = self.gossipsub.publish(topic.clone().into(), message_data) { - slog::warn!(self.log, "Could not publish message"; "error" => format!("{:?}", e)); + slog::warn!(self.log, "Could not publish message"; + "error" => format!("{:?}", e)); // add to metrics match topic.kind() { @@ -772,6 +848,11 @@ impl Behaviour { return Poll::Ready(NBAction::GenerateEvent(event)); } + // perform gossipsub score updates when necessary + while let Poll::Ready(Some(_)) = self.update_gossipsub_scores.poll_next_unpin(cx) { + self.peer_manager.update_gossipsub_scores(&self.gossipsub); + } + Poll::Pending } diff --git a/beacon_node/eth2_libp2p/src/config.rs b/beacon_node/eth2_libp2p/src/config.rs index 0e2aab147a..8c0ccad47b 100644 --- a/beacon_node/eth2_libp2p/src/config.rs +++ b/beacon_node/eth2_libp2p/src/config.rs @@ -142,6 +142,7 @@ impl Default for Config { .duplicate_cache_time(Duration::from_secs(385)) .message_id_fn(gossip_message_id) .fast_message_id_fn(fast_gossip_message_id) + .allow_self_origin(true) .build() .expect("valid gossipsub configuration"); diff --git a/beacon_node/eth2_libp2p/src/discovery/mod.rs b/beacon_node/eth2_libp2p/src/discovery/mod.rs index 8cd728b382..1d440f4ed3 100644 --- a/beacon_node/eth2_libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2_libp2p/src/discovery/mod.rs @@ -632,7 +632,7 @@ impl Discovery { .network_globals .peers .read() - .peers_on_subnet(subnet_query.subnet_id) + .good_peers_on_subnet(subnet_query.subnet_id) .count(); if peers_on_subnet >= TARGET_SUBNET_PEERS { diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 6578517937..b6eeeaba5f 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -4,7 +4,7 @@ pub use self::peerdb::*; use crate::discovery::{subnet_predicate, Discovery, DiscoveryEvent, TARGET_SUBNET_PEERS}; use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode}; use crate::types::SyncState; -use crate::{error, metrics}; +use crate::{error, metrics, Gossipsub}; use crate::{EnrExt, NetworkConfig, NetworkGlobals, PeerId, SubnetDiscovery}; use futures::prelude::*; use futures::Stream; @@ -33,7 +33,9 @@ pub(crate) mod score; pub use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerConnectionStatus::*, PeerInfo}; pub use peer_sync_status::{PeerSyncStatus, SyncInfo}; use score::{PeerAction, ScoreState}; +use std::cmp::Ordering; use std::collections::HashMap; + /// The time in seconds between re-status's peers. const STATUS_INTERVAL: u64 = 300; /// The time in seconds between PING events. We do not send a ping if the other peer has PING'd us @@ -49,6 +51,10 @@ const HEARTBEAT_INTERVAL: u64 = 30; /// PEER_EXCESS_FACTOR = 0.1 we allow 10% more nodes, i.e 55. const PEER_EXCESS_FACTOR: f32 = 0.1; +/// Relative factor of peers that are allowed to have a negative gossipsub score without penalizing +/// them in lighthouse. +const ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR: f32 = 0.1; + /// The main struct that handles peer's reputation and connection status. pub struct PeerManager { /// Storage of network globals to access the `PeerDB`. @@ -237,7 +243,7 @@ impl PeerManager { .network_globals .peers .read() - .peers_on_subnet(s.subnet_id) + .good_peers_on_subnet(s.subnet_id) .count(); if peers_on_subnet >= TARGET_SUBNET_PEERS { debug!( @@ -521,6 +527,34 @@ impl PeerManager { } } + pub(crate) fn update_gossipsub_scores(&mut self, gossipsub: &Gossipsub) { + //collect peers with scores + let mut guard = self.network_globals.peers.write(); + let mut peers: Vec<_> = guard + .peers_mut() + .filter_map(|(peer_id, info)| gossipsub.peer_score(peer_id).map(|score| (info, score))) + .collect(); + + // sort descending by score + peers.sort_unstable_by(|(.., s1), (.., s2)| s2.partial_cmp(s1).unwrap_or(Ordering::Equal)); + + let mut to_ignore_negative_peers = + (self.target_peers as f32 * ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR).ceil() as usize; + for (info, score) in peers { + info.update_gossipsub_score( + score, + if score < 0.0 && to_ignore_negative_peers > 0 { + to_ignore_negative_peers -= 1; + // We ignore the negative score for the best negative peers so that their + // gossipsub score can recover without getting disconnected. + true + } else { + false + }, + ); + } + } + /* Internal functions */ // The underlying discovery server has updated our external IP address. We send this up to diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs index 3da4cab135..4d5aa4c1b4 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs @@ -109,8 +109,8 @@ impl PeerInfo { } /// Returns score of the peer. - pub fn score(&self) -> Score { - self.score + pub fn score(&self) -> &Score { + &self.score } /// Returns the state of the peer based on the score. @@ -132,6 +132,14 @@ impl PeerInfo { } } + pub(crate) fn update_gossipsub_score(&mut self, new_score: f64, ignore: bool) { + self.score.update_gossipsub_score(new_score, ignore); + } + + pub fn is_good_gossipsub_peer(&self) -> bool { + self.score.is_good_gossipsub_peer() + } + #[cfg(test)] /// Resets the peers score. pub fn reset_score(&mut self) { diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index 1351e7d851..5d2eece4d0 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -95,10 +95,11 @@ impl PeerDB { /* Getters */ /// Gives the score of a peer, or default score if it is unknown. - pub fn score(&self, peer_id: &PeerId) -> Score { + pub fn score(&self, peer_id: &PeerId) -> f64 { self.peers .get(peer_id) - .map_or(Score::default(), |info| info.score()) + .map_or(&Score::default(), |info| info.score()) + .score() } /// Returns an iterator over all peers in the db. @@ -162,7 +163,7 @@ impl PeerDB { /// This is used to determine if we should accept incoming connections or not. pub fn is_banned(&self, peer_id: &PeerId) -> bool { if let Some(peer) = self.peers.get(peer_id) { - match peer.score().state() { + match peer.score_state() { ScoreState::Banned => true, _ => self.ip_is_banned(peer), } @@ -184,7 +185,7 @@ impl PeerDB { /// Returns true if the Peer is either banned or in the disconnected state. pub fn is_banned_or_disconnected(&self, peer_id: &PeerId) -> bool { if let Some(peer) = self.peers.get(peer_id) { - match peer.score().state() { + match peer.score_state() { ScoreState::Banned | ScoreState::Disconnected => true, _ => self.ip_is_banned(peer), } @@ -241,10 +242,12 @@ impl PeerDB { } /// Gives an iterator of all peers on a given subnet. - pub fn peers_on_subnet(&self, subnet_id: SubnetId) -> impl Iterator { + pub fn good_peers_on_subnet(&self, subnet_id: SubnetId) -> impl Iterator { self.peers .iter() - .filter(move |(_, info)| info.is_connected() && info.on_subnet(subnet_id)) + .filter(move |(_, info)| { + info.is_connected() && info.on_subnet(subnet_id) && info.is_good_gossipsub_peer() + }) .map(|(peer_id, _)| peer_id) } @@ -664,7 +667,7 @@ mod tests { // this is the only peer assert_eq!(pdb.peers().count(), 1); // the peer has the default reputation - assert_eq!(pdb.score(&random_peer).score(), Score::default().score()); + assert_eq!(pdb.score(&random_peer), Score::default().score()); // it should be connected, and therefore not counted as disconnected assert_eq!(pdb.disconnected_peers, 0); assert!(peer_info.unwrap().is_connected()); diff --git a/beacon_node/eth2_libp2p/src/peer_manager/score.rs b/beacon_node/eth2_libp2p/src/peer_manager/score.rs index 8f43a24a41..80017bdf8d 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/score.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/score.rs @@ -5,6 +5,7 @@ //! As the logic develops this documentation will advance. //! //! The scoring algorithms are currently experimental. +use crate::behaviour::GOSSIPSUB_GREYLIST_THRESHOLD; use serde::Serialize; use std::time::Instant; use tokio::time::Duration; @@ -19,6 +20,9 @@ pub(crate) const DEFAULT_SCORE: f64 = 0.0; const MIN_SCORE_BEFORE_DISCONNECT: f64 = -20.0; /// The minimum reputation before a peer is banned. const MIN_SCORE_BEFORE_BAN: f64 = -50.0; +/// If a peer has a lighthouse score below this constant all other score parts will get ignored and +/// the peer will get banned regardless of the other parts. +const MIN_LIGHTHOUSE_SCORE_BEFORE_BAN: f64 = -60.0; /// The maximum score a peer can obtain. const MAX_SCORE: f64 = 100.0; /// The minimum score a peer can obtain. @@ -28,6 +32,12 @@ const SCORE_HALFLIFE: f64 = 600.0; /// The number of seconds we ban a peer for before their score begins to decay. const BANNED_BEFORE_DECAY: Duration = Duration::from_secs(1800); +/// We weight negative gossipsub scores in such a way that they never result in a disconnect by +/// themselves. This "solves" the problem of non-decaying gossipsub scores for disconnected peers. +const GOSSIPSUB_NEGATIVE_SCORE_WEIGHT: f64 = + (MIN_SCORE_BEFORE_DISCONNECT + 1.0) / GOSSIPSUB_GREYLIST_THRESHOLD; +const GOSSIPSUB_POSITIVE_SCORE_WEIGHT: f64 = GOSSIPSUB_NEGATIVE_SCORE_WEIGHT; + /// A collection of actions a peer can perform which will adjust its score. /// Each variant has an associated score change. // To easily assess the behaviour of scores changes the number of variants should stay low, and @@ -55,74 +65,6 @@ pub enum PeerAction { _ValidMessage, } -/// The expected state of the peer given the peer's score. -#[derive(Debug, PartialEq)] -pub(crate) enum ScoreState { - /// We are content with the peers performance. We permit connections and messages. - Healthy, - /// The peer should be disconnected. We allow re-connections if the peer is persistent. - Disconnected, - /// The peer is banned. We disallow new connections until it's score has decayed into a - /// tolerable threshold. - Banned, -} - -/// A peer's score (perceived potential usefulness). -/// -/// This simplistic version consists of a global score per peer which decays to 0 over time. The -/// decay rate applies equally to positive and negative scores. -#[derive(Copy, PartialEq, Clone, Debug, Serialize)] -pub struct Score { - /// The global score. - // NOTE: In the future we may separate this into sub-scores involving the RPC, Gossipsub and - // lighthouse. - score: f64, - /// The time the score was last updated to perform time-based adjustments such as score-decay. - #[serde(skip)] - last_updated: Instant, -} - -impl Default for Score { - fn default() -> Self { - Score { - score: DEFAULT_SCORE, - last_updated: Instant::now(), - } - } -} - -impl Eq for Score {} - -impl PartialOrd for Score { - fn partial_cmp(&self, other: &Score) -> Option { - self.score - .partial_cmp(&other.score) - .or_else(|| self.last_updated.partial_cmp(&other.last_updated)) - } -} - -impl Ord for Score { - fn cmp(&self, other: &Score) -> std::cmp::Ordering { - self.partial_cmp(other) - .unwrap_or_else(|| std::cmp::Ordering::Equal) - } -} - -impl From for Score { - fn from(f: f64) -> Self { - Score { - score: f, - last_updated: Instant::now(), - } - } -} - -impl std::fmt::Display for Score { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:.2}", self.score) - } -} - impl std::fmt::Display for PeerAction { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -135,6 +77,18 @@ impl std::fmt::Display for PeerAction { } } +/// The expected state of the peer given the peer's score. +#[derive(Debug, PartialEq)] +pub(crate) enum ScoreState { + /// We are content with the peers performance. We permit connections and messages. + Healthy, + /// The peer should be disconnected. We allow re-connections if the peer is persistent. + Disconnected, + /// The peer is banned. We disallow new connections until it's score has decayed into a + /// tolerable threshold. + Banned, +} + impl std::fmt::Display for ScoreState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -145,23 +99,59 @@ impl std::fmt::Display for ScoreState { } } -impl Score { - /// Return max possible score. - pub fn max_score() -> Self { - Score { - score: MAX_SCORE, +/// A peer's score (perceived potential usefulness). +/// +/// This simplistic version consists of a global score per peer which decays to 0 over time. The +/// decay rate applies equally to positive and negative scores. +#[derive(PartialEq, Clone, Debug, Serialize)] +pub struct RealScore { + /// The global score. + // NOTE: In the future we may separate this into sub-scores involving the RPC, Gossipsub and + // lighthouse. + lighthouse_score: f64, + gossipsub_score: f64, + /// We ignore the negative gossipsub scores of some peers to allow decaying without + /// disconnecting. + ignore_negative_gossipsub_score: bool, + score: f64, + /// The time the score was last updated to perform time-based adjustments such as score-decay. + #[serde(skip)] + last_updated: Instant, +} + +impl Default for RealScore { + fn default() -> Self { + RealScore { + lighthouse_score: DEFAULT_SCORE, + gossipsub_score: DEFAULT_SCORE, + score: DEFAULT_SCORE, last_updated: Instant::now(), + ignore_negative_gossipsub_score: false, } } +} + +impl RealScore { /// Access to the underlying score. - pub fn score(&self) -> f64 { + fn recompute_score(&mut self) { + self.score = self.lighthouse_score; + if self.lighthouse_score <= MIN_LIGHTHOUSE_SCORE_BEFORE_BAN { + //ignore all other scores, i.e. do nothing here + } else if self.gossipsub_score >= 0.0 { + self.score += self.gossipsub_score * GOSSIPSUB_POSITIVE_SCORE_WEIGHT; + } else if !self.ignore_negative_gossipsub_score { + self.score += self.gossipsub_score * GOSSIPSUB_NEGATIVE_SCORE_WEIGHT; + } + } + + fn score(&self) -> f64 { self.score } /// Modifies the score based on a peer's action. pub fn apply_peer_action(&mut self, peer_action: PeerAction) { match peer_action { - PeerAction::Fatal => self.score = MIN_SCORE, // The worst possible score + PeerAction::Fatal => self.set_lighthouse_score(MIN_SCORE), // The worst possible score PeerAction::LowToleranceError => self.add(-10.0), PeerAction::MidToleranceError => self.add(-5.0), PeerAction::HighToleranceError => self.add(-1.0), @@ -169,18 +159,14 @@ impl Score { } } - /// Returns the expected state of the peer given it's score. - pub(crate) fn state(&self) -> ScoreState { - match self.score { - x if x <= MIN_SCORE_BEFORE_BAN => ScoreState::Banned, - x if x <= MIN_SCORE_BEFORE_DISCONNECT => ScoreState::Disconnected, - _ => ScoreState::Healthy, - } + fn set_lighthouse_score(&mut self, new_score: f64) { + self.lighthouse_score = new_score; + self.update_state(); } /// Add an f64 to the score abiding by the limits. fn add(&mut self, score: f64) { - let mut new_score = self.score + score; + let mut new_score = self.lighthouse_score + score; if new_score > MAX_SCORE { new_score = MAX_SCORE; } @@ -188,32 +174,28 @@ impl Score { new_score = MIN_SCORE; } - if self.score > MIN_SCORE_BEFORE_BAN && new_score <= MIN_SCORE_BEFORE_BAN { + self.set_lighthouse_score(new_score); + } + + fn update_state(&mut self) { + let was_not_banned = self.score > MIN_SCORE_BEFORE_BAN; + self.recompute_score(); + if was_not_banned && self.score <= MIN_SCORE_BEFORE_BAN { //we ban this peer for at least BANNED_BEFORE_DECAY seconds self.last_updated += BANNED_BEFORE_DECAY; } - - self.score = new_score; } /// Add an f64 to the score abiding by the limits. #[cfg(test)] pub fn test_add(&mut self, score: f64) { - let mut new_score = self.score + score; - if new_score > MAX_SCORE { - new_score = MAX_SCORE; - } - if new_score < MIN_SCORE { - new_score = MIN_SCORE; - } - - self.score = new_score; + self.add(score); } #[cfg(test)] // reset the score pub fn test_reset(&mut self) { - self.score = 0f64; + self.set_lighthouse_score(0f64); } /// Applies time-based logic such as decay rates to the score. @@ -237,10 +219,110 @@ impl Score { { // e^(-ln(2)/HL*t) let decay_factor = (*HALFLIFE_DECAY * secs_since_update as f64).exp(); - self.score *= decay_factor; + self.lighthouse_score *= decay_factor; self.last_updated = now; + self.update_state(); } } + + pub fn update_gossipsub_score(&mut self, new_score: f64, ignore: bool) { + // we only update gossipsub if last_updated is in the past which means either the peer is + // not banned or the BANNED_BEFORE_DECAY time is over. + if self.last_updated <= Instant::now() { + self.gossipsub_score = new_score; + self.ignore_negative_gossipsub_score = ignore; + self.update_state(); + } + } + + pub fn is_good_gossipsub_peer(&self) -> bool { + self.gossipsub_score >= 0.0 + } +} + +#[derive(PartialEq, Clone, Debug, Serialize)] +pub enum Score { + Max, + Real(RealScore), +} + +impl Default for Score { + fn default() -> Self { + Self::Real(RealScore::default()) + } +} + +macro_rules! apply { + ( $method:ident $(, $param_name: ident: $param_type: ty)*) => { + impl Score { + pub fn $method( + &mut self, $($param_name: $param_type, )* + ) { + if let Self::Real(score) = self { + score.$method($($param_name, )*); + } + } + } + }; +} + +apply!(apply_peer_action, peer_action: PeerAction); +apply!(add, delta: f64); +apply!(update); +apply!(update_gossipsub_score, new_score: f64, ignore: bool); +#[cfg(test)] +apply!(test_add, score: f64); +#[cfg(test)] +apply!(test_reset); + +impl Score { + pub fn score(&self) -> f64 { + match self { + Self::Max => f64::INFINITY, + Self::Real(score) => score.score(), + } + } + + pub fn max_score() -> Self { + Self::Max + } + + /// Returns the expected state of the peer given it's score. + pub(crate) fn state(&self) -> ScoreState { + match self.score() { + x if x <= MIN_SCORE_BEFORE_BAN => ScoreState::Banned, + x if x <= MIN_SCORE_BEFORE_DISCONNECT => ScoreState::Disconnected, + _ => ScoreState::Healthy, + } + } + + pub fn is_good_gossipsub_peer(&self) -> bool { + match self { + Self::Max => true, + Self::Real(score) => score.is_good_gossipsub_peer(), + } + } +} + +impl Eq for Score {} + +impl PartialOrd for Score { + fn partial_cmp(&self, other: &Score) -> Option { + self.score().partial_cmp(&other.score()) + } +} + +impl Ord for Score { + fn cmp(&self, other: &Score) -> std::cmp::Ordering { + self.partial_cmp(other) + .unwrap_or_else(|| std::cmp::Ordering::Equal) + } +} + +impl std::fmt::Display for Score { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:.2}", self.score()) + } } #[cfg(test)] @@ -278,20 +360,36 @@ mod tests { #[test] fn test_ban_time() { - let mut score = Score::default(); + let mut score = RealScore::default(); let now = Instant::now(); let change = MIN_SCORE_BEFORE_BAN; score.add(change); assert_eq!(score.score(), MIN_SCORE_BEFORE_BAN); - assert_eq!(score.state(), ScoreState::Banned); score.update_at(now + BANNED_BEFORE_DECAY); assert_eq!(score.score(), MIN_SCORE_BEFORE_BAN); - assert_eq!(score.state(), ScoreState::Banned); score.update_at(now + BANNED_BEFORE_DECAY + Duration::from_secs(1)); assert!(score.score() > MIN_SCORE_BEFORE_BAN); + } + + #[test] + fn test_very_negative_gossipsub_score() { + let mut score = Score::default(); + score.update_gossipsub_score(GOSSIPSUB_GREYLIST_THRESHOLD, false); + assert!(!score.is_good_gossipsub_peer()); + assert!(score.score() < 0.0); + assert_eq!(score.state(), ScoreState::Healthy); + score.add(-1.0001); assert_eq!(score.state(), ScoreState::Disconnected); } + + #[test] + fn test_ignored_gossipsub_score() { + let mut score = Score::default(); + score.update_gossipsub_score(GOSSIPSUB_GREYLIST_THRESHOLD, true); + assert!(!score.is_good_gossipsub_peer()); + assert_eq!(score.score(), 0.0); + } } diff --git a/beacon_node/eth2_libp2p/src/service.rs b/beacon_node/eth2_libp2p/src/service.rs index e2fbb672e2..2f638e0e36 100644 --- a/beacon_node/eth2_libp2p/src/service.rs +++ b/beacon_node/eth2_libp2p/src/service.rs @@ -23,7 +23,7 @@ use std::io::prelude::*; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use types::{EnrForkId, EthSpec}; +use types::{ChainSpec, EnrForkId, EthSpec}; pub const NETWORK_KEY_FILENAME: &str = "key"; /// The maximum simultaneous libp2p connections per peer. @@ -62,6 +62,7 @@ impl Service { config: &NetworkConfig, enr_fork_id: EnrForkId, log: &slog::Logger, + chain_spec: &ChainSpec, ) -> error::Result<(Arc>, Self)> { let log = log.new(o!("service"=> "libp2p")); trace!(log, "Libp2p Service starting"); @@ -104,8 +105,14 @@ impl Service { let transport = build_transport(local_keypair.clone()) .map_err(|e| format!("Failed to build transport: {:?}", e))?; // Lighthouse network behaviour - let behaviour = - Behaviour::new(&local_keypair, config, network_globals.clone(), &log).await?; + let behaviour = Behaviour::new( + &local_keypair, + config, + network_globals.clone(), + &log, + chain_spec, + ) + .await?; // use the executor for libp2p struct Executor(task_executor::TaskExecutor); diff --git a/beacon_node/eth2_libp2p/tests/common/mod.rs b/beacon_node/eth2_libp2p/tests/common/mod.rs index 608edd9e95..8d9859db75 100644 --- a/beacon_node/eth2_libp2p/tests/common/mod.rs +++ b/beacon_node/eth2_libp2p/tests/common/mod.rs @@ -7,7 +7,7 @@ use eth2_libp2p::{GossipsubConfigBuilder, Libp2pEvent, NetworkConfig}; use slog::{debug, error, o, Drain}; use std::net::{TcpListener, UdpSocket}; use std::time::Duration; -use types::{EnrForkId, MinimalEthSpec}; +use types::{ChainSpec, EnrForkId, MinimalEthSpec}; type E = MinimalEthSpec; use tempdir::TempDir; @@ -105,10 +105,16 @@ pub async fn build_libp2p_instance(boot_nodes: Vec, log: slog::Logger) -> L shutdown_tx, ); Libp2pInstance( - LibP2PService::new(executor, &config, EnrForkId::default(), &log) - .await - .expect("should build libp2p instance") - .1, + LibP2PService::new( + executor, + &config, + EnrForkId::default(), + &log, + &ChainSpec::minimal(), + ) + .await + .expect("should build libp2p instance") + .1, signal, ) } diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index cbe7f550ad..bf365262b0 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -36,13 +36,13 @@ lazy_static! { &["subnet"] ); - pub static ref AVG_GOSSIPSUB_PEER_SCORE_PER_MAIN_TOPIC: Result = try_create_int_gauge_vec( + pub static ref AVG_GOSSIPSUB_PEER_SCORE_PER_MAIN_TOPIC: Result = try_create_float_gauge_vec( "gossipsub_avg_peer_score_per_topic", "Average peer's score per topic", &["topic_hash"] ); - pub static ref AVG_GOSSIPSUB_PEER_SCORE_PER_SUBNET_TOPIC: Result = try_create_int_gauge_vec( + pub static ref AVG_GOSSIPSUB_PEER_SCORE_PER_SUBNET_TOPIC: Result = try_create_float_gauge_vec( "gossipsub_avg_peer_score_per_subnet_topic", "Average peer's score per subnet topic", &["subnet"] @@ -53,6 +53,60 @@ lazy_static! { "Failed attestation publishes per subnet", &["subnet"] ); + + pub static ref SCORES_BELOW_ZERO_PER_CLIENT: Result = try_create_float_gauge_vec( + "gossipsub_scores_below_zero_per_client", + "Relative number of scores below zero per client", + &["Client"] + ); + pub static ref SCORES_BELOW_GOSSIP_THRESHOLD_PER_CLIENT: Result = try_create_float_gauge_vec( + "gossipsub_scores_below_gossip_threshold_per_client", + "Relative number of scores below gossip threshold per client", + &["Client"] + ); + pub static ref SCORES_BELOW_PUBLISH_THRESHOLD_PER_CLIENT: Result = try_create_float_gauge_vec( + "gossipsub_scores_below_publish_threshold_per_client", + "Relative number of scores below publish threshold per client", + &["Client"] + ); + pub static ref SCORES_BELOW_GREYLIST_THRESHOLD_PER_CLIENT: Result = try_create_float_gauge_vec( + "gossipsub_scores_below_greylist_threshold_per_client", + "Relative number of scores below greylist threshold per client", + &["Client"] + ); + + pub static ref MIN_SCORES_PER_CLIENT: Result = try_create_float_gauge_vec( + "gossipsub_min_scores_per_client", + "Minimum scores per client", + &["Client"] + ); + pub static ref MEDIAN_SCORES_PER_CLIENT: Result = try_create_float_gauge_vec( + "gossipsub_median_scores_per_client", + "Median scores per client", + &["Client"] + ); + pub static ref MEAN_SCORES_PER_CLIENT: Result = try_create_float_gauge_vec( + "gossipsub_mean_scores_per_client", + "Mean scores per client", + &["Client"] + ); + pub static ref MAX_SCORES_PER_CLIENT: Result = try_create_float_gauge_vec( + "gossipsub_max_scores_per_client", + "Max scores per client", + &["Client"] + ); + pub static ref BEACON_BLOCK_MESH_PEERS_PER_CLIENT: Result = + try_create_int_gauge_vec( + "block_mesh_peers_per_client", + "Number of mesh peers for BeaconBlock topic per client", + &["Client"] + ); + pub static ref BEACON_AGGREGATE_AND_PROOF_MESH_PEERS_PER_CLIENT: Result = + try_create_int_gauge_vec( + "beacon_aggregate_and_proof_mesh_peers_per_client", + "Number of mesh peers for BeaconAggregateAndProof topic per client", + &["Client"] + ); } lazy_static! { diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 2ce6c086d4..5a19c46f08 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -20,7 +20,7 @@ use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use store::HotColdDB; use tokio::sync::mpsc; use tokio::time::Delay; -use types::{EthSpec, ValidatorSubscription}; +use types::{EthSpec, RelativeEpoch, ValidatorSubscription}; mod tests; @@ -112,6 +112,8 @@ pub struct NetworkService { next_fork_update: Option, /// A timer for updating various network metrics. metrics_update: tokio::time::Interval, + /// gossipsub_parameter_update timer + gossipsub_parameter_update: tokio::time::Interval, /// The logger for the network service. log: slog::Logger, } @@ -153,8 +155,14 @@ impl NetworkService { let next_fork_update = next_fork_delay(&beacon_chain); // launch libp2p service - let (network_globals, mut libp2p) = - LibP2PService::new(executor.clone(), config, enr_fork_id, &network_log).await?; + let (network_globals, mut libp2p) = LibP2PService::new( + executor.clone(), + config, + enr_fork_id, + &network_log, + &beacon_chain.spec, + ) + .await?; // Repopulate the DHT with stored ENR's. let enrs_to_load = load_dht::(store.clone()); @@ -183,6 +191,9 @@ impl NetworkService { // create a timer for updating network metrics let metrics_update = tokio::time::interval(Duration::from_secs(METRIC_UPDATE_INTERVAL)); + // create a timer for updating gossipsub parameters + let gossipsub_parameter_update = tokio::time::interval(Duration::from_secs(60)); + // create the network service and spawn the task let network_log = network_log.new(o!("service" => "network")); let network_service = NetworkService { @@ -197,6 +208,7 @@ impl NetworkService { discovery_auto_update: config.discv5_config.enr_update, next_fork_update, metrics_update, + gossipsub_parameter_update, log: network_log, }; @@ -256,7 +268,51 @@ fn spawn_service( .as_ref() .map(|gauge| gauge.reset()); } - update_gossip_metrics::(&service.libp2p.swarm.gs()); + update_gossip_metrics::( + &service.libp2p.swarm.gs(), + &service.network_globals, + &service.log + ); + } + _ = service.gossipsub_parameter_update.next() => { + if let Ok(slot) = service.beacon_chain.slot() { + if let Some(active_validators) = service.beacon_chain.with_head(|head| { + Ok( + head + .beacon_state + .get_cached_active_validator_indices(RelativeEpoch::Current) + .map(|indices| indices.len()) + .ok() + .or_else(|| { + // if active validator cached was not build we count the + // active validators + service + .beacon_chain + .epoch() + .ok() + .map(|current_epoch| { + head + .beacon_state + .validators + .iter() + .filter(|validator| + validator.is_active_at(current_epoch) + ) + .count() + }) + }) + ) + }).unwrap_or(None) { + if (*service.libp2p.swarm) + .update_gossipsub_parameters(active_validators, slot).is_err() { + error!( + service.log, + "Failed to update gossipsub parameters"; + "active_validators" => active_validators + ); + } + } + } } // handle a message sent to the network Some(message) = service.network_recv.recv() => { @@ -296,6 +352,7 @@ fn spawn_service( trace!(service.log, "Propagating gossipsub message"; "propagation_peer" => format!("{:?}", propagation_source), "message_id" => message_id.to_string(), + "validation_result" => format!("{:?}", validation_result) ); service .libp2p @@ -537,7 +594,11 @@ fn expose_receive_metrics(message: &PubsubMessage) { } } -fn update_gossip_metrics(gossipsub: &Gossipsub) { +fn update_gossip_metrics( + gossipsub: &Gossipsub, + network_globals: &Arc>, + logger: &slog::Logger, +) { // Clear the metrics let _ = metrics::PEERS_PER_PROTOCOL .as_ref() @@ -555,6 +616,38 @@ fn update_gossip_metrics(gossipsub: &Gossipsub) { .as_ref() .map(|gauge| gauge.reset()); + let _ = metrics::SCORES_BELOW_ZERO_PER_CLIENT + .as_ref() + .map(|gauge| gauge.reset()); + let _ = metrics::SCORES_BELOW_GOSSIP_THRESHOLD_PER_CLIENT + .as_ref() + .map(|gauge| gauge.reset()); + let _ = metrics::SCORES_BELOW_PUBLISH_THRESHOLD_PER_CLIENT + .as_ref() + .map(|gauge| gauge.reset()); + let _ = metrics::SCORES_BELOW_GREYLIST_THRESHOLD_PER_CLIENT + .as_ref() + .map(|gauge| gauge.reset()); + let _ = metrics::MIN_SCORES_PER_CLIENT + .as_ref() + .map(|gauge| gauge.reset()); + let _ = metrics::MEDIAN_SCORES_PER_CLIENT + .as_ref() + .map(|gauge| gauge.reset()); + let _ = metrics::MEAN_SCORES_PER_CLIENT + .as_ref() + .map(|gauge| gauge.reset()); + let _ = metrics::MAX_SCORES_PER_CLIENT + .as_ref() + .map(|gauge| gauge.reset()); + + let _ = metrics::BEACON_BLOCK_MESH_PEERS_PER_CLIENT + .as_ref() + .map(|gauge| gauge.reset()); + let _ = metrics::BEACON_AGGREGATE_AND_PROOF_MESH_PEERS_PER_CLIENT + .as_ref() + .map(|gauge| gauge.reset()); + // reset the mesh peers, showing all subnets for subnet_id in 0..T::default_spec().attestation_subnet_count { let _ = metrics::get_int_gauge( @@ -607,22 +700,22 @@ fn update_gossip_metrics(gossipsub: &Gossipsub) { // average peer scores if let Some(score) = gossipsub.peer_score(peer_id) { - if let Some(v) = metrics::get_int_gauge( + if let Some(v) = metrics::get_gauge( &metrics::AVG_GOSSIPSUB_PEER_SCORE_PER_SUBNET_TOPIC, &[&subnet_id.to_string()], ) { - v.add(score as i64) + v.add(score) }; } } kind => { // main topics if let Some(score) = gossipsub.peer_score(peer_id) { - if let Some(v) = metrics::get_int_gauge( + if let Some(v) = metrics::get_gauge( &metrics::AVG_GOSSIPSUB_PEER_SCORE_PER_MAIN_TOPIC, &[&format!("{:?}", kind)], ) { - v.add(score as i64) + v.add(score) }; } } @@ -636,20 +729,20 @@ fn update_gossip_metrics(gossipsub: &Gossipsub) { match topic.kind() { GossipKind::Attestation(subnet_id) => { // average peer scores - if let Some(v) = metrics::get_int_gauge( + if let Some(v) = metrics::get_gauge( &metrics::AVG_GOSSIPSUB_PEER_SCORE_PER_SUBNET_TOPIC, &[&subnet_id.to_string()], ) { - v.set(v.get() / (*peers as i64)) + v.set(v.get() / (*peers as f64)) }; } kind => { // main topics - if let Some(v) = metrics::get_int_gauge( + if let Some(v) = metrics::get_gauge( &metrics::AVG_GOSSIPSUB_PEER_SCORE_PER_MAIN_TOPIC, &[&format!("{:?}", kind)], ) { - v.set(v.get() / (*peers as i64)) + v.set(v.get() / (*peers as f64)) }; } } @@ -695,4 +788,132 @@ fn update_gossip_metrics(gossipsub: &Gossipsub) { v.set(*peers) }; } + + let mut peer_to_client = HashMap::new(); + let mut scores_per_client: HashMap> = HashMap::new(); + { + let peers = network_globals.peers.read(); + for (peer_id, _) in gossipsub.all_peers() { + let client = peers + .peer_info(peer_id) + .map_or("Unknown".to_string(), |peer_info| { + peer_info.client.kind.to_string() + }); + peer_to_client.insert(peer_id, client.clone()); + let score = gossipsub.peer_score(peer_id).unwrap_or(0.0); + if (client == "Prysm" || client == "Lighthouse") && score < 0.0 { + trace!(logger, "Peer has negative score"; "peer" => format!("{:?}", peer_id), + "client" => &client, "score" => score); + } + scores_per_client.entry(client).or_default().push(score); + } + } + + // mesh peers per client + for topic_hash in gossipsub.topics() { + if let Ok(topic) = GossipTopic::decode(topic_hash.as_str()) { + match topic.kind() { + GossipKind::BeaconBlock => { + for peer in gossipsub.mesh_peers(&topic_hash) { + if let Some(client) = peer_to_client.get(peer) { + if let Some(v) = metrics::get_int_gauge( + &metrics::BEACON_BLOCK_MESH_PEERS_PER_CLIENT, + &[client], + ) { + v.inc() + }; + } + } + } + GossipKind::BeaconAggregateAndProof => { + for peer in gossipsub.mesh_peers(&topic_hash) { + if let Some(client) = peer_to_client.get(peer) { + if let Some(v) = metrics::get_int_gauge( + &metrics::BEACON_AGGREGATE_AND_PROOF_MESH_PEERS_PER_CLIENT, + &[client], + ) { + v.inc() + }; + } + } + } + _ => (), + } + } + } + + for (client, scores) in scores_per_client.into_iter() { + let c = &[client.as_ref()]; + let len = scores.len(); + if len > 0 { + let mut below0 = 0; + let mut below_gossip_threshold = 0; + let mut below_publish_threshold = 0; + let mut below_greylist_threshold = 0; + let mut min = f64::INFINITY; + let mut sum = 0.0; + let mut max = f64::NEG_INFINITY; + + let count = scores.len() as f64; + + for &score in &scores { + if score < 0.0 { + below0 += 1; + } + if score < -4000.0 { + //TODO not hardcode + below_gossip_threshold += 1; + } + if score < -8000.0 { + //TODO not hardcode + below_publish_threshold += 1; + } + if score < -16000.0 { + //TODO not hardcode + below_greylist_threshold += 1; + } + if score < min { + min = score; + } + if score > max { + max = score; + } + sum += score; + } + + let median = if len == 0 { + 0.0 + } else if len % 2 == 0 { + (scores[len / 2 - 1] + scores[len / 2]) / 2.0 + } else { + scores[len / 2] + }; + + metrics::set_gauge_entry( + &metrics::SCORES_BELOW_ZERO_PER_CLIENT, + c, + below0 as f64 / count, + ); + metrics::set_gauge_entry( + &metrics::SCORES_BELOW_GOSSIP_THRESHOLD_PER_CLIENT, + c, + below_gossip_threshold as f64 / count, + ); + metrics::set_gauge_entry( + &metrics::SCORES_BELOW_PUBLISH_THRESHOLD_PER_CLIENT, + c, + below_publish_threshold as f64 / count, + ); + metrics::set_gauge_entry( + &metrics::SCORES_BELOW_GREYLIST_THRESHOLD_PER_CLIENT, + c, + below_greylist_threshold as f64 / count, + ); + + metrics::set_gauge_entry(&metrics::MIN_SCORES_PER_CLIENT, c, min); + metrics::set_gauge_entry(&metrics::MEDIAN_SCORES_PER_CLIENT, c, median); + metrics::set_gauge_entry(&metrics::MEAN_SCORES_PER_CLIENT, c, sum / count); + metrics::set_gauge_entry(&metrics::MAX_SCORES_PER_CLIENT, c, max); + } + } } diff --git a/common/lighthouse_metrics/src/lib.rs b/common/lighthouse_metrics/src/lib.rs index 0637b973c7..1bed7b74b1 100644 --- a/common/lighthouse_metrics/src/lib.rs +++ b/common/lighthouse_metrics/src/lib.rs @@ -57,6 +57,7 @@ use prometheus::{HistogramOpts, HistogramTimer, Opts}; use std::time::Duration; +use prometheus::core::{Atomic, GenericGauge, GenericGaugeVec}; pub use prometheus::{ Encoder, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Result, TextEncoder, @@ -164,6 +165,27 @@ pub fn get_int_gauge(int_gauge_vec: &Result, name: &[&str]) -> Opti } } +pub fn get_gauge( + gauge_vec: &Result>, + name: &[&str], +) -> Option> { + if let Ok(gauge_vec) = gauge_vec { + Some(gauge_vec.get_metric_with_label_values(name).ok()?) + } else { + None + } +} + +pub fn set_gauge_entry( + gauge_vec: &Result>, + name: &[&str], + value: P::T, +) { + if let Some(v) = get_gauge(gauge_vec, name) { + v.set(value) + }; +} + /// If `int_gauge_vec.is_ok()`, sets the gauge with the given `name` to the given `value` /// otherwise returns false. pub fn set_int_gauge(int_gauge_vec: &Result, name: &[&str], value: i64) -> bool { diff --git a/consensus/types/src/eth_spec.rs b/consensus/types/src/eth_spec.rs index 27b3255dce..43b6443203 100644 --- a/consensus/types/src/eth_spec.rs +++ b/consensus/types/src/eth_spec.rs @@ -109,16 +109,28 @@ pub trait EthSpec: 'static + Default + Sync + Send + Clone + Debug + PartialEq + fn get_committee_count_per_slot( active_validator_count: usize, spec: &ChainSpec, + ) -> Result { + Self::get_committee_count_per_slot_with( + active_validator_count, + spec.max_committees_per_slot, + spec.target_committee_size, + ) + } + + fn get_committee_count_per_slot_with( + active_validator_count: usize, + max_committees_per_slot: usize, + target_committee_size: usize, ) -> Result { let slots_per_epoch = Self::SlotsPerEpoch::to_usize(); Ok(std::cmp::max( 1, std::cmp::min( - spec.max_committees_per_slot, + max_committees_per_slot, active_validator_count .safe_div(slots_per_epoch)? - .safe_div(spec.target_committee_size)?, + .safe_div(target_committee_size)?, ), )) } diff --git a/testing/simulator/src/eth1_sim.rs b/testing/simulator/src/eth1_sim.rs index acf4fe845d..28e1f5f0d6 100644 --- a/testing/simulator/src/eth1_sim.rs +++ b/testing/simulator/src/eth1_sim.rs @@ -53,16 +53,17 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { let spec = &mut env.eth2_config.spec; + let total_validator_count = validators_per_node * node_count; + spec.milliseconds_per_slot /= speed_up_factor; spec.eth1_follow_distance = 16; spec.genesis_delay = eth1_block_time.as_secs() * spec.eth1_follow_distance * 2; spec.min_genesis_time = 0; - spec.min_genesis_active_validator_count = 64; + spec.min_genesis_active_validator_count = total_validator_count as u64; spec.seconds_per_eth1_block = 1; let slot_duration = Duration::from_millis(spec.milliseconds_per_slot); let initial_validator_count = spec.min_genesis_active_validator_count as usize; - let total_validator_count = validators_per_node * node_count; let deposit_amount = env.eth2_config.spec.max_effective_balance; let context = env.core_context(); diff --git a/testing/simulator/src/no_eth1_sim.rs b/testing/simulator/src/no_eth1_sim.rs index afcd96986c..45292b23bb 100644 --- a/testing/simulator/src/no_eth1_sim.rs +++ b/testing/simulator/src/no_eth1_sim.rs @@ -52,11 +52,13 @@ pub fn run_no_eth1_sim(matches: &ArgMatches) -> Result<(), String> { let spec = &mut env.eth2_config.spec; + let total_validator_count = validators_per_node * node_count; + spec.milliseconds_per_slot /= speed_up_factor; spec.eth1_follow_distance = 16; spec.genesis_delay = eth1_block_time.as_secs() * spec.eth1_follow_distance * 2; spec.min_genesis_time = 0; - spec.min_genesis_active_validator_count = 64; + spec.min_genesis_active_validator_count = total_validator_count as u64; spec.seconds_per_eth1_block = 1; let genesis_delay = Duration::from_secs(5); @@ -67,7 +69,6 @@ pub fn run_no_eth1_sim(matches: &ArgMatches) -> Result<(), String> { let genesis_instant = Instant::now() + genesis_delay; let slot_duration = Duration::from_millis(spec.milliseconds_per_slot); - let total_validator_count = validators_per_node * node_count; let context = env.core_context();