diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e92c1ceb26..088d6d316e 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -8,7 +8,8 @@ use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::BeaconProposerCache; use crate::blob_cache::BlobCache; use crate::blob_verification::{ - AsBlock, AvailableBlock, BlobError, BlockWrapper, IntoAvailableBlock, VerifiedBlobs, + self, AsBlock, AvailableBlock, BlobError, BlockWrapper, GossipVerifiedBlobSidecar, + IntoAvailableBlock, VerifiedBlobs, }; use crate::block_times_cache::BlockTimesCache; use crate::block_verification::{ @@ -1898,6 +1899,15 @@ impl BeaconChain { }) } + pub fn verify_blob_sidecar_for_gossip( + self: &Arc, + blob_sidecar: Arc>, + subnet_id: u64, + ) -> Result // TODO(pawan): make a GossipVerifedBlob type + { + blob_verification::validate_blob_sidecar_for_gossip(blob_sidecar, subnet_id, self) + } + /// Accepts some 'LightClientOptimisticUpdate' from the network and attempts to verify it pub fn verify_optimistic_update_for_gossip( self: &Arc, diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index d80fe03f79..7f0512ae67 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -7,6 +7,7 @@ use crate::beacon_chain::{ BeaconChain, BeaconChainTypes, MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, }; +use crate::gossip_blob_cache::AvailabilityCheckError; use crate::snapshot_cache::PreProcessingSnapshot; use crate::BeaconChainError; use state_processing::per_block_processing::eip4844::eip4844::verify_kzg_commitments_against_transactions; @@ -112,6 +113,8 @@ pub enum BlobError { /// /// The block is invalid and the peer is faulty. UnknownValidator(u64), + + BlobCacheError(AvailabilityCheckError), } impl From for BlobError { @@ -138,15 +141,23 @@ impl GossipVerifiedBlob { self.blob } } +pub struct GossipVerifiedBlobSidecar { + /// Indicates if all blobs for a given block_root are available + /// in the blob cache. + pub all_blobs_available: bool, + pub block_root: Hash256, + // TODO(pawan): add an Arced blob sidecar which when returned to gossip_methods + // adds the entire thing to the blob cache. +} pub fn validate_blob_sidecar_for_gossip( - blob_sidecar: SignedBlobSidecar, + signed_blob_sidecar: Arc>, subnet: u64, chain: &BeaconChain, -) -> Result, BlobError> { - let blob_slot = blob_sidecar.message.slot; - let blob_index = blob_sidecar.message.index; - let block_root = blob_sidecar.message.block_root; +) -> Result { + let blob_slot = signed_blob_sidecar.message.slot; + let blob_index = signed_blob_sidecar.message.index; + let block_root = signed_blob_sidecar.message.block_root; // Verify that the blob_sidecar was received on the correct subnet. if blob_index != subnet { @@ -185,7 +196,7 @@ pub fn validate_blob_sidecar_for_gossip( // TODO(pawan): should we verify locally that the parent root is correct // or just use whatever the proposer gives us? - let proposer_shuffling_root = blob_sidecar.message.block_parent_root; + let proposer_shuffling_root = signed_blob_sidecar.message.block_parent_root; let (proposer_index, fork) = match chain .beacon_proposer_cache @@ -202,7 +213,7 @@ pub fn validate_blob_sidecar_for_gossip( } }; - let blob_proposer_index = blob_sidecar.message.proposer_index; + let blob_proposer_index = signed_blob_sidecar.message.proposer_index; if proposer_index != blob_proposer_index as usize { return Err(BlobError::ProposerIndexMismatch { sidecar: blob_proposer_index as usize, @@ -221,7 +232,7 @@ pub fn validate_blob_sidecar_for_gossip( .get(proposer_index as usize) .ok_or_else(|| BlobError::UnknownValidator(proposer_index as u64))?; - blob_sidecar.verify_signature( + signed_blob_sidecar.verify_signature( None, pubkey, &fork, @@ -239,7 +250,10 @@ pub fn validate_blob_sidecar_for_gossip( // TODO(pawan): Check if other blobs for the same proposer index and blob index have been // received and drop if required. - // TODO(pawan): potentially add to a seen cache at this point. + let da_checker = &chain.data_availability_checker; + let all_blobs_available = da_checker + .put_blob_temp(signed_blob_sidecar) + .map_err(BlobError::BlobCacheError)?; // Verify if the corresponding block for this blob has been received. // Note: this should be the last gossip check so that we can forward the blob @@ -257,8 +271,9 @@ pub fn validate_blob_sidecar_for_gossip( }); } - Ok(GossipVerifiedBlob { - blob: blob_sidecar.message, + Ok(GossipVerifiedBlobSidecar { + all_blobs_available, + block_root, }) } diff --git a/beacon_node/beacon_chain/src/gossip_blob_cache.rs b/beacon_node/beacon_chain/src/gossip_blob_cache.rs index 08fe4a6645..51e937b3bd 100644 --- a/beacon_node/beacon_chain/src/gossip_blob_cache.rs +++ b/beacon_node/beacon_chain/src/gossip_blob_cache.rs @@ -14,7 +14,7 @@ use std::future::Future; use std::sync::{mpsc, Arc}; use tokio::sync::mpsc::Sender; use types::blob_sidecar::{BlobIdentifier, BlobSidecar}; -use types::{EthSpec, Hash256, SignedBeaconBlock}; +use types::{EthSpec, Hash256, SignedBeaconBlock, SignedBlobSidecar}; #[derive(Debug)] pub enum AvailabilityCheckError { @@ -236,4 +236,21 @@ impl DataAvailabilityChecker { }; Ok(availability) } + + /// Adds the blob to the cache. Returns true if adding the blob completes + /// all the required blob sidecars for a given block root. + /// + /// Note: we can only know this if we know `block.kzg_commitments.len()` + pub fn put_blob_temp( + &self, + blob: Arc>, + ) -> Result { + unimplemented!() + } + + /// Returns all blobs associated with a given block root otherwise returns + /// a UnavailableBlobs error. + pub fn blobs(&self, block_root: Hash256) -> Result, AvailabilityCheckError> { + unimplemented!() + } } diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 91584ee303..b4c1607e6f 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -658,15 +658,36 @@ impl Worker { peer_client: Client, blob_index: u64, signed_blob: Arc>, + reprocess_tx: mpsc::Sender>, _seen_duration: Duration, ) { + if let Ok(gossip_verified) = self + .chain + .verify_blob_sidecar_for_gossip(signed_blob, blob_index) + { + if gossip_verified.all_blobs_available { + if reprocess_tx + .try_send(ReprocessQueueMessage::BlobsAvailable( + gossip_verified.block_root, + )) + .is_err() + { + { + error!( + self.log, + "Failed to send blob availability message"; + "block_root" => ?gossip_verified.block_root, + "location" => "block gossip" + ) + } + } + } + } // TODO: gossip verification crit!(self.log, "UNIMPLEMENTED gossip blob verification"; "peer_id" => %peer_id, "client" => %peer_client, "blob_topic" => blob_index, - "blob_index" => signed_blob.message.index, - "blob_slot" => signed_blob.message.slot ); }