From 25ff6e8a5f290b0d50634f4008ef73e6bd4c7d15 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Tue, 11 Apr 2023 13:13:13 -0400 Subject: [PATCH] more work --- beacon_node/beacon_chain/src/beacon_chain.rs | 6 +- .../beacon_chain/src/block_verification.rs | 11 +- .../src/data_availability_checker.rs | 22 +- .../network/src/beacon_processor/mod.rs | 4 +- .../work_reprocessing_queue.rs | 4 +- .../beacon_processor/worker/gossip_methods.rs | 6 - .../beacon_processor/worker/sync_methods.rs | 14 +- .../network/src/sync/block_lookups/mod.rs | 521 +++++++++++++----- .../src/sync/block_lookups/parent_lookup.rs | 267 +++++++-- .../sync/block_lookups/single_block_lookup.rs | 357 +++++++----- .../network/src/sync/block_lookups/tests.rs | 8 +- beacon_node/network/src/sync/manager.rs | 43 +- .../network/src/sync/network_context.rs | 132 ++--- 13 files changed, 918 insertions(+), 477 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 5a6c043933..6d2fca68d5 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2880,9 +2880,9 @@ impl BeaconChain { Availability::PendingBlock(block_root) => { Ok(AvailabilityProcessingStatus::PendingBlock(block_root)) } - Availability::PendingBlobs(block_root, blob_ids) => { - Ok(AvailabilityProcessingStatus::PendingBlobs(block_root, blob_ids)) - } + Availability::PendingBlobs(block_root, blob_ids) => Ok( + AvailabilityProcessingStatus::PendingBlobs(block_root, blob_ids), + ), } } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 9f31306d21..8b292c400f 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -146,7 +146,7 @@ pub enum BlockError { /// /// It's unclear if this block is valid, but it cannot be processed without already knowing /// its parent. 
- ParentUnknown(BlockWrapper), + ParentUnknown(MaybeAvailableBlock), /// The block skips too many slots and is a DoS risk. TooManySkippedSlots { parent_slot: Slot, @@ -311,7 +311,6 @@ pub enum BlockError { parent_root: Hash256, }, BlobValidation(BlobError), - AvailabilityCheck(AvailabilityCheckError), } impl From for BlockError { @@ -1332,7 +1331,7 @@ impl ExecutionPendingBlock { // because it will revert finalization. Note that the finalized block is stored in fork // choice, so we will not reject any child of the finalized block (this is relevant during // genesis). - return Err(BlockError::ParentUnknown(block.into_block_wrapper())); + return Err(BlockError::ParentUnknown(block)); } // Reject any block that exceeds our limit on skipped slots. @@ -1796,7 +1795,7 @@ pub fn check_block_is_finalized_checkpoint_or_descendant< block_parent_root: block.parent_root(), }) } else { - Err(BlockError::ParentUnknown(block.into_block_wrapper())) + Err(BlockError::ParentUnknown(block)) } } } @@ -1877,7 +1876,7 @@ fn verify_parent_block_is_known( { Ok((proto_block, block)) } else { - Err(BlockError::ParentUnknown(block.into_block_wrapper())) + Err(BlockError::ParentUnknown(block)) } } @@ -1908,7 +1907,7 @@ fn load_parent>( .fork_choice_read_lock() .contains_block(&block.parent_root()) { - return Err(BlockError::ParentUnknown(block.into_block_wrapper())); + return Err(BlockError::ParentUnknown(block)); } let block_delay = chain diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 96d8ae3810..80ebc753d0 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -426,7 +426,7 @@ impl DataAvailabilityChecker { BlobRequirements::PreDeneb => VerifiedBlobs::PreDeneb, BlobRequirements::Required => { return Ok(MaybeAvailableBlock::AvailabilityPending( - AvailabilityPendingBlock { block }, + AvailabilityPendingBlock { block, 
blobs }, )) } }; @@ -547,9 +547,22 @@ pub enum BlobRequirements { #[derive(Clone, Debug, PartialEq)] pub struct AvailabilityPendingBlock { block: Arc>, + missing_blob_ids: Vec, } impl AvailabilityPendingBlock { + pub fn get_missing_blob_ids(&self) -> &Vec { + &self.missing_blob_ids + } + + pub fn has_blob(mut self, blob_id: &BlobIdentifier) -> bool { + if let Some(Some(blob)) = self.blobs.get(blob_id.index as usize) { + blob.block_root == blob_id.block_root + } else { + false + } + } + pub fn num_blobs_expected(&self) -> usize { self.kzg_commitments() .map_or(0, |commitments| commitments.len()) @@ -623,6 +636,13 @@ impl AvailableBlock { &self.block } + pub fn da_check_required(&self) -> bool { + match self.blobs { + VerifiedBlobs::PreDeneb | VerifiedBlobs::NotRequired => false, + VerifiedBlobs::EmptyBlobs | VerifiedBlobs::Available(_) => true, + } + } + pub fn deconstruct(self) -> (Arc>, Option>) { match self.blobs { VerifiedBlobs::EmptyBlobs | VerifiedBlobs::NotRequired | VerifiedBlobs::PreDeneb => { diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 9e3c0bd0c8..3c2e277dbc 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -609,7 +609,7 @@ impl WorkEvent { /// sent to the other side of `result_tx`. 
pub fn rpc_beacon_block( block_root: Hash256, - block: Arc>, + block: BlockWrapper, seen_timestamp: Duration, process_type: BlockProcessType, ) -> Self { @@ -933,7 +933,7 @@ pub enum Work { }, RpcBlock { block_root: Hash256, - block: Arc>, + block: BlockWrapper, seen_timestamp: Duration, process_type: BlockProcessType, should_process: bool, diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index 0ed6ae72f9..573b569b12 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -13,7 +13,7 @@ use super::MAX_SCHEDULED_WORK_QUEUE_LEN; use crate::metrics; use crate::sync::manager::BlockProcessType; -use beacon_chain::blob_verification::AsBlock; +use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; use beacon_chain::{BeaconChainTypes, GossipVerifiedBlock, MAXIMUM_GOSSIP_CLOCK_DISPARITY}; use fnv::FnvHashMap; use futures::task::Poll; @@ -137,7 +137,7 @@ pub struct QueuedGossipBlock { /// It is queued for later import. pub struct QueuedRpcBlock { pub block_root: Hash256, - pub block: Arc>, + pub block: BlockWrapper, pub process_type: BlockProcessType, pub seen_timestamp: Duration, /// Indicates if the beacon chain should process this block or not. 
diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 141c958676..07bf3e8663 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -845,9 +845,6 @@ impl Worker { verified_block } - Err(BlockError::AvailabilityCheck(_err)) => { - todo!() - } Err(BlockError::ParentUnknown(block)) => { debug!( self.log, @@ -1074,9 +1071,6 @@ impl Worker { search_delay: Duration::from_secs(0), //TODO(sean) update }); } - Err(BlockError::AvailabilityCheck(_)) => { - todo!() - } Err(BlockError::ParentUnknown(block)) => { // Inform the sync manager to find parents for this block // This should not occur. It should be checked by `should_forward_block` diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index e856e9a075..f2af2a03df 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -45,7 +45,7 @@ impl Worker { pub async fn process_rpc_block( self, block_root: Hash256, - block: Arc>, + block: BlockWrapper, seen_timestamp: Duration, process_type: BlockProcessType, reprocess_tx: mpsc::Sender>, @@ -54,9 +54,9 @@ impl Worker { ) { if !should_process { // Sync handles these results - self.send_sync_message(SyncMessage::BlockProcessed { + self.send_sync_message(SyncMessage::BlockOrBlobProcessed { process_type, - result: crate::sync::manager::BlockProcessResult::Ignored, + result: crate::sync::manager::BlockOrBlobProcessResult::Ignored, }); return; } @@ -88,6 +88,8 @@ impl Worker { let slot = block.slot(); let parent_root = block.message().parent_root(); + // TODO(sean) check availability here and send information to sync? 
+ let result = self .chain .process_block( @@ -127,7 +129,7 @@ impl Worker { } } // Sync handles these results - self.send_sync_message(SyncMessage::BlockProcessed { + self.send_sync_message(SyncMessage::BlockOrBlobProcessed { process_type, result: result.into(), }); @@ -156,9 +158,9 @@ impl Worker { .await; // Sync handles these results - self.send_sync_message(SyncMessage::BlobProcessed { + self.send_sync_message(SyncMessage::BlockOrBlobProcessed { process_type, - result, + result: result.into(), }); } diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 66410fd360..e97703f2a3 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -4,8 +4,8 @@ use std::sync::Arc; use std::thread::sleep; use std::time::Duration; -use beacon_chain::blob_verification::AsBlock; use beacon_chain::blob_verification::BlockWrapper; +use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock}; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; use fnv::FnvHashMap; use itertools::Itertools; @@ -20,7 +20,8 @@ use types::{BlobSidecar, SignedBeaconBlock}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; use crate::metrics; -use crate::sync::block_lookups::single_block_lookup::SingleBlobRequest; +use crate::sync::block_lookups::parent_lookup::ParentRequest; +use crate::sync::block_lookups::single_block_lookup::SingleBlobsRequest; use self::parent_lookup::PARENT_FAIL_TOLERANCE; use self::{ @@ -28,7 +29,7 @@ use self::{ single_block_lookup::SingleBlockRequest, }; -use super::manager::BlockProcessResult; +use super::manager::BlockOrBlobProcessResult; use super::BatchProcessResult; use super::{ manager::{BlockProcessType, Id}, @@ -40,6 +41,7 @@ mod single_block_lookup; #[cfg(test)] mod tests; +pub type DownlodedBlocks = (Hash256, MaybeAvailableBlock); pub type RootBlockTuple = (Hash256, Arc>); const 
FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; @@ -49,8 +51,14 @@ pub(crate) struct BlockLookups { /// Parent chain lookups being downloaded. parent_lookups: SmallVec<[ParentLookup; 3]>, - processing_parent_lookups: - HashMap, SingleBlockRequest)>, + processing_parent_lookups: HashMap< + Hash256, + ( + Vec, + SingleBlockRequest, + Option>, + ), + >, /// A cache of failed chain lookups to prevent duplicate searches. failed_chains: LRUTimeCache, @@ -59,14 +67,20 @@ pub(crate) struct BlockLookups { /// received or not. /// /// The flag allows us to determine if the peer returned data or sent us nothing. - single_block_lookups: FnvHashMap>, + single_block_lookups: + FnvHashMap>, - single_blob_lookups: FnvHashMap>, + single_blob_lookups: + FnvHashMap>, /// The logger for the import manager. log: Logger, } +// 1. on a completed single block lookup or single blob lookup, don't send for processing if a parent +// chain is being requested or processed +// 2. when a chain is processed, find the child requests and send for processing + impl BlockLookups { pub fn new(log: Logger) -> Self { Self { @@ -96,7 +110,7 @@ impl BlockLookups { } if self.parent_lookups.iter_mut().any(|parent_req| { - parent_req.add_peer(&hash, &peer_id) || parent_req.contains_block(&hash) + parent_req.add_block_peer(&hash, &peer_id) || parent_req.contains_block(&hash) }) { // If the block was already downloaded, or is being downloaded in this moment, do not // request it. 
@@ -122,7 +136,7 @@ impl BlockLookups { let mut single_block_request = SingleBlockRequest::new(hash, peer_id); let (peer_id, request) = single_block_request - .request_block() + .make_request() .expect("none of the possible failure cases apply for a newly created block lookup"); if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request) { self.single_block_lookups @@ -142,52 +156,61 @@ impl BlockLookups { peer_id: PeerId, cx: &mut SyncNetworkContext, ) { - // Do not re-request blobs that are already being requested - if self - .single_blob_lookups - .values_mut() - .any(|single_block_request| single_block_request.add_peer(&blob_ids, &peer_id)) - { - return; - } - // - // if self.parent_lookups.iter_mut().any(|parent_req| { - // parent_req.add_peer(&hash, &peer_id) || parent_req.contains_block(&hash) - // }) { - // // If the block was already downloaded, or is being downloaded in this moment, do not - // // request it. - // return; - // } - // - // if self - // .processing_parent_lookups - // .values() - // .any(|(hashes, _last_parent_request)| hashes.contains(&hash)) - // { - // // we are already processing this block, ignore it. 
- // return; - // } - // - // debug!( - // self.log, - // "Searching for block"; - // "peer_id" => %peer_id, - // "block" => %hash - // ); - // - // let mut single_block_request = SingleBlobRequest::new(hash, peer_id); - // - // let (peer_id, request) = single_block_request - // .request_block() - // .expect("none of the possible failure cases apply for a newly created block lookup"); - // if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request) { - // self.single_blob_lookups - // .insert(request_id, single_block_request); - // - // metrics::set_gauge( - // &metrics::SYNC_SINGLE_BLOB_LOOKUPS, - // self.single_blob_lookups.len() as i64, - // ); + let to_request = blob_ids + .into_iter() + .filter(|id| { + // Do not re-request blobs that are already being requested + if self + .single_blob_lookups + .values_mut() + .any(|single_blob_request| { + single_blob_request.add_peer_if_useful(&blob_ids, &peer_id) + }) + { + return false; + } + + if self.parent_lookups.iter_mut().any(|parent_req| { + parent_req.add_blobs_peer(&blob_ids, &peer_id) || parent_req.contains_blob(id) + }) { + // If the blob was already downloaded, or is being downloaded in this moment, do not + // request it. + return false; + } + + if self + .processing_parent_lookups + .values() + .any(|(hashes, _, _)| hashes.contains(&id.block_root)) + { + // we are already processing this blob, ignore it. 
+ return false; + } + true + }) + .collect(); + + debug!( + self.log, + "Searching for blobs"; + "peer_id" => %peer_id, + "blobs" => %to_request + ); + + let mut single_blob_request = SingleBlobsRequest::new(to_request, peer_id); + + let (peer_id, request) = single_blob_request + .make_request() + .expect("none of the possible failure cases apply for a newly created blob lookup"); + if let Ok(request_id) = cx.single_blobs_lookup_request(peer_id, request) { + self.single_blob_lookups + .insert(request_id, single_blob_request); + + metrics::set_gauge( + &metrics::SYNC_SINGLE_BLOB_LOOKUPS, + self.single_blob_lookups.len() as i64, + ); + } } pub fn search_block_delayed( @@ -199,6 +222,8 @@ impl BlockLookups { ) { //TODO(sean) handle delay //TODO(sean) cannot use peer id here cause it assumes it has the block, this is from gossip so not true + // + // after the delay expires, need to check da cache for what we have before requesting self.search_block(hash, peer_id, cx); } @@ -211,6 +236,7 @@ impl BlockLookups { cx: &mut SyncNetworkContext, ) { //TODO(sean) handle delay + // after the delay expires, need to check da cache for what we have before requesting self.search_blobs(block_root, blob_ids, peer_id, cx); } @@ -219,17 +245,10 @@ impl BlockLookups { pub fn search_parent( &mut self, block_root: Hash256, - block: BlockWrapper, + block: MaybeAvailableBlock, peer_id: PeerId, cx: &mut SyncNetworkContext, ) { - // - // let missing_ids = cx.chain.data_availability_checker.get_missing_blob_ids(block, Some(root)); - // // TODO(sean) how do we handle this erroring? - // if let Ok(missing_ids) = missing_ids { - // self.search_blobs(missing_ids, peer_id, cx); - // } - let parent_root = block.parent_root(); // If this block or it's parent is part of a known failed chain, ignore it. if self.failed_chains.contains(&parent_root) || self.failed_chains.contains(&block_root) { @@ -242,8 +261,8 @@ impl BlockLookups { // being searched for. 
if self.parent_lookups.iter_mut().any(|parent_req| { parent_req.contains_block(&block_root) - || parent_req.add_peer(&block_root, &peer_id) - || parent_req.add_peer(&parent_root, &peer_id) + || parent_req.add_block_peer(&block_root, &peer_id) + || parent_req.add_block_peer(&parent_root, &peer_id) }) { // we are already searching for this block, ignore it return; @@ -259,7 +278,7 @@ impl BlockLookups { } let parent_lookup = ParentLookup::new(block_root, block, peer_id); - self.request_parent(parent_lookup, cx); + self.request_parent_block_and_blobs(parent_lookup, cx); } /* Lookup responses */ @@ -285,8 +304,11 @@ impl BlockLookups { } }; - match request.get_mut().verify_block(block) { + match request.get_mut().verify_response(block) { Ok(Some((block_root, block))) => { + //TODO(sean) only send for processing if we don't have parent requests trigger + // for this block + // This is the correct block, send it for processing if self .send_block_for_processing( @@ -314,7 +336,7 @@ impl BlockLookups { debug!(self.log, "Single block lookup failed"; "peer_id" => %peer_id, "error" => msg, "block_root" => %req.requested_thing); // try the request again if possible - if let Ok((peer_id, request)) = req.request_block() { + if let Ok((peer_id, request)) = req.make_request() { if let Ok(id) = cx.single_block_lookup_request(peer_id, request) { self.single_block_lookups.insert(id, req); } @@ -340,7 +362,7 @@ impl BlockLookups { let mut parent_lookup = if let Some(pos) = self .parent_lookups .iter() - .position(|request| request.pending_response(id)) + .position(|request| request.pending_block_response(id)) { self.parent_lookups.remove(pos) } else { @@ -352,19 +374,47 @@ impl BlockLookups { match parent_lookup.verify_block(block, &mut self.failed_chains) { Ok(Some((block_root, block))) => { - // Block is correct, send to the beacon processor. 
- let chain_hash = parent_lookup.chain_hash(); - if self - .send_block_for_processing( - block_root, - block, - seen_timestamp, - BlockProcessType::ParentLookup { chain_hash }, - cx, - ) - .is_ok() - { - self.parent_lookups.push(parent_lookup) + let block_wrapper = parent_lookup + .current_parent_blob_request + .as_ref() + .map_or(BlockWrapper::Block(block.clone()), |req| { + BlockWrapper::BlockAndBlobs(block, req.downloaded_blobs.clone()) + }); + + let maybe_available = cx + .chain + .data_availability_checker + .check_availability(wrapper) + .unwrap(); //TODO(sean) remove unwrap + match maybe_available { + MaybeAvailableBlock::Available(available) => { + if self + .send_block_for_processing( + block_root, + available, + seen_timestamp, + BlockProcessType::ParentLookup { chain_hash }, + cx, + ) + .is_ok() + { + self.parent_lookups.push(parent_lookup) + } + } + MaybeAvailableBlock::AvailabilityPending(pending) => { + let missing_ids = pending.get_missing_blob_ids(); + + self.search_blobs(block_root, missing_ids, peer_id, cx); + let _ = parent_lookup + .current_parent_request + .downloaded_block + .insert(( + block_root, + MaybeAvailableBlock::AvailabilityPending(pending), + )); + + self.parent_lookups.push(parent_lookup) + } } } Ok(None) => { @@ -385,7 +435,7 @@ impl BlockLookups { cx.report_peer(peer_id, PeerAction::LowToleranceError, e); // We try again if possible. 
- self.request_parent(parent_lookup, cx); + self.request_parent_block(parent_lookup, cx); } VerifyError::PreviousFailure { parent_root } => { debug!( @@ -415,22 +465,180 @@ impl BlockLookups { &mut self, id: Id, peer_id: PeerId, - block: Option>>, + blob: Option>>, seen_timestamp: Duration, cx: &mut SyncNetworkContext, ) { - todo!() + let mut request = match self.single_blob_lookups.entry(id) { + Entry::Occupied(req) => req, + Entry::Vacant(_) => { + if blob.is_some() { + debug!( + self.log, + "Block returned for single blob lookup not present" + ); + } + return; + } + }; + + match request.get_mut().verify_blob(blob) { + Ok(Some((block_root, blobs))) => { + //TODO(sean) only send for processing if we don't have parent requests trigger + // for this block + + // This is the correct block, send it for processing + if self + .send_block_for_processing( + block_root, + block, + seen_timestamp, + BlockProcessType::SingleBlock { id }, + cx, + ) + .is_err() + { + // Remove to avoid inconsistencies + self.single_block_lookups.remove(&id); + } + } + Ok(None) => { + // request finished correctly, it will be removed after the block is processed. + } + Err(error) => { + let msg: &str = error.into(); + cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); + // Remove the request, if it can be retried it will be added with a new id. 
+ let mut req = request.remove(); + + debug!(self.log, "Single block lookup failed"; + "peer_id" => %peer_id, "error" => msg, "block_root" => %req.requested_thing); + // try the request again if possible + if let Ok((peer_id, request)) = req.make_request() { + if let Ok(id) = cx.single_block_lookup_request(peer_id, request) { + self.single_block_lookups.insert(id, req); + } + } + } + } + + metrics::set_gauge( + &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, + self.single_block_lookups.len() as i64, + ); } pub fn parent_lookup_blob_response( &mut self, id: Id, peer_id: PeerId, - block: Option>>, + blob: Option>>, seen_timestamp: Duration, cx: &mut SyncNetworkContext, ) { - todo!() + let mut parent_lookup = if let Some(pos) = self + .parent_lookups + .iter() + .position(|request| request.pending_blob_response(id)) + { + self.parent_lookups.remove(pos) + } else { + if blob.is_some() { + debug!(self.log, "Response for a parent lookup blob request that was not found"; "peer_id" => %peer_id); + } + return; + }; + + match parent_lookup.verify_blob(blob, &mut self.failed_chains) { + Ok(Some(blobs)) => { + if let Some((block_root, block)) = + parent_lookup.current_parent_request.downloaded_block.take() + { + let block_wrapper = parent_lookup + .current_parent_blob_request + .as_ref() + .map_or(BlockWrapper::Block(block.clone()), |req| { + BlockWrapper::BlockAndBlobs(block, req.downloaded_blobs.clone()) + }); + + let maybe_available = cx + .chain + .data_availability_checker + .check_availability(wrapper) + .unwrap(); //TODO(sean) remove unwrap + match maybe_available { + MaybeAvailableBlock::Available(available) => { + if self + .send_block_for_processing( + block_root, + available, + seen_timestamp, + BlockProcessType::ParentLookup { chain_hash }, + cx, + ) + .is_ok() + { + self.parent_lookups.push(parent_lookup) + } + } + MaybeAvailableBlock::AvailabilityPending(pending) => { + let missing_ids = pending.get_missing_blob_ids(); + + self.search_blobs(block_root, missing_ids, peer_id, 
cx); + parent_lookup + .current_parent_request + .downloaded_block + .insert(( + block_root, + MaybeAvailableBlock::AvailabilityPending(pending), + )); + self.parent_lookups.push(parent_lookup) + } + } + } + } + Ok(None) => { + // Request finished successfully, nothing else to do. It will be removed after the + // processing result arrives. + self.parent_lookups.push(parent_lookup); + } + Err(e) => match e.into() { + VerifyError::RootMismatch + | VerifyError::NoBlockReturned + | VerifyError::ExtraBlocksReturned => { + let e = e.into(); + warn!(self.log, "Peer sent invalid response to parent request."; + "peer_id" => %peer_id, "reason" => %e); + + // We do not tolerate these kinds of errors. We will accept a few but these are signs + // of a faulty peer. + cx.report_peer(peer_id, PeerAction::LowToleranceError, e); + + // We try again if possible. + self.request_parent_blob(parent_lookup, cx); + } + VerifyError::PreviousFailure { parent_root } => { + debug!( + self.log, + "Parent chain ignored due to past failure"; + "block" => %parent_root, + ); + // Add the root block to failed chains + self.failed_chains.insert(parent_lookup.chain_hash()); + + cx.report_peer( + peer_id, + PeerAction::MidToleranceError, + "bbroot_failed_chains", + ); + } + }, + }; + + metrics::set_gauge( + &metrics::SYNC_PARENT_BLOCK_LOOKUPS, + self.parent_lookups.len() as i64, + ); } /* Error responses */ @@ -457,7 +665,7 @@ impl BlockLookups { .collect::>() { // retry the request - match req.request_block() { + match req.make_request() { Ok((peer_id, block_request)) => { if let Ok(request_id) = cx.single_block_lookup_request(peer_id, block_request) { self.single_block_lookups.insert(request_id, req); @@ -479,12 +687,14 @@ impl BlockLookups { while let Some(pos) = self .parent_lookups .iter_mut() - .position(|req| req.check_peer_disconnected(peer_id).is_err()) + .position(|req| req.check_block_peer_disconnected(peer_id).is_err()) { let parent_lookup = self.parent_lookups.remove(pos); 
trace!(self.log, "Parent lookup's peer disconnected"; &parent_lookup); - self.request_parent(parent_lookup, cx); + self.request_parent_block_and_blobs(parent_lookup, cx); } + + //TODO(sean) add lookups for blobs } /// An RPC error has occurred during a parent lookup. This function handles this case. @@ -498,13 +708,13 @@ impl BlockLookups { if let Some(pos) = self .parent_lookups .iter() - .position(|request| request.pending_response(id)) + .position(|request| request.pending_block_response(id)) { let mut parent_lookup = self.parent_lookups.remove(pos); - parent_lookup.download_failed(); + parent_lookup.block_download_failed(id); trace!(self.log, "Parent lookup request failed"; &parent_lookup); - self.request_parent(parent_lookup, cx); + self.request_parent_block(parent_lookup, cx); } else { return debug!(self.log, "RPC failure for a parent lookup request that was not found"; "peer_id" => %peer_id); }; @@ -518,7 +728,7 @@ impl BlockLookups { if let Some(mut request) = self.single_block_lookups.remove(&id) { request.register_failure_downloading(); trace!(self.log, "Single block lookup failed"; "block" => %request.requested_thing); - if let Ok((peer_id, block_request)) = request.request_block() { + if let Ok((peer_id, block_request)) = request.make_request() { if let Ok(request_id) = cx.single_block_lookup_request(peer_id, block_request) { self.single_block_lookups.insert(request_id, request); } @@ -536,7 +746,7 @@ impl BlockLookups { pub fn single_block_processed( &mut self, id: Id, - result: BlockProcessResult, + result: BlockOrBlobProcessResult, cx: &mut SyncNetworkContext, ) { let mut req = match self.single_block_lookups.remove(&id) { @@ -556,7 +766,7 @@ impl BlockLookups { }; match result { - BlockProcessResult::Ok(status) => match status { + BlockOrBlobProcessResult::Ok(status) => match status { AvailabilityProcessingStatus::Imported(hash) => { trace!(self.log, "Single block processing succeeded"; "block" => %root); } @@ -567,7 +777,7 @@ impl BlockLookups { 
warn!(self.log, "Block processed but returned PendingBlock"; "block" => %hash); } }, - BlockProcessResult::Ignored => { + BlockOrBlobProcessResult::Ignored => { // Beacon processor signalled to ignore the block processing result. // This implies that the cpu is overloaded. Drop the request. warn!( @@ -576,7 +786,7 @@ impl BlockLookups { "action" => "dropping single block request" ); } - BlockProcessResult::Err(e) => { + BlockOrBlobProcessResult::Err(e) => { trace!(self.log, "Single block processing failed"; "block" => %root, "error" => %e); match e { BlockError::BlockIsAlreadyKnown => { @@ -608,7 +818,7 @@ impl BlockLookups { ); // Try it again if possible. req.register_failure_processing(); - if let Ok((peer_id, request)) = req.request_block() { + if let Ok((peer_id, request)) = req.make_request() { if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request) { // insert with the new id @@ -629,7 +839,7 @@ impl BlockLookups { pub fn parent_block_processed( &mut self, chain_hash: Hash256, - result: BlockProcessResult, + result: BlockOrBlobProcessResult, cx: &mut SyncNetworkContext, ) { let (mut parent_lookup, peer_id) = if let Some((pos, peer)) = self @@ -638,7 +848,7 @@ impl BlockLookups { .enumerate() .find_map(|(pos, request)| { request - .get_processing_peer(chain_hash) + .get_block_processing_peer(chain_hash) .map(|peer| (pos, peer)) }) { (self.parent_lookups.remove(pos), peer) @@ -647,23 +857,25 @@ impl BlockLookups { }; match &result { - BlockProcessResult::Ok(status) => { + BlockOrBlobProcessResult::Ok(status) => { match status { AvailabilityProcessingStatus::Imported(hash) => { trace!(self.log, "Parent block processing succeeded"; &parent_lookup) } AvailabilityProcessingStatus::PendingBlobs(block_root, blobs) => { // trigger? 
+ + // make sure we have a pending blobs request outstanding } AvailabilityProcessingStatus::PendingBlock(hash) => { // logic error } } } - BlockProcessResult::Err(e) => { + BlockOrBlobProcessResult::Err(e) => { trace!(self.log, "Parent block processing failed"; &parent_lookup, "error" => %e) } - BlockProcessResult::Ignored => { + BlockOrBlobProcessResult::Ignored => { trace!( self.log, "Parent block processing job was ignored"; @@ -674,27 +886,21 @@ impl BlockLookups { } match result { - BlockProcessResult::Ok(AvailabilityProcessingStatus::PendingBlock(_)) => { + BlockOrBlobProcessResult::Ok(AvailabilityProcessingStatus::PendingBlock(_)) => { // doesn't make sense } - BlockProcessResult::Ok(AvailabilityProcessingStatus::PendingBlobs(block_root, blobs_ids)) => { - self.search_blobs(block_root, blobs_ids, peer_id, cx); + BlockOrBlobProcessResult::Ok(AvailabilityProcessingStatus::PendingBlobs( + block_root, + blobs_ids, + )) => { + self.search_blobs(block_root, blobs_ids, peer_id, cx); } - BlockProcessResult::Err(BlockError::ParentUnknown(block)) => { - // TODO(sean) how do we handle this erroring? - let missing_ids = cx - .chain - .data_availability_checker - .get_missing_blob_ids(block.clone(), None) - .unwrap_or_default(); - if let Some(block_root) = missing_ids.first().map(|first_id| first_id.block_root){ - self.search_blobs(block_root, missing_ids, peer_id, cx); - } + BlockOrBlobProcessResult::Err(BlockError::ParentUnknown(block)) => { parent_lookup.add_block(block); - self.request_parent(parent_lookup, cx); + self.request_parent_block_and_blobs(parent_lookup, cx); } - BlockProcessResult::Ok(AvailabilityProcessingStatus::Imported(_)) - | BlockProcessResult::Err(BlockError::BlockIsAlreadyKnown { .. }) => { + BlockOrBlobProcessResult::Ok(AvailabilityProcessingStatus::Imported(_)) + | BlockOrBlobProcessResult::Err(BlockError::BlockIsAlreadyKnown { .. 
}) => { // Check if the beacon processor is available let beacon_processor_send = match cx.processor_channel_if_enabled() { Some(channel) => channel, @@ -706,7 +912,8 @@ impl BlockLookups { ); } }; - let (chain_hash, blocks, hashes, request) = parent_lookup.parts_for_processing(); + let (chain_hash, blocks, hashes, block_request, blob_request) = + parent_lookup.parts_for_processing(); let process_id = ChainSegmentProcessId::ParentLookup(chain_hash); let work = WorkEvent::chain_segment(process_id, blocks); @@ -714,7 +921,7 @@ impl BlockLookups { match beacon_processor_send.try_send(work) { Ok(_) => { self.processing_parent_lookups - .insert(chain_hash, (hashes, request)); + .insert(chain_hash, (hashes, block_request)); } Err(e) => { error!( @@ -725,7 +932,7 @@ impl BlockLookups { } } } - ref e @ BlockProcessResult::Err(BlockError::ExecutionPayloadError(ref epe)) + ref e @ BlockOrBlobProcessResult::Err(BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => { // These errors indicate that the execution layer is offline @@ -737,7 +944,7 @@ impl BlockLookups { "error" => ?e ); } - BlockProcessResult::Err(outcome) => { + BlockOrBlobProcessResult::Err(outcome) => { // all else we consider the chain a failure and downvote the peer that sent // us the last block warn!( @@ -752,10 +959,10 @@ impl BlockLookups { cx.report_peer(peer_id, PeerAction::MidToleranceError, "parent_request_err"); // Try again if possible - parent_lookup.processing_failed(); - self.request_parent(parent_lookup, cx); + parent_lookup.block_processing_failed(); + self.request_parent_block(parent_lookup, cx); } - BlockProcessResult::Ignored => { + BlockOrBlobProcessResult::Ignored => { // Beacon processor signalled to ignore the block processing result. // This implies that the cpu is overloaded. Drop the request. 
warn!( @@ -772,24 +979,6 @@ impl BlockLookups { ); } - pub fn single_blob_processed( - &mut self, - id: Id, - result: BlockProcessResult, - cx: &mut SyncNetworkContext, - ) { - todo!() - } - - pub fn parent_blob_processed( - &mut self, - chain_hash: Hash256, - result: BlockProcessResult, - cx: &mut SyncNetworkContext, - ) { - todo!() - } - pub fn parent_chain_processed( &mut self, chain_hash: Hash256, @@ -806,7 +995,7 @@ impl BlockLookups { debug!(self.log, "Parent chain processed"; "chain_hash" => %chain_hash, "result" => ?result); match result { BatchProcessResult::Success { .. } => { - // nothing to do. + //TODO(sean) find single blob and block lookups and send for processing } BatchProcessResult::FaultyFailure { imported_blocks: _, @@ -833,7 +1022,7 @@ impl BlockLookups { fn send_block_for_processing( &mut self, block_root: Hash256, - block: Arc>, + block: BlockWrapper, duration: Duration, process_type: BlockProcessType, cx: &mut SyncNetworkContext, @@ -860,12 +1049,42 @@ impl BlockLookups { } } - fn request_parent( + fn request_parent_block( &mut self, mut parent_lookup: ParentLookup, cx: &mut SyncNetworkContext, ) { - match parent_lookup.request_parent(cx) { + let response = parent_lookup.request_parent_block(cx); + self.handle_response(parent_lookup, response); + } + + fn request_parent_blob( + &mut self, + mut parent_lookup: ParentLookup, + cx: &mut SyncNetworkContext, + ) { + let response = parent_lookup.request_parent_blobs(cx); + self.handle_response(parent_lookup, response); + } + + fn request_parent_block_and_blobs( + &mut self, + mut parent_lookup: ParentLookup, + cx: &mut SyncNetworkContext, + ) { + let response = parent_lookup + .request_parent_block(cx) + .and_then(|| parent_lookup.request_parent_blobs(cx)); + self.handle_response(parent_lookup, response); + } + + //TODO(sean) how should peer scoring work with failures in this method? 
+ fn handle_response( + &mut self, + mut parent_lookup: ParentLookup, + result: Result<(), parent_lookup::RequestError>, + ) { + match result { Err(e) => { debug!(self.log, "Failed to request parent"; &parent_lookup, "error" => e.as_static()); match e { @@ -875,7 +1094,7 @@ impl BlockLookups { parent_lookup::RequestError::ChainTooLong => { self.failed_chains.insert(parent_lookup.chain_hash()); // This indicates faulty peers. - for &peer_id in parent_lookup.used_peers() { + for &peer_id in parent_lookup.used_block_peers() { cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static()) } } @@ -888,7 +1107,7 @@ impl BlockLookups { self.failed_chains.insert(parent_lookup.chain_hash()); } // This indicates faulty peers. - for &peer_id in parent_lookup.used_peers() { + for &peer_id in parent_lookup.used_block_peers() { cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static()) } } diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index c9b49c1907..e8d8951da4 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -1,17 +1,20 @@ -use super::RootBlockTuple; +use super::DownlodedBlocks; +use crate::sync::block_lookups::single_block_lookup::{RequestableThing, SingleBlobsRequest}; +use crate::sync::block_lookups::RootBlockTuple; use crate::sync::{ manager::{Id, SLOT_IMPORT_TOLERANCE}, network_context::SyncNetworkContext, }; -use beacon_chain::blob_verification::AsBlock; use beacon_chain::blob_verification::BlockWrapper; +use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock}; use beacon_chain::BeaconChainTypes; use lighthouse_network::PeerId; +use std::iter; use std::sync::Arc; use store::Hash256; use strum::IntoStaticStr; -use types::{BlobSidecar, SignedBeaconBlock}; -use crate::sync::block_lookups::single_block_lookup::SingleBlobRequest; +use 
types::blob_sidecar::BlobIdentifier; +use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; use super::single_block_lookup::{self, SingleBlockRequest}; @@ -27,13 +30,13 @@ pub(crate) struct ParentLookup { /// The root of the block triggering this parent request. chain_hash: Hash256, /// The blocks that have currently been downloaded. - downloaded_blocks: Vec>, - downloaded_blobs: Vec>>>>, + downloaded_blocks: Vec>, /// Request of the last parent. - current_parent_request: SingleBlockRequest, - current_parent_blobs_request: SingleBlobRequest, + pub current_parent_request: SingleBlockRequest, /// Id of the last parent request. current_parent_request_id: Option, + pub current_parent_blob_request: Option>, + current_parent_blob_request_id: Option, } #[derive(Debug, PartialEq, Eq, IntoStaticStr)] @@ -64,34 +67,76 @@ impl ParentLookup { .any(|(root, _d_block)| root == block_root) } + pub fn contains_blob(&self, blob_id: &BlobIdentifier) -> bool { + self.downloaded_blocks + .iter() + .any(|(_root, block)| match block { + MaybeAvailableBlock::Available(_) => false, + MaybeAvailableBlock::AvailabilityPending(pending) => pending.has_blob(&blob_id), + }) + } + pub fn new( block_root: Hash256, - block_wrapper: BlockWrapper, + block: MaybeAvailableBlock, peer_id: PeerId, ) -> Self { - let (block, blobs) = block_wrapper.deconstruct(); + // if available, just add to downloaded blocks, + + // if maybe available, treat it as a single blob lookup that will be requested after + // this parent chain segment is processed + let current_parent_request = SingleBlockRequest::new(block.parent_root(), peer_id); - let current_parent_blobs_request = todo!(); + + let (current_parent_blob_request, current_blobs_request) = match block.as_ref() { + MaybeAvailableBlock::Available(available) => { + let current_parent_blob_request = if available.da_check_required() { + Some(SingleBlobsRequest::new_with_all_ids( + block.parent_root(), + peer_id, + )) + } else { + None + }; + 
(current_parent_blob_request, None) + } + MaybeAvailableBlock::AvailabilityPending(pending) => { + let parent_req = SingleBlobsRequest::new_with_all_ids(block.parent_root(), peer_id); + let current_req = + SingleBlobsRequest::new(pending.get_missing_blob_ids().clone(), peer_id); + (Some(parent_req), Some(current_req)) + } + }; Self { chain_hash: block_root, downloaded_blocks: vec![(block_root, block)], - downloaded_blobs: vec![blobs], current_parent_request, - current_parent_blobs_request, current_parent_request_id: None, + current_parent_blob_request, + current_parent_blob_request_id: None, } } + pub fn new_with_blobs_request( + block_root: Hash256, + block_wrapper: MaybeAvailableBlock, + peer_id: PeerId, + ) -> Self { + } + /// Attempts to request the next unknown parent. If the request fails, it should be removed. - pub fn request_parent(&mut self, cx: &mut SyncNetworkContext) -> Result<(), RequestError> { + pub fn request_parent_block( + &mut self, + cx: &mut SyncNetworkContext, + ) -> Result<(), RequestError> { // check to make sure this request hasn't failed if self.downloaded_blocks.len() >= PARENT_DEPTH_TOLERANCE { return Err(RequestError::ChainTooLong); } - let (peer_id, request) = self.current_parent_request.request_block()?; - match cx.parent_lookup_request(peer_id, request) { + let (peer_id, request) = self.current_parent_request.make_request()?; + match cx.parent_lookup_block_request(peer_id, request) { Ok(request_id) => { self.current_parent_request_id = Some(request_id); Ok(()) @@ -103,58 +148,120 @@ impl ParentLookup { } } - pub fn check_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> { + pub fn request_parent_blobs( + &mut self, + cx: &mut SyncNetworkContext, + ) -> Result<(), RequestError> { + if let Some(blob_req) = self.current_parent_blob_request.as_mut() { + // check to make sure this request hasn't failed + if self.downloaded_blocks.len() >= PARENT_DEPTH_TOLERANCE { + return Err(RequestError::ChainTooLong); + } + + let 
(peer_id, request) = blob_req.request_blobs()?; + match cx.parent_lookup_blobs_request(peer_id, request) { + Ok(request_id) => { + self.current_parent_blob_request_id = Some(request_id); + Ok(()) + } + Err(reason) => { + self.current_parent_blob_request_id = None; + Err(RequestError::SendFailed(reason)) + } + } + } + Ok(()) + } + + pub fn check_block_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> { self.current_parent_request.check_peer_disconnected(peer_id) } - pub fn add_block(&mut self, block_wrapper: BlockWrapper) { - let next_parent = block_wrapper.parent_root(); - let current_root = self.current_parent_request.requested_thing; - let (block, blobs) = block_wrapper.deconstruct(); - self.downloaded_blocks.push((current_root, block)); - self.downloaded_blobs.push(blobs); - self.current_parent_request.requested_thing = next_parent; - self.current_parent_request.state = single_block_lookup::State::AwaitingDownload; - self.current_parent_request_id = None; + pub fn check_blob_peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), ()> { + self.current_parent_blob_request + .map(|mut req| req.check_peer_disconnected(peer_id)) + .unwrap_or_default() } - pub fn pending_response(&self, req_id: Id) -> bool { + pub fn add_block(&mut self, block: MaybeAvailableBlock) { + let next_parent = block.parent_root(); + let current_root = self.current_parent_request.requested_thing; + + self.downloaded_blocks.push((current_root, block)); + + // Block request updates + self.current_parent_request.requested_block_root = next_parent; + self.current_parent_request.request_state.state = + single_block_lookup::State::AwaitingDownload; + self.current_parent_request_id = None; + + // Blob request updates + if let Some(blob_req) = self.current_parent_blob_request.as_mut() { + let mut all_ids = Vec::with_capacity(T::EthSpec::max_blobs_per_block()); + for i in 0..T::EthSpec::max_blobs_per_block() { + all_ids.push(BlobIdentifier { + block_root: next_parent, + index: i as 
u64, + }); + } + blob_req.requested_ids = all_ids; + blob_req.request_state.state = single_block_lookup::State::AwaitingDownload; + } + self.current_parent_blob_request_id = None; + } + + pub fn add_blobs(&mut self, blobs: Vec) { + self.current_parent_blob_request.map_or_else( + SingleBlobsRequest::new(blobs, peer_id), + |mut req| { + req.requested_thing = next_parent; + req.state = single_block_lookup::State::AwaitingDownload; + }, + ); + self.current_parent_blob_request_id = None; + } + + pub fn pending_block_response(&self, req_id: Id) -> bool { self.current_parent_request_id == Some(req_id) } + pub fn pending_blob_response(&self, req_id: Id) -> bool { + self.current_parent_blob_request_id == Some(req_id) + } + /// Consumes the parent request and destructures it into it's parts. #[allow(clippy::type_complexity)] pub fn parts_for_processing( self, ) -> ( Hash256, - Vec>, + Vec>, Vec, SingleBlockRequest, + Option>, ) { let ParentLookup { chain_hash, downloaded_blocks, - downloaded_blobs, current_parent_request, - current_parent_blobs_request, current_parent_request_id: _, + current_parent_blob_request, + current_parent_request_id: _, + current_parent_blob_request_id: _, } = self; let block_count = downloaded_blocks.len(); let mut blocks = Vec::with_capacity(block_count); let mut hashes = Vec::with_capacity(block_count); - for ((hash, block), blobs) in downloaded_blocks - .into_iter() - .zip(downloaded_blobs.into_iter()) - { - let wrapped_block = if let Some(blobs) = blobs { - BlockWrapper::BlockAndBlobs(block, blobs) - } else { - BlockWrapper::Block(block) - }; - blocks.push(wrapped_block); + for (hash, block) in downloaded_blocks.into_iter() { + blocks.push(block); hashes.push(hash); } - (chain_hash, blocks, hashes, current_parent_request) + ( + chain_hash, + blocks, + hashes, + current_parent_request, + current_parent_blob_request, + ) } /// Get the parent lookup's chain hash. 
@@ -162,16 +269,28 @@ impl ParentLookup { self.chain_hash } - pub fn download_failed(&mut self) { + pub fn block_download_failed(&mut self) { self.current_parent_request.register_failure_downloading(); self.current_parent_request_id = None; } - pub fn processing_failed(&mut self) { + pub fn block_processing_failed(&mut self) { self.current_parent_request.register_failure_processing(); self.current_parent_request_id = None; } + pub fn blob_download_failed(&mut self) { + self.current_parent_blob_request + .map(|mut req| req.register_failure_downloading()); + self.current_parent_blob_request_id = None; + } + + pub fn blob_processing_failed(&mut self) { + self.current_parent_blob_request + .map(|mut req| req.register_failure_processing()); + self.current_parent_blob_request_id = None; + } + /// Verifies that the received block is what we requested. If so, parent lookup now waits for /// the processing result of the block. pub fn verify_block( @@ -197,7 +316,36 @@ impl ParentLookup { Ok(root_and_block) } - pub fn get_processing_peer(&self, chain_hash: Hash256) -> Option { + pub fn verify_blob( + &mut self, + blob: Option>>, + failed_chains: &mut lru_cache::LRUTimeCache, + ) -> Result>>>, VerifyError> { + let blobs = self + .current_parent_blob_request + .map(|mut req| req.verify_blob(blob)) + .transpose()? + .flatten(); + + // check if the parent of this block isn't in the failed cache. If it is, this chain should + // be dropped and the peer downscored. 
+ if let Some(parent_root) = blobs + .as_ref() + .and_then(|blobs| blobs.first()) + .map(|blob| blob.block_parent_root) + { + if failed_chains.contains(&parent_root) { + self.current_parent_blob_request + .register_failure_downloading(); + self.current_parent_blob_request_id = None; + return Err(VerifyError::PreviousFailure { parent_root }); + } + } + + Ok(blobs) + } + + pub fn get_block_processing_peer(&self, chain_hash: Hash256) -> Option { if self.chain_hash == chain_hash { return self.current_parent_request.processing_peer().ok(); } @@ -205,17 +353,42 @@ impl ParentLookup { } #[cfg(test)] - pub fn failed_attempts(&self) -> u8 { + pub fn failed_block_attempts(&self) -> u8 { self.current_parent_request.failed_attempts() } - pub fn add_peer(&mut self, block_root: &Hash256, peer_id: &PeerId) -> bool { + pub fn add_block_peer(&mut self, block_root: &Hash256, peer_id: &PeerId) -> bool { self.current_parent_request.add_peer(block_root, peer_id) } - pub fn used_peers(&self) -> impl Iterator + '_ { + pub fn used_block_peers(&self) -> impl Iterator + '_ { self.current_parent_request.used_peers.iter() } + + pub fn get_blob_processing_peer(&self, chain_hash: Hash256) -> Option { + if self.chain_hash == chain_hash { + return self + .current_parent_blob_request + .and_then(|req| req.processing_peer().ok()); + } + None + } + + #[cfg(test)] + pub fn failed_blob_attempts(&self) -> u8 { + self.current_parent_blob_request + .map_or(0, |req| req.failed_attempts()) + } + + pub fn add_blobs_peer(&mut self, blobs: &[BlobIdentifier], peer_id: &PeerId) -> bool { + self.current_parent_blob_request + .map_or(false, |mut req| req.add_peer(blobs, peer_id)) + } + + pub fn used_blob_peers(&self) -> impl Iterator + '_ { + self.current_parent_blob_request + .map_or(iter::empty(), |req| req.used_peers.iter()) + } } impl From for VerifyError { diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 
f429ee8edc..35c1323954 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,7 +1,10 @@ -use super::RootBlockTuple; -use beacon_chain::blob_verification::AsBlock; +use super::DownlodedBlocks; +use crate::sync::block_lookups::RootBlockTuple; +use crate::sync::network_context::SyncNetworkContext; use beacon_chain::blob_verification::BlockWrapper; +use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock}; use beacon_chain::get_block_root; +use lighthouse_network::rpc::methods::BlobsByRootRequest; use lighthouse_network::{rpc::BlocksByRootRequest, PeerId, Request}; use rand::seq::IteratorRandom; use ssz_types::VariableList; @@ -9,20 +12,26 @@ use std::collections::HashSet; use std::sync::Arc; use store::{EthSpec, Hash256}; use strum::IntoStaticStr; -use lighthouse_network::rpc::methods::BlobsByRootRequest; use types::blob_sidecar::BlobIdentifier; use types::{BlobSidecar, SignedBeaconBlock}; -pub type SingleBlockRequest = SingleLookupRequest; -pub type SingleBlobRequest = SingleLookupRequest>; +pub struct SingleBlockRequest { + pub requested_block_root: Hash256, + pub downloaded_block: Option<(Hash256, MaybeAvailableBlock)>, + pub request_state: SingleLookupRequestState, +} + +pub struct SingleBlobsRequest { + pub requested_ids: Vec, + pub downloaded_blobs: Vec>>, + pub request_state: SingleLookupRequestState, +} /// Object representing a single block lookup request. /// //previously assumed we would have a single block. Now we may have the block but not the blobs #[derive(PartialEq, Eq)] -pub struct SingleLookupRequest { - /// The hash of the requested block. - pub requested_thing: T, +pub struct SingleLookupRequestState { /// State of this request. pub state: State, /// Peers that should have this block. 
@@ -35,62 +44,6 @@ pub struct SingleLookupRequest { failed_downloading: u8, } -pub trait RequestableThing { - type Request; - type Response; - type WrappedResponse; - fn verify_response(&self, response: &Self::Response) -> bool; - fn make_request(&self) -> Self::Request; - fn wrapped_response(&self, response: Self::Response) -> Self::WrappedResponse; - fn is_useful(&self, other: &Self) -> bool; -} - -impl RequestableThing for Hash256 { - type Request = BlocksByRootRequest; - type Response = Arc>; - type WrappedResponse = RootBlockTuple; - fn verify_response(&self, response: &Self::Response) -> bool{ - // Compute the block root using this specific function so that we can get timing - // metrics. - let block_root = get_block_root(response); - *self == block_root - } - fn make_request(&self) -> Self::Request{ - let request = BlocksByRootRequest { - block_roots: VariableList::from(vec![*self]), - }; - request - } - fn wrapped_response(&self, response: Self::Response) -> Self::WrappedResponse { - (*self, response) - } - - fn is_useful(&self, other: &Self) -> bool { - self == other - } -} - -impl RequestableThing for Vec{ - type Request = BlobsByRootRequest; - type Response = Arc>; - type WrappedResponse = Arc>; - - fn verify_response(&self, response: &Self::Response) -> bool{ - true - } - fn make_request(&self) -> Self::Request{ - todo!() - } - - fn wrapped_response(&self, response: Self::Response) -> Self::WrappedResponse { - response - } - - fn is_useful(&self, other: &Self) -> bool { - todo!() - } -} - #[derive(Debug, PartialEq, Eq)] pub enum State { AwaitingDownload, @@ -115,10 +68,198 @@ pub enum LookupRequestError { NoPeers, } -impl SingleLookupRequest { - pub fn new(requested_thing: T, peer_id: PeerId) -> Self { +impl SingleBlockRequest { + pub fn new(requested_block_root: Hash256, peer_id: PeerId) -> Self { + Self { + requested_block_root, + downloaded_block: None, + request_state: SingleLookupRequestState::new(peer_id), + } + } + + /// Verifies if the received 
block matches the requested one. + /// Returns the block for processing if the response is what we expected. + pub fn verify_block( + &mut self, + block: Option>>, + ) -> Result>, VerifyError> { + match self.request_state.state { + State::AwaitingDownload => { + self.request_state.register_failure_downloading(); + Err(VerifyError::ExtraBlocksReturned) + } + State::Downloading { peer_id } => match block { + Some(block) => { + // Compute the block root using this specific function so that we can get timing + // metrics. + let block_root = get_block_root(&block); + if block_root != self.requested_block_root { + // return an error and drop the block + // NOTE: we take this is as a download failure to prevent counting the + // attempt as a chain failure, but simply a peer failure. + self.request_state.register_failure_downloading(); + Err(VerifyError::RootMismatch) + } else { + // Return the block for processing. + self.request_state.state = State::Processing { peer_id }; + Ok(Some((block_root, block))) + } + } + None => { + self.register_failure_downloading(); + Err(VerifyError::NoBlockReturned) + } + }, + State::Processing { peer_id: _ } => match block { + Some(_) => { + // We sent the block for processing and received an extra block. 
+ self.request_state.register_failure_downloading(); + Err(VerifyError::ExtraBlocksReturned) + } + None => { + // This is simply the stream termination and we are already processing the + // block + Ok(None) + } + }, + } + } + + pub fn request_block(&mut self) -> Result<(PeerId, BlocksByRootRequest), LookupRequestError> { + debug_assert!(matches!(self.request_state.state, State::AwaitingDownload)); + if self.failed_attempts() >= MAX_ATTEMPTS { + Err(LookupRequestError::TooManyAttempts { + cannot_process: self.request_state.failed_processing + >= self.request_state.failed_downloading, + }) + } else if let Some(&peer_id) = self + .request_state + .available_peers + .iter() + .choose(&mut rand::thread_rng()) + { + let request = BlocksByRootRequest { + block_roots: VariableList::from(vec![self.requested_block_root]), + }; + self.request_state.state = State::Downloading { peer_id }; + self.request_state.used_peers.insert(peer_id); + Ok((peer_id, request)) + } else { + Err(LookupRequestError::NoPeers) + } + } + + pub fn add_peer_if_useful(&mut self, block_root: &Hash256, peer_id: &PeerId) -> bool { + let is_useful = self.requested_block_root == *block_root; + if is_useful { + self.request_state.add_peer(peer_id); + } + is_useful + } +} + +impl SingleBlobsRequest { + pub fn new(blob_ids: Vec, peer_id: PeerId) -> Self { + Self { + requested_ids: blob_ids, + downloaded_blobs: vec![], + request_state: SingleLookupRequestState::new(peer_id), + } + } + + pub fn new_with_all_ids(block_root: Hash256, peer_id: PeerId) -> Self { + let mut ids = Vec::with_capacity(T::max_blobs_per_block()); + for i in 0..T::max_blobs_per_block() { + ids.push(BlobIdentifier { + block_root, + index: i as u64, + }); + } + + Self { + requested_ids: ids, + downloaded_blobs: vec![], + request_state: SingleLookupRequestState::new(peer_id), + } + } + + pub fn verify_blob( + &mut self, + blob: Option>>, + ) -> Result>>>, VerifyError> { + match self.request_state.state { + State::AwaitingDownload => { + 
self.request_state.register_failure_downloading(); + Err(VerifyError::ExtraBlocksReturned) + } + State::Downloading { peer_id } => match blob { + Some(blob) => { + let received_id = blob.id(); + if !self.requested_ids.contains(&received_id) { + self.request_state.register_failure_downloading(); + Err(VerifyError::RootMismatch) + } else { + // state should still be downloading + self.requested_ids.retain(|id| id != received_id); + self.downloaded_blobs.push(blob) + } + } + None => { + self.request_state.state = State::Processing { peer_id }; + Ok(Some(self.downloaded_blobs.clone())) + } + }, + State::Processing { peer_id: _ } => match block { + Some(_) => { + // We sent the block for processing and received an extra block. + self.request_state.register_failure_downloading(); + Err(VerifyError::ExtraBlocksReturned) + } + None => { + // This is simply the stream termination and we are already processing the + // block + Ok(None) + } + }, + } + } + + pub fn request_blobs(&mut self) -> Result<(PeerId, BlobsByRootRequest), LookupRequestError> { + debug_assert!(matches!(self.request_state.state, State::AwaitingDownload)); + if self.failed_attempts() >= MAX_ATTEMPTS { + Err(LookupRequestError::TooManyAttempts { + cannot_process: self.request_state.failed_processing + >= self.request_state.failed_downloading, + }) + } else if let Some(&peer_id) = self + .request_state + .available_peers + .iter() + .choose(&mut rand::thread_rng()) + { + let request = BlobsByRootRequest { + blob_ids: VariableList::from(self.requested_ids), + }; + self.request_state.state = State::Downloading { peer_id }; + self.request_state.used_peers.insert(peer_id); + Ok((peer_id, request)) + } else { + Err(LookupRequestError::NoPeers) + } + } + + pub fn add_peer_if_useful(&mut self, blob_id: &BlobIdentifier, peer_id: &PeerId) -> bool { + let is_useful = self.requested_ids.contains(blob_id); + if is_useful { + self.request_state.add_peer(peer_id); + } + is_useful + } +} + +impl SingleLookupRequestState { 
+ pub fn new(peer_id: PeerId) -> Self { Self { - requested_thing, state: State::AwaitingDownload, available_peers: HashSet::from([peer_id]), used_peers: HashSet::default(), @@ -145,12 +286,8 @@ impl SingleLookupRequest bool { - let is_useful = self.requested_thing.is_useful(requested_thing); - if is_useful { - self.available_peers.insert(*peer_id); - } - is_useful + pub fn add_peer(&mut self, peer_id: &PeerId) -> bool { + self.available_peers.insert(*peer_id) } /// If a peer disconnects, this request could be failed. If so, an error is returned @@ -166,68 +303,6 @@ impl SingleLookupRequest( - &mut self, - block: Option>, - ) -> Result>, VerifyError> { - match self.state { - State::AwaitingDownload => { - self.register_failure_downloading(); - Err(VerifyError::ExtraBlocksReturned) - } - State::Downloading { peer_id } => match block { - Some(block) => { - if self.requested_thing.verify_response(&block) { - // return an error and drop the block - // NOTE: we take this is as a download failure to prevent counting the - // attempt as a chain failure, but simply a peer failure. - self.register_failure_downloading(); - Err(VerifyError::RootMismatch) - } else { - // Return the block for processing. - self.state = State::Processing { peer_id }; - Ok(Some(self.requested_thing.wrapped_response(block))) - } - } - None => { - self.register_failure_downloading(); - Err(VerifyError::NoBlockReturned) - } - }, - State::Processing { peer_id: _ } => match block { - Some(_) => { - // We sent the block for processing and received an extra block. 
- self.register_failure_downloading(); - Err(VerifyError::ExtraBlocksReturned) - } - None => { - // This is simply the stream termination and we are already processing the - // block - Ok(None) - } - }, - } - } - - pub fn request_block(&mut self) -> Result<(PeerId, T::Request), LookupRequestError> { - debug_assert!(matches!(self.state, State::AwaitingDownload)); - if self.failed_attempts() >= MAX_ATTEMPTS { - Err(LookupRequestError::TooManyAttempts { - cannot_process: self.failed_processing >= self.failed_downloading, - }) - } else if let Some(&peer_id) = self.available_peers.iter().choose(&mut rand::thread_rng()) { - let request = self.requested_thing.make_request(); - - self.state = State::Downloading { peer_id }; - self.used_peers.insert(peer_id); - Ok((peer_id, request)) - } else { - Err(LookupRequestError::NoPeers) - } - } - pub fn processing_peer(&self) -> Result { if let State::Processing { peer_id } = &self.state { Ok(*peer_id) @@ -287,8 +362,8 @@ mod tests { let block = rand_block(); let mut sl = SingleBlockRequest::<4>::new(block.canonical_root(), peer_id); - sl.request_block().unwrap(); - sl.verify_block(Some(block.into())).unwrap().unwrap(); + sl.make_request().unwrap(); + sl.verify_response(Some(block.into())).unwrap().unwrap(); } #[test] @@ -299,18 +374,18 @@ mod tests { let mut sl = SingleBlockRequest::::new(block.canonical_root(), peer_id); for _ in 1..FAILURES { - sl.request_block().unwrap(); + sl.make_request().unwrap(); sl.register_failure_downloading(); } // Now we receive the block and send it for processing - sl.request_block().unwrap(); - sl.verify_block(Some(block.into())).unwrap().unwrap(); + sl.make_request().unwrap(); + sl.verify_response(Some(block.into())).unwrap().unwrap(); // One processing failure maxes the available attempts sl.register_failure_processing(); assert_eq!( - sl.request_block(), + sl.make_request(), Err(LookupRequestError::TooManyAttempts { cannot_process: false }) diff --git 
a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index d7eca40fc2..cb8c1af46e 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -438,7 +438,7 @@ fn test_parent_lookup_too_many_attempts() { } } if i < parent_lookup::PARENT_FAIL_TOLERANCE { - assert_eq!(bl.parent_lookups[0].failed_attempts(), dbg!(i)); + assert_eq!(bl.parent_lookups[0].failed_block_attempts(), dbg!(i)); } } @@ -477,7 +477,7 @@ fn test_parent_lookup_too_many_download_attempts_no_blacklist() { rig.expect_penalty(); } if i < parent_lookup::PARENT_FAIL_TOLERANCE { - assert_eq!(bl.parent_lookups[0].failed_attempts(), dbg!(i)); + assert_eq!(bl.parent_lookups[0].failed_block_attempts(), dbg!(i)); } } @@ -607,7 +607,7 @@ fn test_single_block_lookup_ignored_response() { // after processing. bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); // Send an Ignored response, the request should be dropped - bl.single_block_processed(id, BlockProcessResult::Ignored, &mut cx); + bl.single_block_processed(id, BlockOrBlobProcessResult::Ignored, &mut cx); rig.expect_empty_network(); assert_eq!(bl.single_block_lookups.len(), 0); } @@ -631,7 +631,7 @@ fn test_parent_lookup_ignored_response() { rig.expect_empty_network(); // Return an Ignored result. 
The request should be dropped - bl.parent_block_processed(chain_hash, BlockProcessResult::Ignored, &mut cx); + bl.parent_block_processed(chain_hash, BlockOrBlobProcessResult::Ignored, &mut cx); rig.expect_empty_network(); assert_eq!(bl.parent_lookups.len(), 0); } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index ba64dafc87..bcebbcd368 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -42,8 +42,8 @@ use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEven use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::range_sync::ByRangeRequestType; -use beacon_chain::blob_verification::AsBlock; use beacon_chain::blob_verification::BlockWrapper; +use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock}; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, EngineState, }; @@ -117,7 +117,7 @@ pub enum SyncMessage { }, /// A block with an unknown parent has been received. - UnknownBlock(PeerId, BlockWrapper, Hash256), + UnknownBlock(PeerId, MaybeAvailableBlock, Hash256), /// A peer has sent an attestation that references a block that is unknown. This triggers the /// manager to attempt to find the block matching the unknown hash. 
@@ -154,15 +154,9 @@ pub enum SyncMessage { }, /// Block processed - BlockProcessed { + BlockOrBlobProcessed { process_type: BlockProcessType, - result: BlockProcessResult, - }, - - /// Block processed - BlobProcessed { - process_type: BlockProcessType, - result: Result>, + result: BlockOrBlobProcessResult, }, } @@ -174,7 +168,7 @@ pub enum BlockProcessType { } #[derive(Debug)] -pub enum BlockProcessResult { +pub enum BlockOrBlobProcessResult { Ok(AvailabilityProcessingStatus), Err(BlockError), Ignored, @@ -311,7 +305,7 @@ impl SyncManager { } RequestId::ParentLookup { id } => { self.block_lookups - .parent_lookup_failed(id, peer_id, &mut self.network, error); } RequestId::BackFillBlocks { id } => { if let Some(batch_id) = self @@ -618,6 +612,7 @@ impl SyncManager { if self.synced_and_connected(&peer_id) { self.block_lookups .search_block(block_hash, peer_id, &mut self.network); + //TODO(sean) we could always request all blobs at this point } } SyncMessage::UnknownBlockHashFromGossipBlob(peer_id, block_hash, delay) => { @@ -656,7 +651,7 @@ impl SyncManager { request_id, error, } => self.inject_error(peer_id, request_id, error), - SyncMessage::BlockProcessed { + SyncMessage::BlockOrBlobProcessed { process_type, result, } => match process_type { @@ -668,18 +663,6 @@ impl SyncManager { .block_lookups .parent_block_processed(chain_hash, result, &mut self.network), }, - SyncMessage::BlobProcessed { - process_type, - result, - } => match process_type { - BlockProcessType::SingleBlock { id } => { - self.block_lookups - .single_blob_processed(id, result.into(), &mut self.network) - } - BlockProcessType::ParentLookup { chain_hash } => self - .block_lookups - .parent_blob_processed(chain_hash, result.into(), &mut self.network), - }, SyncMessage::BatchProcessed { sync_type, result } => match sync_type { ChainSegmentProcessId::RangeBatchId(chain_id, epoch, _) => { self.range_sync.handle_block_process_result( @@ 
-981,18 +964,18 @@ impl SyncManager { } impl From>> - for BlockProcessResult + for BlockOrBlobProcessResult { fn from(result: Result>) -> Self { match result { - Ok(status) => BlockProcessResult::Ok(status), - Err(e) => BlockProcessResult::Err(e), + Ok(status) => BlockOrBlobProcessResult::Ok(status), + Err(e) => BlockOrBlobProcessResult::Err(e), } } } -impl From> for BlockProcessResult { +impl From> for BlockOrBlobProcessResult { fn from(e: BlockError) -> Self { - BlockProcessResult::Err(e) + BlockOrBlobProcessResult::Err(e) } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 840e94f3b3..e86499e91c 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -20,7 +20,7 @@ use std::collections::hash_map::Entry; use std::sync::Arc; use tokio::sync::mpsc; use types::blob_sidecar::BlobIdentifier; -use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; +use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock}; pub struct BlocksAndBlobsByRangeResponse { pub batch_id: BatchId, @@ -411,9 +411,7 @@ impl SyncNetworkContext { } } - // TODO(sean) add single blob lookup + parent lookup request methods - - /// Sends a blocks by root request for a single block lookup. + /// Sends a blocks by root request for a single block lookup. pub fn single_block_lookup_request( &mut self, peer_id: PeerId, @@ -421,42 +419,6 @@ impl SyncNetworkContext { ) -> Result { let id = self.next_id(); let request_id = RequestId::Sync(SyncRequestId::SingleBlock { id }); - if self - .chain - .is_data_availability_check_required() - .map_err(|_| "Unable to read slot clock")? - { - let mut blob_ids = - VariableList::new(vec![]).map_err(|_| "could not create blob request list")?; - for block_root in request.block_roots.iter() { - // Request all blobs because we have no way of knowing how many blobs exist for a block - // until we have the block. 
- let blobs_per_block = T::EthSpec::max_blobs_per_block(); - for i in 0..blobs_per_block { - blob_ids - .push(BlobIdentifier { - block_root: *block_root, - index: i as u64, - }) - .map_err(|_| "too many blobs in by root request")?; - } - } - - let blobs_count = blob_ids.len(); - let blob_request = Request::BlobsByRoot(BlobsByRootRequest { blob_ids }); - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: blob_request, - request_id, - })?; - trace!( - self.log, - "Sending BlobsByRoot Request"; - "method" => "BlobsByRoot", - "count" => blobs_count, - "peer" => %peer_id - ); - } trace!( self.log, @@ -474,54 +436,43 @@ impl SyncNetworkContext { Ok(id) } + /// Sends a blobs by root request for a single block lookup. + pub fn single_blobs_lookup_request( + &mut self, + peer_id: PeerId, + request: BlobsByRootRequest, + ) -> Result { + let id = self.next_id(); + let request_id = RequestId::Sync(SyncRequestId::SingleBlock { id }); + + trace!( + self.log, + "Sending BlobsByRoot Request"; + "method" => "BlobsByRoot", + "count" => request.blob_ids.len(), + "peer" => %peer_id + ); + + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: Request::BlobsByRoot(request), + request_id, + })?; + Ok(id) + } + /// Sends a blocks by root request for a parent request. - pub fn parent_lookup_request( + pub fn parent_lookup_block_request( &mut self, peer_id: PeerId, request: BlocksByRootRequest, ) -> Result { let id = self.next_id(); let request_id = RequestId::Sync(SyncRequestId::ParentLookup { id }); - if self - .chain - .is_data_availability_check_required() - .map_err(|_| "Unable to read slot clock")? - { - let mut blob_ids = - VariableList::new(vec![]).map_err(|e| "could not create blob request list")?; - for block_root in request.block_roots.iter() { - // Request all blobs because we have no way of knowing how many blobs exist for a block - // until we have the block. 
- let blobs_per_block = T::EthSpec::max_blobs_per_block(); - for i in 0..blobs_per_block { - blob_ids - .push(BlobIdentifier { - block_root: *block_root, - index: i as u64, - }) - .map_err(|_| "too many blobs in by root request")?; - } - } - - let blobs_count = blob_ids.len(); - let blob_request = Request::BlobsByRoot(BlobsByRootRequest { blob_ids }); - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: blob_request, - request_id, - })?; - trace!( - self.log, - "Sending BlobsByRoot Request"; - "method" => "BlobsByRoot", - "count" => blobs_count, - "peer" => %peer_id - ); - } trace!( self.log, - "Sending BlocksByRoot Request"; + "Sending parent BlocksByRoot Request"; "method" => "BlocksByRoot", "count" => request.block_roots.len(), "peer" => %peer_id @@ -535,6 +486,31 @@ impl SyncNetworkContext { Ok(id) } + /// Sends a blobs by root request for a parent request. + pub fn parent_lookup_blobs_request( + &mut self, + peer_id: PeerId, + request: BlobsByRootRequest, + ) -> Result { + let id = self.next_id(); + let request_id = RequestId::Sync(SyncRequestId::ParentLookup { id }); + + trace!( + self.log, + "Sending BlobsByRoot Request"; + "method" => "BlobsByRoot", + "count" => request.blob_ids.len(), + "peer" => %peer_id + ); + + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: Request::BlobsByRoot(request), + request_id, + })?; + Ok(id) + } + pub fn is_execution_engine_online(&self) -> bool { self.execution_engine_state == EngineState::Online }