From cbe8dd96b21f275c802fcd9e25e25c704c84a421 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Fri, 29 Nov 2019 22:25:36 +1100 Subject: [PATCH] Clean up network logging and code (#643) * Apply clippy lints to beacon node * Remove unnecessary logging and correct formatting * Apply reviewers suggestions --- beacon_node/beacon_chain/src/beacon_chain.rs | 4 +- beacon_node/beacon_chain/src/eth1_chain.rs | 4 +- beacon_node/beacon_chain/src/fork_choice.rs | 2 +- beacon_node/eth2-libp2p/src/behaviour.rs | 16 ++++---- beacon_node/eth2-libp2p/src/rpc/handler.rs | 24 +++-------- beacon_node/eth2-libp2p/src/rpc/mod.rs | 2 +- beacon_node/eth2-libp2p/src/rpc/protocol.rs | 2 + beacon_node/eth2-libp2p/src/service.rs | 9 +---- beacon_node/network/src/message_handler.rs | 1 + beacon_node/network/src/service.rs | 6 +-- beacon_node/network/src/sync/manager.rs | 40 +++++++++---------- .../network/src/sync/message_processor.rs | 21 +++++----- beacon_node/src/config.rs | 8 ++-- 13 files changed, 58 insertions(+), 81 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 09aa28e86b..7feb8f2f96 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -664,7 +664,7 @@ impl BeaconChain { }); } other => { - warn!( + trace!( self.log, "Beacon attestation rejected"; "reason" => format!("{:?}", other), @@ -1019,7 +1019,7 @@ impl BeaconChain { }); } other => { - warn!( + trace!( self.log, "Beacon block rejected"; "reason" => format!("{:?}", other), diff --git a/beacon_node/beacon_chain/src/eth1_chain.rs b/beacon_node/beacon_chain/src/eth1_chain.rs index 9df12f6f08..5082ddefaf 100644 --- a/beacon_node/beacon_chain/src/eth1_chain.rs +++ b/beacon_node/beacon_chain/src/eth1_chain.rs @@ -290,11 +290,11 @@ fn eth1_block_hash_at_start_of_voting_period( let slot = (state.slot / period) * period; let prev_state_root = state .get_state_root(slot) - .map_err(|e| Error::UnableToGetPreviousStateRoot(e))?; + .map_err(Error::UnableToGetPreviousStateRoot)?; store .get_state::(&prev_state_root, Some(slot)) - .map_err(|e| Error::StoreError(e))? + .map_err(Error::StoreError)? .map(|state| state.eth1_data.block_hash) .ok_or_else(|| Error::PreviousStateNotInDB) } diff --git a/beacon_node/beacon_chain/src/fork_choice.rs b/beacon_node/beacon_chain/src/fork_choice.rs index 9f9277693f..0b20fdb8d6 100644 --- a/beacon_node/beacon_chain/src/fork_choice.rs +++ b/beacon_node/beacon_chain/src/fork_choice.rs @@ -145,7 +145,7 @@ impl ForkChoice { // Fast-forward the state to the start slot of the epoch where it was justified. for _ in block.slot.as_u64()..block_justified_slot.as_u64() { per_slot_processing(&mut state, &chain.spec) - .map_err(|e| BeaconChainError::SlotProcessingError(e))? + .map_err(BeaconChainError::SlotProcessingError)? } (state, block_root, block_justified_slot) diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 0b5118199b..63464bc729 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -173,11 +173,11 @@ impl NetworkBehaviourEventProcess format!("{}", peer_id), - "Protocol Version" => info.protocol_version, - "Agent Version" => info.agent_version, - "Listening Addresses" => format!("{:?}", info.listen_addrs), - "Observed Address" => format!("{:?}", observed_addr), - "Protocols" => format!("{:?}", info.protocols) + "protocol_version" => info.protocol_version, + "agent_version" => info.agent_version, + "listening_ addresses" => format!("{:?}", info.listen_addrs), + "observed_address" => format!("{:?}", observed_addr), + "protocols" => format!("{:?}", info.protocols) ); } IdentifyEvent::Sent { .. } => {} @@ -210,7 +210,7 @@ impl Behaviour { /// Publishes a message on the pubsub (gossipsub) behaviour. pub fn publish(&mut self, topics: &[Topic], message: PubsubMessage) { - let message_data = message.to_data(); + let message_data = message.into_data(); for topic in topics { self.gossipsub.publish(topic, message_data.clone()); } @@ -295,7 +295,7 @@ impl PubsubMessage { * Also note that a message can be associated with many topics. As soon as one of the topics is * known we match. If none of the topics are known we return an unknown state. */ - fn from_topics(topics: &Vec, data: Vec) -> Self { + fn from_topics(topics: &[TopicHash], data: Vec) -> Self { for topic in topics { // compare the prefix and postfix, then match on the topic let topic_parts: Vec<&str> = topic.as_str().split('/').collect(); @@ -316,7 +316,7 @@ impl PubsubMessage { PubsubMessage::Unknown(data) } - fn to_data(self) -> Vec { + fn into_data(self) -> Vec { match self { PubsubMessage::Block(data) | PubsubMessage::Attestation(data) diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index eef45cb26a..ce728dc995 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -1,3 +1,6 @@ +#![allow(clippy::type_complexity)] +#![allow(clippy::cognitive_complexity)] + use super::methods::{RPCErrorResponse, RequestId}; use super::protocol::{RPCError, RPCProtocol, RPCRequest}; use super::RPCEvent; @@ -174,7 +177,6 @@ where } /// Opens an outbound substream with a request. - #[inline] pub fn send_request(&mut self, rpc_event: RPCEvent) { self.keep_alive = KeepAlive::Yes; @@ -194,12 +196,10 @@ where type OutboundProtocol = RPCRequest; type OutboundOpenInfo = RPCEvent; // Keep track of the id and the request - #[inline] fn listen_protocol(&self) -> SubstreamProtocol { self.listen_protocol.clone() } - #[inline] fn inject_fully_negotiated_inbound( &mut self, out: >::Output, @@ -225,7 +225,6 @@ where self.current_substream_id += 1; } - #[inline] fn inject_fully_negotiated_outbound( &mut self, out: >::Output, @@ -263,14 +262,11 @@ where // Note: If the substream has closed due to inactivity, or the substream is in the // wrong state a response will fail silently. - #[inline] fn inject_event(&mut self, rpc_event: Self::InEvent) { match rpc_event { RPCEvent::Request(_, _) => self.send_request(rpc_event), RPCEvent::Response(rpc_id, response) => { // check if the stream matching the response still exists - trace!(self.log, "Checking for outbound stream"); - // variables indicating if the response is an error response or a multi-part // response let res_is_error = response.is_error(); @@ -280,7 +276,6 @@ where Some((substream_state, _)) => { match std::mem::replace(substream_state, InboundSubstreamState::Poisoned) { InboundSubstreamState::ResponseIdle(substream) => { - trace!(self.log, "Stream is idle, sending message"; "message" => format!("{}", response)); // close the stream if there is no response if let RPCErrorResponse::StreamTermination(_) = response { trace!(self.log, "Stream termination sent. Ending the stream"); @@ -298,7 +293,6 @@ where if res_is_multiple => { // the stream is in use, add the request to a pending queue - trace!(self.log, "Adding message to queue"; "message" => format!("{}", response)); (*self .queued_outbound_items .entry(rpc_id) @@ -338,7 +332,6 @@ where } } - #[inline] fn inject_dial_upgrade_error( &mut self, _: Self::OutboundOpenInfo, @@ -351,7 +344,6 @@ where } } - #[inline] fn connection_keep_alive(&self) -> KeepAlive { self.keep_alive } @@ -384,7 +376,6 @@ where .poll() .map_err(|_| ProtocolsHandlerUpgrErr::Timer)? { - trace!(self.log, "Closing expired inbound stream"); self.inbound_substreams.remove(stream_id.get_ref()); } @@ -394,7 +385,6 @@ where .poll() .map_err(|_| ProtocolsHandlerUpgrErr::Timer)? { - trace!(self.log, "Closing expired outbound stream"); self.outbound_substreams.remove(stream_id.get_ref()); } @@ -403,7 +393,7 @@ where // Drain all queued items until all messages have been processed for this stream // TODO Improve this code logic let mut new_items_to_send = true; - while new_items_to_send == true { + while new_items_to_send { new_items_to_send = false; match self.inbound_substreams.entry(request_id) { Entry::Occupied(mut entry) => { @@ -418,7 +408,6 @@ where match substream.poll() { Ok(Async::Ready(raw_substream)) => { // completed the send - trace!(self.log, "RPC message sent"); // close the stream if required if closing { @@ -426,7 +415,6 @@ where InboundSubstreamState::Closing(raw_substream) } else { // check for queued chunks and update the stream - trace!(self.log, "Checking for queued items"); entry.get_mut().0 = apply_queued_responses( raw_substream, &mut self @@ -454,7 +442,6 @@ where }; } InboundSubstreamState::ResponseIdle(substream) => { - trace!(self.log, "Idle stream searching queue"); entry.get_mut().0 = apply_queued_responses( substream, &mut self.queued_outbound_items.get_mut(&request_id), @@ -500,12 +487,11 @@ where request, } => match substream.poll() { Ok(Async::Ready(Some(response))) => { - trace!(self.log, "Message received"; "message" => format!("{}", response)); if request.multiple_responses() { entry.get_mut().0 = OutboundSubstreamState::RequestPendingResponse { substream, - request: request, + request, }; let delay_key = &entry.get().1; self.outbound_substreams_delay diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index f467bc7ab4..47c19f3a2a 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -77,7 +77,7 @@ impl RPC { RPC { events: Vec::new(), marker: PhantomData, - log: log, + log, } } diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 5c1ba63b69..003770af92 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -1,3 +1,5 @@ +#![allow(clippy::type_complexity)] + use super::methods::*; use crate::rpc::{ codec::{ diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 1600a014a1..38d4f2aba5 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -141,14 +141,7 @@ impl Service { topics.push(topic_builder(ATTESTER_SLASHING_TOPIC)); // Add any topics specified by the user - topics.append( - &mut config - .topics - .iter() - .cloned() - .map(|s| Topic::new(s)) - .collect(), - ); + topics.append(&mut config.topics.iter().cloned().map(Topic::new).collect()); let mut subscribed_topics = vec![]; for topic in topics { diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index b21d6d1aa7..206d3c01e1 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -1,3 +1,4 @@ +#![allow(clippy::unit_arg)] use crate::error; use crate::service::NetworkMessage; use crate::sync::MessageProcessor; diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index ee3702826c..94b00bb7fe 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -161,7 +161,7 @@ fn network_service( match network_recv.poll() { Ok(Async::Ready(Some(message))) => match message { NetworkMessage::RPC(peer_id, rpc_event) => { - trace!(log, "Sending RPC"; "RPC" => format!("{}", rpc_event)); + trace!(log, "Sending RPC"; "rpc" => format!("{}", rpc_event)); libp2p_service.lock().swarm.send_rpc(peer_id, rpc_event); } NetworkMessage::Propagate { @@ -184,7 +184,7 @@ fn network_service( } else { trace!(log, "Propagating gossipsub message"; "propagation_peer" => format!("{:?}", propagation_source), - "message_id" => format!("{}", message_id), + "message_id" => message_id.to_string(), ); libp2p_service .lock() @@ -231,7 +231,7 @@ fn network_service( match locked_service.poll() { Ok(Async::Ready(Some(event))) => match event { Libp2pEvent::RPC(peer_id, rpc_event) => { - trace!(log, "Received RPC"; "RPC" => format!("{}", rpc_event)); + trace!(log, "Received RPC"; "rpc" => format!("{}", rpc_event)); // if we received a Goodbye message, drop and ban the peer if let RPCEvent::Request(_, RPCRequest::Goodbye(_)) = rpc_event { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 1ac53e534f..a464532965 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -107,18 +107,18 @@ pub enum SyncMessage { BlocksByRangeResponse { peer_id: PeerId, request_id: RequestId, - beacon_block: Option>, + beacon_block: Option>>, }, /// A `BlocksByRoot` response has been received. BlocksByRootResponse { peer_id: PeerId, request_id: RequestId, - beacon_block: Option>, + beacon_block: Option>>, }, /// A block with an unknown parent has been received. - UnknownBlock(PeerId, BeaconBlock), + UnknownBlock(PeerId, Box>), /// A peer has sent an object that references a block that is unknown. This triggers the /// manager to attempt to find the block matching the unknown hash. @@ -520,7 +520,6 @@ impl SyncManager { parent_request.failed_attempts += 1; parent_request.state = BlockRequestsState::Queued; parent_request.last_submitted_peer = peer_id; - return; } } } @@ -549,7 +548,7 @@ impl SyncManager { BlockProcessingOutcome::Processed { block_root } => { info!(self.log, "Processed block"; "block" => format!("{}", block_root)); } - BlockProcessingOutcome::ParentUnknown { parent: _ } => { + BlockProcessingOutcome::ParentUnknown { .. } => { // We don't know of the blocks parent, begin a parent lookup search self.add_unknown_block(peer_id, block); } @@ -580,10 +579,10 @@ impl SyncManager { // make sure this block is not already being searched for // TODO: Potentially store a hashset of blocks for O(1) lookups for parent_req in self.parent_queue.iter() { - if let Some(_) = parent_req + if parent_req .downloaded_blocks .iter() - .find(|d_block| d_block == &&block) + .any(|d_block| d_block == &block) { // we are already searching for this block, ignore it return; @@ -915,14 +914,14 @@ impl SyncManager { // check if the chain exists if let Some(chain) = self.chain.upgrade() { match chain.process_block(block.clone()) { - Ok(BlockProcessingOutcome::ParentUnknown { parent: _ }) => { + Ok(BlockProcessingOutcome::ParentUnknown { .. }) => { // need to keep looking for parents completed_request.downloaded_blocks.push(block); completed_request.state = BlockRequestsState::Queued; re_run_poll = true; break; } - Ok(BlockProcessingOutcome::Processed { block_root: _ }) + Ok(BlockProcessingOutcome::Processed { .. }) | Ok(BlockProcessingOutcome::BlockIsAlreadyKnown { .. }) => {} Ok(outcome) => { // it's a future slot or an invalid block, remove it and try again @@ -965,13 +964,8 @@ impl SyncManager { } // remove any fully processed parent chains - self.parent_queue.retain(|req| { - if req.state == BlockRequestsState::ReadyToProcess { - false - } else { - true - } - }); + self.parent_queue + .retain(|req| req.state != BlockRequestsState::ReadyToProcess); re_run_poll } } @@ -1172,17 +1166,21 @@ impl Future for SyncManager { request_id, beacon_block, } => { - self.blocks_by_range_response(peer_id, request_id, beacon_block); + self.blocks_by_range_response( + peer_id, + request_id, + beacon_block.map(|b| *b), + ); } SyncMessage::BlocksByRootResponse { peer_id, request_id, beacon_block, } => { - self.blocks_by_root_response(peer_id, request_id, beacon_block); + self.blocks_by_root_response(peer_id, request_id, beacon_block.map(|b| *b)); } SyncMessage::UnknownBlock(peer_id, block) => { - self.add_unknown_block(peer_id, block); + self.add_unknown_block(peer_id, *block); } SyncMessage::UnknownBlockHash(peer_id, block_hash) => { self.search_for_block(peer_id, block_hash); @@ -1228,7 +1226,7 @@ impl Future for SyncManager { } // Shutdown the thread if the chain has termined - if let None = self.chain.upgrade() { + if self.chain.upgrade().is_none() { return Ok(Async::Ready(())); } @@ -1240,6 +1238,6 @@ impl Future for SyncManager { // update the state of the manager self.update_state(); - return Ok(Async::NotReady); + Ok(Async::NotReady) } } diff --git a/beacon_node/network/src/sync/message_processor.rs b/beacon_node/network/src/sync/message_processor.rs index dd9be8990c..c683beff37 100644 --- a/beacon_node/network/src/sync/message_processor.rs +++ b/beacon_node/network/src/sync/message_processor.rs @@ -6,7 +6,7 @@ use beacon_chain::{ use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; -use slog::{debug, error, info, o, trace, warn}; +use slog::{debug, info, o, trace, warn}; use ssz::Encode; use std::sync::Arc; use store::Store; @@ -396,6 +396,7 @@ impl MessageProcessor { request_id: RequestId, beacon_block: Option>, ) { + let beacon_block = beacon_block.map(Box::new); trace!( self.log, "Received BlocksByRange Response"; @@ -416,6 +417,7 @@ impl MessageProcessor { request_id: RequestId, beacon_block: Option>, ) { + let beacon_block = beacon_block.map(Box::new); trace!( self.log, "Received BlocksByRoot Response"; @@ -442,11 +444,11 @@ impl MessageProcessor { "peer_id" => format!("{:?}",peer_id)); SHOULD_FORWARD_GOSSIP_BLOCK } - BlockProcessingOutcome::ParentUnknown { parent: _ } => { + BlockProcessingOutcome::ParentUnknown { .. } => { // Inform the sync manager to find parents for this block trace!(self.log, "Block with unknown parent received"; "peer_id" => format!("{:?}",peer_id)); - self.send_to_sync(SyncMessage::UnknownBlock(peer_id, block.clone())); + self.send_to_sync(SyncMessage::UnknownBlock(peer_id, Box::new(block.clone()))); SHOULD_FORWARD_GOSSIP_BLOCK } BlockProcessingOutcome::FutureSlot { @@ -473,13 +475,8 @@ impl MessageProcessor { SHOULD_NOT_FORWARD_GOSSIP_BLOCK //TODO: Decide if we want to forward these } }, - Err(e) => { - error!( - self.log, - "Error processing gossip beacon block"; - "error" => format!("{:?}", e), - "block slot" => block.slot - ); + Err(_) => { + // error is logged during the processing therefore no error is logged here trace!( self.log, "Erroneous gossip beacon block ssz"; @@ -523,13 +520,13 @@ impl MessageProcessor { self.network.disconnect(peer_id, GoodbyeReason::Fault); } }, - Err(e) => { + Err(_) => { + // error is logged during the processing therefore no error is logged here trace!( self.log, "Erroneous gossip attestation ssz"; "ssz" => format!("0x{}", hex::encode(msg.as_ssz_bytes())), ); - error!(self.log, "Invalid gossip attestation"; "error" => format!("{:?}", e)); } } } diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 20668fb9c6..7ddd9cd26a 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -110,9 +110,9 @@ fn process_testnet_subcommand( if let Some(propagation_percentage_string) = cli_args.value_of("random-propagation") { let percentage = propagation_percentage_string .parse::() - .map_err(|_| format!("Unable to parse the propagation percentage"))?; + .map_err(|_| "Unable to parse the propagation percentage".to_string())?; if percentage > 100 { - return Err(format!("Propagation percentage greater than 100")); + return Err("Propagation percentage greater than 100".to_string()); } builder.client_config.network.propagation_percentage = Some(percentage); } @@ -255,7 +255,7 @@ fn process_testnet_subcommand( client_config.eth1.deposit_contract_address = "0x802dF6aAaCe28B2EEb1656bb18dF430dDC42cc2e".to_string(); - client_config.eth1.deposit_contract_deploy_block = 1487270; + client_config.eth1.deposit_contract_deploy_block = 1_487_270; client_config.eth1.follow_distance = 16; client_config.dummy_eth1_backend = false; @@ -608,7 +608,7 @@ impl ConfigBuilder { /// The supplied `cli_args` should be the base-level `clap` cli_args (i.e., not a subcommand /// cli_args). pub fn build(mut self, cli_args: &ArgMatches) -> Result { - self.client_config.apply_cli_args(cli_args, &mut self.log)?; + self.client_config.apply_cli_args(cli_args, &self.log)?; if let Some(bump) = cli_args.value_of("port-bump") { let bump = bump