Remove builder redundancy (#3294)

## Issue Addressed

This PR is a subset of the changes in #3134. Unstable will still not function correctly with the new builder spec once this is merged, #3134 should be used on testnets

## Proposed Changes

- Removes redundancy in "builders" (servers implementing the builder spec)
- Renames `payload-builder` flag to `builder`
- Moves from old builder RPC API to new HTTP API, but does not implement the validator registration API (implemented in https://github.com/sigp/lighthouse/pull/3194)



Co-authored-by: sean <seananderson33@gmail.com>
Co-authored-by: realbigsean <sean@sigmaprime.io>
This commit is contained in:
realbigsean
2022-07-01 01:15:19 +00:00
parent d40c76e667
commit a7da0677d5
25 changed files with 564 additions and 374 deletions

View File

@@ -38,3 +38,4 @@ zeroize = { version = "1.4.2", features = ["zeroize_derive"] }
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
lazy_static = "1.4.0"
ethers-core = { git = "https://github.com/gakonst/ethers-rs", rev = "02ad93a1cfb7b62eb051c77c61dc4c0218428e4a" }
builder_client = { path = "../builder_client" }

View File

@@ -1,11 +1,9 @@
use crate::engines::ForkChoiceState;
use async_trait::async_trait;
pub use ethers_core::types::Transaction;
use http::deposit_methods::RpcError;
pub use json_structures::TransitionConfigurationV1;
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
use slog::Logger;
pub use types::{
Address, EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadHeader, FixedVector,
Hash256, Uint256, VariableList,
@@ -28,10 +26,7 @@ pub enum Error {
InvalidExecutePayloadResponse(&'static str),
JsonRpc(RpcError),
Json(serde_json::Error),
ServerMessage {
code: i64,
message: String,
},
ServerMessage { code: i64, message: String },
Eip155Failure,
IsSyncing,
ExecutionBlockNotFound(ExecutionBlockHash),
@@ -40,15 +35,9 @@ pub enum Error {
PayloadIdUnavailable,
TransitionConfigurationMismatch,
PayloadConversionLogicFlaw,
InvalidBuilderQuery,
MissingPayloadId {
parent_hash: ExecutionBlockHash,
timestamp: u64,
prev_randao: Hash256,
suggested_fee_recipient: Address,
},
DeserializeTransaction(ssz_types::Error),
DeserializeTransactions(ssz_types::Error),
BuilderApi(builder_client::Error),
}
impl From<reqwest::Error> for Error {
@@ -76,19 +65,14 @@ impl From<auth::Error> for Error {
}
}
pub struct EngineApi;
pub struct BuilderApi;
#[async_trait]
pub trait Builder {
async fn notify_forkchoice_updated(
&self,
forkchoice_state: ForkChoiceState,
payload_attributes: Option<PayloadAttributes>,
log: &Logger,
) -> Result<ForkchoiceUpdatedResponse, Error>;
impl From<builder_client::Error> for Error {
fn from(e: builder_client::Error) -> Self {
Error::BuilderApi(e)
}
}
pub struct EngineApi;
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PayloadStatusV1Status {
Valid,

View File

@@ -10,7 +10,7 @@ use serde_json::json;
use std::marker::PhantomData;
use std::time::Duration;
use types::{BlindedPayload, EthSpec, ExecutionPayloadHeader, SignedBeaconBlock};
use types::EthSpec;
pub use deposit_log::{DepositLog, Log};
pub use reqwest::Client;
@@ -43,12 +43,6 @@ pub const ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1: &str =
pub const ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1_TIMEOUT: Duration =
Duration::from_millis(500);
pub const BUILDER_GET_PAYLOAD_HEADER_V1: &str = "builder_getPayloadHeaderV1";
pub const BUILDER_GET_PAYLOAD_HEADER_TIMEOUT: Duration = Duration::from_secs(2);
pub const BUILDER_PROPOSE_BLINDED_BLOCK_V1: &str = "builder_proposeBlindedBlockV1";
pub const BUILDER_PROPOSE_BLINDED_BLOCK_TIMEOUT: Duration = Duration::from_secs(2);
/// This error is returned during a `chainId` call by Geth.
pub const EIP155_ERROR_STR: &str = "chain not synced beyond EIP-155 replay-protection fork block";
@@ -714,63 +708,6 @@ impl HttpJsonRpc<EngineApi> {
}
}
impl HttpJsonRpc<BuilderApi> {
pub async fn get_payload_header_v1<T: EthSpec>(
&self,
payload_id: PayloadId,
) -> Result<ExecutionPayloadHeader<T>, Error> {
let params = json!([JsonPayloadIdRequest::from(payload_id)]);
let response: JsonExecutionPayloadHeaderV1<T> = self
.rpc_request(
BUILDER_GET_PAYLOAD_HEADER_V1,
params,
BUILDER_GET_PAYLOAD_HEADER_TIMEOUT,
)
.await?;
Ok(response.into())
}
pub async fn forkchoice_updated_v1(
&self,
forkchoice_state: ForkChoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> Result<ForkchoiceUpdatedResponse, Error> {
let params = json!([
JsonForkChoiceStateV1::from(forkchoice_state),
payload_attributes.map(JsonPayloadAttributesV1::from)
]);
let response: JsonForkchoiceUpdatedV1Response = self
.rpc_request(
ENGINE_FORKCHOICE_UPDATED_V1,
params,
ENGINE_FORKCHOICE_UPDATED_TIMEOUT,
)
.await?;
Ok(response.into())
}
pub async fn propose_blinded_block_v1<T: EthSpec>(
&self,
block: SignedBeaconBlock<T, BlindedPayload<T>>,
) -> Result<ExecutionPayload<T>, Error> {
let params = json!([block]);
let response: JsonExecutionPayloadV1<T> = self
.rpc_request(
BUILDER_PROPOSE_BLINDED_BLOCK_V1,
params,
BUILDER_PROPOSE_BLINDED_BLOCK_TIMEOUT,
)
.await?;
Ok(response.into())
}
}
#[cfg(test)]
mod test {
use super::auth::JwtKey;

View File

@@ -1,12 +1,9 @@
//! Provides generic behaviour for multiple execution engines, specifically fallback behaviour.
use crate::engine_api::{
Builder, EngineApi, Error as EngineApiError, ForkchoiceUpdatedResponse, PayloadAttributes,
PayloadId,
EngineApi, Error as EngineApiError, ForkchoiceUpdatedResponse, PayloadAttributes, PayloadId,
};
use crate::{BuilderApi, HttpJsonRpc};
use async_trait::async_trait;
use futures::future::join_all;
use crate::HttpJsonRpc;
use lru::LruCache;
use slog::{crit, debug, info, warn, Logger};
use std::future::Future;
@@ -97,9 +94,8 @@ impl<T> Engine<T> {
}
}
#[async_trait]
impl Builder for Engine<EngineApi> {
async fn notify_forkchoice_updated(
impl Engine<EngineApi> {
pub async fn notify_forkchoice_updated(
&self,
forkchoice_state: ForkChoiceState,
payload_attributes: Option<PayloadAttributes>,
@@ -128,34 +124,6 @@ impl Builder for Engine<EngineApi> {
}
}
#[async_trait]
impl Builder for Engine<BuilderApi> {
async fn notify_forkchoice_updated(
&self,
forkchoice_state: ForkChoiceState,
pa: Option<PayloadAttributes>,
log: &Logger,
) -> Result<ForkchoiceUpdatedResponse, EngineApiError> {
let payload_attributes = pa.ok_or(EngineApiError::InvalidBuilderQuery)?;
let response = self
.api
.forkchoice_updated_v1(forkchoice_state, Some(payload_attributes))
.await?;
if let Some(payload_id) = response.payload_id {
let key = PayloadIdCacheKey::new(&forkchoice_state, &payload_attributes);
self.payload_id_cache.lock().await.put(key, payload_id);
} else {
warn!(
log,
"Builder should have returned a payload_id for attributes {:?}", payload_attributes
);
}
Ok(response)
}
}
// This structure used to hold multiple execution engines managed in a fallback manner. This
// functionality has been removed following https://github.com/sigp/lighthouse/issues/3118 and this
// struct will likely be removed in the future.
@@ -165,15 +133,11 @@ pub struct Engines {
pub log: Logger,
}
pub struct Builders {
pub builders: Vec<Engine<BuilderApi>>,
pub log: Logger,
}
#[derive(Debug)]
pub enum EngineError {
Offline { id: String },
Api { id: String, error: EngineApiError },
BuilderApi { error: EngineApiError },
Auth { id: String },
}
@@ -422,66 +386,6 @@ impl Engines {
}
}
impl Builders {
pub async fn first_success_without_retry<'a, F, G, H>(
&'a self,
func: F,
) -> Result<H, Vec<EngineError>>
where
F: Fn(&'a Engine<BuilderApi>) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let mut errors = vec![];
for builder in &self.builders {
match func(builder).await {
Ok(result) => return Ok(result),
Err(error) => {
debug!(
self.log,
"Builder call failed";
"error" => ?error,
"id" => &builder.id
);
errors.push(EngineError::Api {
id: builder.id.clone(),
error,
})
}
}
}
Err(errors)
}
pub async fn broadcast_without_retry<'a, F, G, H>(
&'a self,
func: F,
) -> Vec<Result<H, EngineError>>
where
F: Fn(&'a Engine<BuilderApi>) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let func = &func;
let futures = self.builders.iter().map(|engine| async move {
func(engine).await.map_err(|error| {
debug!(
self.log,
"Builder call failed";
"error" => ?error,
"id" => &engine.id
);
EngineError::Api {
id: engine.id.clone(),
error,
}
})
});
join_all(futures).await
}
}
impl PayloadIdCacheKey {
fn new(state: &ForkChoiceState, attributes: &PayloadAttributes) -> Self {
Self {

View File

@@ -4,9 +4,8 @@
//! This crate only provides useful functionality for "The Merge", it does not provide any of the
//! deposit-contract functionality that the `beacon_node/eth1` crate already provides.
use crate::engine_api::Builder;
use crate::engines::Builders;
use auth::{strip_prefix, Auth, JwtKey};
use builder_client::BuilderHttpClient;
use engine_api::Error as ApiError;
pub use engine_api::*;
pub use engine_api::{http, http::deposit_methods, http::HttpJsonRpc};
@@ -20,7 +19,6 @@ use serde::{Deserialize, Serialize};
use slog::{crit, debug, error, info, trace, warn, Logger};
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::convert::TryInto;
use std::future::Future;
use std::io::Write;
use std::path::PathBuf;
@@ -33,7 +31,7 @@ use tokio::{
};
use types::{
BlindedPayload, BlockType, ChainSpec, Epoch, ExecPayload, ExecutionBlockHash,
ProposerPreparationData, SignedBeaconBlock, Slot,
ProposerPreparationData, PublicKeyBytes, SignedBeaconBlock, Slot,
};
mod engine_api;
@@ -69,6 +67,7 @@ pub enum Error {
NoEngines,
NoPayloadBuilder,
ApiError(ApiError),
Builder(builder_client::Error),
EngineErrors(Vec<EngineError>),
NotSynced,
ShuttingDown,
@@ -102,15 +101,16 @@ pub struct Proposer {
payload_attributes: PayloadAttributes,
}
struct Inner {
struct Inner<E: EthSpec> {
engines: Engines,
builders: Builders,
builder: Option<BuilderHttpClient>,
execution_engine_forkchoice_lock: Mutex<()>,
suggested_fee_recipient: Option<Address>,
proposer_preparation_data: Mutex<HashMap<u64, ProposerPreparationDataEntry>>,
execution_blocks: Mutex<LruCache<ExecutionBlockHash, ExecutionBlock>>,
proposers: RwLock<HashMap<ProposerKey, Proposer>>,
executor: TaskExecutor,
phantom: std::marker::PhantomData<E>,
log: Logger,
}
@@ -119,7 +119,7 @@ pub struct Config {
/// Endpoint urls for EL nodes that are running the engine api.
pub execution_endpoints: Vec<SensitiveUrl>,
/// Endpoint urls for services providing the builder api.
pub builder_endpoints: Vec<SensitiveUrl>,
pub builder_url: Option<SensitiveUrl>,
/// JWT secrets for the above endpoints running the engine api.
pub secret_files: Vec<PathBuf>,
/// The default fee recipient to use on the beacon node if none if provided from
@@ -143,16 +143,16 @@ pub struct Config {
///
/// The fallback nodes have an ordering. The first supplied will be the first contacted, and so on.
#[derive(Clone)]
pub struct ExecutionLayer {
inner: Arc<Inner>,
pub struct ExecutionLayer<T: EthSpec> {
inner: Arc<Inner<T>>,
}
impl ExecutionLayer {
impl<T: EthSpec> ExecutionLayer<T> {
/// Instantiate `Self` with Execution engines specified using `Config`, all using the JSON-RPC via HTTP.
pub fn from_config(config: Config, executor: TaskExecutor, log: Logger) -> Result<Self, Error> {
let Config {
execution_endpoints: urls,
builder_endpoints: builder_urls,
builder_url,
secret_files,
suggested_fee_recipient,
jwt_id,
@@ -208,14 +208,9 @@ impl ExecutionLayer {
Engine::<EngineApi>::new(id, api)
};
let builders: Vec<Engine<BuilderApi>> = builder_urls
.into_iter()
.map(|url| {
let id = url.to_string();
let api = HttpJsonRpc::<BuilderApi>::new(url)?;
Ok(Engine::<BuilderApi>::new(id, api))
})
.collect::<Result<_, ApiError>>()?;
let builder = builder_url
.map(|url| BuilderHttpClient::new(url).map_err(Error::Builder))
.transpose()?;
let inner = Inner {
engines: Engines {
@@ -223,16 +218,14 @@ impl ExecutionLayer {
latest_forkchoice_state: <_>::default(),
log: log.clone(),
},
builders: Builders {
builders,
log: log.clone(),
},
builder,
execution_engine_forkchoice_lock: <_>::default(),
suggested_fee_recipient,
proposer_preparation_data: Mutex::new(HashMap::new()),
proposers: RwLock::new(HashMap::new()),
execution_blocks: Mutex::new(LruCache::new(EXECUTION_BLOCKS_LRU_CACHE_SIZE)),
executor,
phantom: std::marker::PhantomData,
log,
};
@@ -242,13 +235,13 @@ impl ExecutionLayer {
}
}
impl ExecutionLayer {
impl<T: EthSpec> ExecutionLayer<T> {
fn engines(&self) -> &Engines {
&self.inner.engines
}
fn builders(&self) -> &Builders {
&self.inner.builders
pub fn builder(&self) -> &Option<BuilderHttpClient> {
&self.inner.builder
}
pub fn executor(&self) -> &TaskExecutor {
@@ -282,9 +275,9 @@ impl ExecutionLayer {
}
/// Convenience function to allow calling async functions in a non-async context.
pub fn block_on<'a, T, U, V>(&'a self, generate_future: T) -> Result<V, Error>
pub fn block_on<'a, F, U, V>(&'a self, generate_future: F) -> Result<V, Error>
where
T: Fn(&'a Self) -> U,
F: Fn(&'a Self) -> U,
U: Future<Output = Result<V, Error>>,
{
let runtime = self.executor().handle().ok_or(Error::ShuttingDown)?;
@@ -296,9 +289,9 @@ impl ExecutionLayer {
///
/// The function is "generic" since it does not enforce a particular return type on
/// `generate_future`.
pub fn block_on_generic<'a, T, U, V>(&'a self, generate_future: T) -> Result<V, Error>
pub fn block_on_generic<'a, F, U, V>(&'a self, generate_future: F) -> Result<V, Error>
where
T: Fn(&'a Self) -> U,
F: Fn(&'a Self) -> U,
U: Future<Output = V>,
{
let runtime = self.executor().handle().ok_or(Error::ShuttingDown)?;
@@ -307,9 +300,9 @@ impl ExecutionLayer {
}
/// Convenience function to allow spawning a task without waiting for the result.
pub fn spawn<T, U>(&self, generate_future: T, name: &'static str)
pub fn spawn<F, U>(&self, generate_future: F, name: &'static str)
where
T: FnOnce(Self) -> U,
F: FnOnce(Self) -> U,
U: Future<Output = ()> + Send + 'static,
{
self.executor().spawn(generate_future(self.clone()), name);
@@ -317,12 +310,12 @@ impl ExecutionLayer {
/// Spawns a routine which attempts to keep the execution engines online.
pub fn spawn_watchdog_routine<S: SlotClock + 'static>(&self, slot_clock: S) {
let watchdog = |el: ExecutionLayer| async move {
let watchdog = |el: ExecutionLayer<T>| async move {
// Run one task immediately.
el.watchdog_task().await;
let recurring_task =
|el: ExecutionLayer, now: Instant, duration_to_next_slot: Duration| async move {
|el: ExecutionLayer<T>, now: Instant, duration_to_next_slot: Duration| async move {
// We run the task three times per slot.
//
// The interval between each task is 1/3rd of the slot duration. This matches nicely
@@ -377,11 +370,8 @@ impl ExecutionLayer {
}
/// Spawns a routine which cleans the cached proposer data periodically.
pub fn spawn_clean_proposer_caches_routine<S: SlotClock + 'static, T: EthSpec>(
&self,
slot_clock: S,
) {
let preparation_cleaner = |el: ExecutionLayer| async move {
pub fn spawn_clean_proposer_caches_routine<S: SlotClock + 'static>(&self, slot_clock: S) {
let preparation_cleaner = |el: ExecutionLayer<T>| async move {
// Start the loop to periodically clean proposer preparation cache.
loop {
if let Some(duration_to_next_epoch) =
@@ -395,7 +385,7 @@ impl ExecutionLayer {
.map(|slot| slot.epoch(T::slots_per_epoch()))
{
Some(current_epoch) => el
.clean_proposer_caches::<T>(current_epoch)
.clean_proposer_caches(current_epoch)
.await
.map_err(|e| {
error!(
@@ -420,7 +410,7 @@ impl ExecutionLayer {
/// Spawns a routine that polls the `exchange_transition_configuration` endpoint.
pub fn spawn_transition_configuration_poll(&self, spec: ChainSpec) {
let routine = |el: ExecutionLayer| async move {
let routine = |el: ExecutionLayer<T>| async move {
loop {
if let Err(e) = el.exchange_transition_configuration(&spec).await {
error!(
@@ -454,7 +444,7 @@ impl ExecutionLayer {
}
/// Updates the proposer preparation data provided by validators
async fn update_proposer_preparation(
pub async fn update_proposer_preparation(
&self,
update_epoch: Epoch,
preparation_data: &[ProposerPreparationData],
@@ -476,7 +466,7 @@ impl ExecutionLayer {
}
/// Removes expired entries from proposer_preparation_data and proposers caches
async fn clean_proposer_caches<T: EthSpec>(&self, current_epoch: Epoch) -> Result<(), Error> {
async fn clean_proposer_caches(&self, current_epoch: Epoch) -> Result<(), Error> {
let mut proposer_preparation_data = self.proposer_preparation_data().await;
// Keep all entries that have been updated in the last 2 epochs
@@ -561,104 +551,164 @@ impl ExecutionLayer {
///
/// The result will be returned from the first node that returns successfully. No more nodes
/// will be contacted.
pub async fn get_payload<T: EthSpec, Payload: ExecPayload<T>>(
#[allow(clippy::too_many_arguments)]
pub async fn get_payload<Payload: ExecPayload<T>>(
&self,
parent_hash: ExecutionBlockHash,
timestamp: u64,
prev_randao: Hash256,
finalized_block_hash: ExecutionBlockHash,
proposer_index: u64,
pubkey: Option<PublicKeyBytes>,
slot: Slot,
) -> Result<Payload, Error> {
let _timer = metrics::start_timer_vec(
&metrics::EXECUTION_LAYER_REQUEST_TIMES,
&[metrics::GET_PAYLOAD],
);
let suggested_fee_recipient = self.get_suggested_fee_recipient(proposer_index).await;
match Payload::block_type() {
BlockType::Blinded => {
debug!(
self.log(),
"Issuing builder_getPayloadHeader";
"suggested_fee_recipient" => ?suggested_fee_recipient,
"prev_randao" => ?prev_randao,
"timestamp" => timestamp,
"parent_hash" => ?parent_hash,
let _timer = metrics::start_timer_vec(
&metrics::EXECUTION_LAYER_REQUEST_TIMES,
&[metrics::GET_BLINDED_PAYLOAD],
);
self.builders()
.first_success_without_retry(|engine| async move {
let payload_id = engine
.get_payload_id(
parent_hash,
timestamp,
prev_randao,
suggested_fee_recipient,
)
.await
.ok_or(ApiError::MissingPayloadId {
parent_hash,
timestamp,
prev_randao,
suggested_fee_recipient,
})?;
engine
.api
.get_payload_header_v1::<T>(payload_id)
.await?
.try_into()
.map_err(|_| ApiError::PayloadConversionLogicFlaw)
})
.await
.map_err(Error::EngineErrors)
self.get_blinded_payload(
parent_hash,
timestamp,
prev_randao,
finalized_block_hash,
suggested_fee_recipient,
pubkey,
slot,
)
.await
}
BlockType::Full => {
debug!(
let _timer = metrics::start_timer_vec(
&metrics::EXECUTION_LAYER_REQUEST_TIMES,
&[metrics::GET_PAYLOAD],
);
self.get_full_payload(
parent_hash,
timestamp,
prev_randao,
finalized_block_hash,
suggested_fee_recipient,
)
.await
}
}
}
#[allow(clippy::too_many_arguments)]
async fn get_blinded_payload<Payload: ExecPayload<T>>(
&self,
parent_hash: ExecutionBlockHash,
timestamp: u64,
prev_randao: Hash256,
finalized_block_hash: ExecutionBlockHash,
suggested_fee_recipient: Address,
pubkey_opt: Option<PublicKeyBytes>,
slot: Slot,
) -> Result<Payload, Error> {
//FIXME(sean) fallback logic included in PR #3134
// Don't attempt to outsource payload construction until after the merge transition has been
// finalized. We want to be conservative with payload construction until then.
if let (Some(builder), Some(pubkey)) = (self.builder(), pubkey_opt) {
if finalized_block_hash != ExecutionBlockHash::zero() {
info!(
self.log(),
"Issuing engine_getPayload";
"suggested_fee_recipient" => ?suggested_fee_recipient,
"prev_randao" => ?prev_randao,
"timestamp" => timestamp,
"Requesting blinded header from connected builder";
"slot" => ?slot,
"pubkey" => ?pubkey,
"parent_hash" => ?parent_hash,
);
self.engines()
.first_success(|engine| async move {
let payload_id = if let Some(id) = engine
.get_payload_id(
parent_hash,
timestamp,
prev_randao,
suggested_fee_recipient,
)
.await
{
// The payload id has been cached for this engine.
metrics::inc_counter_vec(
&metrics::EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID,
&[metrics::HIT],
);
id
} else {
// The payload id has *not* been cached for this engine. Trigger an artificial
// fork choice update to retrieve a payload ID.
//
// TODO(merge): a better algorithm might try to favour a node that already had a
// cached payload id, since a payload that has had more time to produce is
// likely to be more profitable.
metrics::inc_counter_vec(
&metrics::EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID,
&[metrics::MISS],
);
let fork_choice_state = ForkChoiceState {
head_block_hash: parent_hash,
safe_block_hash: parent_hash,
finalized_block_hash,
};
let payload_attributes = PayloadAttributes {
timestamp,
prev_randao,
suggested_fee_recipient,
};
return builder
.get_builder_header::<T, Payload>(slot, parent_hash, &pubkey)
.await
.map(|d| d.data.message.header)
.map_err(Error::Builder);
}
}
self.get_full_payload::<Payload>(
parent_hash,
timestamp,
prev_randao,
finalized_block_hash,
suggested_fee_recipient,
)
.await
}
/// Get a full payload without caching its result in the execution layer's payload cache.
async fn get_full_payload<Payload: ExecPayload<T>>(
&self,
parent_hash: ExecutionBlockHash,
timestamp: u64,
prev_randao: Hash256,
finalized_block_hash: ExecutionBlockHash,
suggested_fee_recipient: Address,
) -> Result<Payload, Error> {
self.get_full_payload_with(
parent_hash,
timestamp,
prev_randao,
finalized_block_hash,
suggested_fee_recipient,
noop,
)
.await
}
async fn get_full_payload_with<Payload: ExecPayload<T>>(
&self,
parent_hash: ExecutionBlockHash,
timestamp: u64,
prev_randao: Hash256,
finalized_block_hash: ExecutionBlockHash,
suggested_fee_recipient: Address,
f: fn(&ExecutionLayer<T>, &ExecutionPayload<T>) -> Option<ExecutionPayload<T>>,
) -> Result<Payload, Error> {
debug!(
self.log(),
"Issuing engine_getPayload";
"suggested_fee_recipient" => ?suggested_fee_recipient,
"prev_randao" => ?prev_randao,
"timestamp" => timestamp,
"parent_hash" => ?parent_hash,
);
self.engines()
.first_success(|engine| async move {
let payload_id = if let Some(id) = engine
.get_payload_id(parent_hash, timestamp, prev_randao, suggested_fee_recipient)
.await
{
// The payload id has been cached for this engine.
metrics::inc_counter_vec(
&metrics::EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID,
&[metrics::HIT],
);
id
} else {
// The payload id has *not* been cached for this engine. Trigger an artificial
// fork choice update to retrieve a payload ID.
//
// TODO(merge): a better algorithm might try to favour a node that already had a
// cached payload id, since a payload that has had more time to produce is
// likely to be more profitable.
metrics::inc_counter_vec(
&metrics::EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID,
&[metrics::MISS],
);
let fork_choice_state = ForkChoiceState {
head_block_hash: parent_hash,
safe_block_hash: parent_hash,
finalized_block_hash,
};
let payload_attributes = PayloadAttributes {
timestamp,
prev_randao,
suggested_fee_recipient,
};
let response = engine
.notify_forkchoice_updated(
@@ -684,16 +734,19 @@ impl ExecutionLayer {
}
};
engine
.api
.get_payload_v1::<T>(payload_id)
.await
.map(Into::into)
})
engine
.api
.get_payload_v1::<T>(payload_id)
.await
.map_err(Error::EngineErrors)
}
}
.map(|full_payload| {
if f(self, &full_payload).is_some() {
warn!(self.log(), "Duplicate payload cached, this might indicate redundant proposal attempts.");
}
full_payload.into()
})
})
.await
.map_err(Error::EngineErrors)
}
/// Maps to the `engine_newPayload` JSON-RPC call.
@@ -709,7 +762,7 @@ impl ExecutionLayer {
/// - Invalid, if any nodes return invalid.
/// - Syncing, if any nodes return syncing.
/// - An error, if all nodes return an error.
pub async fn notify_new_payload<T: EthSpec>(
pub async fn notify_new_payload(
&self,
execution_payload: &ExecutionPayload<T>,
) -> Result<PayloadStatus, Error> {
@@ -872,23 +925,10 @@ impl ExecutionLayer {
})
.await;
// Only query builders with payload attributes populated.
let builder_broadcast_results = if payload_attributes.is_some() {
self.builders()
.broadcast_without_retry(|engine| async move {
engine
.notify_forkchoice_updated(forkchoice_state, payload_attributes, self.log())
.await
})
.await
} else {
vec![]
};
process_multiple_payload_statuses(
head_block_hash,
Some(broadcast_results)
.into_iter()
.chain(builder_broadcast_results.into_iter())
.map(|result| result.map(|response| response.payload_status)),
self.log(),
)
@@ -1147,7 +1187,7 @@ impl ExecutionLayer {
}
}
pub async fn get_payload_by_block_hash<T: EthSpec>(
pub async fn get_payload_by_block_hash(
&self,
hash: ExecutionBlockHash,
) -> Result<Option<ExecutionPayload<T>>, Error> {
@@ -1160,7 +1200,7 @@ impl ExecutionLayer {
.map_err(Error::EngineErrors)
}
async fn get_payload_by_block_hash_from_engine<T: EthSpec>(
async fn get_payload_by_block_hash_from_engine(
&self,
engine: &Engine<EngineApi>,
hash: ExecutionBlockHash,
@@ -1205,21 +1245,24 @@ impl ExecutionLayer {
}))
}
pub async fn propose_blinded_beacon_block<T: EthSpec>(
pub async fn propose_blinded_beacon_block(
&self,
block: &SignedBeaconBlock<T, BlindedPayload<T>>,
) -> Result<ExecutionPayload<T>, Error> {
debug!(
self.log(),
"Issuing builder_proposeBlindedBlock";
"Sending block to builder";
"root" => ?block.canonical_root(),
);
self.builders()
.first_success_without_retry(|engine| async move {
engine.api.propose_blinded_block_v1(block.clone()).await
})
.await
.map_err(Error::EngineErrors)
if let Some(builder) = self.builder() {
builder
.post_builder_blinded_blocks(block)
.await
.map_err(Error::Builder)
.map(|d| d.data)
} else {
Err(Error::NoPayloadBuilder)
}
}
}
@@ -1320,3 +1363,7 @@ mod test {
.await;
}
}
fn noop<T: EthSpec>(_: &ExecutionLayer<T>, _: &ExecutionPayload<T>) -> Option<ExecutionPayload<T>> {
None
}

View File

@@ -3,6 +3,7 @@ pub use lighthouse_metrics::*;
pub const HIT: &str = "hit";
pub const MISS: &str = "miss";
pub const GET_PAYLOAD: &str = "get_payload";
pub const GET_BLINDED_PAYLOAD: &str = "get_blinded_payload";
pub const NEW_PAYLOAD: &str = "new_payload";
pub const FORKCHOICE_UPDATED: &str = "forkchoice_updated";
pub const GET_TERMINAL_POW_BLOCK_HASH: &str = "get_terminal_pow_block_hash";

View File

@@ -9,7 +9,7 @@ use types::{Address, ChainSpec, Epoch, EthSpec, FullPayload, Hash256, Uint256};
pub struct MockExecutionLayer<T: EthSpec> {
pub server: MockServer<T>,
pub el: ExecutionLayer,
pub el: ExecutionLayer<T>,
pub executor: TaskExecutor,
pub spec: ChainSpec,
}
@@ -22,6 +22,7 @@ impl<T: EthSpec> MockExecutionLayer<T> {
DEFAULT_TERMINAL_BLOCK,
ExecutionBlockHash::zero(),
Epoch::new(0),
None,
)
}
@@ -31,6 +32,7 @@ impl<T: EthSpec> MockExecutionLayer<T> {
terminal_block: u64,
terminal_block_hash: ExecutionBlockHash,
terminal_block_hash_activation_epoch: Epoch,
builder_url: Option<SensitiveUrl>,
) -> Self {
let handle = executor.handle().unwrap();
@@ -54,6 +56,7 @@ impl<T: EthSpec> MockExecutionLayer<T> {
let config = Config {
execution_endpoints: vec![url],
builder_url,
secret_files: vec![path],
suggested_fee_recipient: Some(Address::repeat_byte(42)),
..Default::default()
@@ -111,12 +114,14 @@ impl<T: EthSpec> MockExecutionLayer<T> {
let validator_index = 0;
let payload = self
.el
.get_payload::<T, FullPayload<T>>(
.get_payload::<FullPayload<T>>(
parent_hash,
timestamp,
prev_randao,
finalized_block_hash,
validator_index,
None,
slot,
)
.await
.unwrap()
@@ -173,7 +178,7 @@ impl<T: EthSpec> MockExecutionLayer<T> {
pub async fn with_terminal_block<'a, U, V>(self, func: U) -> Self
where
U: Fn(ChainSpec, ExecutionLayer, Option<ExecutionBlock>) -> V,
U: Fn(ChainSpec, ExecutionLayer<T>, Option<ExecutionBlock>) -> V,
V: Future<Output = ()>,
{
let terminal_block_number = self