Merge branch 'master' into eth1-deploy

This commit is contained in:
Paul Hauner
2019-11-29 13:17:06 +11:00
10 changed files with 101 additions and 102 deletions

View File

@@ -22,7 +22,6 @@ pub use libp2p::enr::Enr;
pub use libp2p::gossipsub::{Topic, TopicHash};
pub use libp2p::multiaddr;
pub use libp2p::Multiaddr;
pub use libp2p::{core::ConnectedPoint, swarm::NetworkBehaviour};
pub use libp2p::{
gossipsub::{GossipsubConfig, GossipsubConfigBuilder},
PeerId, Swarm,

View File

@@ -1,7 +1,9 @@
use crate::rpc::methods::*;
use crate::rpc::{
codec::base::OutboundCodec,
protocol::{ProtocolId, RPCError},
protocol::{
ProtocolId, RPCError, RPC_BLOCKS_BY_RANGE, RPC_BLOCKS_BY_ROOT, RPC_GOODBYE, RPC_STATUS,
},
};
use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse};
use libp2p::bytes::{BufMut, Bytes, BytesMut};
@@ -43,7 +45,6 @@ impl Encoder for SSZInboundCodec {
RPCResponse::Status(res) => res.as_ssz_bytes(),
RPCResponse::BlocksByRange(res) => res, // already raw bytes
RPCResponse::BlocksByRoot(res) => res, // already raw bytes
RPCResponse::Goodbye => unreachable!("Never encode or decode this message"),
}
}
RPCErrorResponse::InvalidRequest(err) => err.as_ssz_bytes(),
@@ -76,25 +77,25 @@ impl Decoder for SSZInboundCodec {
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self.inner.decode(src).map_err(RPCError::from) {
Ok(Some(packet)) => match self.protocol.message_name.as_str() {
"status" => match self.protocol.version.as_str() {
RPC_STATUS => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes(
&packet,
)?))),
_ => unreachable!("Cannot negotiate an unknown version"),
},
"goodbye" => match self.protocol.version.as_str() {
RPC_GOODBYE => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes(
&packet,
)?))),
_ => unreachable!("Cannot negotiate an unknown version"),
},
"blocks_by_range" => match self.protocol.version.as_str() {
RPC_BLOCKS_BY_RANGE => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCRequest::BlocksByRange(
BlocksByRangeRequest::from_ssz_bytes(&packet)?,
))),
_ => unreachable!("Cannot negotiate an unknown version"),
},
"blocks_by_root" => match self.protocol.version.as_str() {
RPC_BLOCKS_BY_ROOT => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: Vec::from_ssz_bytes(&packet)?,
}))),
@@ -164,18 +165,18 @@ impl Decoder for SSZOutboundCodec {
// clear the buffer and return an empty object
src.clear();
match self.protocol.message_name.as_str() {
"status" => match self.protocol.version.as_str() {
RPC_STATUS => match self.protocol.version.as_str() {
"1" => Err(RPCError::Custom(
"Status stream terminated unexpectedly".into(),
)), // cannot have an empty HELLO message. The stream has terminated unexpectedly
_ => unreachable!("Cannot negotiate an unknown version"),
},
"goodbye" => Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")),
"blocks_by_range" => match self.protocol.version.as_str() {
RPC_GOODBYE => Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")),
RPC_BLOCKS_BY_RANGE => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCResponse::BlocksByRange(Vec::new()))),
_ => unreachable!("Cannot negotiate an unknown version"),
},
"blocks_by_root" => match self.protocol.version.as_str() {
RPC_BLOCKS_BY_ROOT => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCResponse::BlocksByRoot(Vec::new()))),
_ => unreachable!("Cannot negotiate an unknown version"),
},
@@ -188,20 +189,20 @@ impl Decoder for SSZOutboundCodec {
let raw_bytes = packet.take();
match self.protocol.message_name.as_str() {
"status" => match self.protocol.version.as_str() {
RPC_STATUS => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCResponse::Status(StatusMessage::from_ssz_bytes(
&raw_bytes,
)?))),
_ => unreachable!("Cannot negotiate an unknown version"),
},
"goodbye" => {
RPC_GOODBYE => {
Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response"))
}
"blocks_by_range" => match self.protocol.version.as_str() {
RPC_BLOCKS_BY_RANGE => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCResponse::BlocksByRange(raw_bytes.to_vec()))),
_ => unreachable!("Cannot negotiate an unknown version"),
},
"blocks_by_root" => match self.protocol.version.as_str() {
RPC_BLOCKS_BY_ROOT => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCResponse::BlocksByRoot(raw_bytes.to_vec()))),
_ => unreachable!("Cannot negotiate an unknown version"),
},

View File

@@ -1,4 +1,4 @@
use super::methods::{RPCErrorResponse, RPCResponse, RequestId};
use super::methods::{RPCErrorResponse, RequestId};
use super::protocol::{RPCError, RPCProtocol, RPCRequest};
use super::RPCEvent;
use crate::rpc::protocol::{InboundFramed, OutboundFramed};
@@ -208,7 +208,6 @@ where
// drop the stream and return a 0 id for goodbye "requests"
if let r @ RPCRequest::Goodbye(_) = req {
self.events_out.push(RPCEvent::Request(0, r));
warn!(self.log, "Goodbye Received");
return;
}
@@ -245,14 +244,6 @@ where
// add the stream to substreams if we expect a response, otherwise drop the stream.
match rpc_event {
RPCEvent::Request(id, RPCRequest::Goodbye(_)) => {
// notify the application layer, that a goodbye has been sent, so the application can
// drop and remove the peer
self.events_out.push(RPCEvent::Response(
id,
RPCErrorResponse::Success(RPCResponse::Goodbye),
));
}
RPCEvent::Request(id, request) if request.expect_response() => {
// new outbound request. Store the stream and tag the output.
let delay_key = self

View File

@@ -139,9 +139,6 @@ pub enum RPCResponse {
/// A response to a get BLOCKS_BY_ROOT request.
BlocksByRoot(Vec<u8>),
/// A Goodbye message has been sent
Goodbye,
}
/// Indicates which response is being terminated by a stream termination response.
@@ -208,7 +205,6 @@ impl RPCErrorResponse {
RPCResponse::Status(_) => false,
RPCResponse::BlocksByRange(_) => true,
RPCResponse::BlocksByRoot(_) => true,
RPCResponse::Goodbye => false,
},
RPCErrorResponse::InvalidRequest(_) => true,
RPCErrorResponse::ServerError(_) => true,
@@ -252,7 +248,6 @@ impl std::fmt::Display for RPCResponse {
RPCResponse::Status(status) => write!(f, "{}", status),
RPCResponse::BlocksByRange(_) => write!(f, "<BlocksByRange>"),
RPCResponse::BlocksByRoot(_) => write!(f, "<BlocksByRoot>"),
RPCResponse::Goodbye => write!(f, "Goodbye Sent"),
}
}
}

View File

@@ -31,6 +31,16 @@ const TTFB_TIMEOUT: u64 = 5;
/// established before the stream is terminated.
const REQUEST_TIMEOUT: u64 = 15;
/// Protocol names to be used.
/// The Status protocol name.
pub const RPC_STATUS: &str = "status";
/// The Goodbye protocol name.
pub const RPC_GOODBYE: &str = "goodbye";
/// The `BlocksByRange` protocol name.
pub const RPC_BLOCKS_BY_RANGE: &str = "beacon_blocks_by_range";
/// The `BlocksByRoot` protocol name.
pub const RPC_BLOCKS_BY_ROOT: &str = "beacon_blocks_by_root";
#[derive(Debug, Clone)]
pub struct RPCProtocol;
@@ -40,10 +50,10 @@ impl UpgradeInfo for RPCProtocol {
fn protocol_info(&self) -> Self::InfoIter {
vec![
ProtocolId::new("status", "1", "ssz"),
ProtocolId::new("goodbye", "1", "ssz"),
ProtocolId::new("blocks_by_range", "1", "ssz"),
ProtocolId::new("blocks_by_root", "1", "ssz"),
ProtocolId::new(RPC_STATUS, "1", "ssz"),
ProtocolId::new(RPC_GOODBYE, "1", "ssz"),
ProtocolId::new(RPC_BLOCKS_BY_RANGE, "1", "ssz"),
ProtocolId::new(RPC_BLOCKS_BY_ROOT, "1", "ssz"),
]
}
}
@@ -171,10 +181,10 @@ impl RPCRequest {
pub fn supported_protocols(&self) -> Vec<ProtocolId> {
match self {
// add more protocols when versions/encodings are supported
RPCRequest::Status(_) => vec![ProtocolId::new("status", "1", "ssz")],
RPCRequest::Goodbye(_) => vec![ProtocolId::new("goodbye", "1", "ssz")],
RPCRequest::BlocksByRange(_) => vec![ProtocolId::new("blocks_by_range", "1", "ssz")],
RPCRequest::BlocksByRoot(_) => vec![ProtocolId::new("blocks_by_root", "1", "ssz")],
RPCRequest::Status(_) => vec![ProtocolId::new(RPC_STATUS, "1", "ssz")],
RPCRequest::Goodbye(_) => vec![ProtocolId::new(RPC_GOODBYE, "1", "ssz")],
RPCRequest::BlocksByRange(_) => vec![ProtocolId::new(RPC_BLOCKS_BY_RANGE, "1", "ssz")],
RPCRequest::BlocksByRoot(_) => vec![ProtocolId::new(RPC_BLOCKS_BY_ROOT, "1", "ssz")],
}
}

View File

@@ -9,19 +9,24 @@ use futures::prelude::*;
use futures::Stream;
use libp2p::core::{
identity::Keypair, multiaddr::Multiaddr, muxing::StreamMuxerBox, nodes::Substream,
transport::boxed::Boxed,
transport::boxed::Boxed, ConnectedPoint,
};
use libp2p::{core, secio, PeerId, Swarm, Transport};
use libp2p::{core, secio, swarm::NetworkBehaviour, PeerId, Swarm, Transport};
use slog::{crit, debug, info, trace, warn};
use smallvec::SmallVec;
use std::fs::File;
use std::io::prelude::*;
use std::io::{Error, ErrorKind};
use std::time::Duration;
use std::time::Instant;
type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>;
type Libp2pBehaviour = Behaviour<Substream<StreamMuxerBox>>;
const NETWORK_KEY_FILENAME: &str = "key";
/// The time in milliseconds to wait before banning a peer. This allows for any Goodbye messages to be
/// flushed and protocols to be negotiated.
const BAN_PEER_TIMEOUT: u64 = 200;
/// The configuration and state of the libp2p components for the beacon node.
pub struct Service {
@@ -32,8 +37,11 @@ pub struct Service {
/// This node's PeerId.
pub local_peer_id: PeerId,
/// A current list of peers to ban after a given timeout.
peers_to_ban: SmallVec<[(PeerId, Instant); 4]>,
/// Indicates if the listening address have been verified and compared to the expected ENR.
pub verified_listen_address: bool,
verified_listen_address: bool,
/// The libp2p logger handle.
pub log: slog::Logger,
@@ -156,10 +164,19 @@ impl Service {
Ok(Service {
local_peer_id,
swarm,
peers_to_ban: SmallVec::new(),
verified_listen_address: false,
log,
})
}
/// Adds a peer to be banned after a timeout period.
pub fn disconnect_and_ban_peer(&mut self, peer_id: PeerId) {
self.peers_to_ban.push((
peer_id,
Instant::now() + Duration::from_millis(BAN_PEER_TIMEOUT),
));
}
}
impl Stream for Service {
@@ -200,22 +217,43 @@ impl Stream for Service {
}
},
Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"),
Ok(Async::NotReady) => {
// check to see if the address is different to the config. If so, update our ENR
if !self.verified_listen_address {
let multiaddr = Swarm::listeners(&self.swarm).next();
if let Some(multiaddr) = multiaddr {
if let Some(socket_addr) = multiaddr_to_socket_addr(multiaddr) {
self.swarm.update_local_enr_socket(socket_addr, true);
}
}
}
break;
}
Ok(Async::NotReady) => break,
_ => break,
}
}
// swarm is not ready
// check to see if the address is different to the config. If so, update our ENR
if !self.verified_listen_address {
let multiaddr = Swarm::listeners(&self.swarm).next();
if let Some(multiaddr) = multiaddr {
if let Some(socket_addr) = multiaddr_to_socket_addr(multiaddr) {
self.swarm.update_local_enr_socket(socket_addr, true);
}
}
}
// check if there are peers to ban
while !self.peers_to_ban.is_empty() {
if self.peers_to_ban[0].1 < Instant::now() {
let (peer_id, _) = self.peers_to_ban.remove(0);
warn!(self.log, "Disconnecting and banning peer"; "peer_id" => format!("{:?}", peer_id));
Swarm::ban_peer_id(&mut self.swarm, peer_id.clone());
// TODO: Correctly notify protocols of the disconnect
// TODO: Also remove peer from the DHT: https://github.com/sigp/lighthouse/issues/629
let dummy_connected_point = ConnectedPoint::Dialer {
address: "/ip4/0.0.0.0"
.parse::<Multiaddr>()
.expect("valid multiaddr"),
};
self.swarm
.inject_disconnected(&peer_id, dummy_connected_point);
// inform the behaviour that the peer has been banned
self.swarm.peer_banned(peer_id);
} else {
break;
}
}
Ok(Async::NotReady)
}
}