Revert upgrade to tokio utils to reprocessing queue (#4167)

This commit is contained in:
Pawan Dhananjay
2023-04-05 22:13:39 +05:30
committed by GitHub
parent 3a21317600
commit 1b8225c76d
3 changed files with 30 additions and 8 deletions

View File

@@ -21,7 +21,7 @@ use futures::task::Poll;
use futures::{Stream, StreamExt};
use lighthouse_network::{MessageId, PeerId};
use logging::TimeLatch;
use slog::{debug, error, trace, warn, Logger};
use slog::{crit, debug, error, trace, warn, Logger};
use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
@@ -29,6 +29,7 @@ use std::task::Context;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::time::error::Error as TimeError;
use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey};
use types::{
Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof, SubnetId,
@@ -154,6 +155,8 @@ enum InboundEvent<T: BeaconChainTypes> {
ReadyAttestation(QueuedAttestationId),
/// A light client update that is ready for re-processing.
ReadyLightClientUpdate(QueuedLightClientUpdateId),
/// A `DelayQueue` returned an error.
DelayQueueError(TimeError, &'static str),
/// A message sent to the `ReprocessQueue`
Msg(ReprocessQueueMessage<T>),
}
@@ -231,42 +234,54 @@ impl<T: BeaconChainTypes> Stream for ReprocessQueue<T> {
// The sequential nature of blockchains means it is generally better to try and import all
// existing blocks before new ones.
match self.gossip_block_delay_queue.poll_expired(cx) {
Poll::Ready(Some(queued_block)) => {
Poll::Ready(Some(Ok(queued_block))) => {
return Poll::Ready(Some(InboundEvent::ReadyGossipBlock(
queued_block.into_inner(),
)));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "gossip_block_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}
match self.rpc_block_delay_queue.poll_expired(cx) {
Poll::Ready(Some(queued_block)) => {
Poll::Ready(Some(Ok(queued_block))) => {
return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner())));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "rpc_block_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}
match self.attestations_delay_queue.poll_expired(cx) {
Poll::Ready(Some(attestation_id)) => {
Poll::Ready(Some(Ok(attestation_id))) => {
return Poll::Ready(Some(InboundEvent::ReadyAttestation(
attestation_id.into_inner(),
)));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "attestations_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}
match self.lc_updates_delay_queue.poll_expired(cx) {
Poll::Ready(Some(lc_id)) => {
Poll::Ready(Some(Ok(lc_id))) => {
return Poll::Ready(Some(InboundEvent::ReadyLightClientUpdate(
lc_id.into_inner(),
)));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "lc_updates_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
@@ -689,7 +704,14 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
);
}
}
InboundEvent::DelayQueueError(e, queue_name) => {
crit!(
log,
"Failed to poll queue";
"queue" => queue_name,
"e" => ?e
)
}
InboundEvent::ReadyAttestation(queued_id) => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS,