Merge remote-tracking branch 'origin/unstable' into tree-states

This commit is contained in:
Michael Sproul
2022-02-15 12:00:52 +11:00
72 changed files with 1891 additions and 621 deletions

View File

@@ -10,6 +10,7 @@ use lru::LruCache;
use sensitive_url::SensitiveUrl;
use slog::{crit, debug, error, info, Logger};
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
@@ -18,7 +19,7 @@ use tokio::{
sync::{Mutex, MutexGuard},
time::{sleep, sleep_until, Instant},
};
use types::ChainSpec;
use types::{ChainSpec, Epoch, ProposerPreparationData};
pub use engine_api::{http::HttpJsonRpc, ExecutePayloadResponseStatus};
@@ -30,6 +31,16 @@ pub mod test_utils;
/// in an LRU cache to avoid redundant lookups. This is the size of that cache.
const EXECUTION_BLOCKS_LRU_CACHE_SIZE: usize = 128;
/// A fee recipient address for use during block production. Only used as a very last resort if
/// there is no address provided by the user.
///
/// ## Note
///
/// This is *not* the zero-address, since Geth has been known to return errors for a coinbase of
/// 0x00..00.
const DEFAULT_SUGGESTED_FEE_RECIPIENT: [u8; 20] =
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1];
#[derive(Debug)]
pub enum Error {
NoEngines,
@@ -46,9 +57,16 @@ impl From<ApiError> for Error {
}
}
#[derive(Clone)]
pub struct ProposerPreparationDataEntry {
update_epoch: Epoch,
preparation_data: ProposerPreparationData,
}
struct Inner {
engines: Engines<HttpJsonRpc>,
suggested_fee_recipient: Option<Address>,
proposer_preparation_data: Mutex<HashMap<u64, ProposerPreparationDataEntry>>,
execution_blocks: Mutex<LruCache<Hash256, ExecutionBlock>>,
executor: TaskExecutor,
log: Logger,
@@ -96,6 +114,7 @@ impl ExecutionLayer {
log: log.clone(),
},
suggested_fee_recipient,
proposer_preparation_data: Mutex::new(HashMap::new()),
execution_blocks: Mutex::new(LruCache::new(EXECUTION_BLOCKS_LRU_CACHE_SIZE)),
executor,
log,
@@ -116,17 +135,18 @@ impl ExecutionLayer {
&self.inner.executor
}
fn suggested_fee_recipient(&self) -> Result<Address, Error> {
self.inner
.suggested_fee_recipient
.ok_or(Error::FeeRecipientUnspecified)
}
/// Note: this function returns a mutex guard, be careful to avoid deadlocks.
async fn execution_blocks(&self) -> MutexGuard<'_, LruCache<Hash256, ExecutionBlock>> {
self.inner.execution_blocks.lock().await
}
/// Note: this function returns a mutex guard, be careful to avoid deadlocks.
async fn proposer_preparation_data(
&self,
) -> MutexGuard<'_, HashMap<u64, ProposerPreparationDataEntry>> {
self.inner.proposer_preparation_data.lock().await
}
fn log(&self) -> &Logger {
&self.inner.log
}
@@ -234,11 +254,124 @@ impl ExecutionLayer {
self.engines().upcheck_not_synced(Logging::Disabled).await;
}
/// Spawns a routine which cleans the cached proposer preparations periodically.
pub fn spawn_clean_proposer_preparation_routine<S: SlotClock + 'static, T: EthSpec>(
&self,
slot_clock: S,
) {
let preparation_cleaner = |el: ExecutionLayer| async move {
// Start the loop to periodically clean proposer preparation cache.
loop {
if let Some(duration_to_next_epoch) =
slot_clock.duration_to_next_epoch(T::slots_per_epoch())
{
// Wait for next epoch
sleep(duration_to_next_epoch).await;
match slot_clock
.now()
.map(|slot| slot.epoch(T::slots_per_epoch()))
{
Some(current_epoch) => el
.clean_proposer_preparation(current_epoch)
.await
.map_err(|e| {
error!(
el.log(),
"Failed to clean proposer preparation cache";
"error" => format!("{:?}", e)
)
})
.unwrap_or(()),
None => error!(el.log(), "Failed to get current epoch from slot clock"),
}
} else {
error!(el.log(), "Failed to read slot clock");
// If we can't read the slot clock, just wait another slot and retry.
sleep(slot_clock.slot_duration()).await;
}
}
};
self.spawn(preparation_cleaner, "exec_preparation_cleanup");
}
/// Returns `true` if there is at least one synced and reachable engine.
pub async fn is_synced(&self) -> bool {
self.engines().any_synced().await
}
/// Updates the proposer preparation data provided by validators
pub fn update_proposer_preparation_blocking(
&self,
update_epoch: Epoch,
preparation_data: &[ProposerPreparationData],
) -> Result<(), Error> {
self.block_on_generic(|_| async move {
self.update_proposer_preparation(update_epoch, preparation_data)
.await
})?
}
/// Updates the proposer preparation data provided by validators
async fn update_proposer_preparation(
&self,
update_epoch: Epoch,
preparation_data: &[ProposerPreparationData],
) -> Result<(), Error> {
let mut proposer_preparation_data = self.proposer_preparation_data().await;
for preparation_entry in preparation_data {
proposer_preparation_data.insert(
preparation_entry.validator_index,
ProposerPreparationDataEntry {
update_epoch,
preparation_data: preparation_entry.clone(),
},
);
}
Ok(())
}
/// Removes expired entries from cached proposer preparations
async fn clean_proposer_preparation(&self, current_epoch: Epoch) -> Result<(), Error> {
let mut proposer_preparation_data = self.proposer_preparation_data().await;
// Keep all entries that have been updated in the last 2 epochs
let retain_epoch = current_epoch.saturating_sub(Epoch::new(2));
proposer_preparation_data.retain(|_validator_index, preparation_entry| {
preparation_entry.update_epoch >= retain_epoch
});
Ok(())
}
/// Returns the fee-recipient address that should be used to build a block
async fn get_suggested_fee_recipient(&self, proposer_index: u64) -> Address {
if let Some(preparation_data_entry) =
self.proposer_preparation_data().await.get(&proposer_index)
{
// The values provided via the API have first priority.
preparation_data_entry.preparation_data.fee_recipient
} else if let Some(address) = self.inner.suggested_fee_recipient {
// If there has been no fee recipient provided via the API, but the BN has been provided
// with a global default address, use that.
address
} else {
// If there is no user-provided fee recipient, use a junk value and complain loudly.
crit!(
self.log(),
"Fee recipient unknown";
"msg" => "the suggested_fee_recipient was unknown during block production. \
a junk address was used, rewards were lost! \
check the --suggested-fee-recipient flag and VC configuration.",
"proposer_index" => ?proposer_index
);
Address::from_slice(&DEFAULT_SUGGESTED_FEE_RECIPIENT)
}
}
/// Maps to the `engine_getPayload` JSON-RPC call.
///
/// However, it will attempt to call `self.prepare_payload` if it cannot find an existing
@@ -254,8 +387,10 @@ impl ExecutionLayer {
timestamp: u64,
random: Hash256,
finalized_block_hash: Hash256,
proposer_index: u64,
) -> Result<ExecutionPayload<T>, Error> {
let suggested_fee_recipient = self.suggested_fee_recipient()?;
let suggested_fee_recipient = self.get_suggested_fee_recipient(proposer_index).await;
debug!(
self.log(),
"Issuing engine_getPayload";

View File

@@ -127,9 +127,16 @@ impl<T: EthSpec> MockExecutionLayer<T> {
.await
.unwrap();
let validator_index = 0;
let payload = self
.el
.get_payload::<T>(parent_hash, timestamp, random, finalized_block_hash)
.get_payload::<T>(
parent_hash,
timestamp,
random,
finalized_block_hash,
validator_index,
)
.await
.unwrap();
let block_hash = payload.block_hash;