add some tests and fix a bug

This commit is contained in:
realbigsean
2023-05-08 11:58:05 -04:00
parent a0f6159cae
commit 6aff52c5b4
2 changed files with 245 additions and 256 deletions

View File

@@ -21,6 +21,7 @@ pub struct SingleBlockLookup<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> {
pub requested_ids: Vec<BlobIdentifier>,
pub downloaded_blobs: FixedBlobSidecarList<T::EthSpec>,
pub downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
pub expected_num_blobs: Option<usize>,
pub block_request_state: SingleLookupRequestState<MAX_ATTEMPTS>,
pub blob_request_state: SingleLookupRequestState<MAX_ATTEMPTS>,
pub da_checker: Arc<DataAvailabilityChecker<T::EthSpec, T::SlotClock>>,
@@ -88,6 +89,7 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
requested_ids: <_>::default(),
downloaded_block: None,
downloaded_blobs: <_>::default(),
expected_num_blobs: None,
block_request_state: SingleLookupRequestState::new(peer_source),
blob_request_state: SingleLookupRequestState::new(peer_source),
da_checker,
@@ -187,32 +189,40 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
self.block_request_state.register_failure_downloading();
Err(LookupVerifyError::ExtraBlocksReturned)
}
State::Downloading { peer_id } => match block {
Some(block) => {
// Compute the block root using this specific function so that we can get timing
// metrics.
let block_root = get_block_root(&block);
if block_root != self.requested_block_root {
// return an error and drop the block
// NOTE: we take this is as a download failure to prevent counting the
// attempt as a chain failure, but simply a peer failure.
self.block_request_state.register_failure_downloading();
Err(LookupVerifyError::RootMismatch)
} else {
// Return the block for processing.
self.block_request_state.state = State::Processing { peer_id };
Ok(Some((block_root, block)))
State::Downloading { peer_id } => {
match block {
Some(block) => {
// Compute the block root using this specific function so that we can get timing
// metrics.
let block_root = get_block_root(&block);
if block_root != self.requested_block_root {
// return an error and drop the block
// NOTE: we take this is as a download failure to prevent counting the
// attempt as a chain failure, but simply a peer failure.
self.block_request_state.register_failure_downloading();
Err(LookupVerifyError::RootMismatch)
} else {
self.expected_num_blobs =
block.message().body().blob_kzg_commitments().ok().map(
|commitments| {
std::cmp::min(self.requested_ids.len(), commitments.len())
},
);
// Return the block for processing.
self.block_request_state.state = State::Processing { peer_id };
Ok(Some((block_root, block)))
}
}
None => {
if peer_id.should_have_block() {
self.block_request_state.register_failure_downloading();
Err(LookupVerifyError::NoBlockReturned)
} else {
Err(LookupVerifyError::BenignFailure)
}
}
}
None => {
if peer_id.should_have_block() {
self.block_request_state.register_failure_downloading();
Err(LookupVerifyError::NoBlockReturned)
} else {
Err(LookupVerifyError::BenignFailure)
}
}
},
}
State::Processing { peer_id: _ } => match block {
Some(_) => {
// We sent the block for processing and received an extra block.
@@ -257,16 +267,15 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
}
}
None => {
let downloaded_all_blobs = self.downloaded_all_blobs();
if peer_source.should_have_blobs() && !downloaded_all_blobs {
let downloaded_expected_blobs = self.downloaded_expected_blobs();
if peer_source.should_have_blobs() && !downloaded_expected_blobs {
return Err(LookupVerifyError::NotEnoughBlobsReturned);
} else if !downloaded_all_blobs {
} else if !downloaded_expected_blobs {
return Err(LookupVerifyError::BenignFailure);
}
self.blob_request_state.state = State::Processing {
peer_id: peer_source,
};
dbg!("sending blobs for processing");
Ok(Some((
self.requested_block_root,
self.downloaded_blobs.clone(),
@@ -288,24 +297,15 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
}
}
fn downloaded_all_blobs(&self) -> bool {
let Some(expected_num_blobs) = self
.downloaded_block
.as_ref()
.and_then(|block| {
block
.message()
.body()
.blob_kzg_commitments().ok()
.map(|commitments| commitments.len())
}) else {
return true
};
fn downloaded_expected_blobs(&self) -> bool {
let Some(expected_num_blobs) = self.expected_num_blobs else {
return true;
};
let downloaded_num_blobs = self
.downloaded_blobs
.iter()
.map(|blob_opt| blob_opt.is_some())
.filter(|blob_opt| blob_opt.is_some())
.count();
downloaded_num_blobs == expected_num_blobs
}

View File

@@ -1102,6 +1102,174 @@ mod deneb_only {
use super::*;
use std::str::FromStr;
struct DenebTester {
bl: BlockLookups<T>,
cx: SyncNetworkContext<T>,
rig: TestRig,
block: Option<Arc<SignedBeaconBlock<E>>>,
blobs: Vec<Arc<BlobSidecar<E>>>,
peer_id: PeerId,
block_req_id: u32,
blob_req_id: u32,
slot: Slot,
block_root: Hash256,
}
impl DenebTester {
fn new() -> Option<Self> {
let fork_name = get_fork_name();
if !matches!(fork_name, ForkName::Deneb) {
return None;
}
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
rig.harness.chain.slot_clock.set_slot(
E::slots_per_epoch() * rig.harness.spec.deneb_fork_epoch.unwrap().as_u64(),
);
let (block, blobs) = rig.rand_block_and_blobs(fork_name, NumBlobs::Random);
let blobs = blobs.into_iter().map(Arc::new).collect::<Vec<_>>();
let peer_id = PeerId::random();
let slot = block.slot();
let block_root = block.canonical_root();
// Trigger the request
bl.search_block(block_root, PeerSource::Attestation(peer_id), &mut cx);
let block_req_id = rig.expect_block_request(ResponseType::Block);
let blob_req_id = rig.expect_block_request(ResponseType::Blob);
Some(Self {
bl,
cx,
rig,
block: Some(Arc::new(block)),
blobs,
peer_id,
block_req_id,
blob_req_id,
slot,
block_root,
})
}
fn block_response(mut self) -> Self {
// The peer provides the correct block, should not be penalized. Now the block should be sent
// for processing.
self.bl.single_block_lookup_response(
self.block_req_id,
self.peer_id,
self.block.clone(),
D,
&mut self.cx,
);
self.rig.expect_empty_network();
self.rig.expect_block_process(ResponseType::Block);
// The request should still be active.
assert_eq!(self.bl.single_block_lookups.len(), 1);
self
}
fn blobs_response(mut self) -> Self {
for blob in &self.blobs {
self.bl.single_blob_lookup_response(
self.blob_req_id,
self.peer_id,
Some(blob.clone()),
D,
&mut self.cx,
);
self.rig.expect_empty_network();
assert_eq!(self.bl.single_block_lookups.len(), 1);
}
// Send the blob stream termination. Peer should have not been penalized.
self.bl.single_blob_lookup_response(
self.blob_req_id,
self.peer_id,
None,
D,
&mut self.cx,
);
self.rig.expect_empty_network();
self.rig.expect_block_process(ResponseType::Blob);
self
}
fn empty_block_response(mut self) -> Self {
self.bl.single_block_lookup_response(
self.block_req_id,
self.peer_id,
None,
D,
&mut self.cx,
);
self
}
fn empty_blobs_response(mut self) -> Self {
self.bl.single_blob_lookup_response(
self.blob_req_id,
self.peer_id,
None,
D,
&mut self.cx,
);
self
}
fn block_imported(mut self) -> Self {
// Missing blobs should be the request is not removed, the outstanding blobs request should
// mean we do not send a new request.
self.bl.single_block_processed(
self.block_req_id,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(self.block_root)),
ResponseType::Block,
&mut self.cx,
);
self.rig.expect_empty_network();
assert_eq!(self.bl.single_block_lookups.len(), 0);
self
}
fn missing_components(mut self) -> Self {
self.bl.single_block_processed(
self.block_req_id,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
self.slot,
self.block_root,
)),
ResponseType::Block,
&mut self.cx,
);
assert_eq!(self.bl.single_block_lookups.len(), 1);
self
}
fn expect_penalty(mut self) -> Self {
self.rig.expect_penalty();
self
}
fn expect_no_penalty(mut self) -> Self {
self.rig.expect_empty_network();
self
}
fn expect_block_request(mut self) -> Self {
self.rig.expect_block_request(ResponseType::Block);
self
}
fn expect_blobs_request(mut self) -> Self {
self.rig.expect_block_request(ResponseType::Blob);
self
}
fn expect_no_blobs_request(mut self) -> Self {
self.rig.expect_empty_network();
self
}
fn expect_no_block_request(mut self) -> Self {
self.rig.expect_empty_network();
self
}
}
fn get_fork_name() -> ForkName {
ForkName::from_str(
&std::env::var(beacon_chain::test_utils::FORK_NAME_ENV_VAR).unwrap_or_else(|e| {
@@ -1117,243 +1285,64 @@ mod deneb_only {
#[test]
fn single_block_and_blob_lookup_block_returned_first() {
let fork_name = get_fork_name();
if !matches!(fork_name, ForkName::Deneb) {
let Some(tester) = DenebTester::new() else {
return;
}
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
rig.harness
.chain
.slot_clock
.set_slot(E::slots_per_epoch() * rig.harness.spec.deneb_fork_epoch.unwrap().as_u64());
};
let (block, blobs) = rig.rand_block_and_blobs(fork_name, NumBlobs::Random);
let slot = block.slot();
let peer_id = PeerId::random();
let block_root = block.canonical_root();
// Trigger the request
bl.search_block(block_root, PeerSource::Attestation(peer_id), &mut cx);
let block_id = rig.expect_block_request(ResponseType::Block);
let blob_id = rig.expect_block_request(ResponseType::Blob);
// The peer provides the correct block, should not be penalized. Now the block should be sent
// for processing.
bl.single_block_lookup_response(block_id, peer_id, Some(block.into()), D, &mut cx);
rig.expect_empty_network();
rig.expect_block_process(ResponseType::Block);
// The request should still be active.
assert_eq!(bl.single_block_lookups.len(), 1);
// Send the stream termination. Peer should have not been penalized.
bl.single_block_lookup_response(block_id, peer_id, None, D, &mut cx);
// Missing blobs should be the request is not removed, the outstanding blobs request should
// mean we do not send a new request.
bl.single_block_processed(
block_id,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
slot, block_root,
)),
ResponseType::Block,
&mut cx,
);
rig.expect_empty_network();
assert_eq!(bl.single_block_lookups.len(), 1);
for blob in blobs {
bl.single_blob_lookup_response(blob_id, peer_id, Some(Arc::new(blob)), D, &mut cx);
rig.expect_empty_network();
assert_eq!(bl.single_block_lookups.len(), 1);
}
// Send the blob stream termination. Peer should have not been penalized.
bl.single_blob_lookup_response(blob_id, peer_id, None, D, &mut cx);
rig.expect_empty_network();
rig.expect_block_process(ResponseType::Blob);
// Missing blobs should be the request is not removed, the outstanding blobs request should
// mean we do not send a new request.
bl.single_block_processed(
block_id,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)),
ResponseType::Block,
&mut cx,
);
rig.expect_empty_network();
assert_eq!(bl.single_block_lookups.len(), 0);
tester.block_response().blobs_response().block_imported();
}
#[test]
fn single_block_and_blob_lookup_blob_returned_first() {
let fork_name = get_fork_name();
if !matches!(fork_name, ForkName::Deneb) {
fn single_block_and_blob_lookup_blobs_returned_first() {
let Some(tester) = DenebTester::new() else {
return;
}
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
rig.harness
.chain
.slot_clock
.set_slot(E::slots_per_epoch() * rig.harness.spec.deneb_fork_epoch.unwrap().as_u64());
};
let (block, blobs) = rig.rand_block_and_blobs(fork_name, NumBlobs::Random);
let slot = block.slot();
let peer_id = PeerId::random();
let block_root = block.canonical_root();
// Trigger the request
bl.search_block(block_root, PeerSource::Attestation(peer_id), &mut cx);
let block_id = rig.expect_block_request(ResponseType::Block);
let blob_id = rig.expect_block_request(ResponseType::Blob);
for blob in blobs {
bl.single_blob_lookup_response(blob_id, peer_id, Some(Arc::new(blob)), D, &mut cx);
rig.expect_empty_network();
assert_eq!(bl.single_block_lookups.len(), 1);
}
// Send the blob stream termination. Peer should have not been penalized.
bl.single_blob_lookup_response(blob_id, peer_id, None, D, &mut cx);
rig.expect_empty_network();
rig.expect_block_process(ResponseType::Blob);
// The request should still be active.
assert_eq!(bl.single_block_lookups.len(), 1);
// The peer provides the correct block, should not be penalized. Now the block should be sent
// for processing.
bl.single_block_lookup_response(block_id, peer_id, Some(block.into()), D, &mut cx);
rig.expect_empty_network();
rig.expect_block_process(ResponseType::Block);
// Send the stream termination. Peer should have not been penalized.
bl.single_block_lookup_response(block_id, peer_id, None, D, &mut cx);
// Missing blobs should be the request is not removed, the outstanding blobs request should
// mean we do not send a new request.
bl.single_block_processed(
block_id,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
slot, block_root,
)),
ResponseType::Block,
&mut cx,
);
rig.expect_empty_network();
assert_eq!(bl.single_block_lookups.len(), 1);
// Missing blobs should be the request is not removed, the outstanding blobs request should
// mean we do not send a new request.
bl.single_block_processed(
block_id,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)),
ResponseType::Block,
&mut cx,
);
rig.expect_empty_network();
assert_eq!(bl.single_block_lookups.len(), 0);
tester.blobs_response().block_response().block_imported();
}
#[test]
fn single_block_and_blob_lookup_empty_response() {
let response_type = ResponseType::Block;
let fork_name = get_fork_name();
if !matches!(fork_name, ForkName::Deneb) {
let Some(tester) = DenebTester::new() else {
return;
}
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
};
let block_hash = Hash256::random();
let peer_id = PeerId::random();
// Trigger the request
bl.search_block(block_hash, PeerSource::Attestation(peer_id), &mut cx);
let id = rig.expect_block_request(response_type);
let blob_id = rig.expect_block_request(ResponseType::Blob);
// The peer does not have the block. It should be penalized.
bl.single_block_lookup_response(id, peer_id, None, D, &mut cx);
rig.expect_penalty();
rig.expect_block_request(response_type); // it should be retried
rig.expect_empty_network(); // there should be no blob retry
bl.single_blob_lookup_response(blob_id, peer_id, None, D, &mut cx);
rig.expect_empty_network(); // there should be no penalty or retry, we don't know
// whether we should have blobs
tester
.empty_block_response()
.expect_penalty()
.expect_block_request()
.expect_no_blobs_request()
.empty_blobs_response()
.expect_no_penalty()
.expect_no_block_request()
.expect_no_blobs_request();
}
#[test]
fn single_blob_lookup_empty_response() {
let response_type = ResponseType::Block;
let fork_name = get_fork_name();
if !matches!(fork_name, ForkName::Deneb) {
let Some(tester) = DenebTester::new() else {
return;
}
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
};
let block_hash = Hash256::random();
let peer_id = PeerId::random();
// Trigger the request
bl.search_block(block_hash, PeerSource::Attestation(peer_id), &mut cx);
let id = rig.expect_block_request(response_type);
let _ = rig.expect_block_request(ResponseType::Blob);
// The peer does not have the block. It should be penalized.
bl.single_blob_lookup_response(id, peer_id, None, D, &mut cx);
rig.expect_empty_network(); // there should be no penalty or retry, we don't know
// whether we should have blobs
tester
.empty_blobs_response()
.expect_no_penalty()
.expect_no_block_request()
.expect_no_blobs_request();
}
#[test]
fn test_single_block_response_then_empty_blob_response() {
let fork_name = get_fork_name();
if !matches!(fork_name, ForkName::Deneb) {
fn single_block_response_then_empty_blob_response() {
let Some(tester) = DenebTester::new() else {
return;
}
let (mut bl, mut cx, mut rig) = TestRig::test_setup(false);
rig.harness
.chain
.slot_clock
.set_slot(E::slots_per_epoch() * rig.harness.spec.deneb_fork_epoch.unwrap().as_u64());
};
let (block, _) = rig.rand_block_and_blobs(fork_name, NumBlobs::Random);
let slot = block.slot();
let peer_id = PeerId::random();
let block_root = block.canonical_root();
// Trigger the request
bl.search_block(block_root, PeerSource::Attestation(peer_id), &mut cx);
let block_id = rig.expect_block_request(ResponseType::Block);
let blob_id = rig.expect_block_request(ResponseType::Blob);
// The peer provides the correct block, should not be penalized. Now the block should be sent
// for processing.
bl.single_block_lookup_response(block_id, peer_id, Some(block.into()), D, &mut cx);
rig.expect_empty_network();
rig.expect_block_process(ResponseType::Block);
// The request should still be active.
assert_eq!(bl.single_block_lookups.len(), 1);
// Send the stream termination. Peer should have not been penalized.
bl.single_block_lookup_response(block_id, peer_id, None, D, &mut cx);
// Missing blobs should be the request is not removed, the outstanding blobs request should
// mean we do not send a new request.
bl.single_block_processed(
block_id,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
slot, block_root,
)),
ResponseType::Block,
&mut cx,
);
rig.expect_empty_network();
assert_eq!(bl.single_block_lookups.len(), 1);
// The peer does not have the block. It should be penalized.
bl.single_blob_lookup_response(blob_id, peer_id, None, D, &mut cx);
rig.expect_penalty();
rig.expect_block_request(ResponseType::Blob); // it should be retried
rig.expect_empty_network();
tester
.block_response()
.missing_components()
.empty_blobs_response()
.expect_penalty()
.expect_blobs_request()
.expect_no_block_request();
}
}