From 8a6e8d51b6428efadf7bbaf8427fe7f2a228b0ac Mon Sep 17 00:00:00 2001 From: realbigsean Date: Tue, 25 Jul 2023 10:45:58 -0400 Subject: [PATCH] make block and blob single lookups generic --- beacon_node/network/src/router.rs | 13 +- .../network/src/sync/block_lookups/mod.rs | 1025 +++++------------ .../src/sync/block_lookups/parent_lookup.rs | 205 +--- .../sync/block_lookups/single_block_lookup.rs | 895 ++++++++------ .../network/src/sync/block_lookups/tests.rs | 20 +- beacon_node/network/src/sync/manager.rs | 182 ++- .../network/src/sync/network_context.rs | 175 ++- 7 files changed, 1114 insertions(+), 1401 deletions(-) diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 30a75a9105..927229e7b7 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -482,7 +482,10 @@ impl Router { ) { let request_id = match request_id { RequestId::Sync(sync_id) => match sync_id { - SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => { + SyncId::SingleBlock { .. } + | SyncId::SingleBlob { .. } + | SyncId::ParentLookup { .. } + | SyncId::ParentLookupBlob { .. } => { unreachable!("Block lookups do not request BBRange requests") } id @ (SyncId::BackFillBlocks { .. } @@ -550,6 +553,9 @@ impl Router { | SyncId::BackFillBlockAndBlobs { .. } => { unreachable!("Batch syncing do not request BBRoot requests") } + SyncId::SingleBlob { .. } | SyncId::ParentLookupBlob { .. } => { + unreachable!("Blob response to block by roots request") + } }, RequestId::Router => unreachable!("All BBRoot requests belong to sync"), }; @@ -576,7 +582,10 @@ impl Router { ) { let request_id = match request_id { RequestId::Sync(sync_id) => match sync_id { - id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id, + id @ (SyncId::SingleBlob { .. } | SyncId::ParentLookupBlob { .. }) => id, + SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => { + unreachable!("Block response to blobs by roots request") + } SyncId::BackFillBlocks { .. } | SyncId::RangeBlocks { .. } | SyncId::RangeBlockAndBlobs { .. } diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index ff095c719e..aef88bfc3e 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -1,22 +1,24 @@ -use self::parent_lookup::PARENT_FAIL_TOLERANCE; -use self::parent_lookup::{ParentLookup, ParentVerifyError}; -use self::single_block_lookup::{LookupVerifyError, SingleBlockLookup}; +use self::parent_lookup::ParentVerifyError; +use self::single_block_lookup::SingleBlockLookup; use super::manager::BlockProcessingResult; use super::BatchProcessResult; -use super::{ - manager::{BlockProcessType, Id}, - network_context::SyncNetworkContext, -}; +use super::{manager::BlockProcessType, network_context::SyncNetworkContext}; use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; -use crate::sync::block_lookups::single_block_lookup::LookupId; +use crate::sync::block_lookups::parent_lookup::ParentLookup; +use crate::sync::block_lookups::single_block_lookup::LookupVerifyError; +use crate::sync::manager::{Id, ResponseType}; use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker}; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; +use fnv::FnvHashMap; use lighthouse_network::rpc::RPCError; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; pub use single_block_lookup::UnknownParentComponents; +pub use single_block_lookup::{ + BlobRequestState, BlockRequestState, Current, Lookup, Parent, RequestState, +}; use slog::{debug, error, trace, warn, Logger}; use smallvec::SmallVec; use std::collections::HashMap; @@ -26,7 +28,7 @@ use std::time::Duration; use store::{Hash256, SignedBeaconBlock}; use strum::Display; use types::blob_sidecar::FixedBlobSidecarList; -use types::{BlobSidecar, Slot}; +use types::{BlobSidecar, EthSpec, Slot}; pub(crate) mod delayed_lookup; mod parent_lookup; @@ -39,19 +41,18 @@ pub type RootBlockTuple = (Hash256, Arc>); pub type RootBlobsTuple = (Hash256, FixedBlobSidecarList); const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; -const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3; +pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3; -pub(crate) struct BlockLookups { +pub struct BlockLookups { /// Parent chain lookups being downloaded. parent_lookups: SmallVec<[ParentLookup; 3]>, - processing_parent_lookups: - HashMap, SingleBlockLookup)>, + processing_parent_lookups: HashMap, SingleBlockLookup)>, /// A cache of failed chain lookups to prevent duplicate searches. failed_chains: LRUTimeCache, - single_block_lookups: Vec>, + single_block_lookups: FnvHashMap>, da_checker: Arc>, @@ -59,34 +60,6 @@ pub(crate) struct BlockLookups { log: Logger, } -pub type BlockRequestId = Id; -pub type BlobRequestId = Id; - -#[derive(Debug, PartialEq)] -enum StreamTerminator { - True, - False, -} - -impl From for StreamTerminator { - fn from(value: bool) -> Self { - if value { - StreamTerminator::True - } else { - StreamTerminator::False - } - } -} - -/// Used to track block or blob responses in places we want to reduce code duplication in -/// response handling. -// NOTE: a better solution may be to wrap request `Id` in an enum. -#[derive(Debug, Copy, Clone)] -pub enum ResponseType { - Block, - Blob, -} - /// This enum is used to track what a peer *should* be able to respond with respond based on /// other messages we've seen from this peer on the network. This is useful for peer scoring. /// We expect a peer tracked by the `BlockAndBlobs` variant to be able to respond to all @@ -124,13 +97,6 @@ impl PeerShouldHave { } } -/// Tracks the conditions under which we want to drop a parent or single block lookup. -#[derive(Debug, Copy, Clone)] -pub enum ShouldRemoveLookup { - True, - False, -} - impl BlockLookups { pub fn new(da_checker: Arc>, log: Logger) -> Self { Self { @@ -152,9 +118,9 @@ impl BlockLookups { &mut self, block_root: Hash256, peer_source: PeerShouldHave, - cx: &mut SyncNetworkContext, + cx: &SyncNetworkContext, ) { - let lookup = self.search_block_with(block_root, None, &[peer_source]); + let lookup = self.new_current_lookup(block_root, None, &[peer_source], cx); if let Some(lookup) = lookup { self.trigger_single_lookup(lookup, cx); } @@ -163,8 +129,13 @@ impl BlockLookups { /// /// The request is not immediately triggered, and should be triggered by a call to /// `trigger_lookup_by_root`. - pub fn search_block_delayed(&mut self, block_root: Hash256, peer_source: PeerShouldHave) { - let lookup = self.search_block_with(block_root, None, &[peer_source]); + pub fn search_block_delayed( + &mut self, + block_root: Hash256, + peer_source: PeerShouldHave, + cx: &SyncNetworkContext, + ) { + let lookup = self.new_current_lookup(block_root, None, &[peer_source], cx); if let Some(lookup) = lookup { self.add_single_lookup(lookup) } @@ -181,9 +152,9 @@ impl BlockLookups { block_root: Hash256, parent_components: Option>, peer_source: &[PeerShouldHave], - cx: &mut SyncNetworkContext, + cx: &SyncNetworkContext, ) { - let lookup = self.search_block_with(block_root, parent_components, peer_source); + let lookup = self.new_current_lookup(block_root, parent_components, peer_source, cx); if let Some(lookup) = lookup { self.trigger_single_lookup(lookup, cx); } @@ -201,8 +172,9 @@ impl BlockLookups { block_root: Hash256, parent_components: Option>, peer_source: &[PeerShouldHave], + cx: &SyncNetworkContext, ) { - let lookup = self.search_block_with(block_root, parent_components, peer_source); + let lookup = self.new_current_lookup(block_root, parent_components, peer_source, cx); if let Some(lookup) = lookup { self.add_single_lookup(lookup) } @@ -211,8 +183,8 @@ impl BlockLookups { /// Attempts to trigger the request matching the given `block_root`. pub fn trigger_single_lookup( &mut self, - mut single_block_lookup: SingleBlockLookup, - cx: &mut SyncNetworkContext, + mut single_block_lookup: SingleBlockLookup, + cx: &SyncNetworkContext, ) { if !single_block_lookup.triggered && single_block_lookup.request_block_and_blobs(cx).is_ok() { @@ -221,11 +193,9 @@ impl BlockLookups { } } - pub fn add_single_lookup( - &mut self, - single_block_lookup: SingleBlockLookup, - ) { - self.single_block_lookups.push(single_block_lookup); + pub fn add_single_lookup(&mut self, single_block_lookup: SingleBlockLookup) { + self.single_block_lookups + .insert(single_block_lookup.id, single_block_lookup); metrics::set_gauge( &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, @@ -236,12 +206,13 @@ impl BlockLookups { pub fn trigger_lookup_by_root( &mut self, block_root: Hash256, - cx: &mut SyncNetworkContext, + cx: &SyncNetworkContext, ) -> Result<(), ()> { - for lookup in self.single_block_lookups.iter_mut() { + for (_, lookup) in self.single_block_lookups.iter_mut() { if lookup.block_request_state.requested_block_root == block_root && !lookup.triggered { - lookup.request_block_and_blobs(cx)?; - lookup.triggered = true; + if lookup.request_block_and_blobs(cx).is_ok() { + lookup.triggered = true; + } } } Ok(()) @@ -249,22 +220,23 @@ impl BlockLookups { pub fn remove_lookup_by_root(&mut self, block_root: Hash256) { self.single_block_lookups - .retain(|lookup| lookup.block_request_state.requested_block_root != block_root); + .retain(|_id, lookup| lookup.block_request_state.requested_block_root != block_root); } /// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is /// constructed. - pub fn search_block_with( + pub fn new_current_lookup( &mut self, block_root: Hash256, parent_components: Option>, peers: &[PeerShouldHave], - ) -> Option> { + cx: &SyncNetworkContext, + ) -> Option> { // Do not re-request a block that is already being requested - if let Some(lookup) = self + if let Some((_, lookup)) = self .single_block_lookups .iter_mut() - .find(|lookup| lookup.is_for_block(block_root)) + .find(|(id, lookup)| lookup.is_for_block(block_root)) { lookup.add_peers(peers); if let Some(components) = parent_components { @@ -304,6 +276,7 @@ impl BlockLookups { parent_components, peers, self.da_checker.clone(), + cx, )) } @@ -315,7 +288,7 @@ impl BlockLookups { block_root: Hash256, parent_root: Hash256, peer_id: PeerId, - cx: &mut SyncNetworkContext, + cx: &SyncNetworkContext, ) { // Gossip blocks or blobs shouldn't be propagated if parents are unavailable. let peer_source = PeerShouldHave::BlockAndBlobs(peer_id); @@ -345,210 +318,131 @@ impl BlockLookups { // we are already processing this block, ignore it. return; } - - let parent_lookup = ParentLookup::new( + let mut parent_lookup = ParentLookup::new( block_root, parent_root, peer_source, self.da_checker.clone(), + cx, ); - self.request_parent_block_and_blobs(parent_lookup, cx); + if let Ok(()) = parent_lookup + .current_parent_request + .request_block_and_blobs(cx) + { + self.parent_lookups.push(parent_lookup); + } } /* Lookup responses */ - pub fn single_block_lookup_response( + pub fn single_lookup_response>( &mut self, id: Id, peer_id: PeerId, - block: Option>>, + response: Option, seen_timestamp: Duration, - cx: &mut SyncNetworkContext, + cx: &SyncNetworkContext, ) { - let stream_terminator = block.is_none().into(); + let is_stream_terminator = response.is_none(); + let response_type = R::response_type(); let log = self.log.clone(); - let Some((has_pending_parent_request, request_ref)) = self.find_single_lookup_request(id, stream_terminator, ResponseType::Block) else { + let Some(lookup) = self.single_block_lookups.get_mut(&id) else { + if !is_stream_terminator { + warn!( + self.log, + "Block returned for single block lookup not present"; + "response_type" => ?response_type, + ); + } return; }; - let should_remove = match request_ref.verify_block(block) { - Ok(Some((block_root, block))) => { - if let Some(parent_components) = request_ref.unknown_parent_components.as_mut() { - parent_components.add_unknown_parent_block(block.clone()); + let expected_block_root = lookup.block_request_state.requested_block_root; + + let has_pending_parent_request = self + .parent_lookups + .iter() + .any(|parent_lookup| parent_lookup.chain_hash() == expected_block_root); + + let request_state = R::request_state_mut(lookup); + + match request_state.verify_response(expected_block_root, response) { + Ok(Some((root, verified_response))) => { + if let Some(parent_components) = lookup.unknown_parent_components.as_mut() { + R::add_to_parent_components(verified_response, parent_components); + + if !has_pending_parent_request { + if let Some(rpc_block) = lookup.get_downloaded_block() { + if let Err(()) = self.send_block_for_processing( + expected_block_root, + rpc_block, + seen_timestamp, + BlockProcessType::SingleBlock { id }, + cx, + ) { + self.single_block_lookups.remove(&id); + } + } + } + } else { + if let Err(()) = R::send_for_processing( + id, + self, + root, + R::verified_to_reconstructed(verified_response), + seen_timestamp, + &cx, + ) { + self.single_block_lookups.remove(&id); + } + } + } + Ok(None) => {} + Err(e) => { + let msg = if matches!(e, LookupVerifyError::BenignFailure) { + request_state + .get_state_mut() + .remove_peer_if_useless(&peer_id); + "peer could not response to request" + } else { + let msg = e.into(); + cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); + msg }; - if !has_pending_parent_request { - let rpc_block = request_ref - .get_downloaded_block() - .unwrap_or(RpcBlock::new_without_blobs(block)); - // This is the correct block, send it for processing - match self.send_block_for_processing( - block_root, - rpc_block, - seen_timestamp, - BlockProcessType::SingleBlock { id }, - cx, - ) { - Ok(()) => ShouldRemoveLookup::False, - Err(()) => ShouldRemoveLookup::True, - } - } else { - ShouldRemoveLookup::False + debug!(log, "Single block lookup failed"; + "peer_id" => %peer_id, + "error" => msg, + "block_root" => ?expected_block_root, + "response_type" => ?response_type + ); + if let Err(()) = request_state.retry_request_after_failure(id, cx, &log) { + self.single_block_lookups.remove(&id); } } - Ok(None) => ShouldRemoveLookup::False, - Err(e) => handle_block_lookup_verify_error( - request_ref, - ResponseType::Block, - peer_id, - e, - cx, - &log, - ), - }; - - if matches!(should_remove, ShouldRemoveLookup::True) { - self.single_block_lookups - .retain(|req| req.id.block_request_id != Some(id)); } + //TODO(sean) move metric to trait to differentiate block and blob metrics::set_gauge( &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, self.single_block_lookups.len() as i64, ); } - pub fn single_blob_lookup_response( - &mut self, - id: Id, - peer_id: PeerId, - blob: Option>>, - seen_timestamp: Duration, - cx: &mut SyncNetworkContext, - ) { - let stream_terminator = blob.is_none().into(); - - let log = self.log.clone(); - - let Some((has_pending_parent_requests, request_ref)) = - self.find_single_lookup_request(id, stream_terminator, ResponseType::Blob) else { - return; - }; - - let should_remove = match request_ref.verify_blob(blob) { - Ok(Some((block_root, blobs))) => { - if let Some(parent_components) = request_ref.unknown_parent_components.as_mut() { - parent_components.add_unknown_parent_blobs(blobs); - - if !has_pending_parent_requests { - request_ref - .get_downloaded_block() - .map(|block| { - match self.send_block_for_processing( - block_root, - block, - seen_timestamp, - BlockProcessType::SingleBlock { id }, - cx, - ) { - Ok(()) => ShouldRemoveLookup::False, - Err(()) => ShouldRemoveLookup::True, - } - }) - .unwrap_or(ShouldRemoveLookup::False) - } else { - ShouldRemoveLookup::False - } - } else { - // These are the correct blobs, send them for processing - match self.send_blobs_for_processing( - block_root, - blobs, - seen_timestamp, - BlockProcessType::SingleBlock { id }, - cx, - ) { - Ok(()) => ShouldRemoveLookup::False, - Err(()) => ShouldRemoveLookup::True, - } - } - } - Ok(None) => ShouldRemoveLookup::False, - Err(e) => handle_block_lookup_verify_error( - request_ref, - ResponseType::Blob, - peer_id, - e, - cx, - &log, - ), - }; - - if matches!(should_remove, ShouldRemoveLookup::True) { - self.single_block_lookups - .retain(|req| req.id.blob_request_id != Some(id)); - } - - metrics::set_gauge( - &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, - self.single_block_lookups.len() as i64, - ); - } - - /// Returns the lookup along with a `bool` representing whether the lookup has an outstanding - /// parent lookup that has yet to be resolved. This determines whether we send the - /// block or blob for processing because we would fail block processing and trigger a new lookup - /// via `UnknownParentBlock` or `UnknownParentBlob` until we process the parent. - fn find_single_lookup_request( - &mut self, - target_id: Id, - stream_terminator: StreamTerminator, - response_type: ResponseType, - ) -> Option<( - bool, - &mut SingleBlockLookup, - )> { - let lookup = self.single_block_lookups.iter_mut().find_map(|req| { - let id_opt = match response_type { - ResponseType::Block => req.id.block_request_id, - ResponseType::Blob => req.id.blob_request_id, - }; - if let Some(lookup_id) = id_opt { - if lookup_id == target_id { - let has_pending_parent_request = self.parent_lookups.iter().any(|lookup| { - lookup.chain_hash() == req.block_request_state.requested_block_root - }); - - return Some((has_pending_parent_request, req)); - } - } - None - }); - - if lookup.is_none() && matches!(stream_terminator, StreamTerminator::False) { - warn!( - self.log, - "Block returned for single block lookup not present"; - "response_type" => ?response_type, - ); - } - lookup - } - /// Process a response received from a parent lookup request. - pub fn parent_lookup_response( + pub fn parent_lookup_response>( &mut self, id: Id, peer_id: PeerId, - block: Option>>, + block: Option, seen_timestamp: Duration, - cx: &mut SyncNetworkContext, + cx: &SyncNetworkContext, ) { let mut parent_lookup = if let Some(pos) = self .parent_lookups .iter() - .position(|request| request.pending_block_response(id)) + .position(|request| request.current_parent_request.id == id) { self.parent_lookups.remove(pos) } else { @@ -558,9 +452,15 @@ impl BlockLookups { return; }; - match parent_lookup.verify_block(block, &mut self.failed_chains) { - Ok(Some((block_root, block))) => { - parent_lookup.add_current_request_block(block); + match parent_lookup.verify_block::(block, &mut self.failed_chains) { + Ok(Some((block_root, verified_response))) => { + if let Some(parent_components) = parent_lookup + .current_parent_request + .unknown_parent_components + .as_mut() + { + R::add_to_parent_components(verified_response, parent_components); + } if let Some(rpc_block) = parent_lookup.current_parent_request.get_downloaded_block() { let chain_hash = parent_lookup.chain_hash(); @@ -577,27 +477,9 @@ impl BlockLookups { self.parent_lookups.push(parent_lookup) } } else { - let outstanding_blobs_req = parent_lookup - .current_parent_request - .id - .blob_request_id - .is_some(); - if !outstanding_blobs_req { - if let Ok(peer_id) = parent_lookup - .current_parent_request - .downloading_peer(ResponseType::Blob) - { - cx.report_peer( - peer_id.to_peer_id(), - PeerAction::MidToleranceError, - "bbroot_failed_chains", - ); - } - - self.request_parent_blobs(parent_lookup, cx); - } else { - self.parent_lookups.push(parent_lookup) - } + //TODO(sean) here, we could penalize a peer who previously sent us a blob list + // that was incomplete, and trigger a re-request immediately + self.parent_lookups.push(parent_lookup) } } Ok(None) => { @@ -605,82 +487,22 @@ impl BlockLookups { // processing result arrives. self.parent_lookups.push(parent_lookup); } - Err(e) => { - self.handle_parent_verify_error(peer_id, parent_lookup, ResponseType::Block, e, cx) - } + Err(e) => self.handle_parent_verify_error::(peer_id, parent_lookup, e, cx), }; + //TODO(sean) move metric to trait to differentiate block and blob metrics::set_gauge( &metrics::SYNC_PARENT_BLOCK_LOOKUPS, self.parent_lookups.len() as i64, ); } - pub fn parent_lookup_blob_response( - &mut self, - id: Id, - peer_id: PeerId, - blob: Option>>, - seen_timestamp: Duration, - cx: &mut SyncNetworkContext, - ) { - let mut parent_lookup = if let Some(pos) = self - .parent_lookups - .iter() - .position(|request| request.pending_blob_response(id)) - { - self.parent_lookups.remove(pos) - } else { - if blob.is_some() { - debug!(self.log, "Response for a parent lookup blob request that was not found"; "peer_id" => %peer_id); - } - return; - }; - - match parent_lookup.verify_blob(blob, &mut self.failed_chains) { - Ok(Some((block_root, blobs))) => { - parent_lookup.add_current_request_blobs(blobs); - let chain_hash = parent_lookup.chain_hash(); - if let Some(rpc_block) = parent_lookup.current_parent_request.get_downloaded_block() - { - if self - .send_block_for_processing( - block_root, - rpc_block, - seen_timestamp, - BlockProcessType::ParentLookup { chain_hash }, - cx, - ) - .is_ok() - { - self.parent_lookups.push(parent_lookup) - } - } else { - self.parent_lookups.push(parent_lookup) - } - } - Ok(None) => { - // Waiting for more blobs to arrive - self.parent_lookups.push(parent_lookup); - } - Err(e) => { - self.handle_parent_verify_error(peer_id, parent_lookup, ResponseType::Blob, e, cx) - } - }; - - metrics::set_gauge( - &metrics::SYNC_PARENT_BLOCK_LOOKUPS, - self.parent_lookups.len() as i64, - ); - } - - fn handle_parent_verify_error( + fn handle_parent_verify_error>( &mut self, peer_id: PeerId, mut parent_lookup: ParentLookup, - response_type: ResponseType, e: ParentVerifyError, - cx: &mut SyncNetworkContext, + cx: &SyncNetworkContext, ) { match e { ParentVerifyError::RootMismatch @@ -699,10 +521,7 @@ impl BlockLookups { cx.report_peer(peer_id, PeerAction::LowToleranceError, e); // We try again if possible. - match response_type { - ResponseType::Block => self.request_parent_block(parent_lookup, cx), - ResponseType::Blob => self.request_parent_blobs(parent_lookup, cx), - }; + self.request_parent(parent_lookup, cx) } ParentVerifyError::PreviousFailure { parent_root } => { debug!( @@ -724,13 +543,9 @@ impl BlockLookups { self.log, "Requested peer could not respond to block request, requesting a new peer"; ); - parent_lookup - .current_parent_request - .remove_peer_if_useless(&peer_id, response_type); - match response_type { - ResponseType::Block => self.request_parent_block(parent_lookup, cx), - ResponseType::Blob => self.request_parent_blobs(parent_lookup, cx), - }; + let request_state = R::request_state_mut(&mut parent_lookup.current_parent_request); + request_state.remove_if_useless(&peer_id); + self.request_parent(parent_lookup, cx) } } } @@ -738,14 +553,12 @@ impl BlockLookups { /* Error responses */ pub fn peer_disconnected(&mut self, peer_id: &PeerId, cx: &mut SyncNetworkContext) { - self.single_block_lookups.retain_mut(|req| { - let should_remove_block = - should_remove_disconnected_peer(ResponseType::Block, peer_id, cx, req, &self.log); - let should_remove_blob = - should_remove_disconnected_peer(ResponseType::Blob, peer_id, cx, req, &self.log); + /* Check disconnection for single lookups */ + self.single_block_lookups.retain(|id, req| { + let should_remove_lookup = + req.should_remove_disconnected_peer(*id, peer_id, cx, &self.log); - matches!(should_remove_block, ShouldRemoveLookup::False) - && matches!(should_remove_blob, ShouldRemoveLookup::False) + !should_remove_lookup }); /* Check disconnection for parent lookups */ @@ -755,83 +568,60 @@ impl BlockLookups { }) { let parent_lookup = self.parent_lookups.remove(pos); trace!(self.log, "Parent lookup's peer disconnected"; &parent_lookup); - self.request_parent_block_and_blobs(parent_lookup, cx); + self.request_parent(parent_lookup, cx); } } /// An RPC error has occurred during a parent lookup. This function handles this case. - pub fn parent_lookup_failed( + pub fn parent_lookup_failed>( &mut self, id: Id, peer_id: PeerId, - cx: &mut SyncNetworkContext, + cx: &SyncNetworkContext, error: RPCError, ) { let msg = error.as_static_str(); if let Some(pos) = self .parent_lookups .iter() - .position(|request| request.pending_block_response(id)) + .position(|request| request.current_parent_request.id == id) { let mut parent_lookup = self.parent_lookups.remove(pos); - parent_lookup.block_download_failed(); + R::request_state_mut(&mut parent_lookup.current_parent_request) + .register_failure_downloading(); trace!(self.log, "Parent lookup block request failed"; &parent_lookup, "error" => msg); - self.request_parent_block(parent_lookup, cx); + self.request_parent(parent_lookup, cx); } else { return debug!(self.log, "RPC failure for a block parent lookup request that was not found"; "peer_id" => %peer_id, "error" => msg); }; - if let Some(pos) = self - .parent_lookups - .iter() - .position(|request| request.pending_blob_response(id)) - { - let mut parent_lookup = self.parent_lookups.remove(pos); - parent_lookup.blob_download_failed(); - trace!(self.log, "Parent lookup blobs request failed"; &parent_lookup, "error" => msg); - - self.request_parent_blobs(parent_lookup, cx); - } else { - return debug!(self.log, "RPC failure for a blobs parent lookup request that was not found"; "peer_id" => %peer_id, "error" => msg); - }; metrics::set_gauge( &metrics::SYNC_PARENT_BLOCK_LOOKUPS, self.parent_lookups.len() as i64, ); } - pub fn single_block_lookup_failed( + pub fn single_block_lookup_failed>( &mut self, id: Id, peer_id: &PeerId, - cx: &mut SyncNetworkContext, + cx: &SyncNetworkContext, error: RPCError, ) { let msg = error.as_static_str(); - self.single_block_lookups.retain_mut(|req| { - let should_remove_block = should_remove_failed_lookup( - id, - ResponseType::Block, - msg, - peer_id, - cx, - req, - &self.log, - ); - let should_remove_blob = should_remove_failed_lookup( - id, - ResponseType::Blob, - msg, - peer_id, - cx, - req, - &self.log, - ); - - matches!(should_remove_block, ShouldRemoveLookup::False) - && matches!(should_remove_blob, ShouldRemoveLookup::False) - }); + let Some(lookup) = self.single_block_lookups.get_mut(&id) else { + debug!(self.log, "Error response to dropped lookup"; "error" => ?error); + return; + }; + let root = lookup.block_request_state.requested_block_root; + let request_state = R::request_state_mut(lookup); + request_state.register_failure_downloading(); + let response_type = R::response_type(); + trace!(self.log, "Single lookup failed"; "block_root" => ?root, "error" => msg, "response_type" => ?response_type); + if let Err(()) = request_state.retry_request_after_failure(id, cx, &self.log) { + self.single_block_lookups.remove(&id); + }; metrics::set_gauge( &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, @@ -841,48 +631,71 @@ impl BlockLookups { /* Processing responses */ - pub fn single_block_component_processed( + pub fn single_block_component_processed>( &mut self, target_id: Id, result: BlockProcessingResult, - response_type: ResponseType, - cx: &mut SyncNetworkContext, + cx: &SyncNetworkContext, ) { - let lookup_components_opt = - self.single_block_lookups - .iter_mut() - .enumerate() - .find_map(|(index, req)| { - let block_match = req.id.block_request_id.as_ref() == Some(&target_id); - let blob_match = req.id.blob_request_id.as_ref() == Some(&target_id); - (block_match || blob_match).then_some((index, req)) - }); - let (index, request_ref) = match lookup_components_opt { - Some(req) => req, - None => { - return debug!( - self.log, - "Block component processed for single block lookup not present" - ); - } - }; + let Some(request_ref) = self.single_block_lookups.get_mut(&target_id) else { + debug!(self.log, "Block component processed for single block lookup not present" ); + return; + }; let root = request_ref.block_request_state.requested_block_root; - let peer_id = request_ref.processing_peer(response_type); + let request_state = R::request_state_mut(request_ref); + let peer_id = request_state.get_state().processing_peer(); + request_state.get_state_mut().component_processed = true; let peer_id = match peer_id { Ok(peer) => peer, Err(_) => return, }; - let should_remove_lookup = match result { + match result { BlockProcessingResult::Ok(status) => match status { AvailabilityProcessingStatus::Imported(root) => { trace!(self.log, "Single block processing succeeded"; "block" => %root); - ShouldRemoveLookup::True + self.single_block_lookups.remove(&target_id); } AvailabilityProcessingStatus::MissingComponents(_, _block_root) => { - should_remove_missing_components(request_ref, response_type, cx, &self.log) + // if this was the result of a blocks request, the block peer did nothing wrong. + // if we already had a blobs resposne, we should penalize the blobs peer because + // they did not provide all blobs. + if request_ref.both_components_processed() { + if let Ok(blob_peer) = + request_ref.blob_request_state.state.processing_peer() + { + if let PeerShouldHave::BlockAndBlobs(blob_peer) = blob_peer { + cx.report_peer( + blob_peer, + PeerAction::MidToleranceError, + "single_block_failure", + ); + } + request_ref + .blob_request_state + .state + .remove_peer_if_useless(blob_peer.as_peer_id()); + if !::EthSpec, + > as RequestState>::downloading( + &request_ref.blob_request_state, + ) { + // Try it again if possible. + if let Err(()) = request_ref + .blob_request_state + .retry_request_after_failure(target_id, cx, &self.log) + { + self.single_block_lookups.remove(&target_id); + }; + } + } else { + trace!(self.log, "Dropped blob peer prior to penalizing"; "root" => ?root); + self.single_block_lookups.remove(&target_id); + }; + } } }, BlockProcessingResult::Ignored => { @@ -893,26 +706,25 @@ impl BlockLookups { "Single block processing was ignored, cpu might be overloaded"; "action" => "dropping single block request" ); - ShouldRemoveLookup::True + self.single_block_lookups.remove(&target_id); } BlockProcessingResult::Err(e) => { trace!(self.log, "Single block processing failed"; "block" => %root, "error" => %e); match e { BlockError::BlockIsAlreadyKnown => { // No error here - ShouldRemoveLookup::True + self.single_block_lookups.remove(&target_id); } BlockError::BeaconChainError(e) => { // Internal error error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e); - ShouldRemoveLookup::True + self.single_block_lookups.remove(&target_id); } BlockError::ParentUnknown(block) => { let slot = block.slot(); let parent_root = block.parent_root(); request_ref.add_unknown_parent_components(block.into()); self.search_parent(slot, root, parent_root, peer_id.to_peer_id(), cx); - ShouldRemoveLookup::False } ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { // These errors indicate that the execution layer is offline @@ -923,7 +735,7 @@ impl BlockLookups { "root" => %root, "error" => ?e ); - ShouldRemoveLookup::True + self.single_block_lookups.remove(&target_id); } BlockError::AvailabilityCheck( AvailabilityCheckError::KzgVerificationFailed, @@ -931,27 +743,28 @@ impl BlockLookups { | BlockError::AvailabilityCheck(AvailabilityCheckError::Kzg(_)) | BlockError::BlobValidation(_) => { warn!(self.log, "Blob validation failure"; "root" => %root, "peer_id" => %peer_id); - if let Ok(blob_peer) = request_ref.processing_peer(ResponseType::Blob) { + if let Ok(blob_peer) = + request_ref.blob_request_state.state.processing_peer() + { cx.report_peer( blob_peer.to_peer_id(), PeerAction::MidToleranceError, "single_blob_failure", ); // Try it again if possible. - retry_request_after_failure( - request_ref, - ResponseType::Blob, - peer_id.as_peer_id(), - cx, - &self.log, - ) - } else { - ShouldRemoveLookup::False + if let Err(()) = request_ref + .blob_request_state + .retry_request_after_failure(target_id, cx, &self.log) + { + self.single_block_lookups.remove(&target_id); + }; } } other => { warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id); - if let Ok(block_peer) = request_ref.processing_peer(ResponseType::Block) { + if let Ok(block_peer) = + request_ref.block_request_state.state.processing_peer() + { cx.report_peer( block_peer.to_peer_id(), PeerAction::MidToleranceError, @@ -959,25 +772,19 @@ impl BlockLookups { ); // Try it again if possible. - retry_request_after_failure( - request_ref, - ResponseType::Block, - block_peer.as_peer_id(), - cx, - &self.log, - ) - } else { - ShouldRemoveLookup::False + if let Err(()) = request_ref + .blob_request_state + .retry_request_after_failure(target_id, cx, &self.log) + { + self.single_block_lookups.remove(&target_id); + }; } } } } }; - if matches!(should_remove_lookup, ShouldRemoveLookup::True) { - self.single_block_lookups.remove(index); - } - + //TODO(sean) move metrics to lookup response trait metrics::set_gauge( &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, self.single_block_lookups.len() as i64, @@ -989,7 +796,7 @@ impl BlockLookups { chain_hash: Hash256, result: BlockProcessingResult, response_type: ResponseType, - cx: &mut SyncNetworkContext, + cx: &SyncNetworkContext, ) { let index = self .parent_lookups @@ -1002,13 +809,9 @@ impl BlockLookups { return debug!(self.log, "Process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash); }; - let peer_id = parent_lookup - .current_parent_request - .processing_peer(response_type); - - let peer_id = match peer_id { - Ok(peer) => peer, - Err(_) => return, + let Ok(peer_id) = + parent_lookup.processing_peer() else { + return }; match &result { @@ -1042,7 +845,7 @@ impl BlockLookups { } BlockProcessingResult::Err(BlockError::ParentUnknown(block)) => { parent_lookup.add_unknown_parent_block(block); - self.request_parent_block_and_blobs(parent_lookup, cx); + self.request_parent(parent_lookup, cx); } BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_)) | BlockProcessingResult::Err(BlockError::BlockIsAlreadyKnown { .. }) => { @@ -1059,15 +862,27 @@ impl BlockLookups { }; let (chain_hash, mut blocks, hashes, block_request) = parent_lookup.parts_for_processing(); - if let Some(child_block) = self.single_block_lookups.iter_mut().find_map(|req| { - if req.block_request_state.requested_block_root == chain_hash { - req.get_downloaded_block() + + // Find the child block that spawned the parent lookup request and add it to the chain + // to send for processing. + if let Some(child_lookup_id) = + self.single_block_lookups.iter().find_map(|(id, lookup)| { + (lookup.block_request_state.requested_block_root == chain_hash).then(|| *id) + }) + { + let Some(child_lookup) = self.single_block_lookups.get_mut(&child_lookup_id) else { + debug!(self.log, "Missing child for parent lookup request"; "child_root" => ?chain_hash); + return; + }; + if let Some(rpc_block) = child_lookup.get_downloaded_block() { + blocks.push(rpc_block); } else { - None + trace!(self.log, "Parent lookup chain complete, awaiting child response"; "chain_hash" => ?chain_hash); } - }) { - blocks.push(child_block); + } else { + debug!(self.log, "Missing child for parent lookup request"; "child_root" => ?chain_hash); }; + let process_id = ChainSegmentProcessId::ParentLookup(chain_hash); match beacon_processor.send_chain_segment(process_id, blocks) { @@ -1120,7 +935,7 @@ impl BlockLookups { &mut self, outcome: BlockError<::EthSpec>, peer_id: PeerId, - cx: &mut SyncNetworkContext, + cx: &SyncNetworkContext, mut parent_lookup: ParentLookup, ) { // all else we consider the chain a failure and downvote the peer that sent @@ -1135,16 +950,15 @@ impl BlockLookups { // ambiguity. cx.report_peer(peer_id, PeerAction::MidToleranceError, "parent_request_err"); // Try again if possible - parent_lookup.block_processing_failed(); - parent_lookup.blob_processing_failed(); - self.request_parent_block_and_blobs(parent_lookup, cx); + parent_lookup.processing_failed(); + self.request_parent(parent_lookup, cx); } pub fn parent_chain_processed( &mut self, chain_hash: Hash256, result: BatchProcessResult, - cx: &mut SyncNetworkContext, + cx: &SyncNetworkContext, ) { let request = match self.processing_parent_lookups.remove(&chain_hash) { Some((_hashes, request)) => request, @@ -1156,42 +970,36 @@ impl BlockLookups { debug!(self.log, "Parent chain processed"; "chain_hash" => %chain_hash, "result" => ?result); match result { BatchProcessResult::Success { .. } => { - if let Some((index, _)) = self + let Some(id) = self .single_block_lookups .iter() - .enumerate() - .find(|(_, req)| req.block_request_state.requested_block_root == chain_hash) - { - if let Some((lookup_id, rpc_block)) = - self.single_block_lookups.get_mut(index).and_then(|lookup| { - lookup - .get_downloaded_block() - .map(|block| (lookup.id.clone(), block)) - }) - { - let LookupId { - block_request_id, - blob_request_id, - } = lookup_id; - let Some(id) = block_request_id.or(blob_request_id) else { - warn!(self.log, "No id found for single block lookup"; "chain_hash" => %chain_hash); - return; - }; + .find_map(|(id, req)| + (req.block_request_state.requested_block_root == chain_hash).then(|| *id)) else { + warn!(self.log, "No id found for single block lookup"; "chain_hash" => %chain_hash); + return; + }; - // This is the correct block, send it for processing - if self - .send_block_for_processing( - chain_hash, - rpc_block, - Duration::from_secs(0), //TODO(sean) pipe this through - BlockProcessType::SingleBlock { id }, - cx, - ) - .is_err() - { - // Remove to avoid inconsistencies - self.single_block_lookups.remove(index); - } + let Some(lookup) = self + .single_block_lookups + .get_mut(&id) else { + warn!(self.log, "No id found for single block lookup"; "chain_hash" => %chain_hash); + return; + }; + + if let Some(rpc_block) = lookup.get_downloaded_block() { + // This is the correct block, send it for processing + if self + .send_block_for_processing( + chain_hash, + rpc_block, + Duration::from_secs(0), //TODO(sean) pipe this through + BlockProcessType::SingleBlock { id: id }, + cx, + ) + .is_err() + { + // Remove to avoid inconsistencies + self.single_block_lookups.remove(&id); } } } @@ -1225,7 +1033,7 @@ impl BlockLookups { block: RpcBlock, duration: Duration, process_type: BlockProcessType, - cx: &mut SyncNetworkContext, + cx: &SyncNetworkContext, ) -> Result<(), ()> { match cx.beacon_processor_if_enabled() { Some(beacon_processor) => { @@ -1259,7 +1067,7 @@ impl BlockLookups { blobs: FixedBlobSidecarList, duration: Duration, process_type: BlockProcessType, - cx: &mut SyncNetworkContext, + cx: &SyncNetworkContext, ) -> Result<(), ()> { let blob_count = blobs.iter().filter(|b| b.is_some()).count(); if blob_count == 0 { @@ -1288,49 +1096,10 @@ impl BlockLookups { } } - fn request_parent_block( - &mut self, - mut parent_lookup: ParentLookup, - cx: &mut SyncNetworkContext, - ) { - let response = parent_lookup.request_parent_block(cx); - self.handle_response(parent_lookup, cx, response, ResponseType::Block); - } + fn request_parent(&mut self, mut parent_lookup: ParentLookup, cx: &SyncNetworkContext) { + let response = parent_lookup.request_parent(cx); - fn request_parent_blobs( - &mut self, - mut parent_lookup: ParentLookup, - cx: &mut SyncNetworkContext, - ) { - let response = parent_lookup.request_parent_blobs(cx); - self.handle_response(parent_lookup, cx, response, ResponseType::Blob); - } - - fn request_parent_block_and_blobs( - &mut self, - mut parent_lookup: ParentLookup, - cx: &mut SyncNetworkContext, - ) { - let block_res = parent_lookup.request_parent_block(cx); - match block_res { - Ok(()) => { - let blob_res = parent_lookup.request_parent_blobs(cx); - self.handle_response(parent_lookup, cx, blob_res, ResponseType::Blob) - } - Err(e) => { - self.handle_response(parent_lookup, cx, Err(e), ResponseType::Block); - } - } - } - - fn handle_response( - &mut self, - parent_lookup: ParentLookup, - cx: &mut SyncNetworkContext, - result: Result<(), parent_lookup::RequestError>, - response_type: ResponseType, - ) { - match result { + match response { Err(e) => { debug!(self.log, "Failed to request parent"; &parent_lookup, "error" => e.as_static()); match e { @@ -1340,7 +1109,7 @@ impl BlockLookups { parent_lookup::RequestError::ChainTooLong => { self.failed_chains.insert(parent_lookup.chain_hash()); // This indicates faulty peers. - for &peer_id in parent_lookup.used_peers(response_type) { + for &peer_id in parent_lookup.used_peers() { cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static()) } } @@ -1353,7 +1122,7 @@ impl BlockLookups { self.failed_chains.insert(parent_lookup.chain_hash()); } // This indicates faulty peers. - for &peer_id in parent_lookup.used_peers(response_type) { + for &peer_id in parent_lookup.used_peers() { cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static()) } } @@ -1361,6 +1130,7 @@ impl BlockLookups { // This happens if the peer disconnects while the block is being // processed. Drop the request without extra penalty } + parent_lookup::RequestError::AlreadyDownloaded => {} } } Ok(_) => { @@ -1389,175 +1159,8 @@ impl BlockLookups { } } -fn handle_block_lookup_verify_error( - request_ref: &mut SingleBlockLookup, - response_type: ResponseType, - peer_id: PeerId, - e: LookupVerifyError, - cx: &mut SyncNetworkContext, - log: &Logger, -) -> ShouldRemoveLookup { - let msg = if matches!(e, LookupVerifyError::BenignFailure) { - request_ref.remove_peer_if_useless(&peer_id, response_type); - "peer could not response to request" - } else { - let msg = e.into(); - cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); - msg - }; - - debug!(log, "Single block lookup failed"; - "peer_id" => %peer_id, - "error" => msg, - "block_root" => ?request_ref.block_request_state.requested_block_root, - "response_type" => ?response_type - ); - retry_request_after_failure(request_ref, response_type, &peer_id, cx, log) -} - -fn retry_request_after_failure( - request_ref: &mut SingleBlockLookup, - response_type: ResponseType, - initial_peer_id: &PeerId, - cx: &mut SyncNetworkContext, - log: &Logger, -) -> ShouldRemoveLookup { - let requested_block_root = request_ref.block_request_state.requested_block_root; - - // try the request again if possible - match response_type { - ResponseType::Block => { - let id = request_ref.request_block().map(|request_opt| { - request_opt - .map(|(peer_id, request)| cx.single_block_lookup_request(peer_id, request)) - }); - match id { - Ok(Some(Ok(id))) => { - request_ref.id.block_request_id = Some(id); - } - Ok(Some(Err(e))) => { - debug!(log, "Single block lookup failed"; - "peer_id" => %initial_peer_id, - "error" => ?e, - "block_root" => ?requested_block_root, - "response_type" => ?response_type); - return ShouldRemoveLookup::True; - } - Ok(None) => { - request_ref.id.block_request_id = None; - // The lookup failed but the block or blob was found via other means. - } - Err(e) => { - debug!(log, "Single block lookup failed"; - "peer_id" => %initial_peer_id, - "error" => ?e, - "block_root" => ?requested_block_root, - "response_type" => ?response_type); - return ShouldRemoveLookup::True; - } - } - } - ResponseType::Blob => { - let id = request_ref.request_blobs().map(|request_opt| { - request_opt - .map(|(peer_id, request)| cx.single_blobs_lookup_request(peer_id, request)) - }); - - match id { - Ok(Some(Ok(id))) => { - request_ref.id.blob_request_id = Some(id); - } - Ok(Some(Err(e))) => { - debug!(log, "Single block lookup failed"; - "peer_id" => %initial_peer_id, - "error" => ?e, - "block_root" => ?requested_block_root, - "response_type" => ?response_type); - return ShouldRemoveLookup::True; - } - Ok(None) => { - request_ref.id.blob_request_id = None; - // The lookup failed but the block or blob was found via other means. - } - Err(e) => { - debug!(log, "Single block lookup failed"; - "peer_id" => %initial_peer_id, - "error" => ?e, - "block_root" => ?requested_block_root, - "response_type" => ?response_type); - return ShouldRemoveLookup::True; - } - } - } - }; - ShouldRemoveLookup::False -} - -fn should_remove_disconnected_peer( - response_type: ResponseType, - peer_id: &PeerId, - cx: &mut SyncNetworkContext, - req: &mut SingleBlockLookup, - log: &Logger, -) -> ShouldRemoveLookup { - if req.check_peer_disconnected(peer_id, response_type).is_err() { - trace!(log, "Single lookup failed on peer disconnection"; "block_root" => ?req.block_request_state.requested_block_root, "response_type" => ?response_type); - retry_request_after_failure(req, response_type, peer_id, cx, log) - } else { - ShouldRemoveLookup::False - } -} - -fn should_remove_failed_lookup( - id: Id, - response_type: ResponseType, - msg: &'static str, - peer_id: &PeerId, - cx: &mut SyncNetworkContext, - req: &mut SingleBlockLookup, - log: &Logger, -) -> ShouldRemoveLookup { - if req.id.block_request_id == Some(id) || req.id.blob_request_id == Some(id) { - req.register_failure_downloading(response_type); - trace!(log, "Single lookup failed"; "block" => %req.block_request_state.requested_block_root, "error" => msg, "response_type" => ?response_type); - retry_request_after_failure(req, response_type, peer_id, cx, log) - } else { - ShouldRemoveLookup::False - } -} - -fn should_remove_missing_components( - request_ref: &mut SingleBlockLookup, - response_type: ResponseType, - cx: &mut SyncNetworkContext, - log: &Logger, -) -> ShouldRemoveLookup { - request_ref.set_component_processed(response_type); - - // If we get a missing component response after processing both a blob and a block response, the - // blobs must be what are missing. - if request_ref.both_components_processed() { - let Ok(blob_peer) = request_ref.processing_peer(ResponseType::Blob) else { - return ShouldRemoveLookup::False; - }; - if let PeerShouldHave::BlockAndBlobs(blob_peer) = blob_peer { - cx.report_peer( - blob_peer, - PeerAction::MidToleranceError, - "single_block_failure", - ); - } - request_ref.remove_peer_if_useless(blob_peer.as_peer_id(), ResponseType::Blob); - if !request_ref.downloading(ResponseType::Blob) { - // Try it again if possible. - return retry_request_after_failure( - request_ref, - ResponseType::Blob, - blob_peer.as_peer_id(), - cx, - log, - ); - } - } - ShouldRemoveLookup::False +#[derive(Debug, Copy, Clone)] +pub enum LookupType { + Current, + Parent, } diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 6d870b5aba..1df51bf7f0 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -1,18 +1,21 @@ use super::single_block_lookup::{LookupRequestError, LookupVerifyError, SingleBlockLookup}; -use super::{BlobRequestId, BlockRequestId, DownloadedBlocks, PeerShouldHave, ResponseType}; -use crate::sync::block_lookups::single_block_lookup::{State, UnknownParentComponents}; -use crate::sync::block_lookups::{RootBlobsTuple, RootBlockTuple}; +use super::{DownloadedBlocks, PeerShouldHave}; +use crate::sync::block_lookups::single_block_lookup::{ + Parent, RequestState, State, UnknownParentComponents, +}; +use crate::sync::block_lookups::Lookup; use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_availability_checker::DataAvailabilityChecker; use beacon_chain::BeaconChainTypes; +use itertools::Itertools; use lighthouse_network::PeerId; use std::sync::Arc; use store::Hash256; use strum::IntoStaticStr; use types::blob_sidecar::FixedBlobSidecarList; -use types::{BlobSidecar, SignedBeaconBlock}; +use types::SignedBeaconBlock; /// How many attempts we try to find a parent of a block before we give up trying. pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5; @@ -28,7 +31,7 @@ pub(crate) struct ParentLookup { /// The blocks that have currently been downloaded. downloaded_blocks: Vec>, /// Request of the last parent. - pub current_parent_request: SingleBlockLookup, + pub current_parent_request: SingleBlockLookup, } #[derive(Debug, PartialEq, Eq, IntoStaticStr)] @@ -55,6 +58,7 @@ pub enum RequestError { cannot_process: bool, }, NoPeers, + AlreadyDownloaded, } impl ParentLookup { @@ -63,9 +67,15 @@ impl ParentLookup { parent_root: Hash256, peer_id: PeerShouldHave, da_checker: Arc>, + cx: &SyncNetworkContext, ) -> Self { - let current_parent_request = - SingleBlockLookup::new(parent_root, Some(<_>::default()), &[peer_id], da_checker); + let current_parent_request = SingleBlockLookup::new( + parent_root, + Some(<_>::default()), + &[peer_id], + da_checker, + cx, + ); Self { chain_hash: block_root, @@ -85,52 +95,15 @@ impl ParentLookup { } /// Attempts to request the next unknown parent. If the request fails, it should be removed. - pub fn request_parent_block( - &mut self, - cx: &mut SyncNetworkContext, - ) -> Result<(), RequestError> { + pub fn request_parent(&mut self, cx: &SyncNetworkContext) -> Result<(), RequestError> { // check to make sure this request hasn't failed if self.downloaded_blocks.len() + 1 >= PARENT_DEPTH_TOLERANCE { return Err(RequestError::ChainTooLong); } - if let Some((peer_id, request)) = self.current_parent_request.request_block()? { - match cx.parent_lookup_block_request(peer_id, request) { - Ok(request_id) => { - self.current_parent_request.id.block_request_id = Some(request_id); - return Ok(()); - } - Err(reason) => { - self.current_parent_request.id.block_request_id = None; - return Err(RequestError::SendFailed(reason)); - } - } - } - Ok(()) - } - - pub fn request_parent_blobs( - &mut self, - cx: &mut SyncNetworkContext, - ) -> Result<(), RequestError> { - // check to make sure this request hasn't failed - if self.downloaded_blocks.len() + 1 >= PARENT_DEPTH_TOLERANCE { - return Err(RequestError::ChainTooLong); - } - - if let Some((peer_id, request)) = self.current_parent_request.request_blobs()? { - match cx.parent_lookup_blobs_request(peer_id, request) { - Ok(request_id) => { - self.current_parent_request.id.blob_request_id = Some(request_id); - return Ok(()); - } - Err(reason) => { - self.current_parent_request.id.blob_request_id = None; - return Err(RequestError::SendFailed(reason)); - } - } - } - Ok(()) + self.current_parent_request + .request_block_and_blobs(cx) + .map_err(Into::into) } pub fn check_block_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> { @@ -162,11 +135,9 @@ impl ParentLookup { .block_request_state .requested_block_root = next_parent; self.current_parent_request.block_request_state.state.state = State::AwaitingDownload; - self.current_parent_request.id.block_request_id = None; // Update the blobs request. self.current_parent_request.blob_request_state.state.state = State::AwaitingDownload; - self.current_parent_request.id.blob_request_id = None; // Reset the unknown parent components. self.current_parent_request.unknown_parent_components = @@ -176,25 +147,24 @@ impl ParentLookup { pub fn add_current_request_block(&mut self, block: Arc>) { // Cache the block. self.current_parent_request.add_unknown_parent_block(block); - - // Update the request. - self.current_parent_request.id.block_request_id = None; } pub fn add_current_request_blobs(&mut self, blobs: FixedBlobSidecarList) { // Cache the blobs. self.current_parent_request.add_unknown_parent_blobs(blobs); - - // Update the request. - self.current_parent_request.id.blob_request_id = None; } - pub fn pending_block_response(&self, req_id: BlockRequestId) -> bool { - self.current_parent_request.id.block_request_id == Some(req_id) - } - - pub fn pending_blob_response(&self, req_id: BlobRequestId) -> bool { - self.current_parent_request.id.blob_request_id == Some(req_id) + pub fn processing_peer(&self) -> Result { + self.current_parent_request + .block_request_state + .state + .processing_peer() + .or_else(|()| { + self.current_parent_request + .blob_request_state + .state + .processing_peer() + }) } /// Consumes the parent request and destructures it into it's parts. @@ -205,7 +175,7 @@ impl ParentLookup { Hash256, Vec>, Vec, - SingleBlockLookup, + SingleBlockLookup, ) { let ParentLookup { chain_hash, @@ -227,26 +197,14 @@ impl ParentLookup { self.chain_hash } - pub fn block_download_failed(&mut self) { + pub fn processing_failed(&mut self) { self.current_parent_request .block_request_state .state - .register_failure_downloading(); - self.current_parent_request.id.block_request_id = None; - } - - pub fn blob_download_failed(&mut self) { + .register_failure_processing(); self.current_parent_request .blob_request_state .state - .register_failure_downloading(); - self.current_parent_request.id.blob_request_id = None; - } - - pub fn block_processing_failed(&mut self) { - self.current_parent_request - .block_request_state - .state .register_failure_processing(); if let Some(components) = self .current_parent_request @@ -254,46 +212,33 @@ impl ParentLookup { .as_mut() { components.downloaded_block = None; - } - self.current_parent_request.id.block_request_id = None; - } - - pub fn blob_processing_failed(&mut self) { - self.current_parent_request - .blob_request_state - .state - .register_failure_processing(); - if let Some(components) = self - .current_parent_request - .unknown_parent_components - .as_mut() - { components.downloaded_blobs = <_>::default(); } - self.current_parent_request.id.blob_request_id = None; } /// Verifies that the received block is what we requested. If so, parent lookup now waits for /// the processing result of the block. - pub fn verify_block( + pub fn verify_block>( &mut self, - block: Option>>, + block: Option, failed_chains: &mut lru_cache::LRUTimeCache, - ) -> Result>, ParentVerifyError> { - let root_and_block = self.current_parent_request.verify_block(block)?; + ) -> Result, ParentVerifyError> { + let expected_block_root = self + .current_parent_request + .block_request_state + .requested_block_root; + let request_state = R::request_state_mut(&mut self.current_parent_request); + let root_and_block = request_state.verify_response(expected_block_root, block)?; // check if the parent of this block isn't in the failed cache. If it is, this chain should // be dropped and the peer downscored. if let Some(parent_root) = root_and_block .as_ref() - .map(|(_, block)| block.parent_root()) + .map(|(_, block)| R::get_parent_root(block)) + .flatten() { if failed_chains.contains(&parent_root) { - self.current_parent_request - .block_request_state - .state - .register_failure_downloading(); - self.current_parent_request.id.block_request_id = None; + request_state.register_failure_downloading(); return Err(ParentVerifyError::PreviousFailure { parent_root }); } } @@ -301,49 +246,24 @@ impl ParentLookup { Ok(root_and_block) } - pub fn verify_blob( - &mut self, - blob: Option>>, - failed_chains: &mut lru_cache::LRUTimeCache, - ) -> Result>, ParentVerifyError> { - let parent_root_opt = blob.as_ref().map(|b| b.block_parent_root); - let blobs = self.current_parent_request.verify_blob(blob)?; - - // check if the parent of this block isn't in the failed cache. If it is, this chain should - // be dropped and the peer downscored. - if let Some(parent_root) = parent_root_opt { - if failed_chains.contains(&parent_root) { - self.current_parent_request - .blob_request_state - .state - .register_failure_downloading(); - self.current_parent_request.id.blob_request_id = None; - return Err(ParentVerifyError::PreviousFailure { parent_root }); - } - } - - Ok(blobs) - } - pub fn add_peers(&mut self, peer_source: &[PeerShouldHave]) { self.current_parent_request.add_peers(peer_source) } - pub fn used_peers(&self, response_type: ResponseType) -> impl Iterator + '_ { - match response_type { - ResponseType::Block => self - .current_parent_request - .block_request_state - .state - .used_peers - .iter(), - ResponseType::Blob => self - .current_parent_request - .blob_request_state - .state - .used_peers - .iter(), - } + pub fn used_peers(&self) -> impl Iterator + '_ { + self.current_parent_request + .block_request_state + .state + .used_peers + .iter() + .chain( + self.current_parent_request + .blob_request_state + .state + .used_peers + .iter(), + ) + .unique() } } @@ -371,6 +291,8 @@ impl From for RequestError { RequestError::TooManyAttempts { cannot_process } } E::NoPeers => RequestError::NoPeers, + E::AlreadyDownloaded => RequestError::AlreadyDownloaded, + E::SendFailed(msg) => RequestError::SendFailed(msg), } } } @@ -398,6 +320,7 @@ impl RequestError { } RequestError::TooManyAttempts { cannot_process: _ } => "too_many_downloading_attempts", RequestError::NoPeers => "no_peers", + RequestError::AlreadyDownloaded => "already_downloaded", } } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 90829905b8..e3143f75f9 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,26 +1,59 @@ -use crate::sync::block_lookups::{BlobRequestId, BlockRequestId, RootBlobsTuple, RootBlockTuple}; +use super::{PeerShouldHave, ResponseType}; +use crate::sync::block_lookups::parent_lookup::RequestError::SendFailed; +use crate::sync::block_lookups::parent_lookup::PARENT_FAIL_TOLERANCE; +use crate::sync::block_lookups::{ + BlockLookups, Id, LookupType, RootBlobsTuple, RootBlockTuple, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, +}; +use crate::sync::manager::BlockProcessType; use crate::sync::network_context::SyncNetworkContext; -use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::DataAvailabilityChecker; use beacon_chain::{get_block_root, BeaconChainTypes}; +use itertools::Itertools; use lighthouse_network::rpc::methods::BlobsByRootRequest; use lighthouse_network::{rpc::BlocksByRootRequest, PeerId}; use rand::seq::IteratorRandom; +use slog::{debug, trace, Logger}; use ssz_types::VariableList; use std::collections::HashSet; +use std::marker::PhantomData; use std::ops::IndexMut; use std::sync::Arc; +use std::time::Duration; use store::Hash256; use strum::IntoStaticStr; use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; -use super::{PeerShouldHave, ResponseType}; +pub trait Lookup { + const MAX_ATTEMPTS: u8; + fn lookup_type() -> LookupType; + fn max_attempts() -> u8 { + Self::MAX_ATTEMPTS + } +} -pub struct SingleBlockLookup { - pub id: LookupId, - pub block_request_state: BlockRequestState, - pub blob_request_state: BlobRequestState, +pub struct Parent; +pub struct Current; + +impl Lookup for Parent { + const MAX_ATTEMPTS: u8 = PARENT_FAIL_TOLERANCE; + fn lookup_type() -> LookupType { + LookupType::Parent + } +} + +impl Lookup for Current { + const MAX_ATTEMPTS: u8 = SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS; + fn lookup_type() -> LookupType { + LookupType::Current + } +} + +pub struct SingleBlockLookup { + pub id: Id, + pub block_request_state: BlockRequestState, + pub blob_request_state: BlobRequestState, pub da_checker: Arc>, /// Only necessary for requests triggered by an `UnknownBlockParent` or `UnknownBlockParent` because any /// blocks or blobs without parents won't hit the data availability cache. @@ -30,101 +63,418 @@ pub struct SingleBlockLookup { pub triggered: bool, } -#[derive(Default, Clone)] -pub struct LookupId { - pub block_request_id: Option, - pub blob_request_id: Option, +// generic across block + blob +pub trait RequestState { + type RequestType; + type ResponseType; + type ReconstructedResponseType; + type VerifiedResponseType; + + // response verify + fn response_type() -> ResponseType; + fn get_parent_root(verified_response: &Self::VerifiedResponseType) -> Option; + fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self; + fn add_to_parent_components( + verified_response: Self::VerifiedResponseType, + components: &mut UnknownParentComponents, + ); + fn verified_to_reconstructed( + verified: Self::VerifiedResponseType, + ) -> Self::ReconstructedResponseType; + fn send_for_processing( + id: Id, + bl: &mut BlockLookups, + block_root: Hash256, + verified: Self::ReconstructedResponseType, + duration: Duration, + cx: &SyncNetworkContext, + ) -> Result<(), ()>; + + fn get_state(&self) -> &SingleLookupRequestState; + fn get_state_mut(&mut self) -> &mut SingleLookupRequestState; + fn processing_peer(&self) -> Result { + self.get_state().processing_peer() + } + fn downloading_peer(&self) -> Result { + self.get_state().peer() + } + fn set_component_processed(&mut self) { + self.get_state_mut().component_processed = true; + } + fn new_request(&self) -> Self::RequestType; + fn max_attempts() -> u8; + fn retry_request( + id: Id, + cx: &SyncNetworkContext, + peer_id: PeerId, + request: Self::RequestType, + ) -> Result<(), &'static str>; + fn verify_response( + &mut self, + expected_block_root: Hash256, + response: Option, + ) -> Result, LookupVerifyError> { + let request_state = self.get_state_mut(); + match request_state.state { + State::AwaitingDownload => { + request_state.register_failure_downloading(); + Err(LookupVerifyError::ExtraBlocksReturned) + } + State::Downloading { peer_id } => { + self.verify_response_inner(expected_block_root, response, peer_id) + } + State::Processing { peer_id: _ } => match response { + Some(_) => { + // We sent the block for processing and received an extra block. + request_state.register_failure_downloading(); + Err(LookupVerifyError::ExtraBlocksReturned) + } + None => { + // This is simply the stream termination and we are already processing the + // block + Ok(None) + } + }, + } + } + fn verify_response_inner( + &mut self, + expected_block_root: Hash256, + response: Option, + peer_id: PeerShouldHave, + ) -> Result, LookupVerifyError>; + + fn retry_request_after_failure( + &mut self, + id: Id, + cx: &SyncNetworkContext, + log: &Logger, + ) -> Result<(), ()> { + if let Err(e) = self + .build_request() + .map_err(Into::into) + .and_then(|(peer_id, request)| Self::retry_request(id, cx, peer_id, request)) + { + //TODO(sean) pass this error up? check downloaded contents prior to retry-ing? + debug!(log, "Single block lookup failed"; + "error" => ?e, + ); + return Err(()); + } + Ok(()) + } + fn build_request(&mut self) -> Result<(PeerId, Self::RequestType), LookupRequestError> { + debug_assert!(matches!(self.get_state().state, State::AwaitingDownload)); + self.too_many_attempts()?; + let peer = self.get_peer()?; + let request = self.new_request(); + Ok((peer, request)) + } + fn too_many_attempts(&self) -> Result<(), LookupRequestError> { + let max_attempts = Self::max_attempts(); + if self.get_state().failed_attempts() >= max_attempts { + Err(LookupRequestError::TooManyAttempts { + cannot_process: self.cannot_process(), + }) + } else { + Ok(()) + } + } + fn cannot_process(&self) -> bool { + let request_state = self.get_state(); + request_state.failed_processing >= request_state.failed_downloading + } + fn get_peer(&mut self) -> Result { + let mut request_state = self.get_state_mut(); + let Some(peer_id) = request_state + .available_peers + .iter() + .choose(&mut rand::thread_rng()) + .copied() + .map(PeerShouldHave::BlockAndBlobs).or(request_state + .potential_peers + .iter() + .choose(&mut rand::thread_rng()) + .copied() + .map(PeerShouldHave::Neither)) else { + return Err(LookupRequestError::NoPeers); + }; + request_state.used_peers.insert(peer_id.to_peer_id()); + request_state.state = State::Downloading { peer_id }; + Ok(peer_id.to_peer_id()) + } + fn check_peer_disconnected(&mut self, peer: &PeerId) -> Result<(), ()> { + self.get_state_mut().check_peer_disconnected(peer) + } + fn remove_if_useless(&mut self, peer: &PeerId) { + self.get_state_mut().remove_peer_if_useless(peer) + } + fn downloading(&self) -> bool { + matches!(self.get_state().state, State::Downloading { .. }) + } + fn register_failure_downloading(&mut self) { + self.get_state_mut().register_failure_downloading() + } } -pub struct BlobRequestState { +impl RequestState for BlockRequestState { + type RequestType = BlocksByRootRequest; + type ResponseType = Arc>; + type ReconstructedResponseType = RpcBlock; + type VerifiedResponseType = Arc>; + + // response verify + fn response_type() -> ResponseType { + ResponseType::Block + } + + fn get_parent_root(verified_response: &Arc>) -> Option { + Some(verified_response.parent_root()) + } + fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { + &mut request.block_request_state + } + fn add_to_parent_components( + verified_response: Arc>, + components: &mut UnknownParentComponents, + ) { + components.add_unknown_parent_block(verified_response); + } + + fn verified_to_reconstructed( + block: Arc>, + ) -> RpcBlock { + RpcBlock::new_without_blobs(block) + } + + fn send_for_processing( + id: Id, + bl: &mut BlockLookups, + block_root: Hash256, + constructed: RpcBlock, + duration: Duration, + cx: &SyncNetworkContext, + ) -> Result<(), ()> { + bl.send_block_for_processing( + block_root, + constructed, + duration, + BlockProcessType::SingleBlock { id }, + cx, + ) + } + + fn get_state(&self) -> &SingleLookupRequestState { + &self.state + } + fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { + &mut self.state + } + fn new_request(&self) -> BlocksByRootRequest { + BlocksByRootRequest::new(VariableList::from(vec![self.requested_block_root])) + } + fn max_attempts() -> u8 { + L::MAX_ATTEMPTS + } + fn retry_request( + id: Id, + cx: &SyncNetworkContext, + peer_id: PeerId, + request: Self::RequestType, + ) -> Result<(), &'static str> { + cx.single_block_lookup_request_retry(id, peer_id, request, L::lookup_type()) + } + + fn verify_response_inner( + &mut self, + expected_block_root: Hash256, + response: Option, + peer_id: PeerShouldHave, + ) -> Result>, LookupVerifyError> { + match response { + Some(block) => { + // Compute the block root using this specific function so that we can get timing + // metrics. + let block_root = get_block_root(&block); + if block_root != expected_block_root { + // return an error and drop the block + // NOTE: we take this is as a download failure to prevent counting the + // attempt as a chain failure, but simply a peer failure. + self.state.register_failure_downloading(); + Err(LookupVerifyError::RootMismatch) + } else { + // Return the block for processing. + self.state.state = State::Processing { peer_id }; + Ok(Some((block_root, block))) + } + } + None => { + if peer_id.should_have_block() { + self.state.register_failure_downloading(); + Err(LookupVerifyError::NoBlockReturned) + } else { + self.state.state = State::AwaitingDownload; + Err(LookupVerifyError::BenignFailure) + } + } + } + } +} + +impl RequestState for BlobRequestState { + type RequestType = BlobsByRootRequest; + type ResponseType = Arc>; + type ReconstructedResponseType = FixedBlobSidecarList; + type VerifiedResponseType = FixedBlobSidecarList; + + // response verify + fn response_type() -> ResponseType { + ResponseType::Blob + } + + fn get_parent_root(verified_response: &FixedBlobSidecarList) -> Option { + verified_response + .into_iter() + .filter_map(|blob| blob.as_ref()) + .map(|blob| blob.block_parent_root) + .next() + } + fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { + &mut request.blob_request_state + } + fn add_to_parent_components( + verified_response: FixedBlobSidecarList, + components: &mut UnknownParentComponents, + ) { + components.add_unknown_parent_blobs(verified_response); + } + fn verified_to_reconstructed( + blobs: FixedBlobSidecarList, + ) -> FixedBlobSidecarList { + blobs + } + + fn send_for_processing( + id: Id, + bl: &mut BlockLookups, + block_root: Hash256, + verified: FixedBlobSidecarList, + duration: Duration, + cx: &SyncNetworkContext, + ) -> Result<(), ()> { + bl.send_blobs_for_processing( + block_root, + verified, + duration, + BlockProcessType::SingleBlob { id }, + cx, + ) + } + + fn get_state(&self) -> &SingleLookupRequestState { + &self.state + } + fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { + &mut self.state + } + fn new_request(&self) -> BlobsByRootRequest { + BlobsByRootRequest { + blob_ids: VariableList::from(self.requested_ids.clone()), + } + } + fn max_attempts() -> u8 { + L::MAX_ATTEMPTS + } + fn retry_request( + id: Id, + cx: &SyncNetworkContext, + peer_id: PeerId, + request: Self::RequestType, + ) -> Result<(), &'static str> { + cx.single_blob_lookup_request_retry(id, peer_id, request, L::lookup_type()) + } + + fn verify_response_inner( + &mut self, + expected_block_root: Hash256, + blob: Option, + peer_id: PeerShouldHave, + ) -> Result>, LookupVerifyError> { + match blob { + Some(blob) => { + let received_id = blob.id(); + if !self.requested_ids.contains(&received_id) { + self.state.register_failure_downloading(); + Err(LookupVerifyError::UnrequestedBlobId) + } else { + // State should remain downloading until we receive the stream terminator. + self.requested_ids.retain(|id| *id != received_id); + let blob_index = blob.index; + + if blob_index >= T::EthSpec::max_blobs_per_block() as u64 { + return Err(LookupVerifyError::InvalidIndex(blob.index)); + } + *self.blob_download_queue.index_mut(blob_index as usize) = Some(blob); + Ok(None) + } + } + None => { + self.state.state = State::Processing { peer_id }; + let blobs = std::mem::take(&mut self.blob_download_queue); + Ok(Some((expected_block_root, blobs))) + } + } + } +} + +pub struct BlobRequestState { pub requested_ids: Vec, /// Where we store blobs until we receive the stream terminator. pub blob_download_queue: FixedBlobSidecarList, - pub state: SingleLookupRequestState, + pub state: SingleLookupRequestState, + _phantom: PhantomData, } -impl BlobRequestState { +impl BlobRequestState { pub fn new(peer_source: &[PeerShouldHave]) -> Self { Self { requested_ids: <_>::default(), blob_download_queue: <_>::default(), state: SingleLookupRequestState::new(peer_source), + _phantom: PhantomData::default(), } } } -pub struct BlockRequestState { +pub struct BlockRequestState { pub requested_block_root: Hash256, - pub state: SingleLookupRequestState, + pub state: SingleLookupRequestState, + _phantom: PhantomData, } -impl BlockRequestState { +impl BlockRequestState { pub fn new(block_root: Hash256, peers: &[PeerShouldHave]) -> Self { Self { requested_block_root: block_root, state: SingleLookupRequestState::new(peers), + _phantom: PhantomData::default(), } } } -impl SingleBlockLookup { - pub(crate) fn register_failure_downloading(&mut self, response_type: ResponseType) { - match response_type { - ResponseType::Block => self - .block_request_state - .state - .register_failure_downloading(), - ResponseType::Blob => self.blob_request_state.state.register_failure_downloading(), - } - } -} - -impl SingleBlockLookup { - pub(crate) fn downloading(&mut self, response_type: ResponseType) -> bool { - match response_type { - ResponseType::Block => { - matches!( - self.block_request_state.state.state, - State::Downloading { .. } - ) - } - ResponseType::Blob => { - matches!( - self.blob_request_state.state.state, - State::Downloading { .. } - ) - } +impl SingleBlockLookup { + pub(crate) fn block_already_downloaded(&self) -> bool { + if let Some(components) = self.unknown_parent_components.as_ref() { + components.downloaded_block.is_some() + } else { + self.da_checker + .has_block(&self.block_request_state.requested_block_root) } } - pub(crate) fn remove_peer_if_useless(&mut self, peer_id: &PeerId, response_type: ResponseType) { - match response_type { - ResponseType::Block => self - .block_request_state - .state - .remove_peer_if_useless(peer_id), - ResponseType::Blob => self - .blob_request_state - .state - .remove_peer_if_useless(peer_id), - } - } - - pub(crate) fn check_peer_disconnected( - &mut self, - peer_id: &PeerId, - response_type: ResponseType, - ) -> Result<(), ()> { - match response_type { - ResponseType::Block => self - .block_request_state - .state - .check_peer_disconnected(peer_id), - ResponseType::Blob => self - .blob_request_state - .state - .check_peer_disconnected(peer_id), - } + pub(crate) fn blobs_already_downloaded(&mut self) -> bool { + self.update_blobs_request(); + self.blob_request_state.requested_ids.is_empty() } } @@ -179,7 +529,7 @@ impl UnknownParentComponents { /// Object representing the state of a single block or blob lookup request. #[derive(PartialEq, Eq, Debug)] -pub struct SingleLookupRequestState { +pub struct SingleLookupRequestState { /// State of this request. pub state: State, /// Peers that should have this block or blob. @@ -225,17 +575,20 @@ pub enum LookupRequestError { cannot_process: bool, }, NoPeers, + SendFailed(&'static str), + AlreadyDownloaded, } -impl SingleBlockLookup { +impl SingleBlockLookup { pub fn new( requested_block_root: Hash256, unknown_parent_components: Option>, peers: &[PeerShouldHave], da_checker: Arc>, + cx: &SyncNetworkContext, ) -> Self { Self { - id: <_>::default(), + id: cx.next_id(), block_request_state: BlockRequestState::new(requested_block_root, peers), blob_request_state: BlobRequestState::new(peers), da_checker, @@ -251,27 +604,60 @@ impl SingleBlockLookup) -> Result<(), ()> { - let block_request_id = if let Ok(Some((peer_id, block_request))) = self.request_block() { - cx.single_block_lookup_request(peer_id, block_request).ok() - } else { - None - }; - - let blob_request_id = if let Ok(Some((peer_id, blob_request))) = self.request_blobs() { - cx.single_blobs_lookup_request(peer_id, blob_request).ok() - } else { - None - }; - - if block_request_id.is_none() && blob_request_id.is_none() { - return Err(()); + pub fn request_block_and_blobs( + &mut self, + cx: &SyncNetworkContext, + ) -> Result<(), LookupRequestError> { + let block_root = self.block_request_state.requested_block_root; + if self.block_already_downloaded() && self.blobs_already_downloaded() { + // drop lookup + trace!(cx.log, "Lookup request already completed"; "block_root"=> ?block_root); + return Err(LookupRequestError::AlreadyDownloaded); } - self.id = LookupId { - block_request_id, - blob_request_id, + let (block_peer_id, block_request) = + match as RequestState>::build_request( + &mut self.block_request_state, + ) { + Ok(opt) => opt, + Err(e) => { + // drop lookup + debug!(cx.log, + "Lookup request block error, dropping lookup"; + "block_root"=> ?block_root, + "error"=> ?e + ); + return Err(e); + } + }; + + let (blob_peer_id, blob_request) = match ::EthSpec, + > as RequestState>::build_request( + &mut self.blob_request_state + ) { + Ok(opt) => opt, + Err(e) => { + // drop lookup + debug!(cx.log, + "Lookup request blob error, dropping lookup"; + "block_root"=> ?block_root, + "error"=> ?e + ); + return Err(e); + } }; + + cx.single_lookup_request( + self.id, + block_peer_id, + block_request, + blob_peer_id, + blob_request, + L::lookup_type(), + ) + .map_err(LookupRequestError::SendFailed)?; Ok(()) } @@ -368,268 +754,6 @@ impl SingleBlockLookup>>, - ) -> Result>, LookupVerifyError> { - match self.block_request_state.state.state { - State::AwaitingDownload => { - self.block_request_state - .state - .register_failure_downloading(); - Err(LookupVerifyError::ExtraBlocksReturned) - } - State::Downloading { peer_id } => { - match block { - Some(block) => { - // Compute the block root using this specific function so that we can get timing - // metrics. - let block_root = get_block_root(&block); - if block_root != self.block_request_state.requested_block_root { - // return an error and drop the block - // NOTE: we take this is as a download failure to prevent counting the - // attempt as a chain failure, but simply a peer failure. - self.block_request_state - .state - .register_failure_downloading(); - Err(LookupVerifyError::RootMismatch) - } else { - // Return the block for processing. - self.block_request_state.state.state = State::Processing { peer_id }; - Ok(Some((block_root, block))) - } - } - None => { - if peer_id.should_have_block() { - self.block_request_state - .state - .register_failure_downloading(); - Err(LookupVerifyError::NoBlockReturned) - } else { - self.block_request_state.state.state = State::AwaitingDownload; - Err(LookupVerifyError::BenignFailure) - } - } - } - } - State::Processing { peer_id: _ } => match block { - Some(_) => { - // We sent the block for processing and received an extra block. - self.block_request_state - .state - .register_failure_downloading(); - Err(LookupVerifyError::ExtraBlocksReturned) - } - None => { - // This is simply the stream termination and we are already processing the - // block - Ok(None) - } - }, - } - } - - pub fn verify_blob( - &mut self, - blob: Option>>, - ) -> Result>, LookupVerifyError> { - match self.blob_request_state.state.state { - State::AwaitingDownload => { - self.blob_request_state.state.register_failure_downloading(); - Err(LookupVerifyError::ExtraBlobsReturned) - } - State::Downloading { - peer_id: peer_source, - } => match blob { - Some(blob) => { - let received_id = blob.id(); - if !self.blob_request_state.requested_ids.contains(&received_id) { - self.blob_request_state.state.register_failure_downloading(); - Err(LookupVerifyError::UnrequestedBlobId) - } else { - // State should remain downloading until we receive the stream terminator. - self.blob_request_state - .requested_ids - .retain(|id| *id != received_id); - let blob_index = blob.index; - - if blob_index >= T::EthSpec::max_blobs_per_block() as u64 { - return Err(LookupVerifyError::InvalidIndex(blob.index)); - } - *self - .blob_request_state - .blob_download_queue - .index_mut(blob_index as usize) = Some(blob); - Ok(None) - } - } - None => { - self.blob_request_state.state.state = State::Processing { - peer_id: peer_source, - }; - Ok(Some(( - self.block_request_state.requested_block_root, - std::mem::take(&mut self.blob_request_state.blob_download_queue), - ))) - } - }, - State::Processing { peer_id: _ } => match blob { - Some(_) => { - // We sent the blob for processing and received an extra blob. - self.blob_request_state.state.register_failure_downloading(); - Err(LookupVerifyError::ExtraBlobsReturned) - } - None => { - // This is simply the stream termination and we are already processing the - // block - Ok(None) - } - }, - } - } - - pub fn request_block( - &mut self, - ) -> Result, LookupRequestError> { - let block_already_downloaded = - if let Some(components) = self.unknown_parent_components.as_ref() { - components.downloaded_block.is_some() - } else { - self.da_checker - .has_block(&self.block_request_state.requested_block_root) - }; - - if block_already_downloaded { - return Ok(None); - } - - debug_assert!(matches!( - self.block_request_state.state.state, - State::AwaitingDownload - )); - let request = BlocksByRootRequest::new(VariableList::from(vec![ - self.block_request_state.requested_block_root, - ])); - let response_type = ResponseType::Block; - if self.too_many_attempts(response_type) { - Err(LookupRequestError::TooManyAttempts { - cannot_process: self.cannot_process(response_type), - }) - } else if let Some(peer_id) = self.get_peer(response_type) { - self.add_used_peer(peer_id, response_type); - Ok(Some((peer_id.to_peer_id(), request))) - } else { - Err(LookupRequestError::NoPeers) - } - } - - pub fn request_blobs( - &mut self, - ) -> Result, LookupRequestError> { - self.update_blobs_request(); - - if self.blob_request_state.requested_ids.is_empty() { - return Ok(None); - } - - debug_assert!(matches!( - self.blob_request_state.state.state, - State::AwaitingDownload - )); - let request = BlobsByRootRequest { - blob_ids: VariableList::from(self.blob_request_state.requested_ids.clone()), - }; - let response_type = ResponseType::Blob; - if self.too_many_attempts(response_type) { - Err(LookupRequestError::TooManyAttempts { - cannot_process: self.cannot_process(response_type), - }) - } else if let Some(peer_id) = self.get_peer(response_type) { - self.add_used_peer(peer_id, response_type); - Ok(Some((peer_id.to_peer_id(), request))) - } else { - Err(LookupRequestError::NoPeers) - } - } - - fn too_many_attempts(&self, response_type: ResponseType) -> bool { - match response_type { - ResponseType::Block => self.block_request_state.state.failed_attempts() >= MAX_ATTEMPTS, - ResponseType::Blob => self.blob_request_state.state.failed_attempts() >= MAX_ATTEMPTS, - } - } - - fn cannot_process(&self, response_type: ResponseType) -> bool { - match response_type { - ResponseType::Block => { - self.block_request_state.state.failed_processing - >= self.block_request_state.state.failed_downloading - } - ResponseType::Blob => { - self.blob_request_state.state.failed_processing - >= self.blob_request_state.state.failed_downloading - } - } - } - - fn get_peer(&self, response_type: ResponseType) -> Option { - match response_type { - ResponseType::Block => self - .block_request_state - .state - .available_peers - .iter() - .choose(&mut rand::thread_rng()) - .copied() - .map(PeerShouldHave::BlockAndBlobs) - .or(self - .block_request_state - .state - .potential_peers - .iter() - .choose(&mut rand::thread_rng()) - .copied() - .map(PeerShouldHave::Neither)), - ResponseType::Blob => self - .blob_request_state - .state - .available_peers - .iter() - .choose(&mut rand::thread_rng()) - .copied() - .map(PeerShouldHave::BlockAndBlobs) - .or(self - .blob_request_state - .state - .potential_peers - .iter() - .choose(&mut rand::thread_rng()) - .copied() - .map(PeerShouldHave::Neither)), - } - } - - fn add_used_peer(&mut self, peer_id: PeerShouldHave, response_type: ResponseType) { - match response_type { - ResponseType::Block => { - self.block_request_state - .state - .used_peers - .insert(peer_id.to_peer_id()); - self.block_request_state.state.state = State::Downloading { peer_id }; - } - ResponseType::Blob => { - self.blob_request_state - .state - .used_peers - .insert(peer_id.to_peer_id()); - self.blob_request_state.state.state = State::Downloading { peer_id }; - } - } - } - pub fn add_peers(&mut self, peers: &[PeerShouldHave]) { for peer in peers { match peer { @@ -645,34 +769,48 @@ impl SingleBlockLookup Result { - match response_type { - ResponseType::Block => self.block_request_state.state.processing_peer(), - ResponseType::Blob => self.blob_request_state.state.processing_peer(), - } - } - - pub fn downloading_peer(&self, response_type: ResponseType) -> Result { - match response_type { - ResponseType::Block => self.block_request_state.state.peer(), - ResponseType::Blob => self.blob_request_state.state.peer(), - } - } - pub fn both_components_processed(&self) -> bool { self.block_request_state.state.component_processed && self.blob_request_state.state.component_processed } - pub fn set_component_processed(&mut self, response_type: ResponseType) { - match response_type { - ResponseType::Block => self.block_request_state.state.component_processed = true, - ResponseType::Blob => self.blob_request_state.state.component_processed = true, - } + pub fn should_remove_disconnected_peer( + &mut self, + id: Id, + peer_id: &PeerId, + cx: &SyncNetworkContext, + log: &Logger, + ) -> bool { + let useless_block_peer = + if as RequestState>::check_peer_disconnected( + &mut self.block_request_state, + peer_id, + ) + .is_err() + { + trace!(log, "Single lookup failed on peer disconnection"; "block_root" => ?self.block_request_state.requested_block_root, "response_type" => ?ResponseType::Block); + self.block_request_state + .retry_request_after_failure(id, cx, log) + .is_err() + } else { + false + }; + let useless_blob_peer = if ::EthSpec> as RequestState>::check_peer_disconnected(&mut self + .blob_request_state, peer_id) + .is_err() + { + trace!(log, "Single lookup failed on peer disconnection"; "block_root" => ?self.block_request_state.requested_block_root, "response_type" => ?ResponseType::Blob); + self.blob_request_state + .retry_request_after_failure(id, cx, log) + .is_err() + } else { + false + }; + useless_block_peer && useless_blob_peer } } -impl SingleLookupRequestState { +impl SingleLookupRequestState { pub fn new(peers: &[PeerShouldHave]) -> Self { let mut available_peers = HashSet::default(); let mut potential_peers = HashSet::default(); @@ -763,9 +901,7 @@ impl SingleLookupRequestState { } } -impl slog::Value - for SingleBlockLookup -{ +impl slog::Value for SingleBlockLookup { fn serialize( &self, _record: &slog::Record, @@ -773,6 +909,7 @@ impl slog::Value serializer: &mut dyn slog::Serializer, ) -> slog::Result { serializer.emit_str("request", key)?; + serializer.emit_arguments("lookup_type", &format_args!("{:?}", L::lookup_type()))?; serializer.emit_arguments( "hash", &format_args!("{}", self.block_request_state.requested_block_root), @@ -793,7 +930,7 @@ impl slog::Value } } -impl slog::Value for SingleLookupRequestState { +impl slog::Value for SingleLookupRequestState { fn serialize( &self, record: &slog::Record, diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 561593e6d1..12f345ac7e 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -304,7 +304,7 @@ fn test_single_block_lookup_happy_path() { // The peer provides the correct block, should not be penalized. Now the block should be sent // for processing. - bl.single_block_lookup_response(id, peer_id, Some(block.into()), D, &mut cx); + bl.single_lookup_response(id, peer_id, Some(block.into()), D, &mut cx); rig.expect_empty_network(); rig.expect_block_process(response_type); @@ -313,7 +313,7 @@ fn test_single_block_lookup_happy_path() { // Send the stream termination. Peer should have not been penalized, and the request removed // after processing. - bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); + bl.single_lookup_response(id, peer_id, None, D, &mut cx); bl.single_block_component_processed( id, BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), @@ -346,7 +346,7 @@ fn test_single_block_lookup_empty_response() { } // The peer does not have the block. It should be penalized. - bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); + bl.single_lookup_response(id, peer_id, None, D, &mut cx); rig.expect_penalty(); rig.expect_block_request(response_type); // it should be retried @@ -375,12 +375,12 @@ fn test_single_block_lookup_wrong_response() { // Peer sends something else. It should be penalized. let bad_block = rig.rand_block(fork_name); - bl.single_block_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx); + bl.single_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx); rig.expect_penalty(); rig.expect_block_request(response_type); // should be retried // Send the stream termination. This should not produce an additional penalty. - bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); + bl.single_lookup_response(id, peer_id, None, D, &mut cx); rig.expect_empty_network(); } @@ -438,7 +438,7 @@ fn test_single_block_lookup_becomes_parent_request() { // The peer provides the correct block, should not be penalized. Now the block should be sent // for processing. - bl.single_block_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx); + bl.single_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx); rig.expect_empty_network(); rig.expect_block_process(response_type); @@ -971,7 +971,7 @@ fn test_single_block_lookup_ignored_response() { // The peer provides the correct block, should not be penalized. Now the block should be sent // for processing. - bl.single_block_lookup_response(id, peer_id, Some(block.into()), D, &mut cx); + bl.single_lookup_response(id, peer_id, Some(block.into()), D, &mut cx); rig.expect_empty_network(); rig.expect_block_process(response_type); @@ -980,7 +980,7 @@ fn test_single_block_lookup_ignored_response() { // Send the stream termination. Peer should have not been penalized, and the request removed // after processing. - bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); + bl.single_lookup_response(id, peer_id, None, D, &mut cx); // Send an Ignored response, the request should be dropped bl.single_block_component_processed(id, BlockProcessingResult::Ignored, response_type, &mut cx); rig.expect_empty_network(); @@ -1353,7 +1353,7 @@ mod deneb_only { fn block_response(mut self) -> Self { // The peer provides the correct block, should not be penalized. Now the block should be sent // for processing. - self.bl.single_block_lookup_response( + self.bl.single_lookup_response( self.block_req_id.expect("block request id"), self.peer_id, self.block.clone(), @@ -1402,7 +1402,7 @@ mod deneb_only { } fn empty_block_response(mut self) -> Self { - self.bl.single_block_lookup_response( + self.bl.single_lookup_response( self.block_req_id.expect("block request id"), self.peer_id, None, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 5e8fc4a4e9..3106233658 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -43,8 +43,9 @@ use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::block_lookups::delayed_lookup; use crate::sync::block_lookups::delayed_lookup::DelayedLookupMessage; -pub use crate::sync::block_lookups::ResponseType; -use crate::sync::block_lookups::UnknownParentComponents; +use crate::sync::block_lookups::{ + BlobRequestState, BlockRequestState, Current, Parent, RequestState, UnknownParentComponents, +}; use crate::sync::range_sync::ByRangeRequestType; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::RpcBlock; @@ -83,27 +84,47 @@ pub const DELAY_QUEUE_CHANNEL_SIZE: usize = 128; pub type Id = u32; +#[derive(Debug)] +pub enum ResponseType { + Block, + Blob, +} + /// Id of rpc requests sent by sync to the network. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum RequestId { /// Request searching for a block given a hash. - SingleBlock { id: Id }, + SingleBlock { + id: Id, + }, + SingleBlob { + id: Id, + }, /// Request searching for a block's parent. The id is the chain - ParentLookup { id: Id }, + ParentLookup { + id: Id, + }, + ParentLookupBlob { + id: Id, + }, /// Request was from the backfill sync algorithm. - BackFillBlocks { id: Id }, + BackFillBlocks { + id: Id, + }, /// Backfill request that is composed by both a block range request and a blob range request. - BackFillBlockAndBlobs { id: Id }, + BackFillBlockAndBlobs { + id: Id, + }, /// The request was from a chain in the range sync algorithm. - RangeBlocks { id: Id }, + RangeBlocks { + id: Id, + }, /// Range request that is composed by both a block range request and a blob range request. - RangeBlockAndBlobs { id: Id }, + RangeBlockAndBlobs { + id: Id, + }, } -// TODO(diva) I'm updating functions what at a time, but this should be revisited because I think -// some code paths that are split for blobs and blocks can be made just one after sync as a whole -// is updated. - #[derive(Debug)] /// A message that can be sent to the sync manager thread. pub enum SyncMessage { @@ -174,6 +195,7 @@ pub enum SyncMessage { #[derive(Debug, Clone)] pub enum BlockProcessType { SingleBlock { id: Id }, + SingleBlob { id: Id }, ParentLookup { chain_hash: Hash256 }, } @@ -324,16 +346,40 @@ impl SyncManager { trace!(self.log, "Sync manager received a failed RPC"); match request_id { RequestId::SingleBlock { id } => { - self.block_lookups.single_block_lookup_failed( - id, - &peer_id, - &mut self.network, - error, - ); + self.block_lookups + .single_block_lookup_failed::>( + id, + &peer_id, + &mut self.network, + error, + ); + } + RequestId::SingleBlob { id } => { + self.block_lookups + .single_block_lookup_failed::>( + id, + &peer_id, + &mut self.network, + error, + ); } RequestId::ParentLookup { id } => { self.block_lookups - .parent_lookup_failed(id, peer_id, &mut self.network, error); + .parent_lookup_failed::>( + id, + peer_id, + &mut self.network, + error, + ); + } + RequestId::ParentLookupBlob { id } => { + self.block_lookups + .parent_lookup_failed::>( + id, + peer_id, + &mut self.network, + error, + ); } RequestId::BackFillBlocks { id } => { if let Some(batch_id) = self @@ -652,8 +698,11 @@ impl SyncManager { // If we are not synced, ignore this block. if self.synced_and_connected(&peer_id) { if self.should_delay_lookup(slot) { - self.block_lookups - .search_block_delayed(block_root, PeerShouldHave::Neither(peer_id)); + self.block_lookups.search_block_delayed( + block_root, + PeerShouldHave::Neither(peer_id), + &self.network, + ); if let Err(e) = self .delayed_lookups .try_send(DelayedLookupMessage::MissingComponents(block_root)) @@ -695,7 +744,18 @@ impl SyncManager { } => match process_type { BlockProcessType::SingleBlock { id } => self .block_lookups - .single_block_component_processed(id, result, response_type, &mut self.network), + .single_block_component_processed::>( + id, + result, + &self.network, + ), + BlockProcessType::SingleBlob { id } => self + .block_lookups + .single_block_component_processed::>( + id, + result, + &self.network, + ), BlockProcessType::ParentLookup { chain_hash } => self .block_lookups .parent_block_processed(chain_hash, result, response_type, &mut self.network), @@ -753,6 +813,7 @@ impl SyncManager { block_root, parent_components, &[PeerShouldHave::Neither(peer_id)], + &self.network, ); if let Err(e) = self .delayed_lookups @@ -883,20 +944,30 @@ impl SyncManager { seen_timestamp: Duration, ) { match request_id { - RequestId::SingleBlock { id } => self.block_lookups.single_block_lookup_response( - id, - peer_id, - block, - seen_timestamp, - &mut self.network, - ), - RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_response( - id, - peer_id, - block, - seen_timestamp, - &mut self.network, - ), + RequestId::SingleBlock { id } => self + .block_lookups + .single_lookup_response::>( + id, + peer_id, + block, + seen_timestamp, + &self.network, + ), + RequestId::SingleBlob { id } => { + crit!(self.log, "Blob received during block request"; "peer_id" => %peer_id ); + } + RequestId::ParentLookup { id } => self + .block_lookups + .parent_lookup_response::>( + id, + peer_id, + block, + seen_timestamp, + &self.network, + ), + RequestId::ParentLookupBlob { id } => { + crit!(self.log, "Blob received during parent block request"; "peer_id" => %peer_id ); + } RequestId::BackFillBlocks { id } => { let is_stream_terminator = block.is_none(); if let Some(batch_id) = self @@ -954,20 +1025,31 @@ impl SyncManager { seen_timestamp: Duration, ) { match request_id { - RequestId::SingleBlock { id } => self.block_lookups.single_blob_lookup_response( - id, - peer_id, - blob, - seen_timestamp, - &mut self.network, - ), - RequestId::ParentLookup { id } => self.block_lookups.parent_lookup_blob_response( - id, - peer_id, - blob, - seen_timestamp, - &mut self.network, - ), + RequestId::SingleBlock { id } => { + crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id ); + } + RequestId::SingleBlob { id } => self + .block_lookups + .single_lookup_response::>( + id, + peer_id, + blob, + seen_timestamp, + &mut self.network, + ), + + RequestId::ParentLookup { id } => { + crit!(self.log, "Single blob received during parent block request"; "peer_id" => %peer_id ); + } + RequestId::ParentLookupBlob { id } => self + .block_lookups + .parent_lookup_response::>( + id, + peer_id, + blob, + seen_timestamp, + &mut self.network, + ), RequestId::BackFillBlocks { id: _ } => { crit!(self.log, "Blob received during backfill block request"; "peer_id" => %peer_id ); } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index d635dd2ea1..3524da7859 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -7,7 +7,7 @@ use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::service::{NetworkMessage, RequestId}; use crate::status::ToStatusMessage; -use crate::sync::block_lookups::{BlobRequestId, BlockRequestId}; +use crate::sync::block_lookups::LookupType; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; use fnv::FnvHashMap; @@ -37,7 +37,7 @@ pub struct SyncNetworkContext { network_send: mpsc::UnboundedSender>, /// A sequential ID for all RPC requests. - request_id: Id, + request_id: std::cell::Cell, /// BlocksByRange requests made by the range syncing algorithm. range_requests: FnvHashMap, @@ -62,7 +62,7 @@ pub struct SyncNetworkContext { pub chain: Arc>, /// Logger for the `SyncNetworkContext`. - log: slog::Logger, + pub log: slog::Logger, } /// Small enumeration to make dealing with block and blob requests easier. @@ -93,7 +93,7 @@ impl SyncNetworkContext { SyncNetworkContext { network_send, execution_engine_state: EngineState::Online, // always assume `Online` at the start - request_id: 1, + request_id: std::cell::Cell::new(1), range_requests: FnvHashMap::default(), backfill_requests: FnvHashMap::default(), range_blocks_and_blobs_requests: FnvHashMap::default(), @@ -118,11 +118,7 @@ impl SyncNetworkContext { .unwrap_or_default() } - pub fn status_peers( - &mut self, - chain: &C, - peers: impl Iterator, - ) { + pub fn status_peers(&self, chain: &C, peers: impl Iterator) { let status_message = chain.status_message(); for peer_id in peers { debug!( @@ -409,20 +405,39 @@ impl SyncNetworkContext { } /// Sends a blocks by root request for a parent request. - pub fn single_block_lookup_request( - &mut self, + pub fn single_lookup_request( + &self, + id: Id, peer_id: PeerId, request: BlocksByRootRequest, - ) -> Result { - let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::SingleBlock { id }); + blob_peer_id: PeerId, + blob_request: BlobsByRootRequest, + lookup_type: LookupType, + ) -> Result<(), &'static str> { + self.single_block_lookup_request_retry(id, peer_id, request, lookup_type)?; + self.single_blob_lookup_request_retry(id, blob_peer_id, blob_request, lookup_type)?; + Ok(()) + } + pub fn single_block_lookup_request_retry( + &self, + id: Id, + peer_id: PeerId, + request: BlocksByRootRequest, + lookup_type: LookupType, + ) -> Result<(), &'static str> { + let sync_id = match lookup_type { + LookupType::Current => SyncRequestId::SingleBlock { id }, + LookupType::Parent => SyncRequestId::ParentLookup { id }, + }; + let request_id = RequestId::Sync(sync_id); trace!( self.log, "Sending BlocksByRoot Request"; "method" => "BlocksByRoot", "count" => request.block_roots().len(), - "peer" => %peer_id + "peer" => %peer_id, + "lookup_type" => ?lookup_type ); self.send_network_msg(NetworkMessage::SendRequest { @@ -430,82 +445,34 @@ impl SyncNetworkContext { request: Request::BlocksByRoot(request), request_id, })?; - Ok(id) + Ok(()) } - /// Sends a blobs by root request for a parent request. - pub fn single_blobs_lookup_request( - &mut self, - peer_id: PeerId, - request: BlobsByRootRequest, - ) -> Result { - let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::SingleBlock { id }); + pub fn single_blob_lookup_request_retry( + &self, + id: Id, + blob_peer_id: PeerId, + blob_request: BlobsByRootRequest, + lookup_type: LookupType, + ) -> Result<(), &'static str> { + let request_id = RequestId::Sync(SyncRequestId::SingleBlob { id }); - trace!( - self.log, - "Sending BlobsByRoot Request"; - "method" => "BlobsByRoot", - "count" => request.blob_ids.len(), - "peer" => %peer_id - ); + if !blob_request.blob_ids.is_empty() { + trace!( + self.log, + "Sending BlobsByRoot Request"; + "method" => "BlobsByRoot", + "count" => blob_request.blob_ids.len(), + "peer" => %blob_peer_id + ); - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: Request::BlobsByRoot(request), - request_id, - })?; - Ok(id) - } - - /// Sends a blocks by root request for a parent request. - pub fn parent_lookup_block_request( - &mut self, - peer_id: PeerId, - request: BlocksByRootRequest, - ) -> Result { - let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::ParentLookup { id }); - - trace!( - self.log, - "Sending parent BlocksByRoot Request"; - "method" => "BlocksByRoot", - "count" => request.block_roots().len(), - "peer" => %peer_id - ); - - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: Request::BlocksByRoot(request), - request_id, - })?; - Ok(id) - } - - /// Sends a blocks by root request for a parent request. - pub fn parent_lookup_blobs_request( - &mut self, - peer_id: PeerId, - request: BlobsByRootRequest, - ) -> Result { - let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::ParentLookup { id }); - - trace!( - self.log, - "Sending parent BlobsByRoot Request"; - "method" => "BlobsByRoot", - "count" => request.blob_ids.len(), - "peer" => %peer_id - ); - - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: Request::BlobsByRoot(request), - request_id, - })?; - Ok(id) + self.send_network_msg(NetworkMessage::SendRequest { + peer_id: blob_peer_id, + request: Request::BlobsByRoot(blob_request), + request_id, + })?; + } + Ok(()) } pub fn is_execution_engine_online(&self) -> bool { @@ -532,7 +499,7 @@ impl SyncNetworkContext { } /// Reports to the scoring algorithm the behaviour of a peer. - pub fn report_peer(&mut self, peer_id: PeerId, action: PeerAction, msg: &'static str) { + pub fn report_peer(&self, peer_id: PeerId, action: PeerAction, msg: &'static str) { debug!(self.log, "Sync reporting peer"; "peer_id" => %peer_id, "action" => %action); self.network_send .send(NetworkMessage::ReportPeer { @@ -547,7 +514,7 @@ impl SyncNetworkContext { } /// Subscribes to core topics. - pub fn subscribe_core_topics(&mut self) { + pub fn subscribe_core_topics(&self) { self.network_send .send(NetworkMessage::SubscribeCoreTopics) .unwrap_or_else(|e| { @@ -556,7 +523,7 @@ impl SyncNetworkContext { } /// Sends an arbitrary network message. - fn send_network_msg(&mut self, msg: NetworkMessage) -> Result<(), &'static str> { + fn send_network_msg(&self, msg: NetworkMessage) -> Result<(), &'static str> { self.network_send.send(msg).map_err(|_| { debug!(self.log, "Could not send message to the network service"); "Network channel send Failed" @@ -572,10 +539,10 @@ impl SyncNetworkContext { &self.network_beacon_processor } - fn next_id(&mut self) -> Id { - let id = self.request_id; - self.request_id += 1; - id + pub fn next_id(&self) -> Id { + let current_value = self.request_id.get(); + self.request_id.set(current_value + 1); + current_value } /// Check whether a batch for this epoch (and only this epoch) should request just blocks or @@ -587,25 +554,17 @@ impl SyncNetworkContext { const _: () = assert!( super::backfill_sync::BACKFILL_EPOCHS_PER_BATCH == 1 && super::range_sync::EPOCHS_PER_BATCH == 1, - "To deal with alignment with 4844 boundaries, batches need to be of just one epoch" + "To deal with alignment with deneb boundaries, batches need to be of just one epoch" ); - #[cfg(test)] - { - // Keep tests only for blocks. - ByRangeRequestType::Blocks - } - #[cfg(not(test))] - { - if let Some(data_availability_boundary) = self.chain.data_availability_boundary() { - if epoch >= data_availability_boundary { - ByRangeRequestType::BlocksAndBlobs - } else { - ByRangeRequestType::Blocks - } + if let Some(data_availability_boundary) = self.chain.data_availability_boundary() { + if epoch >= data_availability_boundary { + ByRangeRequestType::BlocksAndBlobs } else { ByRangeRequestType::Blocks } + } else { + ByRangeRequestType::Blocks } } }