wiring up process_gossip_payload_attestation and implement observe cache

This commit is contained in:
hopinheimer
2026-04-17 19:43:49 -04:00
parent 036d9c995d
commit 4bbc74cf59
6 changed files with 219 additions and 14 deletions

View File

@@ -48,12 +48,16 @@ use crate::observed_aggregates::{
Error as AttestationObservationError, ObservedAggregateAttestations, ObservedSyncContributions,
};
use crate::observed_attesters::{
ObservedAggregators, ObservedAttesters, ObservedSyncAggregators, ObservedSyncContributors,
ObservedAggregators, ObservedAttesters, ObservedPayloadAttesters, ObservedSyncAggregators,
ObservedSyncContributors,
};
use crate::observed_block_producers::ObservedBlockProducers;
use crate::observed_data_sidecars::ObservedDataSidecars;
use crate::observed_operations::{ObservationOutcome, ObservedOperations};
use crate::observed_slashable::ObservedSlashable;
use crate::payload_attestation_verification::{
Error as PayloadAttestationError, VerifiedPayloadAttestationMessage,
};
use crate::payload_bid_verification::payload_bid_cache::GossipVerifiedPayloadBidCache;
#[cfg(not(test))]
use crate::payload_envelope_streamer::{EnvelopeRequestSource, launch_payload_envelope_stream};
@@ -412,6 +416,9 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// Maintains a record of which validators have been seen to create `SignedContributionAndProofs`
/// in recent epochs.
pub(crate) observed_sync_aggregators: RwLock<ObservedSyncAggregators<T::EthSpec>>,
/// Maintains a record of which validators have sent payload attestation messages
/// in recent slots.
pub(crate) observed_payload_attesters: RwLock<ObservedPayloadAttesters<T::EthSpec>>,
/// Maintains a record of which validators have proposed blocks for each slot.
pub observed_block_producers: RwLock<ObservedBlockProducers<T::EthSpec>>,
/// Maintains a record of blob sidecars seen over the gossip network.
@@ -2194,6 +2201,34 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})
}
pub fn verify_payload_attestation_message_for_gossip(
&self,
payload_attestation_message: PayloadAttestationMessage,
) -> Result<VerifiedPayloadAttestationMessage<T::EthSpec>, PayloadAttestationError> {
metrics::inc_counter(&metrics::PAYLOAD_ATTESTATION_PROCESSING_REQUESTS);
let _timer = metrics::start_timer(&metrics::PAYLOAD_ATTESTATION_GOSSIP_VERIFICATION_TIMES);
VerifiedPayloadAttestationMessage::verify(payload_attestation_message, self).inspect(|_| {
metrics::inc_counter(&metrics::PAYLOAD_ATTESTATION_PROCESSING_SUCCESSES);
})
}
pub fn apply_payload_attestation_to_fork_choice(
&self,
indexed_payload_attestation: &IndexedPayloadAttestation<T::EthSpec>,
ptc: &PTC<T::EthSpec>,
) -> Result<(), Error> {
self.canonical_head
.fork_choice_write_lock()
.on_payload_attestation(
self.slot()?,
indexed_payload_attestation,
AttestationFromBlock::False,
&ptc.0,
)
.map_err(Into::into)
}
/// Accepts some `SyncCommitteeMessage` from the network and attempts to verify it, returning `Ok(_)` if
/// it is valid to be (re)broadcast on the gossip network.
pub fn verify_sync_committee_message_for_gossip(

View File

@@ -1010,6 +1010,7 @@ where
observed_aggregators: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_sync_aggregators: <_>::default(),
observed_payload_attesters: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_block_producers: <_>::default(),
observed_column_sidecars: RwLock::new(ObservedDataSidecars::new(self.spec.clone())),

View File

@@ -43,6 +43,7 @@ pub mod observed_block_producers;
pub mod observed_data_sidecars;
pub mod observed_operations;
mod observed_slashable;
pub mod payload_attestation_verification;
pub mod payload_bid_verification;
pub mod payload_envelope_streamer;
pub mod payload_envelope_verification;

View File

@@ -14,7 +14,7 @@
//! - `ObservedSyncAggregators`: allows filtering sync committee contributions from the same aggregators in
//! the same slot and in the same subcommittee.
use crate::types::consts::altair::TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE;
use crate::types::consts::{altair::TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE, gloas::PTC_SIZE};
use bitvec::vec::BitVec;
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
@@ -42,6 +42,8 @@ pub type ObservedSyncContributors<E> =
pub type ObservedAggregators<E> = AutoPruningEpochContainer<EpochHashSet, E>;
pub type ObservedSyncAggregators<E> =
AutoPruningSlotContainer<SlotSubcommitteeIndex, (), SyncAggregatorSlotHashSet, E>;
pub type ObservedPayloadAttesters<E> =
AutoPruningSlotContainer<Slot, (), PayloadAttesterSlotHashSet, E>;
#[derive(Debug, PartialEq)]
pub enum Error {
@@ -255,6 +257,44 @@ impl Item<()> for SyncAggregatorSlotHashSet {
}
}
/// Stores a `HashSet` of validator indices that have sent a payload attestation gossip
/// message during a slot.
pub struct PayloadAttesterSlotHashSet {
set: HashSet<usize>,
}
impl Item<()> for PayloadAttesterSlotHashSet {
fn with_capacity(capacity: usize) -> Self {
Self {
set: HashSet::with_capacity(capacity),
}
}
/// Defaults to `PTC_SIZE`, the maximum number of payload attesters per slot.
fn default_capacity() -> usize {
PTC_SIZE as usize
}
fn len(&self) -> usize {
self.set.len()
}
fn validator_count(&self) -> usize {
self.set.len()
}
/// Inserts the `validator_index` in the set. Returns `true` if the `validator_index` was
/// already in the set.
fn insert(&mut self, validator_index: usize, _value: ()) -> bool {
!self.set.insert(validator_index)
}
/// Returns `true` if the `validator_index` is in the set.
fn get(&self, validator_index: usize) -> Option<()> {
self.set.contains(&validator_index).then_some(())
}
}
/// A container that stores some number of `T` items.
///
/// This container is "auto-pruning" since it gets an idea of the current slot by which

View File

@@ -13,6 +13,9 @@ use beacon_chain::{
light_client_finality_update_verification::Error as LightClientFinalityUpdateError,
light_client_optimistic_update_verification::Error as LightClientOptimisticUpdateError,
observed_operations::ObservationOutcome,
payload_attestation_verification::{
Error as PayloadAttestationError, VerifiedPayloadAttestationMessage,
},
sync_committee_verification::{self, Error as SyncCommitteeError},
validator_monitor::{get_block_delay_ms, get_slot_delay_ms},
};
@@ -130,6 +133,11 @@ struct RejectedAggregate<E: EthSpec> {
error: AttnError,
}
struct RejectedPayloadAttestation {
error: PayloadAttestationError,
message_slot: Slot,
}
/// Data for an aggregated or unaggregated attestation that failed verification.
enum FailedAtt<E: EthSpec> {
Unaggregate {
@@ -3648,25 +3656,144 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
// TODO(gloas) dont forget to add tracing instrumentation
#[instrument(
level = "trace",
skip(self, message_id, peer_id, payload_attestation_message),
fields(
peer_id = %peer_id,
slot = %payload_attestation_message.data.slot,
validator_index = payload_attestation_message.validator_index,
)
)]
pub fn process_gossip_payload_attestation(
self: &Arc<Self>,
message_id: MessageId,
peer_id: PeerId,
payload_attestation_message: PayloadAttestationMessage,
) {
// TODO(EIP-7732): Implement proper payload attestation message gossip processing.
// This should integrate with a payload_attestation_verification.rs module once it's implemented.
let message_slot = payload_attestation_message.data.slot;
trace!(
%peer_id,
validator_index = payload_attestation_message.validator_index,
slot = %payload_attestation_message.data.slot,
beacon_block_root = %payload_attestation_message.data.beacon_block_root,
"Processing payload attestation message"
);
let result = self
.chain
.verify_payload_attestation_message_for_gossip(payload_attestation_message)
.map_err(|error| RejectedPayloadAttestation {
error,
message_slot,
});
// For now, ignore all payload attestation messages since verification is not implemented
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
self.process_gossip_payload_attestation_result(result, message_id, peer_id);
}
fn process_gossip_payload_attestation_result(
self: &Arc<Self>,
result: Result<VerifiedPayloadAttestationMessage<T::EthSpec>, RejectedPayloadAttestation>,
message_id: MessageId,
peer_id: PeerId,
) {
match result {
Ok(verified) => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
if let Err(e) = self.chain.apply_payload_attestation_to_fork_choice(
verified.indexed_payload_attestation(),
verified.ptc(),
) {
match e {
BeaconChainError::ForkChoiceError(
ForkChoiceError::InvalidPayloadAttestation(e),
) => {
debug!(
reason = ?e,
%peer_id,
"Payload attestation invalid for fork choice"
)
}
e => error!(
reason = ?e,
%peer_id,
"Error applying payload attestation to fork choice"
),
}
}
}
Err(RejectedPayloadAttestation {
error,
message_slot,
}) => {
self.handle_payload_attestation_verification_failure(
peer_id,
message_id,
error,
message_slot,
);
}
}
}
fn handle_payload_attestation_verification_failure(
&self,
peer_id: PeerId,
message_id: MessageId,
error: PayloadAttestationError,
message_slot: Slot,
) {
match &error {
PayloadAttestationError::FutureSlot { .. } => {
self.gossip_penalize_peer(
peer_id,
PeerAction::HighToleranceError,
"payload_attn_future_slot",
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
}
PayloadAttestationError::PastSlot { .. } => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
}
PayloadAttestationError::PriorPayloadAttestationMessageKnown { .. } => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
}
PayloadAttestationError::UnknownHeadBlock { .. } => {
debug!(
%peer_id,
%message_slot,
"Payload attestation references unknown block"
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
}
PayloadAttestationError::NotInPTC { .. } => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
self.gossip_penalize_peer(
peer_id,
PeerAction::LowToleranceError,
"payload_attn_not_in_ptc",
);
}
PayloadAttestationError::UnknownValidatorIndex(_) => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
self.gossip_penalize_peer(
peer_id,
PeerAction::LowToleranceError,
"payload_attn_unknown_validator",
);
}
PayloadAttestationError::InvalidSignature => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
self.gossip_penalize_peer(
peer_id,
PeerAction::LowToleranceError,
"payload_attn_invalid_sig",
);
}
PayloadAttestationError::BeaconChainError(_)
| PayloadAttestationError::BeaconStateError(_) => {
debug!(
%peer_id,
%message_slot,
?error,
"Internal error verifying payload attestation"
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
}
}
}
}

View File

@@ -38,4 +38,5 @@ pub mod gloas {
pub const ATTESTATION_TIMELINESS_INDEX: usize = 0;
pub const PTC_TIMELINESS_INDEX: usize = 1;
pub const NUM_BLOCK_TIMELINESS_DEADLINES: usize = 2;
pub const PTC_SIZE: u64 = 512;
}