gossip boilerplate

This commit is contained in:
Daniel Knopik
2022-09-17 14:58:27 +02:00
parent bcc738cb9d
commit 292a16a6eb
8 changed files with 115 additions and 5 deletions

View File

@@ -65,6 +65,7 @@ use types::{
SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId,
SyncCommitteeMessage, SyncSubnetId,
};
use types::signed_blobs_sidecar::SignedBlobsSidecar;
use work_reprocessing_queue::{
spawn_reprocess_scheduler, QueuedAggregate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork,
};
@@ -184,6 +185,7 @@ pub const GOSSIP_ATTESTATION_BATCH: &str = "gossip_attestation_batch";
pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate";
pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch";
pub const GOSSIP_BLOCK: &str = "gossip_block";
pub const GOSSIP_BLOBS_SIDECAR: &str = "gossip_blobs_sidecar";
pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block";
pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit";
pub const GOSSIP_PROPOSER_SLASHING: &str = "gossip_proposer_slashing";
@@ -400,6 +402,26 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
}
}
/// Create a new `Work` event for some blobs sidecar.
pub fn gossip_blobs_sidecar(
message_id: MessageId,
peer_id: PeerId,
peer_client: Client,
blobs: Arc<SignedBlobsSidecar<T::EthSpec>>,
seen_timestamp: Duration,
) -> Self {
Self {
drop_during_sync: false,
work: Work::GossipBlobsSidecar {
message_id,
peer_id,
peer_client,
blobs,
seen_timestamp,
},
}
}
/// Create a new `Work` event for some sync committee signature.
pub fn gossip_sync_signature(
message_id: MessageId,
@@ -671,6 +693,13 @@ pub enum Work<T: BeaconChainTypes> {
block: Arc<SignedBeaconBlock<T::EthSpec>>,
seen_timestamp: Duration,
},
GossipBlobsSidecar {
message_id: MessageId,
peer_id: PeerId,
peer_client: Client,
blobs: Arc<SignedBlobsSidecar<T::EthSpec>>,
seen_timestamp: Duration,
},
DelayedImportBlock {
peer_id: PeerId,
block: Box<GossipVerifiedBlock<T>>,
@@ -739,6 +768,7 @@ impl<T: BeaconChainTypes> Work<T> {
Work::GossipAggregate { .. } => GOSSIP_AGGREGATE,
Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH,
Work::GossipBlock { .. } => GOSSIP_BLOCK,
Work::GossipBlobsSidecar { .. } => GOSSIP_BLOBS_SIDECAR,
Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK,
Work::GossipVoluntaryExit { .. } => GOSSIP_VOLUNTARY_EXIT,
Work::GossipProposerSlashing { .. } => GOSSIP_PROPOSER_SLASHING,
@@ -888,6 +918,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
let mut gossip_blobs_sidecar_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
let mut delayed_block_queue = FifoQueue::new(MAX_DELAYED_BLOCK_QUEUE_LEN);
let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN);
@@ -1199,6 +1230,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
Work::GossipBlock { .. } => {
gossip_block_queue.push(work, work_id, &self.log)
}
Work::GossipBlobsSidecar { .. } => {
gossip_blobs_sidecar_queue.push(work, work_id, &self.log)
}
Work::DelayedImportBlock { .. } => {
delayed_block_queue.push(work, work_id, &self.log)
}
@@ -1451,6 +1485,28 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
)
.await
}),
/*
* Verification for blobs sidecars received on gossip.
*/
Work::GossipBlobsSidecar {
message_id,
peer_id,
peer_client,
blobs,
seen_timestamp,
} => task_spawner.spawn_async(async move {
worker
.process_gossip_blobs_sidecar(
message_id,
peer_id,
peer_client,
blobs,
work_reprocessing_tx,
duplicate_cache,
seen_timestamp,
)
.await
}),
/*
* Import for blocks that we received earlier than their intended slot.
*/

View File

@@ -22,6 +22,7 @@ use types::{
SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit,
Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId,
};
use types::signed_blobs_sidecar::SignedBlobsSidecar;
use super::{
super::work_reprocessing_queue::{
@@ -987,6 +988,19 @@ impl<T: BeaconChainTypes> Worker<T> {
};
}
pub async fn process_gossip_blobs_sidecar(
self,
message_id: MessageId,
peer_id: PeerId,
peer_client: Client,
blobs: Arc<SignedBlobsSidecar<T::EthSpec>>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
duplicate_cache: DuplicateCache,
seen_duration: Duration,
) {
}
pub fn process_gossip_voluntary_exit(
self,
message_id: MessageId,

View File

@@ -229,6 +229,14 @@ impl<T: BeaconChainTypes> Router<T> {
block,
);
}
PubsubMessage::BlobsSidecars(blobs) => {
self.processor.on_blobs_gossip(
id,
peer_id,
self.network_globals.client(&peer_id),
blobs,
);
}
PubsubMessage::VoluntaryExit(exit) => {
debug!(self.log, "Received a voluntary exit"; "peer_id" => %peer_id);
self.processor.on_voluntary_exit_gossip(id, peer_id, exit);

View File

@@ -20,6 +20,7 @@ use types::{
Attestation, AttesterSlashing, EthSpec, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncSubnetId,
};
use types::signed_blobs_sidecar::SignedBlobsSidecar;
/// Processes validated messages from the network. It relays necessary data to the syncing thread
/// and processes blocks from the pubsub network.
@@ -255,6 +256,22 @@ impl<T: BeaconChainTypes> Processor<T> {
))
}
pub fn on_blobs_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
peer_client: Client,
blobs: Arc<SignedBlobsSidecar<T::EthSpec>>,
) {
self.send_beacon_processor_work(BeaconWorkEvent::gossip_blobs_sidecar(
message_id,
peer_id,
peer_client,
blobs,
timestamp_now(),
))
}
pub fn on_unaggregated_attestation_gossip(
&mut self,
message_id: MessageId,