From 4bbc74cf59a155590c724b1bbcde19ff56369d73 Mon Sep 17 00:00:00 2001 From: hopinheimer Date: Fri, 17 Apr 2026 19:43:49 -0400 Subject: [PATCH] wiring up `process_gossip_payload_attestation` and implement observe cache --- beacon_node/beacon_chain/src/beacon_chain.rs | 37 ++++- beacon_node/beacon_chain/src/builder.rs | 1 + beacon_node/beacon_chain/src/lib.rs | 1 + .../beacon_chain/src/observed_attesters.rs | 42 ++++- .../gossip_methods.rs | 151 ++++++++++++++++-- consensus/types/src/core/consts.rs | 1 + 6 files changed, 219 insertions(+), 14 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index acf7ad9c4c..6718b33958 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -48,12 +48,16 @@ use crate::observed_aggregates::{ Error as AttestationObservationError, ObservedAggregateAttestations, ObservedSyncContributions, }; use crate::observed_attesters::{ - ObservedAggregators, ObservedAttesters, ObservedSyncAggregators, ObservedSyncContributors, + ObservedAggregators, ObservedAttesters, ObservedPayloadAttesters, ObservedSyncAggregators, + ObservedSyncContributors, }; use crate::observed_block_producers::ObservedBlockProducers; use crate::observed_data_sidecars::ObservedDataSidecars; use crate::observed_operations::{ObservationOutcome, ObservedOperations}; use crate::observed_slashable::ObservedSlashable; +use crate::payload_attestation_verification::{ + Error as PayloadAttestationError, VerifiedPayloadAttestationMessage, +}; use crate::payload_bid_verification::payload_bid_cache::GossipVerifiedPayloadBidCache; #[cfg(not(test))] use crate::payload_envelope_streamer::{EnvelopeRequestSource, launch_payload_envelope_stream}; @@ -412,6 +416,9 @@ pub struct BeaconChain { /// Maintains a record of which validators have been seen to create `SignedContributionAndProofs` /// in recent epochs. pub(crate) observed_sync_aggregators: RwLock>, + /// Maintains a record of which validators have sent payload attestation messages + /// in recent slots. + pub(crate) observed_payload_attesters: RwLock>, /// Maintains a record of which validators have proposed blocks for each slot. pub observed_block_producers: RwLock>, /// Maintains a record of blob sidecars seen over the gossip network. @@ -2194,6 +2201,34 @@ impl BeaconChain { }) } + pub fn verify_payload_attestation_message_for_gossip( + &self, + payload_attestation_message: PayloadAttestationMessage, + ) -> Result, PayloadAttestationError> { + metrics::inc_counter(&metrics::PAYLOAD_ATTESTATION_PROCESSING_REQUESTS); + let _timer = metrics::start_timer(&metrics::PAYLOAD_ATTESTATION_GOSSIP_VERIFICATION_TIMES); + + VerifiedPayloadAttestationMessage::verify(payload_attestation_message, self).inspect(|_| { + metrics::inc_counter(&metrics::PAYLOAD_ATTESTATION_PROCESSING_SUCCESSES); + }) + } + + pub fn apply_payload_attestation_to_fork_choice( + &self, + indexed_payload_attestation: &IndexedPayloadAttestation, + ptc: &PTC, + ) -> Result<(), Error> { + self.canonical_head + .fork_choice_write_lock() + .on_payload_attestation( + self.slot()?, + indexed_payload_attestation, + AttestationFromBlock::False, + &ptc.0, + ) + .map_err(Into::into) + } + /// Accepts some `SyncCommitteeMessage` from the network and attempts to verify it, returning `Ok(_)` if /// it is valid to be (re)broadcast on the gossip network. pub fn verify_sync_committee_message_for_gossip( diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index b963f7c342..ed726b5d10 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1010,6 +1010,7 @@ where observed_aggregators: <_>::default(), // TODO: allow for persisting and loading the pool from disk. observed_sync_aggregators: <_>::default(), + observed_payload_attesters: <_>::default(), // TODO: allow for persisting and loading the pool from disk. observed_block_producers: <_>::default(), observed_column_sidecars: RwLock::new(ObservedDataSidecars::new(self.spec.clone())), diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index a8a706d8bc..0cd6c1b8d3 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -43,6 +43,7 @@ pub mod observed_block_producers; pub mod observed_data_sidecars; pub mod observed_operations; mod observed_slashable; +pub mod payload_attestation_verification; pub mod payload_bid_verification; pub mod payload_envelope_streamer; pub mod payload_envelope_verification; diff --git a/beacon_node/beacon_chain/src/observed_attesters.rs b/beacon_node/beacon_chain/src/observed_attesters.rs index 277bf38ffc..79bc181843 100644 --- a/beacon_node/beacon_chain/src/observed_attesters.rs +++ b/beacon_node/beacon_chain/src/observed_attesters.rs @@ -14,7 +14,7 @@ //! - `ObservedSyncAggregators`: allows filtering sync committee contributions from the same aggregators in //! the same slot and in the same subcommittee. -use crate::types::consts::altair::TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE; +use crate::types::consts::{altair::TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE, gloas::PTC_SIZE}; use bitvec::vec::BitVec; use std::collections::{HashMap, HashSet}; use std::hash::Hash; @@ -42,6 +42,8 @@ pub type ObservedSyncContributors = pub type ObservedAggregators = AutoPruningEpochContainer; pub type ObservedSyncAggregators = AutoPruningSlotContainer; +pub type ObservedPayloadAttesters = + AutoPruningSlotContainer; #[derive(Debug, PartialEq)] pub enum Error { @@ -255,6 +257,44 @@ impl Item<()> for SyncAggregatorSlotHashSet { } } +/// Stores a `HashSet` of validator indices that have sent a payload attestation gossip +/// message during a slot. +pub struct PayloadAttesterSlotHashSet { + set: HashSet, +} + +impl Item<()> for PayloadAttesterSlotHashSet { + fn with_capacity(capacity: usize) -> Self { + Self { + set: HashSet::with_capacity(capacity), + } + } + + /// Defaults to `PTC_SIZE`, the maximum number of payload attesters per slot. + fn default_capacity() -> usize { + PTC_SIZE as usize + } + + fn len(&self) -> usize { + self.set.len() + } + + fn validator_count(&self) -> usize { + self.set.len() + } + + /// Inserts the `validator_index` in the set. Returns `true` if the `validator_index` was + /// already in the set. + fn insert(&mut self, validator_index: usize, _value: ()) -> bool { + !self.set.insert(validator_index) + } + + /// Returns `true` if the `validator_index` is in the set. + fn get(&self, validator_index: usize) -> Option<()> { + self.set.contains(&validator_index).then_some(()) + } +} + /// A container that stores some number of `T` items. /// /// This container is "auto-pruning" since it gets an idea of the current slot by which diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 2238cb2f17..9387a4df3f 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -13,6 +13,9 @@ use beacon_chain::{ light_client_finality_update_verification::Error as LightClientFinalityUpdateError, light_client_optimistic_update_verification::Error as LightClientOptimisticUpdateError, observed_operations::ObservationOutcome, + payload_attestation_verification::{ + Error as PayloadAttestationError, VerifiedPayloadAttestationMessage, + }, sync_committee_verification::{self, Error as SyncCommitteeError}, validator_monitor::{get_block_delay_ms, get_slot_delay_ms}, }; @@ -130,6 +133,11 @@ struct RejectedAggregate { error: AttnError, } +struct RejectedPayloadAttestation { + error: PayloadAttestationError, + message_slot: Slot, +} + /// Data for an aggregated or unaggregated attestation that failed verification. enum FailedAtt { Unaggregate { @@ -3648,25 +3656,144 @@ impl NetworkBeaconProcessor { } } - // TODO(gloas) dont forget to add tracing instrumentation + #[instrument( + level = "trace", + skip(self, message_id, peer_id, payload_attestation_message), + fields( + peer_id = %peer_id, + slot = %payload_attestation_message.data.slot, + validator_index = payload_attestation_message.validator_index, + ) + )] pub fn process_gossip_payload_attestation( self: &Arc, message_id: MessageId, peer_id: PeerId, payload_attestation_message: PayloadAttestationMessage, ) { - // TODO(EIP-7732): Implement proper payload attestation message gossip processing. - // This should integrate with a payload_attestation_verification.rs module once it's implemented. + let message_slot = payload_attestation_message.data.slot; - trace!( - %peer_id, - validator_index = payload_attestation_message.validator_index, - slot = %payload_attestation_message.data.slot, - beacon_block_root = %payload_attestation_message.data.beacon_block_root, - "Processing payload attestation message" - ); + let result = self + .chain + .verify_payload_attestation_message_for_gossip(payload_attestation_message) + .map_err(|error| RejectedPayloadAttestation { + error, + message_slot, + }); - // For now, ignore all payload attestation messages since verification is not implemented - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + self.process_gossip_payload_attestation_result(result, message_id, peer_id); + } + + fn process_gossip_payload_attestation_result( + self: &Arc, + result: Result, RejectedPayloadAttestation>, + message_id: MessageId, + peer_id: PeerId, + ) { + match result { + Ok(verified) => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + + if let Err(e) = self.chain.apply_payload_attestation_to_fork_choice( + verified.indexed_payload_attestation(), + verified.ptc(), + ) { + match e { + BeaconChainError::ForkChoiceError( + ForkChoiceError::InvalidPayloadAttestation(e), + ) => { + debug!( + reason = ?e, + %peer_id, + "Payload attestation invalid for fork choice" + ) + } + e => error!( + reason = ?e, + %peer_id, + "Error applying payload attestation to fork choice" + ), + } + } + } + Err(RejectedPayloadAttestation { + error, + message_slot, + }) => { + self.handle_payload_attestation_verification_failure( + peer_id, + message_id, + error, + message_slot, + ); + } + } + } + + fn handle_payload_attestation_verification_failure( + &self, + peer_id: PeerId, + message_id: MessageId, + error: PayloadAttestationError, + message_slot: Slot, + ) { + match &error { + PayloadAttestationError::FutureSlot { .. } => { + self.gossip_penalize_peer( + peer_id, + PeerAction::HighToleranceError, + "payload_attn_future_slot", + ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + PayloadAttestationError::PastSlot { .. } => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + PayloadAttestationError::PriorPayloadAttestationMessageKnown { .. } => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + PayloadAttestationError::UnknownHeadBlock { .. } => { + debug!( + %peer_id, + %message_slot, + "Payload attestation references unknown block" + ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + PayloadAttestationError::NotInPTC { .. } => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer( + peer_id, + PeerAction::LowToleranceError, + "payload_attn_not_in_ptc", + ); + } + PayloadAttestationError::UnknownValidatorIndex(_) => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer( + peer_id, + PeerAction::LowToleranceError, + "payload_attn_unknown_validator", + ); + } + PayloadAttestationError::InvalidSignature => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer( + peer_id, + PeerAction::LowToleranceError, + "payload_attn_invalid_sig", + ); + } + PayloadAttestationError::BeaconChainError(_) + | PayloadAttestationError::BeaconStateError(_) => { + debug!( + %peer_id, + %message_slot, + ?error, + "Internal error verifying payload attestation" + ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + } } } diff --git a/consensus/types/src/core/consts.rs b/consensus/types/src/core/consts.rs index 049094da76..211208fc80 100644 --- a/consensus/types/src/core/consts.rs +++ b/consensus/types/src/core/consts.rs @@ -38,4 +38,5 @@ pub mod gloas { pub const ATTESTATION_TIMELINESS_INDEX: usize = 0; pub const PTC_TIMELINESS_INDEX: usize = 1; pub const NUM_BLOCK_TIMELINESS_DEADLINES: usize = 2; + pub const PTC_SIZE: u64 = 512; }