diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 476f16fd1f..cde39bf18b 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -179,36 +179,31 @@ impl Service { } } +// TODO: Convert to an async function via building a stored stream from libp2p swarm impl Stream for Service { type Item = Result, error::Error>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { - match self.swarm.poll() { + match self.swarm.poll_next_unpin(cx) { Poll::Ready(Some(event)) => { - return Poll::Ready(Some(event)); + return Poll::Ready(Some(Ok(event))); } - Ok(Poll::Ready(None)) => unreachable!("Swarm stream shouldn't end"), - Ok(Poll::Pending) => break, + Poll::Ready(None) => unreachable!("Swarm stream shouldn't end"), + Poll::Pending => break, _ => break, } } // check if peers need to be banned loop { - match self.peers_to_ban.poll() { + match self.peers_to_ban.poll_next_unpin(cx) { Poll::Ready(Some(Ok(peer_id))) => { let peer_id = peer_id.into_inner(); Swarm::ban_peer_id(&mut self.swarm, peer_id.clone()); // TODO: Correctly notify protocols of the disconnect // TODO: Also remove peer from the DHT: https://github.com/sigp/lighthouse/issues/629 - let dummy_connected_point = ConnectedPoint::Dialer { - address: "/ip4/0.0.0.0" - .parse::() - .expect("valid multiaddr"), - }; - self.swarm - .inject_disconnected(&peer_id, dummy_connected_point); + self.swarm.inject_disconnected(&peer_id); // inform the behaviour that the peer has been banned self.swarm.peer_banned(peer_id); } @@ -221,7 +216,7 @@ impl Stream for Service { // un-ban peer if it's timeout has expired loop { - match self.peer_ban_timeout.poll() { + match self.peer_ban_timeout.poll_next_unpin(cx) { Poll::Ready(Some(Ok(peer_id))) => { let peer_id = peer_id.into_inner(); debug!(self.log, "Peer has been unbanned"; "peer" => format!("{:?}", peer_id)); @@ -241,58 +236,32 @@ impl Stream for Service { /// The implementation supports TCP/IP, WebSockets over TCP/IP, noise/secio as the encryption layer, and /// mplex or yamux as the multiplexing layer. -fn build_transport(local_private_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> { +fn build_transport( + local_private_key: Keypair, +) -> Result, io::Error> { // TODO: The Wire protocol currently doesn't specify encryption and this will need to be customised // in the future. let transport = libp2p::tcp::TcpConfig::new().nodelay(true); - let transport = libp2p::dns::DnsConfig::new(transport); + let transport = libp2p::dns::DnsConfig::new(transport)?; #[cfg(feature = "libp2p-websocket")] let transport = { let trans_clone = transport.clone(); transport.or_transport(websocket::WsConfig::new(trans_clone)) - }; - // Authentication - let transport = transport - .and_then(move |stream, endpoint| { - let upgrade = core::upgrade::SelectUpgrade::new( - generate_noise_config(&local_private_key), - secio::SecioConfig::new(local_private_key), - ); - core::upgrade::apply(stream, upgrade, endpoint, core::upgrade::Version::V1).and_then( - move |out| { - match out { - // Noise was negotiated - core::either::EitherOutput::First((remote_id, out)) => { - Ok((core::either::EitherOutput::First(out), remote_id)) - } - // Secio was negotiated - core::either::EitherOutput::Second((remote_id, out)) => { - Ok((core::either::EitherOutput::Second(out), remote_id)) - } - } - }, - ) - }) - .timeout(Duration::from_secs(20)); - - // Multiplexing - let transport = transport - .and_then(move |(stream, peer_id), endpoint| { - let peer_id2 = peer_id.clone(); - let upgrade = core::upgrade::SelectUpgrade::new( - libp2p::yamux::Config::default(), - libp2p::mplex::MplexConfig::new(), - ) - .map_inbound(move |muxer| (peer_id, muxer)) - .map_outbound(move |muxer| (peer_id2, muxer)); - - core::upgrade::apply(stream, upgrade, endpoint, core::upgrade::Version::V1) - .map(|(id, muxer)| (id, core::muxing::StreamMuxerBox::new(muxer))) - }) + }?; + transport + .upgrade(core::upgrade::Version::V1) + .authenticate(core::upgrade::SelectUpgrade::new( + generate_noise_config(&local_private_key), + secio::SecioConfig::new(local_private_key), + )) + .multiplex(core::upgrade::SelectUpgrade::new( + yamux::Config::default(), + mplex::MplexConfig::new(), + )) + .map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer))) .timeout(Duration::from_secs(20)) .map_err(|err| Error::new(ErrorKind::Other, err)) - .boxed(); - transport + .boxed() } fn keypair_from_hex(hex_bytes: &str) -> error::Result {