From 203a93b3e1cd071bc3f7847e65bbee41d672102c Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 25 Sep 2021 17:28:45 +1000 Subject: [PATCH] Add block processing methods to ExecutionLayer --- beacon_node/execution_layer/src/engine_api.rs | 4 +- beacon_node/execution_layer/src/engines.rs | 31 ++++++++ beacon_node/execution_layer/src/lib.rs | 72 ++++++++++++++++++- 3 files changed, 103 insertions(+), 4 deletions(-) diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs index b1fd72deea..7362c52eee 100644 --- a/beacon_node/execution_layer/src/engine_api.rs +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -68,7 +68,7 @@ pub trait EngineApi { ) -> Result<(), Error>; } -#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] pub enum ExecutePayloadResponse { Valid, @@ -76,7 +76,7 @@ pub enum ExecutePayloadResponse { Syncing, } -#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] pub enum ConsensusStatus { Valid, diff --git a/beacon_node/execution_layer/src/engines.rs b/beacon_node/execution_layer/src/engines.rs index f3bd72197b..3de4ee6ae5 100644 --- a/beacon_node/execution_layer/src/engines.rs +++ b/beacon_node/execution_layer/src/engines.rs @@ -156,4 +156,35 @@ impl Engines { Err(errors) } + + pub async fn broadcast<'a, F, G, H>(&'a self, func: F) -> Vec> + where + F: Fn(&'a Engine) -> G, + G: Future>, + { + let func = &func; + let futures = self.engines.iter().map(|engine| async move { + let engine_online = engine.state.read().await.is_online(); + if engine_online { + func(engine).await.map_err(|error| { + error!( + self.log, + "Execution engine call failed"; + "error" => ?error, + "id" => &engine.id + ); + EngineError::Api { + id: engine.id.clone(), + error, + } + }) + } else { + Err(EngineError::Offline { + id: engine.id.clone(), + }) + } + }); + + join_all(futures).await + } } diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 44935ccc17..4a1a834525 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -1,7 +1,7 @@ use engine_api::{Error as ApiError, *}; use engines::{Engine, EngineError, Engines}; use sensitive_url::SensitiveUrl; -use slog::Logger; +use slog::{crit, Logger}; use task_executor::TaskExecutor; pub use engine_api::http::HttpJsonRpc; @@ -14,6 +14,7 @@ pub mod test_utils; pub enum Error { ApiError(ApiError), EngineErrors(Vec), + NotSynced, } impl From for Error { @@ -26,6 +27,7 @@ pub struct ExecutionLayer { engines: Engines, /// Allows callers to execute async tasks in a non-async environment, if they desire. pub executor: TaskExecutor, + log: Logger, } impl ExecutionLayer { @@ -44,8 +46,12 @@ impl ExecutionLayer { .collect::>()?; Ok(Self { - engines: Engines { engines, log }, + engines: Engines { + engines, + log: log.clone(), + }, executor, + log, }) } } @@ -67,4 +73,66 @@ impl ExecutionLayer { .await .map_err(Error::EngineErrors) } + + pub async fn execute_payload( + &self, + execution_payload: &ExecutionPayload, + ) -> Result { + let broadcast_results = self + .engines + .broadcast(|engine| engine.api.execute_payload(execution_payload.clone())) + .await; + + let mut errors = vec![]; + let mut valid = 0; + let mut invalid = 0; + let mut syncing = 0; + for result in broadcast_results { + match result { + Ok(ExecutePayloadResponse::Valid) => valid += 1, + Ok(ExecutePayloadResponse::Invalid) => invalid += 1, + Ok(ExecutePayloadResponse::Syncing) => syncing += 1, + Err(e) => errors.push(e), + } + } + + if valid > 0 && invalid > 0 { + crit!( + self.log, + "Consensus failure between execution nodes"; + ); + } + + if valid > 0 { + Ok(ExecutePayloadResponse::Valid) + } else if invalid > 0 { + Ok(ExecutePayloadResponse::Invalid) + } else if syncing > 0 { + Ok(ExecutePayloadResponse::Syncing) + } else { + Err(Error::EngineErrors(errors)) + } + } + + pub async fn consensus_validated( + &self, + block_hash: Hash256, + status: ConsensusStatus, + ) -> Result<(), Error> { + let broadcast_results = self + .engines + .broadcast(|engine| engine.api.consensus_validated(block_hash, status)) + .await; + + if broadcast_results.iter().any(Result::is_ok) { + Ok(()) + } else { + Err(Error::EngineErrors( + broadcast_results + .into_iter() + .filter_map(Result::err) + .collect(), + )) + } + } }