Don't request block components until having block (#5774)

* Don't request block components until having block

* Update tests

* Resolve todo comment

* Merge branch 'unstable' into request-blocks-first
This commit is contained in:
Lion - dapplion
2024-05-14 17:50:38 +03:00
committed by GitHub
parent ce66ab374e
commit 683d9df63b
6 changed files with 207 additions and 314 deletions

View File

@@ -7,7 +7,6 @@ use crate::metrics;
use crate::sync::block_lookups::common::{ResponseType, PARENT_DEPTH_TOLERANCE};
use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
use crate::sync::manager::{Id, SingleLookupReqId};
use crate::sync::network_context::LookupFailure;
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::data_availability_checker::AvailabilityCheckErrorCategory;
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
@@ -325,12 +324,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
response: RpcProcessingResult<R::VerifiedResponseType>,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupResult, LookupRequestError> {
// Downscore peer even if lookup is not known
// Only downscore lookup verify errors. RPC errors are downscored in the network handler.
if let Err(LookupFailure::LookupVerifyError(e)) = &response {
// Note: the error is displayed in full debug form on the match below
cx.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
}
// Note: no need to downscore peers here, already downscored on network context
let response_type = R::response_type();
let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else {
@@ -459,23 +453,16 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// if both components have been processed.
request_state.on_processing_success()?;
// If this was the result of a block request, we can't determined if the block peer did anything
// wrong. If we already had both a block and blobs response processed, we should penalize the
// blobs peer because they did not provide all blobs on the initial request.
if lookup.both_components_processed() {
if let Some(blob_peer) = lookup
.blob_request_state
.state
.on_post_process_validation_failure()?
{
cx.report_peer(
blob_peer,
PeerAction::MidToleranceError,
"sent_incomplete_blobs",
);
}
// We don't request for other block components until being sure that the block has
// data. If we request blobs / columns to a peer we are sure those must exist.
// Therefore if all components are processed and we still receive `MissingComponents`
// it indicates an internal bug.
return Err(LookupRequestError::MissingComponentsAfterAllProcessed);
} else {
// Continue request, potentially request blobs
Action::Retry
}
Action::Retry
}
BlockProcessingResult::Ignored => {
// Beacon processor signalled to ignore the block processing result.

View File

@@ -39,6 +39,9 @@ pub enum LookupRequestError {
BadState(String),
/// Lookup failed for some other reason and should be dropped
Failed,
/// Received MissingComponents when all components have been processed. This should never
/// happen, and indicates some internal bug
MissingComponentsAfterAllProcessed,
/// Attempted to retrieve a not known lookup id
UnknownLookup,
/// Received a download result for a different request id than the in-flight request.
@@ -158,7 +161,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
/// Potentially makes progress on this request if it's in a progress-able state
pub fn continue_request<R: RequestState<T>>(
fn continue_request<R: RequestState<T>>(
&mut self,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
@@ -285,10 +288,8 @@ pub enum State<T: Clone> {
AwaitingProcess(DownloadResult<T>),
/// Request is processing, sent by lookup sync
Processing(DownloadResult<T>),
/// Request is processed:
/// - `Processed(Some)` if lookup sync downloaded and sent to process this request
/// - `Processed(None)` if another source (i.e. gossip) sent this component for processing
Processed(Option<PeerId>),
/// Request is processed
Processed,
}
/// Object representing the state of a single block or blob lookup request.
@@ -463,8 +464,8 @@ impl<T: Clone> SingleLookupRequestState<T> {
pub fn on_processing_success(&mut self) -> Result<(), LookupRequestError> {
match &self.state {
State::Processing(result) => {
self.state = State::Processed(Some(result.peer_id));
State::Processing(_) => {
self.state = State::Processed;
Ok(())
}
other => Err(LookupRequestError::BadState(format!(
@@ -473,27 +474,11 @@ impl<T: Clone> SingleLookupRequestState<T> {
}
}
pub fn on_post_process_validation_failure(
&mut self,
) -> Result<Option<PeerId>, LookupRequestError> {
match &self.state {
State::Processed(peer_id) => {
let peer_id = *peer_id;
self.failed_processing = self.failed_processing.saturating_add(1);
self.state = State::AwaitingDownload;
Ok(peer_id)
}
other => Err(LookupRequestError::BadState(format!(
"Bad state on_post_process_validation_failure expected Processed got {other}"
))),
}
}
/// Mark a request as complete without any download or processing
pub fn on_completed_request(&mut self) -> Result<(), LookupRequestError> {
match &self.state {
State::AwaitingDownload => {
self.state = State::Processed(None);
self.state = State::Processed;
Ok(())
}
other => Err(LookupRequestError::BadState(format!(

View File

@@ -287,6 +287,11 @@ impl TestRig {
self.expect_no_active_single_lookups();
}
fn expect_no_active_lookups_empty_network(&mut self) {
self.expect_no_active_lookups();
self.expect_empty_network();
}
fn new_connected_peer(&mut self) -> PeerId {
let peer_id = PeerId::random();
self.network_globals
@@ -461,14 +466,16 @@ impl TestRig {
);
}
fn complete_single_lookup_block_valid(&mut self, block: SignedBeaconBlock<E>, import: bool) {
fn complete_lookup_block_download(&mut self, block: SignedBeaconBlock<E>) {
let block_root = block.canonical_root();
let block_slot = block.slot();
let id = self.expect_block_lookup_request(block_root);
self.expect_empty_network();
let peer_id = self.new_connected_peer();
self.single_lookup_block_response(id, peer_id, Some(block.into()));
self.single_lookup_block_response(id, peer_id, None);
}
fn complete_lookup_block_import_valid(&mut self, block_root: Hash256, import: bool) {
self.expect_block_process(ResponseType::Block);
let id = self.find_single_lookup_for(block_root);
self.single_block_component_processed(
@@ -477,12 +484,19 @@ impl TestRig {
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root))
} else {
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
block_slot, block_root,
Slot::new(0),
block_root,
))
},
)
}
fn complete_single_lookup_block_valid(&mut self, block: SignedBeaconBlock<E>, import: bool) {
let block_root = block.canonical_root();
self.complete_lookup_block_download(block);
self.complete_lookup_block_import_valid(block_root, import)
}
fn parent_lookup_failed(&mut self, id: SingleLookupReqId, peer_id: PeerId, error: RPCError) {
self.send_sync_message(SyncMessage::RpcError {
peer_id,
@@ -676,26 +690,6 @@ impl TestRig {
.unwrap_or_else(|e| panic!("Expected blob parent request for {for_block:?}: {e}"))
}
fn expect_lookup_request_block_and_blobs(&mut self, block_root: Hash256) -> SingleLookupReqId {
let id = self.expect_block_lookup_request(block_root);
// If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test.
if self.after_deneb() {
let _ = self.expect_blob_lookup_request(block_root);
}
id
}
fn expect_parent_request_block_and_blobs(&mut self, block_root: Hash256) -> SingleLookupReqId {
let id = self.expect_block_parent_request(block_root);
// If we're in deneb, a blob request should have been triggered as well,
// we don't require a response because we're generateing 0-blob blocks in this test.
if self.after_deneb() {
let _ = self.expect_blob_parent_request(block_root);
}
id
}
#[track_caller]
fn expect_block_process(&mut self, response_type: ResponseType) {
match response_type {
@@ -932,7 +926,7 @@ fn test_single_block_lookup_happy_path() {
let block_root = block.canonical_root();
// Trigger the request
rig.trigger_unknown_block_from_attestation(block_root, peer_id);
let id = rig.expect_lookup_request_block_and_blobs(block_root);
let id = rig.expect_block_lookup_request(block_root);
// The peer provides the correct block, should not be penalized. Now the block should be sent
// for processing.
@@ -962,7 +956,7 @@ fn test_single_block_lookup_empty_response() {
// Trigger the request
r.trigger_unknown_block_from_attestation(block_root, peer_id);
let id = r.expect_lookup_request_block_and_blobs(block_root);
let id = r.expect_block_lookup_request(block_root);
// The peer does not have the block. It should be penalized.
r.single_lookup_block_response(id, peer_id, None);
@@ -985,7 +979,7 @@ fn test_single_block_lookup_wrong_response() {
// Trigger the request
rig.trigger_unknown_block_from_attestation(block_hash, peer_id);
let id = rig.expect_lookup_request_block_and_blobs(block_hash);
let id = rig.expect_block_lookup_request(block_hash);
// Peer sends something else. It should be penalized.
let bad_block = rig.rand_block();
@@ -1007,7 +1001,7 @@ fn test_single_block_lookup_failure() {
// Trigger the request
rig.trigger_unknown_block_from_attestation(block_hash, peer_id);
let id = rig.expect_lookup_request_block_and_blobs(block_hash);
let id = rig.expect_block_lookup_request(block_hash);
// The request fails. RPC failures are handled elsewhere so we should not penalize the peer.
rig.single_lookup_failed(id, peer_id, RPCError::UnsupportedProtocol);
@@ -1026,7 +1020,7 @@ fn test_single_block_lookup_becomes_parent_request() {
// Trigger the request
rig.trigger_unknown_block_from_attestation(block.canonical_root(), peer_id);
let id = rig.expect_lookup_request_block_and_blobs(block_root);
let id = rig.expect_block_parent_request(block_root);
// The peer provides the correct block, should not be penalized. Now the block should be sent
// for processing.
@@ -1044,7 +1038,7 @@ fn test_single_block_lookup_becomes_parent_request() {
BlockError::ParentUnknown(RpcBlock::new_without_blobs(None, block)).into(),
);
assert_eq!(rig.active_single_lookups_count(), 2); // 2 = current + parent
rig.expect_parent_request_block_and_blobs(parent_root);
rig.expect_block_parent_request(parent_root);
rig.expect_empty_network();
assert_eq!(rig.active_parent_lookups_count(), 1);
}
@@ -1058,10 +1052,12 @@ fn test_parent_lookup_happy_path() {
// Trigger the request
rig.trigger_unknown_parent_block(peer_id, block.into());
let id = rig.expect_parent_request_block_and_blobs(parent_root);
let id = rig.expect_block_parent_request(parent_root);
// Peer sends the right block, it should be sent for processing. Peer should not be penalized.
rig.parent_lookup_block_response(id, peer_id, Some(parent.into()));
// No request of blobs because the block has not data
rig.expect_empty_network();
rig.expect_block_process(ResponseType::Block);
rig.expect_empty_network();
@@ -1074,7 +1070,7 @@ fn test_parent_lookup_happy_path() {
);
rig.expect_parent_chain_process();
rig.parent_chain_processed_success(block_root, &[]);
rig.expect_no_active_lookups();
rig.expect_no_active_lookups_empty_network();
}
#[test]
@@ -1086,7 +1082,7 @@ fn test_parent_lookup_wrong_response() {
// Trigger the request
rig.trigger_unknown_parent_block(peer_id, block.into());
let id1 = rig.expect_parent_request_block_and_blobs(parent_root);
let id1 = rig.expect_block_parent_request(parent_root);
// Peer sends the wrong block, peer should be penalized and the block re-requested.
let bad_block = rig.rand_block();
@@ -1108,7 +1104,7 @@ fn test_parent_lookup_wrong_response() {
rig.parent_block_processed_imported(block_root);
rig.expect_parent_chain_process();
rig.parent_chain_processed_success(block_root, &[]);
rig.expect_no_active_lookups();
rig.expect_no_active_lookups_empty_network();
}
#[test]
@@ -1120,14 +1116,14 @@ fn test_parent_lookup_rpc_failure() {
// Trigger the request
rig.trigger_unknown_parent_block(peer_id, block.into());
let id1 = rig.expect_parent_request_block_and_blobs(parent_root);
let id = rig.expect_block_parent_request(parent_root);
// The request fails. It should be tried again.
rig.parent_lookup_failed_unavailable(id1, peer_id);
let id2 = rig.expect_block_parent_request(parent_root);
rig.parent_lookup_failed_unavailable(id, peer_id);
let id = rig.expect_block_parent_request(parent_root);
// Send the right block this time.
rig.parent_lookup_block_response(id2, peer_id, Some(parent.into()));
rig.parent_lookup_block_response(id, peer_id, Some(parent.into()));
rig.expect_block_process(ResponseType::Block);
// Add peer to child lookup to prevent it being dropped
@@ -1136,7 +1132,7 @@ fn test_parent_lookup_rpc_failure() {
rig.parent_block_processed_imported(block_root);
rig.expect_parent_chain_process();
rig.parent_chain_processed_success(block_root, &[]);
rig.expect_no_active_lookups();
rig.expect_no_active_lookups_empty_network();
}
#[test]
@@ -1152,9 +1148,6 @@ fn test_parent_lookup_too_many_attempts() {
for i in 1..=PARENT_FAIL_TOLERANCE {
let id = rig.expect_block_parent_request(parent_root);
// Blobs are only requested in the first iteration as this test only retries blocks
if rig.after_deneb() && i == 1 {
let _ = rig.expect_blob_parent_request(parent_root);
}
if i % 2 == 0 {
// make sure every error is accounted for
@@ -1178,7 +1171,7 @@ fn test_parent_lookup_too_many_attempts() {
}
}
rig.expect_no_active_lookups();
rig.expect_no_active_lookups_empty_network();
}
#[test]
@@ -1193,10 +1186,6 @@ fn test_parent_lookup_too_many_download_attempts_no_blacklist() {
for i in 1..=PARENT_FAIL_TOLERANCE {
assert!(!rig.failed_chains_contains(&block_root));
let id = rig.expect_block_parent_request(parent_root);
// Blobs are only requested in the first iteration as this test only retries blocks
if rig.after_deneb() && i == 1 {
let _ = rig.expect_blob_parent_request(parent_root);
}
if i % 2 != 0 {
// The request fails. It should be tried again.
rig.parent_lookup_failed_unavailable(id, peer_id);
@@ -1210,7 +1199,7 @@ fn test_parent_lookup_too_many_download_attempts_no_blacklist() {
assert!(!rig.failed_chains_contains(&block_root));
assert!(!rig.failed_chains_contains(&parent.canonical_root()));
rig.expect_no_active_lookups();
rig.expect_no_active_lookups_empty_network();
}
#[test]
@@ -1224,12 +1213,8 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() {
rig.trigger_unknown_parent_block(peer_id, block.into());
rig.log("Fail downloading the block");
for i in 0..(PARENT_FAIL_TOLERANCE - PROCESSING_FAILURES) {
for _ in 0..(PARENT_FAIL_TOLERANCE - PROCESSING_FAILURES) {
let id = rig.expect_block_parent_request(parent_root);
// Blobs are only requested in the first iteration as this test only retries blocks
if rig.after_deneb() && i == 0 {
let _ = rig.expect_blob_parent_request(parent_root);
}
// The request fails. It should be tried again.
rig.parent_lookup_failed_unavailable(id, peer_id);
}
@@ -1247,7 +1232,7 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() {
}
rig.assert_not_failed_chain(block_root);
rig.expect_no_active_lookups();
rig.expect_no_active_lookups_empty_network();
}
#[test]
@@ -1261,7 +1246,7 @@ fn test_parent_lookup_too_deep() {
rig.trigger_unknown_parent_block(peer_id, trigger_block);
for block in blocks.into_iter().rev() {
let id = rig.expect_parent_request_block_and_blobs(block.canonical_root());
let id = rig.expect_block_parent_request(block.canonical_root());
// the block
rig.parent_lookup_block_response(id, peer_id, Some(block.clone()));
// the stream termination
@@ -1326,7 +1311,7 @@ fn test_single_block_lookup_ignored_response() {
// Trigger the request
rig.trigger_unknown_block_from_attestation(block.canonical_root(), peer_id);
let id = rig.expect_lookup_request_block_and_blobs(block.canonical_root());
let id = rig.expect_block_lookup_request(block.canonical_root());
// The peer provides the correct block, should not be penalized. Now the block should be sent
// for processing.
@@ -1342,8 +1327,7 @@ fn test_single_block_lookup_ignored_response() {
rig.single_lookup_block_response(id, peer_id, None);
// Send an Ignored response, the request should be dropped
rig.single_block_component_processed(id.lookup_id, BlockProcessingResult::Ignored);
rig.expect_empty_network();
rig.expect_no_active_lookups();
rig.expect_no_active_lookups_empty_network();
}
#[test]
@@ -1355,7 +1339,7 @@ fn test_parent_lookup_ignored_response() {
// Trigger the request
rig.trigger_unknown_parent_block(peer_id, block.clone().into());
let id = rig.expect_parent_request_block_and_blobs(parent_root);
let id = rig.expect_block_parent_request(parent_root);
// Note: single block lookup for current `block` does not trigger any request because it does
// not have blobs, and the block is already cached
@@ -1385,7 +1369,7 @@ fn test_same_chain_race_condition() {
rig.trigger_unknown_parent_block(peer_id, trigger_block.clone());
for (i, block) in blocks.clone().into_iter().rev().enumerate() {
let id = rig.expect_parent_request_block_and_blobs(block.canonical_root());
let id = rig.expect_block_parent_request(block.canonical_root());
// the block
rig.parent_lookup_block_response(id, peer_id, Some(block.clone()));
// the stream termination
@@ -1421,7 +1405,7 @@ fn test_same_chain_race_condition() {
rig.expect_parent_chain_process();
rig.single_block_component_processed_imported(block.canonical_root());
}
rig.expect_no_active_lookups();
rig.expect_no_active_lookups_empty_network();
}
#[test]
@@ -1453,12 +1437,13 @@ fn block_in_processing_cache_becomes_invalid() {
r.insert_block_to_processing_cache(block.clone().into());
r.trigger_unknown_block_from_attestation(block_root, peer_id);
// Should not trigger block request
let id = r.expect_blob_lookup_request(block_root);
r.expect_empty_network();
// Simulate invalid block, removing it from processing cache
r.simulate_block_gossip_processing_becomes_invalid(block_root);
// Should download and process the block
r.complete_single_lookup_block_valid(block, false);
// Should download block, then issue blobs request
r.complete_lookup_block_download(block);
let id = r.expect_blob_lookup_request(block_root);
r.complete_lookup_block_import_valid(block_root, false);
// Resolve blob and expect lookup completed
r.complete_single_lookup_blob_lookup_valid(id, peer_id, blobs, true);
r.expect_no_active_lookups();
@@ -1475,10 +1460,10 @@ fn block_in_processing_cache_becomes_valid_imported() {
r.insert_block_to_processing_cache(block.clone().into());
r.trigger_unknown_block_from_attestation(block_root, peer_id);
// Should not trigger block request
let id = r.expect_blob_lookup_request(block_root);
r.expect_empty_network();
// Resolve the block from processing step
r.simulate_block_gossip_processing_becomes_valid_missing_components(block.into());
let id = r.expect_blob_lookup_request(block_root);
// Resolve blob and expect lookup completed
r.complete_single_lookup_blob_lookup_valid(id, peer_id, blobs, true);
r.expect_no_active_lookups();
@@ -1592,8 +1577,7 @@ mod deneb_only {
peer_id, block_root,
));
let block_req_id = rig.expect_block_lookup_request(block_root);
let blob_req_id = rig.expect_blob_lookup_request(block_root);
(Some(block_req_id), Some(blob_req_id), None, None)
(Some(block_req_id), None, None, None)
}
RequestTrigger::GossipUnknownParentBlock { .. } => {
rig.send_sync_message(SyncMessage::UnknownParentBlock(
@@ -1604,14 +1588,8 @@ mod deneb_only {
let parent_root = block.parent_root();
let parent_block_req_id = rig.expect_block_parent_request(parent_root);
let parent_blob_req_id = rig.expect_blob_parent_request(parent_root);
rig.expect_empty_network(); // expect no more requests
(
None,
None,
Some(parent_block_req_id),
Some(parent_blob_req_id),
)
(None, None, Some(parent_block_req_id), None)
}
RequestTrigger::GossipUnknownParentBlob { .. } => {
let single_blob = blobs.first().cloned().unwrap();
@@ -1619,14 +1597,8 @@ mod deneb_only {
rig.send_sync_message(SyncMessage::UnknownParentBlob(peer_id, single_blob));
let parent_block_req_id = rig.expect_block_parent_request(parent_root);
let parent_blob_req_id = rig.expect_blob_parent_request(parent_root);
rig.expect_empty_network(); // expect no more requests
(
None,
None,
Some(parent_block_req_id),
Some(parent_blob_req_id),
)
(None, None, Some(parent_block_req_id), None)
}
};
@@ -1675,6 +1647,23 @@ mod deneb_only {
self
}
fn parent_block_response_expect_blobs(mut self) -> Self {
self.rig.expect_empty_network();
let block = self.parent_block.pop_front().unwrap().clone();
let _ = self.unknown_parent_block.insert(block.clone());
self.rig.parent_lookup_block_response(
self.parent_block_req_id.expect("parent request id"),
self.peer_id,
Some(block),
);
// Expect blobs request after sending block
let s = self.expect_parent_blobs_request();
s.rig.assert_parent_lookups_count(1);
s
}
fn parent_blob_response(mut self) -> Self {
let blobs = self.parent_blobs.pop_front().unwrap();
let _ = self.unknown_parent_blobs.insert(blobs.clone());
@@ -1687,7 +1676,7 @@ mod deneb_only {
assert_eq!(self.rig.active_parent_lookups_count(), 1);
}
self.rig.parent_lookup_blob_response(
self.parent_blob_req_id.expect("blob request id"),
self.parent_blob_req_id.expect("parent blob request id"),
self.peer_id,
None,
);
@@ -1696,7 +1685,7 @@ mod deneb_only {
}
fn block_response_triggering_process(self) -> Self {
let mut me = self.block_response();
let mut me = self.block_response_and_expect_blob_request();
me.rig.expect_block_process(ResponseType::Block);
// The request should still be active.
@@ -1704,7 +1693,7 @@ mod deneb_only {
me
}
fn block_response(mut self) -> Self {
fn block_response_and_expect_blob_request(mut self) -> Self {
// The peer provides the correct block, should not be penalized. Now the block should be sent
// for processing.
self.rig.single_lookup_block_response(
@@ -1712,12 +1701,14 @@ mod deneb_only {
self.peer_id,
Some(self.block.clone()),
);
self.rig.expect_empty_network();
// After responding with block the node will issue a blob request
let mut s = self.expect_blobs_request();
s.rig.expect_empty_network();
// The request should still be active.
self.rig
.assert_lookup_is_active(self.block.canonical_root());
self
s.rig.assert_lookup_is_active(s.block.canonical_root());
s
}
fn blobs_response(mut self) -> Self {
@@ -1831,6 +1822,21 @@ mod deneb_only {
self
}
fn parent_block_missing_components(mut self) -> Self {
let parent_root = *self.parent_block_roots.first().unwrap();
self.rig
.log(&format!("parent_block_missing_components {parent_root:?}"));
self.rig.parent_block_processed(
self.block_root,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
Slot::new(0),
parent_root,
)),
);
self.rig.expect_no_requests_for(parent_root);
self
}
fn parent_blob_imported(mut self) -> Self {
let parent_root = *self.parent_block_roots.first().unwrap();
self.rig
@@ -1864,26 +1870,6 @@ mod deneb_only {
self
}
fn parent_block_missing_components(mut self) -> Self {
let block = self.unknown_parent_block.clone().unwrap();
self.rig.parent_block_processed(
self.block_root,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
block.slot(),
block.canonical_root(),
)),
);
self.rig.parent_blob_processed(
self.block_root,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
block.slot(),
block.canonical_root(),
)),
);
assert_eq!(self.rig.active_parent_lookups_count(), 1);
self
}
fn invalid_parent_processed(mut self) -> Self {
self.rig.parent_block_processed(
self.block_root,
@@ -1922,45 +1908,35 @@ mod deneb_only {
self.block_root,
)),
);
self.rig.assert_single_lookups_count(1);
self
}
// Add block to da_checker so blobs request can continue
self.rig.insert_block_to_da_checker(self.block.clone());
fn missing_components_from_blob_request(mut self) -> Self {
self.rig.single_blob_component_processed(
self.blob_req_id.expect("blob request id").lookup_id,
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
self.slot,
self.block_root,
)),
);
self.rig.assert_single_lookups_count(1);
self
}
fn complete_current_block_and_blobs_lookup(self) -> Self {
self.expect_block_request()
.expect_blobs_request()
.block_response()
.block_response_and_expect_blob_request()
.blobs_response()
// TODO: Should send blobs for processing
.expect_block_process()
.block_imported()
}
fn empty_parent_blobs_then_parent_block(self) -> Self {
fn parent_block_then_empty_parent_blobs(self) -> Self {
self.log(
" Return empty blobs for parent, block errors with missing components, downscore",
)
.empty_parent_blobs_response()
.expect_no_penalty_and_no_requests()
.parent_block_response()
.parent_block_missing_components()
.expect_penalty("sent_incomplete_blobs")
.expect_parent_blobs_request()
.empty_parent_blobs_response()
.expect_penalty("NotEnoughResponsesReturned")
.log("Re-request parent blobs, succeed and import parent")
.expect_parent_blobs_request()
.parent_blob_response()
.expect_block_process()
.parent_block_missing_components()
// Insert new peer into child request before completing parent
.trigger_unknown_block_from_attestation()
.parent_blob_imported()
@@ -2044,77 +2020,27 @@ mod deneb_only {
return;
};
tester
.block_response_triggering_process()
.block_response_and_expect_blob_request()
.blobs_response()
.block_missing_components() // blobs not yet imported
.blobs_response_was_valid()
.blob_imported(); // now blobs resolve as imported
}
#[test]
fn single_block_and_blob_lookup_blobs_returned_first_attestation() {
let Some(tester) = DenebTester::new(RequestTrigger::AttestationUnknownBlock) else {
return;
};
tester
.blobs_response() // hold blobs for processing
.block_response_triggering_process()
.block_missing_components() // blobs not yet imported
.blobs_response_was_valid()
.blob_imported(); // now blobs resolve as imported
}
#[test]
fn single_block_and_blob_lookup_empty_response_attestation() {
let Some(tester) = DenebTester::new(RequestTrigger::AttestationUnknownBlock) else {
return;
};
tester
.empty_block_response()
.expect_penalty("NoResponseReturned")
.expect_block_request()
.expect_no_blobs_request()
.empty_blobs_response()
.expect_empty_beacon_processor()
.expect_no_penalty()
.expect_no_block_request()
.expect_no_blobs_request()
.block_response_triggering_process()
.missing_components_from_block_request();
}
#[test]
fn single_block_response_then_empty_blob_response_attestation() {
let Some(tester) = DenebTester::new(RequestTrigger::AttestationUnknownBlock) else {
return;
};
tester
.block_response_triggering_process()
.block_response_and_expect_blob_request()
.missing_components_from_block_request()
.empty_blobs_response()
.missing_components_from_blob_request()
.expect_penalty("sent_incomplete_blobs")
.expect_penalty("NotEnoughResponsesReturned")
.expect_blobs_request()
.expect_no_block_request();
}
#[test]
fn single_blob_response_then_empty_block_response_attestation() {
let Some(tester) = DenebTester::new(RequestTrigger::AttestationUnknownBlock) else {
return;
};
tester
.blobs_response()
.expect_no_penalty_and_no_requests()
// blobs not sent for processing until the block is processed
.empty_block_response()
.expect_penalty("NoResponseReturned")
.expect_block_request()
.expect_no_blobs_request();
}
#[test]
fn single_invalid_block_response_then_blob_response_attestation() {
let Some(tester) = DenebTester::new(RequestTrigger::AttestationUnknownBlock) else {
@@ -2156,8 +2082,7 @@ mod deneb_only {
.missing_components_from_block_request()
.invalidate_blobs_too_few()
.blobs_response()
.missing_components_from_blob_request()
.expect_penalty("sent_incomplete_blobs")
.expect_penalty("NotEnoughResponsesReturned")
.expect_blobs_request()
.expect_no_block_request();
}
@@ -2171,37 +2096,12 @@ mod deneb_only {
.block_response_triggering_process()
.invalidate_blobs_too_many()
.blobs_response()
.expect_penalty("DuplicateData")
.expect_blobs_request()
.expect_penalty("TooManyResponses")
// Network context returns "download success" because the request has enough blobs + it
// downscores the peer for returning too many.
.expect_no_block_request();
}
#[test]
fn too_few_blobs_response_then_block_response_attestation() {
let Some(tester) = DenebTester::new(RequestTrigger::AttestationUnknownBlock) else {
return;
};
tester
.invalidate_blobs_too_few()
.blobs_response() // blobs are not sent until the block is processed
.expect_no_penalty_and_no_requests()
.block_response_triggering_process();
}
#[test]
fn too_many_blobs_response_then_block_response_attestation() {
let Some(tester) = DenebTester::new(RequestTrigger::AttestationUnknownBlock) else {
return;
};
tester
.invalidate_blobs_too_many()
.blobs_response()
.expect_penalty("DuplicateData")
.expect_blobs_request()
.expect_no_block_request()
.block_response_triggering_process();
}
// Test peer returning block that has unknown parent, and a new lookup is created
#[test]
fn parent_block_unknown_parent() {
@@ -2210,12 +2110,11 @@ mod deneb_only {
};
tester
.expect_empty_beacon_processor()
.parent_block_response()
.parent_block_response_expect_blobs()
.parent_blob_response()
.expect_block_process()
.parent_block_unknown_parent()
.expect_parent_block_request()
.expect_parent_blobs_request()
.expect_empty_beacon_processor();
}
@@ -2226,7 +2125,7 @@ mod deneb_only {
return;
};
tester
.parent_block_response()
.parent_block_response_expect_blobs()
.parent_blob_response()
.expect_block_process()
.invalid_parent_processed()
@@ -2246,19 +2145,19 @@ mod deneb_only {
.expect_penalty("NoResponseReturned")
.expect_block_request()
.expect_no_blobs_request()
.block_response()
.block_response_and_expect_blob_request()
.blobs_response()
.block_imported()
.expect_no_active_lookups();
}
#[test]
fn empty_parent_blobs_then_parent_block() {
fn parent_block_then_empty_parent_blobs() {
let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownParentBlock(1)) else {
return;
};
tester
.empty_parent_blobs_then_parent_block()
.parent_block_then_empty_parent_blobs()
.log("resolve original block trigger blobs request and import")
// Should not have block request, it is cached
.expect_blobs_request()
@@ -2274,12 +2173,11 @@ mod deneb_only {
};
tester
.expect_empty_beacon_processor()
.parent_block_response()
.parent_block_response_expect_blobs()
.parent_blob_response()
.expect_block_process()
.parent_block_unknown_parent()
.expect_parent_block_request()
.expect_parent_blobs_request()
.expect_empty_beacon_processor();
}
@@ -2290,7 +2188,7 @@ mod deneb_only {
};
tester
.expect_empty_beacon_processor()
.parent_block_response()
.parent_block_response_expect_blobs()
.parent_blob_response()
.expect_block_process()
.invalid_parent_processed()
@@ -2307,6 +2205,7 @@ mod deneb_only {
};
tester
.parent_block_response()
.expect_parent_blobs_request()
.parent_blob_response()
.expect_block_process()
.trigger_unknown_block_from_attestation()
@@ -2316,12 +2215,12 @@ mod deneb_only {
}
#[test]
fn empty_parent_blobs_then_parent_block_blob_trigger() {
fn parent_block_then_empty_parent_blobs_blob_trigger() {
let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownParentBlob(1)) else {
return;
};
tester
.empty_parent_blobs_then_parent_block()
.parent_block_then_empty_parent_blobs()
.log("resolve original block trigger blobs request and import")
.complete_current_block_and_blobs_lookup()
.expect_no_active_lookups();
@@ -2334,15 +2233,15 @@ mod deneb_only {
};
tester
.expect_empty_beacon_processor()
.parent_block_response()
.parent_block_response_expect_blobs()
.parent_blob_response()
.expect_no_penalty()
.expect_block_process()
.parent_block_unknown_parent()
.expect_parent_block_request()
.expect_parent_blobs_request()
.expect_empty_beacon_processor()
.parent_block_response()
.expect_parent_blobs_request()
.parent_blob_response()
.expect_no_penalty()
.expect_block_process();

View File

@@ -816,7 +816,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
peer_id: PeerId,
block: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
) {
if let Some(resp) = self.network.on_single_block_response(id, block) {
if let Some(resp) = self.network.on_single_block_response(id, peer_id, block) {
self.block_lookups
.on_download_response::<BlockRequestState<T::EthSpec>>(
id,
@@ -858,7 +858,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
peer_id: PeerId,
blob: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
) {
if let Some(resp) = self.network.on_single_blob_response(id, blob) {
if let Some(resp) = self.network.on_single_blob_response(id, peer_id, blob) {
self.block_lookups
.on_download_response::<BlobRequestState<T::EthSpec>>(
id,

View File

@@ -12,7 +12,6 @@ use crate::status::ToStatusMessage;
use crate::sync::block_lookups::SingleLookupId;
use crate::sync::manager::{BlockProcessType, SingleLookupReqId};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::validator_monitor::timestamp_now;
use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
@@ -383,24 +382,18 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
block_root: Hash256,
downloaded_block_expected_blobs: Option<usize>,
) -> Result<LookupRequestResult, &'static str> {
let expected_blobs = downloaded_block_expected_blobs
.or_else(|| {
self.chain
.data_availability_checker
.num_expected_blobs(&block_root)
})
.unwrap_or_else(|| {
// If we don't about the block being requested, attempt to fetch all blobs
if self
.chain
.data_availability_checker
.da_check_required_for_current_epoch()
{
T::EthSpec::max_blobs_per_block()
} else {
0
}
});
let Some(expected_blobs) = downloaded_block_expected_blobs.or_else(|| {
self.chain
.data_availability_checker
.num_expected_blobs(&block_root)
}) else {
// Wait to download the block before downloading blobs. Then we can be sure that the
// block has data, so there's no need to do "blind" requests for all possible blobs and
// latter handle the case where if the peer sent no blobs, penalize.
// - if `downloaded_block_expected_blobs` is Some = block is downloading or processing.
// - if `num_expected_blobs` returns Some = block is processed.
return Ok(LookupRequestResult::Pending);
};
let imported_blob_indexes = self
.chain
@@ -554,13 +547,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn on_single_block_response(
&mut self,
request_id: SingleLookupReqId,
peer_id: PeerId,
block: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
) -> Option<RpcProcessingResult<Arc<SignedBeaconBlock<T::EthSpec>>>> {
let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else {
return None;
};
Some(match block {
let resp = match block {
RpcEvent::Response(block, seen_timestamp) => {
match request.get_mut().add_response(block) {
Ok(block) => Ok((block, seen_timestamp)),
@@ -579,43 +573,61 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request.remove();
Err(e.into())
}
})
};
if let Err(LookupFailure::LookupVerifyError(e)) = &resp {
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
}
Some(resp)
}
pub fn on_single_blob_response(
&mut self,
request_id: SingleLookupReqId,
peer_id: PeerId,
blob: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
) -> Option<RpcProcessingResult<FixedBlobSidecarList<T::EthSpec>>> {
let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else {
return None;
};
Some(match blob {
RpcEvent::Response(blob, _) => match request.get_mut().add_response(blob) {
Ok(Some(blobs)) => to_fixed_blob_sidecar_list(blobs)
.map(|blobs| (blobs, timestamp_now()))
.map_err(Into::into),
Ok(None) => return None,
Err(e) => {
request.remove();
Err(e.into())
let resp = match blob {
RpcEvent::Response(blob, seen_timestamp) => {
let request = request.get_mut();
match request.add_response(blob) {
Ok(Some(blobs)) => to_fixed_blob_sidecar_list(blobs)
.map(|blobs| (blobs, seen_timestamp))
.map_err(|e| (e.into(), request.resolve())),
Ok(None) => return None,
Err(e) => Err((e.into(), request.resolve())),
}
}
RpcEvent::StreamTermination => match request.remove().terminate() {
Ok(_) => return None,
// (err, false = not resolved) because terminate returns Ok() if resolved
Err(e) => Err((e.into(), false)),
},
RpcEvent::StreamTermination => {
// Stream terminator
match request.remove().terminate() {
Some(blobs) => to_fixed_blob_sidecar_list(blobs)
.map(|blobs| (blobs, timestamp_now()))
.map_err(Into::into),
None => return None,
RpcEvent::RPCError(e) => Err((e.into(), request.remove().resolve())),
};
match resp {
Ok(resp) => Some(Ok(resp)),
// Track if this request has already returned some value downstream. Ensure that
// downstream code only receives a single Result per request. If the serving peer does
// multiple penalizable actions per request, downscore and return None. This allows to
// catch if a peer is returning more blobs than requested or if the excess blobs are
// invalid.
Err((e, resolved)) => {
if let LookupFailure::LookupVerifyError(e) = &e {
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
}
if resolved {
None
} else {
Some(Err(e))
}
}
RpcEvent::RPCError(e) => {
request.remove();
Err(e.into())
}
})
}
}
pub fn send_block_for_processing(

View File

@@ -9,6 +9,7 @@ use types::{
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
pub enum LookupVerifyError {
NoResponseReturned,
NotEnoughResponsesReturned { expected: usize, actual: usize },
TooManyResponses,
UnrequestedBlockRoot(Hash256),
UnrequestedBlobIndex(u64),
@@ -139,11 +140,20 @@ impl<E: EthSpec> ActiveBlobsByRootRequest<E> {
}
}
pub fn terminate(self) -> Option<Vec<Arc<BlobSidecar<E>>>> {
pub fn terminate(self) -> Result<(), LookupVerifyError> {
if self.resolved {
None
Ok(())
} else {
Some(self.blobs)
Err(LookupVerifyError::NotEnoughResponsesReturned {
expected: self.request.indices.len(),
actual: self.blobs.len(),
})
}
}
/// Mark request as resolved (= has returned something downstream) while marking this status as
/// true for future calls.
pub fn resolve(&mut self) -> bool {
std::mem::replace(&mut self.resolved, true)
}
}