diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index c682517e49..fdf355f39b 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -270,11 +270,26 @@ pub fn validate_blob_sidecar_for_gossip( }) } -#[derive(Debug, Clone)] +/// Wrapper over a `BlobSidecar` for which we have completed kzg verification. +/// i.e. `verify_blob_kzg_proof(blob, commitment, proof) == true`. +#[derive(Debug, Derivative, Clone)] +#[derivative(PartialEq, Eq)] pub struct KzgVerifiedBlob { blob: Arc>, } +impl PartialOrd for KzgVerifiedBlob { + fn partial_cmp(&self, other: &Self) -> Option { + self.blob.partial_cmp(&other.blob) + } +} + +impl Ord for KzgVerifiedBlob { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.blob.cmp(&other.blob) + } +} + impl KzgVerifiedBlob { pub fn to_blob(self) -> Arc> { self.blob @@ -291,8 +306,14 @@ impl KzgVerifiedBlob { pub fn block_root(&self) -> Hash256 { self.blob.block_root } + pub fn blob_index(&self) -> u64 { + self.blob.index + } } +/// Complete kzg verification for a `GossipVerifiedBlob`. +/// +/// Returns an error if the kzg verification check fails. pub fn verify_kzg_for_blob( blob: Arc>, kzg: &Kzg, @@ -307,6 +328,11 @@ pub fn verify_kzg_for_blob( } } +/// Complete kzg verification for a list of `BlobSidecar`s. +/// Returns an error if any of the `BlobSidecar`s fails kzg verification. +/// +/// Note: This function should be preferred over calling `verify_kzg_for_blob` +/// in a loop since this function kzg verifies a list of blobs more efficiently. pub fn verify_kzg_for_blob_list( blob_list: BlobSidecarList, kzg: &Kzg, @@ -346,6 +372,7 @@ pub enum MaybeAvailableBlock { AvailabilityPending(AvailabilityPendingBlock), } +/// Trait for common block operations. 
pub trait AsBlock { fn slot(&self) -> Slot; fn epoch(&self) -> Epoch; diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 7c13714686..36cc723319 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -784,14 +784,14 @@ impl AvailabilityPendingExecutedBlock { self.get_filtered_blob_ids(|_| true) } - pub fn get_filtered_blob_ids(&self, filter: impl Fn(usize) -> bool) -> Vec { + pub fn get_filtered_blob_ids(&self, filter: impl Fn(u64) -> bool) -> Vec { let num_blobs_expected = self.num_blobs_expected(); let mut blob_ids = Vec::with_capacity(num_blobs_expected); - for i in 0..num_blobs_expected { + for i in 0..num_blobs_expected as u64 { if filter(i) { blob_ids.push(BlobIdentifier { block_root: self.import_data.block_root, - index: i as u64, + index: i, }); } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 95bc3ed87a..173c9fb025 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -6,9 +6,9 @@ use crate::block_verification::{AvailabilityPendingExecutedBlock, AvailableExecu use kzg::Error as KzgError; use kzg::Kzg; -use parking_lot::{Mutex, RwLock}; +use parking_lot::RwLock; use slot_clock::SlotClock; -use ssz_types::{Error, VariableList}; +use ssz_types::{Error, FixedVector, VariableList}; use state_processing::per_block_processing::deneb::deneb::verify_kzg_commitments_against_transactions; use std::collections::hash_map::{Entry, OccupiedEntry}; use std::collections::HashMap; @@ -53,23 +53,34 @@ impl From for AvailabilityCheckError { /// have not been verified against blobs /// - blocks that have been fully verified and only require a data availability check pub struct DataAvailabilityChecker { - rpc_blob_cache: RwLock>>>, - gossip_blob_cache: Mutex>>, + 
availability_cache: RwLock>>, slot_clock: S, kzg: Option>, spec: ChainSpec, } -struct GossipBlobCache { - verified_blobs: Vec>, +/// Caches partially available blobs and execution verified blocks corresponding +/// to a given `block_hash` that are received over gossip. +/// +/// The blobs are all gossip and kzg verified. +/// The block has completed all verifications except the availability check. +struct ReceivedComponents { + /// We use a `FixedVector` here to maintain the order of `BlobSidecar`s based on index. verified_blobs: FixedVector>, T::MaxBlobsPerBlock>, executed_block: Option>, missing_blob_ids: Vec, } -impl GossipBlobCache { +impl ReceivedComponents { fn new_from_blob(blob: KzgVerifiedBlob) -> Self { + let mut verified_blobs = FixedVector::<_, _>::default(); + // TODO: verify that we've already ensured the blob index < T::MaxBlobsPerBlock + if let Some(mut_maybe_blob) = verified_blobs.get_mut(blob.blob_index() as usize) { + *mut_maybe_blob = Some(blob); + } + Self { - verified_blobs: vec![blob], + verified_blobs, executed_block: None, missing_blob_ids: vec![], } @@ -78,17 +89,33 @@ impl GossipBlobCache { fn new_from_block(block: AvailabilityPendingExecutedBlock) -> Self { let missing_blob_ids = block.get_all_blob_ids(); Self { - verified_blobs: vec![], + verified_blobs: <_>::default(), executed_block: Some(block), missing_blob_ids, } } + /// Returns `true` if the cache has all blobs corresponding to the + /// kzg commitments in the block. fn has_all_blobs(&self, block: &AvailabilityPendingExecutedBlock) -> bool { - self.verified_blobs.len() == block.num_blobs_expected() + for i in 0..block.num_blobs_expected() { + if self + .verified_blobs + .get(i) + .map(|maybe_blob| maybe_blob.is_none()) + .unwrap_or(true) + { + return false; + } + } + true } } +/// This type is returned after adding a block / blob to the `DataAvailabilityChecker`. 
+/// +/// Indicates if the block is fully `Available` or if we need blobs or blocks +/// to "complete" the requirements for an `AvailableBlock`. pub enum Availability { PendingBlobs(Vec), PendingBlock(Hash256), @@ -96,6 +123,8 @@ pub enum Availability { } impl Availability { + /// Returns all the blob identifiers associated with an `AvailableBlock`. + /// Returns `None` if availability hasn't been fully satisfied yet. pub fn get_available_blob_ids(&self) -> Option> { if let Self::Available(block) = self { Some(block.get_all_blob_ids()) @@ -108,17 +137,22 @@ impl DataAvailabilityChecker { pub fn new(slot_clock: S, kzg: Option>, spec: ChainSpec) -> Self { Self { - rpc_blob_cache: <_>::default(), - gossip_blob_cache: <_>::default(), + availability_cache: <_>::default(), slot_clock, kzg, spec, } } - /// Get a blob from the RPC cache. + /// Get a blob from the availability cache. pub fn get_blob(&self, blob_id: &BlobIdentifier) -> Option>> { - self.rpc_blob_cache.read().get(blob_id).cloned() + self.availability_cache + .read() + .get(&blob_id.block_root)? + .verified_blobs + .get(blob_id.index as usize)? + .as_ref() + .map(|kzg_verified_blob| kzg_verified_blob.clone_blob()) } pub fn put_rpc_blob( @@ -137,9 +171,9 @@ impl DataAvailabilityChecker { }) } - /// This first validate the KZG commitments included in the blob sidecar. + /// This first validates the KZG commitments included in the blob sidecar. /// Check if we've cached other blobs for this block. If it completes a set and we also - /// have a block cached, return the Availability variant triggering block import. + /// have a block cached, return the `Availability` variant triggering block import. /// Otherwise cache the blob sidecar. /// /// This should only accept gossip verified blobs, so we should not have to worry about dupes. 
@@ -154,34 +188,24 @@ impl DataAvailabilityChecker { return Err(AvailabilityCheckError::KzgNotInitialized); }; - self.put_kzg_verified_blob(kzg_verified_blob, |_, _| true) - } - - fn put_kzg_verified_blob( - &self, - kzg_verified_blob: KzgVerifiedBlob, - predicate: impl FnOnce(BlobIdentifier, &[BlobIdentifier]) -> bool, - ) -> Result, AvailabilityCheckError> { - let blob = kzg_verified_blob.clone_blob(); - let blob_id = blob.id(); - let mut blob_cache = self.gossip_blob_cache.lock(); - - // Gossip cache. - let availability = match blob_cache.entry(blob.block_root) { + let availability = match self + .availability_cache + .write() + .entry(kzg_verified_blob.block_root()) + { Entry::Occupied(mut occupied_entry) => { // All blobs reaching this cache should be gossip verified and gossip verification // should filter duplicates, as well as validate indices. - let cache = occupied_entry.get_mut(); + let received_components = occupied_entry.get_mut(); - if !predicate(blob_id, cache.missing_blob_ids.as_slice()) { - // ignore this blob + if let Some(maybe_verified_blob) = received_components + .verified_blobs + .get_mut(kzg_verified_blob.blob_index() as usize) + { + *maybe_verified_blob = Some(kzg_verified_blob) } - cache - .verified_blobs - .insert(blob.index as usize, kzg_verified_blob); - - if let Some(executed_block) = cache.executed_block.take() { + if let Some(executed_block) = received_components.executed_block.take() { self.check_block_availability_maybe_cache(occupied_entry, executed_block)? 
} else { Availability::PendingBlock(blob.block_root) @@ -189,18 +213,11 @@ impl DataAvailabilityChecker { } Entry::Vacant(vacant_entry) => { let block_root = kzg_verified_blob.block_root(); - vacant_entry.insert(GossipBlobCache::new_from_blob(kzg_verified_blob)); + vacant_entry.insert(ReceivedComponents::new_from_blob(kzg_verified_blob)); Availability::PendingBlock(block_root) } }; - drop(blob_cache); - - if let Some(blob_ids) = availability.get_available_blob_ids() { - self.prune_rpc_blob_cache(&blob_ids); - } else { - self.rpc_blob_cache.write().insert(blob_id, blob); - } Ok(availability) } @@ -210,44 +227,56 @@ impl DataAvailabilityChecker { &self, executed_block: AvailabilityPendingExecutedBlock, ) -> Result, AvailabilityCheckError> { - let mut guard = self.gossip_blob_cache.lock(); - let entry = guard.entry(executed_block.import_data.block_root); - - let availability = match entry { + let availability = match self + .availability_cache + .write() + .entry(executed_block.import_data.block_root) + { Entry::Occupied(occupied_entry) => { self.check_block_availability_maybe_cache(occupied_entry, executed_block)? } Entry::Vacant(vacant_entry) => { let all_blob_ids = executed_block.get_all_blob_ids(); - vacant_entry.insert(GossipBlobCache::new_from_block(executed_block)); + vacant_entry.insert(ReceivedComponents::new_from_block(executed_block)); Availability::PendingBlobs(all_blob_ids) } }; - drop(guard); - - if let Some(blob_ids) = availability.get_available_blob_ids() { - self.prune_rpc_blob_cache(&blob_ids); - } - Ok(availability) } + /// Checks if the provided `executed_block` contains all required blobs to be considered an + /// `AvailableBlock` based on blobs that are cached. + /// + /// Returns an error if there was an error when matching the block commitments against blob commitments. + /// + /// Returns `Ok(Availability::Available(_))` if all blobs for the block are present in cache. 
+ /// Returns `Ok(Availability::PendingBlobs(_))` if all corresponding blobs have not been received in the cache. fn check_block_availability_maybe_cache( &self, - mut occupied_entry: OccupiedEntry>, + mut occupied_entry: OccupiedEntry>, executed_block: AvailabilityPendingExecutedBlock, ) -> Result, AvailabilityCheckError> { if occupied_entry.get().has_all_blobs(&executed_block) { + let num_blobs_expected = executed_block.num_blobs_expected(); let AvailabilityPendingExecutedBlock { block, import_data, payload_verification_outcome, } = executed_block; - let cache = occupied_entry.remove(); + let ReceivedComponents { + verified_blobs, + executed_block: _, missing_blob_ids: _, + } = occupied_entry.remove(); - let available_block = self.make_available(block, cache.verified_blobs)?; + let verified_blobs = Vec::from(verified_blobs) + .into_iter() + .take(num_blobs_expected) + .map(|maybe_blob| maybe_blob.ok_or(AvailabilityCheckError::MissingBlobs)) + .collect::, _>>()?; + + let available_block = self.make_available(block, verified_blobs)?; Ok(Availability::Available(Box::new( AvailableExecutedBlock::new( available_block, @@ -256,19 +285,23 @@ impl DataAvailabilityChecker { ), ))) } else { - let cache = occupied_entry.get_mut(); + let received_components = occupied_entry.get_mut(); - let missing_blob_ids = executed_block - .get_filtered_blob_ids(|index| cache.verified_blobs.get(index).is_none()); + let missing_blob_ids = executed_block.get_filtered_blob_ids(|index| { + received_components + .verified_blobs + .get(index as usize) + .map(|maybe_blob| maybe_blob.is_none()) + .unwrap_or(true) + }); - let _ = cache.executed_block.insert(executed_block); - cache.missing_blob_ids = missing_blob_ids.clone(); + let _ = received_components.executed_block.insert(executed_block); Ok(Availability::PendingBlobs(missing_blob_ids)) } } - /// Checks if a block is available, returns a MaybeAvailableBlock enum that may include the fully + /// Checks if a block is available, returns a `MaybeAvailableBlock` that 
may include the fully /// available block. pub fn check_availability( &self, @@ -351,11 +384,11 @@ impl DataAvailabilityChecker { /// Verifies an AvailabilityPendingBlock against a set of KZG verified blobs. /// This does not check whether a block *should* have blobs, these checks should must have been - /// completed when producing the AvailabilityPendingBlock. + /// completed when producing the `AvailabilityPendingBlock`. pub fn make_available( &self, block: AvailabilityPendingBlock, - blobs: KzgVerifiedBlobList, + blobs: Vec>, ) -> Result, AvailabilityCheckError> { let block_kzg_commitments = block.kzg_commitments()?; if blobs.len() != block_kzg_commitments.len() { @@ -438,13 +471,6 @@ impl DataAvailabilityChecker { self.data_availability_boundary() .map_or(false, |da_epoch| block_epoch >= da_epoch) } - - pub fn prune_rpc_blob_cache(&self, blob_ids: &[BlobIdentifier]) { - let mut guard = self.rpc_blob_cache.write(); - for id in blob_ids { - guard.remove(id); - } - } } pub enum BlobRequirements { @@ -458,6 +484,12 @@ pub enum BlobRequirements { PreDeneb, } +/// A wrapper over a `SignedBeaconBlock` where we have not verified availability of +/// corresponding `BlobSidecar`s and hence, is not ready for import into fork choice. +/// +/// Note: This wrapper does not necessarily correspond to a pre-deneb block as a pre-deneb +/// block that is ready for import will be of type `AvailableBlock` with its `blobs` field +/// set to `VerifiedBlobs::PreDeneb`. #[derive(Clone, Debug, PartialEq)] pub struct AvailabilityPendingBlock { block: Arc>, @@ -482,6 +514,20 @@ impl AvailabilityPendingBlock { } } +#[derive(Clone, Debug, PartialEq)] +pub enum VerifiedBlobs { + /// These blobs are available. + Available(BlobSidecarList), + /// This block is from outside the data availability boundary so doesn't require + /// a data availability check. + NotRequired, + /// The block's `kzg_commitments` field is empty so it does not contain any blobs. 
+ EmptyBlobs, + /// This is a block prior to the 4844 fork, so doesn't require any blobs + PreDeneb, +} + +/// A fully available block that is ready to be imported into fork choice. #[derive(Clone, Debug, PartialEq)] pub struct AvailableBlock { block: Arc>, @@ -503,19 +549,6 @@ impl AvailableBlock { } } -#[derive(Clone, Debug, PartialEq)] -pub enum VerifiedBlobs { - /// These blobs are available. - Available(BlobSidecarList), - /// This block is from outside the data availability boundary so doesn't require - /// a data availability check. - NotRequired, - /// The block's `kzg_commitments` field is empty so it does not contain any blobs. - EmptyBlobs, - /// This is a block prior to the 4844 fork, so doesn't require any blobs - PreDeneb, -} - impl AsBlock for AvailableBlock { fn slot(&self) -> Slot { self.block.slot() diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index 49cd8860b3..c69610cdb0 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -12,7 +12,7 @@ use types::{ Attestation, AttesterSlashing, EthSpec, ForkContext, ForkName, LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockCapella, - SignedBeaconBlockMerge, SignedBlobSidecar, SignedBlsToExecutionChange, + SignedBeaconBlockDeneb, SignedBeaconBlockMerge, SignedBlobSidecar, SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; @@ -184,16 +184,14 @@ impl PubsubMessage { SignedBeaconBlockMerge::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?, ), - Some(ForkName::Deneb) => { - return Err( - "beacon_block topic is not used from deneb fork onwards" - .to_string(), - ) - } Some(ForkName::Capella) => SignedBeaconBlock::::Capella( 
SignedBeaconBlockCapella::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?, ), + Some(ForkName::Deneb) => SignedBeaconBlock::::Deneb( + SignedBeaconBlockDeneb::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?, + ), None => { return Err(format!( "Unknown gossipsub fork digest: {:?}", diff --git a/consensus/types/src/blob_sidecar.rs b/consensus/types/src/blob_sidecar.rs index de2c5e487f..9e38e3e215 100644 --- a/consensus/types/src/blob_sidecar.rs +++ b/consensus/types/src/blob_sidecar.rs @@ -19,6 +19,18 @@ pub struct BlobIdentifier { pub index: u64, } +impl PartialOrd for BlobIdentifier { + fn partial_cmp(&self, other: &Self) -> Option { + self.index.partial_cmp(&other.index) + } +} + +impl Ord for BlobIdentifier { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.index.cmp(&other.index) + } +} + #[derive( Debug, Clone, @@ -34,7 +46,7 @@ pub struct BlobIdentifier { )] #[serde(bound = "T: EthSpec")] #[arbitrary(bound = "T: EthSpec")] -#[derivative(PartialEq, Hash(bound = "T: EthSpec"))] +#[derivative(PartialEq, Eq, Hash(bound = "T: EthSpec"))] pub struct BlobSidecar { pub block_root: Hash256, #[serde(with = "eth2_serde_utils::quoted_u64")] @@ -49,6 +61,18 @@ pub struct BlobSidecar { pub kzg_proof: KzgProof, } +impl PartialOrd for BlobSidecar { + fn partial_cmp(&self, other: &Self) -> Option { + self.index.partial_cmp(&other.index) + } +} + +impl Ord for BlobSidecar { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.index.cmp(&other.index) + } +} + pub type BlobSidecarList = VariableList>, ::MaxBlobsPerBlock>; pub type Blobs = VariableList, ::MaxExtraDataBytes>;