Begin implementing payload handle

This commit is contained in:
Paul Hauner
2021-09-27 12:17:31 +10:00
parent 03b984aa89
commit 3c816a3533
3 changed files with 82 additions and 16 deletions

View File

@@ -1095,7 +1095,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
object_fork: block.message().body().fork_name(), object_fork: block.message().body().fork_name(),
})?; })?;
let payload_status = execution_layer let (payload_status, payload_handle) = execution_layer
.block_on(|execution_layer| execution_layer.execute_payload(execution_payload)) .block_on(|execution_layer| execution_layer.execute_payload(execution_payload))
.map_err(ExecutionPayloadError::from)?; .map_err(ExecutionPayloadError::from)?;

View File

@@ -0,0 +1,39 @@
use crate::{ConsensusStatus, ExecutionLayer};
use slog::crit;
use types::Hash256;
pub struct ExecutePayloadHandle {
pub(crate) block_hash: Hash256,
pub(crate) execution_layer: ExecutionLayer,
}
impl ExecutePayloadHandle {
pub fn publish_consensus_valid(self) {
self.publish(ConsensusStatus::Valid)
}
pub fn publish_consensus_invalid(self) {
self.publish(ConsensusStatus::Invalid)
}
fn publish(&self, status: ConsensusStatus) {
if let Err(e) = self.execution_layer.block_on(|execution_layer| {
execution_layer.consensus_validated(self.block_hash, status)
}) {
// TODO(paul): consider how to recover when we are temporarily unable to tell a node
// that the block was valid.
crit!(
self.execution_layer.log(),
"Failed to update execution consensus status";
"error" => ?e,
"status" => ?status,
);
}
}
}
impl Drop for ExecutePayloadHandle {
fn drop(&mut self) {
self.publish(ConsensusStatus::Invalid)
}
}

View File

@@ -3,12 +3,15 @@ use engines::{Engine, EngineError, Engines};
use sensitive_url::SensitiveUrl; use sensitive_url::SensitiveUrl;
use slog::{crit, Logger}; use slog::{crit, Logger};
use std::future::Future; use std::future::Future;
use std::sync::Arc;
use task_executor::TaskExecutor; use task_executor::TaskExecutor;
pub use engine_api::{http::HttpJsonRpc, ConsensusStatus, ExecutePayloadResponse}; pub use engine_api::{http::HttpJsonRpc, ConsensusStatus, ExecutePayloadResponse};
pub use execute_payload_handle::ExecutePayloadHandle;
mod engine_api; mod engine_api;
mod engines; mod engines;
mod execute_payload_handle;
pub mod test_utils; pub mod test_utils;
#[derive(Debug)] #[derive(Debug)]
@@ -25,13 +28,17 @@ impl From<ApiError> for Error {
} }
} }
pub struct ExecutionLayer { struct Inner {
engines: Engines<HttpJsonRpc>, engines: Engines<HttpJsonRpc>,
/// Allows callers to execute async tasks in a non-async environment, if they desire. executor: TaskExecutor,
pub executor: TaskExecutor,
log: Logger, log: Logger,
} }
#[derive(Clone)]
pub struct ExecutionLayer {
inner: Arc<Inner>,
}
impl ExecutionLayer { impl ExecutionLayer {
pub fn from_urls( pub fn from_urls(
urls: Vec<SensitiveUrl>, urls: Vec<SensitiveUrl>,
@@ -47,18 +54,30 @@ impl ExecutionLayer {
}) })
.collect::<Result<_, ApiError>>()?; .collect::<Result<_, ApiError>>()?;
Ok(Self { let inner = Inner {
engines: Engines { engines: Engines {
engines, engines,
log: log.clone(), log: log.clone(),
}, },
executor, executor,
log, log,
};
Ok(Self {
inner: Arc::new(inner),
}) })
} }
} }
impl ExecutionLayer { impl ExecutionLayer {
fn engines(&self) -> &Engines<HttpJsonRpc> {
&self.inner.engines
}
fn log(&self) -> &Logger {
&self.inner.log
}
/// Convenience function to allow calling async functions in a non-async context. /// Convenience function to allow calling async functions in a non-async context.
pub fn block_on<'a, T, U, V>(&'a self, future: T) -> Result<V, Error> pub fn block_on<'a, T, U, V>(&'a self, future: T) -> Result<V, Error>
where where
@@ -66,6 +85,7 @@ impl ExecutionLayer {
U: Future<Output = Result<V, Error>>, U: Future<Output = Result<V, Error>>,
{ {
let runtime = self let runtime = self
.inner
.executor .executor
.runtime() .runtime()
.upgrade() .upgrade()
@@ -80,7 +100,7 @@ impl ExecutionLayer {
random: Hash256, random: Hash256,
fee_recipient: Address, fee_recipient: Address,
) -> Result<PayloadId, Error> { ) -> Result<PayloadId, Error> {
self.engines self.engines()
.first_success(|engine| { .first_success(|engine| {
engine engine
.api .api
@@ -93,9 +113,9 @@ impl ExecutionLayer {
pub async fn execute_payload<T: EthSpec>( pub async fn execute_payload<T: EthSpec>(
&self, &self,
execution_payload: &ExecutionPayload<T>, execution_payload: &ExecutionPayload<T>,
) -> Result<ExecutePayloadResponse, Error> { ) -> Result<(ExecutePayloadResponse, ExecutePayloadHandle), Error> {
let broadcast_results = self let broadcast_results = self
.engines .engines()
.broadcast(|engine| engine.api.execute_payload(execution_payload.clone())) .broadcast(|engine| engine.api.execute_payload(execution_payload.clone()))
.await; .await;
@@ -114,20 +134,27 @@ impl ExecutionLayer {
if valid > 0 && invalid > 0 { if valid > 0 && invalid > 0 {
crit!( crit!(
self.log, self.log(),
"Consensus failure between execution nodes"; "Consensus failure between execution nodes";
); );
} }
if valid > 0 { let execute_payload_response = if valid > 0 {
Ok(ExecutePayloadResponse::Valid) ExecutePayloadResponse::Valid
} else if invalid > 0 { } else if invalid > 0 {
Ok(ExecutePayloadResponse::Invalid) ExecutePayloadResponse::Invalid
} else if syncing > 0 { } else if syncing > 0 {
Ok(ExecutePayloadResponse::Syncing) ExecutePayloadResponse::Syncing
} else { } else {
Err(Error::EngineErrors(errors)) return Err(Error::EngineErrors(errors));
} };
let execute_payload_handle = ExecutePayloadHandle {
block_hash: execution_payload.block_hash,
execution_layer: self.clone(),
};
Ok((execute_payload_response, execute_payload_handle))
} }
pub async fn consensus_validated( pub async fn consensus_validated(
@@ -136,7 +163,7 @@ impl ExecutionLayer {
status: ConsensusStatus, status: ConsensusStatus,
) -> Result<(), Error> { ) -> Result<(), Error> {
let broadcast_results = self let broadcast_results = self
.engines .engines()
.broadcast(|engine| engine.api.consensus_validated(block_hash, status)) .broadcast(|engine| engine.api.consensus_validated(block_hash, status))
.await; .await;