diff --git a/beacon_node/lighthouse_network/src/service/gossip_cache.rs b/beacon_node/lighthouse_network/src/service/gossip_cache.rs index d3971a7d74..a6e003934c 100644 --- a/beacon_node/lighthouse_network/src/service/gossip_cache.rs +++ b/beacon_node/lighthouse_network/src/service/gossip_cache.rs @@ -21,7 +21,7 @@ pub struct GossipCache { /// Timeout for blocks. beacon_block: Option, /// Timeout for blobs. - beacon_block_and_blobs_sidecar: Option, + blob_sidecar: Option, /// Timeout for aggregate attestations. aggregates: Option, /// Timeout for attestations. @@ -50,7 +50,7 @@ pub struct GossipCacheBuilder { /// Timeout for blocks. beacon_block: Option, /// Timeout for blob sidecars. - beacon_block_and_blobs_sidecar: Option, + blob_sidecar: Option, /// Timeout for aggregate attestations. aggregates: Option, /// Timeout for attestations. @@ -151,7 +151,7 @@ impl GossipCacheBuilder { let GossipCacheBuilder { default_timeout, beacon_block, - beacon_block_and_blobs_sidecar, + blob_sidecar, aggregates, attestation, voluntary_exit, @@ -167,7 +167,7 @@ impl GossipCacheBuilder { expirations: DelayQueue::default(), topic_msgs: HashMap::default(), beacon_block: beacon_block.or(default_timeout), - beacon_block_and_blobs_sidecar: beacon_block_and_blobs_sidecar.or(default_timeout), + blob_sidecar: blob_sidecar.or(default_timeout), aggregates: aggregates.or(default_timeout), attestation: attestation.or(default_timeout), voluntary_exit: voluntary_exit.or(default_timeout), @@ -193,7 +193,7 @@ impl GossipCache { pub fn insert(&mut self, topic: GossipTopic, data: Vec) { let expire_timeout = match topic.kind() { GossipKind::BeaconBlock => self.beacon_block, - GossipKind::BeaconBlocksAndBlobsSidecar => self.beacon_block_and_blobs_sidecar, + GossipKind::BlobSidecar(_) => self.blob_sidecar, GossipKind::BeaconAggregateAndProof => self.aggregates, GossipKind::Attestation(_) => self.attestation, GossipKind::VoluntaryExit => self.voluntary_exit, diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 4c8c770b94..0154a4dd32 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -235,6 +235,7 @@ impl Network { possible_fork_digests, ctx.chain_spec.attestation_subnet_count, SYNC_COMMITTEE_SUBNET_COUNT, + 4, // TODO(pawan): get this from chainspec ), max_subscribed_topics: 200, max_subscriptions_per_request: 150, // 148 in theory = (64 attestation + 4 sync committee + 6 core topics) * 2 diff --git a/beacon_node/lighthouse_network/src/service/utils.rs b/beacon_node/lighthouse_network/src/service/utils.rs index 383b78abf2..e0eb3ff50d 100644 --- a/beacon_node/lighthouse_network/src/service/utils.rs +++ b/beacon_node/lighthouse_network/src/service/utils.rs @@ -236,6 +236,7 @@ pub(crate) fn create_whitelist_filter( possible_fork_digests: Vec<[u8; 4]>, attestation_subnet_count: u64, sync_committee_subnet_count: u64, + blob_sidecar_subnet_count: u64, ) -> WhitelistSubscriptionFilter { let mut possible_hashes = HashSet::new(); for fork_digest in possible_fork_digests { @@ -255,13 +256,15 @@ pub(crate) fn create_whitelist_filter( add(BlsToExecutionChange); add(LightClientFinalityUpdate); add(LightClientOptimisticUpdate); - add(BeaconBlocksAndBlobsSidecar); for id in 0..attestation_subnet_count { add(Attestation(SubnetId::new(id))); } for id in 0..sync_committee_subnet_count { add(SyncCommitteeMessage(SyncSubnetId::new(id))); } + for id in 0..blob_sidecar_subnet_count { + add(BlobSidecar(id)); + } } WhitelistSubscriptionFilter(possible_hashes) } diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index 7951a07243..243e831485 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -11,8 +11,8 @@ use std::sync::Arc; use types::{ Attestation, AttesterSlashing, EthSpec, ForkContext, ForkName, LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, - SignedBeaconBlockAltair, SignedBeaconBlockAndBlobsSidecar, SignedBeaconBlockBase, - SignedBeaconBlockCapella, SignedBeaconBlockMerge, SignedBlsToExecutionChange, + SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockCapella, + SignedBeaconBlockMerge, SignedBlobSidecar, SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; @@ -20,8 +20,8 @@ use types::{ pub enum PubsubMessage { /// Gossipsub message providing notification of a new block. BeaconBlock(Arc>), - /// Gossipsub message providing notification of a new SignedBeaconBlock coupled with a blobs sidecar. - BeaconBlockAndBlobsSidecars(SignedBeaconBlockAndBlobsSidecar), + /// Gossipsub message providing notification of a [`SignedBlobSidecar`] along with the subnet id where it was received. + BlobSidecar(Box<(u64, SignedBlobSidecar)>), /// Gossipsub message providing notification of a Aggregate attestation and associated proof. AggregateAndProofAttestation(Box>), /// Gossipsub message providing notification of a raw un-aggregated attestation with its shard id. @@ -115,8 +115,8 @@ impl PubsubMessage { pub fn kind(&self) -> GossipKind { match self { PubsubMessage::BeaconBlock(_) => GossipKind::BeaconBlock, - PubsubMessage::BeaconBlockAndBlobsSidecars(_) => { - GossipKind::BeaconBlocksAndBlobsSidecar + PubsubMessage::BlobSidecar(blob_sidecar_data) => { + GossipKind::BlobSidecar(blob_sidecar_data.0) } PubsubMessage::AggregateAndProofAttestation(_) => GossipKind::BeaconAggregateAndProof, PubsubMessage::Attestation(attestation_data) => { @@ -203,15 +203,15 @@ impl PubsubMessage { }; Ok(PubsubMessage::BeaconBlock(Arc::new(beacon_block))) } - GossipKind::BeaconBlocksAndBlobsSidecar => { + GossipKind::BlobSidecar(blob_index) => { match fork_context.from_context_bytes(gossip_topic.fork_digest) { Some(ForkName::Eip4844) => { - let block_and_blobs_sidecar = - SignedBeaconBlockAndBlobsSidecar::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?; - Ok(PubsubMessage::BeaconBlockAndBlobsSidecars( - block_and_blobs_sidecar, - )) + let blob_sidecar = SignedBlobSidecar::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?; + Ok(PubsubMessage::BlobSidecar(Box::new(( + *blob_index, + blob_sidecar, + )))) } Some( ForkName::Base @@ -293,7 +293,7 @@ impl PubsubMessage { // messages for us. match &self { PubsubMessage::BeaconBlock(data) => data.as_ssz_bytes(), - PubsubMessage::BeaconBlockAndBlobsSidecars(data) => data.as_ssz_bytes(), + PubsubMessage::BlobSidecar(data) => data.1.as_ssz_bytes(), PubsubMessage::AggregateAndProofAttestation(data) => data.as_ssz_bytes(), PubsubMessage::VoluntaryExit(data) => data.as_ssz_bytes(), PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(), @@ -317,11 +317,10 @@ impl std::fmt::Display for PubsubMessage { block.slot(), block.message().proposer_index() ), - PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blob) => write!( + PubsubMessage::BlobSidecar(data) => write!( f, - "Beacon block and Blobs Sidecar: slot: {}, blobs: {}", - block_and_blob.beacon_block.message().slot(), - block_and_blob.blobs_sidecar.blobs.len(), + "BlobSidecar: slot: {}, blob index: {}", + data.1.blob.slot, data.1.blob.index, ), PubsubMessage::AggregateAndProofAttestation(att) => write!( f, diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index 20f836b76a..88764fb951 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -11,9 +11,9 @@ use crate::Subnet; pub const TOPIC_PREFIX: &str = "eth2"; pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy"; pub const BEACON_BLOCK_TOPIC: &str = "beacon_block"; -pub const BEACON_BLOCK_AND_BLOBS_SIDECAR_TOPIC: &str = "beacon_block_and_blobs_sidecar"; pub const BEACON_AGGREGATE_AND_PROOF_TOPIC: &str = "beacon_aggregate_and_proof"; pub const BEACON_ATTESTATION_PREFIX: &str = "beacon_attestation_"; +pub const BLOB_SIDECAR_PREFIX: &str = "blob_sidecar_"; pub const VOLUNTARY_EXIT_TOPIC: &str = "voluntary_exit"; pub const PROPOSER_SLASHING_TOPIC: &str = "proposer_slashing"; pub const ATTESTER_SLASHING_TOPIC: &str = "attester_slashing"; @@ -82,10 +82,10 @@ pub struct GossipTopic { pub enum GossipKind { /// Topic for publishing beacon blocks. BeaconBlock, - /// Topic for publishing beacon block coupled with blob sidecars. - BeaconBlocksAndBlobsSidecar, /// Topic for publishing aggregate attestations and proofs. BeaconAggregateAndProof, + /// Topic for publishing BlobSidecars. + BlobSidecar(u64), /// Topic for publishing raw attestations on a particular subnet. #[strum(serialize = "beacon_attestation")] Attestation(SubnetId), @@ -115,6 +115,9 @@ impl std::fmt::Display for GossipKind { GossipKind::SyncCommitteeMessage(subnet_id) => { write!(f, "sync_committee_{}", **subnet_id) } + GossipKind::BlobSidecar(blob_index) => { + write!(f, "{}{}", BLOB_SIDECAR_PREFIX, blob_index) + } x => f.write_str(x.as_ref()), } } @@ -175,7 +178,6 @@ impl GossipTopic { let kind = match topic_parts[3] { BEACON_BLOCK_TOPIC => GossipKind::BeaconBlock, BEACON_AGGREGATE_AND_PROOF_TOPIC => GossipKind::BeaconAggregateAndProof, - BEACON_BLOCK_AND_BLOBS_SIDECAR_TOPIC => GossipKind::BeaconBlocksAndBlobsSidecar, SIGNED_CONTRIBUTION_AND_PROOF_TOPIC => GossipKind::SignedContributionAndProof, VOLUNTARY_EXIT_TOPIC => GossipKind::VoluntaryExit, PROPOSER_SLASHING_TOPIC => GossipKind::ProposerSlashing, @@ -183,11 +185,8 @@ impl GossipTopic { BLS_TO_EXECUTION_CHANGE_TOPIC => GossipKind::BlsToExecutionChange, LIGHT_CLIENT_FINALITY_UPDATE => GossipKind::LightClientFinalityUpdate, LIGHT_CLIENT_OPTIMISTIC_UPDATE => GossipKind::LightClientOptimisticUpdate, - topic => match committee_topic_index(topic) { - Some(subnet) => match subnet { - Subnet::Attestation(s) => GossipKind::Attestation(s), - Subnet::SyncCommittee(s) => GossipKind::SyncCommitteeMessage(s), - }, + topic => match subnet_topic_index(topic) { + Some(kind) => kind, None => return Err(format!("Unknown topic: {}", topic)), }, }; @@ -232,7 +231,6 @@ impl std::fmt::Display for GossipTopic { let kind = match self.kind { GossipKind::BeaconBlock => BEACON_BLOCK_TOPIC.into(), - GossipKind::BeaconBlocksAndBlobsSidecar => BEACON_BLOCK_AND_BLOBS_SIDECAR_TOPIC.into(), GossipKind::BeaconAggregateAndProof => BEACON_AGGREGATE_AND_PROOF_TOPIC.into(), GossipKind::VoluntaryExit => VOLUNTARY_EXIT_TOPIC.into(), GossipKind::ProposerSlashing => PROPOSER_SLASHING_TOPIC.into(), @@ -242,6 +240,9 @@ impl std::fmt::Display for GossipTopic { GossipKind::SyncCommitteeMessage(index) => { format!("{}{}", SYNC_COMMITTEE_PREFIX_TOPIC, *index) } + GossipKind::BlobSidecar(blob_index) => { + format!("{}{}", BLOB_SIDECAR_PREFIX, blob_index) + } GossipKind::BlsToExecutionChange => BLS_TO_EXECUTION_CHANGE_TOPIC.into(), GossipKind::LightClientFinalityUpdate => LIGHT_CLIENT_FINALITY_UPDATE.into(), GossipKind::LightClientOptimisticUpdate => LIGHT_CLIENT_OPTIMISTIC_UPDATE.into(), @@ -273,22 +274,18 @@ pub fn subnet_from_topic_hash(topic_hash: &TopicHash) -> Option { GossipTopic::decode(topic_hash.as_str()).ok()?.subnet_id() } -// Determines if a string is an attestation or sync committee topic. -fn committee_topic_index(topic: &str) -> Option { - if topic.starts_with(BEACON_ATTESTATION_PREFIX) { - return Some(Subnet::Attestation(SubnetId::new( - topic - .trim_start_matches(BEACON_ATTESTATION_PREFIX) - .parse::() - .ok()?, +// Determines if the topic name is of an indexed topic. +fn subnet_topic_index(topic: &str) -> Option { + if let Some(index) = topic.strip_prefix(BEACON_ATTESTATION_PREFIX) { + return Some(GossipKind::Attestation(SubnetId::new( + index.parse::().ok()?, ))); - } else if topic.starts_with(SYNC_COMMITTEE_PREFIX_TOPIC) { - return Some(Subnet::SyncCommittee(SyncSubnetId::new( - topic - .trim_start_matches(SYNC_COMMITTEE_PREFIX_TOPIC) - .parse::() - .ok()?, + } else if let Some(index) = topic.strip_prefix(SYNC_COMMITTEE_PREFIX_TOPIC) { + return Some(GossipKind::SyncCommitteeMessage(SyncSubnetId::new( + index.parse::().ok()?, ))); + } else if let Some(index) = topic.strip_prefix(BLOB_SIDECAR_PREFIX) { + return Some(GossipKind::BlobSidecar(index.parse::().ok()?)); } None } diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index a2dfb853ea..ce48378532 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -101,6 +101,7 @@ pub mod sqlite; pub mod blob_sidecar; pub mod blobs_sidecar; +pub mod signed_blob; pub mod signed_block_and_blobs; pub mod transaction; @@ -181,6 +182,7 @@ pub use crate::signed_beacon_block::{ SignedBlindedBeaconBlock, }; pub use crate::signed_beacon_block_header::SignedBeaconBlockHeader; +pub use crate::signed_blob::*; pub use crate::signed_block_and_blobs::SignedBeaconBlockAndBlobsSidecar; pub use crate::signed_block_and_blobs::SignedBeaconBlockAndBlobsSidecarDecode; pub use crate::signed_bls_to_execution_change::SignedBlsToExecutionChange; diff --git a/consensus/types/src/signed_blob.rs b/consensus/types/src/signed_blob.rs new file mode 100644 index 0000000000..a3cfb9d58a --- /dev/null +++ b/consensus/types/src/signed_blob.rs @@ -0,0 +1,24 @@ +use crate::{test_utils::TestRandom, BlobSidecar, EthSpec, Signature}; +use serde_derive::{Deserialize, Serialize}; +use ssz_derive::{Decode, Encode}; +use test_random_derive::TestRandom; +use tree_hash_derive::TreeHash; + +#[derive( + Debug, + Clone, + PartialEq, + Serialize, + Deserialize, + Encode, + Decode, + TestRandom, + TreeHash, + arbitrary::Arbitrary, +)] +#[serde(bound = "T: EthSpec")] +#[arbitrary(bound = "T: EthSpec")] +pub struct SignedBlobSidecar { + pub blob: BlobSidecar, + pub signature: Signature, +}