From 35838dbfbea2e9ab83936ede32853fc25d904fcb Mon Sep 17 00:00:00 2001 From: Age Manning Date: Mon, 4 May 2020 18:08:48 +1000 Subject: [PATCH] Update RPC to master libp2p --- .../eth2-libp2p/src/discovery/enr_ext.rs | 4 +- beacon_node/eth2-libp2p/src/discovery/mod.rs | 2 +- beacon_node/eth2-libp2p/src/lib.rs | 2 +- beacon_node/eth2-libp2p/src/rpc/methods.rs | 8 ++-- beacon_node/eth2-libp2p/src/rpc/mod.rs | 37 +++++++++++++------ beacon_node/eth2-libp2p/src/rpc/protocol.rs | 8 ++-- beacon_node/eth2-libp2p/src/service.rs | 1 + beacon_node/eth2-libp2p/src/types/globals.rs | 1 + 8 files changed, 40 insertions(+), 23 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/discovery/enr_ext.rs b/beacon_node/eth2-libp2p/src/discovery/enr_ext.rs index c32008b624..514fb3a8e2 100644 --- a/beacon_node/eth2-libp2p/src/discovery/enr_ext.rs +++ b/beacon_node/eth2-libp2p/src/discovery/enr_ext.rs @@ -5,7 +5,7 @@ use libp2p::core::{identity::Keypair, multiaddr::Protocol}; use tiny_keccak::{Hasher, Keccak}; /// Extend ENR for libp2p types. -pub trait ENRExt { +pub trait EnrExt { /// The libp2p `PeerId` for the record. fn peer_id(&self) -> PeerId; @@ -26,7 +26,7 @@ pub trait CombinedKeyExt { fn from_libp2p(key: &libp2p::core::identity::Keypair) -> Result; } -impl ENRExt for Enr { +impl EnrExt for Enr { /// The libp2p `PeerId` for the record. fn peer_id(&self) -> PeerId { self.public_key().into_peer_id() diff --git a/beacon_node/eth2-libp2p/src/discovery/mod.rs b/beacon_node/eth2-libp2p/src/discovery/mod.rs index 3d89f2877b..f46338600e 100644 --- a/beacon_node/eth2-libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2-libp2p/src/discovery/mod.rs @@ -4,7 +4,7 @@ pub mod enr_ext; // Allow external use of the lighthouse ENR builder pub use enr::{build_enr, CombinedKey, Keypair}; -use enr_ext::{CombinedKeyExt, ENRExt}; +use enr_ext::{CombinedKeyExt, EnrExt}; use crate::metrics; use crate::{error, Enr, NetworkConfig, NetworkGlobals}; diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index 7346866b11..839986e5a0 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -17,7 +17,7 @@ pub mod types; pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage}; pub use behaviour::BehaviourEvent; pub use config::Config as NetworkConfig; -pub use discovery::enr_ext::{CombinedKeyExt, ENRExt}; +pub use discovery::enr_ext::{CombinedKeyExt, EnrExt}; pub use libp2p::gossipsub::{MessageId, Topic, TopicHash}; pub use libp2p::{multiaddr, Multiaddr}; pub use libp2p::{PeerId, Swarm}; diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index e4b5b67144..cb601c14bc 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -164,7 +164,7 @@ pub enum RPCResponse { } /// Indicates which response is being terminated by a stream termination response. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum ResponseTermination { /// Blocks by range stream termination. BlocksByRange, @@ -175,7 +175,7 @@ pub enum ResponseTermination { /// The structured response containing a result/code indicating success or failure /// and the contents of the response -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum RPCCodedResponse { /// The response is a successful. Success(RPCResponse), @@ -194,7 +194,7 @@ pub enum RPCCodedResponse { } /// The code assigned to an erroneous `RPCResponse`. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum RPCResponseErrorCode { InvalidRequest, ServerError, @@ -268,7 +268,7 @@ impl RPCCodedResponse { } } -#[derive(Encode, Decode, Debug)] +#[derive(Encode, Decode, Debug, Clone)] pub struct ErrorMessage { /// The UTF-8 encoded Error message string. pub error_message: Vec, diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 269e70054d..e0c4c28151 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -5,10 +5,10 @@ //! syncing. use handler::RPCHandler; -use libp2p::core::ConnectedPoint; +use libp2p::core::{connection::ConnectionId, ConnectedPoint}; use libp2p::swarm::{ - protocols_handler::ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters, - SubstreamProtocol, + protocols_handler::ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, + PollParameters, SubstreamProtocol, }; use libp2p::{Multiaddr, PeerId}; pub use methods::{ @@ -20,7 +20,6 @@ use slog::{debug, o}; use std::marker::PhantomData; use std::task::{Context, Poll}; use std::time::Duration; -use tokio::io::{AsyncRead, AsyncWrite}; use types::EthSpec; pub(crate) mod codec; @@ -29,7 +28,7 @@ pub mod methods; mod protocol; /// The return type used in the behaviour and the resultant event from the protocols handler. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum RPCEvent { /// An inbound/outbound request for RPC protocol. The first parameter is a sequential /// id which tracks an awaiting substream for the response. @@ -80,7 +79,6 @@ impl RPC { let log = log.new(o!("service" => "libp2p_rpc")); RPC { events: Vec::new(), - marker: PhantomData, log, } } @@ -89,8 +87,9 @@ impl RPC { /// /// The peer must be connected for this to succeed. pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { - self.events.push(NetworkBehaviourAction::SendEvent { + self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id, + handler: NotifyHandler::Any, event: rpc_event, }); } @@ -118,7 +117,16 @@ where Vec::new() } - fn inject_connected(&mut self, peer_id: PeerId, connected_point: ConnectedPoint) { + // Use connection established/closed instead of these currently + fn inject_connected(&mut self, _peer_id: &PeerId) {} + fn inject_disconnected(&mut self, _peer_id: &PeerId) {} + + fn inject_connection_established( + &mut self, + peer_id: &PeerId, + _: &ConnectionId, + connected_point: &ConnectedPoint, + ) { // TODO: Remove this on proper peer discovery self.events.push(NetworkBehaviourAction::GenerateEvent( RPCMessage::PeerConnectedHack(peer_id.clone(), connected_point.clone()), @@ -134,13 +142,19 @@ where debug!(self.log, "Requesting new peer's metadata"; "peer_id" => format!("{}",peer_id)); let rpc_event = RPCEvent::Request(RequestId::from(0usize), RPCRequest::MetaData(PhantomData)); - self.events.push(NetworkBehaviourAction::SendEvent { - peer_id, + self.events.push(NetworkBehaviourAction::NotifyHandler { + peer_id: peer_id.clone(), + handler: NotifyHandler::Any, event: rpc_event, }); } - fn inject_disconnected(&mut self, peer_id: &PeerId, connected_point: ConnectedPoint) { + fn inject_connection_closed( + &mut self, + peer_id: &PeerId, + _: &ConnectionId, + connected_point: &ConnectedPoint, + ) { // TODO: Remove this on proper peer discovery self.events.push(NetworkBehaviourAction::GenerateEvent( RPCMessage::PeerDisconnectedHack(peer_id.clone(), connected_point.clone()), @@ -155,6 +169,7 @@ where fn inject_event( &mut self, source: PeerId, + _: ConnectionId, event: ::OutEvent, ) { // send the event to the user diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index b74af76ec5..a89c03c4e4 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -361,7 +361,7 @@ where fn upgrade_outbound(self, socket: TSocket, protocol: Self::Info) -> Self::Future { // convert to a tokio compatible socket - let socket = socket.comapt(); + let socket = socket.compat(); let codec = match protocol.encoding { Encoding::SSZSnappy => { let ssz_snappy_codec = @@ -381,13 +381,13 @@ where } /// Error in RPC Encoding/Decoding. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum RPCError { /// Error when decoding the raw buffer from ssz. // NOTE: in the future a ssz::DecodeError should map to an InvalidData error SSZDecodeError(ssz::DecodeError), /// IO Error. - IoError(io::Error), + IoError(String), /// The peer returned a valid response but the response indicated an error. ErrorResponse(RPCResponseErrorCode), /// Timed out waiting for a response. @@ -418,7 +418,7 @@ impl From for RPCError { impl From for RPCError { fn from(err: io::Error) -> Self { - RPCError::IoError(err) + RPCError::IoError(err.to_string()) } } diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 435d525664..476f16fd1f 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -2,6 +2,7 @@ use crate::behaviour::{Behaviour, BehaviourEvent}; use crate::discovery::enr; use crate::multiaddr::Protocol; use crate::types::{error, GossipKind}; +use crate::EnrExt; use crate::{NetworkConfig, NetworkGlobals}; use futures::prelude::*; use futures::Stream; diff --git a/beacon_node/eth2-libp2p/src/types/globals.rs b/beacon_node/eth2-libp2p/src/types/globals.rs index 3912bf86c3..60ae12a8c5 100644 --- a/beacon_node/eth2-libp2p/src/types/globals.rs +++ b/beacon_node/eth2-libp2p/src/types/globals.rs @@ -2,6 +2,7 @@ use crate::peer_manager::PeerDB; use crate::rpc::methods::MetaData; use crate::types::SyncState; +use crate::EnrExt; use crate::{discovery::enr::Eth2Enr, Enr, GossipTopic, Multiaddr, PeerId}; use parking_lot::RwLock; use std::collections::HashSet;