resolve merge conflicts

This commit is contained in:
Eitan Seri-Levi
2025-02-22 07:33:36 -08:00
127 changed files with 4596 additions and 2671 deletions

View File

@@ -22,6 +22,7 @@ hex = { workspace = true }
itertools = { workspace = true }
libp2p-mplex = "0.43"
lighthouse_version = { workspace = true }
local-ip-address = "0.6"
lru = { workspace = true }
lru_cache = { workspace = true }
metrics = { workspace = true }

View File

@@ -6,6 +6,7 @@ use directory::{
DEFAULT_BEACON_NODE_DIR, DEFAULT_HARDCODED_NETWORK, DEFAULT_NETWORK_DIR, DEFAULT_ROOT_DIR,
};
use libp2p::Multiaddr;
use local_ip_address::local_ipv6;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::net::{Ipv4Addr, Ipv6Addr};
@@ -266,6 +267,18 @@ impl Config {
}
}
/// A helper function to check if the local host has a globally routeable IPv6 address. If so,
/// returns true.
pub fn is_ipv6_supported() -> bool {
// If IPv6 is supported
let Ok(std::net::IpAddr::V6(local_ip)) = local_ipv6() else {
return false;
};
// If its globally routable, return true
is_global_ipv6(&local_ip)
}
pub fn listen_addrs(&self) -> &ListenAddress {
&self.listen_addresses
}
@@ -354,7 +367,7 @@ impl Default for Config {
topics: Vec::new(),
proposer_only: false,
metrics_enabled: false,
enable_light_client_server: false,
enable_light_client_server: true,
outbound_rate_limiter_config: None,
invalid_block_storage: None,
inbound_rate_limiter_config: None,

View File

@@ -689,8 +689,8 @@ impl<E: EthSpec> PeerDB<E> {
&mut self,
supernode: bool,
spec: &ChainSpec,
enr_key: CombinedKey,
) -> PeerId {
let enr_key = CombinedKey::generate_secp256k1();
let mut enr = Enr::builder().build(&enr_key).unwrap();
let peer_id = enr.peer_id();

View File

@@ -234,6 +234,11 @@ impl<E: EthSpec> PeerInfo<E> {
self.custody_subnets.contains(subnet)
}
/// Returns an iterator on this peer's custody subnets
pub fn custody_subnets_iter(&self) -> impl Iterator<Item = &DataColumnSubnetId> {
self.custody_subnets.iter()
}
/// Returns true if the peer is connected to a long-lived subnet.
pub fn has_long_lived_subnet(&self) -> bool {
// Check the meta_data

View File

@@ -411,6 +411,27 @@ impl OldBlocksByRangeRequest {
}
}
impl From<BlocksByRangeRequest> for OldBlocksByRangeRequest {
fn from(req: BlocksByRangeRequest) -> Self {
match req {
BlocksByRangeRequest::V1(ref req) => {
OldBlocksByRangeRequest::V1(OldBlocksByRangeRequestV1 {
start_slot: req.start_slot,
count: req.count,
step: 1,
})
}
BlocksByRangeRequest::V2(ref req) => {
OldBlocksByRangeRequest::V2(OldBlocksByRangeRequestV2 {
start_slot: req.start_slot,
count: req.count,
step: 1,
})
}
}
}
}
/// Request a number of beacon block bodies from a peer.
#[superstruct(variants(V1, V2), variant_attributes(derive(Clone, Debug, PartialEq)))]
#[derive(Clone, Debug, PartialEq)]

View File

@@ -232,14 +232,14 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
}
}
} else {
ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event: RPCSend::Request(request_id, req),
}
RPCSend::Request(request_id, req)
};
self.events.push(event);
self.events.push(BehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event,
});
}
/// Lighthouse wishes to disconnect from this peer by sending a Goodbye message. This

View File

@@ -35,7 +35,7 @@ pub(crate) struct SelfRateLimiter<Id: ReqId, E: EthSpec> {
/// Rate limiter for our own requests.
limiter: RateLimiter,
/// Requests that are ready to be sent.
ready_requests: SmallVec<[BehaviourAction<Id, E>; 3]>,
ready_requests: SmallVec<[(PeerId, RPCSend<Id, E>); 3]>,
/// Slog logger.
log: Logger,
}
@@ -76,7 +76,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
peer_id: PeerId,
request_id: Id,
req: RequestType<E>,
) -> Result<BehaviourAction<Id, E>, Error> {
) -> Result<RPCSend<Id, E>, Error> {
let protocol = req.versioned_protocol().protocol();
// First check that there are not already other requests waiting to be sent.
if let Some(queued_requests) = self.delayed_requests.get_mut(&(peer_id, protocol)) {
@@ -108,13 +108,9 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
request_id: Id,
req: RequestType<E>,
log: &Logger,
) -> Result<BehaviourAction<Id, E>, (QueuedRequest<Id, E>, Duration)> {
) -> Result<RPCSend<Id, E>, (QueuedRequest<Id, E>, Duration)> {
match limiter.allows(&peer_id, &req) {
Ok(()) => Ok(BehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event: RPCSend::Request(request_id, req),
}),
Ok(()) => Ok(RPCSend::Request(request_id, req)),
Err(e) => {
let protocol = req.versioned_protocol();
match e {
@@ -126,11 +122,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
"Self rate limiting error for a batch that will never fit. Sending request anyway. Check configuration parameters.";
"protocol" => %req.versioned_protocol().protocol()
);
Ok(BehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event: RPCSend::Request(request_id, req),
})
Ok(RPCSend::Request(request_id, req))
}
RateLimitedErr::TooSoon(wait_time) => {
debug!(log, "Self rate limiting"; "protocol" => %protocol.protocol(), "wait_time_ms" => wait_time.as_millis(), "peer_id" => %peer_id);
@@ -156,7 +148,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
// If one fails just wait for the next window that allows sending requests.
return;
}
Ok(event) => self.ready_requests.push(event),
Ok(event) => self.ready_requests.push((peer_id, event)),
}
}
if queued_requests.is_empty() {
@@ -203,8 +195,12 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
let _ = self.limiter.poll_unpin(cx);
// Finally return any queued events.
if !self.ready_requests.is_empty() {
return Poll::Ready(self.ready_requests.remove(0));
if let Some((peer_id, event)) = self.ready_requests.pop() {
return Poll::Ready(BehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event,
});
}
Poll::Pending
@@ -217,7 +213,7 @@ mod tests {
use crate::rpc::rate_limiter::Quota;
use crate::rpc::self_limiter::SelfRateLimiter;
use crate::rpc::{Ping, Protocol, RequestType};
use crate::service::api_types::{AppRequestId, RequestId, SyncRequestId};
use crate::service::api_types::{AppRequestId, RequestId, SingleLookupReqId, SyncRequestId};
use libp2p::PeerId;
use std::time::Duration;
use types::{EthSpec, ForkContext, Hash256, MainnetEthSpec, Slot};
@@ -238,12 +234,16 @@ mod tests {
let mut limiter: SelfRateLimiter<RequestId, MainnetEthSpec> =
SelfRateLimiter::new(config, fork_context, log).unwrap();
let peer_id = PeerId::random();
let lookup_id = 0;
for i in 1..=5u32 {
let _ = limiter.allows(
peer_id,
RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
id: i,
RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId {
lookup_id,
req_id: i,
},
})),
RequestType::Ping(Ping { data: i as u64 }),
);
@@ -261,9 +261,9 @@ mod tests {
for i in 2..=5u32 {
assert!(matches!(
iter.next().unwrap().request_id,
RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
id,
})) if id == i
RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId { req_id, .. },
})) if req_id == i,
));
}
@@ -286,9 +286,9 @@ mod tests {
for i in 3..=5 {
assert!(matches!(
iter.next().unwrap().request_id,
RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
id
})) if id == i
RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId { req_id, .. },
})) if req_id == i,
));
}

View File

@@ -1,15 +1,14 @@
use std::sync::Arc;
use libp2p::swarm::ConnectionId;
use types::{
BlobSidecar, DataColumnSidecar, EthSpec, Hash256, LightClientBootstrap,
LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock,
};
use crate::rpc::{
methods::{ResponseTermination, RpcResponse, RpcSuccessResponse, StatusMessage},
SubstreamId,
};
use libp2p::swarm::ConnectionId;
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use types::{
BlobSidecar, DataColumnSidecar, Epoch, EthSpec, Hash256, LightClientBootstrap,
LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock,
};
/// Identifier of requests sent by a peer.
pub type PeerRequestId = (ConnectionId, SubstreamId);
@@ -31,8 +30,12 @@ pub enum SyncRequestId {
SingleBlob { id: SingleLookupReqId },
/// Request searching for a set of data columns given a hash and list of column indices.
DataColumnsByRoot(DataColumnsByRootRequestId),
/// Range request that is composed by both a block range request and a blob range request.
RangeBlockAndBlobs { id: Id },
/// Blocks by range request
BlocksByRange(BlocksByRangeRequestId),
/// Blobs by range request
BlobsByRange(BlobsByRangeRequestId),
/// Data columns by range request
DataColumnsByRange(DataColumnsByRangeRequestId),
}
/// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly.
@@ -43,12 +46,60 @@ pub struct DataColumnsByRootRequestId {
pub requester: DataColumnsByRootRequester,
}
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct BlocksByRangeRequestId {
/// Id to identify this attempt at a blocks_by_range request for `parent_request_id`
pub id: Id,
/// The Id of the overall By Range request for block components.
pub parent_request_id: ComponentsByRangeRequestId,
}
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct BlobsByRangeRequestId {
/// Id to identify this attempt at a blobs_by_range request for `parent_request_id`
pub id: Id,
/// The Id of the overall By Range request for block components.
pub parent_request_id: ComponentsByRangeRequestId,
}
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct DataColumnsByRangeRequestId {
/// Id to identify this attempt at a data_columns_by_range request for `parent_request_id`
pub id: Id,
/// The Id of the overall By Range request for block components.
pub parent_request_id: ComponentsByRangeRequestId,
}
/// Block components by range request for range sync. Includes an ID for downstream consumers to
/// handle retries and tie all their sub requests together.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct ComponentsByRangeRequestId {
/// Each `RangeRequestId` may request the same data in a later retry. This Id identifies the
/// current attempt.
pub id: Id,
/// What sync component is issuing a components by range request and expecting data back
pub requester: RangeRequestId,
}
/// Range sync chain or backfill batch
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum RangeRequestId {
RangeSync { chain_id: Id, batch_id: Epoch },
BackfillSync { batch_id: Epoch },
}
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum DataColumnsByRootRequester {
Sampling(SamplingId),
Custody(CustodyId),
}
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum RangeRequester {
RangeSync { chain_id: u64, batch_id: Epoch },
BackfillSync { batch_id: Epoch },
}
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct SamplingId {
pub id: SamplingRequester,
@@ -183,9 +234,108 @@ impl slog::Value for RequestId {
}
}
// This custom impl reduces log boilerplate not printing `DataColumnsByRootRequestId` on each id log
impl std::fmt::Display for DataColumnsByRootRequestId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} {:?}", self.id, self.requester)
macro_rules! impl_display {
($structname: ty, $format: literal, $($field:ident),*) => {
impl Display for $structname {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, $format, $(self.$field,)*)
}
}
};
}
// Since each request Id is deeply nested with various types, if rendered with Debug on logs they
// take too much visual space. This custom Display implementations make the overall Id short while
// not losing information
impl_display!(BlocksByRangeRequestId, "{}/{}", id, parent_request_id);
impl_display!(BlobsByRangeRequestId, "{}/{}", id, parent_request_id);
impl_display!(DataColumnsByRangeRequestId, "{}/{}", id, parent_request_id);
impl_display!(ComponentsByRangeRequestId, "{}/{}", id, requester);
impl_display!(DataColumnsByRootRequestId, "{}/{}", id, requester);
impl_display!(SingleLookupReqId, "{}/Lookup/{}", req_id, lookup_id);
impl_display!(CustodyId, "{}", requester);
impl_display!(SamplingId, "{}/{}", sampling_request_id, id);
impl Display for DataColumnsByRootRequester {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Custody(id) => write!(f, "Custody/{id}"),
Self::Sampling(id) => write!(f, "Sampling/{id}"),
}
}
}
impl Display for CustodyRequester {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl Display for RangeRequestId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::RangeSync { chain_id, batch_id } => write!(f, "RangeSync/{batch_id}/{chain_id}"),
Self::BackfillSync { batch_id } => write!(f, "BackfillSync/{batch_id}"),
}
}
}
impl Display for SamplingRequestId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl Display for SamplingRequester {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::ImportedBlock(block) => write!(f, "ImportedBlock/{block}"),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn display_id_data_columns_by_root_custody() {
let id = DataColumnsByRootRequestId {
id: 123,
requester: DataColumnsByRootRequester::Custody(CustodyId {
requester: CustodyRequester(SingleLookupReqId {
req_id: 121,
lookup_id: 101,
}),
}),
};
assert_eq!(format!("{id}"), "123/Custody/121/Lookup/101");
}
#[test]
fn display_id_data_columns_by_root_sampling() {
let id = DataColumnsByRootRequestId {
id: 123,
requester: DataColumnsByRootRequester::Sampling(SamplingId {
id: SamplingRequester::ImportedBlock(Hash256::ZERO),
sampling_request_id: SamplingRequestId(101),
}),
};
assert_eq!(format!("{id}"), "123/Sampling/101/ImportedBlock/0x0000000000000000000000000000000000000000000000000000000000000000");
}
#[test]
fn display_id_data_columns_by_range() {
let id = DataColumnsByRangeRequestId {
id: 123,
parent_request_id: ComponentsByRangeRequestId {
id: 122,
requester: RangeRequestId::RangeSync {
chain_id: 54,
batch_id: Epoch::new(0),
},
},
};
assert_eq!(format!("{id}"), "123/122/RangeSync/0/54");
}
}

View File

@@ -14,9 +14,8 @@ use crate::rpc::{
RequestType, ResponseTermination, RpcErrorResponse, RpcResponse, RpcSuccessResponse, RPC,
};
use crate::types::{
attestation_sync_committee_topics, fork_core_topics, subnet_from_topic_hash, GossipEncoding,
GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, ALTAIR_CORE_TOPICS,
BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery,
};
use crate::EnrExt;
use crate::Eth2Enr;
@@ -280,14 +279,39 @@ impl<E: EthSpec> Network<E> {
// Set up a scoring update interval
let update_gossipsub_scores = tokio::time::interval(params.decay_interval);
let max_topics = ctx.chain_spec.attestation_subnet_count as usize
+ SYNC_COMMITTEE_SUBNET_COUNT as usize
+ ctx.chain_spec.blob_sidecar_subnet_count_max() as usize
+ ctx.chain_spec.data_column_sidecar_subnet_count as usize
+ BASE_CORE_TOPICS.len()
+ ALTAIR_CORE_TOPICS.len()
+ CAPELLA_CORE_TOPICS.len() // 0 core deneb and electra topics
+ LIGHT_CLIENT_GOSSIP_TOPICS.len();
let current_and_future_forks = ForkName::list_all().into_iter().filter_map(|fork| {
if fork >= ctx.fork_context.current_fork() {
ctx.fork_context
.to_context_bytes(fork)
.map(|fork_digest| (fork, fork_digest))
} else {
None
}
});
let all_topics_for_forks = current_and_future_forks
.map(|(fork, fork_digest)| {
all_topics_at_fork::<E>(fork, &ctx.chain_spec)
.into_iter()
.map(|topic| {
Topic::new(GossipTopic::new(
topic,
GossipEncoding::default(),
fork_digest,
))
.into()
})
.collect::<Vec<TopicHash>>()
})
.collect::<Vec<_>>();
// For simplicity find the fork with the most individual topics and assume all forks
// have the same topic count
let max_topics_at_any_fork = all_topics_for_forks
.iter()
.map(|topics| topics.len())
.max()
.expect("each fork has at least 5 hardcoded core topics");
let possible_fork_digests = ctx.fork_context.all_fork_digests();
let filter = gossipsub::MaxCountSubscriptionFilter {
@@ -297,9 +321,9 @@ impl<E: EthSpec> Network<E> {
SYNC_COMMITTEE_SUBNET_COUNT,
),
// during a fork we subscribe to both the old and new topics
max_subscribed_topics: max_topics * 4,
max_subscribed_topics: max_topics_at_any_fork * 4,
// 424 in theory = (64 attestation + 4 sync committee + 7 core topics + 9 blob topics + 128 column topics) * 2
max_subscriptions_per_request: max_topics * 2,
max_subscriptions_per_request: max_topics_at_any_fork * 2,
};
// If metrics are enabled for libp2p build the configuration
@@ -332,17 +356,9 @@ impl<E: EthSpec> Network<E> {
// If we are using metrics, then register which topics we want to make sure to keep
// track of
if ctx.libp2p_registry.is_some() {
let topics_to_keep_metrics_for = attestation_sync_committee_topics::<E>()
.map(|gossip_kind| {
Topic::from(GossipTopic::new(
gossip_kind,
GossipEncoding::default(),
enr_fork_id.fork_digest,
))
.into()
})
.collect::<Vec<TopicHash>>();
gossipsub.register_topics_for_metrics(topics_to_keep_metrics_for);
for topics in all_topics_for_forks {
gossipsub.register_topics_for_metrics(topics);
}
}
(gossipsub, update_gossipsub_scores)
@@ -700,32 +716,26 @@ impl<E: EthSpec> Network<E> {
/// Subscribe to all required topics for the `new_fork` with the given `new_fork_digest`.
pub fn subscribe_new_fork_topics(&mut self, new_fork: ForkName, new_fork_digest: [u8; 4]) {
// Subscribe to existing topics with new fork digest
// Re-subscribe to non-core topics with the new fork digest
let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();
for mut topic in subscriptions.into_iter() {
topic.fork_digest = new_fork_digest;
self.subscribe(topic);
if is_fork_non_core_topic(&topic, new_fork) {
topic.fork_digest = new_fork_digest;
self.subscribe(topic);
}
}
// Subscribe to core topics for the new fork
for kind in fork_core_topics::<E>(&new_fork, &self.fork_context.spec) {
for kind in core_topics_to_subscribe::<E>(
new_fork,
&self.network_globals.as_topic_config(),
&self.fork_context.spec,
) {
let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest);
self.subscribe(topic);
}
// Register the new topics for metrics
let topics_to_keep_metrics_for = attestation_sync_committee_topics::<E>()
.map(|gossip_kind| {
Topic::from(GossipTopic::new(
gossip_kind,
GossipEncoding::default(),
new_fork_digest,
))
.into()
})
.collect::<Vec<TopicHash>>();
self.gossipsub_mut()
.register_topics_for_metrics(topics_to_keep_metrics_for);
// Already registered all possible gossipsub topics for metrics
}
/// Unsubscribe from all topics that doesn't have the given fork_digest

View File

@@ -1,4 +1,5 @@
//! A collection of variables that are accessible outside of the network thread itself.
use super::TopicConfig;
use crate::peer_manager::peerdb::PeerDB;
use crate::rpc::{MetaData, MetaDataV3};
use crate::types::{BackFillState, SyncState};
@@ -183,6 +184,16 @@ impl<E: EthSpec> NetworkGlobals<E> {
.collect::<Vec<_>>()
}
/// Returns the TopicConfig to compute the set of Gossip topics for a given fork
pub fn as_topic_config(&self) -> TopicConfig {
TopicConfig {
enable_light_client_server: self.config.enable_light_client_server,
subscribe_all_subnets: self.config.subscribe_all_subnets,
subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets,
sampling_subnets: &self.sampling_subnets,
}
}
/// TESTING ONLY. Build a dummy NetworkGlobals instance.
pub fn new_test_globals(
trusted_peers: Vec<PeerId>,

View File

@@ -16,7 +16,6 @@ pub use pubsub::{PubsubMessage, SnappyTransform};
pub use subnet::{Subnet, SubnetDiscovery};
pub use sync_state::{BackFillState, SyncState};
pub use topics::{
attestation_sync_committee_topics, core_topics_to_subscribe, fork_core_topics,
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, ALTAIR_CORE_TOPICS,
BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
GossipEncoding, GossipKind, GossipTopic, TopicConfig,
};

View File

@@ -1,5 +1,6 @@
use gossipsub::{IdentTopic as Topic, TopicHash};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use strum::AsRefStr;
use types::{ChainSpec, DataColumnSubnetId, EthSpec, ForkName, SubnetId, SyncSubnetId, Unsigned};
@@ -25,81 +26,115 @@ pub const LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update";
pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update";
pub const INCLUSION_LIST_TOPIC: &str = "inclusion_list";
pub const BASE_CORE_TOPICS: [GossipKind; 5] = [
GossipKind::BeaconBlock,
GossipKind::BeaconAggregateAndProof,
GossipKind::VoluntaryExit,
GossipKind::ProposerSlashing,
GossipKind::AttesterSlashing,
];
pub const ALTAIR_CORE_TOPICS: [GossipKind; 1] = [GossipKind::SignedContributionAndProof];
pub const CAPELLA_CORE_TOPICS: [GossipKind; 1] = [GossipKind::BlsToExecutionChange];
pub const LIGHT_CLIENT_GOSSIP_TOPICS: [GossipKind; 2] = [
GossipKind::LightClientFinalityUpdate,
GossipKind::LightClientOptimisticUpdate,
];
pub const FULU_CORE_TOPICS: [GossipKind; 1] = [GossipKind::InclusionList];
/// Returns the core topics associated with each fork that are new to the previous fork
pub fn fork_core_topics<E: EthSpec>(fork_name: &ForkName, spec: &ChainSpec) -> Vec<GossipKind> {
match fork_name {
ForkName::Base => BASE_CORE_TOPICS.to_vec(),
ForkName::Altair => ALTAIR_CORE_TOPICS.to_vec(),
ForkName::Bellatrix => vec![],
ForkName::Capella => CAPELLA_CORE_TOPICS.to_vec(),
ForkName::Deneb => {
// All of deneb blob topics are core topics
let mut deneb_blob_topics = Vec::new();
for i in 0..spec.blob_sidecar_subnet_count(ForkName::Deneb) {
deneb_blob_topics.push(GossipKind::BlobSidecar(i));
}
deneb_blob_topics
}
ForkName::Electra => {
// All of electra blob topics are core topics
let mut electra_blob_topics = Vec::new();
for i in 0..spec.blob_sidecar_subnet_count(ForkName::Electra) {
electra_blob_topics.push(GossipKind::BlobSidecar(i));
}
electra_blob_topics
}
ForkName::Fulu => FULU_CORE_TOPICS.to_vec(),
}
#[derive(Debug)]
pub struct TopicConfig<'a> {
pub enable_light_client_server: bool,
pub subscribe_all_subnets: bool,
pub subscribe_all_data_column_subnets: bool,
pub sampling_subnets: &'a HashSet<DataColumnSubnetId>,
}
/// Returns all the attestation and sync committee topics, for a given fork.
pub fn attestation_sync_committee_topics<E: EthSpec>() -> impl Iterator<Item = GossipKind> {
(0..E::SubnetBitfieldLength::to_usize())
.map(|subnet_id| GossipKind::Attestation(SubnetId::new(subnet_id as u64)))
.chain(
(0..E::SyncCommitteeSubnetCount::to_usize()).map(|sync_committee_id| {
GossipKind::SyncCommitteeMessage(SyncSubnetId::new(sync_committee_id as u64))
}),
)
}
/// Returns all the topics that we need to subscribe to for a given fork
/// including topics from older forks and new topics for the current fork.
/// Returns all the topics the node should subscribe at `fork_name`
pub fn core_topics_to_subscribe<E: EthSpec>(
mut current_fork: ForkName,
fork_name: ForkName,
opts: &TopicConfig,
spec: &ChainSpec,
) -> Vec<GossipKind> {
let mut topics = fork_core_topics::<E>(&current_fork, spec);
while let Some(previous_fork) = current_fork.previous_fork() {
let previous_fork_topics = fork_core_topics::<E>(&previous_fork, spec);
topics.extend(previous_fork_topics);
current_fork = previous_fork;
let mut topics = vec![
GossipKind::BeaconBlock,
GossipKind::BeaconAggregateAndProof,
GossipKind::VoluntaryExit,
GossipKind::ProposerSlashing,
GossipKind::AttesterSlashing,
];
if opts.subscribe_all_subnets {
for i in 0..spec.attestation_subnet_count {
topics.push(GossipKind::Attestation(i.into()));
}
}
// Remove duplicates
if fork_name.altair_enabled() {
topics.push(GossipKind::SignedContributionAndProof);
if opts.subscribe_all_subnets {
for i in 0..E::SyncCommitteeSubnetCount::to_u64() {
topics.push(GossipKind::SyncCommitteeMessage(i.into()));
}
}
if opts.enable_light_client_server {
topics.push(GossipKind::LightClientFinalityUpdate);
topics.push(GossipKind::LightClientOptimisticUpdate);
}
}
if fork_name.capella_enabled() {
topics.push(GossipKind::BlsToExecutionChange);
}
if fork_name.deneb_enabled() && !fork_name.fulu_enabled() {
// All of deneb blob topics are core topics
for i in 0..spec.blob_sidecar_subnet_count(fork_name) {
topics.push(GossipKind::BlobSidecar(i));
}
}
if fork_name.electra_enabled() {
topics.push(GossipKind::InclusionList);
}
if fork_name.fulu_enabled() {
if opts.subscribe_all_data_column_subnets {
for i in 0..spec.data_column_sidecar_subnet_count {
topics.push(GossipKind::DataColumnSidecar(i.into()));
}
} else {
for subnet in opts.sampling_subnets {
topics.push(GossipKind::DataColumnSidecar(*subnet));
}
}
}
topics
.into_iter()
.collect::<std::collections::HashSet<_>>()
.into_iter()
.collect()
}
/// Returns true if a given non-core `GossipTopic` MAY be subscribe at this fork.
///
/// For example: the `Attestation` topic is not subscribed as a core topic if
/// subscribe_all_subnets = false` but we may subscribe to it outside of a fork
/// boundary if the node is an aggregator.
pub fn is_fork_non_core_topic(topic: &GossipTopic, _fork_name: ForkName) -> bool {
match topic.kind() {
// Node may be aggregator of attestation and sync_committee_message topics for all known
// forks
GossipKind::Attestation(_) | GossipKind::SyncCommitteeMessage(_) => true,
// All these topics are core-only
GossipKind::BeaconBlock
| GossipKind::BeaconAggregateAndProof
| GossipKind::BlobSidecar(_)
| GossipKind::DataColumnSidecar(_)
| GossipKind::VoluntaryExit
| GossipKind::ProposerSlashing
| GossipKind::AttesterSlashing
| GossipKind::SignedContributionAndProof
| GossipKind::BlsToExecutionChange
| GossipKind::LightClientFinalityUpdate
| GossipKind::LightClientOptimisticUpdate
| GossipKind::InclusionList => false,
}
}
pub fn all_topics_at_fork<E: EthSpec>(fork: ForkName, spec: &ChainSpec) -> Vec<GossipKind> {
// Compute the worst case of all forks
let sampling_subnets = HashSet::from_iter(spec.all_data_column_sidecar_subnets());
let opts = TopicConfig {
enable_light_client_server: true,
subscribe_all_subnets: true,
subscribe_all_data_column_subnets: true,
sampling_subnets: &sampling_subnets,
};
core_topics_to_subscribe::<E>(fork, &opts, spec)
}
/// A gossipsub topic which encapsulates the type of messages that should be sent and received over
@@ -349,10 +384,9 @@ fn subnet_topic_index(topic: &str) -> Option<GossipKind> {
#[cfg(test)]
mod tests {
use types::MainnetEthSpec;
use super::GossipKind::*;
use super::*;
use types::{Epoch, MainnetEthSpec as E};
const GOOD_FORK_DIGEST: &str = "e1925f3b";
const BAD_PREFIX: &str = "tezos";
@@ -477,24 +511,94 @@ mod tests {
assert_eq!("attester_slashing", AttesterSlashing.as_ref());
}
fn get_spec() -> ChainSpec {
let mut spec = E::default_spec();
spec.altair_fork_epoch = Some(Epoch::new(1));
spec.bellatrix_fork_epoch = Some(Epoch::new(2));
spec.capella_fork_epoch = Some(Epoch::new(3));
spec.deneb_fork_epoch = Some(Epoch::new(4));
spec.electra_fork_epoch = Some(Epoch::new(5));
spec.fulu_fork_epoch = Some(Epoch::new(6));
spec
}
fn get_sampling_subnets() -> HashSet<DataColumnSubnetId> {
HashSet::new()
}
fn get_topic_config(sampling_subnets: &HashSet<DataColumnSubnetId>) -> TopicConfig {
TopicConfig {
enable_light_client_server: false,
subscribe_all_subnets: false,
subscribe_all_data_column_subnets: false,
sampling_subnets,
}
}
#[test]
fn base_topics_are_always_active() {
let spec = get_spec();
let s = get_sampling_subnets();
let topic_config = get_topic_config(&s);
for fork in ForkName::list_all() {
assert!(core_topics_to_subscribe::<E>(fork, &topic_config, &spec,)
.contains(&GossipKind::BeaconBlock));
}
}
#[test]
fn blobs_are_not_subscribed_in_peerdas() {
let spec = get_spec();
let s = get_sampling_subnets();
let topic_config = get_topic_config(&s);
assert!(
!core_topics_to_subscribe::<E>(ForkName::Fulu, &topic_config, &spec,)
.contains(&GossipKind::BlobSidecar(0))
);
}
#[test]
fn columns_are_subscribed_in_peerdas() {
let spec = get_spec();
let s = get_sampling_subnets();
let mut topic_config = get_topic_config(&s);
topic_config.subscribe_all_data_column_subnets = true;
assert!(
core_topics_to_subscribe::<E>(ForkName::Fulu, &topic_config, &spec)
.contains(&GossipKind::DataColumnSidecar(0.into()))
);
}
#[test]
fn test_core_topics_to_subscribe() {
type E = MainnetEthSpec;
let spec = E::default_spec();
let mut all_topics = Vec::new();
let mut electra_core_topics = fork_core_topics::<E>(&ForkName::Electra, &spec);
let mut deneb_core_topics = fork_core_topics::<E>(&ForkName::Deneb, &spec);
all_topics.append(&mut electra_core_topics);
all_topics.append(&mut deneb_core_topics);
all_topics.extend(CAPELLA_CORE_TOPICS);
all_topics.extend(ALTAIR_CORE_TOPICS);
all_topics.extend(BASE_CORE_TOPICS);
let spec = get_spec();
let s = HashSet::from_iter([1, 2].map(DataColumnSubnetId::new));
let mut topic_config = get_topic_config(&s);
topic_config.enable_light_client_server = true;
let latest_fork = *ForkName::list_all().last().unwrap();
let core_topics = core_topics_to_subscribe::<E>(latest_fork, &spec);
let topics = core_topics_to_subscribe::<E>(latest_fork, &topic_config, &spec);
let mut expected_topics = vec![
GossipKind::BeaconBlock,
GossipKind::BeaconAggregateAndProof,
GossipKind::VoluntaryExit,
GossipKind::ProposerSlashing,
GossipKind::AttesterSlashing,
GossipKind::SignedContributionAndProof,
GossipKind::LightClientFinalityUpdate,
GossipKind::LightClientOptimisticUpdate,
GossipKind::BlsToExecutionChange,
];
for subnet in s {
expected_topics.push(GossipKind::DataColumnSidecar(subnet));
}
// Need to check all the topics exist in an order independent manner
for topic in all_topics {
assert!(core_topics.contains(&topic));
for expected_topic in expected_topics {
assert!(
topics.contains(&expected_topic),
"Should contain {:?}",
expected_topic
);
}
}
}