diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs index 2d9fce812e..c36c73b344 100644 --- a/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs @@ -6,6 +6,7 @@ use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, metrics}; use bls::AggregateSignature; use educe::Educe; +use eth2::types::{EventKind, ForkVersionedResponse}; use parking_lot::RwLock; use safe_arith::SafeArith; use slot_clock::SlotClock; @@ -216,9 +217,24 @@ impl BeaconChain { let _timer = metrics::start_timer(&metrics::PAYLOAD_ATTESTATION_GOSSIP_VERIFICATION_TIMES); let ctx = self.payload_attestation_gossip_context(); - VerifiedPayloadAttestationMessage::new(payload_attestation_message, &ctx).inspect(|_| { - metrics::inc_counter(&metrics::PAYLOAD_ATTESTATION_PROCESSING_SUCCESSES); - }) + VerifiedPayloadAttestationMessage::new(payload_attestation_message, &ctx).inspect( + |verified| { + metrics::inc_counter(&metrics::PAYLOAD_ATTESTATION_PROCESSING_SUCCESSES); + + if let Some(event_handler) = self.event_handler.as_ref() + && event_handler.has_payload_attestation_message_subscribers() + { + let msg = verified.payload_attestation_message(); + event_handler.register(EventKind::PayloadAttestationMessage(Box::new( + ForkVersionedResponse { + version: self.spec.fork_name_at_slot::(msg.data.slot), + metadata: Default::default(), + data: msg.clone(), + }, + ))); + } + }, + ) } } diff --git a/beacon_node/beacon_chain/src/payload_bid_verification/gossip_verified_bid.rs b/beacon_node/beacon_chain/src/payload_bid_verification/gossip_verified_bid.rs index 91945896df..1f3f074598 100644 --- a/beacon_node/beacon_chain/src/payload_bid_verification/gossip_verified_bid.rs +++ b/beacon_node/beacon_chain/src/payload_bid_verification/gossip_verified_bid.rs @@ -6,6 +6,7 @@ use crate::{ proposer_preferences_verification::proposer_preference_cache::GossipVerifiedProposerPreferenceCache, }; use educe::Educe; +use eth2::types::{EventKind, ForkVersionedResponse}; use slot_clock::SlotClock; use state_processing::signature_sets::{ execution_payload_bid_signature_set, get_builder_pubkey_from_state, @@ -233,6 +234,19 @@ impl BeaconChain { %parent_block_root, "Successfully verified gossip payload bid" ); + + if let Some(event_handler) = self.event_handler.as_ref() + && event_handler.has_execution_payload_bid_subscribers() + { + event_handler.register(EventKind::ExecutionPayloadBid(Box::new( + ForkVersionedResponse { + version: self.spec.fork_name_at_slot::(slot), + metadata: Default::default(), + data: (*verified.signed_bid).clone(), + }, + ))); + } + Ok(verified) } Err(e) => { diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index 5a6d3a1b7d..b40e8337fb 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use std::time::Duration; -use eth2::types::{EventKind, SseExecutionPayload}; +use eth2::types::{EventKind, SseExecutionPayload, SseExecutionPayloadAvailable}; use fork_choice::PayloadVerificationStatus; use slot_clock::SlotClock; use store::StoreOp; @@ -182,6 +182,7 @@ impl BeaconChain { signed_envelope, import_data, payload_verification_outcome, + self.spec.clone(), )) } @@ -362,5 +363,18 @@ impl BeaconChain { execution_optimistic: payload_verification_status.is_optimistic(), })); } + + // TODO(gloas): once the DA checker handles envelopes, this event should also be + // emitted from the DA resolution path (similar to `process_availability` for blocks). + if let Some(event_handler) = self.event_handler.as_ref() + && event_handler.has_execution_payload_available_subscribers() + { + event_handler.register(EventKind::ExecutionPayloadAvailable( + SseExecutionPayloadAvailable { + slot: envelope_slot, + block_root, + }, + )); + } } } diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index 51fc3f235d..b153a3cd6a 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -60,6 +60,22 @@ pub struct AvailableEnvelope { } impl AvailableEnvelope { + pub fn new( + execution_block_hash: ExecutionBlockHash, + envelope: Arc>, + columns: DataColumnSidecarList, + columns_available_timestamp: Option, + spec: Arc, + ) -> Self { + Self { + execution_block_hash, + envelope, + columns, + columns_available_timestamp, + spec, + } + } + pub fn message(&self) -> &ExecutionPayloadEnvelope { &self.envelope.message } @@ -104,9 +120,10 @@ pub struct EnvelopeProcessingSnapshot { /// fully available. /// 2. `AvailabilityPending`: This envelope hasn't received all required blobs to consider it /// fully available. +#[allow(dead_code)] pub enum ExecutedEnvelope { Available(AvailableExecutedEnvelope), - // TODO(gloas) implement availability pending + // TODO(gloas): check data column availability via DA checker AvailabilityPending(), } @@ -115,6 +132,7 @@ impl ExecutedEnvelope { envelope: MaybeAvailableEnvelope, import_data: EnvelopeImportData, payload_verification_outcome: PayloadVerificationOutcome, + spec: Arc, ) -> Self { match envelope { MaybeAvailableEnvelope::Available(available_envelope) => { @@ -124,11 +142,15 @@ impl ExecutedEnvelope { payload_verification_outcome, )) } - // TODO(gloas) implement availability pending + // TODO(gloas): check data column availability via DA checker MaybeAvailableEnvelope::AvailabilityPending { - block_hash: _, - envelope: _, - } => Self::AvailabilityPending(), + block_hash, + envelope, + } => Self::Available(AvailableExecutedEnvelope::new( + AvailableEnvelope::new(block_hash, envelope, vec![], None, spec), + import_data, + payload_verification_outcome, + )), } } } diff --git a/beacon_node/beacon_chain/tests/events.rs b/beacon_node/beacon_chain/tests/events.rs index 5305965f0f..e943514c4e 100644 --- a/beacon_node/beacon_chain/tests/events.rs +++ b/beacon_node/beacon_chain/tests/events.rs @@ -10,8 +10,8 @@ use std::sync::Arc; use types::data::FixedBlobSidecarList; use types::test_utils::TestRandom; use types::{ - BlobSidecar, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSidecarGloas, EthSpec, - MinimalEthSpec, Slot, + BlobSidecar, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSidecarGloas, Domain, EthSpec, + MinimalEthSpec, PayloadAttestationData, PayloadAttestationMessage, SignedRoot, Slot, }; type E = MinimalEthSpec; @@ -258,3 +258,177 @@ async fn head_event_on_block_import() { panic!("Expected Head event, got {:?}", head_event); } } + +/// Verifies that `execution_payload_gossip` fires at gossip verification time, and +/// `execution_payload` + `execution_payload_available` fire at import time. +#[tokio::test] +async fn execution_payload_envelope_events() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + + let harness = BeaconChainHarness::builder(E::default()) + .default_spec() + .deterministic_keypairs(64) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + harness.extend_to_slot(Slot::new(1)).await; + + let state = harness.get_current_state(); + let target_slot = Slot::new(2); + harness.advance_slot(); + let (block_contents, opt_envelope, _new_state) = + harness.make_block_with_envelope(state, target_slot).await; + + let block_root = block_contents.0.canonical_root(); + + harness + .process_block(target_slot, block_root, block_contents) + .await + .expect("block should be processed"); + + let signed_envelope = opt_envelope.expect("Gloas block should produce an envelope"); + + let event_handler = harness.chain.event_handler.as_ref().unwrap(); + let mut gossip_receiver = event_handler.subscribe_execution_payload_gossip(); + let mut payload_receiver = event_handler.subscribe_execution_payload(); + let mut available_receiver = event_handler.subscribe_execution_payload_available(); + + // Stage 1: gossip verification fires execution_payload_gossip only. + let gossip_verified = harness + .chain + .verify_envelope_for_gossip(Arc::new(signed_envelope)) + .await + .expect("envelope gossip verification should succeed"); + + let gossip_event = gossip_receiver + .try_recv() + .expect("should receive execution_payload_gossip after gossip verification"); + if let EventKind::ExecutionPayloadGossip(sse) = gossip_event { + assert_eq!(sse.slot, target_slot); + assert_eq!(sse.block_root, block_root); + } else { + panic!( + "Expected ExecutionPayloadGossip event, got {:?}", + gossip_event + ); + } + assert!(payload_receiver.try_recv().is_err()); + assert!(available_receiver.try_recv().is_err()); + + // Stage 2: import fires execution_payload and execution_payload_available. + harness + .chain + .process_execution_payload_envelope( + block_root, + gossip_verified, + beacon_chain::NotifyExecutionLayer::Yes, + types::BlockImportSource::Gossip, + #[allow(clippy::result_large_err)] + || Ok(()), + ) + .await + .expect("envelope import should succeed"); + + let payload_event = payload_receiver + .try_recv() + .expect("should receive execution_payload after import"); + if let EventKind::ExecutionPayload(sse) = payload_event { + assert_eq!(sse.slot, target_slot); + assert_eq!(sse.block_root, block_root); + } else { + panic!("Expected ExecutionPayload event, got {:?}", payload_event); + } + + let available_event = available_receiver + .try_recv() + .expect("should receive execution_payload_available after import"); + if let EventKind::ExecutionPayloadAvailable(sse) = available_event { + assert_eq!(sse.slot, target_slot); + assert_eq!(sse.block_root, block_root); + } else { + panic!( + "Expected ExecutionPayloadAvailable event, got {:?}", + available_event + ); + } + + assert!( + gossip_receiver.try_recv().is_err(), + "no extra gossip events should fire during import" + ); +} + +/// Verifies that a `payload_attestation_message` event is emitted when a payload attestation +/// message passes gossip verification. +#[tokio::test] +async fn payload_attestation_message_event_on_gossip_verification() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + + let harness = BeaconChainHarness::builder(E::default()) + .default_spec() + .deterministic_keypairs(64) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + // Advance chain to have a valid head block. + let target_slot = Slot::new(1); + harness.extend_to_slot(target_slot).await; + + let head = harness.chain.canonical_head.cached_head(); + let head_state = &head.snapshot.beacon_state; + + // Get a PTC member for this slot. + let ptc = head_state + .get_ptc(target_slot, &harness.spec) + .expect("should get PTC"); + let validator_index = *ptc.0.first().expect("PTC should have at least one member") as u64; + + // Sign a payload attestation. + let target_epoch = target_slot.epoch(E::slots_per_epoch()); + let domain = harness.spec.get_domain( + target_epoch, + Domain::PTCAttester, + &head_state.fork(), + head_state.genesis_validators_root(), + ); + let data = PayloadAttestationData { + beacon_block_root: head.head_block_root(), + slot: target_slot, + payload_present: true, + blob_data_available: true, + }; + let message = data.signing_root(domain); + let signature = harness.validator_keypairs[validator_index as usize] + .sk + .sign(message); + let msg = PayloadAttestationMessage { + validator_index, + data: data.clone(), + signature: signature.clone(), + }; + + // Subscribe before verification. + let event_handler = harness.chain.event_handler.as_ref().unwrap(); + let mut receiver = event_handler.subscribe_payload_attestation_message(); + + // Verify the attestation through the gossip path. + harness + .chain + .verify_payload_attestation_message_for_gossip(msg) + .expect("verification should succeed"); + + // Assert the event was emitted. + let event = receiver.try_recv().expect("should receive event"); + if let EventKind::PayloadAttestationMessage(versioned) = event { + assert_eq!(versioned.data.validator_index, validator_index); + assert_eq!(versioned.data.data, data); + } else { + panic!("Expected PayloadAttestationMessage event, got {:?}", event); + } +}