From ffc2b97699dff59c91dc07375e090e3473508240 Mon Sep 17 00:00:00 2001 From: Eitan Seri- Levi Date: Tue, 24 Feb 2026 00:55:29 -0800 Subject: [PATCH] Serve rpc by range and by root: --- beacon_node/beacon_chain/src/beacon_chain.rs | 53 ++++ .../execution_payload_envelope_streamer.rs | 138 +++++++++ beacon_node/beacon_chain/src/lib.rs | 1 + beacon_node/beacon_processor/src/lib.rs | 31 +- .../src/scheduler/work_queue.rs | 12 + .../src/peer_manager/mod.rs | 6 + .../lighthouse_network/src/rpc/codec.rs | 34 +++ .../lighthouse_network/src/rpc/config.rs | 28 ++ .../lighthouse_network/src/rpc/methods.rs | 70 ++++- .../lighthouse_network/src/rpc/protocol.rs | 51 ++++ .../src/rpc/rate_limiter.rs | 38 ++- .../src/service/api_types.rs | 15 + .../lighthouse_network/src/service/mod.rs | 38 +++ .../src/network_beacon_processor/mod.rs | 43 ++- .../network_beacon_processor/rpc_methods.rs | 284 ++++++++++++++++++ .../src/network_beacon_processor/tests.rs | 254 +++++++++++++++- beacon_node/network/src/router.rs | 23 ++ beacon_node/store/src/hot_cold_store.rs | 22 ++ beacon_node/store/src/lib.rs | 7 + 19 files changed, 1140 insertions(+), 8 deletions(-) create mode 100644 beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 26ad2e714b..6b773ac03a 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -30,6 +30,7 @@ use crate::early_attester_cache::EarlyAttesterCache; use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::events::ServerSentEventHandler; use crate::execution_payload::{NotifyExecutionLayer, PreparePayloadHandle, get_execution_payload}; +use crate::execution_payload_envelope_streamer::PayloadEnvelopeStreamer; use crate::fetch_blobs::EngineGetBlobsOutput; use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx}; use 
crate::graffiti_calculator::{GraffitiCalculator, GraffitiSettings}; @@ -1125,6 +1126,58 @@ impl BeaconChain { .map_or_else(|| self.get_blobs(block_root), Ok) } + /// Returns the execution payload envelopes at the given roots, if any. + /// + /// Will also check any associated caches. The expected use for this function is *only* for returning blocks requested + /// from P2P peers. + /// + /// ## Errors + /// + /// May return a database error. + #[allow(clippy::type_complexity)] + pub fn get_payload_envelopes_checking_caches( + self: &Arc, + block_roots: Vec, + ) -> Result< + impl Stream< + Item = ( + Hash256, + Arc>>, Error>>, + ), + >, + Error, + > { + Ok(PayloadEnvelopeStreamer::::new( + self.execution_layer.clone(), + self.store.clone(), + self.task_executor.clone(), + CheckCaches::Yes, + )? + .launch_stream(block_roots)) + } + + #[allow(clippy::type_complexity)] + pub fn get_payload_envelopes( + self: &Arc, + block_roots: Vec, + ) -> Result< + impl Stream< + Item = ( + Hash256, + Arc>>, Error>>, + ), + >, + Error, + > { + Ok(PayloadEnvelopeStreamer::::new( + self.execution_layer.clone(), + self.store.clone(), + self.task_executor.clone(), + CheckCaches::No, + )? 
+ .launch_stream(block_roots)) + } + pub fn get_data_columns_checking_all_caches( &self, block_root: Hash256, diff --git a/beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs b/beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs new file mode 100644 index 0000000000..e6522d7beb --- /dev/null +++ b/beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs @@ -0,0 +1,138 @@ +use std::sync::Arc; + +use bls::Hash256; +use execution_layer::ExecutionLayer; +use futures::Stream; +use task_executor::TaskExecutor; +use tokio::sync::mpsc::{self, UnboundedSender}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tracing::debug; +use types::{EthSpec, SignedExecutionPayloadEnvelope}; + +use crate::{BeaconChainError, BeaconChainTypes, BeaconStore, beacon_block_streamer::CheckCaches}; + +type PayloadEnvelopeResult = + Result>>, BeaconChainError>; + +pub struct PayloadEnvelopeStreamer { + execution_layer: ExecutionLayer, + store: BeaconStore, + task_executor: TaskExecutor, + _check_caches: CheckCaches, +} + +// TODO(gloas) eventually we'll need to expand this to support loading blinded payload envelopes from the dsb +// and fetching the execution payload from the EL. See BlockStreamer impl as an example +impl PayloadEnvelopeStreamer { + pub fn new( + execution_layer_opt: Option>, + store: BeaconStore, + task_executor: TaskExecutor, + check_caches: CheckCaches, + ) -> Result, BeaconChainError> { + let execution_layer = execution_layer_opt + .as_ref() + .ok_or(BeaconChainError::ExecutionLayerMissing)? + .clone(); + + Ok(Arc::new(Self { + execution_layer, + store, + task_executor, + _check_caches: check_caches, + })) + } + + // TODO(gloas) simply a stub impl for now. 
Should check some exec payload envelope cache + // and return the envelope if it exists in the cache + fn check_payload_envelope_cache( + &self, + _beacon_block_root: Hash256, + ) -> Option>> { + // if self.check_caches == CheckCaches::Yes + None + } + + // used when the execution engine doesn't support the payload bodies methods + async fn stream_payload_envelopes_fallback( + self: Arc, + beacon_block_roots: Vec, + sender: UnboundedSender<(Hash256, Arc>)>, + ) { + debug!("Using slower fallback method of eth_getBlockByHash()"); + for beacon_block_root in beacon_block_roots { + let cached_envelope = self.check_payload_envelope_cache(beacon_block_root); + + let envelope_result = if cached_envelope.is_some() { + Ok(cached_envelope) + } else { + // TODO(gloas) we'll want to use the execution layer directly to call + // the engine api method eth_getBlockByHash() + self.store + .get_payload_envelope(&beacon_block_root) + .map(|opt_envelope| opt_envelope.map(Arc::new)) + .map_err(BeaconChainError::DBError) + }; + + if sender + .send((beacon_block_root, Arc::new(envelope_result))) + .is_err() + { + break; + } + } + } + + pub async fn stream( + self: Arc, + beacon_block_roots: Vec, + sender: UnboundedSender<(Hash256, Arc>)>, + ) { + match self + .execution_layer + .get_engine_capabilities(None) + .await + .map_err(Box::new) + .map_err(BeaconChainError::EngineGetCapabilititesFailed) + { + Ok(_engine_capabilities) => { + // TODO(gloas) should check engine capabilities for get_payload_bodies_by_range_v1 + self.stream_payload_envelopes_fallback(beacon_block_roots, sender) + .await; + } + Err(e) => { + send_errors(beacon_block_roots, sender, e).await; + } + } + } + + pub fn launch_stream( + self: Arc, + beacon_block_roots: Vec, + ) -> impl Stream>)> { + let (envelope_tx, envelope_rx) = mpsc::unbounded_channel(); + debug!( + envelopes = beacon_block_roots.len(), + "Launching a PayloadEnvelopeStreamer" + ); + let executor = self.task_executor.clone(); + executor.spawn( + 
self.stream(beacon_block_roots, envelope_tx), + "get_payload_envelopes_sender", + ); + UnboundedReceiverStream::new(envelope_rx) + } +} + +async fn send_errors( + beacon_block_roots: Vec, + sender: UnboundedSender<(Hash256, Arc>)>, + beacon_chain_error: BeaconChainError, +) { + let result = Arc::new(Err(beacon_chain_error)); + for beacon_block_root in beacon_block_roots { + if sender.send((beacon_block_root, result.clone())).is_err() { + break; + } + } +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 3b03395a66..d7253f7969 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -24,6 +24,7 @@ mod early_attester_cache; mod errors; pub mod events; pub mod execution_payload; +pub mod execution_payload_envelope_streamer; pub mod fetch_blobs; pub mod fork_choice_signal; pub mod fork_revert; diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 33a00bfa49..8a672e066d 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -408,6 +408,8 @@ pub enum Work { Status(BlockingFn), BlocksByRangeRequest(AsyncFn), BlocksByRootsRequest(AsyncFn), + PayloadEnvelopesByRangeRequest(AsyncFn), + PayloadEnvelopesByRootRequest(AsyncFn), BlobsByRangeRequest(BlockingFn), BlobsByRootsRequest(BlockingFn), DataColumnsByRootsRequest(BlockingFn), @@ -464,6 +466,8 @@ pub enum WorkType { Status, BlocksByRangeRequest, BlocksByRootsRequest, + PayloadEnvelopesByRangeRequest, + PayloadEnvelopesByRootRequest, BlobsByRangeRequest, BlobsByRootsRequest, DataColumnsByRootsRequest, @@ -522,6 +526,8 @@ impl Work { Work::Status(_) => WorkType::Status, Work::BlocksByRangeRequest(_) => WorkType::BlocksByRangeRequest, Work::BlocksByRootsRequest(_) => WorkType::BlocksByRootsRequest, + Work::PayloadEnvelopesByRangeRequest(_) => WorkType::PayloadEnvelopesByRangeRequest, + Work::PayloadEnvelopesByRootRequest(_) => 
WorkType::PayloadEnvelopesByRootRequest, Work::BlobsByRangeRequest(_) => WorkType::BlobsByRangeRequest, Work::BlobsByRootsRequest(_) => WorkType::BlobsByRootsRequest, Work::DataColumnsByRootsRequest(_) => WorkType::DataColumnsByRootsRequest, @@ -969,6 +975,12 @@ impl BeaconProcessor { Some(item) } else if let Some(item) = work_queues.dcbrange_queue.pop() { Some(item) + } else if let Some(item) = work_queues.payload_envelopes_brange_queue.pop() + { + Some(item) + } else if let Some(item) = work_queues.payload_envelopes_broots_queue.pop() + { + Some(item) // Check slashings after all other consensus messages so we prioritize // following head. // @@ -1155,6 +1167,12 @@ impl BeaconProcessor { Work::BlocksByRootsRequest { .. } => { work_queues.block_broots_queue.push(work, work_id) } + Work::PayloadEnvelopesByRangeRequest { .. } => work_queues + .payload_envelopes_brange_queue + .push(work, work_id), + Work::PayloadEnvelopesByRootRequest { .. } => work_queues + .payload_envelopes_broots_queue + .push(work, work_id), Work::BlobsByRangeRequest { .. 
} => { work_queues.blob_brange_queue.push(work, work_id) } @@ -1270,6 +1288,12 @@ impl BeaconProcessor { WorkType::Status => work_queues.status_queue.len(), WorkType::BlocksByRangeRequest => work_queues.block_brange_queue.len(), WorkType::BlocksByRootsRequest => work_queues.block_broots_queue.len(), + WorkType::PayloadEnvelopesByRangeRequest => { + work_queues.payload_envelopes_brange_queue.len() + } + WorkType::PayloadEnvelopesByRootRequest => { + work_queues.payload_envelopes_broots_queue.len() + } WorkType::BlobsByRangeRequest => work_queues.blob_brange_queue.len(), WorkType::BlobsByRootsRequest => work_queues.blob_broots_queue.len(), WorkType::DataColumnsByRootsRequest => work_queues.dcbroots_queue.len(), @@ -1456,9 +1480,10 @@ impl BeaconProcessor { | Work::DataColumnsByRangeRequest(process_fn) => { task_spawner.spawn_blocking(process_fn) } - Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => { - task_spawner.spawn_async(work) - } + Work::BlocksByRangeRequest(work) + | Work::BlocksByRootsRequest(work) + | Work::PayloadEnvelopesByRangeRequest(work) + | Work::PayloadEnvelopesByRootRequest(work) => task_spawner.spawn_async(work), Work::ChainSegmentBackfill(process_fn) => { if self.config.enable_backfill_rate_limiting { task_spawner.spawn_blocking_with_rayon(RayonPoolType::LowPriority, process_fn) diff --git a/beacon_node/beacon_processor/src/scheduler/work_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_queue.rs index 934659b304..0def1792e3 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_queue.rs @@ -134,6 +134,8 @@ pub struct BeaconProcessorQueueLengths { blob_brange_queue: usize, dcbroots_queue: usize, dcbrange_queue: usize, + payload_envelopes_brange_queue: usize, + payload_envelopes_broots_queue: usize, gossip_bls_to_execution_change_queue: usize, gossip_execution_payload_queue: usize, gossip_execution_payload_bid_queue: usize, @@ -204,6 +206,8 @@ impl 
BeaconProcessorQueueLengths { blob_brange_queue: 1024, dcbroots_queue: 1024, dcbrange_queue: 1024, + payload_envelopes_brange_queue: 1024, + payload_envelopes_broots_queue: 1024, gossip_bls_to_execution_change_queue: 16384, // TODO(EIP-7732): verify 1024 is preferable. I used same value as `gossip_block_queue` and `gossip_blob_queue` gossip_execution_payload_queue: 1024, @@ -253,6 +257,8 @@ pub struct WorkQueues { pub status_queue: FifoQueue>, pub block_brange_queue: FifoQueue>, pub block_broots_queue: FifoQueue>, + pub payload_envelopes_brange_queue: FifoQueue>, + pub payload_envelopes_broots_queue: FifoQueue>, pub blob_broots_queue: FifoQueue>, pub blob_brange_queue: FifoQueue>, pub dcbroots_queue: FifoQueue>, @@ -323,6 +329,10 @@ impl WorkQueues { let blob_brange_queue = FifoQueue::new(queue_lengths.blob_brange_queue); let dcbroots_queue = FifoQueue::new(queue_lengths.dcbroots_queue); let dcbrange_queue = FifoQueue::new(queue_lengths.dcbrange_queue); + let payload_envelopes_brange_queue = + FifoQueue::new(queue_lengths.payload_envelopes_brange_queue); + let payload_envelopes_broots_queue = + FifoQueue::new(queue_lengths.payload_envelopes_broots_queue); let gossip_bls_to_execution_change_queue = FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue); @@ -382,6 +392,8 @@ impl WorkQueues { blob_brange_queue, dcbroots_queue, dcbrange_queue, + payload_envelopes_brange_queue, + payload_envelopes_broots_queue, gossip_bls_to_execution_change_queue, gossip_execution_payload_queue, gossip_execution_payload_bid_queue, diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 43a44c85fc..2edd9de2d9 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -590,6 +590,8 @@ impl PeerManager { Protocol::BlocksByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, 
Protocol::BlobsByRange => PeerAction::MidToleranceError, + Protocol::PayloadEnvelopesByRange => PeerAction::MidToleranceError, + Protocol::PayloadEnvelopesByRoot => PeerAction::MidToleranceError, // Lighthouse does not currently make light client requests; therefore, this // is an unexpected scenario. We do not ban the peer for rate limiting. Protocol::LightClientBootstrap => return, @@ -615,6 +617,8 @@ impl PeerManager { Protocol::Ping => PeerAction::Fatal, Protocol::BlocksByRange => return, Protocol::BlocksByRoot => return, + Protocol::PayloadEnvelopesByRange => return, + Protocol::PayloadEnvelopesByRoot => return, Protocol::BlobsByRange => return, Protocol::BlobsByRoot => return, Protocol::DataColumnsByRoot => return, @@ -638,6 +642,8 @@ impl PeerManager { Protocol::Ping => PeerAction::LowToleranceError, Protocol::BlocksByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, + Protocol::PayloadEnvelopesByRange => PeerAction::MidToleranceError, + Protocol::PayloadEnvelopesByRoot => PeerAction::MidToleranceError, Protocol::BlobsByRange => PeerAction::MidToleranceError, Protocol::BlobsByRoot => PeerAction::MidToleranceError, Protocol::DataColumnsByRoot => PeerAction::MidToleranceError, diff --git a/beacon_node/lighthouse_network/src/rpc/codec.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs index d1a3182fad..ea615452da 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -15,6 +15,7 @@ use std::io::{Read, Write}; use std::marker::PhantomData; use std::sync::Arc; use tokio_util::codec::{Decoder, Encoder}; +use types::SignedExecutionPayloadEnvelope; use types::{ BlobSidecar, ChainSpec, DataColumnSidecar, DataColumnsByRootIdentifier, EthSpec, ForkContext, ForkName, Hash256, LightClientBootstrap, LightClientFinalityUpdate, @@ -76,6 +77,8 @@ impl SSZSnappyInboundCodec { }, RpcSuccessResponse::BlocksByRange(res) => res.as_ssz_bytes(), 
RpcSuccessResponse::BlocksByRoot(res) => res.as_ssz_bytes(), + RpcSuccessResponse::PayloadEnvelopesbyRange(res) => res.as_ssz_bytes(), + RpcSuccessResponse::PayloadEnvelopesByRoot(res) => res.as_ssz_bytes(), RpcSuccessResponse::BlobsByRange(res) => res.as_ssz_bytes(), RpcSuccessResponse::BlobsByRoot(res) => res.as_ssz_bytes(), RpcSuccessResponse::DataColumnsByRoot(res) => res.as_ssz_bytes(), @@ -356,6 +359,8 @@ impl Encoder> for SSZSnappyOutboundCodec { BlocksByRootRequest::V1(req) => req.block_roots.as_ssz_bytes(), BlocksByRootRequest::V2(req) => req.block_roots.as_ssz_bytes(), }, + RequestType::PayloadEnvelopesByRange(req) => req.as_ssz_bytes(), + RequestType::PayloadEnvelopesByRoot(req) => req.beacon_block_roots.as_ssz_bytes(), RequestType::BlobsByRange(req) => req.as_ssz_bytes(), RequestType::BlobsByRoot(req) => req.blob_ids.as_ssz_bytes(), RequestType::DataColumnsByRange(req) => req.as_ssz_bytes(), @@ -548,6 +553,19 @@ fn handle_rpc_request( )?, }), ))), + SupportedProtocol::PayloadEnvelopesByRangeV1 => { + Ok(Some(RequestType::PayloadEnvelopesByRange( + PayloadEnvelopesByRangeRequest::from_ssz_bytes(decoded_buffer)?, + ))) + } + SupportedProtocol::PayloadEnvelopesByRootV1 => Ok(Some( + RequestType::PayloadEnvelopesByRoot(PayloadEnvelopesByRootRequest { + beacon_block_roots: RuntimeVariableList::from_ssz_bytes( + decoded_buffer, + spec.max_request_blocks(current_fork), + )?, + }), + )), SupportedProtocol::BlobsByRangeV1 => Ok(Some(RequestType::BlobsByRange( BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?, ))), @@ -650,6 +668,16 @@ fn handle_rpc_response( SupportedProtocol::BlocksByRootV1 => Ok(Some(RpcSuccessResponse::BlocksByRoot(Arc::new( SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), )))), + SupportedProtocol::PayloadEnvelopesByRangeV1 => { + Ok(Some(RpcSuccessResponse::PayloadEnvelopesbyRange(Arc::new( + SignedExecutionPayloadEnvelope::from_ssz_bytes(decoded_buffer)?, + )))) + } + 
SupportedProtocol::PayloadEnvelopesByRootV1 => { + Ok(Some(RpcSuccessResponse::PayloadEnvelopesByRoot(Arc::new( + SignedExecutionPayloadEnvelope::from_ssz_bytes(decoded_buffer)?, + )))) + } SupportedProtocol::BlobsByRangeV1 => match fork_name { Some(fork_name) => { if fork_name.deneb_enabled() { @@ -1260,6 +1288,12 @@ mod tests { RequestType::BlobsByRange(blbrange) => { assert_eq!(decoded, RequestType::BlobsByRange(blbrange)) } + RequestType::PayloadEnvelopesByRange(perange) => { + assert_eq!(decoded, RequestType::PayloadEnvelopesByRange(perange)) + } + RequestType::PayloadEnvelopesByRoot(peroot) => { + assert_eq!(decoded, RequestType::PayloadEnvelopesByRoot(peroot)) + } RequestType::BlobsByRoot(bbroot) => { assert_eq!(decoded, RequestType::BlobsByRoot(bbroot)) } diff --git a/beacon_node/lighthouse_network/src/rpc/config.rs b/beacon_node/lighthouse_network/src/rpc/config.rs index b0ee6fea64..9e1c6541ec 100644 --- a/beacon_node/lighthouse_network/src/rpc/config.rs +++ b/beacon_node/lighthouse_network/src/rpc/config.rs @@ -89,6 +89,8 @@ pub struct RateLimiterConfig { pub(super) goodbye_quota: Quota, pub(super) blocks_by_range_quota: Quota, pub(super) blocks_by_root_quota: Quota, + pub(super) payload_envelopes_by_range_quota: Quota, + pub(super) payload_envelopes_by_root_quota: Quota, pub(super) blobs_by_range_quota: Quota, pub(super) blobs_by_root_quota: Quota, pub(super) data_columns_by_root_quota: Quota, @@ -111,6 +113,10 @@ impl RateLimiterConfig { Quota::n_every(NonZeroU64::new(128).unwrap(), 10); pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(NonZeroU64::new(128).unwrap(), 10); + pub const DEFAULT_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA: Quota = + Quota::n_every(NonZeroU64::new(128).unwrap(), 10); + pub const DEFAULT_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA: Quota = + Quota::n_every(NonZeroU64::new(128).unwrap(), 10); // `DEFAULT_BLOCKS_BY_RANGE_QUOTA` * (target + 1) to account for high usage pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = 
Quota::n_every(NonZeroU64::new(896).unwrap(), 10); @@ -137,6 +143,8 @@ impl Default for RateLimiterConfig { goodbye_quota: Self::DEFAULT_GOODBYE_QUOTA, blocks_by_range_quota: Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA, blocks_by_root_quota: Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA, + payload_envelopes_by_range_quota: Self::DEFAULT_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA, + payload_envelopes_by_root_quota: Self::DEFAULT_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA, blobs_by_range_quota: Self::DEFAULT_BLOBS_BY_RANGE_QUOTA, blobs_by_root_quota: Self::DEFAULT_BLOBS_BY_ROOT_QUOTA, data_columns_by_root_quota: Self::DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA, @@ -169,6 +177,14 @@ impl Debug for RateLimiterConfig { .field("goodbye", fmt_q!(&self.goodbye_quota)) .field("blocks_by_range", fmt_q!(&self.blocks_by_range_quota)) .field("blocks_by_root", fmt_q!(&self.blocks_by_root_quota)) + .field( + "payload_envelopes_by_range", + fmt_q!(&self.payload_envelopes_by_range_quota), + ) + .field( + "payload_envelopes_by_root", + fmt_q!(&self.payload_envelopes_by_root_quota), + ) .field("blobs_by_range", fmt_q!(&self.blobs_by_range_quota)) .field("blobs_by_root", fmt_q!(&self.blobs_by_root_quota)) .field( @@ -197,6 +213,8 @@ impl FromStr for RateLimiterConfig { let mut goodbye_quota = None; let mut blocks_by_range_quota = None; let mut blocks_by_root_quota = None; + let mut payload_envelopes_by_range_quota = None; + let mut payload_envelopes_by_root_quota = None; let mut blobs_by_range_quota = None; let mut blobs_by_root_quota = None; let mut data_columns_by_root_quota = None; @@ -214,6 +232,12 @@ impl FromStr for RateLimiterConfig { Protocol::Goodbye => goodbye_quota = goodbye_quota.or(quota), Protocol::BlocksByRange => blocks_by_range_quota = blocks_by_range_quota.or(quota), Protocol::BlocksByRoot => blocks_by_root_quota = blocks_by_root_quota.or(quota), + Protocol::PayloadEnvelopesByRange => { + payload_envelopes_by_range_quota = payload_envelopes_by_range_quota.or(quota) + } + Protocol::PayloadEnvelopesByRoot => { + 
payload_envelopes_by_root_quota = payload_envelopes_by_root_quota.or(quota) + } Protocol::BlobsByRange => blobs_by_range_quota = blobs_by_range_quota.or(quota), Protocol::BlobsByRoot => blobs_by_root_quota = blobs_by_root_quota.or(quota), Protocol::DataColumnsByRoot => { @@ -250,6 +274,10 @@ impl FromStr for RateLimiterConfig { .unwrap_or(Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA), blocks_by_root_quota: blocks_by_root_quota .unwrap_or(Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA), + payload_envelopes_by_range_quota: payload_envelopes_by_range_quota + .unwrap_or(Self::DEFAULT_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA), + payload_envelopes_by_root_quota: payload_envelopes_by_root_quota + .unwrap_or(Self::DEFAULT_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA), blobs_by_range_quota: blobs_by_range_quota .unwrap_or(Self::DEFAULT_BLOBS_BY_RANGE_QUOTA), blobs_by_root_quota: blobs_by_root_quota.unwrap_or(Self::DEFAULT_BLOBS_BY_ROOT_QUOTA), diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 5a9a683b75..a07ad8d183 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -17,7 +17,8 @@ use types::light_client::consts::MAX_REQUEST_LIGHT_CLIENT_UPDATES; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnsByRootIdentifier, Epoch, EthSpec, ForkContext, Hash256, LightClientBootstrap, LightClientFinalityUpdate, - LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock, Slot, + LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, Slot, }; /// Maximum length of error message. @@ -362,6 +363,16 @@ impl BlocksByRangeRequest { } } +/// Request a number of execution payload envelopes from a peer. +#[derive(Encode, Decode, Clone, Debug, PartialEq)] +pub struct PayloadEnvelopesByRangeRequest { + /// The starting slot to request execution payload envelopes. 
+ pub start_slot: u64, + + /// The number of slots from the start slot. + pub count: u64, +} + /// Request a number of beacon blobs from a peer. #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BlobsByRangeRequest { @@ -505,6 +516,31 @@ impl BlocksByRootRequest { } } +/// Request a number of execution payload envelopes from a peer +#[derive(Clone, Debug, PartialEq)] +pub struct PayloadEnvelopesByRootRequest { + /// The list of beacon block roots used to request execution payload envelopes. + pub beacon_block_roots: RuntimeVariableList, +} + +impl PayloadEnvelopesByRootRequest { + pub fn new( + beacon_block_roots: Vec, + fork_context: &ForkContext, + ) -> Result { + let max_requests_envelopes = fork_context + .spec + .max_request_blocks(fork_context.current_fork_name()); + + let beacon_block_roots = + RuntimeVariableList::new(beacon_block_roots, max_requests_envelopes).map_err(|e| { + format!("ExecutionPayloadEnvelopesByRootRequest too many beacon block roots: {e:?}") + })?; + + Ok(Self { beacon_block_roots }) + } +} + /// Request a number of beacon blocks and blobs from a peer. #[derive(Clone, Debug, PartialEq)] pub struct BlobsByRootRequest { @@ -588,6 +624,13 @@ pub enum RpcSuccessResponse { /// A response to a get BLOCKS_BY_ROOT request. BlocksByRoot(Arc>), + /// A response to a get EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE request. A None response signifies + /// the end of the batch. + PayloadEnvelopesbyRange(Arc>), + + /// A response to a get EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT request. + PayloadEnvelopesByRoot(Arc>), + /// A response to a get BLOBS_BY_RANGE request BlobsByRange(Arc>), @@ -628,6 +671,12 @@ pub enum ResponseTermination { /// Blocks by root stream termination. BlocksByRoot, + /// Execution payload envelopes by range stream termination. + PayloadEnvelopesByRange, + + /// Execution payload envelopes by root stream termination. + PayloadEnvelopesByRoot, + /// Blobs by range stream termination. 
BlobsByRange, @@ -649,6 +698,8 @@ impl ResponseTermination { match self { ResponseTermination::BlocksByRange => Protocol::BlocksByRange, ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot, + ResponseTermination::PayloadEnvelopesByRange => Protocol::PayloadEnvelopesByRange, + ResponseTermination::PayloadEnvelopesByRoot => Protocol::PayloadEnvelopesByRoot, ResponseTermination::BlobsByRange => Protocol::BlobsByRange, ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot, ResponseTermination::DataColumnsByRoot => Protocol::DataColumnsByRoot, @@ -744,6 +795,8 @@ impl RpcSuccessResponse { RpcSuccessResponse::Status(_) => Protocol::Status, RpcSuccessResponse::BlocksByRange(_) => Protocol::BlocksByRange, RpcSuccessResponse::BlocksByRoot(_) => Protocol::BlocksByRoot, + RpcSuccessResponse::PayloadEnvelopesbyRange(_) => Protocol::PayloadEnvelopesByRange, + RpcSuccessResponse::PayloadEnvelopesByRoot(_) => Protocol::PayloadEnvelopesByRoot, RpcSuccessResponse::BlobsByRange(_) => Protocol::BlobsByRange, RpcSuccessResponse::BlobsByRoot(_) => Protocol::BlobsByRoot, RpcSuccessResponse::DataColumnsByRoot(_) => Protocol::DataColumnsByRoot, @@ -762,6 +815,7 @@ impl RpcSuccessResponse { pub fn slot(&self) -> Option { match self { Self::BlocksByRange(r) | Self::BlocksByRoot(r) => Some(r.slot()), + Self::PayloadEnvelopesByRoot(r) | Self::PayloadEnvelopesbyRange(r) => Some(r.slot()), Self::BlobsByRange(r) | Self::BlobsByRoot(r) => Some(r.slot()), Self::DataColumnsByRange(r) | Self::DataColumnsByRoot(r) => Some(r.slot()), Self::LightClientBootstrap(r) => Some(r.get_slot()), @@ -812,6 +866,20 @@ impl std::fmt::Display for RpcSuccessResponse { RpcSuccessResponse::BlocksByRoot(block) => { write!(f, "BlocksByRoot: Block slot: {}", block.slot()) } + RpcSuccessResponse::PayloadEnvelopesbyRange(envelope) => { + write!( + f, + "ExecutionPayloadEnvelopesByRange: Envelope slot: {}", + envelope.slot() + ) + } + RpcSuccessResponse::PayloadEnvelopesByRoot(envelope) => { + write!( + f, + 
"ExecutionPayloadEnvelopesByRoot: Envelope slot: {}", + envelope.slot() + ) + } RpcSuccessResponse::BlobsByRange(blob) => { write!(f, "BlobsByRange: Blob slot: {}", blob.slot()) } diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index b75ca72eda..dd9982056b 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -242,6 +242,12 @@ pub enum Protocol { /// The `BlobsByRange` protocol name. #[strum(serialize = "blob_sidecars_by_range")] BlobsByRange, + /// The `ExecutionPayloadEnvelopesByRoot` protocol name. + #[strum(serialize = "execution_payload_envelopes_by_root")] + PayloadEnvelopesByRoot, + /// The `ExecutionPayloadEnvelopesByRange` protocol name. + #[strum(serialize = "execution_payload_envelopes_by_range")] + PayloadEnvelopesByRange, /// The `BlobsByRoot` protocol name. #[strum(serialize = "blob_sidecars_by_root")] BlobsByRoot, @@ -277,6 +283,8 @@ impl Protocol { Protocol::Goodbye => None, Protocol::BlocksByRange => Some(ResponseTermination::BlocksByRange), Protocol::BlocksByRoot => Some(ResponseTermination::BlocksByRoot), + Protocol::PayloadEnvelopesByRange => Some(ResponseTermination::PayloadEnvelopesByRange), + Protocol::PayloadEnvelopesByRoot => Some(ResponseTermination::PayloadEnvelopesByRoot), Protocol::BlobsByRange => Some(ResponseTermination::BlobsByRange), Protocol::BlobsByRoot => Some(ResponseTermination::BlobsByRoot), Protocol::DataColumnsByRoot => Some(ResponseTermination::DataColumnsByRoot), @@ -307,6 +315,8 @@ pub enum SupportedProtocol { BlocksByRangeV2, BlocksByRootV1, BlocksByRootV2, + PayloadEnvelopesByRangeV1, + PayloadEnvelopesByRootV1, BlobsByRangeV1, BlobsByRootV1, DataColumnsByRootV1, @@ -329,6 +339,8 @@ impl SupportedProtocol { SupportedProtocol::GoodbyeV1 => "1", SupportedProtocol::BlocksByRangeV1 => "1", SupportedProtocol::BlocksByRangeV2 => "2", + SupportedProtocol::PayloadEnvelopesByRangeV1 => "1", + 
SupportedProtocol::PayloadEnvelopesByRootV1 => "1", SupportedProtocol::BlocksByRootV1 => "1", SupportedProtocol::BlocksByRootV2 => "2", SupportedProtocol::BlobsByRangeV1 => "1", @@ -355,6 +367,8 @@ impl SupportedProtocol { SupportedProtocol::BlocksByRangeV2 => Protocol::BlocksByRange, SupportedProtocol::BlocksByRootV1 => Protocol::BlocksByRoot, SupportedProtocol::BlocksByRootV2 => Protocol::BlocksByRoot, + SupportedProtocol::PayloadEnvelopesByRangeV1 => Protocol::PayloadEnvelopesByRange, + SupportedProtocol::PayloadEnvelopesByRootV1 => Protocol::PayloadEnvelopesByRoot, SupportedProtocol::BlobsByRangeV1 => Protocol::BlobsByRange, SupportedProtocol::BlobsByRootV1 => Protocol::BlobsByRoot, SupportedProtocol::DataColumnsByRootV1 => Protocol::DataColumnsByRoot, @@ -511,6 +525,11 @@ impl ProtocolId { ::ssz_fixed_len(), ), Protocol::BlocksByRoot => RpcLimits::new(0, spec.max_blocks_by_root_request), + Protocol::PayloadEnvelopesByRange => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), + Protocol::PayloadEnvelopesByRoot => RpcLimits::new(0, spec.max_blocks_by_root_request), Protocol::BlobsByRange => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -549,6 +568,12 @@ impl ProtocolId { Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork_name()), Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork_name()), + Protocol::PayloadEnvelopesByRange => { + rpc_block_limits_by_fork(fork_context.current_fork_name()) + } + Protocol::PayloadEnvelopesByRoot => { + rpc_block_limits_by_fork(fork_context.current_fork_name()) + } Protocol::BlobsByRange => rpc_blob_limits::(), Protocol::BlobsByRoot => rpc_blob_limits::(), Protocol::DataColumnsByRoot => { @@ -586,6 +611,8 @@ impl ProtocolId { match self.versioned_protocol { SupportedProtocol::BlocksByRangeV2 | SupportedProtocol::BlocksByRootV2 + | SupportedProtocol::PayloadEnvelopesByRangeV1 + | 
SupportedProtocol::PayloadEnvelopesByRootV1 | SupportedProtocol::BlobsByRangeV1 | SupportedProtocol::BlobsByRootV1 | SupportedProtocol::DataColumnsByRootV1 @@ -737,6 +764,8 @@ pub enum RequestType { Goodbye(GoodbyeReason), BlocksByRange(OldBlocksByRangeRequest), BlocksByRoot(BlocksByRootRequest), + PayloadEnvelopesByRange(PayloadEnvelopesByRangeRequest), + PayloadEnvelopesByRoot(PayloadEnvelopesByRootRequest), BlobsByRange(BlobsByRangeRequest), BlobsByRoot(BlobsByRootRequest), DataColumnsByRoot(DataColumnsByRootRequest), @@ -760,6 +789,8 @@ impl RequestType { RequestType::Goodbye(_) => 0, RequestType::BlocksByRange(req) => *req.count(), RequestType::BlocksByRoot(req) => req.block_roots().len() as u64, + RequestType::PayloadEnvelopesByRange(req) => req.count, + RequestType::PayloadEnvelopesByRoot(req) => req.beacon_block_roots.len() as u64, RequestType::BlobsByRange(req) => req.max_blobs_requested(digest_epoch, spec), RequestType::BlobsByRoot(req) => req.blob_ids.len() as u64, RequestType::DataColumnsByRoot(req) => req.max_requested() as u64, @@ -789,6 +820,8 @@ impl RequestType { BlocksByRootRequest::V1(_) => SupportedProtocol::BlocksByRootV1, BlocksByRootRequest::V2(_) => SupportedProtocol::BlocksByRootV2, }, + RequestType::PayloadEnvelopesByRange(_) => SupportedProtocol::PayloadEnvelopesByRangeV1, + RequestType::PayloadEnvelopesByRoot(_) => SupportedProtocol::PayloadEnvelopesByRootV1, RequestType::BlobsByRange(_) => SupportedProtocol::BlobsByRangeV1, RequestType::BlobsByRoot(_) => SupportedProtocol::BlobsByRootV1, RequestType::DataColumnsByRoot(_) => SupportedProtocol::DataColumnsByRootV1, @@ -820,6 +853,8 @@ impl RequestType { // variants that have `multiple_responses()` can have values. 
RequestType::BlocksByRange(_) => ResponseTermination::BlocksByRange, RequestType::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, + RequestType::PayloadEnvelopesByRange(_) => ResponseTermination::PayloadEnvelopesByRange, + RequestType::PayloadEnvelopesByRoot(_) => ResponseTermination::PayloadEnvelopesByRoot, RequestType::BlobsByRange(_) => ResponseTermination::BlobsByRange, RequestType::BlobsByRoot(_) => ResponseTermination::BlobsByRoot, RequestType::DataColumnsByRoot(_) => ResponseTermination::DataColumnsByRoot, @@ -854,6 +889,14 @@ impl RequestType { ProtocolId::new(SupportedProtocol::BlocksByRootV2, Encoding::SSZSnappy), ProtocolId::new(SupportedProtocol::BlocksByRootV1, Encoding::SSZSnappy), ], + RequestType::PayloadEnvelopesByRange(_) => vec![ProtocolId::new( + SupportedProtocol::PayloadEnvelopesByRangeV1, + Encoding::SSZSnappy, + )], + RequestType::PayloadEnvelopesByRoot(_) => vec![ProtocolId::new( + SupportedProtocol::PayloadEnvelopesByRootV1, + Encoding::SSZSnappy, + )], RequestType::BlobsByRange(_) => vec![ProtocolId::new( SupportedProtocol::BlobsByRangeV1, Encoding::SSZSnappy, @@ -905,6 +948,8 @@ impl RequestType { RequestType::BlocksByRange(_) => false, RequestType::BlocksByRoot(_) => false, RequestType::BlobsByRange(_) => false, + RequestType::PayloadEnvelopesByRange(_) => false, + RequestType::PayloadEnvelopesByRoot(_) => false, RequestType::BlobsByRoot(_) => false, RequestType::DataColumnsByRoot(_) => false, RequestType::DataColumnsByRange(_) => false, @@ -1015,6 +1060,12 @@ impl std::fmt::Display for RequestType { RequestType::Goodbye(reason) => write!(f, "Goodbye: {}", reason), RequestType::BlocksByRange(req) => write!(f, "Blocks by range: {}", req), RequestType::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), + RequestType::PayloadEnvelopesByRange(req) => { + write!(f, "Payload envelopes by range: {:?}", req) + } + RequestType::PayloadEnvelopesByRoot(req) => { + write!(f, "Payload envelopes by root: {:?}", req) + } 
RequestType::BlobsByRange(req) => write!(f, "Blobs by range: {:?}", req), RequestType::BlobsByRoot(req) => write!(f, "Blobs by root: {:?}", req), RequestType::DataColumnsByRoot(req) => write!(f, "Data columns by root: {:?}", req), diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index 2407038bc3..b0fd9f4dd5 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -109,7 +109,11 @@ pub struct RPCRateLimiter { blbrange_rl: Limiter, /// BlobsByRoot rate limiter. blbroot_rl: Limiter, - /// DataColumnssByRoot rate limiter. + /// PayloadEnvelopesByRange rate limiter. + perange_rl: Limiter, + /// PayloadEnvelopesByRoot rate limiter. + peroots_rl: Limiter, + /// DataColumnsByRoot rate limiter. dcbroot_rl: Limiter, /// DataColumnsByRange rate limiter. dcbrange_rl: Limiter, @@ -148,6 +152,10 @@ pub struct RPCRateLimiterBuilder { bbrange_quota: Option, /// Quota for the BlocksByRoot protocol. bbroots_quota: Option, + /// Quota for the ExecutionPayloadEnvelopesByRange protocol. + perange_quota: Option, + /// Quota for the ExecutionPayloadEnvelopesByRoot protocol. + peroots_quota: Option, /// Quota for the BlobsByRange protocol. blbrange_quota: Option, /// Quota for the BlobsByRoot protocol. 
@@ -177,6 +185,8 @@ impl RPCRateLimiterBuilder { Protocol::Goodbye => self.goodbye_quota = q, Protocol::BlocksByRange => self.bbrange_quota = q, Protocol::BlocksByRoot => self.bbroots_quota = q, + Protocol::PayloadEnvelopesByRange => self.perange_quota = q, + Protocol::PayloadEnvelopesByRoot => self.peroots_quota = q, Protocol::BlobsByRange => self.blbrange_quota = q, Protocol::BlobsByRoot => self.blbroot_quota = q, Protocol::DataColumnsByRoot => self.dcbroot_quota = q, @@ -201,6 +211,12 @@ impl RPCRateLimiterBuilder { let bbrange_quota = self .bbrange_quota .ok_or("BlocksByRange quota not specified")?; + let perange_quota = self + .perange_quota + .ok_or("PayloadEnvelopesByRange quota not specified")?; + let peroots_quota = self + .peroots_quota + .ok_or("PayloadEnvelopesByRoot quota not specified")?; let lc_bootstrap_quota = self .lcbootstrap_quota .ok_or("LightClientBootstrap quota not specified")?; @@ -236,6 +252,8 @@ impl RPCRateLimiterBuilder { let goodbye_rl = Limiter::from_quota(goodbye_quota)?; let bbroots_rl = Limiter::from_quota(bbroots_quota)?; let bbrange_rl = Limiter::from_quota(bbrange_quota)?; + let perange_rl = Limiter::from_quota(perange_quota)?; + let peroots_rl = Limiter::from_quota(peroots_quota)?; let blbrange_rl = Limiter::from_quota(blbrange_quota)?; let blbroot_rl = Limiter::from_quota(blbroots_quota)?; let dcbroot_rl = Limiter::from_quota(dcbroot_quota)?; @@ -259,6 +277,8 @@ impl RPCRateLimiterBuilder { goodbye_rl, bbroots_rl, bbrange_rl, + perange_rl, + peroots_rl, blbrange_rl, blbroot_rl, dcbroot_rl, @@ -312,6 +332,8 @@ impl RPCRateLimiter { goodbye_quota, blocks_by_range_quota, blocks_by_root_quota, + payload_envelopes_by_range_quota, + payload_envelopes_by_root_quota, blobs_by_range_quota, blobs_by_root_quota, data_columns_by_root_quota, @@ -329,6 +351,14 @@ impl RPCRateLimiter { .set_quota(Protocol::Goodbye, goodbye_quota) .set_quota(Protocol::BlocksByRange, blocks_by_range_quota) .set_quota(Protocol::BlocksByRoot, 
blocks_by_root_quota) + .set_quota( + Protocol::PayloadEnvelopesByRange, + payload_envelopes_by_range_quota, + ) + .set_quota( + Protocol::PayloadEnvelopesByRoot, + payload_envelopes_by_root_quota, + ) .set_quota(Protocol::BlobsByRange, blobs_by_range_quota) .set_quota(Protocol::BlobsByRoot, blobs_by_root_quota) .set_quota(Protocol::DataColumnsByRoot, data_columns_by_root_quota) @@ -376,6 +406,8 @@ impl RPCRateLimiter { Protocol::Goodbye => &mut self.goodbye_rl, Protocol::BlocksByRange => &mut self.bbrange_rl, Protocol::BlocksByRoot => &mut self.bbroots_rl, + Protocol::PayloadEnvelopesByRange => &mut self.perange_rl, + Protocol::PayloadEnvelopesByRoot => &mut self.peroots_rl, Protocol::BlobsByRange => &mut self.blbrange_rl, Protocol::BlobsByRoot => &mut self.blbroot_rl, Protocol::DataColumnsByRoot => &mut self.dcbroot_rl, @@ -400,6 +432,8 @@ impl RPCRateLimiter { status_rl, bbrange_rl, bbroots_rl, + perange_rl, + peroots_rl, blbrange_rl, blbroot_rl, dcbroot_rl, @@ -417,6 +451,8 @@ impl RPCRateLimiter { status_rl.prune(time_since_start); bbrange_rl.prune(time_since_start); bbroots_rl.prune(time_since_start); + perange_rl.prune(time_since_start); + peroots_rl.prune(time_since_start); blbrange_rl.prune(time_since_start); blbroot_rl.prune(time_since_start); dcbrange_rl.prune(time_since_start); diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index d0323bab52..6277fc7dec 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use types::{ BlobSidecar, DataColumnSidecar, Epoch, EthSpec, LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, }; pub type Id = u32; @@ -160,6 +161,10 @@ pub enum Response { DataColumnsByRange(Option>>), /// A response to a get BLOCKS_BY_ROOT request. 
BlocksByRoot(Option>>), + /// A response to a get `EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT` request. + PayloadEnvelopesByRoot(Option>>), + /// A response to a get `EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE` request. + PayloadEnvelopesByRange(Option>>), /// A response to a get BLOBS_BY_ROOT request. BlobsByRoot(Option>>), /// A response to a get DATA_COLUMN_SIDECARS_BY_ROOT request. @@ -185,6 +190,16 @@ impl std::convert::From> for RpcResponse { Some(b) => RpcResponse::Success(RpcSuccessResponse::BlocksByRange(b)), None => RpcResponse::StreamTermination(ResponseTermination::BlocksByRange), }, + Response::PayloadEnvelopesByRoot(r) => match r { + Some(p) => RpcResponse::Success(RpcSuccessResponse::PayloadEnvelopesByRoot(p)), + None => RpcResponse::StreamTermination(ResponseTermination::PayloadEnvelopesByRoot), + }, + Response::PayloadEnvelopesByRange(r) => match r { + Some(p) => RpcResponse::Success(RpcSuccessResponse::PayloadEnvelopesbyRange(p)), + None => { + RpcResponse::StreamTermination(ResponseTermination::PayloadEnvelopesByRange) + } + }, Response::BlobsByRoot(r) => match r { Some(b) => RpcResponse::Success(RpcSuccessResponse::BlobsByRoot(b)), None => RpcResponse::StreamTermination(ResponseTermination::BlobsByRoot), diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 94e0ad0710..304f7f5baf 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1525,6 +1525,28 @@ impl Network { request_type, }) } + RequestType::PayloadEnvelopesByRange(_) => { + metrics::inc_counter_vec( + &metrics::TOTAL_RPC_REQUESTS, + &["payload_envelopes_by_range"], + ); + Some(NetworkEvent::RequestReceived { + peer_id, + inbound_request_id, + request_type, + }) + } + RequestType::PayloadEnvelopesByRoot(_) => { + metrics::inc_counter_vec( + &metrics::TOTAL_RPC_REQUESTS, + &["payload_envelopes_by_root"], + ); + Some(NetworkEvent::RequestReceived { + peer_id, +
inbound_request_id, + request_type, + }) + } RequestType::BlobsByRange(_) => { metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blobs_by_range"]); Some(NetworkEvent::RequestReceived { @@ -1639,6 +1661,16 @@ impl Network { RpcSuccessResponse::BlocksByRoot(resp) => { self.build_response(id, peer_id, Response::BlocksByRoot(Some(resp))) } + RpcSuccessResponse::PayloadEnvelopesbyRange(resp) => self.build_response( + id, + peer_id, + Response::PayloadEnvelopesByRange(Some(resp)), + ), + RpcSuccessResponse::PayloadEnvelopesByRoot(resp) => self.build_response( + id, + peer_id, + Response::PayloadEnvelopesByRoot(Some(resp)), + ), RpcSuccessResponse::BlobsByRoot(resp) => { self.build_response(id, peer_id, Response::BlobsByRoot(Some(resp))) } @@ -1673,6 +1705,12 @@ impl Network { let response = match termination { ResponseTermination::BlocksByRange => Response::BlocksByRange(None), ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None), + ResponseTermination::PayloadEnvelopesByRange => { + Response::PayloadEnvelopesByRange(None) + } + ResponseTermination::PayloadEnvelopesByRoot => { + Response::PayloadEnvelopesByRoot(None) + } ResponseTermination::BlobsByRange => Response::BlobsByRange(None), ResponseTermination::BlobsByRoot => Response::BlobsByRoot(None), ResponseTermination::DataColumnsByRoot => Response::DataColumnsByRoot(None), diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index e1adf860de..039b22db7c 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -14,7 +14,8 @@ use beacon_processor::{ use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, - LightClientUpdatesByRangeRequest, + LightClientUpdatesByRangeRequest, PayloadEnvelopesByRangeRequest, + 
PayloadEnvelopesByRootRequest, }; use lighthouse_network::service::api_types::CustodyBackfillBatchId; use lighthouse_network::{ @@ -686,6 +687,46 @@ impl NetworkBeaconProcessor { }) } + /// Create a new work event to process `PayloadEnvelopesByRootRequest`s from the RPC network. + pub fn send_payload_envelopes_by_roots_request( + self: &Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, // Use ResponseId here + request: PayloadEnvelopesByRootRequest, + ) -> Result<(), Error> { + let processor = self.clone(); + let process_fn = async move { + processor + .handle_payload_envelopes_by_root_request(peer_id, inbound_request_id, request) + .await; + }; + + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::PayloadEnvelopesByRootRequest(Box::pin(process_fn)), + }) + } + + /// Create a new work event to process `PayloadEnvelopesByRangeRequest`s from the RPC network. + pub fn send_payload_envelopes_by_range_request( + self: &Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + request: PayloadEnvelopesByRangeRequest, + ) -> Result<(), Error> { + let processor = self.clone(); + let process_fn = async move { + processor + .handle_payload_envelopes_by_range_request(peer_id, inbound_request_id, request) + .await; + }; + + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::PayloadEnvelopesByRangeRequest(Box::pin(process_fn)), + }) + } + /// Create a new work event to process `BlobsByRangeRequest`s from the RPC network. 
pub fn send_blobs_by_range_request( self: &Arc, diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 279870d444..412814a5cb 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -7,6 +7,7 @@ use beacon_chain::{BeaconChainError, BeaconChainTypes, BlockProcessStatus, WhenS use itertools::{Itertools, process_results}; use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, + PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest, }; use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, ReportSource, Response, SyncInfo}; @@ -254,6 +255,113 @@ impl NetworkBeaconProcessor { Ok(()) } + /// Handle a `ExecutionPayloadEnvelopesByRoot` request from the peer. + #[instrument( + name = "lh_handle_payload_envelopes_by_root_request", + parent = None, + level = "debug", + skip_all, + fields(peer_id = %peer_id, client = tracing::field::Empty) + )] + pub async fn handle_payload_envelopes_by_root_request( + self: Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + request: PayloadEnvelopesByRootRequest, + ) { + let client = self.network_globals.client(&peer_id); + Span::current().record("client", field::display(client.kind)); + + self.terminate_response_stream( + peer_id, + inbound_request_id, + self.clone() + .handle_payload_envelopes_by_root_request_inner( + peer_id, + inbound_request_id, + request, + ) + .await, + Response::PayloadEnvelopesByRoot, + ); + } + + /// Handle a `ExecutionPayloadEnvelopes` request from the peer. 
+ async fn handle_payload_envelopes_by_root_request_inner( + self: Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + request: PayloadEnvelopesByRootRequest, + ) -> Result<(), (RpcErrorResponse, &'static str)> { + let log_results = |peer_id, requested_envelopes, send_envelope_count| { + debug!( + %peer_id, + requested = requested_envelopes, + returned = %send_envelope_count, + "ExecutionPayloadEnvelopes outgoing response processed" + ); + }; + + let requested_envelopes = request.beacon_block_roots.len(); + let mut envelope_stream = match self + .chain + .get_payload_envelopes_checking_caches(request.beacon_block_roots.to_vec()) + { + Ok(envelope_stream) => envelope_stream, + Err(e) => { + error!( error = ?e, "Error getting payload envelope stream"); + return Err(( + RpcErrorResponse::ServerError, + "Error getting payload envelope stream", + )); + } + }; + // Fetching payload envelopes is async because it may have to hit the execution layer for payloads. + let mut send_envelope_count = 0; + while let Some((root, result)) = envelope_stream.next().await { + match result.as_ref() { + Ok(Some(envelope)) => { + self.send_response( + peer_id, + inbound_request_id, + Response::PayloadEnvelopesByRoot(Some(envelope.clone())), + ); + send_envelope_count += 1; + } + Ok(None) => { + debug!( + %peer_id, + request_root = ?root, + "Peer requested unknown payload envelope" + ); + } + Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { + debug!( + block_root = ?root, + reason = "execution layer not synced", + "Failed to fetch execution payload for payload envelopes by root request" + ); + log_results(peer_id, requested_envelopes, send_envelope_count); + return Err(( + RpcErrorResponse::ResourceUnavailable, + "Execution layer not synced", + )); + } + Err(e) => { + debug!( + ?peer_id, + request_root = ?root, + error = ?e, + "Error fetching payload envelope for peer" + ); + } + } + } + log_results(peer_id, requested_envelopes, send_envelope_count); + + 
Ok(()) + } + /// Handle a `BlobsByRoot` request from the peer. #[instrument( name = "lh_handle_blobs_by_root_request", @@ -983,6 +1091,182 @@ impl NetworkBeaconProcessor { .collect::>()) } + /// Handle a `ExecutionPayloadEnvelopesByRange` request from the peer. + #[instrument( + name = "lh_handle_payload_envelopes_by_range_request", + parent = None, + level = "debug", + skip_all, + fields(peer_id = %peer_id, client = tracing::field::Empty) + )] + pub async fn handle_payload_envelopes_by_range_request( + self: Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + req: PayloadEnvelopesByRangeRequest, + ) { + let client = self.network_globals.client(&peer_id); + Span::current().record("client", field::display(client.kind)); + + self.terminate_response_stream( + peer_id, + inbound_request_id, + self.clone() + .handle_payload_envelopes_by_range_request_inner(peer_id, inbound_request_id, req) + .await, + Response::PayloadEnvelopesByRange, + ); + } + + /// Handle a `ExecutionPayloadEnvelopesByRange` request from the peer. + async fn handle_payload_envelopes_by_range_request_inner( + self: Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + req: PayloadEnvelopesByRangeRequest, + ) -> Result<(), (RpcErrorResponse, &'static str)> { + let req_start_slot = req.start_slot; + let req_count = req.count; + + debug!( + %peer_id, + count = req_count, + start_slot = %req_start_slot, + "Received ExecutionPayloadEnvelopesByRange Request" + ); + + // Spawn a blocking handle since get_block_roots_for_slot_range takes a sync lock on the + // fork-choice. + let network_beacon_processor = self.clone(); + let block_roots = self + .executor + .spawn_blocking_handle( + move || { + network_beacon_processor.get_block_roots_for_slot_range( + req_start_slot, + req_count, + "ExecutionPayloadEnvelopesByRange", + ) + }, + "get_block_roots_for_slot_range", + ) + .ok_or((RpcErrorResponse::ServerError, "shutting down"))? 
+ .await + .map_err(|_| (RpcErrorResponse::ServerError, "tokio join"))?? + .iter() + .map(|(root, _)| *root) + .collect::>(); + + let current_slot = self + .chain + .slot() + .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); + + let log_results = |peer_id, payloads_sent| { + if payloads_sent < (req_count as usize) { + debug!( + %peer_id, + msg = "Failed to return all requested payload envelopes", + start_slot = %req_start_slot, + %current_slot, + requested = req_count, + returned = payloads_sent, + "ExecutionPayloadEnvelopesByRange outgoing response processed" + ); + } else { + debug!( + %peer_id, + start_slot = %req_start_slot, + %current_slot, + requested = req_count, + returned = payloads_sent, + "ExecutionPayloadEnvelopesByRange outgoing response processed" + ); + } + }; + + let mut envelope_stream = match self.chain.get_payload_envelopes(block_roots) { + Ok(envelope_stream) => envelope_stream, + Err(e) => { + error!(error = ?e, "Error getting payload envelope stream"); + return Err((RpcErrorResponse::ServerError, "Iterator error")); + } + }; + + // Fetching payload envelopes is async because it may have to hit the execution layer for payloads. 
+ let mut envelopes_sent = 0; + while let Some((root, result)) = envelope_stream.next().await { + match result.as_ref() { + Ok(Some(envelope)) => { + // Due to skip slots, blocks could be out of the range, we ensure they + // are in the range before sending + if envelope.slot() >= req_start_slot + && envelope.slot() < req_start_slot + req.count + { + envelopes_sent += 1; + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + inbound_request_id, + response: Response::PayloadEnvelopesByRange(Some(envelope.clone())), + }); + } + } + Ok(None) => { + error!( + request = ?req, + %peer_id, + request_root = ?root, + "Envelope in the chain is not in the store" + ); + log_results(peer_id, envelopes_sent); + return Err((RpcErrorResponse::ServerError, "Database inconsistency")); + } + Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { + debug!( + block_root = ?root, + reason = "execution layer not synced", + "Failed to fetch execution payload for envelope by range request" + ); + log_results(peer_id, envelopes_sent); + // send the stream terminator + return Err(( + RpcErrorResponse::ResourceUnavailable, + "Execution layer not synced", + )); + } + Err(e) => { + if matches!( + e, + BeaconChainError::ExecutionLayerErrorPayloadReconstruction(_block_hash, boxed_error) + if matches!(**boxed_error, execution_layer::Error::EngineError(_)) + ) { + warn!( + info = "this may occur occasionally when the EE is busy", + block_root = ?root, + error = ?e, + "Error rebuilding payload for peer" + ); + } else { + error!( + block_root = ?root, + error = ?e, + "Error fetching payload envelope for peer" + ); + } + log_results(peer_id, envelopes_sent); + // send the stream terminator + return Err(( + RpcErrorResponse::ServerError, + "Failed fetching payload envelopes", + )); + } + } + } + + log_results(peer_id, envelopes_sent); + Ok(()) + } + /// Handle a `BlobsByRange` request from the peer. 
#[instrument( name = "lh_handle_blobs_by_range_request", diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 4b0ca0d46c..7e75f3be04 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -19,11 +19,14 @@ use beacon_chain::test_utils::{ }; use beacon_chain::{BeaconChain, WhenSlotSkipped}; use beacon_processor::{work_reprocessing_queue::*, *}; +use bls::Signature; +use fixed_bytes::FixedBytesExtended; use itertools::Itertools; use libp2p::gossipsub::MessageAcceptance; use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, MetaDataV3, + PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest, }; use lighthouse_network::{ Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response, @@ -41,8 +44,9 @@ use std::time::Duration; use tokio::sync::mpsc; use types::{ AttesterSlashing, BlobSidecar, ChainSpec, DataColumnSidecarList, DataColumnSubnetId, Epoch, - EthSpec, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, - SignedVoluntaryExit, SingleAttestation, Slot, SubnetId, + EthSpec, ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, Hash256, + MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, SignedVoluntaryExit, SingleAttestation, Slot, SubnetId, }; use types::{ BlobSidecarList, @@ -534,6 +538,29 @@ impl TestRig { .unwrap(); } + pub fn enqueue_payload_envelopes_by_range_request(&self, start_slot: u64, count: u64) { + self.network_beacon_processor + .send_payload_envelopes_by_range_request( + PeerId::random(), + InboundRequestId::new_unchecked(42, 24), + PayloadEnvelopesByRangeRequest { start_slot, count }, + ) + .unwrap(); + } + + pub fn enqueue_payload_envelopes_by_root_request( + 
&self, + beacon_block_roots: RuntimeVariableList, + ) { + self.network_beacon_processor + .send_payload_envelopes_by_roots_request( + PeerId::random(), + InboundRequestId::new_unchecked(42, 24), + PayloadEnvelopesByRootRequest { beacon_block_roots }, + ) + .unwrap(); + } + pub fn enqueue_backfill_batch(&self, epoch: Epoch) { self.network_beacon_processor .send_chain_segment( @@ -2102,3 +2129,226 @@ async fn test_data_columns_by_range_no_duplicates_with_skip_slots() { unique_roots.len(), ); } + +/// Create a test `SignedExecutionPayloadEnvelope` with the given slot and beacon block root. +fn make_test_payload_envelope( + slot: Slot, + beacon_block_root: Hash256, +) -> SignedExecutionPayloadEnvelope { + SignedExecutionPayloadEnvelope { + message: ExecutionPayloadEnvelope { + payload: ExecutionPayloadGloas::default(), + execution_requests: ExecutionRequests::default(), + builder_index: 0, + beacon_block_root, + slot, + state_root: Hash256::zero(), + }, + signature: Signature::empty(), + } +} + +#[tokio::test] +async fn test_payload_envelopes_by_range() { + // Only test when Gloas fork is scheduled + if test_spec::().gloas_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new(64).await; + let start_slot = 0; + let slot_count = 32; + + // Manually store payload envelopes for each block in the range + let mut expected_count = 0; + for slot in start_slot..slot_count { + if let Some(root) = rig + .chain + .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) + .unwrap() + { + let envelope = make_test_payload_envelope(Slot::new(slot), root); + rig.chain + .store + .put_payload_envelope(&root, envelope) + .unwrap(); + expected_count += 1; + } + } + + rig.enqueue_payload_envelopes_by_range_request(start_slot, slot_count); + + let mut actual_count = 0; + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::PayloadEnvelopesByRange(envelope), + inbound_request_id: _, + } = next + { + 
if envelope.is_some() { + actual_count += 1; + } else { + break; + } + } else if let NetworkMessage::SendErrorResponse { .. } = next { + // Error response terminates the stream + break; + } else { + panic!("unexpected message {:?}", next); + } + } + assert_eq!(expected_count, actual_count); +} + +#[tokio::test] +async fn test_payload_envelopes_by_root() { + // Only test when Gloas fork is scheduled + if test_spec::().gloas_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new(64).await; + + let block_root = rig + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + + // Manually store a payload envelope for this block + let envelope = make_test_payload_envelope(Slot::new(1), block_root); + rig.chain + .store + .put_payload_envelope(&block_root, envelope) + .unwrap(); + + let roots = RuntimeVariableList::new(vec![block_root], 1).unwrap(); + rig.enqueue_payload_envelopes_by_root_request(roots); + + let mut actual_count = 0; + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::PayloadEnvelopesByRoot(envelope), + inbound_request_id: _, + } = next + { + if envelope.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + assert_eq!(1, actual_count); +} + +#[tokio::test] +async fn test_payload_envelopes_by_root_unknown_root_returns_empty() { + // Only test when Gloas fork is scheduled + if test_spec::().gloas_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new(64).await; + + // Request envelope for a root that has no stored envelope + let block_root = rig + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + + // Don't store any envelope — the handler should return 0 envelopes + let roots = RuntimeVariableList::new(vec![block_root], 1).unwrap(); + rig.enqueue_payload_envelopes_by_root_request(roots); + + let mut 
actual_count = 0; + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::PayloadEnvelopesByRoot(envelope), + inbound_request_id: _, + } = next + { + if envelope.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + assert_eq!(0, actual_count); +} + +#[tokio::test] +async fn test_payload_envelopes_by_range_no_duplicates_with_skip_slots() { + // Only test when Gloas fork is scheduled + if test_spec::().gloas_fork_epoch.is_none() { + return; + }; + + // Build a chain of 128 slots (4 epochs) with skip slots at positions 5 and 6. + let skip_slots: HashSet = [5, 6].into_iter().collect(); + let mut rig = TestRig::new_with_skip_slots(128, &skip_slots).await; + + let start_slot = 0u64; + let slot_count = 10u64; + + // Store payload envelopes for all blocks in the range (skipping the skip slots) + for slot in start_slot..slot_count { + if let Some(root) = rig + .chain + .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) + .unwrap() + { + let envelope = make_test_payload_envelope(Slot::new(slot), root); + rig.chain + .store + .put_payload_envelope(&root, envelope) + .unwrap(); + } + } + + rig.enqueue_payload_envelopes_by_range_request(start_slot, slot_count); + + let mut beacon_block_roots: Vec = Vec::new(); + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::PayloadEnvelopesByRange(envelope), + inbound_request_id: _, + } = next + { + if let Some(env) = envelope { + beacon_block_roots.push(env.beacon_block_root()); + } else { + break; + } + } else if let NetworkMessage::SendErrorResponse { .. 
} = next { + break; + } else { + panic!("unexpected message {:?}", next); + } + } + + assert!( + !beacon_block_roots.is_empty(), + "Should have received at least some payload envelopes" + ); + + // Skip slots should not cause duplicate envelopes for the same block root + let unique_roots: HashSet<_> = beacon_block_roots.iter().collect(); + assert_eq!( + beacon_block_roots.len(), + unique_roots.len(), + "Response contained duplicate block roots: got {} envelopes but only {} unique roots", + beacon_block_roots.len(), + unique_roots.len(), + ); +} diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 8373dec322..f82214810c 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -229,6 +229,24 @@ impl Router { request, ), ), + RequestType::PayloadEnvelopesByRoot(request) => self + .handle_beacon_processor_send_result( + self.network_beacon_processor + .send_payload_envelopes_by_roots_request( + peer_id, + inbound_request_id, + request, + ), + ), + RequestType::PayloadEnvelopesByRange(request) => self + .handle_beacon_processor_send_result( + self.network_beacon_processor + .send_payload_envelopes_by_range_request( + peer_id, + inbound_request_id, + request, + ), + ), RequestType::BlobsByRange(request) => self.handle_beacon_processor_send_result( self.network_beacon_processor.send_blobs_by_range_request( peer_id, @@ -309,6 +327,11 @@ impl Router { Response::DataColumnsByRange(data_column) => { self.on_data_columns_by_range_response(peer_id, app_request_id, data_column); } + // TODO(EIP-7732): implement outgoing payload envelopes by range and root + // responses once sync manager requests them. 
+ Response::PayloadEnvelopesByRoot(_) | Response::PayloadEnvelopesByRange(_) => { + unreachable!() + } // Light client responses should not be received Response::LightClientBootstrap(_) | Response::LightClientOptimisticUpdate(_) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 6e165702a2..557d2a34d6 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1,3 +1,4 @@ +use crate::DatabasePayloadEnvelope; use crate::config::{OnDiskStoreConfig, StoreConfig}; use crate::database::interface::BeaconNodeBackend; use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator}; @@ -745,6 +746,27 @@ impl, Cold: ItemStore> HotColdDB .map_err(|e| e.into()) } + pub fn try_get_full_payload_envelope( + &self, + block_root: &Hash256, + ) -> Result>, Error> { + // TODO(gloas) metrics + // metrics::inc_counter(&metrics::PAYLOAD_ENVELOPE_GET_COUNT); + + // Load the execution payload envelope + // TODO(gloas) we'll want to implement a way to load a blinded envelope + let Some(envelope) = self.get_payload_envelope(block_root)? else { + return Ok(None); + }; + + Ok(Some(DatabasePayloadEnvelope::Full(envelope))) + + // TODO(gloas) implement the logic described below (see `try_get_full_block`) + // If the payload envelope is after the split point then we should have the full execution payload + // stored in the database. If it isn't but payload pruning is disabled, try to load it + // on-demand. 
+ } + pub fn get_payload_envelope( &self, block_root: &Hash256, diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 3363eb800c..3351b182ec 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -390,6 +390,13 @@ pub enum DatabaseBlock { Blinded(SignedBeaconBlock>), } +/// An execution payload envelope from the database +// TODO(gloas) implement blinded variant +pub enum DatabasePayloadEnvelope { + Full(SignedExecutionPayloadEnvelope), + Blinded(SignedExecutionPayloadEnvelope), +} + impl DBColumn { pub fn as_str(self) -> &'static str { self.into()