From 0a1bdf840b571d8c7d118b450e2ea497edf60e18 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Mon, 27 Apr 2026 08:48:06 +0200 Subject: [PATCH] Add payload attestation validator duty --- beacon_node/http_api/src/beacon/pool.rs | 36 ++- beacon_node/http_api/src/lib.rs | 5 + common/eth2/src/lib.rs | 20 +- .../lighthouse_validator_store/src/lib.rs | 44 ++- validator_client/signing_method/src/lib.rs | 5 + .../signing_method/src/web3signer.rs | 3 + validator_client/src/lib.rs | 17 ++ .../validator_services/src/lib.rs | 1 + .../src/payload_attestation_service.rs | 262 ++++++++++++++++++ validator_client/validator_store/src/lib.rs | 16 +- 10 files changed, 396 insertions(+), 13 deletions(-) create mode 100644 validator_client/validator_services/src/payload_attestation_service.rs diff --git a/beacon_node/http_api/src/beacon/pool.rs b/beacon_node/http_api/src/beacon/pool.rs index 059573c317..8c39814d35 100644 --- a/beacon_node/http_api/src/beacon/pool.rs +++ b/beacon_node/http_api/src/beacon/pool.rs @@ -17,8 +17,9 @@ use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; use tracing::{debug, info, warn}; use types::{ - Attestation, AttestationData, AttesterSlashing, ForkName, ProposerSlashing, - SignedBlsToExecutionChange, SignedVoluntaryExit, SingleAttestation, SyncCommitteeMessage, + Attestation, AttestationData, AttesterSlashing, ForkName, PayloadAttestationMessage, + ProposerSlashing, SignedBlsToExecutionChange, SignedVoluntaryExit, SingleAttestation, + SyncCommitteeMessage, }; use warp::filters::BoxedFilter; use warp::{Filter, Reply}; @@ -520,3 +521,34 @@ pub fn post_beacon_pool_attestations_v2( ) .boxed() } + +/// POST beacon/pool/payload_attestations +pub fn post_beacon_pool_payload_attestations( + network_tx_filter: &NetworkTxFilter, + beacon_pool_path: &BeaconPoolPathFilter, +) -> ResponseFilter { + beacon_pool_path + .clone() + .and(warp::path("payload_attestations")) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(network_tx_filter.clone()) + .then( + |task_spawner: TaskSpawner, + _chain: Arc>, + messages: Vec, + network_tx: UnboundedSender>| { + task_spawner.blocking_json_task(Priority::P0, move || { + // TODO(gloas): add proper verification once payload_attestation_verification is implemented + for message in messages { + utils::publish_pubsub_message( + &network_tx, + PubsubMessage::PayloadAttestation(Box::new(message)), + )?; + } + Ok(()) + }) + }, + ) + .boxed() +} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index bd80dd1e82..eafb978b38 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1487,6 +1487,10 @@ pub fn serve( let post_beacon_pool_sync_committees = post_beacon_pool_sync_committees(&network_tx_filter, &beacon_pool_path); + // POST beacon/pool/payload_attestations + let post_beacon_pool_payload_attestations = + post_beacon_pool_payload_attestations(&network_tx_filter, &beacon_pool_path); + // GET beacon/pool/bls_to_execution_changes let get_beacon_pool_bls_to_execution_changes = get_beacon_pool_bls_to_execution_changes(&beacon_pool_path); @@ -3411,6 +3415,7 @@ pub fn serve( .uor(post_beacon_pool_proposer_slashings) .uor(post_beacon_pool_voluntary_exits) .uor(post_beacon_pool_sync_committees) + .uor(post_beacon_pool_payload_attestations) .uor(post_beacon_pool_bls_to_execution_changes) .uor(post_beacon_execution_payload_envelope) .uor(post_beacon_state_validators) diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 4ec75468a2..25e3a8a3f4 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -46,7 +46,7 @@ use ssz::{Decode, Encode}; use std::fmt; use std::future::Future; use std::time::Duration; -use types::PayloadAttestationData; +use types::{PayloadAttestationData, PayloadAttestationMessage}; pub const V1: EndpointVersion = EndpointVersion(1); pub const V2: EndpointVersion = EndpointVersion(2); @@ -1789,6 +1789,24 @@ impl BeaconNodeHttpClient { Ok(()) } + /// `POST beacon/pool/payload_attestations` + pub async fn post_beacon_pool_payload_attestations( + &self, + messages: &[PayloadAttestationMessage], + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("payload_attestations"); + + self.post(path, &messages).await?; + + Ok(()) + } + /// `POST beacon/pool/bls_to_execution_changes` pub async fn post_beacon_pool_bls_to_execution_changes( &self, diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index c5bcd88eb1..a3ab2ccbe4 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -22,10 +22,11 @@ use types::{ AbstractExecPayload, Address, AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof, Domain, Epoch, EthSpec, ExecutionPayloadEnvelope, Fork, FullPayload, Graffiti, Hash256, SelectionProof, SignedAggregateAndProof, SignedBeaconBlock, - SignedContributionAndProof, SignedExecutionPayloadEnvelope, SignedRoot, - SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncAggregatorSelectionData, - SyncCommitteeContribution, SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, - ValidatorRegistrationData, VoluntaryExit, graffiti::GraffitiString, + PayloadAttestationData, PayloadAttestationMessage, SignedContributionAndProof, + SignedExecutionPayloadEnvelope, SignedRoot, SignedValidatorRegistrationData, + SignedVoluntaryExit, Slot, SyncAggregatorSelectionData, SyncCommitteeContribution, + SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, + VoluntaryExit, graffiti::GraffitiString, }; use validator_store::{ AggregateToSign, AttestationToSign, ContributionToSign, DoppelgangerStatus, @@ -1423,8 +1424,39 @@ impl ValidatorStore for LighthouseValidatorS }) } - /// Sign an `ExecutionPayloadEnvelope` for Gloas (local building). - /// The proposer acts as the builder and signs with the BeaconBuilder domain. + async fn sign_payload_attestation( + &self, + validator_pubkey: PublicKeyBytes, + data: PayloadAttestationData, + ) -> Result { + let signing_context = self.signing_context( + Domain::PTCAttester, + data.slot.epoch(E::slots_per_epoch()), + ); + + let validator_index = self + .validator_index(&validator_pubkey) + .ok_or(ValidatorStoreError::UnknownPubkey(validator_pubkey))?; + + let signing_method = self.doppelganger_bypassed_signing_method(validator_pubkey)?; + + let signature = signing_method + .get_signature::>( + SignableMessage::PayloadAttestationData(&data), + signing_context, + &self.spec, + &self.task_executor, + ) + .await + .map_err(Error::SpecificError)?; + + Ok(PayloadAttestationMessage { + validator_index, + data, + signature, + }) + } + async fn sign_execution_payload_envelope( &self, validator_pubkey: PublicKeyBytes, diff --git a/validator_client/signing_method/src/lib.rs b/validator_client/signing_method/src/lib.rs index c132d86c17..2f80fa5761 100644 --- a/validator_client/signing_method/src/lib.rs +++ b/validator_client/signing_method/src/lib.rs @@ -50,6 +50,7 @@ pub enum SignableMessage<'a, E: EthSpec, Payload: AbstractExecPayload = FullP ValidatorRegistration(&'a ValidatorRegistrationData), VoluntaryExit(&'a VoluntaryExit), ExecutionPayloadEnvelope(&'a ExecutionPayloadEnvelope), + PayloadAttestationData(&'a PayloadAttestationData), } impl> SignableMessage<'_, E, Payload> { @@ -72,6 +73,7 @@ impl> SignableMessage<'_, E, Payload SignableMessage::ValidatorRegistration(v) => v.signing_root(domain), SignableMessage::VoluntaryExit(exit) => exit.signing_root(domain), SignableMessage::ExecutionPayloadEnvelope(e) => e.signing_root(domain), + SignableMessage::PayloadAttestationData(d) => d.signing_root(domain), } } } @@ -238,6 +240,9 @@ impl SigningMethod { SignableMessage::ExecutionPayloadEnvelope(e) => { Web3SignerObject::ExecutionPayloadEnvelope(e) } + SignableMessage::PayloadAttestationData(d) => { + Web3SignerObject::PayloadAttestationData(d) + } }; // Determine the Web3Signer message type. diff --git a/validator_client/signing_method/src/web3signer.rs b/validator_client/signing_method/src/web3signer.rs index e6fc8f3ba2..c2b7e06f92 100644 --- a/validator_client/signing_method/src/web3signer.rs +++ b/validator_client/signing_method/src/web3signer.rs @@ -21,6 +21,7 @@ pub enum MessageType { ValidatorRegistration, // TODO(gloas) verify w/ web3signer specs ExecutionPayloadEnvelope, + PayloadAttestation, } #[derive(Debug, PartialEq, Copy, Clone, Serialize)] @@ -78,6 +79,7 @@ pub enum Web3SignerObject<'a, E: EthSpec, Payload: AbstractExecPayload> { ContributionAndProof(&'a ContributionAndProof), ValidatorRegistration(&'a ValidatorRegistrationData), ExecutionPayloadEnvelope(&'a ExecutionPayloadEnvelope), + PayloadAttestationData(&'a PayloadAttestationData), } impl<'a, E: EthSpec, Payload: AbstractExecPayload> Web3SignerObject<'a, E, Payload> { @@ -144,6 +146,7 @@ impl<'a, E: EthSpec, Payload: AbstractExecPayload> Web3SignerObject<'a, E, Pa } Web3SignerObject::ValidatorRegistration(_) => MessageType::ValidatorRegistration, Web3SignerObject::ExecutionPayloadEnvelope(_) => MessageType::ExecutionPayloadEnvelope, + Web3SignerObject::PayloadAttestationData(_) => MessageType::PayloadAttestation, } } } diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index e26d5c3d30..3fbb52fcff 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -45,6 +45,7 @@ use validator_services::{ block_service::{BlockService, BlockServiceBuilder}, duties_service::{self, DutiesService, DutiesServiceBuilder}, latency_service, + payload_attestation_service::{PayloadAttestationService, PayloadAttestationServiceBuilder}, preparation_service::{PreparationService, PreparationServiceBuilder}, sync_committee_service::SyncCommitteeService, }; @@ -83,6 +84,7 @@ pub struct ProductionValidatorClient { block_service: BlockService, SystemTimeSlotClock>, attestation_service: AttestationService, SystemTimeSlotClock>, sync_committee_service: SyncCommitteeService, SystemTimeSlotClock>, + payload_attestation_service: PayloadAttestationService, SystemTimeSlotClock>, doppelganger_service: Option>, preparation_service: PreparationService, SystemTimeSlotClock>, validator_store: Arc>, @@ -552,12 +554,22 @@ impl ProductionValidatorClient { context.executor.clone(), ); + let payload_attestation_service = PayloadAttestationServiceBuilder::new() + .duties_service(duties_service.clone()) + .validator_store(validator_store.clone()) + .slot_clock(slot_clock.clone()) + .beacon_nodes(beacon_nodes.clone()) + .executor(context.executor.clone()) + .chain_spec(context.eth2_config.spec.clone()) + .build()?; + Ok(Self { context, duties_service, block_service, attestation_service, sync_committee_service, + payload_attestation_service, doppelganger_service, preparation_service, validator_store, @@ -629,6 +641,11 @@ impl ProductionValidatorClient { .start_update_service(&self.context.eth2_config.spec) .map_err(|e| format!("Unable to start sync committee service: {}", e))?; + self.payload_attestation_service + .clone() + .start_update_service(&self.context.eth2_config.spec) + .map_err(|e| format!("Unable to start payload attestation service: {}", e))?; + self.preparation_service .clone() .start_update_service(&self.context.eth2_config.spec) diff --git a/validator_client/validator_services/src/lib.rs b/validator_client/validator_services/src/lib.rs index 3b8bd9ae14..0169335a7f 100644 --- a/validator_client/validator_services/src/lib.rs +++ b/validator_client/validator_services/src/lib.rs @@ -3,6 +3,7 @@ pub mod block_service; pub mod duties_service; pub mod latency_service; pub mod notifier_service; +pub mod payload_attestation_service; pub mod preparation_service; pub mod sync; pub mod sync_committee_service; diff --git a/validator_client/validator_services/src/payload_attestation_service.rs b/validator_client/validator_services/src/payload_attestation_service.rs new file mode 100644 index 0000000000..2ae089c762 --- /dev/null +++ b/validator_client/validator_services/src/payload_attestation_service.rs @@ -0,0 +1,262 @@ +use crate::duties_service::DutiesService; +use beacon_node_fallback::BeaconNodeFallback; +use logging::crit; +use slot_clock::SlotClock; +use std::ops::Deref; +use std::sync::Arc; +use task_executor::TaskExecutor; +use tokio::time::{Duration, sleep}; +use tracing::{debug, error, info, warn}; +use types::{ChainSpec, EthSpec}; +use validator_store::ValidatorStore; + +pub struct PayloadAttestationServiceBuilder { + duties_service: Option>>, + validator_store: Option>, + slot_clock: Option, + beacon_nodes: Option>>, + executor: Option, + chain_spec: Option>, +} + +impl PayloadAttestationServiceBuilder { + pub fn new() -> Self { + Self { + duties_service: None, + validator_store: None, + slot_clock: None, + beacon_nodes: None, + executor: None, + chain_spec: None, + } + } + + pub fn duties_service(mut self, service: Arc>) -> Self { + self.duties_service = Some(service); + self + } + + pub fn validator_store(mut self, store: Arc) -> Self { + self.validator_store = Some(store); + self + } + + pub fn slot_clock(mut self, slot_clock: T) -> Self { + self.slot_clock = Some(slot_clock); + self + } + + pub fn beacon_nodes(mut self, beacon_nodes: Arc>) -> Self { + self.beacon_nodes = Some(beacon_nodes); + self + } + + pub fn executor(mut self, executor: TaskExecutor) -> Self { + self.executor = Some(executor); + self + } + + pub fn chain_spec(mut self, chain_spec: Arc) -> Self { + self.chain_spec = Some(chain_spec); + self + } + + pub fn build(self) -> Result, String> { + Ok(PayloadAttestationService { + inner: Arc::new(Inner { + duties_service: self + .duties_service + .ok_or("Cannot build PayloadAttestationService without duties_service")?, + validator_store: self + .validator_store + .ok_or("Cannot build PayloadAttestationService without validator_store")?, + slot_clock: self + .slot_clock + .ok_or("Cannot build PayloadAttestationService without slot_clock")?, + beacon_nodes: self + .beacon_nodes + .ok_or("Cannot build PayloadAttestationService without beacon_nodes")?, + executor: self + .executor + .ok_or("Cannot build PayloadAttestationService without executor")?, + chain_spec: self + .chain_spec + .ok_or("Cannot build PayloadAttestationService without chain_spec")?, + }), + }) + } +} + +pub struct Inner { + duties_service: Arc>, + validator_store: Arc, + slot_clock: T, + beacon_nodes: Arc>, + executor: TaskExecutor, + chain_spec: Arc, +} + +pub struct PayloadAttestationService { + inner: Arc>, +} + +impl Clone for PayloadAttestationService { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Deref for PayloadAttestationService { + type Target = Inner; + + fn deref(&self) -> &Self::Target { + self.inner.deref() + } +} + +impl PayloadAttestationService { + pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> { + let slot_duration = spec.get_slot_duration(); + let payload_attestation_due = spec.get_payload_attestation_due(); + + info!( + payload_attestation_due_ms = payload_attestation_due.as_millis(), + "Payload attestation service started" + ); + + let executor = self.executor.clone(); + + let interval_fut = async move { + loop { + let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() else { + error!("Failed to read slot clock"); + sleep(slot_duration).await; + continue; + }; + + sleep(duration_to_next_slot + payload_attestation_due).await; + + let Some(current_slot) = self.slot_clock.now() else { + error!("Failed to read slot clock after trigger"); + continue; + }; + + let duties = self.duties_service.get_ptc_duties_for_slot(current_slot); + if duties.is_empty() { + continue; + } + + debug!( + %current_slot, + duty_count = duties.len(), + "Producing payload attestations" + ); + + let service = self.clone(); + self.executor.spawn( + async move { + service.produce_and_publish(current_slot).await; + }, + "payload_attestation_producer", + ); + } + }; + + executor.spawn(interval_fut, "payload_attestation_service"); + Ok(()) + } + + async fn produce_and_publish(&self, slot: types::Slot) { + let duties = self.duties_service.get_ptc_duties_for_slot(slot); + if duties.is_empty() { + return; + } + + let attestation_data = match self + .beacon_nodes + .first_success(|beacon_node| async move { + beacon_node + .get_validator_payload_attestation_data(slot) + .await + .map_err(|e| format!("Failed to get payload attestation data: {e:?}")) + .map(|resp| resp.into_data()) + }) + .await + { + Ok(data) => data, + Err(e) => { + crit!( + error = %e, + %slot, + "Failed to produce payload attestation data" + ); + return; + } + }; + + debug!( + %slot, + beacon_block_root = ?attestation_data.beacon_block_root, + payload_present = attestation_data.payload_present, + "Received payload attestation data" + ); + + let mut messages = Vec::with_capacity(duties.len()); + + for duty in &duties { + match self + .validator_store + .sign_payload_attestation(duty.pubkey, attestation_data.clone()) + .await + { + Ok(message) => { + messages.push(message); + } + Err(e) => { + crit!( + error = ?e, + validator = ?duty.pubkey, + %slot, + "Failed to sign payload attestation" + ); + } + } + } + + if messages.is_empty() { + return; + } + + let count = messages.len(); + match self + .beacon_nodes + .first_success(|beacon_node| { + let messages = messages.clone(); + async move { + beacon_node + .post_beacon_pool_payload_attestations(&messages) + .await + .map_err(|e| format!("Failed to publish payload attestations: {e:?}")) + } + }) + .await + { + Ok(()) => { + info!( + %slot, + %count, + "Successfully published payload attestations" + ); + } + Err(e) => { + crit!( + error = %e, + %slot, + "Failed to publish payload attestations" + ); + } + } + } +} diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index da0b33de18..4e5b415a41 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -7,10 +7,11 @@ use std::future::Future; use std::sync::Arc; use types::{ Address, Attestation, AttestationError, BlindedBeaconBlock, Epoch, EthSpec, - ExecutionPayloadEnvelope, Graffiti, Hash256, SelectionProof, SignedAggregateAndProof, - SignedBlindedBeaconBlock, SignedContributionAndProof, SignedExecutionPayloadEnvelope, - SignedValidatorRegistrationData, Slot, SyncCommitteeContribution, SyncCommitteeMessage, - SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, + ExecutionPayloadEnvelope, Graffiti, Hash256, PayloadAttestationData, PayloadAttestationMessage, + SelectionProof, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedContributionAndProof, + SignedExecutionPayloadEnvelope, SignedValidatorRegistrationData, Slot, + SyncCommitteeContribution, SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, + ValidatorRegistrationData, }; #[derive(Debug, PartialEq, Clone)] @@ -205,6 +206,13 @@ pub trait ValidatorStore: Send + Sync { envelope: ExecutionPayloadEnvelope, ) -> impl Future, Error>> + Send; + /// Sign a `PayloadAttestationData` for the PTC. + fn sign_payload_attestation( + &self, + validator_pubkey: PublicKeyBytes, + data: PayloadAttestationData, + ) -> impl Future>> + Send; + /// Returns `ProposalData` for the provided `pubkey` if it exists in `InitializedValidators`. /// `ProposalData` fields include defaulting logic described in `get_fee_recipient_defaulting`, /// `get_gas_limit_defaulting`, and `get_builder_proposals_defaulting`.