use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; use crate::{ fee_recipient_file::FeeRecipientFile, validator_store::{DoppelgangerStatus, ValidatorStore}, }; use environment::RuntimeContext; use slog::{debug, error, info}; use slot_clock::SlotClock; use std::ops::Deref; use std::sync::Arc; use tokio::time::{sleep, Duration}; use types::{Address, ChainSpec, EthSpec, ProposerPreparationData}; /// Builds an `PreparationService`. pub struct PreparationServiceBuilder { validator_store: Option>>, slot_clock: Option, beacon_nodes: Option>>, context: Option>, fee_recipient: Option
, fee_recipient_file: Option, } impl PreparationServiceBuilder { pub fn new() -> Self { Self { validator_store: None, slot_clock: None, beacon_nodes: None, context: None, fee_recipient: None, fee_recipient_file: None, } } pub fn validator_store(mut self, store: Arc>) -> Self { self.validator_store = Some(store); self } pub fn slot_clock(mut self, slot_clock: T) -> Self { self.slot_clock = Some(slot_clock); self } pub fn beacon_nodes(mut self, beacon_nodes: Arc>) -> Self { self.beacon_nodes = Some(beacon_nodes); self } pub fn runtime_context(mut self, context: RuntimeContext) -> Self { self.context = Some(context); self } pub fn fee_recipient(mut self, fee_recipient: Option
) -> Self { self.fee_recipient = fee_recipient; self } pub fn fee_recipient_file(mut self, fee_recipient_file: Option) -> Self { self.fee_recipient_file = fee_recipient_file; self } pub fn build(self) -> Result, String> { Ok(PreparationService { inner: Arc::new(Inner { validator_store: self .validator_store .ok_or("Cannot build PreparationService without validator_store")?, slot_clock: self .slot_clock .ok_or("Cannot build PreparationService without slot_clock")?, beacon_nodes: self .beacon_nodes .ok_or("Cannot build PreparationService without beacon_nodes")?, context: self .context .ok_or("Cannot build PreparationService without runtime_context")?, fee_recipient: self.fee_recipient, fee_recipient_file: self.fee_recipient_file, }), }) } } /// Helper to minimise `Arc` usage. pub struct Inner { validator_store: Arc>, slot_clock: T, beacon_nodes: Arc>, context: RuntimeContext, fee_recipient: Option
, fee_recipient_file: Option, } /// Attempts to produce proposer preparations for all known validators at the beginning of each epoch. pub struct PreparationService { inner: Arc>, } impl Clone for PreparationService { fn clone(&self) -> Self { Self { inner: self.inner.clone(), } } } impl Deref for PreparationService { type Target = Inner; fn deref(&self) -> &Self::Target { self.inner.deref() } } impl PreparationService { /// Starts the service which periodically produces proposer preparations. pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> { let log = self.context.log().clone(); let slot_duration = Duration::from_secs(spec.seconds_per_slot); let duration_to_next_epoch = self .slot_clock .duration_to_next_epoch(E::slots_per_epoch()) .ok_or("Unable to determine duration to next epoch")?; info!( log, "Proposer preparation service started"; "next_update_millis" => duration_to_next_epoch.as_millis() ); let executor = self.context.executor.clone(); let spec = spec.clone(); let interval_fut = async move { loop { // Poll the endpoint immediately to ensure fee recipients are received. self.prepare_proposers_and_publish(&spec) .await .map_err(|e| { error!( log, "Error during proposer preparation"; "error" => format!("{:?}", e), ) }) .unwrap_or(()); if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() { sleep(duration_to_next_slot).await; } else { error!(log, "Failed to read slot clock"); // If we can't read the slot clock, just wait another slot. sleep(slot_duration).await; } } }; executor.spawn(interval_fut, "preparation_service"); Ok(()) } /// Prepare proposer preparations and send to beacon node async fn prepare_proposers_and_publish(&self, spec: &ChainSpec) -> Result<(), String> { let preparation_data = self.collect_preparation_data(spec); if !preparation_data.is_empty() { self.publish_preparation_data(preparation_data).await?; } Ok(()) } fn collect_preparation_data(&self, spec: &ChainSpec) -> Vec { let log = self.context.log(); let fee_recipient_file = self .fee_recipient_file .clone() .map(|mut fee_recipient_file| { fee_recipient_file .read_fee_recipient_file() .map_err(|e| { error!( log, "{}", format!("Error loading fee-recipient file: {:?}", e); ); }) .unwrap_or(()); fee_recipient_file }); let all_pubkeys: Vec<_> = self .validator_store .voting_pubkeys(DoppelgangerStatus::ignored); all_pubkeys .into_iter() .filter_map(|pubkey| { let validator_index = self.validator_store.validator_index(&pubkey); if let Some(validator_index) = validator_index { let fee_recipient = if let Some(from_validator_defs) = self.validator_store.suggested_fee_recipient(&pubkey) { // If there is a `suggested_fee_recipient` in the validator definitions yaml // file, use that value. Some(from_validator_defs) } else { // If there's nothing in the validator defs file, check the fee recipient // file. fee_recipient_file .as_ref() .and_then(|f| match f.get_fee_recipient(&pubkey) { Ok(f) => f, Err(_e) => None, }) // If there's nothing in the file, try the process-level default value. .or(self.fee_recipient) }; if let Some(fee_recipient) = fee_recipient { Some(ProposerPreparationData { validator_index, fee_recipient, }) } else { if spec.bellatrix_fork_epoch.is_some() { error!( log, "Validator is missing fee recipient"; "msg" => "update validator_definitions.yml", "pubkey" => ?pubkey ); } None } } else { None } }) .collect() } async fn publish_preparation_data( &self, preparation_data: Vec, ) -> Result<(), String> { let log = self.context.log(); // Post the proposer preparations to the BN. let preparation_data_len = preparation_data.len(); let preparation_entries = preparation_data.as_slice(); match self .beacon_nodes .first_success(RequireSynced::Yes, |beacon_node| async move { beacon_node .post_validator_prepare_beacon_proposer(preparation_entries) .await }) .await { Ok(()) => debug!( log, "Published proposer preparation"; "count" => preparation_data_len, ), Err(e) => error!( log, "Unable to publish proposer preparation"; "error" => %e, ), } Ok(()) } }