mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-07 00:42:42 +00:00
Implement Metadatav3 (#6303)
* Add a V3 variant for metadata * Add v3 for requests; persistence logic * Set custody_subnets on setting metadata * Fix tests * Address some comments * fmt * Address more comments * Fix tests * Update metadata rpc limits * Update method doc.
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
//! Implementation of Lighthouse's peer management system.
|
||||
|
||||
use crate::discovery::enr_ext::EnrExt;
|
||||
use crate::discovery::peer_id_to_node_id;
|
||||
use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode};
|
||||
use crate::service::TARGET_SUBNET_PEERS;
|
||||
use crate::{error, metrics, Gossipsub};
|
||||
@@ -716,7 +717,8 @@ impl<E: EthSpec> PeerManager<E> {
|
||||
debug!(self.log, "Obtained peer's metadata";
|
||||
"peer_id" => %peer_id, "new_seq_no" => meta_data.seq_number());
|
||||
}
|
||||
peer_info.set_meta_data(meta_data);
|
||||
let node_id_opt = peer_id_to_node_id(peer_id).ok();
|
||||
peer_info.set_meta_data(meta_data, node_id_opt, &self.network_globals.spec);
|
||||
} else {
|
||||
error!(self.log, "Received METADATA from an unknown peer";
|
||||
"peer_id" => %peer_id);
|
||||
@@ -1678,7 +1680,11 @@ mod tests {
|
||||
.write()
|
||||
.peer_info_mut(&peer0)
|
||||
.unwrap()
|
||||
.set_meta_data(MetaData::V2(metadata));
|
||||
.set_meta_data(
|
||||
MetaData::V2(metadata),
|
||||
None,
|
||||
&peer_manager.network_globals.spec,
|
||||
);
|
||||
peer_manager
|
||||
.network_globals
|
||||
.peers
|
||||
@@ -1698,7 +1704,11 @@ mod tests {
|
||||
.write()
|
||||
.peer_info_mut(&peer2)
|
||||
.unwrap()
|
||||
.set_meta_data(MetaData::V2(metadata));
|
||||
.set_meta_data(
|
||||
MetaData::V2(metadata),
|
||||
None,
|
||||
&peer_manager.network_globals.spec,
|
||||
);
|
||||
peer_manager
|
||||
.network_globals
|
||||
.peers
|
||||
@@ -1718,7 +1728,11 @@ mod tests {
|
||||
.write()
|
||||
.peer_info_mut(&peer4)
|
||||
.unwrap()
|
||||
.set_meta_data(MetaData::V2(metadata));
|
||||
.set_meta_data(
|
||||
MetaData::V2(metadata),
|
||||
None,
|
||||
&peer_manager.network_globals.spec,
|
||||
);
|
||||
peer_manager
|
||||
.network_globals
|
||||
.peers
|
||||
@@ -1792,7 +1806,11 @@ mod tests {
|
||||
.write()
|
||||
.peer_info_mut(&peer)
|
||||
.unwrap()
|
||||
.set_meta_data(MetaData::V2(metadata));
|
||||
.set_meta_data(
|
||||
MetaData::V2(metadata),
|
||||
None,
|
||||
&peer_manager.network_globals.spec,
|
||||
);
|
||||
peer_manager
|
||||
.network_globals
|
||||
.peers
|
||||
@@ -1916,7 +1934,11 @@ mod tests {
|
||||
.write()
|
||||
.peer_info_mut(&peer)
|
||||
.unwrap()
|
||||
.set_meta_data(MetaData::V2(metadata));
|
||||
.set_meta_data(
|
||||
MetaData::V2(metadata),
|
||||
None,
|
||||
&peer_manager.network_globals.spec,
|
||||
);
|
||||
let long_lived_subnets = peer_manager
|
||||
.network_globals
|
||||
.peers
|
||||
@@ -2025,7 +2047,11 @@ mod tests {
|
||||
.write()
|
||||
.peer_info_mut(&peer)
|
||||
.unwrap()
|
||||
.set_meta_data(MetaData::V2(metadata));
|
||||
.set_meta_data(
|
||||
MetaData::V2(metadata),
|
||||
None,
|
||||
&peer_manager.network_globals.spec,
|
||||
);
|
||||
let long_lived_subnets = peer_manager
|
||||
.network_globals
|
||||
.peers
|
||||
@@ -2191,7 +2217,11 @@ mod tests {
|
||||
.write()
|
||||
.peer_info_mut(&peer)
|
||||
.unwrap()
|
||||
.set_meta_data(MetaData::V2(metadata));
|
||||
.set_meta_data(
|
||||
MetaData::V2(metadata),
|
||||
None,
|
||||
&peer_manager.network_globals.spec,
|
||||
);
|
||||
let long_lived_subnets = peer_manager
|
||||
.network_globals
|
||||
.peers
|
||||
@@ -2348,7 +2378,11 @@ mod tests {
|
||||
|
||||
let mut peer_db = peer_manager.network_globals.peers.write();
|
||||
let peer_info = peer_db.peer_info_mut(&condition.peer_id).unwrap();
|
||||
peer_info.set_meta_data(MetaData::V2(metadata));
|
||||
peer_info.set_meta_data(
|
||||
MetaData::V2(metadata),
|
||||
None,
|
||||
&peer_manager.network_globals.spec,
|
||||
);
|
||||
peer_info.set_gossipsub_score(condition.gossipsub_score);
|
||||
peer_info.add_to_score(condition.score);
|
||||
|
||||
|
||||
@@ -256,6 +256,8 @@ impl<E: EthSpec> PeerDB<E> {
|
||||
.map(|(peer_id, _)| peer_id)
|
||||
}
|
||||
|
||||
/// Returns an iterator of all good gossipsub peers that are supposed to be custodying
|
||||
/// the given subnet id.
|
||||
pub fn good_custody_subnet_peer(
|
||||
&self,
|
||||
subnet: DataColumnSubnetId,
|
||||
@@ -263,15 +265,8 @@ impl<E: EthSpec> PeerDB<E> {
|
||||
self.peers
|
||||
.iter()
|
||||
.filter(move |(_, info)| {
|
||||
// TODO(das): we currently consider peer to be a subnet peer if the peer is *either*
|
||||
// subscribed to the subnet or assigned to the subnet.
|
||||
// The first condition is currently required as we don't have custody count in
|
||||
// metadata implemented yet, and therefore unable to reliably determine custody
|
||||
// subnet count (ENR is not always available).
|
||||
// This condition can be removed later so that we can identify peers that are not
|
||||
// serving custody columns and penalise accordingly.
|
||||
let is_custody_subnet_peer = info.on_subnet_gossipsub(&Subnet::DataColumn(subnet))
|
||||
|| info.is_assigned_to_custody_subnet(&subnet);
|
||||
// The custody_subnets hashset can be populated via enr or metadata
|
||||
let is_custody_subnet_peer = info.is_assigned_to_custody_subnet(&subnet);
|
||||
info.is_connected() && info.is_good_gossipsub_peer() && is_custody_subnet_peer
|
||||
})
|
||||
.map(|(peer_id, _)| peer_id)
|
||||
|
||||
@@ -3,6 +3,7 @@ use super::score::{PeerAction, Score, ScoreState};
|
||||
use super::sync_status::SyncStatus;
|
||||
use crate::discovery::Eth2Enr;
|
||||
use crate::{rpc::MetaData, types::Subnet};
|
||||
use discv5::enr::NodeId;
|
||||
use discv5::Enr;
|
||||
use libp2p::core::multiaddr::{Multiaddr, Protocol};
|
||||
use serde::{
|
||||
@@ -13,7 +14,7 @@ use std::collections::HashSet;
|
||||
use std::net::IpAddr;
|
||||
use std::time::Instant;
|
||||
use strum::AsRefStr;
|
||||
use types::{DataColumnSubnetId, EthSpec};
|
||||
use types::{ChainSpec, DataColumnSubnetId, EthSpec};
|
||||
use PeerConnectionStatus::*;
|
||||
|
||||
/// Information about a given connected peer.
|
||||
@@ -89,6 +90,7 @@ impl<E: EthSpec> PeerInfo<E> {
|
||||
}
|
||||
|
||||
/// Returns if the peer is subscribed to a given `Subnet` from the metadata attnets/syncnets field.
|
||||
/// Also returns true if the peer is assigned to custody a given data column `Subnet` computed from the metadata `custody_column_count` field or ENR `csc` field.
|
||||
pub fn on_subnet_metadata(&self, subnet: &Subnet) -> bool {
|
||||
if let Some(meta_data) = &self.meta_data {
|
||||
match subnet {
|
||||
@@ -100,15 +102,7 @@ impl<E: EthSpec> PeerInfo<E> {
|
||||
.syncnets()
|
||||
.map_or(false, |s| s.get(**id as usize).unwrap_or(false))
|
||||
}
|
||||
Subnet::DataColumn(_) => {
|
||||
// TODO(das): Pending spec PR https://github.com/ethereum/consensus-specs/pull/3821
|
||||
// We should use MetaDataV3 for peer selection rather than
|
||||
// looking at subscribed peers (current behavior). Until MetaDataV3 is
|
||||
// implemented, this is the perhaps the only viable option on the current devnet
|
||||
// as the peer count is low and it's important to identify supernodes to get a
|
||||
// good distribution of peers across subnets.
|
||||
return true;
|
||||
}
|
||||
Subnet::DataColumn(column) => return self.custody_subnets.contains(column),
|
||||
}
|
||||
}
|
||||
false
|
||||
@@ -364,8 +358,32 @@ impl<E: EthSpec> PeerInfo<E> {
|
||||
|
||||
/// Sets an explicit value for the meta data.
|
||||
// VISIBILITY: The peer manager is able to adjust the meta_data
|
||||
pub(in crate::peer_manager) fn set_meta_data(&mut self, meta_data: MetaData<E>) {
|
||||
self.meta_data = Some(meta_data)
|
||||
pub(in crate::peer_manager) fn set_meta_data(
|
||||
&mut self,
|
||||
meta_data: MetaData<E>,
|
||||
node_id_opt: Option<NodeId>,
|
||||
spec: &ChainSpec,
|
||||
) {
|
||||
// If we don't have a node id, we cannot compute the custody duties anyway
|
||||
let Some(node_id) = node_id_opt else {
|
||||
self.meta_data = Some(meta_data);
|
||||
return;
|
||||
};
|
||||
|
||||
// Already set by enr if custody_subnets is non empty
|
||||
if self.custody_subnets.is_empty() {
|
||||
if let Ok(custody_subnet_count) = meta_data.custody_subnet_count() {
|
||||
let custody_subnets = DataColumnSubnetId::compute_custody_subnets::<E>(
|
||||
node_id.raw().into(),
|
||||
std::cmp::min(*custody_subnet_count, spec.data_column_sidecar_subnet_count),
|
||||
spec,
|
||||
)
|
||||
.collect::<HashSet<_>>();
|
||||
self.set_custody_subnets(custody_subnets);
|
||||
}
|
||||
}
|
||||
|
||||
self.meta_data = Some(meta_data);
|
||||
}
|
||||
|
||||
/// Sets the connection status of the peer.
|
||||
|
||||
@@ -82,9 +82,10 @@ impl<E: EthSpec> Encoder<RPCCodedResponse<E>> for SSZSnappyInboundCodec<E> {
|
||||
{
|
||||
match self.protocol.versioned_protocol {
|
||||
SupportedProtocol::MetaDataV1 => res.metadata_v1().as_ssz_bytes(),
|
||||
// We always send V2 metadata responses from the behaviour
|
||||
// No change required.
|
||||
SupportedProtocol::MetaDataV2 => res.metadata_v2().as_ssz_bytes(),
|
||||
SupportedProtocol::MetaDataV3 => {
|
||||
res.metadata_v3(&self.fork_context.spec).as_ssz_bytes()
|
||||
}
|
||||
_ => unreachable!(
|
||||
"We only send metadata responses on negotiating metadata requests"
|
||||
),
|
||||
@@ -136,6 +137,9 @@ impl<E: EthSpec> Decoder for SSZSnappyInboundCodec<E> {
|
||||
if self.protocol.versioned_protocol == SupportedProtocol::MetaDataV2 {
|
||||
return Ok(Some(InboundRequest::MetaData(MetadataRequest::new_v2())));
|
||||
}
|
||||
if self.protocol.versioned_protocol == SupportedProtocol::MetaDataV3 {
|
||||
return Ok(Some(InboundRequest::MetaData(MetadataRequest::new_v3())));
|
||||
}
|
||||
let Some(length) = handle_length(&mut self.inner, &mut self.len, src)? else {
|
||||
return Ok(None);
|
||||
};
|
||||
@@ -549,6 +553,15 @@ fn handle_rpc_request<E: EthSpec>(
|
||||
}
|
||||
// MetaData requests return early from InboundUpgrade and do not reach the decoder.
|
||||
// Handle this case just for completeness.
|
||||
SupportedProtocol::MetaDataV3 => {
|
||||
if !decoded_buffer.is_empty() {
|
||||
Err(RPCError::InternalError(
|
||||
"Metadata requests shouldn't reach decoder",
|
||||
))
|
||||
} else {
|
||||
Ok(Some(InboundRequest::MetaData(MetadataRequest::new_v3())))
|
||||
}
|
||||
}
|
||||
SupportedProtocol::MetaDataV2 => {
|
||||
if !decoded_buffer.is_empty() {
|
||||
Err(RPCError::InternalError(
|
||||
@@ -712,7 +725,10 @@ fn handle_rpc_response<E: EthSpec>(
|
||||
),
|
||||
)),
|
||||
},
|
||||
// MetaData V2 responses have no context bytes, so behave similarly to V1 responses
|
||||
// MetaData V2/V3 responses have no context bytes, so behave similarly to V1 responses
|
||||
SupportedProtocol::MetaDataV3 => Ok(Some(RPCResponse::MetaData(MetaData::V3(
|
||||
MetaDataV3::from_ssz_bytes(decoded_buffer)?,
|
||||
)))),
|
||||
SupportedProtocol::MetaDataV2 => Ok(Some(RPCResponse::MetaData(MetaData::V2(
|
||||
MetaDataV2::from_ssz_bytes(decoded_buffer)?,
|
||||
)))),
|
||||
@@ -984,6 +1000,15 @@ mod tests {
|
||||
})
|
||||
}
|
||||
|
||||
fn metadata_v3() -> MetaData<Spec> {
|
||||
MetaData::V3(MetaDataV3 {
|
||||
seq_number: 1,
|
||||
attnets: EnrAttestationBitfield::<Spec>::default(),
|
||||
syncnets: EnrSyncCommitteeBitfield::<Spec>::default(),
|
||||
custody_subnet_count: 1,
|
||||
})
|
||||
}
|
||||
|
||||
/// Encodes the given protocol response as bytes.
|
||||
fn encode_response(
|
||||
protocol: SupportedProtocol,
|
||||
@@ -1217,6 +1242,17 @@ mod tests {
|
||||
Ok(Some(RPCResponse::MetaData(metadata()))),
|
||||
);
|
||||
|
||||
// A MetaDataV3 still encodes as a MetaDataV2 since version is Version::V2
|
||||
assert_eq!(
|
||||
encode_then_decode_response(
|
||||
SupportedProtocol::MetaDataV2,
|
||||
RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v3())),
|
||||
ForkName::Base,
|
||||
&chain_spec,
|
||||
),
|
||||
Ok(Some(RPCResponse::MetaData(metadata_v2()))),
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
encode_then_decode_response(
|
||||
SupportedProtocol::BlobsByRangeV1,
|
||||
|
||||
@@ -89,7 +89,7 @@ pub struct Ping {
|
||||
|
||||
/// The METADATA request structure.
|
||||
#[superstruct(
|
||||
variants(V1, V2),
|
||||
variants(V1, V2, V3),
|
||||
variant_attributes(derive(Clone, Debug, PartialEq, Serialize),)
|
||||
)]
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
@@ -109,11 +109,17 @@ impl<E: EthSpec> MetadataRequest<E> {
|
||||
_phantom_data: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn new_v3() -> Self {
|
||||
Self::V3(MetadataRequestV3 {
|
||||
_phantom_data: PhantomData,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// The METADATA response structure.
|
||||
#[superstruct(
|
||||
variants(V1, V2),
|
||||
variants(V1, V2, V3),
|
||||
variant_attributes(
|
||||
derive(Encode, Decode, Clone, Debug, PartialEq, Serialize),
|
||||
serde(bound = "E: EthSpec", deny_unknown_fields),
|
||||
@@ -127,8 +133,10 @@ pub struct MetaData<E: EthSpec> {
|
||||
/// The persistent attestation subnet bitfield.
|
||||
pub attnets: EnrAttestationBitfield<E>,
|
||||
/// The persistent sync committee bitfield.
|
||||
#[superstruct(only(V2))]
|
||||
#[superstruct(only(V2, V3))]
|
||||
pub syncnets: EnrSyncCommitteeBitfield<E>,
|
||||
#[superstruct(only(V3))]
|
||||
pub custody_subnet_count: u64,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> MetaData<E> {
|
||||
@@ -140,6 +148,10 @@ impl<E: EthSpec> MetaData<E> {
|
||||
seq_number: metadata.seq_number,
|
||||
attnets: metadata.attnets.clone(),
|
||||
}),
|
||||
MetaData::V3(metadata) => MetaData::V1(MetaDataV1 {
|
||||
seq_number: metadata.seq_number,
|
||||
attnets: metadata.attnets.clone(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -152,6 +164,30 @@ impl<E: EthSpec> MetaData<E> {
|
||||
syncnets: Default::default(),
|
||||
}),
|
||||
md @ MetaData::V2(_) => md.clone(),
|
||||
MetaData::V3(metadata) => MetaData::V2(MetaDataV2 {
|
||||
seq_number: metadata.seq_number,
|
||||
attnets: metadata.attnets.clone(),
|
||||
syncnets: metadata.syncnets.clone(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a V3 MetaData response from self by filling unavailable fields with default.
|
||||
pub fn metadata_v3(&self, spec: &ChainSpec) -> Self {
|
||||
match self {
|
||||
MetaData::V1(metadata) => MetaData::V3(MetaDataV3 {
|
||||
seq_number: metadata.seq_number,
|
||||
attnets: metadata.attnets.clone(),
|
||||
syncnets: Default::default(),
|
||||
custody_subnet_count: spec.custody_requirement,
|
||||
}),
|
||||
MetaData::V2(metadata) => MetaData::V3(MetaDataV3 {
|
||||
seq_number: metadata.seq_number,
|
||||
attnets: metadata.attnets.clone(),
|
||||
syncnets: metadata.syncnets.clone(),
|
||||
custody_subnet_count: spec.custody_requirement,
|
||||
}),
|
||||
md @ MetaData::V3(_) => md.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -159,6 +195,7 @@ impl<E: EthSpec> MetaData<E> {
|
||||
match self {
|
||||
MetaData::V1(md) => md.as_ssz_bytes(),
|
||||
MetaData::V2(md) => md.as_ssz_bytes(),
|
||||
MetaData::V3(md) => md.as_ssz_bytes(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,9 @@ use std::time::Duration;
|
||||
use types::{EthSpec, ForkContext};
|
||||
|
||||
pub(crate) use handler::{HandlerErr, HandlerEvent};
|
||||
pub(crate) use methods::{MetaData, MetaDataV1, MetaDataV2, Ping, RPCCodedResponse, RPCResponse};
|
||||
pub(crate) use methods::{
|
||||
MetaData, MetaDataV1, MetaDataV2, MetaDataV3, Ping, RPCCodedResponse, RPCResponse,
|
||||
};
|
||||
pub(crate) use protocol::InboundRequest;
|
||||
|
||||
pub use handler::SubstreamId;
|
||||
|
||||
@@ -94,6 +94,7 @@ impl<E: EthSpec> OutboundRequest<E> {
|
||||
Encoding::SSZSnappy,
|
||||
)],
|
||||
OutboundRequest::MetaData(_) => vec![
|
||||
ProtocolId::new(SupportedProtocol::MetaDataV3, Encoding::SSZSnappy),
|
||||
ProtocolId::new(SupportedProtocol::MetaDataV2, Encoding::SSZSnappy),
|
||||
ProtocolId::new(SupportedProtocol::MetaDataV1, Encoding::SSZSnappy),
|
||||
],
|
||||
@@ -153,6 +154,7 @@ impl<E: EthSpec> OutboundRequest<E> {
|
||||
OutboundRequest::MetaData(req) => match req {
|
||||
MetadataRequest::V1(_) => SupportedProtocol::MetaDataV1,
|
||||
MetadataRequest::V2(_) => SupportedProtocol::MetaDataV2,
|
||||
MetadataRequest::V3(_) => SupportedProtocol::MetaDataV3,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -332,6 +332,7 @@ pub enum SupportedProtocol {
|
||||
PingV1,
|
||||
MetaDataV1,
|
||||
MetaDataV2,
|
||||
MetaDataV3,
|
||||
LightClientBootstrapV1,
|
||||
LightClientOptimisticUpdateV1,
|
||||
LightClientFinalityUpdateV1,
|
||||
@@ -353,6 +354,7 @@ impl SupportedProtocol {
|
||||
SupportedProtocol::PingV1 => "1",
|
||||
SupportedProtocol::MetaDataV1 => "1",
|
||||
SupportedProtocol::MetaDataV2 => "2",
|
||||
SupportedProtocol::MetaDataV3 => "3",
|
||||
SupportedProtocol::LightClientBootstrapV1 => "1",
|
||||
SupportedProtocol::LightClientOptimisticUpdateV1 => "1",
|
||||
SupportedProtocol::LightClientFinalityUpdateV1 => "1",
|
||||
@@ -374,6 +376,7 @@ impl SupportedProtocol {
|
||||
SupportedProtocol::PingV1 => Protocol::Ping,
|
||||
SupportedProtocol::MetaDataV1 => Protocol::MetaData,
|
||||
SupportedProtocol::MetaDataV2 => Protocol::MetaData,
|
||||
SupportedProtocol::MetaDataV3 => Protocol::MetaData,
|
||||
SupportedProtocol::LightClientBootstrapV1 => Protocol::LightClientBootstrap,
|
||||
SupportedProtocol::LightClientOptimisticUpdateV1 => {
|
||||
Protocol::LightClientOptimisticUpdate
|
||||
@@ -392,9 +395,20 @@ impl SupportedProtocol {
|
||||
ProtocolId::new(Self::BlocksByRootV2, Encoding::SSZSnappy),
|
||||
ProtocolId::new(Self::BlocksByRootV1, Encoding::SSZSnappy),
|
||||
ProtocolId::new(Self::PingV1, Encoding::SSZSnappy),
|
||||
ProtocolId::new(Self::MetaDataV2, Encoding::SSZSnappy),
|
||||
ProtocolId::new(Self::MetaDataV1, Encoding::SSZSnappy),
|
||||
];
|
||||
if fork_context.spec.is_peer_das_scheduled() {
|
||||
supported.extend_from_slice(&[
|
||||
// V3 variants have higher preference for protocol negotation
|
||||
ProtocolId::new(Self::MetaDataV3, Encoding::SSZSnappy),
|
||||
ProtocolId::new(Self::MetaDataV2, Encoding::SSZSnappy),
|
||||
ProtocolId::new(Self::MetaDataV1, Encoding::SSZSnappy),
|
||||
]);
|
||||
} else {
|
||||
supported.extend_from_slice(&[
|
||||
ProtocolId::new(Self::MetaDataV2, Encoding::SSZSnappy),
|
||||
ProtocolId::new(Self::MetaDataV1, Encoding::SSZSnappy),
|
||||
]);
|
||||
}
|
||||
if fork_context.fork_exists(ForkName::Deneb) {
|
||||
supported.extend_from_slice(&[
|
||||
ProtocolId::new(SupportedProtocol::BlobsByRootV1, Encoding::SSZSnappy),
|
||||
@@ -554,7 +568,7 @@ impl ProtocolId {
|
||||
),
|
||||
Protocol::MetaData => RpcLimits::new(
|
||||
<MetaDataV1<E> as Encode>::ssz_fixed_len(),
|
||||
<MetaDataV2<E> as Encode>::ssz_fixed_len(),
|
||||
<MetaDataV3<E> as Encode>::ssz_fixed_len(),
|
||||
),
|
||||
Protocol::LightClientBootstrap => {
|
||||
rpc_light_client_bootstrap_limits_by_fork(fork_context.current_fork())
|
||||
@@ -587,6 +601,7 @@ impl ProtocolId {
|
||||
| SupportedProtocol::PingV1
|
||||
| SupportedProtocol::MetaDataV1
|
||||
| SupportedProtocol::MetaDataV2
|
||||
| SupportedProtocol::MetaDataV3
|
||||
| SupportedProtocol::GoodbyeV1 => false,
|
||||
}
|
||||
}
|
||||
@@ -671,6 +686,9 @@ where
|
||||
SupportedProtocol::MetaDataV2 => {
|
||||
Ok((InboundRequest::MetaData(MetadataRequest::new_v2()), socket))
|
||||
}
|
||||
SupportedProtocol::MetaDataV3 => {
|
||||
Ok((InboundRequest::MetaData(MetadataRequest::new_v3()), socket))
|
||||
}
|
||||
SupportedProtocol::LightClientOptimisticUpdateV1 => {
|
||||
Ok((InboundRequest::LightClientOptimisticUpdate, socket))
|
||||
}
|
||||
@@ -757,6 +775,7 @@ impl<E: EthSpec> InboundRequest<E> {
|
||||
InboundRequest::MetaData(req) => match req {
|
||||
MetadataRequest::V1(_) => SupportedProtocol::MetaDataV1,
|
||||
MetadataRequest::V2(_) => SupportedProtocol::MetaDataV2,
|
||||
MetadataRequest::V3(_) => SupportedProtocol::MetaDataV3,
|
||||
},
|
||||
InboundRequest::LightClientBootstrap(_) => SupportedProtocol::LightClientBootstrapV1,
|
||||
InboundRequest::LightClientOptimisticUpdate => {
|
||||
|
||||
@@ -165,7 +165,17 @@ impl<E: EthSpec> Network<E> {
|
||||
ctx.chain_spec,
|
||||
)?;
|
||||
// Construct the metadata
|
||||
let meta_data = utils::load_or_build_metadata(&config.network_dir, &log);
|
||||
let custody_subnet_count = if ctx.chain_spec.is_peer_das_scheduled() {
|
||||
if config.subscribe_all_data_column_subnets {
|
||||
Some(ctx.chain_spec.data_column_sidecar_subnet_count)
|
||||
} else {
|
||||
Some(ctx.chain_spec.custody_requirement)
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let meta_data =
|
||||
utils::load_or_build_metadata(&config.network_dir, custody_subnet_count, &log);
|
||||
let globals = NetworkGlobals::new(
|
||||
enr,
|
||||
meta_data,
|
||||
@@ -1130,8 +1140,14 @@ impl<E: EthSpec> Network<E> {
|
||||
|
||||
/// Sends a METADATA request to a peer.
|
||||
fn send_meta_data_request(&mut self, peer_id: PeerId) {
|
||||
// We always prefer sending V2 requests
|
||||
let event = OutboundRequest::MetaData(MetadataRequest::new_v2());
|
||||
let event = if self.fork_context.spec.is_peer_das_scheduled() {
|
||||
// Nodes with higher custody will probably start advertising it
|
||||
// before peerdas is activated
|
||||
OutboundRequest::MetaData(MetadataRequest::new_v3())
|
||||
} else {
|
||||
// We always prefer sending V2 requests otherwise
|
||||
OutboundRequest::MetaData(MetadataRequest::new_v2())
|
||||
};
|
||||
self.eth2_rpc_mut()
|
||||
.send_request(peer_id, RequestId::Internal, event);
|
||||
}
|
||||
@@ -1139,15 +1155,12 @@ impl<E: EthSpec> Network<E> {
|
||||
/// Sends a METADATA response to a peer.
|
||||
fn send_meta_data_response(
|
||||
&mut self,
|
||||
req: MetadataRequest<E>,
|
||||
_req: MetadataRequest<E>,
|
||||
id: PeerRequestId,
|
||||
peer_id: PeerId,
|
||||
) {
|
||||
let metadata = self.network_globals.local_metadata.read().clone();
|
||||
let metadata = match req {
|
||||
MetadataRequest::V1(_) => metadata.metadata_v1(),
|
||||
MetadataRequest::V2(_) => metadata,
|
||||
};
|
||||
// The encoder is responsible for sending the negotiated version of the metadata
|
||||
let event = RPCCodedResponse::Success(RPCResponse::MetaData(metadata));
|
||||
self.eth2_rpc_mut().send_response(peer_id, id, event);
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::multiaddr::Protocol;
|
||||
use crate::rpc::methods::MetaDataV3;
|
||||
use crate::rpc::{MetaData, MetaDataV1, MetaDataV2};
|
||||
use crate::types::{
|
||||
error, EnrAttestationBitfield, EnrSyncCommitteeBitfield, GossipEncoding, GossipKind,
|
||||
@@ -12,7 +13,6 @@ use libp2p::{core, noise, yamux, PeerId, Transport};
|
||||
use prometheus_client::registry::Registry;
|
||||
use slog::{debug, warn};
|
||||
use ssz::Decode;
|
||||
use ssz::Encode;
|
||||
use std::collections::HashSet;
|
||||
use std::fs::File;
|
||||
use std::io::prelude::*;
|
||||
@@ -169,6 +169,7 @@ pub fn strip_peer_id(addr: &mut Multiaddr) {
|
||||
/// Load metadata from persisted file. Return default metadata if loading fails.
|
||||
pub fn load_or_build_metadata<E: EthSpec>(
|
||||
network_dir: &std::path::Path,
|
||||
custody_subnet_count: Option<u64>,
|
||||
log: &slog::Logger,
|
||||
) -> MetaData<E> {
|
||||
// We load a V2 metadata version by default (regardless of current fork)
|
||||
@@ -219,7 +220,16 @@ pub fn load_or_build_metadata<E: EthSpec>(
|
||||
};
|
||||
|
||||
// Wrap the MetaData
|
||||
let meta_data = MetaData::V2(meta_data);
|
||||
let meta_data = if let Some(custody_count) = custody_subnet_count {
|
||||
MetaData::V3(MetaDataV3 {
|
||||
attnets: meta_data.attnets,
|
||||
seq_number: meta_data.seq_number,
|
||||
syncnets: meta_data.syncnets,
|
||||
custody_subnet_count: custody_count,
|
||||
})
|
||||
} else {
|
||||
MetaData::V2(meta_data)
|
||||
};
|
||||
|
||||
debug!(log, "Metadata sequence number"; "seq_num" => meta_data.seq_number());
|
||||
save_metadata_to_disk(network_dir, meta_data.clone(), log);
|
||||
@@ -276,10 +286,11 @@ pub(crate) fn save_metadata_to_disk<E: EthSpec>(
|
||||
log: &slog::Logger,
|
||||
) {
|
||||
let _ = std::fs::create_dir_all(dir);
|
||||
let metadata_bytes = match metadata {
|
||||
MetaData::V1(md) => md.as_ssz_bytes(),
|
||||
MetaData::V2(md) => md.as_ssz_bytes(),
|
||||
};
|
||||
// We always store the metadata v2 to disk because
|
||||
// custody_subnet_count parameter doesn't need to be persisted across runs.
|
||||
// custody_subnet_count is what the user sets it for the current run.
|
||||
// This is to prevent ugly branching logic when reading the metadata from disk.
|
||||
let metadata_bytes = metadata.metadata_v2().as_ssz_bytes();
|
||||
match File::create(dir.join(METADATA_FILENAME)).and_then(|mut f| f.write_all(&metadata_bytes)) {
|
||||
Ok(_) => {
|
||||
debug!(log, "Metadata written to disk");
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//! A collection of variables that are accessible outside of the network thread itself.
|
||||
use crate::peer_manager::peerdb::PeerDB;
|
||||
use crate::rpc::{MetaData, MetaDataV2};
|
||||
use crate::rpc::{MetaData, MetaDataV3};
|
||||
use crate::types::{BackFillState, SyncState};
|
||||
use crate::EnrExt;
|
||||
use crate::{Client, Eth2Enr};
|
||||
@@ -26,7 +26,7 @@ pub struct NetworkGlobals<E: EthSpec> {
|
||||
pub sync_state: RwLock<SyncState>,
|
||||
/// The current state of the backfill sync.
|
||||
pub backfill_state: RwLock<BackFillState>,
|
||||
spec: ChainSpec,
|
||||
pub spec: ChainSpec,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> NetworkGlobals<E> {
|
||||
@@ -137,10 +137,8 @@ impl<E: EthSpec> NetworkGlobals<E> {
|
||||
|
||||
/// Returns a connected peer that:
|
||||
/// 1. is connected
|
||||
/// 2. assigned to custody the column based on it's `custody_subnet_count` from ENR or metadata (WIP)
|
||||
/// 2. assigned to custody the column based on it's `custody_subnet_count` from ENR or metadata
|
||||
/// 3. has a good score
|
||||
/// 4. subscribed to the specified column - this condition can be removed later, so we can
|
||||
/// identify and penalise peers that are supposed to custody the column.
|
||||
pub fn custody_peers_for_column(&self, column_index: ColumnIndex) -> Vec<PeerId> {
|
||||
self.peers
|
||||
.read()
|
||||
@@ -164,10 +162,11 @@ impl<E: EthSpec> NetworkGlobals<E> {
|
||||
let enr = discv5::enr::Enr::builder().build(&enr_key).unwrap();
|
||||
NetworkGlobals::new(
|
||||
enr,
|
||||
MetaData::V2(MetaDataV2 {
|
||||
MetaData::V3(MetaDataV3 {
|
||||
seq_number: 0,
|
||||
attnets: Default::default(),
|
||||
syncnets: Default::default(),
|
||||
custody_subnet_count: spec.data_column_sidecar_subnet_count,
|
||||
}),
|
||||
trusted_peers,
|
||||
false,
|
||||
|
||||
Reference in New Issue
Block a user