Add payload attestation pool and block packing

Store gossip-verified PayloadAttestationMessages in the operation pool
keyed by PayloadAttestationData. At block production time, aggregate
them into PayloadAttestations using PTC position mapping and include
in the block body. Wire pool insertion into the gossip handler after
fork choice.
This commit is contained in:
Jimmy Chen
2026-04-27 11:32:25 +02:00
parent 6ab48a76f0
commit 1543db8b87
5 changed files with 343 additions and 3 deletions

View File

@@ -23,10 +23,12 @@ use crate::attestation_storage::{AttestationMap, CheckpointKey};
use crate::bls_to_execution_changes::BlsToExecutionChanges;
use crate::sync_aggregate_id::SyncAggregateId;
use attester_slashing::AttesterSlashingMaxCover;
use bls::AggregateSignature;
use max_cover::maximum_cover;
use parking_lot::{RwLock, RwLockWriteGuard};
use rand::rng;
use rand::seq::SliceRandom;
use ssz::BitVector;
use state_processing::per_block_processing::errors::AttestationValidationError;
use state_processing::per_block_processing::{
VerifySignatures, get_slashable_indices_modular, verify_exit,
@@ -38,7 +40,8 @@ use std::ptr;
use typenum::Unsigned;
use types::{
AbstractExecPayload, Attestation, AttestationData, AttesterSlashing, BeaconState,
BeaconStateError, ChainSpec, Epoch, EthSpec, ProposerSlashing, SignedBeaconBlock,
BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256, PayloadAttestation,
PayloadAttestationData, PayloadAttestationMessage, ProposerSlashing, SignedBeaconBlock,
SignedBlsToExecutionChange, SignedVoluntaryExit, Slot, SyncAggregate, SyncAggregateError,
SyncCommitteeContribution, Validator,
};
@@ -59,6 +62,9 @@ pub struct OperationPool<E: EthSpec + Default> {
voluntary_exits: RwLock<HashMap<u64, SigVerifiedOp<SignedVoluntaryExit, E>>>,
/// Map from credential changing validator to their position in the queue.
bls_to_execution_changes: RwLock<BlsToExecutionChanges<E>>,
/// Map from payload attestation data to individual messages for aggregation at block production.
payload_attestation_messages:
RwLock<HashMap<PayloadAttestationData, Vec<PayloadAttestationMessage>>>,
/// Reward cache for accelerating attestation packing.
reward_cache: RwLock<RewardCache>,
_phantom: PhantomData<E>,
@@ -78,6 +84,8 @@ pub enum OpPoolError {
IncorrectOpPoolVariant,
EpochCacheNotInitialized,
EpochCacheError(EpochCacheError),
GetPtcError(BeaconStateError),
PayloadAttestationBitError,
}
#[derive(Default)]
@@ -193,6 +201,92 @@ impl<E: EthSpec> OperationPool<E> {
});
}
/// Insert a validated `PayloadAttestationMessage` into the pool.
pub fn insert_payload_attestation_message(
&self,
message: PayloadAttestationMessage,
) -> Result<(), OpPoolError> {
let mut messages = self.payload_attestation_messages.write();
let entry = messages.entry(message.data.clone()).or_default();
if !entry
.iter()
.any(|m| m.validator_index == message.validator_index)
{
entry.push(message);
}
Ok(())
}
/// Build aggregated `PayloadAttestation`s from stored messages for block production.
///
/// `parent_block_root` is the root of the parent block (the block PTC members attested to).
/// Returns up to `MAX_PAYLOAD_ATTESTATIONS` attestations for the previous slot.
pub fn get_payload_attestations(
&self,
state: &BeaconState<E>,
parent_block_root: Hash256,
spec: &ChainSpec,
) -> Result<Vec<PayloadAttestation<E>>, OpPoolError> {
let target_slot = state.slot().saturating_sub(1u64);
let ptc = state
.get_ptc(target_slot, spec)
.map_err(OpPoolError::GetPtcError)?;
let messages = self.payload_attestation_messages.read();
let mut result = Vec::new();
for (data, msgs) in messages.iter() {
if data.slot != target_slot || data.beacon_block_root != parent_block_root {
continue;
}
let mut aggregation_bits = BitVector::new();
let mut aggregate_sig = AggregateSignature::infinity();
for msg in msgs {
if let Some(pos) = ptc
.0
.iter()
.position(|&idx| idx == msg.validator_index as usize)
{
if !aggregation_bits.get(pos).unwrap_or(false) {
aggregation_bits
.set(pos, true)
.map_err(|_| OpPoolError::PayloadAttestationBitError)?;
aggregate_sig.add_assign(&msg.signature);
}
}
}
if aggregation_bits.num_set_bits() > 0 {
result.push(PayloadAttestation {
aggregation_bits,
data: data.clone(),
signature: aggregate_sig,
});
}
}
Ok(result)
}
/// Remove payload attestation messages that are too old for block inclusion.
pub fn prune_payload_attestation_messages(&self, current_slot: Slot) {
self.payload_attestation_messages
.write()
.retain(|data, _| current_slot <= data.slot.saturating_add(Slot::new(1)));
}
/// Total number of payload attestation messages in the pool.
pub fn num_payload_attestation_messages(&self) -> usize {
self.payload_attestation_messages
.read()
.values()
.map(|msgs| msgs.len())
.sum()
}
/// Insert an attestation into the pool, aggregating it with existing attestations if possible.
///
/// ## Note
@@ -646,6 +740,7 @@ impl<E: EthSpec> OperationPool<E> {
) {
self.prune_attestations(current_epoch);
self.prune_sync_contributions(head_state.slot());
self.prune_payload_attestation_messages(head_state.slot());
self.prune_proposer_slashings(finalized_state);
self.prune_attester_slashings(finalized_state);
self.prune_voluntary_exits(finalized_state, spec);
@@ -2075,4 +2170,226 @@ mod release_tests {
op_pool.prune_attester_slashings(&electra_head.beacon_state);
assert_eq!(op_pool.attester_slashings.read().len(), 1);
}
fn make_payload_attestation_message(
slot: Slot,
validator_index: u64,
beacon_block_root: Hash256,
) -> PayloadAttestationMessage {
PayloadAttestationMessage {
validator_index,
data: PayloadAttestationData {
beacon_block_root,
slot,
payload_present: true,
blob_data_available: true,
},
signature: bls::Signature::empty(),
}
}
#[test]
fn payload_attestation_insert_and_dedup() {
let op_pool = OperationPool::<MinimalEthSpec>::new();
let root = Hash256::repeat_byte(0xaa);
let slot = Slot::new(1);
let msg1 = make_payload_attestation_message(slot, 0, root);
let msg2 = make_payload_attestation_message(slot, 1, root);
let msg1_dup = make_payload_attestation_message(slot, 0, root);
op_pool.insert_payload_attestation_message(msg1).unwrap();
op_pool.insert_payload_attestation_message(msg2).unwrap();
op_pool
.insert_payload_attestation_message(msg1_dup)
.unwrap();
assert_eq!(op_pool.num_payload_attestation_messages(), 2);
}
#[test]
fn payload_attestation_prune() {
let op_pool = OperationPool::<MinimalEthSpec>::new();
let root = Hash256::repeat_byte(0xaa);
let msg_slot1 = make_payload_attestation_message(Slot::new(1), 0, root);
let msg_slot2 = make_payload_attestation_message(Slot::new(2), 1, root);
let msg_slot3 = make_payload_attestation_message(Slot::new(3), 2, root);
op_pool
.insert_payload_attestation_message(msg_slot1)
.unwrap();
op_pool
.insert_payload_attestation_message(msg_slot2)
.unwrap();
op_pool
.insert_payload_attestation_message(msg_slot3)
.unwrap();
assert_eq!(op_pool.num_payload_attestation_messages(), 3);
op_pool.prune_payload_attestation_messages(Slot::new(3));
assert_eq!(op_pool.num_payload_attestation_messages(), 2);
op_pool.prune_payload_attestation_messages(Slot::new(4));
assert_eq!(op_pool.num_payload_attestation_messages(), 1);
op_pool.prune_payload_attestation_messages(Slot::new(5));
assert_eq!(op_pool.num_payload_attestation_messages(), 0);
}
#[tokio::test]
async fn payload_attestation_packs_bits_from_ptc_positions() {
let spec = test_spec::<MinimalEthSpec>();
if !spec.gloas_fork_epoch.is_some_and(|e| e == Epoch::new(0)) {
return;
}
let num_validators = 64;
let harness = get_harness::<MinimalEthSpec>(num_validators, Some(spec.clone()));
harness
.add_attested_blocks_at_slots(
harness.get_current_state(),
Hash256::zero(),
&[Slot::new(1)],
(0..num_validators).collect::<Vec<_>>().as_slice(),
)
.await;
let head = harness.chain.canonical_head.cached_head();
let state = &head.snapshot.beacon_state;
assert_eq!(state.slot(), Slot::new(1));
let target_slot = Slot::new(1);
let parent_root = head.head_block_root();
let ptc = state.get_ptc(target_slot, &spec).unwrap();
let ptc_member_0 = ptc.0[0] as u64;
let ptc_member_1 = ptc.0[1] as u64;
let op_pool = OperationPool::<MinimalEthSpec>::new();
let msg0 = make_payload_attestation_message(target_slot, ptc_member_0, parent_root);
let msg1 = make_payload_attestation_message(target_slot, ptc_member_1, parent_root);
op_pool.insert_payload_attestation_message(msg0).unwrap();
op_pool.insert_payload_attestation_message(msg1).unwrap();
// Advance state to slot 2 so get_payload_attestations looks at slot 1.
let mut advanced_state = state.clone();
state_processing::state_advance::complete_state_advance(
&mut advanced_state,
None,
Slot::new(2),
&spec,
)
.unwrap();
let attestations = op_pool
.get_payload_attestations(&advanced_state, parent_root, &spec)
.unwrap();
assert_eq!(attestations.len(), 1);
assert_eq!(attestations[0].aggregation_bits.num_set_bits(), 2);
assert!(attestations[0].aggregation_bits.get(0).unwrap());
assert!(attestations[0].aggregation_bits.get(1).unwrap());
assert!(attestations[0].data.payload_present);
}
#[tokio::test]
async fn payload_attestation_filters_wrong_root() {
let spec = test_spec::<MinimalEthSpec>();
if !spec.gloas_fork_epoch.is_some_and(|e| e == Epoch::new(0)) {
return;
}
let num_validators = 64;
let harness = get_harness::<MinimalEthSpec>(num_validators, Some(spec.clone()));
harness
.add_attested_blocks_at_slots(
harness.get_current_state(),
Hash256::zero(),
&[Slot::new(1)],
(0..num_validators).collect::<Vec<_>>().as_slice(),
)
.await;
let head = harness.chain.canonical_head.cached_head();
let state = &head.snapshot.beacon_state;
let target_slot = Slot::new(1);
let ptc = state.get_ptc(target_slot, &spec).unwrap();
let ptc_member = ptc.0[0] as u64;
let op_pool = OperationPool::<MinimalEthSpec>::new();
let wrong_root = Hash256::repeat_byte(0xff);
let msg = make_payload_attestation_message(target_slot, ptc_member, wrong_root);
op_pool.insert_payload_attestation_message(msg).unwrap();
let mut advanced_state = state.clone();
state_processing::state_advance::complete_state_advance(
&mut advanced_state,
None,
Slot::new(2),
&spec,
)
.unwrap();
let parent_root = head.head_block_root();
let attestations = op_pool
.get_payload_attestations(&advanced_state, parent_root, &spec)
.unwrap();
assert!(attestations.is_empty());
}
#[tokio::test]
async fn payload_attestation_non_ptc_validator_skipped() {
let spec = test_spec::<MinimalEthSpec>();
if !spec.gloas_fork_epoch.is_some_and(|e| e == Epoch::new(0)) {
return;
}
let num_validators = 64;
let harness = get_harness::<MinimalEthSpec>(num_validators, Some(spec.clone()));
harness
.add_attested_blocks_at_slots(
harness.get_current_state(),
Hash256::zero(),
&[Slot::new(1)],
(0..num_validators).collect::<Vec<_>>().as_slice(),
)
.await;
let head = harness.chain.canonical_head.cached_head();
let state = &head.snapshot.beacon_state;
let target_slot = Slot::new(1);
let parent_root = head.head_block_root();
let ptc = state.get_ptc(target_slot, &spec).unwrap();
// Find a validator NOT in the PTC.
let non_ptc_validator = (0..num_validators as u64)
.find(|&i| !ptc.0.contains(&(i as usize)))
.expect("should find non-PTC validator");
let op_pool = OperationPool::<MinimalEthSpec>::new();
let msg = make_payload_attestation_message(target_slot, non_ptc_validator, parent_root);
op_pool.insert_payload_attestation_message(msg).unwrap();
let mut advanced_state = state.clone();
state_processing::state_advance::complete_state_advance(
&mut advanced_state,
None,
Slot::new(2),
&spec,
)
.unwrap();
let attestations = op_pool
.get_payload_attestations(&advanced_state, parent_root, &spec)
.unwrap();
// Message was in the pool but validator isn't in PTC, so no attestation produced.
assert!(attestations.is_empty());
}
}

View File

@@ -209,6 +209,7 @@ impl<E: EthSpec> PersistedOperationPool<E> {
proposer_slashings,
voluntary_exits,
bls_to_execution_changes: RwLock::new(bls_to_execution_changes),
payload_attestation_messages: Default::default(),
reward_cache: Default::default(),
_phantom: Default::default(),
};