diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 6df05921c2..5de38ad9a5 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -287,7 +287,7 @@ impl RangeBlockComponentsRequest { // Gloas path: if payloads are present, produce Gloas blocks let resp = if let Some(payloads_req) = &self.payloads_request { let payloads = payloads_req.to_finished().expect("checked above").to_vec(); - Self::responses_gloas( + Self::responses_with_envelopes_and_columns( blocks.to_vec(), payloads, data_columns, @@ -408,97 +408,43 @@ impl RangeBlockComponentsRequest { where T: BeaconChainTypes, { - // Group data columns by block_root and index - let mut data_columns_by_block = - HashMap::>>>::new(); - - for column in data_columns { - let block_root = column.block_root(); - let index = *column.index(); - if data_columns_by_block - .entry(block_root) - .or_default() - .insert(index, column) - .is_some() - { - // `DataColumnsByRangeRequestItems` ensures that we do not request any duplicated indices across all peers - // we request the data from. - // If there are duplicated indices, its likely a peer sending us the same index multiple times. - // However we can still proceed even if there are extra columns, just log an error. - debug!(?block_root, ?index, "Repeated column for block_root"); - continue; - } - } - - // Now iterate all blocks ensuring that the block roots of each block and data column match, - // plus we have columns for our custody requirements + let mut columns_by_root = Self::group_columns_by_root(data_columns); let mut range_sync_blocks = Vec::with_capacity(blocks.len()); - let exceeded_retries = attempt >= MAX_COLUMN_RETRIES; + for block in blocks { let block_root = get_block_root(&block); range_sync_blocks.push(if block.num_expected_blobs() > 0 { - let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root) - else { - let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect(); - return Err(CouplingError::DataColumnPeerFailure { - error: format!("No columns for block {block_root:?} with data"), - faulty_peers: responsible_peers, - exceeded_retries, - - }); - }; - - let mut custody_columns = vec![]; - let mut naughty_peers = vec![]; - for index in expects_custody_columns { - // Safe to convert to `CustodyDataColumn`: we have asserted that the index of - // this column is in the set of `expects_custody_columns` and with the expected - // block root, so for the expected epoch of this batch. - if let Some(data_column) = data_columns_by_index.remove(index) { - custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column)); - } else { - let Some(responsible_peer) = column_to_peer.get(index) else { - return Err(CouplingError::InternalError(format!("Internal error, no request made for column {}", index))); - }; - naughty_peers.push((*index, *responsible_peer)); - } - } - if !naughty_peers.is_empty() { - return Err(CouplingError::DataColumnPeerFailure { - error: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"), - faulty_peers: naughty_peers, - exceeded_retries - }); - } - - // Assert that there are no columns left - if !data_columns_by_index.is_empty() { - let remaining_indices = data_columns_by_index.keys().collect::>(); - // log the error but don't return an error, we can still progress with extra columns. - debug!( - ?block_root, - ?remaining_indices, - "Not all columns consumed for block" - ); - } - - let block_data = AvailableBlockData::new_with_data_columns(custody_columns.iter().map(|c| c.as_data_column().clone()).collect::>()); - + // Safe to convert to `CustodyDataColumn`: we have asserted that the index of + // this column is in the set of `expects_custody_columns` and with the expected + // block root, so for the expected epoch of this batch. + let columns = Self::extract_custody_columns_for_root( + block_root, + &mut columns_by_root, + expects_custody_columns, + &column_to_peer, + exceeded_retries, + )?; + let custody_columns = columns + .into_iter() + .map(CustodyDataColumn::from_asserted_custody) + .collect::>(); + let block_data = AvailableBlockData::new_with_data_columns( + custody_columns + .iter() + .map(|c| c.as_data_column().clone()) + .collect::>(), + ); RangeSyncBlock::new(block, block_data, &da_checker, spec.clone()) .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? } else { - // Block has no data, expects zero columns RangeSyncBlock::new(block, AvailableBlockData::NoData, &da_checker, spec.clone()) .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? }); } - // Assert that there are no columns left for other blocks - if !data_columns_by_block.is_empty() { - let remaining_roots = data_columns_by_block.keys().collect::>(); - // log the error but don't return an error, we can still progress with responses. - // this is most likely an internal error with overrequesting or a client bug. + if !columns_by_root.is_empty() { + let remaining_roots = columns_by_root.keys().collect::>(); debug!(?remaining_roots, "Not all columns consumed for block"); } @@ -506,7 +452,7 @@ impl RangeBlockComponentsRequest { } /// Couples blocks with payload envelopes and custody columns for Gloas range sync. - fn responses_gloas( + fn responses_with_envelopes_and_columns( blocks: Vec>>, payloads: Vec>>, data_columns: DataColumnSidecarList, @@ -515,22 +461,7 @@ impl RangeBlockComponentsRequest { attempt: usize, spec: Arc, ) -> Result>, CouplingError> { - let mut data_columns_by_block = - HashMap::>>>::new(); - - for column in data_columns { - let block_root = column.block_root(); - let index = *column.index(); - if data_columns_by_block - .entry(block_root) - .or_default() - .insert(index, column) - .is_some() - { - debug!(?block_root, ?index, "Repeated column for block_root"); - } - } - + let mut columns_by_root = Self::group_columns_by_root(data_columns); let mut range_sync_blocks = Vec::with_capacity(blocks.len()); let mut payload_iter = payloads.into_iter().peekable(); let exceeded_retries = attempt >= MAX_COLUMN_RETRIES; @@ -553,41 +484,13 @@ impl RangeBlockComponentsRequest { "Missing payload envelope for block {block_root:?} with blobs" )) })?; - - let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root) - else { - let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect(); - return Err(CouplingError::DataColumnPeerFailure { - error: format!("No columns for block {block_root:?} with data"), - faulty_peers: responsible_peers, - exceeded_retries, - }); - }; - - let mut custody_columns = vec![]; - let mut naughty_peers = vec![]; - for index in expects_custody_columns { - if let Some(data_column) = data_columns_by_index.remove(index) { - custody_columns.push(data_column); - } else { - let Some(responsible_peer) = column_to_peer.get(index) else { - return Err(CouplingError::InternalError(format!( - "Internal error, no request made for column {index}" - ))); - }; - naughty_peers.push((*index, *responsible_peer)); - } - } - if !naughty_peers.is_empty() { - return Err(CouplingError::DataColumnPeerFailure { - error: format!( - "Peers did not return column for block_root {block_root:?} {naughty_peers:?}" - ), - faulty_peers: naughty_peers, - exceeded_retries, - }); - } - + let custody_columns = Self::extract_custody_columns_for_root( + block_root, + &mut columns_by_root, + expects_custody_columns, + &column_to_peer, + exceeded_retries, + )?; Some(Box::new(AvailableEnvelope::new( envelope.block_hash(), envelope, @@ -618,8 +521,8 @@ impl RangeBlockComponentsRequest { ); } - if !data_columns_by_block.is_empty() { - let remaining_roots = data_columns_by_block.keys().collect::>(); + if !columns_by_root.is_empty() { + let remaining_roots = columns_by_root.keys().collect::>(); debug!( ?remaining_roots, "Not all columns consumed for Gloas blocks" @@ -628,6 +531,85 @@ impl RangeBlockComponentsRequest { Ok(range_sync_blocks) } + + /// Groups data columns by their block root, logging and skipping duplicates. + fn group_columns_by_root( + data_columns: DataColumnSidecarList, + ) -> HashMap>>> { + let mut by_root = + HashMap::>>>::new(); + for column in data_columns { + let block_root = column.block_root(); + let index = *column.index(); + if by_root + .entry(block_root) + .or_default() + .insert(index, column) + .is_some() + { + // `DataColumnsByRangeRequestItems` ensures no duplicated indices across peers. + // Duplicates are likely a peer sending the same index multiple times; log and skip. + debug!(?block_root, ?index, "Repeated column for block_root"); + } + } + by_root + } + + /// Extracts and validates custody columns for a single block root. + /// + /// Removes the matching entry from `columns_by_root`, checks all expected indices are + /// present, and logs any extras. Returns the raw columns; callers wrap them as needed. + fn extract_custody_columns_for_root( + block_root: Hash256, + columns_by_root: &mut HashMap>>>, + expects_custody_columns: &[ColumnIndex], + column_to_peer: &HashMap, + exceeded_retries: bool, + ) -> Result>>, CouplingError> { + let Some(mut by_index) = columns_by_root.remove(&block_root) else { + let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect(); + return Err(CouplingError::DataColumnPeerFailure { + error: format!("No columns for block {block_root:?} with data"), + faulty_peers: responsible_peers, + exceeded_retries, + }); + }; + + let mut columns = vec![]; + let mut naughty_peers = vec![]; + for index in expects_custody_columns { + if let Some(col) = by_index.remove(index) { + columns.push(col); + } else { + let Some(responsible_peer) = column_to_peer.get(index) else { + return Err(CouplingError::InternalError(format!( + "Internal error, no request made for column {index}" + ))); + }; + naughty_peers.push((*index, *responsible_peer)); + } + } + if !naughty_peers.is_empty() { + return Err(CouplingError::DataColumnPeerFailure { + error: format!( + "Peers did not return column for block_root {block_root:?} {naughty_peers:?}" + ), + faulty_peers: naughty_peers, + exceeded_retries, + }); + } + + if !by_index.is_empty() { + let remaining_indices = by_index.keys().collect::>(); + debug!( + ?block_root, + ?remaining_indices, + "Not all columns consumed for block" + ); + } + + Ok(columns) + } } impl ByRangeRequest { @@ -662,6 +644,8 @@ mod tests { NumBlobs, generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_da_checker, test_spec, }; + use bls::Signature; + use lighthouse_network::service::api_types::PayloadEnvelopesByRangeRequestId; use lighthouse_network::{ PeerId, service::api_types::{ @@ -672,7 +656,11 @@ mod tests { use rand::SeedableRng; use std::{collections::HashMap, sync::Arc}; use tracing::Span; - use types::{Epoch, ForkName, MinimalEthSpec as E, SignedBeaconBlock, test_utils::XorShiftRng}; + use types::{ + Epoch, EthSpec, ExecutionBlockHash, ExecutionPayloadEnvelope, ExecutionPayloadGloas, + ExecutionRequests, ForkName, Hash256, MinimalEthSpec as E, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, Slot, test_utils::XorShiftRng, + }; fn components_id() -> ComponentsByRangeRequestId { ComponentsByRangeRequestId { @@ -759,6 +747,7 @@ mod tests { blocks_req_id, Some(blobs_req_id), None, + None, Span::none(), ); @@ -818,6 +807,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expects_custody_columns.clone())), + None, Span::none(), ); // Send blocks and complete terminate response @@ -894,6 +884,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -986,6 +977,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -1083,6 +1075,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -1198,6 +1191,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -1269,4 +1263,170 @@ mod tests { panic!("Expected PeerFailure error with exceeded_retries=true"); } } + + // --- Gloas tests --- + + fn make_gloas_envelope( + slot: Slot, + rng: &mut impl rand::Rng, + ) -> Arc> { + let envelope = ExecutionPayloadEnvelope { + payload: ExecutionPayloadGloas { + slot_number: slot, + block_hash: ExecutionBlockHash::from_root(Hash256::from(rng.random::<[u8; 32]>())), + ..ExecutionPayloadGloas::default() + }, + execution_requests: ExecutionRequests::default(), + builder_index: 0, + beacon_block_root: Hash256::from(rng.random::<[u8; 32]>()), + }; + Arc::new(SignedExecutionPayloadEnvelope { + message: envelope, + signature: Signature::empty(), + }) + } + + fn envelope_id( + parent_request_id: ComponentsByRangeRequestId, + ) -> PayloadEnvelopesByRangeRequestId { + use lighthouse_network::service::api_types::PayloadEnvelopesByRangeRequestId; + PayloadEnvelopesByRangeRequestId { + id: 99, + parent_request_id, + } + } + + #[test] + fn gloas_blocks_couple_with_envelopes() { + let mut spec = test_spec::(); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + spec.fulu_fork_epoch = Some(Epoch::new(0)); + spec.gloas_fork_epoch = Some(Epoch::new(0)); + let spec = Arc::new(spec); + let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + let mut rng = XorShiftRng::from_seed([42; 16]); + + let blocks = (0..4) + .map(|_| { + let (raw_block, _) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::None, + &mut rng, + &spec, + ); + Arc::new(raw_block) as Arc> + }) + .collect::>(); + + // Build envelopes with slots matching each block + let envelopes: Vec>> = blocks + .iter() + .map(|b| make_gloas_envelope::(b.slot(), &mut rng)) + .collect(); + + let components_id = components_id(); + let blocks_req_id = blocks_id(components_id); + let env_req_id = envelope_id(components_id); + + let mut info = RangeBlockComponentsRequest::::new( + blocks_req_id, + None, + None, + Some(env_req_id), + Span::none(), + ); + + info.add_blocks(blocks_req_id, blocks).unwrap(); + // Not finished — envelopes still pending + assert!(!is_finished(&mut info)); + + info.add_payload_envelopes(env_req_id, envelopes).unwrap(); + + let result = info.responses(da_checker, spec).unwrap(); + assert!(result.is_ok()); + assert_eq!(result.unwrap().len(), 4); + } + + #[test] + fn gloas_blocks_without_envelopes_succeed() { + // Blocks with no blobs don't require envelopes — they should couple fine with an empty envelope response. + let mut spec = test_spec::(); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + spec.fulu_fork_epoch = Some(Epoch::new(0)); + spec.gloas_fork_epoch = Some(Epoch::new(0)); + let spec = Arc::new(spec); + let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + let mut rng = XorShiftRng::from_seed([42; 16]); + + let (raw_block, _) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::None, + &mut rng, + &spec, + ); + let block: Arc> = Arc::new(raw_block); + + let components_id = components_id(); + let blocks_req_id = blocks_id(components_id); + let env_req_id = envelope_id(components_id); + + let mut info = RangeBlockComponentsRequest::::new( + blocks_req_id, + None, + None, + Some(env_req_id), + Span::none(), + ); + + info.add_blocks(blocks_req_id, vec![block]).unwrap(); + // No envelope for this block (peer didn't send one) + info.add_payload_envelopes(env_req_id, vec![]).unwrap(); + + let result = info.responses(da_checker, spec).unwrap(); + assert!(result.is_ok(), "expected Ok, got: {:?}", result); + assert_eq!(result.unwrap().len(), 1); + } + + #[test] + fn gloas_extra_envelopes_are_ignored() { + let mut spec = test_spec::(); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + spec.fulu_fork_epoch = Some(Epoch::new(0)); + spec.gloas_fork_epoch = Some(Epoch::new(0)); + let spec = Arc::new(spec); + let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + let mut rng = XorShiftRng::from_seed([99; 16]); + + let (raw_block, _) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::None, + &mut rng, + &spec, + ); + let block: Arc> = Arc::new(raw_block); + let slot = block.slot(); + + let components_id = components_id(); + let blocks_req_id = blocks_id(components_id); + let env_req_id = envelope_id(components_id); + + let mut info = RangeBlockComponentsRequest::::new( + blocks_req_id, + None, + None, + Some(env_req_id), + Span::none(), + ); + + info.add_blocks(blocks_req_id, vec![block]).unwrap(); + // Two envelopes: one matching, one extra at a different slot + let env1 = make_gloas_envelope::(slot, &mut rng); + let env2 = make_gloas_envelope::(Slot::new(slot.as_u64() + 10), &mut rng); + info.add_payload_envelopes(env_req_id, vec![env1, env2]) + .unwrap(); + + let result = info.responses(da_checker, spec).unwrap(); + assert!(result.is_ok()); + assert_eq!(result.unwrap().len(), 1); + } }