Updated all crates in eth2-libp2p

This commit is contained in:
Age Manning
2020-04-30 21:31:12 +10:00
13 changed files with 3059 additions and 2970 deletions

5522
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -5,38 +5,38 @@ authors = ["Age Manning <Age@AgeManning.com>"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
hex = "0.3" hex = "0.4.2"
# rust-libp2p is presently being sourced from a Sigma Prime fork of the
# `libp2p/rust-libp2p` repository.
libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "71cf486b4d992862f5a05f9f4ef5e5c1631f4add" }
types = { path = "../../eth2/types" } types = { path = "../../eth2/types" }
hashmap_delay = { path = "../../eth2/utils/hashmap_delay" } hashmap_delay = { path = "../../eth2/utils/hashmap_delay" }
eth2_ssz_types = { path = "../../eth2/utils/ssz_types" } eth2_ssz_types = { path = "../../eth2/utils/ssz_types" }
serde = { version = "1.0.102", features = ["derive"] } serde = { version = "1.0.106", features = ["derive"] }
serde_derive = "1.0.102" serde_derive = "1.0.106"
eth2_ssz = "0.1.2" eth2_ssz = "0.1.2"
eth2_ssz_derive = "0.1.0" eth2_ssz_derive = "0.1.0"
slog = { version = "2.5.2", features = ["max_level_trace"] } slog = { version = "2.5.2", features = ["max_level_trace"] }
version = { path = "../version" } version = { path = "../version" }
tokio = "0.1.22" tokio = { version = "0.2.20", features = ["time"] }
futures = "0.1.29" futures = "0.3.4"
error-chain = "0.12.1" error-chain = "0.12.2"
dirs = "2.0.2" dirs = "2.0.2"
fnv = "1.0.6" fnv = "1.0.6"
unsigned-varint = "0.2.3" unsigned-varint = "0.3.3"
lazy_static = "1.4.0" lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" } lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" }
tokio-io-timeout = "0.3.1" smallvec = "1.4.0"
smallvec = "1.0.0"
lru = "0.4.3" lru = "0.4.3"
parking_lot = "0.9.0" parking_lot = "0.10.2"
sha2 = "0.8.0" sha2 = "0.8.1"
base64 = "0.11.0" base64 = "0.12.0"
snap = "1" snap = "1.0.0"
void = "1.0.2" void = "1.0.2"
tokio-io-timeout = "0.4.0"
tokio-util = { version = "0.3.1", features = ["codec"] }
libp2p = "0.18.1"
discv5 = "0.1.0-alpha.1"
[dev-dependencies] [dev-dependencies]
slog-stdlog = "4.0.0" slog-stdlog = "4.0.0"
slog-term = "2.4.2" slog-term = "2.5.0"
slog-async = "2.3.0" slog-async = "2.5.0"
tempdir = "0.3" tempdir = "0.3.7"

View File

@@ -10,14 +10,15 @@ use libp2p::{
gossipsub::{Gossipsub, GossipsubEvent, MessageId}, gossipsub::{Gossipsub, GossipsubEvent, MessageId},
identify::{Identify, IdentifyEvent}, identify::{Identify, IdentifyEvent},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
tokio_io::{AsyncRead, AsyncWrite},
NetworkBehaviour, PeerId, NetworkBehaviour, PeerId,
}; };
use tokio::io::{AsyncRead, AsyncWrite};
use lru::LruCache; use lru::LruCache;
use slog::{crit, debug, o, warn}; use slog::{crit, debug, o, warn};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
use types::{EnrForkId, EthSpec, SubnetId}; use types::{EnrForkId, EthSpec, SubnetId};
use std::task::Poll;
const MAX_IDENTIFY_ADDRESSES: usize = 10; const MAX_IDENTIFY_ADDRESSES: usize = 10;
@@ -28,15 +29,15 @@ const MAX_IDENTIFY_ADDRESSES: usize = 10;
#[behaviour(out_event = "BehaviourEvent<TSpec>", poll_method = "poll")] #[behaviour(out_event = "BehaviourEvent<TSpec>", poll_method = "poll")]
pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> { pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> {
/// The routing pub-sub mechanism for eth2. /// The routing pub-sub mechanism for eth2.
gossipsub: Gossipsub<TSubstream>, gossipsub: Gossipsub,
/// The Eth2 RPC specified in the wire-0 protocol. /// The Eth2 RPC specified in the wire-0 protocol.
eth2_rpc: RPC<TSubstream, TSpec>, eth2_rpc: RPC<TSubstream, TSpec>,
/// Keep regular connection to peers and disconnect if absent. /// Keep regular connection to peers and disconnect if absent.
// TODO: Using id for initial interop. This will be removed by mainnet. // TODO: Using id for initial interop. This will be removed by mainnet.
/// Provides IP addresses and peer information. /// Provides IP addresses and peer information.
identify: Identify<TSubstream>, identify: Identify,
/// Discovery behaviour. /// Discovery behaviour.
discovery: Discovery<TSubstream, TSpec>, discovery: Discovery<TSpec>,
/// The peer manager that keeps track of peer's reputation and status. /// The peer manager that keeps track of peer's reputation and status.
#[behaviour(ignore)] #[behaviour(ignore)]
peer_manager: PeerManager<TSpec>, peer_manager: PeerManager<TSpec>,
@@ -114,12 +115,12 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, T
} }
/// Obtain a reference to the discovery protocol. /// Obtain a reference to the discovery protocol.
pub fn discovery(&self) -> &Discovery<TSubstream, TSpec> { pub fn discovery(&self) -> &Discovery<TSpec> {
&self.discovery &self.discovery
} }
/// Obtain a reference to the gossipsub protocol. /// Obtain a reference to the gossipsub protocol.
pub fn gs(&self) -> &Gossipsub<TSubstream> { pub fn gs(&self) -> &Gossipsub {
&self.gossipsub &self.gossipsub
} }
@@ -465,15 +466,15 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, T
/// Consumes the events list when polled. /// Consumes the events list when polled.
fn poll<TBehaviourIn>( fn poll<TBehaviourIn>(
&mut self, &mut self,
) -> Async<NetworkBehaviourAction<TBehaviourIn, BehaviourEvent<TSpec>>> { ) -> Poll<NetworkBehaviourAction<TBehaviourIn, BehaviourEvent<TSpec>>> {
// check the peer manager for events // check the peer manager for events
loop { loop {
match self.peer_manager.poll() { match self.peer_manager.poll() {
Ok(Async::Ready(Some(event))) => match event { Ok(Poll::Ready(Some(event))) => match event {
PeerManagerEvent::Status(peer_id) => { PeerManagerEvent::Status(peer_id) => {
// it's time to status. We don't keep a beacon chain reference here, so we inform // it's time to status. We don't keep a beacon chain reference here, so we inform
// the network to send a status to this peer // the network to send a status to this peer
return Async::Ready(NetworkBehaviourAction::GenerateEvent( return Poll::Ready(NetworkBehaviourAction::GenerateEvent(
BehaviourEvent::StatusPeer(peer_id), BehaviourEvent::StatusPeer(peer_id),
)); ));
} }
@@ -491,8 +492,8 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, T
//TODO: Implement //TODO: Implement
} }
}, },
Ok(Async::NotReady) => break, Poll::Pending => break,
Ok(Async::Ready(None)) | Err(_) => { Poll::Ready(None) | Err(_) => {
crit!(self.log, "Error polling peer manager"); crit!(self.log, "Error polling peer manager");
break; break;
} }
@@ -500,10 +501,10 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, T
} }
if !self.events.is_empty() { if !self.events.is_empty() {
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))); return Poll::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)));
} }
Async::NotReady Poll::Pending
} }
} }

View File

@@ -8,11 +8,13 @@ use crate::metrics;
use crate::{error, Enr, NetworkConfig, NetworkGlobals}; use crate::{error, Enr, NetworkConfig, NetworkGlobals};
use enr::{Eth2Enr, BITFIELD_ENR_KEY, ETH2_ENR_KEY}; use enr::{Eth2Enr, BITFIELD_ENR_KEY, ETH2_ENR_KEY};
use futures::prelude::*; use futures::prelude::*;
use libp2p::core::{ConnectedPoint, Multiaddr, PeerId}; use libp2p::core::{Multiaddr, PeerId};
use libp2p::discv5::enr::NodeId; use libp2p::discv5::enr::NodeId;
use libp2p::discv5::{Discv5, Discv5Event}; use libp2p::discv5::{Discv5, Discv5Event};
use libp2p::multiaddr::Protocol; use libp2p::multiaddr::Protocol;
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler}; use libp2p::swarm::{
DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler,
};
use slog::{crit, debug, info, warn}; use slog::{crit, debug, info, warn};
use ssz::{Decode, Encode}; use ssz::{Decode, Encode};
use ssz_types::BitVector; use ssz_types::BitVector;
@@ -20,9 +22,9 @@ use std::collections::{HashSet, VecDeque};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::task::Poll;
use tokio::io::{AsyncRead, AsyncWrite}; use std::time::Duration;
use tokio::timer::Delay; use tokio::time::{delay_until, Delay, Instant};
use types::{EnrForkId, EthSpec, SubnetId}; use types::{EnrForkId, EthSpec, SubnetId};
/// Maximum seconds before searching for extra peers. /// Maximum seconds before searching for extra peers.
@@ -36,7 +38,7 @@ const TARGET_SUBNET_PEERS: u64 = 3;
/// Lighthouse discovery behaviour. This provides peer management and discovery using the Discv5 /// Lighthouse discovery behaviour. This provides peer management and discovery using the Discv5
/// libp2p protocol. /// libp2p protocol.
pub struct Discovery<TSubstream, TSpec: EthSpec> { pub struct Discovery<TSpec: EthSpec> {
/// Events to be processed by the behaviour. /// Events to be processed by the behaviour.
events: VecDeque<NetworkBehaviourAction<void::Void, Discv5Event>>, events: VecDeque<NetworkBehaviourAction<void::Void, Discv5Event>>,
@@ -62,7 +64,7 @@ pub struct Discovery<TSubstream, TSpec: EthSpec> {
tcp_port: u16, tcp_port: u16,
/// The discovery behaviour used to discover new peers. /// The discovery behaviour used to discover new peers.
discovery: Discv5<TSubstream>, discovery: Discv5,
/// A collection of network constants that can be read from other threads. /// A collection of network constants that can be read from other threads.
network_globals: Arc<NetworkGlobals<TSpec>>, network_globals: Arc<NetworkGlobals<TSpec>>,
@@ -71,8 +73,8 @@ pub struct Discovery<TSubstream, TSpec: EthSpec> {
log: slog::Logger, log: slog::Logger,
} }
impl<TSubstream, TSpec: EthSpec> Discovery<TSubstream, TSpec> { impl<TSpec: EthSpec> Discovery<TSpec> {
pub fn new( pub async fn new(
local_key: &Keypair, local_key: &Keypair,
config: &NetworkConfig, config: &NetworkConfig,
network_globals: Arc<NetworkGlobals<TSpec>>, network_globals: Arc<NetworkGlobals<TSpec>>,
@@ -97,6 +99,7 @@ impl<TSubstream, TSpec: EthSpec> Discovery<TSubstream, TSpec> {
config.discv5_config.clone(), config.discv5_config.clone(),
listen_socket, listen_socket,
) )
.await
.map_err(|e| format!("Discv5 service failed. Error: {:?}", e))?; .map_err(|e| format!("Discv5 service failed. Error: {:?}", e))?;
// Add bootnodes to routing table // Add bootnodes to routing table
@@ -123,7 +126,7 @@ impl<TSubstream, TSpec: EthSpec> Discovery<TSubstream, TSpec> {
events: VecDeque::with_capacity(16), events: VecDeque::with_capacity(16),
banned_peers: HashSet::new(), banned_peers: HashSet::new(),
max_peers: config.max_peers, max_peers: config.max_peers,
peer_discovery_delay: Delay::new(Instant::now()), peer_discovery_delay: delay_until(Instant::now()),
past_discovery_delay: INITIAL_SEARCH_DELAY, past_discovery_delay: INITIAL_SEARCH_DELAY,
tcp_port: config.libp2p_port, tcp_port: config.libp2p_port,
discovery, discovery,
@@ -343,12 +346,9 @@ impl<TSubstream, TSpec: EthSpec> Discovery<TSubstream, TSpec> {
} }
// Redirect all behaviour events to underlying discovery behaviour. // Redirect all behaviour events to underlying discovery behaviour.
impl<TSubstream, TSpec: EthSpec> NetworkBehaviour for Discovery<TSubstream, TSpec> impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
where type ProtocolsHandler = <Discv5 as NetworkBehaviour>::ProtocolsHandler;
TSubstream: AsyncRead + AsyncWrite, type OutEvent = <Discv5 as NetworkBehaviour>::OutEvent;
{
type ProtocolsHandler = <Discv5<TSubstream> as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = <Discv5<TSubstream> as NetworkBehaviour>::OutEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler { fn new_handler(&mut self) -> Self::ProtocolsHandler {
NetworkBehaviour::new_handler(&mut self.discovery) NetworkBehaviour::new_handler(&mut self.discovery)
@@ -359,31 +359,14 @@ where
self.discovery.addresses_of_peer(peer_id) self.discovery.addresses_of_peer(peer_id)
} }
fn inject_connected(&mut self, _peer_id: PeerId, _endpoint: ConnectedPoint) {} fn inject_connected(&mut self, _peer_id: &PeerId) {}
fn inject_disconnected(&mut self, _peer_id: &PeerId, _endpoint: ConnectedPoint) {} fn inject_disconnected(&mut self, _peer_id: &PeerId) {}
fn inject_replaced(
&mut self,
_peer_id: PeerId,
_closed: ConnectedPoint,
_opened: ConnectedPoint,
) {
// discv5 doesn't implement
}
fn inject_node_event(
&mut self,
_peer_id: PeerId,
_event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
// discv5 doesn't implement
}
fn poll( fn poll(
&mut self, &mut self,
params: &mut impl PollParameters, params: &mut impl PollParameters,
) -> Async< ) -> Poll<
NetworkBehaviourAction< NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent, <Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent, Self::OutEvent,
@@ -392,7 +375,7 @@ where
// search for peers if it is time // search for peers if it is time
loop { loop {
match self.peer_discovery_delay.poll() { match self.peer_discovery_delay.poll() {
Ok(Async::Ready(_)) => { Poll::Ready(_) => {
if self.network_globals.connected_peers() < self.max_peers { if self.network_globals.connected_peers() < self.max_peers {
self.find_peers(); self.find_peers();
} }
@@ -401,7 +384,7 @@ where
Instant::now() + Duration::from_secs(MAX_TIME_BETWEEN_PEER_SEARCHES), Instant::now() + Duration::from_secs(MAX_TIME_BETWEEN_PEER_SEARCHES),
); );
} }
Ok(Async::NotReady) => break, Poll::Pending => break,
Err(e) => { Err(e) => {
warn!(self.log, "Discovery peer search failed"; "error" => format!("{:?}", e)); warn!(self.log, "Discovery peer search failed"; "error" => format!("{:?}", e));
} }
@@ -411,7 +394,7 @@ where
// Poll discovery // Poll discovery
loop { loop {
match self.discovery.poll(params) { match self.discovery.poll(params) {
Async::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
match event { match event {
Discv5Event::Discovered(_enr) => { Discv5Event::Discovered(_enr) => {
// peers that get discovered during a query but are not contactable or // peers that get discovered during a query but are not contactable or
@@ -434,7 +417,7 @@ where
let enr = self.discovery.local_enr(); let enr = self.discovery.local_enr();
enr::save_enr_to_disk(Path::new(&self.enr_dir), enr, &self.log); enr::save_enr_to_disk(Path::new(&self.enr_dir), enr, &self.log);
return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr {
address, address,
}); });
} }
@@ -465,8 +448,10 @@ where
{ {
debug!(self.log, "Connecting to discovered peer"; "peer_id"=> format!("{:?}", peer_id)); debug!(self.log, "Connecting to discovered peer"; "peer_id"=> format!("{:?}", peer_id));
self.network_globals.peers.write().dialing_peer(&peer_id); self.network_globals.peers.write().dialing_peer(&peer_id);
self.events self.events.push_back(NetworkBehaviourAction::DialPeer {
.push_back(NetworkBehaviourAction::DialPeer { peer_id }); peer_id,
condition: DialPeerCondition::NotDialing, // TODO: check if this is the condition we want
});
} }
} }
} }
@@ -474,16 +459,16 @@ where
} }
} }
// discv5 does not output any other NetworkBehaviourAction // discv5 does not output any other NetworkBehaviourAction
Async::Ready(_) => {} Poll::Ready(_) => {}
Async::NotReady => break, Poll::Pending => break,
} }
} }
// process any queued events // process any queued events
if let Some(event) = self.events.pop_front() { if let Some(event) = self.events.pop_front() {
return Async::Ready(event); return Poll::Ready(event);
} }
Async::NotReady Poll::Pending
} }
} }

View File

@@ -10,7 +10,9 @@ use hashmap_delay::HashSetDelay;
use libp2p::identify::IdentifyInfo; use libp2p::identify::IdentifyInfo;
use slog::{crit, debug, error, warn}; use slog::{crit, debug, error, warn};
use smallvec::SmallVec; use smallvec::SmallVec;
use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use types::EthSpec; use types::EthSpec;
@@ -314,34 +316,38 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
impl<TSpec: EthSpec> Stream for PeerManager<TSpec> { impl<TSpec: EthSpec> Stream for PeerManager<TSpec> {
type Item = PeerManagerEvent; type Item = PeerManagerEvent;
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// poll the timeouts for pings and status' // poll the timeouts for pings and status'
while let Async::Ready(Some(peer_id)) = self.ping_peers.poll().map_err(|e| { // TODO: was getting a bit messy with while lets. Check if logic is preserved.
error!(self.log, "Failed to check for peers to ping"; "error" => format!("{}",e)); loop {
})? { match self.ping_peers.poll_next_unpin(cx) {
debug!(self.log, "Pinging peer"; "peer_id" => format!("{}", peer_id)); Poll::Ready(Some(Ok(peer_id))) => self.events.push(PeerManagerEvent::Ping(peer_id)),
// add the ping timer back Poll::Ready(Some(Err(e))) => {
self.ping_peers.insert(peer_id.clone()); error!(self.log, "Failed to check for peers to ping"; "error" => format!("{}",e))
self.events.push(PeerManagerEvent::Ping(peer_id)); }
Poll::Ready(None) | Poll::Pending => break,
}
} }
while let Async::Ready(Some(peer_id)) = self.status_peers.poll().map_err(|e| { loop {
error!(self.log, "Failed to check for peers to status"; "error" => format!("{}",e)); match self.status_peers.poll_next_unpin(cx) {
})? { Poll::Ready(Some(Ok(peer_id))) => {
debug!(self.log, "Sending Status to peer"; "peer_id" => format!("{}", peer_id)); self.events.push(PeerManagerEvent::Status(peer_id))
// add the status timer back }
self.status_peers.insert(peer_id.clone()); Poll::Ready(Some(Err(e))) => {
self.events.push(PeerManagerEvent::Status(peer_id)); error!(self.log, "Failed to check for peers to ping"; "error" => format!("{}",e))
}
Poll::Ready(None) | Poll::Pending => break,
}
} }
if !self.events.is_empty() { if !self.events.is_empty() {
return Ok(Async::Ready(Some(self.events.remove(0)))); return Poll::Ready(Some(self.events.remove(0)));
} else { } else {
self.events.shrink_to_fit(); self.events.shrink_to_fit();
} }
Ok(Async::NotReady) Poll::Pending
} }
} }

View File

@@ -4,10 +4,10 @@ use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse};
use libp2p::bytes::BufMut; use libp2p::bytes::BufMut;
use libp2p::bytes::BytesMut; use libp2p::bytes::BytesMut;
use std::marker::PhantomData; use std::marker::PhantomData;
use tokio::codec::{Decoder, Encoder}; use tokio_util::codec::{Decoder, Encoder};
use types::EthSpec; use types::EthSpec;
pub trait OutboundCodec: Encoder + Decoder { pub trait OutboundCodec<TItem>: Encoder<TItem> + Decoder {
type ErrorType; type ErrorType;
fn decode_error( fn decode_error(
@@ -19,9 +19,9 @@ pub trait OutboundCodec: Encoder + Decoder {
/* Global Inbound Codec */ /* Global Inbound Codec */
// This deals with Decoding RPC Requests from other peers and encoding our responses // This deals with Decoding RPC Requests from other peers and encoding our responses
pub struct BaseInboundCodec<TCodec, TSpec> pub struct BaseInboundCodec<TCodec, TSpec, TItem>
where where
TCodec: Encoder + Decoder, TCodec: Encoder<TItem> + Decoder,
TSpec: EthSpec, TSpec: EthSpec,
{ {
/// Inner codec for handling various encodings /// Inner codec for handling various encodings
@@ -29,9 +29,9 @@ where
phantom: PhantomData<TSpec>, phantom: PhantomData<TSpec>,
} }
impl<TCodec, TSpec> BaseInboundCodec<TCodec, TSpec> impl<TCodec, TSpec, TItem> BaseInboundCodec<TCodec, TSpec, TItem>
where where
TCodec: Encoder + Decoder, TCodec: Encoder<TItem> + Decoder,
TSpec: EthSpec, TSpec: EthSpec,
{ {
pub fn new(codec: TCodec) -> Self { pub fn new(codec: TCodec) -> Self {
@@ -44,9 +44,9 @@ where
/* Global Outbound Codec */ /* Global Outbound Codec */
// This deals with Decoding RPC Responses from other peers and encoding our requests // This deals with Decoding RPC Responses from other peers and encoding our requests
pub struct BaseOutboundCodec<TOutboundCodec, TSpec> pub struct BaseOutboundCodec<TOutboundCodec, TSpec, TItem>
where where
TOutboundCodec: OutboundCodec, TOutboundCodec: OutboundCodec<TItem>,
TSpec: EthSpec, TSpec: EthSpec,
{ {
/// Inner codec for handling various encodings. /// Inner codec for handling various encodings.
@@ -56,10 +56,10 @@ where
phantom: PhantomData<TSpec>, phantom: PhantomData<TSpec>,
} }
impl<TOutboundCodec, TSpec> BaseOutboundCodec<TOutboundCodec, TSpec> impl<TOutboundCodec, TSpec, TItem> BaseOutboundCodec<TOutboundCodec, TSpec, TItem>
where where
TSpec: EthSpec, TSpec: EthSpec,
TOutboundCodec: OutboundCodec, TOutboundCodec: OutboundCodec<TItem>,
{ {
pub fn new(codec: TOutboundCodec) -> Self { pub fn new(codec: TOutboundCodec) -> Self {
BaseOutboundCodec { BaseOutboundCodec {
@@ -75,15 +75,14 @@ where
/* Base Inbound Codec */ /* Base Inbound Codec */
// This Encodes RPC Responses sent to external peers // This Encodes RPC Responses sent to external peers
impl<TCodec, TSpec> Encoder for BaseInboundCodec<TCodec, TSpec> impl<TCodec, TSpec> Encoder<RPCErrorResponse<TSpec>> for BaseInboundCodec<TCodec, TSpec, RPCErrorResponse<TSpec>>
where where
TSpec: EthSpec, TSpec: EthSpec,
TCodec: Decoder + Encoder<Item = RPCErrorResponse<TSpec>>, TCodec: Decoder + Encoder<RPCErrorResponse<TSpec>>,
{ {
type Item = RPCErrorResponse<TSpec>; type Error = <TCodec as Encoder<RPCErrorResponse<TSpec>>>::Error;
type Error = <TCodec as Encoder>::Error;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { fn encode(&mut self, item: RPCErrorResponse<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
dst.clear(); dst.clear();
dst.reserve(1); dst.reserve(1);
dst.put_u8( dst.put_u8(
@@ -95,10 +94,12 @@ where
} }
// This Decodes RPC Requests from external peers // This Decodes RPC Requests from external peers
impl<TCodec, TSpec> Decoder for BaseInboundCodec<TCodec, TSpec> // TODO: check if the Item parameter is correct
impl<TCodec, TSpec> Decoder for BaseInboundCodec<TCodec, TSpec, RPCErrorResponse<TSpec>>
where where
TSpec: EthSpec, TSpec: EthSpec,
TCodec: Encoder + Decoder<Item = RPCRequest<TSpec>>, // TODO: check if the Item parameter is correct
TCodec: Encoder<RPCErrorResponse<TSpec>> + Decoder<Item = RPCRequest<TSpec>>,
{ {
type Item = RPCRequest<TSpec>; type Item = RPCRequest<TSpec>;
type Error = <TCodec as Decoder>::Error; type Error = <TCodec as Decoder>::Error;
@@ -111,24 +112,23 @@ where
/* Base Outbound Codec */ /* Base Outbound Codec */
// This Encodes RPC Requests sent to external peers // This Encodes RPC Requests sent to external peers
impl<TCodec, TSpec> Encoder for BaseOutboundCodec<TCodec, TSpec> impl<TCodec, TSpec> Encoder<RPCRequest<TSpec>> for BaseOutboundCodec<TCodec, TSpec, RPCRequest<TSpec>>
where where
TSpec: EthSpec, TSpec: EthSpec,
TCodec: OutboundCodec + Encoder<Item = RPCRequest<TSpec>>, TCodec: OutboundCodec<RPCRequest<TSpec>> + Encoder<RPCRequest<TSpec>>,
{ {
type Item = RPCRequest<TSpec>; type Error = <TCodec as Encoder<RPCRequest<TSpec>>>::Error;
type Error = <TCodec as Encoder>::Error;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
self.inner.encode(item, dst) self.inner.encode(item, dst)
} }
} }
// This decodes RPC Responses received from external peers // This decodes RPC Responses received from external peers
impl<TCodec, TSpec> Decoder for BaseOutboundCodec<TCodec, TSpec> impl<TCodec, TSpec> Decoder for BaseOutboundCodec<TCodec, TSpec, RPCRequest<TSpec>>
where where
TSpec: EthSpec, TSpec: EthSpec,
TCodec: OutboundCodec<ErrorType = ErrorMessage> + Decoder<Item = RPCResponse<TSpec>>, TCodec: OutboundCodec<RPCRequest<TSpec>, ErrorType = ErrorMessage> + Decoder<Item = RPCResponse<TSpec>>,
{ {
type Item = RPCErrorResponse<TSpec>; type Item = RPCErrorResponse<TSpec>;
type Error = <TCodec as Decoder>::Error; type Error = <TCodec as Decoder>::Error;

View File

@@ -8,25 +8,24 @@ use self::ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec};
use crate::rpc::protocol::RPCError; use crate::rpc::protocol::RPCError;
use crate::rpc::{RPCErrorResponse, RPCRequest}; use crate::rpc::{RPCErrorResponse, RPCRequest};
use libp2p::bytes::BytesMut; use libp2p::bytes::BytesMut;
use tokio::codec::{Decoder, Encoder}; use tokio_util::codec::{Decoder, Encoder};
use types::EthSpec; use types::EthSpec;
// Known types of codecs // Known types of codecs
pub enum InboundCodec<TSpec: EthSpec> { pub enum InboundCodec<TSpec: EthSpec, TItem> {
SSZSnappy(BaseInboundCodec<SSZSnappyInboundCodec<TSpec>, TSpec>), SSZSnappy(BaseInboundCodec<SSZSnappyInboundCodec<TSpec>, TSpec, TItem>),
SSZ(BaseInboundCodec<SSZInboundCodec<TSpec>, TSpec>), SSZ(BaseInboundCodec<SSZInboundCodec<TSpec>, TSpec, TItem>),
} }
pub enum OutboundCodec<TSpec: EthSpec> { pub enum OutboundCodec<TSpec: EthSpec, TItem> {
SSZSnappy(BaseOutboundCodec<SSZSnappyOutboundCodec<TSpec>, TSpec>), SSZSnappy(BaseOutboundCodec<SSZSnappyOutboundCodec<TSpec>, TSpec, TItem>),
SSZ(BaseOutboundCodec<SSZOutboundCodec<TSpec>, TSpec>), SSZ(BaseOutboundCodec<SSZOutboundCodec<TSpec>, TSpec, TItem>),
} }
impl<T: EthSpec> Encoder for InboundCodec<T> { impl<T: EthSpec> Encoder<RPCErrorResponse<T>> for InboundCodec<T, RPCErrorResponse<T>> {
type Item = RPCErrorResponse<T>;
type Error = RPCError; type Error = RPCError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { fn encode(&mut self, item: RPCErrorResponse<T>, dst: &mut BytesMut) -> Result<(), Self::Error> {
match self { match self {
InboundCodec::SSZ(codec) => codec.encode(item, dst), InboundCodec::SSZ(codec) => codec.encode(item, dst),
InboundCodec::SSZSnappy(codec) => codec.encode(item, dst), InboundCodec::SSZSnappy(codec) => codec.encode(item, dst),
@@ -34,7 +33,7 @@ impl<T: EthSpec> Encoder for InboundCodec<T> {
} }
} }
impl<TSpec: EthSpec> Decoder for InboundCodec<TSpec> { impl<TSpec: EthSpec> Decoder for InboundCodec<TSpec, RPCErrorResponse<TSpec>> {
type Item = RPCRequest<TSpec>; type Item = RPCRequest<TSpec>;
type Error = RPCError; type Error = RPCError;
@@ -46,11 +45,10 @@ impl<TSpec: EthSpec> Decoder for InboundCodec<TSpec> {
} }
} }
impl<TSpec: EthSpec> Encoder for OutboundCodec<TSpec> { impl<TSpec: EthSpec> Encoder<RPCRequest<TSpec>> for OutboundCodec<TSpec, RPCRequest<TSpec>> {
type Item = RPCRequest<TSpec>;
type Error = RPCError; type Error = RPCError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
match self { match self {
OutboundCodec::SSZ(codec) => codec.encode(item, dst), OutboundCodec::SSZ(codec) => codec.encode(item, dst),
OutboundCodec::SSZSnappy(codec) => codec.encode(item, dst), OutboundCodec::SSZSnappy(codec) => codec.encode(item, dst),
@@ -58,7 +56,7 @@ impl<TSpec: EthSpec> Encoder for OutboundCodec<TSpec> {
} }
} }
impl<T: EthSpec> Decoder for OutboundCodec<T> { impl<T: EthSpec> Decoder for OutboundCodec<T, RPCRequest<T>> {
type Item = RPCErrorResponse<T>; type Item = RPCErrorResponse<T>;
type Error = RPCError; type Error = RPCError;

View File

@@ -7,7 +7,7 @@ use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse};
use libp2p::bytes::{BufMut, Bytes, BytesMut}; use libp2p::bytes::{BufMut, Bytes, BytesMut};
use ssz::{Decode, Encode}; use ssz::{Decode, Encode};
use std::marker::PhantomData; use std::marker::PhantomData;
use tokio::codec::{Decoder, Encoder}; use tokio_util::codec::{Decoder, Encoder};
use types::{EthSpec, SignedBeaconBlock}; use types::{EthSpec, SignedBeaconBlock};
use unsigned_varint::codec::UviBytes; use unsigned_varint::codec::UviBytes;
@@ -36,11 +36,10 @@ impl<T: EthSpec> SSZInboundCodec<T> {
} }
// Encoder for inbound streams: Encodes RPC Responses sent to peers. // Encoder for inbound streams: Encodes RPC Responses sent to peers.
impl<TSpec: EthSpec> Encoder for SSZInboundCodec<TSpec> { impl<TSpec: EthSpec> Encoder<RPCErrorResponse<TSpec>> for SSZInboundCodec<TSpec> {
type Item = RPCErrorResponse<TSpec>;
type Error = RPCError; type Error = RPCError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { fn encode(&mut self, item: RPCErrorResponse<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item { let bytes = match item {
RPCErrorResponse::Success(resp) => match resp { RPCErrorResponse::Success(resp) => match resp {
RPCResponse::Status(res) => res.as_ssz_bytes(), RPCResponse::Status(res) => res.as_ssz_bytes(),
@@ -147,11 +146,10 @@ impl<TSpec: EthSpec> SSZOutboundCodec<TSpec> {
} }
// Encoder for outbound streams: Encodes RPC Requests to peers // Encoder for outbound streams: Encodes RPC Requests to peers
impl<TSpec: EthSpec> Encoder for SSZOutboundCodec<TSpec> { impl<TSpec: EthSpec> Encoder<RPCRequest<TSpec>> for SSZOutboundCodec<TSpec> {
type Item = RPCRequest<TSpec>;
type Error = RPCError; type Error = RPCError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item { let bytes = match item {
RPCRequest::Status(req) => req.as_ssz_bytes(), RPCRequest::Status(req) => req.as_ssz_bytes(),
RPCRequest::Goodbye(req) => req.as_ssz_bytes(), RPCRequest::Goodbye(req) => req.as_ssz_bytes(),
@@ -255,7 +253,7 @@ impl<TSpec: EthSpec> Decoder for SSZOutboundCodec<TSpec> {
} }
} }
impl<TSpec: EthSpec> OutboundCodec for SSZOutboundCodec<TSpec> { impl<TSpec: EthSpec> OutboundCodec<RPCRequest<TSpec>> for SSZOutboundCodec<TSpec> {
type ErrorType = ErrorMessage; type ErrorType = ErrorMessage;
fn decode_error(&mut self, src: &mut BytesMut) -> Result<Option<Self::ErrorType>, RPCError> { fn decode_error(&mut self, src: &mut BytesMut) -> Result<Option<Self::ErrorType>, RPCError> {

View File

@@ -12,7 +12,7 @@ use std::io::Cursor;
use std::io::ErrorKind; use std::io::ErrorKind;
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::marker::PhantomData; use std::marker::PhantomData;
use tokio::codec::{Decoder, Encoder}; use tokio_util::codec::{Decoder, Encoder};
use types::{EthSpec, SignedBeaconBlock}; use types::{EthSpec, SignedBeaconBlock};
use unsigned_varint::codec::Uvi; use unsigned_varint::codec::Uvi;
@@ -44,11 +44,10 @@ impl<T: EthSpec> SSZSnappyInboundCodec<T> {
} }
// Encoder for inbound streams: Encodes RPC Responses sent to peers. // Encoder for inbound streams: Encodes RPC Responses sent to peers.
impl<TSpec: EthSpec> Encoder for SSZSnappyInboundCodec<TSpec> { impl<TSpec: EthSpec> Encoder<RPCErrorResponse<TSpec>> for SSZSnappyInboundCodec<TSpec> {
type Item = RPCErrorResponse<TSpec>;
type Error = RPCError; type Error = RPCError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { fn encode(&mut self, item: RPCErrorResponse<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item { let bytes = match item {
RPCErrorResponse::Success(resp) => match resp { RPCErrorResponse::Success(resp) => match resp {
RPCResponse::Status(res) => res.as_ssz_bytes(), RPCResponse::Status(res) => res.as_ssz_bytes(),
@@ -197,11 +196,10 @@ impl<TSpec: EthSpec> SSZSnappyOutboundCodec<TSpec> {
} }
// Encoder for outbound streams: Encodes RPC Requests to peers // Encoder for outbound streams: Encodes RPC Requests to peers
impl<TSpec: EthSpec> Encoder for SSZSnappyOutboundCodec<TSpec> { impl<TSpec: EthSpec> Encoder<RPCRequest<TSpec>> for SSZSnappyOutboundCodec<TSpec> {
type Item = RPCRequest<TSpec>;
type Error = RPCError; type Error = RPCError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item { let bytes = match item {
RPCRequest::Status(req) => req.as_ssz_bytes(), RPCRequest::Status(req) => req.as_ssz_bytes(),
RPCRequest::Goodbye(req) => req.as_ssz_bytes(), RPCRequest::Goodbye(req) => req.as_ssz_bytes(),
@@ -312,7 +310,7 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyOutboundCodec<TSpec> {
} }
} }
impl<TSpec: EthSpec> OutboundCodec for SSZSnappyOutboundCodec<TSpec> { impl<TSpec: EthSpec> OutboundCodec<RPCRequest<TSpec>> for SSZSnappyOutboundCodec<TSpec> {
type ErrorType = ErrorMessage; type ErrorType = ErrorMessage;
fn decode_error(&mut self, src: &mut BytesMut) -> Result<Option<Self::ErrorType>, RPCError> { fn decode_error(&mut self, src: &mut BytesMut) -> Result<Option<Self::ErrorType>, RPCError> {

View File

@@ -15,9 +15,10 @@ use libp2p::swarm::protocols_handler::{
use slog::{crit, debug, error, trace, warn}; use slog::{crit, debug, error, trace, warn};
use smallvec::SmallVec; use smallvec::SmallVec;
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::task::{Context, Poll};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tokio::timer::{delay_queue, DelayQueue}; use tokio::time::{delay_queue, DelayQueue};
use types::EthSpec; use types::EthSpec;
//TODO: Implement close() on the substream types to improve the poll code. //TODO: Implement close() on the substream types to improve the poll code.
@@ -37,7 +38,7 @@ type InboundRequestId = RequestId;
type OutboundRequestId = RequestId; type OutboundRequestId = RequestId;
/// Implementation of `ProtocolsHandler` for the RPC protocol. /// Implementation of `ProtocolsHandler` for the RPC protocol.
pub struct RPCHandler<TSubstream, TSpec> pub struct RPCHandler<'a, TSubstream, TSpec>
where where
TSubstream: AsyncRead + AsyncWrite, TSubstream: AsyncRead + AsyncWrite,
TSpec: EthSpec, TSpec: EthSpec,
@@ -61,7 +62,7 @@ where
inbound_substreams: FnvHashMap< inbound_substreams: FnvHashMap<
InboundRequestId, InboundRequestId,
( (
InboundSubstreamState<TSubstream, TSpec>, InboundSubstreamState<'a, TSubstream, TSpec>,
Option<delay_queue::Key>, Option<delay_queue::Key>,
), ),
>, >,
@@ -100,21 +101,18 @@ where
/// Logger for handling RPC streams /// Logger for handling RPC streams
log: slog::Logger, log: slog::Logger,
/// Marker to pin the generic stream.
_phantom: PhantomData<TSubstream>,
} }
/// State of an outbound substream. Either waiting for a response, or in the process of sending. /// State of an outbound substream. Either waiting for a response, or in the process of sending.
pub enum InboundSubstreamState<TSubstream, TSpec> pub enum InboundSubstreamState<'a, TSubstream, TSpec>
where where
TSubstream: AsyncRead + AsyncWrite, TSubstream: AsyncRead + AsyncWrite + Unpin,
TSpec: EthSpec, TSpec: EthSpec,
{ {
/// A response has been sent, pending writing and flush. /// A response has been sent, pending writing and flush.
ResponsePendingSend { ResponsePendingSend {
/// The substream used to send the response /// The substream used to send the response
substream: futures::sink::Send<InboundFramed<TSubstream, TSpec>>, substream: futures::sink::Send<'a, InboundFramed<TSubstream, TSpec>, RPCErrorResponse<TSpec>>,
/// Whether a stream termination is requested. If true the stream will be closed after /// Whether a stream termination is requested. If true the stream will be closed after
/// this send. Otherwise it will transition to an idle state until a stream termination is /// this send. Otherwise it will transition to an idle state until a stream termination is
/// requested or a timeout is reached. /// requested or a timeout is reached.
@@ -144,9 +142,8 @@ pub enum OutboundSubstreamState<TSubstream, TSpec: EthSpec> {
Poisoned, Poisoned,
} }
impl<TSubstream, TSpec> InboundSubstreamState<TSubstream, TSpec> impl<'a, TSubstream, TSpec> InboundSubstreamState<'a, TSpec, TSubstream>
where where
TSubstream: AsyncRead + AsyncWrite,
TSpec: EthSpec, TSpec: EthSpec,
{ {
/// Moves the substream state to closing and informs the connected peer. The /// Moves the substream state to closing and informs the connected peer. The
@@ -191,7 +188,7 @@ where
} }
} }
impl<TSubstream, TSpec> RPCHandler<TSubstream, TSpec> impl<'a, TSubstream, TSpec> RPCHandler<'a, TSubstream, TSpec>
where where
TSubstream: AsyncRead + AsyncWrite, TSubstream: AsyncRead + AsyncWrite,
TSpec: EthSpec, TSpec: EthSpec,
@@ -218,7 +215,6 @@ where
inactive_timeout, inactive_timeout,
outbound_io_error_retries: 0, outbound_io_error_retries: 0,
log: log.clone(), log: log.clone(),
_phantom: PhantomData,
} }
} }
@@ -251,15 +247,14 @@ where
} }
} }
impl<TSubstream, TSpec> ProtocolsHandler for RPCHandler<TSubstream, TSpec> impl<'a, TSubstream, TSpec> ProtocolsHandler for RPCHandler<'a, TSubstream, TSpec>
where where
TSubstream: AsyncRead + AsyncWrite, TSubstream: AsyncRead + AsyncWrite + Unpin,
TSpec: EthSpec, TSpec: EthSpec,
{ {
type InEvent = RPCEvent<TSpec>; type InEvent = RPCEvent<TSpec>;
type OutEvent = RPCEvent<TSpec>; type OutEvent = RPCEvent<TSpec>;
type Error = ProtocolsHandlerUpgrErr<RPCError>; type Error = ProtocolsHandlerUpgrErr<RPCError>;
type Substream = TSubstream;
type InboundProtocol = RPCProtocol<TSpec>; type InboundProtocol = RPCProtocol<TSpec>;
type OutboundProtocol = RPCRequest<TSpec>; type OutboundProtocol = RPCRequest<TSpec>;
type OutboundOpenInfo = RPCEvent<TSpec>; // Keep track of the id and the request type OutboundOpenInfo = RPCEvent<TSpec>; // Keep track of the id and the request
@@ -429,7 +424,7 @@ where
&mut self, &mut self,
request: Self::OutboundOpenInfo, request: Self::OutboundOpenInfo,
error: ProtocolsHandlerUpgrErr< error: ProtocolsHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error, <Self::OutboundProtocol as OutboundUpgrade<libp2p::swarm::NegotiatedSubstream>>::Error,
>, >,
) { ) {
if let ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(RPCError::IoError(_))) = error { if let ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(RPCError::IoError(_))) = error {
@@ -458,8 +453,12 @@ where
fn poll( fn poll(
&mut self, &mut self,
) -> Poll< ) -> Poll<
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>, ProtocolsHandlerEvent<
Self::Error, Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Self::Error,
>,
> { > {
if let Some((request_id, err)) = self.pending_error.pop() { if let Some((request_id, err)) = self.pending_error.pop() {
// Returning an error here will result in dropping the peer. // Returning an error here will result in dropping the peer.
@@ -472,53 +471,52 @@ where
// other clients testing their software. In the future, we will need to decide // other clients testing their software. In the future, we will need to decide
// which protocols are a bare minimum to support before kicking the peer. // which protocols are a bare minimum to support before kicking the peer.
error!(self.log, "Peer doesn't support the RPC protocol"; "protocol" => protocol_string); error!(self.log, "Peer doesn't support the RPC protocol"; "protocol" => protocol_string);
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(RPCEvent::Error(
RPCEvent::Error(request_id, RPCError::InvalidProtocol(protocol_string)), request_id,
))); RPCError::InvalidProtocol(protocol_string),
))));
} }
ProtocolsHandlerUpgrErr::Timeout | ProtocolsHandlerUpgrErr::Timer => { ProtocolsHandlerUpgrErr::Timeout | ProtocolsHandlerUpgrErr::Timer => {
// negotiation timeout, mark the request as failed // negotiation timeout, mark the request as failed
debug!(self.log, "Active substreams before timeout"; "len" => self.outbound_substreams.len()); debug!(self.log, "Active substreams before timeout"; "len" => self.outbound_substreams.len());
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(RPCEvent::Error(
RPCEvent::Error( request_id,
request_id, RPCError::Custom("Protocol negotiation timeout".into()),
RPCError::Custom("Protocol negotiation timeout".into()), ))));
),
)));
} }
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)) => { ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)) => {
// IO/Decode/Custom Error, report to the application // IO/Decode/Custom Error, report to the application
debug!(self.log, "Upgrade Error"; "error" => format!("{}",err)); debug!(self.log, "Upgrade Error"; "error" => format!("{}",err));
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(RPCEvent::Error(
RPCEvent::Error(request_id, err), request_id, err,
))); ))));
} }
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => { ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => {
// Error during negotiation // Error during negotiation
debug!(self.log, "Upgrade Error"; "error" => format!("{}",err)); debug!(self.log, "Upgrade Error"; "error" => format!("{}",err));
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(RPCEvent::Error(
RPCEvent::Error(request_id, RPCError::Custom(format!("{}", err))), request_id,
))); RPCError::Custom(format!("{}", err)),
))));
} }
} }
} }
// return any events that need to be reported // return any events that need to be reported
if !self.events_out.is_empty() { if !self.events_out.is_empty() {
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(self.events_out.remove(0))));
self.events_out.remove(0),
)));
} else { } else {
self.events_out.shrink_to_fit(); self.events_out.shrink_to_fit();
} }
// purge expired inbound substreams and send an error // purge expired inbound substreams and send an error
while let Async::Ready(Some(stream_id)) = // TODO: check if this pattern is equivalent to
self.inbound_substreams_delay.poll().map_err(|e| { // while let Async::Ready() = stream.poll().map_err(..)
while let Poll::Ready(Some(d)) = self.inbound_substreams_delay.poll() {
let stream_id = d.map_err(|e| {
warn!(self.log, "Inbound substream poll failed"; "error" => format!("{:?}", e)); warn!(self.log, "Inbound substream poll failed"; "error" => format!("{:?}", e));
ProtocolsHandlerUpgrErr::Timer ProtocolsHandlerUpgrErr::Timer
})? })?;
{
let rpc_id = stream_id.get_ref(); let rpc_id = stream_id.get_ref();
// handle a stream timeout for various states // handle a stream timeout for various states
@@ -535,15 +533,16 @@ where
} }
// purge expired outbound substreams // purge expired outbound substreams
if let Async::Ready(Some(stream_id)) = if let Poll::Ready(Some(d)) =
self.outbound_substreams_delay.poll().map_err(|e| { self.outbound_substreams_delay.poll() {
let stream_id = d.map_err(|e| {
warn!(self.log, "Outbound substream poll failed"; "error" => format!("{:?}", e)); warn!(self.log, "Outbound substream poll failed"; "error" => format!("{:?}", e));
ProtocolsHandlerUpgrErr::Timer ProtocolsHandlerUpgrErr::Timer
})? })?;
{
self.outbound_substreams.remove(stream_id.get_ref()); self.outbound_substreams.remove(stream_id.get_ref());
// notify the user // notify the user
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(
RPCEvent::Error( RPCEvent::Error(
*stream_id.get_ref(), *stream_id.get_ref(),
RPCError::Custom("Stream timed out".into()), RPCError::Custom("Stream timed out".into()),
@@ -569,7 +568,7 @@ where
closing, closing,
} => { } => {
match substream.poll() { match substream.poll() {
Ok(Async::Ready(raw_substream)) => { Poll::Ready(Ok(raw_substream)) => {
// completed the send // completed the send
// close the stream if required // close the stream if required
@@ -587,7 +586,7 @@ where
); );
} }
} }
Ok(Async::NotReady) => { Poll::Pending => {
entry.get_mut().0 = entry.get_mut().0 =
InboundSubstreamState::ResponsePendingSend { InboundSubstreamState::ResponsePendingSend {
substream, substream,
@@ -599,7 +598,7 @@ where
self.inbound_substreams_delay.remove(delay_key); self.inbound_substreams_delay.remove(delay_key);
} }
entry.remove_entry(); entry.remove_entry();
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(0, e), RPCEvent::Error(0, e),
))); )));
} }
@@ -613,8 +612,9 @@ where
); );
} }
InboundSubstreamState::Closing(mut substream) => { InboundSubstreamState::Closing(mut substream) => {
// TODO: check if this is supposed to be a stream
match substream.close() { match substream.close() {
Ok(Async::Ready(())) | Err(_) => { Poll::Ready(_) => {
//trace!(self.log, "Inbound stream dropped"); //trace!(self.log, "Inbound stream dropped");
if let Some(delay_key) = &entry.get().1 { if let Some(delay_key) = &entry.get().1 {
self.inbound_substreams_delay.remove(delay_key); self.inbound_substreams_delay.remove(delay_key);
@@ -630,7 +630,7 @@ where
); );
} }
} // drop the stream } // drop the stream
Ok(Async::NotReady) => { Poll::Pending => {
entry.get_mut().0 = entry.get_mut().0 =
InboundSubstreamState::Closing(substream); InboundSubstreamState::Closing(substream);
} }
@@ -659,7 +659,7 @@ where
mut substream, mut substream,
request, request,
} => match substream.poll() { } => match substream.poll() {
Ok(Async::Ready(Some(response))) => { Poll::Ready(Some(Ok(response))) => {
if request.multiple_responses() && !response.is_error() { if request.multiple_responses() && !response.is_error() {
entry.get_mut().0 = entry.get_mut().0 =
OutboundSubstreamState::RequestPendingResponse { OutboundSubstreamState::RequestPendingResponse {
@@ -677,11 +677,11 @@ where
entry.get_mut().0 = OutboundSubstreamState::Closing(substream); entry.get_mut().0 = OutboundSubstreamState::Closing(substream);
} }
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(
RPCEvent::Response(request_id, response), RPCEvent::Response(request_id, response),
))); )));
} }
Ok(Async::Ready(None)) => { Poll::Ready(None) => {
// stream closed // stream closed
// if we expected multiple streams send a stream termination, // if we expected multiple streams send a stream termination,
// else report the stream terminating only. // else report the stream terminating only.
@@ -693,7 +693,7 @@ where
// notify the application error // notify the application error
if request.multiple_responses() { if request.multiple_responses() {
// return an end of stream result // return an end of stream result
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(
RPCEvent::Response( RPCEvent::Response(
request_id, request_id,
RPCErrorResponse::StreamTermination( RPCErrorResponse::StreamTermination(
@@ -702,7 +702,7 @@ where
), ),
))); )));
} // else we return an error, stream should not have closed early. } // else we return an error, stream should not have closed early.
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(
RPCEvent::Error( RPCEvent::Error(
request_id, request_id,
RPCError::Custom( RPCError::Custom(
@@ -711,24 +711,25 @@ where
), ),
))); )));
} }
Ok(Async::NotReady) => { Poll::Pending => {
entry.get_mut().0 = OutboundSubstreamState::RequestPendingResponse { entry.get_mut().0 = OutboundSubstreamState::RequestPendingResponse {
substream, substream,
request, request,
} }
} }
Err(e) => { Poll::Ready(Some(Err(e))) => {
// drop the stream // drop the stream
let delay_key = &entry.get().1; let delay_key = &entry.get().1;
self.outbound_substreams_delay.remove(delay_key); self.outbound_substreams_delay.remove(delay_key);
entry.remove_entry(); entry.remove_entry();
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(request_id, e), RPCEvent::Error(request_id, e),
))); )));
} }
}, },
OutboundSubstreamState::Closing(mut substream) => match substream.close() { OutboundSubstreamState::Closing(mut substream) => match substream.close() {
Ok(Async::Ready(())) | Err(_) => { // TODO: check if this is supposed to be a stream
Poll::Ready(_)=> {
//trace!(self.log, "Outbound stream dropped"); //trace!(self.log, "Outbound stream dropped");
// drop the stream // drop the stream
let delay_key = &entry.get().1; let delay_key = &entry.get().1;
@@ -742,7 +743,7 @@ where
KeepAlive::Until(Instant::now() + self.inactive_timeout); KeepAlive::Until(Instant::now() + self.inactive_timeout);
} }
} }
Ok(Async::NotReady) => { Poll::Pending => {
entry.get_mut().0 = OutboundSubstreamState::Closing(substream); entry.get_mut().0 = OutboundSubstreamState::Closing(substream);
} }
}, },
@@ -762,7 +763,7 @@ where
let rpc_event = self.dial_queue.remove(0); let rpc_event = self.dial_queue.remove(0);
self.dial_queue.shrink_to_fit(); self.dial_queue.shrink_to_fit();
if let RPCEvent::Request(id, req) = rpc_event { if let RPCEvent::Request(id, req) = rpc_event {
return Ok(Async::Ready( return Poll::Ready(Ok(
ProtocolsHandlerEvent::OutboundSubstreamRequest { ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(req.clone()), protocol: SubstreamProtocol::new(req.clone()),
info: RPCEvent::Request(id, req), info: RPCEvent::Request(id, req),
@@ -770,16 +771,16 @@ where
)); ));
} }
} }
Ok(Async::NotReady) Poll::Pending
} }
} }
// Check for new items to send to the peer and update the underlying stream // Check for new items to send to the peer and update the underlying stream
fn apply_queued_responses<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec>( fn apply_queued_responses<'a, TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec>(
raw_substream: InboundFramed<TSubstream, TSpec>, raw_substream: InboundFramed<TSubstream, TSpec>,
queued_outbound_items: &mut Option<&mut Vec<RPCErrorResponse<TSpec>>>, queued_outbound_items: &mut Option<&mut Vec<RPCErrorResponse<TSpec>>>,
new_items_to_send: &mut bool, new_items_to_send: &mut bool,
) -> InboundSubstreamState<TSubstream, TSpec> { ) -> InboundSubstreamState<'a, TSubstream, TSpec> {
match queued_outbound_items { match queued_outbound_items {
Some(ref mut queue) if !queue.is_empty() => { Some(ref mut queue) if !queue.is_empty() => {
*new_items_to_send = true; *new_items_to_send = true;

View File

@@ -4,7 +4,6 @@
//! direct peer-to-peer communication primarily for sending/receiving chain information for //! direct peer-to-peer communication primarily for sending/receiving chain information for
//! syncing. //! syncing.
use futures::prelude::*;
use handler::RPCHandler; use handler::RPCHandler;
use libp2p::core::ConnectedPoint; use libp2p::core::ConnectedPoint;
use libp2p::swarm::{ use libp2p::swarm::{
@@ -22,6 +21,7 @@ use std::marker::PhantomData;
use std::time::Duration; use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use types::EthSpec; use types::EthSpec;
use std::task::{Poll, Context};
pub(crate) mod codec; pub(crate) mod codec;
mod handler; mod handler;
@@ -94,12 +94,12 @@ impl<TSubstream, TSpec: EthSpec> RPC<TSubstream, TSpec> {
} }
} }
impl<TSubstream, TSpec> NetworkBehaviour for RPC<TSubstream, TSpec> impl<'a, TSubstream, TSpec> NetworkBehaviour for RPC<TSubstream, TSpec>
where where
TSubstream: AsyncRead + AsyncWrite, TSubstream: AsyncRead + AsyncWrite,
TSpec: EthSpec, TSpec: EthSpec,
{ {
type ProtocolsHandler = RPCHandler<TSubstream, TSpec>; type ProtocolsHandler = RPCHandler<'a, TSubstream, TSpec>;
type OutEvent = RPCMessage<TSpec>; type OutEvent = RPCMessage<TSpec>;
fn new_handler(&mut self) -> Self::ProtocolsHandler { fn new_handler(&mut self) -> Self::ProtocolsHandler {
@@ -151,7 +151,7 @@ where
)); ));
} }
fn inject_node_event( fn inject_event(
&mut self, &mut self,
source: PeerId, source: PeerId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent, event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
@@ -165,17 +165,18 @@ where
fn poll( fn poll(
&mut self, &mut self,
_cx: &mut Context,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Async< ) -> Poll<
NetworkBehaviourAction< NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent, <Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent, Self::OutEvent,
>, >,
> { > {
if !self.events.is_empty() { if !self.events.is_empty() {
return Async::Ready(self.events.remove(0)); return Poll::Ready(self.events.remove(0));
} }
Async::NotReady Poll::Pending
} }
} }

View File

@@ -11,17 +11,17 @@ use crate::rpc::{
methods::ResponseTermination, methods::ResponseTermination,
}; };
use futures::future::*; use futures::future::*;
use futures::{future, sink, stream, Sink, Stream}; use futures::{future, sink, stream};
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo};
use std::io; use std::io;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::time::Duration; use std::time::Duration;
use tokio::codec::Framed;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tokio::timer::timeout; use tokio::time::Timeout;
use tokio::util::FutureExt;
use tokio_io_timeout::TimeoutStream; use tokio_io_timeout::TimeoutStream;
use tokio_util::codec::Framed;
use types::EthSpec; use types::EthSpec;
use std::pin::Pin;
/// The maximum bytes that can be sent across the RPC. /// The maximum bytes that can be sent across the RPC.
const MAX_RPC_SIZE: usize = 1_048_576; // 1M const MAX_RPC_SIZE: usize = 1_048_576; // 1M
@@ -171,12 +171,13 @@ impl ProtocolName for ProtocolId {
pub type InboundOutput<TSocket, TSpec> = (RPCRequest<TSpec>, InboundFramed<TSocket, TSpec>); pub type InboundOutput<TSocket, TSpec> = (RPCRequest<TSpec>, InboundFramed<TSocket, TSpec>);
pub type InboundFramed<TSocket, TSpec> = pub type InboundFramed<TSocket, TSpec> =
Framed<TimeoutStream<upgrade::Negotiated<TSocket>>, InboundCodec<TSpec>>; Framed<Timeout<upgrade::Negotiated<TSocket>>, InboundCodec<TSpec, RPCErrorResponse<TSpec>>>;
type FnAndThen<TSocket, TSpec> = fn( type FnAndThen<TSocket, TSpec> = fn(
(Option<RPCRequest<TSpec>>, InboundFramed<TSocket, TSpec>), (Option<RPCRequest<TSpec>>, InboundFramed<TSocket, TSpec>),
) -> FutureResult<InboundOutput<TSocket, TSpec>, RPCError>; ) -> Ready<Result<InboundOutput<TSocket, TSpec>, RPCError>>;
type FnMapErr<TSocket, TSpec> = // TODO: Error doesn't take a generic parameter in new tokio
fn(timeout::Error<(RPCError, InboundFramed<TSocket, TSpec>)>) -> RPCError; // Need to check implications
type FnMapErr = fn(tokio::time::Error) -> RPCError;
impl<TSocket, TSpec> InboundUpgrade<TSocket> for RPCProtocol<TSpec> impl<TSocket, TSpec> InboundUpgrade<TSocket> for RPCProtocol<TSpec>
where where
@@ -187,13 +188,10 @@ where
type Error = RPCError; type Error = RPCError;
type Future = future::Either< type Future = future::Either<
FutureResult<InboundOutput<TSocket, TSpec>, RPCError>, Ready<Result<InboundOutput<TSocket, TSpec>, RPCError>>,
future::AndThen< future::AndThen<
future::MapErr< future::MapErr<Timeout<stream::StreamFuture<InboundFramed<TSocket, TSpec>>>, FnMapErr>,
timeout::Timeout<stream::StreamFuture<InboundFramed<TSocket, TSpec>>>, Ready<Result<InboundOutput<TSocket, TSpec>, RPCError>>,
FnMapErr<TSocket, TSpec>,
>,
FutureResult<InboundOutput<TSocket, TSpec>, RPCError>,
FnAndThen<TSocket, TSpec>, FnAndThen<TSocket, TSpec>,
>, >,
>; >;
@@ -351,16 +349,18 @@ impl<TSpec: EthSpec> RPCRequest<TSpec> {
/* Outbound upgrades */ /* Outbound upgrades */
pub type OutboundFramed<TSocket, TSpec> = pub type OutboundFramed<TSocket, TSpec> =
Framed<upgrade::Negotiated<TSocket>, OutboundCodec<TSpec>>; Framed<upgrade::Negotiated<TSocket>, OutboundCodec<TSpec, RPCRequest<TSpec>>>;
impl<TSocket, TSpec> OutboundUpgrade<TSocket> for RPCRequest<TSpec> impl<TSocket, TSpec> OutboundUpgrade<TSocket> for RPCRequest<TSpec>
where where
TSpec: EthSpec, TSpec: EthSpec,
TSocket: AsyncRead + AsyncWrite, TSocket: AsyncRead + AsyncWrite + Send,
{ {
type Output = OutboundFramed<TSocket, TSpec>; type Output = OutboundFramed<TSocket, TSpec>;
type Error = RPCError; type Error = RPCError;
type Future = sink::Send<OutboundFramed<TSocket, TSpec>>; // TODO: Send takes a mutable reference to the sink now, hence the lifetime parameter
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
// type Future = sink::Send<'a, &'a mut OutboundFramed<TSocket, TSpec>, RPCRequest<TSpec>>;
fn upgrade_outbound( fn upgrade_outbound(
self, self,
socket: upgrade::Negotiated<TSocket>, socket: upgrade::Negotiated<TSocket>,
@@ -378,7 +378,7 @@ where
OutboundCodec::SSZ(ssz_codec) OutboundCodec::SSZ(ssz_codec)
} }
}; };
Framed::new(socket, codec).send(self) Box::pin(Framed::new(socket, codec).send(self))
} }
} }
@@ -416,8 +416,8 @@ impl From<ssz::DecodeError> for RPCError {
RPCError::SSZDecodeError(err) RPCError::SSZDecodeError(err)
} }
} }
impl<T> From<tokio::timer::timeout::Error<T>> for RPCError { impl From<tokio::time::Error> for RPCError {
fn from(err: tokio::timer::timeout::Error<T>) -> Self { fn from(err: tokio::time::Error) -> Self {
if err.is_elapsed() { if err.is_elapsed() {
RPCError::StreamTimeout RPCError::StreamTimeout
} else { } else {

View File

@@ -9,7 +9,7 @@ use libp2p::core::{
identity::Keypair, identity::Keypair,
multiaddr::Multiaddr, multiaddr::Multiaddr,
muxing::StreamMuxerBox, muxing::StreamMuxerBox,
nodes::Substream, connection::Substream,
transport::boxed::Boxed, transport::boxed::Boxed,
upgrade::{InboundUpgradeExt, OutboundUpgradeExt}, upgrade::{InboundUpgradeExt, OutboundUpgradeExt},
ConnectedPoint, ConnectedPoint,
@@ -21,8 +21,10 @@ use std::io::prelude::*;
use std::io::{Error, ErrorKind}; use std::io::{Error, ErrorKind};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::timer::DelayQueue; use tokio::time::DelayQueue;
use types::{EnrForkId, EthSpec}; use types::{EnrForkId, EthSpec};
use std::pin::Pin;
use std::task::{Poll, Context};
type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>;
type Libp2pBehaviour<TSpec> = Behaviour<Substream<StreamMuxerBox>, TSpec>; type Libp2pBehaviour<TSpec> = Behaviour<Substream<StreamMuxerBox>, TSpec>;
@@ -180,17 +182,16 @@ impl<TSpec: EthSpec> Service<TSpec> {
} }
impl<TSpec: EthSpec> Stream for Service<TSpec> { impl<TSpec: EthSpec> Stream for Service<TSpec> {
type Item = BehaviourEvent<TSpec>; type Item = Result<BehaviourEvent<TSpec>, error::Error>;
type Error = error::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop { loop {
match self.swarm.poll() { match self.swarm.poll() {
Ok(Async::Ready(Some(event))) => { Poll::Ready(Some(event)) => {
return Ok(Async::Ready(Some(event))); return Poll::Ready(Some(event));
} }
Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"), Ok(Poll::Ready(None)) => unreachable!("Swarm stream shouldn't end"),
Ok(Async::NotReady) => break, Ok(Poll::Pending) => break,
_ => break, _ => break,
} }
} }
@@ -198,7 +199,7 @@ impl<TSpec: EthSpec> Stream for Service<TSpec> {
// check if peers need to be banned // check if peers need to be banned
loop { loop {
match self.peers_to_ban.poll() { match self.peers_to_ban.poll() {
Ok(Async::Ready(Some(peer_id))) => { Poll::Ready(Some(Ok(peer_id))) => {
let peer_id = peer_id.into_inner(); let peer_id = peer_id.into_inner();
Swarm::ban_peer_id(&mut self.swarm, peer_id.clone()); Swarm::ban_peer_id(&mut self.swarm, peer_id.clone());
// TODO: Correctly notify protocols of the disconnect // TODO: Correctly notify protocols of the disconnect
@@ -213,8 +214,8 @@ impl<TSpec: EthSpec> Stream for Service<TSpec> {
// inform the behaviour that the peer has been banned // inform the behaviour that the peer has been banned
self.swarm.peer_banned(peer_id); self.swarm.peer_banned(peer_id);
} }
Ok(Async::NotReady) | Ok(Async::Ready(None)) => break, Poll::Pending | Poll::Ready(None) => break,
Err(e) => { Poll::Ready(Some(Err(e))) => {
warn!(self.log, "Peer banning queue failed"; "error" => format!("{:?}", e)); warn!(self.log, "Peer banning queue failed"; "error" => format!("{:?}", e));
} }
} }
@@ -223,20 +224,20 @@ impl<TSpec: EthSpec> Stream for Service<TSpec> {
// un-ban peer if it's timeout has expired // un-ban peer if it's timeout has expired
loop { loop {
match self.peer_ban_timeout.poll() { match self.peer_ban_timeout.poll() {
Ok(Async::Ready(Some(peer_id))) => { Poll::Ready(Some(Ok(peer_id))) => {
let peer_id = peer_id.into_inner(); let peer_id = peer_id.into_inner();
debug!(self.log, "Peer has been unbanned"; "peer" => format!("{:?}", peer_id)); debug!(self.log, "Peer has been unbanned"; "peer" => format!("{:?}", peer_id));
self.swarm.peer_unbanned(&peer_id); self.swarm.peer_unbanned(&peer_id);
Swarm::unban_peer_id(&mut self.swarm, peer_id); Swarm::unban_peer_id(&mut self.swarm, peer_id);
} }
Ok(Async::NotReady) | Ok(Async::Ready(None)) => break, Poll::Pending | Poll::Ready(None) => break,
Err(e) => { Poll::Ready(Some(Err(e))) => {
warn!(self.log, "Peer banning timeout queue failed"; "error" => format!("{:?}", e)); warn!(self.log, "Peer banning timeout queue failed"; "error" => format!("{:?}", e));
} }
} }
} }
Ok(Async::NotReady) Poll::Pending
} }
} }