diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index fcfd8eaa36..efddc393bc 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -147,19 +147,20 @@ impl DataAvailabilityChecker { self.availability_cache .read() .get(block_root) - .map_or(vec![], |cache| { - if let Some(block) = cache.executed_block.as_ref() { + .and_then(|cache| { + cache.executed_block.as_ref().map(|block| { block.get_filtered_blob_ids(|i, _| cache.verified_blobs.get(i).is_none()) - } else { - let mut blob_ids = Vec::with_capacity(T::max_blobs_per_block()); - for i in 0..T::max_blobs_per_block() { - blob_ids.push(BlobIdentifier { - block_root: *block_root, - index: i as u64, - }); - } - blob_ids + }) + }) + .unwrap_or_else(|| { + let mut blob_ids = Vec::with_capacity(T::max_blobs_per_block()); + for i in 0..T::max_blobs_per_block() { + blob_ids.push(BlobIdentifier { + block_root: *block_root, + index: i as u64, + }); } + blob_ids }) } else { vec![] diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 0fad8d0ff2..7c868c04f4 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -51,4 +51,5 @@ execution_layer = { path = "../execution_layer" } [features] deterministic_long_lived_attnets = [ "ethereum-types" ] spec-minimal = ["beacon_chain/spec-minimal"] +fork_from_env = ["beacon_chain/fork_from_env"] # default = ["deterministic_long_lived_attnets"] diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 826998128f..3bb4ece4ae 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -521,9 +521,7 @@ impl BlockLookups { self.search_block(block_root, peer_id, PeerShouldHave::BlockAndBlobs, cx); self.parent_lookups.push(parent_lookup) } - Err(e) => { - - } + Err(e) => {} } } Ok(None) => { diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index ba6e694931..ec47faa372 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -242,7 +242,7 @@ impl SingleBlockLookup>>, ) -> Result>, LookupVerifyError> { - match self.block_request_state.state { + match self.blob_request_state.state { State::AwaitingDownload => { self.blob_request_state.register_failure_downloading(); Err(LookupVerifyError::ExtraBlobsReturned) @@ -277,6 +277,7 @@ impl SingleBlockLookup match blob { Some(_) => { + dbg!("here"); // We sent the blob for processing and received an extra blob. self.blob_request_state.register_failure_downloading(); Err(LookupVerifyError::ExtraBlobsReturned) @@ -350,8 +351,9 @@ impl SingleBlockLookup SignedBeaconBlock { + self.rand_block_and_blobs(fork_name).0 + } + + fn rand_block_and_blobs( + &mut self, + fork_name: ForkName, + ) -> (SignedBeaconBlock, Vec>) { let inner = map_fork_name!(fork_name, BeaconBlock, <_>::random_for_test(&mut self.rng)); let mut block = SignedBeaconBlock::from_block(inner, types::Signature::random_for_test(&mut self.rng)); + let mut blob_sidecars = vec![]; if let Ok(message) = block.message_deneb_mut() { // get random number between 0 and Max Blobs let mut payload: &mut FullPayloadDeneb = &mut message.body.execution_payload; @@ -90,14 +99,41 @@ impl TestRig { &self.harness.chain.kzg.as_ref().unwrap(), ) .unwrap(); + //TODO(sean) maybe we want to keep other random txs ? payload.execution_payload.transactions = <_>::default(); for tx in Vec::from(transactions) { payload.execution_payload.transactions.push(tx).unwrap(); } message.body.blob_kzg_commitments = bundle.commitments.clone(); + + let BlobsBundleV1 { + commitments, + proofs, + blobs, + } = bundle; + + let block_root = block.canonical_root(); + + for (index, ((blob, kzg_commitment), kzg_proof)) in blobs + .into_iter() + .zip(commitments.into_iter()) + .zip(proofs.into_iter()) + .enumerate() + { + blob_sidecars.push(BlobSidecar { + block_root, + index: index as u64, + slot: block.slot(), + block_parent_root: block.parent_root(), + proposer_index: block.message().proposer_index(), + blob: blob.clone(), + kzg_commitment: kzg_commitment.clone(), + kzg_proof: kzg_proof.clone(), + }); + } } - block + (block, blob_sidecars) } #[track_caller] @@ -204,14 +240,14 @@ impl TestRig { } macro_rules! common_tests { - ($mod_name: ident, $fork_name: ident, $response_type: ident) => { + ($mod_name: ident, $fork_name: ident) => { #[cfg(test)] mod $mod_name { use super::*; #[test] fn test_single_block_lookup_happy_path() { let fork_name = ForkName::$fork_name; - let response_type = ResponseType::$response_type; + let response_type = ResponseType::Block; let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); let block = rig.rand_block(fork_name); @@ -236,7 +272,7 @@ macro_rules! common_tests { bl.single_block_processed( id, BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), - $response_type, + response_type, &mut cx, ); rig.expect_empty_network(); @@ -245,7 +281,7 @@ macro_rules! common_tests { #[test] fn test_single_block_lookup_empty_response() { - let response_type = ResponseType::$response_type; + let response_type = ResponseType::Block; let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); let block_hash = Hash256::random(); @@ -265,7 +301,7 @@ macro_rules! common_tests { #[test] fn test_single_block_lookup_wrong_response() { let fork_name = ForkName::$fork_name; - let response_type = ResponseType::$response_type; + let response_type = ResponseType::Block; let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); let block_hash = Hash256::random(); @@ -288,7 +324,7 @@ macro_rules! common_tests { #[test] fn test_single_block_lookup_failure() { - let response_type = ResponseType::$response_type; + let response_type = ResponseType::Block; let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); let block_hash = Hash256::random(); @@ -307,7 +343,7 @@ macro_rules! common_tests { #[test] fn test_single_block_lookup_becomes_parent_request() { let fork_name = ForkName::$fork_name; - let response_type = ResponseType::$response_type; + let response_type = ResponseType::Block; let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); let block = Arc::new(rig.rand_block(fork_name)); @@ -336,7 +372,7 @@ macro_rules! common_tests { bl.single_block_processed( id, BlockError::ParentUnknown(block.into()).into(), - $response_type, + response_type, &mut cx, ); assert_eq!(bl.single_block_lookups.len(), 1); @@ -348,7 +384,7 @@ macro_rules! common_tests { #[test] fn test_parent_lookup_happy_path() { let fork_name = ForkName::$fork_name; - let response_type = ResponseType::$response_type; + let response_type = ResponseType::Block; let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); let parent = rig.rand_block(fork_name); @@ -372,7 +408,7 @@ macro_rules! common_tests { bl.parent_block_processed( chain_hash, BlockError::BlockIsAlreadyKnown.into(), - $response_type, + response_type, &mut cx, ); rig.expect_parent_chain_process(); @@ -386,7 +422,7 @@ macro_rules! common_tests { #[test] fn test_parent_lookup_wrong_response() { let fork_name = ForkName::$fork_name; - let response_type = ResponseType::$response_type; + let response_type = ResponseType::Block; let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); let parent = rig.rand_block(fork_name); @@ -419,7 +455,7 @@ macro_rules! common_tests { bl.parent_block_processed( chain_hash, BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), - $response_type, + response_type, &mut cx, ); rig.expect_parent_chain_process(); @@ -433,7 +469,7 @@ macro_rules! common_tests { #[test] fn test_parent_lookup_empty_response() { let fork_name = ForkName::$fork_name; - let response_type = ResponseType::$response_type; + let response_type = ResponseType::Block; let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); let parent = rig.rand_block(fork_name); @@ -461,7 +497,7 @@ macro_rules! common_tests { bl.parent_block_processed( chain_hash, BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), - $response_type, + response_type, &mut cx, ); rig.expect_parent_chain_process(); @@ -475,7 +511,7 @@ macro_rules! common_tests { #[test] fn test_parent_lookup_rpc_failure() { let fork_name = ForkName::$fork_name; - let response_type = ResponseType::$response_type; + let response_type = ResponseType::Block; let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); let parent = rig.rand_block(fork_name); @@ -510,7 +546,7 @@ macro_rules! common_tests { bl.parent_block_processed( chain_hash, BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), - $response_type, + response_type, &mut cx, ); rig.expect_parent_chain_process(); @@ -524,7 +560,7 @@ macro_rules! common_tests { #[test] fn test_parent_lookup_too_many_attempts() { let fork_name = ForkName::$fork_name; - let response_type = ResponseType::$response_type; + let response_type = ResponseType::Block; let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); let parent = rig.rand_block(fork_name); @@ -578,7 +614,7 @@ macro_rules! common_tests { #[test] fn test_parent_lookup_too_many_download_attempts_no_blacklist() { let fork_name = ForkName::$fork_name; - let response_type = ResponseType::$response_type; + let response_type = ResponseType::Block; let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); let parent = rig.rand_block(fork_name); @@ -624,7 +660,7 @@ macro_rules! common_tests { #[test] fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { let fork_name = ForkName::$fork_name; - let response_type = ResponseType::$response_type; + let response_type = ResponseType::Block; const PROCESSING_FAILURES: u8 = parent_lookup::PARENT_FAIL_TOLERANCE / 2 + 1; let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); @@ -662,7 +698,7 @@ macro_rules! common_tests { bl.parent_block_processed( block_root, BlockError::InvalidSignature.into(), - $response_type, + response_type, &mut cx, ); bl.parent_lookup_response(id, peer_id, None, D, &mut cx); @@ -676,7 +712,7 @@ macro_rules! common_tests { #[test] fn test_parent_lookup_too_deep() { let fork_name = ForkName::$fork_name; - let response_type = ResponseType::$response_type; + let response_type = ResponseType::Block; let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); let mut blocks = Vec::>>::with_capacity( parent_lookup::PARENT_DEPTH_TOLERANCE, @@ -716,7 +752,7 @@ macro_rules! common_tests { bl.parent_block_processed( chain_hash, BlockError::ParentUnknown(block.into()).into(), - $response_type, + response_type, &mut cx, ) } @@ -749,7 +785,7 @@ macro_rules! common_tests { #[test] fn test_single_block_lookup_ignored_response() { let fork_name = ForkName::$fork_name; - let response_type = ResponseType::$response_type; + let response_type = ResponseType::Block; let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); let block = rig.rand_block(fork_name); @@ -780,7 +816,7 @@ macro_rules! common_tests { bl.single_block_processed( id, BlockProcessingResult::Ignored, - $response_type, + response_type, &mut cx, ); rig.expect_empty_network(); @@ -790,7 +826,7 @@ macro_rules! common_tests { #[test] fn test_parent_lookup_ignored_response() { let fork_name = ForkName::$fork_name; - let response_type = ResponseType::$response_type; + let response_type = ResponseType::Block; let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); let parent = rig.rand_block(fork_name); @@ -814,7 +850,7 @@ macro_rules! common_tests { bl.parent_block_processed( chain_hash, BlockProcessingResult::Ignored, - $response_type, + response_type, &mut cx, ); rig.expect_empty_network(); @@ -825,7 +861,7 @@ macro_rules! common_tests { #[test] fn test_same_chain_race_condition() { let fork_name = ForkName::$fork_name; - let response_type = ResponseType::$response_type; + let response_type = ResponseType::Block; let (mut bl, mut cx, mut rig) = TestRig::test_setup(true); #[track_caller] @@ -886,14 +922,14 @@ macro_rules! common_tests { bl.parent_block_processed( chain_hash, BlockError::BlockIsAlreadyKnown.into(), - $response_type, + response_type, &mut cx, ) } else { bl.parent_block_processed( chain_hash, BlockError::ParentUnknown(block.into()).into(), - $response_type, + response_type, &mut cx, ) } @@ -926,9 +962,743 @@ macro_rules! common_tests { } }; } -use crate::sync::manager::ResponseType::{Blob, Block}; -common_tests!(base, Base, Block); -common_tests!(capella, Capella, Block); -common_tests!(deneb, Deneb, Block); -// common_tests!(deneb, Deneb, Blob); +common_tests!(base, Base); +common_tests!(capella, Capella); +common_tests!(deneb, Deneb); + +mod deneb_only { + use super::*; + #[test] + fn test_single_block_lookup_happy_path() { + let fork_name = ForkName::Deneb; + + let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + rig.harness + .chain + .slot_clock + .set_slot(E::slots_per_epoch() * rig.harness.spec.deneb_fork_epoch.unwrap().as_u64()); + + let (block, blobs) = rig.rand_block_and_blobs(fork_name); + let slot = block.slot(); + let peer_id = PeerId::random(); + let block_root = block.canonical_root(); + + // Trigger the request + bl.search_block(block_root, peer_id, PeerShouldHave::BlockAndBlobs, &mut cx); + let block_id = rig.expect_block_request(ResponseType::Block); + let blob_id = rig.expect_block_request(ResponseType::Blob); + + // The peer provides the correct block, should not be penalized. Now the block should be sent + // for processing. + bl.single_block_lookup_response(block_id, peer_id, Some(block.into()), D, &mut cx); + rig.expect_empty_network(); + rig.expect_block_process(ResponseType::Block); + + // The request should still be active. + assert_eq!(bl.single_block_lookups.len(), 1); + + // Send the stream termination. Peer should have not been penalized. + bl.single_block_lookup_response(block_id, peer_id, None, D, &mut cx); + // Missing blobs should be the request is not removed, the outstanding blobs request should + // mean we do not send a new request. + bl.single_block_processed( + block_id, + BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( + slot, block_root, + )), + ResponseType::Block, + &mut cx, + ); + rig.expect_empty_network(); + assert_eq!(bl.single_block_lookups.len(), 1); + + for blob in blobs { + bl.single_blob_lookup_response(blob_id, peer_id, Some(Arc::new(blob)), D, &mut cx); + rig.expect_empty_network(); + assert_eq!(bl.single_block_lookups.len(), 1); + } + // Send the blob stream termination. Peer should have not been penalized. + bl.single_blob_lookup_response(block_id, peer_id, None, D, &mut cx); + + // Missing blobs should be the request is not removed, the outstanding blobs request should + // mean we do not send a new request. + bl.single_block_processed( + block_id, + BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), + ResponseType::Block, + &mut cx, + ); + rig.expect_empty_network(); + assert_eq!(bl.single_block_lookups.len(), 0); + } + + // #[test] + // fn test_single_block_lookup_empty_response() { + // let response_type = ResponseType::Block; + // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + // + // let block_hash = Hash256::random(); + // let peer_id = PeerId::random(); + // + // // Trigger the request + // bl.search_block(block_hash, peer_id, PeerShouldHave::BlockAndBlobs, &mut cx); + // let id = rig.expect_block_request(response_type); + // + // // The peer does not have the block. It should be penalized. + // bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); + // rig.expect_penalty(); + // + // rig.expect_block_request(response_type); // it should be retried + // } + // + // #[test] + // fn test_single_block_lookup_wrong_response() { + // let fork_name = ForkName::Deneb; + // let response_type = ResponseType::Block; + // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + // + // let block_hash = Hash256::random(); + // let peer_id = PeerId::random(); + // + // // Trigger the request + // bl.search_block(block_hash, peer_id, PeerShouldHave::BlockAndBlobs, &mut cx); + // let id = rig.expect_block_request(response_type); + // + // // Peer sends something else. It should be penalized. + // let bad_block = rig.rand_block(fork_name); + // bl.single_block_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx); + // rig.expect_penalty(); + // rig.expect_block_request(response_type); // should be retried + // + // // Send the stream termination. This should not produce an additional penalty. + // bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); + // rig.expect_empty_network(); + // } + // + // #[test] + // fn test_single_block_lookup_failure() { + // let response_type = ResponseType::Block; + // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + // + // let block_hash = Hash256::random(); + // let peer_id = PeerId::random(); + // + // // Trigger the request + // bl.search_block(block_hash, peer_id, PeerShouldHave::BlockAndBlobs, &mut cx); + // let id = rig.expect_block_request(response_type); + // + // // The request fails. RPC failures are handled elsewhere so we should not penalize the peer. + // bl.single_block_lookup_failed(id, &peer_id, &mut cx, RPCError::UnsupportedProtocol); + // rig.expect_block_request(response_type); + // rig.expect_empty_network(); + // } + // + // #[test] + // fn test_single_block_lookup_becomes_parent_request() { + // let fork_name = ForkName::Deneb; + // let response_type = ResponseType::Block; + // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + // + // let block = Arc::new(rig.rand_block(fork_name)); + // let peer_id = PeerId::random(); + // + // // Trigger the request + // bl.search_block( + // block.canonical_root(), + // peer_id, + // PeerShouldHave::BlockAndBlobs, + // &mut cx, + // ); + // let id = rig.expect_block_request(response_type); + // + // // The peer provides the correct block, should not be penalized. Now the block should be sent + // // for processing. + // bl.single_block_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx); + // rig.expect_empty_network(); + // rig.expect_block_process(response_type); + // + // // The request should still be active. + // assert_eq!(bl.single_block_lookups.len(), 1); + // + // // Send the stream termination. Peer should have not been penalized, and the request moved to a + // // parent request after processing. + // bl.single_block_processed( + // id, + // BlockError::ParentUnknown(block.into()).into(), + // response_type, + // &mut cx, + // ); + // assert_eq!(bl.single_block_lookups.len(), 1); + // rig.expect_parent_request(response_type); + // rig.expect_empty_network(); + // assert_eq!(bl.parent_lookups.len(), 1); + // } + // + // #[test] + // fn test_parent_lookup_happy_path() { + // let fork_name = ForkName::Deneb; + // let response_type = ResponseType::Block; + // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + // + // let parent = rig.rand_block(fork_name); + // let block = rig.block_with_parent(parent.canonical_root(), fork_name); + // let chain_hash = block.canonical_root(); + // let peer_id = PeerId::random(); + // let block_root = block.canonical_root(); + // let parent_root = block.parent_root(); + // let slot = block.slot(); + // + // // Trigger the request + // bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); + // let id = rig.expect_parent_request(response_type); + // + // // Peer sends the right block, it should be sent for processing. Peer should not be penalized. + // bl.parent_lookup_response(id, peer_id, Some(parent.into()), D, &mut cx); + // rig.expect_block_process(response_type); + // rig.expect_empty_network(); + // + // // Processing succeeds, now the rest of the chain should be sent for processing. + // bl.parent_block_processed( + // chain_hash, + // BlockError::BlockIsAlreadyKnown.into(), + // response_type, + // &mut cx, + // ); + // rig.expect_parent_chain_process(); + // let process_result = BatchProcessResult::Success { + // was_non_empty: true, + // }; + // bl.parent_chain_processed(chain_hash, process_result, &mut cx); + // assert_eq!(bl.parent_lookups.len(), 0); + // } + // + // #[test] + // fn test_parent_lookup_wrong_response() { + // let fork_name = ForkName::Deneb; + // let response_type = ResponseType::Block; + // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + // + // let parent = rig.rand_block(fork_name); + // let block = rig.block_with_parent(parent.canonical_root(), fork_name); + // let chain_hash = block.canonical_root(); + // let peer_id = PeerId::random(); + // let block_root = block.canonical_root(); + // let parent_root = block.parent_root(); + // let slot = block.slot(); + // + // // Trigger the request + // bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); + // let id1 = rig.expect_parent_request(response_type); + // + // // Peer sends the wrong block, peer should be penalized and the block re-requested. + // let bad_block = rig.rand_block(fork_name); + // bl.parent_lookup_response(id1, peer_id, Some(bad_block.into()), D, &mut cx); + // rig.expect_penalty(); + // let id2 = rig.expect_parent_request(response_type); + // + // // Send the stream termination for the first request. This should not produce extra penalties. + // bl.parent_lookup_response(id1, peer_id, None, D, &mut cx); + // rig.expect_empty_network(); + // + // // Send the right block this time. + // bl.parent_lookup_response(id2, peer_id, Some(parent.into()), D, &mut cx); + // rig.expect_block_process(response_type); + // + // // Processing succeeds, now the rest of the chain should be sent for processing. + // bl.parent_block_processed( + // chain_hash, + // BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), + // response_type, + // &mut cx, + // ); + // rig.expect_parent_chain_process(); + // let process_result = BatchProcessResult::Success { + // was_non_empty: true, + // }; + // bl.parent_chain_processed(chain_hash, process_result, &mut cx); + // assert_eq!(bl.parent_lookups.len(), 0); + // } + // + // #[test] + // fn test_parent_lookup_empty_response() { + // let fork_name = ForkName::Deneb; + // let response_type = ResponseType::Block; + // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + // + // let parent = rig.rand_block(fork_name); + // let block = rig.block_with_parent(parent.canonical_root(), fork_name); + // let chain_hash = block.canonical_root(); + // let peer_id = PeerId::random(); + // let block_root = block.canonical_root(); + // let parent_root = block.parent_root(); + // let slot = block.slot(); + // + // // Trigger the request + // bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); + // let id1 = rig.expect_parent_request(response_type); + // + // // Peer sends an empty response, peer should be penalized and the block re-requested. + // bl.parent_lookup_response(id1, peer_id, None, D, &mut cx); + // rig.expect_penalty(); + // let id2 = rig.expect_parent_request(response_type); + // + // // Send the right block this time. + // bl.parent_lookup_response(id2, peer_id, Some(parent.into()), D, &mut cx); + // rig.expect_block_process(response_type); + // + // // Processing succeeds, now the rest of the chain should be sent for processing. + // bl.parent_block_processed( + // chain_hash, + // BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), + // response_type, + // &mut cx, + // ); + // rig.expect_parent_chain_process(); + // let process_result = BatchProcessResult::Success { + // was_non_empty: true, + // }; + // bl.parent_chain_processed(chain_hash, process_result, &mut cx); + // assert_eq!(bl.parent_lookups.len(), 0); + // } + // + // #[test] + // fn test_parent_lookup_rpc_failure() { + // let fork_name = ForkName::Deneb; + // let response_type = ResponseType::Block; + // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + // + // let parent = rig.rand_block(fork_name); + // let block = rig.block_with_parent(parent.canonical_root(), fork_name); + // let chain_hash = block.canonical_root(); + // let peer_id = PeerId::random(); + // let block_root = block.canonical_root(); + // let parent_root = block.parent_root(); + // let slot = block.slot(); + // + // // Trigger the request + // bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); + // let id1 = rig.expect_parent_request(response_type); + // + // // The request fails. It should be tried again. + // bl.parent_lookup_failed( + // id1, + // peer_id, + // &mut cx, + // RPCError::ErrorResponse( + // RPCResponseErrorCode::ResourceUnavailable, + // "older than deneb".into(), + // ), + // ); + // let id2 = rig.expect_parent_request(response_type); + // + // // Send the right block this time. + // bl.parent_lookup_response(id2, peer_id, Some(parent.into()), D, &mut cx); + // rig.expect_block_process(response_type); + // + // // Processing succeeds, now the rest of the chain should be sent for processing. + // bl.parent_block_processed( + // chain_hash, + // BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), + // response_type, + // &mut cx, + // ); + // rig.expect_parent_chain_process(); + // let process_result = BatchProcessResult::Success { + // was_non_empty: true, + // }; + // bl.parent_chain_processed(chain_hash, process_result, &mut cx); + // assert_eq!(bl.parent_lookups.len(), 0); + // } + // + // #[test] + // fn test_parent_lookup_too_many_attempts() { + // let fork_name = ForkName::Deneb; + // let response_type = ResponseType::Block; + // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + // + // let parent = rig.rand_block(fork_name); + // let block = rig.block_with_parent(parent.canonical_root(), fork_name); + // let peer_id = PeerId::random(); + // let block_root = block.canonical_root(); + // let parent_root = block.parent_root(); + // let slot = block.slot(); + // + // // Trigger the request + // bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); + // for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE { + // let id = rig.expect_parent_request(response_type); + // match i % 2 { + // // make sure every error is accounted for + // 0 => { + // // The request fails. It should be tried again. + // bl.parent_lookup_failed( + // id, + // peer_id, + // &mut cx, + // RPCError::ErrorResponse( + // RPCResponseErrorCode::ResourceUnavailable, + // "older than deneb".into(), + // ), + // ); + // } + // _ => { + // // Send a bad block this time. It should be tried again. + // let bad_block = rig.rand_block(fork_name); + // bl.parent_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx); + // // Send the stream termination + // bl.parent_lookup_response(id, peer_id, None, D, &mut cx); + // rig.expect_penalty(); + // } + // } + // if i < parent_lookup::PARENT_FAIL_TOLERANCE { + // assert_eq!(bl.parent_lookups[0].failed_block_attempts(), dbg!(i)); + // } + // } + // + // assert_eq!(bl.parent_lookups.len(), 0); + // } + // + // #[test] + // fn test_parent_lookup_too_many_download_attempts_no_blacklist() { + // let fork_name = ForkName::Deneb; + // let response_type = ResponseType::Block; + // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + // + // let parent = rig.rand_block(fork_name); + // let block = rig.block_with_parent(parent.canonical_root(), fork_name); + // let block_hash = block.canonical_root(); + // let peer_id = PeerId::random(); + // let block_root = block.canonical_root(); + // let parent_root = block.parent_root(); + // let slot = block.slot(); + // + // // Trigger the request + // bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); + // for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE { + // assert!(!bl.failed_chains.contains(&block_hash)); + // let id = rig.expect_parent_request(response_type); + // if i % 2 != 0 { + // // The request fails. It should be tried again. + // bl.parent_lookup_failed( + // id, + // peer_id, + // &mut cx, + // RPCError::ErrorResponse( + // RPCResponseErrorCode::ResourceUnavailable, + // "older than deneb".into(), + // ), + // ); + // } else { + // // Send a bad block this time. It should be tried again. + // let bad_block = rig.rand_block(fork_name); + // bl.parent_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx); + // rig.expect_penalty(); + // } + // if i < parent_lookup::PARENT_FAIL_TOLERANCE { + // assert_eq!(bl.parent_lookups[0].failed_block_attempts(), dbg!(i)); + // } + // } + // + // assert_eq!(bl.parent_lookups.len(), 0); + // assert!(!bl.failed_chains.contains(&block_hash)); + // assert!(!bl.failed_chains.contains(&parent.canonical_root())); + // } + // + // #[test] + // fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { + // let fork_name = ForkName::Deneb; + // let response_type = ResponseType::Block; + // const PROCESSING_FAILURES: u8 = parent_lookup::PARENT_FAIL_TOLERANCE / 2 + 1; + // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + // + // let parent = Arc::new(rig.rand_block(fork_name)); + // let block = rig.block_with_parent(parent.canonical_root(), fork_name); + // let peer_id = PeerId::random(); + // let block_root = block.canonical_root(); + // let parent_root = block.parent_root(); + // let slot = block.slot(); + // + // // Trigger the request + // bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); + // + // // Fail downloading the block + // for _ in 0..(parent_lookup::PARENT_FAIL_TOLERANCE - PROCESSING_FAILURES) { + // let id = rig.expect_parent_request(response_type); + // // The request fails. It should be tried again. + // bl.parent_lookup_failed( + // id, + // peer_id, + // &mut cx, + // RPCError::ErrorResponse( + // RPCResponseErrorCode::ResourceUnavailable, + // "older than deneb".into(), + // ), + // ); + // } + // + // // Now fail processing a block in the parent request + // for _ in 0..PROCESSING_FAILURES { + // let id = dbg!(rig.expect_parent_request(response_type)); + // assert!(!bl.failed_chains.contains(&block_root)); + // // send the right parent but fail processing + // bl.parent_lookup_response(id, peer_id, Some(parent.clone()), D, &mut cx); + // bl.parent_block_processed( + // block_root, + // BlockError::InvalidSignature.into(), + // response_type, + // &mut cx, + // ); + // bl.parent_lookup_response(id, peer_id, None, D, &mut cx); + // rig.expect_penalty(); + // } + // + // assert!(bl.failed_chains.contains(&block_root)); + // assert_eq!(bl.parent_lookups.len(), 0); + // } + // + // #[test] + // fn test_parent_lookup_too_deep() { + // let fork_name = ForkName::Deneb; + // let response_type = ResponseType::Block; + // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + // let mut blocks = + // Vec::>>::with_capacity(parent_lookup::PARENT_DEPTH_TOLERANCE); + // while blocks.len() < parent_lookup::PARENT_DEPTH_TOLERANCE { + // let parent = blocks + // .last() + // .map(|b| b.canonical_root()) + // .unwrap_or_else(Hash256::random); + // let block = Arc::new(rig.block_with_parent(parent, fork_name)); + // blocks.push(block); + // } + // + // let peer_id = PeerId::random(); + // let trigger_block = blocks.pop().unwrap(); + // let chain_hash = trigger_block.canonical_root(); + // let trigger_block_root = trigger_block.canonical_root(); + // let trigger_parent_root = trigger_block.parent_root(); + // let trigger_slot = trigger_block.slot(); + // bl.search_parent( + // trigger_slot, + // trigger_block_root, + // trigger_parent_root, + // peer_id, + // &mut cx, + // ); + // + // for block in blocks.into_iter().rev() { + // let id = rig.expect_parent_request(response_type); + // // the block + // bl.parent_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx); + // // the stream termination + // bl.parent_lookup_response(id, peer_id, None, D, &mut cx); + // // the processing request + // rig.expect_block_process(response_type); + // // the processing result + // bl.parent_block_processed( + // chain_hash, + // BlockError::ParentUnknown(block.into()).into(), + // response_type, + // &mut cx, + // ) + // } + // + // rig.expect_penalty(); + // assert!(bl.failed_chains.contains(&chain_hash)); + // } + // + // #[test] + // fn test_parent_lookup_disconnection() { + // let fork_name = ForkName::Deneb; + // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + // let peer_id = PeerId::random(); + // let trigger_block = rig.rand_block(fork_name); + // let trigger_block_root = trigger_block.canonical_root(); + // let trigger_parent_root = trigger_block.parent_root(); + // let trigger_slot = trigger_block.slot(); + // bl.search_parent( + // trigger_slot, + // trigger_block_root, + // trigger_parent_root, + // peer_id, + // &mut cx, + // ); + // + // bl.peer_disconnected(&peer_id, &mut cx); + // assert!(bl.parent_lookups.is_empty()); + // } + // + // #[test] + // fn test_single_block_lookup_ignored_response() { + // let fork_name = ForkName::Deneb; + // let response_type = ResponseType::Block; + // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + // + // let block = rig.rand_block(fork_name); + // let peer_id = PeerId::random(); + // + // // Trigger the request + // bl.search_block( + // block.canonical_root(), + // peer_id, + // PeerShouldHave::BlockAndBlobs, + // &mut cx, + // ); + // let id = rig.expect_block_request(response_type); + // + // // The peer provides the correct block, should not be penalized. Now the block should be sent + // // for processing. + // bl.single_block_lookup_response(id, peer_id, Some(block.into()), D, &mut cx); + // rig.expect_empty_network(); + // rig.expect_block_process(response_type); + // + // // The request should still be active. + // assert_eq!(bl.single_block_lookups.len(), 1); + // + // // Send the stream termination. Peer should have not been penalized, and the request removed + // // after processing. + // bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); + // // Send an Ignored response, the request should be dropped + // bl.single_block_processed(id, BlockProcessingResult::Ignored, response_type, &mut cx); + // rig.expect_empty_network(); + // assert_eq!(bl.single_block_lookups.len(), 0); + // } + // + // #[test] + // fn test_parent_lookup_ignored_response() { + // let fork_name = ForkName::Deneb; + // let response_type = ResponseType::Block; + // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + // + // let parent = rig.rand_block(fork_name); + // let block = rig.block_with_parent(parent.canonical_root(), fork_name); + // let chain_hash = block.canonical_root(); + // let peer_id = PeerId::random(); + // let block_root = block.canonical_root(); + // let parent_root = block.parent_root(); + // let slot = block.slot(); + // + // // Trigger the request + // bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); + // let id = rig.expect_parent_request(response_type); + // + // // Peer sends the right block, it should be sent for processing. Peer should not be penalized. + // bl.parent_lookup_response(id, peer_id, Some(parent.into()), D, &mut cx); + // rig.expect_block_process(response_type); + // rig.expect_empty_network(); + // + // // Return an Ignored result. The request should be dropped + // bl.parent_block_processed( + // chain_hash, + // BlockProcessingResult::Ignored, + // response_type, + // &mut cx, + // ); + // rig.expect_empty_network(); + // assert_eq!(bl.parent_lookups.len(), 0); + // } + // + // /// This is a regression test. + // #[test] + // fn test_same_chain_race_condition() { + // let fork_name = ForkName::Deneb; + // let response_type = ResponseType::Block; + // let (mut bl, mut cx, mut rig) = TestRig::test_setup(true); + // + // #[track_caller] + // fn parent_lookups_consistency(bl: &BlockLookups) { + // let hashes: Vec<_> = bl + // .parent_lookups + // .iter() + // .map(|req| req.chain_hash()) + // .collect(); + // let expected = hashes.len(); + // assert_eq!( + // expected, + // hashes + // .into_iter() + // .collect::>() + // .len(), + // "duplicated chain hashes in parent queue" + // ) + // } + // // if we use one or two blocks it will match on the hash or the parent hash, so make a longer + // // chain. + // let depth = 4; + // let mut blocks = Vec::>>::with_capacity(depth); + // while blocks.len() < depth { + // let parent = blocks + // .last() + // .map(|b| b.canonical_root()) + // .unwrap_or_else(Hash256::random); + // let block = Arc::new(rig.block_with_parent(parent, fork_name)); + // blocks.push(block); + // } + // + // let peer_id = PeerId::random(); + // let trigger_block = blocks.pop().unwrap(); + // let chain_hash = trigger_block.canonical_root(); + // let trigger_block_root = trigger_block.canonical_root(); + // let trigger_parent_root = trigger_block.parent_root(); + // let trigger_slot = trigger_block.slot(); + // bl.search_parent( + // trigger_slot, + // trigger_block_root, + // trigger_parent_root, + // peer_id, + // &mut cx, + // ); + // + // for (i, block) in blocks.into_iter().rev().enumerate() { + // let id = rig.expect_parent_request(response_type); + // // the block + // bl.parent_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx); + // // the stream termination + // bl.parent_lookup_response(id, peer_id, None, D, &mut cx); + // // the processing request + // rig.expect_block_process(response_type); + // // the processing result + // if i + 2 == depth { + // // one block was removed + // bl.parent_block_processed( + // chain_hash, + // BlockError::BlockIsAlreadyKnown.into(), + // response_type, + // &mut cx, + // ) + // } else { + // bl.parent_block_processed( + // chain_hash, + // BlockError::ParentUnknown(block.into()).into(), + // response_type, + // &mut cx, + // ) + // } + // parent_lookups_consistency(&bl) + // } + // + // // Processing succeeds, now the rest of the chain should be sent for processing. + // rig.expect_parent_chain_process(); + // + // // Try to get this block again while the chain is being processed. We should not request it again. + // let peer_id = PeerId::random(); + // let trigger_block_root = trigger_block.canonical_root(); + // let trigger_parent_root = trigger_block.parent_root(); + // let trigger_slot = trigger_block.slot(); + // bl.search_parent( + // trigger_slot, + // trigger_block_root, + // trigger_parent_root, + // peer_id, + // &mut cx, + // ); + // parent_lookups_consistency(&bl); + // + // let process_result = BatchProcessResult::Success { + // was_non_empty: true, + // }; + // bl.parent_chain_processed(chain_hash, process_result, &mut cx); + // assert_eq!(bl.parent_lookups.len(), 0); + // } +}