From 1b8225c76da2f3596e325c330b8f6d8a8346cb89 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Wed, 5 Apr 2023 22:13:39 +0530 Subject: [PATCH] Revert upgrade to tokio utils to reprocessing queue (#4167) --- Cargo.lock | 2 +- beacon_node/network/Cargo.toml | 2 +- .../work_reprocessing_queue.rs | 34 +++++++++++++++---- 3 files changed, 30 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 37bdb4294b..c2d36c7c85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5232,7 +5232,7 @@ dependencies = [ "task_executor", "tokio", "tokio-stream", - "tokio-util 0.7.7", + "tokio-util 0.6.10", "types", ] diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index ea415c0055..d068a20079 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -41,7 +41,7 @@ num_cpus = "1.13.0" lru_cache = { path = "../../common/lru_cache" } if-addrs = "0.6.4" strum = "0.24.0" -tokio-util = { version = "0.7.7", features = ["time"] } +tokio-util = { version = "0.6.3", features = ["time"] } derivative = "2.2.0" delay_map = "0.3.0" ethereum-types = { version = "0.14.1", optional = true } diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index 4a56530799..2ec10439b3 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -21,7 +21,7 @@ use futures::task::Poll; use futures::{Stream, StreamExt}; use lighthouse_network::{MessageId, PeerId}; use logging::TimeLatch; -use slog::{debug, error, trace, warn, Logger}; +use slog::{crit, debug, error, trace, warn, Logger}; use slot_clock::SlotClock; use std::collections::{HashMap, HashSet}; use std::pin::Pin; @@ -29,6 +29,7 @@ use std::task::Context; use std::time::Duration; use task_executor::TaskExecutor; use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::time::error::Error as TimeError; use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey}; use types::{ Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof, SubnetId, @@ -154,6 +155,8 @@ enum InboundEvent { ReadyAttestation(QueuedAttestationId), /// A light client update that is ready for re-processing. ReadyLightClientUpdate(QueuedLightClientUpdateId), + /// A `DelayQueue` returned an error. + DelayQueueError(TimeError, &'static str), /// A message sent to the `ReprocessQueue` Msg(ReprocessQueueMessage), } @@ -231,42 +234,54 @@ impl Stream for ReprocessQueue { // The sequential nature of blockchains means it is generally better to try and import all // existing blocks before new ones. match self.gossip_block_delay_queue.poll_expired(cx) { - Poll::Ready(Some(queued_block)) => { + Poll::Ready(Some(Ok(queued_block))) => { return Poll::Ready(Some(InboundEvent::ReadyGossipBlock( queued_block.into_inner(), ))); } + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "gossip_block_queue"))); + } // `Poll::Ready(None)` means that there are no more entries in the delay queue and we // will continue to get this result until something else is added into the queue. Poll::Ready(None) | Poll::Pending => (), } match self.rpc_block_delay_queue.poll_expired(cx) { - Poll::Ready(Some(queued_block)) => { + Poll::Ready(Some(Ok(queued_block))) => { return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner()))); } + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "rpc_block_queue"))); + } // `Poll::Ready(None)` means that there are no more entries in the delay queue and we // will continue to get this result until something else is added into the queue. Poll::Ready(None) | Poll::Pending => (), } match self.attestations_delay_queue.poll_expired(cx) { - Poll::Ready(Some(attestation_id)) => { + Poll::Ready(Some(Ok(attestation_id))) => { return Poll::Ready(Some(InboundEvent::ReadyAttestation( attestation_id.into_inner(), ))); } + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "attestations_queue"))); + } // `Poll::Ready(None)` means that there are no more entries in the delay queue and we // will continue to get this result until something else is added into the queue. Poll::Ready(None) | Poll::Pending => (), } match self.lc_updates_delay_queue.poll_expired(cx) { - Poll::Ready(Some(lc_id)) => { + Poll::Ready(Some(Ok(lc_id))) => { return Poll::Ready(Some(InboundEvent::ReadyLightClientUpdate( lc_id.into_inner(), ))); } + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "lc_updates_queue"))); + } // `Poll::Ready(None)` means that there are no more entries in the delay queue and we // will continue to get this result until something else is added into the queue. Poll::Ready(None) | Poll::Pending => (), @@ -689,7 +704,14 @@ impl ReprocessQueue { ); } } - + InboundEvent::DelayQueueError(e, queue_name) => { + crit!( + log, + "Failed to poll queue"; + "queue" => queue_name, + "e" => ?e + ) + } InboundEvent::ReadyAttestation(queued_id) => { metrics::inc_counter( &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS,