Add first ExecutionLayer tests

This commit is contained in:
Paul Hauner
2021-09-28 16:53:33 +10:00
parent 354955ccff
commit 20106bb3fd
9 changed files with 305 additions and 49 deletions

View File

@@ -24,3 +24,4 @@ task_executor = { path = "../../common/task_executor" }
hex = "0.4.2"
eth2_ssz_types = { path = "../../consensus/ssz_types"}
lru = "0.6.0"
exit-future = "0.2.0"

View File

@@ -19,7 +19,6 @@ pub enum Error {
Json(serde_json::Error),
ServerMessage(String),
Eip155Failure,
NoErrorOrResult,
IsSyncing,
ExecutionBlockNotFound(Hash256),
ExecutionHeadBlockNotFound,
@@ -103,7 +102,7 @@ pub enum BlockByNumberQuery<'a> {
Tag(&'a str),
}
#[derive(Clone, Copy, Debug, PartialEq, Deserialize)]
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExecutionBlock {
pub block_hash: Hash256,

View File

@@ -11,33 +11,33 @@ use types::{EthSpec, FixedVector, Transaction, Unsigned, VariableList};
pub use reqwest::Client;
const STATIC_ID: u32 = 1;
const JSONRPC_VERSION: &str = "2.0";
pub const JSONRPC_VERSION: &str = "2.0";
const RETURN_FULL_TRANSACTION_OBJECTS: bool = false;
pub const RETURN_FULL_TRANSACTION_OBJECTS: bool = false;
const ETH_GET_BLOCK_BY_NUMBER: &str = "eth_getBlockByNumber";
const ETH_GET_BLOCK_BY_NUMBER_TIMEOUT: Duration = Duration::from_secs(1);
pub const ETH_GET_BLOCK_BY_NUMBER: &str = "eth_getBlockByNumber";
pub const ETH_GET_BLOCK_BY_NUMBER_TIMEOUT: Duration = Duration::from_secs(1);
const ETH_GET_BLOCK_BY_HASH: &str = "eth_getBlockByHash";
const ETH_GET_BLOCK_BY_HASH_TIMEOUT: Duration = Duration::from_secs(1);
pub const ETH_GET_BLOCK_BY_HASH: &str = "eth_getBlockByHash";
pub const ETH_GET_BLOCK_BY_HASH_TIMEOUT: Duration = Duration::from_secs(1);
const ETH_SYNCING: &str = "eth_syncing";
const ETH_SYNCING_TIMEOUT: Duration = Duration::from_millis(250);
pub const ETH_SYNCING: &str = "eth_syncing";
pub const ETH_SYNCING_TIMEOUT: Duration = Duration::from_millis(250);
const ENGINE_PREPARE_PAYLOAD: &str = "engine_preparePayload";
const ENGINE_PREPARE_PAYLOAD_TIMEOUT: Duration = Duration::from_millis(500);
pub const ENGINE_PREPARE_PAYLOAD: &str = "engine_preparePayload";
pub const ENGINE_PREPARE_PAYLOAD_TIMEOUT: Duration = Duration::from_millis(500);
const ENGINE_EXECUTE_PAYLOAD: &str = "engine_executePayload";
const ENGINE_EXECUTE_PAYLOAD_TIMEOUT: Duration = Duration::from_secs(2);
pub const ENGINE_EXECUTE_PAYLOAD: &str = "engine_executePayload";
pub const ENGINE_EXECUTE_PAYLOAD_TIMEOUT: Duration = Duration::from_secs(2);
const ENGINE_GET_PAYLOAD: &str = "engine_getPayload";
const ENGINE_GET_PAYLOAD_TIMEOUT: Duration = Duration::from_secs(2);
pub const ENGINE_GET_PAYLOAD: &str = "engine_getPayload";
pub const ENGINE_GET_PAYLOAD_TIMEOUT: Duration = Duration::from_secs(2);
const ENGINE_CONSENSUS_VALIDATED: &str = "engine_consensusValidated";
const ENGINE_CONSENSUS_VALIDATED_TIMEOUT: Duration = Duration::from_millis(500);
pub const ENGINE_CONSENSUS_VALIDATED: &str = "engine_consensusValidated";
pub const ENGINE_CONSENSUS_VALIDATED_TIMEOUT: Duration = Duration::from_millis(500);
const ENGINE_FORKCHOICE_UPDATED: &str = "engine_forkchoiceUpdated";
const ENGINE_FORKCHOICE_UPDATED_TIMEOUT: Duration = Duration::from_millis(500);
pub const ENGINE_FORKCHOICE_UPDATED: &str = "engine_forkchoiceUpdated";
pub const ENGINE_FORKCHOICE_UPDATED_TIMEOUT: Duration = Duration::from_millis(500);
pub struct HttpJsonRpc {
pub client: Client,
@@ -78,7 +78,7 @@ impl HttpJsonRpc {
.await?;
match (body.result, body.error) {
(Some(result), None) => serde_json::from_value(result).map_err(Into::into),
(result, None) => serde_json::from_value(result).map_err(Into::into),
(_, Some(error)) => {
if error.contains(EIP155_ERROR_STR) {
Err(Error::Eip155Failure)
@@ -86,7 +86,6 @@ impl HttpJsonRpc {
Err(Error::ServerMessage(error))
}
}
(None, None) => Err(Error::NoErrorOrResult),
}
}
}
@@ -104,7 +103,6 @@ impl EngineApi for HttpJsonRpc {
* Check the network and chain ids. We omit this to save time for the merge f2f and since it
* also seems like it might get annoying during development.
*/
match result.as_bool() {
Some(false) => Ok(()),
_ => Err(Error::IsSyncing),
@@ -232,7 +230,7 @@ struct JsonRequestBody<'a> {
struct JsonResponseBody {
jsonrpc: String,
error: Option<String>,
result: Option<serde_json::Value>,
result: serde_json::Value,
id: u32,
}
@@ -375,13 +373,13 @@ mod test {
use types::MainnetEthSpec;
struct Tester {
server: MockServer,
server: MockServer<MainnetEthSpec>,
echo_client: Arc<HttpJsonRpc>,
}
impl Tester {
pub fn new() -> Self {
let server = MockServer::unit_testing::<MainnetEthSpec>();
let server = MockServer::unit_testing();
let echo_url = SensitiveUrl::parse(&format!("{}/echo", server.url())).unwrap();
let echo_client = Arc::new(HttpJsonRpc::new(echo_url).unwrap());

View File

@@ -158,6 +158,33 @@ impl<T: EngineApi> Engines<T> {
}
pub async fn broadcast<'a, F, G, H>(&'a self, func: F) -> Vec<Result<H, EngineError>>
where
F: Fn(&'a Engine<T>) -> G + Copy,
G: Future<Output = Result<H, EngineApiError>>,
{
let first_results = self.broadcast_without_retry(func).await;
let mut any_offline = false;
for result in &first_results {
match result {
Ok(_) => return first_results,
Err(EngineError::Offline { .. }) => any_offline = true,
_ => (),
}
}
if any_offline {
self.upcheck_offline().await;
self.broadcast_without_retry(func).await
} else {
first_results
}
}
pub async fn broadcast_without_retry<'a, F, G, H>(
&'a self,
func: F,
) -> Vec<Result<H, EngineError>>
where
F: Fn(&'a Engine<T>) -> G,
G: Future<Output = Result<H, EngineApiError>>,

View File

@@ -397,3 +397,147 @@ impl ExecutionLayer {
is_total_difficulty_reached && is_parent_total_difficulty_valid
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::test_utils::{block_number_to_hash, MockServer, DEFAULT_TERMINAL_DIFFICULTY};
use environment::null_logger;
use types::MainnetEthSpec;
struct SingleEngineTester {
server: MockServer<MainnetEthSpec>,
el: ExecutionLayer,
runtime: Option<Arc<tokio::runtime::Runtime>>,
_runtime_shutdown: exit_future::Signal,
}
impl SingleEngineTester {
pub fn new() -> Self {
let server = MockServer::unit_testing();
let url = SensitiveUrl::parse(&server.url()).unwrap();
let log = null_logger().unwrap();
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap(),
);
let (runtime_shutdown, exit) = exit_future::signal();
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor =
TaskExecutor::new(Arc::downgrade(&runtime), exit, log.clone(), shutdown_tx);
let el = ExecutionLayer::from_urls(
vec![url],
DEFAULT_TERMINAL_DIFFICULTY.into(),
Hash256::zero(),
None,
executor,
log,
)
.unwrap();
Self {
server,
el,
runtime: Some(runtime),
_runtime_shutdown: runtime_shutdown,
}
}
pub async fn move_to_terminal_block(self) -> Self {
{
let mut block_gen = self.server.execution_block_generator().await;
block_gen.seconds_since_genesis =
block_gen.terminal_block_number * block_gen.block_interval_secs;
}
self
}
pub async fn with_terminal_block_number<'a, T, U>(self, func: T) -> Self
where
T: Fn(ExecutionLayer, u64) -> U,
U: Future<Output = ()>,
{
let terminal_block_number = self
.server
.execution_block_generator()
.await
.terminal_block_number;
func(self.el.clone(), terminal_block_number).await;
self
}
pub fn shutdown(&mut self) {
if let Some(runtime) = self.runtime.take() {
Arc::try_unwrap(runtime).unwrap().shutdown_background()
}
}
}
impl Drop for SingleEngineTester {
fn drop(&mut self) {
self.shutdown()
}
}
#[tokio::test]
async fn finds_valid_terminal_block_hash() {
SingleEngineTester::new()
.move_to_terminal_block()
.await
.with_terminal_block_number(|el, terminal_block_number| async move {
assert_eq!(
el.is_valid_terminal_pow_block_hash(block_number_to_hash(
terminal_block_number
))
.await
.unwrap(),
Some(true)
)
})
.await;
}
#[tokio::test]
async fn rejects_invalid_terminal_block_hash() {
SingleEngineTester::new()
.move_to_terminal_block()
.await
.with_terminal_block_number(|el, terminal_block_number| async move {
let invalid_terminal_block = terminal_block_number.checked_sub(1).unwrap();
assert_eq!(
el.is_valid_terminal_pow_block_hash(block_number_to_hash(
invalid_terminal_block
))
.await
.unwrap(),
Some(false)
)
})
.await;
}
#[tokio::test]
async fn rejects_unknown_terminal_block_hash() {
SingleEngineTester::new()
.move_to_terminal_block()
.await
.with_terminal_block_number(|el, terminal_block_number| async move {
let missing_terminal_block = terminal_block_number.checked_add(1).unwrap();
assert_eq!(
el.is_valid_terminal_pow_block_hash(block_number_to_hash(
missing_terminal_block
))
.await
.unwrap(),
None
)
})
.await;
}
}

View File

@@ -62,9 +62,9 @@ impl ExecutionBlockGenerator {
pub fn block_by_number(&self, number: u64) -> Option<ExecutionBlock> {
let parent_hash = number
.checked_sub(1)
.map(block_number_to_block_hash)
.map(block_number_to_hash)
.unwrap_or_else(Hash256::zero);
let block_hash = block_number_to_block_hash(number);
let block_hash = block_number_to_hash(number);
if number <= self.terminal_block_number {
if number <= self.latest_block_number() {
@@ -94,16 +94,16 @@ impl ExecutionBlockGenerator {
}
pub fn block_by_hash(&self, hash: Hash256) -> Option<ExecutionBlock> {
let block_number = block_hash_to_block_number(hash);
let block_number = block_hash_to_number(hash);
self.block_by_number(block_number)
}
}
fn block_number_to_block_hash(n: u64) -> Hash256 {
pub fn block_number_to_hash(n: u64) -> Hash256 {
Hash256::from_low_u64_be(n + 1)
}
fn block_hash_to_block_number(hash: Hash256) -> u64 {
pub fn block_hash_to_number(hash: Hash256) -> u64 {
hash.to_low_u64_be()
.checked_sub(1)
.expect("do not query for zero hash")
@@ -129,12 +129,12 @@ mod test {
*/
let block = generator.latest_block().unwrap();
assert_eq!(block.block_hash, block_number_to_block_hash(i));
assert_eq!(block_hash_to_block_number(block.block_hash), i);
assert_eq!(block.block_hash, block_number_to_hash(i));
assert_eq!(block_hash_to_number(block.block_hash), i);
let expected_parent = i
.checked_sub(1)
.map(block_number_to_block_hash)
.map(block_number_to_hash)
.unwrap_or_else(Hash256::zero);
assert_eq!(block.parent_hash, expected_parent);
@@ -162,7 +162,7 @@ mod test {
dbg!(next_i);
assert!(generator.block_by_number(next_i).is_none());
assert!(generator
.block_by_hash(block_number_to_block_hash(next_i))
.block_by_hash(block_number_to_hash(next_i))
.is_none());
generator.increment_seconds_since_genesis(1);

View File

@@ -1,10 +1,61 @@
use super::Context;
use crate::engine_api::http::*;
use serde_json::Value as JsonValue;
use std::sync::Arc;
use types::EthSpec;
pub async fn handle_rpc<T: EthSpec>(
body: serde_json::Value,
body: JsonValue,
ctx: Arc<Context<T>>,
) -> Result<serde_json::Value, String> {
todo!("handle_rpc")
) -> Result<JsonValue, String> {
let method = body
.get("method")
.and_then(JsonValue::as_str)
.ok_or_else(|| "missing/invalid method field".to_string())?;
let params = body
.get("params")
.ok_or_else(|| "missing/invalid params field".to_string())?;
match method {
ETH_SYNCING => Ok(JsonValue::Bool(false)),
ETH_GET_BLOCK_BY_NUMBER => {
let tag = params
.get(0)
.and_then(JsonValue::as_str)
.ok_or_else(|| "missing/invalid params[0] value".to_string())?;
match tag {
"latest" => Ok(serde_json::to_value(
ctx.execution_block_generator.read().await.latest_block(),
)
.unwrap()),
other => Err(format!("The tag {} is not supported", other)),
}
}
ETH_GET_BLOCK_BY_HASH => {
let hash = params
.get(0)
.and_then(JsonValue::as_str)
.ok_or_else(|| "missing/invalid params[0] value".to_string())
.and_then(|s| {
s.parse()
.map_err(|e| format!("unable to parse hash: {:?}", e))
})?;
Ok(serde_json::to_value(
ctx.execution_block_generator
.read()
.await
.block_by_hash(hash),
)
.unwrap())
}
ENGINE_CONSENSUS_VALIDATED => Ok(JsonValue::Null),
ENGINE_FORKCHOICE_UPDATED => Ok(JsonValue::Null),
other => Err(format!(
"The method {} does not exist/is not available",
other
)),
}
}

View File

@@ -1,31 +1,36 @@
use crate::engine_api::http::JSONRPC_VERSION;
use bytes::Bytes;
use environment::null_logger;
use execution_block_generator::ExecutionBlockGenerator;
use handle_rpc::handle_rpc;
use serde::{Deserialize, Serialize};
use serde_json::json;
use slog::{info, Logger};
use std::future::Future;
use std::marker::PhantomData;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use tokio::sync::{oneshot, RwLock};
use tokio::sync::{oneshot, RwLock, RwLockWriteGuard};
use types::EthSpec;
use warp::Filter;
const DEFAULT_TERMINAL_DIFFICULTY: u64 = 6400;
const DEFAULT_TERMINAL_BLOCK: u64 = 64;
pub use execution_block_generator::{block_hash_to_number, block_number_to_hash};
pub const DEFAULT_TERMINAL_DIFFICULTY: u64 = 6400;
pub const DEFAULT_TERMINAL_BLOCK: u64 = 64;
mod execution_block_generator;
mod handle_rpc;
pub struct MockServer {
pub struct MockServer<T: EthSpec> {
_shutdown_tx: oneshot::Sender<()>,
listen_socket_addr: SocketAddr,
last_echo_request: Arc<RwLock<Option<Bytes>>>,
pub ctx: Arc<Context<T>>,
}
impl MockServer {
pub fn unit_testing<T: EthSpec>() -> Self {
impl<T: EthSpec> MockServer<T> {
pub fn unit_testing() -> Self {
let last_echo_request = Arc::new(RwLock::new(None));
let execution_block_generator =
ExecutionBlockGenerator::new(DEFAULT_TERMINAL_DIFFICULTY, DEFAULT_TERMINAL_BLOCK);
@@ -34,7 +39,7 @@ impl MockServer {
config: <_>::default(),
log: null_logger().unwrap(),
last_echo_request: last_echo_request.clone(),
execution_block_generator: Arc::new(RwLock::new(execution_block_generator)),
execution_block_generator: RwLock::new(execution_block_generator),
_phantom: PhantomData,
});
@@ -45,7 +50,7 @@ impl MockServer {
let _ = shutdown_rx.await;
};
let (listen_socket_addr, server_future) = serve(ctx, shutdown_future).unwrap();
let (listen_socket_addr, server_future) = serve(ctx.clone(), shutdown_future).unwrap();
tokio::spawn(server_future);
@@ -53,9 +58,14 @@ impl MockServer {
_shutdown_tx: shutdown_tx,
listen_socket_addr,
last_echo_request,
ctx,
}
}
pub async fn execution_block_generator(&self) -> RwLockWriteGuard<'_, ExecutionBlockGenerator> {
self.ctx.execution_block_generator.write().await
}
pub fn url(&self) -> String {
format!(
"http://{}:{}",
@@ -91,6 +101,11 @@ impl From<String> for Error {
}
}
#[derive(Debug)]
struct MissingIdField;
impl warp::reject::Reject for MissingIdField {}
/// A wrapper around all the items required to spawn the HTTP server.
///
/// The server will gracefully handle the case where any fields are `None`.
@@ -98,7 +113,7 @@ pub struct Context<T> {
pub config: Config,
pub log: Logger,
pub last_echo_request: Arc<RwLock<Option<Bytes>>>,
pub execution_block_generator: Arc<RwLock<ExecutionBlockGenerator>>,
pub execution_block_generator: RwLock<ExecutionBlockGenerator>,
pub _phantom: PhantomData<T>,
}
@@ -150,7 +165,27 @@ pub fn serve<T: EthSpec>(
.and(warp::body::json())
.and(ctx_filter.clone())
.and_then(|body: serde_json::Value, ctx: Arc<Context<T>>| async move {
let response = handle_rpc(body, ctx).await;
let id = body
.get("id")
.and_then(serde_json::Value::as_u64)
.ok_or_else(|| warp::reject::custom(MissingIdField))?;
let response = match handle_rpc(body, ctx).await {
Ok(result) => json!({
"id": id,
"jsonrpc": JSONRPC_VERSION,
"result": result
}),
Err(message) => json!({
"id": id,
"jsonrpc": JSONRPC_VERSION,
"error": {
"code": -1234, // Junk error code.
"message": message
}
}),
};
Ok::<_, warp::reject::Rejection>(
warp::http::Response::builder()
.status(200)
@@ -163,7 +198,7 @@ pub fn serve<T: EthSpec>(
// Sends the body of the request to `ctx.last_echo_request` so we can inspect requests.
let echo = warp::path("echo")
.and(warp::body::bytes())
.and(ctx_filter.clone())
.and(ctx_filter)
.and_then(|bytes: Bytes, ctx: Arc<Context<T>>| async move {
*ctx.last_echo_request.write().await = Some(bytes.clone());
Ok::<_, warp::reject::Rejection>(