From 3c1687d23ca16e77f498f3919adcb8d9085542d1 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Mon, 20 Mar 2023 21:09:00 +0530 Subject: [PATCH] Merge stuff --- beacon_node/beacon_chain/src/beacon_chain.rs | 8 +- .../beacon_chain/src/blob_verification.rs | 23 +-- .../network/src/beacon_processor/mod.rs | 5 +- .../beacon_processor/worker/gossip_methods.rs | 133 ++++++++---------- beacon_node/network/src/router/mod.rs | 2 +- beacon_node/network/src/router/processor.rs | 2 +- beacon_node/network/src/sync/manager.rs | 6 +- 7 files changed, 76 insertions(+), 103 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 088d6d316e..5aab4cde9a 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -8,8 +8,8 @@ use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::BeaconProposerCache; use crate::blob_cache::BlobCache; use crate::blob_verification::{ - self, AsBlock, AvailableBlock, BlobError, BlockWrapper, GossipVerifiedBlobSidecar, - IntoAvailableBlock, VerifiedBlobs, + self, AsBlock, AvailableBlock, BlobError, BlockWrapper, GossipVerifiedBlob, IntoAvailableBlock, + VerifiedBlobs, }; use crate::block_times_cache::BlockTimesCache; use crate::block_verification::{ @@ -1901,9 +1901,9 @@ impl BeaconChain { pub fn verify_blob_sidecar_for_gossip( self: &Arc, - blob_sidecar: Arc>, + blob_sidecar: SignedBlobSidecar, subnet_id: u64, - ) -> Result // TODO(pawan): make a GossipVerifedBlob type + ) -> Result, BlobError> // TODO(pawan): make a GossipVerifedBlob type { blob_verification::validate_blob_sidecar_for_gossip(blob_sidecar, subnet_id, self) } diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 7f0512ae67..ed082083aa 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -141,20 +141,12 @@ impl GossipVerifiedBlob { self.blob } } -pub struct GossipVerifiedBlobSidecar { - /// Indicates if all blobs for a given block_root are available - /// in the blob cache. - pub all_blobs_available: bool, - pub block_root: Hash256, - // TODO(pawan): add an Arced blob sidecar which when returned to gossip_methods - // adds the entire thing to the blob cache. -} pub fn validate_blob_sidecar_for_gossip( - signed_blob_sidecar: Arc>, + signed_blob_sidecar: SignedBlobSidecar, subnet: u64, chain: &BeaconChain, -) -> Result { +) -> Result, BlobError> { let blob_slot = signed_blob_sidecar.message.slot; let blob_index = signed_blob_sidecar.message.index; let block_root = signed_blob_sidecar.message.block_root; @@ -250,11 +242,6 @@ pub fn validate_blob_sidecar_for_gossip( // TODO(pawan): Check if other blobs for the same proposer index and blob index have been // received and drop if required. - let da_checker = &chain.data_availability_checker; - let all_blobs_available = da_checker - .put_blob_temp(signed_blob_sidecar) - .map_err(BlobError::BlobCacheError)?; - // Verify if the corresponding block for this blob has been received. // Note: this should be the last gossip check so that we can forward the blob // over the gossip network even if we haven't received the corresponding block yet @@ -265,15 +252,15 @@ pub fn validate_blob_sidecar_for_gossip( .get_block(&block_root) .or_else(|| chain.early_attester_cache.get_proto_block(block_root)); // TODO(pawan): should we be checking this cache? + // TODO(pawan): this may be redundant with the new `AvailabilityProcessingStatus::PendingBlock variant` if block_opt.is_none() { return Err(BlobError::UnknownHeadBlock { beacon_block_root: block_root, }); } - Ok(GossipVerifiedBlobSidecar { - all_blobs_available, - block_root, + Ok(GossipVerifiedBlob { + blob: signed_blob_sidecar.message, }) } diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index d725342d30..66447845dc 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -454,7 +454,7 @@ impl WorkEvent { peer_id: PeerId, peer_client: Client, blob_index: u64, - signed_blob: Arc>, + signed_blob: SignedBlobSidecar, seen_timestamp: Duration, ) -> Self { Self { @@ -881,7 +881,7 @@ pub enum Work { peer_id: PeerId, peer_client: Client, blob_index: u64, - signed_blob: Arc>, + signed_blob: SignedBlobSidecar, seen_timestamp: Duration, }, DelayedImportBlock { @@ -1804,6 +1804,7 @@ impl BeaconProcessor { peer_client, blob_index, signed_blob, + work_reprocessing_tx, seen_timestamp, ) .await diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index b4c1607e6f..27b7104d99 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -657,38 +657,62 @@ impl Worker { peer_id: PeerId, peer_client: Client, blob_index: u64, - signed_blob: Arc>, + signed_blob: SignedBlobSidecar, reprocess_tx: mpsc::Sender>, _seen_duration: Duration, ) { - if let Ok(gossip_verified) = self + match self .chain .verify_blob_sidecar_for_gossip(signed_blob, blob_index) { - if gossip_verified.all_blobs_available { - if reprocess_tx - .try_send(ReprocessQueueMessage::BlobsAvailable( - gossip_verified.block_root, - )) - .is_err() - { - { - error!( - self.log, - "Failed to send blob availability message"; - "block_root" => ?gossip_verified.block_root, - "location" => "block gossip" - ) - } - } + Ok(gossip_verified_blob) => { + self.process_gossip_verified_blob( + peer_id, + gossip_verified_blob, + reprocess_tx, + _seen_duration, + ) + .await + } + Err(_) => { + // TODO(pawan): handle all blob errors for peer scoring + todo!() + } + } + } + + pub async fn process_gossip_verified_blob( + self, + peer_id: PeerId, + verified_blob: GossipVerifiedBlob, + reprocess_tx: mpsc::Sender>, + // This value is not used presently, but it might come in handy for debugging. + _seen_duration: Duration, + ) { + // TODO + match self + .chain + .process_blob(verified_blob.to_blob(), CountUnrealized::True) + .await + { + Ok(AvailabilityProcessingStatus::Imported(hash)) => { + todo!() + // add to metrics + // logging + } + Ok(AvailabilityProcessingStatus::PendingBlobs(pending_blobs)) => self + .send_sync_message(SyncMessage::UnknownBlobHash { + peer_id, + pending_blobs, + }), + Ok(AvailabilityProcessingStatus::PendingBlock(block_hash)) => { + self.send_sync_message(SyncMessage::UnknownBlockHash(peer_id, block_hash)); + } + Err(e) => { + // handle errors + todo!() } } - // TODO: gossip verification - crit!(self.log, "UNIMPLEMENTED gossip blob verification"; - "peer_id" => %peer_id, - "client" => %peer_client, - "blob_topic" => blob_index, - ); } /// Process the beacon block received from the gossip network and: @@ -986,29 +1010,6 @@ impl Worker { } } - pub async fn process_gossip_verified_blob( - self, - peer_id: PeerId, - verified_blob: GossipVerifiedBlob, - reprocess_tx: mpsc::Sender>, - // This value is not used presently, but it might come in handy for debugging. - _seen_duration: Duration, - ) { - // TODO - match self - .chain - .process_blob(verified_blob.to_blob(), CountUnrealized::True) - .await - { - Ok(hash) => { - // block imported - } - Err(e) => { - // handle errors - } - } - } - /// Process the beacon block that has already passed gossip verification. /// /// Raises a log if there are errors. @@ -1159,35 +1160,19 @@ impl Worker { self.chain.recompute_head_at_current_slot().await; } Ok(AvailabilityProcessingStatus::PendingBlock(block_root)) => { - // make rpc request for block - todo!() + // This error variant doesn't make any sense in this context + crit!( + self.log, + "Internal error. Cannot get AvailabilityProcessingStatus::PendingBlock on processing block"; + "block_root" => %block_root + ); } - Ok(AvailabilityProcessingStatus::PendingBlobs(blob_ids)) => { + Ok(AvailabilityProcessingStatus::PendingBlobs(pending_blobs)) => { // make rpc request for blob - // let block_slot = block.block.slot(); - // // Make rpc request for blobs - // self.send_sync_message(SyncMessage::UnknownBlobHash { - // peer_id, - // block_root: block.block_root, - // }); - - // // Send block to reprocessing queue to await blobs - // if reprocess_tx - // .try_send(ReprocessQueueMessage::ExecutedBlock(QueuedExecutedBlock { - // peer_id, - // block, - // seen_timestamp: seen_duration, - // })) - // .is_err() - // { - // error!( - // self.log, - // "Failed to send partially verified block to reprocessing queue"; - // "block_slot" => %block_slot, - // "block_root" => ?block_root, - // "location" => "block gossip" - // ) - // } + self.send_sync_message(SyncMessage::UnknownBlobHash { + peer_id, + pending_blobs, + }); } Err(BlockError::AvailabilityCheck(_)) => { todo!() diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index a18ce4e7c8..067d84044f 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -257,7 +257,7 @@ impl Router { peer_id, self.network_globals.client(&peer_id), blob_index, - Arc::new(signed_blob), + signed_blob, ); } PubsubMessage::VoluntaryExit(exit) => { diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 76962b373f..281d0ef616 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -365,7 +365,7 @@ impl Processor { peer_id: PeerId, peer_client: Client, blob_index: u64, // TODO: add a type for the blob index - signed_blob: Arc>, + signed_blob: SignedBlobSidecar, ) { self.send_beacon_processor_work(BeaconWorkEvent::gossip_signed_blob_sidecar( message_id, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index cc043c6269..ad7cad3216 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -56,6 +56,7 @@ use std::ops::Sub; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; +use types::blob_sidecar::BlobIdentifier; use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync @@ -120,11 +121,10 @@ pub enum SyncMessage { UnknownBlockHash(PeerId, Hash256), /// A peer has sent us a block that we haven't received all the blobs for. This triggers - /// the manager to attempt to find a blobs for the given block root. - /// TODO: add required blob indices as well. + /// the manager to attempt to find the pending blobs for the given block root. UnknownBlobHash { peer_id: PeerId, - block_root: Hash256, + pending_blobs: Vec, }, /// A peer has disconnected.