Stable futures (#879)

* Port eth1 lib to use stable futures

* Port eth1_test_rig to stable futures

* Port eth1 tests to stable futures

* Port genesis service to stable futures

* Port genesis tests to stable futures

* Port beacon_chain to stable futures

* Port lcli to stable futures

* Fix eth1_test_rig (#1014)

* Fix lcli

* Port timer to stable futures

* Fix timer

* Port websocket_server to stable futures

* Port notifier to stable futures

* Add TODOS

* Update hashmap hashset to stable futures

* Adds panic test to hashset delay

* Port remote_beacon_node to stable futures

* Fix lcli merge conflicts

* Non rpc stuff compiles

* protocol.rs compiles

* Port websockets, timer and notifier to stable futures (#1035)

* Fix lcli

* Port timer to stable futures

* Fix timer

* Port websocket_server to stable futures

* Port notifier to stable futures

* Add TODOS

* Port remote_beacon_node to stable futures

* Partial eth2-libp2p stable future upgrade

* Finished first round of fighting RPC types

* Further progress towards porting eth2-libp2p adds caching to discovery

* Update behaviour

* RPC handler to stable futures

* Update RPC to master libp2p

* Network service additions

* Fix the fallback transport construction (#1102)

* Correct warning

* Remove hashmap delay

* Compiling version of eth2-libp2p

* Update all crates versions

* Fix conversion function and add tests (#1113)

* Port validator_client to stable futures (#1114)

* Add PH & MS slot clock changes

* Account for genesis time

* Add progress on duties refactor

* Add simple is_aggregator bool to val subscription

* Start work on attestation_verification.rs

* Add progress on ObservedAttestations

* Progress with ObservedAttestations

* Fix tests

* Add observed attestations to the beacon chain

* Add attestation observation to processing code

* Add progress on attestation verification

* Add first draft of ObservedAttesters

* Add more tests

* Add observed attesters to beacon chain

* Add observers to attestation processing

* Add more attestation verification

* Create ObservedAggregators map

* Remove commented-out code

* Add observed aggregators into chain

* Add progress

* Finish adding features to attestation verification

* Ensure beacon chain compiles

* Link attn verification into chain

* Integrate new attn verification in chain

* Remove old attestation processing code

* Start trying to fix beacon_chain tests

* Split adding into pools into two functions

* Add aggregation to harness

* Get test harness working again

* Adjust the number of aggregators for test harness

* Fix edge-case in harness

* Integrate new attn processing in network

* Fix compile bug in validator_client

* Update validator API endpoints

* Fix aggreagation in test harness

* Fix enum thing

* Fix attestation observation bug:

* Patch failing API tests

* Start adding comments to attestation verification

* Remove unused attestation field

* Unify "is block known" logic

* Update comments

* Supress fork choice errors for network processing

* Add todos

* Tidy

* Add gossip attn tests

* Disallow test harness to produce old attns

* Comment out in-progress tests

* Partially address pruning tests

* Fix failing store test

* Add aggregate tests

* Add comments about which spec conditions we check

* Dont re-aggregate

* Split apart test harness attn production

* Fix compile error in network

* Make progress on commented-out test

* Fix skipping attestation test

* Add fork choice verification tests

* Tidy attn tests, remove dead code

* Remove some accidentally added code

* Fix clippy lint

* Rename test file

* Add block tests, add cheap block proposer check

* Rename block testing file

* Add observed_block_producers

* Tidy

* Switch around block signature verification

* Finish block testing

* Remove gossip from signature tests

* First pass of self review

* Fix deviation in spec

* Update test spec tags

* Start moving over to hashset

* Finish moving observed attesters to hashmap

* Move aggregation pool over to hashmap

* Make fc attn borrow again

* Fix rest_api compile error

* Fix missing comments

* Fix monster test

* Uncomment increasing slots test

* Address remaining comments

* Remove unsafe, use cfg test

* Remove cfg test flag

* Fix dodgy comment

* Revert "Update hashmap hashset to stable futures"

This reverts commit d432378a3c.

* Revert "Adds panic test to hashset delay"

This reverts commit 281502396f.

* Ported attestation_service

* Ported duties_service

* Ported fork_service

* More ports

* Port block_service

* Minor fixes

* VC compiles

* Update TODOS

* Borrow self where possible

* Ignore aggregates that are already known.

* Unify aggregator modulo logic

* Fix typo in logs

* Refactor validator subscription logic

* Avoid reproducing selection proof

* Skip HTTP call if no subscriptions

* Rename DutyAndState -> DutyAndProof

* Tidy logs

* Print root as dbg

* Fix compile errors in tests

* Fix compile error in test

* Re-Fix attestation and duties service

* Minor fixes

Co-authored-by: Paul Hauner <paul@paulhauner.com>

* Network crate update to stable futures

* Port account_manager to stable futures (#1121)

* Port account_manager to stable futures

* Run async fns in tokio environment

* Port rest_api crate to stable futures (#1118)

* Port rest_api lib to stable futures

* Reduce tokio features

* Update notifier to stable futures

* Builder update

* Further updates

* Convert self referential async functions

* stable futures fixes (#1124)

* Fix eth1 update functions

* Fix genesis and client

* Fix beacon node lib

* Return appropriate runtimes from environment

* Fix test rig

* Refactor eth1 service update

* Upgrade simulator to stable futures

* Lighthouse compiles on stable futures

* Remove println debugging statement

* Update libp2p service, start rpc test upgrade

* Update network crate for new libp2p

* Update tokio::codec to futures_codec (#1128)

* Further work towards RPC corrections

* Correct http timeout and network service select

* Use tokio runtime for libp2p

* Revert "Update tokio::codec to futures_codec (#1128)"

This reverts commit e57aea924a.

* Upgrade RPC libp2p tests

* Upgrade secio fallback test

* Upgrade gossipsub examples

* Clean up RPC protocol

* Test fixes (#1133)

* Correct websocket timeout and run on os thread

* Fix network test

* Clean up PR

* Correct tokio tcp move attestation service tests

* Upgrade attestation service tests

* Correct network test

* Correct genesis test

* Test corrections

* Log info when block is received

* Modify logs and update attester service events

* Stable futures: fixes to vc, eth1 and account manager (#1142)

* Add local testnet scripts

* Remove whiteblock script

* Rename local testnet script

* Move spawns onto handle

* Fix VC panic

* Initial fix to block production issue

* Tidy block producer fix

* Tidy further

* Add local testnet clean script

* Run cargo fmt

* Tidy duties service

* Tidy fork service

* Tidy ForkService

* Tidy AttestationService

* Tidy notifier

* Ensure await is not suppressed in eth1

* Ensure await is not suppressed in account_manager

* Use .ok() instead of .unwrap_or(())

* RPC decoding test for proto

* Update discv5 and eth2-libp2p deps

* Fix lcli double runtime issue (#1144)

* Handle stream termination and dialing peer errors

* Correct peer_info variant types

* Remove unnecessary warnings

* Handle subnet unsubscription removal and improve logigng

* Add logs around ping

* Upgrade discv5 and improve logging

* Handle peer connection status for multiple connections

* Improve network service logging

* Improve logging around peer manager

* Upgrade swarm poll centralise peer management

* Identify clients on error

* Fix `remove_peer` in sync (#1150)

* remove_peer removes from all chains

* Remove logs

* Fix early return from loop

* Improved logging, fix panic

* Partially correct tests

* Stable futures: Vc sync (#1149)

* Improve syncing heuristic

* Add comments

* Use safer method for tolerance

* Fix tests

* Stable futures: Fix VC bug, update agg pool, add more metrics (#1151)

* Expose epoch processing summary

* Expose participation metrics to prometheus

* Switch to f64

* Reduce precision

* Change precision

* Expose observed attesters metrics

* Add metrics for agg/unagg attn counts

* Add metrics for gossip rx

* Add metrics for gossip tx

* Adds ignored attns to prom

* Add attestation timing

* Add timer for aggregation pool sig agg

* Add write lock timer for agg pool

* Add more metrics to agg pool

* Change map lock code

* Add extra metric to agg pool

* Change lock handling in agg pool

* Change .write() to .read()

* Add another agg pool timer

* Fix for is_aggregator

* Fix pruning bug

Co-authored-by: pawan <pawandhananjay@gmail.com>
Co-authored-by: Paul Hauner <paul@paulhauner.com>
This commit is contained in:
Age Manning
2020-05-17 21:16:48 +10:00
committed by GitHub
parent 21901b1615
commit b6408805a2
165 changed files with 7924 additions and 7733 deletions

View File

@@ -5,7 +5,6 @@ use super::methods::{ErrorMessage, RPCCodedResponse, RequestId, ResponseTerminat
use super::protocol::{Protocol, RPCError, RPCProtocol, RPCRequest};
use super::RPCEvent;
use crate::rpc::protocol::{InboundFramed, OutboundFramed};
use core::marker::PhantomData;
use fnv::FnvHashMap;
use futures::prelude::*;
use libp2p::core::upgrade::{
@@ -14,15 +13,18 @@ use libp2p::core::upgrade::{
use libp2p::swarm::protocols_handler::{
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
};
use libp2p::swarm::NegotiatedSubstream;
use slog::{crit, debug, error, trace, warn};
use smallvec::SmallVec;
use std::collections::hash_map::Entry;
use std::time::{Duration, Instant};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::timer::{delay_queue, DelayQueue};
use std::{
collections::hash_map::Entry,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};
use tokio::time::{delay_queue, DelayQueue};
use types::EthSpec;
//TODO: Implement close() on the substream types to improve the poll code.
//TODO: Implement check_timeout() on the substream types
/// The time (in seconds) before a substream that is awaiting a response from the user times out.
@@ -39,9 +41,8 @@ type InboundRequestId = RequestId;
type OutboundRequestId = RequestId;
/// Implementation of `ProtocolsHandler` for the RPC protocol.
pub struct RPCHandler<TSubstream, TSpec>
pub struct RPCHandler<TSpec>
where
TSubstream: AsyncRead + AsyncWrite,
TSpec: EthSpec,
{
/// The upgrade for inbound substreams.
@@ -63,7 +64,7 @@ where
inbound_substreams: FnvHashMap<
InboundRequestId,
(
InboundSubstreamState<TSubstream, TSpec>,
InboundSubstreamState<TSpec>,
Option<delay_queue::Key>,
Protocol,
),
@@ -74,14 +75,8 @@ where
/// Map of outbound substreams that need to be driven to completion. The `RequestId` is
/// maintained by the application sending the request.
outbound_substreams: FnvHashMap<
OutboundRequestId,
(
OutboundSubstreamState<TSubstream, TSpec>,
delay_queue::Key,
Protocol,
),
>,
outbound_substreams:
FnvHashMap<OutboundRequestId, (OutboundSubstreamState<TSpec>, delay_queue::Key, Protocol)>,
/// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout.
outbound_substreams_delay: DelayQueue<OutboundRequestId>,
@@ -107,21 +102,27 @@ where
/// Logger for handling RPC streams
log: slog::Logger,
/// Marker to pin the generic stream.
_phantom: PhantomData<TSubstream>,
}
/// State of an outbound substream. Either waiting for a response, or in the process of sending.
pub enum InboundSubstreamState<TSubstream, TSpec>
pub enum InboundSubstreamState<TSpec>
where
TSubstream: AsyncRead + AsyncWrite,
TSpec: EthSpec,
{
/// A response has been sent, pending writing and flush.
/// A response has been sent, pending writing.
ResponsePendingSend {
/// The substream used to send the response
substream: futures::sink::Send<InboundFramed<TSubstream, TSpec>>,
substream: InboundFramed<NegotiatedSubstream, TSpec>,
/// The message that is attempting to be sent.
message: RPCCodedResponse<TSpec>,
/// Whether a stream termination is requested. If true the stream will be closed after
/// this send. Otherwise it will transition to an idle state until a stream termination is
/// requested or a timeout is reached.
closing: bool,
},
/// A response has been sent, pending flush.
ResponsePendingFlush {
/// The substream used to send the response
substream: InboundFramed<NegotiatedSubstream, TSpec>,
/// Whether a stream termination is requested. If true the stream will be closed after
/// this send. Otherwise it will transition to an idle state until a stream termination is
/// requested or a timeout is reached.
@@ -129,31 +130,31 @@ where
},
/// The response stream is idle and awaiting input from the application to send more chunked
/// responses.
ResponseIdle(InboundFramed<TSubstream, TSpec>),
ResponseIdle(InboundFramed<NegotiatedSubstream, TSpec>),
/// The substream is attempting to shutdown.
Closing(InboundFramed<TSubstream, TSpec>),
Closing(InboundFramed<NegotiatedSubstream, TSpec>),
/// Temporary state during processing
Poisoned,
}
pub enum OutboundSubstreamState<TSubstream, TSpec: EthSpec> {
/// State of an outbound substream. Either waiting for a response, or in the process of sending.
pub enum OutboundSubstreamState<TSpec: EthSpec> {
/// A request has been sent, and we are awaiting a response. This future is driven in the
/// handler because GOODBYE requests can be handled and responses dropped instantly.
RequestPendingResponse {
/// The framed negotiated substream.
substream: OutboundFramed<TSubstream, TSpec>,
substream: OutboundFramed<NegotiatedSubstream, TSpec>,
/// Keeps track of the actual request sent.
request: RPCRequest<TSpec>,
},
/// Closing an outbound substream>
Closing(OutboundFramed<TSubstream, TSpec>),
Closing(OutboundFramed<NegotiatedSubstream, TSpec>),
/// Temporary state during processing
Poisoned,
}
impl<TSubstream, TSpec> InboundSubstreamState<TSubstream, TSpec>
impl<TSpec> InboundSubstreamState<TSpec>
where
TSubstream: AsyncRead + AsyncWrite,
TSpec: EthSpec,
{
/// Moves the substream state to closing and informs the connected peer. The
@@ -172,18 +173,37 @@ where
RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange);
match std::mem::replace(self, InboundSubstreamState::Poisoned) {
InboundSubstreamState::ResponsePendingSend { substream, closing } => {
// if we are busy awaiting a send/flush add the termination to the queue
InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing,
} => {
if !closing {
outbound_queue.push(error);
outbound_queue.push(stream_termination);
}
// if the stream is closing after the send, allow it to finish
*self = InboundSubstreamState::ResponsePendingSend { substream, closing }
*self = InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing,
}
}
// if we are busy awaiting a send/flush add the termination to the queue
InboundSubstreamState::ResponsePendingFlush { substream, closing } => {
if !closing {
outbound_queue.push(error);
outbound_queue.push(stream_termination);
}
// if the stream is closing after the send, allow it to finish
*self = InboundSubstreamState::ResponsePendingFlush { substream, closing }
}
InboundSubstreamState::ResponseIdle(substream) => {
*self = InboundSubstreamState::ResponsePendingSend {
substream: substream.send(error),
substream: substream,
message: error,
closing: true,
};
}
@@ -198,9 +218,8 @@ where
}
}
impl<TSubstream, TSpec> RPCHandler<TSubstream, TSpec>
impl<TSpec> RPCHandler<TSpec>
where
TSubstream: AsyncRead + AsyncWrite,
TSpec: EthSpec,
{
pub fn new(
@@ -225,7 +244,6 @@ where
inactive_timeout,
outbound_io_error_retries: 0,
log: log.clone(),
_phantom: PhantomData,
}
}
@@ -258,15 +276,13 @@ where
}
}
impl<TSubstream, TSpec> ProtocolsHandler for RPCHandler<TSubstream, TSpec>
impl<TSpec> ProtocolsHandler for RPCHandler<TSpec>
where
TSubstream: AsyncRead + AsyncWrite,
TSpec: EthSpec,
{
type InEvent = RPCEvent<TSpec>;
type OutEvent = RPCEvent<TSpec>;
type Error = ProtocolsHandlerUpgrErr<RPCError>;
type Substream = TSubstream;
type Error = RPCError;
type InboundProtocol = RPCProtocol<TSpec>;
type OutboundProtocol = RPCRequest<TSpec>;
type OutboundOpenInfo = (RequestId, RPCRequest<TSpec>); // Keep track of the id and the request
@@ -277,14 +293,14 @@ where
fn inject_fully_negotiated_inbound(
&mut self,
out: <RPCProtocol<TSpec> as InboundUpgrade<TSubstream>>::Output,
substream: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
) {
// update the keep alive timeout if there are no more remaining outbound streams
if let KeepAlive::Until(_) = self.keep_alive {
self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout);
}
let (req, substream) = out;
let (req, substream) = substream;
// drop the stream and return a 0 id for goodbye "requests"
if let r @ RPCRequest::Goodbye(_) = req {
self.events_out.push(RPCEvent::Request(0, r));
@@ -309,7 +325,7 @@ where
fn inject_fully_negotiated_outbound(
&mut self,
out: <RPCRequest<TSpec> as OutboundUpgrade<TSubstream>>::Output,
out: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
request_info: Self::OutboundOpenInfo,
) {
self.dial_negotiated -= 1;
@@ -394,15 +410,18 @@ where
// if it's a single rpc request or an error, close the stream after
*substream_state =
InboundSubstreamState::ResponsePendingSend {
substream: substream.send(response),
substream: substream,
message: response,
closing: !res_is_multiple | res_is_error, // close if an error or we are not expecting more responses
};
}
}
}
InboundSubstreamState::ResponsePendingSend { substream, closing }
if res_is_multiple =>
{
InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing,
} if res_is_multiple => {
// the stream is in use, add the request to a pending queue
self.queued_outbound_items
.entry(rpc_id)
@@ -411,6 +430,22 @@ where
// return the state
*substream_state = InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing,
};
}
InboundSubstreamState::ResponsePendingFlush { substream, closing }
if res_is_multiple =>
{
// the stream is in use, add the request to a pending queue
self.queued_outbound_items
.entry(rpc_id)
.or_insert_with(Vec::new)
.push(response);
// return the state
*substream_state = InboundSubstreamState::ResponsePendingFlush {
substream,
closing,
};
@@ -419,8 +454,20 @@ where
*substream_state = InboundSubstreamState::Closing(substream);
debug!(self.log, "Response not sent. Stream is closing"; "response" => format!("{}",response));
}
InboundSubstreamState::ResponsePendingSend { substream, .. } => {
InboundSubstreamState::ResponsePendingSend {
substream,
message,
..
} => {
*substream_state = InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing: true,
};
error!(self.log, "Attempted sending multiple responses to a single response request");
}
InboundSubstreamState::ResponsePendingFlush { substream, .. } => {
*substream_state = InboundSubstreamState::ResponsePendingFlush {
substream,
closing: true,
};
@@ -433,7 +480,7 @@ where
}
}
None => {
warn!(self.log, "Stream has expired. Response not sent"; "response" => format!("{}", response));
warn!(self.log, "Stream has expired. Response not sent"; "response" => response.to_string(), "id" => rpc_id);
}
};
}
@@ -446,7 +493,7 @@ where
&mut self,
request_info: Self::OutboundOpenInfo,
error: ProtocolsHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error,
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
>,
) {
let (id, req) = request_info;
@@ -470,7 +517,7 @@ where
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::ProtocolError(e),
)) => match e {
ProtocolError::IoError(io_err) => RPCError::IoError(io_err),
ProtocolError::IoError(io_err) => RPCError::IoError(io_err.to_string()),
ProtocolError::InvalidProtocol => {
RPCError::InternalError("Protocol was deemed invalid")
}
@@ -490,64 +537,82 @@ where
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
Self::Error,
ProtocolsHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Self::Error,
>,
> {
if !self.pending_error.is_empty() {
let (id, protocol, err) = self.pending_error.remove(0);
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(id, protocol, err),
return Poll::Ready(ProtocolsHandlerEvent::Custom(RPCEvent::Error(
id, protocol, err,
)));
}
// return any events that need to be reported
if !self.events_out.is_empty() {
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
self.events_out.remove(0),
)));
return Poll::Ready(ProtocolsHandlerEvent::Custom(self.events_out.remove(0)));
} else {
self.events_out.shrink_to_fit();
}
// purge expired inbound substreams and send an error
while let Async::Ready(Some(stream_id)) =
self.inbound_substreams_delay.poll().map_err(|e| {
warn!(self.log, "Inbound substream poll failed"; "error" => format!("{:?}", e));
ProtocolsHandlerUpgrErr::Timer
})?
{
let rpc_id = stream_id.get_ref();
loop {
match self.inbound_substreams_delay.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(stream_id))) => {
// handle a stream timeout for various states
if let Some((substream_state, delay_key, _)) =
self.inbound_substreams.get_mut(stream_id.get_ref())
{
// the delay has been removed
*delay_key = None;
// handle a stream timeout for various states
if let Some((substream_state, delay_key, _)) = self.inbound_substreams.get_mut(rpc_id) {
// the delay has been removed
*delay_key = None;
let outbound_queue = self
.queued_outbound_items
.entry(*rpc_id)
.or_insert_with(Vec::new);
substream_state.close(outbound_queue);
let outbound_queue = self
.queued_outbound_items
.entry(stream_id.into_inner())
.or_insert_with(Vec::new);
substream_state.close(outbound_queue);
}
}
Poll::Ready(Some(Err(e))) => {
warn!(self.log, "Inbound substream poll failed"; "error" => format!("{:?}", e));
// drops the peer if we cannot read the delay queue
return Poll::Ready(ProtocolsHandlerEvent::Close(RPCError::InternalError(
"Could not poll inbound stream timer",
)));
}
Poll::Pending | Poll::Ready(None) => break,
}
}
// purge expired outbound substreams
if let Async::Ready(Some(stream_id)) =
self.outbound_substreams_delay.poll().map_err(|e| {
warn!(self.log, "Outbound substream poll failed"; "error" => format!("{:?}", e));
ProtocolsHandlerUpgrErr::Timer
})?
{
if let Some((_id, _stream, protocol)) =
self.outbound_substreams.remove(stream_id.get_ref())
{
// notify the user
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(*stream_id.get_ref(), protocol, RPCError::StreamTimeout),
)));
} else {
crit!(self.log, "timed out substream not in the books"; "stream_id" => stream_id.get_ref());
loop {
match self.outbound_substreams_delay.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(stream_id))) => {
if let Some((_id, _stream, protocol)) =
self.outbound_substreams.remove(stream_id.get_ref())
{
// notify the user
return Poll::Ready(ProtocolsHandlerEvent::Custom(RPCEvent::Error(
*stream_id.get_ref(),
protocol,
RPCError::StreamTimeout,
)));
} else {
crit!(self.log, "timed out substream not in the books"; "stream_id" => stream_id.get_ref());
}
}
Poll::Ready(Some(Err(e))) => {
warn!(self.log, "Outbound substream poll failed"; "error" => format!("{:?}", e));
return Poll::Ready(ProtocolsHandlerEvent::Close(RPCError::InternalError(
"Could not poll outbound stream timer",
)));
}
Poll::Pending | Poll::Ready(None) => break,
}
}
@@ -566,20 +631,75 @@ where
) {
InboundSubstreamState::ResponsePendingSend {
mut substream,
message,
closing,
} => {
match substream.poll() {
Ok(Async::Ready(raw_substream)) => {
// completed the send
// close the stream if required
match Sink::poll_ready(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => {
// stream is ready to send data
match Sink::start_send(Pin::new(&mut substream), message) {
Ok(()) => {
// await flush
entry.get_mut().0 =
InboundSubstreamState::ResponsePendingFlush {
substream,
closing,
}
}
Err(e) => {
// error with sending in the codec
warn!(self.log, "Error sending RPC message"; "error" => e.to_string());
// keep connection with the peer and return the
// stream to awaiting response if this message
// wasn't closing the stream
// TODO: Duplicate code
if closing {
entry.get_mut().0 =
InboundSubstreamState::Closing(substream)
} else {
// check for queued chunks and update the stream
entry.get_mut().0 = apply_queued_responses(
substream,
&mut self
.queued_outbound_items
.get_mut(&request_id),
&mut new_items_to_send,
);
}
}
}
}
Poll::Ready(Err(e)) => {
error!(self.log, "Outbound substream error while sending RPC message: {:?}", e);
entry.remove();
return Poll::Ready(ProtocolsHandlerEvent::Close(e));
}
Poll::Pending => {
// the stream is not yet ready, continue waiting
entry.get_mut().0 =
InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing,
};
}
}
}
InboundSubstreamState::ResponsePendingFlush {
mut substream,
closing,
} => {
match Sink::poll_flush(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => {
// finished flushing
// TODO: Duplicate code
if closing {
entry.get_mut().0 =
InboundSubstreamState::Closing(raw_substream)
InboundSubstreamState::Closing(substream)
} else {
// check for queued chunks and update the stream
entry.get_mut().0 = apply_queued_responses(
raw_substream,
substream,
&mut self
.queued_outbound_items
.get_mut(&request_id),
@@ -587,24 +707,34 @@ where
);
}
}
Ok(Async::NotReady) => {
Poll::Ready(Err(e)) => {
// error during flush
trace!(self.log, "Error sending flushing RPC message"; "error" => e.to_string());
// we drop the stream on error and inform the user, remove
// any pending requests
// TODO: Duplicate code
if let Some(delay_key) = &entry.get().1 {
self.inbound_substreams_delay.remove(delay_key);
}
self.queued_outbound_items.remove(&request_id);
entry.remove();
if self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty()
{
self.keep_alive = KeepAlive::Until(
Instant::now() + self.inactive_timeout,
);
}
}
Poll::Pending => {
entry.get_mut().0 =
InboundSubstreamState::ResponsePendingSend {
InboundSubstreamState::ResponsePendingFlush {
substream,
closing,
};
}
Err(e) => {
if let Some(delay_key) = &entry.get().1 {
self.inbound_substreams_delay.remove(delay_key);
}
let protocol = entry.get().2;
entry.remove_entry();
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(0, protocol, e),
)));
}
};
}
}
InboundSubstreamState::ResponseIdle(substream) => {
entry.get_mut().0 = apply_queued_responses(
@@ -614,9 +744,8 @@ where
);
}
InboundSubstreamState::Closing(mut substream) => {
match substream.close() {
Ok(Async::Ready(())) | Err(_) => {
//trace!(self.log, "Inbound stream dropped");
match Sink::poll_close(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => {
if let Some(delay_key) = &entry.get().1 {
self.inbound_substreams_delay.remove(delay_key);
}
@@ -631,7 +760,25 @@ where
);
}
} // drop the stream
Ok(Async::NotReady) => {
Poll::Ready(Err(e)) => {
error!(self.log, "Error closing inbound stream"; "error" => e.to_string());
// drop the stream anyway
// TODO: Duplicate code
if let Some(delay_key) = &entry.get().1 {
self.inbound_substreams_delay.remove(delay_key);
}
self.queued_outbound_items.remove(&request_id);
entry.remove();
if self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty()
{
self.keep_alive = KeepAlive::Until(
Instant::now() + self.inactive_timeout,
);
}
}
Poll::Pending => {
entry.get_mut().0 =
InboundSubstreamState::Closing(substream);
}
@@ -641,7 +788,7 @@ where
crit!(self.log, "Poisoned outbound substream");
unreachable!("Coding Error: Inbound Substream is poisoned");
}
};
}
}
Entry::Vacant(_) => unreachable!(),
}
@@ -659,8 +806,8 @@ where
OutboundSubstreamState::RequestPendingResponse {
mut substream,
request,
} => match substream.poll() {
Ok(Async::Ready(Some(response))) => {
} => match substream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(response))) => {
if request.multiple_responses() && !response.is_error() {
entry.get_mut().0 =
OutboundSubstreamState::RequestPendingResponse {
@@ -678,11 +825,11 @@ where
entry.get_mut().0 = OutboundSubstreamState::Closing(substream);
}
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
return Poll::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Response(request_id, response),
)));
));
}
Ok(Async::Ready(None)) => {
Poll::Ready(None) => {
// stream closed
// if we expected multiple streams send a stream termination,
// else report the stream terminating only.
@@ -694,59 +841,62 @@ where
// notify the application error
if request.multiple_responses() {
// return an end of stream result
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
return Poll::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Response(
request_id,
RPCCodedResponse::StreamTermination(
request.stream_termination(),
),
),
)));
));
} // else we return an error, stream should not have closed early.
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
return Poll::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(
request_id,
request.protocol(),
RPCError::IncompleteStream,
),
)));
));
}
Ok(Async::NotReady) => {
Poll::Pending => {
entry.get_mut().0 = OutboundSubstreamState::RequestPendingResponse {
substream,
request,
}
}
Err(e) => {
Poll::Ready(Some(Err(e))) => {
// drop the stream
let delay_key = &entry.get().1;
self.outbound_substreams_delay.remove(delay_key);
let protocol = entry.get().2;
entry.remove_entry();
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
return Poll::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(request_id, protocol, e),
)));
));
}
},
OutboundSubstreamState::Closing(mut substream) => match substream.close() {
Ok(Async::Ready(())) | Err(_) => {
//trace!(self.log, "Outbound stream dropped");
// drop the stream
let delay_key = &entry.get().1;
self.outbound_substreams_delay.remove(delay_key);
entry.remove_entry();
OutboundSubstreamState::Closing(mut substream) => {
match Sink::poll_close(Pin::new(&mut substream), cx) {
// TODO: check if this is supposed to be a stream
Poll::Ready(_) => {
// drop the stream - including if there is an error
let delay_key = &entry.get().1;
self.outbound_substreams_delay.remove(delay_key);
entry.remove_entry();
if self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty()
{
self.keep_alive =
KeepAlive::Until(Instant::now() + self.inactive_timeout);
if self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty()
{
self.keep_alive = KeepAlive::Until(
Instant::now() + self.inactive_timeout,
);
}
}
Poll::Pending => {
entry.get_mut().0 = OutboundSubstreamState::Closing(substream);
}
}
Ok(Async::NotReady) => {
entry.get_mut().0 = OutboundSubstreamState::Closing(substream);
}
},
}
OutboundSubstreamState::Poisoned => {
crit!(self.log, "Poisoned outbound substream");
unreachable!("Coding Error: Outbound substream is poisoned")
@@ -762,23 +912,21 @@ where
self.dial_negotiated += 1;
let (id, req) = self.dial_queue.remove(0);
self.dial_queue.shrink_to_fit();
return Ok(Async::Ready(
ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(req.clone()),
info: (id, req),
},
));
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(req.clone()),
info: (id, req),
});
}
Ok(Async::NotReady)
Poll::Pending
}
}
// Check for new items to send to the peer and update the underlying stream
fn apply_queued_responses<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec>(
raw_substream: InboundFramed<TSubstream, TSpec>,
fn apply_queued_responses<TSpec: EthSpec>(
substream: InboundFramed<NegotiatedSubstream, TSpec>,
queued_outbound_items: &mut Option<&mut Vec<RPCCodedResponse<TSpec>>>,
new_items_to_send: &mut bool,
) -> InboundSubstreamState<TSubstream, TSpec> {
) -> InboundSubstreamState<TSpec> {
match queued_outbound_items {
Some(ref mut queue) if !queue.is_empty() => {
*new_items_to_send = true;
@@ -786,17 +934,18 @@ fn apply_queued_responses<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec>(
match queue.remove(0) {
RPCCodedResponse::StreamTermination(_) => {
// close the stream if this is a stream termination
InboundSubstreamState::Closing(raw_substream)
InboundSubstreamState::Closing(substream)
}
chunk => InboundSubstreamState::ResponsePendingSend {
substream: raw_substream.send(chunk),
substream: substream,
message: chunk,
closing: false,
},
}
}
_ => {
// no items queued set to idle
InboundSubstreamState::ResponseIdle(raw_substream)
InboundSubstreamState::ResponseIdle(substream)
}
}
}