use beacon_chain::{ BeaconChainTypes, block_verification_types::{AvailableBlockData, RangeSyncBlock}, data_availability_checker::DataAvailabilityChecker, data_column_verification::CustodyDataColumn, get_block_root, }; use lighthouse_network::{ PeerId, service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, }, }; use ssz_types::RuntimeVariableList; use std::{collections::HashMap, sync::Arc}; use tracing::{Span, debug}; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, SignedBeaconBlock, }; use crate::sync::network_context::MAX_COLUMN_RETRIES; /// Accumulates and couples beacon blocks with their associated data (blobs or data columns) /// from range sync network responses. /// /// This struct acts as temporary storage while multiple network responses arrive: /// - Blocks themselves (always required) /// - Blob sidecars (pre-Fulu fork) /// - Data columns (Fulu fork and later) /// /// It accumulates responses until all expected components are received, then couples /// them together and returns complete `RpcBlock`s ready for processing. Handles validation /// and peer failure detection during the coupling process. pub struct RangeBlockComponentsRequest { /// Blocks we have received awaiting for their corresponding sidecar. blocks_request: ByRangeRequest>>>, /// Sidecars we have received awaiting for their corresponding block. block_data_request: RangeBlockDataRequest, /// Span to track the range request and all children range requests. pub(crate) request_span: Span, } pub enum ByRangeRequest { Active(I), Complete(T), } enum RangeBlockDataRequest { NoData, Blobs(ByRangeRequest>>>), DataColumns { requests: HashMap< DataColumnsByRangeRequestId, ByRangeRequest>, >, /// The column indices corresponding to the request column_peers: HashMap>, expected_custody_columns: Vec, attempt: usize, }, } #[derive(Debug)] pub(crate) enum CouplingError { InternalError(String), /// The peer we requested the columns from was faulty/malicious DataColumnPeerFailure { error: String, faulty_peers: Vec<(ColumnIndex, PeerId)>, exceeded_retries: bool, }, BlobPeerFailure(String), } impl RangeBlockComponentsRequest { /// Creates a new range request for blocks and their associated data (blobs or data columns). /// /// # Arguments /// * `blocks_req_id` - Request ID for the blocks /// * `blobs_req_id` - Optional request ID for blobs (pre-Fulu fork) /// * `data_columns` - Optional tuple of (request_id->column_indices pairs, expected_custody_columns) for Fulu fork #[allow(clippy::type_complexity)] pub fn new( blocks_req_id: BlocksByRangeRequestId, blobs_req_id: Option, data_columns: Option<( Vec<(DataColumnsByRangeRequestId, Vec)>, Vec, )>, request_span: Span, ) -> Self { let block_data_request = if let Some(blobs_req_id) = blobs_req_id { RangeBlockDataRequest::Blobs(ByRangeRequest::Active(blobs_req_id)) } else if let Some((requests, expected_custody_columns)) = data_columns { let column_peers: HashMap<_, _> = requests.into_iter().collect(); RangeBlockDataRequest::DataColumns { requests: column_peers .keys() .map(|id| (*id, ByRangeRequest::Active(*id))) .collect(), column_peers, expected_custody_columns, attempt: 0, } } else { RangeBlockDataRequest::NoData }; Self { blocks_request: ByRangeRequest::Active(blocks_req_id), block_data_request, request_span, } } /// Modifies `self` by inserting a new `DataColumnsByRangeRequestId` for a formerly failed /// request for some columns. pub fn reinsert_failed_column_requests( &mut self, failed_column_requests: Vec<(DataColumnsByRangeRequestId, Vec)>, ) -> Result<(), String> { match &mut self.block_data_request { RangeBlockDataRequest::DataColumns { requests, expected_custody_columns: _, column_peers, attempt: _, } => { for (request, columns) in failed_column_requests.into_iter() { requests.insert(request, ByRangeRequest::Active(request)); column_peers.insert(request, columns); } Ok(()) } _ => Err("not a column request".to_string()), } } /// Adds received blocks to the request. /// /// Returns an error if the request ID doesn't match the expected blocks request. pub fn add_blocks( &mut self, req_id: BlocksByRangeRequestId, blocks: Vec>>, ) -> Result<(), String> { self.blocks_request.finish(req_id, blocks) } /// Adds received blobs to the request. /// /// Returns an error if this request expects data columns instead of blobs, /// or if the request ID doesn't match. pub fn add_blobs( &mut self, req_id: BlobsByRangeRequestId, blobs: Vec>>, ) -> Result<(), String> { match &mut self.block_data_request { RangeBlockDataRequest::NoData => Err("received blobs but expected no data".to_owned()), RangeBlockDataRequest::Blobs(req) => req.finish(req_id, blobs), RangeBlockDataRequest::DataColumns { .. } => { Err("received blobs but expected data columns".to_owned()) } } } /// Adds received custody columns to the request. /// /// Returns an error if this request expects blobs instead of data columns, /// or if the request ID is unknown. pub fn add_custody_columns( &mut self, req_id: DataColumnsByRangeRequestId, columns: Vec>>, ) -> Result<(), String> { match &mut self.block_data_request { RangeBlockDataRequest::NoData => { Err("received data columns but expected no data".to_owned()) } RangeBlockDataRequest::Blobs(_) => { Err("received data columns but expected blobs".to_owned()) } RangeBlockDataRequest::DataColumns { requests, .. } => { let req = requests .get_mut(&req_id) .ok_or(format!("unknown data columns by range req_id {req_id}"))?; req.finish(req_id, columns) } } } /// Attempts to construct RPC blocks from all received components. /// /// Returns `None` if not all expected requests have completed. /// Returns `Some(Ok(_))` with valid RPC blocks if all data is present and valid. /// Returns `Some(Err(_))` if there are issues coupling blocks with their data. pub fn responses( &mut self, da_checker: Arc>, spec: Arc, ) -> Option>, CouplingError>> where T: BeaconChainTypes, { let Some(blocks) = self.blocks_request.to_finished() else { return None; }; // Increment the attempt once this function returns the response or errors match &mut self.block_data_request { RangeBlockDataRequest::NoData => Some(Self::responses_with_blobs( blocks.to_vec(), vec![], da_checker, spec, )), RangeBlockDataRequest::Blobs(request) => { let Some(blobs) = request.to_finished() else { return None; }; Some(Self::responses_with_blobs( blocks.to_vec(), blobs.to_vec(), da_checker, spec, )) } RangeBlockDataRequest::DataColumns { requests, expected_custody_columns, column_peers, attempt, } => { let mut data_columns = vec![]; let mut column_to_peer_id: HashMap = HashMap::new(); for req in requests.values() { let Some(data) = req.to_finished() else { return None; }; data_columns.extend(data.clone()) } // An "attempt" is complete here after we have received a response for all the // requests we made. i.e. `req.to_finished()` returns Some for all requests. *attempt += 1; // Note: this assumes that only 1 peer is responsible for a column // with a batch. for (id, columns) in column_peers { for column in columns { column_to_peer_id.insert(*column, id.peer); } } let resp = Self::responses_with_custody_columns( blocks.to_vec(), data_columns, column_to_peer_id, expected_custody_columns, *attempt, da_checker, spec, ); if let Err(CouplingError::DataColumnPeerFailure { error: _, faulty_peers, exceeded_retries: _, }) = &resp { for (_, peer) in faulty_peers.iter() { // find the req id associated with the peer and // delete it from the entries as we are going to make // a separate attempt for those components. requests.retain(|&k, _| k.peer != *peer); } } Some(resp) } } } fn responses_with_blobs( blocks: Vec>>, blobs: Vec>>, da_checker: Arc>, spec: Arc, ) -> Result>, CouplingError> where T: BeaconChainTypes, { // There can't be more more blobs than blocks. i.e. sending any blob (empty // included) for a skipped slot is not permitted. let mut responses = Vec::with_capacity(blocks.len()); let mut blob_iter = blobs.into_iter().peekable(); for block in blocks.into_iter() { let max_blobs_per_block = spec.max_blobs_per_block(block.epoch()) as usize; let mut blob_list = Vec::with_capacity(max_blobs_per_block); while { blob_iter .peek() .map(|sidecar| sidecar.slot() == block.slot()) .unwrap_or(false) } { blob_list.push(blob_iter.next().ok_or_else(|| { CouplingError::BlobPeerFailure("Missing next blob".to_string()) })?); } let mut blobs_buffer = vec![None; max_blobs_per_block]; for blob in blob_list { let blob_index = blob.index as usize; let Some(blob_opt) = blobs_buffer.get_mut(blob_index) else { return Err(CouplingError::BlobPeerFailure( "Invalid blob index".to_string(), )); }; if blob_opt.is_some() { return Err(CouplingError::BlobPeerFailure( "Repeat blob index".to_string(), )); } else { *blob_opt = Some(blob); } } let blobs = RuntimeVariableList::new( blobs_buffer.into_iter().flatten().collect::>(), max_blobs_per_block, ) .map_err(|_| { CouplingError::BlobPeerFailure("Blobs returned exceeds max length".to_string()) })?; let block_data = AvailableBlockData::new_with_blobs(blobs); responses.push( RangeSyncBlock::new(block, block_data, &da_checker, spec.clone()) .map_err(|e| CouplingError::BlobPeerFailure(format!("{e:?}")))?, ) } // if accumulated sidecars is not empty, log an error but return the responses // as we can still make progress. if blob_iter.next().is_some() { let remaining_blobs = blob_iter .map(|b| (b.index, b.block_root())) .collect::>(); debug!(?remaining_blobs, "Received sidecars that don't pair well",); } Ok(responses) } fn responses_with_custody_columns( blocks: Vec>>, data_columns: DataColumnSidecarList, column_to_peer: HashMap, expects_custody_columns: &[ColumnIndex], attempt: usize, da_checker: Arc>, spec: Arc, ) -> Result>, CouplingError> where T: BeaconChainTypes, { // Group data columns by block_root and index let mut data_columns_by_block = HashMap::>>>::new(); for column in data_columns { let block_root = column.block_root(); let index = *column.index(); if data_columns_by_block .entry(block_root) .or_default() .insert(index, column) .is_some() { // `DataColumnsByRangeRequestItems` ensures that we do not request any duplicated indices across all peers // we request the data from. // If there are duplicated indices, its likely a peer sending us the same index multiple times. // However we can still proceed even if there are extra columns, just log an error. debug!(?block_root, ?index, "Repeated column for block_root"); continue; } } // Now iterate all blocks ensuring that the block roots of each block and data column match, // plus we have columns for our custody requirements let mut range_sync_blocks = Vec::with_capacity(blocks.len()); let exceeded_retries = attempt >= MAX_COLUMN_RETRIES; for block in blocks { let block_root = get_block_root(&block); range_sync_blocks.push(if block.num_expected_blobs() > 0 { let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root) else { let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect(); return Err(CouplingError::DataColumnPeerFailure { error: format!("No columns for block {block_root:?} with data"), faulty_peers: responsible_peers, exceeded_retries, }); }; let mut custody_columns = vec![]; let mut naughty_peers = vec![]; for index in expects_custody_columns { // Safe to convert to `CustodyDataColumn`: we have asserted that the index of // this column is in the set of `expects_custody_columns` and with the expected // block root, so for the expected epoch of this batch. if let Some(data_column) = data_columns_by_index.remove(index) { custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column)); } else { let Some(responsible_peer) = column_to_peer.get(index) else { return Err(CouplingError::InternalError(format!("Internal error, no request made for column {}", index))); }; naughty_peers.push((*index, *responsible_peer)); } } if !naughty_peers.is_empty() { return Err(CouplingError::DataColumnPeerFailure { error: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"), faulty_peers: naughty_peers, exceeded_retries }); } // Assert that there are no columns left if !data_columns_by_index.is_empty() { let remaining_indices = data_columns_by_index.keys().collect::>(); // log the error but don't return an error, we can still progress with extra columns. debug!( ?block_root, ?remaining_indices, "Not all columns consumed for block" ); } let block_data = AvailableBlockData::new_with_data_columns(custody_columns.iter().map(|c| c.as_data_column().clone()).collect::>()); RangeSyncBlock::new(block, block_data, &da_checker, spec.clone()) .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? } else { // Block has no data, expects zero columns RangeSyncBlock::new(block, AvailableBlockData::NoData, &da_checker, spec.clone()) .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? }); } // Assert that there are no columns left for other blocks if !data_columns_by_block.is_empty() { let remaining_roots = data_columns_by_block.keys().collect::>(); // log the error but don't return an error, we can still progress with responses. // this is most likely an internal error with overrequesting or a client bug. debug!(?remaining_roots, "Not all columns consumed for block"); } Ok(range_sync_blocks) } } impl ByRangeRequest { pub fn finish(&mut self, id: I, data: T) -> Result<(), String> { match self { Self::Active(expected_id) => { if expected_id != &id { return Err(format!("unexpected req_id expected {expected_id} got {id}")); } *self = Self::Complete(data); Ok(()) } Self::Complete(_) => Err("request already complete".to_owned()), } } pub fn to_finished(&self) -> Option<&T> { match self { Self::Active(_) => None, Self::Complete(data) => Some(data), } } } #[cfg(test)] mod tests { use crate::sync::network_context::MAX_COLUMN_RETRIES; use super::RangeBlockComponentsRequest; use beacon_chain::custody_context::NodeCustodyType; use beacon_chain::test_utils::{ NumBlobs, generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_da_checker, test_spec, }; use lighthouse_network::{ PeerId, service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, DataColumnsByRangeRequestId, DataColumnsByRangeRequester, Id, RangeRequestId, }, }; use std::{collections::HashMap, sync::Arc}; use tracing::Span; use types::{Epoch, ForkName, MinimalEthSpec as E, SignedBeaconBlock}; fn components_id() -> ComponentsByRangeRequestId { ComponentsByRangeRequestId { id: 0, requester: RangeRequestId::RangeSync { chain_id: 1, batch_id: Epoch::new(0), }, } } fn blocks_id(parent_request_id: ComponentsByRangeRequestId) -> BlocksByRangeRequestId { BlocksByRangeRequestId { id: 1, parent_request_id, } } fn blobs_id(parent_request_id: ComponentsByRangeRequestId) -> BlobsByRangeRequestId { BlobsByRangeRequestId { id: 1, parent_request_id, } } fn columns_id( id: Id, parent_request_id: DataColumnsByRangeRequester, ) -> DataColumnsByRangeRequestId { DataColumnsByRangeRequestId { id, parent_request_id, peer: PeerId::random(), } } fn is_finished(info: &mut RangeBlockComponentsRequest) -> bool { let spec = Arc::new(test_spec::()); let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); info.responses(da_checker, spec).is_some() } #[test] fn no_blobs_into_responses() { let mut u = types::test_utils::test_unstructured(); let blocks = (0..4) .map(|_| { generate_rand_block_and_blobs::(ForkName::Base, NumBlobs::None, &mut u) .unwrap() .0 .into() }) .collect::>>>(); let blocks_req_id = blocks_id(components_id()); let mut info = RangeBlockComponentsRequest::::new(blocks_req_id, None, None, Span::none()); // Send blocks and complete terminate response info.add_blocks(blocks_req_id, blocks).unwrap(); let spec = Arc::new(test_spec::()); let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); // Assert response is finished and RpcBlocks can be constructed info.responses(da_checker, spec).unwrap().unwrap(); } #[test] fn empty_blobs_into_responses() { let mut u = types::test_utils::test_unstructured(); let blocks = (0..4) .map(|_| { // Always generate some blobs. generate_rand_block_and_blobs::(ForkName::Deneb, NumBlobs::Number(3), &mut u) .unwrap() .0 .into() }) .collect::>>>(); let components_id = components_id(); let blocks_req_id = blocks_id(components_id); let blobs_req_id = blobs_id(components_id); let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, Some(blobs_req_id), None, Span::none(), ); // Send blocks and complete terminate response info.add_blocks(blocks_req_id, blocks).unwrap(); // Expect no blobs returned info.add_blobs(blobs_req_id, vec![]).unwrap(); let mut spec = test_spec::(); spec.deneb_fork_epoch = Some(Epoch::new(0)); let spec = Arc::new(spec); let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); // Assert response is finished and RpcBlocks cannot be constructed, because blobs weren't returned. let result = info.responses(da_checker, spec).unwrap(); assert!(result.is_err()) } #[test] fn rpc_block_with_custody_columns() { let mut spec = test_spec::(); spec.deneb_fork_epoch = Some(Epoch::new(0)); spec.fulu_fork_epoch = Some(Epoch::new(0)); let spec = Arc::new(spec); let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); let expects_custody_columns = da_checker .custody_context() .sampling_columns_for_epoch(Epoch::new(0), &spec) .to_vec(); let mut u = types::test_utils::test_unstructured(); let blocks = (0..4) .map(|_| { generate_rand_block_and_data_columns::( ForkName::Fulu, NumBlobs::Number(1), &mut u, &spec, ) .unwrap() }) .collect::>(); let components_id = components_id(); let blocks_req_id = blocks_id(components_id); let columns_req_id = expects_custody_columns .iter() .enumerate() .map(|(i, column)| { ( columns_id( i as Id, DataColumnsByRangeRequester::ComponentsByRange(components_id), ), vec![*column], ) }) .collect::>(); let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, None, Some((columns_req_id.clone(), expects_custody_columns.clone())), Span::none(), ); // Send blocks and complete terminate response info.add_blocks( blocks_req_id, blocks.iter().map(|b| b.0.clone().into()).collect(), ) .unwrap(); // Assert response is not finished assert!(!is_finished(&mut info)); // Send data columns for (i, &column_index) in expects_custody_columns.iter().enumerate() { let (req, _columns) = columns_req_id.get(i).unwrap(); info.add_custody_columns( *req, blocks .iter() .flat_map(|b| b.1.iter().filter(|d| *d.index() == column_index).cloned()) .collect(), ) .unwrap(); if i < expects_custody_columns.len() - 1 { assert!( !is_finished(&mut info), "requested should not be finished at loop {i}" ); } } // All completed construct response info.responses(da_checker, spec).unwrap().unwrap(); } #[test] fn rpc_block_with_custody_columns_batched() { let mut spec = test_spec::(); spec.deneb_fork_epoch = Some(Epoch::new(0)); spec.fulu_fork_epoch = Some(Epoch::new(0)); let spec = Arc::new(spec); let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); let expected_sampling_columns = da_checker .custody_context() .sampling_columns_for_epoch(Epoch::new(0), &spec) .to_vec(); // Split sampling columns into two batches let mid = expected_sampling_columns.len() / 2; let batched_column_requests = [ expected_sampling_columns[..mid].to_vec(), expected_sampling_columns[mid..].to_vec(), ]; let custody_column_request_ids = (0..batched_column_requests.len() as u32).collect::>(); let num_of_data_column_requests = custody_column_request_ids.len(); let components_id = components_id(); let blocks_req_id = blocks_id(components_id); let columns_req_id = batched_column_requests .iter() .enumerate() .map(|(i, columns)| { ( columns_id( i as Id, DataColumnsByRangeRequester::ComponentsByRange(components_id), ), columns.clone(), ) }) .collect::>(); let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), Span::none(), ); let mut u = types::test_utils::test_unstructured(); let blocks = (0..4) .map(|_| { generate_rand_block_and_data_columns::( ForkName::Fulu, NumBlobs::Number(1), &mut u, &spec, ) .unwrap() }) .collect::>(); // Send blocks and complete terminate response info.add_blocks( blocks_req_id, blocks.iter().map(|b| b.0.clone().into()).collect(), ) .unwrap(); // Assert response is not finished assert!(!is_finished(&mut info)); for (i, column_indices) in batched_column_requests.iter().enumerate() { let (req, _columns) = columns_req_id.get(i).unwrap(); // Send the set of columns in the same batch request info.add_custody_columns( *req, blocks .iter() .flat_map(|b| { b.1.iter() .filter(|d| column_indices.contains(d.index())) .cloned() }) .collect::>(), ) .unwrap(); if i < num_of_data_column_requests - 1 { assert!( !is_finished(&mut info), "requested should not be finished at loop {i}" ); } } // All completed construct response info.responses(da_checker, spec).unwrap().unwrap(); } #[test] fn missing_custody_columns_from_faulty_peers() { // GIVEN: A request expecting sampling columns from multiple peers let spec = Arc::new(test_spec::()); let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); let expected_sampling_columns = da_checker .custody_context() .sampling_columns_for_epoch(Epoch::new(0), &spec) .to_vec(); let mut u = types::test_utils::test_unstructured(); let blocks = (0..2) .map(|_| { generate_rand_block_and_data_columns::( ForkName::Fulu, NumBlobs::Number(1), &mut u, &spec, ) .unwrap() }) .collect::>(); let components_id = components_id(); let blocks_req_id = blocks_id(components_id); let columns_req_id = expected_sampling_columns .iter() .enumerate() .map(|(i, column)| { ( columns_id( i as Id, DataColumnsByRangeRequester::ComponentsByRange(components_id), ), vec![*column], ) }) .collect::>(); let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), Span::none(), ); // AND: All blocks are received successfully info.add_blocks( blocks_req_id, blocks.iter().map(|b| b.0.clone().into()).collect(), ) .unwrap(); // AND: Only the first 2 sampling columns are received successfully for (i, &column_index) in expected_sampling_columns.iter().take(2).enumerate() { let (req, _columns) = columns_req_id.get(i).unwrap(); info.add_custody_columns( *req, blocks .iter() .flat_map(|b| b.1.iter().filter(|d| *d.index() == column_index).cloned()) .collect(), ) .unwrap(); } // AND: Remaining column requests are completed with empty data (simulating faulty peers) for i in 2..expected_sampling_columns.len() { let (req, _columns) = columns_req_id.get(i).unwrap(); info.add_custody_columns(*req, vec![]).unwrap(); } // WHEN: Attempting to construct RPC blocks let result = info.responses(da_checker, spec).unwrap(); // THEN: Should fail with PeerFailure identifying the faulty peers assert!(result.is_err()); if let Err(super::CouplingError::DataColumnPeerFailure { error, faulty_peers, exceeded_retries, }) = result { assert!(error.contains("Peers did not return column")); // All columns after the first 2 should be reported as faulty let expected_faulty_count = expected_sampling_columns.len() - 2; assert_eq!(faulty_peers.len(), expected_faulty_count); // Verify the faulty column indices match for (i, (column_index, _peer)) in faulty_peers.iter().enumerate() { assert_eq!(*column_index, expected_sampling_columns[i + 2]); } assert!(!exceeded_retries); // First attempt, should be false } else { panic!("Expected PeerFailure error"); } } #[test] fn retry_logic_after_peer_failures() { // GIVEN: A request expecting sampling columns where some peers initially fail let mut spec = test_spec::(); spec.deneb_fork_epoch = Some(Epoch::new(0)); spec.fulu_fork_epoch = Some(Epoch::new(0)); let spec = Arc::new(spec); let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); let expected_sampling_columns = da_checker .custody_context() .sampling_columns_for_epoch(Epoch::new(0), &spec) .to_vec(); let mut u = types::test_utils::test_unstructured(); let blocks = (0..2) .map(|_| { generate_rand_block_and_data_columns::( ForkName::Fulu, NumBlobs::Number(1), &mut u, &spec, ) .unwrap() }) .collect::>(); let components_id = components_id(); let blocks_req_id = blocks_id(components_id); let columns_req_id = expected_sampling_columns .iter() .enumerate() .map(|(i, column)| { ( columns_id( i as Id, DataColumnsByRangeRequester::ComponentsByRange(components_id), ), vec![*column], ) }) .collect::>(); let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), Span::none(), ); // AND: All blocks are received info.add_blocks( blocks_req_id, blocks.iter().map(|b| b.0.clone().into()).collect(), ) .unwrap(); // AND: Only partial sampling columns are received (first column but not others) let (req0, _) = columns_req_id.first().unwrap(); info.add_custody_columns( *req0, blocks .iter() .flat_map(|b| { b.1.iter() .filter(|d| *d.index() == expected_sampling_columns[0]) .cloned() }) .collect(), ) .unwrap(); // AND: The remaining column requests are completed with empty data (peer failure) for i in 1..expected_sampling_columns.len() { let (req, _) = columns_req_id.get(i).unwrap(); info.add_custody_columns(*req, vec![]).unwrap(); } let result: Result< Vec>, crate::sync::block_sidecar_coupling::CouplingError, > = info.responses(da_checker.clone(), spec.clone()).unwrap(); assert!(result.is_err()); // AND: We retry with a new peer for the failed columns let new_columns_req_id = columns_id( 10 as Id, DataColumnsByRangeRequester::ComponentsByRange(components_id), ); for column in &expected_sampling_columns[1..] { let failed_column_requests = vec![(new_columns_req_id, vec![*column])]; info.reinsert_failed_column_requests(failed_column_requests) .unwrap(); } // AND: The new peer provides the missing column data let failed_column_indices: Vec<_> = expected_sampling_columns[1..].to_vec(); info.add_custody_columns( new_columns_req_id, blocks .iter() .flat_map(|b| { b.1.iter() .filter(|d| failed_column_indices.contains(d.index())) .cloned() }) .collect(), ) .unwrap(); // WHEN: Attempting to get responses again let result = info.responses(da_checker, spec).unwrap(); // THEN: Should succeed with complete RangeSync blocks assert!(result.is_ok()); let range_sync_blocks = result.unwrap(); assert_eq!(range_sync_blocks.len(), 2); } #[test] fn max_retries_exceeded_behavior() { // GIVEN: A request where peers consistently fail to provide required columns let mut spec = test_spec::(); spec.deneb_fork_epoch = Some(Epoch::new(0)); spec.fulu_fork_epoch = Some(Epoch::new(0)); let spec = Arc::new(spec); let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); let expected_sampling_columns = da_checker .custody_context() .sampling_columns_for_epoch(Epoch::new(0), &spec) .to_vec(); let mut u = types::test_utils::test_unstructured(); let blocks = (0..1) .map(|_| { generate_rand_block_and_data_columns::( ForkName::Fulu, NumBlobs::Number(1), &mut u, &spec, ) .unwrap() }) .collect::>(); let components_id = components_id(); let blocks_req_id = blocks_id(components_id); let columns_req_id = expected_sampling_columns .iter() .enumerate() .map(|(i, column)| { ( columns_id( i as Id, DataColumnsByRangeRequester::ComponentsByRange(components_id), ), vec![*column], ) }) .collect::>(); let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), Span::none(), ); // AND: All blocks are received info.add_blocks( blocks_req_id, blocks.iter().map(|b| b.0.clone().into()).collect(), ) .unwrap(); // AND: Only the first sampling column is provided successfully let (req0, _) = columns_req_id.first().unwrap(); info.add_custody_columns( *req0, blocks .iter() .flat_map(|b| { b.1.iter() .filter(|d| *d.index() == expected_sampling_columns[0]) .cloned() }) .collect(), ) .unwrap(); // AND: All other column requests complete with empty data (persistent peer failure) for i in 1..expected_sampling_columns.len() { let (req, _) = columns_req_id.get(i).unwrap(); info.add_custody_columns(*req, vec![]).unwrap(); } // WHEN: Multiple retry attempts are made (up to max retries) for _ in 0..MAX_COLUMN_RETRIES { let result = info.responses(da_checker.clone(), spec.clone()).unwrap(); assert!(result.is_err()); if let Err(super::CouplingError::DataColumnPeerFailure { exceeded_retries, .. }) = &result && *exceeded_retries { break; } } // AND: One final attempt after exceeding max retries let result = info.responses(da_checker, spec).unwrap(); // THEN: Should fail with exceeded_retries = true assert!(result.is_err()); if let Err(super::CouplingError::DataColumnPeerFailure { error: _, faulty_peers, exceeded_retries, }) = result { // All columns except the first one should be faulty let expected_faulty_count = expected_sampling_columns.len() - 1; assert_eq!(faulty_peers.len(), expected_faulty_count); let mut faulty_peers = faulty_peers.into_iter().collect::>(); // Only the columns that failed (indices 1..N) should be in faulty_peers for column in &expected_sampling_columns[1..] { faulty_peers.remove(column); } assert!(faulty_peers.is_empty()); assert!(exceeded_retries); // Should be true after max retries } else { panic!("Expected PeerFailure error with exceeded_retries=true"); } } }