mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-06 02:01:40 +00:00
Compare commits
1 Commits
bal-devnet
...
mallory-fu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
497c7d7190 |
1683
Cargo.lock
generated
1683
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -131,9 +131,9 @@ deposit_contract = { path = "common/deposit_contract" }
|
||||
derivative = "2"
|
||||
directory = { path = "common/directory" }
|
||||
dirs = "3"
|
||||
discv5 = { version = "0.9", features = ["libp2p"] }
|
||||
doppelganger_service = { path = "validator_client/doppelganger_service" }
|
||||
either = "1.9"
|
||||
discv5 = { git= "https://github.com/sigp/discv5", features = ["libp2p"], branch = "mallory" }
|
||||
env_logger = "0.9"
|
||||
environment = { path = "lighthouse/environment" }
|
||||
eth2 = { path = "common/eth2" }
|
||||
@@ -159,11 +159,11 @@ fork_choice = { path = "consensus/fork_choice" }
|
||||
fs2 = "0.4"
|
||||
futures = "0.3"
|
||||
genesis = { path = "beacon_node/genesis" }
|
||||
gossipsub = { package = "libp2p-gossipsub", git = "https://github.com/sigp/rust-libp2p.git", rev = "61b2820" }
|
||||
graffiti_file = { path = "validator_client/graffiti_file" }
|
||||
gossipsub = { package = "libp2p-gossipsub", git = "https://github.com/sigp/rust-libp2p.git", branch = "mallory" }
|
||||
hex = "0.4"
|
||||
hashlink = "0.9.0"
|
||||
health_metrics = { path = "common/health_metrics" }
|
||||
hex = "0.4"
|
||||
http_api = { path = "beacon_node/http_api" }
|
||||
hyper = "1"
|
||||
initialized_validators = { path = "validator_client/initialized_validators" }
|
||||
|
||||
@@ -35,7 +35,7 @@ pub struct Config {
|
||||
pub network_dir: PathBuf,
|
||||
|
||||
/// IP addresses to listen on.
|
||||
pub(crate) listen_addresses: ListenAddress,
|
||||
pub listen_addresses: ListenAddress,
|
||||
|
||||
/// The address to broadcast to peers about which address we are listening on. None indicates
|
||||
/// that no discovery address has been set in the CLI args.
|
||||
@@ -142,6 +142,9 @@ pub struct Config {
|
||||
|
||||
/// Flag for advertising a fake CGC to peers for testing ONLY.
|
||||
pub advertise_false_custody_group_count: Option<u64>,
|
||||
/// Extra configurations for Mallory.
|
||||
#[serde(skip)]
|
||||
pub attacker_config: crate::MalloryConfig,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
@@ -367,6 +370,7 @@ impl Default for Config {
|
||||
inbound_rate_limiter_config: None,
|
||||
idontwant_message_size_threshold: DEFAULT_IDONTWANT_MESSAGE_SIZE_THRESHOLD,
|
||||
advertise_false_custody_group_count: None,
|
||||
attacker_config: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -241,14 +241,8 @@ impl<E: EthSpec> Discovery<E> {
|
||||
quic = bootnode_enr.quic4(),
|
||||
"Adding node to routing table"
|
||||
);
|
||||
let repr = bootnode_enr.to_string();
|
||||
let _ = discv5.add_enr(bootnode_enr).map_err(|e| {
|
||||
error!(
|
||||
addr = repr,
|
||||
error = e.to_string(),
|
||||
"Could not add peer to the local routing table"
|
||||
)
|
||||
});
|
||||
// Error is suppressed for mallory
|
||||
let _ = discv5.add_enr(bootnode_enr);
|
||||
}
|
||||
|
||||
// Start the discv5 service and obtain an event stream
|
||||
|
||||
@@ -2,11 +2,12 @@
|
||||
/// all required libp2p functionality.
|
||||
///
|
||||
/// This crate builds and manages the libp2p services required by the beacon node.
|
||||
mod config;
|
||||
pub mod config;
|
||||
pub mod service;
|
||||
|
||||
pub mod discovery;
|
||||
pub mod listen_addr;
|
||||
mod mallory_config;
|
||||
pub mod metrics;
|
||||
pub mod peer_manager;
|
||||
pub mod rpc;
|
||||
@@ -38,6 +39,12 @@ impl FromStr for PeerIdSerialized {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PeerId> for PeerIdSerialized {
|
||||
fn from(peer_id: PeerId) -> Self {
|
||||
PeerIdSerialized(peer_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for PeerIdSerialized {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
@@ -111,8 +118,9 @@ pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr};
|
||||
pub use discv5;
|
||||
pub use gossipsub::{IdentTopic, MessageAcceptance, MessageId, Topic, TopicHash};
|
||||
pub use libp2p;
|
||||
pub use libp2p::{core::ConnectedPoint, PeerId, Swarm};
|
||||
pub use libp2p::{core::ConnectedPoint, identity::Keypair, PeerId, Swarm};
|
||||
pub use libp2p::{multiaddr, Multiaddr};
|
||||
pub use mallory_config::MalloryConfig;
|
||||
pub use metrics::scrape_discovery_metrics;
|
||||
pub use peer_manager::{
|
||||
peerdb::client::Client,
|
||||
@@ -120,6 +128,7 @@ pub use peer_manager::{
|
||||
peerdb::PeerDB,
|
||||
ConnectionDirection, PeerConnectionStatus, PeerInfo, PeerManager, SyncInfo, SyncStatus,
|
||||
};
|
||||
pub use service::Behaviour;
|
||||
// pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_FILENAME};
|
||||
pub use service::api_types::Response;
|
||||
pub use service::utils::*;
|
||||
|
||||
37
beacon_node/lighthouse_network/src/mallory_config.rs
Normal file
37
beacon_node/lighthouse_network/src/mallory_config.rs
Normal file
@@ -0,0 +1,37 @@
|
||||
/// Every configuration needed for Mallory.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MalloryConfig {
|
||||
/* Peer manager stuff */
|
||||
/// Ping inbound peers this often (in seconds) instead of the default `PING_INTERVAL_INBOUND`.
|
||||
pub inbound_peers_ping: Option<u64>,
|
||||
/// Ping outbound peers this often (in seconds) instead of the default `PING_INTERVAL_OUTBOUND`.
|
||||
pub outbound_peers_ping: Option<u64>,
|
||||
/// Status peers this often (in seconds) instead of the default `STATUS_INTERVAL`.
|
||||
pub status_interval: Option<u64>,
|
||||
|
||||
/* RPC stuff */
|
||||
/// Duration in seconds after which an inbound connection with a peer times out instead of the
|
||||
/// default `RESPONSE_TIMEOUT`.
|
||||
pub inbound_rpc_timeout: Option<u64>,
|
||||
|
||||
/// Duration in seconds after which an outbound connection with a peer times out instead of the
|
||||
/// default `RESPONSE_TIMEOUT`.
|
||||
pub outbound_rpc_timeout: Option<u64>,
|
||||
|
||||
/* Behaviour Stuff */
|
||||
// Allow the user to handle a ping request
|
||||
pub user_handle_ping: bool,
|
||||
}
|
||||
|
||||
impl Default for MalloryConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
inbound_peers_ping: None,
|
||||
outbound_peers_ping: None,
|
||||
status_interval: None,
|
||||
inbound_rpc_timeout: None,
|
||||
outbound_rpc_timeout: None,
|
||||
user_handle_ping: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3,11 +3,11 @@
|
||||
//! Currently using identify to fingerprint.
|
||||
|
||||
use libp2p::identify::Info as IdentifyInfo;
|
||||
use serde::Serialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use strum::{AsRefStr, EnumIter, IntoStaticStr};
|
||||
|
||||
/// Various client and protocol information related to a node.
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct Client {
|
||||
/// The client's name (Ex: lighthouse, prism, nimbus, etc)
|
||||
pub kind: ClientKind,
|
||||
@@ -21,7 +21,9 @@ pub struct Client {
|
||||
pub agent_string: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize, PartialEq, AsRefStr, IntoStaticStr, EnumIter)]
|
||||
#[derive(
|
||||
Clone, Copy, Debug, Serialize, PartialEq, AsRefStr, IntoStaticStr, EnumIter, Deserialize,
|
||||
)]
|
||||
pub enum ClientKind {
|
||||
/// A lighthouse node (the best kind).
|
||||
Lighthouse,
|
||||
|
||||
@@ -100,7 +100,7 @@ impl<E: EthSpec> SSZSnappyInboundCodec<E> {
|
||||
}
|
||||
}
|
||||
},
|
||||
RpcResponse::Error(_, err) => err.as_ssz_bytes(),
|
||||
RpcResponse::Error(_, err) => err.as_bytes().to_vec().as_ssz_bytes(),
|
||||
RpcResponse::StreamTermination(_) => {
|
||||
unreachable!("Code error - attempting to encode a stream termination")
|
||||
}
|
||||
@@ -334,6 +334,14 @@ impl<E: EthSpec> Encoder<RequestType<E>> for SSZSnappyOutboundCodec<E> {
|
||||
type Error = RPCError;
|
||||
|
||||
fn encode(&mut self, item: RequestType<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
||||
let compress = !matches!(
|
||||
&item,
|
||||
RequestType::Raw(RawRequest {
|
||||
mode: RawMode::Raw,
|
||||
..
|
||||
})
|
||||
);
|
||||
|
||||
let bytes = match item {
|
||||
RequestType::Status(req) => {
|
||||
// Send the status message based on the negotiated protocol
|
||||
@@ -365,14 +373,25 @@ impl<E: EthSpec> Encoder<RequestType<E>> for SSZSnappyOutboundCodec<E> {
|
||||
RequestType::MetaData(_)
|
||||
| RequestType::LightClientOptimisticUpdate
|
||||
| RequestType::LightClientFinalityUpdate => return Ok(()),
|
||||
RequestType::Raw(RawRequest {
|
||||
bytes,
|
||||
protocol: _,
|
||||
mode,
|
||||
}) => match mode {
|
||||
RawMode::EncodeAndCompress => bytes.as_ssz_bytes(),
|
||||
RawMode::Compress | RawMode::Raw => bytes,
|
||||
},
|
||||
};
|
||||
|
||||
// Mallory doesn't care about inbound limits
|
||||
/*
|
||||
// SSZ encoded bytes should be within `max_packet_size`
|
||||
if bytes.len() > self.max_packet_size {
|
||||
return Err(RPCError::InternalError(
|
||||
"attempting to encode data > max_packet_size",
|
||||
));
|
||||
}
|
||||
*/
|
||||
|
||||
// Inserts the length prefix of the uncompressed bytes into dst
|
||||
// encoded as a unsigned varint
|
||||
@@ -380,12 +399,14 @@ impl<E: EthSpec> Encoder<RequestType<E>> for SSZSnappyOutboundCodec<E> {
|
||||
.encode(bytes.len(), dst)
|
||||
.map_err(RPCError::from)?;
|
||||
|
||||
let mut writer = FrameEncoder::new(Vec::new());
|
||||
writer.write_all(&bytes).map_err(RPCError::from)?;
|
||||
writer.flush().map_err(RPCError::from)?;
|
||||
|
||||
// Write compressed bytes to `dst`
|
||||
dst.extend_from_slice(writer.get_ref());
|
||||
if compress {
|
||||
let mut writer = FrameEncoder::new(Vec::new());
|
||||
writer.write_all(&bytes).map_err(RPCError::from)?;
|
||||
writer.flush().map_err(RPCError::from)?;
|
||||
dst.extend_from_slice(writer.get_ref());
|
||||
} else {
|
||||
dst.extend_from_slice(&bytes);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
use super::methods::{GoodbyeReason, RpcErrorResponse, RpcResponse};
|
||||
use super::outbound::OutboundRequestContainer;
|
||||
use super::protocol::{InboundOutput, Protocol, RPCError, RPCProtocol, RequestType};
|
||||
use super::MalloryLocalConfig;
|
||||
use super::{RPCReceived, RPCSend, ReqId};
|
||||
use crate::rpc::outbound::OutboundFramed;
|
||||
use crate::rpc::protocol::InboundFramed;
|
||||
@@ -143,6 +144,9 @@ where
|
||||
|
||||
/// Timeout that will be used for inbound and outbound responses.
|
||||
resp_timeout: Duration,
|
||||
|
||||
/// Additional configurations for the RPC Handler
|
||||
config: MalloryLocalConfig,
|
||||
}
|
||||
|
||||
enum HandlerState {
|
||||
@@ -227,6 +231,7 @@ where
|
||||
resp_timeout: Duration,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
config: MalloryLocalConfig,
|
||||
) -> Self {
|
||||
RPCHandler {
|
||||
connection_id,
|
||||
@@ -247,6 +252,7 @@ where
|
||||
fork_context,
|
||||
waker: None,
|
||||
resp_timeout,
|
||||
config,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -711,8 +717,10 @@ where
|
||||
request,
|
||||
};
|
||||
substream_entry.max_remaining_chunks = Some(max_remaining_chunks);
|
||||
self.outbound_substreams_delay
|
||||
.reset(delay_key, self.resp_timeout);
|
||||
self.outbound_substreams_delay.reset(
|
||||
delay_key,
|
||||
Duration::from_secs(self.config.outbound_timeout),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1035,9 +1043,10 @@ where
|
||||
Some(max_responses)
|
||||
};
|
||||
// new outbound request. Store the stream and tag the output.
|
||||
let delay_key = self
|
||||
.outbound_substreams_delay
|
||||
.insert(self.current_outbound_substream_id, self.resp_timeout);
|
||||
let delay_key = self.outbound_substreams_delay.insert(
|
||||
self.current_outbound_substream_id,
|
||||
Duration::from_secs(self.config.outbound_timeout),
|
||||
);
|
||||
let awaiting_stream = OutboundSubstreamState::RequestPendingResponse {
|
||||
substream: Box::new(substream),
|
||||
request,
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
//! Available RPC methods types and ids.
|
||||
|
||||
use super::protocol::SupportedProtocol;
|
||||
use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield};
|
||||
use regex::bytes::Regex;
|
||||
use serde::Serialize;
|
||||
@@ -11,6 +12,7 @@ use std::marker::PhantomData;
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use strum::IntoStaticStr;
|
||||
use strum::{Display as StrumDisplay, EnumString};
|
||||
use superstruct::superstruct;
|
||||
use types::blob_sidecar::BlobIdentifier;
|
||||
use types::light_client_update::MAX_REQUEST_LIGHT_CLIENT_UPDATES;
|
||||
@@ -129,6 +131,38 @@ pub struct Ping {
|
||||
pub data: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct RawRequest {
|
||||
pub bytes: Vec<u8>,
|
||||
pub protocol: SupportedProtocol,
|
||||
pub mode: RawMode,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, EnumString, StrumDisplay)]
|
||||
pub enum RawMode {
|
||||
/// SSZ encode, Snappy compress.
|
||||
#[strum(serialize = "encode-compress")]
|
||||
EncodeAndCompress,
|
||||
/// Only Snappy compress.
|
||||
#[strum(serialize = "compress")]
|
||||
Compress,
|
||||
/// Do not alter the bytes.
|
||||
#[strum(serialize = "raw")]
|
||||
Raw,
|
||||
}
|
||||
|
||||
impl Default for RawMode {
|
||||
fn default() -> Self {
|
||||
RawMode::EncodeAndCompress
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for RawRequest {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{:?}", self)
|
||||
}
|
||||
}
|
||||
|
||||
/// The METADATA request structure.
|
||||
#[superstruct(
|
||||
variants(V1, V2, V3),
|
||||
@@ -411,6 +445,8 @@ impl DataColumnsByRangeRequest {
|
||||
}
|
||||
}
|
||||
|
||||
const MALLORY_MAX_REQUEST_BLOCKS: usize = 10000000000000000000;
|
||||
|
||||
/// Request a number of beacon block roots from a peer.
|
||||
#[superstruct(
|
||||
variants(V1, V2),
|
||||
@@ -496,6 +532,11 @@ impl BlocksByRootRequest {
|
||||
let block_roots = RuntimeVariableList::from_vec(block_roots, max_request_blocks);
|
||||
Self::V1(BlocksByRootRequestV1 { block_roots })
|
||||
}
|
||||
|
||||
pub fn mallory_new(block_roots: Vec<Hash256>) -> Self {
|
||||
let block_roots = RuntimeVariableList::from_vec(block_roots, MALLORY_MAX_REQUEST_BLOCKS);
|
||||
Self::V2(BlocksByRootRequestV2 { block_roots })
|
||||
}
|
||||
}
|
||||
|
||||
/// Request a number of beacon blocks and blobs from a peer.
|
||||
@@ -653,10 +694,10 @@ impl ResponseTermination {
|
||||
/// and the contents of the response
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum RpcResponse<E: EthSpec> {
|
||||
/// The response is a successful.
|
||||
/// The response is successful.
|
||||
Success(RpcSuccessResponse<E>),
|
||||
|
||||
Error(RpcErrorResponse, ErrorType),
|
||||
Error(RpcErrorResponse, String),
|
||||
|
||||
/// Received a stream termination indicating which response is being terminated.
|
||||
StreamTermination(ResponseTermination),
|
||||
@@ -706,7 +747,7 @@ impl<E: EthSpec> RpcResponse<E> {
|
||||
140 => RpcErrorResponse::BlobsNotFoundForBlock,
|
||||
_ => RpcErrorResponse::Unknown,
|
||||
};
|
||||
RpcResponse::Error(code, err)
|
||||
RpcResponse::Error(code, err.to_string())
|
||||
}
|
||||
|
||||
/// Returns true if this response always terminates the stream.
|
||||
|
||||
@@ -29,12 +29,14 @@ use self::protocol::RPCProtocol;
|
||||
use self::self_limiter::SelfRateLimiter;
|
||||
use crate::rpc::rate_limiter::RateLimiterItem;
|
||||
use crate::rpc::response_limiter::ResponseLimiter;
|
||||
use crate::MalloryConfig;
|
||||
pub use handler::SubstreamId;
|
||||
pub use methods::{
|
||||
BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, LightClientBootstrapRequest,
|
||||
ResponseTermination, RpcErrorResponse, StatusMessage,
|
||||
};
|
||||
pub use protocol::{Protocol, RPCError};
|
||||
pub use methods::{RawMode, RawRequest};
|
||||
pub use protocol::{Protocol, RPCError, SupportedProtocol};
|
||||
|
||||
pub(crate) mod codec;
|
||||
pub mod config;
|
||||
@@ -149,6 +151,16 @@ pub struct NetworkParams {
|
||||
pub resp_timeout: Duration,
|
||||
}
|
||||
|
||||
/// Additional configurations for the RPC Behaviour.
|
||||
#[derive(Clone, Copy)]
|
||||
pub struct MalloryLocalConfig {
|
||||
/// Timeout in seconds for inbound connections.
|
||||
pub inbound_timeout: u64,
|
||||
/// Timeout for outbound connections.
|
||||
pub outbound_timeout: u64,
|
||||
pub self_handle_ping: bool,
|
||||
}
|
||||
|
||||
/// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level
|
||||
/// logic.
|
||||
pub struct RPC<Id: ReqId, E: EthSpec> {
|
||||
@@ -166,6 +178,9 @@ pub struct RPC<Id: ReqId, E: EthSpec> {
|
||||
network_params: NetworkParams,
|
||||
/// A sequential counter indicating when data gets modified.
|
||||
seq_number: u64,
|
||||
|
||||
/// Mallory Config
|
||||
config: MalloryLocalConfig,
|
||||
}
|
||||
|
||||
impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
|
||||
@@ -182,6 +197,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
|
||||
outbound_rate_limiter_config: Option<OutboundRateLimiterConfig>,
|
||||
network_params: NetworkParams,
|
||||
seq_number: u64,
|
||||
mallory_config: &MalloryConfig,
|
||||
) -> Self {
|
||||
let response_limiter = inbound_rate_limiter_config.map(|config| {
|
||||
debug!(?config, "Using response rate limiting params");
|
||||
@@ -193,6 +209,16 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
|
||||
SelfRateLimiter::new(outbound_rate_limiter_config, fork_context.clone())
|
||||
.expect("Outbound limiter configuration parameters are valid");
|
||||
|
||||
let mallory_config = MalloryLocalConfig {
|
||||
inbound_timeout: mallory_config
|
||||
.inbound_rpc_timeout
|
||||
.unwrap_or(network_params.resp_timeout.as_secs()),
|
||||
outbound_timeout: mallory_config
|
||||
.outbound_rpc_timeout
|
||||
.unwrap_or(network_params.resp_timeout.as_secs()),
|
||||
self_handle_ping: mallory_config.user_handle_ping,
|
||||
};
|
||||
|
||||
RPC {
|
||||
response_limiter,
|
||||
outbound_request_limiter,
|
||||
@@ -202,6 +228,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
|
||||
enable_light_client_server,
|
||||
network_params,
|
||||
seq_number,
|
||||
config: mallory_config,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -344,6 +371,15 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
|
||||
trace!(%peer_id, "Sending Ping");
|
||||
self.send_request(peer_id, id, RequestType::Ping(ping));
|
||||
}
|
||||
|
||||
/// Sends a pong response
|
||||
pub fn pong(&mut self, peer_id: PeerId, inbound_request_id: InboundRequestId, data: u64) {
|
||||
self.send_response(
|
||||
inbound_request_id,
|
||||
RpcResponse::Success(RpcSuccessResponse::Pong(Ping { data })),
|
||||
)
|
||||
.expect("request should exist");
|
||||
}
|
||||
}
|
||||
|
||||
impl<Id, E> NetworkBehaviour for RPC<Id, E>
|
||||
@@ -378,6 +414,7 @@ where
|
||||
self.network_params.resp_timeout,
|
||||
peer_id,
|
||||
connection_id,
|
||||
self.config.clone(),
|
||||
);
|
||||
|
||||
Ok(handler)
|
||||
@@ -408,6 +445,7 @@ where
|
||||
self.network_params.resp_timeout,
|
||||
peer_id,
|
||||
connection_id,
|
||||
self.config,
|
||||
);
|
||||
|
||||
Ok(handler)
|
||||
|
||||
@@ -733,6 +733,7 @@ pub enum RequestType<E: EthSpec> {
|
||||
LightClientUpdatesByRange(LightClientUpdatesByRangeRequest),
|
||||
Ping(Ping),
|
||||
MetaData(MetadataRequest<E>),
|
||||
Raw(RawRequest),
|
||||
}
|
||||
|
||||
/// Implements the encoding per supported protocol for `RPCRequest`.
|
||||
@@ -756,6 +757,7 @@ impl<E: EthSpec> RequestType<E> {
|
||||
RequestType::LightClientOptimisticUpdate => 1,
|
||||
RequestType::LightClientFinalityUpdate => 1,
|
||||
RequestType::LightClientUpdatesByRange(req) => req.count,
|
||||
RequestType::Raw(_) => 1,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -795,6 +797,7 @@ impl<E: EthSpec> RequestType<E> {
|
||||
RequestType::LightClientUpdatesByRange(_) => {
|
||||
SupportedProtocol::LightClientUpdatesByRangeV1
|
||||
}
|
||||
RequestType::Raw(r) => r.protocol,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -818,6 +821,7 @@ impl<E: EthSpec> RequestType<E> {
|
||||
RequestType::LightClientFinalityUpdate => unreachable!(),
|
||||
RequestType::LightClientOptimisticUpdate => unreachable!(),
|
||||
RequestType::LightClientUpdatesByRange(_) => unreachable!(),
|
||||
RequestType::Raw(_) => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -881,6 +885,7 @@ impl<E: EthSpec> RequestType<E> {
|
||||
SupportedProtocol::LightClientUpdatesByRangeV1,
|
||||
Encoding::SSZSnappy,
|
||||
)],
|
||||
RequestType::Raw(req) => vec![ProtocolId::new(req.protocol, Encoding::SSZSnappy)],
|
||||
}
|
||||
}
|
||||
|
||||
@@ -900,6 +905,7 @@ impl<E: EthSpec> RequestType<E> {
|
||||
RequestType::LightClientOptimisticUpdate => true,
|
||||
RequestType::LightClientFinalityUpdate => true,
|
||||
RequestType::LightClientUpdatesByRange(_) => true,
|
||||
RequestType::Raw(_) => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1021,6 +1027,7 @@ impl<E: EthSpec> std::fmt::Display for RequestType<E> {
|
||||
RequestType::LightClientUpdatesByRange(_) => {
|
||||
write!(f, "Light client updates by range request")
|
||||
}
|
||||
RequestType::Raw(raw) => write!(f, "Raw: {}", raw),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
use crate::rpc::methods::{ResponseTermination, RpcResponse, RpcSuccessResponse, StatusMessage};
|
||||
use crate::rpc::methods::{
|
||||
Ping, ResponseTermination, RpcResponse, RpcSuccessResponse, StatusMessage,
|
||||
};
|
||||
use libp2p::PeerId;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::sync::Arc;
|
||||
use strum::IntoStaticStr;
|
||||
use types::{
|
||||
BlobSidecar, DataColumnSidecar, Epoch, EthSpec, LightClientBootstrap,
|
||||
LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock,
|
||||
@@ -124,8 +127,9 @@ pub enum AppRequestId {
|
||||
// sent. The main difference is the absense of Pong and Metadata, which don't leave the
|
||||
// Behaviour. For all protocol reponses managed by RPC see `RPCResponse` and
|
||||
// `RPCCodedResponse`.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
#[derive(Debug, Clone, PartialEq, IntoStaticStr)]
|
||||
pub enum Response<E: EthSpec> {
|
||||
Ping(u64),
|
||||
/// A Status message.
|
||||
Status(StatusMessage),
|
||||
/// A response to a get BLOCKS_BY_RANGE request. A None response signals the end of the batch.
|
||||
@@ -178,6 +182,7 @@ impl<E: EthSpec> std::convert::From<Response<E>> for RpcResponse<E> {
|
||||
None => RpcResponse::StreamTermination(ResponseTermination::DataColumnsByRange),
|
||||
},
|
||||
Response::Status(s) => RpcResponse::Success(RpcSuccessResponse::Status(s)),
|
||||
Response::Ping(data) => RpcResponse::Success(RpcSuccessResponse::Pong(Ping { data })),
|
||||
Response::LightClientBootstrap(b) => {
|
||||
RpcResponse::Success(RpcSuccessResponse::LightClientBootstrap(b))
|
||||
}
|
||||
|
||||
187
beacon_node/lighthouse_network/src/service/mallory.rs
Normal file
187
beacon_node/lighthouse_network/src/service/mallory.rs
Normal file
@@ -0,0 +1,187 @@
|
||||
use super::*;
|
||||
use libp2p::core::transport::{ListenerId, TransportError};
|
||||
use libp2p::core::ConnectedPoint;
|
||||
use libp2p::swarm::*;
|
||||
use std::io;
|
||||
/// Custom error that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`].
|
||||
#[derive(Debug)]
|
||||
pub enum MallorySwarmEvent {
|
||||
/// One of the listeners gracefully closed.
|
||||
ListenerClosed {
|
||||
/// The listener that closed.
|
||||
listener_id: libp2p::core::transport::ListenerId,
|
||||
/// The addresses that the listener was listening on. These addresses are now considered
|
||||
/// expired, similar to if a [`ExpiredListenAddr`](SwarmEvent::ExpiredListenAddr) event
|
||||
/// has been generated for each of them.
|
||||
addresses: Vec<Multiaddr>,
|
||||
/// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
|
||||
/// if the stream produced an error.
|
||||
reason: Result<(), std::io::Error>,
|
||||
},
|
||||
/// One of the listeners reported a non-fatal error.
|
||||
ListenerError {
|
||||
/// The listener that errored.
|
||||
listener_id: ListenerId,
|
||||
/// The listener error.
|
||||
error: io::Error,
|
||||
},
|
||||
/// Outgoing connection attempt failed.
|
||||
OutgoingConnectionError {
|
||||
/// Identifier of the connection.
|
||||
connection_id: ConnectionId,
|
||||
/// If known, [`PeerId`] of the peer we tried to reach.
|
||||
peer_id: Option<PeerId>,
|
||||
/// Error that has been encountered.
|
||||
error: DialError,
|
||||
},
|
||||
IncomingConnection {
|
||||
/// Identifier of the connection.
|
||||
connection_id: ConnectionId,
|
||||
/// Local connection address.
|
||||
/// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
|
||||
/// event.
|
||||
local_addr: Multiaddr,
|
||||
/// Address used to send back data to the remote.
|
||||
send_back_addr: Multiaddr,
|
||||
},
|
||||
/// An error happened on a connection during its initial handshake.
|
||||
///
|
||||
/// This can include, for example, an error during the handshake of the encryption layer, or
|
||||
/// the connection unexpectedly closed.
|
||||
IncomingConnectionError {
|
||||
/// Identifier of the connection.
|
||||
connection_id: ConnectionId,
|
||||
/// Local connection address.
|
||||
/// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
|
||||
/// event.
|
||||
local_addr: Multiaddr,
|
||||
/// Address used to send back data to the remote.
|
||||
send_back_addr: Multiaddr,
|
||||
/// The error that happened.
|
||||
error: ListenError,
|
||||
},
|
||||
Dialing {
|
||||
/// Identity of the peer that we are connecting to.
|
||||
peer_id: Option<PeerId>,
|
||||
/// Identifier of the connection.
|
||||
connection_id: ConnectionId,
|
||||
},
|
||||
ConnectionClosed {
|
||||
/// Identity of the peer that we have connected to.
|
||||
peer_id: PeerId,
|
||||
/// Identifier of the connection.
|
||||
connection_id: ConnectionId,
|
||||
/// Endpoint of the connection that has been closed.
|
||||
endpoint: ConnectedPoint,
|
||||
/// Number of other remaining connections to this same peer.
|
||||
num_established: u32,
|
||||
/// Reason for the disconnection, if it was not a successful
|
||||
/// active close.
|
||||
cause: Option<String>,
|
||||
},
|
||||
/// A connection to the given peer has been opened.
|
||||
ConnectionEstablished {
|
||||
/// Identity of the peer that we have connected to.
|
||||
peer_id: PeerId,
|
||||
/// Identifier of the connection.
|
||||
connection_id: ConnectionId,
|
||||
/// Endpoint of the connection that has been opened.
|
||||
endpoint: ConnectedPoint,
|
||||
/// Number of established connections to this peer, including the one that has just been
|
||||
/// opened.
|
||||
num_established: std::num::NonZeroU32,
|
||||
/// [`Some`] when the new connection is an outgoing connection.
|
||||
/// Addresses are dialed concurrently. Contains the addresses and errors
|
||||
/// of dial attempts that failed before the one successful dial.
|
||||
concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
|
||||
/// How long it took to establish this connection
|
||||
established_in: std::time::Duration,
|
||||
},
|
||||
}
|
||||
|
||||
impl<B> TryFrom<SwarmEvent<B>> for MallorySwarmEvent {
|
||||
type Error = SwarmEvent<B>;
|
||||
|
||||
fn try_from(event: SwarmEvent<B>) -> Result<MallorySwarmEvent, Self::Error> {
|
||||
match event {
|
||||
SwarmEvent::ListenerClosed {
|
||||
listener_id,
|
||||
addresses,
|
||||
reason,
|
||||
} => Ok(MallorySwarmEvent::ListenerClosed {
|
||||
listener_id,
|
||||
addresses,
|
||||
reason,
|
||||
}),
|
||||
SwarmEvent::ListenerError { listener_id, error } => {
|
||||
Ok(MallorySwarmEvent::ListenerError { listener_id, error })
|
||||
}
|
||||
SwarmEvent::OutgoingConnectionError {
|
||||
connection_id,
|
||||
peer_id,
|
||||
error,
|
||||
} => Ok(MallorySwarmEvent::OutgoingConnectionError {
|
||||
connection_id,
|
||||
peer_id,
|
||||
error,
|
||||
}),
|
||||
SwarmEvent::IncomingConnection {
|
||||
connection_id,
|
||||
local_addr,
|
||||
send_back_addr,
|
||||
} => Ok(MallorySwarmEvent::IncomingConnection {
|
||||
connection_id,
|
||||
local_addr,
|
||||
send_back_addr,
|
||||
}),
|
||||
SwarmEvent::IncomingConnectionError {
|
||||
connection_id,
|
||||
local_addr,
|
||||
send_back_addr,
|
||||
error,
|
||||
} => Ok(MallorySwarmEvent::IncomingConnectionError {
|
||||
connection_id,
|
||||
local_addr,
|
||||
send_back_addr,
|
||||
error,
|
||||
}),
|
||||
SwarmEvent::Dialing {
|
||||
peer_id,
|
||||
connection_id,
|
||||
} => Ok(MallorySwarmEvent::Dialing {
|
||||
peer_id,
|
||||
connection_id,
|
||||
}),
|
||||
SwarmEvent::ConnectionClosed {
|
||||
peer_id,
|
||||
connection_id,
|
||||
endpoint,
|
||||
num_established,
|
||||
cause,
|
||||
} => Ok(MallorySwarmEvent::ConnectionClosed {
|
||||
peer_id,
|
||||
connection_id,
|
||||
endpoint,
|
||||
num_established,
|
||||
cause: cause.map(|v| format!("{:?}", v)),
|
||||
}),
|
||||
SwarmEvent::ConnectionEstablished {
|
||||
peer_id,
|
||||
connection_id,
|
||||
endpoint,
|
||||
num_established,
|
||||
concurrent_dial_errors,
|
||||
established_in,
|
||||
} => Ok(MallorySwarmEvent::ConnectionEstablished {
|
||||
peer_id,
|
||||
connection_id,
|
||||
endpoint,
|
||||
num_established,
|
||||
concurrent_dial_errors,
|
||||
established_in,
|
||||
}),
|
||||
ev => Err(ev), // Don't pass other events up.
|
||||
}
|
||||
}
|
||||
}
|
||||
// Used for Mallory
|
||||
@@ -23,8 +23,8 @@ use crate::{metrics, Enr, NetworkGlobals, PubsubMessage, TopicHash};
|
||||
use api_types::{AppRequestId, Response};
|
||||
use futures::stream::StreamExt;
|
||||
use gossipsub::{
|
||||
IdentTopic as Topic, MessageAcceptance, MessageAuthenticity, MessageId, PublishError,
|
||||
TopicScoreParams,
|
||||
Config as GossipsubConfig, IdentTopic as Topic, MessageAcceptance, MessageAuthenticity,
|
||||
MessageId, PublishError, RawMessage, TopicScoreParams,
|
||||
};
|
||||
use gossipsub_scoring_parameters::{lighthouse_gossip_thresholds, PeerScoreSettings};
|
||||
use libp2p::multiaddr::{self, Multiaddr, Protocol as MProtocol};
|
||||
@@ -48,7 +48,9 @@ use utils::{build_transport, strip_peer_id, Context as ServiceContext};
|
||||
pub mod api_types;
|
||||
mod gossip_cache;
|
||||
pub mod gossipsub_scoring_parameters;
|
||||
mod mallory;
|
||||
pub mod utils;
|
||||
pub use mallory::*;
|
||||
/// The number of peers we target per subnet for discovery queries.
|
||||
pub const TARGET_SUBNET_PEERS: usize = 3;
|
||||
|
||||
@@ -104,6 +106,10 @@ pub enum NetworkEvent<E: EthSpec> {
|
||||
ZeroListeners,
|
||||
/// A peer has an updated custody group count from MetaData.
|
||||
PeerUpdatedCustodyGroupCount(PeerId),
|
||||
/// Mallory: Identify has been received.
|
||||
IdentifyReceived(PeerId),
|
||||
/// Mallory: Pass swarm events to mallory to handle
|
||||
MallorySwarmEvent(MallorySwarmEvent),
|
||||
}
|
||||
|
||||
pub type Gossipsub = gossipsub::Behaviour<SnappyTransform, SubscriptionFilter>;
|
||||
@@ -111,7 +117,7 @@ pub type SubscriptionFilter =
|
||||
gossipsub::MaxCountSubscriptionFilter<gossipsub::WhitelistSubscriptionFilter>;
|
||||
|
||||
#[derive(NetworkBehaviour)]
|
||||
pub(crate) struct Behaviour<E>
|
||||
pub struct Behaviour<E>
|
||||
where
|
||||
E: EthSpec,
|
||||
{
|
||||
@@ -145,7 +151,7 @@ where
|
||||
/// This core behaviour is managed by `Behaviour` which adds peer management to all core
|
||||
/// behaviours.
|
||||
pub struct Network<E: EthSpec> {
|
||||
swarm: libp2p::swarm::Swarm<Behaviour<E>>,
|
||||
pub swarm: libp2p::swarm::Swarm<Behaviour<E>>,
|
||||
/* Auxiliary Fields */
|
||||
/// A collections of variables accessible outside the network service.
|
||||
network_globals: Arc<NetworkGlobals<E>>,
|
||||
@@ -160,9 +166,11 @@ pub struct Network<E: EthSpec> {
|
||||
score_settings: PeerScoreSettings<E>,
|
||||
/// The interval for updating gossipsub scores
|
||||
update_gossipsub_scores: tokio::time::Interval,
|
||||
gossip_cache: GossipCache,
|
||||
pub gossip_cache: GossipCache,
|
||||
/// This node's PeerId.
|
||||
pub local_peer_id: PeerId,
|
||||
/// Mallory specific. User handles the ping requests.
|
||||
user_handle_ping: bool,
|
||||
}
|
||||
|
||||
/// Implements the combined behaviour for the libp2p service.
|
||||
@@ -177,11 +185,12 @@ impl<E: EthSpec> Network<E> {
|
||||
executor: task_executor::TaskExecutor,
|
||||
mut ctx: ServiceContext<'_>,
|
||||
custody_group_count: u64,
|
||||
gs_config: Option<GossipsubConfig>,
|
||||
) -> Result<(Self, Arc<NetworkGlobals<E>>), String> {
|
||||
let config = ctx.config.clone();
|
||||
trace!("Libp2p Service starting");
|
||||
// initialise the node's ID
|
||||
let local_keypair = utils::load_private_key(&config);
|
||||
let local_keypair = ctx.keypair;
|
||||
|
||||
// Trusted peers will also be marked as explicit in GossipSub.
|
||||
// Cfr. https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#explicit-peering-agreements
|
||||
@@ -234,14 +243,18 @@ impl<E: EthSpec> Network<E> {
|
||||
message_domain_valid_snappy: ctx.chain_spec.message_domain_valid_snappy,
|
||||
gossipsub_max_transmit_size: ctx.chain_spec.max_message_size(),
|
||||
};
|
||||
let gs_config = gossipsub_config(
|
||||
config.network_load,
|
||||
ctx.fork_context.clone(),
|
||||
gossipsub_config_params,
|
||||
ctx.chain_spec.seconds_per_slot,
|
||||
E::slots_per_epoch(),
|
||||
config.idontwant_message_size_threshold,
|
||||
);
|
||||
|
||||
let gs_config = match gs_config {
|
||||
Some(config) => config,
|
||||
None => gossipsub_config(
|
||||
config.network_load,
|
||||
ctx.fork_context.clone(),
|
||||
gossipsub_config_params,
|
||||
ctx.chain_spec.seconds_per_slot,
|
||||
E::slots_per_epoch(),
|
||||
config.idontwant_message_size_threshold,
|
||||
),
|
||||
};
|
||||
|
||||
let score_settings = PeerScoreSettings::new(&ctx.chain_spec, gs_config.mesh_n());
|
||||
|
||||
@@ -386,6 +399,7 @@ impl<E: EthSpec> Network<E> {
|
||||
config.outbound_rate_limiter_config.clone(),
|
||||
network_params,
|
||||
seq_number,
|
||||
&config.attacker_config,
|
||||
);
|
||||
|
||||
let discovery = {
|
||||
@@ -424,20 +438,35 @@ impl<E: EthSpec> Network<E> {
|
||||
quic_enabled: !config.disable_quic_support,
|
||||
metrics_enabled: config.metrics_enabled,
|
||||
target_peer_count: config.target_peers,
|
||||
ping_interval_inbound: config
|
||||
.attacker_config
|
||||
.inbound_peers_ping
|
||||
.unwrap_or(crate::peer_manager::config::DEFAULT_PING_INTERVAL_INBOUND),
|
||||
ping_interval_outbound: config
|
||||
.attacker_config
|
||||
.outbound_peers_ping
|
||||
.unwrap_or(crate::peer_manager::config::DEFAULT_PING_INTERVAL_OUTBOUND),
|
||||
status_interval: config
|
||||
.attacker_config
|
||||
.status_interval
|
||||
.unwrap_or(crate::peer_manager::config::DEFAULT_STATUS_INTERVAL),
|
||||
..Default::default()
|
||||
};
|
||||
PeerManager::new(peer_manager_cfg, network_globals.clone())?
|
||||
};
|
||||
|
||||
let max_incomming = if let Some(connections) = ctx.incoming_connections.as_ref() {
|
||||
*connections
|
||||
} else {
|
||||
(config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR - MIN_OUTBOUND_ONLY_FACTOR))
|
||||
.ceil() as u32
|
||||
};
|
||||
|
||||
let connection_limits = {
|
||||
let limits = libp2p::connection_limits::ConnectionLimits::default()
|
||||
.with_max_pending_incoming(Some(5))
|
||||
.with_max_pending_incoming(Some(max_incomming))
|
||||
.with_max_pending_outgoing(Some(16))
|
||||
.with_max_established_incoming(Some(
|
||||
(config.target_peers as f32
|
||||
* (1.0 + PEER_EXCESS_FACTOR - MIN_OUTBOUND_ONLY_FACTOR))
|
||||
.ceil() as u32,
|
||||
))
|
||||
.with_max_established_incoming(Some(max_incomming))
|
||||
.with_max_established_outgoing(Some(
|
||||
(config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR)).ceil() as u32,
|
||||
))
|
||||
@@ -521,6 +550,7 @@ impl<E: EthSpec> Network<E> {
|
||||
update_gossipsub_scores,
|
||||
gossip_cache,
|
||||
local_peer_id,
|
||||
user_handle_ping: config.attacker_config.user_handle_ping,
|
||||
};
|
||||
|
||||
network.start(&config).await?;
|
||||
@@ -1439,7 +1469,7 @@ impl<E: EthSpec> Network<E> {
|
||||
name = "libp2p",
|
||||
skip_all
|
||||
)]
|
||||
fn send_meta_data_request(&mut self, peer_id: PeerId) {
|
||||
pub fn send_meta_data_request(&mut self, peer_id: PeerId) {
|
||||
let event = if self.fork_context.spec.is_peer_das_scheduled() {
|
||||
// Nodes with higher custody will probably start advertising it
|
||||
// before peerdas is activated
|
||||
@@ -1743,10 +1773,25 @@ impl<E: EthSpec> Network<E> {
|
||||
/* Behaviour managed protocols: Ping and Metadata */
|
||||
RequestType::Ping(ping) => {
|
||||
// inform the peer manager and send the response
|
||||
if self.user_handle_ping {
|
||||
return Some(NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
});
|
||||
}
|
||||
self.peer_manager_mut().ping_request(&peer_id, ping.data);
|
||||
None
|
||||
}
|
||||
RequestType::MetaData(_req) => {
|
||||
RequestType::Raw(_) => {
|
||||
// inform the peer manager and send the response
|
||||
return Some(NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
});
|
||||
}
|
||||
RequestType::MetaData(req) => {
|
||||
// send the requested meta-data
|
||||
let metadata = self.network_globals.local_metadata.read().clone();
|
||||
// The encoder is responsible for sending the negotiated version of the metadata
|
||||
@@ -1993,6 +2038,7 @@ impl<E: EthSpec> Network<E> {
|
||||
}
|
||||
// send peer info to the peer manager.
|
||||
self.peer_manager_mut().identify(&peer_id, &info);
|
||||
return Some(NetworkEvent::IdentifyReceived(peer_id));
|
||||
}
|
||||
identify::Event::Sent { .. } => {}
|
||||
identify::Event::Error { .. } => {}
|
||||
@@ -2122,7 +2168,15 @@ impl<E: EthSpec> Network<E> {
|
||||
// Poll the libp2p `Swarm`.
|
||||
// This will poll the swarm and do maintenance routines.
|
||||
Some(event) = self.swarm.next() => {
|
||||
if let Some(event) = self.parse_swarm_event(event) {
|
||||
// Try convert to mallory event.This just passes some swarm events up to mallory,
|
||||
// rather than processing here.
|
||||
// Attempt passing swarm events up to Mallory
|
||||
let swarm_event = match MallorySwarmEvent::try_from(event) {
|
||||
Ok(ev) => return NetworkEvent::MallorySwarmEvent(ev),
|
||||
Err(ev) => ev,
|
||||
};
|
||||
|
||||
if let Some(event) = self.parse_swarm_event(swarm_event) {
|
||||
return event;
|
||||
}
|
||||
},
|
||||
@@ -2148,6 +2202,31 @@ impl<E: EthSpec> Network<E> {
|
||||
}
|
||||
}
|
||||
}
|
||||
#[instrument(parent = None,
|
||||
level = "trace",
|
||||
fields(service = "libp2p"),
|
||||
name = "libp2p",
|
||||
skip_all
|
||||
)]
|
||||
/// Publish a raw gossipsub RPC message to a specific target.
|
||||
pub fn publish_raw_targeted(&mut self, msg: RawMessage, target: PeerId) {
|
||||
if let Err(e) = self.gossipsub_mut().raw_publish_targeted(target, msg) {
|
||||
warn!("error" = ?e, "Could not publish message");
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(parent = None,
|
||||
level = "trace",
|
||||
fields(service = "libp2p"),
|
||||
name = "libp2p",
|
||||
skip_all
|
||||
)]
|
||||
/// Publish a raw gossipsub RPC message to a specific target.
|
||||
pub fn publish_raw(&mut self, msg: RawMessage, topic: Topic) {
|
||||
if let Err(e) = self.gossipsub_mut().raw_publish(topic, msg) {
|
||||
warn!("error" = ?e, "Could not publish message");
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(parent = None,
|
||||
level = "trace",
|
||||
|
||||
@@ -30,6 +30,8 @@ pub struct Context<'a> {
|
||||
pub fork_context: Arc<ForkContext>,
|
||||
pub chain_spec: Arc<ChainSpec>,
|
||||
pub libp2p_registry: Option<&'a mut Registry>,
|
||||
pub keypair: Keypair,
|
||||
pub incoming_connections: Option<u32>,
|
||||
}
|
||||
|
||||
type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use gossipsub::{IdentTopic as Topic, TopicHash};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashSet;
|
||||
use strum::AsRefStr;
|
||||
use strum::{AsRefStr, IntoStaticStr};
|
||||
use types::{ChainSpec, DataColumnSubnetId, EthSpec, ForkName, SubnetId, SyncSubnetId, Unsigned};
|
||||
|
||||
use crate::Subnet;
|
||||
@@ -145,7 +145,7 @@ pub struct GossipTopic {
|
||||
|
||||
/// Enum that brings these topics into the rust type system.
|
||||
// NOTE: There is intentionally no unknown type here. We only allow known gossipsub topics.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, AsRefStr)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, AsRefStr, IntoStaticStr)]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
pub enum GossipKind {
|
||||
/// Topic for publishing beacon blocks.
|
||||
|
||||
@@ -5,7 +5,7 @@ mod metrics;
|
||||
mod nat;
|
||||
mod network_beacon_processor;
|
||||
mod persisted_dht;
|
||||
mod router;
|
||||
pub mod router;
|
||||
mod status;
|
||||
mod subnet_service;
|
||||
mod sync;
|
||||
|
||||
@@ -314,6 +314,7 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
Response::LightClientBootstrap(_)
|
||||
| Response::LightClientOptimisticUpdate(_)
|
||||
| Response::LightClientFinalityUpdate(_)
|
||||
| Response::Ping(_)
|
||||
| Response::LightClientUpdatesByRange(_) => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -267,6 +267,10 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
&beacon_chain.spec,
|
||||
));
|
||||
|
||||
debug!(fork_name = ?fork_context.current_fork_name(), "Current fork");
|
||||
|
||||
let keypair = lighthouse_network::load_private_key(&config);
|
||||
|
||||
// construct the libp2p service context
|
||||
let service_context = Context {
|
||||
config: config.clone(),
|
||||
@@ -274,6 +278,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
fork_context: fork_context.clone(),
|
||||
chain_spec: beacon_chain.spec.clone(),
|
||||
libp2p_registry,
|
||||
keypair,
|
||||
incoming_connections: None,
|
||||
};
|
||||
|
||||
// launch libp2p service
|
||||
@@ -284,6 +290,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
.data_availability_checker
|
||||
.custody_context()
|
||||
.custody_group_count_at_head(&beacon_chain.spec),
|
||||
None
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -475,6 +482,11 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
shutdown_sender: &mut Sender<ShutdownReason>,
|
||||
) {
|
||||
match ev {
|
||||
// mallory event
|
||||
NetworkEvent::MallorySwarmEvent(_) => {}
|
||||
// mallory event
|
||||
NetworkEvent::IdentifyReceived(_) => {}
|
||||
|
||||
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
|
||||
self.send_to_router(RouterMessage::StatusPeer(peer_id));
|
||||
}
|
||||
|
||||
@@ -244,7 +244,7 @@ impl Eth2NetworkConfig {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_genesis_state_from_bytes<E: EthSpec>(&self) -> Result<BeaconState<E>, String> {
|
||||
pub fn get_genesis_state_from_bytes<E: EthSpec>(&self) -> Result<BeaconState<E>, String> {
|
||||
let spec = self.chain_spec::<E>()?;
|
||||
self.genesis_state_bytes
|
||||
.as_ref()
|
||||
|
||||
Reference in New Issue
Block a user