diff --git a/beacon_node/lighthouse_network/src/behaviour/gossip_cache.rs b/beacon_node/lighthouse_network/src/behaviour/gossip_cache.rs new file mode 100644 index 0000000000..6aab56cd9e --- /dev/null +++ b/beacon_node/lighthouse_network/src/behaviour/gossip_cache.rs @@ -0,0 +1,247 @@ +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +use crate::types::GossipKind; +use crate::GossipTopic; + +use tokio_util::time::delay_queue::{DelayQueue, Key}; + +/// Store of gossip messages that we failed to publish and will try again later. By default, all +/// messages are ignored. This behaviour can be changed using `GossipCacheBuilder::default_timeout` +/// to apply the same delay to every kind. Individual timeouts for specific kinds can be set and +/// will overwrite the default_timeout if present. +pub struct GossipCache { + /// Expire timeouts for each topic-msg pair. + expirations: DelayQueue<(GossipTopic, Vec)>, + /// Messages cached for each topic. + topic_msgs: HashMap, Key>>, + /// Timeout for blocks. + beacon_block: Option, + /// Timeout for aggregate attestations. + aggregates: Option, + /// Timeout for attestations. + attestation: Option, + /// Timeout for voluntary exits. + voluntary_exit: Option, + /// Timeout for proposer slashings. + proposer_slashing: Option, + /// Timeout for attester slashings. + attester_slashing: Option, + /// Timeout for aggregated sync commitee signatures. + signed_contribution_and_proof: Option, + /// Timeout for sync commitee messages. + sync_committee_message: Option, +} + +#[derive(Default)] +pub struct GossipCacheBuilder { + default_timeout: Option, + /// Timeout for blocks. + beacon_block: Option, + /// Timeout for aggregate attestations. + aggregates: Option, + /// Timeout for attestations. + attestation: Option, + /// Timeout for voluntary exits. + voluntary_exit: Option, + /// Timeout for proposer slashings. + proposer_slashing: Option, + /// Timeout for attester slashings. + attester_slashing: Option, + /// Timeout for aggregated sync commitee signatures. + signed_contribution_and_proof: Option, + /// Timeout for sync commitee messages. + sync_committee_message: Option, +} + +#[allow(dead_code)] +impl GossipCacheBuilder { + /// By default, all timeouts all disabled. Setting a default timeout will enable all timeout + /// that are not already set. + pub fn default_timeout(mut self, timeout: Duration) -> Self { + self.default_timeout = Some(timeout); + self + } + /// Timeout for blocks. + pub fn beacon_block_timeout(mut self, timeout: Duration) -> Self { + self.beacon_block = Some(timeout); + self + } + + /// Timeout for aggregate attestations. + pub fn aggregates_timeout(mut self, timeout: Duration) -> Self { + self.aggregates = Some(timeout); + self + } + + /// Timeout for attestations. + pub fn attestation_timeout(mut self, timeout: Duration) -> Self { + self.attestation = Some(timeout); + self + } + + /// Timeout for voluntary exits. + pub fn voluntary_exit_timeout(mut self, timeout: Duration) -> Self { + self.voluntary_exit = Some(timeout); + self + } + + /// Timeout for proposer slashings. + pub fn proposer_slashing_timeout(mut self, timeout: Duration) -> Self { + self.proposer_slashing = Some(timeout); + self + } + + /// Timeout for attester slashings. + pub fn attester_slashing(mut self, timeout: Duration) -> Self { + self.attester_slashing = Some(timeout); + self + } + + /// Timeout for aggregated sync commitee signatures. + pub fn signed_contribution_and_proof(mut self, timeout: Duration) -> Self { + self.signed_contribution_and_proof = Some(timeout); + self + } + + /// Timeout for sync commitee messages. + pub fn sync_committee_message(mut self, timeout: Duration) -> Self { + self.sync_committee_message = Some(timeout); + self + } + + pub fn build(self) -> GossipCache { + let GossipCacheBuilder { + default_timeout, + beacon_block, + aggregates, + attestation, + voluntary_exit, + proposer_slashing, + attester_slashing, + signed_contribution_and_proof, + sync_committee_message, + } = self; + GossipCache { + expirations: DelayQueue::default(), + topic_msgs: HashMap::default(), + beacon_block: beacon_block.or(default_timeout), + aggregates: aggregates.or(default_timeout), + attestation: attestation.or(default_timeout), + voluntary_exit: voluntary_exit.or(default_timeout), + proposer_slashing: proposer_slashing.or(default_timeout), + attester_slashing: attester_slashing.or(default_timeout), + signed_contribution_and_proof: signed_contribution_and_proof.or(default_timeout), + sync_committee_message: sync_committee_message.or(default_timeout), + } + } +} + +impl GossipCache { + /// Get a builder of a `GossipCache`. Topic kinds for which no timeout is defined will be + /// ignored if added in `insert`. + pub fn builder() -> GossipCacheBuilder { + GossipCacheBuilder::default() + } + + // Insert a message to be sent later. + pub fn insert(&mut self, topic: GossipTopic, data: Vec) { + let expire_timeout = match topic.kind() { + GossipKind::BeaconBlock => self.beacon_block, + GossipKind::BeaconAggregateAndProof => self.aggregates, + GossipKind::Attestation(_) => self.attestation, + GossipKind::VoluntaryExit => self.voluntary_exit, + GossipKind::ProposerSlashing => self.proposer_slashing, + GossipKind::AttesterSlashing => self.attester_slashing, + GossipKind::SignedContributionAndProof => self.signed_contribution_and_proof, + GossipKind::SyncCommitteeMessage(_) => self.sync_committee_message, + }; + let expire_timeout = match expire_timeout { + Some(expire_timeout) => expire_timeout, + None => return, + }; + match self + .topic_msgs + .entry(topic.clone()) + .or_default() + .entry(data.clone()) + { + Entry::Occupied(key) => self.expirations.reset(key.get(), expire_timeout), + Entry::Vacant(entry) => { + let key = self.expirations.insert((topic, data), expire_timeout); + entry.insert(key); + } + } + } + + // Get the registered messages for this topic. + pub fn retrieve(&mut self, topic: &GossipTopic) -> Option> + '_> { + if let Some(msgs) = self.topic_msgs.remove(topic) { + for (_, key) in msgs.iter() { + self.expirations.remove(key); + } + Some(msgs.into_keys()) + } else { + None + } + } +} + +impl futures::stream::Stream for GossipCache { + type Item = Result; // We don't care to retrieve the expired data. + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.expirations.poll_expired(cx) { + Poll::Ready(Some(Ok(expired))) => { + let expected_key = expired.key(); + let (topic, data) = expired.into_inner(); + match self.topic_msgs.get_mut(&topic) { + Some(msgs) => { + let key = msgs.remove(&data); + debug_assert_eq!(key, Some(expected_key)); + if msgs.is_empty() { + // no more messages for this topic. + self.topic_msgs.remove(&topic); + } + } + None => { + #[cfg(debug_assertions)] + panic!("Topic for registered message is not present.") + } + } + Poll::Ready(Some(Ok(topic))) + } + Poll::Ready(Some(Err(x))) => Poll::Ready(Some(Err(x.to_string()))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +#[cfg(test)] +mod tests { + use crate::types::GossipKind; + + use super::*; + use futures::stream::StreamExt; + + #[tokio::test] + async fn test_stream() { + let mut cache = GossipCache::builder() + .default_timeout(Duration::from_millis(300)) + .build(); + let test_topic = GossipTopic::new( + GossipKind::Attestation(1u64.into()), + crate::types::GossipEncoding::SSZSnappy, + [0u8; 4], + ); + cache.insert(test_topic, vec![]); + tokio::time::sleep(Duration::from_millis(300)).await; + while cache.next().await.is_some() {} + assert!(cache.expirations.is_empty()); + assert!(cache.topic_msgs.is_empty()); + } +} diff --git a/beacon_node/lighthouse_network/src/behaviour/mod.rs b/beacon_node/lighthouse_network/src/behaviour/mod.rs index 2a79961094..4644b5b2af 100644 --- a/beacon_node/lighthouse_network/src/behaviour/mod.rs +++ b/beacon_node/lighthouse_network/src/behaviour/mod.rs @@ -15,6 +15,8 @@ use crate::types::{ }; use crate::Eth2Enr; use crate::{error, metrics, Enr, NetworkGlobals, PubsubMessage, TopicHash}; +use futures::stream::StreamExt; +use libp2p::gossipsub::error::PublishError; use libp2p::{ core::{ connection::ConnectionId, identity::Keypair, multiaddr::Protocol as MProtocol, Multiaddr, @@ -50,6 +52,9 @@ use types::{ SignedBeaconBlock, Slot, SubnetId, SyncSubnetId, }; +use self::gossip_cache::GossipCache; + +mod gossip_cache; pub mod gossipsub_scoring_parameters; /// The number of peers we target per subnet for discovery queries. @@ -177,6 +182,8 @@ pub struct Behaviour { /// The interval for updating gossipsub scores #[behaviour(ignore)] update_gossipsub_scores: tokio::time::Interval, + #[behaviour(ignore)] + gossip_cache: GossipCache, /// Logger for behaviour actions. #[behaviour(ignore)] log: slog::Logger, @@ -280,6 +287,16 @@ impl Behaviour { ..Default::default() }; + let slot_duration = std::time::Duration::from_secs(ctx.chain_spec.seconds_per_slot); + // Half an epoch + let gossip_max_retry_delay = std::time::Duration::from_secs( + ctx.chain_spec.seconds_per_slot * TSpec::slots_per_epoch() / 2, + ); + let gossip_cache = GossipCache::builder() + .default_timeout(gossip_max_retry_delay) + .beacon_block_timeout(slot_duration) + .build(); + Ok(Behaviour { // Sub-behaviours gossipsub, @@ -297,6 +314,7 @@ impl Behaviour { log: behaviour_log, score_settings, fork_context: ctx.fork_context, + gossip_cache, update_gossipsub_scores, }) } @@ -422,9 +440,11 @@ impl Behaviour { for message in messages { for topic in message.topics(GossipEncoding::default(), self.enr_fork_id.fork_digest) { let message_data = message.encode(GossipEncoding::default()); - if let Err(e) = self.gossipsub.publish(topic.clone().into(), message_data) { - slog::warn!(self.log, "Could not publish message"; - "error" => ?e); + if let Err(e) = self + .gossipsub + .publish(topic.clone().into(), message_data.clone()) + { + slog::warn!(self.log, "Could not publish message"; "error" => ?e); // add to metrics match topic.kind() { @@ -445,6 +465,10 @@ impl Behaviour { }; } } + + if let PublishError::InsufficientPeers = e { + self.gossip_cache.insert(topic, message_data); + } } } } @@ -868,11 +892,39 @@ impl NetworkBehaviourEventProcess for Behaviour< } } GossipsubEvent::Subscribed { peer_id, topic } => { - if let Some(subnet_id) = subnet_from_topic_hash(&topic) { - self.network_globals - .peers - .write() - .add_subscription(&peer_id, subnet_id); + if let Ok(topic) = GossipTopic::decode(topic.as_str()) { + if let Some(subnet_id) = topic.subnet_id() { + self.network_globals + .peers + .write() + .add_subscription(&peer_id, subnet_id); + } + // Try to send the cached messages for this topic + if let Some(msgs) = self.gossip_cache.retrieve(&topic) { + for data in msgs { + let topic_str: &str = topic.kind().as_ref(); + match self.gossipsub.publish(topic.clone().into(), data) { + Ok(_) => { + warn!(self.log, "Gossip message published on retry"; "topic" => topic_str); + if let Some(v) = metrics::get_int_counter( + &metrics::GOSSIP_LATE_PUBLISH_PER_TOPIC_KIND, + &[topic_str], + ) { + v.inc() + }; + } + Err(e) => { + warn!(self.log, "Gossip message publish failed on retry"; "topic" => topic_str, "error" => %e); + if let Some(v) = metrics::get_int_counter( + &metrics::GOSSIP_FAILED_LATE_PUBLISH_PER_TOPIC_KIND, + &[topic_str], + ) { + v.inc() + }; + } + } + } + } } } GossipsubEvent::Unsubscribed { peer_id, topic } => { @@ -1125,6 +1177,21 @@ impl Behaviour { self.peer_manager.update_gossipsub_scores(&self.gossipsub); } + // poll the gossipsub cache to clear expired messages + while let Poll::Ready(Some(result)) = self.gossip_cache.poll_next_unpin(cx) { + match result { + Err(e) => warn!(self.log, "Gossip cache error"; "error" => e), + Ok(expired_topic) => { + if let Some(v) = metrics::get_int_counter( + &metrics::GOSSIP_EXPIRED_LATE_PUBLISH_PER_TOPIC_KIND, + &[expired_topic.kind().as_ref()], + ) { + v.inc() + }; + } + } + } + Poll::Pending } } diff --git a/beacon_node/lighthouse_network/src/metrics.rs b/beacon_node/lighthouse_network/src/metrics.rs index 1dfe0448b7..66d7a1f74a 100644 --- a/beacon_node/lighthouse_network/src/metrics.rs +++ b/beacon_node/lighthouse_network/src/metrics.rs @@ -81,14 +81,30 @@ lazy_static! { "Gossipsub messages that we did not accept, per client", &["client", "validation_result"] ); - + pub static ref GOSSIP_LATE_PUBLISH_PER_TOPIC_KIND: Result = + try_create_int_counter_vec( + "gossipsub_late_publish_per_topic_kind", + "Messages published late to gossipsub per topic kind.", + &["topic_kind"] + ); + pub static ref GOSSIP_EXPIRED_LATE_PUBLISH_PER_TOPIC_KIND: Result = + try_create_int_counter_vec( + "gossipsub_expired_late_publish_per_topic_kind", + "Messages that expired waiting to be published on retry to gossipsub per topic kind.", + &["topic_kind"] + ); + pub static ref GOSSIP_FAILED_LATE_PUBLISH_PER_TOPIC_KIND: Result = + try_create_int_counter_vec( + "gossipsub_failed_late_publish_per_topic_kind", + "Messages that failed to be published on retry to gossipsub per topic kind.", + &["topic_kind"] + ); pub static ref PEER_SCORE_DISTRIBUTION: Result = try_create_int_gauge_vec( "peer_score_distribution", "The distribution of connected peer scores", &["position"] ); - pub static ref PEER_SCORE_PER_CLIENT: Result = try_create_float_gauge_vec( "peer_score_per_client", diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index f9860a003f..3dd7ad8470 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -159,6 +159,14 @@ impl GossipTopic { Err(format!("Unknown topic: {}", topic)) } + + pub fn subnet_id(&self) -> Option { + match self.kind() { + GossipKind::Attestation(subnet_id) => Some(Subnet::Attestation(*subnet_id)), + GossipKind::SyncCommitteeMessage(subnet_id) => Some(Subnet::SyncCommittee(*subnet_id)), + _ => None, + } + } } impl From for Topic { @@ -237,12 +245,7 @@ impl From for GossipKind { /// Get subnet id from an attestation subnet topic hash. pub fn subnet_from_topic_hash(topic_hash: &TopicHash) -> Option { - let gossip_topic = GossipTopic::decode(topic_hash.as_str()).ok()?; - match gossip_topic.kind() { - GossipKind::Attestation(subnet_id) => Some(Subnet::Attestation(*subnet_id)), - GossipKind::SyncCommitteeMessage(subnet_id) => Some(Subnet::SyncCommittee(*subnet_id)), - _ => None, - } + GossipTopic::decode(topic_hash.as_str()).ok()?.subnet_id() } // Determines if a string is an attestation or sync committee topic.