use crate::beacon_node_fallback::{Error as FallbackError, Errors}; use crate::{ beacon_node_fallback::{BeaconNodeFallback, RequireSynced}, determine_graffiti, graffiti_file::GraffitiFile, OfflineOnFailure, }; use crate::{http_metrics::metrics, validator_store::ValidatorStore}; use bls::SignatureBytes; use environment::RuntimeContext; use eth2::types::{BlockContents, SignedBlockContents}; use eth2::BeaconNodeHttpClient; use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; use std::ops::Deref; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use tokio::time::sleep; use types::{ AbstractExecPayload, BlindedPayload, BlockType, EthSpec, FullPayload, Graffiti, PublicKeyBytes, Slot, }; #[derive(Debug)] pub enum BlockError { Recoverable(String), Irrecoverable(String), } impl From> for BlockError { fn from(e: Errors) -> Self { if e.0.iter().any(|(_, error)| { matches!( error, FallbackError::RequestFailed(BlockError::Irrecoverable(_)) ) }) { BlockError::Irrecoverable(e.to_string()) } else { BlockError::Recoverable(e.to_string()) } } } /// Builds a `BlockService`. pub struct BlockServiceBuilder { validator_store: Option>>, slot_clock: Option>, beacon_nodes: Option>>, context: Option>, graffiti: Option, graffiti_file: Option, block_delay: Option, } impl BlockServiceBuilder { pub fn new() -> Self { Self { validator_store: None, slot_clock: None, beacon_nodes: None, context: None, graffiti: None, graffiti_file: None, block_delay: None, } } pub fn validator_store(mut self, store: Arc>) -> Self { self.validator_store = Some(store); self } pub fn slot_clock(mut self, slot_clock: T) -> Self { self.slot_clock = Some(Arc::new(slot_clock)); self } pub fn beacon_nodes(mut self, beacon_nodes: Arc>) -> Self { self.beacon_nodes = Some(beacon_nodes); self } pub fn runtime_context(mut self, context: RuntimeContext) -> Self { self.context = Some(context); self } pub fn graffiti(mut self, graffiti: Option) -> Self { self.graffiti = graffiti; self } pub fn graffiti_file(mut self, graffiti_file: Option) -> Self { self.graffiti_file = graffiti_file; self } pub fn block_delay(mut self, block_delay: Option) -> Self { self.block_delay = block_delay; self } pub fn build(self) -> Result, String> { Ok(BlockService { inner: Arc::new(Inner { validator_store: self .validator_store .ok_or("Cannot build BlockService without validator_store")?, slot_clock: self .slot_clock .ok_or("Cannot build BlockService without slot_clock")?, beacon_nodes: self .beacon_nodes .ok_or("Cannot build BlockService without beacon_node")?, context: self .context .ok_or("Cannot build BlockService without runtime_context")?, graffiti: self.graffiti, graffiti_file: self.graffiti_file, block_delay: self.block_delay, }), }) } } /// Helper to minimise `Arc` usage. pub struct Inner { validator_store: Arc>, slot_clock: Arc, beacon_nodes: Arc>, context: RuntimeContext, graffiti: Option, graffiti_file: Option, block_delay: Option, } /// Attempts to produce attestations for any block producer(s) at the start of the epoch. pub struct BlockService { inner: Arc>, } impl Clone for BlockService { fn clone(&self) -> Self { Self { inner: self.inner.clone(), } } } impl Deref for BlockService { type Target = Inner; fn deref(&self) -> &Self::Target { self.inner.deref() } } /// Notification from the duties service that we should try to produce a block. pub struct BlockServiceNotification { pub slot: Slot, pub block_proposers: Vec, } impl BlockService { pub fn start_update_service( self, mut notification_rx: mpsc::Receiver, ) -> Result<(), String> { let log = self.context.log().clone(); info!(log, "Block production service started"); let executor = self.inner.context.executor.clone(); executor.spawn( async move { while let Some(notif) = notification_rx.recv().await { let service = self.clone(); if let Some(delay) = service.block_delay { debug!( service.context.log(), "Delaying block production by {}ms", delay.as_millis() ); sleep(delay).await; } service.do_update(notif).await.ok(); } debug!(log, "Block service shutting down"); }, "block_service", ); Ok(()) } /// Attempt to produce a block for any block producers in the `ValidatorStore`. async fn do_update(&self, notification: BlockServiceNotification) -> Result<(), ()> { let log = self.context.log(); let _timer = metrics::start_timer_vec(&metrics::BLOCK_SERVICE_TIMES, &[metrics::FULL_UPDATE]); let slot = self.slot_clock.now().ok_or_else(move || { crit!(log, "Duties manager failed to read slot clock"); })?; if notification.slot != slot { warn!( log, "Skipping block production for expired slot"; "current_slot" => slot.as_u64(), "notification_slot" => notification.slot.as_u64(), "info" => "Your machine could be overloaded" ); return Ok(()); } if slot == self.context.eth2_config.spec.genesis_slot { debug!( log, "Not producing block at genesis slot"; "proposers" => format!("{:?}", notification.block_proposers), ); return Ok(()); } trace!( log, "Block service update started"; "slot" => slot.as_u64() ); let proposers = notification.block_proposers; if proposers.is_empty() { trace!( log, "No local block proposers for this slot"; "slot" => slot.as_u64() ) } else if proposers.len() > 1 { error!( log, "Multiple block proposers for this slot"; "action" => "producing blocks for all proposers", "num_proposers" => proposers.len(), "slot" => slot.as_u64(), ) } for validator_pubkey in proposers { let builder_proposals = self .validator_store .get_builder_proposals(&validator_pubkey); let service = self.clone(); let log = log.clone(); self.inner.context.executor.spawn( async move { let publish_result = if builder_proposals { let mut result = service.clone() .publish_block::>(slot, validator_pubkey) .await; match result.as_ref() { Err(BlockError::Recoverable(e)) => { error!(log, "Error whilst producing a blinded block, attempting to \ publish full block"; "error" => ?e); result = service .publish_block::>(slot, validator_pubkey) .await; }, Err(BlockError::Irrecoverable(e)) => { error!(log, "Error whilst producing a blinded block, cannot fallback \ because the block was signed"; "error" => ?e); }, _ => {}, }; result } else { service .publish_block::>(slot, validator_pubkey) .await }; if let Err(e) = publish_result { crit!( log, "Error whilst producing block"; "message" => ?e ); } }, "block service", ); } Ok(()) } /// Produce a block at the given slot for validator_pubkey async fn publish_block>( self, slot: Slot, validator_pubkey: PublicKeyBytes, ) -> Result<(), BlockError> { let log = self.context.log(); let _timer = metrics::start_timer_vec(&metrics::BLOCK_SERVICE_TIMES, &[metrics::BEACON_BLOCK]); let current_slot = self.slot_clock.now().ok_or_else(|| { BlockError::Recoverable("Unable to determine current slot from clock".to_string()) })?; let randao_reveal = self .validator_store .randao_reveal(validator_pubkey, slot.epoch(E::slots_per_epoch())) .await .map_err(|e| { BlockError::Recoverable(format!( "Unable to produce randao reveal signature: {:?}", e )) })? .into(); let graffiti = determine_graffiti( &validator_pubkey, log, self.graffiti_file.clone(), self.validator_store.graffiti(&validator_pubkey), self.graffiti, ); let randao_reveal_ref = &randao_reveal; let self_ref = &self; let proposer_index = self.validator_store.validator_index(&validator_pubkey); let validator_pubkey_ref = &validator_pubkey; info!( log, "Requesting unsigned block"; "slot" => slot.as_u64(), ); // Request block from first responsive beacon node. let block_contents = self .beacon_nodes .first_success( RequireSynced::No, OfflineOnFailure::Yes, move |beacon_node| { Self::get_validator_block( beacon_node, slot, randao_reveal_ref, graffiti, proposer_index, log, ) }, ) .await?; let (block, maybe_blob_sidecars) = block_contents.deconstruct(); let signing_timer = metrics::start_timer(&metrics::BLOCK_SIGNING_TIMES); let signed_block = self_ref .validator_store .sign_block::(*validator_pubkey_ref, block, current_slot) .await .map_err(|e| BlockError::Recoverable(format!("Unable to sign block: {:?}", e)))?; let maybe_signed_blobs = match maybe_blob_sidecars { Some(blob_sidecars) => Some( self_ref .validator_store .sign_blobs(*validator_pubkey_ref, blob_sidecars) .await .map_err(|e| { BlockError::Recoverable(format!("Unable to sign blob: {:?}", e)) })?, ), None => None, }; let signing_time_ms = Duration::from_secs_f64(signing_timer.map_or(0.0, |t| t.stop_and_record())).as_millis(); info!( log, "Publishing signed block"; "slot" => slot.as_u64(), "signing_time_ms" => signing_time_ms, ); let signed_block_contents = SignedBlockContents::from((signed_block, maybe_signed_blobs)); // Publish block with first available beacon node. self.beacon_nodes .first_success( RequireSynced::No, OfflineOnFailure::Yes, |beacon_node| async { Self::publish_signed_block_contents::( &signed_block_contents, beacon_node, ) .await }, ) .await?; info!( log, "Successfully published block"; "block_type" => ?Payload::block_type(), "deposits" => signed_block_contents.signed_block().message().body().deposits().len(), "attestations" => signed_block_contents.signed_block().message().body().attestations().len(), "graffiti" => ?graffiti.map(|g| g.as_utf8_lossy()), "slot" => signed_block_contents.signed_block().slot().as_u64(), ); Ok(()) } async fn publish_signed_block_contents>( signed_block_contents: &SignedBlockContents, beacon_node: &BeaconNodeHttpClient, ) -> Result<(), BlockError> { match Payload::block_type() { BlockType::Full => { let _post_timer = metrics::start_timer_vec( &metrics::BLOCK_SERVICE_TIMES, &[metrics::BEACON_BLOCK_HTTP_POST], ); beacon_node .post_beacon_blocks(signed_block_contents) .await .map_err(|e| { BlockError::Irrecoverable(format!( "Error from beacon node when publishing block: {:?}", e )) })? } BlockType::Blinded => { let _post_timer = metrics::start_timer_vec( &metrics::BLOCK_SERVICE_TIMES, &[metrics::BLINDED_BEACON_BLOCK_HTTP_POST], ); todo!("need to be adjusted for blobs"); // beacon_node // .post_beacon_blinded_blocks(signed_block_contents.signed_block()) // .await // .map_err(|e| { // BlockError::Irrecoverable(format!( // "Error from beacon node when publishing block: {:?}", // e // )) // })? } } Ok::<_, BlockError>(()) } async fn get_validator_block>( beacon_node: &BeaconNodeHttpClient, slot: Slot, randao_reveal_ref: &SignatureBytes, graffiti: Option, proposer_index: Option, log: &Logger, ) -> Result, BlockError> { let block_contents: BlockContents = match Payload::block_type() { BlockType::Full => { let _get_timer = metrics::start_timer_vec( &metrics::BLOCK_SERVICE_TIMES, &[metrics::BEACON_BLOCK_HTTP_GET], ); beacon_node .get_validator_blocks::(slot, randao_reveal_ref, graffiti.as_ref()) .await .map_err(|e| { BlockError::Recoverable(format!( "Error from beacon node when producing block: {:?}", e )) })? .data } BlockType::Blinded => { let _get_timer = metrics::start_timer_vec( &metrics::BLOCK_SERVICE_TIMES, &[metrics::BLINDED_BEACON_BLOCK_HTTP_GET], ); todo!("implement blinded flow for blobs"); // beacon_node // .get_validator_blinded_blocks::( // slot, // randao_reveal_ref, // graffiti.as_ref(), // ) // .await // .map_err(|e| { // BlockError::Recoverable(format!( // "Error from beacon node when producing block: {:?}", // e // )) // })? // .data } }; info!( log, "Received unsigned block"; "slot" => slot.as_u64(), ); if proposer_index != Some(block_contents.block().proposer_index()) { return Err(BlockError::Recoverable( "Proposer index does not match block proposer. Beacon chain re-orged".to_string(), )); } Ok::<_, BlockError>(block_contents) } }