Wire up ePBS SSE events and fix envelope availability (#9199)

Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>
This commit is contained in:
Jimmy Chen
2026-04-28 10:59:01 +02:00
committed by GitHub
parent 919c996c18
commit 280e2f1d53
5 changed files with 251 additions and 11 deletions

View File

@@ -6,6 +6,7 @@ use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, metrics};
use bls::AggregateSignature;
use educe::Educe;
use eth2::types::{EventKind, ForkVersionedResponse};
use parking_lot::RwLock;
use safe_arith::SafeArith;
use slot_clock::SlotClock;
@@ -216,9 +217,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let _timer = metrics::start_timer(&metrics::PAYLOAD_ATTESTATION_GOSSIP_VERIFICATION_TIMES);
let ctx = self.payload_attestation_gossip_context();
VerifiedPayloadAttestationMessage::new(payload_attestation_message, &ctx).inspect(|_| {
metrics::inc_counter(&metrics::PAYLOAD_ATTESTATION_PROCESSING_SUCCESSES);
})
VerifiedPayloadAttestationMessage::new(payload_attestation_message, &ctx).inspect(
|verified| {
metrics::inc_counter(&metrics::PAYLOAD_ATTESTATION_PROCESSING_SUCCESSES);
if let Some(event_handler) = self.event_handler.as_ref()
&& event_handler.has_payload_attestation_message_subscribers()
{
let msg = verified.payload_attestation_message();
event_handler.register(EventKind::PayloadAttestationMessage(Box::new(
ForkVersionedResponse {
version: self.spec.fork_name_at_slot::<T::EthSpec>(msg.data.slot),
metadata: Default::default(),
data: msg.clone(),
},
)));
}
},
)
}
}

View File

@@ -6,6 +6,7 @@ use crate::{
proposer_preferences_verification::proposer_preference_cache::GossipVerifiedProposerPreferenceCache,
};
use educe::Educe;
use eth2::types::{EventKind, ForkVersionedResponse};
use slot_clock::SlotClock;
use state_processing::signature_sets::{
execution_payload_bid_signature_set, get_builder_pubkey_from_state,
@@ -233,6 +234,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
%parent_block_root,
"Successfully verified gossip payload bid"
);
if let Some(event_handler) = self.event_handler.as_ref()
&& event_handler.has_execution_payload_bid_subscribers()
{
event_handler.register(EventKind::ExecutionPayloadBid(Box::new(
ForkVersionedResponse {
version: self.spec.fork_name_at_slot::<T::EthSpec>(slot),
metadata: Default::default(),
data: (*verified.signed_bid).clone(),
},
)));
}
Ok(verified)
}
Err(e) => {

View File

@@ -1,7 +1,7 @@
use std::sync::Arc;
use std::time::Duration;
use eth2::types::{EventKind, SseExecutionPayload};
use eth2::types::{EventKind, SseExecutionPayload, SseExecutionPayloadAvailable};
use fork_choice::PayloadVerificationStatus;
use slot_clock::SlotClock;
use store::StoreOp;
@@ -182,6 +182,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
signed_envelope,
import_data,
payload_verification_outcome,
self.spec.clone(),
))
}
@@ -362,5 +363,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
execution_optimistic: payload_verification_status.is_optimistic(),
}));
}
// TODO(gloas): once the DA checker handles envelopes, this event should also be
// emitted from the DA resolution path (similar to `process_availability` for blocks).
if let Some(event_handler) = self.event_handler.as_ref()
&& event_handler.has_execution_payload_available_subscribers()
{
event_handler.register(EventKind::ExecutionPayloadAvailable(
SseExecutionPayloadAvailable {
slot: envelope_slot,
block_root,
},
));
}
}
}

View File

@@ -60,6 +60,22 @@ pub struct AvailableEnvelope<E: EthSpec> {
}
impl<E: EthSpec> AvailableEnvelope<E> {
pub fn new(
execution_block_hash: ExecutionBlockHash,
envelope: Arc<SignedExecutionPayloadEnvelope<E>>,
columns: DataColumnSidecarList<E>,
columns_available_timestamp: Option<std::time::Duration>,
spec: Arc<ChainSpec>,
) -> Self {
Self {
execution_block_hash,
envelope,
columns,
columns_available_timestamp,
spec,
}
}
pub fn message(&self) -> &ExecutionPayloadEnvelope<E> {
&self.envelope.message
}
@@ -104,9 +120,10 @@ pub struct EnvelopeProcessingSnapshot<E: EthSpec> {
/// fully available.
/// 2. `AvailabilityPending`: This envelope hasn't received all required blobs to consider it
/// fully available.
#[allow(dead_code)]
pub enum ExecutedEnvelope<E: EthSpec> {
Available(AvailableExecutedEnvelope<E>),
// TODO(gloas) implement availability pending
// TODO(gloas): check data column availability via DA checker
AvailabilityPending(),
}
@@ -115,6 +132,7 @@ impl<E: EthSpec> ExecutedEnvelope<E> {
envelope: MaybeAvailableEnvelope<E>,
import_data: EnvelopeImportData<E>,
payload_verification_outcome: PayloadVerificationOutcome,
spec: Arc<ChainSpec>,
) -> Self {
match envelope {
MaybeAvailableEnvelope::Available(available_envelope) => {
@@ -124,11 +142,15 @@ impl<E: EthSpec> ExecutedEnvelope<E> {
payload_verification_outcome,
))
}
// TODO(gloas) implement availability pending
// TODO(gloas): check data column availability via DA checker
MaybeAvailableEnvelope::AvailabilityPending {
block_hash: _,
envelope: _,
} => Self::AvailabilityPending(),
block_hash,
envelope,
} => Self::Available(AvailableExecutedEnvelope::new(
AvailableEnvelope::new(block_hash, envelope, vec![], None, spec),
import_data,
payload_verification_outcome,
)),
}
}
}