Merge remote-tracking branch 'origin/unstable' into tree-states

This commit is contained in:
Michael Sproul
2022-09-14 13:51:23 +10:00
404 changed files with 28947 additions and 12000 deletions

View File

@@ -1,12 +1,11 @@
use crate::engines::ForkChoiceState;
use async_trait::async_trait;
use eth1::http::RpcError;
pub use ethers_core::types::Transaction;
use http::deposit_methods::RpcError;
pub use json_structures::TransitionConfigurationV1;
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
use slog::Logger;
use ssz_types::FixedVector;
use strum::IntoStaticStr;
pub use types::{
Address, EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadHeader, Hash256,
Uint256, VariableList,
@@ -29,10 +28,7 @@ pub enum Error {
InvalidExecutePayloadResponse(&'static str),
JsonRpc(RpcError),
Json(serde_json::Error),
ServerMessage {
code: i64,
message: String,
},
ServerMessage { code: i64, message: String },
Eip155Failure,
IsSyncing,
ExecutionBlockNotFound(ExecutionBlockHash),
@@ -41,15 +37,9 @@ pub enum Error {
PayloadIdUnavailable,
TransitionConfigurationMismatch,
PayloadConversionLogicFlaw,
InvalidBuilderQuery,
MissingPayloadId {
parent_hash: ExecutionBlockHash,
timestamp: u64,
prev_randao: Hash256,
suggested_fee_recipient: Address,
},
DeserializeTransaction(ssz_types::Error),
DeserializeTransactions(ssz_types::Error),
BuilderApi(builder_client::Error),
}
impl From<reqwest::Error> for Error {
@@ -77,27 +67,20 @@ impl From<auth::Error> for Error {
}
}
pub struct EngineApi;
pub struct BuilderApi;
#[async_trait]
pub trait Builder {
async fn notify_forkchoice_updated(
&self,
forkchoice_state: ForkChoiceState,
payload_attributes: Option<PayloadAttributes>,
log: &Logger,
) -> Result<ForkchoiceUpdatedResponse, Error>;
impl From<builder_client::Error> for Error {
fn from(e: builder_client::Error) -> Self {
Error::BuilderApi(e)
}
}
#[derive(Clone, Copy, Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq, IntoStaticStr)]
#[strum(serialize_all = "snake_case")]
pub enum PayloadStatusV1Status {
Valid,
Invalid,
Syncing,
Accepted,
InvalidBlockHash,
InvalidTerminalBlock,
}
#[derive(Clone, Debug, PartialEq)]
@@ -125,6 +108,8 @@ pub struct ExecutionBlock {
pub block_number: u64,
pub parent_hash: ExecutionBlockHash,
pub total_difficulty: Uint256,
#[serde(with = "eth2_serde_utils::u64_hex_be")]
pub timestamp: u64,
}
/// Representation of an exection block with enough detail to reconstruct a payload.

View File

@@ -1,3 +1,5 @@
use std::path::PathBuf;
use jsonwebtoken::{encode, get_current_timestamp, Algorithm, EncodingKey, Header};
use rand::Rng;
use serde::{Deserialize, Serialize};
@@ -13,6 +15,7 @@ pub const JWT_SECRET_LENGTH: usize = 32;
pub enum Error {
JWT(jsonwebtoken::errors::Error),
InvalidToken,
InvalidKey(String),
}
impl From<jsonwebtoken::errors::Error> for Error {
@@ -22,7 +25,7 @@ impl From<jsonwebtoken::errors::Error> for Error {
}
/// Provides wrapper around `[u8; JWT_SECRET_LENGTH]` that implements `Zeroize`.
#[derive(Zeroize)]
#[derive(Zeroize, Clone)]
#[zeroize(drop)]
pub struct JwtKey([u8; JWT_SECRET_LENGTH as usize]);
@@ -57,6 +60,14 @@ impl JwtKey {
}
}
pub fn strip_prefix(s: &str) -> &str {
if let Some(stripped) = s.strip_prefix("0x") {
stripped
} else {
s
}
}
/// Contains the JWT secret and claims parameters.
pub struct Auth {
key: EncodingKey,
@@ -73,6 +84,28 @@ impl Auth {
}
}
/// Create a new `Auth` struct given the path to the file containing the hex
/// encoded jwt key.
pub fn new_with_path(
jwt_path: PathBuf,
id: Option<String>,
clv: Option<String>,
) -> Result<Self, Error> {
std::fs::read_to_string(&jwt_path)
.map_err(|e| {
Error::InvalidKey(format!(
"Failed to read JWT secret file {:?}, error: {:?}",
jwt_path, e
))
})
.and_then(|ref s| {
let secret_bytes = hex::decode(strip_prefix(s.trim_end()))
.map_err(|e| Error::InvalidKey(format!("Invalid hex string: {:?}", e)))?;
let secret = JwtKey::from_slice(&secret_bytes).map_err(Error::InvalidKey)?;
Ok(Self::new(secret, id, clv))
})
}
/// Generate a JWT token with `claims.iat` set to current time.
pub fn generate_token(&self) -> Result<String, Error> {
let claims = self.generate_claims_at_timestamp();
@@ -126,12 +159,12 @@ pub struct Claims {
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::JWT_SECRET;
use crate::test_utils::DEFAULT_JWT_SECRET;
#[test]
fn test_roundtrip() {
let auth = Auth::new(
JwtKey::from_slice(&JWT_SECRET).unwrap(),
JwtKey::from_slice(&DEFAULT_JWT_SECRET).unwrap(),
Some("42".into()),
Some("Lighthouse".into()),
);
@@ -139,7 +172,7 @@ mod tests {
let token = auth.generate_token_with_claims(&claims).unwrap();
assert_eq!(
Auth::validate_token(&token, &JwtKey::from_slice(&JWT_SECRET).unwrap())
Auth::validate_token(&token, &JwtKey::from_slice(&DEFAULT_JWT_SECRET).unwrap())
.unwrap()
.claims,
claims

View File

@@ -3,15 +3,15 @@
use super::*;
use crate::auth::Auth;
use crate::json_structures::*;
use eth1::http::EIP155_ERROR_STR;
use reqwest::header::CONTENT_TYPE;
use sensitive_url::SensitiveUrl;
use serde::de::DeserializeOwned;
use serde_json::json;
use std::marker::PhantomData;
use std::time::Duration;
use types::{BlindedPayload, EthSpec, ExecutionPayloadHeader, SignedBeaconBlock};
use std::time::Duration;
use types::EthSpec;
pub use deposit_log::{DepositLog, Log};
pub use reqwest::Client;
const STATIC_ID: u32 = 1;
@@ -26,42 +26,507 @@ pub const ETH_GET_BLOCK_BY_HASH: &str = "eth_getBlockByHash";
pub const ETH_GET_BLOCK_BY_HASH_TIMEOUT: Duration = Duration::from_secs(1);
pub const ETH_SYNCING: &str = "eth_syncing";
pub const ETH_SYNCING_TIMEOUT: Duration = Duration::from_millis(250);
pub const ETH_SYNCING_TIMEOUT: Duration = Duration::from_secs(1);
pub const ENGINE_NEW_PAYLOAD_V1: &str = "engine_newPayloadV1";
pub const ENGINE_NEW_PAYLOAD_TIMEOUT: Duration = Duration::from_secs(6);
pub const ENGINE_NEW_PAYLOAD_TIMEOUT: Duration = Duration::from_secs(8);
pub const ENGINE_GET_PAYLOAD_V1: &str = "engine_getPayloadV1";
pub const ENGINE_GET_PAYLOAD_TIMEOUT: Duration = Duration::from_secs(2);
pub const ENGINE_FORKCHOICE_UPDATED_V1: &str = "engine_forkchoiceUpdatedV1";
pub const ENGINE_FORKCHOICE_UPDATED_TIMEOUT: Duration = Duration::from_secs(6);
pub const ENGINE_FORKCHOICE_UPDATED_TIMEOUT: Duration = Duration::from_secs(8);
pub const ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1: &str =
"engine_exchangeTransitionConfigurationV1";
pub const ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1_TIMEOUT: Duration =
Duration::from_millis(500);
pub const ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1_TIMEOUT: Duration = Duration::from_secs(1);
pub const BUILDER_GET_PAYLOAD_HEADER_V1: &str = "builder_getPayloadHeaderV1";
pub const BUILDER_GET_PAYLOAD_HEADER_TIMEOUT: Duration = Duration::from_secs(2);
/// This error is returned during a `chainId` call by Geth.
pub const EIP155_ERROR_STR: &str = "chain not synced beyond EIP-155 replay-protection fork block";
pub const BUILDER_PROPOSE_BLINDED_BLOCK_V1: &str = "builder_proposeBlindedBlockV1";
pub const BUILDER_PROPOSE_BLINDED_BLOCK_TIMEOUT: Duration = Duration::from_secs(2);
/// Contains methods to convert arbitary bytes to an ETH2 deposit contract object.
pub mod deposit_log {
use ssz::Decode;
use state_processing::per_block_processing::signature_sets::deposit_pubkey_signature_message;
use types::{ChainSpec, DepositData, Hash256, PublicKeyBytes, SignatureBytes};
pub struct HttpJsonRpc<T = EngineApi> {
pub use eth2::lighthouse::DepositLog;
/// The following constants define the layout of bytes in the deposit contract `DepositEvent`. The
/// event bytes are formatted according to the Ethereum ABI.
const PUBKEY_START: usize = 192;
const PUBKEY_LEN: usize = 48;
const CREDS_START: usize = PUBKEY_START + 64 + 32;
const CREDS_LEN: usize = 32;
const AMOUNT_START: usize = CREDS_START + 32 + 32;
const AMOUNT_LEN: usize = 8;
const SIG_START: usize = AMOUNT_START + 32 + 32;
const SIG_LEN: usize = 96;
const INDEX_START: usize = SIG_START + 96 + 32;
const INDEX_LEN: usize = 8;
/// A reduced set of fields from an Eth1 contract log.
#[derive(Debug, PartialEq, Clone)]
pub struct Log {
pub block_number: u64,
pub data: Vec<u8>,
}
impl Log {
/// Attempts to parse a raw `Log` from the deposit contract into a `DepositLog`.
pub fn to_deposit_log(&self, spec: &ChainSpec) -> Result<DepositLog, String> {
let bytes = &self.data;
let pubkey = bytes
.get(PUBKEY_START..PUBKEY_START + PUBKEY_LEN)
.ok_or("Insufficient bytes for pubkey")?;
let withdrawal_credentials = bytes
.get(CREDS_START..CREDS_START + CREDS_LEN)
.ok_or("Insufficient bytes for withdrawal credential")?;
let amount = bytes
.get(AMOUNT_START..AMOUNT_START + AMOUNT_LEN)
.ok_or("Insufficient bytes for amount")?;
let signature = bytes
.get(SIG_START..SIG_START + SIG_LEN)
.ok_or("Insufficient bytes for signature")?;
let index = bytes
.get(INDEX_START..INDEX_START + INDEX_LEN)
.ok_or("Insufficient bytes for index")?;
let deposit_data = DepositData {
pubkey: PublicKeyBytes::from_ssz_bytes(pubkey)
.map_err(|e| format!("Invalid pubkey ssz: {:?}", e))?,
withdrawal_credentials: Hash256::from_ssz_bytes(withdrawal_credentials)
.map_err(|e| format!("Invalid withdrawal_credentials ssz: {:?}", e))?,
amount: u64::from_ssz_bytes(amount)
.map_err(|e| format!("Invalid amount ssz: {:?}", e))?,
signature: SignatureBytes::from_ssz_bytes(signature)
.map_err(|e| format!("Invalid signature ssz: {:?}", e))?,
};
let signature_is_valid = deposit_pubkey_signature_message(&deposit_data, spec)
.map_or(false, |(public_key, signature, msg)| {
signature.verify(&public_key, msg)
});
Ok(DepositLog {
deposit_data,
block_number: self.block_number,
index: u64::from_ssz_bytes(index)
.map_err(|e| format!("Invalid index ssz: {:?}", e))?,
signature_is_valid,
})
}
}
#[cfg(test)]
pub mod tests {
use super::*;
use types::{EthSpec, MainnetEthSpec};
/// The data from a deposit event, using the v0.8.3 version of the deposit contract.
pub const EXAMPLE_LOG: &[u8] = &[
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 160, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 1, 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 48, 167, 108, 6, 69, 88, 17,
3, 51, 6, 4, 158, 232, 82, 248, 218, 2, 71, 219, 55, 102, 86, 125, 136, 203, 36, 77,
64, 213, 43, 52, 175, 154, 239, 50, 142, 52, 201, 77, 54, 239, 0, 229, 22, 46, 139,
120, 62, 240, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
8, 0, 64, 89, 115, 7, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 96, 140, 74, 175, 158, 209, 20, 206, 30, 63, 215, 238, 113, 60,
132, 216, 211, 100, 186, 202, 71, 34, 200, 160, 225, 212, 213, 119, 88, 51, 80, 101,
74, 2, 45, 78, 153, 12, 192, 44, 51, 77, 40, 10, 72, 246, 34, 193, 187, 22, 95, 4, 211,
245, 224, 13, 162, 21, 163, 54, 225, 22, 124, 3, 56, 14, 81, 122, 189, 149, 250, 251,
159, 22, 77, 94, 157, 197, 196, 253, 110, 201, 88, 193, 246, 136, 226, 221, 18, 113,
232, 105, 100, 114, 103, 237, 189, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 7, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
#[test]
fn can_parse_example_log() {
let log = Log {
block_number: 42,
data: EXAMPLE_LOG.to_vec(),
};
log.to_deposit_log(&MainnetEthSpec::default_spec())
.expect("should decode log");
}
}
}
/// Contains subset of the HTTP JSON-RPC methods used to query an execution node for
/// state of the deposit contract.
pub mod deposit_methods {
use super::Log;
use crate::HttpJsonRpc;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::fmt;
use std::ops::Range;
use std::str::FromStr;
use std::time::Duration;
use types::Hash256;
/// `keccak("DepositEvent(bytes,bytes,bytes,bytes,bytes)")`
pub const DEPOSIT_EVENT_TOPIC: &str =
"0x649bbc62d0e31342afea4e5cd82d4049e7e1ee912fc0889aa790803be39038c5";
/// `keccak("get_deposit_root()")[0..4]`
pub const DEPOSIT_ROOT_FN_SIGNATURE: &str = "0xc5f2892f";
/// `keccak("get_deposit_count()")[0..4]`
pub const DEPOSIT_COUNT_FN_SIGNATURE: &str = "0x621fd130";
/// Number of bytes in deposit contract deposit root response.
pub const DEPOSIT_COUNT_RESPONSE_BYTES: usize = 96;
/// Number of bytes in deposit contract deposit root (value only).
pub const DEPOSIT_ROOT_BYTES: usize = 32;
/// Represents an eth1 chain/network id.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum Eth1Id {
Goerli,
Mainnet,
Custom(u64),
}
#[derive(Debug, PartialEq, Clone)]
pub struct Block {
pub hash: Hash256,
pub timestamp: u64,
pub number: u64,
}
/// Used to identify a block when querying the Eth1 node.
#[derive(Clone, Copy)]
pub enum BlockQuery {
Number(u64),
Latest,
}
impl Into<u64> for Eth1Id {
fn into(self) -> u64 {
match self {
Eth1Id::Mainnet => 1,
Eth1Id::Goerli => 5,
Eth1Id::Custom(id) => id,
}
}
}
impl From<u64> for Eth1Id {
fn from(id: u64) -> Self {
let into = |x: Eth1Id| -> u64 { x.into() };
match id {
id if id == into(Eth1Id::Mainnet) => Eth1Id::Mainnet,
id if id == into(Eth1Id::Goerli) => Eth1Id::Goerli,
id => Eth1Id::Custom(id),
}
}
}
impl FromStr for Eth1Id {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
s.parse::<u64>()
.map(Into::into)
.map_err(|e| format!("Failed to parse eth1 network id {}", e))
}
}
/// Represents an error received from a remote procecdure call.
#[derive(Debug, Serialize, Deserialize)]
pub enum RpcError {
NoResultField,
Eip155Error,
InvalidJson(String),
Error(String),
}
impl fmt::Display for RpcError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RpcError::NoResultField => write!(f, "No result field in response"),
RpcError::Eip155Error => write!(f, "Not synced past EIP-155"),
RpcError::InvalidJson(e) => write!(f, "Malformed JSON received: {}", e),
RpcError::Error(s) => write!(f, "{}", s),
}
}
}
impl From<RpcError> for String {
fn from(e: RpcError) -> String {
e.to_string()
}
}
/// Parses a `0x`-prefixed, **big-endian** hex string as a u64.
///
/// Note: the JSON-RPC encodes integers as big-endian. The deposit contract uses little-endian.
/// Therefore, this function is only useful for numbers encoded by the JSON RPC.
///
/// E.g., `0x01 == 1`
fn hex_to_u64_be(hex: &str) -> Result<u64, String> {
u64::from_str_radix(strip_prefix(hex)?, 16)
.map_err(|e| format!("Failed to parse hex as u64: {:?}", e))
}
/// Parses a `0x`-prefixed, big-endian hex string as bytes.
///
/// E.g., `0x0102 == vec![1, 2]`
fn hex_to_bytes(hex: &str) -> Result<Vec<u8>, String> {
hex::decode(strip_prefix(hex)?)
.map_err(|e| format!("Failed to parse hex as bytes: {:?}", e))
}
/// Removes the `0x` prefix from some bytes. Returns an error if the prefix is not present.
fn strip_prefix(hex: &str) -> Result<&str, String> {
if let Some(stripped) = hex.strip_prefix("0x") {
Ok(stripped)
} else {
Err("Hex string did not start with `0x`".to_string())
}
}
impl HttpJsonRpc {
/// Get the eth1 chain id of the given endpoint.
pub async fn get_chain_id(&self, timeout: Duration) -> Result<Eth1Id, String> {
let chain_id: String = self
.rpc_request("eth_chainId", json!([]), timeout)
.await
.map_err(|e| format!("eth_chainId call failed {:?}", e))?;
hex_to_u64_be(chain_id.as_str()).map(|id| id.into())
}
/// Returns the current block number.
pub async fn get_block_number(&self, timeout: Duration) -> Result<u64, String> {
let response: String = self
.rpc_request("eth_blockNumber", json!([]), timeout)
.await
.map_err(|e| format!("eth_blockNumber call failed {:?}", e))?;
hex_to_u64_be(response.as_str())
.map_err(|e| format!("Failed to get block number: {}", e))
}
/// Gets a block hash by block number.
pub async fn get_block(
&self,
query: BlockQuery,
timeout: Duration,
) -> Result<Block, String> {
let query_param = match query {
BlockQuery::Number(block_number) => format!("0x{:x}", block_number),
BlockQuery::Latest => "latest".to_string(),
};
let params = json!([
query_param,
false // do not return full tx objects.
]);
let response: Value = self
.rpc_request("eth_getBlockByNumber", params, timeout)
.await
.map_err(|e| format!("eth_getBlockByNumber call failed {:?}", e))?;
let hash: Vec<u8> = hex_to_bytes(
response
.get("hash")
.ok_or("No hash for block")?
.as_str()
.ok_or("Block hash was not string")?,
)?;
let hash: Hash256 = if hash.len() == 32 {
Hash256::from_slice(&hash)
} else {
return Err(format!("Block hash was not 32 bytes: {:?}", hash));
};
let timestamp = hex_to_u64_be(
response
.get("timestamp")
.ok_or("No timestamp for block")?
.as_str()
.ok_or("Block timestamp was not string")?,
)?;
let number = hex_to_u64_be(
response
.get("number")
.ok_or("No number for block")?
.as_str()
.ok_or("Block number was not string")?,
)?;
if number <= usize::max_value() as u64 {
Ok(Block {
hash,
timestamp,
number,
})
} else {
Err(format!("Block number {} is larger than a usize", number))
}
.map_err(|e| format!("Failed to get block number: {}", e))
}
/// Returns the value of the `get_deposit_count()` call at the given `address` for the given
/// `block_number`.
///
/// Assumes that the `address` has the same ABI as the eth2 deposit contract.
pub async fn get_deposit_count(
&self,
address: &str,
block_number: u64,
timeout: Duration,
) -> Result<Option<u64>, String> {
let result = self
.call(address, DEPOSIT_COUNT_FN_SIGNATURE, block_number, timeout)
.await?;
match result {
None => Err("Deposit root response was none".to_string()),
Some(bytes) => {
if bytes.is_empty() {
Ok(None)
} else if bytes.len() == DEPOSIT_COUNT_RESPONSE_BYTES {
let mut array = [0; 8];
array.copy_from_slice(&bytes[32 + 32..32 + 32 + 8]);
Ok(Some(u64::from_le_bytes(array)))
} else {
Err(format!(
"Deposit count response was not {} bytes: {:?}",
DEPOSIT_COUNT_RESPONSE_BYTES, bytes
))
}
}
}
}
/// Returns the value of the `get_hash_tree_root()` call at the given `block_number`.
///
/// Assumes that the `address` has the same ABI as the eth2 deposit contract.
pub async fn get_deposit_root(
&self,
address: &str,
block_number: u64,
timeout: Duration,
) -> Result<Option<Hash256>, String> {
let result = self
.call(address, DEPOSIT_ROOT_FN_SIGNATURE, block_number, timeout)
.await?;
match result {
None => Err("Deposit root response was none".to_string()),
Some(bytes) => {
if bytes.is_empty() {
Ok(None)
} else if bytes.len() == DEPOSIT_ROOT_BYTES {
Ok(Some(Hash256::from_slice(&bytes)))
} else {
Err(format!(
"Deposit root response was not {} bytes: {:?}",
DEPOSIT_ROOT_BYTES, bytes
))
}
}
}
}
/// Performs a instant, no-transaction call to the contract `address` with the given `0x`-prefixed
/// `hex_data`.
///
/// Returns bytes, if any.
async fn call(
&self,
address: &str,
hex_data: &str,
block_number: u64,
timeout: Duration,
) -> Result<Option<Vec<u8>>, String> {
let params = json! ([
{
"to": address,
"data": hex_data,
},
format!("0x{:x}", block_number)
]);
let response: Option<String> = self
.rpc_request("eth_call", params, timeout)
.await
.map_err(|e| format!("eth_call call failed {:?}", e))?;
response.map(|s| hex_to_bytes(&s)).transpose()
}
/// Returns logs for the `DEPOSIT_EVENT_TOPIC`, for the given `address` in the given
/// `block_height_range`.
///
/// It's not clear from the Ethereum JSON-RPC docs if this range is inclusive or not.
pub async fn get_deposit_logs_in_range(
&self,
address: &str,
block_height_range: Range<u64>,
timeout: Duration,
) -> Result<Vec<Log>, String> {
let params = json! ([{
"address": address,
"topics": [DEPOSIT_EVENT_TOPIC],
"fromBlock": format!("0x{:x}", block_height_range.start),
"toBlock": format!("0x{:x}", block_height_range.end),
}]);
let response: Value = self
.rpc_request("eth_getLogs", params, timeout)
.await
.map_err(|e| format!("eth_getLogs call failed {:?}", e))?;
response
.as_array()
.cloned()
.ok_or("'result' value was not an array")?
.into_iter()
.map(|value| {
let block_number = value
.get("blockNumber")
.ok_or("No block number field in log")?
.as_str()
.ok_or("Block number was not string")?;
let data = value
.get("data")
.ok_or("No block number field in log")?
.as_str()
.ok_or("Data was not string")?;
Ok(Log {
block_number: hex_to_u64_be(block_number)?,
data: hex_to_bytes(data)?,
})
})
.collect::<Result<Vec<Log>, String>>()
.map_err(|e| format!("Failed to get logs in range: {}", e))
}
}
}
pub struct HttpJsonRpc {
pub client: Client,
pub url: SensitiveUrl,
auth: Option<Auth>,
_phantom: PhantomData<T>,
}
impl<T> HttpJsonRpc<T> {
impl HttpJsonRpc {
pub fn new(url: SensitiveUrl) -> Result<Self, Error> {
Ok(Self {
client: Client::builder().build()?,
url,
auth: None,
_phantom: PhantomData,
})
}
@@ -70,7 +535,6 @@ impl<T> HttpJsonRpc<T> {
client: Client::builder().build()?,
url,
auth: Some(auth),
_phantom: PhantomData,
})
}
@@ -117,7 +581,13 @@ impl<T> HttpJsonRpc<T> {
}
}
impl HttpJsonRpc<EngineApi> {
impl std::fmt::Display for HttpJsonRpc {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}, auth={}", self.url, self.auth.is_some())
}
}
impl HttpJsonRpc {
pub async fn upcheck(&self) -> Result<(), Error> {
let result: serde_json::Value = self
.rpc_request(ETH_SYNCING, json!([]), ETH_SYNCING_TIMEOUT)
@@ -233,67 +703,11 @@ impl HttpJsonRpc<EngineApi> {
}
}
impl HttpJsonRpc<BuilderApi> {
pub async fn get_payload_header_v1<T: EthSpec>(
&self,
payload_id: PayloadId,
) -> Result<ExecutionPayloadHeader<T>, Error> {
let params = json!([JsonPayloadIdRequest::from(payload_id)]);
let response: JsonExecutionPayloadHeaderV1<T> = self
.rpc_request(
BUILDER_GET_PAYLOAD_HEADER_V1,
params,
BUILDER_GET_PAYLOAD_HEADER_TIMEOUT,
)
.await?;
Ok(response.into())
}
pub async fn forkchoice_updated_v1(
&self,
forkchoice_state: ForkChoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> Result<ForkchoiceUpdatedResponse, Error> {
let params = json!([
JsonForkChoiceStateV1::from(forkchoice_state),
payload_attributes.map(JsonPayloadAttributesV1::from)
]);
let response: JsonForkchoiceUpdatedV1Response = self
.rpc_request(
ENGINE_FORKCHOICE_UPDATED_V1,
params,
ENGINE_FORKCHOICE_UPDATED_TIMEOUT,
)
.await?;
Ok(response.into())
}
pub async fn propose_blinded_block_v1<T: EthSpec>(
&self,
block: SignedBeaconBlock<T, BlindedPayload<T>>,
) -> Result<ExecutionPayload<T>, Error> {
let params = json!([block]);
let response: JsonExecutionPayloadV1<T> = self
.rpc_request(
BUILDER_PROPOSE_BLINDED_BLOCK_V1,
params,
BUILDER_PROPOSE_BLINDED_BLOCK_TIMEOUT,
)
.await?;
Ok(response.into())
}
}
#[cfg(test)]
mod test {
use super::auth::JwtKey;
use super::*;
use crate::test_utils::{MockServer, JWT_SECRET};
use crate::test_utils::{MockServer, DEFAULT_JWT_SECRET};
use std::future::Future;
use std::str::FromStr;
use std::sync::Arc;
@@ -313,8 +727,10 @@ mod test {
let echo_url = SensitiveUrl::parse(&format!("{}/echo", server.url())).unwrap();
// Create rpc clients that include JWT auth headers if `with_auth` is true.
let (rpc_client, echo_client) = if with_auth {
let rpc_auth = Auth::new(JwtKey::from_slice(&JWT_SECRET).unwrap(), None, None);
let echo_auth = Auth::new(JwtKey::from_slice(&JWT_SECRET).unwrap(), None, None);
let rpc_auth =
Auth::new(JwtKey::from_slice(&DEFAULT_JWT_SECRET).unwrap(), None, None);
let echo_auth =
Auth::new(JwtKey::from_slice(&DEFAULT_JWT_SECRET).unwrap(), None, None);
(
Arc::new(HttpJsonRpc::new_with_auth(rpc_url, rpc_auth).unwrap()),
Arc::new(HttpJsonRpc::new_with_auth(echo_url, echo_auth).unwrap()),

View File

@@ -78,6 +78,7 @@ pub struct JsonExecutionPayloadHeaderV1<T: EthSpec> {
pub timestamp: u64,
#[serde(with = "ssz_types::serde_utils::hex_var_list")]
pub extra_data: VariableList<u8, T::MaxExtraDataBytes>,
#[serde(with = "eth2_serde_utils::u256_hex_be")]
pub base_fee_per_gas: Uint256,
pub block_hash: ExecutionBlockHash,
pub transactions_root: Hash256,
@@ -142,6 +143,7 @@ pub struct JsonExecutionPayloadV1<T: EthSpec> {
pub timestamp: u64,
#[serde(with = "ssz_types::serde_utils::hex_var_list")]
pub extra_data: VariableList<u8, T::MaxExtraDataBytes>,
#[serde(with = "eth2_serde_utils::u256_hex_be")]
pub base_fee_per_gas: Uint256,
pub block_hash: ExecutionBlockHash,
#[serde(with = "ssz_types::serde_utils::list_of_hex_var_list")]
@@ -320,7 +322,6 @@ pub enum JsonPayloadStatusV1Status {
Syncing,
Accepted,
InvalidBlockHash,
InvalidTerminalBlock,
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
@@ -339,9 +340,6 @@ impl From<PayloadStatusV1Status> for JsonPayloadStatusV1Status {
PayloadStatusV1Status::Syncing => JsonPayloadStatusV1Status::Syncing,
PayloadStatusV1Status::Accepted => JsonPayloadStatusV1Status::Accepted,
PayloadStatusV1Status::InvalidBlockHash => JsonPayloadStatusV1Status::InvalidBlockHash,
PayloadStatusV1Status::InvalidTerminalBlock => {
JsonPayloadStatusV1Status::InvalidTerminalBlock
}
}
}
}
@@ -353,9 +351,6 @@ impl From<JsonPayloadStatusV1Status> for PayloadStatusV1Status {
JsonPayloadStatusV1Status::Syncing => PayloadStatusV1Status::Syncing,
JsonPayloadStatusV1Status::Accepted => PayloadStatusV1Status::Accepted,
JsonPayloadStatusV1Status::InvalidBlockHash => PayloadStatusV1Status::InvalidBlockHash,
JsonPayloadStatusV1Status::InvalidTerminalBlock => {
PayloadStatusV1Status::InvalidTerminalBlock
}
}
}
}
@@ -430,62 +425,10 @@ impl From<ForkchoiceUpdatedResponse> for JsonForkchoiceUpdatedV1Response {
}
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum JsonProposeBlindedBlockResponseStatus {
Valid,
Invalid,
Syncing,
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(bound = "E: EthSpec")]
pub struct JsonProposeBlindedBlockResponse<E: EthSpec> {
pub result: ExecutionPayload<E>,
pub error: Option<String>,
}
impl<E: EthSpec> From<JsonProposeBlindedBlockResponse<E>> for ExecutionPayload<E> {
fn from(j: JsonProposeBlindedBlockResponse<E>) -> Self {
let JsonProposeBlindedBlockResponse { result, error: _ } = j;
result
}
}
impl From<JsonProposeBlindedBlockResponseStatus> for ProposeBlindedBlockResponseStatus {
fn from(j: JsonProposeBlindedBlockResponseStatus) -> Self {
match j {
JsonProposeBlindedBlockResponseStatus::Valid => {
ProposeBlindedBlockResponseStatus::Valid
}
JsonProposeBlindedBlockResponseStatus::Invalid => {
ProposeBlindedBlockResponseStatus::Invalid
}
JsonProposeBlindedBlockResponseStatus::Syncing => {
ProposeBlindedBlockResponseStatus::Syncing
}
}
}
}
impl From<ProposeBlindedBlockResponseStatus> for JsonProposeBlindedBlockResponseStatus {
fn from(f: ProposeBlindedBlockResponseStatus) -> Self {
match f {
ProposeBlindedBlockResponseStatus::Valid => {
JsonProposeBlindedBlockResponseStatus::Valid
}
ProposeBlindedBlockResponseStatus::Invalid => {
JsonProposeBlindedBlockResponseStatus::Invalid
}
ProposeBlindedBlockResponseStatus::Syncing => {
JsonProposeBlindedBlockResponseStatus::Syncing
}
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TransitionConfigurationV1 {
#[serde(with = "eth2_serde_utils::u256_hex_be")]
pub terminal_total_difficulty: Uint256,
pub terminal_block_hash: ExecutionBlockHash,
#[serde(with = "eth2_serde_utils::u64_hex_be")]

View File

@@ -1,16 +1,16 @@
//! Provides generic behaviour for multiple execution engines, specifically fallback behaviour.
use crate::engine_api::{
Builder, EngineApi, Error as EngineApiError, ForkchoiceUpdatedResponse, PayloadAttributes,
PayloadId,
Error as EngineApiError, ForkchoiceUpdatedResponse, PayloadAttributes, PayloadId,
};
use crate::{BuilderApi, HttpJsonRpc};
use async_trait::async_trait;
use futures::future::join_all;
use crate::HttpJsonRpc;
use lru::LruCache;
use slog::{crit, debug, info, warn, Logger};
use slog::{debug, error, info, Logger};
use std::future::Future;
use tokio::sync::{Mutex, RwLock};
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::sync::{watch, Mutex, RwLock};
use tokio_stream::wrappers::WatchStream;
use types::{Address, ExecutionBlockHash, Hash256};
/// The number of payload IDs that will be stored for each `Engine`.
@@ -19,14 +19,74 @@ use types::{Address, ExecutionBlockHash, Hash256};
const PAYLOAD_ID_LRU_CACHE_SIZE: usize = 512;
/// Stores the remembered state of a engine.
#[derive(Copy, Clone, PartialEq)]
enum EngineState {
#[derive(Copy, Clone, PartialEq, Debug, Eq, Default)]
enum EngineStateInternal {
Synced,
#[default]
Offline,
Syncing,
AuthFailed,
}
/// A subset of the engine state to inform other services if the engine is online or offline.
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum EngineState {
Online,
Offline,
}
impl From<EngineStateInternal> for EngineState {
fn from(state: EngineStateInternal) -> Self {
match state {
EngineStateInternal::Synced | EngineStateInternal::Syncing => EngineState::Online,
EngineStateInternal::Offline | EngineStateInternal::AuthFailed => EngineState::Offline,
}
}
}
/// Wrapper structure that ensures changes to the engine state are correctly reported to watchers.
struct State {
/// The actual engine state.
state: EngineStateInternal,
/// Notifier to watch the engine state.
notifier: watch::Sender<EngineState>,
}
impl std::ops::Deref for State {
type Target = EngineStateInternal;
fn deref(&self) -> &Self::Target {
&self.state
}
}
impl Default for State {
fn default() -> Self {
let state = EngineStateInternal::default();
let (notifier, _receiver) = watch::channel(state.into());
State { state, notifier }
}
}
impl State {
// Updates the state and notifies all watchers if the state has changed.
pub fn update(&mut self, new_state: EngineStateInternal) {
self.state = new_state;
self.notifier.send_if_modified(|last_state| {
let changed = *last_state != new_state.into(); // notify conditionally
*last_state = new_state.into(); // update the state unconditionally
changed
});
}
/// Gives access to a channel containing whether the last state is online.
///
/// This can be called several times.
pub fn watch(&self) -> WatchStream<EngineState> {
self.notifier.subscribe().into()
}
}
#[derive(Copy, Clone, PartialEq, Debug)]
pub struct ForkChoiceState {
pub head_block_hash: ExecutionBlockHash,
@@ -34,22 +94,6 @@ pub struct ForkChoiceState {
pub finalized_block_hash: ExecutionBlockHash,
}
/// Used to enable/disable logging on some tasks.
#[derive(Copy, Clone, PartialEq)]
pub enum Logging {
Enabled,
Disabled,
}
impl Logging {
pub fn is_enabled(&self) -> bool {
match self {
Logging::Enabled => true,
Logging::Disabled => false,
}
}
}
#[derive(Hash, PartialEq, std::cmp::Eq)]
struct PayloadIdCacheKey {
pub head_block_hash: ExecutionBlockHash,
@@ -58,25 +102,44 @@ struct PayloadIdCacheKey {
pub suggested_fee_recipient: Address,
}
/// An execution engine.
pub struct Engine<T> {
pub id: String,
pub api: HttpJsonRpc<T>,
payload_id_cache: Mutex<LruCache<PayloadIdCacheKey, PayloadId>>,
state: RwLock<EngineState>,
#[derive(Debug)]
pub enum EngineError {
Offline,
Api { error: EngineApiError },
BuilderApi { error: EngineApiError },
Auth,
}
impl<T> Engine<T> {
/// An execution engine.
pub struct Engine {
pub api: HttpJsonRpc,
payload_id_cache: Mutex<LruCache<PayloadIdCacheKey, PayloadId>>,
state: RwLock<State>,
latest_forkchoice_state: RwLock<Option<ForkChoiceState>>,
executor: TaskExecutor,
log: Logger,
}
impl Engine {
/// Creates a new, offline engine.
pub fn new(id: String, api: HttpJsonRpc<T>) -> Self {
pub fn new(api: HttpJsonRpc, executor: TaskExecutor, log: &Logger) -> Self {
Self {
id,
api,
payload_id_cache: Mutex::new(LruCache::new(PAYLOAD_ID_LRU_CACHE_SIZE)),
state: RwLock::new(EngineState::Offline),
state: Default::default(),
latest_forkchoice_state: Default::default(),
executor,
log: log.clone(),
}
}
/// Gives access to a channel containing the last engine state.
///
/// This can be called several times.
pub async fn watch_state(&self) -> WatchStream<EngineState> {
self.state.read().await.watch()
}
pub async fn get_payload_id(
&self,
head_block_hash: ExecutionBlockHash,
@@ -95,11 +158,8 @@ impl<T> Engine<T> {
})
.cloned()
}
}
#[async_trait]
impl Builder for Engine<EngineApi> {
async fn notify_forkchoice_updated(
pub async fn notify_forkchoice_updated(
&self,
forkchoice_state: ForkChoiceState,
payload_attributes: Option<PayloadAttributes>,
@@ -126,57 +186,7 @@ impl Builder for Engine<EngineApi> {
Ok(response)
}
}
#[async_trait]
impl Builder for Engine<BuilderApi> {
async fn notify_forkchoice_updated(
&self,
forkchoice_state: ForkChoiceState,
pa: Option<PayloadAttributes>,
log: &Logger,
) -> Result<ForkchoiceUpdatedResponse, EngineApiError> {
let payload_attributes = pa.ok_or(EngineApiError::InvalidBuilderQuery)?;
let response = self
.api
.forkchoice_updated_v1(forkchoice_state, Some(payload_attributes))
.await?;
if let Some(payload_id) = response.payload_id {
let key = PayloadIdCacheKey::new(&forkchoice_state, &payload_attributes);
self.payload_id_cache.lock().await.put(key, payload_id);
} else {
warn!(
log,
"Builder should have returned a payload_id for attributes {:?}", payload_attributes
);
}
Ok(response)
}
}
/// Holds multiple execution engines and provides functionality for managing them in a fallback
/// manner.
pub struct Engines {
pub engines: Vec<Engine<EngineApi>>,
pub latest_forkchoice_state: RwLock<Option<ForkChoiceState>>,
pub log: Logger,
}
pub struct Builders {
pub builders: Vec<Engine<BuilderApi>>,
pub log: Logger,
}
#[derive(Debug)]
pub enum EngineError {
Offline { id: String },
Api { id: String, error: EngineApiError },
Auth { id: String },
}
impl Engines {
async fn get_latest_forkchoice_state(&self) -> Option<ForkChoiceState> {
*self.latest_forkchoice_state.read().await
}
@@ -185,7 +195,7 @@ impl Engines {
*self.latest_forkchoice_state.write().await = Some(state);
}
async fn send_latest_forkchoice_state(&self, engine: &Engine<EngineApi>) {
async fn send_latest_forkchoice_state(&self) {
let latest_forkchoice_state = self.get_latest_forkchoice_state().await;
if let Some(forkchoice_state) = latest_forkchoice_state {
@@ -194,7 +204,6 @@ impl Engines {
self.log,
"No need to call forkchoiceUpdated";
"msg" => "head does not have execution enabled",
"id" => &engine.id,
);
return;
}
@@ -203,323 +212,138 @@ impl Engines {
self.log,
"Issuing forkchoiceUpdated";
"forkchoice_state" => ?forkchoice_state,
"id" => &engine.id,
);
// For simplicity, payload attributes are never included in this call. It may be
// reasonable to include them in the future.
if let Err(e) = engine
.api
.forkchoice_updated_v1(forkchoice_state, None)
.await
{
if let Err(e) = self.api.forkchoice_updated_v1(forkchoice_state, None).await {
debug!(
self.log,
"Failed to issue latest head to engine";
"error" => ?e,
"id" => &engine.id,
);
}
} else {
debug!(
self.log,
"No head, not sending to engine";
"id" => &engine.id,
);
}
}
/// Returns `true` if there is at least one engine with a "synced" status.
pub async fn any_synced(&self) -> bool {
for engine in &self.engines {
if *engine.state.read().await == EngineState::Synced {
return true;
}
}
false
/// Returns `true` if the engine has a "synced" status.
pub async fn is_synced(&self) -> bool {
**self.state.read().await == EngineStateInternal::Synced
}
/// Run the `EngineApi::upcheck` function on all nodes which are currently offline.
///
/// This can be used to try and recover any offline nodes.
pub async fn upcheck_not_synced(&self, logging: Logging) {
let upcheck_futures = self.engines.iter().map(|engine| async move {
let mut state_lock = engine.state.write().await;
if *state_lock != EngineState::Synced {
match engine.api.upcheck().await {
Ok(()) => {
if logging.is_enabled() {
info!(
self.log,
"Execution engine online";
"id" => &engine.id
);
}
/// Run the `EngineApi::upcheck` function if the node's last known state is not synced. This
/// might be used to recover the node if offline.
pub async fn upcheck(&self) {
let state: EngineStateInternal = match self.api.upcheck().await {
Ok(()) => {
let mut state = self.state.write().await;
if **state != EngineStateInternal::Synced {
info!(
self.log,
"Execution engine online";
);
// Send the node our latest forkchoice_state.
self.send_latest_forkchoice_state(engine).await;
*state_lock = EngineState::Synced
}
Err(EngineApiError::IsSyncing) => {
if logging.is_enabled() {
warn!(
self.log,
"Execution engine syncing";
"id" => &engine.id
)
}
// Send the node our latest forkchoice_state, it may assist with syncing.
self.send_latest_forkchoice_state(engine).await;
*state_lock = EngineState::Syncing
}
Err(EngineApiError::Auth(err)) => {
if logging.is_enabled() {
warn!(
self.log,
"Failed jwt authorization";
"error" => ?err,
"id" => &engine.id
);
}
*state_lock = EngineState::AuthFailed
}
Err(e) => {
if logging.is_enabled() {
warn!(
self.log,
"Execution engine offline";
"error" => ?e,
"id" => &engine.id
)
}
}
}
}
*state_lock
});
let num_synced = join_all(upcheck_futures)
.await
.into_iter()
.filter(|state: &EngineState| *state == EngineState::Synced)
.count();
if num_synced == 0 && logging.is_enabled() {
crit!(
self.log,
"No synced execution engines";
)
}
}
/// Run `func` on all engines, in the order in which they are defined, returning the first
/// successful result that is found.
///
/// This function might try to run `func` twice. If all nodes return an error on the first time
/// it runs, it will try to upcheck all offline nodes and then run the function again.
pub async fn first_success<'a, F, G, H>(&'a self, func: F) -> Result<H, Vec<EngineError>>
where
F: Fn(&'a Engine<EngineApi>) -> G + Copy,
G: Future<Output = Result<H, EngineApiError>>,
{
match self.first_success_without_retry(func).await {
Ok(result) => Ok(result),
Err(mut first_errors) => {
// Try to recover some nodes.
self.upcheck_not_synced(Logging::Enabled).await;
// Retry the call on all nodes.
match self.first_success_without_retry(func).await {
Ok(result) => Ok(result),
Err(second_errors) => {
first_errors.extend(second_errors);
Err(first_errors)
}
}
}
}
}
/// Run `func` on all engines, in the order in which they are defined, returning the first
/// successful result that is found.
pub async fn first_success_without_retry<'a, F, G, H>(
&'a self,
func: F,
) -> Result<H, Vec<EngineError>>
where
F: Fn(&'a Engine<EngineApi>) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let mut errors = vec![];
for engine in &self.engines {
let (engine_synced, engine_auth_failed) = {
let state = engine.state.read().await;
(
*state == EngineState::Synced,
*state == EngineState::AuthFailed,
)
};
if engine_synced {
match func(engine).await {
Ok(result) => return Ok(result),
Err(error) => {
debug!(
self.log,
"Execution engine call failed";
"error" => ?error,
"id" => &engine.id
);
*engine.state.write().await = EngineState::Offline;
errors.push(EngineError::Api {
id: engine.id.clone(),
error,
})
}
}
} else if engine_auth_failed {
errors.push(EngineError::Auth {
id: engine.id.clone(),
})
} else {
errors.push(EngineError::Offline {
id: engine.id.clone(),
})
}
}
Err(errors)
}
/// Runs `func` on all nodes concurrently, returning all results. Any nodes that are offline
/// will be ignored, however all synced or unsynced nodes will receive the broadcast.
///
/// This function might try to run `func` twice. If all nodes return an error on the first time
/// it runs, it will try to upcheck all offline nodes and then run the function again.
pub async fn broadcast<'a, F, G, H>(&'a self, func: F) -> Vec<Result<H, EngineError>>
where
F: Fn(&'a Engine<EngineApi>) -> G + Copy,
G: Future<Output = Result<H, EngineApiError>>,
{
let first_results = self.broadcast_without_retry(func).await;
let mut any_offline = false;
for result in &first_results {
match result {
Ok(_) => return first_results,
Err(EngineError::Offline { .. }) => any_offline = true,
_ => (),
}
}
if any_offline {
self.upcheck_not_synced(Logging::Enabled).await;
self.broadcast_without_retry(func).await
} else {
first_results
}
}
/// Runs `func` on all nodes concurrently, returning all results.
pub async fn broadcast_without_retry<'a, F, G, H>(
&'a self,
func: F,
) -> Vec<Result<H, EngineError>>
where
F: Fn(&'a Engine<EngineApi>) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let func = &func;
let futures = self.engines.iter().map(|engine| async move {
let is_offline = *engine.state.read().await == EngineState::Offline;
if !is_offline {
match func(engine).await {
Ok(res) => Ok(res),
Err(error) => {
debug!(
self.log,
"Execution engine call failed";
"error" => ?error,
"id" => &engine.id
);
*engine.state.write().await = EngineState::Offline;
Err(EngineError::Api {
id: engine.id.clone(),
error,
})
}
}
} else {
Err(EngineError::Offline {
id: engine.id.clone(),
})
}
});
join_all(futures).await
}
}
impl Builders {
pub async fn first_success_without_retry<'a, F, G, H>(
&'a self,
func: F,
) -> Result<H, Vec<EngineError>>
where
F: Fn(&'a Engine<BuilderApi>) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let mut errors = vec![];
for builder in &self.builders {
match func(builder).await {
Ok(result) => return Ok(result),
Err(error) => {
// Send the node our latest forkchoice_state.
self.send_latest_forkchoice_state().await;
} else {
debug!(
self.log,
"Builder call failed";
"error" => ?error,
"id" => &builder.id
"Execution engine online";
);
errors.push(EngineError::Api {
id: builder.id.clone(),
error,
})
}
state.update(EngineStateInternal::Synced);
**state
}
}
Err(EngineApiError::IsSyncing) => {
let mut state = self.state.write().await;
state.update(EngineStateInternal::Syncing);
**state
}
Err(EngineApiError::Auth(err)) => {
error!(
self.log,
"Failed jwt authorization";
"error" => ?err,
);
Err(errors)
let mut state = self.state.write().await;
state.update(EngineStateInternal::AuthFailed);
**state
}
Err(e) => {
error!(
self.log,
"Error during execution engine upcheck";
"error" => ?e,
);
let mut state = self.state.write().await;
state.update(EngineStateInternal::Offline);
**state
}
};
debug!(
self.log,
"Execution engine upcheck complete";
"state" => ?state,
);
}
pub async fn broadcast_without_retry<'a, F, G, H>(
&'a self,
func: F,
) -> Vec<Result<H, EngineError>>
/// Run `func` on the node regardless of the node's current state.
///
/// ## Note
///
/// This function takes locks on `self.state`, holding a conflicting lock might cause a
/// deadlock.
pub async fn request<'a, F, G, H>(self: &'a Arc<Self>, func: F) -> Result<H, EngineError>
where
F: Fn(&'a Engine<BuilderApi>) -> G,
F: Fn(&'a Engine) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let func = &func;
let futures = self.builders.iter().map(|engine| async move {
func(engine).await.map_err(|error| {
debug!(
self.log,
"Builder call failed";
"error" => ?error,
"id" => &engine.id
);
EngineError::Api {
id: engine.id.clone(),
error,
}
})
});
match func(self).await {
Ok(result) => {
// Take a clone *without* holding the read-lock since the `upcheck` function will
// take a write-lock.
let state: EngineStateInternal = **self.state.read().await;
join_all(futures).await
// Keep an up to date engine state.
if state != EngineStateInternal::Synced {
// Spawn the upcheck in another task to avoid slowing down this request.
let inner_self = self.clone();
self.executor.spawn(
async move { inner_self.upcheck().await },
"upcheck_after_success",
);
}
Ok(result)
}
Err(error) => {
error!(
self.log,
"Execution engine call failed";
"error" => ?error,
);
// The node just returned an error, run an upcheck so we can update the endpoint
// state.
//
// Spawn the upcheck in another task to avoid slowing down this request.
let inner_self = self.clone();
self.executor.spawn(
async move { inner_self.upcheck().await },
"upcheck_after_error",
);
Err(EngineError::Api { error })
}
}
}
}
@@ -533,3 +357,22 @@ impl PayloadIdCacheKey {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio_stream::StreamExt;
#[tokio::test]
async fn test_state_notifier() {
let mut state = State::default();
let initial_state: EngineState = state.state.into();
assert_eq!(initial_state, EngineState::Offline);
state.update(EngineStateInternal::Synced);
// a watcher that arrives after the first update.
let mut watcher = state.watch();
let new_state = watcher.next().await.expect("Last state is always present");
assert_eq!(new_state, EngineState::Online);
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -3,6 +3,7 @@ pub use lighthouse_metrics::*;
pub const HIT: &str = "hit";
pub const MISS: &str = "miss";
pub const GET_PAYLOAD: &str = "get_payload";
pub const GET_BLINDED_PAYLOAD: &str = "get_blinded_payload";
pub const NEW_PAYLOAD: &str = "new_payload";
pub const FORKCHOICE_UPDATED: &str = "forkchoice_updated";
pub const GET_TERMINAL_POW_BLOCK_HASH: &str = "get_terminal_pow_block_hash";
@@ -35,4 +36,9 @@ lazy_static::lazy_static! {
"execution_layer_get_payload_by_block_hash_time",
"Time to reconstruct a payload from the EE using eth_getBlockByHash"
);
pub static ref EXECUTION_LAYER_PAYLOAD_STATUS: Result<IntCounterVec> = try_create_int_counter_vec(
"execution_layer_payload_status",
"Indicates the payload status returned for a particular method",
&["method", "status"]
);
}

View File

@@ -0,0 +1,33 @@
use lru::LruCache;
use parking_lot::Mutex;
use tree_hash::TreeHash;
use types::{EthSpec, ExecutionPayload, Hash256};
pub const DEFAULT_PAYLOAD_CACHE_SIZE: usize = 10;
/// A cache mapping execution payloads by tree hash roots.
pub struct PayloadCache<T: EthSpec> {
payloads: Mutex<LruCache<PayloadCacheId, ExecutionPayload<T>>>,
}
#[derive(Hash, PartialEq, Eq)]
struct PayloadCacheId(Hash256);
impl<T: EthSpec> Default for PayloadCache<T> {
fn default() -> Self {
PayloadCache {
payloads: Mutex::new(LruCache::new(DEFAULT_PAYLOAD_CACHE_SIZE)),
}
}
}
impl<T: EthSpec> PayloadCache<T> {
pub fn put(&self, payload: ExecutionPayload<T>) -> Option<ExecutionPayload<T>> {
let root = payload.tree_hash_root();
self.payloads.lock().put(PayloadCacheId(root), payload)
}
pub fn pop(&self, root: &Hash256) -> Option<ExecutionPayload<T>> {
self.payloads.lock().pop(&PayloadCacheId(*root))
}
}

View File

@@ -1,7 +1,6 @@
use crate::engine_api::{Error as ApiError, PayloadStatusV1, PayloadStatusV1Status};
use crate::engines::EngineError;
use crate::Error;
use slog::{crit, warn, Logger};
use slog::{warn, Logger};
use types::ExecutionBlockHash;
/// Provides a simpler, easier to parse version of `PayloadStatusV1` for upstream users.
@@ -19,173 +18,103 @@ pub enum PayloadStatus {
InvalidBlockHash {
validation_error: Option<String>,
},
InvalidTerminalBlock {
validation_error: Option<String>,
},
}
/// Processes the responses from multiple execution engines, finding the "best" status and returning
/// it (if any).
///
/// This function has the following basic goals:
///
/// - Detect a consensus failure between nodes.
/// - Find the most-synced node by preferring a definite response (valid/invalid) over a
/// syncing/accepted response or error.
///
/// # Details
///
/// - If there are conflicting valid/invalid responses, always return an error.
/// - If there are syncing/accepted responses but valid/invalid responses exist, return the
/// valid/invalid responses since they're definite.
/// - If there are multiple valid responses, return the first one processed.
/// - If there are multiple invalid responses, return the first one processed.
/// - Syncing/accepted responses are grouped, if there are multiple of them, return the first one
/// processed.
/// - If there are no responses (only errors or nothing), return an error.
pub fn process_multiple_payload_statuses(
/// Processes the response from the execution engine.
pub fn process_payload_status(
head_block_hash: ExecutionBlockHash,
statuses: impl Iterator<Item = Result<PayloadStatusV1, EngineError>>,
status: Result<PayloadStatusV1, EngineError>,
log: &Logger,
) -> Result<PayloadStatus, Error> {
let mut errors = vec![];
let mut valid_statuses = vec![];
let mut invalid_statuses = vec![];
let mut other_statuses = vec![];
for status in statuses {
match status {
Err(e) => errors.push(e),
Ok(response) => match &response.status {
PayloadStatusV1Status::Valid => {
if response
.latest_valid_hash
.map_or(false, |h| h == head_block_hash)
{
// The response is only valid if `latest_valid_hash` is not `null` and
// equal to the provided `block_hash`.
valid_statuses.push(PayloadStatus::Valid)
} else {
errors.push(EngineError::Api {
id: "unknown".to_string(),
error: ApiError::BadResponse(
format!(
"new_payload: response.status = VALID but invalid latest_valid_hash. Expected({:?}) Found({:?})",
head_block_hash,
response.latest_valid_hash,
)
),
});
}
}
PayloadStatusV1Status::Invalid => {
if let Some(latest_valid_hash) = response.latest_valid_hash {
// The response is only valid if `latest_valid_hash` is not `null`.
invalid_statuses.push(PayloadStatus::Invalid {
latest_valid_hash,
validation_error: response.validation_error.clone(),
})
} else {
errors.push(EngineError::Api {
id: "unknown".to_string(),
error: ApiError::BadResponse(
"new_payload: response.status = INVALID but null latest_valid_hash"
.to_string(),
),
});
}
}
PayloadStatusV1Status::InvalidBlockHash => {
// In the interests of being liberal with what we accept, only raise a
// warning here.
if response.latest_valid_hash.is_some() {
warn!(
log,
"Malformed response from execution engine";
"msg" => "expected a null latest_valid_hash",
"status" => ?response.status
)
}
invalid_statuses.push(PayloadStatus::InvalidBlockHash {
validation_error: response.validation_error.clone(),
});
}
PayloadStatusV1Status::InvalidTerminalBlock => {
// In the interests of being liberal with what we accept, only raise a
// warning here.
if response.latest_valid_hash.is_some() {
warn!(
log,
"Malformed response from execution engine";
"msg" => "expected a null latest_valid_hash",
"status" => ?response.status
)
}
invalid_statuses.push(PayloadStatus::InvalidTerminalBlock {
validation_error: response.validation_error.clone(),
});
}
PayloadStatusV1Status::Syncing => {
// In the interests of being liberal with what we accept, only raise a
// warning here.
if response.latest_valid_hash.is_some() {
warn!(
log,
"Malformed response from execution engine";
"msg" => "expected a null latest_valid_hash",
"status" => ?response.status
)
}
other_statuses.push(PayloadStatus::Syncing)
}
PayloadStatusV1Status::Accepted => {
// In the interests of being liberal with what we accept, only raise a
// warning here.
if response.latest_valid_hash.is_some() {
warn!(
log,
"Malformed response from execution engine";
"msg" => "expected a null latest_valid_hash",
"status" => ?response.status
)
}
other_statuses.push(PayloadStatus::Accepted)
}
},
}
}
if !valid_statuses.is_empty() && !invalid_statuses.is_empty() {
crit!(
log,
"Consensus failure between execution nodes";
"invalid_statuses" => ?invalid_statuses,
"valid_statuses" => ?valid_statuses,
);
// Choose to exit and ignore the valid response. This preferences correctness over
// liveness.
return Err(Error::ConsensusFailure);
}
// Log any errors to assist with troubleshooting.
for error in &errors {
warn!(
) -> Result<PayloadStatus, EngineError> {
match status {
Err(error) => {
warn!(
log,
"Error whilst processing payload status";
"error" => ?error,
);
}
);
Err(error)
}
Ok(response) => match &response.status {
PayloadStatusV1Status::Valid => {
if response
.latest_valid_hash
.map_or(false, |h| h == head_block_hash)
{
// The response is only valid if `latest_valid_hash` is not `null` and
// equal to the provided `block_hash`.
Ok(PayloadStatus::Valid)
} else {
let error = format!(
"new_payload: response.status = VALID but invalid latest_valid_hash. Expected({:?}) Found({:?})",
head_block_hash,
response.latest_valid_hash
);
Err(EngineError::Api {
error: ApiError::BadResponse(error),
})
}
}
PayloadStatusV1Status::Invalid => {
if let Some(latest_valid_hash) = response.latest_valid_hash {
// The response is only valid if `latest_valid_hash` is not `null`.
Ok(PayloadStatus::Invalid {
latest_valid_hash,
validation_error: response.validation_error.clone(),
})
} else {
Err(EngineError::Api {
error: ApiError::BadResponse(
"new_payload: response.status = INVALID but null latest_valid_hash"
.to_string(),
),
})
}
}
PayloadStatusV1Status::InvalidBlockHash => {
// In the interests of being liberal with what we accept, only raise a
// warning here.
if response.latest_valid_hash.is_some() {
warn!(
log,
"Malformed response from execution engine";
"msg" => "expected a null latest_valid_hash",
"status" => ?response.status
)
}
valid_statuses
.first()
.or_else(|| invalid_statuses.first())
.or_else(|| other_statuses.first())
.cloned()
.map(Result::Ok)
.unwrap_or_else(|| Err(Error::EngineErrors(errors)))
Ok(PayloadStatus::InvalidBlockHash {
validation_error: response.validation_error.clone(),
})
}
PayloadStatusV1Status::Syncing => {
// In the interests of being liberal with what we accept, only raise a
// warning here.
if response.latest_valid_hash.is_some() {
warn!(
log,
"Malformed response from execution engine";
"msg" => "expected a null latest_valid_hash",
"status" => ?response.status
)
}
Ok(PayloadStatus::Syncing)
}
PayloadStatusV1Status::Accepted => {
// In the interests of being liberal with what we accept, only raise a
// warning here.
if response.latest_valid_hash.is_some() {
warn!(
log,
"Malformed response from execution engine";
"msg" => "expected a null latest_valid_hash",
"status" => ?response.status
)
}
Ok(PayloadStatus::Accepted)
}
},
}
}

View File

@@ -1,10 +1,13 @@
use crate::engine_api::{
json_structures::{
JsonForkchoiceUpdatedV1Response, JsonPayloadStatusV1, JsonPayloadStatusV1Status,
},
ExecutionBlock, PayloadAttributes, PayloadId, PayloadStatusV1, PayloadStatusV1Status,
};
use crate::engines::ForkChoiceState;
use crate::{
engine_api::{
json_structures::{
JsonForkchoiceUpdatedV1Response, JsonPayloadStatusV1, JsonPayloadStatusV1Status,
},
ExecutionBlock, PayloadAttributes, PayloadId, PayloadStatusV1, PayloadStatusV1Status,
},
ExecutionBlockWithTransactions,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tree_hash::TreeHash;
@@ -57,15 +60,39 @@ impl<T: EthSpec> Block<T> {
block_number: block.block_number,
parent_hash: block.parent_hash,
total_difficulty: block.total_difficulty,
timestamp: block.timestamp,
},
Block::PoS(payload) => ExecutionBlock {
block_hash: payload.block_hash,
block_number: payload.block_number,
parent_hash: payload.parent_hash,
total_difficulty,
timestamp: payload.timestamp,
},
}
}
pub fn as_execution_block_with_tx(&self) -> Option<ExecutionBlockWithTransactions<T>> {
match self {
Block::PoS(payload) => Some(ExecutionBlockWithTransactions {
parent_hash: payload.parent_hash,
fee_recipient: payload.fee_recipient,
state_root: payload.state_root,
receipts_root: payload.receipts_root,
logs_bloom: payload.logs_bloom.clone(),
prev_randao: payload.prev_randao,
block_number: payload.block_number,
gas_limit: payload.gas_limit,
gas_used: payload.gas_used,
timestamp: payload.timestamp,
extra_data: payload.extra_data.clone(),
base_fee_per_gas: payload.base_fee_per_gas,
block_hash: payload.block_hash,
transactions: vec![],
}),
Block::PoW(_) => None,
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, TreeHash)]
@@ -75,8 +102,10 @@ pub struct PoWBlock {
pub block_hash: ExecutionBlockHash,
pub parent_hash: ExecutionBlockHash,
pub total_difficulty: Uint256,
pub timestamp: u64,
}
#[derive(Clone)]
pub struct ExecutionBlockGenerator<T: EthSpec> {
/*
* Common database
@@ -153,6 +182,14 @@ impl<T: EthSpec> ExecutionBlockGenerator<T> {
.map(|block| block.as_execution_block(self.terminal_total_difficulty))
}
pub fn execution_block_with_txs_by_hash(
&self,
hash: ExecutionBlockHash,
) -> Option<ExecutionBlockWithTransactions<T>> {
self.block_by_hash(hash)
.and_then(|block| block.as_execution_block_with_tx())
}
pub fn move_to_block_prior_to_terminal_block(&mut self) -> Result<(), String> {
let target_block = self
.terminal_block_number
@@ -233,6 +270,26 @@ impl<T: EthSpec> ExecutionBlockGenerator<T> {
Ok(())
}
pub fn modify_last_block(&mut self, block_modifier: impl FnOnce(&mut Block<T>)) {
if let Some((last_block_hash, block_number)) =
self.block_hashes.keys().max().and_then(|block_number| {
self.block_hashes
.get(block_number)
.map(|block| (block, *block_number))
})
{
let mut block = self.blocks.remove(last_block_hash).unwrap();
block_modifier(&mut block);
// Update the block hash after modifying the block
match &mut block {
Block::PoW(b) => b.block_hash = ExecutionBlockHash::from_root(b.tree_hash_root()),
Block::PoS(b) => b.block_hash = ExecutionBlockHash::from_root(b.tree_hash_root()),
}
self.block_hashes.insert(block_number, block.block_hash());
self.blocks.insert(block.block_hash(), block);
}
}
pub fn get_payload(&mut self, id: &PayloadId) -> Option<ExecutionPayload<T>> {
self.payload_ids.get(id).cloned()
}
@@ -279,7 +336,9 @@ impl<T: EthSpec> ExecutionBlockGenerator<T> {
}
let unknown_head_block_hash = !self.blocks.contains_key(&forkchoice_state.head_block_hash);
let unknown_safe_block_hash = !self.blocks.contains_key(&forkchoice_state.safe_block_hash);
let unknown_safe_block_hash = forkchoice_state.safe_block_hash
!= ExecutionBlockHash::zero()
&& !self.blocks.contains_key(&forkchoice_state.safe_block_hash);
let unknown_finalized_block_hash = forkchoice_state.finalized_block_hash
!= ExecutionBlockHash::zero()
&& !self
@@ -390,6 +449,7 @@ pub fn generate_pow_block(
block_hash: ExecutionBlockHash::zero(),
parent_hash,
total_difficulty,
timestamp: block_number,
};
block.block_hash = ExecutionBlockHash::from_root(block.tree_hash_root());

View File

@@ -49,12 +49,30 @@ pub async fn handle_rpc<T: EthSpec>(
.map_err(|e| format!("unable to parse hash: {:?}", e))
})?;
Ok(serde_json::to_value(
ctx.execution_block_generator
.read()
.execution_block_by_hash(hash),
)
.unwrap())
// If we have a static response set, just return that.
if let Some(response) = *ctx.static_get_block_by_hash_response.lock() {
return Ok(serde_json::to_value(response).unwrap());
}
let full_tx = params
.get(1)
.and_then(JsonValue::as_bool)
.ok_or_else(|| "missing/invalid params[1] value".to_string())?;
if full_tx {
Ok(serde_json::to_value(
ctx.execution_block_generator
.read()
.execution_block_with_txs_by_hash(hash),
)
.unwrap())
} else {
Ok(serde_json::to_value(
ctx.execution_block_generator
.read()
.execution_block_by_hash(hash),
)
.unwrap())
}
}
ENGINE_NEW_PAYLOAD_V1 => {
let request: JsonExecutionPayloadV1<T> = get_param(params, 0)?;
@@ -120,6 +138,15 @@ pub async fn handle_rpc<T: EthSpec>(
Ok(serde_json::to_value(response).unwrap())
}
ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1 => {
let block_generator = ctx.execution_block_generator.read();
let transition_config: TransitionConfigurationV1 = TransitionConfigurationV1 {
terminal_total_difficulty: block_generator.terminal_total_difficulty,
terminal_block_hash: block_generator.terminal_block_hash,
terminal_block_number: block_generator.terminal_block_number,
};
Ok(serde_json::to_value(transition_config).unwrap())
}
other => Err(format!(
"The method {} does not exist/is not available",
other

View File

@@ -0,0 +1,385 @@
use crate::test_utils::DEFAULT_JWT_SECRET;
use crate::{Config, ExecutionLayer, PayloadAttributes};
use async_trait::async_trait;
use eth2::types::{BlockId, StateId, ValidatorId};
use eth2::{BeaconNodeHttpClient, Timeouts};
use ethereum_consensus::crypto::{SecretKey, Signature};
use ethereum_consensus::primitives::BlsPublicKey;
pub use ethereum_consensus::state_transition::Context;
use fork_choice::ForkchoiceUpdateParameters;
use mev_build_rs::{
sign_builder_message, verify_signed_builder_message, BidRequest, BlindedBlockProviderError,
BlindedBlockProviderServer, BuilderBid, ExecutionPayload as ServerPayload,
ExecutionPayloadHeader as ServerPayloadHeader, SignedBlindedBeaconBlock, SignedBuilderBid,
SignedValidatorRegistration,
};
use parking_lot::RwLock;
use sensitive_url::SensitiveUrl;
use ssz::{Decode, Encode};
use ssz_rs::{Merkleized, SimpleSerialize};
use std::collections::HashMap;
use std::fmt::Debug;
use std::net::Ipv4Addr;
use std::sync::Arc;
use std::time::Duration;
use task_executor::TaskExecutor;
use tempfile::NamedTempFile;
use tree_hash::TreeHash;
use types::{
Address, BeaconState, BlindedPayload, ChainSpec, EthSpec, ExecPayload, Hash256, Slot, Uint256,
};
#[derive(Clone)]
pub enum Operation {
FeeRecipient(Address),
GasLimit(usize),
Value(Uint256),
ParentHash(Hash256),
PrevRandao(Hash256),
BlockNumber(usize),
Timestamp(usize),
}
impl Operation {
fn apply(self, bid: &mut BuilderBid) -> Result<(), BlindedBlockProviderError> {
match self {
Operation::FeeRecipient(fee_recipient) => {
bid.header.fee_recipient = to_ssz_rs(&fee_recipient)?
}
Operation::GasLimit(gas_limit) => bid.header.gas_limit = gas_limit as u64,
Operation::Value(value) => bid.value = to_ssz_rs(&value)?,
Operation::ParentHash(parent_hash) => bid.header.parent_hash = to_ssz_rs(&parent_hash)?,
Operation::PrevRandao(prev_randao) => bid.header.prev_randao = to_ssz_rs(&prev_randao)?,
Operation::BlockNumber(block_number) => bid.header.block_number = block_number as u64,
Operation::Timestamp(timestamp) => bid.header.timestamp = timestamp as u64,
}
Ok(())
}
}
pub struct TestingBuilder<E: EthSpec> {
server: BlindedBlockProviderServer<MockBuilder<E>>,
pub builder: MockBuilder<E>,
}
impl<E: EthSpec> TestingBuilder<E> {
pub fn new(
mock_el_url: SensitiveUrl,
builder_url: SensitiveUrl,
beacon_url: SensitiveUrl,
spec: ChainSpec,
executor: TaskExecutor,
) -> Self {
let file = NamedTempFile::new().unwrap();
let path = file.path().into();
std::fs::write(&path, hex::encode(DEFAULT_JWT_SECRET)).unwrap();
// This EL should not talk to a builder
let config = Config {
execution_endpoints: vec![mock_el_url],
secret_files: vec![path],
suggested_fee_recipient: None,
..Default::default()
};
let el =
ExecutionLayer::from_config(config, executor.clone(), executor.log().clone()).unwrap();
// This should probably be done for all fields, we only update ones we are testing with so far.
let mut context = Context::for_mainnet();
context.terminal_total_difficulty = to_ssz_rs(&spec.terminal_total_difficulty).unwrap();
context.terminal_block_hash = to_ssz_rs(&spec.terminal_block_hash).unwrap();
context.terminal_block_hash_activation_epoch =
to_ssz_rs(&spec.terminal_block_hash_activation_epoch).unwrap();
let builder = MockBuilder::new(
el,
BeaconNodeHttpClient::new(beacon_url, Timeouts::set_all(Duration::from_secs(1))),
spec,
context,
);
let port = builder_url.full.port().unwrap();
let host: Ipv4Addr = builder_url
.full
.host_str()
.unwrap()
.to_string()
.parse()
.unwrap();
let server = BlindedBlockProviderServer::new(host, port, builder.clone());
Self { server, builder }
}
pub async fn run(&self) {
self.server.run().await
}
}
#[derive(Clone)]
pub struct MockBuilder<E: EthSpec> {
el: ExecutionLayer<E>,
beacon_client: BeaconNodeHttpClient,
spec: ChainSpec,
context: Arc<Context>,
val_registration_cache: Arc<RwLock<HashMap<BlsPublicKey, SignedValidatorRegistration>>>,
builder_sk: SecretKey,
operations: Arc<RwLock<Vec<Operation>>>,
invalidate_signatures: Arc<RwLock<bool>>,
}
impl<E: EthSpec> MockBuilder<E> {
pub fn new(
el: ExecutionLayer<E>,
beacon_client: BeaconNodeHttpClient,
spec: ChainSpec,
context: Context,
) -> Self {
let sk = SecretKey::random(&mut rand::thread_rng()).unwrap();
Self {
el,
beacon_client,
// Should keep spec and context consistent somehow
spec,
context: Arc::new(context),
val_registration_cache: Arc::new(RwLock::new(HashMap::new())),
builder_sk: sk,
operations: Arc::new(RwLock::new(vec![])),
invalidate_signatures: Arc::new(RwLock::new(false)),
}
}
pub fn add_operation(&self, op: Operation) {
// Insert operations at the front of the vec to make sure `apply_operations` applies them
// in the order they are added.
self.operations.write().insert(0, op);
}
pub fn invalid_signatures(&self) {
*self.invalidate_signatures.write() = true;
}
pub fn valid_signatures(&mut self) {
*self.invalidate_signatures.write() = false;
}
fn apply_operations(&self, bid: &mut BuilderBid) -> Result<(), BlindedBlockProviderError> {
let mut guard = self.operations.write();
while let Some(op) = guard.pop() {
op.apply(bid)?;
}
Ok(())
}
}
#[async_trait]
impl<E: EthSpec> mev_build_rs::BlindedBlockProvider for MockBuilder<E> {
async fn register_validators(
&self,
registrations: &mut [SignedValidatorRegistration],
) -> Result<(), BlindedBlockProviderError> {
for registration in registrations {
let pubkey = registration.message.public_key.clone();
let message = &mut registration.message;
verify_signed_builder_message(
message,
&registration.signature,
&pubkey,
&self.context,
)?;
self.val_registration_cache.write().insert(
registration.message.public_key.clone(),
registration.clone(),
);
}
Ok(())
}
async fn fetch_best_bid(
&self,
bid_request: &BidRequest,
) -> Result<SignedBuilderBid, BlindedBlockProviderError> {
let slot = Slot::new(bid_request.slot);
let signed_cached_data = self
.val_registration_cache
.read()
.get(&bid_request.public_key)
.ok_or_else(|| convert_err("missing registration"))?
.clone();
let cached_data = signed_cached_data.message;
let head = self
.beacon_client
.get_beacon_blocks::<E>(BlockId::Head)
.await
.map_err(convert_err)?
.ok_or_else(|| convert_err("missing head block"))?;
let block = head.data.message_merge().map_err(convert_err)?;
let head_block_root = block.tree_hash_root();
let head_execution_hash = block.body.execution_payload.execution_payload.block_hash;
if head_execution_hash != from_ssz_rs(&bid_request.parent_hash)? {
return Err(BlindedBlockProviderError::Custom(format!(
"head mismatch: {} {}",
head_execution_hash, bid_request.parent_hash
)));
}
let finalized_execution_hash = self
.beacon_client
.get_beacon_blocks::<E>(BlockId::Finalized)
.await
.map_err(convert_err)?
.ok_or_else(|| convert_err("missing finalized block"))?
.data
.message_merge()
.map_err(convert_err)?
.body
.execution_payload
.execution_payload
.block_hash;
let justified_execution_hash = self
.beacon_client
.get_beacon_blocks::<E>(BlockId::Justified)
.await
.map_err(convert_err)?
.ok_or_else(|| convert_err("missing finalized block"))?
.data
.message_merge()
.map_err(convert_err)?
.body
.execution_payload
.execution_payload
.block_hash;
let val_index = self
.beacon_client
.get_beacon_states_validator_id(
StateId::Head,
&ValidatorId::PublicKey(from_ssz_rs(&cached_data.public_key)?),
)
.await
.map_err(convert_err)?
.ok_or_else(|| convert_err("missing validator from state"))?
.data
.index;
let fee_recipient = from_ssz_rs(&cached_data.fee_recipient)?;
let slots_since_genesis = slot.as_u64() - self.spec.genesis_slot.as_u64();
let genesis_time = self
.beacon_client
.get_beacon_genesis()
.await
.map_err(convert_err)?
.data
.genesis_time;
let timestamp = (slots_since_genesis * self.spec.seconds_per_slot) + genesis_time;
let head_state: BeaconState<E> = self
.beacon_client
.get_debug_beacon_states(StateId::Head)
.await
.map_err(convert_err)?
.ok_or_else(|| BlindedBlockProviderError::Custom("missing head state".to_string()))?
.data;
let prev_randao = head_state
.get_randao_mix(head_state.current_epoch())
.map_err(convert_err)?;
let payload_attributes = PayloadAttributes {
timestamp,
prev_randao: *prev_randao,
suggested_fee_recipient: fee_recipient,
};
self.el
.insert_proposer(slot, head_block_root, val_index, payload_attributes)
.await;
let forkchoice_update_params = ForkchoiceUpdateParameters {
head_root: Hash256::zero(),
head_hash: None,
justified_hash: Some(justified_execution_hash),
finalized_hash: Some(finalized_execution_hash),
};
let payload = self
.el
.get_full_payload_caching::<BlindedPayload<E>>(
head_execution_hash,
timestamp,
*prev_randao,
fee_recipient,
forkchoice_update_params,
)
.await
.map_err(convert_err)?
.to_execution_payload_header();
let json_payload = serde_json::to_string(&payload).map_err(convert_err)?;
let mut header: ServerPayloadHeader =
serde_json::from_str(json_payload.as_str()).map_err(convert_err)?;
header.gas_limit = cached_data.gas_limit;
let mut message = BuilderBid {
header,
value: ssz_rs::U256::default(),
public_key: self.builder_sk.public_key(),
};
self.apply_operations(&mut message)?;
let mut signature =
sign_builder_message(&mut message, &self.builder_sk, self.context.as_ref())?;
if *self.invalidate_signatures.read() {
signature = Signature::default();
}
let signed_bid = SignedBuilderBid { message, signature };
Ok(signed_bid)
}
async fn open_bid(
&self,
signed_block: &mut SignedBlindedBeaconBlock,
) -> Result<ServerPayload, BlindedBlockProviderError> {
let payload = self
.el
.get_payload_by_root(&from_ssz_rs(
&signed_block
.message
.body
.execution_payload_header
.hash_tree_root()
.map_err(convert_err)?,
)?)
.ok_or_else(|| convert_err("missing payload for tx root"))?;
let json_payload = serde_json::to_string(&payload).map_err(convert_err)?;
serde_json::from_str(json_payload.as_str()).map_err(convert_err)
}
}
pub fn from_ssz_rs<T: SimpleSerialize, U: Decode>(
ssz_rs_data: &T,
) -> Result<U, BlindedBlockProviderError> {
U::from_ssz_bytes(
ssz_rs::serialize(ssz_rs_data)
.map_err(convert_err)?
.as_ref(),
)
.map_err(convert_err)
}
pub fn to_ssz_rs<T: Encode, U: SimpleSerialize>(
ssz_data: &T,
) -> Result<U, BlindedBlockProviderError> {
ssz_rs::deserialize::<U>(&ssz_data.as_ssz_bytes()).map_err(convert_err)
}
fn convert_err<E: Debug>(e: E) -> BlindedBlockProviderError {
BlindedBlockProviderError::Custom(format!("{e:?}"))
}

View File

@@ -1,15 +1,19 @@
use crate::{
test_utils::{MockServer, DEFAULT_TERMINAL_BLOCK, DEFAULT_TERMINAL_DIFFICULTY, JWT_SECRET},
test_utils::{
MockServer, DEFAULT_BUILDER_THRESHOLD_WEI, DEFAULT_JWT_SECRET, DEFAULT_TERMINAL_BLOCK,
DEFAULT_TERMINAL_DIFFICULTY,
},
Config, *,
};
use sensitive_url::SensitiveUrl;
use task_executor::TaskExecutor;
use tempfile::NamedTempFile;
use tree_hash::TreeHash;
use types::{Address, ChainSpec, Epoch, EthSpec, FullPayload, Hash256, Uint256};
pub struct MockExecutionLayer<T: EthSpec> {
pub server: MockServer<T>,
pub el: ExecutionLayer,
pub el: ExecutionLayer<T>,
pub executor: TaskExecutor,
pub spec: ChainSpec,
}
@@ -22,6 +26,8 @@ impl<T: EthSpec> MockExecutionLayer<T> {
DEFAULT_TERMINAL_BLOCK,
ExecutionBlockHash::zero(),
Epoch::new(0),
Some(JwtKey::from_slice(&DEFAULT_JWT_SECRET).unwrap()),
None,
)
}
@@ -31,6 +37,8 @@ impl<T: EthSpec> MockExecutionLayer<T> {
terminal_block: u64,
terminal_block_hash: ExecutionBlockHash,
terminal_block_hash_activation_epoch: Epoch,
jwt_key: Option<JwtKey>,
builder_url: Option<SensitiveUrl>,
) -> Self {
let handle = executor.handle().unwrap();
@@ -39,8 +47,10 @@ impl<T: EthSpec> MockExecutionLayer<T> {
spec.terminal_block_hash = terminal_block_hash;
spec.terminal_block_hash_activation_epoch = terminal_block_hash_activation_epoch;
let jwt_key = jwt_key.unwrap_or_else(JwtKey::random);
let server = MockServer::new(
&handle,
jwt_key,
terminal_total_difficulty,
terminal_block,
terminal_block_hash,
@@ -50,12 +60,14 @@ impl<T: EthSpec> MockExecutionLayer<T> {
let file = NamedTempFile::new().unwrap();
let path = file.path().into();
std::fs::write(&path, hex::encode(JWT_SECRET)).unwrap();
std::fs::write(&path, hex::encode(DEFAULT_JWT_SECRET)).unwrap();
let config = Config {
execution_endpoints: vec![url],
builder_url,
secret_files: vec![path],
suggested_fee_recipient: Some(Address::repeat_byte(42)),
builder_profit_threshold: DEFAULT_BUILDER_THRESHOLD_WEI,
..Default::default()
};
let el =
@@ -79,11 +91,16 @@ impl<T: EthSpec> MockExecutionLayer<T> {
let block_number = latest_execution_block.block_number() + 1;
let timestamp = block_number;
let prev_randao = Hash256::from_low_u64_be(block_number);
let finalized_block_hash = parent_hash;
let head_block_root = Hash256::repeat_byte(42);
let forkchoice_update_params = ForkchoiceUpdateParameters {
head_root: head_block_root,
head_hash: Some(parent_hash),
justified_hash: None,
finalized_hash: None,
};
// Insert a proposer to ensure the fork choice updated command works.
let slot = Slot::new(0);
let head_block_root = Hash256::repeat_byte(42);
let validator_index = 0;
self.el
.insert_proposer(
@@ -102,6 +119,7 @@ impl<T: EthSpec> MockExecutionLayer<T> {
.notify_forkchoice_updated(
parent_hash,
ExecutionBlockHash::zero(),
ExecutionBlockHash::zero(),
slot,
head_block_root,
)
@@ -109,14 +127,21 @@ impl<T: EthSpec> MockExecutionLayer<T> {
.unwrap();
let validator_index = 0;
let builder_params = BuilderParams {
pubkey: PublicKeyBytes::empty(),
slot,
chain_health: ChainHealth::Healthy,
};
let payload = self
.el
.get_payload::<T, FullPayload<T>>(
.get_payload::<FullPayload<T>>(
parent_hash,
timestamp,
prev_randao,
finalized_block_hash,
validator_index,
forkchoice_update_params,
builder_params,
&self.spec,
)
.await
.unwrap()
@@ -127,6 +152,43 @@ impl<T: EthSpec> MockExecutionLayer<T> {
assert_eq!(payload.timestamp, timestamp);
assert_eq!(payload.prev_randao, prev_randao);
// Ensure the payload cache is empty.
assert!(self
.el
.get_payload_by_root(&payload.tree_hash_root())
.is_none());
let builder_params = BuilderParams {
pubkey: PublicKeyBytes::empty(),
slot,
chain_health: ChainHealth::Healthy,
};
let payload_header = self
.el
.get_payload::<BlindedPayload<T>>(
parent_hash,
timestamp,
prev_randao,
validator_index,
forkchoice_update_params,
builder_params,
&self.spec,
)
.await
.unwrap()
.execution_payload_header;
assert_eq!(payload_header.block_hash, block_hash);
assert_eq!(payload_header.parent_hash, parent_hash);
assert_eq!(payload_header.block_number, block_number);
assert_eq!(payload_header.timestamp, timestamp);
assert_eq!(payload_header.prev_randao, prev_randao);
// Ensure the payload cache has the correct payload.
assert_eq!(
self.el
.get_payload_by_root(&payload_header.tree_hash_root()),
Some(payload.clone())
);
let status = self.el.notify_new_payload(&payload).await.unwrap();
assert_eq!(status, PayloadStatus::Valid);
@@ -137,6 +199,7 @@ impl<T: EthSpec> MockExecutionLayer<T> {
.notify_forkchoice_updated(
block_hash,
ExecutionBlockHash::zero(),
ExecutionBlockHash::zero(),
slot,
head_block_root,
)
@@ -173,7 +236,7 @@ impl<T: EthSpec> MockExecutionLayer<T> {
pub async fn with_terminal_block<'a, U, V>(self, func: U) -> Self
where
U: Fn(ChainSpec, ExecutionLayer, Option<ExecutionBlock>) -> V,
U: Fn(ChainSpec, ExecutionLayer<T>, Option<ExecutionBlock>) -> V,
V: Future<Output = ()>,
{
let terminal_block_number = self

View File

@@ -2,11 +2,11 @@
use crate::engine_api::auth::JwtKey;
use crate::engine_api::{
auth::Auth, http::JSONRPC_VERSION, PayloadStatusV1, PayloadStatusV1Status,
auth::Auth, http::JSONRPC_VERSION, ExecutionBlock, PayloadStatusV1, PayloadStatusV1Status,
};
use bytes::Bytes;
use environment::null_logger;
use execution_block_generator::{Block, PoWBlock};
use execution_block_generator::PoWBlock;
use handle_rpc::handle_rpc;
use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
use serde::{Deserialize, Serialize};
@@ -21,17 +21,41 @@ use tokio::{runtime, sync::oneshot};
use types::{EthSpec, ExecutionBlockHash, Uint256};
use warp::{http::StatusCode, Filter, Rejection};
pub use execution_block_generator::{generate_pow_block, ExecutionBlockGenerator};
pub use execution_block_generator::{generate_pow_block, Block, ExecutionBlockGenerator};
pub use mock_builder::{Context as MockBuilderContext, MockBuilder, Operation, TestingBuilder};
pub use mock_execution_layer::MockExecutionLayer;
pub const DEFAULT_TERMINAL_DIFFICULTY: u64 = 6400;
pub const DEFAULT_TERMINAL_BLOCK: u64 = 64;
pub const JWT_SECRET: [u8; 32] = [42; 32];
pub const DEFAULT_JWT_SECRET: [u8; 32] = [42; 32];
pub const DEFAULT_BUILDER_THRESHOLD_WEI: u128 = 1_000_000_000_000_000_000;
mod execution_block_generator;
mod handle_rpc;
mod mock_builder;
mod mock_execution_layer;
/// Configuration for the MockExecutionLayer.
pub struct MockExecutionConfig {
pub server_config: Config,
pub jwt_key: JwtKey,
pub terminal_difficulty: Uint256,
pub terminal_block: u64,
pub terminal_block_hash: ExecutionBlockHash,
}
impl Default for MockExecutionConfig {
fn default() -> Self {
Self {
jwt_key: JwtKey::random(),
terminal_difficulty: DEFAULT_TERMINAL_DIFFICULTY.into(),
terminal_block: DEFAULT_TERMINAL_BLOCK,
terminal_block_hash: ExecutionBlockHash::zero(),
server_config: Config::default(),
}
}
}
pub struct MockServer<T: EthSpec> {
_shutdown_tx: oneshot::Sender<()>,
listen_socket_addr: SocketAddr,
@@ -43,25 +67,29 @@ impl<T: EthSpec> MockServer<T> {
pub fn unit_testing() -> Self {
Self::new(
&runtime::Handle::current(),
JwtKey::from_slice(&DEFAULT_JWT_SECRET).unwrap(),
DEFAULT_TERMINAL_DIFFICULTY.into(),
DEFAULT_TERMINAL_BLOCK,
ExecutionBlockHash::zero(),
)
}
pub fn new(
handle: &runtime::Handle,
terminal_difficulty: Uint256,
terminal_block: u64,
terminal_block_hash: ExecutionBlockHash,
) -> Self {
pub fn new_with_config(handle: &runtime::Handle, config: MockExecutionConfig) -> Self {
let MockExecutionConfig {
jwt_key,
terminal_difficulty,
terminal_block,
terminal_block_hash,
server_config,
} = config;
let last_echo_request = Arc::new(RwLock::new(None));
let preloaded_responses = Arc::new(Mutex::new(vec![]));
let execution_block_generator =
ExecutionBlockGenerator::new(terminal_difficulty, terminal_block, terminal_block_hash);
let ctx: Arc<Context<T>> = Arc::new(Context {
config: <_>::default(),
config: server_config,
jwt_key,
log: null_logger().unwrap(),
last_echo_request: last_echo_request.clone(),
execution_block_generator: RwLock::new(execution_block_generator),
@@ -69,6 +97,7 @@ impl<T: EthSpec> MockServer<T> {
preloaded_responses,
static_new_payload_response: <_>::default(),
static_forkchoice_updated_response: <_>::default(),
static_get_block_by_hash_response: <_>::default(),
_phantom: PhantomData,
});
@@ -99,6 +128,25 @@ impl<T: EthSpec> MockServer<T> {
}
}
pub fn new(
handle: &runtime::Handle,
jwt_key: JwtKey,
terminal_difficulty: Uint256,
terminal_block: u64,
terminal_block_hash: ExecutionBlockHash,
) -> Self {
Self::new_with_config(
handle,
MockExecutionConfig {
server_config: Config::default(),
jwt_key,
terminal_difficulty,
terminal_block,
terminal_block_hash,
},
)
}
pub fn execution_block_generator(&self) -> RwLockWriteGuard<'_, ExecutionBlockGenerator<T>> {
self.ctx.execution_block_generator.write()
}
@@ -198,8 +246,8 @@ impl<T: EthSpec> MockServer<T> {
fn invalid_terminal_block_status() -> PayloadStatusV1 {
PayloadStatusV1 {
status: PayloadStatusV1Status::InvalidTerminalBlock,
latest_valid_hash: None,
status: PayloadStatusV1Status::Invalid,
latest_valid_hash: Some(ExecutionBlockHash::zero()),
validation_error: Some("static response".into()),
}
}
@@ -271,6 +319,16 @@ impl<T: EthSpec> MockServer<T> {
self.set_forkchoice_updated_response(Self::invalid_terminal_block_status());
}
/// This will make the node appear like it is syncing.
pub fn all_get_block_by_hash_requests_return_none(&self) {
*self.ctx.static_get_block_by_hash_response.lock() = Some(None);
}
/// The node will respond "naturally"; it will return blocks if they're known to it.
pub fn all_get_block_by_hash_requests_return_natural_value(&self) {
*self.ctx.static_get_block_by_hash_response.lock() = None;
}
/// Disables any static payload responses so the execution block generator will do its own
/// verification.
pub fn full_payload_verification(&self) {
@@ -290,6 +348,7 @@ impl<T: EthSpec> MockServer<T> {
block_hash,
parent_hash,
total_difficulty,
timestamp: block_number,
});
self.ctx
@@ -351,6 +410,7 @@ impl warp::reject::Reject for AuthError {}
/// The server will gracefully handle the case where any fields are `None`.
pub struct Context<T: EthSpec> {
pub config: Config,
pub jwt_key: JwtKey,
pub log: Logger,
pub last_echo_request: Arc<RwLock<Option<Bytes>>>,
pub execution_block_generator: RwLock<ExecutionBlockGenerator<T>>,
@@ -358,6 +418,7 @@ pub struct Context<T: EthSpec> {
pub previous_request: Arc<Mutex<Option<serde_json::Value>>>,
pub static_new_payload_response: Arc<Mutex<Option<StaticNewPayloadResponse>>>,
pub static_forkchoice_updated_response: Arc<Mutex<Option<PayloadStatusV1>>>,
pub static_get_block_by_hash_response: Arc<Mutex<Option<Option<ExecutionBlock>>>>,
pub _phantom: PhantomData<T>,
}
@@ -386,28 +447,30 @@ struct ErrorMessage {
/// Returns a `warp` header which filters out request that has a missing or incorrectly
/// signed JWT token.
fn auth_header_filter() -> warp::filters::BoxedFilter<()> {
fn auth_header_filter(jwt_key: JwtKey) -> warp::filters::BoxedFilter<()> {
warp::any()
.and(warp::filters::header::optional("Authorization"))
.and_then(move |authorization: Option<String>| async move {
match authorization {
None => Err(warp::reject::custom(AuthError(
"auth absent from request".to_string(),
))),
Some(auth) => {
if let Some(token) = auth.strip_prefix("Bearer ") {
let secret = JwtKey::from_slice(&JWT_SECRET).unwrap();
match Auth::validate_token(token, &secret) {
Ok(_) => Ok(()),
Err(e) => Err(warp::reject::custom(AuthError(format!(
"Auth failure: {:?}",
e
)))),
.and_then(move |authorization: Option<String>| {
let secret = jwt_key.clone();
async move {
match authorization {
None => Err(warp::reject::custom(AuthError(
"auth absent from request".to_string(),
))),
Some(auth) => {
if let Some(token) = auth.strip_prefix("Bearer ") {
match Auth::validate_token(token, &secret) {
Ok(_) => Ok(()),
Err(e) => Err(warp::reject::custom(AuthError(format!(
"Auth failure: {:?}",
e
)))),
}
} else {
Err(warp::reject::custom(AuthError(
"Bearer token not present in auth header".to_string(),
)))
}
} else {
Err(warp::reject::custom(AuthError(
"Bearer token not present in auth header".to_string(),
)))
}
}
}
@@ -523,7 +586,7 @@ pub fn serve<T: EthSpec>(
});
let routes = warp::post()
.and(auth_header_filter())
.and(auth_header_filter(ctx.jwt_key.clone()))
.and(root.or(echo))
.recover(handle_rejection)
// Add a `Server` header.