diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 85e4f04641..9ddba86b81 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -1200,8 +1200,15 @@ impl NetworkBeaconProcessor { .chain .custody_columns_for_epoch(Some(request_start_epoch)); + let indices_to_retrieve = req + .columns + .iter() + .copied() + .filter(|c| available_columns.contains(c)) + .collect::>(); + for root in block_roots { - for index in available_columns { + for index in &indices_to_retrieve { match self.chain.get_data_column(&root, index) { Ok(Some(data_column_sidecar)) => { // Due to skip slots, data columns could be out of the range, we ensure they diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 916548eb9d..2027a525e6 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -21,7 +21,9 @@ use beacon_processor::{work_reprocessing_queue::*, *}; use gossipsub::MessageAcceptance; use itertools::Itertools; use lighthouse_network::rpc::InboundRequestId; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3}; +use lighthouse_network::rpc::methods::{ + BlobsByRangeRequest, DataColumnsByRangeRequest, MetaDataV3, +}; use lighthouse_network::{ Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response, discv5::enr::{self, CombinedKey}, @@ -30,6 +32,7 @@ use lighthouse_network::{ }; use matches::assert_matches; use slot_clock::SlotClock; +use std::collections::HashSet; use std::iter::Iterator; use std::sync::Arc; use std::time::Duration; @@ -432,6 +435,20 @@ impl TestRig { .unwrap(); } + pub fn enqueue_data_columns_by_range_request(&self, count: u64, columns: Vec) { + self.network_beacon_processor + .send_data_columns_by_range_request( + PeerId::random(), + InboundRequestId::new_unchecked(42, 24), + DataColumnsByRangeRequest { + start_slot: 0, + count, + columns, + }, + ) + .unwrap(); + } + pub fn enqueue_backfill_batch(&self) { self.network_beacon_processor .send_chain_segment( @@ -1463,3 +1480,56 @@ async fn test_data_column_import_notifies_sync() { } } } + +#[tokio::test] +async fn test_data_columns_by_range_request_only_returns_requested_columns() { + if test_spec::().fulu_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new(64).await; + let slot_count = 4; + + let all_custody_columns = rig + .chain + .sampling_columns_for_epoch(rig.chain.epoch().unwrap()); + let available_columns: Vec = all_custody_columns.to_vec(); + + let requested_columns = vec![available_columns[0], available_columns[2]]; + + rig.enqueue_data_columns_by_range_request(slot_count, requested_columns.clone()); + + let mut received_columns = Vec::new(); + + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::DataColumnsByRange(data_column), + inbound_request_id: _, + } = next + { + if let Some(column) = data_column { + received_columns.push(column.index); + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + + for received_index in &received_columns { + assert!( + requested_columns.contains(received_index), + "Received column index {} was not in requested columns {:?}", + received_index, + requested_columns + ); + } + + let unique_received: HashSet<_> = received_columns.into_iter().collect(); + assert!( + !unique_received.is_empty(), + "Should have received at least some data columns" + ); +}