diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index be73ef15d7..456e8794e7 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -2,10 +2,11 @@ use crate::data_availability_checker::{AvailabilityCheckError, DataAvailabilityC pub use crate::data_availability_checker::{ AvailableBlock, AvailableBlockData, MaybeAvailableBlock, }; +use crate::payload_envelope_verification::AvailableEnvelope; use crate::{BeaconChainTypes, PayloadVerificationOutcome}; -use educe::Educe; use state_processing::ConsensusContext; use std::fmt::{Debug, Formatter}; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use types::data::BlobIdentifier; use types::{ @@ -40,43 +41,63 @@ impl LookupBlock { } } -/// A fully available block that has been constructed by range sync. -/// The block contains all the data required to import into fork choice. -/// This includes any and all blobs/columns required, including zero if -/// none are required. This can happen if the block is pre-deneb or if -/// it's simply past the DA boundary. -#[derive(Clone, Educe)] -#[educe(Hash(bound(E: EthSpec)))] -pub struct RangeSyncBlock { - block: AvailableBlock, +/// A block that has been constructed by range sync with all required data. +/// +/// - `Base`: Pre-Gloas blocks bundled as an `AvailableBlock` (block + blobs/columns). +/// - `Gloas`: Post-Gloas blocks where the execution payload is a separate envelope. +#[derive(Clone)] +pub enum RangeSyncBlock { + Base(AvailableBlock), + Gloas { + block: Arc>, + envelope: Option>>, + }, +} + +impl Hash for RangeSyncBlock { + fn hash(&self, state: &mut H) { + self.block_root().hash(state); + } } impl Debug for RangeSyncBlock { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "RpcBlock({:?})", self.block_root()) + write!(f, "RangeSyncBlock({:?})", self.block_root()) } } impl RangeSyncBlock { pub fn block_root(&self) -> Hash256 { - self.block.block_root() + match self { + Self::Base(block) => block.block_root(), + Self::Gloas { block, .. } => block.canonical_root(), + } } pub fn as_block(&self) -> &SignedBeaconBlock { - self.block.block() + match self { + Self::Base(block) => block.block(), + Self::Gloas { block, .. } => block, + } } pub fn block_cloned(&self) -> Arc> { - self.block.block_cloned() + match self { + Self::Base(block) => block.block_cloned(), + Self::Gloas { block, .. } => block.clone(), + } } pub fn block_data(&self) -> &AvailableBlockData { - self.block.data() + match self { + Self::Base(block) => block.data(), + Self::Gloas { .. } => { + unreachable!("block_data called on Gloas variant — use envelope data instead") + } + } } -} -impl RangeSyncBlock { - /// Constructs an `RangeSyncBlock` from a block and availability data. + /// Constructs a `Base` variant from a block and availability data. /// /// # Errors /// @@ -94,32 +115,54 @@ impl RangeSyncBlock { T: BeaconChainTypes, { let available_block = AvailableBlock::new(block, block_data, da_checker, spec)?; - Ok(Self { - block: available_block, - }) + Ok(Self::Base(available_block)) + } + + /// Constructs a `Gloas` variant from a block and optional available envelope. + pub fn new_gloas( + block: Arc>, + envelope: Option>>, + ) -> Self { + Self::Gloas { block, envelope } } #[allow(clippy::type_complexity)] pub fn deconstruct(self) -> (Hash256, Arc>, AvailableBlockData) { - self.block.deconstruct() + match self { + Self::Base(block) => block.deconstruct(), + Self::Gloas { .. } => { + unreachable!("deconstruct called on Gloas variant") + } + } } pub fn n_blobs(&self) -> usize { - match self.block_data() { - AvailableBlockData::NoData | AvailableBlockData::DataColumns(_) => 0, - AvailableBlockData::Blobs(blobs) => blobs.len(), + match self { + Self::Base(block) => match block.data() { + AvailableBlockData::NoData | AvailableBlockData::DataColumns(_) => 0, + AvailableBlockData::Blobs(blobs) => blobs.len(), + }, + Self::Gloas { .. } => 0, } } pub fn n_data_columns(&self) -> usize { - match self.block_data() { - AvailableBlockData::NoData | AvailableBlockData::Blobs(_) => 0, - AvailableBlockData::DataColumns(columns) => columns.len(), + match self { + Self::Base(block) => match block.data() { + AvailableBlockData::NoData | AvailableBlockData::Blobs(_) => 0, + AvailableBlockData::DataColumns(columns) => columns.len(), + }, + Self::Gloas { .. } => 0, } } pub fn into_available_block(self) -> AvailableBlock { - self.block + match self { + Self::Base(block) => block, + Self::Gloas { .. } => { + unreachable!("into_available_block called on Gloas variant") + } + } } } @@ -405,13 +448,13 @@ impl AsBlock for RangeSyncBlock { self.as_block().message() } fn as_block(&self) -> &SignedBeaconBlock { - self.block.as_block() + RangeSyncBlock::as_block(self) } fn block_cloned(&self) -> Arc> { - self.block.block_cloned() + RangeSyncBlock::block_cloned(self) } fn canonical_root(&self) -> Hash256 { - self.block.block_root() + self.block_root() } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 4372efa809..1eb59b9b7b 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -20,7 +20,7 @@ use tracing::{debug, error, instrument}; use types::data::{BlobIdentifier, FixedBlobSidecarList}; use types::{ BlobSidecar, BlobSidecarList, BlockImportSource, ChainSpec, DataColumnSidecar, - DataColumnSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot, + DataColumnSidecarList, Epoch, EthSpec, ForkName, Hash256, SignedBeaconBlock, Slot, }; mod error; @@ -420,6 +420,11 @@ impl DataAvailabilityChecker { self.da_check_required_for_epoch(epoch) && self.spec.is_peer_das_enabled_for_epoch(epoch) } + /// Determines if execution payload envelopes are required for an epoch (Gloas and later). + pub fn envelopes_required_for_epoch(&self, epoch: Epoch) -> bool { + self.spec.fork_name_at_epoch(epoch) >= ForkName::Gloas + } + /// See `Self::blobs_required_for_epoch` fn blobs_required_for_block(&self, block: &SignedBeaconBlock) -> bool { block.num_expected_blobs() > 0 && self.blobs_required_for_epoch(block.epoch()) diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index 8ca6871dda..b4628ae2d0 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -47,7 +47,7 @@ pub struct EnvelopeImportData { pub post_state: Box>, } -#[derive(Debug)] +#[derive(Debug, Clone)] #[allow(dead_code)] pub struct AvailableEnvelope { execution_block_hash: ExecutionBlockHash, diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index a190a42a80..0b1d84b706 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -33,6 +33,8 @@ pub enum SyncRequestId { DataColumnsByRange(DataColumnsByRangeRequestId), /// Request searching for an execution payload envelope given a block root. SinglePayloadEnvelope { id: SingleLookupReqId }, + /// Payload envelopes by range request + PayloadEnvelopesByRange(PayloadEnvelopesByRangeRequestId), } /// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly. @@ -78,6 +80,14 @@ pub enum DataColumnsByRangeRequester { CustodyBackfillSync(CustodyBackFillBatchRequestId), } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct PayloadEnvelopesByRangeRequestId { + /// Id to identify this attempt at a payload_envelopes_by_range request for `parent_request_id` + pub id: Id, + /// The Id of the overall By Range request for block components. + pub parent_request_id: ComponentsByRangeRequestId, +} + /// Block components by range request for range sync. Includes an ID for downstream consumers to /// handle retries and tie all their sub requests together. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] @@ -254,6 +264,12 @@ macro_rules! impl_display { impl_display!(BlocksByRangeRequestId, "{}/{}", id, parent_request_id); impl_display!(BlobsByRangeRequestId, "{}/{}", id, parent_request_id); impl_display!(DataColumnsByRangeRequestId, "{}/{}", id, parent_request_id); +impl_display!( + PayloadEnvelopesByRangeRequestId, + "{}/{}", + id, + parent_request_id +); impl_display!(ComponentsByRangeRequestId, "{}/{}", id, requester); impl_display!(DataColumnsByRootRequestId, "{}/{}", id, requester); impl_display!(SingleLookupReqId, "{}/Lookup/{}", req_id, lookup_id); diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index f6d4940121..f14816139c 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -619,6 +619,15 @@ impl NetworkBeaconProcessor { return; }; + // TODO(gloas): Implement Gloas chain segment processing. + // Gloas blocks carry separate envelopes and need a different import path. + if downloaded_blocks + .iter() + .any(|b| matches!(b, RangeSyncBlock::Gloas { .. })) + { + todo!("Gloas chain segment processing"); + } + let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64()); let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64()); let sent_blocks = downloaded_blocks.len(); diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 3fb2196975..faa55b48bf 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -333,10 +333,8 @@ impl Router { Response::PayloadEnvelopesByRoot(envelope) => { self.on_payload_envelopes_by_root_response(peer_id, app_request_id, envelope); } - // TODO(EIP-7732): implement outgoing payload envelopes by range responses once - // range sync requests them. - Response::PayloadEnvelopesByRange(_) => { - unreachable!() + Response::PayloadEnvelopesByRange(envelope) => { + self.on_payload_envelopes_by_range_response(peer_id, app_request_id, envelope); } // Light client responses should not be received Response::LightClientBootstrap(_) @@ -834,6 +832,29 @@ impl Router { } } + pub fn on_payload_envelopes_by_range_response( + &mut self, + peer_id: PeerId, + app_request_id: AppRequestId, + envelope: Option>>, + ) { + trace!( + %peer_id, + "Received PayloadEnvelopesByRange Response" + ); + + if let AppRequestId::Sync(sync_request_id) = app_request_id { + self.send_to_sync(SyncMessage::RpcPayloadEnvelope { + peer_id, + sync_request_id, + envelope, + seen_timestamp: timestamp_now(), + }); + } else { + crit!("All payload envelopes by range responses should belong to sync"); + } + } + fn handle_beacon_processor_send_result( &mut self, result: Result<(), crate::network_beacon_processor::Error>, diff --git a/beacon_node/network/src/sync/batch.rs b/beacon_node/network/src/sync/batch.rs index 10af1bf503..e0704e2569 100644 --- a/beacon_node/network/src/sync/batch.rs +++ b/beacon_node/network/src/sync/batch.rs @@ -33,6 +33,7 @@ pub type BatchId = Epoch; #[strum(serialize_all = "snake_case")] pub enum ByRangeRequestType { BlocksAndColumns, + BlocksAndEnvelopesAndColumns, BlocksAndBlobs, Blocks, Columns(HashSet), diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 98cf3e0a1f..e475b60de9 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -4,11 +4,13 @@ use beacon_chain::{ data_availability_checker::DataAvailabilityChecker, data_column_verification::CustodyDataColumn, get_block_root, + payload_envelope_verification::AvailableEnvelope, }; use lighthouse_network::{ PeerId, service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, + PayloadEnvelopesByRangeRequestId, }, }; use ssz_types::RuntimeVariableList; @@ -16,7 +18,7 @@ use std::{collections::HashMap, sync::Arc}; use tracing::{Span, debug}; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - Hash256, SignedBeaconBlock, + Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, }; use crate::sync::network_context::MAX_COLUMN_RETRIES; @@ -35,6 +37,13 @@ use crate::sync::network_context::MAX_COLUMN_RETRIES; pub struct RangeBlockComponentsRequest { /// Blocks we have received awaiting for their corresponding sidecar. blocks_request: ByRangeRequest>>>, + /// Payload envelopes (Gloas+). None for pre-Gloas forks. + payloads_request: Option< + ByRangeRequest< + PayloadEnvelopesByRangeRequestId, + Vec>>, + >, + >, /// Sidecars we have received awaiting for their corresponding block. block_data_request: RangeBlockDataRequest, /// Span to track the range request and all children range requests. @@ -88,6 +97,7 @@ impl RangeBlockComponentsRequest { Vec<(DataColumnsByRangeRequestId, Vec)>, Vec, )>, + payloads_req_id: Option, request_span: Span, ) -> Self { let block_data_request = if let Some(blobs_req_id) = blobs_req_id { @@ -109,6 +119,7 @@ impl RangeBlockComponentsRequest { Self { blocks_request: ByRangeRequest::Active(blocks_req_id), + payloads_request: payloads_req_id.map(ByRangeRequest::Active), block_data_request, request_span, } @@ -191,6 +202,18 @@ impl RangeBlockComponentsRequest { } } + /// Adds received payload envelopes to the request. + pub fn add_payload_envelopes( + &mut self, + req_id: PayloadEnvelopesByRangeRequestId, + envelopes: Vec>>, + ) -> Result<(), String> { + match &mut self.payloads_request { + Some(req) => req.finish(req_id, envelopes), + None => Err("received payload envelopes but none expected".to_owned()), + } + } + /// Attempts to construct RPC blocks from all received components. /// /// Returns `None` if not all expected requests have completed. @@ -208,6 +231,13 @@ impl RangeBlockComponentsRequest { return None; }; + // If payloads are expected, they must also be complete before we can produce responses. + if let Some(payloads_req) = &self.payloads_request + && payloads_req.to_finished().is_none() + { + return None; + } + // Increment the attempt once this function returns the response or errors match &mut self.block_data_request { RangeBlockDataRequest::NoData => Some(Self::responses_with_blobs( @@ -254,15 +284,29 @@ impl RangeBlockComponentsRequest { } } - let resp = Self::responses_with_custody_columns( - blocks.to_vec(), - data_columns, - column_to_peer_id, - expected_custody_columns, - *attempt, - da_checker, - spec, - ); + // Gloas path: if payloads are present, produce Gloas blocks + let resp = if let Some(payloads_req) = &self.payloads_request { + let payloads = payloads_req.to_finished().expect("checked above").to_vec(); + Self::responses_gloas( + blocks.to_vec(), + payloads, + data_columns, + column_to_peer_id, + expected_custody_columns, + *attempt, + spec, + ) + } else { + Self::responses_with_custody_columns( + blocks.to_vec(), + data_columns, + column_to_peer_id, + expected_custody_columns, + *attempt, + da_checker, + spec, + ) + }; if let Err(CouplingError::DataColumnPeerFailure { error: _, @@ -460,6 +504,136 @@ impl RangeBlockComponentsRequest { Ok(range_sync_blocks) } + + /// Couples blocks with payload envelopes and custody columns for Gloas. + /// In Gloas, columns are associated with the envelope (not the block directly). + fn responses_gloas( + blocks: Vec>>, + payloads: Vec>>, + data_columns: DataColumnSidecarList, + column_to_peer: HashMap, + expects_custody_columns: &[ColumnIndex], + attempt: usize, + spec: Arc, + ) -> Result>, CouplingError> { + // Group data columns by block_root + let mut data_columns_by_block = + HashMap::>>>::new(); + + for column in data_columns { + let block_root = column.block_root(); + let index = *column.index(); + if data_columns_by_block + .entry(block_root) + .or_default() + .insert(index, column) + .is_some() + { + debug!(?block_root, ?index, "Repeated column for block_root"); + } + } + + let mut range_sync_blocks = Vec::with_capacity(blocks.len()); + let mut payload_iter = payloads.into_iter().peekable(); + let exceeded_retries = attempt >= MAX_COLUMN_RETRIES; + + for block in blocks { + // Match payload envelope to block by slot + let mut envelope_for_block = None; + if payload_iter + .peek() + .map(|e| e.message.slot == block.slot()) + .unwrap_or(false) + { + envelope_for_block = payload_iter.next(); + } + + let block_root = get_block_root(&block); + + let available_envelope = if block.num_expected_blobs() > 0 { + // Block has blobs — envelope and columns are required + let envelope = envelope_for_block.ok_or_else(|| { + CouplingError::InternalError(format!( + "Missing payload envelope for block {block_root:?} with blobs" + )) + })?; + + let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root) + else { + let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect(); + return Err(CouplingError::DataColumnPeerFailure { + error: format!("No columns for block {block_root:?} with data"), + faulty_peers: responsible_peers, + exceeded_retries, + }); + }; + + let mut custody_columns = vec![]; + let mut naughty_peers = vec![]; + for index in expects_custody_columns { + if let Some(data_column) = data_columns_by_index.remove(index) { + custody_columns.push(data_column); + } else { + let Some(responsible_peer) = column_to_peer.get(index) else { + return Err(CouplingError::InternalError(format!( + "Internal error, no request made for column {index}" + ))); + }; + naughty_peers.push((*index, *responsible_peer)); + } + } + if !naughty_peers.is_empty() { + return Err(CouplingError::DataColumnPeerFailure { + error: format!( + "Peers did not return column for block_root {block_root:?} {naughty_peers:?}" + ), + faulty_peers: naughty_peers, + exceeded_retries, + }); + } + + Some(Box::new(AvailableEnvelope::new( + envelope.block_hash(), + envelope, + custody_columns, + None, + spec.clone(), + ))) + } else { + envelope_for_block.map(|envelope| { + Box::new(AvailableEnvelope::new( + envelope.block_hash(), + envelope, + vec![], + None, + spec.clone(), + )) + }) + }; + + range_sync_blocks.push(RangeSyncBlock::new_gloas(block, available_envelope)); + } + + // Log any remaining unmatched payloads + if payload_iter.next().is_some() { + let remaining = payload_iter.count() + 1; + debug!( + remaining, + "Received payload envelopes that don't pair with blocks" + ); + } + + // Log remaining unmatched columns + if !data_columns_by_block.is_empty() { + let remaining_roots = data_columns_by_block.keys().collect::>(); + debug!( + ?remaining_roots, + "Not all columns consumed for Gloas blocks" + ); + } + + Ok(range_sync_blocks) + } } impl ByRangeRequest { @@ -560,7 +734,7 @@ mod tests { let blocks_req_id = blocks_id(components_id()); let mut info = - RangeBlockComponentsRequest::::new(blocks_req_id, None, None, Span::none()); + RangeBlockComponentsRequest::::new(blocks_req_id, None, None, None, Span::none()); // Send blocks and complete terminate response info.add_blocks(blocks_req_id, blocks).unwrap(); @@ -591,6 +765,7 @@ mod tests { blocks_req_id, Some(blobs_req_id), None, + None, Span::none(), ); @@ -650,6 +825,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expects_custody_columns.clone())), + None, Span::none(), ); // Send blocks and complete terminate response @@ -726,6 +902,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -818,6 +995,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -915,6 +1093,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -1030,6 +1209,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 256752d5fb..64c10e8f46 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -60,7 +60,8 @@ use lighthouse_network::service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + DataColumnsByRootRequester, Id, PayloadEnvelopesByRangeRequestId, SingleLookupReqId, + SyncRequestId, }; use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::{PeerAction, PeerId}; @@ -522,6 +523,8 @@ impl SyncManager { SyncRequestId::SinglePayloadEnvelope { id } => { self.on_single_envelope_response(id, peer_id, RpcEvent::RPCError(error)) } + SyncRequestId::PayloadEnvelopesByRange(req_id) => self + .on_payload_envelopes_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)), } } @@ -1262,8 +1265,15 @@ impl SyncManager { peer_id, RpcEvent::from_chunk(envelope, seen_timestamp), ), + SyncRequestId::PayloadEnvelopesByRange(req_id) => { + self.on_payload_envelopes_by_range_response( + req_id, + peer_id, + RpcEvent::from_chunk(envelope, seen_timestamp), + ); + } _ => { - crit!(%peer_id, "bad request id for payload envelope"); + crit!(%peer_id, "bad request id for payload_envelope"); } } } @@ -1302,6 +1312,24 @@ impl SyncManager { } } + fn on_payload_envelopes_by_range_response( + &mut self, + id: PayloadEnvelopesByRangeRequestId, + peer_id: PeerId, + envelope: RpcEvent>>, + ) { + if let Some(resp) = self + .network + .on_payload_envelopes_by_range_response(id, peer_id, envelope) + { + self.on_range_components_response( + id.parent_request_id, + peer_id, + RangeBlockComponent::PayloadEnvelope(id, resp), + ); + } + } + fn on_single_blob_response( &mut self, id: SingleLookupReqId, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index e9d289b777..d512c9e24f 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -22,14 +22,17 @@ use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; use custody::CustodyRequestResult; use fnv::FnvHashMap; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest}; +use lighthouse_network::rpc::methods::{ + BlobsByRangeRequest, DataColumnsByRangeRequest, PayloadEnvelopesByRangeRequest, +}; use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, RequestType}; pub use lighthouse_network::service::api_types::RangeRequestId; use lighthouse_network::service::api_types::{ AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + DataColumnsByRootRequester, Id, PayloadEnvelopesByRangeRequestId, SingleLookupReqId, + SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; use parking_lot::RwLock; @@ -37,7 +40,8 @@ pub use requests::LookupVerifyError; use requests::{ ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, - PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest, + PayloadEnvelopesByRangeRequestItems, PayloadEnvelopesByRootRequestItems, + PayloadEnvelopesByRootSingleRequest, }; #[cfg(test)] use slot_clock::SlotClock; @@ -217,6 +221,11 @@ pub struct SyncNetworkContext { /// A mapping of active PayloadEnvelopesByRoot requests payload_envelopes_by_root_requests: ActiveRequests>, + /// A mapping of active PayloadEnvelopesByRange requests + payload_envelopes_by_range_requests: ActiveRequests< + PayloadEnvelopesByRangeRequestId, + PayloadEnvelopesByRangeRequestItems, + >, /// Mapping of active custody column requests for a block root custody_by_root_requests: FnvHashMap>, @@ -254,6 +263,10 @@ pub enum RangeBlockComponent { DataColumnsByRangeRequestId, RpcResponseResult>>>, ), + PayloadEnvelope( + PayloadEnvelopesByRangeRequestId, + RpcResponseResult>>>, + ), } #[cfg(test)] @@ -303,6 +316,7 @@ impl SyncNetworkContext { blobs_by_range_requests: ActiveRequests::new("blobs_by_range"), data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"), payload_envelopes_by_root_requests: ActiveRequests::new("payload_envelopes_by_root"), + payload_envelopes_by_range_requests: ActiveRequests::new("payload_envelopes_by_range"), custody_by_root_requests: <_>::default(), components_by_range_requests: FnvHashMap::default(), custody_backfill_data_column_batch_requests: FnvHashMap::default(), @@ -332,6 +346,7 @@ impl SyncNetworkContext { blobs_by_range_requests, data_columns_by_range_requests, payload_envelopes_by_root_requests, + payload_envelopes_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests @@ -371,6 +386,10 @@ impl SyncNetworkContext { .active_requests_of_peer(peer_id) .into_iter() .map(|id| SyncRequestId::SinglePayloadEnvelope { id: *id }); + let payload_envelope_by_range_ids = payload_envelopes_by_range_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|req_id| SyncRequestId::PayloadEnvelopesByRange(*req_id)); blocks_by_root_ids .chain(blobs_by_root_ids) .chain(data_column_by_root_ids) @@ -378,6 +397,7 @@ impl SyncNetworkContext { .chain(blobs_by_range_ids) .chain(data_column_by_range_ids) .chain(envelope_by_root_ids) + .chain(payload_envelope_by_range_ids) .collect() } @@ -435,6 +455,7 @@ impl SyncNetworkContext { blobs_by_range_requests, data_columns_by_range_requests, payload_envelopes_by_root_requests, + payload_envelopes_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests @@ -458,6 +479,7 @@ impl SyncNetworkContext { .chain(blobs_by_range_requests.iter_request_peers()) .chain(data_columns_by_range_requests.iter_request_peers()) .chain(payload_envelopes_by_root_requests.iter_request_peers()) + .chain(payload_envelopes_by_range_requests.iter_request_peers()) { *active_request_count_by_peer.entry(peer_id).or_default() += 1; } @@ -590,24 +612,26 @@ impl SyncNetworkContext { }; // Attempt to find all required custody peers before sending any request or creating an ID - let columns_by_range_peers_to_request = - if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { - let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); - let column_indexes = self - .chain - .sampling_columns_for_epoch(epoch) - .iter() - .cloned() - .collect(); - Some(self.select_columns_by_range_peers_to_request( - &column_indexes, - column_peers, - active_request_count_by_peer, - peers_to_deprioritize, - )?) - } else { - None - }; + let columns_by_range_peers_to_request = if matches!( + batch_type, + ByRangeRequestType::BlocksAndColumns | ByRangeRequestType::BlocksAndEnvelopesAndColumns + ) { + let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); + let column_indexes = self + .chain + .sampling_columns_for_epoch(epoch) + .iter() + .cloned() + .collect(); + Some(self.select_columns_by_range_peers_to_request( + &column_indexes, + column_peers, + active_request_count_by_peer, + peers_to_deprioritize, + )?) + } else { + None + }; // Create the overall components_by_range request ID before its individual components let id = ComponentsByRangeRequestId { @@ -672,6 +696,28 @@ impl SyncNetworkContext { .transpose()?; let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); + + // Send envelope request for Gloas epochs + let payloads_req_id = + if matches!(batch_type, ByRangeRequestType::BlocksAndEnvelopesAndColumns) { + Some(self.send_payload_envelopes_by_range_request( + block_peer, + PayloadEnvelopesByRangeRequest { + start_slot: *request.start_slot(), + count: *request.count(), + }, + id, + new_range_request_span!( + self, + "outgoing_envelopes_by_range", + range_request_span.clone(), + block_peer + ), + )?) + } else { + None + }; + let info = RangeBlockComponentsRequest::new( blocks_req_id, blobs_req_id, @@ -681,6 +727,7 @@ impl SyncNetworkContext { self.chain.sampling_columns_for_epoch(epoch).to_vec(), ) }), + payloads_req_id, range_request_span, ); self.components_by_range_requests.insert(id, info); @@ -783,6 +830,17 @@ impl SyncNetworkContext { }) }) } + RangeBlockComponent::PayloadEnvelope(req_id, resp) => { + resp.and_then(|(envelopes, _)| { + request + .add_payload_envelopes(req_id, envelopes) + .map_err(|e| { + RpcResponseError::BlockComponentCouplingError( + CouplingError::InternalError(e), + ) + }) + }) + } } } { entry.remove(); @@ -1352,6 +1410,57 @@ impl SyncNetworkContext { Ok((id, requested_columns)) } + fn send_payload_envelopes_by_range_request( + &mut self, + peer_id: PeerId, + request: PayloadEnvelopesByRangeRequest, + parent_request_id: ComponentsByRangeRequestId, + request_span: Span, + ) -> Result { + let id = PayloadEnvelopesByRangeRequestId { + id: self.next_id(), + parent_request_id, + }; + + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: RequestType::PayloadEnvelopesByRange(request.clone()), + app_request_id: AppRequestId::Sync(SyncRequestId::PayloadEnvelopesByRange(id)), + }) + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; + + debug!( + method = "PayloadEnvelopesByRange", + slots = request.count, + epoch = %Slot::new(request.start_slot).epoch(T::EthSpec::slots_per_epoch()), + peer = %peer_id, + %id, + "Sync RPC request sent" + ); + + self.payload_envelopes_by_range_requests.insert( + id, + peer_id, + false, + PayloadEnvelopesByRangeRequestItems::new(request), + request_span, + ); + Ok(id) + } + + #[allow(clippy::type_complexity)] + pub(crate) fn on_payload_envelopes_by_range_response( + &mut self, + id: PayloadEnvelopesByRangeRequestId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) -> Option>>>> { + let resp = self + .payload_envelopes_by_range_requests + .on_response(id, rpc_event); + self.on_rpc_response_result(resp, peer_id) + } + pub fn is_execution_engine_online(&self) -> bool { self.execution_engine_state == EngineState::Online } @@ -1433,6 +1542,12 @@ impl SyncNetworkContext { ); if self + .chain + .data_availability_checker + .envelopes_required_for_epoch(epoch) + { + ByRangeRequestType::BlocksAndEnvelopesAndColumns + } else if self .chain .data_availability_checker .data_columns_required_for_epoch(epoch) @@ -1900,6 +2015,10 @@ impl SyncNetworkContext { "data_columns_by_range", self.data_columns_by_range_requests.len(), ), + ( + "payload_envelopes_by_range", + self.payload_envelopes_by_range_requests.len(), + ), ("custody_by_root", self.custody_by_root_requests.len()), ( "components_by_range", diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 5b5e779d9b..7ba0838ee1 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -19,6 +19,7 @@ pub use data_columns_by_root::{ pub use payload_envelopes_by_root::{ PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest, }; +pub use payload_envelopes_by_range::PayloadEnvelopesByRangeRequestItems; use crate::metrics; @@ -31,6 +32,7 @@ mod blocks_by_root; mod data_columns_by_range; mod data_columns_by_root; mod payload_envelopes_by_root; +mod payload_envelopes_by_range; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupVerifyError { diff --git a/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs new file mode 100644 index 0000000000..3d4ea8248b --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs @@ -0,0 +1,42 @@ +use super::{ActiveRequestItems, LookupVerifyError}; +use lighthouse_network::rpc::methods::PayloadEnvelopesByRangeRequest; +use std::sync::Arc; +use types::{EthSpec, SignedExecutionPayloadEnvelope}; + +/// Accumulates results of a payload_envelopes_by_range request. Only returns items after +/// receiving the stream termination. +pub struct PayloadEnvelopesByRangeRequestItems { + request: PayloadEnvelopesByRangeRequest, + items: Vec>>, +} + +impl PayloadEnvelopesByRangeRequestItems { + pub fn new(request: PayloadEnvelopesByRangeRequest) -> Self { + Self { + request, + items: vec![], + } + } +} + +impl ActiveRequestItems for PayloadEnvelopesByRangeRequestItems { + type Item = Arc>; + + fn add(&mut self, envelope: Self::Item) -> Result { + let slot = envelope.slot(); + if slot < self.request.start_slot || slot >= self.request.start_slot + self.request.count { + return Err(LookupVerifyError::UnrequestedSlot(slot)); + } + if self.items.iter().any(|existing| existing.slot() == slot) { + return Err(LookupVerifyError::DuplicatedData(slot, 0)); + } + + self.items.push(envelope); + + Ok(false) + } + + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) + } +}