Merge branch 'unstable' into gloas-head-block-number

This commit is contained in:
Eitan Seri-Levi
2026-06-17 14:00:59 -07:00
committed by GitHub

View File

@@ -1,5 +1,6 @@
use crate::duties_service::DutiesService;
use beacon_node_fallback::BeaconNodeFallback;
use eth2::types::PtcDuty;
use logging::crit;
use slot_clock::SlotClock;
use std::ops::Deref;
@@ -7,7 +8,7 @@ use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::time::sleep;
use tracing::{debug, error, info};
use types::{ChainSpec, EthSpec, Slot};
use types::{ChainSpec, EthSpec, PayloadAttestationData, Slot};
use validator_store::ValidatorStore;
pub struct Inner<S, T> {
@@ -74,7 +75,9 @@ where
let interval_fut = async move {
loop {
self.run_update().await;
if let Err(e) = self.spawn_payload_attestation_tasks().await {
error!(error = e, "Failed to produce payload attestations");
}
}
};
@@ -82,18 +85,32 @@ where
Ok(())
}
async fn run_update(&self) {
async fn spawn_payload_attestation_tasks(&self) -> Result<(), String> {
let Some(attestation_slot) = self.wait_for_attestation_slot().await else {
return;
return Ok(());
};
let Some((duties, attestation_data)) = self
.produce_payload_attestation_data(attestation_slot)
.await?
else {
return Ok(());
};
let service = self.clone();
self.executor.spawn(
async move {
service.produce_and_publish(attestation_slot).await;
if let Err(e) = service
.sign_and_publish(attestation_slot, duties, attestation_data)
.await
{
crit!(error = e, %attestation_slot, "Failed to publish payload attestations");
}
},
"payload_attestation_producer",
);
Ok(())
}
async fn wait_for_attestation_slot(&self) -> Option<Slot> {
@@ -136,11 +153,18 @@ where
Some(attestation_slot)
}
async fn produce_and_publish(&self, slot: types::Slot) {
/// Produce the payload attestation data for `slot`, returned alongside the duties to sign.
///
/// Returns `Ok(None)` when there is nothing to publish (no duties, or no block for the slot)
/// and `Err` when data production failed.
async fn produce_payload_attestation_data(
&self,
slot: Slot,
) -> Result<Option<(Vec<PtcDuty>, PayloadAttestationData)>, String> {
let duties = self.duties_service.get_ptc_duties_for_slot(slot);
if duties.is_empty() {
return;
return Ok(None);
}
debug!(
@@ -167,15 +191,10 @@ where
%slot,
"No block received for slot, skipping payload attestation"
);
return;
return Ok(None);
}
Err(e) => {
error!(
error = %e,
%slot,
"Failed to produce payload attestation data"
);
return;
return Err(e.to_string());
}
};
@@ -186,6 +205,17 @@ where
"Received payload attestation data"
);
Ok(Some((duties, attestation_data)))
}
/// Sign `attestation_data` for each duty and publish the resulting messages, preferring SSZ
/// and falling back to JSON.
async fn sign_and_publish(
&self,
slot: Slot,
duties: Vec<PtcDuty>,
attestation_data: PayloadAttestationData,
) -> Result<(), String> {
let mut messages = Vec::with_capacity(duties.len());
for duty in &duties {
@@ -209,7 +239,7 @@ where
}
if messages.is_empty() {
return;
return Ok(());
}
let count = messages.len();
@@ -227,42 +257,31 @@ where
})
.await;
let result = match result {
Ok(()) => Ok(()),
Err(_) => {
debug!(%slot, "SSZ publish failed, falling back to JSON");
self.beacon_nodes
.first_success(|beacon_node| {
let messages = messages.clone();
async move {
beacon_node
.post_beacon_pool_payload_attestations(&messages, fork_name)
.await
.map_err(|e| {
format!("Failed to publish payload attestations (JSON): {e:?}")
})
}
})
.await
}
};
match result {
Ok(()) => {
info!(
%slot,
%count,
"Successfully published payload attestations"
);
}
Err(e) => {
crit!(
error = %e,
%slot,
"Failed to publish payload attestations"
);
}
if result.is_err() {
debug!(%slot, "SSZ publish failed, falling back to JSON");
self.beacon_nodes
.first_success(|beacon_node| {
let messages = messages.clone();
async move {
beacon_node
.post_beacon_pool_payload_attestations(&messages, fork_name)
.await
.map_err(|e| {
format!("Failed to publish payload attestations (JSON): {e:?}")
})
}
})
.await
.map_err(|e| e.to_string())?;
}
info!(
%slot,
%count,
"Successfully published payload attestations"
);
Ok(())
}
}
@@ -529,7 +548,15 @@ mod tests {
.mock_post_beacon_pool_payload_attestations();
let service = harness.service;
service.produce_and_publish(attestation_slot).await;
let (duties, attestation_data) = service
.produce_payload_attestation_data(attestation_slot)
.await
.unwrap()
.unwrap();
service
.sign_and_publish(attestation_slot, duties, attestation_data)
.await
.unwrap();
let messages = harness
.mock_beacon_node_1
@@ -591,7 +618,15 @@ mod tests {
.mock_post_beacon_pool_payload_attestations();
let service = harness.service;
service.produce_and_publish(attestation_slot).await;
let (duties, attestation_data) = service
.produce_payload_attestation_data(attestation_slot)
.await
.unwrap()
.unwrap();
service
.sign_and_publish(attestation_slot, duties, attestation_data)
.await
.unwrap();
// first_success function tries both beacon nodes for SSZ post payload attestation:
// first pass: both fail (mock_ssz returns 500, mock_json does not support SSZ)
@@ -625,9 +660,16 @@ mod tests {
let service = harness.service;
// when there is no duty, produce_and_publish should return early
// when there is no duty, data production returns `None` so there is nothing to publish
// therefore, the beacon node is not called, expected to hit 0
service.produce_and_publish(Slot::new(1)).await;
let data = service
.produce_payload_attestation_data(Slot::new(1))
.await
.unwrap();
assert!(
data.is_none(),
"Expected no data to be produced without duties"
);
mock.expect(0).assert();
assert!(
@@ -665,8 +707,11 @@ mod tests {
.mock_post_beacon_pool_payload_attestations();
let service = harness.service;
// The produce_and_publish() should return early before reaching the POST endpoint
service.produce_and_publish(attestation_slot).await;
// Data production should error before any signing/publishing happens.
let result = service
.produce_payload_attestation_data(attestation_slot)
.await;
assert!(result.is_err());
// Both beacon nodes should not be called at all
mock_ssz.expect(0).assert();
@@ -712,7 +757,15 @@ mod tests {
.mock_post_beacon_pool_payload_attestations_ssz(Duration::from_secs(0));
let service = harness.service;
service.produce_and_publish(attestation_slot).await;
let (duties, attestation_data) = service
.produce_payload_attestation_data(attestation_slot)
.await
.unwrap()
.unwrap();
service
.sign_and_publish(attestation_slot, duties, attestation_data)
.await
.unwrap();
let messages = harness
.mock_beacon_node_1