Add payload attestation validator duty

This commit is contained in:
Eitan Seri-Levi
2026-04-27 08:48:06 +02:00
parent fae7941b2d
commit 0a1bdf840b
10 changed files with 396 additions and 13 deletions

View File

@@ -3,6 +3,7 @@ pub mod block_service;
pub mod duties_service;
pub mod latency_service;
pub mod notifier_service;
pub mod payload_attestation_service;
pub mod preparation_service;
pub mod sync;
pub mod sync_committee_service;

View File

@@ -0,0 +1,262 @@
use crate::duties_service::DutiesService;
use beacon_node_fallback::BeaconNodeFallback;
use logging::crit;
use slot_clock::SlotClock;
use std::ops::Deref;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::time::{Duration, sleep};
use tracing::{debug, error, info, warn};
use types::{ChainSpec, EthSpec};
use validator_store::ValidatorStore;
pub struct PayloadAttestationServiceBuilder<S: ValidatorStore, T: SlotClock + 'static> {
duties_service: Option<Arc<DutiesService<S, T>>>,
validator_store: Option<Arc<S>>,
slot_clock: Option<T>,
beacon_nodes: Option<Arc<BeaconNodeFallback<T>>>,
executor: Option<TaskExecutor>,
chain_spec: Option<Arc<ChainSpec>>,
}
impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PayloadAttestationServiceBuilder<S, T> {
pub fn new() -> Self {
Self {
duties_service: None,
validator_store: None,
slot_clock: None,
beacon_nodes: None,
executor: None,
chain_spec: None,
}
}
pub fn duties_service(mut self, service: Arc<DutiesService<S, T>>) -> Self {
self.duties_service = Some(service);
self
}
pub fn validator_store(mut self, store: Arc<S>) -> Self {
self.validator_store = Some(store);
self
}
pub fn slot_clock(mut self, slot_clock: T) -> Self {
self.slot_clock = Some(slot_clock);
self
}
pub fn beacon_nodes(mut self, beacon_nodes: Arc<BeaconNodeFallback<T>>) -> Self {
self.beacon_nodes = Some(beacon_nodes);
self
}
pub fn executor(mut self, executor: TaskExecutor) -> Self {
self.executor = Some(executor);
self
}
pub fn chain_spec(mut self, chain_spec: Arc<ChainSpec>) -> Self {
self.chain_spec = Some(chain_spec);
self
}
pub fn build(self) -> Result<PayloadAttestationService<S, T>, String> {
Ok(PayloadAttestationService {
inner: Arc::new(Inner {
duties_service: self
.duties_service
.ok_or("Cannot build PayloadAttestationService without duties_service")?,
validator_store: self
.validator_store
.ok_or("Cannot build PayloadAttestationService without validator_store")?,
slot_clock: self
.slot_clock
.ok_or("Cannot build PayloadAttestationService without slot_clock")?,
beacon_nodes: self
.beacon_nodes
.ok_or("Cannot build PayloadAttestationService without beacon_nodes")?,
executor: self
.executor
.ok_or("Cannot build PayloadAttestationService without executor")?,
chain_spec: self
.chain_spec
.ok_or("Cannot build PayloadAttestationService without chain_spec")?,
}),
})
}
}
pub struct Inner<S, T> {
duties_service: Arc<DutiesService<S, T>>,
validator_store: Arc<S>,
slot_clock: T,
beacon_nodes: Arc<BeaconNodeFallback<T>>,
executor: TaskExecutor,
chain_spec: Arc<ChainSpec>,
}
pub struct PayloadAttestationService<S, T> {
inner: Arc<Inner<S, T>>,
}
impl<S, T> Clone for PayloadAttestationService<S, T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<S, T> Deref for PayloadAttestationService<S, T> {
type Target = Inner<S, T>;
fn deref(&self) -> &Self::Target {
self.inner.deref()
}
}
impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PayloadAttestationService<S, T> {
pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
let slot_duration = spec.get_slot_duration();
let payload_attestation_due = spec.get_payload_attestation_due();
info!(
payload_attestation_due_ms = payload_attestation_due.as_millis(),
"Payload attestation service started"
);
let executor = self.executor.clone();
let interval_fut = async move {
loop {
let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() else {
error!("Failed to read slot clock");
sleep(slot_duration).await;
continue;
};
sleep(duration_to_next_slot + payload_attestation_due).await;
let Some(current_slot) = self.slot_clock.now() else {
error!("Failed to read slot clock after trigger");
continue;
};
let duties = self.duties_service.get_ptc_duties_for_slot(current_slot);
if duties.is_empty() {
continue;
}
debug!(
%current_slot,
duty_count = duties.len(),
"Producing payload attestations"
);
let service = self.clone();
self.executor.spawn(
async move {
service.produce_and_publish(current_slot).await;
},
"payload_attestation_producer",
);
}
};
executor.spawn(interval_fut, "payload_attestation_service");
Ok(())
}
async fn produce_and_publish(&self, slot: types::Slot) {
let duties = self.duties_service.get_ptc_duties_for_slot(slot);
if duties.is_empty() {
return;
}
let attestation_data = match self
.beacon_nodes
.first_success(|beacon_node| async move {
beacon_node
.get_validator_payload_attestation_data(slot)
.await
.map_err(|e| format!("Failed to get payload attestation data: {e:?}"))
.map(|resp| resp.into_data())
})
.await
{
Ok(data) => data,
Err(e) => {
crit!(
error = %e,
%slot,
"Failed to produce payload attestation data"
);
return;
}
};
debug!(
%slot,
beacon_block_root = ?attestation_data.beacon_block_root,
payload_present = attestation_data.payload_present,
"Received payload attestation data"
);
let mut messages = Vec::with_capacity(duties.len());
for duty in &duties {
match self
.validator_store
.sign_payload_attestation(duty.pubkey, attestation_data.clone())
.await
{
Ok(message) => {
messages.push(message);
}
Err(e) => {
crit!(
error = ?e,
validator = ?duty.pubkey,
%slot,
"Failed to sign payload attestation"
);
}
}
}
if messages.is_empty() {
return;
}
let count = messages.len();
match self
.beacon_nodes
.first_success(|beacon_node| {
let messages = messages.clone();
async move {
beacon_node
.post_beacon_pool_payload_attestations(&messages)
.await
.map_err(|e| format!("Failed to publish payload attestations: {e:?}"))
}
})
.await
{
Ok(()) => {
info!(
%slot,
%count,
"Successfully published payload attestations"
);
}
Err(e) => {
crit!(
error = %e,
%slot,
"Failed to publish payload attestations"
);
}
}
}
}