diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 58877ed20c..75d6323d3f 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1095,7 +1095,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { object_fork: block.message().body().fork_name(), })?; - let payload_status = execution_layer + let (payload_status, payload_handle) = execution_layer .block_on(|execution_layer| execution_layer.execute_payload(execution_payload)) .map_err(ExecutionPayloadError::from)?; diff --git a/beacon_node/execution_layer/src/execute_payload_handle.rs b/beacon_node/execution_layer/src/execute_payload_handle.rs new file mode 100644 index 0000000000..2b45318e1f --- /dev/null +++ b/beacon_node/execution_layer/src/execute_payload_handle.rs @@ -0,0 +1,39 @@ +use crate::{ConsensusStatus, ExecutionLayer}; +use slog::crit; +use types::Hash256; + +pub struct ExecutePayloadHandle { + pub(crate) block_hash: Hash256, + pub(crate) execution_layer: ExecutionLayer, +} + +impl ExecutePayloadHandle { + pub fn publish_consensus_valid(self) { + self.publish(ConsensusStatus::Valid) + } + + pub fn publish_consensus_invalid(self) { + self.publish(ConsensusStatus::Invalid) + } + + fn publish(&self, status: ConsensusStatus) { + if let Err(e) = self.execution_layer.block_on(|execution_layer| { + execution_layer.consensus_validated(self.block_hash, status) + }) { + // TODO(paul): consider how to recover when we are temporarily unable to tell a node + // that the block was valid. + crit!( + self.execution_layer.log(), + "Failed to update execution consensus status"; + "error" => ?e, + "status" => ?status, + ); + } + } +} + +impl Drop for ExecutePayloadHandle { + fn drop(&mut self) { + self.publish(ConsensusStatus::Invalid) + } +} diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 5168fce211..316743e42c 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -3,12 +3,15 @@ use engines::{Engine, EngineError, Engines}; use sensitive_url::SensitiveUrl; use slog::{crit, Logger}; use std::future::Future; +use std::sync::Arc; use task_executor::TaskExecutor; pub use engine_api::{http::HttpJsonRpc, ConsensusStatus, ExecutePayloadResponse}; +pub use execute_payload_handle::ExecutePayloadHandle; mod engine_api; mod engines; +mod execute_payload_handle; pub mod test_utils; #[derive(Debug)] @@ -25,13 +28,17 @@ impl From for Error { } } -pub struct ExecutionLayer { +struct Inner { engines: Engines, - /// Allows callers to execute async tasks in a non-async environment, if they desire. - pub executor: TaskExecutor, + executor: TaskExecutor, log: Logger, } +#[derive(Clone)] +pub struct ExecutionLayer { + inner: Arc, +} + impl ExecutionLayer { pub fn from_urls( urls: Vec, @@ -47,18 +54,30 @@ impl ExecutionLayer { }) .collect::>()?; - Ok(Self { + let inner = Inner { engines: Engines { engines, log: log.clone(), }, executor, log, + }; + + Ok(Self { + inner: Arc::new(inner), }) } } impl ExecutionLayer { + fn engines(&self) -> &Engines { + &self.inner.engines + } + + fn log(&self) -> &Logger { + &self.inner.log + } + /// Convenience function to allow calling async functions in a non-async context. pub fn block_on<'a, T, U, V>(&'a self, future: T) -> Result where @@ -66,6 +85,7 @@ impl ExecutionLayer { U: Future>, { let runtime = self + .inner .executor .runtime() .upgrade() @@ -80,7 +100,7 @@ impl ExecutionLayer { random: Hash256, fee_recipient: Address, ) -> Result { - self.engines + self.engines() .first_success(|engine| { engine .api @@ -93,9 +113,9 @@ impl ExecutionLayer { pub async fn execute_payload( &self, execution_payload: &ExecutionPayload, - ) -> Result { + ) -> Result<(ExecutePayloadResponse, ExecutePayloadHandle), Error> { let broadcast_results = self - .engines + .engines() .broadcast(|engine| engine.api.execute_payload(execution_payload.clone())) .await; @@ -114,20 +134,27 @@ impl ExecutionLayer { if valid > 0 && invalid > 0 { crit!( - self.log, + self.log(), "Consensus failure between execution nodes"; ); } - if valid > 0 { - Ok(ExecutePayloadResponse::Valid) + let execute_payload_response = if valid > 0 { + ExecutePayloadResponse::Valid } else if invalid > 0 { - Ok(ExecutePayloadResponse::Invalid) + ExecutePayloadResponse::Invalid } else if syncing > 0 { - Ok(ExecutePayloadResponse::Syncing) + ExecutePayloadResponse::Syncing } else { - Err(Error::EngineErrors(errors)) - } + return Err(Error::EngineErrors(errors)); + }; + + let execute_payload_handle = ExecutePayloadHandle { + block_hash: execution_payload.block_hash, + execution_layer: self.clone(), + }; + + Ok((execute_payload_response, execute_payload_handle)) } pub async fn consensus_validated( @@ -136,7 +163,7 @@ impl ExecutionLayer { status: ConsensusStatus, ) -> Result<(), Error> { let broadcast_results = self - .engines + .engines() .broadcast(|engine| engine.api.consensus_validated(block_hash, status)) .await;