From c329fae53c86f549e5cbd2d85f74adac0665c1d7 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 27 Sep 2021 15:14:22 +1000 Subject: [PATCH] Call forkchoiceUpdated --- beacon_node/beacon_chain/src/beacon_chain.rs | 66 ++++++++++++++++++++ beacon_node/beacon_chain/src/errors.rs | 2 + beacon_node/execution_layer/src/lib.rs | 47 ++++++++++++-- 3 files changed, 111 insertions(+), 4 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 25755c90c2..38602ffb76 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3037,6 +3037,14 @@ impl BeaconChain { .start_slot(T::EthSpec::slots_per_epoch()); let head_proposer_index = new_head.beacon_block.message().proposer_index(); + // Used later for the execution engine. + let new_head_execution_block_hash = new_head + .beacon_block + .message() + .body() + .execution_payload() + .map(|ep| ep.block_hash); + drop(lag_timer); // Update the snapshot that stores the head of the chain at the time it received the @@ -3174,9 +3182,67 @@ impl BeaconChain { } } + // If this is a post-merge block, update the execution layer. + if let Some(new_head_execution_block_hash) = new_head_execution_block_hash { + let execution_layer = self + .execution_layer + .clone() + .ok_or(Error::ExecutionLayerMissing)?; + let store = self.store.clone(); + let log = self.log.clone(); + + // Spawn the update task, without waiting for it to complete. + execution_layer.spawn( + move |execution_layer| async move { + if let Err(e) = Self::update_execution_engine_forkchoice( + execution_layer, + store, + new_finalized_checkpoint.root, + new_head_execution_block_hash, + ) + .await + { + error!( + log, + "Failed to update execution head"; + "error" => ?e + ); + } + }, + "update_execution_engine_forkchoice", + ) + } + Ok(()) } + pub async fn update_execution_engine_forkchoice( + execution_layer: ExecutionLayer, + store: BeaconStore, + finalized_beacon_block_root: Hash256, + head_execution_block_hash: Hash256, + ) -> Result<(), Error> { + // Loading the finalized block from the store is not ideal. Perhaps it would be better to + // store it on fork-choice so we can do a lookup without hitting the database. + // + // See: https://github.com/sigp/lighthouse/pull/2627#issuecomment-927537245 + let finalized_block = store + .get_block(&finalized_beacon_block_root)? + .ok_or(Error::MissingBeaconBlock(finalized_beacon_block_root))?; + + let finalized_execution_block_hash = finalized_block + .message() + .body() + .execution_payload() + .map(|ep| ep.block_hash) + .unwrap_or_else(Hash256::zero); + + execution_layer + .forkchoice_updated(head_execution_block_hash, finalized_execution_block_hash) + .await + .map_err(Error::ExecutionForkChoiceUpdateFailed) + } + /// This function takes a configured weak subjectivity `Checkpoint` and the latest finalized `Checkpoint`. /// If the weak subjectivity checkpoint and finalized checkpoint share the same epoch, we compare /// roots. If we the weak subjectivity checkpoint is from an older epoch, we iterate back through diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 972e701815..799ce86f28 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -134,6 +134,8 @@ pub enum BeaconChainError { new_slot: Slot, }, AltairForkDisabled, + ExecutionLayerMissing, + ExecutionForkChoiceUpdateFailed(execution_layer::Error), } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index ba4a1d3b66..5862a5cb12 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -74,23 +74,36 @@ impl ExecutionLayer { &self.inner.engines } + fn executor(&self) -> &TaskExecutor { + &self.inner.executor + } + fn log(&self) -> &Logger { &self.inner.log } /// Convenience function to allow calling async functions in a non-async context. - pub fn block_on<'a, T, U, V>(&'a self, future: T) -> Result + pub fn block_on<'a, T, U, V>(&'a self, generate_future: T) -> Result where T: Fn(&'a Self) -> U, U: Future>, { let runtime = self - .inner - .executor + .executor() .runtime() .upgrade() .ok_or(Error::ShuttingDown)?; - runtime.block_on(future(self)) + // TODO(paul): respect the shutdown signal. + runtime.block_on(generate_future(self)) + } + + /// Convenience function to allow calling spawning a task without waiting for the result. + pub fn spawn(&self, generate_future: T, name: &'static str) + where + T: FnOnce(Self) -> U, + U: Future + Send + 'static, + { + self.executor().spawn(generate_future(self.clone()), name); } pub async fn prepare_payload( @@ -179,4 +192,30 @@ impl ExecutionLayer { )) } } + + pub async fn forkchoice_updated( + &self, + head_block_hash: Hash256, + finalized_block_hash: Hash256, + ) -> Result<(), Error> { + let broadcast_results = self + .engines() + .broadcast(|engine| { + engine + .api + .forkchoice_updated(head_block_hash, finalized_block_hash) + }) + .await; + + if broadcast_results.iter().any(Result::is_ok) { + Ok(()) + } else { + Err(Error::EngineErrors( + broadcast_results + .into_iter() + .filter_map(Result::err) + .collect(), + )) + } + } }