diff --git a/beacon_node/beacon_chain/src/beacon_block.rs b/beacon_node/beacon_chain/src/beacon_block.rs index 10a9fbe7c5..3b1fa00027 100644 --- a/beacon_node/beacon_chain/src/beacon_block.rs +++ b/beacon_node/beacon_chain/src/beacon_block.rs @@ -589,14 +589,15 @@ impl BeaconChain { state_root: payload_data.state_root, }; - // Cache the envelope for later retrieval for signing and publishing. + // Cache the envelope for later retrieval by the validator for signing and publishing. + let envelope_slot = payload_data.slot; self.pending_payload_envelopes .write() - .insert(beacon_block_root, execution_payload_envelope); + .insert(envelope_slot, execution_payload_envelope); debug!( %beacon_block_root, - slot = %block.slot(), + slot = %envelope_slot, "Cached pending execution payload envelope" ); } diff --git a/beacon_node/beacon_chain/src/pending_payload_envelopes.rs b/beacon_node/beacon_chain/src/pending_payload_envelopes.rs index 353830f175..14979972b8 100644 --- a/beacon_node/beacon_chain/src/pending_payload_envelopes.rs +++ b/beacon_node/beacon_chain/src/pending_payload_envelopes.rs @@ -1,20 +1,22 @@ //! Provides the `PendingPayloadEnvelopes` cache for storing execution payload envelopes -//! that have been produced during local block production but not yet imported to fork choice. +//! that have been produced during local block production. //! //! For local building, the envelope is created during block production. -//! This cache holds the envelopes temporarily until the proposer can sign and publish the payload. +//! This cache holds the envelopes temporarily until the validator fetches, signs, +//! and publishes the payload. use std::collections::HashMap; -use types::{EthSpec, ExecutionPayloadEnvelope, Hash256, Slot}; +use types::{EthSpec, ExecutionPayloadEnvelope, Slot}; /// Cache for pending execution payload envelopes awaiting publishing. /// -/// Envelopes are keyed by beacon block root and pruned based on slot age. +/// Envelopes are keyed by slot and pruned based on slot age. +/// This cache is only used for local building. pub struct PendingPayloadEnvelopes { /// Maximum number of slots to keep envelopes before pruning. max_slot_age: u64, - /// The envelopes, keyed by beacon block root. - envelopes: HashMap>, + /// The envelopes, keyed by slot. + envelopes: HashMap>, } impl Default for PendingPayloadEnvelopes { @@ -36,32 +38,31 @@ impl PendingPayloadEnvelopes { } /// Insert a pending envelope into the cache. - pub fn insert(&mut self, block_root: Hash256, envelope: ExecutionPayloadEnvelope) { - self.envelopes.insert(block_root, envelope); + pub fn insert(&mut self, slot: Slot, envelope: ExecutionPayloadEnvelope) { + self.envelopes.insert(slot, envelope); } - /// Get a pending envelope by block root. - pub fn get(&self, block_root: &Hash256) -> Option<&ExecutionPayloadEnvelope> { - self.envelopes.get(block_root) + /// Get a pending envelope by slot. + pub fn get(&self, slot: Slot) -> Option<&ExecutionPayloadEnvelope> { + self.envelopes.get(&slot) } - /// Remove and return a pending envelope by block root. - pub fn remove(&mut self, block_root: &Hash256) -> Option> { - self.envelopes.remove(block_root) + /// Remove and return a pending envelope by slot. + pub fn remove(&mut self, slot: Slot) -> Option> { + self.envelopes.remove(&slot) } - /// Check if an envelope exists for the given block root. - pub fn contains(&self, block_root: &Hash256) -> bool { - self.envelopes.contains_key(block_root) + /// Check if an envelope exists for the given slot. + pub fn contains(&self, slot: Slot) -> bool { + self.envelopes.contains_key(&slot) } /// Prune envelopes older than `current_slot - max_slot_age`. /// - /// This removes stale envelopes from blocks that were never imported. + /// This removes stale envelopes from blocks that were never published. pub fn prune(&mut self, current_slot: Slot) { let min_slot = current_slot.saturating_sub(self.max_slot_age); - self.envelopes - .retain(|_, envelope| envelope.slot >= min_slot); + self.envelopes.retain(|slot, _| *slot >= min_slot); } /// Returns the number of pending envelopes in the cache. @@ -78,16 +79,16 @@ impl PendingPayloadEnvelopes { #[cfg(test)] mod tests { use super::*; - use types::{ExecutionPayloadGloas, ExecutionRequests, MainnetEthSpec}; + use types::{ExecutionPayloadGloas, ExecutionRequests, Hash256, MainnetEthSpec}; type E = MainnetEthSpec; - fn make_envelope(slot: Slot, block_root: Hash256) -> ExecutionPayloadEnvelope { + fn make_envelope(slot: Slot) -> ExecutionPayloadEnvelope { ExecutionPayloadEnvelope { payload: ExecutionPayloadGloas::default(), execution_requests: ExecutionRequests::default(), builder_index: 0, - beacon_block_root: block_root, + beacon_block_root: Hash256::ZERO, slot, state_root: Hash256::ZERO, } @@ -96,31 +97,31 @@ mod tests { #[test] fn insert_and_get() { let mut cache = PendingPayloadEnvelopes::::default(); - let block_root = Hash256::repeat_byte(1); - let envelope = make_envelope(Slot::new(1), block_root); + let slot = Slot::new(1); + let envelope = make_envelope(slot); - assert!(!cache.contains(&block_root)); + assert!(!cache.contains(slot)); assert_eq!(cache.len(), 0); - cache.insert(block_root, envelope.clone()); + cache.insert(slot, envelope.clone()); - assert!(cache.contains(&block_root)); + assert!(cache.contains(slot)); assert_eq!(cache.len(), 1); - assert_eq!(cache.get(&block_root), Some(&envelope)); + assert_eq!(cache.get(slot), Some(&envelope)); } #[test] fn remove() { let mut cache = PendingPayloadEnvelopes::::default(); - let block_root = Hash256::repeat_byte(1); - let envelope = make_envelope(Slot::new(1), block_root); + let slot = Slot::new(1); + let envelope = make_envelope(slot); - cache.insert(block_root, envelope.clone()); - assert!(cache.contains(&block_root)); + cache.insert(slot, envelope.clone()); + assert!(cache.contains(slot)); - let removed = cache.remove(&block_root); + let removed = cache.remove(slot); assert_eq!(removed, Some(envelope)); - assert!(!cache.contains(&block_root)); + assert!(!cache.contains(slot)); assert_eq!(cache.len(), 0); } @@ -129,14 +130,12 @@ mod tests { let mut cache = PendingPayloadEnvelopes::::new(2); // Insert envelope at slot 5 - let block_root_1 = Hash256::repeat_byte(1); - let envelope_1 = make_envelope(Slot::new(5), block_root_1); - cache.insert(block_root_1, envelope_1); + let slot_1 = Slot::new(5); + cache.insert(slot_1, make_envelope(slot_1)); // Insert envelope at slot 10 - let block_root_2 = Hash256::repeat_byte(2); - let envelope_2 = make_envelope(Slot::new(10), block_root_2); - cache.insert(block_root_2, envelope_2); + let slot_2 = Slot::new(10); + cache.insert(slot_2, make_envelope(slot_2)); assert_eq!(cache.len(), 2); @@ -144,7 +143,7 @@ mod tests { cache.prune(Slot::new(10)); assert_eq!(cache.len(), 1); - assert!(!cache.contains(&block_root_1)); // slot 5 < 8, pruned - assert!(cache.contains(&block_root_2)); // slot 10 >= 8, kept + assert!(!cache.contains(slot_1)); // slot 5 < 8, pruned + assert!(cache.contains(slot_2)); // slot 10 >= 8, kept } } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 4d7c76eb20..3cb52d8224 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -2469,6 +2469,14 @@ pub fn serve( task_spawner_filter.clone(), ); + // GET validator/execution_payload_envelope/{slot}/{builder_index} + let get_validator_execution_payload_envelope = get_validator_execution_payload_envelope( + eth_v1.clone().clone(), + chain_filter.clone(), + not_while_syncing_filter.clone(), + task_spawner_filter.clone(), + ); + // GET validator/attestation_data?slot,committee_index let get_validator_attestation_data = get_validator_attestation_data( eth_v1.clone().clone(), @@ -3327,6 +3335,7 @@ pub fn serve( .uor(get_validator_duties_proposer) .uor(get_validator_blocks) .uor(get_validator_blinded_blocks) + .uor(get_validator_execution_payload_envelope) .uor(get_validator_attestation_data) .uor(get_validator_aggregate_attestation) .uor(get_validator_sync_committee_contribution) diff --git a/beacon_node/http_api/src/validator/mod.rs b/beacon_node/http_api/src/validator/mod.rs index b82905be52..e7efa9564f 100644 --- a/beacon_node/http_api/src/validator/mod.rs +++ b/beacon_node/http_api/src/validator/mod.rs @@ -21,6 +21,7 @@ use eth2::types::{ use lighthouse_network::PubsubMessage; use network::{NetworkMessage, ValidatorSubscriptionMessage}; use slot_clock::SlotClock; +use ssz::Encode; use std::sync::Arc; use tokio::sync::mpsc::{Sender, UnboundedSender}; use tokio::sync::oneshot; @@ -30,6 +31,7 @@ use types::{ SignedContributionAndProof, SignedValidatorRegistrationData, Slot, SyncContributionData, ValidatorSubscription, }; +use warp::http::Response; use warp::{Filter, Rejection, Reply}; use warp_utils::reject::convert_rejection; @@ -374,6 +376,99 @@ pub fn get_validator_execution_payload_bid( .boxed() } +// GET validator/execution_payload_envelope/{slot}/{builder_index} +pub fn get_validator_execution_payload_envelope( + eth_v1: EthV1Filter, + chain_filter: ChainFilter, + not_while_syncing_filter: NotWhileSyncingFilter, + task_spawner_filter: TaskSpawnerFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("validator")) + .and(warp::path("execution_payload_envelope")) + .and(warp::path::param::().or_else(|_| async { + Err(warp_utils::reject::custom_bad_request( + "Invalid slot".to_string(), + )) + })) + .and(warp::path::param::().or_else(|_| async { + Err(warp_utils::reject::custom_bad_request( + "Invalid builder_index".to_string(), + )) + })) + .and(warp::path::end()) + .and(warp::header::optional::("accept")) + .and(not_while_syncing_filter) + .and(task_spawner_filter) + .and(chain_filter) + .then( + |slot: Slot, + // TODO(gloas) we're only doing local building + // we'll need to implement builder index logic + // eventually. + _builder_index: u64, + accept_header: Option, + not_synced_filter: Result<(), Rejection>, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + debug!(?slot, "Execution payload envelope request from HTTP API"); + + not_synced_filter?; + + // Get the envelope from the pending cache (local building only) + let envelope = chain + .pending_payload_envelopes + .read() + .get(slot) + .cloned() + .ok_or_else(|| { + warp_utils::reject::custom_not_found(format!( + "Execution payload envelope not available for slot {slot}" + )) + })?; + + let fork_name = chain.spec.fork_name_at_slot::(slot); + + match accept_header { + Some(Accept::Ssz) => Response::builder() + .status(200) + .header("Content-Type", "application/octet-stream") + .header("Eth-Consensus-Version", fork_name.to_string()) + .body(envelope.as_ssz_bytes().into()) + .map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "Failed to build SSZ response: {e}" + )) + }), + _ => { + let json_response = GenericResponse { data: envelope }; + Response::builder() + .status(200) + .header("Content-Type", "application/json") + .header("Eth-Consensus-Version", fork_name.to_string()) + .body( + serde_json::to_string(&json_response) + .map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "Failed to serialize response: {e}" + )) + })? + .into(), + ) + .map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "Failed to build JSON response: {e}" + )) + }) + } + } + }) + }, + ) + .boxed() +} + // POST validator/liveness/{epoch} pub fn post_validator_liveness_epoch( eth_v1: EthV1Filter, diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 2e5ce12323..90712d5a6b 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -3800,13 +3800,13 @@ impl ApiTester { assert!(!metadata.consensus_block_value.is_zero()); // Verify that the execution payload envelope is cached for local building. - // The envelope is stored in the pending cache until publishing. + // The envelope is stored in the pending cache (keyed by slot) until publishing. let block_root = block.tree_hash_root(); let envelope = self .chain .pending_payload_envelopes .read() - .get(&block_root) + .get(slot) .cloned() .expect("envelope should exist in pending cache for local building"); assert_eq!(envelope.beacon_block_root, block_root); @@ -3910,6 +3910,159 @@ impl ApiTester { self } + /// Test fetching execution payload envelope via HTTP API (JSON). Only runs if Gloas is scheduled. + pub async fn test_get_execution_payload_envelope(self) -> Self { + if !self.chain.spec.is_gloas_scheduled() { + return self; + } + + let fork = self.chain.canonical_head.cached_head().head_fork(); + let genesis_validators_root = self.chain.genesis_validators_root; + + for _ in 0..E::slots_per_epoch() * 3 { + let slot = self.chain.slot().unwrap(); + let epoch = self.chain.epoch().unwrap(); + + // Skip if not in Gloas fork yet + let fork_name = self.chain.spec.fork_name_at_slot::(slot); + if !fork_name.gloas_enabled() { + self.chain.slot_clock.set_slot(slot.as_u64() + 1); + continue; + } + + let proposer_pubkey_bytes = self + .client + .get_validator_duties_proposer(epoch) + .await + .unwrap() + .data + .into_iter() + .find(|duty| duty.slot == slot) + .map(|duty| duty.pubkey) + .unwrap(); + let proposer_pubkey = (&proposer_pubkey_bytes).try_into().unwrap(); + + let sk = self + .validator_keypairs() + .iter() + .find(|kp| kp.pk == proposer_pubkey) + .map(|kp| kp.sk.clone()) + .unwrap(); + + let randao_reveal = { + let domain = self.chain.spec.get_domain( + epoch, + Domain::Randao, + &fork, + genesis_validators_root, + ); + let message = epoch.signing_root(domain); + sk.sign(message).into() + }; + + // Produce a V4 block (which caches the envelope) + let (response, _metadata) = self + .client + .get_validator_blocks_v4::(slot, &randao_reveal, None, None, None) + .await + .unwrap(); + + let block = response.data; + let block_root = block.tree_hash_root(); + + // Fetch the envelope via HTTP API (using builder_index=0 for local building) + let envelope_response = self + .client + .get_validator_execution_payload_envelope::(slot, 0) + .await + .unwrap(); + + let envelope = envelope_response.data; + assert_eq!(envelope.beacon_block_root, block_root); + assert_eq!(envelope.slot, slot); + + self.chain.slot_clock.set_slot(slot.as_u64() + 1); + } + + self + } + + /// Test fetching execution payload envelope via HTTP API (SSZ). Only runs if Gloas is scheduled. + pub async fn test_get_execution_payload_envelope_ssz(self) -> Self { + if !self.chain.spec.is_gloas_scheduled() { + return self; + } + + let fork = self.chain.canonical_head.cached_head().head_fork(); + let genesis_validators_root = self.chain.genesis_validators_root; + + for _ in 0..E::slots_per_epoch() * 3 { + let slot = self.chain.slot().unwrap(); + let epoch = self.chain.epoch().unwrap(); + + // Skip if not in Gloas fork yet + let fork_name = self.chain.spec.fork_name_at_slot::(slot); + if !fork_name.gloas_enabled() { + self.chain.slot_clock.set_slot(slot.as_u64() + 1); + continue; + } + + let proposer_pubkey_bytes = self + .client + .get_validator_duties_proposer(epoch) + .await + .unwrap() + .data + .into_iter() + .find(|duty| duty.slot == slot) + .map(|duty| duty.pubkey) + .unwrap(); + let proposer_pubkey = (&proposer_pubkey_bytes).try_into().unwrap(); + + let sk = self + .validator_keypairs() + .iter() + .find(|kp| kp.pk == proposer_pubkey) + .map(|kp| kp.sk.clone()) + .unwrap(); + + let randao_reveal = { + let domain = self.chain.spec.get_domain( + epoch, + Domain::Randao, + &fork, + genesis_validators_root, + ); + let message = epoch.signing_root(domain); + sk.sign(message).into() + }; + + // Produce a V4 block (which caches the envelope) + let (response, _metadata) = self + .client + .get_validator_blocks_v4::(slot, &randao_reveal, None, None, None) + .await + .unwrap(); + + let block = response.data; + let block_root = block.tree_hash_root(); + + // Fetch the envelope via HTTP API in SSZ format + let envelope = self + .client + .get_validator_execution_payload_envelope_ssz::(slot, 0) + .await + .unwrap(); + + assert_eq!(envelope.beacon_block_root, block_root); + assert_eq!(envelope.slot, slot); + + self.chain.slot_clock.set_slot(slot.as_u64() + 1); + } + + self + } + pub async fn test_block_production_no_verify_randao(self) -> Self { for _ in 0..E::slots_per_epoch() { let slot = self.chain.slot().unwrap(); @@ -7659,6 +7812,22 @@ async fn block_production_v4_ssz() { .await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_execution_payload_envelope() { + ApiTester::new_with_hard_forks() + .await + .test_get_execution_payload_envelope() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_execution_payload_envelope_ssz() { + ApiTester::new_with_hard_forks() + .await + .test_get_execution_payload_envelope_ssz() + .await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn blinded_block_production_full_payload_premerge() { ApiTester::new().await.test_blinded_block_production().await; diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index c9672f2221..35219ff924 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -42,7 +42,7 @@ use reqwest::{ #[cfg(feature = "events")] use reqwest_eventsource::{Event, EventSource}; use serde::{Serialize, de::DeserializeOwned}; -use ssz::Encode; +use ssz::{Decode, Encode}; use std::fmt; use std::future::Future; use std::time::Duration; @@ -2581,6 +2581,48 @@ impl BeaconNodeHttpClient { opt_response.ok_or(Error::StatusCode(StatusCode::NOT_FOUND)) } + /// `GET v1/validator/execution_payload_envelope/{slot}/{builder_index}` + pub async fn get_validator_execution_payload_envelope( + &self, + slot: Slot, + builder_index: u64, + ) -> Result>, Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("execution_payload_envelope") + .push(&slot.to_string()) + .push(&builder_index.to_string()); + + self.get(path).await + } + + /// `GET v1/validator/execution_payload_envelope/{slot}/{builder_index}` in SSZ format + pub async fn get_validator_execution_payload_envelope_ssz( + &self, + slot: Slot, + builder_index: u64, + ) -> Result, Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("execution_payload_envelope") + .push(&slot.to_string()) + .push(&builder_index.to_string()); + + let opt_response = self + .get_bytes_opt_accept_header(path, Accept::Ssz, self.timeouts.get_validator_block) + .await?; + + let response_bytes = opt_response.ok_or(Error::StatusCode(StatusCode::NOT_FOUND))?; + + ExecutionPayloadEnvelope::from_ssz_bytes(&response_bytes).map_err(Error::InvalidSsz) + } + /// `GET v2/validator/blocks/{slot}` in ssz format pub async fn get_validator_blocks_ssz( &self,