diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 1a7432a4bf..6cc35e6e87 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -188,7 +188,6 @@ pub enum AvailabilityProcessingStatus { Imported(Hash256), } -//TODO(sean) using this in tests for now impl TryInto for AvailabilityProcessingStatus { type Error = (); @@ -2676,9 +2675,13 @@ impl BeaconChain { // The block was imported successfully. } AvailabilityProcessingStatus::MissingComponents(slot, block_root) => { + warn!(self.log, "Blobs missing in response to range request"; + "block_root" => ?block_root, "slot" => slot); return ChainSegmentResult::Failed { imported_blocks, - error: BlockError::MissingBlockParts(slot, block_root), + error: BlockError::AvailabilityCheck( + AvailabilityCheckError::MissingBlobs, + ), }; } } diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 9c213a68af..6132515f18 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -505,17 +505,6 @@ pub enum MaybeAvailableBlock { AvailabilityPending(AvailabilityPendingBlock), } -impl TryInto> for MaybeAvailableBlock { - type Error = AvailabilityCheckError; - - fn try_into(self) -> Result, Self::Error> { - match self { - Self::Available(block) => Ok(block), - Self::AvailabilityPending(_block) => Err(AvailabilityCheckError::MissingBlobs), - } - } -} - /// Trait for common block operations. pub trait AsBlock { fn slot(&self) -> Slot; diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 3b974a8c49..1ba0a30e25 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -150,10 +150,7 @@ pub enum BlockError { /// its parent. ParentUnknown(BlockWrapper), /// The block skips too many slots and is a DoS risk. - TooManySkippedSlots { - parent_slot: Slot, - block_slot: Slot, - }, + TooManySkippedSlots { parent_slot: Slot, block_slot: Slot }, /// The block slot is greater than the present slot. /// /// ## Peer scoring @@ -168,10 +165,7 @@ pub enum BlockError { /// ## Peer scoring /// /// The peer has incompatible state transition logic and is faulty. - StateRootMismatch { - block: Hash256, - local: Hash256, - }, + StateRootMismatch { block: Hash256, local: Hash256 }, /// The block was a genesis block, these blocks cannot be re-imported. GenesisBlock, /// The slot is finalized, no need to import. @@ -190,9 +184,7 @@ pub enum BlockError { /// /// It's unclear if this block is valid, but it conflicts with finality and shouldn't be /// imported. - NotFinalizedDescendant { - block_parent_root: Hash256, - }, + NotFinalizedDescendant { block_parent_root: Hash256 }, /// Block is already known, no need to re-import. /// /// ## Peer scoring @@ -205,10 +197,7 @@ pub enum BlockError { /// /// The `proposer` has already proposed a block at this slot. The existing block may or may not /// be equal to the given block. - RepeatProposal { - proposer: u64, - slot: Slot, - }, + RepeatProposal { proposer: u64, slot: Slot }, /// The block slot exceeds the MAXIMUM_BLOCK_SLOT_NUMBER. /// /// ## Peer scoring @@ -223,10 +212,7 @@ pub enum BlockError { /// ## Peer scoring /// /// The block is invalid and the peer is faulty. - IncorrectBlockProposer { - block: u64, - local_shuffling: u64, - }, + IncorrectBlockProposer { block: u64, local_shuffling: u64 }, /// The proposal signature in invalid. /// /// ## Peer scoring @@ -250,10 +236,7 @@ pub enum BlockError { /// ## Peer scoring /// /// The block is invalid and the peer is faulty. - BlockIsNotLaterThanParent { - block_slot: Slot, - parent_slot: Slot, - }, + BlockIsNotLaterThanParent { block_slot: Slot, parent_slot: Slot }, /// At least one block in the chain segment did not have it's parent root set to the root of /// the prior block. /// @@ -309,14 +292,11 @@ pub enum BlockError { /// If it's actually our fault (e.g. our execution node database is corrupt) we have bigger /// problems to worry about than losing peers, and we're doing the network a favour by /// disconnecting. - ParentExecutionPayloadInvalid { - parent_root: Hash256, - }, - /// The blob alone failed validation. + ParentExecutionPayloadInvalid { parent_root: Hash256 }, + /// A blob alone failed validation. BlobValidation(BlobError), /// The block and blob together failed validation. AvailabilityCheck(AvailabilityCheckError), - MissingBlockParts(Slot, Hash256), } impl From> for BlockError { diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index c1e27be742..2bf7bb0dfd 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -32,15 +32,14 @@ pub const OVERFLOW_LRU_CAPACITY: usize = 1024; #[derive(Debug, IntoStaticStr)] pub enum AvailabilityCheckError { Kzg(KzgError), - KzgVerificationFailed, KzgNotInitialized, + KzgVerificationFailed, SszTypes(ssz_types::Error), - MissingBlobs, NumBlobsMismatch { - /// The peer sent us an invalid block, we must penalise harshly. num_kzg_commitments: usize, num_blobs: usize, }, + MissingBlobs, TxKzgCommitmentMismatch(String), KzgCommitmentMismatch { blob_index: u64, @@ -53,10 +52,6 @@ pub enum AvailabilityCheckError { block_root: Hash256, blob_block_root: Hash256, }, - UnorderedBlobs { - expected_index: u64, - blob_index: u64, - }, } impl From for AvailabilityCheckError { @@ -135,9 +130,7 @@ impl DataAvailabilityChecker { &self, block_root: Hash256, ) -> Option> { - let (block, blob_indices) = self - .availability_cache - .get_missing_blob_ids_checking_cache(block_root); + let (block, blob_indices) = self.availability_cache.get_missing_blob_info(block_root); self.get_missing_blob_ids(block_root, block.as_ref(), Some(blob_indices)) } @@ -507,7 +500,7 @@ impl AvailabilityPendingBlock { } /// Verifies an AvailabilityPendingBlock against a set of KZG verified blobs. - /// This does not check whether a block *should* have blobs, these checks should must have been + /// This does not check whether a block *should* have blobs, these checks should have been /// completed when producing the `AvailabilityPendingBlock`. pub fn make_available( self, diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 384ff9a667..42d0ac8235 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -13,6 +13,8 @@ use std::{collections::HashSet, sync::Arc}; use types::blob_sidecar::BlobIdentifier; use types::{BlobSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlock}; +type MissingBlobInfo = (Option>>, HashSet); + /// Caches partially available blobs and execution verified blocks corresponding /// to a given `block_hash` that are received over gossip. /// @@ -199,7 +201,7 @@ impl OverflowStore { &self, blob_id: &BlobIdentifier, ) -> Result>>, AvailabilityCheckError> { - let key = OverflowKey::from_blob_id::(blob_id.clone())?; + let key = OverflowKey::from_blob_id::(*blob_id)?; self.0 .hot_db @@ -331,11 +333,7 @@ impl OverflowLRUCache { pub fn has_block(&self, block_root: &Hash256) -> bool { self.critical.read().has_block(block_root) } - - pub fn get_missing_blob_ids_checking_cache( - &self, - block_root: Hash256, - ) -> (Option>>, HashSet) { + pub fn get_missing_blob_info(&self, block_root: Hash256) -> MissingBlobInfo { self.critical .read() .in_memory @@ -497,11 +495,12 @@ impl OverflowLRUCache { payload_verification_outcome, } = executed_block; - let verified_blobs = Vec::from(pending_components.verified_blobs) + let Some(verified_blobs) = Vec::from(pending_components.verified_blobs) .into_iter() .take(num_blobs_expected) - .map(|maybe_blob| maybe_blob.ok_or(AvailabilityCheckError::MissingBlobs)) - .collect::, _>>()?; + .collect::>>() else { + return Ok(Availability::MissingComponents(import_data.block_root)) + }; let available_block = block.make_available(verified_blobs)?; Ok(Availability::Available(Box::new( diff --git a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs index f20808b896..96514a15ff 100644 --- a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs +++ b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs @@ -587,7 +587,7 @@ impl ExecutionBlockGenerator { // get random number between 0 and Max Blobs let num_blobs = rand::random::() % T::max_blobs_per_block(); let kzg = self.kzg.as_ref().ok_or("kzg not initialized")?; - let (bundle, transactions) = generate_random_blobs(num_blobs, &kzg)?; + let (bundle, transactions) = generate_random_blobs(num_blobs, kzg)?; for tx in Vec::from(transactions) { execution_payload .transactions_mut() diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 31b698c5ca..a817de1f95 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -989,9 +989,7 @@ impl Worker { ); return None; } - Err(e @ BlockError::BlobValidation(_)) - | Err(e @ BlockError::MissingBlockParts(_, _)) - | Err(e @ BlockError::AvailabilityCheck(_)) => { + Err(e @ BlockError::BlobValidation(_)) | Err(e @ BlockError::AvailabilityCheck(_)) => { warn!(self.log, "Could not verify block against known blobs in gossip. Rejecting the block"; "error" => %e); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index b28e48c842..06a9ff31ad 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -1,5 +1,5 @@ use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; -use beacon_chain::data_availability_checker::DataAvailabilityChecker; +use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker}; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; use lighthouse_network::rpc::RPCError; use lighthouse_network::{PeerAction, PeerId}; @@ -26,7 +26,7 @@ use super::{ }; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; use crate::metrics; -use crate::sync::block_lookups::single_block_lookup::UnknownParentComponents; +use crate::sync::block_lookups::single_block_lookup::{LookupId, UnknownParentComponents}; mod parent_lookup; mod single_block_lookup; @@ -50,15 +50,7 @@ pub(crate) struct BlockLookups { /// A cache of failed chain lookups to prevent duplicate searches. failed_chains: LRUTimeCache, - /// A collection of block hashes being searched for and a flag indicating if a result has been - /// received or not. - /// - /// The flag allows us to determine if the peer returned data or sent us nothing. - single_block_lookups: Vec<( - Option, - Option, - SingleBlockLookup, - )>, + single_block_lookups: Vec>, da_checker: Arc>, @@ -82,6 +74,9 @@ impl From for StreamTerminator { } } +pub type BlockRequestId = Id; +pub type BlobRequestId = Id; + #[derive(Debug, Copy, Clone)] pub enum ResponseType { Block, @@ -142,7 +137,7 @@ impl BlockLookups { peer_source: PeerShouldHave, cx: &mut SyncNetworkContext, ) { - self.search_block_with(block_root, None, None, peer_source, cx) + self.search_block_with(block_root, None, None, &[peer_source], cx) } pub fn search_current_unknown_parent_block_and_blobs( @@ -150,16 +145,10 @@ impl BlockLookups { block_root: Hash256, block: Option>>, blobs: Option>, - peer_id: PeerId, + peer_source: &[PeerShouldHave], cx: &mut SyncNetworkContext, ) { - self.search_block_with( - block_root, - block, - blobs, - PeerShouldHave::Neither(peer_id), - cx, - ); + self.search_block_with(block_root, block, blobs, peer_source, cx); } /// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is @@ -169,22 +158,22 @@ impl BlockLookups { block_root: Hash256, block: Option>>, blobs: Option>, - peer_source: PeerShouldHave, + peer_source: &[PeerShouldHave], cx: &mut SyncNetworkContext, ) { // Do not re-request a block that is already being requested if self .single_block_lookups .iter_mut() - .any(|(_, _, single_block_request)| { - single_block_request.add_peer_if_useful(&block_root, peer_source) + .any(|single_block_request| { + single_block_request.add_peers_if_useful(&block_root, peer_source) }) { return; } if self.parent_lookups.iter_mut().any(|parent_req| { - parent_req.add_peer_if_useful(&block_root, peer_source) + parent_req.add_peers_if_useful(&block_root, peer_source) || parent_req.contains_block(&block_root) }) { // If the block was already downloaded, or is being downloaded in this moment, do not @@ -224,23 +213,8 @@ impl BlockLookups { peer_source, self.da_checker.clone(), ); - - let block_request_id = - if let Ok(Some((peer_id, block_request))) = single_block_request.request_block() { - cx.single_block_lookup_request(peer_id, block_request).ok() - } else { - None - }; - - let blob_request_id = - if let Ok(Some((peer_id, blob_request))) = single_block_request.request_blobs() { - cx.single_blobs_lookup_request(peer_id, blob_request).ok() - } else { - None - }; - - self.single_block_lookups - .push((block_request_id, blob_request_id, single_block_request)); + single_block_request.request_block_and_blobs(cx); + self.single_block_lookups.push(single_block_request); metrics::set_gauge( &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, @@ -272,7 +246,7 @@ impl BlockLookups { // being searched for. if self.parent_lookups.iter_mut().any(|parent_req| { parent_req.contains_block(&block_root) - || parent_req.add_peer_if_useful(&block_root, peer_source) + || parent_req.add_peers_if_useful(&block_root, &[peer_source]) }) { // we are already searching for this block, ignore it return; @@ -309,7 +283,7 @@ impl BlockLookups { let stream_terminator = block.is_none().into(); let log = self.log.clone(); - let Some((pending_parent_request, request_id_ref, request_ref)) = self.find_single_lookup_request(id, stream_terminator, ResponseType::Block) else { + let Some((has_pending_parent_request, request_ref)) = self.find_single_lookup_request(id, stream_terminator, ResponseType::Block) else { return; }; @@ -319,7 +293,7 @@ impl BlockLookups { parent_components.add_unknown_parent_block(block.clone()); }; - if !pending_parent_request { + if !has_pending_parent_request { let block_wrapper = request_ref .get_downloaded_block() .unwrap_or(BlockWrapper::Block(block)); @@ -340,7 +314,6 @@ impl BlockLookups { } Ok(None) => ShouldRemoveLookup::False, Err(e) => handle_block_lookup_verify_error( - request_id_ref, request_ref, ResponseType::Block, peer_id, @@ -352,7 +325,7 @@ impl BlockLookups { if matches!(should_remove, ShouldRemoveLookup::True) { self.single_block_lookups - .retain(|(block_id, _, _)| block_id != &Some(id)); + .retain(|req| req.id.block_request_id != Some(id)); } metrics::set_gauge( @@ -373,7 +346,7 @@ impl BlockLookups { let log = self.log.clone(); - let Some((pending_parent_requests, request_id_ref, request_ref)) = + let Some((has_pending_parent_requests, request_ref)) = self.find_single_lookup_request(id, stream_terminator, ResponseType::Blob) else { return; }; @@ -383,7 +356,7 @@ impl BlockLookups { if let Some(parent_components) = request_ref.unknown_parent_components.as_mut() { parent_components.add_unknown_parent_blobs(blobs); - if !pending_parent_requests { + if !has_pending_parent_requests { request_ref .get_downloaded_block() .map(|block| { @@ -418,7 +391,6 @@ impl BlockLookups { } Ok(None) => ShouldRemoveLookup::False, Err(e) => handle_block_lookup_verify_error( - request_id_ref, request_ref, ResponseType::Blob, peer_id, @@ -430,7 +402,7 @@ impl BlockLookups { if matches!(should_remove, ShouldRemoveLookup::True) { self.single_block_lookups - .retain(|(_, blob_id, _)| blob_id != &Some(id)); + .retain(|req| req.id.blob_request_id != Some(id)); } metrics::set_gauge( @@ -446,36 +418,29 @@ impl BlockLookups { response_type: ResponseType, ) -> Option<( bool, - &mut Id, &mut SingleBlockLookup, )> { - let lookup = - self.single_block_lookups - .iter_mut() - .find_map(|(block_id_opt, blob_id_opt, req)| { - let id_opt = match response_type { - ResponseType::Block => block_id_opt, - ResponseType::Blob => blob_id_opt, - }; - if let Some(lookup_id) = id_opt { - if *lookup_id == target_id { - // Only send for processing if we don't have parent requests that were triggered by - // this block. - let triggered_parent_request = self - .parent_lookups - .iter() - .any(|lookup| lookup.chain_hash() == req.requested_block_root); + let lookup = self.single_block_lookups.iter_mut().find_map(|req| { + let id_opt = match response_type { + ResponseType::Block => req.id.block_request_id, + ResponseType::Blob => req.id.blob_request_id, + }; + if let Some(lookup_id) = id_opt { + if lookup_id == target_id { + // Only send for processing if we don't have parent requests that were triggered by + // this block. + let triggered_parent_request = self.parent_lookups.iter().any(|lookup| { + lookup.chain_hash() == req.block_request_state.requested_block_root + }); - return Some((triggered_parent_request, lookup_id, req)); - } - } - None - }); - - let (triggered_parent_request, id_ref, request) = match lookup { - Some((triggered_parent_request, id_ref, req)) => { - (triggered_parent_request, id_ref, req) + return Some((triggered_parent_request, req)); + } } + None + }); + + let (triggered_parent_request, request) = match lookup { + Some((triggered_parent_request, req)) => (triggered_parent_request, req), None => { if matches!(stream_terminator, StreamTerminator::False,) { debug!( @@ -487,7 +452,7 @@ impl BlockLookups { return None; } }; - Some((triggered_parent_request, id_ref, request)) + Some((triggered_parent_request, request)) } /// Process a response received from a parent lookup request. @@ -532,8 +497,11 @@ impl BlockLookups { self.parent_lookups.push(parent_lookup) } } else { - let outstanding_blobs_req = - parent_lookup.current_parent_blob_request_id.is_some(); + let outstanding_blobs_req = parent_lookup + .current_parent_request + .id + .blob_request_id + .is_some(); if !outstanding_blobs_req { if let Ok(peer_id) = parent_lookup .current_parent_request @@ -691,28 +659,15 @@ impl BlockLookups { /* Error responses */ pub fn peer_disconnected(&mut self, peer_id: &PeerId, cx: &mut SyncNetworkContext) { - self.single_block_lookups - .retain_mut(|(block_id_opt, blob_id_opt, req)| { - let should_remove_block = should_remove_disconnected_peer( - block_id_opt, - ResponseType::Block, - peer_id, - cx, - req, - &self.log, - ); - let should_remove_blob = should_remove_disconnected_peer( - blob_id_opt, - ResponseType::Blob, - peer_id, - cx, - req, - &self.log, - ); + self.single_block_lookups.retain_mut(|req| { + let should_remove_block = + should_remove_disconnected_peer(ResponseType::Block, peer_id, cx, req, &self.log); + let should_remove_blob = + should_remove_disconnected_peer(ResponseType::Blob, peer_id, cx, req, &self.log); - matches!(should_remove_block, ShouldRemoveLookup::False) - && matches!(should_remove_blob, ShouldRemoveLookup::False) - }); + matches!(should_remove_block, ShouldRemoveLookup::False) + && matches!(should_remove_blob, ShouldRemoveLookup::False) + }); /* Check disconnection for parent lookups */ while let Some(pos) = self.parent_lookups.iter_mut().position(|req| { @@ -775,32 +730,29 @@ impl BlockLookups { error: RPCError, ) { let msg = error.as_static_str(); - self.single_block_lookups - .retain_mut(|(block_id_opt, blob_id_opt, req)| { - let should_remove_block = should_remove_failed_lookup( - block_id_opt, - ResponseType::Block, - id, - msg, - peer_id, - cx, - req, - &self.log, - ); - let should_remove_blob = should_remove_failed_lookup( - blob_id_opt, - ResponseType::Blob, - id, - msg, - peer_id, - cx, - req, - &self.log, - ); + self.single_block_lookups.retain_mut(|req| { + let should_remove_block = should_remove_failed_lookup( + id, + ResponseType::Block, + msg, + peer_id, + cx, + req, + &self.log, + ); + let should_remove_blob = should_remove_failed_lookup( + id, + ResponseType::Blob, + msg, + peer_id, + cx, + req, + &self.log, + ); - matches!(should_remove_block, ShouldRemoveLookup::False) - && matches!(should_remove_blob, ShouldRemoveLookup::False) - }); + matches!(should_remove_block, ShouldRemoveLookup::False) + && matches!(should_remove_blob, ShouldRemoveLookup::False) + }); metrics::set_gauge( &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, @@ -817,14 +769,16 @@ impl BlockLookups { response_type: ResponseType, cx: &mut SyncNetworkContext, ) { - let lookup_components_opt = self.single_block_lookups.iter_mut().enumerate().find_map( - |(index, (block_id_opt, blob_id_opt, req))| { - let block_match = block_id_opt.as_ref() == Some(&target_id); - let blob_match = blob_id_opt.as_ref() == Some(&target_id); - (block_match || blob_match).then_some((index, block_id_opt, blob_id_opt, req)) - }, - ); - let (index, block_id_ref, blob_id_ref, request_ref) = match lookup_components_opt { + let lookup_components_opt = + self.single_block_lookups + .iter_mut() + .enumerate() + .find_map(|(index, req)| { + let block_match = req.id.block_request_id.as_ref() == Some(&target_id); + let blob_match = req.id.blob_request_id.as_ref() == Some(&target_id); + (block_match || blob_match).then_some((index, req)) + }); + let (index, request_ref) = match lookup_components_opt { Some(req) => req, None => { return debug!( @@ -834,7 +788,7 @@ impl BlockLookups { } }; - let root = request_ref.requested_block_root; + let root = request_ref.block_request_state.requested_block_root; let peer_id = request_ref.processing_peer(response_type); let peer_id = match peer_id { @@ -849,13 +803,7 @@ impl BlockLookups { ShouldRemoveLookup::True } AvailabilityProcessingStatus::MissingComponents(_, _block_root) => { - should_remove_missing_components( - request_ref, - response_type, - blob_id_ref, - cx, - &self.log, - ) + should_remove_missing_components(request_ref, response_type, cx, &self.log) } }, BlockProcessingResult::Ignored => { @@ -902,42 +850,42 @@ impl BlockLookups { ); ShouldRemoveLookup::True } + BlockError::AvailabilityCheck( + AvailabilityCheckError::KzgVerificationFailed, + ) + | BlockError::AvailabilityCheck(AvailabilityCheckError::Kzg(_)) + | BlockError::BlobValidation(_) => { + warn!(self.log, "Availability check failed"; "root" => %root, "error" => ?e, "peer_id" => %peer_id); + + // Try it again if possible. + retry_request_after_failure( + request_ref, + ResponseType::Blob, + peer_id.as_peer_id(), + cx, + &self.log, + ) + } other => { warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id); - cx.report_peer( - peer_id.to_peer_id(), - PeerAction::MidToleranceError, - "single_block_failure", - ); - if matches!(response_type, ResponseType::Blob) - || (!request_ref.blob_request_state.component_processed - && !request_ref.downloading(ResponseType::Blob)) - { - retry_request_after_failure( - blob_id_ref.as_mut().unwrap(), - request_ref, - ResponseType::Blob, - peer_id.as_peer_id(), - cx, - &self.log, - ); - } + if let Ok(block_peer) = request_ref.processing_peer(ResponseType::Block) { + cx.report_peer( + block_peer.to_peer_id(), + PeerAction::MidToleranceError, + "single_block_failure", + ); - if matches!(response_type, ResponseType::Block) - || (!request_ref.block_request_state.component_processed - && !request_ref.downloading(ResponseType::Block)) - { - // Try it again if possible. - retry_request_after_failure( - block_id_ref.as_mut().unwrap(), - request_ref, - ResponseType::Block, - peer_id.as_peer_id(), - cx, - &self.log, - ); + // Try it again if possible. + retry_request_after_failure( + request_ref, + ResponseType::Block, + block_peer.as_peer_id(), + cx, + &self.log, + ) + } else { + ShouldRemoveLookup::False } - ShouldRemoveLookup::False } } } @@ -1028,17 +976,13 @@ impl BlockLookups { }; let (chain_hash, mut blocks, hashes, block_request) = parent_lookup.parts_for_processing(); - if let Some(child_block) = - self.single_block_lookups - .iter_mut() - .find_map(|(_, _, req)| { - if req.requested_block_root == chain_hash { - req.get_downloaded_block() - } else { - None - } - }) - { + if let Some(child_block) = self.single_block_lookups.iter_mut().find_map(|req| { + if req.block_request_state.requested_block_root == chain_hash { + req.get_downloaded_block() + } else { + None + } + }) { blocks.push(child_block); }; let process_id = ChainSegmentProcessId::ParentLookup(chain_hash); @@ -1132,22 +1076,24 @@ impl BlockLookups { debug!(self.log, "Parent chain processed"; "chain_hash" => %chain_hash, "result" => ?result); match result { BatchProcessResult::Success { .. } => { - if let Some((index, (_, _, _))) = self + if let Some((index, _)) = self .single_block_lookups .iter() .enumerate() - .find(|(_, (_, _, req))| req.requested_block_root == chain_hash) + .find(|(_, req)| req.block_request_state.requested_block_root == chain_hash) { - if let Some((block_id, blob_id, block_wrapper)) = self - .single_block_lookups - .get_mut(index) - .and_then(|(block_id, blob_id, lookup)| { + if let Some((lookup_id, block_wrapper)) = + self.single_block_lookups.get_mut(index).and_then(|lookup| { lookup .get_downloaded_block() - .map(|block| (block_id, blob_id, block)) + .map(|block| (lookup.id.clone(), block)) }) { - let Some(id) = block_id.or(*blob_id) else { + let LookupId { + block_request_id, + blob_request_id, + } = lookup_id; + let Some(id) = block_request_id.or(blob_request_id) else { warn!(self.log, "No id found for single block lookup"; "chain_hash" => %chain_hash); return; }; @@ -1174,8 +1120,8 @@ impl BlockLookups { penalty, } => { self.failed_chains.insert(chain_hash); - let mut all_peers = request.block_request_state.used_peers.clone(); - all_peers.extend(request.blob_request_state.used_peers); + let mut all_peers = request.block_request_state.state.used_peers.clone(); + all_peers.extend(request.blob_request_state.state.used_peers); for peer_source in all_peers { cx.report_peer(peer_source, penalty, "parent_chain_failure") } @@ -1359,7 +1305,6 @@ impl BlockLookups { } fn handle_block_lookup_verify_error( - request_id_ref: &mut u32, request_ref: &mut SingleBlockLookup, response_type: ResponseType, peer_id: PeerId, @@ -1379,110 +1324,126 @@ fn handle_block_lookup_verify_error( debug!(log, "Single block lookup failed"; "peer_id" => %peer_id, "error" => msg, - "block_root" => ?request_ref.requested_block_root, + "block_root" => ?request_ref.block_request_state.requested_block_root, "response_type" => ?response_type ); - retry_request_after_failure( - request_id_ref, - request_ref, - response_type, - &peer_id, - cx, - log, - ) + retry_request_after_failure(request_ref, response_type, &peer_id, cx, log) } fn retry_request_after_failure( - request_id_ref: &mut u32, request_ref: &mut SingleBlockLookup, response_type: ResponseType, initial_peer_id: &PeerId, cx: &mut SyncNetworkContext, log: &Logger, ) -> ShouldRemoveLookup { - let requested_block_root = request_ref.requested_block_root; + let requested_block_root = request_ref.block_request_state.requested_block_root; // try the request again if possible - let id_opt = match response_type { - ResponseType::Block => request_ref.request_block().map(|request_opt| { - request_opt.map(|(peer_id, request)| cx.single_block_lookup_request(peer_id, request)) - }), - ResponseType::Blob => request_ref.request_blobs().map(|request_opt| { - request_opt.map(|(peer_id, request)| cx.single_blobs_lookup_request(peer_id, request)) - }), - }; + match response_type { + ResponseType::Block => { + let id = request_ref.request_block().map(|request_opt| { + request_opt + .map(|(peer_id, request)| cx.single_block_lookup_request(peer_id, request)) + }); + match id { + Ok(Some(Ok(id))) => { + request_ref.id.block_request_id = Some(id); + } + Ok(Some(Err(e))) => { + debug!(log, "Single block lookup failed"; + "peer_id" => %initial_peer_id, + "error" => ?e, + "block_root" => ?requested_block_root, + "response_type" => ?response_type); + return ShouldRemoveLookup::True; + } + Ok(None) => { + request_ref.id.block_request_id = None; + // The lookup failed but the block or blob was found via other means. + } + Err(e) => { + debug!(log, "Single block lookup failed"; + "peer_id" => %initial_peer_id, + "error" => ?e, + "block_root" => ?requested_block_root, + "response_type" => ?response_type); + return ShouldRemoveLookup::True; + } + } + } + ResponseType::Blob => { + let id = request_ref.request_blobs().map(|request_opt| { + request_opt + .map(|(peer_id, request)| cx.single_blobs_lookup_request(peer_id, request)) + }); - match id_opt { - Ok(Some(Ok(id))) => { - *request_id_ref = id; - } - Ok(Some(Err(e))) => { - debug!(log, "Single block lookup failed"; + match id { + Ok(Some(Ok(id))) => { + request_ref.id.blob_request_id = Some(id); + } + Ok(Some(Err(e))) => { + debug!(log, "Single block lookup failed"; "peer_id" => %initial_peer_id, "error" => ?e, "block_root" => ?requested_block_root, "response_type" => ?response_type); - return ShouldRemoveLookup::True; - } - Ok(None) => { - // The lookup failed but the block or blob was found via other means. - } - Err(e) => { - debug!(log, "Single block lookup failed"; + return ShouldRemoveLookup::True; + } + Ok(None) => { + request_ref.id.blob_request_id = None; + // The lookup failed but the block or blob was found via other means. + } + Err(e) => { + debug!(log, "Single block lookup failed"; "peer_id" => %initial_peer_id, "error" => ?e, "block_root" => ?requested_block_root, "response_type" => ?response_type); - return ShouldRemoveLookup::True; + return ShouldRemoveLookup::True; + } + } } - } + }; ShouldRemoveLookup::False } fn should_remove_disconnected_peer( - id: &mut Option, response_type: ResponseType, peer_id: &PeerId, cx: &mut SyncNetworkContext, req: &mut SingleBlockLookup, log: &Logger, ) -> ShouldRemoveLookup { - id - .as_mut() - .filter(|_| req.check_peer_disconnected(peer_id, response_type) - .is_err()) - .map(|block_id| { - trace!(log, "Single lookup failed on peer disconnection"; "block_root" => ?req.requested_block_root, "response_type" => ?response_type); - retry_request_after_failure(block_id, req, response_type, peer_id, cx, log) - }) - .unwrap_or(ShouldRemoveLookup::False) + if req.check_peer_disconnected(peer_id, response_type).is_err() { + trace!(log, "Single lookup failed on peer disconnection"; "block_root" => ?req.block_request_state.requested_block_root, "response_type" => ?response_type); + retry_request_after_failure(req, response_type, peer_id, cx, log) + } else { + ShouldRemoveLookup::False + } } fn should_remove_failed_lookup( - id: &mut Option, + id: Id, response_type: ResponseType, - target_id: Id, msg: &'static str, peer_id: &PeerId, cx: &mut SyncNetworkContext, req: &mut SingleBlockLookup, log: &Logger, ) -> ShouldRemoveLookup { - id - .as_mut() - .filter(|id| **id == target_id) - .map(|id| { - req.register_failure_downloading(response_type); - trace!(log, "Single lookup failed"; "block" => %req.requested_block_root, "error" => msg, "response_type" => ?response_type); - retry_request_after_failure(id, req, response_type, peer_id, cx, log) - }) - .unwrap_or(ShouldRemoveLookup::False) + if req.id.block_request_id == Some(id) || req.id.blob_request_id == Some(id) { + req.register_failure_downloading(response_type); + trace!(log, "Single lookup failed"; "block" => %req.block_request_state.requested_block_root, "error" => msg, "response_type" => ?response_type); + retry_request_after_failure(req, response_type, peer_id, cx, log) + } else { + ShouldRemoveLookup::False + } } fn should_remove_missing_components( request_ref: &mut SingleBlockLookup, response_type: ResponseType, - blob_id_ref: &mut Option, cx: &mut SyncNetworkContext, log: &Logger, ) -> ShouldRemoveLookup { @@ -1502,18 +1463,15 @@ fn should_remove_missing_components( ); } request_ref.remove_peer_if_useless(blob_peer.as_peer_id(), ResponseType::Blob); - if let Some(blob_id_ref) = blob_id_ref { - if !request_ref.downloading(ResponseType::Blob) { - // Try it again if possible. - return retry_request_after_failure( - blob_id_ref, - request_ref, - ResponseType::Blob, - blob_peer.as_peer_id(), - cx, - log, - ); - } + if !request_ref.downloading(ResponseType::Blob) { + // Try it again if possible. + return retry_request_after_failure( + request_ref, + ResponseType::Blob, + blob_peer.as_peer_id(), + cx, + log, + ); } } ShouldRemoveLookup::False diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 14027c1669..f7d4887981 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -1,11 +1,8 @@ use super::single_block_lookup::{LookupRequestError, LookupVerifyError, SingleBlockLookup}; -use super::{DownloadedBlocks, PeerShouldHave, ResponseType}; +use super::{BlobRequestId, BlockRequestId, DownloadedBlocks, PeerShouldHave, ResponseType}; use crate::sync::block_lookups::single_block_lookup::{State, UnknownParentComponents}; use crate::sync::block_lookups::{RootBlobsTuple, RootBlockTuple}; -use crate::sync::{ - manager::{Id, SLOT_IMPORT_TOLERANCE}, - network_context::SyncNetworkContext, -}; +use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext}; use beacon_chain::blob_verification::AsBlock; use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::data_availability_checker::DataAvailabilityChecker; @@ -32,9 +29,6 @@ pub(crate) struct ParentLookup { downloaded_blocks: Vec>, /// Request of the last parent. pub current_parent_request: SingleBlockLookup, - /// Id of the last parent request. - pub current_parent_request_id: Option, - pub current_parent_blob_request_id: Option, } #[derive(Debug, PartialEq, Eq, IntoStaticStr)] @@ -77,14 +71,12 @@ impl ParentLookup { da_checker: Arc>, ) -> Self { let current_parent_request = - SingleBlockLookup::new(parent_root, Some(<_>::default()), peer_id, da_checker); + SingleBlockLookup::new(parent_root, Some(<_>::default()), &[peer_id], da_checker); Self { chain_hash: block_root, downloaded_blocks: vec![], current_parent_request, - current_parent_request_id: None, - current_parent_blob_request_id: None, } } @@ -101,11 +93,11 @@ impl ParentLookup { if let Some((peer_id, request)) = self.current_parent_request.request_block()? { match cx.parent_lookup_block_request(peer_id, request) { Ok(request_id) => { - self.current_parent_request_id = Some(request_id); + self.current_parent_request.id.block_request_id = Some(request_id); return Ok(()); } Err(reason) => { - self.current_parent_request_id = None; + self.current_parent_request.id.block_request_id = None; return Err(RequestError::SendFailed(reason)); } } @@ -125,11 +117,11 @@ impl ParentLookup { if let Some((peer_id, request)) = self.current_parent_request.request_blobs()? { match cx.parent_lookup_blobs_request(peer_id, request) { Ok(request_id) => { - self.current_parent_blob_request_id = Some(request_id); + self.current_parent_request.id.blob_request_id = Some(request_id); return Ok(()); } Err(reason) => { - self.current_parent_blob_request_id = None; + self.current_parent_request.id.blob_request_id = None; return Err(RequestError::SendFailed(reason)); } } @@ -140,12 +132,14 @@ impl ParentLookup { pub fn check_block_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> { self.current_parent_request .block_request_state + .state .check_peer_disconnected(peer_id) } pub fn check_blob_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> { self.current_parent_request .blob_request_state + .state .check_peer_disconnected(peer_id) } @@ -153,17 +147,22 @@ impl ParentLookup { let next_parent = block.parent_root(); // Cache the block. - let current_root = self.current_parent_request.requested_block_root; + let current_root = self + .current_parent_request + .block_request_state + .requested_block_root; self.downloaded_blocks.push((current_root, block)); // Update the block request. - self.current_parent_request.requested_block_root = next_parent; - self.current_parent_request.block_request_state.state = State::AwaitingDownload; - self.current_parent_request_id = None; + self.current_parent_request + .block_request_state + .requested_block_root = next_parent; + self.current_parent_request.block_request_state.state.state = State::AwaitingDownload; + self.current_parent_request.id.block_request_id = None; // Update the blobs request. - self.current_parent_request.blob_request_state.state = State::AwaitingDownload; - self.current_parent_blob_request_id = None; + self.current_parent_request.blob_request_state.state.state = State::AwaitingDownload; + self.current_parent_request.id.blob_request_id = None; // Reset the unknown parent components. self.current_parent_request.unknown_parent_components = @@ -175,7 +174,7 @@ impl ParentLookup { self.current_parent_request.add_unknown_parent_block(block); // Update the request. - self.current_parent_request_id = None; + self.current_parent_request.id.block_request_id = None; } pub fn add_current_request_blobs(&mut self, blobs: FixedBlobSidecarList) { @@ -183,15 +182,15 @@ impl ParentLookup { self.current_parent_request.add_unknown_parent_blobs(blobs); // Update the request. - self.current_parent_blob_request_id = None; + self.current_parent_request.id.blob_request_id = None; } - pub fn pending_block_response(&self, req_id: Id) -> bool { - self.current_parent_request_id == Some(req_id) + pub fn pending_block_response(&self, req_id: BlockRequestId) -> bool { + self.current_parent_request.id.block_request_id == Some(req_id) } - pub fn pending_blob_response(&self, req_id: Id) -> bool { - self.current_parent_blob_request_id == Some(req_id) + pub fn pending_blob_response(&self, req_id: BlobRequestId) -> bool { + self.current_parent_request.id.blob_request_id == Some(req_id) } /// Consumes the parent request and destructures it into it's parts. @@ -208,8 +207,6 @@ impl ParentLookup { chain_hash, downloaded_blocks, current_parent_request, - current_parent_request_id: _, - current_parent_blob_request_id: _, } = self; let block_count = downloaded_blocks.len(); let mut blocks = Vec::with_capacity(block_count); @@ -229,37 +226,47 @@ impl ParentLookup { pub fn block_download_failed(&mut self) { self.current_parent_request .block_request_state + .state .register_failure_downloading(); - self.current_parent_request_id = None; + self.current_parent_request.id.block_request_id = None; } pub fn blob_download_failed(&mut self) { self.current_parent_request .blob_request_state + .state .register_failure_downloading(); - self.current_parent_blob_request_id = None; + self.current_parent_request.id.blob_request_id = None; } pub fn block_processing_failed(&mut self) { self.current_parent_request .block_request_state + .state .register_failure_processing(); - self.current_parent_request + if let Some(components) = self + .current_parent_request .unknown_parent_components .as_mut() - .map(|components| components.downloaded_block = None); - self.current_parent_request_id = None; + { + components.downloaded_block = None; + } + self.current_parent_request.id.block_request_id = None; } pub fn blob_processing_failed(&mut self) { self.current_parent_request .blob_request_state + .state .register_failure_processing(); - self.current_parent_request + if let Some(components) = self + .current_parent_request .unknown_parent_components .as_mut() - .map(|components| components.downloaded_blobs = <_>::default()); - self.current_parent_blob_request_id = None; + { + components.downloaded_blobs = <_>::default(); + } + self.current_parent_request.id.blob_request_id = None; } /// Verifies that the received block is what we requested. If so, parent lookup now waits for @@ -280,8 +287,9 @@ impl ParentLookup { if failed_chains.contains(&parent_root) { self.current_parent_request .block_request_state + .state .register_failure_downloading(); - self.current_parent_request_id = None; + self.current_parent_request.id.block_request_id = None; return Err(ParentVerifyError::PreviousFailure { parent_root }); } } @@ -303,8 +311,9 @@ impl ParentLookup { if failed_chains.contains(&parent_root) { self.current_parent_request .blob_request_state + .state .register_failure_downloading(); - self.current_parent_blob_request_id = None; + self.current_parent_request.id.blob_request_id = None; return Err(ParentVerifyError::PreviousFailure { parent_root }); } } @@ -312,20 +321,13 @@ impl ParentLookup { Ok(blobs) } - #[cfg(test)] - pub fn failed_block_attempts(&self) -> u8 { - self.current_parent_request - .block_request_state - .failed_attempts() - } - - pub fn add_peer_if_useful( + pub fn add_peers_if_useful( &mut self, block_root: &Hash256, - peer_source: PeerShouldHave, + peer_source: &[PeerShouldHave], ) -> bool { self.current_parent_request - .add_peer_if_useful(block_root, peer_source) + .add_peers_if_useful(block_root, peer_source) } pub fn used_peers(&self, response_type: ResponseType) -> impl Iterator + '_ { @@ -333,11 +335,13 @@ impl ParentLookup { ResponseType::Block => self .current_parent_request .block_request_state + .state .used_peers .iter(), ResponseType::Blob => self .current_parent_request .blob_request_state + .state .used_peers .iter(), } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index c5da84a88f..9e539b8c0b 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,4 +1,5 @@ -use crate::sync::block_lookups::{RootBlobsTuple, RootBlockTuple}; +use crate::sync::block_lookups::{BlobRequestId, BlockRequestId, RootBlobsTuple, RootBlockTuple}; +use crate::sync::network_context::SyncNetworkContext; use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::data_availability_checker::DataAvailabilityChecker; use beacon_chain::{get_block_root, BeaconChainTypes}; @@ -17,50 +18,92 @@ use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; use super::{PeerShouldHave, ResponseType}; pub struct SingleBlockLookup { - pub requested_block_root: Hash256, - pub requested_ids: Vec, - /// Where we store blobs until we receive the stream terminator. - pub blob_download_queue: FixedBlobSidecarList, - pub block_request_state: SingleLookupRequestState, - pub blob_request_state: SingleLookupRequestState, + pub id: LookupId, + pub block_request_state: BlockRequestState, + pub blob_request_state: BlobRequestState, pub da_checker: Arc>, - /// Only necessary for requests triggered by an `UnkownParent` because any + /// Only necessary for requests triggered by an `UnknownParent` because any /// blocks or blobs without parents won't hit the data availability cache. pub unknown_parent_components: Option>, } +#[derive(Default, Clone)] +pub struct LookupId { + pub block_request_id: Option, + pub blob_request_id: Option, +} + +pub struct BlobRequestState { + pub requested_ids: Vec, + /// Where we store blobs until we receive the stream terminator. + pub blob_download_queue: FixedBlobSidecarList, + pub state: SingleLookupRequestState, +} + +impl BlobRequestState { + pub fn new(peer_source: &[PeerShouldHave]) -> Self { + Self { + requested_ids: <_>::default(), + blob_download_queue: <_>::default(), + state: SingleLookupRequestState::new(peer_source), + } + } +} + +pub struct BlockRequestState { + pub requested_block_root: Hash256, + pub state: SingleLookupRequestState, +} + +impl BlockRequestState { + pub fn new(block_root: Hash256, peers: &[PeerShouldHave]) -> Self { + Self { + requested_block_root: block_root, + state: SingleLookupRequestState::new(peers), + } + } +} + impl SingleBlockLookup { pub(crate) fn register_failure_downloading(&mut self, response_type: ResponseType) { match response_type { - ResponseType::Block => self.block_request_state.register_failure_downloading(), - ResponseType::Blob => self.blob_request_state.register_failure_downloading(), + ResponseType::Block => self + .block_request_state + .state + .register_failure_downloading(), + ResponseType::Blob => self.blob_request_state.state.register_failure_downloading(), } } } impl SingleBlockLookup { - pub(crate) fn awaiting_download(&mut self, response_type: ResponseType) -> bool { - match response_type { - ResponseType::Block => { - matches!(self.block_request_state.state, State::AwaitingDownload) - } - ResponseType::Blob => matches!(self.blob_request_state.state, State::AwaitingDownload), - } - } - pub(crate) fn downloading(&mut self, response_type: ResponseType) -> bool { match response_type { ResponseType::Block => { - matches!(self.block_request_state.state, State::Downloading {..}) + matches!( + self.block_request_state.state.state, + State::Downloading { .. } + ) + } + ResponseType::Blob => { + matches!( + self.blob_request_state.state.state, + State::Downloading { .. } + ) } - ResponseType::Blob => matches!(self.blob_request_state.state, State::Downloading { .. }), } } pub(crate) fn remove_peer_if_useless(&mut self, peer_id: &PeerId, response_type: ResponseType) { match response_type { - ResponseType::Block => self.block_request_state.remove_peer_if_useless(peer_id), - ResponseType::Blob => self.blob_request_state.remove_peer_if_useless(peer_id), + ResponseType::Block => self + .block_request_state + .state + .remove_peer_if_useless(peer_id), + ResponseType::Blob => self + .blob_request_state + .state + .remove_peer_if_useless(peer_id), } } @@ -70,8 +113,14 @@ impl SingleBlockLookup Result<(), ()> { match response_type { - ResponseType::Block => self.block_request_state.check_peer_disconnected(peer_id), - ResponseType::Blob => self.blob_request_state.check_peer_disconnected(peer_id), + ResponseType::Block => self + .block_request_state + .state + .check_peer_disconnected(peer_id), + ResponseType::Blob => self + .blob_request_state + .state + .check_peer_disconnected(peer_id), } } } @@ -156,33 +205,52 @@ impl SingleBlockLookup>, - peer_source: PeerShouldHave, + peers: &[PeerShouldHave], da_checker: Arc>, ) -> Self { Self { - requested_block_root, - requested_ids: <_>::default(), - blob_download_queue: <_>::default(), - block_request_state: SingleLookupRequestState::new(peer_source), - blob_request_state: SingleLookupRequestState::new(peer_source), + id: <_>::default(), + block_request_state: BlockRequestState::new(requested_block_root, peers), + blob_request_state: BlobRequestState::new(peers), da_checker, unknown_parent_components, } } + pub fn request_block_and_blobs(&mut self, cx: &mut SyncNetworkContext) { + let block_request_id = if let Ok(Some((peer_id, block_request))) = self.request_block() { + cx.single_block_lookup_request(peer_id, block_request).ok() + } else { + None + }; + + let blob_request_id = if let Ok(Some((peer_id, blob_request))) = self.request_blobs() { + cx.single_blobs_lookup_request(peer_id, blob_request).ok() + } else { + None + }; + + self.id = LookupId { + block_request_id, + blob_request_id, + }; + } + pub fn update_blobs_request(&mut self) { - self.requested_ids = if let Some(components) = self.unknown_parent_components.as_ref() { + self.blob_request_state.requested_ids = if let Some(components) = + self.unknown_parent_components.as_ref() + { let blobs = components.downloaded_indices(); self.da_checker .get_missing_blob_ids( - self.requested_block_root, + self.block_request_state.requested_block_root, components.downloaded_block.as_ref(), Some(blobs), ) .unwrap_or_default() } else { self.da_checker - .get_missing_blob_ids_checking_cache(self.requested_block_root) + .get_missing_blob_ids_checking_cache(self.block_request_state.requested_block_root) .unwrap_or_default() }; } @@ -194,7 +262,7 @@ impl SingleBlockLookup SingleBlockLookup SingleBlockLookup>>, ) -> Result>, LookupVerifyError> { - match self.block_request_state.state { + match self.block_request_state.state.state { State::AwaitingDownload => { - self.block_request_state.register_failure_downloading(); + self.block_request_state + .state + .register_failure_downloading(); Err(LookupVerifyError::ExtraBlocksReturned) } State::Downloading { peer_id } => { @@ -256,24 +323,28 @@ impl SingleBlockLookup { if peer_id.should_have_block() { - self.block_request_state.register_failure_downloading(); + self.block_request_state + .state + .register_failure_downloading(); Err(LookupVerifyError::NoBlockReturned) } else { - self.block_request_state.state = State::AwaitingDownload; + self.block_request_state.state.state = State::AwaitingDownload; Err(LookupVerifyError::BenignFailure) } } @@ -282,7 +353,9 @@ impl SingleBlockLookup match block { Some(_) => { // We sent the block for processing and received an extra block. - self.block_request_state.register_failure_downloading(); + self.block_request_state + .state + .register_failure_downloading(); Err(LookupVerifyError::ExtraBlocksReturned) } None => { @@ -298,9 +371,9 @@ impl SingleBlockLookup>>, ) -> Result>, LookupVerifyError> { - match self.blob_request_state.state { + match self.blob_request_state.state.state { State::AwaitingDownload => { - self.blob_request_state.register_failure_downloading(); + self.blob_request_state.state.register_failure_downloading(); Err(LookupVerifyError::ExtraBlobsReturned) } State::Downloading { @@ -308,33 +381,40 @@ impl SingleBlockLookup match blob { Some(blob) => { let received_id = blob.id(); - if !self.requested_ids.contains(&received_id) { - self.blob_request_state.register_failure_downloading(); + if !self.blob_request_state.requested_ids.contains(&received_id) { + self.blob_request_state.state.register_failure_downloading(); Err(LookupVerifyError::UnrequestedBlobId) } else { // State should remain downloading until we receive the stream terminator. - self.requested_ids.retain(|id| *id != received_id); - //TODO(sean) validate index here - // EArr(LookupVerifyError::InvalidIndex(blob.index)) + self.blob_request_state + .requested_ids + .retain(|id| *id != received_id); let blob_index = blob.index; - *self.blob_download_queue.index_mut(blob_index as usize) = Some(blob); + + if blob_index >= T::EthSpec::max_blobs_per_block() as u64 { + return Err(LookupVerifyError::InvalidIndex(blob.index)); + } + *self + .blob_request_state + .blob_download_queue + .index_mut(blob_index as usize) = Some(blob); Ok(None) } } None => { - self.blob_request_state.state = State::Processing { + self.blob_request_state.state.state = State::Processing { peer_id: peer_source, }; Ok(Some(( - self.requested_block_root, - std::mem::replace(&mut self.blob_download_queue, <_>::default()), + self.block_request_state.requested_block_root, + std::mem::take(&mut self.blob_request_state.blob_download_queue), ))) } }, State::Processing { peer_id: _ } => match blob { Some(_) => { // We sent the blob for processing and received an extra blob. - self.blob_request_state.register_failure_downloading(); + self.blob_request_state.state.register_failure_downloading(); Err(LookupVerifyError::ExtraBlobsReturned) } None => { @@ -353,7 +433,8 @@ impl SingleBlockLookup SingleBlockLookup SingleBlockLookup Result, LookupRequestError> { self.update_blobs_request(); - if self.requested_ids.is_empty() { + if self.blob_request_state.requested_ids.is_empty() { return Ok(None); } debug_assert!(matches!( - self.blob_request_state.state, + self.blob_request_state.state.state, State::AwaitingDownload )); let request = BlobsByRootRequest { - blob_ids: VariableList::from(self.requested_ids.clone()), + blob_ids: VariableList::from(self.blob_request_state.requested_ids.clone()), }; let response_type = ResponseType::Blob; if self.too_many_attempts(response_type) { @@ -411,20 +492,20 @@ impl SingleBlockLookup bool { match response_type { - ResponseType::Block => self.block_request_state.failed_attempts() >= MAX_ATTEMPTS, - ResponseType::Blob => self.blob_request_state.failed_attempts() >= MAX_ATTEMPTS, + ResponseType::Block => self.block_request_state.state.failed_attempts() >= MAX_ATTEMPTS, + ResponseType::Blob => self.blob_request_state.state.failed_attempts() >= MAX_ATTEMPTS, } } fn cannot_process(&self, response_type: ResponseType) -> bool { match response_type { ResponseType::Block => { - self.block_request_state.failed_processing - >= self.block_request_state.failed_downloading + self.block_request_state.state.failed_processing + >= self.block_request_state.state.failed_downloading } ResponseType::Blob => { - self.blob_request_state.failed_processing - >= self.blob_request_state.failed_downloading + self.blob_request_state.state.failed_processing + >= self.blob_request_state.state.failed_downloading } } } @@ -433,6 +514,7 @@ impl SingleBlockLookup self .block_request_state + .state .available_peers .iter() .choose(&mut rand::thread_rng()) @@ -440,6 +522,7 @@ impl SingleBlockLookup SingleBlockLookup self .blob_request_state + .state .available_peers .iter() .choose(&mut rand::thread_rng()) @@ -454,6 +538,7 @@ impl SingleBlockLookup SingleBlockLookup { self.block_request_state + .state .used_peers .insert(peer_id.to_peer_id()); - self.block_request_state.state = State::Downloading { peer_id }; + self.block_request_state.state.state = State::Downloading { peer_id }; } ResponseType::Blob => { self.blob_request_state + .state .used_peers .insert(peer_id.to_peer_id()); - self.blob_request_state.state = State::Downloading { peer_id }; + self.blob_request_state.state.state = State::Downloading { peer_id }; } } } - pub fn add_peer_if_useful( - &mut self, - block_root: &Hash256, - peer_source: PeerShouldHave, - ) -> bool { - if *block_root != self.requested_block_root { + pub fn add_peers_if_useful(&mut self, block_root: &Hash256, peers: &[PeerShouldHave]) -> bool { + if *block_root != self.block_request_state.requested_block_root { return false; } - match peer_source { - PeerShouldHave::BlockAndBlobs(peer_id) => { - self.block_request_state.add_peer(&peer_id); - self.blob_request_state.add_peer(&peer_id); - } - PeerShouldHave::Neither(peer_id) => { - self.block_request_state.add_potential_peer(&peer_id); - self.blob_request_state.add_potential_peer(&peer_id); + for peer in peers { + match peer { + PeerShouldHave::BlockAndBlobs(peer_id) => { + self.block_request_state.state.add_peer(peer_id); + self.blob_request_state.state.add_peer(peer_id); + } + PeerShouldHave::Neither(peer_id) => { + self.block_request_state.state.add_potential_peer(peer_id); + self.blob_request_state.state.add_potential_peer(peer_id); + } } } + true } pub fn processing_peer(&self, response_type: ResponseType) -> Result { match response_type { - ResponseType::Block => self.block_request_state.processing_peer(), - ResponseType::Blob => self.blob_request_state.processing_peer(), + ResponseType::Block => self.block_request_state.state.processing_peer(), + ResponseType::Blob => self.blob_request_state.state.processing_peer(), } } pub fn downloading_peer(&self, response_type: ResponseType) -> Result { match response_type { - ResponseType::Block => self.block_request_state.peer(), - ResponseType::Blob => self.blob_request_state.peer(), + ResponseType::Block => self.block_request_state.state.peer(), + ResponseType::Blob => self.blob_request_state.state.peer(), } } pub fn both_components_processed(&self) -> bool { - self.block_request_state.component_processed && self.block_request_state.component_processed + self.block_request_state.state.component_processed + && self.blob_request_state.state.component_processed } pub fn set_component_processed(&mut self, response_type: ResponseType) { match response_type { - ResponseType::Block => self.block_request_state.component_processed = true, - ResponseType::Blob => self.blob_request_state.component_processed = true, + ResponseType::Block => self.block_request_state.state.component_processed = true, + ResponseType::Blob => self.blob_request_state.state.component_processed = true, } } } impl SingleLookupRequestState { - pub fn new(peer_source: PeerShouldHave) -> Self { - let (available_peers, potential_peers) = match peer_source { - PeerShouldHave::BlockAndBlobs(peer_id) => { - (HashSet::from([peer_id]), HashSet::default()) + pub fn new(peers: &[PeerShouldHave]) -> Self { + let mut available_peers = HashSet::default(); + let mut potential_peers = HashSet::default(); + for peer in peers { + match peer { + PeerShouldHave::BlockAndBlobs(peer_id) => { + available_peers.insert(*peer_id); + } + PeerShouldHave::Neither(peer_id) => { + potential_peers.insert(*peer_id); + } } - PeerShouldHave::Neither(peer_id) => (HashSet::default(), HashSet::from([peer_id])), - }; + } Self { state: State::AwaitingDownload, available_peers, @@ -621,15 +714,21 @@ impl slog::Value serializer: &mut dyn slog::Serializer, ) -> slog::Result { serializer.emit_str("request", key)?; - serializer.emit_arguments("hash", &format_args!("{}", self.requested_block_root))?; - serializer.emit_arguments("blob_ids", &format_args!("{:?}", self.requested_ids))?; serializer.emit_arguments( - "block_request_state", - &format_args!("{:?}", self.block_request_state), + "hash", + &format_args!("{}", self.block_request_state.requested_block_root), )?; serializer.emit_arguments( - "blob_request_state", - &format_args!("{:?}", self.blob_request_state), + "blob_ids", + &format_args!("{:?}", self.blob_request_state.requested_ids), + )?; + serializer.emit_arguments( + "block_request_state.state", + &format_args!("{:?}", self.block_request_state.state), + )?; + serializer.emit_arguments( + "blob_request_state.state", + &format_args!("{:?}", self.blob_request_state.state), )?; slog::Result::Ok(()) } @@ -665,7 +764,6 @@ mod tests { use super::*; use beacon_chain::builder::Witness; use beacon_chain::eth1_chain::CachingEth1Backend; - use slog::Logger; use sloggers::null::NullLoggerBuilder; use sloggers::Build; use slot_clock::{SlotClock, TestingSlotClock}; @@ -705,7 +803,7 @@ mod tests { .expect("data availability checker"), ); let mut sl = - SingleBlockLookup::<4, T>::new(block.canonical_root(), None, peer_id, da_checker); + SingleBlockLookup::<4, T>::new(block.canonical_root(), None, &[peer_id], da_checker); sl.request_block().unwrap(); sl.verify_block(Some(block.into())).unwrap().unwrap(); } @@ -733,12 +831,12 @@ mod tests { let mut sl = SingleBlockLookup::::new( block.canonical_root(), None, - peer_id, + &[peer_id], da_checker, ); for _ in 1..FAILURES { sl.request_block().unwrap(); - sl.block_request_state.register_failure_downloading(); + sl.block_request_state.state.register_failure_downloading(); } // Now we receive the block and send it for processing @@ -746,7 +844,7 @@ mod tests { sl.verify_block(Some(block.into())).unwrap().unwrap(); // One processing failure maxes the available attempts - sl.block_request_state.register_failure_processing(); + sl.block_request_state.state.register_failure_processing(); assert_eq!( sl.request_block(), Err(LookupRequestError::TooManyAttempts { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 9a565b9c4e..58945ddbd1 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -58,6 +58,7 @@ use lighthouse_network::{PeerAction, PeerId}; use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; use std::boxed::Box; +use std::collections::HashMap; use std::ops::IndexMut; use std::ops::Sub; use std::sync::Arc; @@ -127,8 +128,18 @@ pub enum SyncMessage { /// A block with an unknown parent has been received. UnknownBlock(PeerId, BlockWrapper, Hash256), + /// A blob with an unknown parent has been received. BlobParentUnknown(PeerId, Arc>), + /// Used to re-trigger requests after delaying the lookup for the block + blobs in the + /// current slot. + MergedParentUnknown( + Hash256, + Vec, + Option>>, + Option>, + ), + /// A peer has sent an attestation that references a block that is unknown. This triggers the /// manager to attempt to find the block matching the unknown hash. UnknownBlockHashFromAttestation(PeerId, Hash256), @@ -293,11 +304,57 @@ pub fn spawn( sleep(sleep_duration).await; - //TODO(sean) aggregate messages for blobs for the same block + let mut merged = HashMap::new(); while let Ok(msg) = delayed_lookups_recv.try_recv() { + match msg { + SyncMessage::BlobParentUnknown(peer_id, blob) => { + let blob_index = blob.index; + if blob_index < T::EthSpec::max_blobs_per_block() as u64 { + let (_, blobs, peers) = merged.entry(blob.block_root).or_insert_with(||{ + (None, Some(FixedBlobSidecarList::default()), vec![]) + }); + if let Some(blobs) = blobs { + *blobs.index_mut(blob_index as usize) = Some(blob); + } + peers.push(PeerShouldHave::Neither(peer_id)); + } else { + warn!(log, "Received blob with invalid index"; "index" => blob_index, "peer_id" => %peer_id); + } + } + SyncMessage::UnknownBlock(peer_id, block, root) => { + let (block, blobs) = block.deconstruct(); + let (cached_block, cached_blobs, peers) = merged.entry(root).or_insert_with(||{ + (None, Some(FixedBlobSidecarList::default()), vec![]) + }); + if let (Some(cached_blobs), Some( mut blobs)) = (cached_blobs, blobs) { + for blob in blobs.iter_mut() { + if let Some(blob) = blob.take() { + let blob_index = blob.index; + if blob_index < T::EthSpec::max_blobs_per_block() as u64 { + *cached_blobs.index_mut(blob_index as usize) = Some(blob); + } else { + warn!(log, "Received blob with invalid index"; "index" => blob_index, "peer_id" => %peer_id); + } + } + } + } + *cached_block = Some(block); + peers.push(PeerShouldHave::Neither(peer_id)); + } + _ => { + if let Err(e) = sync_send.send(msg) { + warn!(log, "Failed to send delayed lookup message"; "error" => ?e); + } + } + } + } + + // Send `MergedParentUnknown` messages to the sync manager. + for (root, (block, blobs, peers)) in merged { + let msg = SyncMessage::MergedParentUnknown(root, peers, block, blobs); if let Err(e) = sync_send.send(msg) { - warn!(log, "Failed to send delayed lookup message"; "error" => ?e); + warn!(log, "Failed to send merged delayed lookup message"; "error" => ?e); } } } @@ -644,7 +701,13 @@ impl SyncManager { if self.synced_and_connected_within_tolerance(block_slot, &peer_id) { let parent_root = block.parent_root(); - + self.block_lookups.search_parent( + block_slot, + block_root, + parent_root, + peer_id, + &mut self.network, + ); if self.should_delay_lookup(block_slot) { if let Err(e) = self .delayed_lookups @@ -659,16 +722,9 @@ impl SyncManager { block_root, Some(block), blobs, - peer_id, + &[PeerShouldHave::Neither(peer_id)], &mut self.network, ); - self.block_lookups.search_parent( - block_slot, - block_root, - parent_root, - peer_id, - &mut self.network, - ); } } } @@ -679,7 +735,13 @@ impl SyncManager { let block_root = blob.block_root; let parent_root = blob.block_parent_root; let blob_index = blob.index; - + self.block_lookups.search_parent( + blob_slot, + block_root, + parent_root, + peer_id, + &mut self.network, + ); if self.should_delay_lookup(blob_slot) { if let Err(e) = self .delayed_lookups @@ -695,18 +757,21 @@ impl SyncManager { block_root, None, Some(blobs), - peer_id, + &[PeerShouldHave::Neither(peer_id)], &mut self.network, ); } - self.block_lookups.search_parent( - blob_slot, + } + } + SyncMessage::MergedParentUnknown(block_root, peers, block, blobs) => { + self.block_lookups + .search_current_unknown_parent_block_and_blobs( block_root, - parent_root, - peer_id, + block, + blobs, + peers.as_slice(), &mut self.network, ); - } } SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => { // If we are not synced, ignore this block. diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 66fcb0fe03..c04ea47695 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -7,6 +7,7 @@ use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; use crate::beacon_processor::WorkEvent; use crate::service::{NetworkMessage, RequestId}; use crate::status::ToStatusMessage; +use crate::sync::block_lookups::{BlobRequestId, BlockRequestId}; use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; use fnv::FnvHashMap; @@ -463,7 +464,7 @@ impl SyncNetworkContext { &mut self, peer_id: PeerId, request: BlocksByRootRequest, - ) -> Result { + ) -> Result { let id = self.next_id(); let request_id = RequestId::Sync(SyncRequestId::ParentLookup { id }); @@ -488,7 +489,7 @@ impl SyncNetworkContext { &mut self, peer_id: PeerId, request: BlobsByRootRequest, - ) -> Result { + ) -> Result { let id = self.next_id(); let request_id = RequestId::Sync(SyncRequestId::ParentLookup { id });