Gloas attestation payload reprocess (#9440)

Handle payload-present attestations before the payload is seen (gloas)

A gloas beacon_attestation with index == 1 claims a past block's payload is already present. If we haven't seen that block's payload envelope yet, we shouldn't reject it the envelope may just be in flight.

So instead we IGNORE it (new AttnError::UnknownPayloadEnvelope), ask sync to fetch the envelope, and park the attestation in the reprocess queue. When the envelope is imported, the parked attestations are released and  re-verified.

The envelope lookup itself is stubbed here and wired up in #9155 or a follow up PR


  


Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com>

Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu>
This commit is contained in:
Lion - dapplion
2026-06-17 14:44:21 +02:00
committed by GitHub
parent 3bc9148e0e
commit a46620155b
22 changed files with 893 additions and 170 deletions

View File

@@ -115,10 +115,10 @@ pub enum ReprocessQueueMessage {
RpcBlock(QueuedRpcBlock),
/// A block that was successfully processed. We use this to handle attestations updates
/// for unknown blocks.
BlockImported {
block_root: Hash256,
parent_root: Hash256,
},
BlockImported { block_root: Hash256 },
/// A block's execution payload envelope was imported. We use this to release attestations that
/// claim payload-present (`index == 1`) for a block whose payload had not yet been seen.
PayloadEnvelopeImported { block_root: Hash256 },
/// A new `LightClientOptimisticUpdate` has been produced. We use this to handle light client
/// updates for unknown parent blocks.
NewLightClientOptimisticUpdate { parent_root: Hash256 },
@@ -126,6 +126,12 @@ pub enum ReprocessQueueMessage {
UnknownBlockUnaggregate(QueuedUnaggregate),
/// An aggregated attestation that references an unknown block.
UnknownBlockAggregate(QueuedAggregate),
/// An unaggregated attestation (`index == 1`) whose block's execution payload envelope has not
/// been seen yet.
UnknownPayloadUnaggregate(QueuedUnaggregate),
/// An aggregated attestation (`index == 1`) whose block's execution payload envelope has not
/// been seen yet.
UnknownPayloadAggregate(QueuedAggregate),
/// A light client optimistic update that references a parent root that has not been seen as a parent.
UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate),
/// A new backfill batch that needs to be scheduled for processing.
@@ -296,6 +302,9 @@ struct ReprocessQueue<S> {
queued_unaggregates: FnvHashMap<usize, (QueuedUnaggregate, DelayKey)>,
/// Attestations (aggregated and unaggregated) per root.
awaiting_attestations_per_root: HashMap<Hash256, Vec<QueuedAttestationId>>,
/// Attestations (aggregated and unaggregated) awaiting a block's execution payload envelope,
/// keyed by block root. Released on `PayloadEnvelopeImported`.
awaiting_attestations_per_payload: HashMap<Hash256, Vec<QueuedAttestationId>>,
/// Queued Light Client Updates.
queued_lc_updates: FnvHashMap<usize, (QueuedLightClientUpdate, DelayKey)>,
/// Light Client Updates per parent_root.
@@ -331,6 +340,20 @@ enum QueuedAttestationId {
Unaggregate(usize),
}
/// An attestation queued for re-processing, of either aggregation kind.
enum QueuedAttestation {
Aggregate(QueuedAggregate),
Unaggregate(QueuedUnaggregate),
}
/// The component an attestation is waiting on before it can be re-processed.
enum AwaitingComponent {
/// The attestation's head block has not been seen.
Block,
/// The block's execution payload envelope has not been seen (`index == 1`, post-Gloas).
Payload,
}
impl QueuedAggregate {
pub fn beacon_block_root(&self) -> &Hash256 {
&self.beacon_block_root
@@ -494,6 +517,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
queued_aggregates: FnvHashMap::default(),
queued_unaggregates: FnvHashMap::default(),
awaiting_attestations_per_root: HashMap::new(),
awaiting_attestations_per_payload: HashMap::new(),
awaiting_lc_updates_per_parent_root: HashMap::new(),
queued_backfill_batches: Vec::new(),
queued_column_reconstructions: HashMap::new(),
@@ -512,6 +536,65 @@ impl<S: SlotClock> ReprocessQueue<S> {
}
}
/// Queue an attestation for re-processing once the component it is waiting on (`awaiting`) is
/// imported. Shared by the unknown-block and unknown-payload paths for both aggregate and
/// unaggregate attestations.
fn queue_awaiting_attestation(
&mut self,
attestation: QueuedAttestation,
awaiting: AwaitingComponent,
) {
if self.attestations_delay_queue.len() >= MAXIMUM_QUEUED_ATTESTATIONS {
if self.attestation_delay_debounce.elapsed() {
error!(
queue_size = MAXIMUM_QUEUED_ATTESTATIONS,
msg = "system resources may be saturated",
"Attestation delay queue is full"
);
}
// Drop the attestation.
return;
}
let id = self.next_attestation;
let (att_id, beacon_block_root) = match &attestation {
QueuedAttestation::Aggregate(a) => {
(QueuedAttestationId::Aggregate(id), *a.beacon_block_root())
}
QueuedAttestation::Unaggregate(u) => {
(QueuedAttestationId::Unaggregate(id), *u.beacon_block_root())
}
};
// Register the delay.
let delay_key = self
.attestations_delay_queue
.insert(att_id, QUEUED_ATTESTATION_DELAY);
// Register this attestation against the component it awaits.
match awaiting {
AwaitingComponent::Block => &mut self.awaiting_attestations_per_root,
AwaitingComponent::Payload => &mut self.awaiting_attestations_per_payload,
}
.entry(beacon_block_root)
.or_default()
.push(att_id);
// Store the attestation and its info.
match attestation {
QueuedAttestation::Aggregate(queued_aggregate) => {
self.queued_aggregates
.insert(id, (queued_aggregate, delay_key));
}
QueuedAttestation::Unaggregate(queued_unaggregate) => {
self.queued_unaggregates
.insert(id, (queued_unaggregate, delay_key));
}
}
self.next_attestation += 1;
}
fn handle_message(&mut self, msg: InboundEvent) {
use ReprocessQueueMessage::*;
match msg {
@@ -654,70 +737,26 @@ impl<S: SlotClock> ReprocessQueue<S> {
error!("Failed to send rpc block to beacon processor");
}
}
InboundEvent::Msg(UnknownBlockAggregate(queued_aggregate)) => {
if self.attestations_delay_queue.len() >= MAXIMUM_QUEUED_ATTESTATIONS {
if self.attestation_delay_debounce.elapsed() {
error!(
queue_size = MAXIMUM_QUEUED_ATTESTATIONS,
msg = "system resources may be saturated",
"Aggregate attestation delay queue is full"
);
}
// Drop the attestation.
return;
}
let att_id = QueuedAttestationId::Aggregate(self.next_attestation);
// Register the delay.
let delay_key = self
.attestations_delay_queue
.insert(att_id, QUEUED_ATTESTATION_DELAY);
// Register this attestation for the corresponding root.
self.awaiting_attestations_per_root
.entry(*queued_aggregate.beacon_block_root())
.or_default()
.push(att_id);
// Store the attestation and its info.
self.queued_aggregates
.insert(self.next_attestation, (queued_aggregate, delay_key));
self.next_attestation += 1;
}
InboundEvent::Msg(UnknownBlockUnaggregate(queued_unaggregate)) => {
if self.attestations_delay_queue.len() >= MAXIMUM_QUEUED_ATTESTATIONS {
if self.attestation_delay_debounce.elapsed() {
error!(
queue_size = MAXIMUM_QUEUED_ATTESTATIONS,
msg = "system resources may be saturated",
"Attestation delay queue is full"
);
}
// Drop the attestation.
return;
}
let att_id = QueuedAttestationId::Unaggregate(self.next_attestation);
// Register the delay.
let delay_key = self
.attestations_delay_queue
.insert(att_id, QUEUED_ATTESTATION_DELAY);
// Register this attestation for the corresponding root.
self.awaiting_attestations_per_root
.entry(*queued_unaggregate.beacon_block_root())
.or_default()
.push(att_id);
// Store the attestation and its info.
self.queued_unaggregates
.insert(self.next_attestation, (queued_unaggregate, delay_key));
self.next_attestation += 1;
}
InboundEvent::Msg(UnknownBlockAggregate(queued_aggregate)) => self
.queue_awaiting_attestation(
QueuedAttestation::Aggregate(queued_aggregate),
AwaitingComponent::Block,
),
InboundEvent::Msg(UnknownBlockUnaggregate(queued_unaggregate)) => self
.queue_awaiting_attestation(
QueuedAttestation::Unaggregate(queued_unaggregate),
AwaitingComponent::Block,
),
InboundEvent::Msg(UnknownPayloadAggregate(queued_aggregate)) => self
.queue_awaiting_attestation(
QueuedAttestation::Aggregate(queued_aggregate),
AwaitingComponent::Payload,
),
InboundEvent::Msg(UnknownPayloadUnaggregate(queued_unaggregate)) => self
.queue_awaiting_attestation(
QueuedAttestation::Unaggregate(queued_unaggregate),
AwaitingComponent::Payload,
),
InboundEvent::Msg(UnknownBlockDataColumn(queued_data_column)) => {
let block_root = queued_data_column.beacon_block_root;
@@ -785,10 +824,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
self.next_lc_update += 1;
}
InboundEvent::Msg(BlockImported {
block_root,
parent_root,
}) => {
InboundEvent::Msg(BlockImported { block_root }) => {
// Unqueue the envelope we have for this root, if any.
if let Some((envelope, delay_key)) =
self.awaiting_envelopes_per_root.remove(&block_root)
@@ -853,7 +889,6 @@ impl<S: SlotClock> ReprocessQueue<S> {
if failed_to_send_count > 0 {
error!(
hint = "system may be overloaded",
?parent_root,
?block_root,
failed_count = failed_to_send_count,
sent_count,
@@ -881,6 +916,59 @@ impl<S: SlotClock> ReprocessQueue<S> {
}
}
}
InboundEvent::Msg(PayloadEnvelopeImported { block_root }) => {
// Release attestations that were awaiting this block's execution payload envelope.
if let Some(queued_ids) = self.awaiting_attestations_per_payload.remove(&block_root)
{
let mut failed_to_send_count = 0;
for id in queued_ids {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS,
);
if let Some((work, delay_key)) = match id {
QueuedAttestationId::Aggregate(id) => self
.queued_aggregates
.remove(&id)
.map(|(aggregate, delay_key)| {
(ReadyWork::Aggregate(aggregate), delay_key)
}),
QueuedAttestationId::Unaggregate(id) => self
.queued_unaggregates
.remove(&id)
.map(|(unaggregate, delay_key)| {
(ReadyWork::Unaggregate(unaggregate), delay_key)
}),
} {
// Remove the delay.
self.attestations_delay_queue.remove(&delay_key);
// Send the work.
if self.ready_work_tx.try_send(work).is_err() {
failed_to_send_count += 1;
}
} else {
// There is a mismatch between the attestation ids registered for this
// root and the queued attestations. This should never happen.
error!(
?block_root,
att_id = ?id,
"Unknown queued attestation for payload envelope"
);
}
}
if failed_to_send_count > 0 {
error!(
hint = "system may be overloaded",
?block_root,
failed_count = failed_to_send_count,
"Ignored scheduled attestation(s) for payload envelope"
);
}
}
}
InboundEvent::Msg(NewLightClientOptimisticUpdate { parent_root }) => {
// Unqueue the light client optimistic updates we have for this root, if any.
if let Some(queued_lc_id) = self
@@ -1033,18 +1121,25 @@ impl<S: SlotClock> ReprocessQueue<S> {
);
}
if let Entry::Occupied(mut queued_atts) =
self.awaiting_attestations_per_root.entry(root)
&& let Some(index) =
queued_atts.get().iter().position(|&id| id == queued_id)
{
let queued_atts_mut = queued_atts.get_mut();
queued_atts_mut.swap_remove(index);
// The attestation is awaiting either its block or its payload envelope; prune it
// from whichever map holds it (the other lookup is a no-op) to avoid leaking the
// entry on expiry.
for awaiting in [
&mut self.awaiting_attestations_per_root,
&mut self.awaiting_attestations_per_payload,
] {
if let Entry::Occupied(mut queued_atts) = awaiting.entry(root)
&& let Some(index) =
queued_atts.get().iter().position(|&id| id == queued_id)
{
let queued_atts_mut = queued_atts.get_mut();
queued_atts_mut.swap_remove(index);
// If the vec is empty after this attestation's removal, we need to delete
// the entry to prevent bloating the hashmap indefinitely.
if queued_atts_mut.is_empty() {
queued_atts.remove_entry();
// If the vec is empty after this attestation's removal, we need to
// delete the entry to prevent bloating the hashmap indefinitely.
if queued_atts_mut.is_empty() {
queued_atts.remove_entry();
}
}
}
}
@@ -1412,6 +1507,131 @@ mod tests {
assert!(queue.awaiting_attestations_per_root.is_empty());
}
// Regression test for the same memory leak as `prune_awaiting_attestations_per_root`, but for
// attestations awaiting a block's execution payload envelope.
#[tokio::test]
async fn prune_awaiting_attestations_per_payload() {
create_test_tracing_subscriber();
let mut queue = test_queue();
// Pause time so it only advances manually
tokio::time::pause();
let beacon_block_root = Hash256::repeat_byte(0xaf);
// Insert a payload-present attestation awaiting its payload envelope.
let att = ReprocessQueueMessage::UnknownPayloadUnaggregate(QueuedUnaggregate {
beacon_block_root,
process_fn: Box::new(|| {}),
});
queue.handle_message(InboundEvent::Msg(att));
// Check that it is queued.
assert_eq!(queue.awaiting_attestations_per_payload.len(), 1);
assert!(
queue
.awaiting_attestations_per_payload
.contains_key(&beacon_block_root)
);
// Advance time to expire the attestation.
advance_time(&queue.slot_clock, 2 * QUEUED_ATTESTATION_DELAY).await;
let ready_msg = queue.next().await.unwrap();
assert!(matches!(ready_msg, InboundEvent::ReadyAttestation(_)));
queue.handle_message(ready_msg);
// The entry should be pruned on expiry.
assert!(queue.awaiting_attestations_per_payload.is_empty());
}
// The payload envelope import releases attestations awaiting that block's payload.
#[tokio::test]
async fn release_awaiting_attestations_on_payload_envelope_imported() {
create_test_tracing_subscriber();
let mut queue = test_queue();
tokio::time::pause();
let beacon_block_root = Hash256::repeat_byte(0xaf);
let att = ReprocessQueueMessage::UnknownPayloadUnaggregate(QueuedUnaggregate {
beacon_block_root,
process_fn: Box::new(|| {}),
});
queue.handle_message(InboundEvent::Msg(att));
assert_eq!(queue.awaiting_attestations_per_payload.len(), 1);
// Importing the payload envelope drains the awaiting attestations for that root.
queue.handle_message(InboundEvent::Msg(
ReprocessQueueMessage::PayloadEnvelopeImported {
block_root: beacon_block_root,
},
));
assert!(queue.awaiting_attestations_per_payload.is_empty());
}
// As `prune_awaiting_attestations_per_payload`, but for an aggregated payload-present
// attestation (`UnknownPayloadAggregate`).
#[tokio::test]
async fn prune_awaiting_attestations_per_payload_aggregate() {
create_test_tracing_subscriber();
let mut queue = test_queue();
tokio::time::pause();
let beacon_block_root = Hash256::repeat_byte(0xaf);
let att = ReprocessQueueMessage::UnknownPayloadAggregate(QueuedAggregate {
beacon_block_root,
process_fn: Box::new(|| {}),
});
queue.handle_message(InboundEvent::Msg(att));
assert_eq!(queue.awaiting_attestations_per_payload.len(), 1);
assert!(
queue
.awaiting_attestations_per_payload
.contains_key(&beacon_block_root)
);
// Advance time to expire the attestation.
advance_time(&queue.slot_clock, 2 * QUEUED_ATTESTATION_DELAY).await;
let ready_msg = queue.next().await.unwrap();
assert!(matches!(ready_msg, InboundEvent::ReadyAttestation(_)));
queue.handle_message(ready_msg);
// The entry should be pruned on expiry.
assert!(queue.awaiting_attestations_per_payload.is_empty());
}
// As `release_awaiting_attestations_on_payload_envelope_imported`, but for an aggregated
// payload-present attestation (`UnknownPayloadAggregate`).
#[tokio::test]
async fn release_awaiting_aggregate_on_payload_envelope_imported() {
create_test_tracing_subscriber();
let mut queue = test_queue();
tokio::time::pause();
let beacon_block_root = Hash256::repeat_byte(0xaf);
let att = ReprocessQueueMessage::UnknownPayloadAggregate(QueuedAggregate {
beacon_block_root,
process_fn: Box::new(|| {}),
});
queue.handle_message(InboundEvent::Msg(att));
assert_eq!(queue.awaiting_attestations_per_payload.len(), 1);
// Importing the payload envelope drains the awaiting attestations for that root.
queue.handle_message(InboundEvent::Msg(
ReprocessQueueMessage::PayloadEnvelopeImported {
block_root: beacon_block_root,
},
));
assert!(queue.awaiting_attestations_per_payload.is_empty());
}
// This is a regression test for a memory leak in `awaiting_lc_updates_per_parent_root`.
// See: https://github.com/sigp/lighthouse/pull/8065
#[tokio::test]
@@ -1622,7 +1842,6 @@ mod tests {
tokio::time::pause();
let beacon_block_root = Hash256::repeat_byte(0xaf);
let parent_root = Hash256::repeat_byte(0xab);
// Insert an envelope.
let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
@@ -1640,7 +1859,6 @@ mod tests {
// Simulate block import.
let imported = ReprocessQueueMessage::BlockImported {
block_root: beacon_block_root,
parent_root,
};
queue.handle_message(InboundEvent::Msg(imported));
@@ -1716,7 +1934,6 @@ mod tests {
// Simulate block import.
queue.handle_message(InboundEvent::Msg(ReprocessQueueMessage::BlockImported {
block_root: beacon_block_root,
parent_root: Hash256::repeat_byte(0x00),
}));
// Internal state should be cleaned up.