diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 8c6be1f5da..41cffa0301 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -70,7 +70,7 @@ use crate::{ use derivative::Derivative; use eth2::types::EventKind; use execution_layer::PayloadStatus; -use fork_choice::{AttestationFromBlock, PayloadVerificationStatus}; +pub use fork_choice::{AttestationFromBlock, PayloadVerificationStatus}; use parking_lot::RwLockReadGuard; use proto_array::Block as ProtoBlock; use safe_arith::ArithError; diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index dd573f5cc9..fb577b9a43 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -11,7 +11,7 @@ use slot_clock::SlotClock; use ssz_types::{Error, FixedVector, VariableList}; use state_processing::per_block_processing::deneb::deneb::verify_kzg_commitments_against_transactions; use std::collections::hash_map::{Entry, OccupiedEntry}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use strum::IntoStaticStr; use types::beacon_block_body::KzgCommitments; @@ -142,75 +142,61 @@ impl DataAvailabilityChecker { .map_or(false, |cache| cache.executed_block.is_some()) } - pub fn get_missing_blob_ids(&self, block_root: &Hash256) -> Vec { - let epoch = self.slot_clock.now().map(|s| s.epoch(T::slots_per_epoch())); - if epoch.map_or(false, |e| self.da_check_required(e)) { - self.availability_cache - .read() - .get(block_root) - .and_then(|cache| { - cache.executed_block.as_ref().map(|block| { - block.get_filtered_blob_ids(|i, _| cache.verified_blobs.get(i).is_none()) + pub fn get_missing_blob_ids_checking_cache( + &self, + block_root: Hash256, + ) -> Option> { + let guard = self.availability_cache.read(); + let (block, blob_indices) = guard + .get(&block_root) + .map(|cache| { + let block_opt = cache + .executed_block + .as_ref() + .map(|block| &block.block.block); + let blobs = cache + .verified_blobs + .iter() + .enumerate() + .filter_map(|(i, maybe_blob)| maybe_blob.as_ref().map(|_| i)) + .collect::>(); + (block_opt, blobs) + }) + .unwrap_or_default(); + self.get_missing_blob_ids(block_root, block, Some(blob_indices)) + } + + /// A `None` indicates blobs are not required. + /// + /// If there's no block, all possible ids will be returned that don't exist in the given blobs. + /// If there no blobs, all possible ids will be returned. + pub fn get_missing_blob_ids( + &self, + block_root: Hash256, + block_opt: Option<&Arc>>, + blobs_opt: Option>, + ) -> Option> { + let epoch = self.slot_clock.now()?.epoch(T::slots_per_epoch()); + + self.da_check_required(epoch).then(|| { + block_opt + .map(|block| { + block.get_filtered_blob_ids(Some(block_root), |i, _| { + blobs_opt.as_ref().map_or(true, |blobs| !blobs.contains(&i)) }) }) .unwrap_or_else(|| { let mut blob_ids = Vec::with_capacity(T::max_blobs_per_block()); for i in 0..T::max_blobs_per_block() { - blob_ids.push(BlobIdentifier { - block_root: *block_root, - index: i as u64, - }); + if blobs_opt.as_ref().map_or(true, |blobs| !blobs.contains(&i)) { + blob_ids.push(BlobIdentifier { + block_root, + index: i as u64, + }); + } } blob_ids }) - } else { - vec![] - } - } - - pub fn wrap_block( - &self, - block_root: Hash256, - block: Arc>, - blobs: FixedBlobSidecarList, - ) -> Result, AvailabilityCheckError> { - Ok(match self.get_blob_requirements(&block)? { - BlobRequirements::EmptyBlobs => BlockWrapper::Block(block), - BlobRequirements::NotRequired => BlockWrapper::Block(block), - BlobRequirements::PreDeneb => BlockWrapper::Block(block), - BlobRequirements::Required => { - let expected_num_blobs = block - .message() - .body() - .blob_kzg_commitments() - .map(|commitments| commitments.len()) - .unwrap_or(0); - - let mut blob_count = 0; - while let Some((index, Some(blob))) = blobs.iter().enumerate().next() { - blob_count += 1; - if blob.block_root != block_root { - return Err(AvailabilityCheckError::BlockBlobRootMismatch { - block_root, - blob_block_root: blob.block_root, - }); - } - - let expected_index = index as u64; - if expected_index != blob.index { - return Err(AvailabilityCheckError::UnorderedBlobs { - expected_index, - blob_index: blob.index, - }); - } - } - - if blob_count < expected_num_blobs { - return Err(AvailabilityCheckError::MissingBlobs(block_root)); - } - - BlockWrapper::BlockAndBlobs(block, blobs) - } }) } @@ -567,12 +553,11 @@ impl AvailabilityPendingBlock { self.block.slot() } pub fn num_blobs_expected(&self) -> usize { - self.kzg_commitments() - .map_or(0, |commitments| commitments.len()) + self.block.num_expected_blobs() } pub fn get_all_blob_ids(&self, block_root: Option) -> Vec { - self.get_filtered_blob_ids(block_root, |_, _| true) + self.block.get_expected_blob_ids(block_root) } pub fn get_filtered_blob_ids( @@ -580,18 +565,7 @@ impl AvailabilityPendingBlock { block_root: Option, filter: impl Fn(usize, Hash256) -> bool, ) -> Vec { - let block_root = block_root.unwrap_or_else(|| self.as_block().canonical_root()); - let num_blobs_expected = self.num_blobs_expected(); - let mut blob_ids = Vec::with_capacity(num_blobs_expected); - for i in 0..num_blobs_expected { - if filter(i, block_root) { - blob_ids.push(BlobIdentifier { - block_root, - index: i as u64, - }); - } - } - blob_ids + self.block.get_filtered_blob_ids(block_root, filter) } } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index fa3287a4ef..74c99f18d9 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -69,7 +69,9 @@ pub use self::historical_blocks::HistoricalBlockError; pub use attestation_verification::Error as AttestationError; pub use beacon_fork_choice_store::{BeaconForkChoiceStore, Error as ForkChoiceStoreError}; pub use block_verification::{ - get_block_root, BlockError, ExecutedBlock, ExecutionPayloadError, GossipVerifiedBlock, + get_block_root, AvailabilityPendingExecutedBlock, BlockError, ExecutedBlock, + ExecutionPayloadError, GossipVerifiedBlock, IntoExecutionPendingBlock, + PayloadVerificationOutcome, PayloadVerificationStatus, }; pub use canonical_head::{CachedHead, CanonicalHead, CanonicalHeadRwLock}; pub use eth1_chain::{Eth1Chain, Eth1ChainBackend}; diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index b057961c4e..2abdaf295f 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -58,7 +58,7 @@ impl Worker { ) { if !should_process { // Sync handles these results - self.send_sync_message(SyncMessage::BlockPartProcessed { + self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, result: crate::sync::manager::BlockProcessingResult::Ignored, response_type: crate::sync::manager::ResponseType::Block, @@ -192,7 +192,7 @@ impl Worker { } } // Sync handles these results - self.send_sync_message(SyncMessage::BlockPartProcessed { + self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, result: result.into(), response_type: ResponseType::Block, @@ -229,7 +229,7 @@ impl Worker { .await; // Sync handles these results - self.send_sync_message(SyncMessage::BlockPartProcessed { + self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, result: result.into(), response_type: ResponseType::Blob, diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index e7fde333df..595d5dbbfd 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -1,5 +1,5 @@ use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; -use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker}; +use beacon_chain::data_availability_checker::{DataAvailabilityChecker}; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; use lighthouse_network::rpc::RPCError; use lighthouse_network::{PeerAction, PeerId}; @@ -15,7 +15,7 @@ use strum::Display; use types::blob_sidecar::FixedBlobSidecarList; use types::{BlobSidecar, SignedBeaconBlock, Slot}; -use self::parent_lookup::{LookupDownloadStatus, PARENT_FAIL_TOLERANCE}; +use self::parent_lookup::PARENT_FAIL_TOLERANCE; use self::parent_lookup::{ParentLookup, ParentVerifyError}; use self::single_block_lookup::{LookupVerifyError, SingleBlockLookup}; use super::manager::BlockProcessingResult; @@ -26,6 +26,7 @@ use super::{ }; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; use crate::metrics; +use crate::sync::block_lookups::single_block_lookup::UnknownParentComponents; mod parent_lookup; mod single_block_lookup; @@ -111,12 +112,6 @@ impl PeerShouldHave { PeerShouldHave::Neither(_) => false, } } - fn should_have_blobs(&self) -> bool { - match self { - PeerShouldHave::BlockAndBlobs(_) => true, - PeerShouldHave::Neither(_) => false, - } - } } #[derive(Debug, Copy, Clone)] @@ -146,19 +141,37 @@ impl BlockLookups { pub fn search_block( &mut self, - hash: Hash256, + block_root: Hash256, peer_source: PeerShouldHave, cx: &mut SyncNetworkContext, ) { - self.search_block_with(|_| {}, hash, peer_source, cx) + self.search_block_with(block_root, None, None, peer_source, cx) + } + + pub fn search_current_unknown_parent_block_and_blobs( + &mut self, + block_root: Hash256, + block: Option>>, + blobs: Option>, + peer_id: PeerId, + cx: &mut SyncNetworkContext, + ) { + self.search_block_with( + block_root, + block, + blobs, + PeerShouldHave::Neither(peer_id), + cx, + ); } /// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is /// constructed. pub fn search_block_with( &mut self, - cache_fn: impl Fn(&mut SingleBlockLookup), - hash: Hash256, + block_root: Hash256, + block: Option>>, + blobs: Option>, peer_source: PeerShouldHave, cx: &mut SyncNetworkContext, ) { @@ -167,14 +180,15 @@ impl BlockLookups { .single_block_lookups .iter_mut() .any(|(_, _, single_block_request)| { - single_block_request.add_peer_if_useful(&hash, peer_source) + single_block_request.add_peer_if_useful(&block_root, peer_source) }) { return; } if self.parent_lookups.iter_mut().any(|parent_req| { - parent_req.add_peer_if_useful(&hash, peer_source) || parent_req.contains_block(&hash) + parent_req.add_peer_if_useful(&block_root, peer_source) + || parent_req.contains_block(&block_root) }) { // If the block was already downloaded, or is being downloaded in this moment, do not // request it. @@ -184,7 +198,7 @@ impl BlockLookups { if self .processing_parent_lookups .values() - .any(|(hashes, _last_parent_request)| hashes.contains(&hash)) + .any(|(hashes, _last_parent_request)| hashes.contains(&block_root)) { // we are already processing this block, ignore it. return; @@ -193,13 +207,26 @@ impl BlockLookups { debug!( self.log, "Searching for block"; - "peer_id" => %peer_source, - "block" => %hash + "peer_id" => ?peer_source, + "block" => ?block_root ); - let mut single_block_request = - SingleBlockLookup::new(hash, peer_source, self.da_checker.clone()); - cache_fn(&mut single_block_request); + let triggered_by_unknown_parent = block.is_some() || blobs.is_some(); + let unknown_parent_components = if triggered_by_unknown_parent { + Some(UnknownParentComponents { + downloaded_block: block, + downloaded_blobs: blobs.unwrap_or_default(), + }) + } else { + None + }; + + let mut single_block_request = SingleBlockLookup::new( + block_root, + unknown_parent_components, + peer_source, + self.da_checker.clone(), + ); let block_request_id = if let Ok(Some((peer_id, block_request))) = single_block_request.request_block() { @@ -224,40 +251,6 @@ impl BlockLookups { ); } - pub fn search_current_unknown_parent( - &mut self, - block_root: Hash256, - block: BlockWrapper, - peer_id: PeerId, - cx: &mut SyncNetworkContext, - ) { - self.search_block_with( - |request| { - let _ = request.add_block_wrapper(block_root, block.clone()); - }, - block_root, - PeerShouldHave::Neither(peer_id), - cx, - ); - } - - pub fn search_current_unknown_blob_parent( - &mut self, - blob: Arc>, - peer_id: PeerId, - cx: &mut SyncNetworkContext, - ) { - let block_root = blob.block_root; - self.search_block_with( - |request| { - let _ = request.add_blob(blob.clone()); - }, - block_root, - PeerShouldHave::Neither(peer_id), - cx, - ); - } - /// If a block is attempted to be processed but we do not know its parent, this function is /// called in order to find the block's parent. pub fn search_parent( @@ -268,7 +261,7 @@ impl BlockLookups { peer_id: PeerId, cx: &mut SyncNetworkContext, ) { - // Gossip blocks or blobs shouldn't be propogated if parents are unavailable. + // Gossip blocks or blobs shouldn't be propagated if parents are unavailable. let peer_source = PeerShouldHave::BlockAndBlobs(peer_id); // If this block or it's parent is part of a known failed chain, ignore it. @@ -319,33 +312,24 @@ impl BlockLookups { let stream_terminator = block.is_none().into(); let log = self.log.clone(); - let Some((triggered_parent_request, request_id_ref, request_ref)) = self.find_single_lookup_request(id, stream_terminator, ResponseType::Block) else { + let Some((pending_parent_request, request_id_ref, request_ref)) = self.find_single_lookup_request(id, stream_terminator, ResponseType::Block) else { return; }; let should_remove = match request_ref.verify_block(block) { - Ok(Some((root, block))) => { - if triggered_parent_request { - if let LookupDownloadStatus::AvailabilityCheck(e) = - request_ref.add_block(root, block) - { - handle_availability_check_error( - request_id_ref, - request_ref, - ResponseType::Blob, - peer_id, - e, - cx, - &log, - ) - } else { - ShouldRemoveLookup::False - } - } else { + Ok(Some((block_root, block))) => { + if let Some(parent_components) = request_ref.unknown_parent_components.as_mut() { + parent_components.add_unknown_parent_block(block.clone()); + }; + + if !pending_parent_request { + let block_wrapper = request_ref + .get_downloaded_block() + .unwrap_or(BlockWrapper::Block(block)); // This is the correct block, send it for processing match self.send_block_for_processing( - root, - BlockWrapper::Block(block), + block_root, + block_wrapper, seen_timestamp, BlockProcessType::SingleBlock { id }, cx, @@ -353,6 +337,8 @@ impl BlockLookups { Ok(()) => ShouldRemoveLookup::False, Err(()) => ShouldRemoveLookup::True, } + } else { + ShouldRemoveLookup::False } } Ok(None) => ShouldRemoveLookup::False, @@ -390,26 +376,32 @@ impl BlockLookups { let log = self.log.clone(); - let Some((triggered_parent_request, request_id_ref, request_ref)) = + let Some((pending_parent_requests, request_id_ref, request_ref)) = self.find_single_lookup_request(id, stream_terminator, ResponseType::Blob) else { return; }; let should_remove = match request_ref.verify_blob(blob) { Ok(Some((block_root, blobs))) => { - if triggered_parent_request { - if let LookupDownloadStatus::AvailabilityCheck(e) = - request_ref.add_blobs(block_root, blobs) - { - handle_availability_check_error( - request_id_ref, - request_ref, - ResponseType::Blob, - peer_id, - e, - cx, - &log, - ) + if let Some(parent_components) = request_ref.unknown_parent_components.as_mut() { + parent_components.add_unknown_parent_blobs(blobs); + + if !pending_parent_requests { + request_ref + .get_downloaded_block() + .map(|block| { + match self.send_block_for_processing( + block_root, + block, + seen_timestamp, + BlockProcessType::SingleBlock { id }, + cx, + ) { + Ok(()) => ShouldRemoveLookup::False, + Err(()) => ShouldRemoveLookup::True, + } + }) + .unwrap_or(ShouldRemoveLookup::False) } else { ShouldRemoveLookup::False } @@ -525,40 +517,41 @@ impl BlockLookups { match parent_lookup.verify_block(block, &mut self.failed_chains) { Ok(Some((block_root, block))) => { - let process_or_search = parent_lookup.add_block(block_root, block); - match process_or_search { - LookupDownloadStatus::Process(wrapper) => { - let chain_hash = parent_lookup.chain_hash(); - if self - .send_block_for_processing( - block_root, - wrapper, - seen_timestamp, - BlockProcessType::ParentLookup { chain_hash }, - cx, - ) - .is_ok() - { - self.parent_lookups.push(parent_lookup) - } + parent_lookup.add_current_request_block(block); + if let Some(block_wrapper) = + parent_lookup.current_parent_request.get_downloaded_block() + { + let chain_hash = parent_lookup.chain_hash(); + if self + .send_block_for_processing( + block_root, + block_wrapper, + seen_timestamp, + BlockProcessType::ParentLookup { chain_hash }, + cx, + ) + .is_ok() + { + self.parent_lookups.push(parent_lookup) } - LookupDownloadStatus::SearchBlock(block_root) => { - if let Some(peer_source) = - parent_lookup.peer_source(ResponseType::Block, peer_id) - { - self.search_block(block_root, peer_source, cx); - self.parent_lookups.push(parent_lookup) - } else { - warn!(self.log, "Response from untracked peer"; "peer_id" => %peer_id, "block_root" => ?block_root); + } else { + let outstanding_blobs_req = + parent_lookup.current_parent_blob_request_id.is_some(); + if !outstanding_blobs_req { + if let Ok(peer_id) = parent_lookup + .current_parent_request + .downloading_peer(ResponseType::Blob) { + cx.report_peer( + peer_id.to_peer_id(), + PeerAction::MidToleranceError, + "bbroot_failed_chains", + ); } + + self.request_parent_blob(parent_lookup, cx); + } else { + self.parent_lookups.push(parent_lookup) } - LookupDownloadStatus::AvailabilityCheck(e) => self.handle_invalid_block( - BlockError::AvailabilityCheck(e), - peer_id, - cx, - ResponseType::Block, - parent_lookup, - ), } } Ok(None) => { @@ -629,11 +622,9 @@ impl BlockLookups { seen_timestamp: Duration, cx: &mut SyncNetworkContext, ) { - let mut parent_lookup = if let Some(pos) = self - .parent_lookups - .iter() - .position(|request| request.pending_blob_response(id)) - { + let mut parent_lookup = if let Some(pos) = self.parent_lookups.iter().position(|request| { + request.pending_blob_response(id) + }) { self.parent_lookups.remove(pos) } else { if blob.is_some() { @@ -644,40 +635,25 @@ impl BlockLookups { match parent_lookup.verify_blob(blob, &mut self.failed_chains) { Ok(Some((block_root, blobs))) => { - let processed_or_search = parent_lookup.add_blobs(block_root, blobs); - match processed_or_search { - LookupDownloadStatus::Process(wrapper) => { - let chain_hash = parent_lookup.chain_hash(); - if self - .send_block_for_processing( - block_root, - wrapper, - seen_timestamp, - BlockProcessType::ParentLookup { chain_hash }, - cx, - ) - .is_ok() - { - self.parent_lookups.push(parent_lookup) - } + parent_lookup.add_current_request_blobs(blobs); + let chain_hash = parent_lookup.chain_hash(); + if let Some(block_wrapper) = + parent_lookup.current_parent_request.get_downloaded_block() + { + if self + .send_block_for_processing( + block_root, + block_wrapper, + seen_timestamp, + BlockProcessType::ParentLookup { chain_hash }, + cx, + ) + .is_ok() + { + self.parent_lookups.push(parent_lookup) } - LookupDownloadStatus::SearchBlock(block_root) => { - if let Some(peer_source) = - parent_lookup.peer_source(ResponseType::Block, peer_id) - { - self.search_block(block_root, peer_source, cx); - self.parent_lookups.push(parent_lookup) - } else { - warn!(self.log, "Response from untracked peer"; "peer_id" => %peer_id, "block_root" => ?block_root); - } - } - LookupDownloadStatus::AvailabilityCheck(e) => self.handle_invalid_block( - BlockError::AvailabilityCheck(e), - peer_id, - cx, - ResponseType::Blob, - parent_lookup, - ), + } else { + self.parent_lookups.push(parent_lookup) } } Ok(None) => { @@ -865,7 +841,7 @@ impl BlockLookups { /* Processing responses */ - pub fn single_block_processed( + pub fn single_block_component_processed( &mut self, target_id: Id, result: BlockProcessingResult, @@ -874,19 +850,21 @@ impl BlockLookups { ) { let lookup_components_opt = self.single_block_lookups.iter_mut().enumerate().find_map( |(index, (block_id_opt, blob_id_opt, req))| { - let id_filter = |id: &&mut Id| -> bool { target_id == **id }; - - let block_id = block_id_opt.as_mut().filter(id_filter); - let blob_id = blob_id_opt.as_mut().filter(id_filter); - block_id.or(blob_id).map(|id_ref| (index, id_ref, req)) + let id_filter = |id: &Id| -> bool { target_id == *id }; + let block_matches = block_id_opt.as_ref().map(id_filter).unwrap_or(false); + let blob_matches = blob_id_opt.as_ref().map(id_filter).unwrap_or(false); + if !block_matches && !blob_matches { + return None; + } + Some((index, block_id_opt, blob_id_opt, req)) }, ); - let (index, request_id_ref, request_ref) = match lookup_components_opt { + let (index, block_id_ref, blob_id_ref, request_ref) = match lookup_components_opt { Some(req) => req, None => { return debug!( self.log, - "Block processed for single block lookup not present" + "Block component processed for single block lookup not present" ); } }; @@ -906,8 +884,115 @@ impl BlockLookups { ShouldRemoveLookup::True } AvailabilityProcessingStatus::MissingComponents(_, block_root) => { - self.search_block(block_root, peer_id, cx); - ShouldRemoveLookup::False + // if peer should have both, and missing components is received after we've + // processed the opposite, then we can downscore. + let should_remove = match response_type { + ResponseType::Block => { + if request_ref.blob_request_state.component_processed { + match request_ref.processing_peer(ResponseType::Blob) { + Ok(PeerShouldHave::BlockAndBlobs(other_peer)) => { + cx.report_peer( + peer_id.to_peer_id(), + PeerAction::MidToleranceError, + "single_block_failure", + ); + if let Some(blob_id_ref) = blob_id_ref { + // Try it again if possible. + retry_request_after_failure( + blob_id_ref, + request_ref, + ResponseType::Blob, + peer_id.as_peer_id(), + cx, + &self.log, + ) + } else { + ShouldRemoveLookup::False + } + } + Ok(PeerShouldHave::Neither(other_peer)) => { + request_ref + .blob_request_state + .remove_peer_if_useless(&other_peer); + if let Some(blob_id_ref) = blob_id_ref { + // Try it again if possible. + retry_request_after_failure( + blob_id_ref, + request_ref, + ResponseType::Blob, + peer_id.as_peer_id(), + cx, + &self.log, + ) + } else { + ShouldRemoveLookup::False + } + } + Err(()) => { + //TODO(sean) retry? + ShouldRemoveLookup::False + } + } + } else { + request_ref.block_request_state.component_processed = true; + ShouldRemoveLookup::False + } + } + ResponseType::Blob => { + if request_ref.block_request_state.component_processed { + match request_ref.processing_peer(ResponseType::Blob) { + Ok(PeerShouldHave::BlockAndBlobs(other_peer)) => { + cx.report_peer( + other_peer, + PeerAction::MidToleranceError, + "single_block_failure", + ); + if let Some(blob_id_ref) = blob_id_ref { + // Try it again if possible. + retry_request_after_failure( + blob_id_ref, + request_ref, + ResponseType::Blob, + peer_id.as_peer_id(), + cx, + &self.log, + ) + } else { + ShouldRemoveLookup::False + } + } + Ok(PeerShouldHave::Neither(other_peer)) => { + request_ref + .blob_request_state + .remove_peer_if_useless(&other_peer); + if let Some(blob_id_ref) = blob_id_ref { + // Try it again if possible. + retry_request_after_failure( + blob_id_ref, + request_ref, + ResponseType::Blob, + peer_id.as_peer_id(), + cx, + &self.log, + ) + } else { + ShouldRemoveLookup::False + } + } + Err(()) => { + //TODO(sean) retry? + ShouldRemoveLookup::False + } + } + } else { + // retry block here? + + request_ref.blob_request_state.component_processed = true; + ShouldRemoveLookup::False + } + } + }; + should_remove } }, BlockProcessingResult::Ignored => { @@ -960,15 +1045,29 @@ impl BlockLookups { PeerAction::MidToleranceError, "single_block_failure", ); - // Try it again if possible. - retry_request_after_failure( - request_id_ref, - request_ref, - response_type, - peer_id.as_peer_id(), - cx, - &self.log, - ) + if let Some(blob_id_ref) = blob_id_ref { + // Try it again if possible. + retry_request_after_failure( + blob_id_ref, + request_ref, + ResponseType::Blob, + peer_id.as_peer_id(), + cx, + &self.log, + ) + } else if let Some(block_id_ref) = block_id_ref { + // Try it again if possible. + retry_request_after_failure( + block_id_ref, + request_ref, + ResponseType::Block, + peer_id.as_peer_id(), + cx, + &self.log, + ) + } else { + ShouldRemoveLookup::True + } } } } @@ -1041,7 +1140,7 @@ impl BlockLookups { self.search_block(block_root, peer_id, cx); } BlockProcessingResult::Err(BlockError::ParentUnknown(block)) => { - parent_lookup.add_block_wrapper(block); + parent_lookup.add_unknown_parent_block(block); self.request_parent_block_and_blobs(parent_lookup, cx); } BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_)) @@ -1057,10 +1156,22 @@ impl BlockLookups { ); } }; - let (chain_hash, blocks, hashes, block_request) = + let (chain_hash, mut blocks, hashes, block_request) = parent_lookup.parts_for_processing(); + if let Some(child_block) = + self.single_block_lookups + .iter_mut() + .find_map(|(_, _, req)| { + if req.requested_block_root == chain_hash { + req.get_downloaded_block() + } else { + None + } + }) + { + blocks.push(child_block); + }; let process_id = ChainSegmentProcessId::ParentLookup(chain_hash); - let work = WorkEvent::chain_segment(process_id, blocks); match beacon_processor_send.try_send(work) { @@ -1090,13 +1201,7 @@ impl BlockLookups { ); } BlockProcessingResult::Err(outcome) => { - self.handle_invalid_block( - outcome, - peer_id.to_peer_id(), - cx, - response_type, - parent_lookup, - ); + self.handle_invalid_block(outcome, peer_id.to_peer_id(), cx, parent_lookup); } BlockProcessingResult::Ignored => { // Beacon processor signalled to ignore the block processing result. @@ -1120,7 +1225,6 @@ impl BlockLookups { outcome: BlockError<::EthSpec>, peer_id: PeerId, cx: &mut SyncNetworkContext, - response_type: ResponseType, mut parent_lookup: ParentLookup, ) { // all else we consider the chain a failure and downvote the peer that sent @@ -1135,16 +1239,11 @@ impl BlockLookups { // ambiguity. cx.report_peer(peer_id, PeerAction::MidToleranceError, "parent_request_err"); // Try again if possible - match response_type { - ResponseType::Block => { - parent_lookup.block_processing_failed(); - self.request_parent_block(parent_lookup, cx); - } - ResponseType::Blob => { - parent_lookup.blob_processing_failed(); - self.request_parent_blob(parent_lookup, cx); - } - } + //TODO(sean) I don't see why we want to retry here, because this block is invalid and has + // passed block root validation, so querying again by block root will yield the same result. + parent_lookup.block_processing_failed(); + parent_lookup.blob_processing_failed(); + self.request_parent_block_and_blobs(parent_lookup, cx); } pub fn parent_chain_processed( @@ -1262,6 +1361,10 @@ impl BlockLookups { process_type: BlockProcessType, cx: &mut SyncNetworkContext, ) -> Result<(), ()> { + let blob_count = blobs.iter().filter(|b| b.is_some()).count(); + if blob_count == 0 { + return Ok(()); + } match cx.processor_channel_if_enabled() { Some(beacon_processor_send) => { trace!(self.log, "Sending blobs for processing"; "block" => ?block_root, "process_type" => ?process_type); @@ -1385,35 +1488,7 @@ impl BlockLookups { } } -fn handle_availability_check_error( - request_id_ref: &mut u32, - request_ref: &mut SingleBlockLookup, - response_type: ResponseType, - peer_id: PeerId, - e: AvailabilityCheckError, - cx: &mut SyncNetworkContext, - log: &Logger, -) -> ShouldRemoveLookup { - warn!(log, "Peer sent message in single block lookup causing failed availability check"; - "root" => ?request_ref.requested_block_root, - "error" => ?e, - "peer_id" => %peer_id, - "response_type" => ?response_type - ); - cx.report_peer( - peer_id, - PeerAction::MidToleranceError, - "single_block_failure", - ); - retry_request_after_failure( - request_id_ref, - request_ref, - ResponseType::Blob, - &peer_id, - cx, - log, - ) -} + fn handle_block_lookup_verify_error( request_id_ref: &mut u32, diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 1441d26062..996d7561f6 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -1,6 +1,7 @@ use super::single_block_lookup::{LookupRequestError, LookupVerifyError, SingleBlockLookup}; use super::{DownloadedBlocks, PeerShouldHave, ResponseType}; -use crate::sync::block_lookups::{single_block_lookup, RootBlobsTuple, RootBlockTuple}; +use crate::sync::block_lookups::single_block_lookup::{State, UnknownParentComponents}; +use crate::sync::block_lookups::{RootBlobsTuple, RootBlockTuple}; use crate::sync::{ manager::{Id, SLOT_IMPORT_TOLERANCE}, network_context::SyncNetworkContext, @@ -32,8 +33,8 @@ pub(crate) struct ParentLookup { /// Request of the last parent. pub current_parent_request: SingleBlockLookup, /// Id of the last parent request. - current_parent_request_id: Option, - current_parent_blob_request_id: Option, + pub current_parent_request_id: Option, + pub current_parent_blob_request_id: Option, } #[derive(Debug, PartialEq, Eq, IntoStaticStr)] @@ -62,6 +63,7 @@ pub enum RequestError { NoPeers, } +#[derive(Debug)] pub enum LookupDownloadStatus { Process(BlockWrapper), SearchBlock(Hash256), @@ -93,7 +95,8 @@ impl ParentLookup { peer_id: PeerShouldHave, da_checker: Arc>, ) -> Self { - let current_parent_request = SingleBlockLookup::new(parent_root, peer_id, da_checker); + let current_parent_request = + SingleBlockLookup::new(parent_root, Some(<_>::default()), peer_id, da_checker); Self { chain_hash: block_root, @@ -165,39 +168,44 @@ impl ParentLookup { .check_peer_disconnected(peer_id) } - pub fn add_block_wrapper(&mut self, block: BlockWrapper) { + pub fn add_unknown_parent_block(&mut self, block: BlockWrapper) { let next_parent = block.parent_root(); + + // Cache the block. let current_root = self.current_parent_request.requested_block_root; - self.downloaded_blocks.push((current_root, block)); + + // Update the block request. self.current_parent_request.requested_block_root = next_parent; - - self.current_parent_request.block_request_state.state = - single_block_lookup::State::AwaitingDownload; - self.current_parent_request.blob_request_state.state = - single_block_lookup::State::AwaitingDownload; + self.current_parent_request.block_request_state.state = State::AwaitingDownload; self.current_parent_request_id = None; + + // Update the blobs request. + self.current_parent_request.blob_request_state.state = State::AwaitingDownload; self.current_parent_blob_request_id = None; - self.current_parent_request.downloaded_block = None; - self.current_parent_request.downloaded_blobs = <_>::default(); + + // Reset the unknown parent components. + self.current_parent_request.unknown_parent_components = + Some(UnknownParentComponents::default()); } - pub fn add_block( + pub fn add_current_request_block( &mut self, - block_root: Hash256, block: Arc>, - ) -> LookupDownloadStatus { + ) { + // Cache the block. + self.current_parent_request.add_unknown_parent_block(block); + + // Update the request. self.current_parent_request_id = None; - self.current_parent_request.add_block(block_root, block) } - pub fn add_blobs( - &mut self, - block_root: Hash256, - blobs: FixedBlobSidecarList, - ) -> LookupDownloadStatus { + pub fn add_current_request_blobs(&mut self, blobs: FixedBlobSidecarList) { + // Cache the blobs. + self.current_parent_request.add_unknown_parent_blobs(blobs); + + // Update the request. self.current_parent_blob_request_id = None; - self.current_parent_request.add_blobs(block_root, blobs) } pub fn pending_block_response(&self, req_id: Id) -> bool { @@ -258,7 +266,10 @@ impl ParentLookup { self.current_parent_request .block_request_state .register_failure_processing(); - self.current_parent_request.downloaded_block = None; + self.current_parent_request + .unknown_parent_components + .as_mut() + .map(|components| components.downloaded_block = None); self.current_parent_request_id = None; } @@ -266,8 +277,10 @@ impl ParentLookup { self.current_parent_request .blob_request_state .register_failure_processing(); - //TODO(sean) can make this only clear the blobs that failed to process - self.current_parent_request.downloaded_blobs = <_>::default(); + self.current_parent_request + .unknown_parent_components + .as_mut() + .map(|components| components.downloaded_blobs = <_>::default()); self.current_parent_blob_request_id = None; } @@ -351,15 +364,6 @@ impl ParentLookup { .iter(), } } - - pub(crate) fn peer_source( - &self, - response_type: ResponseType, - peer_id: PeerId, - ) -> Option { - self.current_parent_request - .peer_source(response_type, peer_id) - } } impl From for ParentVerifyError { diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index b4aaaaba66..e68a7e7f81 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,4 +1,3 @@ -use crate::sync::block_lookups::parent_lookup::LookupDownloadStatus; use crate::sync::block_lookups::{RootBlobsTuple, RootBlockTuple}; use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::data_availability_checker::DataAvailabilityChecker; @@ -8,28 +7,55 @@ use lighthouse_network::{rpc::BlocksByRootRequest, PeerId}; use rand::seq::IteratorRandom; use ssz_types::VariableList; use std::collections::HashSet; +use std::ops::IndexMut; use std::sync::Arc; use store::Hash256; use strum::IntoStaticStr; use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; -use types::{BlobSidecar, SignedBeaconBlock}; +use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; use super::{PeerShouldHave, ResponseType}; pub struct SingleBlockLookup { pub requested_block_root: Hash256, pub requested_ids: Vec, - pub downloaded_blobs: FixedBlobSidecarList, - pub downloaded_block: Option>>, - pub expected_num_blobs: Option, + /// Where we store blobs until we receive the stream terminator. + pub blob_download_queue: FixedBlobSidecarList, pub block_request_state: SingleLookupRequestState, pub blob_request_state: SingleLookupRequestState, pub da_checker: Arc>, + /// Only necessary for requests triggered by an `UnkownParent` because any + /// blocks or blobs without parents won't hit the data availability cache. + pub unknown_parent_components: Option>, } -/// Object representing a single block lookup request. -/// -//previously assumed we would have a single block. Now we may have the block but not the blobs +#[derive(Default)] +pub struct UnknownParentComponents { + pub downloaded_block: Option>>, + pub downloaded_blobs: FixedBlobSidecarList, +} + +impl UnknownParentComponents { + pub fn add_unknown_parent_block(&mut self, block: Arc>) { + self.downloaded_block = Some(block); + } + pub fn add_unknown_parent_blobs(&mut self, blobs: FixedBlobSidecarList) { + for (index, blob_opt) in self.downloaded_blobs.iter_mut().enumerate() { + if let Some(Some(downloaded_blob)) = blobs.get(index) { + *blob_opt = Some(downloaded_blob.clone()); + } + } + } + pub fn downloaded_indices(&self) -> HashSet { + self.downloaded_blobs + .iter() + .enumerate() + .filter_map(|(i, blob_opt)| blob_opt.as_ref().map(|_| i)) + .collect::>() + } +} + +/// Object representing the state of a single block or blob lookup request. #[derive(PartialEq, Eq, Debug)] pub struct SingleLookupRequestState { /// State of this request. @@ -44,6 +70,7 @@ pub struct SingleLookupRequestState { failed_processing: u8, /// How many times have we attempted to download this block or blob. failed_downloading: u8, + pub component_processed: bool, } #[derive(Debug, PartialEq, Eq)] @@ -81,100 +108,87 @@ pub enum LookupRequestError { impl SingleBlockLookup { pub fn new( requested_block_root: Hash256, + unknown_parent_components: Option>, peer_source: PeerShouldHave, da_checker: Arc>, ) -> Self { Self { requested_block_root, requested_ids: <_>::default(), - downloaded_block: None, - downloaded_blobs: <_>::default(), - expected_num_blobs: None, + blob_download_queue: <_>::default(), block_request_state: SingleLookupRequestState::new(peer_source), blob_request_state: SingleLookupRequestState::new(peer_source), da_checker, + unknown_parent_components, } } + pub fn update_blobs_request(&mut self) { + self.requested_ids = if let Some(components) = self.unknown_parent_components.as_ref() { + let blobs = components.downloaded_indices(); + self.da_checker + .get_missing_blob_ids( + self.requested_block_root, + components.downloaded_block.as_ref(), + Some(blobs), + ) + .unwrap_or_default() + } else { + self.da_checker + .get_missing_blob_ids_checking_cache(self.requested_block_root) + .unwrap_or_default() + }; + } + pub fn get_downloaded_block(&mut self) -> Option> { - if self.requested_ids.is_empty() { - if let Some(block) = self.downloaded_block.take() { - return Some(BlockWrapper::BlockAndBlobs( - block, - self.downloaded_blobs.clone(), - )); - } - } - None - } - - pub fn add_blob( - &mut self, - blob: Arc>, - ) -> Result, LookupVerifyError> { - let block_root = blob.block_root; - - if let Some(blob_opt) = self.downloaded_blobs.get_mut(blob.index as usize) { - *blob_opt = Some(blob.clone()); - - if let Some(block) = self.downloaded_block.as_ref() { - let result = self.da_checker.wrap_block( - block_root, - block.clone(), - self.downloaded_blobs.clone(), + self.unknown_parent_components + .as_mut() + .and_then(|components| { + let downloaded_block = components.downloaded_block.as_ref(); + let downloaded_indices = components.downloaded_indices(); + let missing_ids = self.da_checker.get_missing_blob_ids( + self.requested_block_root, + downloaded_block, + Some(downloaded_indices), ); - Ok(LookupDownloadStatus::from(result)) - } else { - Ok(LookupDownloadStatus::SearchBlock(block_root)) - } + let download_complete = + missing_ids.map_or(true, |missing_ids| missing_ids.is_empty()); + if download_complete { + let UnknownParentComponents { + downloaded_block, + downloaded_blobs, + } = components; + downloaded_block.as_ref().map(|block| { + BlockWrapper::BlockAndBlobs( + block.clone(), + std::mem::replace(downloaded_blobs, FixedBlobSidecarList::default()), + ) + }) + } else { + None + } + }) + } + + pub fn add_unknown_parent_block(&mut self, block: Arc>) { + if let Some(ref mut components) = self.unknown_parent_components.as_mut() { + components.add_unknown_parent_block(block) } else { - Err(LookupVerifyError::InvalidIndex(blob.index)) + self.unknown_parent_components = Some(UnknownParentComponents { + downloaded_block: Some(block), + downloaded_blobs: FixedBlobSidecarList::default(), + }) } } - pub fn add_blobs( - &mut self, - block_root: Hash256, - blobs: FixedBlobSidecarList, - ) -> LookupDownloadStatus { - for (index, blob_opt) in self.downloaded_blobs.iter_mut().enumerate() { - if let Some(Some(downloaded_blob)) = blobs.get(index) { - *blob_opt = Some(downloaded_blob.clone()); - } - } - - if let Some(block) = self.downloaded_block.as_ref() { - self.da_checker - .wrap_block(block_root, block.clone(), blobs) - .into() + pub fn add_unknown_parent_blobs(&mut self, blobs: FixedBlobSidecarList) { + if let Some(ref mut components) = self.unknown_parent_components.as_mut() { + components.add_unknown_parent_blobs(blobs) } else { - LookupDownloadStatus::SearchBlock(block_root) - } - } - - pub fn add_block( - &mut self, - block_root: Hash256, - block: Arc>, - ) -> LookupDownloadStatus { - self.downloaded_block = Some(block.clone()); - - self.da_checker - .wrap_block(block_root, block, self.downloaded_blobs.clone()) - .into() - } - - pub fn add_block_wrapper( - &mut self, - block_root: Hash256, - block: BlockWrapper, - ) -> LookupDownloadStatus { - match block { - BlockWrapper::Block(block) => self.add_block(block_root, block), - BlockWrapper::BlockAndBlobs(block, blobs) => { - self.downloaded_block = Some(block); - self.add_blobs(block_root, blobs) - } + self.unknown_parent_components = Some(UnknownParentComponents { + downloaded_block: None, + downloaded_blobs: blobs, + }) } } @@ -202,12 +216,6 @@ impl SingleBlockLookup SingleBlockLookup SingleBlockLookup { - let downloaded_expected_blobs = self.downloaded_expected_blobs(); - if peer_source.should_have_blobs() && !downloaded_expected_blobs { - return Err(LookupVerifyError::NotEnoughBlobsReturned); - } else if !downloaded_expected_blobs { - return Err(LookupVerifyError::BenignFailure); - } self.blob_request_state.state = State::Processing { peer_id: peer_source, }; Ok(Some(( self.requested_block_root, - self.downloaded_blobs.clone(), + std::mem::replace(&mut self.blob_download_queue, <_>::default()), ))) } }, @@ -297,24 +299,17 @@ impl SingleBlockLookup bool { - let Some(expected_num_blobs) = self.expected_num_blobs else { - return true; - }; - - let downloaded_num_blobs = self - .downloaded_blobs - .iter() - .filter(|blob_opt| blob_opt.is_some()) - .count(); - downloaded_num_blobs == expected_num_blobs - } - pub fn request_block( &mut self, ) -> Result, LookupRequestError> { - if self.da_checker.has_block(&self.requested_block_root) || self.downloaded_block.is_some() - { + let block_already_downloaded = + if let Some(components) = self.unknown_parent_components.as_ref() { + components.downloaded_block.is_some() + } else { + self.da_checker.has_block(&self.requested_block_root) + }; + + if block_already_downloaded { return Ok(None); } @@ -365,10 +360,9 @@ impl SingleBlockLookup Result, LookupRequestError> { - let missing_ids = self - .da_checker - .get_missing_blob_ids(&self.requested_block_root); - if missing_ids.is_empty() || self.downloaded_block.is_some() { + self.update_blobs_request(); + + if self.requested_ids.is_empty() { return Ok(None); } @@ -388,11 +382,10 @@ impl SingleBlockLookup SingleBlockLookup SingleBlockLookup Option { + pub fn downloading_peer(&self, response_type: ResponseType) -> Result { match response_type { - ResponseType::Block => { - if self.block_request_state.available_peers.contains(&peer_id) { - Some(PeerShouldHave::BlockAndBlobs(peer_id)) - } else if self.block_request_state.potential_peers.contains(&peer_id) { - Some(PeerShouldHave::Neither(peer_id)) - } else { - None - } - } - ResponseType::Blob => { - if self.blob_request_state.available_peers.contains(&peer_id) { - Some(PeerShouldHave::BlockAndBlobs(peer_id)) - } else if self.blob_request_state.potential_peers.contains(&peer_id) { - Some(PeerShouldHave::Neither(peer_id)) - } else { - None - } - } + ResponseType::Block => self.block_request_state.peer(), + ResponseType::Blob => self.blob_request_state.peer(), } } } @@ -489,6 +461,7 @@ impl SingleLookupRequestState { used_peers: HashSet::default(), failed_processing: 0, failed_downloading: 0, + component_processed: false, } } @@ -543,6 +516,14 @@ impl SingleLookupRequestState { } } + pub fn peer(&self) -> Result { + match &self.state { + State::Processing { peer_id } => Ok(*peer_id), + State::Downloading { peer_id } => Ok(*peer_id), + _ => Err(()), + } + } + pub fn remove_peer_if_useless(&mut self, peer_id: &PeerId) { if !self.available_peers.is_empty() || self.potential_peers.len() > 1 { self.potential_peers.remove(peer_id); @@ -634,7 +615,8 @@ mod tests { Duration::from_secs(spec.seconds_per_slot), ); let da_checker = Arc::new(DataAvailabilityChecker::new(slot_clock, None, spec)); - let mut sl = SingleBlockLookup::<4, T>::new(block.canonical_root(), peer_id, da_checker); + let mut sl = + SingleBlockLookup::<4, T>::new(block.canonical_root(), None, peer_id, da_checker); sl.request_block().unwrap(); sl.verify_block(Some(block.into())).unwrap().unwrap(); } @@ -653,8 +635,12 @@ mod tests { let da_checker = Arc::new(DataAvailabilityChecker::new(slot_clock, None, spec)); - let mut sl = - SingleBlockLookup::::new(block.canonical_root(), peer_id, da_checker); + let mut sl = SingleBlockLookup::::new( + block.canonical_root(), + None, + peer_id, + da_checker, + ); for _ in 1..FAILURES { sl.request_block().unwrap(); sl.block_request_state.register_failure_downloading(); diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 70bf2243e7..a89eca2c28 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -16,7 +16,7 @@ use execution_layer::BlobsBundleV1; pub use genesis::{interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH}; use lighthouse_network::rpc::RPCResponseErrorCode; use lighthouse_network::{NetworkGlobals, Request}; -use slot_clock::TestingSlotClock; +use slot_clock::{SlotClock, TestingSlotClock}; use std::time::Duration; use store::MemoryStore; use tokio::sync::mpsc; @@ -52,6 +52,11 @@ impl TestRig { .logger(log.clone()) .deterministic_keypairs(1) .fresh_ephemeral_store() + .testing_slot_clock(TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(0), + Duration::from_secs(12), + )) .build(); let chain = harness.chain.clone(); @@ -115,7 +120,7 @@ impl TestRig { &self.harness.chain.kzg.as_ref().unwrap(), ) .unwrap(); - //TODO(sean) maybe we want to keep other random txs ? + payload.execution_payload.transactions = <_>::default(); for tx in Vec::from(transactions) { payload.execution_payload.transactions.push(tx).unwrap(); @@ -236,6 +241,14 @@ impl TestRig { ); } + #[track_caller] + fn expect_empty_beacon_processor(&mut self) { + assert_eq!( + self.beacon_processor_rx.try_recv().expect_err("must err"), + mpsc::error::TryRecvError::Empty + ); + } + #[track_caller] pub fn expect_penalty(&mut self) { match self.network_rx.try_recv() { @@ -253,6 +266,22 @@ impl TestRig { *block.message_mut().parent_root_mut() = parent_root; block } + + pub fn block_with_parent_and_blobs( + &mut self, + parent_root: Hash256, + fork_name: ForkName, + num_blobs: NumBlobs, + ) -> (SignedBeaconBlock, Vec>) { + let (mut block, mut blobs) = self.rand_block_and_blobs(fork_name, num_blobs); + *block.message_mut().parent_root_mut() = parent_root; + let block_root = block.canonical_root(); + blobs.iter_mut().for_each(|blob| { + blob.block_parent_root = parent_root; + blob.block_root = block_root; + }); + (block, blobs) + } } #[test] @@ -288,7 +317,7 @@ fn test_single_block_lookup_happy_path() { // Send the stream termination. Peer should have not been penalized, and the request removed // after processing. bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); - bl.single_block_processed( + bl.single_block_component_processed( id, BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), response_type, @@ -421,7 +450,7 @@ fn test_single_block_lookup_becomes_parent_request() { // Send the stream termination. Peer should have not been penalized, and the request moved to a // parent request after processing. - bl.single_block_processed( + bl.single_block_component_processed( id, BlockError::ParentUnknown(block.into()).into(), response_type, @@ -797,8 +826,11 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { } // Now fail processing a block in the parent request - for _ in 0..PROCESSING_FAILURES { + for i in 0..PROCESSING_FAILURES { let id = dbg!(rig.expect_parent_request(response_type)); + if matches!(fork_name, ForkName::Deneb) && i != 0 { + let _ = rig.expect_parent_request(ResponseType::Blob); + } // If we're in deneb, a blob request should have been triggered as well, // we don't require a response because we're generateing 0-blob blocks in this test. assert!(!bl.failed_chains.contains(&block_root)); @@ -939,7 +971,7 @@ fn test_single_block_lookup_ignored_response() { // after processing. bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); // Send an Ignored response, the request should be dropped - bl.single_block_processed(id, BlockProcessingResult::Ignored, response_type, &mut cx); + bl.single_block_component_processed(id, BlockProcessingResult::Ignored, response_type, &mut cx); rig.expect_empty_network(); assert_eq!(bl.single_block_lookups.len(), 0); } @@ -1100,7 +1132,13 @@ fn test_same_chain_race_condition() { mod deneb_only { use super::*; - use beacon_chain::blob_verification::BlobError; + use beacon_chain::blob_verification::{BlobError, MaybeAvailableBlock}; + use beacon_chain::data_availability_checker::AvailabilityPendingBlock; + use beacon_chain::ExecutedBlock::AvailabilityPending; + use beacon_chain::IntoExecutionPendingBlock; + use beacon_chain::PayloadVerificationOutcome; + use beacon_chain::{AvailabilityPendingExecutedBlock, NotifyExecutionLayer}; + use std::ops::IndexMut; use std::str::FromStr; struct DenebTester { @@ -1109,10 +1147,12 @@ mod deneb_only { rig: TestRig, block: Option>>, blobs: Vec>>, + parent_block: Option>>, + parent_blobs: Vec>>, peer_id: PeerId, block_req_id: Option, parent_block_req_id: Option, - blob_req_id: u32, + blob_req_id: Option, parent_blob_req_id: Option, slot: Slot, block_root: Hash256, @@ -1137,11 +1177,15 @@ mod deneb_only { ); let (block, blobs) = rig.rand_block_and_blobs(fork_name, NumBlobs::Random); let block = Arc::new(block); - let blobs = blobs.into_iter().map(Arc::new).collect::>(); + let slot = block.slot(); + let mut block_root = block.canonical_root(); + let mut block = Some(block); + let mut blobs = blobs.into_iter().map(Arc::new).collect::>(); + + let mut parent_block = None; + let mut parent_blobs = vec![]; let peer_id = PeerId::random(); - let slot = block.slot(); - let block_root = block.canonical_root(); // Trigger the request let (block_req_id, blob_req_id, parent_block_req_id, parent_blob_req_id) = @@ -1154,38 +1198,79 @@ mod deneb_only { ); let block_req_id = rig.expect_block_request(ResponseType::Block); let blob_req_id = rig.expect_block_request(ResponseType::Blob); - (Some(block_req_id), blob_req_id, None, None) + (Some(block_req_id), Some(blob_req_id), None, None) } RequestTrigger::GossipUnknownParentBlock => { - bl.search_current_unknown_parent( + let (child_block, child_blobs) = rig.block_with_parent_and_blobs( block_root, - BlockWrapper::Block(block.clone()), + get_fork_name(), + NumBlobs::Random, + ); + parent_block = Some(Arc::new(child_block)); + parent_blobs = child_blobs.into_iter().map(Arc::new).collect::>(); + std::mem::swap(&mut parent_block, &mut block); + std::mem::swap(&mut parent_blobs, &mut blobs); + + let child_block = block.as_ref().expect("block").clone(); + let child_root = child_block.canonical_root(); + let parent_root = block_root; + block_root = child_root; + bl.search_current_unknown_parent_block_and_blobs( + child_root, + Some(child_block), + None, peer_id, &mut cx, ); + let blob_req_id = rig.expect_block_request(ResponseType::Blob); rig.expect_empty_network(); // expect no block request - bl.search_parent(slot, block_root, block.parent_root(), peer_id, &mut cx); + bl.search_parent(slot, child_root, parent_root, peer_id, &mut cx); let parent_block_req_id = rig.expect_parent_request(ResponseType::Block); let parent_blob_req_id = rig.expect_parent_request(ResponseType::Blob); ( None, - blob_req_id, + Some(blob_req_id), Some(parent_block_req_id), Some(parent_blob_req_id), ) } RequestTrigger::GossipUnknownParentBlob => { - let blob = blobs.first().cloned().unwrap(); - bl.search_current_unknown_blob_parent(blob, peer_id, &mut cx); + let (child_block, child_blobs) = rig.block_with_parent_and_blobs( + block_root, + get_fork_name(), + NumBlobs::Random, + ); + + parent_block = Some(Arc::new(child_block)); + parent_blobs = child_blobs.into_iter().map(Arc::new).collect::>(); + std::mem::swap(&mut parent_block, &mut block); + std::mem::swap(&mut parent_blobs, &mut blobs); + + let child_blob = blobs.first().cloned().unwrap(); + let parent_root = block_root; + let child_root = child_blob.block_root; + block_root = child_root; + + let mut blobs = FixedBlobSidecarList::default(); + *blobs.index_mut(0) = Some(child_blob); + bl.search_current_unknown_parent_block_and_blobs( + child_root, + None, + Some(blobs), + peer_id, + &mut cx, + ); + let block_req_id = rig.expect_block_request(ResponseType::Block); - let blob_req_id = rig.expect_block_request(ResponseType::Blob); - bl.search_parent(slot, block_root, block.parent_root(), peer_id, &mut cx); + let blobs_req_id = rig.expect_block_request(ResponseType::Blob); + rig.expect_empty_network(); // expect no block request + bl.search_parent(slot, child_root, parent_root, peer_id, &mut cx); let parent_block_req_id = rig.expect_parent_request(ResponseType::Block); let parent_blob_req_id = rig.expect_parent_request(ResponseType::Blob); ( Some(block_req_id), - blob_req_id, + Some(blobs_req_id), Some(parent_block_req_id), Some(parent_blob_req_id), ) @@ -1194,7 +1279,7 @@ mod deneb_only { bl.search_block(block_root, PeerShouldHave::Neither(peer_id), &mut cx); let block_req_id = rig.expect_block_request(ResponseType::Block); let blob_req_id = rig.expect_block_request(ResponseType::Blob); - (Some(block_req_id), blob_req_id, None, None) + (Some(block_req_id), Some(blob_req_id), None, None) } }; @@ -1202,8 +1287,10 @@ mod deneb_only { bl, cx, rig, - block: Some(block), + block, blobs, + parent_block, + parent_blobs, peer_id, block_req_id, parent_block_req_id, @@ -1214,6 +1301,53 @@ mod deneb_only { }) } + fn parent_block_response(mut self) -> Self { + self.rig.expect_empty_network(); + self.bl.parent_lookup_response( + self.parent_block_req_id.expect("parent request id"), + self.peer_id, + self.parent_block.clone(), + D, + &mut self.cx, + ); + + assert_eq!(self.bl.parent_lookups.len(), 1); + self + } + + fn parent_blob_response(mut self) -> Self { + for blob in &self.parent_blobs { + dbg!("sendingblob"); + self.bl.parent_lookup_blob_response( + self.parent_blob_req_id.expect("parent blob request id"), + self.peer_id, + Some(blob.clone()), + D, + &mut self.cx, + ); + assert_eq!(self.bl.parent_lookups.len(), 1); + } + dbg!("sending stream terminator"); + self.bl.parent_lookup_blob_response( + self.parent_blob_req_id.expect("blob request id"), + self.peer_id, + None, + D, + &mut self.cx, + ); + + self + } + + fn block_response_triggering_process(mut self) -> Self { + let mut me = self.block_response(); + me.rig.expect_block_process(ResponseType::Block); + + // The request should still be active. + assert_eq!(me.bl.single_block_lookups.len(), 1); + me + } + fn block_response(mut self) -> Self { // The peer provides the correct block, should not be penalized. Now the block should be sent // for processing. @@ -1225,7 +1359,6 @@ mod deneb_only { &mut self.cx, ); self.rig.expect_empty_network(); - self.rig.expect_block_process(ResponseType::Block); // The request should still be active. assert_eq!(self.bl.single_block_lookups.len(), 1); @@ -1235,7 +1368,7 @@ mod deneb_only { fn blobs_response(mut self) -> Self { for blob in &self.blobs { self.bl.single_blob_lookup_response( - self.blob_req_id, + self.blob_req_id.expect("blob request id"), self.peer_id, Some(blob.clone()), D, @@ -1243,9 +1376,8 @@ mod deneb_only { ); assert_eq!(self.bl.single_block_lookups.len(), 1); } - // Send the blob stream termination. Peer should have not been penalized. self.bl.single_blob_lookup_response( - self.blob_req_id, + self.blob_req_id.expect("blob request id"), self.peer_id, None, D, @@ -1256,7 +1388,14 @@ mod deneb_only { fn blobs_response_was_valid(mut self) -> Self { self.rig.expect_empty_network(); - self.rig.expect_block_process(ResponseType::Blob); + if self.blobs.len() > 0 { + self.rig.expect_block_process(ResponseType::Blob); + } + self + } + + fn expect_empty_beacon_processor(mut self) -> Self { + self.rig.expect_empty_beacon_processor(); self } @@ -1273,7 +1412,29 @@ mod deneb_only { fn empty_blobs_response(mut self) -> Self { self.bl.single_blob_lookup_response( - self.blob_req_id, + self.blob_req_id.expect("blob request id"), + self.peer_id, + None, + D, + &mut self.cx, + ); + self + } + + fn empty_parent_block_response(mut self) -> Self { + self.bl.parent_lookup_response( + self.parent_block_req_id.expect("block request id"), + self.peer_id, + None, + D, + &mut self.cx, + ); + self + } + + fn empty_parent_blobs_response(mut self) -> Self { + self.bl.parent_lookup_blob_response( + self.parent_blob_req_id.expect("blob request id"), self.peer_id, None, D, @@ -1285,7 +1446,7 @@ mod deneb_only { fn block_imported(mut self) -> Self { // Missing blobs should be the request is not removed, the outstanding blobs request should // mean we do not send a new request. - self.bl.single_block_processed( + self.bl.single_block_component_processed( self.block_req_id.expect("block request id"), BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(self.block_root)), ResponseType::Block, @@ -1296,8 +1457,44 @@ mod deneb_only { self } + fn parent_block_imported(mut self) -> Self { + self.bl.parent_block_processed( + self.block_root, + BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(self.block_root)), + ResponseType::Block, + &mut self.cx, + ); + self.rig.expect_empty_network(); + assert_eq!(self.bl.parent_lookups.len(), 0); + self + } + + fn parent_block_unknown_parent(mut self) -> Self { + self.bl.parent_block_processed( + self.block_root, + BlockProcessingResult::Err(BlockError::ParentUnknown(BlockWrapper::Block( + self.parent_block.clone().expect("parent block"), + ))), + ResponseType::Block, + &mut self.cx, + ); + assert_eq!(self.bl.parent_lookups.len(), 1); + self + } + + fn invalid_parent_processed(mut self) -> Self { + self.bl.parent_block_processed( + self.block_root, + BlockProcessingResult::Err(BlockError::ProposalSignatureInvalid), + ResponseType::Block, + &mut self.cx, + ); + assert_eq!(self.bl.parent_lookups.len(), 1); + self + } + fn invalid_block_processed(mut self) -> Self { - self.bl.single_block_processed( + self.bl.single_block_component_processed( self.block_req_id.expect("block request id"), BlockProcessingResult::Err(BlockError::ProposalSignatureInvalid), ResponseType::Block, @@ -1308,8 +1505,8 @@ mod deneb_only { } fn invalid_blob_processed(mut self) -> Self { - self.bl.single_block_processed( - self.blob_req_id, + self.bl.single_block_component_processed( + self.blob_req_id.expect("blob request id"), BlockProcessingResult::Err(BlockError::BlobValidation( BlobError::ProposerSignatureInvalid, )), @@ -1321,7 +1518,7 @@ mod deneb_only { } fn missing_components_from_block_request(mut self) -> Self { - self.bl.single_block_processed( + self.bl.single_block_component_processed( self.block_req_id.expect("block request id"), BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( self.slot, @@ -1335,8 +1532,8 @@ mod deneb_only { } fn missing_components_from_blob_request(mut self) -> Self { - self.bl.single_block_processed( - self.blob_req_id, + self.bl.single_block_component_processed( + self.blob_req_id.expect("blob request id"), BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( self.slot, self.block_root, @@ -1357,11 +1554,23 @@ mod deneb_only { self } fn expect_block_request(mut self) -> Self { - self.rig.expect_block_request(ResponseType::Block); + let id = self.rig.expect_block_request(ResponseType::Block); + self.block_req_id = Some(id); self } fn expect_blobs_request(mut self) -> Self { - self.rig.expect_block_request(ResponseType::Blob); + let id = self.rig.expect_block_request(ResponseType::Blob); + self.blob_req_id = Some(id); + self + } + fn expect_parent_block_request(mut self) -> Self { + let id = self.rig.expect_parent_request(ResponseType::Block); + self.parent_block_req_id = Some(id); + self + } + fn expect_parent_blobs_request(mut self) -> Self { + let id = self.rig.expect_parent_request(ResponseType::Blob); + self.parent_blob_req_id = Some(id); self } fn expect_no_blobs_request(mut self) -> Self { @@ -1381,6 +1590,14 @@ mod deneb_only { self.blobs.push(first_blob); self } + fn expect_parent_chain_process(mut self) -> Self { + self.rig.expect_parent_chain_process(); + self + } + fn expect_block_process(mut self) -> Self { + self.rig.expect_block_process(ResponseType::Block); + self + } } fn get_fork_name() -> ForkName { @@ -1403,7 +1620,7 @@ mod deneb_only { }; tester - .block_response() + .block_response_triggering_process() .blobs_response() .blobs_response_was_valid() .block_imported(); @@ -1418,7 +1635,7 @@ mod deneb_only { tester .blobs_response() .blobs_response_was_valid() - .block_response() + .block_response_triggering_process() .block_imported(); } @@ -1434,9 +1651,12 @@ mod deneb_only { .expect_block_request() .expect_no_blobs_request() .empty_blobs_response() + .expect_empty_beacon_processor() .expect_no_penalty() .expect_no_block_request() - .expect_no_blobs_request(); + .expect_no_blobs_request() + .block_response_triggering_process() + .missing_components_from_block_request(); } #[test] @@ -1446,9 +1666,10 @@ mod deneb_only { }; tester - .block_response() + .block_response_triggering_process() .missing_components_from_block_request() .empty_blobs_response() + .missing_components_from_blob_request() .expect_penalty() .expect_blobs_request() .expect_no_block_request(); @@ -1480,7 +1701,7 @@ mod deneb_only { }; tester - .block_response() + .block_response_triggering_process() .invalid_block_processed() .expect_penalty() .expect_block_request() @@ -1499,7 +1720,7 @@ mod deneb_only { }; tester - .block_response() + .block_response_triggering_process() .missing_components_from_block_request() .blobs_response() .invalid_blob_processed() @@ -1515,9 +1736,11 @@ mod deneb_only { }; tester - .block_response() + .block_response_triggering_process() + .missing_components_from_block_request() .invalidate_blobs_too_few() .blobs_response() + .missing_components_from_blob_request() .expect_penalty() .expect_blobs_request() .expect_no_block_request(); @@ -1530,7 +1753,7 @@ mod deneb_only { }; tester - .block_response() + .block_response_triggering_process() .invalidate_blobs_too_many() .blobs_response() .expect_penalty() @@ -1546,12 +1769,11 @@ mod deneb_only { tester .invalidate_blobs_too_few() .blobs_response() - // No way to know if the response was valid before we've seen the block. .blobs_response_was_valid() .expect_no_penalty() .expect_no_blobs_request() .expect_no_block_request() - .block_response(); + .block_response_triggering_process(); } #[test] @@ -1566,7 +1788,7 @@ mod deneb_only { .expect_penalty() .expect_blobs_request() .expect_no_block_request() - .block_response(); + .block_response_triggering_process(); } #[test] @@ -1576,7 +1798,7 @@ mod deneb_only { }; tester - .block_response() + .block_response_triggering_process() .blobs_response() .blobs_response_was_valid() .block_imported(); @@ -1591,7 +1813,7 @@ mod deneb_only { tester .blobs_response() .blobs_response_was_valid() - .block_response() + .block_response_triggering_process() .block_imported(); } @@ -1609,7 +1831,9 @@ mod deneb_only { .empty_blobs_response() .expect_no_penalty() .expect_no_block_request() - .expect_no_blobs_request(); + .expect_no_blobs_request() + .block_response_triggering_process() + .missing_components_from_block_request(); } #[test] @@ -1619,9 +1843,10 @@ mod deneb_only { }; tester - .block_response() + .block_response_triggering_process() .missing_components_from_block_request() .empty_blobs_response() + .missing_components_from_blob_request() .expect_blobs_request() .expect_no_penalty() .expect_no_block_request(); @@ -1642,7 +1867,6 @@ mod deneb_only { .missing_components_from_blob_request() .empty_block_response() .expect_block_request() - // No penalty because the blob was seen over gossip .expect_no_penalty() .expect_no_blobs_request(); } @@ -1654,7 +1878,7 @@ mod deneb_only { }; tester - .block_response() + .block_response_triggering_process() .invalid_block_processed() .expect_penalty() .expect_block_request() @@ -1673,7 +1897,7 @@ mod deneb_only { }; tester - .block_response() + .block_response_triggering_process() .missing_components_from_block_request() .blobs_response() .invalid_blob_processed() @@ -1689,9 +1913,11 @@ mod deneb_only { }; tester - .block_response() + .block_response_triggering_process() + .missing_components_from_block_request() .invalidate_blobs_too_few() .blobs_response() + .missing_components_from_blob_request() .expect_blobs_request() .expect_no_penalty() .expect_no_block_request(); @@ -1704,7 +1930,7 @@ mod deneb_only { }; tester - .block_response() + .block_response_triggering_process() .invalidate_blobs_too_many() .blobs_response() .expect_penalty() @@ -1720,12 +1946,14 @@ mod deneb_only { tester .invalidate_blobs_too_few() .blobs_response() - // No way to know if the response was valid before we've seen the block. .blobs_response_was_valid() + .missing_components_from_blob_request() .expect_no_penalty() .expect_no_blobs_request() .expect_no_block_request() - .block_response(); + .block_response_triggering_process() + .missing_components_from_block_request() + .expect_blobs_request(); } #[test] @@ -1740,6 +1968,228 @@ mod deneb_only { .expect_penalty() .expect_blobs_request() .expect_no_block_request() - .block_response(); + .block_response_triggering_process(); + } + + #[test] + fn parent_block_unknown_parent() { + let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownParentBlock) else { + return; + }; + + tester + .blobs_response() + .expect_empty_beacon_processor() + .parent_block_response() + .parent_blob_response() + .expect_block_process() + .parent_block_unknown_parent() + .expect_parent_block_request() + .expect_parent_blobs_request() + .expect_empty_beacon_processor(); + } + + #[test] + fn parent_block_invalid_parent() { + let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownParentBlock) else { + return; + }; + + tester + .blobs_response() + .expect_empty_beacon_processor() + .parent_block_response() + .parent_blob_response() + .expect_block_process() + .invalid_parent_processed() + .expect_penalty() + .expect_parent_block_request() + .expect_parent_blobs_request() + .expect_empty_beacon_processor(); + } + + #[test] + fn parent_block_and_blob_lookup_parent_returned_first() { + let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownParentBlock) else { + return; + }; + + tester + .parent_block_response() + .parent_blob_response() + .expect_block_process() + .parent_block_imported() + .blobs_response() + .expect_parent_chain_process(); + } + + #[test] + fn parent_block_and_blob_lookup_child_returned_first() { + let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownParentBlock) else { + return; + }; + + tester + .blobs_response() + .expect_no_penalty() + .expect_no_block_request() + .expect_no_blobs_request() + .parent_block_response() + .parent_blob_response() + .expect_block_process() + .parent_block_imported() + .expect_parent_chain_process(); + } + + #[test] + fn empty_parent_block_then_parent_blob() { + let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownParentBlock) else { + return; + }; + + tester + .empty_parent_block_response() + .expect_penalty() + .expect_parent_block_request() + .expect_no_blobs_request() + .parent_blob_response() + .expect_empty_beacon_processor() + .parent_block_response() + .expect_block_process() + .parent_block_imported() + .blobs_response() + .expect_parent_chain_process(); + } + + #[test] + fn empty_parent_blobs_then_parent_block() { + let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownParentBlock) else { + return; + }; + + tester + .blobs_response() + .empty_parent_blobs_response() + .expect_no_penalty() + .expect_no_blobs_request() + .expect_no_block_request() + .parent_block_response() + .expect_penalty() + .expect_parent_blobs_request() + .parent_blob_response() + .expect_block_process() + .parent_block_imported() + .expect_parent_chain_process(); + } + + #[test] + fn parent_blob_unknown_parent() { + let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownParentBlob) else { + return; + }; + + tester + .block_response() + .expect_empty_beacon_processor() + .parent_block_response() + .parent_blob_response() + .expect_block_process() + .parent_block_unknown_parent() + .expect_parent_block_request() + .expect_parent_blobs_request() + .expect_empty_beacon_processor(); + } + + #[test] + fn parent_blob_invalid_parent() { + let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownParentBlob) else { + return; + }; + + tester + .block_response() + .expect_empty_beacon_processor() + .parent_block_response() + .parent_blob_response() + .expect_block_process() + .invalid_parent_processed() + .expect_penalty() + .expect_parent_block_request() + .expect_parent_blobs_request() + .expect_empty_beacon_processor(); + } + + #[test] + fn parent_block_and_blob_lookup_parent_returned_first_blob_trigger() { + let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownParentBlob) else { + return; + }; + + tester + .parent_block_response() + .parent_blob_response() + .expect_block_process() + .parent_block_imported() + .block_response() + .expect_parent_chain_process(); + } + + #[test] + fn parent_block_and_blob_lookup_child_returned_first_blob_trigger() { + let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownParentBlob) else { + return; + }; + + tester + .block_response() + .expect_no_penalty() + .expect_no_block_request() + .expect_no_blobs_request() + .parent_block_response() + .parent_blob_response() + .expect_block_process() + .parent_block_imported() + .expect_parent_chain_process(); + } + + #[test] + fn empty_parent_block_then_parent_blob_blob_trigger() { + let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownParentBlob) else { + return; + }; + + tester + .empty_parent_block_response() + .expect_penalty() + .expect_parent_block_request() + .expect_no_blobs_request() + .parent_blob_response() + .expect_empty_beacon_processor() + .parent_block_response() + .expect_block_process() + .parent_block_imported() + .block_response() + .expect_parent_chain_process(); + } + + #[test] + fn empty_parent_blobs_then_parent_block_blob_trigger() { + let Some(tester) = DenebTester::new(RequestTrigger::GossipUnknownParentBlob) else { + return; + }; + + tester + .block_response() + .empty_parent_blobs_response() + .expect_no_penalty() + .expect_no_blobs_request() + .expect_no_block_request() + .parent_block_response() + .expect_penalty() + .expect_parent_blobs_request() + .parent_blob_response() + .expect_block_process() + .parent_block_imported() + .expect_parent_chain_process(); } } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 95b8e57627..9a565b9c4e 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -58,11 +58,13 @@ use lighthouse_network::{PeerAction, PeerId}; use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; use std::boxed::Box; +use std::ops::IndexMut; use std::ops::Sub; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use tokio::time::sleep; +use types::blob_sidecar::FixedBlobSidecarList; use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync @@ -152,7 +154,7 @@ pub enum SyncMessage { }, /// Block processed - BlockPartProcessed { + BlockComponentProcessed { process_type: BlockProcessType, result: BlockProcessingResult, response_type: ResponseType, @@ -291,6 +293,8 @@ pub fn spawn( sleep(sleep_duration).await; + //TODO(sean) aggregate messages for blobs for the same block + while let Ok(msg) = delayed_lookups_recv.try_recv() { if let Err(e) = sync_send.send(msg) { warn!(log, "Failed to send delayed lookup message"; "error" => ?e); @@ -649,12 +653,15 @@ impl SyncManager { warn!(self.log, "Delayed lookups dropped for block"; "block_root" => ?block_root, "error" => ?e); } } else { - self.block_lookups.search_current_unknown_parent( - block_root, - block, - peer_id, - &mut self.network, - ); + let (block, blobs) = block.deconstruct(); + self.block_lookups + .search_current_unknown_parent_block_and_blobs( + block_root, + Some(block), + blobs, + peer_id, + &mut self.network, + ); self.block_lookups.search_parent( block_slot, block_root, @@ -671,6 +678,7 @@ impl SyncManager { if self.synced_and_connected_within_tolerance(blob_slot, &peer_id) { let block_root = blob.block_root; let parent_root = blob.block_parent_root; + let blob_index = blob.index; if self.should_delay_lookup(blob_slot) { if let Err(e) = self @@ -680,11 +688,16 @@ impl SyncManager { warn!(self.log, "Delayed lookups dropped for blob"; "block_root" => ?block_root, "error" => ?e); } } else { - self.block_lookups.search_current_unknown_blob_parent( - blob, - peer_id, - &mut self.network, - ); + let mut blobs = FixedBlobSidecarList::default(); + *blobs.index_mut(blob_index as usize) = Some(blob); + self.block_lookups + .search_current_unknown_parent_block_and_blobs( + block_root, + None, + Some(blobs), + peer_id, + &mut self.network, + ); } self.block_lookups.search_parent( blob_slot, @@ -732,17 +745,14 @@ impl SyncManager { request_id, error, } => self.inject_error(peer_id, request_id, error), - SyncMessage::BlockPartProcessed { + SyncMessage::BlockComponentProcessed { process_type, result, response_type, } => match process_type { - BlockProcessType::SingleBlock { id } => self.block_lookups.single_block_processed( - id, - result, - response_type, - &mut self.network, - ), + BlockProcessType::SingleBlock { id } => self + .block_lookups + .single_block_component_processed(id, result, response_type, &mut self.network), BlockProcessType::ParentLookup { chain_hash } => self .block_lookups .parent_block_processed(chain_hash, result, response_type, &mut self.network), diff --git a/consensus/types/src/signed_beacon_block.rs b/consensus/types/src/signed_beacon_block.rs index 58810150c2..8f0394ed93 100644 --- a/consensus/types/src/signed_beacon_block.rs +++ b/consensus/types/src/signed_beacon_block.rs @@ -1,3 +1,4 @@ +use crate::blob_sidecar::BlobIdentifier; use crate::*; use bls::Signature; use derivative::Derivative; @@ -242,6 +243,38 @@ impl> SignedBeaconBlock pub fn canonical_root(&self) -> Hash256 { self.message().tree_hash_root() } + + pub fn num_expected_blobs(&self) -> usize { + self.message() + .body() + .blob_kzg_commitments() + .map(|c| c.len()) + .unwrap_or(0) + } + + pub fn get_expected_blob_ids(&self, block_root: Option) -> Vec { + self.get_filtered_blob_ids(block_root, |_, _| true) + } + + /// If the filter returns `true` the id for the corresponding index and root will be included. + pub fn get_filtered_blob_ids( + &self, + block_root: Option, + filter: impl Fn(usize, Hash256) -> bool, + ) -> Vec { + let block_root = block_root.unwrap_or_else(|| self.canonical_root()); + let num_blobs_expected = self.num_expected_blobs(); + let mut blob_ids = Vec::with_capacity(num_blobs_expected); + for i in 0..num_blobs_expected { + if filter(i, block_root) { + blob_ids.push(BlobIdentifier { + block_root, + index: i as u64, + }); + } + } + blob_ids + } } // We can convert pre-Bellatrix blocks without payloads into blocks with payloads.