mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-29 19:04:27 +00:00
Refactor payload attestation service (#9492)
Refactors the payload attestation service - Returns a `Result`. we need this for #9434 so we can keep track if we've succeeded with producing payload attestation earlier than the deadline - Separates getting payload attestation data and signing + publishing. This mimics what we do in attestation service and also is needed for #9434 to surface the error while still keeping the same spawning mechanism. In #9434 we want to broadcast payload attestations early if we've already seen an avail envelope. If the SSE event fires, but for some reason getting the payload attestation data from the BN fails, we still want to retry at the deadline. If signing + publishing fails we wont retry at the deadline (similar to the attestation service). Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu>
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
use crate::duties_service::DutiesService;
|
||||
use beacon_node_fallback::BeaconNodeFallback;
|
||||
use eth2::types::PtcDuty;
|
||||
use logging::crit;
|
||||
use slot_clock::SlotClock;
|
||||
use std::ops::Deref;
|
||||
@@ -7,7 +8,7 @@ use std::sync::Arc;
|
||||
use task_executor::TaskExecutor;
|
||||
use tokio::time::sleep;
|
||||
use tracing::{debug, error, info};
|
||||
use types::{ChainSpec, EthSpec, Slot};
|
||||
use types::{ChainSpec, EthSpec, PayloadAttestationData, Slot};
|
||||
use validator_store::ValidatorStore;
|
||||
|
||||
pub struct Inner<S, T> {
|
||||
@@ -74,7 +75,9 @@ where
|
||||
|
||||
let interval_fut = async move {
|
||||
loop {
|
||||
self.run_update().await;
|
||||
if let Err(e) = self.spawn_payload_attestation_tasks().await {
|
||||
error!(error = e, "Failed to produce payload attestations");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -82,18 +85,32 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_update(&self) {
|
||||
async fn spawn_payload_attestation_tasks(&self) -> Result<(), String> {
|
||||
let Some(attestation_slot) = self.wait_for_attestation_slot().await else {
|
||||
return;
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let Some((duties, attestation_data)) = self
|
||||
.produce_payload_attestation_data(attestation_slot)
|
||||
.await?
|
||||
else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let service = self.clone();
|
||||
self.executor.spawn(
|
||||
async move {
|
||||
service.produce_and_publish(attestation_slot).await;
|
||||
if let Err(e) = service
|
||||
.sign_and_publish(attestation_slot, duties, attestation_data)
|
||||
.await
|
||||
{
|
||||
crit!(error = e, %attestation_slot, "Failed to publish payload attestations");
|
||||
}
|
||||
},
|
||||
"payload_attestation_producer",
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn wait_for_attestation_slot(&self) -> Option<Slot> {
|
||||
@@ -136,11 +153,18 @@ where
|
||||
Some(attestation_slot)
|
||||
}
|
||||
|
||||
async fn produce_and_publish(&self, slot: types::Slot) {
|
||||
/// Produce the payload attestation data for `slot`, returned alongside the duties to sign.
|
||||
///
|
||||
/// Returns `Ok(None)` when there is nothing to publish (no duties, or no block for the slot)
|
||||
/// and `Err` when data production failed.
|
||||
async fn produce_payload_attestation_data(
|
||||
&self,
|
||||
slot: Slot,
|
||||
) -> Result<Option<(Vec<PtcDuty>, PayloadAttestationData)>, String> {
|
||||
let duties = self.duties_service.get_ptc_duties_for_slot(slot);
|
||||
|
||||
if duties.is_empty() {
|
||||
return;
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
debug!(
|
||||
@@ -167,15 +191,10 @@ where
|
||||
%slot,
|
||||
"No block received for slot, skipping payload attestation"
|
||||
);
|
||||
return;
|
||||
return Ok(None);
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
error = %e,
|
||||
%slot,
|
||||
"Failed to produce payload attestation data"
|
||||
);
|
||||
return;
|
||||
return Err(e.to_string());
|
||||
}
|
||||
};
|
||||
|
||||
@@ -186,6 +205,17 @@ where
|
||||
"Received payload attestation data"
|
||||
);
|
||||
|
||||
Ok(Some((duties, attestation_data)))
|
||||
}
|
||||
|
||||
/// Sign `attestation_data` for each duty and publish the resulting messages, preferring SSZ
|
||||
/// and falling back to JSON.
|
||||
async fn sign_and_publish(
|
||||
&self,
|
||||
slot: Slot,
|
||||
duties: Vec<PtcDuty>,
|
||||
attestation_data: PayloadAttestationData,
|
||||
) -> Result<(), String> {
|
||||
let mut messages = Vec::with_capacity(duties.len());
|
||||
|
||||
for duty in &duties {
|
||||
@@ -209,7 +239,7 @@ where
|
||||
}
|
||||
|
||||
if messages.is_empty() {
|
||||
return;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let count = messages.len();
|
||||
@@ -227,42 +257,31 @@ where
|
||||
})
|
||||
.await;
|
||||
|
||||
let result = match result {
|
||||
Ok(()) => Ok(()),
|
||||
Err(_) => {
|
||||
debug!(%slot, "SSZ publish failed, falling back to JSON");
|
||||
self.beacon_nodes
|
||||
.first_success(|beacon_node| {
|
||||
let messages = messages.clone();
|
||||
async move {
|
||||
beacon_node
|
||||
.post_beacon_pool_payload_attestations(&messages, fork_name)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
format!("Failed to publish payload attestations (JSON): {e:?}")
|
||||
})
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
};
|
||||
|
||||
match result {
|
||||
Ok(()) => {
|
||||
info!(
|
||||
%slot,
|
||||
%count,
|
||||
"Successfully published payload attestations"
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
crit!(
|
||||
error = %e,
|
||||
%slot,
|
||||
"Failed to publish payload attestations"
|
||||
);
|
||||
}
|
||||
if result.is_err() {
|
||||
debug!(%slot, "SSZ publish failed, falling back to JSON");
|
||||
self.beacon_nodes
|
||||
.first_success(|beacon_node| {
|
||||
let messages = messages.clone();
|
||||
async move {
|
||||
beacon_node
|
||||
.post_beacon_pool_payload_attestations(&messages, fork_name)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
format!("Failed to publish payload attestations (JSON): {e:?}")
|
||||
})
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
}
|
||||
|
||||
info!(
|
||||
%slot,
|
||||
%count,
|
||||
"Successfully published payload attestations"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -529,7 +548,15 @@ mod tests {
|
||||
.mock_post_beacon_pool_payload_attestations();
|
||||
|
||||
let service = harness.service;
|
||||
service.produce_and_publish(attestation_slot).await;
|
||||
let (duties, attestation_data) = service
|
||||
.produce_payload_attestation_data(attestation_slot)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
service
|
||||
.sign_and_publish(attestation_slot, duties, attestation_data)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let messages = harness
|
||||
.mock_beacon_node_1
|
||||
@@ -591,7 +618,15 @@ mod tests {
|
||||
.mock_post_beacon_pool_payload_attestations();
|
||||
|
||||
let service = harness.service;
|
||||
service.produce_and_publish(attestation_slot).await;
|
||||
let (duties, attestation_data) = service
|
||||
.produce_payload_attestation_data(attestation_slot)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
service
|
||||
.sign_and_publish(attestation_slot, duties, attestation_data)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// first_success function tries both beacon nodes for SSZ post payload attestation:
|
||||
// first pass: both fail (mock_ssz returns 500, mock_json does not support SSZ)
|
||||
@@ -625,9 +660,16 @@ mod tests {
|
||||
|
||||
let service = harness.service;
|
||||
|
||||
// when there is no duty, produce_and_publish should return early
|
||||
// when there is no duty, data production returns `None` so there is nothing to publish
|
||||
// therefore, the beacon node is not called, expected to hit 0
|
||||
service.produce_and_publish(Slot::new(1)).await;
|
||||
let data = service
|
||||
.produce_payload_attestation_data(Slot::new(1))
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(
|
||||
data.is_none(),
|
||||
"Expected no data to be produced without duties"
|
||||
);
|
||||
mock.expect(0).assert();
|
||||
|
||||
assert!(
|
||||
@@ -665,8 +707,11 @@ mod tests {
|
||||
.mock_post_beacon_pool_payload_attestations();
|
||||
|
||||
let service = harness.service;
|
||||
// The produce_and_publish() should return early before reaching the POST endpoint
|
||||
service.produce_and_publish(attestation_slot).await;
|
||||
// Data production should error before any signing/publishing happens.
|
||||
let result = service
|
||||
.produce_payload_attestation_data(attestation_slot)
|
||||
.await;
|
||||
assert!(result.is_err());
|
||||
|
||||
// Both beacon nodes should not be called at all
|
||||
mock_ssz.expect(0).assert();
|
||||
@@ -712,7 +757,15 @@ mod tests {
|
||||
.mock_post_beacon_pool_payload_attestations_ssz(Duration::from_secs(0));
|
||||
|
||||
let service = harness.service;
|
||||
service.produce_and_publish(attestation_slot).await;
|
||||
let (duties, attestation_data) = service
|
||||
.produce_payload_attestation_data(attestation_slot)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
service
|
||||
.sign_and_publish(attestation_slot, duties, attestation_data)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let messages = harness
|
||||
.mock_beacon_node_1
|
||||
|
||||
Reference in New Issue
Block a user