diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index 2646c49a6c..9bb8500a34 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -635,9 +635,9 @@ where for request_id in self.inbound_substreams.keys().copied().collect::>() { // Drain all queued items until all messages have been processed for this stream // TODO Improve this code logic - let mut new_items_to_send = true; - while new_items_to_send { - new_items_to_send = false; + let mut drive_stream_further = true; + while drive_stream_further { + drive_stream_further = false; match self.inbound_substreams.entry(request_id) { Entry::Occupied(mut entry) => { match std::mem::replace( @@ -659,7 +659,8 @@ where InboundSubstreamState::ResponsePendingFlush { substream, closing, - } + }; + drive_stream_further = true; } Err(e) => { // error with sending in the codec @@ -670,7 +671,8 @@ where // TODO: Duplicate code if closing { entry.get_mut().0 = - InboundSubstreamState::Closing(substream) + InboundSubstreamState::Closing(substream); + drive_stream_further = true; } else { // check for queued chunks and update the stream entry.get_mut().0 = apply_queued_responses( @@ -678,7 +680,7 @@ where &mut self .queued_outbound_items .get_mut(&request_id), - &mut new_items_to_send, + &mut drive_stream_further, ); } } @@ -711,7 +713,8 @@ where // TODO: Duplicate code if closing { entry.get_mut().0 = - InboundSubstreamState::Closing(substream) + InboundSubstreamState::Closing(substream); + drive_stream_further = true; } else { // check for queued chunks and update the stream entry.get_mut().0 = apply_queued_responses( @@ -719,7 +722,7 @@ where &mut self .queued_outbound_items .get_mut(&request_id), - &mut new_items_to_send, + &mut drive_stream_further, ); } } @@ -750,7 +753,7 @@ where entry.get_mut().0 = apply_queued_responses( substream, &mut self.queued_outbound_items.get_mut(&request_id), - &mut new_items_to_send, + &mut drive_stream_further, ); } InboundSubstreamState::Closing(mut substream) => {