Shift changes into message handler and simple sync for rpc-rewrite

This commit is contained in:
Age Manning
2019-07-16 22:32:37 +10:00
parent 704263e35f
commit 414d41cb57
11 changed files with 486 additions and 227 deletions

View File

@@ -9,13 +9,10 @@ use futures::{
sink, stream, Sink, Stream,
};
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use ssz::Encode;
use ssz_derive::{Decode, Encode};
use std::io;
use std::time::Duration;
use tokio::codec::Framed;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::prelude::future::MapErr;
use tokio::prelude::*;
use tokio::timer::timeout;
use tokio::util::FutureExt;
@@ -24,25 +21,25 @@ use tokio::util::FutureExt;
const MAX_RPC_SIZE: usize = 4_194_304; // 4M
/// The protocol prefix the RPC protocol id.
const PROTOCOL_PREFIX: &str = "/eth/serenity/rpc/";
/// The number of seconds to wait for a response before the stream is terminated.
const RESPONSE_TIMEOUT: u64 = 10;
/// The number of seconds to wait for a request once a protocol has been established before the stream is terminated.
const REQUEST_TIMEOUT: u64 = 3;
/// Implementation of the `ConnectionUpgrade` for the RPC protocol.
#[derive(Debug, Clone)]
pub struct RPCProtocol;
impl UpgradeInfo for RPCProtocol {
type Info = &'static [u8];
type Info = RawProtocolId;
type InfoIter = Vec<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
vec![
b"/eth/serenity/rpc/hello/1.0.0/ssz",
b"/eth/serenity/rpc/goodbye/1.0.0/ssz",
b"/eth/serenity/rpc/beacon_block_roots/1.0.0/ssz",
b"/eth/serenity/rpc/beacon_block_headers/1.0.0/ssz",
b"/eth/serenity/rpc/beacon_block_bodies/1.0.0/ssz",
b"/eth/serenity/rpc/beacon_chain_state/1.0.0/ssz",
ProtocolId::new("hello", "1.0.0", "ssz").into(),
ProtocolId::new("goodbye", "1.0.0", "ssz").into(),
ProtocolId::new("beacon_block_roots", "1.0.0", "ssz").into(),
ProtocolId::new("beacon_block_headers", "1.0.0", "ssz").into(),
ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz").into(),
ProtocolId::new("beacon_chain_state", "1.0.0", "ssz").into(),
]
}
}
@@ -106,16 +103,18 @@ impl Into<RawProtocolId> for ProtocolId {
// The inbound protocol reads the request, decodes it and returns the stream to the protocol
// handler to respond to once ready.
type InboundFramed<TSocket> = Framed<upgrade::Negotiated<TSocket>, InboundCodec>;
type FnAndThen<TSocket> =
fn((Option<RPCRequest>, InboundFramed<TSocket>)) -> FutureResult<RPCRequest, RPCError>;
pub type InboundOutput<TSocket> = (RPCRequest, InboundFramed<TSocket>);
pub type InboundFramed<TSocket> = Framed<upgrade::Negotiated<TSocket>, InboundCodec>;
type FnAndThen<TSocket> = fn(
(Option<RPCRequest>, InboundFramed<TSocket>),
) -> FutureResult<InboundOutput<TSocket>, RPCError>;
type FnMapErr<TSocket> = fn(timeout::Error<(RPCError, InboundFramed<TSocket>)>) -> RPCError;
impl<TSocket> InboundUpgrade<TSocket> for RPCProtocol
where
TSocket: AsyncRead + AsyncWrite,
{
type Output = RPCRequest;
type Output = InboundOutput<TSocket>;
type Error = RPCError;
type Future = future::AndThen<
@@ -123,31 +122,34 @@ where
timeout::Timeout<stream::StreamFuture<InboundFramed<TSocket>>>,
FnMapErr<TSocket>,
>,
FutureResult<RPCRequest, RPCError>,
FutureResult<InboundOutput<TSocket>, RPCError>,
FnAndThen<TSocket>,
>;
fn upgrade_inbound(
self,
socket: upgrade::Negotiated<TSocket>,
protocol: &'static [u8],
protocol: RawProtocolId,
) -> Self::Future {
// TODO: Verify this
let protocol_id =
ProtocolId::from_bytes(protocol).expect("Can decode all supported protocols");
ProtocolId::from_bytes(&protocol).expect("Can decode all supported protocols");
match protocol_id.encoding.as_str() {
"ssz" | _ => {
let ssz_codec = BaseInboundCodec::new(SSZInboundCodec::new(protocol_id, 4096));
let ssz_codec =
BaseInboundCodec::new(SSZInboundCodec::new(protocol_id, MAX_RPC_SIZE));
let codec = InboundCodec::SSZ(ssz_codec);
Framed::new(socket, codec)
.into_future()
.timeout(Duration::from_secs(RESPONSE_TIMEOUT))
.timeout(Duration::from_secs(REQUEST_TIMEOUT))
.map_err(RPCError::from as FnMapErr<TSocket>)
.and_then({
|(madouby, _)| match madouby {
Some(x) => futures::future::ok(x),
None => futures::future::err(RPCError::Custom("Go home".into())),
|(req, stream)| match req {
Some(req) => futures::future::ok((req, stream)),
None => futures::future::err(RPCError::Custom(
"Stream terminated early".into(),
)),
}
} as FnAndThen<TSocket>)
}
@@ -163,7 +165,7 @@ where
#[derive(Debug, Clone)]
pub enum RPCRequest {
Hello(HelloMessage),
Goodbye(Goodbye),
Goodbye(GoodbyeReason),
BeaconBlockRoots(BeaconBlockRootsRequest),
BeaconBlockHeaders(BeaconBlockHeadersRequest),
BeaconBlockBodies(BeaconBlockBodiesRequest),
@@ -202,28 +204,12 @@ impl RPCRequest {
}
}
/// Encodes the Request object based on the negotiated protocol.
pub fn encode(&self, protocol: ProtocolId) -> Result<Vec<u8>, RPCError> {
// Match on the encoding and in the future, the version
match protocol.encoding.as_str() {
"ssz" => Ok(self.ssz_encode()),
_ => {
return Err(RPCError::Custom(format!(
"Unknown Encoding: {}",
protocol.encoding
)))
}
}
}
fn ssz_encode(&self) -> Vec<u8> {
/// This specifies whether a stream should remain open and await a response, given a request.
/// A GOODBYE request has no response.
pub fn expect_response(&self) -> bool {
match self {
RPCRequest::Hello(req) => req.as_ssz_bytes(),
RPCRequest::Goodbye(req) => req.as_ssz_bytes(),
RPCRequest::BeaconBlockRoots(req) => req.as_ssz_bytes(),
RPCRequest::BeaconBlockHeaders(req) => req.as_ssz_bytes(),
RPCRequest::BeaconBlockBodies(req) => req.as_ssz_bytes(),
RPCRequest::BeaconChainState(req) => req.as_ssz_bytes(),
RPCRequest::Goodbye(_) => false,
_ => true,
}
}
}
@@ -232,7 +218,7 @@ impl RPCRequest {
/* Outbound upgrades */
type OutboundFramed<TSocket> = Framed<upgrade::Negotiated<TSocket>, OutboundCodec>;
pub type OutboundFramed<TSocket> = Framed<upgrade::Negotiated<TSocket>, OutboundCodec>;
impl<TSocket> OutboundUpgrade<TSocket> for RPCRequest
where
@@ -323,11 +309,11 @@ impl std::error::Error for RPCError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match *self {
RPCError::ReadError(ref err) => Some(err),
RPCError::SSZDecodeError(ref err) => None,
RPCError::InvalidProtocol(ref err) => None,
RPCError::SSZDecodeError(_) => None,
RPCError::InvalidProtocol(_) => None,
RPCError::IoError(ref err) => Some(err),
RPCError::StreamTimeout => None,
RPCError::Custom(ref err) => None,
RPCError::Custom(_) => None,
}
}
}