diff --git a/validator_client/validator_services/src/duties_service.rs b/validator_client/validator_services/src/duties_service.rs index 2a371abf62..19a3936799 100644 --- a/validator_client/validator_services/src/duties_service.rs +++ b/validator_client/validator_services/src/duties_service.rs @@ -1660,54 +1660,8 @@ async fn poll_beacon_proposers( // Only download duties and push out additional block production events if we have some // validators. if !local_pubkeys.is_empty() { - let download_result = duties_service - .beacon_nodes - .first_success(|beacon_node| async move { - let _timer = validator_metrics::start_timer_vec( - &validator_metrics::DUTIES_SERVICE_TIMES, - &[validator_metrics::PROPOSER_DUTIES_HTTP_GET], - ); - beacon_node - .get_validator_duties_proposer(current_epoch) - .await - }) - .await; - - match download_result { - Ok(response) => { - let dependent_root = response.dependent_root; - - let relevant_duties = response - .data - .into_iter() - .filter(|proposer_duty| local_pubkeys.contains(&proposer_duty.pubkey)) - .collect::>(); - - debug!( - %dependent_root, - num_relevant_duties = relevant_duties.len(), - "Downloaded proposer duties" - ); - - if let Some((prior_dependent_root, _)) = duties_service - .proposers - .write() - .insert(current_epoch, (dependent_root, relevant_duties)) - && dependent_root != prior_dependent_root - { - warn!( - %prior_dependent_root, - %dependent_root, - msg = "this may happen from time to time", - "Proposer duties re-org" - ) - } - } - // Don't return early here, we still want to try and produce blocks using the cached values. - Err(e) => error!( - err = %e, - "Failed to download proposer duties" - ), + for epoch in [current_epoch, current_epoch + 1] { + fetch_and_store_proposer_duties(duties_service, epoch, &local_pubkeys).await; } // Compute the block proposers for this slot again, now that we've received an update from @@ -1750,6 +1704,62 @@ async fn poll_beacon_proposers( Ok(()) } +async fn fetch_and_store_proposer_duties( + duties_service: &DutiesService, + epoch: Epoch, + local_pubkeys: &HashSet, +) { + let download_result = duties_service + .beacon_nodes + .first_success(|beacon_node| async move { + let _timer = validator_metrics::start_timer_vec( + &validator_metrics::DUTIES_SERVICE_TIMES, + &[validator_metrics::PROPOSER_DUTIES_HTTP_GET], + ); + beacon_node.get_validator_duties_proposer(epoch).await + }) + .await; + + match download_result { + Ok(response) => { + let dependent_root = response.dependent_root; + + let relevant_duties = response + .data + .into_iter() + .filter(|proposer_duty| local_pubkeys.contains(&proposer_duty.pubkey)) + .collect::>(); + + debug!( + %dependent_root, + %epoch, + num_relevant_duties = relevant_duties.len(), + "Downloaded proposer duties" + ); + + if let Some((prior_dependent_root, _)) = duties_service + .proposers + .write() + .insert(epoch, (dependent_root, relevant_duties)) + && dependent_root != prior_dependent_root + { + warn!( + %prior_dependent_root, + %dependent_root, + %epoch, + msg = "this may happen from time to time", + "Proposer duties re-org" + ) + } + } + Err(e) => error!( + err = %e, + %epoch, + "Failed to download proposer duties" + ), + } +} + /// Query the beacon node for ptc duties for any known validators. async fn poll_beacon_ptc_attesters( duties_service: &Arc>, diff --git a/validator_client/validator_services/src/proposer_preferences_service.rs b/validator_client/validator_services/src/proposer_preferences_service.rs index fbefdf5d96..81905b9c08 100644 --- a/validator_client/validator_services/src/proposer_preferences_service.rs +++ b/validator_client/validator_services/src/proposer_preferences_service.rs @@ -1,12 +1,14 @@ use crate::duties_service::DutiesService; use beacon_node_fallback::BeaconNodeFallback; +use eth2::types::ProposerData; use slot_clock::SlotClock; +use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; use task_executor::TaskExecutor; use tokio::time::sleep; use tracing::{debug, error, info, warn}; -use types::{ChainSpec, Epoch, EthSpec, ForkName, ProposerPreferences}; +use types::{ChainSpec, Epoch, EthSpec, ForkName, Hash256, ProposerPreferences}; use validator_store::ValidatorStore; pub struct Inner { @@ -66,6 +68,8 @@ impl ProposerPreferencesSer let executor = self.executor.clone(); let interval_fut = async move { + let mut published_preferences: HashMap = HashMap::new(); + loop { let Some(current_slot) = self.slot_clock.now() else { error!("Failed to read slot clock"); @@ -73,29 +77,16 @@ impl ProposerPreferencesSer continue; }; - if !self - .chain_spec - .fork_name_at_slot::(current_slot) - .gloas_enabled() - { - let duration_to_next_epoch = self - .slot_clock - .duration_to_next_epoch(S::E::slots_per_epoch()) - .unwrap_or_else(|| slot_duration * S::E::slots_per_epoch() as u32); - sleep(duration_to_next_epoch).await; - continue; - } - let current_epoch = current_slot.epoch(S::E::slots_per_epoch()); - let fork_name = self.chain_spec.fork_name_at_slot::(current_slot); - self.publish_proposer_preferences(current_epoch, fork_name) + + self.poll_and_publish_preferences(current_epoch, &mut published_preferences) .await; - let duration_to_next_epoch = self + let duration_to_next_slot = self .slot_clock - .duration_to_next_epoch(S::E::slots_per_epoch()) - .unwrap_or_else(|| slot_duration * S::E::slots_per_epoch() as u32); - sleep(duration_to_next_epoch).await; + .duration_to_next_slot() + .unwrap_or(slot_duration); + sleep(duration_to_next_slot).await; } }; @@ -103,15 +94,57 @@ impl ProposerPreferencesSer Ok(()) } - async fn publish_proposer_preferences(&self, current_epoch: Epoch, fork_name: ForkName) { - let (dependent_root, duties) = { - let proposers = self.duties_service.proposers.read(); - match proposers.get(¤t_epoch) { - Some((root, duties)) => (*root, duties.clone()), - None => return, + /// Publish proposer preferences for `current_epoch` and `current_epoch + 1`. + /// Will only publish preferences for a given epoch once per dependent root. + async fn poll_and_publish_preferences( + &self, + current_epoch: Epoch, + published_preferences: &mut HashMap, + ) { + for (epoch, fork_name) in [ + ( + current_epoch, + self.chain_spec.fork_name_at_epoch(current_epoch), + ), + ( + current_epoch + 1, + self.chain_spec.fork_name_at_epoch(current_epoch + 1), + ), + ] { + if !fork_name.gloas_enabled() { + continue; } - }; + let (dependent_root, duties) = { + let proposers = self.duties_service.proposers.read(); + match proposers.get(&epoch) { + Some((root, duties)) => (*root, duties.clone()), + None => continue, + } + }; + + if published_preferences.get(&epoch) == Some(&dependent_root) { + continue; + } + + if self + .publish_proposer_preferences(epoch, fork_name, dependent_root, duties) + .await + { + published_preferences.insert(epoch, dependent_root); + } + } + + published_preferences.retain(|epoch, _| *epoch >= current_epoch); + } + + async fn publish_proposer_preferences( + &self, + epoch: Epoch, + fork_name: ForkName, + dependent_root: Hash256, + duties: Vec, + ) -> bool { let preferences_to_sign: Vec<_> = { let mut result = vec![]; for duty in &duties { @@ -144,11 +177,11 @@ impl ProposerPreferencesSer }; if preferences_to_sign.is_empty() { - return; + return false; } debug!( - %current_epoch, + %epoch, count = preferences_to_sign.len(), "Signing proposer preferences" ); @@ -172,7 +205,7 @@ impl ProposerPreferencesSer } if signed.is_empty() { - return; + return false; } let count = signed.len(); @@ -204,17 +237,19 @@ impl ProposerPreferencesSer match result { Ok(()) => { info!( - %current_epoch, + %epoch, %count, "Successfully published proposer preferences" ); + true } Err(e) => { error!( error = %e, - %current_epoch, + %epoch, "Failed to publish proposer preferences" ); + false } } }