diff --git a/validator_client/validator_services/src/payload_attestation_service.rs b/validator_client/validator_services/src/payload_attestation_service.rs index f4cd26552a..34b9f96f7f 100644 --- a/validator_client/validator_services/src/payload_attestation_service.rs +++ b/validator_client/validator_services/src/payload_attestation_service.rs @@ -1,5 +1,6 @@ use crate::duties_service::DutiesService; use beacon_node_fallback::BeaconNodeFallback; +use eth2::types::PtcDuty; use logging::crit; use slot_clock::SlotClock; use std::ops::Deref; @@ -7,7 +8,7 @@ use std::sync::Arc; use task_executor::TaskExecutor; use tokio::time::sleep; use tracing::{debug, error, info}; -use types::{ChainSpec, EthSpec, Slot}; +use types::{ChainSpec, EthSpec, PayloadAttestationData, Slot}; use validator_store::ValidatorStore; pub struct Inner { @@ -74,7 +75,9 @@ where let interval_fut = async move { loop { - self.run_update().await; + if let Err(e) = self.spawn_payload_attestation_tasks().await { + error!(error = e, "Failed to produce payload attestations"); + } } }; @@ -82,18 +85,32 @@ where Ok(()) } - async fn run_update(&self) { + async fn spawn_payload_attestation_tasks(&self) -> Result<(), String> { let Some(attestation_slot) = self.wait_for_attestation_slot().await else { - return; + return Ok(()); + }; + + let Some((duties, attestation_data)) = self + .produce_payload_attestation_data(attestation_slot) + .await? + else { + return Ok(()); }; let service = self.clone(); self.executor.spawn( async move { - service.produce_and_publish(attestation_slot).await; + if let Err(e) = service + .sign_and_publish(attestation_slot, duties, attestation_data) + .await + { + crit!(error = e, %attestation_slot, "Failed to publish payload attestations"); + } }, "payload_attestation_producer", ); + + Ok(()) } async fn wait_for_attestation_slot(&self) -> Option { @@ -136,11 +153,18 @@ where Some(attestation_slot) } - async fn produce_and_publish(&self, slot: types::Slot) { + /// Produce the payload attestation data for `slot`, returned alongside the duties to sign. + /// + /// Returns `Ok(None)` when there is nothing to publish (no duties, or no block for the slot) + /// and `Err` when data production failed. + async fn produce_payload_attestation_data( + &self, + slot: Slot, + ) -> Result, PayloadAttestationData)>, String> { let duties = self.duties_service.get_ptc_duties_for_slot(slot); if duties.is_empty() { - return; + return Ok(None); } debug!( @@ -167,15 +191,10 @@ where %slot, "No block received for slot, skipping payload attestation" ); - return; + return Ok(None); } Err(e) => { - error!( - error = %e, - %slot, - "Failed to produce payload attestation data" - ); - return; + return Err(e.to_string()); } }; @@ -186,6 +205,17 @@ where "Received payload attestation data" ); + Ok(Some((duties, attestation_data))) + } + + /// Sign `attestation_data` for each duty and publish the resulting messages, preferring SSZ + /// and falling back to JSON. + async fn sign_and_publish( + &self, + slot: Slot, + duties: Vec, + attestation_data: PayloadAttestationData, + ) -> Result<(), String> { let mut messages = Vec::with_capacity(duties.len()); for duty in &duties { @@ -209,7 +239,7 @@ where } if messages.is_empty() { - return; + return Ok(()); } let count = messages.len(); @@ -227,42 +257,31 @@ where }) .await; - let result = match result { - Ok(()) => Ok(()), - Err(_) => { - debug!(%slot, "SSZ publish failed, falling back to JSON"); - self.beacon_nodes - .first_success(|beacon_node| { - let messages = messages.clone(); - async move { - beacon_node - .post_beacon_pool_payload_attestations(&messages, fork_name) - .await - .map_err(|e| { - format!("Failed to publish payload attestations (JSON): {e:?}") - }) - } - }) - .await - } - }; - - match result { - Ok(()) => { - info!( - %slot, - %count, - "Successfully published payload attestations" - ); - } - Err(e) => { - crit!( - error = %e, - %slot, - "Failed to publish payload attestations" - ); - } + if result.is_err() { + debug!(%slot, "SSZ publish failed, falling back to JSON"); + self.beacon_nodes + .first_success(|beacon_node| { + let messages = messages.clone(); + async move { + beacon_node + .post_beacon_pool_payload_attestations(&messages, fork_name) + .await + .map_err(|e| { + format!("Failed to publish payload attestations (JSON): {e:?}") + }) + } + }) + .await + .map_err(|e| e.to_string())?; } + + info!( + %slot, + %count, + "Successfully published payload attestations" + ); + + Ok(()) } } @@ -529,7 +548,15 @@ mod tests { .mock_post_beacon_pool_payload_attestations(); let service = harness.service; - service.produce_and_publish(attestation_slot).await; + let (duties, attestation_data) = service + .produce_payload_attestation_data(attestation_slot) + .await + .unwrap() + .unwrap(); + service + .sign_and_publish(attestation_slot, duties, attestation_data) + .await + .unwrap(); let messages = harness .mock_beacon_node_1 @@ -591,7 +618,15 @@ mod tests { .mock_post_beacon_pool_payload_attestations(); let service = harness.service; - service.produce_and_publish(attestation_slot).await; + let (duties, attestation_data) = service + .produce_payload_attestation_data(attestation_slot) + .await + .unwrap() + .unwrap(); + service + .sign_and_publish(attestation_slot, duties, attestation_data) + .await + .unwrap(); // first_success function tries both beacon nodes for SSZ post payload attestation: // first pass: both fail (mock_ssz returns 500, mock_json does not support SSZ) @@ -625,9 +660,16 @@ mod tests { let service = harness.service; - // when there is no duty, produce_and_publish should return early + // when there is no duty, data production returns `None` so there is nothing to publish // therefore, the beacon node is not called, expected to hit 0 - service.produce_and_publish(Slot::new(1)).await; + let data = service + .produce_payload_attestation_data(Slot::new(1)) + .await + .unwrap(); + assert!( + data.is_none(), + "Expected no data to be produced without duties" + ); mock.expect(0).assert(); assert!( @@ -665,8 +707,11 @@ mod tests { .mock_post_beacon_pool_payload_attestations(); let service = harness.service; - // The produce_and_publish() should return early before reaching the POST endpoint - service.produce_and_publish(attestation_slot).await; + // Data production should error before any signing/publishing happens. + let result = service + .produce_payload_attestation_data(attestation_slot) + .await; + assert!(result.is_err()); // Both beacon nodes should not be called at all mock_ssz.expect(0).assert(); @@ -712,7 +757,15 @@ mod tests { .mock_post_beacon_pool_payload_attestations_ssz(Duration::from_secs(0)); let service = harness.service; - service.produce_and_publish(attestation_slot).await; + let (duties, attestation_data) = service + .produce_payload_attestation_data(attestation_slot) + .await + .unwrap() + .unwrap(); + service + .sign_and_publish(attestation_slot, duties, attestation_data) + .await + .unwrap(); let messages = harness .mock_beacon_node_1