mirror of
https://github.com/sigp/lighthouse.git
synced 2026-07-03 12:54:27 +00:00
Fix data columns by range returning all columns (#7942)
N/A In https://github.com/sigp/lighthouse/pull/7897 , we seem to have modified data columns by range to return all the columns we have for the requested epoch disregarding what columns the peer requested.
This commit is contained in:
@@ -1200,8 +1200,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
.chain
|
.chain
|
||||||
.custody_columns_for_epoch(Some(request_start_epoch));
|
.custody_columns_for_epoch(Some(request_start_epoch));
|
||||||
|
|
||||||
|
let indices_to_retrieve = req
|
||||||
|
.columns
|
||||||
|
.iter()
|
||||||
|
.copied()
|
||||||
|
.filter(|c| available_columns.contains(c))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
for root in block_roots {
|
for root in block_roots {
|
||||||
for index in available_columns {
|
for index in &indices_to_retrieve {
|
||||||
match self.chain.get_data_column(&root, index) {
|
match self.chain.get_data_column(&root, index) {
|
||||||
Ok(Some(data_column_sidecar)) => {
|
Ok(Some(data_column_sidecar)) => {
|
||||||
// Due to skip slots, data columns could be out of the range, we ensure they
|
// Due to skip slots, data columns could be out of the range, we ensure they
|
||||||
|
|||||||
@@ -21,7 +21,9 @@ use beacon_processor::{work_reprocessing_queue::*, *};
|
|||||||
use gossipsub::MessageAcceptance;
|
use gossipsub::MessageAcceptance;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use lighthouse_network::rpc::InboundRequestId;
|
use lighthouse_network::rpc::InboundRequestId;
|
||||||
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3};
|
use lighthouse_network::rpc::methods::{
|
||||||
|
BlobsByRangeRequest, DataColumnsByRangeRequest, MetaDataV3,
|
||||||
|
};
|
||||||
use lighthouse_network::{
|
use lighthouse_network::{
|
||||||
Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response,
|
Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response,
|
||||||
discv5::enr::{self, CombinedKey},
|
discv5::enr::{self, CombinedKey},
|
||||||
@@ -30,6 +32,7 @@ use lighthouse_network::{
|
|||||||
};
|
};
|
||||||
use matches::assert_matches;
|
use matches::assert_matches;
|
||||||
use slot_clock::SlotClock;
|
use slot_clock::SlotClock;
|
||||||
|
use std::collections::HashSet;
|
||||||
use std::iter::Iterator;
|
use std::iter::Iterator;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@@ -432,6 +435,20 @@ impl TestRig {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn enqueue_data_columns_by_range_request(&self, count: u64, columns: Vec<u64>) {
|
||||||
|
self.network_beacon_processor
|
||||||
|
.send_data_columns_by_range_request(
|
||||||
|
PeerId::random(),
|
||||||
|
InboundRequestId::new_unchecked(42, 24),
|
||||||
|
DataColumnsByRangeRequest {
|
||||||
|
start_slot: 0,
|
||||||
|
count,
|
||||||
|
columns,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
pub fn enqueue_backfill_batch(&self) {
|
pub fn enqueue_backfill_batch(&self) {
|
||||||
self.network_beacon_processor
|
self.network_beacon_processor
|
||||||
.send_chain_segment(
|
.send_chain_segment(
|
||||||
@@ -1463,3 +1480,56 @@ async fn test_data_column_import_notifies_sync() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_data_columns_by_range_request_only_returns_requested_columns() {
|
||||||
|
if test_spec::<E>().fulu_fork_epoch.is_none() {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut rig = TestRig::new(64).await;
|
||||||
|
let slot_count = 4;
|
||||||
|
|
||||||
|
let all_custody_columns = rig
|
||||||
|
.chain
|
||||||
|
.sampling_columns_for_epoch(rig.chain.epoch().unwrap());
|
||||||
|
let available_columns: Vec<u64> = all_custody_columns.to_vec();
|
||||||
|
|
||||||
|
let requested_columns = vec![available_columns[0], available_columns[2]];
|
||||||
|
|
||||||
|
rig.enqueue_data_columns_by_range_request(slot_count, requested_columns.clone());
|
||||||
|
|
||||||
|
let mut received_columns = Vec::new();
|
||||||
|
|
||||||
|
while let Some(next) = rig.network_rx.recv().await {
|
||||||
|
if let NetworkMessage::SendResponse {
|
||||||
|
peer_id: _,
|
||||||
|
response: Response::DataColumnsByRange(data_column),
|
||||||
|
inbound_request_id: _,
|
||||||
|
} = next
|
||||||
|
{
|
||||||
|
if let Some(column) = data_column {
|
||||||
|
received_columns.push(column.index);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
panic!("unexpected message {:?}", next);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for received_index in &received_columns {
|
||||||
|
assert!(
|
||||||
|
requested_columns.contains(received_index),
|
||||||
|
"Received column index {} was not in requested columns {:?}",
|
||||||
|
received_index,
|
||||||
|
requested_columns
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let unique_received: HashSet<_> = received_columns.into_iter().collect();
|
||||||
|
assert!(
|
||||||
|
!unique_received.is_empty(),
|
||||||
|
"Should have received at least some data columns"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user