Merge branch 'master' into proto-array

This commit is contained in:
Paul Hauner
2020-01-24 17:00:32 +11:00
115 changed files with 3943 additions and 1029 deletions

View File

@@ -145,7 +145,7 @@ where
// When terminating a stream, report the stream termination to the requesting user via
// an RPC error
let error = RPCErrorResponse::ServerError(ErrorMessage {
error_message: "Request timed out".as_bytes().to_vec(),
error_message: b"Request timed out".to_vec(),
});
// The stream termination type is irrelevant, this will terminate the
@@ -163,11 +163,16 @@ where
*self = InboundSubstreamState::ResponsePendingSend { substream, closing }
}
InboundSubstreamState::ResponseIdle(substream) => {
*self = InboundSubstreamState::ResponsePendingSend {
substream: substream.send(error),
closing: true,
};
InboundSubstreamState::ResponseIdle(mut substream) => {
// check if the stream is already closed
if let Ok(Async::Ready(None)) = substream.poll() {
*self = InboundSubstreamState::Closing(substream);
} else {
*self = InboundSubstreamState::ResponsePendingSend {
substream: substream.send(error),
closing: true,
};
}
}
InboundSubstreamState::Closing(substream) => {
// let the stream close
@@ -422,6 +427,8 @@ where
};
if self.pending_error.is_none() {
self.pending_error = Some((request_id, error));
} else {
crit!(self.log, "Couldn't add error");
}
}
@@ -452,6 +459,7 @@ where
}
ProtocolsHandlerUpgrErr::Timeout | ProtocolsHandlerUpgrErr::Timer => {
// negotiation timeout, mark the request as failed
debug!(self.log, "Active substreams before timeout"; "len" => self.outbound_substreams.len());
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(
request_id,
@@ -514,7 +522,7 @@ where
// notify the user
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(
stream_id.get_ref().clone(),
*stream_id.get_ref(),
RPCError::Custom("Stream timed out".into()),
),
)));
@@ -711,21 +719,18 @@ where
}
// establish outbound substreams
if !self.dial_queue.is_empty() {
if self.dial_negotiated < self.max_dial_negotiated {
self.dial_negotiated += 1;
let rpc_event = self.dial_queue.remove(0);
if let RPCEvent::Request(id, req) = rpc_event {
return Ok(Async::Ready(
ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(req.clone()),
info: RPCEvent::Request(id, req),
},
));
}
}
} else {
if !self.dial_queue.is_empty() && self.dial_negotiated < self.max_dial_negotiated {
self.dial_negotiated += 1;
let rpc_event = self.dial_queue.remove(0);
self.dial_queue.shrink_to_fit();
if let RPCEvent::Request(id, req) = rpc_event {
return Ok(Async::Ready(
ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(req.clone()),
info: RPCEvent::Request(id, req),
},
));
}
}
Ok(Async::NotReady)
}