diff --git a/beacon_node/http_api/src/beacon/pool.rs b/beacon_node/http_api/src/beacon/pool.rs index 059573c317..c6b8a69643 100644 --- a/beacon_node/http_api/src/beacon/pool.rs +++ b/beacon_node/http_api/src/beacon/pool.rs @@ -1,24 +1,31 @@ use crate::task_spawner::{Priority, TaskSpawner}; -use crate::utils::{NetworkTxFilter, OptionalConsensusVersionHeaderFilter, ResponseFilter}; +use crate::utils::{ + ChainFilter, EthV1Filter, NetworkTxFilter, OptionalConsensusVersionHeaderFilter, + ResponseFilter, TaskSpawnerFilter, +}; use crate::version::{ ResponseIncludesVersion, V1, V2, add_consensus_version_header, beacon_response, unsupported_version_rejection, }; use crate::{sync_committees, utils}; use beacon_chain::observed_operations::ObservationOutcome; +use beacon_chain::payload_attestation_verification::Error as PayloadAttestationError; use beacon_chain::{BeaconChain, BeaconChainTypes}; +use bytes::Bytes; use eth2::types::{AttestationPoolQuery, EndpointVersion, Failure, GenericResponse}; use lighthouse_network::PubsubMessage; use network::NetworkMessage; use operation_pool::ReceivedPreCapella; use slot_clock::SlotClock; +use ssz::{Decode, Encode}; use std::collections::HashSet; use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, warn}; use types::{ - Attestation, AttestationData, AttesterSlashing, ForkName, ProposerSlashing, - SignedBlsToExecutionChange, SignedVoluntaryExit, SingleAttestation, SyncCommitteeMessage, + Attestation, AttestationData, AttesterSlashing, ForkName, PayloadAttestationMessage, + ProposerSlashing, SignedBlsToExecutionChange, SignedVoluntaryExit, SingleAttestation, + SyncCommitteeMessage, }; use warp::filters::BoxedFilter; use warp::{Filter, Reply}; @@ -520,3 +527,137 @@ pub fn post_beacon_pool_attestations_v2( ) .boxed() } + +/// POST beacon/pool/payload_attestations (JSON) +pub fn post_beacon_pool_payload_attestations( + network_tx_filter: &NetworkTxFilter, + optional_consensus_version_header_filter: OptionalConsensusVersionHeaderFilter, + beacon_pool_path: &BeaconPoolPathFilter, +) -> ResponseFilter { + beacon_pool_path + .clone() + .and(warp::path("payload_attestations")) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(optional_consensus_version_header_filter) + .and(network_tx_filter.clone()) + .then( + |task_spawner: TaskSpawner, + chain: Arc>, + messages: Vec, + _fork_name: Option, + network_tx: UnboundedSender>| { + task_spawner.blocking_json_task(Priority::P0, move || { + publish_payload_attestation_messages(&chain, &network_tx, messages) + }) + }, + ) + .boxed() +} + +/// POST beacon/pool/payload_attestations (SSZ) +pub fn post_beacon_pool_payload_attestations_ssz( + eth_v1: EthV1Filter, + task_spawner_filter: TaskSpawnerFilter, + chain_filter: ChainFilter, + network_tx_filter: NetworkTxFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("beacon")) + .and(warp::path("pool")) + .and(warp::path("payload_attestations")) + .and(warp::path::end()) + .and(warp::body::bytes()) + .and(task_spawner_filter) + .and(chain_filter) + .and(network_tx_filter) + .then( + |body_bytes: Bytes, + task_spawner: TaskSpawner, + chain: Arc>, + network_tx: UnboundedSender>| { + task_spawner.blocking_json_task(Priority::P0, move || { + let item_len = ::ssz_fixed_len(); + if !body_bytes.len().is_multiple_of(item_len) { + return Err(warp_utils::reject::custom_bad_request(format!( + "SSZ body length {} is not a multiple of PayloadAttestationMessage size {}", + body_bytes.len(), + item_len, + ))); + } + let messages: Vec = body_bytes + .chunks(item_len) + .map(|chunk| { + PayloadAttestationMessage::from_ssz_bytes(chunk).map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "invalid SSZ: {e:?}" + )) + }) + }) + .collect::>()?; + + publish_payload_attestation_messages(&chain, &network_tx, messages) + }) + }, + ) + .boxed() +} + +fn publish_payload_attestation_messages( + chain: &BeaconChain, + network_tx: &UnboundedSender>, + messages: Vec, +) -> Result<(), warp::Rejection> { + let mut failures = vec![]; + let mut num_already_known = 0; + + for (index, message) in messages.into_iter().enumerate() { + match chain.verify_payload_attestation_message_for_gossip(message.clone()) { + Ok(verified) => { + utils::publish_pubsub_message( + network_tx, + PubsubMessage::PayloadAttestation(Box::new(message)), + )?; + + if let Err(e) = chain.apply_payload_attestation_to_fork_choice( + verified.indexed_payload_attestation(), + verified.ptc(), + ) { + warn!( + error = ?e, + request_index = index, + "Payload attestation invalid for fork choice" + ); + } + } + Err(PayloadAttestationError::PriorPayloadAttestationMessageKnown { .. }) => { + num_already_known += 1; + } + // TODO(gloas): requeue for reprocessing like attestations do. + Err(e) => { + error!( + error = ?e, + request_index = index, + "Failure verifying payload attestation for gossip" + ); + failures.push(Failure::new(index, format!("{e:?}"))); + } + } + } + + if num_already_known > 0 { + debug!( + count = num_already_known, + "Some payload attestations already known" + ); + } + + if failures.is_empty() { + Ok(()) + } else { + Err(warp_utils::reject::indexed_bad_request( + "error processing payload attestations".to_string(), + failures, + )) + } +} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index bd80dd1e82..b2d069f384 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1454,7 +1454,7 @@ pub fn serve( let post_beacon_pool_attestations_v2 = post_beacon_pool_attestations_v2( &network_tx_filter, - optional_consensus_version_header_filter, + optional_consensus_version_header_filter.clone(), &beacon_pool_path_v2, ); @@ -1487,6 +1487,21 @@ pub fn serve( let post_beacon_pool_sync_committees = post_beacon_pool_sync_committees(&network_tx_filter, &beacon_pool_path); + // POST beacon/pool/payload_attestations + let post_beacon_pool_payload_attestations = post_beacon_pool_payload_attestations( + &network_tx_filter, + optional_consensus_version_header_filter, + &beacon_pool_path, + ); + + // POST beacon/pool/payload_attestations (SSZ) + let post_beacon_pool_payload_attestations_ssz = post_beacon_pool_payload_attestations_ssz( + eth_v1.clone(), + task_spawner_filter.clone(), + chain_filter.clone(), + network_tx_filter.clone(), + ); + // GET beacon/pool/bls_to_execution_changes let get_beacon_pool_bls_to_execution_changes = get_beacon_pool_bls_to_execution_changes(&beacon_pool_path); @@ -3400,7 +3415,8 @@ pub fn serve( .uor(post_beacon_blocks_v2_ssz) .uor(post_beacon_blinded_blocks_ssz) .uor(post_beacon_blinded_blocks_v2_ssz) - .uor(post_beacon_execution_payload_envelope_ssz), + .uor(post_beacon_execution_payload_envelope_ssz) + .uor(post_beacon_pool_payload_attestations_ssz), ) .uor(post_beacon_blocks) .uor(post_beacon_blinded_blocks) @@ -3411,6 +3427,7 @@ pub fn serve( .uor(post_beacon_pool_proposer_slashings) .uor(post_beacon_pool_voluntary_exits) .uor(post_beacon_pool_sync_committees) + .uor(post_beacon_pool_payload_attestations) .uor(post_beacon_pool_bls_to_execution_changes) .uor(post_beacon_execution_payload_envelope) .uor(post_beacon_state_validators) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index aac3384fbd..b8326f4495 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -2793,6 +2793,89 @@ impl ApiTester { self } + fn make_valid_payload_attestation_message( + &self, + ptc_offset: usize, + ) -> PayloadAttestationMessage { + let head = self.chain.head_snapshot(); + let head_slot = head.beacon_block.slot(); + let head_root = head.beacon_block_root; + let fork = head.beacon_state.fork(); + let genesis_validators_root = self.chain.genesis_validators_root; + + let ptc = head + .beacon_state + .get_ptc(head_slot, &self.chain.spec) + .expect("should get PTC"); + + // Find distinct validator indices in the PTC (may contain duplicates due to + // weighted sampling with a small validator set). + let mut seen = std::collections::HashSet::new(); + let distinct_indices: Vec = ptc + .0 + .iter() + .copied() + .filter(|idx| seen.insert(*idx)) + .collect(); + let validator_index = distinct_indices[ptc_offset % distinct_indices.len()]; + + let data = PayloadAttestationData { + beacon_block_root: head_root, + slot: head_slot, + payload_present: true, + blob_data_available: true, + }; + + let epoch = head_slot.epoch(E::slots_per_epoch()); + let domain = + self.chain + .spec + .get_domain(epoch, Domain::PTCAttester, &fork, genesis_validators_root); + let signing_root = data.signing_root(domain); + let sk = &self.validator_keypairs()[validator_index].sk; + let signature = sk.sign(signing_root); + + PayloadAttestationMessage { + validator_index: validator_index as u64, + data, + signature, + } + } + + pub async fn test_post_beacon_pool_payload_attestations_valid(mut self) -> Self { + let message = self.make_valid_payload_attestation_message(0); + let fork_name = self.chain.spec.fork_name_at_slot::(message.data.slot); + + self.client + .post_beacon_pool_payload_attestations(&[message], fork_name) + .await + .unwrap(); + + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "valid payload attestation should be sent to network" + ); + + self + } + + pub async fn test_post_beacon_pool_payload_attestations_valid_ssz(mut self) -> Self { + let message = self.make_valid_payload_attestation_message(1); + let fork_name = self.chain.spec.fork_name_at_slot::(message.data.slot); + + self.client + .post_beacon_pool_payload_attestations_ssz(&[message], fork_name) + .await + .unwrap(); + + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "valid payload attestation (SSZ) should be sent to network" + ); + + self + } + pub async fn test_get_config_fork_schedule(self) -> Self { let result = self.client.get_config_fork_schedule().await.unwrap().data; @@ -8246,6 +8329,19 @@ async fn get_validator_payload_attestation_data_pre_gloas() { .await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn post_beacon_pool_payload_attestations_valid() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + ApiTester::new() + .await + .test_post_beacon_pool_payload_attestations_valid() + .await + .test_post_beacon_pool_payload_attestations_valid_ssz() + .await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_aggregate_attestation_v1() { ApiTester::new() diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 4ec75468a2..e866547b9f 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -46,7 +46,7 @@ use ssz::{Decode, Encode}; use std::fmt; use std::future::Future; use std::time::Duration; -use types::PayloadAttestationData; +use types::{PayloadAttestationData, PayloadAttestationMessage}; pub const V1: EndpointVersion = EndpointVersion(1); pub const V2: EndpointVersion = EndpointVersion(2); @@ -1789,6 +1789,48 @@ impl BeaconNodeHttpClient { Ok(()) } + /// `POST beacon/pool/payload_attestations` (JSON) + pub async fn post_beacon_pool_payload_attestations( + &self, + messages: &[PayloadAttestationMessage], + fork_name: ForkName, + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("payload_attestations"); + + self.post_generic_with_consensus_version(path, &messages, None, fork_name) + .await?; + + Ok(()) + } + + /// `POST beacon/pool/payload_attestations` (SSZ) + pub async fn post_beacon_pool_payload_attestations_ssz( + &self, + messages: &[PayloadAttestationMessage], + fork_name: ForkName, + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("payload_attestations"); + + let ssz_body: Vec = messages.iter().flat_map(|m| m.as_ssz_bytes()).collect(); + + self.post_generic_with_consensus_version_and_ssz_body(path, ssz_body, None, fork_name) + .await?; + + Ok(()) + } + /// `POST beacon/pool/bls_to_execution_changes` pub async fn post_beacon_pool_bls_to_execution_changes( &self, diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index c5bcd88eb1..1b32777678 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -21,11 +21,12 @@ use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; use types::{ AbstractExecPayload, Address, AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof, Domain, Epoch, EthSpec, ExecutionPayloadEnvelope, Fork, - FullPayload, Graffiti, Hash256, SelectionProof, SignedAggregateAndProof, SignedBeaconBlock, - SignedContributionAndProof, SignedExecutionPayloadEnvelope, SignedRoot, - SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncAggregatorSelectionData, - SyncCommitteeContribution, SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, - ValidatorRegistrationData, VoluntaryExit, graffiti::GraffitiString, + FullPayload, Graffiti, Hash256, PayloadAttestationData, PayloadAttestationMessage, + SelectionProof, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, + SignedExecutionPayloadEnvelope, SignedRoot, SignedValidatorRegistrationData, + SignedVoluntaryExit, Slot, SyncAggregatorSelectionData, SyncCommitteeContribution, + SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, + VoluntaryExit, graffiti::GraffitiString, }; use validator_store::{ AggregateToSign, AttestationToSign, ContributionToSign, DoppelgangerStatus, @@ -1423,6 +1424,37 @@ impl ValidatorStore for LighthouseValidatorS }) } + async fn sign_payload_attestation( + &self, + validator_pubkey: PublicKeyBytes, + data: PayloadAttestationData, + ) -> Result { + let signing_context = + self.signing_context(Domain::PTCAttester, data.slot.epoch(E::slots_per_epoch())); + + let validator_index = self + .validator_index(&validator_pubkey) + .ok_or(ValidatorStoreError::UnknownPubkey(validator_pubkey))?; + + let signing_method = self.doppelganger_bypassed_signing_method(validator_pubkey)?; + + let signature = signing_method + .get_signature::>( + SignableMessage::PayloadAttestationData(&data), + signing_context, + &self.spec, + &self.task_executor, + ) + .await + .map_err(Error::SpecificError)?; + + Ok(PayloadAttestationMessage { + validator_index, + data, + signature, + }) + } + /// Sign an `ExecutionPayloadEnvelope` for Gloas (local building). /// The proposer acts as the builder and signs with the BeaconBuilder domain. async fn sign_execution_payload_envelope( diff --git a/validator_client/signing_method/src/lib.rs b/validator_client/signing_method/src/lib.rs index c132d86c17..2f80fa5761 100644 --- a/validator_client/signing_method/src/lib.rs +++ b/validator_client/signing_method/src/lib.rs @@ -50,6 +50,7 @@ pub enum SignableMessage<'a, E: EthSpec, Payload: AbstractExecPayload = FullP ValidatorRegistration(&'a ValidatorRegistrationData), VoluntaryExit(&'a VoluntaryExit), ExecutionPayloadEnvelope(&'a ExecutionPayloadEnvelope), + PayloadAttestationData(&'a PayloadAttestationData), } impl> SignableMessage<'_, E, Payload> { @@ -72,6 +73,7 @@ impl> SignableMessage<'_, E, Payload SignableMessage::ValidatorRegistration(v) => v.signing_root(domain), SignableMessage::VoluntaryExit(exit) => exit.signing_root(domain), SignableMessage::ExecutionPayloadEnvelope(e) => e.signing_root(domain), + SignableMessage::PayloadAttestationData(d) => d.signing_root(domain), } } } @@ -238,6 +240,9 @@ impl SigningMethod { SignableMessage::ExecutionPayloadEnvelope(e) => { Web3SignerObject::ExecutionPayloadEnvelope(e) } + SignableMessage::PayloadAttestationData(d) => { + Web3SignerObject::PayloadAttestationData(d) + } }; // Determine the Web3Signer message type. diff --git a/validator_client/signing_method/src/web3signer.rs b/validator_client/signing_method/src/web3signer.rs index e6fc8f3ba2..c2b7e06f92 100644 --- a/validator_client/signing_method/src/web3signer.rs +++ b/validator_client/signing_method/src/web3signer.rs @@ -21,6 +21,7 @@ pub enum MessageType { ValidatorRegistration, // TODO(gloas) verify w/ web3signer specs ExecutionPayloadEnvelope, + PayloadAttestation, } #[derive(Debug, PartialEq, Copy, Clone, Serialize)] @@ -78,6 +79,7 @@ pub enum Web3SignerObject<'a, E: EthSpec, Payload: AbstractExecPayload> { ContributionAndProof(&'a ContributionAndProof), ValidatorRegistration(&'a ValidatorRegistrationData), ExecutionPayloadEnvelope(&'a ExecutionPayloadEnvelope), + PayloadAttestationData(&'a PayloadAttestationData), } impl<'a, E: EthSpec, Payload: AbstractExecPayload> Web3SignerObject<'a, E, Payload> { @@ -144,6 +146,7 @@ impl<'a, E: EthSpec, Payload: AbstractExecPayload> Web3SignerObject<'a, E, Pa } Web3SignerObject::ValidatorRegistration(_) => MessageType::ValidatorRegistration, Web3SignerObject::ExecutionPayloadEnvelope(_) => MessageType::ExecutionPayloadEnvelope, + Web3SignerObject::PayloadAttestationData(_) => MessageType::PayloadAttestation, } } } diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index e26d5c3d30..b412db45f6 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -45,6 +45,7 @@ use validator_services::{ block_service::{BlockService, BlockServiceBuilder}, duties_service::{self, DutiesService, DutiesServiceBuilder}, latency_service, + payload_attestation_service::PayloadAttestationService, preparation_service::{PreparationService, PreparationServiceBuilder}, sync_committee_service::SyncCommitteeService, }; @@ -83,6 +84,7 @@ pub struct ProductionValidatorClient { block_service: BlockService, SystemTimeSlotClock>, attestation_service: AttestationService, SystemTimeSlotClock>, sync_committee_service: SyncCommitteeService, SystemTimeSlotClock>, + payload_attestation_service: PayloadAttestationService, SystemTimeSlotClock>, doppelganger_service: Option>, preparation_service: PreparationService, SystemTimeSlotClock>, validator_store: Arc>, @@ -552,12 +554,22 @@ impl ProductionValidatorClient { context.executor.clone(), ); + let payload_attestation_service = PayloadAttestationService::new( + duties_service.clone(), + validator_store.clone(), + slot_clock.clone(), + beacon_nodes.clone(), + context.executor.clone(), + context.eth2_config.spec.clone(), + ); + Ok(Self { context, duties_service, block_service, attestation_service, sync_committee_service, + payload_attestation_service, doppelganger_service, preparation_service, validator_store, @@ -629,6 +641,13 @@ impl ProductionValidatorClient { .start_update_service(&self.context.eth2_config.spec) .map_err(|e| format!("Unable to start sync committee service: {}", e))?; + if self.context.eth2_config.spec.is_gloas_scheduled() { + self.payload_attestation_service + .clone() + .start_update_service() + .map_err(|e| format!("Unable to start payload attestation service: {}", e))?; + } + self.preparation_service .clone() .start_update_service(&self.context.eth2_config.spec) diff --git a/validator_client/validator_services/src/lib.rs b/validator_client/validator_services/src/lib.rs index 3b8bd9ae14..0169335a7f 100644 --- a/validator_client/validator_services/src/lib.rs +++ b/validator_client/validator_services/src/lib.rs @@ -3,6 +3,7 @@ pub mod block_service; pub mod duties_service; pub mod latency_service; pub mod notifier_service; +pub mod payload_attestation_service; pub mod preparation_service; pub mod sync; pub mod sync_committee_service; diff --git a/validator_client/validator_services/src/payload_attestation_service.rs b/validator_client/validator_services/src/payload_attestation_service.rs new file mode 100644 index 0000000000..2f3ca8bed2 --- /dev/null +++ b/validator_client/validator_services/src/payload_attestation_service.rs @@ -0,0 +1,238 @@ +use crate::duties_service::DutiesService; +use beacon_node_fallback::BeaconNodeFallback; +use logging::crit; +use slot_clock::SlotClock; +use std::ops::Deref; +use std::sync::Arc; +use task_executor::TaskExecutor; +use tokio::time::sleep; +use tracing::{debug, error, info}; +use types::{ChainSpec, EthSpec}; +use validator_store::ValidatorStore; + +pub struct Inner { + duties_service: Arc>, + validator_store: Arc, + slot_clock: T, + beacon_nodes: Arc>, + executor: TaskExecutor, + chain_spec: Arc, +} + +pub struct PayloadAttestationService { + inner: Arc>, +} + +impl Clone for PayloadAttestationService { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Deref for PayloadAttestationService { + type Target = Inner; + + fn deref(&self) -> &Self::Target { + self.inner.deref() + } +} + +impl PayloadAttestationService { + pub fn new( + duties_service: Arc>, + validator_store: Arc, + slot_clock: T, + beacon_nodes: Arc>, + executor: TaskExecutor, + chain_spec: Arc, + ) -> Self { + Self { + inner: Arc::new(Inner { + duties_service, + validator_store, + slot_clock, + beacon_nodes, + executor, + chain_spec, + }), + } + } + + pub fn start_update_service(self) -> Result<(), String> { + let slot_duration = self.chain_spec.get_slot_duration(); + let payload_attestation_due = self.chain_spec.get_payload_attestation_due(); + + info!( + payload_attestation_due_ms = payload_attestation_due.as_millis(), + "Payload attestation service started" + ); + + let executor = self.executor.clone(); + + let interval_fut = async move { + loop { + let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() else { + error!("Failed to read slot clock"); + sleep(slot_duration).await; + continue; + }; + + let Some(current_slot) = self.slot_clock.now() else { + error!("Failed to read slot clock after trigger"); + continue; + }; + + if !self + .chain_spec + .fork_name_at_slot::(current_slot) + .gloas_enabled() + { + let duration_to_next_epoch = self + .slot_clock + .duration_to_next_epoch(S::E::slots_per_epoch()) + .unwrap_or_else(|| { + self.chain_spec.get_slot_duration() * S::E::slots_per_epoch() as u32 + }); + sleep(duration_to_next_epoch).await; + continue; + } + + sleep(duration_to_next_slot + payload_attestation_due).await; + + let service = self.clone(); + self.executor.spawn( + async move { + service.produce_and_publish(current_slot).await; + }, + "payload_attestation_producer", + ); + } + }; + + executor.spawn(interval_fut, "payload_attestation_service"); + Ok(()) + } + + async fn produce_and_publish(&self, slot: types::Slot) { + let duties = self.duties_service.get_ptc_duties_for_slot(slot); + + if duties.is_empty() { + return; + } + + debug!( + %slot, + duty_count = duties.len(), + "Producing payload attestations" + ); + + let attestation_data = match self + .beacon_nodes + .first_success(|beacon_node| async move { + beacon_node + .get_validator_payload_attestation_data(slot) + .await + .map_err(|e| format!("Failed to get payload attestation data: {e:?}")) + .map(|resp| resp.into_data()) + }) + .await + { + Ok(data) => data, + Err(e) => { + crit!( + error = %e, + %slot, + "Failed to produce payload attestation data" + ); + return; + } + }; + + debug!( + %slot, + beacon_block_root = ?attestation_data.beacon_block_root, + payload_present = attestation_data.payload_present, + "Received payload attestation data" + ); + + let mut messages = Vec::with_capacity(duties.len()); + + for duty in &duties { + match self + .validator_store + .sign_payload_attestation(duty.pubkey, attestation_data.clone()) + .await + { + Ok(message) => { + messages.push(message); + } + Err(e) => { + crit!( + error = ?e, + validator = ?duty.pubkey, + %slot, + "Failed to sign payload attestation" + ); + } + } + } + + if messages.is_empty() { + return; + } + + let count = messages.len(); + let fork_name = self.chain_spec.fork_name_at_slot::(slot); + let result = self + .beacon_nodes + .first_success(|beacon_node| { + let messages = messages.clone(); + async move { + beacon_node + .post_beacon_pool_payload_attestations_ssz(&messages, fork_name) + .await + .map_err(|e| format!("Failed to publish payload attestations (SSZ): {e:?}")) + } + }) + .await; + + let result = match result { + Ok(()) => Ok(()), + Err(_) => { + debug!(%slot, "SSZ publish failed, falling back to JSON"); + self.beacon_nodes + .first_success(|beacon_node| { + let messages = messages.clone(); + async move { + beacon_node + .post_beacon_pool_payload_attestations(&messages, fork_name) + .await + .map_err(|e| { + format!("Failed to publish payload attestations (JSON): {e:?}") + }) + } + }) + .await + } + }; + + match result { + Ok(()) => { + info!( + %slot, + %count, + "Successfully published payload attestations" + ); + } + Err(e) => { + crit!( + error = %e, + %slot, + "Failed to publish payload attestations" + ); + } + } + } +} diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index da0b33de18..4e5b415a41 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -7,10 +7,11 @@ use std::future::Future; use std::sync::Arc; use types::{ Address, Attestation, AttestationError, BlindedBeaconBlock, Epoch, EthSpec, - ExecutionPayloadEnvelope, Graffiti, Hash256, SelectionProof, SignedAggregateAndProof, - SignedBlindedBeaconBlock, SignedContributionAndProof, SignedExecutionPayloadEnvelope, - SignedValidatorRegistrationData, Slot, SyncCommitteeContribution, SyncCommitteeMessage, - SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, + ExecutionPayloadEnvelope, Graffiti, Hash256, PayloadAttestationData, PayloadAttestationMessage, + SelectionProof, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedContributionAndProof, + SignedExecutionPayloadEnvelope, SignedValidatorRegistrationData, Slot, + SyncCommitteeContribution, SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, + ValidatorRegistrationData, }; #[derive(Debug, PartialEq, Clone)] @@ -205,6 +206,13 @@ pub trait ValidatorStore: Send + Sync { envelope: ExecutionPayloadEnvelope, ) -> impl Future, Error>> + Send; + /// Sign a `PayloadAttestationData` for the PTC. + fn sign_payload_attestation( + &self, + validator_pubkey: PublicKeyBytes, + data: PayloadAttestationData, + ) -> impl Future>> + Send; + /// Returns `ProposalData` for the provided `pubkey` if it exists in `InitializedValidators`. /// `ProposalData` fields include defaulting logic described in `get_fee_recipient_defaulting`, /// `get_gas_limit_defaulting`, and `get_builder_proposals_defaulting`.