diff --git a/beacon_node/http_api/src/beacon/pool.rs b/beacon_node/http_api/src/beacon/pool.rs index 8c39814d35..56cc7c493c 100644 --- a/beacon_node/http_api/src/beacon/pool.rs +++ b/beacon_node/http_api/src/beacon/pool.rs @@ -1,5 +1,8 @@ use crate::task_spawner::{Priority, TaskSpawner}; -use crate::utils::{NetworkTxFilter, OptionalConsensusVersionHeaderFilter, ResponseFilter}; +use crate::utils::{ + ChainFilter, EthV1Filter, NetworkTxFilter, OptionalConsensusVersionHeaderFilter, + ResponseFilter, TaskSpawnerFilter, +}; use crate::version::{ ResponseIncludesVersion, V1, V2, add_consensus_version_header, beacon_response, unsupported_version_rejection, @@ -7,11 +10,13 @@ use crate::version::{ use crate::{sync_committees, utils}; use beacon_chain::observed_operations::ObservationOutcome; use beacon_chain::{BeaconChain, BeaconChainTypes}; +use bytes::Bytes; use eth2::types::{AttestationPoolQuery, EndpointVersion, Failure, GenericResponse}; use lighthouse_network::PubsubMessage; use network::NetworkMessage; use operation_pool::ReceivedPreCapella; use slot_clock::SlotClock; +use ssz::{Decode, Encode}; use std::collections::HashSet; use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; @@ -522,9 +527,10 @@ pub fn post_beacon_pool_attestations_v2( .boxed() } -/// POST beacon/pool/payload_attestations +/// POST beacon/pool/payload_attestations (JSON) pub fn post_beacon_pool_payload_attestations( network_tx_filter: &NetworkTxFilter, + optional_consensus_version_header_filter: OptionalConsensusVersionHeaderFilter, beacon_pool_path: &BeaconPoolPathFilter, ) -> ResponseFilter { beacon_pool_path @@ -532,23 +538,80 @@ pub fn post_beacon_pool_payload_attestations( .and(warp::path("payload_attestations")) .and(warp::path::end()) .and(warp_utils::json::json()) + .and(optional_consensus_version_header_filter) .and(network_tx_filter.clone()) .then( |task_spawner: TaskSpawner, _chain: Arc>, messages: Vec, + _fork_name: Option, network_tx: UnboundedSender>| { task_spawner.blocking_json_task(Priority::P0, move || { - // TODO(gloas): add proper verification once payload_attestation_verification is implemented - for message in messages { - utils::publish_pubsub_message( - &network_tx, - PubsubMessage::PayloadAttestation(Box::new(message)), - )?; - } - Ok(()) + publish_payload_attestation_messages(&network_tx, messages) }) }, ) .boxed() } + +/// POST beacon/pool/payload_attestations (SSZ) +pub fn post_beacon_pool_payload_attestations_ssz( + eth_v1: EthV1Filter, + task_spawner_filter: TaskSpawnerFilter, + chain_filter: ChainFilter, + network_tx_filter: NetworkTxFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("beacon")) + .and(warp::path("pool")) + .and(warp::path("payload_attestations")) + .and(warp::path::end()) + .and(warp::body::bytes()) + .and(task_spawner_filter) + .and(chain_filter) + .and(network_tx_filter) + .then( + |body_bytes: Bytes, + task_spawner: TaskSpawner, + _chain: Arc>, + network_tx: UnboundedSender>| { + task_spawner.blocking_json_task(Priority::P0, move || { + let item_len = ::ssz_fixed_len(); + if body_bytes.len() % item_len != 0 { + return Err(warp_utils::reject::custom_bad_request(format!( + "SSZ body length {} is not a multiple of PayloadAttestationMessage size {}", + body_bytes.len(), + item_len, + ))); + } + let messages: Vec = body_bytes + .chunks(item_len) + .map(|chunk| { + PayloadAttestationMessage::from_ssz_bytes(chunk).map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "invalid SSZ: {e:?}" + )) + }) + }) + .collect::>()?; + + publish_payload_attestation_messages(&network_tx, messages) + }) + }, + ) + .boxed() +} + +fn publish_payload_attestation_messages( + network_tx: &UnboundedSender>, + messages: Vec, +) -> Result<(), warp::Rejection> { + // TODO(gloas): add proper gossip verification and store in ptc op pool. + for message in messages { + utils::publish_pubsub_message( + network_tx, + PubsubMessage::PayloadAttestation(Box::new(message)), + )?; + } + Ok(()) +} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index eafb978b38..b2d069f384 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1454,7 +1454,7 @@ pub fn serve( let post_beacon_pool_attestations_v2 = post_beacon_pool_attestations_v2( &network_tx_filter, - optional_consensus_version_header_filter, + optional_consensus_version_header_filter.clone(), &beacon_pool_path_v2, ); @@ -1488,8 +1488,19 @@ pub fn serve( post_beacon_pool_sync_committees(&network_tx_filter, &beacon_pool_path); // POST beacon/pool/payload_attestations - let post_beacon_pool_payload_attestations = - post_beacon_pool_payload_attestations(&network_tx_filter, &beacon_pool_path); + let post_beacon_pool_payload_attestations = post_beacon_pool_payload_attestations( + &network_tx_filter, + optional_consensus_version_header_filter, + &beacon_pool_path, + ); + + // POST beacon/pool/payload_attestations (SSZ) + let post_beacon_pool_payload_attestations_ssz = post_beacon_pool_payload_attestations_ssz( + eth_v1.clone(), + task_spawner_filter.clone(), + chain_filter.clone(), + network_tx_filter.clone(), + ); // GET beacon/pool/bls_to_execution_changes let get_beacon_pool_bls_to_execution_changes = @@ -3404,7 +3415,8 @@ pub fn serve( .uor(post_beacon_blocks_v2_ssz) .uor(post_beacon_blinded_blocks_ssz) .uor(post_beacon_blinded_blocks_v2_ssz) - .uor(post_beacon_execution_payload_envelope_ssz), + .uor(post_beacon_execution_payload_envelope_ssz) + .uor(post_beacon_pool_payload_attestations_ssz), ) .uor(post_beacon_blocks) .uor(post_beacon_blinded_blocks) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index aac3384fbd..1db9fc4e4f 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -2793,6 +2793,62 @@ impl ApiTester { self } + pub async fn test_post_beacon_pool_payload_attestations_valid(mut self) -> Self { + let slot = self.chain.slot().unwrap(); + let head_root = self.chain.head_beacon_block_root(); + + let message = PayloadAttestationMessage { + validator_index: 0, + data: PayloadAttestationData { + beacon_block_root: head_root, + slot, + payload_present: true, + blob_data_available: true, + }, + signature: Signature::empty(), + }; + + self.client + .post_beacon_pool_payload_attestations(&[message]) + .await + .unwrap(); + + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "valid payload attestation should be sent to network" + ); + + self + } + + pub async fn test_post_beacon_pool_payload_attestations_valid_ssz(mut self) -> Self { + let slot = self.chain.slot().unwrap(); + let head_root = self.chain.head_beacon_block_root(); + + let message = PayloadAttestationMessage { + validator_index: 0, + data: PayloadAttestationData { + beacon_block_root: head_root, + slot, + payload_present: true, + blob_data_available: true, + }, + signature: Signature::empty(), + }; + + self.client + .post_beacon_pool_payload_attestations_ssz(&[message]) + .await + .unwrap(); + + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "valid payload attestation (SSZ) should be sent to network" + ); + + self + } + pub async fn test_get_config_fork_schedule(self) -> Self { let result = self.client.get_config_fork_schedule().await.unwrap().data; @@ -8246,6 +8302,19 @@ async fn get_validator_payload_attestation_data_pre_gloas() { .await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn post_beacon_pool_payload_attestations_valid() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + ApiTester::new() + .await + .test_post_beacon_pool_payload_attestations_valid() + .await + .test_post_beacon_pool_payload_attestations_valid_ssz() + .await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_aggregate_attestation_v1() { ApiTester::new() diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 25e3a8a3f4..bc02db1ad3 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -1789,7 +1789,7 @@ impl BeaconNodeHttpClient { Ok(()) } - /// `POST beacon/pool/payload_attestations` + /// `POST beacon/pool/payload_attestations` (JSON) pub async fn post_beacon_pool_payload_attestations( &self, messages: &[PayloadAttestationMessage], @@ -1802,7 +1802,34 @@ impl BeaconNodeHttpClient { .push("pool") .push("payload_attestations"); - self.post(path, &messages).await?; + self.post_generic_with_consensus_version(path, &messages, None, ForkName::Gloas) + .await?; + + Ok(()) + } + + /// `POST beacon/pool/payload_attestations` (SSZ) + pub async fn post_beacon_pool_payload_attestations_ssz( + &self, + messages: &[PayloadAttestationMessage], + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("payload_attestations"); + + let ssz_body: Vec = messages.iter().flat_map(|m| m.as_ssz_bytes()).collect(); + + self.post_generic_with_consensus_version_and_ssz_body( + path, + ssz_body, + None, + ForkName::Gloas, + ) + .await?; Ok(()) } diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index a3ab2ccbe4..1b32777678 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -21,8 +21,8 @@ use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; use types::{ AbstractExecPayload, Address, AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof, Domain, Epoch, EthSpec, ExecutionPayloadEnvelope, Fork, - FullPayload, Graffiti, Hash256, SelectionProof, SignedAggregateAndProof, SignedBeaconBlock, - PayloadAttestationData, PayloadAttestationMessage, SignedContributionAndProof, + FullPayload, Graffiti, Hash256, PayloadAttestationData, PayloadAttestationMessage, + SelectionProof, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedExecutionPayloadEnvelope, SignedRoot, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncAggregatorSelectionData, SyncCommitteeContribution, SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, @@ -1429,10 +1429,8 @@ impl ValidatorStore for LighthouseValidatorS validator_pubkey: PublicKeyBytes, data: PayloadAttestationData, ) -> Result { - let signing_context = self.signing_context( - Domain::PTCAttester, - data.slot.epoch(E::slots_per_epoch()), - ); + let signing_context = + self.signing_context(Domain::PTCAttester, data.slot.epoch(E::slots_per_epoch())); let validator_index = self .validator_index(&validator_pubkey) @@ -1457,6 +1455,8 @@ impl ValidatorStore for LighthouseValidatorS }) } + /// Sign an `ExecutionPayloadEnvelope` for Gloas (local building). + /// The proposer acts as the builder and signs with the BeaconBuilder domain. async fn sign_execution_payload_envelope( &self, validator_pubkey: PublicKeyBytes, diff --git a/validator_client/validator_services/src/payload_attestation_service.rs b/validator_client/validator_services/src/payload_attestation_service.rs index 2ae089c762..02aff8f1fb 100644 --- a/validator_client/validator_services/src/payload_attestation_service.rs +++ b/validator_client/validator_services/src/payload_attestation_service.rs @@ -5,9 +5,9 @@ use slot_clock::SlotClock; use std::ops::Deref; use std::sync::Arc; use task_executor::TaskExecutor; -use tokio::time::{Duration, sleep}; -use tracing::{debug, error, info, warn}; -use types::{ChainSpec, EthSpec}; +use tokio::time::sleep; +use tracing::{debug, error, info}; +use types::ChainSpec; use validator_store::ValidatorStore; pub struct PayloadAttestationServiceBuilder { @@ -230,19 +230,40 @@ impl PayloadAttestationServ } let count = messages.len(); - match self + let result = self .beacon_nodes .first_success(|beacon_node| { let messages = messages.clone(); async move { beacon_node - .post_beacon_pool_payload_attestations(&messages) + .post_beacon_pool_payload_attestations_ssz(&messages) .await - .map_err(|e| format!("Failed to publish payload attestations: {e:?}")) + .map_err(|e| format!("Failed to publish payload attestations (SSZ): {e:?}")) } }) - .await - { + .await; + + let result = match result { + Ok(()) => Ok(()), + Err(_) => { + debug!(%slot, "SSZ publish failed, falling back to JSON"); + self.beacon_nodes + .first_success(|beacon_node| { + let messages = messages.clone(); + async move { + beacon_node + .post_beacon_pool_payload_attestations(&messages) + .await + .map_err(|e| { + format!("Failed to publish payload attestations (JSON): {e:?}") + }) + } + }) + .await + } + }; + + match result { Ok(()) => { info!( %slot,