Kiln mev boost (#3062)

## Issue Addressed

MEV boost compatibility

## Proposed Changes

See #2987

## Additional Info

This is blocked on the stabilization of a couple specs, [here](https://github.com/ethereum/beacon-APIs/pull/194) and [here](https://github.com/flashbots/mev-boost/pull/20).

Additional TODO's and outstanding questions

- [ ] MEV boost JWT Auth
- [ ] Will `builder_proposeBlindedBlock` return the revealed payload for the BN to propogate
- [ ] Should we remove `private-tx-proposals` flag and communicate BN <> VC with blinded blocks by default once these endpoints enter the beacon-API's repo? This simplifies merge transition logic. 

Co-authored-by: realbigsean <seananderson33@gmail.com>
Co-authored-by: realbigsean <sean@sigmaprime.io>
This commit is contained in:
realbigsean
2022-03-31 07:52:23 +00:00
parent 83234ee4ce
commit ea783360d3
48 changed files with 1628 additions and 644 deletions

View File

@@ -1,18 +1,21 @@
use crate::engines::ForkChoiceState;
use async_trait::async_trait;
use eth1::http::RpcError;
pub use json_structures::TransitionConfigurationV1;
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
pub const LATEST_TAG: &str = "latest";
use crate::engines::ForkChoiceState;
pub use json_structures::TransitionConfigurationV1;
pub use types::{Address, EthSpec, ExecutionBlockHash, ExecutionPayload, Hash256, Uint256};
use slog::Logger;
pub use types::{
Address, EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadHeader, Hash256,
Uint256,
};
pub mod auth;
pub mod http;
pub mod json_structures;
pub const LATEST_TAG: &str = "latest";
pub type PayloadId = [u8; 8];
#[derive(Debug)]
@@ -24,7 +27,10 @@ pub enum Error {
InvalidExecutePayloadResponse(&'static str),
JsonRpc(RpcError),
Json(serde_json::Error),
ServerMessage { code: i64, message: String },
ServerMessage {
code: i64,
message: String,
},
Eip155Failure,
IsSyncing,
ExecutionBlockNotFound(ExecutionBlockHash),
@@ -32,6 +38,14 @@ pub enum Error {
ParentHashEqualsBlockHash(ExecutionBlockHash),
PayloadIdUnavailable,
TransitionConfigurationMismatch,
PayloadConversionLogicFlaw,
InvalidBuilderQuery,
MissingPayloadId {
parent_hash: ExecutionBlockHash,
timestamp: u64,
prev_randao: Hash256,
suggested_fee_recipient: Address,
},
}
impl From<reqwest::Error> for Error {
@@ -59,41 +73,17 @@ impl From<auth::Error> for Error {
}
}
/// A generic interface for an execution engine API.
pub struct EngineApi;
pub struct BuilderApi;
#[async_trait]
pub trait EngineApi {
async fn upcheck(&self) -> Result<(), Error>;
async fn get_block_by_number<'a>(
&self,
block_by_number: BlockByNumberQuery<'a>,
) -> Result<Option<ExecutionBlock>, Error>;
async fn get_block_by_hash<'a>(
&self,
block_hash: ExecutionBlockHash,
) -> Result<Option<ExecutionBlock>, Error>;
async fn new_payload_v1<T: EthSpec>(
&self,
execution_payload: ExecutionPayload<T>,
) -> Result<PayloadStatusV1, Error>;
async fn get_payload_v1<T: EthSpec>(
&self,
payload_id: PayloadId,
) -> Result<ExecutionPayload<T>, Error>;
async fn forkchoice_updated_v1(
pub trait Builder {
async fn notify_forkchoice_updated(
&self,
forkchoice_state: ForkChoiceState,
payload_attributes: Option<PayloadAttributes>,
log: &Logger,
) -> Result<ForkchoiceUpdatedResponse, Error>;
async fn exchange_transition_configuration_v1(
&self,
transition_configuration: TransitionConfigurationV1,
) -> Result<TransitionConfigurationV1, Error>;
}
#[derive(Clone, Copy, Debug, PartialEq)]
@@ -142,3 +132,17 @@ pub struct ForkchoiceUpdatedResponse {
pub payload_status: PayloadStatusV1,
pub payload_id: Option<PayloadId>,
}
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ProposeBlindedBlockResponseStatus {
Valid,
Invalid,
Syncing,
}
#[derive(Clone, Debug, PartialEq)]
pub struct ProposeBlindedBlockResponse {
pub status: ProposeBlindedBlockResponseStatus,
pub latest_valid_hash: Option<Hash256>,
pub validation_error: Option<String>,
}

View File

@@ -3,14 +3,14 @@
use super::*;
use crate::auth::Auth;
use crate::json_structures::*;
use async_trait::async_trait;
use eth1::http::EIP155_ERROR_STR;
use reqwest::header::CONTENT_TYPE;
use sensitive_url::SensitiveUrl;
use serde::de::DeserializeOwned;
use serde_json::json;
use std::marker::PhantomData;
use std::time::Duration;
use types::EthSpec;
use types::{BlindedPayload, EthSpec, ExecutionPayloadHeader, SignedBeaconBlock};
pub use reqwest::Client;
@@ -42,18 +42,26 @@ pub const ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1: &str =
pub const ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1_TIMEOUT: Duration =
Duration::from_millis(500);
pub struct HttpJsonRpc {
pub const BUILDER_GET_PAYLOAD_HEADER_V1: &str = "builder_getPayloadHeaderV1";
pub const BUILDER_GET_PAYLOAD_HEADER_TIMEOUT: Duration = Duration::from_secs(2);
pub const BUILDER_PROPOSE_BLINDED_BLOCK_V1: &str = "builder_proposeBlindedBlockV1";
pub const BUILDER_PROPOSE_BLINDED_BLOCK_TIMEOUT: Duration = Duration::from_secs(2);
pub struct HttpJsonRpc<T = EngineApi> {
pub client: Client,
pub url: SensitiveUrl,
auth: Option<Auth>,
_phantom: PhantomData<T>,
}
impl HttpJsonRpc {
impl<T> HttpJsonRpc<T> {
pub fn new(url: SensitiveUrl) -> Result<Self, Error> {
Ok(Self {
client: Client::builder().build()?,
url,
auth: None,
_phantom: PhantomData,
})
}
@@ -62,15 +70,16 @@ impl HttpJsonRpc {
client: Client::builder().build()?,
url,
auth: Some(auth),
_phantom: PhantomData,
})
}
pub async fn rpc_request<T: DeserializeOwned>(
pub async fn rpc_request<D: DeserializeOwned>(
&self,
method: &str,
params: serde_json::Value,
timeout: Duration,
) -> Result<T, Error> {
) -> Result<D, Error> {
let body = JsonRequestBody {
jsonrpc: JSONRPC_VERSION,
method,
@@ -108,9 +117,8 @@ impl HttpJsonRpc {
}
}
#[async_trait]
impl EngineApi for HttpJsonRpc {
async fn upcheck(&self) -> Result<(), Error> {
impl HttpJsonRpc<EngineApi> {
pub async fn upcheck(&self) -> Result<(), Error> {
let result: serde_json::Value = self
.rpc_request(ETH_SYNCING, json!([]), ETH_SYNCING_TIMEOUT)
.await?;
@@ -127,7 +135,7 @@ impl EngineApi for HttpJsonRpc {
}
}
async fn get_block_by_number<'a>(
pub async fn get_block_by_number<'a>(
&self,
query: BlockByNumberQuery<'a>,
) -> Result<Option<ExecutionBlock>, Error> {
@@ -141,7 +149,7 @@ impl EngineApi for HttpJsonRpc {
.await
}
async fn get_block_by_hash<'a>(
pub async fn get_block_by_hash<'a>(
&self,
block_hash: ExecutionBlockHash,
) -> Result<Option<ExecutionBlock>, Error> {
@@ -151,7 +159,7 @@ impl EngineApi for HttpJsonRpc {
.await
}
async fn new_payload_v1<T: EthSpec>(
pub async fn new_payload_v1<T: EthSpec>(
&self,
execution_payload: ExecutionPayload<T>,
) -> Result<PayloadStatusV1, Error> {
@@ -164,7 +172,7 @@ impl EngineApi for HttpJsonRpc {
Ok(response.into())
}
async fn get_payload_v1<T: EthSpec>(
pub async fn get_payload_v1<T: EthSpec>(
&self,
payload_id: PayloadId,
) -> Result<ExecutionPayload<T>, Error> {
@@ -177,7 +185,7 @@ impl EngineApi for HttpJsonRpc {
Ok(response.into())
}
async fn forkchoice_updated_v1(
pub async fn forkchoice_updated_v1(
&self,
forkchoice_state: ForkChoiceState,
payload_attributes: Option<PayloadAttributes>,
@@ -198,7 +206,7 @@ impl EngineApi for HttpJsonRpc {
Ok(response.into())
}
async fn exchange_transition_configuration_v1(
pub async fn exchange_transition_configuration_v1(
&self,
transition_configuration: TransitionConfigurationV1,
) -> Result<TransitionConfigurationV1, Error> {
@@ -216,6 +224,62 @@ impl EngineApi for HttpJsonRpc {
}
}
impl HttpJsonRpc<BuilderApi> {
pub async fn get_payload_header_v1<T: EthSpec>(
&self,
payload_id: PayloadId,
) -> Result<ExecutionPayloadHeader<T>, Error> {
let params = json!([JsonPayloadIdRequest::from(payload_id)]);
let response: JsonExecutionPayloadHeaderV1<T> = self
.rpc_request(
BUILDER_GET_PAYLOAD_HEADER_V1,
params,
BUILDER_GET_PAYLOAD_HEADER_TIMEOUT,
)
.await?;
Ok(response.into())
}
pub async fn forkchoice_updated_v1(
&self,
forkchoice_state: ForkChoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> Result<ForkchoiceUpdatedResponse, Error> {
let params = json!([
JsonForkChoiceStateV1::from(forkchoice_state),
payload_attributes.map(JsonPayloadAttributesV1::from)
]);
let response: JsonForkchoiceUpdatedV1Response = self
.rpc_request(
ENGINE_FORKCHOICE_UPDATED_V1,
params,
ENGINE_FORKCHOICE_UPDATED_TIMEOUT,
)
.await?;
Ok(response.into())
}
pub async fn propose_blinded_block_v1<T: EthSpec>(
&self,
block: SignedBeaconBlock<T, BlindedPayload<T>>,
) -> Result<ExecutionPayload<T>, Error> {
let params = json!([block]);
let response: JsonExecutionPayloadV1<T> = self
.rpc_request(
BUILDER_PROPOSE_BLINDED_BLOCK_V1,
params,
BUILDER_PROPOSE_BLINDED_BLOCK_TIMEOUT,
)
.await?;
Ok(response.into())
}
}
#[cfg(test)]
mod test {
use super::auth::JwtKey;
@@ -224,7 +288,7 @@ mod test {
use std::future::Future;
use std::str::FromStr;
use std::sync::Arc;
use types::{MainnetEthSpec, Transaction, Unsigned, VariableList};
use types::{MainnetEthSpec, Transactions, Unsigned, VariableList};
struct Tester {
server: MockServer<MainnetEthSpec>,
@@ -326,10 +390,7 @@ mod test {
const LOGS_BLOOM_01: &str = "0x01010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101";
fn encode_transactions<E: EthSpec>(
transactions: VariableList<
Transaction<E::MaxBytesPerTransaction>,
E::MaxTransactionsPerPayload,
>,
transactions: Transactions<E>,
) -> Result<serde_json::Value, serde_json::Error> {
let ep: JsonExecutionPayloadV1<E> = JsonExecutionPayloadV1 {
transactions,
@@ -341,10 +402,7 @@ mod test {
fn decode_transactions<E: EthSpec>(
transactions: serde_json::Value,
) -> Result<
VariableList<Transaction<E::MaxBytesPerTransaction>, E::MaxTransactionsPerPayload>,
serde_json::Error,
> {
) -> Result<Transactions<E>, serde_json::Error> {
let mut json = json!({
"parentHash": HASH_00,
"feeRecipient": ADDRESS_01,
@@ -370,7 +428,7 @@ mod test {
fn assert_transactions_serde<E: EthSpec>(
name: &str,
as_obj: VariableList<Transaction<E::MaxBytesPerTransaction>, E::MaxTransactionsPerPayload>,
as_obj: Transactions<E>,
as_json: serde_json::Value,
) {
assert_eq!(
@@ -388,9 +446,7 @@ mod test {
}
/// Example: if `spec == &[1, 1]`, then two one-byte transactions will be created.
fn generate_transactions<E: EthSpec>(
spec: &[usize],
) -> VariableList<Transaction<E::MaxBytesPerTransaction>, E::MaxTransactionsPerPayload> {
fn generate_transactions<E: EthSpec>(spec: &[usize]) -> Transactions<E> {
let mut txs = VariableList::default();
for &num_bytes in spec {
@@ -860,7 +916,7 @@ mod test {
extra_data: vec![].into(),
base_fee_per_gas: Uint256::from(7),
block_hash: ExecutionBlockHash::from_str("0x6359b8381a370e2f54072a5784ddd78b6ed024991558c511d4452eb4f6ac898c").unwrap(),
transactions: vec![].into(),
transactions: vec![].into(),
};
assert_eq!(payload, expected);

View File

@@ -1,6 +1,9 @@
use super::*;
use serde::{Deserialize, Serialize};
use types::{EthSpec, ExecutionBlockHash, FixedVector, Transaction, Unsigned, VariableList};
use types::{
EthSpec, ExecutionBlockHash, ExecutionPayloadHeader, FixedVector, Transaction, Unsigned,
VariableList,
};
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -55,6 +58,70 @@ pub struct JsonPayloadIdResponse {
pub payload_id: PayloadId,
}
#[derive(Debug, PartialEq, Default, Serialize, Deserialize)]
#[serde(bound = "T: EthSpec", rename_all = "camelCase")]
pub struct JsonExecutionPayloadHeaderV1<T: EthSpec> {
pub parent_hash: ExecutionBlockHash,
pub fee_recipient: Address,
pub state_root: Hash256,
pub receipts_root: Hash256,
#[serde(with = "serde_logs_bloom")]
pub logs_bloom: FixedVector<u8, T::BytesPerLogsBloom>,
pub prev_randao: Hash256,
#[serde(with = "eth2_serde_utils::u64_hex_be")]
pub block_number: u64,
#[serde(with = "eth2_serde_utils::u64_hex_be")]
pub gas_limit: u64,
#[serde(with = "eth2_serde_utils::u64_hex_be")]
pub gas_used: u64,
#[serde(with = "eth2_serde_utils::u64_hex_be")]
pub timestamp: u64,
#[serde(with = "ssz_types::serde_utils::hex_var_list")]
pub extra_data: VariableList<u8, T::MaxExtraDataBytes>,
pub base_fee_per_gas: Uint256,
pub block_hash: ExecutionBlockHash,
pub transactions_root: Hash256,
}
impl<T: EthSpec> From<JsonExecutionPayloadHeaderV1<T>> for ExecutionPayloadHeader<T> {
fn from(e: JsonExecutionPayloadHeaderV1<T>) -> Self {
// Use this verbose deconstruction pattern to ensure no field is left unused.
let JsonExecutionPayloadHeaderV1 {
parent_hash,
fee_recipient,
state_root,
receipts_root,
logs_bloom,
prev_randao,
block_number,
gas_limit,
gas_used,
timestamp,
extra_data,
base_fee_per_gas,
block_hash,
transactions_root,
} = e;
Self {
parent_hash,
fee_recipient,
state_root,
receipts_root,
logs_bloom,
prev_randao,
block_number,
gas_limit,
gas_used,
timestamp,
extra_data,
base_fee_per_gas,
block_hash,
transactions_root,
}
}
}
#[derive(Debug, PartialEq, Default, Serialize, Deserialize)]
#[serde(bound = "T: EthSpec", rename_all = "camelCase")]
pub struct JsonExecutionPayloadV1<T: EthSpec> {
@@ -77,7 +144,7 @@ pub struct JsonExecutionPayloadV1<T: EthSpec> {
pub extra_data: VariableList<u8, T::MaxExtraDataBytes>,
pub base_fee_per_gas: Uint256,
pub block_hash: ExecutionBlockHash,
#[serde(with = "serde_transactions")]
#[serde(with = "ssz_types::serde_utils::list_of_hex_var_list")]
pub transactions:
VariableList<Transaction<T::MaxBytesPerTransaction>, T::MaxTransactionsPerPayload>,
}
@@ -363,6 +430,59 @@ impl From<ForkchoiceUpdatedResponse> for JsonForkchoiceUpdatedV1Response {
}
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum JsonProposeBlindedBlockResponseStatus {
Valid,
Invalid,
Syncing,
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(bound = "E: EthSpec")]
pub struct JsonProposeBlindedBlockResponse<E: EthSpec> {
pub result: ExecutionPayload<E>,
pub error: Option<String>,
}
impl<E: EthSpec> From<JsonProposeBlindedBlockResponse<E>> for ExecutionPayload<E> {
fn from(j: JsonProposeBlindedBlockResponse<E>) -> Self {
let JsonProposeBlindedBlockResponse { result, error: _ } = j;
result
}
}
impl From<JsonProposeBlindedBlockResponseStatus> for ProposeBlindedBlockResponseStatus {
fn from(j: JsonProposeBlindedBlockResponseStatus) -> Self {
match j {
JsonProposeBlindedBlockResponseStatus::Valid => {
ProposeBlindedBlockResponseStatus::Valid
}
JsonProposeBlindedBlockResponseStatus::Invalid => {
ProposeBlindedBlockResponseStatus::Invalid
}
JsonProposeBlindedBlockResponseStatus::Syncing => {
ProposeBlindedBlockResponseStatus::Syncing
}
}
}
}
impl From<ProposeBlindedBlockResponseStatus> for JsonProposeBlindedBlockResponseStatus {
fn from(f: ProposeBlindedBlockResponseStatus) -> Self {
match f {
ProposeBlindedBlockResponseStatus::Valid => {
JsonProposeBlindedBlockResponseStatus::Valid
}
ProposeBlindedBlockResponseStatus::Invalid => {
JsonProposeBlindedBlockResponseStatus::Invalid
}
ProposeBlindedBlockResponseStatus::Syncing => {
JsonProposeBlindedBlockResponseStatus::Syncing
}
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TransitionConfigurationV1 {
@@ -400,75 +520,3 @@ pub mod serde_logs_bloom {
.map_err(|e| serde::de::Error::custom(format!("invalid logs bloom: {:?}", e)))
}
}
/// Serializes the `transactions` field of an `ExecutionPayload`.
pub mod serde_transactions {
use super::*;
use eth2_serde_utils::hex;
use serde::ser::SerializeSeq;
use serde::{de, Deserializer, Serializer};
use std::marker::PhantomData;
type Value<M, N> = VariableList<Transaction<M>, N>;
#[derive(Default)]
pub struct ListOfBytesListVisitor<M, N> {
_phantom_m: PhantomData<M>,
_phantom_n: PhantomData<N>,
}
impl<'a, M: Unsigned, N: Unsigned> serde::de::Visitor<'a> for ListOfBytesListVisitor<M, N> {
type Value = Value<M, N>;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, "a list of 0x-prefixed byte lists")
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'a>,
{
let mut outer = VariableList::default();
while let Some(val) = seq.next_element::<String>()? {
let inner_vec = hex::decode(&val).map_err(de::Error::custom)?;
let transaction = VariableList::new(inner_vec).map_err(|e| {
serde::de::Error::custom(format!("transaction too large: {:?}", e))
})?;
outer.push(transaction).map_err(|e| {
serde::de::Error::custom(format!("too many transactions: {:?}", e))
})?;
}
Ok(outer)
}
}
pub fn serialize<S, M: Unsigned, N: Unsigned>(
value: &Value<M, N>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut seq = serializer.serialize_seq(Some(value.len()))?;
for transaction in value {
// It's important to match on the inner values of the transaction. Serializing the
// entire `Transaction` will result in appending the SSZ union prefix byte. The
// execution node does not want that.
let hex = hex::encode(&transaction[..]);
seq.serialize_element(&hex)?;
}
seq.end()
}
pub fn deserialize<'de, D, M: Unsigned, N: Unsigned>(
deserializer: D,
) -> Result<Value<M, N>, D::Error>
where
D: Deserializer<'de>,
{
let visitor: ListOfBytesListVisitor<M, N> = <_>::default();
deserializer.deserialize_any(visitor)
}
}

View File

@@ -1,8 +1,11 @@
//! Provides generic behaviour for multiple execution engines, specifically fallback behaviour.
use crate::engine_api::{
EngineApi, Error as EngineApiError, ForkchoiceUpdatedResponse, PayloadAttributes, PayloadId,
Builder, EngineApi, Error as EngineApiError, ForkchoiceUpdatedResponse, PayloadAttributes,
PayloadId,
};
use crate::{BuilderApi, HttpJsonRpc};
use async_trait::async_trait;
use futures::future::join_all;
use lru::LruCache;
use slog::{crit, debug, info, warn, Logger};
@@ -58,14 +61,14 @@ struct PayloadIdCacheKey {
/// An execution engine.
pub struct Engine<T> {
pub id: String,
pub api: T,
pub api: HttpJsonRpc<T>,
payload_id_cache: Mutex<LruCache<PayloadIdCacheKey, PayloadId>>,
state: RwLock<EngineState>,
}
impl<T> Engine<T> {
/// Creates a new, offline engine.
pub fn new(id: String, api: T) -> Self {
pub fn new(id: String, api: HttpJsonRpc<T>) -> Self {
Self {
id,
api,
@@ -94,8 +97,9 @@ impl<T> Engine<T> {
}
}
impl<T: EngineApi> Engine<T> {
pub async fn notify_forkchoice_updated(
#[async_trait]
impl Builder for Engine<EngineApi> {
async fn notify_forkchoice_updated(
&self,
forkchoice_state: ForkChoiceState,
payload_attributes: Option<PayloadAttributes>,
@@ -124,14 +128,47 @@ impl<T: EngineApi> Engine<T> {
}
}
#[async_trait]
impl Builder for Engine<BuilderApi> {
async fn notify_forkchoice_updated(
&self,
forkchoice_state: ForkChoiceState,
pa: Option<PayloadAttributes>,
log: &Logger,
) -> Result<ForkchoiceUpdatedResponse, EngineApiError> {
let payload_attributes = pa.ok_or(EngineApiError::InvalidBuilderQuery)?;
let response = self
.api
.forkchoice_updated_v1(forkchoice_state, Some(payload_attributes))
.await?;
if let Some(payload_id) = response.payload_id {
let key = PayloadIdCacheKey::new(&forkchoice_state, &payload_attributes);
self.payload_id_cache.lock().await.put(key, payload_id);
} else {
warn!(
log,
"Builder should have returned a payload_id for attributes {:?}", payload_attributes
);
}
Ok(response)
}
}
/// Holds multiple execution engines and provides functionality for managing them in a fallback
/// manner.
pub struct Engines<T> {
pub engines: Vec<Engine<T>>,
pub struct Engines {
pub engines: Vec<Engine<EngineApi>>,
pub latest_forkchoice_state: RwLock<Option<ForkChoiceState>>,
pub log: Logger,
}
pub struct Builders {
pub builders: Vec<Engine<BuilderApi>>,
pub log: Logger,
}
#[derive(Debug)]
pub enum EngineError {
Offline { id: String },
@@ -139,7 +176,7 @@ pub enum EngineError {
Auth { id: String },
}
impl<T: EngineApi> Engines<T> {
impl Engines {
async fn get_latest_forkchoice_state(&self) -> Option<ForkChoiceState> {
*self.latest_forkchoice_state.read().await
}
@@ -148,7 +185,7 @@ impl<T: EngineApi> Engines<T> {
*self.latest_forkchoice_state.write().await = Some(state);
}
async fn send_latest_forkchoice_state(&self, engine: &Engine<T>) {
async fn send_latest_forkchoice_state(&self, engine: &Engine<EngineApi>) {
let latest_forkchoice_state = self.get_latest_forkchoice_state().await;
if let Some(forkchoice_state) = latest_forkchoice_state {
@@ -286,7 +323,7 @@ impl<T: EngineApi> Engines<T> {
/// it runs, it will try to upcheck all offline nodes and then run the function again.
pub async fn first_success<'a, F, G, H>(&'a self, func: F) -> Result<H, Vec<EngineError>>
where
F: Fn(&'a Engine<T>) -> G + Copy,
F: Fn(&'a Engine<EngineApi>) -> G + Copy,
G: Future<Output = Result<H, EngineApiError>>,
{
match self.first_success_without_retry(func).await {
@@ -308,12 +345,12 @@ impl<T: EngineApi> Engines<T> {
/// Run `func` on all engines, in the order in which they are defined, returning the first
/// successful result that is found.
async fn first_success_without_retry<'a, F, G, H>(
pub async fn first_success_without_retry<'a, F, G, H>(
&'a self,
func: F,
) -> Result<H, Vec<EngineError>>
where
F: Fn(&'a Engine<T>) -> G,
F: Fn(&'a Engine<EngineApi>) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let mut errors = vec![];
@@ -364,7 +401,7 @@ impl<T: EngineApi> Engines<T> {
/// it runs, it will try to upcheck all offline nodes and then run the function again.
pub async fn broadcast<'a, F, G, H>(&'a self, func: F) -> Vec<Result<H, EngineError>>
where
F: Fn(&'a Engine<T>) -> G + Copy,
F: Fn(&'a Engine<EngineApi>) -> G + Copy,
G: Future<Output = Result<H, EngineApiError>>,
{
let first_results = self.broadcast_without_retry(func).await;
@@ -392,7 +429,7 @@ impl<T: EngineApi> Engines<T> {
func: F,
) -> Vec<Result<H, EngineError>>
where
F: Fn(&'a Engine<T>) -> G,
F: Fn(&'a Engine<EngineApi>) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let func = &func;
@@ -426,6 +463,66 @@ impl<T: EngineApi> Engines<T> {
}
}
impl Builders {
pub async fn first_success_without_retry<'a, F, G, H>(
&'a self,
func: F,
) -> Result<H, Vec<EngineError>>
where
F: Fn(&'a Engine<BuilderApi>) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let mut errors = vec![];
for builder in &self.builders {
match func(builder).await {
Ok(result) => return Ok(result),
Err(error) => {
debug!(
self.log,
"Builder call failed";
"error" => ?error,
"id" => &builder.id
);
errors.push(EngineError::Api {
id: builder.id.clone(),
error,
})
}
}
}
Err(errors)
}
pub async fn broadcast_without_retry<'a, F, G, H>(
&'a self,
func: F,
) -> Vec<Result<H, EngineError>>
where
F: Fn(&'a Engine<BuilderApi>) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let func = &func;
let futures = self.builders.iter().map(|engine| async move {
func(engine).await.map_err(|error| {
debug!(
self.log,
"Builder call failed";
"error" => ?error,
"id" => &engine.id
);
EngineError::Api {
id: engine.id.clone(),
error,
}
})
});
join_all(futures).await
}
}
impl PayloadIdCacheKey {
fn new(state: &ForkChoiceState, attributes: &PayloadAttributes) -> Self {
Self {

View File

@@ -14,6 +14,7 @@ use serde::{Deserialize, Serialize};
use slog::{crit, debug, error, info, trace, Logger};
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::convert::TryInto;
use std::future::Future;
use std::io::Write;
use std::path::PathBuf;
@@ -24,11 +25,15 @@ use tokio::{
sync::{Mutex, MutexGuard, RwLock},
time::{sleep, sleep_until, Instant},
};
use types::{ChainSpec, Epoch, ExecutionBlockHash, ProposerPreparationData, Slot};
pub use engine_api::{
http::HttpJsonRpc, json_structures, PayloadAttributes, PayloadStatusV1Status,
use types::{
BlindedPayload, BlockType, ChainSpec, Epoch, ExecPayload, ExecutionBlockHash,
ProposerPreparationData, SignedBeaconBlock, Slot,
};
use crate::engine_api::Builder;
use crate::engines::Builders;
pub use engine_api::*;
pub use engine_api::{http, http::HttpJsonRpc};
pub use payload_status::PayloadStatus;
mod engine_api;
@@ -59,6 +64,7 @@ const CONFIG_POLL_INTERVAL: Duration = Duration::from_secs(60);
#[derive(Debug)]
pub enum Error {
NoEngines,
NoPayloadBuilder,
ApiError(ApiError),
EngineErrors(Vec<EngineError>),
NotSynced,
@@ -94,7 +100,8 @@ pub struct Proposer {
}
struct Inner {
engines: Engines<HttpJsonRpc>,
engines: Engines,
builders: Builders,
execution_engine_forkchoice_lock: Mutex<()>,
suggested_fee_recipient: Option<Address>,
proposer_preparation_data: Mutex<HashMap<u64, ProposerPreparationDataEntry>>,
@@ -108,6 +115,8 @@ struct Inner {
pub struct Config {
/// Endpoint urls for EL nodes that are running the engine api.
pub execution_endpoints: Vec<SensitiveUrl>,
/// Endpoint urls for services providing the builder api.
pub builder_endpoints: Vec<SensitiveUrl>,
/// JWT secrets for the above endpoints running the engine api.
pub secret_files: Vec<PathBuf>,
/// The default fee recipient to use on the beacon node if none if provided from
@@ -148,6 +157,7 @@ impl ExecutionLayer {
pub fn from_config(config: Config, executor: TaskExecutor, log: Logger) -> Result<Self, Error> {
let Config {
execution_endpoints: urls,
builder_endpoints: builder_urls,
mut secret_files,
suggested_fee_recipient,
jwt_id,
@@ -203,15 +213,24 @@ impl ExecutionLayer {
.collect::<Result<_, _>>()
.map_err(Error::InvalidJWTSecret)?;
let engines: Vec<Engine<_>> = urls
let engines: Vec<Engine<EngineApi>> = urls
.into_iter()
.zip(secrets.into_iter())
.map(|(url, (secret, path))| {
let id = url.to_string();
let auth = Auth::new(secret, jwt_id.clone(), jwt_version.clone());
debug!(log, "Loaded execution endpoint"; "endpoint" => %id, "jwt_path" => ?path);
let api = HttpJsonRpc::new_with_auth(url, auth)?;
Ok(Engine::new(id, api))
let api = HttpJsonRpc::<EngineApi>::new_with_auth(url, auth)?;
Ok(Engine::<EngineApi>::new(id, api))
})
.collect::<Result<_, ApiError>>()?;
let builders: Vec<Engine<BuilderApi>> = builder_urls
.into_iter()
.map(|url| {
let id = url.to_string();
let api = HttpJsonRpc::<BuilderApi>::new(url)?;
Ok(Engine::<BuilderApi>::new(id, api))
})
.collect::<Result<_, ApiError>>()?;
@@ -221,6 +240,10 @@ impl ExecutionLayer {
latest_forkchoice_state: <_>::default(),
log: log.clone(),
},
builders: Builders {
builders,
log: log.clone(),
},
execution_engine_forkchoice_lock: <_>::default(),
suggested_fee_recipient,
proposer_preparation_data: Mutex::new(HashMap::new()),
@@ -237,10 +260,14 @@ impl ExecutionLayer {
}
impl ExecutionLayer {
fn engines(&self) -> &Engines<HttpJsonRpc> {
fn engines(&self) -> &Engines {
&self.inner.engines
}
fn builders(&self) -> &Builders {
&self.inner.builders
}
fn executor(&self) -> &TaskExecutor {
&self.inner.executor
}
@@ -542,14 +569,14 @@ impl ExecutionLayer {
///
/// The result will be returned from the first node that returns successfully. No more nodes
/// will be contacted.
pub async fn get_payload<T: EthSpec>(
pub async fn get_payload<T: EthSpec, Payload: ExecPayload<T>>(
&self,
parent_hash: ExecutionBlockHash,
timestamp: u64,
prev_randao: Hash256,
finalized_block_hash: ExecutionBlockHash,
proposer_index: u64,
) -> Result<ExecutionPayload<T>, Error> {
) -> Result<Payload, Error> {
let _timer = metrics::start_timer_vec(
&metrics::EXECUTION_LAYER_REQUEST_TIMES,
&[metrics::GET_PAYLOAD],
@@ -557,72 +584,121 @@ impl ExecutionLayer {
let suggested_fee_recipient = self.get_suggested_fee_recipient(proposer_index).await;
debug!(
self.log(),
"Issuing engine_getPayload";
"suggested_fee_recipient" => ?suggested_fee_recipient,
"prev_randao" => ?prev_randao,
"timestamp" => timestamp,
"parent_hash" => ?parent_hash,
);
self.engines()
.first_success(|engine| async move {
let payload_id = if let Some(id) = engine
.get_payload_id(parent_hash, timestamp, prev_randao, suggested_fee_recipient)
match Payload::block_type() {
BlockType::Blinded => {
debug!(
self.log(),
"Issuing builder_getPayloadHeader";
"suggested_fee_recipient" => ?suggested_fee_recipient,
"prev_randao" => ?prev_randao,
"timestamp" => timestamp,
"parent_hash" => ?parent_hash,
);
self.builders()
.first_success_without_retry(|engine| async move {
let payload_id = engine
.get_payload_id(
parent_hash,
timestamp,
prev_randao,
suggested_fee_recipient,
)
.await
.ok_or(ApiError::MissingPayloadId {
parent_hash,
timestamp,
prev_randao,
suggested_fee_recipient,
})?;
engine
.api
.get_payload_header_v1::<T>(payload_id)
.await?
.try_into()
.map_err(|_| ApiError::PayloadConversionLogicFlaw)
})
.await
{
// The payload id has been cached for this engine.
metrics::inc_counter_vec(
&metrics::EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID,
&[metrics::HIT],
);
id
} else {
// The payload id has *not* been cached for this engine. Trigger an artificial
// fork choice update to retrieve a payload ID.
//
// TODO(merge): a better algorithm might try to favour a node that already had a
// cached payload id, since a payload that has had more time to produce is
// likely to be more profitable.
metrics::inc_counter_vec(
&metrics::EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID,
&[metrics::MISS],
);
let fork_choice_state = ForkChoiceState {
head_block_hash: parent_hash,
safe_block_hash: parent_hash,
finalized_block_hash,
};
let payload_attributes = PayloadAttributes {
timestamp,
prev_randao,
suggested_fee_recipient,
};
engine
.notify_forkchoice_updated(
fork_choice_state,
Some(payload_attributes),
self.log(),
)
.await
.map(|response| response.payload_id)?
.ok_or_else(|| {
error!(
self.log(),
"Exec engine unable to produce payload";
"msg" => "No payload ID, the engine is likely syncing. \
This has the potential to cause a missed block proposal.",
.map_err(Error::EngineErrors)
}
BlockType::Full => {
debug!(
self.log(),
"Issuing engine_getPayload";
"suggested_fee_recipient" => ?suggested_fee_recipient,
"prev_randao" => ?prev_randao,
"timestamp" => timestamp,
"parent_hash" => ?parent_hash,
);
self.engines()
.first_success(|engine| async move {
let payload_id = if let Some(id) = engine
.get_payload_id(
parent_hash,
timestamp,
prev_randao,
suggested_fee_recipient,
)
.await
{
// The payload id has been cached for this engine.
metrics::inc_counter_vec(
&metrics::EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID,
&[metrics::HIT],
);
id
} else {
// The payload id has *not* been cached for this engine. Trigger an artificial
// fork choice update to retrieve a payload ID.
//
// TODO(merge): a better algorithm might try to favour a node that already had a
// cached payload id, since a payload that has had more time to produce is
// likely to be more profitable.
metrics::inc_counter_vec(
&metrics::EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID,
&[metrics::MISS],
);
let fork_choice_state = ForkChoiceState {
head_block_hash: parent_hash,
safe_block_hash: parent_hash,
finalized_block_hash,
};
let payload_attributes = PayloadAttributes {
timestamp,
prev_randao,
suggested_fee_recipient,
};
ApiError::PayloadIdUnavailable
})?
};
engine
.notify_forkchoice_updated(
fork_choice_state,
Some(payload_attributes),
self.log(),
)
.await
.map(|response| response.payload_id)?
.ok_or_else(|| {
error!(
self.log(),
"Exec engine unable to produce payload";
"msg" => "No payload ID, the engine is likely syncing. \
This has the potential to cause a missed block \
proposal.",
);
engine.api.get_payload_v1(payload_id).await
})
.await
.map_err(Error::EngineErrors)
ApiError::PayloadIdUnavailable
})?
};
engine
.api
.get_payload_v1::<T>(payload_id)
.await
.map(Into::into)
})
.await
.map_err(Error::EngineErrors)
}
}
}
/// Maps to the `engine_newPayload` JSON-RPC call.
@@ -801,10 +877,23 @@ impl ExecutionLayer {
})
.await;
// Only query builders with payload attributes populated.
let builder_broadcast_results = if payload_attributes.is_some() {
self.builders()
.broadcast_without_retry(|engine| async move {
engine
.notify_forkchoice_updated(forkchoice_state, payload_attributes, self.log())
.await
})
.await
} else {
vec![]
};
process_multiple_payload_statuses(
head_block_hash,
broadcast_results
.into_iter()
.chain(builder_broadcast_results.into_iter())
.map(|result| result.map(|response| response.payload_status)),
self.log(),
)
@@ -931,7 +1020,7 @@ impl ExecutionLayer {
/// https://github.com/ethereum/consensus-specs/blob/v1.1.5/specs/merge/validator.md
async fn get_pow_block_hash_at_total_difficulty(
&self,
engine: &Engine<HttpJsonRpc>,
engine: &Engine<EngineApi>,
spec: &ChainSpec,
) -> Result<Option<ExecutionBlockHash>, ApiError> {
let mut block = engine
@@ -1013,7 +1102,6 @@ impl ExecutionLayer {
));
}
}
Ok(None)
})
.await;
@@ -1076,7 +1164,7 @@ impl ExecutionLayer {
/// https://github.com/ethereum/consensus-specs/issues/2636
async fn get_pow_block(
&self,
engine: &Engine<HttpJsonRpc>,
engine: &Engine<EngineApi>,
hash: ExecutionBlockHash,
) -> Result<Option<ExecutionBlock>, ApiError> {
if let Some(cached) = self.execution_blocks().await.get(&hash).copied() {
@@ -1094,6 +1182,23 @@ impl ExecutionLayer {
Ok(None)
}
}
pub async fn propose_blinded_beacon_block<T: EthSpec>(
&self,
block: &SignedBeaconBlock<T, BlindedPayload<T>>,
) -> Result<ExecutionPayload<T>, Error> {
debug!(
self.log(),
"Issuing builder_proposeBlindedBlock";
"root" => ?block.canonical_root(),
);
self.builders()
.first_success_without_retry(|engine| async move {
engine.api.propose_blinded_block_v1(block.clone()).await
})
.await
.map_err(Error::EngineErrors)
}
}
#[cfg(test)]

View File

@@ -7,7 +7,7 @@ use sensitive_url::SensitiveUrl;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tempfile::NamedTempFile;
use types::{Address, ChainSpec, Epoch, EthSpec, Hash256, Uint256};
use types::{Address, ChainSpec, Epoch, EthSpec, FullPayload, Hash256, Uint256};
pub struct ExecutionLayerRuntime {
pub runtime: Option<Arc<tokio::runtime::Runtime>>,
@@ -154,7 +154,7 @@ impl<T: EthSpec> MockExecutionLayer<T> {
let validator_index = 0;
let payload = self
.el
.get_payload::<T>(
.get_payload::<T, FullPayload<T>>(
parent_hash,
timestamp,
prev_randao,
@@ -162,7 +162,8 @@ impl<T: EthSpec> MockExecutionLayer<T> {
validator_index,
)
.await
.unwrap();
.unwrap()
.execution_payload;
let block_hash = payload.block_hash;
assert_eq!(payload.parent_hash, parent_hash);
assert_eq!(payload.block_number, block_number);