Complete initial testing of new RPC

This commit is contained in:
Age Manning
2019-07-23 22:45:42 +10:00
parent b350a78fec
commit 89ff7fb6b8
6 changed files with 54 additions and 126 deletions

View File

@@ -10,12 +10,11 @@ use eth2_libp2p::{
};
use futures::future::Future;
use futures::stream::Stream;
use slog::{debug, error, warn};
use ssz::Decode;
use std::collections::HashMap;
use slog::{debug, warn};
use ssz::{Decode, DecodeError};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
use types::BeaconBlockHeader;
/// Handles messages received from the network and client and organises syncing.
pub struct MessageHandler<T: BeaconChainTypes> {
@@ -97,8 +96,6 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
HandlerMessage::PubsubMessage(peer_id, gossip) => {
self.handle_gossip(peer_id, *gossip);
}
//TODO: Handle all messages
_ => {}
}
}
@@ -115,7 +112,6 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
/// A new RPC request has been received from the network.
fn handle_rpc_request(&mut self, peer_id: PeerId, request_id: RequestId, request: RPCRequest) {
// TODO: process the `id`.
match request {
RPCRequest::Hello(hello_message) => self.sync.on_hello_request(
peer_id,
@@ -158,30 +154,6 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
id: RequestId,
error_response: RPCErrorResponse,
) {
//TODO: Potentially do not need to keep track of this at all. This has all been shifted
//into libp2p stack. Tracking Id's will only be necessary if a response is important
//relative to a specific request. Note: BeaconBlockBodies already returns with the data
//associated with its request.
// Currently leave this here for testing, to ensure it is redundant.
if self
.network_context
.outstanding_outgoing_request_ids
.remove(&(peer_id.clone(), id))
.is_none()
{
// This should never happen. The RPC layer handles all timeouts and ensures a response
// matches a request.
debug_assert!(false);
error!(
self.log,
"Unknown ResponseId for incoming RPCRequest";
"peer" => format!("{:?}", peer_id),
"request_id" => format!("{}", id)
);
return;
}
// an error could have occurred.
// TODO: Handle Error gracefully
match error_response {
@@ -214,25 +186,31 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
);
}
RPCResponse::BeaconBlockHeaders(response) => {
if let Some(decoded_block_headers) = self.decode_block_headers(response) {
self.sync.on_beacon_block_headers_response(
peer_id,
decoded_block_headers,
&mut self.network_context,
);
} else {
warn!(self.log, "Peer sent invalid block headers";"peer" => format!("{:?}", peer_id))
match self.decode_block_headers(response) {
Ok(decoded_block_headers) => {
self.sync.on_beacon_block_headers_response(
peer_id,
decoded_block_headers,
&mut self.network_context,
);
}
Err(_e) => {
warn!(self.log, "Peer sent invalid block headers";"peer" => format!("{:?}", peer_id))
}
}
}
RPCResponse::BeaconBlockBodies(response) => {
if let Some(decoded_block_bodies) = self.decode_block_bodies(response) {
self.sync.on_beacon_block_bodies_response(
peer_id,
decoded_block_bodies,
&mut self.network_context,
);
} else {
warn!(self.log, "Peer sent invalid block bodies";"peer" => format!("{:?}", peer_id))
match self.decode_block_bodies(response) {
Ok(decoded_block_bodies) => {
self.sync.on_beacon_block_bodies_response(
peer_id,
decoded_block_bodies,
&mut self.network_context,
);
}
Err(_e) => {
warn!(self.log, "Peer sent invalid block bodies";"peer" => format!("{:?}", peer_id))
}
}
}
RPCResponse::BeaconChainState(_) => {
@@ -252,27 +230,24 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
fn decode_block_bodies(
&self,
bodies_response: BeaconBlockBodiesResponse,
) -> Option<DecodedBeaconBlockBodiesResponse> {
) -> Result<DecodedBeaconBlockBodiesResponse, DecodeError> {
//TODO: Implement faster block verification before decoding entirely
if let Ok(simple_decoded_bodies) = simple_decoded_bodies {
Some(DecodedBeaconBlockBodiesResponse {
block_roots: bodies_response
.block_roots
.expect("Responses must have associated roots"),
block_bodies: Vec::from_ssz_bytes(&bodies_response.block_bodies).unwrap(),
})
} else {
None
}
let block_bodies = Vec::from_ssz_bytes(&bodies_response.block_bodies)?;
Ok(DecodedBeaconBlockBodiesResponse {
block_roots: bodies_response
.block_roots
.expect("Responses must have associated roots"),
block_bodies,
})
}
/// Verifies and decodes the ssz-encoded block headers received from peers.
fn decode_block_headers(
&self,
headers_response: BeaconBlockHeadersResponse,
) -> Option<EncodeableBeaconBlockHeadersResponse> {
) -> Result<Vec<BeaconBlockHeader>, DecodeError> {
//TODO: Implement faster header verification before decoding entirely
EncodeableBeaconBlockHeadersResponse::from_ssz_bytes(&headers_response.headers).ok()
Vec::from_ssz_bytes(&headers_response.headers)
}
/// Handle various RPC errors
@@ -297,25 +272,17 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
}
}
// TODO: RPC Rewrite makes this struct fairly pointless
pub struct NetworkContext {
/// The network channel to relay messages to the Network service.
network_send: mpsc::UnboundedSender<NetworkMessage>,
/// A mapping of peers and the RPC id we have sent an RPC request to.
outstanding_outgoing_request_ids: HashMap<(PeerId, RequestId), Instant>,
/// Stores the next `RequestId` we should include on an outgoing `RPCRequest` to a `PeerId`.
outgoing_request_ids: HashMap<PeerId, RequestId>,
/// The `MessageHandler` logger.
log: slog::Logger,
}
impl NetworkContext {
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage>, log: slog::Logger) -> Self {
Self {
network_send,
outstanding_outgoing_request_ids: HashMap::new(),
outgoing_request_ids: HashMap::new(),
log,
}
Self { network_send, log }
}
pub fn disconnect(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
@@ -324,12 +291,9 @@ impl NetworkContext {
}
pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) {
let id = self.generate_request_id(&peer_id);
self.outstanding_outgoing_request_ids
.insert((peer_id.clone(), id), Instant::now());
self.send_rpc_event(peer_id, RPCEvent::Request(id, rpc_request));
// Note: There is currently no use of keeping track of requests. However the functionality
// is left here for future revisions.
self.send_rpc_event(peer_id, RPCEvent::Request(0, rpc_request));
}
//TODO: Handle Error responses
@@ -359,15 +323,4 @@ impl NetworkContext {
)
});
}
/// Returns the next `RequestId` for sending an `RPCRequest` to the `peer_id`.
fn generate_request_id(&mut self, peer_id: &PeerId) -> RequestId {
let next_id = self
.outgoing_request_ids
.entry(peer_id.clone())
.and_modify(|id| *id += 1)
.or_insert_with(|| 0);
*next_id
}
}