Merge conlficts

This commit is contained in:
Eitan Seri- Levi
2026-03-26 21:50:30 -07:00
158 changed files with 4877 additions and 3908 deletions

View File

@@ -102,7 +102,7 @@ pub enum ReprocessQueueMessage {
/// A block that has been received early and we should queue for later processing.
EarlyBlock(QueuedGossipBlock),
/// An execution payload envelope that references a block not yet in fork choice.
UnknownBlockEnvelope(QueuedGossipEnvelope),
UnknownBlockForEnvelope(QueuedGossipEnvelope),
/// A gossip block for hash `X` is being imported, we should queue the rpc block for the same
/// hash until the gossip block is imported.
RpcBlock(QueuedRpcBlock),
@@ -535,25 +535,38 @@ impl<S: SlotClock> ReprocessQueue<S> {
}
// An envelope that references an unknown block. Queue it until the block is
// imported, or until the timeout expires.
InboundEvent::Msg(UnknownBlockEnvelope(queued_envelope)) => {
InboundEvent::Msg(UnknownBlockForEnvelope(queued_envelope)) => {
let block_root = queued_envelope.beacon_block_root;
// TODO(gloas): Perform lightweight pre-validation before queuing
// (e.g. verify builder signature) to prevent unsigned garbage from
// consuming queue slots.
// Don't add the same envelope to the queue twice. This prevents DoS attacks.
if self.awaiting_envelopes_per_root.contains_key(&block_root) {
trace!(
?block_root,
"Duplicate envelope for same block root, dropping"
);
return;
}
// Check to ensure this won't over-fill the queue.
// When the queue is full, evict the oldest entry to make room for newer envelopes.
if self.awaiting_envelopes_per_root.len() >= MAXIMUM_QUEUED_ENVELOPES {
if self.envelope_delay_debounce.elapsed() {
warn!(
queue_size = MAXIMUM_QUEUED_ENVELOPES,
msg = "system resources may be saturated",
"Envelope delay queue is full"
"Envelope delay queue is full, evicting oldest entry"
);
}
// Drop the envelope.
return;
if let Some(oldest_root) =
self.awaiting_envelopes_per_root.keys().next().copied()
&& let Some((_envelope, delay_key)) =
self.awaiting_envelopes_per_root.remove(&oldest_root)
{
self.envelope_delay_queue.remove(&delay_key);
}
}
// Register the timeout.
@@ -892,12 +905,18 @@ impl<S: SlotClock> ReprocessQueue<S> {
InboundEvent::ReadyEnvelope(block_root) => {
if let Some((envelope, _delay_key)) =
self.awaiting_envelopes_per_root.remove(&block_root)
&& self
{
debug!(
?block_root,
"Envelope timed out waiting for block, sending for processing"
);
if self
.ready_work_tx
.try_send(ReadyWork::Envelope(envelope))
.is_err()
{
error!(?block_root, "Failed to send envelope after timeout");
{
error!(?block_root, "Failed to send envelope after timeout");
}
}
}
InboundEvent::ReadyAttestation(queued_id) => {
@@ -1442,4 +1461,163 @@ mod tests {
assert_eq!(reconstruction.block_root, block_root);
}
}
// Test that envelopes are properly cleaned up from `awaiting_envelopes_per_root` on timeout.
#[tokio::test]
async fn prune_awaiting_envelopes_per_root() {
create_test_tracing_subscriber();
let mut queue = test_queue();
// Pause time so it only advances manually
tokio::time::pause();
let beacon_block_root = Hash256::repeat_byte(0xaf);
// Insert an envelope.
let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
beacon_block_slot: Slot::new(1),
beacon_block_root,
process_fn: Box::pin(async {}),
});
// Process the event to enter it into the delay queue.
queue.handle_message(InboundEvent::Msg(msg));
// Check that it is queued.
assert_eq!(queue.awaiting_envelopes_per_root.len(), 1);
assert!(
queue
.awaiting_envelopes_per_root
.contains_key(&beacon_block_root)
);
// Advance time to expire the envelope.
advance_time(
&queue.slot_clock,
queue.slot_clock.slot_duration() * QUEUED_ENVELOPE_DELAY_SLOTS * 2,
)
.await;
let ready_msg = queue.next().await.unwrap();
assert!(matches!(ready_msg, InboundEvent::ReadyEnvelope(_)));
queue.handle_message(ready_msg);
// The entry for the block root should be gone.
assert!(queue.awaiting_envelopes_per_root.is_empty());
}
#[tokio::test]
async fn envelope_released_on_block_imported() {
create_test_tracing_subscriber();
let mut queue = test_queue();
// Pause time so it only advances manually
tokio::time::pause();
let beacon_block_root = Hash256::repeat_byte(0xaf);
let parent_root = Hash256::repeat_byte(0xab);
// Insert an envelope.
let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
beacon_block_slot: Slot::new(1),
beacon_block_root,
process_fn: Box::pin(async {}),
});
// Process the event to enter it into the delay queue.
queue.handle_message(InboundEvent::Msg(msg));
// Check that it is queued.
assert_eq!(queue.awaiting_envelopes_per_root.len(), 1);
// Simulate block import.
let imported = ReprocessQueueMessage::BlockImported {
block_root: beacon_block_root,
parent_root,
};
queue.handle_message(InboundEvent::Msg(imported));
// The entry for the block root should be gone.
assert!(queue.awaiting_envelopes_per_root.is_empty());
// Delay queue entry should also be cancelled.
assert_eq!(queue.envelope_delay_queue.len(), 0);
}
#[tokio::test]
async fn envelope_dedup_drops_second() {
create_test_tracing_subscriber();
let mut queue = test_queue();
// Pause time so it only advances manually
tokio::time::pause();
let beacon_block_root = Hash256::repeat_byte(0xaf);
// Insert an envelope.
let msg1 = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
beacon_block_slot: Slot::new(1),
beacon_block_root,
process_fn: Box::pin(async {}),
});
let msg2 = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
beacon_block_slot: Slot::new(1),
beacon_block_root,
process_fn: Box::pin(async {}),
});
// Process both events.
queue.handle_message(InboundEvent::Msg(msg1));
queue.handle_message(InboundEvent::Msg(msg2));
// Only one should be queued.
assert_eq!(queue.awaiting_envelopes_per_root.len(), 1);
assert_eq!(queue.envelope_delay_queue.len(), 1);
}
#[tokio::test]
async fn envelope_capacity_evicts_oldest() {
create_test_tracing_subscriber();
let mut queue = test_queue();
// Pause time so it only advances manually
tokio::time::pause();
// Fill the queue to capacity.
for i in 0..MAXIMUM_QUEUED_ENVELOPES {
let block_root = Hash256::repeat_byte(i as u8);
let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
beacon_block_slot: Slot::new(1),
beacon_block_root: block_root,
process_fn: Box::pin(async {}),
});
queue.handle_message(InboundEvent::Msg(msg));
}
assert_eq!(
queue.awaiting_envelopes_per_root.len(),
MAXIMUM_QUEUED_ENVELOPES
);
// One more should evict the oldest and insert the new one.
let overflow_root = Hash256::repeat_byte(0xff);
let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
beacon_block_slot: Slot::new(1),
beacon_block_root: overflow_root,
process_fn: Box::pin(async {}),
});
queue.handle_message(InboundEvent::Msg(msg));
// Queue should still be at capacity, with the new root present.
assert_eq!(
queue.awaiting_envelopes_per_root.len(),
MAXIMUM_QUEUED_ENVELOPES
);
assert!(
queue
.awaiting_envelopes_per_root
.contains_key(&overflow_root)
);
}
}