diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 72fdf96cb6..d1f1a126c7 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -468,7 +468,7 @@ pub struct BeaconChain { /// Provides monitoring of a set of explicitly defined validators. pub validator_monitor: RwLock>, pub proposal_blob_cache: BlobCache, - pub data_availability_checker: DataAvailabilityChecker, + pub data_availability_checker: Arc>, pub kzg: Option>, } @@ -2775,8 +2775,8 @@ impl BeaconChain { block_root: Hash256, unverified_block: B, count_unrealized: CountUnrealized, - notify_execution_layer: notifyexecutionlayer, - ) -> result> { + notify_execution_layer: NotifyExecutionLayer, + ) -> Result> { // Start the Prometheus timer. let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES); @@ -2915,12 +2915,9 @@ impl BeaconChain { Availability::Available(block) => { self.import_available_block(block, count_unrealized).await } - Availability::PendingBlock(block_root) => { - Ok(AvailabilityProcessingStatus::PendingBlock(block_root)) + Availability::MissingParts(block_root) => { + Ok(AvailabilityProcessingStatus::MissingParts(block_root)) } - Availability::PendingBlobs(block_root, blob_ids) => Ok( - AvailabilityProcessingStatus::PendingBlobs(block_root, blob_ids), - ), } } diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 70c6fbe375..531e376e88 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -372,17 +372,6 @@ pub enum MaybeAvailableBlock { AvailabilityPending(AvailabilityPendingBlock), } -impl MaybeAvailableBlock { - pub fn get_missing_blob_ids(&self) -> Option<&Vec> { - match self { - MaybeAvailableBlock::Available(_) => None, - MaybeAvailableBlock::AvailabilityPending(pending_block) => { - Some(pending_block.get_missing_blob_ids()) - } - } - } -} - /// Trait for common block operations. pub trait AsBlock { fn slot(&self) -> Slot; diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 8b292c400f..9f31306d21 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -146,7 +146,7 @@ pub enum BlockError { /// /// It's unclear if this block is valid, but it cannot be processed without already knowing /// its parent. - ParentUnknown(MaybeAvailableBlock), + ParentUnknown(BlockWrapper), /// The block skips too many slots and is a DoS risk. TooManySkippedSlots { parent_slot: Slot, @@ -311,6 +311,7 @@ pub enum BlockError { parent_root: Hash256, }, BlobValidation(BlobError), + AvailabilityCheck(AvailabilityCheckError), } impl From for BlockError { @@ -1331,7 +1332,7 @@ impl ExecutionPendingBlock { // because it will revert finalization. Note that the finalized block is stored in fork // choice, so we will not reject any child of the finalized block (this is relevant during // genesis). - return Err(BlockError::ParentUnknown(block)); + return Err(BlockError::ParentUnknown(block.into_block_wrapper())); } // Reject any block that exceeds our limit on skipped slots. @@ -1795,7 +1796,7 @@ pub fn check_block_is_finalized_checkpoint_or_descendant< block_parent_root: block.parent_root(), }) } else { - Err(BlockError::ParentUnknown(block)) + Err(BlockError::ParentUnknown(block.into_block_wrapper())) } } } @@ -1876,7 +1877,7 @@ fn verify_parent_block_is_known( { Ok((proto_block, block)) } else { - Err(BlockError::ParentUnknown(block)) + Err(BlockError::ParentUnknown(block.into_block_wrapper())) } } @@ -1907,7 +1908,7 @@ fn load_parent>( .fork_choice_read_lock() .contains_block(&block.parent_root()) { - return Err(BlockError::ParentUnknown(block)); + return Err(BlockError::ParentUnknown(block.into_block_wrapper())); } let block_delay = chain diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 7620a588d6..ebb61afffe 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -850,11 +850,11 @@ where slasher: self.slasher.clone(), validator_monitor: RwLock::new(validator_monitor), //TODO(sean) should we move kzg solely to the da checker? - data_availability_checker: DataAvailabilityChecker::new( + data_availability_checker: Arc::new(DataAvailabilityChecker::new( slot_clock, kzg.clone(), self.spec, - ), + )), proposal_blob_cache: BlobCache::default(), kzg, }; diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index aea1907dd5..ac2673b984 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -122,23 +122,10 @@ impl ReceivedComponents { /// Indicates if the block is fully `Available` or if we need blobs or blocks /// to "complete" the requirements for an `AvailableBlock`. pub enum Availability { - PendingBlobs(Hash256, Vec), - PendingBlock(Hash256), + MissingParts(Hash256), Available(Box>), } -impl Availability { - /// Returns all the blob identifiers associated with an `AvailableBlock`. - /// Returns `None` if avaiability hasn't been fully satisfied yet. - pub fn get_available_blob_ids(&self) -> Option> { - if let Self::Available(block) = self { - Some(block.get_all_blob_ids()) - } else { - None - } - } -} - impl DataAvailabilityChecker { pub fn new(slot_clock: S, kzg: Option>, spec: ChainSpec) -> Self { Self { @@ -161,7 +148,6 @@ impl DataAvailabilityChecker { BlobRequirements::PreDeneb => BlockWrapper::Block(block), BlobRequirements::Required => { let expected_num_blobs = block - .block() .message() .body() .blob_kzg_commitments() @@ -175,7 +161,7 @@ impl DataAvailabilityChecker { num_blobs: blobs.len(), }); } - for blob in blobs { + for blob in blobs.iter() { if blob.block_root != block_root { return Err(AvailabilityCheckError::BlockBlobRootMismatch { block_root, @@ -278,12 +264,12 @@ impl DataAvailabilityChecker { if let Some(executed_block) = received_components.executed_block.take() { self.check_block_availability_maybe_cache(occupied_entry, executed_block)? } else { - Availability::PendingBlock(block_root) + Availability::MissingParts(block_root) } } Entry::Vacant(vacant_entry) => { vacant_entry.insert(ReceivedComponents::new_from_blobs(kzg_verified_blobs)); - Availability::PendingBlock(block_root) + Availability::MissingParts(block_root) } }; @@ -305,10 +291,9 @@ impl DataAvailabilityChecker { self.check_block_availability_maybe_cache(occupied_entry, executed_block)? } Entry::Vacant(vacant_entry) => { - let all_blob_ids = executed_block.get_all_blob_ids(); let block_root = executed_block.import_data.block_root; vacant_entry.insert(ReceivedComponents::new_from_block(executed_block)); - Availability::PendingBlobs(block_root, all_blob_ids) + Availability::MissingParts(block_root) } }; @@ -357,19 +342,11 @@ impl DataAvailabilityChecker { } else { let received_components = occupied_entry.get_mut(); - let missing_blob_ids = executed_block.get_filtered_blob_ids(|index, _| { - received_components - .verified_blobs - .get(index as usize) - .map(|maybe_blob| maybe_blob.is_none()) - .unwrap_or(true) - }); - let block_root = executed_block.import_data.block_root; let _ = received_components.executed_block.insert(executed_block); - Ok(Availability::PendingBlobs(block_root, missing_blob_ids)) + Ok(Availability::MissingParts(block_root)) } } diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index b413f93aab..e1d21cca62 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -53,13 +53,8 @@ pub(crate) struct BlockLookups { /// Parent chain lookups being downloaded. parent_lookups: SmallVec<[ParentLookup; 3]>, - processing_parent_lookups: HashMap< - Hash256, - ( - Vec, - SingleBlockLookup, - ), - >, + processing_parent_lookups: + HashMap, SingleBlockLookup)>, /// A cache of failed chain lookups to prevent duplicate searches. failed_chains: LRUTimeCache, @@ -68,10 +63,11 @@ pub(crate) struct BlockLookups { /// received or not. /// /// The flag allows us to determine if the peer returned data or sent us nothing. - single_block_lookups: - FnvHashMap>, - - blob_ids_to_block_ids: HashMap, + single_block_lookups: Vec<( + Option, + Option, + SingleBlockLookup, + )>, da_checker: Arc>, @@ -96,7 +92,6 @@ impl BlockLookups { )), single_block_lookups: Default::default(), da_checker, - blob_ids_to_block_ids: Default::default(), log, } } @@ -119,8 +114,10 @@ impl BlockLookups { // Do not re-request a block that is already being requested if self .single_block_lookups - .values_mut() - .any(|single_block_request| single_block_request.add_peer(&hash, &peer_id)) + .iter_mut() + .any(|(block_id, blob_id, single_block_request)| { + single_block_request.add_peer(&hash, &peer_id) + }) { return; } @@ -152,27 +149,27 @@ impl BlockLookups { let mut single_block_request = SingleBlockLookup::new(hash, peer_id, da_checker); cache_fn(&mut single_block_request); - let (peer_id, block_request) = single_block_request - .request_block() - .expect("none of the possible failure cases apply for a newly created block lookup"); - let (peer_id, blob_request) = single_block_request - .request_blobs() - .expect("none of the possible failure cases apply for a newly created blob lookup"); + let block_request_id = + if let Ok(Some((peer_id, block_request))) = single_block_request.request_block() { + cx.single_block_lookup_request(peer_id, block_request).ok() + } else { + None + }; - if let (Ok(request_id), Ok(blob_request_id)) = ( - cx.single_block_lookup_request(peer_id, block_request), - cx.single_blobs_lookup_request(peer_id, blob_request), - ) { - self.single_block_lookups - .insert(request_id, single_block_request); - self.blob_ids_to_block_ids - .insert(blob_request_id, request_id); + let blob_request_id = + if let Ok(Some((peer_id, blob_request))) = single_block_request.request_blobs() { + cx.single_blobs_lookup_request(peer_id, blob_request).ok() + } else { + None + }; - metrics::set_gauge( - &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, - self.single_block_lookups.len() as i64, - ); - } + self.single_block_lookups + .push((block_request_id, blob_request_id, single_block_request)); + + metrics::set_gauge( + &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, + self.single_block_lookups.len() as i64, + ); } pub fn search_current_unknown_parent( @@ -182,7 +179,14 @@ impl BlockLookups { peer_id: PeerId, cx: &mut SyncNetworkContext, ) { - self.search_block_with(|request| request.add_block(block), block_root, peer_id, cx); + self.search_block_with( + |request| { + let _ = request.add_block_wrapper(block_root, block); + }, + block_root, + peer_id, + cx, + ); } /// If a block is attempted to be processed but we do not know its parent, this function is @@ -474,6 +478,10 @@ impl BlockLookups { match parent_lookup.verify_blob(blob, &mut self.failed_chains) { Ok(Some(blobs)) => { + let block_root = blobs + .first() + .map(|blob| blob.block_root) + .unwrap_or(parent_lookup.chain_hash()); let processed_or_search = parent_lookup.add_blobs(blobs); match processed_or_search { @@ -695,7 +703,7 @@ impl BlockLookups { error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e); } BlockError::ParentUnknown(block) => { - self.search_parent(root, block, peer_id, cx); + self.search_parent(root, block.parent_root(), peer_id, cx); } ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { // These errors indicate that the execution layer is offline @@ -964,13 +972,14 @@ impl BlockLookups { let response = parent_lookup .request_parent_block(cx) .and_then(|| parent_lookup.request_parent_blobs(cx)); - self.handle_response(parent_lookup, response); + self.handle_response(parent_lookup, cx, response); } //TODO(sean) how should peer scoring work with failures in this method? fn handle_response( &mut self, mut parent_lookup: ParentLookup, + cx: &mut SyncNetworkContext, result: Result<(), parent_lookup::RequestError>, ) { match result { diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 7236089572..9b41b1de72 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -35,7 +35,7 @@ pub(crate) struct ParentLookup { /// The blocks that have currently been downloaded. downloaded_blocks: Vec>, /// Request of the last parent. - pub current_parent_request: SingleBlockLookup, + pub current_parent_request: SingleBlockLookup, /// Id of the last parent request. current_parent_request_id: Option, current_parent_blob_request_id: Option, @@ -101,30 +101,31 @@ impl ParentLookup { return Err(RequestError::ChainTooLong); } - let (peer_id, request) = self.current_parent_request.make_request()?; - match cx.parent_lookup_block_request(peer_id, request) { - Ok(request_id) => { - self.current_parent_request_id = Some(request_id); - Ok(()) - } - Err(reason) => { - self.current_parent_request_id = None; - Err(RequestError::SendFailed(reason)) + if let Some((peer_id, request)) = self.current_parent_request.request_block()? { + match cx.parent_lookup_block_request(peer_id, request) { + Ok(request_id) => { + self.current_parent_request_id = Some(request_id); + Ok(()) + } + Err(reason) => { + self.current_parent_request_id = None; + Err(RequestError::SendFailed(reason)) + } } } + Ok(()) } pub fn request_parent_blobs( &mut self, cx: &mut SyncNetworkContext, ) -> Result<(), RequestError> { - if let Some(blob_req) = self.current_parent_request.as_mut() { - // check to make sure this request hasn't failed - if self.downloaded_blocks.len() >= PARENT_DEPTH_TOLERANCE { - return Err(RequestError::ChainTooLong); - } + // check to make sure this request hasn't failed + if self.downloaded_blocks.len() >= PARENT_DEPTH_TOLERANCE { + return Err(RequestError::ChainTooLong); + } - let (peer_id, request) = blob_req.request_blobs()?; + if let Some((peer_id, request)) = self.current_parent_request.request_blobs()? { match cx.parent_lookup_blobs_request(peer_id, request) { Ok(request_id) => { self.current_parent_blob_request_id = Some(request_id); @@ -183,14 +184,13 @@ impl ParentLookup { Hash256, Vec>, Vec, - SingleBlockLookup, + SingleBlockLookup, Option>, ) { let ParentLookup { chain_hash, downloaded_blocks, current_parent_request, - current_parent_blob_request, current_parent_request_id: _, current_parent_blob_request_id: _, } = self; @@ -253,7 +253,9 @@ impl ParentLookup { .map(|(_, block)| block.parent_root()) { if failed_chains.contains(&parent_root) { - self.current_parent_request.register_failure_downloading(); + self.current_parent_request + .block_request_state + .register_failure_downloading(); self.current_parent_request_id = None; return Err(VerifyError::PreviousFailure { parent_root }); } @@ -267,11 +269,7 @@ impl ParentLookup { blob: Option>>, failed_chains: &mut lru_cache::LRUTimeCache, ) -> Result>>>, VerifyError> { - let blobs = self - .current_parent_blob_request - .map(|mut req| req.verify_blob(blob)) - .transpose()? - .flatten(); + let blobs = self.current_parent_request.verify_blob(blob)?; // check if the parent of this block isn't in the failed cache. If it is, this chain should // be dropped and the peer downscored. @@ -281,7 +279,8 @@ impl ParentLookup { .map(|blob| blob.block_parent_root) { if failed_chains.contains(&parent_root) { - self.current_parent_blob_request + self.current_parent_request + .blob_request_state .register_failure_downloading(); self.current_parent_blob_request_id = None; return Err(VerifyError::PreviousFailure { parent_root }); diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 64eb292146..e41dd1d6b0 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -23,7 +23,7 @@ use types::{BlobSidecar, SignedBeaconBlock}; pub struct SingleBlockLookup { pub requested_block_root: Hash256, pub requested_ids: Vec, - pub downloaded_blobs: Vec>>, + pub downloaded_blobs: Vec>>>, pub downloaded_block: Option>>, pub block_request_state: SingleLookupRequestState, pub blob_request_state: SingleLookupRequestState, @@ -83,8 +83,6 @@ impl SingleBlockLookup>, ) -> Self { - da_checker.get_missing_parts_for_hash(&requested_block_root); - Self { requested_block_root, requested_ids: vec![], @@ -99,8 +97,8 @@ impl SingleBlockLookup>>, - ) -> RequestResult { + blobs: Vec>>, + ) -> RequestResult { //TODO(sean) smart extend, we don't want dupes self.downloaded_blobs.extend(blobs); @@ -118,11 +116,35 @@ impl SingleBlockLookup>, - ) -> RequestResult { + block: Arc>, + ) -> RequestResult { //TODO(sean) check for existing block? self.downloaded_block = Some(block); + match self + .da_checker + .zip_block(block_root, block, self.downloaded_blobs) + { + Ok(wrapper) => RequestResult::Process(wrapper), + Err(AvailabilityCheckError::MissingBlobs) => RequestResult::SearchBlock(block_root), + _ => todo!(), + } + } + + pub fn add_block_wrapper( + &mut self, + block_root: Hash256, + block: BlockWrapper, + ) -> RequestResult { + match block { + BlockWrapper::Block(block) => self.add_block(block_root, block), + BlockWrapper::BlockAndBlobs(block, blobs) => { + //TODO(sean) check for existing block? + self.downloaded_block = Some(block); + self.add_blobs(block_root, blobs) + } + } + match self.da_checker.zip_block(block_root, block, blobs) { Ok(wrapper) => RequestResult::Process(wrapper), Err(AvailabilityCheckError::MissingBlobs) => RequestResult::SearchBlock(block_root), @@ -134,8 +156,8 @@ impl SingleBlockLookup>>, - ) -> Result>, VerifyError> { + block: Option>>, + ) -> Result>, VerifyError> { match self.block_request_state.state { State::AwaitingDownload => { self.block_request_state.register_failure_downloading(); @@ -178,37 +200,10 @@ impl SingleBlockLookup Result<(PeerId, BlocksByRootRequest), LookupRequestError> { - debug_assert!(matches!( - self.block_request_state.state, - State::AwaitingDownload - )); - if self.failed_attempts() >= MAX_ATTEMPTS { - Err(LookupRequestError::TooManyAttempts { - cannot_process: self.block_request_state.failed_processing - >= self.block_request_state.failed_downloading, - }) - } else if let Some(&peer_id) = self - .block_request_state - .available_peers - .iter() - .choose(&mut rand::thread_rng()) - { - let request = BlocksByRootRequest { - block_roots: VariableList::from(vec![self.requested_block_root]), - }; - self.block_request_state.state = State::Downloading { peer_id }; - self.block_request_state.used_peers.insert(peer_id); - Ok((peer_id, request)) - } else { - Err(LookupRequestError::NoPeers) - } - } - - pub fn verify_blob( + pub fn verify_blob( &mut self, - blob: Option>>, - ) -> Result>>>, BlobVerifyError> { + blob: Option>>, + ) -> Result>>>, BlobVerifyError> { match self.block_request_state.state { State::AwaitingDownload => { self.blob_request_state.register_failure_downloading(); @@ -246,7 +241,46 @@ impl SingleBlockLookup Result<(PeerId, BlobsByRootRequest), LookupRequestError> { + pub fn request_block( + &mut self, + ) -> Result, LookupRequestError> { + if self.da_checker.has_block(self.requested_block_root) || self.downloaded_block.is_some() { + return Ok(None); + } + + debug_assert!(matches!( + self.block_request_state.state, + State::AwaitingDownload + )); + if self.failed_attempts() >= MAX_ATTEMPTS { + Err(LookupRequestError::TooManyAttempts { + cannot_process: self.block_request_state.failed_processing + >= self.block_request_state.failed_downloading, + }) + } else if let Some(&peer_id) = self + .block_request_state + .available_peers + .iter() + .choose(&mut rand::thread_rng()) + { + let request = BlocksByRootRequest { + block_roots: VariableList::from(vec![self.requested_block_root]), + }; + self.block_request_state.state = State::Downloading { peer_id }; + self.block_request_state.used_peers.insert(peer_id); + Ok(Some((peer_id, request))) + } else { + Err(LookupRequestError::NoPeers) + } + } + + pub fn request_blobs( + &mut self, + ) -> Result, LookupRequestError> { + if self.da_checker.has_block(self.requested_block_root) || self.downloaded_block.is_some() { + return Ok(None); + } + debug_assert!(matches!( self.block_request_state.state, State::AwaitingDownload @@ -267,7 +301,7 @@ impl SingleBlockLookup SingleLookupRequestState { } } -impl slog::Value for SingleBlockLookup { +impl slog::Value + for SingleBlockLookup +{ fn serialize( &self, record: &slog::Record, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 5de054412f..a77fd233fc 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -120,7 +120,7 @@ pub enum SyncMessage { }, /// A block with an unknown parent has been received. - UnknownBlock(PeerId, MaybeAvailableBlock, Hash256), + UnknownBlock(PeerId, BlockWrapper, Hash256), /// A peer has sent an attestation that references a block that is unknown. This triggers the /// manager to attempt to find the block matching the unknown hash. @@ -246,15 +246,18 @@ pub fn spawn( log.clone(), ), range_sync: RangeSync::new(beacon_chain.clone(), log.clone()), - backfill_sync: BackFillSync::new(beacon_chain, network_globals, log.clone()), - block_lookups: BlockLookups::new(log.clone()), + backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals, log.clone()), + block_lookups: BlockLookups::new( + beacon_chain.data_availability_checker.clone(), + log.clone(), + ), delayed_lookups: delayed_lookups_send, log: log.clone(), }; executor.spawn( async move { - let slot_duration = slot_clock.slot_duration(); + let slot_duration = beacon_chain.slot_clock.slot_duration(); // TODO(sean) think about what this should be let delay = beacon_chain.slot_clock.unagg_attestation_production_delay(); @@ -346,7 +349,7 @@ impl SyncManager { } RequestId::ParentLookup { id } => { self.block_lookups - .parent_lookup_failed(id, peer_id, &mut self.network, eror); + .parent_lookup_failed(id, peer_id, &mut self.network, error); } RequestId::BackFillBlocks { id } => { if let Some(batch_id) = self