diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index fdf355f39b..531e376e88 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -334,7 +334,7 @@ pub fn verify_kzg_for_blob( /// Note: This function should be preferred over calling `verify_kzg_for_blob` /// in a loop since this function kzg verifies a list of blobs more efficiently. pub fn verify_kzg_for_blob_list( - blob_list: BlobSidecarList, + blob_list: Vec>>, kzg: &Kzg, ) -> Result, AvailabilityCheckError> { let (blobs, (commitments, proofs)): (Vec<_>, (Vec<_>, Vec<_>)) = blob_list diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 36cc723319..9f31306d21 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -781,21 +781,17 @@ impl AvailabilityPendingExecutedBlock { } pub fn get_all_blob_ids(&self) -> Vec { - self.get_filtered_blob_ids(|_| true) + let block_root = self.import_data.block_root; + self.block + .get_filtered_blob_ids(Some(block_root), |_, _| true) } - pub fn get_filtered_blob_ids(&self, filter: impl Fn(u64) -> bool) -> Vec { - let num_blobs_expected = self.num_blobs_expected(); - let mut blob_ids = Vec::with_capacity(num_blobs_expected); - for i in 0..num_blobs_expected as u64 { - if filter(i) { - blob_ids.push(BlobIdentifier { - block_root: self.import_data.block_root, - index: i, - }); - } - } - blob_ids + pub fn get_filtered_blob_ids( + &self, + filter: impl Fn(usize, Hash256) -> bool, + ) -> Vec { + self.block + .get_filtered_blob_ids(Some(self.import_data.block_root), filter) } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 173c9fb025..69fd6da4db 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -12,6 +12,7 @@ use ssz_types::{Error, FixedVector, VariableList}; use state_processing::per_block_processing::deneb::deneb::verify_kzg_commitments_against_transactions; use std::collections::hash_map::{Entry, OccupiedEntry}; use std::collections::HashMap; +use std::ops::Index; use std::sync::Arc; use types::beacon_block_body::KzgCommitments; use types::blob_sidecar::{BlobIdentifier, BlobSidecar}; @@ -39,6 +40,10 @@ pub enum AvailabilityCheckError { }, Pending, IncorrectFork, + BlockBlobRootMismatch { + block_root: Hash256, + blob_block_root: Hash256, + }, } impl From for AvailabilityCheckError { @@ -68,30 +73,28 @@ struct ReceivedComponents { /// We use a `BTreeMap` here to maintain the order of `BlobSidecar`s based on index. verified_blobs: FixedVector>, T::MaxBlobsPerBlock>, executed_block: Option>, - missing_blob_ids: Vec, } impl ReceivedComponents { - fn new_from_blob(blob: KzgVerifiedBlob) -> Self { + fn new_from_blobs(blobs: &[KzgVerifiedBlob]) -> Self { let mut verified_blobs = FixedVector::<_, _>::default(); - // TODO: verify that we've already ensured the blob index < T::MaxBlobsPerBlock - if let Some(mut_maybe_blob) = verified_blobs.get_mut(blob.blob_index() as usize) { - *mut_maybe_blob = Some(blob); + for blob in blobs { + // TODO: verify that we've already ensured the blob index < T::MaxBlobsPerBlock + if let Some(mut_maybe_blob) = verified_blobs.get_mut(blob.blob_index() as usize) { + *mut_maybe_blob = Some(blob.clone()); + } } Self { verified_blobs, executed_block: None, - missing_blob_ids: vec![], } } fn new_from_block(block: AvailabilityPendingExecutedBlock) -> Self { - let missing_blob_ids = block.get_all_blob_ids(); Self { verified_blobs: <_>::default(), executed_block: Some(block), - missing_blob_ids, } } @@ -155,20 +158,19 @@ impl DataAvailabilityChecker { .map(|kzg_verified_blob| kzg_verified_blob.clone_blob()) } - pub fn put_rpc_blob( + pub fn put_rpc_blobs( &self, - blob: Arc>, + block_root: Hash256, + blobs: Vec>>, ) -> Result, AvailabilityCheckError> { // Verify the KZG commitment. - let kzg_verified_blob = if let Some(kzg) = self.kzg.as_ref() { - verify_kzg_for_blob(blob, kzg)? + let kzg_verified_blobs = if let Some(kzg) = self.kzg.as_ref() { + verify_kzg_for_blob_list(blobs, kzg)? } else { return Err(AvailabilityCheckError::KzgNotInitialized); }; - self.put_kzg_verified_blob(kzg_verified_blob, |blob_id, missing_blob_ids| { - missing_blob_ids.contains(&blob_id) - }) + self.put_kzg_verified_blobs(block_root, &kzg_verified_blobs) } /// This first validates the KZG commitments included in the blob sidecar. @@ -188,32 +190,47 @@ impl DataAvailabilityChecker { return Err(AvailabilityCheckError::KzgNotInitialized); }; - let availability = match self - .availability_cache - .write() - .entry(kzg_verified_blob.block_root()) - { + self.put_kzg_verified_blobs(kzg_verified_blob.block_root(), &[kzg_verified_blob]) + } + + fn put_kzg_verified_blobs( + &self, + block_root: Hash256, + kzg_verified_blobs: &[KzgVerifiedBlob], + ) -> Result, AvailabilityCheckError> { + for blob in kzg_verified_blobs { + let blob_block_root = blob.block_root(); + if blob_block_root != block_root { + return Err(AvailabilityCheckError::BlockBlobRootMismatch { + block_root, + blob_block_root, + }); + } + } + + let availability = match self.availability_cache.write().entry(block_root) { Entry::Occupied(mut occupied_entry) => { // All blobs reaching this cache should be gossip verified and gossip verification // should filter duplicates, as well as validate indices. let received_components = occupied_entry.get_mut(); - if let Some(maybe_verified_blob) = received_components - .verified_blobs - .get_mut(kzg_verified_blob.blob_index() as usize) - { - *maybe_verified_blob = Some(kzg_verified_blob) + for kzg_verified_blob in kzg_verified_blobs { + if let Some(maybe_verified_blob) = received_components + .verified_blobs + .get_mut(kzg_verified_blob.blob_index() as usize) + { + *maybe_verified_blob = Some(kzg_verified_blob.clone()) + } } if let Some(executed_block) = received_components.executed_block.take() { self.check_block_availability_maybe_cache(occupied_entry, executed_block)? } else { - Availability::PendingBlock(blob.block_root) + Availability::PendingBlock(block_root) } } Entry::Vacant(vacant_entry) => { - let block_root = kzg_verified_blob.block_root(); - vacant_entry.insert(ReceivedComponents::new_from_blob(kzg_verified_blob)); + vacant_entry.insert(ReceivedComponents::new_from_blobs(kzg_verified_blobs)); Availability::PendingBlock(block_root) } }; @@ -287,7 +304,7 @@ impl DataAvailabilityChecker { } else { let received_components = occupied_entry.get_mut(); - let missing_blob_ids = executed_block.get_filtered_blob_ids(|index| { + let missing_blob_ids = executed_block.get_filtered_blob_ids(|index, _| { received_components .verified_blobs .get(index as usize) @@ -314,7 +331,7 @@ impl DataAvailabilityChecker { .kzg .as_ref() .ok_or(AvailabilityCheckError::KzgNotInitialized)?; - let verified_blobs = verify_kzg_for_blob_list(VariableList::new(blob_list)?, kzg)?; + let verified_blobs = verify_kzg_for_blob_list(blob_list, kzg)?; Ok(MaybeAvailableBlock::Available( self.check_availability_with_blobs(block, verified_blobs)?, @@ -323,6 +340,40 @@ impl DataAvailabilityChecker { } } + /// For a given block wrapper, find the missing blobs. Useful for parent unknown blocks. + /// Because these don't otherwise hit the data availability caches. + pub fn get_missing_blob_ids( + &self, + block: BlockWrapper, + block_root: Option, + ) -> Result, AvailabilityCheckError> { + let (block, blobs) = block.deconstruct(); + let maybe_available = self.check_availability_without_blobs(block)?; + let blob_ids = match &maybe_available { + MaybeAvailableBlock::Available(_) => { + vec![] + } + MaybeAvailableBlock::AvailabilityPending(pending_block) => { + if let Some(blobs) = blobs { + pending_block.get_filtered_blob_ids(block_root, |index_usize, block_root| { + let index = index_usize as u64; + let blob_in_wrapper = blobs + .get(index_usize) + .map(|blob| blob.index == index) + .unwrap_or(false); + let blob_in_cache = self + .get_blob(&BlobIdentifier { block_root, index }) + .is_some(); + !blob_in_wrapper && !blob_in_cache + }) + } else { + pending_block.get_all_blob_ids(block_root) + } + } + }; + Ok(blob_ids) + } + /// Checks if a block is available, returning an error if the block is not immediately available. /// Does not access the gossip cache. pub fn try_check_availability( @@ -495,6 +546,36 @@ pub struct AvailabilityPendingBlock { block: Arc>, } +impl AvailabilityPendingBlock { + pub fn num_blobs_expected(&self) -> usize { + self.kzg_commitments() + .map_or(0, |commitments| commitments.len()) + } + + pub fn get_all_blob_ids(&self, block_root: Option) -> Vec { + self.get_filtered_blob_ids(block_root, |_, _| true) + } + + pub fn get_filtered_blob_ids( + &self, + block_root: Option, + filter: impl Fn(usize, Hash256) -> bool, + ) -> Vec { + let block_root = block_root.unwrap_or_else(|| self.as_block().canonical_root()); + let num_blobs_expected = self.num_blobs_expected(); + let mut blob_ids = Vec::with_capacity(num_blobs_expected); + for i in 0..num_blobs_expected { + if filter(i, block_root) { + blob_ids.push(BlobIdentifier { + block_root, + index: i as u64, + }); + } + } + blob_ids + } +} + impl AvailabilityPendingBlock { pub fn to_block(self) -> Arc> { self.block diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index caa358904b..9e3c0bd0c8 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -625,15 +625,17 @@ impl WorkEvent { } } - pub fn rpc_blob( - blob: Arc>, + pub fn rpc_blobs( + block_root: Hash256, + blobs: Vec>>, seen_timestamp: Duration, process_type: BlockProcessType, ) -> Self { Self { drop_during_sync: false, - work: Work::RpcBlob { - block: blob, + work: Work::RpcBlobs { + block_root, + blobs, seen_timestamp, process_type, }, @@ -936,8 +938,9 @@ pub enum Work { process_type: BlockProcessType, should_process: bool, }, - RpcBlob { - block: Arc>, + RpcBlobs { + block_root: Hash256, + blobs: Vec>>, seen_timestamp: Duration, process_type: BlockProcessType, }, @@ -1000,7 +1003,7 @@ impl Work { Work::GossipLightClientFinalityUpdate { .. } => GOSSIP_LIGHT_CLIENT_FINALITY_UPDATE, Work::GossipLightClientOptimisticUpdate { .. } => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE, Work::RpcBlock { .. } => RPC_BLOCK, - Work::RpcBlob { .. } => RPC_BLOB, + Work::RpcBlobs { .. } => RPC_BLOB, Work::ChainSegment { .. } => CHAIN_SEGMENT, Work::Status { .. } => STATUS_PROCESSING, Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST, @@ -1513,7 +1516,7 @@ impl BeaconProcessor { optimistic_update_queue.push(work, work_id, &self.log) } Work::RpcBlock { .. } => rpc_block_queue.push(work, work_id, &self.log), - Work::RpcBlob { .. } => rpc_blob_queue.push(work, work_id, &self.log), + Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id, &self.log), Work::ChainSegment { ref process_id, .. } => match process_id { ChainSegmentProcessId::RangeBatchId { .. } | ChainSegmentProcessId::ParentLookup { .. } => { @@ -1936,12 +1939,14 @@ impl BeaconProcessor { duplicate_cache, should_process, )), - Work::RpcBlob { - block, + Work::RpcBlobs { + block_root, + blobs, seen_timestamp, process_type, - } => task_spawner.spawn_async(worker.process_rpc_blob( - block, + } => task_spawner.spawn_async(worker.process_rpc_blobs( + block_root, + blobs, seen_timestamp, process_type, )), diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 264d02b9b1..e856e9a075 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -136,16 +136,21 @@ impl Worker { drop(handle); } - pub async fn process_rpc_blob( + pub async fn process_rpc_blobs( self, - blob: Arc>, + block_root: Hash256, + blobs: Vec>>, seen_timestamp: Duration, process_type: BlockProcessType, ) { let result = self .chain .check_availability_and_maybe_import( - |chain| chain.data_availability_checker.put_rpc_blob(blob), + |chain| { + chain + .data_availability_checker + .put_rpc_blobs(block_root, blobs) + }, CountUnrealized::True, ) .await; diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 47e964a7c3..a531d42603 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -1,5 +1,5 @@ use std::collections::hash_map::Entry; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::thread::sleep; use std::time::Duration; @@ -19,6 +19,7 @@ use types::{BlobSidecar, SignedBeaconBlock}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; use crate::metrics; +use crate::sync::block_lookups::single_block_lookup::SingleBlobRequest; use self::parent_lookup::PARENT_FAIL_TOLERANCE; use self::{ @@ -59,6 +60,8 @@ pub(crate) struct BlockLookups { /// The flag allows us to determine if the peer returned data or sent us nothing. single_block_lookups: FnvHashMap>, + single_blob_lookups: FnvHashMap>, + /// The logger for the import manager. log: Logger, } @@ -72,6 +75,7 @@ impl BlockLookups { FAILED_CHAINS_CACHE_EXPIRY_SECONDS, )), single_block_lookups: Default::default(), + single_blob_lookups: Default::default(), log, } } @@ -137,6 +141,56 @@ impl BlockLookups { cx: &mut SyncNetworkContext, ) { todo!() + + // + // let hash = Hash256::zero(); + // + // // Do not re-request a blo that is already being requested + // if self + // .single_blob_lookups + // .values_mut() + // .any(|single_block_request| single_block_request.add_peer(&hash, &peer_id)) + // { + // return; + // } + // + // if self.parent_lookups.iter_mut().any(|parent_req| { + // parent_req.add_peer(&hash, &peer_id) || parent_req.contains_block(&hash) + // }) { + // // If the block was already downloaded, or is being downloaded in this moment, do not + // // request it. + // return; + // } + // + // if self + // .processing_parent_lookups + // .values() + // .any(|(hashes, _last_parent_request)| hashes.contains(&hash)) + // { + // // we are already processing this block, ignore it. + // return; + // } + // + // debug!( + // self.log, + // "Searching for block"; + // "peer_id" => %peer_id, + // "block" => %hash + // ); + // + // let mut single_block_request = SingleBlobRequest::new(hash, peer_id); + // + // let (peer_id, request) = single_block_request + // .request_block() + // .expect("none of the possible failure cases apply for a newly created block lookup"); + // if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request) { + // self.single_blob_lookups + // .insert(request_id, single_block_request); + // + // metrics::set_gauge( + // &metrics::SYNC_SINGLE_BLOB_LOOKUPS, + // self.single_blob_lookups.len() as i64, + // ); } pub fn search_block_delayed( @@ -171,6 +225,13 @@ impl BlockLookups { peer_id: PeerId, cx: &mut SyncNetworkContext, ) { + // + // let missing_ids = cx.chain.data_availability_checker.get_missing_blob_ids(block, Some(root)); + // // TODO(sean) how do we handle this erroring? + // if let Ok(missing_ids) = missing_ids { + // self.search_blobs(missing_ids, peer_id, cx); + // } + let parent_root = block.parent_root(); // If this block or it's parent is part of a known failed chain, ignore it. if self.failed_chains.contains(&parent_root) || self.failed_chains.contains(&block_root) { @@ -497,19 +558,17 @@ impl BlockLookups { }; match result { - BlockProcessResult::Ok(status) => { - match status { - AvailabilityProcessingStatus::Imported(hash) => { - trace!(self.log, "Single block processing succeeded"; "block" => %root); - } - AvailabilityProcessingStatus::PendingBlobs(blobs) => { - // trigger? - } - AvailabilityProcessingStatus::PendingBlock(hash) => { - // logic error - } + BlockProcessResult::Ok(status) => match status { + AvailabilityProcessingStatus::Imported(hash) => { + trace!(self.log, "Single block processing succeeded"; "block" => %root); } - } + AvailabilityProcessingStatus::PendingBlobs(blobs_ids) => { + self.search_blobs(blobs_ids, peer_id, cx); + } + AvailabilityProcessingStatus::PendingBlock(hash) => { + warn!(self.log, "Block processed but returned PendingBlock"; "block" => %hash); + } + }, BlockProcessResult::Ignored => { // Beacon processor signalled to ignore the block processing result. // This implies that the cpu is overloaded. Drop the request. @@ -620,13 +679,18 @@ impl BlockLookups { BlockProcessResult::Ok(AvailabilityProcessingStatus::PendingBlock(_)) => { // doesn't make sense } - BlockProcessResult::Ok(AvailabilityProcessingStatus::PendingBlobs(blobs)) => { - // trigger + BlockProcessResult::Ok(AvailabilityProcessingStatus::PendingBlobs(blobs_ids)) => { + self.search_blobs(blobs_ids, peer_id, cx); } BlockProcessResult::Err(BlockError::ParentUnknown(block)) => { - // need to keep looking for parents - // add the block back to the queue and continue the search + // TODO(sean) how do we handle this erroring? + let missing_ids = cx + .chain + .data_availability_checker + .get_missing_blob_ids(block.clone(), None) + .unwrap_or_default(); parent_lookup.add_block(block); + self.search_blobs(missing_ids, peer_id, cx); self.request_parent(parent_lookup, cx); } BlockProcessResult::Ok(AvailabilityProcessingStatus::Imported(_)) diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 3698da33c1..d486f4ba78 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -29,7 +29,23 @@ pub struct SingleBlockRequest { failed_processing: u8, /// How many times have we attempted to download this block. failed_downloading: u8, - missing_blobs: Vec, +} + +#[derive(PartialEq, Eq)] +pub struct SingleBlobRequest { + /// The hash of the requested block. + pub hash: Hash256, + pub blob_ids: Vec, + /// State of this request. + pub state: State, + /// Peers that should have this block. + pub available_peers: HashSet, + /// Peers from which we have requested this block. + pub used_peers: HashSet, + /// How many times have we attempted to process this block. + failed_processing: u8, + /// How many times have we attempted to download this block. + failed_downloading: u8, } #[derive(Debug, PartialEq, Eq)] @@ -65,7 +81,6 @@ impl SingleBlockRequest { used_peers: HashSet::default(), failed_processing: 0, failed_downloading: 0, - missing_blobs: vec![], } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 42b3279e85..840e94f3b3 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -64,7 +64,7 @@ pub struct SyncNetworkContext { /// Channel to send work to the beacon processor. beacon_processor_send: mpsc::Sender>, - chain: Arc>, + pub chain: Arc>, /// Logger for the `SyncNetworkContext`. log: slog::Logger, @@ -411,6 +411,8 @@ impl SyncNetworkContext { } } + // TODO(sean) add single blob lookup + parent lookup request methods + /// Sends a blocks by root request for a single block lookup. pub fn single_block_lookup_request( &mut self,