diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index cf5afb089a..9da64888c2 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -61,6 +61,7 @@ use crate::observed_data_sidecars::ObservedDataSidecars; use crate::observed_operations::{ObservationOutcome, ObservedOperations}; use crate::observed_slashable::ObservedSlashable; use crate::partial_data_column_assembler::PartialMergeResult; +use crate::payload_attestation_verification::VerifiedPayloadAttestationMessage; use crate::payload_bid_verification::payload_bid_cache::GossipVerifiedPayloadBidCache; #[cfg(not(test))] use crate::payload_envelope_streamer::{EnvelopeRequestSource, launch_payload_envelope_stream}; @@ -2328,6 +2329,17 @@ impl BeaconChain { .map_err(Into::into) } + /// Add a verified payload attestation message to the operation pool for block inclusion. + pub fn add_payload_attestation_to_pool( + &self, + verified: &VerifiedPayloadAttestationMessage, + ) -> Result<(), Error> { + self.op_pool + .insert_payload_attestation_message(verified.payload_attestation_message().clone()) + .map_err(Error::OpPoolError)?; + Ok(()) + } + /// Accepts some `SyncCommitteeMessage` from the network and attempts to verify it, returning `Ok(_)` if /// it is valid to be (re)broadcast on the gossip network. pub fn verify_sync_committee_message_for_gossip( diff --git a/beacon_node/beacon_chain/src/block_production/gloas.rs b/beacon_node/beacon_chain/src/block_production/gloas.rs index 9b3fc2806e..4bc4b9862c 100644 --- a/beacon_node/beacon_chain/src/block_production/gloas.rs +++ b/beacon_node/beacon_chain/src/block_production/gloas.rs @@ -9,9 +9,10 @@ use execution_layer::{ use fork_choice::PayloadStatus; use operation_pool::CompactAttestationRef; use ssz::Encode; -use state_processing::common::get_attesting_indices_from_state; +use state_processing::common::{get_attesting_indices_from_state, get_indexed_payload_attestation}; use state_processing::envelope_processing::verify_execution_payload_envelope; use state_processing::epoch_cache::initialize_epoch_cache; +use state_processing::per_block_processing::is_valid_indexed_payload_attestation; use state_processing::per_block_processing::{ apply_parent_execution_payload, compute_timestamp_at_slot, get_expected_withdrawals, verify_attestation_for_block_inclusion, @@ -319,6 +320,11 @@ impl BeaconChain { .map_err(BlockProductionError::OpPoolError)? }; + let mut payload_attestations = self + .op_pool + .get_payload_attestations(&state, parent_root, &self.spec) + .map_err(BlockProductionError::OpPoolError)?; + // If paranoid mode is enabled re-check the signatures of every included message. // This will be a lot slower but guards against bugs in block production and can be // quickly rolled out without a release. @@ -343,6 +349,35 @@ impl BeaconChain { .is_ok() }); + payload_attestations.retain(|att| { + match get_indexed_payload_attestation(&state, att, &self.spec) { + Ok(indexed) => is_valid_indexed_payload_attestation( + &state, + &indexed, + VerifySignatures::True, + &self.spec, + ) + .map_err(|e| { + warn!( + err = ?e, + block_slot = %state.slot(), + ?att, + "Attempted to include a payload attestation with invalid signature" + ); + }) + .is_ok(), + Err(e) => { + warn!( + err = ?e, + block_slot = %state.slot(), + ?att, + "Failed to index payload attestation for verification" + ); + false + } + } + }); + proposer_slashings.retain(|slashing| { slashing .clone() @@ -386,8 +421,6 @@ impl BeaconChain { }) .is_ok() }); - - // TODO(gloas) verify payload attestation signature here as well } let attester_slashings = attester_slashings @@ -434,8 +467,7 @@ impl BeaconChain { deposits, voluntary_exits, sync_aggregate, - // TODO(gloas) need to implement payload attestations - payload_attestations: vec![], + payload_attestations, bls_to_execution_changes, }, state, diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 4083b1a3af..29306c198d 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -140,11 +140,6 @@ struct RejectedAggregate { error: AttnError, } -struct RejectedPayloadAttestation { - payload_attestation_message: Box, - error: PayloadAttestationError, -} - /// Data for an aggregated or unaggregated attestation that failed verification. enum FailedAtt { Unaggregate { @@ -4111,25 +4106,20 @@ impl NetworkBeaconProcessor { peer_id: PeerId, payload_attestation_message: Box, ) { - let result = match self + let message_slot = payload_attestation_message.data.slot; + let result = self .chain - .verify_payload_attestation_message_for_gossip(*payload_attestation_message.clone()) - { - Ok(verified) => Ok(verified), - Err(error) => Err(RejectedPayloadAttestation { - payload_attestation_message: payload_attestation_message.clone(), - error, - }), - }; + .verify_payload_attestation_message_for_gossip(*payload_attestation_message); - self.process_gossip_payload_attestation_result(result, message_id, peer_id); + self.process_gossip_payload_attestation_result(result, message_id, peer_id, message_slot); } fn process_gossip_payload_attestation_result( self: &Arc, - result: Result, RejectedPayloadAttestation>, + result: Result, PayloadAttestationError>, message_id: MessageId, peer_id: PeerId, + message_slot: Slot, ) { match result { Ok(verified) => { @@ -4156,16 +4146,21 @@ impl NetworkBeaconProcessor { ), } } + + if let Err(e) = self.chain.add_payload_attestation_to_pool(&verified) { + warn!( + reason = ?e, + %peer_id, + "Failed to add payload attestation to pool" + ); + } } - Err(RejectedPayloadAttestation { - payload_attestation_message, - error, - }) => { + Err(error) => { self.handle_payload_attestation_verification_failure( peer_id, message_id, error, - payload_attestation_message.data.slot, + message_slot, ); } } diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index 4b815704d9..de5fe9a098 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -23,10 +23,12 @@ use crate::attestation_storage::{AttestationMap, CheckpointKey}; use crate::bls_to_execution_changes::BlsToExecutionChanges; use crate::sync_aggregate_id::SyncAggregateId; use attester_slashing::AttesterSlashingMaxCover; +use bls::AggregateSignature; use max_cover::maximum_cover; use parking_lot::{RwLock, RwLockWriteGuard}; use rand::rng; use rand::seq::SliceRandom; +use ssz::BitVector; use state_processing::per_block_processing::errors::AttestationValidationError; use state_processing::per_block_processing::{ VerifySignatures, get_slashable_indices_modular, verify_exit, @@ -38,7 +40,8 @@ use std::ptr; use typenum::Unsigned; use types::{ AbstractExecPayload, Attestation, AttestationData, AttesterSlashing, BeaconState, - BeaconStateError, ChainSpec, Epoch, EthSpec, ProposerSlashing, SignedBeaconBlock, + BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256, PayloadAttestation, + PayloadAttestationData, PayloadAttestationMessage, ProposerSlashing, SignedBeaconBlock, SignedBlsToExecutionChange, SignedVoluntaryExit, Slot, SyncAggregate, SyncAggregateError, SyncCommitteeContribution, Validator, }; @@ -59,6 +62,9 @@ pub struct OperationPool { voluntary_exits: RwLock>>, /// Map from credential changing validator to their position in the queue. bls_to_execution_changes: RwLock>, + /// Map from payload attestation data to individual messages for aggregation at block production. + payload_attestation_messages: + RwLock>>, /// Reward cache for accelerating attestation packing. reward_cache: RwLock, _phantom: PhantomData, @@ -78,6 +84,8 @@ pub enum OpPoolError { IncorrectOpPoolVariant, EpochCacheNotInitialized, EpochCacheError(EpochCacheError), + GetPtcError(BeaconStateError), + PayloadAttestationBitError, } #[derive(Default)] @@ -193,6 +201,100 @@ impl OperationPool { }); } + /// Insert a validated `PayloadAttestationMessage` into the pool. + pub fn insert_payload_attestation_message( + &self, + message: PayloadAttestationMessage, + ) -> Result<(), OpPoolError> { + let mut messages = self.payload_attestation_messages.write(); + let entry = messages.entry(message.data.clone()).or_default(); + if !entry + .iter() + .any(|m| m.validator_index == message.validator_index) + { + entry.push(message); + } + Ok(()) + } + + /// Build `PayloadAttestation`s from stored messages for block production. + /// + /// `parent_block_root` is the root of the parent block (the block PTC members attested to). + /// Returns one `PayloadAttestation` per distinct `PayloadAttestationData`. With two boolean + /// fields this yields at most 4, capped to `MaxPayloadAttestations`. + pub fn get_payload_attestations( + &self, + state: &BeaconState, + parent_block_root: Hash256, + spec: &ChainSpec, + ) -> Result>, OpPoolError> { + let target_slot = state.slot().saturating_sub(1u64); + + let ptc = state + .get_ptc(target_slot, spec) + .map_err(OpPoolError::GetPtcError)?; + + let messages = self.payload_attestation_messages.read(); + let mut result = Vec::new(); + + for (data, msgs) in messages.iter() { + if data.slot != target_slot || data.beacon_block_root != parent_block_root { + continue; + } + + let mut aggregation_bits = BitVector::new(); + let mut aggregate_sig = AggregateSignature::infinity(); + + for msg in msgs { + if let Some(pos) = ptc + .0 + .iter() + .position(|&idx| idx == msg.validator_index as usize) + && !aggregation_bits.get(pos).unwrap_or(false) + { + aggregation_bits + .set(pos, true) + .map_err(|_| OpPoolError::PayloadAttestationBitError)?; + aggregate_sig.add_assign(&msg.signature); + } + } + + if aggregation_bits.num_set_bits() > 0 { + result.push(PayloadAttestation { + aggregation_bits, + data: data.clone(), + signature: aggregate_sig, + }); + } + } + + // Prefer most participation and cap by `max_payload_attestations` + result.sort_by(|a, b| { + b.aggregation_bits + .num_set_bits() + .cmp(&a.aggregation_bits.num_set_bits()) + }); + result.truncate(E::max_payload_attestations()); + + Ok(result) + } + + /// Remove payload attestation messages that are too old for block inclusion. + pub fn prune_payload_attestation_messages(&self, current_slot: Slot) { + self.payload_attestation_messages + .write() + .retain(|data, _| current_slot <= data.slot.saturating_add(Slot::new(1))); + } + + /// Total number of payload attestation messages in the pool. + pub fn num_payload_attestation_messages(&self) -> usize { + self.payload_attestation_messages + .read() + .values() + .map(|msgs| msgs.len()) + .sum() + } + /// Insert an attestation into the pool, aggregating it with existing attestations if possible. /// /// ## Note @@ -646,6 +748,7 @@ impl OperationPool { ) { self.prune_attestations(current_epoch); self.prune_sync_contributions(head_state.slot()); + self.prune_payload_attestation_messages(head_state.slot()); self.prune_proposer_slashings(finalized_state); self.prune_attester_slashings(finalized_state); self.prune_voluntary_exits(finalized_state, spec); @@ -2075,4 +2178,214 @@ mod release_tests { op_pool.prune_attester_slashings(&electra_head.beacon_state); assert_eq!(op_pool.attester_slashings.read().len(), 1); } + + fn make_payload_attestation_message( + slot: Slot, + validator_index: u64, + beacon_block_root: Hash256, + ) -> PayloadAttestationMessage { + make_payload_attestation_message_with_flags( + slot, + validator_index, + beacon_block_root, + true, + true, + ) + } + + fn make_payload_attestation_message_with_flags( + slot: Slot, + validator_index: u64, + beacon_block_root: Hash256, + payload_present: bool, + blob_data_available: bool, + ) -> PayloadAttestationMessage { + PayloadAttestationMessage { + validator_index, + data: PayloadAttestationData { + beacon_block_root, + slot, + payload_present, + blob_data_available, + }, + signature: bls::Signature::empty(), + } + } + + #[test] + fn payload_attestation_insert_and_dedup() { + let op_pool = OperationPool::::new(); + let root = Hash256::repeat_byte(0xaa); + let slot = Slot::new(1); + + let msg1 = make_payload_attestation_message(slot, 0, root); + let msg2 = make_payload_attestation_message(slot, 1, root); + let msg1_dup = make_payload_attestation_message(slot, 0, root); + + op_pool.insert_payload_attestation_message(msg1).unwrap(); + op_pool.insert_payload_attestation_message(msg2).unwrap(); + op_pool + .insert_payload_attestation_message(msg1_dup) + .unwrap(); + + assert_eq!(op_pool.num_payload_attestation_messages(), 2); + } + + #[test] + fn payload_attestation_prune() { + let op_pool = OperationPool::::new(); + let root = Hash256::repeat_byte(0xaa); + + let msg_slot1 = make_payload_attestation_message(Slot::new(1), 0, root); + let msg_slot2 = make_payload_attestation_message(Slot::new(2), 1, root); + let msg_slot3 = make_payload_attestation_message(Slot::new(3), 2, root); + + op_pool + .insert_payload_attestation_message(msg_slot1) + .unwrap(); + op_pool + .insert_payload_attestation_message(msg_slot2) + .unwrap(); + op_pool + .insert_payload_attestation_message(msg_slot3) + .unwrap(); + + assert_eq!(op_pool.num_payload_attestation_messages(), 3); + + op_pool.prune_payload_attestation_messages(Slot::new(3)); + assert_eq!(op_pool.num_payload_attestation_messages(), 2); + + op_pool.prune_payload_attestation_messages(Slot::new(4)); + assert_eq!(op_pool.num_payload_attestation_messages(), 1); + + op_pool.prune_payload_attestation_messages(Slot::new(5)); + assert_eq!(op_pool.num_payload_attestation_messages(), 0); + } + + #[tokio::test] + async fn payload_attestation_packs_bits_from_ptc_positions() { + let spec = test_spec::(); + if spec.gloas_fork_epoch.is_none() { + return; + }; + + let num_validators = 64; + let harness = get_harness::(num_validators, Some(spec.clone())); + + harness + .add_attested_blocks_at_slots( + harness.get_current_state(), + Hash256::zero(), + &[Slot::new(1)], + (0..num_validators).collect::>().as_slice(), + ) + .await; + + let head = harness.chain.canonical_head.cached_head(); + let state = &head.snapshot.beacon_state; + assert_eq!(state.slot(), Slot::new(1)); + + let target_slot = Slot::new(1); + let parent_root = head.head_block_root(); + let ptc = state.get_ptc(target_slot, &spec).unwrap(); + let ptc_member_0 = ptc.0[0] as u64; + let ptc_member_1 = ptc.0[1] as u64; + + let op_pool = OperationPool::::new(); + + let msg0 = make_payload_attestation_message(target_slot, ptc_member_0, parent_root); + let msg1 = make_payload_attestation_message(target_slot, ptc_member_1, parent_root); + op_pool.insert_payload_attestation_message(msg0).unwrap(); + op_pool.insert_payload_attestation_message(msg1).unwrap(); + + // Advance state to slot 2 so get_payload_attestations looks at slot 1. + let mut advanced_state = state.clone(); + state_processing::state_advance::complete_state_advance( + &mut advanced_state, + None, + Slot::new(2), + &spec, + ) + .unwrap(); + + let attestations = op_pool + .get_payload_attestations(&advanced_state, parent_root, &spec) + .unwrap(); + + assert_eq!(attestations.len(), 1); + assert_eq!(attestations[0].aggregation_bits.num_set_bits(), 2); + assert!(attestations[0].aggregation_bits.get(0).unwrap()); + assert!(attestations[0].aggregation_bits.get(1).unwrap()); + assert!(attestations[0].data.payload_present); + } + + #[tokio::test] + async fn payload_attestation_multiple_data_combos_capped() { + let spec = test_spec::(); + if spec.gloas_fork_epoch.is_none() { + return; + }; + + let num_validators = 64; + let harness = get_harness::(num_validators, Some(spec.clone())); + + harness + .add_attested_blocks_at_slots( + harness.get_current_state(), + Hash256::zero(), + &[Slot::new(1)], + (0..num_validators).collect::>().as_slice(), + ) + .await; + + let head = harness.chain.canonical_head.cached_head(); + let state = &head.snapshot.beacon_state; + let target_slot = Slot::new(1); + let parent_root = head.head_block_root(); + let ptc = state.get_ptc(target_slot, &spec).unwrap(); + + let op_pool = OperationPool::::new(); + + // Given: PTC members vote with all 4 boolean combos, with varying participation. + let combos: [(bool, bool, &[usize]); 4] = [ + (true, true, &[0, 1, 2]), + (true, false, &[3, 4]), + (false, true, &[5]), + (false, false, &[6]), + ]; + for (payload_present, blob_available, positions) in &combos { + for &pos in *positions { + let validator_index = ptc.0[pos] as u64; + let msg = make_payload_attestation_message_with_flags( + target_slot, + validator_index, + parent_root, + *payload_present, + *blob_available, + ); + op_pool.insert_payload_attestation_message(msg).unwrap(); + } + } + + // When: we pack attestations for block production at slot 2. + let mut advanced_state = state.clone(); + state_processing::state_advance::complete_state_advance( + &mut advanced_state, + None, + Slot::new(2), + &spec, + ) + .unwrap(); + let attestations = op_pool + .get_payload_attestations(&advanced_state, parent_root, &spec) + .unwrap(); + + // Then: one attestation per combo, sorted by participation (most first). + assert_eq!(attestations.len(), 4); + let bit_counts: Vec<_> = attestations + .iter() + .map(|a| a.aggregation_bits.num_set_bits()) + .collect(); + assert_eq!(bit_counts, vec![3, 2, 1, 1]); + } } diff --git a/beacon_node/operation_pool/src/persistence.rs b/beacon_node/operation_pool/src/persistence.rs index 241b5fec53..56aafc27fe 100644 --- a/beacon_node/operation_pool/src/persistence.rs +++ b/beacon_node/operation_pool/src/persistence.rs @@ -209,6 +209,7 @@ impl PersistedOperationPool { proposer_slashings, voluntary_exits, bls_to_execution_changes: RwLock::new(bls_to_execution_changes), + payload_attestation_messages: Default::default(), reward_cache: Default::default(), _phantom: Default::default(), }; diff --git a/validator_client/validator_services/src/payload_attestation_service.rs b/validator_client/validator_services/src/payload_attestation_service.rs index 2f3ca8bed2..24949edc1f 100644 --- a/validator_client/validator_services/src/payload_attestation_service.rs +++ b/validator_client/validator_services/src/payload_attestation_service.rs @@ -101,10 +101,15 @@ impl PayloadAttestationServ sleep(duration_to_next_slot + payload_attestation_due).await; + let Some(attestation_slot) = self.slot_clock.now() else { + error!("Failed to read slot clock after sleep"); + continue; + }; + let service = self.clone(); self.executor.spawn( async move { - service.produce_and_publish(current_slot).await; + service.produce_and_publish(attestation_slot).await; }, "payload_attestation_producer", );