From 561898fc1c74c11a1a765f252ae504f35263f6ed Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Thu, 19 Feb 2026 06:08:56 +0530 Subject: [PATCH 01/13] Process head_chains in descending order of number of peers (#8859) N/A Another find by @gitToki. Sort the preferred_ids in descending order as originally intended from the comment in the function. Co-Authored-By: Pawan Dhananjay --- beacon_node/network/src/sync/range_sync/chain_collection.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 1d57ee6c3d..bd4dd6c181 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -351,7 +351,8 @@ impl ChainCollection { .iter() .map(|(id, chain)| (chain.available_peers(), !chain.is_syncing(), *id)) .collect::>(); - preferred_ids.sort_unstable(); + // Sort in descending order + preferred_ids.sort_unstable_by(|a, b| b.cmp(a)); let mut syncing_chains = SmallVec::<[Id; PARALLEL_HEAD_CHAINS]>::new(); for (_, _, id) in preferred_ids { From 4588971085840dc56cedc85ba0f12bcaa99be8ed Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Thu, 19 Feb 2026 12:57:53 +1100 Subject: [PATCH 02/13] Add sync batch state metrics (#8847) Co-Authored-By: Jimmy Chen --- beacon_node/network/src/metrics.rs | 7 +++++ .../network/src/sync/backfill_sync/mod.rs | 20 +++++++++++++- beacon_node/network/src/sync/batch.rs | 26 ++++++++++++++++++- .../src/sync/custody_backfill_sync/mod.rs | 21 +++++++++++++-- beacon_node/network/src/sync/manager.rs | 3 +++ .../network/src/sync/range_sync/chain.rs | 11 +++++++- .../src/sync/range_sync/chain_collection.rs | 21 +++++++++++++++ .../network/src/sync/range_sync/range.rs | 4 +++ 8 files changed, 108 insertions(+), 5 deletions(-) diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index cea06a28c8..0fa95b4758 
100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -462,6 +462,13 @@ pub static SYNCING_CHAIN_BATCH_AWAITING_PROCESSING: LazyLock> ]), ) }); +pub static SYNCING_CHAIN_BATCHES: LazyLock> = LazyLock::new(|| { + try_create_int_gauge_vec( + "sync_batches", + "Number of batches in sync chains by sync type and state", + &["sync_type", "state"], + ) +}); pub static SYNC_SINGLE_BLOCK_LOOKUPS: LazyLock> = LazyLock::new(|| { try_create_int_gauge( "sync_single_block_lookups", diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 9802ec56a1..f18d31863b 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -8,9 +8,11 @@ //! If a batch fails, the backfill sync cannot progress. In this scenario, we mark the backfill //! sync as failed, log an error and attempt to retry once a new peer joins the node. +use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::batch::{ - BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, + BatchConfig, BatchId, BatchInfo, BatchMetricsState, BatchOperationOutcome, + BatchProcessingResult, BatchState, }; use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::manager::BatchProcessResult; @@ -31,6 +33,7 @@ use std::collections::{ use std::hash::{Hash, Hasher}; use std::marker::PhantomData; use std::sync::Arc; +use strum::IntoEnumIterator; use tracing::{debug, error, info, warn}; use types::{ColumnIndex, Epoch, EthSpec}; @@ -1181,6 +1184,21 @@ impl BackFillSync { .epoch(T::EthSpec::slots_per_epoch()) } + pub fn register_metrics(&self) { + for state in BatchMetricsState::iter() { + let count = self + .batches + .values() + .filter(|b| b.state().metrics_state() == state) + .count(); + metrics::set_gauge_vec( + &metrics::SYNCING_CHAIN_BATCHES, + &["backfill", state.into()], + count as 
i64, + ); + } + } + /// Updates the global network state indicating the current state of a backfill sync. fn set_state(&self, state: BackFillState) { *self.network_globals.backfill_state.write() = state; diff --git a/beacon_node/network/src/sync/batch.rs b/beacon_node/network/src/sync/batch.rs index 8de386f5be..8f8d39ca4b 100644 --- a/beacon_node/network/src/sync/batch.rs +++ b/beacon_node/network/src/sync/batch.rs @@ -10,10 +10,22 @@ use std::marker::PhantomData; use std::ops::Sub; use std::time::Duration; use std::time::Instant; -use strum::Display; +use strum::{Display, EnumIter, IntoStaticStr}; use types::Slot; use types::{DataColumnSidecarList, Epoch, EthSpec}; +/// Batch states used as metrics labels. +#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumIter, IntoStaticStr)] +#[strum(serialize_all = "snake_case")] +pub enum BatchMetricsState { + AwaitingDownload, + Downloading, + AwaitingProcessing, + Processing, + AwaitingValidation, + Failed, +} + pub type BatchId = Epoch; /// Type of expected batch. @@ -142,6 +154,18 @@ impl BatchState { pub fn poison(&mut self) -> BatchState { std::mem::replace(self, BatchState::Poisoned) } + + /// Returns the metrics state for this batch. + pub fn metrics_state(&self) -> BatchMetricsState { + match self { + BatchState::AwaitingDownload => BatchMetricsState::AwaitingDownload, + BatchState::Downloading(_) => BatchMetricsState::Downloading, + BatchState::AwaitingProcessing(..) 
=> BatchMetricsState::AwaitingProcessing, + BatchState::Processing(_) => BatchMetricsState::Processing, + BatchState::AwaitingValidation(_) => BatchMetricsState::AwaitingValidation, + BatchState::Poisoned | BatchState::Failed => BatchMetricsState::Failed, + } + } } impl BatchInfo { diff --git a/beacon_node/network/src/sync/custody_backfill_sync/mod.rs b/beacon_node/network/src/sync/custody_backfill_sync/mod.rs index fa8b70c8b4..893aa849d3 100644 --- a/beacon_node/network/src/sync/custody_backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/custody_backfill_sync/mod.rs @@ -12,14 +12,16 @@ use lighthouse_network::{ }; use logging::crit; use std::hash::{DefaultHasher, Hash, Hasher}; +use strum::IntoEnumIterator; use tracing::{debug, error, info, info_span, warn}; use types::{DataColumnSidecarList, Epoch, EthSpec}; +use crate::metrics; use crate::sync::{ backfill_sync::{BACKFILL_EPOCHS_PER_BATCH, ProcessResult, SyncStart}, batch::{ - BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, - ByRangeRequestType, + BatchConfig, BatchId, BatchInfo, BatchMetricsState, BatchOperationOutcome, + BatchProcessingResult, BatchState, ByRangeRequestType, }, block_sidecar_coupling::CouplingError, manager::CustodyBatchProcessResult, @@ -1114,6 +1116,21 @@ impl CustodyBackFillSync { *self.network_globals.custody_sync_state.write() = state; } + pub fn register_metrics(&self) { + for state in BatchMetricsState::iter() { + let count = self + .batches + .values() + .filter(|b| b.state().metrics_state() == state) + .count(); + metrics::set_gauge_vec( + &metrics::SYNCING_CHAIN_BATCHES, + &["custody_backfill", state.into()], + count as i64, + ); + } + } + /// A fully synced peer has joined us. /// If we are in a failed state, update a local variable to indicate we are able to restart /// the failed sync on the next attempt. 
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 096ed9c328..c2faff5b62 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -784,6 +784,9 @@ impl SyncManager { } _ = register_metrics_interval.tick() => { self.network.register_metrics(); + self.range_sync.register_metrics(); + self.backfill_sync.register_metrics(); + self.custody_backfill_sync.register_metrics(); } _ = epoch_interval.tick() => { self.update_sync_state(); diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index d67d6468a9..61161ae6f4 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -3,7 +3,8 @@ use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::batch::BatchId; use crate::sync::batch::{ - BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, + BatchConfig, BatchInfo, BatchMetricsState, BatchOperationOutcome, BatchProcessingResult, + BatchState, }; use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::network_context::{RangeRequestId, RpcRequestSendError, RpcResponseError}; @@ -234,6 +235,14 @@ impl SyncingChain { .sum() } + /// Returns the number of batches in the given metrics state. + pub fn count_batches_in_state(&self, state: BatchMetricsState) -> usize { + self.batches + .values() + .filter(|b| b.state().metrics_state() == state) + .count() + } + /// Removes a peer from the chain. /// If the peer has active batches, those are considered failed and re-requested. 
pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult { diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index bd4dd6c181..b430b7c572 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -6,6 +6,7 @@ use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain}; use super::sync_type::RangeSyncType; use crate::metrics; +use crate::sync::batch::BatchMetricsState; use crate::sync::network_context::SyncNetworkContext; use beacon_chain::{BeaconChain, BeaconChainTypes}; use fnv::FnvHashMap; @@ -17,6 +18,7 @@ use smallvec::SmallVec; use std::collections::HashMap; use std::collections::hash_map::Entry; use std::sync::Arc; +use strum::IntoEnumIterator; use tracing::{debug, error}; use types::EthSpec; use types::{Epoch, Hash256, Slot}; @@ -516,6 +518,25 @@ impl ChainCollection { } } + pub fn register_metrics(&self) { + for (sync_type, chains) in [ + ("range_finalized", &self.finalized_chains), + ("range_head", &self.head_chains), + ] { + for state in BatchMetricsState::iter() { + let count: usize = chains + .values() + .map(|chain| chain.count_batches_in_state(state)) + .sum(); + metrics::set_gauge_vec( + &metrics::SYNCING_CHAIN_BATCHES, + &[sync_type, state.into()], + count as i64, + ); + } + } + } + fn update_metrics(&self) { metrics::set_gauge_vec( &metrics::SYNCING_CHAINS_COUNT, diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index c9656ad1d0..4c2123451a 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -371,6 +371,10 @@ where .update(network, &local, &mut self.awaiting_head_peers); } + pub fn register_metrics(&self) { + self.chains.register_metrics(); + } + /// Kickstarts sync. 
pub fn resume(&mut self, network: &mut SyncNetworkContext) { for (removed_chain, sync_type, remove_reason) in From ffc2b97699dff59c91dc07375e090e3473508240 Mon Sep 17 00:00:00 2001 From: Eitan Seri- Levi Date: Tue, 24 Feb 2026 00:55:29 -0800 Subject: [PATCH 03/13] Serve rpc by range and by root: --- beacon_node/beacon_chain/src/beacon_chain.rs | 53 ++++ .../execution_payload_envelope_streamer.rs | 138 +++++++++ beacon_node/beacon_chain/src/lib.rs | 1 + beacon_node/beacon_processor/src/lib.rs | 31 +- .../src/scheduler/work_queue.rs | 12 + .../src/peer_manager/mod.rs | 6 + .../lighthouse_network/src/rpc/codec.rs | 34 +++ .../lighthouse_network/src/rpc/config.rs | 28 ++ .../lighthouse_network/src/rpc/methods.rs | 70 ++++- .../lighthouse_network/src/rpc/protocol.rs | 51 ++++ .../src/rpc/rate_limiter.rs | 38 ++- .../src/service/api_types.rs | 15 + .../lighthouse_network/src/service/mod.rs | 38 +++ .../src/network_beacon_processor/mod.rs | 43 ++- .../network_beacon_processor/rpc_methods.rs | 284 ++++++++++++++++++ .../src/network_beacon_processor/tests.rs | 254 +++++++++++++++- beacon_node/network/src/router.rs | 23 ++ beacon_node/store/src/hot_cold_store.rs | 22 ++ beacon_node/store/src/lib.rs | 7 + 19 files changed, 1140 insertions(+), 8 deletions(-) create mode 100644 beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 26ad2e714b..6b773ac03a 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -30,6 +30,7 @@ use crate::early_attester_cache::EarlyAttesterCache; use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::events::ServerSentEventHandler; use crate::execution_payload::{NotifyExecutionLayer, PreparePayloadHandle, get_execution_payload}; +use crate::execution_payload_envelope_streamer::PayloadEnvelopeStreamer; use crate::fetch_blobs::EngineGetBlobsOutput; 
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx}; use crate::graffiti_calculator::{GraffitiCalculator, GraffitiSettings}; @@ -1125,6 +1126,58 @@ impl BeaconChain { .map_or_else(|| self.get_blobs(block_root), Ok) } + /// Returns the execution payload envelopes at the given roots, if any. + /// + /// Will also check any associated caches. The expected use for this function is *only* for returning payload envelopes requested + /// from P2P peers. + /// + /// ## Errors + /// + /// May return a database error. + #[allow(clippy::type_complexity)] + pub fn get_payload_envelopes_checking_caches( + self: &Arc, + block_roots: Vec, + ) -> Result< + impl Stream< + Item = ( + Hash256, + Arc>>, Error>>, + ), + >, + Error, + > { + Ok(PayloadEnvelopeStreamer::::new( + self.execution_layer.clone(), + self.store.clone(), + self.task_executor.clone(), + CheckCaches::Yes, + )? + .launch_stream(block_roots)) + } + + #[allow(clippy::type_complexity)] + pub fn get_payload_envelopes( + self: &Arc, + block_roots: Vec, + ) -> Result< + impl Stream< + Item = ( + Hash256, + Arc>>, Error>>, + ), + >, + Error, + > { + Ok(PayloadEnvelopeStreamer::::new( + self.execution_layer.clone(), + self.store.clone(), + self.task_executor.clone(), + CheckCaches::No, + )?
+ .launch_stream(block_roots)) + } + pub fn get_data_columns_checking_all_caches( &self, block_root: Hash256, diff --git a/beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs b/beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs new file mode 100644 index 0000000000..e6522d7beb --- /dev/null +++ b/beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs @@ -0,0 +1,138 @@ +use std::sync::Arc; + +use bls::Hash256; +use execution_layer::ExecutionLayer; +use futures::Stream; +use task_executor::TaskExecutor; +use tokio::sync::mpsc::{self, UnboundedSender}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tracing::debug; +use types::{EthSpec, SignedExecutionPayloadEnvelope}; + +use crate::{BeaconChainError, BeaconChainTypes, BeaconStore, beacon_block_streamer::CheckCaches}; + +type PayloadEnvelopeResult = + Result>>, BeaconChainError>; + +pub struct PayloadEnvelopeStreamer { + execution_layer: ExecutionLayer, + store: BeaconStore, + task_executor: TaskExecutor, + _check_caches: CheckCaches, +} + +// TODO(gloas) eventually we'll need to expand this to support loading blinded payload envelopes from the dsb +// and fetching the execution payload from the EL. See BlockStreamer impl as an example +impl PayloadEnvelopeStreamer { + pub fn new( + execution_layer_opt: Option>, + store: BeaconStore, + task_executor: TaskExecutor, + check_caches: CheckCaches, + ) -> Result, BeaconChainError> { + let execution_layer = execution_layer_opt + .as_ref() + .ok_or(BeaconChainError::ExecutionLayerMissing)? + .clone(); + + Ok(Arc::new(Self { + execution_layer, + store, + task_executor, + _check_caches: check_caches, + })) + } + + // TODO(gloas) simply a stub impl for now. 
Should check some exec payload envelope cache + // and return the envelope if it exists in the cache + fn check_payload_envelope_cache( + &self, + _beacon_block_root: Hash256, + ) -> Option>> { + // if self.check_caches == CheckCaches::Yes + None + } + + // used when the execution engine doesn't support the payload bodies methods + async fn stream_payload_envelopes_fallback( + self: Arc, + beacon_block_roots: Vec, + sender: UnboundedSender<(Hash256, Arc>)>, + ) { + debug!("Using slower fallback method of eth_getBlockByHash()"); + for beacon_block_root in beacon_block_roots { + let cached_envelope = self.check_payload_envelope_cache(beacon_block_root); + + let envelope_result = if cached_envelope.is_some() { + Ok(cached_envelope) + } else { + // TODO(gloas) we'll want to use the execution layer directly to call + // the engine api method eth_getBlockByHash() + self.store + .get_payload_envelope(&beacon_block_root) + .map(|opt_envelope| opt_envelope.map(Arc::new)) + .map_err(BeaconChainError::DBError) + }; + + if sender + .send((beacon_block_root, Arc::new(envelope_result))) + .is_err() + { + break; + } + } + } + + pub async fn stream( + self: Arc, + beacon_block_roots: Vec, + sender: UnboundedSender<(Hash256, Arc>)>, + ) { + match self + .execution_layer + .get_engine_capabilities(None) + .await + .map_err(Box::new) + .map_err(BeaconChainError::EngineGetCapabilititesFailed) + { + Ok(_engine_capabilities) => { + // TODO(gloas) should check engine capabilities for get_payload_bodies_by_range_v1 + self.stream_payload_envelopes_fallback(beacon_block_roots, sender) + .await; + } + Err(e) => { + send_errors(beacon_block_roots, sender, e).await; + } + } + } + + pub fn launch_stream( + self: Arc, + beacon_block_roots: Vec, + ) -> impl Stream>)> { + let (envelope_tx, envelope_rx) = mpsc::unbounded_channel(); + debug!( + envelopes = beacon_block_roots.len(), + "Launching a PayloadEnvelopeStreamer" + ); + let executor = self.task_executor.clone(); + executor.spawn( + 
self.stream(beacon_block_roots, envelope_tx), + "get_payload_envelopes_sender", + ); + UnboundedReceiverStream::new(envelope_rx) + } +} + +async fn send_errors( + beacon_block_roots: Vec, + sender: UnboundedSender<(Hash256, Arc>)>, + beacon_chain_error: BeaconChainError, +) { + let result = Arc::new(Err(beacon_chain_error)); + for beacon_block_root in beacon_block_roots { + if sender.send((beacon_block_root, result.clone())).is_err() { + break; + } + } +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 3b03395a66..d7253f7969 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -24,6 +24,7 @@ mod early_attester_cache; mod errors; pub mod events; pub mod execution_payload; +pub mod execution_payload_envelope_streamer; pub mod fetch_blobs; pub mod fork_choice_signal; pub mod fork_revert; diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 33a00bfa49..8a672e066d 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -408,6 +408,8 @@ pub enum Work { Status(BlockingFn), BlocksByRangeRequest(AsyncFn), BlocksByRootsRequest(AsyncFn), + PayloadEnvelopesByRangeRequest(AsyncFn), + PayloadEnvelopesByRootRequest(AsyncFn), BlobsByRangeRequest(BlockingFn), BlobsByRootsRequest(BlockingFn), DataColumnsByRootsRequest(BlockingFn), @@ -464,6 +466,8 @@ pub enum WorkType { Status, BlocksByRangeRequest, BlocksByRootsRequest, + PayloadEnvelopesByRangeRequest, + PayloadEnvelopesByRootRequest, BlobsByRangeRequest, BlobsByRootsRequest, DataColumnsByRootsRequest, @@ -522,6 +526,8 @@ impl Work { Work::Status(_) => WorkType::Status, Work::BlocksByRangeRequest(_) => WorkType::BlocksByRangeRequest, Work::BlocksByRootsRequest(_) => WorkType::BlocksByRootsRequest, + Work::PayloadEnvelopesByRangeRequest(_) => WorkType::PayloadEnvelopesByRangeRequest, + Work::PayloadEnvelopesByRootRequest(_) => 
WorkType::PayloadEnvelopesByRootRequest, Work::BlobsByRangeRequest(_) => WorkType::BlobsByRangeRequest, Work::BlobsByRootsRequest(_) => WorkType::BlobsByRootsRequest, Work::DataColumnsByRootsRequest(_) => WorkType::DataColumnsByRootsRequest, @@ -969,6 +975,12 @@ impl BeaconProcessor { Some(item) } else if let Some(item) = work_queues.dcbrange_queue.pop() { Some(item) + } else if let Some(item) = work_queues.payload_envelopes_brange_queue.pop() + { + Some(item) + } else if let Some(item) = work_queues.payload_envelopes_broots_queue.pop() + { + Some(item) // Check slashings after all other consensus messages so we prioritize // following head. // @@ -1155,6 +1167,12 @@ impl BeaconProcessor { Work::BlocksByRootsRequest { .. } => { work_queues.block_broots_queue.push(work, work_id) } + Work::PayloadEnvelopesByRangeRequest { .. } => work_queues + .payload_envelopes_brange_queue + .push(work, work_id), + Work::PayloadEnvelopesByRootRequest { .. } => work_queues + .payload_envelopes_broots_queue + .push(work, work_id), Work::BlobsByRangeRequest { .. 
} => { work_queues.blob_brange_queue.push(work, work_id) } @@ -1270,6 +1288,12 @@ impl BeaconProcessor { WorkType::Status => work_queues.status_queue.len(), WorkType::BlocksByRangeRequest => work_queues.block_brange_queue.len(), WorkType::BlocksByRootsRequest => work_queues.block_broots_queue.len(), + WorkType::PayloadEnvelopesByRangeRequest => { + work_queues.payload_envelopes_brange_queue.len() + } + WorkType::PayloadEnvelopesByRootRequest => { + work_queues.payload_envelopes_broots_queue.len() + } WorkType::BlobsByRangeRequest => work_queues.blob_brange_queue.len(), WorkType::BlobsByRootsRequest => work_queues.blob_broots_queue.len(), WorkType::DataColumnsByRootsRequest => work_queues.dcbroots_queue.len(), @@ -1456,9 +1480,10 @@ impl BeaconProcessor { | Work::DataColumnsByRangeRequest(process_fn) => { task_spawner.spawn_blocking(process_fn) } - Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => { - task_spawner.spawn_async(work) - } + Work::BlocksByRangeRequest(work) + | Work::BlocksByRootsRequest(work) + | Work::PayloadEnvelopesByRangeRequest(work) + | Work::PayloadEnvelopesByRootRequest(work) => task_spawner.spawn_async(work), Work::ChainSegmentBackfill(process_fn) => { if self.config.enable_backfill_rate_limiting { task_spawner.spawn_blocking_with_rayon(RayonPoolType::LowPriority, process_fn) diff --git a/beacon_node/beacon_processor/src/scheduler/work_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_queue.rs index 934659b304..0def1792e3 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_queue.rs @@ -134,6 +134,8 @@ pub struct BeaconProcessorQueueLengths { blob_brange_queue: usize, dcbroots_queue: usize, dcbrange_queue: usize, + payload_envelopes_brange_queue: usize, + payload_envelopes_broots_queue: usize, gossip_bls_to_execution_change_queue: usize, gossip_execution_payload_queue: usize, gossip_execution_payload_bid_queue: usize, @@ -204,6 +206,8 @@ impl 
BeaconProcessorQueueLengths { blob_brange_queue: 1024, dcbroots_queue: 1024, dcbrange_queue: 1024, + payload_envelopes_brange_queue: 1024, + payload_envelopes_broots_queue: 1024, gossip_bls_to_execution_change_queue: 16384, // TODO(EIP-7732): verify 1024 is preferable. I used same value as `gossip_block_queue` and `gossip_blob_queue` gossip_execution_payload_queue: 1024, @@ -253,6 +257,8 @@ pub struct WorkQueues { pub status_queue: FifoQueue>, pub block_brange_queue: FifoQueue>, pub block_broots_queue: FifoQueue>, + pub payload_envelopes_brange_queue: FifoQueue>, + pub payload_envelopes_broots_queue: FifoQueue>, pub blob_broots_queue: FifoQueue>, pub blob_brange_queue: FifoQueue>, pub dcbroots_queue: FifoQueue>, @@ -323,6 +329,10 @@ impl WorkQueues { let blob_brange_queue = FifoQueue::new(queue_lengths.blob_brange_queue); let dcbroots_queue = FifoQueue::new(queue_lengths.dcbroots_queue); let dcbrange_queue = FifoQueue::new(queue_lengths.dcbrange_queue); + let payload_envelopes_brange_queue = + FifoQueue::new(queue_lengths.payload_envelopes_brange_queue); + let payload_envelopes_broots_queue = + FifoQueue::new(queue_lengths.payload_envelopes_broots_queue); let gossip_bls_to_execution_change_queue = FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue); @@ -382,6 +392,8 @@ impl WorkQueues { blob_brange_queue, dcbroots_queue, dcbrange_queue, + payload_envelopes_brange_queue, + payload_envelopes_broots_queue, gossip_bls_to_execution_change_queue, gossip_execution_payload_queue, gossip_execution_payload_bid_queue, diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 43a44c85fc..2edd9de2d9 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -590,6 +590,8 @@ impl PeerManager { Protocol::BlocksByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, 
Protocol::BlobsByRange => PeerAction::MidToleranceError, + Protocol::PayloadEnvelopesByRange => PeerAction::MidToleranceError, + Protocol::PayloadEnvelopesByRoot => PeerAction::MidToleranceError, // Lighthouse does not currently make light client requests; therefore, this // is an unexpected scenario. We do not ban the peer for rate limiting. Protocol::LightClientBootstrap => return, @@ -615,6 +617,8 @@ impl PeerManager { Protocol::Ping => PeerAction::Fatal, Protocol::BlocksByRange => return, Protocol::BlocksByRoot => return, + Protocol::PayloadEnvelopesByRange => return, + Protocol::PayloadEnvelopesByRoot => return, Protocol::BlobsByRange => return, Protocol::BlobsByRoot => return, Protocol::DataColumnsByRoot => return, @@ -638,6 +642,8 @@ impl PeerManager { Protocol::Ping => PeerAction::LowToleranceError, Protocol::BlocksByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, + Protocol::PayloadEnvelopesByRange => PeerAction::MidToleranceError, + Protocol::PayloadEnvelopesByRoot => PeerAction::MidToleranceError, Protocol::BlobsByRange => PeerAction::MidToleranceError, Protocol::BlobsByRoot => PeerAction::MidToleranceError, Protocol::DataColumnsByRoot => PeerAction::MidToleranceError, diff --git a/beacon_node/lighthouse_network/src/rpc/codec.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs index d1a3182fad..ea615452da 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -15,6 +15,7 @@ use std::io::{Read, Write}; use std::marker::PhantomData; use std::sync::Arc; use tokio_util::codec::{Decoder, Encoder}; +use types::SignedExecutionPayloadEnvelope; use types::{ BlobSidecar, ChainSpec, DataColumnSidecar, DataColumnsByRootIdentifier, EthSpec, ForkContext, ForkName, Hash256, LightClientBootstrap, LightClientFinalityUpdate, @@ -76,6 +77,8 @@ impl SSZSnappyInboundCodec { }, RpcSuccessResponse::BlocksByRange(res) => res.as_ssz_bytes(), 
RpcSuccessResponse::BlocksByRoot(res) => res.as_ssz_bytes(), + RpcSuccessResponse::PayloadEnvelopesbyRange(res) => res.as_ssz_bytes(), + RpcSuccessResponse::PayloadEnvelopesByRoot(res) => res.as_ssz_bytes(), RpcSuccessResponse::BlobsByRange(res) => res.as_ssz_bytes(), RpcSuccessResponse::BlobsByRoot(res) => res.as_ssz_bytes(), RpcSuccessResponse::DataColumnsByRoot(res) => res.as_ssz_bytes(), @@ -356,6 +359,8 @@ impl Encoder> for SSZSnappyOutboundCodec { BlocksByRootRequest::V1(req) => req.block_roots.as_ssz_bytes(), BlocksByRootRequest::V2(req) => req.block_roots.as_ssz_bytes(), }, + RequestType::PayloadEnvelopesByRange(req) => req.as_ssz_bytes(), + RequestType::PayloadEnvelopesByRoot(req) => req.beacon_block_roots.as_ssz_bytes(), RequestType::BlobsByRange(req) => req.as_ssz_bytes(), RequestType::BlobsByRoot(req) => req.blob_ids.as_ssz_bytes(), RequestType::DataColumnsByRange(req) => req.as_ssz_bytes(), @@ -548,6 +553,19 @@ fn handle_rpc_request( )?, }), ))), + SupportedProtocol::PayloadEnvelopesByRangeV1 => { + Ok(Some(RequestType::PayloadEnvelopesByRange( + PayloadEnvelopesByRangeRequest::from_ssz_bytes(decoded_buffer)?, + ))) + } + SupportedProtocol::PayloadEnvelopesByRootV1 => Ok(Some( + RequestType::PayloadEnvelopesByRoot(PayloadEnvelopesByRootRequest { + beacon_block_roots: RuntimeVariableList::from_ssz_bytes( + decoded_buffer, + spec.max_request_blocks(current_fork), + )?, + }), + )), SupportedProtocol::BlobsByRangeV1 => Ok(Some(RequestType::BlobsByRange( BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?, ))), @@ -650,6 +668,16 @@ fn handle_rpc_response( SupportedProtocol::BlocksByRootV1 => Ok(Some(RpcSuccessResponse::BlocksByRoot(Arc::new( SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), )))), + SupportedProtocol::PayloadEnvelopesByRangeV1 => { + Ok(Some(RpcSuccessResponse::PayloadEnvelopesbyRange(Arc::new( + SignedExecutionPayloadEnvelope::from_ssz_bytes(decoded_buffer)?, + )))) + } + 
SupportedProtocol::PayloadEnvelopesByRootV1 => { + Ok(Some(RpcSuccessResponse::PayloadEnvelopesByRoot(Arc::new( + SignedExecutionPayloadEnvelope::from_ssz_bytes(decoded_buffer)?, + )))) + } SupportedProtocol::BlobsByRangeV1 => match fork_name { Some(fork_name) => { if fork_name.deneb_enabled() { @@ -1260,6 +1288,12 @@ mod tests { RequestType::BlobsByRange(blbrange) => { assert_eq!(decoded, RequestType::BlobsByRange(blbrange)) } + RequestType::PayloadEnvelopesByRange(perange) => { + assert_eq!(decoded, RequestType::PayloadEnvelopesByRange(perange)) + } + RequestType::PayloadEnvelopesByRoot(peroot) => { + assert_eq!(decoded, RequestType::PayloadEnvelopesByRoot(peroot)) + } RequestType::BlobsByRoot(bbroot) => { assert_eq!(decoded, RequestType::BlobsByRoot(bbroot)) } diff --git a/beacon_node/lighthouse_network/src/rpc/config.rs b/beacon_node/lighthouse_network/src/rpc/config.rs index b0ee6fea64..9e1c6541ec 100644 --- a/beacon_node/lighthouse_network/src/rpc/config.rs +++ b/beacon_node/lighthouse_network/src/rpc/config.rs @@ -89,6 +89,8 @@ pub struct RateLimiterConfig { pub(super) goodbye_quota: Quota, pub(super) blocks_by_range_quota: Quota, pub(super) blocks_by_root_quota: Quota, + pub(super) payload_envelopes_by_range_quota: Quota, + pub(super) payload_envelopes_by_root_quota: Quota, pub(super) blobs_by_range_quota: Quota, pub(super) blobs_by_root_quota: Quota, pub(super) data_columns_by_root_quota: Quota, @@ -111,6 +113,10 @@ impl RateLimiterConfig { Quota::n_every(NonZeroU64::new(128).unwrap(), 10); pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(NonZeroU64::new(128).unwrap(), 10); + pub const DEFAULT_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA: Quota = + Quota::n_every(NonZeroU64::new(128).unwrap(), 10); + pub const DEFAULT_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA: Quota = + Quota::n_every(NonZeroU64::new(128).unwrap(), 10); // `DEFAULT_BLOCKS_BY_RANGE_QUOTA` * (target + 1) to account for high usage pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = 
Quota::n_every(NonZeroU64::new(896).unwrap(), 10); @@ -137,6 +143,8 @@ impl Default for RateLimiterConfig { goodbye_quota: Self::DEFAULT_GOODBYE_QUOTA, blocks_by_range_quota: Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA, blocks_by_root_quota: Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA, + payload_envelopes_by_range_quota: Self::DEFAULT_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA, + payload_envelopes_by_root_quota: Self::DEFAULT_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA, blobs_by_range_quota: Self::DEFAULT_BLOBS_BY_RANGE_QUOTA, blobs_by_root_quota: Self::DEFAULT_BLOBS_BY_ROOT_QUOTA, data_columns_by_root_quota: Self::DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA, @@ -169,6 +177,14 @@ impl Debug for RateLimiterConfig { .field("goodbye", fmt_q!(&self.goodbye_quota)) .field("blocks_by_range", fmt_q!(&self.blocks_by_range_quota)) .field("blocks_by_root", fmt_q!(&self.blocks_by_root_quota)) + .field( + "payload_envelopes_by_range", + fmt_q!(&self.payload_envelopes_by_range_quota), + ) + .field( + "payload_envelopes_by_root", + fmt_q!(&self.payload_envelopes_by_root_quota), + ) .field("blobs_by_range", fmt_q!(&self.blobs_by_range_quota)) .field("blobs_by_root", fmt_q!(&self.blobs_by_root_quota)) .field( @@ -197,6 +213,8 @@ impl FromStr for RateLimiterConfig { let mut goodbye_quota = None; let mut blocks_by_range_quota = None; let mut blocks_by_root_quota = None; + let mut payload_envelopes_by_range_quota = None; + let mut payload_envelopes_by_root_quota = None; let mut blobs_by_range_quota = None; let mut blobs_by_root_quota = None; let mut data_columns_by_root_quota = None; @@ -214,6 +232,12 @@ impl FromStr for RateLimiterConfig { Protocol::Goodbye => goodbye_quota = goodbye_quota.or(quota), Protocol::BlocksByRange => blocks_by_range_quota = blocks_by_range_quota.or(quota), Protocol::BlocksByRoot => blocks_by_root_quota = blocks_by_root_quota.or(quota), + Protocol::PayloadEnvelopesByRange => { + payload_envelopes_by_range_quota = payload_envelopes_by_range_quota.or(quota) + } + Protocol::PayloadEnvelopesByRoot => { + 
payload_envelopes_by_root_quota = payload_envelopes_by_root_quota.or(quota) + } Protocol::BlobsByRange => blobs_by_range_quota = blobs_by_range_quota.or(quota), Protocol::BlobsByRoot => blobs_by_root_quota = blobs_by_root_quota.or(quota), Protocol::DataColumnsByRoot => { @@ -250,6 +274,10 @@ impl FromStr for RateLimiterConfig { .unwrap_or(Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA), blocks_by_root_quota: blocks_by_root_quota .unwrap_or(Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA), + payload_envelopes_by_range_quota: payload_envelopes_by_range_quota + .unwrap_or(Self::DEFAULT_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA), + payload_envelopes_by_root_quota: payload_envelopes_by_root_quota + .unwrap_or(Self::DEFAULT_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA), blobs_by_range_quota: blobs_by_range_quota .unwrap_or(Self::DEFAULT_BLOBS_BY_RANGE_QUOTA), blobs_by_root_quota: blobs_by_root_quota.unwrap_or(Self::DEFAULT_BLOBS_BY_ROOT_QUOTA), diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 5a9a683b75..a07ad8d183 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -17,7 +17,8 @@ use types::light_client::consts::MAX_REQUEST_LIGHT_CLIENT_UPDATES; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnsByRootIdentifier, Epoch, EthSpec, ForkContext, Hash256, LightClientBootstrap, LightClientFinalityUpdate, - LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock, Slot, + LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, Slot, }; /// Maximum length of error message. @@ -362,6 +363,16 @@ impl BlocksByRangeRequest { } } +/// Request a number of execution payload envelopes from a peer. +#[derive(Encode, Decode, Clone, Debug, PartialEq)] +pub struct PayloadEnvelopesByRangeRequest { + /// The starting slot to request execution payload envelopes. 
+ pub start_slot: u64, + + /// The number of slots from the start slot. + pub count: u64, +} + /// Request a number of beacon blobs from a peer. #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BlobsByRangeRequest { @@ -505,6 +516,31 @@ impl BlocksByRootRequest { } } +/// Request a number of execution payload envelopes from a peer +#[derive(Clone, Debug, PartialEq)] +pub struct PayloadEnvelopesByRootRequest { + /// The list of beacon block roots used to request execution payload envelopes. + pub beacon_block_roots: RuntimeVariableList, +} + +impl PayloadEnvelopesByRootRequest { + pub fn new( + beacon_block_roots: Vec, + fork_context: &ForkContext, + ) -> Result { + let max_requests_envelopes = fork_context + .spec + .max_request_blocks(fork_context.current_fork_name()); + + let beacon_block_roots = + RuntimeVariableList::new(beacon_block_roots, max_requests_envelopes).map_err(|e| { + format!("ExecutionPayloadEnvelopesByRootRequest too many beacon block roots: {e:?}") + })?; + + Ok(Self { beacon_block_roots }) + } +} + /// Request a number of beacon blocks and blobs from a peer. #[derive(Clone, Debug, PartialEq)] pub struct BlobsByRootRequest { @@ -588,6 +624,13 @@ pub enum RpcSuccessResponse { /// A response to a get BLOCKS_BY_ROOT request. BlocksByRoot(Arc>), + /// A response to a get EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE request. A None response signifies + /// the end of the batch. + PayloadEnvelopesbyRange(Arc>), + + /// A response to a get EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT request. + PayloadEnvelopesByRoot(Arc>), + /// A response to a get BLOBS_BY_RANGE request BlobsByRange(Arc>), @@ -628,6 +671,12 @@ pub enum ResponseTermination { /// Blocks by root stream termination. BlocksByRoot, + /// Execution payload envelopes by range stream termination. + PayloadEnvelopesByRange, + + /// Execution payload envelopes by root stream termination. + PayloadEnvelopesByRoot, + /// Blobs by range stream termination. 
BlobsByRange, @@ -649,6 +698,8 @@ impl ResponseTermination { match self { ResponseTermination::BlocksByRange => Protocol::BlocksByRange, ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot, + ResponseTermination::PayloadEnvelopesByRange => Protocol::PayloadEnvelopesByRange, + ResponseTermination::PayloadEnvelopesByRoot => Protocol::PayloadEnvelopesByRoot, ResponseTermination::BlobsByRange => Protocol::BlobsByRange, ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot, ResponseTermination::DataColumnsByRoot => Protocol::DataColumnsByRoot, @@ -744,6 +795,8 @@ impl RpcSuccessResponse { RpcSuccessResponse::Status(_) => Protocol::Status, RpcSuccessResponse::BlocksByRange(_) => Protocol::BlocksByRange, RpcSuccessResponse::BlocksByRoot(_) => Protocol::BlocksByRoot, + RpcSuccessResponse::PayloadEnvelopesbyRange(_) => Protocol::PayloadEnvelopesByRange, + RpcSuccessResponse::PayloadEnvelopesByRoot(_) => Protocol::PayloadEnvelopesByRoot, RpcSuccessResponse::BlobsByRange(_) => Protocol::BlobsByRange, RpcSuccessResponse::BlobsByRoot(_) => Protocol::BlobsByRoot, RpcSuccessResponse::DataColumnsByRoot(_) => Protocol::DataColumnsByRoot, @@ -762,6 +815,7 @@ impl RpcSuccessResponse { pub fn slot(&self) -> Option { match self { Self::BlocksByRange(r) | Self::BlocksByRoot(r) => Some(r.slot()), + Self::PayloadEnvelopesByRoot(r) | Self::PayloadEnvelopesbyRange(r) => Some(r.slot()), Self::BlobsByRange(r) | Self::BlobsByRoot(r) => Some(r.slot()), Self::DataColumnsByRange(r) | Self::DataColumnsByRoot(r) => Some(r.slot()), Self::LightClientBootstrap(r) => Some(r.get_slot()), @@ -812,6 +866,20 @@ impl std::fmt::Display for RpcSuccessResponse { RpcSuccessResponse::BlocksByRoot(block) => { write!(f, "BlocksByRoot: Block slot: {}", block.slot()) } + RpcSuccessResponse::PayloadEnvelopesbyRange(envelope) => { + write!( + f, + "ExecutionPayloadEnvelopesByRange: Envelope slot: {}", + envelope.slot() + ) + } + RpcSuccessResponse::PayloadEnvelopesByRoot(envelope) => { + write!( + f, + 
"ExecutionPayloadEnvelopesByRoot: Envelope slot: {}", + envelope.slot() + ) + } RpcSuccessResponse::BlobsByRange(blob) => { write!(f, "BlobsByRange: Blob slot: {}", blob.slot()) } diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index b75ca72eda..dd9982056b 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -242,6 +242,12 @@ pub enum Protocol { /// The `BlobsByRange` protocol name. #[strum(serialize = "blob_sidecars_by_range")] BlobsByRange, + /// The `ExecutionPayloadEnvelopesByRoot` protocol name. + #[strum(serialize = "execution_payload_envelopes_by_root")] + PayloadEnvelopesByRoot, + /// The `ExecutionPayloadEnvelopesByRange` protocol name. + #[strum(serialize = "execution_payload_envelopes_by_range")] + PayloadEnvelopesByRange, /// The `BlobsByRoot` protocol name. #[strum(serialize = "blob_sidecars_by_root")] BlobsByRoot, @@ -277,6 +283,8 @@ impl Protocol { Protocol::Goodbye => None, Protocol::BlocksByRange => Some(ResponseTermination::BlocksByRange), Protocol::BlocksByRoot => Some(ResponseTermination::BlocksByRoot), + Protocol::PayloadEnvelopesByRange => Some(ResponseTermination::PayloadEnvelopesByRange), + Protocol::PayloadEnvelopesByRoot => Some(ResponseTermination::PayloadEnvelopesByRoot), Protocol::BlobsByRange => Some(ResponseTermination::BlobsByRange), Protocol::BlobsByRoot => Some(ResponseTermination::BlobsByRoot), Protocol::DataColumnsByRoot => Some(ResponseTermination::DataColumnsByRoot), @@ -307,6 +315,8 @@ pub enum SupportedProtocol { BlocksByRangeV2, BlocksByRootV1, BlocksByRootV2, + PayloadEnvelopesByRangeV1, + PayloadEnvelopesByRootV1, BlobsByRangeV1, BlobsByRootV1, DataColumnsByRootV1, @@ -329,6 +339,8 @@ impl SupportedProtocol { SupportedProtocol::GoodbyeV1 => "1", SupportedProtocol::BlocksByRangeV1 => "1", SupportedProtocol::BlocksByRangeV2 => "2", + SupportedProtocol::PayloadEnvelopesByRangeV1 => "1", + 
SupportedProtocol::PayloadEnvelopesByRootV1 => "1", SupportedProtocol::BlocksByRootV1 => "1", SupportedProtocol::BlocksByRootV2 => "2", SupportedProtocol::BlobsByRangeV1 => "1", @@ -355,6 +367,8 @@ impl SupportedProtocol { SupportedProtocol::BlocksByRangeV2 => Protocol::BlocksByRange, SupportedProtocol::BlocksByRootV1 => Protocol::BlocksByRoot, SupportedProtocol::BlocksByRootV2 => Protocol::BlocksByRoot, + SupportedProtocol::PayloadEnvelopesByRangeV1 => Protocol::PayloadEnvelopesByRange, + SupportedProtocol::PayloadEnvelopesByRootV1 => Protocol::PayloadEnvelopesByRoot, SupportedProtocol::BlobsByRangeV1 => Protocol::BlobsByRange, SupportedProtocol::BlobsByRootV1 => Protocol::BlobsByRoot, SupportedProtocol::DataColumnsByRootV1 => Protocol::DataColumnsByRoot, @@ -511,6 +525,11 @@ impl ProtocolId { ::ssz_fixed_len(), ), Protocol::BlocksByRoot => RpcLimits::new(0, spec.max_blocks_by_root_request), + Protocol::PayloadEnvelopesByRange => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), + Protocol::PayloadEnvelopesByRoot => RpcLimits::new(0, spec.max_blocks_by_root_request), Protocol::BlobsByRange => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -549,6 +568,12 @@ impl ProtocolId { Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork_name()), Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork_name()), + Protocol::PayloadEnvelopesByRange => { + rpc_block_limits_by_fork(fork_context.current_fork_name()) + } + Protocol::PayloadEnvelopesByRoot => { + rpc_block_limits_by_fork(fork_context.current_fork_name()) + } Protocol::BlobsByRange => rpc_blob_limits::(), Protocol::BlobsByRoot => rpc_blob_limits::(), Protocol::DataColumnsByRoot => { @@ -586,6 +611,8 @@ impl ProtocolId { match self.versioned_protocol { SupportedProtocol::BlocksByRangeV2 | SupportedProtocol::BlocksByRootV2 + | SupportedProtocol::PayloadEnvelopesByRangeV1 + | 
SupportedProtocol::PayloadEnvelopesByRootV1 | SupportedProtocol::BlobsByRangeV1 | SupportedProtocol::BlobsByRootV1 | SupportedProtocol::DataColumnsByRootV1 @@ -737,6 +764,8 @@ pub enum RequestType { Goodbye(GoodbyeReason), BlocksByRange(OldBlocksByRangeRequest), BlocksByRoot(BlocksByRootRequest), + PayloadEnvelopesByRange(PayloadEnvelopesByRangeRequest), + PayloadEnvelopesByRoot(PayloadEnvelopesByRootRequest), BlobsByRange(BlobsByRangeRequest), BlobsByRoot(BlobsByRootRequest), DataColumnsByRoot(DataColumnsByRootRequest), @@ -760,6 +789,8 @@ impl RequestType { RequestType::Goodbye(_) => 0, RequestType::BlocksByRange(req) => *req.count(), RequestType::BlocksByRoot(req) => req.block_roots().len() as u64, + RequestType::PayloadEnvelopesByRange(req) => req.count, + RequestType::PayloadEnvelopesByRoot(req) => req.beacon_block_roots.len() as u64, RequestType::BlobsByRange(req) => req.max_blobs_requested(digest_epoch, spec), RequestType::BlobsByRoot(req) => req.blob_ids.len() as u64, RequestType::DataColumnsByRoot(req) => req.max_requested() as u64, @@ -789,6 +820,8 @@ impl RequestType { BlocksByRootRequest::V1(_) => SupportedProtocol::BlocksByRootV1, BlocksByRootRequest::V2(_) => SupportedProtocol::BlocksByRootV2, }, + RequestType::PayloadEnvelopesByRange(_) => SupportedProtocol::PayloadEnvelopesByRangeV1, + RequestType::PayloadEnvelopesByRoot(_) => SupportedProtocol::PayloadEnvelopesByRootV1, RequestType::BlobsByRange(_) => SupportedProtocol::BlobsByRangeV1, RequestType::BlobsByRoot(_) => SupportedProtocol::BlobsByRootV1, RequestType::DataColumnsByRoot(_) => SupportedProtocol::DataColumnsByRootV1, @@ -820,6 +853,8 @@ impl RequestType { // variants that have `multiple_responses()` can have values. 
RequestType::BlocksByRange(_) => ResponseTermination::BlocksByRange, RequestType::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, + RequestType::PayloadEnvelopesByRange(_) => ResponseTermination::PayloadEnvelopesByRange, + RequestType::PayloadEnvelopesByRoot(_) => ResponseTermination::PayloadEnvelopesByRoot, RequestType::BlobsByRange(_) => ResponseTermination::BlobsByRange, RequestType::BlobsByRoot(_) => ResponseTermination::BlobsByRoot, RequestType::DataColumnsByRoot(_) => ResponseTermination::DataColumnsByRoot, @@ -854,6 +889,14 @@ impl RequestType { ProtocolId::new(SupportedProtocol::BlocksByRootV2, Encoding::SSZSnappy), ProtocolId::new(SupportedProtocol::BlocksByRootV1, Encoding::SSZSnappy), ], + RequestType::PayloadEnvelopesByRange(_) => vec![ProtocolId::new( + SupportedProtocol::PayloadEnvelopesByRangeV1, + Encoding::SSZSnappy, + )], + RequestType::PayloadEnvelopesByRoot(_) => vec![ProtocolId::new( + SupportedProtocol::PayloadEnvelopesByRootV1, + Encoding::SSZSnappy, + )], RequestType::BlobsByRange(_) => vec![ProtocolId::new( SupportedProtocol::BlobsByRangeV1, Encoding::SSZSnappy, @@ -905,6 +948,8 @@ impl RequestType { RequestType::BlocksByRange(_) => false, RequestType::BlocksByRoot(_) => false, RequestType::BlobsByRange(_) => false, + RequestType::PayloadEnvelopesByRange(_) => false, + RequestType::PayloadEnvelopesByRoot(_) => false, RequestType::BlobsByRoot(_) => false, RequestType::DataColumnsByRoot(_) => false, RequestType::DataColumnsByRange(_) => false, @@ -1015,6 +1060,12 @@ impl std::fmt::Display for RequestType { RequestType::Goodbye(reason) => write!(f, "Goodbye: {}", reason), RequestType::BlocksByRange(req) => write!(f, "Blocks by range: {}", req), RequestType::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), + RequestType::PayloadEnvelopesByRange(req) => { + write!(f, "Payload envelopes by range: {:?}", req) + } + RequestType::PayloadEnvelopesByRoot(req) => { + write!(f, "Payload envelopes by root: {:?}", req) + } 
RequestType::BlobsByRange(req) => write!(f, "Blobs by range: {:?}", req), RequestType::BlobsByRoot(req) => write!(f, "Blobs by root: {:?}", req), RequestType::DataColumnsByRoot(req) => write!(f, "Data columns by root: {:?}", req), diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index 2407038bc3..b0fd9f4dd5 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -109,7 +109,11 @@ pub struct RPCRateLimiter { blbrange_rl: Limiter, /// BlobsByRoot rate limiter. blbroot_rl: Limiter, - /// DataColumnssByRoot rate limiter. + /// PayloadEnvelopesByRange rate limiter. + perange_rl: Limiter, + /// PayloadEnvelopesByRoot rate limiter. + peroots_rl: Limiter, + /// DataColumnsByRoot rate limiter. dcbroot_rl: Limiter, /// DataColumnsByRange rate limiter. dcbrange_rl: Limiter, @@ -148,6 +152,10 @@ pub struct RPCRateLimiterBuilder { bbrange_quota: Option, /// Quota for the BlocksByRoot protocol. bbroots_quota: Option, + /// Quota for the ExecutionPayloadEnvelopesByRange protocol. + perange_quota: Option, + /// Quota for the ExecutionPayloadEnvelopesByRoot protocol. + peroots_quota: Option, /// Quota for the BlobsByRange protocol. blbrange_quota: Option, /// Quota for the BlobsByRoot protocol. 
@@ -177,6 +185,8 @@ impl RPCRateLimiterBuilder { Protocol::Goodbye => self.goodbye_quota = q, Protocol::BlocksByRange => self.bbrange_quota = q, Protocol::BlocksByRoot => self.bbroots_quota = q, + Protocol::PayloadEnvelopesByRange => self.perange_quota = q, + Protocol::PayloadEnvelopesByRoot => self.peroots_quota = q, Protocol::BlobsByRange => self.blbrange_quota = q, Protocol::BlobsByRoot => self.blbroot_quota = q, Protocol::DataColumnsByRoot => self.dcbroot_quota = q, @@ -201,6 +211,12 @@ impl RPCRateLimiterBuilder { let bbrange_quota = self .bbrange_quota .ok_or("BlocksByRange quota not specified")?; + let perange_quota = self + .perange_quota + .ok_or("PayloadEnvelopesByRange quota not specified")?; + let peroots_quota = self + .peroots_quota + .ok_or("PayloadEnvelopesByRoot quota not specified")?; let lc_bootstrap_quota = self .lcbootstrap_quota .ok_or("LightClientBootstrap quota not specified")?; @@ -236,6 +252,8 @@ impl RPCRateLimiterBuilder { let goodbye_rl = Limiter::from_quota(goodbye_quota)?; let bbroots_rl = Limiter::from_quota(bbroots_quota)?; let bbrange_rl = Limiter::from_quota(bbrange_quota)?; + let perange_rl = Limiter::from_quota(perange_quota)?; + let peroots_rl = Limiter::from_quota(peroots_quota)?; let blbrange_rl = Limiter::from_quota(blbrange_quota)?; let blbroot_rl = Limiter::from_quota(blbroots_quota)?; let dcbroot_rl = Limiter::from_quota(dcbroot_quota)?; @@ -259,6 +277,8 @@ impl RPCRateLimiterBuilder { goodbye_rl, bbroots_rl, bbrange_rl, + perange_rl, + peroots_rl, blbrange_rl, blbroot_rl, dcbroot_rl, @@ -312,6 +332,8 @@ impl RPCRateLimiter { goodbye_quota, blocks_by_range_quota, blocks_by_root_quota, + payload_envelopes_by_range_quota, + payload_envelopes_by_root_quota, blobs_by_range_quota, blobs_by_root_quota, data_columns_by_root_quota, @@ -329,6 +351,14 @@ impl RPCRateLimiter { .set_quota(Protocol::Goodbye, goodbye_quota) .set_quota(Protocol::BlocksByRange, blocks_by_range_quota) .set_quota(Protocol::BlocksByRoot, 
blocks_by_root_quota) + .set_quota( + Protocol::PayloadEnvelopesByRange, + payload_envelopes_by_range_quota, + ) + .set_quota( + Protocol::PayloadEnvelopesByRoot, + payload_envelopes_by_root_quota, + ) .set_quota(Protocol::BlobsByRange, blobs_by_range_quota) .set_quota(Protocol::BlobsByRoot, blobs_by_root_quota) .set_quota(Protocol::DataColumnsByRoot, data_columns_by_root_quota) @@ -376,6 +406,8 @@ impl RPCRateLimiter { Protocol::Goodbye => &mut self.goodbye_rl, Protocol::BlocksByRange => &mut self.bbrange_rl, Protocol::BlocksByRoot => &mut self.bbroots_rl, + Protocol::PayloadEnvelopesByRange => &mut self.perange_rl, + Protocol::PayloadEnvelopesByRoot => &mut self.peroots_rl, Protocol::BlobsByRange => &mut self.blbrange_rl, Protocol::BlobsByRoot => &mut self.blbroot_rl, Protocol::DataColumnsByRoot => &mut self.dcbroot_rl, @@ -400,6 +432,8 @@ impl RPCRateLimiter { status_rl, bbrange_rl, bbroots_rl, + perange_rl, + peroots_rl, blbrange_rl, blbroot_rl, dcbroot_rl, @@ -417,6 +451,8 @@ impl RPCRateLimiter { status_rl.prune(time_since_start); bbrange_rl.prune(time_since_start); bbroots_rl.prune(time_since_start); + perange_rl.prune(time_since_start); + peroots_rl.prune(time_since_start); blbrange_rl.prune(time_since_start); blbroot_rl.prune(time_since_start); dcbrange_rl.prune(time_since_start); diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index d0323bab52..6277fc7dec 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use types::{ BlobSidecar, DataColumnSidecar, Epoch, EthSpec, LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, }; pub type Id = u32; @@ -160,6 +161,10 @@ pub enum Response { DataColumnsByRange(Option>>), /// A response to a get BLOCKS_BY_ROOT request. 
BlocksByRoot(Option>>), + /// A response to a get `EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT` request. + PayloadEnvelopesByRoot(Option>>), + /// A response to a get `EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE` request. + PayloadEnvelopesByRange(Option>>), /// A response to a get BLOBS_BY_ROOT request. BlobsByRoot(Option>>), /// A response to a get DATA_COLUMN_SIDECARS_BY_ROOT request. @@ -185,6 +190,16 @@ impl std::convert::From> for RpcResponse { Some(b) => RpcResponse::Success(RpcSuccessResponse::BlocksByRange(b)), None => RpcResponse::StreamTermination(ResponseTermination::BlocksByRange), }, + Response::PayloadEnvelopesByRoot(r) => match r { + Some(p) => RpcResponse::Success(RpcSuccessResponse::PayloadEnvelopesByRoot(p)), + None => RpcResponse::StreamTermination(ResponseTermination::PayloadEnvelopesByRoot), + }, + Response::PayloadEnvelopesByRange(r) => match r { + Some(p) => RpcResponse::Success(RpcSuccessResponse::PayloadEnvelopesbyRange(p)), + None => { + RpcResponse::StreamTermination(ResponseTermination::PayloadEnvelopesByRange) + } + }, Response::BlobsByRoot(r) => match r { Some(b) => RpcResponse::Success(RpcSuccessResponse::BlobsByRoot(b)), None => RpcResponse::StreamTermination(ResponseTermination::BlobsByRoot), diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 94e0ad0710..304f7f5baf 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1525,6 +1525,28 @@ impl Network { request_type, }) } + RequestType::PayloadEnvelopesByRange(_) => { + metrics::inc_counter_vec( + &metrics::TOTAL_RPC_REQUESTS, + &["payload_envelopes_by_range"], + ); + Some(NetworkEvent::RequestReceived { + peer_id, + inbound_request_id, + request_type, + }) + } + RequestType::PayloadEnvelopesByRoot(_) => { + metrics::inc_counter_vec( + &metrics::TOTAL_RPC_REQUESTS, + &["payload_envelopes_by_root"], + ); + Some(NetworkEvent::RequestReceived { + peer_id, + 
inbound_request_id, + request_type, + }) + } RequestType::BlobsByRange(_) => { metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blobs_by_range"]); Some(NetworkEvent::RequestReceived { @@ -1639,6 +1661,16 @@ impl Network { RpcSuccessResponse::BlocksByRoot(resp) => { self.build_response(id, peer_id, Response::BlocksByRoot(Some(resp))) } + RpcSuccessResponse::PayloadEnvelopesbyRange(resp) => self.build_response( + id, + peer_id, + Response::PayloadEnvelopesByRange(Some(resp)), + ), + RpcSuccessResponse::PayloadEnvelopesByRoot(resp) => self.build_response( + id, + peer_id, + Response::PayloadEnvelopesByRoot(Some(resp)), + ), RpcSuccessResponse::BlobsByRoot(resp) => { self.build_response(id, peer_id, Response::BlobsByRoot(Some(resp))) } @@ -1673,6 +1705,12 @@ impl Network { let response = match termination { ResponseTermination::BlocksByRange => Response::BlocksByRange(None), ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None), + ResponseTermination::PayloadEnvelopesByRange => { + Response::PayloadEnvelopesByRange(None) + } + ResponseTermination::PayloadEnvelopesByRoot => { + Response::PayloadEnvelopesByRoot(None) + } ResponseTermination::BlobsByRange => Response::BlobsByRange(None), ResponseTermination::BlobsByRoot => Response::BlobsByRoot(None), ResponseTermination::DataColumnsByRoot => Response::DataColumnsByRoot(None), diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index e1adf860de..039b22db7c 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -14,7 +14,8 @@ use beacon_processor::{ use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, - LightClientUpdatesByRangeRequest, + LightClientUpdatesByRangeRequest, PayloadEnvelopesByRangeRequest, + 
PayloadEnvelopesByRootRequest, }; use lighthouse_network::service::api_types::CustodyBackfillBatchId; use lighthouse_network::{ @@ -686,6 +687,46 @@ impl NetworkBeaconProcessor { }) } + /// Create a new work event to process `PayloadEnvelopesByRootRequest`s from the RPC network. + pub fn send_payload_envelopes_by_roots_request( + self: &Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, // Use ResponseId here + request: PayloadEnvelopesByRootRequest, + ) -> Result<(), Error> { + let processor = self.clone(); + let process_fn = async move { + processor + .handle_payload_envelopes_by_root_request(peer_id, inbound_request_id, request) + .await; + }; + + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::PayloadEnvelopesByRootRequest(Box::pin(process_fn)), + }) + } + + /// Create a new work event to process `PayloadEnvelopesByRangeRequest`s from the RPC network. + pub fn send_payload_envelopes_by_range_request( + self: &Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + request: PayloadEnvelopesByRangeRequest, + ) -> Result<(), Error> { + let processor = self.clone(); + let process_fn = async move { + processor + .handle_payload_envelopes_by_range_request(peer_id, inbound_request_id, request) + .await; + }; + + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::PayloadEnvelopesByRangeRequest(Box::pin(process_fn)), + }) + } + /// Create a new work event to process `BlobsByRangeRequest`s from the RPC network. 
pub fn send_blobs_by_range_request( self: &Arc, diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 279870d444..412814a5cb 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -7,6 +7,7 @@ use beacon_chain::{BeaconChainError, BeaconChainTypes, BlockProcessStatus, WhenS use itertools::{Itertools, process_results}; use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, + PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest, }; use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, ReportSource, Response, SyncInfo}; @@ -254,6 +255,113 @@ impl NetworkBeaconProcessor { Ok(()) } + /// Handle a `ExecutionPayloadEnvelopesByRoot` request from the peer. + #[instrument( + name = "lh_handle_payload_envelopes_by_root_request", + parent = None, + level = "debug", + skip_all, + fields(peer_id = %peer_id, client = tracing::field::Empty) + )] + pub async fn handle_payload_envelopes_by_root_request( + self: Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + request: PayloadEnvelopesByRootRequest, + ) { + let client = self.network_globals.client(&peer_id); + Span::current().record("client", field::display(client.kind)); + + self.terminate_response_stream( + peer_id, + inbound_request_id, + self.clone() + .handle_payload_envelopes_by_root_request_inner( + peer_id, + inbound_request_id, + request, + ) + .await, + Response::PayloadEnvelopesByRoot, + ); + } + + /// Handle a `ExecutionPayloadEnvelopes` request from the peer. 
+ async fn handle_payload_envelopes_by_root_request_inner( + self: Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + request: PayloadEnvelopesByRootRequest, + ) -> Result<(), (RpcErrorResponse, &'static str)> { + let log_results = |peer_id, requested_envelopes, send_envelope_count| { + debug!( + %peer_id, + requested = requested_envelopes, + returned = %send_envelope_count, + "ExecutionPayloadEnvelopes outgoing response processed" + ); + }; + + let requested_envelopes = request.beacon_block_roots.len(); + let mut envelope_stream = match self + .chain + .get_payload_envelopes_checking_caches(request.beacon_block_roots.to_vec()) + { + Ok(envelope_stream) => envelope_stream, + Err(e) => { + error!( error = ?e, "Error getting payload envelope stream"); + return Err(( + RpcErrorResponse::ServerError, + "Error getting payload envelope stream", + )); + } + }; + // Fetching payload envelopes is async because it may have to hit the execution layer for payloads. + let mut send_envelope_count = 0; + while let Some((root, result)) = envelope_stream.next().await { + match result.as_ref() { + Ok(Some(envelope)) => { + self.send_response( + peer_id, + inbound_request_id, + Response::PayloadEnvelopesByRoot(Some(envelope.clone())), + ); + send_envelope_count += 1; + } + Ok(None) => { + debug!( + %peer_id, + request_root = ?root, + "Peer requested unknown payload envelope" + ); + } + Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { + debug!( + block_root = ?root, + reason = "execution layer not synced", + "Failed to fetch execution payload for payload envelopes by root request" + ); + log_results(peer_id, requested_envelopes, send_envelope_count); + return Err(( + RpcErrorResponse::ResourceUnavailable, + "Execution layer not synced", + )); + } + Err(e) => { + debug!( + ?peer_id, + request_root = ?root, + error = ?e, + "Error fetching payload envelope for peer" + ); + } + } + } + log_results(peer_id, requested_envelopes, send_envelope_count); + + 
Ok(()) + } + /// Handle a `BlobsByRoot` request from the peer. #[instrument( name = "lh_handle_blobs_by_root_request", @@ -983,6 +1091,182 @@ impl NetworkBeaconProcessor { .collect::>()) } + /// Handle a `ExecutionPayloadEnvelopesByRange` request from the peer. + #[instrument( + name = "lh_handle_payload_envelopes_by_range_request", + parent = None, + level = "debug", + skip_all, + fields(peer_id = %peer_id, client = tracing::field::Empty) + )] + pub async fn handle_payload_envelopes_by_range_request( + self: Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + req: PayloadEnvelopesByRangeRequest, + ) { + let client = self.network_globals.client(&peer_id); + Span::current().record("client", field::display(client.kind)); + + self.terminate_response_stream( + peer_id, + inbound_request_id, + self.clone() + .handle_payload_envelopes_by_range_request_inner(peer_id, inbound_request_id, req) + .await, + Response::PayloadEnvelopesByRange, + ); + } + + /// Handle a `ExecutionPayloadEnvelopesByRange` request from the peer. + async fn handle_payload_envelopes_by_range_request_inner( + self: Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + req: PayloadEnvelopesByRangeRequest, + ) -> Result<(), (RpcErrorResponse, &'static str)> { + let req_start_slot = req.start_slot; + let req_count = req.count; + + debug!( + %peer_id, + count = req_count, + start_slot = %req_start_slot, + "Received ExecutionPayloadEnvelopesByRange Request" + ); + + // Spawn a blocking handle since get_block_roots_for_slot_range takes a sync lock on the + // fork-choice. + let network_beacon_processor = self.clone(); + let block_roots = self + .executor + .spawn_blocking_handle( + move || { + network_beacon_processor.get_block_roots_for_slot_range( + req_start_slot, + req_count, + "ExecutionPayloadEnvelopesByRange", + ) + }, + "get_block_roots_for_slot_range", + ) + .ok_or((RpcErrorResponse::ServerError, "shutting down"))? 
+ .await + .map_err(|_| (RpcErrorResponse::ServerError, "tokio join"))?? + .iter() + .map(|(root, _)| *root) + .collect::>(); + + let current_slot = self + .chain + .slot() + .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); + + let log_results = |peer_id, payloads_sent| { + if payloads_sent < (req_count as usize) { + debug!( + %peer_id, + msg = "Failed to return all requested payload envelopes", + start_slot = %req_start_slot, + %current_slot, + requested = req_count, + returned = payloads_sent, + "ExecutionPayloadEnvelopesByRange outgoing response processed" + ); + } else { + debug!( + %peer_id, + start_slot = %req_start_slot, + %current_slot, + requested = req_count, + returned = payloads_sent, + "ExecutionPayloadEnvelopesByRange outgoing response processed" + ); + } + }; + + let mut envelope_stream = match self.chain.get_payload_envelopes(block_roots) { + Ok(envelope_stream) => envelope_stream, + Err(e) => { + error!(error = ?e, "Error getting payload envelope stream"); + return Err((RpcErrorResponse::ServerError, "Iterator error")); + } + }; + + // Fetching payload envelopes is async because it may have to hit the execution layer for payloads. 
+ let mut envelopes_sent = 0; + while let Some((root, result)) = envelope_stream.next().await { + match result.as_ref() { + Ok(Some(envelope)) => { + // Due to skip slots, blocks could be out of the range, we ensure they + // are in the range before sending + if envelope.slot() >= req_start_slot + && envelope.slot() < req_start_slot + req.count + { + envelopes_sent += 1; + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + inbound_request_id, + response: Response::PayloadEnvelopesByRange(Some(envelope.clone())), + }); + } + } + Ok(None) => { + error!( + request = ?req, + %peer_id, + request_root = ?root, + "Envelope in the chain is not in the store" + ); + log_results(peer_id, envelopes_sent); + return Err((RpcErrorResponse::ServerError, "Database inconsistency")); + } + Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { + debug!( + block_root = ?root, + reason = "execution layer not synced", + "Failed to fetch execution payload for envelope by range request" + ); + log_results(peer_id, envelopes_sent); + // send the stream terminator + return Err(( + RpcErrorResponse::ResourceUnavailable, + "Execution layer not synced", + )); + } + Err(e) => { + if matches!( + e, + BeaconChainError::ExecutionLayerErrorPayloadReconstruction(_block_hash, boxed_error) + if matches!(**boxed_error, execution_layer::Error::EngineError(_)) + ) { + warn!( + info = "this may occur occasionally when the EE is busy", + block_root = ?root, + error = ?e, + "Error rebuilding payload for peer" + ); + } else { + error!( + block_root = ?root, + error = ?e, + "Error fetching payload envelope for peer" + ); + } + log_results(peer_id, envelopes_sent); + // send the stream terminator + return Err(( + RpcErrorResponse::ServerError, + "Failed fetching payload envelopes", + )); + } + } + } + + log_results(peer_id, envelopes_sent); + Ok(()) + } + /// Handle a `BlobsByRange` request from the peer. 
#[instrument( name = "lh_handle_blobs_by_range_request", diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 4b0ca0d46c..7e75f3be04 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -19,11 +19,14 @@ use beacon_chain::test_utils::{ }; use beacon_chain::{BeaconChain, WhenSlotSkipped}; use beacon_processor::{work_reprocessing_queue::*, *}; +use bls::Signature; +use fixed_bytes::FixedBytesExtended; use itertools::Itertools; use libp2p::gossipsub::MessageAcceptance; use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, MetaDataV3, + PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest, }; use lighthouse_network::{ Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response, @@ -41,8 +44,9 @@ use std::time::Duration; use tokio::sync::mpsc; use types::{ AttesterSlashing, BlobSidecar, ChainSpec, DataColumnSidecarList, DataColumnSubnetId, Epoch, - EthSpec, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, - SignedVoluntaryExit, SingleAttestation, Slot, SubnetId, + EthSpec, ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, Hash256, + MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, SignedVoluntaryExit, SingleAttestation, Slot, SubnetId, }; use types::{ BlobSidecarList, @@ -534,6 +538,29 @@ impl TestRig { .unwrap(); } + pub fn enqueue_payload_envelopes_by_range_request(&self, start_slot: u64, count: u64) { + self.network_beacon_processor + .send_payload_envelopes_by_range_request( + PeerId::random(), + InboundRequestId::new_unchecked(42, 24), + PayloadEnvelopesByRangeRequest { start_slot, count }, + ) + .unwrap(); + } + + pub fn enqueue_payload_envelopes_by_root_request( + 
&self, + beacon_block_roots: RuntimeVariableList, + ) { + self.network_beacon_processor + .send_payload_envelopes_by_roots_request( + PeerId::random(), + InboundRequestId::new_unchecked(42, 24), + PayloadEnvelopesByRootRequest { beacon_block_roots }, + ) + .unwrap(); + } + pub fn enqueue_backfill_batch(&self, epoch: Epoch) { self.network_beacon_processor .send_chain_segment( @@ -2102,3 +2129,226 @@ async fn test_data_columns_by_range_no_duplicates_with_skip_slots() { unique_roots.len(), ); } + +/// Create a test `SignedExecutionPayloadEnvelope` with the given slot and beacon block root. +fn make_test_payload_envelope( + slot: Slot, + beacon_block_root: Hash256, +) -> SignedExecutionPayloadEnvelope { + SignedExecutionPayloadEnvelope { + message: ExecutionPayloadEnvelope { + payload: ExecutionPayloadGloas::default(), + execution_requests: ExecutionRequests::default(), + builder_index: 0, + beacon_block_root, + slot, + state_root: Hash256::zero(), + }, + signature: Signature::empty(), + } +} + +#[tokio::test] +async fn test_payload_envelopes_by_range() { + // Only test when Gloas fork is scheduled + if test_spec::().gloas_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new(64).await; + let start_slot = 0; + let slot_count = 32; + + // Manually store payload envelopes for each block in the range + let mut expected_count = 0; + for slot in start_slot..slot_count { + if let Some(root) = rig + .chain + .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) + .unwrap() + { + let envelope = make_test_payload_envelope(Slot::new(slot), root); + rig.chain + .store + .put_payload_envelope(&root, envelope) + .unwrap(); + expected_count += 1; + } + } + + rig.enqueue_payload_envelopes_by_range_request(start_slot, slot_count); + + let mut actual_count = 0; + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::PayloadEnvelopesByRange(envelope), + inbound_request_id: _, + } = next + { + 
if envelope.is_some() { + actual_count += 1; + } else { + break; + } + } else if let NetworkMessage::SendErrorResponse { .. } = next { + // Error response terminates the stream + break; + } else { + panic!("unexpected message {:?}", next); + } + } + assert_eq!(expected_count, actual_count); +} + +#[tokio::test] +async fn test_payload_envelopes_by_root() { + // Only test when Gloas fork is scheduled + if test_spec::().gloas_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new(64).await; + + let block_root = rig + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + + // Manually store a payload envelope for this block + let envelope = make_test_payload_envelope(Slot::new(1), block_root); + rig.chain + .store + .put_payload_envelope(&block_root, envelope) + .unwrap(); + + let roots = RuntimeVariableList::new(vec![block_root], 1).unwrap(); + rig.enqueue_payload_envelopes_by_root_request(roots); + + let mut actual_count = 0; + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::PayloadEnvelopesByRoot(envelope), + inbound_request_id: _, + } = next + { + if envelope.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + assert_eq!(1, actual_count); +} + +#[tokio::test] +async fn test_payload_envelopes_by_root_unknown_root_returns_empty() { + // Only test when Gloas fork is scheduled + if test_spec::().gloas_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new(64).await; + + // Request envelope for a root that has no stored envelope + let block_root = rig + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + + // Don't store any envelope — the handler should return 0 envelopes + let roots = RuntimeVariableList::new(vec![block_root], 1).unwrap(); + rig.enqueue_payload_envelopes_by_root_request(roots); + + let mut 
actual_count = 0; + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::PayloadEnvelopesByRoot(envelope), + inbound_request_id: _, + } = next + { + if envelope.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + assert_eq!(0, actual_count); +} + +#[tokio::test] +async fn test_payload_envelopes_by_range_no_duplicates_with_skip_slots() { + // Only test when Gloas fork is scheduled + if test_spec::().gloas_fork_epoch.is_none() { + return; + }; + + // Build a chain of 128 slots (4 epochs) with skip slots at positions 5 and 6. + let skip_slots: HashSet = [5, 6].into_iter().collect(); + let mut rig = TestRig::new_with_skip_slots(128, &skip_slots).await; + + let start_slot = 0u64; + let slot_count = 10u64; + + // Store payload envelopes for all blocks in the range (skipping the skip slots) + for slot in start_slot..slot_count { + if let Some(root) = rig + .chain + .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) + .unwrap() + { + let envelope = make_test_payload_envelope(Slot::new(slot), root); + rig.chain + .store + .put_payload_envelope(&root, envelope) + .unwrap(); + } + } + + rig.enqueue_payload_envelopes_by_range_request(start_slot, slot_count); + + let mut beacon_block_roots: Vec = Vec::new(); + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::PayloadEnvelopesByRange(envelope), + inbound_request_id: _, + } = next + { + if let Some(env) = envelope { + beacon_block_roots.push(env.beacon_block_root()); + } else { + break; + } + } else if let NetworkMessage::SendErrorResponse { .. 
} = next { + break; + } else { + panic!("unexpected message {:?}", next); + } + } + + assert!( + !beacon_block_roots.is_empty(), + "Should have received at least some payload envelopes" + ); + + // Skip slots should not cause duplicate envelopes for the same block root + let unique_roots: HashSet<_> = beacon_block_roots.iter().collect(); + assert_eq!( + beacon_block_roots.len(), + unique_roots.len(), + "Response contained duplicate block roots: got {} envelopes but only {} unique roots", + beacon_block_roots.len(), + unique_roots.len(), + ); +} diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 8373dec322..f82214810c 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -229,6 +229,24 @@ impl Router { request, ), ), + RequestType::PayloadEnvelopesByRoot(request) => self + .handle_beacon_processor_send_result( + self.network_beacon_processor + .send_payload_envelopes_by_roots_request( + peer_id, + inbound_request_id, + request, + ), + ), + RequestType::PayloadEnvelopesByRange(request) => self + .handle_beacon_processor_send_result( + self.network_beacon_processor + .send_payload_envelopes_by_range_request( + peer_id, + inbound_request_id, + request, + ), + ), RequestType::BlobsByRange(request) => self.handle_beacon_processor_send_result( self.network_beacon_processor.send_blobs_by_range_request( peer_id, @@ -309,6 +327,11 @@ impl Router { Response::DataColumnsByRange(data_column) => { self.on_data_columns_by_range_response(peer_id, app_request_id, data_column); } + // TODO(EIP-7732): implement outgoing payload envelopes by range and root + // responses once sync manager requests them. 
+ Response::PayloadEnvelopesByRoot(_) | Response::PayloadEnvelopesByRange(_) => { + unreachable!() + } // Light client responses should not be received Response::LightClientBootstrap(_) | Response::LightClientOptimisticUpdate(_) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 6e165702a2..557d2a34d6 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1,3 +1,4 @@ +use crate::DatabasePayloadEnvelope; use crate::config::{OnDiskStoreConfig, StoreConfig}; use crate::database::interface::BeaconNodeBackend; use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator}; @@ -745,6 +746,27 @@ impl, Cold: ItemStore> HotColdDB .map_err(|e| e.into()) } + pub fn try_get_full_payload_envelope( + &self, + block_root: &Hash256, + ) -> Result>, Error> { + // TODO(gloas) metrics + // metrics::inc_counter(&metrics::PAYLOAD_ENVELOPE_GET_COUNT); + + // Load the execution payload envelope + // TODO(gloas) we'll want to implement a way to load a blinded envelope + let Some(envelope) = self.get_payload_envelope(block_root)? else { + return Ok(None); + }; + + Ok(Some(DatabasePayloadEnvelope::Full(envelope))) + + // TODO(gloas) implement the logic described below (see `try_get_full_block`) + // If the payload envelope is after the split point then we should have the full execution payload + // stored in the database. If it isn't but payload pruning is disabled, try to load it + // on-demand. 
+ } + pub fn get_payload_envelope( &self, block_root: &Hash256, diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 3363eb800c..3351b182ec 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -390,6 +390,13 @@ pub enum DatabaseBlock { Blinded(SignedBeaconBlock>), } +/// An execution payload envelope from the database +// TODO(gloas) implement blinded variant +pub enum DatabasePayloadEnvelope { + Full(SignedExecutionPayloadEnvelope), + Blinded(SignedExecutionPayloadEnvelope), +} + impl DBColumn { pub fn as_str(self) -> &'static str { self.into() From e966428ef047df5c9e0efb1a09425fac031f8aa3 Mon Sep 17 00:00:00 2001 From: Eitan Seri- Levi Date: Tue, 24 Feb 2026 01:03:07 -0800 Subject: [PATCH 04/13] fix spelling issues --- .../src/execution_payload_envelope_streamer.rs | 4 ++-- beacon_node/lighthouse_network/src/rpc/codec.rs | 4 ++-- beacon_node/lighthouse_network/src/rpc/methods.rs | 10 +++++----- .../lighthouse_network/src/service/api_types.rs | 4 ++-- beacon_node/lighthouse_network/src/service/mod.rs | 2 +- .../src/network_beacon_processor/rpc_methods.rs | 2 +- 6 files changed, 13 insertions(+), 13 deletions(-) diff --git a/beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs b/beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs index e6522d7beb..a30682e3a5 100644 --- a/beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs +++ b/beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs @@ -21,7 +21,7 @@ pub struct PayloadEnvelopeStreamer { _check_caches: CheckCaches, } -// TODO(gloas) eventually we'll need to expand this to support loading blinded payload envelopes from the dsb +// TODO(gloas) eventually we'll need to expand this to support loading blinded payload envelopes from the db // and fetching the execution payload from the EL. 
See BlockStreamer impl as an example impl PayloadEnvelopeStreamer { pub fn new( @@ -43,7 +43,7 @@ impl PayloadEnvelopeStreamer { })) } - // TODO(gloas) simply a strub impl for now. Should check some exec payload envelope cache + // TODO(gloas) simply a stub impl for now. Should check some exec payload envelope cache // and return the envelope if it exists in the cache fn check_payload_envelope_cache( &self, diff --git a/beacon_node/lighthouse_network/src/rpc/codec.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs index ea615452da..ceb7090b21 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -77,7 +77,7 @@ impl SSZSnappyInboundCodec { }, RpcSuccessResponse::BlocksByRange(res) => res.as_ssz_bytes(), RpcSuccessResponse::BlocksByRoot(res) => res.as_ssz_bytes(), - RpcSuccessResponse::PayloadEnvelopesbyRange(res) => res.as_ssz_bytes(), + RpcSuccessResponse::PayloadEnvelopesByRange(res) => res.as_ssz_bytes(), RpcSuccessResponse::PayloadEnvelopesByRoot(res) => res.as_ssz_bytes(), RpcSuccessResponse::BlobsByRange(res) => res.as_ssz_bytes(), RpcSuccessResponse::BlobsByRoot(res) => res.as_ssz_bytes(), @@ -669,7 +669,7 @@ fn handle_rpc_response( SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), )))), SupportedProtocol::PayloadEnvelopesByRangeV1 => { - Ok(Some(RpcSuccessResponse::PayloadEnvelopesbyRange(Arc::new( + Ok(Some(RpcSuccessResponse::PayloadEnvelopesByRange(Arc::new( SignedExecutionPayloadEnvelope::from_ssz_bytes(decoded_buffer)?, )))) } diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index a07ad8d183..8113a0b5c1 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -516,7 +516,7 @@ impl BlocksByRootRequest { } } -/// Reqwuest a number of execution payload envelopes from a peer +/// Request a number of execution payload envelopes from a 
peer. #[derive(Clone, Debug, PartialEq)] pub struct PayloadEnvelopesByRootRequest { /// The list of beacon block roots used to request execution payload envelopes. @@ -626,7 +626,7 @@ pub enum RpcSuccessResponse { /// A response to a get EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE request. A None response signifies /// the end of the batch. - PayloadEnvelopesbyRange(Arc>), + PayloadEnvelopesByRange(Arc>), /// A response to a get EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT request. PayloadEnvelopesByRoot(Arc>), @@ -795,7 +795,7 @@ impl RpcSuccessResponse { RpcSuccessResponse::Status(_) => Protocol::Status, RpcSuccessResponse::BlocksByRange(_) => Protocol::BlocksByRange, RpcSuccessResponse::BlocksByRoot(_) => Protocol::BlocksByRoot, - RpcSuccessResponse::PayloadEnvelopesbyRange(_) => Protocol::PayloadEnvelopesByRange, + RpcSuccessResponse::PayloadEnvelopesByRange(_) => Protocol::PayloadEnvelopesByRange, RpcSuccessResponse::PayloadEnvelopesByRoot(_) => Protocol::PayloadEnvelopesByRoot, RpcSuccessResponse::BlobsByRange(_) => Protocol::BlobsByRange, RpcSuccessResponse::BlobsByRoot(_) => Protocol::BlobsByRoot, @@ -815,7 +815,7 @@ impl RpcSuccessResponse { pub fn slot(&self) -> Option { match self { Self::BlocksByRange(r) | Self::BlocksByRoot(r) => Some(r.slot()), - Self::PayloadEnvelopesByRoot(r) | Self::PayloadEnvelopesbyRange(r) => Some(r.slot()), + Self::PayloadEnvelopesByRoot(r) | Self::PayloadEnvelopesByRange(r) => Some(r.slot()), Self::BlobsByRange(r) | Self::BlobsByRoot(r) => Some(r.slot()), Self::DataColumnsByRange(r) | Self::DataColumnsByRoot(r) => Some(r.slot()), Self::LightClientBootstrap(r) => Some(r.get_slot()), @@ -866,7 +866,7 @@ impl std::fmt::Display for RpcSuccessResponse { RpcSuccessResponse::BlocksByRoot(block) => { write!(f, "BlocksByRoot: Block slot: {}", block.slot()) } - RpcSuccessResponse::PayloadEnvelopesbyRange(envelope) => { + RpcSuccessResponse::PayloadEnvelopesByRange(envelope) => { write!( f, "ExecutionPayloadEnvelopesByRange: Envelope slot: {}", diff 
--git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 6277fc7dec..486a443857 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -163,7 +163,7 @@ pub enum Response { BlocksByRoot(Option>>), /// A response to a get `EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT` request. PayloadEnvelopesByRoot(Option>>), - /// A response to a get `EXECUTION_PAYLOAD_ENVELOPES_BYH_RANGE` request. + /// A response to a get `EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE` request. PayloadEnvelopesByRange(Option>>), /// A response to a get BLOBS_BY_ROOT request. BlobsByRoot(Option>>), @@ -195,7 +195,7 @@ impl std::convert::From> for RpcResponse { None => RpcResponse::StreamTermination(ResponseTermination::PayloadEnvelopesByRoot), }, Response::PayloadEnvelopesByRange(r) => match r { - Some(p) => RpcResponse::Success(RpcSuccessResponse::PayloadEnvelopesbyRange(p)), + Some(p) => RpcResponse::Success(RpcSuccessResponse::PayloadEnvelopesByRange(p)), None => { RpcResponse::StreamTermination(ResponseTermination::PayloadEnvelopesByRange) } diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 304f7f5baf..6c503d3e10 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1661,7 +1661,7 @@ impl Network { RpcSuccessResponse::BlocksByRoot(resp) => { self.build_response(id, peer_id, Response::BlocksByRoot(Some(resp))) } - RpcSuccessResponse::PayloadEnvelopesbyRange(resp) => self.build_response( + RpcSuccessResponse::PayloadEnvelopesByRange(resp) => self.build_response( id, peer_id, Response::PayloadEnvelopesByRange(Some(resp)), diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 412814a5cb..88d3697c9d 100644 --- 
a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -286,7 +286,7 @@ impl NetworkBeaconProcessor { ); } - /// Handle a `ExecutionPayloadEnvelopes` request from the peer. + /// Handle a `ExecutionPayloadEnvelopesByRoot` request from the peer. async fn handle_payload_envelopes_by_root_request_inner( self: Arc, peer_id: PeerId, From 45912fb810d9e93cdc313658111605f6af3515f7 Mon Sep 17 00:00:00 2001 From: Eitan Seri- Levi Date: Tue, 24 Feb 2026 01:13:35 -0800 Subject: [PATCH 05/13] Delete dead code --- beacon_node/store/src/hot_cold_store.rs | 22 ---------------------- beacon_node/store/src/lib.rs | 7 ------- 2 files changed, 29 deletions(-) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 557d2a34d6..6e165702a2 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1,4 +1,3 @@ -use crate::DatabasePayloadEnvelope; use crate::config::{OnDiskStoreConfig, StoreConfig}; use crate::database::interface::BeaconNodeBackend; use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator}; @@ -746,27 +745,6 @@ impl, Cold: ItemStore> HotColdDB .map_err(|e| e.into()) } - pub fn try_get_full_payload_envelope( - &self, - block_root: &Hash256, - ) -> Result>, Error> { - // TODO(gloas) metrics - // metrics::inc_counter(&metrics::PAYLOAD_ENVELOPE_GET_COUNT); - - // Load the execution payload envelope - // TODO(gloas) we'll want to implement a way to load a blinded envelope - let Some(envelope) = self.get_payload_envelope(block_root)? else { - return Ok(None); - }; - - Ok(Some(DatabasePayloadEnvelope::Full(envelope))) - - // TODO(gloas) implement the logic described below (see `try_get_full_block`) - // If the payload envelope is after the split point then we should have the full execution payload - // stored in the database. 
If it isn't but payload pruning is disabled, try to load it - // on-demand. - } - pub fn get_payload_envelope( &self, block_root: &Hash256, diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 3351b182ec..3363eb800c 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -390,13 +390,6 @@ pub enum DatabaseBlock { Blinded(SignedBeaconBlock>), } -/// An execution payload envelope from the database -// TODO(gloas) implement blinded variant -pub enum DatabasePayloadEnvelope { - Full(SignedExecutionPayloadEnvelope), - Blinded(SignedExecutionPayloadEnvelope), -} - impl DBColumn { pub fn as_str(self) -> &'static str { self.into() From 2f43d234d88a9c43518f032d5e4b34fe1c889068 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 27 Feb 2026 08:05:57 +1100 Subject: [PATCH 06/13] Add CI job to check for deleted files (#8727) Co-Authored-By: Michael Sproul Co-Authored-By: Michael Sproul --- .github/CODEOWNERS | 1 + .github/forbidden-files.txt | 7 +++++++ .github/workflows/test-suite.yml | 22 ++++++++++++++++++++++ 3 files changed, 30 insertions(+) create mode 100644 .github/forbidden-files.txt diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index cdec442276..e9ec8740a0 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,3 +1,4 @@ /beacon_node/network/ @jxs /beacon_node/lighthouse_network/ @jxs /beacon_node/store/ @michaelsproul +/.github/forbidden-files.txt @michaelsproul diff --git a/.github/forbidden-files.txt b/.github/forbidden-files.txt new file mode 100644 index 0000000000..ec89bd2e4b --- /dev/null +++ b/.github/forbidden-files.txt @@ -0,0 +1,7 @@ +# Files that have been intentionally deleted and should not be re-added. +# This prevents accidentally reviving files during botched merges. +# Add one file path per line (relative to repo root). 
+ +beacon_node/beacon_chain/src/otb_verification_service.rs +beacon_node/store/src/partial_beacon_state.rs +beacon_node/store/src/consensus_context.rs diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index 72ea9d41ae..d9efbfc148 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -72,6 +72,27 @@ jobs: steps: - name: Check that the pull request is not targeting the stable branch run: test ${{ github.base_ref }} != "stable" + + forbidden-files-check: + name: forbidden-files-check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Check for forbidden files + run: | + if [ -f .github/forbidden-files.txt ]; then + status=0 + while IFS= read -r file || [ -n "$file" ]; do + # Skip comments and empty lines + [[ "$file" =~ ^#.*$ || -z "$file" ]] && continue + if [ -f "$file" ]; then + echo "::error::Forbidden file exists: $file" + status=1 + fi + done < .github/forbidden-files.txt + exit $status + fi + release-tests-ubuntu: name: release-tests-ubuntu needs: [check-labels] @@ -430,6 +451,7 @@ jobs: needs: [ 'check-labels', 'target-branch-check', + 'forbidden-files-check', 'release-tests-ubuntu', 'beacon-chain-tests', 'op-pool-tests', From 8cf6ffac4b1954bfc61939e116afd5e2ab349dbb Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Wed, 25 Feb 2026 23:10:41 +1100 Subject: [PATCH 07/13] Update yanked keccak 0.1.5 to 0.1.6 (#8900) Co-Authored-By: Jimmy Chen --- Cargo.lock | 7 +++---- Cargo.toml | 1 + 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7d75f5c197..dd1637045b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4832,9 +4832,9 @@ dependencies = [ [[package]] name = "keccak" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecc2af9a1119c51f12a14607e783cb977bde58bc069ff0c3da1095e635d70654" +checksum = "cb26cec98cce3a3d96cbb7bced3c4b16e3d13f27ec56dbd62cbc8f39cfb9d653" dependencies = [ 
"cpufeatures", ] @@ -10607,8 +10607,7 @@ dependencies = [ [[package]] name = "yamux" version = "0.13.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deab71f2e20691b4728b349c6cee8fc7223880fa67b6b4f92225ec32225447e5" +source = "git+https://github.com/sigp/rust-yamux?rev=575b17c0f44f4253079a6bafaa2de74ca1d6dfaa#575b17c0f44f4253079a6bafaa2de74ca1d6dfaa" dependencies = [ "futures", "log", diff --git a/Cargo.toml b/Cargo.toml index aac26e060b..61caacf5df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -303,3 +303,4 @@ debug = true [patch.crates-io] quick-protobuf = { git = "https://github.com/sigp/quick-protobuf.git", rev = "681f413312404ab6e51f0b46f39b0075c6f4ebfd" } +yamux = { git = "https://github.com/sigp/rust-yamux", rev = "575b17c0f44f4253079a6bafaa2de74ca1d6dfaa" } From 95f12d0927831971ca9e1acf7ca7e87fdda56f4a Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Fri, 27 Feb 2026 16:48:56 +1100 Subject: [PATCH 08/13] Bump version to v8.1.1 (#8853) --- Cargo.lock | 14 +++++++------- Cargo.toml | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dd1637045b..40c550f4c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,7 +4,7 @@ version = 4 [[package]] name = "account_manager" -version = "8.1.0" +version = "8.1.1" dependencies = [ "account_utils", "bls", @@ -1276,7 +1276,7 @@ dependencies = [ [[package]] name = "beacon_node" -version = "8.1.0" +version = "8.1.1" dependencies = [ "account_utils", "beacon_chain", @@ -1513,7 +1513,7 @@ dependencies = [ [[package]] name = "boot_node" -version = "8.1.0" +version = "8.1.1" dependencies = [ "beacon_node", "bytes", @@ -4897,7 +4897,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "lcli" -version = "8.1.0" +version = "8.1.1" dependencies = [ "account_utils", "beacon_chain", @@ -5383,7 +5383,7 @@ dependencies = [ [[package]] name = "lighthouse" -version = "8.1.0" +version = "8.1.1" dependencies = [ 
"account_manager", "account_utils", @@ -5515,7 +5515,7 @@ dependencies = [ [[package]] name = "lighthouse_version" -version = "8.1.0" +version = "8.1.1" dependencies = [ "regex", ] @@ -9622,7 +9622,7 @@ dependencies = [ [[package]] name = "validator_client" -version = "8.1.0" +version = "8.1.1" dependencies = [ "account_utils", "beacon_node_fallback", diff --git a/Cargo.toml b/Cargo.toml index 61caacf5df..5f6f43d2f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -91,7 +91,7 @@ resolver = "2" [workspace.package] edition = "2024" -version = "8.1.0" +version = "8.1.1" [workspace.dependencies] account_utils = { path = "common/account_utils" } From 6194dddc5b9ea176fc38796fb5de6c7fac8a8143 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 2 Mar 2026 17:43:51 +1100 Subject: [PATCH 09/13] Persist custody context more readily (#8921) We received a bug report of a node restarting custody backfill unnecessarily after upgrading to Lighthouse v8.1.1. What happened is: - User started LH v8.0.1 many months ago, CGC updated 0 -> N but the CGC was not eagerly persisted. - LH experienced an unclean shutdown (not sure of what type). - Upon restarting (still running v8.0.1), the custody context read from disk contains CGC=0: `DEBUG Loaded persisted custody context custody_context: CustodyContext { validator_custody_count: 0, ...`). - CGC updates again to N, retriggering custody backfill: `DEBUG Validator count at head updated old_count: 0, new_count: N`. - Custody backfill does a bunch of downloading for no gain: `DEBUG Imported historical data columns epoch: Epoch(428433), total_imported: 0` - While custody backfill is running user updated to v8.1.1, and we see logs for the CGC=N being peristed upon clean shutdown, and then correctly read on startup with v8.1.1. - Custody backfill keeps running and downloading due to the CGC change still being considered in progress. 
- Call `persist_custody_context` inside the `register_validators` handler so that it is written to disk eagerly whenever it changes. The performance impact of this should be minimal as the amount of data is very small and this call can only happen at most ~128 times (once for each change) in the entire life of a beacon node. - Call `persist_custody_context` inside `BeaconChainBuilder::build` so that changes caused by CLI flags are persisted (otherwise starting a node with `--semi-supernode` and no validators, then shutting it down uncleanly would cause use to forget the CGC). These changes greatly reduce the timespan during which an unclean shutdown can create inconsistency. In the worst case, we only lose backfill progress that runs concurrently with the `register_validators` handler (should be extremely minimal, nigh impossible). Co-Authored-By: Michael Sproul --- beacon_node/beacon_chain/src/beacon_chain.rs | 13 ++++++++++++- beacon_node/beacon_chain/src/builder.rs | 5 +++++ beacon_node/http_api/src/validator/mod.rs | 12 ++++++++++++ 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 9d204ac7f2..703ed24420 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -662,7 +662,18 @@ impl BeaconChain { .custody_context() .as_ref() .into(); - debug!(?custody_context, "Persisting custody context to store"); + + // Pattern match to avoid accidentally missing fields and to ignore deprecated fields. 
+ let CustodyContextSsz { + validator_custody_at_head, + epoch_validator_custody_requirements, + persisted_is_supernode: _, + } = &custody_context; + debug!( + validator_custody_at_head, + ?epoch_validator_custody_requirements, + "Persisting custody context to store" + ); persist_custody_context::( self.store.clone(), diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 2c1dae9215..66a54d46e8 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1083,6 +1083,11 @@ where let cgc_change_effective_slot = cgc_changed.effective_epoch.start_slot(E::slots_per_epoch()); beacon_chain.update_data_column_custody_info(Some(cgc_change_effective_slot)); + + // Persist change to disk. + beacon_chain + .persist_custody_context() + .map_err(|e| format!("Failed writing updated CGC: {e:?}"))?; } info!( diff --git a/beacon_node/http_api/src/validator/mod.rs b/beacon_node/http_api/src/validator/mod.rs index df237d9f9b..a9082df715 100644 --- a/beacon_node/http_api/src/validator/mod.rs +++ b/beacon_node/http_api/src/validator/mod.rs @@ -727,6 +727,18 @@ pub fn post_validator_prepare_beacon_proposer( debug!(error = %e, "Could not send message to the network service. \ Likely shutdown") }); + + // Write the updated custody context to disk. This happens at most 128 + // times ever, so the I/O burden should be extremely minimal. Without a + // write here we risk forgetting custody backfill progress upon an + // unclean shutdown. The custody context is otherwise only persisted in + // `BeaconChain::drop`. 
+ if let Err(error) = chain.persist_custody_context() { + error!( + ?error, + "Failed to persist custody context after CGC update" + ); + } } } From f4b5b033a227fcacdb8e8514bbca6cf6702f3a24 Mon Sep 17 00:00:00 2001 From: Mac L Date: Mon, 2 Mar 2026 09:19:41 +0200 Subject: [PATCH 10/13] Add `testing` feature to validator_client/http_api (#8909) Create a `testing` feature which we can use to gate off `test_utils.rs` and its associated dependencies from the rest of the crate. Co-Authored-By: Mac L --- validator_client/http_api/Cargo.toml | 12 +++++++++--- validator_client/http_api/src/lib.rs | 4 +++- validator_manager/Cargo.toml | 2 +- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/validator_client/http_api/Cargo.toml b/validator_client/http_api/Cargo.toml index 2bd57867ac..e334ab9db0 100644 --- a/validator_client/http_api/Cargo.toml +++ b/validator_client/http_api/Cargo.toml @@ -8,14 +8,17 @@ authors = ["Sigma Prime "] name = "validator_http_api" path = "src/lib.rs" +[features] +testing = ["dep:deposit_contract", "dep:doppelganger_service", "dep:tempfile"] + [dependencies] account_utils = { workspace = true } beacon_node_fallback = { workspace = true } bls = { workspace = true } -deposit_contract = { workspace = true } +deposit_contract = { workspace = true, optional = true } directory = { workspace = true } dirs = { workspace = true } -doppelganger_service = { workspace = true } +doppelganger_service = { workspace = true, optional = true } eth2 = { workspace = true, features = ["lighthouse"] } eth2_keystore = { workspace = true } ethereum_serde_utils = { workspace = true } @@ -38,7 +41,7 @@ slot_clock = { workspace = true } sysinfo = { workspace = true } system_health = { workspace = true } task_executor = { workspace = true } -tempfile = { workspace = true } +tempfile = { workspace = true, optional = true } tokio = { workspace = true } tokio-stream = { workspace = true } tracing = { workspace = true } @@ -53,7 +56,10 @@ warp_utils = { workspace = 
true } zeroize = { workspace = true } [dev-dependencies] +deposit_contract = { workspace = true } +doppelganger_service = { workspace = true } futures = { workspace = true } itertools = { workspace = true } rand = { workspace = true, features = ["small_rng"] } ssz_types = { workspace = true } +tempfile = { workspace = true } diff --git a/validator_client/http_api/src/lib.rs b/validator_client/http_api/src/lib.rs index a35b4ec6c6..8e9c077e57 100644 --- a/validator_client/http_api/src/lib.rs +++ b/validator_client/http_api/src/lib.rs @@ -1,3 +1,6 @@ +#[cfg(feature = "testing")] +pub mod test_utils; + mod api_secret; mod create_signed_voluntary_exit; mod create_validator; @@ -6,7 +9,6 @@ mod keystores; mod remotekeys; mod tests; -pub mod test_utils; pub use api_secret::PK_FILENAME; use graffiti::{delete_graffiti, get_graffiti, set_graffiti}; diff --git a/validator_manager/Cargo.toml b/validator_manager/Cargo.toml index 16ce1e023f..d0155698b4 100644 --- a/validator_manager/Cargo.toml +++ b/validator_manager/Cargo.toml @@ -29,4 +29,4 @@ beacon_chain = { workspace = true } http_api = { workspace = true } regex = { workspace = true } tempfile = { workspace = true } -validator_http_api = { workspace = true } +validator_http_api = { workspace = true, features = ["testing"] } From 0212e84fa8b1be350552eda728796465029a5b83 Mon Sep 17 00:00:00 2001 From: Eitan Seri- Levi Date: Mon, 2 Mar 2026 23:39:24 -0800 Subject: [PATCH 11/13] Use max request payloads --- beacon_node/lighthouse_network/src/rpc/methods.rs | 2 +- beacon_node/lighthouse_network/src/rpc/protocol.rs | 6 ++++++ consensus/types/src/core/chain_spec.rs | 9 ++++++++- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 8113a0b5c1..390510d2e3 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -530,7 +530,7 @@ impl 
PayloadEnvelopesByRootRequest { ) -> Result { let max_requests_envelopes = fork_context .spec - .max_request_blocks(fork_context.current_fork_name()); + .max_request_payloads(fork_context.current_fork_name()); let beacon_block_roots = RuntimeVariableList::new(beacon_block_roots, max_requests_envelopes).map_err(|e| { diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index dd9982056b..17b4054248 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -423,6 +423,12 @@ impl SupportedProtocol { ProtocolId::new(SupportedProtocol::DataColumnsByRangeV1, Encoding::SSZSnappy), ]); } + if fork_context.fork_exists(ForkName::Gloas) { + supported.extend_from_slice(&[ + ProtocolId::new(SupportedProtocol::PayloadEnvelopesByRangeV1, Encoding::SSZSnappy), + ProtocolId::new(SupportedProtocol::PayloadEnvelopesByRootV1, Encoding::SSZSnappy), + ]); + } supported } } diff --git a/consensus/types/src/core/chain_spec.rs b/consensus/types/src/core/chain_spec.rs index adf87dee94..90253c12aa 100644 --- a/consensus/types/src/core/chain_spec.rs +++ b/consensus/types/src/core/chain_spec.rs @@ -295,6 +295,7 @@ pub struct ChainSpec { /* * Networking Gloas */ + pub max_request_payloads: u64, /* * Networking Derived @@ -700,6 +701,10 @@ impl ChainSpec { } } + pub fn max_request_payloads(&self) -> usize { + self.max_request_payloads as usize + } + pub fn max_request_blob_sidecars(&self, fork_name: ForkName) -> usize { if fork_name.electra_enabled() { self.max_request_blob_sidecars_electra as usize @@ -1228,6 +1233,7 @@ impl ChainSpec { builder_payment_threshold_numerator: 6, builder_payment_threshold_denominator: 10, min_builder_withdrawability_delay: Epoch::new(4096), + max_request_payloads: 128, /* * Network specific @@ -1622,7 +1628,8 @@ impl ChainSpec { builder_payment_threshold_numerator: 6, builder_payment_threshold_denominator: 10, 
min_builder_withdrawability_delay: Epoch::new(4096), - + max_request_payloads: 128, + /* * Network specific */ From 8378f7d2949f03f00ee61b2d70b7801dfe185aeb Mon Sep 17 00:00:00 2001 From: Eitan Seri- Levi Date: Tue, 3 Mar 2026 18:32:57 -0800 Subject: [PATCH 12/13] Fixes based on feedback --- beacon_node/beacon_chain/src/errors.rs | 9 ++ .../execution_payload_envelope_streamer.rs | 89 +++++++++++-------- .../lighthouse_network/src/rpc/methods.rs | 4 +- .../lighthouse_network/src/rpc/protocol.rs | 34 +++++-- .../src/network_beacon_processor/tests.rs | 20 ++--- common/task_executor/src/lib.rs | 40 +++++++++ consensus/types/src/core/chain_spec.rs | 15 +++- .../execution/execution_payload_envelope.rs | 4 +- .../signed_execution_payload_envelope.rs | 31 +++++++ 9 files changed, 186 insertions(+), 60 deletions(-) diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 6c8f0d2794..d99afe1dba 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -325,6 +325,15 @@ pub enum BlockProductionError { GloasNotImplemented(String), } +impl From for BeaconChainError { + fn from(e: task_executor::SpawnBlockingError) -> Self { + match e { + task_executor::SpawnBlockingError::RuntimeShutdown => BeaconChainError::RuntimeShutdown, + task_executor::SpawnBlockingError::JoinError(e) => BeaconChainError::TokioJoin(e), + } + } +} + easy_from_to!(BlockProcessingError, BlockProductionError); easy_from_to!(BeaconStateError, BlockProductionError); easy_from_to!(SlotProcessingError, BlockProductionError); diff --git a/beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs b/beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs index a30682e3a5..d1dae6efc9 100644 --- a/beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs +++ b/beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs @@ -15,7 +15,9 @@ type PayloadEnvelopeResult = Result>>, BeaconChainError>; 
pub struct PayloadEnvelopeStreamer { - execution_layer: ExecutionLayer, + // TODO(gloas) remove _ when we use the execution layer + // to load payload envelopes + _execution_layer: ExecutionLayer, store: BeaconStore, task_executor: TaskExecutor, _check_caches: CheckCaches, @@ -36,7 +38,7 @@ impl PayloadEnvelopeStreamer { .clone(); Ok(Arc::new(Self { - execution_layer, + _execution_layer: execution_layer, store, task_executor, _check_caches: check_caches, @@ -53,31 +55,56 @@ impl PayloadEnvelopeStreamer { None } - // used when the execution engine doesn't support the payload bodies methods - async fn stream_payload_envelopes_fallback( + async fn load_envelopes( + self: &Arc, + beacon_block_roots: &[Hash256], + ) -> Result)>, BeaconChainError> { + let streamer = self.clone(); + let roots = beacon_block_roots.to_vec(); + // Loading from the DB is slow -> spawn a blocking task + self.task_executor + .spawn_blocking_and_await( + move || { + let mut results = Vec::new(); + for root in roots { + if let Some(cached) = streamer.check_payload_envelope_cache(root) { + results.push((root, Ok(Some(cached)))); + continue; + } + // TODO(gloas) we'll want to use the execution layer directly to call + // the engine api method eth_getBlockByHash() + match streamer.store.get_payload_envelope(&root) { + Ok(opt_envelope) => { + results.push((root, Ok(opt_envelope.map(Arc::new)))); + } + Err(e) => { + results.push((root, Err(BeaconChainError::DBError(e)))); + } + } + } + results + }, + "load_execution_payload_envelopes", + ) + .await + .map_err(BeaconChainError::from) + } + + async fn stream_payload_envelopes( self: Arc, beacon_block_roots: Vec, sender: UnboundedSender<(Hash256, Arc>)>, ) { - debug!("Using slower fallback method of eth_getBlockByHash()"); - for beacon_block_root in beacon_block_roots { - let cached_envelope = self.check_payload_envelope_cache(beacon_block_root); + let results = match self.load_envelopes(&beacon_block_roots).await { + Ok(results) => results, + Err(e) => 
{ + send_errors(beacon_block_roots, sender, e).await; + return; + } + }; - let envelope_result = if cached_envelope.is_some() { - Ok(cached_envelope) - } else { - // TODO(gloas) we'll want to use the execution layer directly to call - // the engine api method eth_getBlockByHash() - self.store - .get_payload_envelope(&beacon_block_root) - .map(|opt_envelope| opt_envelope.map(Arc::new)) - .map_err(BeaconChainError::DBError) - }; - - if sender - .send((beacon_block_root, Arc::new(envelope_result))) - .is_err() - { + for (root, result) in results { + if sender.send((root, Arc::new(result))).is_err() { break; } } @@ -88,22 +115,8 @@ impl PayloadEnvelopeStreamer { beacon_block_roots: Vec, sender: UnboundedSender<(Hash256, Arc>)>, ) { - match self - .execution_layer - .get_engine_capabilities(None) - .await - .map_err(Box::new) - .map_err(BeaconChainError::EngineGetCapabilititesFailed) - { - Ok(_engine_capabilities) => { - // TODO(gloas) should check engine capabilities for get_payload_bodies_by_range_v1 - self.stream_payload_envelopes_fallback(beacon_block_roots, sender) - .await; - } - Err(e) => { - send_errors(beacon_block_roots, sender, e).await; - } - } + self.stream_payload_envelopes(beacon_block_roots, sender) + .await; } pub fn launch_stream( diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 390510d2e3..baabf48683 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -528,9 +528,7 @@ impl PayloadEnvelopesByRootRequest { beacon_block_roots: Vec, fork_context: &ForkContext, ) -> Result { - let max_requests_envelopes = fork_context - .spec - .max_request_payloads(fork_context.current_fork_name()); + let max_requests_envelopes = fork_context.spec.max_request_payloads(); let beacon_block_roots = RuntimeVariableList::new(beacon_block_roots, max_requests_envelopes).map_err(|e| { diff --git 
a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 17b4054248..74c9ddef9c 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -22,7 +22,7 @@ use types::{ LightClientBootstrap, LightClientBootstrapAltair, LightClientFinalityUpdate, LightClientFinalityUpdateAltair, LightClientOptimisticUpdate, LightClientOptimisticUpdateAltair, LightClientUpdate, MainnetEthSpec, MinimalEthSpec, - SignedBeaconBlock, + SignedBeaconBlock, SignedExecutionPayloadEnvelope, }; // Note: Hardcoding the `EthSpec` type for `SignedBeaconBlock` as min/max values is @@ -65,6 +65,12 @@ pub static SIGNED_BEACON_BLOCK_BELLATRIX_MAX: LazyLock = + types::ExecutionPayload::::max_execution_payload_bellatrix_size() // adding max size of execution payload (~16gb) + ssz::BYTES_PER_LENGTH_OFFSET); // Adding the additional ssz offset for the `ExecutionPayload` field +pub static SIGNED_EXECUTION_PAYLOAD_ENVELOPE_MIN: LazyLock = + LazyLock::new(SignedExecutionPayloadEnvelope::::min_size); + +pub static SIGNED_EXECUTION_PAYLOAD_ENVELOPE_MAX: LazyLock = + LazyLock::new(SignedExecutionPayloadEnvelope::::max_size); + pub static BLOB_SIDECAR_SIZE: LazyLock = LazyLock::new(BlobSidecar::::max_size); @@ -147,6 +153,14 @@ pub fn rpc_block_limits_by_fork(current_fork: ForkName) -> RpcLimits { } } +/// Returns the rpc limits for payload_envelope_by_range and payload_envelope_by_root responses. 
+pub fn rpc_payload_limits() -> RpcLimits { + RpcLimits::new( + *SIGNED_EXECUTION_PAYLOAD_ENVELOPE_MIN, + *SIGNED_EXECUTION_PAYLOAD_ENVELOPE_MAX, + ) +} + fn rpc_light_client_updates_by_range_limits_by_fork(current_fork: ForkName) -> RpcLimits { let altair_fixed_len = LightClientFinalityUpdateAltair::::ssz_fixed_len(); @@ -425,8 +439,14 @@ impl SupportedProtocol { } if fork_context.fork_exists(ForkName::Gloas) { supported.extend_from_slice(&[ - ProtocolId::new(SupportedProtocol::PayloadEnvelopesByRangeV1, Encoding::SSZSnappy), - ProtocolId::new(SupportedProtocol::PayloadEnvelopesByRootV1, Encoding::SSZSnappy), + ProtocolId::new( + SupportedProtocol::PayloadEnvelopesByRangeV1, + Encoding::SSZSnappy, + ), + ProtocolId::new( + SupportedProtocol::PayloadEnvelopesByRootV1, + Encoding::SSZSnappy, + ), ]); } supported @@ -535,7 +555,9 @@ impl ProtocolId { ::ssz_fixed_len(), ::ssz_fixed_len(), ), - Protocol::PayloadEnvelopesByRoot => RpcLimits::new(0, spec.max_blocks_by_root_request), + Protocol::PayloadEnvelopesByRoot => { + RpcLimits::new(0, spec.max_payload_envelopes_by_root_request) + } Protocol::BlobsByRange => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -577,9 +599,7 @@ impl ProtocolId { Protocol::PayloadEnvelopesByRange => { rpc_block_limits_by_fork(fork_context.current_fork_name()) } - Protocol::PayloadEnvelopesByRoot => { - rpc_block_limits_by_fork(fork_context.current_fork_name()) - } + Protocol::PayloadEnvelopesByRoot => rpc_payload_limits(), Protocol::BlobsByRange => rpc_blob_limits::(), Protocol::BlobsByRoot => rpc_blob_limits::(), Protocol::DataColumnsByRoot => { diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 7e75f3be04..2e4b0fbd2a 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -2160,7 +2160,7 @@ async fn test_payload_envelopes_by_range() { let slot_count = 32; // 
Manually store payload envelopes for each block in the range - let mut expected_count = 0; + let mut expected_roots = Vec::new(); for slot in start_slot..slot_count { if let Some(root) = rig .chain @@ -2172,13 +2172,13 @@ async fn test_payload_envelopes_by_range() { .store .put_payload_envelope(&root, envelope) .unwrap(); - expected_count += 1; + expected_roots.push(root); } } rig.enqueue_payload_envelopes_by_range_request(start_slot, slot_count); - let mut actual_count = 0; + let mut actual_roots = Vec::new(); while let Some(next) = rig.network_rx.recv().await { if let NetworkMessage::SendResponse { peer_id: _, @@ -2186,8 +2186,8 @@ async fn test_payload_envelopes_by_range() { inbound_request_id: _, } = next { - if envelope.is_some() { - actual_count += 1; + if let Some(env) = envelope { + actual_roots.push(env.beacon_block_root()); } else { break; } @@ -2198,7 +2198,7 @@ async fn test_payload_envelopes_by_range() { panic!("unexpected message {:?}", next); } } - assert_eq!(expected_count, actual_count); + assert_eq!(expected_roots, actual_roots); } #[tokio::test] @@ -2226,7 +2226,7 @@ async fn test_payload_envelopes_by_root() { let roots = RuntimeVariableList::new(vec![block_root], 1).unwrap(); rig.enqueue_payload_envelopes_by_root_request(roots); - let mut actual_count = 0; + let mut actual_roots = Vec::new(); while let Some(next) = rig.network_rx.recv().await { if let NetworkMessage::SendResponse { peer_id: _, @@ -2234,8 +2234,8 @@ async fn test_payload_envelopes_by_root() { inbound_request_id: _, } = next { - if envelope.is_some() { - actual_count += 1; + if let Some(env) = envelope { + actual_roots.push(env.beacon_block_root()); } else { break; } @@ -2243,7 +2243,7 @@ async fn test_payload_envelopes_by_root() { panic!("unexpected message {:?}", next); } } - assert_eq!(1, actual_count); + assert_eq!(vec![block_root], actual_roots); } #[tokio::test] diff --git a/common/task_executor/src/lib.rs b/common/task_executor/src/lib.rs index d3d862f96c..25cf7acb41 100644 
--- a/common/task_executor/src/lib.rs +++ b/common/task_executor/src/lib.rs @@ -12,6 +12,26 @@ use crate::rayon_pool_provider::RayonPoolProvider; pub use crate::rayon_pool_provider::RayonPoolType; pub use tokio::task::JoinHandle; +/// Error type for spawning a blocking task and awaiting its result. +#[derive(Debug)] +pub enum SpawnBlockingError { + /// The runtime is shutting down. + RuntimeShutdown, + /// The blocking task failed (e.g. due to a panic). + JoinError(tokio::task::JoinError), +} + +impl std::fmt::Display for SpawnBlockingError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SpawnBlockingError::RuntimeShutdown => write!(f, "runtime shutdown"), + SpawnBlockingError::JoinError(e) => write!(f, "join error: {e}"), + } + } +} + +impl std::error::Error for SpawnBlockingError {} + /// Provides a reason when Lighthouse is shut down. #[derive(Copy, Clone, Debug, PartialEq)] pub enum ShutdownReason { @@ -343,6 +363,26 @@ impl TaskExecutor { Some(future) } + /// Spawn a blocking task and await its result. + /// + /// Maps the `Option` (runtime shutdown) and `tokio::JoinError` into a single + /// `SpawnBlockingError`. + pub async fn spawn_blocking_and_await( + &self, + task: F, + name: &'static str, + ) -> Result + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let handle = self + .spawn_blocking_handle(task, name) + .ok_or(SpawnBlockingError::RuntimeShutdown)?; + + handle.await.map_err(SpawnBlockingError::JoinError) + } + /// Block the current (non-async) thread on the completion of some future. 
/// /// ## Warning diff --git a/consensus/types/src/core/chain_spec.rs b/consensus/types/src/core/chain_spec.rs index 90253c12aa..2f3b5da956 100644 --- a/consensus/types/src/core/chain_spec.rs +++ b/consensus/types/src/core/chain_spec.rs @@ -306,6 +306,7 @@ pub struct ChainSpec { pub max_blocks_by_root_request_deneb: usize, pub max_blobs_by_root_request: usize, pub max_data_columns_by_root_request: usize, + pub max_payload_envelopes_by_root_request: usize, /* * Application params @@ -969,6 +970,8 @@ impl ChainSpec { max_blobs_by_root_request_common(self.max_request_blob_sidecars); self.max_data_columns_by_root_request = max_data_columns_by_root_request_common::(self.max_request_blocks_deneb); + self.max_payload_envelopes_by_root_request = + max_blocks_by_root_request_common(self.max_request_payloads); self } @@ -1299,6 +1302,7 @@ impl ChainSpec { min_epochs_for_data_column_sidecars_requests: default_min_epochs_for_data_column_sidecars_requests(), max_data_columns_by_root_request: default_data_columns_by_root_request(), + max_payload_envelopes_by_root_request: default_max_payload_envelopes_by_root_request(), /* * Application specific @@ -1629,7 +1633,7 @@ impl ChainSpec { builder_payment_threshold_denominator: 10, min_builder_withdrawability_delay: Epoch::new(4096), max_request_payloads: 128, - + /* * Network specific */ @@ -1685,6 +1689,7 @@ impl ChainSpec { min_epochs_for_data_column_sidecars_requests: default_min_epochs_for_data_column_sidecars_requests(), max_data_columns_by_root_request: default_data_columns_by_root_request(), + max_payload_envelopes_by_root_request: default_max_payload_envelopes_by_root_request(), /* * Application specific @@ -2349,6 +2354,14 @@ fn default_data_columns_by_root_request() -> usize { max_data_columns_by_root_request_common::(default_max_request_blocks_deneb()) } +fn default_max_payload_envelopes_by_root_request() -> usize { + max_blocks_by_root_request_common(default_max_request_payloads()) +} + +fn default_max_request_payloads() 
-> u64 { + 128 +} + impl Default for Config { fn default() -> Self { let chain_spec = MainnetEthSpec::default_spec(); diff --git a/consensus/types/src/execution/execution_payload_envelope.rs b/consensus/types/src/execution/execution_payload_envelope.rs index 7f68dae037..64afaa8655 100644 --- a/consensus/types/src/execution/execution_payload_envelope.rs +++ b/consensus/types/src/execution/execution_payload_envelope.rs @@ -8,7 +8,9 @@ use ssz_derive::{Decode, Encode}; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; -#[derive(Debug, Clone, Serialize, Encode, Decode, Deserialize, TestRandom, TreeHash, Educe)] +#[derive( + Debug, Default, Clone, Serialize, Encode, Decode, Deserialize, TestRandom, TreeHash, Educe, +)] #[educe(PartialEq, Hash(bound(E: EthSpec)))] #[context_deserialize(ForkName)] #[serde(bound = "E: EthSpec")] diff --git a/consensus/types/src/execution/signed_execution_payload_envelope.rs b/consensus/types/src/execution/signed_execution_payload_envelope.rs index b1d949f863..65c657e878 100644 --- a/consensus/types/src/execution/signed_execution_payload_envelope.rs +++ b/consensus/types/src/execution/signed_execution_payload_envelope.rs @@ -8,6 +8,7 @@ use bls::{PublicKey, Signature}; use context_deserialize::context_deserialize; use educe::Educe; use serde::{Deserialize, Serialize}; +use ssz::{BYTES_PER_LENGTH_OFFSET, Encode as SszEncode}; use ssz_derive::{Decode, Encode}; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; @@ -22,6 +23,36 @@ pub struct SignedExecutionPayloadEnvelope { } impl SignedExecutionPayloadEnvelope { + /// Returns the minimum SSZ-encoded size (all variable-length fields empty). + pub fn min_size() -> usize { + Self { + message: ExecutionPayloadEnvelope::default(), + signature: Signature::empty(), + } + .as_ssz_bytes() + .len() + } + + /// Returns the maximum SSZ-encoded size. 
+ #[allow(clippy::arithmetic_side_effects)] + pub fn max_size() -> usize { + // Start from the min size (all variable-length fields empty) + Self::min_size() + // ExecutionPayloadGloas variable-length fields: + + (E::max_extra_data_bytes() * ::ssz_fixed_len()) + + (E::max_transactions_per_payload() + * (BYTES_PER_LENGTH_OFFSET + E::max_bytes_per_transaction())) + + (E::max_withdrawals_per_payload() + * ::ssz_fixed_len()) + // ExecutionRequests variable-length fields: + + (E::max_deposit_requests_per_payload() + * ::ssz_fixed_len()) + + (E::max_withdrawal_requests_per_payload() + * ::ssz_fixed_len()) + + (E::max_consolidation_requests_per_payload() + * ::ssz_fixed_len()) + } + pub fn slot(&self) -> Slot { self.message.slot } From 81e105d90f9f422a53b26178926ef40d78dd8e63 Mon Sep 17 00:00:00 2001 From: Eitan Seri- Levi Date: Tue, 3 Mar 2026 18:57:13 -0800 Subject: [PATCH 13/13] more fixes --- .../lighthouse_network/src/rpc/codec.rs | 52 +++++-- .../lighthouse_network/src/rpc/protocol.rs | 34 ++++- consensus/types/src/block/beacon_block.rs | 142 +++++++++++++++++- 3 files changed, 205 insertions(+), 23 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/codec.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs index ceb7090b21..10156a9ff5 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -668,16 +668,48 @@ fn handle_rpc_response( SupportedProtocol::BlocksByRootV1 => Ok(Some(RpcSuccessResponse::BlocksByRoot(Arc::new( SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), )))), - SupportedProtocol::PayloadEnvelopesByRangeV1 => { - Ok(Some(RpcSuccessResponse::PayloadEnvelopesByRange(Arc::new( - SignedExecutionPayloadEnvelope::from_ssz_bytes(decoded_buffer)?, - )))) - } - SupportedProtocol::PayloadEnvelopesByRootV1 => { - Ok(Some(RpcSuccessResponse::PayloadEnvelopesByRoot(Arc::new( - SignedExecutionPayloadEnvelope::from_ssz_bytes(decoded_buffer)?, - )))) - } + 
SupportedProtocol::PayloadEnvelopesByRangeV1 => match fork_name { + Some(fork_name) => { + if fork_name.gloas_enabled() { + Ok(Some(RpcSuccessResponse::PayloadEnvelopesByRange(Arc::new( + SignedExecutionPayloadEnvelope::from_ssz_bytes(decoded_buffer)?, + )))) + } else { + Err(RPCError::ErrorResponse( + RpcErrorResponse::InvalidRequest, + "Invalid fork name for payload envelopes by range".to_string(), + )) + } + } + None => Err(RPCError::ErrorResponse( + RpcErrorResponse::InvalidRequest, + format!( + "No context bytes provided for {:?} response", + versioned_protocol + ), + )), + }, + SupportedProtocol::PayloadEnvelopesByRootV1 => match fork_name { + Some(fork_name) => { + if fork_name.gloas_enabled() { + Ok(Some(RpcSuccessResponse::PayloadEnvelopesByRoot(Arc::new( + SignedExecutionPayloadEnvelope::from_ssz_bytes(decoded_buffer)?, + )))) + } else { + Err(RPCError::ErrorResponse( + RpcErrorResponse::InvalidRequest, + "Invalid fork name for payload envelopes by root".to_string(), + )) + } + } + None => Err(RPCError::ErrorResponse( + RpcErrorResponse::InvalidRequest, + format!( + "No context bytes provided for {:?} response", + versioned_protocol + ), + )), + }, SupportedProtocol::BlobsByRangeV1 => match fork_name { Some(fork_name) => { if fork_name.deneb_enabled() { diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 74c9ddef9c..14bf2415c4 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -17,10 +17,10 @@ use tokio_util::{ compat::{Compat, FuturesAsyncReadCompatExt}, }; use types::{ - BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BlobSidecar, ChainSpec, DataColumnSidecarFulu, - DataColumnSidecarGloas, EmptyBlock, Epoch, EthSpec, EthSpecId, ForkContext, ForkName, - LightClientBootstrap, LightClientBootstrapAltair, LightClientFinalityUpdate, - LightClientFinalityUpdateAltair, LightClientOptimisticUpdate, + BeaconBlock, 
BeaconBlockAltair, BeaconBlockBase, BeaconBlockGloas, BlobSidecar, ChainSpec, + DataColumnSidecarFulu, DataColumnSidecarGloas, EmptyBlock, Epoch, EthSpec, EthSpecId, + ForkContext, ForkName, LightClientBootstrap, LightClientBootstrapAltair, + LightClientFinalityUpdate, LightClientFinalityUpdateAltair, LightClientOptimisticUpdate, LightClientOptimisticUpdateAltair, LightClientUpdate, MainnetEthSpec, MinimalEthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope, }; @@ -65,6 +65,17 @@ pub static SIGNED_BEACON_BLOCK_BELLATRIX_MAX: LazyLock = + types::ExecutionPayload::::max_execution_payload_bellatrix_size() // adding max size of execution payload (~16gb) + ssz::BYTES_PER_LENGTH_OFFSET); // Adding the additional ssz offset for the `ExecutionPayload` field +/// Gloas blocks no longer contain an execution payload (it's in the envelope), +/// so they are significantly smaller than Bellatrix+ blocks. +pub static SIGNED_BEACON_BLOCK_GLOAS_MAX: LazyLock = LazyLock::new(|| { + SignedBeaconBlock::::from_block( + BeaconBlock::Gloas(BeaconBlockGloas::full(&MainnetEthSpec::default_spec())), + Signature::empty(), + ) + .as_ssz_bytes() + .len() +}); + pub static SIGNED_EXECUTION_PAYLOAD_ENVELOPE_MIN: LazyLock = LazyLock::new(SignedExecutionPayloadEnvelope::::min_size); @@ -146,10 +157,19 @@ pub fn rpc_block_limits_by_fork(current_fork: ForkName) -> RpcLimits { ), // After the merge the max SSZ size of a block is absurdly big. 
The size is actually // bound by other constants, so here we default to the bellatrix's max value - _ => RpcLimits::new( + ForkName::Bellatrix + | ForkName::Capella + | ForkName::Deneb + | ForkName::Electra + | ForkName::Fulu => RpcLimits::new( *SIGNED_BEACON_BLOCK_BASE_MIN, // Base block is smaller than altair and bellatrix blocks *SIGNED_BEACON_BLOCK_BELLATRIX_MAX, // Bellatrix block is larger than base and altair blocks ), + // Gloas blocks no longer contain the execution payload, so they are much smaller + ForkName::Gloas => RpcLimits::new( + *SIGNED_BEACON_BLOCK_BASE_MIN, + *SIGNED_BEACON_BLOCK_GLOAS_MAX, + ), } } @@ -596,9 +616,7 @@ impl ProtocolId { Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork_name()), Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork_name()), - Protocol::PayloadEnvelopesByRange => { - rpc_block_limits_by_fork(fork_context.current_fork_name()) - } + Protocol::PayloadEnvelopesByRange => rpc_payload_limits(), Protocol::PayloadEnvelopesByRoot => rpc_payload_limits(), Protocol::BlobsByRange => rpc_blob_limits::(), Protocol::BlobsByRoot => rpc_blob_limits::(), diff --git a/consensus/types/src/block/beacon_block.rs b/consensus/types/src/block/beacon_block.rs index 5634d842b6..c5ce8586c4 100644 --- a/consensus/types/src/block/beacon_block.rs +++ b/consensus/types/src/block/beacon_block.rs @@ -15,8 +15,11 @@ use tree_hash_derive::TreeHash; use typenum::Unsigned; use crate::{ - SignedExecutionPayloadBid, - attestation::{AttestationBase, AttestationData, IndexedAttestationBase}, + Address, SignedBlsToExecutionChange, SignedExecutionPayloadBid, + attestation::{ + AttestationBase, AttestationData, AttestationElectra, IndexedAttestationBase, + IndexedAttestationElectra, PayloadAttestation, PayloadAttestationData, + }, block::{ BeaconBlockBodyAltair, BeaconBlockBodyBase, BeaconBlockBodyBellatrix, BeaconBlockBodyCapella, 
BeaconBlockBodyDeneb, BeaconBlockBodyElectra, BeaconBlockBodyFulu, @@ -26,12 +29,12 @@ use crate::{ core::{ChainSpec, Domain, Epoch, EthSpec, Graffiti, Hash256, SignedRoot, Slot}, deposit::{Deposit, DepositData}, execution::{ - AbstractExecPayload, BlindedPayload, Eth1Data, ExecutionPayload, ExecutionRequests, - FullPayload, + AbstractExecPayload, BlindedPayload, BlsToExecutionChange, Eth1Data, ExecutionPayload, + ExecutionRequests, FullPayload, }, exit::{SignedVoluntaryExit, VoluntaryExit}, fork::{Fork, ForkName, InconsistentFork, map_fork_name}, - slashing::{AttesterSlashingBase, ProposerSlashing}, + slashing::{AttesterSlashingBase, AttesterSlashingElectra, ProposerSlashing}, state::BeaconStateError, sync_committee::SyncAggregate, test_utils::TestRandom, @@ -724,6 +727,135 @@ impl> EmptyBlock for BeaconBlockGloa } } +impl> BeaconBlockGloas { + /// Returns a full Gloas block with all variable-length fields at max capacity. + /// Used for computing the maximum SSZ-encoded size. + pub fn full(spec: &ChainSpec) -> Self { + let header = BeaconBlockHeader { + slot: Slot::new(1), + proposer_index: 0, + parent_root: Hash256::zero(), + state_root: Hash256::zero(), + body_root: Hash256::zero(), + }; + + let signed_header = SignedBeaconBlockHeader { + message: header, + signature: Signature::empty(), + }; + + let indexed_attestation = IndexedAttestationElectra { + attesting_indices: VariableList::new(vec![0_u64; E::MaxValidatorsPerSlot::to_usize()]) + .unwrap(), + data: AttestationData::default(), + signature: AggregateSignature::empty(), + }; + + let proposer_slashing = ProposerSlashing { + signed_header_1: signed_header.clone(), + signed_header_2: signed_header, + }; + + let attester_slashing = AttesterSlashingElectra { + attestation_1: indexed_attestation.clone(), + attestation_2: indexed_attestation, + }; + + let attestation = AttestationElectra { + aggregation_bits: BitList::with_capacity(E::MaxValidatorsPerSlot::to_usize()).unwrap(), + data: 
AttestationData::default(), + signature: AggregateSignature::empty(), + committee_bits: BitVector::default(), + }; + + let deposit_data = DepositData { + pubkey: PublicKeyBytes::empty(), + withdrawal_credentials: Hash256::zero(), + amount: 0, + signature: SignatureBytes::empty(), + }; + + let deposit = Deposit { + proof: FixedVector::from_elem(Hash256::zero()), + data: deposit_data, + }; + + let signed_voluntary_exit = SignedVoluntaryExit { + message: VoluntaryExit { + epoch: Epoch::new(1), + validator_index: 1, + }, + signature: Signature::empty(), + }; + + let signed_bls_to_execution_change = SignedBlsToExecutionChange { + message: BlsToExecutionChange { + validator_index: 0, + from_bls_pubkey: PublicKeyBytes::empty(), + to_execution_address: Address::zero(), + }, + signature: Signature::empty(), + }; + + let payload_attestation = PayloadAttestation { + aggregation_bits: BitVector::default(), + data: PayloadAttestationData { + beacon_block_root: Hash256::zero(), + slot: Slot::new(0), + payload_present: false, + blob_data_available: false, + }, + signature: AggregateSignature::empty(), + }; + + let mut block = BeaconBlockGloas::::empty(spec); + + for _ in 0..E::MaxProposerSlashings::to_usize() { + block + .body + .proposer_slashings + .push(proposer_slashing.clone()) + .unwrap(); + } + for _ in 0..E::max_attester_slashings_electra() { + block + .body + .attester_slashings + .push(attester_slashing.clone()) + .unwrap(); + } + for _ in 0..E::max_attestations_electra() { + block.body.attestations.push(attestation.clone()).unwrap(); + } + for _ in 0..E::MaxDeposits::to_usize() { + block.body.deposits.push(deposit.clone()).unwrap(); + } + for _ in 0..E::MaxVoluntaryExits::to_usize() { + block + .body + .voluntary_exits + .push(signed_voluntary_exit.clone()) + .unwrap(); + } + for _ in 0..E::MaxBlsToExecutionChanges::to_usize() { + block + .body + .bls_to_execution_changes + .push(signed_bls_to_execution_change.clone()) + .unwrap(); + } + for _ in 
0..E::MaxPayloadAttestations::to_usize() { + block + .body + .payload_attestations + .push(payload_attestation.clone()) + .unwrap(); + } + + block + } +} + // TODO(EIP-7732) Mark's branch had the following implementation but not sure if it's needed so will just add header below for reference // impl> BeaconBlockEIP7732 {