From b885d79ac3b46b68766574e10beaf443bec494a3 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Mon, 20 Jul 2020 08:25:32 +0530 Subject: [PATCH] Fix attestation propagation (#1360) * Add `should_process` for conditional processing of Attestations * Remove ATTESTATIONS_IGNORED metric --- .../network/src/attestation_service/mod.rs | 2 +- beacon_node/network/src/metrics.rs | 4 --- beacon_node/network/src/router/mod.rs | 21 ++++++++++------ beacon_node/network/src/service.rs | 25 ++++++++----------- 4 files changed, 26 insertions(+), 26 deletions(-) diff --git a/beacon_node/network/src/attestation_service/mod.rs b/beacon_node/network/src/attestation_service/mod.rs index bfd63e962a..b96d472696 100644 --- a/beacon_node/network/src/attestation_service/mod.rs +++ b/beacon_node/network/src/attestation_service/mod.rs @@ -271,7 +271,7 @@ impl AttestationService { /// Checks if we have subscribed aggregate validators for the subnet. If not, checks the gossip /// verification, re-propagates and returns false. pub fn should_process_attestation( - &mut self, + &self, subnet: SubnetId, attestation: &Attestation, ) -> bool { diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index e7b4b53c09..b2f789e36a 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -12,10 +12,6 @@ lazy_static! { "network_gossip_unaggregated_attestations_rx_total", "Count of gossip unaggregated attestations received" ); - pub static ref GOSSIP_UNAGGREGATED_ATTESTATIONS_IGNORED: Result = try_create_int_counter( - "network_gossip_unaggregated_attestations_ignored_total", - "Count of gossip unaggregated attestations ignored by attestation service" - ); pub static ref GOSSIP_AGGREGATED_ATTESTATIONS_RX: Result = try_create_int_counter( "network_gossip_aggregated_attestations_rx_total", "Count of gossip aggregated attestations received" diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 067b258407..e06bd1a9d0 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -63,8 +63,9 @@ pub enum RouterMessage { error: RPCError, }, /// A gossip message has been received. The fields are: message id, the peer that sent us this - /// message and the message itself. - PubsubMessage(MessageId, PeerId, PubsubMessage), + /// message, the message itself and a bool which indicates if the message should be processed + /// by the beacon chain after successful verification. + PubsubMessage(MessageId, PeerId, PubsubMessage, bool), /// The peer manager has requested we re-status a peer. StatusPeer(PeerId), } @@ -152,8 +153,8 @@ impl Router { "client" => self.network_globals.client(&peer_id).to_string()); self.processor.on_rpc_error(peer_id, request_id); } - RouterMessage::PubsubMessage(id, peer_id, gossip) => { - self.handle_gossip(id, peer_id, gossip); + RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => { + self.handle_gossip(id, peer_id, gossip, should_process); } } } @@ -200,12 +201,16 @@ impl Router { } } - /// Handle RPC messages + /// Handle RPC messages. + /// Note: `should_process` is currently only useful for the `Attestation` variant. + /// if `should_process` is `false`, we only propagate the message on successful verification, + /// else, we propagate **and** import into the beacon chain. fn handle_gossip( &mut self, id: MessageId, peer_id: PeerId, gossip_message: PubsubMessage, + should_process: bool, ) { match gossip_message { // Attestations should never reach the router. @@ -228,8 +233,10 @@ impl Router { ) { self.propagate_message(id, peer_id.clone()); - self.processor - .import_unaggregated_attestation(peer_id, gossip_verified); + if should_process { + self.processor + .import_unaggregated_attestation(peer_id, gossip_verified); + } } } PubsubMessage::BeaconBlock(block) => { diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 0f1b1f77e6..fa9e165061 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -344,27 +344,24 @@ fn spawn_service( PubsubMessage::Attestation(ref subnet_and_attestation) => { let subnet = subnet_and_attestation.0; let attestation = &subnet_and_attestation.1; - // checks if we have an aggregator for the slot. If so, we process - // the attestation - if service.attestation_service.should_process_attestation( + // checks if we have an aggregator for the slot. If so, we should process + // the attestation, else we just just propagate the Attestation. + let should_process = service.attestation_service.should_process_attestation( subnet, attestation, - ) { - let _ = service - .router_send - .send(RouterMessage::PubsubMessage(id, source, message)) - .map_err(|_| { - debug!(service.log, "Failed to send pubsub message to router"); - }); - } else { - metrics::inc_counter(&metrics::GOSSIP_UNAGGREGATED_ATTESTATIONS_IGNORED) - } + ); + let _ = service + .router_send + .send(RouterMessage::PubsubMessage(id, source, message, should_process)) + .map_err(|_| { + debug!(service.log, "Failed to send pubsub message to router"); + }); } _ => { // all else is sent to the router let _ = service .router_send - .send(RouterMessage::PubsubMessage(id, source, message)) + .send(RouterMessage::PubsubMessage(id, source, message, true)) .map_err(|_| { debug!(service.log, "Failed to send pubsub message to router"); });