This commit is contained in:
Pawan Dhananjay
2023-03-21 00:46:31 +05:30
parent 38f2b88dd0
commit 0958ce610f
12 changed files with 44 additions and 196 deletions

View File

@@ -459,7 +459,7 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
peer_id,
peer_client,
blob_index,
signed_blob,
signed_blob: Box::new(signed_blob),
seen_timestamp,
},
}
@@ -864,7 +864,7 @@ pub enum Work<T: BeaconChainTypes> {
peer_id: PeerId,
peer_client: Client,
blob_index: u64,
signed_blob: SignedBlobSidecar<T::EthSpec>,
signed_blob: Box<SignedBlobSidecar<T::EthSpec>>,
seen_timestamp: Duration,
},
DelayedImportBlock {
@@ -1759,8 +1759,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
peer_id,
peer_client,
blob_index,
signed_blob,
work_reprocessing_tx,
*signed_blob,
seen_timestamp,
)
.await

View File

@@ -2,7 +2,6 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::blob_verification::{AsBlock, BlockWrapper, GossipVerifiedBlob};
use beacon_chain::store::Error;
use beacon_chain::ExecutedBlock;
use beacon_chain::{
attestation_verification::{self, Error as AttnError, VerifiedAttestation},
light_client_finality_update_verification::Error as LightClientFinalityUpdateError,
@@ -654,10 +653,9 @@ impl<T: BeaconChainTypes> Worker<T> {
self,
_message_id: MessageId,
peer_id: PeerId,
peer_client: Client,
_peer_client: Client,
blob_index: u64,
signed_blob: SignedBlobSidecar<T::EthSpec>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
_seen_duration: Duration,
) {
match self
@@ -665,13 +663,8 @@ impl<T: BeaconChainTypes> Worker<T> {
.verify_blob_sidecar_for_gossip(signed_blob, blob_index)
{
Ok(gossip_verified_blob) => {
self.process_gossip_verified_blob(
peer_id,
gossip_verified_blob,
reprocess_tx,
_seen_duration,
)
.await
self.process_gossip_verified_blob(peer_id, gossip_verified_blob, _seen_duration)
.await
}
Err(_) => {
// TODO(pawan): handle all blob errors for peer scoring
@@ -684,7 +677,6 @@ impl<T: BeaconChainTypes> Worker<T> {
self,
peer_id: PeerId,
verified_blob: GossipVerifiedBlob<T::EthSpec>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
// This value is not used presently, but it might come in handy for debugging.
_seen_duration: Duration,
) {
@@ -694,7 +686,7 @@ impl<T: BeaconChainTypes> Worker<T> {
.process_blob(verified_blob.to_blob(), CountUnrealized::True)
.await
{
Ok(AvailabilityProcessingStatus::Imported(hash)) => {
Ok(AvailabilityProcessingStatus::Imported(_hash)) => {
todo!()
// add to metrics
// logging
@@ -707,7 +699,7 @@ impl<T: BeaconChainTypes> Worker<T> {
Ok(AvailabilityProcessingStatus::PendingBlock(block_hash)) => {
self.send_sync_message(SyncMessage::UnknownBlockHash(peer_id, block_hash));
}
Err(e) => {
Err(_err) => {
// handle errors
todo!()
}
@@ -847,7 +839,7 @@ impl<T: BeaconChainTypes> Worker<T> {
verified_block
}
Err(BlockError::AvailabilityCheck(e)) => {
Err(BlockError::AvailabilityCheck(_err)) => {
todo!()
}
Err(BlockError::ParentUnknown(block)) => {
@@ -1008,104 +1000,6 @@ impl<T: BeaconChainTypes> Worker<T> {
}
}
/// Process the beacon block that has already passed gossip verification.
///
/// Raises a log if there are errors.
pub async fn process_execution_verified_block(
self,
peer_id: PeerId,
executed_block: ExecutedBlock<T::EthSpec>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
// This value is not used presently, but it might come in handy for debugging.
seen_duration: Duration,
) {
let block_root = executed_block.block_root;
let block = executed_block.block.block_cloned();
match self
.chain
.check_availability_and_maybe_import(
|chain| {
chain
.data_availability_checker
.check_block_availability(executed_block)
},
CountUnrealized::True,
)
.await
{
Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
if reprocess_tx
.try_send(ReprocessQueueMessage::BlockImported {
block_root,
parent_root: block.message().parent_root(),
})
.is_err()
{
error!(
self.log,
"Failed to inform block import";
"source" => "gossip",
"block_root" => ?block_root,
)
};
debug!(
self.log,
"Gossipsub block processed";
"block" => ?block_root,
"peer_id" => %peer_id
);
self.chain.recompute_head_at_current_slot().await;
}
Ok(AvailabilityProcessingStatus::PendingBlobs(_))
| Ok(AvailabilityProcessingStatus::PendingBlock(_))
| Err(BlockError::AvailabilityCheck(_)) => {
// TODO(need to do something different if it's unavailble again)
unimplemented!()
}
Err(BlockError::ParentUnknown(block)) => {
// Inform the sync manager to find parents for this block
// This should not occur. It should be checked by `should_forward_block`
error!(
self.log,
"Block with unknown parent attempted to be processed";
"peer_id" => %peer_id
);
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block, block_root));
}
Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => {
debug!(
self.log,
"Failed to verify execution payload";
"error" => %e
);
}
other => {
debug!(
self.log,
"Invalid gossip beacon block";
"outcome" => ?other,
"block root" => ?block_root,
"block slot" => block.slot()
);
self.gossip_penalize_peer(
peer_id,
PeerAction::MidToleranceError,
"bad_gossip_block_ssz",
);
trace!(
self.log,
"Invalid gossip beacon block ssz";
"ssz" => format_args!("0x{}", hex::encode(block.as_ssz_bytes())),
);
}
};
}
/// Process the beacon block that has already passed gossip verification.
///
/// Raises a log if there are errors.
@@ -1115,7 +1009,7 @@ impl<T: BeaconChainTypes> Worker<T> {
verified_block: GossipVerifiedBlock<T>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
// This value is not used presently, but it might come in handy for debugging.
seen_duration: Duration,
_seen_duration: Duration,
) {
let block = verified_block.block.block_cloned();
let block_root = verified_block.block_root;

View File

@@ -225,9 +225,9 @@ impl<T: BeaconChainTypes> Worker<T> {
executor.spawn(
async move {
let requested_blobs = request.blob_ids.len();
let mut send_block_count = 0;
let send_block_count = 0;
let mut send_response = true;
for BlobIdentifier{ block_root: root, index } in request.blob_ids.into_iter() {
for BlobIdentifier{ block_root: root, index: _index } in request.blob_ids.into_iter() {
match self
.chain
.get_block_and_blobs_checking_early_attester_cache(&root)
@@ -833,12 +833,12 @@ impl<T: BeaconChainTypes> Worker<T> {
// remove all skip slots
let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
let mut blobs_sent = 0;
let blobs_sent = 0;
let mut send_response = true;
for root in block_roots {
match self.chain.get_blobs(&root) {
Ok(Some(blobs)) => {
Ok(Some(_blobs)) => {
todo!();
// // TODO: more GROSS code ahead. Reader beware
// let types::BlobsSidecar {

View File

@@ -7,7 +7,7 @@ use crate::beacon_processor::DuplicateCache;
use crate::metrics;
use crate::sync::manager::{BlockProcessType, SyncMessage};
use crate::sync::{BatchProcessResult, ChainId};
use beacon_chain::blob_verification::{AsBlock, BlockWrapper, IntoAvailableBlock};
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
use beacon_chain::{AvailabilityProcessingStatus, CountUnrealized};
use beacon_chain::{
BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError,