Merge stuff

This commit is contained in:
Pawan Dhananjay
2023-03-20 21:09:00 +05:30
parent c0445e2536
commit 3c1687d23c
7 changed files with 76 additions and 103 deletions

View File

@@ -8,8 +8,8 @@ use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::blob_cache::BlobCache;
use crate::blob_verification::{
self, AsBlock, AvailableBlock, BlobError, BlockWrapper, GossipVerifiedBlobSidecar,
IntoAvailableBlock, VerifiedBlobs,
self, AsBlock, AvailableBlock, BlobError, BlockWrapper, GossipVerifiedBlob, IntoAvailableBlock,
VerifiedBlobs,
};
use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::{
@@ -1901,9 +1901,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn verify_blob_sidecar_for_gossip(
self: &Arc<Self>,
blob_sidecar: Arc<SignedBlobSidecar<T::EthSpec>>,
blob_sidecar: SignedBlobSidecar<T::EthSpec>,
subnet_id: u64,
) -> Result<GossipVerifiedBlobSidecar, BlobError> // TODO(pawan): make a GossipVerifedBlob type
) -> Result<GossipVerifiedBlob<T::EthSpec>, BlobError> // TODO(pawan): make a GossipVerifedBlob type
{
blob_verification::validate_blob_sidecar_for_gossip(blob_sidecar, subnet_id, self)
}

View File

@@ -141,20 +141,12 @@ impl<T: EthSpec> GossipVerifiedBlob<T> {
self.blob
}
}
pub struct GossipVerifiedBlobSidecar {
/// Indicates if all blobs for a given block_root are available
/// in the blob cache.
pub all_blobs_available: bool,
pub block_root: Hash256,
// TODO(pawan): add an Arced blob sidecar which when returned to gossip_methods
// adds the entire thing to the blob cache.
}
pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
signed_blob_sidecar: Arc<SignedBlobSidecar<T::EthSpec>>,
signed_blob_sidecar: SignedBlobSidecar<T::EthSpec>,
subnet: u64,
chain: &BeaconChain<T>,
) -> Result<GossipVerifiedBlobSidecar, BlobError> {
) -> Result<GossipVerifiedBlob<T::EthSpec>, BlobError> {
let blob_slot = signed_blob_sidecar.message.slot;
let blob_index = signed_blob_sidecar.message.index;
let block_root = signed_blob_sidecar.message.block_root;
@@ -250,11 +242,6 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
// TODO(pawan): Check if other blobs for the same proposer index and blob index have been
// received and drop if required.
let da_checker = &chain.data_availability_checker;
let all_blobs_available = da_checker
.put_blob_temp(signed_blob_sidecar)
.map_err(BlobError::BlobCacheError)?;
// Verify if the corresponding block for this blob has been received.
// Note: this should be the last gossip check so that we can forward the blob
// over the gossip network even if we haven't received the corresponding block yet
@@ -265,15 +252,15 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
.get_block(&block_root)
.or_else(|| chain.early_attester_cache.get_proto_block(block_root)); // TODO(pawan): should we be checking this cache?
// TODO(pawan): this may be redundant with the new `AvailabilityProcessingStatus::PendingBlock variant`
if block_opt.is_none() {
return Err(BlobError::UnknownHeadBlock {
beacon_block_root: block_root,
});
}
Ok(GossipVerifiedBlobSidecar {
all_blobs_available,
block_root,
Ok(GossipVerifiedBlob {
blob: signed_blob_sidecar.message,
})
}

View File

@@ -454,7 +454,7 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
peer_id: PeerId,
peer_client: Client,
blob_index: u64,
signed_blob: Arc<SignedBlobSidecar<T::EthSpec>>,
signed_blob: SignedBlobSidecar<T::EthSpec>,
seen_timestamp: Duration,
) -> Self {
Self {
@@ -881,7 +881,7 @@ pub enum Work<T: BeaconChainTypes> {
peer_id: PeerId,
peer_client: Client,
blob_index: u64,
signed_blob: Arc<SignedBlobSidecar<T::EthSpec>>,
signed_blob: SignedBlobSidecar<T::EthSpec>,
seen_timestamp: Duration,
},
DelayedImportBlock {
@@ -1804,6 +1804,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
peer_client,
blob_index,
signed_blob,
work_reprocessing_tx,
seen_timestamp,
)
.await

View File

@@ -657,38 +657,62 @@ impl<T: BeaconChainTypes> Worker<T> {
peer_id: PeerId,
peer_client: Client,
blob_index: u64,
signed_blob: Arc<SignedBlobSidecar<T::EthSpec>>,
signed_blob: SignedBlobSidecar<T::EthSpec>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
_seen_duration: Duration,
) {
if let Ok(gossip_verified) = self
match self
.chain
.verify_blob_sidecar_for_gossip(signed_blob, blob_index)
{
if gossip_verified.all_blobs_available {
if reprocess_tx
.try_send(ReprocessQueueMessage::BlobsAvailable(
gossip_verified.block_root,
))
.is_err()
{
{
error!(
self.log,
"Failed to send blob availability message";
"block_root" => ?gossip_verified.block_root,
"location" => "block gossip"
)
}
}
Ok(gossip_verified_blob) => {
self.process_gossip_verified_blob(
peer_id,
gossip_verified_blob,
reprocess_tx,
_seen_duration,
)
.await
}
Err(_) => {
// TODO(pawan): handle all blob errors for peer scoring
todo!()
}
}
}
pub async fn process_gossip_verified_blob(
self,
peer_id: PeerId,
verified_blob: GossipVerifiedBlob<T::EthSpec>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
// This value is not used presently, but it might come in handy for debugging.
_seen_duration: Duration,
) {
// TODO
match self
.chain
.process_blob(verified_blob.to_blob(), CountUnrealized::True)
.await
{
Ok(AvailabilityProcessingStatus::Imported(hash)) => {
todo!()
// add to metrics
// logging
}
Ok(AvailabilityProcessingStatus::PendingBlobs(pending_blobs)) => self
.send_sync_message(SyncMessage::UnknownBlobHash {
peer_id,
pending_blobs,
}),
Ok(AvailabilityProcessingStatus::PendingBlock(block_hash)) => {
self.send_sync_message(SyncMessage::UnknownBlockHash(peer_id, block_hash));
}
Err(e) => {
// handle errors
todo!()
}
}
// TODO: gossip verification
crit!(self.log, "UNIMPLEMENTED gossip blob verification";
"peer_id" => %peer_id,
"client" => %peer_client,
"blob_topic" => blob_index,
);
}
/// Process the beacon block received from the gossip network and:
@@ -986,29 +1010,6 @@ impl<T: BeaconChainTypes> Worker<T> {
}
}
pub async fn process_gossip_verified_blob(
self,
peer_id: PeerId,
verified_blob: GossipVerifiedBlob<T::EthSpec>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
// This value is not used presently, but it might come in handy for debugging.
_seen_duration: Duration,
) {
// TODO
match self
.chain
.process_blob(verified_blob.to_blob(), CountUnrealized::True)
.await
{
Ok(hash) => {
// block imported
}
Err(e) => {
// handle errors
}
}
}
/// Process the beacon block that has already passed gossip verification.
///
/// Raises a log if there are errors.
@@ -1159,35 +1160,19 @@ impl<T: BeaconChainTypes> Worker<T> {
self.chain.recompute_head_at_current_slot().await;
}
Ok(AvailabilityProcessingStatus::PendingBlock(block_root)) => {
// make rpc request for block
todo!()
// This error variant doesn't make any sense in this context
crit!(
self.log,
"Internal error. Cannot get AvailabilityProcessingStatus::PendingBlock on processing block";
"block_root" => %block_root
);
}
Ok(AvailabilityProcessingStatus::PendingBlobs(blob_ids)) => {
Ok(AvailabilityProcessingStatus::PendingBlobs(pending_blobs)) => {
// make rpc request for blob
// let block_slot = block.block.slot();
// // Make rpc request for blobs
// self.send_sync_message(SyncMessage::UnknownBlobHash {
// peer_id,
// block_root: block.block_root,
// });
// // Send block to reprocessing queue to await blobs
// if reprocess_tx
// .try_send(ReprocessQueueMessage::ExecutedBlock(QueuedExecutedBlock {
// peer_id,
// block,
// seen_timestamp: seen_duration,
// }))
// .is_err()
// {
// error!(
// self.log,
// "Failed to send partially verified block to reprocessing queue";
// "block_slot" => %block_slot,
// "block_root" => ?block_root,
// "location" => "block gossip"
// )
// }
self.send_sync_message(SyncMessage::UnknownBlobHash {
peer_id,
pending_blobs,
});
}
Err(BlockError::AvailabilityCheck(_)) => {
todo!()

View File

@@ -257,7 +257,7 @@ impl<T: BeaconChainTypes> Router<T> {
peer_id,
self.network_globals.client(&peer_id),
blob_index,
Arc::new(signed_blob),
signed_blob,
);
}
PubsubMessage::VoluntaryExit(exit) => {

View File

@@ -365,7 +365,7 @@ impl<T: BeaconChainTypes> Processor<T> {
peer_id: PeerId,
peer_client: Client,
blob_index: u64, // TODO: add a type for the blob index
signed_blob: Arc<SignedBlobSidecar<T::EthSpec>>,
signed_blob: SignedBlobSidecar<T::EthSpec>,
) {
self.send_beacon_processor_work(BeaconWorkEvent::gossip_signed_blob_sidecar(
message_id,

View File

@@ -56,6 +56,7 @@ use std::ops::Sub;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::BlobIdentifier;
use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot};
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
@@ -120,11 +121,10 @@ pub enum SyncMessage<T: EthSpec> {
UnknownBlockHash(PeerId, Hash256),
/// A peer has sent us a block that we haven't received all the blobs for. This triggers
/// the manager to attempt to find a blobs for the given block root.
/// TODO: add required blob indices as well.
/// the manager to attempt to find the pending blobs for the given block root.
UnknownBlobHash {
peer_id: PeerId,
block_root: Hash256,
pending_blobs: Vec<BlobIdentifier>,
},
/// A peer has disconnected.