diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 531e376e88..1e1c35f271 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -1,5 +1,6 @@ use derivative::Derivative; use slot_clock::SlotClock; +use ssz_types::FixedVector; use std::sync::Arc; use crate::beacon_chain::{ @@ -14,8 +15,8 @@ use crate::BeaconChainError; use kzg::Kzg; use types::blob_sidecar::BlobIdentifier; use types::{ - BeaconBlockRef, BeaconStateError, BlobSidecar, BlobSidecarList, Epoch, EthSpec, Hash256, - KzgCommitment, SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlobSidecar, Slot, + BeaconBlockRef, BeaconStateError, BlobSidecar, Epoch, EthSpec, Hash256, KzgCommitment, + SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlobSidecar, Slot, }; #[derive(Debug)] @@ -475,11 +476,19 @@ impl AsBlock for &MaybeAvailableBlock { #[derivative(Hash(bound = "E: EthSpec"))] pub enum BlockWrapper { Block(Arc>), - BlockAndBlobs(Arc>, Vec>>), + BlockAndBlobs( + Arc>, + FixedVector>>, E::MaxBlobsPerBlock>, + ), } impl BlockWrapper { - pub fn deconstruct(self) -> (Arc>, Option>>>) { + pub fn deconstruct( + self, + ) -> ( + Arc>, + Option>>, E::MaxBlobsPerBlock>>, + ) { match self { BlockWrapper::Block(block) => (block, None), BlockWrapper::BlockAndBlobs(block, blobs) => (block, Some(blobs)), diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index ac2673b984..8dd856649f 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -46,6 +46,10 @@ pub enum AvailabilityCheckError { block_root: Hash256, blob_block_root: Hash256, }, + UnorderedBlobs { + expected_index: u64, + blob_index: u64, + }, } impl From for AvailabilityCheckError { @@ -136,11 +140,11 @@ impl DataAvailabilityChecker { } } - pub fn zip_block( + pub fn wrap_block( &self, block_root: Hash256, block: Arc>, - blobs: Vec>>, + blobs: FixedVector>>, T::MaxBlobsPerBlock>, ) -> Result, AvailabilityCheckError> { Ok(match self.get_blob_requirements(&block)? { BlobRequirements::EmptyBlobs => BlockWrapper::Block(block), @@ -153,33 +157,30 @@ impl DataAvailabilityChecker { .blob_kzg_commitments() .map(|commitments| commitments.len()) .unwrap_or(0); - let mut expected_indices: HashSet = - (0..expected_num_blobs).into_iter().collect(); - if blobs.len() < expected_num_blobs { - return Err(AvailabilityCheckError::NumBlobsMismatch { - num_kzg_commitments: expected_num_blobs, - num_blobs: blobs.len(), - }); - } - for blob in blobs.iter() { + + let mut blob_count = 0; + while let Some((index, Some(blob))) = blobs.iter().enumerate().next() { + blob_count += 1; if blob.block_root != block_root { return Err(AvailabilityCheckError::BlockBlobRootMismatch { block_root, blob_block_root: blob.block_root, }); } - let removed = expected_indices.remove(&(blob.index as usize)); - if !removed { - return Err(AvailabilityCheckError::MissingBlobs); + + let expected_index = index as u64; + if expected_index != blob.index { + return Err(AvailabilityCheckError::UnorderedBlobs { + expected_index, + blob_index: blob.index, + }); } } - if !expected_indices.is_empty() { - return Err(AvailabilityCheckError::DuplicateBlob(block_root)); + if blob_count < expected_num_blobs { + return Err(AvailabilityCheckError::MissingBlobs); } - //TODO(sean) do we re-order blobs here to the correct order? - BlockWrapper::BlockAndBlobs(block, blobs) } }) @@ -199,8 +200,10 @@ impl DataAvailabilityChecker { pub fn put_rpc_blobs( &self, block_root: Hash256, - blobs: Vec>>, + blobs: FixedVector>>, T::MaxBlobsPerBlock>, ) -> Result, AvailabilityCheckError> { + //TODO(sean) merge with existing blobs, only kzg verify blobs we haven't yet verified + // Verify the KZG commitment. let kzg_verified_blobs = if let Some(kzg) = self.kzg.as_ref() { verify_kzg_for_blob_list(blobs, kzg)? diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 87fc7d72b4..3cadf49f05 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -54,6 +54,7 @@ use lighthouse_network::{ }; use logging::TimeLatch; use slog::{crit, debug, error, trace, warn, Logger}; +use ssz_types::FixedVector; use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; @@ -631,7 +632,7 @@ impl WorkEvent { pub fn rpc_blobs( block_root: Hash256, - blobs: Vec>>, + blobs: FixedVector>>, T::EthSpec::MaxBlobsPerBlock>, seen_timestamp: Duration, process_type: BlockProcessType, ) -> Self { @@ -947,7 +948,7 @@ pub enum Work { }, RpcBlobs { block_root: Hash256, - blobs: Vec>>, + blobs: FixedVector>>, T::EthSpec::MaxBlobsPerBlock>, seen_timestamp: Duration, process_type: BlockProcessType, }, diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index f2af2a03df..8233d5d8dd 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -16,6 +16,7 @@ use beacon_chain::{ }; use lighthouse_network::PeerAction; use slog::{debug, error, info, warn}; +use ssz_types::FixedVector; use std::sync::Arc; use tokio::sync::mpsc; use types::{BlobSidecar, Epoch, Hash256, SignedBeaconBlock}; @@ -54,9 +55,10 @@ impl Worker { ) { if !should_process { // Sync handles these results - self.send_sync_message(SyncMessage::BlockOrBlobProcessed { + self.send_sync_message(SyncMessage::BlockPartProcessed { process_type, - result: crate::sync::manager::BlockOrBlobProcessResult::Ignored, + result: crate::sync::manager::BlockPartProcessingResult::Ignored, + response_type: crate::sync::manager::ResponseType::Block, }); return; } @@ -129,9 +131,10 @@ impl Worker { } } // Sync handles these results - self.send_sync_message(SyncMessage::BlockOrBlobProcessed { + self.send_sync_message(SyncMessage::BlockPartProcessed { process_type, result: result.into(), + response_type: ResponseType::Block, }); // Drop the handle to remove the entry from the cache @@ -141,7 +144,7 @@ impl Worker { pub async fn process_rpc_blobs( self, block_root: Hash256, - blobs: Vec>>, + blobs: FixedVector>>, T::EthSpec::MaxBlobsPerBlock>, seen_timestamp: Duration, process_type: BlockProcessType, ) { @@ -158,9 +161,10 @@ impl Worker { .await; // Sync handles these results - self.send_sync_message(SyncMessage::BlockOrBlobProcessed { + self.send_sync_message(SyncMessage::BlockPartProcessed { process_type, result: result.into(), + response_type: ResponseType::Blobs, }); } diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index e1d21cca62..af105d3fe5 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -1,49 +1,36 @@ -use std::collections::hash_map::Entry; -use std::collections::{HashMap, HashSet}; -use std::sync::Arc; -use std::thread::sleep; -use std::time::Duration; - -use beacon_chain::blob_verification::BlockWrapper; -use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock}; +use beacon_chain::blob_verification::{AsBlock, BlockWrapper, MaybeAvailableBlock}; use beacon_chain::data_availability_checker::DataAvailabilityChecker; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; -use fnv::FnvHashMap; -use itertools::Itertools; -use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode}; +use lighthouse_network::rpc::RPCError; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; use slog::{debug, error, trace, warn, Logger}; use smallvec::SmallVec; +use ssz_types::FixedVector; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; use store::Hash256; -use types::blob_sidecar::BlobIdentifier; -use types::{BlobSidecar, SignedBeaconBlock}; +use types::{BlobSidecar, SignedBeaconBlock, Slot}; -use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; -use crate::metrics; -use crate::sync::block_lookups::parent_lookup::{ParentRequest, RequestResult}; -use crate::sync::block_lookups::single_block_lookup::SingleBlobsRequest; -use crate::sync::network_context::BlockOrBlob; - -use self::parent_lookup::PARENT_FAIL_TOLERANCE; -use self::{ - parent_lookup::{ParentLookup, VerifyError}, - single_block_lookup::SingleBlockLookup, -}; - -use super::manager::BlockOrBlobProcessResult; +use self::parent_lookup::{LookupDownloadStatus, PARENT_FAIL_TOLERANCE}; +use self::parent_lookup::{ParentLookup, ParentVerifyError}; +use self::single_block_lookup::SingleBlockLookup; +use super::manager::BlockPartProcessingResult; use super::BatchProcessResult; use super::{ manager::{BlockProcessType, Id}, network_context::SyncNetworkContext, }; +use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; +use crate::metrics; mod parent_lookup; mod single_block_lookup; #[cfg(test)] mod tests; -pub type DownlodedBlocks = (Hash256, MaybeAvailableBlock); +pub type DownloadedBlocks = (Hash256, BlockWrapper); pub type RootBlockTuple = (Hash256, Arc>); const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; @@ -75,9 +62,27 @@ pub(crate) struct BlockLookups { log: Logger, } -// 1. on a completed single block lookup or single blob lookup, don't send for processing if a parent -// chain is being requested or processed -// 2. when a chain is processed, find the child requests and send for processing +#[derive(Debug, PartialEq)] +enum StreamTerminator { + True, + False, +} + +impl From for StreamTerminator { + fn from(value: bool) -> Self { + if value { + StreamTerminator::True + } else { + StreamTerminator::False + } + } +} + +#[derive(Debug)] +pub enum ResponseType { + Block, + Blob, +} impl BlockLookups { pub fn new( @@ -99,7 +104,7 @@ impl BlockLookups { /* Lookup requests */ pub fn search_block(&mut self, hash: Hash256, peer_id: PeerId, cx: &mut SyncNetworkContext) { - self.search_block_with(|| {}, hash, peer_id, cx) + self.search_block_with(|_| {}, hash, peer_id, cx) } /// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is @@ -146,7 +151,8 @@ impl BlockLookups { "block" => %hash ); - let mut single_block_request = SingleBlockLookup::new(hash, peer_id, da_checker); + let mut single_block_request = + SingleBlockLookup::new(hash, peer_id, self.da_checker.clone()); cache_fn(&mut single_block_request); let block_request_id = @@ -193,6 +199,7 @@ impl BlockLookups { /// called in order to find the block's parent. pub fn search_parent( &mut self, + slot: Slot, block_root: Hash256, parent_root: Hash256, peer_id: PeerId, @@ -201,7 +208,7 @@ impl BlockLookups { // If this block or it's parent is part of a known failed chain, ignore it. if self.failed_chains.contains(&parent_root) || self.failed_chains.contains(&block_root) { debug!(self.log, "Block is from a past failed chain. Dropping"; - "block_root" => ?block_root, "block_slot" => block.slot()); + "block_root" => ?block_root, "block_slot" => slot); return; } @@ -224,7 +231,8 @@ impl BlockLookups { return; } - let parent_lookup = ParentLookup::new(block_root, peer_id, self.da_checker.clone()); + let parent_lookup = + ParentLookup::new(block_root, parent_root, peer_id, self.da_checker.clone()); self.request_parent_block_and_blobs(parent_lookup, cx); } @@ -238,57 +246,63 @@ impl BlockLookups { seen_timestamp: Duration, cx: &mut SyncNetworkContext, ) { - let mut request = match self.single_block_lookups.entry(id) { - Entry::Occupied(req) => req, - Entry::Vacant(_) => { - if block.is_some() { - debug!( - self.log, - "Block returned for single block lookup not present" - ); - } - return; - } + let stream_terminator = block.is_none().into(); + + let Some((request_id_ref, request_ref)) = self.find_single_lookup_request(id, stream_terminator, ResponseType::Block) else { + return; }; - match request.get_mut().verify_response(block) { - Ok(Some((block_root, block))) => { - //TODO(sean) only send for processing if we don't have parent requests - // for this block + if let Err(error) = request_ref.verify_block(block).and_then(|root_block_opt| { + if let Some((root, block)) = root_block_opt { + // Only send for processing if we don't have parent requests that were triggered by + // this block. + let triggered_parent_request = self + .parent_lookups + .iter() + .any(|lookup| lookup.chain_hash() == block_root); - // This is the correct block, send it for processing - if self - .send_block_for_processing( - block_root, - block, - seen_timestamp, - BlockProcessType::SingleBlock { id }, - cx, - ) - .is_err() - { - // Remove to avoid inconsistencies - self.single_block_lookups.remove(&id); - } - } - Ok(None) => { - // request finished correctly, it will be removed after the block is processed. - } - Err(error) => { - let msg: &str = error.into(); - cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); - // Remove the request, if it can be retried it will be added with a new id. - let mut req = request.remove(); - - debug!(self.log, "Single block lookup failed"; - "peer_id" => %peer_id, "error" => msg, "block_root" => %req.requested_thing); - // try the request again if possible - if let Ok((peer_id, request)) = req.make_request() { - if let Ok(id) = cx.single_block_lookup_request(peer_id, request) { - self.single_block_lookups.insert(id, req); + if triggered_parent_request { + // The lookup status here is irrelevant because we wait until the parent chain + // is complete before processing the block. + let _ = request_ref.add_block(root, block)?; + } else { + // This is the correct block, send it for processing + if self + .send_block_for_processing( + block_root, + BlockWrapper::Block(block), + seen_timestamp, + BlockProcessType::SingleBlock { id }, + cx, + ) + .is_err() + { + // Remove to avoid inconsistencies + self.single_block_lookups + .retain(|(block_id, _, _)| block_id != &Some(id)); } } } + + Ok(()) + }) { + let msg: &str = error.into(); + cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); + + debug!(self.log, "Single block lookup failed"; + "peer_id" => %peer_id, "error" => msg, "block_root" => %request.requested_block_root); + // try the request again if possible + if let Ok((peer_id, request)) = request_ref.request_block() { + if let Ok(id) = cx.single_block_lookup_request(peer_id, request) { + *request_id_ref = id; + } else { + self.single_block_lookups + .retain(|(block_id, _, _)| block_id != &Some(id)); + } + } else { + self.single_block_lookups + .retain(|(block_id, _, _)| block_id != &Some(id)); + } } metrics::set_gauge( @@ -305,57 +319,62 @@ impl BlockLookups { seen_timestamp: Duration, cx: &mut SyncNetworkContext, ) { - let mut request = match self.single_block_lookups.entry(id) { - Entry::Occupied(req) => req, - Entry::Vacant(_) => { - if blob.is_some() { - debug!( - self.log, - "Block returned for single blob lookup not present" - ); - } - return; - } + let stream_terminator = blob.is_none().into(); + + let Some((request_id_ref, request_ref)) = self.find_single_lookup_request(id, stream_terminator, ResponseType::Blob) else { + return; }; - match request.get_mut().verify_blob(blob) { - Ok(Some((block_root, blobs))) => { - //TODO(sean) only send for processing if we don't have parent requests trigger - // for this block + if let Err(error) = request_ref.verify_blob(blob).and_then(|root_blobs_opt| { + if let Some((block_root, blobs)) = root_blobs_opt { + // Only send for processing if we don't have parent requests that were triggered by + // this block. + let triggered_parent_request = self + .parent_lookups + .iter() + .any(|lookup| lookup.chain_hash() == block_root); - // This is the correct block, send it for processing - if self - .send_block_for_processing( - block_root, - block, - seen_timestamp, - BlockProcessType::SingleBlock { id }, - cx, - ) - .is_err() - { - // Remove to avoid inconsistencies - self.single_block_lookups.remove(&id); - } - } - Ok(None) => { - // request finished correctly, it will be removed after the block is processed. - } - Err(error) => { - let msg: &str = error.into(); - cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); - // Remove the request, if it can be retried it will be added with a new id. - let mut req = request.remove(); - - debug!(self.log, "Single block lookup failed"; - "peer_id" => %peer_id, "error" => msg, "block_root" => %req.requested_thing); - // try the request again if possible - if let Ok((peer_id, request)) = req.make_request() { - if let Ok(id) = cx.single_block_lookup_request(peer_id, request) { - self.single_block_lookups.insert(id, req); + if triggered_parent_request { + // The lookup status here is irrelevant because we wait until the parent chain + // is complete before processing the block. + let _ = request_ref.add_block(root, block)?; + } else { + // These are the correct blobs, send them for processing + if self + .send_blobs_for_processing( + block_root, + blobs, + seen_timestamp, + BlockProcessType::SingleBlock { id }, + cx, + ) + .is_err() + { + // Remove to avoid inconsistencies + self.single_block_lookups + .retain(|(_, blob_id, _)| blob_id != &Some(id)); } } } + Ok(()) + }) { + let msg: &str = error.into(); + cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); + + debug!(self.log, "Single block lookup failed"; + "peer_id" => %peer_id, "error" => msg, "block_root" => %request.requested_block_root); + // try the request again if possible + if let Ok((peer_id, request)) = request_ref.request_blobs() { + if let Ok(id) = cx.single_blobs_lookup_request(peer_id, request) { + *request_id_ref = id; + } else { + self.single_block_lookups + .retain(|(_, blob_id, _)| blob_id != &Some(id)); + } + } else { + self.single_block_lookups + .retain(|(_, blob_id, _)| blob_id != &Some(id)); + } } metrics::set_gauge( @@ -364,6 +383,50 @@ impl BlockLookups { ); } + fn find_single_lookup_request( + &mut self, + target_id: Id, + stream_terminator: StreamTerminator, + response_type: ResponseType, + ) -> Option<( + &mut Id, + &mut SingleBlockLookup, + )> { + let lookup: Option<( + &mut Id, + &mut SingleBlockLookup, + )> = self + .single_block_lookups + .iter_mut() + .find_map(|(block_id_opt, blob_id_opt, req)| { + let id_opt = match response_type { + ResponseType::Block => block_id_opt, + ResponseType::Blob => blob_id_opt, + }; + if let Some(lookup_id) = id_opt { + if lookup_id == target_id { + Some((lookup_id, req)) + } + } + None + }); + + let (id_ref, request) = match lookup { + Some((id_ref, req)) => (id_ref, req), + None => { + if matches!(StreamTerminator::False, stream_terminator) { + debug!( + self.log, + "Block returned for single block lookup not present"; + "response_type" => response_type, + ); + } + return None; + } + }; + Some((id_ref, request)) + } + /// Process a response received from a parent lookup request. pub fn parent_lookup_response( &mut self, @@ -388,9 +451,9 @@ impl BlockLookups { match parent_lookup.verify_block(block, &mut self.failed_chains) { Ok(Some((block_root, block))) => { - let res = parent_lookup.add_block(block_root, block); - match res { - RequestResult::Process(wrapper) => { + let process_or_search = parent_lookup.add_block(block_root, block); + match process_or_search { + LookupDownloadStatus::Process(wrapper) => { let chain_hash = parent_lookup.chain_hash(); if self .send_block_for_processing( @@ -405,7 +468,7 @@ impl BlockLookups { self.parent_lookups.push(parent_lookup) } } - RequestResult::SearchBlock(block_root) => { + LookupDownloadStatus::SearchBlock(block_root) => { self.search_block(block_root, peer_id, cx); self.parent_lookups.push(parent_lookup) } @@ -417,9 +480,13 @@ impl BlockLookups { self.parent_lookups.push(parent_lookup); } Err(e) => match e { - VerifyError::RootMismatch - | VerifyError::NoBlockReturned - | VerifyError::ExtraBlocksReturned => { + ParentVerifyError::RootMismatch + | ParentVerifyError::NoBlockReturned + | ParentVerifyError::ExtraBlocksReturned + | ParentVerifyError::UnrequestedBlobId + | ParentVerifyError::ExtraBlobsReturned + | ParentVerifyError::InvalidIndex(_) + | ParentVerifyError::AvailabilityCheck(_) => { let e = e.into(); warn!(self.log, "Peer sent invalid response to parent request."; "peer_id" => %peer_id, "reason" => %e); @@ -431,7 +498,7 @@ impl BlockLookups { // We try again if possible. self.request_parent_block(parent_lookup, cx); } - VerifyError::PreviousFailure { parent_root } => { + ParentVerifyError::PreviousFailure { parent_root } => { debug!( self.log, "Parent chain ignored due to past failure"; @@ -477,15 +544,10 @@ impl BlockLookups { }; match parent_lookup.verify_blob(blob, &mut self.failed_chains) { - Ok(Some(blobs)) => { - let block_root = blobs - .first() - .map(|blob| blob.block_root) - .unwrap_or(parent_lookup.chain_hash()); - let processed_or_search = parent_lookup.add_blobs(blobs); - + Ok(Some((block_root, blobs))) => { + let processed_or_search = parent_lookup.add_blobs(block_root, blobs); match processed_or_search { - RequestResult::Process(wrapper) => { + LookupDownloadStatus::Process(wrapper) => { let chain_hash = parent_lookup.chain_hash(); if self .send_block_for_processing( @@ -500,10 +562,14 @@ impl BlockLookups { self.parent_lookups.push(parent_lookup) } } - RequestResult::SearchBlock(block_root) => { + LookupDownloadStatus::SearchBlock(block_root) => { self.search_block(block_root, peer_id, cx); self.parent_lookups.push(parent_lookup) } + LookupDownloadStatus::Err(e) => { + warn!(self.log, "Peer sent invalid response to parent request."; + "peer_id" => %peer_id, "reason" => %e); + } } } Ok(None) => { @@ -512,9 +578,13 @@ impl BlockLookups { self.parent_lookups.push(parent_lookup); } Err(e) => match e.into() { - VerifyError::RootMismatch - | VerifyError::NoBlockReturned - | VerifyError::ExtraBlocksReturned => { + ParentVerifyError::RootMismatch + | ParentVerifyError::NoBlockReturned + | ParentVerifyError::ExtraBlocksReturned + | ParentVerifyError::UnrequestedBlobId + | ParentVerifyError::ExtraBlobsReturned + | ParentVerifyError::InvalidIndex(_) + | ParentVerifyError::AvailabilityCheck(_) => { let e = e.into(); warn!(self.log, "Peer sent invalid response to parent request."; "peer_id" => %peer_id, "reason" => %e); @@ -526,7 +596,7 @@ impl BlockLookups { // We try again if possible. self.request_parent_blob(parent_lookup, cx); } - VerifyError::PreviousFailure { parent_root } => { + ParentVerifyError::PreviousFailure { parent_root } => { debug!( self.log, "Parent chain ignored due to past failure"; @@ -552,58 +622,83 @@ impl BlockLookups { /* Error responses */ - #[allow(clippy::needless_collect)] // false positive pub fn peer_disconnected(&mut self, peer_id: &PeerId, cx: &mut SyncNetworkContext) { - /* Check disconnection for single block lookups */ - // better written after https://github.com/rust-lang/rust/issues/59618 - let remove_retry_ids: Vec = self - .single_block_lookups - .iter_mut() - .filter_map(|(id, req)| { - if req.check_peer_disconnected(peer_id).is_err() { - Some(*id) - } else { - None - } - }) - .collect(); - - for mut req in remove_retry_ids - .into_iter() - .map(|id| self.single_block_lookups.remove(&id).unwrap()) - .collect::>() - { - // retry the request - match req.make_request() { - Ok((peer_id, block_request)) => { - if let Ok(request_id) = cx.single_block_lookup_request(peer_id, block_request) { - self.single_block_lookups.insert(request_id, req); + self.single_block_lookups + .retain_mut(|(block_id, blob_id, req)| { + if req + .block_request_state + .check_peer_disconnected(peer_id) + .is_err() + { + // retry the request + match req.request_block() { + Ok(Some((peer_id, block_request))) => { + if let Ok(request_id) = + cx.single_block_lookup_request(peer_id, block_request) + { + *block_id = Some(request_id); + return true; + } + } + Ok(None) => { + // We've already successfully downloaded the block, we may be waiting + // for blobs, so don't drop the lookup. + return true; + } + Err(e) => { + trace!( + self.log, + "Single block request failed on peer disconnection"; + "block_root" => %req.requested_block_root, + "peer_id" => %peer_id, + "reason" => <&str>::from(e), + ); + } } } - Err(e) => { - trace!( - self.log, - "Single block request failed on peer disconnection"; - "block_root" => %req.requested_thing, - "peer_id" => %peer_id, - "reason" => <&str>::from(e), - ); + if req + .blob_request_state + .check_peer_disconnected(peer_id) + .is_err() + { + // retry the request + match req.request_blobs() { + Ok(Some((peer_id, blobs_request))) => { + if let Ok(request_id) = + cx.single_blobs_lookup_request(peer_id, blobs_request) + { + *blob_id = Some(request_id); + return true; + } + } + Ok(None) => { + // We've already successfully downloaded the blobs, we may be waiting + // for block, so don't drop the lookup. + return true; + } + Err(e) => { + trace!( + self.log, + "Single blobs request failed on peer disconnection"; + "block_root" => %req.requested_block_root, + "peer_id" => %peer_id, + "reason" => <&str>::from(e), + ); + } + } } - } - } + false + }); /* Check disconnection for parent lookups */ - while let Some(pos) = self - .parent_lookups - .iter_mut() - .position(|req| req.check_block_peer_disconnected(peer_id).is_err()) - { + while let Some(pos) = self.parent_lookups.iter_mut().position(|req| { + req.check_block_peer_disconnected(peer_id).is_err() + && req.check_blob_peer_disconnected(peer_id).is_err() + }) { let parent_lookup = self.parent_lookups.remove(pos); trace!(self.log, "Parent lookup's peer disconnected"; &parent_lookup); self.request_parent_block_and_blobs(parent_lookup, cx); } - - //TODO(sean) add lookups for blobs } /// An RPC error has occurred during a parent lookup. This function handles this case. @@ -614,18 +709,34 @@ impl BlockLookups { cx: &mut SyncNetworkContext, error: RPCError, ) { + // check if there's a pending blob response when deciding whether to drop + if let Some(pos) = self .parent_lookups .iter() .position(|request| request.pending_block_response(id)) { let mut parent_lookup = self.parent_lookups.remove(pos); - parent_lookup.block_download_failed(id); - trace!(self.log, "Parent lookup request failed"; &parent_lookup); + parent_lookup.block_download_failed(); + trace!(self.log, "Parent lookup block request failed"; &parent_lookup); self.request_parent_block(parent_lookup, cx); } else { - return debug!(self.log, "RPC failure for a parent lookup request that was not found"; "peer_id" => %peer_id); + return debug!(self.log, "RPC failure for a block parent lookup request that was not found"; "peer_id" => %peer_id); + }; + + if let Some(pos) = self + .parent_lookups + .iter() + .position(|request| request.pending_blob_response(id)) + { + let mut parent_lookup = self.parent_lookups.remove(pos); + parent_lookup.blob_download_failed(); + trace!(self.log, "Parent lookup blobs request failed"; &parent_lookup); + + self.request_parent_blob(parent_lookup, cx); + } else { + return debug!(self.log, "RPC failure for a blobs parent lookup request that was not found"; "peer_id" => %peer_id); }; metrics::set_gauge( &metrics::SYNC_PARENT_BLOCK_LOOKUPS, @@ -634,15 +745,51 @@ impl BlockLookups { } pub fn single_block_lookup_failed(&mut self, id: Id, cx: &mut SyncNetworkContext) { - if let Some(mut request) = self.single_block_lookups.remove(&id) { - request.register_failure_downloading(); - trace!(self.log, "Single block lookup failed"; "block" => %request.requested_thing); - if let Ok((peer_id, block_request)) = request.make_request() { - if let Ok(request_id) = cx.single_block_lookup_request(peer_id, block_request) { - self.single_block_lookups.insert(request_id, request); + self.single_block_lookups.retain_mut(|(block_id, blob_id, req)|{ + if &Some(id) == block_id { + req.block_request_state.register_failure_downloading(); + trace!(self.log, "Single block lookup failed"; "block" => %request.requested_block_root); + match req.request_block() { + Ok(Some((peer_id, block_request))) => { + if let Ok(request_id) = cx.single_block_lookup_request(peer_id, block_request) { + *block_id = Some(request_id); + return true + } + }, + Ok(None) => return true, + Err(e) => { + trace!( + self.log, + "Single block request failed"; + "block_root" => %req.requested_block_root, + "reason" => <&str>::from(e), + ); + } + } + } + if &Some(id) == blob_id { + req.blob_request_state.register_failure_downloading(); + trace!(self.log, "Single blob lookup failed"; "block" => %request.requested_block_root); + match req.request_blobs() { + Ok(Some((peer_id, blob_request))) => { + if let Ok(request_id) = cx.single_blobs_lookup_request(peer_id, blob_request) { + *blob_id = Some(request_id); + return true + } + }, + Ok(None) => return true, + Err(e) => { + trace!( + self.log, + "Single blob request failed"; + "block_root" => %req.requested_block_root, + "reason" => <&str>::from(e), + ); + } } } - } + false + }); metrics::set_gauge( &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, @@ -655,10 +802,28 @@ impl BlockLookups { pub fn single_block_processed( &mut self, id: Id, - result: BlockOrBlobProcessResult, + result: BlockPartProcessingResult, + response_type: ResponseType, cx: &mut SyncNetworkContext, ) { - let mut req = match self.single_block_lookups.remove(&id) { + let (index, req_id, req) = match self.single_block_lookups.iter_mut().enumerate().find_map( + |(index, (block_id, blob_id, req))| match response_type { + ResponseType::Block => { + if block_id == &Some(id) { + Some((index, block_id, req)) + } else { + None + } + } + ResponseType::Blob => { + if blob_id == &Some(id) { + Some((index, blob_id, req)) + } else { + None + } + } + }, + ) { Some(req) => req, None => { return debug!( @@ -668,22 +833,30 @@ impl BlockLookups { } }; - let root = req.requested_thing; - let peer_id = match req.processing_peer() { - Ok(peer) => peer, - Err(_) => return, + let root = req.requested_block_root; + let peer_id = match response_type { + ResponseType::Block => match req.block_request_state.processing_peer() { + Ok(peer) => peer, + Err(_) => return, + }, + ResponseType::Blob => match req.blob_request_state.processing_peer() { + Ok(peer) => peer, + Err(_) => return, + }, }; - match result { - BlockOrBlobProcessResult::Ok(status) => match status { + let remove = match result { + BlockPartProcessingResult::Ok(status) => match status { AvailabilityProcessingStatus::Imported(hash) => { trace!(self.log, "Single block processing succeeded"; "block" => %root); + true } AvailabilityProcessingStatus::MissingParts(block_root) => { self.search_block(block_root, peer_id, cx); + false } }, - BlockOrBlobProcessResult::Ignored => { + BlockPartProcessingResult::Ignored => { // Beacon processor signalled to ignore the block processing result. // This implies that the cpu is overloaded. Drop the request. warn!( @@ -691,19 +864,24 @@ impl BlockLookups { "Single block processing was ignored, cpu might be overloaded"; "action" => "dropping single block request" ); + true } - BlockOrBlobProcessResult::Err(e) => { + BlockPartProcessingResult::Err(e) => { trace!(self.log, "Single block processing failed"; "block" => %root, "error" => %e); match e { BlockError::BlockIsAlreadyKnown => { // No error here + true } BlockError::BeaconChainError(e) => { // Internal error error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e); + true } BlockError::ParentUnknown(block) => { - self.search_parent(root, block.parent_root(), peer_id, cx); + self.search_parent(block.slot(), root, block.parent_root(), peer_id, cx); + //TODO(sean) - handle request for parts of this block + false } ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { // These errors indicate that the execution layer is offline @@ -714,6 +892,8 @@ impl BlockLookups { "root" => %root, "error" => ?e ); + //TODO(sean) is this right? + true } other => { warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id); @@ -723,17 +903,49 @@ impl BlockLookups { "single_block_failure", ); // Try it again if possible. - req.register_failure_processing(); - if let Ok((peer_id, request)) = req.make_request() { - if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request) - { - // insert with the new id - self.single_block_lookups.insert(request_id, req); + match response_type { + ResponseType::Block => { + req.block_request_state.register_failure_processing(); + match req.request_block() { + Ok(Some((peer_id, requeest))) => { + if let Ok(request_id) = + cx.single_block_lookup_request(peer_id, request) + { + *req_id = Some(request_id); + false + } else { + true + } + } + Ok(None) => false, + Err(_) => true, + } + } + ResponseType::Blob => { + req.blob_request_state.register_failure_processing(); + match req.request_blobs() { + Ok(Some((peer_id, request))) => { + if let Ok(request_id) = + cx.single_blobs_lookup_request(peer_id, request) + { + *req_id = Some(request_id); + false + } else { + true + } + } + Ok(None) => false, + Err(_) => true, + } } } } } } + }; + + if remove { + self.single_block_lookups.remove(index); } metrics::set_gauge( @@ -745,25 +957,39 @@ impl BlockLookups { pub fn parent_block_processed( &mut self, chain_hash: Hash256, - result: BlockOrBlobProcessResult, + result: BlockPartProcessingResult, + response_type: ResponseType, cx: &mut SyncNetworkContext, ) { - let (mut parent_lookup, peer_id) = if let Some((pos, peer)) = self + let index = self .parent_lookups .iter() .enumerate() - .find_map(|(pos, request)| { - request - .get_block_processing_peer(chain_hash) - .map(|peer| (pos, peer)) - }) { - (self.parent_lookups.remove(pos), peer) - } else { + .find(|(index, _)| lookup.chain_hash() == chain_hash) + .map(|(index, _)| index); + + let Some(mut parent_lookup) = index.map(|index|self.parent_lookups.remove(index)) else { return debug!(self.log, "Process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash); }; + let peer_id = match response_type { + ResponseType::Block => parent_lookup + .current_parent_request + .block_request_state + .processing_peer(), + ResponseType::Blob => parent_lookup + .current_parent_request + .blob_request_state + .processing_peer(), + }; + + let peer_id = match peer_id { + Ok(peer) => peer, + Err(_) => return, + }; + match &result { - BlockOrBlobProcessResult::Ok(status) => match status { + BlockPartProcessingResult::Ok(status) => match status { AvailabilityProcessingStatus::Imported(hash) => { trace!(self.log, "Parent block processing succeeded"; &parent_lookup) } @@ -771,10 +997,10 @@ impl BlockLookups { trace!(self.log, "Parent missing parts, triggering single block lookup "; &parent_lookup) } }, - BlockOrBlobProcessResult::Err(e) => { + BlockPartProcessingResult::Err(e) => { trace!(self.log, "Parent block processing failed"; &parent_lookup, "error" => %e) } - BlockOrBlobProcessResult::Ignored => { + BlockPartProcessingResult::Ignored => { trace!( self.log, "Parent block processing job was ignored"; @@ -785,19 +1011,17 @@ impl BlockLookups { } match result { - BlockOrBlobProcessResult::Ok(AvailabilityProcessingStatus::MissingParts( + BlockPartProcessingResult::Ok(AvailabilityProcessingStatus::MissingParts( block_root, )) => { self.search_block(block_root, peer_id, cx); } - BlockOrBlobProcessResult::Err(BlockError::ParentUnknown(block)) => { - parent_lookup.add_block(block); - // `ParentUnknown` triggered by a parent block lookup should always have all blobs - // so we don't re-request blobs for the current block. + BlockPartProcessingResult::Err(BlockError::ParentUnknown(block)) => { + parent_lookup.add_block_wrapper(block); self.request_parent_block_and_blobs(parent_lookup, cx); } - BlockOrBlobProcessResult::Ok(AvailabilityProcessingStatus::Imported(_)) - | BlockOrBlobProcessResult::Err(BlockError::BlockIsAlreadyKnown { .. }) => { + BlockPartProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_)) + | BlockPartProcessingResult::Err(BlockError::BlockIsAlreadyKnown { .. }) => { // Check if the beacon processor is available let beacon_processor_send = match cx.processor_channel_if_enabled() { Some(channel) => channel, @@ -809,7 +1033,7 @@ impl BlockLookups { ); } }; - let (chain_hash, blocks, hashes, block_request, blob_request) = + let (chain_hash, blocks, hashes, block_request) = parent_lookup.parts_for_processing(); let process_id = ChainSegmentProcessId::ParentLookup(chain_hash); @@ -829,7 +1053,7 @@ impl BlockLookups { } } } - ref e @ BlockOrBlobProcessResult::Err(BlockError::ExecutionPayloadError(ref epe)) + ref e @ BlockPartProcessingResult::Err(BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => { // These errors indicate that the execution layer is offline @@ -841,7 +1065,7 @@ impl BlockLookups { "error" => ?e ); } - BlockOrBlobProcessResult::Err(outcome) => { + BlockPartProcessingResult::Err(outcome) => { // all else we consider the chain a failure and downvote the peer that sent // us the last block warn!( @@ -856,10 +1080,18 @@ impl BlockLookups { cx.report_peer(peer_id, PeerAction::MidToleranceError, "parent_request_err"); // Try again if possible - parent_lookup.block_processing_failed(); - self.request_parent_block(parent_lookup, cx); + match response_type { + ResponseType::Block => { + parent_lookup.block_processing_failed(); + self.request_parent_block(parent_lookup, cx); + } + ResponseType::Blob => { + parent_lookup.blob_processing_failed(); + self.request_parent_blob(parent_lookup, cx); + } + } } - BlockOrBlobProcessResult::Ignored => { + BlockPartProcessingResult::Ignored => { // Beacon processor signalled to ignore the block processing result. // This implies that the cpu is overloaded. Drop the request. warn!( @@ -946,13 +1178,43 @@ impl BlockLookups { } } + fn send_blobs_for_processing( + &self, + block_root: Hash256, + blobs: FixedVector>>, T::EthSpec::MaxBlobsPerBlock>, + seen_timestamp: Duration, + id: BlockProcessType, + cx: &mut SyncNetworkContext, + ) -> Result<(), ()> { + match cx.processor_channel_if_enabled() { + Some(beacon_processor_send) => { + trace!(self.log, "Sending blobs for processing"; "block" => ?block_root, "process" => ?process_type); + let event = WorkEvent::rpc_blobs(block_root, blobs, duration, process_type); + if let Err(e) = beacon_processor_send.try_send(event) { + error!( + self.log, + "Failed to send sync blobs to processor"; + "error" => ?e + ); + Err(()) + } else { + Ok(()) + } + } + None => { + trace!(self.log, "Dropping blobs ready for processing. Beacon processor not available"; "block_root" => %block_root); + Err(()) + } + } + } + fn request_parent_block( &mut self, mut parent_lookup: ParentLookup, cx: &mut SyncNetworkContext, ) { let response = parent_lookup.request_parent_block(cx); - self.handle_response(parent_lookup, response); + self.handle_response(parent_lookup, cx, response); } fn request_parent_blob( @@ -961,7 +1223,7 @@ impl BlockLookups { cx: &mut SyncNetworkContext, ) { let response = parent_lookup.request_parent_blobs(cx); - self.handle_response(parent_lookup, response); + self.handle_response(parent_lookup, cx, response); } fn request_parent_block_and_blobs( @@ -971,7 +1233,7 @@ impl BlockLookups { ) { let response = parent_lookup .request_parent_block(cx) - .and_then(|| parent_lookup.request_parent_blobs(cx)); + .and_then(|_| parent_lookup.request_parent_blobs(cx)); self.handle_response(parent_lookup, cx, response); } @@ -1030,7 +1292,9 @@ impl BlockLookups { /// Drops all the single block requests and returns how many requests were dropped. pub fn drop_single_block_requests(&mut self) -> usize { - self.single_block_lookups.drain().len() + let requests_to_drop = self.single_block_lookups.len(); + self.single_block_lookups.clear(); + requests_to_drop } /// Drops all the parent chain requests and returns how many requests were dropped. diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 9b41b1de72..49e5b1826e 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -1,17 +1,17 @@ -use super::DownlodedBlocks; -use crate::sync::block_lookups::single_block_lookup::{RequestableThing, SingleBlobsRequest}; -use crate::sync::block_lookups::RootBlockTuple; -use crate::sync::manager::BlockProcessType; +use super::single_block_lookup::{LookupRequestError, LookupVerifyError, SingleBlockLookup}; +use super::DownloadedBlocks; +use crate::sync::block_lookups::{single_block_lookup, RootBlockTuple}; use crate::sync::{ manager::{Id, SLOT_IMPORT_TOLERANCE}, network_context::SyncNetworkContext, }; use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock}; -use beacon_chain::data_availability_checker::{AvailableBlock, DataAvailabilityChecker}; +use beacon_chain::data_availability_checker::AvailabilityCheckError; +use beacon_chain::data_availability_checker::DataAvailabilityChecker; use beacon_chain::BeaconChainTypes; -use lighthouse_network::libp2p::core::either::EitherName::A; use lighthouse_network::PeerId; +use ssz_types::FixedVector; use std::iter; use std::sync::Arc; use store::Hash256; @@ -19,8 +19,6 @@ use strum::IntoStaticStr; use types::blob_sidecar::BlobIdentifier; use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; -use super::single_block_lookup::{self, SingleBlockLookup}; - /// How many attempts we try to find a parent of a block before we give up trying. pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5; /// The maximum depth we will search for a parent block. In principle we should have sync'd any @@ -33,7 +31,7 @@ pub(crate) struct ParentLookup { /// The root of the block triggering this parent request. chain_hash: Hash256, /// The blocks that have currently been downloaded. - downloaded_blocks: Vec>, + downloaded_blocks: Vec>, /// Request of the last parent. pub current_parent_request: SingleBlockLookup, /// Id of the last parent request. @@ -42,11 +40,15 @@ pub(crate) struct ParentLookup { } #[derive(Debug, PartialEq, Eq, IntoStaticStr)] -pub enum VerifyError { +pub enum ParentVerifyError { RootMismatch, NoBlockReturned, ExtraBlocksReturned, + UnrequestedBlobId, + ExtraBlobsReturned, + InvalidIndex(u64), PreviousFailure { parent_root: Hash256 }, + AvailabilityCheck(AvailabilityCheckError), } #[derive(Debug, PartialEq, Eq)] @@ -62,7 +64,7 @@ pub enum RequestError { NoPeers, } -pub enum RequestResult { +pub enum LookupDownloadStatus { Process(BlockWrapper), SearchBlock(Hash256), } @@ -76,11 +78,11 @@ impl ParentLookup { pub fn new( block_root: Hash256, + parent_root: Hash256, peer_id: PeerId, da_checker: Arc>, ) -> Self { - let current_parent_request = - SingleBlockLookup::new(block.parent_root(), peer_id, da_checker); + let current_parent_request = SingleBlockLookup::new(parent_root, peer_id, da_checker); Self { chain_hash: block_root, @@ -141,31 +143,61 @@ impl ParentLookup { } pub fn check_block_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> { - self.current_parent_request.check_peer_disconnected(peer_id) + self.current_parent_request + .block_request_state + .check_peer_disconnected(peer_id) } pub fn check_blob_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> { - self.current_parent_blob_request - .map(|mut req| req.check_peer_disconnected(peer_id)) - .unwrap_or_default() + self.current_parent_request + .blob_request_state + .check_peer_disconnected(peer_id) + } + + pub fn add_block_wrapper(&mut self, block: BlockWrapper) { + let next_parent = block.parent_root(); + let current_root = self.current_parent_request.requested_block_root; + + self.downloaded_blocks.push((current_root, block)); + self.current_parent_request.requested_block_root = next_parent; + + let mut blob_ids = Vec::with_capacity(T::EthSpec::max_blobs_per_block()); + for i in 0..T::EthSpec::max_blobs_per_block() { + blob_ids.push(BlobIdentifier { + block_root: current_root, + index: i as u64, + }); + } + + self.current_parent_request.requested_ids = blob_ids; + self.current_parent_request.block_request_state.state = + single_block_lookup::State::AwaitingDownload; + self.current_parent_request.blob_request_state.state = + single_block_lookup::State::AwaitingDownload; + self.current_parent_request_id = None; + self.current_parent_blob_request_id = None; } pub fn add_block( &mut self, block_root: Hash256, block: Arc>, - ) -> RequestResult { + ) -> Result, ParentVerifyError> { self.current_parent_request_id = None; - self.current_parent_request.add_block(block_root, block) + self.current_parent_request + .add_block(block_root, block) + .map_err(Into::into) } pub fn add_blobs( &mut self, block_root: Hash256, - blobs: Vec>>, - ) -> RequestResult { + blobs: FixedVector>>, T::EthSpec::MaxBlobsPerBlock>, + ) -> Result, ParentVerifyError> { self.current_parent_blob_request_id = None; - self.current_parent_request.add_blobs(block_root, blobs) + self.current_parent_request + .add_blobs(block_root, blobs) + .map_err(Into::into) } pub fn pending_block_response(&self, req_id: Id) -> bool { @@ -182,10 +214,9 @@ impl ParentLookup { self, ) -> ( Hash256, - Vec>, + Vec>, Vec, SingleBlockLookup, - Option>, ) { let ParentLookup { chain_hash, @@ -201,13 +232,7 @@ impl ParentLookup { blocks.push(block); hashes.push(hash); } - ( - chain_hash, - blocks, - hashes, - current_parent_request, - current_parent_blob_request, - ) + (chain_hash, blocks, hashes, current_parent_request) } /// Get the parent lookup's chain hash. @@ -216,24 +241,30 @@ impl ParentLookup { } pub fn block_download_failed(&mut self) { - self.current_parent_request.register_failure_downloading(); - self.current_parent_request_id = None; - } - - pub fn block_processing_failed(&mut self) { - self.current_parent_request.register_failure_processing(); + self.current_parent_request + .block_request_state + .register_failure_downloading(); self.current_parent_request_id = None; } pub fn blob_download_failed(&mut self) { - self.current_parent_blob_request - .map(|mut req| req.register_failure_downloading()); + self.current_parent_request + .blob_request_state + .register_failure_downloading(); self.current_parent_blob_request_id = None; } + pub fn block_processing_failed(&mut self) { + self.current_parent_request + .block_request_state + .register_failure_processing(); + self.current_parent_request_id = None; + } + pub fn blob_processing_failed(&mut self) { - self.current_parent_blob_request - .map(|mut req| req.register_failure_processing()); + self.current_parent_request + .blob_request_state + .register_failure_processing(); self.current_parent_blob_request_id = None; } @@ -243,7 +274,7 @@ impl ParentLookup { &mut self, block: Option>>, failed_chains: &mut lru_cache::LRUTimeCache, - ) -> Result>, VerifyError> { + ) -> Result>, ParentVerifyError> { let root_and_block = self.current_parent_request.verify_block(block)?; // check if the parent of this block isn't in the failed cache. If it is, this chain should @@ -257,7 +288,7 @@ impl ParentLookup { .block_request_state .register_failure_downloading(); self.current_parent_request_id = None; - return Err(VerifyError::PreviousFailure { parent_root }); + return Err(ParentVerifyError::PreviousFailure { parent_root }); } } @@ -268,7 +299,14 @@ impl ParentLookup { &mut self, blob: Option>>, failed_chains: &mut lru_cache::LRUTimeCache, - ) -> Result>>>, VerifyError> { + ) -> Result< + Option<( + Hash256, + FixedVector>>, T::EthSpec::MaxBlobsPerBlock>, + )>, + ParentVerifyError, + > { + let block_root = self.current_parent_request.requested_block_root; let blobs = self.current_parent_request.verify_blob(blob)?; // check if the parent of this block isn't in the failed cache. If it is, this chain should @@ -276,23 +314,38 @@ impl ParentLookup { if let Some(parent_root) = blobs .as_ref() .and_then(|blobs| blobs.first()) - .map(|blob| blob.block_parent_root) + .map(|blob| blob.block_parent_root_id) { if failed_chains.contains(&parent_root) { self.current_parent_request .blob_request_state .register_failure_downloading(); self.current_parent_blob_request_id = None; - return Err(VerifyError::PreviousFailure { parent_root }); + return Err(ParentVerifyError::PreviousFailure { parent_root }); } } - Ok(blobs) + Ok((block_root, blobs)) } pub fn get_block_processing_peer(&self, chain_hash: Hash256) -> Option { if self.chain_hash == chain_hash { - return self.current_parent_request.processing_peer().ok(); + return self + .current_parent_request + .block_request_state + .processing_peer() + .ok(); + } + None + } + + pub fn get_blob_processing_peer(&self, chain_hash: Hash256) -> Option { + if self.chain_hash == chain_hash { + return self + .current_parent_request + .blob_request_state + .processing_peer() + .ok(); } None } @@ -310,15 +363,6 @@ impl ParentLookup { self.current_parent_request.used_peers.iter() } - pub fn get_blob_processing_peer(&self, chain_hash: Hash256) -> Option { - if self.chain_hash == chain_hash { - return self - .current_parent_blob_request - .and_then(|req| req.processing_peer().ok()); - } - None - } - #[cfg(test)] pub fn failed_blob_attempts(&self) -> u8 { self.current_parent_blob_request @@ -336,20 +380,24 @@ impl ParentLookup { } } -impl From for VerifyError { - fn from(e: super::single_block_lookup::VerifyError) -> Self { - use super::single_block_lookup::VerifyError as E; +impl From for ParentVerifyError { + fn from(e: LookupVerifyError) -> Self { + use LookupVerifyError as E; match e { - E::RootMismatch => VerifyError::RootMismatch, - E::NoBlockReturned => VerifyError::NoBlockReturned, - E::ExtraBlocksReturned => VerifyError::ExtraBlocksReturned, + E::RootMismatch => ParentVerifyError::RootMismatch, + E::NoBlockReturned => ParentVerifyError::NoBlockReturned, + E::ExtraBlocksReturned => ParentVerifyError::ExtraBlocksReturned, + E::UnrequestedBlobId => ParentVerifyError::UnrequestedBlobId, + E::ExtraBlobsReturned => ParentVerifyError::ExtraBlobsReturned, + E::InvalidIndex(index) => ParentVerifyError::InvalidIndex(index), + E::AvailabilityCheck(e) => ParentVerifyError::AvailabilityCheck(e), } } } -impl From for RequestError { - fn from(e: super::single_block_lookup::LookupRequestError) -> Self { - use super::single_block_lookup::LookupRequestError as E; +impl From for RequestError { + fn from(e: LookupRequestError) -> Self { + use LookupRequestError as E; match e { E::TooManyAttempts { cannot_process } => { RequestError::TooManyAttempts { cannot_process } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index e41dd1d6b0..81f968277b 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,18 +1,13 @@ -use super::DownlodedBlocks; -use crate::sync::block_lookups::parent_lookup::RequestResult; +use crate::sync::block_lookups::parent_lookup::LookupDownloadStatus; use crate::sync::block_lookups::RootBlockTuple; -use crate::sync::manager::BlockProcessType; -use crate::sync::network_context::SyncNetworkContext; +use beacon_chain::blob_verification::AsBlock; use beacon_chain::blob_verification::BlockWrapper; -use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock}; -use beacon_chain::data_availability_checker::{ - AvailabilityCheckError, AvailabilityPendingBlock, DataAvailabilityChecker, -}; +use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker}; use beacon_chain::{get_block_root, BeaconChainTypes}; use lighthouse_network::rpc::methods::BlobsByRootRequest; -use lighthouse_network::{rpc::BlocksByRootRequest, PeerId, Request}; +use lighthouse_network::{rpc::BlocksByRootRequest, PeerId}; use rand::seq::IteratorRandom; -use ssz_types::VariableList; +use ssz_types::{FixedVector, VariableList}; use std::collections::HashSet; use std::sync::Arc; use store::{EthSpec, Hash256}; @@ -23,7 +18,8 @@ use types::{BlobSidecar, SignedBeaconBlock}; pub struct SingleBlockLookup { pub requested_block_root: Hash256, pub requested_ids: Vec, - pub downloaded_blobs: Vec>>>, + pub downloaded_blobs: + FixedVector>>, T::EthSpec::MaxBlobsPerBlock>, pub downloaded_block: Option>>, pub block_request_state: SingleLookupRequestState, pub blob_request_state: SingleLookupRequestState, @@ -55,16 +51,14 @@ pub enum State { } #[derive(Debug, PartialEq, Eq, IntoStaticStr)] -pub enum VerifyError { +pub enum LookupVerifyError { RootMismatch, NoBlockReturned, ExtraBlocksReturned, -} - -#[derive(Debug, PartialEq, Eq, IntoStaticStr)] -pub enum BlobVerifyError { UnrequestedBlobId, ExtraBlobsReturned, + InvalidIndex(u64), + AvailabilityCheck(AvailabilityCheckError), } #[derive(Debug, PartialEq, Eq, IntoStaticStr)] @@ -85,9 +79,9 @@ impl SingleBlockLookup Self { Self { requested_block_root, - requested_ids: vec![], + requested_ids: <_>::default(), downloaded_block: None, - downloaded_blobs: vec![], + downloaded_blobs: <_>::default(), block_request_state: SingleLookupRequestState::new(peer_id), blob_request_state: SingleLookupRequestState::new(peer_id), da_checker, @@ -97,19 +91,25 @@ impl SingleBlockLookup>>, - ) -> RequestResult { - //TODO(sean) smart extend, we don't want dupes - self.downloaded_blobs.extend(blobs); + blobs: FixedVector>>, T::EthSpec::MaxBlobsPerBlock>, + ) -> Result, LookupVerifyError> { + for (index, blob_opt) in self.downloaded_blobs.iter_mut().enumerate() { + if let Some(Some(downloaded_blob)) = blobs.get(index) { + //TODO(sean) should we log a warn if there is already a downloaded blob? + blob_opt = Some(downloaded_blob); + } + } if let Some(block) = self.downloaded_block.as_ref() { - match self.da_checker.zip_block(block_root, block, blobs) { - Ok(wrapper) => RequestResult::Process(wrapper), - Err(AvailabilityCheckError::MissingBlobs) => RequestResult::SearchBlock(block_root), - _ => todo!(), + match self.da_checker.wrap_block(block_root, block, blobs) { + Ok(wrapper) => Ok(LookupDownloadStatus::Process(wrapper)), + Err(AvailabilityCheckError::MissingBlobs) => { + Ok(LookupDownloadStatus::SearchBlock(block_root)) + } + Err(e) => Err(LookupVerifyError::AvailabilityCheck(e)), } } else { - RequestResult::SearchBlock(block_hash) + Ok(LookupDownloadStatus::SearchBlock(block_hash)) } } @@ -117,17 +117,23 @@ impl SingleBlockLookup>, - ) -> RequestResult { - //TODO(sean) check for existing block? - self.downloaded_block = Some(block); + ) -> Result, LookupVerifyError> { + for (index, blob_opt) in self.downloaded_blobs.iter_mut().enumerate() { + if let Some(Some(downloaded_blob)) = blobs.get(index) { + //TODO(sean) should we log a warn if there is already a downloaded blob? + *blob_opt = Some(downloaded_blob); + } + } match self .da_checker - .zip_block(block_root, block, self.downloaded_blobs) + .wrap_block(block_root, block, self.downloaded_blobs) { - Ok(wrapper) => RequestResult::Process(wrapper), - Err(AvailabilityCheckError::MissingBlobs) => RequestResult::SearchBlock(block_root), - _ => todo!(), + Ok(wrapper) => Ok(LookupDownloadStatus::Process(wrapper)), + Err(AvailabilityCheckError::MissingBlobs) => { + Ok(LookupDownloadStatus::SearchBlock(block_root)) + } + Err(e) => LookupVerifyError::AvailabilityCheck(e), } } @@ -135,7 +141,7 @@ impl SingleBlockLookup, - ) -> RequestResult { + ) -> Result, LookupVerifyError> { match block { BlockWrapper::Block(block) => self.add_block(block_root, block), BlockWrapper::BlockAndBlobs(block, blobs) => { @@ -144,12 +150,6 @@ impl SingleBlockLookup RequestResult::Process(wrapper), - Err(AvailabilityCheckError::MissingBlobs) => RequestResult::SearchBlock(block_root), - _ => todo!(), - } } /// Verifies if the received block matches the requested one. @@ -157,11 +157,11 @@ impl SingleBlockLookup>>, - ) -> Result>, VerifyError> { + ) -> Result>, LookupVerifyError> { match self.block_request_state.state { State::AwaitingDownload => { self.block_request_state.register_failure_downloading(); - Err(VerifyError::ExtraBlocksReturned) + Err(LookupVerifyError::ExtraBlocksReturned) } State::Downloading { peer_id } => match block { Some(block) => { @@ -173,7 +173,7 @@ impl SingleBlockLookup SingleBlockLookup { self.register_failure_downloading(); - Err(VerifyError::NoBlockReturned) + Err(LookupVerifyError::NoBlockReturned) } }, State::Processing { peer_id: _ } => match block { Some(_) => { // We sent the block for processing and received an extra block. self.block_request_state.register_failure_downloading(); - Err(VerifyError::ExtraBlocksReturned) + Err(LookupVerifyError::ExtraBlocksReturned) } None => { // This is simply the stream termination and we are already processing the @@ -203,7 +203,10 @@ impl SingleBlockLookup>>, - ) -> Result>>>, BlobVerifyError> { + ) -> Result< + Option>>, T::EthSpec::MaxBlobsPerBlock>>, + BlobVerifyError, + > { match self.block_request_state.state { State::AwaitingDownload => { self.blob_request_state.register_failure_downloading(); @@ -216,9 +219,13 @@ impl SingleBlockLookup { diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index cb8c1af46e..07d0b77423 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -607,7 +607,7 @@ fn test_single_block_lookup_ignored_response() { // after processing. bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); // Send an Ignored response, the request should be dropped - bl.single_block_processed(id, BlockOrBlobProcessResult::Ignored, &mut cx); + bl.single_block_processed(id, BlockPartProcessingResult::Ignored, &mut cx); rig.expect_empty_network(); assert_eq!(bl.single_block_lookups.len(), 0); } @@ -631,7 +631,7 @@ fn test_parent_lookup_ignored_response() { rig.expect_empty_network(); // Return an Ignored result. The request should be dropped - bl.parent_block_processed(chain_hash, BlockOrBlobProcessResult::Ignored, &mut cx); + bl.parent_block_processed(chain_hash, BlockPartProcessingResult::Ignored, &mut cx); rig.expect_empty_network(); assert_eq!(bl.parent_lookups.len(), 0); } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index a77fd233fc..0ab96e3141 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -41,6 +41,7 @@ use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; +use crate::sync::block_lookups::ResponseType; use crate::sync::range_sync::ByRangeRequestType; use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock}; @@ -147,9 +148,10 @@ pub enum SyncMessage { }, /// Block processed - BlockOrBlobProcessed { + BlockPartProcessed { process_type: BlockProcessType, - result: BlockOrBlobProcessResult, + result: BlockPartProcessingResult, + response_type: ResponseType, }, } @@ -161,7 +163,7 @@ pub enum BlockProcessType { } #[derive(Debug)] -pub enum BlockOrBlobProcessResult { +pub enum BlockPartProcessingResult { Ok(AvailabilityProcessingStatus), Err(BlockError), Ignored, @@ -649,9 +651,24 @@ impl SyncManager { { let parent_root = block.parent_root(); //TODO(sean) what about early blocks + let slot = match self.chain.slot_clock.now() { + Some(slot) => slot, + None => { + error!( + self.log, + "Could not read slot clock, dropping unknown block message" + ); + return; + } + }; + if block.slot() == self.chain.slot_clock.now() { - self.delayed_lookups - .send(SyncMessage::UnknownBlock(peer_id, block, block_root)); + if let Err(e) = self + .delayed_lookups + .send(SyncMessage::UnknownBlock(peer_id, block, block_root)) + { + warn!(self.log, "Delayed lookups receiver dropped for block"; "block_root" => block_hash); + } } else { self.block_lookups.search_current_unknown_parent( block_root, @@ -661,6 +678,7 @@ impl SyncManager { ); } self.block_lookups.search_parent( + block.slot(), block_root, parent_root, peer_id, @@ -679,11 +697,27 @@ impl SyncManager { // If we are not synced, ignore this block. if self.synced_and_connected(&peer_id) { //TODO(sean) what about early gossip messages? - if Some(slot) == self.chain.slot_clock.now() { - self.delayed_lookups - .send(SyncMessage::UnknownBlockHashFromAttestation( - peer_id, block_hash, - )) + let current_slot = match self.chain.slot_clock.now() { + Some(slot) => slot, + None => { + error!( + self.log, + "Could not read slot clock, dropping unknown block message" + ); + return; + } + }; + + if slot == current_slot { + if let Err(e) = + self.delayed_lookups + .send(SyncMessage::UnknownBlockHashFromAttestation( + peer_id, block_hash, + )) + { + warn!(self.log, "Delayed lookups receiver dropped for block referenced by a blob"; + "block_root" => block_hash); + } } else { self.block_lookups .search_block(block_hash, peer_id, &mut self.network) @@ -698,17 +732,20 @@ impl SyncManager { request_id, error, } => self.inject_error(peer_id, request_id, error), - SyncMessage::BlockOrBlobProcessed { + SyncMessage::BlockPartProcessed { process_type, result, + response_type, } => match process_type { - BlockProcessType::SingleBlock { id } => { - self.block_lookups - .single_block_processed(id, result, &mut self.network) - } + BlockProcessType::SingleBlock { id } => self.block_lookups.single_block_processed( + id, + result, + response_type, + &mut self.network, + ), BlockProcessType::ParentLookup { chain_hash } => self .block_lookups - .parent_block_processed(chain_hash, result, &mut self.network), + .parent_block_processed(chain_hash, result, response_type, &mut self.network), }, SyncMessage::BatchProcessed { sync_type, result } => match sync_type { ChainSegmentProcessId::RangeBatchId(chain_id, epoch, _) => { @@ -1011,18 +1048,18 @@ impl SyncManager { } impl From>> - for BlockOrBlobProcessResult + for BlockPartProcessingResult { fn from(result: Result>) -> Self { match result { - Ok(status) => BlockOrBlobProcessResult::Ok(status), - Err(e) => BlockOrBlobProcessResult::Err(e), + Ok(status) => BlockPartProcessingResult::Ok(status), + Err(e) => BlockPartProcessingResult::Err(e), } } } -impl From> for BlockOrBlobProcessResult { +impl From> for BlockPartProcessingResult { fn from(e: BlockError) -> Self { - BlockOrBlobProcessResult::Err(e) + BlockPartProcessingResult::Err(e) } }