diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs
index 6c8a8eab63..45cb1aeace 100644
--- a/beacon_node/network/src/sync/block_sidecar_coupling.rs
+++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs
@@ -1,89 +1,156 @@
 use beacon_chain::{
     block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn,
     get_block_root,
 };
-use std::{
-    collections::{HashMap, VecDeque},
-    sync::Arc,
+use lighthouse_network::service::api_types::{
+    BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId,
 };
+use std::{collections::HashMap, sync::Arc};
 use types::{
-    BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, EthSpec, Hash256, RuntimeVariableList,
-    SignedBeaconBlock,
+    BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
+    Hash256, RuntimeVariableList, SignedBeaconBlock,
 };
 
-#[derive(Debug)]
 pub struct RangeBlockComponentsRequest<E: EthSpec> {
     /// Blocks we have received awaiting for their corresponding sidecar.
-    blocks: VecDeque<Arc<SignedBeaconBlock<E>>>,
+    blocks_request: ByRangeRequest<BlocksByRangeRequestId, Vec<Arc<SignedBeaconBlock<E>>>>,
     /// Sidecars we have received awaiting for their corresponding block.
-    blobs: VecDeque<Arc<BlobSidecar<E>>>,
-    data_columns: VecDeque<Arc<DataColumnSidecar<E>>>,
-    /// Whether the individual RPC request for blocks is finished or not.
-    is_blocks_stream_terminated: bool,
-    /// Whether the individual RPC request for sidecars is finished or not.
-    is_sidecars_stream_terminated: bool,
-    custody_columns_streams_terminated: usize,
-    /// Used to determine if this accumulator should wait for a sidecars stream termination
-    expects_blobs: bool,
-    expects_custody_columns: Option<Vec<ColumnIndex>>,
-    /// Used to determine if the number of data columns stream termination this accumulator should
-    /// wait for. This may be less than the number of `expects_custody_columns` due to request batching.
-    num_custody_column_requests: Option<usize>,
+    block_data_request: RangeBlockDataRequest<E>,
+}
+
+enum ByRangeRequest<I, T> {
+    Active(I),
+    Complete(T),
+}
+
+enum RangeBlockDataRequest<E: EthSpec> {
+    NoData,
+    Blobs(ByRangeRequest<BlobsByRangeRequestId, Vec<Arc<BlobSidecar<E>>>>),
+    DataColumns {
+        requests: HashMap<
+            DataColumnsByRangeRequestId,
+            ByRangeRequest<DataColumnsByRangeRequestId, DataColumnSidecarList<E>>,
+        >,
+        expected_custody_columns: Vec<ColumnIndex>,
+    },
 }
 
 impl<E: EthSpec> RangeBlockComponentsRequest<E> {
     pub fn new(
-        expects_blobs: bool,
-        expects_custody_columns: Option<Vec<ColumnIndex>>,
-        num_custody_column_requests: Option<usize>,
+        blocks_req_id: BlocksByRangeRequestId,
+        blobs_req_id: Option<BlobsByRangeRequestId>,
+        data_columns: Option<(Vec<DataColumnsByRangeRequestId>, Vec<ColumnIndex>)>,
     ) -> Self {
-        Self {
-            blocks: <_>::default(),
-            blobs: <_>::default(),
-            data_columns: <_>::default(),
-            is_blocks_stream_terminated: false,
-            is_sidecars_stream_terminated: false,
-            custody_columns_streams_terminated: 0,
-            expects_blobs,
-            expects_custody_columns,
-            num_custody_column_requests,
-        }
-    }
-
-    pub fn add_blocks(&mut self, blocks: Vec<Arc<SignedBeaconBlock<E>>>) {
-        for block in blocks {
-            self.blocks.push_back(block);
-        }
-        self.is_blocks_stream_terminated = true;
-    }
-
-    pub fn add_blobs(&mut self, blobs: Vec<Arc<BlobSidecar<E>>>) {
-        for blob in blobs {
-            self.blobs.push_back(blob);
-        }
-        self.is_sidecars_stream_terminated = true;
-    }
-
-    pub fn add_custody_columns(&mut self, columns: Vec<Arc<DataColumnSidecar<E>>>) {
-        for column in columns {
-            self.data_columns.push_back(column);
-        }
-        // TODO(das): this mechanism is dangerous, if somehow there are two requests for the
-        // same column index it can terminate early. This struct should track that all requests
-        // for all custody columns terminate.
-        self.custody_columns_streams_terminated += 1;
-    }
-
-    pub fn into_responses(self, spec: &ChainSpec) -> Result<Vec<RpcBlock<E>>, String> {
-        if let Some(expects_custody_columns) = self.expects_custody_columns.clone() {
-            self.into_responses_with_custody_columns(expects_custody_columns, spec)
+        let block_data_request = if let Some(blobs_req_id) = blobs_req_id {
+            RangeBlockDataRequest::Blobs(ByRangeRequest::Active(blobs_req_id))
+        } else if let Some((requests, expected_custody_columns)) = data_columns {
+            RangeBlockDataRequest::DataColumns {
+                requests: requests
+                    .into_iter()
+                    .map(|id| (id, ByRangeRequest::Active(id)))
+                    .collect(),
+                expected_custody_columns,
+            }
         } else {
-            self.into_responses_with_blobs(spec)
+            RangeBlockDataRequest::NoData
+        };
+
+        Self {
+            blocks_request: ByRangeRequest::Active(blocks_req_id),
+            block_data_request,
         }
     }
 
-    fn into_responses_with_blobs(self, spec: &ChainSpec) -> Result<Vec<RpcBlock<E>>, String> {
-        let RangeBlockComponentsRequest { blocks, blobs, .. } = self;
+    pub fn add_blocks(
+        &mut self,
+        req_id: BlocksByRangeRequestId,
+        blocks: Vec<Arc<SignedBeaconBlock<E>>>,
+    ) -> Result<(), String> {
+        self.blocks_request.finish(req_id, blocks)
+    }
 
+    pub fn add_blobs(
+        &mut self,
+        req_id: BlobsByRangeRequestId,
+        blobs: Vec<Arc<BlobSidecar<E>>>,
+    ) -> Result<(), String> {
+        match &mut self.block_data_request {
+            RangeBlockDataRequest::NoData => Err("received blobs but expected no data".to_owned()),
+            RangeBlockDataRequest::Blobs(ref mut req) => req.finish(req_id, blobs),
+            RangeBlockDataRequest::DataColumns { .. } => {
+                Err("received blobs but expected data columns".to_owned())
+            }
+        }
+    }
+
+    pub fn add_custody_columns(
+        &mut self,
+        req_id: DataColumnsByRangeRequestId,
+        columns: Vec<Arc<DataColumnSidecar<E>>>,
+    ) -> Result<(), String> {
+        match &mut self.block_data_request {
+            RangeBlockDataRequest::NoData => {
+                Err("received data columns but expected no data".to_owned())
+            }
+            RangeBlockDataRequest::Blobs(_) => {
+                Err("received data columns but expected blobs".to_owned())
+            }
+            RangeBlockDataRequest::DataColumns {
+                ref mut requests, ..
+            } => {
+                let req = requests
+                    .get_mut(&req_id)
+                    .ok_or(format!("unknown data columns by range req_id {req_id}"))?;
+                req.finish(req_id, columns)
+            }
+        }
+    }
+
+    pub fn responses(&self, spec: &ChainSpec) -> Option<Result<Vec<RpcBlock<E>>, String>> {
+        let Some(blocks) = self.blocks_request.to_finished() else {
+            return None;
+        };
+
+        match &self.block_data_request {
+            RangeBlockDataRequest::NoData => {
+                Some(Self::responses_with_blobs(blocks.to_vec(), vec![], spec))
+            }
+            RangeBlockDataRequest::Blobs(request) => {
+                let Some(blobs) = request.to_finished() else {
+                    return None;
+                };
+                Some(Self::responses_with_blobs(
+                    blocks.to_vec(),
+                    blobs.to_vec(),
+                    spec,
+                ))
+            }
+            RangeBlockDataRequest::DataColumns {
+                requests,
+                expected_custody_columns,
+            } => {
+                let mut data_columns = vec![];
+                for req in requests.values() {
+                    let Some(data) = req.to_finished() else {
+                        return None;
+                    };
+                    data_columns.extend(data.clone())
+                }
+
+                Some(Self::responses_with_custody_columns(
+                    blocks.to_vec(),
+                    data_columns,
+                    expected_custody_columns,
+                    spec,
+                ))
+            }
+        }
+    }
+
+    fn responses_with_blobs(
+        blocks: Vec<Arc<SignedBeaconBlock<E>>>,
+        blobs: Vec<Arc<BlobSidecar<E>>>,
+        spec: &ChainSpec,
+    ) -> Result<Vec<RpcBlock<E>>, String> {
         // There can't be more more blobs than blocks. i.e. sending any blob (empty
         // included) for a skipped slot is not permitted.
         let mut responses = Vec::with_capacity(blocks.len());
@@ -129,17 +196,12 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
         Ok(responses)
     }
 
-    fn into_responses_with_custody_columns(
-        self,
-        expects_custody_columns: Vec<ColumnIndex>,
+    fn responses_with_custody_columns(
+        blocks: Vec<Arc<SignedBeaconBlock<E>>>,
+        data_columns: DataColumnSidecarList<E>,
+        expects_custody_columns: &[ColumnIndex],
         spec: &ChainSpec,
     ) -> Result<Vec<RpcBlock<E>>, String> {
-        let RangeBlockComponentsRequest {
-            blocks,
-            data_columns,
-            ..
-        } = self;
-
         // Group data columns by block_root and index
         let mut data_columns_by_block =
             HashMap::<Hash256, Vec<Arc<DataColumnSidecar<E>>>>::new();
@@ -177,7 +239,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
             };
 
             let mut custody_columns = vec![];
-            for index in &expects_custody_columns {
+            for index in expects_custody_columns {
                 let Some(data_column) = data_columns_by_index.remove(index) else {
                     return Err(format!("No column for block {block_root:?} index {index}"));
                 };
@@ -210,20 +272,27 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
 
         Ok(rpc_blocks)
     }
+}
 
-    pub fn is_finished(&self) -> bool {
-        if !self.is_blocks_stream_terminated {
-            return false;
-        }
-        if self.expects_blobs && !self.is_sidecars_stream_terminated {
-            return false;
-        }
-        if let Some(expects_custody_column_responses) = self.num_custody_column_requests {
-            if self.custody_columns_streams_terminated < expects_custody_column_responses {
-                return false;
+impl<I: PartialEq + std::fmt::Display, T> ByRangeRequest<I, T> {
+    fn finish(&mut self, id: I, data: T) -> Result<(), String> {
+        match self {
+            Self::Active(expected_id) => {
+                if expected_id != &id {
+                    return Err(format!("unexpected req_id expected {expected_id} got {id}"));
+                }
+                *self = Self::Complete(data);
+                Ok(())
             }
+            Self::Complete(_) => Err("request already complete".to_owned()),
+        }
+    }
+
+    fn to_finished(&self) -> Option<&T> {
+        match self {
+            Self::Active(_) => None,
+            Self::Complete(data) => Some(data),
         }
-        true
     }
 }
@@ -233,9 +302,52 @@ mod tests {
     use beacon_chain::test_utils::{
         generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_spec, NumBlobs,
     };
+    use lighthouse_network::service::api_types::{
+        BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
+        DataColumnsByRangeRequestId, Id, RangeRequestId,
+    };
     use rand::SeedableRng;
     use std::sync::Arc;
-    use types::{test_utils::XorShiftRng, ForkName, MinimalEthSpec as E, SignedBeaconBlock};
+    use types::{test_utils::XorShiftRng, Epoch, ForkName, MinimalEthSpec as E, SignedBeaconBlock};
+
+    fn components_id() -> ComponentsByRangeRequestId {
+        ComponentsByRangeRequestId {
+            id: 0,
+            requester: RangeRequestId::RangeSync {
+                chain_id: 1,
+                batch_id: Epoch::new(0),
+            },
+        }
+    }
+
+    fn blocks_id(parent_request_id: ComponentsByRangeRequestId) -> BlocksByRangeRequestId {
+        BlocksByRangeRequestId {
+            id: 1,
+            parent_request_id,
+        }
+    }
+
+    fn blobs_id(parent_request_id: ComponentsByRangeRequestId) -> BlobsByRangeRequestId {
+        BlobsByRangeRequestId {
+            id: 1,
+            parent_request_id,
+        }
+    }
+
+    fn columns_id(
+        id: Id,
+        parent_request_id: ComponentsByRangeRequestId,
+    ) -> DataColumnsByRangeRequestId {
+        DataColumnsByRangeRequestId {
+            id,
+            parent_request_id,
+        }
+    }
+
+    fn is_finished(info: &RangeBlockComponentsRequest<E>) -> bool {
+        let spec = test_spec::<E>();
+        info.responses(&spec).is_some()
+    }
 
     #[test]
     fn no_blobs_into_responses() {
@@ -248,14 +360,15 @@
                     .into()
             })
             .collect::<Vec<Arc<SignedBeaconBlock<E>>>>();
-        let mut info = RangeBlockComponentsRequest::<E>::new(false, None, None);
+
+        let blocks_req_id = blocks_id(components_id());
+        let mut info = RangeBlockComponentsRequest::<E>::new(blocks_req_id, None, None);
 
         // Send blocks and complete terminate response
-        info.add_blocks(blocks);
+        info.add_blocks(blocks_req_id, blocks).unwrap();
 
         // Assert response is finished and RpcBlocks can be constructed
-        assert!(info.is_finished());
-        info.into_responses(&test_spec::<E>()).unwrap();
+        info.responses(&test_spec::<E>()).unwrap().unwrap();
     }
 
     #[test]
@@ -275,18 +388,22 @@
                     .into()
             })
             .collect::<Vec<Arc<SignedBeaconBlock<E>>>>();
-        let mut info = RangeBlockComponentsRequest::<E>::new(true, None, None);
+
+        let components_id = components_id();
+        let blocks_req_id = blocks_id(components_id);
+        let blobs_req_id = blobs_id(components_id);
+        let mut info =
+            RangeBlockComponentsRequest::<E>::new(blocks_req_id, Some(blobs_req_id), None);
 
         // Send blocks and complete terminate response
-        info.add_blocks(blocks);
+        info.add_blocks(blocks_req_id, blocks).unwrap();
 
         // Expect no blobs returned
-        info.add_blobs(vec![]);
+        info.add_blobs(blobs_req_id, vec![]).unwrap();
 
         // Assert response is finished and RpcBlocks can be constructed, even if blobs weren't returned.
         // This makes sure we don't expect blobs here when they have expired. Checking this logic should
         // be hendled elsewhere.
-        assert!(info.is_finished());
-        info.into_responses(&test_spec::<E>()).unwrap();
+        info.responses(&test_spec::<E>()).unwrap().unwrap();
     }
 
     #[test]
@@ -304,40 +421,49 @@
                 )
             })
             .collect::<Vec<_>>();
+
+        let components_id = components_id();
+        let blocks_req_id = blocks_id(components_id);
+        let columns_req_id = expects_custody_columns
+            .iter()
+            .enumerate()
+            .map(|(i, _)| columns_id(i as Id, components_id))
+            .collect::<Vec<_>>();
         let mut info = RangeBlockComponentsRequest::<E>::new(
-            false,
-            Some(expects_custody_columns.clone()),
-            Some(expects_custody_columns.len()),
+            blocks_req_id,
+            None,
+            Some((columns_req_id.clone(), expects_custody_columns.clone())),
        );
 
         // Send blocks and complete terminate response
-        info.add_blocks(blocks.iter().map(|b| b.0.clone().into()).collect());
+        info.add_blocks(
+            blocks_req_id,
+            blocks.iter().map(|b| b.0.clone().into()).collect(),
+        )
+        .unwrap();
 
         // Assert response is not finished
-        assert!(!info.is_finished());
+        assert!(!is_finished(&info));
 
         // Send data columns
         for (i, &column_index) in expects_custody_columns.iter().enumerate() {
             info.add_custody_columns(
+                columns_req_id.get(i).copied().unwrap(),
                 blocks
                     .iter()
                     .flat_map(|b| b.1.iter().filter(|d| d.index == column_index).cloned())
                     .collect(),
-            );
+            )
+            .unwrap();
 
             if i < expects_custody_columns.len() - 1 {
                 assert!(
-                    !info.is_finished(),
+                    !is_finished(&info),
                     "requested should not be finished at loop {i}"
                 );
-            } else {
-                assert!(
-                    info.is_finished(),
-                    "request should be finishied at loop {i}"
-                );
             }
         }
 
         // All completed construct response
-        info.into_responses(&spec).unwrap();
+        info.responses(&spec).unwrap().unwrap();
     }
 
     #[test]
@@ -353,10 +479,18 @@
             (0..batched_column_requests.len() as u32).collect::<Vec<_>>();
         let num_of_data_column_requests = custody_column_request_ids.len();
 
+        let components_id = components_id();
+        let blocks_req_id = blocks_id(components_id);
+        let columns_req_id = batched_column_requests
+            .iter()
+            .enumerate()
+            .map(|(i, _)| columns_id(i as Id, components_id))
+            .collect::<Vec<_>>();
+
         let mut info = RangeBlockComponentsRequest::<E>::new(
-            false,
-            Some(expects_custody_columns.clone()),
-            Some(num_of_data_column_requests),
+            blocks_req_id,
+            None,
+            Some((columns_req_id.clone(), expects_custody_columns.clone())),
         );
 
         let mut rng = XorShiftRng::from_seed([42; 16]);
@@ -372,13 +506,18 @@
             .collect::<Vec<_>>();
         // Send blocks and complete terminate response
-        info.add_blocks(blocks.iter().map(|b| b.0.clone().into()).collect());
+        info.add_blocks(
+            blocks_req_id,
+            blocks.iter().map(|b| b.0.clone().into()).collect(),
+        )
+        .unwrap();
 
         // Assert response is not finished
-        assert!(!info.is_finished());
+        assert!(!is_finished(&info));
 
         for (i, column_indices) in batched_column_requests.iter().enumerate() {
             // Send the set of columns in the same batch request
             info.add_custody_columns(
+                columns_req_id.get(i).copied().unwrap(),
                 blocks
                     .iter()
                     .flat_map(|b| {
@@ -387,19 +526,18 @@
                             .cloned()
                     })
                     .collect::<Vec<_>>(),
-            );
+            )
+            .unwrap();
 
             if i < num_of_data_column_requests - 1 {
                 assert!(
-                    !info.is_finished(),
+                    !is_finished(&info),
                     "requested should not be finished at loop {i}"
                 );
-            } else {
-                assert!(info.is_finished(), "request should be finished at loop {i}");
             }
         }
 
         // All completed construct response
-        info.into_responses(&spec).unwrap();
+        info.responses(&spec).unwrap().unwrap();
     }
 }
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs
index 1e6d21d68a..9a48e9aa5d 100644
--- a/beacon_node/network/src/sync/manager.rs
+++ b/beacon_node/network/src/sync/manager.rs
@@ -1167,7 +1167,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
                     self.on_range_components_response(
                         id.parent_request_id,
                         peer_id,
-                        RangeBlockComponent::Block(resp),
+                        RangeBlockComponent::Block(id, resp),
                     );
                 }
             }
@@ -1182,7 +1182,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
                     self.on_range_components_response(
                         id.parent_request_id,
                         peer_id,
-                        RangeBlockComponent::Blob(resp),
+                        RangeBlockComponent::Blob(id, resp),
                     );
                 }
             }
@@ -1200,7 +1200,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
                     self.on_range_components_response(
                         id.parent_request_id,
                         peer_id,
-                        RangeBlockComponent::CustodyColumns(resp),
+                        RangeBlockComponent::CustodyColumns(id, resp),
                    );
                }
            }
diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs
index 68a963dd41..16fcf93bcf 100644
--- a/beacon_node/network/src/sync/network_context.rs
+++ b/beacon_node/network/src/sync/network_context.rs
@@ -218,9 +218,18 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
 /// Small enumeration to make dealing with block and blob requests easier.
 pub enum RangeBlockComponent<E: EthSpec> {
-    Block(RpcResponseResult<Vec<Arc<SignedBeaconBlock<E>>>>),
-    Blob(RpcResponseResult<Vec<Arc<BlobSidecar<E>>>>),
-    CustodyColumns(RpcResponseResult<Vec<Arc<DataColumnSidecar<E>>>>),
+    Block(
+        BlocksByRangeRequestId,
+        RpcResponseResult<Vec<Arc<SignedBeaconBlock<E>>>>,
+    ),
+    Blob(
+        BlobsByRangeRequestId,
+        RpcResponseResult<Vec<Arc<BlobSidecar<E>>>>,
+    ),
+    CustodyColumns(
+        DataColumnsByRangeRequestId,
+        RpcResponseResult<Vec<Arc<DataColumnSidecar<E>>>>,
+    ),
 }
 
 impl<T: BeaconChainTypes> SyncNetworkContext<T> {
@@ -386,7 +395,16 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             requester,
         };
 
-        let _blocks_req_id = self.send_blocks_by_range_request(peer_id, request.clone(), id)?;
+        // Compute custody column peers before sending the blocks_by_range request. If we don't have
+        // enough peers, error here.
+        let data_column_requests = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) {
+            let column_indexes = self.network_globals().sampling_columns.clone();
+            Some(self.make_columns_by_range_requests(request.clone(), &column_indexes)?)
+        } else {
+            None
+        };
+
+        let blocks_req_id = self.send_blocks_by_range_request(peer_id, request.clone(), id)?;
 
         let blobs_req_id = if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) {
             Some(self.send_blobs_by_range_request(
@@ -401,35 +419,27 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             None
         };
 
-        let (expects_columns, data_column_requests) =
-            if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) {
-                let column_indexes = self.network_globals().sampling_columns.clone();
-                let data_column_requests = self
-                    .make_columns_by_range_requests(request, &column_indexes)?
-                    .into_iter()
-                    .map(|(peer_id, columns_by_range_request)| {
-                        self.send_data_columns_by_range_request(
-                            peer_id,
-                            columns_by_range_request,
-                            id,
-                        )
-                    })
-                    .collect::<Result<Vec<_>, _>>()?;
+        let data_columns = if let Some(data_column_requests) = data_column_requests {
+            let data_column_requests = data_column_requests
+                .into_iter()
+                .map(|(peer_id, columns_by_range_request)| {
+                    self.send_data_columns_by_range_request(peer_id, columns_by_range_request, id)
+                })
+                .collect::<Result<Vec<_>, _>>()?;
 
-                (
-                    Some(column_indexes.into_iter().collect::<Vec<_>>()),
-                    Some(data_column_requests),
-                )
-            } else {
-                (None, None)
-            };
+            Some((
+                data_column_requests,
+                self.network_globals()
+                    .sampling_columns
+                    .iter()
+                    .cloned()
+                    .collect::<Vec<_>>(),
+            ))
+        } else {
+            None
+        };
 
-        let expected_blobs = blobs_req_id.is_some();
-        let info = RangeBlockComponentsRequest::new(
-            expected_blobs,
-            expects_columns,
-            data_column_requests.map(|items| items.len()),
-        );
+        let info = RangeBlockComponentsRequest::new(blocks_req_id, blobs_req_id, data_columns);
         self.components_by_range_requests.insert(id, info);
         Ok(id.id)
     }
@@ -484,28 +494,33 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         if let Err(e) = {
             let request = entry.get_mut();
             match range_block_component {
-                RangeBlockComponent::Block(resp) => resp.map(|(blocks, _)| {
-                    request.add_blocks(blocks);
+                RangeBlockComponent::Block(req_id, resp) => resp.and_then(|(blocks, _)| {
+                    request
+                        .add_blocks(req_id, blocks)
+                        .map_err(RpcResponseError::BlockComponentCouplingError)
                 }),
-                RangeBlockComponent::Blob(resp) => resp.map(|(blobs, _)| {
-                    request.add_blobs(blobs);
-                }),
-                RangeBlockComponent::CustodyColumns(resp) => resp.map(|(custody_columns, _)| {
-                    request.add_custody_columns(custody_columns);
+                RangeBlockComponent::Blob(req_id, resp) => resp.and_then(|(blobs, _)| {
+                    request
+                        .add_blobs(req_id, blobs)
+                        .map_err(RpcResponseError::BlockComponentCouplingError)
                 }),
+                RangeBlockComponent::CustodyColumns(req_id, resp) => {
+                    resp.and_then(|(custody_columns, _)| {
+                        request
+                            .add_custody_columns(req_id, custody_columns)
+                            .map_err(RpcResponseError::BlockComponentCouplingError)
+                    })
+                }
             }
         } {
            entry.remove();
            return Some(Err(e));
        }
 
-        if entry.get_mut().is_finished() {
+        if let Some(blocks_result) = entry.get().responses(&self.chain.spec) {
+            entry.remove();
             // If the request is finished, dequeue everything
-            let request = entry.remove();
-            let blocks = request
-                .into_responses(&self.chain.spec)
-                .map_err(RpcResponseError::BlockComponentCouplingError);
-            Some(blocks)
+            Some(blocks_result.map_err(RpcResponseError::BlockComponentCouplingError))
         } else {
             None
         }
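A minimal, self-contained sketch of the ByRangeRequest state machine that block_sidecar_coupling.rs is built on (not part of the patch). The enum and methods mirror the new code above; the u32 request id, the Vec<u8> payload, and the PartialEq + Display bounds are illustrative assumptions used only to make the sketch compile on its own. It shows the Active -> Complete transition, the request-id check, and the duplicate-completion error that add_blocks / add_blobs / add_custody_columns rely on.

enum ByRangeRequest<I, T> {
    Active(I),
    Complete(T),
}

impl<I: PartialEq + std::fmt::Display, T> ByRangeRequest<I, T> {
    // Complete the request, enforcing that the response belongs to the expected
    // request id and that a request can only be completed once.
    fn finish(&mut self, id: I, data: T) -> Result<(), String> {
        match self {
            Self::Active(expected_id) => {
                if expected_id != &id {
                    return Err(format!("unexpected req_id expected {expected_id} got {id}"));
                }
                *self = Self::Complete(data);
                Ok(())
            }
            Self::Complete(_) => Err("request already complete".to_owned()),
        }
    }

    // Read the response once the request has completed; None while still active.
    fn to_finished(&self) -> Option<&T> {
        match self {
            Self::Active(_) => None,
            Self::Complete(data) => Some(data),
        }
    }
}

fn main() {
    let mut req: ByRangeRequest<u32, Vec<u8>> = ByRangeRequest::Active(7);
    assert!(req.to_finished().is_none()); // still waiting for the response
    assert!(req.finish(8, vec![1]).is_err()); // a response for another request id is rejected
    assert!(req.finish(7, vec![1, 2, 3]).is_ok()); // the matching id completes the request
    assert_eq!(req.to_finished().map(|d| d.len()), Some(3));
    assert!(req.finish(7, vec![]).is_err()); // completing twice is an error
}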