From 374ec4800a47acfd68b80b82a26df2852f3eff4b Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 19 Apr 2023 16:44:19 -0400 Subject: [PATCH] much work --- beacon_node/beacon_chain/src/beacon_chain.rs | 16 ++-- .../beacon_chain/src/blob_verification.rs | 3 + .../beacon_chain/src/block_verification.rs | 1 + .../src/data_availability_checker.rs | 74 +++++++----------- beacon_node/http_api/src/publish_blocks.rs | 2 +- .../network/src/beacon_processor/mod.rs | 12 ++- .../work_reprocessing_queue.rs | 4 +- .../beacon_processor/worker/gossip_methods.rs | 19 ++--- .../beacon_processor/worker/sync_methods.rs | 7 +- .../network/src/sync/block_lookups/mod.rs | 76 +++++++++++++------ .../src/sync/block_lookups/parent_lookup.rs | 10 ++- .../sync/block_lookups/single_block_lookup.rs | 70 +++++++++-------- .../src/sync/block_sidecar_coupling.rs | 14 +++- beacon_node/network/src/sync/manager.rs | 25 +++--- 14 files changed, 188 insertions(+), 145 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index d1f1a126c7..d154e58c2c 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -115,7 +115,7 @@ use tokio_stream::Stream; use tree_hash::TreeHash; use types::beacon_block_body::KzgCommitments; use types::beacon_state::CloneConfig; -use types::blob_sidecar::{BlobIdentifier, BlobSidecarList, Blobs}; +use types::blob_sidecar::{BlobSidecarList, Blobs}; use types::consts::deneb::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS; use types::consts::merge::INTERVALS_PER_SLOT; use types::*; @@ -190,7 +190,7 @@ pub enum WhenSlotSkipped { #[derive(Debug, PartialEq)] pub enum AvailabilityProcessingStatus { - MissingParts(Hash256), + MissingParts(Slot, Hash256), Imported(Hash256), } @@ -2670,8 +2670,11 @@ impl BeaconChain { AvailabilityProcessingStatus::Imported(_) => { // The block was imported successfully. } - AvailabilityProcessingStatus::MissingParts(block_root) => { - //TODO(sean) fail + AvailabilityProcessingStatus::MissingParts(slot, block_root) => { + return ChainSegmentResult::Failed { + imported_blocks, + error: BlockError::MissingBlockParts(slot, block_root), + }; } } } @@ -2748,6 +2751,7 @@ impl BeaconChain { count_unrealized: CountUnrealized, ) -> Result> { self.check_availability_and_maybe_import( + blob.slot(), |chain| chain.data_availability_checker.put_gossip_blob(blob), count_unrealized, ) @@ -2804,6 +2808,7 @@ impl BeaconChain { } ExecutedBlock::AvailabilityPending(block) => { self.check_availability_and_maybe_import( + block.block.slot(), |chain| { chain .data_availability_checker @@ -2907,6 +2912,7 @@ impl BeaconChain { /// (i.e., this function is not atomic). pub async fn check_availability_and_maybe_import( self: &Arc, + slot: Slot, cache_fn: impl FnOnce(Arc) -> Result, AvailabilityCheckError>, count_unrealized: CountUnrealized, ) -> Result> { @@ -2916,7 +2922,7 @@ impl BeaconChain { self.import_available_block(block, count_unrealized).await } Availability::MissingParts(block_root) => { - Ok(AvailabilityProcessingStatus::MissingParts(block_root)) + Ok(AvailabilityProcessingStatus::MissingParts(slot, block_root)) } } } diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 1e1c35f271..2747760934 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -147,6 +147,9 @@ impl GossipVerifiedBlob { pub fn to_blob(self) -> Arc> { self.blob } + pub fn slot(&self) -> Slot { + self.blob.slot + } } pub fn validate_blob_sidecar_for_gossip( diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 9f31306d21..7b181aa518 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -312,6 +312,7 @@ pub enum BlockError { }, BlobValidation(BlobError), AvailabilityCheck(AvailabilityCheckError), + MissingBlockParts(Slot, Hash256), } impl From for BlockError { diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 8dd856649f..99ecdaa4c9 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -2,9 +2,7 @@ use crate::blob_verification::{ verify_kzg_for_blob, verify_kzg_for_blob_list, AsBlock, BlockWrapper, GossipVerifiedBlob, KzgVerifiedBlob, KzgVerifiedBlobList, MaybeAvailableBlock, }; -use crate::block_verification::{ - AvailabilityPendingExecutedBlock, AvailableExecutedBlock, IntoExecutionPendingBlock, -}; +use crate::block_verification::{AvailabilityPendingExecutedBlock, AvailableExecutedBlock}; use kzg::Error as KzgError; use kzg::Kzg; @@ -13,15 +11,14 @@ use slot_clock::SlotClock; use ssz_types::{Error, FixedVector, VariableList}; use state_processing::per_block_processing::deneb::deneb::verify_kzg_commitments_against_transactions; use std::collections::hash_map::{Entry, OccupiedEntry}; -use std::collections::{HashMap, HashSet}; -use std::ops::Index; +use std::collections::HashMap; use std::sync::Arc; use types::beacon_block_body::KzgCommitments; use types::blob_sidecar::{BlobIdentifier, BlobSidecar}; use types::consts::deneb::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS; use types::{ BeaconBlockRef, BlobSidecarList, ChainSpec, Epoch, EthSpec, ExecPayload, FullPayload, Hash256, - SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlobSidecar, Slot, + SignedBeaconBlock, SignedBeaconBlockHeader, Slot, }; #[derive(Debug)] @@ -202,16 +199,20 @@ impl DataAvailabilityChecker { block_root: Hash256, blobs: FixedVector>>, T::MaxBlobsPerBlock>, ) -> Result, AvailabilityCheckError> { - //TODO(sean) merge with existing blobs, only kzg verify blobs we haven't yet verified + // TODO(sean) we may duplicated kzg verification on some blobs we already have cached so we could optimize this - // Verify the KZG commitment. - let kzg_verified_blobs = if let Some(kzg) = self.kzg.as_ref() { - verify_kzg_for_blob_list(blobs, kzg)? + let mut verified_blobs = vec![]; + if let Some(kzg) = self.kzg.as_ref() { + for blob_opt in blobs.into_iter() { + if let Some(blob) = blob_opt { + verified_blobs.push(verify_kzg_for_blob(blob.clone(), kzg)?) + } + } } else { return Err(AvailabilityCheckError::KzgNotInitialized); }; - self.put_kzg_verified_blobs(block_root, &kzg_verified_blobs) + self.put_kzg_verified_blobs(block_root, &verified_blobs) } /// This first validates the KZG commitments included in the blob sidecar. @@ -366,7 +367,12 @@ impl DataAvailabilityChecker { .kzg .as_ref() .ok_or(AvailabilityCheckError::KzgNotInitialized)?; - let verified_blobs = verify_kzg_for_blob_list(blob_list, kzg)?; + let filtered_blobs = blob_list + .to_vec() + .into_iter() + .filter_map(|blob| blob) + .collect(); + let verified_blobs = verify_kzg_for_blob_list(filtered_blobs, kzg)?; Ok(MaybeAvailableBlock::Available( self.check_availability_with_blobs(block, verified_blobs)?, @@ -375,40 +381,6 @@ impl DataAvailabilityChecker { } } - /// For a given block wrapper, find the missing blobs. Useful for parent unknown blocks. - /// Because these don't otherwise hit the data availability caches. - pub fn get_missing_blob_ids( - &self, - block: BlockWrapper, - block_root: Option, - ) -> Result, AvailabilityCheckError> { - let (block, blobs) = block.deconstruct(); - let maybe_available = self.check_availability_without_blobs(block)?; - let blob_ids = match &maybe_available { - MaybeAvailableBlock::Available(_) => { - vec![] - } - MaybeAvailableBlock::AvailabilityPending(pending_block) => { - if let Some(blobs) = blobs { - pending_block.get_filtered_blob_ids(block_root, |index_usize, block_root| { - let index = index_usize as u64; - let blob_in_wrapper = blobs - .get(index_usize) - .map(|blob| blob.index == index) - .unwrap_or(false); - let blob_in_cache = self - .get_blob(&BlobIdentifier { block_root, index }) - .is_some(); - !blob_in_wrapper && !blob_in_cache - }) - } else { - pending_block.get_all_blob_ids(block_root) - } - } - }; - Ok(blob_ids) - } - /// Checks if a block is available, returning an error if the block is not immediately available. /// Does not access the gossip cache. pub fn try_check_availability( @@ -582,6 +554,9 @@ pub struct AvailabilityPendingBlock { } impl AvailabilityPendingBlock { + pub fn slot(&self) -> Slot { + self.block.slot() + } pub fn num_blobs_expected(&self) -> usize { self.kzg_commitments() .map_or(0, |commitments| commitments.len()) @@ -712,7 +687,12 @@ impl AsBlock for AvailableBlock { fn into_block_wrapper(self) -> BlockWrapper { let (block, blobs_opt) = self.deconstruct(); if let Some(blobs) = blobs_opt { - BlockWrapper::BlockAndBlobs(block, blobs.to_vec()) + let blobs_vec = blobs + .to_vec() + .into_iter() + .map(Option::Some) + .collect::>(); + BlockWrapper::BlockAndBlobs(block, FixedVector::from(blobs_vec)) } else { BlockWrapper::Block(block) } diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index acc58e202b..da20d44f0b 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -136,7 +136,7 @@ pub async fn publish_block( Ok(()) } - Ok(AvailabilityProcessingStatus::MissingParts(block_root)) => { + Ok(AvailabilityProcessingStatus::MissingParts(_, block_root)) => { let msg = format!("Missing parts of block with root {:?}", block_root); error!( log, diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 3cadf49f05..969a522ab1 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -66,7 +66,7 @@ use task_executor::TaskExecutor; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; use types::{ - Attestation, AttesterSlashing, BlobSidecar, Hash256, LightClientFinalityUpdate, + Attestation, AttesterSlashing, BlobSidecar, EthSpec, Hash256, LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBlobSidecar, SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, @@ -632,7 +632,10 @@ impl WorkEvent { pub fn rpc_blobs( block_root: Hash256, - blobs: FixedVector>>, T::EthSpec::MaxBlobsPerBlock>, + blobs: FixedVector< + Option>>, + <::EthSpec as EthSpec>::MaxBlobsPerBlock, + >, seen_timestamp: Duration, process_type: BlockProcessType, ) -> Self { @@ -948,7 +951,10 @@ pub enum Work { }, RpcBlobs { block_root: Hash256, - blobs: FixedVector>>, T::EthSpec::MaxBlobsPerBlock>, + blobs: FixedVector< + Option>>, + <::EthSpec as EthSpec>::MaxBlobsPerBlock, + >, seen_timestamp: Duration, process_type: BlockProcessType, }, diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index 14ce8737b1..3dfd866a73 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -27,7 +27,6 @@ use slot_clock::SlotClock; use std::collections::{HashMap, HashSet}; use std::future::Future; use std::pin::Pin; -use std::sync::Arc; use std::task::Context; use std::time::Duration; use strum::AsRefStr; @@ -36,8 +35,7 @@ use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::time::error::Error as TimeError; use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey}; use types::{ - Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof, - SignedBeaconBlock, SubnetId, + Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof, SubnetId, }; const TASK_NAME: &str = "beacon_processor_reprocess_queue"; diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 3ff5adc13c..3cdb791083 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -680,21 +680,19 @@ impl Worker { // This value is not used presently, but it might come in handy for debugging. _seen_duration: Duration, ) { - // TODO match self .chain .process_blob(verified_blob, CountUnrealized::True) .await { Ok(AvailabilityProcessingStatus::Imported(_hash)) => { - todo!() - // add to metrics - // logging + //TODO(sean) add metrics and logging + self.chain.recompute_head_at_current_slot().await; } - Ok(AvailabilityProcessingStatus::MissingParts(block_hash)) => { + Ok(AvailabilityProcessingStatus::MissingParts(slot, block_hash)) => { self.send_sync_message(SyncMessage::UnknownBlockHashFromGossipBlob( - peer_id, block_hash, - )); //TODO(sean) update + slot, peer_id, block_hash, + )); } Err(_err) => { // handle errors @@ -991,6 +989,7 @@ impl Worker { ); None } + _ => todo!(), //TODO(sean) } } @@ -1045,12 +1044,10 @@ impl Worker { self.chain.recompute_head_at_current_slot().await; } - Ok(AvailabilityProcessingStatus::MissingParts(block_root)) => { + Ok(AvailabilityProcessingStatus::MissingParts(slot, block_root)) => { // make rpc request for blob self.send_sync_message(SyncMessage::UnknownBlockHashFromGossipBlob( - peer_id, - block_root, - Duration::from_secs(0), //TODO(sean) update + slot, peer_id, block_root, )); } Err(BlockError::ParentUnknown(block)) => { diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 8233d5d8dd..cba272a35d 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -5,7 +5,7 @@ use crate::beacon_processor::work_reprocessing_queue::QueuedRpcBlock; use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE; use crate::beacon_processor::DuplicateCache; use crate::metrics; -use crate::sync::manager::{BlockProcessType, SyncMessage}; +use crate::sync::manager::{BlockProcessType, ResponseType, SyncMessage}; use crate::sync::{BatchProcessResult, ChainId}; use beacon_chain::blob_verification::AsBlock; use beacon_chain::blob_verification::BlockWrapper; @@ -144,7 +144,10 @@ impl Worker { pub async fn process_rpc_blobs( self, block_root: Hash256, - blobs: FixedVector>>, T::EthSpec::MaxBlobsPerBlock>, + blobs: FixedVector< + Option>>, + <::EthSpec as EthSpec>::MaxBlobsPerBlock, + >, seen_timestamp: Duration, process_type: BlockProcessType, ) { diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index af105d3fe5..07e2039f98 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -1,4 +1,4 @@ -use beacon_chain::blob_verification::{AsBlock, BlockWrapper, MaybeAvailableBlock}; +use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; use beacon_chain::data_availability_checker::DataAvailabilityChecker; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; use lighthouse_network::rpc::RPCError; @@ -7,11 +7,11 @@ use lru_cache::LRUTimeCache; use slog::{debug, error, trace, warn, Logger}; use smallvec::SmallVec; use ssz_types::FixedVector; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Duration; use store::Hash256; -use types::{BlobSidecar, SignedBeaconBlock, Slot}; +use types::{BlobSidecar, EthSpec, SignedBeaconBlock, Slot}; use self::parent_lookup::{LookupDownloadStatus, PARENT_FAIL_TOLERANCE}; use self::parent_lookup::{ParentLookup, ParentVerifyError}; @@ -259,7 +259,7 @@ impl BlockLookups { let triggered_parent_request = self .parent_lookups .iter() - .any(|lookup| lookup.chain_hash() == block_root); + .any(|lookup| lookup.chain_hash() == root); if triggered_parent_request { // The lookup status here is irrelevant because we wait until the parent chain @@ -269,7 +269,7 @@ impl BlockLookups { // This is the correct block, send it for processing if self .send_block_for_processing( - block_root, + root, BlockWrapper::Block(block), seen_timestamp, BlockProcessType::SingleBlock { id }, @@ -290,7 +290,7 @@ impl BlockLookups { cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); debug!(self.log, "Single block lookup failed"; - "peer_id" => %peer_id, "error" => msg, "block_root" => %request.requested_block_root); + "peer_id" => %peer_id, "error" => msg, "block_root" => %request_ref.requested_block_root); // try the request again if possible if let Ok((peer_id, request)) = request_ref.request_block() { if let Ok(id) = cx.single_block_lookup_request(peer_id, request) { @@ -337,7 +337,7 @@ impl BlockLookups { if triggered_parent_request { // The lookup status here is irrelevant because we wait until the parent chain // is complete before processing the block. - let _ = request_ref.add_block(root, block)?; + let _ = request_ref.add_blobs(block_root, blobs)?; } else { // These are the correct blobs, send them for processing if self @@ -362,7 +362,7 @@ impl BlockLookups { cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); debug!(self.log, "Single block lookup failed"; - "peer_id" => %peer_id, "error" => msg, "block_root" => %request.requested_block_root); + "peer_id" => %peer_id, "error" => msg, "block_root" => %request_ref.requested_block_root); // try the request again if possible if let Ok((peer_id, request)) = request_ref.request_blobs() { if let Ok(id) = cx.single_blobs_lookup_request(peer_id, request) { @@ -566,10 +566,6 @@ impl BlockLookups { self.search_block(block_root, peer_id, cx); self.parent_lookups.push(parent_lookup) } - LookupDownloadStatus::Err(e) => { - warn!(self.log, "Peer sent invalid response to parent request."; - "peer_id" => %peer_id, "reason" => %e); - } } } Ok(None) => { @@ -748,7 +744,7 @@ impl BlockLookups { self.single_block_lookups.retain_mut(|(block_id, blob_id, req)|{ if &Some(id) == block_id { req.block_request_state.register_failure_downloading(); - trace!(self.log, "Single block lookup failed"; "block" => %request.requested_block_root); + trace!(self.log, "Single block lookup failed"; "block" => %req.requested_block_root); match req.request_block() { Ok(Some((peer_id, block_request))) => { if let Ok(request_id) = cx.single_block_lookup_request(peer_id, block_request) { @@ -769,7 +765,7 @@ impl BlockLookups { } if &Some(id) == blob_id { req.blob_request_state.register_failure_downloading(); - trace!(self.log, "Single blob lookup failed"; "block" => %request.requested_block_root); + trace!(self.log, "Single blob lookup failed"; "block" => %req.requested_block_root); match req.request_blobs() { Ok(Some((peer_id, blob_request))) => { if let Ok(request_id) = cx.single_blobs_lookup_request(peer_id, blob_request) { @@ -851,7 +847,7 @@ impl BlockLookups { trace!(self.log, "Single block processing succeeded"; "block" => %root); true } - AvailabilityProcessingStatus::MissingParts(block_root) => { + AvailabilityProcessingStatus::MissingParts(_, block_root) => { self.search_block(block_root, peer_id, cx); false } @@ -907,7 +903,7 @@ impl BlockLookups { ResponseType::Block => { req.block_request_state.register_failure_processing(); match req.request_block() { - Ok(Some((peer_id, requeest))) => { + Ok(Some((peer_id, request))) => { if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request) { @@ -965,7 +961,7 @@ impl BlockLookups { .parent_lookups .iter() .enumerate() - .find(|(index, _)| lookup.chain_hash() == chain_hash) + .find(|(_, lookup)| lookup.chain_hash() == chain_hash) .map(|(index, _)| index); let Some(mut parent_lookup) = index.map(|index|self.parent_lookups.remove(index)) else { @@ -993,7 +989,7 @@ impl BlockLookups { AvailabilityProcessingStatus::Imported(hash) => { trace!(self.log, "Parent block processing succeeded"; &parent_lookup) } - AvailabilityProcessingStatus::MissingParts(block_root) => { + AvailabilityProcessingStatus::MissingParts(_, block_root) => { trace!(self.log, "Parent missing parts, triggering single block lookup "; &parent_lookup) } }, @@ -1012,6 +1008,7 @@ impl BlockLookups { match result { BlockPartProcessingResult::Ok(AvailabilityProcessingStatus::MissingParts( + _, block_root, )) => { self.search_block(block_root, peer_id, cx); @@ -1124,14 +1121,42 @@ impl BlockLookups { debug!(self.log, "Parent chain processed"; "chain_hash" => %chain_hash, "result" => ?result); match result { BatchProcessResult::Success { .. } => { - //TODO(sean) find single blob and block lookups and send for processing + if let Some((index, (_, _, req))) = self + .single_block_lookups + .iter() + .enumerate() + .find(|(index, (_, _, req))| req.requested_block_root == chain_hash) + { + self.single_block_lookups + .get_mut(index) + .and_then(|(_, _, lookup)| lookup.get_downloaded_block()) + .map(|block_wrapper| { + // This is the correct block, send it for processing + if self + .send_block_for_processing( + chain_hash, + block_wrapper, + Duration::from_secs(0), //TODO(sean) pipe this through + BlockProcessType::SingleBlock { id }, + cx, + ) + .is_err() + { + // Remove to avoid inconsistencies + self.single_block_lookups.remove(index); + } + }); + } } BatchProcessResult::FaultyFailure { imported_blocks: _, penalty, } => { + //TODO(sean) improve peer scoring to block or blob granularity self.failed_chains.insert(chain_hash); - for peer_id in request.used_peers { + let mut all_peers = request.blob_request_state.used_peers.clone(); + all_peers.extend(request.blob_request_state.used_peers); + for peer_id in all_peers { cx.report_peer(peer_id, penalty, "parent_chain_failure") } } @@ -1181,14 +1206,17 @@ impl BlockLookups { fn send_blobs_for_processing( &self, block_root: Hash256, - blobs: FixedVector>>, T::EthSpec::MaxBlobsPerBlock>, - seen_timestamp: Duration, - id: BlockProcessType, + blobs: FixedVector< + Option>>, + <::EthSpec as EthSpec>::MaxBlobsPerBlock, + >, + duration: Duration, + process_type: BlockProcessType, cx: &mut SyncNetworkContext, ) -> Result<(), ()> { match cx.processor_channel_if_enabled() { Some(beacon_processor_send) => { - trace!(self.log, "Sending blobs for processing"; "block" => ?block_root, "process" => ?process_type); + trace!(self.log, "Sending blobs for processing"; "block" => ?block_root, "process_type" => ?process_type); let event = WorkEvent::rpc_blobs(block_root, blobs, duration, process_type); if let Err(e) = beacon_processor_send.try_send(event) { error!( diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 49e5b1826e..23ce722bc1 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -192,7 +192,10 @@ impl ParentLookup { pub fn add_blobs( &mut self, block_root: Hash256, - blobs: FixedVector>>, T::EthSpec::MaxBlobsPerBlock>, + blobs: FixedVector< + Option>>, + <::EthSpec as EthSpec>::MaxBlobsPerBlock, + >, ) -> Result, ParentVerifyError> { self.current_parent_blob_request_id = None; self.current_parent_request @@ -302,7 +305,10 @@ impl ParentLookup { ) -> Result< Option<( Hash256, - FixedVector>>, T::EthSpec::MaxBlobsPerBlock>, + FixedVector< + Option>>, + <::EthSpec as EthSpec>::MaxBlobsPerBlock, + >, )>, ParentVerifyError, > { diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 81f968277b..2cbaeb5ffb 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,6 +1,5 @@ use crate::sync::block_lookups::parent_lookup::LookupDownloadStatus; use crate::sync::block_lookups::RootBlockTuple; -use beacon_chain::blob_verification::AsBlock; use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker}; use beacon_chain::{get_block_root, BeaconChainTypes}; @@ -10,17 +9,19 @@ use rand::seq::IteratorRandom; use ssz_types::{FixedVector, VariableList}; use std::collections::HashSet; use std::sync::Arc; -use store::{EthSpec, Hash256}; +use store::Hash256; use strum::IntoStaticStr; use types::blob_sidecar::BlobIdentifier; -use types::{BlobSidecar, SignedBeaconBlock}; +use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; pub struct SingleBlockLookup { pub requested_block_root: Hash256, pub requested_ids: Vec, - pub downloaded_blobs: - FixedVector>>, T::EthSpec::MaxBlobsPerBlock>, - pub downloaded_block: Option>>, + pub downloaded_blobs: FixedVector< + Option>>, + <::EthSpec as EthSpec>::MaxBlobsPerBlock, + >, + pub downloaded_block: Option>>, pub block_request_state: SingleLookupRequestState, pub blob_request_state: SingleLookupRequestState, pub da_checker: Arc>, @@ -88,10 +89,25 @@ impl SingleBlockLookup Option> { + if self.requested_ids.is_empty() { + if let Some(block) = self.downloaded_block.take() { + return Some(BlockWrapper::BlockAndBlobs( + block, + self.downloaded_blobs.clone(), + )); + } + } + None + } + pub fn add_blobs( &mut self, block_root: Hash256, - blobs: FixedVector>>, T::EthSpec::MaxBlobsPerBlock>, + blobs: FixedVector< + Option>>, + <::EthSpec as EthSpec>::MaxBlobsPerBlock, + >, ) -> Result, LookupVerifyError> { for (index, blob_opt) in self.downloaded_blobs.iter_mut().enumerate() { if let Some(Some(downloaded_blob)) = blobs.get(index) { @@ -109,7 +125,7 @@ impl SingleBlockLookup Err(LookupVerifyError::AvailabilityCheck(e)), } } else { - Ok(LookupDownloadStatus::SearchBlock(block_hash)) + Ok(LookupDownloadStatus::SearchBlock(block_root)) } } @@ -118,12 +134,8 @@ impl SingleBlockLookup>, ) -> Result, LookupVerifyError> { - for (index, blob_opt) in self.downloaded_blobs.iter_mut().enumerate() { - if let Some(Some(downloaded_blob)) = blobs.get(index) { - //TODO(sean) should we log a warn if there is already a downloaded blob? - *blob_opt = Some(downloaded_blob); - } - } + //TODO(sean) check for existing block? + self.downloaded_block = Some(block); match self .da_checker @@ -204,27 +216,32 @@ impl SingleBlockLookup>>, ) -> Result< - Option>>, T::EthSpec::MaxBlobsPerBlock>>, - BlobVerifyError, + Option< + FixedVector< + Option>>, + <::EthSpec as EthSpec>::MaxBlobsPerBlock, + >, + >, + LookupVerifyError, > { match self.block_request_state.state { State::AwaitingDownload => { self.blob_request_state.register_failure_downloading(); - Err(BlobVerifyError::ExtraBlobsReturned) + Err(LookupVerifyError::ExtraBlobsReturned) } State::Downloading { peer_id } => match blob { Some(blob) => { let received_id = blob.id(); if !self.requested_ids.contains(&received_id) { self.blob_request_state.register_failure_downloading(); - Err(BlobVerifyError::UnrequestedBlobId) + Err(LookupVerifyError::UnrequestedBlobId) } else { // State should remain downloading until we receive the stream terminator. self.requested_ids.retain(|id| id != received_id); if let Some(blob_opt) = self.downloaded_blobs.get_mut(blob.index) { *blob_opt = Some(blob); } else { - return Err(BlobVerifyError::InvalidIndex(blob.index)); + return Err(LookupVerifyError::InvalidIndex(blob.index)); } } } @@ -233,11 +250,11 @@ impl SingleBlockLookup match block { + State::Processing { peer_id: _ } => match blob { Some(_) => { // We sent the blob for processing and received an extra blob. self.blob_request_state.register_failure_downloading(); - Err(BlobVerifyError::ExtraBlobsReturned) + Err(LookupVerifyError::ExtraBlobsReturned) } None => { // This is simply the stream termination and we are already processing the @@ -304,7 +321,7 @@ impl SingleBlockLookup SingleBlockLookup bool { - let is_useful = self.requested_ids.contains(blob_id); - if is_useful { - self.block_request_state.add_peer(peer_id); - } - is_useful - } } impl SingleLookupRequestState { diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index e6c5549cc9..25b20aa37f 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,4 +1,5 @@ use beacon_chain::blob_verification::BlockWrapper; +use ssz_types::FixedVector; use std::{collections::VecDeque, sync::Arc}; use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; @@ -55,7 +56,18 @@ impl BlocksAndBlobsRequestInfo { if blob_list.is_empty() { responses.push(BlockWrapper::Block(block)) } else { - responses.push(BlockWrapper::BlockAndBlobs(block, blob_list)) + let mut blobs_fixed = Vec::with_capacity(T::max_blobs_per_block()); + for blob in blob_list { + let blob_index = blob.index as usize; + if blob_index >= T::max_blobs_per_block() { + return Err(format!("Invalid blob index {blob_index:?}").as_str()); + } + blobs_fixed.insert(blob_index, Some(blob)); + } + responses.push(BlockWrapper::BlockAndBlobs( + block, + FixedVector::from(blobs_fixed), + )) } } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 0ab96e3141..22820b496d 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -41,10 +41,10 @@ use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; -use crate::sync::block_lookups::ResponseType; +pub use crate::sync::block_lookups::ResponseType; use crate::sync::range_sync::ByRangeRequestType; +use beacon_chain::blob_verification::AsBlock; use beacon_chain::blob_verification::BlockWrapper; -use beacon_chain::blob_verification::{AsBlock, MaybeAvailableBlock}; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, EngineState, }; @@ -58,12 +58,10 @@ use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; use std::boxed::Box; use std::ops::Sub; -use std::sync::mpsc::TryRecvError; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use tokio::time::sleep; -use types::blob_sidecar::BlobIdentifier; use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync @@ -662,12 +660,12 @@ impl SyncManager { } }; - if block.slot() == self.chain.slot_clock.now() { + if block.slot() == slot { if let Err(e) = self .delayed_lookups - .send(SyncMessage::UnknownBlock(peer_id, block, block_root)) + .try_send(SyncMessage::UnknownBlock(peer_id, block, block_root)) { - warn!(self.log, "Delayed lookups receiver dropped for block"; "block_root" => block_hash); + warn!(self.log, "Delayed lookups dropped for block"; "block_root" => ?block_root); } } else { self.block_lookups.search_current_unknown_parent( @@ -709,14 +707,11 @@ impl SyncManager { }; if slot == current_slot { - if let Err(e) = - self.delayed_lookups - .send(SyncMessage::UnknownBlockHashFromAttestation( - peer_id, block_hash, - )) - { - warn!(self.log, "Delayed lookups receiver dropped for block referenced by a blob"; - "block_root" => block_hash); + if let Err(e) = self.delayed_lookups.try_send( + SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash), + ) { + warn!(self.log, "Delayed lookup dropped for block referenced by a blob"; + "block_root" => ?block_hash); } } else { self.block_lookups