Add payload attestation to op pool and pack into block (#9180)

Store gossip-verified `PayloadAttestationMessage`s in the operation pool and pack them into the block body at during block production.

Built on top of #9145.


  


Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>
This commit is contained in:
Jimmy Chen
2026-04-28 12:49:28 +02:00
committed by GitHub
parent e35a671303
commit d8790f6677
6 changed files with 386 additions and 28 deletions

View File

@@ -23,10 +23,12 @@ use crate::attestation_storage::{AttestationMap, CheckpointKey};
use crate::bls_to_execution_changes::BlsToExecutionChanges;
use crate::sync_aggregate_id::SyncAggregateId;
use attester_slashing::AttesterSlashingMaxCover;
use bls::AggregateSignature;
use max_cover::maximum_cover;
use parking_lot::{RwLock, RwLockWriteGuard};
use rand::rng;
use rand::seq::SliceRandom;
use ssz::BitVector;
use state_processing::per_block_processing::errors::AttestationValidationError;
use state_processing::per_block_processing::{
VerifySignatures, get_slashable_indices_modular, verify_exit,
@@ -38,7 +40,8 @@ use std::ptr;
use typenum::Unsigned;
use types::{
AbstractExecPayload, Attestation, AttestationData, AttesterSlashing, BeaconState,
BeaconStateError, ChainSpec, Epoch, EthSpec, ProposerSlashing, SignedBeaconBlock,
BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256, PayloadAttestation,
PayloadAttestationData, PayloadAttestationMessage, ProposerSlashing, SignedBeaconBlock,
SignedBlsToExecutionChange, SignedVoluntaryExit, Slot, SyncAggregate, SyncAggregateError,
SyncCommitteeContribution, Validator,
};
@@ -59,6 +62,9 @@ pub struct OperationPool<E: EthSpec + Default> {
voluntary_exits: RwLock<HashMap<u64, SigVerifiedOp<SignedVoluntaryExit, E>>>,
/// Map from credential changing validator to their position in the queue.
bls_to_execution_changes: RwLock<BlsToExecutionChanges<E>>,
/// Map from payload attestation data to individual messages for aggregation at block production.
payload_attestation_messages:
RwLock<HashMap<PayloadAttestationData, Vec<PayloadAttestationMessage>>>,
/// Reward cache for accelerating attestation packing.
reward_cache: RwLock<RewardCache>,
_phantom: PhantomData<E>,
@@ -78,6 +84,8 @@ pub enum OpPoolError {
IncorrectOpPoolVariant,
EpochCacheNotInitialized,
EpochCacheError(EpochCacheError),
GetPtcError(BeaconStateError),
PayloadAttestationBitError,
}
#[derive(Default)]
@@ -193,6 +201,100 @@ impl<E: EthSpec> OperationPool<E> {
});
}
/// Insert a validated `PayloadAttestationMessage` into the pool.
pub fn insert_payload_attestation_message(
&self,
message: PayloadAttestationMessage,
) -> Result<(), OpPoolError> {
let mut messages = self.payload_attestation_messages.write();
let entry = messages.entry(message.data.clone()).or_default();
if !entry
.iter()
.any(|m| m.validator_index == message.validator_index)
{
entry.push(message);
}
Ok(())
}
/// Build `PayloadAttestation`s from stored messages for block production.
///
/// `parent_block_root` is the root of the parent block (the block PTC members attested to).
/// Returns one `PayloadAttestation` per distinct `PayloadAttestationData`. With two boolean
/// fields this yields at most 4, capped to `MaxPayloadAttestations`.
pub fn get_payload_attestations(
&self,
state: &BeaconState<E>,
parent_block_root: Hash256,
spec: &ChainSpec,
) -> Result<Vec<PayloadAttestation<E>>, OpPoolError> {
let target_slot = state.slot().saturating_sub(1u64);
let ptc = state
.get_ptc(target_slot, spec)
.map_err(OpPoolError::GetPtcError)?;
let messages = self.payload_attestation_messages.read();
let mut result = Vec::new();
for (data, msgs) in messages.iter() {
if data.slot != target_slot || data.beacon_block_root != parent_block_root {
continue;
}
let mut aggregation_bits = BitVector::new();
let mut aggregate_sig = AggregateSignature::infinity();
for msg in msgs {
if let Some(pos) = ptc
.0
.iter()
.position(|&idx| idx == msg.validator_index as usize)
&& !aggregation_bits.get(pos).unwrap_or(false)
{
aggregation_bits
.set(pos, true)
.map_err(|_| OpPoolError::PayloadAttestationBitError)?;
aggregate_sig.add_assign(&msg.signature);
}
}
if aggregation_bits.num_set_bits() > 0 {
result.push(PayloadAttestation {
aggregation_bits,
data: data.clone(),
signature: aggregate_sig,
});
}
}
// Prefer most participation and cap by `max_payload_attestations`
result.sort_by(|a, b| {
b.aggregation_bits
.num_set_bits()
.cmp(&a.aggregation_bits.num_set_bits())
});
result.truncate(E::max_payload_attestations());
Ok(result)
}
/// Remove payload attestation messages that are too old for block inclusion.
pub fn prune_payload_attestation_messages(&self, current_slot: Slot) {
self.payload_attestation_messages
.write()
.retain(|data, _| current_slot <= data.slot.saturating_add(Slot::new(1)));
}
/// Total number of payload attestation messages in the pool.
pub fn num_payload_attestation_messages(&self) -> usize {
self.payload_attestation_messages
.read()
.values()
.map(|msgs| msgs.len())
.sum()
}
/// Insert an attestation into the pool, aggregating it with existing attestations if possible.
///
/// ## Note
@@ -646,6 +748,7 @@ impl<E: EthSpec> OperationPool<E> {
) {
self.prune_attestations(current_epoch);
self.prune_sync_contributions(head_state.slot());
self.prune_payload_attestation_messages(head_state.slot());
self.prune_proposer_slashings(finalized_state);
self.prune_attester_slashings(finalized_state);
self.prune_voluntary_exits(finalized_state, spec);
@@ -2075,4 +2178,214 @@ mod release_tests {
op_pool.prune_attester_slashings(&electra_head.beacon_state);
assert_eq!(op_pool.attester_slashings.read().len(), 1);
}
fn make_payload_attestation_message(
slot: Slot,
validator_index: u64,
beacon_block_root: Hash256,
) -> PayloadAttestationMessage {
make_payload_attestation_message_with_flags(
slot,
validator_index,
beacon_block_root,
true,
true,
)
}
fn make_payload_attestation_message_with_flags(
slot: Slot,
validator_index: u64,
beacon_block_root: Hash256,
payload_present: bool,
blob_data_available: bool,
) -> PayloadAttestationMessage {
PayloadAttestationMessage {
validator_index,
data: PayloadAttestationData {
beacon_block_root,
slot,
payload_present,
blob_data_available,
},
signature: bls::Signature::empty(),
}
}
#[test]
fn payload_attestation_insert_and_dedup() {
let op_pool = OperationPool::<MinimalEthSpec>::new();
let root = Hash256::repeat_byte(0xaa);
let slot = Slot::new(1);
let msg1 = make_payload_attestation_message(slot, 0, root);
let msg2 = make_payload_attestation_message(slot, 1, root);
let msg1_dup = make_payload_attestation_message(slot, 0, root);
op_pool.insert_payload_attestation_message(msg1).unwrap();
op_pool.insert_payload_attestation_message(msg2).unwrap();
op_pool
.insert_payload_attestation_message(msg1_dup)
.unwrap();
assert_eq!(op_pool.num_payload_attestation_messages(), 2);
}
#[test]
fn payload_attestation_prune() {
let op_pool = OperationPool::<MinimalEthSpec>::new();
let root = Hash256::repeat_byte(0xaa);
let msg_slot1 = make_payload_attestation_message(Slot::new(1), 0, root);
let msg_slot2 = make_payload_attestation_message(Slot::new(2), 1, root);
let msg_slot3 = make_payload_attestation_message(Slot::new(3), 2, root);
op_pool
.insert_payload_attestation_message(msg_slot1)
.unwrap();
op_pool
.insert_payload_attestation_message(msg_slot2)
.unwrap();
op_pool
.insert_payload_attestation_message(msg_slot3)
.unwrap();
assert_eq!(op_pool.num_payload_attestation_messages(), 3);
op_pool.prune_payload_attestation_messages(Slot::new(3));
assert_eq!(op_pool.num_payload_attestation_messages(), 2);
op_pool.prune_payload_attestation_messages(Slot::new(4));
assert_eq!(op_pool.num_payload_attestation_messages(), 1);
op_pool.prune_payload_attestation_messages(Slot::new(5));
assert_eq!(op_pool.num_payload_attestation_messages(), 0);
}
#[tokio::test]
async fn payload_attestation_packs_bits_from_ptc_positions() {
let spec = test_spec::<MinimalEthSpec>();
if spec.gloas_fork_epoch.is_none() {
return;
};
let num_validators = 64;
let harness = get_harness::<MinimalEthSpec>(num_validators, Some(spec.clone()));
harness
.add_attested_blocks_at_slots(
harness.get_current_state(),
Hash256::zero(),
&[Slot::new(1)],
(0..num_validators).collect::<Vec<_>>().as_slice(),
)
.await;
let head = harness.chain.canonical_head.cached_head();
let state = &head.snapshot.beacon_state;
assert_eq!(state.slot(), Slot::new(1));
let target_slot = Slot::new(1);
let parent_root = head.head_block_root();
let ptc = state.get_ptc(target_slot, &spec).unwrap();
let ptc_member_0 = ptc.0[0] as u64;
let ptc_member_1 = ptc.0[1] as u64;
let op_pool = OperationPool::<MinimalEthSpec>::new();
let msg0 = make_payload_attestation_message(target_slot, ptc_member_0, parent_root);
let msg1 = make_payload_attestation_message(target_slot, ptc_member_1, parent_root);
op_pool.insert_payload_attestation_message(msg0).unwrap();
op_pool.insert_payload_attestation_message(msg1).unwrap();
// Advance state to slot 2 so get_payload_attestations looks at slot 1.
let mut advanced_state = state.clone();
state_processing::state_advance::complete_state_advance(
&mut advanced_state,
None,
Slot::new(2),
&spec,
)
.unwrap();
let attestations = op_pool
.get_payload_attestations(&advanced_state, parent_root, &spec)
.unwrap();
assert_eq!(attestations.len(), 1);
assert_eq!(attestations[0].aggregation_bits.num_set_bits(), 2);
assert!(attestations[0].aggregation_bits.get(0).unwrap());
assert!(attestations[0].aggregation_bits.get(1).unwrap());
assert!(attestations[0].data.payload_present);
}
#[tokio::test]
async fn payload_attestation_multiple_data_combos_capped() {
let spec = test_spec::<MinimalEthSpec>();
if spec.gloas_fork_epoch.is_none() {
return;
};
let num_validators = 64;
let harness = get_harness::<MinimalEthSpec>(num_validators, Some(spec.clone()));
harness
.add_attested_blocks_at_slots(
harness.get_current_state(),
Hash256::zero(),
&[Slot::new(1)],
(0..num_validators).collect::<Vec<_>>().as_slice(),
)
.await;
let head = harness.chain.canonical_head.cached_head();
let state = &head.snapshot.beacon_state;
let target_slot = Slot::new(1);
let parent_root = head.head_block_root();
let ptc = state.get_ptc(target_slot, &spec).unwrap();
let op_pool = OperationPool::<MinimalEthSpec>::new();
// Given: PTC members vote with all 4 boolean combos, with varying participation.
let combos: [(bool, bool, &[usize]); 4] = [
(true, true, &[0, 1, 2]),
(true, false, &[3, 4]),
(false, true, &[5]),
(false, false, &[6]),
];
for (payload_present, blob_available, positions) in &combos {
for &pos in *positions {
let validator_index = ptc.0[pos] as u64;
let msg = make_payload_attestation_message_with_flags(
target_slot,
validator_index,
parent_root,
*payload_present,
*blob_available,
);
op_pool.insert_payload_attestation_message(msg).unwrap();
}
}
// When: we pack attestations for block production at slot 2.
let mut advanced_state = state.clone();
state_processing::state_advance::complete_state_advance(
&mut advanced_state,
None,
Slot::new(2),
&spec,
)
.unwrap();
let attestations = op_pool
.get_payload_attestations(&advanced_state, parent_root, &spec)
.unwrap();
// Then: one attestation per combo, sorted by participation (most first).
assert_eq!(attestations.len(), 4);
let bit_counts: Vec<_> = attestations
.iter()
.map(|a| a.aggregation_bits.num_set_bits())
.collect();
assert_eq!(bit_counts, vec![3, 2, 1, 1]);
}
}

View File

@@ -209,6 +209,7 @@ impl<E: EthSpec> PersistedOperationPool<E> {
proposer_slashings,
voluntary_exits,
bls_to_execution_changes: RwLock::new(bls_to_execution_changes),
payload_attestation_messages: Default::default(),
reward_cache: Default::default(),
_phantom: Default::default(),
};