Implement engine API v1.0.0-alpha.4 (#2810)

* Added ForkchoiceUpdatedV1 & GetPayloadV1

* Added ExecutePayloadV1

* Added new geth test vectors

* Separated Json Object/Serialization Code into file

* Deleted code/tests for Requests Removed from spec

* Finally fixed serialization of null '0x'

* Made Naming of JSON Structs Consistent

* Fix clippy lints

* Remove u64 payload id

* Remove unused serde impls

* Swap to [u8; 8] for payload id

* Tidy

* Adjust some block gen return vals

* Tidy

* Add fallback when payload id is unknown

* Remove comment

Co-authored-by: Mark Mackey <mark@sigmaprime.io>
This commit is contained in:
Paul Hauner
2021-11-15 17:13:38 +11:00
parent cdfd1304a5
commit 47db682d7e
17 changed files with 1271 additions and 915 deletions

View File

@@ -5,7 +5,7 @@
//! deposit-contract functionality that the `beacon_node/eth1` crate already provides.
use engine_api::{Error as ApiError, *};
use engines::{Engine, EngineError, Engines, ForkChoiceHead, Logging};
use engines::{Engine, EngineError, Engines, ForkChoiceState, Logging};
use lru::LruCache;
use sensitive_url::SensitiveUrl;
use slog::{crit, debug, error, info, Logger};
@@ -19,12 +19,10 @@ use tokio::{
time::{sleep, sleep_until, Instant},
};
pub use engine_api::{http::HttpJsonRpc, ConsensusStatus, ExecutePayloadResponse};
pub use execute_payload_handle::ExecutePayloadHandle;
pub use engine_api::{http::HttpJsonRpc, ExecutePayloadResponseStatus};
mod engine_api;
mod engines;
mod execute_payload_handle;
pub mod test_utils;
/// Each time the `ExecutionLayer` retrieves a block from an execution node, it stores that block
@@ -97,7 +95,7 @@ impl ExecutionLayer {
let inner = Inner {
engines: Engines {
engines,
latest_head: <_>::default(),
latest_forkchoice_state: <_>::default(),
log: log.clone(),
},
terminal_total_difficulty,
@@ -236,39 +234,6 @@ impl ExecutionLayer {
self.engines().any_synced().await
}
/// Maps to the `engine_preparePayload` JSON-RPC function.
///
/// ## Fallback Behavior
///
/// The result will be returned from the first node that returns successfully. No more nodes
/// will be contacted.
pub async fn prepare_payload(
&self,
parent_hash: Hash256,
timestamp: u64,
random: Hash256,
) -> Result<PayloadId, Error> {
let fee_recipient = self.fee_recipient()?;
debug!(
self.log(),
"Issuing engine_preparePayload";
"fee_recipient" => ?fee_recipient,
"random" => ?random,
"timestamp" => timestamp,
"parent_hash" => ?parent_hash,
);
self.engines()
.first_success(|engine| {
// TODO(merge): make a cache for these IDs, so we don't always have to perform this
// request.
engine
.api
.prepare_payload(parent_hash, timestamp, random, fee_recipient)
})
.await
.map_err(Error::EngineErrors)
}
/// Maps to the `engine_getPayload` JSON-RPC call.
///
/// However, it will attempt to call `self.prepare_payload` if it cannot find an existing
@@ -283,6 +248,7 @@ impl ExecutionLayer {
parent_hash: Hash256,
timestamp: u64,
random: Hash256,
finalized_block_hash: Hash256,
) -> Result<ExecutionPayload<T>, Error> {
let fee_recipient = self.fee_recipient()?;
debug!(
@@ -295,14 +261,41 @@ impl ExecutionLayer {
);
self.engines()
.first_success(|engine| async move {
// TODO(merge): make a cache for these IDs, so we don't always have to perform this
// request.
let payload_id = engine
.api
.prepare_payload(parent_hash, timestamp, random, fee_recipient)
.await?;
let payload_id = if let Some(id) = engine
.get_payload_id(parent_hash, timestamp, random, fee_recipient)
.await
{
// The payload id has been cached for this engine.
id
} else {
// The payload id has *not* been cached for this engine. Trigger an artificial
// fork choice update to retrieve a payload ID.
//
// TODO(merge): a better algorithm might try to favour a node that already had a
// cached payload id, since a payload that has had more time to produce is
// likely to be more profitable.
let fork_choice_state = ForkChoiceState {
head_block_hash: parent_hash,
safe_block_hash: parent_hash,
finalized_block_hash,
};
let payload_attributes = PayloadAttributes {
timestamp,
random,
fee_recipient,
};
engine.api.get_payload(payload_id).await
engine
.notify_forkchoice_updated(
fork_choice_state,
Some(payload_attributes),
self.log(),
)
.await?
.ok_or(ApiError::PayloadIdUnavailable)?
};
engine.api.get_payload_v1(payload_id).await
})
.await
.map_err(Error::EngineErrors)
@@ -323,7 +316,7 @@ impl ExecutionLayer {
pub async fn execute_payload<T: EthSpec>(
&self,
execution_payload: &ExecutionPayload<T>,
) -> Result<(ExecutePayloadResponse, Option<ExecutePayloadHandle>), Error> {
) -> Result<(ExecutePayloadResponseStatus, Option<Hash256>), Error> {
debug!(
self.log(),
"Issuing engine_executePayload";
@@ -334,18 +327,46 @@ impl ExecutionLayer {
let broadcast_results = self
.engines()
.broadcast(|engine| engine.api.execute_payload(execution_payload.clone()))
.broadcast(|engine| engine.api.execute_payload_v1(execution_payload.clone()))
.await;
let mut errors = vec![];
let mut valid = 0;
let mut invalid = 0;
let mut syncing = 0;
let mut invalid_latest_valid_hash = vec![];
for result in broadcast_results {
match result {
Ok(ExecutePayloadResponse::Valid) => valid += 1,
Ok(ExecutePayloadResponse::Invalid) => invalid += 1,
Ok(ExecutePayloadResponse::Syncing) => syncing += 1,
match result.map(|response| (response.latest_valid_hash, response.status)) {
Ok((Some(latest_hash), ExecutePayloadResponseStatus::Valid)) => {
if latest_hash == execution_payload.block_hash {
valid += 1;
} else {
invalid += 1;
errors.push(EngineError::Api {
id: "unknown".to_string(),
error: engine_api::Error::BadResponse(
format!(
"execute_payload: response.status = Valid but invalid latest_valid_hash. Expected({:?}) Found({:?})",
execution_payload.block_hash,
latest_hash,
)
),
});
invalid_latest_valid_hash.push(latest_hash);
}
}
Ok((Some(latest_hash), ExecutePayloadResponseStatus::Invalid)) => {
invalid += 1;
invalid_latest_valid_hash.push(latest_hash);
}
Ok((_, ExecutePayloadResponseStatus::Syncing)) => syncing += 1,
Ok((None, status)) => errors.push(EngineError::Api {
id: "unknown".to_string(),
error: engine_api::Error::BadResponse(format!(
"execute_payload: status {:?} returned with null latest_valid_hash",
status
)),
}),
Err(e) => errors.push(e),
}
}
@@ -359,16 +380,14 @@ impl ExecutionLayer {
}
if valid > 0 {
let handle = ExecutePayloadHandle {
block_hash: execution_payload.block_hash,
execution_layer: Some(self.clone()),
log: self.log().clone(),
};
Ok((ExecutePayloadResponse::Valid, Some(handle)))
Ok((
ExecutePayloadResponseStatus::Valid,
Some(execution_payload.block_hash),
))
} else if invalid > 0 {
Ok((ExecutePayloadResponse::Invalid, None))
Ok((ExecutePayloadResponseStatus::Invalid, None))
} else if syncing > 0 {
Ok((ExecutePayloadResponse::Syncing, None))
Ok((ExecutePayloadResponseStatus::Syncing, None))
} else {
Err(Error::EngineErrors(errors))
}
@@ -384,48 +403,11 @@ impl ExecutionLayer {
///
/// - Ok, if any node returns successfully.
/// - An error, if all nodes return an error.
pub async fn consensus_validated(
&self,
block_hash: Hash256,
status: ConsensusStatus,
) -> Result<(), Error> {
debug!(
self.log(),
"Issuing engine_consensusValidated";
"status" => ?status,
"block_hash" => ?block_hash,
);
let broadcast_results = self
.engines()
.broadcast(|engine| engine.api.consensus_validated(block_hash, status))
.await;
if broadcast_results.iter().any(Result::is_ok) {
Ok(())
} else {
Err(Error::EngineErrors(
broadcast_results
.into_iter()
.filter_map(Result::err)
.collect(),
))
}
}
/// Maps to the `engine_consensusValidated` JSON-RPC call.
///
/// ## Fallback Behaviour
///
/// The request will be broadcast to all nodes, simultaneously. It will await a response (or
/// failure) from all nodes and then return based on the first of these conditions which
/// returns true:
///
/// - Ok, if any node returns successfully.
/// - An error, if all nodes return an error.
pub async fn forkchoice_updated(
pub async fn notify_forkchoice_updated(
&self,
head_block_hash: Hash256,
finalized_block_hash: Hash256,
payload_attributes: Option<PayloadAttributes>,
) -> Result<(), Error> {
debug!(
self.log(),
@@ -434,33 +416,35 @@ impl ExecutionLayer {
"head_block_hash" => ?head_block_hash,
);
// Update the cached version of the latest head so it can be sent to new or reconnecting
// execution nodes.
// see https://hackmd.io/@n0ble/kintsugi-spec#Engine-API
// for now, we must set safe_block_hash = head_block_hash
let forkchoice_state = ForkChoiceState {
head_block_hash,
safe_block_hash: head_block_hash,
finalized_block_hash,
};
self.engines()
.set_latest_head(ForkChoiceHead {
head_block_hash,
finalized_block_hash,
})
.set_latest_forkchoice_state(forkchoice_state)
.await;
let broadcast_results = self
.engines()
.broadcast(|engine| {
.broadcast(|engine| async move {
engine
.api
.forkchoice_updated(head_block_hash, finalized_block_hash)
.notify_forkchoice_updated(forkchoice_state, payload_attributes, self.log())
.await
})
.await;
if broadcast_results.iter().any(Result::is_ok) {
Ok(())
} else {
Err(Error::EngineErrors(
broadcast_results
.into_iter()
.filter_map(Result::err)
.collect(),
))
let errors = broadcast_results
.into_iter()
.filter_map(Result::err)
.collect();
Err(Error::EngineErrors(errors))
}
}