diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index bfe1b404e0..cf5afb089a 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -53,7 +53,8 @@ use crate::observed_aggregates::{ Error as AttestationObservationError, ObservedAggregateAttestations, ObservedSyncContributions, }; use crate::observed_attesters::{ - ObservedAggregators, ObservedAttesters, ObservedSyncAggregators, ObservedSyncContributors, + ObservedAggregators, ObservedAttesters, ObservedPayloadAttesters, ObservedSyncAggregators, + ObservedSyncContributors, }; use crate::observed_block_producers::ObservedBlockProducers; use crate::observed_data_sidecars::ObservedDataSidecars; @@ -418,6 +419,9 @@ pub struct BeaconChain { /// Maintains a record of which validators have been seen to create `SignedContributionAndProofs` /// in recent epochs. pub(crate) observed_sync_aggregators: RwLock>, + /// Maintains a record of which validators have sent payload attestation messages + /// in recent slots. + pub(crate) observed_payload_attesters: RwLock>, /// Maintains a record of which validators have proposed blocks for each slot. pub observed_block_producers: RwLock>, /// Maintains a record of blob sidecars seen over the gossip network. @@ -2308,6 +2312,22 @@ impl BeaconChain { }) } + pub fn apply_payload_attestation_to_fork_choice( + &self, + indexed_payload_attestation: &IndexedPayloadAttestation, + ptc: &PTC, + ) -> Result<(), Error> { + self.canonical_head + .fork_choice_write_lock() + .on_payload_attestation( + self.slot()?, + indexed_payload_attestation, + AttestationFromBlock::False, + &ptc.0, + ) + .map_err(Into::into) + } + /// Accepts some `SyncCommitteeMessage` from the network and attempts to verify it, returning `Ok(_)` if /// it is valid to be (re)broadcast on the gossip network. pub fn verify_sync_committee_message_for_gossip( diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 19eb1aa877..d70561db9b 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1015,6 +1015,7 @@ where observed_aggregators: <_>::default(), // TODO: allow for persisting and loading the pool from disk. observed_sync_aggregators: <_>::default(), + observed_payload_attesters: <_>::default(), // TODO: allow for persisting and loading the pool from disk. observed_block_producers: <_>::default(), observed_column_sidecars: RwLock::new(ObservedDataSidecars::new(self.spec.clone())), diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 7631e6b904..d70fc1b3ec 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -44,6 +44,7 @@ pub mod observed_data_sidecars; pub mod observed_operations; mod observed_slashable; pub mod partial_data_column_assembler; +pub mod payload_attestation_verification; pub mod payload_bid_verification; pub mod payload_envelope_streamer; pub mod payload_envelope_verification; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index ce136ef3fc..43c3337bc9 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1468,6 +1468,27 @@ pub static SYNC_MESSAGE_GOSSIP_VERIFICATION_TIMES: LazyLock> = "Full runtime of sync contribution gossip verification", ) }); +pub static PAYLOAD_ATTESTATION_PROCESSING_REQUESTS: LazyLock> = + LazyLock::new(|| { + try_create_int_counter( + "beacon_payload_attestation_processing_requests_total", + "Count of all payload attestation messages submitted for processing", + ) + }); +pub static PAYLOAD_ATTESTATION_PROCESSING_SUCCESSES: LazyLock> = + LazyLock::new(|| { + try_create_int_counter( + "beacon_payload_attestation_processing_successes_total", + "Number of payload attestation messages verified for gossip", + ) + }); +pub static PAYLOAD_ATTESTATION_GOSSIP_VERIFICATION_TIMES: LazyLock> = + LazyLock::new(|| { + try_create_histogram( + "beacon_payload_attestation_gossip_verification_seconds", + "Full runtime of payload attestation gossip verification", + ) + }); pub static SYNC_MESSAGE_EQUIVOCATIONS: LazyLock> = LazyLock::new(|| { try_create_int_counter( "sync_message_equivocations_total", diff --git a/beacon_node/beacon_chain/src/observed_attesters.rs b/beacon_node/beacon_chain/src/observed_attesters.rs index 277bf38ffc..4bb536880c 100644 --- a/beacon_node/beacon_chain/src/observed_attesters.rs +++ b/beacon_node/beacon_chain/src/observed_attesters.rs @@ -42,6 +42,8 @@ pub type ObservedSyncContributors = pub type ObservedAggregators = AutoPruningEpochContainer; pub type ObservedSyncAggregators = AutoPruningSlotContainer; +pub type ObservedPayloadAttesters = + AutoPruningSlotContainer, E>; #[derive(Debug, PartialEq)] pub enum Error { @@ -255,6 +257,46 @@ impl Item<()> for SyncAggregatorSlotHashSet { } } +/// Stores a `HashSet` of validator indices that have sent a payload attestation gossip +/// message during a slot. +pub struct PayloadAttesterSlotHashSet { + set: HashSet, + phantom: PhantomData, +} + +impl Item<()> for PayloadAttesterSlotHashSet { + fn with_capacity(capacity: usize) -> Self { + Self { + set: HashSet::with_capacity(capacity), + phantom: PhantomData, + } + } + + /// Defaults to `PTC_SIZE`, the maximum number of payload attesters per slot. + fn default_capacity() -> usize { + E::ptc_size() + } + + fn len(&self) -> usize { + self.set.len() + } + + fn validator_count(&self) -> usize { + self.set.len() + } + + /// Inserts the `validator_index` in the set. Returns `true` if the `validator_index` was + /// already in the set. + fn insert(&mut self, validator_index: usize, _value: ()) -> bool { + !self.set.insert(validator_index) + } + + /// Returns `true` if the `validator_index` is in the set. + fn get(&self, validator_index: usize) -> Option<()> { + self.set.contains(&validator_index).then_some(()) + } +} + /// A container that stores some number of `T` items. /// /// This container is "auto-pruning" since it gets an idea of the current slot by which diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs new file mode 100644 index 0000000000..2d9fce812e --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs @@ -0,0 +1,255 @@ +use super::Error; +use crate::beacon_chain::BeaconStore; +use crate::canonical_head::CanonicalHead; +use crate::observed_attesters::ObservedPayloadAttesters; +use crate::validator_pubkey_cache::ValidatorPubkeyCache; +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, metrics}; +use bls::AggregateSignature; +use educe::Educe; +use parking_lot::RwLock; +use safe_arith::SafeArith; +use slot_clock::SlotClock; +use state_processing::per_block_processing::signature_sets::indexed_payload_attestation_signature_set; +use state_processing::state_advance::partial_state_advance; +use std::borrow::Cow; +use types::{ChainSpec, EthSpec, IndexedPayloadAttestation, PTC, PayloadAttestationMessage, Slot}; + +pub struct GossipVerificationContext<'a, T: BeaconChainTypes> { + pub slot_clock: &'a T::SlotClock, + pub spec: &'a ChainSpec, + pub observed_payload_attesters: &'a RwLock>, + pub canonical_head: &'a CanonicalHead, + pub validator_pubkey_cache: &'a RwLock>, + pub store: &'a BeaconStore, +} + +/// A `PayloadAttestationMessage` that has been verified for propagation on the gossip network. +#[derive(Educe)] +#[educe(Clone, Debug)] +pub struct VerifiedPayloadAttestationMessage { + payload_attestation_message: PayloadAttestationMessage, + indexed_payload_attestation: IndexedPayloadAttestation, + ptc: PTC, +} + +impl VerifiedPayloadAttestationMessage { + pub fn new( + payload_attestation_message: PayloadAttestationMessage, + ctx: &GossipVerificationContext<'_, T>, + ) -> Result { + let slot = payload_attestation_message.data.slot; + let validator_index = payload_attestation_message.validator_index; + + // [IGNORE] `data.slot` is within the `MAXIMUM_GOSSIP_CLOCK_DISPARITY` allowance. + verify_propagation_slot_range(ctx.slot_clock, slot, ctx.spec)?; + + // [IGNORE] There has been no other valid payload attestation message for this + // validator index. + if ctx + .observed_payload_attesters + .read() + .validator_has_been_observed(slot, validator_index as usize) + .map_err(BeaconChainError::from)? + { + return Err(Error::PriorPayloadAttestationMessageKnown { + validator_index, + slot, + }); + } + + // [IGNORE] `data.beacon_block_root` has been seen + // [REJECT] `data.beacon_block_root` passes validation. + // + // TODO(gloas): These two conditions are conflated. We need a status table to + // differentiate between: + // 1. Blocks we haven't seen (IGNORE), and + // 2. Blocks we've seen that are invalid (REJECT). + // Presently both cases return IGNORE. + let beacon_block_root = payload_attestation_message.data.beacon_block_root; + if ctx + .canonical_head + .fork_choice_read_lock() + .get_block(&beacon_block_root) + .is_none() + { + return Err(Error::UnknownHeadBlock { beacon_block_root }); + } + + // Get head state for PTC computation. If the cached head state is too stale + // (e.g. during liveness failures with many skipped slots), fall back to loading + // a more recent state from the store and advancing it if necessary. + let head = ctx.canonical_head.cached_head(); + let head_state = &head.snapshot.beacon_state; + + let message_epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + let state_epoch = head_state.current_epoch(); + + // get_ptc can serve epochs in [state_epoch - 1, state_epoch + min_seed_lookahead]. + // If the message epoch is beyond that range, the head state is stale. + let advanced_state = if message_epoch + > state_epoch + .safe_add(ctx.spec.min_seed_lookahead) + .map_err(BeaconChainError::from)? + { + let head_block_root = head.head_block_root(); + let target_slot = message_epoch.start_slot(T::EthSpec::slots_per_epoch()); + + let (state_root, mut state) = ctx + .store + .get_advanced_hot_state( + head_block_root, + target_slot, + head.snapshot.beacon_state_root(), + ) + .map_err(BeaconChainError::from)? + .ok_or(BeaconChainError::MissingBeaconState( + head.snapshot.beacon_state_root(), + ))?; + + if state + .current_epoch() + .safe_add(ctx.spec.min_seed_lookahead) + .map_err(BeaconChainError::from)? + < message_epoch + { + partial_state_advance(&mut state, Some(state_root), target_slot, ctx.spec) + .map_err(BeaconChainError::from)?; + } + + Some(state) + } else { + None + }; + + let state = advanced_state.as_ref().unwrap_or(head_state); + + // [REJECT] `validator_index` is within `get_ptc(state, data.slot)`. + let ptc = state.get_ptc(slot, ctx.spec)?; + if !ptc.0.contains(&(validator_index as usize)) { + return Err(Error::NotInPTC { + validator_index, + slot, + }); + } + + // Build the indexed form for signature verification and downstream fork choice. + let indexed_payload_attestation = IndexedPayloadAttestation { + attesting_indices: vec![validator_index] + .try_into() + .map_err(|_| Error::UnknownValidatorIndex(validator_index))?, + data: payload_attestation_message.data.clone(), + signature: AggregateSignature::from(&payload_attestation_message.signature), + }; + + { + // [REJECT] The signature is valid with respect to the `validator_index`. + let pubkey_cache = ctx.validator_pubkey_cache.read(); + let signature_set = indexed_payload_attestation_signature_set( + state, + |validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed), + &indexed_payload_attestation.signature, + &indexed_payload_attestation, + ctx.spec, + ) + .map_err(|_| Error::UnknownValidatorIndex(validator_index))?; + + if !signature_set.verify() { + return Err(Error::InvalidSignature); + } + } + + // Record that we have received a valid payload attestation message from this + // validator. Double check with the write lock to handle race conditions. + if ctx + .observed_payload_attesters + .write() + .observe_validator(slot, validator_index as usize, ()) + .map_err(BeaconChainError::from)? + { + return Err(Error::PriorPayloadAttestationMessageKnown { + validator_index, + slot, + }); + } + + Ok(Self { + payload_attestation_message, + indexed_payload_attestation, + ptc, + }) + } + + pub fn payload_attestation_message(&self) -> &PayloadAttestationMessage { + &self.payload_attestation_message + } + + pub fn indexed_payload_attestation(&self) -> &IndexedPayloadAttestation { + &self.indexed_payload_attestation + } + + pub fn ptc(&self) -> &PTC { + &self.ptc + } + + pub fn into_payload_attestation_message(self) -> PayloadAttestationMessage { + self.payload_attestation_message + } +} + +impl BeaconChain { + pub fn payload_attestation_gossip_context(&self) -> GossipVerificationContext<'_, T> { + GossipVerificationContext { + slot_clock: &self.slot_clock, + spec: &self.spec, + observed_payload_attesters: &self.observed_payload_attesters, + canonical_head: &self.canonical_head, + validator_pubkey_cache: &self.validator_pubkey_cache, + store: &self.store, + } + } + + pub fn verify_payload_attestation_message_for_gossip( + &self, + payload_attestation_message: PayloadAttestationMessage, + ) -> Result, Error> { + metrics::inc_counter(&metrics::PAYLOAD_ATTESTATION_PROCESSING_REQUESTS); + let _timer = metrics::start_timer(&metrics::PAYLOAD_ATTESTATION_GOSSIP_VERIFICATION_TIMES); + + let ctx = self.payload_attestation_gossip_context(); + VerifiedPayloadAttestationMessage::new(payload_attestation_message, &ctx).inspect(|_| { + metrics::inc_counter(&metrics::PAYLOAD_ATTESTATION_PROCESSING_SUCCESSES); + }) + } +} + +/// Verify that the `slot` is within the acceptable gossip propagation range, with reference +/// to the current slot of the clock. +/// +/// Accounts for `MAXIMUM_GOSSIP_CLOCK_DISPARITY`. +fn verify_propagation_slot_range( + slot_clock: &S, + message_slot: Slot, + spec: &ChainSpec, +) -> Result<(), Error> { + let latest_permissible_slot = slot_clock + .now_with_future_tolerance(spec.maximum_gossip_clock_disparity()) + .ok_or(BeaconChainError::UnableToReadSlot)?; + if message_slot > latest_permissible_slot { + return Err(Error::FutureSlot { + message_slot, + latest_permissible_slot, + }); + } + + let earliest_permissible_slot = slot_clock + .now_with_past_tolerance(spec.maximum_gossip_clock_disparity()) + .ok_or(BeaconChainError::UnableToReadSlot)?; + if message_slot < earliest_permissible_slot { + return Err(Error::PastSlot { + message_slot, + earliest_permissible_slot, + }); + } + + Ok(()) +} diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs new file mode 100644 index 0000000000..477527c0aa --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs @@ -0,0 +1,110 @@ +//! Provides verification for `PayloadAttestationMessage` received from the gossip network. +//! +//! ```ignore +//! types::PayloadAttestationMessage +//! | +//! ▼ +//! VerifiedPayloadAttestationMessage +//! ``` + +use crate::BeaconChainError; +use strum::AsRefStr; +use types::{BeaconStateError, Hash256, Slot}; + +pub mod gossip_verified_payload_attestation; + +pub use gossip_verified_payload_attestation::{ + GossipVerificationContext, VerifiedPayloadAttestationMessage, +}; + +/// Returned when a payload attestation message was not successfully verified. It might not have +/// been verified for two reasons: +/// +/// - The message is malformed or inappropriate for the context (indicated by all variants +/// other than `BeaconChainError`). +/// - The application encountered an internal error whilst attempting to determine validity +/// (the `BeaconChainError` variant) +#[derive(Debug, AsRefStr)] +pub enum Error { + /// The payload attestation message is from a slot that is later than the current slot + /// (with respect to the gossip clock disparity). + /// + /// ## Peer scoring + /// + /// Assuming the local clock is correct, the peer has sent an invalid message. + FutureSlot { + message_slot: Slot, + latest_permissible_slot: Slot, + }, + /// The payload attestation message is from a slot that is prior to the earliest + /// permissible slot (with respect to the gossip clock disparity). + /// + /// ## Peer scoring + /// + /// Assuming the local clock is correct, the peer has sent an invalid message. + PastSlot { + message_slot: Slot, + earliest_permissible_slot: Slot, + }, + /// We have already observed a valid payload attestation message from this validator + /// for this slot. + /// + /// ## Peer scoring + /// + /// The peer is not necessarily faulty. + PriorPayloadAttestationMessageKnown { validator_index: u64, slot: Slot }, + /// The beacon block referenced by the payload attestation message is not known. + /// + /// ## Peer scoring + /// + /// The attestation points to a block we have not yet imported. It's unclear if the + /// attestation is valid or not. + UnknownHeadBlock { beacon_block_root: Hash256 }, + /// The validator index is not a member of the PTC for this slot. + /// + /// ## Peer scoring + /// + /// The peer has sent an invalid message. + NotInPTC { validator_index: u64, slot: Slot }, + /// The validator index is unknown. + /// + /// ## Peer scoring + /// + /// The peer has sent an invalid message. + UnknownValidatorIndex(u64), + /// The signature on the payload attestation message is invalid. + /// + /// ## Peer scoring + /// + /// The peer has sent an invalid message. + InvalidSignature, + /// There was an error whilst processing the payload attestation message. It is not known + /// if it is valid or invalid. + /// + /// ## Peer scoring + /// + /// We were unable to process this message due to an internal error. It's unclear if the + /// message is valid. + BeaconChainError(Box), + /// An error reading beacon state. + /// + /// ## Peer scoring + /// + /// We were unable to process this message due to an internal error. + BeaconStateError(BeaconStateError), +} + +impl From for Error { + fn from(e: BeaconChainError) -> Self { + Error::BeaconChainError(Box::new(e)) + } +} + +impl From for Error { + fn from(e: BeaconStateError) -> Self { + Error::BeaconStateError(e) + } +} + +#[cfg(test)] +mod tests; diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs new file mode 100644 index 0000000000..7faad98e55 --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs @@ -0,0 +1,422 @@ +use std::sync::Arc; +use std::time::Duration; + +use bls::{Keypair, Signature}; +use fork_choice::ForkChoice; +use genesis::{generate_deterministic_keypairs, interop_genesis_state}; +use parking_lot::RwLock; +use proto_array::PayloadStatus; +use slot_clock::{SlotClock, TestingSlotClock}; +use state_processing::AllCaches; +use state_processing::genesis::genesis_block; +use store::{HotColdDB, StoreConfig}; +use types::{ + ChainSpec, Checkpoint, Domain, Epoch, EthSpec, Hash256, MinimalEthSpec, PayloadAttestationData, + PayloadAttestationMessage, SignedBeaconBlock, SignedRoot, Slot, +}; + +use crate::{ + beacon_fork_choice_store::BeaconForkChoiceStore, + beacon_snapshot::BeaconSnapshot, + canonical_head::CanonicalHead, + observed_attesters::ObservedPayloadAttesters, + payload_attestation_verification::{ + Error as PayloadAttestationError, + gossip_verified_payload_attestation::{ + GossipVerificationContext, VerifiedPayloadAttestationMessage, + }, + }, + test_utils::{BeaconChainHarness, EphemeralHarnessType, fork_name_from_env, test_spec}, + validator_pubkey_cache::ValidatorPubkeyCache, +}; + +type E = MinimalEthSpec; +type T = EphemeralHarnessType; + +const NUM_VALIDATORS: usize = 64; + +struct TestContext { + canonical_head: CanonicalHead, + observed_payload_attesters: RwLock>, + validator_pubkey_cache: RwLock>, + slot_clock: TestingSlotClock, + keypairs: Vec, + spec: ChainSpec, + genesis_block_root: Hash256, + store: Arc, store::MemoryStore>>, +} + +impl TestContext { + fn new() -> Self { + let spec = test_spec::(); + let store = Arc::new( + HotColdDB::open_ephemeral(StoreConfig::default(), Arc::new(spec.clone())) + .expect("should open ephemeral store"), + ); + + let keypairs = generate_deterministic_keypairs(NUM_VALIDATORS); + + let mut state = + interop_genesis_state::(&keypairs, 0, Hash256::repeat_byte(0x42), None, &spec) + .expect("should build genesis state"); + + *state.finalized_checkpoint_mut() = Checkpoint { + epoch: Epoch::new(1), + root: Hash256::ZERO, + }; + + let mut block = genesis_block(&state, &spec).expect("should build genesis block"); + *block.state_root_mut() = state + .update_tree_hash_cache() + .expect("should hash genesis state"); + let signed_block = SignedBeaconBlock::from_block(block, Signature::empty()); + let block_root = signed_block.canonical_root(); + + let snapshot = BeaconSnapshot::new( + Arc::new(signed_block.clone()), + None, + block_root, + state.clone(), + ); + + let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store.clone(), snapshot.clone()) + .expect("should create fork choice store"); + let fork_choice = + ForkChoice::from_anchor(fc_store, block_root, &signed_block, &state, None, &spec) + .expect("should create fork choice"); + + let canonical_head = + CanonicalHead::new(fork_choice, Arc::new(snapshot), PayloadStatus::Pending); + + let slot_clock = TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(0), + spec.get_slot_duration(), + ); + // Advance past genesis so `now_with_past_tolerance` doesn't underflow. + slot_clock.set_current_time(spec.get_slot_duration()); + + let validator_pubkey_cache = + ValidatorPubkeyCache::new(&state, store.clone()).expect("should create pubkey cache"); + + Self { + canonical_head, + observed_payload_attesters: RwLock::new(ObservedPayloadAttesters::default()), + validator_pubkey_cache: RwLock::new(validator_pubkey_cache), + slot_clock, + keypairs, + spec, + genesis_block_root: block_root, + store, + } + } + + fn gossip_ctx(&self) -> GossipVerificationContext<'_, T> { + GossipVerificationContext { + slot_clock: &self.slot_clock, + spec: &self.spec, + observed_payload_attesters: &self.observed_payload_attesters, + canonical_head: &self.canonical_head, + validator_pubkey_cache: &self.validator_pubkey_cache, + store: &self.store, + } + } + + fn ptc_members(&self, slot: Slot) -> Vec { + let head = self.canonical_head.cached_head(); + let state = &head.snapshot.beacon_state; + let ptc = state.get_ptc(slot, &self.spec).expect("should get PTC"); + ptc.0.to_vec() + } + + fn sign_payload_attestation( + &self, + data: PayloadAttestationData, + validator_index: u64, + ) -> PayloadAttestationMessage { + let head = self.canonical_head.cached_head(); + let state = &head.snapshot.beacon_state; + let domain = self.spec.get_domain( + data.slot.epoch(E::slots_per_epoch()), + Domain::PTCAttester, + &state.fork(), + state.genesis_validators_root(), + ); + let message = data.signing_root(domain); + let signature = self.keypairs[validator_index as usize].sk.sign(message); + PayloadAttestationMessage { + validator_index, + data, + signature, + } + } +} + +fn make_payload_attestation( + slot: Slot, + validator_index: u64, + beacon_block_root: Hash256, +) -> PayloadAttestationMessage { + PayloadAttestationMessage { + validator_index, + data: PayloadAttestationData { + beacon_block_root, + slot, + payload_present: true, + blob_data_available: true, + }, + signature: Signature::empty(), + } +} + +#[test] +fn future_slot() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + let gossip = ctx.gossip_ctx(); + + let future_slot = Slot::new(5); + let msg = make_payload_attestation(future_slot, 0, ctx.genesis_block_root); + let result = VerifiedPayloadAttestationMessage::new(msg, &gossip); + assert!(matches!( + result, + Err(PayloadAttestationError::FutureSlot { .. }) + )); +} + +#[test] +fn past_slot() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + ctx.slot_clock.set_slot(5); + let gossip = ctx.gossip_ctx(); + + let msg = make_payload_attestation(Slot::new(0), 0, ctx.genesis_block_root); + let result = VerifiedPayloadAttestationMessage::new(msg, &gossip); + assert!(matches!( + result, + Err(PayloadAttestationError::PastSlot { .. }) + )); +} + +#[test] +fn unknown_head_block() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + let gossip = ctx.gossip_ctx(); + + let unknown_root = Hash256::repeat_byte(0xff); + let msg = make_payload_attestation(Slot::new(1), 0, unknown_root); + let result = VerifiedPayloadAttestationMessage::new(msg, &gossip); + assert!( + matches!( + result, + Err(PayloadAttestationError::UnknownHeadBlock { .. }) + ), + "expected UnknownHeadBlock, got: {:?}", + result + ); +} + +#[test] +fn not_in_ptc() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + let gossip = ctx.gossip_ctx(); + let slot = Slot::new(1); + + let ptc_members = ctx.ptc_members(slot); + let non_ptc_validator = (0..NUM_VALIDATORS as u64) + .find(|&i| !ptc_members.contains(&(i as usize))) + .expect("should find non-PTC validator"); + + let msg = make_payload_attestation(slot, non_ptc_validator, ctx.genesis_block_root); + let result = VerifiedPayloadAttestationMessage::new(msg, &gossip); + assert!(matches!( + result, + Err(PayloadAttestationError::NotInPTC { .. }) + )); +} + +#[test] +fn invalid_signature() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + let gossip = ctx.gossip_ctx(); + let slot = Slot::new(1); + + let ptc_members = ctx.ptc_members(slot); + let validator_index = ptc_members[0] as u64; + + let msg = make_payload_attestation(slot, validator_index, ctx.genesis_block_root); + let result = VerifiedPayloadAttestationMessage::new(msg, &gossip); + assert!(matches!( + result, + Err(PayloadAttestationError::InvalidSignature) + )); +} + +#[test] +fn valid_payload_attestation() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + let gossip = ctx.gossip_ctx(); + let slot = Slot::new(1); + + let ptc_members = ctx.ptc_members(slot); + let validator_index = ptc_members[0] as u64; + + let data = PayloadAttestationData { + beacon_block_root: ctx.genesis_block_root, + slot, + payload_present: true, + blob_data_available: true, + }; + let msg = ctx.sign_payload_attestation(data, validator_index); + let result = VerifiedPayloadAttestationMessage::new(msg, &gossip); + assert!( + result.is_ok(), + "expected Ok, got: {:?}", + result.unwrap_err() + ); +} + +#[test] +fn duplicate_after_valid() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + let gossip = ctx.gossip_ctx(); + let slot = Slot::new(1); + + let ptc_members = ctx.ptc_members(slot); + let validator_index = ptc_members[0] as u64; + + let data = PayloadAttestationData { + beacon_block_root: ctx.genesis_block_root, + slot, + payload_present: true, + blob_data_available: true, + }; + + let msg1 = ctx.sign_payload_attestation(data.clone(), validator_index); + let result1 = VerifiedPayloadAttestationMessage::new(msg1, &gossip); + assert!( + result1.is_ok(), + "first message should pass: {:?}", + result1.unwrap_err() + ); + + let msg2 = ctx.sign_payload_attestation(data, validator_index); + let result2 = VerifiedPayloadAttestationMessage::new(msg2, &gossip); + assert!(matches!( + result2, + Err(PayloadAttestationError::PriorPayloadAttestationMessageKnown { .. }) + )); +} + +/// Exercises the `partial_state_advance` fallback in gossip verification when +/// the head state is too stale to compute PTC membership (e.g., during a +/// network liveness failure with many missed slots). +#[tokio::test] +async fn stale_head_with_partial_advance() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + + let slots_per_epoch = E::slots_per_epoch(); + // Head at epoch 1, message at epoch 5 — 4 epochs of missed slots. + // This exceeds min_seed_lookahead (1), triggering the fallback path: + // get_advanced_hot_state loads the stored state, then partial_state_advance + // advances it through epoch boundaries to populate ptc_window. + let head_slot = Slot::new(slots_per_epoch); + let missed_epochs = 4; + let target_slot = Slot::new(slots_per_epoch * (1 + missed_epochs)); + let target_epoch = target_slot.epoch(slots_per_epoch); + + // GIVEN a chain with blocks through epoch 1 (so the store has states). + let harness = BeaconChainHarness::builder(E::default()) + .default_spec() + .deterministic_keypairs(64) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + harness.extend_to_slot(head_slot).await; + + let head = harness.chain.canonical_head.cached_head(); + let head_epoch = head.snapshot.beacon_state.current_epoch(); + assert!( + target_epoch > head_epoch + harness.spec.min_seed_lookahead, + "precondition: message epoch must exceed head + min_seed_lookahead to trigger fallback" + ); + + // GIVEN a slot clock advanced to epoch 5 without producing blocks + // (simulating missed slots during a liveness failure). + harness.chain.slot_clock.set_slot(target_slot.as_u64()); + + // Advance a reference state to compute the PTC at the target slot. + let mut reference_state = head.snapshot.beacon_state.clone(); + state_processing::state_advance::partial_state_advance( + &mut reference_state, + Some(head.snapshot.beacon_state_root()), + target_slot, + &harness.spec, + ) + .expect("should advance reference state"); + reference_state + .build_all_caches(&harness.spec) + .expect("should build caches"); + + let ptc = reference_state + .get_ptc(target_slot, &harness.spec) + .expect("should get PTC from reference state"); + let validator_index = *ptc.0.first().expect("PTC should have at least one member") as u64; + + // WHEN a properly-signed payload attestation from a PTC member is verified. + let domain = harness.spec.get_domain( + target_epoch, + Domain::PTCAttester, + &reference_state.fork(), + reference_state.genesis_validators_root(), + ); + let data = PayloadAttestationData { + beacon_block_root: head.head_block_root(), + slot: target_slot, + payload_present: true, + blob_data_available: true, + }; + let message = data.signing_root(domain); + let signature = harness.validator_keypairs[validator_index as usize] + .sk + .sign(message); + let msg = PayloadAttestationMessage { + validator_index, + data, + signature, + }; + + // THEN verification succeeds despite the head being 4 epochs stale. + let result = harness + .chain + .verify_payload_attestation_message_for_gossip(msg); + assert!( + result.is_ok(), + "expected Ok (head epoch {}, message epoch {}), got: {:?}", + head_epoch, + target_epoch, + result.unwrap_err() + ); +} diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index ea1a2286a0..4083b1a3af 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -21,6 +21,9 @@ use beacon_chain::{ light_client_finality_update_verification::Error as LightClientFinalityUpdateError, light_client_optimistic_update_verification::Error as LightClientOptimisticUpdateError, observed_operations::ObservationOutcome, + payload_attestation_verification::{ + Error as PayloadAttestationError, VerifiedPayloadAttestationMessage, + }, sync_committee_verification::{self, Error as SyncCommitteeError}, validator_monitor::{get_block_delay_ms, get_slot_delay_ms}, }; @@ -137,6 +140,11 @@ struct RejectedAggregate { error: AttnError, } +struct RejectedPayloadAttestation { + payload_attestation_message: Box, + error: PayloadAttestationError, +} + /// Data for an aggregated or unaggregated attestation that failed verification. enum FailedAtt { Unaggregate { @@ -4088,25 +4096,143 @@ impl NetworkBeaconProcessor { } } - // TODO(gloas) dont forget to add tracing instrumentation + #[instrument( + level = "trace", + skip_all, + fields( + peer_id = %peer_id, + slot = %payload_attestation_message.data.slot, + validator_index = payload_attestation_message.validator_index, + ) + )] pub fn process_gossip_payload_attestation( self: &Arc, message_id: MessageId, peer_id: PeerId, - payload_attestation_message: PayloadAttestationMessage, + payload_attestation_message: Box, ) { - // TODO(EIP-7732): Implement proper payload attestation message gossip processing. - // This should integrate with a payload_attestation_verification.rs module once it's implemented. + let result = match self + .chain + .verify_payload_attestation_message_for_gossip(*payload_attestation_message.clone()) + { + Ok(verified) => Ok(verified), + Err(error) => Err(RejectedPayloadAttestation { + payload_attestation_message: payload_attestation_message.clone(), + error, + }), + }; - trace!( - %peer_id, - validator_index = payload_attestation_message.validator_index, - slot = %payload_attestation_message.data.slot, - beacon_block_root = %payload_attestation_message.data.beacon_block_root, - "Processing payload attestation message" - ); + self.process_gossip_payload_attestation_result(result, message_id, peer_id); + } - // For now, ignore all payload attestation messages since verification is not implemented - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + fn process_gossip_payload_attestation_result( + self: &Arc, + result: Result, RejectedPayloadAttestation>, + message_id: MessageId, + peer_id: PeerId, + ) { + match result { + Ok(verified) => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + + if let Err(e) = self.chain.apply_payload_attestation_to_fork_choice( + verified.indexed_payload_attestation(), + verified.ptc(), + ) { + match e { + BeaconChainError::ForkChoiceError( + ForkChoiceError::InvalidPayloadAttestation(e), + ) => { + debug!( + reason = ?e, + %peer_id, + "Payload attestation invalid for fork choice" + ) + } + e => error!( + reason = ?e, + %peer_id, + "Error applying payload attestation to fork choice" + ), + } + } + } + Err(RejectedPayloadAttestation { + payload_attestation_message, + error, + }) => { + self.handle_payload_attestation_verification_failure( + peer_id, + message_id, + error, + payload_attestation_message.data.slot, + ); + } + } + } + + fn handle_payload_attestation_verification_failure( + &self, + peer_id: PeerId, + message_id: MessageId, + error: PayloadAttestationError, + message_slot: Slot, + ) { + match &error { + PayloadAttestationError::FutureSlot { .. } => { + self.gossip_penalize_peer( + peer_id, + PeerAction::HighToleranceError, + "payload_attn_future_slot", + ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + PayloadAttestationError::PastSlot { .. } + | PayloadAttestationError::PriorPayloadAttestationMessageKnown { .. } => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + PayloadAttestationError::UnknownHeadBlock { .. } => { + debug!( + %peer_id, + %message_slot, + "Payload attestation references unknown block" + ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + PayloadAttestationError::NotInPTC { .. } => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer( + peer_id, + PeerAction::LowToleranceError, + "payload_attn_not_in_ptc", + ); + } + PayloadAttestationError::UnknownValidatorIndex(_) => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer( + peer_id, + PeerAction::LowToleranceError, + "payload_attn_unknown_validator", + ); + } + PayloadAttestationError::InvalidSignature => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer( + peer_id, + PeerAction::LowToleranceError, + "payload_attn_invalid_sig", + ); + } + PayloadAttestationError::BeaconChainError(_) + | PayloadAttestationError::BeaconStateError(_) => { + debug!( + %peer_id, + %message_slot, + ?error, + "Internal error verifying payload attestation" + ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + } } } diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 015b6a616e..bfcff2088b 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -511,7 +511,7 @@ impl NetworkBeaconProcessor { processor.process_gossip_payload_attestation( message_id, peer_id, - *payload_attestation_message, + payload_attestation_message, ) }; diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index f9d779fd24..a9e62dbe94 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -1351,7 +1351,7 @@ where let ptc_indices: Vec = attestation .attesting_indices .iter() - .filter_map(|vi| ptc.iter().position(|&p| p == *vi as usize)) + .filter_map(|validator_index| ptc.iter().position(|&p| p == *validator_index as usize)) .collect(); // Check that all the attesters are in the PTC