From e0723dfc3b814c31646cedfa6ee8c581ecac0771 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 30 Apr 2020 17:12:26 +1000 Subject: [PATCH 1/8] Correctly notify delay queues (#1087) --- .../eth2-libp2p/src/peer_manager/mod.rs | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs index 7060e82243..e09fbbb076 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs @@ -318,24 +318,41 @@ impl Stream for PeerManager { fn poll(&mut self) -> Poll, Self::Error> { // poll the timeouts for pings and status' + // TODO: Remove task notifies and temporary vecs for stable futures + // These exist to handle a bug in delayqueue + let mut peers_to_add = Vec::new(); while let Async::Ready(Some(peer_id)) = self.ping_peers.poll().map_err(|e| { error!(self.log, "Failed to check for peers to ping"; "error" => format!("{}",e)); })? { debug!(self.log, "Pinging peer"; "peer_id" => format!("{}", peer_id)); // add the ping timer back - self.ping_peers.insert(peer_id.clone()); + peers_to_add.push(peer_id.clone()); self.events.push(PeerManagerEvent::Ping(peer_id)); } + if !peers_to_add.is_empty() { + futures::task::current().notify(); + } + while let Some(peer) = peers_to_add.pop() { + self.ping_peers.insert(peer); + } + while let Async::Ready(Some(peer_id)) = self.status_peers.poll().map_err(|e| { error!(self.log, "Failed to check for peers to status"; "error" => format!("{}",e)); })? { debug!(self.log, "Sending Status to peer"; "peer_id" => format!("{}", peer_id)); // add the status timer back - self.status_peers.insert(peer_id.clone()); + peers_to_add.push(peer_id.clone()); self.events.push(PeerManagerEvent::Status(peer_id)); } + if !peers_to_add.is_empty() { + futures::task::current().notify(); + } + while let Some(peer) = peers_to_add.pop() { + self.status_peers.insert(peer); + } + if !self.events.is_empty() { return Ok(Async::Ready(Some(self.events.remove(0)))); } else { From b6c027b9ecc77182d64f99594e6d4418764d01b8 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Thu, 30 Apr 2020 17:14:57 +1000 Subject: [PATCH 2/8] Follow distance fix (#1082) * Ensure eth1 follow distance is respected * Add more info! logs for eth1 * Improve builder log * Fix timestamp --- beacon_node/client/src/builder.rs | 4 +- beacon_node/eth1/src/service.rs | 78 +++++++++++++++++++++++++------ beacon_node/src/config.rs | 4 +- beacon_node/src/lib.rs | 1 + 4 files changed, 70 insertions(+), 17 deletions(-) diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index ad6443bd6d..ace8a91012 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -207,7 +207,9 @@ where info!( context.log, "Waiting for eth2 genesis from eth1"; - "eth1_node" => &config.eth1.endpoint + "eth1_endpoint" => &config.eth1.endpoint, + "contract_deploy_block" => config.eth1.deposit_contract_deploy_block, + "deposit_contract" => &config.eth1.deposit_contract_address ); let genesis_service = diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index 1d54c09ea0..ba97d34174 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -12,10 +12,10 @@ use futures::{ }; use parking_lot::{RwLock, RwLockReadGuard}; use serde::{Deserialize, Serialize}; -use slog::{debug, error, trace, Logger}; +use slog::{debug, error, info, trace, Logger}; use std::ops::{Range, RangeInclusive}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio::timer::Delay; const STANDARD_TIMEOUT_MILLIS: u64 = 15_000; @@ -61,19 +61,15 @@ pub enum Error { /// The success message for an Eth1Data cache update. #[derive(Debug, PartialEq, Clone)] -pub enum BlockCacheUpdateOutcome { - /// The cache was sucessfully updated. - Success { - blocks_imported: usize, - head_block_number: Option, - }, +pub struct BlockCacheUpdateOutcome { + pub blocks_imported: usize, + pub head_block_number: Option, } /// The success message for an Eth1 deposit cache update. #[derive(Debug, PartialEq, Clone)] -pub enum DepositCacheUpdateOutcome { - /// The cache was sucessfully updated. - Success { logs_imported: usize }, +pub struct DepositCacheUpdateOutcome { + pub logs_imported: usize, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -259,7 +255,7 @@ impl Service { .map_err(|e| format!("Failed to update eth1 cache: {:?}", e)) .then(move |result| { match &result { - Ok(DepositCacheUpdateOutcome::Success { logs_imported }) => trace!( + Ok(DepositCacheUpdateOutcome { logs_imported }) => trace!( log_a, "Updated eth1 deposit cache"; "cached_deposits" => inner_1.deposit_cache.read().cache.len(), @@ -281,7 +277,7 @@ impl Service { .map_err(|e| format!("Failed to update eth1 cache: {:?}", e)) .then(move |result| { match &result { - Ok(BlockCacheUpdateOutcome::Success { + Ok(BlockCacheUpdateOutcome { blocks_imported, head_block_number, }) => trace!( @@ -382,6 +378,7 @@ impl Service { ) -> impl Future { let service_1 = self.clone(); let service_2 = self.clone(); + let service_3 = self.clone(); let blocks_per_log_query = self.config().blocks_per_log_query; let max_log_requests_per_update = self .config() @@ -484,7 +481,26 @@ impl Service { Ok(sum) }) - .map(|logs_imported| DepositCacheUpdateOutcome::Success { logs_imported }) + .map(move |logs_imported| { + if logs_imported > 0 { + info!( + service_3.log, + "Imported deposit log(s)"; + "latest_block" => service_3.inner.deposit_cache.read().cache.latest_block_number(), + "total" => service_3.deposit_cache_len(), + "new" => logs_imported + ); + } else { + debug!( + service_3.log, + "No new deposits found"; + "latest_block" => service_3.inner.deposit_cache.read().cache.latest_block_number(), + "total_deposits" => service_3.deposit_cache_len(), + ); + } + + DepositCacheUpdateOutcome { logs_imported } + }) }) } @@ -507,6 +523,8 @@ impl Service { let cache_5 = self.inner.clone(); let cache_6 = self.inner.clone(); + let service_1 = self.clone(); + let block_cache_truncation = self.config().block_cache_truncation; let max_blocks_per_update = self .config() @@ -613,7 +631,37 @@ impl Service { cache_4.block_cache.read().len() as i64, ); - Ok(BlockCacheUpdateOutcome::Success { + let block_cache = service_1.inner.block_cache.read(); + let latest_block_mins = block_cache + .latest_block_timestamp() + .and_then(|timestamp| { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .and_then(|now| now.checked_sub(Duration::from_secs(timestamp))) + }) + .map(|duration| format!("{} mins", duration.as_secs() / 60)) + .unwrap_or_else(|| "n/a".into()); + + if blocks_imported > 0 { + info!( + service_1.log, + "Imported eth1 block(s)"; + "latest_block_age" => latest_block_mins, + "latest_block" => block_cache.highest_block_number(), + "total_cached_blocks" => block_cache.len(), + "new" => blocks_imported + ); + } else { + debug!( + service_1.log, + "No new eth1 blocks imported"; + "latest_block" => block_cache.highest_block_number(), + "cached_blocks" => block_cache.len(), + ); + } + + Ok(BlockCacheUpdateOutcome { blocks_imported, head_block_number: cache_4.block_cache.read().highest_block_number(), }) diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index cd425c1a09..b852bb7f3d 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -12,7 +12,7 @@ use std::io::prelude::*; use std::net::{IpAddr, Ipv4Addr}; use std::net::{TcpListener, UdpSocket}; use std::path::PathBuf; -use types::EthSpec; +use types::{ChainSpec, EthSpec}; pub const CLIENT_CONFIG_FILENAME: &str = "beacon-node.toml"; pub const BEACON_NODE_DIR: &str = "beacon"; @@ -29,6 +29,7 @@ pub const NETWORK_DIR: &str = "network"; pub fn get_config( cli_args: &ArgMatches, spec_constants: &str, + spec: &ChainSpec, log: Logger, ) -> Result { let mut client_config = ClientConfig::default(); @@ -331,6 +332,7 @@ pub fn get_config( eth2_testnet_config.deposit_contract_deploy_block; client_config.eth1.lowest_cached_block_number = client_config.eth1.deposit_contract_deploy_block; + client_config.eth1.follow_distance = spec.eth1_follow_distance; if let Some(mut boot_nodes) = eth2_testnet_config.boot_enr { client_config.network.boot_nodes.append(&mut boot_nodes) diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 6c37b67944..20870a99a3 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -58,6 +58,7 @@ impl ProductionBeaconNode { get_config::( &matches, &context.eth2_config.spec_constants, + &context.eth2_config().spec, context.log.clone(), ) .into_future() From b4a1a2e483fdcc20348d7310c7595af9a829ffe4 Mon Sep 17 00:00:00 2001 From: divma Date: Sun, 3 May 2020 08:17:12 -0500 Subject: [PATCH 3/8] Better handling of RPC errors and RPC conn with the PeerManager (#1047) --- beacon_node/eth2-libp2p/src/behaviour.rs | 16 +- beacon_node/eth2-libp2p/src/discovery/mod.rs | 2 +- .../eth2-libp2p/src/peer_manager/client.rs | 4 +- .../eth2-libp2p/src/peer_manager/mod.rs | 291 +++++++++++++----- .../eth2-libp2p/src/peer_manager/peerdb.rs | 96 ++++-- beacon_node/eth2-libp2p/src/rpc/codec/base.rs | 14 +- beacon_node/eth2-libp2p/src/rpc/codec/mod.rs | 6 +- beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs | 46 +-- .../eth2-libp2p/src/rpc/codec/ssz_snappy.rs | 39 +-- beacon_node/eth2-libp2p/src/rpc/handler.rs | 277 +++++++++-------- beacon_node/eth2-libp2p/src/rpc/methods.rs | 77 +++-- beacon_node/eth2-libp2p/src/rpc/mod.rs | 18 +- beacon_node/eth2-libp2p/src/rpc/protocol.rs | 148 ++++----- beacon_node/eth2-libp2p/tests/rpc_tests.rs | 30 +- beacon_node/network/src/router/mod.rs | 43 ++- beacon_node/network/src/router/processor.rs | 12 +- 16 files changed, 656 insertions(+), 463 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index bc8be50141..7691669b23 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -306,7 +306,7 @@ impl Behaviour Behaviour let bitfield = match enr.bitfield::() { Ok(v) => v, Err(e) => { - warn!(self.log, "Peer has invalid ENR bitfield"; + warn!(self.log, "Peer has invalid ENR bitfield"; "peer_id" => format!("{}", peer_id), "error" => format!("{:?}", e)); return; @@ -435,22 +435,26 @@ impl // send the requested meta-data self.send_meta_data_response(id, peer_id); } - RPCEvent::Response(_, RPCErrorResponse::Success(RPCResponse::Pong(ping))) => { + RPCEvent::Response(_, RPCCodedResponse::Success(RPCResponse::Pong(ping))) => { self.peer_manager.pong_response(&peer_id, ping.data); } RPCEvent::Response( _, - RPCErrorResponse::Success(RPCResponse::MetaData(meta_data)), + RPCCodedResponse::Success(RPCResponse::MetaData(meta_data)), ) => { self.peer_manager.meta_data_response(&peer_id, meta_data); } RPCEvent::Request(_, RPCRequest::Status(_)) - | RPCEvent::Response(_, RPCErrorResponse::Success(RPCResponse::Status(_))) => { + | RPCEvent::Response(_, RPCCodedResponse::Success(RPCResponse::Status(_))) => { // inform the peer manager that we have received a status from a peer self.peer_manager.peer_statusd(&peer_id); // propagate the STATUS message upwards self.events.push(BehaviourEvent::RPC(peer_id, rpc_event)); } + RPCEvent::Error(_, protocol, ref err) => { + self.peer_manager.handle_rpc_error(&peer_id, protocol, err); + self.events.push(BehaviourEvent::RPC(peer_id, rpc_event)); + } _ => { // propagate all other RPC messages upwards self.events.push(BehaviourEvent::RPC(peer_id, rpc_event)) diff --git a/beacon_node/eth2-libp2p/src/discovery/mod.rs b/beacon_node/eth2-libp2p/src/discovery/mod.rs index 2cd40b8a15..4544656471 100644 --- a/beacon_node/eth2-libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2-libp2p/src/discovery/mod.rs @@ -258,7 +258,7 @@ impl Discovery { .network_globals .peers .read() - .peers_on_subnet(&subnet_id) + .peers_on_subnet(subnet_id) .count() as u64; if peers_on_subnet < TARGET_SUBNET_PEERS { diff --git a/beacon_node/eth2-libp2p/src/peer_manager/client.rs b/beacon_node/eth2-libp2p/src/peer_manager/client.rs index 3ba68faaa3..36a67325b5 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/client.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/client.rs @@ -98,7 +98,7 @@ impl std::fmt::Display for Client { // helper function to identify clients from their agent_version. Returns the client // kind and it's associated version and the OS kind. fn client_from_agent_version(agent_version: &str) -> (ClientKind, String, String) { - let mut agent_split = agent_version.split("/"); + let mut agent_split = agent_version.split('/'); match agent_split.next() { Some("Lighthouse") => { let kind = ClientKind::Lighthouse; @@ -116,7 +116,7 @@ fn client_from_agent_version(agent_version: &str) -> (ClientKind, String, String let kind = ClientKind::Teku; let mut version = String::from("unknown"); let mut os_version = version.clone(); - if let Some(_) = agent_split.next() { + if agent_split.next().is_some() { if let Some(agent_version) = agent_split.next() { version = agent_version.into(); if let Some(agent_os_version) = agent_split.next() { diff --git a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs index e09fbbb076..95963696df 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs @@ -2,7 +2,7 @@ pub use self::peerdb::*; use crate::metrics; -use crate::rpc::MetaData; +use crate::rpc::{MetaData, Protocol, RPCError, RPCResponseErrorCode}; use crate::{NetworkGlobals, PeerId}; use futures::prelude::*; use futures::Stream; @@ -10,6 +10,7 @@ use hashmap_delay::HashSetDelay; use libp2p::identify::IdentifyInfo; use slog::{crit, debug, error, warn}; use smallvec::SmallVec; +use std::convert::TryInto; use std::sync::Arc; use std::time::{Duration, Instant}; use types::EthSpec; @@ -19,11 +20,11 @@ mod peer_info; mod peer_sync_status; mod peerdb; -pub use peer_info::PeerInfo; +pub use peer_info::{PeerConnectionStatus::*, PeerInfo}; pub use peer_sync_status::{PeerSyncStatus, SyncInfo}; /// The minimum reputation before a peer is disconnected. -// Most likely this needs tweaking -const _MINIMUM_REPUTATION_BEFORE_BAN: Rep = 20; +// Most likely this needs tweaking. +const MIN_REP_BEFORE_BAN: Rep = 10; /// The time in seconds between re-status's peers. const STATUS_INTERVAL: u64 = 300; /// The time in seconds between PING events. We do not send a ping if the other peer as PING'd us within @@ -32,7 +33,7 @@ const PING_INTERVAL: u64 = 30; /// The main struct that handles peer's reputation and connection status. pub struct PeerManager { - /// Storage of network globals to access the PeerDB. + /// Storage of network globals to access the `PeerDB`. network_globals: Arc>, /// A queue of events that the `PeerManager` is waiting to produce. events: SmallVec<[PeerManagerEvent; 5]>, @@ -46,22 +47,45 @@ pub struct PeerManager { log: slog::Logger, } -/// A collection of actions a peer can perform which will adjust its reputation +/// A collection of actions a peer can perform which will adjust its reputation. /// Each variant has an associated reputation change. +// To easily assess the behaviour of reputation changes the number of variants should stay low, and +// somewhat generic. pub enum PeerAction { - /// The peer timed out on an RPC request/response. - _TimedOut = -10, - /// The peer sent and invalid request/response or encoding. - _InvalidMessage = -20, - /// The peer sent something objectively malicious. - _Malicious = -50, + /// We should not communicate more with this peer. + /// This action will cause the peer to get banned. + Fatal, + /// An error occurred with this peer but it is not necessarily malicious. + /// We have high tolerance for this actions: several occurrences are needed for a peer to get + /// kicked. + /// NOTE: ~15 occurrences will get the peer banned + HighToleranceError, + /// An error occurred with this peer but it is not necessarily malicious. + /// We have high tolerance for this actions: several occurrences are needed for a peer to get + /// kicked. + /// NOTE: ~10 occurrences will get the peer banned + MidToleranceError, + /// This peer's action is not malicious but will not be tolerated. A few occurrences will cause + /// the peer to get kicked. + /// NOTE: ~5 occurrences will get the peer banned + LowToleranceError, /// Received an expected message. - _ValidMessage = 20, - /// Peer disconnected. - Disconnected = -30, + _ValidMessage, } -/// The events that the PeerManager outputs (requests). +impl PeerAction { + fn rep_change(&self) -> RepChange { + match self { + PeerAction::Fatal => RepChange::worst(), + PeerAction::LowToleranceError => RepChange::bad(60), + PeerAction::MidToleranceError => RepChange::bad(25), + PeerAction::HighToleranceError => RepChange::bad(15), + PeerAction::_ValidMessage => RepChange::good(20), + } + } +} + +/// The events that the `PeerManager` outputs (requests). pub enum PeerManagerEvent { /// Sends a STATUS to a peer. Status(PeerId), @@ -96,24 +120,27 @@ impl PeerManager { if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) { // received a ping // reset the to-ping timer for this peer - debug!(self.log, "Received a ping request"; "peer_id" => format!("{}", peer_id), "seq_no" => seq); + debug!(self.log, "Received a ping request"; "peer_id" => peer_id.to_string(), "seq_no" => seq); self.ping_peers.insert(peer_id.clone()); // if the sequence number is unknown send update the meta data of the peer. if let Some(meta_data) = &peer_info.meta_data { if meta_data.seq_number < seq { - debug!(self.log, "Requesting new metadata from peer"; "peer_id" => format!("{}", peer_id), "known_seq_no" => meta_data.seq_number, "ping_seq_no" => seq); + debug!(self.log, "Requesting new metadata from peer"; + "peer_id" => peer_id.to_string(), "known_seq_no" => meta_data.seq_number, "ping_seq_no" => seq); self.events .push(PeerManagerEvent::MetaData(peer_id.clone())); } } else { // if we don't know the meta-data, request it - debug!(self.log, "Requesting first metadata from peer"; "peer_id" => format!("{}", peer_id)); + debug!(self.log, "Requesting first metadata from peer"; + "peer_id" => peer_id.to_string()); self.events .push(PeerManagerEvent::MetaData(peer_id.clone())); } } else { - crit!(self.log, "Received a PING from an unknown peer"; "peer_id" => format!("{}", peer_id)); + crit!(self.log, "Received a PING from an unknown peer"; + "peer_id" => peer_id.to_string()); } } @@ -126,18 +153,20 @@ impl PeerManager { // if the sequence number is unknown send update the meta data of the peer. if let Some(meta_data) = &peer_info.meta_data { if meta_data.seq_number < seq { - debug!(self.log, "Requesting new metadata from peer"; "peer_id" => format!("{}", peer_id), "known_seq_no" => meta_data.seq_number, "pong_seq_no" => seq); + debug!(self.log, "Requesting new metadata from peer"; + "peer_id" => peer_id.to_string(), "known_seq_no" => meta_data.seq_number, "pong_seq_no" => seq); self.events .push(PeerManagerEvent::MetaData(peer_id.clone())); } } else { // if we don't know the meta-data, request it - debug!(self.log, "Requesting first metadata from peer"; "peer_id" => format!("{}", peer_id)); + debug!(self.log, "Requesting first metadata from peer"; + "peer_id" => peer_id.to_string()); self.events .push(PeerManagerEvent::MetaData(peer_id.clone())); } } else { - crit!(self.log, "Received a PONG from an unknown peer"; "peer_id" => format!("{}", peer_id)); + crit!(self.log, "Received a PONG from an unknown peer"; "peer_id" => peer_id.to_string()); } } @@ -147,18 +176,24 @@ impl PeerManager { if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { if let Some(known_meta_data) = &peer_info.meta_data { if known_meta_data.seq_number < meta_data.seq_number { - debug!(self.log, "Updating peer's metadata"; "peer_id" => format!("{}", peer_id), "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number); + debug!(self.log, "Updating peer's metadata"; + "peer_id" => peer_id.to_string(), "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number); peer_info.meta_data = Some(meta_data); } else { - warn!(self.log, "Received old metadata"; "peer_id" => format!("{}", peer_id), "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number); + // TODO: isn't this malicious/random behaviour? What happens if the seq_number + // is the same but the contents differ? + warn!(self.log, "Received old metadata"; + "peer_id" => peer_id.to_string(), "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number); } } else { // we have no meta-data for this peer, update - debug!(self.log, "Obtained peer's metadata"; "peer_id" => format!("{}", peer_id), "new_seq_no" => meta_data.seq_number); + debug!(self.log, "Obtained peer's metadata"; + "peer_id" => peer_id.to_string(), "new_seq_no" => meta_data.seq_number); peer_info.meta_data = Some(meta_data); } } else { - crit!(self.log, "Received METADATA from an unknown peer"; "peer_id" => format!("{}", peer_id)); + crit!(self.log, "Received METADATA from an unknown peer"; + "peer_id" => peer_id.to_string()); } } @@ -167,38 +202,12 @@ impl PeerManager { self.status_peers.insert(peer_id.clone()); } - /// Checks the reputation of a peer and if it is too low, bans it and - /// sends the corresponding event. Informs if it got banned - fn _gets_banned(&mut self, peer_id: &PeerId) -> bool { - // if the peer was already banned don't inform again - let mut peerdb = self.network_globals.peers.write(); - - if let Some(connection_status) = peerdb.connection_status(peer_id) { - if peerdb.reputation(peer_id) < _MINIMUM_REPUTATION_BEFORE_BAN - && !connection_status.is_banned() - { - peerdb.ban(peer_id); - self.events - .push(PeerManagerEvent::_BanPeer(peer_id.clone())); - return true; - } - } - false - } - - /// Requests that a peer get disconnected. - pub fn _disconnect_peer(&mut self, peer_id: &PeerId) { - self.events - .push(PeerManagerEvent::_DisconnectPeer(peer_id.clone())); - } - /// Updates the state of the peer as disconnected. pub fn notify_disconnect(&mut self, peer_id: &PeerId) { self.update_reputations(); { let mut peerdb = self.network_globals.peers.write(); peerdb.disconnect(peer_id); - peerdb.add_reputation(peer_id, PeerAction::Disconnected as Rep); } // remove the ping and status timer for the peer @@ -223,35 +232,15 @@ impl PeerManager { self.connect_peer(peer_id, true) } - /// Provides a given peer's reputation if it exists. - pub fn _get_peer_rep(&self, peer_id: &PeerId) -> Rep { - self.network_globals.peers.read().reputation(peer_id) - } - - /// Updates the reputation of known peers according to their connection - /// status and the time that has passed. - pub fn update_reputations(&mut self) { - let now = Instant::now(); - let elapsed = (now - self.last_updated).as_secs(); - // 0 seconds means now - last_updated < 0, but (most likely) not = 0. - // In this case, do nothing (updating last_updated would propagate - // rounding errors) - if elapsed > 0 { - self.last_updated = now; - // TODO decide how reputations change with time. If they get too low - // set the peers as banned - } - } - /// Reports a peer for some action. /// /// If the peer doesn't exist, log a warning and insert defaults. - pub fn _report_peer(&mut self, peer_id: &PeerId, action: PeerAction) { + pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction) { self.update_reputations(); self.network_globals .peers .write() - .add_reputation(peer_id, action as Rep); + .add_reputation(peer_id, action.rep_change()); self.update_reputations(); } @@ -261,10 +250,68 @@ impl PeerManager { peer_info.client = client::Client::from_identify_info(info); peer_info.listening_addresses = info.listen_addrs.clone(); } else { - crit!(self.log, "Received an Identify response from an unknown peer"; "peer_id" => format!("{}", peer_id)); + crit!(self.log, "Received an Identify response from an unknown peer"; "peer_id" => peer_id.to_string()); } } + pub fn handle_rpc_error(&mut self, peer_id: &PeerId, protocol: Protocol, err: &RPCError) { + debug!(self.log, "RPCError"; "protocol" => protocol.to_string(), "err" => err.to_string()); + + // Map this error to a `PeerAction` (if any) + let peer_action = match err { + RPCError::IncompleteStream => { + // They closed early, this could mean poor connection + PeerAction::MidToleranceError + } + RPCError::InternalError(_reason) => { + // Our fault. Do nothing + return; + } + RPCError::InvalidData => { + // Peer is not complying with the protocol. This is considered a malicious action + PeerAction::Fatal + } + RPCError::IoError(_e) => { + // this could their fault or ours, so we tolerate this + PeerAction::HighToleranceError + } + RPCError::ErrorResponse(code) => match code { + RPCResponseErrorCode::Unknown => PeerAction::HighToleranceError, + RPCResponseErrorCode::ServerError => PeerAction::MidToleranceError, + RPCResponseErrorCode::InvalidRequest => PeerAction::LowToleranceError, + }, + RPCError::SSZDecodeError(_) => PeerAction::Fatal, + RPCError::UnsupportedProtocol => { + // Not supporting a protocol shouldn't be considered a malicious action, but + // it is an action that in some cases will make the peer unfit to continue + // communicating. + // TODO: To avoid punishing a peer repeatedly for not supporting a protocol, this + // information could be stored and used to prevent sending requests for the given + // protocol to this peer. Similarly, to avoid blacklisting a peer for a protocol + // forever, if stored this information should expire. + match protocol { + Protocol::Ping => PeerAction::Fatal, + Protocol::BlocksByRange => return, + Protocol::BlocksByRoot => return, + Protocol::Goodbye => return, + Protocol::MetaData => PeerAction::LowToleranceError, + Protocol::Status => PeerAction::LowToleranceError, + } + } + RPCError::StreamTimeout => match protocol { + Protocol::Ping => PeerAction::LowToleranceError, + Protocol::BlocksByRange => PeerAction::MidToleranceError, + Protocol::BlocksByRoot => PeerAction::MidToleranceError, + Protocol::Goodbye => return, + Protocol::MetaData => return, + Protocol::Status => return, + }, + RPCError::NegotiationTimeout => PeerAction::HighToleranceError, + }; + + self.report_peer(peer_id, peer_action); + } + /* Internal functions */ /// Registers a peer as connected. The `ingoing` parameter determines if the peer is being @@ -275,7 +322,7 @@ impl PeerManager { /// This informs if the peer was accepted in to the db or not. // TODO: Drop peers if over max_peer limit fn connect_peer(&mut self, peer_id: &PeerId, outgoing: bool) -> bool { - // TODO: Call this on a timer + // TODO: remove after timed updates self.update_reputations(); { @@ -288,7 +335,7 @@ impl PeerManager { if outgoing { peerdb.connect_outgoing(peer_id); } else { - peerdb.connect_outgoing(peer_id); + peerdb.connect_ingoing(peer_id); } } @@ -310,6 +357,86 @@ impl PeerManager { pub fn _dialing_peer(&mut self, peer_id: &PeerId) { self.network_globals.peers.write().dialing_peer(peer_id); } + + /// Updates the reputation of known peers according to their connection + /// status and the time that has passed. + /// + /// **Disconnected peers** get a 1rep hit every hour they stay disconnected. + /// **Banned peers** get a 1rep gain for every hour to slowly allow them back again. + /// + /// A banned(disconnected) peer that gets its rep above(below) MIN_REP_BEFORE_BAN is + /// now considered a disconnected(banned) peer. + fn update_reputations(&mut self) { + // avoid locking the peerdb too often + // TODO: call this on a timer + if self.last_updated.elapsed().as_secs() < 30 { + return; + } + + let now = Instant::now(); + + // Check for peers that get banned, unbanned and that should be disconnected + let mut ban_queue = Vec::new(); + let mut unban_queue = Vec::new(); + + /* Check how long have peers been in this state and update their reputations if needed */ + let mut pdb = self.network_globals.peers.write(); + + for (id, info) in pdb.peers_mut() { + // Update reputations + match info.connection_status { + Connected { .. } => { + // Connected peers gain reputation by sending useful messages + } + Disconnected { since } | Banned { since } => { + // For disconnected peers, lower their reputation by 1 for every hour they + // stay disconnected. This helps us slowly forget disconnected peers. + // In the same way, slowly allow banned peers back again. + let dc_hours = (now - since).as_secs() / 3600; + let last_dc_hours = (self.last_updated - since).as_secs() / 3600; + if dc_hours > last_dc_hours { + // this should be 1 most of the time + let rep_dif = (dc_hours - last_dc_hours) + .try_into() + .unwrap_or(Rep::max_value()); + + info.reputation = if info.connection_status.is_banned() { + info.reputation.saturating_add(rep_dif) + } else { + info.reputation.saturating_sub(rep_dif) + }; + } + } + Dialing { since } => { + // A peer shouldn't be dialing for more than 2 minutes + if since.elapsed().as_secs() > 120 { + warn!(self.log,"Peer has been dialing for too long"; "peer_id" => id.to_string()); + // TODO: decide how to handle this + } + } + } + // Check if the peer gets banned or unbanned and if it should be disconnected + if info.reputation < MIN_REP_BEFORE_BAN && !info.connection_status.is_banned() { + // This peer gets banned. Check if we should request disconnection + ban_queue.push(id.clone()); + } else if info.reputation >= MIN_REP_BEFORE_BAN && info.connection_status.is_banned() { + // This peer gets unbanned + unban_queue.push(id.clone()); + } + } + + for id in ban_queue { + pdb.ban(&id); + + self.events.push(PeerManagerEvent::_BanPeer(id.clone())); + } + + for id in unban_queue { + pdb.disconnect(&id); + } + + self.last_updated = Instant::now(); + } } impl Stream for PeerManager { @@ -322,9 +449,9 @@ impl Stream for PeerManager { // These exist to handle a bug in delayqueue let mut peers_to_add = Vec::new(); while let Async::Ready(Some(peer_id)) = self.ping_peers.poll().map_err(|e| { - error!(self.log, "Failed to check for peers to ping"; "error" => format!("{}",e)); + error!(self.log, "Failed to check for peers to ping"; "error" => e.to_string()); })? { - debug!(self.log, "Pinging peer"; "peer_id" => format!("{}", peer_id)); + debug!(self.log, "Pinging peer"; "peer_id" => peer_id.to_string()); // add the ping timer back peers_to_add.push(peer_id.clone()); self.events.push(PeerManagerEvent::Ping(peer_id)); @@ -338,9 +465,9 @@ impl Stream for PeerManager { } while let Async::Ready(Some(peer_id)) = self.status_peers.poll().map_err(|e| { - error!(self.log, "Failed to check for peers to status"; "error" => format!("{}",e)); + error!(self.log, "Failed to check for peers to status"; "error" => e.to_string()); })? { - debug!(self.log, "Sending Status to peer"; "peer_id" => format!("{}", peer_id)); + debug!(self.log, "Sending Status to peer"; "peer_id" => peer_id.to_string()); // add the status timer back peers_to_add.push(peer_id.clone()); self.events.push(PeerManagerEvent::Status(peer_id)); diff --git a/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs index 066fa3736b..d5fa4bcf7f 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs @@ -7,11 +7,18 @@ use std::collections::HashMap; use std::time::Instant; use types::{EthSpec, SubnetId}; -/// A peer's reputation. -pub type Rep = i32; +/// A peer's reputation (perceived potential usefulness) +pub type Rep = u8; + +/// Reputation change (positive or negative) +pub struct RepChange { + is_good: bool, + diff: Rep, +} /// Max number of disconnected nodes to remember const MAX_DC_PEERS: usize = 30; + /// The default starting reputation for an unknown peer. pub const DEFAULT_REPUTATION: Rep = 50; @@ -25,6 +32,27 @@ pub struct PeerDB { log: slog::Logger, } +impl RepChange { + pub fn good(diff: Rep) -> Self { + RepChange { + is_good: true, + diff, + } + } + pub fn bad(diff: Rep) -> Self { + RepChange { + is_good: false, + diff, + } + } + pub const fn worst() -> Self { + RepChange { + is_good: false, + diff: Rep::max_value(), + } + } +} + impl PeerDB { pub fn new(log: &slog::Logger) -> Self { Self { @@ -48,6 +76,11 @@ impl PeerDB { self.peers.iter() } + /// Returns an iterator over all peers in the db. + pub(super) fn peers_mut(&mut self) -> impl Iterator)> { + self.peers.iter_mut() + } + /// Gives the ids of all known peers. pub fn peer_ids(&self) -> impl Iterator { self.peers.keys() @@ -59,6 +92,7 @@ impl PeerDB { } /// Returns a mutable reference to a peer's info if known. + /// TODO: make pub(super) to ensure that peer management is unified pub fn peer_info_mut(&mut self, peer_id: &PeerId) -> Option<&mut PeerInfo> { self.peers.get_mut(peer_id) } @@ -111,12 +145,11 @@ impl PeerDB { } /// Gives an iterator of all peers on a given subnet. - pub fn peers_on_subnet(&self, subnet_id: &SubnetId) -> impl Iterator { - let subnet_id_filter = subnet_id.clone(); + pub fn peers_on_subnet(&self, subnet_id: SubnetId) -> impl Iterator { self.peers .iter() .filter(move |(_, info)| { - info.connection_status.is_connected() && info.on_subnet(subnet_id_filter) + info.connection_status.is_connected() && info.on_subnet(subnet_id) }) .map(|(peer_id, _)| peer_id) } @@ -192,10 +225,7 @@ impl PeerDB { /// A peer is being dialed. pub fn dialing_peer(&mut self, peer_id: &PeerId) { - let info = self - .peers - .entry(peer_id.clone()) - .or_insert_with(|| Default::default()); + let info = self.peers.entry(peer_id.clone()).or_default(); if info.connection_status.is_disconnected() { self.n_dc -= 1; @@ -207,10 +237,7 @@ impl PeerDB { /// Sets a peer as connected with an ingoing connection. pub fn connect_ingoing(&mut self, peer_id: &PeerId) { - let info = self - .peers - .entry(peer_id.clone()) - .or_insert_with(|| Default::default()); + let info = self.peers.entry(peer_id.clone()).or_default(); if info.connection_status.is_disconnected() { self.n_dc -= 1; @@ -220,10 +247,7 @@ impl PeerDB { /// Sets a peer as connected with an outgoing connection. pub fn connect_outgoing(&mut self, peer_id: &PeerId) { - let info = self - .peers - .entry(peer_id.clone()) - .or_insert_with(|| Default::default()); + let info = self.peers.entry(peer_id.clone()).or_default(); if info.connection_status.is_disconnected() { self.n_dc -= 1; @@ -231,16 +255,16 @@ impl PeerDB { info.connection_status.connect_outgoing(); } - /// Sets the peer as disconnected. + /// Sets the peer as disconnected. A banned peer remains banned pub fn disconnect(&mut self, peer_id: &PeerId) { let log_ref = &self.log; let info = self.peers.entry(peer_id.clone()).or_insert_with(|| { warn!(log_ref, "Disconnecting unknown peer"; - "peer_id" => format!("{:?}",peer_id)); + "peer_id" => peer_id.to_string()); PeerInfo::default() }); - if !info.connection_status.is_disconnected() { + if !info.connection_status.is_disconnected() && !info.connection_status.is_banned() { info.connection_status.disconnect(); self.n_dc += 1; } @@ -269,7 +293,7 @@ impl PeerDB { let log_ref = &self.log; let info = self.peers.entry(peer_id.clone()).or_insert_with(|| { warn!(log_ref, "Banning unknown peer"; - "peer_id" => format!("{:?}",peer_id)); + "peer_id" => peer_id.to_string()); PeerInfo::default() }); if info.connection_status.is_disconnected() { @@ -283,16 +307,17 @@ impl PeerDB { if let Some(peer_info) = self.peers.get_mut(peer_id) { peer_info.meta_data = Some(meta_data); } else { - warn!(self.log, "Tried to add meta data for a non-existant peer"; "peer_id" => format!("{}", peer_id)); + warn!(self.log, "Tried to add meta data for a non-existant peer"; "peer_id" => peer_id.to_string()); } } /// Sets the reputation of peer. - pub fn set_reputation(&mut self, peer_id: &PeerId, rep: Rep) { + #[allow(dead_code)] + pub(super) fn set_reputation(&mut self, peer_id: &PeerId, rep: Rep) { if let Some(peer_info) = self.peers.get_mut(peer_id) { peer_info.reputation = rep; } else { - crit!(self.log, "Tried to modify reputation for an unknown peer"; "peer_id" => format!("{}",peer_id)); + crit!(self.log, "Tried to modify reputation for an unknown peer"; "peer_id" => peer_id.to_string()); } } @@ -301,20 +326,25 @@ impl PeerDB { if let Some(peer_info) = self.peers.get_mut(peer_id) { peer_info.sync_status = sync_status; } else { - crit!(self.log, "Tried to the sync status for an unknown peer"; "peer_id" => format!("{}",peer_id)); + crit!(self.log, "Tried to the sync status for an unknown peer"; "peer_id" => peer_id.to_string()); } } /// Adds to a peer's reputation by `change`. If the reputation exceeds Rep's /// upper (lower) bounds, it stays at the maximum (minimum) value. - pub fn add_reputation(&mut self, peer_id: &PeerId, change: Rep) { + pub(super) fn add_reputation(&mut self, peer_id: &PeerId, change: RepChange) { let log_ref = &self.log; let info = self.peers.entry(peer_id.clone()).or_insert_with(|| { warn!(log_ref, "Adding to the reputation of an unknown peer"; - "peer_id" => format!("{:?}",peer_id)); + "peer_id" => peer_id.to_string()); PeerInfo::default() }); - info.reputation = info.reputation.saturating_add(change); + + info.reputation = if change.is_good { + info.reputation.saturating_add(change.diff) + } else { + info.reputation.saturating_sub(change.diff) + }; } } @@ -396,14 +426,20 @@ mod tests { // 0 change does not change de reputation let random_peer = PeerId::random(); - let change: Rep = 0; + let change = RepChange::good(0); pdb.connect_ingoing(&random_peer); pdb.add_reputation(&random_peer, change); assert_eq!(pdb.reputation(&random_peer), DEFAULT_REPUTATION); // overflowing change is capped let random_peer = PeerId::random(); - let change = Rep::max_value(); + let change = RepChange::worst(); + pdb.connect_ingoing(&random_peer); + pdb.add_reputation(&random_peer, change); + assert_eq!(pdb.reputation(&random_peer), Rep::min_value()); + + let random_peer = PeerId::random(); + let change = RepChange::good(Rep::max_value()); pdb.connect_ingoing(&random_peer); pdb.add_reputation(&random_peer, change); assert_eq!(pdb.reputation(&random_peer), Rep::max_value()); diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/base.rs b/beacon_node/eth2-libp2p/src/rpc/codec/base.rs index 48b537c7bc..43f0f494c2 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/base.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/base.rs @@ -1,6 +1,6 @@ //! This handles the various supported encoding mechanism for the Eth 2.0 RPC. -use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse}; +use crate::rpc::{ErrorMessage, RPCCodedResponse, RPCRequest, RPCResponse}; use libp2p::bytes::BufMut; use libp2p::bytes::BytesMut; use std::marker::PhantomData; @@ -78,9 +78,9 @@ where impl Encoder for BaseInboundCodec where TSpec: EthSpec, - TCodec: Decoder + Encoder>, + TCodec: Decoder + Encoder>, { - type Item = RPCErrorResponse; + type Item = RPCCodedResponse; type Error = ::Error; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { @@ -130,7 +130,7 @@ where TSpec: EthSpec, TCodec: OutboundCodec + Decoder>, { - type Item = RPCErrorResponse; + type Item = RPCCodedResponse; type Error = ::Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { @@ -146,17 +146,17 @@ where }); let inner_result = { - if RPCErrorResponse::::is_response(response_code) { + if RPCCodedResponse::::is_response(response_code) { // decode an actual response and mutates the buffer if enough bytes have been read // returning the result. self.inner .decode(src) - .map(|r| r.map(RPCErrorResponse::Success)) + .map(|r| r.map(RPCCodedResponse::Success)) } else { // decode an error self.inner .decode_error(src) - .map(|r| r.map(|resp| RPCErrorResponse::from_error(response_code, resp))) + .map(|r| r.map(|resp| RPCCodedResponse::from_error(response_code, resp))) } }; // if the inner decoder was capable of decoding a chunk, we need to reset the current diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs b/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs index f1b7f74daf..1fd97a78b3 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs @@ -6,7 +6,7 @@ use self::base::{BaseInboundCodec, BaseOutboundCodec}; use self::ssz::{SSZInboundCodec, SSZOutboundCodec}; use self::ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec}; use crate::rpc::protocol::RPCError; -use crate::rpc::{RPCErrorResponse, RPCRequest}; +use crate::rpc::{RPCCodedResponse, RPCRequest}; use libp2p::bytes::BytesMut; use tokio::codec::{Decoder, Encoder}; use types::EthSpec; @@ -23,7 +23,7 @@ pub enum OutboundCodec { } impl Encoder for InboundCodec { - type Item = RPCErrorResponse; + type Item = RPCCodedResponse; type Error = RPCError; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { @@ -59,7 +59,7 @@ impl Encoder for OutboundCodec { } impl Decoder for OutboundCodec { - type Item = RPCErrorResponse; + type Item = RPCCodedResponse; type Error = RPCError; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs index bd8cbfd9f8..37ea4eac55 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs @@ -3,7 +3,7 @@ use crate::rpc::{ codec::base::OutboundCodec, protocol::{Encoding, Protocol, ProtocolId, RPCError, Version}, }; -use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse}; +use crate::rpc::{ErrorMessage, RPCCodedResponse, RPCRequest, RPCResponse}; use libp2p::bytes::{BufMut, Bytes, BytesMut}; use ssz::{Decode, Encode}; use std::marker::PhantomData; @@ -37,22 +37,22 @@ impl SSZInboundCodec { // Encoder for inbound streams: Encodes RPC Responses sent to peers. impl Encoder for SSZInboundCodec { - type Item = RPCErrorResponse; + type Item = RPCCodedResponse; type Error = RPCError; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { let bytes = match item { - RPCErrorResponse::Success(resp) => match resp { + RPCCodedResponse::Success(resp) => match resp { RPCResponse::Status(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(), RPCResponse::Pong(res) => res.data.as_ssz_bytes(), RPCResponse::MetaData(res) => res.as_ssz_bytes(), }, - RPCErrorResponse::InvalidRequest(err) => err.as_ssz_bytes(), - RPCErrorResponse::ServerError(err) => err.as_ssz_bytes(), - RPCErrorResponse::Unknown(err) => err.as_ssz_bytes(), - RPCErrorResponse::StreamTermination(_) => { + RPCCodedResponse::InvalidRequest(err) => err.as_ssz_bytes(), + RPCCodedResponse::ServerError(err) => err.as_ssz_bytes(), + RPCCodedResponse::Unknown(err) => err.as_ssz_bytes(), + RPCCodedResponse::StreamTermination(_) => { unreachable!("Code error - attempting to encode a stream termination") } }; @@ -107,9 +107,7 @@ impl Decoder for SSZInboundCodec { Protocol::MetaData => match self.protocol.version { Version::V1 => { if packet.len() > 0 { - Err(RPCError::Custom( - "Get metadata request should be empty".into(), - )) + Err(RPCError::InvalidData) } else { Ok(Some(RPCRequest::MetaData(PhantomData))) } @@ -183,32 +181,20 @@ impl Decoder for SSZOutboundCodec { src.clear(); match self.protocol.message_name { Protocol::Status => match self.protocol.version { - Version::V1 => Err(RPCError::Custom( - "Status stream terminated unexpectedly".into(), - )), // cannot have an empty HELLO message. The stream has terminated unexpectedly + Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty HELLO message. The stream has terminated unexpectedly }, - Protocol::Goodbye => { - Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")) - } + Protocol::Goodbye => Err(RPCError::InvalidData), Protocol::BlocksByRange => match self.protocol.version { - Version::V1 => Err(RPCError::Custom( - "Status stream terminated unexpectedly, empty block".into(), - )), // cannot have an empty block message. + Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty block message. }, Protocol::BlocksByRoot => match self.protocol.version { - Version::V1 => Err(RPCError::Custom( - "Status stream terminated unexpectedly, empty block".into(), - )), // cannot have an empty block message. + Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty block message. }, Protocol::Ping => match self.protocol.version { - Version::V1 => Err(RPCError::Custom( - "PING stream terminated unexpectedly".into(), - )), // cannot have an empty block message. + Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty block message. }, Protocol::MetaData => match self.protocol.version { - Version::V1 => Err(RPCError::Custom( - "Metadata stream terminated unexpectedly".into(), - )), // cannot have an empty block message. + Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty block message. }, } } else { @@ -223,9 +209,7 @@ impl Decoder for SSZOutboundCodec { StatusMessage::from_ssz_bytes(&raw_bytes)?, ))), }, - Protocol::Goodbye => { - Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")) - } + Protocol::Goodbye => Err(RPCError::InvalidData), Protocol::BlocksByRange => match self.protocol.version { Version::V1 => Ok(Some(RPCResponse::BlocksByRange(Box::new( SignedBeaconBlock::from_ssz_bytes(&raw_bytes)?, diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs index e2f0db1ff4..c99d1f6fdd 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs @@ -3,7 +3,7 @@ use crate::rpc::{ codec::base::OutboundCodec, protocol::{Encoding, Protocol, ProtocolId, RPCError, Version}, }; -use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse}; +use crate::rpc::{ErrorMessage, RPCCodedResponse, RPCRequest, RPCResponse}; use libp2p::bytes::BytesMut; use snap::read::FrameDecoder; use snap::write::FrameEncoder; @@ -45,28 +45,28 @@ impl SSZSnappyInboundCodec { // Encoder for inbound streams: Encodes RPC Responses sent to peers. impl Encoder for SSZSnappyInboundCodec { - type Item = RPCErrorResponse; + type Item = RPCCodedResponse; type Error = RPCError; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { let bytes = match item { - RPCErrorResponse::Success(resp) => match resp { + RPCCodedResponse::Success(resp) => match resp { RPCResponse::Status(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(), RPCResponse::Pong(res) => res.data.as_ssz_bytes(), RPCResponse::MetaData(res) => res.as_ssz_bytes(), }, - RPCErrorResponse::InvalidRequest(err) => err.as_ssz_bytes(), - RPCErrorResponse::ServerError(err) => err.as_ssz_bytes(), - RPCErrorResponse::Unknown(err) => err.as_ssz_bytes(), - RPCErrorResponse::StreamTermination(_) => { + RPCCodedResponse::InvalidRequest(err) => err.as_ssz_bytes(), + RPCCodedResponse::ServerError(err) => err.as_ssz_bytes(), + RPCCodedResponse::Unknown(err) => err.as_ssz_bytes(), + RPCCodedResponse::StreamTermination(_) => { unreachable!("Code error - attempting to encode a stream termination") } }; // SSZ encoded bytes should be within `max_packet_size` if bytes.len() > self.max_packet_size { - return Err(RPCError::Custom( + return Err(RPCError::InternalError( "attempting to encode data > max_packet_size".into(), )); } @@ -106,9 +106,7 @@ impl Decoder for SSZSnappyInboundCodec { // Should not attempt to decode rpc chunks with length > max_packet_size if length > self.max_packet_size { - return Err(RPCError::Custom( - "attempting to decode data > max_packet_size".into(), - )); + return Err(RPCError::InvalidData); } let mut reader = FrameDecoder::new(Cursor::new(&src)); let mut decoded_buffer = vec![0; length]; @@ -148,9 +146,7 @@ impl Decoder for SSZSnappyInboundCodec { Protocol::MetaData => match self.protocol.version { Version::V1 => { if decoded_buffer.len() > 0 { - Err(RPCError::Custom( - "Get metadata request should be empty".into(), - )) + Err(RPCError::InvalidData) } else { Ok(Some(RPCRequest::MetaData(PhantomData))) } @@ -212,8 +208,8 @@ impl Encoder for SSZSnappyOutboundCodec { }; // SSZ encoded bytes should be within `max_packet_size` if bytes.len() > self.max_packet_size { - return Err(RPCError::Custom( - "attempting to encode data > max_packet_size".into(), + return Err(RPCError::InternalError( + "attempting to encode data > max_packet_size", )); } @@ -257,9 +253,7 @@ impl Decoder for SSZSnappyOutboundCodec { // Should not attempt to decode rpc chunks with length > max_packet_size if length > self.max_packet_size { - return Err(RPCError::Custom( - "attempting to decode data > max_packet_size".into(), - )); + return Err(RPCError::InvalidData); } let mut reader = FrameDecoder::new(Cursor::new(&src)); let mut decoded_buffer = vec![0; length]; @@ -276,7 +270,8 @@ impl Decoder for SSZSnappyOutboundCodec { ))), }, Protocol::Goodbye => { - Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")) + // Goodbye does not have a response + Err(RPCError::InvalidData) } Protocol::BlocksByRange => match self.protocol.version { Version::V1 => Ok(Some(RPCResponse::BlocksByRange(Box::new( @@ -330,9 +325,7 @@ impl OutboundCodec for SSZSnappyOutboundCodec { // Should not attempt to decode rpc chunks with length > max_packet_size if length > self.max_packet_size { - return Err(RPCError::Custom( - "attempting to decode data > max_packet_size".into(), - )); + return Err(RPCError::InvalidData); } let mut reader = FrameDecoder::new(Cursor::new(&src)); let mut decoded_buffer = vec![0; length]; diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index d8ff541c9d..d8fa347abd 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -1,14 +1,16 @@ #![allow(clippy::type_complexity)] #![allow(clippy::cognitive_complexity)] -use super::methods::{ErrorMessage, RPCErrorResponse, RequestId, ResponseTermination}; -use super::protocol::{RPCError, RPCProtocol, RPCRequest}; +use super::methods::{ErrorMessage, RPCCodedResponse, RequestId, ResponseTermination}; +use super::protocol::{Protocol, RPCError, RPCProtocol, RPCRequest}; use super::RPCEvent; use crate::rpc::protocol::{InboundFramed, OutboundFramed}; use core::marker::PhantomData; use fnv::FnvHashMap; use futures::prelude::*; -use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError}; +use libp2p::core::upgrade::{ + InboundUpgrade, NegotiationError, OutboundUpgrade, ProtocolError, UpgradeError, +}; use libp2p::swarm::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, }; @@ -46,13 +48,13 @@ where listen_protocol: SubstreamProtocol>, /// If something bad happened and we should shut down the handler with an error. - pending_error: Vec<(RequestId, ProtocolsHandlerUpgrErr)>, + pending_error: Vec<(RequestId, Protocol, RPCError)>, /// Queue of events to produce in `poll()`. events_out: SmallVec<[RPCEvent; 4]>, /// Queue of outbound substreams to open. - dial_queue: SmallVec<[RPCEvent; 4]>, + dial_queue: SmallVec<[(RequestId, RPCRequest); 4]>, /// Current number of concurrent outbound substreams being opened. dial_negotiated: u32, @@ -63,6 +65,7 @@ where ( InboundSubstreamState, Option, + Protocol, ), >, @@ -73,14 +76,18 @@ where /// maintained by the application sending the request. outbound_substreams: FnvHashMap< OutboundRequestId, - (OutboundSubstreamState, delay_queue::Key), + ( + OutboundSubstreamState, + delay_queue::Key, + Protocol, + ), >, /// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout. outbound_substreams_delay: DelayQueue, /// Map of outbound items that are queued as the stream processes them. - queued_outbound_items: FnvHashMap>>, + queued_outbound_items: FnvHashMap>>, /// Sequential ID for waiting substreams. For inbound substreams, this is also the inbound request ID. current_inbound_substream_id: RequestId, @@ -152,17 +159,17 @@ where /// Moves the substream state to closing and informs the connected peer. The /// `queued_outbound_items` must be given as a parameter to add stream termination messages to /// the outbound queue. - pub fn close(&mut self, outbound_queue: &mut Vec>) { + pub fn close(&mut self, outbound_queue: &mut Vec>) { // When terminating a stream, report the stream termination to the requesting user via // an RPC error - let error = RPCErrorResponse::ServerError(ErrorMessage { - error_message: "Request timed out".as_bytes().to_vec(), + let error = RPCCodedResponse::ServerError(ErrorMessage { + error_message: b"Request timed out".to_vec(), }); // The stream termination type is irrelevant, this will terminate the // stream let stream_termination = - RPCErrorResponse::StreamTermination(ResponseTermination::BlocksByRange); + RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange); match std::mem::replace(self, InboundSubstreamState::Poisoned) { InboundSubstreamState::ResponsePendingSend { substream, closing } => { @@ -244,10 +251,10 @@ where } /// Opens an outbound substream with a request. - pub fn send_request(&mut self, rpc_event: RPCEvent) { + pub fn send_request(&mut self, id: RequestId, req: RPCRequest) { self.keep_alive = KeepAlive::Yes; - self.dial_queue.push(rpc_event); + self.dial_queue.push((id, req)); } } @@ -262,7 +269,7 @@ where type Substream = TSubstream; type InboundProtocol = RPCProtocol; type OutboundProtocol = RPCRequest; - type OutboundOpenInfo = RPCEvent; // Keep track of the id and the request + type OutboundOpenInfo = (RequestId, RPCRequest); // Keep track of the id and the request fn listen_protocol(&self) -> SubstreamProtocol { self.listen_protocol.clone() @@ -292,7 +299,7 @@ where let awaiting_stream = InboundSubstreamState::ResponseIdle(substream); self.inbound_substreams.insert( self.current_inbound_substream_id, - (awaiting_stream, Some(delay_key)), + (awaiting_stream, Some(delay_key), req.protocol()), ); self.events_out @@ -303,7 +310,7 @@ where fn inject_fully_negotiated_outbound( &mut self, out: as OutboundUpgrade>::Output, - rpc_event: Self::OutboundOpenInfo, + request_info: Self::OutboundOpenInfo, ) { self.dial_negotiated -= 1; @@ -317,70 +324,80 @@ where } // add the stream to substreams if we expect a response, otherwise drop the stream. - match rpc_event { - RPCEvent::Request(mut id, request) if request.expect_response() => { - // outbound requests can be sent from various aspects of lighthouse which don't - // track request ids. In the future these will be flagged as None, currently they - // are flagged as 0. These can overlap. In this case, we pick the highest request - // Id available - if id == 0 && self.outbound_substreams.get(&id).is_some() { - // have duplicate outbound request with no id. Pick one that will not collide - let mut new_id = std::usize::MAX; - while self.outbound_substreams.get(&new_id).is_some() { - // panic all outbound substreams are full - new_id -= 1; - } - trace!(self.log, "New outbound stream id created"; "id" => new_id); - id = RequestId::from(new_id); - } - - // new outbound request. Store the stream and tag the output. - let delay_key = self - .outbound_substreams_delay - .insert(id, Duration::from_secs(RESPONSE_TIMEOUT)); - let awaiting_stream = OutboundSubstreamState::RequestPendingResponse { - substream: out, - request, - }; - if let Some(_) = self - .outbound_substreams - .insert(id, (awaiting_stream, delay_key)) - { - crit!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", id)); + let (mut id, request) = request_info; + if request.expect_response() { + // outbound requests can be sent from various aspects of lighthouse which don't + // track request ids. In the future these will be flagged as None, currently they + // are flagged as 0. These can overlap. In this case, we pick the highest request + // Id available + if id == 0 && self.outbound_substreams.get(&id).is_some() { + // have duplicate outbound request with no id. Pick one that will not collide + let mut new_id = std::usize::MAX; + while self.outbound_substreams.get(&new_id).is_some() { + // panic all outbound substreams are full + new_id -= 1; } + trace!(self.log, "New outbound stream id created"; "id" => new_id); + id = RequestId::from(new_id); } - _ => { // a response is not expected, drop the stream for all other requests + + // new outbound request. Store the stream and tag the output. + let delay_key = self + .outbound_substreams_delay + .insert(id, Duration::from_secs(RESPONSE_TIMEOUT)); + let protocol = request.protocol(); + let awaiting_stream = OutboundSubstreamState::RequestPendingResponse { + substream: out, + request, + }; + if let Some(_) = self + .outbound_substreams + .insert(id, (awaiting_stream, delay_key, protocol)) + { + crit!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", id)); } } } - // Note: If the substream has closed due to inactivity, or the substream is in the + // NOTE: If the substream has closed due to inactivity, or the substream is in the // wrong state a response will fail silently. fn inject_event(&mut self, rpc_event: Self::InEvent) { match rpc_event { - RPCEvent::Request(_, _) => self.send_request(rpc_event), + RPCEvent::Request(id, req) => self.send_request(id, req), RPCEvent::Response(rpc_id, response) => { - // check if the stream matching the response still exists - // variables indicating if the response is an error response or a multi-part + // Variables indicating if the response is an error response or a multi-part // response let res_is_error = response.is_error(); let res_is_multiple = response.multiple_responses(); + // check if the stream matching the response still exists match self.inbound_substreams.get_mut(&rpc_id) { - Some((substream_state, _)) => { + Some((substream_state, _, protocol)) => { match std::mem::replace(substream_state, InboundSubstreamState::Poisoned) { InboundSubstreamState::ResponseIdle(substream) => { // close the stream if there is no response - if let RPCErrorResponse::StreamTermination(_) = response { - //trace!(self.log, "Stream termination sent. Ending the stream"); - *substream_state = InboundSubstreamState::Closing(substream); - } else { - // send the response - // if it's a single rpc request or an error, close the stream after - *substream_state = InboundSubstreamState::ResponsePendingSend { - substream: substream.send(response), - closing: !res_is_multiple | res_is_error, // close if an error or we are not expecting more responses - }; + match response { + RPCCodedResponse::StreamTermination(_) => { + //trace!(self.log, "Stream termination sent. Ending the stream"); + *substream_state = + InboundSubstreamState::Closing(substream); + } + _ => { + if let Some(error_code) = response.error_code() { + self.pending_error.push(( + rpc_id, + *protocol, + RPCError::ErrorResponse(error_code), + )); + } + // send the response + // if it's a single rpc request or an error, close the stream after + *substream_state = + InboundSubstreamState::ResponsePendingSend { + substream: substream.send(response), + closing: !res_is_multiple | res_is_error, // close if an error or we are not expecting more responses + }; + } } } InboundSubstreamState::ResponsePendingSend { substream, closing } @@ -416,39 +433,55 @@ where } } None => { - warn!(self.log, "Stream has expired. Response not sent"; "response" => format!("{}",response)); + warn!(self.log, "Stream has expired. Response not sent"; "response" => format!("{}", response)); } }; } // We do not send errors as responses - RPCEvent::Error(_, _) => {} + RPCEvent::Error(..) => {} } } fn inject_dial_upgrade_error( &mut self, - request: Self::OutboundOpenInfo, + request_info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr< >::Error, >, ) { + let (id, req) = request_info; if let ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(RPCError::IoError(_))) = error { self.outbound_io_error_retries += 1; if self.outbound_io_error_retries < IO_ERROR_RETRIES { - self.send_request(request); + self.send_request(id, req); return; } } + self.outbound_io_error_retries = 0; - // add the error - let request_id = { - if let RPCEvent::Request(id, _) = request { - id - } else { - 0 + // map the error + let rpc_error = match error { + ProtocolsHandlerUpgrErr::Timer => RPCError::InternalError("Timer failed"), + ProtocolsHandlerUpgrErr::Timeout => RPCError::NegotiationTimeout, + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => e, + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + RPCError::UnsupportedProtocol } + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select( + NegotiationError::ProtocolError(e), + )) => match e { + ProtocolError::IoError(io_err) => RPCError::IoError(io_err), + ProtocolError::InvalidProtocol => { + RPCError::InternalError("Protocol was deemed invalid") + } + ProtocolError::InvalidMessage | ProtocolError::TooManyProtocols => { + // Peer is sending invalid data during the negotiation phase, not + // participating in the protocol + RPCError::InvalidData + } + }, }; - self.pending_error.push((request_id, error)); + self.pending_error.push((id, req.protocol(), rpc_error)); } fn connection_keep_alive(&self) -> KeepAlive { @@ -461,46 +494,11 @@ where ProtocolsHandlerEvent, Self::Error, > { - if let Some((request_id, err)) = self.pending_error.pop() { - // Returning an error here will result in dropping the peer. - match err { - ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply( - RPCError::InvalidProtocol(protocol_string), - )) => { - // Peer does not support the protocol. - // TODO: We currently will not drop the peer, for maximal compatibility with - // other clients testing their software. In the future, we will need to decide - // which protocols are a bare minimum to support before kicking the peer. - error!(self.log, "Peer doesn't support the RPC protocol"; "protocol" => protocol_string); - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error(request_id, RPCError::InvalidProtocol(protocol_string)), - ))); - } - ProtocolsHandlerUpgrErr::Timeout | ProtocolsHandlerUpgrErr::Timer => { - // negotiation timeout, mark the request as failed - debug!(self.log, "Active substreams before timeout"; "len" => self.outbound_substreams.len()); - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error( - request_id, - RPCError::Custom("Protocol negotiation timeout".into()), - ), - ))); - } - ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)) => { - // IO/Decode/Custom Error, report to the application - debug!(self.log, "Upgrade Error"; "error" => format!("{}",err)); - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error(request_id, err), - ))); - } - ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => { - // Error during negotiation - debug!(self.log, "Upgrade Error"; "error" => format!("{}",err)); - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error(request_id, RPCError::Custom(format!("{}", err))), - ))); - } - } + if !self.pending_error.is_empty() { + let (id, protocol, err) = self.pending_error.remove(0); + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Error(id, protocol, err), + ))); } // return any events that need to be reported @@ -522,7 +520,7 @@ where let rpc_id = stream_id.get_ref(); // handle a stream timeout for various states - if let Some((substream_state, delay_key)) = self.inbound_substreams.get_mut(rpc_id) { + if let Some((substream_state, delay_key, _)) = self.inbound_substreams.get_mut(rpc_id) { // the delay has been removed *delay_key = None; @@ -541,14 +539,16 @@ where ProtocolsHandlerUpgrErr::Timer })? { - self.outbound_substreams.remove(stream_id.get_ref()); - // notify the user - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error( - *stream_id.get_ref(), - RPCError::Custom("Stream timed out".into()), - ), - ))); + if let Some((_id, _stream, protocol)) = + self.outbound_substreams.remove(stream_id.get_ref()) + { + // notify the user + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Error(*stream_id.get_ref(), protocol, RPCError::StreamTimeout), + ))); + } else { + crit!(self.log, "timed out substream not in the books"; "stream_id" => stream_id.get_ref()); + } } // drive inbound streams that need to be processed @@ -598,9 +598,10 @@ where if let Some(delay_key) = &entry.get().1 { self.inbound_substreams_delay.remove(delay_key); } + let protocol = entry.get().2; entry.remove_entry(); return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error(0, e), + RPCEvent::Error(0, protocol, e), ))); } }; @@ -696,7 +697,7 @@ where return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( RPCEvent::Response( request_id, - RPCErrorResponse::StreamTermination( + RPCCodedResponse::StreamTermination( request.stream_termination(), ), ), @@ -705,9 +706,8 @@ where return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( RPCEvent::Error( request_id, - RPCError::Custom( - "Stream closed early. Empty response".into(), - ), + request.protocol(), + RPCError::IncompleteStream, ), ))); } @@ -721,9 +721,10 @@ where // drop the stream let delay_key = &entry.get().1; self.outbound_substreams_delay.remove(delay_key); + let protocol = entry.get().2; entry.remove_entry(); return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error(request_id, e), + RPCEvent::Error(request_id, protocol, e), ))); } }, @@ -759,16 +760,14 @@ where // establish outbound substreams if !self.dial_queue.is_empty() && self.dial_negotiated < self.max_dial_negotiated { self.dial_negotiated += 1; - let rpc_event = self.dial_queue.remove(0); + let (id, req) = self.dial_queue.remove(0); self.dial_queue.shrink_to_fit(); - if let RPCEvent::Request(id, req) = rpc_event { - return Ok(Async::Ready( - ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(req.clone()), - info: RPCEvent::Request(id, req), - }, - )); - } + return Ok(Async::Ready( + ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(req.clone()), + info: (id, req), + }, + )); } Ok(Async::NotReady) } @@ -777,7 +776,7 @@ where // Check for new items to send to the peer and update the underlying stream fn apply_queued_responses( raw_substream: InboundFramed, - queued_outbound_items: &mut Option<&mut Vec>>, + queued_outbound_items: &mut Option<&mut Vec>>, new_items_to_send: &mut bool, ) -> InboundSubstreamState { match queued_outbound_items { @@ -785,7 +784,7 @@ fn apply_queued_responses( *new_items_to_send = true; // we have queued items match queue.remove(0) { - RPCErrorResponse::StreamTermination(_) => { + RPCCodedResponse::StreamTermination(_) => { // close the stream if this is a stream termination InboundSubstreamState::Closing(raw_substream) } diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index c9e86d3ecd..e4b5b67144 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -173,8 +173,10 @@ pub enum ResponseTermination { BlocksByRoot, } +/// The structured response containing a result/code indicating success or failure +/// and the contents of the response #[derive(Debug)] -pub enum RPCErrorResponse { +pub enum RPCCodedResponse { /// The response is a successful. Success(RPCResponse), @@ -191,15 +193,23 @@ pub enum RPCErrorResponse { StreamTermination(ResponseTermination), } -impl RPCErrorResponse { +/// The code assigned to an erroneous `RPCResponse`. +#[derive(Debug)] +pub enum RPCResponseErrorCode { + InvalidRequest, + ServerError, + Unknown, +} + +impl RPCCodedResponse { /// Used to encode the response in the codec. pub fn as_u8(&self) -> Option { match self { - RPCErrorResponse::Success(_) => Some(0), - RPCErrorResponse::InvalidRequest(_) => Some(1), - RPCErrorResponse::ServerError(_) => Some(2), - RPCErrorResponse::Unknown(_) => Some(255), - RPCErrorResponse::StreamTermination(_) => None, + RPCCodedResponse::Success(_) => Some(0), + RPCCodedResponse::InvalidRequest(_) => Some(1), + RPCCodedResponse::ServerError(_) => Some(2), + RPCCodedResponse::Unknown(_) => Some(255), + RPCCodedResponse::StreamTermination(_) => None, } } @@ -211,30 +221,30 @@ impl RPCErrorResponse { } } - /// Builds an RPCErrorResponse from a response code and an ErrorMessage + /// Builds an RPCCodedResponse from a response code and an ErrorMessage pub fn from_error(response_code: u8, err: ErrorMessage) -> Self { match response_code { - 1 => RPCErrorResponse::InvalidRequest(err), - 2 => RPCErrorResponse::ServerError(err), - _ => RPCErrorResponse::Unknown(err), + 1 => RPCCodedResponse::InvalidRequest(err), + 2 => RPCCodedResponse::ServerError(err), + _ => RPCCodedResponse::Unknown(err), } } /// Specifies which response allows for multiple chunks for the stream handler. pub fn multiple_responses(&self) -> bool { match self { - RPCErrorResponse::Success(resp) => match resp { + RPCCodedResponse::Success(resp) => match resp { RPCResponse::Status(_) => false, RPCResponse::BlocksByRange(_) => true, RPCResponse::BlocksByRoot(_) => true, RPCResponse::Pong(_) => false, RPCResponse::MetaData(_) => false, }, - RPCErrorResponse::InvalidRequest(_) => true, - RPCErrorResponse::ServerError(_) => true, - RPCErrorResponse::Unknown(_) => true, + RPCCodedResponse::InvalidRequest(_) => true, + RPCCodedResponse::ServerError(_) => true, + RPCCodedResponse::Unknown(_) => true, // Stream terminations are part of responses that have chunks - RPCErrorResponse::StreamTermination(_) => true, + RPCCodedResponse::StreamTermination(_) => true, } } @@ -242,10 +252,20 @@ impl RPCErrorResponse { /// sent. pub fn is_error(&self) -> bool { match self { - RPCErrorResponse::Success(_) => false, + RPCCodedResponse::Success(_) => false, _ => true, } } + + pub fn error_code(&self) -> Option { + match self { + RPCCodedResponse::Success(_) => None, + RPCCodedResponse::StreamTermination(_) => None, + RPCCodedResponse::InvalidRequest(_) => Some(RPCResponseErrorCode::InvalidRequest), + RPCCodedResponse::ServerError(_) => Some(RPCResponseErrorCode::ServerError), + RPCCodedResponse::Unknown(_) => Some(RPCResponseErrorCode::Unknown), + } + } } #[derive(Encode, Decode, Debug)] @@ -260,6 +280,17 @@ impl ErrorMessage { } } +impl std::fmt::Display for RPCResponseErrorCode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let repr = match self { + RPCResponseErrorCode::InvalidRequest => "The request was invalid", + RPCResponseErrorCode::ServerError => "Server error occurred", + RPCResponseErrorCode::Unknown => "Unknown error occurred", + }; + f.write_str(repr) + } +} + impl std::fmt::Display for StatusMessage { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "Status Message: Fork Digest: {:?}, Finalized Root: {}, Finalized Epoch: {}, Head Root: {}, Head Slot: {}", self.fork_digest, self.finalized_root, self.finalized_epoch, self.head_root, self.head_slot) @@ -282,14 +313,14 @@ impl std::fmt::Display for RPCResponse { } } -impl std::fmt::Display for RPCErrorResponse { +impl std::fmt::Display for RPCCodedResponse { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - RPCErrorResponse::Success(res) => write!(f, "{}", res), - RPCErrorResponse::InvalidRequest(err) => write!(f, "Invalid Request: {:?}", err), - RPCErrorResponse::ServerError(err) => write!(f, "Server Error: {:?}", err), - RPCErrorResponse::Unknown(err) => write!(f, "Unknown Error: {:?}", err), - RPCErrorResponse::StreamTermination(_) => write!(f, "Stream Termination"), + RPCCodedResponse::Success(res) => write!(f, "{}", res), + RPCCodedResponse::InvalidRequest(err) => write!(f, "Invalid Request: {:?}", err), + RPCCodedResponse::ServerError(err) => write!(f, "Server Error: {:?}", err), + RPCCodedResponse::Unknown(err) => write!(f, "Unknown Error: {:?}", err), + RPCCodedResponse::StreamTermination(_) => write!(f, "Stream Termination"), } } } diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index b3a250818a..df87d8f89c 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -13,10 +13,10 @@ use libp2p::swarm::{ }; use libp2p::{Multiaddr, PeerId}; pub use methods::{ - ErrorMessage, MetaData, RPCErrorResponse, RPCResponse, RequestId, ResponseTermination, - StatusMessage, + ErrorMessage, MetaData, RPCCodedResponse, RPCResponse, RPCResponseErrorCode, RequestId, + ResponseTermination, StatusMessage, }; -pub use protocol::{RPCError, RPCProtocol, RPCRequest}; +pub use protocol::{Protocol, RPCError, RPCProtocol, RPCRequest}; use slog::{debug, o}; use std::marker::PhantomData; use std::time::Duration; @@ -37,9 +37,9 @@ pub enum RPCEvent { /// A response that is being sent or has been received from the RPC protocol. The first parameter returns /// that which was sent with the corresponding request, the second is a single chunk of a /// response. - Response(RequestId, RPCErrorResponse), + Response(RequestId, RPCCodedResponse), /// An Error occurred. - Error(RequestId, RPCError), + Error(RequestId, Protocol, RPCError), } impl RPCEvent { @@ -47,7 +47,7 @@ impl RPCEvent { match *self { RPCEvent::Request(id, _) => id, RPCEvent::Response(id, _) => id, - RPCEvent::Error(id, _) => id, + RPCEvent::Error(id, _, _) => id, } } } @@ -57,7 +57,11 @@ impl std::fmt::Display for RPCEvent { match self { RPCEvent::Request(id, req) => write!(f, "RPC Request(id: {}, {})", id, req), RPCEvent::Response(id, res) => write!(f, "RPC Response(id: {}, {})", id, res), - RPCEvent::Error(id, err) => write!(f, "RPC Request(id: {}, error: {:?})", id, err), + RPCEvent::Error(id, prot, err) => write!( + f, + "RPC Error(id: {}, protocol: {:?} error: {:?})", + id, prot, err + ), } } } diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 76567cf466..d0b313bcf6 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -34,7 +34,7 @@ const TTFB_TIMEOUT: u64 = 5; const REQUEST_TIMEOUT: u64 = 15; /// Protocol names to be used. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub enum Protocol { /// The Status protocol name. Status, @@ -128,7 +128,7 @@ impl UpgradeInfo for RPCProtocol { /// Tracks the types in a protocol id. #[derive(Clone, Debug)] pub struct ProtocolId { - /// The rpc message type/name. + /// The RPC message type/name. pub message_name: Protocol, /// The version of the RPC. @@ -151,7 +151,7 @@ impl ProtocolId { ProtocolId { message_name, - version: version, + version, encoding, protocol_id, } @@ -172,11 +172,20 @@ impl ProtocolName for ProtocolId { pub type InboundOutput = (RPCRequest, InboundFramed); pub type InboundFramed = Framed>, InboundCodec>; + +// Auxiliary types + +// The type of the socket timeout in the `InboundUpgrade` type `Future` +type TTimeout = + timeout::Timeout>>; +// The type of the socket timeout error in the `InboundUpgrade` type `Future` +type TTimeoutErr = timeout::Error<(RPCError, InboundFramed)>; +// `TimeoutErr` to `RPCError` mapping function +type FnMapErr = fn(TTimeoutErr) -> RPCError; + type FnAndThen = fn( (Option>, InboundFramed), ) -> FutureResult, RPCError>; -type FnMapErr = - fn(timeout::Error<(RPCError, InboundFramed)>) -> RPCError; impl InboundUpgrade for RPCProtocol where @@ -189,10 +198,7 @@ where type Future = future::Either< FutureResult, RPCError>, future::AndThen< - future::MapErr< - timeout::Timeout>>, - FnMapErr, - >, + future::MapErr, FnMapErr>, FutureResult, RPCError>, FnAndThen, >, @@ -203,7 +209,7 @@ where socket: upgrade::Negotiated, protocol: ProtocolId, ) -> Self::Future { - let protocol_name = protocol.message_name.clone(); + let protocol_name = protocol.message_name; let codec = match protocol.encoding { Encoding::SSZSnappy => { let ssz_snappy_codec = @@ -220,27 +226,31 @@ where let socket = Framed::new(timed_socket, codec); - // MetaData requests should be empty, return the stream match protocol_name { - Protocol::MetaData => futures::future::Either::A(futures::future::ok(( - RPCRequest::MetaData(PhantomData), - socket, - ))), - - _ => futures::future::Either::B( + // `MetaData` requests should be empty, return the stream + Protocol::MetaData => { + future::Either::A(future::ok((RPCRequest::MetaData(PhantomData), socket))) + } + _ => future::Either::B({ socket .into_future() .timeout(Duration::from_secs(REQUEST_TIMEOUT)) - .map_err(RPCError::from as FnMapErr) + .map_err({ + |err| { + if err.is_elapsed() { + RPCError::StreamTimeout + } else { + RPCError::InternalError("Stream timer failed") + } + } + } as FnMapErr) .and_then({ |(req, stream)| match req { - Some(request) => futures::future::ok((request, stream)), - None => futures::future::err(RPCError::Custom( - "Stream terminated early".into(), - )), + Some(request) => future::ok((request, stream)), + None => future::err(RPCError::IncompleteStream), } - } as FnAndThen), - ), + } as FnAndThen) + }), } } } @@ -270,7 +280,7 @@ impl UpgradeInfo for RPCRequest { } } -/// Implements the encoding per supported protocol for RPCRequest. +/// Implements the encoding per supported protocol for `RPCRequest`. impl RPCRequest { pub fn supported_protocols(&self) -> Vec { match self { @@ -330,6 +340,17 @@ impl RPCRequest { } } + pub fn protocol(&self) -> Protocol { + match self { + RPCRequest::Status(_) => Protocol::Status, + RPCRequest::Goodbye(_) => Protocol::Goodbye, + RPCRequest::BlocksByRange(_) => Protocol::BlocksByRange, + RPCRequest::BlocksByRoot(_) => Protocol::BlocksByRoot, + RPCRequest::Ping(_) => Protocol::Ping, + RPCRequest::MetaData(_) => Protocol::MetaData, + } + } + /// Returns the `ResponseTermination` type associated with the request if a stream gets /// terminated. pub fn stream_termination(&self) -> ResponseTermination { @@ -361,6 +382,7 @@ where type Output = OutboundFramed; type Error = RPCError; type Future = sink::Send>; + fn upgrade_outbound( self, socket: upgrade::Negotiated, @@ -385,29 +407,25 @@ where /// Error in RPC Encoding/Decoding. #[derive(Debug)] pub enum RPCError { - /// Error when reading the packet from the socket. - ReadError(upgrade::ReadOneError), /// Error when decoding the raw buffer from ssz. + // NOTE: in the future a ssz::DecodeError should map to an InvalidData error SSZDecodeError(ssz::DecodeError), - /// Snappy error - SnappyError(snap::Error), - /// Invalid Protocol ID. - InvalidProtocol(&'static str), /// IO Error. IoError(io::Error), - /// Waiting for a request/response timed out, or timer error'd. + /// The peer returned a valid response but the response indicated an error. + ErrorResponse(RPCResponseErrorCode), + /// Timed out waiting for a response. StreamTimeout, - /// The peer returned a valid RPCErrorResponse but the response was an error. - RPCErrorResponse, - /// Custom message. - Custom(String), -} - -impl From for RPCError { - #[inline] - fn from(err: upgrade::ReadOneError) -> Self { - RPCError::ReadError(err) - } + /// Peer does not support the protocol. + UnsupportedProtocol, + /// Stream ended unexpectedly. + IncompleteStream, + /// Peer sent invalid data. + InvalidData, + /// An error occurred due to internal reasons. Ex: timer failure. + InternalError(&'static str), + /// Negotiation with this peer timed out + NegotiationTimeout, } impl From for RPCError { @@ -416,21 +434,6 @@ impl From for RPCError { RPCError::SSZDecodeError(err) } } -impl From> for RPCError { - fn from(err: tokio::timer::timeout::Error) -> Self { - if err.is_elapsed() { - RPCError::StreamTimeout - } else { - RPCError::Custom("Stream timer failed".into()) - } - } -} - -impl From<()> for RPCError { - fn from(_err: ()) -> Self { - RPCError::Custom("".into()) - } -} impl From for RPCError { fn from(err: io::Error) -> Self { @@ -438,24 +441,19 @@ impl From for RPCError { } } -impl From for RPCError { - fn from(err: snap::Error) -> Self { - RPCError::SnappyError(err) - } -} - // Error trait is required for `ProtocolsHandler` impl std::fmt::Display for RPCError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match *self { - RPCError::ReadError(ref err) => write!(f, "Error while reading from socket: {}", err), RPCError::SSZDecodeError(ref err) => write!(f, "Error while decoding ssz: {:?}", err), - RPCError::InvalidProtocol(ref err) => write!(f, "Invalid Protocol: {}", err), + RPCError::InvalidData => write!(f, "Peer sent unexpected data"), RPCError::IoError(ref err) => write!(f, "IO Error: {}", err), - RPCError::RPCErrorResponse => write!(f, "RPC Response Error"), + RPCError::ErrorResponse(ref code) => write!(f, "RPC response was an error: {}", code), RPCError::StreamTimeout => write!(f, "Stream Timeout"), - RPCError::SnappyError(ref err) => write!(f, "Snappy error: {}", err), - RPCError::Custom(ref err) => write!(f, "{}", err), + RPCError::UnsupportedProtocol => write!(f, "Peer does not support the protocol"), + RPCError::IncompleteStream => write!(f, "Stream ended unexpectedly"), + RPCError::InternalError(ref err) => write!(f, "Internal error: {}", err), + RPCError::NegotiationTimeout => write!(f, "Negotiation timeout"), } } } @@ -463,14 +461,16 @@ impl std::fmt::Display for RPCError { impl std::error::Error for RPCError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match *self { - RPCError::ReadError(ref err) => Some(err), + // NOTE: this does have a source RPCError::SSZDecodeError(_) => None, - RPCError::SnappyError(ref err) => Some(err), - RPCError::InvalidProtocol(_) => None, RPCError::IoError(ref err) => Some(err), RPCError::StreamTimeout => None, - RPCError::RPCErrorResponse => None, - RPCError::Custom(_) => None, + RPCError::UnsupportedProtocol => None, + RPCError::IncompleteStream => None, + RPCError::InvalidData => None, + RPCError::InternalError(_) => None, + RPCError::ErrorResponse(_) => None, + RPCError::NegotiationTimeout => None, } } } diff --git a/beacon_node/eth2-libp2p/tests/rpc_tests.rs b/beacon_node/eth2-libp2p/tests/rpc_tests.rs index 74a0a7a1b7..6f2a00bbb8 100644 --- a/beacon_node/eth2-libp2p/tests/rpc_tests.rs +++ b/beacon_node/eth2-libp2p/tests/rpc_tests.rs @@ -62,12 +62,12 @@ fn test_status_rpc() { } Async::Ready(Some(BehaviourEvent::RPC(_, event))) => match event { // Should receive the RPC response - RPCEvent::Response(id, response @ RPCErrorResponse::Success(_)) => { + RPCEvent::Response(id, response @ RPCCodedResponse::Success(_)) => { if id == 1 { warn!(sender_log, "Sender Received"); let response = { match response { - RPCErrorResponse::Success(r) => r, + RPCCodedResponse::Success(r) => r, _ => unreachable!(), } }; @@ -99,7 +99,7 @@ fn test_status_rpc() { peer_id, RPCEvent::Response( id, - RPCErrorResponse::Success(rpc_response.clone()), + RPCCodedResponse::Success(rpc_response.clone()), ), ); } @@ -181,12 +181,12 @@ fn test_blocks_by_range_chunked_rpc() { if id == 1 { warn!(sender_log, "Sender received a response"); match response { - RPCErrorResponse::Success(res) => { + RPCCodedResponse::Success(res) => { assert_eq!(res, sender_response.clone()); *messages_received.lock().unwrap() += 1; warn!(sender_log, "Chunk received"); } - RPCErrorResponse::StreamTermination( + RPCCodedResponse::StreamTermination( ResponseTermination::BlocksByRange, ) => { // should be exactly 10 messages before terminating @@ -225,7 +225,7 @@ fn test_blocks_by_range_chunked_rpc() { peer_id.clone(), RPCEvent::Response( id, - RPCErrorResponse::Success(rpc_response.clone()), + RPCCodedResponse::Success(rpc_response.clone()), ), ); } @@ -234,7 +234,7 @@ fn test_blocks_by_range_chunked_rpc() { peer_id, RPCEvent::Response( id, - RPCErrorResponse::StreamTermination( + RPCCodedResponse::StreamTermination( ResponseTermination::BlocksByRange, ), ), @@ -316,12 +316,12 @@ fn test_blocks_by_range_single_empty_rpc() { if id == 1 { warn!(sender_log, "Sender received a response"); match response { - RPCErrorResponse::Success(res) => { + RPCCodedResponse::Success(res) => { assert_eq!(res, sender_response.clone()); *messages_received.lock().unwrap() += 1; warn!(sender_log, "Chunk received"); } - RPCErrorResponse::StreamTermination( + RPCCodedResponse::StreamTermination( ResponseTermination::BlocksByRange, ) => { // should be exactly 1 messages before terminating @@ -356,7 +356,7 @@ fn test_blocks_by_range_single_empty_rpc() { peer_id.clone(), RPCEvent::Response( id, - RPCErrorResponse::Success(rpc_response.clone()), + RPCCodedResponse::Success(rpc_response.clone()), ), ); // send the stream termination @@ -364,7 +364,7 @@ fn test_blocks_by_range_single_empty_rpc() { peer_id, RPCEvent::Response( id, - RPCErrorResponse::StreamTermination( + RPCCodedResponse::StreamTermination( ResponseTermination::BlocksByRange, ), ), @@ -449,12 +449,12 @@ fn test_blocks_by_root_chunked_rpc() { warn!(sender_log, "Sender received a response"); assert_eq!(id, 1); match response { - RPCErrorResponse::Success(res) => { + RPCCodedResponse::Success(res) => { assert_eq!(res, sender_response.clone()); *messages_received.lock().unwrap() += 1; warn!(sender_log, "Chunk received"); } - RPCErrorResponse::StreamTermination( + RPCCodedResponse::StreamTermination( ResponseTermination::BlocksByRoot, ) => { // should be exactly 10 messages before terminating @@ -489,7 +489,7 @@ fn test_blocks_by_root_chunked_rpc() { peer_id.clone(), RPCEvent::Response( id, - RPCErrorResponse::Success(rpc_response.clone()), + RPCCodedResponse::Success(rpc_response.clone()), ), ); } @@ -498,7 +498,7 @@ fn test_blocks_by_root_chunked_rpc() { peer_id, RPCEvent::Response( id, - RPCErrorResponse::StreamTermination( + RPCCodedResponse::StreamTermination( ResponseTermination::BlocksByRange, ), ), diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 466c70363f..8615d55863 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -10,7 +10,10 @@ use crate::error; use crate::service::NetworkMessage; use beacon_chain::{AttestationType, BeaconChain, BeaconChainTypes, BlockError}; use eth2_libp2p::{ - rpc::{RPCError, RPCErrorResponse, RPCRequest, RPCResponse, RequestId, ResponseTermination}, + rpc::{ + RPCCodedResponse, RPCError, RPCRequest, RPCResponse, RPCResponseErrorCode, RequestId, + ResponseTermination, + }, MessageId, NetworkGlobals, PeerId, PubsubMessage, RPCEvent, }; use futures::future::Future; @@ -123,7 +126,7 @@ impl Router { match rpc_message { RPCEvent::Request(id, req) => self.handle_rpc_request(peer_id, id, req), RPCEvent::Response(id, resp) => self.handle_rpc_response(peer_id, id, resp), - RPCEvent::Error(id, error) => self.handle_rpc_error(peer_id, id, error), + RPCEvent::Error(id, _protocol, error) => self.handle_rpc_error(peer_id, id, error), } } @@ -164,23 +167,35 @@ impl Router { &mut self, peer_id: PeerId, request_id: RequestId, - error_response: RPCErrorResponse, + error_response: RPCCodedResponse, ) { // an error could have occurred. match error_response { - RPCErrorResponse::InvalidRequest(error) => { - warn!(self.log, "Peer indicated invalid request";"peer_id" => format!("{:?}", peer_id), "error" => error.as_string()); - self.handle_rpc_error(peer_id, request_id, RPCError::RPCErrorResponse); + RPCCodedResponse::InvalidRequest(error) => { + warn!(self.log, "Peer indicated invalid request"; "peer_id" => format!("{:?}", peer_id), "error" => error.as_string()); + self.handle_rpc_error( + peer_id, + request_id, + RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest), + ); } - RPCErrorResponse::ServerError(error) => { - warn!(self.log, "Peer internal server error";"peer_id" => format!("{:?}", peer_id), "error" => error.as_string()); - self.handle_rpc_error(peer_id, request_id, RPCError::RPCErrorResponse); + RPCCodedResponse::ServerError(error) => { + warn!(self.log, "Peer internal server error"; "peer_id" => format!("{:?}", peer_id), "error" => error.as_string()); + self.handle_rpc_error( + peer_id, + request_id, + RPCError::ErrorResponse(RPCResponseErrorCode::ServerError), + ); } - RPCErrorResponse::Unknown(error) => { - warn!(self.log, "Unknown peer error";"peer" => format!("{:?}", peer_id), "error" => error.as_string()); - self.handle_rpc_error(peer_id, request_id, RPCError::RPCErrorResponse); + RPCCodedResponse::Unknown(error) => { + warn!(self.log, "Unknown peer error"; "peer" => format!("{:?}", peer_id), "error" => error.as_string()); + self.handle_rpc_error( + peer_id, + request_id, + RPCError::ErrorResponse(RPCResponseErrorCode::Unknown), + ); } - RPCErrorResponse::Success(response) => match response { + RPCCodedResponse::Success(response) => match response { RPCResponse::Status(status_message) => { self.processor.on_status_response(peer_id, status_message); } @@ -205,7 +220,7 @@ impl Router { unreachable!("Meta data must be handled in the behaviour"); } }, - RPCErrorResponse::StreamTermination(response_type) => { + RPCCodedResponse::StreamTermination(response_type) => { // have received a stream termination, notify the processing functions match response_type { ResponseTermination::BlocksByRange => { diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index d309ee9bb1..fc1f6b6fb5 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -5,7 +5,7 @@ use beacon_chain::{ BlockProcessingOutcome, GossipVerifiedBlock, }; use eth2_libp2p::rpc::methods::*; -use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId}; +use eth2_libp2p::rpc::{RPCCodedResponse, RPCEvent, RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::{NetworkGlobals, PeerId}; use slog::{debug, error, o, trace, warn}; use ssz::Encode; @@ -314,7 +314,7 @@ impl Processor { self.network.send_rpc_error_response( peer_id, request_id, - RPCErrorResponse::StreamTermination(ResponseTermination::BlocksByRoot), + RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRoot), ); } @@ -413,7 +413,7 @@ impl Processor { self.network.send_rpc_error_response( peer_id, request_id, - RPCErrorResponse::StreamTermination(ResponseTermination::BlocksByRange), + RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange), ); } @@ -691,16 +691,16 @@ impl HandlerNetworkContext { ) { self.send_rpc_event( peer_id, - RPCEvent::Response(request_id, RPCErrorResponse::Success(rpc_response)), + RPCEvent::Response(request_id, RPCCodedResponse::Success(rpc_response)), ); } - /// Send an RPCErrorResponse. This handles errors and stream terminations. + /// Send an RPCCodedResponse. This handles errors and stream terminations. pub fn send_rpc_error_response( &mut self, peer_id: PeerId, request_id: RequestId, - rpc_error_response: RPCErrorResponse, + rpc_error_response: RPCCodedResponse, ) { self.send_rpc_event(peer_id, RPCEvent::Response(request_id, rpc_error_response)); } From 36f213c092eb4802e603cff5901269168637c3f6 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Sun, 3 May 2020 18:48:19 +0530 Subject: [PATCH 4/8] Dns discovery (#1015) * Add cli flag and parse dns address * Fail if enr udp port isn't set * Improve docs and address parsing * address review comments * Remove debug statements * Add requires condition for enr-address * Return address in error --- beacon_node/src/cli.rs | 8 ++++--- beacon_node/src/config.rs | 44 +++++++++++++++++++++++++++++++-------- 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 9a2dfc1d49..8f42037f64 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -94,10 +94,12 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { Arg::with_name("enr-address") .long("enr-address") .value_name("ADDRESS") - .help("The IP address to broadcast to other peers on how to reach this node. \ + .help("The IP address/ DNS address to broadcast to other peers on how to reach this node. \ + If a DNS address is provided, the enr-address is set to the IP address it resolves to and \ + does not auto-update based on PONG responses in discovery. \ Set this only if you are sure other nodes can connect to your local node on this address. \ - Discovery will automatically find your external address,if possible. - ") + Discovery will automatically find your external address,if possible.") + .requires("enr-udp-port") .takes_value(true), ) .arg( diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index b852bb7f3d..2b848c81b3 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -9,7 +9,7 @@ use ssz::Encode; use std::fs; use std::fs::File; use std::io::prelude::*; -use std::net::{IpAddr, Ipv4Addr}; +use std::net::{IpAddr, Ipv4Addr, ToSocketAddrs}; use std::net::{TcpListener, UdpSocket}; use std::path::PathBuf; use types::{ChainSpec, EthSpec}; @@ -141,14 +141,6 @@ pub fn get_config( .collect::, _>>()?; } - if let Some(enr_address_str) = cli_args.value_of("enr-address") { - client_config.network.enr_address = Some( - enr_address_str - .parse() - .map_err(|_| format!("Invalid discovery address: {:?}", enr_address_str))?, - ) - } - if let Some(enr_udp_port_str) = cli_args.value_of("enr-udp-port") { client_config.network.enr_udp_port = Some( enr_udp_port_str @@ -178,6 +170,40 @@ pub fn get_config( client_config.network.enr_udp_port = Some(client_config.network.discovery_port); } + if let Some(enr_address) = cli_args.value_of("enr-address") { + let resolved_addr = match enr_address.parse::() { + Ok(addr) => addr, // // Input is an IpAddr + Err(_) => { + let mut addr = enr_address.to_string(); + // Appending enr-port to the dns hostname to appease `to_socket_addrs()` parsing. + // Since enr-update is disabled with a dns address, not setting the enr-udp-port + // will make the node undiscoverable. + if let Some(enr_udp_port) = client_config.network.enr_udp_port { + addr.push_str(&format!(":{}", enr_udp_port.to_string())); + } else { + return Err( + "enr-udp-port must be set for node to be discoverable with dns address" + .into(), + ); + } + // `to_socket_addr()` does the dns resolution + // Note: `to_socket_addrs()` is a blocking call + let resolved_addr = if let Ok(mut resolved_addrs) = addr.to_socket_addrs() { + // Pick the first ip from the list of resolved addresses + resolved_addrs + .next() + .map(|a| a.ip()) + .ok_or_else(|| format!("Resolved dns addr contains no entries"))? + } else { + return Err(format!("Failed to parse enr-address: {}", enr_address)); + }; + client_config.network.discv5_config.enr_update = false; + resolved_addr + } + }; + client_config.network.enr_address = Some(resolved_addr); + } + if cli_args.is_present("disable_enr_auto_update") { client_config.network.discv5_config.enr_update = false; } From 90453181f8644960beb6f079068bc617a3ebbc0e Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 4 May 2020 08:03:31 +1000 Subject: [PATCH 5/8] Fix lcli arg typo (#1097) --- lcli/src/new_testnet.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lcli/src/new_testnet.rs b/lcli/src/new_testnet.rs index 6e2eea4030..674080af79 100644 --- a/lcli/src/new_testnet.rs +++ b/lcli/src/new_testnet.rs @@ -46,7 +46,7 @@ pub fn run(matches: &ArgMatches) -> Result<(), String> { maybe_update!("max-effective-balance", max_effective_balance); maybe_update!("effective-balance-increment", effective_balance_increment); maybe_update!("ejection-balance", ejection_balance); - maybe_update!("eth1-follow_distance", eth1_follow_distance); + maybe_update!("eth1-follow-distance", eth1_follow_distance); maybe_update!("min-genesis-delay", min_genesis_delay); if let Some(v) = parse_ssz_optional(matches, "genesis-fork-version")? { From ebbc4e36304036e0c13916b8bf4aff1352e9aae8 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 4 May 2020 08:04:00 +1000 Subject: [PATCH 6/8] Add skip-slots command (#1095) --- lcli/src/main.rs | 30 +++++++++++++++++++ lcli/src/skip_slots.rs | 55 +++++++++++++++++++++++++++++++++++ lcli/src/transition_blocks.rs | 2 +- 3 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 lcli/src/skip_slots.rs diff --git a/lcli/src/main.rs b/lcli/src/main.rs index f4cd748f26..99621c1b15 100644 --- a/lcli/src/main.rs +++ b/lcli/src/main.rs @@ -10,6 +10,7 @@ mod interop_genesis; mod new_testnet; mod parse_hex; mod refund_deposit_contract; +mod skip_slots; mod transition_blocks; use clap::{App, Arg, ArgMatches, SubCommand}; @@ -77,6 +78,32 @@ fn main() { .help("Output file for generated state."), ), ) + .subcommand( + SubCommand::with_name("skip-slots") + .about("Performs a state transition from some state across some number of skip slots") + .arg( + Arg::with_name("pre-state") + .value_name("BEACON_STATE") + .takes_value(true) + .required(true) + .help("Path to a SSZ file of the pre-state."), + ) + .arg( + Arg::with_name("slots") + .value_name("SLOT_COUNT") + .takes_value(true) + .required(true) + .help("Number of slots to skip before outputting a state.."), + ) + .arg( + Arg::with_name("output") + .value_name("SSZ_FILE") + .takes_value(true) + .required(true) + .default_value("./output.ssz") + .help("Path to output a SSZ file."), + ), + ) .subcommand( SubCommand::with_name("transition-blocks") .about("Performs a state transition given a pre-state and block") @@ -488,6 +515,9 @@ fn run(env_builder: EnvironmentBuilder, matches: &ArgMatches) -> } ("transition-blocks", Some(matches)) => run_transition_blocks::(matches) .map_err(|e| format!("Failed to transition blocks: {}", e)), + ("skip-slots", Some(matches)) => { + skip_slots::run::(matches).map_err(|e| format!("Failed to skip slots: {}", e)) + } ("pretty-hex", Some(matches)) => { run_parse_hex::(matches).map_err(|e| format!("Failed to pretty print hex: {}", e)) } diff --git a/lcli/src/skip_slots.rs b/lcli/src/skip_slots.rs new file mode 100644 index 0000000000..92cd482624 --- /dev/null +++ b/lcli/src/skip_slots.rs @@ -0,0 +1,55 @@ +use crate::transition_blocks::load_from_ssz; +use clap::ArgMatches; +use ssz::Encode; +use state_processing::per_slot_processing; +use std::fs::File; +use std::io::prelude::*; +use std::path::PathBuf; +use types::{BeaconState, EthSpec}; + +pub fn run(matches: &ArgMatches) -> Result<(), String> { + let pre_state_path = matches + .value_of("pre-state") + .ok_or_else(|| "No pre-state file supplied".to_string())? + .parse::() + .map_err(|e| format!("Failed to parse pre-state path: {}", e))?; + + let slots = matches + .value_of("slots") + .ok_or_else(|| "No slots supplied".to_string())? + .parse::() + .map_err(|e| format!("Failed to parse slots: {}", e))?; + + let output_path = matches + .value_of("output") + .ok_or_else(|| "No output file supplied".to_string())? + .parse::() + .map_err(|e| format!("Failed to parse output path: {}", e))?; + + info!("Using minimal spec"); + info!("Pre-state path: {:?}", pre_state_path); + info!("Slots: {:?}", slots); + + let mut state: BeaconState = load_from_ssz(pre_state_path)?; + + let spec = &T::default_spec(); + + state + .build_all_caches(spec) + .map_err(|e| format!("Unable to build caches: {:?}", e))?; + + // Transition the parent state to the block slot. + for i in 0..slots { + per_slot_processing(&mut state, None, spec) + .map_err(|e| format!("Failed to advance slot on iteration {}: {:?}", i, e))?; + } + + let mut output_file = + File::create(output_path).map_err(|e| format!("Unable to create output file: {:?}", e))?; + + output_file + .write_all(&state.as_ssz_bytes()) + .map_err(|e| format!("Unable to write to output file: {:?}", e))?; + + Ok(()) +} diff --git a/lcli/src/transition_blocks.rs b/lcli/src/transition_blocks.rs index 6a13f7ce14..28e4b06572 100644 --- a/lcli/src/transition_blocks.rs +++ b/lcli/src/transition_blocks.rs @@ -76,7 +76,7 @@ fn do_transition( Ok(pre_state) } -fn load_from_ssz(path: PathBuf) -> Result { +pub fn load_from_ssz(path: PathBuf) -> Result { let mut file = File::open(path.clone()).map_err(|e| format!("Unable to open file {:?}: {:?}", path, e))?; let mut bytes = vec![]; From 6c713d1e5f7987ed2894150cb45e3dfc47315f10 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 4 May 2020 08:04:24 +1000 Subject: [PATCH 7/8] Add note about building from testnet5 (#1094) --- book/src/become-a-validator.md | 1 + 1 file changed, 1 insertion(+) diff --git a/book/src/become-a-validator.md b/book/src/become-a-validator.md index 6427b414f4..cf19417617 100644 --- a/book/src/become-a-validator.md +++ b/book/src/become-a-validator.md @@ -21,6 +21,7 @@ There are two, different ways to install and start a Lighthouse validator: 2. [Building from source](./become-a-validator-source.md): this is a little more involved, however it gives a more hands-on experience. + - Note: to connect to the testnet you must build from the `testnet5` branch, `master` will not work. Once you've completed **either one** of these steps, you can move onto the next step. From 353e496bcb6aceafb8b6731a6a4972e779a90f6e Mon Sep 17 00:00:00 2001 From: Justin Date: Mon, 4 May 2020 01:04:28 +0100 Subject: [PATCH 8/8] Delete macros.rs (#1099) The `macros.rs` file under `eth2/utils/ssz/src` is (almost) empty. Can it be deleted? --- eth2/utils/ssz/src/macros.rs | 1 - 1 file changed, 1 deletion(-) delete mode 100644 eth2/utils/ssz/src/macros.rs diff --git a/eth2/utils/ssz/src/macros.rs b/eth2/utils/ssz/src/macros.rs deleted file mode 100644 index 8b13789179..0000000000 --- a/eth2/utils/ssz/src/macros.rs +++ /dev/null @@ -1 +0,0 @@ -