diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index e5ac442551..9c213a68af 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -511,7 +511,7 @@ impl TryInto> for MaybeAvailableBlock { fn try_into(self) -> Result, Self::Error> { match self { Self::Available(block) => Ok(block), - Self::AvailabilityPending(block) => Err(AvailabilityCheckError::MissingBlobs), + Self::AvailabilityPending(_block) => Err(AvailabilityCheckError::MissingBlobs), } } } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index a7264ff85e..3b974a8c49 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -312,7 +312,9 @@ pub enum BlockError { ParentExecutionPayloadInvalid { parent_root: Hash256, }, + /// The blob alone failed validation. BlobValidation(BlobError), + /// The block and blob together failed validation. AvailabilityCheck(AvailabilityCheckError), MissingBlockParts(Slot, Hash256), } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 72700947ca..c1e27be742 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -37,6 +37,7 @@ pub enum AvailabilityCheckError { SszTypes(ssz_types::Error), MissingBlobs, NumBlobsMismatch { + /// The peer sent us an invalid block, we must penalise harshly. num_kzg_commitments: usize, num_blobs: usize, }, diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 40a81e38c3..081cc2b221 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -546,7 +546,7 @@ impl BlockLookups { ); } - self.request_parent_blob(parent_lookup, cx); + self.request_parent_blobs(parent_lookup, cx); } else { self.parent_lookups.push(parent_lookup) } @@ -557,53 +557,9 @@ impl BlockLookups { // processing result arrives. self.parent_lookups.push(parent_lookup); } - Err(e) => match e { - ParentVerifyError::RootMismatch - | ParentVerifyError::NoBlockReturned - | ParentVerifyError::NotEnoughBlobsReturned - | ParentVerifyError::ExtraBlocksReturned - | ParentVerifyError::UnrequestedBlobId - | ParentVerifyError::ExtraBlobsReturned - | ParentVerifyError::InvalidIndex(_) => { - let e = e.into(); - warn!(self.log, "Peer sent invalid response to parent request."; - "peer_id" => %peer_id, "reason" => %e); - - // We do not tolerate these kinds of errors. We will accept a few but these are signs - // of a faulty peer. - cx.report_peer(peer_id, PeerAction::LowToleranceError, e); - - // We try again if possible. - self.request_parent_block(parent_lookup, cx); - } - ParentVerifyError::PreviousFailure { parent_root } => { - debug!( - self.log, - "Parent chain ignored due to past failure"; - "block" => %parent_root, - ); - // Add the root block to failed chains - self.failed_chains.insert(parent_lookup.chain_hash()); - - cx.report_peer( - peer_id, - PeerAction::MidToleranceError, - "bbroot_failed_chains", - ); - } - ParentVerifyError::BenignFailure => { - trace!( - self.log, - "Requested peer could not respond to block request, requesting a new peer"; - ); - parent_lookup - .current_parent_request - .block_request_state - .potential_peers - .remove(&peer_id); - self.request_parent_block(parent_lookup, cx); - } - }, + Err(e) => { + self.handle_parent_verify_error(peer_id, parent_lookup, ResponseType::Block, e, cx) + } }; metrics::set_gauge( @@ -660,53 +616,9 @@ impl BlockLookups { // Waiting for more blobs to arrive self.parent_lookups.push(parent_lookup); } - Err(e) => match e { - ParentVerifyError::RootMismatch - | ParentVerifyError::NoBlockReturned - | ParentVerifyError::NotEnoughBlobsReturned - | ParentVerifyError::ExtraBlocksReturned - | ParentVerifyError::UnrequestedBlobId - | ParentVerifyError::ExtraBlobsReturned - | ParentVerifyError::InvalidIndex(_) => { - let e = e.into(); - warn!(self.log, "Peer sent invalid response to parent request."; - "peer_id" => %peer_id, "reason" => %e); - - // We do not tolerate these kinds of errors. We will accept a few but these are signs - // of a faulty peer. - cx.report_peer(peer_id, PeerAction::LowToleranceError, e); - - // We try again if possible. - self.request_parent_blob(parent_lookup, cx); - } - ParentVerifyError::PreviousFailure { parent_root } => { - debug!( - self.log, - "Parent chain ignored due to past failure"; - "block" => %parent_root, - ); - // Add the root block to failed chains - self.failed_chains.insert(parent_lookup.chain_hash()); - - cx.report_peer( - peer_id, - PeerAction::MidToleranceError, - "bbroot_failed_chains", - ); - } - ParentVerifyError::BenignFailure => { - trace!( - self.log, - "Requested peer could not respond to blob request, requesting a new peer"; - ); - parent_lookup - .current_parent_request - .blob_request_state - .potential_peers - .remove(&peer_id); - self.request_parent_blob(parent_lookup, cx); - } - }, + Err(e) => { + self.handle_parent_verify_error(peer_id, parent_lookup, ResponseType::Blob, e, cx) + } }; metrics::set_gauge( @@ -715,36 +627,91 @@ impl BlockLookups { ); } + fn handle_parent_verify_error( + &mut self, + peer_id: PeerId, + mut parent_lookup: ParentLookup, + response_type: ResponseType, + e: ParentVerifyError, + cx: &mut SyncNetworkContext, + ) { + match e { + ParentVerifyError::RootMismatch + | ParentVerifyError::NoBlockReturned + | ParentVerifyError::NotEnoughBlobsReturned + | ParentVerifyError::ExtraBlocksReturned + | ParentVerifyError::UnrequestedBlobId + | ParentVerifyError::ExtraBlobsReturned + | ParentVerifyError::InvalidIndex(_) => { + let e = e.into(); + warn!(self.log, "Peer sent invalid response to parent request."; + "peer_id" => %peer_id, "reason" => %e); + + // We do not tolerate these kinds of errors. We will accept a few but these are signs + // of a faulty peer. + cx.report_peer(peer_id, PeerAction::LowToleranceError, e); + + // We try again if possible. + match response_type { + ResponseType::Block => self.request_parent_block(parent_lookup, cx), + ResponseType::Blob => self.request_parent_blobs(parent_lookup, cx), + }; + } + ParentVerifyError::PreviousFailure { parent_root } => { + debug!( + self.log, + "Parent chain ignored due to past failure"; + "block" => %parent_root, + ); + // Add the root block to failed chains + self.failed_chains.insert(parent_lookup.chain_hash()); + + cx.report_peer( + peer_id, + PeerAction::MidToleranceError, + "bbroot_failed_chains", + ); + } + ParentVerifyError::BenignFailure => { + trace!( + self.log, + "Requested peer could not respond to block request, requesting a new peer"; + ); + parent_lookup + .current_parent_request + .remove_peer_if_useless(&peer_id, response_type); + match response_type { + ResponseType::Block => self.request_parent_block(parent_lookup, cx), + ResponseType::Blob => self.request_parent_blobs(parent_lookup, cx), + }; + } + } + } + /* Error responses */ pub fn peer_disconnected(&mut self, peer_id: &PeerId, cx: &mut SyncNetworkContext) { self.single_block_lookups .retain_mut(|(block_id_opt, blob_id_opt, req)| { - let should_remove_block = block_id_opt - .as_mut() - .filter(|_| req.block_request_state - .check_peer_disconnected(peer_id) - .is_err()) - .map(|block_id| { + let should_remove_block = should_remove_disconnected_peer( + block_id_opt, + ResponseType::Block, + peer_id, + cx, + req, + &self.log, + ); + let should_remove_blob = should_remove_disconnected_peer( + blob_id_opt, + ResponseType::Blob, + peer_id, + cx, + req, + &self.log, + ); - trace!(self.log, "Single block lookup failed on peer disconnection"; "block_root" => ?req.requested_block_root, ); - retry_request_after_failure(block_id, req, ResponseType::Block, peer_id, cx, &self.log) - }) - .unwrap_or(ShouldRemoveLookup::False); - - let should_remove_blob = blob_id_opt - .as_mut() - .filter(|_| req.blob_request_state - .check_peer_disconnected(peer_id) - .is_err()) - .map(|blob_id| { - - trace!(self.log, "Single blob lookup failed on peer disconnection"; "block_root" => ?req.requested_block_root, ); - retry_request_after_failure(blob_id, req, ResponseType::Blob, peer_id, cx, &self.log) - }) - .unwrap_or(ShouldRemoveLookup::False); - - matches!(should_remove_block, ShouldRemoveLookup::False) && matches!(should_remove_blob, ShouldRemoveLookup::False) + matches!(should_remove_block, ShouldRemoveLookup::False) + && matches!(should_remove_blob, ShouldRemoveLookup::False) }); /* Check disconnection for parent lookups */ @@ -790,7 +757,7 @@ impl BlockLookups { parent_lookup.blob_download_failed(); trace!(self.log, "Parent lookup blobs request failed"; &parent_lookup, "error" => msg); - self.request_parent_blob(parent_lookup, cx); + self.request_parent_blobs(parent_lookup, cx); } else { return debug!(self.log, "RPC failure for a blobs parent lookup request that was not found"; "peer_id" => %peer_id, "error" => msg); }; @@ -808,30 +775,32 @@ impl BlockLookups { error: RPCError, ) { let msg = error.as_static_str(); - self.single_block_lookups.retain_mut(|(block_id_opt, blob_id_opt, req)|{ + self.single_block_lookups + .retain_mut(|(block_id_opt, blob_id_opt, req)| { + let should_remove_block = should_remove_failed_lookup( + block_id_opt, + ResponseType::Block, + id, + msg, + peer_id, + cx, + req, + &self.log, + ); + let should_remove_blob = should_remove_failed_lookup( + blob_id_opt, + ResponseType::Blob, + id, + msg, + peer_id, + cx, + req, + &self.log, + ); - let should_remove_block = block_id_opt - .as_mut() - .filter(|block_id| **block_id == id) - .map(|block_id| { - req.block_request_state.register_failure_downloading(); - trace!(self.log, "Single block lookup failed"; "block" => %req.requested_block_root, "error" => msg); - retry_request_after_failure(block_id, req, ResponseType::Block, peer_id, cx, &self.log) - }) - .unwrap_or(ShouldRemoveLookup::False); - - let should_remove_blob = blob_id_opt - .as_mut() - .filter(|blob_id| **blob_id == id) - .map(|blob_id| { - req.blob_request_state.register_failure_downloading(); - trace!(self.log, "Single blob lookup failed"; "block" => %req.requested_block_root, "error" =>msg); - retry_request_after_failure(blob_id, req, ResponseType::Block, peer_id, cx, &self.log) - }) - .unwrap_or(ShouldRemoveLookup::False); - - matches!(should_remove_block, ShouldRemoveLookup::False) && matches!(should_remove_blob, ShouldRemoveLookup::False) - }); + matches!(should_remove_block, ShouldRemoveLookup::False) + && matches!(should_remove_blob, ShouldRemoveLookup::False) + }); metrics::set_gauge( &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, @@ -850,13 +819,9 @@ impl BlockLookups { ) { let lookup_components_opt = self.single_block_lookups.iter_mut().enumerate().find_map( |(index, (block_id_opt, blob_id_opt, req))| { - let id_filter = |id: &Id| -> bool { target_id == *id }; - let block_matches = block_id_opt.as_ref().map(id_filter).unwrap_or(false); - let blob_matches = blob_id_opt.as_ref().map(id_filter).unwrap_or(false); - if !block_matches && !blob_matches { - return None; - } - Some((index, block_id_opt, blob_id_opt, req)) + let block_match = block_id_opt.as_ref() == Some(&target_id); + let blob_match = blob_id_opt.as_ref() == Some(&target_id); + (block_match || blob_match).then_some((index, block_id_opt, blob_id_opt, req)) }, ); let (index, block_id_ref, blob_id_ref, request_ref) = match lookup_components_opt { @@ -883,116 +848,14 @@ impl BlockLookups { trace!(self.log, "Single block processing succeeded"; "block" => %root); ShouldRemoveLookup::True } - AvailabilityProcessingStatus::MissingComponents(_, block_root) => { - // if peer should have both, and missing components is received after we've - // processed the opposite, then we can downscore. - let should_remove = match response_type { - ResponseType::Block => { - if request_ref.blob_request_state.component_processed { - match request_ref.processing_peer(ResponseType::Blob) { - Ok(PeerShouldHave::BlockAndBlobs(other_peer)) => { - cx.report_peer( - peer_id.to_peer_id(), - PeerAction::MidToleranceError, - "single_block_failure", - ); - if let Some(blob_id_ref) = blob_id_ref { - // Try it again if possible. - retry_request_after_failure( - blob_id_ref, - request_ref, - ResponseType::Blob, - peer_id.as_peer_id(), - cx, - &self.log, - ) - } else { - ShouldRemoveLookup::False - } - } - Ok(PeerShouldHave::Neither(other_peer)) => { - request_ref - .blob_request_state - .remove_peer_if_useless(&other_peer); - if let Some(blob_id_ref) = blob_id_ref { - // Try it again if possible. - retry_request_after_failure( - blob_id_ref, - request_ref, - ResponseType::Blob, - peer_id.as_peer_id(), - cx, - &self.log, - ) - } else { - ShouldRemoveLookup::False - } - } - Err(()) => { - //TODO(sean) retry? - ShouldRemoveLookup::False - } - } - } else { - request_ref.block_request_state.component_processed = true; - ShouldRemoveLookup::False - } - } - ResponseType::Blob => { - if request_ref.block_request_state.component_processed { - match request_ref.processing_peer(ResponseType::Blob) { - Ok(PeerShouldHave::BlockAndBlobs(other_peer)) => { - cx.report_peer( - other_peer, - PeerAction::MidToleranceError, - "single_block_failure", - ); - if let Some(blob_id_ref) = blob_id_ref { - // Try it again if possible. - retry_request_after_failure( - blob_id_ref, - request_ref, - ResponseType::Blob, - peer_id.as_peer_id(), - cx, - &self.log, - ) - } else { - ShouldRemoveLookup::False - } - } - Ok(PeerShouldHave::Neither(other_peer)) => { - request_ref - .blob_request_state - .remove_peer_if_useless(&other_peer); - if let Some(blob_id_ref) = blob_id_ref { - // Try it again if possible. - retry_request_after_failure( - blob_id_ref, - request_ref, - ResponseType::Blob, - peer_id.as_peer_id(), - cx, - &self.log, - ) - } else { - ShouldRemoveLookup::False - } - } - Err(()) => { - //TODO(sean) retry? - ShouldRemoveLookup::False - } - } - } else { - // retry block here? - - request_ref.blob_request_state.component_processed = true; - ShouldRemoveLookup::False - } - } - }; - should_remove + AvailabilityProcessingStatus::MissingComponents(_, _block_root) => { + should_remove_missing_components( + request_ref, + response_type, + blob_id_ref, + cx, + &self.log, + ) } }, BlockProcessingResult::Ignored => { @@ -1018,13 +881,14 @@ impl BlockLookups { ShouldRemoveLookup::True } BlockError::ParentUnknown(block) => { - self.search_parent( - block.slot(), - root, - block.parent_root(), - peer_id.to_peer_id(), - cx, - ); + let slot = block.slot(); + let parent_root = block.parent_root(); + let (block, blobs) = block.deconstruct(); + request_ref.add_unknown_parent_block(block); + if let Some(blobs) = blobs { + request_ref.add_unknown_parent_blobs(blobs); + } + self.search_parent(slot, root, parent_root, peer_id.to_peer_id(), cx); ShouldRemoveLookup::False } ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { @@ -1047,27 +911,32 @@ impl BlockLookups { ); if let Some(blob_id_ref) = blob_id_ref { // Try it again if possible. - retry_request_after_failure( - blob_id_ref, - request_ref, - ResponseType::Blob, - peer_id.as_peer_id(), - cx, - &self.log, - ) - } else if let Some(block_id_ref) = block_id_ref { - // Try it again if possible. - retry_request_after_failure( - block_id_ref, - request_ref, - ResponseType::Block, - peer_id.as_peer_id(), - cx, - &self.log, - ) - } else { - ShouldRemoveLookup::True + if !request_ref.awaiting_download(ResponseType::Blob) { + retry_request_after_failure( + blob_id_ref, + request_ref, + ResponseType::Blob, + peer_id.as_peer_id(), + cx, + &self.log, + ); + } } + + if let Some(block_id_ref) = block_id_ref { + if !request_ref.awaiting_download(ResponseType::Block) { + // Try it again if possible. + retry_request_after_failure( + block_id_ref, + request_ref, + ResponseType::Block, + peer_id.as_peer_id(), + cx, + &self.log, + ); + } + } + ShouldRemoveLookup::False } } } @@ -1396,7 +1265,7 @@ impl BlockLookups { self.handle_response(parent_lookup, cx, response, ResponseType::Block); } - fn request_parent_blob( + fn request_parent_blobs( &mut self, mut parent_lookup: ParentLookup, cx: &mut SyncNetworkContext, @@ -1498,19 +1367,7 @@ fn handle_block_lookup_verify_error( log: &Logger, ) -> ShouldRemoveLookup { let msg = if matches!(e, LookupVerifyError::BenignFailure) { - match response_type { - // Only remove a potential peer if there are better options - ResponseType::Block => { - request_ref - .block_request_state - .remove_peer_if_useless(&peer_id); - } - ResponseType::Blob => { - request_ref - .blob_request_state - .remove_peer_if_useless(&peer_id); - } - }; + request_ref.remove_peer_if_useless(&peer_id, response_type); "peer could not response to request" } else { let msg = e.into(); @@ -1580,3 +1437,83 @@ fn retry_request_after_failure( } ShouldRemoveLookup::False } + +fn should_remove_disconnected_peer( + id: &mut Option, + response_type: ResponseType, + peer_id: &PeerId, + cx: &mut SyncNetworkContext, + req: &mut SingleBlockLookup, + log: &Logger, +) -> ShouldRemoveLookup { + id + .as_mut() + .filter(|_| req.check_peer_disconnected(peer_id, response_type) + .is_err()) + .map(|block_id| { + trace!(log, "Single lookup failed on peer disconnection"; "block_root" => ?req.requested_block_root, "response_type" => ?response_type); + retry_request_after_failure(block_id, req, response_type, peer_id, cx, log) + }) + .unwrap_or(ShouldRemoveLookup::False) +} + +fn should_remove_failed_lookup( + id: &mut Option, + response_type: ResponseType, + target_id: Id, + msg: &'static str, + peer_id: &PeerId, + cx: &mut SyncNetworkContext, + req: &mut SingleBlockLookup, + log: &Logger, +) -> ShouldRemoveLookup { + id + .as_mut() + .filter(|id| **id == target_id) + .map(|id| { + req.register_failure_downloading(response_type); + trace!(log, "Single lookup failed"; "block" => %req.requested_block_root, "error" => msg, "response_type" => ?response_type); + retry_request_after_failure(id, req, response_type, peer_id, cx, log) + }) + .unwrap_or(ShouldRemoveLookup::False) +} + +fn should_remove_missing_components( + request_ref: &mut SingleBlockLookup, + response_type: ResponseType, + blob_id_ref: &mut Option, + cx: &mut SyncNetworkContext, + log: &Logger, +) -> ShouldRemoveLookup { + request_ref.set_component_processed(response_type); + + // If we get a missing component response after processing both a blob and a block response, the + // blobs must be what are missing. + if request_ref.both_components_processed() { + let Ok(blob_peer) = request_ref.processing_peer(ResponseType::Blob) else { + return ShouldRemoveLookup::False; + }; + if let PeerShouldHave::BlockAndBlobs(blob_peer) = blob_peer { + cx.report_peer( + blob_peer, + PeerAction::MidToleranceError, + "single_block_failure", + ); + } + request_ref.remove_peer_if_useless(blob_peer.as_peer_id(), ResponseType::Blob); + if let Some(blob_id_ref) = blob_id_ref { + if !request_ref.awaiting_download(ResponseType::Blob) { + // Try it again if possible. + return retry_request_after_failure( + blob_id_ref, + request_ref, + ResponseType::Blob, + blob_peer.as_peer_id(), + cx, + log, + ); + } + } + } + ShouldRemoveLookup::False +} diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index e56436bef1..14027c1669 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -8,14 +8,14 @@ use crate::sync::{ }; use beacon_chain::blob_verification::AsBlock; use beacon_chain::blob_verification::BlockWrapper; -use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker}; +use beacon_chain::data_availability_checker::DataAvailabilityChecker; use beacon_chain::BeaconChainTypes; use lighthouse_network::PeerId; use std::sync::Arc; use store::Hash256; use strum::IntoStaticStr; use types::blob_sidecar::FixedBlobSidecarList; -use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; +use types::{BlobSidecar, SignedBeaconBlock}; /// How many attempts we try to find a parent of a block before we give up trying. pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5; diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 3ee9f51987..36e976dad0 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -29,6 +29,44 @@ pub struct SingleBlockLookup { pub unknown_parent_components: Option>, } +impl SingleBlockLookup { + pub(crate) fn register_failure_downloading(&mut self, response_type: ResponseType) { + match response_type { + ResponseType::Block => self.block_request_state.register_failure_downloading(), + ResponseType::Blob => self.blob_request_state.register_failure_downloading(), + } + } +} + +impl SingleBlockLookup { + pub(crate) fn awaiting_download(&mut self, response_type: ResponseType) -> bool { + match response_type { + ResponseType::Block => { + matches!(self.block_request_state.state, State::AwaitingDownload) + } + ResponseType::Blob => matches!(self.blob_request_state.state, State::AwaitingDownload), + } + } + + pub(crate) fn remove_peer_if_useless(&mut self, peer_id: &PeerId, response_type: ResponseType) { + match response_type { + ResponseType::Block => self.block_request_state.remove_peer_if_useless(peer_id), + ResponseType::Blob => self.blob_request_state.remove_peer_if_useless(peer_id), + } + } + + pub(crate) fn check_peer_disconnected( + &mut self, + peer_id: &PeerId, + response_type: ResponseType, + ) -> Result<(), ()> { + match response_type { + ResponseType::Block => self.block_request_state.check_peer_disconnected(peer_id), + ResponseType::Blob => self.blob_request_state.check_peer_disconnected(peer_id), + } + } +} + #[derive(Default)] pub struct UnknownParentComponents { pub downloaded_block: Option>>, @@ -317,41 +355,17 @@ impl SingleBlockLookup= MAX_ATTEMPTS { + let request = BlocksByRootRequest { + block_roots: VariableList::from(vec![self.requested_block_root]), + }; + let response_type = ResponseType::Block; + if self.too_many_attempts(response_type) { Err(LookupRequestError::TooManyAttempts { - cannot_process: self.block_request_state.failed_processing - >= self.block_request_state.failed_downloading, + cannot_process: self.cannot_process(response_type), }) - } else if let Some(&peer_id) = self - .block_request_state - .available_peers - .iter() - .choose(&mut rand::thread_rng()) - { - let request = BlocksByRootRequest { - block_roots: VariableList::from(vec![self.requested_block_root]), - }; - self.block_request_state.used_peers.insert(peer_id); - let peer_source = PeerShouldHave::BlockAndBlobs(peer_id); - self.block_request_state.state = State::Downloading { - peer_id: peer_source, - }; - Ok(Some((peer_id, request))) - } else if let Some(&peer_id) = self - .block_request_state - .potential_peers - .iter() - .choose(&mut rand::thread_rng()) - { - let request = BlocksByRootRequest { - block_roots: VariableList::from(vec![self.requested_block_root]), - }; - self.block_request_state.used_peers.insert(peer_id); - let peer_source = PeerShouldHave::Neither(peer_id); - self.block_request_state.state = State::Downloading { - peer_id: peer_source, - }; - Ok(Some((peer_id, request))) + } else if let Some(peer_id) = self.get_peer(response_type) { + self.add_used_peer(peer_id, response_type); + Ok(Some((peer_id.to_peer_id(), request))) } else { Err(LookupRequestError::NoPeers) } @@ -370,46 +384,92 @@ impl SingleBlockLookup= MAX_ATTEMPTS { + let request = BlobsByRootRequest { + blob_ids: VariableList::from(self.requested_ids.clone()), + }; + let response_type = ResponseType::Blob; + if self.too_many_attempts(response_type) { Err(LookupRequestError::TooManyAttempts { - cannot_process: self.blob_request_state.failed_processing - >= self.blob_request_state.failed_downloading, + cannot_process: self.cannot_process(response_type), }) - } else if let Some(&peer_id) = self - .blob_request_state - .available_peers - .iter() - .choose(&mut rand::thread_rng()) - { - let request = BlobsByRootRequest { - blob_ids: VariableList::from(self.requested_ids.clone()), - }; - self.blob_request_state.used_peers.insert(peer_id); - let peer_source = PeerShouldHave::BlockAndBlobs(peer_id); - self.blob_request_state.state = State::Downloading { - peer_id: peer_source, - }; - Ok(Some((peer_id, request))) - } else if let Some(&peer_id) = self - .blob_request_state - .potential_peers - .iter() - .choose(&mut rand::thread_rng()) - { - let request = BlobsByRootRequest { - blob_ids: VariableList::from(self.requested_ids.clone()), - }; - self.blob_request_state.used_peers.insert(peer_id); - let peer_source = PeerShouldHave::Neither(peer_id); - self.blob_request_state.state = State::Downloading { - peer_id: peer_source, - }; - Ok(Some((peer_id, request))) + } else if let Some(peer_id) = self.get_peer(response_type) { + self.add_used_peer(peer_id, response_type); + Ok(Some((peer_id.to_peer_id(), request))) } else { Err(LookupRequestError::NoPeers) } } + fn too_many_attempts(&self, response_type: ResponseType) -> bool { + match response_type { + ResponseType::Block => self.block_request_state.failed_attempts() >= MAX_ATTEMPTS, + ResponseType::Blob => self.blob_request_state.failed_attempts() >= MAX_ATTEMPTS, + } + } + + fn cannot_process(&self, response_type: ResponseType) -> bool { + match response_type { + ResponseType::Block => { + self.block_request_state.failed_processing + >= self.block_request_state.failed_downloading + } + ResponseType::Blob => { + self.blob_request_state.failed_processing + >= self.blob_request_state.failed_downloading + } + } + } + + fn get_peer(&self, response_type: ResponseType) -> Option { + match response_type { + ResponseType::Block => self + .block_request_state + .available_peers + .iter() + .choose(&mut rand::thread_rng()) + .copied() + .map(PeerShouldHave::BlockAndBlobs) + .or(self + .block_request_state + .potential_peers + .iter() + .choose(&mut rand::thread_rng()) + .copied() + .map(PeerShouldHave::Neither)), + ResponseType::Blob => self + .blob_request_state + .available_peers + .iter() + .choose(&mut rand::thread_rng()) + .copied() + .map(PeerShouldHave::BlockAndBlobs) + .or(self + .blob_request_state + .potential_peers + .iter() + .choose(&mut rand::thread_rng()) + .copied() + .map(PeerShouldHave::Neither)), + } + } + + fn add_used_peer(&mut self, peer_id: PeerShouldHave, response_type: ResponseType) { + match response_type { + ResponseType::Block => { + self.block_request_state + .used_peers + .insert(peer_id.to_peer_id()); + self.block_request_state.state = State::Downloading { peer_id }; + } + ResponseType::Blob => { + self.blob_request_state + .used_peers + .insert(peer_id.to_peer_id()); + self.blob_request_state.state = State::Downloading { peer_id }; + } + } + } + pub fn add_peer_if_useful( &mut self, block_root: &Hash256, @@ -444,6 +504,17 @@ impl SingleBlockLookup self.blob_request_state.peer(), } } + + pub fn both_components_processed(&self) -> bool { + self.block_request_state.component_processed && self.block_request_state.component_processed + } + + pub fn set_component_processed(&mut self, response_type: ResponseType) { + match response_type { + ResponseType::Block => self.block_request_state.component_processed = true, + ResponseType::Blob => self.blob_request_state.component_processed = true, + } + } } impl SingleLookupRequestState { @@ -585,12 +656,15 @@ mod tests { use super::*; use beacon_chain::builder::Witness; use beacon_chain::eth1_chain::CachingEth1Backend; + use slog::Logger; + use sloggers::null::NullLoggerBuilder; + use sloggers::Build; use slot_clock::{SlotClock, TestingSlotClock}; use std::time::Duration; - use store::MemoryStore; + use store::{HotColdDB, MemoryStore, StoreConfig}; use types::{ test_utils::{SeedableRng, TestRandom, XorShiftRng}, - EthSpec, MinimalEthSpec as E, SignedBeaconBlock, Slot, + ChainSpec, EthSpec, MinimalEthSpec as E, SignedBeaconBlock, Slot, }; fn rand_block() -> SignedBeaconBlock { @@ -614,7 +688,13 @@ mod tests { Duration::from_secs(0), Duration::from_secs(spec.seconds_per_slot), ); - let da_checker = Arc::new(DataAvailabilityChecker::new(slot_clock, None, spec)); + let log = NullLoggerBuilder.build().expect("logger should build"); + let store = HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log) + .expect("store"); + let da_checker = Arc::new( + DataAvailabilityChecker::new(slot_clock, None, store.into(), spec) + .expect("data availability checker"), + ); let mut sl = SingleBlockLookup::<4, T>::new(block.canonical_root(), None, peer_id, da_checker); sl.request_block().unwrap(); @@ -632,8 +712,14 @@ mod tests { Duration::from_secs(0), Duration::from_secs(spec.seconds_per_slot), ); + let log = NullLoggerBuilder.build().expect("logger should build"); + let store = HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log) + .expect("store"); - let da_checker = Arc::new(DataAvailabilityChecker::new(slot_clock, None, spec)); + let da_checker = Arc::new( + DataAvailabilityChecker::new(slot_clock, None, store.into(), spec) + .expect("data availability checker"), + ); let mut sl = SingleBlockLookup::::new( block.canonical_root(), diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index a89eca2c28..f0793d3216 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -1704,8 +1704,8 @@ mod deneb_only { .block_response_triggering_process() .invalid_block_processed() .expect_penalty() + .expect_blobs_request() .expect_block_request() - .expect_no_blobs_request() .blobs_response() .missing_components_from_blob_request() .expect_no_penalty() @@ -1726,7 +1726,7 @@ mod deneb_only { .invalid_blob_processed() .expect_penalty() .expect_blobs_request() - .expect_no_block_request(); + .expect_block_request(); } #[test] @@ -1881,8 +1881,8 @@ mod deneb_only { .block_response_triggering_process() .invalid_block_processed() .expect_penalty() + .expect_blobs_request() .expect_block_request() - .expect_no_blobs_request() .blobs_response() .missing_components_from_blob_request() .expect_no_penalty() @@ -1903,7 +1903,7 @@ mod deneb_only { .invalid_blob_processed() .expect_penalty() .expect_blobs_request() - .expect_no_block_request(); + .expect_block_request(); } #[test] diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index a328fa74bf..66fcb0fe03 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -70,7 +70,7 @@ pub struct SyncNetworkContext { /// Small enumeration to make dealing with block and blob requests easier. pub enum BlockOrBlob { Block(Option>>), - Sidecar(Option>>), + Blob(Option>>), } impl From>>> for BlockOrBlob { @@ -81,7 +81,7 @@ impl From>>> for BlockOrBlob { impl From>>> for BlockOrBlob { fn from(blob: Option>>) -> Self { - BlockOrBlob::Sidecar(blob) + BlockOrBlob::Blob(blob) } } @@ -312,7 +312,7 @@ impl SyncNetworkContext { let info = &mut req.block_blob_info; match block_or_blob { BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), - BlockOrBlob::Sidecar(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), + BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), } if info.is_finished() { // If the request is finished, dequeue everything @@ -389,7 +389,7 @@ impl SyncNetworkContext { let (_, info) = entry.get_mut(); match block_or_blob { BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), - BlockOrBlob::Sidecar(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), + BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), } if info.is_finished() { // If the request is finished, dequeue everything