diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e7cfe615a1..6281688b31 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -57,13 +57,15 @@ use crate::observed_aggregates::{ Error as AttestationObservationError, ObservedAggregateAttestations, ObservedSyncContributions, }; use crate::observed_attesters::{ - ObservedAggregators, ObservedAttesters, ObservedSyncAggregators, ObservedSyncContributors, + ObservedAggregators, ObservedAttesters, ObservedPayloadAttesters, ObservedSyncAggregators, + ObservedSyncContributors, }; use crate::observed_block_producers::ObservedBlockProducers; use crate::observed_data_sidecars::ObservedDataSidecars; use crate::observed_operations::{ObservationOutcome, ObservedOperations}; use crate::observed_slashable::ObservedSlashable; use crate::partial_data_column_assembler::PartialMergeResult; +use crate::payload_attestation_verification::VerifiedPayloadAttestationMessage; use crate::payload_bid_verification::payload_bid_cache::GossipVerifiedPayloadBidCache; #[cfg(not(test))] use crate::payload_envelope_streamer::{EnvelopeRequestSource, launch_payload_envelope_stream}; @@ -423,6 +425,9 @@ pub struct BeaconChain { /// Maintains a record of which validators have been seen to create `SignedContributionAndProofs` /// in recent epochs. pub(crate) observed_sync_aggregators: RwLock>, + /// Maintains a record of which validators have sent payload attestation messages + /// in recent slots. + pub(crate) observed_payload_attesters: RwLock>, /// Maintains a record of which validators have proposed blocks for each slot. pub observed_block_producers: RwLock>, /// Maintains a record of blob sidecars seen over the gossip network. @@ -2314,6 +2319,33 @@ impl BeaconChain { }) } + pub fn apply_payload_attestation_to_fork_choice( + &self, + indexed_payload_attestation: &IndexedPayloadAttestation, + ptc: &PTC, + ) -> Result<(), Error> { + self.canonical_head + .fork_choice_write_lock() + .on_payload_attestation( + self.slot()?, + indexed_payload_attestation, + AttestationFromBlock::False, + &ptc.0, + ) + .map_err(Into::into) + } + + /// Add a verified payload attestation message to the operation pool for block inclusion. + pub fn add_payload_attestation_to_pool( + &self, + verified: &VerifiedPayloadAttestationMessage, + ) -> Result<(), Error> { + self.op_pool + .insert_payload_attestation_message(verified.payload_attestation_message().clone()) + .map_err(Error::OpPoolError)?; + Ok(()) + } + /// Accepts some `SyncCommitteeMessage` from the network and attempts to verify it, returning `Ok(_)` if /// it is valid to be (re)broadcast on the gossip network. pub fn verify_sync_committee_message_for_gossip( diff --git a/beacon_node/beacon_chain/src/block_production/gloas.rs b/beacon_node/beacon_chain/src/block_production/gloas.rs index 9b3fc2806e..79ea78ce4a 100644 --- a/beacon_node/beacon_chain/src/block_production/gloas.rs +++ b/beacon_node/beacon_chain/src/block_production/gloas.rs @@ -1,17 +1,18 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; use std::sync::Arc; -use bls::Signature; +use bls::{PublicKeyBytes, Signature}; use execution_layer::{ BlockProposalContentsGloas, BuilderParams, PayloadAttributes, PayloadParameters, }; use fork_choice::PayloadStatus; use operation_pool::CompactAttestationRef; use ssz::Encode; -use state_processing::common::get_attesting_indices_from_state; +use state_processing::common::{get_attesting_indices_from_state, get_indexed_payload_attestation}; use state_processing::envelope_processing::verify_execution_payload_envelope; use state_processing::epoch_cache::initialize_epoch_cache; +use state_processing::per_block_processing::is_valid_indexed_payload_attestation; use state_processing::per_block_processing::{ apply_parent_execution_payload, compute_timestamp_at_slot, get_expected_withdrawals, verify_attestation_for_block_inclusion, @@ -27,13 +28,14 @@ use types::consts::gloas::BUILDER_INDEX_SELF_BUILD; use types::{ Address, Attestation, AttestationElectra, AttesterSlashing, AttesterSlashingElectra, BeaconBlock, BeaconBlockBodyGloas, BeaconBlockGloas, BeaconState, BeaconStateError, - BuilderIndex, Deposit, Eth1Data, EthSpec, ExecutionBlockHash, ExecutionPayloadBid, + BuilderIndex, ChainSpec, Deposit, Eth1Data, EthSpec, ExecutionBlockHash, ExecutionPayloadBid, ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, FullPayload, Graffiti, Hash256, PayloadAttestation, ProposerSlashing, RelativeEpoch, SignedBeaconBlock, SignedBlsToExecutionChange, SignedExecutionPayloadBid, SignedExecutionPayloadEnvelope, SignedVoluntaryExit, Slot, SyncAggregate, Withdrawal, Withdrawals, }; +use crate::pending_payload_envelopes::PendingEnvelopeData; use crate::{ BeaconChain, BeaconChainError, BeaconChainTypes, BlockProductionError, ProduceBlockVerification, block_production::BlockProductionState, @@ -73,6 +75,7 @@ pub struct ExecutionPayloadData { pub execution_requests: ExecutionRequests, pub builder_index: BuilderIndex, pub slot: Slot, + pub blobs_and_proofs: (types::BlobsList, types::KzgProofs), } impl BeaconChain { @@ -136,6 +139,16 @@ impl BeaconChain { graffiti_settings: GraffitiSettings, verification: ProduceBlockVerification, ) -> Result, BlockProductionError> { + // Extract the parent's execution requests from the envelope (if parent was full). + let parent_execution_requests = if parent_payload_status == PayloadStatus::Full { + parent_envelope + .as_ref() + .map(|env| env.message.execution_requests.clone()) + .ok_or(BlockProductionError::MissingParentExecutionPayload)? + } else { + ExecutionRequests::default() + }; + // Part 1/3 (blocking) // // Perform the state advance and block-packing functions. @@ -144,6 +157,7 @@ impl BeaconChain { .graffiti_calculator .get_graffiti(graffiti_settings) .await; + let parent_execution_requests_ref = parent_execution_requests.clone(); let (partial_beacon_block, state) = self .task_executor .spawn_blocking_handle( @@ -154,6 +168,7 @@ impl BeaconChain { produce_at_slot, randao_reveal, graffiti, + &parent_execution_requests_ref, ) }, "produce_partial_beacon_block_gloas", @@ -162,16 +177,6 @@ impl BeaconChain { .await .map_err(BlockProductionError::TokioJoin)??; - // Extract the parent's execution requests from the envelope (if parent was full). - let parent_execution_requests = if parent_payload_status == PayloadStatus::Full { - parent_envelope - .as_ref() - .map(|env| env.message.execution_requests.clone()) - .ok_or(BlockProductionError::MissingParentExecutionPayload)? - } else { - ExecutionRequests::default() - }; - // Part 2/3 (async) // // Produce the execution payload bid. @@ -222,6 +227,7 @@ impl BeaconChain { produce_at_slot: Slot, randao_reveal: Signature, graffiti: Graffiti, + parent_execution_requests: &ExecutionRequests, ) -> Result<(PartialBeaconBlock, BeaconState), BlockProductionError> { // It is invalid to try to produce a block using a state from a future slot. @@ -256,6 +262,13 @@ impl BeaconChain { let (mut proposer_slashings, mut attester_slashings, mut voluntary_exits) = self.op_pool.get_slashings_and_exits(&state, &self.spec); + filter_voluntary_exits_for_parent_execution_requests( + &mut voluntary_exits, + parent_execution_requests, + |idx| state.validators().get(idx as usize).map(|v| v.pubkey), + &self.spec, + ); + drop(slashings_and_exits_span); let eth1_data = state.eth1_data().clone(); @@ -319,6 +332,11 @@ impl BeaconChain { .map_err(BlockProductionError::OpPoolError)? }; + let mut payload_attestations = self + .op_pool + .get_payload_attestations(&state, parent_root, &self.spec) + .map_err(BlockProductionError::OpPoolError)?; + // If paranoid mode is enabled re-check the signatures of every included message. // This will be a lot slower but guards against bugs in block production and can be // quickly rolled out without a release. @@ -343,6 +361,35 @@ impl BeaconChain { .is_ok() }); + payload_attestations.retain(|att| { + match get_indexed_payload_attestation(&state, att, &self.spec) { + Ok(indexed) => is_valid_indexed_payload_attestation( + &state, + &indexed, + VerifySignatures::True, + &self.spec, + ) + .map_err(|e| { + warn!( + err = ?e, + block_slot = %state.slot(), + ?att, + "Attempted to include a payload attestation with invalid signature" + ); + }) + .is_ok(), + Err(e) => { + warn!( + err = ?e, + block_slot = %state.slot(), + ?att, + "Failed to index payload attestation for verification" + ); + false + } + } + }); + proposer_slashings.retain(|slashing| { slashing .clone() @@ -386,8 +433,6 @@ impl BeaconChain { }) .is_ok() }); - - // TODO(gloas) verify payload attestation signature here as well } let attester_slashings = attester_slashings @@ -434,8 +479,7 @@ impl BeaconChain { deposits, voluntary_exits, sync_aggregate, - // TODO(gloas) need to implement payload attestations - payload_attestations: vec![], + payload_attestations, bls_to_execution_changes, }, state, @@ -605,9 +649,14 @@ impl BeaconChain { let envelope_slot = payload_data.slot; // TODO(gloas) might be safer to cache by root instead of by slot. // We should revisit this once this code path + beacon api spec matures - self.pending_payload_envelopes - .write() - .insert(envelope_slot, signed_envelope.message); + let (blobs, _) = payload_data.blobs_and_proofs; + self.pending_payload_envelopes.write().insert( + envelope_slot, + PendingEnvelopeData { + envelope: signed_envelope.message, + blobs: Some(blobs), + }, + ); debug!( %beacon_block_root, @@ -727,7 +776,7 @@ impl BeaconChain { payload_value: _, execution_requests, blob_kzg_commitments, - blobs_and_proofs: _, + blobs_and_proofs, } = block_proposal_contents; // TODO(gloas) since we are defaulting to local building, execution payment is 0 @@ -753,6 +802,7 @@ impl BeaconChain { execution_requests, builder_index, slot: produce_at_slot, + blobs_and_proofs, }; // TODO(gloas) this is only local building @@ -926,3 +976,178 @@ where Ok(block_contents) } + +/// Drop voluntary exits whose target validators will be exited by the parent envelope's +/// execution requests. +/// +/// In Gloas the parent execution payload is processed before voluntary exits during block +/// processing. EL-triggered withdrawal-full-exit requests (EIP-7002) and cross-pubkey +/// consolidation requests (EIP-7251) call `initiate_validator_exit`, setting the target's +/// `exit_epoch`. A voluntary exit for the same validator would then fail with `AlreadyExited`. +fn filter_voluntary_exits_for_parent_execution_requests( + voluntary_exits: &mut Vec, + parent_execution_requests: &ExecutionRequests, + pubkey_at_index: impl Fn(u64) -> Option, + spec: &ChainSpec, +) { + let mut exited_pubkeys = HashSet::with_capacity( + parent_execution_requests.withdrawals.len() + + parent_execution_requests.consolidations.len(), + ); + for req in &parent_execution_requests.withdrawals { + if req.amount == spec.full_exit_request_amount { + exited_pubkeys.insert(req.validator_pubkey); + } + } + for req in &parent_execution_requests.consolidations { + if req.source_pubkey != req.target_pubkey { + exited_pubkeys.insert(req.source_pubkey); + } + } + if !exited_pubkeys.is_empty() { + voluntary_exits.retain(|exit| { + pubkey_at_index(exit.message.validator_index) + .map(|pk| !exited_pubkeys.contains(&pk)) + .unwrap_or(false) + }); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ssz_types::VariableList; + use types::{ConsolidationRequest, Epoch, MainnetEthSpec, VoluntaryExit, WithdrawalRequest}; + + type TestSpec = MainnetEthSpec; + + fn pubkey(byte: u8) -> PublicKeyBytes { + PublicKeyBytes::deserialize(&[byte; 48]).expect("valid pubkey byte length") + } + + fn exit(validator_index: u64) -> SignedVoluntaryExit { + SignedVoluntaryExit { + message: VoluntaryExit { + epoch: Epoch::new(0), + validator_index, + }, + signature: Signature::empty(), + } + } + + fn requests( + withdrawals: Vec, + consolidations: Vec, + ) -> ExecutionRequests { + ExecutionRequests { + deposits: VariableList::empty(), + withdrawals: VariableList::new(withdrawals).unwrap(), + consolidations: VariableList::new(consolidations).unwrap(), + } + } + + fn run_filter( + exits: &mut Vec, + requests: &ExecutionRequests, + validator_pubkeys: &[PublicKeyBytes], + spec: &ChainSpec, + ) { + filter_voluntary_exits_for_parent_execution_requests( + exits, + requests, + |idx| validator_pubkeys.get(idx as usize).copied(), + spec, + ); + } + + #[test] + fn full_exit_withdrawal_request_filters_matching_voluntary_exit() { + let spec = ChainSpec::mainnet(); + let validators = vec![pubkey(1), pubkey(2)]; + let mut exits = vec![exit(0), exit(1)]; + let reqs = requests( + vec![WithdrawalRequest { + source_address: Address::repeat_byte(0xaa), + validator_pubkey: validators[0], + amount: spec.full_exit_request_amount, + }], + vec![], + ); + + run_filter(&mut exits, &reqs, &validators, &spec); + + assert_eq!(exits.len(), 1); + assert_eq!(exits[0].message.validator_index, 1); + } + + #[test] + fn partial_withdrawal_request_does_not_filter_voluntary_exit() { + let spec = ChainSpec::mainnet(); + let validators = vec![pubkey(1)]; + let mut exits = vec![exit(0)]; + let reqs = requests( + vec![WithdrawalRequest { + source_address: Address::repeat_byte(0xaa), + validator_pubkey: validators[0], + amount: spec.full_exit_request_amount + 1, + }], + vec![], + ); + + run_filter(&mut exits, &reqs, &validators, &spec); + + assert_eq!(exits.len(), 1); + } + + #[test] + fn cross_pubkey_consolidation_filters_voluntary_exit_for_source_only() { + let spec = ChainSpec::mainnet(); + let validators = vec![pubkey(1), pubkey(2), pubkey(3)]; + let mut exits = vec![exit(0), exit(1), exit(2)]; + let reqs = requests( + vec![], + vec![ConsolidationRequest { + source_address: Address::repeat_byte(0xaa), + source_pubkey: validators[1], + target_pubkey: validators[2], + }], + ); + + run_filter(&mut exits, &reqs, &validators, &spec); + + // The source (validator 1) is exited; the target (validator 2) is not. + let remaining: Vec = exits.iter().map(|e| e.message.validator_index).collect(); + assert_eq!(remaining, vec![0, 2]); + } + + #[test] + fn self_consolidation_does_not_filter_voluntary_exit() { + let spec = ChainSpec::mainnet(); + let validators = vec![pubkey(1)]; + let mut exits = vec![exit(0)]; + let reqs = requests( + vec![], + vec![ConsolidationRequest { + source_address: Address::repeat_byte(0xaa), + source_pubkey: validators[0], + target_pubkey: validators[0], + }], + ); + + run_filter(&mut exits, &reqs, &validators, &spec); + + assert_eq!(exits.len(), 1); + } + + #[test] + fn empty_parent_requests_preserve_voluntary_exits() { + let spec = ChainSpec::mainnet(); + let validators = vec![pubkey(1), pubkey(2)]; + let mut exits = vec![exit(0), exit(1)]; + let reqs = requests(vec![], vec![]); + + run_filter(&mut exits, &reqs, &validators, &spec); + + assert_eq!(exits.len(), 2); + } +} diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index b8aeef0700..3f658f0d11 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1045,6 +1045,7 @@ where observed_aggregators: <_>::default(), // TODO: allow for persisting and loading the pool from disk. observed_sync_aggregators: <_>::default(), + observed_payload_attesters: <_>::default(), // TODO: allow for persisting and loading the pool from disk. observed_block_producers: <_>::default(), observed_column_sidecars: RwLock::new(ObservedDataSidecars::new(self.spec.clone())), diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index 04c18c88e0..0e6515ebbd 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -796,9 +796,9 @@ impl BeaconChain { let new_snapshot = &new_cached_head.snapshot; let old_snapshot = &old_cached_head.snapshot; - // If the head changed, perform some updates. - if (new_snapshot.beacon_block_root != old_snapshot.beacon_block_root - || new_payload_status != old_payload_status) + // Only run on head *block* changes - payload status changes only need the + // `cached_head` update above, not re-org detection or event emission. + if new_snapshot.beacon_block_root != old_snapshot.beacon_block_root && let Err(e) = self.after_new_head(&old_cached_head, &new_cached_head, new_head_proto_block) { diff --git a/beacon_node/beacon_chain/src/kzg_utils.rs b/beacon_node/beacon_chain/src/kzg_utils.rs index 9641aec47d..b05a896777 100644 --- a/beacon_node/beacon_chain/src/kzg_utils.rs +++ b/beacon_node/beacon_chain/src/kzg_utils.rs @@ -296,6 +296,35 @@ pub fn blobs_to_data_column_sidecars( } } +/// Build Gloas data column sidecars from blobs, computing cells and proofs locally. +pub fn blobs_to_data_column_sidecars_gloas( + blobs: &[&Blob], + beacon_block_root: Hash256, + slot: Slot, + kzg: &Kzg, + spec: &ChainSpec, +) -> Result, DataColumnSidecarError> { + if blobs.is_empty() { + return Ok(vec![]); + } + + let blob_cells_and_proofs_vec = blobs + .into_par_iter() + .map(|blob| { + let blob = blob.as_ref().try_into().map_err(|e| { + KzgError::InconsistentArrayLength(format!( + "blob should have a guaranteed size due to FixedVector: {e:?}" + )) + })?; + + kzg.compute_cells_and_proofs(blob) + }) + .collect::, KzgError>>()?; + + build_data_column_sidecars_gloas(beacon_block_root, slot, blob_cells_and_proofs_vec, spec) + .map_err(DataColumnSidecarError::BuildSidecarFailed) +} + /// Build data column sidecars from a signed beacon block and its blobs. #[instrument(skip_all, level = "debug", fields(blob_count = blobs_and_proofs.len()))] pub fn blobs_to_partial_data_columns( @@ -728,8 +757,8 @@ pub fn reconstruct_data_columns( #[cfg(test)] mod test { use crate::kzg_utils::{ - blobs_to_data_column_sidecars, reconstruct_blobs, reconstruct_data_columns, - validate_full_data_columns, + blobs_to_data_column_sidecars, blobs_to_data_column_sidecars_gloas, reconstruct_blobs, + reconstruct_data_columns, validate_full_data_columns, }; use bls::Signature; use eth2::types::BlobsBundle; @@ -737,25 +766,30 @@ mod test { use kzg::{Kzg, KzgCommitment, trusted_setup::get_trusted_setup}; use types::{ BeaconBlock, BeaconBlockFulu, BlobsList, ChainSpec, EmptyBlock, EthSpec, ForkName, - FullPayload, KzgProofs, MainnetEthSpec, SignedBeaconBlock, kzg_ext::KzgCommitments, + FullPayload, Hash256, KzgProofs, MainnetEthSpec, SignedBeaconBlock, Slot, + kzg_ext::KzgCommitments, }; type E = MainnetEthSpec; // Loading and initializing PeerDAS KZG is expensive and slow, so we group the tests together // only load it once. - // TODO(Gloas) make this generic over fulu/gloas, or write a separate function for Gloas #[test] fn test_build_data_columns_sidecars() { - let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); let kzg = get_kzg(); - test_build_data_columns_empty(&kzg, &spec); - test_build_data_columns_fulu(&kzg, &spec); - test_reconstruct_data_columns(&kzg, &spec); - test_reconstruct_data_columns_unordered(&kzg, &spec); - test_reconstruct_blobs_from_data_columns(&kzg, &spec); - test_reconstruct_blobs_from_data_columns_unordered(&kzg, &spec); - test_validate_data_columns(&kzg, &spec); + + let fulu_spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + test_build_data_columns_empty(&kzg, &fulu_spec); + test_build_data_columns_fulu(&kzg, &fulu_spec); + test_reconstruct_data_columns(&kzg, &fulu_spec); + test_reconstruct_data_columns_unordered(&kzg, &fulu_spec); + test_reconstruct_blobs_from_data_columns(&kzg, &fulu_spec); + test_reconstruct_blobs_from_data_columns_unordered(&kzg, &fulu_spec); + test_validate_data_columns(&kzg, &fulu_spec); + + let gloas_spec = ForkName::Gloas.make_genesis_spec(E::default_spec()); + test_build_data_columns_gloas(&kzg, &gloas_spec); + test_build_data_columns_gloas_empty(&kzg, &gloas_spec); } #[track_caller] @@ -784,8 +818,49 @@ mod test { assert!(column_sidecars.is_empty()); } - // TODO(gloas) create `test_build_data_columns_gloas` and make sure its called - // in the relevant places + #[track_caller] + fn test_build_data_columns_gloas(kzg: &Kzg, spec: &ChainSpec) { + let num_of_blobs = 2; + let (blobs, _proofs) = create_test_gloas_blobs::(num_of_blobs); + let beacon_block_root = Hash256::random(); + let slot = Slot::new(0); + + let blob_refs: Vec<_> = blobs.iter().collect(); + let column_sidecars = blobs_to_data_column_sidecars_gloas::( + &blob_refs, + beacon_block_root, + slot, + kzg, + spec, + ) + .unwrap(); + + assert_eq!(column_sidecars.len(), E::number_of_columns()); + for (idx, col_sidecar) in column_sidecars.iter().enumerate() { + assert_eq!(*col_sidecar.index(), idx as u64); + assert_eq!(col_sidecar.column().len(), num_of_blobs); + assert_eq!(col_sidecar.kzg_proofs().len(), num_of_blobs); + + let gloas_col = col_sidecar.as_gloas().expect("should be Gloas sidecar"); + assert_eq!(gloas_col.beacon_block_root, beacon_block_root); + assert_eq!(gloas_col.slot, slot); + } + } + + #[track_caller] + fn test_build_data_columns_gloas_empty(kzg: &Kzg, spec: &ChainSpec) { + let blob_refs: Vec<&types::Blob> = vec![]; + let column_sidecars = blobs_to_data_column_sidecars_gloas::( + &blob_refs, + Hash256::random(), + Slot::new(0), + kzg, + spec, + ) + .unwrap(); + assert!(column_sidecars.is_empty()); + } + #[track_caller] fn test_build_data_columns_fulu(kzg: &Kzg, spec: &ChainSpec) { // Using at least 2 blobs to make sure we're arranging the data columns correctly. @@ -974,4 +1049,9 @@ mod test { (signed_block, blobs, proofs) } + + fn create_test_gloas_blobs(num_of_blobs: usize) -> (BlobsList, KzgProofs) { + let (blobs_bundle, _) = generate_blobs::(num_of_blobs, ForkName::Gloas).unwrap(); + (blobs_bundle.blobs, blobs_bundle.proofs) + } } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 4af5b1627e..12f3a86956 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -45,6 +45,7 @@ pub mod observed_data_sidecars; pub mod observed_operations; mod observed_slashable; pub mod partial_data_column_assembler; +pub mod payload_attestation_verification; pub mod payload_bid_verification; pub mod payload_envelope_streamer; pub mod payload_envelope_verification; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index c95ba87520..ef3b1995c3 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1468,6 +1468,27 @@ pub static SYNC_MESSAGE_GOSSIP_VERIFICATION_TIMES: LazyLock> = "Full runtime of sync contribution gossip verification", ) }); +pub static PAYLOAD_ATTESTATION_PROCESSING_REQUESTS: LazyLock> = + LazyLock::new(|| { + try_create_int_counter( + "beacon_payload_attestation_processing_requests_total", + "Count of all payload attestation messages submitted for processing", + ) + }); +pub static PAYLOAD_ATTESTATION_PROCESSING_SUCCESSES: LazyLock> = + LazyLock::new(|| { + try_create_int_counter( + "beacon_payload_attestation_processing_successes_total", + "Number of payload attestation messages verified for gossip", + ) + }); +pub static PAYLOAD_ATTESTATION_GOSSIP_VERIFICATION_TIMES: LazyLock> = + LazyLock::new(|| { + try_create_histogram( + "beacon_payload_attestation_gossip_verification_seconds", + "Full runtime of payload attestation gossip verification", + ) + }); pub static SYNC_MESSAGE_EQUIVOCATIONS: LazyLock> = LazyLock::new(|| { try_create_int_counter( "sync_message_equivocations_total", diff --git a/beacon_node/beacon_chain/src/observed_attesters.rs b/beacon_node/beacon_chain/src/observed_attesters.rs index 277bf38ffc..4bb536880c 100644 --- a/beacon_node/beacon_chain/src/observed_attesters.rs +++ b/beacon_node/beacon_chain/src/observed_attesters.rs @@ -42,6 +42,8 @@ pub type ObservedSyncContributors = pub type ObservedAggregators = AutoPruningEpochContainer; pub type ObservedSyncAggregators = AutoPruningSlotContainer; +pub type ObservedPayloadAttesters = + AutoPruningSlotContainer, E>; #[derive(Debug, PartialEq)] pub enum Error { @@ -255,6 +257,46 @@ impl Item<()> for SyncAggregatorSlotHashSet { } } +/// Stores a `HashSet` of validator indices that have sent a payload attestation gossip +/// message during a slot. +pub struct PayloadAttesterSlotHashSet { + set: HashSet, + phantom: PhantomData, +} + +impl Item<()> for PayloadAttesterSlotHashSet { + fn with_capacity(capacity: usize) -> Self { + Self { + set: HashSet::with_capacity(capacity), + phantom: PhantomData, + } + } + + /// Defaults to `PTC_SIZE`, the maximum number of payload attesters per slot. + fn default_capacity() -> usize { + E::ptc_size() + } + + fn len(&self) -> usize { + self.set.len() + } + + fn validator_count(&self) -> usize { + self.set.len() + } + + /// Inserts the `validator_index` in the set. Returns `true` if the `validator_index` was + /// already in the set. + fn insert(&mut self, validator_index: usize, _value: ()) -> bool { + !self.set.insert(validator_index) + } + + /// Returns `true` if the `validator_index` is in the set. + fn get(&self, validator_index: usize) -> Option<()> { + self.set.contains(&validator_index).then_some(()) + } +} + /// A container that stores some number of `T` items. /// /// This container is "auto-pruning" since it gets an idea of the current slot by which diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs new file mode 100644 index 0000000000..c36c73b344 --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs @@ -0,0 +1,271 @@ +use super::Error; +use crate::beacon_chain::BeaconStore; +use crate::canonical_head::CanonicalHead; +use crate::observed_attesters::ObservedPayloadAttesters; +use crate::validator_pubkey_cache::ValidatorPubkeyCache; +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, metrics}; +use bls::AggregateSignature; +use educe::Educe; +use eth2::types::{EventKind, ForkVersionedResponse}; +use parking_lot::RwLock; +use safe_arith::SafeArith; +use slot_clock::SlotClock; +use state_processing::per_block_processing::signature_sets::indexed_payload_attestation_signature_set; +use state_processing::state_advance::partial_state_advance; +use std::borrow::Cow; +use types::{ChainSpec, EthSpec, IndexedPayloadAttestation, PTC, PayloadAttestationMessage, Slot}; + +pub struct GossipVerificationContext<'a, T: BeaconChainTypes> { + pub slot_clock: &'a T::SlotClock, + pub spec: &'a ChainSpec, + pub observed_payload_attesters: &'a RwLock>, + pub canonical_head: &'a CanonicalHead, + pub validator_pubkey_cache: &'a RwLock>, + pub store: &'a BeaconStore, +} + +/// A `PayloadAttestationMessage` that has been verified for propagation on the gossip network. +#[derive(Educe)] +#[educe(Clone, Debug)] +pub struct VerifiedPayloadAttestationMessage { + payload_attestation_message: PayloadAttestationMessage, + indexed_payload_attestation: IndexedPayloadAttestation, + ptc: PTC, +} + +impl VerifiedPayloadAttestationMessage { + pub fn new( + payload_attestation_message: PayloadAttestationMessage, + ctx: &GossipVerificationContext<'_, T>, + ) -> Result { + let slot = payload_attestation_message.data.slot; + let validator_index = payload_attestation_message.validator_index; + + // [IGNORE] `data.slot` is within the `MAXIMUM_GOSSIP_CLOCK_DISPARITY` allowance. + verify_propagation_slot_range(ctx.slot_clock, slot, ctx.spec)?; + + // [IGNORE] There has been no other valid payload attestation message for this + // validator index. + if ctx + .observed_payload_attesters + .read() + .validator_has_been_observed(slot, validator_index as usize) + .map_err(BeaconChainError::from)? + { + return Err(Error::PriorPayloadAttestationMessageKnown { + validator_index, + slot, + }); + } + + // [IGNORE] `data.beacon_block_root` has been seen + // [REJECT] `data.beacon_block_root` passes validation. + // + // TODO(gloas): These two conditions are conflated. We need a status table to + // differentiate between: + // 1. Blocks we haven't seen (IGNORE), and + // 2. Blocks we've seen that are invalid (REJECT). + // Presently both cases return IGNORE. + let beacon_block_root = payload_attestation_message.data.beacon_block_root; + if ctx + .canonical_head + .fork_choice_read_lock() + .get_block(&beacon_block_root) + .is_none() + { + return Err(Error::UnknownHeadBlock { beacon_block_root }); + } + + // Get head state for PTC computation. If the cached head state is too stale + // (e.g. during liveness failures with many skipped slots), fall back to loading + // a more recent state from the store and advancing it if necessary. + let head = ctx.canonical_head.cached_head(); + let head_state = &head.snapshot.beacon_state; + + let message_epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + let state_epoch = head_state.current_epoch(); + + // get_ptc can serve epochs in [state_epoch - 1, state_epoch + min_seed_lookahead]. + // If the message epoch is beyond that range, the head state is stale. + let advanced_state = if message_epoch + > state_epoch + .safe_add(ctx.spec.min_seed_lookahead) + .map_err(BeaconChainError::from)? + { + let head_block_root = head.head_block_root(); + let target_slot = message_epoch.start_slot(T::EthSpec::slots_per_epoch()); + + let (state_root, mut state) = ctx + .store + .get_advanced_hot_state( + head_block_root, + target_slot, + head.snapshot.beacon_state_root(), + ) + .map_err(BeaconChainError::from)? + .ok_or(BeaconChainError::MissingBeaconState( + head.snapshot.beacon_state_root(), + ))?; + + if state + .current_epoch() + .safe_add(ctx.spec.min_seed_lookahead) + .map_err(BeaconChainError::from)? + < message_epoch + { + partial_state_advance(&mut state, Some(state_root), target_slot, ctx.spec) + .map_err(BeaconChainError::from)?; + } + + Some(state) + } else { + None + }; + + let state = advanced_state.as_ref().unwrap_or(head_state); + + // [REJECT] `validator_index` is within `get_ptc(state, data.slot)`. + let ptc = state.get_ptc(slot, ctx.spec)?; + if !ptc.0.contains(&(validator_index as usize)) { + return Err(Error::NotInPTC { + validator_index, + slot, + }); + } + + // Build the indexed form for signature verification and downstream fork choice. + let indexed_payload_attestation = IndexedPayloadAttestation { + attesting_indices: vec![validator_index] + .try_into() + .map_err(|_| Error::UnknownValidatorIndex(validator_index))?, + data: payload_attestation_message.data.clone(), + signature: AggregateSignature::from(&payload_attestation_message.signature), + }; + + { + // [REJECT] The signature is valid with respect to the `validator_index`. + let pubkey_cache = ctx.validator_pubkey_cache.read(); + let signature_set = indexed_payload_attestation_signature_set( + state, + |validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed), + &indexed_payload_attestation.signature, + &indexed_payload_attestation, + ctx.spec, + ) + .map_err(|_| Error::UnknownValidatorIndex(validator_index))?; + + if !signature_set.verify() { + return Err(Error::InvalidSignature); + } + } + + // Record that we have received a valid payload attestation message from this + // validator. Double check with the write lock to handle race conditions. + if ctx + .observed_payload_attesters + .write() + .observe_validator(slot, validator_index as usize, ()) + .map_err(BeaconChainError::from)? + { + return Err(Error::PriorPayloadAttestationMessageKnown { + validator_index, + slot, + }); + } + + Ok(Self { + payload_attestation_message, + indexed_payload_attestation, + ptc, + }) + } + + pub fn payload_attestation_message(&self) -> &PayloadAttestationMessage { + &self.payload_attestation_message + } + + pub fn indexed_payload_attestation(&self) -> &IndexedPayloadAttestation { + &self.indexed_payload_attestation + } + + pub fn ptc(&self) -> &PTC { + &self.ptc + } + + pub fn into_payload_attestation_message(self) -> PayloadAttestationMessage { + self.payload_attestation_message + } +} + +impl BeaconChain { + pub fn payload_attestation_gossip_context(&self) -> GossipVerificationContext<'_, T> { + GossipVerificationContext { + slot_clock: &self.slot_clock, + spec: &self.spec, + observed_payload_attesters: &self.observed_payload_attesters, + canonical_head: &self.canonical_head, + validator_pubkey_cache: &self.validator_pubkey_cache, + store: &self.store, + } + } + + pub fn verify_payload_attestation_message_for_gossip( + &self, + payload_attestation_message: PayloadAttestationMessage, + ) -> Result, Error> { + metrics::inc_counter(&metrics::PAYLOAD_ATTESTATION_PROCESSING_REQUESTS); + let _timer = metrics::start_timer(&metrics::PAYLOAD_ATTESTATION_GOSSIP_VERIFICATION_TIMES); + + let ctx = self.payload_attestation_gossip_context(); + VerifiedPayloadAttestationMessage::new(payload_attestation_message, &ctx).inspect( + |verified| { + metrics::inc_counter(&metrics::PAYLOAD_ATTESTATION_PROCESSING_SUCCESSES); + + if let Some(event_handler) = self.event_handler.as_ref() + && event_handler.has_payload_attestation_message_subscribers() + { + let msg = verified.payload_attestation_message(); + event_handler.register(EventKind::PayloadAttestationMessage(Box::new( + ForkVersionedResponse { + version: self.spec.fork_name_at_slot::(msg.data.slot), + metadata: Default::default(), + data: msg.clone(), + }, + ))); + } + }, + ) + } +} + +/// Verify that the `slot` is within the acceptable gossip propagation range, with reference +/// to the current slot of the clock. +/// +/// Accounts for `MAXIMUM_GOSSIP_CLOCK_DISPARITY`. +fn verify_propagation_slot_range( + slot_clock: &S, + message_slot: Slot, + spec: &ChainSpec, +) -> Result<(), Error> { + let latest_permissible_slot = slot_clock + .now_with_future_tolerance(spec.maximum_gossip_clock_disparity()) + .ok_or(BeaconChainError::UnableToReadSlot)?; + if message_slot > latest_permissible_slot { + return Err(Error::FutureSlot { + message_slot, + latest_permissible_slot, + }); + } + + let earliest_permissible_slot = slot_clock + .now_with_past_tolerance(spec.maximum_gossip_clock_disparity()) + .ok_or(BeaconChainError::UnableToReadSlot)?; + if message_slot < earliest_permissible_slot { + return Err(Error::PastSlot { + message_slot, + earliest_permissible_slot, + }); + } + + Ok(()) +} diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs new file mode 100644 index 0000000000..477527c0aa --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs @@ -0,0 +1,110 @@ +//! Provides verification for `PayloadAttestationMessage` received from the gossip network. +//! +//! ```ignore +//! types::PayloadAttestationMessage +//! | +//! ▼ +//! VerifiedPayloadAttestationMessage +//! ``` + +use crate::BeaconChainError; +use strum::AsRefStr; +use types::{BeaconStateError, Hash256, Slot}; + +pub mod gossip_verified_payload_attestation; + +pub use gossip_verified_payload_attestation::{ + GossipVerificationContext, VerifiedPayloadAttestationMessage, +}; + +/// Returned when a payload attestation message was not successfully verified. It might not have +/// been verified for two reasons: +/// +/// - The message is malformed or inappropriate for the context (indicated by all variants +/// other than `BeaconChainError`). +/// - The application encountered an internal error whilst attempting to determine validity +/// (the `BeaconChainError` variant) +#[derive(Debug, AsRefStr)] +pub enum Error { + /// The payload attestation message is from a slot that is later than the current slot + /// (with respect to the gossip clock disparity). + /// + /// ## Peer scoring + /// + /// Assuming the local clock is correct, the peer has sent an invalid message. + FutureSlot { + message_slot: Slot, + latest_permissible_slot: Slot, + }, + /// The payload attestation message is from a slot that is prior to the earliest + /// permissible slot (with respect to the gossip clock disparity). + /// + /// ## Peer scoring + /// + /// Assuming the local clock is correct, the peer has sent an invalid message. + PastSlot { + message_slot: Slot, + earliest_permissible_slot: Slot, + }, + /// We have already observed a valid payload attestation message from this validator + /// for this slot. + /// + /// ## Peer scoring + /// + /// The peer is not necessarily faulty. + PriorPayloadAttestationMessageKnown { validator_index: u64, slot: Slot }, + /// The beacon block referenced by the payload attestation message is not known. + /// + /// ## Peer scoring + /// + /// The attestation points to a block we have not yet imported. It's unclear if the + /// attestation is valid or not. + UnknownHeadBlock { beacon_block_root: Hash256 }, + /// The validator index is not a member of the PTC for this slot. + /// + /// ## Peer scoring + /// + /// The peer has sent an invalid message. + NotInPTC { validator_index: u64, slot: Slot }, + /// The validator index is unknown. + /// + /// ## Peer scoring + /// + /// The peer has sent an invalid message. + UnknownValidatorIndex(u64), + /// The signature on the payload attestation message is invalid. + /// + /// ## Peer scoring + /// + /// The peer has sent an invalid message. + InvalidSignature, + /// There was an error whilst processing the payload attestation message. It is not known + /// if it is valid or invalid. + /// + /// ## Peer scoring + /// + /// We were unable to process this message due to an internal error. It's unclear if the + /// message is valid. + BeaconChainError(Box), + /// An error reading beacon state. + /// + /// ## Peer scoring + /// + /// We were unable to process this message due to an internal error. + BeaconStateError(BeaconStateError), +} + +impl From for Error { + fn from(e: BeaconChainError) -> Self { + Error::BeaconChainError(Box::new(e)) + } +} + +impl From for Error { + fn from(e: BeaconStateError) -> Self { + Error::BeaconStateError(e) + } +} + +#[cfg(test)] +mod tests; diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs new file mode 100644 index 0000000000..7faad98e55 --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs @@ -0,0 +1,422 @@ +use std::sync::Arc; +use std::time::Duration; + +use bls::{Keypair, Signature}; +use fork_choice::ForkChoice; +use genesis::{generate_deterministic_keypairs, interop_genesis_state}; +use parking_lot::RwLock; +use proto_array::PayloadStatus; +use slot_clock::{SlotClock, TestingSlotClock}; +use state_processing::AllCaches; +use state_processing::genesis::genesis_block; +use store::{HotColdDB, StoreConfig}; +use types::{ + ChainSpec, Checkpoint, Domain, Epoch, EthSpec, Hash256, MinimalEthSpec, PayloadAttestationData, + PayloadAttestationMessage, SignedBeaconBlock, SignedRoot, Slot, +}; + +use crate::{ + beacon_fork_choice_store::BeaconForkChoiceStore, + beacon_snapshot::BeaconSnapshot, + canonical_head::CanonicalHead, + observed_attesters::ObservedPayloadAttesters, + payload_attestation_verification::{ + Error as PayloadAttestationError, + gossip_verified_payload_attestation::{ + GossipVerificationContext, VerifiedPayloadAttestationMessage, + }, + }, + test_utils::{BeaconChainHarness, EphemeralHarnessType, fork_name_from_env, test_spec}, + validator_pubkey_cache::ValidatorPubkeyCache, +}; + +type E = MinimalEthSpec; +type T = EphemeralHarnessType; + +const NUM_VALIDATORS: usize = 64; + +struct TestContext { + canonical_head: CanonicalHead, + observed_payload_attesters: RwLock>, + validator_pubkey_cache: RwLock>, + slot_clock: TestingSlotClock, + keypairs: Vec, + spec: ChainSpec, + genesis_block_root: Hash256, + store: Arc, store::MemoryStore>>, +} + +impl TestContext { + fn new() -> Self { + let spec = test_spec::(); + let store = Arc::new( + HotColdDB::open_ephemeral(StoreConfig::default(), Arc::new(spec.clone())) + .expect("should open ephemeral store"), + ); + + let keypairs = generate_deterministic_keypairs(NUM_VALIDATORS); + + let mut state = + interop_genesis_state::(&keypairs, 0, Hash256::repeat_byte(0x42), None, &spec) + .expect("should build genesis state"); + + *state.finalized_checkpoint_mut() = Checkpoint { + epoch: Epoch::new(1), + root: Hash256::ZERO, + }; + + let mut block = genesis_block(&state, &spec).expect("should build genesis block"); + *block.state_root_mut() = state + .update_tree_hash_cache() + .expect("should hash genesis state"); + let signed_block = SignedBeaconBlock::from_block(block, Signature::empty()); + let block_root = signed_block.canonical_root(); + + let snapshot = BeaconSnapshot::new( + Arc::new(signed_block.clone()), + None, + block_root, + state.clone(), + ); + + let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store.clone(), snapshot.clone()) + .expect("should create fork choice store"); + let fork_choice = + ForkChoice::from_anchor(fc_store, block_root, &signed_block, &state, None, &spec) + .expect("should create fork choice"); + + let canonical_head = + CanonicalHead::new(fork_choice, Arc::new(snapshot), PayloadStatus::Pending); + + let slot_clock = TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(0), + spec.get_slot_duration(), + ); + // Advance past genesis so `now_with_past_tolerance` doesn't underflow. + slot_clock.set_current_time(spec.get_slot_duration()); + + let validator_pubkey_cache = + ValidatorPubkeyCache::new(&state, store.clone()).expect("should create pubkey cache"); + + Self { + canonical_head, + observed_payload_attesters: RwLock::new(ObservedPayloadAttesters::default()), + validator_pubkey_cache: RwLock::new(validator_pubkey_cache), + slot_clock, + keypairs, + spec, + genesis_block_root: block_root, + store, + } + } + + fn gossip_ctx(&self) -> GossipVerificationContext<'_, T> { + GossipVerificationContext { + slot_clock: &self.slot_clock, + spec: &self.spec, + observed_payload_attesters: &self.observed_payload_attesters, + canonical_head: &self.canonical_head, + validator_pubkey_cache: &self.validator_pubkey_cache, + store: &self.store, + } + } + + fn ptc_members(&self, slot: Slot) -> Vec { + let head = self.canonical_head.cached_head(); + let state = &head.snapshot.beacon_state; + let ptc = state.get_ptc(slot, &self.spec).expect("should get PTC"); + ptc.0.to_vec() + } + + fn sign_payload_attestation( + &self, + data: PayloadAttestationData, + validator_index: u64, + ) -> PayloadAttestationMessage { + let head = self.canonical_head.cached_head(); + let state = &head.snapshot.beacon_state; + let domain = self.spec.get_domain( + data.slot.epoch(E::slots_per_epoch()), + Domain::PTCAttester, + &state.fork(), + state.genesis_validators_root(), + ); + let message = data.signing_root(domain); + let signature = self.keypairs[validator_index as usize].sk.sign(message); + PayloadAttestationMessage { + validator_index, + data, + signature, + } + } +} + +fn make_payload_attestation( + slot: Slot, + validator_index: u64, + beacon_block_root: Hash256, +) -> PayloadAttestationMessage { + PayloadAttestationMessage { + validator_index, + data: PayloadAttestationData { + beacon_block_root, + slot, + payload_present: true, + blob_data_available: true, + }, + signature: Signature::empty(), + } +} + +#[test] +fn future_slot() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + let gossip = ctx.gossip_ctx(); + + let future_slot = Slot::new(5); + let msg = make_payload_attestation(future_slot, 0, ctx.genesis_block_root); + let result = VerifiedPayloadAttestationMessage::new(msg, &gossip); + assert!(matches!( + result, + Err(PayloadAttestationError::FutureSlot { .. }) + )); +} + +#[test] +fn past_slot() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + ctx.slot_clock.set_slot(5); + let gossip = ctx.gossip_ctx(); + + let msg = make_payload_attestation(Slot::new(0), 0, ctx.genesis_block_root); + let result = VerifiedPayloadAttestationMessage::new(msg, &gossip); + assert!(matches!( + result, + Err(PayloadAttestationError::PastSlot { .. }) + )); +} + +#[test] +fn unknown_head_block() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + let gossip = ctx.gossip_ctx(); + + let unknown_root = Hash256::repeat_byte(0xff); + let msg = make_payload_attestation(Slot::new(1), 0, unknown_root); + let result = VerifiedPayloadAttestationMessage::new(msg, &gossip); + assert!( + matches!( + result, + Err(PayloadAttestationError::UnknownHeadBlock { .. }) + ), + "expected UnknownHeadBlock, got: {:?}", + result + ); +} + +#[test] +fn not_in_ptc() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + let gossip = ctx.gossip_ctx(); + let slot = Slot::new(1); + + let ptc_members = ctx.ptc_members(slot); + let non_ptc_validator = (0..NUM_VALIDATORS as u64) + .find(|&i| !ptc_members.contains(&(i as usize))) + .expect("should find non-PTC validator"); + + let msg = make_payload_attestation(slot, non_ptc_validator, ctx.genesis_block_root); + let result = VerifiedPayloadAttestationMessage::new(msg, &gossip); + assert!(matches!( + result, + Err(PayloadAttestationError::NotInPTC { .. }) + )); +} + +#[test] +fn invalid_signature() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + let gossip = ctx.gossip_ctx(); + let slot = Slot::new(1); + + let ptc_members = ctx.ptc_members(slot); + let validator_index = ptc_members[0] as u64; + + let msg = make_payload_attestation(slot, validator_index, ctx.genesis_block_root); + let result = VerifiedPayloadAttestationMessage::new(msg, &gossip); + assert!(matches!( + result, + Err(PayloadAttestationError::InvalidSignature) + )); +} + +#[test] +fn valid_payload_attestation() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + let gossip = ctx.gossip_ctx(); + let slot = Slot::new(1); + + let ptc_members = ctx.ptc_members(slot); + let validator_index = ptc_members[0] as u64; + + let data = PayloadAttestationData { + beacon_block_root: ctx.genesis_block_root, + slot, + payload_present: true, + blob_data_available: true, + }; + let msg = ctx.sign_payload_attestation(data, validator_index); + let result = VerifiedPayloadAttestationMessage::new(msg, &gossip); + assert!( + result.is_ok(), + "expected Ok, got: {:?}", + result.unwrap_err() + ); +} + +#[test] +fn duplicate_after_valid() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + let ctx = TestContext::new(); + let gossip = ctx.gossip_ctx(); + let slot = Slot::new(1); + + let ptc_members = ctx.ptc_members(slot); + let validator_index = ptc_members[0] as u64; + + let data = PayloadAttestationData { + beacon_block_root: ctx.genesis_block_root, + slot, + payload_present: true, + blob_data_available: true, + }; + + let msg1 = ctx.sign_payload_attestation(data.clone(), validator_index); + let result1 = VerifiedPayloadAttestationMessage::new(msg1, &gossip); + assert!( + result1.is_ok(), + "first message should pass: {:?}", + result1.unwrap_err() + ); + + let msg2 = ctx.sign_payload_attestation(data, validator_index); + let result2 = VerifiedPayloadAttestationMessage::new(msg2, &gossip); + assert!(matches!( + result2, + Err(PayloadAttestationError::PriorPayloadAttestationMessageKnown { .. }) + )); +} + +/// Exercises the `partial_state_advance` fallback in gossip verification when +/// the head state is too stale to compute PTC membership (e.g., during a +/// network liveness failure with many missed slots). +#[tokio::test] +async fn stale_head_with_partial_advance() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + + let slots_per_epoch = E::slots_per_epoch(); + // Head at epoch 1, message at epoch 5 — 4 epochs of missed slots. + // This exceeds min_seed_lookahead (1), triggering the fallback path: + // get_advanced_hot_state loads the stored state, then partial_state_advance + // advances it through epoch boundaries to populate ptc_window. + let head_slot = Slot::new(slots_per_epoch); + let missed_epochs = 4; + let target_slot = Slot::new(slots_per_epoch * (1 + missed_epochs)); + let target_epoch = target_slot.epoch(slots_per_epoch); + + // GIVEN a chain with blocks through epoch 1 (so the store has states). + let harness = BeaconChainHarness::builder(E::default()) + .default_spec() + .deterministic_keypairs(64) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + harness.extend_to_slot(head_slot).await; + + let head = harness.chain.canonical_head.cached_head(); + let head_epoch = head.snapshot.beacon_state.current_epoch(); + assert!( + target_epoch > head_epoch + harness.spec.min_seed_lookahead, + "precondition: message epoch must exceed head + min_seed_lookahead to trigger fallback" + ); + + // GIVEN a slot clock advanced to epoch 5 without producing blocks + // (simulating missed slots during a liveness failure). + harness.chain.slot_clock.set_slot(target_slot.as_u64()); + + // Advance a reference state to compute the PTC at the target slot. + let mut reference_state = head.snapshot.beacon_state.clone(); + state_processing::state_advance::partial_state_advance( + &mut reference_state, + Some(head.snapshot.beacon_state_root()), + target_slot, + &harness.spec, + ) + .expect("should advance reference state"); + reference_state + .build_all_caches(&harness.spec) + .expect("should build caches"); + + let ptc = reference_state + .get_ptc(target_slot, &harness.spec) + .expect("should get PTC from reference state"); + let validator_index = *ptc.0.first().expect("PTC should have at least one member") as u64; + + // WHEN a properly-signed payload attestation from a PTC member is verified. + let domain = harness.spec.get_domain( + target_epoch, + Domain::PTCAttester, + &reference_state.fork(), + reference_state.genesis_validators_root(), + ); + let data = PayloadAttestationData { + beacon_block_root: head.head_block_root(), + slot: target_slot, + payload_present: true, + blob_data_available: true, + }; + let message = data.signing_root(domain); + let signature = harness.validator_keypairs[validator_index as usize] + .sk + .sign(message); + let msg = PayloadAttestationMessage { + validator_index, + data, + signature, + }; + + // THEN verification succeeds despite the head being 4 epochs stale. + let result = harness + .chain + .verify_payload_attestation_message_for_gossip(msg); + assert!( + result.is_ok(), + "expected Ok (head epoch {}, message epoch {}), got: {:?}", + head_epoch, + target_epoch, + result.unwrap_err() + ); +} diff --git a/beacon_node/beacon_chain/src/payload_bid_verification/gossip_verified_bid.rs b/beacon_node/beacon_chain/src/payload_bid_verification/gossip_verified_bid.rs index 91945896df..1f3f074598 100644 --- a/beacon_node/beacon_chain/src/payload_bid_verification/gossip_verified_bid.rs +++ b/beacon_node/beacon_chain/src/payload_bid_verification/gossip_verified_bid.rs @@ -6,6 +6,7 @@ use crate::{ proposer_preferences_verification::proposer_preference_cache::GossipVerifiedProposerPreferenceCache, }; use educe::Educe; +use eth2::types::{EventKind, ForkVersionedResponse}; use slot_clock::SlotClock; use state_processing::signature_sets::{ execution_payload_bid_signature_set, get_builder_pubkey_from_state, @@ -233,6 +234,19 @@ impl BeaconChain { %parent_block_root, "Successfully verified gossip payload bid" ); + + if let Some(event_handler) = self.event_handler.as_ref() + && event_handler.has_execution_payload_bid_subscribers() + { + event_handler.register(EventKind::ExecutionPayloadBid(Box::new( + ForkVersionedResponse { + version: self.spec.fork_name_at_slot::(slot), + metadata: Default::default(), + data: (*verified.signed_bid).clone(), + }, + ))); + } + Ok(verified) } Err(e) => { diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index 8f8ed3dd5e..cc61d11f25 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use std::time::Duration; -use eth2::types::{EventKind, SseExecutionPayload}; +use eth2::types::{EventKind, SseExecutionPayload, SseExecutionPayloadAvailable}; use fork_choice::PayloadVerificationStatus; use slot_clock::SlotClock; use store::StoreOp; @@ -226,6 +226,7 @@ impl BeaconChain { signed_envelope, import_data, payload_verification_outcome, + self.spec.clone(), )) } @@ -406,5 +407,18 @@ impl BeaconChain { execution_optimistic: payload_verification_status.is_optimistic(), })); } + + // TODO(gloas): once the DA checker handles envelopes, this event should also be + // emitted from the DA resolution path (similar to `process_availability` for blocks). + if let Some(event_handler) = self.event_handler.as_ref() + && event_handler.has_execution_payload_available_subscribers() + { + event_handler.register(EventKind::ExecutionPayloadAvailable( + SseExecutionPayloadAvailable { + slot: envelope_slot, + block_root, + }, + )); + } } } diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index c043a0d74e..330c87c30d 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -61,6 +61,22 @@ pub struct AvailableEnvelope { } impl AvailableEnvelope { + pub fn new( + execution_block_hash: ExecutionBlockHash, + envelope: Arc>, + columns: DataColumnSidecarList, + columns_available_timestamp: Option, + spec: Arc, + ) -> Self { + Self { + execution_block_hash, + envelope, + columns, + columns_available_timestamp, + spec, + } + } + pub fn message(&self) -> &ExecutionPayloadEnvelope { &self.envelope.message } @@ -105,6 +121,7 @@ pub struct EnvelopeProcessingSnapshot { /// fully available. /// 2. `AvailabilityPending`: This envelope hasn't received all required blobs to consider it /// fully available. +#[allow(dead_code)] pub enum ExecutedEnvelope { Available(AvailableExecutedEnvelope), AvailabilityPending(AvailabilityPendingExecutedEnvelope), @@ -115,6 +132,7 @@ impl ExecutedEnvelope { envelope: MaybeAvailableEnvelope, import_data: EnvelopeImportData, payload_verification_outcome: PayloadVerificationOutcome, + spec: Arc, ) -> Self { match envelope { MaybeAvailableEnvelope::Available(available_envelope) => { diff --git a/beacon_node/beacon_chain/src/pending_payload_envelopes.rs b/beacon_node/beacon_chain/src/pending_payload_envelopes.rs index 351783832d..293553ef54 100644 --- a/beacon_node/beacon_chain/src/pending_payload_envelopes.rs +++ b/beacon_node/beacon_chain/src/pending_payload_envelopes.rs @@ -6,7 +6,12 @@ //! and publishes the payload. use std::collections::HashMap; -use types::{EthSpec, ExecutionPayloadEnvelope, Slot}; +use types::{BlobsList, EthSpec, ExecutionPayloadEnvelope, Slot}; + +pub struct PendingEnvelopeData { + pub envelope: ExecutionPayloadEnvelope, + pub blobs: Option>, +} /// Cache for pending execution payload envelopes awaiting publishing. /// @@ -16,7 +21,7 @@ pub struct PendingPayloadEnvelopes { /// Maximum number of slots to keep envelopes before pruning. max_slot_age: u64, /// The envelopes, keyed by slot. - envelopes: HashMap>, + envelopes: HashMap>, } impl Default for PendingPayloadEnvelopes { @@ -38,19 +43,24 @@ impl PendingPayloadEnvelopes { } /// Insert a pending envelope into the cache. - pub fn insert(&mut self, slot: Slot, envelope: ExecutionPayloadEnvelope) { + pub fn insert(&mut self, slot: Slot, data: PendingEnvelopeData) { // TODO(gloas): we may want to check for duplicates here, which shouldn't be allowed - self.envelopes.insert(slot, envelope); + self.envelopes.insert(slot, data); } /// Get a pending envelope by slot. pub fn get(&self, slot: Slot) -> Option<&ExecutionPayloadEnvelope> { - self.envelopes.get(&slot) + self.envelopes.get(&slot).map(|d| &d.envelope) + } + + /// Remove and return the blobs and proofs for a slot, leaving the envelope in place. + pub fn take_blobs(&mut self, slot: Slot) -> Option> { + self.envelopes.get_mut(&slot).and_then(|d| d.blobs.take()) } /// Remove and return a pending envelope by slot. pub fn remove(&mut self, slot: Slot) -> Option> { - self.envelopes.remove(&slot) + self.envelopes.remove(&slot).map(|d| d.envelope) } /// Check if an envelope exists for the given slot. @@ -85,15 +95,18 @@ mod tests { type E = MainnetEthSpec; - fn make_envelope(slot: Slot) -> ExecutionPayloadEnvelope { - ExecutionPayloadEnvelope { - payload: ExecutionPayloadGloas { - slot_number: slot, - ..ExecutionPayloadGloas::default() + fn make_envelope(slot: Slot) -> PendingEnvelopeData { + PendingEnvelopeData { + envelope: ExecutionPayloadEnvelope { + payload: ExecutionPayloadGloas { + slot_number: slot, + ..ExecutionPayloadGloas::default() + }, + execution_requests: ExecutionRequests::default(), + builder_index: 0, + beacon_block_root: Hash256::ZERO, }, - execution_requests: ExecutionRequests::default(), - builder_index: 0, - beacon_block_root: Hash256::ZERO, + blobs: None, } } @@ -101,33 +114,73 @@ mod tests { fn insert_and_get() { let mut cache = PendingPayloadEnvelopes::::default(); let slot = Slot::new(1); - let envelope = make_envelope(slot); + let data = make_envelope(slot); + let expected_envelope = data.envelope.clone(); assert!(!cache.contains(slot)); assert_eq!(cache.len(), 0); - cache.insert(slot, envelope.clone()); + cache.insert(slot, data); assert!(cache.contains(slot)); assert_eq!(cache.len(), 1); - assert_eq!(cache.get(slot), Some(&envelope)); + assert_eq!(cache.get(slot), Some(&expected_envelope)); } #[test] fn remove() { let mut cache = PendingPayloadEnvelopes::::default(); let slot = Slot::new(1); - let envelope = make_envelope(slot); + let data = make_envelope(slot); + let expected_envelope = data.envelope.clone(); - cache.insert(slot, envelope.clone()); + cache.insert(slot, data); assert!(cache.contains(slot)); let removed = cache.remove(slot); - assert_eq!(removed, Some(envelope)); + assert_eq!(removed, Some(expected_envelope)); assert!(!cache.contains(slot)); assert_eq!(cache.len(), 0); } + #[test] + fn take_blobs_returns_once() { + let mut cache = PendingPayloadEnvelopes::::default(); + let slot = Slot::new(1); + + let blobs = BlobsList::::default(); + let data = PendingEnvelopeData { + envelope: make_envelope(slot).envelope, + blobs: Some(blobs), + }; + cache.insert(slot, data); + + // First take returns the blobs + let taken = cache.take_blobs(slot); + assert!(taken.is_some()); + + // Second take returns None — blobs are consumed + let taken_again = cache.take_blobs(slot); + assert!(taken_again.is_none()); + + // Envelope is still in the cache + assert!(cache.contains(slot)); + assert!(cache.get(slot).is_some()); + } + + #[test] + fn take_blobs_returns_none_when_absent() { + let mut cache = PendingPayloadEnvelopes::::default(); + let slot = Slot::new(1); + + // Insert with no blobs + cache.insert(slot, make_envelope(slot)); + assert!(cache.take_blobs(slot).is_none()); + + // Non-existent slot + assert!(cache.take_blobs(Slot::new(99)).is_none()); + } + #[test] fn prune_old_envelopes() { let mut cache = PendingPayloadEnvelopes::::new(2); diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 39da3ce0ae..de944af5b9 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -86,6 +86,8 @@ pub const FORK_NAME_ENV_VAR: &str = "FORK_NAME"; // `beacon_node/execution_layer/src/test_utils/fixtures/mainnet/test_blobs_bundle.ssz` pub const TEST_DATA_COLUMN_SIDECARS_SSZ: &[u8] = include_bytes!("test_utils/fixtures/test_data_column_sidecars.ssz"); +pub const TEST_DATA_COLUMN_SIDECARS_GLOAS_SSZ: &[u8] = + include_bytes!("test_utils/fixtures/test_data_column_sidecars_gloas.ssz"); // Default target aggregators to set during testing, this ensures an aggregator at each slot. // @@ -3808,30 +3810,27 @@ pub fn generate_data_column_sidecars_from_block( block: &SignedBeaconBlock, spec: &ChainSpec, ) -> DataColumnSidecarList { - let signed_block_header = block.signed_block_header(); - - // load the precomputed column sidecar to avoid computing them for every block in the tests. - let template_data_columns = RuntimeVariableList::>::from_ssz_bytes( - TEST_DATA_COLUMN_SIDECARS_SSZ, - E::number_of_columns(), - ) - .unwrap(); - // Load the precomputed column sidecar to avoid computing them for every block in the tests. // Then repeat the cells and proofs for every blob if block.fork_name_unchecked().gloas_enabled() { - // For Gloas, commitments are in the bid, not the block body - let kzg_commitments = block + let kzg_commitments = &block .message() .body() .signed_execution_payload_bid() - .unwrap() + .expect("Gloas block should have a payload bid") .message - .blob_kzg_commitments - .clone(); + .blob_kzg_commitments; if kzg_commitments.is_empty() { return vec![]; } + let num_blobs = kzg_commitments.len(); + let signed_block_header = block.signed_block_header(); + let template_data_columns = + RuntimeVariableList::>::from_ssz_bytes( + TEST_DATA_COLUMN_SIDECARS_GLOAS_SSZ, + E::number_of_columns(), + ) + .unwrap(); // TODO(gloas): The fixture is Fulu format. Generate Gloas-specific fixture once format // is finalized, or compute columns dynamically for Gloas tests. @@ -3850,7 +3849,7 @@ pub fn generate_data_column_sidecars_from_block( .collect::<(Vec<_>, Vec<_>)>(); let blob_cells_and_proofs_vec = - vec![(cells.try_into().unwrap(), proofs.try_into().unwrap()); kzg_commitments.len()]; + vec![(cells.try_into().unwrap(), proofs.try_into().unwrap()); num_blobs]; build_data_column_sidecars_gloas( signed_block_header.message.tree_hash_root(), @@ -3860,16 +3859,25 @@ pub fn generate_data_column_sidecars_from_block( ) .unwrap() } else { - // For pre-Gloas forks, commitments are in the block body let kzg_commitments = block.message().body().blob_kzg_commitments().unwrap(); if kzg_commitments.is_empty() { return vec![]; } + let kzg_commitments_inclusion_proof = block .message() .body() .kzg_commitments_merkle_proof() .unwrap(); + let signed_block_header = block.signed_block_header(); + + // load the precomputed column sidecar to avoid computing them for every block in the tests. + let template_data_columns = + RuntimeVariableList::>::from_ssz_bytes( + TEST_DATA_COLUMN_SIDECARS_SSZ, + E::number_of_columns(), + ) + .unwrap(); let (cells, proofs) = template_data_columns .into_iter() diff --git a/beacon_node/beacon_chain/src/test_utils/fixtures/test_data_column_sidecars_gloas.ssz b/beacon_node/beacon_chain/src/test_utils/fixtures/test_data_column_sidecars_gloas.ssz new file mode 100644 index 0000000000..554b27844b Binary files /dev/null and b/beacon_node/beacon_chain/src/test_utils/fixtures/test_data_column_sidecars_gloas.ssz differ diff --git a/beacon_node/beacon_chain/tests/column_verification.rs b/beacon_node/beacon_chain/tests/column_verification.rs index 5846ccfd7e..06a5f44e5f 100644 --- a/beacon_node/beacon_chain/tests/column_verification.rs +++ b/beacon_node/beacon_chain/tests/column_verification.rs @@ -115,6 +115,78 @@ async fn rpc_columns_with_invalid_header_signature() { )); } +/// Test that Gloas block production caches blobs alongside the envelope, and that +/// data columns can be built from those cached blobs. +#[tokio::test] +async fn gloas_envelope_blobs_produce_valid_columns() { + let spec = Arc::new(test_spec::()); + if !spec.is_gloas_scheduled() { + return; + } + + let harness = get_harness(VALIDATOR_COUNT, spec.clone(), NodeCustodyType::Supernode); + harness.execution_block_generator().set_min_blob_count(1); + + // Build some chain depth. + let num_blocks = E::slots_per_epoch() as usize; + harness + .extend_chain( + num_blocks, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + harness.advance_slot(); + let slot = harness.get_current_slot(); + + // Produce a Gloas block via the harness. This caches envelope + blobs. + let state = harness.get_current_state(); + let (block_contents, opt_envelope, _post_state) = + harness.make_block_with_envelope(state, slot).await; + let signed_block = &block_contents.0; + + assert!( + opt_envelope.is_some(), + "Gloas block production should produce an envelope" + ); + + // Verify the block has blob commitments in the bid. + let bid = signed_block + .message() + .body() + .signed_execution_payload_bid() + .expect("Gloas block should have a payload bid"); + assert!( + !bid.message.blob_kzg_commitments.is_empty(), + "Block should have blob KZG commitments" + ); + + // Generate data columns from the block (using test fixtures, same as the harness does). + let data_column_sidecars = + generate_data_column_sidecars_from_block(signed_block, &harness.chain.spec); + assert_eq!( + data_column_sidecars.len(), + E::number_of_columns(), + "Should produce the correct number of data columns" + ); + + // Verify all columns are Gloas-format. + for col in &data_column_sidecars { + assert!( + col.as_gloas().is_ok(), + "Data column sidecar should be Gloas variant" + ); + let gloas_col = col.as_gloas().expect("should be Gloas sidecar"); + assert_eq!(gloas_col.beacon_block_root, signed_block.canonical_root()); + assert_eq!(gloas_col.slot, slot); + } + + // End-to-end DA flow (process_block → process_envelope → process_rpc_custody_columns) + // is not exercised here: Gloas blocks are not gated on columns at block-import time + // and the envelope/column gating belongs in a dedicated test once the DA path matures. +} + // Regression test for verify_header_signature bug: it uses head_fork() which is wrong for fork blocks #[tokio::test] async fn verify_header_signature_fork_block_bug() { diff --git a/beacon_node/beacon_chain/tests/events.rs b/beacon_node/beacon_chain/tests/events.rs index 5305965f0f..e943514c4e 100644 --- a/beacon_node/beacon_chain/tests/events.rs +++ b/beacon_node/beacon_chain/tests/events.rs @@ -10,8 +10,8 @@ use std::sync::Arc; use types::data::FixedBlobSidecarList; use types::test_utils::TestRandom; use types::{ - BlobSidecar, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSidecarGloas, EthSpec, - MinimalEthSpec, Slot, + BlobSidecar, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSidecarGloas, Domain, EthSpec, + MinimalEthSpec, PayloadAttestationData, PayloadAttestationMessage, SignedRoot, Slot, }; type E = MinimalEthSpec; @@ -258,3 +258,177 @@ async fn head_event_on_block_import() { panic!("Expected Head event, got {:?}", head_event); } } + +/// Verifies that `execution_payload_gossip` fires at gossip verification time, and +/// `execution_payload` + `execution_payload_available` fire at import time. +#[tokio::test] +async fn execution_payload_envelope_events() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + + let harness = BeaconChainHarness::builder(E::default()) + .default_spec() + .deterministic_keypairs(64) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + harness.extend_to_slot(Slot::new(1)).await; + + let state = harness.get_current_state(); + let target_slot = Slot::new(2); + harness.advance_slot(); + let (block_contents, opt_envelope, _new_state) = + harness.make_block_with_envelope(state, target_slot).await; + + let block_root = block_contents.0.canonical_root(); + + harness + .process_block(target_slot, block_root, block_contents) + .await + .expect("block should be processed"); + + let signed_envelope = opt_envelope.expect("Gloas block should produce an envelope"); + + let event_handler = harness.chain.event_handler.as_ref().unwrap(); + let mut gossip_receiver = event_handler.subscribe_execution_payload_gossip(); + let mut payload_receiver = event_handler.subscribe_execution_payload(); + let mut available_receiver = event_handler.subscribe_execution_payload_available(); + + // Stage 1: gossip verification fires execution_payload_gossip only. + let gossip_verified = harness + .chain + .verify_envelope_for_gossip(Arc::new(signed_envelope)) + .await + .expect("envelope gossip verification should succeed"); + + let gossip_event = gossip_receiver + .try_recv() + .expect("should receive execution_payload_gossip after gossip verification"); + if let EventKind::ExecutionPayloadGossip(sse) = gossip_event { + assert_eq!(sse.slot, target_slot); + assert_eq!(sse.block_root, block_root); + } else { + panic!( + "Expected ExecutionPayloadGossip event, got {:?}", + gossip_event + ); + } + assert!(payload_receiver.try_recv().is_err()); + assert!(available_receiver.try_recv().is_err()); + + // Stage 2: import fires execution_payload and execution_payload_available. + harness + .chain + .process_execution_payload_envelope( + block_root, + gossip_verified, + beacon_chain::NotifyExecutionLayer::Yes, + types::BlockImportSource::Gossip, + #[allow(clippy::result_large_err)] + || Ok(()), + ) + .await + .expect("envelope import should succeed"); + + let payload_event = payload_receiver + .try_recv() + .expect("should receive execution_payload after import"); + if let EventKind::ExecutionPayload(sse) = payload_event { + assert_eq!(sse.slot, target_slot); + assert_eq!(sse.block_root, block_root); + } else { + panic!("Expected ExecutionPayload event, got {:?}", payload_event); + } + + let available_event = available_receiver + .try_recv() + .expect("should receive execution_payload_available after import"); + if let EventKind::ExecutionPayloadAvailable(sse) = available_event { + assert_eq!(sse.slot, target_slot); + assert_eq!(sse.block_root, block_root); + } else { + panic!( + "Expected ExecutionPayloadAvailable event, got {:?}", + available_event + ); + } + + assert!( + gossip_receiver.try_recv().is_err(), + "no extra gossip events should fire during import" + ); +} + +/// Verifies that a `payload_attestation_message` event is emitted when a payload attestation +/// message passes gossip verification. +#[tokio::test] +async fn payload_attestation_message_event_on_gossip_verification() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + + let harness = BeaconChainHarness::builder(E::default()) + .default_spec() + .deterministic_keypairs(64) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + // Advance chain to have a valid head block. + let target_slot = Slot::new(1); + harness.extend_to_slot(target_slot).await; + + let head = harness.chain.canonical_head.cached_head(); + let head_state = &head.snapshot.beacon_state; + + // Get a PTC member for this slot. + let ptc = head_state + .get_ptc(target_slot, &harness.spec) + .expect("should get PTC"); + let validator_index = *ptc.0.first().expect("PTC should have at least one member") as u64; + + // Sign a payload attestation. + let target_epoch = target_slot.epoch(E::slots_per_epoch()); + let domain = harness.spec.get_domain( + target_epoch, + Domain::PTCAttester, + &head_state.fork(), + head_state.genesis_validators_root(), + ); + let data = PayloadAttestationData { + beacon_block_root: head.head_block_root(), + slot: target_slot, + payload_present: true, + blob_data_available: true, + }; + let message = data.signing_root(domain); + let signature = harness.validator_keypairs[validator_index as usize] + .sk + .sign(message); + let msg = PayloadAttestationMessage { + validator_index, + data: data.clone(), + signature: signature.clone(), + }; + + // Subscribe before verification. + let event_handler = harness.chain.event_handler.as_ref().unwrap(); + let mut receiver = event_handler.subscribe_payload_attestation_message(); + + // Verify the attestation through the gossip path. + harness + .chain + .verify_payload_attestation_message_for_gossip(msg) + .expect("verification should succeed"); + + // Assert the event was emitted. + let event = receiver.try_recv().expect("should receive event"); + if let EventKind::PayloadAttestationMessage(versioned) = event { + assert_eq!(versioned.data.validator_index, validator_index); + assert_eq!(versioned.data.data, data); + } else { + panic!("Expected PayloadAttestationMessage event, got {:?}", event); + } +} diff --git a/beacon_node/beacon_chain/tests/prepare_payload.rs b/beacon_node/beacon_chain/tests/prepare_payload.rs index dc4f999eb2..1d23990b80 100644 --- a/beacon_node/beacon_chain/tests/prepare_payload.rs +++ b/beacon_node/beacon_chain/tests/prepare_payload.rs @@ -573,3 +573,121 @@ async fn prepare_payload_on_fork_boundary( advanced state" ); } + +#[tokio::test] +async fn gloas_block_production_caches_blobs_for_column_publishing() { + use beacon_chain::ProduceBlockVerification; + use beacon_chain::graffiti_calculator::GraffitiSettings; + use eth2::types::GraffitiPolicy; + + let spec = Arc::new(test_spec::()); + if !spec.fork_name_at_slot::(Slot::new(0)).gloas_enabled() { + return; + } + + let db_path = tempdir().unwrap(); + let store = get_store(&db_path, spec.clone()); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + + // Configure the mock EL to produce at least 1 blob per block. + harness.execution_block_generator().set_min_blob_count(1); + + // Extend the chain a few slots to get past genesis. + harness + .extend_chain( + (E::slots_per_epoch() as usize) + 1, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + harness.advance_slot(); + let slot = harness.get_current_slot(); + + // Produce a Gloas block directly via produce_block_on_state_gloas so we can + // inspect the pending cache before it's consumed. + let mut state = harness.get_current_state(); + complete_state_advance(&mut state, None, slot, &spec).unwrap(); + state.build_caches(&spec).unwrap(); + + let proposer_index = state.get_beacon_proposer_index(slot, &spec).unwrap(); + let randao_reveal = harness.sign_randao_reveal(&state, proposer_index, slot); + + let (parent_payload_status, parent_envelope) = { + let head = harness.chain.canonical_head.cached_head(); + ( + head.head_payload_status(), + head.snapshot.execution_envelope.clone(), + ) + }; + + let graffiti_settings = GraffitiSettings::new( + Some(Graffiti::default()), + Some(GraffitiPolicy::PreserveUserGraffiti), + ); + + let (_block, _post_state, _value) = harness + .chain + .produce_block_on_state_gloas( + state, + None, + parent_payload_status, + parent_envelope, + slot, + randao_reveal, + graffiti_settings, + ProduceBlockVerification::VerifyRandao, + ) + .await + .unwrap(); + + // The envelope + blobs should now be in the pending cache. + assert!( + harness + .chain + .pending_payload_envelopes + .read() + .contains(slot), + "Pending cache should contain an envelope for the produced slot" + ); + + // Take the blobs from the cache — this is what publish_execution_payload_envelope does. + let blobs = harness + .chain + .pending_payload_envelopes + .write() + .take_blobs(slot); + + assert!( + blobs.is_some(), + "Blobs should be cached alongside the envelope" + ); + + let blobs = blobs.unwrap(); + assert!( + !blobs.is_empty(), + "Blobs should be non-empty when min_blob_count >= 1" + ); + + // Verify take_blobs is consume-once. + let second_take = harness + .chain + .pending_payload_envelopes + .write() + .take_blobs(slot); + assert!( + second_take.is_none(), + "Blobs should only be consumable once" + ); + + // The envelope should still be in the cache after taking blobs. + assert!( + harness + .chain + .pending_payload_envelopes + .read() + .get(slot) + .is_some(), + "Envelope should remain in cache after taking blobs" + ); +} diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index 0d73a6bf7a..bdb4228765 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -360,7 +360,7 @@ pub fn spawn_notifier( let block_info = if current_slot > head_slot { " … empty".to_string() } else { - head_root.to_string() + head_root.short().to_string() }; let block_hash = match beacon_chain.canonical_head.head_execution_status() { @@ -393,7 +393,7 @@ pub fn spawn_notifier( info!( peers = peer_count_pretty(connected_peer_count), exec_hash = block_hash, - finalized_root = %finalized_checkpoint.root, + finalized_root = %finalized_checkpoint.root.short(), finalized_epoch = %finalized_checkpoint.epoch, epoch = %current_epoch, block = block_info, @@ -404,7 +404,7 @@ pub fn spawn_notifier( metrics::set_gauge(&metrics::IS_SYNCED, 0); info!( peers = peer_count_pretty(connected_peer_count), - finalized_root = %finalized_checkpoint.root, + finalized_root = %finalized_checkpoint.root.short(), finalized_epoch = %finalized_checkpoint.epoch, %head_slot, %current_slot, diff --git a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs index 382b967b43..06a5915c08 100644 --- a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs +++ b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs @@ -1,10 +1,12 @@ use crate::block_id::BlockId; +use crate::publish_blocks::publish_column_sidecars; use crate::task_spawner::{Priority, TaskSpawner}; use crate::utils::{ChainFilter, EthV1Filter, NetworkTxFilter, ResponseFilter, TaskSpawnerFilter}; use crate::version::{ ResponseIncludesVersion, add_consensus_version_header, add_ssz_content_type_header, execution_optimistic_finalized_beacon_response, }; +use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use bytes::Bytes; use eth2::types as api_types; @@ -12,10 +14,11 @@ use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER}; use lighthouse_network::PubsubMessage; use network::NetworkMessage; use ssz::{Decode, Encode}; +use std::future::Future; use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; -use tracing::{info, warn}; -use types::SignedExecutionPayloadEnvelope; +use tracing::{debug, error, info, warn}; +use types::{EthSpec, SignedExecutionPayloadEnvelope}; use warp::{ Filter, Rejection, Reply, hyper::{Body, Response}, @@ -85,7 +88,9 @@ pub(crate) fn post_beacon_execution_payload_envelope( ) .boxed() } -/// Publishes a signed execution payload envelope to the network. +/// Publishes a signed execution payload envelope to the network. Implements +/// `POST /eth/v1/beacon/execution_payload_envelope` per the in-flight beacon-APIs PR +/// . pub async fn publish_execution_payload_envelope( envelope: SignedExecutionPayloadEnvelope, chain: Arc>, @@ -109,7 +114,24 @@ pub async fn publish_execution_payload_envelope( "Publishing signed execution payload envelope to network" ); - // Publish to the network + let blobs_and_proofs = chain.pending_payload_envelopes.write().take_blobs(slot); + + // Spawn the column-build task (CPU-bound KZG cell-and-proof computation) before + // publishing the envelope so it runs in parallel with envelope gossip, narrowing + // the window in which peers see envelope-without-columns. If envelope publication + // fails below, dropping this future drops the spawned `JoinHandle` (the running + // closure on the blocking pool finishes and is then discarded — no work cancellation). + let column_build_future = match blobs_and_proofs { + Some(blobs) if !blobs.is_empty() => Some(spawn_build_gloas_data_columns_task( + &chain, + beacon_block_root, + slot, + blobs, + )?), + _ => None, + }; + + // Publish the envelope to the network. crate::utils::publish_pubsub_message( network_tx, PubsubMessage::ExecutionPayload(Box::new(envelope)), @@ -121,9 +143,130 @@ pub async fn publish_execution_payload_envelope( ) })?; + // From here on the envelope is on the wire. `take_blobs` already consumed the cache + // entry, so a retry would not republish columns; returning Err would mislead the + // caller. Log column-build/publish failures and fall through to `Ok`. + if let Some(column_build_future) = column_build_future { + let gossip_verified_columns = match column_build_future.await { + Ok(columns) => columns, + Err(e) => { + error!( + %slot, + error = ?e, + "Failed to build data columns after envelope publication" + ); + return Ok(warp::reply().into_response()); + } + }; + + if !gossip_verified_columns.is_empty() { + if let Err(e) = publish_column_sidecars(network_tx, &gossip_verified_columns, &chain) { + error!( + %slot, + error = ?e, + "Failed to publish data column sidecars after envelope publication" + ); + return Ok(warp::reply().into_response()); + } + + let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + let sampling_column_indices = chain.sampling_columns_for_epoch(epoch); + let sampling_columns = gossip_verified_columns + .into_iter() + .filter(|col| sampling_column_indices.contains(&col.index())) + .collect::>(); + + // Local processing only — envelope already broadcast, so log and fall through. + if !sampling_columns.is_empty() + && let Err(e) = + Box::pin(chain.process_gossip_data_columns(sampling_columns, || Ok(()))).await + { + error!( + %slot, + error = ?e, + "Failed to process sampling data columns during envelope publication" + ); + } + } + } + Ok(warp::reply().into_response()) } +fn spawn_build_gloas_data_columns_task( + chain: &Arc>, + beacon_block_root: types::Hash256, + slot: types::Slot, + blobs: types::BlobsList, +) -> Result>, Rejection>>, Rejection> { + let chain_for_build = chain.clone(); + let handle = chain + .task_executor + .spawn_blocking_handle( + move || build_gloas_data_columns(&chain_for_build, beacon_block_root, slot, &blobs), + "build_gloas_data_columns", + ) + .ok_or_else(|| warp_utils::reject::custom_server_error("runtime shutdown".to_string()))?; + + Ok(async move { + handle + .await + .map_err(|_| warp_utils::reject::custom_server_error("join error".to_string()))? + }) +} + +fn build_gloas_data_columns( + chain: &BeaconChain, + beacon_block_root: types::Hash256, + slot: types::Slot, + blobs: &types::BlobsList, +) -> Result>, Rejection> { + let blob_refs: Vec<_> = blobs.iter().collect(); + let data_column_sidecars = beacon_chain::kzg_utils::blobs_to_data_column_sidecars_gloas( + &blob_refs, + beacon_block_root, + slot, + &chain.kzg, + &chain.spec, + ) + .map_err(|e| { + error!( + error = ?e, + %slot, + "Failed to build data column sidecars for envelope" + ); + warp_utils::reject::custom_server_error(format!("{e:?}")) + })?; + + let gossip_verified_columns = data_column_sidecars + .into_iter() + .filter_map(|col| { + let index = *col.index(); + match GossipVerifiedDataColumn::new_for_block_publishing(col, chain) { + Ok(verified) => Some(verified), + Err(GossipDataColumnError::PriorKnownUnpublished) => None, + Err(e) => { + warn!( + %slot, + column_index = index, + error = ?e, + "Locally-built data column failed gossip verification" + ); + None + } + } + }) + .collect::>(); + + debug!( + %slot, + column_count = gossip_verified_columns.len(), + "Built data columns for envelope publication" + ); + + Ok(gossip_verified_columns) +} + // TODO(gloas): add tests for this endpoint once we support importing payloads into the db // GET beacon/execution_payload_envelope/{block_id} pub(crate) fn get_beacon_execution_payload_envelope( diff --git a/beacon_node/http_api/src/beacon/pool.rs b/beacon_node/http_api/src/beacon/pool.rs index 059573c317..c6b8a69643 100644 --- a/beacon_node/http_api/src/beacon/pool.rs +++ b/beacon_node/http_api/src/beacon/pool.rs @@ -1,24 +1,31 @@ use crate::task_spawner::{Priority, TaskSpawner}; -use crate::utils::{NetworkTxFilter, OptionalConsensusVersionHeaderFilter, ResponseFilter}; +use crate::utils::{ + ChainFilter, EthV1Filter, NetworkTxFilter, OptionalConsensusVersionHeaderFilter, + ResponseFilter, TaskSpawnerFilter, +}; use crate::version::{ ResponseIncludesVersion, V1, V2, add_consensus_version_header, beacon_response, unsupported_version_rejection, }; use crate::{sync_committees, utils}; use beacon_chain::observed_operations::ObservationOutcome; +use beacon_chain::payload_attestation_verification::Error as PayloadAttestationError; use beacon_chain::{BeaconChain, BeaconChainTypes}; +use bytes::Bytes; use eth2::types::{AttestationPoolQuery, EndpointVersion, Failure, GenericResponse}; use lighthouse_network::PubsubMessage; use network::NetworkMessage; use operation_pool::ReceivedPreCapella; use slot_clock::SlotClock; +use ssz::{Decode, Encode}; use std::collections::HashSet; use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, warn}; use types::{ - Attestation, AttestationData, AttesterSlashing, ForkName, ProposerSlashing, - SignedBlsToExecutionChange, SignedVoluntaryExit, SingleAttestation, SyncCommitteeMessage, + Attestation, AttestationData, AttesterSlashing, ForkName, PayloadAttestationMessage, + ProposerSlashing, SignedBlsToExecutionChange, SignedVoluntaryExit, SingleAttestation, + SyncCommitteeMessage, }; use warp::filters::BoxedFilter; use warp::{Filter, Reply}; @@ -520,3 +527,137 @@ pub fn post_beacon_pool_attestations_v2( ) .boxed() } + +/// POST beacon/pool/payload_attestations (JSON) +pub fn post_beacon_pool_payload_attestations( + network_tx_filter: &NetworkTxFilter, + optional_consensus_version_header_filter: OptionalConsensusVersionHeaderFilter, + beacon_pool_path: &BeaconPoolPathFilter, +) -> ResponseFilter { + beacon_pool_path + .clone() + .and(warp::path("payload_attestations")) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(optional_consensus_version_header_filter) + .and(network_tx_filter.clone()) + .then( + |task_spawner: TaskSpawner, + chain: Arc>, + messages: Vec, + _fork_name: Option, + network_tx: UnboundedSender>| { + task_spawner.blocking_json_task(Priority::P0, move || { + publish_payload_attestation_messages(&chain, &network_tx, messages) + }) + }, + ) + .boxed() +} + +/// POST beacon/pool/payload_attestations (SSZ) +pub fn post_beacon_pool_payload_attestations_ssz( + eth_v1: EthV1Filter, + task_spawner_filter: TaskSpawnerFilter, + chain_filter: ChainFilter, + network_tx_filter: NetworkTxFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("beacon")) + .and(warp::path("pool")) + .and(warp::path("payload_attestations")) + .and(warp::path::end()) + .and(warp::body::bytes()) + .and(task_spawner_filter) + .and(chain_filter) + .and(network_tx_filter) + .then( + |body_bytes: Bytes, + task_spawner: TaskSpawner, + chain: Arc>, + network_tx: UnboundedSender>| { + task_spawner.blocking_json_task(Priority::P0, move || { + let item_len = ::ssz_fixed_len(); + if !body_bytes.len().is_multiple_of(item_len) { + return Err(warp_utils::reject::custom_bad_request(format!( + "SSZ body length {} is not a multiple of PayloadAttestationMessage size {}", + body_bytes.len(), + item_len, + ))); + } + let messages: Vec = body_bytes + .chunks(item_len) + .map(|chunk| { + PayloadAttestationMessage::from_ssz_bytes(chunk).map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "invalid SSZ: {e:?}" + )) + }) + }) + .collect::>()?; + + publish_payload_attestation_messages(&chain, &network_tx, messages) + }) + }, + ) + .boxed() +} + +fn publish_payload_attestation_messages( + chain: &BeaconChain, + network_tx: &UnboundedSender>, + messages: Vec, +) -> Result<(), warp::Rejection> { + let mut failures = vec![]; + let mut num_already_known = 0; + + for (index, message) in messages.into_iter().enumerate() { + match chain.verify_payload_attestation_message_for_gossip(message.clone()) { + Ok(verified) => { + utils::publish_pubsub_message( + network_tx, + PubsubMessage::PayloadAttestation(Box::new(message)), + )?; + + if let Err(e) = chain.apply_payload_attestation_to_fork_choice( + verified.indexed_payload_attestation(), + verified.ptc(), + ) { + warn!( + error = ?e, + request_index = index, + "Payload attestation invalid for fork choice" + ); + } + } + Err(PayloadAttestationError::PriorPayloadAttestationMessageKnown { .. }) => { + num_already_known += 1; + } + // TODO(gloas): requeue for reprocessing like attestations do. + Err(e) => { + error!( + error = ?e, + request_index = index, + "Failure verifying payload attestation for gossip" + ); + failures.push(Failure::new(index, format!("{e:?}"))); + } + } + } + + if num_already_known > 0 { + debug!( + count = num_already_known, + "Some payload attestations already known" + ); + } + + if failures.is_empty() { + Ok(()) + } else { + Err(warp_utils::reject::indexed_bad_request( + "error processing payload attestations".to_string(), + failures, + )) + } +} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index bd80dd1e82..b2d069f384 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1454,7 +1454,7 @@ pub fn serve( let post_beacon_pool_attestations_v2 = post_beacon_pool_attestations_v2( &network_tx_filter, - optional_consensus_version_header_filter, + optional_consensus_version_header_filter.clone(), &beacon_pool_path_v2, ); @@ -1487,6 +1487,21 @@ pub fn serve( let post_beacon_pool_sync_committees = post_beacon_pool_sync_committees(&network_tx_filter, &beacon_pool_path); + // POST beacon/pool/payload_attestations + let post_beacon_pool_payload_attestations = post_beacon_pool_payload_attestations( + &network_tx_filter, + optional_consensus_version_header_filter, + &beacon_pool_path, + ); + + // POST beacon/pool/payload_attestations (SSZ) + let post_beacon_pool_payload_attestations_ssz = post_beacon_pool_payload_attestations_ssz( + eth_v1.clone(), + task_spawner_filter.clone(), + chain_filter.clone(), + network_tx_filter.clone(), + ); + // GET beacon/pool/bls_to_execution_changes let get_beacon_pool_bls_to_execution_changes = get_beacon_pool_bls_to_execution_changes(&beacon_pool_path); @@ -3400,7 +3415,8 @@ pub fn serve( .uor(post_beacon_blocks_v2_ssz) .uor(post_beacon_blinded_blocks_ssz) .uor(post_beacon_blinded_blocks_v2_ssz) - .uor(post_beacon_execution_payload_envelope_ssz), + .uor(post_beacon_execution_payload_envelope_ssz) + .uor(post_beacon_pool_payload_attestations_ssz), ) .uor(post_beacon_blocks) .uor(post_beacon_blinded_blocks) @@ -3411,6 +3427,7 @@ pub fn serve( .uor(post_beacon_pool_proposer_slashings) .uor(post_beacon_pool_voluntary_exits) .uor(post_beacon_pool_sync_committees) + .uor(post_beacon_pool_payload_attestations) .uor(post_beacon_pool_bls_to_execution_changes) .uor(post_beacon_execution_payload_envelope) .uor(post_beacon_state_validators) diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 6b65995a73..644ade956a 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -494,7 +494,7 @@ fn publish_blob_sidecars( .map_err(|_| BlockError::BeaconChainError(Box::new(BeaconChainError::UnableToPublish))) } -fn publish_column_sidecars( +pub(crate) fn publish_column_sidecars( sender_clone: &UnboundedSender>, data_column_sidecars: &[GossipVerifiedDataColumn], chain: &BeaconChain, diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index aac3384fbd..b8326f4495 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -2793,6 +2793,89 @@ impl ApiTester { self } + fn make_valid_payload_attestation_message( + &self, + ptc_offset: usize, + ) -> PayloadAttestationMessage { + let head = self.chain.head_snapshot(); + let head_slot = head.beacon_block.slot(); + let head_root = head.beacon_block_root; + let fork = head.beacon_state.fork(); + let genesis_validators_root = self.chain.genesis_validators_root; + + let ptc = head + .beacon_state + .get_ptc(head_slot, &self.chain.spec) + .expect("should get PTC"); + + // Find distinct validator indices in the PTC (may contain duplicates due to + // weighted sampling with a small validator set). + let mut seen = std::collections::HashSet::new(); + let distinct_indices: Vec = ptc + .0 + .iter() + .copied() + .filter(|idx| seen.insert(*idx)) + .collect(); + let validator_index = distinct_indices[ptc_offset % distinct_indices.len()]; + + let data = PayloadAttestationData { + beacon_block_root: head_root, + slot: head_slot, + payload_present: true, + blob_data_available: true, + }; + + let epoch = head_slot.epoch(E::slots_per_epoch()); + let domain = + self.chain + .spec + .get_domain(epoch, Domain::PTCAttester, &fork, genesis_validators_root); + let signing_root = data.signing_root(domain); + let sk = &self.validator_keypairs()[validator_index].sk; + let signature = sk.sign(signing_root); + + PayloadAttestationMessage { + validator_index: validator_index as u64, + data, + signature, + } + } + + pub async fn test_post_beacon_pool_payload_attestations_valid(mut self) -> Self { + let message = self.make_valid_payload_attestation_message(0); + let fork_name = self.chain.spec.fork_name_at_slot::(message.data.slot); + + self.client + .post_beacon_pool_payload_attestations(&[message], fork_name) + .await + .unwrap(); + + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "valid payload attestation should be sent to network" + ); + + self + } + + pub async fn test_post_beacon_pool_payload_attestations_valid_ssz(mut self) -> Self { + let message = self.make_valid_payload_attestation_message(1); + let fork_name = self.chain.spec.fork_name_at_slot::(message.data.slot); + + self.client + .post_beacon_pool_payload_attestations_ssz(&[message], fork_name) + .await + .unwrap(); + + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "valid payload attestation (SSZ) should be sent to network" + ); + + self + } + pub async fn test_get_config_fork_schedule(self) -> Self { let result = self.client.get_config_fork_schedule().await.unwrap().data; @@ -8246,6 +8329,19 @@ async fn get_validator_payload_attestation_data_pre_gloas() { .await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn post_beacon_pool_payload_attestations_valid() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + ApiTester::new() + .await + .test_post_beacon_pool_payload_attestations_valid() + .await + .test_post_beacon_pool_payload_attestations_valid_ssz() + .await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_aggregate_attestation_v1() { ApiTester::new() diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index c459bd8dce..c7c8bc1d46 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -21,6 +21,9 @@ use beacon_chain::{ light_client_finality_update_verification::Error as LightClientFinalityUpdateError, light_client_optimistic_update_verification::Error as LightClientOptimisticUpdateError, observed_operations::ObservationOutcome, + payload_attestation_verification::{ + Error as PayloadAttestationError, VerifiedPayloadAttestationMessage, + }, sync_committee_verification::{self, Error as SyncCommitteeError}, validator_monitor::{get_block_delay_ms, get_slot_delay_ms}, }; @@ -4090,25 +4093,143 @@ impl NetworkBeaconProcessor { } } - // TODO(gloas) dont forget to add tracing instrumentation + #[instrument( + level = "trace", + skip_all, + fields( + peer_id = %peer_id, + slot = %payload_attestation_message.data.slot, + validator_index = payload_attestation_message.validator_index, + ) + )] pub fn process_gossip_payload_attestation( self: &Arc, message_id: MessageId, peer_id: PeerId, - payload_attestation_message: PayloadAttestationMessage, + payload_attestation_message: Box, ) { - // TODO(EIP-7732): Implement proper payload attestation message gossip processing. - // This should integrate with a payload_attestation_verification.rs module once it's implemented. + let message_slot = payload_attestation_message.data.slot; + let result = self + .chain + .verify_payload_attestation_message_for_gossip(*payload_attestation_message); - trace!( - %peer_id, - validator_index = payload_attestation_message.validator_index, - slot = %payload_attestation_message.data.slot, - beacon_block_root = %payload_attestation_message.data.beacon_block_root, - "Processing payload attestation message" - ); + self.process_gossip_payload_attestation_result(result, message_id, peer_id, message_slot); + } - // For now, ignore all payload attestation messages since verification is not implemented - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + fn process_gossip_payload_attestation_result( + self: &Arc, + result: Result, PayloadAttestationError>, + message_id: MessageId, + peer_id: PeerId, + message_slot: Slot, + ) { + match result { + Ok(verified) => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + + if let Err(e) = self.chain.apply_payload_attestation_to_fork_choice( + verified.indexed_payload_attestation(), + verified.ptc(), + ) { + match e { + BeaconChainError::ForkChoiceError( + ForkChoiceError::InvalidPayloadAttestation(e), + ) => { + debug!( + reason = ?e, + %peer_id, + "Payload attestation invalid for fork choice" + ) + } + e => error!( + reason = ?e, + %peer_id, + "Error applying payload attestation to fork choice" + ), + } + } + + if let Err(e) = self.chain.add_payload_attestation_to_pool(&verified) { + warn!( + reason = ?e, + %peer_id, + "Failed to add payload attestation to pool" + ); + } + } + Err(error) => { + self.handle_payload_attestation_verification_failure( + peer_id, + message_id, + error, + message_slot, + ); + } + } + } + + fn handle_payload_attestation_verification_failure( + &self, + peer_id: PeerId, + message_id: MessageId, + error: PayloadAttestationError, + message_slot: Slot, + ) { + match &error { + PayloadAttestationError::FutureSlot { .. } => { + self.gossip_penalize_peer( + peer_id, + PeerAction::HighToleranceError, + "payload_attn_future_slot", + ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + PayloadAttestationError::PastSlot { .. } + | PayloadAttestationError::PriorPayloadAttestationMessageKnown { .. } => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + PayloadAttestationError::UnknownHeadBlock { .. } => { + debug!( + %peer_id, + %message_slot, + "Payload attestation references unknown block" + ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + PayloadAttestationError::NotInPTC { .. } => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer( + peer_id, + PeerAction::LowToleranceError, + "payload_attn_not_in_ptc", + ); + } + PayloadAttestationError::UnknownValidatorIndex(_) => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer( + peer_id, + PeerAction::LowToleranceError, + "payload_attn_unknown_validator", + ); + } + PayloadAttestationError::InvalidSignature => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer( + peer_id, + PeerAction::LowToleranceError, + "payload_attn_invalid_sig", + ); + } + PayloadAttestationError::BeaconChainError(_) + | PayloadAttestationError::BeaconStateError(_) => { + debug!( + %peer_id, + %message_slot, + ?error, + "Internal error verifying payload attestation" + ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + } } } diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index a39923723d..41867907b2 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -511,7 +511,7 @@ impl NetworkBeaconProcessor { processor.process_gossip_payload_attestation( message_id, peer_id, - *payload_attestation_message, + payload_attestation_message, ) }; diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index 4b815704d9..de5fe9a098 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -23,10 +23,12 @@ use crate::attestation_storage::{AttestationMap, CheckpointKey}; use crate::bls_to_execution_changes::BlsToExecutionChanges; use crate::sync_aggregate_id::SyncAggregateId; use attester_slashing::AttesterSlashingMaxCover; +use bls::AggregateSignature; use max_cover::maximum_cover; use parking_lot::{RwLock, RwLockWriteGuard}; use rand::rng; use rand::seq::SliceRandom; +use ssz::BitVector; use state_processing::per_block_processing::errors::AttestationValidationError; use state_processing::per_block_processing::{ VerifySignatures, get_slashable_indices_modular, verify_exit, @@ -38,7 +40,8 @@ use std::ptr; use typenum::Unsigned; use types::{ AbstractExecPayload, Attestation, AttestationData, AttesterSlashing, BeaconState, - BeaconStateError, ChainSpec, Epoch, EthSpec, ProposerSlashing, SignedBeaconBlock, + BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256, PayloadAttestation, + PayloadAttestationData, PayloadAttestationMessage, ProposerSlashing, SignedBeaconBlock, SignedBlsToExecutionChange, SignedVoluntaryExit, Slot, SyncAggregate, SyncAggregateError, SyncCommitteeContribution, Validator, }; @@ -59,6 +62,9 @@ pub struct OperationPool { voluntary_exits: RwLock>>, /// Map from credential changing validator to their position in the queue. bls_to_execution_changes: RwLock>, + /// Map from payload attestation data to individual messages for aggregation at block production. + payload_attestation_messages: + RwLock>>, /// Reward cache for accelerating attestation packing. reward_cache: RwLock, _phantom: PhantomData, @@ -78,6 +84,8 @@ pub enum OpPoolError { IncorrectOpPoolVariant, EpochCacheNotInitialized, EpochCacheError(EpochCacheError), + GetPtcError(BeaconStateError), + PayloadAttestationBitError, } #[derive(Default)] @@ -193,6 +201,100 @@ impl OperationPool { }); } + /// Insert a validated `PayloadAttestationMessage` into the pool. + pub fn insert_payload_attestation_message( + &self, + message: PayloadAttestationMessage, + ) -> Result<(), OpPoolError> { + let mut messages = self.payload_attestation_messages.write(); + let entry = messages.entry(message.data.clone()).or_default(); + if !entry + .iter() + .any(|m| m.validator_index == message.validator_index) + { + entry.push(message); + } + Ok(()) + } + + /// Build `PayloadAttestation`s from stored messages for block production. + /// + /// `parent_block_root` is the root of the parent block (the block PTC members attested to). + /// Returns one `PayloadAttestation` per distinct `PayloadAttestationData`. With two boolean + /// fields this yields at most 4, capped to `MaxPayloadAttestations`. + pub fn get_payload_attestations( + &self, + state: &BeaconState, + parent_block_root: Hash256, + spec: &ChainSpec, + ) -> Result>, OpPoolError> { + let target_slot = state.slot().saturating_sub(1u64); + + let ptc = state + .get_ptc(target_slot, spec) + .map_err(OpPoolError::GetPtcError)?; + + let messages = self.payload_attestation_messages.read(); + let mut result = Vec::new(); + + for (data, msgs) in messages.iter() { + if data.slot != target_slot || data.beacon_block_root != parent_block_root { + continue; + } + + let mut aggregation_bits = BitVector::new(); + let mut aggregate_sig = AggregateSignature::infinity(); + + for msg in msgs { + if let Some(pos) = ptc + .0 + .iter() + .position(|&idx| idx == msg.validator_index as usize) + && !aggregation_bits.get(pos).unwrap_or(false) + { + aggregation_bits + .set(pos, true) + .map_err(|_| OpPoolError::PayloadAttestationBitError)?; + aggregate_sig.add_assign(&msg.signature); + } + } + + if aggregation_bits.num_set_bits() > 0 { + result.push(PayloadAttestation { + aggregation_bits, + data: data.clone(), + signature: aggregate_sig, + }); + } + } + + // Prefer most participation and cap by `max_payload_attestations` + result.sort_by(|a, b| { + b.aggregation_bits + .num_set_bits() + .cmp(&a.aggregation_bits.num_set_bits()) + }); + result.truncate(E::max_payload_attestations()); + + Ok(result) + } + + /// Remove payload attestation messages that are too old for block inclusion. + pub fn prune_payload_attestation_messages(&self, current_slot: Slot) { + self.payload_attestation_messages + .write() + .retain(|data, _| current_slot <= data.slot.saturating_add(Slot::new(1))); + } + + /// Total number of payload attestation messages in the pool. + pub fn num_payload_attestation_messages(&self) -> usize { + self.payload_attestation_messages + .read() + .values() + .map(|msgs| msgs.len()) + .sum() + } + /// Insert an attestation into the pool, aggregating it with existing attestations if possible. /// /// ## Note @@ -646,6 +748,7 @@ impl OperationPool { ) { self.prune_attestations(current_epoch); self.prune_sync_contributions(head_state.slot()); + self.prune_payload_attestation_messages(head_state.slot()); self.prune_proposer_slashings(finalized_state); self.prune_attester_slashings(finalized_state); self.prune_voluntary_exits(finalized_state, spec); @@ -2075,4 +2178,214 @@ mod release_tests { op_pool.prune_attester_slashings(&electra_head.beacon_state); assert_eq!(op_pool.attester_slashings.read().len(), 1); } + + fn make_payload_attestation_message( + slot: Slot, + validator_index: u64, + beacon_block_root: Hash256, + ) -> PayloadAttestationMessage { + make_payload_attestation_message_with_flags( + slot, + validator_index, + beacon_block_root, + true, + true, + ) + } + + fn make_payload_attestation_message_with_flags( + slot: Slot, + validator_index: u64, + beacon_block_root: Hash256, + payload_present: bool, + blob_data_available: bool, + ) -> PayloadAttestationMessage { + PayloadAttestationMessage { + validator_index, + data: PayloadAttestationData { + beacon_block_root, + slot, + payload_present, + blob_data_available, + }, + signature: bls::Signature::empty(), + } + } + + #[test] + fn payload_attestation_insert_and_dedup() { + let op_pool = OperationPool::::new(); + let root = Hash256::repeat_byte(0xaa); + let slot = Slot::new(1); + + let msg1 = make_payload_attestation_message(slot, 0, root); + let msg2 = make_payload_attestation_message(slot, 1, root); + let msg1_dup = make_payload_attestation_message(slot, 0, root); + + op_pool.insert_payload_attestation_message(msg1).unwrap(); + op_pool.insert_payload_attestation_message(msg2).unwrap(); + op_pool + .insert_payload_attestation_message(msg1_dup) + .unwrap(); + + assert_eq!(op_pool.num_payload_attestation_messages(), 2); + } + + #[test] + fn payload_attestation_prune() { + let op_pool = OperationPool::::new(); + let root = Hash256::repeat_byte(0xaa); + + let msg_slot1 = make_payload_attestation_message(Slot::new(1), 0, root); + let msg_slot2 = make_payload_attestation_message(Slot::new(2), 1, root); + let msg_slot3 = make_payload_attestation_message(Slot::new(3), 2, root); + + op_pool + .insert_payload_attestation_message(msg_slot1) + .unwrap(); + op_pool + .insert_payload_attestation_message(msg_slot2) + .unwrap(); + op_pool + .insert_payload_attestation_message(msg_slot3) + .unwrap(); + + assert_eq!(op_pool.num_payload_attestation_messages(), 3); + + op_pool.prune_payload_attestation_messages(Slot::new(3)); + assert_eq!(op_pool.num_payload_attestation_messages(), 2); + + op_pool.prune_payload_attestation_messages(Slot::new(4)); + assert_eq!(op_pool.num_payload_attestation_messages(), 1); + + op_pool.prune_payload_attestation_messages(Slot::new(5)); + assert_eq!(op_pool.num_payload_attestation_messages(), 0); + } + + #[tokio::test] + async fn payload_attestation_packs_bits_from_ptc_positions() { + let spec = test_spec::(); + if spec.gloas_fork_epoch.is_none() { + return; + }; + + let num_validators = 64; + let harness = get_harness::(num_validators, Some(spec.clone())); + + harness + .add_attested_blocks_at_slots( + harness.get_current_state(), + Hash256::zero(), + &[Slot::new(1)], + (0..num_validators).collect::>().as_slice(), + ) + .await; + + let head = harness.chain.canonical_head.cached_head(); + let state = &head.snapshot.beacon_state; + assert_eq!(state.slot(), Slot::new(1)); + + let target_slot = Slot::new(1); + let parent_root = head.head_block_root(); + let ptc = state.get_ptc(target_slot, &spec).unwrap(); + let ptc_member_0 = ptc.0[0] as u64; + let ptc_member_1 = ptc.0[1] as u64; + + let op_pool = OperationPool::::new(); + + let msg0 = make_payload_attestation_message(target_slot, ptc_member_0, parent_root); + let msg1 = make_payload_attestation_message(target_slot, ptc_member_1, parent_root); + op_pool.insert_payload_attestation_message(msg0).unwrap(); + op_pool.insert_payload_attestation_message(msg1).unwrap(); + + // Advance state to slot 2 so get_payload_attestations looks at slot 1. + let mut advanced_state = state.clone(); + state_processing::state_advance::complete_state_advance( + &mut advanced_state, + None, + Slot::new(2), + &spec, + ) + .unwrap(); + + let attestations = op_pool + .get_payload_attestations(&advanced_state, parent_root, &spec) + .unwrap(); + + assert_eq!(attestations.len(), 1); + assert_eq!(attestations[0].aggregation_bits.num_set_bits(), 2); + assert!(attestations[0].aggregation_bits.get(0).unwrap()); + assert!(attestations[0].aggregation_bits.get(1).unwrap()); + assert!(attestations[0].data.payload_present); + } + + #[tokio::test] + async fn payload_attestation_multiple_data_combos_capped() { + let spec = test_spec::(); + if spec.gloas_fork_epoch.is_none() { + return; + }; + + let num_validators = 64; + let harness = get_harness::(num_validators, Some(spec.clone())); + + harness + .add_attested_blocks_at_slots( + harness.get_current_state(), + Hash256::zero(), + &[Slot::new(1)], + (0..num_validators).collect::>().as_slice(), + ) + .await; + + let head = harness.chain.canonical_head.cached_head(); + let state = &head.snapshot.beacon_state; + let target_slot = Slot::new(1); + let parent_root = head.head_block_root(); + let ptc = state.get_ptc(target_slot, &spec).unwrap(); + + let op_pool = OperationPool::::new(); + + // Given: PTC members vote with all 4 boolean combos, with varying participation. + let combos: [(bool, bool, &[usize]); 4] = [ + (true, true, &[0, 1, 2]), + (true, false, &[3, 4]), + (false, true, &[5]), + (false, false, &[6]), + ]; + for (payload_present, blob_available, positions) in &combos { + for &pos in *positions { + let validator_index = ptc.0[pos] as u64; + let msg = make_payload_attestation_message_with_flags( + target_slot, + validator_index, + parent_root, + *payload_present, + *blob_available, + ); + op_pool.insert_payload_attestation_message(msg).unwrap(); + } + } + + // When: we pack attestations for block production at slot 2. + let mut advanced_state = state.clone(); + state_processing::state_advance::complete_state_advance( + &mut advanced_state, + None, + Slot::new(2), + &spec, + ) + .unwrap(); + let attestations = op_pool + .get_payload_attestations(&advanced_state, parent_root, &spec) + .unwrap(); + + // Then: one attestation per combo, sorted by participation (most first). + assert_eq!(attestations.len(), 4); + let bit_counts: Vec<_> = attestations + .iter() + .map(|a| a.aggregation_bits.num_set_bits()) + .collect(); + assert_eq!(bit_counts, vec![3, 2, 1, 1]); + } } diff --git a/beacon_node/operation_pool/src/persistence.rs b/beacon_node/operation_pool/src/persistence.rs index 241b5fec53..56aafc27fe 100644 --- a/beacon_node/operation_pool/src/persistence.rs +++ b/beacon_node/operation_pool/src/persistence.rs @@ -209,6 +209,7 @@ impl PersistedOperationPool { proposer_slashings, voluntary_exits, bls_to_execution_changes: RwLock::new(bls_to_execution_changes), + payload_attestation_messages: Default::default(), reward_cache: Default::default(), _phantom: Default::default(), }; diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 4ec75468a2..e866547b9f 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -46,7 +46,7 @@ use ssz::{Decode, Encode}; use std::fmt; use std::future::Future; use std::time::Duration; -use types::PayloadAttestationData; +use types::{PayloadAttestationData, PayloadAttestationMessage}; pub const V1: EndpointVersion = EndpointVersion(1); pub const V2: EndpointVersion = EndpointVersion(2); @@ -1789,6 +1789,48 @@ impl BeaconNodeHttpClient { Ok(()) } + /// `POST beacon/pool/payload_attestations` (JSON) + pub async fn post_beacon_pool_payload_attestations( + &self, + messages: &[PayloadAttestationMessage], + fork_name: ForkName, + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("payload_attestations"); + + self.post_generic_with_consensus_version(path, &messages, None, fork_name) + .await?; + + Ok(()) + } + + /// `POST beacon/pool/payload_attestations` (SSZ) + pub async fn post_beacon_pool_payload_attestations_ssz( + &self, + messages: &[PayloadAttestationMessage], + fork_name: ForkName, + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("payload_attestations"); + + let ssz_body: Vec = messages.iter().flat_map(|m| m.as_ssz_bytes()).collect(); + + self.post_generic_with_consensus_version_and_ssz_body(path, ssz_body, None, fork_name) + .await?; + + Ok(()) + } + /// `POST beacon/pool/bls_to_execution_changes` pub async fn post_beacon_pool_bls_to_execution_changes( &self, diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index f9d779fd24..a9e62dbe94 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -1351,7 +1351,7 @@ where let ptc_indices: Vec = attestation .attesting_indices .iter() - .filter_map(|vi| ptc.iter().position(|&p| p == *vi as usize)) + .filter_map(|validator_index| ptc.iter().position(|&p| p == *validator_index as usize)) .collect(); // Check that all the attesters are in the PTC diff --git a/consensus/types/src/core/execution_block_hash.rs b/consensus/types/src/core/execution_block_hash.rs index cbacf7cf74..71e63727ee 100644 --- a/consensus/types/src/core/execution_block_hash.rs +++ b/consensus/types/src/core/execution_block_hash.rs @@ -5,7 +5,10 @@ use rand::RngCore; use serde::{Deserialize, Serialize}; use ssz::{Decode, DecodeError, Encode}; -use crate::{core::Hash256, test_utils::TestRandom}; +use crate::{ + core::{Hash256, Hash256Ext}, + test_utils::TestRandom, +}; #[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] #[derive(Default, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash)] @@ -20,13 +23,7 @@ impl fmt::Debug for ExecutionBlockHash { impl fmt::Display for ExecutionBlockHash { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let hash = format!("{}", self.0); - write!( - f, - "{}…{}", - &hash[..6], - &hash[hash.len().saturating_sub(4)..] - ) + self.0.short().fmt(f) } } diff --git a/consensus/types/src/core/mod.rs b/consensus/types/src/core/mod.rs index 4e583fbc67..f722ac5191 100644 --- a/consensus/types/src/core/mod.rs +++ b/consensus/types/src/core/mod.rs @@ -49,3 +49,29 @@ pub type Hash64 = alloy_primitives::B64; pub type Address = alloy_primitives::Address; pub type VersionedHash = Hash256; pub type MerkleProof = Vec; + +/// Extension trait for `Hash256` to allow us to implement additional methods on it. +pub trait Hash256Ext { + fn short(&self) -> ShortenedHash<'_>; +} + +impl Hash256Ext for Hash256 { + fn short(&self) -> ShortenedHash<'_> { + ShortenedHash(self) + } +} + +pub struct ShortenedHash<'a>(&'a Hash256); + +impl<'a> std::fmt::Display for ShortenedHash<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let hash: &[u8; 32] = self.0.as_ref(); + write!( + f, + // Format as hex, padded to 2 digits per byte. + // This outputs a consistent "0x1234...abcd" format. + "0x{:02x}{:02x}…{:02x}{:02x}", + hash[0], hash[1], hash[30], hash[31] + ) + } +} diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index c5bcd88eb1..1b32777678 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -21,11 +21,12 @@ use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; use types::{ AbstractExecPayload, Address, AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof, Domain, Epoch, EthSpec, ExecutionPayloadEnvelope, Fork, - FullPayload, Graffiti, Hash256, SelectionProof, SignedAggregateAndProof, SignedBeaconBlock, - SignedContributionAndProof, SignedExecutionPayloadEnvelope, SignedRoot, - SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncAggregatorSelectionData, - SyncCommitteeContribution, SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, - ValidatorRegistrationData, VoluntaryExit, graffiti::GraffitiString, + FullPayload, Graffiti, Hash256, PayloadAttestationData, PayloadAttestationMessage, + SelectionProof, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, + SignedExecutionPayloadEnvelope, SignedRoot, SignedValidatorRegistrationData, + SignedVoluntaryExit, Slot, SyncAggregatorSelectionData, SyncCommitteeContribution, + SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, + VoluntaryExit, graffiti::GraffitiString, }; use validator_store::{ AggregateToSign, AttestationToSign, ContributionToSign, DoppelgangerStatus, @@ -1423,6 +1424,37 @@ impl ValidatorStore for LighthouseValidatorS }) } + async fn sign_payload_attestation( + &self, + validator_pubkey: PublicKeyBytes, + data: PayloadAttestationData, + ) -> Result { + let signing_context = + self.signing_context(Domain::PTCAttester, data.slot.epoch(E::slots_per_epoch())); + + let validator_index = self + .validator_index(&validator_pubkey) + .ok_or(ValidatorStoreError::UnknownPubkey(validator_pubkey))?; + + let signing_method = self.doppelganger_bypassed_signing_method(validator_pubkey)?; + + let signature = signing_method + .get_signature::>( + SignableMessage::PayloadAttestationData(&data), + signing_context, + &self.spec, + &self.task_executor, + ) + .await + .map_err(Error::SpecificError)?; + + Ok(PayloadAttestationMessage { + validator_index, + data, + signature, + }) + } + /// Sign an `ExecutionPayloadEnvelope` for Gloas (local building). /// The proposer acts as the builder and signs with the BeaconBuilder domain. async fn sign_execution_payload_envelope( diff --git a/validator_client/signing_method/src/lib.rs b/validator_client/signing_method/src/lib.rs index c132d86c17..2f80fa5761 100644 --- a/validator_client/signing_method/src/lib.rs +++ b/validator_client/signing_method/src/lib.rs @@ -50,6 +50,7 @@ pub enum SignableMessage<'a, E: EthSpec, Payload: AbstractExecPayload = FullP ValidatorRegistration(&'a ValidatorRegistrationData), VoluntaryExit(&'a VoluntaryExit), ExecutionPayloadEnvelope(&'a ExecutionPayloadEnvelope), + PayloadAttestationData(&'a PayloadAttestationData), } impl> SignableMessage<'_, E, Payload> { @@ -72,6 +73,7 @@ impl> SignableMessage<'_, E, Payload SignableMessage::ValidatorRegistration(v) => v.signing_root(domain), SignableMessage::VoluntaryExit(exit) => exit.signing_root(domain), SignableMessage::ExecutionPayloadEnvelope(e) => e.signing_root(domain), + SignableMessage::PayloadAttestationData(d) => d.signing_root(domain), } } } @@ -238,6 +240,9 @@ impl SigningMethod { SignableMessage::ExecutionPayloadEnvelope(e) => { Web3SignerObject::ExecutionPayloadEnvelope(e) } + SignableMessage::PayloadAttestationData(d) => { + Web3SignerObject::PayloadAttestationData(d) + } }; // Determine the Web3Signer message type. diff --git a/validator_client/signing_method/src/web3signer.rs b/validator_client/signing_method/src/web3signer.rs index e6fc8f3ba2..c2b7e06f92 100644 --- a/validator_client/signing_method/src/web3signer.rs +++ b/validator_client/signing_method/src/web3signer.rs @@ -21,6 +21,7 @@ pub enum MessageType { ValidatorRegistration, // TODO(gloas) verify w/ web3signer specs ExecutionPayloadEnvelope, + PayloadAttestation, } #[derive(Debug, PartialEq, Copy, Clone, Serialize)] @@ -78,6 +79,7 @@ pub enum Web3SignerObject<'a, E: EthSpec, Payload: AbstractExecPayload> { ContributionAndProof(&'a ContributionAndProof), ValidatorRegistration(&'a ValidatorRegistrationData), ExecutionPayloadEnvelope(&'a ExecutionPayloadEnvelope), + PayloadAttestationData(&'a PayloadAttestationData), } impl<'a, E: EthSpec, Payload: AbstractExecPayload> Web3SignerObject<'a, E, Payload> { @@ -144,6 +146,7 @@ impl<'a, E: EthSpec, Payload: AbstractExecPayload> Web3SignerObject<'a, E, Pa } Web3SignerObject::ValidatorRegistration(_) => MessageType::ValidatorRegistration, Web3SignerObject::ExecutionPayloadEnvelope(_) => MessageType::ExecutionPayloadEnvelope, + Web3SignerObject::PayloadAttestationData(_) => MessageType::PayloadAttestation, } } } diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index e26d5c3d30..b412db45f6 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -45,6 +45,7 @@ use validator_services::{ block_service::{BlockService, BlockServiceBuilder}, duties_service::{self, DutiesService, DutiesServiceBuilder}, latency_service, + payload_attestation_service::PayloadAttestationService, preparation_service::{PreparationService, PreparationServiceBuilder}, sync_committee_service::SyncCommitteeService, }; @@ -83,6 +84,7 @@ pub struct ProductionValidatorClient { block_service: BlockService, SystemTimeSlotClock>, attestation_service: AttestationService, SystemTimeSlotClock>, sync_committee_service: SyncCommitteeService, SystemTimeSlotClock>, + payload_attestation_service: PayloadAttestationService, SystemTimeSlotClock>, doppelganger_service: Option>, preparation_service: PreparationService, SystemTimeSlotClock>, validator_store: Arc>, @@ -552,12 +554,22 @@ impl ProductionValidatorClient { context.executor.clone(), ); + let payload_attestation_service = PayloadAttestationService::new( + duties_service.clone(), + validator_store.clone(), + slot_clock.clone(), + beacon_nodes.clone(), + context.executor.clone(), + context.eth2_config.spec.clone(), + ); + Ok(Self { context, duties_service, block_service, attestation_service, sync_committee_service, + payload_attestation_service, doppelganger_service, preparation_service, validator_store, @@ -629,6 +641,13 @@ impl ProductionValidatorClient { .start_update_service(&self.context.eth2_config.spec) .map_err(|e| format!("Unable to start sync committee service: {}", e))?; + if self.context.eth2_config.spec.is_gloas_scheduled() { + self.payload_attestation_service + .clone() + .start_update_service() + .map_err(|e| format!("Unable to start payload attestation service: {}", e))?; + } + self.preparation_service .clone() .start_update_service(&self.context.eth2_config.spec) diff --git a/validator_client/validator_services/src/lib.rs b/validator_client/validator_services/src/lib.rs index 3b8bd9ae14..0169335a7f 100644 --- a/validator_client/validator_services/src/lib.rs +++ b/validator_client/validator_services/src/lib.rs @@ -3,6 +3,7 @@ pub mod block_service; pub mod duties_service; pub mod latency_service; pub mod notifier_service; +pub mod payload_attestation_service; pub mod preparation_service; pub mod sync; pub mod sync_committee_service; diff --git a/validator_client/validator_services/src/payload_attestation_service.rs b/validator_client/validator_services/src/payload_attestation_service.rs new file mode 100644 index 0000000000..24949edc1f --- /dev/null +++ b/validator_client/validator_services/src/payload_attestation_service.rs @@ -0,0 +1,243 @@ +use crate::duties_service::DutiesService; +use beacon_node_fallback::BeaconNodeFallback; +use logging::crit; +use slot_clock::SlotClock; +use std::ops::Deref; +use std::sync::Arc; +use task_executor::TaskExecutor; +use tokio::time::sleep; +use tracing::{debug, error, info}; +use types::{ChainSpec, EthSpec}; +use validator_store::ValidatorStore; + +pub struct Inner { + duties_service: Arc>, + validator_store: Arc, + slot_clock: T, + beacon_nodes: Arc>, + executor: TaskExecutor, + chain_spec: Arc, +} + +pub struct PayloadAttestationService { + inner: Arc>, +} + +impl Clone for PayloadAttestationService { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Deref for PayloadAttestationService { + type Target = Inner; + + fn deref(&self) -> &Self::Target { + self.inner.deref() + } +} + +impl PayloadAttestationService { + pub fn new( + duties_service: Arc>, + validator_store: Arc, + slot_clock: T, + beacon_nodes: Arc>, + executor: TaskExecutor, + chain_spec: Arc, + ) -> Self { + Self { + inner: Arc::new(Inner { + duties_service, + validator_store, + slot_clock, + beacon_nodes, + executor, + chain_spec, + }), + } + } + + pub fn start_update_service(self) -> Result<(), String> { + let slot_duration = self.chain_spec.get_slot_duration(); + let payload_attestation_due = self.chain_spec.get_payload_attestation_due(); + + info!( + payload_attestation_due_ms = payload_attestation_due.as_millis(), + "Payload attestation service started" + ); + + let executor = self.executor.clone(); + + let interval_fut = async move { + loop { + let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() else { + error!("Failed to read slot clock"); + sleep(slot_duration).await; + continue; + }; + + let Some(current_slot) = self.slot_clock.now() else { + error!("Failed to read slot clock after trigger"); + continue; + }; + + if !self + .chain_spec + .fork_name_at_slot::(current_slot) + .gloas_enabled() + { + let duration_to_next_epoch = self + .slot_clock + .duration_to_next_epoch(S::E::slots_per_epoch()) + .unwrap_or_else(|| { + self.chain_spec.get_slot_duration() * S::E::slots_per_epoch() as u32 + }); + sleep(duration_to_next_epoch).await; + continue; + } + + sleep(duration_to_next_slot + payload_attestation_due).await; + + let Some(attestation_slot) = self.slot_clock.now() else { + error!("Failed to read slot clock after sleep"); + continue; + }; + + let service = self.clone(); + self.executor.spawn( + async move { + service.produce_and_publish(attestation_slot).await; + }, + "payload_attestation_producer", + ); + } + }; + + executor.spawn(interval_fut, "payload_attestation_service"); + Ok(()) + } + + async fn produce_and_publish(&self, slot: types::Slot) { + let duties = self.duties_service.get_ptc_duties_for_slot(slot); + + if duties.is_empty() { + return; + } + + debug!( + %slot, + duty_count = duties.len(), + "Producing payload attestations" + ); + + let attestation_data = match self + .beacon_nodes + .first_success(|beacon_node| async move { + beacon_node + .get_validator_payload_attestation_data(slot) + .await + .map_err(|e| format!("Failed to get payload attestation data: {e:?}")) + .map(|resp| resp.into_data()) + }) + .await + { + Ok(data) => data, + Err(e) => { + crit!( + error = %e, + %slot, + "Failed to produce payload attestation data" + ); + return; + } + }; + + debug!( + %slot, + beacon_block_root = ?attestation_data.beacon_block_root, + payload_present = attestation_data.payload_present, + "Received payload attestation data" + ); + + let mut messages = Vec::with_capacity(duties.len()); + + for duty in &duties { + match self + .validator_store + .sign_payload_attestation(duty.pubkey, attestation_data.clone()) + .await + { + Ok(message) => { + messages.push(message); + } + Err(e) => { + crit!( + error = ?e, + validator = ?duty.pubkey, + %slot, + "Failed to sign payload attestation" + ); + } + } + } + + if messages.is_empty() { + return; + } + + let count = messages.len(); + let fork_name = self.chain_spec.fork_name_at_slot::(slot); + let result = self + .beacon_nodes + .first_success(|beacon_node| { + let messages = messages.clone(); + async move { + beacon_node + .post_beacon_pool_payload_attestations_ssz(&messages, fork_name) + .await + .map_err(|e| format!("Failed to publish payload attestations (SSZ): {e:?}")) + } + }) + .await; + + let result = match result { + Ok(()) => Ok(()), + Err(_) => { + debug!(%slot, "SSZ publish failed, falling back to JSON"); + self.beacon_nodes + .first_success(|beacon_node| { + let messages = messages.clone(); + async move { + beacon_node + .post_beacon_pool_payload_attestations(&messages, fork_name) + .await + .map_err(|e| { + format!("Failed to publish payload attestations (JSON): {e:?}") + }) + } + }) + .await + } + }; + + match result { + Ok(()) => { + info!( + %slot, + %count, + "Successfully published payload attestations" + ); + } + Err(e) => { + crit!( + error = %e, + %slot, + "Failed to publish payload attestations" + ); + } + } + } +} diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index da0b33de18..4e5b415a41 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -7,10 +7,11 @@ use std::future::Future; use std::sync::Arc; use types::{ Address, Attestation, AttestationError, BlindedBeaconBlock, Epoch, EthSpec, - ExecutionPayloadEnvelope, Graffiti, Hash256, SelectionProof, SignedAggregateAndProof, - SignedBlindedBeaconBlock, SignedContributionAndProof, SignedExecutionPayloadEnvelope, - SignedValidatorRegistrationData, Slot, SyncCommitteeContribution, SyncCommitteeMessage, - SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, + ExecutionPayloadEnvelope, Graffiti, Hash256, PayloadAttestationData, PayloadAttestationMessage, + SelectionProof, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedContributionAndProof, + SignedExecutionPayloadEnvelope, SignedValidatorRegistrationData, Slot, + SyncCommitteeContribution, SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, + ValidatorRegistrationData, }; #[derive(Debug, PartialEq, Clone)] @@ -205,6 +206,13 @@ pub trait ValidatorStore: Send + Sync { envelope: ExecutionPayloadEnvelope, ) -> impl Future, Error>> + Send; + /// Sign a `PayloadAttestationData` for the PTC. + fn sign_payload_attestation( + &self, + validator_pubkey: PublicKeyBytes, + data: PayloadAttestationData, + ) -> impl Future>> + Send; + /// Returns `ProposalData` for the provided `pubkey` if it exists in `InitializedValidators`. /// `ProposalData` fields include defaulting logic described in `get_fee_recipient_defaulting`, /// `get_gas_limit_defaulting`, and `get_builder_proposals_defaulting`. diff --git a/validator_manager/Cargo.toml b/validator_manager/Cargo.toml index d0155698b4..7dabd5445c 100644 --- a/validator_manager/Cargo.toml +++ b/validator_manager/Cargo.toml @@ -11,7 +11,7 @@ clap = { workspace = true } clap_utils = { workspace = true } educe = { workspace = true } environment = { workspace = true } -eth2 = { workspace = true } +eth2 = { workspace = true, features = ["lighthouse"] } eth2_network_config = { workspace = true } eth2_wallet = { workspace = true } ethereum_serde_utils = { workspace = true }