mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-17 03:42:46 +00:00
Add get payload envelope route
This commit is contained in:
@@ -589,14 +589,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
state_root: payload_data.state_root,
|
||||
};
|
||||
|
||||
// Cache the envelope for later retrieval for signing and publishing.
|
||||
// Cache the envelope for later retrieval by the validator for signing and publishing.
|
||||
let envelope_slot = payload_data.slot;
|
||||
self.pending_payload_envelopes
|
||||
.write()
|
||||
.insert(beacon_block_root, execution_payload_envelope);
|
||||
.insert(envelope_slot, execution_payload_envelope);
|
||||
|
||||
debug!(
|
||||
%beacon_block_root,
|
||||
slot = %block.slot(),
|
||||
slot = %envelope_slot,
|
||||
"Cached pending execution payload envelope"
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1,20 +1,22 @@
|
||||
//! Provides the `PendingPayloadEnvelopes` cache for storing execution payload envelopes
|
||||
//! that have been produced during local block production but not yet imported to fork choice.
|
||||
//! that have been produced during local block production.
|
||||
//!
|
||||
//! For local building, the envelope is created during block production.
|
||||
//! This cache holds the envelopes temporarily until the proposer can sign and publish the payload.
|
||||
//! This cache holds the envelopes temporarily until the validator fetches, signs,
|
||||
//! and publishes the payload.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use types::{EthSpec, ExecutionPayloadEnvelope, Hash256, Slot};
|
||||
use types::{EthSpec, ExecutionPayloadEnvelope, Slot};
|
||||
|
||||
/// Cache for pending execution payload envelopes awaiting publishing.
|
||||
///
|
||||
/// Envelopes are keyed by beacon block root and pruned based on slot age.
|
||||
/// Envelopes are keyed by slot and pruned based on slot age.
|
||||
/// This cache is only used for local building.
|
||||
pub struct PendingPayloadEnvelopes<E: EthSpec> {
|
||||
/// Maximum number of slots to keep envelopes before pruning.
|
||||
max_slot_age: u64,
|
||||
/// The envelopes, keyed by beacon block root.
|
||||
envelopes: HashMap<Hash256, ExecutionPayloadEnvelope<E>>,
|
||||
/// The envelopes, keyed by slot.
|
||||
envelopes: HashMap<Slot, ExecutionPayloadEnvelope<E>>,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> Default for PendingPayloadEnvelopes<E> {
|
||||
@@ -36,32 +38,31 @@ impl<E: EthSpec> PendingPayloadEnvelopes<E> {
|
||||
}
|
||||
|
||||
/// Insert a pending envelope into the cache.
|
||||
pub fn insert(&mut self, block_root: Hash256, envelope: ExecutionPayloadEnvelope<E>) {
|
||||
self.envelopes.insert(block_root, envelope);
|
||||
pub fn insert(&mut self, slot: Slot, envelope: ExecutionPayloadEnvelope<E>) {
|
||||
self.envelopes.insert(slot, envelope);
|
||||
}
|
||||
|
||||
/// Get a pending envelope by block root.
|
||||
pub fn get(&self, block_root: &Hash256) -> Option<&ExecutionPayloadEnvelope<E>> {
|
||||
self.envelopes.get(block_root)
|
||||
/// Get a pending envelope by slot.
|
||||
pub fn get(&self, slot: Slot) -> Option<&ExecutionPayloadEnvelope<E>> {
|
||||
self.envelopes.get(&slot)
|
||||
}
|
||||
|
||||
/// Remove and return a pending envelope by block root.
|
||||
pub fn remove(&mut self, block_root: &Hash256) -> Option<ExecutionPayloadEnvelope<E>> {
|
||||
self.envelopes.remove(block_root)
|
||||
/// Remove and return a pending envelope by slot.
|
||||
pub fn remove(&mut self, slot: Slot) -> Option<ExecutionPayloadEnvelope<E>> {
|
||||
self.envelopes.remove(&slot)
|
||||
}
|
||||
|
||||
/// Check if an envelope exists for the given block root.
|
||||
pub fn contains(&self, block_root: &Hash256) -> bool {
|
||||
self.envelopes.contains_key(block_root)
|
||||
/// Check if an envelope exists for the given slot.
|
||||
pub fn contains(&self, slot: Slot) -> bool {
|
||||
self.envelopes.contains_key(&slot)
|
||||
}
|
||||
|
||||
/// Prune envelopes older than `current_slot - max_slot_age`.
|
||||
///
|
||||
/// This removes stale envelopes from blocks that were never imported.
|
||||
/// This removes stale envelopes from blocks that were never published.
|
||||
pub fn prune(&mut self, current_slot: Slot) {
|
||||
let min_slot = current_slot.saturating_sub(self.max_slot_age);
|
||||
self.envelopes
|
||||
.retain(|_, envelope| envelope.slot >= min_slot);
|
||||
self.envelopes.retain(|slot, _| *slot >= min_slot);
|
||||
}
|
||||
|
||||
/// Returns the number of pending envelopes in the cache.
|
||||
@@ -78,16 +79,16 @@ impl<E: EthSpec> PendingPayloadEnvelopes<E> {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use types::{ExecutionPayloadGloas, ExecutionRequests, MainnetEthSpec};
|
||||
use types::{ExecutionPayloadGloas, ExecutionRequests, Hash256, MainnetEthSpec};
|
||||
|
||||
type E = MainnetEthSpec;
|
||||
|
||||
fn make_envelope(slot: Slot, block_root: Hash256) -> ExecutionPayloadEnvelope<E> {
|
||||
fn make_envelope(slot: Slot) -> ExecutionPayloadEnvelope<E> {
|
||||
ExecutionPayloadEnvelope {
|
||||
payload: ExecutionPayloadGloas::default(),
|
||||
execution_requests: ExecutionRequests::default(),
|
||||
builder_index: 0,
|
||||
beacon_block_root: block_root,
|
||||
beacon_block_root: Hash256::ZERO,
|
||||
slot,
|
||||
state_root: Hash256::ZERO,
|
||||
}
|
||||
@@ -96,31 +97,31 @@ mod tests {
|
||||
#[test]
|
||||
fn insert_and_get() {
|
||||
let mut cache = PendingPayloadEnvelopes::<E>::default();
|
||||
let block_root = Hash256::repeat_byte(1);
|
||||
let envelope = make_envelope(Slot::new(1), block_root);
|
||||
let slot = Slot::new(1);
|
||||
let envelope = make_envelope(slot);
|
||||
|
||||
assert!(!cache.contains(&block_root));
|
||||
assert!(!cache.contains(slot));
|
||||
assert_eq!(cache.len(), 0);
|
||||
|
||||
cache.insert(block_root, envelope.clone());
|
||||
cache.insert(slot, envelope.clone());
|
||||
|
||||
assert!(cache.contains(&block_root));
|
||||
assert!(cache.contains(slot));
|
||||
assert_eq!(cache.len(), 1);
|
||||
assert_eq!(cache.get(&block_root), Some(&envelope));
|
||||
assert_eq!(cache.get(slot), Some(&envelope));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remove() {
|
||||
let mut cache = PendingPayloadEnvelopes::<E>::default();
|
||||
let block_root = Hash256::repeat_byte(1);
|
||||
let envelope = make_envelope(Slot::new(1), block_root);
|
||||
let slot = Slot::new(1);
|
||||
let envelope = make_envelope(slot);
|
||||
|
||||
cache.insert(block_root, envelope.clone());
|
||||
assert!(cache.contains(&block_root));
|
||||
cache.insert(slot, envelope.clone());
|
||||
assert!(cache.contains(slot));
|
||||
|
||||
let removed = cache.remove(&block_root);
|
||||
let removed = cache.remove(slot);
|
||||
assert_eq!(removed, Some(envelope));
|
||||
assert!(!cache.contains(&block_root));
|
||||
assert!(!cache.contains(slot));
|
||||
assert_eq!(cache.len(), 0);
|
||||
}
|
||||
|
||||
@@ -129,14 +130,12 @@ mod tests {
|
||||
let mut cache = PendingPayloadEnvelopes::<E>::new(2);
|
||||
|
||||
// Insert envelope at slot 5
|
||||
let block_root_1 = Hash256::repeat_byte(1);
|
||||
let envelope_1 = make_envelope(Slot::new(5), block_root_1);
|
||||
cache.insert(block_root_1, envelope_1);
|
||||
let slot_1 = Slot::new(5);
|
||||
cache.insert(slot_1, make_envelope(slot_1));
|
||||
|
||||
// Insert envelope at slot 10
|
||||
let block_root_2 = Hash256::repeat_byte(2);
|
||||
let envelope_2 = make_envelope(Slot::new(10), block_root_2);
|
||||
cache.insert(block_root_2, envelope_2);
|
||||
let slot_2 = Slot::new(10);
|
||||
cache.insert(slot_2, make_envelope(slot_2));
|
||||
|
||||
assert_eq!(cache.len(), 2);
|
||||
|
||||
@@ -144,7 +143,7 @@ mod tests {
|
||||
cache.prune(Slot::new(10));
|
||||
|
||||
assert_eq!(cache.len(), 1);
|
||||
assert!(!cache.contains(&block_root_1)); // slot 5 < 8, pruned
|
||||
assert!(cache.contains(&block_root_2)); // slot 10 >= 8, kept
|
||||
assert!(!cache.contains(slot_1)); // slot 5 < 8, pruned
|
||||
assert!(cache.contains(slot_2)); // slot 10 >= 8, kept
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2469,6 +2469,14 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
task_spawner_filter.clone(),
|
||||
);
|
||||
|
||||
// GET validator/execution_payload_envelope/{slot}/{builder_index}
|
||||
let get_validator_execution_payload_envelope = get_validator_execution_payload_envelope(
|
||||
eth_v1.clone().clone(),
|
||||
chain_filter.clone(),
|
||||
not_while_syncing_filter.clone(),
|
||||
task_spawner_filter.clone(),
|
||||
);
|
||||
|
||||
// GET validator/attestation_data?slot,committee_index
|
||||
let get_validator_attestation_data = get_validator_attestation_data(
|
||||
eth_v1.clone().clone(),
|
||||
@@ -3327,6 +3335,7 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
.uor(get_validator_duties_proposer)
|
||||
.uor(get_validator_blocks)
|
||||
.uor(get_validator_blinded_blocks)
|
||||
.uor(get_validator_execution_payload_envelope)
|
||||
.uor(get_validator_attestation_data)
|
||||
.uor(get_validator_aggregate_attestation)
|
||||
.uor(get_validator_sync_committee_contribution)
|
||||
|
||||
@@ -21,6 +21,7 @@ use eth2::types::{
|
||||
use lighthouse_network::PubsubMessage;
|
||||
use network::{NetworkMessage, ValidatorSubscriptionMessage};
|
||||
use slot_clock::SlotClock;
|
||||
use ssz::Encode;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc::{Sender, UnboundedSender};
|
||||
use tokio::sync::oneshot;
|
||||
@@ -30,6 +31,7 @@ use types::{
|
||||
SignedContributionAndProof, SignedValidatorRegistrationData, Slot, SyncContributionData,
|
||||
ValidatorSubscription,
|
||||
};
|
||||
use warp::http::Response;
|
||||
use warp::{Filter, Rejection, Reply};
|
||||
use warp_utils::reject::convert_rejection;
|
||||
|
||||
@@ -374,6 +376,99 @@ pub fn get_validator_execution_payload_bid<T: BeaconChainTypes>(
|
||||
.boxed()
|
||||
}
|
||||
|
||||
// GET validator/execution_payload_envelope/{slot}/{builder_index}
|
||||
pub fn get_validator_execution_payload_envelope<T: BeaconChainTypes>(
|
||||
eth_v1: EthV1Filter,
|
||||
chain_filter: ChainFilter<T>,
|
||||
not_while_syncing_filter: NotWhileSyncingFilter,
|
||||
task_spawner_filter: TaskSpawnerFilter<T>,
|
||||
) -> ResponseFilter {
|
||||
eth_v1
|
||||
.and(warp::path("validator"))
|
||||
.and(warp::path("execution_payload_envelope"))
|
||||
.and(warp::path::param::<Slot>().or_else(|_| async {
|
||||
Err(warp_utils::reject::custom_bad_request(
|
||||
"Invalid slot".to_string(),
|
||||
))
|
||||
}))
|
||||
.and(warp::path::param::<u64>().or_else(|_| async {
|
||||
Err(warp_utils::reject::custom_bad_request(
|
||||
"Invalid builder_index".to_string(),
|
||||
))
|
||||
}))
|
||||
.and(warp::path::end())
|
||||
.and(warp::header::optional::<Accept>("accept"))
|
||||
.and(not_while_syncing_filter)
|
||||
.and(task_spawner_filter)
|
||||
.and(chain_filter)
|
||||
.then(
|
||||
|slot: Slot,
|
||||
// TODO(gloas) we're only doing local building
|
||||
// we'll need to implement builder index logic
|
||||
// eventually.
|
||||
_builder_index: u64,
|
||||
accept_header: Option<Accept>,
|
||||
not_synced_filter: Result<(), Rejection>,
|
||||
task_spawner: TaskSpawner<T::EthSpec>,
|
||||
chain: Arc<BeaconChain<T>>| {
|
||||
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
|
||||
debug!(?slot, "Execution payload envelope request from HTTP API");
|
||||
|
||||
not_synced_filter?;
|
||||
|
||||
// Get the envelope from the pending cache (local building only)
|
||||
let envelope = chain
|
||||
.pending_payload_envelopes
|
||||
.read()
|
||||
.get(slot)
|
||||
.cloned()
|
||||
.ok_or_else(|| {
|
||||
warp_utils::reject::custom_not_found(format!(
|
||||
"Execution payload envelope not available for slot {slot}"
|
||||
))
|
||||
})?;
|
||||
|
||||
let fork_name = chain.spec.fork_name_at_slot::<T::EthSpec>(slot);
|
||||
|
||||
match accept_header {
|
||||
Some(Accept::Ssz) => Response::builder()
|
||||
.status(200)
|
||||
.header("Content-Type", "application/octet-stream")
|
||||
.header("Eth-Consensus-Version", fork_name.to_string())
|
||||
.body(envelope.as_ssz_bytes().into())
|
||||
.map_err(|e| {
|
||||
warp_utils::reject::custom_server_error(format!(
|
||||
"Failed to build SSZ response: {e}"
|
||||
))
|
||||
}),
|
||||
_ => {
|
||||
let json_response = GenericResponse { data: envelope };
|
||||
Response::builder()
|
||||
.status(200)
|
||||
.header("Content-Type", "application/json")
|
||||
.header("Eth-Consensus-Version", fork_name.to_string())
|
||||
.body(
|
||||
serde_json::to_string(&json_response)
|
||||
.map_err(|e| {
|
||||
warp_utils::reject::custom_server_error(format!(
|
||||
"Failed to serialize response: {e}"
|
||||
))
|
||||
})?
|
||||
.into(),
|
||||
)
|
||||
.map_err(|e| {
|
||||
warp_utils::reject::custom_server_error(format!(
|
||||
"Failed to build JSON response: {e}"
|
||||
))
|
||||
})
|
||||
}
|
||||
}
|
||||
})
|
||||
},
|
||||
)
|
||||
.boxed()
|
||||
}
|
||||
|
||||
// POST validator/liveness/{epoch}
|
||||
pub fn post_validator_liveness_epoch<T: BeaconChainTypes>(
|
||||
eth_v1: EthV1Filter,
|
||||
|
||||
@@ -3800,13 +3800,13 @@ impl ApiTester {
|
||||
assert!(!metadata.consensus_block_value.is_zero());
|
||||
|
||||
// Verify that the execution payload envelope is cached for local building.
|
||||
// The envelope is stored in the pending cache until publishing.
|
||||
// The envelope is stored in the pending cache (keyed by slot) until publishing.
|
||||
let block_root = block.tree_hash_root();
|
||||
let envelope = self
|
||||
.chain
|
||||
.pending_payload_envelopes
|
||||
.read()
|
||||
.get(&block_root)
|
||||
.get(slot)
|
||||
.cloned()
|
||||
.expect("envelope should exist in pending cache for local building");
|
||||
assert_eq!(envelope.beacon_block_root, block_root);
|
||||
@@ -3910,6 +3910,159 @@ impl ApiTester {
|
||||
self
|
||||
}
|
||||
|
||||
/// Test fetching execution payload envelope via HTTP API (JSON). Only runs if Gloas is scheduled.
|
||||
pub async fn test_get_execution_payload_envelope(self) -> Self {
|
||||
if !self.chain.spec.is_gloas_scheduled() {
|
||||
return self;
|
||||
}
|
||||
|
||||
let fork = self.chain.canonical_head.cached_head().head_fork();
|
||||
let genesis_validators_root = self.chain.genesis_validators_root;
|
||||
|
||||
for _ in 0..E::slots_per_epoch() * 3 {
|
||||
let slot = self.chain.slot().unwrap();
|
||||
let epoch = self.chain.epoch().unwrap();
|
||||
|
||||
// Skip if not in Gloas fork yet
|
||||
let fork_name = self.chain.spec.fork_name_at_slot::<E>(slot);
|
||||
if !fork_name.gloas_enabled() {
|
||||
self.chain.slot_clock.set_slot(slot.as_u64() + 1);
|
||||
continue;
|
||||
}
|
||||
|
||||
let proposer_pubkey_bytes = self
|
||||
.client
|
||||
.get_validator_duties_proposer(epoch)
|
||||
.await
|
||||
.unwrap()
|
||||
.data
|
||||
.into_iter()
|
||||
.find(|duty| duty.slot == slot)
|
||||
.map(|duty| duty.pubkey)
|
||||
.unwrap();
|
||||
let proposer_pubkey = (&proposer_pubkey_bytes).try_into().unwrap();
|
||||
|
||||
let sk = self
|
||||
.validator_keypairs()
|
||||
.iter()
|
||||
.find(|kp| kp.pk == proposer_pubkey)
|
||||
.map(|kp| kp.sk.clone())
|
||||
.unwrap();
|
||||
|
||||
let randao_reveal = {
|
||||
let domain = self.chain.spec.get_domain(
|
||||
epoch,
|
||||
Domain::Randao,
|
||||
&fork,
|
||||
genesis_validators_root,
|
||||
);
|
||||
let message = epoch.signing_root(domain);
|
||||
sk.sign(message).into()
|
||||
};
|
||||
|
||||
// Produce a V4 block (which caches the envelope)
|
||||
let (response, _metadata) = self
|
||||
.client
|
||||
.get_validator_blocks_v4::<E>(slot, &randao_reveal, None, None, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let block = response.data;
|
||||
let block_root = block.tree_hash_root();
|
||||
|
||||
// Fetch the envelope via HTTP API (using builder_index=0 for local building)
|
||||
let envelope_response = self
|
||||
.client
|
||||
.get_validator_execution_payload_envelope::<E>(slot, 0)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let envelope = envelope_response.data;
|
||||
assert_eq!(envelope.beacon_block_root, block_root);
|
||||
assert_eq!(envelope.slot, slot);
|
||||
|
||||
self.chain.slot_clock.set_slot(slot.as_u64() + 1);
|
||||
}
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
/// Test fetching execution payload envelope via HTTP API (SSZ). Only runs if Gloas is scheduled.
|
||||
pub async fn test_get_execution_payload_envelope_ssz(self) -> Self {
|
||||
if !self.chain.spec.is_gloas_scheduled() {
|
||||
return self;
|
||||
}
|
||||
|
||||
let fork = self.chain.canonical_head.cached_head().head_fork();
|
||||
let genesis_validators_root = self.chain.genesis_validators_root;
|
||||
|
||||
for _ in 0..E::slots_per_epoch() * 3 {
|
||||
let slot = self.chain.slot().unwrap();
|
||||
let epoch = self.chain.epoch().unwrap();
|
||||
|
||||
// Skip if not in Gloas fork yet
|
||||
let fork_name = self.chain.spec.fork_name_at_slot::<E>(slot);
|
||||
if !fork_name.gloas_enabled() {
|
||||
self.chain.slot_clock.set_slot(slot.as_u64() + 1);
|
||||
continue;
|
||||
}
|
||||
|
||||
let proposer_pubkey_bytes = self
|
||||
.client
|
||||
.get_validator_duties_proposer(epoch)
|
||||
.await
|
||||
.unwrap()
|
||||
.data
|
||||
.into_iter()
|
||||
.find(|duty| duty.slot == slot)
|
||||
.map(|duty| duty.pubkey)
|
||||
.unwrap();
|
||||
let proposer_pubkey = (&proposer_pubkey_bytes).try_into().unwrap();
|
||||
|
||||
let sk = self
|
||||
.validator_keypairs()
|
||||
.iter()
|
||||
.find(|kp| kp.pk == proposer_pubkey)
|
||||
.map(|kp| kp.sk.clone())
|
||||
.unwrap();
|
||||
|
||||
let randao_reveal = {
|
||||
let domain = self.chain.spec.get_domain(
|
||||
epoch,
|
||||
Domain::Randao,
|
||||
&fork,
|
||||
genesis_validators_root,
|
||||
);
|
||||
let message = epoch.signing_root(domain);
|
||||
sk.sign(message).into()
|
||||
};
|
||||
|
||||
// Produce a V4 block (which caches the envelope)
|
||||
let (response, _metadata) = self
|
||||
.client
|
||||
.get_validator_blocks_v4::<E>(slot, &randao_reveal, None, None, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let block = response.data;
|
||||
let block_root = block.tree_hash_root();
|
||||
|
||||
// Fetch the envelope via HTTP API in SSZ format
|
||||
let envelope = self
|
||||
.client
|
||||
.get_validator_execution_payload_envelope_ssz::<E>(slot, 0)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(envelope.beacon_block_root, block_root);
|
||||
assert_eq!(envelope.slot, slot);
|
||||
|
||||
self.chain.slot_clock.set_slot(slot.as_u64() + 1);
|
||||
}
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn test_block_production_no_verify_randao(self) -> Self {
|
||||
for _ in 0..E::slots_per_epoch() {
|
||||
let slot = self.chain.slot().unwrap();
|
||||
@@ -7659,6 +7812,22 @@ async fn block_production_v4_ssz() {
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn get_execution_payload_envelope() {
|
||||
ApiTester::new_with_hard_forks()
|
||||
.await
|
||||
.test_get_execution_payload_envelope()
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn get_execution_payload_envelope_ssz() {
|
||||
ApiTester::new_with_hard_forks()
|
||||
.await
|
||||
.test_get_execution_payload_envelope_ssz()
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn blinded_block_production_full_payload_premerge() {
|
||||
ApiTester::new().await.test_blinded_block_production().await;
|
||||
|
||||
Reference in New Issue
Block a user