mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-02 16:21:42 +00:00
Fix duplicate data columns in DataColumnsByRange responses (#8843)
Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>
This commit is contained in:
@@ -977,7 +977,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
};
|
||||
|
||||
// remove all skip slots i.e. duplicated roots
|
||||
Ok(block_roots.into_iter().unique().collect::<Vec<_>>())
|
||||
Ok(block_roots
|
||||
.into_iter()
|
||||
.unique_by(|(root, _)| *root)
|
||||
.collect::<Vec<_>>())
|
||||
}
|
||||
|
||||
/// Handle a `BlobsByRange` request from the peer.
|
||||
|
||||
@@ -120,6 +120,39 @@ impl TestRig {
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn new_with_skip_slots(chain_length: u64, skip_slots: &HashSet<u64>) -> Self {
|
||||
let mut spec = test_spec::<E>();
|
||||
spec.shard_committee_period = 2;
|
||||
let spec = Arc::new(spec);
|
||||
let beacon_processor_config = BeaconProcessorConfig::default();
|
||||
let harness = BeaconChainHarness::builder(MainnetEthSpec)
|
||||
.spec(spec.clone())
|
||||
.deterministic_keypairs(VALIDATOR_COUNT)
|
||||
.fresh_ephemeral_store()
|
||||
.mock_execution_layer()
|
||||
.node_custody_type(NodeCustodyType::Fullnode)
|
||||
.chain_config(<_>::default())
|
||||
.build();
|
||||
|
||||
harness.advance_slot();
|
||||
|
||||
for slot in 1..=chain_length {
|
||||
if !skip_slots.contains(&slot) {
|
||||
harness
|
||||
.extend_chain(
|
||||
1,
|
||||
BlockStrategy::OnCanonicalHead,
|
||||
AttestationStrategy::AllValidators,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
harness.advance_slot();
|
||||
}
|
||||
|
||||
Self::from_harness(harness, beacon_processor_config, spec).await
|
||||
}
|
||||
|
||||
pub async fn new_parametric(
|
||||
chain_length: u64,
|
||||
beacon_processor_config: BeaconProcessorConfig,
|
||||
@@ -150,6 +183,14 @@ impl TestRig {
|
||||
harness.advance_slot();
|
||||
}
|
||||
|
||||
Self::from_harness(harness, beacon_processor_config, spec).await
|
||||
}
|
||||
|
||||
async fn from_harness(
|
||||
harness: BeaconChainHarness<T>,
|
||||
beacon_processor_config: BeaconProcessorConfig,
|
||||
spec: Arc<ChainSpec>,
|
||||
) -> Self {
|
||||
let head = harness.chain.head_snapshot();
|
||||
|
||||
assert_eq!(
|
||||
@@ -1986,3 +2027,78 @@ async fn test_data_columns_by_range_request_only_returns_requested_columns() {
|
||||
"Should have received at least some data columns"
|
||||
);
|
||||
}
|
||||
|
||||
/// Test that DataColumnsByRange does not return duplicate data columns for skip slots.
|
||||
///
|
||||
/// When skip slots occur, `forwards_iter_block_roots` returns the same block root for
|
||||
/// consecutive slots. The deduplication in `get_block_roots_from_store` must use
|
||||
/// `unique_by` on the root (not the full `(root, slot)` tuple) to avoid serving
|
||||
/// duplicate data columns for the same block.
|
||||
#[tokio::test]
|
||||
async fn test_data_columns_by_range_no_duplicates_with_skip_slots() {
|
||||
if test_spec::<E>().fulu_fork_epoch.is_none() {
|
||||
return;
|
||||
};
|
||||
|
||||
// Build a chain of 128 slots (4 epochs) with skip slots at positions 5 and 6.
|
||||
// After 4 epochs, finalized_epoch=2 (finalized_slot=64). Requesting slots 0-9
|
||||
// satisfies req_start_slot + req_count <= finalized_slot (10 <= 64), which routes
|
||||
// through `get_block_roots_from_store` — the code path with the bug.
|
||||
let skip_slots: HashSet<u64> = [5, 6].into_iter().collect();
|
||||
let mut rig = TestRig::new_with_skip_slots(128, &skip_slots).await;
|
||||
|
||||
let all_custody_columns = rig.chain.custody_columns_for_epoch(Some(Epoch::new(0)));
|
||||
let requested_column = vec![all_custody_columns[0]];
|
||||
|
||||
// Request a range that spans the skip slots (slots 0 through 9).
|
||||
let start_slot = 0;
|
||||
let slot_count = 10;
|
||||
|
||||
rig.network_beacon_processor
|
||||
.send_data_columns_by_range_request(
|
||||
PeerId::random(),
|
||||
InboundRequestId::new_unchecked(42, 24),
|
||||
DataColumnsByRangeRequest {
|
||||
start_slot,
|
||||
count: slot_count,
|
||||
columns: requested_column.clone(),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Collect block roots from all data column responses.
|
||||
let mut block_roots: Vec<Hash256> = Vec::new();
|
||||
|
||||
while let Some(next) = rig.network_rx.recv().await {
|
||||
if let NetworkMessage::SendResponse {
|
||||
peer_id: _,
|
||||
response: Response::DataColumnsByRange(data_column),
|
||||
inbound_request_id: _,
|
||||
} = next
|
||||
{
|
||||
if let Some(column) = data_column {
|
||||
block_roots.push(column.block_root());
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
panic!("unexpected message {:?}", next);
|
||||
}
|
||||
}
|
||||
|
||||
assert!(
|
||||
!block_roots.is_empty(),
|
||||
"Should have received at least some data columns"
|
||||
);
|
||||
|
||||
// Before the fix, skip slots caused the same block root to appear multiple times
|
||||
// (once per skip slot) because .unique() on (Hash256, Slot) tuples didn't deduplicate.
|
||||
let unique_roots: HashSet<_> = block_roots.iter().collect();
|
||||
assert_eq!(
|
||||
block_roots.len(),
|
||||
unique_roots.len(),
|
||||
"Response contained duplicate block roots: got {} columns but only {} unique roots",
|
||||
block_roots.len(),
|
||||
unique_roots.len(),
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user