make sure blobs are sent for processing after stream termination, delete copied tests

This commit is contained in:
realbigsean
2023-05-03 12:02:02 -04:00
parent f5facd603e
commit a0f6159cae
4 changed files with 93 additions and 800 deletions

View File

@@ -681,8 +681,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
Ok(None) => {
// Request finished successfully, nothing else to do. It will be removed after the
// processing result arrives.
// Waiting for more blobs to arrive
self.parent_lookups.push(parent_lookup);
}
Err(e) => match e {

View File

@@ -303,15 +303,12 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
blob: Option<Arc<BlobSidecar<T::EthSpec>>>,
failed_chains: &mut lru_cache::LRUTimeCache<Hash256>,
) -> Result<Option<RootBlobsTuple<T::EthSpec>>, ParentVerifyError> {
let parent_root_opt = blob.as_ref().map(|b| b.block_parent_root);
let blobs = self.current_parent_request.verify_blob(blob)?;
// check if the parent of this block isn't in the failed cache. If it is, this chain should
// be dropped and the peer downscored.
if let Some(parent_root) = blobs
.as_ref()
.and_then(|(_, blobs)| blobs.first())
.and_then(|blob| blob.as_ref().map(|b| b.block_parent_root))
{
if let Some(parent_root) = parent_root_opt {
if failed_chains.contains(&parent_root) {
self.current_parent_request
.blob_request_state

View File

@@ -250,10 +250,7 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
self.requested_ids.retain(|id| *id != received_id);
if let Some(blob_opt) = self.downloaded_blobs.get_mut(blob.index as usize) {
*blob_opt = Some(blob);
Ok(Some((
self.requested_block_root,
self.downloaded_blobs.clone(),
)))
Ok(None)
} else {
Err(LookupVerifyError::InvalidIndex(blob.index))
}
@@ -269,6 +266,7 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
self.blob_request_state.state = State::Processing {
peer_id: peer_source,
};
dbg!("sending blobs for processing");
Ok(Some((
self.requested_block_root,
self.downloaded_blobs.clone(),
@@ -291,24 +289,25 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
}
fn downloaded_all_blobs(&self) -> bool {
let expected_num_blobs = self
let Some(expected_num_blobs) = self
.downloaded_block
.as_ref()
.map(|block| {
.and_then(|block| {
block
.message()
.body()
.blob_kzg_commitments()
.map_or(0, |commitments| commitments.len())
})
.unwrap_or(0);
let downloaded_enough_blobs = expected_num_blobs
== self
.downloaded_blobs
.iter()
.map(|blob_opt| blob_opt.is_some())
.count();
downloaded_enough_blobs
.blob_kzg_commitments().ok()
.map(|commitments| commitments.len())
}) else {
return true
};
let downloaded_num_blobs = self
.downloaded_blobs
.iter()
.map(|blob_opt| blob_opt.is_some())
.count();
downloaded_num_blobs == expected_num_blobs
}
pub fn request_block(

View File

@@ -1116,7 +1116,7 @@ mod deneb_only {
}
#[test]
fn test_single_block_and_blob_lookup_happy_path() {
fn single_block_and_blob_lookup_block_returned_first() {
let fork_name = get_fork_name();
if !matches!(fork_name, ForkName::Deneb) {
return;
@@ -1167,7 +1167,9 @@ mod deneb_only {
assert_eq!(bl.single_block_lookups.len(), 1);
}
// Send the blob stream termination. Peer should have not been penalized.
bl.single_blob_lookup_response(block_id, peer_id, None, D, &mut cx);
bl.single_blob_lookup_response(blob_id, peer_id, None, D, &mut cx);
rig.expect_empty_network();
rig.expect_block_process(ResponseType::Blob);
// Missing blobs should be the request is not removed, the outstanding blobs request should
// mean we do not send a new request.
@@ -1182,7 +1184,75 @@ mod deneb_only {
}
#[test]
fn test_single_block_and_blob_lookup_empty_response() {
fn single_block_and_blob_lookup_blob_returned_first() {
let fork_name = get_fork_name();
if !matches!(fork_name, ForkName::Deneb) {
return;
}
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
rig.harness
.chain
.slot_clock
.set_slot(E::slots_per_epoch() * rig.harness.spec.deneb_fork_epoch.unwrap().as_u64());
let (block, blobs) = rig.rand_block_and_blobs(fork_name, NumBlobs::Random);
let slot = block.slot();
let peer_id = PeerId::random();
let block_root = block.canonical_root();
// Trigger the request
bl.search_block(block_root, PeerSource::Attestation(peer_id), &mut cx);
let block_id = rig.expect_block_request(ResponseType::Block);
let blob_id = rig.expect_block_request(ResponseType::Blob);
for blob in blobs {
bl.single_blob_lookup_response(blob_id, peer_id, Some(Arc::new(blob)), D, &mut cx);
rig.expect_empty_network();
assert_eq!(bl.single_block_lookups.len(), 1);
}
// Send the blob stream termination. Peer should have not been penalized.
bl.single_blob_lookup_response(blob_id, peer_id, None, D, &mut cx);
rig.expect_empty_network();
rig.expect_block_process(ResponseType::Blob);
// The request should still be active.
assert_eq!(bl.single_block_lookups.len(), 1);
// The peer provides the correct block, should not be penalized. Now the block should be sent
// for processing.
bl.single_block_lookup_response(block_id, peer_id, Some(block.into()), D, &mut cx);
rig.expect_empty_network();
rig.expect_block_process(ResponseType::Block);
// Send the stream termination. Peer should have not been penalized.
bl.single_block_lookup_response(block_id, peer_id, None, D, &mut cx);
// Missing blobs should be the request is not removed, the outstanding blobs request should
// mean we do not send a new request.
bl.single_block_processed(
block_id,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
slot, block_root,
)),
ResponseType::Block,
&mut cx,
);
rig.expect_empty_network();
assert_eq!(bl.single_block_lookups.len(), 1);
// Missing blobs should be the request is not removed, the outstanding blobs request should
// mean we do not send a new request.
bl.single_block_processed(
block_id,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)),
ResponseType::Block,
&mut cx,
);
rig.expect_empty_network();
assert_eq!(bl.single_block_lookups.len(), 0);
}
#[test]
fn single_block_and_blob_lookup_empty_response() {
let response_type = ResponseType::Block;
let fork_name = get_fork_name();
if !matches!(fork_name, ForkName::Deneb) {
@@ -1211,7 +1281,7 @@ mod deneb_only {
}
#[test]
fn test_single_blob_lookup_empty_response() {
fn single_blob_lookup_empty_response() {
let response_type = ResponseType::Block;
let fork_name = get_fork_name();
if !matches!(fork_name, ForkName::Deneb) {
@@ -1286,776 +1356,4 @@ mod deneb_only {
rig.expect_block_request(ResponseType::Blob); // it should be retried
rig.expect_empty_network();
}
#[test]
fn test_single_block_lookup_wrong_response() {
let response_type = ResponseType::Block;
let fork_name = get_fork_name();
if !matches!(fork_name, ForkName::Deneb) {
return;
}
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
let fork_name = rig
.harness
.spec
.fork_name_at_slot::<E>(rig.harness.chain.slot().unwrap());
let block_hash = Hash256::random();
let peer_id = PeerId::random();
// Trigger the request
bl.search_block(block_hash, PeerSource::Attestation(peer_id), &mut cx);
let id = rig.expect_block_request(response_type);
let blob_id = rig.expect_block_request(ResponseType::Blob);
// Peer sends something else. It should be penalized.
let bad_block = rig.rand_block(fork_name);
bl.single_block_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx);
rig.expect_penalty();
rig.expect_block_request(response_type); // should be retried
// Send the stream termination. This should not produce an additional penalty.
bl.single_block_lookup_response(id, peer_id, None, D, &mut cx);
rig.expect_empty_network();
}
#[test]
fn test_single_block_lookup_failure() {
let response_type = ResponseType::Block;
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
let fork_name = rig
.harness
.spec
.fork_name_at_slot::<E>(rig.harness.chain.slot().unwrap());
let block_hash = Hash256::random();
let peer_id = PeerId::random();
// Trigger the request
bl.search_block(block_hash, PeerSource::Attestation(peer_id), &mut cx);
let id = rig.expect_block_request(response_type);
// If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test.
if matches!(fork_name, ForkName::Deneb) {
let _ = rig.expect_block_request(ResponseType::Blob);
}
// The request fails. RPC failures are handled elsewhere so we should not penalize the peer.
bl.single_block_lookup_failed(id, &peer_id, &mut cx, RPCError::UnsupportedProtocol);
rig.expect_block_request(response_type);
rig.expect_empty_network();
}
#[test]
fn test_single_block_lookup_becomes_parent_request() {
let response_type = ResponseType::Block;
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
let fork_name = rig
.harness
.spec
.fork_name_at_slot::<E>(rig.harness.chain.slot().unwrap());
let block = Arc::new(rig.rand_block(fork_name));
let peer_id = PeerId::random();
// Trigger the request
bl.search_block(
block.canonical_root(),
PeerSource::Attestation(peer_id),
&mut cx,
);
let id = rig.expect_block_request(response_type);
// If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test.
if matches!(fork_name, ForkName::Deneb) {
let _ = rig.expect_block_request(ResponseType::Blob);
}
// The peer provides the correct block, should not be penalized. Now the block should be sent
// for processing.
bl.single_block_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx);
rig.expect_empty_network();
rig.expect_block_process(response_type);
// The request should still be active.
assert_eq!(bl.single_block_lookups.len(), 1);
// Send the stream termination. Peer should have not been penalized, and the request moved to a
// parent request after processing.
bl.single_block_processed(
id,
BlockError::ParentUnknown(block.into()).into(),
response_type,
&mut cx,
);
assert_eq!(bl.single_block_lookups.len(), 1);
rig.expect_parent_request(response_type);
// If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test.
if matches!(fork_name, ForkName::Deneb) {
let _ = rig.expect_parent_request(ResponseType::Blob);
}
rig.expect_empty_network();
assert_eq!(bl.parent_lookups.len(), 1);
}
#[test]
fn test_parent_lookup_happy_path() {
let response_type = ResponseType::Block;
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
let fork_name = rig
.harness
.spec
.fork_name_at_slot::<E>(rig.harness.chain.slot().unwrap());
let parent = rig.rand_block(fork_name);
let block = rig.block_with_parent(parent.canonical_root(), fork_name);
let chain_hash = block.canonical_root();
let peer_id = PeerId::random();
let block_root = block.canonical_root();
let parent_root = block.parent_root();
let slot = block.slot();
// Trigger the request
bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx);
let id = rig.expect_parent_request(response_type);
// If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test.
if matches!(fork_name, ForkName::Deneb) {
let _ = rig.expect_parent_request(ResponseType::Blob);
}
// Peer sends the right block, it should be sent for processing. Peer should not be penalized.
bl.parent_lookup_response(id, peer_id, Some(parent.into()), D, &mut cx);
rig.expect_block_process(response_type);
rig.expect_empty_network();
// Processing succeeds, now the rest of the chain should be sent for processing.
bl.parent_block_processed(
chain_hash,
BlockError::BlockIsAlreadyKnown.into(),
response_type,
&mut cx,
);
rig.expect_parent_chain_process();
let process_result = BatchProcessResult::Success {
was_non_empty: true,
};
bl.parent_chain_processed(chain_hash, process_result, &mut cx);
assert_eq!(bl.parent_lookups.len(), 0);
}
#[test]
fn test_parent_lookup_wrong_response() {
let response_type = ResponseType::Block;
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
let fork_name = rig
.harness
.spec
.fork_name_at_slot::<E>(rig.harness.chain.slot().unwrap());
let parent = rig.rand_block(fork_name);
let block = rig.block_with_parent(parent.canonical_root(), fork_name);
let chain_hash = block.canonical_root();
let peer_id = PeerId::random();
let block_root = block.canonical_root();
let parent_root = block.parent_root();
let slot = block.slot();
// Trigger the request
bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx);
let id1 = rig.expect_parent_request(response_type);
// If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test.
if matches!(fork_name, ForkName::Deneb) {
let _ = rig.expect_parent_request(ResponseType::Blob);
}
// Peer sends the wrong block, peer should be penalized and the block re-requested.
let bad_block = rig.rand_block(fork_name);
bl.parent_lookup_response(id1, peer_id, Some(bad_block.into()), D, &mut cx);
rig.expect_penalty();
let id2 = rig.expect_parent_request(response_type);
// Send the stream termination for the first request. This should not produce extra penalties.
bl.parent_lookup_response(id1, peer_id, None, D, &mut cx);
rig.expect_empty_network();
// Send the right block this time.
bl.parent_lookup_response(id2, peer_id, Some(parent.into()), D, &mut cx);
rig.expect_block_process(response_type);
// Processing succeeds, now the rest of the chain should be sent for processing.
bl.parent_block_processed(
chain_hash,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)),
response_type,
&mut cx,
);
rig.expect_parent_chain_process();
let process_result = BatchProcessResult::Success {
was_non_empty: true,
};
bl.parent_chain_processed(chain_hash, process_result, &mut cx);
assert_eq!(bl.parent_lookups.len(), 0);
}
#[test]
fn test_parent_lookup_empty_response() {
let response_type = ResponseType::Block;
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
let fork_name = rig
.harness
.spec
.fork_name_at_slot::<E>(rig.harness.chain.slot().unwrap());
let parent = rig.rand_block(fork_name);
let block = rig.block_with_parent(parent.canonical_root(), fork_name);
let chain_hash = block.canonical_root();
let peer_id = PeerId::random();
let block_root = block.canonical_root();
let parent_root = block.parent_root();
let slot = block.slot();
// Trigger the request
bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx);
let id1 = rig.expect_parent_request(response_type);
// If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test.
if matches!(fork_name, ForkName::Deneb) {
let _ = rig.expect_parent_request(ResponseType::Blob);
}
// Peer sends an empty response, peer should be penalized and the block re-requested.
bl.parent_lookup_response(id1, peer_id, None, D, &mut cx);
rig.expect_penalty();
let id2 = rig.expect_parent_request(response_type);
// Send the right block this time.
bl.parent_lookup_response(id2, peer_id, Some(parent.into()), D, &mut cx);
rig.expect_block_process(response_type);
// Processing succeeds, now the rest of the chain should be sent for processing.
bl.parent_block_processed(
chain_hash,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)),
response_type,
&mut cx,
);
rig.expect_parent_chain_process();
let process_result = BatchProcessResult::Success {
was_non_empty: true,
};
bl.parent_chain_processed(chain_hash, process_result, &mut cx);
assert_eq!(bl.parent_lookups.len(), 0);
}
#[test]
fn test_parent_lookup_rpc_failure() {
let response_type = ResponseType::Block;
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
let fork_name = rig
.harness
.spec
.fork_name_at_slot::<E>(rig.harness.chain.slot().unwrap());
let parent = rig.rand_block(fork_name);
let block = rig.block_with_parent(parent.canonical_root(), fork_name);
let chain_hash = block.canonical_root();
let peer_id = PeerId::random();
let block_root = block.canonical_root();
let parent_root = block.parent_root();
let slot = block.slot();
// Trigger the request
bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx);
let id1 = rig.expect_parent_request(response_type);
// If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test.
if matches!(fork_name, ForkName::Deneb) {
let _ = rig.expect_parent_request(ResponseType::Blob);
}
// The request fails. It should be tried again.
bl.parent_lookup_failed(
id1,
peer_id,
&mut cx,
RPCError::ErrorResponse(
RPCResponseErrorCode::ResourceUnavailable,
"older than deneb".into(),
),
);
let id2 = rig.expect_parent_request(response_type);
// Send the right block this time.
bl.parent_lookup_response(id2, peer_id, Some(parent.into()), D, &mut cx);
rig.expect_block_process(response_type);
// Processing succeeds, now the rest of the chain should be sent for processing.
bl.parent_block_processed(
chain_hash,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)),
response_type,
&mut cx,
);
rig.expect_parent_chain_process();
let process_result = BatchProcessResult::Success {
was_non_empty: true,
};
bl.parent_chain_processed(chain_hash, process_result, &mut cx);
assert_eq!(bl.parent_lookups.len(), 0);
}
#[test]
fn test_parent_lookup_too_many_attempts() {
let response_type = ResponseType::Block;
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
let fork_name = rig
.harness
.spec
.fork_name_at_slot::<E>(rig.harness.chain.slot().unwrap());
let parent = rig.rand_block(fork_name);
let block = rig.block_with_parent(parent.canonical_root(), fork_name);
let peer_id = PeerId::random();
let block_root = block.canonical_root();
let parent_root = block.parent_root();
let slot = block.slot();
// Trigger the request
bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx);
for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE {
let id = rig.expect_parent_request(response_type);
// If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test.
if matches!(fork_name, ForkName::Deneb) && i == 1 {
let _ = rig.expect_parent_request(ResponseType::Blob);
}
match i % 2 {
// make sure every error is accounted for
0 => {
// The request fails. It should be tried again.
bl.parent_lookup_failed(
id,
peer_id,
&mut cx,
RPCError::ErrorResponse(
RPCResponseErrorCode::ResourceUnavailable,
"older than deneb".into(),
),
);
}
_ => {
// Send a bad block this time. It should be tried again.
let bad_block = rig.rand_block(fork_name);
bl.parent_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx);
// Send the stream termination
bl.parent_lookup_response(id, peer_id, None, D, &mut cx);
rig.expect_penalty();
}
}
if i < parent_lookup::PARENT_FAIL_TOLERANCE {
assert_eq!(bl.parent_lookups[0].failed_block_attempts(), dbg!(i));
}
}
assert_eq!(bl.parent_lookups.len(), 0);
}
#[test]
fn test_parent_lookup_too_many_download_attempts_no_blacklist() {
let response_type = ResponseType::Block;
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
let fork_name = rig
.harness
.spec
.fork_name_at_slot::<E>(rig.harness.chain.slot().unwrap());
let parent = rig.rand_block(fork_name);
let block = rig.block_with_parent(parent.canonical_root(), fork_name);
let block_hash = block.canonical_root();
let peer_id = PeerId::random();
let block_root = block.canonical_root();
let parent_root = block.parent_root();
let slot = block.slot();
// Trigger the request
bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx);
for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE {
assert!(!bl.failed_chains.contains(&block_hash));
let id = rig.expect_parent_request(response_type);
// If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test.
if matches!(fork_name, ForkName::Deneb) && i == 1 {
let _ = rig.expect_parent_request(ResponseType::Blob);
}
if i % 2 != 0 {
// The request fails. It should be tried again.
bl.parent_lookup_failed(
id,
peer_id,
&mut cx,
RPCError::ErrorResponse(
RPCResponseErrorCode::ResourceUnavailable,
"older than deneb".into(),
),
);
} else {
// Send a bad block this time. It should be tried again.
let bad_block = rig.rand_block(fork_name);
bl.parent_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx);
rig.expect_penalty();
}
if i < parent_lookup::PARENT_FAIL_TOLERANCE {
assert_eq!(bl.parent_lookups[0].failed_block_attempts(), dbg!(i));
}
}
assert_eq!(bl.parent_lookups.len(), 0);
assert!(!bl.failed_chains.contains(&block_hash));
assert!(!bl.failed_chains.contains(&parent.canonical_root()));
}
#[test]
fn test_parent_lookup_too_many_processing_attempts_must_blacklist() {
let response_type = ResponseType::Block;
const PROCESSING_FAILURES: u8 = parent_lookup::PARENT_FAIL_TOLERANCE / 2 + 1;
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
let fork_name = rig
.harness
.spec
.fork_name_at_slot::<E>(rig.harness.chain.slot().unwrap());
let parent = Arc::new(rig.rand_block(fork_name));
let block = rig.block_with_parent(parent.canonical_root(), fork_name);
let peer_id = PeerId::random();
let block_root = block.canonical_root();
let parent_root = block.parent_root();
let slot = block.slot();
// Trigger the request
bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx);
// Fail downloading the block
for i in 0..(parent_lookup::PARENT_FAIL_TOLERANCE - PROCESSING_FAILURES) {
let id = rig.expect_parent_request(response_type);
// If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test.
if matches!(fork_name, ForkName::Deneb) && i == 0 {
let _ = rig.expect_parent_request(ResponseType::Blob);
}
// The request fails. It should be tried again.
bl.parent_lookup_failed(
id,
peer_id,
&mut cx,
RPCError::ErrorResponse(
RPCResponseErrorCode::ResourceUnavailable,
"older than deneb".into(),
),
);
}
// Now fail processing a block in the parent request
for _ in 0..PROCESSING_FAILURES {
let id = dbg!(rig.expect_parent_request(response_type));
// If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test.
assert!(!bl.failed_chains.contains(&block_root));
// send the right parent but fail processing
bl.parent_lookup_response(id, peer_id, Some(parent.clone()), D, &mut cx);
bl.parent_block_processed(
block_root,
BlockError::InvalidSignature.into(),
response_type,
&mut cx,
);
bl.parent_lookup_response(id, peer_id, None, D, &mut cx);
rig.expect_penalty();
}
assert!(bl.failed_chains.contains(&block_root));
assert_eq!(bl.parent_lookups.len(), 0);
}
#[test]
fn test_parent_lookup_too_deep() {
let response_type = ResponseType::Block;
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
let fork_name = rig
.harness
.spec
.fork_name_at_slot::<E>(rig.harness.chain.slot().unwrap());
let mut blocks =
Vec::<Arc<SignedBeaconBlock<E>>>::with_capacity(parent_lookup::PARENT_DEPTH_TOLERANCE);
while blocks.len() < parent_lookup::PARENT_DEPTH_TOLERANCE {
let parent = blocks
.last()
.map(|b| b.canonical_root())
.unwrap_or_else(Hash256::random);
let block = Arc::new(rig.block_with_parent(parent, fork_name));
blocks.push(block);
}
let peer_id = PeerId::random();
let trigger_block = blocks.pop().unwrap();
let chain_hash = trigger_block.canonical_root();
let trigger_block_root = trigger_block.canonical_root();
let trigger_parent_root = trigger_block.parent_root();
let trigger_slot = trigger_block.slot();
bl.search_parent(
trigger_slot,
trigger_block_root,
trigger_parent_root,
peer_id,
&mut cx,
);
for block in blocks.into_iter().rev() {
let id = rig.expect_parent_request(response_type);
// If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test.
if matches!(fork_name, ForkName::Deneb) {
let _ = rig.expect_parent_request(ResponseType::Blob);
}
// the block
bl.parent_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx);
// the stream termination
bl.parent_lookup_response(id, peer_id, None, D, &mut cx);
// the processing request
rig.expect_block_process(response_type);
// the processing result
bl.parent_block_processed(
chain_hash,
BlockError::ParentUnknown(block.into()).into(),
response_type,
&mut cx,
)
}
rig.expect_penalty();
assert!(bl.failed_chains.contains(&chain_hash));
}
#[test]
fn test_parent_lookup_disconnection() {
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
let peer_id = PeerId::random();
let fork_name = rig
.harness
.spec
.fork_name_at_slot::<E>(rig.harness.chain.slot().unwrap());
let trigger_block = rig.rand_block(fork_name);
let trigger_block_root = trigger_block.canonical_root();
let trigger_parent_root = trigger_block.parent_root();
let trigger_slot = trigger_block.slot();
bl.search_parent(
trigger_slot,
trigger_block_root,
trigger_parent_root,
peer_id,
&mut cx,
);
bl.peer_disconnected(&peer_id, &mut cx);
assert!(bl.parent_lookups.is_empty());
}
#[test]
fn test_single_block_lookup_ignored_response() {
let response_type = ResponseType::Block;
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
let fork_name = rig
.harness
.spec
.fork_name_at_slot::<E>(rig.harness.chain.slot().unwrap());
let block = rig.rand_block(fork_name);
let peer_id = PeerId::random();
// Trigger the request
bl.search_block(
block.canonical_root(),
PeerSource::Attestation(peer_id),
&mut cx,
);
let id = rig.expect_block_request(response_type);
// If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test.
if matches!(fork_name, ForkName::Deneb) {
let _ = rig.expect_block_request(ResponseType::Blob);
}
// The peer provides the correct block, should not be penalized. Now the block should be sent
// for processing.
bl.single_block_lookup_response(id, peer_id, Some(block.into()), D, &mut cx);
rig.expect_empty_network();
rig.expect_block_process(response_type);
// The request should still be active.
assert_eq!(bl.single_block_lookups.len(), 1);
// Send the stream termination. Peer should have not been penalized, and the request removed
// after processing.
bl.single_block_lookup_response(id, peer_id, None, D, &mut cx);
// Send an Ignored response, the request should be dropped
bl.single_block_processed(id, BlockProcessingResult::Ignored, response_type, &mut cx);
rig.expect_empty_network();
assert_eq!(bl.single_block_lookups.len(), 0);
}
#[test]
fn test_parent_lookup_ignored_response() {
let response_type = ResponseType::Block;
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
let fork_name = rig
.harness
.spec
.fork_name_at_slot::<E>(rig.harness.chain.slot().unwrap());
let parent = rig.rand_block(fork_name);
let block = rig.block_with_parent(parent.canonical_root(), fork_name);
let chain_hash = block.canonical_root();
let peer_id = PeerId::random();
let block_root = block.canonical_root();
let parent_root = block.parent_root();
let slot = block.slot();
// Trigger the request
bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx);
let id = rig.expect_parent_request(response_type);
// If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test.
if matches!(fork_name, ForkName::Deneb) {
let _ = rig.expect_parent_request(ResponseType::Blob);
}
// Peer sends the right block, it should be sent for processing. Peer should not be penalized.
bl.parent_lookup_response(id, peer_id, Some(parent.into()), D, &mut cx);
rig.expect_block_process(response_type);
rig.expect_empty_network();
// Return an Ignored result. The request should be dropped
bl.parent_block_processed(
chain_hash,
BlockProcessingResult::Ignored,
response_type,
&mut cx,
);
rig.expect_empty_network();
assert_eq!(bl.parent_lookups.len(), 0);
}
/// This is a regression test.
#[test]
fn test_same_chain_race_condition() {
let response_type = ResponseType::Block;
let (mut bl, mut cx, mut rig) = TestRig::test_setup(true);
let fork_name = rig
.harness
.spec
.fork_name_at_slot::<E>(rig.harness.chain.slot().unwrap());
#[track_caller]
fn parent_lookups_consistency(bl: &BlockLookups<T>) {
let hashes: Vec<_> = bl
.parent_lookups
.iter()
.map(|req| req.chain_hash())
.collect();
let expected = hashes.len();
assert_eq!(
expected,
hashes
.into_iter()
.collect::<std::collections::HashSet<_>>()
.len(),
"duplicated chain hashes in parent queue"
)
}
// if we use one or two blocks it will match on the hash or the parent hash, so make a longer
// chain.
let depth = 4;
let mut blocks = Vec::<Arc<SignedBeaconBlock<E>>>::with_capacity(depth);
while blocks.len() < depth {
let parent = blocks
.last()
.map(|b| b.canonical_root())
.unwrap_or_else(Hash256::random);
let block = Arc::new(rig.block_with_parent(parent, fork_name));
blocks.push(block);
}
let peer_id = PeerId::random();
let trigger_block = blocks.pop().unwrap();
let chain_hash = trigger_block.canonical_root();
let trigger_block_root = trigger_block.canonical_root();
let trigger_parent_root = trigger_block.parent_root();
let trigger_slot = trigger_block.slot();
bl.search_parent(
trigger_slot,
trigger_block_root,
trigger_parent_root,
peer_id,
&mut cx,
);
for (i, block) in blocks.into_iter().rev().enumerate() {
let id = rig.expect_parent_request(response_type);
// If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test.
if matches!(fork_name, ForkName::Deneb) {
let _ = rig.expect_parent_request(ResponseType::Blob);
}
// the block
bl.parent_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx);
// the stream termination
bl.parent_lookup_response(id, peer_id, None, D, &mut cx);
// the processing request
rig.expect_block_process(response_type);
// the processing result
if i + 2 == depth {
// one block was removed
bl.parent_block_processed(
chain_hash,
BlockError::BlockIsAlreadyKnown.into(),
response_type,
&mut cx,
)
} else {
bl.parent_block_processed(
chain_hash,
BlockError::ParentUnknown(block.into()).into(),
response_type,
&mut cx,
)
}
parent_lookups_consistency(&bl)
}
// Processing succeeds, now the rest of the chain should be sent for processing.
rig.expect_parent_chain_process();
// Try to get this block again while the chain is being processed. We should not request it again.
let peer_id = PeerId::random();
let trigger_block_root = trigger_block.canonical_root();
let trigger_parent_root = trigger_block.parent_root();
let trigger_slot = trigger_block.slot();
bl.search_parent(
trigger_slot,
trigger_block_root,
trigger_parent_root,
peer_id,
&mut cx,
);
parent_lookups_consistency(&bl);
let process_result = BatchProcessResult::Success {
was_non_empty: true,
};
bl.parent_chain_processed(chain_hash, process_result, &mut cx);
assert_eq!(bl.parent_lookups.len(), 0);
}
}