mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-21 14:58:31 +00:00
Following the release of Rust v1.94.0, there are new Clippy lints which do not pass and are blocking CI (which pulls in the latest version of Rust). This is pretty much the minimum just to get CI running again. Most of the errors involve error types being too large. For now I've added allows, but later it might be worth doing a refactor to `Box` or otherwise remove the problematic error types. Co-Authored-By: Mac L <mjladson@pm.me>
2166 lines
79 KiB
Rust
2166 lines
79 KiB
Rust
//! This crate provides an abstraction over one or more *execution engines*. An execution engine
|
|
//! was formerly known as an "eth1 node", like Geth, Nethermind, Erigon, etc.
|
|
//!
|
|
//! This crate only provides useful functionality for "The Merge", it does not provide any of the
|
|
//! deposit-contract functionality that the `beacon_node/eth1` crate already provides.
|
|
|
|
use crate::json_structures::{BlobAndProofV1, BlobAndProofV2};
|
|
use crate::payload_cache::PayloadCache;
|
|
use arc_swap::ArcSwapOption;
|
|
use auth::{Auth, JwtKey, strip_prefix};
|
|
pub use block_hash::calculate_execution_block_hash;
|
|
use bls::{PublicKeyBytes, Signature};
|
|
use builder_client::BuilderHttpClient;
|
|
pub use engine_api::EngineCapabilities;
|
|
use engine_api::Error as ApiError;
|
|
pub use engine_api::*;
|
|
pub use engine_api::{http, http::HttpJsonRpc, http::deposit_methods};
|
|
use engines::{Engine, EngineError};
|
|
pub use engines::{EngineState, ForkchoiceState};
|
|
use eth2::types::{BlobsBundle, FullPayloadContents};
|
|
use eth2::types::{ForkVersionedResponse, builder::SignedBuilderBid};
|
|
use fixed_bytes::UintExtended;
|
|
use fork_choice::ForkchoiceUpdateParameters;
|
|
use logging::crit;
|
|
pub use payload_status::PayloadStatus;
|
|
use payload_status::process_payload_status;
|
|
use sensitive_url::SensitiveUrl;
|
|
use serde::{Deserialize, Serialize};
|
|
use slot_clock::SlotClock;
|
|
use std::collections::{HashMap, hash_map::Entry};
|
|
use std::fmt;
|
|
use std::future::Future;
|
|
use std::io::Write;
|
|
use std::path::PathBuf;
|
|
use std::sync::Arc;
|
|
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
|
use strum::AsRefStr;
|
|
use task_executor::TaskExecutor;
|
|
use tokio::{
|
|
sync::{Mutex, MutexGuard, RwLock},
|
|
time::sleep,
|
|
};
|
|
use tokio_stream::wrappers::WatchStream;
|
|
use tracing::{Instrument, debug, debug_span, error, info, instrument, warn};
|
|
use tree_hash::TreeHash;
|
|
use types::ExecutionPayloadGloas;
|
|
use types::builder::BuilderBid;
|
|
use types::execution::BlockProductionVersion;
|
|
use types::kzg_ext::KzgCommitments;
|
|
use types::{
|
|
AbstractExecPayload, BlobsList, ExecutionPayloadDeneb, ExecutionRequests, KzgProofs,
|
|
SignedBlindedBeaconBlock,
|
|
};
|
|
use types::{
|
|
BeaconStateError, BlindedPayload, ChainSpec, Epoch, ExecPayload, ExecutionPayloadBellatrix,
|
|
ExecutionPayloadCapella, ExecutionPayloadElectra, ExecutionPayloadFulu, FullPayload,
|
|
ProposerPreparationData, Slot,
|
|
};
|
|
|
|
mod block_hash;
|
|
mod engine_api;
|
|
pub mod engines;
|
|
mod keccak;
|
|
mod metrics;
|
|
pub mod payload_cache;
|
|
mod payload_status;
|
|
pub mod test_utils;
|
|
pub mod versioned_hashes;
|
|
|
|
/// Default URL of the JWT-authenticated execution endpoint.
pub const DEFAULT_EXECUTION_ENDPOINT: &str = "http://localhost:8551/";

/// Default file name for the JWT secret.
pub const DEFAULT_JWT_FILE: &str = "jwt.hex";

/// Fee recipient of last resort for block production, used only when the user has not supplied
/// an address.
///
/// ## Note
///
/// This is *not* the zero-address, since Geth has been known to return errors for a coinbase of
/// 0x00..00.
const DEFAULT_SUGGESTED_FEE_RECIPIENT: [u8; 20] = {
    // Nineteen zero bytes followed by a single `1`.
    let mut address = [0u8; 20];
    address[19] = 1;
    address
};
|
|
|
|
/// A payload tagged with the source that produced it.
pub enum ProvenancedPayload<P> {
    /// The payload came from the local execution engine.
    Local(P),
    /// The payload came from a builder (e.g. mev-boost).
    Builder(P),
}
|
|
|
|
impl<E: EthSpec> TryFrom<BuilderBid<E>> for ProvenancedPayload<BlockProposalContentsType<E>> {
|
|
type Error = Error;
|
|
|
|
fn try_from(value: BuilderBid<E>) -> Result<Self, Error> {
|
|
let block_proposal_contents = match value {
|
|
BuilderBid::Bellatrix(builder_bid) => BlockProposalContents::Payload {
|
|
payload: ExecutionPayloadHeader::Bellatrix(builder_bid.header).into(),
|
|
block_value: builder_bid.value,
|
|
},
|
|
BuilderBid::Capella(builder_bid) => BlockProposalContents::Payload {
|
|
payload: ExecutionPayloadHeader::Capella(builder_bid.header).into(),
|
|
block_value: builder_bid.value,
|
|
},
|
|
BuilderBid::Deneb(builder_bid) => BlockProposalContents::PayloadAndBlobs {
|
|
payload: ExecutionPayloadHeader::Deneb(builder_bid.header).into(),
|
|
block_value: builder_bid.value,
|
|
kzg_commitments: builder_bid.blob_kzg_commitments,
|
|
blobs_and_proofs: None,
|
|
requests: None,
|
|
},
|
|
BuilderBid::Electra(builder_bid) => BlockProposalContents::PayloadAndBlobs {
|
|
payload: ExecutionPayloadHeader::Electra(builder_bid.header).into(),
|
|
block_value: builder_bid.value,
|
|
kzg_commitments: builder_bid.blob_kzg_commitments,
|
|
blobs_and_proofs: None,
|
|
requests: Some(builder_bid.execution_requests),
|
|
},
|
|
BuilderBid::Fulu(builder_bid) => BlockProposalContents::PayloadAndBlobs {
|
|
payload: ExecutionPayloadHeader::Fulu(builder_bid.header).into(),
|
|
block_value: builder_bid.value,
|
|
kzg_commitments: builder_bid.blob_kzg_commitments,
|
|
blobs_and_proofs: None,
|
|
requests: Some(builder_bid.execution_requests),
|
|
},
|
|
};
|
|
Ok(ProvenancedPayload::Builder(
|
|
BlockProposalContentsType::Blinded(block_proposal_contents),
|
|
))
|
|
}
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub enum Error {
|
|
NoEngine,
|
|
NoPayloadBuilder,
|
|
ApiError(ApiError),
|
|
Builder(builder_client::Error),
|
|
NoHeaderFromBuilder,
|
|
CannotProduceHeader,
|
|
EngineError(Box<EngineError>),
|
|
NotSynced,
|
|
ShuttingDown,
|
|
FeeRecipientUnspecified,
|
|
MissingLatestValidHash,
|
|
BlockHashMismatch {
|
|
computed: ExecutionBlockHash,
|
|
payload: ExecutionBlockHash,
|
|
transactions_root: Hash256,
|
|
},
|
|
ZeroLengthTransaction,
|
|
PayloadBodiesByRangeNotSupported,
|
|
GetBlobsNotSupported,
|
|
InvalidJWTSecret(String),
|
|
InvalidForkForPayload,
|
|
InvalidPayloadBody(String),
|
|
InvalidPayloadConversion,
|
|
InvalidBlobConversion(String),
|
|
SszTypesError(ssz_types::Error),
|
|
BeaconStateError(BeaconStateError),
|
|
PayloadTypeMismatch,
|
|
VerifyingVersionedHashes(versioned_hashes::Error),
|
|
Unexpected(String),
|
|
}
|
|
|
|
impl From<ssz_types::Error> for Error {
|
|
fn from(e: ssz_types::Error) -> Self {
|
|
Error::SszTypesError(e)
|
|
}
|
|
}
|
|
|
|
/// Lets `?` lift a `BeaconStateError` into [`Error::BeaconStateError`].
impl From<BeaconStateError> for Error {
    fn from(state_error: BeaconStateError) -> Self {
        Self::BeaconStateError(state_error)
    }
}
|
|
|
|
/// Lets `?` lift an engine API error into [`Error::ApiError`].
impl From<ApiError> for Error {
    fn from(api_error: ApiError) -> Self {
        Self::ApiError(api_error)
    }
}
|
|
|
|
impl From<EngineError> for Error {
|
|
fn from(e: EngineError) -> Self {
|
|
match e {
|
|
// This removes an unnecessary layer of indirection.
|
|
// TODO (mark): consider refactoring these error enums
|
|
EngineError::Api { error } => Error::ApiError(error),
|
|
_ => Error::EngineError(Box::new(e)),
|
|
}
|
|
}
|
|
}
|
|
|
|
pub enum BlockProposalContentsType<E: EthSpec> {
|
|
Full(BlockProposalContents<E, FullPayload<E>>),
|
|
Blinded(BlockProposalContents<E, BlindedPayload<E>>),
|
|
}
|
|
|
|
pub struct BlockProposalContentsGloas<E: EthSpec> {
|
|
pub payload: ExecutionPayloadGloas<E>,
|
|
pub payload_value: Uint256,
|
|
pub blob_kzg_commitments: KzgCommitments<E>,
|
|
pub blobs_and_proofs: (BlobsList<E>, KzgProofs<E>),
|
|
pub execution_requests: ExecutionRequests<E>,
|
|
}
|
|
|
|
/// Repackages a Gloas `getPayload` response as block proposal contents.
impl<E: EthSpec> From<GetPayloadResponseGloas<E>> for BlockProposalContentsGloas<E> {
    fn from(response: GetPayloadResponseGloas<E>) -> Self {
        // Pull the bundle out once, then split it into commitments and (blobs, proofs).
        let bundle = response.blobs_bundle;

        Self {
            payload: response.execution_payload,
            payload_value: response.block_value,
            blob_kzg_commitments: bundle.commitments,
            blobs_and_proofs: (bundle.blobs, bundle.proofs),
            execution_requests: response.requests,
        }
    }
}
|
|
|
|
pub enum BlockProposalContents<E: EthSpec, Payload: AbstractExecPayload<E>> {
|
|
Payload {
|
|
payload: Payload,
|
|
block_value: Uint256,
|
|
},
|
|
PayloadAndBlobs {
|
|
payload: Payload,
|
|
block_value: Uint256,
|
|
kzg_commitments: KzgCommitments<E>,
|
|
/// `None` for blinded `PayloadAndBlobs`.
|
|
blobs_and_proofs: Option<(BlobsList<E>, KzgProofs<E>)>,
|
|
// TODO(electra): this should probably be a separate variant/superstruct
|
|
// See: https://github.com/sigp/lighthouse/issues/6981
|
|
requests: Option<ExecutionRequests<E>>,
|
|
},
|
|
}
|
|
|
|
impl<E: EthSpec> From<BlockProposalContents<E, FullPayload<E>>>
|
|
for BlockProposalContents<E, BlindedPayload<E>>
|
|
{
|
|
fn from(item: BlockProposalContents<E, FullPayload<E>>) -> Self {
|
|
match item {
|
|
BlockProposalContents::Payload {
|
|
payload,
|
|
block_value,
|
|
} => BlockProposalContents::Payload {
|
|
payload: payload.execution_payload().into(),
|
|
block_value,
|
|
},
|
|
BlockProposalContents::PayloadAndBlobs {
|
|
payload,
|
|
block_value,
|
|
kzg_commitments,
|
|
blobs_and_proofs: _,
|
|
requests,
|
|
} => BlockProposalContents::PayloadAndBlobs {
|
|
payload: payload.execution_payload().into(),
|
|
block_value,
|
|
kzg_commitments,
|
|
blobs_and_proofs: None,
|
|
requests,
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<E: EthSpec, Payload: AbstractExecPayload<E>> TryFrom<GetPayloadResponse<E>>
|
|
for BlockProposalContents<E, Payload>
|
|
{
|
|
type Error = Error;
|
|
|
|
fn try_from(response: GetPayloadResponse<E>) -> Result<Self, Error> {
|
|
let (execution_payload, block_value, maybe_bundle, maybe_requests) = response.into();
|
|
match maybe_bundle {
|
|
Some(bundle) => Ok(Self::PayloadAndBlobs {
|
|
payload: execution_payload.into(),
|
|
block_value,
|
|
kzg_commitments: bundle.commitments,
|
|
blobs_and_proofs: Some((bundle.blobs, bundle.proofs)),
|
|
requests: maybe_requests,
|
|
}),
|
|
None => Ok(Self::Payload {
|
|
payload: execution_payload.into(),
|
|
block_value,
|
|
}),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<E: EthSpec> TryFrom<GetPayloadResponseType<E>> for BlockProposalContentsType<E> {
|
|
type Error = Error;
|
|
|
|
fn try_from(response_type: GetPayloadResponseType<E>) -> Result<Self, Error> {
|
|
match response_type {
|
|
GetPayloadResponseType::Full(response) => Ok(Self::Full(response.try_into()?)),
|
|
GetPayloadResponseType::Blinded(response) => Ok(Self::Blinded(response.try_into()?)),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[allow(clippy::type_complexity)]
|
|
impl<E: EthSpec, Payload: AbstractExecPayload<E>> BlockProposalContents<E, Payload> {
|
|
pub fn deconstruct(
|
|
self,
|
|
) -> (
|
|
Payload,
|
|
Option<KzgCommitments<E>>,
|
|
Option<(BlobsList<E>, KzgProofs<E>)>,
|
|
Option<ExecutionRequests<E>>,
|
|
Uint256,
|
|
) {
|
|
match self {
|
|
Self::Payload {
|
|
payload,
|
|
block_value,
|
|
} => (payload, None, None, None, block_value),
|
|
Self::PayloadAndBlobs {
|
|
payload,
|
|
block_value,
|
|
kzg_commitments,
|
|
blobs_and_proofs,
|
|
requests,
|
|
} => (
|
|
payload,
|
|
Some(kzg_commitments),
|
|
blobs_and_proofs,
|
|
requests,
|
|
block_value,
|
|
),
|
|
}
|
|
}
|
|
|
|
pub fn payload(&self) -> &Payload {
|
|
match self {
|
|
Self::Payload { payload, .. } => payload,
|
|
Self::PayloadAndBlobs { payload, .. } => payload,
|
|
}
|
|
}
|
|
pub fn to_payload(self) -> Payload {
|
|
match self {
|
|
Self::Payload { payload, .. } => payload,
|
|
Self::PayloadAndBlobs { payload, .. } => payload,
|
|
}
|
|
}
|
|
pub fn block_value(&self) -> &Uint256 {
|
|
match self {
|
|
Self::Payload { block_value, .. } => block_value,
|
|
Self::PayloadAndBlobs { block_value, .. } => block_value,
|
|
}
|
|
}
|
|
}
|
|
|
|
// This just groups together a bunch of parameters that commonly
|
|
// get passed around together in calls to get_payload.
|
|
#[derive(Clone, Copy, Debug)]
|
|
pub struct PayloadParameters<'a> {
|
|
pub parent_hash: ExecutionBlockHash,
|
|
pub parent_gas_limit: u64,
|
|
pub proposer_gas_limit: Option<u64>,
|
|
pub payload_attributes: &'a PayloadAttributes,
|
|
pub forkchoice_update_params: &'a ForkchoiceUpdateParameters,
|
|
pub current_fork: ForkName,
|
|
}
|
|
|
|
#[derive(Clone, PartialEq)]
|
|
pub struct ProposerPreparationDataEntry {
|
|
update_epoch: Epoch,
|
|
preparation_data: ProposerPreparationData,
|
|
gas_limit: Option<u64>,
|
|
}
|
|
|
|
impl ProposerPreparationDataEntry {
|
|
pub fn update(&mut self, updated: Self) -> bool {
|
|
let mut changed = false;
|
|
// Update `gas_limit` if `updated.gas_limit` is `Some` and:
|
|
// - `self.gas_limit` is `None`, or
|
|
// - both are `Some` but the values differ.
|
|
if let Some(updated_gas_limit) = updated.gas_limit
|
|
&& self.gas_limit != Some(updated_gas_limit)
|
|
{
|
|
self.gas_limit = Some(updated_gas_limit);
|
|
changed = true;
|
|
}
|
|
|
|
// Update `update_epoch` if it differs
|
|
if self.update_epoch != updated.update_epoch {
|
|
self.update_epoch = updated.update_epoch;
|
|
changed = true;
|
|
}
|
|
|
|
// Update `preparation_data` if it differs
|
|
if self.preparation_data != updated.preparation_data {
|
|
self.preparation_data = updated.preparation_data;
|
|
changed = true;
|
|
}
|
|
|
|
changed
|
|
}
|
|
}
|
|
|
|
#[derive(Hash, PartialEq, Eq)]
|
|
pub struct ProposerKey {
|
|
slot: Slot,
|
|
head_block_root: Hash256,
|
|
}
|
|
|
|
#[derive(PartialEq, Clone)]
|
|
pub struct Proposer {
|
|
validator_index: u64,
|
|
payload_attributes: PayloadAttributes,
|
|
}
|
|
|
|
/// Information from the beacon chain that is necessary for querying the builder API.
|
|
pub struct BuilderParams {
|
|
pub pubkey: PublicKeyBytes,
|
|
pub slot: Slot,
|
|
pub chain_health: ChainHealth,
|
|
}
|
|
|
|
#[derive(PartialEq)]
|
|
pub enum ChainHealth {
|
|
Healthy,
|
|
Unhealthy(FailedCondition),
|
|
Optimistic,
|
|
PreMerge,
|
|
}
|
|
|
|
/// The condition that caused a chain to be reported as `ChainHealth::Unhealthy`.
///
/// This is a fieldless enum, so it also derives `Clone`, `Copy` and `Eq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FailedCondition {
    Skips,
    SkipsPerEpoch,
    EpochsSinceFinalization,
}
|
|
|
|
pub enum SubmitBlindedBlockResponse<E: EthSpec> {
|
|
V1(Box<FullPayloadContents<E>>),
|
|
V2,
|
|
}
|
|
|
|
/// A borrowed execution payload paired with an optional borrowed blobs bundle.
type PayloadContentsRefTuple<'a, E> = (ExecutionPayloadRef<'a, E>, Option<&'a BlobsBundle<E>>);
|
|
|
|
struct Inner<E: EthSpec> {
|
|
engine: Arc<Engine>,
|
|
builder: ArcSwapOption<BuilderHttpClient>,
|
|
execution_engine_forkchoice_lock: Mutex<()>,
|
|
suggested_fee_recipient: Option<Address>,
|
|
proposer_preparation_data: Mutex<HashMap<u64, ProposerPreparationDataEntry>>,
|
|
proposers: RwLock<HashMap<ProposerKey, Proposer>>,
|
|
executor: TaskExecutor,
|
|
payload_cache: PayloadCache<E>,
|
|
/// Track whether the last `newPayload` call errored.
|
|
///
|
|
/// This is used *only* in the informational sync status endpoint, so that a VC using this
|
|
/// node can prefer another node with a healthier EL.
|
|
last_new_payload_errored: RwLock<bool>,
|
|
}
|
|
|
|
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
|
|
pub struct Config {
|
|
/// Endpoint url for EL nodes that are running the engine api.
|
|
pub execution_endpoint: Option<SensitiveUrl>,
|
|
/// Endpoint urls for services providing the builder api.
|
|
pub builder_url: Option<SensitiveUrl>,
|
|
/// The timeout value used when making a request to fetch a block header
|
|
/// from the builder api.
|
|
pub builder_header_timeout: Option<Duration>,
|
|
/// User agent to send with requests to the builder API.
|
|
pub builder_user_agent: Option<String>,
|
|
/// Disable ssz requests on builder. Only use json.
|
|
pub disable_builder_ssz_requests: bool,
|
|
/// JWT secret for the above endpoint running the engine api.
|
|
pub secret_file: Option<PathBuf>,
|
|
/// The default fee recipient to use on the beacon node if none if provided from
|
|
/// the validator client during block preparation.
|
|
pub suggested_fee_recipient: Option<Address>,
|
|
/// An optional id for the beacon node that will be passed to the EL in the JWT token claim.
|
|
pub jwt_id: Option<String>,
|
|
/// An optional client version for the beacon node that will be passed to the EL in the JWT token claim.
|
|
pub jwt_version: Option<String>,
|
|
/// Default directory for the jwt secret if not provided through cli.
|
|
pub default_datadir: PathBuf,
|
|
pub execution_timeout_multiplier: Option<u32>,
|
|
}
|
|
|
|
/// Provides access to one execution engine and provides a neat interface for consumption by the
|
|
/// `BeaconChain`.
|
|
#[derive(Clone)]
|
|
pub struct ExecutionLayer<E: EthSpec> {
|
|
inner: Arc<Inner<E>>,
|
|
}
|
|
|
|
impl<E: EthSpec> ExecutionLayer<E> {
|
|
/// Instantiate `Self` with an Execution engine specified in `Config`, using JSON-RPC via HTTP.
|
|
pub fn from_config(config: Config, executor: TaskExecutor) -> Result<Self, Error> {
|
|
let Config {
|
|
execution_endpoint: url,
|
|
builder_url,
|
|
builder_user_agent,
|
|
builder_header_timeout,
|
|
disable_builder_ssz_requests,
|
|
secret_file,
|
|
suggested_fee_recipient,
|
|
jwt_id,
|
|
jwt_version,
|
|
default_datadir,
|
|
execution_timeout_multiplier,
|
|
} = config;
|
|
|
|
let execution_url = url.ok_or(Error::NoEngine)?;
|
|
|
|
// Use the default jwt secret path if not provided via cli.
|
|
let secret_file = secret_file.unwrap_or_else(|| default_datadir.join(DEFAULT_JWT_FILE));
|
|
|
|
let jwt_key = if secret_file.exists() {
|
|
// Read secret from file if it already exists
|
|
std::fs::read_to_string(&secret_file)
|
|
.map_err(|e| format!("Failed to read JWT secret file. Error: {:?}", e))
|
|
.and_then(|ref s| {
|
|
let secret = JwtKey::from_slice(
|
|
&hex::decode(strip_prefix(s.trim_end()))
|
|
.map_err(|e| format!("Invalid hex string: {:?}", e))?,
|
|
)?;
|
|
Ok(secret)
|
|
})
|
|
.map_err(Error::InvalidJWTSecret)
|
|
} else {
|
|
// Create a new file and write a randomly generated secret to it if file does not exist
|
|
warn!(path = %secret_file.display(),"No JWT found on disk. Generating");
|
|
std::fs::File::options()
|
|
.write(true)
|
|
.create_new(true)
|
|
.open(&secret_file)
|
|
.map_err(|e| format!("Failed to open JWT secret file. Error: {:?}", e))
|
|
.and_then(|mut f| {
|
|
let secret = auth::JwtKey::random();
|
|
f.write_all(secret.hex_string().as_bytes())
|
|
.map_err(|e| format!("Failed to write to JWT secret file: {:?}", e))?;
|
|
Ok(secret)
|
|
})
|
|
.map_err(Error::InvalidJWTSecret)
|
|
}?;
|
|
|
|
let engine: Engine = {
|
|
let auth = Auth::new(jwt_key, jwt_id, jwt_version);
|
|
debug!(endpoint = %execution_url, jwt_path = ?secret_file.as_path(),"Loaded execution endpoint");
|
|
let api = HttpJsonRpc::new_with_auth(execution_url, auth, execution_timeout_multiplier)
|
|
.map_err(Error::ApiError)?;
|
|
Engine::new(api, executor.clone())
|
|
};
|
|
|
|
let inner = Inner {
|
|
engine: Arc::new(engine),
|
|
builder: ArcSwapOption::empty(),
|
|
execution_engine_forkchoice_lock: <_>::default(),
|
|
suggested_fee_recipient,
|
|
proposer_preparation_data: Mutex::new(HashMap::new()),
|
|
proposers: RwLock::new(HashMap::new()),
|
|
executor,
|
|
payload_cache: PayloadCache::default(),
|
|
last_new_payload_errored: RwLock::new(false),
|
|
};
|
|
|
|
let el = Self {
|
|
inner: Arc::new(inner),
|
|
};
|
|
|
|
if let Some(builder_url) = builder_url {
|
|
el.set_builder_url(
|
|
builder_url,
|
|
builder_user_agent,
|
|
builder_header_timeout,
|
|
disable_builder_ssz_requests,
|
|
)?;
|
|
}
|
|
|
|
Ok(el)
|
|
}
|
|
|
|
fn engine(&self) -> &Arc<Engine> {
|
|
&self.inner.engine
|
|
}
|
|
|
|
/// Returns the currently configured builder client, or `None` if no builder is set.
pub fn builder(&self) -> Option<Arc<BuilderHttpClient>> {
    let current_builder = &self.inner.builder;
    current_builder.load_full()
}
|
|
|
|
/// Set the builder URL after initialization.
|
|
///
|
|
/// This is useful for breaking circular dependencies between mock ELs and mock builders in
|
|
/// tests.
|
|
pub fn set_builder_url(
|
|
&self,
|
|
builder_url: SensitiveUrl,
|
|
builder_user_agent: Option<String>,
|
|
builder_header_timeout: Option<Duration>,
|
|
disable_ssz: bool,
|
|
) -> Result<(), Error> {
|
|
let builder_client = BuilderHttpClient::new(
|
|
builder_url.clone(),
|
|
builder_user_agent,
|
|
builder_header_timeout,
|
|
disable_ssz,
|
|
)
|
|
.map_err(Error::Builder)?;
|
|
info!(
|
|
?builder_url,
|
|
local_user_agent = builder_client.get_user_agent(),
|
|
ssz_disabled = disable_ssz,
|
|
"Using external block builder"
|
|
);
|
|
self.inner.builder.swap(Some(Arc::new(builder_client)));
|
|
Ok(())
|
|
}
|
|
|
|
/// Cache a full payload, keyed on the `tree_hash_root` of the payload
|
|
fn cache_payload(
|
|
&self,
|
|
payload_and_blobs: PayloadContentsRefTuple<E>,
|
|
) -> Option<FullPayloadContents<E>> {
|
|
let (payload_ref, maybe_json_blobs_bundle) = payload_and_blobs;
|
|
|
|
let payload = payload_ref.clone_from_ref();
|
|
let maybe_blobs_bundle = maybe_json_blobs_bundle.cloned();
|
|
|
|
self.inner
|
|
.payload_cache
|
|
.put(FullPayloadContents::new(payload, maybe_blobs_bundle))
|
|
}
|
|
|
|
/// Attempt to retrieve a full payload from the payload cache by the payload root
|
|
pub fn get_payload_by_root(&self, root: &Hash256) -> Option<FullPayloadContents<E>> {
|
|
self.inner.payload_cache.get(root)
|
|
}
|
|
|
|
pub fn executor(&self) -> &TaskExecutor {
|
|
&self.inner.executor
|
|
}
|
|
|
|
/// Get the current difficulty of the PoW chain.
|
|
pub async fn get_current_difficulty(&self) -> Result<Option<Uint256>, ApiError> {
|
|
let block = self
|
|
.engine()
|
|
.api
|
|
.get_block_by_number(BlockByNumberQuery::Tag(LATEST_TAG))
|
|
.await?
|
|
.ok_or(ApiError::ExecutionHeadBlockNotFound)?;
|
|
Ok(block.total_difficulty)
|
|
}
|
|
|
|
/// Gives access to a channel containing if the last engine state is online or not.
|
|
///
|
|
/// This can be called several times.
|
|
pub async fn get_responsiveness_watch(&self) -> WatchStream<EngineState> {
|
|
self.engine().watch_state().await
|
|
}
|
|
|
|
/// Note: this function returns a mutex guard, be careful to avoid deadlocks.
|
|
async fn proposer_preparation_data(
|
|
&self,
|
|
) -> MutexGuard<'_, HashMap<u64, ProposerPreparationDataEntry>> {
|
|
self.inner.proposer_preparation_data.lock().await
|
|
}
|
|
|
|
fn proposers(&self) -> &RwLock<HashMap<ProposerKey, Proposer>> {
|
|
&self.inner.proposers
|
|
}
|
|
|
|
/// Acquires the execution engine forkchoice lock, waiting until it is available.
///
/// The lock is released when the returned guard is dropped.
pub async fn execution_engine_forkchoice_lock(&self) -> MutexGuard<'_, ()> {
    let forkchoice_lock = &self.inner.execution_engine_forkchoice_lock;
    forkchoice_lock.lock().await
}
|
|
|
|
/// Convenience function to allow spawning a task without waiting for the result.
|
|
pub fn spawn<F, U>(&self, generate_future: F, name: &'static str)
|
|
where
|
|
F: FnOnce(Self) -> U,
|
|
U: Future<Output = ()> + Send + 'static,
|
|
{
|
|
self.executor().spawn(generate_future(self.clone()), name);
|
|
}
|
|
|
|
/// Spawns a routine which attempts to keep the execution engine online.
|
|
pub fn spawn_watchdog_routine<S: SlotClock + 'static>(&self, slot_clock: S) {
|
|
let watchdog = |el: ExecutionLayer<E>| async move {
|
|
// Run one task immediately.
|
|
el.watchdog_task().await;
|
|
|
|
// Start the loop to periodically update.
|
|
loop {
|
|
el.spawn(
|
|
|el| async move { el.watchdog_task().await },
|
|
"exec_watchdog_task",
|
|
);
|
|
sleep(slot_clock.slot_duration()).await;
|
|
}
|
|
};
|
|
|
|
self.spawn(watchdog, "exec_watchdog");
|
|
}
|
|
|
|
/// Performs a single execution of the watchdog routine.
|
|
pub async fn watchdog_task(&self) {
|
|
self.engine().upcheck().await;
|
|
}
|
|
|
|
/// Spawns a routine which cleans the cached proposer data periodically.
|
|
pub fn spawn_clean_proposer_caches_routine<S: SlotClock + 'static>(&self, slot_clock: S) {
|
|
let preparation_cleaner = |el: ExecutionLayer<E>| async move {
|
|
// Start the loop to periodically clean proposer preparation cache.
|
|
loop {
|
|
if let Some(duration_to_next_epoch) =
|
|
slot_clock.duration_to_next_epoch(E::slots_per_epoch())
|
|
{
|
|
// Wait for next epoch
|
|
sleep(duration_to_next_epoch).await;
|
|
|
|
match slot_clock
|
|
.now()
|
|
.map(|slot| slot.epoch(E::slots_per_epoch()))
|
|
{
|
|
Some(current_epoch) => el
|
|
.clean_proposer_caches(current_epoch)
|
|
.await
|
|
.map_err(|e| {
|
|
error!(
|
|
error = ?e,
|
|
"Failed to clean proposer preparation cache"
|
|
)
|
|
})
|
|
.unwrap_or(()),
|
|
None => error!("Failed to get current epoch from slot clock"),
|
|
}
|
|
} else {
|
|
error!("Failed to read slot clock");
|
|
// If we can't read the slot clock, just wait another slot and retry.
|
|
sleep(slot_clock.slot_duration()).await;
|
|
}
|
|
}
|
|
};
|
|
|
|
self.spawn(preparation_cleaner, "exec_preparation_cleanup");
|
|
}
|
|
|
|
/// Returns `true` if the execution engine is synced and reachable.
|
|
pub async fn is_synced(&self) -> bool {
|
|
self.engine().is_synced().await
|
|
}
|
|
|
|
/// Execution nodes return a "SYNCED" response when they do not have any peers.
|
|
///
|
|
/// This function is a wrapper over `Self::is_synced` that makes an additional
|
|
/// check for the execution layer sync status. Checks if the latest block has
|
|
/// a `block_number != 0` *if* the `current_slot` is also `> 0`.
|
|
/// Returns the `Self::is_synced` response if unable to get latest block.
|
|
pub async fn is_synced_for_notifier(&self, current_slot: Slot) -> bool {
|
|
let synced = self.is_synced().await;
|
|
if synced
|
|
&& let Ok(Some(block)) = self
|
|
.engine()
|
|
.api
|
|
.get_block_by_number(BlockByNumberQuery::Tag(LATEST_TAG))
|
|
.await
|
|
&& block.block_number == 0
|
|
&& current_slot > 0
|
|
{
|
|
return false;
|
|
}
|
|
|
|
synced
|
|
}
|
|
|
|
/// Return `true` if the execution layer is offline or returning errors on `newPayload`.
|
|
///
|
|
/// This function should never be used to prevent any operation in the beacon node, but can
|
|
/// be used to give an indication on the HTTP API that the node's execution layer is struggling,
|
|
/// which can in turn be used by the VC.
|
|
pub async fn is_offline_or_erroring(&self) -> bool {
|
|
self.engine().is_offline().await || *self.inner.last_new_payload_errored.read().await
|
|
}
|
|
|
|
/// Updates the proposer preparation data provided by validators
|
|
pub async fn update_proposer_preparation<'a, I>(&self, update_epoch: Epoch, proposer_data: I)
|
|
where
|
|
I: IntoIterator<Item = (&'a ProposerPreparationData, &'a Option<u64>)>,
|
|
{
|
|
let mut proposer_preparation_data = self.proposer_preparation_data().await;
|
|
|
|
for (preparation_entry, gas_limit) in proposer_data {
|
|
let new = ProposerPreparationDataEntry {
|
|
update_epoch,
|
|
preparation_data: preparation_entry.clone(),
|
|
gas_limit: *gas_limit,
|
|
};
|
|
|
|
match proposer_preparation_data.entry(preparation_entry.validator_index) {
|
|
Entry::Occupied(mut entry) => {
|
|
if entry.get_mut().update(new) {
|
|
metrics::inc_counter(&metrics::EXECUTION_LAYER_PROPOSER_DATA_UPDATED);
|
|
}
|
|
}
|
|
Entry::Vacant(entry) => {
|
|
entry.insert(new);
|
|
metrics::inc_counter(&metrics::EXECUTION_LAYER_PROPOSER_DATA_UPDATED);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Delete proposer preparation data for `proposer_index`. This is only useful in tests.
|
|
pub async fn clear_proposer_preparation(&self, proposer_index: u64) {
|
|
self.proposer_preparation_data()
|
|
.await
|
|
.remove(&proposer_index);
|
|
}
|
|
|
|
/// Removes expired entries from proposer_preparation_data and proposers caches
|
|
async fn clean_proposer_caches(&self, current_epoch: Epoch) -> Result<(), Error> {
|
|
let mut proposer_preparation_data = self.proposer_preparation_data().await;
|
|
|
|
// Keep all entries that have been updated in the last 2 epochs
|
|
let retain_epoch = current_epoch.saturating_sub(Epoch::new(2));
|
|
proposer_preparation_data.retain(|_validator_index, preparation_entry| {
|
|
preparation_entry.update_epoch >= retain_epoch
|
|
});
|
|
drop(proposer_preparation_data);
|
|
|
|
let retain_slot = retain_epoch.start_slot(E::slots_per_epoch());
|
|
self.proposers()
|
|
.write()
|
|
.await
|
|
.retain(|proposer_key, _proposer| proposer_key.slot >= retain_slot);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Returns `true` if there have been any validators registered via
|
|
/// `Self::update_proposer_preparation`.
|
|
pub async fn has_any_proposer_preparation_data(&self) -> bool {
|
|
!self.proposer_preparation_data().await.is_empty()
|
|
}
|
|
|
|
/// Returns `true` if the `proposer_index` has registered as a local validator via
|
|
/// `Self::update_proposer_preparation`.
|
|
pub async fn has_proposer_preparation_data(&self, proposer_index: u64) -> bool {
|
|
self.proposer_preparation_data()
|
|
.await
|
|
.contains_key(&proposer_index)
|
|
}
|
|
|
|
/// Check if a proposer is registered as a local validator, *from a synchronous context*.
|
|
///
|
|
/// This method MUST NOT be called from an async task.
|
|
pub fn has_proposer_preparation_data_blocking(&self, proposer_index: u64) -> bool {
|
|
self.inner
|
|
.proposer_preparation_data
|
|
.blocking_lock()
|
|
.contains_key(&proposer_index)
|
|
}
|
|
|
|
/// Returns the fee-recipient address that should be used to build a block
|
|
#[instrument(level = "debug", skip_all)]
|
|
pub async fn get_suggested_fee_recipient(&self, proposer_index: u64) -> Address {
|
|
if let Some(preparation_data_entry) =
|
|
self.proposer_preparation_data().await.get(&proposer_index)
|
|
{
|
|
// The values provided via the API have first priority.
|
|
preparation_data_entry.preparation_data.fee_recipient
|
|
} else if let Some(address) = self.inner.suggested_fee_recipient {
|
|
// If there has been no fee recipient provided via the API, but the BN has been provided
|
|
// with a global default address, use that.
|
|
address
|
|
} else {
|
|
// If there is no user-provided fee recipient, use a junk value and complain loudly.
|
|
crit!(
|
|
msg = "the suggested_fee_recipient was unknown during block production. \
|
|
a junk address was used, rewards were lost! \
|
|
check the --suggested-fee-recipient flag and VC configuration.",
|
|
?proposer_index,
|
|
"Fee recipient unknown"
|
|
);
|
|
|
|
Address::from_slice(&DEFAULT_SUGGESTED_FEE_RECIPIENT)
|
|
}
|
|
}
|
|
|
|
#[instrument(level = "debug", skip_all)]
|
|
pub async fn get_proposer_gas_limit(&self, proposer_index: u64) -> Option<u64> {
|
|
self.proposer_preparation_data()
|
|
.await
|
|
.get(&proposer_index)
|
|
.and_then(|entry| entry.gas_limit)
|
|
}
|
|
|
|
/// Maps to the `engine_getPayload` JSON-RPC call for post-Gloas payload construction.
|
|
///
|
|
/// However, it will attempt to call `self.prepare_payload` if it cannot find an existing
|
|
/// payload id for the given parameters.
|
|
///
|
|
/// ## Fallback Behavior
|
|
///
|
|
/// The result will be returned from the first node that returns successfully. No more nodes
|
|
/// will be contacted.
|
|
#[instrument(level = "debug", skip_all)]
|
|
pub async fn get_payload_gloas(
|
|
&self,
|
|
payload_parameters: PayloadParameters<'_>,
|
|
) -> Result<BlockProposalContentsGloas<E>, Error> {
|
|
let payload_response_type = self.get_full_payload_caching(payload_parameters).await?;
|
|
let GetPayloadResponseType::Full(payload_response) = payload_response_type else {
|
|
return Err(Error::Unexpected(
|
|
"get_payload_gloas should never return a blinded payload".to_owned(),
|
|
));
|
|
};
|
|
let GetPayloadResponse::Gloas(payload_response) = payload_response else {
|
|
return Err(Error::Unexpected(
|
|
"get_payload_gloas should always return a gloas `GetPayloadResponse` variant"
|
|
.to_owned(),
|
|
));
|
|
};
|
|
metrics::inc_counter_vec(
|
|
&metrics::EXECUTION_LAYER_GET_PAYLOAD_OUTCOME,
|
|
&[metrics::SUCCESS],
|
|
);
|
|
metrics::inc_counter_vec(
|
|
&metrics::EXECUTION_LAYER_GET_PAYLOAD_SOURCE,
|
|
&[metrics::LOCAL],
|
|
);
|
|
|
|
Ok(payload_response.into())
|
|
}
|
|
|
|
/// Maps to the `engine_getPayload` JSON-RPC call.
|
|
///
|
|
/// However, it will attempt to call `self.prepare_payload` if it cannot find an existing
|
|
/// payload id for the given parameters.
|
|
///
|
|
/// ## Fallback Behavior
|
|
///
|
|
/// The result will be returned from the first node that returns successfully. No more nodes
|
|
/// will be contacted.
|
|
#[instrument(level = "debug", skip_all)]
|
|
pub async fn get_payload(
|
|
&self,
|
|
payload_parameters: PayloadParameters<'_>,
|
|
builder_params: BuilderParams,
|
|
spec: &ChainSpec,
|
|
builder_boost_factor: Option<u64>,
|
|
block_production_version: BlockProductionVersion,
|
|
) -> Result<BlockProposalContentsType<E>, Error> {
|
|
let payload_result_type = match block_production_version {
|
|
BlockProductionVersion::V3 => match self
|
|
.determine_and_fetch_payload(
|
|
payload_parameters,
|
|
builder_params,
|
|
builder_boost_factor,
|
|
spec,
|
|
)
|
|
.await
|
|
{
|
|
Ok(payload) => payload,
|
|
Err(e) => {
|
|
metrics::inc_counter_vec(
|
|
&metrics::EXECUTION_LAYER_GET_PAYLOAD_OUTCOME,
|
|
&[metrics::FAILURE],
|
|
);
|
|
return Err(e);
|
|
}
|
|
},
|
|
BlockProductionVersion::BlindedV2 => {
|
|
let _timer = metrics::start_timer_vec(
|
|
&metrics::EXECUTION_LAYER_REQUEST_TIMES,
|
|
&[metrics::GET_BLINDED_PAYLOAD],
|
|
);
|
|
self.determine_and_fetch_payload(payload_parameters, builder_params, None, spec)
|
|
.await?
|
|
}
|
|
BlockProductionVersion::FullV2 => self
|
|
.get_full_payload_with(payload_parameters, noop)
|
|
.await
|
|
.and_then(GetPayloadResponseType::try_into)
|
|
.map(ProvenancedPayload::Local)?,
|
|
};
|
|
|
|
let block_proposal_content_type = match payload_result_type {
|
|
ProvenancedPayload::Local(local_payload) => local_payload,
|
|
ProvenancedPayload::Builder(builder_payload) => builder_payload,
|
|
};
|
|
|
|
match block_proposal_content_type {
|
|
BlockProposalContentsType::Full(block_proposal_contents) => {
|
|
metrics::inc_counter_vec(
|
|
&metrics::EXECUTION_LAYER_GET_PAYLOAD_OUTCOME,
|
|
&[metrics::SUCCESS],
|
|
);
|
|
metrics::inc_counter_vec(
|
|
&metrics::EXECUTION_LAYER_GET_PAYLOAD_SOURCE,
|
|
&[metrics::LOCAL],
|
|
);
|
|
if matches!(block_production_version, BlockProductionVersion::BlindedV2) {
|
|
Ok(BlockProposalContentsType::Blinded(
|
|
block_proposal_contents.into(),
|
|
))
|
|
} else {
|
|
Ok(BlockProposalContentsType::Full(block_proposal_contents))
|
|
}
|
|
}
|
|
BlockProposalContentsType::Blinded(block_proposal_contents) => {
|
|
metrics::inc_counter_vec(
|
|
&metrics::EXECUTION_LAYER_GET_PAYLOAD_OUTCOME,
|
|
&[metrics::SUCCESS],
|
|
);
|
|
metrics::inc_counter_vec(
|
|
&metrics::EXECUTION_LAYER_GET_PAYLOAD_SOURCE,
|
|
&[metrics::BUILDER],
|
|
);
|
|
Ok(BlockProposalContentsType::Blinded(block_proposal_contents))
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Fetches local and builder paylaods concurrently, Logs and returns results.
|
|
async fn fetch_builder_and_local_payloads(
|
|
&self,
|
|
builder: &BuilderHttpClient,
|
|
builder_params: &BuilderParams,
|
|
payload_parameters: PayloadParameters<'_>,
|
|
) -> (
|
|
Result<Option<ForkVersionedResponse<SignedBuilderBid<E>>>, builder_client::Error>,
|
|
Result<GetPayloadResponse<E>, Error>,
|
|
) {
|
|
let slot = builder_params.slot;
|
|
let pubkey = &builder_params.pubkey;
|
|
let parent_hash = payload_parameters.parent_hash;
|
|
|
|
info!(
|
|
?slot,
|
|
?pubkey,
|
|
?parent_hash,
|
|
"Requesting blinded header from connected builder"
|
|
);
|
|
|
|
// Wait for the builder *and* local EL to produce a payload (or return an error).
|
|
let ((relay_result, relay_duration), (local_result, local_duration)) = tokio::join!(
|
|
timed_future(metrics::GET_BLINDED_PAYLOAD_BUILDER, async {
|
|
builder
|
|
.get_builder_header::<E>(slot, parent_hash, pubkey)
|
|
.instrument(debug_span!("get_builder_header"))
|
|
.await
|
|
}),
|
|
timed_future(metrics::GET_BLINDED_PAYLOAD_LOCAL, async {
|
|
self.get_full_payload_caching(payload_parameters)
|
|
.await
|
|
.and_then(|local_result_type| match local_result_type {
|
|
GetPayloadResponseType::Full(payload) => Ok(payload),
|
|
GetPayloadResponseType::Blinded(_) => Err(Error::PayloadTypeMismatch),
|
|
})
|
|
})
|
|
);
|
|
|
|
info!(
|
|
relay_fee_recipient = match &relay_result {
|
|
Ok(Some(r)) => format!("{:?}", r.data.message.header().fee_recipient()),
|
|
Ok(None) => "empty response".to_string(),
|
|
Err(_) => "request failed".to_string(),
|
|
},
|
|
relay_response_ms = relay_duration.as_millis(),
|
|
local_fee_recipient = match &local_result {
|
|
Ok(get_payload_response) => format!("{:?}", get_payload_response.fee_recipient()),
|
|
Err(_) => "request failed".to_string(),
|
|
},
|
|
local_response_ms = local_duration.as_millis(),
|
|
?parent_hash,
|
|
"Requested blinded execution payload"
|
|
);
|
|
|
|
(relay_result, local_result)
|
|
}
|
|
|
|
async fn determine_and_fetch_payload(
|
|
&self,
|
|
payload_parameters: PayloadParameters<'_>,
|
|
builder_params: BuilderParams,
|
|
builder_boost_factor: Option<u64>,
|
|
spec: &ChainSpec,
|
|
) -> Result<ProvenancedPayload<BlockProposalContentsType<E>>, Error> {
|
|
let Some(builder) = self.builder() else {
|
|
// no builder.. return local payload
|
|
return self
|
|
.get_full_payload_caching(payload_parameters)
|
|
.await
|
|
.and_then(GetPayloadResponseType::try_into)
|
|
.map(ProvenancedPayload::Local);
|
|
};
|
|
|
|
// check chain health
|
|
if builder_params.chain_health != ChainHealth::Healthy {
|
|
// chain is unhealthy, gotta use local payload
|
|
match builder_params.chain_health {
|
|
ChainHealth::Unhealthy(condition) => info!(
|
|
info = "this helps protect the network. the --builder-fallback flags \
|
|
can adjust the expected health conditions.",
|
|
failed_condition = ?condition,
|
|
"Chain is unhealthy, using local payload"
|
|
),
|
|
// Intentional no-op, so we never attempt builder API proposals pre-merge.
|
|
ChainHealth::PreMerge => (),
|
|
ChainHealth::Optimistic => info!(
|
|
info = "the local execution engine is syncing and the builder network \
|
|
cannot safely be used - unable to propose block",
|
|
"Chain is optimistic; can't build payload"
|
|
),
|
|
ChainHealth::Healthy => {
|
|
crit!("got healthy but also not healthy.. this shouldn't happen!")
|
|
}
|
|
}
|
|
return self
|
|
.get_full_payload_caching(payload_parameters)
|
|
.await
|
|
.and_then(GetPayloadResponseType::try_into)
|
|
.map(ProvenancedPayload::Local);
|
|
}
|
|
|
|
let parent_hash = payload_parameters.parent_hash;
|
|
let (relay_result, local_result) = self
|
|
.fetch_builder_and_local_payloads(builder.as_ref(), &builder_params, payload_parameters)
|
|
.await;
|
|
|
|
match (relay_result, local_result) {
|
|
(Err(e), Ok(local)) => {
|
|
warn!(
|
|
info = "falling back to local execution client",
|
|
relay_error = ?e,
|
|
local_block_hash = ?local.block_hash(),
|
|
?parent_hash,
|
|
"Builder error when requesting payload"
|
|
);
|
|
Ok(ProvenancedPayload::Local(BlockProposalContentsType::Full(
|
|
local.try_into()?,
|
|
)))
|
|
}
|
|
(Ok(None), Ok(local)) => {
|
|
info!(
|
|
info = "falling back to local execution client",
|
|
local_block_hash=?local.block_hash(),
|
|
?parent_hash,
|
|
"Builder did not return a payload"
|
|
);
|
|
Ok(ProvenancedPayload::Local(BlockProposalContentsType::Full(
|
|
local.try_into()?,
|
|
)))
|
|
}
|
|
(Err(relay_error), Err(local_error)) => {
|
|
crit!(
|
|
info = "the local EL and builder both failed - unable to propose block",
|
|
?relay_error,
|
|
?local_error,
|
|
?parent_hash,
|
|
"Unable to produce execution payload"
|
|
);
|
|
|
|
Err(Error::CannotProduceHeader)
|
|
}
|
|
(Ok(None), Err(local_error)) => {
|
|
crit!(
|
|
info = "the local EL failed and the builder returned nothing - \
|
|
the block proposal will be missed",
|
|
?local_error,
|
|
?parent_hash,
|
|
"Unable to produce execution payload"
|
|
);
|
|
|
|
Err(Error::CannotProduceHeader)
|
|
}
|
|
(Ok(Some(relay)), Ok(local)) => {
|
|
let header = &relay.data.message.header();
|
|
|
|
info!(
|
|
relay_block_hash = ?header.block_hash(),
|
|
local_block_hash=?local.block_hash(),
|
|
?parent_hash,
|
|
"Received local and builder payloads"
|
|
);
|
|
|
|
// check relay payload validity
|
|
if let Err(reason) =
|
|
verify_builder_bid(&relay, payload_parameters, Some(local.block_number()), spec)
|
|
{
|
|
// relay payload invalid -> return local
|
|
metrics::inc_counter_vec(
|
|
&metrics::EXECUTION_LAYER_GET_PAYLOAD_BUILDER_REJECTIONS,
|
|
&[reason.as_ref().as_ref()],
|
|
);
|
|
warn!(
|
|
info = "using local payload",
|
|
%reason,
|
|
relay_block_hash = ?header.block_hash(),
|
|
?parent_hash,
|
|
"Builder returned invalid payload"
|
|
);
|
|
return Ok(ProvenancedPayload::Local(BlockProposalContentsType::Full(
|
|
local.try_into()?,
|
|
)));
|
|
}
|
|
|
|
let relay_value = *relay.data.message.value();
|
|
|
|
let boosted_relay_value = match builder_boost_factor {
|
|
Some(builder_boost_factor) => (relay_value / Uint256::from(100))
|
|
.saturating_mul(Uint256::from(builder_boost_factor)),
|
|
None => relay_value,
|
|
};
|
|
|
|
let local_value = *local.block_value();
|
|
|
|
if local_value >= boosted_relay_value {
|
|
info!(
|
|
%local_value,
|
|
%relay_value,
|
|
%boosted_relay_value,
|
|
?builder_boost_factor,
|
|
"Local block is more profitable than relay block"
|
|
);
|
|
return Ok(ProvenancedPayload::Local(BlockProposalContentsType::Full(
|
|
local.try_into()?,
|
|
)));
|
|
}
|
|
|
|
if local.should_override_builder().unwrap_or(false) {
|
|
info!(
|
|
%local_value,
|
|
%relay_value,
|
|
"Using local payload because execution engine suggested we ignore builder payload"
|
|
);
|
|
return Ok(ProvenancedPayload::Local(BlockProposalContentsType::Full(
|
|
local.try_into()?,
|
|
)));
|
|
}
|
|
|
|
info!(
|
|
%local_value,
|
|
%relay_value,
|
|
%boosted_relay_value,
|
|
?builder_boost_factor,
|
|
"Relay block is more profitable than local block"
|
|
);
|
|
|
|
Ok(ProvenancedPayload::try_from(relay.data.message)?)
|
|
}
|
|
(Ok(Some(relay)), Err(local_error)) => {
|
|
let header = &relay.data.message.header();
|
|
|
|
info!(
|
|
relay_block_hash = ?header.block_hash(),
|
|
?local_error,
|
|
?parent_hash,
|
|
"Received builder payload with local error"
|
|
);
|
|
|
|
match verify_builder_bid(&relay, payload_parameters, None, spec) {
|
|
Ok(()) => Ok(ProvenancedPayload::try_from(relay.data.message)?),
|
|
Err(reason) => {
|
|
metrics::inc_counter_vec(
|
|
&metrics::EXECUTION_LAYER_GET_PAYLOAD_BUILDER_REJECTIONS,
|
|
&[reason.as_ref().as_ref()],
|
|
);
|
|
crit!(
|
|
info = "no local payload either - unable to propose block",
|
|
%reason,
|
|
relay_block_hash = ?header.block_hash(),
|
|
?parent_hash,
|
|
"Builder returned invalid payload"
|
|
);
|
|
Err(Error::CannotProduceHeader)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Get a full payload and cache its result in the execution layer's payload cache.
|
|
async fn get_full_payload_caching(
|
|
&self,
|
|
payload_parameters: PayloadParameters<'_>,
|
|
) -> Result<GetPayloadResponseType<E>, Error> {
|
|
self.get_full_payload_with(payload_parameters, Self::cache_payload)
|
|
.await
|
|
}
|
|
|
|
#[instrument(level = "debug", skip_all)]
|
|
async fn get_full_payload_with(
|
|
&self,
|
|
payload_parameters: PayloadParameters<'_>,
|
|
cache_fn: fn(
|
|
&ExecutionLayer<E>,
|
|
PayloadContentsRefTuple<E>,
|
|
) -> Option<FullPayloadContents<E>>,
|
|
) -> Result<GetPayloadResponseType<E>, Error> {
|
|
let PayloadParameters {
|
|
parent_hash,
|
|
payload_attributes,
|
|
forkchoice_update_params,
|
|
current_fork,
|
|
..
|
|
} = payload_parameters;
|
|
|
|
self.engine()
|
|
.request(move |engine| async move {
|
|
let payload_id = if let Some(id) = engine
|
|
.get_payload_id(&parent_hash, payload_attributes)
|
|
.await
|
|
{
|
|
// The payload id has been cached for this engine.
|
|
metrics::inc_counter_vec(
|
|
&metrics::EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID,
|
|
&[metrics::HIT],
|
|
);
|
|
id
|
|
} else {
|
|
// The payload id has *not* been cached. Trigger an artificial
|
|
// fork choice update to retrieve a payload ID.
|
|
metrics::inc_counter_vec(
|
|
&metrics::EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID,
|
|
&[metrics::MISS],
|
|
);
|
|
let fork_choice_state = ForkchoiceState {
|
|
head_block_hash: parent_hash,
|
|
safe_block_hash: forkchoice_update_params
|
|
.justified_hash
|
|
.unwrap_or_else(ExecutionBlockHash::zero),
|
|
finalized_block_hash: forkchoice_update_params
|
|
.finalized_hash
|
|
.unwrap_or_else(ExecutionBlockHash::zero),
|
|
};
|
|
|
|
let response = engine
|
|
.notify_forkchoice_updated(
|
|
fork_choice_state,
|
|
Some(payload_attributes.clone()),
|
|
)
|
|
.await?;
|
|
|
|
match response.payload_id {
|
|
Some(payload_id) => payload_id,
|
|
None => {
|
|
error!(
|
|
msg = "No payload ID, the engine is likely syncing. \
|
|
This has the potential to cause a missed block proposal.",
|
|
status = ?response.payload_status,
|
|
"Exec engine unable to produce payload"
|
|
);
|
|
return Err(ApiError::PayloadIdUnavailable);
|
|
}
|
|
}
|
|
};
|
|
|
|
let payload_response = async {
|
|
debug!(
|
|
suggested_fee_recipient = ?payload_attributes.suggested_fee_recipient(),
|
|
prev_randao = ?payload_attributes.prev_randao(),
|
|
timestamp = payload_attributes.timestamp(),
|
|
?parent_hash,
|
|
"Issuing engine_getPayload"
|
|
);
|
|
let _timer = metrics::start_timer_vec(
|
|
&metrics::EXECUTION_LAYER_REQUEST_TIMES,
|
|
&[metrics::GET_PAYLOAD],
|
|
);
|
|
engine.api.get_payload::<E>(current_fork, payload_id).await
|
|
}
|
|
.await?;
|
|
|
|
if payload_response.execution_payload_ref().fee_recipient()
|
|
!= payload_attributes.suggested_fee_recipient()
|
|
{
|
|
error!(
|
|
msg = "The fee recipient returned from the Execution Engine differs \
|
|
from the suggested_fee_recipient set on the beacon node. This could \
|
|
indicate that fees are being diverted to another address. Please \
|
|
ensure that the value of suggested_fee_recipient is set correctly and \
|
|
that the Execution Engine is trusted.",
|
|
fee_recipient = ?payload_response.execution_payload_ref().fee_recipient(),
|
|
suggested_fee_recipient = ?payload_attributes.suggested_fee_recipient(),
|
|
"Inconsistent fee recipient"
|
|
);
|
|
}
|
|
if cache_fn(
|
|
self,
|
|
(
|
|
payload_response.execution_payload_ref(),
|
|
payload_response.blobs_bundle().ok(),
|
|
),
|
|
)
|
|
.is_some()
|
|
{
|
|
warn!(
|
|
"Duplicate payload cached, this might indicate redundant proposal \
|
|
attempts."
|
|
);
|
|
}
|
|
|
|
Ok(GetPayloadResponseType::Full(payload_response))
|
|
})
|
|
.await
|
|
.map_err(Box::new)
|
|
.map_err(Error::EngineError)
|
|
}
|
|
|
|
/// Maps to the `engine_newPayload` JSON-RPC call.
|
|
/// TODO(EIP-7732) figure out how and why Mark relaxed new_payload_request param's typ to NewPayloadRequest<E>
|
|
pub async fn notify_new_payload(
|
|
&self,
|
|
new_payload_request: NewPayloadRequest<'_, E>,
|
|
) -> Result<PayloadStatus, Error> {
|
|
let _timer = metrics::start_timer_vec(
|
|
&metrics::EXECUTION_LAYER_REQUEST_TIMES,
|
|
&[metrics::NEW_PAYLOAD],
|
|
);
|
|
let timer = std::time::Instant::now();
|
|
|
|
let block_number = new_payload_request.block_number();
|
|
let block_hash = new_payload_request.block_hash();
|
|
let parent_hash = new_payload_request.parent_hash();
|
|
|
|
let result = self
|
|
.engine()
|
|
.request(|engine| engine.api.new_payload(new_payload_request))
|
|
.await;
|
|
|
|
if let Ok(status) = &result {
|
|
let status_str = <&'static str>::from(status.status);
|
|
metrics::inc_counter_vec(
|
|
&metrics::EXECUTION_LAYER_PAYLOAD_STATUS,
|
|
&["new_payload", status_str],
|
|
);
|
|
debug!(
|
|
status = status_str,
|
|
?parent_hash,
|
|
?block_hash,
|
|
block_number,
|
|
response_time_ms = timer.elapsed().as_millis(),
|
|
"Processed engine_newPayload"
|
|
);
|
|
}
|
|
*self.inner.last_new_payload_errored.write().await = result.is_err();
|
|
|
|
process_payload_status(block_hash, result)
|
|
.map_err(Box::new)
|
|
.map_err(Error::EngineError)
|
|
}
|
|
|
|
/// Update engine sync status.
|
|
pub async fn upcheck(&self) {
|
|
self.engine().upcheck().await;
|
|
}
|
|
|
|
/// Register that the given `validator_index` is going to produce a block at `slot`.
|
|
///
|
|
/// The block will be built atop `head_block_root` and the EL will need to prepare an
|
|
/// `ExecutionPayload` as defined by the given `payload_attributes`.
|
|
pub async fn insert_proposer(
|
|
&self,
|
|
slot: Slot,
|
|
head_block_root: Hash256,
|
|
validator_index: u64,
|
|
payload_attributes: PayloadAttributes,
|
|
) -> bool {
|
|
let proposers_key = ProposerKey {
|
|
slot,
|
|
head_block_root,
|
|
};
|
|
|
|
let existing = self.proposers().write().await.insert(
|
|
proposers_key,
|
|
Proposer {
|
|
validator_index,
|
|
payload_attributes,
|
|
},
|
|
);
|
|
|
|
if existing.is_none() {
|
|
metrics::inc_counter(&metrics::EXECUTION_LAYER_PROPOSER_INSERTED);
|
|
}
|
|
|
|
existing.is_some()
|
|
}
|
|
|
|
/// If there has been a proposer registered via `Self::insert_proposer` with a matching `slot`
|
|
/// `head_block_root`, then return the appropriate `PayloadAttributes` for inclusion in
|
|
/// `forkchoiceUpdated` calls.
|
|
pub async fn payload_attributes(
|
|
&self,
|
|
current_slot: Slot,
|
|
head_block_root: Hash256,
|
|
) -> Option<PayloadAttributes> {
|
|
let proposers_key = ProposerKey {
|
|
slot: current_slot,
|
|
head_block_root,
|
|
};
|
|
|
|
let proposer = self.proposers().read().await.get(&proposers_key).cloned()?;
|
|
|
|
debug!(
|
|
payload_attributes = ?proposer.payload_attributes,
|
|
?head_block_root,
|
|
slot = %current_slot,
|
|
validator_index = proposer.validator_index,
|
|
"Beacon proposer found"
|
|
);
|
|
|
|
Some(proposer.payload_attributes)
|
|
}
|
|
|
|
/// Maps to the `engine_consensusValidated` JSON-RPC call.
|
|
pub async fn notify_forkchoice_updated(
|
|
&self,
|
|
head_block_hash: ExecutionBlockHash,
|
|
justified_block_hash: ExecutionBlockHash,
|
|
finalized_block_hash: ExecutionBlockHash,
|
|
current_slot: Slot,
|
|
head_block_root: Hash256,
|
|
) -> Result<PayloadStatus, Error> {
|
|
let _timer = metrics::start_timer_vec(
|
|
&metrics::EXECUTION_LAYER_REQUEST_TIMES,
|
|
&[metrics::FORKCHOICE_UPDATED],
|
|
);
|
|
|
|
debug!(
|
|
?finalized_block_hash,
|
|
?justified_block_hash,
|
|
?head_block_hash,
|
|
?head_block_root,
|
|
?current_slot,
|
|
"Issuing engine_forkchoiceUpdated"
|
|
);
|
|
|
|
let next_slot = current_slot + 1;
|
|
let payload_attributes = self.payload_attributes(next_slot, head_block_root).await;
|
|
|
|
// Compute the "lookahead", the time between when the payload will be produced and now.
|
|
if let Some(ref payload_attributes) = payload_attributes
|
|
&& let Ok(now) = SystemTime::now().duration_since(UNIX_EPOCH)
|
|
{
|
|
let timestamp = Duration::from_secs(payload_attributes.timestamp());
|
|
if let Some(lookahead) = timestamp.checked_sub(now) {
|
|
metrics::observe_duration(
|
|
&metrics::EXECUTION_LAYER_PAYLOAD_ATTRIBUTES_LOOKAHEAD,
|
|
lookahead,
|
|
);
|
|
} else {
|
|
debug!(?timestamp, ?now, "Late payload attributes")
|
|
}
|
|
}
|
|
|
|
let forkchoice_state = ForkchoiceState {
|
|
head_block_hash,
|
|
safe_block_hash: justified_block_hash,
|
|
finalized_block_hash,
|
|
};
|
|
|
|
self.engine()
|
|
.set_latest_forkchoice_state(forkchoice_state)
|
|
.await;
|
|
|
|
let result = self
|
|
.engine()
|
|
.request(|engine| async move {
|
|
engine
|
|
.notify_forkchoice_updated(forkchoice_state, payload_attributes)
|
|
.await
|
|
})
|
|
.await;
|
|
|
|
if let Ok(status) = &result {
|
|
metrics::inc_counter_vec(
|
|
&metrics::EXECUTION_LAYER_PAYLOAD_STATUS,
|
|
&["forkchoice_updated", status.payload_status.status.into()],
|
|
);
|
|
}
|
|
|
|
process_payload_status(
|
|
head_block_hash,
|
|
result.map(|response| response.payload_status),
|
|
)
|
|
.map_err(Box::new)
|
|
.map_err(Error::EngineError)
|
|
}
|
|
|
|
/// Returns the execution engine capabilities resulting from a call to
|
|
/// engine_exchangeCapabilities. If the capabilities cache is not populated,
|
|
/// or if it is populated with a cached result of age >= `age_limit`, this
|
|
/// method will fetch the result from the execution engine and populate the
|
|
/// cache before returning it. Otherwise it will return a cached result from
|
|
/// a previous call.
|
|
///
|
|
/// Set `age_limit` to `None` to always return the cached result
|
|
/// Set `age_limit` to `Some(Duration::ZERO)` to force fetching from EE
|
|
pub async fn get_engine_capabilities(
|
|
&self,
|
|
age_limit: Option<Duration>,
|
|
) -> Result<EngineCapabilities, Error> {
|
|
self.engine()
|
|
.request(|engine| engine.get_engine_capabilities(age_limit))
|
|
.await
|
|
.map_err(Into::into)
|
|
}
|
|
|
|
/// Returns the execution engine version resulting from a call to
|
|
/// engine_clientVersionV1. If the version cache is not populated, or if it
|
|
/// is populated with a cached result of age >= `age_limit`, this method will
|
|
/// fetch the result from the execution engine and populate the cache before
|
|
/// returning it. Otherwise it will return the cached result from an earlier
|
|
/// call.
|
|
///
|
|
/// Set `age_limit` to `None` to always return the cached result
|
|
/// Set `age_limit` to `Some(Duration::ZERO)` to force fetching from EE
|
|
pub async fn get_engine_version(
|
|
&self,
|
|
age_limit: Option<Duration>,
|
|
) -> Result<Vec<ClientVersionV1>, Error> {
|
|
let versions = self
|
|
.engine()
|
|
.request(|engine| engine.get_engine_version(age_limit))
|
|
.await
|
|
.map_err(Into::<Error>::into)?;
|
|
metrics::expose_execution_layer_info(&versions);
|
|
|
|
Ok(versions)
|
|
}
|
|
|
|
pub async fn get_payload_bodies_by_hash(
|
|
&self,
|
|
hashes: Vec<ExecutionBlockHash>,
|
|
) -> Result<Vec<Option<ExecutionPayloadBodyV1<E>>>, Error> {
|
|
self.engine()
|
|
.request(|engine: &Engine| async move {
|
|
engine.api.get_payload_bodies_by_hash_v1(hashes).await
|
|
})
|
|
.await
|
|
.map_err(Box::new)
|
|
.map_err(Error::EngineError)
|
|
}
|
|
|
|
pub async fn get_payload_bodies_by_range(
|
|
&self,
|
|
start: u64,
|
|
count: u64,
|
|
) -> Result<Vec<Option<ExecutionPayloadBodyV1<E>>>, Error> {
|
|
let _timer = metrics::start_timer(&metrics::EXECUTION_LAYER_GET_PAYLOAD_BODIES_BY_RANGE);
|
|
self.engine()
|
|
.request(|engine: &Engine| async move {
|
|
engine
|
|
.api
|
|
.get_payload_bodies_by_range_v1(start, count)
|
|
.await
|
|
})
|
|
.await
|
|
.map_err(Box::new)
|
|
.map_err(Error::EngineError)
|
|
}
|
|
|
|
/// Fetch a full payload from the execution node.
|
|
///
|
|
/// This will fail if the payload is not from the finalized portion of the chain.
|
|
pub async fn get_payload_for_header(
|
|
&self,
|
|
header: &ExecutionPayloadHeader<E>,
|
|
fork: ForkName,
|
|
) -> Result<Option<ExecutionPayload<E>>, Error> {
|
|
let block_number = header.block_number();
|
|
|
|
// Handle default payload body.
|
|
if header.block_hash() == ExecutionBlockHash::zero() {
|
|
let payload = match fork {
|
|
ForkName::Bellatrix => ExecutionPayloadBellatrix::default().into(),
|
|
ForkName::Capella => ExecutionPayloadCapella::default().into(),
|
|
ForkName::Deneb => ExecutionPayloadDeneb::default().into(),
|
|
ForkName::Electra => ExecutionPayloadElectra::default().into(),
|
|
ForkName::Fulu => ExecutionPayloadFulu::default().into(),
|
|
ForkName::Base | ForkName::Altair => {
|
|
return Err(Error::InvalidForkForPayload);
|
|
}
|
|
ForkName::Gloas => {
|
|
return Err(Error::InvalidForkForPayload);
|
|
}
|
|
};
|
|
return Ok(Some(payload));
|
|
}
|
|
|
|
// Use efficient payload bodies by range method if supported.
|
|
let capabilities = self.get_engine_capabilities(None).await?;
|
|
if capabilities.get_payload_bodies_by_range_v1 {
|
|
let mut payload_bodies = self.get_payload_bodies_by_range(block_number, 1).await?;
|
|
|
|
if payload_bodies.len() != 1 {
|
|
return Ok(None);
|
|
}
|
|
|
|
let opt_payload_body = payload_bodies.pop().flatten();
|
|
opt_payload_body
|
|
.map(|body| {
|
|
body.to_payload(header.clone())
|
|
.map_err(Error::InvalidPayloadBody)
|
|
})
|
|
.transpose()
|
|
} else {
|
|
Err(Error::PayloadBodiesByRangeNotSupported)
|
|
}
|
|
}
|
|
|
|
pub async fn get_blobs_v1(
|
|
&self,
|
|
query: Vec<Hash256>,
|
|
) -> Result<Vec<Option<BlobAndProofV1<E>>>, Error> {
|
|
let capabilities = self.get_engine_capabilities(None).await?;
|
|
|
|
if capabilities.get_blobs_v1 {
|
|
self.engine()
|
|
.request(|engine| async move { engine.api.get_blobs_v1(query).await })
|
|
.await
|
|
.map_err(Box::new)
|
|
.map_err(Error::EngineError)
|
|
} else {
|
|
Err(Error::GetBlobsNotSupported)
|
|
}
|
|
}
|
|
|
|
pub async fn get_blobs_v2(
|
|
&self,
|
|
query: Vec<Hash256>,
|
|
) -> Result<Option<Vec<BlobAndProofV2<E>>>, Error> {
|
|
let capabilities = self.get_engine_capabilities(None).await?;
|
|
|
|
if capabilities.get_blobs_v2 {
|
|
self.engine()
|
|
.request(|engine| async move { engine.api.get_blobs_v2(query).await })
|
|
.await
|
|
.map_err(Box::new)
|
|
.map_err(Error::EngineError)
|
|
} else {
|
|
Err(Error::GetBlobsNotSupported)
|
|
}
|
|
}
|
|
|
|
pub async fn get_block_by_number(
|
|
&self,
|
|
query: BlockByNumberQuery<'_>,
|
|
) -> Result<Option<ExecutionBlock>, Error> {
|
|
self.engine()
|
|
.request(|engine| async move { engine.api.get_block_by_number(query).await })
|
|
.await
|
|
.map_err(Box::new)
|
|
.map_err(Error::EngineError)
|
|
}
|
|
|
|
pub async fn propose_blinded_beacon_block(
|
|
&self,
|
|
block_root: Hash256,
|
|
block: &SignedBlindedBeaconBlock<E>,
|
|
spec: &ChainSpec,
|
|
) -> Result<SubmitBlindedBlockResponse<E>, Error> {
|
|
debug!(?block_root, "Sending block to builder");
|
|
if spec.is_fulu_scheduled() {
|
|
let resp = self
|
|
.post_builder_blinded_blocks_v2(block_root, block)
|
|
.await
|
|
.map(|()| SubmitBlindedBlockResponse::V2);
|
|
// Fallback to v1 if v2 fails because the relay doesn't support it.
|
|
// Note: we should remove the fallback post fulu when all relays have support for v2.
|
|
if resp.is_err() {
|
|
self.post_builder_blinded_blocks_v1(block_root, block)
|
|
.await
|
|
.map(|full_payload| SubmitBlindedBlockResponse::V1(Box::new(full_payload)))
|
|
} else {
|
|
resp
|
|
}
|
|
} else {
|
|
self.post_builder_blinded_blocks_v1(block_root, block)
|
|
.await
|
|
.map(|full_payload| SubmitBlindedBlockResponse::V1(Box::new(full_payload)))
|
|
}
|
|
}
|
|
|
|
async fn post_builder_blinded_blocks_v1(
|
|
&self,
|
|
block_root: Hash256,
|
|
block: &SignedBlindedBeaconBlock<E>,
|
|
) -> Result<FullPayloadContents<E>, Error> {
|
|
if let Some(builder) = self.builder() {
|
|
let (payload_result, duration) =
|
|
timed_future(metrics::POST_BLINDED_PAYLOAD_BUILDER, async {
|
|
let ssz_enabled = builder.is_ssz_available();
|
|
debug!(
|
|
?block_root,
|
|
ssz = ssz_enabled,
|
|
"Calling submit_blinded_block v1 on builder"
|
|
);
|
|
if ssz_enabled {
|
|
builder
|
|
.post_builder_blinded_blocks_v1_ssz(block)
|
|
.await
|
|
.map_err(Error::Builder)
|
|
} else {
|
|
builder
|
|
.post_builder_blinded_blocks_v1(block)
|
|
.await
|
|
.map_err(Error::Builder)
|
|
.map(|d| d.data)
|
|
}
|
|
})
|
|
.await;
|
|
|
|
match &payload_result {
|
|
Ok(unblinded_response) => {
|
|
metrics::inc_counter_vec(
|
|
&metrics::EXECUTION_LAYER_BUILDER_REVEAL_PAYLOAD_OUTCOME,
|
|
&[metrics::SUCCESS],
|
|
);
|
|
let payload = unblinded_response.payload_ref();
|
|
info!(
|
|
relay_response_ms = duration.as_millis(),
|
|
?block_root,
|
|
fee_recipient = ?payload.fee_recipient(),
|
|
block_hash = ?payload.block_hash(),
|
|
parent_hash = ?payload.parent_hash(),
|
|
"Builder successfully revealed payload"
|
|
)
|
|
}
|
|
Err(e) => {
|
|
metrics::inc_counter_vec(
|
|
&metrics::EXECUTION_LAYER_BUILDER_REVEAL_PAYLOAD_OUTCOME,
|
|
&[metrics::FAILURE],
|
|
);
|
|
warn!(
|
|
info = "this is common behaviour for some builders and may not indicate an issue",
|
|
error = ?e,
|
|
relay_response_ms = duration.as_millis(),
|
|
?block_root,
|
|
parent_hash = ?block
|
|
.message()
|
|
.execution_payload()
|
|
.map(|payload| format!("{}", payload.parent_hash()))
|
|
.unwrap_or_else(|_| "unknown".to_string()),
|
|
"Builder failed to reveal payload"
|
|
)
|
|
}
|
|
}
|
|
|
|
payload_result
|
|
} else {
|
|
Err(Error::NoPayloadBuilder)
|
|
}
|
|
}
|
|
|
|
async fn post_builder_blinded_blocks_v2(
|
|
&self,
|
|
block_root: Hash256,
|
|
block: &SignedBlindedBeaconBlock<E>,
|
|
) -> Result<(), Error> {
|
|
if let Some(builder) = self.builder() {
|
|
let (result, duration) = timed_future(metrics::POST_BLINDED_PAYLOAD_BUILDER, async {
|
|
let ssz_enabled = builder.is_ssz_available();
|
|
debug!(
|
|
?block_root,
|
|
ssz = ssz_enabled,
|
|
"Calling submit_blinded_block v2 on builder"
|
|
);
|
|
if ssz_enabled {
|
|
builder
|
|
.post_builder_blinded_blocks_v2_ssz(block)
|
|
.await
|
|
.map_err(Error::Builder)
|
|
} else {
|
|
builder
|
|
.post_builder_blinded_blocks_v2(block)
|
|
.await
|
|
.map_err(Error::Builder)
|
|
}
|
|
})
|
|
.await;
|
|
|
|
match result {
|
|
Ok(()) => {
|
|
metrics::inc_counter_vec(
|
|
&metrics::EXECUTION_LAYER_BUILDER_REVEAL_PAYLOAD_OUTCOME,
|
|
&[metrics::SUCCESS],
|
|
);
|
|
info!(
|
|
relay_response_ms = duration.as_millis(),
|
|
?block_root,
|
|
"Successfully submitted blinded block to the builder"
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
Err(e) => {
|
|
metrics::inc_counter_vec(
|
|
&metrics::EXECUTION_LAYER_BUILDER_REVEAL_PAYLOAD_OUTCOME,
|
|
&[metrics::FAILURE],
|
|
);
|
|
error!(
|
|
info = "this may result in a missed block proposal",
|
|
error = ?e,
|
|
relay_response_ms = duration.as_millis(),
|
|
?block_root,
|
|
"Failed to submit blinded block to the builder"
|
|
);
|
|
Err(e)
|
|
}
|
|
}
|
|
} else {
|
|
Err(Error::NoPayloadBuilder)
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(AsRefStr)]
|
|
#[strum(serialize_all = "snake_case")]
|
|
enum InvalidBuilderPayload {
|
|
ParentHash {
|
|
payload: ExecutionBlockHash,
|
|
expected: ExecutionBlockHash,
|
|
},
|
|
PrevRandao {
|
|
payload: Hash256,
|
|
expected: Hash256,
|
|
},
|
|
Timestamp {
|
|
payload: u64,
|
|
expected: u64,
|
|
},
|
|
BlockNumber {
|
|
payload: u64,
|
|
expected: Option<u64>,
|
|
},
|
|
Fork {
|
|
payload: ForkName,
|
|
expected: ForkName,
|
|
},
|
|
Signature {
|
|
signature: Signature,
|
|
pubkey: PublicKeyBytes,
|
|
},
|
|
WithdrawalsRoot {
|
|
payload: Option<Hash256>,
|
|
expected: Option<Hash256>,
|
|
},
|
|
GasLimitMismatch {
|
|
payload: u64,
|
|
expected: u64,
|
|
},
|
|
SszTypesError(ssz_types::Error),
|
|
}
|
|
|
|
impl fmt::Display for InvalidBuilderPayload {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
match self {
|
|
InvalidBuilderPayload::ParentHash { payload, expected } => {
|
|
write!(f, "payload block hash was {} not {}", payload, expected)
|
|
}
|
|
InvalidBuilderPayload::PrevRandao { payload, expected } => {
|
|
write!(f, "payload prev randao was {} not {}", payload, expected)
|
|
}
|
|
InvalidBuilderPayload::Timestamp { payload, expected } => {
|
|
write!(f, "payload timestamp was {} not {}", payload, expected)
|
|
}
|
|
InvalidBuilderPayload::BlockNumber { payload, expected } => {
|
|
write!(f, "payload block number was {} not {:?}", payload, expected)
|
|
}
|
|
InvalidBuilderPayload::Fork { payload, expected } => {
|
|
write!(f, "payload fork was {} not {}", payload, expected)
|
|
}
|
|
InvalidBuilderPayload::Signature { signature, pubkey } => write!(
|
|
f,
|
|
"invalid payload signature {} for pubkey {}",
|
|
signature, pubkey
|
|
),
|
|
InvalidBuilderPayload::WithdrawalsRoot { payload, expected } => {
|
|
let opt_string = |opt_hash: &Option<Hash256>| {
|
|
opt_hash
|
|
.map(|hash| hash.to_string())
|
|
.unwrap_or_else(|| "None".to_string())
|
|
};
|
|
write!(
|
|
f,
|
|
"payload withdrawals root was {} not {}",
|
|
opt_string(payload),
|
|
opt_string(expected)
|
|
)
|
|
}
|
|
InvalidBuilderPayload::GasLimitMismatch { payload, expected } => {
|
|
write!(f, "payload gas limit was {} not {}", payload, expected)
|
|
}
|
|
Self::SszTypesError(e) => write!(f, "{:?}", e),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Calculate the expected gas limit for a block.
|
|
pub fn expected_gas_limit(
|
|
parent_gas_limit: u64,
|
|
target_gas_limit: u64,
|
|
spec: &ChainSpec,
|
|
) -> Option<u64> {
|
|
// Calculate the maximum gas limit difference allowed safely
|
|
let max_gas_limit_difference = parent_gas_limit
|
|
.checked_div(spec.gas_limit_adjustment_factor)
|
|
.and_then(|result| result.checked_sub(1))
|
|
.unwrap_or(0);
|
|
|
|
// Adjust the gas limit safely
|
|
if target_gas_limit > parent_gas_limit {
|
|
let gas_diff = target_gas_limit.saturating_sub(parent_gas_limit);
|
|
parent_gas_limit.checked_add(std::cmp::min(gas_diff, max_gas_limit_difference))
|
|
} else {
|
|
let gas_diff = parent_gas_limit.saturating_sub(target_gas_limit);
|
|
parent_gas_limit.checked_sub(std::cmp::min(gas_diff, max_gas_limit_difference))
|
|
}
|
|
}
|
|
|
|
/// Perform some cursory, non-exhaustive validation of the bid returned from the builder.
|
|
fn verify_builder_bid<E: EthSpec>(
|
|
bid: &ForkVersionedResponse<SignedBuilderBid<E>>,
|
|
payload_parameters: PayloadParameters<'_>,
|
|
block_number: Option<u64>,
|
|
spec: &ChainSpec,
|
|
) -> Result<(), Box<InvalidBuilderPayload>> {
|
|
let PayloadParameters {
|
|
parent_hash,
|
|
payload_attributes,
|
|
current_fork,
|
|
parent_gas_limit,
|
|
proposer_gas_limit,
|
|
..
|
|
} = payload_parameters;
|
|
|
|
let is_signature_valid = bid.data.verify_signature(spec);
|
|
let header = &bid.data.message.header();
|
|
|
|
metrics::set_gauge_vec(
|
|
&metrics::EXECUTION_LAYER_PAYLOAD_BIDS,
|
|
&[metrics::BUILDER],
|
|
bid.data.message.value().to_i64(),
|
|
);
|
|
|
|
let expected_withdrawals_root = payload_attributes
|
|
.withdrawals()
|
|
.ok()
|
|
.cloned()
|
|
.map(|withdrawals| {
|
|
Withdrawals::<E>::try_from(withdrawals)
|
|
.map_err(|e| Box::new(InvalidBuilderPayload::SszTypesError(e)))
|
|
.map(|w| w.tree_hash_root())
|
|
})
|
|
.transpose()?;
|
|
|
|
let payload_withdrawals_root = header.withdrawals_root().ok();
|
|
let expected_gas_limit = proposer_gas_limit
|
|
.and_then(|target_gas_limit| expected_gas_limit(parent_gas_limit, target_gas_limit, spec));
|
|
|
|
if header.parent_hash() != parent_hash {
|
|
Err(Box::new(InvalidBuilderPayload::ParentHash {
|
|
payload: header.parent_hash(),
|
|
expected: parent_hash,
|
|
}))
|
|
} else if header.prev_randao() != payload_attributes.prev_randao() {
|
|
Err(Box::new(InvalidBuilderPayload::PrevRandao {
|
|
payload: header.prev_randao(),
|
|
expected: payload_attributes.prev_randao(),
|
|
}))
|
|
} else if header.timestamp() != payload_attributes.timestamp() {
|
|
Err(Box::new(InvalidBuilderPayload::Timestamp {
|
|
payload: header.timestamp(),
|
|
expected: payload_attributes.timestamp(),
|
|
}))
|
|
} else if block_number.is_some_and(|n| n != header.block_number()) {
|
|
Err(Box::new(InvalidBuilderPayload::BlockNumber {
|
|
payload: header.block_number(),
|
|
expected: block_number,
|
|
}))
|
|
} else if bid.version != current_fork {
|
|
Err(Box::new(InvalidBuilderPayload::Fork {
|
|
payload: bid.version,
|
|
expected: current_fork,
|
|
}))
|
|
} else if !is_signature_valid {
|
|
Err(Box::new(InvalidBuilderPayload::Signature {
|
|
signature: bid.data.signature.clone(),
|
|
pubkey: *bid.data.message.pubkey(),
|
|
}))
|
|
} else if payload_withdrawals_root != expected_withdrawals_root {
|
|
Err(Box::new(InvalidBuilderPayload::WithdrawalsRoot {
|
|
payload: payload_withdrawals_root,
|
|
expected: expected_withdrawals_root,
|
|
}))
|
|
} else if expected_gas_limit
|
|
.map(|gas_limit| header.gas_limit() != gas_limit)
|
|
.unwrap_or(false)
|
|
{
|
|
Err(Box::new(InvalidBuilderPayload::GasLimitMismatch {
|
|
payload: header.gas_limit(),
|
|
expected: expected_gas_limit.unwrap_or(0),
|
|
}))
|
|
} else {
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
/// A helper function to record the time it takes to execute a future.
|
|
async fn timed_future<F: Future<Output = T>, T>(metric: &str, future: F) -> (T, Duration) {
|
|
let start = Instant::now();
|
|
let result = future.await;
|
|
let duration = start.elapsed();
|
|
metrics::observe_timer_vec(&metrics::EXECUTION_LAYER_REQUEST_TIMES, &[metric], duration);
|
|
(result, duration)
|
|
}
|
|
|
|
fn noop<E: EthSpec>(
|
|
_: &ExecutionLayer<E>,
|
|
_: PayloadContentsRefTuple<E>,
|
|
) -> Option<FullPayloadContents<E>> {
|
|
None
|
|
}
|
|
|
|
#[cfg(test)]
mod test {
    use super::*;
    use crate::test_utils::MockExecutionLayer as GenericMockExecutionLayer;
    use task_executor::test_utils::TestRuntime;
    use types::MainnetEthSpec;

    type MockExecutionLayer = GenericMockExecutionLayer<MainnetEthSpec>;

    /// Chains three `produce_valid_execution_payload_on_head` calls on a mock execution
    /// layer built with default parameters.
    #[tokio::test]
    async fn produce_three_valid_pos_execution_blocks() {
        let runtime = TestRuntime::default();
        MockExecutionLayer::default_params(runtime.task_executor.clone())
            .produce_valid_execution_payload_on_head()
            .await
            .produce_valid_execution_payload_on_head()
            .await
            .produce_valid_execution_payload_on_head()
            .await;
    }

    /// Checks `expected_gas_limit` against the mainnet spec.
    ///
    /// The function under test is synchronous, so this is a plain `#[test]` and does not
    /// need an async runtime.
    #[test]
    fn test_expected_gas_limit() {
        let spec = ChainSpec::mainnet();

        // (parent gas limit, target gas limit, expected gas limit)
        let cases = [
            // Already at the target: nothing changes.
            (30_000_000, 30_000_000, 30_000_000),
            // Target far above the parent: the increase is capped at the maximum step.
            (30_000_000, 40_000_000, 30_029_295),
            (30_029_295, 40_000_000, 30_058_619),
            // Target below the parent: the decrease is capped at the maximum step.
            (30_058_619, 30_000_000, 30_029_266),
            // Target within one step of the parent: the target is reached exactly.
            (30_000_000, 30_000_100, 30_000_100),
            (30_000_000, 29_999_900, 29_999_900),
        ];

        for (parent, target, expected) in cases {
            assert_eq!(
                expected_gas_limit(parent, target, &spec),
                Some(expected),
                "parent gas limit {parent}, target gas limit {target}"
            );
        }
    }
}
|