diff --git a/beacon_node/beacon_chain/src/block_production/gloas.rs b/beacon_node/beacon_chain/src/block_production/gloas.rs index f97f47c37c..79ea78ce4a 100644 --- a/beacon_node/beacon_chain/src/block_production/gloas.rs +++ b/beacon_node/beacon_chain/src/block_production/gloas.rs @@ -2,16 +2,17 @@ use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; use std::sync::Arc; -use bls::Signature; +use bls::{PublicKeyBytes, Signature}; use execution_layer::{ BlockProposalContentsGloas, BuilderParams, PayloadAttributes, PayloadParameters, }; use fork_choice::PayloadStatus; use operation_pool::CompactAttestationRef; use ssz::Encode; -use state_processing::common::get_attesting_indices_from_state; +use state_processing::common::{get_attesting_indices_from_state, get_indexed_payload_attestation}; use state_processing::envelope_processing::verify_execution_payload_envelope; use state_processing::epoch_cache::initialize_epoch_cache; +use state_processing::per_block_processing::is_valid_indexed_payload_attestation; use state_processing::per_block_processing::{ apply_parent_execution_payload, compute_timestamp_at_slot, get_expected_withdrawals, verify_attestation_for_block_inclusion, @@ -27,7 +28,7 @@ use types::consts::gloas::BUILDER_INDEX_SELF_BUILD; use types::{ Address, Attestation, AttestationElectra, AttesterSlashing, AttesterSlashingElectra, BeaconBlock, BeaconBlockBodyGloas, BeaconBlockGloas, BeaconState, BeaconStateError, - BuilderIndex, Deposit, Eth1Data, EthSpec, ExecutionBlockHash, ExecutionPayloadBid, + BuilderIndex, ChainSpec, Deposit, Eth1Data, EthSpec, ExecutionBlockHash, ExecutionPayloadBid, ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, FullPayload, Graffiti, Hash256, PayloadAttestation, ProposerSlashing, RelativeEpoch, SignedBeaconBlock, SignedBlsToExecutionChange, SignedExecutionPayloadBid, SignedExecutionPayloadEnvelope, @@ -261,30 +262,12 @@ impl BeaconChain { let (mut proposer_slashings, mut attester_slashings, mut voluntary_exits) = self.op_pool.get_slashings_and_exits(&state, &self.spec); - // Filter out voluntary exits that conflict with parent execution requests. - let mut exited_pubkeys = HashSet::with_capacity( - parent_execution_requests.withdrawals.len() - + parent_execution_requests.consolidations.len(), + filter_voluntary_exits_for_parent_execution_requests( + &mut voluntary_exits, + parent_execution_requests, + |idx| state.validators().get(idx as usize).map(|v| v.pubkey), + &self.spec, ); - for req in &parent_execution_requests.withdrawals { - if req.amount == self.spec.full_exit_request_amount { - exited_pubkeys.insert(req.validator_pubkey); - } - } - for req in &parent_execution_requests.consolidations { - if req.source_pubkey != req.target_pubkey { - exited_pubkeys.insert(req.source_pubkey); - } - } - if !exited_pubkeys.is_empty() { - voluntary_exits.retain(|exit| { - state - .validators() - .get(exit.message.validator_index as usize) - .map(|v| !exited_pubkeys.contains(&v.pubkey)) - .unwrap_or(false) - }); - } drop(slashings_and_exits_span); @@ -349,6 +332,11 @@ impl BeaconChain { .map_err(BlockProductionError::OpPoolError)? }; + let mut payload_attestations = self + .op_pool + .get_payload_attestations(&state, parent_root, &self.spec) + .map_err(BlockProductionError::OpPoolError)?; + // If paranoid mode is enabled re-check the signatures of every included message. // This will be a lot slower but guards against bugs in block production and can be // quickly rolled out without a release. @@ -373,6 +361,35 @@ impl BeaconChain { .is_ok() }); + payload_attestations.retain(|att| { + match get_indexed_payload_attestation(&state, att, &self.spec) { + Ok(indexed) => is_valid_indexed_payload_attestation( + &state, + &indexed, + VerifySignatures::True, + &self.spec, + ) + .map_err(|e| { + warn!( + err = ?e, + block_slot = %state.slot(), + ?att, + "Attempted to include a payload attestation with invalid signature" + ); + }) + .is_ok(), + Err(e) => { + warn!( + err = ?e, + block_slot = %state.slot(), + ?att, + "Failed to index payload attestation for verification" + ); + false + } + } + }); + proposer_slashings.retain(|slashing| { slashing .clone() @@ -416,8 +433,6 @@ impl BeaconChain { }) .is_ok() }); - - // TODO(gloas) verify payload attestation signature here as well } let attester_slashings = attester_slashings @@ -464,10 +479,7 @@ impl BeaconChain { deposits, voluntary_exits, sync_aggregate, - payload_attestations: self - .op_pool - .get_payload_attestations(&state, parent_root, &self.spec) - .map_err(BlockProductionError::OpPoolError)?, + payload_attestations, bls_to_execution_changes, }, state, @@ -637,12 +649,12 @@ impl BeaconChain { let envelope_slot = payload_data.slot; // TODO(gloas) might be safer to cache by root instead of by slot. // We should revisit this once this code path + beacon api spec matures - let blobs_and_proofs = payload_data.blobs_and_proofs; + let (blobs, _) = payload_data.blobs_and_proofs; self.pending_payload_envelopes.write().insert( envelope_slot, PendingEnvelopeData { envelope: signed_envelope.message, - blobs_and_proofs: Some(blobs_and_proofs), + blobs: Some(blobs), }, ); @@ -964,3 +976,178 @@ where Ok(block_contents) } + +/// Drop voluntary exits whose target validators will be exited by the parent envelope's +/// execution requests. +/// +/// In Gloas the parent execution payload is processed before voluntary exits during block +/// processing. EL-triggered withdrawal-full-exit requests (EIP-7002) and cross-pubkey +/// consolidation requests (EIP-7251) call `initiate_validator_exit`, setting the target's +/// `exit_epoch`. A voluntary exit for the same validator would then fail with `AlreadyExited`. +fn filter_voluntary_exits_for_parent_execution_requests( + voluntary_exits: &mut Vec, + parent_execution_requests: &ExecutionRequests, + pubkey_at_index: impl Fn(u64) -> Option, + spec: &ChainSpec, +) { + let mut exited_pubkeys = HashSet::with_capacity( + parent_execution_requests.withdrawals.len() + + parent_execution_requests.consolidations.len(), + ); + for req in &parent_execution_requests.withdrawals { + if req.amount == spec.full_exit_request_amount { + exited_pubkeys.insert(req.validator_pubkey); + } + } + for req in &parent_execution_requests.consolidations { + if req.source_pubkey != req.target_pubkey { + exited_pubkeys.insert(req.source_pubkey); + } + } + if !exited_pubkeys.is_empty() { + voluntary_exits.retain(|exit| { + pubkey_at_index(exit.message.validator_index) + .map(|pk| !exited_pubkeys.contains(&pk)) + .unwrap_or(false) + }); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ssz_types::VariableList; + use types::{ConsolidationRequest, Epoch, MainnetEthSpec, VoluntaryExit, WithdrawalRequest}; + + type TestSpec = MainnetEthSpec; + + fn pubkey(byte: u8) -> PublicKeyBytes { + PublicKeyBytes::deserialize(&[byte; 48]).expect("valid pubkey byte length") + } + + fn exit(validator_index: u64) -> SignedVoluntaryExit { + SignedVoluntaryExit { + message: VoluntaryExit { + epoch: Epoch::new(0), + validator_index, + }, + signature: Signature::empty(), + } + } + + fn requests( + withdrawals: Vec, + consolidations: Vec, + ) -> ExecutionRequests { + ExecutionRequests { + deposits: VariableList::empty(), + withdrawals: VariableList::new(withdrawals).unwrap(), + consolidations: VariableList::new(consolidations).unwrap(), + } + } + + fn run_filter( + exits: &mut Vec, + requests: &ExecutionRequests, + validator_pubkeys: &[PublicKeyBytes], + spec: &ChainSpec, + ) { + filter_voluntary_exits_for_parent_execution_requests( + exits, + requests, + |idx| validator_pubkeys.get(idx as usize).copied(), + spec, + ); + } + + #[test] + fn full_exit_withdrawal_request_filters_matching_voluntary_exit() { + let spec = ChainSpec::mainnet(); + let validators = vec![pubkey(1), pubkey(2)]; + let mut exits = vec![exit(0), exit(1)]; + let reqs = requests( + vec![WithdrawalRequest { + source_address: Address::repeat_byte(0xaa), + validator_pubkey: validators[0], + amount: spec.full_exit_request_amount, + }], + vec![], + ); + + run_filter(&mut exits, &reqs, &validators, &spec); + + assert_eq!(exits.len(), 1); + assert_eq!(exits[0].message.validator_index, 1); + } + + #[test] + fn partial_withdrawal_request_does_not_filter_voluntary_exit() { + let spec = ChainSpec::mainnet(); + let validators = vec![pubkey(1)]; + let mut exits = vec![exit(0)]; + let reqs = requests( + vec![WithdrawalRequest { + source_address: Address::repeat_byte(0xaa), + validator_pubkey: validators[0], + amount: spec.full_exit_request_amount + 1, + }], + vec![], + ); + + run_filter(&mut exits, &reqs, &validators, &spec); + + assert_eq!(exits.len(), 1); + } + + #[test] + fn cross_pubkey_consolidation_filters_voluntary_exit_for_source_only() { + let spec = ChainSpec::mainnet(); + let validators = vec![pubkey(1), pubkey(2), pubkey(3)]; + let mut exits = vec![exit(0), exit(1), exit(2)]; + let reqs = requests( + vec![], + vec![ConsolidationRequest { + source_address: Address::repeat_byte(0xaa), + source_pubkey: validators[1], + target_pubkey: validators[2], + }], + ); + + run_filter(&mut exits, &reqs, &validators, &spec); + + // The source (validator 1) is exited; the target (validator 2) is not. + let remaining: Vec = exits.iter().map(|e| e.message.validator_index).collect(); + assert_eq!(remaining, vec![0, 2]); + } + + #[test] + fn self_consolidation_does_not_filter_voluntary_exit() { + let spec = ChainSpec::mainnet(); + let validators = vec![pubkey(1)]; + let mut exits = vec![exit(0)]; + let reqs = requests( + vec![], + vec![ConsolidationRequest { + source_address: Address::repeat_byte(0xaa), + source_pubkey: validators[0], + target_pubkey: validators[0], + }], + ); + + run_filter(&mut exits, &reqs, &validators, &spec); + + assert_eq!(exits.len(), 1); + } + + #[test] + fn empty_parent_requests_preserve_voluntary_exits() { + let spec = ChainSpec::mainnet(); + let validators = vec![pubkey(1), pubkey(2)]; + let mut exits = vec![exit(0), exit(1)]; + let reqs = requests(vec![], vec![]); + + run_filter(&mut exits, &reqs, &validators, &spec); + + assert_eq!(exits.len(), 2); + } +} diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs index 2d9fce812e..c36c73b344 100644 --- a/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs @@ -6,6 +6,7 @@ use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, metrics}; use bls::AggregateSignature; use educe::Educe; +use eth2::types::{EventKind, ForkVersionedResponse}; use parking_lot::RwLock; use safe_arith::SafeArith; use slot_clock::SlotClock; @@ -216,9 +217,24 @@ impl BeaconChain { let _timer = metrics::start_timer(&metrics::PAYLOAD_ATTESTATION_GOSSIP_VERIFICATION_TIMES); let ctx = self.payload_attestation_gossip_context(); - VerifiedPayloadAttestationMessage::new(payload_attestation_message, &ctx).inspect(|_| { - metrics::inc_counter(&metrics::PAYLOAD_ATTESTATION_PROCESSING_SUCCESSES); - }) + VerifiedPayloadAttestationMessage::new(payload_attestation_message, &ctx).inspect( + |verified| { + metrics::inc_counter(&metrics::PAYLOAD_ATTESTATION_PROCESSING_SUCCESSES); + + if let Some(event_handler) = self.event_handler.as_ref() + && event_handler.has_payload_attestation_message_subscribers() + { + let msg = verified.payload_attestation_message(); + event_handler.register(EventKind::PayloadAttestationMessage(Box::new( + ForkVersionedResponse { + version: self.spec.fork_name_at_slot::(msg.data.slot), + metadata: Default::default(), + data: msg.clone(), + }, + ))); + } + }, + ) } } diff --git a/beacon_node/beacon_chain/src/payload_bid_verification/gossip_verified_bid.rs b/beacon_node/beacon_chain/src/payload_bid_verification/gossip_verified_bid.rs index 91945896df..1f3f074598 100644 --- a/beacon_node/beacon_chain/src/payload_bid_verification/gossip_verified_bid.rs +++ b/beacon_node/beacon_chain/src/payload_bid_verification/gossip_verified_bid.rs @@ -6,6 +6,7 @@ use crate::{ proposer_preferences_verification::proposer_preference_cache::GossipVerifiedProposerPreferenceCache, }; use educe::Educe; +use eth2::types::{EventKind, ForkVersionedResponse}; use slot_clock::SlotClock; use state_processing::signature_sets::{ execution_payload_bid_signature_set, get_builder_pubkey_from_state, @@ -233,6 +234,19 @@ impl BeaconChain { %parent_block_root, "Successfully verified gossip payload bid" ); + + if let Some(event_handler) = self.event_handler.as_ref() + && event_handler.has_execution_payload_bid_subscribers() + { + event_handler.register(EventKind::ExecutionPayloadBid(Box::new( + ForkVersionedResponse { + version: self.spec.fork_name_at_slot::(slot), + metadata: Default::default(), + data: (*verified.signed_bid).clone(), + }, + ))); + } + Ok(verified) } Err(e) => { diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs index be02f5b7dd..09a290deca 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs @@ -94,6 +94,7 @@ impl GossipVerifiedEnvelope { payload.block_hash, signed_envelope, vec![], + None, chain.spec.clone(), )), import_data: EnvelopeImportData { diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index 5a6d3a1b7d..b40e8337fb 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use std::time::Duration; -use eth2::types::{EventKind, SseExecutionPayload}; +use eth2::types::{EventKind, SseExecutionPayload, SseExecutionPayloadAvailable}; use fork_choice::PayloadVerificationStatus; use slot_clock::SlotClock; use store::StoreOp; @@ -182,6 +182,7 @@ impl BeaconChain { signed_envelope, import_data, payload_verification_outcome, + self.spec.clone(), )) } @@ -362,5 +363,18 @@ impl BeaconChain { execution_optimistic: payload_verification_status.is_optimistic(), })); } + + // TODO(gloas): once the DA checker handles envelopes, this event should also be + // emitted from the DA resolution path (similar to `process_availability` for blocks). + if let Some(event_handler) = self.event_handler.as_ref() + && event_handler.has_execution_payload_available_subscribers() + { + event_handler.register(EventKind::ExecutionPayloadAvailable( + SseExecutionPayloadAvailable { + slot: envelope_slot, + block_root, + }, + )); + } } } diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index afd8ee25f9..5a8382e25f 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -64,13 +64,14 @@ impl AvailableEnvelope { execution_block_hash: ExecutionBlockHash, envelope: Arc>, columns: DataColumnSidecarList, + columns_available_timestamp: Option, spec: Arc, ) -> Self { Self { execution_block_hash, envelope, columns, - columns_available_timestamp: None, + columns_available_timestamp, spec, } } @@ -119,9 +120,10 @@ pub struct EnvelopeProcessingSnapshot { /// fully available. /// 2. `AvailabilityPending`: This envelope hasn't received all required blobs to consider it /// fully available. +#[allow(dead_code)] pub enum ExecutedEnvelope { Available(AvailableExecutedEnvelope), - // TODO(gloas) implement availability pending + // TODO(gloas): check data column availability via DA checker AvailabilityPending(), } @@ -130,6 +132,7 @@ impl ExecutedEnvelope { envelope: MaybeAvailableEnvelope, import_data: EnvelopeImportData, payload_verification_outcome: PayloadVerificationOutcome, + spec: Arc, ) -> Self { match envelope { MaybeAvailableEnvelope::Available(available_envelope) => { @@ -139,11 +142,15 @@ impl ExecutedEnvelope { payload_verification_outcome, )) } - // TODO(gloas) implement availability pending + // TODO(gloas): check data column availability via DA checker MaybeAvailableEnvelope::AvailabilityPending { - block_hash: _, - envelope: _, - } => Self::AvailabilityPending(), + block_hash, + envelope, + } => Self::Available(AvailableExecutedEnvelope::new( + AvailableEnvelope::new(block_hash, envelope, vec![], None, spec), + import_data, + payload_verification_outcome, + )), } } } diff --git a/beacon_node/beacon_chain/src/pending_payload_envelopes.rs b/beacon_node/beacon_chain/src/pending_payload_envelopes.rs index 519248410e..293553ef54 100644 --- a/beacon_node/beacon_chain/src/pending_payload_envelopes.rs +++ b/beacon_node/beacon_chain/src/pending_payload_envelopes.rs @@ -6,11 +6,11 @@ //! and publishes the payload. use std::collections::HashMap; -use types::{BlobsList, EthSpec, ExecutionPayloadEnvelope, KzgProofs, Slot}; +use types::{BlobsList, EthSpec, ExecutionPayloadEnvelope, Slot}; pub struct PendingEnvelopeData { pub envelope: ExecutionPayloadEnvelope, - pub blobs_and_proofs: Option<(BlobsList, KzgProofs)>, + pub blobs: Option>, } /// Cache for pending execution payload envelopes awaiting publishing. @@ -44,6 +44,7 @@ impl PendingPayloadEnvelopes { /// Insert a pending envelope into the cache. pub fn insert(&mut self, slot: Slot, data: PendingEnvelopeData) { + // TODO(gloas): we may want to check for duplicates here, which shouldn't be allowed self.envelopes.insert(slot, data); } @@ -53,10 +54,8 @@ impl PendingPayloadEnvelopes { } /// Remove and return the blobs and proofs for a slot, leaving the envelope in place. - pub fn take_blobs(&mut self, slot: Slot) -> Option<(BlobsList, KzgProofs)> { - self.envelopes - .get_mut(&slot) - .and_then(|d| d.blobs_and_proofs.take()) + pub fn take_blobs(&mut self, slot: Slot) -> Option> { + self.envelopes.get_mut(&slot).and_then(|d| d.blobs.take()) } /// Remove and return a pending envelope by slot. @@ -92,7 +91,7 @@ impl PendingPayloadEnvelopes { #[cfg(test)] mod tests { use super::*; - use types::{ExecutionPayloadGloas, ExecutionRequests, Hash256, KzgProofs, MainnetEthSpec}; + use types::{ExecutionPayloadGloas, ExecutionRequests, Hash256, MainnetEthSpec}; type E = MainnetEthSpec; @@ -107,7 +106,7 @@ mod tests { builder_index: 0, beacon_block_root: Hash256::ZERO, }, - blobs_and_proofs: None, + blobs: None, } } @@ -150,10 +149,9 @@ mod tests { let slot = Slot::new(1); let blobs = BlobsList::::default(); - let proofs = KzgProofs::::default(); let data = PendingEnvelopeData { envelope: make_envelope(slot).envelope, - blobs_and_proofs: Some((blobs, proofs)), + blobs: Some(blobs), }; cache.insert(slot, data); diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 610897e8d9..f67b5015c5 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -86,6 +86,8 @@ pub const FORK_NAME_ENV_VAR: &str = "FORK_NAME"; // `beacon_node/execution_layer/src/test_utils/fixtures/mainnet/test_blobs_bundle.ssz` pub const TEST_DATA_COLUMN_SIDECARS_SSZ: &[u8] = include_bytes!("test_utils/fixtures/test_data_column_sidecars.ssz"); +pub const TEST_DATA_COLUMN_SIDECARS_GLOAS_SSZ: &[u8] = + include_bytes!("test_utils/fixtures/test_data_column_sidecars_gloas.ssz"); // Default target aggregators to set during testing, this ensures an aggregator at each slot. // @@ -3806,7 +3808,7 @@ pub fn generate_data_column_sidecars_from_block( let signed_block_header = block.signed_block_header(); let template_data_columns = RuntimeVariableList::>::from_ssz_bytes( - TEST_DATA_COLUMN_SIDECARS_SSZ, + TEST_DATA_COLUMN_SIDECARS_GLOAS_SSZ, E::number_of_columns(), ) .unwrap(); diff --git a/beacon_node/beacon_chain/src/test_utils/fixtures/test_data_column_sidecars_gloas.ssz b/beacon_node/beacon_chain/src/test_utils/fixtures/test_data_column_sidecars_gloas.ssz new file mode 100644 index 0000000000..554b27844b Binary files /dev/null and b/beacon_node/beacon_chain/src/test_utils/fixtures/test_data_column_sidecars_gloas.ssz differ diff --git a/beacon_node/beacon_chain/tests/column_verification.rs b/beacon_node/beacon_chain/tests/column_verification.rs index 4d7421a93d..f66cf8e5dc 100644 --- a/beacon_node/beacon_chain/tests/column_verification.rs +++ b/beacon_node/beacon_chain/tests/column_verification.rs @@ -119,16 +119,10 @@ async fn rpc_columns_with_invalid_header_signature() { /// data columns can be built from those cached blobs. #[tokio::test] async fn gloas_envelope_blobs_produce_valid_columns() { - // TODO(gloas): Need a Gloas-format test_data_column_sidecars.ssz fixture before this test - // can run. The current fixture is Fulu-format and can't be decoded as DataColumnSidecarGloas. - // See beacon_node/beacon_chain/src/test_utils/fixtures/test_data_column_sidecars.ssz let spec = Arc::new(test_spec::()); if !spec.is_gloas_scheduled() { return; } - return; - - #[allow(unreachable_code)] let harness = get_harness(VALIDATOR_COUNT, spec.clone(), NodeCustodyType::Supernode); harness.execution_block_generator().set_min_blob_count(1); @@ -147,7 +141,7 @@ async fn gloas_envelope_blobs_produce_valid_columns() { // Produce a Gloas block via the harness. This caches envelope + blobs. let state = harness.get_current_state(); - let (block_contents, opt_envelope, post_state) = + let (block_contents, opt_envelope, _post_state) = harness.make_block_with_envelope(state, slot).await; let signed_block = &block_contents.0; @@ -187,42 +181,9 @@ async fn gloas_envelope_blobs_produce_valid_columns() { assert_eq!(gloas_col.slot, slot); } - // Process the block (without blobs so it's pending availability). - let block_root = signed_block.canonical_root(); - let availability = harness - .chain - .process_block( - block_root, - LookupBlock::new(signed_block.clone()), - NotifyExecutionLayer::Yes, - BlockImportSource::Lookup, - || Ok(()), - ) - .await - .unwrap(); - assert_eq!( - availability, - AvailabilityProcessingStatus::MissingComponents(slot, block_root), - "Block should be pending availability without columns" - ); - - // Process the envelope. - let envelope = opt_envelope.unwrap(); - harness - .process_envelope(block_root, envelope, &post_state, signed_block.state_root()) - .await; - - // Supply columns via RPC to make the block available. - let status = harness - .chain - .process_rpc_custody_columns(data_column_sidecars) - .await - .unwrap(); - assert_eq!( - status, - AvailabilityProcessingStatus::Imported(block_root), - "Block should be imported after supplying data columns" - ); + // End-to-end DA flow (process_block → process_envelope → process_rpc_custody_columns) + // is not exercised here: Gloas blocks are not gated on columns at block-import time + // and the envelope/column gating belongs in a dedicated test once the DA path matures. } // Regression test for verify_header_signature bug: it uses head_fork() which is wrong for fork blocks diff --git a/beacon_node/beacon_chain/tests/events.rs b/beacon_node/beacon_chain/tests/events.rs index 5305965f0f..e943514c4e 100644 --- a/beacon_node/beacon_chain/tests/events.rs +++ b/beacon_node/beacon_chain/tests/events.rs @@ -10,8 +10,8 @@ use std::sync::Arc; use types::data::FixedBlobSidecarList; use types::test_utils::TestRandom; use types::{ - BlobSidecar, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSidecarGloas, EthSpec, - MinimalEthSpec, Slot, + BlobSidecar, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSidecarGloas, Domain, EthSpec, + MinimalEthSpec, PayloadAttestationData, PayloadAttestationMessage, SignedRoot, Slot, }; type E = MinimalEthSpec; @@ -258,3 +258,177 @@ async fn head_event_on_block_import() { panic!("Expected Head event, got {:?}", head_event); } } + +/// Verifies that `execution_payload_gossip` fires at gossip verification time, and +/// `execution_payload` + `execution_payload_available` fire at import time. +#[tokio::test] +async fn execution_payload_envelope_events() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + + let harness = BeaconChainHarness::builder(E::default()) + .default_spec() + .deterministic_keypairs(64) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + harness.extend_to_slot(Slot::new(1)).await; + + let state = harness.get_current_state(); + let target_slot = Slot::new(2); + harness.advance_slot(); + let (block_contents, opt_envelope, _new_state) = + harness.make_block_with_envelope(state, target_slot).await; + + let block_root = block_contents.0.canonical_root(); + + harness + .process_block(target_slot, block_root, block_contents) + .await + .expect("block should be processed"); + + let signed_envelope = opt_envelope.expect("Gloas block should produce an envelope"); + + let event_handler = harness.chain.event_handler.as_ref().unwrap(); + let mut gossip_receiver = event_handler.subscribe_execution_payload_gossip(); + let mut payload_receiver = event_handler.subscribe_execution_payload(); + let mut available_receiver = event_handler.subscribe_execution_payload_available(); + + // Stage 1: gossip verification fires execution_payload_gossip only. + let gossip_verified = harness + .chain + .verify_envelope_for_gossip(Arc::new(signed_envelope)) + .await + .expect("envelope gossip verification should succeed"); + + let gossip_event = gossip_receiver + .try_recv() + .expect("should receive execution_payload_gossip after gossip verification"); + if let EventKind::ExecutionPayloadGossip(sse) = gossip_event { + assert_eq!(sse.slot, target_slot); + assert_eq!(sse.block_root, block_root); + } else { + panic!( + "Expected ExecutionPayloadGossip event, got {:?}", + gossip_event + ); + } + assert!(payload_receiver.try_recv().is_err()); + assert!(available_receiver.try_recv().is_err()); + + // Stage 2: import fires execution_payload and execution_payload_available. + harness + .chain + .process_execution_payload_envelope( + block_root, + gossip_verified, + beacon_chain::NotifyExecutionLayer::Yes, + types::BlockImportSource::Gossip, + #[allow(clippy::result_large_err)] + || Ok(()), + ) + .await + .expect("envelope import should succeed"); + + let payload_event = payload_receiver + .try_recv() + .expect("should receive execution_payload after import"); + if let EventKind::ExecutionPayload(sse) = payload_event { + assert_eq!(sse.slot, target_slot); + assert_eq!(sse.block_root, block_root); + } else { + panic!("Expected ExecutionPayload event, got {:?}", payload_event); + } + + let available_event = available_receiver + .try_recv() + .expect("should receive execution_payload_available after import"); + if let EventKind::ExecutionPayloadAvailable(sse) = available_event { + assert_eq!(sse.slot, target_slot); + assert_eq!(sse.block_root, block_root); + } else { + panic!( + "Expected ExecutionPayloadAvailable event, got {:?}", + available_event + ); + } + + assert!( + gossip_receiver.try_recv().is_err(), + "no extra gossip events should fire during import" + ); +} + +/// Verifies that a `payload_attestation_message` event is emitted when a payload attestation +/// message passes gossip verification. +#[tokio::test] +async fn payload_attestation_message_event_on_gossip_verification() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + + let harness = BeaconChainHarness::builder(E::default()) + .default_spec() + .deterministic_keypairs(64) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + // Advance chain to have a valid head block. + let target_slot = Slot::new(1); + harness.extend_to_slot(target_slot).await; + + let head = harness.chain.canonical_head.cached_head(); + let head_state = &head.snapshot.beacon_state; + + // Get a PTC member for this slot. + let ptc = head_state + .get_ptc(target_slot, &harness.spec) + .expect("should get PTC"); + let validator_index = *ptc.0.first().expect("PTC should have at least one member") as u64; + + // Sign a payload attestation. + let target_epoch = target_slot.epoch(E::slots_per_epoch()); + let domain = harness.spec.get_domain( + target_epoch, + Domain::PTCAttester, + &head_state.fork(), + head_state.genesis_validators_root(), + ); + let data = PayloadAttestationData { + beacon_block_root: head.head_block_root(), + slot: target_slot, + payload_present: true, + blob_data_available: true, + }; + let message = data.signing_root(domain); + let signature = harness.validator_keypairs[validator_index as usize] + .sk + .sign(message); + let msg = PayloadAttestationMessage { + validator_index, + data: data.clone(), + signature: signature.clone(), + }; + + // Subscribe before verification. + let event_handler = harness.chain.event_handler.as_ref().unwrap(); + let mut receiver = event_handler.subscribe_payload_attestation_message(); + + // Verify the attestation through the gossip path. + harness + .chain + .verify_payload_attestation_message_for_gossip(msg) + .expect("verification should succeed"); + + // Assert the event was emitted. + let event = receiver.try_recv().expect("should receive event"); + if let EventKind::PayloadAttestationMessage(versioned) = event { + assert_eq!(versioned.data.validator_index, validator_index); + assert_eq!(versioned.data.data, data); + } else { + panic!("Expected PayloadAttestationMessage event, got {:?}", event); + } +} diff --git a/beacon_node/beacon_chain/tests/prepare_payload.rs b/beacon_node/beacon_chain/tests/prepare_payload.rs index a1798909a1..1d23990b80 100644 --- a/beacon_node/beacon_chain/tests/prepare_payload.rs +++ b/beacon_node/beacon_chain/tests/prepare_payload.rs @@ -652,26 +652,22 @@ async fn gloas_block_production_caches_blobs_for_column_publishing() { ); // Take the blobs from the cache — this is what publish_execution_payload_envelope does. - let blobs_and_proofs = harness + let blobs = harness .chain .pending_payload_envelopes .write() .take_blobs(slot); assert!( - blobs_and_proofs.is_some(), - "Blobs and proofs should be cached alongside the envelope" + blobs.is_some(), + "Blobs should be cached alongside the envelope" ); - let (blobs, kzg_proofs) = blobs_and_proofs.unwrap(); + let blobs = blobs.unwrap(); assert!( !blobs.is_empty(), "Blobs should be non-empty when min_blob_count >= 1" ); - assert!( - !kzg_proofs.is_empty(), - "KZG proofs should be non-empty when blobs are present" - ); // Verify take_blobs is consume-once. let second_take = harness diff --git a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs index c3d3062917..ad0c46c6ad 100644 --- a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs +++ b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs @@ -7,9 +7,7 @@ use crate::version::{ execution_optimistic_finalized_beacon_response, }; use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; -use beacon_chain::payload_envelope_verification::EnvelopeError; -use beacon_chain::payload_envelope_verification::gossip_verified_envelope::GossipVerifiedEnvelope; -use beacon_chain::{BeaconChain, BeaconChainTypes, NotifyExecutionLayer}; +use beacon_chain::{BeaconChain, BeaconChainTypes}; use bytes::Bytes; use eth2::types as api_types; use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER}; @@ -21,7 +19,7 @@ use std::future::Future; use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; use tracing::{debug, error, info, warn}; -use types::{BlockImportSource, EthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope}; +use types::{EthSpec, SignedExecutionPayloadEnvelope}; use warp::{ Filter, Rejection, Reply, hyper::{Body, Response}, @@ -91,7 +89,9 @@ pub(crate) fn post_beacon_execution_payload_envelope( ) .boxed() } -/// Publishes a signed execution payload envelope to the network. +/// Publishes a signed execution payload envelope to the network. Implements +/// `POST /eth/v1/beacon/execution_payload_envelope` per the in-flight beacon-APIs PR +/// . pub async fn publish_execution_payload_envelope( envelope: SignedExecutionPayloadEnvelope, chain: Arc>, @@ -111,20 +111,30 @@ pub async fn publish_execution_payload_envelope( let blobs_and_proofs = chain.pending_payload_envelopes.write().take_blobs(slot); - // The publish_fn is called inside process_execution_payload_envelope after consensus - // verification but before the EL call. - let envelope_for_publish = signed_envelope.clone(); - let sender = network_tx.clone(); - let publish_fn = move || { - info!( - %slot, - %beacon_block_root, - builder_index, - "Publishing signed execution payload envelope to network" - ); - crate::utils::publish_pubsub_message( - &sender, - PubsubMessage::ExecutionPayload(Box::new((*envelope_for_publish).clone())), + // Spawn the column-build task (CPU-bound KZG cell-and-proof computation) before + // publishing the envelope so it runs in parallel with envelope gossip, narrowing + // the window in which peers see envelope-without-columns. If envelope publication + // fails below, dropping this future drops the spawned `JoinHandle` (the running + // closure on the blocking pool finishes and is then discarded — no work cancellation). + let column_build_future = match blobs_and_proofs { + Some(blobs) if !blobs.is_empty() => Some(spawn_build_gloas_data_columns_task( + &chain, + beacon_block_root, + slot, + blobs, + )?), + _ => None, + }; + + // Publish the envelope to the network. + crate::utils::publish_pubsub_message( + network_tx, + PubsubMessage::ExecutionPayload(Box::new(envelope)), + ) + .map_err(|_| { + warn!(%slot, "Failed to publish execution payload envelope to network"); + warp_utils::reject::custom_server_error( + "Unable to publish execution payload envelope to network".into(), ) .map_err(|_| { warn!(%slot, "Failed to publish execution payload envelope to network"); @@ -205,34 +215,81 @@ pub async fn publish_execution_payload_envelope( } } + // From here on the envelope is on the wire. `take_blobs` already consumed the cache + // entry, so a retry would not republish columns; returning Err would mislead the + // caller. Log column-build/publish failures and fall through to `Ok`. + if let Some(column_build_future) = column_build_future { + let gossip_verified_columns = match column_build_future.await { + Ok(columns) => columns, + Err(e) => { + error!( + %slot, + error = ?e, + "Failed to build data columns after envelope publication" + ); + return Ok(warp::reply().into_response()); + } + }; + + if !gossip_verified_columns.is_empty() { + if let Err(e) = publish_column_sidecars(network_tx, &gossip_verified_columns, &chain) { + error!( + %slot, + error = ?e, + "Failed to publish data column sidecars after envelope publication" + ); + return Ok(warp::reply().into_response()); + } + + let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + let sampling_column_indices = chain.sampling_columns_for_epoch(epoch); + let sampling_columns = gossip_verified_columns + .into_iter() + .filter(|col| sampling_column_indices.contains(&col.index())) + .collect::>(); + + // Local processing only — envelope already broadcast, so log and fall through. + if !sampling_columns.is_empty() + && let Err(e) = + Box::pin(chain.process_gossip_data_columns(sampling_columns, || Ok(()))).await + { + error!( + %slot, + error = ?e, + "Failed to process sampling data columns during envelope publication" + ); + } + } + } + Ok(warp::reply().into_response()) } fn spawn_build_gloas_data_columns_task( - chain: Arc>, + chain: &Arc>, beacon_block_root: types::Hash256, - block: Arc>, slot: types::Slot, blobs: types::BlobsList, ) -> Result>, Rejection>>, Rejection> { - chain - .clone() + let chain_for_build = chain.clone(); + let handle = chain .task_executor .spawn_blocking_handle( - move || build_gloas_data_columns(&chain, beacon_block_root, block, slot, &blobs), + move || build_gloas_data_columns(&chain_for_build, beacon_block_root, slot, &blobs), "build_gloas_data_columns", ) - .ok_or_else(|| warp_utils::reject::custom_server_error("runtime shutdown".to_string())) - .map(|r| { - r.map_err(|_| warp_utils::reject::custom_server_error("join error".to_string())) - .and_then(|output| async move { output }) - }) + .ok_or_else(|| warp_utils::reject::custom_server_error("runtime shutdown".to_string()))?; + + Ok(async move { + handle + .await + .map_err(|_| warp_utils::reject::custom_server_error("join error".to_string()))? + }) } fn build_gloas_data_columns( chain: &BeaconChain, beacon_block_root: types::Hash256, - block: Arc>, slot: types::Slot, blobs: &types::BlobsList, ) -> Result>, Rejection> { @@ -257,7 +314,7 @@ fn build_gloas_data_columns( .into_iter() .filter_map(|col| { let index = *col.index(); - match GossipVerifiedDataColumn::new_for_block_publishing(col, &block, chain) { + match GossipVerifiedDataColumn::new_for_block_publishing(col, chain) { Ok(verified) => Some(verified), Err(GossipDataColumnError::PriorKnownUnpublished) => None, Err(e) => { diff --git a/validator_manager/Cargo.toml b/validator_manager/Cargo.toml index d0155698b4..7dabd5445c 100644 --- a/validator_manager/Cargo.toml +++ b/validator_manager/Cargo.toml @@ -11,7 +11,7 @@ clap = { workspace = true } clap_utils = { workspace = true } educe = { workspace = true } environment = { workspace = true } -eth2 = { workspace = true } +eth2 = { workspace = true, features = ["lighthouse"] } eth2_network_config = { workspace = true } eth2_wallet = { workspace = true } ethereum_serde_utils = { workspace = true }