resolve merge conflicts

This commit is contained in:
Eitan Seri-Levi
2026-04-28 22:20:41 +02:00
14 changed files with 567 additions and 140 deletions

View File

@@ -2,16 +2,17 @@ use std::collections::{HashMap, HashSet};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
use bls::Signature; use bls::{PublicKeyBytes, Signature};
use execution_layer::{ use execution_layer::{
BlockProposalContentsGloas, BuilderParams, PayloadAttributes, PayloadParameters, BlockProposalContentsGloas, BuilderParams, PayloadAttributes, PayloadParameters,
}; };
use fork_choice::PayloadStatus; use fork_choice::PayloadStatus;
use operation_pool::CompactAttestationRef; use operation_pool::CompactAttestationRef;
use ssz::Encode; use ssz::Encode;
use state_processing::common::get_attesting_indices_from_state; use state_processing::common::{get_attesting_indices_from_state, get_indexed_payload_attestation};
use state_processing::envelope_processing::verify_execution_payload_envelope; use state_processing::envelope_processing::verify_execution_payload_envelope;
use state_processing::epoch_cache::initialize_epoch_cache; use state_processing::epoch_cache::initialize_epoch_cache;
use state_processing::per_block_processing::is_valid_indexed_payload_attestation;
use state_processing::per_block_processing::{ use state_processing::per_block_processing::{
apply_parent_execution_payload, compute_timestamp_at_slot, get_expected_withdrawals, apply_parent_execution_payload, compute_timestamp_at_slot, get_expected_withdrawals,
verify_attestation_for_block_inclusion, verify_attestation_for_block_inclusion,
@@ -27,7 +28,7 @@ use types::consts::gloas::BUILDER_INDEX_SELF_BUILD;
use types::{ use types::{
Address, Attestation, AttestationElectra, AttesterSlashing, AttesterSlashingElectra, Address, Attestation, AttestationElectra, AttesterSlashing, AttesterSlashingElectra,
BeaconBlock, BeaconBlockBodyGloas, BeaconBlockGloas, BeaconState, BeaconStateError, BeaconBlock, BeaconBlockBodyGloas, BeaconBlockGloas, BeaconState, BeaconStateError,
BuilderIndex, Deposit, Eth1Data, EthSpec, ExecutionBlockHash, ExecutionPayloadBid, BuilderIndex, ChainSpec, Deposit, Eth1Data, EthSpec, ExecutionBlockHash, ExecutionPayloadBid,
ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, FullPayload, Graffiti, ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, FullPayload, Graffiti,
Hash256, PayloadAttestation, ProposerSlashing, RelativeEpoch, SignedBeaconBlock, Hash256, PayloadAttestation, ProposerSlashing, RelativeEpoch, SignedBeaconBlock,
SignedBlsToExecutionChange, SignedExecutionPayloadBid, SignedExecutionPayloadEnvelope, SignedBlsToExecutionChange, SignedExecutionPayloadBid, SignedExecutionPayloadEnvelope,
@@ -261,30 +262,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let (mut proposer_slashings, mut attester_slashings, mut voluntary_exits) = let (mut proposer_slashings, mut attester_slashings, mut voluntary_exits) =
self.op_pool.get_slashings_and_exits(&state, &self.spec); self.op_pool.get_slashings_and_exits(&state, &self.spec);
// Filter out voluntary exits that conflict with parent execution requests. filter_voluntary_exits_for_parent_execution_requests(
let mut exited_pubkeys = HashSet::with_capacity( &mut voluntary_exits,
parent_execution_requests.withdrawals.len() parent_execution_requests,
+ parent_execution_requests.consolidations.len(), |idx| state.validators().get(idx as usize).map(|v| v.pubkey),
&self.spec,
); );
for req in &parent_execution_requests.withdrawals {
if req.amount == self.spec.full_exit_request_amount {
exited_pubkeys.insert(req.validator_pubkey);
}
}
for req in &parent_execution_requests.consolidations {
if req.source_pubkey != req.target_pubkey {
exited_pubkeys.insert(req.source_pubkey);
}
}
if !exited_pubkeys.is_empty() {
voluntary_exits.retain(|exit| {
state
.validators()
.get(exit.message.validator_index as usize)
.map(|v| !exited_pubkeys.contains(&v.pubkey))
.unwrap_or(false)
});
}
drop(slashings_and_exits_span); drop(slashings_and_exits_span);
@@ -349,6 +332,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_err(BlockProductionError::OpPoolError)? .map_err(BlockProductionError::OpPoolError)?
}; };
let mut payload_attestations = self
.op_pool
.get_payload_attestations(&state, parent_root, &self.spec)
.map_err(BlockProductionError::OpPoolError)?;
// If paranoid mode is enabled re-check the signatures of every included message. // If paranoid mode is enabled re-check the signatures of every included message.
// This will be a lot slower but guards against bugs in block production and can be // This will be a lot slower but guards against bugs in block production and can be
// quickly rolled out without a release. // quickly rolled out without a release.
@@ -373,6 +361,35 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.is_ok() .is_ok()
}); });
payload_attestations.retain(|att| {
match get_indexed_payload_attestation(&state, att, &self.spec) {
Ok(indexed) => is_valid_indexed_payload_attestation(
&state,
&indexed,
VerifySignatures::True,
&self.spec,
)
.map_err(|e| {
warn!(
err = ?e,
block_slot = %state.slot(),
?att,
"Attempted to include a payload attestation with invalid signature"
);
})
.is_ok(),
Err(e) => {
warn!(
err = ?e,
block_slot = %state.slot(),
?att,
"Failed to index payload attestation for verification"
);
false
}
}
});
proposer_slashings.retain(|slashing| { proposer_slashings.retain(|slashing| {
slashing slashing
.clone() .clone()
@@ -416,8 +433,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}) })
.is_ok() .is_ok()
}); });
// TODO(gloas) verify payload attestation signature here as well
} }
let attester_slashings = attester_slashings let attester_slashings = attester_slashings
@@ -464,10 +479,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
deposits, deposits,
voluntary_exits, voluntary_exits,
sync_aggregate, sync_aggregate,
payload_attestations: self payload_attestations,
.op_pool
.get_payload_attestations(&state, parent_root, &self.spec)
.map_err(BlockProductionError::OpPoolError)?,
bls_to_execution_changes, bls_to_execution_changes,
}, },
state, state,
@@ -637,12 +649,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let envelope_slot = payload_data.slot; let envelope_slot = payload_data.slot;
// TODO(gloas) might be safer to cache by root instead of by slot. // TODO(gloas) might be safer to cache by root instead of by slot.
// We should revisit this once this code path + beacon api spec matures // We should revisit this once this code path + beacon api spec matures
let blobs_and_proofs = payload_data.blobs_and_proofs; let (blobs, _) = payload_data.blobs_and_proofs;
self.pending_payload_envelopes.write().insert( self.pending_payload_envelopes.write().insert(
envelope_slot, envelope_slot,
PendingEnvelopeData { PendingEnvelopeData {
envelope: signed_envelope.message, envelope: signed_envelope.message,
blobs_and_proofs: Some(blobs_and_proofs), blobs: Some(blobs),
}, },
); );
@@ -964,3 +976,178 @@ where
Ok(block_contents) Ok(block_contents)
} }
/// Drop voluntary exits whose target validators will be exited by the parent envelope's
/// execution requests.
///
/// In Gloas the parent execution payload is processed before voluntary exits during block
/// processing. EL-triggered withdrawal-full-exit requests (EIP-7002) and cross-pubkey
/// consolidation requests (EIP-7251) call `initiate_validator_exit`, setting the target's
/// `exit_epoch`. A voluntary exit for the same validator would then fail with `AlreadyExited`.
fn filter_voluntary_exits_for_parent_execution_requests<E: EthSpec>(
voluntary_exits: &mut Vec<SignedVoluntaryExit>,
parent_execution_requests: &ExecutionRequests<E>,
pubkey_at_index: impl Fn(u64) -> Option<PublicKeyBytes>,
spec: &ChainSpec,
) {
let mut exited_pubkeys = HashSet::with_capacity(
parent_execution_requests.withdrawals.len()
+ parent_execution_requests.consolidations.len(),
);
for req in &parent_execution_requests.withdrawals {
if req.amount == spec.full_exit_request_amount {
exited_pubkeys.insert(req.validator_pubkey);
}
}
for req in &parent_execution_requests.consolidations {
if req.source_pubkey != req.target_pubkey {
exited_pubkeys.insert(req.source_pubkey);
}
}
if !exited_pubkeys.is_empty() {
voluntary_exits.retain(|exit| {
pubkey_at_index(exit.message.validator_index)
.map(|pk| !exited_pubkeys.contains(&pk))
.unwrap_or(false)
});
}
}
#[cfg(test)]
mod tests {
use super::*;
use ssz_types::VariableList;
use types::{ConsolidationRequest, Epoch, MainnetEthSpec, VoluntaryExit, WithdrawalRequest};
type TestSpec = MainnetEthSpec;
fn pubkey(byte: u8) -> PublicKeyBytes {
PublicKeyBytes::deserialize(&[byte; 48]).expect("valid pubkey byte length")
}
fn exit(validator_index: u64) -> SignedVoluntaryExit {
SignedVoluntaryExit {
message: VoluntaryExit {
epoch: Epoch::new(0),
validator_index,
},
signature: Signature::empty(),
}
}
fn requests(
withdrawals: Vec<WithdrawalRequest>,
consolidations: Vec<ConsolidationRequest>,
) -> ExecutionRequests<TestSpec> {
ExecutionRequests {
deposits: VariableList::empty(),
withdrawals: VariableList::new(withdrawals).unwrap(),
consolidations: VariableList::new(consolidations).unwrap(),
}
}
fn run_filter(
exits: &mut Vec<SignedVoluntaryExit>,
requests: &ExecutionRequests<TestSpec>,
validator_pubkeys: &[PublicKeyBytes],
spec: &ChainSpec,
) {
filter_voluntary_exits_for_parent_execution_requests(
exits,
requests,
|idx| validator_pubkeys.get(idx as usize).copied(),
spec,
);
}
#[test]
fn full_exit_withdrawal_request_filters_matching_voluntary_exit() {
let spec = ChainSpec::mainnet();
let validators = vec![pubkey(1), pubkey(2)];
let mut exits = vec![exit(0), exit(1)];
let reqs = requests(
vec![WithdrawalRequest {
source_address: Address::repeat_byte(0xaa),
validator_pubkey: validators[0],
amount: spec.full_exit_request_amount,
}],
vec![],
);
run_filter(&mut exits, &reqs, &validators, &spec);
assert_eq!(exits.len(), 1);
assert_eq!(exits[0].message.validator_index, 1);
}
#[test]
fn partial_withdrawal_request_does_not_filter_voluntary_exit() {
let spec = ChainSpec::mainnet();
let validators = vec![pubkey(1)];
let mut exits = vec![exit(0)];
let reqs = requests(
vec![WithdrawalRequest {
source_address: Address::repeat_byte(0xaa),
validator_pubkey: validators[0],
amount: spec.full_exit_request_amount + 1,
}],
vec![],
);
run_filter(&mut exits, &reqs, &validators, &spec);
assert_eq!(exits.len(), 1);
}
#[test]
fn cross_pubkey_consolidation_filters_voluntary_exit_for_source_only() {
let spec = ChainSpec::mainnet();
let validators = vec![pubkey(1), pubkey(2), pubkey(3)];
let mut exits = vec![exit(0), exit(1), exit(2)];
let reqs = requests(
vec![],
vec![ConsolidationRequest {
source_address: Address::repeat_byte(0xaa),
source_pubkey: validators[1],
target_pubkey: validators[2],
}],
);
run_filter(&mut exits, &reqs, &validators, &spec);
// The source (validator 1) is exited; the target (validator 2) is not.
let remaining: Vec<u64> = exits.iter().map(|e| e.message.validator_index).collect();
assert_eq!(remaining, vec![0, 2]);
}
#[test]
fn self_consolidation_does_not_filter_voluntary_exit() {
let spec = ChainSpec::mainnet();
let validators = vec![pubkey(1)];
let mut exits = vec![exit(0)];
let reqs = requests(
vec![],
vec![ConsolidationRequest {
source_address: Address::repeat_byte(0xaa),
source_pubkey: validators[0],
target_pubkey: validators[0],
}],
);
run_filter(&mut exits, &reqs, &validators, &spec);
assert_eq!(exits.len(), 1);
}
#[test]
fn empty_parent_requests_preserve_voluntary_exits() {
let spec = ChainSpec::mainnet();
let validators = vec![pubkey(1), pubkey(2)];
let mut exits = vec![exit(0), exit(1)];
let reqs = requests(vec![], vec![]);
run_filter(&mut exits, &reqs, &validators, &spec);
assert_eq!(exits.len(), 2);
}
}

View File

@@ -6,6 +6,7 @@ use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, metrics}; use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, metrics};
use bls::AggregateSignature; use bls::AggregateSignature;
use educe::Educe; use educe::Educe;
use eth2::types::{EventKind, ForkVersionedResponse};
use parking_lot::RwLock; use parking_lot::RwLock;
use safe_arith::SafeArith; use safe_arith::SafeArith;
use slot_clock::SlotClock; use slot_clock::SlotClock;
@@ -216,9 +217,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let _timer = metrics::start_timer(&metrics::PAYLOAD_ATTESTATION_GOSSIP_VERIFICATION_TIMES); let _timer = metrics::start_timer(&metrics::PAYLOAD_ATTESTATION_GOSSIP_VERIFICATION_TIMES);
let ctx = self.payload_attestation_gossip_context(); let ctx = self.payload_attestation_gossip_context();
VerifiedPayloadAttestationMessage::new(payload_attestation_message, &ctx).inspect(|_| { VerifiedPayloadAttestationMessage::new(payload_attestation_message, &ctx).inspect(
metrics::inc_counter(&metrics::PAYLOAD_ATTESTATION_PROCESSING_SUCCESSES); |verified| {
}) metrics::inc_counter(&metrics::PAYLOAD_ATTESTATION_PROCESSING_SUCCESSES);
if let Some(event_handler) = self.event_handler.as_ref()
&& event_handler.has_payload_attestation_message_subscribers()
{
let msg = verified.payload_attestation_message();
event_handler.register(EventKind::PayloadAttestationMessage(Box::new(
ForkVersionedResponse {
version: self.spec.fork_name_at_slot::<T::EthSpec>(msg.data.slot),
metadata: Default::default(),
data: msg.clone(),
},
)));
}
},
)
} }
} }

View File

@@ -6,6 +6,7 @@ use crate::{
proposer_preferences_verification::proposer_preference_cache::GossipVerifiedProposerPreferenceCache, proposer_preferences_verification::proposer_preference_cache::GossipVerifiedProposerPreferenceCache,
}; };
use educe::Educe; use educe::Educe;
use eth2::types::{EventKind, ForkVersionedResponse};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use state_processing::signature_sets::{ use state_processing::signature_sets::{
execution_payload_bid_signature_set, get_builder_pubkey_from_state, execution_payload_bid_signature_set, get_builder_pubkey_from_state,
@@ -233,6 +234,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
%parent_block_root, %parent_block_root,
"Successfully verified gossip payload bid" "Successfully verified gossip payload bid"
); );
if let Some(event_handler) = self.event_handler.as_ref()
&& event_handler.has_execution_payload_bid_subscribers()
{
event_handler.register(EventKind::ExecutionPayloadBid(Box::new(
ForkVersionedResponse {
version: self.spec.fork_name_at_slot::<T::EthSpec>(slot),
metadata: Default::default(),
data: (*verified.signed_bid).clone(),
},
)));
}
Ok(verified) Ok(verified)
} }
Err(e) => { Err(e) => {

View File

@@ -94,6 +94,7 @@ impl<T: BeaconChainTypes> GossipVerifiedEnvelope<T> {
payload.block_hash, payload.block_hash,
signed_envelope, signed_envelope,
vec![], vec![],
None,
chain.spec.clone(), chain.spec.clone(),
)), )),
import_data: EnvelopeImportData { import_data: EnvelopeImportData {

View File

@@ -1,7 +1,7 @@
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use eth2::types::{EventKind, SseExecutionPayload}; use eth2::types::{EventKind, SseExecutionPayload, SseExecutionPayloadAvailable};
use fork_choice::PayloadVerificationStatus; use fork_choice::PayloadVerificationStatus;
use slot_clock::SlotClock; use slot_clock::SlotClock;
use store::StoreOp; use store::StoreOp;
@@ -182,6 +182,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
signed_envelope, signed_envelope,
import_data, import_data,
payload_verification_outcome, payload_verification_outcome,
self.spec.clone(),
)) ))
} }
@@ -362,5 +363,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
execution_optimistic: payload_verification_status.is_optimistic(), execution_optimistic: payload_verification_status.is_optimistic(),
})); }));
} }
// TODO(gloas): once the DA checker handles envelopes, this event should also be
// emitted from the DA resolution path (similar to `process_availability` for blocks).
if let Some(event_handler) = self.event_handler.as_ref()
&& event_handler.has_execution_payload_available_subscribers()
{
event_handler.register(EventKind::ExecutionPayloadAvailable(
SseExecutionPayloadAvailable {
slot: envelope_slot,
block_root,
},
));
}
} }
} }

View File

@@ -64,13 +64,14 @@ impl<E: EthSpec> AvailableEnvelope<E> {
execution_block_hash: ExecutionBlockHash, execution_block_hash: ExecutionBlockHash,
envelope: Arc<SignedExecutionPayloadEnvelope<E>>, envelope: Arc<SignedExecutionPayloadEnvelope<E>>,
columns: DataColumnSidecarList<E>, columns: DataColumnSidecarList<E>,
columns_available_timestamp: Option<std::time::Duration>,
spec: Arc<ChainSpec>, spec: Arc<ChainSpec>,
) -> Self { ) -> Self {
Self { Self {
execution_block_hash, execution_block_hash,
envelope, envelope,
columns, columns,
columns_available_timestamp: None, columns_available_timestamp,
spec, spec,
} }
} }
@@ -119,9 +120,10 @@ pub struct EnvelopeProcessingSnapshot<E: EthSpec> {
/// fully available. /// fully available.
/// 2. `AvailabilityPending`: This envelope hasn't received all required blobs to consider it /// 2. `AvailabilityPending`: This envelope hasn't received all required blobs to consider it
/// fully available. /// fully available.
#[allow(dead_code)]
pub enum ExecutedEnvelope<E: EthSpec> { pub enum ExecutedEnvelope<E: EthSpec> {
Available(AvailableExecutedEnvelope<E>), Available(AvailableExecutedEnvelope<E>),
// TODO(gloas) implement availability pending // TODO(gloas): check data column availability via DA checker
AvailabilityPending(), AvailabilityPending(),
} }
@@ -130,6 +132,7 @@ impl<E: EthSpec> ExecutedEnvelope<E> {
envelope: MaybeAvailableEnvelope<E>, envelope: MaybeAvailableEnvelope<E>,
import_data: EnvelopeImportData<E>, import_data: EnvelopeImportData<E>,
payload_verification_outcome: PayloadVerificationOutcome, payload_verification_outcome: PayloadVerificationOutcome,
spec: Arc<ChainSpec>,
) -> Self { ) -> Self {
match envelope { match envelope {
MaybeAvailableEnvelope::Available(available_envelope) => { MaybeAvailableEnvelope::Available(available_envelope) => {
@@ -139,11 +142,15 @@ impl<E: EthSpec> ExecutedEnvelope<E> {
payload_verification_outcome, payload_verification_outcome,
)) ))
} }
// TODO(gloas) implement availability pending // TODO(gloas): check data column availability via DA checker
MaybeAvailableEnvelope::AvailabilityPending { MaybeAvailableEnvelope::AvailabilityPending {
block_hash: _, block_hash,
envelope: _, envelope,
} => Self::AvailabilityPending(), } => Self::Available(AvailableExecutedEnvelope::new(
AvailableEnvelope::new(block_hash, envelope, vec![], None, spec),
import_data,
payload_verification_outcome,
)),
} }
} }
} }

View File

@@ -6,11 +6,11 @@
//! and publishes the payload. //! and publishes the payload.
use std::collections::HashMap; use std::collections::HashMap;
use types::{BlobsList, EthSpec, ExecutionPayloadEnvelope, KzgProofs, Slot}; use types::{BlobsList, EthSpec, ExecutionPayloadEnvelope, Slot};
pub struct PendingEnvelopeData<E: EthSpec> { pub struct PendingEnvelopeData<E: EthSpec> {
pub envelope: ExecutionPayloadEnvelope<E>, pub envelope: ExecutionPayloadEnvelope<E>,
pub blobs_and_proofs: Option<(BlobsList<E>, KzgProofs<E>)>, pub blobs: Option<BlobsList<E>>,
} }
/// Cache for pending execution payload envelopes awaiting publishing. /// Cache for pending execution payload envelopes awaiting publishing.
@@ -44,6 +44,7 @@ impl<E: EthSpec> PendingPayloadEnvelopes<E> {
/// Insert a pending envelope into the cache. /// Insert a pending envelope into the cache.
pub fn insert(&mut self, slot: Slot, data: PendingEnvelopeData<E>) { pub fn insert(&mut self, slot: Slot, data: PendingEnvelopeData<E>) {
// TODO(gloas): we may want to check for duplicates here, which shouldn't be allowed
self.envelopes.insert(slot, data); self.envelopes.insert(slot, data);
} }
@@ -53,10 +54,8 @@ impl<E: EthSpec> PendingPayloadEnvelopes<E> {
} }
/// Remove and return the blobs and proofs for a slot, leaving the envelope in place. /// Remove and return the blobs and proofs for a slot, leaving the envelope in place.
pub fn take_blobs(&mut self, slot: Slot) -> Option<(BlobsList<E>, KzgProofs<E>)> { pub fn take_blobs(&mut self, slot: Slot) -> Option<BlobsList<E>> {
self.envelopes self.envelopes.get_mut(&slot).and_then(|d| d.blobs.take())
.get_mut(&slot)
.and_then(|d| d.blobs_and_proofs.take())
} }
/// Remove and return a pending envelope by slot. /// Remove and return a pending envelope by slot.
@@ -92,7 +91,7 @@ impl<E: EthSpec> PendingPayloadEnvelopes<E> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use types::{ExecutionPayloadGloas, ExecutionRequests, Hash256, KzgProofs, MainnetEthSpec}; use types::{ExecutionPayloadGloas, ExecutionRequests, Hash256, MainnetEthSpec};
type E = MainnetEthSpec; type E = MainnetEthSpec;
@@ -107,7 +106,7 @@ mod tests {
builder_index: 0, builder_index: 0,
beacon_block_root: Hash256::ZERO, beacon_block_root: Hash256::ZERO,
}, },
blobs_and_proofs: None, blobs: None,
} }
} }
@@ -150,10 +149,9 @@ mod tests {
let slot = Slot::new(1); let slot = Slot::new(1);
let blobs = BlobsList::<E>::default(); let blobs = BlobsList::<E>::default();
let proofs = KzgProofs::<E>::default();
let data = PendingEnvelopeData { let data = PendingEnvelopeData {
envelope: make_envelope(slot).envelope, envelope: make_envelope(slot).envelope,
blobs_and_proofs: Some((blobs, proofs)), blobs: Some(blobs),
}; };
cache.insert(slot, data); cache.insert(slot, data);

View File

@@ -86,6 +86,8 @@ pub const FORK_NAME_ENV_VAR: &str = "FORK_NAME";
// `beacon_node/execution_layer/src/test_utils/fixtures/mainnet/test_blobs_bundle.ssz` // `beacon_node/execution_layer/src/test_utils/fixtures/mainnet/test_blobs_bundle.ssz`
pub const TEST_DATA_COLUMN_SIDECARS_SSZ: &[u8] = pub const TEST_DATA_COLUMN_SIDECARS_SSZ: &[u8] =
include_bytes!("test_utils/fixtures/test_data_column_sidecars.ssz"); include_bytes!("test_utils/fixtures/test_data_column_sidecars.ssz");
pub const TEST_DATA_COLUMN_SIDECARS_GLOAS_SSZ: &[u8] =
include_bytes!("test_utils/fixtures/test_data_column_sidecars_gloas.ssz");
// Default target aggregators to set during testing, this ensures an aggregator at each slot. // Default target aggregators to set during testing, this ensures an aggregator at each slot.
// //
@@ -3806,7 +3808,7 @@ pub fn generate_data_column_sidecars_from_block<E: EthSpec>(
let signed_block_header = block.signed_block_header(); let signed_block_header = block.signed_block_header();
let template_data_columns = let template_data_columns =
RuntimeVariableList::<DataColumnSidecarGloas<E>>::from_ssz_bytes( RuntimeVariableList::<DataColumnSidecarGloas<E>>::from_ssz_bytes(
TEST_DATA_COLUMN_SIDECARS_SSZ, TEST_DATA_COLUMN_SIDECARS_GLOAS_SSZ,
E::number_of_columns(), E::number_of_columns(),
) )
.unwrap(); .unwrap();

View File

@@ -119,16 +119,10 @@ async fn rpc_columns_with_invalid_header_signature() {
/// data columns can be built from those cached blobs. /// data columns can be built from those cached blobs.
#[tokio::test] #[tokio::test]
async fn gloas_envelope_blobs_produce_valid_columns() { async fn gloas_envelope_blobs_produce_valid_columns() {
// TODO(gloas): Need a Gloas-format test_data_column_sidecars.ssz fixture before this test
// can run. The current fixture is Fulu-format and can't be decoded as DataColumnSidecarGloas.
// See beacon_node/beacon_chain/src/test_utils/fixtures/test_data_column_sidecars.ssz
let spec = Arc::new(test_spec::<E>()); let spec = Arc::new(test_spec::<E>());
if !spec.is_gloas_scheduled() { if !spec.is_gloas_scheduled() {
return; return;
} }
return;
#[allow(unreachable_code)]
let harness = get_harness(VALIDATOR_COUNT, spec.clone(), NodeCustodyType::Supernode); let harness = get_harness(VALIDATOR_COUNT, spec.clone(), NodeCustodyType::Supernode);
harness.execution_block_generator().set_min_blob_count(1); harness.execution_block_generator().set_min_blob_count(1);
@@ -147,7 +141,7 @@ async fn gloas_envelope_blobs_produce_valid_columns() {
// Produce a Gloas block via the harness. This caches envelope + blobs. // Produce a Gloas block via the harness. This caches envelope + blobs.
let state = harness.get_current_state(); let state = harness.get_current_state();
let (block_contents, opt_envelope, post_state) = let (block_contents, opt_envelope, _post_state) =
harness.make_block_with_envelope(state, slot).await; harness.make_block_with_envelope(state, slot).await;
let signed_block = &block_contents.0; let signed_block = &block_contents.0;
@@ -187,42 +181,9 @@ async fn gloas_envelope_blobs_produce_valid_columns() {
assert_eq!(gloas_col.slot, slot); assert_eq!(gloas_col.slot, slot);
} }
// Process the block (without blobs so it's pending availability). // End-to-end DA flow (process_block → process_envelope → process_rpc_custody_columns)
let block_root = signed_block.canonical_root(); // is not exercised here: Gloas blocks are not gated on columns at block-import time
let availability = harness // and the envelope/column gating belongs in a dedicated test once the DA path matures.
.chain
.process_block(
block_root,
LookupBlock::new(signed_block.clone()),
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
)
.await
.unwrap();
assert_eq!(
availability,
AvailabilityProcessingStatus::MissingComponents(slot, block_root),
"Block should be pending availability without columns"
);
// Process the envelope.
let envelope = opt_envelope.unwrap();
harness
.process_envelope(block_root, envelope, &post_state, signed_block.state_root())
.await;
// Supply columns via RPC to make the block available.
let status = harness
.chain
.process_rpc_custody_columns(data_column_sidecars)
.await
.unwrap();
assert_eq!(
status,
AvailabilityProcessingStatus::Imported(block_root),
"Block should be imported after supplying data columns"
);
} }
// Regression test for verify_header_signature bug: it uses head_fork() which is wrong for fork blocks // Regression test for verify_header_signature bug: it uses head_fork() which is wrong for fork blocks

View File

@@ -10,8 +10,8 @@ use std::sync::Arc;
use types::data::FixedBlobSidecarList; use types::data::FixedBlobSidecarList;
use types::test_utils::TestRandom; use types::test_utils::TestRandom;
use types::{ use types::{
BlobSidecar, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSidecarGloas, EthSpec, BlobSidecar, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSidecarGloas, Domain, EthSpec,
MinimalEthSpec, Slot, MinimalEthSpec, PayloadAttestationData, PayloadAttestationMessage, SignedRoot, Slot,
}; };
type E = MinimalEthSpec; type E = MinimalEthSpec;
@@ -258,3 +258,177 @@ async fn head_event_on_block_import() {
panic!("Expected Head event, got {:?}", head_event); panic!("Expected Head event, got {:?}", head_event);
} }
} }
/// Verifies that `execution_payload_gossip` fires at gossip verification time, and
/// `execution_payload` + `execution_payload_available` fire at import time.
#[tokio::test]
async fn execution_payload_envelope_events() {
if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) {
return;
}
let harness = BeaconChainHarness::builder(E::default())
.default_spec()
.deterministic_keypairs(64)
.fresh_ephemeral_store()
.mock_execution_layer()
.build();
harness.extend_to_slot(Slot::new(1)).await;
let state = harness.get_current_state();
let target_slot = Slot::new(2);
harness.advance_slot();
let (block_contents, opt_envelope, _new_state) =
harness.make_block_with_envelope(state, target_slot).await;
let block_root = block_contents.0.canonical_root();
harness
.process_block(target_slot, block_root, block_contents)
.await
.expect("block should be processed");
let signed_envelope = opt_envelope.expect("Gloas block should produce an envelope");
let event_handler = harness.chain.event_handler.as_ref().unwrap();
let mut gossip_receiver = event_handler.subscribe_execution_payload_gossip();
let mut payload_receiver = event_handler.subscribe_execution_payload();
let mut available_receiver = event_handler.subscribe_execution_payload_available();
// Stage 1: gossip verification fires execution_payload_gossip only.
let gossip_verified = harness
.chain
.verify_envelope_for_gossip(Arc::new(signed_envelope))
.await
.expect("envelope gossip verification should succeed");
let gossip_event = gossip_receiver
.try_recv()
.expect("should receive execution_payload_gossip after gossip verification");
if let EventKind::ExecutionPayloadGossip(sse) = gossip_event {
assert_eq!(sse.slot, target_slot);
assert_eq!(sse.block_root, block_root);
} else {
panic!(
"Expected ExecutionPayloadGossip event, got {:?}",
gossip_event
);
}
assert!(payload_receiver.try_recv().is_err());
assert!(available_receiver.try_recv().is_err());
// Stage 2: import fires execution_payload and execution_payload_available.
harness
.chain
.process_execution_payload_envelope(
block_root,
gossip_verified,
beacon_chain::NotifyExecutionLayer::Yes,
types::BlockImportSource::Gossip,
#[allow(clippy::result_large_err)]
|| Ok(()),
)
.await
.expect("envelope import should succeed");
let payload_event = payload_receiver
.try_recv()
.expect("should receive execution_payload after import");
if let EventKind::ExecutionPayload(sse) = payload_event {
assert_eq!(sse.slot, target_slot);
assert_eq!(sse.block_root, block_root);
} else {
panic!("Expected ExecutionPayload event, got {:?}", payload_event);
}
let available_event = available_receiver
.try_recv()
.expect("should receive execution_payload_available after import");
if let EventKind::ExecutionPayloadAvailable(sse) = available_event {
assert_eq!(sse.slot, target_slot);
assert_eq!(sse.block_root, block_root);
} else {
panic!(
"Expected ExecutionPayloadAvailable event, got {:?}",
available_event
);
}
assert!(
gossip_receiver.try_recv().is_err(),
"no extra gossip events should fire during import"
);
}
/// Verifies that a `payload_attestation_message` event is emitted when a payload attestation
/// message passes gossip verification.
#[tokio::test]
async fn payload_attestation_message_event_on_gossip_verification() {
if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) {
return;
}
let harness = BeaconChainHarness::builder(E::default())
.default_spec()
.deterministic_keypairs(64)
.fresh_ephemeral_store()
.mock_execution_layer()
.build();
// Advance chain to have a valid head block.
let target_slot = Slot::new(1);
harness.extend_to_slot(target_slot).await;
let head = harness.chain.canonical_head.cached_head();
let head_state = &head.snapshot.beacon_state;
// Get a PTC member for this slot.
let ptc = head_state
.get_ptc(target_slot, &harness.spec)
.expect("should get PTC");
let validator_index = *ptc.0.first().expect("PTC should have at least one member") as u64;
// Sign a payload attestation.
let target_epoch = target_slot.epoch(E::slots_per_epoch());
let domain = harness.spec.get_domain(
target_epoch,
Domain::PTCAttester,
&head_state.fork(),
head_state.genesis_validators_root(),
);
let data = PayloadAttestationData {
beacon_block_root: head.head_block_root(),
slot: target_slot,
payload_present: true,
blob_data_available: true,
};
let message = data.signing_root(domain);
let signature = harness.validator_keypairs[validator_index as usize]
.sk
.sign(message);
let msg = PayloadAttestationMessage {
validator_index,
data: data.clone(),
signature: signature.clone(),
};
// Subscribe before verification.
let event_handler = harness.chain.event_handler.as_ref().unwrap();
let mut receiver = event_handler.subscribe_payload_attestation_message();
// Verify the attestation through the gossip path.
harness
.chain
.verify_payload_attestation_message_for_gossip(msg)
.expect("verification should succeed");
// Assert the event was emitted.
let event = receiver.try_recv().expect("should receive event");
if let EventKind::PayloadAttestationMessage(versioned) = event {
assert_eq!(versioned.data.validator_index, validator_index);
assert_eq!(versioned.data.data, data);
} else {
panic!("Expected PayloadAttestationMessage event, got {:?}", event);
}
}

View File

@@ -652,26 +652,22 @@ async fn gloas_block_production_caches_blobs_for_column_publishing() {
); );
// Take the blobs from the cache — this is what publish_execution_payload_envelope does. // Take the blobs from the cache — this is what publish_execution_payload_envelope does.
let blobs_and_proofs = harness let blobs = harness
.chain .chain
.pending_payload_envelopes .pending_payload_envelopes
.write() .write()
.take_blobs(slot); .take_blobs(slot);
assert!( assert!(
blobs_and_proofs.is_some(), blobs.is_some(),
"Blobs and proofs should be cached alongside the envelope" "Blobs should be cached alongside the envelope"
); );
let (blobs, kzg_proofs) = blobs_and_proofs.unwrap(); let blobs = blobs.unwrap();
assert!( assert!(
!blobs.is_empty(), !blobs.is_empty(),
"Blobs should be non-empty when min_blob_count >= 1" "Blobs should be non-empty when min_blob_count >= 1"
); );
assert!(
!kzg_proofs.is_empty(),
"KZG proofs should be non-empty when blobs are present"
);
// Verify take_blobs is consume-once. // Verify take_blobs is consume-once.
let second_take = harness let second_take = harness

View File

@@ -7,9 +7,7 @@ use crate::version::{
execution_optimistic_finalized_beacon_response, execution_optimistic_finalized_beacon_response,
}; };
use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use beacon_chain::payload_envelope_verification::EnvelopeError; use beacon_chain::{BeaconChain, BeaconChainTypes};
use beacon_chain::payload_envelope_verification::gossip_verified_envelope::GossipVerifiedEnvelope;
use beacon_chain::{BeaconChain, BeaconChainTypes, NotifyExecutionLayer};
use bytes::Bytes; use bytes::Bytes;
use eth2::types as api_types; use eth2::types as api_types;
use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER}; use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
@@ -21,7 +19,7 @@ use std::future::Future;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
use types::{BlockImportSource, EthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope}; use types::{EthSpec, SignedExecutionPayloadEnvelope};
use warp::{ use warp::{
Filter, Rejection, Reply, Filter, Rejection, Reply,
hyper::{Body, Response}, hyper::{Body, Response},
@@ -91,7 +89,9 @@ pub(crate) fn post_beacon_execution_payload_envelope<T: BeaconChainTypes>(
) )
.boxed() .boxed()
} }
/// Publishes a signed execution payload envelope to the network. /// Publishes a signed execution payload envelope to the network. Implements
/// `POST /eth/v1/beacon/execution_payload_envelope` per the in-flight beacon-APIs PR
/// <https://github.com/ethereum/beacon-APIs/pull/580>.
pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>( pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
envelope: SignedExecutionPayloadEnvelope<T::EthSpec>, envelope: SignedExecutionPayloadEnvelope<T::EthSpec>,
chain: Arc<BeaconChain<T>>, chain: Arc<BeaconChain<T>>,
@@ -111,20 +111,30 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
let blobs_and_proofs = chain.pending_payload_envelopes.write().take_blobs(slot); let blobs_and_proofs = chain.pending_payload_envelopes.write().take_blobs(slot);
// The publish_fn is called inside process_execution_payload_envelope after consensus // Spawn the column-build task (CPU-bound KZG cell-and-proof computation) before
// verification but before the EL call. // publishing the envelope so it runs in parallel with envelope gossip, narrowing
let envelope_for_publish = signed_envelope.clone(); // the window in which peers see envelope-without-columns. If envelope publication
let sender = network_tx.clone(); // fails below, dropping this future drops the spawned `JoinHandle` (the running
let publish_fn = move || { // closure on the blocking pool finishes and is then discarded — no work cancellation).
info!( let column_build_future = match blobs_and_proofs {
%slot, Some(blobs) if !blobs.is_empty() => Some(spawn_build_gloas_data_columns_task(
%beacon_block_root, &chain,
builder_index, beacon_block_root,
"Publishing signed execution payload envelope to network" slot,
); blobs,
crate::utils::publish_pubsub_message( )?),
&sender, _ => None,
PubsubMessage::ExecutionPayload(Box::new((*envelope_for_publish).clone())), };
// Publish the envelope to the network.
crate::utils::publish_pubsub_message(
network_tx,
PubsubMessage::ExecutionPayload(Box::new(envelope)),
)
.map_err(|_| {
warn!(%slot, "Failed to publish execution payload envelope to network");
warp_utils::reject::custom_server_error(
"Unable to publish execution payload envelope to network".into(),
) )
.map_err(|_| { .map_err(|_| {
warn!(%slot, "Failed to publish execution payload envelope to network"); warn!(%slot, "Failed to publish execution payload envelope to network");
@@ -205,34 +215,81 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
} }
} }
// From here on the envelope is on the wire. `take_blobs` already consumed the cache
// entry, so a retry would not republish columns; returning Err would mislead the
// caller. Log column-build/publish failures and fall through to `Ok`.
if let Some(column_build_future) = column_build_future {
let gossip_verified_columns = match column_build_future.await {
Ok(columns) => columns,
Err(e) => {
error!(
%slot,
error = ?e,
"Failed to build data columns after envelope publication"
);
return Ok(warp::reply().into_response());
}
};
if !gossip_verified_columns.is_empty() {
if let Err(e) = publish_column_sidecars(network_tx, &gossip_verified_columns, &chain) {
error!(
%slot,
error = ?e,
"Failed to publish data column sidecars after envelope publication"
);
return Ok(warp::reply().into_response());
}
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
let sampling_column_indices = chain.sampling_columns_for_epoch(epoch);
let sampling_columns = gossip_verified_columns
.into_iter()
.filter(|col| sampling_column_indices.contains(&col.index()))
.collect::<Vec<_>>();
// Local processing only — envelope already broadcast, so log and fall through.
if !sampling_columns.is_empty()
&& let Err(e) =
Box::pin(chain.process_gossip_data_columns(sampling_columns, || Ok(()))).await
{
error!(
%slot,
error = ?e,
"Failed to process sampling data columns during envelope publication"
);
}
}
}
Ok(warp::reply().into_response()) Ok(warp::reply().into_response())
} }
fn spawn_build_gloas_data_columns_task<T: BeaconChainTypes>( fn spawn_build_gloas_data_columns_task<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>, chain: &Arc<BeaconChain<T>>,
beacon_block_root: types::Hash256, beacon_block_root: types::Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
slot: types::Slot, slot: types::Slot,
blobs: types::BlobsList<T::EthSpec>, blobs: types::BlobsList<T::EthSpec>,
) -> Result<impl Future<Output = Result<Vec<GossipVerifiedDataColumn<T>>, Rejection>>, Rejection> { ) -> Result<impl Future<Output = Result<Vec<GossipVerifiedDataColumn<T>>, Rejection>>, Rejection> {
chain let chain_for_build = chain.clone();
.clone() let handle = chain
.task_executor .task_executor
.spawn_blocking_handle( .spawn_blocking_handle(
move || build_gloas_data_columns(&chain, beacon_block_root, block, slot, &blobs), move || build_gloas_data_columns(&chain_for_build, beacon_block_root, slot, &blobs),
"build_gloas_data_columns", "build_gloas_data_columns",
) )
.ok_or_else(|| warp_utils::reject::custom_server_error("runtime shutdown".to_string())) .ok_or_else(|| warp_utils::reject::custom_server_error("runtime shutdown".to_string()))?;
.map(|r| {
r.map_err(|_| warp_utils::reject::custom_server_error("join error".to_string())) Ok(async move {
.and_then(|output| async move { output }) handle
}) .await
.map_err(|_| warp_utils::reject::custom_server_error("join error".to_string()))?
})
} }
fn build_gloas_data_columns<T: BeaconChainTypes>( fn build_gloas_data_columns<T: BeaconChainTypes>(
chain: &BeaconChain<T>, chain: &BeaconChain<T>,
beacon_block_root: types::Hash256, beacon_block_root: types::Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
slot: types::Slot, slot: types::Slot,
blobs: &types::BlobsList<T::EthSpec>, blobs: &types::BlobsList<T::EthSpec>,
) -> Result<Vec<GossipVerifiedDataColumn<T>>, Rejection> { ) -> Result<Vec<GossipVerifiedDataColumn<T>>, Rejection> {
@@ -257,7 +314,7 @@ fn build_gloas_data_columns<T: BeaconChainTypes>(
.into_iter() .into_iter()
.filter_map(|col| { .filter_map(|col| {
let index = *col.index(); let index = *col.index();
match GossipVerifiedDataColumn::new_for_block_publishing(col, &block, chain) { match GossipVerifiedDataColumn::new_for_block_publishing(col, chain) {
Ok(verified) => Some(verified), Ok(verified) => Some(verified),
Err(GossipDataColumnError::PriorKnownUnpublished) => None, Err(GossipDataColumnError::PriorKnownUnpublished) => None,
Err(e) => { Err(e) => {

View File

@@ -11,7 +11,7 @@ clap = { workspace = true }
clap_utils = { workspace = true } clap_utils = { workspace = true }
educe = { workspace = true } educe = { workspace = true }
environment = { workspace = true } environment = { workspace = true }
eth2 = { workspace = true } eth2 = { workspace = true, features = ["lighthouse"] }
eth2_network_config = { workspace = true } eth2_network_config = { workspace = true }
eth2_wallet = { workspace = true } eth2_wallet = { workspace = true }
ethereum_serde_utils = { workspace = true } ethereum_serde_utils = { workspace = true }