Add block processing methods to ExecutionLayer

This commit is contained in:
Paul Hauner
2021-09-25 17:28:45 +10:00
parent f698b91d77
commit 203a93b3e1
3 changed files with 103 additions and 4 deletions

View File

@@ -68,7 +68,7 @@ pub trait EngineApi {
) -> Result<(), Error>;
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum ExecutePayloadResponse {
Valid,
@@ -76,7 +76,7 @@ pub enum ExecutePayloadResponse {
Syncing,
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum ConsensusStatus {
Valid,

View File

@@ -156,4 +156,35 @@ impl<T: EngineApi> Engines<T> {
Err(errors)
}
pub async fn broadcast<'a, F, G, H>(&'a self, func: F) -> Vec<Result<H, EngineError>>
where
F: Fn(&'a Engine<T>) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let func = &func;
let futures = self.engines.iter().map(|engine| async move {
let engine_online = engine.state.read().await.is_online();
if engine_online {
func(engine).await.map_err(|error| {
error!(
self.log,
"Execution engine call failed";
"error" => ?error,
"id" => &engine.id
);
EngineError::Api {
id: engine.id.clone(),
error,
}
})
} else {
Err(EngineError::Offline {
id: engine.id.clone(),
})
}
});
join_all(futures).await
}
}

View File

@@ -1,7 +1,7 @@
use engine_api::{Error as ApiError, *};
use engines::{Engine, EngineError, Engines};
use sensitive_url::SensitiveUrl;
use slog::Logger;
use slog::{crit, Logger};
use task_executor::TaskExecutor;
pub use engine_api::http::HttpJsonRpc;
@@ -14,6 +14,7 @@ pub mod test_utils;
pub enum Error {
ApiError(ApiError),
EngineErrors(Vec<EngineError>),
NotSynced,
}
impl From<ApiError> for Error {
@@ -26,6 +27,7 @@ pub struct ExecutionLayer {
engines: Engines<HttpJsonRpc>,
/// Allows callers to execute async tasks in a non-async environment, if they desire.
pub executor: TaskExecutor,
log: Logger,
}
impl ExecutionLayer {
@@ -44,8 +46,12 @@ impl ExecutionLayer {
.collect::<Result<_, ApiError>>()?;
Ok(Self {
engines: Engines { engines, log },
engines: Engines {
engines,
log: log.clone(),
},
executor,
log,
})
}
}
@@ -67,4 +73,66 @@ impl ExecutionLayer {
.await
.map_err(Error::EngineErrors)
}
pub async fn execute_payload<T: EthSpec>(
&self,
execution_payload: &ExecutionPayload<T>,
) -> Result<ExecutePayloadResponse, Error> {
let broadcast_results = self
.engines
.broadcast(|engine| engine.api.execute_payload(execution_payload.clone()))
.await;
let mut errors = vec![];
let mut valid = 0;
let mut invalid = 0;
let mut syncing = 0;
for result in broadcast_results {
match result {
Ok(ExecutePayloadResponse::Valid) => valid += 1,
Ok(ExecutePayloadResponse::Invalid) => invalid += 1,
Ok(ExecutePayloadResponse::Syncing) => syncing += 1,
Err(e) => errors.push(e),
}
}
if valid > 0 && invalid > 0 {
crit!(
self.log,
"Consensus failure between execution nodes";
);
}
if valid > 0 {
Ok(ExecutePayloadResponse::Valid)
} else if invalid > 0 {
Ok(ExecutePayloadResponse::Invalid)
} else if syncing > 0 {
Ok(ExecutePayloadResponse::Syncing)
} else {
Err(Error::EngineErrors(errors))
}
}
pub async fn consensus_validated(
&self,
block_hash: Hash256,
status: ConsensusStatus,
) -> Result<(), Error> {
let broadcast_results = self
.engines
.broadcast(|engine| engine.api.consensus_validated(block_hash, status))
.await;
if broadcast_results.iter().any(Result::is_ok) {
Ok(())
} else {
Err(Error::EngineErrors(
broadcast_results
.into_iter()
.filter_map(Result::err)
.collect(),
))
}
}
}