From 6bcfaf4fdea843302d4333d639a504a87e9690e8 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Tue, 1 Aug 2023 17:14:45 -0400 Subject: [PATCH] clean up everything add child component, fix peer scoring and retry logic --- .../src/block_verification_types.rs | 18 + .../src/network_beacon_processor/mod.rs | 4 + .../src/network_beacon_processor/tests.rs | 2 + .../network/src/sync/block_lookups/common.rs | 448 +++++++ .../network/src/sync/block_lookups/mod.rs | 1123 ++++++++++------- .../src/sync/block_lookups/parent_lookup.rs | 72 +- .../sync/block_lookups/single_block_lookup.rs | 1057 ++++++---------- .../network/src/sync/block_lookups/tests.rs | 44 +- beacon_node/network/src/sync/manager.rs | 78 +- beacon_node/network/src/sync/mod.rs | 2 +- .../network/src/sync/network_context.rs | 37 +- 11 files changed, 1654 insertions(+), 1231 deletions(-) create mode 100644 beacon_node/network/src/sync/block_lookups/common.rs diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index c41090c421..ab110c2104 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -5,8 +5,10 @@ use crate::eth1_finalization_cache::Eth1FinalizationData; use crate::{data_availability_checker, GossipVerifiedBlock, PayloadVerificationOutcome}; use derivative::Derivative; use ssz_derive::{Decode, Encode}; +use ssz_types::VariableList; use state_processing::ConsensusContext; use std::sync::Arc; +use types::blob_sidecar::FixedBlobSidecarList; use types::{ blob_sidecar::BlobIdentifier, ssz_tagged_beacon_state, ssz_tagged_signed_beacon_block, ssz_tagged_signed_beacon_block_arc, @@ -72,6 +74,22 @@ impl RpcBlock { Ok(Self { block: inner }) } + pub fn new_from_fixed( + block: Arc>, + blobs: FixedBlobSidecarList, + ) -> Result { + let filtered = blobs + .into_iter() + .filter_map(|b| b.clone()) + .collect::>(); + let blobs = if filtered.is_empty() { + None + } 
else { + Some(VariableList::from(filtered)) + }; + Self::new(block, blobs) + } + pub fn deconstruct(self) -> (Arc>, Option>) { match self.block { RpcBlockInner::Block(block) => (block, None), diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 4a214c3637..3906dcaaf6 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -434,6 +434,10 @@ impl NetworkBeaconProcessor { seen_timestamp: Duration, process_type: BlockProcessType, ) -> Result<(), Error> { + let blob_count = blobs.iter().filter(|b| b.is_some()).count(); + if blob_count == 0 { + return Ok(()); + } let process_fn = self.clone().generate_rpc_blobs_process_fn( block_root, blobs, diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 2c37d177aa..1bdc3a8816 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -318,6 +318,7 @@ impl TestRig { } pub fn enqueue_single_lookup_rpc_blobs(&self) { if let Some(blobs) = self.next_blobs.clone() { + dbg!(blobs.len()); let blobs = FixedBlobSidecarList::from( blobs .into_iter() @@ -1003,6 +1004,7 @@ async fn test_rpc_block_reprocessing() { rig.enqueue_single_lookup_rpc_blobs(); if rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0) > 0 { + dbg!("here"); rig.assert_event_journal(&[RPC_BLOBS, WORKER_FREED, NOTHING_TO_DO]) .await; } diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs new file mode 100644 index 0000000000..b4711e26be --- /dev/null +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -0,0 +1,448 @@ +use crate::sync::block_lookups::parent_lookup::PARENT_FAIL_TOLERANCE; +use crate::sync::block_lookups::single_block_lookup::{ + LookupRequestError, LookupVerifyError, 
SingleBlockLookup, SingleLookupRequestState, State, +}; +use crate::sync::block_lookups::{ + BlobRequestState, BlockLookups, BlockRequestState, PeerShouldHave, + SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, +}; +use crate::sync::manager::{BlockProcessType, Id, SingleLookupReqId}; +use crate::sync::network_context::SyncNetworkContext; +use crate::sync::ChildComponents; +use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::{get_block_root, BeaconChainTypes}; +use lighthouse_network::rpc::methods::BlobsByRootRequest; +use lighthouse_network::rpc::BlocksByRootRequest; +use lighthouse_network::PeerId; +use rand::prelude::IteratorRandom; +use ssz_types::VariableList; +use std::ops::IndexMut; +use std::sync::Arc; +use std::time::Duration; +use types::blob_sidecar::FixedBlobSidecarList; +use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock}; + +#[derive(Debug, Copy, Clone)] +pub enum ResponseType { + Block, + Blob, +} + +#[derive(Debug, Copy, Clone)] +pub enum LookupType { + Current, + Parent, +} + +/// This trait helps differentiate `SingleBlockLookup`s from `ParentLookup`s .This is useful in +/// ensuring requests and responses are handled separately and enables us to use different failure +/// tolerances for each, while re-using the same basic request and retry logic. +pub trait Lookup { + const MAX_ATTEMPTS: u8; + fn lookup_type() -> LookupType; + fn max_attempts() -> u8 { + Self::MAX_ATTEMPTS + } +} + +/// A `Lookup` that is a part of a `ParentLookup`. +pub struct Parent; + +impl Lookup for Parent { + const MAX_ATTEMPTS: u8 = PARENT_FAIL_TOLERANCE; + fn lookup_type() -> LookupType { + LookupType::Parent + } +} + +/// A `Lookup` that part of a single block lookup. +pub struct Current; + +impl Lookup for Current { + const MAX_ATTEMPTS: u8 = SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS; + fn lookup_type() -> LookupType { + LookupType::Current + } +} + +/// This trait unifies common single block lookup functionality across blocks and blobs. 
This +/// includes making requests, verifying responses, and handling processing results. A +/// `SingleBlockLookup` includes both a `BlockRequestState` and a `BlobRequestState`, this trait is +/// implemented for each. +/// +/// The use of the `ResponseType` associated type gives us a degree of type +/// safety when handling a block/blob response ensuring we only mutate the correct corresponding +/// state. +pub trait RequestState { + /// The type of the request . + type RequestType; + + /// A block or blob response. + type ResponseType; + + /// The type created after validation. + type VerifiedResponseType: Clone; + + /// We convert a `VerifiedResponseType` to this type prior to sending it to the beacon processor. + type ReconstructedResponseType; + + /* Request building methods */ + + /// Construct a new request. + fn build_request(&mut self) -> Result<(PeerId, Self::RequestType), LookupRequestError> { + debug_assert!(matches!(self.get_state().state, State::AwaitingDownload)); + self.too_many_attempts()?; + let peer = self.get_peer()?; + let request = self.new_request(); + self.get_state_mut().req_counter += 1; + Ok((peer, request)) + } + + /// Verify the current request has not exceeded the maximum number of attempts. + fn too_many_attempts(&self) -> Result<(), LookupRequestError> { + let max_attempts = L::max_attempts(); + let request_state = self.get_state(); + + if request_state.failed_attempts() >= max_attempts { + let cannot_process = + request_state.failed_processing >= request_state.failed_downloading; + Err(LookupRequestError::TooManyAttempts { cannot_process }) + } else { + Ok(()) + } + } + + /// Get the next peer to request. Draws from the set of peers we think should have both the + /// block and blob first. If that fails, we draw from the set of peers that may have either. 
+ fn get_peer(&mut self) -> Result { + let request_state = self.get_state_mut(); + let available_peer_opt = request_state + .available_peers + .iter() + .choose(&mut rand::thread_rng()) + .copied() + .map(PeerShouldHave::BlockAndBlobs); + + let Some(peer_id) = available_peer_opt.or_else(||request_state + .potential_peers + .iter() + .choose(&mut rand::thread_rng()) + .copied() + .map(PeerShouldHave::Neither)) else { + return Err(LookupRequestError::NoPeers); + }; + request_state.used_peers.insert(peer_id.to_peer_id()); + request_state.state = State::Downloading { peer_id }; + Ok(peer_id.to_peer_id()) + } + + /// Initialize `Self::RequestType`. + fn new_request(&self) -> Self::RequestType; + + /// Send the request to the network service. + fn make_request( + id: SingleLookupReqId, + peer_id: PeerId, + request: Self::RequestType, + cx: &SyncNetworkContext, + ) -> Result<(), LookupRequestError>; + + /* Response handling methods */ + + /// Verify the response is valid based on what we requested. + fn verify_response( + &mut self, + expected_block_root: Hash256, + response: Option, + ) -> Result, LookupVerifyError> { + let request_state = self.get_state_mut(); + match request_state.state { + State::AwaitingDownload => { + request_state.register_failure_downloading(); + Err(LookupVerifyError::ExtraBlocksReturned) + } + State::Downloading { peer_id } => { + self.verify_response_inner(expected_block_root, response, peer_id) + } + State::Processing { peer_id: _ } => match response { + Some(_) => { + // We sent the block for processing and received an extra block. + request_state.register_failure_downloading(); + Err(LookupVerifyError::ExtraBlocksReturned) + } + None => { + // This is simply the stream termination and we are already processing the + // block + Ok(None) + } + }, + } + } + + /// The response verification unique to block or blobs. 
+ fn verify_response_inner( + &mut self, + expected_block_root: Hash256, + response: Option, + peer_id: PeerShouldHave, + ) -> Result, LookupVerifyError>; + + /// A getter for the parent root of the response. Returns an `Option` because we won't know + /// the blob parent if we don't end up getting any blobs in the response. + fn get_parent_root(verified_response: &Self::VerifiedResponseType) -> Option; + + /// Caches the verified response in the lookup if necessary. This is only necessary for lookups + /// triggered by `UnknownParent` errors. + fn add_to_child_components( + verified_response: Self::VerifiedResponseType, + components: &mut ChildComponents, + ); + + /// Convert a verified response to the type we send to the beacon processor. + fn verified_to_reconstructed( + verified: Self::VerifiedResponseType, + ) -> Self::ReconstructedResponseType; + + /// Send the response to the beacon processor. + fn send_for_processing( + id: Id, + bl: &BlockLookups, + block_root: Hash256, + verified: Self::ReconstructedResponseType, + duration: Duration, + cx: &SyncNetworkContext, + ) -> Result<(), LookupRequestError>; + + /// Remove the peer from the lookup if it is useless. + fn remove_if_useless(&mut self, peer: &PeerId) { + self.get_state_mut().remove_peer_if_useless(peer) + } + + /// Register a failure to process the block or blob. + fn register_failure_downloading(&mut self) { + self.get_state_mut().register_failure_downloading() + } + + /* Utility methods */ + + /// Returns the `ResponseType` associated with this trait implementation. Useful in logging. + fn response_type() -> ResponseType; + + /// A getter for the `BlockRequestState` or `BlobRequestState` associated with this trait. + fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self; + + /// A getter for a reference to the `SingleLookupRequestState` associated with this trait. 
+ fn get_state(&self) -> &SingleLookupRequestState; + + /// A getter for a mutable reference to the SingleLookupRequestState associated with this trait. + fn get_state_mut(&mut self) -> &mut SingleLookupRequestState; +} + +impl RequestState for BlockRequestState { + type RequestType = BlocksByRootRequest; + type ResponseType = Arc>; + type VerifiedResponseType = Arc>; + type ReconstructedResponseType = RpcBlock; + + fn new_request(&self) -> BlocksByRootRequest { + BlocksByRootRequest::new(VariableList::from(vec![self.requested_block_root])) + } + + fn make_request( + id: SingleLookupReqId, + peer_id: PeerId, + request: Self::RequestType, + cx: &SyncNetworkContext, + ) -> Result<(), LookupRequestError> { + cx.block_lookup_request(id, peer_id, request, L::lookup_type()) + .map_err(LookupRequestError::SendFailed) + } + + fn verify_response_inner( + &mut self, + expected_block_root: Hash256, + response: Option, + peer_id: PeerShouldHave, + ) -> Result>>, LookupVerifyError> { + match response { + Some(block) => { + // Compute the block root using this specific function so that we can get timing + // metrics. + let block_root = get_block_root(&block); + if block_root != expected_block_root { + // return an error and drop the block + // NOTE: we take this is as a download failure to prevent counting the + // attempt as a chain failure, but simply a peer failure. + self.state.register_failure_downloading(); + Err(LookupVerifyError::RootMismatch) + } else { + // Return the block for processing. 
+ self.state.state = State::Processing { peer_id }; + Ok(Some(block)) + } + } + None => { + if peer_id.should_have_block() { + self.state.register_failure_downloading(); + Err(LookupVerifyError::NoBlockReturned) + } else { + self.state.state = State::AwaitingDownload; + Err(LookupVerifyError::BenignFailure) + } + } + } + } + + fn get_parent_root(verified_response: &Arc>) -> Option { + Some(verified_response.parent_root()) + } + + fn add_to_child_components( + verified_response: Arc>, + components: &mut ChildComponents, + ) { + components.add_unknown_parent_block(verified_response); + } + + fn verified_to_reconstructed( + block: Arc>, + ) -> RpcBlock { + RpcBlock::new_without_blobs(block) + } + + fn send_for_processing( + id: Id, + bl: &BlockLookups, + block_root: Hash256, + constructed: RpcBlock, + duration: Duration, + cx: &SyncNetworkContext, + ) -> Result<(), LookupRequestError> { + bl.send_block_for_processing( + block_root, + constructed, + duration, + BlockProcessType::SingleBlock { id }, + cx, + ) + } + + fn response_type() -> ResponseType { + ResponseType::Block + } + fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { + &mut request.block_request_state + } + fn get_state(&self) -> &SingleLookupRequestState { + &self.state + } + fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { + &mut self.state + } +} + +impl RequestState for BlobRequestState { + type RequestType = BlobsByRootRequest; + type ResponseType = Arc>; + type VerifiedResponseType = FixedBlobSidecarList; + type ReconstructedResponseType = FixedBlobSidecarList; + + fn new_request(&self) -> BlobsByRootRequest { + BlobsByRootRequest { + blob_ids: VariableList::from(self.requested_ids.clone()), + } + } + + fn make_request( + id: SingleLookupReqId, + peer_id: PeerId, + request: Self::RequestType, + cx: &SyncNetworkContext, + ) -> Result<(), LookupRequestError> { + cx.blob_lookup_request(id, peer_id, request, L::lookup_type()) + .map_err(LookupRequestError::SendFailed) + } 
+ + fn verify_response_inner( + &mut self, + _expected_block_root: Hash256, + blob: Option, + peer_id: PeerShouldHave, + ) -> Result>, LookupVerifyError> { + match blob { + Some(blob) => { + let received_id = blob.id(); + if !self.requested_ids.contains(&received_id) { + self.state.register_failure_downloading(); + Err(LookupVerifyError::UnrequestedBlobId) + } else { + // State should remain downloading until we receive the stream terminator. + self.requested_ids.retain(|id| *id != received_id); + let blob_index = blob.index; + + if blob_index >= T::EthSpec::max_blobs_per_block() as u64 { + return Err(LookupVerifyError::InvalidIndex(blob.index)); + } + *self.blob_download_queue.index_mut(blob_index as usize) = Some(blob); + Ok(None) + } + } + None => { + self.state.state = State::Processing { peer_id }; + let blobs = std::mem::take(&mut self.blob_download_queue); + Ok(Some(blobs)) + } + } + } + + fn get_parent_root(verified_response: &FixedBlobSidecarList) -> Option { + verified_response + .into_iter() + .filter_map(|blob| blob.as_ref()) + .map(|blob| blob.block_parent_root) + .next() + } + + fn add_to_child_components( + verified_response: FixedBlobSidecarList, + components: &mut ChildComponents, + ) { + components.add_unknown_parent_blobs(verified_response); + } + + fn verified_to_reconstructed( + blobs: FixedBlobSidecarList, + ) -> FixedBlobSidecarList { + blobs + } + + fn send_for_processing( + id: Id, + bl: &BlockLookups, + block_root: Hash256, + verified: FixedBlobSidecarList, + duration: Duration, + cx: &SyncNetworkContext, + ) -> Result<(), LookupRequestError> { + bl.send_blobs_for_processing( + block_root, + verified, + duration, + BlockProcessType::SingleBlob { id }, + cx, + ) + } + + fn response_type() -> ResponseType { + ResponseType::Blob + } + fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { + &mut request.blob_request_state + } + fn get_state(&self) -> &SingleLookupRequestState { + &self.state + } + fn get_state_mut(&mut self) -> 
&mut SingleLookupRequestState { + &mut self.state + } +} diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 79ef277468..43fb533018 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -5,31 +5,38 @@ use super::BatchProcessResult; use super::{manager::BlockProcessType, network_context::SyncNetworkContext}; use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; -use crate::sync::block_lookups::parent_lookup::ParentLookup; -use crate::sync::block_lookups::single_block_lookup::LookupVerifyError; -use crate::sync::manager::{Id, ResponseType}; +use crate::sync::block_lookups::parent_lookup::{ParentLookup, RequestError}; +use crate::sync::block_lookups::single_block_lookup::{ + CachedChild, LookupRequestError, LookupVerifyError, +}; +use crate::sync::manager::{Id, SingleLookupReqId}; use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; -use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker}; +use beacon_chain::data_availability_checker::DataAvailabilityChecker; +use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; +pub use common::Current; +pub use common::Lookup; +use common::LookupType; +pub use common::Parent; +pub use common::RequestState; use fnv::FnvHashMap; use lighthouse_network::rpc::RPCError; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; -pub use single_block_lookup::UnknownParentComponents; -pub use single_block_lookup::{ - BlobRequestState, BlockRequestState, Current, Lookup, Parent, RequestState, -}; +pub use single_block_lookup::ChildComponents; +pub use single_block_lookup::{BlobRequestState, BlockRequestState}; use slog::{debug, error, trace, warn, Logger}; use smallvec::SmallVec; use std::collections::HashMap; use std::fmt::Debug; use 
std::sync::Arc; use std::time::Duration; -use store::{Hash256, SignedBeaconBlock}; +use store::Hash256; use strum::Display; use types::blob_sidecar::FixedBlobSidecarList; use types::Slot; +pub mod common; pub(crate) mod delayed_lookup; mod parent_lookup; mod single_block_lookup; @@ -37,29 +44,10 @@ mod single_block_lookup; mod tests; pub type DownloadedBlocks = (Hash256, RpcBlock); -pub type RootBlockTuple = (Hash256, Arc>); -pub type RootBlobsTuple = (Hash256, FixedBlobSidecarList); const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3; -pub struct BlockLookups { - /// Parent chain lookups being downloaded. - parent_lookups: SmallVec<[ParentLookup; 3]>, - - processing_parent_lookups: HashMap, SingleBlockLookup)>, - - /// A cache of failed chain lookups to prevent duplicate searches. - failed_chains: LRUTimeCache, - - single_block_lookups: FnvHashMap>, - - da_checker: Arc>, - - /// The logger for the import manager. - log: Logger, -} - /// This enum is used to track what a peer *should* be able to respond with respond based on /// other messages we've seen from this peer on the network. This is useful for peer scoring. /// We expect a peer tracked by the `BlockAndBlobs` variant to be able to respond to all @@ -97,6 +85,23 @@ impl PeerShouldHave { } } +pub struct BlockLookups { + /// Parent chain lookups being downloaded. + parent_lookups: SmallVec<[ParentLookup; 3]>, + + processing_parent_lookups: HashMap, SingleBlockLookup)>, + + /// A cache of failed chain lookups to prevent duplicate searches. + failed_chains: LRUTimeCache, + + single_block_lookups: FnvHashMap>, + + da_checker: Arc>, + + /// The logger for the import manager. + log: Logger, +} + impl BlockLookups { pub fn new(da_checker: Arc>, log: Logger) -> Self { Self { @@ -143,14 +148,14 @@ impl BlockLookups { /// Creates a lookup for the block with the given `block_root`, while caching other block /// components we've already received. 
The block components are cached here because we haven't - /// imported it's parent and therefore can't fully validate it and store it in the data + /// imported its parent and therefore can't fully validate it and store it in the data /// availability cache. /// /// The request is immediately triggered. pub fn search_child_block( &mut self, block_root: Hash256, - parent_components: Option>, + parent_components: Option>, peer_source: &[PeerShouldHave], cx: &mut SyncNetworkContext, ) { @@ -170,7 +175,7 @@ impl BlockLookups { pub fn search_child_delayed( &mut self, block_root: Hash256, - parent_components: Option>, + parent_components: Option>, peer_source: &[PeerShouldHave], cx: &mut SyncNetworkContext, ) { @@ -186,13 +191,19 @@ impl BlockLookups { mut single_block_lookup: SingleBlockLookup, cx: &SyncNetworkContext, ) { - if !single_block_lookup.triggered && single_block_lookup.request_block_and_blobs(cx).is_ok() - { - single_block_lookup.triggered = true; - self.add_single_lookup(single_block_lookup) + let block_root = single_block_lookup.block_root(); + match single_block_lookup.request_block_and_blobs(cx) { + Ok(()) => self.add_single_lookup(single_block_lookup), + Err(e) => { + debug!(self.log, "Single block lookup failed"; + "error" => ?e, + "block_root" => ?block_root, + ); + } } } + /// Adds a lookup to the `single_block_lookups` map. pub fn add_single_lookup(&mut self, single_block_lookup: SingleBlockLookup) { self.single_block_lookups .insert(single_block_lookup.id, single_block_lookup); @@ -203,25 +214,20 @@ impl BlockLookups { ); } - pub fn trigger_lookup_by_root( - &mut self, - block_root: Hash256, - cx: &SyncNetworkContext, - ) -> Result<(), ()> { - for (_, lookup) in self.single_block_lookups.iter_mut() { - if lookup.block_request_state.requested_block_root == block_root - && !lookup.triggered - && lookup.request_block_and_blobs(cx).is_ok() - { - lookup.triggered = true; + /// Trigger any lookups that are waiting for the given `block_root`. 
+ pub fn trigger_lookup_by_root(&mut self, block_root: Hash256, cx: &SyncNetworkContext) { + self.single_block_lookups.retain(|_id, lookup| { + if lookup.block_root() == block_root { + if let Err(e) = lookup.request_block_and_blobs(cx) { + debug!(self.log, "Delayed single block lookup failed"; + "error" => ?e, + "block_root" => ?block_root, + ); + return false; + } } - } - Ok(()) - } - - pub fn remove_lookup_by_root(&mut self, block_root: Hash256) { - self.single_block_lookups - .retain(|_id, lookup| lookup.block_request_state.requested_block_root != block_root); + true + }); } /// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is @@ -229,7 +235,7 @@ impl BlockLookups { pub fn new_current_lookup( &mut self, block_root: Hash256, - parent_components: Option>, + parent_components: Option>, peers: &[PeerShouldHave], cx: &mut SyncNetworkContext, ) -> Option> { @@ -241,7 +247,7 @@ impl BlockLookups { { lookup.add_peers(peers); if let Some(components) = parent_components { - lookup.add_unknown_parent_components(components); + lookup.add_child_components(components); } return None; } @@ -319,37 +325,68 @@ impl BlockLookups { // we are already processing this block, ignore it. return; } - let mut parent_lookup = ParentLookup::new( + let parent_lookup = ParentLookup::new( block_root, parent_root, peer_source, self.da_checker.clone(), cx, ); - if let Ok(()) = parent_lookup - .current_parent_request - .request_block_and_blobs(cx) - { - self.parent_lookups.push(parent_lookup); - } + self.request_parent(parent_lookup, cx); } /* Lookup responses */ + /// Get a single block lookup by its ID. This method additionally ensures the `req_counter` + /// matches the current `req_counter` for the lookup. This any stale responses from requests + /// that have been retried are ignored. 
+ fn get_single_lookup>( + &mut self, + id: SingleLookupReqId, + ) -> Option> { + let mut lookup = self.single_block_lookups.remove(&id.id)?; + + let request_state = R::request_state_mut(&mut lookup); + if id.req_counter != request_state.get_state().req_counter { + // We don't want to drop the lookup, just ignore the old response. + self.single_block_lookups.insert(id.id, lookup); + return None; + } + Some(lookup) + } + + /// Checks whether a single block lookup is waiting for a parent lookup to complete. This is + /// necessary because we want to make sure all parents are processed before sending a child + /// for processing, otherwise the block will fail validation and will be returned to the network + /// layer with an `UnknownParent` error. + pub fn has_pending_parent_request(&self, target_id: Id) -> bool { + self.single_block_lookups.iter().any(|(id, lookup)| { + if *id == target_id { + self.parent_lookups + .iter() + .any(|parent_lookup| parent_lookup.chain_hash() == lookup.block_root()) + } else { + false + } + }) + } + + /// Process a block or blob response received from a single lookup request. 
pub fn single_lookup_response>( &mut self, - id: Id, + lookup_id: SingleLookupReqId, peer_id: PeerId, response: Option, seen_timestamp: Duration, cx: &SyncNetworkContext, ) { - let is_stream_terminator = response.is_none(); + let id = lookup_id.id; let response_type = R::response_type(); - let log = self.log.clone(); - let Some(lookup) = self.single_block_lookups.get_mut(&id) else { - if !is_stream_terminator { + let chain_hash_opt = self.has_pending_parent_request(id); + + let Some(lookup) = self.get_single_lookup::(lookup_id) else { + if response.is_some() { warn!( self.log, "Block returned for single block lookup not present"; @@ -359,150 +396,258 @@ impl BlockLookups { return; }; - let expected_block_root = lookup.block_request_state.requested_block_root; + let expected_block_root = lookup.block_root(); - let has_pending_parent_request = self - .parent_lookups - .iter() - .any(|parent_lookup| parent_lookup.chain_hash() == expected_block_root); - - let request_state = R::request_state_mut(lookup); - - match request_state.verify_response(expected_block_root, response) { - Ok(Some((root, verified_response))) => { - if let Some(parent_components) = lookup.unknown_parent_components.as_mut() { - R::add_to_parent_components(verified_response, parent_components); - - if !has_pending_parent_request { - if let Some(rpc_block) = lookup.get_downloaded_block() { - if let Err(()) = self.send_block_for_processing( - expected_block_root, - rpc_block, - seen_timestamp, - BlockProcessType::SingleBlock { id }, - cx, - ) { - self.single_block_lookups.remove(&id); - } - } - } - } else if let Err(()) = R::send_for_processing( - id, - self, - root, - R::verified_to_reconstructed(verified_response), - seen_timestamp, - cx, - ) { - self.single_block_lookups.remove(&id); - } + match self.single_lookup_response_inner::( + peer_id, + response, + seen_timestamp, + cx, + chain_hash_opt, + lookup, + ) { + Ok(lookup) => { + self.single_block_lookups.insert(id, lookup); } - Ok(None) => {} 
Err(e) => { - let msg = if matches!(e, LookupVerifyError::BenignFailure) { - request_state - .get_state_mut() - .remove_peer_if_useless(&peer_id); - "peer could not response to request" - } else { - let msg = e.into(); - cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); - msg - }; - - debug!(log, "Single block lookup failed"; - "peer_id" => %peer_id, - "error" => msg, + debug!(self.log, + "Single lookup retry failed"; + "error" => ?e, "block_root" => ?expected_block_root, - "response_type" => ?response_type ); - if let Err(()) = request_state.retry_request_after_failure(id, cx, &log) { - self.single_block_lookups.remove(&id); - } } } - //TODO(sean) move metric to trait to differentiate block and blob metrics::set_gauge( &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, self.single_block_lookups.len() as i64, ); } - /// Process a response received from a parent lookup request. - pub fn parent_lookup_response>( - &mut self, - id: Id, + /// Consolidates error handling for `single_lookup_response`. An `Err` here should always mean + /// the lookup is dropped. 
+ fn single_lookup_response_inner>( + &self, peer_id: PeerId, - block: Option, + response: Option, seen_timestamp: Duration, cx: &SyncNetworkContext, - ) { + delay_send: bool, + mut lookup: SingleBlockLookup, + ) -> Result, LookupRequestError> { + let response_type = R::response_type(); + let log = self.log.clone(); + let expected_block_root = lookup.block_root(); + let request_state = R::request_state_mut(&mut lookup); + + match request_state.verify_response(expected_block_root, response) { + Ok(Some(verified_response)) => { + self.handle_verified_response::( + seen_timestamp, + cx, + delay_send, + None, + verified_response, + &mut lookup, + )?; + } + Ok(None) => {} + Err(e) => { + debug!( + log, + "Single lookup response verification failed, retrying"; + "block_root" => ?expected_block_root, + "peer_id" => %peer_id, + "response_type" => ?response_type, + "error" => ?e + ); + if matches!(e, LookupVerifyError::BenignFailure) { + request_state + .get_state_mut() + .remove_peer_if_useless(&peer_id); + } else { + let msg = e.into(); + cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); + }; + + request_state.register_failure_downloading(); + lookup.request_block_and_blobs(cx)?; + } + } + Ok(lookup) + } + + fn handle_verified_response>( + &self, + seen_timestamp: Duration, + cx: &SyncNetworkContext, + delay_send: bool, + chain_hash: Option, + verified_response: R::VerifiedResponseType, + lookup: &mut SingleBlockLookup, + ) -> Result<(), LookupRequestError> { + let id = lookup.id; + let block_root = lookup.block_root(); + + R::request_state_mut(lookup) + .get_state_mut() + .component_downloaded = true; + + let cached_child = lookup.add_response::(verified_response.clone()); + + match cached_child { + CachedChild::Ok(block) => { + if let Some(chain_hash) = chain_hash { + if !delay_send { + let process_type = match L::lookup_type() { + LookupType::Parent => BlockProcessType::ParentLookup { chain_hash }, + LookupType::Current => BlockProcessType::SingleBlock { id 
}, + }; + self.send_block_for_processing( + block_root, + block, + seen_timestamp, + process_type, + cx, + )? + } + } + } + CachedChild::DownloadIncomplete => { + // If this was the result of a block request, we can't determined if the block peer + // did anything wrong. If we already had both a block and blobs response processed, + // we should penalize the blobs peer because they did not provide all blobs on the + // initial request. + if lookup.both_components_downloaded() { + lookup.penalize_lazy_blob_peer(cx); + lookup + .blob_request_state + .state + .register_failure_downloading(); + } + lookup.request_block_and_blobs(cx)?; + } + CachedChild::NotRequired => R::send_for_processing( + id, + self, + block_root, + R::verified_to_reconstructed(verified_response), + seen_timestamp, + cx, + )?, + CachedChild::Err(e) => { + warn!(self.log, "Consistency error in cached block"; + "error" => ?e, + "block_root" => ?block_root + ); + lookup.handle_consistency_failure(cx); + lookup.request_block_and_blobs(cx)?; + } + } + Ok(()) + } + + /// Get a parent block lookup by its ID. This method additionally ensures the `req_counter` + /// matches the current `req_counter` for the lookup. This any stale responses from requests + /// that have been retried are ignored. + fn get_parent_lookup>( + &mut self, + id: SingleLookupReqId, + ) -> Option> { let mut parent_lookup = if let Some(pos) = self .parent_lookups .iter() - .position(|request| request.current_parent_request.id == id) + .position(|request| request.current_parent_request.id == id.id) { self.parent_lookups.remove(pos) } else { - if block.is_some() { + return None; + }; + + if R::request_state_mut(&mut parent_lookup.current_parent_request) + .get_state() + .req_counter + != id.req_counter + { + self.parent_lookups.push(parent_lookup); + return None; + } + Some(parent_lookup) + } + + /// Process a response received from a parent lookup request. 
+ pub fn parent_lookup_response>( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + response: Option, + seen_timestamp: Duration, + cx: &SyncNetworkContext, + ) { + let Some(mut parent_lookup) = self.get_parent_lookup::(id) else { + if response.is_some() { debug!(self.log, "Response for a parent lookup request that was not found"; "peer_id" => %peer_id); } - return; + return }; - match parent_lookup.verify_block::(block, &mut self.failed_chains) { - Ok(Some((block_root, verified_response))) => { - if let Some(parent_components) = parent_lookup - .current_parent_request - .unknown_parent_components - .as_mut() - { - R::add_to_parent_components(verified_response, parent_components); - } - if let Some(rpc_block) = parent_lookup.current_parent_request.get_downloaded_block() - { - let chain_hash = parent_lookup.chain_hash(); - if self - .send_block_for_processing( - block_root, - rpc_block, - seen_timestamp, - BlockProcessType::ParentLookup { chain_hash }, - cx, - ) - .is_ok() - { - self.parent_lookups.push(parent_lookup) - } - } else { - //TODO(sean) here, we could penalize a peer who previously sent us a blob list - // that was incomplete, and trigger a re-request immediately - self.parent_lookups.push(parent_lookup) - } - } - Ok(None) => { - // Request finished successfully, nothing else to do. It will be removed after the - // processing result arrives. 
+ match self.parent_lookup_response_inner::( + peer_id, + response, + seen_timestamp, + cx, + &mut parent_lookup, + ) { + Ok(()) => { + debug!(self.log, "Requesting parent"; &parent_lookup); self.parent_lookups.push(parent_lookup); } - Err(e) => self.handle_parent_verify_error::(peer_id, parent_lookup, e, cx), - }; + Err(e) => { + self.handle_parent_request_error(&mut parent_lookup, cx, e); + } + } - //TODO(sean) move metric to trait to differentiate block and blob metrics::set_gauge( &metrics::SYNC_PARENT_BLOCK_LOOKUPS, self.parent_lookups.len() as i64, ); } + /// Consolidates error handling for `parent_lookup_response`. An `Err` here should always mean + /// the lookup is dropped. + fn parent_lookup_response_inner>( + &mut self, + peer_id: PeerId, + block: Option, + seen_timestamp: Duration, + cx: &SyncNetworkContext, + parent_lookup: &mut ParentLookup, + ) -> Result<(), RequestError> { + match parent_lookup.verify_block::(block, &mut self.failed_chains) { + Ok(Some(verified_response)) => { + self.handle_verified_response::( + seen_timestamp, + cx, + false, + Some(parent_lookup.chain_hash()), + verified_response, + &mut parent_lookup.current_parent_request, + )?; + } + Ok(None) => {} + Err(e) => self.handle_parent_verify_error::(peer_id, parent_lookup, e, cx)?, + }; + Ok(()) + } + + /// Handle logging and peer scoring for `ParentVerifyError`s during parent lookup requests. fn handle_parent_verify_error>( &mut self, peer_id: PeerId, - mut parent_lookup: ParentLookup, + parent_lookup: &mut ParentLookup, e: ParentVerifyError, cx: &SyncNetworkContext, - ) { + ) -> Result<(), RequestError> { match e { ParentVerifyError::RootMismatch | ParentVerifyError::NoBlockReturned @@ -520,7 +665,7 @@ impl BlockLookups { cx.report_peer(peer_id, PeerAction::LowToleranceError, e); // We try again if possible. 
- self.request_parent(parent_lookup, cx) + parent_lookup.request_parent(cx)?; } ParentVerifyError::PreviousFailure { parent_root } => { debug!( @@ -544,7 +689,47 @@ impl BlockLookups { ); let request_state = R::request_state_mut(&mut parent_lookup.current_parent_request); request_state.remove_if_useless(&peer_id); - self.request_parent(parent_lookup, cx) + parent_lookup.request_parent(cx)?; + } + } + Ok(()) + } + + /// Handle logging and peer scoring for `RequestError`s during parent lookup requests. + fn handle_parent_request_error( + &mut self, + parent_lookup: &mut ParentLookup, + cx: &SyncNetworkContext, + e: RequestError, + ) { + debug!(self.log, "Failed to request parent"; "error" => e.as_static()); + match e { + RequestError::SendFailed(_) => { + // Probably shutting down, nothing to do here. Drop the request + } + RequestError::ChainTooLong => { + self.failed_chains.insert(parent_lookup.chain_hash()); + // This indicates faulty peers. + for &peer_id in parent_lookup.used_peers() { + cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static()) + } + } + RequestError::TooManyAttempts { cannot_process } => { + // We only consider the chain failed if we were unable to process it. + // We could have failed because one peer continually failed to send us + // bad blocks. We still allow other peers to send us this chain. Note + // that peers that do this, still get penalised. + if cannot_process { + self.failed_chains.insert(parent_lookup.chain_hash()); + } + // This indicates faulty peers. + for &peer_id in parent_lookup.used_peers() { + cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static()) + } + } + RequestError::NoPeers => { + // This happens if the peer disconnects while the block is being + // processed. 
Drop the request without extra penalty } } } @@ -553,18 +738,19 @@ impl BlockLookups { pub fn peer_disconnected(&mut self, peer_id: &PeerId, cx: &mut SyncNetworkContext) { /* Check disconnection for single lookups */ - self.single_block_lookups.retain(|id, req| { - let should_remove_lookup = - req.should_remove_disconnected_peer(*id, peer_id, cx, &self.log); + self.single_block_lookups.retain(|_, req| { + let should_drop_lookup = + req.should_drop_lookup_on_disconnected_peer(peer_id, cx, &self.log); - !should_remove_lookup + !should_drop_lookup }); /* Check disconnection for parent lookups */ - while let Some(pos) = self.parent_lookups.iter_mut().position(|req| { - req.check_block_peer_disconnected(peer_id).is_err() - || req.check_blob_peer_disconnected(peer_id).is_err() - }) { + while let Some(pos) = self + .parent_lookups + .iter_mut() + .position(|req| req.check_peer_disconnected(peer_id).is_err()) + { let parent_lookup = self.parent_lookups.remove(pos); trace!(self.log, "Parent lookup's peer disconnected"; &parent_lookup); self.request_parent(parent_lookup, cx); @@ -574,26 +760,25 @@ impl BlockLookups { /// An RPC error has occurred during a parent lookup. This function handles this case. 
pub fn parent_lookup_failed>( &mut self, - id: Id, + id: SingleLookupReqId, peer_id: PeerId, cx: &SyncNetworkContext, error: RPCError, ) { let msg = error.as_static_str(); - if let Some(pos) = self - .parent_lookups - .iter() - .position(|request| request.current_parent_request.id == id) - { - let mut parent_lookup = self.parent_lookups.remove(pos); - R::request_state_mut(&mut parent_lookup.current_parent_request) - .register_failure_downloading(); - trace!(self.log, "Parent lookup block request failed"; &parent_lookup, "error" => msg); - - self.request_parent(parent_lookup, cx); - } else { - return debug!(self.log, "RPC failure for a block parent lookup request that was not found"; "peer_id" => %peer_id, "error" => msg); + let Some(mut parent_lookup) = self.get_parent_lookup::(id) else { + debug!(self.log, + "RPC failure for a block parent lookup request that was not found"; + "peer_id" => %peer_id, + "error" => msg + ); + return }; + R::request_state_mut(&mut parent_lookup.current_parent_request) + .register_failure_downloading(); + trace!(self.log, "Parent lookup block request failed"; &parent_lookup, "error" => msg); + + self.request_parent(parent_lookup, cx); metrics::set_gauge( &metrics::SYNC_PARENT_BLOCK_LOOKUPS, @@ -601,32 +786,40 @@ impl BlockLookups { ); } + /// An RPC error has occurred during a single lookup. 
This function handles this case.\ pub fn single_block_lookup_failed>( &mut self, - id: Id, + id: SingleLookupReqId, peer_id: &PeerId, cx: &SyncNetworkContext, error: RPCError, ) { let msg = error.as_static_str(); - let Some(lookup) = self.single_block_lookups.get_mut(&id) else { - debug!(self.log, "Error response to dropped lookup"; "error" => ?error); + let log = self.log.clone(); + let Some(mut lookup) = self.get_single_lookup::(id) else { + debug!(log, "Error response to dropped lookup"; "error" => ?error); return; }; - let root = lookup.block_request_state.requested_block_root; - let request_state = R::request_state_mut(lookup); - request_state.register_failure_downloading(); + let block_root = lookup.block_root(); + let request_state = R::request_state_mut(&mut lookup); let response_type = R::response_type(); - trace!(self.log, + trace!(log, "Single lookup failed"; - "block_root" => ?root, + "block_root" => ?block_root, "error" => msg, "peer_id" => %peer_id, "response_type" => ?response_type ); - if let Err(()) = request_state.retry_request_after_failure(id, cx, &self.log) { + let id = id.id; + request_state.register_failure_downloading(); + if let Err(e) = lookup.request_block_and_blobs(cx) { + debug!(self.log, + "Single lookup retry failed"; + "error" => ?e, + "block_root" => ?block_root, + ); self.single_block_lookups.remove(&id); - }; + } metrics::set_gauge( &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, @@ -642,64 +835,35 @@ impl BlockLookups { result: BlockProcessingResult, cx: &mut SyncNetworkContext, ) { - let Some(request_ref) = self.single_block_lookups.get_mut(&target_id) else { - debug!(self.log, "Block component processed for single block lookup not present" ); + let Some(mut lookup) = self.single_block_lookups.remove(&target_id) else { return; }; - let root = request_ref.block_request_state.requested_block_root; - let request_state = R::request_state_mut(request_ref); - let peer_id = request_state.get_state().processing_peer(); - 
request_state.get_state_mut().component_processed = true; + let root = lookup.block_root(); + let request_state = R::request_state_mut(&mut lookup); - let peer_id = match peer_id { - Ok(peer) => peer, - Err(_) => return, + let Ok(peer_id) = request_state.get_state().processing_peer() else { + return }; + debug!( + self.log, + "Block component processed for single block lookup not present" + ); match result { BlockProcessingResult::Ok(status) => match status { AvailabilityProcessingStatus::Imported(root) => { trace!(self.log, "Single block processing succeeded"; "block" => %root); - self.single_block_lookups.remove(&target_id); } AvailabilityProcessingStatus::MissingComponents(_, _block_root) => { - // if this was the result of a blocks request, the block peer did nothing wrong. - // if we already had a blobs resposne, we should penalize the blobs peer because - // they did not provide all blobs. - if request_ref.both_components_processed() { - if let Ok(blob_peer) = - request_ref.blob_request_state.state.processing_peer() - { - if let PeerShouldHave::BlockAndBlobs(blob_peer) = blob_peer { - cx.report_peer( - blob_peer, - PeerAction::MidToleranceError, - "single_block_failure", - ); - } - request_ref - .blob_request_state - .state - .remove_peer_if_useless(blob_peer.as_peer_id()); - if !::EthSpec, - > as RequestState>::downloading( - &request_ref.blob_request_state, - ) { - // Try it again if possible. - if let Err(()) = request_ref - .blob_request_state - .retry_request_after_failure(target_id, cx, &self.log) - { - self.single_block_lookups.remove(&target_id); - }; - } - } else { - trace!(self.log, "Dropped blob peer prior to penalizing"; "root" => ?root); - self.single_block_lookups.remove(&target_id); - }; + match self.handle_missing_components::(cx, &mut lookup) { + Ok(()) => { + self.single_block_lookups.insert(target_id, lookup); + } + Err(e) => { + // Drop with an additional error. 
+ warn!(self.log, "Single block lookup failed"; "block" => %root, "error" => ?e); + } } } }, @@ -711,89 +875,132 @@ impl BlockLookups { "Single block processing was ignored, cpu might be overloaded"; "action" => "dropping single block request" ); - self.single_block_lookups.remove(&target_id); } BlockProcessingResult::Err(e) => { - trace!(self.log, "Single block processing failed"; "block" => %root, "error" => %e); - match e { - BlockError::BlockIsAlreadyKnown => { - // No error here - self.single_block_lookups.remove(&target_id); + match self.handle_single_lookup_block_error(cx, lookup, peer_id, e) { + Ok(Some(lookup)) => { + self.single_block_lookups.insert(target_id, lookup); } - BlockError::BeaconChainError(e) => { - // Internal error - error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e); - self.single_block_lookups.remove(&target_id); + Ok(None) => { + // Drop without an additional error. } - BlockError::ParentUnknown(block) => { - let slot = block.slot(); - let parent_root = block.parent_root(); - request_ref.add_unknown_parent_components(block.into()); - self.search_parent(slot, root, parent_root, peer_id.to_peer_id(), cx); - } - ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { - // These errors indicate that the execution layer is offline - // and failed to validate the execution payload. Do not downscore peer. - debug!( - self.log, - "Single block lookup failed. 
Execution layer is offline / unsynced / misconfigured"; - "root" => %root, - "error" => ?e - ); - self.single_block_lookups.remove(&target_id); - } - BlockError::AvailabilityCheck( - AvailabilityCheckError::KzgVerificationFailed, - ) - | BlockError::AvailabilityCheck(AvailabilityCheckError::Kzg(_)) - | BlockError::BlobValidation(_) => { - warn!(self.log, "Blob validation failure"; "root" => %root, "peer_id" => %peer_id); - if let Ok(blob_peer) = - request_ref.blob_request_state.state.processing_peer() - { - cx.report_peer( - blob_peer.to_peer_id(), - PeerAction::MidToleranceError, - "single_blob_failure", - ); - // Try it again if possible. - if let Err(()) = request_ref - .blob_request_state - .retry_request_after_failure(target_id, cx, &self.log) - { - self.single_block_lookups.remove(&target_id); - }; - } - } - other => { - warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id); - if let Ok(block_peer) = - request_ref.block_request_state.state.processing_peer() - { - cx.report_peer( - block_peer.to_peer_id(), - PeerAction::MidToleranceError, - "single_block_failure", - ); - - // Try it again if possible. - if let Err(()) = request_ref - .blob_request_state - .retry_request_after_failure(target_id, cx, &self.log) - { - self.single_block_lookups.remove(&target_id); - }; - } + Err(e) => { + // Drop with an additional error. + warn!(self.log, "Single block lookup failed"; "block" => %root, "error" => ?e); } } } }; + } - //TODO(sean) move metrics to lookup response trait - metrics::set_gauge( - &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, - self.single_block_lookups.len() as i64, - ); + /// Handles a `MissingComponents` block processing error. Handles peer scoring and retries. + /// + /// If this was the result of a block request, we can't determined if the block peer did anything + /// wrong. 
If we already had both a block and blobs response processed, we should penalize the + /// blobs peer because they did not provide all blobs on the initial request. + fn handle_missing_components>( + &self, + cx: &mut SyncNetworkContext, + lookup: &mut SingleBlockLookup, + ) -> Result<(), LookupRequestError> { + let request_state = R::request_state_mut(lookup); + + request_state.get_state_mut().component_processed = true; + if lookup.both_components_processed() { + lookup.penalize_lazy_blob_peer(cx); + + // Try it again if possible. + lookup + .blob_request_state + .state + .register_failure_processing(); + lookup.request_block_and_blobs(cx)?; + } + Ok(()) + } + + /// Handles peer scoring and retries related to a `BlockError` in response to a single block + /// or blob lookup processing result. + fn handle_single_lookup_block_error( + &mut self, + cx: &mut SyncNetworkContext, + mut lookup: SingleBlockLookup, + peer_id: PeerShouldHave, + e: BlockError, + ) -> Result>, LookupRequestError> { + let root = lookup.block_root(); + trace!(self.log, "Single block processing failed"; "block" => %root, "error" => %e); + match e { + BlockError::BlockIsAlreadyKnown => { + // No error here + return Ok(None); + } + BlockError::BeaconChainError(e) => { + // Internal error + error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e); + return Ok(None); + } + BlockError::ParentUnknown(block) => { + let slot = block.slot(); + let parent_root = block.parent_root(); + lookup.add_child_components(block.into()); + lookup.request_block_and_blobs(cx)?; + self.search_parent(slot, root, parent_root, peer_id.to_peer_id(), cx); + } + ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { + // These errors indicate that the execution layer is offline + // and failed to validate the execution payload. Do not downscore peer. + debug!( + self.log, + "Single block lookup failed. 
Execution layer is offline / unsynced / misconfigured"; + "root" => %root, + "error" => ?e + ); + return Ok(None); + } + BlockError::AvailabilityCheck(e) => { + warn!(self.log, "Availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e); + lookup.handle_availability_check_failure(cx); + lookup.request_block_and_blobs(cx)? + } + BlockError::BlobValidation(_) => { + warn!(self.log, "Blob validation failure"; "root" => %root, "peer_id" => %peer_id); + if let Ok(blob_peer) = lookup.blob_request_state.state.processing_peer() { + cx.report_peer( + blob_peer.to_peer_id(), + PeerAction::MidToleranceError, + "single_block_failure", + ); + lookup + .blob_request_state + .state + .remove_peer_if_useless(blob_peer.as_peer_id()); + } + lookup + .blob_request_state + .state + .register_failure_processing(); + lookup.request_block_and_blobs(cx)? + } + other => { + warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id); + if let Ok(block_peer) = lookup.block_request_state.state.processing_peer() { + cx.report_peer( + block_peer.to_peer_id(), + PeerAction::MidToleranceError, + "single_block_failure", + ); + + // Try it again if possible. + lookup + .block_request_state + .state + .register_failure_processing(); + lookup.request_block_and_blobs(cx)? 
+ } + } + } + Ok(Some(lookup)) } pub fn parent_block_processed( @@ -813,11 +1020,6 @@ impl BlockLookups { return debug!(self.log, "Process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash); }; - let Ok(peer_id) = - parent_lookup.processing_peer() else { - return - }; - match &result { BlockProcessingResult::Ok(status) => match status { AvailabilityProcessingStatus::Imported(block_root) => { @@ -845,7 +1047,32 @@ impl BlockLookups { _, block_root, )) => { - self.search_block(block_root, peer_id, cx); + let expected_block_root = parent_lookup.current_parent_request.block_root(); + if block_root != expected_block_root { + warn!( + self.log, + "Parent block processing result/request root mismatch"; + "request" =>?expected_block_root, + "result" => ?block_root + ); + return; + } + + // We only send parent blocks + blobs for processing together. This means a + // `MissingComponents` response here indicates missing blobs. Therefore we always + // register a blob processing failure here. + parent_lookup + .current_parent_request + .blob_request_state + .state + .register_failure_processing(); + match parent_lookup + .current_parent_request + .request_block_and_blobs(cx) + { + Ok(()) => self.parent_lookups.push(parent_lookup), + Err(e) => self.handle_parent_request_error(&mut parent_lookup, cx, e.into()), + } } BlockProcessingResult::Err(BlockError::ParentUnknown(block)) => { parent_lookup.add_unknown_parent_block(block); @@ -864,29 +1091,10 @@ impl BlockLookups { ); } }; - let (chain_hash, mut blocks, hashes, block_request) = + let (chain_hash, blocks, hashes, block_request) = parent_lookup.parts_for_processing(); - // Find the child block that spawned the parent lookup request and add it to the chain - // to send for processing. 
- if let Some(child_lookup_id) = - self.single_block_lookups.iter().find_map(|(id, lookup)| { - (lookup.block_request_state.requested_block_root == chain_hash) - .then_some(*id) - }) - { - let Some(child_lookup) = self.single_block_lookups.get_mut(&child_lookup_id) else { - debug!(self.log, "Missing child for parent lookup request"; "child_root" => ?chain_hash); - return; - }; - if let Some(rpc_block) = child_lookup.get_downloaded_block() { - blocks.push(rpc_block); - } else { - trace!(self.log, "Parent lookup chain complete, awaiting child response"; "chain_hash" => ?chain_hash); - } - } else { - debug!(self.log, "Missing child for parent lookup request"; "child_root" => ?chain_hash); - }; + let blocks = self.add_child_block_to_chain(chain_hash, blocks, cx); let process_id = ChainSegmentProcessId::ParentLookup(chain_hash); @@ -917,7 +1125,7 @@ impl BlockLookups { ); } BlockProcessingResult::Err(outcome) => { - self.handle_invalid_block(outcome, peer_id.to_peer_id(), cx, parent_lookup); + self.handle_parent_block_error(outcome, cx, parent_lookup); } BlockProcessingResult::Ignored => { // Beacon processor signalled to ignore the block processing result. @@ -936,24 +1144,107 @@ impl BlockLookups { ); } - fn handle_invalid_block( + /// Find the child block that spawned the parent lookup request and add it to the chain + /// to send for processing. + fn add_child_block_to_chain( + &mut self, + chain_hash: Hash256, + mut blocks: Vec>, + cx: &SyncNetworkContext, + ) -> Vec> { + // Find the child block that spawned the parent lookup request and add it to the chain + // to send for processing. 
+ if let Some(child_lookup_id) = self + .single_block_lookups + .iter() + .find_map(|(id, lookup)| (lookup.block_root() == chain_hash).then_some(*id)) + { + let Some(child_lookup) = self.single_block_lookups.get_mut(&child_lookup_id) else { + debug!(self.log, "Missing child for parent lookup request"; "child_root" => ?chain_hash); + return blocks; + }; + match child_lookup.get_cached_child_block() { + CachedChild::Ok(rpc_block) => { + blocks.push(rpc_block); + } + CachedChild::DownloadIncomplete => { + trace!(self.log, "Parent lookup chain complete, awaiting child response"; "chain_hash" => ?chain_hash); + } + CachedChild::NotRequired => { + warn!(self.log, "Child not cached for parent lookup"; "chain_hash" => %chain_hash); + } + CachedChild::Err(e) => { + warn!( + self.log, + "Consistency error in child block triggering chain or parent lookups"; + "error" => ?e, + "chain_hash" => ?chain_hash + ); + child_lookup.handle_consistency_failure(cx); + if child_lookup.request_block_and_blobs(cx).is_err() { + self.single_block_lookups.remove(&child_lookup_id); + } + } + } + } else { + debug!(self.log, "Missing child for parent lookup request"; "child_root" => ?chain_hash); + }; + blocks + } + + /// Handle the peer scoring, retries, and logging related to a `BlockError` returned from + /// processing a block + blobs for a parent lookup. + fn handle_parent_block_error( &mut self, outcome: BlockError<::EthSpec>, - peer_id: PeerId, cx: &SyncNetworkContext, mut parent_lookup: ParentLookup, ) { + // We should always have a block peer. + let Ok(block_peer_id) = + parent_lookup.block_processing_peer() else { + return + }; + let block_peer_id = block_peer_id.to_peer_id(); + + // We may not have a blob peer, if there were no blobs required for this block. 
+ let blob_peer_id = parent_lookup + .blob_processing_peer() + .ok() + .map(PeerShouldHave::to_peer_id); + // all else we consider the chain a failure and downvote the peer that sent // us the last block warn!( self.log, "Invalid parent chain"; "score_adjustment" => %PeerAction::MidToleranceError, "outcome" => ?outcome, - "last_peer" => %peer_id, + "block_peer_id" => %block_peer_id, ); // This currently can be a host of errors. We permit this due to the partial // ambiguity. - cx.report_peer(peer_id, PeerAction::MidToleranceError, "parent_request_err"); + cx.report_peer( + block_peer_id, + PeerAction::MidToleranceError, + "parent_request_err", + ); + // Don't downscore the same peer twice + if let Some(blob_peer_id) = blob_peer_id { + if block_peer_id != blob_peer_id { + debug!( + self.log, "Additionally down-scoring blob peer"; + "score_adjustment" => %PeerAction::MidToleranceError, + "outcome" => ?outcome, + "blob_peer_id" => %blob_peer_id, + ); + cx.report_peer( + blob_peer_id, + PeerAction::MidToleranceError, + "parent_request_err", + ); + } + } + // Try again if possible parent_lookup.processing_failed(); self.request_parent(parent_lookup, cx); @@ -979,7 +1270,7 @@ impl BlockLookups { .single_block_lookups .iter() .find_map(|(id, req)| - (req.block_request_state.requested_block_root == chain_hash).then_some(*id)) else { + (req.block_root() == chain_hash).then_some(*id)) else { warn!(self.log, "No id found for single block lookup"; "chain_hash" => %chain_hash); return; }; @@ -991,20 +1282,40 @@ impl BlockLookups { return; }; - if let Some(rpc_block) = lookup.get_downloaded_block() { - // This is the correct block, send it for processing - if self - .send_block_for_processing( - chain_hash, - rpc_block, - Duration::from_secs(0), //TODO(sean) pipe this through - BlockProcessType::SingleBlock { id }, - cx, - ) - .is_err() - { - // Remove to avoid inconsistencies - self.single_block_lookups.remove(&id); + match lookup.get_cached_child_block() { + 
CachedChild::Ok(rpc_block) => { + // This is the correct block, send it for processing + if self + .send_block_for_processing( + chain_hash, + rpc_block, + timestamp_now(), + BlockProcessType::SingleBlock { id }, + cx, + ) + .is_err() + { + // Remove to avoid inconsistencies + self.single_block_lookups.remove(&id); + } + } + CachedChild::DownloadIncomplete => { + trace!(self.log, "Parent chain complete, awaiting child response"; "chain_hash" => %chain_hash); + } + CachedChild::NotRequired => { + warn!(self.log, "Child not cached for parent lookup"; "chain_hash" => %chain_hash); + } + CachedChild::Err(e) => { + warn!( + self.log, + "Consistency error in child block triggering parent lookup"; + "chain_hash" => %chain_hash, + "error" => ?e + ); + lookup.handle_consistency_failure(cx); + if lookup.request_block_and_blobs(cx).is_err() { + self.single_block_lookups.remove(&id); + } } } } @@ -1013,9 +1324,7 @@ impl BlockLookups { penalty, } => { self.failed_chains.insert(chain_hash); - let mut all_peers = request.block_request_state.state.used_peers.clone(); - all_peers.extend(request.blob_request_state.state.used_peers); - for peer_source in all_peers { + for peer_source in request.all_peers() { cx.report_peer(peer_source, penalty, "parent_chain_failure") } } @@ -1033,13 +1342,13 @@ impl BlockLookups { /* Helper functions */ fn send_block_for_processing( - &mut self, + &self, block_root: Hash256, block: RpcBlock, duration: Duration, process_type: BlockProcessType, cx: &SyncNetworkContext, - ) -> Result<(), ()> { + ) -> Result<(), LookupRequestError> { match cx.beacon_processor_if_enabled() { Some(beacon_processor) => { trace!(self.log, "Sending block for processing"; "block" => ?block_root, "process" => ?process_type); @@ -1054,14 +1363,18 @@ impl BlockLookups { "Failed to send sync block to processor"; "error" => ?e ); - Err(()) + Err(LookupRequestError::SendFailed( + "beacon processor send failure", + )) } else { Ok(()) } } None => { trace!(self.log, "Dropping block 
ready for processing. Beacon processor not available"; "block" => %block_root); - Err(()) + Err(LookupRequestError::SendFailed( + "beacon processor unavailable", + )) } } } @@ -1073,11 +1386,7 @@ impl BlockLookups { duration: Duration, process_type: BlockProcessType, cx: &SyncNetworkContext, - ) -> Result<(), ()> { - let blob_count = blobs.iter().filter(|b| b.is_some()).count(); - if blob_count == 0 { - return Ok(()); - } + ) -> Result<(), LookupRequestError> { match cx.beacon_processor_if_enabled() { Some(beacon_processor) => { trace!(self.log, "Sending blobs for processing"; "block" => ?block_root, "process_type" => ?process_type); @@ -1089,54 +1398,30 @@ impl BlockLookups { "Failed to send sync blobs to processor"; "error" => ?e ); - Err(()) + Err(LookupRequestError::SendFailed( + "beacon processor send failure", + )) } else { Ok(()) } } None => { trace!(self.log, "Dropping blobs ready for processing. Beacon processor not available"; "block_root" => %block_root); - Err(()) + Err(LookupRequestError::SendFailed( + "beacon processor unavailable", + )) } } } + /// Attempts to request the next unknown parent. This method handles peer scoring and dropping + /// the lookup in the event of failure. fn request_parent(&mut self, mut parent_lookup: ParentLookup, cx: &SyncNetworkContext) { let response = parent_lookup.request_parent(cx); match response { Err(e) => { - debug!(self.log, "Failed to request parent"; &parent_lookup, "error" => e.as_static()); - match e { - parent_lookup::RequestError::SendFailed(_) => { - // Probably shutting down, nothing to do here. Drop the request - } - parent_lookup::RequestError::ChainTooLong => { - self.failed_chains.insert(parent_lookup.chain_hash()); - // This indicates faulty peers. 
- for &peer_id in parent_lookup.used_peers() { - cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static()) - } - } - parent_lookup::RequestError::TooManyAttempts { cannot_process } => { - // We only consider the chain failed if we were unable to process it. - // We could have failed because one peer continually failed to send us - // bad blocks. We still allow other peers to send us this chain. Note - // that peers that do this, still get penalised. - if cannot_process { - self.failed_chains.insert(parent_lookup.chain_hash()); - } - // This indicates faulty peers. - for &peer_id in parent_lookup.used_peers() { - cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static()) - } - } - parent_lookup::RequestError::NoPeers => { - // This happens if the peer disconnects while the block is being - // processed. Drop the request without extra penalty - } - parent_lookup::RequestError::AlreadyDownloaded => {} - } + self.handle_parent_request_error(&mut parent_lookup, cx, e); } Ok(_) => { debug!(self.log, "Requesting parent"; &parent_lookup); @@ -1163,9 +1448,3 @@ impl BlockLookups { self.parent_lookups.drain(..).len() } } - -#[derive(Debug, Copy, Clone)] -pub enum LookupType { - Current, - Parent, -} diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 6afeaccc7b..b94a3be71e 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -1,8 +1,7 @@ use super::single_block_lookup::{LookupRequestError, LookupVerifyError, SingleBlockLookup}; use super::{DownloadedBlocks, PeerShouldHave}; -use crate::sync::block_lookups::single_block_lookup::{ - Parent, RequestState, State, UnknownParentComponents, -}; +use crate::sync::block_lookups::common::Parent; +use crate::sync::block_lookups::common::RequestState; use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext}; 
use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::RpcBlock; @@ -55,7 +54,6 @@ pub enum RequestError { cannot_process: bool, }, NoPeers, - AlreadyDownloaded, } impl ParentLookup { @@ -103,55 +101,42 @@ impl ParentLookup { .map_err(Into::into) } - pub fn check_block_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> { + pub fn check_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> { self.current_parent_request .block_request_state .state .check_peer_disconnected(peer_id) - } - - pub fn check_blob_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> { - self.current_parent_request - .blob_request_state - .state - .check_peer_disconnected(peer_id) + .and_then(|()| { + self.current_parent_request + .blob_request_state + .state + .check_peer_disconnected(peer_id) + }) } pub fn add_unknown_parent_block(&mut self, block: RpcBlock) { let next_parent = block.parent_root(); - // Cache the block. - let current_root = self - .current_parent_request - .block_request_state - .requested_block_root; + let current_root = self.current_parent_request.block_root(); self.downloaded_blocks.push((current_root, block)); - // Update the block request. + // Update the parent request. self.current_parent_request - .block_request_state - .requested_block_root = next_parent; - self.current_parent_request.block_request_state.state.state = State::AwaitingDownload; - - // Update the blobs request. - self.current_parent_request.blob_request_state.state.state = State::AwaitingDownload; - - // Reset the unknown parent components. 
- self.current_parent_request.unknown_parent_components = - Some(UnknownParentComponents::default()); + .update_requested_parent_block(next_parent) } - pub fn processing_peer(&self) -> Result { + pub fn block_processing_peer(&self) -> Result { self.current_parent_request .block_request_state .state .processing_peer() - .or_else(|()| { - self.current_parent_request - .blob_request_state - .state - .processing_peer() - }) + } + + pub fn blob_processing_peer(&self) -> Result { + self.current_parent_request + .blob_request_state + .state + .processing_peer() } /// Consumes the parent request and destructures it into it's parts. @@ -193,11 +178,7 @@ impl ParentLookup { .blob_request_state .state .register_failure_processing(); - if let Some(components) = self - .current_parent_request - .unknown_parent_components - .as_mut() - { + if let Some(components) = self.current_parent_request.cached_child_components.as_mut() { components.downloaded_block = None; components.downloaded_blobs = <_>::default(); } @@ -209,11 +190,8 @@ impl ParentLookup { &mut self, block: Option, failed_chains: &mut lru_cache::LRUTimeCache, - ) -> Result, ParentVerifyError> { - let expected_block_root = self - .current_parent_request - .block_request_state - .requested_block_root; + ) -> Result, ParentVerifyError> { + let expected_block_root = self.current_parent_request.block_root(); let request_state = R::request_state_mut(&mut self.current_parent_request); let root_and_block = request_state.verify_response(expected_block_root, block)?; @@ -221,7 +199,7 @@ impl ParentLookup { // be dropped and the peer downscored. 
if let Some(parent_root) = root_and_block .as_ref() - .and_then(|(_, block)| R::get_parent_root(block)) + .and_then(|block| R::get_parent_root(block)) { if failed_chains.contains(&parent_root) { request_state.register_failure_downloading(); @@ -277,7 +255,6 @@ impl From for RequestError { RequestError::TooManyAttempts { cannot_process } } E::NoPeers => RequestError::NoPeers, - E::AlreadyDownloaded => RequestError::AlreadyDownloaded, E::SendFailed(msg) => RequestError::SendFailed(msg), } } @@ -306,7 +283,6 @@ impl RequestError { } RequestError::TooManyAttempts { cannot_process: _ } => "too_many_downloading_attempts", RequestError::NoPeers => "no_peers", - RequestError::AlreadyDownloaded => "already_downloaded", } } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 5499c66986..3cb084d9be 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,544 +1,20 @@ -use super::{PeerShouldHave, ResponseType}; -use crate::sync::block_lookups::parent_lookup::PARENT_FAIL_TOLERANCE; -use crate::sync::block_lookups::{ - BlockLookups, Id, LookupType, RootBlobsTuple, RootBlockTuple, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, -}; -use crate::sync::manager::BlockProcessType; +use super::PeerShouldHave; +use crate::sync::block_lookups::common::{Lookup, RequestState}; +use crate::sync::block_lookups::Id; +use crate::sync::manager::SingleLookupReqId; use crate::sync::network_context::SyncNetworkContext; use beacon_chain::block_verification_types::RpcBlock; -use beacon_chain::data_availability_checker::DataAvailabilityChecker; -use beacon_chain::{get_block_root, BeaconChainTypes}; -use lighthouse_network::rpc::methods::BlobsByRootRequest; -use lighthouse_network::{rpc::BlocksByRootRequest, PeerId}; -use rand::seq::IteratorRandom; -use slog::{debug, trace, Logger}; -use ssz_types::VariableList; 
+use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker}; +use beacon_chain::BeaconChainTypes; +use lighthouse_network::{PeerAction, PeerId}; +use slog::{trace, Logger}; use std::collections::HashSet; use std::marker::PhantomData; -use std::ops::IndexMut; use std::sync::Arc; -use std::time::Duration; use store::Hash256; use strum::IntoStaticStr; use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; -use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; - -pub trait Lookup { - const MAX_ATTEMPTS: u8; - fn lookup_type() -> LookupType; - fn max_attempts() -> u8 { - Self::MAX_ATTEMPTS - } -} - -pub struct Parent; -pub struct Current; - -impl Lookup for Parent { - const MAX_ATTEMPTS: u8 = PARENT_FAIL_TOLERANCE; - fn lookup_type() -> LookupType { - LookupType::Parent - } -} - -impl Lookup for Current { - const MAX_ATTEMPTS: u8 = SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS; - fn lookup_type() -> LookupType { - LookupType::Current - } -} - -pub struct SingleBlockLookup { - pub id: Id, - pub block_request_state: BlockRequestState, - pub blob_request_state: BlobRequestState, - pub da_checker: Arc>, - /// Only necessary for requests triggered by an `UnknownBlockParent` or `UnknownBlockParent` because any - /// blocks or blobs without parents won't hit the data availability cache. - pub unknown_parent_components: Option>, - /// We may want to delay the actual request trigger to give us a chance to receive all block - /// components over gossip. 
- pub triggered: bool, -} - -// generic across block + blob -pub trait RequestState { - type RequestType; - type ResponseType; - type ReconstructedResponseType; - type VerifiedResponseType; - - // response verify - fn response_type() -> ResponseType; - fn get_parent_root(verified_response: &Self::VerifiedResponseType) -> Option; - fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self; - fn add_to_parent_components( - verified_response: Self::VerifiedResponseType, - components: &mut UnknownParentComponents, - ); - fn verified_to_reconstructed( - verified: Self::VerifiedResponseType, - ) -> Self::ReconstructedResponseType; - fn send_for_processing( - id: Id, - bl: &mut BlockLookups, - block_root: Hash256, - verified: Self::ReconstructedResponseType, - duration: Duration, - cx: &SyncNetworkContext, - ) -> Result<(), ()>; - - fn get_state(&self) -> &SingleLookupRequestState; - fn get_state_mut(&mut self) -> &mut SingleLookupRequestState; - fn processing_peer(&self) -> Result { - self.get_state().processing_peer() - } - fn set_component_processed(&mut self) { - self.get_state_mut().component_processed = true; - } - fn new_request(&self) -> Self::RequestType; - fn max_attempts() -> u8; - fn retry_request( - id: Id, - cx: &SyncNetworkContext, - peer_id: PeerId, - request: Self::RequestType, - ) -> Result<(), &'static str>; - fn verify_response( - &mut self, - expected_block_root: Hash256, - response: Option, - ) -> Result, LookupVerifyError> { - let request_state = self.get_state_mut(); - match request_state.state { - State::AwaitingDownload => { - request_state.register_failure_downloading(); - Err(LookupVerifyError::ExtraBlocksReturned) - } - State::Downloading { peer_id } => { - self.verify_response_inner(expected_block_root, response, peer_id) - } - State::Processing { peer_id: _ } => match response { - Some(_) => { - // We sent the block for processing and received an extra block. 
- request_state.register_failure_downloading(); - Err(LookupVerifyError::ExtraBlocksReturned) - } - None => { - // This is simply the stream termination and we are already processing the - // block - Ok(None) - } - }, - } - } - fn verify_response_inner( - &mut self, - expected_block_root: Hash256, - response: Option, - peer_id: PeerShouldHave, - ) -> Result, LookupVerifyError>; - - fn retry_request_after_failure( - &mut self, - id: Id, - cx: &SyncNetworkContext, - log: &Logger, - ) -> Result<(), ()> { - if let Err(e) = self - .build_request() - .map_err(Into::into) - .and_then(|(peer_id, request)| Self::retry_request(id, cx, peer_id, request)) - { - //TODO(sean) pass this error up? check downloaded contents prior to retry-ing? - debug!(log, "Single block lookup failed"; - "error" => ?e, - ); - return Err(()); - } - Ok(()) - } - fn build_request(&mut self) -> Result<(PeerId, Self::RequestType), LookupRequestError> { - debug_assert!(matches!(self.get_state().state, State::AwaitingDownload)); - self.too_many_attempts()?; - let peer = self.get_peer()?; - let request = self.new_request(); - Ok((peer, request)) - } - fn too_many_attempts(&self) -> Result<(), LookupRequestError> { - let max_attempts = Self::max_attempts(); - if self.get_state().failed_attempts() >= max_attempts { - Err(LookupRequestError::TooManyAttempts { - cannot_process: self.cannot_process(), - }) - } else { - Ok(()) - } - } - fn cannot_process(&self) -> bool { - let request_state = self.get_state(); - request_state.failed_processing >= request_state.failed_downloading - } - fn get_peer(&mut self) -> Result { - let request_state = self.get_state_mut(); - let Some(peer_id) = request_state - .available_peers - .iter() - .choose(&mut rand::thread_rng()) - .copied() - .map(PeerShouldHave::BlockAndBlobs).or(request_state - .potential_peers - .iter() - .choose(&mut rand::thread_rng()) - .copied() - .map(PeerShouldHave::Neither)) else { - return Err(LookupRequestError::NoPeers); - }; - 
request_state.used_peers.insert(peer_id.to_peer_id()); - request_state.state = State::Downloading { peer_id }; - Ok(peer_id.to_peer_id()) - } - fn check_peer_disconnected(&mut self, peer: &PeerId) -> Result<(), ()> { - self.get_state_mut().check_peer_disconnected(peer) - } - fn remove_if_useless(&mut self, peer: &PeerId) { - self.get_state_mut().remove_peer_if_useless(peer) - } - fn downloading(&self) -> bool { - matches!(self.get_state().state, State::Downloading { .. }) - } - fn register_failure_downloading(&mut self) { - self.get_state_mut().register_failure_downloading() - } -} - -impl RequestState for BlockRequestState { - type RequestType = BlocksByRootRequest; - type ResponseType = Arc>; - type ReconstructedResponseType = RpcBlock; - type VerifiedResponseType = Arc>; - - // response verify - fn response_type() -> ResponseType { - ResponseType::Block - } - - fn get_parent_root(verified_response: &Arc>) -> Option { - Some(verified_response.parent_root()) - } - fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { - &mut request.block_request_state - } - fn add_to_parent_components( - verified_response: Arc>, - components: &mut UnknownParentComponents, - ) { - components.add_unknown_parent_block(verified_response); - } - - fn verified_to_reconstructed( - block: Arc>, - ) -> RpcBlock { - RpcBlock::new_without_blobs(block) - } - - fn send_for_processing( - id: Id, - bl: &mut BlockLookups, - block_root: Hash256, - constructed: RpcBlock, - duration: Duration, - cx: &SyncNetworkContext, - ) -> Result<(), ()> { - bl.send_block_for_processing( - block_root, - constructed, - duration, - BlockProcessType::SingleBlock { id }, - cx, - ) - } - - fn get_state(&self) -> &SingleLookupRequestState { - &self.state - } - fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { - &mut self.state - } - fn new_request(&self) -> BlocksByRootRequest { - BlocksByRootRequest::new(VariableList::from(vec![self.requested_block_root])) - } - fn max_attempts() -> u8 { - 
L::MAX_ATTEMPTS - } - fn retry_request( - id: Id, - cx: &SyncNetworkContext, - peer_id: PeerId, - request: Self::RequestType, - ) -> Result<(), &'static str> { - cx.single_block_lookup_request_retry(id, peer_id, request, L::lookup_type()) - } - - fn verify_response_inner( - &mut self, - expected_block_root: Hash256, - response: Option, - peer_id: PeerShouldHave, - ) -> Result>, LookupVerifyError> { - match response { - Some(block) => { - // Compute the block root using this specific function so that we can get timing - // metrics. - let block_root = get_block_root(&block); - if block_root != expected_block_root { - // return an error and drop the block - // NOTE: we take this is as a download failure to prevent counting the - // attempt as a chain failure, but simply a peer failure. - self.state.register_failure_downloading(); - Err(LookupVerifyError::RootMismatch) - } else { - // Return the block for processing. - self.state.state = State::Processing { peer_id }; - Ok(Some((block_root, block))) - } - } - None => { - if peer_id.should_have_block() { - self.state.register_failure_downloading(); - Err(LookupVerifyError::NoBlockReturned) - } else { - self.state.state = State::AwaitingDownload; - Err(LookupVerifyError::BenignFailure) - } - } - } - } -} - -impl RequestState for BlobRequestState { - type RequestType = BlobsByRootRequest; - type ResponseType = Arc>; - type ReconstructedResponseType = FixedBlobSidecarList; - type VerifiedResponseType = FixedBlobSidecarList; - - // response verify - fn response_type() -> ResponseType { - ResponseType::Blob - } - - fn get_parent_root(verified_response: &FixedBlobSidecarList) -> Option { - verified_response - .into_iter() - .filter_map(|blob| blob.as_ref()) - .map(|blob| blob.block_parent_root) - .next() - } - fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { - &mut request.blob_request_state - } - fn add_to_parent_components( - verified_response: FixedBlobSidecarList, - components: &mut 
UnknownParentComponents, - ) { - components.add_unknown_parent_blobs(verified_response); - } - fn verified_to_reconstructed( - blobs: FixedBlobSidecarList, - ) -> FixedBlobSidecarList { - blobs - } - - fn send_for_processing( - id: Id, - bl: &mut BlockLookups, - block_root: Hash256, - verified: FixedBlobSidecarList, - duration: Duration, - cx: &SyncNetworkContext, - ) -> Result<(), ()> { - bl.send_blobs_for_processing( - block_root, - verified, - duration, - BlockProcessType::SingleBlob { id }, - cx, - ) - } - - fn get_state(&self) -> &SingleLookupRequestState { - &self.state - } - fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { - &mut self.state - } - fn new_request(&self) -> BlobsByRootRequest { - BlobsByRootRequest { - blob_ids: VariableList::from(self.requested_ids.clone()), - } - } - fn max_attempts() -> u8 { - L::MAX_ATTEMPTS - } - fn retry_request( - id: Id, - cx: &SyncNetworkContext, - peer_id: PeerId, - request: Self::RequestType, - ) -> Result<(), &'static str> { - cx.single_blob_lookup_request_retry(id, peer_id, request, L::lookup_type()) - } - - fn verify_response_inner( - &mut self, - expected_block_root: Hash256, - blob: Option, - peer_id: PeerShouldHave, - ) -> Result>, LookupVerifyError> { - match blob { - Some(blob) => { - let received_id = blob.id(); - if !self.requested_ids.contains(&received_id) { - self.state.register_failure_downloading(); - Err(LookupVerifyError::UnrequestedBlobId) - } else { - // State should remain downloading until we receive the stream terminator. 
- self.requested_ids.retain(|id| *id != received_id); - let blob_index = blob.index; - - if blob_index >= T::EthSpec::max_blobs_per_block() as u64 { - return Err(LookupVerifyError::InvalidIndex(blob.index)); - } - *self.blob_download_queue.index_mut(blob_index as usize) = Some(blob); - Ok(None) - } - } - None => { - self.state.state = State::Processing { peer_id }; - let blobs = std::mem::take(&mut self.blob_download_queue); - Ok(Some((expected_block_root, blobs))) - } - } - } -} - -pub struct BlobRequestState { - pub requested_ids: Vec, - /// Where we store blobs until we receive the stream terminator. - pub blob_download_queue: FixedBlobSidecarList, - pub state: SingleLookupRequestState, - _phantom: PhantomData, -} - -impl BlobRequestState { - pub fn new(peer_source: &[PeerShouldHave]) -> Self { - Self { - requested_ids: <_>::default(), - blob_download_queue: <_>::default(), - state: SingleLookupRequestState::new(peer_source), - _phantom: PhantomData, - } - } -} - -pub struct BlockRequestState { - pub requested_block_root: Hash256, - pub state: SingleLookupRequestState, - _phantom: PhantomData, -} - -impl BlockRequestState { - pub fn new(block_root: Hash256, peers: &[PeerShouldHave]) -> Self { - Self { - requested_block_root: block_root, - state: SingleLookupRequestState::new(peers), - _phantom: PhantomData, - } - } -} - -impl SingleBlockLookup { - pub(crate) fn block_already_downloaded(&self) -> bool { - if let Some(components) = self.unknown_parent_components.as_ref() { - components.downloaded_block.is_some() - } else { - self.da_checker - .has_block(&self.block_request_state.requested_block_root) - } - } - - pub(crate) fn blobs_already_downloaded(&mut self) -> bool { - self.update_blobs_request(); - self.blob_request_state.requested_ids.is_empty() - } -} - -/// For requests triggered by an `UnknownBlockParent` or `UnknownBlockParent`, this struct -/// is used to cache components as they are sent to the networking layer. 
We can't use the -/// data availability cache currently because any blocks or blobs without parents won't hit -/// won't pass validation and therefore won't make it into the cache. -#[derive(Default)] -pub struct UnknownParentComponents { - pub downloaded_block: Option>>, - pub downloaded_blobs: FixedBlobSidecarList, -} - -impl From> for UnknownParentComponents { - fn from(value: RpcBlock) -> Self { - let (block, blobs) = value.deconstruct(); - let fixed_blobs = blobs.map(|blobs| { - FixedBlobSidecarList::from(blobs.into_iter().map(Some).collect::>()) - }); - Self::new(Some(block), fixed_blobs) - } -} - -impl UnknownParentComponents { - pub fn new( - block: Option>>, - blobs: Option>, - ) -> Self { - Self { - downloaded_block: block, - downloaded_blobs: blobs.unwrap_or_default(), - } - } - pub fn add_unknown_parent_block(&mut self, block: Arc>) { - self.downloaded_block = Some(block); - } - pub fn add_unknown_parent_blobs(&mut self, blobs: FixedBlobSidecarList) { - for (index, blob_opt) in self.downloaded_blobs.iter_mut().enumerate() { - if let Some(Some(downloaded_blob)) = blobs.get(index) { - *blob_opt = Some(downloaded_blob.clone()); - } - } - } - pub fn downloaded_indices(&self) -> HashSet { - self.downloaded_blobs - .iter() - .enumerate() - .filter_map(|(i, blob_opt)| blob_opt.as_ref().map(|_| i)) - .collect::>() - } -} - -/// Object representing the state of a single block or blob lookup request. -#[derive(PartialEq, Eq, Debug)] -pub struct SingleLookupRequestState { - /// State of this request. - pub state: State, - /// Peers that should have this block or blob. - pub available_peers: HashSet, - /// Peers that mar or may not have this block or blob. - pub potential_peers: HashSet, - /// Peers from which we have requested this block. - pub used_peers: HashSet, - /// How many times have we attempted to process this block or blob. - failed_processing: u8, - /// How many times have we attempted to download this block or blob. 
-    failed_downloading: u8,
-    pub component_processed: bool,
-}
+use types::{EthSpec, SignedBeaconBlock};
 
 #[derive(Debug, PartialEq, Eq)]
 pub enum State {
@@ -571,13 +47,22 @@ pub enum LookupRequestError {
     },
     NoPeers,
     SendFailed(&'static str),
-    AlreadyDownloaded,
+}
+
+pub struct SingleBlockLookup {
+    pub id: Id,
+    pub block_request_state: BlockRequestState,
+    pub blob_request_state: BlobRequestState,
+    pub da_checker: Arc>,
+    /// Only necessary for requests triggered by an `UnknownBlockParent` or `UnknownBlobParent`
+    /// because any blocks or blobs without parents won't hit the data availability cache.
+    pub cached_child_components: Option>,
 }
 
 impl SingleBlockLookup {
     pub fn new(
         requested_block_root: Hash256,
-        unknown_parent_components: Option>,
+        unknown_parent_components: Option>,
         peers: &[PeerShouldHave],
         da_checker: Arc>,
         id: Id,
@@ -587,135 +72,123 @@ impl SingleBlockLookup {
             block_request_state: BlockRequestState::new(requested_block_root, peers),
             blob_request_state: BlobRequestState::new(peers),
             da_checker,
-            unknown_parent_components,
-            triggered: false,
+            cached_child_components: unknown_parent_components,
         }
     }
 
-    pub fn is_for_block(&self, block_root: Hash256) -> bool {
-        self.block_request_state.requested_block_root == block_root
+    /// Get the block root that is being requested.
+    pub fn block_root(&self) -> Hash256 {
+        self.block_request_state.requested_block_root
    }
 
-    /// Send the necessary request for blobs and blocks and update `self.id` with the latest
-    /// request `Id`s. This will return `Err(())` if neither the block nor blob request could be made
-    /// or are no longer required.
+    /// Check the block root matches the requested block root.
+    pub fn is_for_block(&self, block_root: Hash256) -> bool {
+        self.block_root() == block_root
+    }
+
+    /// Update the requested block, this should only be used in a chain of parent lookups to request
+    /// the next parent.
+ pub fn update_requested_parent_block(&mut self, block_root: Hash256) { + self.block_request_state.requested_block_root = block_root; + self.block_request_state.state.state = State::AwaitingDownload; + self.blob_request_state.state.state = State::AwaitingDownload; + self.cached_child_components = Some(ChildComponents::default()); + } + + /// Get all unique peers across block and blob requests. + pub fn all_peers(&self) -> HashSet { + let mut all_peers = self.block_request_state.state.used_peers.clone(); + all_peers.extend(self.blob_request_state.state.used_peers.clone()); + all_peers + } + + /// Send the necessary requests for blocks and/or blobs. This will check whether we have + /// downloaded the block and/or blobs already and will not send requests if so. It will also + /// inspect the request state or blocks and blobs to ensure we are not already processing or + /// downloading the block and/or blobs. pub fn request_block_and_blobs( &mut self, cx: &SyncNetworkContext, ) -> Result<(), LookupRequestError> { - let block_root = self.block_request_state.requested_block_root; - if self.block_already_downloaded() && self.blobs_already_downloaded() { - // drop lookup + let block_root = self.block_root(); + let block_already_downloaded = self.block_already_downloaded(); + let blobs_already_downloaded = self.blobs_already_downloaded(); + + if block_already_downloaded && blobs_already_downloaded { trace!(cx.log, "Lookup request already completed"; "block_root"=> ?block_root); - return Err(LookupRequestError::AlreadyDownloaded); + return Ok(()); } - let (block_peer_id, block_request) = - match as RequestState>::build_request( - &mut self.block_request_state, - ) { - Ok(opt) => opt, - Err(e) => { - // drop lookup - debug!(cx.log, - "Lookup request block error, dropping lookup"; - "block_root"=> ?block_root, - "error"=> ?e - ); - return Err(e); - } + self.request_generic::>(block_already_downloaded, cx)?; + self.request_generic::>(blobs_already_downloaded, cx) + } + + /// 
Common checks and request logic for blocks and blobs. + fn request_generic>( + &mut self, + already_downloaded: bool, + cx: &SyncNetworkContext, + ) -> Result<(), LookupRequestError> { + let id = self.id; + let request_state = R::request_state_mut(self); + + let should_request = !already_downloaded + && matches!(request_state.get_state().state, State::AwaitingDownload); + if should_request { + let (peer_id, request) = request_state.build_request()?; + let id = SingleLookupReqId { + id, + req_counter: request_state.get_state().req_counter, }; - - let (blob_peer_id, blob_request) = match ::EthSpec, - > as RequestState>::build_request( - &mut self.blob_request_state - ) { - Ok(opt) => opt, - Err(e) => { - // drop lookup - debug!(cx.log, - "Lookup request blob error, dropping lookup"; - "block_root"=> ?block_root, - "error"=> ?e - ); - return Err(e); - } - }; - - cx.single_lookup_request( - self.id, - block_peer_id, - block_request, - blob_peer_id, - blob_request, - L::lookup_type(), - ) - .map_err(LookupRequestError::SendFailed)?; + R::make_request(id, peer_id, request, cx)?; + } Ok(()) } - pub fn update_blobs_request(&mut self) { - self.blob_request_state.requested_ids = if let Some(components) = - self.unknown_parent_components.as_ref() - { - let blobs = components.downloaded_indices(); - self.da_checker - .get_missing_blob_ids( - self.block_request_state.requested_block_root, - components.downloaded_block.as_ref(), - Some(blobs), - ) - .unwrap_or_default() + /// Returns a `CachedChild`, which is a wrapper around a `RpcBlock` that is either: + /// + /// 1. `NotRequired`: there is no child caching required for this lookup. + /// 2. `DownloadIncomplete`: Child caching is required, but all components are not yet downloaded. + /// 3. `Ok`: The child is required and we have downloaded it. + /// 4. `Err`: The child is required, but has failed consistency checks. 
+ pub fn get_cached_child_block(&self) -> CachedChild { + if let Some(components) = self.cached_child_components.as_ref() { + let Some(block) = components.downloaded_block.as_ref()else { + return CachedChild::DownloadIncomplete + }; + + if !self.missing_blob_ids().is_empty() { + return CachedChild::DownloadIncomplete; + } + + match RpcBlock::new_from_fixed(block.clone(), components.downloaded_blobs.clone()) { + Ok(rpc_block) => CachedChild::Ok(rpc_block), + Err(e) => CachedChild::Err(e), + } } else { - self.da_checker - .get_missing_blob_ids_checking_cache(self.block_request_state.requested_block_root) - .unwrap_or_default() - }; + CachedChild::NotRequired + } } - pub fn get_downloaded_block(&mut self) -> Option> { - self.unknown_parent_components - .as_mut() - .and_then(|components| { - let downloaded_block = components.downloaded_block.as_ref(); - let downloaded_indices = components.downloaded_indices(); - let missing_ids = self.da_checker.get_missing_blob_ids( - self.block_request_state.requested_block_root, - downloaded_block, - Some(downloaded_indices), - ); - let download_complete = - missing_ids.map_or(true, |missing_ids| missing_ids.is_empty()); - if download_complete { - let UnknownParentComponents { - downloaded_block, - downloaded_blobs, - } = components; - downloaded_block.as_ref().and_then(|block| { - //TODO(sean) figure out how to properly deal with a consistency error here, - // should we downscore the peer sending blobs? - let blobs = std::mem::take(downloaded_blobs); - let filtered = blobs - .into_iter() - .filter_map(|b| b.clone()) - .collect::>(); - let blobs = VariableList::from(filtered); - RpcBlock::new(block.clone(), Some(blobs)).ok() - }) - } else { - None - } - }) - } - - pub fn add_unknown_parent_components( + /// Accepts a verified response, and adds it to the child components if required. 
This method + /// returns a `CachedChild` which provides a completed block + blob response if all components have been + /// received, or information about whether the child is required and if it has been downloaded. + pub fn add_response>( &mut self, - components: UnknownParentComponents, - ) { - if let Some(ref mut existing_components) = self.unknown_parent_components { - let UnknownParentComponents { + verified_response: R::VerifiedResponseType, + ) -> CachedChild { + if let Some(cached_child_components) = self.cached_child_components.as_mut() { + R::add_to_child_components(verified_response, cached_child_components); + self.get_cached_child_block() + } else { + CachedChild::NotRequired + } + } + + /// Add a child component to the lookup request. Merges with any existing child components. + pub fn add_child_components(&mut self, components: ChildComponents) { + if let Some(ref mut existing_components) = self.cached_child_components { + let ChildComponents { downloaded_block, downloaded_blobs, } = components; @@ -724,10 +197,11 @@ impl SingleBlockLookup { } existing_components.add_unknown_parent_blobs(downloaded_blobs); } else { - self.unknown_parent_components = Some(components); + self.cached_child_components = Some(components); } } + /// Add all given peers to both block and blob request states. pub fn add_peers(&mut self, peers: &[PeerShouldHave]) { for peer in peers { match peer { @@ -743,45 +217,267 @@ impl SingleBlockLookup { } } + /// Returns true if the block has already been downloaded. + pub fn both_components_downloaded(&self) -> bool { + self.block_request_state.state.component_downloaded + && self.blob_request_state.state.component_downloaded + } + + /// Returns true if the block has already been downloaded. 
pub fn both_components_processed(&self) -> bool { self.block_request_state.state.component_processed && self.blob_request_state.state.component_processed } - pub fn should_remove_disconnected_peer( + /// Checks both the block and blob request states to see if the peer is disconnected. + /// + /// Returns true if the lookup should be dropped. + pub fn should_drop_lookup_on_disconnected_peer( &mut self, - id: Id, peer_id: &PeerId, cx: &SyncNetworkContext, log: &Logger, ) -> bool { - let useless_block_peer = - if as RequestState>::check_peer_disconnected( - &mut self.block_request_state, - peer_id, - ) - .is_err() - { - trace!(log, "Single lookup failed on peer disconnection"; "block_root" => ?self.block_request_state.requested_block_root, "response_type" => ?ResponseType::Block); - self.block_request_state - .retry_request_after_failure(id, cx, log) - .is_err() - } else { - false - }; - let useless_blob_peer = if ::EthSpec> as RequestState>::check_peer_disconnected(&mut self - .blob_request_state, peer_id) - .is_err() - { - trace!(log, "Single lookup failed on peer disconnection"; "block_root" => ?self.block_request_state.requested_block_root, "response_type" => ?ResponseType::Blob); - self.blob_request_state - .retry_request_after_failure(id, cx, log) - .is_err() - } else { - false - }; - useless_block_peer && useless_blob_peer + let block_root = self.block_root(); + let block_peer_disconnected = self + .block_request_state + .state + .check_peer_disconnected(peer_id) + .is_err(); + let blob_peer_disconnected = self + .blob_request_state + .state + .check_peer_disconnected(peer_id) + .is_err(); + + if block_peer_disconnected || blob_peer_disconnected { + if let Err(e) = self.request_block_and_blobs(cx) { + trace!(log, "Single lookup failed on peer disconnection"; "block_root" => ?block_root, "error" => ?e); + return true; + } + } + false } + + /// Returns `true` if the block has already been downloaded. 
+    pub(crate) fn block_already_downloaded(&self) -> bool {
+        if let Some(components) = self.cached_child_components.as_ref() {
+            components.downloaded_block.is_some()
+        } else {
+            self.da_checker.has_block(&self.block_root())
+        }
+    }
+
+    /// Updates the `requested_ids` field of the `BlobRequestState` with the most recent picture
+    /// of which blobs still need to be requested. Returns `true` if there are no more blobs to
+    /// request.
+    pub(crate) fn blobs_already_downloaded(&mut self) -> bool {
+        self.update_blobs_request();
+        self.blob_request_state.requested_ids.is_empty()
+    }
+
+    /// Updates this request with the most recent picture of which blobs still need to be requested.
+    pub fn update_blobs_request(&mut self) {
+        self.blob_request_state.requested_ids = self.missing_blob_ids()
+    }
+
+    /// If `cached_child_components` is `Some`, we know block components won't hit the data
+    /// availability cache, so we don't check it. In either case we use the data availability
+    /// checker to get a picture of outstanding blob requirements for the block root.
+    pub(crate) fn missing_blob_ids(&self) -> Vec {
+        if let Some(components) = self.cached_child_components.as_ref() {
+            let blobs = components.downloaded_indices();
+            self.da_checker
+                .get_missing_blob_ids(
+                    self.block_root(),
+                    components.downloaded_block.as_ref(),
+                    Some(blobs),
+                )
+                .unwrap_or_default()
+        } else {
+            self.da_checker
+                .get_missing_blob_ids_checking_cache(self.block_root())
+                .unwrap_or_default()
+        }
+    }
+
+    /// Penalizes a blob peer if it should have blobs but didn't return them to us. Does not penalize
+    /// a peer who we request blobs from based on seeing a block or blobs over gossip. This may
+    /// have been a benign failure.
+ pub fn penalize_lazy_blob_peer(&mut self, cx: &SyncNetworkContext) { + if let Ok(blob_peer) = self.blob_request_state.state.processing_peer() { + if let PeerShouldHave::BlockAndBlobs(blob_peer) = blob_peer { + cx.report_peer( + blob_peer, + PeerAction::MidToleranceError, + "single_block_failure", + ); + } + self.blob_request_state + .state + .remove_peer_if_useless(blob_peer.as_peer_id()); + } + } + + /// This failure occurs on download, so register a failure downloading, penalize the peer if + /// necessary and clear the blob cache. + pub fn handle_consistency_failure(&mut self, cx: &SyncNetworkContext) { + self.penalize_lazy_blob_peer(cx); + if let Some(cached_child) = self.cached_child_components.as_mut() { + cached_child.clear_blobs(); + } + self.blob_request_state.state.register_failure_downloading() + } + + /// This failure occurs after processing, so register a failure processing, penalize the peer if + /// necessary and clear the blob cache. + pub fn handle_availability_check_failure(&mut self, cx: &SyncNetworkContext) { + self.penalize_lazy_blob_peer(cx); + if let Some(cached_child) = self.cached_child_components.as_mut() { + cached_child.clear_blobs(); + } + self.blob_request_state.state.register_failure_processing() + } +} + +/// The state of the blob request component of a `SingleBlockLookup`. +pub struct BlobRequestState { + /// The latest picture of which blobs still need to be requested. This includes information + /// from both block/blobs downloaded in the network layer and any blocks/blobs that exist in + /// the data availability checker. + pub requested_ids: Vec, + /// Where we store blobs until we receive the stream terminator. 
+ pub blob_download_queue: FixedBlobSidecarList, + pub state: SingleLookupRequestState, + _phantom: PhantomData, +} + +impl BlobRequestState { + pub fn new(peer_source: &[PeerShouldHave]) -> Self { + Self { + requested_ids: <_>::default(), + blob_download_queue: <_>::default(), + state: SingleLookupRequestState::new(peer_source), + _phantom: PhantomData, + } + } +} + +/// The state of the block request component of a `SingleBlockLookup`. +pub struct BlockRequestState { + pub requested_block_root: Hash256, + pub state: SingleLookupRequestState, + _phantom: PhantomData, +} + +impl BlockRequestState { + pub fn new(block_root: Hash256, peers: &[PeerShouldHave]) -> Self { + Self { + requested_block_root: block_root, + state: SingleLookupRequestState::new(peers), + _phantom: PhantomData, + } + } +} + +/// This is the status of cached components for a lookup if they are required. It provides information +/// about whether we should send a responses immediately for processing, whether we require more +/// responses, or whether all cached components have been received and the reconstructed block +/// should be sent for processing. +pub enum CachedChild { + /// All child components have been received, this is the reconstructed block, including all. + /// It has been checked for consistency between blobs and block, but no consensus checks have + /// been performed and no kzg verification has been performed. + Ok(RpcBlock), + /// All child components have not yet been received. + DownloadIncomplete, + /// Child components should not be cached, send this directly for processing. + NotRequired, + /// There was an error during consistency checks between block and blobs. + Err(AvailabilityCheckError), +} + +/// For requests triggered by an `UnknownBlockParent` or `UnknownBlobParent`, this struct +/// is used to cache components as they are sent to the network service. 
We can't use the
+/// data availability cache currently because any blocks or blobs without parents
+/// won't pass validation and therefore won't make it into the cache.
+#[derive(Default)]
+pub struct ChildComponents {
+    pub downloaded_block: Option>>,
+    pub downloaded_blobs: FixedBlobSidecarList,
+}
+
+impl From> for ChildComponents {
+    fn from(value: RpcBlock) -> Self {
+        let (block, blobs) = value.deconstruct();
+        let fixed_blobs = blobs.map(|blobs| {
+            FixedBlobSidecarList::from(blobs.into_iter().map(Some).collect::>())
+        });
+        Self::new(Some(block), fixed_blobs)
+    }
+}
+
+impl ChildComponents {
+    pub fn new(
+        block: Option>>,
+        blobs: Option>,
+    ) -> Self {
+        Self {
+            downloaded_block: block,
+            downloaded_blobs: blobs.unwrap_or_default(),
+        }
+    }
+
+    pub fn clear_blobs(&mut self) {
+        self.downloaded_blobs = FixedBlobSidecarList::default();
+    }
+
+    pub fn add_unknown_parent_block(&mut self, block: Arc>) {
+        self.downloaded_block = Some(block);
+    }
+
+    pub fn add_unknown_parent_blobs(&mut self, blobs: FixedBlobSidecarList) {
+        for (index, blob_opt) in self.downloaded_blobs.iter_mut().enumerate() {
+            if let Some(Some(downloaded_blob)) = blobs.get(index) {
+                *blob_opt = Some(downloaded_blob.clone());
+            }
+        }
+    }
+
+    pub fn downloaded_indices(&self) -> HashSet {
+        self.downloaded_blobs
+            .iter()
+            .enumerate()
+            .filter_map(|(i, blob_opt)| blob_opt.as_ref().map(|_| i))
+            .collect::>()
+    }
+}
+
+/// Object representing the state of a single block or blob lookup request.
+#[derive(PartialEq, Eq, Debug)]
+pub struct SingleLookupRequestState {
+    /// State of this request.
+    pub state: State,
+    /// Peers that should have this block or blob.
+    pub available_peers: HashSet,
+    /// Peers that may or may not have this block or blob.
+    pub potential_peers: HashSet,
+    /// Peers from which we have requested this block.
+    pub used_peers: HashSet,
+    /// How many times have we attempted to process this block or blob.
+ pub failed_processing: u8, + /// How many times have we attempted to download this block or blob. + pub failed_downloading: u8, + /// Whether or not we have downloaded this block or blob. + pub component_downloaded: bool, + /// Whether or not we have processed this block or blob. + pub component_processed: bool, + /// Should be incremented everytime this request is retried. The purpose of this is to + /// differentiate retries of the same block/blob request within a lookup. We currently penalize + /// peers and retry requests prior to receiving the stream terminator. This means responses + /// from a prior request may arrive after a new request has been sent, this counter allows + /// us to differentiate these two responses. + pub req_counter: u32, } impl SingleLookupRequestState { @@ -805,7 +501,9 @@ impl SingleLookupRequestState { used_peers: HashSet::default(), failed_processing: 0, failed_downloading: 0, + component_downloaded: false, component_processed: false, + req_counter: 0, } } @@ -827,11 +525,13 @@ impl SingleLookupRequestState { self.failed_processing + self.failed_downloading } + /// This method should be used for peers wrapped in `PeerShouldHave::BlockAndBlobs`. pub fn add_peer(&mut self, peer_id: &PeerId) { self.potential_peers.remove(peer_id); self.available_peers.insert(*peer_id); } + /// This method should be used for peers wrapped in `PeerShouldHave::Neither`. pub fn add_potential_peer(&mut self, peer_id: &PeerId) { if !self.available_peers.contains(peer_id) { self.potential_peers.insert(*peer_id); @@ -852,6 +552,8 @@ impl SingleLookupRequestState { Ok(()) } + /// Returns the id peer we downloaded from if we have downloaded a verified block, otherwise + /// returns an error. 
pub fn processing_peer(&self) -> Result { if let State::Processing { peer_id } = &self.state { Ok(*peer_id) @@ -860,6 +562,8 @@ impl SingleLookupRequestState { } } + /// Remove the given peer from the set of potential peers, so long as there is at least one + /// other potential peer or we have any available peers. pub fn remove_peer_if_useless(&mut self, peer_id: &PeerId) { if !self.available_peers.is_empty() || self.potential_peers.len() > 1 { self.potential_peers.remove(peer_id); @@ -876,10 +580,7 @@ impl slog::Value for SingleBlockLookup { ) -> slog::Result { serializer.emit_str("request", key)?; serializer.emit_arguments("lookup_type", &format_args!("{:?}", L::lookup_type()))?; - serializer.emit_arguments( - "hash", - &format_args!("{}", self.block_request_state.requested_block_root), - )?; + serializer.emit_arguments("hash", &format_args!("{}", self.block_root()))?; serializer.emit_arguments( "blob_ids", &format_args!("{:?}", self.blob_request_state.requested_ids), @@ -924,6 +625,8 @@ impl slog::Value for SingleLookupRequestState { #[cfg(test)] mod tests { use super::*; + use crate::sync::block_lookups::common::LookupType; + use crate::sync::block_lookups::common::{Lookup, RequestState}; use beacon_chain::builder::Witness; use beacon_chain::eth1_chain::CachingEth1Backend; use sloggers::null::NullLoggerBuilder; diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index d7c9688e50..305c0279cb 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -1,12 +1,13 @@ use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::service::RequestId; -use crate::sync::manager::RequestId as SyncId; +use crate::sync::manager::{RequestId as SyncId, SingleLookupReqId}; use crate::NetworkMessage; use std::sync::Arc; use super::*; +use crate::sync::block_lookups::common::ResponseType; use beacon_chain::builder::Witness; use 
beacon_chain::eth1_chain::CachingEth1Backend;
 use beacon_chain::test_utils::{build_log, BeaconChainHarness, EphemeralHarnessType};
@@ -156,7 +157,7 @@ impl TestRig {
     }
 
     #[track_caller]
-    fn expect_block_request(&mut self, response_type: ResponseType) -> Id {
+    fn expect_block_request(&mut self, response_type: ResponseType) -> SingleLookupReqId {
         match response_type {
             ResponseType::Block => match self.network_rx.try_recv() {
                 Ok(NetworkMessage::SendRequest {
@@ -182,7 +183,7 @@ impl TestRig {
     }
 
     #[track_caller]
-    fn expect_parent_request(&mut self, response_type: ResponseType) -> Id {
+    fn expect_parent_request(&mut self, response_type: ResponseType) -> SingleLookupReqId {
         match response_type {
             ResponseType::Block => match self.network_rx.try_recv() {
                 Ok(NetworkMessage::SendRequest {
@@ -322,7 +323,7 @@ fn test_single_block_lookup_happy_path() {
     // after processing.
     bl.single_lookup_response::>(id, peer_id, None, D, &cx);
     bl.single_block_component_processed::>(
-        id,
+        id.id,
         BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)),
         &mut cx,
     );
@@ -471,7 +472,7 @@ fn test_single_block_lookup_becomes_parent_request() {
     // Send the stream termination. Peer should have not been penalized, and the request moved to a
     // parent request after processing.
     bl.single_block_component_processed::>(
-        id,
+        id.id,
         BlockError::ParentUnknown(block.into()).into(),
         &mut cx,
     );
@@ -766,6 +767,14 @@ fn test_parent_lookup_too_many_attempts() {
         &cx,
     ); // Send the stream termination
+
+    // Note, previously we would send the same lookup id with a stream terminator,
+    // we'd ignore it because we'd interpret it as an unrequested response, since
+    // we already got one response for the block. I'm not sure what the intent is
+    // for having this stream terminator line in this test at all. Receiving an invalid
+    // block and a stream terminator with the same Id now results in two failed attempts,
+    // I'm unsure if this is how it should behave?
+ // bl.parent_lookup_response::>(id, peer_id, None, D, &cx); rig.expect_penalty(); } @@ -1051,7 +1060,7 @@ fn test_single_block_lookup_ignored_response() { bl.single_lookup_response::>(id, peer_id, None, D, &cx); // Send an Ignored response, the request should be dropped bl.single_block_component_processed::>( - id, + id.id, BlockProcessingResult::Ignored, &mut cx, ); @@ -1216,6 +1225,7 @@ fn test_same_chain_race_condition() { mod deneb_only { use super::*; + use crate::sync::block_lookups::common::ResponseType; use beacon_chain::blob_verification::BlobError; use std::ops::IndexMut; use std::str::FromStr; @@ -1229,10 +1239,10 @@ mod deneb_only { parent_block: Option>>, parent_blobs: Vec>>, peer_id: PeerId, - block_req_id: Option, - parent_block_req_id: Option, - blob_req_id: Option, - parent_blob_req_id: Option, + block_req_id: Option, + parent_block_req_id: Option, + blob_req_id: Option, + parent_blob_req_id: Option, slot: Slot, block_root: Hash256, } @@ -1296,7 +1306,7 @@ mod deneb_only { block_root = child_root; bl.search_child_block( child_root, - Some(UnknownParentComponents::new(Some(child_block), None)), + Some(ChildComponents::new(Some(child_block), None)), &[PeerShouldHave::Neither(peer_id)], &mut cx, ); @@ -1334,7 +1344,7 @@ mod deneb_only { *blobs.index_mut(0) = Some(child_blob); bl.search_child_block( child_root, - Some(UnknownParentComponents::new(None, Some(blobs))), + Some(ChildComponents::new(None, Some(blobs))), &[PeerShouldHave::Neither(peer_id)], &mut cx, ); @@ -1531,7 +1541,7 @@ mod deneb_only { // mean we do not send a new request. 
self.bl .single_block_component_processed::>( - self.block_req_id.expect("block request id"), + self.block_req_id.expect("block request id").id, BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported( self.block_root, )), @@ -1578,7 +1588,7 @@ mod deneb_only { fn invalid_block_processed(mut self) -> Self { self.bl .single_block_component_processed::>( - self.block_req_id.expect("block request id"), + self.block_req_id.expect("block request id").id, BlockProcessingResult::Err(BlockError::ProposalSignatureInvalid), &mut self.cx, ); @@ -1589,7 +1599,7 @@ mod deneb_only { fn invalid_blob_processed(mut self) -> Self { self.bl .single_block_component_processed::>( - self.blob_req_id.expect("blob request id"), + self.blob_req_id.expect("blob request id").id, BlockProcessingResult::Err(BlockError::BlobValidation( BlobError::ProposerSignatureInvalid, )), @@ -1602,7 +1612,7 @@ mod deneb_only { fn missing_components_from_block_request(mut self) -> Self { self.bl .single_block_component_processed::>( - self.block_req_id.expect("block request id"), + self.block_req_id.expect("block request id").id, BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( self.slot, self.block_root, @@ -1616,7 +1626,7 @@ mod deneb_only { fn missing_components_from_blob_request(mut self) -> Self { self.bl .single_block_component_processed::>( - self.blob_req_id.expect("blob request id"), + self.blob_req_id.expect("blob request id").id, BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( self.slot, self.block_root, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index cedad8890f..696a2949d5 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -41,11 +41,10 @@ use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor}; use crate::service::NetworkMessage; use 
crate::status::ToStatusMessage;
+use crate::sync::block_lookups::common::{Current, Parent};
 use crate::sync::block_lookups::delayed_lookup;
 use crate::sync::block_lookups::delayed_lookup::DelayedLookupMessage;
-use crate::sync::block_lookups::{
-    BlobRequestState, BlockRequestState, Current, Parent, UnknownParentComponents,
-};
+use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, ChildComponents};
 use crate::sync::range_sync::ByRangeRequestType;
 use beacon_chain::block_verification_types::AsBlock;
 use beacon_chain::block_verification_types::RpcBlock;
@@ -84,45 +83,39 @@ pub const DELAY_QUEUE_CHANNEL_SIZE: usize = 128;
 
 pub type Id = u32;
 
-#[derive(Debug, Copy, Clone)]
-pub enum ResponseType {
-    Block,
-    Blob,
+#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
+pub struct SingleLookupReqId {
+    pub id: Id,
+    pub req_counter: Id,
+}
+
+#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
+pub struct ParentLookupReqId {
+    pub id: Id,
+    pub req_counter: Id,
 }
 
 /// Id of rpc requests sent by sync to the network.
 #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
 pub enum RequestId {
     /// Request searching for a block given a hash.
-    SingleBlock {
-        id: Id,
-    },
-    SingleBlob {
-        id: Id,
-    },
-    /// Request searching for a block's parent. The id is the chain
-    ParentLookup {
-        id: Id,
-    },
-    ParentLookupBlob {
-        id: Id,
-    },
+    SingleBlock { id: SingleLookupReqId },
+    /// Request searching for a set of blobs given a hash.
+    SingleBlob { id: SingleLookupReqId },
+    /// Request searching for a block's parent. The id is the chain, shared with the corresponding
+    /// blob id.
+    ParentLookup { id: SingleLookupReqId },
+    /// Request searching for a block's parent blobs. The id is the chain, shared with the corresponding
+    /// block id.
+    ParentLookupBlob { id: SingleLookupReqId },
     /// Request was from the backfill sync algorithm.
- BackFillBlocks { - id: Id, - }, + BackFillBlocks { id: Id }, /// Backfill request that is composed by both a block range request and a blob range request. - BackFillBlockAndBlobs { - id: Id, - }, + BackFillBlockAndBlobs { id: Id }, /// The request was from a chain in the range sync algorithm. - RangeBlocks { - id: Id, - }, + RangeBlocks { id: Id }, /// Range request that is composed by both a block range request and a blob range request. - RangeBlockAndBlobs { - id: Id, - }, + RangeBlockAndBlobs { id: Id }, } #[derive(Debug)] @@ -680,7 +673,7 @@ impl SyncManager { block_root, parent_root, blob_slot, - Some(UnknownParentComponents::new(None, Some(blobs))), + Some(ChildComponents::new(None, Some(blobs))), ); } SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => { @@ -718,16 +711,9 @@ impl SyncManager { } } } - SyncMessage::MissingGossipBlockComponentsDelayed(block_root) => { - if self - .block_lookups - .trigger_lookup_by_root(block_root, &self.network) - .is_err() - { - // No request was made for block or blob so the lookup is dropped. - self.block_lookups.remove_lookup_by_root(block_root); - } - } + SyncMessage::MissingGossipBlockComponentsDelayed(block_root) => self + .block_lookups + .trigger_lookup_by_root(block_root, &self.network), SyncMessage::Disconnect(peer_id) => { self.peer_disconnect(&peer_id); } @@ -796,7 +782,7 @@ impl SyncManager { block_root: Hash256, parent_root: Hash256, slot: Slot, - parent_components: Option>, + parent_components: Option>, ) { if self.should_search_for_block(slot, &peer_id) { self.block_lookups.search_parent( @@ -951,7 +937,7 @@ impl SyncManager { seen_timestamp, &self.network, ), - RequestId::SingleBlob { id: _ } => { + RequestId::SingleBlob { .. 
} => { crit!(self.log, "Blob received during block request"; "peer_id" => %peer_id ); } RequestId::ParentLookup { id } => self @@ -1023,7 +1009,7 @@ impl SyncManager { seen_timestamp: Duration, ) { match request_id { - RequestId::SingleBlock { id: _ } => { + RequestId::SingleBlock { .. } => { crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id ); } RequestId::SingleBlob { id } => self diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 1dd33bd31c..23e055f60a 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -9,6 +9,6 @@ mod network_context; mod peer_sync_info; mod range_sync; -pub use block_lookups::UnknownParentComponents; +pub use block_lookups::ChildComponents; pub use manager::{BatchProcessResult, SyncMessage}; pub use range_sync::{BatchOperationOutcome, ChainId}; diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index b7c6de2fc6..adc9469d71 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -7,7 +7,8 @@ use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::service::{NetworkMessage, RequestId}; use crate::status::ToStatusMessage; -use crate::sync::block_lookups::LookupType; +use crate::sync::block_lookups::common::LookupType; +use crate::sync::manager::SingleLookupReqId; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; use fnv::FnvHashMap; @@ -404,23 +405,9 @@ impl SyncNetworkContext { } } - /// Sends a blocks by root request for a parent request. 
- pub fn single_lookup_request( + pub fn block_lookup_request( &self, - id: Id, - peer_id: PeerId, - request: BlocksByRootRequest, - blob_peer_id: PeerId, - blob_request: BlobsByRootRequest, - lookup_type: LookupType, - ) -> Result<(), &'static str> { - self.single_block_lookup_request_retry(id, peer_id, request, lookup_type)?; - self.single_blob_lookup_request_retry(id, blob_peer_id, blob_request, lookup_type)?; - Ok(()) - } - pub fn single_block_lookup_request_retry( - &self, - id: Id, + id: SingleLookupReqId, peer_id: PeerId, request: BlocksByRootRequest, lookup_type: LookupType, @@ -448,14 +435,18 @@ impl SyncNetworkContext { Ok(()) } - pub fn single_blob_lookup_request_retry( + pub fn blob_lookup_request( &self, - id: Id, + id: SingleLookupReqId, blob_peer_id: PeerId, blob_request: BlobsByRootRequest, lookup_type: LookupType, ) -> Result<(), &'static str> { - let request_id = RequestId::Sync(SyncRequestId::SingleBlob { id }); + let sync_id = match lookup_type { + LookupType::Current => SyncRequestId::SingleBlock { id }, + LookupType::Parent => SyncRequestId::ParentLookup { id }, + }; + let request_id = RequestId::Sync(sync_id); if !blob_request.blob_ids.is_empty() { trace!( @@ -558,6 +549,12 @@ impl SyncNetworkContext { "To deal with alignment with deneb boundaries, batches need to be of just one epoch" ); + #[cfg(test)] + { + // Keep tests only for blocks. + ByRangeRequestType::Blocks + } + #[cfg(not(test))] if let Some(data_availability_boundary) = self.chain.data_availability_boundary() { if epoch >= data_availability_boundary { ByRangeRequestType::BlocksAndBlobs