Partial eth2-libp2p stable future upgrade

This commit is contained in:
Age Manning
2020-04-30 23:09:48 +10:00
parent 522d0e1201
commit 08838fca23
11 changed files with 245 additions and 151 deletions

View File

@@ -3,22 +3,21 @@ use crate::peer_manager::{PeerManager, PeerManagerEvent};
use crate::rpc::*;
use crate::types::{GossipEncoding, GossipKind, GossipTopic};
use crate::{error, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
use discv5::Discv5Event;
use futures::prelude::*;
use libp2p::{
core::{identity::Keypair, ConnectedPoint},
discv5::Discv5Event,
gossipsub::{Gossipsub, GossipsubEvent, MessageId},
identify::{Identify, IdentifyEvent},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
NetworkBehaviour, PeerId,
};
use tokio::io::{AsyncRead, AsyncWrite};
use lru::LruCache;
use slog::{crit, debug, o, warn};
use std::marker::PhantomData;
use std::sync::Arc;
use types::{EnrForkId, EthSpec, SubnetId};
use std::task::Poll;
use types::{EnrForkId, EthSpec, SubnetId};
const MAX_IDENTIFY_ADDRESSES: usize = 10;
@@ -27,11 +26,11 @@ const MAX_IDENTIFY_ADDRESSES: usize = 10;
/// behaviours.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourEvent<TSpec>", poll_method = "poll")]
pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> {
pub struct Behaviour<TSpec: EthSpec> {
/// The routing pub-sub mechanism for eth2.
gossipsub: Gossipsub,
/// The Eth2 RPC specified in the wire-0 protocol.
eth2_rpc: RPC<TSubstream, TSpec>,
eth2_rpc: RPC<TSpec>,
/// Keep regular connection to peers and disconnect if absent.
// TODO: Using id for initial interop. This will be removed by mainnet.
/// Provides IP addresses and peer information.
@@ -66,7 +65,7 @@ pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> {
}
/// Implements the combined behaviour for the libp2p service.
impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, TSpec> {
impl<TSpec: EthSpec> Behaviour<TSpec> {
pub fn new(
local_key: &Keypair,
net_conf: &NetworkConfig,
@@ -330,9 +329,7 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, T
}
// Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour
impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec>
NetworkBehaviourEventProcess<GossipsubEvent> for Behaviour<TSubstream, TSpec>
{
impl<TSpec: EthSpec> NetworkBehaviourEventProcess<GossipsubEvent> for Behaviour<TSpec> {
fn inject_event(&mut self, event: GossipsubEvent) {
match event {
GossipsubEvent::Message(propagation_source, id, gs_msg) => {
@@ -373,9 +370,7 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec>
}
}
impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec>
NetworkBehaviourEventProcess<RPCMessage<TSpec>> for Behaviour<TSubstream, TSpec>
{
impl<TSpec: EthSpec> NetworkBehaviourEventProcess<RPCMessage<TSpec>> for Behaviour<TSpec> {
fn inject_event(&mut self, event: RPCMessage<TSpec>) {
match event {
// TODO: These are temporary methods to give access to injected behaviour
@@ -462,7 +457,7 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec>
}
}
impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, TSpec> {
impl<TSpec: EthSpec> Behaviour<TSpec> {
/// Consumes the events list when polled.
fn poll<TBehaviourIn>(
&mut self,
@@ -508,9 +503,7 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, T
}
}
impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> NetworkBehaviourEventProcess<IdentifyEvent>
for Behaviour<TSubstream, TSpec>
{
impl<TSpec: EthSpec> NetworkBehaviourEventProcess<IdentifyEvent> for Behaviour<TSpec> {
fn inject_event(&mut self, event: IdentifyEvent) {
match event {
IdentifyEvent::Received {
@@ -542,9 +535,7 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> NetworkBehaviourEventPr
}
}
impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> NetworkBehaviourEventProcess<Discv5Event>
for Behaviour<TSubstream, TSpec>
{
impl<TSpec: EthSpec> NetworkBehaviourEventProcess<Discv5Event> for Behaviour<TSpec> {
fn inject_event(&mut self, _event: Discv5Event) {
// discv5 has no events to inject
}

View File

@@ -1,6 +1,6 @@
use crate::types::GossipKind;
use crate::Enr;
use libp2p::discv5::{Discv5Config, Discv5ConfigBuilder};
use discv5::{Discv5Config, Discv5ConfigBuilder};
use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage, MessageId};
use libp2p::Multiaddr;
use serde_derive::{Deserialize, Serialize};

View File

@@ -1,11 +1,11 @@
//! Helper functions and an extension trait for Ethereum 2 ENRs.
pub use libp2p::{core::identity::Keypair, discv5::enr::CombinedKey};
pub use discv5::enr::{CombinedKey, EnrBuilder};
pub use libp2p::core::identity::Keypair;
use super::ENR_FILENAME;
use crate::types::{Enr, EnrBitfield};
use crate::NetworkConfig;
use libp2p::discv5::enr::EnrBuilder;
use slog::{debug, warn};
use ssz::{Decode, Encode};
use ssz_types::BitVector;

View File

@@ -6,14 +6,14 @@ pub use enr::{build_enr, CombinedKey, Keypair};
use crate::metrics;
use crate::{error, Enr, NetworkConfig, NetworkGlobals};
use discv5::{enr::NodeId, Discv5, Discv5Event};
use enr::{Eth2Enr, BITFIELD_ENR_KEY, ETH2_ENR_KEY};
use futures::prelude::*;
use libp2p::core::{Multiaddr, PeerId};
use libp2p::discv5::enr::NodeId;
use libp2p::discv5::{Discv5, Discv5Event};
use libp2p::core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId};
use libp2p::multiaddr::Protocol;
use libp2p::swarm::{
DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler,
protocols_handler::DummyProtocolsHandler, DialPeerCondition, NetworkBehaviour,
NetworkBehaviourAction, PollParameters, ProtocolsHandler,
};
use slog::{crit, debug, info, warn};
use ssz::{Decode, Encode};
@@ -74,7 +74,7 @@ pub struct Discovery<TSpec: EthSpec> {
}
impl<TSpec: EthSpec> Discovery<TSpec> {
pub async fn new(
pub fn new(
local_key: &Keypair,
config: &NetworkConfig,
network_globals: Arc<NetworkGlobals<TSpec>>,
@@ -93,13 +93,17 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
let listen_socket = SocketAddr::new(config.listen_address, config.discovery_port);
// convert the keypair into an ENR key
let enr_key: CombinedKey = local_key
.try_into()
.map_err(|_| "Invalid key type for ENR records")?;
let mut discovery = Discv5::new(
local_enr,
local_key.clone(),
enr_key,
config.discv5_config.clone(),
listen_socket,
)
.await
.map_err(|e| format!("Discv5 service failed. Error: {:?}", e))?;
// Add bootnodes to routing table
@@ -345,23 +349,66 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
}
// Redirect all behaviour events to underlying discovery behaviour.
// Build a dummy Network behaviour around the discv5 server
impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
type ProtocolsHandler = <Discv5 as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = <Discv5 as NetworkBehaviour>::OutEvent;
type ProtocolsHandler = DummyProtocolsHandler;
type OutEvent = Discv5Event;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
NetworkBehaviour::new_handler(&mut self.discovery)
DummyProtocolsHandler::default()
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
// Let discovery track possible known peers.
self.discovery.addresses_of_peer(peer_id)
// TODO
// Addresses are ordered by decreasing likelyhood of connectivity, so start with
// the addresses of that peer in the k-buckets.
/*
if let Some(node_id) = self.known_peer_ids.get(peer_id) {
let key = kbucket::Key::from(node_id.clone());
let mut out_list =
if let kbucket::Entry::Present(mut entry, _) = self.kbuckets.entry(&key) {
entry.value().multiaddr().to_vec()
} else {
Vec::new()
};
// ENR's may have multiple Multiaddrs. The multi-addr associated with the UDP
// port is removed, which is assumed to be associated with the discv5 protocol (and
// therefore irrelevant for other libp2p components).
out_list.retain(|addr| {
addr.iter()
.find(|v| match v {
Protocol::Udp(_) => true,
_ => false,
})
.is_none()
});
out_list
} else {
// PeerId is not known
Vec::new()
}
*/
Vec::new()
}
fn inject_connected(&mut self, _peer_id: &PeerId) {}
// ignore libp2p connections/streams
fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {}
fn inject_disconnected(&mut self, _peer_id: &PeerId) {}
// ignore libp2p connections/streams
fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {}
// no libp2p discv5 events - event originate from the session_service.
fn inject_event(
&mut self,
_: PeerId,
_: ConnectionId,
_event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
void::unreachable(_event)
}
fn poll(
&mut self,

View File

@@ -75,14 +75,19 @@ where
/* Base Inbound Codec */
// This Encodes RPC Responses sent to external peers
impl<TCodec, TSpec> Encoder<RPCErrorResponse<TSpec>> for BaseInboundCodec<TCodec, TSpec, RPCErrorResponse<TSpec>>
impl<TCodec, TSpec> Encoder<RPCErrorResponse<TSpec>>
for BaseInboundCodec<TCodec, TSpec, RPCErrorResponse<TSpec>>
where
TSpec: EthSpec,
TCodec: Decoder + Encoder<RPCErrorResponse<TSpec>>,
{
type Error = <TCodec as Encoder<RPCErrorResponse<TSpec>>>::Error;
fn encode(&mut self, item: RPCErrorResponse<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(
&mut self,
item: RPCErrorResponse<TSpec>,
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
dst.clear();
dst.reserve(1);
dst.put_u8(
@@ -112,7 +117,8 @@ where
/* Base Outbound Codec */
// This Encodes RPC Requests sent to external peers
impl<TCodec, TSpec> Encoder<RPCRequest<TSpec>> for BaseOutboundCodec<TCodec, TSpec, RPCRequest<TSpec>>
impl<TCodec, TSpec> Encoder<RPCRequest<TSpec>>
for BaseOutboundCodec<TCodec, TSpec, RPCRequest<TSpec>>
where
TSpec: EthSpec,
TCodec: OutboundCodec<RPCRequest<TSpec>> + Encoder<RPCRequest<TSpec>>,
@@ -128,7 +134,8 @@ where
impl<TCodec, TSpec> Decoder for BaseOutboundCodec<TCodec, TSpec, RPCRequest<TSpec>>
where
TSpec: EthSpec,
TCodec: OutboundCodec<RPCRequest<TSpec>, ErrorType = ErrorMessage> + Decoder<Item = RPCResponse<TSpec>>,
TCodec: OutboundCodec<RPCRequest<TSpec>, ErrorType = ErrorMessage>
+ Decoder<Item = RPCResponse<TSpec>>,
{
type Item = RPCErrorResponse<TSpec>;
type Error = <TCodec as Decoder>::Error;

View File

@@ -5,19 +5,21 @@ use super::methods::{ErrorMessage, RPCErrorResponse, RequestId, ResponseTerminat
use super::protocol::{RPCError, RPCProtocol, RPCRequest};
use super::RPCEvent;
use crate::rpc::protocol::{InboundFramed, OutboundFramed};
use core::marker::PhantomData;
use fnv::FnvHashMap;
use futures::prelude::*;
use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError};
use libp2p::swarm::protocols_handler::{
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
};
use libp2p::swarm::NegotiatedSubstream;
use slog::{crit, debug, error, trace, warn};
use smallvec::SmallVec;
use std::collections::hash_map::Entry;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tokio::io::{AsyncRead, AsyncWrite};
use std::{
collections::hash_map::Entry,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};
use tokio::time::{delay_queue, DelayQueue};
use types::EthSpec;
@@ -38,9 +40,8 @@ type InboundRequestId = RequestId;
type OutboundRequestId = RequestId;
/// Implementation of `ProtocolsHandler` for the RPC protocol.
pub struct RPCHandler<'a, TSubstream, TSpec>
pub struct RPCHandler<TSpec>
where
TSubstream: AsyncRead + AsyncWrite,
TSpec: EthSpec,
{
/// The upgrade for inbound substreams.
@@ -59,23 +60,16 @@ where
dial_negotiated: u32,
/// Current inbound substreams awaiting processing.
inbound_substreams: FnvHashMap<
InboundRequestId,
(
InboundSubstreamState<'a, TSubstream, TSpec>,
Option<delay_queue::Key>,
),
>,
inbound_substreams:
FnvHashMap<InboundRequestId, (InboundSubstreamState<TSpec>, Option<delay_queue::Key>)>,
/// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout.
inbound_substreams_delay: DelayQueue<InboundRequestId>,
/// Map of outbound substreams that need to be driven to completion. The `RequestId` is
/// maintained by the application sending the request.
outbound_substreams: FnvHashMap<
OutboundRequestId,
(OutboundSubstreamState<TSubstream, TSpec>, delay_queue::Key),
>,
outbound_substreams:
FnvHashMap<OutboundRequestId, (OutboundSubstreamState<TSpec>, delay_queue::Key)>,
/// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout.
outbound_substreams_delay: DelayQueue<OutboundRequestId>,
@@ -103,16 +97,14 @@ where
log: slog::Logger,
}
/// State of an outbound substream. Either waiting for a response, or in the process of sending.
pub enum InboundSubstreamState<'a, TSubstream, TSpec>
pub enum InboundSubstreamState<TSpec>
where
TSubstream: AsyncRead + AsyncWrite + Unpin,
TSpec: EthSpec,
{
/// A response has been sent, pending writing and flush.
ResponsePendingSend {
/// The substream used to send the response
substream: futures::sink::Send<'a, InboundFramed<TSubstream, TSpec>, RPCErrorResponse<TSpec>>,
substream: InboundFramed<NegotiatedSubstream, TSpec>,
/// Whether a stream termination is requested. If true the stream will be closed after
/// this send. Otherwise it will transition to an idle state until a stream termination is
/// requested or a timeout is reached.
@@ -120,29 +112,30 @@ where
},
/// The response stream is idle and awaiting input from the application to send more chunked
/// responses.
ResponseIdle(InboundFramed<TSubstream, TSpec>),
ResponseIdle(InboundFramed<NegotiatedSubstream, TSpec>),
/// The substream is attempting to shutdown.
Closing(InboundFramed<TSubstream, TSpec>),
Closing(InboundFramed<NegotiatedSubstream, TSpec>),
/// Temporary state during processing
Poisoned,
}
pub enum OutboundSubstreamState<TSubstream, TSpec: EthSpec> {
/// State of an outbound substream. Either waiting for a response, or in the process of sending.
pub enum OutboundSubstreamState<TSpec: EthSpec> {
/// A request has been sent, and we are awaiting a response. This future is driven in the
/// handler because GOODBYE requests can be handled and responses dropped instantly.
RequestPendingResponse {
/// The framed negotiated substream.
substream: OutboundFramed<TSubstream, TSpec>,
substream: OutboundFramed<NegotiatedSubstream, TSpec>,
/// Keeps track of the actual request sent.
request: RPCRequest<TSpec>,
},
/// Closing an outbound substream>
Closing(OutboundFramed<TSubstream, TSpec>),
Closing(OutboundFramed<NegotiatedSubstream, TSpec>),
/// Temporary state during processing
Poisoned,
}
impl<'a, TSubstream, TSpec> InboundSubstreamState<'a, TSpec, TSubstream>
impl<TSpec> InboundSubstreamState<TSpec>
where
TSpec: EthSpec,
{
@@ -188,9 +181,8 @@ where
}
}
impl<'a, TSubstream, TSpec> RPCHandler<'a, TSubstream, TSpec>
impl<TSpec> RPCHandler<TSpec>
where
TSubstream: AsyncRead + AsyncWrite,
TSpec: EthSpec,
{
pub fn new(
@@ -247,9 +239,8 @@ where
}
}
impl<'a, TSubstream, TSpec> ProtocolsHandler for RPCHandler<'a, TSubstream, TSpec>
impl<TSpec> ProtocolsHandler for RPCHandler<TSpec>
where
TSubstream: AsyncRead + AsyncWrite + Unpin,
TSpec: EthSpec,
{
type InEvent = RPCEvent<TSpec>;
@@ -265,14 +256,14 @@ where
fn inject_fully_negotiated_inbound(
&mut self,
out: <RPCProtocol<TSpec> as InboundUpgrade<TSubstream>>::Output,
substream: <RPCProtocol<TSpec> as InboundUpgrade<NegotiatedSubstream>>::Output,
) {
// update the keep alive timeout if there are no more remaining outbound streams
if let KeepAlive::Until(_) = self.keep_alive {
self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout);
}
let (req, substream) = out;
let (req, substream) = substream;
// drop the stream and return a 0 id for goodbye "requests"
if let r @ RPCRequest::Goodbye(_) = req {
self.events_out.push(RPCEvent::Request(0, r));
@@ -297,7 +288,7 @@ where
fn inject_fully_negotiated_outbound(
&mut self,
out: <RPCRequest<TSpec> as OutboundUpgrade<TSubstream>>::Output,
out: <RPCRequest<TSpec> as OutboundUpgrade<NegotiatedSubstream>>::Output,
rpc_event: Self::OutboundOpenInfo,
) {
self.dial_negotiated -= 1;
@@ -452,6 +443,7 @@ where
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ProtocolsHandlerEvent<
Self::OutboundProtocol,
@@ -533,21 +525,18 @@ where
}
// purge expired outbound substreams
if let Poll::Ready(Some(d)) =
self.outbound_substreams_delay.poll() {
let stream_id = d.map_err(|e| {
if let Poll::Ready(Some(d)) = self.outbound_substreams_delay.poll() {
let stream_id = d.map_err(|e| {
warn!(self.log, "Outbound substream poll failed"; "error" => format!("{:?}", e));
ProtocolsHandlerUpgrErr::Timer
})?;
self.outbound_substreams.remove(stream_id.get_ref());
// notify the user
return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(
*stream_id.get_ref(),
RPCError::Custom("Stream timed out".into()),
),
)));
return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(RPCEvent::Error(
*stream_id.get_ref(),
RPCError::Custom("Stream timed out".into()),
))));
}
// drive inbound streams that need to be processed
@@ -614,7 +603,7 @@ where
InboundSubstreamState::Closing(mut substream) => {
// TODO: check if this is supposed to be a stream
match substream.close() {
Poll::Ready(_) => {
Poll::Ready(_) => {
//trace!(self.log, "Inbound stream dropped");
if let Some(delay_key) = &entry.get().1 {
self.inbound_substreams_delay.remove(delay_key);
@@ -729,7 +718,7 @@ where
},
OutboundSubstreamState::Closing(mut substream) => match substream.close() {
// TODO: check if this is supposed to be a stream
Poll::Ready(_)=> {
Poll::Ready(_) => {
//trace!(self.log, "Outbound stream dropped");
// drop the stream
let delay_key = &entry.get().1;
@@ -763,12 +752,10 @@ where
let rpc_event = self.dial_queue.remove(0);
self.dial_queue.shrink_to_fit();
if let RPCEvent::Request(id, req) = rpc_event {
return Poll::Ready(Ok(
ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(req.clone()),
info: RPCEvent::Request(id, req),
},
));
return Poll::Ready(Ok(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(req.clone()),
info: RPCEvent::Request(id, req),
}));
}
}
Poll::Pending
@@ -776,11 +763,11 @@ where
}
// Check for new items to send to the peer and update the underlying stream
fn apply_queued_responses<'a, TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec>(
raw_substream: InboundFramed<TSubstream, TSpec>,
fn apply_queued_responses<TSpec: EthSpec>(
raw_substream: InboundFramed<NegotiatedSubstream, TSpec>,
queued_outbound_items: &mut Option<&mut Vec<RPCErrorResponse<TSpec>>>,
new_items_to_send: &mut bool,
) -> InboundSubstreamState<'a, TSubstream, TSpec> {
) -> InboundSubstreamState<TSpec> {
match queued_outbound_items {
Some(ref mut queue) if !queue.is_empty() => {
*new_items_to_send = true;

View File

@@ -18,10 +18,10 @@ pub use methods::{
pub use protocol::{RPCError, RPCProtocol, RPCRequest};
use slog::{debug, o};
use std::marker::PhantomData;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use types::EthSpec;
use std::task::{Poll, Context};
pub(crate) mod codec;
mod handler;
@@ -64,16 +64,14 @@ impl<T: EthSpec> std::fmt::Display for RPCEvent<T> {
/// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level
/// logic.
pub struct RPC<TSubstream, TSpec: EthSpec> {
pub struct RPC<TSpec: EthSpec> {
/// Queue of events to processed.
events: Vec<NetworkBehaviourAction<RPCEvent<TSpec>, RPCMessage<TSpec>>>,
/// Pins the generic substream.
marker: PhantomData<TSubstream>,
/// Slog logger for RPC behaviour.
log: slog::Logger,
}
impl<TSubstream, TSpec: EthSpec> RPC<TSubstream, TSpec> {
impl<TSpec: EthSpec> RPC<TSpec> {
pub fn new(log: slog::Logger) -> Self {
let log = log.new(o!("service" => "libp2p_rpc"));
RPC {
@@ -94,12 +92,11 @@ impl<TSubstream, TSpec: EthSpec> RPC<TSubstream, TSpec> {
}
}
impl<'a, TSubstream, TSpec> NetworkBehaviour for RPC<TSubstream, TSpec>
impl<TSpec> NetworkBehaviour for RPC<TSpec>
where
TSubstream: AsyncRead + AsyncWrite,
TSpec: EthSpec,
{
type ProtocolsHandler = RPCHandler<'a, TSubstream, TSpec>;
type ProtocolsHandler = RPCHandler<TSpec>;
type OutEvent = RPCMessage<TSpec>;
fn new_handler(&mut self) -> Self::ProtocolsHandler {

View File

@@ -11,17 +11,16 @@ use crate::rpc::{
methods::ResponseTermination,
};
use futures::future::*;
use futures::{future, sink, stream};
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo};
use std::io;
use std::marker::PhantomData;
use std::pin::Pin;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::time::Timeout;
use tokio_io_timeout::TimeoutStream;
use tokio_util::codec::Framed;
use types::EthSpec;
use std::pin::Pin;
/// The maximum bytes that can be sent across the RPC.
const MAX_RPC_SIZE: usize = 1_048_576; // 1M
@@ -171,7 +170,7 @@ impl ProtocolName for ProtocolId {
pub type InboundOutput<TSocket, TSpec> = (RPCRequest<TSpec>, InboundFramed<TSocket, TSpec>);
pub type InboundFramed<TSocket, TSpec> =
Framed<Timeout<upgrade::Negotiated<TSocket>>, InboundCodec<TSpec, RPCErrorResponse<TSpec>>>;
Framed<Timeout<TSocket>, InboundCodec<TSpec, RPCErrorResponse<TSpec>>>;
type FnAndThen<TSocket, TSpec> = fn(
(Option<RPCRequest<TSpec>>, InboundFramed<TSocket, TSpec>),
) -> Ready<Result<InboundOutput<TSocket, TSpec>, RPCError>>;
@@ -181,26 +180,14 @@ type FnMapErr = fn(tokio::time::Error) -> RPCError;
impl<TSocket, TSpec> InboundUpgrade<TSocket> for RPCProtocol<TSpec>
where
TSocket: AsyncRead + AsyncWrite,
TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static,
TSpec: EthSpec,
{
type Output = InboundOutput<TSocket, TSpec>;
type Error = RPCError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
type Future = future::Either<
Ready<Result<InboundOutput<TSocket, TSpec>, RPCError>>,
future::AndThen<
future::MapErr<Timeout<stream::StreamFuture<InboundFramed<TSocket, TSpec>>>, FnMapErr>,
Ready<Result<InboundOutput<TSocket, TSpec>, RPCError>>,
FnAndThen<TSocket, TSpec>,
>,
>;
fn upgrade_inbound(
self,
socket: upgrade::Negotiated<TSocket>,
protocol: ProtocolId,
) -> Self::Future {
fn upgrade_inbound(self, socket: TSocket, protocol: ProtocolId) -> Self::Future {
let protocol_name = protocol.message_name.clone();
let codec = match protocol.encoding {
Encoding::SSZSnappy => {
@@ -348,24 +335,18 @@ impl<TSpec: EthSpec> RPCRequest<TSpec> {
/* Outbound upgrades */
pub type OutboundFramed<TSocket, TSpec> =
Framed<upgrade::Negotiated<TSocket>, OutboundCodec<TSpec, RPCRequest<TSpec>>>;
pub type OutboundFramed<TSocket, TSpec> = Framed<TSocket, OutboundCodec<TSpec, RPCRequest<TSpec>>>;
impl<TSocket, TSpec> OutboundUpgrade<TSocket> for RPCRequest<TSpec>
where
TSpec: EthSpec,
TSocket: AsyncRead + AsyncWrite + Send,
TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = OutboundFramed<TSocket, TSpec>;
type Error = RPCError;
// TODO: Send takes a mutable reference to the sink now, hence the lifetime parameter
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
// type Future = sink::Send<'a, &'a mut OutboundFramed<TSocket, TSpec>, RPCRequest<TSpec>>;
fn upgrade_outbound(
self,
socket: upgrade::Negotiated<TSocket>,
protocol: Self::Info,
) -> Self::Future {
fn upgrade_outbound(self, socket: TSocket, protocol: Self::Info) -> Self::Future {
let codec = match protocol.encoding {
Encoding::SSZSnappy => {
let ssz_snappy_codec =

View File

@@ -6,10 +6,10 @@ use crate::{NetworkConfig, NetworkGlobals};
use futures::prelude::*;
use futures::Stream;
use libp2p::core::{
connection::Substream,
identity::Keypair,
multiaddr::Multiaddr,
muxing::StreamMuxerBox,
connection::Substream,
transport::boxed::Boxed,
upgrade::{InboundUpgradeExt, OutboundUpgradeExt},
ConnectedPoint,
@@ -19,15 +19,12 @@ use slog::{crit, debug, error, info, trace, warn};
use std::fs::File;
use std::io::prelude::*;
use std::io::{Error, ErrorKind};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::DelayQueue;
use types::{EnrForkId, EthSpec};
use std::pin::Pin;
use std::task::{Poll, Context};
type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>;
type Libp2pBehaviour<TSpec> = Behaviour<Substream<StreamMuxerBox>, TSpec>;
pub const NETWORK_KEY_FILENAME: &str = "key";
/// The time in milliseconds to wait before banning a peer. This allows for any Goodbye messages to be
@@ -38,7 +35,7 @@ const BAN_PEER_WAIT_TIMEOUT: u64 = 200;
pub struct Service<TSpec: EthSpec> {
/// The libp2p Swarm handler.
//TODO: Make this private
pub swarm: Swarm<Libp2pStream, Libp2pBehaviour<TSpec>>,
pub swarm: Swarm<Behaviour<TSpec>>,
/// This node's PeerId.
pub local_peer_id: PeerId,

View File

@@ -9,7 +9,7 @@ use types::{BitVector, EthSpec};
#[allow(type_alias_bounds)]
pub type EnrBitfield<T: EthSpec> = BitVector<T::SubnetBitfieldLength>;
pub type Enr = libp2p::discv5::enr::Enr<libp2p::discv5::enr::CombinedKey>;
pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;
pub use globals::NetworkGlobals;
pub use pubsub::PubsubMessage;