Add bones of execution_layer

This commit is contained in:
Paul Hauner
2021-09-21 17:47:18 +10:00
parent 1ce8339d96
commit 7433385fb3
7 changed files with 371 additions and 0 deletions

View File

@@ -0,0 +1,35 @@
use async_trait::async_trait;
use eth1::http::RpcError;
pub use types::{Address, Hash256};
pub mod http;
pub type PayloadId = u64;
#[derive(Debug)]
pub enum Error {
Reqwest(reqwest::Error),
BadResponse(String),
RequestFailed(String),
JsonRpc(RpcError),
}
impl From<reqwest::Error> for Error {
fn from(e: reqwest::Error) -> Self {
Error::Reqwest(e)
}
}
#[async_trait]
pub trait EngineApi {
async fn upcheck(&self) -> Result<(), Error>;
async fn prepare_payload(
&self,
parent_hash: Hash256,
timestamp: u64,
random: Hash256,
fee_recipient: Address,
) -> Result<PayloadId, Error>;
}

View File

@@ -0,0 +1,73 @@
use super::*;
use async_trait::async_trait;
use eth1::http::{hex_to_u64_be, response_result_or_error, send_rpc_request};
pub use reqwest::Client;
use sensitive_url::SensitiveUrl;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::time::Duration;
const ENGINE_PREPARE_PAYLOAD: &str = "engine_preparePayload";
const ENGINE_PREPARE_PAYLOAD_TIMEOUT: Duration = Duration::from_millis(500);
pub struct HttpJsonRpc {
pub client: Client,
pub url: SensitiveUrl,
}
impl HttpJsonRpc {
pub fn new(url: SensitiveUrl) -> Result<Self, Error> {
Ok(Self {
client: Client::builder().build()?,
url,
})
}
}
#[async_trait]
impl EngineApi for HttpJsonRpc {
async fn upcheck(&self) -> Result<(), Error> {
todo!()
}
async fn prepare_payload(
&self,
parent_hash: Hash256,
timestamp: u64,
random: Hash256,
fee_recipient: Address,
) -> Result<PayloadId, Error> {
let params = json!([PreparePayloadRequest {
parent_hash,
timestamp,
random,
fee_recipient
}]);
let response_body = send_rpc_request(
&self.url,
ENGINE_PREPARE_PAYLOAD,
params,
ENGINE_PREPARE_PAYLOAD_TIMEOUT,
)
.await
.map_err(Error::RequestFailed)?;
let result = response_result_or_error(&response_body).map_err(Error::JsonRpc)?;
let string = result
.as_str()
.ok_or(Error::BadResponse("data was not string".to_string()))?;
hex_to_u64_be(string).map_err(Error::BadResponse)
}
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename = "camelCase")]
struct PreparePayloadRequest {
parent_hash: Hash256,
#[serde(with = "eth2_serde_utils::u64_hex_be")]
timestamp: u64,
random: Hash256,
fee_recipient: Address,
}

View File

@@ -0,0 +1,159 @@
use crate::engine_api::{EngineApi, Error as EngineApiError};
use futures::future::join_all;
use slog::{crit, error, info, warn, Logger};
use std::future::Future;
use tokio::sync::RwLock;
#[derive(Copy, Clone, PartialEq)]
enum EngineState {
Online,
Offline,
}
impl EngineState {
fn set_online(&mut self) {
*self = EngineState::Online
}
fn set_offline(&mut self) {
*self = EngineState::Offline
}
fn is_online(&self) -> bool {
*self == EngineState::Online
}
fn is_offline(&self) -> bool {
*self == EngineState::Offline
}
}
pub struct Engine<T> {
pub id: String,
pub api: T,
state: RwLock<EngineState>,
}
impl<T> Engine<T> {
pub fn new(id: String, api: T) -> Self {
Self {
id,
api,
state: RwLock::new(EngineState::Offline),
}
}
}
pub struct Engines<T> {
pub engines: Vec<Engine<T>>,
pub log: Logger,
}
#[derive(Debug)]
pub enum EngineError {
Offline { id: String },
Api { id: String, error: EngineApiError },
}
impl<T: EngineApi> Engines<T> {
async fn upcheck_offline(&self) {
let upcheck_futures = self.engines.iter().map(|engine| async move {
let mut state = engine.state.write().await;
if state.is_offline() {
match engine.api.upcheck().await {
Ok(()) => {
info!(
self.log,
"Execution engine online";
"id" => &engine.id
);
state.set_online()
}
Err(e) => {
warn!(
self.log,
"Execution engine offline";
"error" => ?e,
"id" => &engine.id
)
}
}
}
*state
});
let num_online = join_all(upcheck_futures)
.await
.into_iter()
.filter(|state: &EngineState| state.is_online())
.count();
if num_online == 0 {
crit!(
self.log,
"No execution engines online";
)
}
}
pub async fn first_success<'a, F, G, H>(&'a self, func: F) -> Result<H, Vec<EngineError>>
where
F: Fn(&'a Engine<T>) -> G + Copy,
G: Future<Output = Result<H, EngineApiError>>,
{
match self.first_success_without_retry(func).await {
Ok(result) => Ok(result),
Err(mut first_errors) => {
// Try to recover some nodes.
self.upcheck_offline().await;
// Retry the call on all nodes.
match self.first_success_without_retry(func).await {
Ok(result) => Ok(result),
Err(second_errors) => {
first_errors.extend(second_errors);
Err(first_errors)
}
}
}
}
}
async fn first_success_without_retry<'a, F, G, H>(
&'a self,
func: F,
) -> Result<H, Vec<EngineError>>
where
F: Fn(&'a Engine<T>) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let mut errors = vec![];
for engine in &self.engines {
let engine_online = engine.state.read().await.is_online();
if engine_online {
match func(engine).await {
Ok(result) => return Ok(result),
Err(error) => {
error!(
self.log,
"Execution engine call failed";
"error" => ?error,
"id" => &engine.id
);
engine.state.write().await.set_offline();
errors.push(EngineError::Api {
id: engine.id.clone(),
error,
})
}
}
} else {
errors.push(EngineError::Offline {
id: engine.id.clone(),
})
}
}
Err(errors)
}
}

View File

@@ -0,0 +1,67 @@
use engine_api::{http::HttpJsonRpc, Error as ApiError, *};
use engines::{Engine, EngineError, Engines};
use sensitive_url::SensitiveUrl;
use slog::Logger;
mod engine_api;
mod engines;
#[derive(Debug)]
pub enum Error {
ApiError(ApiError),
EngineErrors(Vec<EngineError>),
}
impl From<ApiError> for Error {
fn from(e: ApiError) -> Self {
Error::ApiError(e)
}
}
pub struct ExecutionLayer<T> {
engines: Engines<T>,
}
impl ExecutionLayer<HttpJsonRpc> {
pub fn from_urls(urls: Vec<SensitiveUrl>, log: Logger) -> Result<Self, Error> {
let engines = urls
.into_iter()
.map(|url| {
let id = url.to_string();
let api = HttpJsonRpc::new(url)?;
Ok(Engine::new(id, api))
})
.collect::<Result<_, ApiError>>()?;
Ok(Self {
engines: Engines { engines, log },
})
}
}
impl<T: EngineApi> ExecutionLayer<T> {
pub async fn prepare_payload(
&self,
parent_hash: Hash256,
timestamp: u64,
random: Hash256,
fee_recipient: Address,
) -> Result<PayloadId, Error> {
self.engines
.first_success(|engine| {
engine
.api
.prepare_payload(parent_hash, timestamp, random, fee_recipient)
})
.await
.map_err(Error::EngineErrors)
}
}
#[cfg(test)]
mod tests {
#[test]
fn it_works() {
assert_eq!(2 + 2, 4);
}
}