From 8618c301b5f6e21d63bf7ee50b38c705d4d386c1 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Fri, 14 Apr 2023 16:50:41 -0400 Subject: [PATCH] add delayed processing logic and combine some requests --- beacon_node/beacon_chain/src/beacon_chain.rs | 12 +- .../beacon_chain/src/blob_verification.rs | 11 + .../src/data_availability_checker.rs | 73 +++- beacon_node/http_api/src/publish_blocks.rs | 13 +- .../beacon_processor/worker/gossip_methods.rs | 30 +- .../network/src/sync/block_lookups/mod.rs | 411 +++++++----------- .../src/sync/block_lookups/parent_lookup.rs | 110 ++--- .../sync/block_lookups/single_block_lookup.rs | 182 ++++---- beacon_node/network/src/sync/manager.rs | 122 ++++-- 9 files changed, 445 insertions(+), 519 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 85b19b32a0..72fdf96cb6 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -190,8 +190,7 @@ pub enum WhenSlotSkipped { #[derive(Debug, PartialEq)] pub enum AvailabilityProcessingStatus { - PendingBlobs(Hash256, Vec), - PendingBlock(Hash256), + MissingParts(Hash256), Imported(Hash256), } @@ -2671,9 +2670,8 @@ impl BeaconChain { AvailabilityProcessingStatus::Imported(_) => { // The block was imported successfully. } - AvailabilityProcessingStatus::PendingBlobs(block_root, blobs) => {} - AvailabilityProcessingStatus::PendingBlock(_) => { - // doesn't makes sense + AvailabilityProcessingStatus::MissingParts(block_root) => { + //TODO(sean) fail } } } @@ -2777,8 +2775,8 @@ impl BeaconChain { block_root: Hash256, unverified_block: B, count_unrealized: CountUnrealized, - notify_execution_layer: NotifyExecutionLayer, - ) -> Result> { + notify_execution_layer: NotifyExecutionLayer, + ) -> Result> { // Start the Prometheus timer.
let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES); diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 531e376e88..70c6fbe375 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -372,6 +372,17 @@ pub enum MaybeAvailableBlock { AvailabilityPending(AvailabilityPendingBlock), } +impl MaybeAvailableBlock { + pub fn get_missing_blob_ids(&self) -> Option<&Vec> { + match self { + MaybeAvailableBlock::Available(_) => None, + MaybeAvailableBlock::AvailabilityPending(pending_block) => { + Some(pending_block.get_missing_blob_ids()) + } + } + } +} + /// Trait for common block operations. pub trait AsBlock { fn slot(&self) -> Slot; diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 80ebc753d0..aea1907dd5 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -2,7 +2,9 @@ use crate::blob_verification::{ verify_kzg_for_blob, verify_kzg_for_blob_list, AsBlock, BlockWrapper, GossipVerifiedBlob, KzgVerifiedBlob, KzgVerifiedBlobList, MaybeAvailableBlock, }; -use crate::block_verification::{AvailabilityPendingExecutedBlock, AvailableExecutedBlock}; +use crate::block_verification::{ + AvailabilityPendingExecutedBlock, AvailableExecutedBlock, IntoExecutionPendingBlock, +}; use kzg::Error as KzgError; use kzg::Kzg; @@ -11,7 +13,7 @@ use slot_clock::SlotClock; use ssz_types::{Error, FixedVector, VariableList}; use state_processing::per_block_processing::deneb::deneb::verify_kzg_commitments_against_transactions; use std::collections::hash_map::{Entry, OccupiedEntry}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::ops::Index; use std::sync::Arc; use types::beacon_block_body::KzgCommitments; @@ -19,7 +21,7 @@ use 
types::blob_sidecar::{BlobIdentifier, BlobSidecar}; use types::consts::deneb::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS; use types::{ BeaconBlockRef, BlobSidecarList, ChainSpec, Epoch, EthSpec, ExecPayload, FullPayload, Hash256, - SignedBeaconBlock, SignedBeaconBlockHeader, Slot, + SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlobSidecar, Slot, }; #[derive(Debug)] @@ -147,6 +149,56 @@ impl DataAvailabilityChecker { } } + pub fn zip_block( + &self, + block_root: Hash256, + block: Arc>, + blobs: Vec>>, + ) -> Result, AvailabilityCheckError> { + Ok(match self.get_blob_requirements(&block)? { + BlobRequirements::EmptyBlobs => BlockWrapper::Block(block), + BlobRequirements::NotRequired => BlockWrapper::Block(block), + BlobRequirements::PreDeneb => BlockWrapper::Block(block), + BlobRequirements::Required => { + let expected_num_blobs = block + .block() + .message() + .body() + .blob_kzg_commitments() + .map(|commitments| commitments.len()) + .unwrap_or(0); + let mut expected_indices: HashSet = + (0..expected_num_blobs).into_iter().collect(); + if blobs.len() < expected_num_blobs { + return Err(AvailabilityCheckError::NumBlobsMismatch { + num_kzg_commitments: expected_num_blobs, + num_blobs: blobs.len(), + }); + } + for blob in blobs { + if blob.block_root != block_root { + return Err(AvailabilityCheckError::BlockBlobRootMismatch { + block_root, + blob_block_root: blob.block_root, + }); + } + let removed = expected_indices.remove(&(blob.index as usize)); + if !removed { + return Err(AvailabilityCheckError::MissingBlobs); + } + } + + if !expected_indices.is_empty() { + return Err(AvailabilityCheckError::DuplicateBlob(block_root)); + } + + //TODO(sean) do we re-order blobs here to the correct order? + + BlockWrapper::BlockAndBlobs(block, blobs) + } + }) + } + /// Get a blob from the availability cache. 
pub fn get_blob(&self, blob_id: &BlobIdentifier) -> Option>> { self.availability_cache @@ -426,7 +478,7 @@ impl DataAvailabilityChecker { BlobRequirements::PreDeneb => VerifiedBlobs::PreDeneb, BlobRequirements::Required => { return Ok(MaybeAvailableBlock::AvailabilityPending( - AvailabilityPendingBlock { block, blobs }, + AvailabilityPendingBlock { block }, )) } }; @@ -547,22 +599,9 @@ pub enum BlobRequirements { #[derive(Clone, Debug, PartialEq)] pub struct AvailabilityPendingBlock { block: Arc>, - missing_blob_ids: Vec, } impl AvailabilityPendingBlock { - pub fn get_missing_blob_ids(&self) -> &Vec { - &self.missing_blob_ids - } - - pub fn has_blob(mut self, blob_id: &BlobIdentifier) -> bool { - if let Some(Some(blob)) = self.blobs.get(blob_id.index as usize) { - blob.block_root == blob_id.block_root - } else { - false - } - } - pub fn num_blobs_expected(&self) -> usize { self.kzg_commitments() .map_or(0, |commitments| commitments.len()) diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 5c759cdb2a..acc58e202b 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -136,17 +136,8 @@ pub async fn publish_block( Ok(()) } - Ok(AvailabilityProcessingStatus::PendingBlock(block_root)) => { - let msg = format!("Missing block with root {:?}", block_root); - error!( - log, - "Invalid block provided to HTTP API"; - "reason" => &msg - ); - Err(warp_utils::reject::broadcast_without_import(msg)) - } - Ok(AvailabilityProcessingStatus::PendingBlobs(_, blob_ids)) => { - let msg = format!("Missing blobs {:?}", blob_ids); + Ok(AvailabilityProcessingStatus::MissingParts(block_root)) => { + let msg = format!("Missing parts of block with root {:?}", block_root); error!( log, "Invalid block provided to HTTP API"; diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 
07bf3e8663..3ff5adc13c 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -691,18 +691,9 @@ impl Worker { // add to metrics // logging } - Ok(AvailabilityProcessingStatus::PendingBlobs(block_root, pending_blobs)) => self - .send_sync_message(SyncMessage::MissingBlobs { - peer_id, - block_root, - pending_blobs, - search_delay: Duration::from_secs(0), //TODO(sean) update - }), - Ok(AvailabilityProcessingStatus::PendingBlock(block_hash)) => { + Ok(AvailabilityProcessingStatus::MissingParts(block_hash)) => { self.send_sync_message(SyncMessage::UnknownBlockHashFromGossipBlob( - peer_id, - block_hash, - Duration::from_secs(0), + peer_id, block_hash, )); //TODO(sean) update } Err(_err) => { @@ -1054,22 +1045,13 @@ impl Worker { self.chain.recompute_head_at_current_slot().await; } - Ok(AvailabilityProcessingStatus::PendingBlock(block_root)) => { - // This error variant doesn't make any sense in this context - crit!( - self.log, - "Internal error. 
Cannot get AvailabilityProcessingStatus::PendingBlock on processing block"; - "block_root" => %block_root - ); - } - Ok(AvailabilityProcessingStatus::PendingBlobs(block_rooot, pending_blobs)) => { + Ok(AvailabilityProcessingStatus::MissingParts(block_root)) => { // make rpc request for blob - self.send_sync_message(SyncMessage::MissingBlobs { + self.send_sync_message(SyncMessage::UnknownBlockHashFromGossipBlob( peer_id, block_root, - pending_blobs, - search_delay: Duration::from_secs(0), //TODO(sean) update - }); + Duration::from_secs(0), //TODO(sean) update + )); } Err(BlockError::ParentUnknown(block)) => { // Inform the sync manager to find parents for this block diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index e97703f2a3..b413f93aab 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -6,6 +6,7 @@ use std::time::Duration; use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock}; +use beacon_chain::data_availability_checker::DataAvailabilityChecker; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; use fnv::FnvHashMap; use itertools::Itertools; @@ -20,13 +21,14 @@ use types::{BlobSidecar, SignedBeaconBlock}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; use crate::metrics; -use crate::sync::block_lookups::parent_lookup::ParentRequest; +use crate::sync::block_lookups::parent_lookup::{ParentRequest, RequestResult}; use crate::sync::block_lookups::single_block_lookup::SingleBlobsRequest; +use crate::sync::network_context::BlockOrBlob; use self::parent_lookup::PARENT_FAIL_TOLERANCE; use self::{ parent_lookup::{ParentLookup, VerifyError}, - single_block_lookup::SingleBlockRequest, + single_block_lookup::SingleBlockLookup, }; use super::manager::BlockOrBlobProcessResult; @@ -55,8 +57,7 @@ pub(crate) struct BlockLookups { 
Hash256, ( Vec, - SingleBlockRequest, - Option>, + SingleBlockLookup, ), >, @@ -68,10 +69,11 @@ pub(crate) struct BlockLookups { /// /// The flag allows us to determine if the peer returned data or sent us nothing. single_block_lookups: - FnvHashMap>, + FnvHashMap>, - single_blob_lookups: - FnvHashMap>, + blob_ids_to_block_ids: HashMap, + + da_checker: Arc>, /// The logger for the import manager. log: Logger, @@ -82,7 +84,10 @@ pub(crate) struct BlockLookups { // 2. when a chain is processed, find the child requests and send for processing impl BlockLookups { - pub fn new(log: Logger) -> Self { + pub fn new( + da_checker: Arc>, + log: Logger, + ) -> Self { Self { parent_lookups: Default::default(), processing_parent_lookups: Default::default(), @@ -90,16 +95,27 @@ impl BlockLookups { FAILED_CHAINS_CACHE_EXPIRY_SECONDS, )), single_block_lookups: Default::default(), - single_blob_lookups: Default::default(), + da_checker, + blob_ids_to_block_ids: Default::default(), log, } } /* Lookup requests */ + pub fn search_block(&mut self, hash: Hash256, peer_id: PeerId, cx: &mut SyncNetworkContext) { + self.search_block_with(|| {}, hash, peer_id, cx) + } + /// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is /// constructed. 
- pub fn search_block(&mut self, hash: Hash256, peer_id: PeerId, cx: &mut SyncNetworkContext) { + pub fn search_block_with( + &mut self, + cache_fn: impl Fn(&mut SingleBlockLookup), + hash: Hash256, + peer_id: PeerId, + cx: &mut SyncNetworkContext, + ) { // Do not re-request a block that is already being requested if self .single_block_lookups @@ -133,14 +149,24 @@ impl BlockLookups { "block" => %hash ); - let mut single_block_request = SingleBlockRequest::new(hash, peer_id); + let mut single_block_request = SingleBlockLookup::new(hash, peer_id, da_checker); + cache_fn(&mut single_block_request); - let (peer_id, request) = single_block_request - .make_request() + let (peer_id, block_request) = single_block_request + .request_block() .expect("none of the possible failure cases apply for a newly created block lookup"); - if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request) { + let (peer_id, blob_request) = single_block_request + .request_blobs() + .expect("none of the possible failure cases apply for a newly created blob lookup"); + + if let (Ok(request_id), Ok(blob_request_id)) = ( + cx.single_block_lookup_request(peer_id, block_request), + cx.single_blobs_lookup_request(peer_id, blob_request), + ) { self.single_block_lookups .insert(request_id, single_block_request); + self.blob_ids_to_block_ids + .insert(blob_request_id, request_id); metrics::set_gauge( &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, @@ -149,95 +175,14 @@ impl BlockLookups { } } - pub fn search_blobs( + pub fn search_current_unknown_parent( &mut self, block_root: Hash256, - blob_ids: Vec, + block: BlockWrapper, peer_id: PeerId, cx: &mut SyncNetworkContext, ) { - let to_request = blob_ids - .into_iter() - .filter(|id| { - // Do not re-request blobs that are already being requested - if self - .single_blob_lookups - .values_mut() - .any(|single_blob_request| { - single_blob_request.add_peer_if_useful(&blob_ids, &peer_id) - }) - { - return false; - } - - if 
self.parent_lookups.iter_mut().any(|parent_req| { - parent_req.add_blobs_peer(&blob_ids, &peer_id) || parent_req.contains_blob(id) - }) { - // If the blob was already downloaded, or is being downloaded in this moment, do not - // request it. - return false; - } - - if self - .processing_parent_lookups - .values() - .any(|(hashes, _, _)| hashes.contains(&id.block_root)) - { - // we are already processing this blob, ignore it. - return false; - } - true - }) - .collect(); - - debug!( - self.log, - "Searching for blobs"; - "peer_id" => %peer_id, - "blobs" => %to_request - ); - - let mut single_blob_request = SingleBlobsRequest::new(to_request, peer_id); - - let (peer_id, request) = single_blob_request - .make_request() - .expect("none of the possible failure cases apply for a newly created blob lookup"); - if let Ok(request_id) = cx.single_blobs_lookup_request(peer_id, request) { - self.single_blob_lookups - .insert(request_id, single_blob_request); - - metrics::set_gauge( - &metrics::SYNC_SINGLE_BLOB_LOOKUPS, - self.single_blob_lookups.len() as i64, - ); - } - } - - pub fn search_block_delayed( - &mut self, - peer_id: PeerId, - hash: Hash256, - delay: Duration, - cx: &mut SyncNetworkContext, - ) { - //TODO(sean) handle delay - //TODO(sean) cannot use peer id here cause it assumes it has the block, this is from gossip so not true - // - // after the delay expires, need to check da cache for what we have before requesting - self.search_block(hash, peer_id, cx); - } - - pub fn search_blobs_delayed( - &mut self, - peer_id: PeerId, - block_root: Hash256, - blob_ids: Vec, - delay: Duration, - cx: &mut SyncNetworkContext, - ) { - //TODO(sean) handle delay - // after the delay expires, need to check da cache for what we have before requesting - self.search_blobs(block_root, blob_ids, peer_id, cx); + self.search_block_with(|request| request.add_block(block), block_root, peer_id, cx); } /// If a block is attempted to be processed but we do not know its parent, this function is 
@@ -245,11 +190,10 @@ impl BlockLookups { pub fn search_parent( &mut self, block_root: Hash256, - block: MaybeAvailableBlock, + parent_root: Hash256, peer_id: PeerId, cx: &mut SyncNetworkContext, ) { - let parent_root = block.parent_root(); // If this block or it's parent is part of a known failed chain, ignore it. if self.failed_chains.contains(&parent_root) || self.failed_chains.contains(&block_root) { debug!(self.log, "Block is from a past failed chain. Dropping"; @@ -262,7 +206,6 @@ impl BlockLookups { if self.parent_lookups.iter_mut().any(|parent_req| { parent_req.contains_block(&block_root) || parent_req.add_block_peer(&block_root, &peer_id) - || parent_req.add_block_peer(&parent_root, &peer_id) }) { // we are already searching for this block, ignore it return; @@ -277,7 +220,7 @@ impl BlockLookups { return; } - let parent_lookup = ParentLookup::new(block_root, block, peer_id); + let parent_lookup = ParentLookup::new(block_root, peer_id, self.da_checker.clone()); self.request_parent_block_and_blobs(parent_lookup, cx); } @@ -306,6 +249,73 @@ impl BlockLookups { match request.get_mut().verify_response(block) { Ok(Some((block_root, block))) => { + //TODO(sean) only send for processing if we don't have parent requests + // for this block + + // This is the correct block, send it for processing + if self + .send_block_for_processing( + block_root, + block, + seen_timestamp, + BlockProcessType::SingleBlock { id }, + cx, + ) + .is_err() + { + // Remove to avoid inconsistencies + self.single_block_lookups.remove(&id); + } + } + Ok(None) => { + // request finished correctly, it will be removed after the block is processed. + } + Err(error) => { + let msg: &str = error.into(); + cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); + // Remove the request, if it can be retried it will be added with a new id. 
+ let mut req = request.remove(); + + debug!(self.log, "Single block lookup failed"; + "peer_id" => %peer_id, "error" => msg, "block_root" => %req.requested_thing); + // try the request again if possible + if let Ok((peer_id, request)) = req.make_request() { + if let Ok(id) = cx.single_block_lookup_request(peer_id, request) { + self.single_block_lookups.insert(id, req); + } + } + } + } + + metrics::set_gauge( + &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, + self.single_block_lookups.len() as i64, + ); + } + + pub fn single_blob_lookup_response( + &mut self, + id: Id, + peer_id: PeerId, + blob: Option>>, + seen_timestamp: Duration, + cx: &mut SyncNetworkContext, + ) { + let mut request = match self.single_block_lookups.entry(id) { + Entry::Occupied(req) => req, + Entry::Vacant(_) => { + if blob.is_some() { + debug!( + self.log, + "Block returned for single blob lookup not present" + ); + } + return; + } + }; + + match request.get_mut().verify_blob(blob) { + Ok(Some((block_root, blobs))) => { //TODO(sean) only send for processing if we don't have parent requests trigger // for this block @@ -374,24 +384,14 @@ impl BlockLookups { match parent_lookup.verify_block(block, &mut self.failed_chains) { Ok(Some((block_root, block))) => { - let block_wrapper = parent_lookup - .current_parent_blob_request - .as_ref() - .map_or(BlockWrapper::Block(block.clone()), |req| { - BlockWrapper::BlockAndBlobs(block, req.downloaded_blobs.clone()) - }); - - let maybe_available = cx - .chain - .data_availability_checker - .check_availability(wrapper) - .unwrap(); //TODO(sean) remove unwrap - match maybe_available { - MaybeAvailableBlock::Available(available) => { + let res = parent_lookup.add_block(block_root, block); + match res { + RequestResult::Process(wrapper) => { + let chain_hash = parent_lookup.chain_hash(); if self .send_block_for_processing( block_root, - available, + wrapper, seen_timestamp, BlockProcessType::ParentLookup { chain_hash }, cx, @@ -401,18 +401,8 @@ impl BlockLookups { 
self.parent_lookups.push(parent_lookup) } } - MaybeAvailableBlock::AvailabilityPending(pending) => { - let missing_ids = pending.get_missing_blob_ids(); - - self.search_blobs(block_root, missing_ids, peer_id, cx); - let _ = parent_lookup - .current_parent_request - .downloaded_block - .insert(( - block_root, - MaybeAvailableBlock::AvailabilityPending(pending), - )); - + RequestResult::SearchBlock(block_root) => { + self.search_block(block_root, peer_id, cx); self.parent_lookups.push(parent_lookup) } } @@ -461,73 +451,6 @@ impl BlockLookups { ); } - pub fn single_lookup_blob_response( - &mut self, - id: Id, - peer_id: PeerId, - blob: Option>>, - seen_timestamp: Duration, - cx: &mut SyncNetworkContext, - ) { - let mut request = match self.single_blob_lookups.entry(id) { - Entry::Occupied(req) => req, - Entry::Vacant(_) => { - if blob.is_some() { - debug!( - self.log, - "Block returned for single blob lookup not present" - ); - } - return; - } - }; - - match request.get_mut().verify_blob(blob) { - Ok(Some((block_root, blobs))) => { - //TODO(sean) only send for processing if we don't have parent requests trigger - // for this block - - // This is the correct block, send it for processing - if self - .send_block_for_processing( - block_root, - block, - seen_timestamp, - BlockProcessType::SingleBlock { id }, - cx, - ) - .is_err() - { - // Remove to avoid inconsistencies - self.single_block_lookups.remove(&id); - } - } - Ok(None) => { - // request finished correctly, it will be removed after the block is processed. - } - Err(error) => { - let msg: &str = error.into(); - cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); - // Remove the request, if it can be retried it will be added with a new id. 
- let mut req = request.remove(); - - debug!(self.log, "Single block lookup failed"; - "peer_id" => %peer_id, "error" => msg, "block_root" => %req.requested_thing); - // try the request again if possible - if let Ok((peer_id, request)) = req.make_request() { - if let Ok(id) = cx.single_block_lookup_request(peer_id, request) { - self.single_block_lookups.insert(id, req); - } - } - } - } - - metrics::set_gauge( - &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, - self.single_block_lookups.len() as i64, - ); - } - pub fn parent_lookup_blob_response( &mut self, id: Id, @@ -551,50 +474,28 @@ impl BlockLookups { match parent_lookup.verify_blob(blob, &mut self.failed_chains) { Ok(Some(blobs)) => { - if let Some((block_root, block)) = - parent_lookup.current_parent_request.downloaded_block.take() - { - let block_wrapper = parent_lookup - .current_parent_blob_request - .as_ref() - .map_or(BlockWrapper::Block(block.clone()), |req| { - BlockWrapper::BlockAndBlobs(block, req.downloaded_blobs.clone()) - }); + let processed_or_search = parent_lookup.add_blobs(blobs); - let maybe_available = cx - .chain - .data_availability_checker - .check_availability(wrapper) - .unwrap(); //TODO(sean) remove unwrap - match maybe_available { - MaybeAvailableBlock::Available(available) => { - if self - .send_block_for_processing( - block_root, - available, - seen_timestamp, - BlockProcessType::ParentLookup { chain_hash }, - cx, - ) - .is_ok() - { - self.parent_lookups.push(parent_lookup) - } - } - MaybeAvailableBlock::AvailabilityPending(pending) => { - let missing_ids = pending.get_missing_blob_ids(); - - self.search_blobs(block_root, missing_ids, peer_id, cx); - parent_lookup - .current_parent_request - .downloaded_block - .insert(( - block_root, - MaybeAvailableBlock::AvailabilityPending(pending), - )); + match processed_or_search { + RequestResult::Process(wrapper) => { + let chain_hash = parent_lookup.chain_hash(); + if self + .send_block_for_processing( + block_root, + wrapper, + seen_timestamp, + 
BlockProcessType::ParentLookup { chain_hash }, + cx, + ) + .is_ok() + { self.parent_lookups.push(parent_lookup) } } + RequestResult::SearchBlock(block_root) => { + self.search_block(block_root, peer_id, cx); + self.parent_lookups.push(parent_lookup) + } } } Ok(None) => { @@ -770,11 +671,8 @@ impl BlockLookups { AvailabilityProcessingStatus::Imported(hash) => { trace!(self.log, "Single block processing succeeded"; "block" => %root); } - AvailabilityProcessingStatus::PendingBlobs(block_root, blobs_ids) => { - self.search_blobs(block_root, blobs_ids, peer_id, cx); - } - AvailabilityProcessingStatus::PendingBlock(hash) => { - warn!(self.log, "Block processed but returned PendingBlock"; "block" => %hash); + AvailabilityProcessingStatus::MissingParts(block_root) => { + self.search_block(block_root, peer_id, cx); } }, BlockOrBlobProcessResult::Ignored => { @@ -857,21 +755,14 @@ impl BlockLookups { }; match &result { - BlockOrBlobProcessResult::Ok(status) => { - match status { - AvailabilityProcessingStatus::Imported(hash) => { - trace!(self.log, "Parent block processing succeeded"; &parent_lookup) - } - AvailabilityProcessingStatus::PendingBlobs(block_root, blobs) => { - // trigger? 
- - // make sure we have a pending blobs request outstanding - } - AvailabilityProcessingStatus::PendingBlock(hash) => { - // logic error - } + BlockOrBlobProcessResult::Ok(status) => match status { + AvailabilityProcessingStatus::Imported(hash) => { + trace!(self.log, "Parent block processing succeeded"; &parent_lookup) } - } + AvailabilityProcessingStatus::MissingParts(block_root) => { + trace!(self.log, "Parent missing parts, triggering single block lookup "; &parent_lookup) + } + }, BlockOrBlobProcessResult::Err(e) => { trace!(self.log, "Parent block processing failed"; &parent_lookup, "error" => %e) } @@ -886,17 +777,15 @@ impl BlockLookups { } match result { - BlockOrBlobProcessResult::Ok(AvailabilityProcessingStatus::PendingBlock(_)) => { - // doesn't make sense - } - BlockOrBlobProcessResult::Ok(AvailabilityProcessingStatus::PendingBlobs( + BlockOrBlobProcessResult::Ok(AvailabilityProcessingStatus::MissingParts( block_root, - blobs_ids, )) => { - self.search_blobs(block_root, blobs_ids, peer_id, cx); + self.search_block(block_root, peer_id, cx); } BlockOrBlobProcessResult::Err(BlockError::ParentUnknown(block)) => { parent_lookup.add_block(block); + // `ParentUnknown` triggered by a parent block lookup should always have all blobs + // so we don't re-request blobs for the current block. 
self.request_parent_block_and_blobs(parent_lookup, cx); } BlockOrBlobProcessResult::Ok(AvailabilityProcessingStatus::Imported(_)) diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index e8d8951da4..7236089572 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -1,13 +1,16 @@ use super::DownlodedBlocks; use crate::sync::block_lookups::single_block_lookup::{RequestableThing, SingleBlobsRequest}; use crate::sync::block_lookups::RootBlockTuple; +use crate::sync::manager::BlockProcessType; use crate::sync::{ manager::{Id, SLOT_IMPORT_TOLERANCE}, network_context::SyncNetworkContext, }; use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock}; +use beacon_chain::data_availability_checker::{AvailableBlock, DataAvailabilityChecker}; use beacon_chain::BeaconChainTypes; +use lighthouse_network::libp2p::core::either::EitherName::A; use lighthouse_network::PeerId; use std::iter; use std::sync::Arc; @@ -16,7 +19,7 @@ use strum::IntoStaticStr; use types::blob_sidecar::BlobIdentifier; use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; -use super::single_block_lookup::{self, SingleBlockRequest}; +use super::single_block_lookup::{self, SingleBlockLookup}; /// How many attempts we try to find a parent of a block before we give up trying. pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5; @@ -32,10 +35,9 @@ pub(crate) struct ParentLookup { /// The blocks that have currently been downloaded. downloaded_blocks: Vec>, /// Request of the last parent. - pub current_parent_request: SingleBlockRequest, + pub current_parent_request: SingleBlockLookup, /// Id of the last parent request. 
current_parent_request_id: Option, - pub current_parent_blob_request: Option>, current_parent_blob_request_id: Option, } @@ -60,6 +62,11 @@ pub enum RequestError { NoPeers, } +pub enum RequestResult { + Process(BlockWrapper), + SearchBlock(Hash256), +} + impl ParentLookup { pub fn contains_block(&self, block_root: &Hash256) -> bool { self.downloaded_blocks @@ -67,64 +74,23 @@ impl ParentLookup { .any(|(root, _d_block)| root == block_root) } - pub fn contains_blob(&self, blob_id: &BlobIdentifier) -> bool { - self.downloaded_blocks - .iter() - .any(|(_root, block)| match block { - MaybeAvailableBlock::Available(_) => false, - MaybeAvailableBlock::AvailabilityPending(pending) => pending.has_blob(&blob_id), - }) - } - pub fn new( block_root: Hash256, - block: MaybeAvailableBlock, peer_id: PeerId, + da_checker: Arc>, ) -> Self { - // if available, just add to downloaded blocks, - - // if maybe available, treat it as a single blob lookup that will be requested after - // this parent chain segment is processed - - let current_parent_request = SingleBlockRequest::new(block.parent_root(), peer_id); - - let (current_parent_blob_request, current_blobs_request) = match block.as_ref() { - MaybeAvailableBlock::Available(available) => { - let current_parent_blob_request = if available.da_check_required() { - Some(SingleBlobsRequest::new_with_all_ids( - block.parent_root(), - peer_id, - )) - } else { - None - }; - (current_parent_blob_request, None) - } - MaybeAvailableBlock::AvailabilityPending(pending) => { - let parent_req = SingleBlobsRequest::new_with_all_ids(block.parent_root(), peer_id); - let current_req = - SingleBlobsRequest::new(pending.get_missing_blob_ids().clone(), peer_id); - (Some(parent_req), Some(current_req)) - } - }; + let current_parent_request = + SingleBlockLookup::new(block.parent_root(), peer_id, da_checker); Self { chain_hash: block_root, - downloaded_blocks: vec![(block_root, block)], + downloaded_blocks: vec![], current_parent_request, 
current_parent_request_id: None, - current_parent_blob_request, current_parent_blob_request_id: None, } } - pub fn new_with_blobs_request( - block_root: Hash256, - block_wrapper: MaybeAvailableBlock, - peer_id: PeerId, - ) -> Self { - } - /// Attempts to request the next unknown parent. If the request fails, it should be removed. pub fn request_parent_block( &mut self, @@ -152,7 +118,7 @@ impl ParentLookup { &mut self, cx: &mut SyncNetworkContext, ) -> Result<(), RequestError> { - if let Some(blob_req) = self.current_parent_blob_request.as_mut() { + if let Some(blob_req) = self.current_parent_request.as_mut() { // check to make sure this request hasn't failed if self.downloaded_blocks.len() >= PARENT_DEPTH_TOLERANCE { return Err(RequestError::ChainTooLong); @@ -183,42 +149,22 @@ impl ParentLookup { .unwrap_or_default() } - pub fn add_block(&mut self, block: MaybeAvailableBlock) { - let next_parent = block.parent_root(); - let current_root = self.current_parent_request.requested_thing; - - self.downloaded_blocks.push((current_root, block)); - - // Block request updates - self.current_parent_request.requested_block_root = next_parent; - self.current_parent_request.request_state.state = - single_block_lookup::State::AwaitingDownload; + pub fn add_block( + &mut self, + block_root: Hash256, + block: Arc>, + ) -> RequestResult { self.current_parent_request_id = None; - - // Blob request updates - if let Some(blob_req) = self.current_parent_blob_request.as_mut() { - let mut all_ids = Vec::with_capacity(T::EthSpec::max_blobs_per_block()); - for i in 0..T::EthSpec::max_blobs_per_block() { - all_ids.push(BlobIdentifier { - block_root: next_parent, - index: i as u64, - }); - } - blob_req.requested_ids = all_ids; - blob_req.request_state.state = single_block_lookup::State::AwaitingDownload; - } - self.current_parent_blob_request_id = None; + self.current_parent_request.add_block(block_root, block) } - pub fn add_blobs(&mut self, blobs: Vec) { - 
self.current_parent_blob_request.map_or_else( - SingleBlobsRequest::new(blobs, peer_id), - |mut req| { - req.requested_thing = next_parent; - req.state = single_block_lookup::State::AwaitingDownload; - }, - ); + pub fn add_blobs( + &mut self, + block_root: Hash256, + blobs: Vec>>, + ) -> RequestResult { self.current_parent_blob_request_id = None; + self.current_parent_request.add_blobs(block_root, blobs) } pub fn pending_block_response(&self, req_id: Id) -> bool { @@ -237,7 +183,7 @@ impl ParentLookup { Hash256, Vec>, Vec, - SingleBlockRequest, + SingleBlockLookup, Option>, ) { let ParentLookup { diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 35c1323954..64eb292146 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,9 +1,14 @@ use super::DownlodedBlocks; +use crate::sync::block_lookups::parent_lookup::RequestResult; use crate::sync::block_lookups::RootBlockTuple; +use crate::sync::manager::BlockProcessType; use crate::sync::network_context::SyncNetworkContext; use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock}; -use beacon_chain::get_block_root; +use beacon_chain::data_availability_checker::{ + AvailabilityCheckError, AvailabilityPendingBlock, DataAvailabilityChecker, +}; +use beacon_chain::{get_block_root, BeaconChainTypes}; use lighthouse_network::rpc::methods::BlobsByRootRequest; use lighthouse_network::{rpc::BlocksByRootRequest, PeerId, Request}; use rand::seq::IteratorRandom; @@ -15,16 +20,14 @@ use strum::IntoStaticStr; use types::blob_sidecar::BlobIdentifier; use types::{BlobSidecar, SignedBeaconBlock}; -pub struct SingleBlockRequest { +pub struct SingleBlockLookup { pub requested_block_root: Hash256, - pub downloaded_block: Option<(Hash256, MaybeAvailableBlock)>, - pub request_state: 
SingleLookupRequestState, -} - -pub struct SingleBlobsRequest { pub requested_ids: Vec, - pub downloaded_blobs: Vec>>, - pub request_state: SingleLookupRequestState, + pub downloaded_blobs: Vec>>, + pub downloaded_block: Option>>, + pub block_request_state: SingleLookupRequestState, + pub blob_request_state: SingleLookupRequestState, + pub da_checker: Arc>, } /// Object representing a single block lookup request. @@ -58,6 +61,12 @@ pub enum VerifyError { ExtraBlocksReturned, } +#[derive(Debug, PartialEq, Eq, IntoStaticStr)] +pub enum BlobVerifyError { + UnrequestedBlobId, + ExtraBlobsReturned, +} + #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupRequestError { /// Too many failed attempts @@ -68,12 +77,56 @@ pub enum LookupRequestError { NoPeers, } -impl SingleBlockRequest { - pub fn new(requested_block_root: Hash256, peer_id: PeerId) -> Self { +impl SingleBlockLookup { + pub fn new( + requested_block_root: Hash256, + peer_id: PeerId, + da_checker: Arc>, + ) -> Self { + da_checker.get_missing_parts_for_hash(&requested_block_root); + Self { requested_block_root, + requested_ids: vec![], downloaded_block: None, - request_state: SingleLookupRequestState::new(peer_id), + downloaded_blobs: vec![], + block_request_state: SingleLookupRequestState::new(peer_id), + blob_request_state: SingleLookupRequestState::new(peer_id), + da_checker, + } + } + + pub fn add_blobs( + &mut self, + block_root: Hash256, + blobs: Vec>>, + ) -> RequestResult { + //TODO(sean) smart extend, we don't want dupes + self.downloaded_blobs.extend(blobs); + + if let Some(block) = self.downloaded_block.as_ref() { + match self.da_checker.zip_block(block_root, block, blobs) { + Ok(wrapper) => RequestResult::Process(wrapper), + Err(AvailabilityCheckError::MissingBlobs) => RequestResult::SearchBlock(block_root), + _ => todo!(), + } + } else { + RequestResult::SearchBlock(block_hash) + } + } + + pub fn add_block( + &mut self, + block_root: Hash256, + block: Arc>, + ) -> RequestResult { + 
//TODO(sean) check for existing block? + self.downloaded_block = Some(block); + + match self.da_checker.zip_block(block_root, block, blobs) { + Ok(wrapper) => RequestResult::Process(wrapper), + Err(AvailabilityCheckError::MissingBlobs) => RequestResult::SearchBlock(block_root), + _ => todo!(), } } @@ -83,9 +136,9 @@ impl SingleBlockRequest { &mut self, block: Option>>, ) -> Result>, VerifyError> { - match self.request_state.state { + match self.block_request_state.state { State::AwaitingDownload => { - self.request_state.register_failure_downloading(); + self.block_request_state.register_failure_downloading(); Err(VerifyError::ExtraBlocksReturned) } State::Downloading { peer_id } => match block { @@ -97,11 +150,11 @@ impl SingleBlockRequest { // return an error and drop the block // NOTE: we take this is as a download failure to prevent counting the // attempt as a chain failure, but simply a peer failure. - self.request_state.register_failure_downloading(); + self.block_request_state.register_failure_downloading(); Err(VerifyError::RootMismatch) } else { // Return the block for processing. - self.request_state.state = State::Processing { peer_id }; + self.block_request_state.state = State::Processing { peer_id }; Ok(Some((block_root, block))) } } @@ -113,7 +166,7 @@ impl SingleBlockRequest { State::Processing { peer_id: _ } => match block { Some(_) => { // We sent the block for processing and received an extra block. 
- self.request_state.register_failure_downloading(); + self.block_request_state.register_failure_downloading(); Err(VerifyError::ExtraBlocksReturned) } None => { @@ -126,14 +179,17 @@ impl SingleBlockRequest { } pub fn request_block(&mut self) -> Result<(PeerId, BlocksByRootRequest), LookupRequestError> { - debug_assert!(matches!(self.request_state.state, State::AwaitingDownload)); + debug_assert!(matches!( + self.block_request_state.state, + State::AwaitingDownload + )); if self.failed_attempts() >= MAX_ATTEMPTS { Err(LookupRequestError::TooManyAttempts { - cannot_process: self.request_state.failed_processing - >= self.request_state.failed_downloading, + cannot_process: self.block_request_state.failed_processing + >= self.block_request_state.failed_downloading, }) } else if let Some(&peer_id) = self - .request_state + .block_request_state .available_peers .iter() .choose(&mut rand::thread_rng()) @@ -141,63 +197,29 @@ impl SingleBlockRequest { let request = BlocksByRootRequest { block_roots: VariableList::from(vec![self.requested_block_root]), }; - self.request_state.state = State::Downloading { peer_id }; - self.request_state.used_peers.insert(peer_id); + self.block_request_state.state = State::Downloading { peer_id }; + self.block_request_state.used_peers.insert(peer_id); Ok((peer_id, request)) } else { Err(LookupRequestError::NoPeers) } } - pub fn add_peer_if_useful(&mut self, block_root: &Hash256, peer_id: &PeerId) -> bool { - let is_useful = self.requested_block_root == *block_root; - if is_useful { - self.request_state.add_peer(peer_id); - } - is_useful - } -} - -impl SingleBlobsRequest { - pub fn new(blob_ids: Vec, peer_id: PeerId) -> Self { - Self { - requested_ids: blob_ids, - downloaded_blobs: vec![], - request_state: SingleLookupRequestState::new(peer_id), - } - } - - pub fn new_with_all_ids(block_root: Hash256, peer_id: PeerId) -> Self { - let mut ids = Vec::with_capacity(T::max_blobs_per_block()); - for i in 0..T::max_blobs_per_block() { - 
ids.push(BlobIdentifier { - block_root, - index: i as u64, - }); - } - - Self { - requested_ids: ids, - downloaded_blobs: vec![], - request_state: SingleLookupRequestState::new(peer_id), - } - } - pub fn verify_blob( &mut self, blob: Option>>, - ) -> Result>>>, VerifyError> { - match self.request_state.state { + ) -> Result>>>, BlobVerifyError> { + match self.block_request_state.state { State::AwaitingDownload => { - self.request_state.register_failure_downloading(); - Err(VerifyError::ExtraBlocksReturned) + self.blob_request_state.register_failure_downloading(); + Err(BlobVerifyError::ExtraBlobsReturned) } State::Downloading { peer_id } => match blob { Some(blob) => { let received_id = blob.id(); if !self.requested_ids.contains(&received_id) { - self.request_state.register_failure_downloading(); - Err(VerifyError::RootMismatch) + self.blob_request_state.register_failure_downloading(); + Err(BlobVerifyError::UnrequestedBlobId) } else { // state should still be downloading self.requested_ids.retain(|id| id != received_id); @@ -205,15 +227,15 @@ impl SingleBlobsRequest { } } None => { - self.request_state.state = State::Processing { peer_id }; + self.blob_request_state.state = State::Processing { peer_id }; Ok(Some(self.downloaded_blobs.clone())) } }, State::Processing { peer_id: _ } => match block { Some(_) => { - // We sent the block for processing and received an extra block. - self.request_state.register_failure_downloading(); - Err(VerifyError::ExtraBlocksReturned) + // We sent the blob for processing and received an extra blob. 
+ self.blob_request_state.register_failure_downloading(); + Err(BlobVerifyError::ExtraBlobsReturned) } None => { // This is simply the stream termination and we are already processing the @@ -225,14 +247,17 @@ impl SingleBlobsRequest { } pub fn request_blobs(&mut self) -> Result<(PeerId, BlobsByRootRequest), LookupRequestError> { - debug_assert!(matches!(self.request_state.state, State::AwaitingDownload)); + debug_assert!(matches!( + self.block_request_state.state, + State::AwaitingDownload + )); if self.failed_attempts() >= MAX_ATTEMPTS { Err(LookupRequestError::TooManyAttempts { - cannot_process: self.request_state.failed_processing - >= self.request_state.failed_downloading, + cannot_process: self.blob_request_state.failed_processing + >= self.blob_request_state.failed_downloading, }) } else if let Some(&peer_id) = self - .request_state + .blob_request_state .available_peers .iter() .choose(&mut rand::thread_rng()) @@ -240,18 +265,19 @@ impl SingleBlobsRequest { let request = BlobsByRootRequest { blob_ids: VariableList::from(self.requested_ids), }; - self.request_state.state = State::Downloading { peer_id }; - self.request_state.used_peers.insert(peer_id); + self.blob_request_state.state = State::Downloading { peer_id }; + self.blob_request_state.used_peers.insert(peer_id); Ok((peer_id, request)) } else { Err(LookupRequestError::NoPeers) } } - pub fn add_peer_if_useful(&mut self, blob_id: &BlobIdentifier, peer_id: &PeerId) -> bool { + // TODO: only add peers if they have *all* blob ids in the request, this could probably be improved. 
+ pub fn add_peer_if_useful(&mut self, blob_id: &[BlobIdentifier], peer_id: &PeerId) -> bool { let is_useful = self.requested_ids.contains(blob_id); if is_useful { - self.request_state.add_peer(peer_id); + self.block_request_state.add_peer(peer_id); } is_useful } @@ -312,7 +338,7 @@ impl SingleLookupRequestState { } } -impl slog::Value for SingleBlockRequest { +impl slog::Value for SingleBlockLookup { fn serialize( &self, record: &slog::Record, @@ -361,7 +387,7 @@ mod tests { let peer_id = PeerId::random(); let block = rand_block(); - let mut sl = SingleBlockRequest::<4>::new(block.canonical_root(), peer_id); + let mut sl = SingleBlockLookup::<4>::new(block.canonical_root(), peer_id); sl.make_request().unwrap(); sl.verify_response(Some(block.into())).unwrap().unwrap(); } @@ -372,7 +398,7 @@ mod tests { let peer_id = PeerId::random(); let block = rand_block(); - let mut sl = SingleBlockRequest::::new(block.canonical_root(), peer_id); + let mut sl = SingleBlockLookup::::new(block.canonical_root(), peer_id); for _ in 1..FAILURES { sl.make_request().unwrap(); sl.register_failure_downloading(); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index bcebbcd368..5de054412f 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -54,11 +54,14 @@ use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::SyncInfo; use lighthouse_network::{PeerAction, PeerId}; use slog::{crit, debug, error, info, trace, warn, Logger}; +use slot_clock::SlotClock; use std::boxed::Box; use std::ops::Sub; +use std::sync::mpsc::TryRecvError; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; +use tokio::time::sleep; use types::blob_sidecar::BlobIdentifier; use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; @@ -125,17 +128,7 @@ pub enum SyncMessage { /// A peer has sent a blob that references a block that is unknown. 
This triggers the /// manager to attempt to find the block matching the unknown hash when the specified delay expires. - UnknownBlockHashFromGossipBlob(PeerId, Hash256, Duration), - - /// A peer has sent us a block that we haven't received all the blobs for. This triggers - /// the manager to attempt to find the pending blobs for the given block root when the specified - /// delay expires. - MissingBlobs { - peer_id: PeerId, - block_root: Hash256, - pending_blobs: Vec, - search_delay: Duration, - }, + UnknownBlockHashFromGossipBlob(Slot, PeerId, Hash256), /// A peer has disconnected. Disconnect(PeerId), @@ -214,6 +207,8 @@ pub struct SyncManager { block_lookups: BlockLookups, + delayed_lookups: mpsc::Sender>, + /// The logger for the import manager. log: Logger, } @@ -235,6 +230,8 @@ pub fn spawn( ); // generate the message channel let (sync_send, sync_recv) = mpsc::unbounded_channel::>(); + let (delayed_lookups_send, mut delayed_lookups_recv) = + mpsc::channel::>(512); //TODO(sean) what's a reasonable size for this channel? 
h // create an instance of the SyncManager let mut sync_manager = SyncManager { @@ -251,9 +248,53 @@ pub fn spawn( range_sync: RangeSync::new(beacon_chain.clone(), log.clone()), backfill_sync: BackFillSync::new(beacon_chain, network_globals, log.clone()), block_lookups: BlockLookups::new(log.clone()), + delayed_lookups: delayed_lookups_send, log: log.clone(), }; + executor.spawn( + async move { + let slot_duration = slot_clock.slot_duration(); + // TODO(sean) think about what this should be + let delay = beacon_chain.slot_clock.unagg_attestation_production_delay(); + + loop { + let sleep_duration = match ( + beacon_chain.slot_clock.duration_to_next_slot(), + beacon_chain.slot_clock.seconds_from_current_slot_start(), + ) { + (Some(duration_to_next_slot), Some(seconds_from_current_slot_start)) => { + if seconds_from_current_slot_start > delay { + duration_to_next_slot + delay + } else { + delay - seconds_from_current_slot_start + } + } + _ => { + error!(log, "Failed to read slot clock"); + // If we can't read the slot clock, just wait another slot. 
+ sleep(slot_duration).await; + continue; + } + }; + + sleep(sleep_duration).await; + + while let next = delayed_lookups_recv.try_recv() { + match next { + Ok(msg) => { + if let Err(e) = sync_send.send(msg) { + warn!(log, "Failed to send delayed lookup message"; "error" => ?e); + } + } + Err(_) => break, + } + } + } + }, + "delayed_lookups", + ); + // spawn the sync manager thread debug!(log, "Sync Manager started"); executor.spawn(async move { Box::pin(sync_manager.main()).await }, "sync"); @@ -603,8 +644,25 @@ impl SyncManager { if self.network_globals.peers.read().is_connected(&peer_id) && self.network.is_execution_engine_online() { - self.block_lookups - .search_parent(block_root, block, peer_id, &mut self.network); + let parent_root = block.parent_root(); + //TODO(sean) what about early blocks + if block.slot() == self.chain.slot_clock.now() { + self.delayed_lookups + .send(SyncMessage::UnknownBlock(peer_id, block, block_root)); + } else { + self.block_lookups.search_current_unknown_parent( + block_root, + block, + peer_id, + &mut self.network, + ); + } + self.block_lookups.search_parent( + block_root, + parent_root, + peer_id, + &mut self.network, + ); } } SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => { @@ -612,35 +670,21 @@ impl SyncManager { if self.synced_and_connected(&peer_id) { self.block_lookups .search_block(block_hash, peer_id, &mut self.network); - //TODO(sean) we could always request all blobs at this point } } - SyncMessage::UnknownBlockHashFromGossipBlob(peer_id, block_hash, delay) => { + SyncMessage::UnknownBlockHashFromGossipBlob(slot, peer_id, block_hash) => { // If we are not synced, ignore this block. if self.synced_and_connected(&peer_id) { - self.block_lookups.search_block_delayed( - peer_id, - block_hash, - delay, - &mut self.network, - ); - } - } - SyncMessage::MissingBlobs { - peer_id, - block_root, - pending_blobs, - search_delay, - } => { - // If we are not synced, ignore these blobs. 
- if self.synced_and_connected(&peer_id) { - self.block_lookups.search_blobs_delayed( - peer_id, - block_root, - pending_blobs, - search_delay, - &mut self.network, - ); + //TODO(sean) what about early gossip messages? + if Some(slot) == self.chain.slot_clock.now() { + self.delayed_lookups + .send(SyncMessage::UnknownBlockHashFromAttestation( + peer_id, block_hash, + )) + } else { + self.block_lookups + .search_block(block_hash, peer_id, &mut self.network) + } } } SyncMessage::Disconnect(peer_id) => { @@ -835,7 +879,7 @@ impl SyncManager { seen_timestamp: Duration, ) { match request_id { - RequestId::SingleBlock { id } => self.block_lookups.single_lookup_blob_response( + RequestId::SingleBlock { id } => self.block_lookups.single_blob_lookup_response( id, peer_id, blob,