[Merge] Implement execution_layer (#2635)

* Checkout serde_utils from rayonism

* Make eth1::http functions pub

* Add bones of execution_layer

* Modify decoding

* Expose Transaction, cargo fmt

* Add executePayload

* Add all minimal spec endpoints

* Start adding json rpc wrapper

* Finish custom JSON response handler

* Switch to new rpc sending method

* Add first test

* Fix camelCase

* Finish adding tests

* Begin threading execution layer into BeaconChain

* Fix clippy lints

* Fix clippy lints

* Thread execution layer into ClientBuilder

* Add CLI flags

* Add block processing methods to ExecutionLayer

* Add block_on to execution_layer

* Integrate execute_payload

* Add extra_data field

* Begin implementing payload handle

* Send consensus valid/invalid messages

* Fix minor type in task_executor

* Call forkchoiceUpdated

* Add search for TTD block

* Thread TTD into execution layer

* Allow producing block with execution payload

* Add LRU cache for execution blocks

* Remove duplicate 0x on ssz_types serialization

* Add tests for block getter methods

* Add basic block generator impl

* Add is_valid_terminal_block to EL

* Verify merge block in block_verification

* Partially implement --terminal-block-hash-override

* Add terminal_block_hash to ChainSpec

* Remove Option from terminal_block_hash in EL

* Revert merge changes to consensus/fork_choice

* Remove commented-out code

* Add bones for handling RPC methods on test server

* Add first ExecutionLayer tests

* Add testing for finding terminal block

* Prevent infinite loops

* Add insert_merge_block to block gen

* Add block gen test for pos blocks

* Start adding payloads to block gen

* Fix clippy lints

* Add execution payload to block gen

* Add execute_payload to block_gen

* Refactor block gen

* Add all routes to mock server

* Use Uint256 for base_fee_per_gas

* Add working execution chain build

* Remove unused var

* Revert "Use Uint256 for base_fee_per_gas"

This reverts commit 6c88f19ac4.

* Fix base_fee_for_gas Uint256

* Update execute payload handle

* Improve testing, fix bugs

* Fix default fee-recipient

* Fix fee-recipient address (again)

* Add check for terminal block, add comments, tidy

* Apply suggestions from code review

Co-authored-by: realbigsean <seananderson33@GMAIL.com>

* Fix is_none on handle Drop

* Remove commented-out tests

Co-authored-by: realbigsean <seananderson33@GMAIL.com>
This commit is contained in:
Paul Hauner
2021-09-30 08:14:15 +10:00
parent 1563bce905
commit d8623cfc4f
38 changed files with 3239 additions and 114 deletions

View File

@@ -0,0 +1,114 @@
use async_trait::async_trait;
use eth1::http::RpcError;
use serde::{Deserialize, Serialize};
pub const LATEST_TAG: &str = "latest";
pub use types::{Address, EthSpec, ExecutionPayload, Hash256, Uint256};
pub mod http;
pub type PayloadId = u64;
#[derive(Debug)]
pub enum Error {
Reqwest(reqwest::Error),
BadResponse(String),
RequestFailed(String),
JsonRpc(RpcError),
Json(serde_json::Error),
ServerMessage { code: i64, message: String },
Eip155Failure,
IsSyncing,
ExecutionBlockNotFound(Hash256),
ExecutionHeadBlockNotFound,
ParentHashEqualsBlockHash(Hash256),
}
impl From<reqwest::Error> for Error {
fn from(e: reqwest::Error) -> Self {
Error::Reqwest(e)
}
}
impl From<serde_json::Error> for Error {
fn from(e: serde_json::Error) -> Self {
Error::Json(e)
}
}
/// A generic interface for an execution engine API.
#[async_trait]
pub trait EngineApi {
async fn upcheck(&self) -> Result<(), Error>;
async fn get_block_by_number<'a>(
&self,
block_by_number: BlockByNumberQuery<'a>,
) -> Result<Option<ExecutionBlock>, Error>;
async fn get_block_by_hash<'a>(
&self,
block_hash: Hash256,
) -> Result<Option<ExecutionBlock>, Error>;
async fn prepare_payload(
&self,
parent_hash: Hash256,
timestamp: u64,
random: Hash256,
fee_recipient: Address,
) -> Result<PayloadId, Error>;
async fn execute_payload<T: EthSpec>(
&self,
execution_payload: ExecutionPayload<T>,
) -> Result<ExecutePayloadResponse, Error>;
async fn get_payload<T: EthSpec>(
&self,
payload_id: PayloadId,
) -> Result<ExecutionPayload<T>, Error>;
async fn consensus_validated(
&self,
block_hash: Hash256,
status: ConsensusStatus,
) -> Result<(), Error>;
async fn forkchoice_updated(
&self,
head_block_hash: Hash256,
finalized_block_hash: Hash256,
) -> Result<(), Error>;
}
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum ExecutePayloadResponse {
Valid,
Invalid,
Syncing,
}
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum ConsensusStatus {
Valid,
Invalid,
}
#[derive(Clone, Copy, Debug, PartialEq, Serialize)]
#[serde(untagged)]
pub enum BlockByNumberQuery<'a> {
Tag(&'a str),
}
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExecutionBlock {
pub block_hash: Hash256,
pub block_number: u64,
pub parent_hash: Hash256,
pub total_difficulty: Uint256,
}

View File

@@ -0,0 +1,637 @@
//! Contains an implementation of `EngineAPI` using the JSON-RPC API via HTTP.
use super::*;
use async_trait::async_trait;
use eth1::http::EIP155_ERROR_STR;
use reqwest::header::CONTENT_TYPE;
use sensitive_url::SensitiveUrl;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::json;
use std::time::Duration;
use types::{EthSpec, FixedVector, Transaction, Unsigned, VariableList};
pub use reqwest::Client;
const STATIC_ID: u32 = 1;
pub const JSONRPC_VERSION: &str = "2.0";
pub const RETURN_FULL_TRANSACTION_OBJECTS: bool = false;
pub const ETH_GET_BLOCK_BY_NUMBER: &str = "eth_getBlockByNumber";
pub const ETH_GET_BLOCK_BY_NUMBER_TIMEOUT: Duration = Duration::from_secs(1);
pub const ETH_GET_BLOCK_BY_HASH: &str = "eth_getBlockByHash";
pub const ETH_GET_BLOCK_BY_HASH_TIMEOUT: Duration = Duration::from_secs(1);
pub const ETH_SYNCING: &str = "eth_syncing";
pub const ETH_SYNCING_TIMEOUT: Duration = Duration::from_millis(250);
pub const ENGINE_PREPARE_PAYLOAD: &str = "engine_preparePayload";
pub const ENGINE_PREPARE_PAYLOAD_TIMEOUT: Duration = Duration::from_millis(500);
pub const ENGINE_EXECUTE_PAYLOAD: &str = "engine_executePayload";
pub const ENGINE_EXECUTE_PAYLOAD_TIMEOUT: Duration = Duration::from_secs(2);
pub const ENGINE_GET_PAYLOAD: &str = "engine_getPayload";
pub const ENGINE_GET_PAYLOAD_TIMEOUT: Duration = Duration::from_secs(2);
pub const ENGINE_CONSENSUS_VALIDATED: &str = "engine_consensusValidated";
pub const ENGINE_CONSENSUS_VALIDATED_TIMEOUT: Duration = Duration::from_millis(500);
pub const ENGINE_FORKCHOICE_UPDATED: &str = "engine_forkchoiceUpdated";
pub const ENGINE_FORKCHOICE_UPDATED_TIMEOUT: Duration = Duration::from_millis(500);
pub struct HttpJsonRpc {
pub client: Client,
pub url: SensitiveUrl,
}
impl HttpJsonRpc {
pub fn new(url: SensitiveUrl) -> Result<Self, Error> {
Ok(Self {
client: Client::builder().build()?,
url,
})
}
pub async fn rpc_request<T: DeserializeOwned>(
&self,
method: &str,
params: serde_json::Value,
timeout: Duration,
) -> Result<T, Error> {
let body = JsonRequestBody {
jsonrpc: JSONRPC_VERSION,
method,
params,
id: STATIC_ID,
};
let body: JsonResponseBody = self
.client
.post(self.url.full.clone())
.timeout(timeout)
.header(CONTENT_TYPE, "application/json")
.json(&body)
.send()
.await?
.error_for_status()?
.json()
.await?;
match (body.result, body.error) {
(result, None) => serde_json::from_value(result).map_err(Into::into),
(_, Some(error)) => {
if error.message.contains(EIP155_ERROR_STR) {
Err(Error::Eip155Failure)
} else {
Err(Error::ServerMessage {
code: error.code,
message: error.message,
})
}
}
}
}
}
#[async_trait]
impl EngineApi for HttpJsonRpc {
async fn upcheck(&self) -> Result<(), Error> {
let result: serde_json::Value = self
.rpc_request(ETH_SYNCING, json!([]), ETH_SYNCING_TIMEOUT)
.await?;
/*
* TODO
*
* Check the network and chain ids. We omit this to save time for the merge f2f and since it
* also seems like it might get annoying during development.
*/
match result.as_bool() {
Some(false) => Ok(()),
_ => Err(Error::IsSyncing),
}
}
async fn get_block_by_number<'a>(
&self,
query: BlockByNumberQuery<'a>,
) -> Result<Option<ExecutionBlock>, Error> {
let params = json!([query, RETURN_FULL_TRANSACTION_OBJECTS]);
self.rpc_request(
ETH_GET_BLOCK_BY_NUMBER,
params,
ETH_GET_BLOCK_BY_NUMBER_TIMEOUT,
)
.await
}
async fn get_block_by_hash<'a>(
&self,
block_hash: Hash256,
) -> Result<Option<ExecutionBlock>, Error> {
let params = json!([block_hash, RETURN_FULL_TRANSACTION_OBJECTS]);
self.rpc_request(ETH_GET_BLOCK_BY_HASH, params, ETH_GET_BLOCK_BY_HASH_TIMEOUT)
.await
}
async fn prepare_payload(
&self,
parent_hash: Hash256,
timestamp: u64,
random: Hash256,
fee_recipient: Address,
) -> Result<PayloadId, Error> {
let params = json!([JsonPreparePayloadRequest {
parent_hash,
timestamp,
random,
fee_recipient
}]);
let response: JsonPayloadId = self
.rpc_request(
ENGINE_PREPARE_PAYLOAD,
params,
ENGINE_PREPARE_PAYLOAD_TIMEOUT,
)
.await?;
Ok(response.payload_id)
}
async fn execute_payload<T: EthSpec>(
&self,
execution_payload: ExecutionPayload<T>,
) -> Result<ExecutePayloadResponse, Error> {
let params = json!([JsonExecutionPayload::from(execution_payload)]);
self.rpc_request(
ENGINE_EXECUTE_PAYLOAD,
params,
ENGINE_EXECUTE_PAYLOAD_TIMEOUT,
)
.await
}
async fn get_payload<T: EthSpec>(
&self,
payload_id: PayloadId,
) -> Result<ExecutionPayload<T>, Error> {
let params = json!([JsonPayloadId { payload_id }]);
let response: JsonExecutionPayload<T> = self
.rpc_request(ENGINE_GET_PAYLOAD, params, ENGINE_GET_PAYLOAD_TIMEOUT)
.await?;
Ok(ExecutionPayload::from(response))
}
async fn consensus_validated(
&self,
block_hash: Hash256,
status: ConsensusStatus,
) -> Result<(), Error> {
let params = json!([JsonConsensusValidatedRequest { block_hash, status }]);
self.rpc_request(
ENGINE_CONSENSUS_VALIDATED,
params,
ENGINE_CONSENSUS_VALIDATED_TIMEOUT,
)
.await
}
async fn forkchoice_updated(
&self,
head_block_hash: Hash256,
finalized_block_hash: Hash256,
) -> Result<(), Error> {
let params = json!([JsonForkChoiceUpdatedRequest {
head_block_hash,
finalized_block_hash
}]);
self.rpc_request(
ENGINE_FORKCHOICE_UPDATED,
params,
ENGINE_FORKCHOICE_UPDATED_TIMEOUT,
)
.await
}
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct JsonRequestBody<'a> {
jsonrpc: &'a str,
method: &'a str,
params: serde_json::Value,
id: u32,
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct JsonError {
code: i64,
message: String,
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct JsonResponseBody {
jsonrpc: String,
#[serde(default)]
error: Option<JsonError>,
#[serde(default)]
result: serde_json::Value,
id: u32,
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct JsonPreparePayloadRequest {
pub parent_hash: Hash256,
#[serde(with = "eth2_serde_utils::u64_hex_be")]
pub timestamp: u64,
pub random: Hash256,
pub fee_recipient: Address,
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(transparent, rename_all = "camelCase")]
pub struct JsonPayloadId {
#[serde(with = "eth2_serde_utils::u64_hex_be")]
pub payload_id: u64,
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(bound = "T: EthSpec", rename_all = "camelCase")]
pub struct JsonExecutionPayload<T: EthSpec> {
pub parent_hash: Hash256,
pub coinbase: Address,
pub state_root: Hash256,
pub receipt_root: Hash256,
#[serde(with = "serde_logs_bloom")]
pub logs_bloom: FixedVector<u8, T::BytesPerLogsBloom>,
pub random: Hash256,
#[serde(with = "eth2_serde_utils::u64_hex_be")]
pub block_number: u64,
#[serde(with = "eth2_serde_utils::u64_hex_be")]
pub gas_limit: u64,
#[serde(with = "eth2_serde_utils::u64_hex_be")]
pub gas_used: u64,
#[serde(with = "eth2_serde_utils::u64_hex_be")]
pub timestamp: u64,
// FIXME(paul): check serialization
#[serde(with = "ssz_types::serde_utils::hex_var_list")]
pub extra_data: VariableList<u8, T::MaxExtraDataBytes>,
pub base_fee_per_gas: Uint256,
pub block_hash: Hash256,
// FIXME(paul): add transaction parsing.
#[serde(default, skip_deserializing)]
pub transactions: VariableList<Transaction<T>, T::MaxTransactionsPerPayload>,
}
impl<T: EthSpec> From<ExecutionPayload<T>> for JsonExecutionPayload<T> {
fn from(e: ExecutionPayload<T>) -> Self {
Self {
parent_hash: e.parent_hash,
coinbase: e.coinbase,
state_root: e.state_root,
receipt_root: e.receipt_root,
logs_bloom: e.logs_bloom,
random: e.random,
block_number: e.block_number,
gas_limit: e.gas_limit,
gas_used: e.gas_used,
timestamp: e.timestamp,
extra_data: e.extra_data,
base_fee_per_gas: Uint256::from_little_endian(e.base_fee_per_gas.as_bytes()),
block_hash: e.block_hash,
transactions: e.transactions,
}
}
}
impl<T: EthSpec> From<JsonExecutionPayload<T>> for ExecutionPayload<T> {
fn from(e: JsonExecutionPayload<T>) -> Self {
Self {
parent_hash: e.parent_hash,
coinbase: e.coinbase,
state_root: e.state_root,
receipt_root: e.receipt_root,
logs_bloom: e.logs_bloom,
random: e.random,
block_number: e.block_number,
gas_limit: e.gas_limit,
gas_used: e.gas_used,
timestamp: e.timestamp,
extra_data: e.extra_data,
base_fee_per_gas: uint256_to_hash256(e.base_fee_per_gas),
block_hash: e.block_hash,
transactions: e.transactions,
}
}
}
fn uint256_to_hash256(u: Uint256) -> Hash256 {
let mut bytes = [0; 32];
u.to_little_endian(&mut bytes);
Hash256::from_slice(&bytes)
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct JsonConsensusValidatedRequest {
pub block_hash: Hash256,
pub status: ConsensusStatus,
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct JsonForkChoiceUpdatedRequest {
pub head_block_hash: Hash256,
pub finalized_block_hash: Hash256,
}
// Serializes the `logs_bloom` field.
pub mod serde_logs_bloom {
use super::*;
use eth2_serde_utils::hex::PrefixedHexVisitor;
use serde::{Deserializer, Serializer};
pub fn serialize<S, U>(bytes: &FixedVector<u8, U>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
U: Unsigned,
{
let mut hex_string: String = "0x".to_string();
hex_string.push_str(&hex::encode(&bytes[..]));
serializer.serialize_str(&hex_string)
}
pub fn deserialize<'de, D, U>(deserializer: D) -> Result<FixedVector<u8, U>, D::Error>
where
D: Deserializer<'de>,
U: Unsigned,
{
let vec = deserializer.deserialize_string(PrefixedHexVisitor)?;
FixedVector::new(vec)
.map_err(|e| serde::de::Error::custom(format!("invalid logs bloom: {:?}", e)))
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::test_utils::MockServer;
use std::future::Future;
use std::sync::Arc;
use types::MainnetEthSpec;
struct Tester {
server: MockServer<MainnetEthSpec>,
echo_client: Arc<HttpJsonRpc>,
}
impl Tester {
pub fn new() -> Self {
let server = MockServer::unit_testing();
let echo_url = SensitiveUrl::parse(&format!("{}/echo", server.url())).unwrap();
let echo_client = Arc::new(HttpJsonRpc::new(echo_url).unwrap());
Self {
server,
echo_client,
}
}
pub async fn assert_request_equals<R, F>(
self,
request_func: R,
expected_json: serde_json::Value,
) -> Self
where
R: Fn(Arc<HttpJsonRpc>) -> F,
F: Future<Output = ()>,
{
request_func(self.echo_client.clone()).await;
let request_bytes = self.server.last_echo_request().await;
let request_json: serde_json::Value =
serde_json::from_slice(&request_bytes).expect("request was not valid json");
if request_json != expected_json {
panic!(
"json mismatch!\n\nobserved: {}\n\nexpected: {}\n\n",
request_json.to_string(),
expected_json.to_string()
)
}
self
}
}
const HASH_00: &str = "0x0000000000000000000000000000000000000000000000000000000000000000";
const HASH_01: &str = "0x0101010101010101010101010101010101010101010101010101010101010101";
const ADDRESS_00: &str = "0x0000000000000000000000000000000000000000";
const ADDRESS_01: &str = "0x0101010101010101010101010101010101010101";
const LOGS_BLOOM_01: &str = "0x01010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101";
#[tokio::test]
async fn get_block_by_number_request() {
Tester::new()
.assert_request_equals(
|client| async move {
let _ = client
.get_block_by_number(BlockByNumberQuery::Tag(LATEST_TAG))
.await;
},
json!({
"id": STATIC_ID,
"jsonrpc": JSONRPC_VERSION,
"method": ETH_GET_BLOCK_BY_NUMBER,
"params": ["latest", false]
}),
)
.await;
}
#[tokio::test]
async fn get_block_by_hash_request() {
Tester::new()
.assert_request_equals(
|client| async move {
let _ = client.get_block_by_hash(Hash256::repeat_byte(1)).await;
},
json!({
"id": STATIC_ID,
"jsonrpc": JSONRPC_VERSION,
"method": ETH_GET_BLOCK_BY_HASH,
"params": [HASH_01, false]
}),
)
.await;
}
#[tokio::test]
async fn prepare_payload_request() {
Tester::new()
.assert_request_equals(
|client| async move {
let _ = client
.prepare_payload(
Hash256::repeat_byte(0),
42,
Hash256::repeat_byte(1),
Address::repeat_byte(0),
)
.await;
},
json!({
"id": STATIC_ID,
"jsonrpc": JSONRPC_VERSION,
"method": ENGINE_PREPARE_PAYLOAD,
"params": [{
"parentHash": HASH_00,
"timestamp": "0x2a",
"random": HASH_01,
"feeRecipient": ADDRESS_00,
}]
}),
)
.await;
}
#[tokio::test]
async fn get_payload_request() {
Tester::new()
.assert_request_equals(
|client| async move {
let _ = client.get_payload::<MainnetEthSpec>(42).await;
},
json!({
"id": STATIC_ID,
"jsonrpc": JSONRPC_VERSION,
"method": ENGINE_GET_PAYLOAD,
"params": ["0x2a"]
}),
)
.await;
}
#[tokio::test]
async fn execute_payload_request() {
Tester::new()
.assert_request_equals(
|client| async move {
let _ = client
.execute_payload::<MainnetEthSpec>(ExecutionPayload {
parent_hash: Hash256::repeat_byte(0),
coinbase: Address::repeat_byte(1),
state_root: Hash256::repeat_byte(1),
receipt_root: Hash256::repeat_byte(0),
logs_bloom: vec![1; 256].into(),
random: Hash256::repeat_byte(1),
block_number: 0,
gas_limit: 1,
gas_used: 2,
timestamp: 42,
extra_data: vec![].into(),
base_fee_per_gas: uint256_to_hash256(Uint256::from(1)),
block_hash: Hash256::repeat_byte(1),
transactions: vec![].into(),
})
.await;
},
json!({
"id": STATIC_ID,
"jsonrpc": JSONRPC_VERSION,
"method": ENGINE_EXECUTE_PAYLOAD,
"params": [{
"parentHash": HASH_00,
"coinbase": ADDRESS_01,
"stateRoot": HASH_01,
"receiptRoot": HASH_00,
"logsBloom": LOGS_BLOOM_01,
"random": HASH_01,
"blockNumber": "0x0",
"gasLimit": "0x1",
"gasUsed": "0x2",
"timestamp": "0x2a",
"extraData": "0x",
"baseFeePerGas": "0x1",
"blockHash": HASH_01,
"transactions": [],
}]
}),
)
.await;
}
#[tokio::test]
async fn consensus_validated_request() {
Tester::new()
.assert_request_equals(
|client| async move {
let _ = client
.consensus_validated(Hash256::repeat_byte(0), ConsensusStatus::Valid)
.await;
},
json!({
"id": STATIC_ID,
"jsonrpc": JSONRPC_VERSION,
"method": ENGINE_CONSENSUS_VALIDATED,
"params": [{
"blockHash": HASH_00,
"status": "VALID",
}]
}),
)
.await
.assert_request_equals(
|client| async move {
let _ = client
.consensus_validated(Hash256::repeat_byte(1), ConsensusStatus::Invalid)
.await;
},
json!({
"id": STATIC_ID,
"jsonrpc": JSONRPC_VERSION,
"method": ENGINE_CONSENSUS_VALIDATED,
"params": [{
"blockHash": HASH_01,
"status": "INVALID",
}]
}),
)
.await;
}
#[tokio::test]
async fn forkchoice_updated_request() {
Tester::new()
.assert_request_equals(
|client| async move {
let _ = client
.forkchoice_updated(Hash256::repeat_byte(0), Hash256::repeat_byte(1))
.await;
},
json!({
"id": STATIC_ID,
"jsonrpc": JSONRPC_VERSION,
"method": ENGINE_FORKCHOICE_UPDATED,
"params": [{
"headBlockHash": HASH_00,
"finalizedBlockHash": HASH_01,
}]
}),
)
.await;
}
}

View File

@@ -0,0 +1,239 @@
//! Provides generic behaviour for multiple execution engines, specifically fallback behaviour.
use crate::engine_api::{EngineApi, Error as EngineApiError};
use futures::future::join_all;
use slog::{crit, error, info, warn, Logger};
use std::future::Future;
use tokio::sync::RwLock;
/// Stores the remembered state of a engine.
#[derive(Copy, Clone, PartialEq)]
enum EngineState {
Online,
Offline,
}
impl EngineState {
fn set_online(&mut self) {
*self = EngineState::Online
}
fn set_offline(&mut self) {
*self = EngineState::Offline
}
fn is_online(&self) -> bool {
*self == EngineState::Online
}
fn is_offline(&self) -> bool {
*self == EngineState::Offline
}
}
/// An execution engine.
pub struct Engine<T> {
pub id: String,
pub api: T,
state: RwLock<EngineState>,
}
impl<T> Engine<T> {
/// Creates a new, offline engine.
pub fn new(id: String, api: T) -> Self {
Self {
id,
api,
state: RwLock::new(EngineState::Offline),
}
}
}
/// Holds multiple execution engines and provides functionality for managing them in a fallback
/// manner.
pub struct Engines<T> {
pub engines: Vec<Engine<T>>,
pub log: Logger,
}
#[derive(Debug)]
pub enum EngineError {
Offline { id: String },
Api { id: String, error: EngineApiError },
}
impl<T: EngineApi> Engines<T> {
/// Run the `EngineApi::upcheck` function on all nodes which are currently offline.
///
/// This can be used to try and recover any offline nodes.
async fn upcheck_offline(&self) {
let upcheck_futures = self.engines.iter().map(|engine| async move {
let mut state = engine.state.write().await;
if state.is_offline() {
match engine.api.upcheck().await {
Ok(()) => {
info!(
self.log,
"Execution engine online";
"id" => &engine.id
);
state.set_online()
}
Err(e) => {
warn!(
self.log,
"Execution engine offline";
"error" => ?e,
"id" => &engine.id
)
}
}
}
*state
});
let num_online = join_all(upcheck_futures)
.await
.into_iter()
.filter(|state: &EngineState| state.is_online())
.count();
if num_online == 0 {
crit!(
self.log,
"No execution engines online";
)
}
}
/// Run `func` on all engines, in the order in which they are defined, returning the first
/// successful result that is found.
///
/// This function might try to run `func` twice. If all nodes return an error on the first time
/// it runs, it will try to upcheck all offline nodes and then run the function again.
pub async fn first_success<'a, F, G, H>(&'a self, func: F) -> Result<H, Vec<EngineError>>
where
F: Fn(&'a Engine<T>) -> G + Copy,
G: Future<Output = Result<H, EngineApiError>>,
{
match self.first_success_without_retry(func).await {
Ok(result) => Ok(result),
Err(mut first_errors) => {
// Try to recover some nodes.
self.upcheck_offline().await;
// Retry the call on all nodes.
match self.first_success_without_retry(func).await {
Ok(result) => Ok(result),
Err(second_errors) => {
first_errors.extend(second_errors);
Err(first_errors)
}
}
}
}
}
/// Run `func` on all engines, in the order in which they are defined, returning the first
/// successful result that is found.
async fn first_success_without_retry<'a, F, G, H>(
&'a self,
func: F,
) -> Result<H, Vec<EngineError>>
where
F: Fn(&'a Engine<T>) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let mut errors = vec![];
for engine in &self.engines {
let engine_online = engine.state.read().await.is_online();
if engine_online {
match func(engine).await {
Ok(result) => return Ok(result),
Err(error) => {
error!(
self.log,
"Execution engine call failed";
"error" => ?error,
"id" => &engine.id
);
engine.state.write().await.set_offline();
errors.push(EngineError::Api {
id: engine.id.clone(),
error,
})
}
}
} else {
errors.push(EngineError::Offline {
id: engine.id.clone(),
})
}
}
Err(errors)
}
/// Runs `func` on all nodes concurrently, returning all results.
///
/// This function might try to run `func` twice. If all nodes return an error on the first time
/// it runs, it will try to upcheck all offline nodes and then run the function again.
pub async fn broadcast<'a, F, G, H>(&'a self, func: F) -> Vec<Result<H, EngineError>>
where
F: Fn(&'a Engine<T>) -> G + Copy,
G: Future<Output = Result<H, EngineApiError>>,
{
let first_results = self.broadcast_without_retry(func).await;
let mut any_offline = false;
for result in &first_results {
match result {
Ok(_) => return first_results,
Err(EngineError::Offline { .. }) => any_offline = true,
_ => (),
}
}
if any_offline {
self.upcheck_offline().await;
self.broadcast_without_retry(func).await
} else {
first_results
}
}
/// Runs `func` on all nodes concurrently, returning all results.
pub async fn broadcast_without_retry<'a, F, G, H>(
&'a self,
func: F,
) -> Vec<Result<H, EngineError>>
where
F: Fn(&'a Engine<T>) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let func = &func;
let futures = self.engines.iter().map(|engine| async move {
let engine_online = engine.state.read().await.is_online();
if engine_online {
func(engine).await.map_err(|error| {
error!(
self.log,
"Execution engine call failed";
"error" => ?error,
"id" => &engine.id
);
EngineError::Api {
id: engine.id.clone(),
error,
}
})
} else {
Err(EngineError::Offline {
id: engine.id.clone(),
})
}
});
join_all(futures).await
}
}

View File

@@ -0,0 +1,103 @@
use crate::{ConsensusStatus, ExecutionLayer};
use slog::{crit, error, Logger};
use types::Hash256;
/// Provides a "handle" which should be returned after an `engine_executePayload` call.
///
/// This handle allows the holder to send a valid or invalid message to the execution nodes to
/// indicate the consensus verification status of `self.block_hash`.
///
/// Most notably, this `handle` will send an "invalid" message when it is dropped unless it has
/// already sent a "valid" or "invalid" message. This is to help ensure that any accidental
/// dropping of this handle results in an "invalid" message. Such dropping would be expected when a
/// block verification returns early with an error.
pub struct ExecutePayloadHandle {
pub(crate) block_hash: Hash256,
pub(crate) execution_layer: Option<ExecutionLayer>,
pub(crate) log: Logger,
}
impl ExecutePayloadHandle {
/// Publish a "valid" message to all nodes for `self.block_hash`.
pub fn publish_consensus_valid(mut self) {
self.publish_blocking(ConsensusStatus::Valid)
}
/// Publish an "invalid" message to all nodes for `self.block_hash`.
pub fn publish_consensus_invalid(mut self) {
self.publish_blocking(ConsensusStatus::Invalid)
}
/// Publish the `status` message to all nodes for `self.block_hash`.
pub async fn publish_async(&mut self, status: ConsensusStatus) {
if let Some(execution_layer) = self.execution_layer() {
publish(&execution_layer, self.block_hash, status, &self.log).await
}
}
/// Publishes a message, suitable for running in a non-async context.
fn publish_blocking(&mut self, status: ConsensusStatus) {
if let Some(execution_layer) = self.execution_layer() {
let log = &self.log.clone();
let block_hash = self.block_hash;
if let Err(e) = execution_layer.block_on(|execution_layer| async move {
publish(execution_layer, block_hash, status, log).await;
Ok(())
}) {
error!(
self.log,
"Failed to spawn payload status task";
"error" => ?e,
"block_hash" => ?block_hash,
"status" => ?status,
);
}
}
}
/// Takes `self.execution_layer`, it cannot be used to send another duplicate or conflicting
/// message. Creates a log message if such an attempt is made.
fn execution_layer(&mut self) -> Option<ExecutionLayer> {
let execution_layer = self.execution_layer.take();
if execution_layer.is_none() {
crit!(
self.log,
"Double usage of ExecutePayloadHandle";
"block_hash" => ?self.block_hash,
);
}
execution_layer
}
}
/// Publish a `status`, creating a log message if it fails.
async fn publish(
execution_layer: &ExecutionLayer,
block_hash: Hash256,
status: ConsensusStatus,
log: &Logger,
) {
if let Err(e) = execution_layer
.consensus_validated(block_hash, status)
.await
{
// TODO(paul): consider how to recover when we are temporarily unable to tell a node
// that the block was valid.
crit!(
log,
"Failed to update execution consensus status";
"error" => ?e,
"block_hash" => ?block_hash,
"status" => ?status,
);
}
}
/// See the struct-level documentation for the reasoning for this `Drop` implementation.
impl Drop for ExecutePayloadHandle {
fn drop(&mut self) {
if self.execution_layer.is_some() {
self.publish_blocking(ConsensusStatus::Invalid)
}
}
}

View File

@@ -0,0 +1,799 @@
//! This crate provides an abstraction over one or more *execution engines*. An execution engine
//! was formerly known as an "eth1 node", like Geth, Nethermind, Erigon, etc.
//!
//! This crate only provides useful functionality for "The Merge", it does not provide any of the
//! deposit-contract functionality that the `beacon_node/eth1` crate already provides.
use engine_api::{Error as ApiError, *};
use engines::{Engine, EngineError, Engines};
use lru::LruCache;
use sensitive_url::SensitiveUrl;
use slog::{crit, Logger};
use std::future::Future;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::sync::{Mutex, MutexGuard};
pub use engine_api::{http::HttpJsonRpc, ConsensusStatus, ExecutePayloadResponse};
pub use execute_payload_handle::ExecutePayloadHandle;
mod engine_api;
mod engines;
mod execute_payload_handle;
pub mod test_utils;
/// Each time the `ExecutionLayer` retrieves a block from an execution node, it stores that block
/// in an LRU cache to avoid redundant lookups. This is the size of that cache.
const EXECUTION_BLOCKS_LRU_CACHE_SIZE: usize = 128;
#[derive(Debug)]
pub enum Error {
NoEngines,
ApiError(ApiError),
EngineErrors(Vec<EngineError>),
NotSynced,
ShuttingDown,
FeeRecipientUnspecified,
}
impl From<ApiError> for Error {
fn from(e: ApiError) -> Self {
Error::ApiError(e)
}
}
struct Inner {
engines: Engines<HttpJsonRpc>,
terminal_total_difficulty: Uint256,
terminal_block_hash: Hash256,
fee_recipient: Option<Address>,
execution_blocks: Mutex<LruCache<Hash256, ExecutionBlock>>,
executor: TaskExecutor,
log: Logger,
}
/// Provides access to one or more execution engines and provides a neat interface for consumption
/// by the `BeaconChain`.
///
/// When there is more than one execution node specified, the others will be used in a "fallback"
/// fashion. Some requests may be broadcast to all nodes and others might only be sent to the first
/// node that returns a valid response. Ultimately, the purpose of fallback nodes is to provide
/// redundancy in the case where one node is offline.
///
/// The fallback nodes have an ordering. The first supplied will be the first contacted, and so on.
#[derive(Clone)]
pub struct ExecutionLayer {
inner: Arc<Inner>,
}
impl ExecutionLayer {
/// Instantiate `Self` with `urls.len()` engines, all using the JSON-RPC via HTTP.
pub fn from_urls(
urls: Vec<SensitiveUrl>,
terminal_total_difficulty: Uint256,
terminal_block_hash: Hash256,
fee_recipient: Option<Address>,
executor: TaskExecutor,
log: Logger,
) -> Result<Self, Error> {
if urls.is_empty() {
return Err(Error::NoEngines);
}
let engines = urls
.into_iter()
.map(|url| {
let id = url.to_string();
let api = HttpJsonRpc::new(url)?;
Ok(Engine::new(id, api))
})
.collect::<Result<_, ApiError>>()?;
let inner = Inner {
engines: Engines {
engines,
log: log.clone(),
},
terminal_total_difficulty,
terminal_block_hash,
fee_recipient,
execution_blocks: Mutex::new(LruCache::new(EXECUTION_BLOCKS_LRU_CACHE_SIZE)),
executor,
log,
};
Ok(Self {
inner: Arc::new(inner),
})
}
}
impl ExecutionLayer {
fn engines(&self) -> &Engines<HttpJsonRpc> {
&self.inner.engines
}
fn executor(&self) -> &TaskExecutor {
&self.inner.executor
}
fn terminal_total_difficulty(&self) -> Uint256 {
self.inner.terminal_total_difficulty
}
fn terminal_block_hash(&self) -> Hash256 {
self.inner.terminal_block_hash
}
fn fee_recipient(&self) -> Result<Address, Error> {
self.inner
.fee_recipient
.ok_or(Error::FeeRecipientUnspecified)
}
/// Note: this function returns a mutex guard, be careful to avoid deadlocks.
async fn execution_blocks(&self) -> MutexGuard<'_, LruCache<Hash256, ExecutionBlock>> {
self.inner.execution_blocks.lock().await
}
fn log(&self) -> &Logger {
&self.inner.log
}
/// Convenience function to allow calling async functions in a non-async context.
pub fn block_on<'a, T, U, V>(&'a self, generate_future: T) -> Result<V, Error>
where
T: Fn(&'a Self) -> U,
U: Future<Output = Result<V, Error>>,
{
let runtime = self
.executor()
.runtime()
.upgrade()
.ok_or(Error::ShuttingDown)?;
// TODO(paul): respect the shutdown signal.
runtime.block_on(generate_future(self))
}
/// Convenience function to allow spawning a task without waiting for the result.
pub fn spawn<T, U>(&self, generate_future: T, name: &'static str)
where
T: FnOnce(Self) -> U,
U: Future<Output = ()> + Send + 'static,
{
self.executor().spawn(generate_future(self.clone()), name);
}
/// Maps to the `engine_preparePayload` JSON-RPC function.
///
/// ## Fallback Behavior
///
/// The result will be returned from the first node that returns successfully. No more nodes
/// will be contacted.
pub async fn prepare_payload(
&self,
parent_hash: Hash256,
timestamp: u64,
random: Hash256,
) -> Result<PayloadId, Error> {
let fee_recipient = self.fee_recipient()?;
self.engines()
.first_success(|engine| {
// TODO(merge): make a cache for these IDs, so we don't always have to perform this
// request.
engine
.api
.prepare_payload(parent_hash, timestamp, random, fee_recipient)
})
.await
.map_err(Error::EngineErrors)
}
/// Maps to the `engine_getPayload` JSON-RPC call.
///
/// However, it will attempt to call `self.prepare_payload` if it cannot find an existing
/// payload id for the given parameters.
///
/// ## Fallback Behavior
///
/// The result will be returned from the first node that returns successfully. No more nodes
/// will be contacted.
pub async fn get_payload<T: EthSpec>(
&self,
parent_hash: Hash256,
timestamp: u64,
random: Hash256,
) -> Result<ExecutionPayload<T>, Error> {
let fee_recipient = self.fee_recipient()?;
self.engines()
.first_success(|engine| async move {
// TODO(merge): make a cache for these IDs, so we don't always have to perform this
// request.
let payload_id = engine
.api
.prepare_payload(parent_hash, timestamp, random, fee_recipient)
.await?;
engine.api.get_payload(payload_id).await
})
.await
.map_err(Error::EngineErrors)
}
/// Maps to the `engine_executePayload` JSON-RPC call.
///
/// ## Fallback Behaviour
///
/// The request will be broadcast to all nodes, simultaneously. It will await a response (or
/// failure) from all nodes and then return based on the first of these conditions which
/// returns true:
///
/// - Valid, if any nodes return valid.
/// - Invalid, if any nodes return invalid.
/// - Syncing, if any nodes return syncing.
/// - An error, if all nodes return an error.
pub async fn execute_payload<T: EthSpec>(
&self,
execution_payload: &ExecutionPayload<T>,
) -> Result<(ExecutePayloadResponse, ExecutePayloadHandle), Error> {
let broadcast_results = self
.engines()
.broadcast(|engine| engine.api.execute_payload(execution_payload.clone()))
.await;
let mut errors = vec![];
let mut valid = 0;
let mut invalid = 0;
let mut syncing = 0;
for result in broadcast_results {
match result {
Ok(ExecutePayloadResponse::Valid) => valid += 1,
Ok(ExecutePayloadResponse::Invalid) => invalid += 1,
Ok(ExecutePayloadResponse::Syncing) => syncing += 1,
Err(e) => errors.push(e),
}
}
if valid > 0 && invalid > 0 {
crit!(
self.log(),
"Consensus failure between execution nodes";
"method" => "execute_payload"
);
}
let execute_payload_response = if valid > 0 {
ExecutePayloadResponse::Valid
} else if invalid > 0 {
ExecutePayloadResponse::Invalid
} else if syncing > 0 {
ExecutePayloadResponse::Syncing
} else {
return Err(Error::EngineErrors(errors));
};
let execute_payload_handle = ExecutePayloadHandle {
block_hash: execution_payload.block_hash,
execution_layer: Some(self.clone()),
log: self.log().clone(),
};
Ok((execute_payload_response, execute_payload_handle))
}
/// Maps to the `engine_consensusValidated` JSON-RPC call.
///
/// ## Fallback Behaviour
///
/// The request will be broadcast to all nodes, simultaneously. It will await a response (or
/// failure) from all nodes and then return based on the first of these conditions which
/// returns true:
///
/// - Ok, if any node returns successfully.
/// - An error, if all nodes return an error.
pub async fn consensus_validated(
&self,
block_hash: Hash256,
status: ConsensusStatus,
) -> Result<(), Error> {
let broadcast_results = self
.engines()
.broadcast(|engine| engine.api.consensus_validated(block_hash, status))
.await;
if broadcast_results.iter().any(Result::is_ok) {
Ok(())
} else {
Err(Error::EngineErrors(
broadcast_results
.into_iter()
.filter_map(Result::err)
.collect(),
))
}
}
/// Maps to the `engine_consensusValidated` JSON-RPC call.
///
/// ## Fallback Behaviour
///
/// The request will be broadcast to all nodes, simultaneously. It will await a response (or
/// failure) from all nodes and then return based on the first of these conditions which
/// returns true:
///
/// - Ok, if any node returns successfully.
/// - An error, if all nodes return an error.
pub async fn forkchoice_updated(
&self,
head_block_hash: Hash256,
finalized_block_hash: Hash256,
) -> Result<(), Error> {
let broadcast_results = self
.engines()
.broadcast(|engine| {
engine
.api
.forkchoice_updated(head_block_hash, finalized_block_hash)
})
.await;
if broadcast_results.iter().any(Result::is_ok) {
Ok(())
} else {
Err(Error::EngineErrors(
broadcast_results
.into_iter()
.filter_map(Result::err)
.collect(),
))
}
}
/// Used during block production to determine if the merge has been triggered.
///
/// ## Specification
///
/// `get_terminal_pow_block_hash`
///
/// https://github.com/ethereum/consensus-specs/blob/v1.1.0/specs/merge/validator.md
pub async fn get_terminal_pow_block_hash(&self) -> Result<Option<Hash256>, Error> {
self.engines()
.first_success(|engine| async move {
if self.terminal_block_hash() != Hash256::zero() {
// Note: the specification is written such that if there are multiple blocks in
// the PoW chain with the terminal block hash, then to select 0'th one.
//
// Whilst it's not clear what the 0'th block is, we ignore this completely and
// make the assumption that there are no two blocks in the chain with the same
// hash. Such a scenario would be a devestating hash collision with external
// implications far outweighing those here.
Ok(self
.get_pow_block(engine, self.terminal_block_hash())
.await?
.map(|block| block.block_hash))
} else {
self.get_pow_block_hash_at_total_difficulty(engine).await
}
})
.await
.map_err(Error::EngineErrors)
}
/// This function should remain internal. External users should use
/// `self.get_terminal_pow_block` instead, since it checks against the terminal block hash
/// override.
///
/// ## Specification
///
/// `get_pow_block_at_terminal_total_difficulty`
///
/// https://github.com/ethereum/consensus-specs/blob/v1.1.0/specs/merge/validator.md
async fn get_pow_block_hash_at_total_difficulty(
&self,
engine: &Engine<HttpJsonRpc>,
) -> Result<Option<Hash256>, ApiError> {
let mut ttd_exceeding_block = None;
let mut block = engine
.api
.get_block_by_number(BlockByNumberQuery::Tag(LATEST_TAG))
.await?
.ok_or(ApiError::ExecutionHeadBlockNotFound)?;
self.execution_blocks().await.put(block.block_hash, block);
// TODO(merge): This function can theoretically loop indefinitely, as per the
// specification. We should consider how to fix this. See discussion:
//
// https://github.com/ethereum/consensus-specs/issues/2636
loop {
if block.total_difficulty >= self.terminal_total_difficulty() {
ttd_exceeding_block = Some(block.block_hash);
// Try to prevent infinite loops.
if block.block_hash == block.parent_hash {
return Err(ApiError::ParentHashEqualsBlockHash(block.block_hash));
}
block = self
.get_pow_block(engine, block.parent_hash)
.await?
.ok_or(ApiError::ExecutionBlockNotFound(block.parent_hash))?;
} else {
return Ok(ttd_exceeding_block);
}
}
}
/// Used during block verification to check that a block correctly triggers the merge.
///
/// ## Returns
///
/// - `Some(true)` if the given `block_hash` is the terminal proof-of-work block.
/// - `Some(false)` if the given `block_hash` is certainly *not* the terminal proof-of-work
/// block.
/// - `None` if the `block_hash` or its parent were not present on the execution engines.
/// - `Err(_)` if there was an error connecting to the execution engines.
///
/// ## Fallback Behaviour
///
/// The request will be broadcast to all nodes, simultaneously. It will await a response (or
/// failure) from all nodes and then return based on the first of these conditions which
/// returns true:
///
/// - Terminal, if any node indicates it is terminal.
/// - Not terminal, if any node indicates it is non-terminal.
/// - Block not found, if any node cannot find the block.
/// - An error, if all nodes return an error.
///
/// ## Specification
///
/// `is_valid_terminal_pow_block`
///
/// https://github.com/ethereum/consensus-specs/blob/v1.1.0/specs/merge/fork-choice.md
pub async fn is_valid_terminal_pow_block_hash(
&self,
block_hash: Hash256,
) -> Result<Option<bool>, Error> {
let broadcast_results = self
.engines()
.broadcast(|engine| async move {
if let Some(pow_block) = self.get_pow_block(engine, block_hash).await? {
if let Some(pow_parent) =
self.get_pow_block(engine, pow_block.parent_hash).await?
{
return Ok(Some(
self.is_valid_terminal_pow_block(pow_block, pow_parent),
));
}
}
Ok(None)
})
.await;
let mut errors = vec![];
let mut terminal = 0;
let mut not_terminal = 0;
let mut block_missing = 0;
for result in broadcast_results {
match result {
Ok(Some(true)) => terminal += 1,
Ok(Some(false)) => not_terminal += 1,
Ok(None) => block_missing += 1,
Err(e) => errors.push(e),
}
}
if terminal > 0 && not_terminal > 0 {
crit!(
self.log(),
"Consensus failure between execution nodes";
"method" => "is_valid_terminal_pow_block_hash"
);
}
if terminal > 0 {
Ok(Some(true))
} else if not_terminal > 0 {
Ok(Some(false))
} else if block_missing > 0 {
Ok(None)
} else {
Err(Error::EngineErrors(errors))
}
}
/// This function should remain internal.
///
/// External users should use `self.is_valid_terminal_pow_block_hash`.
fn is_valid_terminal_pow_block(&self, block: ExecutionBlock, parent: ExecutionBlock) -> bool {
if block.block_hash == self.terminal_block_hash() {
return true;
}
let is_total_difficulty_reached =
block.total_difficulty >= self.terminal_total_difficulty();
let is_parent_total_difficulty_valid =
parent.total_difficulty < self.terminal_total_difficulty();
is_total_difficulty_reached && is_parent_total_difficulty_valid
}
/// Maps to the `eth_getBlockByHash` JSON-RPC call.
///
/// ## TODO(merge)
///
/// This will return an execution block regardless of whether or not it was created by a PoW
/// miner (pre-merge) or a PoS validator (post-merge). It's not immediately clear if this is
/// correct or not, see the discussion here:
///
/// https://github.com/ethereum/consensus-specs/issues/2636
async fn get_pow_block(
&self,
engine: &Engine<HttpJsonRpc>,
hash: Hash256,
) -> Result<Option<ExecutionBlock>, ApiError> {
if let Some(cached) = self.execution_blocks().await.get(&hash).copied() {
// The block was in the cache, no need to request it from the execution
// engine.
return Ok(Some(cached));
}
// The block was *not* in the cache, request it from the execution
// engine and cache it for future reference.
if let Some(block) = engine.api.get_block_by_hash(hash).await? {
self.execution_blocks().await.put(hash, block);
Ok(Some(block))
} else {
Ok(None)
}
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::test_utils::{MockServer, DEFAULT_TERMINAL_DIFFICULTY};
use environment::null_logger;
use types::MainnetEthSpec;
struct SingleEngineTester {
server: MockServer<MainnetEthSpec>,
el: ExecutionLayer,
runtime: Option<Arc<tokio::runtime::Runtime>>,
_runtime_shutdown: exit_future::Signal,
}
impl SingleEngineTester {
pub fn new() -> Self {
let server = MockServer::unit_testing();
let url = SensitiveUrl::parse(&server.url()).unwrap();
let log = null_logger().unwrap();
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap(),
);
let (runtime_shutdown, exit) = exit_future::signal();
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor =
TaskExecutor::new(Arc::downgrade(&runtime), exit, log.clone(), shutdown_tx);
let el = ExecutionLayer::from_urls(
vec![url],
DEFAULT_TERMINAL_DIFFICULTY.into(),
Hash256::zero(),
Some(Address::repeat_byte(42)),
executor,
log,
)
.unwrap();
Self {
server,
el,
runtime: Some(runtime),
_runtime_shutdown: runtime_shutdown,
}
}
pub async fn produce_valid_execution_payload_on_head(self) -> Self {
let latest_execution_block = {
let block_gen = self.server.execution_block_generator().await;
block_gen.latest_block().unwrap()
};
let parent_hash = latest_execution_block.block_hash();
let block_number = latest_execution_block.block_number() + 1;
let timestamp = block_number;
let random = Hash256::from_low_u64_be(block_number);
let _payload_id = self
.el
.prepare_payload(parent_hash, timestamp, random)
.await
.unwrap();
let payload = self
.el
.get_payload::<MainnetEthSpec>(parent_hash, timestamp, random)
.await
.unwrap();
let block_hash = payload.block_hash;
assert_eq!(payload.parent_hash, parent_hash);
assert_eq!(payload.block_number, block_number);
assert_eq!(payload.timestamp, timestamp);
assert_eq!(payload.random, random);
let (payload_response, mut payload_handle) =
self.el.execute_payload(&payload).await.unwrap();
assert_eq!(payload_response, ExecutePayloadResponse::Valid);
payload_handle.publish_async(ConsensusStatus::Valid).await;
self.el
.forkchoice_updated(block_hash, Hash256::zero())
.await
.unwrap();
let head_execution_block = {
let block_gen = self.server.execution_block_generator().await;
block_gen.latest_block().unwrap()
};
assert_eq!(head_execution_block.block_number(), block_number);
assert_eq!(head_execution_block.block_hash(), block_hash);
assert_eq!(head_execution_block.parent_hash(), parent_hash);
self
}
pub async fn move_to_block_prior_to_terminal_block(self) -> Self {
let target_block = {
let block_gen = self.server.execution_block_generator().await;
block_gen.terminal_block_number.checked_sub(1).unwrap()
};
self.move_to_pow_block(target_block).await
}
pub async fn move_to_terminal_block(self) -> Self {
let target_block = {
let block_gen = self.server.execution_block_generator().await;
block_gen.terminal_block_number
};
self.move_to_pow_block(target_block).await
}
pub async fn move_to_pow_block(self, target_block: u64) -> Self {
{
let mut block_gen = self.server.execution_block_generator().await;
let next_block = block_gen.latest_block().unwrap().block_number() + 1;
assert!(target_block >= next_block);
block_gen
.insert_pow_blocks(next_block..=target_block)
.unwrap();
}
self
}
pub async fn with_terminal_block<'a, T, U>(self, func: T) -> Self
where
T: Fn(ExecutionLayer, Option<ExecutionBlock>) -> U,
U: Future<Output = ()>,
{
let terminal_block_number = self
.server
.execution_block_generator()
.await
.terminal_block_number;
let terminal_block = self
.server
.execution_block_generator()
.await
.execution_block_by_number(terminal_block_number);
func(self.el.clone(), terminal_block).await;
self
}
pub fn shutdown(&mut self) {
if let Some(runtime) = self.runtime.take() {
Arc::try_unwrap(runtime).unwrap().shutdown_background()
}
}
}
impl Drop for SingleEngineTester {
fn drop(&mut self) {
self.shutdown()
}
}
#[tokio::test]
async fn produce_three_valid_pos_execution_blocks() {
SingleEngineTester::new()
.move_to_terminal_block()
.await
.produce_valid_execution_payload_on_head()
.await
.produce_valid_execution_payload_on_head()
.await
.produce_valid_execution_payload_on_head()
.await;
}
#[tokio::test]
async fn finds_valid_terminal_block_hash() {
SingleEngineTester::new()
.move_to_block_prior_to_terminal_block()
.await
.with_terminal_block(|el, _| async move {
assert_eq!(el.get_terminal_pow_block_hash().await.unwrap(), None)
})
.await
.move_to_terminal_block()
.await
.with_terminal_block(|el, terminal_block| async move {
assert_eq!(
el.get_terminal_pow_block_hash().await.unwrap(),
Some(terminal_block.unwrap().block_hash)
)
})
.await;
}
#[tokio::test]
async fn verifies_valid_terminal_block_hash() {
SingleEngineTester::new()
.move_to_terminal_block()
.await
.with_terminal_block(|el, terminal_block| async move {
assert_eq!(
el.is_valid_terminal_pow_block_hash(terminal_block.unwrap().block_hash)
.await
.unwrap(),
Some(true)
)
})
.await;
}
#[tokio::test]
async fn rejects_invalid_terminal_block_hash() {
SingleEngineTester::new()
.move_to_terminal_block()
.await
.with_terminal_block(|el, terminal_block| async move {
let invalid_terminal_block = terminal_block.unwrap().parent_hash;
assert_eq!(
el.is_valid_terminal_pow_block_hash(invalid_terminal_block)
.await
.unwrap(),
Some(false)
)
})
.await;
}
#[tokio::test]
async fn rejects_unknown_terminal_block_hash() {
SingleEngineTester::new()
.move_to_terminal_block()
.await
.with_terminal_block(|el, _| async move {
let missing_terminal_block = Hash256::repeat_byte(42);
assert_eq!(
el.is_valid_terminal_pow_block_hash(missing_terminal_block)
.await
.unwrap(),
None
)
})
.await;
}
}

View File

@@ -0,0 +1,373 @@
use crate::engine_api::{
http::JsonPreparePayloadRequest, ConsensusStatus, ExecutePayloadResponse, ExecutionBlock,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tree_hash::TreeHash;
use tree_hash_derive::TreeHash;
use types::{EthSpec, ExecutionPayload, Hash256, Uint256};
#[derive(Clone, Debug, PartialEq)]
#[allow(clippy::large_enum_variant)] // This struct is only for testing.
pub enum Block<T: EthSpec> {
PoW(PoWBlock),
PoS(ExecutionPayload<T>),
}
impl<T: EthSpec> Block<T> {
pub fn block_number(&self) -> u64 {
match self {
Block::PoW(block) => block.block_number,
Block::PoS(payload) => payload.block_number,
}
}
pub fn parent_hash(&self) -> Hash256 {
match self {
Block::PoW(block) => block.parent_hash,
Block::PoS(payload) => payload.parent_hash,
}
}
pub fn block_hash(&self) -> Hash256 {
match self {
Block::PoW(block) => block.block_hash,
Block::PoS(payload) => payload.block_hash,
}
}
pub fn total_difficulty(&self) -> Option<Uint256> {
match self {
Block::PoW(block) => Some(block.total_difficulty),
Block::PoS(_) => None,
}
}
pub fn as_execution_block(&self, total_difficulty: u64) -> ExecutionBlock {
match self {
Block::PoW(block) => ExecutionBlock {
block_hash: block.block_hash,
block_number: block.block_number,
parent_hash: block.parent_hash,
total_difficulty: block.total_difficulty,
},
Block::PoS(payload) => ExecutionBlock {
block_hash: payload.block_hash,
block_number: payload.block_number,
parent_hash: payload.parent_hash,
total_difficulty: total_difficulty.into(),
},
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, TreeHash)]
#[serde(rename_all = "camelCase")]
pub struct PoWBlock {
pub block_number: u64,
pub block_hash: Hash256,
pub parent_hash: Hash256,
pub total_difficulty: Uint256,
}
pub struct ExecutionBlockGenerator<T: EthSpec> {
/*
* Common database
*/
blocks: HashMap<Hash256, Block<T>>,
block_hashes: HashMap<u64, Hash256>,
/*
* PoW block parameters
*/
pub terminal_total_difficulty: u64,
pub terminal_block_number: u64,
/*
* PoS block parameters
*/
pub pending_payloads: HashMap<Hash256, ExecutionPayload<T>>,
pub next_payload_id: u64,
pub payload_ids: HashMap<u64, ExecutionPayload<T>>,
}
impl<T: EthSpec> ExecutionBlockGenerator<T> {
pub fn new(terminal_total_difficulty: u64, terminal_block_number: u64) -> Self {
let mut gen = Self {
blocks: <_>::default(),
block_hashes: <_>::default(),
terminal_total_difficulty,
terminal_block_number,
pending_payloads: <_>::default(),
next_payload_id: 0,
payload_ids: <_>::default(),
};
gen.insert_pow_block(0).unwrap();
gen
}
pub fn latest_block(&self) -> Option<Block<T>> {
let hash = *self
.block_hashes
.iter()
.max_by_key(|(number, _)| *number)
.map(|(_, hash)| hash)?;
self.block_by_hash(hash)
}
pub fn latest_execution_block(&self) -> Option<ExecutionBlock> {
self.latest_block()
.map(|block| block.as_execution_block(self.terminal_total_difficulty))
}
pub fn block_by_number(&self, number: u64) -> Option<Block<T>> {
let hash = *self.block_hashes.get(&number)?;
self.block_by_hash(hash)
}
pub fn execution_block_by_number(&self, number: u64) -> Option<ExecutionBlock> {
self.block_by_number(number)
.map(|block| block.as_execution_block(self.terminal_total_difficulty))
}
pub fn block_by_hash(&self, hash: Hash256) -> Option<Block<T>> {
self.blocks.get(&hash).cloned()
}
pub fn execution_block_by_hash(&self, hash: Hash256) -> Option<ExecutionBlock> {
self.block_by_hash(hash)
.map(|block| block.as_execution_block(self.terminal_total_difficulty))
}
pub fn insert_pow_blocks(
&mut self,
block_numbers: impl Iterator<Item = u64>,
) -> Result<(), String> {
for i in block_numbers {
self.insert_pow_block(i)?;
}
Ok(())
}
pub fn insert_pow_block(&mut self, block_number: u64) -> Result<(), String> {
if block_number > self.terminal_block_number {
return Err(format!(
"{} is beyond terminal pow block {}",
block_number, self.terminal_block_number
));
}
let parent_hash = if block_number == 0 {
Hash256::zero()
} else if let Some(hash) = self.block_hashes.get(&(block_number - 1)) {
*hash
} else {
return Err(format!(
"parent with block number {} not found",
block_number - 1
));
};
let increment = self
.terminal_total_difficulty
.checked_div(self.terminal_block_number)
.expect("terminal block number must be non-zero");
let total_difficulty = increment
.checked_mul(block_number)
.expect("overflow computing total difficulty")
.into();
let mut block = PoWBlock {
block_number,
block_hash: Hash256::zero(),
parent_hash,
total_difficulty,
};
block.block_hash = block.tree_hash_root();
self.insert_block(Block::PoW(block))
}
pub fn insert_block(&mut self, block: Block<T>) -> Result<(), String> {
if self.blocks.contains_key(&block.block_hash()) {
return Err(format!("{:?} is already known", block.block_hash()));
} else if self.block_hashes.contains_key(&block.block_number()) {
return Err(format!(
"block {} is already known, forking is not supported",
block.block_number()
));
} else if block.parent_hash() != Hash256::zero()
&& !self.blocks.contains_key(&block.parent_hash())
{
return Err(format!("parent block {:?} is unknown", block.parent_hash()));
}
self.block_hashes
.insert(block.block_number(), block.block_hash());
self.blocks.insert(block.block_hash(), block);
Ok(())
}
pub fn prepare_payload(&mut self, payload: JsonPreparePayloadRequest) -> Result<u64, String> {
if !self
.blocks
.iter()
.any(|(_, block)| block.block_number() == self.terminal_block_number)
{
return Err("refusing to create payload id before terminal block".to_string());
}
let parent = self
.blocks
.get(&payload.parent_hash)
.ok_or_else(|| format!("unknown parent block {:?}", payload.parent_hash))?;
let id = self.next_payload_id;
self.next_payload_id += 1;
let mut execution_payload = ExecutionPayload {
parent_hash: payload.parent_hash,
coinbase: payload.fee_recipient,
receipt_root: Hash256::repeat_byte(42),
state_root: Hash256::repeat_byte(43),
logs_bloom: vec![0; 256].into(),
random: payload.random,
block_number: parent.block_number() + 1,
gas_limit: 10,
gas_used: 9,
timestamp: payload.timestamp,
extra_data: "block gen was here".as_bytes().to_vec().into(),
base_fee_per_gas: Hash256::from_low_u64_le(1),
block_hash: Hash256::zero(),
transactions: vec![].into(),
};
execution_payload.block_hash = execution_payload.tree_hash_root();
self.payload_ids.insert(id, execution_payload);
Ok(id)
}
pub fn get_payload(&mut self, id: u64) -> Option<ExecutionPayload<T>> {
self.payload_ids.remove(&id)
}
pub fn execute_payload(&mut self, payload: ExecutionPayload<T>) -> ExecutePayloadResponse {
let parent = if let Some(parent) = self.blocks.get(&payload.parent_hash) {
parent
} else {
return ExecutePayloadResponse::Invalid;
};
if payload.block_number != parent.block_number() + 1 {
return ExecutePayloadResponse::Invalid;
}
self.pending_payloads.insert(payload.block_hash, payload);
ExecutePayloadResponse::Valid
}
pub fn consensus_validated(
&mut self,
block_hash: Hash256,
status: ConsensusStatus,
) -> Result<(), String> {
let payload = self
.pending_payloads
.remove(&block_hash)
.ok_or_else(|| format!("no pending payload for {:?}", block_hash))?;
match status {
ConsensusStatus::Valid => self.insert_block(Block::PoS(payload)),
ConsensusStatus::Invalid => Ok(()),
}
}
pub fn forkchoice_updated(
&mut self,
block_hash: Hash256,
finalized_block_hash: Hash256,
) -> Result<(), String> {
if !self.blocks.contains_key(&block_hash) {
return Err(format!("block hash {:?} unknown", block_hash));
}
if finalized_block_hash != Hash256::zero()
&& !self.blocks.contains_key(&finalized_block_hash)
{
return Err(format!(
"finalized block hash {:?} is unknown",
finalized_block_hash
));
}
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
use types::MainnetEthSpec;
#[test]
fn pow_chain_only() {
const TERMINAL_DIFFICULTY: u64 = 10;
const TERMINAL_BLOCK: u64 = 10;
const DIFFICULTY_INCREMENT: u64 = 1;
let mut generator: ExecutionBlockGenerator<MainnetEthSpec> =
ExecutionBlockGenerator::new(TERMINAL_DIFFICULTY, TERMINAL_BLOCK);
for i in 0..=TERMINAL_BLOCK {
if i > 0 {
generator.insert_pow_block(i).unwrap();
}
/*
* Generate a block, inspect it.
*/
let block = generator.latest_block().unwrap();
assert_eq!(block.block_number(), i);
let expected_parent = i
.checked_sub(1)
.map(|i| generator.block_by_number(i).unwrap().block_hash())
.unwrap_or_else(Hash256::zero);
assert_eq!(block.parent_hash(), expected_parent);
assert_eq!(
block.total_difficulty().unwrap(),
(i * DIFFICULTY_INCREMENT).into()
);
assert_eq!(generator.block_by_hash(block.block_hash()).unwrap(), block);
assert_eq!(generator.block_by_number(i).unwrap(), block);
/*
* Check the parent is accessible.
*/
if let Some(prev_i) = i.checked_sub(1) {
assert_eq!(
generator.block_by_number(prev_i).unwrap(),
generator.block_by_hash(block.parent_hash()).unwrap()
);
}
/*
* Check the next block is inaccessible.
*/
let next_i = i + 1;
assert!(generator.block_by_number(next_i).is_none());
}
}
}

View File

@@ -0,0 +1,125 @@
use super::Context;
use crate::engine_api::http::*;
use serde::de::DeserializeOwned;
use serde_json::Value as JsonValue;
use std::sync::Arc;
use types::EthSpec;
pub async fn handle_rpc<T: EthSpec>(
body: JsonValue,
ctx: Arc<Context<T>>,
) -> Result<JsonValue, String> {
let method = body
.get("method")
.and_then(JsonValue::as_str)
.ok_or_else(|| "missing/invalid method field".to_string())?;
let params = body
.get("params")
.ok_or_else(|| "missing/invalid params field".to_string())?;
match method {
ETH_SYNCING => Ok(JsonValue::Bool(false)),
ETH_GET_BLOCK_BY_NUMBER => {
let tag = params
.get(0)
.and_then(JsonValue::as_str)
.ok_or_else(|| "missing/invalid params[0] value".to_string())?;
match tag {
"latest" => Ok(serde_json::to_value(
ctx.execution_block_generator
.read()
.await
.latest_execution_block(),
)
.unwrap()),
other => Err(format!("The tag {} is not supported", other)),
}
}
ETH_GET_BLOCK_BY_HASH => {
let hash = params
.get(0)
.and_then(JsonValue::as_str)
.ok_or_else(|| "missing/invalid params[0] value".to_string())
.and_then(|s| {
s.parse()
.map_err(|e| format!("unable to parse hash: {:?}", e))
})?;
Ok(serde_json::to_value(
ctx.execution_block_generator
.read()
.await
.execution_block_by_hash(hash),
)
.unwrap())
}
ENGINE_PREPARE_PAYLOAD => {
let request = get_param_0(params)?;
let payload_id = ctx
.execution_block_generator
.write()
.await
.prepare_payload(request)?;
Ok(serde_json::to_value(JsonPayloadId { payload_id }).unwrap())
}
ENGINE_EXECUTE_PAYLOAD => {
let request: JsonExecutionPayload<T> = get_param_0(params)?;
let response = ctx
.execution_block_generator
.write()
.await
.execute_payload(request.into());
Ok(serde_json::to_value(response).unwrap())
}
ENGINE_GET_PAYLOAD => {
let request: JsonPayloadId = get_param_0(params)?;
let id = request.payload_id;
let response = ctx
.execution_block_generator
.write()
.await
.get_payload(id)
.ok_or_else(|| format!("no payload for id {}", id))?;
Ok(serde_json::to_value(JsonExecutionPayload::from(response)).unwrap())
}
ENGINE_CONSENSUS_VALIDATED => {
let request: JsonConsensusValidatedRequest = get_param_0(params)?;
ctx.execution_block_generator
.write()
.await
.consensus_validated(request.block_hash, request.status)?;
Ok(JsonValue::Null)
}
ENGINE_FORKCHOICE_UPDATED => {
let request: JsonForkChoiceUpdatedRequest = get_param_0(params)?;
ctx.execution_block_generator
.write()
.await
.forkchoice_updated(request.head_block_hash, request.finalized_block_hash)?;
Ok(JsonValue::Null)
}
other => Err(format!(
"The method {} does not exist/is not available",
other
)),
}
}
fn get_param_0<T: DeserializeOwned>(params: &JsonValue) -> Result<T, String> {
params
.get(0)
.ok_or_else(|| "missing/invalid params[0] value".to_string())
.and_then(|param| {
serde_json::from_value(param.clone())
.map_err(|e| format!("failed to deserialize param[0]: {:?}", e))
})
}

View File

@@ -0,0 +1,230 @@
//! Provides a mock execution engine HTTP JSON-RPC API for use in testing.
use crate::engine_api::http::JSONRPC_VERSION;
use bytes::Bytes;
use environment::null_logger;
use execution_block_generator::ExecutionBlockGenerator;
use handle_rpc::handle_rpc;
use serde::{Deserialize, Serialize};
use serde_json::json;
use slog::{info, Logger};
use std::future::Future;
use std::marker::PhantomData;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use tokio::sync::{oneshot, RwLock, RwLockWriteGuard};
use types::EthSpec;
use warp::Filter;
pub const DEFAULT_TERMINAL_DIFFICULTY: u64 = 6400;
pub const DEFAULT_TERMINAL_BLOCK: u64 = 64;
mod execution_block_generator;
mod handle_rpc;
pub struct MockServer<T: EthSpec> {
_shutdown_tx: oneshot::Sender<()>,
listen_socket_addr: SocketAddr,
last_echo_request: Arc<RwLock<Option<Bytes>>>,
pub ctx: Arc<Context<T>>,
}
impl<T: EthSpec> MockServer<T> {
pub fn unit_testing() -> Self {
let last_echo_request = Arc::new(RwLock::new(None));
let execution_block_generator =
ExecutionBlockGenerator::new(DEFAULT_TERMINAL_DIFFICULTY, DEFAULT_TERMINAL_BLOCK);
let ctx: Arc<Context<T>> = Arc::new(Context {
config: <_>::default(),
log: null_logger().unwrap(),
last_echo_request: last_echo_request.clone(),
execution_block_generator: RwLock::new(execution_block_generator),
_phantom: PhantomData,
});
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let shutdown_future = async {
// Ignore the result from the channel, shut down regardless.
let _ = shutdown_rx.await;
};
let (listen_socket_addr, server_future) = serve(ctx.clone(), shutdown_future).unwrap();
tokio::spawn(server_future);
Self {
_shutdown_tx: shutdown_tx,
listen_socket_addr,
last_echo_request,
ctx,
}
}
pub async fn execution_block_generator(
&self,
) -> RwLockWriteGuard<'_, ExecutionBlockGenerator<T>> {
self.ctx.execution_block_generator.write().await
}
pub fn url(&self) -> String {
format!(
"http://{}:{}",
self.listen_socket_addr.ip(),
self.listen_socket_addr.port()
)
}
pub async fn last_echo_request(&self) -> Bytes {
self.last_echo_request
.write()
.await
.take()
.expect("last echo request is none")
}
}
#[derive(Debug)]
pub enum Error {
Warp(warp::Error),
Other(String),
}
impl From<warp::Error> for Error {
fn from(e: warp::Error) -> Self {
Error::Warp(e)
}
}
impl From<String> for Error {
fn from(e: String) -> Self {
Error::Other(e)
}
}
#[derive(Debug)]
struct MissingIdField;
impl warp::reject::Reject for MissingIdField {}
/// A wrapper around all the items required to spawn the HTTP server.
///
/// The server will gracefully handle the case where any fields are `None`.
pub struct Context<T: EthSpec> {
pub config: Config,
pub log: Logger,
pub last_echo_request: Arc<RwLock<Option<Bytes>>>,
pub execution_block_generator: RwLock<ExecutionBlockGenerator<T>>,
pub _phantom: PhantomData<T>,
}
/// Configuration for the HTTP server.
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct Config {
pub listen_addr: Ipv4Addr,
pub listen_port: u16,
}
impl Default for Config {
fn default() -> Self {
Self {
listen_addr: Ipv4Addr::new(127, 0, 0, 1),
listen_port: 0,
}
}
}
/// Creates a server that will serve requests using information from `ctx`.
///
/// The server will shut down gracefully when the `shutdown` future resolves.
///
/// ## Returns
///
/// This function will bind the server to the provided address and then return a tuple of:
///
/// - `SocketAddr`: the address that the HTTP server will listen on.
/// - `Future`: the actual server future that will need to be awaited.
///
/// ## Errors
///
/// Returns an error if the server is unable to bind or there is another error during
/// configuration.
pub fn serve<T: EthSpec>(
ctx: Arc<Context<T>>,
shutdown: impl Future<Output = ()> + Send + Sync + 'static,
) -> Result<(SocketAddr, impl Future<Output = ()>), Error> {
let config = &ctx.config;
let log = ctx.log.clone();
let inner_ctx = ctx.clone();
let ctx_filter = warp::any().map(move || inner_ctx.clone());
// `/`
//
// Handles actual JSON-RPC requests.
let root = warp::path::end()
.and(warp::body::json())
.and(ctx_filter.clone())
.and_then(|body: serde_json::Value, ctx: Arc<Context<T>>| async move {
let id = body
.get("id")
.and_then(serde_json::Value::as_u64)
.ok_or_else(|| warp::reject::custom(MissingIdField))?;
let response = match handle_rpc(body, ctx).await {
Ok(result) => json!({
"id": id,
"jsonrpc": JSONRPC_VERSION,
"result": result
}),
Err(message) => json!({
"id": id,
"jsonrpc": JSONRPC_VERSION,
"error": {
"code": -1234, // Junk error code.
"message": message
}
}),
};
Ok::<_, warp::reject::Rejection>(
warp::http::Response::builder()
.status(200)
.body(serde_json::to_string(&response).expect("response must be valid JSON")),
)
});
// `/echo`
//
// Sends the body of the request to `ctx.last_echo_request` so we can inspect requests.
let echo = warp::path("echo")
.and(warp::body::bytes())
.and(ctx_filter)
.and_then(|bytes: Bytes, ctx: Arc<Context<T>>| async move {
*ctx.last_echo_request.write().await = Some(bytes.clone());
Ok::<_, warp::reject::Rejection>(
warp::http::Response::builder().status(200).body(bytes),
)
});
let routes = warp::post()
.and(root.or(echo))
// Add a `Server` header.
.map(|reply| warp::reply::with_header(reply, "Server", "lighthouse-mock-execution-client"));
let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown(
SocketAddrV4::new(config.listen_addr, config.listen_port),
async {
shutdown.await;
},
)?;
info!(
log,
"Metrics HTTP server started";
"listen_address" => listening_socket.to_string(),
);
Ok((listening_socket, server))
}