From dd51a72f1fe5169547e40e74e289bf89f4f7c1b8 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Mon, 18 May 2020 21:35:14 +1000 Subject: [PATCH] Client identification (#1158) * Add logs and client identification * Add client to RPC Error log * Remove attestation service tests --- .../eth2-libp2p/src/peer_manager/mod.rs | 8 +-- beacon_node/eth2-libp2p/src/service.rs | 4 +- beacon_node/eth2-libp2p/src/types/globals.rs | 10 +++ .../network/src/attestation_service/mod.rs | 3 +- beacon_node/network/src/router/mod.rs | 63 +++++++++---------- beacon_node/network/src/router/processor.rs | 4 +- 6 files changed, 48 insertions(+), 44 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs index d3ca8954a0..2f5d0e9d3f 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs @@ -258,13 +258,7 @@ impl PeerManager { } pub fn handle_rpc_error(&mut self, peer_id: &PeerId, protocol: Protocol, err: &RPCError) { - let client = self - .network_globals - .peers - .read() - .peer_info(peer_id) - .map(|info| info.client.clone()) - .unwrap_or_default(); + let client = self.network_globals.client(peer_id); debug!(self.log, "RPCError"; "protocol" => protocol.to_string(), "err" => err.to_string(), "client" => client.to_string()); // Map this error to a `PeerAction` (if any) diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index b3f0b64062..0d98d9df15 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -18,7 +18,7 @@ use libp2p::{ swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, PeerId, Swarm, Transport, }; -use slog::{crit, debug, error, info, o, trace, warn}; +use slog::{crit, debug, info, o, trace, warn}; use std::fs::File; use std::io::prelude::*; use std::io::{Error, ErrorKind}; @@ -220,7 +220,7 @@ impl Service { /// Adds a peer to be banned for a period of time, specified by a timeout. pub fn disconnect_and_ban_peer(&mut self, peer_id: PeerId, timeout: Duration) { - error!(self.log, "Disconnecting and banning peer"; "peer_id" => format!("{:?}", peer_id), "timeout" => format!("{:?}", timeout)); + warn!(self.log, "Disconnecting and banning peer"; "peer_id" => peer_id.to_string(), "timeout" => format!("{:?}", timeout)); self.peers_to_ban.insert( peer_id.clone(), Duration::from_millis(BAN_PEER_WAIT_TIMEOUT), diff --git a/beacon_node/eth2-libp2p/src/types/globals.rs b/beacon_node/eth2-libp2p/src/types/globals.rs index 60ae12a8c5..d765d4240f 100644 --- a/beacon_node/eth2-libp2p/src/types/globals.rs +++ b/beacon_node/eth2-libp2p/src/types/globals.rs @@ -2,6 +2,7 @@ use crate::peer_manager::PeerDB; use crate::rpc::methods::MetaData; use crate::types::SyncState; +use crate::Client; use crate::EnrExt; use crate::{discovery::enr::Eth2Enr, Enr, GossipTopic, Multiaddr, PeerId}; use parking_lot::RwLock; @@ -99,6 +100,15 @@ impl NetworkGlobals { self.sync_state.read().clone() } + /// Returns a `Client` type if one is known for the `PeerId`. + pub fn client(&self, peer_id: &PeerId) -> Client { + self.peers + .read() + .peer_info(peer_id) + .map(|info| info.client.clone()) + .unwrap_or_default() + } + /// Updates the syncing state of the node. /// /// If there is a new state, the old state and the new states are returned. diff --git a/beacon_node/network/src/attestation_service/mod.rs b/beacon_node/network/src/attestation_service/mod.rs index 90f67629f3..e8c80a5d7f 100644 --- a/beacon_node/network/src/attestation_service/mod.rs +++ b/beacon_node/network/src/attestation_service/mod.rs @@ -17,7 +17,8 @@ use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use types::{Attestation, EthSpec, Slot, SubnetId}; -mod tests; +//TODO: Removed attestation subnet tests until they become deterministic +//mod tests; /// The minimum number of slots ahead that we attempt to discover peers for a subscription. If the /// slot is less than this number, skip the peer discovery process. diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 900c1825bf..78e1b8b2a1 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -10,10 +10,7 @@ use crate::error; use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; use eth2_libp2p::{ - rpc::{ - RPCCodedResponse, RPCError, RPCRequest, RPCResponse, RPCResponseErrorCode, RequestId, - ResponseTermination, - }, + rpc::{RPCCodedResponse, RPCRequest, RPCResponse, RequestId, ResponseTermination}, MessageId, NetworkGlobals, PeerId, PubsubMessage, RPCEvent, }; use futures::prelude::*; @@ -30,6 +27,8 @@ use types::EthSpec; pub struct Router { /// A channel to the network service to allow for gossip propagation. network_send: mpsc::UnboundedSender>, + /// Access to the peer db for logging. + network_globals: Arc>, /// Processes validated and decoded messages from the network. Has direct access to the /// sync manager. processor: Processor, @@ -71,7 +70,7 @@ impl Router { let processor = Processor::new( runtime_handle, beacon_chain, - network_globals, + network_globals.clone(), network_send.clone(), &log, ); @@ -79,6 +78,7 @@ impl Router { // generate the Message handler let mut handler = Router { network_send, + network_globals, processor, log: message_handler_log, }; @@ -124,7 +124,11 @@ impl Router { match rpc_message { RPCEvent::Request(id, req) => self.handle_rpc_request(peer_id, id, req), RPCEvent::Response(id, resp) => self.handle_rpc_response(peer_id, id, resp), - RPCEvent::Error(id, _protocol, error) => self.handle_rpc_error(peer_id, id, error), + RPCEvent::Error(id, _protocol, error) => { + warn!(self.log, "RPC Error"; "peer_id" => peer_id.to_string(), "request_id" => id, "error" => error.to_string(), + "client" => self.network_globals.client(&peer_id).to_string()); + self.processor.on_rpc_error(peer_id, id); + } } } @@ -142,9 +146,10 @@ impl Router { } RPCRequest::Goodbye(goodbye_reason) => { debug!( - self.log, "PeerGoodbye"; - "peer" => format!("{:?}", peer_id), + self.log, "Peer sent Goodbye"; + "peer_id" => peer_id.to_string(), "reason" => format!("{:?}", goodbye_reason), + "client" => self.network_globals.client(&peer_id).to_string(), ); self.processor.on_disconnect(peer_id); } @@ -170,28 +175,28 @@ impl Router { // an error could have occurred. match error_response { RPCCodedResponse::InvalidRequest(error) => { - warn!(self.log, "RPC Invalid Request"; "peer_id" => peer_id.to_string(), "request_id" => request_id, "error" => error.to_string()); - self.handle_rpc_error( - peer_id, - request_id, - RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest), - ); + warn!(self.log, "RPC Invalid Request"; + "peer_id" => peer_id.to_string(), + "request_id" => request_id, + "error" => error.to_string(), + "client" => self.network_globals.client(&peer_id).to_string()); + self.processor.on_rpc_error(peer_id, request_id); } RPCCodedResponse::ServerError(error) => { - warn!(self.log, "RPC Server Error"; "peer_id" => peer_id.to_string(), "request_id" => request_id, "error" => error.to_string()); - self.handle_rpc_error( - peer_id, - request_id, - RPCError::ErrorResponse(RPCResponseErrorCode::ServerError), - ); + warn!(self.log, "RPC Server Error" ; + "peer_id" => peer_id.to_string(), + "request_id" => request_id, + "error" => error.to_string(), + "client" => self.network_globals.client(&peer_id).to_string()); + self.processor.on_rpc_error(peer_id, request_id); } RPCCodedResponse::Unknown(error) => { - warn!(self.log, "RPC Unknown Error"; "peer_id" => peer_id.to_string(), "request_id" => request_id, "error" => error.to_string()); - self.handle_rpc_error( - peer_id, - request_id, - RPCError::ErrorResponse(RPCResponseErrorCode::Unknown), - ); + warn!(self.log, "RPC Unknown Error"; + "peer_id" => peer_id.to_string(), + "request_id" => request_id, + "error" => error.to_string(), + "client" => self.network_globals.client(&peer_id).to_string()); + self.processor.on_rpc_error(peer_id, request_id); } RPCCodedResponse::Success(response) => match response { RPCResponse::Status(status_message) => { @@ -234,12 +239,6 @@ impl Router { } } - /// Handle various RPC errors - fn handle_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) { - warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "request_id" => format!("{}", request_id), "Error" => format!("{:?}", error)); - self.processor.on_rpc_error(peer_id, request_id); - } - /// Handle RPC messages fn handle_gossip( &mut self, diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index cf791e5579..76f0cdf449 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -151,7 +151,7 @@ impl Processor { debug!( self.log, "Received Status Response"; - "peer" => format!("{:?}", peer_id), + "peer_id" => peer_id.to_string(), "fork_digest" => format!("{:?}", status.fork_digest), "finalized_root" => format!("{:?}", status.finalized_root), "finalized_epoch" => format!("{:?}", status.finalized_epoch), @@ -185,7 +185,7 @@ impl Processor { // The node is on a different network/fork, disconnect them. debug!( self.log, "Handshake Failure"; - "peer" => format!("{:?}", peer_id), + "peer_id" => peer_id.to_string(), "reason" => "incompatible forks", "our_fork" => hex::encode(local.fork_digest), "their_fork" => hex::encode(remote.fork_digest)