From 6323cd3827b596080fa43add5b09a7adc91fd58e Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sun, 26 Apr 2026 01:51:02 +0200 Subject: [PATCH 1/5] Fix builder exit signature batch verification logic and small refactor (#9173) We had a bug when performing batch builder exit signature verification. The EF spec tests cover this case, but the EF tests only calls individual signature verification (which is a separate code path). This PR unifies the two code paths. We should probably spend some time reviewing EF test code coverage and make sure we don't have separate code paths that do similar things. Co-Authored-By: Eitan Seri-Levi --- .../process_operations.rs | 27 +++++++++---------- .../per_block_processing/signature_sets.rs | 18 ++++++++++--- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/consensus/state_processing/src/per_block_processing/process_operations.rs b/consensus/state_processing/src/per_block_processing/process_operations.rs index f1de284fc8..422e0afe06 100644 --- a/consensus/state_processing/src/per_block_processing/process_operations.rs +++ b/consensus/state_processing/src/per_block_processing/process_operations.rs @@ -8,6 +8,7 @@ use crate::per_block_processing::builder::{ convert_validator_index_to_builder_index, is_builder_index, }; use crate::per_block_processing::errors::{BlockProcessingError, ExitInvalid, IntoWithIndex}; +use crate::per_block_processing::signature_sets::{exit_signature_set, get_pubkey_from_state}; use crate::per_block_processing::verify_payload_attestation::verify_payload_attestation; use bls::{PublicKeyBytes, SignatureBytes}; use ssz_types::FixedVector; @@ -547,7 +548,8 @@ fn process_builder_voluntary_exit( let builder_index = convert_validator_index_to_builder_index(signed_exit.message.validator_index); - let builder = state + // Verify builder is known + state .builders()? .get(builder_index as usize) .cloned() @@ -570,22 +572,17 @@ fn process_builder_voluntary_exit( )); } - // Verify signature (using EIP-7044 domain: capella_fork_version for Deneb+) if verify_signatures.is_true() { - let pubkey = builder.pubkey; - let domain = spec.compute_domain( - Domain::VoluntaryExit, - spec.capella_fork_version, - state.genesis_validators_root(), + verify!( + exit_signature_set( + state, + |i| get_pubkey_from_state(state, i), + signed_exit, + spec + )? + .verify(), + ExitInvalid::BadSignature ); - let message = signed_exit.message.signing_root(domain); - // TODO(gloas): use builder pubkey cache once available - let bls_pubkey = pubkey - .decompress() - .map_err(|_| BlockOperationError::invalid(ExitInvalid::BadSignature))?; - if !signed_exit.signature.verify(&bls_pubkey, message) { - return Err(BlockOperationError::invalid(ExitInvalid::BadSignature)); - } } // Initiate builder exit diff --git a/consensus/state_processing/src/per_block_processing/signature_sets.rs b/consensus/state_processing/src/per_block_processing/signature_sets.rs index 5c1767f227..0686c4d605 100644 --- a/consensus/state_processing/src/per_block_processing/signature_sets.rs +++ b/consensus/state_processing/src/per_block_processing/signature_sets.rs @@ -2,6 +2,7 @@ //! validated individually, or alongside in others in a potentially cheaper bulk operation. //! //! This module exposes one function to extract each type of `SignatureSet` from a `BeaconBlock`. +use super::builder::{convert_validator_index_to_builder_index, is_builder_index}; use bls::{AggregateSignature, PublicKey, PublicKeyBytes, Signature, SignatureSet}; use ssz::DecodeError; use std::borrow::Cow; @@ -503,7 +504,7 @@ pub fn deposit_pubkey_signature_message( } /// Returns a signature set that is valid if the `SignedVoluntaryExit` was signed by the indicated -/// validator. +/// validator (or builder, in the case of a builder exit). pub fn exit_signature_set<'a, E, F>( state: &'a BeaconState, get_pubkey: F, @@ -515,7 +516,18 @@ where F: Fn(usize) -> Option>, { let exit = &signed_exit.message; - let proposer_index = exit.validator_index as usize; + let validator_index = exit.validator_index; + + let is_builder_exit = + state.fork_name_unchecked().gloas_enabled() && is_builder_index(validator_index); + + let pubkey = if is_builder_exit { + let builder_index = convert_validator_index_to_builder_index(validator_index); + get_builder_pubkey_from_state(state, builder_index) + .ok_or(Error::ValidatorUnknown(validator_index))? + } else { + get_pubkey(validator_index as usize).ok_or(Error::ValidatorUnknown(validator_index))? + }; let domain = if state.fork_name_unchecked().deneb_enabled() { // EIP-7044 @@ -537,7 +549,7 @@ where Ok(SignatureSet::single_pubkey( &signed_exit.signature, - get_pubkey(proposer_index).ok_or(Error::ValidatorUnknown(proposer_index as u64))?, + pubkey, message, )) } From 276c4d5ff353fe93db306668fca7f8639a1e2ab1 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sun, 26 Apr 2026 15:40:22 +0200 Subject: [PATCH 2/5] Gloas set `AttestationData.index` (#9100) For gloas `attestation.data.index` should be set to 1 if we are attesting to a block whose slot is not the attestation duty slot and slot payload_status is `FULL` Co-Authored-By: Eitan Seri- Levi Co-Authored-By: Eitan Seri-Levi Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com> --- beacon_node/beacon_chain/src/beacon_chain.rs | 26 +++ .../beacon_chain/src/early_attester_cache.rs | 13 ++ beacon_node/beacon_chain/src/test_utils.rs | 2 + .../tests/attestation_production.rs | 179 +++++++++++++++--- .../types/src/attestation/attestation.rs | 11 +- .../src/attestation_service.rs | 1 + 6 files changed, 209 insertions(+), 23 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 98dc9cd7fd..b556e6d849 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1956,6 +1956,7 @@ impl BeaconChain { let beacon_block_root; let beacon_state_root; let target; + let is_same_slot_attestation; let current_epoch_attesting_info: Option<(Checkpoint, usize)>; let head_timer = metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_HEAD_SCRAPE_SECONDS); let head_span = debug_span!("attestation_production_head_scrape").entered(); @@ -1996,11 +1997,20 @@ impl BeaconChain { // When attesting to the head slot or later, always use the head of the chain. beacon_block_root = head.beacon_block_root; beacon_state_root = head.beacon_state_root(); + is_same_slot_attestation = request_slot == head.beacon_block.slot(); } else { // Permit attesting to slots *prior* to the current head. This is desirable when // the VC and BN are out-of-sync due to time issues or overloading. beacon_block_root = *head_state.get_block_root(request_slot)?; beacon_state_root = *head_state.get_state_root(request_slot)?; + + // Fetch the previous block root. If the previous block root equals + // the block root being attested to, the `request_slot` is a skipped slot + // and this is not a same slot attestation. + let prior_slot_root = head_state + .get_block_root(request_slot.saturating_sub(1u64)) + .ok(); + is_same_slot_attestation = prior_slot_root != Some(&beacon_block_root); }; let target_slot = request_epoch.start_slot(T::EthSpec::slots_per_epoch()); @@ -2090,6 +2100,21 @@ impl BeaconChain { ) }; + // For gloas the attestation data index indicates payload presence: + // `payload_present=false` for same-slot attestations or when payload not received. + // `payload_present=true` when attesting to a prior slot whose payload has been received. + let payload_present = if self + .spec + .fork_name_at_slot::(request_slot) + .gloas_enabled() + && !is_same_slot_attestation + { + self.canonical_head + .block_has_canonical_payload(&beacon_block_root, &self.spec)? + } else { + false + }; + Ok(Attestation::::empty_for_signing( request_index, committee_len, @@ -2097,6 +2122,7 @@ impl BeaconChain { beacon_block_root, justified_checkpoint, target, + payload_present, &self.spec, )?) } diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs index 752e4d1a96..e3a83f9374 100644 --- a/beacon_node/beacon_chain/src/early_attester_cache.rs +++ b/beacon_node/beacon_chain/src/early_attester_cache.rs @@ -165,6 +165,12 @@ impl EarlyAttesterCache { /// - There is a cache `item` present. /// - If `request_slot` is in the same epoch as `item.epoch`. /// - If `request_index` does not exceed `item.committee_count`. + /// + /// Post gloas an additional condition must be met: + /// - `request_slot` is the same slot as `item.block.slot` (i.e. a same slot attestation). + /// + /// Non-same-slot Gloas attestations need `data.index` set from the canonical payload + /// status, which the cache doesn't track. Returning `None` falls through to fork choice. #[instrument(skip_all, fields(%request_slot, %request_index), level = "debug")] pub fn try_attest( &self, @@ -197,6 +203,12 @@ impl EarlyAttesterCache { item.committee_lengths .get_committee_length::(request_slot, request_index, spec)?; + let is_same_slot_attestation = request_slot == item.block.slot(); + if spec.fork_name_at_slot::(request_slot).gloas_enabled() && !is_same_slot_attestation { + return Ok(None); + } + let payload_present = false; + let attestation = Attestation::empty_for_signing( request_index, committee_len, @@ -204,6 +216,7 @@ impl EarlyAttesterCache { item.beacon_block_root, item.source, item.target, + payload_present, spec, ) .map_err(Error::AttestationError)?; diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index b657f81b1f..274f41d1cb 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -1451,6 +1451,7 @@ where epoch, root: target_root, }, + false, &self.spec, )?; @@ -1560,6 +1561,7 @@ where epoch, root: target_root, }, + false, &self.spec, )?) } diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index a3ab959d12..1b87fc041a 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -2,7 +2,9 @@ use beacon_chain::attestation_simulator::produce_unaggregated_attestation; use beacon_chain::custody_context::NodeCustodyType; -use beacon_chain::test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy}; +use beacon_chain::test_utils::{ + AttestationStrategy, BeaconChainHarness, BlockStrategy, fork_name_from_env, +}; use beacon_chain::validator_monitor::UNAGGREGATED_ATTESTATION_LAG_SLOTS; use beacon_chain::{StateSkipConfig, WhenSlotSkipped, metrics}; use bls::{AggregateSignature, Keypair}; @@ -206,7 +208,15 @@ async fn produces_attestations() { &AggregateSignature::infinity(), "bad signature" ); - assert_eq!(data.index, index, "bad index"); + if harness + .spec + .fork_name_at_slot::(data.slot) + .gloas_enabled() + { + assert!(data.index <= 1, "invalid index"); + } else { + assert_eq!(data.index, index, "bad index"); + } assert_eq!(data.slot, slot, "bad slot"); assert_eq!(data.beacon_block_root, block_root, "bad block root"); assert_eq!( @@ -226,27 +236,35 @@ async fn produces_attestations() { .build_range_sync_block_from_store_blobs(Some(block_root), Arc::new(block.clone())); let available_block = range_sync_block.into_available_block(); - let early_attestation = { - let proto_block = chain - .canonical_head - .fork_choice_read_lock() - .get_block(&block_root) - .unwrap(); - chain - .early_attester_cache - .add_head_block(block_root, &available_block, proto_block, &state) - .unwrap(); - chain - .early_attester_cache - .try_attest(slot, index, &chain.spec) - .unwrap() - .unwrap() - }; + // For Gloas non-same-slot attestations, the early attester cache returns None. + let is_same_slot_attestation = slot == block_slot; + let is_gloas = harness + .spec + .fork_name_at_slot::(slot) + .gloas_enabled(); + if !is_gloas || is_same_slot_attestation { + let early_attestation = { + let proto_block = chain + .canonical_head + .fork_choice_read_lock() + .get_block(&block_root) + .unwrap(); + chain + .early_attester_cache + .add_head_block(block_root, &available_block, proto_block, &state) + .unwrap(); + chain + .early_attester_cache + .try_attest(slot, index, &chain.spec) + .unwrap() + .unwrap() + }; - assert_eq!( - attestation, early_attestation, - "early attester cache inconsistent" - ); + assert_eq!( + attestation, early_attestation, + "early attester cache inconsistent" + ); + } } } } @@ -313,3 +331,120 @@ async fn early_attester_cache_old_request() { .unwrap(); assert_eq!(attested_block.slot(), attest_slot); } + +/// Verify that `produce_unaggregated_attestation` sets `data.index = 1` (payload_present) +/// when a gloas validator attests to a prior slot whose block+envelope have been received. +/// +/// Setup: build a chain at gloas genesis, produce a block with envelope at slot N, +/// then advance the clock to slot N+1 without producing a block (skipped slot). +/// Attesting at slot N+1 should target the block at slot N with payload_present = true. +#[tokio::test] +async fn gloas_attestation_index_payload_present() { + if fork_name_from_env().is_some_and(|f| !f.gloas_enabled()) { + return; + } + + let harness = BeaconChainHarness::builder(MainnetEthSpec) + .default_spec() + .keypairs(KEYPAIRS[..].to_vec()) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + let chain = &harness.chain; + + // Build a few blocks so the chain is established (slots 1..=3). + harness.advance_slot(); + harness + .extend_chain( + 3, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + let head = chain.head_snapshot(); + assert_eq!(head.beacon_block.slot(), Slot::new(3)); + + // Advance clock to slot 4 without producing a block (skipped slot). + harness.advance_slot(); + let attest_slot = chain.slot().unwrap(); + assert_eq!(attest_slot, Slot::new(4)); + + // Attest at slot 4 — this should target the block at slot 3 whose payload was received. + let attestation = chain + .produce_unaggregated_attestation(attest_slot, 0) + .expect("should produce attestation"); + + assert_eq!(attestation.data().slot, attest_slot); + assert_eq!( + attestation.data().index, + 1, + "gloas attestation to prior slot with payload should have index=1 (payload_present)" + ); +} + +/// Verify that `produce_unaggregated_attestation` sets `data.index = 0` (payload NOT present) +/// when a gloas validator attests to a prior slot whose block was imported but whose +/// payload envelope was never received. +/// +/// Setup: build a chain at gloas genesis through slot 2, then at slot 3 import only the +/// beacon block (no envelope), advance to slot 4 (skipped), and attest. +#[tokio::test] +async fn gloas_attestation_index_payload_absent() { + if fork_name_from_env().is_some_and(|f| !f.gloas_enabled()) { + return; + } + + let harness = BeaconChainHarness::builder(MainnetEthSpec) + .default_spec() + .keypairs(KEYPAIRS[..].to_vec()) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + let chain = &harness.chain; + + // Build slots 1..=2 normally (with envelopes). + harness.advance_slot(); + harness + .extend_chain( + 2, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + assert_eq!(chain.head_snapshot().beacon_block.slot(), Slot::new(2)); + + // Slot 3: produce and import the beacon block but do NOT process the envelope. + harness.advance_slot(); + let state = harness.get_current_state(); + let (block_contents, _envelope, _new_state) = + harness.make_block_with_envelope(state, Slot::new(3)).await; + + let block_root = block_contents.0.canonical_root(); + harness + .process_block(Slot::new(3), block_root, block_contents) + .await + .expect("block should import without envelope"); + + assert_eq!(chain.head_snapshot().beacon_block.slot(), Slot::new(3)); + + // Advance clock to slot 4 without producing a block (skipped slot). + harness.advance_slot(); + let attest_slot = chain.slot().unwrap(); + assert_eq!(attest_slot, Slot::new(4)); + + // Attest at slot 4 — targets slot 3 whose payload was NOT received. + let attestation = chain + .produce_unaggregated_attestation(attest_slot, 0) + .expect("should produce attestation"); + + assert_eq!(attestation.data().slot, attest_slot); + assert_eq!( + attestation.data().index, + 0, + "gloas attestation to prior slot without payload should have index=0 (payload_absent)" + ); +} diff --git a/consensus/types/src/attestation/attestation.rs b/consensus/types/src/attestation/attestation.rs index 693b5889f5..28059efee6 100644 --- a/consensus/types/src/attestation/attestation.rs +++ b/consensus/types/src/attestation/attestation.rs @@ -102,6 +102,7 @@ impl Hash for Attestation { impl Attestation { /// Produces an attestation with empty signature. + #[allow(clippy::too_many_arguments)] pub fn empty_for_signing( committee_index: u64, committee_length: usize, @@ -109,6 +110,7 @@ impl Attestation { beacon_block_root: Hash256, source: Checkpoint, target: Checkpoint, + payload_present: bool, spec: &ChainSpec, ) -> Result { if spec.fork_name_at_slot::(slot).electra_enabled() { @@ -116,12 +118,19 @@ impl Attestation { committee_bits .set(committee_index as usize, true) .map_err(|_| Error::InvalidCommitteeIndex)?; + // Gloas attestation data index now indicates payload presence. + // Pre-gloas index is always 0. + let index = if spec.fork_name_at_slot::(slot).gloas_enabled() && payload_present { + 1u64 + } else { + 0u64 + }; Ok(Attestation::Electra(AttestationElectra { aggregation_bits: BitList::with_capacity(committee_length) .map_err(|_| Error::InvalidCommitteeLength)?, data: AttestationData { slot, - index: 0u64, + index, beacon_block_root, source, target, diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index dc5fc27a4f..3ffe602892 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -546,6 +546,7 @@ impl AttestationService attestation, From fae7941b2d13dc9cd1ba8282aefe2798a70c7c74 Mon Sep 17 00:00:00 2001 From: Shane K Moore <41407272+shane-moore@users.noreply.github.com> Date: Sun, 26 Apr 2026 08:25:00 -0700 Subject: [PATCH 3/5] Gloas ptc duties beacon node response (#8415) Co-Authored-By: shane-moore Co-Authored-By: Eitan Seri-Levi Co-Authored-By: Eitan Seri-Levi Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com> --- beacon_node/beacon_chain/src/beacon_chain.rs | 44 ++++- beacon_node/http_api/src/lib.rs | 10 + beacon_node/http_api/src/ptc_duties.rs | 182 +++++++++++++++++++ beacon_node/http_api/src/validator/mod.rs | 38 +++- beacon_node/http_api/tests/tests.rs | 120 +++++++++++- consensus/types/src/state/beacon_state.rs | 21 +++ 6 files changed, 411 insertions(+), 4 deletions(-) create mode 100644 beacon_node/http_api/src/ptc_duties.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index b556e6d849..bfe1b404e0 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -84,8 +84,8 @@ use crate::{ use bls::{PublicKey, PublicKeyBytes, Signature}; use eth2::beacon_response::ForkVersionedResponse; use eth2::types::{ - EventKind, SseBlobSidecar, SseBlock, SseDataColumnSidecar, SseExtendedPayloadAttributes, - SseHead, + EventKind, PtcDuty, SseBlobSidecar, SseBlock, SseDataColumnSidecar, + SseExtendedPayloadAttributes, SseHead, }; use execution_layer::{ BlockProposalContents, BlockProposalContentsType, BuilderParams, ChainHealth, ExecutionLayer, @@ -1719,6 +1719,46 @@ impl BeaconChain { Ok((duties, dependent_root, execution_status)) } + /// Get PTC duties for validators at a given epoch. + /// + /// TODO(gloas): per-validator `get_ptc_assignment` makes this O(N * slots_per_epoch * PTCSize). + /// A future ptc cache (or a single-pass `ptc_window` walk) can drop this to + /// O(slots_per_epoch * PTCSize + N). + pub fn compute_ptc_duties( + &self, + state: &BeaconState, + epoch: Epoch, + validator_indices: &[u64], + dependent_block_root: Hash256, + ) -> Result<(Vec>, Hash256), Error> { + // The ptc_window only covers previous, current, and next epochs. + let relative_epoch = RelativeEpoch::from_epoch(state.current_epoch(), epoch) + .map_err(Error::IncorrectStateForAttestation)?; + + let dependent_root = + state.attester_shuffling_decision_root(dependent_block_root, relative_epoch)?; + + let pubkey_cache = self.validator_pubkey_cache.read(); + + let duties = validator_indices + .iter() + .map(|&validator_index| -> Result, Error> { + let Some(&pubkey) = pubkey_cache.get_pubkey_bytes(validator_index as usize) else { + return Ok(None); + }; + let slot_opt = + state.get_ptc_assignment(validator_index as usize, epoch, &self.spec)?; + Ok(slot_opt.map(|slot| PtcDuty { + validator_index, + slot, + pubkey, + })) + }) + .collect::, _>>()?; + + Ok((duties, dependent_root)) + } + pub fn get_aggregated_attestation( &self, attestation: AttestationRef, diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 0be631c057..bd80dd1e82 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -19,6 +19,7 @@ mod metrics; mod peer; mod produce_block; mod proposer_duties; +mod ptc_duties; mod publish_attestations; mod publish_blocks; mod standard_block_rewards; @@ -2560,6 +2561,14 @@ pub fn serve( task_spawner_filter.clone(), ); + // POST validator/duties/ptc/{epoch} + let post_validator_duties_ptc = post_validator_duties_ptc( + eth_v1.clone(), + chain_filter.clone(), + not_while_syncing_filter.clone(), + task_spawner_filter.clone(), + ); + // POST validator/duties/sync/{epoch} let post_validator_duties_sync = post_validator_duties_sync( eth_v1.clone(), @@ -3410,6 +3419,7 @@ pub fn serve( .uor(post_beacon_rewards_attestations) .uor(post_beacon_rewards_sync_committee) .uor(post_validator_duties_attester) + .uor(post_validator_duties_ptc) .uor(post_validator_duties_sync) .uor(post_validator_aggregate_and_proofs) .uor(post_validator_contribution_and_proofs) diff --git a/beacon_node/http_api/src/ptc_duties.rs b/beacon_node/http_api/src/ptc_duties.rs new file mode 100644 index 0000000000..f727b84004 --- /dev/null +++ b/beacon_node/http_api/src/ptc_duties.rs @@ -0,0 +1,182 @@ +//! Contains the handler for the `POST validator/duties/ptc/{epoch}` endpoint. + +use crate::state_id::StateId; +use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; +use eth2::types::{self as api_types, PtcDuty}; +use slot_clock::SlotClock; +use state_processing::state_advance::partial_state_advance; +use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256}; + +type ApiDuties = api_types::DutiesResponse>; + +pub fn ptc_duties( + request_epoch: Epoch, + request_indices: &[u64], + chain: &BeaconChain, +) -> Result { + let current_epoch = chain + .slot_clock + .now_or_genesis() + .map(|slot| slot.epoch(T::EthSpec::slots_per_epoch())) + .ok_or(BeaconChainError::UnableToReadSlot) + .map_err(warp_utils::reject::unhandled_error)?; + + let tolerant_current_epoch = if chain.slot_clock.is_prior_to_genesis().unwrap_or(true) { + current_epoch + } else { + chain + .slot_clock + .now_with_future_tolerance(chain.spec.maximum_gossip_clock_disparity()) + .ok_or_else(|| { + warp_utils::reject::custom_server_error("unable to read slot clock".into()) + })? + .epoch(T::EthSpec::slots_per_epoch()) + }; + + let is_within_clock_tolerance = request_epoch == current_epoch + || request_epoch == current_epoch + 1 + || request_epoch == tolerant_current_epoch + 1; + + if is_within_clock_tolerance { + let head_epoch = chain + .canonical_head + .cached_head() + .snapshot + .beacon_state + .current_epoch(); + + let head_can_serve_request = request_epoch == head_epoch || request_epoch == head_epoch + 1; + + if head_can_serve_request { + compute_ptc_duties_from_cached_head(request_epoch, request_indices, chain) + } else { + compute_ptc_duties_from_state(request_epoch, request_indices, chain) + } + } else if request_epoch > current_epoch + 1 { + Err(warp_utils::reject::custom_bad_request(format!( + "request epoch {} is more than one epoch past the current epoch {}", + request_epoch, current_epoch + ))) + } else { + compute_ptc_duties_from_state(request_epoch, request_indices, chain) + } +} + +fn compute_ptc_duties_from_cached_head( + request_epoch: Epoch, + request_indices: &[u64], + chain: &BeaconChain, +) -> Result { + let (cached_head, execution_status) = chain + .canonical_head + .head_and_execution_status() + .map_err(warp_utils::reject::unhandled_error)?; + let state = &cached_head.snapshot.beacon_state; + let head_block_root = cached_head.head_block_root(); + + let (duties, dependent_root) = chain + .compute_ptc_duties(state, request_epoch, request_indices, head_block_root) + .map_err(warp_utils::reject::unhandled_error)?; + + convert_to_api_response( + duties, + dependent_root, + execution_status.is_optimistic_or_invalid(), + ) +} + +fn compute_ptc_duties_from_state( + request_epoch: Epoch, + request_indices: &[u64], + chain: &BeaconChain, +) -> Result { + let state_opt = { + let (cached_head, execution_status) = chain + .canonical_head + .head_and_execution_status() + .map_err(warp_utils::reject::unhandled_error)?; + let head = &cached_head.snapshot; + + if head.beacon_state.current_epoch() <= request_epoch { + Some(( + head.beacon_state_root(), + head.beacon_state.clone(), + execution_status.is_optimistic_or_invalid(), + )) + } else { + None + } + }; + + let (state, execution_optimistic) = + if let Some((state_root, mut state, execution_optimistic)) = state_opt { + ensure_state_knows_ptc_duties_for_epoch( + &mut state, + state_root, + request_epoch, + &chain.spec, + )?; + (state, execution_optimistic) + } else { + let (state, execution_optimistic, _finalized) = + StateId::from_slot(request_epoch.start_slot(T::EthSpec::slots_per_epoch())) + .state(chain)?; + (state, execution_optimistic) + }; + + if !(state.current_epoch() == request_epoch || state.current_epoch() + 1 == request_epoch) { + return Err(warp_utils::reject::custom_server_error(format!( + "state epoch {} not suitable for request epoch {}", + state.current_epoch(), + request_epoch + ))); + } + + let (duties, dependent_root) = chain + .compute_ptc_duties( + &state, + request_epoch, + request_indices, + chain.genesis_block_root, + ) + .map_err(warp_utils::reject::unhandled_error)?; + + convert_to_api_response(duties, dependent_root, execution_optimistic) +} + +fn ensure_state_knows_ptc_duties_for_epoch( + state: &mut BeaconState, + state_root: Hash256, + target_epoch: Epoch, + spec: &ChainSpec, +) -> Result<(), warp::reject::Rejection> { + if state.current_epoch() > target_epoch { + return Err(warp_utils::reject::custom_server_error(format!( + "state epoch {} is later than target epoch {}", + state.current_epoch(), + target_epoch + ))); + } else if state.current_epoch() + 1 < target_epoch { + let target_slot = target_epoch + .saturating_sub(1_u64) + .start_slot(E::slots_per_epoch()); + + partial_state_advance(state, Some(state_root), target_slot, spec) + .map_err(BeaconChainError::from) + .map_err(warp_utils::reject::unhandled_error)?; + } + + Ok(()) +} + +fn convert_to_api_response( + duties: Vec>, + dependent_root: Hash256, + execution_optimistic: bool, +) -> Result { + Ok(api_types::DutiesResponse { + dependent_root, + execution_optimistic: Some(execution_optimistic), + data: duties.into_iter().flatten().collect(), + }) +} diff --git a/beacon_node/http_api/src/validator/mod.rs b/beacon_node/http_api/src/validator/mod.rs index 7349aa4db0..27fe5de6e7 100644 --- a/beacon_node/http_api/src/validator/mod.rs +++ b/beacon_node/http_api/src/validator/mod.rs @@ -7,7 +7,7 @@ use crate::utils::{ ResponseFilter, TaskSpawnerFilter, ValidatorSubscriptionTxFilter, publish_network_message, }; use crate::version::{V1, V2, V3, unsupported_version_rejection}; -use crate::{StateId, attester_duties, proposer_duties, sync_committees}; +use crate::{StateId, attester_duties, proposer_duties, ptc_duties, sync_committees}; use beacon_chain::attestation_verification::VerifiedAttestation; use beacon_chain::{AttestationError, BeaconChain, BeaconChainError, BeaconChainTypes}; use bls::PublicKeyBytes; @@ -168,6 +168,42 @@ pub fn post_validator_duties_attester( .boxed() } +// POST validator/duties/ptc/{epoch} +pub fn post_validator_duties_ptc( + eth_v1: EthV1Filter, + chain_filter: ChainFilter, + not_while_syncing_filter: NotWhileSyncingFilter, + task_spawner_filter: TaskSpawnerFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("validator")) + .and(warp::path("duties")) + .and(warp::path("ptc")) + .and(warp::path::param::().or_else(|_| async { + Err(warp_utils::reject::custom_bad_request( + "Invalid epoch".to_string(), + )) + })) + .and(warp::path::end()) + .and(not_while_syncing_filter.clone()) + .and(warp_utils::json::json()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |epoch: Epoch, + not_synced_filter: Result<(), Rejection>, + indices: ValidatorIndexData, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P0, move || { + not_synced_filter?; + ptc_duties::ptc_duties(epoch, &indices.0, &chain) + }) + }, + ) + .boxed() +} + // GET validator/aggregate_attestation?attestation_data_root,slot pub fn get_validator_aggregate_attestation( any_version: AnyVersionFilter, diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 2dd4c28040..aac3384fbd 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -3474,7 +3474,6 @@ impl ApiTester { self } - // TODO(EIP-7732): Add test_get_validator_duties_ptc function to test PTC duties endpoint pub async fn test_get_validator_duties_proposer_v2(self) -> Self { let current_epoch = self.chain.epoch().unwrap(); @@ -3598,6 +3597,17 @@ impl ApiTester { "should not get attester duties outside of tolerance" ); + assert_eq!( + self.client + .post_validator_duties_ptc(next_epoch, &[0]) + .await + .unwrap_err() + .status() + .map(Into::into), + Some(400), + "should not get ptc duties outside of tolerance" + ); + self.chain.slot_clock.set_current_time( current_epoch_start - self.chain.spec.maximum_gossip_clock_disparity(), ); @@ -3621,6 +3631,88 @@ impl ApiTester { .await .expect("should get attester duties within tolerance"); + self.client + .post_validator_duties_ptc(next_epoch, &[0]) + .await + .expect("should get ptc duties within tolerance"); + + self + } + + pub async fn test_get_validator_duties_ptc(self) -> Self { + let current_epoch = self.chain.epoch().unwrap().as_u64(); + + let half = current_epoch / 2; + let first = current_epoch - half; + let last = current_epoch + half; + + for epoch in first..=last { + for indices in self.interesting_validator_indices() { + let epoch = Epoch::from(epoch); + + // The endpoint does not allow getting duties past the next epoch. + if epoch > current_epoch + 1 { + assert_eq!( + self.client + .post_validator_duties_ptc(epoch, indices.as_slice()) + .await + .unwrap_err() + .status() + .map(Into::into), + Some(400) + ); + continue; + } + + let results = self + .client + .post_validator_duties_ptc(epoch, indices.as_slice()) + .await + .unwrap(); + + let dependent_root = self + .chain + .block_root_at_slot( + (epoch - 1).start_slot(E::slots_per_epoch()) - 1, + WhenSlotSkipped::Prev, + ) + .unwrap() + .unwrap_or(self.chain.head_beacon_block_root()); + + assert_eq!(results.dependent_root, dependent_root); + + let result_duties = results.data; + + let state = self + .chain + .state_at_slot( + epoch.start_slot(E::slots_per_epoch()), + StateSkipConfig::WithStateRoots, + ) + .unwrap(); + + let expected_duties: Vec = indices + .iter() + .filter_map(|&validator_index| { + let validator = state.validators().get(validator_index as usize)?; + let slot = state + .get_ptc_assignment(validator_index as usize, epoch, &self.chain.spec) + .unwrap()?; + Some(PtcDuty { + pubkey: validator.pubkey, + validator_index, + slot, + }) + }) + .collect(); + + assert_eq!( + result_duties, expected_duties, + "ptc duties should exactly match state assignments" + ); + } + } + self } @@ -7871,6 +7963,9 @@ async fn get_light_client_finality_update() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_duties_early() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } ApiTester::new() .await .test_get_validator_duties_early() @@ -7936,6 +8031,29 @@ async fn get_validator_duties_proposer_v2_with_skip_slots() { .await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_validator_duties_ptc() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + ApiTester::new_with_hard_forks() + .await + .test_get_validator_duties_ptc() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_validator_duties_ptc_with_skip_slots() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + ApiTester::new_with_hard_forks() + .await + .skip_slots(E::slots_per_epoch() * 2) + .test_get_validator_duties_ptc() + .await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn block_production() { ApiTester::new().await.test_block_production().await; diff --git a/consensus/types/src/state/beacon_state.rs b/consensus/types/src/state/beacon_state.rs index 7e2b3096a8..7ed3121d6e 100644 --- a/consensus/types/src/state/beacon_state.rs +++ b/consensus/types/src/state/beacon_state.rs @@ -3198,6 +3198,27 @@ impl BeaconState { Ok(hash(&preimage)) } + /// Find the first slot in the given epoch where the validator is assigned to the PTC. + /// + /// Returns `Ok(Some(slot))` if the validator is in the PTC for any slot in the epoch, + /// `Ok(None)` if the validator is not in the PTC for this epoch. + /// + /// This iterates through all slots in the epoch, so it's O(slots_per_epoch) per validator. + pub fn get_ptc_assignment( + &self, + validator_index: usize, + epoch: Epoch, + spec: &ChainSpec, + ) -> Result, BeaconStateError> { + for slot in epoch.slot_iter(E::slots_per_epoch()) { + let ptc = self.get_ptc(slot, spec)?; + if ptc.0.contains(&validator_index) { + return Ok(Some(slot)); + } + } + Ok(None) + } + /// Return size indices sampled by effective balance, using indices as candidates. /// /// If shuffle_indices is True, candidate indices are themselves sampled from indices From 6ab48a76f0aab997dd7a818d8b02541d197e1746 Mon Sep 17 00:00:00 2001 From: hopinheimer <48147533+hopinheimer@users.noreply.github.com> Date: Mon, 27 Apr 2026 05:51:20 -0400 Subject: [PATCH 4/5] Gloas `PayloadAttestation` gossip verification (#9145) Co-Authored-By: hopinheimer Co-Authored-By: hopinheimer <48147533+hopinheimer@users.noreply.github.com> Co-Authored-By: Eitan Seri-Levi Co-Authored-By: Jimmy Chen --- beacon_node/beacon_chain/src/beacon_chain.rs | 22 +- beacon_node/beacon_chain/src/builder.rs | 1 + beacon_node/beacon_chain/src/lib.rs | 1 + beacon_node/beacon_chain/src/metrics.rs | 21 + .../beacon_chain/src/observed_attesters.rs | 42 ++ .../gossip_verified_payload_attestation.rs | 255 +++++++++++ .../payload_attestation_verification/mod.rs | 110 +++++ .../payload_attestation_verification/tests.rs | 422 ++++++++++++++++++ .../gossip_methods.rs | 152 ++++++- .../src/network_beacon_processor/mod.rs | 2 +- consensus/fork_choice/src/fork_choice.rs | 2 +- 11 files changed, 1014 insertions(+), 16 deletions(-) create mode 100644 beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs create mode 100644 beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs create mode 100644 beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index bfe1b404e0..cf5afb089a 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -53,7 +53,8 @@ use crate::observed_aggregates::{ Error as AttestationObservationError, ObservedAggregateAttestations, ObservedSyncContributions, }; use crate::observed_attesters::{ - ObservedAggregators, ObservedAttesters, ObservedSyncAggregators, ObservedSyncContributors, + ObservedAggregators, ObservedAttesters, ObservedPayloadAttesters, ObservedSyncAggregators, + ObservedSyncContributors, }; use crate::observed_block_producers::ObservedBlockProducers; use crate::observed_data_sidecars::ObservedDataSidecars; @@ -418,6 +419,9 @@ pub struct BeaconChain { /// Maintains a record of which validators have been seen to create `SignedContributionAndProofs` /// in recent epochs. pub(crate) observed_sync_aggregators: RwLock>, + /// Maintains a record of which validators have sent payload attestation messages + /// in recent slots. + pub(crate) observed_payload_attesters: RwLock>, /// Maintains a record of which validators have proposed blocks for each slot. pub observed_block_producers: RwLock>, /// Maintains a record of blob sidecars seen over the gossip network. @@ -2308,6 +2312,22 @@ impl BeaconChain { }) } + pub fn apply_payload_attestation_to_fork_choice( + &self, + indexed_payload_attestation: &IndexedPayloadAttestation, + ptc: &PTC, + ) -> Result<(), Error> { + self.canonical_head + .fork_choice_write_lock() + .on_payload_attestation( + self.slot()?, + indexed_payload_attestation, + AttestationFromBlock::False, + &ptc.0, + ) + .map_err(Into::into) + } + /// Accepts some `SyncCommitteeMessage` from the network and attempts to verify it, returning `Ok(_)` if /// it is valid to be (re)broadcast on the gossip network. pub fn verify_sync_committee_message_for_gossip( diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 19eb1aa877..d70561db9b 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1015,6 +1015,7 @@ where observed_aggregators: <_>::default(), // TODO: allow for persisting and loading the pool from disk. observed_sync_aggregators: <_>::default(), + observed_payload_attesters: <_>::default(), // TODO: allow for persisting and loading the pool from disk. observed_block_producers: <_>::default(), observed_column_sidecars: RwLock::new(ObservedDataSidecars::new(self.spec.clone())), diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 7631e6b904..d70fc1b3ec 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -44,6 +44,7 @@ pub mod observed_data_sidecars; pub mod observed_operations; mod observed_slashable; pub mod partial_data_column_assembler; +pub mod payload_attestation_verification; pub mod payload_bid_verification; pub mod payload_envelope_streamer; pub mod payload_envelope_verification; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index ce136ef3fc..43c3337bc9 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1468,6 +1468,27 @@ pub static SYNC_MESSAGE_GOSSIP_VERIFICATION_TIMES: LazyLock> = "Full runtime of sync contribution gossip verification", ) }); +pub static PAYLOAD_ATTESTATION_PROCESSING_REQUESTS: LazyLock> = + LazyLock::new(|| { + try_create_int_counter( + "beacon_payload_attestation_processing_requests_total", + "Count of all payload attestation messages submitted for processing", + ) + }); +pub static PAYLOAD_ATTESTATION_PROCESSING_SUCCESSES: LazyLock> = + LazyLock::new(|| { + try_create_int_counter( + "beacon_payload_attestation_processing_successes_total", + "Number of payload attestation messages verified for gossip", + ) + }); +pub static PAYLOAD_ATTESTATION_GOSSIP_VERIFICATION_TIMES: LazyLock> = + LazyLock::new(|| { + try_create_histogram( + "beacon_payload_attestation_gossip_verification_seconds", + "Full runtime of payload attestation gossip verification", + ) + }); pub static SYNC_MESSAGE_EQUIVOCATIONS: LazyLock> = LazyLock::new(|| { try_create_int_counter( "sync_message_equivocations_total", diff --git a/beacon_node/beacon_chain/src/observed_attesters.rs b/beacon_node/beacon_chain/src/observed_attesters.rs index 277bf38ffc..4bb536880c 100644 --- a/beacon_node/beacon_chain/src/observed_attesters.rs +++ b/beacon_node/beacon_chain/src/observed_attesters.rs @@ -42,6 +42,8 @@ pub type ObservedSyncContributors = pub type ObservedAggregators = AutoPruningEpochContainer; pub type ObservedSyncAggregators = AutoPruningSlotContainer; +pub type ObservedPayloadAttesters = + AutoPruningSlotContainer, E>; #[derive(Debug, PartialEq)] pub enum Error { @@ -255,6 +257,46 @@ impl Item<()> for SyncAggregatorSlotHashSet { } } +/// Stores a `HashSet` of validator indices that have sent a payload attestation gossip +/// message during a slot. +pub struct PayloadAttesterSlotHashSet { + set: HashSet, + phantom: PhantomData, +} + +impl Item<()> for PayloadAttesterSlotHashSet { + fn with_capacity(capacity: usize) -> Self { + Self { + set: HashSet::with_capacity(capacity), + phantom: PhantomData, + } + } + + /// Defaults to `PTC_SIZE`, the maximum number of payload attesters per slot. + fn default_capacity() -> usize { + E::ptc_size() + } + + fn len(&self) -> usize { + self.set.len() + } + + fn validator_count(&self) -> usize { + self.set.len() + } + + /// Inserts the `validator_index` in the set. Returns `true` if the `validator_index` was + /// already in the set. + fn insert(&mut self, validator_index: usize, _value: ()) -> bool { + !self.set.insert(validator_index) + } + + /// Returns `true` if the `validator_index` is in the set. + fn get(&self, validator_index: usize) -> Option<()> { + self.set.contains(&validator_index).then_some(()) + } +} + /// A container that stores some number of `T` items. /// /// This container is "auto-pruning" since it gets an idea of the current slot by which diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs new file mode 100644 index 0000000000..2d9fce812e --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs @@ -0,0 +1,255 @@ +use super::Error; +use crate::beacon_chain::BeaconStore; +use crate::canonical_head::CanonicalHead; +use crate::observed_attesters::ObservedPayloadAttesters; +use crate::validator_pubkey_cache::ValidatorPubkeyCache; +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, metrics}; +use bls::AggregateSignature; +use educe::Educe; +use parking_lot::RwLock; +use safe_arith::SafeArith; +use slot_clock::SlotClock; +use state_processing::per_block_processing::signature_sets::indexed_payload_attestation_signature_set; +use state_processing::state_advance::partial_state_advance; +use std::borrow::Cow; +use types::{ChainSpec, EthSpec, IndexedPayloadAttestation, PTC, PayloadAttestationMessage, Slot}; + +pub struct GossipVerificationContext<'a, T: BeaconChainTypes> { + pub slot_clock: &'a T::SlotClock, + pub spec: &'a ChainSpec, + pub observed_payload_attesters: &'a RwLock>, + pub canonical_head: &'a CanonicalHead, + pub validator_pubkey_cache: &'a RwLock>, + pub store: &'a BeaconStore, +} + +/// A `PayloadAttestationMessage` that has been verified for propagation on the gossip network. +#[derive(Educe)] +#[educe(Clone, Debug)] +pub struct VerifiedPayloadAttestationMessage { + payload_attestation_message: PayloadAttestationMessage, + indexed_payload_attestation: IndexedPayloadAttestation, + ptc: PTC, +} + +impl VerifiedPayloadAttestationMessage { + pub fn new( + payload_attestation_message: PayloadAttestationMessage, + ctx: &GossipVerificationContext<'_, T>, + ) -> Result { + let slot = payload_attestation_message.data.slot; + let validator_index = payload_attestation_message.validator_index; + + // [IGNORE] `data.slot` is within the `MAXIMUM_GOSSIP_CLOCK_DISPARITY` allowance. + verify_propagation_slot_range(ctx.slot_clock, slot, ctx.spec)?; + + // [IGNORE] There has been no other valid payload attestation message for this + // validator index. + if ctx + .observed_payload_attesters + .read() + .validator_has_been_observed(slot, validator_index as usize) + .map_err(BeaconChainError::from)? + { + return Err(Error::PriorPayloadAttestationMessageKnown { + validator_index, + slot, + }); + } + + // [IGNORE] `data.beacon_block_root` has been seen + // [REJECT] `data.beacon_block_root` passes validation. + // + // TODO(gloas): These two conditions are conflated. We need a status table to + // differentiate between: + // 1. Blocks we haven't seen (IGNORE), and + // 2. Blocks we've seen that are invalid (REJECT). + // Presently both cases return IGNORE. + let beacon_block_root = payload_attestation_message.data.beacon_block_root; + if ctx + .canonical_head + .fork_choice_read_lock() + .get_block(&beacon_block_root) + .is_none() + { + return Err(Error::UnknownHeadBlock { beacon_block_root }); + } + + // Get head state for PTC computation. If the cached head state is too stale + // (e.g. during liveness failures with many skipped slots), fall back to loading + // a more recent state from the store and advancing it if necessary. + let head = ctx.canonical_head.cached_head(); + let head_state = &head.snapshot.beacon_state; + + let message_epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + let state_epoch = head_state.current_epoch(); + + // get_ptc can serve epochs in [state_epoch - 1, state_epoch + min_seed_lookahead]. + // If the message epoch is beyond that range, the head state is stale. + let advanced_state = if message_epoch + > state_epoch + .safe_add(ctx.spec.min_seed_lookahead) + .map_err(BeaconChainError::from)? + { + let head_block_root = head.head_block_root(); + let target_slot = message_epoch.start_slot(T::EthSpec::slots_per_epoch()); + + let (state_root, mut state) = ctx + .store + .get_advanced_hot_state( + head_block_root, + target_slot, + head.snapshot.beacon_state_root(), + ) + .map_err(BeaconChainError::from)? + .ok_or(BeaconChainError::MissingBeaconState( + head.snapshot.beacon_state_root(), + ))?; + + if state + .current_epoch() + .safe_add(ctx.spec.min_seed_lookahead) + .map_err(BeaconChainError::from)? + < message_epoch + { + partial_state_advance(&mut state, Some(state_root), target_slot, ctx.spec) + .map_err(BeaconChainError::from)?; + } + + Some(state) + } else { + None + }; + + let state = advanced_state.as_ref().unwrap_or(head_state); + + // [REJECT] `validator_index` is within `get_ptc(state, data.slot)`. + let ptc = state.get_ptc(slot, ctx.spec)?; + if !ptc.0.contains(&(validator_index as usize)) { + return Err(Error::NotInPTC { + validator_index, + slot, + }); + } + + // Build the indexed form for signature verification and downstream fork choice. + let indexed_payload_attestation = IndexedPayloadAttestation { + attesting_indices: vec![validator_index] + .try_into() + .map_err(|_| Error::UnknownValidatorIndex(validator_index))?, + data: payload_attestation_message.data.clone(), + signature: AggregateSignature::from(&payload_attestation_message.signature), + }; + + { + // [REJECT] The signature is valid with respect to the `validator_index`. + let pubkey_cache = ctx.validator_pubkey_cache.read(); + let signature_set = indexed_payload_attestation_signature_set( + state, + |validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed), + &indexed_payload_attestation.signature, + &indexed_payload_attestation, + ctx.spec, + ) + .map_err(|_| Error::UnknownValidatorIndex(validator_index))?; + + if !signature_set.verify() { + return Err(Error::InvalidSignature); + } + } + + // Record that we have received a valid payload attestation message from this + // validator. Double check with the write lock to handle race conditions. + if ctx + .observed_payload_attesters + .write() + .observe_validator(slot, validator_index as usize, ()) + .map_err(BeaconChainError::from)? + { + return Err(Error::PriorPayloadAttestationMessageKnown { + validator_index, + slot, + }); + } + + Ok(Self { + payload_attestation_message, + indexed_payload_attestation, + ptc, + }) + } + + pub fn payload_attestation_message(&self) -> &PayloadAttestationMessage { + &self.payload_attestation_message + } + + pub fn indexed_payload_attestation(&self) -> &IndexedPayloadAttestation { + &self.indexed_payload_attestation + } + + pub fn ptc(&self) -> &PTC { + &self.ptc + } + + pub fn into_payload_attestation_message(self) -> PayloadAttestationMessage { + self.payload_attestation_message + } +} + +impl BeaconChain { + pub fn payload_attestation_gossip_context(&self) -> GossipVerificationContext<'_, T> { + GossipVerificationContext { + slot_clock: &self.slot_clock, + spec: &self.spec, + observed_payload_attesters: &self.observed_payload_attesters, + canonical_head: &self.canonical_head, + validator_pubkey_cache: &self.validator_pubkey_cache, + store: &self.store, + } + } + + pub fn verify_payload_attestation_message_for_gossip( + &self, + payload_attestation_message: PayloadAttestationMessage, + ) -> Result, Error> { + metrics::inc_counter(&metrics::PAYLOAD_ATTESTATION_PROCESSING_REQUESTS); + let _timer = metrics::start_timer(&metrics::PAYLOAD_ATTESTATION_GOSSIP_VERIFICATION_TIMES); + + let ctx = self.payload_attestation_gossip_context(); + VerifiedPayloadAttestationMessage::new(payload_attestation_message, &ctx).inspect(|_| { + metrics::inc_counter(&metrics::PAYLOAD_ATTESTATION_PROCESSING_SUCCESSES); + }) + } +} + +/// Verify that the `slot` is within the acceptable gossip propagation range, with reference +/// to the current slot of the clock. +/// +/// Accounts for `MAXIMUM_GOSSIP_CLOCK_DISPARITY`. +fn verify_propagation_slot_range( + slot_clock: &S, + message_slot: Slot, + spec: &ChainSpec, +) -> Result<(), Error> { + let latest_permissible_slot = slot_clock + .now_with_future_tolerance(spec.maximum_gossip_clock_disparity()) + .ok_or(BeaconChainError::UnableToReadSlot)?; + if message_slot > latest_permissible_slot { + return Err(Error::FutureSlot { + message_slot, + latest_permissible_slot, + }); + } + + let earliest_permissible_slot = slot_clock + .now_with_past_tolerance(spec.maximum_gossip_clock_disparity()) + .ok_or(BeaconChainError::UnableToReadSlot)?; + if message_slot < earliest_permissible_slot { + return Err(Error::PastSlot { + message_slot, + earliest_permissible_slot, + }); + } + + Ok(()) +} diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs new file mode 100644 index 0000000000..477527c0aa --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs @@ -0,0 +1,110 @@ +//! Provides verification for `PayloadAttestationMessage` received from the gossip network. +//! +//! ```ignore +//! types::PayloadAttestationMessage +//! | +//! ▼ +//! VerifiedPayloadAttestationMessage +//! ``` + +use crate::BeaconChainError; +use strum::AsRefStr; +use types::{BeaconStateError, Hash256, Slot}; + +pub mod gossip_verified_payload_attestation; + +pub use gossip_verified_payload_attestation::{ + GossipVerificationContext, VerifiedPayloadAttestationMessage, +}; + +/// Returned when a payload attestation message was not successfully verified. It might not have +/// been verified for two reasons: +/// +/// - The message is malformed or inappropriate for the context (indicated by all variants +/// other than `BeaconChainError`). +/// - The application encountered an internal error whilst attempting to determine validity +/// (the `BeaconChainError` variant) +#[derive(Debug, AsRefStr)] +pub enum Error { + /// The payload attestation message is from a slot that is later than the current slot + /// (with respect to the gossip clock disparity). + /// + /// ## Peer scoring + /// + /// Assuming the local clock is correct, the peer has sent an invalid message. + FutureSlot { + message_slot: Slot, + latest_permissible_slot: Slot, + }, + /// The payload attestation message is from a slot that is prior to the earliest + /// permissible slot (with respect to the gossip clock disparity). + /// + /// ## Peer scoring + /// + /// Assuming the local clock is correct, the peer has sent an invalid message. + PastSlot { + message_slot: Slot, + earliest_permissible_slot: Slot, + }, + /// We have already observed a valid payload attestation message from this validator + /// for this slot. + /// + /// ## Peer scoring + /// + /// The peer is not necessarily faulty. + PriorPayloadAttestationMessageKnown { validator_index: u64, slot: Slot }, + /// The beacon block referenced by the payload attestation message is not known. + /// + /// ## Peer scoring + /// + /// The attestation points to a block we have not yet imported. It's unclear if the + /// attestation is valid or not. + UnknownHeadBlock { beacon_block_root: Hash256 }, + /// The validator index is not a member of the PTC for this slot. + /// + /// ## Peer scoring + /// + /// The peer has sent an invalid message. + NotInPTC { validator_index: u64, slot: Slot }, + /// The validator index is unknown. + /// + /// ## Peer scoring + /// + /// The peer has sent an invalid message. + UnknownValidatorIndex(u64), + /// The signature on the payload attestation message is invalid. + /// + /// ## Peer scoring + /// + /// The peer has sent an invalid message. + InvalidSignature, + /// There was an error whilst processing the payload attestation message. It is not known + /// if it is valid or invalid. + /// + /// ## Peer scoring + /// + /// We were unable to process this message due to an internal error. It's unclear if the + /// message is valid. + BeaconChainError(Box), + /// An error reading beacon state. + /// + /// ## Peer scoring + /// + /// We were unable to process this message due to an internal error. + BeaconStateError(BeaconStateError), +} + +impl From for Error { + fn from(e: BeaconChainError) -> Self { + Error::BeaconChainError(Box::new(e)) + } +} + +impl From for Error { + fn from(e: BeaconStateError) -> Self { + Error::BeaconStateError(e) + } +} + +#[cfg(test)] +mod tests; diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs new file mode 100644 index 0000000000..7faad98e55 --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs @@ -0,0 +1,422 @@ +use std::sync::Arc; +use std::time::Duration; + +use bls::{Keypair, Signature}; +use fork_choice::ForkChoice; +use genesis::{generate_deterministic_keypairs, interop_genesis_state}; +use parking_lot::RwLock; +use proto_array::PayloadStatus; +use slot_clock::{SlotClock, TestingSlotClock}; +use state_processing::AllCaches; +use state_processing::genesis::genesis_block; +use store::{HotColdDB, StoreConfig}; +use types::{ + ChainSpec, Checkpoint, Domain, Epoch, EthSpec, Hash256, MinimalEthSpec, PayloadAttestationData, + PayloadAttestationMessage, SignedBeaconBlock, SignedRoot, Slot, +}; + +use crate::{ + beacon_fork_choice_store::BeaconForkChoiceStore, + beacon_snapshot::BeaconSnapshot, + canonical_head::CanonicalHead, + observed_attesters::ObservedPayloadAttesters, + payload_attestation_verification::{ + Error as PayloadAttestationError, + gossip_verified_payload_attestation::{ + GossipVerificationContext, VerifiedPayloadAttestationMessage, + }, + }, + test_utils::{BeaconChainHarness, EphemeralHarnessType, fork_name_from_env, test_spec}, + validator_pubkey_cache::ValidatorPubkeyCache, +}; + +type E = MinimalEthSpec; +type T = EphemeralHarnessType; + +const NUM_VALIDATORS: usize = 64; + +struct TestContext { + canonical_head: CanonicalHead, + observed_payload_attesters: RwLock>, + validator_pubkey_cache: RwLock>, + slot_clock: TestingSlotClock, + keypairs: Vec, + spec: ChainSpec, + genesis_block_root: Hash256, + store: Arc, store::MemoryStore>>, +} + +impl TestContext { + fn new() -> Self { + let spec = test_spec::(); + let store = Arc::new( + HotColdDB::open_ephemeral(StoreConfig::default(), Arc::new(spec.clone())) + .expect("should open ephemeral store"), + ); + + let keypairs = generate_deterministic_keypairs(NUM_VALIDATORS); + + let mut state = + interop_genesis_state::(&keypairs, 0, Hash256::repeat_byte(0x42), None, &spec) + .expect("should build genesis state"); + + *state.finalized_checkpoint_mut() = Checkpoint { + epoch: Epoch::new(1), + root: Hash256::ZERO, + }; + + let mut block = genesis_block(&state, &spec).expect("should build genesis block"); + *block.state_root_mut() = state + .update_tree_hash_cache() + .expect("should hash genesis state"); + let signed_block = SignedBeaconBlock::from_block(block, Signature::empty()); + let block_root = signed_block.canonical_root(); + + let snapshot = BeaconSnapshot::new( + Arc::new(signed_block.clone()), + None, + block_root, + state.clone(), + ); + + let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store.clone(), snapshot.clone()) + .expect("should create fork choice store"); + let fork_choice = + ForkChoice::from_anchor(fc_store, block_root, &signed_block, &state, None, &spec) + .expect("should create fork choice"); + + let canonical_head = + CanonicalHead::new(fork_choice, Arc::new(snapshot), PayloadStatus::Pending); + + let slot_clock = TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(0), + spec.get_slot_duration(), + ); + // Advance past genesis so `now_with_past_tolerance` doesn't underflow. + slot_clock.set_current_time(spec.get_slot_duration()); + + let validator_pubkey_cache = + ValidatorPubkeyCache::new(&state, store.clone()).expect("should create pubkey cache"); + + Self { + canonical_head, + observed_payload_attesters: RwLock::new(ObservedPayloadAttesters::default()), + validator_pubkey_cache: RwLock::new(validator_pubkey_cache), + slot_clock, + keypairs, + spec, + genesis_block_root: block_root, + store, + } + } + + fn gossip_ctx(&self) -> GossipVerificationContext<'_, T> { + GossipVerificationContext { + slot_clock: &self.slot_clock, + spec: &self.spec, + observed_payload_attesters: &self.observed_payload_attesters, + canonical_head: &self.canonical_head, + validator_pubkey_cache: &self.validator_pubkey_cache, + store: &self.store, + } + } + + fn ptc_members(&self, slot: Slot) -> Vec { + let head = self.canonical_head.cached_head(); + let state = &head.snapshot.beacon_state; + let ptc = state.get_ptc(slot, &self.spec).expect("should get PTC"); + ptc.0.to_vec() + } + + fn sign_payload_attestation( + &self, + data: PayloadAttestationData, + validator_index: u64, + ) -> PayloadAttestationMessage { + let head = self.canonical_head.cached_head(); + let state = &head.snapshot.beacon_state; + let domain = self.spec.get_domain( + data.slot.epoch(E::slots_per_epoch()), + Domain::PTCAttester, + &state.fork(), + state.genesis_validators_root(), + ); + let message = data.signing_root(domain); + let signature = self.keypairs[validator_index as usize].sk.sign(message); + PayloadAttestationMessage { + validator_index, + data, + signature, + } + } +} + +fn make_payload_attestation( + slot: Slot, + validator_index: u64, + beacon_block_root: Hash256, +) -> PayloadAttestationMessage { + PayloadAttestationMessage { + validator_index, + data: PayloadAttestationData { + beacon_block_root, + slot, + payload_present: true, + blob_data_available: true, + }, + signature: Signature::empty(), + } +} + +#[test] +fn future_slot() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + let gossip = ctx.gossip_ctx(); + + let future_slot = Slot::new(5); + let msg = make_payload_attestation(future_slot, 0, ctx.genesis_block_root); + let result = VerifiedPayloadAttestationMessage::new(msg, &gossip); + assert!(matches!( + result, + Err(PayloadAttestationError::FutureSlot { .. }) + )); +} + +#[test] +fn past_slot() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + ctx.slot_clock.set_slot(5); + let gossip = ctx.gossip_ctx(); + + let msg = make_payload_attestation(Slot::new(0), 0, ctx.genesis_block_root); + let result = VerifiedPayloadAttestationMessage::new(msg, &gossip); + assert!(matches!( + result, + Err(PayloadAttestationError::PastSlot { .. }) + )); +} + +#[test] +fn unknown_head_block() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + let gossip = ctx.gossip_ctx(); + + let unknown_root = Hash256::repeat_byte(0xff); + let msg = make_payload_attestation(Slot::new(1), 0, unknown_root); + let result = VerifiedPayloadAttestationMessage::new(msg, &gossip); + assert!( + matches!( + result, + Err(PayloadAttestationError::UnknownHeadBlock { .. }) + ), + "expected UnknownHeadBlock, got: {:?}", + result + ); +} + +#[test] +fn not_in_ptc() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + let gossip = ctx.gossip_ctx(); + let slot = Slot::new(1); + + let ptc_members = ctx.ptc_members(slot); + let non_ptc_validator = (0..NUM_VALIDATORS as u64) + .find(|&i| !ptc_members.contains(&(i as usize))) + .expect("should find non-PTC validator"); + + let msg = make_payload_attestation(slot, non_ptc_validator, ctx.genesis_block_root); + let result = VerifiedPayloadAttestationMessage::new(msg, &gossip); + assert!(matches!( + result, + Err(PayloadAttestationError::NotInPTC { .. }) + )); +} + +#[test] +fn invalid_signature() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + let gossip = ctx.gossip_ctx(); + let slot = Slot::new(1); + + let ptc_members = ctx.ptc_members(slot); + let validator_index = ptc_members[0] as u64; + + let msg = make_payload_attestation(slot, validator_index, ctx.genesis_block_root); + let result = VerifiedPayloadAttestationMessage::new(msg, &gossip); + assert!(matches!( + result, + Err(PayloadAttestationError::InvalidSignature) + )); +} + +#[test] +fn valid_payload_attestation() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + let gossip = ctx.gossip_ctx(); + let slot = Slot::new(1); + + let ptc_members = ctx.ptc_members(slot); + let validator_index = ptc_members[0] as u64; + + let data = PayloadAttestationData { + beacon_block_root: ctx.genesis_block_root, + slot, + payload_present: true, + blob_data_available: true, + }; + let msg = ctx.sign_payload_attestation(data, validator_index); + let result = VerifiedPayloadAttestationMessage::new(msg, &gossip); + assert!( + result.is_ok(), + "expected Ok, got: {:?}", + result.unwrap_err() + ); +} + +#[test] +fn duplicate_after_valid() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + let gossip = ctx.gossip_ctx(); + let slot = Slot::new(1); + + let ptc_members = ctx.ptc_members(slot); + let validator_index = ptc_members[0] as u64; + + let data = PayloadAttestationData { + beacon_block_root: ctx.genesis_block_root, + slot, + payload_present: true, + blob_data_available: true, + }; + + let msg1 = ctx.sign_payload_attestation(data.clone(), validator_index); + let result1 = VerifiedPayloadAttestationMessage::new(msg1, &gossip); + assert!( + result1.is_ok(), + "first message should pass: {:?}", + result1.unwrap_err() + ); + + let msg2 = ctx.sign_payload_attestation(data, validator_index); + let result2 = VerifiedPayloadAttestationMessage::new(msg2, &gossip); + assert!(matches!( + result2, + Err(PayloadAttestationError::PriorPayloadAttestationMessageKnown { .. }) + )); +} + +/// Exercises the `partial_state_advance` fallback in gossip verification when +/// the head state is too stale to compute PTC membership (e.g., during a +/// network liveness failure with many missed slots). +#[tokio::test] +async fn stale_head_with_partial_advance() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + + let slots_per_epoch = E::slots_per_epoch(); + // Head at epoch 1, message at epoch 5 — 4 epochs of missed slots. + // This exceeds min_seed_lookahead (1), triggering the fallback path: + // get_advanced_hot_state loads the stored state, then partial_state_advance + // advances it through epoch boundaries to populate ptc_window. + let head_slot = Slot::new(slots_per_epoch); + let missed_epochs = 4; + let target_slot = Slot::new(slots_per_epoch * (1 + missed_epochs)); + let target_epoch = target_slot.epoch(slots_per_epoch); + + // GIVEN a chain with blocks through epoch 1 (so the store has states). + let harness = BeaconChainHarness::builder(E::default()) + .default_spec() + .deterministic_keypairs(64) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + harness.extend_to_slot(head_slot).await; + + let head = harness.chain.canonical_head.cached_head(); + let head_epoch = head.snapshot.beacon_state.current_epoch(); + assert!( + target_epoch > head_epoch + harness.spec.min_seed_lookahead, + "precondition: message epoch must exceed head + min_seed_lookahead to trigger fallback" + ); + + // GIVEN a slot clock advanced to epoch 5 without producing blocks + // (simulating missed slots during a liveness failure). + harness.chain.slot_clock.set_slot(target_slot.as_u64()); + + // Advance a reference state to compute the PTC at the target slot. + let mut reference_state = head.snapshot.beacon_state.clone(); + state_processing::state_advance::partial_state_advance( + &mut reference_state, + Some(head.snapshot.beacon_state_root()), + target_slot, + &harness.spec, + ) + .expect("should advance reference state"); + reference_state + .build_all_caches(&harness.spec) + .expect("should build caches"); + + let ptc = reference_state + .get_ptc(target_slot, &harness.spec) + .expect("should get PTC from reference state"); + let validator_index = *ptc.0.first().expect("PTC should have at least one member") as u64; + + // WHEN a properly-signed payload attestation from a PTC member is verified. + let domain = harness.spec.get_domain( + target_epoch, + Domain::PTCAttester, + &reference_state.fork(), + reference_state.genesis_validators_root(), + ); + let data = PayloadAttestationData { + beacon_block_root: head.head_block_root(), + slot: target_slot, + payload_present: true, + blob_data_available: true, + }; + let message = data.signing_root(domain); + let signature = harness.validator_keypairs[validator_index as usize] + .sk + .sign(message); + let msg = PayloadAttestationMessage { + validator_index, + data, + signature, + }; + + // THEN verification succeeds despite the head being 4 epochs stale. + let result = harness + .chain + .verify_payload_attestation_message_for_gossip(msg); + assert!( + result.is_ok(), + "expected Ok (head epoch {}, message epoch {}), got: {:?}", + head_epoch, + target_epoch, + result.unwrap_err() + ); +} diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index ea1a2286a0..4083b1a3af 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -21,6 +21,9 @@ use beacon_chain::{ light_client_finality_update_verification::Error as LightClientFinalityUpdateError, light_client_optimistic_update_verification::Error as LightClientOptimisticUpdateError, observed_operations::ObservationOutcome, + payload_attestation_verification::{ + Error as PayloadAttestationError, VerifiedPayloadAttestationMessage, + }, sync_committee_verification::{self, Error as SyncCommitteeError}, validator_monitor::{get_block_delay_ms, get_slot_delay_ms}, }; @@ -137,6 +140,11 @@ struct RejectedAggregate { error: AttnError, } +struct RejectedPayloadAttestation { + payload_attestation_message: Box, + error: PayloadAttestationError, +} + /// Data for an aggregated or unaggregated attestation that failed verification. enum FailedAtt { Unaggregate { @@ -4088,25 +4096,143 @@ impl NetworkBeaconProcessor { } } - // TODO(gloas) dont forget to add tracing instrumentation + #[instrument( + level = "trace", + skip_all, + fields( + peer_id = %peer_id, + slot = %payload_attestation_message.data.slot, + validator_index = payload_attestation_message.validator_index, + ) + )] pub fn process_gossip_payload_attestation( self: &Arc, message_id: MessageId, peer_id: PeerId, - payload_attestation_message: PayloadAttestationMessage, + payload_attestation_message: Box, ) { - // TODO(EIP-7732): Implement proper payload attestation message gossip processing. - // This should integrate with a payload_attestation_verification.rs module once it's implemented. + let result = match self + .chain + .verify_payload_attestation_message_for_gossip(*payload_attestation_message.clone()) + { + Ok(verified) => Ok(verified), + Err(error) => Err(RejectedPayloadAttestation { + payload_attestation_message: payload_attestation_message.clone(), + error, + }), + }; - trace!( - %peer_id, - validator_index = payload_attestation_message.validator_index, - slot = %payload_attestation_message.data.slot, - beacon_block_root = %payload_attestation_message.data.beacon_block_root, - "Processing payload attestation message" - ); + self.process_gossip_payload_attestation_result(result, message_id, peer_id); + } - // For now, ignore all payload attestation messages since verification is not implemented - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + fn process_gossip_payload_attestation_result( + self: &Arc, + result: Result, RejectedPayloadAttestation>, + message_id: MessageId, + peer_id: PeerId, + ) { + match result { + Ok(verified) => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + + if let Err(e) = self.chain.apply_payload_attestation_to_fork_choice( + verified.indexed_payload_attestation(), + verified.ptc(), + ) { + match e { + BeaconChainError::ForkChoiceError( + ForkChoiceError::InvalidPayloadAttestation(e), + ) => { + debug!( + reason = ?e, + %peer_id, + "Payload attestation invalid for fork choice" + ) + } + e => error!( + reason = ?e, + %peer_id, + "Error applying payload attestation to fork choice" + ), + } + } + } + Err(RejectedPayloadAttestation { + payload_attestation_message, + error, + }) => { + self.handle_payload_attestation_verification_failure( + peer_id, + message_id, + error, + payload_attestation_message.data.slot, + ); + } + } + } + + fn handle_payload_attestation_verification_failure( + &self, + peer_id: PeerId, + message_id: MessageId, + error: PayloadAttestationError, + message_slot: Slot, + ) { + match &error { + PayloadAttestationError::FutureSlot { .. } => { + self.gossip_penalize_peer( + peer_id, + PeerAction::HighToleranceError, + "payload_attn_future_slot", + ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + PayloadAttestationError::PastSlot { .. } + | PayloadAttestationError::PriorPayloadAttestationMessageKnown { .. } => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + PayloadAttestationError::UnknownHeadBlock { .. } => { + debug!( + %peer_id, + %message_slot, + "Payload attestation references unknown block" + ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + PayloadAttestationError::NotInPTC { .. } => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer( + peer_id, + PeerAction::LowToleranceError, + "payload_attn_not_in_ptc", + ); + } + PayloadAttestationError::UnknownValidatorIndex(_) => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer( + peer_id, + PeerAction::LowToleranceError, + "payload_attn_unknown_validator", + ); + } + PayloadAttestationError::InvalidSignature => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer( + peer_id, + PeerAction::LowToleranceError, + "payload_attn_invalid_sig", + ); + } + PayloadAttestationError::BeaconChainError(_) + | PayloadAttestationError::BeaconStateError(_) => { + debug!( + %peer_id, + %message_slot, + ?error, + "Internal error verifying payload attestation" + ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + } } } diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 015b6a616e..bfcff2088b 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -511,7 +511,7 @@ impl NetworkBeaconProcessor { processor.process_gossip_payload_attestation( message_id, peer_id, - *payload_attestation_message, + payload_attestation_message, ) }; diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index f9d779fd24..a9e62dbe94 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -1351,7 +1351,7 @@ where let ptc_indices: Vec = attestation .attesting_indices .iter() - .filter_map(|vi| ptc.iter().position(|&p| p == *vi as usize)) + .filter_map(|validator_index| ptc.iter().position(|&p| p == *validator_index as usize)) .collect(); // Check that all the attesters are in the PTC From 028b5a42a9715c31f416d45db70add39d9934b12 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Mon, 27 Apr 2026 17:13:35 +0200 Subject: [PATCH 5/5] Add payload attestation validator duty (#9178) Co-Authored-By: Eitan Seri-Levi Co-Authored-By: Jimmy Chen --- beacon_node/http_api/src/beacon/pool.rs | 149 ++++++++++- beacon_node/http_api/src/lib.rs | 21 +- beacon_node/http_api/tests/tests.rs | 96 +++++++ common/eth2/src/lib.rs | 44 +++- .../lighthouse_validator_store/src/lib.rs | 42 +++- validator_client/signing_method/src/lib.rs | 5 + .../signing_method/src/web3signer.rs | 3 + validator_client/src/lib.rs | 19 ++ .../validator_services/src/lib.rs | 1 + .../src/payload_attestation_service.rs | 238 ++++++++++++++++++ validator_client/validator_store/src/lib.rs | 16 +- 11 files changed, 618 insertions(+), 16 deletions(-) create mode 100644 validator_client/validator_services/src/payload_attestation_service.rs diff --git a/beacon_node/http_api/src/beacon/pool.rs b/beacon_node/http_api/src/beacon/pool.rs index 059573c317..c6b8a69643 100644 --- a/beacon_node/http_api/src/beacon/pool.rs +++ b/beacon_node/http_api/src/beacon/pool.rs @@ -1,24 +1,31 @@ use crate::task_spawner::{Priority, TaskSpawner}; -use crate::utils::{NetworkTxFilter, OptionalConsensusVersionHeaderFilter, ResponseFilter}; +use crate::utils::{ + ChainFilter, EthV1Filter, NetworkTxFilter, OptionalConsensusVersionHeaderFilter, + ResponseFilter, TaskSpawnerFilter, +}; use crate::version::{ ResponseIncludesVersion, V1, V2, add_consensus_version_header, beacon_response, unsupported_version_rejection, }; use crate::{sync_committees, utils}; use beacon_chain::observed_operations::ObservationOutcome; +use beacon_chain::payload_attestation_verification::Error as PayloadAttestationError; use beacon_chain::{BeaconChain, BeaconChainTypes}; +use bytes::Bytes; use eth2::types::{AttestationPoolQuery, EndpointVersion, Failure, GenericResponse}; use lighthouse_network::PubsubMessage; use network::NetworkMessage; use operation_pool::ReceivedPreCapella; use slot_clock::SlotClock; +use ssz::{Decode, Encode}; use std::collections::HashSet; use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, warn}; use types::{ - Attestation, AttestationData, AttesterSlashing, ForkName, ProposerSlashing, - SignedBlsToExecutionChange, SignedVoluntaryExit, SingleAttestation, SyncCommitteeMessage, + Attestation, AttestationData, AttesterSlashing, ForkName, PayloadAttestationMessage, + ProposerSlashing, SignedBlsToExecutionChange, SignedVoluntaryExit, SingleAttestation, + SyncCommitteeMessage, }; use warp::filters::BoxedFilter; use warp::{Filter, Reply}; @@ -520,3 +527,137 @@ pub fn post_beacon_pool_attestations_v2( ) .boxed() } + +/// POST beacon/pool/payload_attestations (JSON) +pub fn post_beacon_pool_payload_attestations( + network_tx_filter: &NetworkTxFilter, + optional_consensus_version_header_filter: OptionalConsensusVersionHeaderFilter, + beacon_pool_path: &BeaconPoolPathFilter, +) -> ResponseFilter { + beacon_pool_path + .clone() + .and(warp::path("payload_attestations")) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(optional_consensus_version_header_filter) + .and(network_tx_filter.clone()) + .then( + |task_spawner: TaskSpawner, + chain: Arc>, + messages: Vec, + _fork_name: Option, + network_tx: UnboundedSender>| { + task_spawner.blocking_json_task(Priority::P0, move || { + publish_payload_attestation_messages(&chain, &network_tx, messages) + }) + }, + ) + .boxed() +} + +/// POST beacon/pool/payload_attestations (SSZ) +pub fn post_beacon_pool_payload_attestations_ssz( + eth_v1: EthV1Filter, + task_spawner_filter: TaskSpawnerFilter, + chain_filter: ChainFilter, + network_tx_filter: NetworkTxFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("beacon")) + .and(warp::path("pool")) + .and(warp::path("payload_attestations")) + .and(warp::path::end()) + .and(warp::body::bytes()) + .and(task_spawner_filter) + .and(chain_filter) + .and(network_tx_filter) + .then( + |body_bytes: Bytes, + task_spawner: TaskSpawner, + chain: Arc>, + network_tx: UnboundedSender>| { + task_spawner.blocking_json_task(Priority::P0, move || { + let item_len = ::ssz_fixed_len(); + if !body_bytes.len().is_multiple_of(item_len) { + return Err(warp_utils::reject::custom_bad_request(format!( + "SSZ body length {} is not a multiple of PayloadAttestationMessage size {}", + body_bytes.len(), + item_len, + ))); + } + let messages: Vec = body_bytes + .chunks(item_len) + .map(|chunk| { + PayloadAttestationMessage::from_ssz_bytes(chunk).map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "invalid SSZ: {e:?}" + )) + }) + }) + .collect::>()?; + + publish_payload_attestation_messages(&chain, &network_tx, messages) + }) + }, + ) + .boxed() +} + +fn publish_payload_attestation_messages( + chain: &BeaconChain, + network_tx: &UnboundedSender>, + messages: Vec, +) -> Result<(), warp::Rejection> { + let mut failures = vec![]; + let mut num_already_known = 0; + + for (index, message) in messages.into_iter().enumerate() { + match chain.verify_payload_attestation_message_for_gossip(message.clone()) { + Ok(verified) => { + utils::publish_pubsub_message( + network_tx, + PubsubMessage::PayloadAttestation(Box::new(message)), + )?; + + if let Err(e) = chain.apply_payload_attestation_to_fork_choice( + verified.indexed_payload_attestation(), + verified.ptc(), + ) { + warn!( + error = ?e, + request_index = index, + "Payload attestation invalid for fork choice" + ); + } + } + Err(PayloadAttestationError::PriorPayloadAttestationMessageKnown { .. }) => { + num_already_known += 1; + } + // TODO(gloas): requeue for reprocessing like attestations do. + Err(e) => { + error!( + error = ?e, + request_index = index, + "Failure verifying payload attestation for gossip" + ); + failures.push(Failure::new(index, format!("{e:?}"))); + } + } + } + + if num_already_known > 0 { + debug!( + count = num_already_known, + "Some payload attestations already known" + ); + } + + if failures.is_empty() { + Ok(()) + } else { + Err(warp_utils::reject::indexed_bad_request( + "error processing payload attestations".to_string(), + failures, + )) + } +} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index bd80dd1e82..b2d069f384 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1454,7 +1454,7 @@ pub fn serve( let post_beacon_pool_attestations_v2 = post_beacon_pool_attestations_v2( &network_tx_filter, - optional_consensus_version_header_filter, + optional_consensus_version_header_filter.clone(), &beacon_pool_path_v2, ); @@ -1487,6 +1487,21 @@ pub fn serve( let post_beacon_pool_sync_committees = post_beacon_pool_sync_committees(&network_tx_filter, &beacon_pool_path); + // POST beacon/pool/payload_attestations + let post_beacon_pool_payload_attestations = post_beacon_pool_payload_attestations( + &network_tx_filter, + optional_consensus_version_header_filter, + &beacon_pool_path, + ); + + // POST beacon/pool/payload_attestations (SSZ) + let post_beacon_pool_payload_attestations_ssz = post_beacon_pool_payload_attestations_ssz( + eth_v1.clone(), + task_spawner_filter.clone(), + chain_filter.clone(), + network_tx_filter.clone(), + ); + // GET beacon/pool/bls_to_execution_changes let get_beacon_pool_bls_to_execution_changes = get_beacon_pool_bls_to_execution_changes(&beacon_pool_path); @@ -3400,7 +3415,8 @@ pub fn serve( .uor(post_beacon_blocks_v2_ssz) .uor(post_beacon_blinded_blocks_ssz) .uor(post_beacon_blinded_blocks_v2_ssz) - .uor(post_beacon_execution_payload_envelope_ssz), + .uor(post_beacon_execution_payload_envelope_ssz) + .uor(post_beacon_pool_payload_attestations_ssz), ) .uor(post_beacon_blocks) .uor(post_beacon_blinded_blocks) @@ -3411,6 +3427,7 @@ pub fn serve( .uor(post_beacon_pool_proposer_slashings) .uor(post_beacon_pool_voluntary_exits) .uor(post_beacon_pool_sync_committees) + .uor(post_beacon_pool_payload_attestations) .uor(post_beacon_pool_bls_to_execution_changes) .uor(post_beacon_execution_payload_envelope) .uor(post_beacon_state_validators) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index aac3384fbd..b8326f4495 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -2793,6 +2793,89 @@ impl ApiTester { self } + fn make_valid_payload_attestation_message( + &self, + ptc_offset: usize, + ) -> PayloadAttestationMessage { + let head = self.chain.head_snapshot(); + let head_slot = head.beacon_block.slot(); + let head_root = head.beacon_block_root; + let fork = head.beacon_state.fork(); + let genesis_validators_root = self.chain.genesis_validators_root; + + let ptc = head + .beacon_state + .get_ptc(head_slot, &self.chain.spec) + .expect("should get PTC"); + + // Find distinct validator indices in the PTC (may contain duplicates due to + // weighted sampling with a small validator set). + let mut seen = std::collections::HashSet::new(); + let distinct_indices: Vec = ptc + .0 + .iter() + .copied() + .filter(|idx| seen.insert(*idx)) + .collect(); + let validator_index = distinct_indices[ptc_offset % distinct_indices.len()]; + + let data = PayloadAttestationData { + beacon_block_root: head_root, + slot: head_slot, + payload_present: true, + blob_data_available: true, + }; + + let epoch = head_slot.epoch(E::slots_per_epoch()); + let domain = + self.chain + .spec + .get_domain(epoch, Domain::PTCAttester, &fork, genesis_validators_root); + let signing_root = data.signing_root(domain); + let sk = &self.validator_keypairs()[validator_index].sk; + let signature = sk.sign(signing_root); + + PayloadAttestationMessage { + validator_index: validator_index as u64, + data, + signature, + } + } + + pub async fn test_post_beacon_pool_payload_attestations_valid(mut self) -> Self { + let message = self.make_valid_payload_attestation_message(0); + let fork_name = self.chain.spec.fork_name_at_slot::(message.data.slot); + + self.client + .post_beacon_pool_payload_attestations(&[message], fork_name) + .await + .unwrap(); + + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "valid payload attestation should be sent to network" + ); + + self + } + + pub async fn test_post_beacon_pool_payload_attestations_valid_ssz(mut self) -> Self { + let message = self.make_valid_payload_attestation_message(1); + let fork_name = self.chain.spec.fork_name_at_slot::(message.data.slot); + + self.client + .post_beacon_pool_payload_attestations_ssz(&[message], fork_name) + .await + .unwrap(); + + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "valid payload attestation (SSZ) should be sent to network" + ); + + self + } + pub async fn test_get_config_fork_schedule(self) -> Self { let result = self.client.get_config_fork_schedule().await.unwrap().data; @@ -8246,6 +8329,19 @@ async fn get_validator_payload_attestation_data_pre_gloas() { .await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn post_beacon_pool_payload_attestations_valid() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + ApiTester::new() + .await + .test_post_beacon_pool_payload_attestations_valid() + .await + .test_post_beacon_pool_payload_attestations_valid_ssz() + .await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_aggregate_attestation_v1() { ApiTester::new() diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 4ec75468a2..e866547b9f 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -46,7 +46,7 @@ use ssz::{Decode, Encode}; use std::fmt; use std::future::Future; use std::time::Duration; -use types::PayloadAttestationData; +use types::{PayloadAttestationData, PayloadAttestationMessage}; pub const V1: EndpointVersion = EndpointVersion(1); pub const V2: EndpointVersion = EndpointVersion(2); @@ -1789,6 +1789,48 @@ impl BeaconNodeHttpClient { Ok(()) } + /// `POST beacon/pool/payload_attestations` (JSON) + pub async fn post_beacon_pool_payload_attestations( + &self, + messages: &[PayloadAttestationMessage], + fork_name: ForkName, + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("payload_attestations"); + + self.post_generic_with_consensus_version(path, &messages, None, fork_name) + .await?; + + Ok(()) + } + + /// `POST beacon/pool/payload_attestations` (SSZ) + pub async fn post_beacon_pool_payload_attestations_ssz( + &self, + messages: &[PayloadAttestationMessage], + fork_name: ForkName, + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("payload_attestations"); + + let ssz_body: Vec = messages.iter().flat_map(|m| m.as_ssz_bytes()).collect(); + + self.post_generic_with_consensus_version_and_ssz_body(path, ssz_body, None, fork_name) + .await?; + + Ok(()) + } + /// `POST beacon/pool/bls_to_execution_changes` pub async fn post_beacon_pool_bls_to_execution_changes( &self, diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index c5bcd88eb1..1b32777678 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -21,11 +21,12 @@ use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; use types::{ AbstractExecPayload, Address, AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof, Domain, Epoch, EthSpec, ExecutionPayloadEnvelope, Fork, - FullPayload, Graffiti, Hash256, SelectionProof, SignedAggregateAndProof, SignedBeaconBlock, - SignedContributionAndProof, SignedExecutionPayloadEnvelope, SignedRoot, - SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncAggregatorSelectionData, - SyncCommitteeContribution, SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, - ValidatorRegistrationData, VoluntaryExit, graffiti::GraffitiString, + FullPayload, Graffiti, Hash256, PayloadAttestationData, PayloadAttestationMessage, + SelectionProof, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, + SignedExecutionPayloadEnvelope, SignedRoot, SignedValidatorRegistrationData, + SignedVoluntaryExit, Slot, SyncAggregatorSelectionData, SyncCommitteeContribution, + SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, + VoluntaryExit, graffiti::GraffitiString, }; use validator_store::{ AggregateToSign, AttestationToSign, ContributionToSign, DoppelgangerStatus, @@ -1423,6 +1424,37 @@ impl ValidatorStore for LighthouseValidatorS }) } + async fn sign_payload_attestation( + &self, + validator_pubkey: PublicKeyBytes, + data: PayloadAttestationData, + ) -> Result { + let signing_context = + self.signing_context(Domain::PTCAttester, data.slot.epoch(E::slots_per_epoch())); + + let validator_index = self + .validator_index(&validator_pubkey) + .ok_or(ValidatorStoreError::UnknownPubkey(validator_pubkey))?; + + let signing_method = self.doppelganger_bypassed_signing_method(validator_pubkey)?; + + let signature = signing_method + .get_signature::>( + SignableMessage::PayloadAttestationData(&data), + signing_context, + &self.spec, + &self.task_executor, + ) + .await + .map_err(Error::SpecificError)?; + + Ok(PayloadAttestationMessage { + validator_index, + data, + signature, + }) + } + /// Sign an `ExecutionPayloadEnvelope` for Gloas (local building). /// The proposer acts as the builder and signs with the BeaconBuilder domain. async fn sign_execution_payload_envelope( diff --git a/validator_client/signing_method/src/lib.rs b/validator_client/signing_method/src/lib.rs index c132d86c17..2f80fa5761 100644 --- a/validator_client/signing_method/src/lib.rs +++ b/validator_client/signing_method/src/lib.rs @@ -50,6 +50,7 @@ pub enum SignableMessage<'a, E: EthSpec, Payload: AbstractExecPayload = FullP ValidatorRegistration(&'a ValidatorRegistrationData), VoluntaryExit(&'a VoluntaryExit), ExecutionPayloadEnvelope(&'a ExecutionPayloadEnvelope), + PayloadAttestationData(&'a PayloadAttestationData), } impl> SignableMessage<'_, E, Payload> { @@ -72,6 +73,7 @@ impl> SignableMessage<'_, E, Payload SignableMessage::ValidatorRegistration(v) => v.signing_root(domain), SignableMessage::VoluntaryExit(exit) => exit.signing_root(domain), SignableMessage::ExecutionPayloadEnvelope(e) => e.signing_root(domain), + SignableMessage::PayloadAttestationData(d) => d.signing_root(domain), } } } @@ -238,6 +240,9 @@ impl SigningMethod { SignableMessage::ExecutionPayloadEnvelope(e) => { Web3SignerObject::ExecutionPayloadEnvelope(e) } + SignableMessage::PayloadAttestationData(d) => { + Web3SignerObject::PayloadAttestationData(d) + } }; // Determine the Web3Signer message type. diff --git a/validator_client/signing_method/src/web3signer.rs b/validator_client/signing_method/src/web3signer.rs index e6fc8f3ba2..c2b7e06f92 100644 --- a/validator_client/signing_method/src/web3signer.rs +++ b/validator_client/signing_method/src/web3signer.rs @@ -21,6 +21,7 @@ pub enum MessageType { ValidatorRegistration, // TODO(gloas) verify w/ web3signer specs ExecutionPayloadEnvelope, + PayloadAttestation, } #[derive(Debug, PartialEq, Copy, Clone, Serialize)] @@ -78,6 +79,7 @@ pub enum Web3SignerObject<'a, E: EthSpec, Payload: AbstractExecPayload> { ContributionAndProof(&'a ContributionAndProof), ValidatorRegistration(&'a ValidatorRegistrationData), ExecutionPayloadEnvelope(&'a ExecutionPayloadEnvelope), + PayloadAttestationData(&'a PayloadAttestationData), } impl<'a, E: EthSpec, Payload: AbstractExecPayload> Web3SignerObject<'a, E, Payload> { @@ -144,6 +146,7 @@ impl<'a, E: EthSpec, Payload: AbstractExecPayload> Web3SignerObject<'a, E, Pa } Web3SignerObject::ValidatorRegistration(_) => MessageType::ValidatorRegistration, Web3SignerObject::ExecutionPayloadEnvelope(_) => MessageType::ExecutionPayloadEnvelope, + Web3SignerObject::PayloadAttestationData(_) => MessageType::PayloadAttestation, } } } diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index e26d5c3d30..b412db45f6 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -45,6 +45,7 @@ use validator_services::{ block_service::{BlockService, BlockServiceBuilder}, duties_service::{self, DutiesService, DutiesServiceBuilder}, latency_service, + payload_attestation_service::PayloadAttestationService, preparation_service::{PreparationService, PreparationServiceBuilder}, sync_committee_service::SyncCommitteeService, }; @@ -83,6 +84,7 @@ pub struct ProductionValidatorClient { block_service: BlockService, SystemTimeSlotClock>, attestation_service: AttestationService, SystemTimeSlotClock>, sync_committee_service: SyncCommitteeService, SystemTimeSlotClock>, + payload_attestation_service: PayloadAttestationService, SystemTimeSlotClock>, doppelganger_service: Option>, preparation_service: PreparationService, SystemTimeSlotClock>, validator_store: Arc>, @@ -552,12 +554,22 @@ impl ProductionValidatorClient { context.executor.clone(), ); + let payload_attestation_service = PayloadAttestationService::new( + duties_service.clone(), + validator_store.clone(), + slot_clock.clone(), + beacon_nodes.clone(), + context.executor.clone(), + context.eth2_config.spec.clone(), + ); + Ok(Self { context, duties_service, block_service, attestation_service, sync_committee_service, + payload_attestation_service, doppelganger_service, preparation_service, validator_store, @@ -629,6 +641,13 @@ impl ProductionValidatorClient { .start_update_service(&self.context.eth2_config.spec) .map_err(|e| format!("Unable to start sync committee service: {}", e))?; + if self.context.eth2_config.spec.is_gloas_scheduled() { + self.payload_attestation_service + .clone() + .start_update_service() + .map_err(|e| format!("Unable to start payload attestation service: {}", e))?; + } + self.preparation_service .clone() .start_update_service(&self.context.eth2_config.spec) diff --git a/validator_client/validator_services/src/lib.rs b/validator_client/validator_services/src/lib.rs index 3b8bd9ae14..0169335a7f 100644 --- a/validator_client/validator_services/src/lib.rs +++ b/validator_client/validator_services/src/lib.rs @@ -3,6 +3,7 @@ pub mod block_service; pub mod duties_service; pub mod latency_service; pub mod notifier_service; +pub mod payload_attestation_service; pub mod preparation_service; pub mod sync; pub mod sync_committee_service; diff --git a/validator_client/validator_services/src/payload_attestation_service.rs b/validator_client/validator_services/src/payload_attestation_service.rs new file mode 100644 index 0000000000..2f3ca8bed2 --- /dev/null +++ b/validator_client/validator_services/src/payload_attestation_service.rs @@ -0,0 +1,238 @@ +use crate::duties_service::DutiesService; +use beacon_node_fallback::BeaconNodeFallback; +use logging::crit; +use slot_clock::SlotClock; +use std::ops::Deref; +use std::sync::Arc; +use task_executor::TaskExecutor; +use tokio::time::sleep; +use tracing::{debug, error, info}; +use types::{ChainSpec, EthSpec}; +use validator_store::ValidatorStore; + +pub struct Inner { + duties_service: Arc>, + validator_store: Arc, + slot_clock: T, + beacon_nodes: Arc>, + executor: TaskExecutor, + chain_spec: Arc, +} + +pub struct PayloadAttestationService { + inner: Arc>, +} + +impl Clone for PayloadAttestationService { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Deref for PayloadAttestationService { + type Target = Inner; + + fn deref(&self) -> &Self::Target { + self.inner.deref() + } +} + +impl PayloadAttestationService { + pub fn new( + duties_service: Arc>, + validator_store: Arc, + slot_clock: T, + beacon_nodes: Arc>, + executor: TaskExecutor, + chain_spec: Arc, + ) -> Self { + Self { + inner: Arc::new(Inner { + duties_service, + validator_store, + slot_clock, + beacon_nodes, + executor, + chain_spec, + }), + } + } + + pub fn start_update_service(self) -> Result<(), String> { + let slot_duration = self.chain_spec.get_slot_duration(); + let payload_attestation_due = self.chain_spec.get_payload_attestation_due(); + + info!( + payload_attestation_due_ms = payload_attestation_due.as_millis(), + "Payload attestation service started" + ); + + let executor = self.executor.clone(); + + let interval_fut = async move { + loop { + let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() else { + error!("Failed to read slot clock"); + sleep(slot_duration).await; + continue; + }; + + let Some(current_slot) = self.slot_clock.now() else { + error!("Failed to read slot clock after trigger"); + continue; + }; + + if !self + .chain_spec + .fork_name_at_slot::(current_slot) + .gloas_enabled() + { + let duration_to_next_epoch = self + .slot_clock + .duration_to_next_epoch(S::E::slots_per_epoch()) + .unwrap_or_else(|| { + self.chain_spec.get_slot_duration() * S::E::slots_per_epoch() as u32 + }); + sleep(duration_to_next_epoch).await; + continue; + } + + sleep(duration_to_next_slot + payload_attestation_due).await; + + let service = self.clone(); + self.executor.spawn( + async move { + service.produce_and_publish(current_slot).await; + }, + "payload_attestation_producer", + ); + } + }; + + executor.spawn(interval_fut, "payload_attestation_service"); + Ok(()) + } + + async fn produce_and_publish(&self, slot: types::Slot) { + let duties = self.duties_service.get_ptc_duties_for_slot(slot); + + if duties.is_empty() { + return; + } + + debug!( + %slot, + duty_count = duties.len(), + "Producing payload attestations" + ); + + let attestation_data = match self + .beacon_nodes + .first_success(|beacon_node| async move { + beacon_node + .get_validator_payload_attestation_data(slot) + .await + .map_err(|e| format!("Failed to get payload attestation data: {e:?}")) + .map(|resp| resp.into_data()) + }) + .await + { + Ok(data) => data, + Err(e) => { + crit!( + error = %e, + %slot, + "Failed to produce payload attestation data" + ); + return; + } + }; + + debug!( + %slot, + beacon_block_root = ?attestation_data.beacon_block_root, + payload_present = attestation_data.payload_present, + "Received payload attestation data" + ); + + let mut messages = Vec::with_capacity(duties.len()); + + for duty in &duties { + match self + .validator_store + .sign_payload_attestation(duty.pubkey, attestation_data.clone()) + .await + { + Ok(message) => { + messages.push(message); + } + Err(e) => { + crit!( + error = ?e, + validator = ?duty.pubkey, + %slot, + "Failed to sign payload attestation" + ); + } + } + } + + if messages.is_empty() { + return; + } + + let count = messages.len(); + let fork_name = self.chain_spec.fork_name_at_slot::(slot); + let result = self + .beacon_nodes + .first_success(|beacon_node| { + let messages = messages.clone(); + async move { + beacon_node + .post_beacon_pool_payload_attestations_ssz(&messages, fork_name) + .await + .map_err(|e| format!("Failed to publish payload attestations (SSZ): {e:?}")) + } + }) + .await; + + let result = match result { + Ok(()) => Ok(()), + Err(_) => { + debug!(%slot, "SSZ publish failed, falling back to JSON"); + self.beacon_nodes + .first_success(|beacon_node| { + let messages = messages.clone(); + async move { + beacon_node + .post_beacon_pool_payload_attestations(&messages, fork_name) + .await + .map_err(|e| { + format!("Failed to publish payload attestations (JSON): {e:?}") + }) + } + }) + .await + } + }; + + match result { + Ok(()) => { + info!( + %slot, + %count, + "Successfully published payload attestations" + ); + } + Err(e) => { + crit!( + error = %e, + %slot, + "Failed to publish payload attestations" + ); + } + } + } +} diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index da0b33de18..4e5b415a41 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -7,10 +7,11 @@ use std::future::Future; use std::sync::Arc; use types::{ Address, Attestation, AttestationError, BlindedBeaconBlock, Epoch, EthSpec, - ExecutionPayloadEnvelope, Graffiti, Hash256, SelectionProof, SignedAggregateAndProof, - SignedBlindedBeaconBlock, SignedContributionAndProof, SignedExecutionPayloadEnvelope, - SignedValidatorRegistrationData, Slot, SyncCommitteeContribution, SyncCommitteeMessage, - SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, + ExecutionPayloadEnvelope, Graffiti, Hash256, PayloadAttestationData, PayloadAttestationMessage, + SelectionProof, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedContributionAndProof, + SignedExecutionPayloadEnvelope, SignedValidatorRegistrationData, Slot, + SyncCommitteeContribution, SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, + ValidatorRegistrationData, }; #[derive(Debug, PartialEq, Clone)] @@ -205,6 +206,13 @@ pub trait ValidatorStore: Send + Sync { envelope: ExecutionPayloadEnvelope, ) -> impl Future, Error>> + Send; + /// Sign a `PayloadAttestationData` for the PTC. + fn sign_payload_attestation( + &self, + validator_pubkey: PublicKeyBytes, + data: PayloadAttestationData, + ) -> impl Future>> + Send; + /// Returns `ProposalData` for the provided `pubkey` if it exists in `InitializedValidators`. /// `ProposalData` fields include defaulting logic described in `get_fee_recipient_defaulting`, /// `get_gas_limit_defaulting`, and `get_builder_proposals_defaulting`.