From 6aff52c5b488072015f2c9e5d331bed3a0813d13 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Mon, 8 May 2023 11:58:05 -0400 Subject: [PATCH] add some tests and fix a bug --- .../sync/block_lookups/single_block_lookup.rs | 84 ++-- .../network/src/sync/block_lookups/tests.rs | 417 +++++++++--------- 2 files changed, 245 insertions(+), 256 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 56c9bda1d9..13ab2fad27 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -21,6 +21,7 @@ pub struct SingleBlockLookup { pub requested_ids: Vec, pub downloaded_blobs: FixedBlobSidecarList, pub downloaded_block: Option>>, + pub expected_num_blobs: Option, pub block_request_state: SingleLookupRequestState, pub blob_request_state: SingleLookupRequestState, pub da_checker: Arc>, @@ -88,6 +89,7 @@ impl SingleBlockLookup::default(), downloaded_block: None, downloaded_blobs: <_>::default(), + expected_num_blobs: None, block_request_state: SingleLookupRequestState::new(peer_source), blob_request_state: SingleLookupRequestState::new(peer_source), da_checker, @@ -187,32 +189,40 @@ impl SingleBlockLookup match block { - Some(block) => { - // Compute the block root using this specific function so that we can get timing - // metrics. - let block_root = get_block_root(&block); - if block_root != self.requested_block_root { - // return an error and drop the block - // NOTE: we take this is as a download failure to prevent counting the - // attempt as a chain failure, but simply a peer failure. - self.block_request_state.register_failure_downloading(); - Err(LookupVerifyError::RootMismatch) - } else { - // Return the block for processing. - self.block_request_state.state = State::Processing { peer_id }; - Ok(Some((block_root, block))) + State::Downloading { peer_id } => { + match block { + Some(block) => { + // Compute the block root using this specific function so that we can get timing + // metrics. + let block_root = get_block_root(&block); + if block_root != self.requested_block_root { + // return an error and drop the block + // NOTE: we take this is as a download failure to prevent counting the + // attempt as a chain failure, but simply a peer failure. + self.block_request_state.register_failure_downloading(); + Err(LookupVerifyError::RootMismatch) + } else { + self.expected_num_blobs = + block.message().body().blob_kzg_commitments().ok().map( + |commitments| { + std::cmp::min(self.requested_ids.len(), commitments.len()) + }, + ); + // Return the block for processing. + self.block_request_state.state = State::Processing { peer_id }; + Ok(Some((block_root, block))) + } + } + None => { + if peer_id.should_have_block() { + self.block_request_state.register_failure_downloading(); + Err(LookupVerifyError::NoBlockReturned) + } else { + Err(LookupVerifyError::BenignFailure) + } } } - None => { - if peer_id.should_have_block() { - self.block_request_state.register_failure_downloading(); - Err(LookupVerifyError::NoBlockReturned) - } else { - Err(LookupVerifyError::BenignFailure) - } - } - }, + } State::Processing { peer_id: _ } => match block { Some(_) => { // We sent the block for processing and received an extra block. @@ -257,16 +267,15 @@ impl SingleBlockLookup { - let downloaded_all_blobs = self.downloaded_all_blobs(); - if peer_source.should_have_blobs() && !downloaded_all_blobs { + let downloaded_expected_blobs = self.downloaded_expected_blobs(); + if peer_source.should_have_blobs() && !downloaded_expected_blobs { return Err(LookupVerifyError::NotEnoughBlobsReturned); - } else if !downloaded_all_blobs { + } else if !downloaded_expected_blobs { return Err(LookupVerifyError::BenignFailure); } self.blob_request_state.state = State::Processing { peer_id: peer_source, }; - dbg!("sending blobs for processing"); Ok(Some(( self.requested_block_root, self.downloaded_blobs.clone(), @@ -288,24 +297,15 @@ impl SingleBlockLookup bool { - let Some(expected_num_blobs) = self - .downloaded_block - .as_ref() - .and_then(|block| { - block - .message() - .body() - .blob_kzg_commitments().ok() - .map(|commitments| commitments.len()) - }) else { - return true - }; + fn downloaded_expected_blobs(&self) -> bool { + let Some(expected_num_blobs) = self.expected_num_blobs else { + return true; + }; let downloaded_num_blobs = self .downloaded_blobs .iter() - .map(|blob_opt| blob_opt.is_some()) + .filter(|blob_opt| blob_opt.is_some()) .count(); downloaded_num_blobs == expected_num_blobs } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 466556b07c..64b043d600 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -1102,6 +1102,174 @@ mod deneb_only { use super::*; use std::str::FromStr; + struct DenebTester { + bl: BlockLookups, + cx: SyncNetworkContext, + rig: TestRig, + block: Option>>, + blobs: Vec>>, + peer_id: PeerId, + block_req_id: u32, + blob_req_id: u32, + slot: Slot, + block_root: Hash256, + } + + impl DenebTester { + fn new() -> Option { + let fork_name = get_fork_name(); + if !matches!(fork_name, ForkName::Deneb) { + return None; + } + let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + rig.harness.chain.slot_clock.set_slot( + E::slots_per_epoch() * rig.harness.spec.deneb_fork_epoch.unwrap().as_u64(), + ); + let (block, blobs) = rig.rand_block_and_blobs(fork_name, NumBlobs::Random); + let blobs = blobs.into_iter().map(Arc::new).collect::>(); + + let peer_id = PeerId::random(); + let slot = block.slot(); + let block_root = block.canonical_root(); + + // Trigger the request + bl.search_block(block_root, PeerSource::Attestation(peer_id), &mut cx); + let block_req_id = rig.expect_block_request(ResponseType::Block); + let blob_req_id = rig.expect_block_request(ResponseType::Blob); + + Some(Self { + bl, + cx, + rig, + block: Some(Arc::new(block)), + blobs, + peer_id, + block_req_id, + blob_req_id, + slot, + block_root, + }) + } + + fn block_response(mut self) -> Self { + // The peer provides the correct block, should not be penalized. Now the block should be sent + // for processing. + self.bl.single_block_lookup_response( + self.block_req_id, + self.peer_id, + self.block.clone(), + D, + &mut self.cx, + ); + self.rig.expect_empty_network(); + self.rig.expect_block_process(ResponseType::Block); + + // The request should still be active. + assert_eq!(self.bl.single_block_lookups.len(), 1); + self + } + + fn blobs_response(mut self) -> Self { + for blob in &self.blobs { + self.bl.single_blob_lookup_response( + self.blob_req_id, + self.peer_id, + Some(blob.clone()), + D, + &mut self.cx, + ); + self.rig.expect_empty_network(); + assert_eq!(self.bl.single_block_lookups.len(), 1); + } + // Send the blob stream termination. Peer should have not been penalized. + self.bl.single_blob_lookup_response( + self.blob_req_id, + self.peer_id, + None, + D, + &mut self.cx, + ); + self.rig.expect_empty_network(); + self.rig.expect_block_process(ResponseType::Blob); + self + } + + fn empty_block_response(mut self) -> Self { + self.bl.single_block_lookup_response( + self.block_req_id, + self.peer_id, + None, + D, + &mut self.cx, + ); + self + } + + fn empty_blobs_response(mut self) -> Self { + self.bl.single_blob_lookup_response( + self.blob_req_id, + self.peer_id, + None, + D, + &mut self.cx, + ); + self + } + + fn block_imported(mut self) -> Self { + // Missing blobs should be the request is not removed, the outstanding blobs request should + // mean we do not send a new request. + self.bl.single_block_processed( + self.block_req_id, + BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(self.block_root)), + ResponseType::Block, + &mut self.cx, + ); + self.rig.expect_empty_network(); + assert_eq!(self.bl.single_block_lookups.len(), 0); + self + } + + fn missing_components(mut self) -> Self { + self.bl.single_block_processed( + self.block_req_id, + BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( + self.slot, + self.block_root, + )), + ResponseType::Block, + &mut self.cx, + ); + assert_eq!(self.bl.single_block_lookups.len(), 1); + self + } + + fn expect_penalty(mut self) -> Self { + self.rig.expect_penalty(); + self + } + fn expect_no_penalty(mut self) -> Self { + self.rig.expect_empty_network(); + self + } + fn expect_block_request(mut self) -> Self { + self.rig.expect_block_request(ResponseType::Block); + self + } + fn expect_blobs_request(mut self) -> Self { + self.rig.expect_block_request(ResponseType::Blob); + self + } + fn expect_no_blobs_request(mut self) -> Self { + self.rig.expect_empty_network(); + self + } + fn expect_no_block_request(mut self) -> Self { + self.rig.expect_empty_network(); + self + } + } + fn get_fork_name() -> ForkName { ForkName::from_str( &std::env::var(beacon_chain::test_utils::FORK_NAME_ENV_VAR).unwrap_or_else(|e| { @@ -1117,243 +1285,64 @@ mod deneb_only { #[test] fn single_block_and_blob_lookup_block_returned_first() { - let fork_name = get_fork_name(); - if !matches!(fork_name, ForkName::Deneb) { + let Some(tester) = DenebTester::new() else { return; - } - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - rig.harness - .chain - .slot_clock - .set_slot(E::slots_per_epoch() * rig.harness.spec.deneb_fork_epoch.unwrap().as_u64()); + }; - let (block, blobs) = rig.rand_block_and_blobs(fork_name, NumBlobs::Random); - let slot = block.slot(); - let peer_id = PeerId::random(); - let block_root = block.canonical_root(); - - // Trigger the request - bl.search_block(block_root, PeerSource::Attestation(peer_id), &mut cx); - let block_id = rig.expect_block_request(ResponseType::Block); - let blob_id = rig.expect_block_request(ResponseType::Blob); - - // The peer provides the correct block, should not be penalized. Now the block should be sent - // for processing. - bl.single_block_lookup_response(block_id, peer_id, Some(block.into()), D, &mut cx); - rig.expect_empty_network(); - rig.expect_block_process(ResponseType::Block); - - // The request should still be active. - assert_eq!(bl.single_block_lookups.len(), 1); - - // Send the stream termination. Peer should have not been penalized. - bl.single_block_lookup_response(block_id, peer_id, None, D, &mut cx); - // Missing blobs should be the request is not removed, the outstanding blobs request should - // mean we do not send a new request. - bl.single_block_processed( - block_id, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( - slot, block_root, - )), - ResponseType::Block, - &mut cx, - ); - rig.expect_empty_network(); - assert_eq!(bl.single_block_lookups.len(), 1); - - for blob in blobs { - bl.single_blob_lookup_response(blob_id, peer_id, Some(Arc::new(blob)), D, &mut cx); - rig.expect_empty_network(); - assert_eq!(bl.single_block_lookups.len(), 1); - } - // Send the blob stream termination. Peer should have not been penalized. - bl.single_blob_lookup_response(blob_id, peer_id, None, D, &mut cx); - rig.expect_empty_network(); - rig.expect_block_process(ResponseType::Blob); - - // Missing blobs should be the request is not removed, the outstanding blobs request should - // mean we do not send a new request. - bl.single_block_processed( - block_id, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), - ResponseType::Block, - &mut cx, - ); - rig.expect_empty_network(); - assert_eq!(bl.single_block_lookups.len(), 0); + tester.block_response().blobs_response().block_imported(); } #[test] - fn single_block_and_blob_lookup_blob_returned_first() { - let fork_name = get_fork_name(); - if !matches!(fork_name, ForkName::Deneb) { + fn single_block_and_blob_lookup_blobs_returned_first() { + let Some(tester) = DenebTester::new() else { return; - } - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - rig.harness - .chain - .slot_clock - .set_slot(E::slots_per_epoch() * rig.harness.spec.deneb_fork_epoch.unwrap().as_u64()); + }; - let (block, blobs) = rig.rand_block_and_blobs(fork_name, NumBlobs::Random); - let slot = block.slot(); - let peer_id = PeerId::random(); - let block_root = block.canonical_root(); - - // Trigger the request - bl.search_block(block_root, PeerSource::Attestation(peer_id), &mut cx); - let block_id = rig.expect_block_request(ResponseType::Block); - let blob_id = rig.expect_block_request(ResponseType::Blob); - - for blob in blobs { - bl.single_blob_lookup_response(blob_id, peer_id, Some(Arc::new(blob)), D, &mut cx); - rig.expect_empty_network(); - assert_eq!(bl.single_block_lookups.len(), 1); - } - // Send the blob stream termination. Peer should have not been penalized. - bl.single_blob_lookup_response(blob_id, peer_id, None, D, &mut cx); - rig.expect_empty_network(); - rig.expect_block_process(ResponseType::Blob); - - // The request should still be active. - assert_eq!(bl.single_block_lookups.len(), 1); - - // The peer provides the correct block, should not be penalized. Now the block should be sent - // for processing. - bl.single_block_lookup_response(block_id, peer_id, Some(block.into()), D, &mut cx); - rig.expect_empty_network(); - rig.expect_block_process(ResponseType::Block); - - // Send the stream termination. Peer should have not been penalized. - bl.single_block_lookup_response(block_id, peer_id, None, D, &mut cx); - // Missing blobs should be the request is not removed, the outstanding blobs request should - // mean we do not send a new request. - bl.single_block_processed( - block_id, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( - slot, block_root, - )), - ResponseType::Block, - &mut cx, - ); - rig.expect_empty_network(); - assert_eq!(bl.single_block_lookups.len(), 1); - - // Missing blobs should be the request is not removed, the outstanding blobs request should - // mean we do not send a new request. - bl.single_block_processed( - block_id, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), - ResponseType::Block, - &mut cx, - ); - rig.expect_empty_network(); - assert_eq!(bl.single_block_lookups.len(), 0); + tester.blobs_response().block_response().block_imported(); } #[test] fn single_block_and_blob_lookup_empty_response() { - let response_type = ResponseType::Block; - let fork_name = get_fork_name(); - if !matches!(fork_name, ForkName::Deneb) { + let Some(tester) = DenebTester::new() else { return; - } - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + }; - let block_hash = Hash256::random(); - let peer_id = PeerId::random(); - - // Trigger the request - bl.search_block(block_hash, PeerSource::Attestation(peer_id), &mut cx); - let id = rig.expect_block_request(response_type); - let blob_id = rig.expect_block_request(ResponseType::Blob); - - // The peer does not have the block. It should be penalized. - bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); - rig.expect_penalty(); - - rig.expect_block_request(response_type); // it should be retried - rig.expect_empty_network(); // there should be no blob retry - - bl.single_blob_lookup_response(blob_id, peer_id, None, D, &mut cx); - rig.expect_empty_network(); // there should be no penalty or retry, we don't know - // whether we should have blobs + tester + .empty_block_response() + .expect_penalty() + .expect_block_request() + .expect_no_blobs_request() + .empty_blobs_response() + .expect_no_penalty() + .expect_no_block_request() + .expect_no_blobs_request(); } #[test] fn single_blob_lookup_empty_response() { - let response_type = ResponseType::Block; - let fork_name = get_fork_name(); - if !matches!(fork_name, ForkName::Deneb) { + let Some(tester) = DenebTester::new() else { return; - } - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + }; - let block_hash = Hash256::random(); - let peer_id = PeerId::random(); - - // Trigger the request - bl.search_block(block_hash, PeerSource::Attestation(peer_id), &mut cx); - let id = rig.expect_block_request(response_type); - let _ = rig.expect_block_request(ResponseType::Blob); - - // The peer does not have the block. It should be penalized. - bl.single_blob_lookup_response(id, peer_id, None, D, &mut cx); - rig.expect_empty_network(); // there should be no penalty or retry, we don't know - // whether we should have blobs + tester + .empty_blobs_response() + .expect_no_penalty() + .expect_no_block_request() + .expect_no_blobs_request(); } #[test] - fn test_single_block_response_then_empty_blob_response() { - let fork_name = get_fork_name(); - if !matches!(fork_name, ForkName::Deneb) { + fn single_block_response_then_empty_blob_response() { + let Some(tester) = DenebTester::new() else { return; - } - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - rig.harness - .chain - .slot_clock - .set_slot(E::slots_per_epoch() * rig.harness.spec.deneb_fork_epoch.unwrap().as_u64()); + }; - let (block, _) = rig.rand_block_and_blobs(fork_name, NumBlobs::Random); - let slot = block.slot(); - let peer_id = PeerId::random(); - let block_root = block.canonical_root(); - - // Trigger the request - bl.search_block(block_root, PeerSource::Attestation(peer_id), &mut cx); - let block_id = rig.expect_block_request(ResponseType::Block); - let blob_id = rig.expect_block_request(ResponseType::Blob); - - // The peer provides the correct block, should not be penalized. Now the block should be sent - // for processing. - bl.single_block_lookup_response(block_id, peer_id, Some(block.into()), D, &mut cx); - rig.expect_empty_network(); - rig.expect_block_process(ResponseType::Block); - - // The request should still be active. - assert_eq!(bl.single_block_lookups.len(), 1); - - // Send the stream termination. Peer should have not been penalized. - bl.single_block_lookup_response(block_id, peer_id, None, D, &mut cx); - // Missing blobs should be the request is not removed, the outstanding blobs request should - // mean we do not send a new request. - bl.single_block_processed( - block_id, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( - slot, block_root, - )), - ResponseType::Block, - &mut cx, - ); - rig.expect_empty_network(); - assert_eq!(bl.single_block_lookups.len(), 1); - - // The peer does not have the block. It should be penalized. - bl.single_blob_lookup_response(blob_id, peer_id, None, D, &mut cx); - rig.expect_penalty(); - - rig.expect_block_request(ResponseType::Blob); // it should be retried - rig.expect_empty_network(); + tester + .block_response() + .missing_components() + .empty_blobs_response() + .expect_penalty() + .expect_blobs_request() + .expect_no_block_request(); } }