diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs index 03a3a91ac5..9a4ed2d044 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use educe::Educe; +use eth2::types::{EventKind, SseExecutionPayloadGossip}; use parking_lot::{Mutex, RwLock}; use store::DatabaseBlock; use tracing::{Span, debug}; @@ -10,7 +11,7 @@ use types::{ }; use crate::{ - BeaconChain, BeaconChainError, BeaconChainTypes, BeaconStore, + BeaconChain, BeaconChainError, BeaconChainTypes, BeaconStore, ServerSentEventHandler, beacon_proposer_cache::{self, BeaconProposerCache}, canonical_head::CanonicalHead, payload_envelope_verification::{ @@ -28,6 +29,7 @@ pub struct GossipVerificationContext<'a, T: BeaconChainTypes> { pub beacon_proposer_cache: &'a Mutex, pub validator_pubkey_cache: &'a RwLock>, pub genesis_validators_root: Hash256, + pub event_handler: &'a Option>, } /// Verify that an execution payload envelope is consistent with its beacon block @@ -213,6 +215,20 @@ impl GossipVerifiedEnvelope { return Err(EnvelopeError::BadSignature); } + if let Some(event_handler) = ctx.event_handler.as_ref() + && event_handler.has_execution_payload_gossip_subscribers() + { + event_handler.register(EventKind::ExecutionPayloadGossip( + SseExecutionPayloadGossip { + slot: block.slot(), + builder_index, + block_hash: signed_envelope.message.payload.block_hash, + block_root: beacon_block_root, + state_root: signed_envelope.message.state_root, + }, + )); + } + Ok(Self { signed_envelope, block, @@ -235,6 +251,7 @@ impl BeaconChain { beacon_proposer_cache: &self.beacon_proposer_cache, validator_pubkey_cache: &self.validator_pubkey_cache, genesis_validators_root: self.genesis_validators_root, + event_handler: &self.event_handler, } } diff --git a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs index 81f2ea41ea..4a974c9919 100644 --- a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs +++ b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs @@ -1,16 +1,25 @@ +use crate::block_id::BlockId; use crate::task_spawner::{Priority, TaskSpawner}; use crate::utils::{ChainFilter, EthV1Filter, NetworkTxFilter, ResponseFilter, TaskSpawnerFilter}; +use crate::version::{ + ResponseIncludesVersion, add_consensus_version_header, add_ssz_content_type_header, + execution_optimistic_finalized_beacon_response, +}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use bytes::Bytes; +use eth2::types as api_types; use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER}; use lighthouse_network::PubsubMessage; use network::NetworkMessage; -use ssz::Decode; +use ssz::{Decode, Encode}; use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; use tracing::{info, warn}; use types::SignedExecutionPayloadEnvelope; -use warp::{Filter, Rejection, Reply, reply::Response}; +use warp::{ + Filter, Rejection, Reply, + hyper::{Body, Response}, +}; // POST beacon/execution_payload_envelope (SSZ) pub(crate) fn post_beacon_execution_payload_envelope_ssz( @@ -81,7 +90,7 @@ pub async fn publish_execution_payload_envelope( envelope: SignedExecutionPayloadEnvelope, chain: Arc>, network_tx: &UnboundedSender>, -) -> Result { +) -> Result, Rejection> { let slot = envelope.message.slot; let beacon_block_root = envelope.message.beacon_block_root; @@ -114,3 +123,72 @@ pub async fn publish_execution_payload_envelope( Ok(warp::reply().into_response()) } + +// TODO(gloas): add tests for this endpoint once we support importing payloads into the db +// GET beacon/execution_payload_envelope/{block_id} +pub(crate) fn get_beacon_execution_payload_envelope( + eth_v1: EthV1Filter, + block_id_or_err: impl Filter + + Clone + + Send + + Sync + + 'static, + task_spawner_filter: TaskSpawnerFilter, + chain_filter: ChainFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("beacon")) + .and(warp::path("execution_payload_envelope")) + .and(block_id_or_err) + .and(warp::path::end()) + .and(task_spawner_filter) + .and(chain_filter) + .and(warp::header::optional::("accept")) + .then( + |block_id: BlockId, + task_spawner: TaskSpawner, + chain: Arc>, + accept_header: Option| { + task_spawner.blocking_response_task(Priority::P1, move || { + let (root, execution_optimistic, finalized) = block_id.root(&chain)?; + + let envelope = chain + .get_payload_envelope(&root) + .map_err(warp_utils::reject::unhandled_error)? + .ok_or_else(|| { + warp_utils::reject::custom_not_found(format!( + "execution payload envelope for block root {root}" + )) + })?; + + let fork_name = chain + .spec + .fork_name_at_slot::(envelope.message.slot); + + match accept_header { + Some(api_types::Accept::Ssz) => Response::builder() + .status(200) + .body(envelope.as_ssz_bytes().into()) + .map(|res: Response| add_ssz_content_type_header(res)) + .map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "failed to create response: {}", + e + )) + }), + _ => { + let res = execution_optimistic_finalized_beacon_response( + ResponseIncludesVersion::Yes(fork_name), + execution_optimistic, + finalized, + &envelope, + )?; + Ok(warp::reply::json(&res).into_response()) + } + } + .map(|resp| add_consensus_version_header(resp, fork_name)) + }) + }, + ) + .boxed() +} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 68ab91dc4c..5df1078617 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -35,7 +35,8 @@ mod validators; mod version; use crate::beacon::execution_payload_envelope::{ - post_beacon_execution_payload_envelope, post_beacon_execution_payload_envelope_ssz, + get_beacon_execution_payload_envelope, post_beacon_execution_payload_envelope, + post_beacon_execution_payload_envelope_ssz, }; use crate::beacon::pool::*; use crate::light_client::{get_light_client_bootstrap, get_light_client_updates}; @@ -1509,6 +1510,14 @@ pub fn serve( network_tx_filter.clone(), ); + // GET beacon/execution_payload_envelope/{block_id} + let get_beacon_execution_payload_envelope = get_beacon_execution_payload_envelope( + eth_v1.clone(), + block_id_or_err, + task_spawner_filter.clone(), + chain_filter.clone(), + ); + let beacon_rewards_path = eth_v1 .clone() .and(warp::path("beacon")) @@ -3308,6 +3317,7 @@ pub fn serve( .uor(get_beacon_block_root) .uor(get_blob_sidecars) .uor(get_blobs) + .uor(get_beacon_execution_payload_envelope) .uor(get_beacon_pool_attestations) .uor(get_beacon_pool_attester_slashings) .uor(get_beacon_pool_proposer_slashings) diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 40c5ef58a6..d5140a3878 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -2732,6 +2732,55 @@ impl BeaconNodeHttpClient { Ok(()) } + /// Path for `v1/beacon/execution_payload_envelope/{block_id}` + pub fn get_beacon_execution_payload_envelope_path( + &self, + block_id: BlockId, + ) -> Result { + let mut path = self.eth_path(V1)?; + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("execution_payload_envelope") + .push(&block_id.to_string()); + Ok(path) + } + + /// `GET v1/beacon/execution_payload_envelope/{block_id}` + /// + /// Returns `Ok(None)` on a 404 error. + pub async fn get_beacon_execution_payload_envelope( + &self, + block_id: BlockId, + ) -> Result< + Option>>, + Error, + > { + let path = self.get_beacon_execution_payload_envelope_path(block_id)?; + self.get_opt(path) + .await + .map(|opt| opt.map(BeaconResponse::ForkVersioned)) + } + + /// `GET v1/beacon/execution_payload_envelope/{block_id}` in SSZ format + /// + /// Returns `Ok(None)` on a 404 error. + pub async fn get_beacon_execution_payload_envelope_ssz( + &self, + block_id: BlockId, + ) -> Result>, Error> { + let path = self.get_beacon_execution_payload_envelope_path(block_id)?; + let opt_response = self + .get_bytes_opt_accept_header(path, Accept::Ssz, self.timeouts.get_beacon_blocks_ssz) + .await?; + match opt_response { + Some(bytes) => SignedExecutionPayloadEnvelope::from_ssz_bytes(&bytes) + .map(Some) + .map_err(Error::InvalidSsz), + None => Ok(None), + } + } + /// `GET v2/validator/blocks/{slot}` in ssz format pub async fn get_validator_blocks_ssz( &self, diff --git a/consensus/types/src/core/chain_spec.rs b/consensus/types/src/core/chain_spec.rs index 01c4c7bbfd..cc79d3fc29 100644 --- a/consensus/types/src/core/chain_spec.rs +++ b/consensus/types/src/core/chain_spec.rs @@ -828,15 +828,17 @@ impl ChainSpec { /// Returns the min epoch for blob / data column sidecar requests based on the current epoch. /// Switch to use the column sidecar config once the `blob_retention_epoch` has passed Fulu fork epoch. + /// Never uses the `blob_retention_epoch` for networks that started with Fulu enabled. pub fn min_epoch_data_availability_boundary(&self, current_epoch: Epoch) -> Option { - let fork_epoch = self.deneb_fork_epoch?; + let deneb_fork_epoch = self.deneb_fork_epoch?; let blob_retention_epoch = current_epoch.saturating_sub(self.min_epochs_for_blob_sidecars_requests); - match self.fulu_fork_epoch { - Some(fulu_fork_epoch) if blob_retention_epoch > fulu_fork_epoch => Some( - current_epoch.saturating_sub(self.min_epochs_for_data_column_sidecars_requests), - ), - _ => Some(std::cmp::max(fork_epoch, blob_retention_epoch)), + if let Some(fulu_fork_epoch) = self.fulu_fork_epoch + && blob_retention_epoch >= fulu_fork_epoch + { + Some(current_epoch.saturating_sub(self.min_epochs_for_data_column_sidecars_requests)) + } else { + Some(std::cmp::max(deneb_fork_epoch, blob_retention_epoch)) } } @@ -3398,17 +3400,19 @@ mod yaml_tests { spec.min_epoch_data_availability_boundary(fulu_fork_epoch) ); - // `min_epochs_for_data_sidecar_requests` at fulu fork epoch + min_epochs_for_blob_sidecars_request - let blob_retention_epoch_after_fulu = fulu_fork_epoch + blob_retention_epochs; - let expected_blob_retention_epoch = blob_retention_epoch_after_fulu - blob_retention_epochs; + // Now, the blob retention period starts still before the fulu fork epoch, so the boundary + // should respect the blob retention period. + let half_blob_retention_epoch_after_fulu = fulu_fork_epoch + (blob_retention_epochs / 2); + let expected_blob_retention_epoch = + half_blob_retention_epoch_after_fulu - blob_retention_epochs; assert_eq!( Some(expected_blob_retention_epoch), - spec.min_epoch_data_availability_boundary(blob_retention_epoch_after_fulu) + spec.min_epoch_data_availability_boundary(half_blob_retention_epoch_after_fulu) ); - // After the final blob retention epoch, `min_epochs_for_data_sidecar_requests` should be calculated - // using `min_epochs_for_data_column_sidecars_request` - let current_epoch = blob_retention_epoch_after_fulu + 1; + // If the retention period starts with the fulu fork epoch, there are no more blobs to + // retain, and the return value will be based on the data column retention period. + let current_epoch = fulu_fork_epoch + blob_retention_epochs; let expected_data_column_retention_epoch = current_epoch - data_column_retention_epochs; assert_eq!( Some(expected_data_column_retention_epoch), @@ -3416,6 +3420,39 @@ mod yaml_tests { ); } + #[test] + fn min_epochs_for_data_sidecar_requests_fulu_genesis() { + type E = MainnetEthSpec; + let spec = { + // fulu active at genesis + let mut spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + // set a different value for testing purpose, 4096 / 2 = 2048 + spec.min_epochs_for_data_column_sidecars_requests = + spec.min_epochs_for_blob_sidecars_requests / 2; + Arc::new(spec) + }; + let blob_retention_epochs = spec.min_epochs_for_blob_sidecars_requests; + let data_column_retention_epochs = spec.min_epochs_for_data_column_sidecars_requests; + + // If Fulu is activated at genesis, the column retention period should always be used. + let assert_correct_boundary = |epoch| { + let epoch = Epoch::new(epoch); + assert_eq!( + Some(epoch.saturating_sub(data_column_retention_epochs)), + spec.min_epoch_data_availability_boundary(epoch) + ) + }; + + assert_correct_boundary(0); + assert_correct_boundary(1); + assert_correct_boundary(blob_retention_epochs - 1); + assert_correct_boundary(blob_retention_epochs); + assert_correct_boundary(blob_retention_epochs + 1); + assert_correct_boundary(data_column_retention_epochs - 1); + assert_correct_boundary(data_column_retention_epochs); + assert_correct_boundary(data_column_retention_epochs + 1); + } + #[test] fn proposer_shuffling_decision_root_around_epoch_boundary() { type E = MainnetEthSpec;