diff --git a/Cargo.lock b/Cargo.lock index 64ddf65391..faaeaa4c86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2061,6 +2061,23 @@ version = "2.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" +[[package]] +name = "execution_layer" +version = "0.1.0" +dependencies = [ + "async-trait", + "eth1", + "eth2_serde_utils 0.1.0", + "futures", + "reqwest", + "sensitive_url", + "serde", + "serde_json", + "slog", + "tokio", + "types", +] + [[package]] name = "exit-future" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index ab0cb15dc2..f8c9a41395 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "beacon_node/client", "beacon_node/eth1", "beacon_node/eth2_libp2p", + "beacon_node/execution_layer", "beacon_node/http_api", "beacon_node/http_metrics", "beacon_node/network", diff --git a/beacon_node/execution_layer/Cargo.toml b/beacon_node/execution_layer/Cargo.toml new file mode 100644 index 0000000000..7883d02caa --- /dev/null +++ b/beacon_node/execution_layer/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "execution_layer" +version = "0.1.0" +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +types = { path = "../../consensus/types"} +tokio = { version = "1.10.0", features = ["full"] } +async-trait = "0.1.51" +slog = "2.5.2" +futures = "0.3.7" +sensitive_url = { path = "../../common/sensitive_url" } +reqwest = { version = "0.11.0", features = ["json","stream"] } +eth2_serde_utils = { path = "../../consensus/serde_utils" } +serde_json = "1.0.58" +serde = { version = "1.0.116", features = ["derive"] } +eth1 = { path = "../eth1" } diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs new file mode 100644 index 0000000000..84b5b6c8b3 --- /dev/null +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -0,0 +1,35 @@ +use async_trait::async_trait; +use eth1::http::RpcError; + +pub use types::{Address, Hash256}; + +pub mod http; + +pub type PayloadId = u64; + +#[derive(Debug)] +pub enum Error { + Reqwest(reqwest::Error), + BadResponse(String), + RequestFailed(String), + JsonRpc(RpcError), +} + +impl From for Error { + fn from(e: reqwest::Error) -> Self { + Error::Reqwest(e) + } +} + +#[async_trait] +pub trait EngineApi { + async fn upcheck(&self) -> Result<(), Error>; + + async fn prepare_payload( + &self, + parent_hash: Hash256, + timestamp: u64, + random: Hash256, + fee_recipient: Address, + ) -> Result; +} diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs new file mode 100644 index 0000000000..6ba1bd1dbb --- /dev/null +++ b/beacon_node/execution_layer/src/engine_api/http.rs @@ -0,0 +1,73 @@ +use super::*; +use async_trait::async_trait; +use eth1::http::{hex_to_u64_be, response_result_or_error, send_rpc_request}; +pub use reqwest::Client; +use sensitive_url::SensitiveUrl; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use std::time::Duration; + +const ENGINE_PREPARE_PAYLOAD: &str = "engine_preparePayload"; +const ENGINE_PREPARE_PAYLOAD_TIMEOUT: Duration = Duration::from_millis(500); + +pub struct HttpJsonRpc { + pub client: Client, + pub url: SensitiveUrl, +} + +impl HttpJsonRpc { + pub fn new(url: SensitiveUrl) -> Result { + Ok(Self { + client: Client::builder().build()?, + url, + }) + } +} + +#[async_trait] +impl EngineApi for HttpJsonRpc { + async fn upcheck(&self) -> Result<(), Error> { + todo!() + } + + async fn prepare_payload( + &self, + parent_hash: Hash256, + timestamp: u64, + random: Hash256, + fee_recipient: Address, + ) -> Result { + let params = json!([PreparePayloadRequest { + parent_hash, + timestamp, + random, + fee_recipient + }]); + + let response_body = send_rpc_request( + &self.url, + ENGINE_PREPARE_PAYLOAD, + params, + ENGINE_PREPARE_PAYLOAD_TIMEOUT, + ) + .await + .map_err(Error::RequestFailed)?; + + let result = response_result_or_error(&response_body).map_err(Error::JsonRpc)?; + let string = result + .as_str() + .ok_or(Error::BadResponse("data was not string".to_string()))?; + + hex_to_u64_be(string).map_err(Error::BadResponse) + } +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename = "camelCase")] +struct PreparePayloadRequest { + parent_hash: Hash256, + #[serde(with = "eth2_serde_utils::u64_hex_be")] + timestamp: u64, + random: Hash256, + fee_recipient: Address, +} diff --git a/beacon_node/execution_layer/src/engines.rs b/beacon_node/execution_layer/src/engines.rs new file mode 100644 index 0000000000..f3bd72197b --- /dev/null +++ b/beacon_node/execution_layer/src/engines.rs @@ -0,0 +1,159 @@ +use crate::engine_api::{EngineApi, Error as EngineApiError}; +use futures::future::join_all; +use slog::{crit, error, info, warn, Logger}; +use std::future::Future; +use tokio::sync::RwLock; + +#[derive(Copy, Clone, PartialEq)] +enum EngineState { + Online, + Offline, +} + +impl EngineState { + fn set_online(&mut self) { + *self = EngineState::Online + } + + fn set_offline(&mut self) { + *self = EngineState::Offline + } + + fn is_online(&self) -> bool { + *self == EngineState::Online + } + + fn is_offline(&self) -> bool { + *self == EngineState::Offline + } +} + +pub struct Engine { + pub id: String, + pub api: T, + state: RwLock, +} + +impl Engine { + pub fn new(id: String, api: T) -> Self { + Self { + id, + api, + state: RwLock::new(EngineState::Offline), + } + } +} + +pub struct Engines { + pub engines: Vec>, + pub log: Logger, +} + +#[derive(Debug)] +pub enum EngineError { + Offline { id: String }, + Api { id: String, error: EngineApiError }, +} + +impl Engines { + async fn upcheck_offline(&self) { + let upcheck_futures = self.engines.iter().map(|engine| async move { + let mut state = engine.state.write().await; + if state.is_offline() { + match engine.api.upcheck().await { + Ok(()) => { + info!( + self.log, + "Execution engine online"; + "id" => &engine.id + ); + state.set_online() + } + Err(e) => { + warn!( + self.log, + "Execution engine offline"; + "error" => ?e, + "id" => &engine.id + ) + } + } + } + *state + }); + + let num_online = join_all(upcheck_futures) + .await + .into_iter() + .filter(|state: &EngineState| state.is_online()) + .count(); + + if num_online == 0 { + crit!( + self.log, + "No execution engines online"; + ) + } + } + + pub async fn first_success<'a, F, G, H>(&'a self, func: F) -> Result> + where + F: Fn(&'a Engine) -> G + Copy, + G: Future>, + { + match self.first_success_without_retry(func).await { + Ok(result) => Ok(result), + Err(mut first_errors) => { + // Try to recover some nodes. + self.upcheck_offline().await; + // Retry the call on all nodes. + match self.first_success_without_retry(func).await { + Ok(result) => Ok(result), + Err(second_errors) => { + first_errors.extend(second_errors); + Err(first_errors) + } + } + } + } + } + + async fn first_success_without_retry<'a, F, G, H>( + &'a self, + func: F, + ) -> Result> + where + F: Fn(&'a Engine) -> G, + G: Future>, + { + let mut errors = vec![]; + + for engine in &self.engines { + let engine_online = engine.state.read().await.is_online(); + if engine_online { + match func(engine).await { + Ok(result) => return Ok(result), + Err(error) => { + error!( + self.log, + "Execution engine call failed"; + "error" => ?error, + "id" => &engine.id + ); + engine.state.write().await.set_offline(); + errors.push(EngineError::Api { + id: engine.id.clone(), + error, + }) + } + } + } else { + errors.push(EngineError::Offline { + id: engine.id.clone(), + }) + } + } + + Err(errors) + } +} diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs new file mode 100644 index 0000000000..589568a51d --- /dev/null +++ b/beacon_node/execution_layer/src/lib.rs @@ -0,0 +1,67 @@ +use engine_api::{http::HttpJsonRpc, Error as ApiError, *}; +use engines::{Engine, EngineError, Engines}; +use sensitive_url::SensitiveUrl; +use slog::Logger; + +mod engine_api; +mod engines; + +#[derive(Debug)] +pub enum Error { + ApiError(ApiError), + EngineErrors(Vec), +} + +impl From for Error { + fn from(e: ApiError) -> Self { + Error::ApiError(e) + } +} + +pub struct ExecutionLayer { + engines: Engines, +} + +impl ExecutionLayer { + pub fn from_urls(urls: Vec, log: Logger) -> Result { + let engines = urls + .into_iter() + .map(|url| { + let id = url.to_string(); + let api = HttpJsonRpc::new(url)?; + Ok(Engine::new(id, api)) + }) + .collect::>()?; + + Ok(Self { + engines: Engines { engines, log }, + }) + } +} + +impl ExecutionLayer { + pub async fn prepare_payload( + &self, + parent_hash: Hash256, + timestamp: u64, + random: Hash256, + fee_recipient: Address, + ) -> Result { + self.engines + .first_success(|engine| { + engine + .api + .prepare_payload(parent_hash, timestamp, random, fee_recipient) + }) + .await + .map_err(Error::EngineErrors) + } +} + +#[cfg(test)] +mod tests { + #[test] + fn it_works() { + assert_eq!(2 + 2, 4); + } +}