From 20106bb3fd201c02ddc44182e91be0596851866c Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Tue, 28 Sep 2021 16:53:33 +1000 Subject: [PATCH] Add first ExecutionLayer tests --- Cargo.lock | 1 + beacon_node/execution_layer/Cargo.toml | 1 + beacon_node/execution_layer/src/engine_api.rs | 3 +- .../execution_layer/src/engine_api/http.rs | 46 +++--- beacon_node/execution_layer/src/engines.rs | 27 ++++ beacon_node/execution_layer/src/lib.rs | 144 ++++++++++++++++++ .../test_utils/execution_block_generator.rs | 18 +-- .../src/test_utils/handle_rpc.rs | 57 ++++++- .../execution_layer/src/test_utils/mod.rs | 57 +++++-- 9 files changed, 305 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e189a564a2..f9c21e25fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2073,6 +2073,7 @@ dependencies = [ "eth1", "eth2_serde_utils 0.1.0", "eth2_ssz_types", + "exit-future", "futures", "hex", "lru", diff --git a/beacon_node/execution_layer/Cargo.toml b/beacon_node/execution_layer/Cargo.toml index a8f472e64c..dcfe4ed4d3 100644 --- a/beacon_node/execution_layer/Cargo.toml +++ b/beacon_node/execution_layer/Cargo.toml @@ -24,3 +24,4 @@ task_executor = { path = "../../common/task_executor" } hex = "0.4.2" eth2_ssz_types = { path = "../../consensus/ssz_types"} lru = "0.6.0" +exit-future = "0.2.0" diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs index 01a55e83a6..aa2b2227f2 100644 --- a/beacon_node/execution_layer/src/engine_api.rs +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -19,7 +19,6 @@ pub enum Error { Json(serde_json::Error), ServerMessage(String), Eip155Failure, - NoErrorOrResult, IsSyncing, ExecutionBlockNotFound(Hash256), ExecutionHeadBlockNotFound, @@ -103,7 +102,7 @@ pub enum BlockByNumberQuery<'a> { Tag(&'a str), } -#[derive(Clone, Copy, Debug, PartialEq, Deserialize)] +#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ExecutionBlock { pub block_hash: Hash256, diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs index 41b2cb848f..cae2e7c144 100644 --- a/beacon_node/execution_layer/src/engine_api/http.rs +++ b/beacon_node/execution_layer/src/engine_api/http.rs @@ -11,33 +11,33 @@ use types::{EthSpec, FixedVector, Transaction, Unsigned, VariableList}; pub use reqwest::Client; const STATIC_ID: u32 = 1; -const JSONRPC_VERSION: &str = "2.0"; +pub const JSONRPC_VERSION: &str = "2.0"; -const RETURN_FULL_TRANSACTION_OBJECTS: bool = false; +pub const RETURN_FULL_TRANSACTION_OBJECTS: bool = false; -const ETH_GET_BLOCK_BY_NUMBER: &str = "eth_getBlockByNumber"; -const ETH_GET_BLOCK_BY_NUMBER_TIMEOUT: Duration = Duration::from_secs(1); +pub const ETH_GET_BLOCK_BY_NUMBER: &str = "eth_getBlockByNumber"; +pub const ETH_GET_BLOCK_BY_NUMBER_TIMEOUT: Duration = Duration::from_secs(1); -const ETH_GET_BLOCK_BY_HASH: &str = "eth_getBlockByHash"; -const ETH_GET_BLOCK_BY_HASH_TIMEOUT: Duration = Duration::from_secs(1); +pub const ETH_GET_BLOCK_BY_HASH: &str = "eth_getBlockByHash"; +pub const ETH_GET_BLOCK_BY_HASH_TIMEOUT: Duration = Duration::from_secs(1); -const ETH_SYNCING: &str = "eth_syncing"; -const ETH_SYNCING_TIMEOUT: Duration = Duration::from_millis(250); +pub const ETH_SYNCING: &str = "eth_syncing"; +pub const ETH_SYNCING_TIMEOUT: Duration = Duration::from_millis(250); -const ENGINE_PREPARE_PAYLOAD: &str = "engine_preparePayload"; -const ENGINE_PREPARE_PAYLOAD_TIMEOUT: Duration = Duration::from_millis(500); +pub const ENGINE_PREPARE_PAYLOAD: &str = "engine_preparePayload"; +pub const ENGINE_PREPARE_PAYLOAD_TIMEOUT: Duration = Duration::from_millis(500); -const ENGINE_EXECUTE_PAYLOAD: &str = "engine_executePayload"; -const ENGINE_EXECUTE_PAYLOAD_TIMEOUT: Duration = Duration::from_secs(2); +pub const ENGINE_EXECUTE_PAYLOAD: &str = "engine_executePayload"; +pub const ENGINE_EXECUTE_PAYLOAD_TIMEOUT: Duration = Duration::from_secs(2); -const ENGINE_GET_PAYLOAD: &str = "engine_getPayload"; -const ENGINE_GET_PAYLOAD_TIMEOUT: Duration = Duration::from_secs(2); +pub const ENGINE_GET_PAYLOAD: &str = "engine_getPayload"; +pub const ENGINE_GET_PAYLOAD_TIMEOUT: Duration = Duration::from_secs(2); -const ENGINE_CONSENSUS_VALIDATED: &str = "engine_consensusValidated"; -const ENGINE_CONSENSUS_VALIDATED_TIMEOUT: Duration = Duration::from_millis(500); +pub const ENGINE_CONSENSUS_VALIDATED: &str = "engine_consensusValidated"; +pub const ENGINE_CONSENSUS_VALIDATED_TIMEOUT: Duration = Duration::from_millis(500); -const ENGINE_FORKCHOICE_UPDATED: &str = "engine_forkchoiceUpdated"; -const ENGINE_FORKCHOICE_UPDATED_TIMEOUT: Duration = Duration::from_millis(500); +pub const ENGINE_FORKCHOICE_UPDATED: &str = "engine_forkchoiceUpdated"; +pub const ENGINE_FORKCHOICE_UPDATED_TIMEOUT: Duration = Duration::from_millis(500); pub struct HttpJsonRpc { pub client: Client, @@ -78,7 +78,7 @@ impl HttpJsonRpc { .await?; match (body.result, body.error) { - (Some(result), None) => serde_json::from_value(result).map_err(Into::into), + (result, None) => serde_json::from_value(result).map_err(Into::into), (_, Some(error)) => { if error.contains(EIP155_ERROR_STR) { Err(Error::Eip155Failure) @@ -86,7 +86,6 @@ impl HttpJsonRpc { Err(Error::ServerMessage(error)) } } - (None, None) => Err(Error::NoErrorOrResult), } } } @@ -104,7 +103,6 @@ impl EngineApi for HttpJsonRpc { * Check the network and chain ids. We omit this to save time for the merge f2f and since it * also seems like it might get annoying during development. */ - match result.as_bool() { Some(false) => Ok(()), _ => Err(Error::IsSyncing), @@ -232,7 +230,7 @@ struct JsonRequestBody<'a> { struct JsonResponseBody { jsonrpc: String, error: Option, - result: Option, + result: serde_json::Value, id: u32, } @@ -375,13 +373,13 @@ mod test { use types::MainnetEthSpec; struct Tester { - server: MockServer, + server: MockServer, echo_client: Arc, } impl Tester { pub fn new() -> Self { - let server = MockServer::unit_testing::(); + let server = MockServer::unit_testing(); let echo_url = SensitiveUrl::parse(&format!("{}/echo", server.url())).unwrap(); let echo_client = Arc::new(HttpJsonRpc::new(echo_url).unwrap()); diff --git a/beacon_node/execution_layer/src/engines.rs b/beacon_node/execution_layer/src/engines.rs index 3de4ee6ae5..8b8ab3db97 100644 --- a/beacon_node/execution_layer/src/engines.rs +++ b/beacon_node/execution_layer/src/engines.rs @@ -158,6 +158,33 @@ impl Engines { } pub async fn broadcast<'a, F, G, H>(&'a self, func: F) -> Vec> + where + F: Fn(&'a Engine) -> G + Copy, + G: Future>, + { + let first_results = self.broadcast_without_retry(func).await; + + let mut any_offline = false; + for result in &first_results { + match result { + Ok(_) => return first_results, + Err(EngineError::Offline { .. }) => any_offline = true, + _ => (), + } + } + + if any_offline { + self.upcheck_offline().await; + self.broadcast_without_retry(func).await + } else { + first_results + } + } + + pub async fn broadcast_without_retry<'a, F, G, H>( + &'a self, + func: F, + ) -> Vec> where F: Fn(&'a Engine) -> G, G: Future>, diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 1cacda7643..32e3643b5e 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -397,3 +397,147 @@ impl ExecutionLayer { is_total_difficulty_reached && is_parent_total_difficulty_valid } } + +#[cfg(test)] +mod test { + use super::*; + use crate::test_utils::{block_number_to_hash, MockServer, DEFAULT_TERMINAL_DIFFICULTY}; + use environment::null_logger; + use types::MainnetEthSpec; + + struct SingleEngineTester { + server: MockServer, + el: ExecutionLayer, + runtime: Option>, + _runtime_shutdown: exit_future::Signal, + } + + impl SingleEngineTester { + pub fn new() -> Self { + let server = MockServer::unit_testing(); + let url = SensitiveUrl::parse(&server.url()).unwrap(); + let log = null_logger().unwrap(); + + let runtime = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(), + ); + let (runtime_shutdown, exit) = exit_future::signal(); + let (shutdown_tx, _) = futures::channel::mpsc::channel(1); + let executor = + TaskExecutor::new(Arc::downgrade(&runtime), exit, log.clone(), shutdown_tx); + + let el = ExecutionLayer::from_urls( + vec![url], + DEFAULT_TERMINAL_DIFFICULTY.into(), + Hash256::zero(), + None, + executor, + log, + ) + .unwrap(); + + Self { + server, + el, + runtime: Some(runtime), + _runtime_shutdown: runtime_shutdown, + } + } + + pub async fn move_to_terminal_block(self) -> Self { + { + let mut block_gen = self.server.execution_block_generator().await; + block_gen.seconds_since_genesis = + block_gen.terminal_block_number * block_gen.block_interval_secs; + } + self + } + + pub async fn with_terminal_block_number<'a, T, U>(self, func: T) -> Self + where + T: Fn(ExecutionLayer, u64) -> U, + U: Future, + { + let terminal_block_number = self + .server + .execution_block_generator() + .await + .terminal_block_number; + func(self.el.clone(), terminal_block_number).await; + self + } + + pub fn shutdown(&mut self) { + if let Some(runtime) = self.runtime.take() { + Arc::try_unwrap(runtime).unwrap().shutdown_background() + } + } + } + + impl Drop for SingleEngineTester { + fn drop(&mut self) { + self.shutdown() + } + } + + #[tokio::test] + async fn finds_valid_terminal_block_hash() { + SingleEngineTester::new() + .move_to_terminal_block() + .await + .with_terminal_block_number(|el, terminal_block_number| async move { + assert_eq!( + el.is_valid_terminal_pow_block_hash(block_number_to_hash( + terminal_block_number + )) + .await + .unwrap(), + Some(true) + ) + }) + .await; + } + + #[tokio::test] + async fn rejects_invalid_terminal_block_hash() { + SingleEngineTester::new() + .move_to_terminal_block() + .await + .with_terminal_block_number(|el, terminal_block_number| async move { + let invalid_terminal_block = terminal_block_number.checked_sub(1).unwrap(); + + assert_eq!( + el.is_valid_terminal_pow_block_hash(block_number_to_hash( + invalid_terminal_block + )) + .await + .unwrap(), + Some(false) + ) + }) + .await; + } + + #[tokio::test] + async fn rejects_unknown_terminal_block_hash() { + SingleEngineTester::new() + .move_to_terminal_block() + .await + .with_terminal_block_number(|el, terminal_block_number| async move { + let missing_terminal_block = terminal_block_number.checked_add(1).unwrap(); + + assert_eq!( + el.is_valid_terminal_pow_block_hash(block_number_to_hash( + missing_terminal_block + )) + .await + .unwrap(), + None + ) + }) + .await; + } +} diff --git a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs index db352ac3a0..90b7116504 100644 --- a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs +++ b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs @@ -62,9 +62,9 @@ impl ExecutionBlockGenerator { pub fn block_by_number(&self, number: u64) -> Option { let parent_hash = number .checked_sub(1) - .map(block_number_to_block_hash) + .map(block_number_to_hash) .unwrap_or_else(Hash256::zero); - let block_hash = block_number_to_block_hash(number); + let block_hash = block_number_to_hash(number); if number <= self.terminal_block_number { if number <= self.latest_block_number() { @@ -94,16 +94,16 @@ impl ExecutionBlockGenerator { } pub fn block_by_hash(&self, hash: Hash256) -> Option { - let block_number = block_hash_to_block_number(hash); + let block_number = block_hash_to_number(hash); self.block_by_number(block_number) } } -fn block_number_to_block_hash(n: u64) -> Hash256 { +pub fn block_number_to_hash(n: u64) -> Hash256 { Hash256::from_low_u64_be(n + 1) } -fn block_hash_to_block_number(hash: Hash256) -> u64 { +pub fn block_hash_to_number(hash: Hash256) -> u64 { hash.to_low_u64_be() .checked_sub(1) .expect("do not query for zero hash") @@ -129,12 +129,12 @@ mod test { */ let block = generator.latest_block().unwrap(); - assert_eq!(block.block_hash, block_number_to_block_hash(i)); - assert_eq!(block_hash_to_block_number(block.block_hash), i); + assert_eq!(block.block_hash, block_number_to_hash(i)); + assert_eq!(block_hash_to_number(block.block_hash), i); let expected_parent = i .checked_sub(1) - .map(block_number_to_block_hash) + .map(block_number_to_hash) .unwrap_or_else(Hash256::zero); assert_eq!(block.parent_hash, expected_parent); @@ -162,7 +162,7 @@ mod test { dbg!(next_i); assert!(generator.block_by_number(next_i).is_none()); assert!(generator - .block_by_hash(block_number_to_block_hash(next_i)) + .block_by_hash(block_number_to_hash(next_i)) .is_none()); generator.increment_seconds_since_genesis(1); diff --git a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs index 19c968f40b..ecce225a42 100644 --- a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs +++ b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs @@ -1,10 +1,61 @@ use super::Context; +use crate::engine_api::http::*; +use serde_json::Value as JsonValue; use std::sync::Arc; use types::EthSpec; pub async fn handle_rpc( - body: serde_json::Value, + body: JsonValue, ctx: Arc>, -) -> Result { - todo!("handle_rpc") +) -> Result { + let method = body + .get("method") + .and_then(JsonValue::as_str) + .ok_or_else(|| "missing/invalid method field".to_string())?; + + let params = body + .get("params") + .ok_or_else(|| "missing/invalid params field".to_string())?; + + match method { + ETH_SYNCING => Ok(JsonValue::Bool(false)), + ETH_GET_BLOCK_BY_NUMBER => { + let tag = params + .get(0) + .and_then(JsonValue::as_str) + .ok_or_else(|| "missing/invalid params[0] value".to_string())?; + + match tag { + "latest" => Ok(serde_json::to_value( + ctx.execution_block_generator.read().await.latest_block(), + ) + .unwrap()), + other => Err(format!("The tag {} is not supported", other)), + } + } + ETH_GET_BLOCK_BY_HASH => { + let hash = params + .get(0) + .and_then(JsonValue::as_str) + .ok_or_else(|| "missing/invalid params[0] value".to_string()) + .and_then(|s| { + s.parse() + .map_err(|e| format!("unable to parse hash: {:?}", e)) + })?; + + Ok(serde_json::to_value( + ctx.execution_block_generator + .read() + .await + .block_by_hash(hash), + ) + .unwrap()) + } + ENGINE_CONSENSUS_VALIDATED => Ok(JsonValue::Null), + ENGINE_FORKCHOICE_UPDATED => Ok(JsonValue::Null), + other => Err(format!( + "The method {} does not exist/is not available", + other + )), + } } diff --git a/beacon_node/execution_layer/src/test_utils/mod.rs b/beacon_node/execution_layer/src/test_utils/mod.rs index c7383e2bc8..24583c0059 100644 --- a/beacon_node/execution_layer/src/test_utils/mod.rs +++ b/beacon_node/execution_layer/src/test_utils/mod.rs @@ -1,31 +1,36 @@ +use crate::engine_api::http::JSONRPC_VERSION; use bytes::Bytes; use environment::null_logger; use execution_block_generator::ExecutionBlockGenerator; use handle_rpc::handle_rpc; use serde::{Deserialize, Serialize}; +use serde_json::json; use slog::{info, Logger}; use std::future::Future; use std::marker::PhantomData; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::sync::Arc; -use tokio::sync::{oneshot, RwLock}; +use tokio::sync::{oneshot, RwLock, RwLockWriteGuard}; use types::EthSpec; use warp::Filter; -const DEFAULT_TERMINAL_DIFFICULTY: u64 = 6400; -const DEFAULT_TERMINAL_BLOCK: u64 = 64; +pub use execution_block_generator::{block_hash_to_number, block_number_to_hash}; + +pub const DEFAULT_TERMINAL_DIFFICULTY: u64 = 6400; +pub const DEFAULT_TERMINAL_BLOCK: u64 = 64; mod execution_block_generator; mod handle_rpc; -pub struct MockServer { +pub struct MockServer { _shutdown_tx: oneshot::Sender<()>, listen_socket_addr: SocketAddr, last_echo_request: Arc>>, + pub ctx: Arc>, } -impl MockServer { - pub fn unit_testing() -> Self { +impl MockServer { + pub fn unit_testing() -> Self { let last_echo_request = Arc::new(RwLock::new(None)); let execution_block_generator = ExecutionBlockGenerator::new(DEFAULT_TERMINAL_DIFFICULTY, DEFAULT_TERMINAL_BLOCK); @@ -34,7 +39,7 @@ impl MockServer { config: <_>::default(), log: null_logger().unwrap(), last_echo_request: last_echo_request.clone(), - execution_block_generator: Arc::new(RwLock::new(execution_block_generator)), + execution_block_generator: RwLock::new(execution_block_generator), _phantom: PhantomData, }); @@ -45,7 +50,7 @@ impl MockServer { let _ = shutdown_rx.await; }; - let (listen_socket_addr, server_future) = serve(ctx, shutdown_future).unwrap(); + let (listen_socket_addr, server_future) = serve(ctx.clone(), shutdown_future).unwrap(); tokio::spawn(server_future); @@ -53,9 +58,14 @@ impl MockServer { _shutdown_tx: shutdown_tx, listen_socket_addr, last_echo_request, + ctx, } } + pub async fn execution_block_generator(&self) -> RwLockWriteGuard<'_, ExecutionBlockGenerator> { + self.ctx.execution_block_generator.write().await + } + pub fn url(&self) -> String { format!( "http://{}:{}", @@ -91,6 +101,11 @@ impl From for Error { } } +#[derive(Debug)] +struct MissingIdField; + +impl warp::reject::Reject for MissingIdField {} + /// A wrapper around all the items required to spawn the HTTP server. /// /// The server will gracefully handle the case where any fields are `None`. @@ -98,7 +113,7 @@ pub struct Context { pub config: Config, pub log: Logger, pub last_echo_request: Arc>>, - pub execution_block_generator: Arc>, + pub execution_block_generator: RwLock, pub _phantom: PhantomData, } @@ -150,7 +165,27 @@ pub fn serve( .and(warp::body::json()) .and(ctx_filter.clone()) .and_then(|body: serde_json::Value, ctx: Arc>| async move { - let response = handle_rpc(body, ctx).await; + let id = body + .get("id") + .and_then(serde_json::Value::as_u64) + .ok_or_else(|| warp::reject::custom(MissingIdField))?; + + let response = match handle_rpc(body, ctx).await { + Ok(result) => json!({ + "id": id, + "jsonrpc": JSONRPC_VERSION, + "result": result + }), + Err(message) => json!({ + "id": id, + "jsonrpc": JSONRPC_VERSION, + "error": { + "code": -1234, // Junk error code. + "message": message + } + }), + }; + Ok::<_, warp::reject::Rejection>( warp::http::Response::builder() .status(200) @@ -163,7 +198,7 @@ pub fn serve( // Sends the body of the request to `ctx.last_echo_request` so we can inspect requests. let echo = warp::path("echo") .and(warp::body::bytes()) - .and(ctx_filter.clone()) + .and(ctx_filter) .and_then(|bytes: Bytes, ctx: Arc>| async move { *ctx.last_echo_request.write().await = Some(bytes.clone()); Ok::<_, warp::reject::Rejection>(