From eb561405828c3d15e23f8a6aad85a3f31f4618ce Mon Sep 17 00:00:00 2001 From: divma Date: Mon, 16 Nov 2020 04:06:14 +0000 Subject: [PATCH] Update logs + do not downscore peers if WE time out (#1901) ## Issue Addressed - RPC Errors were being logged twice: first in the peer manager and then again in the router, so leave just the peer manager's one - The "reduce peer count" warn message gets thrown to the user for every missed chunk, so instead print it when the request times out and also do not include there info that is not relevant to the user - The processor didn't have the service tag so add it - Impl `KV` for status message - Do not downscore peers if we are the ones that timed out Other small improvements --- beacon_node/eth2_libp2p/src/behaviour/mod.rs | 20 +++++++--- .../eth2_libp2p/src/peer_manager/mod.rs | 32 +++++++++++----- beacon_node/eth2_libp2p/src/rpc/handler.rs | 13 +++++-- beacon_node/eth2_libp2p/src/rpc/methods.rs | 16 ++++++++ .../src/beacon_processor/chain_segment.rs | 4 +- beacon_node/network/src/router/mod.rs | 16 ++------ beacon_node/network/src/router/processor.rs | 37 +++---------------- beacon_node/network/src/service.rs | 4 +- 8 files changed, 74 insertions(+), 68 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index bcb5914962..fde3955c1f 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -1,5 +1,5 @@ use crate::behaviour::gossipsub_scoring_parameters::PeerScoreSettings; -use crate::peer_manager::{score::PeerAction, PeerManager, PeerManagerEvent}; +use crate::peer_manager::{score::PeerAction, ConnectionDirection, PeerManager, PeerManagerEvent}; use crate::rpc::*; use crate::service::METADATA_FILENAME; use crate::types::{GossipEncoding, GossipKind, GossipTopic, MessageData, SubnetDiscovery}; @@ -70,8 +70,6 @@ pub enum BehaviourEvent { id: RequestId, /// The peer to which this request was sent. peer_id: PeerId, - /// The error that occurred. - error: RPCError, }, RequestReceived { /// The peer that sent the request. @@ -692,14 +690,24 @@ impl Behaviour { // Inform the peer manager of the error. // An inbound error here means we sent an error to the peer, or the stream // timed out. - self.peer_manager.handle_rpc_error(&peer_id, proto, &error); + self.peer_manager.handle_rpc_error( + &peer_id, + proto, + &error, + ConnectionDirection::Incoming, + ); } HandlerErr::Outbound { id, proto, error } => { // Inform the peer manager that a request we sent to the peer failed - self.peer_manager.handle_rpc_error(&peer_id, proto, &error); + self.peer_manager.handle_rpc_error( + &peer_id, + proto, + &error, + ConnectionDirection::Outgoing, + ); // inform failures of requests comming outside the behaviour if !matches!(id, RequestId::Behaviour) { - self.add_event(BehaviourEvent::RPCFailed { peer_id, id, error }); + self.add_event(BehaviourEvent::RPCFailed { peer_id, id }); } } } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 3377c3d7b4..11dc373155 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -354,10 +354,17 @@ impl PeerManager { /// An error has occured in the RPC. /// /// This adjusts a peer's score based on the error. - pub fn handle_rpc_error(&mut self, peer_id: &PeerId, protocol: Protocol, err: &RPCError) { + pub fn handle_rpc_error( + &mut self, + peer_id: &PeerId, + protocol: Protocol, + err: &RPCError, + direction: ConnectionDirection, + ) { let client = self.network_globals.client(peer_id); let score = self.network_globals.peers.read().score(peer_id); - debug!(self.log, "RPC Error"; "protocol" => protocol.to_string(), "err" => err.to_string(), "client" => client.to_string(), "peer_id" => peer_id.to_string(), "score" => score.to_string()); + debug!(self.log, "RPC Error"; "protocol" => %protocol, "err" => %err, "client" => %client, + "peer_id" => %peer_id, "score" => %score, "direction" => ?direction); // Map this error to a `PeerAction` (if any) let peer_action = match err { @@ -398,13 +405,20 @@ impl PeerManager { Protocol::Status => PeerAction::LowToleranceError, } } - RPCError::StreamTimeout => match protocol { - Protocol::Ping => PeerAction::LowToleranceError, - Protocol::BlocksByRange => PeerAction::MidToleranceError, - Protocol::BlocksByRoot => PeerAction::MidToleranceError, - Protocol::Goodbye => return, - Protocol::MetaData => return, - Protocol::Status => return, + RPCError::StreamTimeout => match direction { + ConnectionDirection::Incoming => { + // we timed out + warn!(self.log, "Timed out to a peer's request. Likely too many resources, reduce peer count"); + return; + } + ConnectionDirection::Outgoing => match protocol { + Protocol::Ping => PeerAction::LowToleranceError, + Protocol::BlocksByRange => PeerAction::MidToleranceError, + Protocol::BlocksByRoot => PeerAction::MidToleranceError, + Protocol::Goodbye => return, + Protocol::MetaData => return, + Protocol::Status => return, + }, }, RPCError::NegotiationTimeout => PeerAction::HighToleranceError, RPCError::RateLimited => match protocol { diff --git a/beacon_node/eth2_libp2p/src/rpc/handler.rs b/beacon_node/eth2_libp2p/src/rpc/handler.rs index b41426d41f..81641ff72f 100644 --- a/beacon_node/eth2_libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2_libp2p/src/rpc/handler.rs @@ -14,7 +14,7 @@ use libp2p::swarm::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, }; use libp2p::swarm::NegotiatedSubstream; -use slog::{crit, debug, warn}; +use slog::{crit, debug, trace, warn}; use smallvec::SmallVec; use std::{ collections::hash_map::Entry, @@ -238,7 +238,9 @@ where /// Initiates the handler's shutdown process, sending an optional last message to the peer. pub fn shutdown(&mut self, final_msg: Option<(RequestId, RPCRequest)>) { if matches!(self.state, HandlerState::Active) { - debug!(self.log, "Starting handler shutdown"; "unsent_queued_requests" => self.dial_queue.len()); + if !self.dial_queue.is_empty() { + debug!(self.log, "Starting handler shutdown"; "unsent_queued_requests" => self.dial_queue.len()); + } // we now drive to completion communications already dialed/established while let Some((id, req)) = self.dial_queue.pop() { self.pending_errors.push(HandlerErr::Outbound { @@ -283,8 +285,11 @@ where let inbound_info = if let Some(info) = self.inbound_substreams.get_mut(&inbound_id) { info } else { - warn!(self.log, "Inbound stream has expired, response not sent"; - "response" => response.to_string(), "id" => inbound_id, "msg" => "Likely too many resources, reduce peer count"); + if !matches!(response, RPCCodedResponse::StreamTermination(..)) { + // the stream is closed after sending the expected number of responses + trace!(self.log, "Inbound stream has expired, response not sent"; + "response" => %response, "id" => inbound_id); + } return; }; diff --git a/beacon_node/eth2_libp2p/src/rpc/methods.rs b/beacon_node/eth2_libp2p/src/rpc/methods.rs index 0fe4d13479..b2ce0cb8c1 100644 --- a/beacon_node/eth2_libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2_libp2p/src/rpc/methods.rs @@ -392,6 +392,22 @@ impl std::fmt::Display for BlocksByRangeRequest { } } +impl slog::KV for StatusMessage { + fn serialize( + &self, + record: &slog::Record, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + use slog::Value; + serializer.emit_str("fork_digest", &format!("{:?}", self.fork_digest))?; + Value::serialize(&self.finalized_epoch, record, "finalized_epoch", serializer)?; + serializer.emit_str("finalized_root", &self.finalized_root.to_string())?; + Value::serialize(&self.head_slot, record, "head_slot", serializer)?; + serializer.emit_str("head_root", &self.head_root.to_string())?; + slog::Result::Ok(()) + } +} + impl slog::Value for RequestId { fn serialize( &self, diff --git a/beacon_node/network/src/beacon_processor/chain_segment.rs b/beacon_node/network/src/beacon_processor/chain_segment.rs index e659a84b8d..47e14f5e2a 100644 --- a/beacon_node/network/src/beacon_processor/chain_segment.rs +++ b/beacon_node/network/src/beacon_processor/chain_segment.rs @@ -34,12 +34,12 @@ pub fn handle_chain_segment( let result = match process_blocks(chain, downloaded_blocks.iter(), &log) { (_, Ok(_)) => { - debug!(log, "Batch processed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, + debug!(log, "Batch processed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, "chain" => chain_id, "last_block_slot" => end_slot, "processed_blocks" => sent_blocks, "service"=> "sync"); BatchProcessResult::Success(sent_blocks > 0) } (imported_blocks, Err(e)) => { - debug!(log, "Batch processing failed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, + debug!(log, "Batch processing failed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, "chain" => chain_id, "last_block_slot" => end_slot, "error" => e, "imported_blocks" => imported_blocks, "service" => "sync"); BatchProcessResult::Failed(imported_blocks > 0) } diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 4701bdb735..0e98f04dd1 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -11,8 +11,8 @@ use crate::error; use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::{ - rpc::{RPCError, RequestId}, - MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response, + rpc::RequestId, MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, + Response, }; use futures::prelude::*; use processor::Processor; @@ -26,8 +26,6 @@ use types::EthSpec; /// passing them to the internal message processor. The message processor spawns a syncing thread /// which manages which blocks need to be requested and processed. pub struct Router { - /// Access to the peer db for logging. - network_globals: Arc>, /// Processes validated and decoded messages from the network. Has direct access to the /// sync manager. processor: Processor, @@ -58,7 +56,6 @@ pub enum RouterMessage { RPCFailed { peer_id: PeerId, request_id: RequestId, - error: RPCError, }, /// A gossip message has been received. The fields are: message id, the peer that sent us this /// message, the message itself and a bool which indicates if the message should be processed @@ -86,14 +83,13 @@ impl Router { let processor = Processor::new( executor.clone(), beacon_chain, - network_globals.clone(), + network_globals, network_send, &log, ); // generate the Message handler let mut handler = Router { - network_globals, processor, log: message_handler_log, }; @@ -141,13 +137,7 @@ impl Router { RouterMessage::RPCFailed { peer_id, request_id, - error, } => { - debug!(self.log, "RPC Error"; - "peer_id" => peer_id.to_string(), - "request_id" => request_id, - "error" => error.to_string(), - "client" => self.network_globals.client(&peer_id).to_string()); self.processor.on_rpc_error(peer_id, request_id); } RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => { diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 109edba190..a0578a4b3b 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -78,7 +78,7 @@ impl Processor { sync_send, network: HandlerNetworkContext::new(network_send, log.clone()), beacon_processor_send, - log: log.clone(), + log: log.new(o!("service" => "router")), } } @@ -114,16 +114,7 @@ impl Processor { /// re-status. pub fn send_status(&mut self, peer_id: PeerId) { if let Ok(status_message) = status_message(&self.chain) { - debug!( - self.log, - "Sending Status Request"; - "peer" => peer_id.to_string(), - "fork_digest" => format!("{:?}", status_message.fork_digest), - "finalized_root" => format!("{:?}", status_message.finalized_root), - "finalized_epoch" => format!("{:?}", status_message.finalized_epoch), - "head_root" => format!("{}", status_message.head_root), - "head_slot" => format!("{}", status_message.head_slot), - ); + debug!(self.log, "Sending Status Request"; "peer" => %peer_id, &status_message); self.network .send_processor_request(peer_id, Request::Status(status_message)); } @@ -138,16 +129,7 @@ impl Processor { request_id: PeerRequestId, status: StatusMessage, ) { - debug!( - self.log, - "Received Status Request"; - "peer" => peer_id.to_string(), - "fork_digest" => format!("{:?}", status.fork_digest), - "finalized_root" => format!("{:?}", status.finalized_root), - "finalized_epoch" => format!("{:?}", status.finalized_epoch), - "head_root" => format!("{}", status.head_root), - "head_slot" => format!("{}", status.head_slot), - ); + debug!(self.log, "Received Status Request"; "peer_id" => %peer_id, &status); // ignore status responses if we are shutting down if let Ok(status_message) = status_message(&self.chain) { @@ -166,16 +148,7 @@ impl Processor { /// Process a `Status` response from a peer. pub fn on_status_response(&mut self, peer_id: PeerId, status: StatusMessage) { - debug!( - self.log, - "Received Status Response"; - "peer_id" => peer_id.to_string(), - "fork_digest" => format!("{:?}", status.fork_digest), - "finalized_root" => format!("{:?}", status.finalized_root), - "finalized_epoch" => format!("{:?}", status.finalized_epoch), - "head_root" => format!("{}", status.head_root), - "head_slot" => format!("{}", status.head_slot), - ); + debug!(self.log, "Received Status Response"; "peer_id" => %peer_id, &status); // Process the status message, without sending back another status. if let Err(e) = self.process_status(peer_id, status) { @@ -292,7 +265,7 @@ impl Processor { debug!( self.log, "Received BlocksByRange Request"; - "peer" => format!("{:?}", peer_id), + "peer_id" => %peer_id, "count" => req.count, "start_slot" => req.start_slot, "step" => req.step, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 4973a1a7b7..d0c9e4159c 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -484,10 +484,10 @@ fn spawn_service( }); } - BehaviourEvent::RPCFailed{id, peer_id, error} => { + BehaviourEvent::RPCFailed{id, peer_id} => { let _ = service .router_send - .send(RouterMessage::RPCFailed{ peer_id, request_id: id, error }) + .send(RouterMessage::RPCFailed{ peer_id, request_id: id}) .map_err(|_| { debug!(service.log, "Failed to send RPC to router"); });