Prepare proposer (#3043)

## Issue Addressed

Resolves #2936

## Proposed Changes

Adds functionality for calling [`validator/prepare_beacon_proposer`](https://ethereum.github.io/beacon-APIs/?urls.primaryName=dev#/Validator/prepareBeaconProposer) in advance.

There is a `BeaconChain::prepare_beacon_proposer` method which, which called, computes the proposer for the next slot. If that proposer has been registered via the `validator/prepare_beacon_proposer` API method, then the `beacon_chain.execution_layer` will be provided the `PayloadAttributes` for us in all future forkchoiceUpdated calls. An artificial forkchoiceUpdated call will be created 4s before each slot, when the head updates and when a validator updates their information.

Additionally, I added strict ordering for calls from the `BeaconChain` to the `ExecutionLayer`. I'm not certain the `ExecutionLayer` will always maintain this ordering, but it's a good start to have consistency from the `BeaconChain`. There are some deadlock opportunities introduced, they are documented in the code.

## Additional Info

- ~~Blocked on #2837~~

Co-authored-by: realbigsean <seananderson33@GMAIL.com>
This commit is contained in:
Paul Hauner
2022-03-09 00:42:05 +00:00
parent 527dfa4893
commit 267d8babc8
21 changed files with 883 additions and 181 deletions

View File

@@ -130,7 +130,7 @@ pub struct ExecutionBlock {
pub total_difficulty: Uint256,
}
#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct PayloadAttributes {
pub timestamp: u64,
pub prev_randao: Hash256,

View File

@@ -152,6 +152,16 @@ impl<T: EngineApi> Engines<T> {
let latest_forkchoice_state = self.get_latest_forkchoice_state().await;
if let Some(forkchoice_state) = latest_forkchoice_state {
if forkchoice_state.head_block_hash == ExecutionBlockHash::zero() {
debug!(
self.log,
"No need to call forkchoiceUpdated";
"msg" => "head does not have execution enabled",
"id" => &engine.id,
);
return;
}
info!(
self.log,
"Issuing forkchoiceUpdated";

View File

@@ -18,19 +18,22 @@ use std::future::Future;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use task_executor::TaskExecutor;
use tokio::{
sync::{Mutex, MutexGuard},
sync::{Mutex, MutexGuard, RwLock},
time::{sleep, sleep_until, Instant},
};
use types::{ChainSpec, Epoch, ExecutionBlockHash, ProposerPreparationData};
use types::{ChainSpec, Epoch, ExecutionBlockHash, ProposerPreparationData, Slot};
pub use engine_api::{http::HttpJsonRpc, PayloadAttributes, PayloadStatusV1Status};
pub use engine_api::{
http::HttpJsonRpc, json_structures, PayloadAttributes, PayloadStatusV1Status,
};
pub use payload_status::PayloadStatus;
mod engine_api;
mod engines;
mod metrics;
mod payload_status;
pub mod test_utils;
@@ -72,17 +75,31 @@ impl From<ApiError> for Error {
}
}
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub struct ProposerPreparationDataEntry {
update_epoch: Epoch,
preparation_data: ProposerPreparationData,
}
#[derive(Hash, PartialEq, Eq)]
pub struct ProposerKey {
slot: Slot,
head_block_root: Hash256,
}
#[derive(PartialEq, Clone)]
pub struct Proposer {
validator_index: u64,
payload_attributes: PayloadAttributes,
}
struct Inner {
engines: Engines<HttpJsonRpc>,
execution_engine_forkchoice_lock: Mutex<()>,
suggested_fee_recipient: Option<Address>,
proposer_preparation_data: Mutex<HashMap<u64, ProposerPreparationDataEntry>>,
execution_blocks: Mutex<LruCache<ExecutionBlockHash, ExecutionBlock>>,
proposers: RwLock<HashMap<ProposerKey, Proposer>>,
executor: TaskExecutor,
log: Logger,
}
@@ -204,8 +221,10 @@ impl ExecutionLayer {
latest_forkchoice_state: <_>::default(),
log: log.clone(),
},
execution_engine_forkchoice_lock: <_>::default(),
suggested_fee_recipient,
proposer_preparation_data: Mutex::new(HashMap::new()),
proposers: RwLock::new(HashMap::new()),
execution_blocks: Mutex::new(LruCache::new(EXECUTION_BLOCKS_LRU_CACHE_SIZE)),
executor,
log,
@@ -240,10 +259,18 @@ impl ExecutionLayer {
self.inner.proposer_preparation_data.lock().await
}
fn proposers(&self) -> &RwLock<HashMap<ProposerKey, Proposer>> {
&self.inner.proposers
}
fn log(&self) -> &Logger {
&self.inner.log
}
pub async fn execution_engine_forkchoice_lock(&self) -> MutexGuard<'_, ()> {
self.inner.execution_engine_forkchoice_lock.lock().await
}
/// Convenience function to allow calling async functions in a non-async context.
pub fn block_on<'a, T, U, V>(&'a self, generate_future: T) -> Result<V, Error>
where
@@ -421,7 +448,7 @@ impl ExecutionLayer {
self.block_on_generic(|_| async move {
self.update_proposer_preparation(update_epoch, preparation_data)
.await
})?
})
}
/// Updates the proposer preparation data provided by validators
@@ -429,19 +456,21 @@ impl ExecutionLayer {
&self,
update_epoch: Epoch,
preparation_data: &[ProposerPreparationData],
) -> Result<(), Error> {
) {
let mut proposer_preparation_data = self.proposer_preparation_data().await;
for preparation_entry in preparation_data {
proposer_preparation_data.insert(
preparation_entry.validator_index,
ProposerPreparationDataEntry {
update_epoch,
preparation_data: preparation_entry.clone(),
},
);
}
let new = ProposerPreparationDataEntry {
update_epoch,
preparation_data: preparation_entry.clone(),
};
Ok(())
let existing =
proposer_preparation_data.insert(preparation_entry.validator_index, new.clone());
if existing != Some(new) {
metrics::inc_counter(&metrics::EXECUTION_LAYER_PROPOSER_DATA_UPDATED);
}
}
}
/// Removes expired entries from cached proposer preparations
@@ -457,8 +486,22 @@ impl ExecutionLayer {
Ok(())
}
/// Returns `true` if there have been any validators registered via
/// `Self::update_proposer_preparation`.
pub async fn has_any_proposer_preparation_data(&self) -> bool {
!self.proposer_preparation_data().await.is_empty()
}
/// Returns `true` if the `proposer_index` has registered as a local validator via
/// `Self::update_proposer_preparation`.
pub async fn has_proposer_preparation_data(&self, proposer_index: u64) -> bool {
self.proposer_preparation_data()
.await
.contains_key(&proposer_index)
}
/// Returns the fee-recipient address that should be used to build a block
async fn get_suggested_fee_recipient(&self, proposer_index: u64) -> Address {
pub async fn get_suggested_fee_recipient(&self, proposer_index: u64) -> Address {
if let Some(preparation_data_entry) =
self.proposer_preparation_data().await.get(&proposer_index)
{
@@ -500,6 +543,11 @@ impl ExecutionLayer {
finalized_block_hash: ExecutionBlockHash,
proposer_index: u64,
) -> Result<ExecutionPayload<T>, Error> {
let _timer = metrics::start_timer_vec(
&metrics::EXECUTION_LAYER_REQUEST_TIMES,
&[metrics::GET_PAYLOAD],
);
let suggested_fee_recipient = self.get_suggested_fee_recipient(proposer_index).await;
debug!(
@@ -517,6 +565,10 @@ impl ExecutionLayer {
.await
{
// The payload id has been cached for this engine.
metrics::inc_counter_vec(
&metrics::EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID,
&[metrics::HIT],
);
id
} else {
// The payload id has *not* been cached for this engine. Trigger an artificial
@@ -525,6 +577,10 @@ impl ExecutionLayer {
// TODO(merge): a better algorithm might try to favour a node that already had a
// cached payload id, since a payload that has had more time to produce is
// likely to be more profitable.
metrics::inc_counter_vec(
&metrics::EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID,
&[metrics::MISS],
);
let fork_choice_state = ForkChoiceState {
head_block_hash: parent_hash,
safe_block_hash: parent_hash,
@@ -579,6 +635,11 @@ impl ExecutionLayer {
&self,
execution_payload: &ExecutionPayload<T>,
) -> Result<PayloadStatus, Error> {
let _timer = metrics::start_timer_vec(
&metrics::EXECUTION_LAYER_REQUEST_TIMES,
&[metrics::NEW_PAYLOAD],
);
trace!(
self.log(),
"Issuing engine_newPayload";
@@ -599,6 +660,64 @@ impl ExecutionLayer {
)
}
/// Register that the given `validator_index` is going to produce a block at `slot`.
///
/// The block will be built atop `head_block_root` and the EL will need to prepare an
/// `ExecutionPayload` as defined by the given `payload_attributes`.
pub async fn insert_proposer(
&self,
slot: Slot,
head_block_root: Hash256,
validator_index: u64,
payload_attributes: PayloadAttributes,
) -> bool {
let proposers_key = ProposerKey {
slot,
head_block_root,
};
let existing = self.proposers().write().await.insert(
proposers_key,
Proposer {
validator_index,
payload_attributes,
},
);
if existing.is_none() {
metrics::inc_counter(&metrics::EXECUTION_LAYER_PROPOSER_INSERTED);
}
existing.is_some()
}
/// If there has been a proposer registered via `Self::insert_proposer` with a matching `slot`
/// `head_block_root`, then return the appropriate `PayloadAttributes` for inclusion in
/// `forkchoiceUpdated` calls.
pub async fn payload_attributes(
&self,
current_slot: Slot,
head_block_root: Hash256,
) -> Option<PayloadAttributes> {
let proposers_key = ProposerKey {
slot: current_slot,
head_block_root,
};
let proposer = self.proposers().read().await.get(&proposers_key).cloned()?;
debug!(
self.log(),
"Beacon proposer found";
"payload_attributes" => ?proposer.payload_attributes,
"head_block_root" => ?head_block_root,
"slot" => current_slot,
"validator_index" => proposer.validator_index,
);
Some(proposer.payload_attributes)
}
/// Maps to the `engine_consensusValidated` JSON-RPC call.
///
/// ## Fallback Behaviour
@@ -616,8 +735,14 @@ impl ExecutionLayer {
&self,
head_block_hash: ExecutionBlockHash,
finalized_block_hash: ExecutionBlockHash,
payload_attributes: Option<PayloadAttributes>,
current_slot: Slot,
head_block_root: Hash256,
) -> Result<PayloadStatus, Error> {
let _timer = metrics::start_timer_vec(
&metrics::EXECUTION_LAYER_REQUEST_TIMES,
&[metrics::FORKCHOICE_UPDATED],
);
trace!(
self.log(),
"Issuing engine_forkchoiceUpdated";
@@ -625,6 +750,29 @@ impl ExecutionLayer {
"head_block_hash" => ?head_block_hash,
);
let next_slot = current_slot + 1;
let payload_attributes = self.payload_attributes(next_slot, head_block_root).await;
// Compute the "lookahead", the time between when the payload will be produced and now.
if let Some(payload_attributes) = payload_attributes {
if let Ok(now) = SystemTime::now().duration_since(UNIX_EPOCH) {
let timestamp = Duration::from_secs(payload_attributes.timestamp);
if let Some(lookahead) = timestamp.checked_sub(now) {
metrics::observe_duration(
&metrics::EXECUTION_LAYER_PAYLOAD_ATTRIBUTES_LOOKAHEAD,
lookahead,
);
} else {
debug!(
self.log(),
"Late payload attributes";
"timestamp" => ?timestamp,
"now" => ?now,
)
}
}
}
// see https://hackmd.io/@n0ble/kintsugi-spec#Engine-API
// for now, we must set safe_block_hash = head_block_hash
let forkchoice_state = ForkChoiceState {
@@ -725,6 +873,11 @@ impl ExecutionLayer {
&self,
spec: &ChainSpec,
) -> Result<Option<ExecutionBlockHash>, Error> {
let _timer = metrics::start_timer_vec(
&metrics::EXECUTION_LAYER_REQUEST_TIMES,
&[metrics::GET_TERMINAL_POW_BLOCK_HASH],
);
let hash_opt = self
.engines()
.first_success(|engine| async move {
@@ -836,6 +989,11 @@ impl ExecutionLayer {
block_hash: ExecutionBlockHash,
spec: &ChainSpec,
) -> Result<Option<bool>, Error> {
let _timer = metrics::start_timer_vec(
&metrics::EXECUTION_LAYER_REQUEST_TIMES,
&[metrics::IS_VALID_TERMINAL_POW_BLOCK_HASH],
);
let broadcast_results = self
.engines()
.broadcast(|engine| async move {

View File

@@ -0,0 +1,34 @@
pub use lighthouse_metrics::*;
pub const HIT: &str = "hit";
pub const MISS: &str = "miss";
pub const GET_PAYLOAD: &str = "get_payload";
pub const NEW_PAYLOAD: &str = "new_payload";
pub const FORKCHOICE_UPDATED: &str = "forkchoice_updated";
pub const GET_TERMINAL_POW_BLOCK_HASH: &str = "get_terminal_pow_block_hash";
pub const IS_VALID_TERMINAL_POW_BLOCK_HASH: &str = "is_valid_terminal_pow_block_hash";
lazy_static::lazy_static! {
pub static ref EXECUTION_LAYER_PROPOSER_INSERTED: Result<IntCounter> = try_create_int_counter(
"execution_layer_proposer_inserted",
"Count of times a new proposer is known",
);
pub static ref EXECUTION_LAYER_PROPOSER_DATA_UPDATED: Result<IntCounter> = try_create_int_counter(
"execution_layer_proposer_data_updated",
"Count of times new proposer data is supplied",
);
pub static ref EXECUTION_LAYER_REQUEST_TIMES: Result<HistogramVec> = try_create_histogram_vec(
"execution_layer_request_times",
"Duration of calls to ELs",
&["method"]
);
pub static ref EXECUTION_LAYER_PAYLOAD_ATTRIBUTES_LOOKAHEAD: Result<Histogram> = try_create_histogram(
"execution_layer_payload_attributes_lookahead",
"Duration between an fcU call with PayloadAttributes and when the block should be produced",
);
pub static ref EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID: Result<IntCounterVec> = try_create_int_counter_vec(
"execution_layer_pre_prepared_payload_id",
"Indicates hits or misses for already having prepared a payload id before payload production",
&["event"]
);
}

View File

@@ -10,6 +10,8 @@ pub async fn handle_rpc<T: EthSpec>(
body: JsonValue,
ctx: Arc<Context<T>>,
) -> Result<JsonValue, String> {
*ctx.previous_request.lock() = Some(body.clone());
let method = body
.get("method")
.and_then(JsonValue::as_str)

View File

@@ -124,15 +124,29 @@ impl<T: EthSpec> MockExecutionLayer<T> {
let prev_randao = Hash256::from_low_u64_be(block_number);
let finalized_block_hash = parent_hash;
// Insert a proposer to ensure the fork choice updated command works.
let slot = Slot::new(0);
let head_block_root = Hash256::repeat_byte(42);
let validator_index = 0;
self.el
.insert_proposer(
slot,
head_block_root,
validator_index,
PayloadAttributes {
timestamp,
prev_randao,
suggested_fee_recipient: Address::repeat_byte(42),
},
)
.await;
self.el
.notify_forkchoice_updated(
parent_hash,
ExecutionBlockHash::zero(),
Some(PayloadAttributes {
timestamp,
prev_randao,
suggested_fee_recipient: Address::repeat_byte(42),
}),
slot,
head_block_root,
)
.await
.unwrap();
@@ -158,8 +172,16 @@ impl<T: EthSpec> MockExecutionLayer<T> {
let status = self.el.notify_new_payload(&payload).await.unwrap();
assert_eq!(status, PayloadStatus::Valid);
// Use junk values for slot/head-root to ensure there is no payload supplied.
let slot = Slot::new(0);
let head_block_root = Hash256::repeat_byte(13);
self.el
.notify_forkchoice_updated(block_hash, ExecutionBlockHash::zero(), None)
.notify_forkchoice_updated(
block_hash,
ExecutionBlockHash::zero(),
slot,
head_block_root,
)
.await
.unwrap();

View File

@@ -65,6 +65,7 @@ impl<T: EthSpec> MockServer<T> {
log: null_logger().unwrap(),
last_echo_request: last_echo_request.clone(),
execution_block_generator: RwLock::new(execution_block_generator),
previous_request: <_>::default(),
preloaded_responses,
static_new_payload_response: <_>::default(),
_phantom: PhantomData,
@@ -120,6 +121,10 @@ impl<T: EthSpec> MockServer<T> {
self.ctx.preloaded_responses.lock().push(response)
}
pub fn take_previous_request(&self) -> Option<serde_json::Value> {
self.ctx.previous_request.lock().take()
}
pub fn all_payloads_valid(&self) {
let response = StaticNewPayloadResponse {
status: PayloadStatusV1 {
@@ -241,6 +246,7 @@ pub struct Context<T: EthSpec> {
pub last_echo_request: Arc<RwLock<Option<Bytes>>>,
pub execution_block_generator: RwLock<ExecutionBlockGenerator<T>>,
pub preloaded_responses: Arc<Mutex<Vec<serde_json::Value>>>,
pub previous_request: Arc<Mutex<Option<serde_json::Value>>>,
pub static_new_payload_response: Arc<Mutex<Option<StaticNewPayloadResponse>>>,
pub _phantom: PhantomData<T>,
}