Non rpc stuff compiles

This commit is contained in:
pawanjay176
2020-04-28 20:30:29 +05:30
parent 47124ca404
commit 5ae53c9699
13 changed files with 3027 additions and 3078 deletions

View File

@@ -10,14 +10,15 @@ use libp2p::{
gossipsub::{Gossipsub, GossipsubEvent, MessageId},
identify::{Identify, IdentifyEvent},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
tokio_io::{AsyncRead, AsyncWrite},
NetworkBehaviour, PeerId,
};
use tokio::io::{AsyncRead, AsyncWrite};
use lru::LruCache;
use slog::{crit, debug, o, warn};
use std::marker::PhantomData;
use std::sync::Arc;
use types::{EnrForkId, EthSpec, SubnetId};
use std::task::Poll;
const MAX_IDENTIFY_ADDRESSES: usize = 10;
@@ -28,15 +29,15 @@ const MAX_IDENTIFY_ADDRESSES: usize = 10;
#[behaviour(out_event = "BehaviourEvent<TSpec>", poll_method = "poll")]
pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> {
/// The routing pub-sub mechanism for eth2.
gossipsub: Gossipsub<TSubstream>,
gossipsub: Gossipsub,
/// The Eth2 RPC specified in the wire-0 protocol.
eth2_rpc: RPC<TSubstream, TSpec>,
/// Keep regular connection to peers and disconnect if absent.
// TODO: Using id for initial interop. This will be removed by mainnet.
/// Provides IP addresses and peer information.
identify: Identify<TSubstream>,
identify: Identify,
/// Discovery behaviour.
discovery: Discovery<TSubstream, TSpec>,
discovery: Discovery<TSpec>,
/// The peer manager that keeps track of peer's reputation and status.
#[behaviour(ignore)]
peer_manager: PeerManager<TSpec>,
@@ -114,12 +115,12 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, T
}
/// Obtain a reference to the discovery protocol.
pub fn discovery(&self) -> &Discovery<TSubstream, TSpec> {
pub fn discovery(&self) -> &Discovery<TSpec> {
&self.discovery
}
/// Obtain a reference to the gossipsub protocol.
pub fn gs(&self) -> &Gossipsub<TSubstream> {
pub fn gs(&self) -> &Gossipsub {
&self.gossipsub
}
@@ -465,15 +466,15 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, T
/// Consumes the events list when polled.
fn poll<TBehaviourIn>(
&mut self,
) -> Async<NetworkBehaviourAction<TBehaviourIn, BehaviourEvent<TSpec>>> {
) -> Poll<NetworkBehaviourAction<TBehaviourIn, BehaviourEvent<TSpec>>> {
// check the peer manager for events
loop {
match self.peer_manager.poll() {
Ok(Async::Ready(Some(event))) => match event {
Ok(Poll::Ready(Some(event))) => match event {
PeerManagerEvent::Status(peer_id) => {
// it's time to status. We don't keep a beacon chain reference here, so we inform
// the network to send a status to this peer
return Async::Ready(NetworkBehaviourAction::GenerateEvent(
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(
BehaviourEvent::StatusPeer(peer_id),
));
}
@@ -491,8 +492,8 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, T
//TODO: Implement
}
},
Ok(Async::NotReady) => break,
Ok(Async::Ready(None)) | Err(_) => {
Poll::Pending => break,
Poll::Ready(None) | Err(_) => {
crit!(self.log, "Error polling peer manager");
break;
}
@@ -500,10 +501,10 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, T
}
if !self.events.is_empty() {
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)));
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)));
}
Async::NotReady
Poll::Pending
}
}

View File

@@ -8,11 +8,13 @@ use crate::metrics;
use crate::{error, Enr, NetworkConfig, NetworkGlobals};
use enr::{Eth2Enr, BITFIELD_ENR_KEY, ETH2_ENR_KEY};
use futures::prelude::*;
use libp2p::core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p::core::{Multiaddr, PeerId};
use libp2p::discv5::enr::NodeId;
use libp2p::discv5::{Discv5, Discv5Event};
use libp2p::multiaddr::Protocol;
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler};
use libp2p::swarm::{
DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler,
};
use slog::{crit, debug, info, warn};
use ssz::{Decode, Encode};
use ssz_types::BitVector;
@@ -20,9 +22,9 @@ use std::collections::{HashSet, VecDeque};
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::timer::Delay;
use std::task::Poll;
use std::time::Duration;
use tokio::time::{delay_until, Delay, Instant};
use types::{EnrForkId, EthSpec, SubnetId};
/// Maximum seconds before searching for extra peers.
@@ -36,7 +38,7 @@ const TARGET_SUBNET_PEERS: u64 = 3;
/// Lighthouse discovery behaviour. This provides peer management and discovery using the Discv5
/// libp2p protocol.
pub struct Discovery<TSubstream, TSpec: EthSpec> {
pub struct Discovery<TSpec: EthSpec> {
/// Events to be processed by the behaviour.
events: VecDeque<NetworkBehaviourAction<void::Void, Discv5Event>>,
@@ -62,7 +64,7 @@ pub struct Discovery<TSubstream, TSpec: EthSpec> {
tcp_port: u16,
/// The discovery behaviour used to discover new peers.
discovery: Discv5<TSubstream>,
discovery: Discv5,
/// A collection of network constants that can be read from other threads.
network_globals: Arc<NetworkGlobals<TSpec>>,
@@ -71,8 +73,8 @@ pub struct Discovery<TSubstream, TSpec: EthSpec> {
log: slog::Logger,
}
impl<TSubstream, TSpec: EthSpec> Discovery<TSubstream, TSpec> {
pub fn new(
impl<TSpec: EthSpec> Discovery<TSpec> {
pub async fn new(
local_key: &Keypair,
config: &NetworkConfig,
network_globals: Arc<NetworkGlobals<TSpec>>,
@@ -97,6 +99,7 @@ impl<TSubstream, TSpec: EthSpec> Discovery<TSubstream, TSpec> {
config.discv5_config.clone(),
listen_socket,
)
.await
.map_err(|e| format!("Discv5 service failed. Error: {:?}", e))?;
// Add bootnodes to routing table
@@ -123,7 +126,7 @@ impl<TSubstream, TSpec: EthSpec> Discovery<TSubstream, TSpec> {
events: VecDeque::with_capacity(16),
banned_peers: HashSet::new(),
max_peers: config.max_peers,
peer_discovery_delay: Delay::new(Instant::now()),
peer_discovery_delay: delay_until(Instant::now()),
past_discovery_delay: INITIAL_SEARCH_DELAY,
tcp_port: config.libp2p_port,
discovery,
@@ -343,12 +346,9 @@ impl<TSubstream, TSpec: EthSpec> Discovery<TSubstream, TSpec> {
}
// Redirect all behaviour events to underlying discovery behaviour.
impl<TSubstream, TSpec: EthSpec> NetworkBehaviour for Discovery<TSubstream, TSpec>
where
TSubstream: AsyncRead + AsyncWrite,
{
type ProtocolsHandler = <Discv5<TSubstream> as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = <Discv5<TSubstream> as NetworkBehaviour>::OutEvent;
impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
type ProtocolsHandler = <Discv5 as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = <Discv5 as NetworkBehaviour>::OutEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
NetworkBehaviour::new_handler(&mut self.discovery)
@@ -359,31 +359,14 @@ where
self.discovery.addresses_of_peer(peer_id)
}
fn inject_connected(&mut self, _peer_id: PeerId, _endpoint: ConnectedPoint) {}
fn inject_connected(&mut self, _peer_id: &PeerId) {}
fn inject_disconnected(&mut self, _peer_id: &PeerId, _endpoint: ConnectedPoint) {}
fn inject_replaced(
&mut self,
_peer_id: PeerId,
_closed: ConnectedPoint,
_opened: ConnectedPoint,
) {
// discv5 doesn't implement
}
fn inject_node_event(
&mut self,
_peer_id: PeerId,
_event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
// discv5 doesn't implement
}
fn inject_disconnected(&mut self, _peer_id: &PeerId) {}
fn poll(
&mut self,
params: &mut impl PollParameters,
) -> Async<
) -> Poll<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
@@ -392,7 +375,7 @@ where
// search for peers if it is time
loop {
match self.peer_discovery_delay.poll() {
Ok(Async::Ready(_)) => {
Poll::Ready(_) => {
if self.network_globals.connected_peers() < self.max_peers {
self.find_peers();
}
@@ -401,7 +384,7 @@ where
Instant::now() + Duration::from_secs(MAX_TIME_BETWEEN_PEER_SEARCHES),
);
}
Ok(Async::NotReady) => break,
Poll::Pending => break,
Err(e) => {
warn!(self.log, "Discovery peer search failed"; "error" => format!("{:?}", e));
}
@@ -411,7 +394,7 @@ where
// Poll discovery
loop {
match self.discovery.poll(params) {
Async::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
match event {
Discv5Event::Discovered(_enr) => {
// peers that get discovered during a query but are not contactable or
@@ -434,7 +417,7 @@ where
let enr = self.discovery.local_enr();
enr::save_enr_to_disk(Path::new(&self.enr_dir), enr, &self.log);
return Async::Ready(NetworkBehaviourAction::ReportObservedAddr {
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr {
address,
});
}
@@ -465,8 +448,10 @@ where
{
debug!(self.log, "Connecting to discovered peer"; "peer_id"=> format!("{:?}", peer_id));
self.network_globals.peers.write().dialing_peer(&peer_id);
self.events
.push_back(NetworkBehaviourAction::DialPeer { peer_id });
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id,
condition: DialPeerCondition::NotDialing, // TODO: check if this is the condition we want
});
}
}
}
@@ -474,16 +459,16 @@ where
}
}
// discv5 does not output any other NetworkBehaviourAction
Async::Ready(_) => {}
Async::NotReady => break,
Poll::Ready(_) => {}
Poll::Pending => break,
}
}
// process any queued events
if let Some(event) = self.events.pop_front() {
return Async::Ready(event);
return Poll::Ready(event);
}
Async::NotReady
Poll::Pending
}
}

View File

@@ -10,7 +10,9 @@ use hashmap_delay::HashSetDelay;
use libp2p::identify::IdentifyInfo;
use slog::{crit, debug, error, warn};
use smallvec::SmallVec;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use types::EthSpec;
@@ -314,34 +316,38 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
impl<TSpec: EthSpec> Stream for PeerManager<TSpec> {
type Item = PeerManagerEvent;
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// poll the timeouts for pings and status'
while let Async::Ready(Some(peer_id)) = self.ping_peers.poll().map_err(|e| {
error!(self.log, "Failed to check for peers to ping"; "error" => format!("{}",e));
})? {
debug!(self.log, "Pinging peer"; "peer_id" => format!("{}", peer_id));
// add the ping timer back
self.ping_peers.insert(peer_id.clone());
self.events.push(PeerManagerEvent::Ping(peer_id));
// TODO: was getting a bit messy with while lets. Check if logic is preserved.
loop {
match self.ping_peers.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(peer_id))) => self.events.push(PeerManagerEvent::Ping(peer_id)),
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for peers to ping"; "error" => format!("{}",e))
}
Poll::Ready(None) | Poll::Pending => break,
}
}
while let Async::Ready(Some(peer_id)) = self.status_peers.poll().map_err(|e| {
error!(self.log, "Failed to check for peers to status"; "error" => format!("{}",e));
})? {
debug!(self.log, "Sending Status to peer"; "peer_id" => format!("{}", peer_id));
// add the status timer back
self.status_peers.insert(peer_id.clone());
self.events.push(PeerManagerEvent::Status(peer_id));
loop {
match self.status_peers.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(peer_id))) => {
self.events.push(PeerManagerEvent::Status(peer_id))
}
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for peers to ping"; "error" => format!("{}",e))
}
Poll::Ready(None) | Poll::Pending => break,
}
}
if !self.events.is_empty() {
return Ok(Async::Ready(Some(self.events.remove(0))));
return Poll::Ready(Some(self.events.remove(0)));
} else {
self.events.shrink_to_fit();
}
Ok(Async::NotReady)
Poll::Pending
}
}

View File

@@ -4,10 +4,10 @@ use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse};
use libp2p::bytes::BufMut;
use libp2p::bytes::BytesMut;
use std::marker::PhantomData;
use tokio::codec::{Decoder, Encoder};
use tokio_util::codec::{Decoder, Encoder};
use types::EthSpec;
pub trait OutboundCodec: Encoder + Decoder {
pub trait OutboundCodec<TItem>: Encoder<TItem> + Decoder {
type ErrorType;
fn decode_error(
@@ -19,9 +19,9 @@ pub trait OutboundCodec: Encoder + Decoder {
/* Global Inbound Codec */
// This deals with Decoding RPC Requests from other peers and encoding our responses
pub struct BaseInboundCodec<TCodec, TSpec>
pub struct BaseInboundCodec<TCodec, TSpec, TItem>
where
TCodec: Encoder + Decoder,
TCodec: Encoder<TItem> + Decoder,
TSpec: EthSpec,
{
/// Inner codec for handling various encodings
@@ -29,9 +29,9 @@ where
phantom: PhantomData<TSpec>,
}
impl<TCodec, TSpec> BaseInboundCodec<TCodec, TSpec>
impl<TCodec, TSpec, TItem> BaseInboundCodec<TCodec, TSpec, TItem>
where
TCodec: Encoder + Decoder,
TCodec: Encoder<TItem> + Decoder,
TSpec: EthSpec,
{
pub fn new(codec: TCodec) -> Self {
@@ -44,9 +44,9 @@ where
/* Global Outbound Codec */
// This deals with Decoding RPC Responses from other peers and encoding our requests
pub struct BaseOutboundCodec<TOutboundCodec, TSpec>
pub struct BaseOutboundCodec<TOutboundCodec, TSpec, TItem>
where
TOutboundCodec: OutboundCodec,
TOutboundCodec: OutboundCodec<TItem>,
TSpec: EthSpec,
{
/// Inner codec for handling various encodings.
@@ -56,10 +56,10 @@ where
phantom: PhantomData<TSpec>,
}
impl<TOutboundCodec, TSpec> BaseOutboundCodec<TOutboundCodec, TSpec>
impl<TOutboundCodec, TSpec, TItem> BaseOutboundCodec<TOutboundCodec, TSpec, TItem>
where
TSpec: EthSpec,
TOutboundCodec: OutboundCodec,
TOutboundCodec: OutboundCodec<TItem>,
{
pub fn new(codec: TOutboundCodec) -> Self {
BaseOutboundCodec {
@@ -75,15 +75,14 @@ where
/* Base Inbound Codec */
// This Encodes RPC Responses sent to external peers
impl<TCodec, TSpec> Encoder for BaseInboundCodec<TCodec, TSpec>
impl<TCodec, TSpec> Encoder<RPCErrorResponse<TSpec>> for BaseInboundCodec<TCodec, TSpec, RPCErrorResponse<TSpec>>
where
TSpec: EthSpec,
TCodec: Decoder + Encoder<Item = RPCErrorResponse<TSpec>>,
TCodec: Decoder + Encoder<RPCErrorResponse<TSpec>>,
{
type Item = RPCErrorResponse<TSpec>;
type Error = <TCodec as Encoder>::Error;
type Error = <TCodec as Encoder<RPCErrorResponse<TSpec>>>::Error;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, item: RPCErrorResponse<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
dst.clear();
dst.reserve(1);
dst.put_u8(
@@ -95,10 +94,12 @@ where
}
// This Decodes RPC Requests from external peers
impl<TCodec, TSpec> Decoder for BaseInboundCodec<TCodec, TSpec>
// TODO: check if the Item parameter is correct
impl<TCodec, TSpec> Decoder for BaseInboundCodec<TCodec, TSpec, RPCErrorResponse<TSpec>>
where
TSpec: EthSpec,
TCodec: Encoder + Decoder<Item = RPCRequest<TSpec>>,
// TODO: check if the Item parameter is correct
TCodec: Encoder<RPCErrorResponse<TSpec>> + Decoder<Item = RPCRequest<TSpec>>,
{
type Item = RPCRequest<TSpec>;
type Error = <TCodec as Decoder>::Error;
@@ -111,24 +112,23 @@ where
/* Base Outbound Codec */
// This Encodes RPC Requests sent to external peers
impl<TCodec, TSpec> Encoder for BaseOutboundCodec<TCodec, TSpec>
impl<TCodec, TSpec> Encoder<RPCRequest<TSpec>> for BaseOutboundCodec<TCodec, TSpec, RPCRequest<TSpec>>
where
TSpec: EthSpec,
TCodec: OutboundCodec + Encoder<Item = RPCRequest<TSpec>>,
TCodec: OutboundCodec<RPCRequest<TSpec>> + Encoder<RPCRequest<TSpec>>,
{
type Item = RPCRequest<TSpec>;
type Error = <TCodec as Encoder>::Error;
type Error = <TCodec as Encoder<RPCRequest<TSpec>>>::Error;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
self.inner.encode(item, dst)
}
}
// This decodes RPC Responses received from external peers
impl<TCodec, TSpec> Decoder for BaseOutboundCodec<TCodec, TSpec>
impl<TCodec, TSpec> Decoder for BaseOutboundCodec<TCodec, TSpec, RPCRequest<TSpec>>
where
TSpec: EthSpec,
TCodec: OutboundCodec<ErrorType = ErrorMessage> + Decoder<Item = RPCResponse<TSpec>>,
TCodec: OutboundCodec<RPCRequest<TSpec>, ErrorType = ErrorMessage> + Decoder<Item = RPCResponse<TSpec>>,
{
type Item = RPCErrorResponse<TSpec>;
type Error = <TCodec as Decoder>::Error;

View File

@@ -8,25 +8,24 @@ use self::ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec};
use crate::rpc::protocol::RPCError;
use crate::rpc::{RPCErrorResponse, RPCRequest};
use libp2p::bytes::BytesMut;
use tokio::codec::{Decoder, Encoder};
use tokio_util::codec::{Decoder, Encoder};
use types::EthSpec;
// Known types of codecs
pub enum InboundCodec<TSpec: EthSpec> {
SSZSnappy(BaseInboundCodec<SSZSnappyInboundCodec<TSpec>, TSpec>),
SSZ(BaseInboundCodec<SSZInboundCodec<TSpec>, TSpec>),
pub enum InboundCodec<TSpec: EthSpec, TItem> {
SSZSnappy(BaseInboundCodec<SSZSnappyInboundCodec<TSpec>, TSpec, TItem>),
SSZ(BaseInboundCodec<SSZInboundCodec<TSpec>, TSpec, TItem>),
}
pub enum OutboundCodec<TSpec: EthSpec> {
SSZSnappy(BaseOutboundCodec<SSZSnappyOutboundCodec<TSpec>, TSpec>),
SSZ(BaseOutboundCodec<SSZOutboundCodec<TSpec>, TSpec>),
pub enum OutboundCodec<TSpec: EthSpec, TItem> {
SSZSnappy(BaseOutboundCodec<SSZSnappyOutboundCodec<TSpec>, TSpec, TItem>),
SSZ(BaseOutboundCodec<SSZOutboundCodec<TSpec>, TSpec, TItem>),
}
impl<T: EthSpec> Encoder for InboundCodec<T> {
type Item = RPCErrorResponse<T>;
impl<T: EthSpec> Encoder<RPCErrorResponse<T>> for InboundCodec<T, RPCErrorResponse<T>> {
type Error = RPCError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, item: RPCErrorResponse<T>, dst: &mut BytesMut) -> Result<(), Self::Error> {
match self {
InboundCodec::SSZ(codec) => codec.encode(item, dst),
InboundCodec::SSZSnappy(codec) => codec.encode(item, dst),
@@ -34,7 +33,7 @@ impl<T: EthSpec> Encoder for InboundCodec<T> {
}
}
impl<TSpec: EthSpec> Decoder for InboundCodec<TSpec> {
impl<TSpec: EthSpec> Decoder for InboundCodec<TSpec, RPCErrorResponse<TSpec>> {
type Item = RPCRequest<TSpec>;
type Error = RPCError;
@@ -46,11 +45,10 @@ impl<TSpec: EthSpec> Decoder for InboundCodec<TSpec> {
}
}
impl<TSpec: EthSpec> Encoder for OutboundCodec<TSpec> {
type Item = RPCRequest<TSpec>;
impl<TSpec: EthSpec> Encoder<RPCRequest<TSpec>> for OutboundCodec<TSpec, RPCRequest<TSpec>> {
type Error = RPCError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
match self {
OutboundCodec::SSZ(codec) => codec.encode(item, dst),
OutboundCodec::SSZSnappy(codec) => codec.encode(item, dst),
@@ -58,7 +56,7 @@ impl<TSpec: EthSpec> Encoder for OutboundCodec<TSpec> {
}
}
impl<T: EthSpec> Decoder for OutboundCodec<T> {
impl<T: EthSpec> Decoder for OutboundCodec<T, RPCRequest<T>> {
type Item = RPCErrorResponse<T>;
type Error = RPCError;

View File

@@ -7,7 +7,7 @@ use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse};
use libp2p::bytes::{BufMut, Bytes, BytesMut};
use ssz::{Decode, Encode};
use std::marker::PhantomData;
use tokio::codec::{Decoder, Encoder};
use tokio_util::codec::{Decoder, Encoder};
use types::{EthSpec, SignedBeaconBlock};
use unsigned_varint::codec::UviBytes;
@@ -36,11 +36,10 @@ impl<T: EthSpec> SSZInboundCodec<T> {
}
// Encoder for inbound streams: Encodes RPC Responses sent to peers.
impl<TSpec: EthSpec> Encoder for SSZInboundCodec<TSpec> {
type Item = RPCErrorResponse<TSpec>;
impl<TSpec: EthSpec> Encoder<RPCErrorResponse<TSpec>> for SSZInboundCodec<TSpec> {
type Error = RPCError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, item: RPCErrorResponse<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item {
RPCErrorResponse::Success(resp) => match resp {
RPCResponse::Status(res) => res.as_ssz_bytes(),
@@ -147,11 +146,10 @@ impl<TSpec: EthSpec> SSZOutboundCodec<TSpec> {
}
// Encoder for outbound streams: Encodes RPC Requests to peers
impl<TSpec: EthSpec> Encoder for SSZOutboundCodec<TSpec> {
type Item = RPCRequest<TSpec>;
impl<TSpec: EthSpec> Encoder<RPCRequest<TSpec>> for SSZOutboundCodec<TSpec> {
type Error = RPCError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item {
RPCRequest::Status(req) => req.as_ssz_bytes(),
RPCRequest::Goodbye(req) => req.as_ssz_bytes(),
@@ -255,7 +253,7 @@ impl<TSpec: EthSpec> Decoder for SSZOutboundCodec<TSpec> {
}
}
impl<TSpec: EthSpec> OutboundCodec for SSZOutboundCodec<TSpec> {
impl<TSpec: EthSpec> OutboundCodec<RPCRequest<TSpec>> for SSZOutboundCodec<TSpec> {
type ErrorType = ErrorMessage;
fn decode_error(&mut self, src: &mut BytesMut) -> Result<Option<Self::ErrorType>, RPCError> {

View File

@@ -12,7 +12,7 @@ use std::io::Cursor;
use std::io::ErrorKind;
use std::io::{Read, Write};
use std::marker::PhantomData;
use tokio::codec::{Decoder, Encoder};
use tokio_util::codec::{Decoder, Encoder};
use types::{EthSpec, SignedBeaconBlock};
use unsigned_varint::codec::Uvi;
@@ -44,11 +44,10 @@ impl<T: EthSpec> SSZSnappyInboundCodec<T> {
}
// Encoder for inbound streams: Encodes RPC Responses sent to peers.
impl<TSpec: EthSpec> Encoder for SSZSnappyInboundCodec<TSpec> {
type Item = RPCErrorResponse<TSpec>;
impl<TSpec: EthSpec> Encoder<RPCErrorResponse<TSpec>> for SSZSnappyInboundCodec<TSpec> {
type Error = RPCError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, item: RPCErrorResponse<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item {
RPCErrorResponse::Success(resp) => match resp {
RPCResponse::Status(res) => res.as_ssz_bytes(),
@@ -197,11 +196,10 @@ impl<TSpec: EthSpec> SSZSnappyOutboundCodec<TSpec> {
}
// Encoder for outbound streams: Encodes RPC Requests to peers
impl<TSpec: EthSpec> Encoder for SSZSnappyOutboundCodec<TSpec> {
type Item = RPCRequest<TSpec>;
impl<TSpec: EthSpec> Encoder<RPCRequest<TSpec>> for SSZSnappyOutboundCodec<TSpec> {
type Error = RPCError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item {
RPCRequest::Status(req) => req.as_ssz_bytes(),
RPCRequest::Goodbye(req) => req.as_ssz_bytes(),
@@ -312,7 +310,7 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyOutboundCodec<TSpec> {
}
}
impl<TSpec: EthSpec> OutboundCodec for SSZSnappyOutboundCodec<TSpec> {
impl<TSpec: EthSpec> OutboundCodec<RPCRequest<TSpec>> for SSZSnappyOutboundCodec<TSpec> {
type ErrorType = ErrorMessage;
fn decode_error(&mut self, src: &mut BytesMut) -> Result<Option<Self::ErrorType>, RPCError> {

View File

@@ -15,9 +15,10 @@ use libp2p::swarm::protocols_handler::{
use slog::{crit, debug, error, trace, warn};
use smallvec::SmallVec;
use std::collections::hash_map::Entry;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::timer::{delay_queue, DelayQueue};
use tokio::time::{delay_queue, DelayQueue};
use types::EthSpec;
//TODO: Implement close() on the substream types to improve the poll code.
@@ -37,9 +38,8 @@ type InboundRequestId = RequestId;
type OutboundRequestId = RequestId;
/// Implementation of `ProtocolsHandler` for the RPC protocol.
pub struct RPCHandler<TSubstream, TSpec>
pub struct RPCHandler<TSpec>
where
TSubstream: AsyncRead + AsyncWrite,
TSpec: EthSpec,
{
/// The upgrade for inbound substreams.
@@ -61,7 +61,7 @@ where
inbound_substreams: FnvHashMap<
InboundRequestId,
(
InboundSubstreamState<TSubstream, TSpec>,
InboundSubstreamState<TSpec>,
Option<delay_queue::Key>,
),
>,
@@ -73,7 +73,7 @@ where
/// maintained by the application sending the request.
outbound_substreams: FnvHashMap<
OutboundRequestId,
(OutboundSubstreamState<TSubstream, TSpec>, delay_queue::Key),
(OutboundSubstreamState<TSpec>, delay_queue::Key),
>,
/// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout.
@@ -100,9 +100,6 @@ where
/// Logger for handling RPC streams
log: slog::Logger,
/// Marker to pin the generic stream.
_phantom: PhantomData<TSubstream>,
}
/// State of an outbound substream. Either waiting for a response, or in the process of sending.
@@ -259,7 +256,6 @@ where
type InEvent = RPCEvent<TSpec>;
type OutEvent = RPCEvent<TSpec>;
type Error = ProtocolsHandlerUpgrErr<RPCError>;
type Substream = TSubstream;
type InboundProtocol = RPCProtocol<TSpec>;
type OutboundProtocol = RPCRequest<TSpec>;
type OutboundOpenInfo = RPCEvent<TSpec>; // Keep track of the id and the request
@@ -429,7 +425,7 @@ where
&mut self,
request: Self::OutboundOpenInfo,
error: ProtocolsHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error,
<Self::OutboundProtocol as OutboundUpgrade<libp2p::swarm::NegotiatedSubstream>>::Error,
>,
) {
if let ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(RPCError::IoError(_))) = error {
@@ -458,8 +454,12 @@ where
fn poll(
&mut self,
) -> Poll<
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
Self::Error,
ProtocolsHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Self::Error,
>,
> {
if let Some((request_id, err)) = self.pending_error.pop() {
// Returning an error here will result in dropping the peer.
@@ -472,53 +472,52 @@ where
// other clients testing their software. In the future, we will need to decide
// which protocols are a bare minimum to support before kicking the peer.
error!(self.log, "Peer doesn't support the RPC protocol"; "protocol" => protocol_string);
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(request_id, RPCError::InvalidProtocol(protocol_string)),
)));
return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(RPCEvent::Error(
request_id,
RPCError::InvalidProtocol(protocol_string),
))));
}
ProtocolsHandlerUpgrErr::Timeout | ProtocolsHandlerUpgrErr::Timer => {
// negotiation timeout, mark the request as failed
debug!(self.log, "Active substreams before timeout"; "len" => self.outbound_substreams.len());
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(
request_id,
RPCError::Custom("Protocol negotiation timeout".into()),
),
)));
return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(RPCEvent::Error(
request_id,
RPCError::Custom("Protocol negotiation timeout".into()),
))));
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)) => {
// IO/Decode/Custom Error, report to the application
debug!(self.log, "Upgrade Error"; "error" => format!("{}",err));
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(request_id, err),
)));
return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(RPCEvent::Error(
request_id, err,
))));
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => {
// Error during negotiation
debug!(self.log, "Upgrade Error"; "error" => format!("{}",err));
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(request_id, RPCError::Custom(format!("{}", err))),
)));
return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(RPCEvent::Error(
request_id,
RPCError::Custom(format!("{}", err)),
))));
}
}
}
// return any events that need to be reported
if !self.events_out.is_empty() {
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
self.events_out.remove(0),
)));
return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(self.events_out.remove(0))));
} else {
self.events_out.shrink_to_fit();
}
// purge expired inbound substreams and send an error
while let Async::Ready(Some(stream_id)) =
self.inbound_substreams_delay.poll().map_err(|e| {
// TODO: check if this pattern is equivalent to
// while let Async::Ready() = stream.poll().map_err(..)
while let Poll::Ready(Some(d)) = self.inbound_substreams_delay.poll() {
let stream_id = d.map_err(|e| {
warn!(self.log, "Inbound substream poll failed"; "error" => format!("{:?}", e));
ProtocolsHandlerUpgrErr::Timer
})?
{
})?;
let rpc_id = stream_id.get_ref();
// handle a stream timeout for various states
@@ -535,15 +534,16 @@ where
}
// purge expired outbound substreams
if let Async::Ready(Some(stream_id)) =
self.outbound_substreams_delay.poll().map_err(|e| {
if let Poll::Ready(Some(d)) =
self.outbound_substreams_delay.poll() {
let stream_id = d.map_err(|e| {
warn!(self.log, "Outbound substream poll failed"; "error" => format!("{:?}", e));
ProtocolsHandlerUpgrErr::Timer
})?
{
})?;
self.outbound_substreams.remove(stream_id.get_ref());
// notify the user
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(
*stream_id.get_ref(),
RPCError::Custom("Stream timed out".into()),
@@ -569,7 +569,7 @@ where
closing,
} => {
match substream.poll() {
Ok(Async::Ready(raw_substream)) => {
Poll::Ready(Ok(raw_substream)) => {
// completed the send
// close the stream if required
@@ -587,7 +587,7 @@ where
);
}
}
Ok(Async::NotReady) => {
Poll::Pending => {
entry.get_mut().0 =
InboundSubstreamState::ResponsePendingSend {
substream,
@@ -599,7 +599,7 @@ where
self.inbound_substreams_delay.remove(delay_key);
}
entry.remove_entry();
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(0, e),
)));
}
@@ -613,8 +613,9 @@ where
);
}
InboundSubstreamState::Closing(mut substream) => {
// TODO: check if this is supposed to be a stream
match substream.close() {
Ok(Async::Ready(())) | Err(_) => {
Poll::Ready(_) => {
//trace!(self.log, "Inbound stream dropped");
if let Some(delay_key) = &entry.get().1 {
self.inbound_substreams_delay.remove(delay_key);
@@ -630,7 +631,7 @@ where
);
}
} // drop the stream
Ok(Async::NotReady) => {
Poll::Pending => {
entry.get_mut().0 =
InboundSubstreamState::Closing(substream);
}
@@ -659,7 +660,7 @@ where
mut substream,
request,
} => match substream.poll() {
Ok(Async::Ready(Some(response))) => {
Poll::Ready(Some(Ok(response))) => {
if request.multiple_responses() && !response.is_error() {
entry.get_mut().0 =
OutboundSubstreamState::RequestPendingResponse {
@@ -677,11 +678,11 @@ where
entry.get_mut().0 = OutboundSubstreamState::Closing(substream);
}
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(
RPCEvent::Response(request_id, response),
)));
}
Ok(Async::Ready(None)) => {
Poll::Ready(None) => {
// stream closed
// if we expected multiple streams send a stream termination,
// else report the stream terminating only.
@@ -693,7 +694,7 @@ where
// notify the application error
if request.multiple_responses() {
// return an end of stream result
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(
RPCEvent::Response(
request_id,
RPCErrorResponse::StreamTermination(
@@ -702,7 +703,7 @@ where
),
)));
} // else we return an error, stream should not have closed early.
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(
request_id,
RPCError::Custom(
@@ -711,24 +712,25 @@ where
),
)));
}
Ok(Async::NotReady) => {
Poll::Pending => {
entry.get_mut().0 = OutboundSubstreamState::RequestPendingResponse {
substream,
request,
}
}
Err(e) => {
Poll::Ready(Some(Err(e))) => {
// drop the stream
let delay_key = &entry.get().1;
self.outbound_substreams_delay.remove(delay_key);
entry.remove_entry();
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(request_id, e),
)));
}
},
OutboundSubstreamState::Closing(mut substream) => match substream.close() {
Ok(Async::Ready(())) | Err(_) => {
// TODO: check if this is supposed to be a stream
Poll::Ready(_)=> {
//trace!(self.log, "Outbound stream dropped");
// drop the stream
let delay_key = &entry.get().1;
@@ -742,7 +744,7 @@ where
KeepAlive::Until(Instant::now() + self.inactive_timeout);
}
}
Ok(Async::NotReady) => {
Poll::Pending => {
entry.get_mut().0 = OutboundSubstreamState::Closing(substream);
}
},
@@ -762,7 +764,7 @@ where
let rpc_event = self.dial_queue.remove(0);
self.dial_queue.shrink_to_fit();
if let RPCEvent::Request(id, req) = rpc_event {
return Ok(Async::Ready(
return Poll::Ready(Ok(
ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(req.clone()),
info: RPCEvent::Request(id, req),
@@ -770,7 +772,7 @@ where
));
}
}
Ok(Async::NotReady)
Poll::Pending
}
}

View File

@@ -4,7 +4,6 @@
//! direct peer-to-peer communication primarily for sending/receiving chain information for
//! syncing.
use futures::prelude::*;
use handler::RPCHandler;
use libp2p::core::ConnectedPoint;
use libp2p::swarm::{
@@ -22,6 +21,7 @@ use std::marker::PhantomData;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use types::EthSpec;
use std::task::{Poll, Context};
pub(crate) mod codec;
mod handler;
@@ -151,7 +151,7 @@ where
));
}
fn inject_node_event(
fn inject_event(
&mut self,
source: PeerId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
@@ -165,17 +165,18 @@ where
fn poll(
&mut self,
_cx: &mut Context,
_: &mut impl PollParameters,
) -> Async<
) -> Poll<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
if !self.events.is_empty() {
return Async::Ready(self.events.remove(0));
return Poll::Ready(self.events.remove(0));
}
Async::NotReady
Poll::Pending
}
}

View File

@@ -16,11 +16,9 @@ use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, ProtocolName, Upgra
use std::io;
use std::marker::PhantomData;
use std::time::Duration;
use tokio::codec::Framed;
use tokio_util::codec::Framed;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::timer::timeout;
use tokio::util::FutureExt;
use tokio_io_timeout::TimeoutStream;
use tokio::time::Timeout;
use types::EthSpec;
/// The maximum bytes that can be sent across the RPC.
@@ -171,7 +169,7 @@ impl ProtocolName for ProtocolId {
pub type InboundOutput<TSocket, TSpec> = (RPCRequest<TSpec>, InboundFramed<TSocket, TSpec>);
pub type InboundFramed<TSocket, TSpec> =
Framed<TimeoutStream<upgrade::Negotiated<TSocket>>, InboundCodec<TSpec>>;
Framed<Timeout<upgrade::Negotiated<TSocket>>, InboundCodec<TSpec>>;
type FnAndThen<TSocket, TSpec> = fn(
(Option<RPCRequest<TSpec>>, InboundFramed<TSocket, TSpec>),
) -> FutureResult<InboundOutput<TSocket, TSpec>, RPCError>;

View File

@@ -9,7 +9,7 @@ use libp2p::core::{
identity::Keypair,
multiaddr::Multiaddr,
muxing::StreamMuxerBox,
nodes::Substream,
connection::Substream,
transport::boxed::Boxed,
upgrade::{InboundUpgradeExt, OutboundUpgradeExt},
ConnectedPoint,
@@ -21,8 +21,10 @@ use std::io::prelude::*;
use std::io::{Error, ErrorKind};
use std::sync::Arc;
use std::time::Duration;
use tokio::timer::DelayQueue;
use tokio::time::DelayQueue;
use types::{EnrForkId, EthSpec};
use std::pin::Pin;
use std::task::{Poll, Context};
type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>;
type Libp2pBehaviour<TSpec> = Behaviour<Substream<StreamMuxerBox>, TSpec>;
@@ -180,17 +182,16 @@ impl<TSpec: EthSpec> Service<TSpec> {
}
impl<TSpec: EthSpec> Stream for Service<TSpec> {
type Item = BehaviourEvent<TSpec>;
type Error = error::Error;
type Item = Result<BehaviourEvent<TSpec>, error::Error>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match self.swarm.poll() {
Ok(Async::Ready(Some(event))) => {
return Ok(Async::Ready(Some(event)));
Poll::Ready(Some(event)) => {
return Poll::Ready(Some(event));
}
Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"),
Ok(Async::NotReady) => break,
Ok(Poll::Ready(None)) => unreachable!("Swarm stream shouldn't end"),
Ok(Poll::Pending) => break,
_ => break,
}
}
@@ -198,7 +199,7 @@ impl<TSpec: EthSpec> Stream for Service<TSpec> {
// check if peers need to be banned
loop {
match self.peers_to_ban.poll() {
Ok(Async::Ready(Some(peer_id))) => {
Poll::Ready(Some(Ok(peer_id))) => {
let peer_id = peer_id.into_inner();
Swarm::ban_peer_id(&mut self.swarm, peer_id.clone());
// TODO: Correctly notify protocols of the disconnect
@@ -213,8 +214,8 @@ impl<TSpec: EthSpec> Stream for Service<TSpec> {
// inform the behaviour that the peer has been banned
self.swarm.peer_banned(peer_id);
}
Ok(Async::NotReady) | Ok(Async::Ready(None)) => break,
Err(e) => {
Poll::Pending | Poll::Ready(None) => break,
Poll::Ready(Some(Err(e))) => {
warn!(self.log, "Peer banning queue failed"; "error" => format!("{:?}", e));
}
}
@@ -223,20 +224,20 @@ impl<TSpec: EthSpec> Stream for Service<TSpec> {
// un-ban peer if it's timeout has expired
loop {
match self.peer_ban_timeout.poll() {
Ok(Async::Ready(Some(peer_id))) => {
Poll::Ready(Some(Ok(peer_id))) => {
let peer_id = peer_id.into_inner();
debug!(self.log, "Peer has been unbanned"; "peer" => format!("{:?}", peer_id));
self.swarm.peer_unbanned(&peer_id);
Swarm::unban_peer_id(&mut self.swarm, peer_id);
}
Ok(Async::NotReady) | Ok(Async::Ready(None)) => break,
Err(e) => {
Poll::Pending | Poll::Ready(None) => break,
Poll::Ready(Some(Err(e))) => {
warn!(self.log, "Peer banning timeout queue failed"; "error" => format!("{:?}", e));
}
}
}
Ok(Async::NotReady)
Poll::Pending
}
}