From c7055b604f9958db410b2e42023763cb19dd7138 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Wed, 25 Mar 2026 15:45:24 +0900 Subject: [PATCH] Gloas serve envelope rpc (#8896) Serves envelope by range and by root requests. Added PayloadEnvelopeStreamer so that we don't need to alter upstream code when we introduce blinded payload envelopes. Co-Authored-By: Eitan Seri-Levi Co-Authored-By: Eitan Seri-Levi Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com> --- beacon_node/beacon_chain/src/beacon_chain.rs | 17 + .../beacon_chain/src/canonical_head.rs | 7 + beacon_node/beacon_chain/src/errors.rs | 2 + beacon_node/beacon_chain/src/lib.rs | 1 + .../beacon_chain_adapter.rs | 42 ++ .../src/payload_envelope_streamer/mod.rs | 219 ++++++++++ .../src/payload_envelope_streamer/tests.rs | 386 ++++++++++++++++++ beacon_node/beacon_processor/src/lib.rs | 31 +- .../src/scheduler/work_queue.rs | 12 + .../src/peer_manager/mod.rs | 6 + .../lighthouse_network/src/rpc/codec.rs | 66 +++ .../lighthouse_network/src/rpc/config.rs | 28 ++ .../lighthouse_network/src/rpc/handler.rs | 29 ++ .../lighthouse_network/src/rpc/methods.rs | 68 ++- .../lighthouse_network/src/rpc/protocol.rs | 92 ++++- .../src/rpc/rate_limiter.rs | 38 +- .../src/service/api_types.rs | 15 + .../lighthouse_network/src/service/mod.rs | 38 ++ .../src/network_beacon_processor/mod.rs | 43 +- .../network_beacon_processor/rpc_methods.rs | 285 ++++++++++++- .../src/network_beacon_processor/tests.rs | 254 +++++++++++- beacon_node/network/src/router.rs | 23 ++ .../types/src/block/signed_beacon_block.rs | 10 + consensus/types/src/core/chain_spec.rs | 20 + .../execution/execution_payload_envelope.rs | 40 ++ .../signed_execution_payload_envelope.rs | 19 + 26 files changed, 1778 insertions(+), 13 deletions(-) create mode 100644 beacon_node/beacon_chain/src/payload_envelope_streamer/beacon_chain_adapter.rs create mode 100644 beacon_node/beacon_chain/src/payload_envelope_streamer/mod.rs create mode 100644 
beacon_node/beacon_chain/src/payload_envelope_streamer/tests.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index c7009fc6dc..81735bdd9d 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -54,6 +54,8 @@ use crate::observed_block_producers::ObservedBlockProducers; use crate::observed_data_sidecars::ObservedDataSidecars; use crate::observed_operations::{ObservationOutcome, ObservedOperations}; use crate::observed_slashable::ObservedSlashable; +#[cfg(not(test))] +use crate::payload_envelope_streamer::{EnvelopeRequestSource, launch_payload_envelope_stream}; use crate::pending_payload_envelopes::PendingPayloadEnvelopes; use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::persisted_custody::persist_custody_context; @@ -1135,6 +1137,21 @@ impl BeaconChain { .map_or_else(|| self.get_blobs(block_root), Ok) } + #[cfg(not(test))] + #[allow(clippy::type_complexity)] + pub fn get_payload_envelopes( + self: &Arc, + block_roots: Vec, + request_source: EnvelopeRequestSource, + ) -> impl Stream< + Item = ( + Hash256, + Arc>>, Error>>, + ), + > { + launch_payload_envelope_stream(self.clone(), block_roots, request_source) + } + pub fn get_data_columns_checking_all_caches( &self, block_root: Hash256, diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index 0faddd1792..3a429bdb8a 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -371,6 +371,13 @@ impl CanonicalHead { Ok((head, execution_status)) } + // TODO(gloas) just a stub for now, implement this once we have fork choice. + /// Returns true if the payload for this block is canonical according to fork choice + /// Returns an error if the block root doesn't exist in fork choice. 
+ pub fn block_has_canonical_payload(&self, _root: &Hash256) -> Result { + Ok(true) + } + /// Returns a clone of `self.cached_head`. /// /// Takes a read-lock on `self.cached_head` for a short time (just long enough to clone it). diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 6c8f0d2794..210c4a4482 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -8,6 +8,7 @@ use crate::observed_aggregates::Error as ObservedAttestationsError; use crate::observed_attesters::Error as ObservedAttestersError; use crate::observed_block_producers::Error as ObservedBlockProducersError; use crate::observed_data_sidecars::Error as ObservedDataSidecarsError; +use crate::payload_envelope_streamer::Error as EnvelopeStreamerError; use bls::PublicKeyBytes; use execution_layer::PayloadStatus; use fork_choice::ExecutionStatus; @@ -157,6 +158,7 @@ pub enum BeaconChainError { reconstructed_transactions_root: Hash256, }, BlockStreamerError(BlockStreamerError), + EnvelopeStreamerError(EnvelopeStreamerError), AddPayloadLogicError, ExecutionForkChoiceUpdateFailed(execution_layer::Error), PrepareProposerFailed(BlockProcessingError), diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 29081fd767..cf427d1a40 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -43,6 +43,7 @@ pub mod observed_block_producers; pub mod observed_data_sidecars; pub mod observed_operations; mod observed_slashable; +pub mod payload_envelope_streamer; pub mod payload_envelope_verification; pub mod pending_payload_envelopes; pub mod persisted_beacon_chain; diff --git a/beacon_node/beacon_chain/src/payload_envelope_streamer/beacon_chain_adapter.rs b/beacon_node/beacon_chain/src/payload_envelope_streamer/beacon_chain_adapter.rs new file mode 100644 index 0000000000..47c58f07b9 --- /dev/null +++ 
b/beacon_node/beacon_chain/src/payload_envelope_streamer/beacon_chain_adapter.rs @@ -0,0 +1,42 @@ +use std::sync::Arc; + +#[cfg(test)] +use mockall::automock; +use task_executor::TaskExecutor; +use types::{Hash256, SignedExecutionPayloadEnvelope, Slot}; + +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; + +/// An adapter to the `BeaconChain` functionalities to remove `BeaconChain` from direct dependency to enable testing envelope streamer logic. +pub(crate) struct EnvelopeStreamerBeaconAdapter { + chain: Arc>, +} + +#[cfg_attr(test, automock, allow(dead_code))] +impl EnvelopeStreamerBeaconAdapter { + pub(crate) fn new(chain: Arc>) -> Self { + Self { chain } + } + + pub(crate) fn executor(&self) -> &TaskExecutor { + &self.chain.task_executor + } + + pub(crate) fn get_payload_envelope( + &self, + root: &Hash256, + ) -> Result>, store::Error> { + self.chain.store.get_payload_envelope(root) + } + + pub(crate) fn get_split_slot(&self) -> Slot { + self.chain.store.get_split_info().slot + } + + pub(crate) fn block_has_canonical_payload( + &self, + root: &Hash256, + ) -> Result { + self.chain.canonical_head.block_has_canonical_payload(root) + } +} diff --git a/beacon_node/beacon_chain/src/payload_envelope_streamer/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_streamer/mod.rs new file mode 100644 index 0000000000..d10e3762a4 --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_envelope_streamer/mod.rs @@ -0,0 +1,219 @@ +mod beacon_chain_adapter; +#[cfg(test)] +mod tests; + +use std::sync::Arc; + +#[cfg_attr(test, double)] +use crate::payload_envelope_streamer::beacon_chain_adapter::EnvelopeStreamerBeaconAdapter; +use futures::Stream; +#[cfg(test)] +use mockall_double::double; +use tokio::sync::mpsc::{self, UnboundedSender}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tracing::{debug, error, warn}; +use types::{EthSpec, Hash256, SignedExecutionPayloadEnvelope}; + +#[cfg(not(test))] +use crate::BeaconChain; +use 
crate::{BeaconChainError, BeaconChainTypes}; + +type PayloadEnvelopeResult = + Result>>, BeaconChainError>; + +#[derive(Debug)] +pub enum Error { + BlockMissingFromForkChoice, +} + +#[derive(Debug, PartialEq)] +pub enum EnvelopeRequestSource { + ByRoot, + ByRange, +} + +pub struct PayloadEnvelopeStreamer { + adapter: EnvelopeStreamerBeaconAdapter, + request_source: EnvelopeRequestSource, +} + +// TODO(gloas) eventually we'll need to expand this to support loading blinded payload envelopes from the db +// and fetching the execution payload from the EL. See BlockStreamer impl as an example +impl PayloadEnvelopeStreamer { + pub(crate) fn new( + adapter: EnvelopeStreamerBeaconAdapter, + request_source: EnvelopeRequestSource, + ) -> Arc { + Arc::new(Self { + adapter, + request_source, + }) + } + + // TODO(gloas) simply a stub impl for now. Should check some exec payload envelope cache + // and return the envelope if it exists in the cache + fn check_payload_envelope_cache( + &self, + _beacon_block_root: &Hash256, + ) -> Option>> { + // if self.check_caches == CheckCaches::Yes + None + } + + fn load_envelope( + self: &Arc, + beacon_block_root: &Hash256, + ) -> Result>>, BeaconChainError> { + if let Some(cached_envelope) = self.check_payload_envelope_cache(beacon_block_root) { + Ok(Some(cached_envelope)) + } else { + // TODO(gloas) we'll want to use the execution layer directly to call + // the engine api method eth_getPayloadBodiesByRange() + match self.adapter.get_payload_envelope(beacon_block_root) { + Ok(opt_envelope) => Ok(opt_envelope.map(Arc::new)), + Err(e) => Err(BeaconChainError::DBError(e)), + } + } + } + + async fn load_envelopes( + self: &Arc, + block_roots: &[Hash256], + ) -> Result)>, BeaconChainError> { + let streamer = self.clone(); + let block_roots = block_roots.to_vec(); + let split_slot = streamer.adapter.get_split_slot(); + // Loading from the DB is slow -> spawn a blocking task + self.adapter + .executor() + .spawn_blocking_handle( + move || { + let 
mut results: Vec<(Hash256, PayloadEnvelopeResult)> = Vec::new(); + for root in block_roots.iter() { + // TODO(gloas) we are loading the full envelope from the db. + // in a future PR we will only be storing the blinded envelope. + // When that happens we'll need to use the EL here to fetch + // the payload and reconstruct the non-blinded envelope. + let opt_envelope = match streamer.load_envelope(root) { + Ok(opt_envelope) => opt_envelope, + Err(e) => { + results.push((*root, Err(e))); + continue; + } + }; + + if streamer.request_source == EnvelopeRequestSource::ByRoot { + // No envelope verification required for `ENVELOPE_BY_ROOT` requests. + // If we only served envelopes that match our canonical view, nodes + // wouldn't be able to sync other branches. + results.push((*root, Ok(opt_envelope))); + continue; + } + + // When loading envelopes on or after the split slot, we must cross reference the bid from the child beacon block. + // There can be payloads that have been imported into the hot db but don't match our current view + // of the canonical chain. + + if let Some(envelope) = opt_envelope { + // Ensure that the envelopes we're serving match our view of the canonical chain. + + // When loading envelopes before the split slot, there is no need to check. + // Non-canonical payload envelopes will have already been pruned. + if split_slot > envelope.slot() { + results.push((*root, Ok(Some(envelope)))); + continue; + } + + match streamer.adapter.block_has_canonical_payload(root) { + Ok(is_envelope_canonical) => { + if is_envelope_canonical { + results.push((*root, Ok(Some(envelope)))); + } else { + results.push((*root, Ok(None))); + } + } + Err(_) => { + results.push(( + *root, + Err(BeaconChainError::EnvelopeStreamerError( + Error::BlockMissingFromForkChoice, + )), + )); + } + } + } else { + results.push((*root, Ok(None))); + } + } + results + }, + "load_execution_payload_envelopes", + ) + .ok_or(BeaconChainError::RuntimeShutdown)? 
+ .await + .map_err(BeaconChainError::TokioJoin) + } + + async fn stream_payload_envelopes( + self: Arc, + beacon_block_roots: Vec, + sender: UnboundedSender<(Hash256, Arc>)>, + ) { + let results = match self.load_envelopes(&beacon_block_roots).await { + Ok(results) => results, + Err(e) => { + warn!(error = ?e, "Failed to load payload envelopes"); + send_errors(&beacon_block_roots, sender, e).await; + return; + } + }; + + for (root, result) in results { + if sender.send((root, Arc::new(result))).is_err() { + break; + } + } + } + + pub fn launch_stream( + self: Arc, + block_roots: Vec, + ) -> impl Stream>)> { + let (envelope_tx, envelope_rx) = mpsc::unbounded_channel(); + debug!( + envelopes = block_roots.len(), + "Launching a PayloadEnvelopeStreamer" + ); + let executor = self.adapter.executor().clone(); + executor.spawn( + self.stream_payload_envelopes(block_roots, envelope_tx), + "get_payload_envelopes_sender", + ); + UnboundedReceiverStream::new(envelope_rx) + } +} + +/// Create a `PayloadEnvelopeStreamer` from a `BeaconChain` and launch a stream. 
+#[cfg(not(test))] +pub fn launch_payload_envelope_stream( + chain: Arc>, + block_roots: Vec, + request_source: EnvelopeRequestSource, +) -> impl Stream>)> { + let adapter = beacon_chain_adapter::EnvelopeStreamerBeaconAdapter::new(chain); + PayloadEnvelopeStreamer::new(adapter, request_source).launch_stream(block_roots) +} + +async fn send_errors( + block_roots: &[Hash256], + sender: UnboundedSender<(Hash256, Arc>)>, + beacon_chain_error: BeaconChainError, +) { + let result = Arc::new(Err(beacon_chain_error)); + for beacon_block_root in block_roots { + if sender.send((*beacon_block_root, result.clone())).is_err() { + error!("EnvelopeStreamer channel closed unexpectedly"); + break; + } + } +} diff --git a/beacon_node/beacon_chain/src/payload_envelope_streamer/tests.rs b/beacon_node/beacon_chain/src/payload_envelope_streamer/tests.rs new file mode 100644 index 0000000000..9e869a59b8 --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_envelope_streamer/tests.rs @@ -0,0 +1,386 @@ +use super::*; +use crate::payload_envelope_streamer::beacon_chain_adapter::MockEnvelopeStreamerBeaconAdapter; +use crate::test_utils::EphemeralHarnessType; +use bls::{FixedBytesExtended, Signature}; +use futures::StreamExt; +use std::collections::HashMap; +use task_executor::test_utils::TestRuntime; +use types::{ + ExecutionBlockHash, ExecutionPayloadEnvelope, ExecutionPayloadGloas, Hash256, MinimalEthSpec, + SignedExecutionPayloadEnvelope, Slot, +}; + +type E = MinimalEthSpec; +type T = EphemeralHarnessType; + +struct SlotEntry { + block_root: Hash256, + slot: Slot, + envelope: Option>, + non_canonical_envelope: bool, +} + +impl SlotEntry { + fn expect_envelope(&self, split_slot: Option) -> bool { + if self.envelope.is_none() { + return false; + } + if !self.non_canonical_envelope { + return true; + } + // Non-canonical envelopes before the split slot are returned + // (in production they would have been pruned). 
+ split_slot.is_some_and(|s| self.slot < s) + } +} + +fn roots(chain: &[SlotEntry]) -> Vec { + chain.iter().map(|s| s.block_root).collect() +} + +/// Build test chain data. +fn build_chain( + num_slots: u64, + skipped_slots: &[u64], + missing_envelope_slots: &[u64], + non_canonical_envelope_slots: &[u64], +) -> Vec { + let mut chain = Vec::new(); + for i in 1..=num_slots { + if skipped_slots.contains(&i) { + continue; + } + let slot = Slot::new(i); + let block_root = Hash256::from_low_u64_be(i); + let has_envelope = !missing_envelope_slots.contains(&i); + let is_non_canonical = non_canonical_envelope_slots.contains(&i); + + let envelope = if has_envelope { + let block_hash = if is_non_canonical { + ExecutionBlockHash::from_root(Hash256::repeat_byte(0xFF)) + } else { + ExecutionBlockHash::from_root(Hash256::from_low_u64_be(i)) + }; + Some(SignedExecutionPayloadEnvelope { + message: ExecutionPayloadEnvelope { + payload: ExecutionPayloadGloas { + block_hash, + ..Default::default() + }, + execution_requests: Default::default(), + builder_index: 0, + beacon_block_root: block_root, + slot, + state_root: Hash256::zero(), + }, + signature: Signature::empty(), + }) + } else { + None + }; + + chain.push(SlotEntry { + block_root, + slot, + envelope, + non_canonical_envelope: is_non_canonical, + }); + } + chain +} + +fn mock_adapter() -> (MockEnvelopeStreamerBeaconAdapter, TestRuntime) { + let runtime = TestRuntime::default(); + let mut mock = MockEnvelopeStreamerBeaconAdapter::default(); + mock.expect_executor() + .return_const(runtime.task_executor.clone()); + (mock, runtime) +} + +/// Configure `get_payload_envelope` to return envelopes from chain data. 
+fn mock_envelopes(mock: &mut MockEnvelopeStreamerBeaconAdapter, chain: &[SlotEntry]) { + let envelope_map: HashMap>> = chain + .iter() + .map(|entry| (entry.block_root, entry.envelope.clone())) + .collect(); + mock.expect_get_payload_envelope() + .returning(move |root| Ok(envelope_map.get(root).cloned().flatten())); +} + +/// Configure `block_has_canonical_payload` based on chain's non-canonical entries. +fn mock_canonical_head(mock: &mut MockEnvelopeStreamerBeaconAdapter, chain: &[SlotEntry]) { + let non_canonical: Vec = chain + .iter() + .filter(|e| e.non_canonical_envelope) + .map(|e| e.block_root) + .collect(); + mock.expect_block_has_canonical_payload() + .returning(move |root| Ok(!non_canonical.contains(root))); +} + +fn unwrap_result( + result: &Arc>, +) -> &Option>> { + result + .as_ref() + .as_ref() + .expect("unexpected error in stream result") +} + +async fn assert_stream_matches( + stream: &mut (impl Stream>)> + Unpin), + chain: &[SlotEntry], + split_slot: Option, +) { + for (i, entry) in chain.iter().enumerate() { + let (root, result) = stream + .next() + .await + .unwrap_or_else(|| panic!("stream ended early at index {i}")); + assert_eq!(root, entry.block_root, "root mismatch at index {i}"); + + let result = unwrap_result(&result); + + if entry.expect_envelope(split_slot) { + let envelope = result + .as_ref() + .unwrap_or_else(|| panic!("expected Some at index {i} but got None")); + let expected_envelope = entry.envelope.as_ref().unwrap(); + assert_eq!( + envelope.block_hash(), + expected_envelope.block_hash(), + "block_hash mismatch at index {i}" + ); + } else { + assert!( + result.is_none(), + "expected None at index {i} (missing or non-canonical), got Some" + ); + } + } + + assert!(stream.next().await.is_none(), "stream should be exhausted"); +} + +/// Happy path: all envelopes exist and are canonical. 
+#[tokio::test] +async fn stream_envelopes_by_range() { + let chain = build_chain(8, &[], &[], &[]); + let (mut mock, _runtime) = mock_adapter(); + mock.expect_get_split_slot().return_const(Slot::new(0)); + mock_envelopes(&mut mock, &chain); + mock_canonical_head(&mut mock, &chain); + + let streamer = PayloadEnvelopeStreamer::new(mock, EnvelopeRequestSource::ByRange); + let mut stream = streamer.launch_stream(roots(&chain)); + assert_stream_matches(&mut stream, &chain, None).await; +} + +/// Mixed chain: skipped slots, missing envelopes, and non-canonical envelopes. +#[tokio::test] +async fn stream_envelopes_by_range_mixed() { + let chain = build_chain(12, &[3, 8], &[5], &[7, 11]); + let (mut mock, _runtime) = mock_adapter(); + mock.expect_get_split_slot().return_const(Slot::new(0)); + mock_envelopes(&mut mock, &chain); + mock_canonical_head(&mut mock, &chain); + + let streamer = PayloadEnvelopeStreamer::new(mock, EnvelopeRequestSource::ByRange); + let mut stream = streamer.launch_stream(roots(&chain)); + assert_stream_matches(&mut stream, &chain, None).await; +} + +/// Non-canonical envelopes before the split slot bypass canonical verification +/// and are returned. Non-canonical envelopes after the split slot are filtered out. +#[tokio::test] +async fn stream_envelopes_by_range_before_split() { + // Non-canonical envelopes at slots 2 and 4 (before split), slot 8 (after split). 
+ let chain = build_chain(10, &[], &[], &[2, 4, 8]); + let split_slot = Slot::new(6); + let (mut mock, _runtime) = mock_adapter(); + mock.expect_get_split_slot().return_const(split_slot); + mock_envelopes(&mut mock, &chain); + mock_canonical_head(&mut mock, &chain); + + let streamer = PayloadEnvelopeStreamer::new(mock, EnvelopeRequestSource::ByRange); + let mut stream = streamer.launch_stream(roots(&chain)); + assert_stream_matches(&mut stream, &chain, Some(split_slot)).await; +} + +#[tokio::test] +async fn stream_envelopes_empty_roots() { + let (mut mock, _runtime) = mock_adapter(); + mock.expect_get_split_slot().return_const(Slot::new(0)); + + let streamer = PayloadEnvelopeStreamer::new(mock, EnvelopeRequestSource::ByRange); + let mut stream = streamer.launch_stream(vec![]); + assert!( + stream.next().await.is_none(), + "empty roots should produce no results" + ); +} + +#[tokio::test] +async fn stream_envelopes_single_root() { + let chain = build_chain(3, &[], &[], &[]); + let (mut mock, _runtime) = mock_adapter(); + mock.expect_get_split_slot().return_const(Slot::new(0)); + mock_envelopes(&mut mock, &chain); + mock_canonical_head(&mut mock, &chain); + + let streamer = PayloadEnvelopeStreamer::new(mock, EnvelopeRequestSource::ByRange); + let mut stream = streamer.launch_stream(vec![chain[1].block_root]); + + let (root, result) = stream.next().await.expect("should get one result"); + assert_eq!(root, chain[1].block_root); + let envelope = unwrap_result(&result) + .as_ref() + .expect("should have envelope"); + assert_eq!( + envelope.block_hash(), + chain[1].envelope.as_ref().unwrap().block_hash(), + ); + + assert!(stream.next().await.is_none(), "stream should be exhausted"); +} + +/// ByRoot requests skip canonical verification, so non-canonical envelopes +/// should still be returned. `block_has_canonical_payload` should never be called. 
+#[tokio::test] +async fn stream_envelopes_by_root() { + let chain = build_chain(8, &[], &[], &[3, 5, 7]); + let (mut mock, _runtime) = mock_adapter(); + mock.expect_get_split_slot().return_const(Slot::new(0)); + mock_envelopes(&mut mock, &chain); + mock.expect_block_has_canonical_payload().times(0); + + let streamer = PayloadEnvelopeStreamer::new(mock, EnvelopeRequestSource::ByRoot); + let mut stream = streamer.launch_stream(roots(&chain)); + + // Every envelope should come back as Some, even the non-canonical ones. + for (i, entry) in chain.iter().enumerate() { + let (root, result) = stream + .next() + .await + .unwrap_or_else(|| panic!("stream ended early at index {i}")); + assert_eq!(root, entry.block_root, "root mismatch at index {i}"); + + let envelope = unwrap_result(&result) + .as_ref() + .unwrap_or_else(|| panic!("expected Some at index {i} for ByRoot request")); + let expected_envelope = entry.envelope.as_ref().unwrap(); + assert_eq!( + envelope.block_hash(), + expected_envelope.block_hash(), + "block_hash mismatch at index {i}" + ); + } + + assert!(stream.next().await.is_none(), "stream should be exhausted"); +} + +/// When `block_has_canonical_payload` returns an error, the streamer should +/// yield `Err(EnvelopeStreamerError(BlockMissingFromForkChoice))` for those roots. 
+#[tokio::test] +async fn stream_envelopes_error() { + let chain = build_chain(4, &[], &[], &[]); + let (mut mock, _runtime) = mock_adapter(); + mock.expect_get_split_slot().return_const(Slot::new(0)); + mock_envelopes(&mut mock, &chain); + mock.expect_block_has_canonical_payload() + .returning(|_| Err(BeaconChainError::CanonicalHeadLockTimeout)); + + let streamer = PayloadEnvelopeStreamer::new(mock, EnvelopeRequestSource::ByRange); + let mut stream = streamer.launch_stream(roots(&chain)); + + for (i, entry) in chain.iter().enumerate() { + let (root, result) = stream + .next() + .await + .unwrap_or_else(|| panic!("stream ended early at index {i}")); + assert_eq!(root, entry.block_root, "root mismatch at index {i}"); + assert!( + matches!( + result.as_ref(), + Err(BeaconChainError::EnvelopeStreamerError( + Error::BlockMissingFromForkChoice + )) + ), + "expected BlockMissingFromForkChoice error at index {i}, got {:?}", + result + ); + } + + assert!(stream.next().await.is_none(), "stream should be exhausted"); +} + +/// Requesting unknown roots (not in the store) via ByRange should return Ok(None). 
+#[tokio::test] +async fn stream_envelopes_by_range_unknown_roots() { + let (mut mock, _runtime) = mock_adapter(); + mock.expect_get_split_slot().return_const(Slot::new(0)); + mock.expect_get_payload_envelope().returning(|_| Ok(None)); + + let unknown_roots: Vec = (1..=4) + .map(|i| Hash256::from_low_u64_be(i * 1000)) + .collect(); + + let streamer = PayloadEnvelopeStreamer::new(mock, EnvelopeRequestSource::ByRange); + let mut stream = streamer.launch_stream(unknown_roots.clone()); + + for (i, expected_root) in unknown_roots.iter().enumerate() { + let (root, result) = stream + .next() + .await + .unwrap_or_else(|| panic!("stream ended early at index {i}")); + assert_eq!(root, *expected_root, "root mismatch at index {i}"); + let envelope = unwrap_result(&result); + assert!( + envelope.is_none(), + "expected None for unknown root at index {i}" + ); + } + + assert!(stream.next().await.is_none(), "stream should be exhausted"); +} + +/// Requesting roots via ByRoot where some envelopes are missing should +/// return Ok(None) for those roots. 
+#[tokio::test] +async fn stream_envelopes_by_root_missing_envelopes() { + let chain = build_chain(6, &[], &[2, 4], &[]); + let (mut mock, _runtime) = mock_adapter(); + mock.expect_get_split_slot().return_const(Slot::new(0)); + mock_envelopes(&mut mock, &chain); + mock.expect_block_has_canonical_payload().times(0); + + let streamer = PayloadEnvelopeStreamer::new(mock, EnvelopeRequestSource::ByRoot); + let mut stream = streamer.launch_stream(roots(&chain)); + + for (i, entry) in chain.iter().enumerate() { + let (root, result) = stream + .next() + .await + .unwrap_or_else(|| panic!("stream ended early at index {i}")); + assert_eq!(root, entry.block_root, "root mismatch at index {i}"); + + let envelope_opt = unwrap_result(&result); + if let Some(entry_envelope) = &entry.envelope { + let envelope = envelope_opt + .as_ref() + .unwrap_or_else(|| panic!("expected Some at index {i}")); + assert_eq!( + envelope.block_hash(), + entry_envelope.block_hash(), + "block_hash mismatch at index {i}" + ); + } else { + assert!( + envelope_opt.is_none(), + "expected None for missing envelope at index {i}" + ); + } + } + + assert!(stream.next().await.is_none(), "stream should be exhausted"); +} diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index c33f4840e0..724c41cfc9 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -426,6 +426,8 @@ pub enum Work { Status(BlockingFn), BlocksByRangeRequest(AsyncFn), BlocksByRootsRequest(AsyncFn), + PayloadEnvelopesByRangeRequest(AsyncFn), + PayloadEnvelopesByRootRequest(AsyncFn), BlobsByRangeRequest(BlockingFn), BlobsByRootsRequest(BlockingFn), DataColumnsByRootsRequest(BlockingFn), @@ -483,6 +485,8 @@ pub enum WorkType { Status, BlocksByRangeRequest, BlocksByRootsRequest, + PayloadEnvelopesByRangeRequest, + PayloadEnvelopesByRootRequest, BlobsByRangeRequest, BlobsByRootsRequest, DataColumnsByRootsRequest, @@ -542,6 +546,8 @@ impl Work { Work::Status(_) 
=> WorkType::Status, Work::BlocksByRangeRequest(_) => WorkType::BlocksByRangeRequest, Work::BlocksByRootsRequest(_) => WorkType::BlocksByRootsRequest, + Work::PayloadEnvelopesByRangeRequest(_) => WorkType::PayloadEnvelopesByRangeRequest, + Work::PayloadEnvelopesByRootRequest(_) => WorkType::PayloadEnvelopesByRootRequest, Work::BlobsByRangeRequest(_) => WorkType::BlobsByRangeRequest, Work::BlobsByRootsRequest(_) => WorkType::BlobsByRootsRequest, Work::DataColumnsByRootsRequest(_) => WorkType::DataColumnsByRootsRequest, @@ -991,6 +997,12 @@ impl BeaconProcessor { Some(item) } else if let Some(item) = work_queues.dcbrange_queue.pop() { Some(item) + } else if let Some(item) = work_queues.payload_envelopes_brange_queue.pop() + { + Some(item) + } else if let Some(item) = work_queues.payload_envelopes_broots_queue.pop() + { + Some(item) // Check slashings after all other consensus messages so we prioritize // following head. // @@ -1180,6 +1192,12 @@ impl BeaconProcessor { Work::BlocksByRootsRequest { .. } => { work_queues.block_broots_queue.push(work, work_id) } + Work::PayloadEnvelopesByRangeRequest { .. } => work_queues + .payload_envelopes_brange_queue + .push(work, work_id), + Work::PayloadEnvelopesByRootRequest { .. } => work_queues + .payload_envelopes_broots_queue + .push(work, work_id), Work::BlobsByRangeRequest { .. 
} => { work_queues.blob_brange_queue.push(work, work_id) } @@ -1296,6 +1314,12 @@ impl BeaconProcessor { WorkType::Status => work_queues.status_queue.len(), WorkType::BlocksByRangeRequest => work_queues.block_brange_queue.len(), WorkType::BlocksByRootsRequest => work_queues.block_broots_queue.len(), + WorkType::PayloadEnvelopesByRangeRequest => { + work_queues.payload_envelopes_brange_queue.len() + } + WorkType::PayloadEnvelopesByRootRequest => { + work_queues.payload_envelopes_broots_queue.len() + } WorkType::BlobsByRangeRequest => work_queues.blob_brange_queue.len(), WorkType::BlobsByRootsRequest => work_queues.blob_broots_queue.len(), WorkType::DataColumnsByRootsRequest => work_queues.dcbroots_queue.len(), @@ -1487,9 +1511,10 @@ impl BeaconProcessor { | Work::DataColumnsByRangeRequest(process_fn) => { task_spawner.spawn_blocking(process_fn) } - Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => { - task_spawner.spawn_async(work) - } + Work::BlocksByRangeRequest(work) + | Work::BlocksByRootsRequest(work) + | Work::PayloadEnvelopesByRangeRequest(work) + | Work::PayloadEnvelopesByRootRequest(work) => task_spawner.spawn_async(work), Work::ChainSegmentBackfill(process_fn) => { if self.config.enable_backfill_rate_limiting { task_spawner.spawn_blocking_with_rayon(RayonPoolType::LowPriority, process_fn) diff --git a/beacon_node/beacon_processor/src/scheduler/work_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_queue.rs index e48c776b6d..363ec06097 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_queue.rs @@ -135,6 +135,8 @@ pub struct BeaconProcessorQueueLengths { blob_brange_queue: usize, dcbroots_queue: usize, dcbrange_queue: usize, + payload_envelopes_brange_queue: usize, + payload_envelopes_broots_queue: usize, gossip_bls_to_execution_change_queue: usize, gossip_execution_payload_queue: usize, gossip_execution_payload_bid_queue: usize, @@ -206,6 +208,8 @@ impl 
BeaconProcessorQueueLengths { blob_brange_queue: 1024, dcbroots_queue: 1024, dcbrange_queue: 1024, + payload_envelopes_brange_queue: 1024, + payload_envelopes_broots_queue: 1024, gossip_bls_to_execution_change_queue: 16384, // TODO(EIP-7732): verify 1024 is preferable. I used same value as `gossip_block_queue` and `gossip_blob_queue` gossip_execution_payload_queue: 1024, @@ -256,6 +260,8 @@ pub struct WorkQueues { pub status_queue: FifoQueue>, pub block_brange_queue: FifoQueue>, pub block_broots_queue: FifoQueue>, + pub payload_envelopes_brange_queue: FifoQueue>, + pub payload_envelopes_broots_queue: FifoQueue>, pub blob_broots_queue: FifoQueue>, pub blob_brange_queue: FifoQueue>, pub dcbroots_queue: FifoQueue>, @@ -327,6 +333,10 @@ impl WorkQueues { let blob_brange_queue = FifoQueue::new(queue_lengths.blob_brange_queue); let dcbroots_queue = FifoQueue::new(queue_lengths.dcbroots_queue); let dcbrange_queue = FifoQueue::new(queue_lengths.dcbrange_queue); + let payload_envelopes_brange_queue = + FifoQueue::new(queue_lengths.payload_envelopes_brange_queue); + let payload_envelopes_broots_queue = + FifoQueue::new(queue_lengths.payload_envelopes_broots_queue); let gossip_bls_to_execution_change_queue = FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue); @@ -387,6 +397,8 @@ impl WorkQueues { blob_brange_queue, dcbroots_queue, dcbrange_queue, + payload_envelopes_brange_queue, + payload_envelopes_broots_queue, gossip_bls_to_execution_change_queue, gossip_execution_payload_queue, gossip_execution_payload_bid_queue, diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 43a44c85fc..2edd9de2d9 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -590,6 +590,8 @@ impl PeerManager { Protocol::BlocksByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, 
Protocol::BlobsByRange => PeerAction::MidToleranceError, + Protocol::PayloadEnvelopesByRange => PeerAction::MidToleranceError, + Protocol::PayloadEnvelopesByRoot => PeerAction::MidToleranceError, // Lighthouse does not currently make light client requests; therefore, this // is an unexpected scenario. We do not ban the peer for rate limiting. Protocol::LightClientBootstrap => return, @@ -615,6 +617,8 @@ impl PeerManager { Protocol::Ping => PeerAction::Fatal, Protocol::BlocksByRange => return, Protocol::BlocksByRoot => return, + Protocol::PayloadEnvelopesByRange => return, + Protocol::PayloadEnvelopesByRoot => return, Protocol::BlobsByRange => return, Protocol::BlobsByRoot => return, Protocol::DataColumnsByRoot => return, @@ -638,6 +642,8 @@ impl PeerManager { Protocol::Ping => PeerAction::LowToleranceError, Protocol::BlocksByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, + Protocol::PayloadEnvelopesByRange => PeerAction::MidToleranceError, + Protocol::PayloadEnvelopesByRoot => PeerAction::MidToleranceError, Protocol::BlobsByRange => PeerAction::MidToleranceError, Protocol::BlobsByRoot => PeerAction::MidToleranceError, Protocol::DataColumnsByRoot => PeerAction::MidToleranceError, diff --git a/beacon_node/lighthouse_network/src/rpc/codec.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs index d1a3182fad..346e350825 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -15,6 +15,7 @@ use std::io::{Read, Write}; use std::marker::PhantomData; use std::sync::Arc; use tokio_util::codec::{Decoder, Encoder}; +use types::SignedExecutionPayloadEnvelope; use types::{ BlobSidecar, ChainSpec, DataColumnSidecar, DataColumnsByRootIdentifier, EthSpec, ForkContext, ForkName, Hash256, LightClientBootstrap, LightClientFinalityUpdate, @@ -76,6 +77,8 @@ impl SSZSnappyInboundCodec { }, RpcSuccessResponse::BlocksByRange(res) => res.as_ssz_bytes(), 
RpcSuccessResponse::BlocksByRoot(res) => res.as_ssz_bytes(), + RpcSuccessResponse::PayloadEnvelopesByRange(res) => res.as_ssz_bytes(), + RpcSuccessResponse::PayloadEnvelopesByRoot(res) => res.as_ssz_bytes(), RpcSuccessResponse::BlobsByRange(res) => res.as_ssz_bytes(), RpcSuccessResponse::BlobsByRoot(res) => res.as_ssz_bytes(), RpcSuccessResponse::DataColumnsByRoot(res) => res.as_ssz_bytes(), @@ -356,6 +359,8 @@ impl Encoder> for SSZSnappyOutboundCodec { BlocksByRootRequest::V1(req) => req.block_roots.as_ssz_bytes(), BlocksByRootRequest::V2(req) => req.block_roots.as_ssz_bytes(), }, + RequestType::PayloadEnvelopesByRange(req) => req.as_ssz_bytes(), + RequestType::PayloadEnvelopesByRoot(req) => req.beacon_block_roots.as_ssz_bytes(), RequestType::BlobsByRange(req) => req.as_ssz_bytes(), RequestType::BlobsByRoot(req) => req.blob_ids.as_ssz_bytes(), RequestType::DataColumnsByRange(req) => req.as_ssz_bytes(), @@ -548,6 +553,19 @@ fn handle_rpc_request( )?, }), ))), + SupportedProtocol::PayloadEnvelopesByRangeV1 => { + Ok(Some(RequestType::PayloadEnvelopesByRange( + PayloadEnvelopesByRangeRequest::from_ssz_bytes(decoded_buffer)?, + ))) + } + SupportedProtocol::PayloadEnvelopesByRootV1 => Ok(Some( + RequestType::PayloadEnvelopesByRoot(PayloadEnvelopesByRootRequest { + beacon_block_roots: RuntimeVariableList::from_ssz_bytes( + decoded_buffer, + spec.max_request_payloads(), + )?, + }), + )), SupportedProtocol::BlobsByRangeV1 => Ok(Some(RequestType::BlobsByRange( BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?, ))), @@ -650,6 +668,48 @@ fn handle_rpc_response( SupportedProtocol::BlocksByRootV1 => Ok(Some(RpcSuccessResponse::BlocksByRoot(Arc::new( SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), )))), + SupportedProtocol::PayloadEnvelopesByRangeV1 => match fork_name { + Some(fork_name) => { + if fork_name.gloas_enabled() { + Ok(Some(RpcSuccessResponse::PayloadEnvelopesByRange(Arc::new( + 
SignedExecutionPayloadEnvelope::from_ssz_bytes(decoded_buffer)?, + )))) + } else { + Err(RPCError::ErrorResponse( + RpcErrorResponse::InvalidRequest, + "Invalid fork name for payload envelopes by range".to_string(), + )) + } + } + None => Err(RPCError::ErrorResponse( + RpcErrorResponse::InvalidRequest, + format!( + "No context bytes provided for {:?} response", + versioned_protocol + ), + )), + }, + SupportedProtocol::PayloadEnvelopesByRootV1 => match fork_name { + Some(fork_name) => { + if fork_name.gloas_enabled() { + Ok(Some(RpcSuccessResponse::PayloadEnvelopesByRoot(Arc::new( + SignedExecutionPayloadEnvelope::from_ssz_bytes(decoded_buffer)?, + )))) + } else { + Err(RPCError::ErrorResponse( + RpcErrorResponse::InvalidRequest, + "Invalid fork name for payload envelopes by root".to_string(), + )) + } + } + None => Err(RPCError::ErrorResponse( + RpcErrorResponse::InvalidRequest, + format!( + "No context bytes provided for {:?} response", + versioned_protocol + ), + )), + }, SupportedProtocol::BlobsByRangeV1 => match fork_name { Some(fork_name) => { if fork_name.deneb_enabled() { @@ -1260,6 +1320,12 @@ mod tests { RequestType::BlobsByRange(blbrange) => { assert_eq!(decoded, RequestType::BlobsByRange(blbrange)) } + RequestType::PayloadEnvelopesByRange(perange) => { + assert_eq!(decoded, RequestType::PayloadEnvelopesByRange(perange)) + } + RequestType::PayloadEnvelopesByRoot(peroot) => { + assert_eq!(decoded, RequestType::PayloadEnvelopesByRoot(peroot)) + } RequestType::BlobsByRoot(bbroot) => { assert_eq!(decoded, RequestType::BlobsByRoot(bbroot)) } diff --git a/beacon_node/lighthouse_network/src/rpc/config.rs b/beacon_node/lighthouse_network/src/rpc/config.rs index b0ee6fea64..9e1c6541ec 100644 --- a/beacon_node/lighthouse_network/src/rpc/config.rs +++ b/beacon_node/lighthouse_network/src/rpc/config.rs @@ -89,6 +89,8 @@ pub struct RateLimiterConfig { pub(super) goodbye_quota: Quota, pub(super) blocks_by_range_quota: Quota, pub(super) blocks_by_root_quota: Quota, + 
pub(super) payload_envelopes_by_range_quota: Quota, + pub(super) payload_envelopes_by_root_quota: Quota, pub(super) blobs_by_range_quota: Quota, pub(super) blobs_by_root_quota: Quota, pub(super) data_columns_by_root_quota: Quota, @@ -111,6 +113,10 @@ impl RateLimiterConfig { Quota::n_every(NonZeroU64::new(128).unwrap(), 10); pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(NonZeroU64::new(128).unwrap(), 10); + pub const DEFAULT_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA: Quota = + Quota::n_every(NonZeroU64::new(128).unwrap(), 10); + pub const DEFAULT_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA: Quota = + Quota::n_every(NonZeroU64::new(128).unwrap(), 10); // `DEFAULT_BLOCKS_BY_RANGE_QUOTA` * (target + 1) to account for high usage pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = Quota::n_every(NonZeroU64::new(896).unwrap(), 10); @@ -137,6 +143,8 @@ impl Default for RateLimiterConfig { goodbye_quota: Self::DEFAULT_GOODBYE_QUOTA, blocks_by_range_quota: Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA, blocks_by_root_quota: Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA, + payload_envelopes_by_range_quota: Self::DEFAULT_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA, + payload_envelopes_by_root_quota: Self::DEFAULT_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA, blobs_by_range_quota: Self::DEFAULT_BLOBS_BY_RANGE_QUOTA, blobs_by_root_quota: Self::DEFAULT_BLOBS_BY_ROOT_QUOTA, data_columns_by_root_quota: Self::DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA, @@ -169,6 +177,14 @@ impl Debug for RateLimiterConfig { .field("goodbye", fmt_q!(&self.goodbye_quota)) .field("blocks_by_range", fmt_q!(&self.blocks_by_range_quota)) .field("blocks_by_root", fmt_q!(&self.blocks_by_root_quota)) + .field( + "payload_envelopes_by_range", + fmt_q!(&self.payload_envelopes_by_range_quota), + ) + .field( + "payload_envelopes_by_root", + fmt_q!(&self.payload_envelopes_by_root_quota), + ) .field("blobs_by_range", fmt_q!(&self.blobs_by_range_quota)) .field("blobs_by_root", fmt_q!(&self.blobs_by_root_quota)) .field( @@ -197,6 +213,8 @@ impl FromStr for RateLimiterConfig 
{ let mut goodbye_quota = None; let mut blocks_by_range_quota = None; let mut blocks_by_root_quota = None; + let mut payload_envelopes_by_range_quota = None; + let mut payload_envelopes_by_root_quota = None; let mut blobs_by_range_quota = None; let mut blobs_by_root_quota = None; let mut data_columns_by_root_quota = None; @@ -214,6 +232,12 @@ impl FromStr for RateLimiterConfig { Protocol::Goodbye => goodbye_quota = goodbye_quota.or(quota), Protocol::BlocksByRange => blocks_by_range_quota = blocks_by_range_quota.or(quota), Protocol::BlocksByRoot => blocks_by_root_quota = blocks_by_root_quota.or(quota), + Protocol::PayloadEnvelopesByRange => { + payload_envelopes_by_range_quota = payload_envelopes_by_range_quota.or(quota) + } + Protocol::PayloadEnvelopesByRoot => { + payload_envelopes_by_root_quota = payload_envelopes_by_root_quota.or(quota) + } Protocol::BlobsByRange => blobs_by_range_quota = blobs_by_range_quota.or(quota), Protocol::BlobsByRoot => blobs_by_root_quota = blobs_by_root_quota.or(quota), Protocol::DataColumnsByRoot => { @@ -250,6 +274,10 @@ impl FromStr for RateLimiterConfig { .unwrap_or(Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA), blocks_by_root_quota: blocks_by_root_quota .unwrap_or(Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA), + payload_envelopes_by_range_quota: payload_envelopes_by_range_quota + .unwrap_or(Self::DEFAULT_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA), + payload_envelopes_by_root_quota: payload_envelopes_by_root_quota + .unwrap_or(Self::DEFAULT_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA), blobs_by_range_quota: blobs_by_range_quota .unwrap_or(Self::DEFAULT_BLOBS_BY_RANGE_QUOTA), blobs_by_root_quota: blobs_by_root_quota.unwrap_or(Self::DEFAULT_BLOBS_BY_ROOT_QUOTA), diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index 9861119ac1..336747fb83 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -954,6 +954,35 @@ where return; } } + 
RequestType::PayloadEnvelopesByRange(request) => { + let max_allowed = spec.max_request_payloads; + if request.count > max_allowed { + self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound { + id: self.current_inbound_substream_id, + proto: Protocol::PayloadEnvelopesByRange, + error: RPCError::InvalidData(format!( + "requested exceeded limit. allowed: {}, requested: {}", + max_allowed, request.count + )), + })); + return; + } + } + RequestType::DataColumnsByRange(request) => { + let max_requested = request.max_requested::(); + let max_allowed = spec.max_request_data_column_sidecars; + if max_requested > max_allowed { + self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound { + id: self.current_inbound_substream_id, + proto: Protocol::DataColumnsByRange, + error: RPCError::InvalidData(format!( + "requested exceeded limit. allowed: {}, requested: {}", + max_allowed, max_requested + )), + })); + return; + } + } _ => {} }; diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 5a9a683b75..baabf48683 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -17,7 +17,8 @@ use types::light_client::consts::MAX_REQUEST_LIGHT_CLIENT_UPDATES; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnsByRootIdentifier, Epoch, EthSpec, ForkContext, Hash256, LightClientBootstrap, LightClientFinalityUpdate, - LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock, Slot, + LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, Slot, }; /// Maximum length of error message. @@ -362,6 +363,16 @@ impl BlocksByRangeRequest { } } +/// Request a number of execution payload envelopes from a peer. +#[derive(Encode, Decode, Clone, Debug, PartialEq)] +pub struct PayloadEnvelopesByRangeRequest { + /// The starting slot to request execution payload envelopes. 
+ pub start_slot: u64, + + /// The number of slots from the start slot. + pub count: u64, +} + /// Request a number of beacon blobs from a peer. #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BlobsByRangeRequest { @@ -505,6 +516,29 @@ impl BlocksByRootRequest { } } +/// Request a number of execution payload envelopes from a peer. +#[derive(Clone, Debug, PartialEq)] +pub struct PayloadEnvelopesByRootRequest { + /// The list of beacon block roots used to request execution payload envelopes. + pub beacon_block_roots: RuntimeVariableList, +} + +impl PayloadEnvelopesByRootRequest { + pub fn new( + beacon_block_roots: Vec, + fork_context: &ForkContext, + ) -> Result { + let max_requests_envelopes = fork_context.spec.max_request_payloads(); + + let beacon_block_roots = + RuntimeVariableList::new(beacon_block_roots, max_requests_envelopes).map_err(|e| { + format!("ExecutionPayloadEnvelopesByRootRequest too many beacon block roots: {e:?}") + })?; + + Ok(Self { beacon_block_roots }) + } +} + /// Request a number of beacon blocks and blobs from a peer. #[derive(Clone, Debug, PartialEq)] pub struct BlobsByRootRequest { @@ -588,6 +622,13 @@ pub enum RpcSuccessResponse { /// A response to a get BLOCKS_BY_ROOT request. BlocksByRoot(Arc>), + /// A response to a get EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE request. A None response signifies + /// the end of the batch. + PayloadEnvelopesByRange(Arc>), + + /// A response to a get EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT request. + PayloadEnvelopesByRoot(Arc>), + /// A response to a get BLOBS_BY_RANGE request BlobsByRange(Arc>), @@ -628,6 +669,12 @@ pub enum ResponseTermination { /// Blocks by root stream termination. BlocksByRoot, + /// Execution payload envelopes by range stream termination. + PayloadEnvelopesByRange, + + /// Execution payload envelopes by root stream termination. + PayloadEnvelopesByRoot, + /// Blobs by range stream termination. 
BlobsByRange, @@ -649,6 +696,8 @@ impl ResponseTermination { match self { ResponseTermination::BlocksByRange => Protocol::BlocksByRange, ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot, + ResponseTermination::PayloadEnvelopesByRange => Protocol::PayloadEnvelopesByRange, + ResponseTermination::PayloadEnvelopesByRoot => Protocol::PayloadEnvelopesByRoot, ResponseTermination::BlobsByRange => Protocol::BlobsByRange, ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot, ResponseTermination::DataColumnsByRoot => Protocol::DataColumnsByRoot, @@ -744,6 +793,8 @@ impl RpcSuccessResponse { RpcSuccessResponse::Status(_) => Protocol::Status, RpcSuccessResponse::BlocksByRange(_) => Protocol::BlocksByRange, RpcSuccessResponse::BlocksByRoot(_) => Protocol::BlocksByRoot, + RpcSuccessResponse::PayloadEnvelopesByRange(_) => Protocol::PayloadEnvelopesByRange, + RpcSuccessResponse::PayloadEnvelopesByRoot(_) => Protocol::PayloadEnvelopesByRoot, RpcSuccessResponse::BlobsByRange(_) => Protocol::BlobsByRange, RpcSuccessResponse::BlobsByRoot(_) => Protocol::BlobsByRoot, RpcSuccessResponse::DataColumnsByRoot(_) => Protocol::DataColumnsByRoot, @@ -762,6 +813,7 @@ impl RpcSuccessResponse { pub fn slot(&self) -> Option { match self { Self::BlocksByRange(r) | Self::BlocksByRoot(r) => Some(r.slot()), + Self::PayloadEnvelopesByRoot(r) | Self::PayloadEnvelopesByRange(r) => Some(r.slot()), Self::BlobsByRange(r) | Self::BlobsByRoot(r) => Some(r.slot()), Self::DataColumnsByRange(r) | Self::DataColumnsByRoot(r) => Some(r.slot()), Self::LightClientBootstrap(r) => Some(r.get_slot()), @@ -812,6 +864,20 @@ impl std::fmt::Display for RpcSuccessResponse { RpcSuccessResponse::BlocksByRoot(block) => { write!(f, "BlocksByRoot: Block slot: {}", block.slot()) } + RpcSuccessResponse::PayloadEnvelopesByRange(envelope) => { + write!( + f, + "ExecutionPayloadEnvelopesByRange: Envelope slot: {}", + envelope.slot() + ) + } + RpcSuccessResponse::PayloadEnvelopesByRoot(envelope) => { + write!( + f, + 
"ExecutionPayloadEnvelopesByRoot: Envelope slot: {}", + envelope.slot() + ) + } RpcSuccessResponse::BlobsByRange(blob) => { write!(f, "BlobsByRange: Blob slot: {}", blob.slot()) } diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index b75ca72eda..2c92e17c44 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -22,7 +22,7 @@ use types::{ LightClientBootstrap, LightClientBootstrapAltair, LightClientFinalityUpdate, LightClientFinalityUpdateAltair, LightClientOptimisticUpdate, LightClientOptimisticUpdateAltair, LightClientUpdate, MainnetEthSpec, MinimalEthSpec, - SignedBeaconBlock, + SignedBeaconBlock, SignedExecutionPayloadEnvelope, }; // Note: Hardcoding the `EthSpec` type for `SignedBeaconBlock` as min/max values is @@ -65,6 +65,12 @@ pub static SIGNED_BEACON_BLOCK_BELLATRIX_MAX: LazyLock = + types::ExecutionPayload::::max_execution_payload_bellatrix_size() // adding max size of execution payload (~16gb) + ssz::BYTES_PER_LENGTH_OFFSET); // Adding the additional ssz offset for the `ExecutionPayload` field +pub static SIGNED_EXECUTION_PAYLOAD_ENVELOPE_MIN: LazyLock = + LazyLock::new(SignedExecutionPayloadEnvelope::::min_size); + +pub static SIGNED_EXECUTION_PAYLOAD_ENVELOPE_MAX: LazyLock = + LazyLock::new(SignedExecutionPayloadEnvelope::::max_size); + pub static BLOB_SIDECAR_SIZE: LazyLock = LazyLock::new(BlobSidecar::::max_size); @@ -140,13 +146,30 @@ pub fn rpc_block_limits_by_fork(current_fork: ForkName) -> RpcLimits { ), // After the merge the max SSZ size of a block is absurdly big. 
The size is actually // bound by other constants, so here we default to the bellatrix's max value - _ => RpcLimits::new( - *SIGNED_BEACON_BLOCK_BASE_MIN, // Base block is smaller than altair and bellatrix blocks - *SIGNED_BEACON_BLOCK_BELLATRIX_MAX, // Bellatrix block is larger than base and altair blocks + // After the merge the max SSZ size includes the execution payload. + // Gloas blocks no longer contain the execution payload, but we must + // still accept pre-Gloas blocks during historical sync, so we keep the + // Bellatrix max as the upper bound. + ForkName::Bellatrix + | ForkName::Capella + | ForkName::Deneb + | ForkName::Electra + | ForkName::Fulu + | ForkName::Gloas => RpcLimits::new( + *SIGNED_BEACON_BLOCK_BASE_MIN, + *SIGNED_BEACON_BLOCK_BELLATRIX_MAX, ), } } +/// Returns the rpc limits for payload_envelope_by_range and payload_envelope_by_root responses. +pub fn rpc_payload_limits() -> RpcLimits { + RpcLimits::new( + *SIGNED_EXECUTION_PAYLOAD_ENVELOPE_MIN, + *SIGNED_EXECUTION_PAYLOAD_ENVELOPE_MAX, + ) +} + fn rpc_light_client_updates_by_range_limits_by_fork(current_fork: ForkName) -> RpcLimits { let altair_fixed_len = LightClientFinalityUpdateAltair::::ssz_fixed_len(); @@ -242,6 +265,12 @@ pub enum Protocol { /// The `BlobsByRange` protocol name. #[strum(serialize = "blob_sidecars_by_range")] BlobsByRange, + /// The `ExecutionPayloadEnvelopesByRoot` protocol name. + #[strum(serialize = "execution_payload_envelopes_by_root")] + PayloadEnvelopesByRoot, + /// The `ExecutionPayloadEnvelopesByRange` protocol name. + #[strum(serialize = "execution_payload_envelopes_by_range")] + PayloadEnvelopesByRange, /// The `BlobsByRoot` protocol name. 
#[strum(serialize = "blob_sidecars_by_root")] BlobsByRoot, @@ -277,6 +306,8 @@ impl Protocol { Protocol::Goodbye => None, Protocol::BlocksByRange => Some(ResponseTermination::BlocksByRange), Protocol::BlocksByRoot => Some(ResponseTermination::BlocksByRoot), + Protocol::PayloadEnvelopesByRange => Some(ResponseTermination::PayloadEnvelopesByRange), + Protocol::PayloadEnvelopesByRoot => Some(ResponseTermination::PayloadEnvelopesByRoot), Protocol::BlobsByRange => Some(ResponseTermination::BlobsByRange), Protocol::BlobsByRoot => Some(ResponseTermination::BlobsByRoot), Protocol::DataColumnsByRoot => Some(ResponseTermination::DataColumnsByRoot), @@ -307,6 +338,8 @@ pub enum SupportedProtocol { BlocksByRangeV2, BlocksByRootV1, BlocksByRootV2, + PayloadEnvelopesByRangeV1, + PayloadEnvelopesByRootV1, BlobsByRangeV1, BlobsByRootV1, DataColumnsByRootV1, @@ -329,6 +362,8 @@ impl SupportedProtocol { SupportedProtocol::GoodbyeV1 => "1", SupportedProtocol::BlocksByRangeV1 => "1", SupportedProtocol::BlocksByRangeV2 => "2", + SupportedProtocol::PayloadEnvelopesByRangeV1 => "1", + SupportedProtocol::PayloadEnvelopesByRootV1 => "1", SupportedProtocol::BlocksByRootV1 => "1", SupportedProtocol::BlocksByRootV2 => "2", SupportedProtocol::BlobsByRangeV1 => "1", @@ -355,6 +390,8 @@ impl SupportedProtocol { SupportedProtocol::BlocksByRangeV2 => Protocol::BlocksByRange, SupportedProtocol::BlocksByRootV1 => Protocol::BlocksByRoot, SupportedProtocol::BlocksByRootV2 => Protocol::BlocksByRoot, + SupportedProtocol::PayloadEnvelopesByRangeV1 => Protocol::PayloadEnvelopesByRange, + SupportedProtocol::PayloadEnvelopesByRootV1 => Protocol::PayloadEnvelopesByRoot, SupportedProtocol::BlobsByRangeV1 => Protocol::BlobsByRange, SupportedProtocol::BlobsByRootV1 => Protocol::BlobsByRoot, SupportedProtocol::DataColumnsByRootV1 => Protocol::DataColumnsByRoot, @@ -409,6 +446,18 @@ impl SupportedProtocol { ProtocolId::new(SupportedProtocol::DataColumnsByRangeV1, Encoding::SSZSnappy), ]); } + if 
fork_context.fork_exists(ForkName::Gloas) { + supported.extend_from_slice(&[ + ProtocolId::new( + SupportedProtocol::PayloadEnvelopesByRangeV1, + Encoding::SSZSnappy, + ), + ProtocolId::new( + SupportedProtocol::PayloadEnvelopesByRootV1, + Encoding::SSZSnappy, + ), + ]); + } supported } } @@ -511,6 +560,13 @@ impl ProtocolId { ::ssz_fixed_len(), ), Protocol::BlocksByRoot => RpcLimits::new(0, spec.max_blocks_by_root_request), + Protocol::PayloadEnvelopesByRange => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), + Protocol::PayloadEnvelopesByRoot => { + RpcLimits::new(0, spec.max_payload_envelopes_by_root_request) + } Protocol::BlobsByRange => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -549,6 +605,8 @@ impl ProtocolId { Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork_name()), Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork_name()), + Protocol::PayloadEnvelopesByRange => rpc_payload_limits(), + Protocol::PayloadEnvelopesByRoot => rpc_payload_limits(), Protocol::BlobsByRange => rpc_blob_limits::(), Protocol::BlobsByRoot => rpc_blob_limits::(), Protocol::DataColumnsByRoot => { @@ -586,6 +644,8 @@ impl ProtocolId { match self.versioned_protocol { SupportedProtocol::BlocksByRangeV2 | SupportedProtocol::BlocksByRootV2 + | SupportedProtocol::PayloadEnvelopesByRangeV1 + | SupportedProtocol::PayloadEnvelopesByRootV1 | SupportedProtocol::BlobsByRangeV1 | SupportedProtocol::BlobsByRootV1 | SupportedProtocol::DataColumnsByRootV1 @@ -737,6 +797,8 @@ pub enum RequestType { Goodbye(GoodbyeReason), BlocksByRange(OldBlocksByRangeRequest), BlocksByRoot(BlocksByRootRequest), + PayloadEnvelopesByRange(PayloadEnvelopesByRangeRequest), + PayloadEnvelopesByRoot(PayloadEnvelopesByRootRequest), BlobsByRange(BlobsByRangeRequest), BlobsByRoot(BlobsByRootRequest), DataColumnsByRoot(DataColumnsByRootRequest), @@ -760,6 +822,8 @@ 
impl RequestType { RequestType::Goodbye(_) => 0, RequestType::BlocksByRange(req) => *req.count(), RequestType::BlocksByRoot(req) => req.block_roots().len() as u64, + RequestType::PayloadEnvelopesByRange(req) => req.count, + RequestType::PayloadEnvelopesByRoot(req) => req.beacon_block_roots.len() as u64, RequestType::BlobsByRange(req) => req.max_blobs_requested(digest_epoch, spec), RequestType::BlobsByRoot(req) => req.blob_ids.len() as u64, RequestType::DataColumnsByRoot(req) => req.max_requested() as u64, @@ -789,6 +853,8 @@ impl RequestType { BlocksByRootRequest::V1(_) => SupportedProtocol::BlocksByRootV1, BlocksByRootRequest::V2(_) => SupportedProtocol::BlocksByRootV2, }, + RequestType::PayloadEnvelopesByRange(_) => SupportedProtocol::PayloadEnvelopesByRangeV1, + RequestType::PayloadEnvelopesByRoot(_) => SupportedProtocol::PayloadEnvelopesByRootV1, RequestType::BlobsByRange(_) => SupportedProtocol::BlobsByRangeV1, RequestType::BlobsByRoot(_) => SupportedProtocol::BlobsByRootV1, RequestType::DataColumnsByRoot(_) => SupportedProtocol::DataColumnsByRootV1, @@ -820,6 +886,8 @@ impl RequestType { // variants that have `multiple_responses()` can have values. 
RequestType::BlocksByRange(_) => ResponseTermination::BlocksByRange, RequestType::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, + RequestType::PayloadEnvelopesByRange(_) => ResponseTermination::PayloadEnvelopesByRange, + RequestType::PayloadEnvelopesByRoot(_) => ResponseTermination::PayloadEnvelopesByRoot, RequestType::BlobsByRange(_) => ResponseTermination::BlobsByRange, RequestType::BlobsByRoot(_) => ResponseTermination::BlobsByRoot, RequestType::DataColumnsByRoot(_) => ResponseTermination::DataColumnsByRoot, @@ -854,6 +922,14 @@ impl RequestType { ProtocolId::new(SupportedProtocol::BlocksByRootV2, Encoding::SSZSnappy), ProtocolId::new(SupportedProtocol::BlocksByRootV1, Encoding::SSZSnappy), ], + RequestType::PayloadEnvelopesByRange(_) => vec![ProtocolId::new( + SupportedProtocol::PayloadEnvelopesByRangeV1, + Encoding::SSZSnappy, + )], + RequestType::PayloadEnvelopesByRoot(_) => vec![ProtocolId::new( + SupportedProtocol::PayloadEnvelopesByRootV1, + Encoding::SSZSnappy, + )], RequestType::BlobsByRange(_) => vec![ProtocolId::new( SupportedProtocol::BlobsByRangeV1, Encoding::SSZSnappy, @@ -905,6 +981,8 @@ impl RequestType { RequestType::BlocksByRange(_) => false, RequestType::BlocksByRoot(_) => false, RequestType::BlobsByRange(_) => false, + RequestType::PayloadEnvelopesByRange(_) => false, + RequestType::PayloadEnvelopesByRoot(_) => false, RequestType::BlobsByRoot(_) => false, RequestType::DataColumnsByRoot(_) => false, RequestType::DataColumnsByRange(_) => false, @@ -1015,6 +1093,12 @@ impl std::fmt::Display for RequestType { RequestType::Goodbye(reason) => write!(f, "Goodbye: {}", reason), RequestType::BlocksByRange(req) => write!(f, "Blocks by range: {}", req), RequestType::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), + RequestType::PayloadEnvelopesByRange(req) => { + write!(f, "Payload envelopes by range: {:?}", req) + } + RequestType::PayloadEnvelopesByRoot(req) => { + write!(f, "Payload envelopes by root: {:?}", req) + } 
RequestType::BlobsByRange(req) => write!(f, "Blobs by range: {:?}", req), RequestType::BlobsByRoot(req) => write!(f, "Blobs by root: {:?}", req), RequestType::DataColumnsByRoot(req) => write!(f, "Data columns by root: {:?}", req), diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index 2407038bc3..ebdca386d8 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -109,7 +109,11 @@ pub struct RPCRateLimiter { blbrange_rl: Limiter, /// BlobsByRoot rate limiter. blbroot_rl: Limiter, - /// DataColumnssByRoot rate limiter. + /// PayloadEnvelopesByRange rate limiter. + envrange_rl: Limiter, + /// PayloadEnvelopesByRoot rate limiter. + envroots_rl: Limiter, + /// DataColumnsByRoot rate limiter. dcbroot_rl: Limiter, /// DataColumnsByRange rate limiter. dcbrange_rl: Limiter, @@ -148,6 +152,10 @@ pub struct RPCRateLimiterBuilder { bbrange_quota: Option, /// Quota for the BlocksByRoot protocol. bbroots_quota: Option, + /// Quota for the ExecutionPayloadEnvelopesByRange protocol. + perange_quota: Option, + /// Quota for the ExecutionPayloadEnvelopesByRoot protocol. + peroots_quota: Option, /// Quota for the BlobsByRange protocol. blbrange_quota: Option, /// Quota for the BlobsByRoot protocol. 
@@ -177,6 +185,8 @@ impl RPCRateLimiterBuilder { Protocol::Goodbye => self.goodbye_quota = q, Protocol::BlocksByRange => self.bbrange_quota = q, Protocol::BlocksByRoot => self.bbroots_quota = q, + Protocol::PayloadEnvelopesByRange => self.perange_quota = q, + Protocol::PayloadEnvelopesByRoot => self.peroots_quota = q, Protocol::BlobsByRange => self.blbrange_quota = q, Protocol::BlobsByRoot => self.blbroot_quota = q, Protocol::DataColumnsByRoot => self.dcbroot_quota = q, @@ -201,6 +211,12 @@ impl RPCRateLimiterBuilder { let bbrange_quota = self .bbrange_quota .ok_or("BlocksByRange quota not specified")?; + let perange_quota = self + .perange_quota + .ok_or("PayloadEnvelopesByRange quota not specified")?; + let peroots_quota = self + .peroots_quota + .ok_or("PayloadEnvelopesByRoot quota not specified")?; let lc_bootstrap_quota = self .lcbootstrap_quota .ok_or("LightClientBootstrap quota not specified")?; @@ -236,6 +252,8 @@ impl RPCRateLimiterBuilder { let goodbye_rl = Limiter::from_quota(goodbye_quota)?; let bbroots_rl = Limiter::from_quota(bbroots_quota)?; let bbrange_rl = Limiter::from_quota(bbrange_quota)?; + let envrange_rl = Limiter::from_quota(perange_quota)?; + let envroots_rl = Limiter::from_quota(peroots_quota)?; let blbrange_rl = Limiter::from_quota(blbrange_quota)?; let blbroot_rl = Limiter::from_quota(blbroots_quota)?; let dcbroot_rl = Limiter::from_quota(dcbroot_quota)?; @@ -259,6 +277,8 @@ impl RPCRateLimiterBuilder { goodbye_rl, bbroots_rl, bbrange_rl, + envrange_rl, + envroots_rl, blbrange_rl, blbroot_rl, dcbroot_rl, @@ -312,6 +332,8 @@ impl RPCRateLimiter { goodbye_quota, blocks_by_range_quota, blocks_by_root_quota, + payload_envelopes_by_range_quota, + payload_envelopes_by_root_quota, blobs_by_range_quota, blobs_by_root_quota, data_columns_by_root_quota, @@ -329,6 +351,14 @@ impl RPCRateLimiter { .set_quota(Protocol::Goodbye, goodbye_quota) .set_quota(Protocol::BlocksByRange, blocks_by_range_quota) .set_quota(Protocol::BlocksByRoot, 
blocks_by_root_quota) + .set_quota( + Protocol::PayloadEnvelopesByRange, + payload_envelopes_by_range_quota, + ) + .set_quota( + Protocol::PayloadEnvelopesByRoot, + payload_envelopes_by_root_quota, + ) .set_quota(Protocol::BlobsByRange, blobs_by_range_quota) .set_quota(Protocol::BlobsByRoot, blobs_by_root_quota) .set_quota(Protocol::DataColumnsByRoot, data_columns_by_root_quota) @@ -376,6 +406,8 @@ impl RPCRateLimiter { Protocol::Goodbye => &mut self.goodbye_rl, Protocol::BlocksByRange => &mut self.bbrange_rl, Protocol::BlocksByRoot => &mut self.bbroots_rl, + Protocol::PayloadEnvelopesByRange => &mut self.envrange_rl, + Protocol::PayloadEnvelopesByRoot => &mut self.envroots_rl, Protocol::BlobsByRange => &mut self.blbrange_rl, Protocol::BlobsByRoot => &mut self.blbroot_rl, Protocol::DataColumnsByRoot => &mut self.dcbroot_rl, @@ -400,6 +432,8 @@ impl RPCRateLimiter { status_rl, bbrange_rl, bbroots_rl, + envrange_rl, + envroots_rl, blbrange_rl, blbroot_rl, dcbroot_rl, @@ -417,6 +451,8 @@ impl RPCRateLimiter { status_rl.prune(time_since_start); bbrange_rl.prune(time_since_start); bbroots_rl.prune(time_since_start); + envrange_rl.prune(time_since_start); + envroots_rl.prune(time_since_start); blbrange_rl.prune(time_since_start); blbroot_rl.prune(time_since_start); dcbrange_rl.prune(time_since_start); diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index d0323bab52..486a443857 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use types::{ BlobSidecar, DataColumnSidecar, Epoch, EthSpec, LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, }; pub type Id = u32; @@ -160,6 +161,10 @@ pub enum Response { DataColumnsByRange(Option>>), /// A response to a get BLOCKS_BY_ROOT request. 
BlocksByRoot(Option>>), + /// A response to a get `EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT` request. + PayloadEnvelopesByRoot(Option>>), + /// A response to a get `EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE` request. + PayloadEnvelopesByRange(Option>>), /// A response to a get BLOBS_BY_ROOT request. BlobsByRoot(Option>>), /// A response to a get DATA_COLUMN_SIDECARS_BY_ROOT request. @@ -185,6 +190,16 @@ impl std::convert::From> for RpcResponse { Some(b) => RpcResponse::Success(RpcSuccessResponse::BlocksByRange(b)), None => RpcResponse::StreamTermination(ResponseTermination::BlocksByRange), }, + Response::PayloadEnvelopesByRoot(r) => match r { + Some(p) => RpcResponse::Success(RpcSuccessResponse::PayloadEnvelopesByRoot(p)), + None => RpcResponse::StreamTermination(ResponseTermination::PayloadEnvelopesByRoot), + }, + Response::PayloadEnvelopesByRange(r) => match r { + Some(p) => RpcResponse::Success(RpcSuccessResponse::PayloadEnvelopesByRange(p)), + None => { + RpcResponse::StreamTermination(ResponseTermination::PayloadEnvelopesByRange) + } + }, Response::BlobsByRoot(r) => match r { Some(b) => RpcResponse::Success(RpcSuccessResponse::BlobsByRoot(b)), None => RpcResponse::StreamTermination(ResponseTermination::BlobsByRoot), diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 184a334591..56fcbb3bb6 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1524,6 +1524,28 @@ impl Network { request_type, }) } + RequestType::PayloadEnvelopesByRange(_) => { + metrics::inc_counter_vec( + &metrics::TOTAL_RPC_REQUESTS, + &["payload_envelopes_by_range"], + ); + Some(NetworkEvent::RequestReceived { + peer_id, + inbound_request_id, + request_type, + }) + } + RequestType::PayloadEnvelopesByRoot(_) => { + metrics::inc_counter_vec( + &metrics::TOTAL_RPC_REQUESTS, + &["payload_envelopes_by_root"], + ); + Some(NetworkEvent::RequestReceived { + peer_id, + 
inbound_request_id, + request_type, + }) + } RequestType::BlobsByRange(_) => { metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blobs_by_range"]); Some(NetworkEvent::RequestReceived { @@ -1638,6 +1660,16 @@ impl Network { RpcSuccessResponse::BlocksByRoot(resp) => { self.build_response(id, peer_id, Response::BlocksByRoot(Some(resp))) } + RpcSuccessResponse::PayloadEnvelopesByRange(resp) => self.build_response( + id, + peer_id, + Response::PayloadEnvelopesByRange(Some(resp)), + ), + RpcSuccessResponse::PayloadEnvelopesByRoot(resp) => self.build_response( + id, + peer_id, + Response::PayloadEnvelopesByRoot(Some(resp)), + ), RpcSuccessResponse::BlobsByRoot(resp) => { self.build_response(id, peer_id, Response::BlobsByRoot(Some(resp))) } @@ -1672,6 +1704,12 @@ impl Network { let response = match termination { ResponseTermination::BlocksByRange => Response::BlocksByRange(None), ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None), + ResponseTermination::PayloadEnvelopesByRange => { + Response::PayloadEnvelopesByRange(None) + } + ResponseTermination::PayloadEnvelopesByRoot => { + Response::PayloadEnvelopesByRoot(None) + } ResponseTermination::BlobsByRange => Response::BlobsByRange(None), ResponseTermination::BlobsByRoot => Response::BlobsByRoot(None), ResponseTermination::DataColumnsByRoot => Response::DataColumnsByRoot(None), diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index e40eacce08..f74e7dacfb 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -15,7 +15,8 @@ use beacon_processor::{ use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, - LightClientUpdatesByRangeRequest, + LightClientUpdatesByRangeRequest, PayloadEnvelopesByRangeRequest, + 
PayloadEnvelopesByRootRequest, }; use lighthouse_network::service::api_types::CustodyBackfillBatchId; use lighthouse_network::{ @@ -693,6 +694,46 @@ impl NetworkBeaconProcessor { }) } + /// Create a new work event to process `PayloadEnvelopesByRootRequest`s from the RPC network. + pub fn send_payload_envelopes_by_roots_request( + self: &Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, // Use ResponseId here + request: PayloadEnvelopesByRootRequest, + ) -> Result<(), Error> { + let processor = self.clone(); + let process_fn = async move { + processor + .handle_payload_envelopes_by_root_request(peer_id, inbound_request_id, request) + .await; + }; + + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::PayloadEnvelopesByRootRequest(Box::pin(process_fn)), + }) + } + + /// Create a new work event to process `PayloadEnvelopesByRangeRequest`s from the RPC network. + pub fn send_payload_envelopes_by_range_request( + self: &Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + request: PayloadEnvelopesByRangeRequest, + ) -> Result<(), Error> { + let processor = self.clone(); + let process_fn = async move { + processor + .handle_payload_envelopes_by_range_request(peer_id, inbound_request_id, request) + .await; + }; + + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::PayloadEnvelopesByRangeRequest(Box::pin(process_fn)), + }) + } + /// Create a new work event to process `BlobsByRangeRequest`s from the RPC network. 
pub fn send_blobs_by_range_request( self: &Arc, diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 279870d444..8b31b67acb 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -3,10 +3,12 @@ use crate::network_beacon_processor::{FUTURE_SLOT_TOLERANCE, NetworkBeaconProces use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::SyncMessage; +use beacon_chain::payload_envelope_streamer::EnvelopeRequestSource; use beacon_chain::{BeaconChainError, BeaconChainTypes, BlockProcessStatus, WhenSlotSkipped}; use itertools::{Itertools, process_results}; use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, + PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest, }; use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, ReportSource, Response, SyncInfo}; @@ -15,7 +17,7 @@ use slot_clock::SlotClock; use std::collections::{HashMap, HashSet, hash_map::Entry}; use std::sync::Arc; use tokio_stream::StreamExt; -use tracing::{Span, debug, error, field, instrument, warn}; +use tracing::{Span, debug, error, field, instrument, trace, warn}; use types::data::BlobIdentifier; use types::{ColumnIndex, Epoch, EthSpec, Hash256, Slot}; @@ -254,6 +256,104 @@ impl NetworkBeaconProcessor { Ok(()) } + /// Handle a `ExecutionPayloadEnvelopesByRoot` request from the peer. 
+ #[instrument( + name = "lh_handle_payload_envelopes_by_root_request", + parent = None, + level = "debug", + skip_all, + fields(peer_id = %peer_id, client = tracing::field::Empty) + )] + pub async fn handle_payload_envelopes_by_root_request( + self: Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + request: PayloadEnvelopesByRootRequest, + ) { + let client = self.network_globals.client(&peer_id); + Span::current().record("client", field::display(client.kind)); + + self.terminate_response_stream( + peer_id, + inbound_request_id, + self.clone() + .handle_payload_envelopes_by_root_request_inner( + peer_id, + inbound_request_id, + request, + ) + .await, + Response::PayloadEnvelopesByRoot, + ); + } + + /// Handle a `ExecutionPayloadEnvelopesByRoot` request from the peer. + async fn handle_payload_envelopes_by_root_request_inner( + self: Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + request: PayloadEnvelopesByRootRequest, + ) -> Result<(), (RpcErrorResponse, &'static str)> { + let log_results = |peer_id, requested_envelopes, send_envelope_count| { + debug!( + %peer_id, + requested = requested_envelopes, + returned = %send_envelope_count, + "ExecutionPayloadEnvelopes outgoing response processed" + ); + }; + + let requested_envelopes = request.beacon_block_roots.len(); + let mut envelope_stream = self.chain.get_payload_envelopes( + request.beacon_block_roots.to_vec(), + EnvelopeRequestSource::ByRoot, + ); + // Fetching payload envelopes is async because it may have to hit the execution layer for payloads. 
+ let mut send_envelope_count = 0; + while let Some((root, result)) = envelope_stream.next().await { + match result.as_ref() { + Ok(Some(envelope)) => { + self.send_response( + peer_id, + inbound_request_id, + Response::PayloadEnvelopesByRoot(Some(envelope.clone())), + ); + send_envelope_count += 1; + } + Ok(None) => { + debug!( + %peer_id, + request_root = ?root, + "Peer requested unknown payload envelope" + ); + } + Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { + debug!( + block_root = ?root, + reason = "execution layer not synced", + "Failed to fetch execution payload for payload envelopes by root request" + ); + log_results(peer_id, requested_envelopes, send_envelope_count); + return Err(( + RpcErrorResponse::ResourceUnavailable, + "Execution layer not synced", + )); + } + Err(e) => { + debug!( + ?peer_id, + request_root = ?root, + error = ?e, + "Error fetching payload envelope for peer" + ); + } + } + } + log_results(peer_id, requested_envelopes, send_envelope_count); + + Ok(()) + } + /// Handle a `BlobsByRoot` request from the peer. #[instrument( name = "lh_handle_blobs_by_root_request", @@ -983,6 +1083,189 @@ impl NetworkBeaconProcessor { .collect::>()) } + /// Handle a `ExecutionPayloadEnvelopesByRange` request from the peer. 
+ #[instrument( + name = "lh_handle_payload_envelopes_by_range_request", + parent = None, + level = "debug", + skip_all, + fields(peer_id = %peer_id, client = tracing::field::Empty) + )] + pub async fn handle_payload_envelopes_by_range_request( + self: Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + req: PayloadEnvelopesByRangeRequest, + ) { + let client = self.network_globals.client(&peer_id); + Span::current().record("client", field::display(client.kind)); + + self.terminate_response_stream( + peer_id, + inbound_request_id, + self.clone() + .handle_payload_envelopes_by_range_request_inner(peer_id, inbound_request_id, req) + .await, + Response::PayloadEnvelopesByRange, + ); + } + + /// Handle a `ExecutionPayloadEnvelopesByRange` request from the peer. + async fn handle_payload_envelopes_by_range_request_inner( + self: Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + req: PayloadEnvelopesByRangeRequest, + ) -> Result<(), (RpcErrorResponse, &'static str)> { + let req_start_slot = req.start_slot; + let req_count = req.count; + + debug!( + %peer_id, + count = req_count, + start_slot = %req_start_slot, + "Received ExecutionPayloadEnvelopesByRange Request" + ); + + let request_start_slot = Slot::from(req_start_slot); + let fork_name = self + .chain + .spec + .fork_name_at_slot::(request_start_slot); + + if !fork_name.gloas_enabled() { + return Err(( + RpcErrorResponse::InvalidRequest, + "Requested envelopes for pre-gloas slots", + )); + } + + // Spawn a blocking handle since get_block_roots_for_slot_range takes a sync lock on the + // fork-choice. + let network_beacon_processor = self.clone(); + let block_roots = self + .executor + .spawn_blocking_handle( + move || { + network_beacon_processor.get_block_roots_for_slot_range( + req_start_slot, + req_count, + "ExecutionPayloadEnvelopesByRange", + ) + }, + "get_block_roots_for_slot_range", + ) + .ok_or((RpcErrorResponse::ServerError, "shutting down"))? 
+ .await + .map_err(|_| (RpcErrorResponse::ServerError, "tokio join"))?? + .iter() + .map(|(root, _)| *root) + .collect::>(); + + let current_slot = self + .chain + .slot() + .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); + + let log_results = |peer_id, payloads_sent| { + if payloads_sent < (req_count as usize) { + debug!( + %peer_id, + msg = "Failed to return all requested payload envelopes", + start_slot = %req_start_slot, + %current_slot, + requested = req_count, + returned = payloads_sent, + "ExecutionPayloadEnvelopesByRange outgoing response processed" + ); + } else { + debug!( + %peer_id, + start_slot = %req_start_slot, + %current_slot, + requested = req_count, + returned = payloads_sent, + "ExecutionPayloadEnvelopesByRange outgoing response processed" + ); + } + }; + + let mut envelope_stream = self + .chain + .get_payload_envelopes(block_roots, EnvelopeRequestSource::ByRange); + + // Fetching payload envelopes is async because it may have to hit the execution layer for payloads. 
+ let mut envelopes_sent = 0; + while let Some((root, result)) = envelope_stream.next().await { + match result.as_ref() { + Ok(Some(envelope)) => { + // Due to skip slots, blocks could be out of the range, we ensure they + // are in the range before sending + if envelope.slot() >= req_start_slot + && envelope.slot() < req_start_slot.saturating_add(req.count) + { + envelopes_sent += 1; + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + inbound_request_id, + response: Response::PayloadEnvelopesByRange(Some(envelope.clone())), + }); + } + } + Ok(None) => { + trace!( + request = ?req, + %peer_id, + request_root = ?root, + "No envelope for block root" + ); + } + Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { + debug!( + block_root = ?root, + reason = "execution layer not synced", + "Failed to fetch execution payload for envelope by range request" + ); + log_results(peer_id, envelopes_sent); + // send the stream terminator + return Err(( + RpcErrorResponse::ResourceUnavailable, + "Execution layer not synced", + )); + } + Err(e) => { + if matches!( + e, + BeaconChainError::ExecutionLayerErrorPayloadReconstruction(_block_hash, boxed_error) + if matches!(**boxed_error, execution_layer::Error::EngineError(_)) + ) { + warn!( + info = "this may occur occasionally when the EE is busy", + block_root = ?root, + error = ?e, + "Error rebuilding payload for peer" + ); + } else { + error!( + block_root = ?root, + error = ?e, + "Error fetching payload envelope for peer" + ); + } + log_results(peer_id, envelopes_sent); + // send the stream terminator + return Err(( + RpcErrorResponse::ServerError, + "Failed fetching payload envelopes", + )); + } + } + } + + log_results(peer_id, envelopes_sent); + Ok(()) + } + /// Handle a `BlobsByRange` request from the peer. 
#[instrument( name = "lh_handle_blobs_by_range_request", diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index c5ccbc2ae6..d0f0557223 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -19,11 +19,14 @@ use beacon_chain::test_utils::{ }; use beacon_chain::{BeaconChain, WhenSlotSkipped}; use beacon_processor::{work_reprocessing_queue::*, *}; +use bls::Signature; +use fixed_bytes::FixedBytesExtended; use itertools::Itertools; use libp2p::gossipsub::MessageAcceptance; use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, MetaDataV3, + PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest, }; use lighthouse_network::{ Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response, @@ -41,8 +44,9 @@ use std::time::Duration; use tokio::sync::mpsc; use types::{ AttesterSlashing, BlobSidecar, ChainSpec, DataColumnSidecarList, DataColumnSubnetId, Epoch, - EthSpec, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, - SignedVoluntaryExit, SingleAttestation, Slot, SubnetId, + EthSpec, ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, Hash256, + MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, SignedVoluntaryExit, SingleAttestation, Slot, SubnetId, }; use types::{ BlobSidecarList, @@ -522,6 +526,29 @@ impl TestRig { .unwrap(); } + pub fn enqueue_payload_envelopes_by_range_request(&self, start_slot: u64, count: u64) { + self.network_beacon_processor + .send_payload_envelopes_by_range_request( + PeerId::random(), + InboundRequestId::new_unchecked(42, 24), + PayloadEnvelopesByRangeRequest { start_slot, count }, + ) + .unwrap(); + } + + pub fn enqueue_payload_envelopes_by_root_request( + 
&self, + beacon_block_roots: RuntimeVariableList, + ) { + self.network_beacon_processor + .send_payload_envelopes_by_roots_request( + PeerId::random(), + InboundRequestId::new_unchecked(42, 24), + PayloadEnvelopesByRootRequest { beacon_block_roots }, + ) + .unwrap(); + } + pub fn enqueue_backfill_batch(&self, epoch: Epoch) { self.network_beacon_processor .send_chain_segment( @@ -2091,6 +2118,229 @@ async fn test_data_columns_by_range_no_duplicates_with_skip_slots() { ); } +/// Create a test `SignedExecutionPayloadEnvelope` with the given slot and beacon block root. +fn make_test_payload_envelope( + slot: Slot, + beacon_block_root: Hash256, +) -> SignedExecutionPayloadEnvelope { + SignedExecutionPayloadEnvelope { + message: ExecutionPayloadEnvelope { + payload: ExecutionPayloadGloas::default(), + execution_requests: ExecutionRequests::default(), + builder_index: 0, + beacon_block_root, + slot, + state_root: Hash256::zero(), + }, + signature: Signature::empty(), + } +} + +#[tokio::test] +async fn test_payload_envelopes_by_range() { + // Only test when Gloas fork is scheduled + if test_spec::().gloas_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new(64).await; + let start_slot = 0; + let slot_count = 32; + + // Manually store payload envelopes for each block in the range + let mut expected_roots = Vec::new(); + for slot in start_slot..slot_count { + if let Some(root) = rig + .chain + .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) + .unwrap() + { + let envelope = make_test_payload_envelope(Slot::new(slot), root); + rig.chain + .store + .put_payload_envelope(&root, envelope) + .unwrap(); + expected_roots.push(root); + } + } + + rig.enqueue_payload_envelopes_by_range_request(start_slot, slot_count); + + let mut actual_roots = Vec::new(); + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::PayloadEnvelopesByRange(envelope), + inbound_request_id: _, + } = next + { 
+ if let Some(env) = envelope { + actual_roots.push(env.beacon_block_root()); + } else { + break; + } + } else if let NetworkMessage::SendErrorResponse { .. } = next { + // Error response terminates the stream + break; + } else { + panic!("unexpected message {:?}", next); + } + } + assert_eq!(expected_roots, actual_roots); +} + +#[tokio::test] +async fn test_payload_envelopes_by_root() { + // Only test when Gloas fork is scheduled + if test_spec::().gloas_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new(64).await; + + let block_root = rig + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + + // Manually store a payload envelope for this block + let envelope = make_test_payload_envelope(Slot::new(1), block_root); + rig.chain + .store + .put_payload_envelope(&block_root, envelope) + .unwrap(); + + let roots = RuntimeVariableList::new(vec![block_root], 1).unwrap(); + rig.enqueue_payload_envelopes_by_root_request(roots); + + let mut actual_roots = Vec::new(); + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::PayloadEnvelopesByRoot(envelope), + inbound_request_id: _, + } = next + { + if let Some(env) = envelope { + actual_roots.push(env.beacon_block_root()); + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + assert_eq!(vec![block_root], actual_roots); +} + +#[tokio::test] +async fn test_payload_envelopes_by_root_unknown_root_returns_empty() { + // Only test when Gloas fork is scheduled + if test_spec::().gloas_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new(64).await; + + // Request envelope for a root that has no stored envelope + let block_root = rig + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + + // Don't store any envelope — the handler should return 0 envelopes + let roots = RuntimeVariableList::new(vec![block_root], 
1).unwrap(); + rig.enqueue_payload_envelopes_by_root_request(roots); + + let mut actual_count = 0; + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::PayloadEnvelopesByRoot(envelope), + inbound_request_id: _, + } = next + { + if envelope.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + assert_eq!(0, actual_count); +} + +#[tokio::test] +async fn test_payload_envelopes_by_range_no_duplicates_with_skip_slots() { + // Only test when Gloas fork is scheduled + if test_spec::().gloas_fork_epoch.is_none() { + return; + }; + + // Build a chain of 128 slots (4 epochs) with skip slots at positions 5 and 6. + let skip_slots: HashSet = [5, 6].into_iter().collect(); + let mut rig = TestRig::new_with_skip_slots(128, &skip_slots).await; + + let start_slot = 0u64; + let slot_count = 10u64; + + // Store payload envelopes for all blocks in the range (skipping the skip slots) + for slot in start_slot..slot_count { + if let Some(root) = rig + .chain + .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) + .unwrap() + { + let envelope = make_test_payload_envelope(Slot::new(slot), root); + rig.chain + .store + .put_payload_envelope(&root, envelope) + .unwrap(); + } + } + + rig.enqueue_payload_envelopes_by_range_request(start_slot, slot_count); + + let mut beacon_block_roots: Vec = Vec::new(); + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::PayloadEnvelopesByRange(envelope), + inbound_request_id: _, + } = next + { + if let Some(env) = envelope { + beacon_block_roots.push(env.beacon_block_root()); + } else { + break; + } + } else if let NetworkMessage::SendErrorResponse { .. 
} = next { + break; + } else { + panic!("unexpected message {:?}", next); + } + } + + assert!( + !beacon_block_roots.is_empty(), + "Should have received at least some payload envelopes" + ); + + // Skip slots should not cause duplicate envelopes for the same block root + let unique_roots: HashSet<_> = beacon_block_roots.iter().collect(); + assert_eq!( + beacon_block_roots.len(), + unique_roots.len(), + "Response contained duplicate block roots: got {} envelopes but only {} unique roots", + beacon_block_roots.len(), + unique_roots.len(), + ); +} + // TODO(ePBS): Add integration tests for envelope deferral (UnknownBlockForEnvelope): // 1. Gossip envelope arrives before its block → queued via UnknownBlockForEnvelope // 2. Block imported → envelope released and processed successfully diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 77d64c92e6..e6982e6a84 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -229,6 +229,24 @@ impl Router { request, ), ), + RequestType::PayloadEnvelopesByRoot(request) => self + .handle_beacon_processor_send_result( + self.network_beacon_processor + .send_payload_envelopes_by_roots_request( + peer_id, + inbound_request_id, + request, + ), + ), + RequestType::PayloadEnvelopesByRange(request) => self + .handle_beacon_processor_send_result( + self.network_beacon_processor + .send_payload_envelopes_by_range_request( + peer_id, + inbound_request_id, + request, + ), + ), RequestType::BlobsByRange(request) => self.handle_beacon_processor_send_result( self.network_beacon_processor.send_blobs_by_range_request( peer_id, @@ -309,6 +327,11 @@ impl Router { Response::DataColumnsByRange(data_column) => { self.on_data_columns_by_range_response(peer_id, app_request_id, data_column); } + // TODO(EIP-7732): implement outgoing payload envelopes by range and root + // responses once sync manager requests them. 
+ Response::PayloadEnvelopesByRoot(_) | Response::PayloadEnvelopesByRange(_) => { + debug!("Requesting envelopes by root and by range not supported yet"); + } // Light client responses should not be received Response::LightClientBootstrap(_) | Response::LightClientOptimisticUpdate(_) diff --git a/consensus/types/src/block/signed_beacon_block.rs b/consensus/types/src/block/signed_beacon_block.rs index b6218ba64d..dd6f52426a 100644 --- a/consensus/types/src/block/signed_beacon_block.rs +++ b/consensus/types/src/block/signed_beacon_block.rs @@ -377,6 +377,16 @@ impl> SignedBeaconBlock .map(|bid| bid.message.block_hash) } + /// Convenience accessor for the block's bid's `parent_block_hash`. + /// + /// This method returns an error prior to Gloas. + pub fn payload_bid_parent_block_hash(&self) -> Result { + self.message() + .body() + .signed_execution_payload_bid() + .map(|bid| bid.message.parent_block_hash) + } + /// Check if the `parent_hash` in this block's `signed_payload_bid` matches `parent_block_hash`. 
/// /// This function is useful post-Gloas for determining if the parent block is full, *without* diff --git a/consensus/types/src/core/chain_spec.rs b/consensus/types/src/core/chain_spec.rs index adf87dee94..2f3b5da956 100644 --- a/consensus/types/src/core/chain_spec.rs +++ b/consensus/types/src/core/chain_spec.rs @@ -295,6 +295,7 @@ pub struct ChainSpec { /* * Networking Gloas */ + pub max_request_payloads: u64, /* * Networking Derived @@ -305,6 +306,7 @@ pub struct ChainSpec { pub max_blocks_by_root_request_deneb: usize, pub max_blobs_by_root_request: usize, pub max_data_columns_by_root_request: usize, + pub max_payload_envelopes_by_root_request: usize, /* * Application params @@ -700,6 +702,10 @@ impl ChainSpec { } } + pub fn max_request_payloads(&self) -> usize { + self.max_request_payloads as usize + } + pub fn max_request_blob_sidecars(&self, fork_name: ForkName) -> usize { if fork_name.electra_enabled() { self.max_request_blob_sidecars_electra as usize @@ -964,6 +970,8 @@ impl ChainSpec { max_blobs_by_root_request_common(self.max_request_blob_sidecars); self.max_data_columns_by_root_request = max_data_columns_by_root_request_common::(self.max_request_blocks_deneb); + self.max_payload_envelopes_by_root_request = + max_blocks_by_root_request_common(self.max_request_payloads); self } @@ -1228,6 +1236,7 @@ impl ChainSpec { builder_payment_threshold_numerator: 6, builder_payment_threshold_denominator: 10, min_builder_withdrawability_delay: Epoch::new(4096), + max_request_payloads: 128, /* * Network specific @@ -1293,6 +1302,7 @@ impl ChainSpec { min_epochs_for_data_column_sidecars_requests: default_min_epochs_for_data_column_sidecars_requests(), max_data_columns_by_root_request: default_data_columns_by_root_request(), + max_payload_envelopes_by_root_request: default_max_payload_envelopes_by_root_request(), /* * Application specific @@ -1622,6 +1632,7 @@ impl ChainSpec { builder_payment_threshold_numerator: 6, builder_payment_threshold_denominator: 10, 
min_builder_withdrawability_delay: Epoch::new(4096), + max_request_payloads: 128, /* * Network specific @@ -1678,6 +1689,7 @@ impl ChainSpec { min_epochs_for_data_column_sidecars_requests: default_min_epochs_for_data_column_sidecars_requests(), max_data_columns_by_root_request: default_data_columns_by_root_request(), + max_payload_envelopes_by_root_request: default_max_payload_envelopes_by_root_request(), /* * Application specific @@ -2342,6 +2354,14 @@ fn default_data_columns_by_root_request() -> usize { max_data_columns_by_root_request_common::(default_max_request_blocks_deneb()) } +fn default_max_payload_envelopes_by_root_request() -> usize { + max_blocks_by_root_request_common(default_max_request_payloads()) +} + +fn default_max_request_payloads() -> u64 { + 128 +} + impl Default for Config { fn default() -> Self { let chain_spec = MainnetEthSpec::default_spec(); diff --git a/consensus/types/src/execution/execution_payload_envelope.rs b/consensus/types/src/execution/execution_payload_envelope.rs index 7f68dae037..169331a884 100644 --- a/consensus/types/src/execution/execution_payload_envelope.rs +++ b/consensus/types/src/execution/execution_payload_envelope.rs @@ -3,7 +3,9 @@ use crate::test_utils::TestRandom; use crate::{EthSpec, ForkName, Hash256, SignedRoot, Slot}; use context_deserialize::context_deserialize; use educe::Educe; +use fixed_bytes::FixedBytesExtended; use serde::{Deserialize, Serialize}; +use ssz::{BYTES_PER_LENGTH_OFFSET, Encode as SszEncode}; use ssz_derive::{Decode, Encode}; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; @@ -22,6 +24,44 @@ pub struct ExecutionPayloadEnvelope { pub state_root: Hash256, } +impl ExecutionPayloadEnvelope { + /// Returns an empty envelope with all fields zeroed. Used for SSZ size calculations. 
+    pub fn empty() -> Self {
+        Self {
+            payload: ExecutionPayloadGloas::default(),
+            execution_requests: ExecutionRequests::default(),
+            builder_index: 0,
+            beacon_block_root: Hash256::zero(),
+            slot: Slot::new(0),
+            state_root: Hash256::zero(),
+        }
+    }
+
+    /// Returns the minimum SSZ-encoded size (all variable-length fields empty).
+    pub fn min_size() -> usize {
+        Self::empty().as_ssz_bytes().len()
+    }
+
+    /// Returns the maximum SSZ-encoded size.
+    #[allow(clippy::arithmetic_side_effects)]
+    pub fn max_size() -> usize {
+        Self::min_size()
+            // ExecutionPayloadGloas variable-length fields:
+            + (E::max_extra_data_bytes() * <u8 as SszEncode>::ssz_fixed_len())
+            + (E::max_transactions_per_payload()
+                * (BYTES_PER_LENGTH_OFFSET + E::max_bytes_per_transaction()))
+            + (E::max_withdrawals_per_payload()
+                * <Withdrawal as SszEncode>::ssz_fixed_len())
+            // ExecutionRequests variable-length fields:
+            + (E::max_deposit_requests_per_payload()
+                * <DepositRequest as SszEncode>::ssz_fixed_len())
+            + (E::max_withdrawal_requests_per_payload()
+                * <WithdrawalRequest as SszEncode>::ssz_fixed_len())
+            + (E::max_consolidation_requests_per_payload()
+                * <ConsolidationRequest as SszEncode>::ssz_fixed_len())
+    }
+}
+
 impl<E: EthSpec> SignedRoot for ExecutionPayloadEnvelope<E> {}
 
 #[cfg(test)]
diff --git a/consensus/types/src/execution/signed_execution_payload_envelope.rs b/consensus/types/src/execution/signed_execution_payload_envelope.rs
index b1d949f863..76fa841680 100644
--- a/consensus/types/src/execution/signed_execution_payload_envelope.rs
+++ b/consensus/types/src/execution/signed_execution_payload_envelope.rs
@@ -8,6 +8,7 @@ use bls::{PublicKey, Signature};
 use context_deserialize::context_deserialize;
 use educe::Educe;
 use serde::{Deserialize, Serialize};
+use ssz::Encode;
 use ssz_derive::{Decode, Encode};
 use test_random_derive::TestRandom;
 use tree_hash_derive::TreeHash;
@@ -22,6 +23,24 @@ pub struct SignedExecutionPayloadEnvelope<E: EthSpec> {
 }
 
 impl<E: EthSpec> SignedExecutionPayloadEnvelope<E> {
+    /// Returns the minimum SSZ-encoded size (all variable-length fields empty).
+    pub fn min_size() -> usize {
+        Self {
+            message: ExecutionPayloadEnvelope::empty(),
+            signature: Signature::empty(),
+        }
+        .as_ssz_bytes()
+        .len()
+    }
+
+    /// Returns the maximum SSZ-encoded size.
+    #[allow(clippy::arithmetic_side_effects)]
+    pub fn max_size() -> usize {
+        // Signature is fixed-size, so the variable-length delta is entirely from the envelope.
+        Self::min_size() + ExecutionPayloadEnvelope::<E>::max_size() - ExecutionPayloadEnvelope::<E>::min_size()
+    }
+
     pub fn slot(&self) -> Slot {
         self.message.slot
     }