Fix race condition between duties service and proposer preferences

This commit is contained in:
Eitan Seri-Levi
2026-05-16 14:09:25 -07:00
parent 1a68631180
commit 3bddfff153
2 changed files with 125 additions and 80 deletions

View File

@@ -1660,54 +1660,8 @@ async fn poll_beacon_proposers<S: ValidatorStore, T: SlotClock + 'static>(
// Only download duties and push out additional block production events if we have some // Only download duties and push out additional block production events if we have some
// validators. // validators.
if !local_pubkeys.is_empty() { if !local_pubkeys.is_empty() {
let download_result = duties_service for epoch in [current_epoch, current_epoch + 1] {
.beacon_nodes fetch_and_store_proposer_duties(duties_service, epoch, &local_pubkeys).await;
.first_success(|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::DUTIES_SERVICE_TIMES,
&[validator_metrics::PROPOSER_DUTIES_HTTP_GET],
);
beacon_node
.get_validator_duties_proposer(current_epoch)
.await
})
.await;
match download_result {
Ok(response) => {
let dependent_root = response.dependent_root;
let relevant_duties = response
.data
.into_iter()
.filter(|proposer_duty| local_pubkeys.contains(&proposer_duty.pubkey))
.collect::<Vec<_>>();
debug!(
%dependent_root,
num_relevant_duties = relevant_duties.len(),
"Downloaded proposer duties"
);
if let Some((prior_dependent_root, _)) = duties_service
.proposers
.write()
.insert(current_epoch, (dependent_root, relevant_duties))
&& dependent_root != prior_dependent_root
{
warn!(
%prior_dependent_root,
%dependent_root,
msg = "this may happen from time to time",
"Proposer duties re-org"
)
}
}
// Don't return early here, we still want to try and produce blocks using the cached values.
Err(e) => error!(
err = %e,
"Failed to download proposer duties"
),
} }
// Compute the block proposers for this slot again, now that we've received an update from // Compute the block proposers for this slot again, now that we've received an update from
@@ -1750,6 +1704,62 @@ async fn poll_beacon_proposers<S: ValidatorStore, T: SlotClock + 'static>(
Ok(()) Ok(())
} }
async fn fetch_and_store_proposer_duties<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &DutiesService<S, T>,
epoch: Epoch,
local_pubkeys: &HashSet<PublicKeyBytes>,
) {
let download_result = duties_service
.beacon_nodes
.first_success(|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::DUTIES_SERVICE_TIMES,
&[validator_metrics::PROPOSER_DUTIES_HTTP_GET],
);
beacon_node.get_validator_duties_proposer(epoch).await
})
.await;
match download_result {
Ok(response) => {
let dependent_root = response.dependent_root;
let relevant_duties = response
.data
.into_iter()
.filter(|proposer_duty| local_pubkeys.contains(&proposer_duty.pubkey))
.collect::<Vec<_>>();
debug!(
%dependent_root,
%epoch,
num_relevant_duties = relevant_duties.len(),
"Downloaded proposer duties"
);
if let Some((prior_dependent_root, _)) = duties_service
.proposers
.write()
.insert(epoch, (dependent_root, relevant_duties))
&& dependent_root != prior_dependent_root
{
warn!(
%prior_dependent_root,
%dependent_root,
%epoch,
msg = "this may happen from time to time",
"Proposer duties re-org"
)
}
}
Err(e) => error!(
err = %e,
%epoch,
"Failed to download proposer duties"
),
}
}
/// Query the beacon node for ptc duties for any known validators. /// Query the beacon node for ptc duties for any known validators.
async fn poll_beacon_ptc_attesters<S: ValidatorStore + 'static, T: SlotClock + 'static>( async fn poll_beacon_ptc_attesters<S: ValidatorStore + 'static, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>, duties_service: &Arc<DutiesService<S, T>>,

View File

@@ -1,12 +1,14 @@
use crate::duties_service::DutiesService; use crate::duties_service::DutiesService;
use beacon_node_fallback::BeaconNodeFallback; use beacon_node_fallback::BeaconNodeFallback;
use eth2::types::ProposerData;
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::collections::HashMap;
use std::ops::Deref; use std::ops::Deref;
use std::sync::Arc; use std::sync::Arc;
use task_executor::TaskExecutor; use task_executor::TaskExecutor;
use tokio::time::sleep; use tokio::time::sleep;
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
use types::{ChainSpec, Epoch, EthSpec, ForkName, ProposerPreferences}; use types::{ChainSpec, Epoch, EthSpec, ForkName, Hash256, ProposerPreferences};
use validator_store::ValidatorStore; use validator_store::ValidatorStore;
pub struct Inner<S, T> { pub struct Inner<S, T> {
@@ -66,6 +68,8 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> ProposerPreferencesSer
let executor = self.executor.clone(); let executor = self.executor.clone();
let interval_fut = async move { let interval_fut = async move {
let mut published_preferences: HashMap<Epoch, Hash256> = HashMap::new();
loop { loop {
let Some(current_slot) = self.slot_clock.now() else { let Some(current_slot) = self.slot_clock.now() else {
error!("Failed to read slot clock"); error!("Failed to read slot clock");
@@ -73,29 +77,16 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> ProposerPreferencesSer
continue; continue;
}; };
if !self
.chain_spec
.fork_name_at_slot::<S::E>(current_slot)
.gloas_enabled()
{
let duration_to_next_epoch = self
.slot_clock
.duration_to_next_epoch(S::E::slots_per_epoch())
.unwrap_or_else(|| slot_duration * S::E::slots_per_epoch() as u32);
sleep(duration_to_next_epoch).await;
continue;
}
let current_epoch = current_slot.epoch(S::E::slots_per_epoch()); let current_epoch = current_slot.epoch(S::E::slots_per_epoch());
let fork_name = self.chain_spec.fork_name_at_slot::<S::E>(current_slot);
self.publish_proposer_preferences(current_epoch, fork_name) self.poll_and_publish_preferences(current_epoch, &mut published_preferences)
.await; .await;
let duration_to_next_epoch = self let duration_to_next_slot = self
.slot_clock .slot_clock
.duration_to_next_epoch(S::E::slots_per_epoch()) .duration_to_next_slot()
.unwrap_or_else(|| slot_duration * S::E::slots_per_epoch() as u32); .unwrap_or(slot_duration);
sleep(duration_to_next_epoch).await; sleep(duration_to_next_slot).await;
} }
}; };
@@ -103,15 +94,57 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> ProposerPreferencesSer
Ok(()) Ok(())
} }
async fn publish_proposer_preferences(&self, current_epoch: Epoch, fork_name: ForkName) { /// Publish proposer preferences for `current_epoch` and `current_epoch + 1`.
let (dependent_root, duties) = { /// Will only publish preferences for a given epoch once per dependent root.
let proposers = self.duties_service.proposers.read(); async fn poll_and_publish_preferences(
match proposers.get(&current_epoch) { &self,
Some((root, duties)) => (*root, duties.clone()), current_epoch: Epoch,
None => return, published_preferences: &mut HashMap<Epoch, Hash256>,
) {
for (epoch, fork_name) in [
(
current_epoch,
self.chain_spec.fork_name_at_epoch(current_epoch),
),
(
current_epoch + 1,
self.chain_spec.fork_name_at_epoch(current_epoch + 1),
),
] {
if !fork_name.gloas_enabled() {
continue;
} }
};
let (dependent_root, duties) = {
let proposers = self.duties_service.proposers.read();
match proposers.get(&epoch) {
Some((root, duties)) => (*root, duties.clone()),
None => continue,
}
};
if published_preferences.get(&epoch) == Some(&dependent_root) {
continue;
}
if self
.publish_proposer_preferences(epoch, fork_name, dependent_root, duties)
.await
{
published_preferences.insert(epoch, dependent_root);
}
}
published_preferences.retain(|epoch, _| *epoch >= current_epoch);
}
async fn publish_proposer_preferences(
&self,
epoch: Epoch,
fork_name: ForkName,
dependent_root: Hash256,
duties: Vec<ProposerData>,
) -> bool {
let preferences_to_sign: Vec<_> = { let preferences_to_sign: Vec<_> = {
let mut result = vec![]; let mut result = vec![];
for duty in &duties { for duty in &duties {
@@ -144,11 +177,11 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> ProposerPreferencesSer
}; };
if preferences_to_sign.is_empty() { if preferences_to_sign.is_empty() {
return; return false;
} }
debug!( debug!(
%current_epoch, %epoch,
count = preferences_to_sign.len(), count = preferences_to_sign.len(),
"Signing proposer preferences" "Signing proposer preferences"
); );
@@ -172,7 +205,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> ProposerPreferencesSer
} }
if signed.is_empty() { if signed.is_empty() {
return; return false;
} }
let count = signed.len(); let count = signed.len();
@@ -204,17 +237,19 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> ProposerPreferencesSer
match result { match result {
Ok(()) => { Ok(()) => {
info!( info!(
%current_epoch, %epoch,
%count, %count,
"Successfully published proposer preferences" "Successfully published proposer preferences"
); );
true
} }
Err(e) => { Err(e) => {
error!( error!(
error = %e, error = %e,
%current_epoch, %epoch,
"Failed to publish proposer preferences" "Failed to publish proposer preferences"
); );
false
} }
} }
} }