mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-15 09:48:20 +00:00
Gloas data column reprocess queue (#9339)
When debugging ePBS with columns, we noticed that columns arriving before their block dont pass gossip verification checks and are dropped. This PR ensures that columns arriving before the block are sent to the reprocess queue. Once their block arrives, they are reprocessed. This isn't an issue pre-gloas because we don't make block root checks for fulu data columns. This allows us to gossip verify the column and send it to the DA cache before the block arrives. I think we also need to handle this edge case for partial data columns. Theres an existing TODO for that already. Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu>
This commit is contained in:
@@ -41,8 +41,8 @@
|
||||
pub use crate::scheduler::BeaconProcessorQueueLengths;
|
||||
use crate::scheduler::work_queue::WorkQueues;
|
||||
use crate::work_reprocessing_queue::{
|
||||
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipEnvelope,
|
||||
ReprocessQueueMessage,
|
||||
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipDataColumn,
|
||||
QueuedGossipEnvelope, ReprocessQueueMessage,
|
||||
};
|
||||
use futures::stream::{Stream, StreamExt};
|
||||
use futures::task::Poll;
|
||||
@@ -304,6 +304,10 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
|
||||
work: Work::ColumnReconstruction(process_fn),
|
||||
}
|
||||
}
|
||||
ReadyWork::DataColumn(QueuedGossipDataColumn { process_fn, .. }) => Self {
|
||||
drop_during_sync: true,
|
||||
work: Work::UnknownBlockDataColumn { process_fn },
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -369,6 +373,9 @@ pub enum Work<E: EthSpec> {
|
||||
UnknownBlockAttestation {
|
||||
process_fn: BlockingFn,
|
||||
},
|
||||
UnknownBlockDataColumn {
|
||||
process_fn: BlockingFn,
|
||||
},
|
||||
GossipAttestationBatch {
|
||||
attestations: GossipAttestationBatch,
|
||||
process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
|
||||
@@ -464,6 +471,7 @@ pub enum WorkType {
|
||||
GossipAttestation,
|
||||
GossipAttestationToConvert,
|
||||
UnknownBlockAttestation,
|
||||
UnknownBlockDataColumn,
|
||||
GossipAttestationBatch,
|
||||
GossipAggregate,
|
||||
UnknownBlockAggregate,
|
||||
@@ -569,6 +577,7 @@ impl<E: EthSpec> Work<E> {
|
||||
Work::LightClientFinalityUpdateRequest(_) => WorkType::LightClientFinalityUpdateRequest,
|
||||
Work::LightClientUpdatesByRangeRequest(_) => WorkType::LightClientUpdatesByRangeRequest,
|
||||
Work::UnknownBlockAttestation { .. } => WorkType::UnknownBlockAttestation,
|
||||
Work::UnknownBlockDataColumn { .. } => WorkType::UnknownBlockDataColumn,
|
||||
Work::UnknownBlockAggregate { .. } => WorkType::UnknownBlockAggregate,
|
||||
Work::UnknownLightClientOptimisticUpdate { .. } => {
|
||||
WorkType::UnknownLightClientOptimisticUpdate
|
||||
@@ -842,6 +851,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
||||
Some(item)
|
||||
} else if let Some(item) = work_queues.gossip_data_column_queue.pop() {
|
||||
Some(item)
|
||||
} else if let Some(item) = work_queues.unknown_block_data_column_queue.pop()
|
||||
{
|
||||
Some(item)
|
||||
} else if let Some(item) =
|
||||
work_queues.gossip_partial_data_column_queue.pop()
|
||||
{
|
||||
@@ -1238,6 +1250,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
||||
Work::UnknownBlockAttestation { .. } => {
|
||||
work_queues.unknown_block_attestation_queue.push(work)
|
||||
}
|
||||
Work::UnknownBlockDataColumn { .. } => work_queues
|
||||
.unknown_block_data_column_queue
|
||||
.push(work, work_id),
|
||||
Work::UnknownBlockAggregate { .. } => {
|
||||
work_queues.unknown_block_aggregate_queue.push(work)
|
||||
}
|
||||
@@ -1288,6 +1303,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
||||
WorkType::UnknownBlockAttestation => {
|
||||
work_queues.unknown_block_attestation_queue.len()
|
||||
}
|
||||
WorkType::UnknownBlockDataColumn => {
|
||||
work_queues.unknown_block_data_column_queue.len()
|
||||
}
|
||||
WorkType::GossipAttestationBatch => 0, // No queue
|
||||
WorkType::GossipAggregate => work_queues.aggregate_queue.len(),
|
||||
WorkType::UnknownBlockAggregate => {
|
||||
@@ -1504,6 +1522,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
||||
}),
|
||||
Work::UnknownBlockAttestation { process_fn }
|
||||
| Work::UnknownBlockAggregate { process_fn }
|
||||
| Work::UnknownBlockDataColumn { process_fn }
|
||||
| Work::UnknownLightClientOptimisticUpdate { process_fn, .. } => {
|
||||
task_spawner.spawn_blocking(process_fn)
|
||||
}
|
||||
|
||||
@@ -111,6 +111,7 @@ pub struct BeaconProcessorQueueLengths {
|
||||
attestation_queue: usize,
|
||||
unknown_block_aggregate_queue: usize,
|
||||
unknown_block_attestation_queue: usize,
|
||||
unknown_block_data_column_queue: usize,
|
||||
sync_message_queue: usize,
|
||||
sync_contribution_queue: usize,
|
||||
gossip_voluntary_exit_queue: usize,
|
||||
@@ -174,6 +175,8 @@ impl BeaconProcessorQueueLengths {
|
||||
Ok(Self {
|
||||
aggregate_queue: 4096,
|
||||
unknown_block_aggregate_queue: 1024,
|
||||
// Capacity for two slot's worth of data columns for a supernode.
|
||||
unknown_block_data_column_queue: 256,
|
||||
// Capacity for a full slot's worth of attestations if subscribed to all subnets
|
||||
attestation_queue: std::cmp::max(
|
||||
active_validator_count / slots_per_epoch,
|
||||
@@ -245,6 +248,7 @@ pub struct WorkQueues<E: EthSpec> {
|
||||
pub attestation_debounce: TimeLatch,
|
||||
pub unknown_block_aggregate_queue: LifoQueue<Work<E>>,
|
||||
pub unknown_block_attestation_queue: LifoQueue<Work<E>>,
|
||||
pub unknown_block_data_column_queue: FifoQueue<Work<E>>,
|
||||
pub sync_message_queue: LifoQueue<Work<E>>,
|
||||
pub sync_contribution_queue: LifoQueue<Work<E>>,
|
||||
pub gossip_voluntary_exit_queue: FifoQueue<Work<E>>,
|
||||
@@ -302,6 +306,8 @@ impl<E: EthSpec> WorkQueues<E> {
|
||||
LifoQueue::new(queue_lengths.unknown_block_aggregate_queue);
|
||||
let unknown_block_attestation_queue =
|
||||
LifoQueue::new(queue_lengths.unknown_block_attestation_queue);
|
||||
let unknown_block_data_column_queue =
|
||||
FifoQueue::new(queue_lengths.unknown_block_data_column_queue);
|
||||
|
||||
let sync_message_queue = LifoQueue::new(queue_lengths.sync_message_queue);
|
||||
let sync_contribution_queue = LifoQueue::new(queue_lengths.sync_contribution_queue);
|
||||
@@ -383,6 +389,7 @@ impl<E: EthSpec> WorkQueues<E> {
|
||||
attestation_debounce,
|
||||
unknown_block_aggregate_queue,
|
||||
unknown_block_attestation_queue,
|
||||
unknown_block_data_column_queue,
|
||||
sync_message_queue,
|
||||
sync_contribution_queue,
|
||||
gossip_voluntary_exit_queue,
|
||||
|
||||
@@ -52,6 +52,10 @@ pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12);
|
||||
/// For how long to queue light client updates for re-processing.
|
||||
pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12);
|
||||
|
||||
/// Data column timeout as a multiplier of slot duration. Columns waiting for their block will be
|
||||
/// sent for processing after this many slots worth of time, even if the block hasn't arrived.
|
||||
const QUEUED_DATA_COLUMN_DELAY_SLOTS: u32 = 1;
|
||||
|
||||
/// Envelope timeout as a multiplier of slot duration. Envelopes waiting for their block will be
|
||||
/// sent for processing after this many slots worth of time, even if the block hasn't arrived.
|
||||
const QUEUED_ENVELOPE_DELAY_SLOTS: u32 = 1;
|
||||
@@ -76,6 +80,9 @@ const MAXIMUM_QUEUED_ENVELOPES: usize = 16;
|
||||
/// How many attestations we keep before new ones get dropped.
|
||||
const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384;
|
||||
|
||||
/// How many columns we keep before new ones get dropped.
|
||||
const MAXIMUM_QUEUED_DATA_COLUMNS: usize = 256;
|
||||
|
||||
/// How many light client updates we keep before new ones get dropped.
|
||||
const MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES: usize = 128;
|
||||
|
||||
@@ -123,6 +130,8 @@ pub enum ReprocessQueueMessage {
|
||||
UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate),
|
||||
/// A new backfill batch that needs to be scheduled for processing.
|
||||
BackfillSync(QueuedBackfillBatch),
|
||||
/// A gossip data column that references an unknown block.
|
||||
UnknownBlockDataColumn(QueuedGossipDataColumn),
|
||||
/// A delayed column reconstruction that needs checking
|
||||
DelayColumnReconstruction(QueuedColumnReconstruction),
|
||||
}
|
||||
@@ -138,6 +147,7 @@ pub enum ReadyWork {
|
||||
LightClientUpdate(QueuedLightClientUpdate),
|
||||
BackfillSync(QueuedBackfillBatch),
|
||||
ColumnReconstruction(QueuedColumnReconstruction),
|
||||
DataColumn(QueuedGossipDataColumn),
|
||||
}
|
||||
|
||||
/// An Attestation for which the corresponding block was not seen while processing, queued for
|
||||
@@ -200,6 +210,12 @@ pub struct QueuedColumnReconstruction {
|
||||
pub process_fn: AsyncFn,
|
||||
}
|
||||
|
||||
/// A gossip data column that references an unknown block, queued for later reprocessing.
|
||||
pub struct QueuedGossipDataColumn {
|
||||
pub beacon_block_root: Hash256,
|
||||
pub process_fn: BlockingFn,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> TryFrom<WorkEvent<E>> for QueuedBackfillBatch {
|
||||
type Error = WorkEvent<E>;
|
||||
|
||||
@@ -240,6 +256,8 @@ enum InboundEvent {
|
||||
ReadyBackfillSync(QueuedBackfillBatch),
|
||||
/// A column reconstruction that was queued is ready for processing.
|
||||
ReadyColumnReconstruction(QueuedColumnReconstruction),
|
||||
/// A gossip data column that is ready for re-processing.
|
||||
ReadyDataColumn(Hash256),
|
||||
/// A message sent to the `ReprocessQueue`
|
||||
Msg(ReprocessQueueMessage),
|
||||
}
|
||||
@@ -264,6 +282,8 @@ struct ReprocessQueue<S> {
|
||||
lc_updates_delay_queue: DelayQueue<QueuedLightClientUpdateId>,
|
||||
/// Queue to manage scheduled column reconstructions.
|
||||
column_reconstructions_delay_queue: DelayQueue<QueuedColumnReconstruction>,
|
||||
/// Queue to manage gossip data column timeouts.
|
||||
data_columns_delay_queue: DelayQueue<Hash256>,
|
||||
|
||||
/* Queued items */
|
||||
/// Queued blocks.
|
||||
@@ -284,6 +304,10 @@ struct ReprocessQueue<S> {
|
||||
queued_column_reconstructions: HashMap<Hash256, Option<DelayKey>>,
|
||||
/// Queued backfill batches
|
||||
queued_backfill_batches: Vec<QueuedBackfillBatch>,
|
||||
/// Queued gossip data columns awaiting their block, keyed by block root.
|
||||
awaiting_data_columns_per_root: HashMap<Hash256, (Vec<QueuedGossipDataColumn>, DelayKey)>,
|
||||
/// Total number of queued gossip data columns across all roots.
|
||||
queued_data_columns_count: usize,
|
||||
|
||||
/* Aux */
|
||||
/// Next attestation id, used for both aggregated and unaggregated attestations
|
||||
@@ -294,6 +318,7 @@ struct ReprocessQueue<S> {
|
||||
rpc_block_debounce: TimeLatch,
|
||||
attestation_delay_debounce: TimeLatch,
|
||||
lc_update_delay_debounce: TimeLatch,
|
||||
data_column_delay_debounce: TimeLatch,
|
||||
next_backfill_batch_event: Option<Pin<Box<tokio::time::Sleep>>>,
|
||||
slot_clock: Arc<S>,
|
||||
}
|
||||
@@ -387,6 +412,13 @@ impl<S: SlotClock> Stream for ReprocessQueue<S> {
|
||||
Poll::Ready(None) | Poll::Pending => (),
|
||||
}
|
||||
|
||||
match self.data_columns_delay_queue.poll_expired(cx) {
|
||||
Poll::Ready(Some(block_root)) => {
|
||||
return Poll::Ready(Some(InboundEvent::ReadyDataColumn(block_root.into_inner())));
|
||||
}
|
||||
Poll::Ready(None) | Poll::Pending => (),
|
||||
}
|
||||
|
||||
if let Some(next_backfill_batch_event) = self.next_backfill_batch_event.as_mut() {
|
||||
match next_backfill_batch_event.as_mut().poll(cx) {
|
||||
Poll::Ready(_) => {
|
||||
@@ -455,6 +487,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
||||
attestations_delay_queue: DelayQueue::new(),
|
||||
lc_updates_delay_queue: DelayQueue::new(),
|
||||
column_reconstructions_delay_queue: DelayQueue::new(),
|
||||
data_columns_delay_queue: DelayQueue::new(),
|
||||
queued_gossip_block_roots: HashSet::new(),
|
||||
awaiting_envelopes_per_root: HashMap::new(),
|
||||
queued_lc_updates: FnvHashMap::default(),
|
||||
@@ -464,6 +497,8 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
||||
awaiting_lc_updates_per_parent_root: HashMap::new(),
|
||||
queued_backfill_batches: Vec::new(),
|
||||
queued_column_reconstructions: HashMap::new(),
|
||||
awaiting_data_columns_per_root: HashMap::new(),
|
||||
queued_data_columns_count: 0,
|
||||
next_attestation: 0,
|
||||
next_lc_update: 0,
|
||||
early_block_debounce: TimeLatch::default(),
|
||||
@@ -471,6 +506,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
||||
rpc_block_debounce: TimeLatch::default(),
|
||||
attestation_delay_debounce: TimeLatch::default(),
|
||||
lc_update_delay_debounce: TimeLatch::default(),
|
||||
data_column_delay_debounce: TimeLatch::default(),
|
||||
next_backfill_batch_event: None,
|
||||
slot_clock,
|
||||
}
|
||||
@@ -551,22 +587,16 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
||||
return;
|
||||
}
|
||||
|
||||
// When the queue is full, evict the oldest entry to make room for newer envelopes.
|
||||
// When the queue is full, drop the new envelope.
|
||||
if self.awaiting_envelopes_per_root.len() >= MAXIMUM_QUEUED_ENVELOPES {
|
||||
if self.envelope_delay_debounce.elapsed() {
|
||||
warn!(
|
||||
queue_size = MAXIMUM_QUEUED_ENVELOPES,
|
||||
msg = "system resources may be saturated",
|
||||
"Envelope delay queue is full, evicting oldest entry"
|
||||
"Envelope delay queue is full, dropping envelope"
|
||||
);
|
||||
}
|
||||
if let Some(oldest_root) =
|
||||
self.awaiting_envelopes_per_root.keys().next().copied()
|
||||
&& let Some((_envelope, delay_key)) =
|
||||
self.awaiting_envelopes_per_root.remove(&oldest_root)
|
||||
{
|
||||
self.envelope_delay_queue.remove(&delay_key);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Register the timeout.
|
||||
@@ -688,6 +718,37 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
||||
|
||||
self.next_attestation += 1;
|
||||
}
|
||||
InboundEvent::Msg(UnknownBlockDataColumn(queued_data_column)) => {
|
||||
let block_root = queued_data_column.beacon_block_root;
|
||||
|
||||
if self.queued_data_columns_count >= MAXIMUM_QUEUED_DATA_COLUMNS {
|
||||
if self.data_column_delay_debounce.elapsed() {
|
||||
warn!(
|
||||
queue_size = MAXIMUM_QUEUED_DATA_COLUMNS,
|
||||
msg = "system resources may be saturated",
|
||||
"Data column delay queue is full, dropping column"
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some((columns, _delay_key)) =
|
||||
self.awaiting_data_columns_per_root.get_mut(&block_root)
|
||||
{
|
||||
// Append to existing entry; the timer for this root is already running.
|
||||
columns.push(queued_data_column);
|
||||
} else {
|
||||
let delay_key = self.data_columns_delay_queue.insert(
|
||||
block_root,
|
||||
self.slot_clock.slot_duration() * QUEUED_DATA_COLUMN_DELAY_SLOTS,
|
||||
);
|
||||
|
||||
self.awaiting_data_columns_per_root
|
||||
.insert(block_root, (vec![queued_data_column], delay_key));
|
||||
}
|
||||
|
||||
self.queued_data_columns_count += 1;
|
||||
}
|
||||
InboundEvent::Msg(UnknownLightClientOptimisticUpdate(
|
||||
queued_light_client_optimistic_update,
|
||||
)) => {
|
||||
@@ -800,6 +861,25 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Unqueue the data columns we have for this root, if any.
|
||||
if let Some((data_columns, delay_key)) =
|
||||
self.awaiting_data_columns_per_root.remove(&block_root)
|
||||
{
|
||||
self.data_columns_delay_queue.remove(&delay_key);
|
||||
self.queued_data_columns_count = self
|
||||
.queued_data_columns_count
|
||||
.saturating_sub(data_columns.len());
|
||||
for data_column in data_columns {
|
||||
if self
|
||||
.ready_work_tx
|
||||
.try_send(ReadyWork::DataColumn(data_column))
|
||||
.is_err()
|
||||
{
|
||||
error!(?block_root, "Failed to send data column for reprocessing");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
InboundEvent::Msg(NewLightClientOptimisticUpdate { parent_root }) => {
|
||||
// Unqueue the light client optimistic updates we have for this root, if any.
|
||||
@@ -1053,6 +1133,27 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
||||
);
|
||||
}
|
||||
}
|
||||
InboundEvent::ReadyDataColumn(block_root) => {
|
||||
if let Some((data_columns, _)) =
|
||||
self.awaiting_data_columns_per_root.remove(&block_root)
|
||||
{
|
||||
self.queued_data_columns_count = self
|
||||
.queued_data_columns_count
|
||||
.saturating_sub(data_columns.len());
|
||||
for data_column in data_columns {
|
||||
if self
|
||||
.ready_work_tx
|
||||
.try_send(ReadyWork::DataColumn(data_column))
|
||||
.is_err()
|
||||
{
|
||||
error!(
|
||||
hint = "system may be overloaded",
|
||||
"Ignored expired gossip data column"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
metrics::set_gauge_vec(
|
||||
@@ -1581,48 +1682,87 @@ mod tests {
|
||||
assert_eq!(queue.envelope_delay_queue.len(), 1);
|
||||
}
|
||||
|
||||
/// Tests that a queued gossip data column is released when its block is imported.
|
||||
#[tokio::test]
|
||||
async fn envelope_capacity_evicts_oldest() {
|
||||
async fn data_column_released_on_block_imported() {
|
||||
create_test_tracing_subscriber();
|
||||
|
||||
let config = BeaconProcessorConfig::default();
|
||||
let (ready_work_tx, mut ready_work_rx) =
|
||||
mpsc::channel::<ReadyWork>(config.max_scheduled_work_queue_len);
|
||||
let (_, reprocess_work_rx) =
|
||||
mpsc::channel::<ReprocessQueueMessage>(config.max_scheduled_work_queue_len);
|
||||
let slot_clock = Arc::new(testing_slot_clock(12));
|
||||
let mut queue = ReprocessQueue::new(ready_work_tx, reprocess_work_rx, slot_clock);
|
||||
|
||||
tokio::time::pause();
|
||||
|
||||
let beacon_block_root = Hash256::repeat_byte(0xbb);
|
||||
|
||||
let msg = ReprocessQueueMessage::UnknownBlockDataColumn(QueuedGossipDataColumn {
|
||||
beacon_block_root,
|
||||
process_fn: Box::new(|| {}),
|
||||
});
|
||||
queue.handle_message(InboundEvent::Msg(msg));
|
||||
|
||||
assert_eq!(queue.awaiting_data_columns_per_root.len(), 1);
|
||||
assert!(
|
||||
queue
|
||||
.awaiting_data_columns_per_root
|
||||
.contains_key(&beacon_block_root)
|
||||
);
|
||||
assert_eq!(queue.data_columns_delay_queue.len(), 1);
|
||||
|
||||
// Simulate block import.
|
||||
queue.handle_message(InboundEvent::Msg(ReprocessQueueMessage::BlockImported {
|
||||
block_root: beacon_block_root,
|
||||
parent_root: Hash256::repeat_byte(0x00),
|
||||
}));
|
||||
|
||||
// Internal state should be cleaned up.
|
||||
assert!(queue.awaiting_data_columns_per_root.is_empty());
|
||||
assert_eq!(queue.data_columns_delay_queue.len(), 0);
|
||||
|
||||
// The column should have been sent to the ready_work channel.
|
||||
let ready = ready_work_rx.try_recv().expect("column should be ready");
|
||||
assert!(matches!(ready, ReadyWork::DataColumn(_)));
|
||||
}
|
||||
|
||||
/// Tests that an expired gossip data column is pruned cleanly from all internal state.
|
||||
#[tokio::test]
|
||||
async fn prune_awaiting_data_columns_per_root() {
|
||||
create_test_tracing_subscriber();
|
||||
|
||||
let mut queue = test_queue();
|
||||
|
||||
// Pause time so it only advances manually
|
||||
tokio::time::pause();
|
||||
|
||||
// Fill the queue to capacity.
|
||||
for i in 0..MAXIMUM_QUEUED_ENVELOPES {
|
||||
let block_root = Hash256::repeat_byte(i as u8);
|
||||
let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
|
||||
beacon_block_slot: Slot::new(1),
|
||||
beacon_block_root: block_root,
|
||||
process_fn: Box::pin(async {}),
|
||||
});
|
||||
queue.handle_message(InboundEvent::Msg(msg));
|
||||
}
|
||||
assert_eq!(
|
||||
queue.awaiting_envelopes_per_root.len(),
|
||||
MAXIMUM_QUEUED_ENVELOPES
|
||||
);
|
||||
let beacon_block_root = Hash256::repeat_byte(0xcd);
|
||||
|
||||
// One more should evict the oldest and insert the new one.
|
||||
let overflow_root = Hash256::repeat_byte(0xff);
|
||||
let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
|
||||
beacon_block_slot: Slot::new(1),
|
||||
beacon_block_root: overflow_root,
|
||||
process_fn: Box::pin(async {}),
|
||||
let msg = ReprocessQueueMessage::UnknownBlockDataColumn(QueuedGossipDataColumn {
|
||||
beacon_block_root,
|
||||
process_fn: Box::new(|| {}),
|
||||
});
|
||||
queue.handle_message(InboundEvent::Msg(msg));
|
||||
|
||||
// Queue should still be at capacity, with the new root present.
|
||||
assert_eq!(
|
||||
queue.awaiting_envelopes_per_root.len(),
|
||||
MAXIMUM_QUEUED_ENVELOPES
|
||||
);
|
||||
assert_eq!(queue.awaiting_data_columns_per_root.len(), 1);
|
||||
assert!(
|
||||
queue
|
||||
.awaiting_envelopes_per_root
|
||||
.contains_key(&overflow_root)
|
||||
.awaiting_data_columns_per_root
|
||||
.contains_key(&beacon_block_root)
|
||||
);
|
||||
|
||||
// Advance time past the delay so the entry expires.
|
||||
advance_time(
|
||||
&queue.slot_clock,
|
||||
2 * queue.slot_clock.slot_duration() * QUEUED_DATA_COLUMN_DELAY_SLOTS,
|
||||
)
|
||||
.await;
|
||||
let ready_msg = queue.next().await.unwrap();
|
||||
assert!(matches!(ready_msg, InboundEvent::ReadyDataColumn(_)));
|
||||
queue.handle_message(ready_msg);
|
||||
|
||||
// All internal state should be cleaned up.
|
||||
assert!(queue.awaiting_data_columns_per_root.is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user