From a46620155b1d6ed1ec2a775a0f4fa10e53cebd3f Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 17 Jun 2026 14:44:21 +0200 Subject: [PATCH] Gloas attestation payload reprocess (#9440) Handle payload-present attestations before the payload is seen (gloas) A gloas beacon_attestation with index == 1 claims a past block's payload is already present. If we haven't seen that block's payload envelope yet, we shouldn't reject it the envelope may just be in flight. So instead we IGNORE it (new AttnError::UnknownPayloadEnvelope), ask sync to fetch the envelope, and park the attestation in the reprocess queue. When the envelope is imported, the parked attestations are released and re-verified. The envelope lookup itself is stubbed here and wired up in #9155 or a follow up PR Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com> Co-Authored-By: Eitan Seri-Levi --- .../src/attestation_verification.rs | 30 ++ beacon_node/beacon_chain/src/beacon_chain.rs | 17 +- .../beacon_chain/src/fetch_blobs/tests.rs | 10 +- .../src/payload_bid_verification/tests.rs | 1 + .../payload_envelope_verification/import.rs | 7 +- .../tests/attestation_verification.rs | 170 ++++++++ .../beacon_chain/tests/column_verification.rs | 5 +- .../src/scheduler/work_reprocessing_queue.rs | 391 ++++++++++++++---- .../src/beacon/execution_payload_envelopes.rs | 4 +- .../http_api/src/publish_attestations.rs | 40 ++ beacon_node/http_api/src/publish_blocks.rs | 4 +- .../http_api/tests/interactive_tests.rs | 6 +- beacon_node/network/src/metrics.rs | 7 + .../gossip_methods.rs | 286 +++++++++++-- .../src/network_beacon_processor/mod.rs | 22 +- .../network_beacon_processor/sync_methods.rs | 22 +- beacon_node/network/src/sync/manager.rs | 28 ++ beacon_node/network/src/sync/tests/lookups.rs | 2 +- consensus/fork_choice/src/fork_choice.rs | 2 + consensus/proto_array/benches/find_head.rs | 1 + .../src/fork_choice_test_definition.rs | 1 + .../src/proto_array_fork_choice.rs | 7 + 22 files changed, 893 insertions(+), 170 deletions(-) diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index 635ca3a2ae..90ac7d68cf 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -174,6 +174,14 @@ pub enum Error { /// The attestation points to a block we have not yet imported. It's unclear if the attestation /// is valid or not. UnknownHeadBlock { beacon_block_root: Hash256 }, + /// An attestation indicating the presence of a payload (`index == 1`) references a block whose + /// execution payload envelope has not been seen yet. + /// + /// ## Peer scoring + /// + /// The attestation may be valid once the payload envelope is retrieved; it's unclear if the + /// attestation is valid or not, so it is ignored (not penalized) pending the envelope. + UnknownPayloadEnvelope { beacon_block_root: Hash256 }, /// The `attestation.data.beacon_block_root` block is from before the finalized checkpoint. /// /// ## Peer scoring @@ -612,6 +620,18 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> { )); } + // [New in Gloas]: `index == 1` claims the block's execution payload is present. Ignore the + // attestation until we have seen the block's payload envelope, so it can be re-processed + // (and the envelope retrieved) once the payload is received. + if fork_name.gloas_enabled() + && attestation.data().index == 1 + && !head_block.payload_received + { + return Err(Error::UnknownPayloadEnvelope { + beacon_block_root: attestation.data().beacon_block_root, + }); + } + // Check the attestation target root is consistent with the head root. // // This check is not in the specification, however we guard against it since it opens us up @@ -923,6 +943,16 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { )); } + // [New in Gloas]: `index == 1` claims the block's execution payload is present. Ignore the + // attestation until we have seen the block's payload envelope, so it can be re-processed + // (and the envelope retrieved) once the payload is received. + if fork_name.gloas_enabled() && attestation.data.index == 1 && !head_block.payload_received + { + return Err(Error::UnknownPayloadEnvelope { + beacon_block_root: attestation.data.beacon_block_root, + }); + } + // Check the attestation target root is consistent with the head root. verify_attestation_target_root::(&head_block, &attestation.data)?; diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 5a521d18e6..f09b9b0520 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -196,7 +196,7 @@ pub enum WhenSlotSkipped { #[derive(Copy, Clone, Debug, PartialEq)] pub enum AvailabilityProcessingStatus { MissingComponents(Slot, Hash256), - Imported(Hash256), + Imported(Slot, Hash256), } impl TryInto for AvailabilityProcessingStatus { @@ -204,7 +204,7 @@ impl TryInto for AvailabilityProcessingStatus { fn try_into(self) -> Result { match self { - AvailabilityProcessingStatus::Imported(hash) => Ok(hash.into()), + AvailabilityProcessingStatus::Imported(_, hash) => Ok(hash.into()), _ => Err(()), } } @@ -215,7 +215,7 @@ impl TryInto for AvailabilityProcessingStatus { fn try_into(self) -> Result { match self { - AvailabilityProcessingStatus::Imported(hash) => Ok(hash), + AvailabilityProcessingStatus::Imported(_, hash) => Ok(hash), _ => Err(()), } } @@ -3159,9 +3159,9 @@ impl BeaconChain { { Ok(status) => { match status { - AvailabilityProcessingStatus::Imported(block_root) => { + AvailabilityProcessingStatus::Imported(slot, block_root) => { // The block was imported successfully. - imported_blocks.push((block_root, block_slot)); + imported_blocks.push((block_root, slot)); } AvailabilityProcessingStatus::MissingComponents(slot, block_root) => { warn!( @@ -3808,10 +3808,10 @@ impl BeaconChain { // Verify and import the block. match import_block.await { // The block was successfully verified and imported. Yay. - Ok(status @ AvailabilityProcessingStatus::Imported(block_root)) => { + Ok(status @ AvailabilityProcessingStatus::Imported(slot, block_root)) => { debug!( ?block_root, - %block_slot, + %slot, source = %block_source, "Beacon block imported" ); @@ -4149,6 +4149,7 @@ impl BeaconChain { payload_verification_outcome, } = *block; + let slot = block.slot(); let BlockImportData { block_root, state, @@ -4183,7 +4184,7 @@ impl BeaconChain { .await?? }; - Ok(AvailabilityProcessingStatus::Imported(block_root)) + Ok(AvailabilityProcessingStatus::Imported(slot, block_root)) } /// Accepts a fully-verified and available block and imports it into the chain without performing any diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs index 4a37113fd9..3c0f43fef0 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs @@ -200,7 +200,10 @@ mod get_blobs_v2 { .returning(|_, _| None); mock_process_engine_blobs_result( &mut mock_adapter, - Ok(AvailabilityProcessingStatus::Imported(block_root)), + Ok(AvailabilityProcessingStatus::Imported( + block.slot(), + block_root, + )), ); // Trigger fetch blobs on the block @@ -217,7 +220,10 @@ mod get_blobs_v2 { assert_eq!( processing_status, - Some(AvailabilityProcessingStatus::Imported(block_root)) + Some(AvailabilityProcessingStatus::Imported( + block.slot(), + block_root + )) ); let published_columns = extract_published_blobs(publish_fn_args); diff --git a/beacon_node/beacon_chain/src/payload_bid_verification/tests.rs b/beacon_node/beacon_chain/src/payload_bid_verification/tests.rs index 04eb875bd9..e764f0beb5 100644 --- a/beacon_node/beacon_chain/src/payload_bid_verification/tests.rs +++ b/beacon_node/beacon_chain/src/payload_bid_verification/tests.rs @@ -223,6 +223,7 @@ impl TestContext { execution_payload_parent_hash: Some(ExecutionBlockHash::zero()), execution_payload_block_hash: Some(ExecutionBlockHash::repeat_byte(0xab)), proposer_index: Some(0), + payload_received: false, }, Slot::new(1), &self.spec, diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index 90cdb4fe97..29782b3294 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -108,10 +108,10 @@ impl BeaconChain { // Verify and import the payload envelope. match import_envelope.await { // The payload envelope was successfully verified and imported. - Ok(status @ AvailabilityProcessingStatus::Imported(block_root)) => { + Ok(status @ AvailabilityProcessingStatus::Imported(slot, block_root)) => { info!( ?block_root, - %block_slot, + %slot, source = %envelope_source, "Execution payload envelope imported" ); @@ -195,6 +195,7 @@ impl BeaconChain { block_root, payload_verification_outcome, } = *envelope; + let slot = envelope.envelope.slot(); let block_root = { let chain = self.clone(); @@ -211,7 +212,7 @@ impl BeaconChain { .await?? }; - Ok(AvailabilityProcessingStatus::Imported(block_root)) + Ok(AvailabilityProcessingStatus::Imported(slot, block_root)) } /// Accepts a fully-verified and available envelope and imports it into the chain without performing any diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs index 03b8ae58ac..ad369c79ee 100644 --- a/beacon_node/beacon_chain/tests/attestation_verification.rs +++ b/beacon_node/beacon_chain/tests/attestation_verification.rs @@ -1970,6 +1970,176 @@ async fn gloas_aggregated_attestation_same_slot_index_must_be_zero() { ); } +/// [New in Gloas]: An unaggregated attestation claiming payload-present (`data.index == 1`) for a +/// block whose payload envelope has not yet been seen (`payload_received == false`) must be +/// rejected with `UnknownPayloadEnvelope`, so it can be parked for re-processing once the envelope +/// arrives. +#[tokio::test] +async fn gloas_unaggregated_attestation_unknown_payload_envelope() { + if !test_spec::() + .fork_name_at_epoch(Epoch::new(0)) + .gloas_enabled() + { + return; + } + + let harness = get_harness(VALIDATOR_COUNT); + + // Build some chain depth. `extend_chain` imports each block's payload envelope, so every block + // produced so far has `payload_received == true`. + harness + .extend_chain( + MainnetEthSpec::slots_per_epoch() as usize * 2, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Produce one more block but do NOT import its payload envelope, leaving the head block with + // `payload_received == false`. + let head = harness.chain.head_snapshot(); + let block_slot = head.beacon_block.slot() + 1; + let ((signed_block, blobs), _envelope, _post_state) = harness + .make_block_with_envelope(head.beacon_state.clone(), block_slot) + .await; + let block_root = signed_block.canonical_root(); + harness + .process_block(block_slot, block_root, (signed_block, blobs)) + .await + .expect("payload-less block should import"); + + // The block should be the head, and its payload envelope should not be recorded. + assert!( + !harness + .chain + .canonical_head + .fork_choice_read_lock() + .get_block(&block_root) + .expect("block should be in fork choice") + .payload_received, + "block should not have its payload envelope recorded" + ); + + // Advance a slot so the attestation slot is later than the (payload-less) head block's slot, + // which avoids the same-slot `index == 0` requirement. + harness.advance_slot(); + + // Produce a valid attestation for the head block, then claim payload-present (`index == 1`). + // The gloas payload-envelope check runs before signature verification, so mutating the index + // is sufficient to exercise the arm. + let (mut attestation, _attester_sk, subnet_id) = + get_valid_unaggregated_attestation(&harness.chain); + assert_eq!( + attestation.data.beacon_block_root, block_root, + "attestation should be for the payload-less head block" + ); + attestation.data.index = 1; + + let result = harness + .chain + .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id)); + assert!( + matches!( + result, + Err(AttnError::UnknownPayloadEnvelope { beacon_block_root }) + if beacon_block_root == block_root + ), + "gloas: payload-present attestation for a block with an unseen payload envelope should be \ + rejected with UnknownPayloadEnvelope, got {:?}", + result.err() + ); +} + +/// [New in Gloas]: The aggregate counterpart of +/// `gloas_unaggregated_attestation_unknown_payload_envelope`. An aggregate claiming payload-present +/// (`data.index == 1`) for a block whose payload envelope has not been seen must be rejected with +/// `UnknownPayloadEnvelope`. +#[tokio::test] +async fn gloas_aggregated_attestation_unknown_payload_envelope() { + // Skip unless running with the gloas fork, before paying for harness setup. + if !test_spec::() + .fork_name_at_epoch(Epoch::new(0)) + .gloas_enabled() + { + return; + } + + let harness = get_harness(VALIDATOR_COUNT); + + // Build some chain depth. `extend_chain` imports each block's payload envelope, so every block + // produced so far has `payload_received == true`. + harness + .extend_chain( + MainnetEthSpec::slots_per_epoch() as usize * 2, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Produce one more block but do NOT import its payload envelope, leaving the head block with + // `payload_received == false`. + let head = harness.chain.head_snapshot(); + let block_slot = head.beacon_block.slot() + 1; + let ((signed_block, blobs), _envelope, _post_state) = harness + .make_block_with_envelope(head.beacon_state.clone(), block_slot) + .await; + let block_root = signed_block.canonical_root(); + harness + .process_block(block_slot, block_root, (signed_block, blobs)) + .await + .expect("payload-less block should import"); + + // Advance a slot so the attestation slot is later than the (payload-less) head block's slot, + // which avoids the same-slot `index == 0` requirement. + harness.advance_slot(); + + let head = harness.chain.head_snapshot(); + let current_slot = harness.chain.slot().expect("should get slot"); + + // Build a valid aggregate for the head block, then claim payload-present (`index == 1`). The + // gloas payload-envelope check runs before signature verification, so mutating the index is + // sufficient to exercise the arm. + let (valid_attestation, _, _) = get_valid_unaggregated_attestation(&harness.chain); + assert_eq!( + valid_attestation.data.beacon_block_root, block_root, + "attestation should be for the payload-less head block" + ); + let committee = head + .beacon_state + .get_beacon_committee(current_slot, valid_attestation.committee_index) + .expect("should get committee"); + let fork_name = harness + .spec + .fork_name_at_slot::(valid_attestation.data.slot); + let aggregate_attestation = + single_attestation_to_attestation(&valid_attestation, committee.committee, fork_name) + .unwrap(); + let (mut valid_aggregate, _, _) = + get_valid_aggregated_attestation(&harness.chain, aggregate_attestation); + + valid_aggregate + .as_electra_mut() + .unwrap() + .message + .aggregate + .data + .index = 1; + + let result = harness + .chain + .verify_aggregated_attestation_for_gossip(&valid_aggregate); + assert!( + matches!( + result, + Err(AttnError::UnknownPayloadEnvelope { beacon_block_root }) + if beacon_block_root == block_root + ), + "gloas: payload-present aggregate for a block with an unseen payload envelope should be \ + rejected with UnknownPayloadEnvelope, got {:?}", + result.err() + ); +} + /// Regression test: a SingleAttestation with a huge bogus attester_index must not be forwarded to /// the slasher. Previously the slasher received the IndexedAttestation before committee-membership /// validation, causing an OOM when the slasher tried to allocate based on the untrusted index. diff --git a/beacon_node/beacon_chain/tests/column_verification.rs b/beacon_node/beacon_chain/tests/column_verification.rs index 06a5f44e5f..180e187e90 100644 --- a/beacon_node/beacon_chain/tests/column_verification.rs +++ b/beacon_node/beacon_chain/tests/column_verification.rs @@ -274,5 +274,8 @@ async fn verify_header_signature_fork_block_bug() { .process_rpc_custody_columns(data_column_sidecars) .await .unwrap(); - assert_eq!(status, AvailabilityProcessingStatus::Imported(block_root)); + assert_eq!( + status, + AvailabilityProcessingStatus::Imported(signed_block.slot(), block_root) + ); } diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index 62ed86fbad..dddf2a740d 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -115,10 +115,10 @@ pub enum ReprocessQueueMessage { RpcBlock(QueuedRpcBlock), /// A block that was successfully processed. We use this to handle attestations updates /// for unknown blocks. - BlockImported { - block_root: Hash256, - parent_root: Hash256, - }, + BlockImported { block_root: Hash256 }, + /// A block's execution payload envelope was imported. We use this to release attestations that + /// claim payload-present (`index == 1`) for a block whose payload had not yet been seen. + PayloadEnvelopeImported { block_root: Hash256 }, /// A new `LightClientOptimisticUpdate` has been produced. We use this to handle light client /// updates for unknown parent blocks. NewLightClientOptimisticUpdate { parent_root: Hash256 }, @@ -126,6 +126,12 @@ pub enum ReprocessQueueMessage { UnknownBlockUnaggregate(QueuedUnaggregate), /// An aggregated attestation that references an unknown block. UnknownBlockAggregate(QueuedAggregate), + /// An unaggregated attestation (`index == 1`) whose block's execution payload envelope has not + /// been seen yet. + UnknownPayloadUnaggregate(QueuedUnaggregate), + /// An aggregated attestation (`index == 1`) whose block's execution payload envelope has not + /// been seen yet. + UnknownPayloadAggregate(QueuedAggregate), /// A light client optimistic update that references a parent root that has not been seen as a parent. UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate), /// A new backfill batch that needs to be scheduled for processing. @@ -296,6 +302,9 @@ struct ReprocessQueue { queued_unaggregates: FnvHashMap, /// Attestations (aggregated and unaggregated) per root. awaiting_attestations_per_root: HashMap>, + /// Attestations (aggregated and unaggregated) awaiting a block's execution payload envelope, + /// keyed by block root. Released on `PayloadEnvelopeImported`. + awaiting_attestations_per_payload: HashMap>, /// Queued Light Client Updates. queued_lc_updates: FnvHashMap, /// Light Client Updates per parent_root. @@ -331,6 +340,20 @@ enum QueuedAttestationId { Unaggregate(usize), } +/// An attestation queued for re-processing, of either aggregation kind. +enum QueuedAttestation { + Aggregate(QueuedAggregate), + Unaggregate(QueuedUnaggregate), +} + +/// The component an attestation is waiting on before it can be re-processed. +enum AwaitingComponent { + /// The attestation's head block has not been seen. + Block, + /// The block's execution payload envelope has not been seen (`index == 1`, post-Gloas). + Payload, +} + impl QueuedAggregate { pub fn beacon_block_root(&self) -> &Hash256 { &self.beacon_block_root @@ -494,6 +517,7 @@ impl ReprocessQueue { queued_aggregates: FnvHashMap::default(), queued_unaggregates: FnvHashMap::default(), awaiting_attestations_per_root: HashMap::new(), + awaiting_attestations_per_payload: HashMap::new(), awaiting_lc_updates_per_parent_root: HashMap::new(), queued_backfill_batches: Vec::new(), queued_column_reconstructions: HashMap::new(), @@ -512,6 +536,65 @@ impl ReprocessQueue { } } + /// Queue an attestation for re-processing once the component it is waiting on (`awaiting`) is + /// imported. Shared by the unknown-block and unknown-payload paths for both aggregate and + /// unaggregate attestations. + fn queue_awaiting_attestation( + &mut self, + attestation: QueuedAttestation, + awaiting: AwaitingComponent, + ) { + if self.attestations_delay_queue.len() >= MAXIMUM_QUEUED_ATTESTATIONS { + if self.attestation_delay_debounce.elapsed() { + error!( + queue_size = MAXIMUM_QUEUED_ATTESTATIONS, + msg = "system resources may be saturated", + "Attestation delay queue is full" + ); + } + // Drop the attestation. + return; + } + + let id = self.next_attestation; + let (att_id, beacon_block_root) = match &attestation { + QueuedAttestation::Aggregate(a) => { + (QueuedAttestationId::Aggregate(id), *a.beacon_block_root()) + } + QueuedAttestation::Unaggregate(u) => { + (QueuedAttestationId::Unaggregate(id), *u.beacon_block_root()) + } + }; + + // Register the delay. + let delay_key = self + .attestations_delay_queue + .insert(att_id, QUEUED_ATTESTATION_DELAY); + + // Register this attestation against the component it awaits. + match awaiting { + AwaitingComponent::Block => &mut self.awaiting_attestations_per_root, + AwaitingComponent::Payload => &mut self.awaiting_attestations_per_payload, + } + .entry(beacon_block_root) + .or_default() + .push(att_id); + + // Store the attestation and its info. + match attestation { + QueuedAttestation::Aggregate(queued_aggregate) => { + self.queued_aggregates + .insert(id, (queued_aggregate, delay_key)); + } + QueuedAttestation::Unaggregate(queued_unaggregate) => { + self.queued_unaggregates + .insert(id, (queued_unaggregate, delay_key)); + } + } + + self.next_attestation += 1; + } + fn handle_message(&mut self, msg: InboundEvent) { use ReprocessQueueMessage::*; match msg { @@ -654,70 +737,26 @@ impl ReprocessQueue { error!("Failed to send rpc block to beacon processor"); } } - InboundEvent::Msg(UnknownBlockAggregate(queued_aggregate)) => { - if self.attestations_delay_queue.len() >= MAXIMUM_QUEUED_ATTESTATIONS { - if self.attestation_delay_debounce.elapsed() { - error!( - queue_size = MAXIMUM_QUEUED_ATTESTATIONS, - msg = "system resources may be saturated", - "Aggregate attestation delay queue is full" - ); - } - // Drop the attestation. - return; - } - - let att_id = QueuedAttestationId::Aggregate(self.next_attestation); - - // Register the delay. - let delay_key = self - .attestations_delay_queue - .insert(att_id, QUEUED_ATTESTATION_DELAY); - - // Register this attestation for the corresponding root. - self.awaiting_attestations_per_root - .entry(*queued_aggregate.beacon_block_root()) - .or_default() - .push(att_id); - - // Store the attestation and its info. - self.queued_aggregates - .insert(self.next_attestation, (queued_aggregate, delay_key)); - - self.next_attestation += 1; - } - InboundEvent::Msg(UnknownBlockUnaggregate(queued_unaggregate)) => { - if self.attestations_delay_queue.len() >= MAXIMUM_QUEUED_ATTESTATIONS { - if self.attestation_delay_debounce.elapsed() { - error!( - queue_size = MAXIMUM_QUEUED_ATTESTATIONS, - msg = "system resources may be saturated", - "Attestation delay queue is full" - ); - } - // Drop the attestation. - return; - } - - let att_id = QueuedAttestationId::Unaggregate(self.next_attestation); - - // Register the delay. - let delay_key = self - .attestations_delay_queue - .insert(att_id, QUEUED_ATTESTATION_DELAY); - - // Register this attestation for the corresponding root. - self.awaiting_attestations_per_root - .entry(*queued_unaggregate.beacon_block_root()) - .or_default() - .push(att_id); - - // Store the attestation and its info. - self.queued_unaggregates - .insert(self.next_attestation, (queued_unaggregate, delay_key)); - - self.next_attestation += 1; - } + InboundEvent::Msg(UnknownBlockAggregate(queued_aggregate)) => self + .queue_awaiting_attestation( + QueuedAttestation::Aggregate(queued_aggregate), + AwaitingComponent::Block, + ), + InboundEvent::Msg(UnknownBlockUnaggregate(queued_unaggregate)) => self + .queue_awaiting_attestation( + QueuedAttestation::Unaggregate(queued_unaggregate), + AwaitingComponent::Block, + ), + InboundEvent::Msg(UnknownPayloadAggregate(queued_aggregate)) => self + .queue_awaiting_attestation( + QueuedAttestation::Aggregate(queued_aggregate), + AwaitingComponent::Payload, + ), + InboundEvent::Msg(UnknownPayloadUnaggregate(queued_unaggregate)) => self + .queue_awaiting_attestation( + QueuedAttestation::Unaggregate(queued_unaggregate), + AwaitingComponent::Payload, + ), InboundEvent::Msg(UnknownBlockDataColumn(queued_data_column)) => { let block_root = queued_data_column.beacon_block_root; @@ -785,10 +824,7 @@ impl ReprocessQueue { self.next_lc_update += 1; } - InboundEvent::Msg(BlockImported { - block_root, - parent_root, - }) => { + InboundEvent::Msg(BlockImported { block_root }) => { // Unqueue the envelope we have for this root, if any. if let Some((envelope, delay_key)) = self.awaiting_envelopes_per_root.remove(&block_root) @@ -853,7 +889,6 @@ impl ReprocessQueue { if failed_to_send_count > 0 { error!( hint = "system may be overloaded", - ?parent_root, ?block_root, failed_count = failed_to_send_count, sent_count, @@ -881,6 +916,59 @@ impl ReprocessQueue { } } } + InboundEvent::Msg(PayloadEnvelopeImported { block_root }) => { + // Release attestations that were awaiting this block's execution payload envelope. + if let Some(queued_ids) = self.awaiting_attestations_per_payload.remove(&block_root) + { + let mut failed_to_send_count = 0; + + for id in queued_ids { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS, + ); + + if let Some((work, delay_key)) = match id { + QueuedAttestationId::Aggregate(id) => self + .queued_aggregates + .remove(&id) + .map(|(aggregate, delay_key)| { + (ReadyWork::Aggregate(aggregate), delay_key) + }), + QueuedAttestationId::Unaggregate(id) => self + .queued_unaggregates + .remove(&id) + .map(|(unaggregate, delay_key)| { + (ReadyWork::Unaggregate(unaggregate), delay_key) + }), + } { + // Remove the delay. + self.attestations_delay_queue.remove(&delay_key); + + // Send the work. + if self.ready_work_tx.try_send(work).is_err() { + failed_to_send_count += 1; + } + } else { + // There is a mismatch between the attestation ids registered for this + // root and the queued attestations. This should never happen. + error!( + ?block_root, + att_id = ?id, + "Unknown queued attestation for payload envelope" + ); + } + } + + if failed_to_send_count > 0 { + error!( + hint = "system may be overloaded", + ?block_root, + failed_count = failed_to_send_count, + "Ignored scheduled attestation(s) for payload envelope" + ); + } + } + } InboundEvent::Msg(NewLightClientOptimisticUpdate { parent_root }) => { // Unqueue the light client optimistic updates we have for this root, if any. if let Some(queued_lc_id) = self @@ -1033,18 +1121,25 @@ impl ReprocessQueue { ); } - if let Entry::Occupied(mut queued_atts) = - self.awaiting_attestations_per_root.entry(root) - && let Some(index) = - queued_atts.get().iter().position(|&id| id == queued_id) - { - let queued_atts_mut = queued_atts.get_mut(); - queued_atts_mut.swap_remove(index); + // The attestation is awaiting either its block or its payload envelope; prune it + // from whichever map holds it (the other lookup is a no-op) to avoid leaking the + // entry on expiry. + for awaiting in [ + &mut self.awaiting_attestations_per_root, + &mut self.awaiting_attestations_per_payload, + ] { + if let Entry::Occupied(mut queued_atts) = awaiting.entry(root) + && let Some(index) = + queued_atts.get().iter().position(|&id| id == queued_id) + { + let queued_atts_mut = queued_atts.get_mut(); + queued_atts_mut.swap_remove(index); - // If the vec is empty after this attestation's removal, we need to delete - // the entry to prevent bloating the hashmap indefinitely. - if queued_atts_mut.is_empty() { - queued_atts.remove_entry(); + // If the vec is empty after this attestation's removal, we need to + // delete the entry to prevent bloating the hashmap indefinitely. + if queued_atts_mut.is_empty() { + queued_atts.remove_entry(); + } } } } @@ -1412,6 +1507,131 @@ mod tests { assert!(queue.awaiting_attestations_per_root.is_empty()); } + // Regression test for the same memory leak as `prune_awaiting_attestations_per_root`, but for + // attestations awaiting a block's execution payload envelope. + #[tokio::test] + async fn prune_awaiting_attestations_per_payload() { + create_test_tracing_subscriber(); + + let mut queue = test_queue(); + + // Pause time so it only advances manually + tokio::time::pause(); + + let beacon_block_root = Hash256::repeat_byte(0xaf); + + // Insert a payload-present attestation awaiting its payload envelope. + let att = ReprocessQueueMessage::UnknownPayloadUnaggregate(QueuedUnaggregate { + beacon_block_root, + process_fn: Box::new(|| {}), + }); + queue.handle_message(InboundEvent::Msg(att)); + + // Check that it is queued. + assert_eq!(queue.awaiting_attestations_per_payload.len(), 1); + assert!( + queue + .awaiting_attestations_per_payload + .contains_key(&beacon_block_root) + ); + + // Advance time to expire the attestation. + advance_time(&queue.slot_clock, 2 * QUEUED_ATTESTATION_DELAY).await; + let ready_msg = queue.next().await.unwrap(); + assert!(matches!(ready_msg, InboundEvent::ReadyAttestation(_))); + queue.handle_message(ready_msg); + + // The entry should be pruned on expiry. + assert!(queue.awaiting_attestations_per_payload.is_empty()); + } + + // The payload envelope import releases attestations awaiting that block's payload. + #[tokio::test] + async fn release_awaiting_attestations_on_payload_envelope_imported() { + create_test_tracing_subscriber(); + + let mut queue = test_queue(); + tokio::time::pause(); + + let beacon_block_root = Hash256::repeat_byte(0xaf); + + let att = ReprocessQueueMessage::UnknownPayloadUnaggregate(QueuedUnaggregate { + beacon_block_root, + process_fn: Box::new(|| {}), + }); + queue.handle_message(InboundEvent::Msg(att)); + assert_eq!(queue.awaiting_attestations_per_payload.len(), 1); + + // Importing the payload envelope drains the awaiting attestations for that root. + queue.handle_message(InboundEvent::Msg( + ReprocessQueueMessage::PayloadEnvelopeImported { + block_root: beacon_block_root, + }, + )); + assert!(queue.awaiting_attestations_per_payload.is_empty()); + } + + // As `prune_awaiting_attestations_per_payload`, but for an aggregated payload-present + // attestation (`UnknownPayloadAggregate`). + #[tokio::test] + async fn prune_awaiting_attestations_per_payload_aggregate() { + create_test_tracing_subscriber(); + + let mut queue = test_queue(); + tokio::time::pause(); + + let beacon_block_root = Hash256::repeat_byte(0xaf); + + let att = ReprocessQueueMessage::UnknownPayloadAggregate(QueuedAggregate { + beacon_block_root, + process_fn: Box::new(|| {}), + }); + queue.handle_message(InboundEvent::Msg(att)); + + assert_eq!(queue.awaiting_attestations_per_payload.len(), 1); + assert!( + queue + .awaiting_attestations_per_payload + .contains_key(&beacon_block_root) + ); + + // Advance time to expire the attestation. + advance_time(&queue.slot_clock, 2 * QUEUED_ATTESTATION_DELAY).await; + let ready_msg = queue.next().await.unwrap(); + assert!(matches!(ready_msg, InboundEvent::ReadyAttestation(_))); + queue.handle_message(ready_msg); + + // The entry should be pruned on expiry. + assert!(queue.awaiting_attestations_per_payload.is_empty()); + } + + // As `release_awaiting_attestations_on_payload_envelope_imported`, but for an aggregated + // payload-present attestation (`UnknownPayloadAggregate`). + #[tokio::test] + async fn release_awaiting_aggregate_on_payload_envelope_imported() { + create_test_tracing_subscriber(); + + let mut queue = test_queue(); + tokio::time::pause(); + + let beacon_block_root = Hash256::repeat_byte(0xaf); + + let att = ReprocessQueueMessage::UnknownPayloadAggregate(QueuedAggregate { + beacon_block_root, + process_fn: Box::new(|| {}), + }); + queue.handle_message(InboundEvent::Msg(att)); + assert_eq!(queue.awaiting_attestations_per_payload.len(), 1); + + // Importing the payload envelope drains the awaiting attestations for that root. + queue.handle_message(InboundEvent::Msg( + ReprocessQueueMessage::PayloadEnvelopeImported { + block_root: beacon_block_root, + }, + )); + assert!(queue.awaiting_attestations_per_payload.is_empty()); + } + // This is a regression test for a memory leak in `awaiting_lc_updates_per_parent_root`. // See: https://github.com/sigp/lighthouse/pull/8065 #[tokio::test] @@ -1622,7 +1842,6 @@ mod tests { tokio::time::pause(); let beacon_block_root = Hash256::repeat_byte(0xaf); - let parent_root = Hash256::repeat_byte(0xab); // Insert an envelope. let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope { @@ -1640,7 +1859,6 @@ mod tests { // Simulate block import. let imported = ReprocessQueueMessage::BlockImported { block_root: beacon_block_root, - parent_root, }; queue.handle_message(InboundEvent::Msg(imported)); @@ -1716,7 +1934,6 @@ mod tests { // Simulate block import. queue.handle_message(InboundEvent::Msg(ReprocessQueueMessage::BlockImported { block_root: beacon_block_root, - parent_root: Hash256::repeat_byte(0x00), })); // Internal state should be cleaned up. diff --git a/beacon_node/http_api/src/beacon/execution_payload_envelopes.rs b/beacon_node/http_api/src/beacon/execution_payload_envelopes.rs index b6b681e091..d058f66001 100644 --- a/beacon_node/http_api/src/beacon/execution_payload_envelopes.rs +++ b/beacon_node/http_api/src/beacon/execution_payload_envelopes.rs @@ -163,7 +163,7 @@ pub async fn publish_execution_payload_envelope( .await; let mut envelope_imported = match &import_result { - Ok(AvailabilityProcessingStatus::Imported(_)) => true, + Ok(AvailabilityProcessingStatus::Imported(_, _)) => true, Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => false, Err(e) => { warn!(%slot, error = ?e, "Failed to import execution payload envelope"); @@ -210,7 +210,7 @@ pub async fn publish_execution_payload_envelope( if !sampling_columns.is_empty() { match Box::pin(chain.process_gossip_data_columns(sampling_columns, || Ok(()))).await { - Ok(AvailabilityProcessingStatus::Imported(_)) => envelope_imported = true, + Ok(AvailabilityProcessingStatus::Imported(_, _)) => envelope_imported = true, Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => {} Err(e) => { error!( diff --git a/beacon_node/http_api/src/publish_attestations.rs b/beacon_node/http_api/src/publish_attestations.rs index b93f2a0b7b..c1ea241b79 100644 --- a/beacon_node/http_api/src/publish_attestations.rs +++ b/beacon_node/http_api/src/publish_attestations.rs @@ -189,6 +189,46 @@ pub async fn publish_attestations( PublishAttestationResult::Reprocessing(rx) } } + Err(Error::Validation(AttestationError::UnknownPayloadEnvelope { + beacon_block_root, + })) => { + if !allow_reprocess { + return PublishAttestationResult::Failure(Error::ReprocessDisabled); + }; + // Re-process once the block's payload envelope is seen (Gloas). + let (tx, rx) = oneshot::channel(); + let reprocess_chain = chain.clone(); + let reprocess_network_tx = network_tx.clone(); + let reprocess_fn = move || { + let result = verify_and_publish_attestation( + &reprocess_chain, + &attestation, + seen_timestamp, + &reprocess_network_tx, + ); + // Ignore failure on the oneshot that reports the result. This + // shouldn't happen unless some catastrophe befalls the waiting + // thread which causes it to drop. + let _ = tx.send(result); + }; + let reprocess_msg = ReprocessQueueMessage::UnknownPayloadUnaggregate( + QueuedUnaggregate { + beacon_block_root, + process_fn: Box::new(reprocess_fn), + }, + ); + if task_spawner + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(reprocess_msg), + }) + .is_err() + { + PublishAttestationResult::Failure(Error::ReprocessFull) + } else { + PublishAttestationResult::Reprocessing(rx) + } + } Err(Error::Validation(AttestationError::PriorAttestationKnown { .. })) => PublishAttestationResult::AlreadyKnown, diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index b46576ddad..8b45a4b04c 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -266,7 +266,7 @@ pub async fn publish_block>( Err(BlockError::DuplicateFullyImported(root)) => { if publish_fn_completed.load(Ordering::SeqCst) { post_block_import_logging_and_response( - Ok(AvailabilityProcessingStatus::Imported(root)), + Ok(AvailabilityProcessingStatus::Imported(slot, root)), validation_level, block, is_locally_built_block, @@ -474,7 +474,7 @@ async fn post_block_import_logging_and_response( // result of the block being imported from gossip, OR it could be that it finished importing // after processing of a gossip blob. In the latter case we MUST run fork choice to // re-compute the head. - Ok(AvailabilityProcessingStatus::Imported(root)) + Ok(AvailabilityProcessingStatus::Imported(_, root)) | Err(BlockError::DuplicateFullyImported(root)) => { let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock); info!( diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index e6135a81c7..9258dab1af 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -876,7 +876,6 @@ async fn queue_attestations_from_http() { // In parallel, apply the block. We need to manually notify the reprocess queue, because the // `beacon_chain` does not know about the queue and will not update it for us. - let parent_root = block.0.parent_root(); harness .process_block(attestation_slot, block_root, block) .await @@ -888,10 +887,7 @@ async fn queue_attestations_from_http() { .unwrap() .try_send(WorkEvent { drop_during_sync: false, - work: Work::Reprocess(ReprocessQueueMessage::BlockImported { - block_root, - parent_root, - }), + work: Work::Reprocess(ReprocessQueueMessage::BlockImported { block_root }), }) .unwrap(); diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index c043133cee..1a664662df 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -22,6 +22,13 @@ pub(crate) enum BlockSource { Rpc, } +/// The path through which a payload envelope was imported. +#[derive(Debug, Clone, Copy, AsRefStr)] +pub(crate) enum EnvelopeSource { + Gossip, + Rpc, +} + pub static BEACON_BLOCK_MESH_PEERS_PER_CLIENT: LazyLock> = LazyLock::new(|| { try_create_int_gauge_vec( diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index b52732000e..20342c1aa9 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1,5 +1,5 @@ use crate::{ - metrics::{self, register_process_result_metrics}, + metrics::{self, EnvelopeSource, register_process_result_metrics}, network_beacon_processor::{InvalidBlockStorage, NetworkBeaconProcessor}, service::NetworkMessage, sync::SyncMessage, @@ -70,6 +70,45 @@ use beacon_processor::{ /// messages. const STRICT_LATE_MESSAGE_PENALTIES: bool = false; +/// Tracks which kinds of attestation re-processing are still permitted for a gossip attestation +/// or aggregate. +/// +/// A new attestation may be re-queued for an unknown block, then (post-Gloas) for an unknown +/// payload envelope, and finally not at all. Each re-queue narrows the allowance to the next +/// variant. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ReprocessAllowance { + /// Re-queue for either an unknown block or an unknown payload envelope. + BlockAndPayload, + /// Re-queue only for an unknown payload envelope (already re-queued once for the block). + PayloadOnly, + /// Do not re-queue again. + None, +} + +impl ReprocessAllowance { + /// Whether the attestation may be re-queued for an unknown block. + fn allows_block(self) -> bool { + matches!(self, ReprocessAllowance::BlockAndPayload) + } + + /// Whether the attestation may be re-queued for an unknown payload envelope. + fn allows_payload(self) -> bool { + matches!( + self, + ReprocessAllowance::BlockAndPayload | ReprocessAllowance::PayloadOnly + ) + } + + /// Re-queuing always narrows the allowance so a message can't loop indefinitely. + fn next_requeue(self) -> Self { + match self { + ReprocessAllowance::BlockAndPayload => ReprocessAllowance::PayloadOnly, + ReprocessAllowance::PayloadOnly | ReprocessAllowance::None => ReprocessAllowance::None, + } + } +} + /// An attestation that has been validated by the `BeaconChain`. /// /// Since this struct implements `beacon_chain::VerifiedAttestation`, it would be a logic error to @@ -233,7 +272,7 @@ impl NetworkBeaconProcessor { attestation: Box, subnet_id: SubnetId, should_import: bool, - allow_reprocess: bool, + reprocess_allowance: ReprocessAllowance, seen_timestamp: Duration, ) { let result = match self @@ -256,7 +295,7 @@ impl NetworkBeaconProcessor { message_id, peer_id, subnet_id, - allow_reprocess, + reprocess_allowance, should_import, seen_timestamp, ); @@ -265,7 +304,7 @@ impl NetworkBeaconProcessor { pub fn process_gossip_attestation_batch( self: Arc, packages: GossipAttestationBatch, - allow_reprocess: bool, + reprocess_allowance: ReprocessAllowance, ) { let attestations_and_subnets = packages .iter() @@ -326,7 +365,7 @@ impl NetworkBeaconProcessor { package.message_id, package.peer_id, package.subnet_id, - allow_reprocess, + reprocess_allowance, package.should_import, package.seen_timestamp, ); @@ -342,7 +381,7 @@ impl NetworkBeaconProcessor { message_id: MessageId, peer_id: PeerId, subnet_id: SubnetId, - allow_reprocess: bool, + reprocess_allowance: ReprocessAllowance, should_import: bool, seen_timestamp: Duration, ) { @@ -426,7 +465,7 @@ impl NetworkBeaconProcessor { should_import, seen_timestamp, }, - allow_reprocess, + reprocess_allowance, error, seen_timestamp, ); @@ -446,7 +485,7 @@ impl NetworkBeaconProcessor { message_id: MessageId, peer_id: PeerId, aggregate: Box>, - allow_reprocess: bool, + reprocess_allowance: ReprocessAllowance, seen_timestamp: Duration, ) { let beacon_block_root = aggregate.message().aggregate().data().beacon_block_root; @@ -470,7 +509,7 @@ impl NetworkBeaconProcessor { beacon_block_root, message_id, peer_id, - allow_reprocess, + reprocess_allowance, seen_timestamp, ); } @@ -478,7 +517,7 @@ impl NetworkBeaconProcessor { pub fn process_gossip_aggregate_batch( self: Arc, packages: Vec>, - allow_reprocess: bool, + reprocess_allowance: ReprocessAllowance, ) { let aggregates = packages.iter().map(|package| package.aggregate.as_ref()); @@ -532,7 +571,7 @@ impl NetworkBeaconProcessor { package.beacon_block_root, package.message_id, package.peer_id, - allow_reprocess, + reprocess_allowance, package.seen_timestamp, ); } @@ -544,7 +583,7 @@ impl NetworkBeaconProcessor { beacon_block_root: Hash256, message_id: MessageId, peer_id: PeerId, - allow_reprocess: bool, + reprocess_allowance: ReprocessAllowance, seen_timestamp: Duration, ) { match result { @@ -624,7 +663,7 @@ impl NetworkBeaconProcessor { attestation: signed_aggregate, seen_timestamp, }, - allow_reprocess, + reprocess_allowance, error, seen_timestamp, ); @@ -918,12 +957,13 @@ impl NetworkBeaconProcessor { match result { Ok(availability) => match availability { - AvailabilityProcessingStatus::Imported(block_root) => { + AvailabilityProcessingStatus::Imported(slot, block_root) => { debug!( %block_root, "Gossipsub data column processed, imported fully available block" ); self.chain.recompute_head_at_current_slot().await; + self.notify_import_after_column(slot, block_root); metrics::set_gauge( &metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION, @@ -1311,12 +1351,13 @@ impl NetworkBeaconProcessor { match &result { Ok(availability) => match availability { - AvailabilityProcessingStatus::Imported(block_root) => { + AvailabilityProcessingStatus::Imported(slot, block_root) => { debug!( %block_root, "Data column from partial processed, imported fully available block" ); self.chain.recompute_head_at_current_slot().await; + self.notify_import_after_column(*slot, *block_root); metrics::set_gauge( &metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION, @@ -1784,24 +1825,8 @@ impl NetworkBeaconProcessor { register_process_result_metrics(&result, metrics::BlockSource::Gossip, "block"); match &result { - Ok(AvailabilityProcessingStatus::Imported(block_root)) => { - if self - .beacon_processor_send - .try_send(WorkEvent { - drop_during_sync: false, - work: Work::Reprocess(ReprocessQueueMessage::BlockImported { - block_root: *block_root, - parent_root: block.message().parent_root(), - }), - }) - .is_err() - { - error!( - source = "gossip", - ?block_root, - "Failed to inform block import" - ) - }; + Ok(AvailabilityProcessingStatus::Imported(_, block_root)) => { + self.notify_block_imported(*block_root); debug!( ?block_root, @@ -2458,7 +2483,7 @@ impl NetworkBeaconProcessor { peer_id: PeerId, message_id: MessageId, failed_att: FailedAtt, - allow_reprocess: bool, + reprocess_allowance: ReprocessAllowance, error: AttnError, seen_timestamp: Duration, ) { @@ -2717,7 +2742,7 @@ impl NetworkBeaconProcessor { block = ?beacon_block_root, "Attestation for unknown block" ); - if allow_reprocess { + if reprocess_allowance.allows_block() { // We don't know the block, get the sync manager to handle the block lookup, and // send the attestation to be scheduled for re-processing. self.send_sync_message(SyncMessage::UnknownBlockHashFromAttestation( @@ -2740,7 +2765,7 @@ impl NetworkBeaconProcessor { message_id, peer_id, attestation, - false, // Do not allow this attestation to be re-processed beyond this point. + reprocess_allowance.next_requeue(), seen_timestamp, ) }), @@ -2765,7 +2790,7 @@ impl NetworkBeaconProcessor { attestation, subnet_id, should_import, - false, // Do not allow this attestation to be re-processed beyond this point. + reprocess_allowance.next_requeue(), seen_timestamp, ) }), @@ -2797,6 +2822,89 @@ impl NetworkBeaconProcessor { return; } + AttnError::UnknownPayloadEnvelope { beacon_block_root } => { + trace!( + %peer_id, + block = ?beacon_block_root, + "Payload-present attestation for block with unseen payload envelope" + ); + if reprocess_allowance.allows_payload() { + // We haven't seen the block's payload envelope yet. Ask the sync manager to + // retrieve it, and schedule the attestation for re-processing once it arrives. + self.send_sync_message(SyncMessage::UnknownPayloadEnvelopeFromAttestation( + peer_id, + *beacon_block_root, + )); + let msg = match failed_att { + FailedAtt::Aggregate { + attestation, + seen_timestamp, + } => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_REQUEUED_TOTAL, + ); + let processor = self.clone(); + ReprocessQueueMessage::UnknownPayloadAggregate(QueuedAggregate { + beacon_block_root: *beacon_block_root, + process_fn: Box::new(move || { + processor.process_gossip_aggregate( + message_id, + peer_id, + attestation, + reprocess_allowance.next_requeue(), + seen_timestamp, + ) + }), + }) + } + FailedAtt::Unaggregate { + attestation, + subnet_id, + should_import, + seen_timestamp, + } => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_REQUEUED_TOTAL, + ); + let processor = self.clone(); + ReprocessQueueMessage::UnknownPayloadUnaggregate(QueuedUnaggregate { + beacon_block_root: *beacon_block_root, + process_fn: Box::new(move || { + processor.process_gossip_attestation( + message_id, + peer_id, + attestation, + subnet_id, + should_import, + reprocess_allowance.next_requeue(), + seen_timestamp, + ) + }), + }) + } + }; + + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(msg), + }) + .is_err() + { + error!("Failed to send attestation for re-processing") + } + } else { + // We shouldn't make any further attempts to process this attestation. + self.propagate_validation_result( + message_id, + peer_id, + MessageAcceptance::Ignore, + ); + } + + return; + } AttnError::UnknownTargetRoot(_) => { /* * The block indicated by the target root is not known to us. @@ -3796,10 +3904,13 @@ impl NetworkBeaconProcessor { // register_process_result_metrics(&result, metrics::BlockSource::Gossip, "envelope"); match &result { - Ok(AvailabilityProcessingStatus::Imported(_)) => { + Ok(AvailabilityProcessingStatus::Imported(_, block_root)) => { self.chain.recompute_head_at_current_slot().await; + // The payload envelope is imported (`is_payload_received` is now true); release any + // attestations awaiting this block's payload so they can be re-processed. + self.notify_payload_envelope_imported(*block_root, EnvelopeSource::Gossip); } - Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => {} + Ok(_) => {} Err(e) => { debug!( ?beacon_block_root, @@ -3811,6 +3922,64 @@ impl NetworkBeaconProcessor { } } + /// Inform the reprocess queue that a fully available block (or its payload envelope, post-gloas) + /// has been imported, so any attestations waiting on it can be released. + fn notify_import_after_column(&self, slot: Slot, block_root: Hash256) { + if self + .chain + .spec + .fork_name_at_slot::(slot) + .gloas_enabled() + { + self.notify_payload_envelope_imported(block_root, EnvelopeSource::Gossip); + } else { + self.notify_block_imported(block_root); + } + } + + /// Inform the reprocess queue that `block_root` has been imported as a full block. + fn notify_block_imported(&self, block_root: Hash256) { + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(ReprocessQueueMessage::BlockImported { block_root }), + }) + .is_err() + { + error!( + source = "gossip", + ?block_root, + "Failed to inform block import" + ) + }; + } + + /// Inform the reprocess queue that `block_root`'s payload envelope has been imported, releasing + /// any attestations awaiting the payload. `source` identifies the import path for logging. + pub(crate) fn notify_payload_envelope_imported( + &self, + block_root: Hash256, + source: EnvelopeSource, + ) { + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(ReprocessQueueMessage::PayloadEnvelopeImported { + block_root, + }), + }) + .is_err() + { + error!( + source = source.as_ref(), + ?block_root, + "Failed to inform payload envelope import" + ) + }; + } + #[instrument( name = "lh_process_execution_payload_bid", parent = None, @@ -4059,3 +4228,42 @@ impl NetworkBeaconProcessor { } } } + +#[cfg(test)] +mod tests { + use super::ReprocessAllowance::{BlockAndPayload, None, PayloadOnly}; + + #[test] + fn reprocess_allowance_gates() { + // A block re-queue is only permitted for a freshly received attestation. + assert!(BlockAndPayload.allows_block()); + assert!(!PayloadOnly.allows_block()); + assert!(!None.allows_block()); + + // A payload-envelope re-queue is permitted until we've already re-queued for it. + assert!(BlockAndPayload.allows_payload()); + assert!(PayloadOnly.allows_payload()); + assert!(!None.allows_payload()); + } + + #[test] + fn reprocess_allowance_progression() { + // Each re-queue narrows the allowance to the next variant in the progression. + assert_eq!(BlockAndPayload.next_requeue(), PayloadOnly); + assert_eq!(PayloadOnly.next_requeue(), None); + assert_eq!(None.next_requeue(), None); + } + + #[test] + fn reprocess_allowance_is_bounded() { + // Safety property: from any starting state, re-queuing twice reaches the terminal `None`, + // so an attestation can never loop indefinitely. + for start in [BlockAndPayload, PayloadOnly, None] { + assert_eq!( + start.next_requeue().next_requeue(), + None, + "re-queuing twice from {start:?} should be terminal" + ); + } + } +} diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index a9579caaeb..7619f706cc 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -39,6 +39,8 @@ use { pub use sync_methods::{BlockProcessingResult, ChainSegmentProcessId}; +use gossip_methods::ReprocessAllowance; + pub type Error = TrySendError>; mod gossip_methods; @@ -93,15 +95,17 @@ impl NetworkBeaconProcessor { package.attestation, package.subnet_id, package.should_import, - true, + ReprocessAllowance::BlockAndPayload, package.seen_timestamp, ) }; // Define a closure for processing batches of attestations. let processor = self.clone(); - let process_batch = - move |attestations| processor.process_gossip_attestation_batch(attestations, true); + let process_batch = move |attestations| { + processor + .process_gossip_attestation_batch(attestations, ReprocessAllowance::BlockAndPayload) + }; self.try_send(BeaconWorkEvent { drop_during_sync: true, @@ -135,15 +139,17 @@ impl NetworkBeaconProcessor { package.message_id, package.peer_id, package.aggregate, - true, + ReprocessAllowance::BlockAndPayload, package.seen_timestamp, ) }; // Define a closure for processing batches of attestations. let processor = self.clone(); - let process_batch = - move |aggregates| processor.process_gossip_aggregate_batch(aggregates, true); + let process_batch = move |aggregates| { + processor + .process_gossip_aggregate_batch(aggregates, ReprocessAllowance::BlockAndPayload) + }; let beacon_block_root = aggregate.message().aggregate().data().beacon_block_root; self.try_send(BeaconWorkEvent { @@ -932,7 +938,7 @@ impl NetworkBeaconProcessor { .await { Ok(Some(availability)) => match availability { - AvailabilityProcessingStatus::Imported(_) => { + AvailabilityProcessingStatus::Imported(..) => { debug!( result = "imported block and custody columns", %block_root, @@ -1020,7 +1026,7 @@ impl NetworkBeaconProcessor { Ok(Some((availability_processing_status, data_columns_to_publish))) => { self.publish_data_columns_gradually(data_columns_to_publish, block_root); match &availability_processing_status { - AvailabilityProcessingStatus::Imported(hash) => { + AvailabilityProcessingStatus::Imported(_, hash) => { debug!( result = "imported block and custody columns", block_hash = %hash, diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index caf718732b..35437e1a2e 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -1,4 +1,4 @@ -use crate::metrics::{self, register_process_result_metrics}; +use crate::metrics::{self, EnvelopeSource, register_process_result_metrics}; use crate::network_beacon_processor::{FUTURE_SLOT_TOLERANCE, NetworkBeaconProcessor}; use crate::sync::BatchProcessResult; use crate::sync::manager::CustodyBatchProcessResult; @@ -158,8 +158,6 @@ impl NetworkBeaconProcessor { return; }; - let slot = block.slot(); - let parent_root = block.message().parent_root(); let commitments_formatted = block.as_block().commitments_formatted(); debug!( @@ -186,17 +184,14 @@ impl NetworkBeaconProcessor { // RPC block imported, regardless of process type match result.as_ref() { - Ok(AvailabilityProcessingStatus::Imported(hash)) => { + Ok(AvailabilityProcessingStatus::Imported(slot, hash)) => { info!( %slot, %hash, "New RPC block received", ); // Trigger processing for work referencing this block. - let reprocess_msg = ReprocessQueueMessage::BlockImported { - block_root: *hash, - parent_root, - }; + let reprocess_msg = ReprocessQueueMessage::BlockImported { block_root: *hash }; if self .beacon_processor_send .try_send(WorkEvent { @@ -213,7 +208,7 @@ impl NetworkBeaconProcessor { }; self.chain.block_times_cache.write().set_time_observed( *hash, - slot, + *slot, seen_timestamp, None, None, @@ -294,7 +289,7 @@ impl NetworkBeaconProcessor { match &result { Ok(availability) => match availability { - AvailabilityProcessingStatus::Imported(hash) => { + AvailabilityProcessingStatus::Imported(_, hash) => { debug!( result = "imported block and custody columns", block_hash = %hash, @@ -376,8 +371,11 @@ impl NetworkBeaconProcessor { let result: Result = result.map_err(|e| BlockError::InternalError(format!("envelope: {e}"))); - if matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))) { + // The payload envelope is imported; release any attestations awaiting this block's payload + // so they can be re-processed (parity with the gossip import path). + if let Ok(AvailabilityProcessingStatus::Imported(_, block_root)) = &result { self.chain.recompute_head_at_current_slot().await; + self.notify_payload_envelope_imported(*block_root, EnvelopeSource::Rpc); } self.send_sync_message(SyncMessage::BlockComponentProcessed { @@ -1022,7 +1020,7 @@ impl From> for BlockProcessingR )) } match result { - Ok(AvailabilityProcessingStatus::Imported(_)) => Self::Imported(true, "imported"), + Ok(AvailabilityProcessingStatus::Imported(..)) => Self::Imported(true, "imported"), Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => { Self::Imported(false, "missing_components") } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 8e7b8cd05a..3282f7f083 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -156,6 +156,11 @@ pub enum SyncMessage { /// manager to attempt to find the block matching the unknown hash. UnknownBlockHashFromAttestation(PeerId, Hash256), + /// A peer has sent a payload-present attestation (`index == 1`) for a block whose execution + /// payload envelope we have not seen. This triggers the manager to fetch the payload envelope + /// for `block_root` via `ExecutionPayloadEnvelopesByRoot`. + UnknownPayloadEnvelopeFromAttestation(PeerId, Hash256), + /// A peer has disconnected. Disconnect(PeerId), @@ -260,6 +265,10 @@ pub struct SyncManager { /// may forward us thousands of a attestations, each one triggering an individual event. Only /// one event is useful, the rest generating log noise and wasted cycles notified_unknown_roots: LRUTimeCache<(PeerId, Hash256)>, + /// Debounce duplicated `UnknownPayloadEnvelopeFromAttestation` for the same root/peer tuple, + /// for the same reason as `notified_unknown_roots`: a peer may forward many payload-present + /// attestations for a block whose execution payload envelope we have not yet seen. + notified_unknown_payload_roots: LRUTimeCache<(PeerId, Hash256)>, } /// Spawns a new `SyncManager` thread which has a weak reference to underlying beacon @@ -320,6 +329,9 @@ impl SyncManager { notified_unknown_roots: LRUTimeCache::new(Duration::from_secs( NOTIFIED_UNKNOWN_ROOT_EXPIRY_SECONDS, )), + notified_unknown_payload_roots: LRUTimeCache::new(Duration::from_secs( + NOTIFIED_UNKNOWN_ROOT_EXPIRY_SECONDS, + )), } } @@ -895,6 +907,22 @@ impl SyncManager { self.handle_unknown_block_root(peer_id, block_root); } } + SyncMessage::UnknownPayloadEnvelopeFromAttestation(peer_id, block_root) => { + if !self + .notified_unknown_payload_roots + .contains(&(peer_id, block_root)) + { + self.notified_unknown_payload_roots + .insert((peer_id, block_root)); + // TODO(gloas): trigger a payload-envelope lookup for `block_root` via + // `ExecutionPayloadEnvelopesByRoot`. Wired up in the gloas lookup-sync PR (#9155). + debug!( + ?block_root, + ?peer_id, + "Received unknown payload envelope from attestation" + ); + } + } SyncMessage::Disconnect(peer_id) => { debug!(%peer_id, "Received disconnected message"); self.peer_disconnect(&peer_id); diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 13eeaee9aa..621824c7d2 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -1989,7 +1989,7 @@ impl TestRig { block: Arc>, ) { match self.import_block_to_da_checker(block).await { - AvailabilityProcessingStatus::Imported(_) => { + AvailabilityProcessingStatus::Imported(..) => { panic!("block removed from da_checker, available") } AvailabilityProcessingStatus::MissingComponents(_, block_root) => { diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index 90f2bb9a67..e648d5669b 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -1064,6 +1064,8 @@ where execution_payload_parent_hash, execution_payload_block_hash, proposer_index: Some(block.proposer_index()), + // Set on payload-envelope import, not block import. + payload_received: false, }, current_slot, spec, diff --git a/consensus/proto_array/benches/find_head.rs b/consensus/proto_array/benches/find_head.rs index 98077a7f97..07edc4d46f 100644 --- a/consensus/proto_array/benches/find_head.rs +++ b/consensus/proto_array/benches/find_head.rs @@ -68,6 +68,7 @@ fn build_chain(num_blocks: u64, gloas: bool) -> (ProtoArrayForkChoice, types::Ch }, execution_payload_block_hash: if is_gloas { Some(get_hash(i)) } else { None }, proposer_index: Some(0), + payload_received: false, }; fork_choice diff --git a/consensus/proto_array/src/fork_choice_test_definition.rs b/consensus/proto_array/src/fork_choice_test_definition.rs index 7ffa763308..d9acda1258 100644 --- a/consensus/proto_array/src/fork_choice_test_definition.rs +++ b/consensus/proto_array/src/fork_choice_test_definition.rs @@ -330,6 +330,7 @@ impl ForkChoiceTestDefinition { execution_payload_parent_hash, execution_payload_block_hash, proposer_index: Some(0), + payload_received: false, }; fork_choice .process_block::(block, slot, &spec, Duration::ZERO) diff --git a/consensus/proto_array/src/proto_array_fork_choice.rs b/consensus/proto_array/src/proto_array_fork_choice.rs index 69202486a7..90143f1dd1 100644 --- a/consensus/proto_array/src/proto_array_fork_choice.rs +++ b/consensus/proto_array/src/proto_array_fork_choice.rs @@ -242,6 +242,8 @@ pub struct Block { pub execution_payload_parent_hash: Option, pub execution_payload_block_hash: Option, pub proposer_index: Option, + /// Whether the block's execution payload envelope has been received. Always `false` pre-Gloas. + pub payload_received: bool, } impl Block { @@ -502,6 +504,7 @@ impl ProtoArrayForkChoice { execution_payload_parent_hash, execution_payload_block_hash, proposer_index: Some(proposer_index), + payload_received: false, }; proto_array @@ -959,6 +962,7 @@ impl ProtoArrayForkChoice { execution_payload_parent_hash: block.execution_payload_parent_hash().ok(), execution_payload_block_hash: block.execution_payload_block_hash().ok(), proposer_index: block.proposer_index().ok(), + payload_received: block.payload_received().unwrap_or(false), }) } @@ -1383,6 +1387,7 @@ mod test_compute_deltas { execution_payload_parent_hash: None, execution_payload_block_hash: None, proposer_index: Some(0), + payload_received: false, }, genesis_slot + 1, &spec, @@ -1411,6 +1416,7 @@ mod test_compute_deltas { execution_payload_parent_hash: None, execution_payload_block_hash: None, proposer_index: Some(0), + payload_received: false, }, genesis_slot + 1, &spec, @@ -1547,6 +1553,7 @@ mod test_compute_deltas { execution_payload_parent_hash: None, execution_payload_block_hash: None, proposer_index: Some(0), + payload_received: false, }, Slot::from(block.slot), &spec,