diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index ea59722850..72c0379d47 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -43,8 +43,8 @@ where /// The upgrade for inbound substreams. listen_protocol: SubstreamProtocol, - /// If `Some`, something bad happened and we should shut down the handler with an error. - pending_error: Option<(RequestId, ProtocolsHandlerUpgrErr)>, + /// If something bad happened and we should shut down the handler with an error. + pending_error: Vec<(RequestId, ProtocolsHandlerUpgrErr)>, /// Queue of events to produce in `poll()`. events_out: SmallVec<[RPCEvent; 4]>, @@ -196,7 +196,7 @@ where ) -> Self { RPCHandler { listen_protocol, - pending_error: None, + pending_error: Vec::new(), events_out: SmallVec::new(), dial_queue: SmallVec::new(), dial_negotiated: 0, @@ -425,11 +425,7 @@ where 0 } }; - if self.pending_error.is_none() { - self.pending_error = Some((request_id, error)); - } else { - crit!(self.log, "Couldn't add error"); - } + self.pending_error.push((request_id, error)); } fn connection_keep_alive(&self) -> KeepAlive { @@ -442,7 +438,7 @@ where ProtocolsHandlerEvent, Self::Error, > { - if let Some((request_id, err)) = self.pending_error.take() { + if let Some((request_id, err)) = self.pending_error.pop() { // Returning an error here will result in dropping the peer. match err { ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply( @@ -598,6 +594,14 @@ where } self.queued_outbound_items.remove(&request_id); entry.remove(); + + if self.outbound_substreams.is_empty() + && self.inbound_substreams.is_empty() + { + self.keep_alive = KeepAlive::Until( + Instant::now() + self.inactive_timeout, + ); + } } // drop the stream Ok(Async::NotReady) => { entry.get_mut().0 = @@ -703,6 +707,13 @@ where let delay_key = &entry.get().1; self.outbound_substreams_delay.remove(delay_key); entry.remove_entry(); + + if self.outbound_substreams.is_empty() + && self.inbound_substreams.is_empty() + { + self.keep_alive = + KeepAlive::Until(Instant::now() + self.inactive_timeout); + } } Ok(Async::NotReady) => { entry.get_mut().0 = OutboundSubstreamState::Closing(substream);