From 8378f7d2949f03f00ee61b2d70b7801dfe185aeb Mon Sep 17 00:00:00 2001 From: Eitan Seri- Levi Date: Tue, 3 Mar 2026 18:32:57 -0800 Subject: [PATCH] Fixes based on feedback --- beacon_node/beacon_chain/src/errors.rs | 9 ++ .../execution_payload_envelope_streamer.rs | 89 +++++++++++-------- .../lighthouse_network/src/rpc/methods.rs | 4 +- .../lighthouse_network/src/rpc/protocol.rs | 34 +++++-- .../src/network_beacon_processor/tests.rs | 20 ++--- common/task_executor/src/lib.rs | 40 +++++++++ consensus/types/src/core/chain_spec.rs | 15 +++- .../execution/execution_payload_envelope.rs | 4 +- .../signed_execution_payload_envelope.rs | 31 +++++++ 9 files changed, 186 insertions(+), 60 deletions(-) diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 6c8f0d2794..d99afe1dba 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -325,6 +325,15 @@ pub enum BlockProductionError { GloasNotImplemented(String), } +impl From for BeaconChainError { + fn from(e: task_executor::SpawnBlockingError) -> Self { + match e { + task_executor::SpawnBlockingError::RuntimeShutdown => BeaconChainError::RuntimeShutdown, + task_executor::SpawnBlockingError::JoinError(e) => BeaconChainError::TokioJoin(e), + } + } +} + easy_from_to!(BlockProcessingError, BlockProductionError); easy_from_to!(BeaconStateError, BlockProductionError); easy_from_to!(SlotProcessingError, BlockProductionError); diff --git a/beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs b/beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs index a30682e3a5..d1dae6efc9 100644 --- a/beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs +++ b/beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs @@ -15,7 +15,9 @@ type PayloadEnvelopeResult = Result>>, BeaconChainError>; pub struct PayloadEnvelopeStreamer { - execution_layer: ExecutionLayer, + // TODO(gloas) remove _ when we use the execution layer + // to load payload envelopes + _execution_layer: ExecutionLayer, store: BeaconStore, task_executor: TaskExecutor, _check_caches: CheckCaches, @@ -36,7 +38,7 @@ impl PayloadEnvelopeStreamer { .clone(); Ok(Arc::new(Self { - execution_layer, + _execution_layer: execution_layer, store, task_executor, _check_caches: check_caches, @@ -53,31 +55,56 @@ impl PayloadEnvelopeStreamer { None } - // used when the execution engine doesn't support the payload bodies methods - async fn stream_payload_envelopes_fallback( + async fn load_envelopes( + self: &Arc, + beacon_block_roots: &[Hash256], + ) -> Result)>, BeaconChainError> { + let streamer = self.clone(); + let roots = beacon_block_roots.to_vec(); + // Loading from the DB is slow -> spawn a blocking task + self.task_executor + .spawn_blocking_and_await( + move || { + let mut results = Vec::new(); + for root in roots { + if let Some(cached) = streamer.check_payload_envelope_cache(root) { + results.push((root, Ok(Some(cached)))); + continue; + } + // TODO(gloas) we'll want to use the execution layer directly to call + // the engine api method eth_getBlockByHash() + match streamer.store.get_payload_envelope(&root) { + Ok(opt_envelope) => { + results.push((root, Ok(opt_envelope.map(Arc::new)))); + } + Err(e) => { + results.push((root, Err(BeaconChainError::DBError(e)))); + } + } + } + results + }, + "load_execution_payload_envelopes", + ) + .await + .map_err(BeaconChainError::from) + } + + async fn stream_payload_envelopes( self: Arc, beacon_block_roots: Vec, sender: UnboundedSender<(Hash256, Arc>)>, ) { - debug!("Using slower fallback method of eth_getBlockByHash()"); - for beacon_block_root in beacon_block_roots { - let cached_envelope = self.check_payload_envelope_cache(beacon_block_root); + let results = match self.load_envelopes(&beacon_block_roots).await { + Ok(results) => results, + Err(e) => { + send_errors(beacon_block_roots, sender, e).await; + return; + } + }; - let envelope_result = if cached_envelope.is_some() { - Ok(cached_envelope) - } else { - // TODO(gloas) we'll want to use the execution layer directly to call - // the engine api method eth_getBlockByHash() - self.store - .get_payload_envelope(&beacon_block_root) - .map(|opt_envelope| opt_envelope.map(Arc::new)) - .map_err(BeaconChainError::DBError) - }; - - if sender - .send((beacon_block_root, Arc::new(envelope_result))) - .is_err() - { + for (root, result) in results { + if sender.send((root, Arc::new(result))).is_err() { break; } } @@ -88,22 +115,8 @@ impl PayloadEnvelopeStreamer { beacon_block_roots: Vec, sender: UnboundedSender<(Hash256, Arc>)>, ) { - match self - .execution_layer - .get_engine_capabilities(None) - .await - .map_err(Box::new) - .map_err(BeaconChainError::EngineGetCapabilititesFailed) - { - Ok(_engine_capabilities) => { - // TODO(gloas) should check engine capabilities for get_payload_bodies_by_range_v1 - self.stream_payload_envelopes_fallback(beacon_block_roots, sender) - .await; - } - Err(e) => { - send_errors(beacon_block_roots, sender, e).await; - } - } + self.stream_payload_envelopes(beacon_block_roots, sender) + .await; } pub fn launch_stream( diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 390510d2e3..baabf48683 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -528,9 +528,7 @@ impl PayloadEnvelopesByRootRequest { beacon_block_roots: Vec, fork_context: &ForkContext, ) -> Result { - let max_requests_envelopes = fork_context - .spec - .max_request_payloads(fork_context.current_fork_name()); + let max_requests_envelopes = fork_context.spec.max_request_payloads(); let beacon_block_roots = RuntimeVariableList::new(beacon_block_roots, max_requests_envelopes).map_err(|e| { diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 17b4054248..74c9ddef9c 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -22,7 +22,7 @@ use types::{ LightClientBootstrap, LightClientBootstrapAltair, LightClientFinalityUpdate, LightClientFinalityUpdateAltair, LightClientOptimisticUpdate, LightClientOptimisticUpdateAltair, LightClientUpdate, MainnetEthSpec, MinimalEthSpec, - SignedBeaconBlock, + SignedBeaconBlock, SignedExecutionPayloadEnvelope, }; // Note: Hardcoding the `EthSpec` type for `SignedBeaconBlock` as min/max values is @@ -65,6 +65,12 @@ pub static SIGNED_BEACON_BLOCK_BELLATRIX_MAX: LazyLock = + types::ExecutionPayload::::max_execution_payload_bellatrix_size() // adding max size of execution payload (~16gb) + ssz::BYTES_PER_LENGTH_OFFSET); // Adding the additional ssz offset for the `ExecutionPayload` field +pub static SIGNED_EXECUTION_PAYLOAD_ENVELOPE_MIN: LazyLock = + LazyLock::new(SignedExecutionPayloadEnvelope::::min_size); + +pub static SIGNED_EXECUTION_PAYLOAD_ENVELOPE_MAX: LazyLock = + LazyLock::new(SignedExecutionPayloadEnvelope::::max_size); + pub static BLOB_SIDECAR_SIZE: LazyLock = LazyLock::new(BlobSidecar::::max_size); @@ -147,6 +153,14 @@ pub fn rpc_block_limits_by_fork(current_fork: ForkName) -> RpcLimits { } } +/// Returns the rpc limits for payload_envelope_by_range and payload_envelope_by_root responses. +pub fn rpc_payload_limits() -> RpcLimits { + RpcLimits::new( + *SIGNED_EXECUTION_PAYLOAD_ENVELOPE_MIN, + *SIGNED_EXECUTION_PAYLOAD_ENVELOPE_MAX, + ) +} + fn rpc_light_client_updates_by_range_limits_by_fork(current_fork: ForkName) -> RpcLimits { let altair_fixed_len = LightClientFinalityUpdateAltair::::ssz_fixed_len(); @@ -425,8 +439,14 @@ impl SupportedProtocol { } if fork_context.fork_exists(ForkName::Gloas) { supported.extend_from_slice(&[ - ProtocolId::new(SupportedProtocol::PayloadEnvelopesByRangeV1, Encoding::SSZSnappy), - ProtocolId::new(SupportedProtocol::PayloadEnvelopesByRootV1, Encoding::SSZSnappy), + ProtocolId::new( + SupportedProtocol::PayloadEnvelopesByRangeV1, + Encoding::SSZSnappy, + ), + ProtocolId::new( + SupportedProtocol::PayloadEnvelopesByRootV1, + Encoding::SSZSnappy, + ), ]); } supported @@ -535,7 +555,9 @@ impl ProtocolId { ::ssz_fixed_len(), ::ssz_fixed_len(), ), - Protocol::PayloadEnvelopesByRoot => RpcLimits::new(0, spec.max_blocks_by_root_request), + Protocol::PayloadEnvelopesByRoot => { + RpcLimits::new(0, spec.max_payload_envelopes_by_root_request) + } Protocol::BlobsByRange => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -577,9 +599,7 @@ impl ProtocolId { Protocol::PayloadEnvelopesByRange => { rpc_block_limits_by_fork(fork_context.current_fork_name()) } - Protocol::PayloadEnvelopesByRoot => { - rpc_block_limits_by_fork(fork_context.current_fork_name()) - } + Protocol::PayloadEnvelopesByRoot => rpc_payload_limits(), Protocol::BlobsByRange => rpc_blob_limits::(), Protocol::BlobsByRoot => rpc_blob_limits::(), Protocol::DataColumnsByRoot => { diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 7e75f3be04..2e4b0fbd2a 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -2160,7 +2160,7 @@ async fn test_payload_envelopes_by_range() { let slot_count = 32; // Manually store payload envelopes for each block in the range - let mut expected_count = 0; + let mut expected_roots = Vec::new(); for slot in start_slot..slot_count { if let Some(root) = rig .chain @@ -2172,13 +2172,13 @@ async fn test_payload_envelopes_by_range() { .store .put_payload_envelope(&root, envelope) .unwrap(); - expected_count += 1; + expected_roots.push(root); } } rig.enqueue_payload_envelopes_by_range_request(start_slot, slot_count); - let mut actual_count = 0; + let mut actual_roots = Vec::new(); while let Some(next) = rig.network_rx.recv().await { if let NetworkMessage::SendResponse { peer_id: _, @@ -2186,8 +2186,8 @@ async fn test_payload_envelopes_by_range() { inbound_request_id: _, } = next { - if envelope.is_some() { - actual_count += 1; + if let Some(env) = envelope { + actual_roots.push(env.beacon_block_root()); } else { break; } @@ -2198,7 +2198,7 @@ async fn test_payload_envelopes_by_range() { panic!("unexpected message {:?}", next); } } - assert_eq!(expected_count, actual_count); + assert_eq!(expected_roots, actual_roots); } #[tokio::test] @@ -2226,7 +2226,7 @@ async fn test_payload_envelopes_by_root() { let roots = RuntimeVariableList::new(vec![block_root], 1).unwrap(); rig.enqueue_payload_envelopes_by_root_request(roots); - let mut actual_count = 0; + let mut actual_roots = Vec::new(); while let Some(next) = rig.network_rx.recv().await { if let NetworkMessage::SendResponse { peer_id: _, @@ -2234,8 +2234,8 @@ async fn test_payload_envelopes_by_root() { inbound_request_id: _, } = next { - if envelope.is_some() { - actual_count += 1; + if let Some(env) = envelope { + actual_roots.push(env.beacon_block_root()); } else { break; } @@ -2243,7 +2243,7 @@ async fn test_payload_envelopes_by_root() { panic!("unexpected message {:?}", next); } } - assert_eq!(1, actual_count); + assert_eq!(vec![block_root], actual_roots); } #[tokio::test] diff --git a/common/task_executor/src/lib.rs b/common/task_executor/src/lib.rs index d3d862f96c..25cf7acb41 100644 --- a/common/task_executor/src/lib.rs +++ b/common/task_executor/src/lib.rs @@ -12,6 +12,26 @@ use crate::rayon_pool_provider::RayonPoolProvider; pub use crate::rayon_pool_provider::RayonPoolType; pub use tokio::task::JoinHandle; +/// Error type for spawning a blocking task and awaiting its result. +#[derive(Debug)] +pub enum SpawnBlockingError { + /// The runtime is shutting down. + RuntimeShutdown, + /// The blocking task failed (e.g. due to a panic). + JoinError(tokio::task::JoinError), +} + +impl std::fmt::Display for SpawnBlockingError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SpawnBlockingError::RuntimeShutdown => write!(f, "runtime shutdown"), + SpawnBlockingError::JoinError(e) => write!(f, "join error: {e}"), + } + } +} + +impl std::error::Error for SpawnBlockingError {} + /// Provides a reason when Lighthouse is shut down. #[derive(Copy, Clone, Debug, PartialEq)] pub enum ShutdownReason { @@ -343,6 +363,26 @@ impl TaskExecutor { Some(future) } + /// Spawn a blocking task and await its result. + /// + /// Maps the `Option` (runtime shutdown) and `tokio::JoinError` into a single + /// `SpawnBlockingError`. + pub async fn spawn_blocking_and_await( + &self, + task: F, + name: &'static str, + ) -> Result + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let handle = self + .spawn_blocking_handle(task, name) + .ok_or(SpawnBlockingError::RuntimeShutdown)?; + + handle.await.map_err(SpawnBlockingError::JoinError) + } + /// Block the current (non-async) thread on the completion of some future. /// /// ## Warning diff --git a/consensus/types/src/core/chain_spec.rs b/consensus/types/src/core/chain_spec.rs index 90253c12aa..2f3b5da956 100644 --- a/consensus/types/src/core/chain_spec.rs +++ b/consensus/types/src/core/chain_spec.rs @@ -306,6 +306,7 @@ pub struct ChainSpec { pub max_blocks_by_root_request_deneb: usize, pub max_blobs_by_root_request: usize, pub max_data_columns_by_root_request: usize, + pub max_payload_envelopes_by_root_request: usize, /* * Application params @@ -969,6 +970,8 @@ impl ChainSpec { max_blobs_by_root_request_common(self.max_request_blob_sidecars); self.max_data_columns_by_root_request = max_data_columns_by_root_request_common::(self.max_request_blocks_deneb); + self.max_payload_envelopes_by_root_request = + max_blocks_by_root_request_common(self.max_request_payloads); self } @@ -1299,6 +1302,7 @@ impl ChainSpec { min_epochs_for_data_column_sidecars_requests: default_min_epochs_for_data_column_sidecars_requests(), max_data_columns_by_root_request: default_data_columns_by_root_request(), + max_payload_envelopes_by_root_request: default_max_payload_envelopes_by_root_request(), /* * Application specific @@ -1629,7 +1633,7 @@ impl ChainSpec { builder_payment_threshold_denominator: 10, min_builder_withdrawability_delay: Epoch::new(4096), max_request_payloads: 128, - + /* * Network specific */ @@ -1685,6 +1689,7 @@ impl ChainSpec { min_epochs_for_data_column_sidecars_requests: default_min_epochs_for_data_column_sidecars_requests(), max_data_columns_by_root_request: default_data_columns_by_root_request(), + max_payload_envelopes_by_root_request: default_max_payload_envelopes_by_root_request(), /* * Application specific @@ -2349,6 +2354,14 @@ fn default_data_columns_by_root_request() -> usize { max_data_columns_by_root_request_common::(default_max_request_blocks_deneb()) } +fn default_max_payload_envelopes_by_root_request() -> usize { + max_blocks_by_root_request_common(default_max_request_payloads()) +} + +fn default_max_request_payloads() -> u64 { + 128 +} + impl Default for Config { fn default() -> Self { let chain_spec = MainnetEthSpec::default_spec(); diff --git a/consensus/types/src/execution/execution_payload_envelope.rs b/consensus/types/src/execution/execution_payload_envelope.rs index 7f68dae037..64afaa8655 100644 --- a/consensus/types/src/execution/execution_payload_envelope.rs +++ b/consensus/types/src/execution/execution_payload_envelope.rs @@ -8,7 +8,9 @@ use ssz_derive::{Decode, Encode}; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; -#[derive(Debug, Clone, Serialize, Encode, Decode, Deserialize, TestRandom, TreeHash, Educe)] +#[derive( + Debug, Default, Clone, Serialize, Encode, Decode, Deserialize, TestRandom, TreeHash, Educe, +)] #[educe(PartialEq, Hash(bound(E: EthSpec)))] #[context_deserialize(ForkName)] #[serde(bound = "E: EthSpec")] diff --git a/consensus/types/src/execution/signed_execution_payload_envelope.rs b/consensus/types/src/execution/signed_execution_payload_envelope.rs index b1d949f863..65c657e878 100644 --- a/consensus/types/src/execution/signed_execution_payload_envelope.rs +++ b/consensus/types/src/execution/signed_execution_payload_envelope.rs @@ -8,6 +8,7 @@ use bls::{PublicKey, Signature}; use context_deserialize::context_deserialize; use educe::Educe; use serde::{Deserialize, Serialize}; +use ssz::{BYTES_PER_LENGTH_OFFSET, Encode as SszEncode}; use ssz_derive::{Decode, Encode}; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; @@ -22,6 +23,36 @@ pub struct SignedExecutionPayloadEnvelope { } impl SignedExecutionPayloadEnvelope { + /// Returns the minimum SSZ-encoded size (all variable-length fields empty). + pub fn min_size() -> usize { + Self { + message: ExecutionPayloadEnvelope::default(), + signature: Signature::empty(), + } + .as_ssz_bytes() + .len() + } + + /// Returns the maximum SSZ-encoded size. + #[allow(clippy::arithmetic_side_effects)] + pub fn max_size() -> usize { + // Start from the min size (all variable-length fields empty) + Self::min_size() + // ExecutionPayloadGloas variable-length fields: + + (E::max_extra_data_bytes() * ::ssz_fixed_len()) + + (E::max_transactions_per_payload() + * (BYTES_PER_LENGTH_OFFSET + E::max_bytes_per_transaction())) + + (E::max_withdrawals_per_payload() + * ::ssz_fixed_len()) + // ExecutionRequests variable-length fields: + + (E::max_deposit_requests_per_payload() + * ::ssz_fixed_len()) + + (E::max_withdrawal_requests_per_payload() + * ::ssz_fixed_len()) + + (E::max_consolidation_requests_per_payload() + * ::ssz_fixed_len()) + } + pub fn slot(&self) -> Slot { self.message.slot }