Update RPC to master libp2p

This commit is contained in:
Age Manning
2020-05-04 18:08:48 +10:00
parent a43381e3d5
commit 35838dbfbe
8 changed files with 40 additions and 23 deletions

View File

@@ -5,7 +5,7 @@ use libp2p::core::{identity::Keypair, multiaddr::Protocol};
use tiny_keccak::{Hasher, Keccak}; use tiny_keccak::{Hasher, Keccak};
/// Extend ENR for libp2p types. /// Extend ENR for libp2p types.
pub trait ENRExt { pub trait EnrExt {
/// The libp2p `PeerId` for the record. /// The libp2p `PeerId` for the record.
fn peer_id(&self) -> PeerId; fn peer_id(&self) -> PeerId;
@@ -26,7 +26,7 @@ pub trait CombinedKeyExt {
fn from_libp2p(key: &libp2p::core::identity::Keypair) -> Result<CombinedKey, &'static str>; fn from_libp2p(key: &libp2p::core::identity::Keypair) -> Result<CombinedKey, &'static str>;
} }
impl ENRExt for Enr { impl EnrExt for Enr {
/// The libp2p `PeerId` for the record. /// The libp2p `PeerId` for the record.
fn peer_id(&self) -> PeerId { fn peer_id(&self) -> PeerId {
self.public_key().into_peer_id() self.public_key().into_peer_id()

View File

@@ -4,7 +4,7 @@ pub mod enr_ext;
// Allow external use of the lighthouse ENR builder // Allow external use of the lighthouse ENR builder
pub use enr::{build_enr, CombinedKey, Keypair}; pub use enr::{build_enr, CombinedKey, Keypair};
use enr_ext::{CombinedKeyExt, ENRExt}; use enr_ext::{CombinedKeyExt, EnrExt};
use crate::metrics; use crate::metrics;
use crate::{error, Enr, NetworkConfig, NetworkGlobals}; use crate::{error, Enr, NetworkConfig, NetworkGlobals};

View File

@@ -17,7 +17,7 @@ pub mod types;
pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage}; pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage};
pub use behaviour::BehaviourEvent; pub use behaviour::BehaviourEvent;
pub use config::Config as NetworkConfig; pub use config::Config as NetworkConfig;
pub use discovery::enr_ext::{CombinedKeyExt, ENRExt}; pub use discovery::enr_ext::{CombinedKeyExt, EnrExt};
pub use libp2p::gossipsub::{MessageId, Topic, TopicHash}; pub use libp2p::gossipsub::{MessageId, Topic, TopicHash};
pub use libp2p::{multiaddr, Multiaddr}; pub use libp2p::{multiaddr, Multiaddr};
pub use libp2p::{PeerId, Swarm}; pub use libp2p::{PeerId, Swarm};

View File

@@ -164,7 +164,7 @@ pub enum RPCResponse<T: EthSpec> {
} }
/// Indicates which response is being terminated by a stream termination response. /// Indicates which response is being terminated by a stream termination response.
#[derive(Debug)] #[derive(Debug, Clone)]
pub enum ResponseTermination { pub enum ResponseTermination {
/// Blocks by range stream termination. /// Blocks by range stream termination.
BlocksByRange, BlocksByRange,
@@ -175,7 +175,7 @@ pub enum ResponseTermination {
/// The structured response containing a result/code indicating success or failure /// The structured response containing a result/code indicating success or failure
/// and the contents of the response /// and the contents of the response
#[derive(Debug)] #[derive(Debug, Clone)]
pub enum RPCCodedResponse<T: EthSpec> { pub enum RPCCodedResponse<T: EthSpec> {
/// The response is a successful. /// The response is a successful.
Success(RPCResponse<T>), Success(RPCResponse<T>),
@@ -194,7 +194,7 @@ pub enum RPCCodedResponse<T: EthSpec> {
} }
/// The code assigned to an erroneous `RPCResponse`. /// The code assigned to an erroneous `RPCResponse`.
#[derive(Debug)] #[derive(Debug, Clone)]
pub enum RPCResponseErrorCode { pub enum RPCResponseErrorCode {
InvalidRequest, InvalidRequest,
ServerError, ServerError,
@@ -268,7 +268,7 @@ impl<T: EthSpec> RPCCodedResponse<T> {
} }
} }
#[derive(Encode, Decode, Debug)] #[derive(Encode, Decode, Debug, Clone)]
pub struct ErrorMessage { pub struct ErrorMessage {
/// The UTF-8 encoded Error message string. /// The UTF-8 encoded Error message string.
pub error_message: Vec<u8>, pub error_message: Vec<u8>,

View File

@@ -5,10 +5,10 @@
//! syncing. //! syncing.
use handler::RPCHandler; use handler::RPCHandler;
use libp2p::core::ConnectedPoint; use libp2p::core::{connection::ConnectionId, ConnectedPoint};
use libp2p::swarm::{ use libp2p::swarm::{
protocols_handler::ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters, protocols_handler::ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler,
SubstreamProtocol, PollParameters, SubstreamProtocol,
}; };
use libp2p::{Multiaddr, PeerId}; use libp2p::{Multiaddr, PeerId};
pub use methods::{ pub use methods::{
@@ -20,7 +20,6 @@ use slog::{debug, o};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use types::EthSpec; use types::EthSpec;
pub(crate) mod codec; pub(crate) mod codec;
@@ -29,7 +28,7 @@ pub mod methods;
mod protocol; mod protocol;
/// The return type used in the behaviour and the resultant event from the protocols handler. /// The return type used in the behaviour and the resultant event from the protocols handler.
#[derive(Debug)] #[derive(Debug, Clone)]
pub enum RPCEvent<T: EthSpec> { pub enum RPCEvent<T: EthSpec> {
/// An inbound/outbound request for RPC protocol. The first parameter is a sequential /// An inbound/outbound request for RPC protocol. The first parameter is a sequential
/// id which tracks an awaiting substream for the response. /// id which tracks an awaiting substream for the response.
@@ -80,7 +79,6 @@ impl<TSpec: EthSpec> RPC<TSpec> {
let log = log.new(o!("service" => "libp2p_rpc")); let log = log.new(o!("service" => "libp2p_rpc"));
RPC { RPC {
events: Vec::new(), events: Vec::new(),
marker: PhantomData,
log, log,
} }
} }
@@ -89,8 +87,9 @@ impl<TSpec: EthSpec> RPC<TSpec> {
/// ///
/// The peer must be connected for this to succeed. /// The peer must be connected for this to succeed.
pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent<TSpec>) { pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent<TSpec>) {
self.events.push(NetworkBehaviourAction::SendEvent { self.events.push(NetworkBehaviourAction::NotifyHandler {
peer_id, peer_id,
handler: NotifyHandler::Any,
event: rpc_event, event: rpc_event,
}); });
} }
@@ -118,7 +117,16 @@ where
Vec::new() Vec::new()
} }
fn inject_connected(&mut self, peer_id: PeerId, connected_point: ConnectedPoint) { // Use connection established/closed instead of these currently
fn inject_connected(&mut self, _peer_id: &PeerId) {}
fn inject_disconnected(&mut self, _peer_id: &PeerId) {}
fn inject_connection_established(
&mut self,
peer_id: &PeerId,
_: &ConnectionId,
connected_point: &ConnectedPoint,
) {
// TODO: Remove this on proper peer discovery // TODO: Remove this on proper peer discovery
self.events.push(NetworkBehaviourAction::GenerateEvent( self.events.push(NetworkBehaviourAction::GenerateEvent(
RPCMessage::PeerConnectedHack(peer_id.clone(), connected_point.clone()), RPCMessage::PeerConnectedHack(peer_id.clone(), connected_point.clone()),
@@ -134,13 +142,19 @@ where
debug!(self.log, "Requesting new peer's metadata"; "peer_id" => format!("{}",peer_id)); debug!(self.log, "Requesting new peer's metadata"; "peer_id" => format!("{}",peer_id));
let rpc_event = let rpc_event =
RPCEvent::Request(RequestId::from(0usize), RPCRequest::MetaData(PhantomData)); RPCEvent::Request(RequestId::from(0usize), RPCRequest::MetaData(PhantomData));
self.events.push(NetworkBehaviourAction::SendEvent { self.events.push(NetworkBehaviourAction::NotifyHandler {
peer_id, peer_id: peer_id.clone(),
handler: NotifyHandler::Any,
event: rpc_event, event: rpc_event,
}); });
} }
fn inject_disconnected(&mut self, peer_id: &PeerId, connected_point: ConnectedPoint) { fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
_: &ConnectionId,
connected_point: &ConnectedPoint,
) {
// TODO: Remove this on proper peer discovery // TODO: Remove this on proper peer discovery
self.events.push(NetworkBehaviourAction::GenerateEvent( self.events.push(NetworkBehaviourAction::GenerateEvent(
RPCMessage::PeerDisconnectedHack(peer_id.clone(), connected_point.clone()), RPCMessage::PeerDisconnectedHack(peer_id.clone(), connected_point.clone()),
@@ -155,6 +169,7 @@ where
fn inject_event( fn inject_event(
&mut self, &mut self,
source: PeerId, source: PeerId,
_: ConnectionId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent, event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) { ) {
// send the event to the user // send the event to the user

View File

@@ -361,7 +361,7 @@ where
fn upgrade_outbound(self, socket: TSocket, protocol: Self::Info) -> Self::Future { fn upgrade_outbound(self, socket: TSocket, protocol: Self::Info) -> Self::Future {
// convert to a tokio compatible socket // convert to a tokio compatible socket
let socket = socket.comapt(); let socket = socket.compat();
let codec = match protocol.encoding { let codec = match protocol.encoding {
Encoding::SSZSnappy => { Encoding::SSZSnappy => {
let ssz_snappy_codec = let ssz_snappy_codec =
@@ -381,13 +381,13 @@ where
} }
/// Error in RPC Encoding/Decoding. /// Error in RPC Encoding/Decoding.
#[derive(Debug)] #[derive(Debug, Clone)]
pub enum RPCError { pub enum RPCError {
/// Error when decoding the raw buffer from ssz. /// Error when decoding the raw buffer from ssz.
// NOTE: in the future a ssz::DecodeError should map to an InvalidData error // NOTE: in the future a ssz::DecodeError should map to an InvalidData error
SSZDecodeError(ssz::DecodeError), SSZDecodeError(ssz::DecodeError),
/// IO Error. /// IO Error.
IoError(io::Error), IoError(String),
/// The peer returned a valid response but the response indicated an error. /// The peer returned a valid response but the response indicated an error.
ErrorResponse(RPCResponseErrorCode), ErrorResponse(RPCResponseErrorCode),
/// Timed out waiting for a response. /// Timed out waiting for a response.
@@ -418,7 +418,7 @@ impl From<tokio::time::Elapsed> for RPCError {
impl From<io::Error> for RPCError { impl From<io::Error> for RPCError {
fn from(err: io::Error) -> Self { fn from(err: io::Error) -> Self {
RPCError::IoError(err) RPCError::IoError(err.to_string())
} }
} }

View File

@@ -2,6 +2,7 @@ use crate::behaviour::{Behaviour, BehaviourEvent};
use crate::discovery::enr; use crate::discovery::enr;
use crate::multiaddr::Protocol; use crate::multiaddr::Protocol;
use crate::types::{error, GossipKind}; use crate::types::{error, GossipKind};
use crate::EnrExt;
use crate::{NetworkConfig, NetworkGlobals}; use crate::{NetworkConfig, NetworkGlobals};
use futures::prelude::*; use futures::prelude::*;
use futures::Stream; use futures::Stream;

View File

@@ -2,6 +2,7 @@
use crate::peer_manager::PeerDB; use crate::peer_manager::PeerDB;
use crate::rpc::methods::MetaData; use crate::rpc::methods::MetaData;
use crate::types::SyncState; use crate::types::SyncState;
use crate::EnrExt;
use crate::{discovery::enr::Eth2Enr, Enr, GossipTopic, Multiaddr, PeerId}; use crate::{discovery::enr::Eth2Enr, Enr, GossipTopic, Multiaddr, PeerId};
use parking_lot::RwLock; use parking_lot::RwLock;
use std::collections::HashSet; use std::collections::HashSet;