This commit is contained in:
Eitan Seri-Levi
2026-04-28 21:12:22 +02:00
parent d9cadaeb0d
commit 46c9f313e3

View File

@@ -287,7 +287,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
// Gloas path: if payloads are present, produce Gloas blocks
let resp = if let Some(payloads_req) = &self.payloads_request {
let payloads = payloads_req.to_finished().expect("checked above").to_vec();
Self::responses_gloas(
Self::responses_with_envelopes_and_columns(
blocks.to_vec(),
payloads,
data_columns,
@@ -408,97 +408,43 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
where
T: BeaconChainTypes<EthSpec = E>,
{
// Group data columns by block_root and index
let mut data_columns_by_block =
HashMap::<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>::new();
for column in data_columns {
let block_root = column.block_root();
let index = *column.index();
if data_columns_by_block
.entry(block_root)
.or_default()
.insert(index, column)
.is_some()
{
// `DataColumnsByRangeRequestItems` ensures that we do not request any duplicated indices across all peers
// we request the data from.
// If there are duplicated indices, its likely a peer sending us the same index multiple times.
// However we can still proceed even if there are extra columns, just log an error.
debug!(?block_root, ?index, "Repeated column for block_root");
continue;
}
}
// Now iterate all blocks ensuring that the block roots of each block and data column match,
// plus we have columns for our custody requirements
let mut columns_by_root = Self::group_columns_by_root(data_columns);
let mut range_sync_blocks = Vec::with_capacity(blocks.len());
let exceeded_retries = attempt >= MAX_COLUMN_RETRIES;
for block in blocks {
let block_root = get_block_root(&block);
range_sync_blocks.push(if block.num_expected_blobs() > 0 {
let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root)
else {
let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect();
return Err(CouplingError::DataColumnPeerFailure {
error: format!("No columns for block {block_root:?} with data"),
faulty_peers: responsible_peers,
exceeded_retries,
});
};
let mut custody_columns = vec![];
let mut naughty_peers = vec![];
for index in expects_custody_columns {
// Safe to convert to `CustodyDataColumn`: we have asserted that the index of
// this column is in the set of `expects_custody_columns` and with the expected
// block root, so for the expected epoch of this batch.
if let Some(data_column) = data_columns_by_index.remove(index) {
custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column));
} else {
let Some(responsible_peer) = column_to_peer.get(index) else {
return Err(CouplingError::InternalError(format!("Internal error, no request made for column {}", index)));
};
naughty_peers.push((*index, *responsible_peer));
}
}
if !naughty_peers.is_empty() {
return Err(CouplingError::DataColumnPeerFailure {
error: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"),
faulty_peers: naughty_peers,
exceeded_retries
});
}
// Assert that there are no columns left
if !data_columns_by_index.is_empty() {
let remaining_indices = data_columns_by_index.keys().collect::<Vec<_>>();
// log the error but don't return an error, we can still progress with extra columns.
debug!(
?block_root,
?remaining_indices,
"Not all columns consumed for block"
);
}
let block_data = AvailableBlockData::new_with_data_columns(custody_columns.iter().map(|c| c.as_data_column().clone()).collect::<Vec<_>>());
// Safe to convert to `CustodyDataColumn`: we have asserted that the index of
// this column is in the set of `expects_custody_columns` and with the expected
// block root, so for the expected epoch of this batch.
let columns = Self::extract_custody_columns_for_root(
block_root,
&mut columns_by_root,
expects_custody_columns,
&column_to_peer,
exceeded_retries,
)?;
let custody_columns = columns
.into_iter()
.map(CustodyDataColumn::from_asserted_custody)
.collect::<Vec<_>>();
let block_data = AvailableBlockData::new_with_data_columns(
custody_columns
.iter()
.map(|c| c.as_data_column().clone())
.collect::<Vec<_>>(),
);
RangeSyncBlock::new(block, block_data, &da_checker, spec.clone())
.map_err(|e| CouplingError::InternalError(format!("{:?}", e)))?
} else {
// Block has no data, expects zero columns
RangeSyncBlock::new(block, AvailableBlockData::NoData, &da_checker, spec.clone())
.map_err(|e| CouplingError::InternalError(format!("{:?}", e)))?
});
}
// Assert that there are no columns left for other blocks
if !data_columns_by_block.is_empty() {
let remaining_roots = data_columns_by_block.keys().collect::<Vec<_>>();
// log the error but don't return an error, we can still progress with responses.
// this is most likely an internal error with overrequesting or a client bug.
if !columns_by_root.is_empty() {
let remaining_roots = columns_by_root.keys().collect::<Vec<_>>();
debug!(?remaining_roots, "Not all columns consumed for block");
}
@@ -506,7 +452,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
}
/// Couples blocks with payload envelopes and custody columns for Gloas range sync.
fn responses_gloas(
fn responses_with_envelopes_and_columns(
blocks: Vec<Arc<SignedBeaconBlock<E>>>,
payloads: Vec<Arc<SignedExecutionPayloadEnvelope<E>>>,
data_columns: DataColumnSidecarList<E>,
@@ -515,22 +461,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
attempt: usize,
spec: Arc<ChainSpec>,
) -> Result<Vec<RangeSyncBlock<E>>, CouplingError> {
let mut data_columns_by_block =
HashMap::<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>::new();
for column in data_columns {
let block_root = column.block_root();
let index = *column.index();
if data_columns_by_block
.entry(block_root)
.or_default()
.insert(index, column)
.is_some()
{
debug!(?block_root, ?index, "Repeated column for block_root");
}
}
let mut columns_by_root = Self::group_columns_by_root(data_columns);
let mut range_sync_blocks = Vec::with_capacity(blocks.len());
let mut payload_iter = payloads.into_iter().peekable();
let exceeded_retries = attempt >= MAX_COLUMN_RETRIES;
@@ -553,41 +484,13 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
"Missing payload envelope for block {block_root:?} with blobs"
))
})?;
let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root)
else {
let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect();
return Err(CouplingError::DataColumnPeerFailure {
error: format!("No columns for block {block_root:?} with data"),
faulty_peers: responsible_peers,
exceeded_retries,
});
};
let mut custody_columns = vec![];
let mut naughty_peers = vec![];
for index in expects_custody_columns {
if let Some(data_column) = data_columns_by_index.remove(index) {
custody_columns.push(data_column);
} else {
let Some(responsible_peer) = column_to_peer.get(index) else {
return Err(CouplingError::InternalError(format!(
"Internal error, no request made for column {index}"
)));
};
naughty_peers.push((*index, *responsible_peer));
}
}
if !naughty_peers.is_empty() {
return Err(CouplingError::DataColumnPeerFailure {
error: format!(
"Peers did not return column for block_root {block_root:?} {naughty_peers:?}"
),
faulty_peers: naughty_peers,
exceeded_retries,
});
}
let custody_columns = Self::extract_custody_columns_for_root(
block_root,
&mut columns_by_root,
expects_custody_columns,
&column_to_peer,
exceeded_retries,
)?;
Some(Box::new(AvailableEnvelope::new(
envelope.block_hash(),
envelope,
@@ -618,8 +521,8 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
);
}
if !data_columns_by_block.is_empty() {
let remaining_roots = data_columns_by_block.keys().collect::<Vec<_>>();
if !columns_by_root.is_empty() {
let remaining_roots = columns_by_root.keys().collect::<Vec<_>>();
debug!(
?remaining_roots,
"Not all columns consumed for Gloas blocks"
@@ -628,6 +531,85 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
Ok(range_sync_blocks)
}
/// Groups data columns by their block root, logging and skipping duplicates.
fn group_columns_by_root(
data_columns: DataColumnSidecarList<E>,
) -> HashMap<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>> {
let mut by_root =
HashMap::<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>::new();
for column in data_columns {
let block_root = column.block_root();
let index = *column.index();
if by_root
.entry(block_root)
.or_default()
.insert(index, column)
.is_some()
{
// `DataColumnsByRangeRequestItems` ensures no duplicated indices across peers.
// Duplicates are likely a peer sending the same index multiple times; log and skip.
debug!(?block_root, ?index, "Repeated column for block_root");
}
}
by_root
}
/// Extracts and validates custody columns for a single block root.
///
/// Removes the matching entry from `columns_by_root`, checks all expected indices are
/// present, and logs any extras. Returns the raw columns; callers wrap them as needed.
fn extract_custody_columns_for_root(
block_root: Hash256,
columns_by_root: &mut HashMap<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>,
expects_custody_columns: &[ColumnIndex],
column_to_peer: &HashMap<u64, PeerId>,
exceeded_retries: bool,
) -> Result<Vec<Arc<DataColumnSidecar<E>>>, CouplingError> {
let Some(mut by_index) = columns_by_root.remove(&block_root) else {
let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect();
return Err(CouplingError::DataColumnPeerFailure {
error: format!("No columns for block {block_root:?} with data"),
faulty_peers: responsible_peers,
exceeded_retries,
});
};
let mut columns = vec![];
let mut naughty_peers = vec![];
for index in expects_custody_columns {
if let Some(col) = by_index.remove(index) {
columns.push(col);
} else {
let Some(responsible_peer) = column_to_peer.get(index) else {
return Err(CouplingError::InternalError(format!(
"Internal error, no request made for column {index}"
)));
};
naughty_peers.push((*index, *responsible_peer));
}
}
if !naughty_peers.is_empty() {
return Err(CouplingError::DataColumnPeerFailure {
error: format!(
"Peers did not return column for block_root {block_root:?} {naughty_peers:?}"
),
faulty_peers: naughty_peers,
exceeded_retries,
});
}
if !by_index.is_empty() {
let remaining_indices = by_index.keys().collect::<Vec<_>>();
debug!(
?block_root,
?remaining_indices,
"Not all columns consumed for block"
);
}
Ok(columns)
}
}
impl<I: PartialEq + std::fmt::Display, T> ByRangeRequest<I, T> {
@@ -662,6 +644,8 @@ mod tests {
NumBlobs, generate_rand_block_and_blobs, generate_rand_block_and_data_columns,
test_da_checker, test_spec,
};
use bls::Signature;
use lighthouse_network::service::api_types::PayloadEnvelopesByRangeRequestId;
use lighthouse_network::{
PeerId,
service::api_types::{
@@ -672,7 +656,11 @@ mod tests {
use rand::SeedableRng;
use std::{collections::HashMap, sync::Arc};
use tracing::Span;
use types::{Epoch, ForkName, MinimalEthSpec as E, SignedBeaconBlock, test_utils::XorShiftRng};
use types::{
Epoch, EthSpec, ExecutionBlockHash, ExecutionPayloadEnvelope, ExecutionPayloadGloas,
ExecutionRequests, ForkName, Hash256, MinimalEthSpec as E, SignedBeaconBlock,
SignedExecutionPayloadEnvelope, Slot, test_utils::XorShiftRng,
};
fn components_id() -> ComponentsByRangeRequestId {
ComponentsByRangeRequestId {
@@ -759,6 +747,7 @@ mod tests {
blocks_req_id,
Some(blobs_req_id),
None,
None,
Span::none(),
);
@@ -818,6 +807,7 @@ mod tests {
blocks_req_id,
None,
Some((columns_req_id.clone(), expects_custody_columns.clone())),
None,
Span::none(),
);
// Send blocks and complete terminate response
@@ -894,6 +884,7 @@ mod tests {
blocks_req_id,
None,
Some((columns_req_id.clone(), expected_sampling_columns.clone())),
None,
Span::none(),
);
@@ -986,6 +977,7 @@ mod tests {
blocks_req_id,
None,
Some((columns_req_id.clone(), expected_sampling_columns.clone())),
None,
Span::none(),
);
@@ -1083,6 +1075,7 @@ mod tests {
blocks_req_id,
None,
Some((columns_req_id.clone(), expected_sampling_columns.clone())),
None,
Span::none(),
);
@@ -1198,6 +1191,7 @@ mod tests {
blocks_req_id,
None,
Some((columns_req_id.clone(), expected_sampling_columns.clone())),
None,
Span::none(),
);
@@ -1269,4 +1263,170 @@ mod tests {
panic!("Expected PeerFailure error with exceeded_retries=true");
}
}
// --- Gloas tests ---
fn make_gloas_envelope<E: EthSpec>(
slot: Slot,
rng: &mut impl rand::Rng,
) -> Arc<SignedExecutionPayloadEnvelope<E>> {
let envelope = ExecutionPayloadEnvelope {
payload: ExecutionPayloadGloas {
slot_number: slot,
block_hash: ExecutionBlockHash::from_root(Hash256::from(rng.random::<[u8; 32]>())),
..ExecutionPayloadGloas::default()
},
execution_requests: ExecutionRequests::default(),
builder_index: 0,
beacon_block_root: Hash256::from(rng.random::<[u8; 32]>()),
};
Arc::new(SignedExecutionPayloadEnvelope {
message: envelope,
signature: Signature::empty(),
})
}
fn envelope_id(
parent_request_id: ComponentsByRangeRequestId,
) -> PayloadEnvelopesByRangeRequestId {
use lighthouse_network::service::api_types::PayloadEnvelopesByRangeRequestId;
PayloadEnvelopesByRangeRequestId {
id: 99,
parent_request_id,
}
}
#[test]
fn gloas_blocks_couple_with_envelopes() {
let mut spec = test_spec::<E>();
spec.deneb_fork_epoch = Some(Epoch::new(0));
spec.fulu_fork_epoch = Some(Epoch::new(0));
spec.gloas_fork_epoch = Some(Epoch::new(0));
let spec = Arc::new(spec);
let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode));
let mut rng = XorShiftRng::from_seed([42; 16]);
let blocks = (0..4)
.map(|_| {
let (raw_block, _) = generate_rand_block_and_data_columns::<E>(
ForkName::Gloas,
NumBlobs::None,
&mut rng,
&spec,
);
Arc::new(raw_block) as Arc<SignedBeaconBlock<E>>
})
.collect::<Vec<_>>();
// Build envelopes with slots matching each block
let envelopes: Vec<Arc<SignedExecutionPayloadEnvelope<E>>> = blocks
.iter()
.map(|b| make_gloas_envelope::<E>(b.slot(), &mut rng))
.collect();
let components_id = components_id();
let blocks_req_id = blocks_id(components_id);
let env_req_id = envelope_id(components_id);
let mut info = RangeBlockComponentsRequest::<E>::new(
blocks_req_id,
None,
None,
Some(env_req_id),
Span::none(),
);
info.add_blocks(blocks_req_id, blocks).unwrap();
// Not finished — envelopes still pending
assert!(!is_finished(&mut info));
info.add_payload_envelopes(env_req_id, envelopes).unwrap();
let result = info.responses(da_checker, spec).unwrap();
assert!(result.is_ok());
assert_eq!(result.unwrap().len(), 4);
}
#[test]
fn gloas_blocks_without_envelopes_succeed() {
// Blocks with no blobs don't require envelopes — they should couple fine with an empty envelope response.
let mut spec = test_spec::<E>();
spec.deneb_fork_epoch = Some(Epoch::new(0));
spec.fulu_fork_epoch = Some(Epoch::new(0));
spec.gloas_fork_epoch = Some(Epoch::new(0));
let spec = Arc::new(spec);
let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode));
let mut rng = XorShiftRng::from_seed([42; 16]);
let (raw_block, _) = generate_rand_block_and_data_columns::<E>(
ForkName::Gloas,
NumBlobs::None,
&mut rng,
&spec,
);
let block: Arc<SignedBeaconBlock<E>> = Arc::new(raw_block);
let components_id = components_id();
let blocks_req_id = blocks_id(components_id);
let env_req_id = envelope_id(components_id);
let mut info = RangeBlockComponentsRequest::<E>::new(
blocks_req_id,
None,
None,
Some(env_req_id),
Span::none(),
);
info.add_blocks(blocks_req_id, vec![block]).unwrap();
// No envelope for this block (peer didn't send one)
info.add_payload_envelopes(env_req_id, vec![]).unwrap();
let result = info.responses(da_checker, spec).unwrap();
assert!(result.is_ok(), "expected Ok, got: {:?}", result);
assert_eq!(result.unwrap().len(), 1);
}
#[test]
fn gloas_extra_envelopes_are_ignored() {
let mut spec = test_spec::<E>();
spec.deneb_fork_epoch = Some(Epoch::new(0));
spec.fulu_fork_epoch = Some(Epoch::new(0));
spec.gloas_fork_epoch = Some(Epoch::new(0));
let spec = Arc::new(spec);
let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode));
let mut rng = XorShiftRng::from_seed([99; 16]);
let (raw_block, _) = generate_rand_block_and_data_columns::<E>(
ForkName::Gloas,
NumBlobs::None,
&mut rng,
&spec,
);
let block: Arc<SignedBeaconBlock<E>> = Arc::new(raw_block);
let slot = block.slot();
let components_id = components_id();
let blocks_req_id = blocks_id(components_id);
let env_req_id = envelope_id(components_id);
let mut info = RangeBlockComponentsRequest::<E>::new(
blocks_req_id,
None,
None,
Some(env_req_id),
Span::none(),
);
info.add_blocks(blocks_req_id, vec![block]).unwrap();
// Two envelopes: one matching, one extra at a different slot
let env1 = make_gloas_envelope::<E>(slot, &mut rng);
let env2 = make_gloas_envelope::<E>(Slot::new(slot.as_u64() + 10), &mut rng);
info.add_payload_envelopes(env_req_id, vec![env1, env2])
.unwrap();
let result = info.responses(da_checker, spec).unwrap();
assert!(result.is_ok());
assert_eq!(result.unwrap().len(), 1);
}
}