Mallory - Single commit

This commit is contained in:
Age Manning
2025-05-12 15:23:29 +10:00
committed by Pawan Dhananjay
parent 134039d014
commit 497c7d7190
21 changed files with 1371 additions and 884 deletions

1683
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -131,9 +131,9 @@ deposit_contract = { path = "common/deposit_contract" }
derivative = "2"
directory = { path = "common/directory" }
dirs = "3"
discv5 = { version = "0.9", features = ["libp2p"] }
doppelganger_service = { path = "validator_client/doppelganger_service" }
either = "1.9"
discv5 = { git= "https://github.com/sigp/discv5", features = ["libp2p"], branch = "mallory" }
env_logger = "0.9"
environment = { path = "lighthouse/environment" }
eth2 = { path = "common/eth2" }
@@ -159,11 +159,11 @@ fork_choice = { path = "consensus/fork_choice" }
fs2 = "0.4"
futures = "0.3"
genesis = { path = "beacon_node/genesis" }
gossipsub = { package = "libp2p-gossipsub", git = "https://github.com/sigp/rust-libp2p.git", rev = "61b2820" }
graffiti_file = { path = "validator_client/graffiti_file" }
gossipsub = { package = "libp2p-gossipsub", git = "https://github.com/sigp/rust-libp2p.git", branch = "mallory" }
hex = "0.4"
hashlink = "0.9.0"
health_metrics = { path = "common/health_metrics" }
hex = "0.4"
http_api = { path = "beacon_node/http_api" }
hyper = "1"
initialized_validators = { path = "validator_client/initialized_validators" }

View File

@@ -35,7 +35,7 @@ pub struct Config {
pub network_dir: PathBuf,
/// IP addresses to listen on.
pub(crate) listen_addresses: ListenAddress,
pub listen_addresses: ListenAddress,
/// The address to broadcast to peers about which address we are listening on. None indicates
/// that no discovery address has been set in the CLI args.
@@ -142,6 +142,9 @@ pub struct Config {
/// Flag for advertising a fake CGC to peers for testing ONLY.
pub advertise_false_custody_group_count: Option<u64>,
/// Extra configurations for Mallory.
#[serde(skip)]
pub attacker_config: crate::MalloryConfig,
}
impl Config {
@@ -367,6 +370,7 @@ impl Default for Config {
inbound_rate_limiter_config: None,
idontwant_message_size_threshold: DEFAULT_IDONTWANT_MESSAGE_SIZE_THRESHOLD,
advertise_false_custody_group_count: None,
attacker_config: Default::default(),
}
}
}

View File

@@ -241,14 +241,8 @@ impl<E: EthSpec> Discovery<E> {
quic = bootnode_enr.quic4(),
"Adding node to routing table"
);
let repr = bootnode_enr.to_string();
let _ = discv5.add_enr(bootnode_enr).map_err(|e| {
error!(
addr = repr,
error = e.to_string(),
"Could not add peer to the local routing table"
)
});
// Error is suppressed for mallory
let _ = discv5.add_enr(bootnode_enr);
}
// Start the discv5 service and obtain an event stream

View File

@@ -2,11 +2,12 @@
/// all required libp2p functionality.
///
/// This crate builds and manages the libp2p services required by the beacon node.
mod config;
pub mod config;
pub mod service;
pub mod discovery;
pub mod listen_addr;
mod mallory_config;
pub mod metrics;
pub mod peer_manager;
pub mod rpc;
@@ -38,6 +39,12 @@ impl FromStr for PeerIdSerialized {
}
}
impl From<PeerId> for PeerIdSerialized {
fn from(peer_id: PeerId) -> Self {
PeerIdSerialized(peer_id)
}
}
impl Serialize for PeerIdSerialized {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
@@ -111,8 +118,9 @@ pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr};
pub use discv5;
pub use gossipsub::{IdentTopic, MessageAcceptance, MessageId, Topic, TopicHash};
pub use libp2p;
pub use libp2p::{core::ConnectedPoint, PeerId, Swarm};
pub use libp2p::{core::ConnectedPoint, identity::Keypair, PeerId, Swarm};
pub use libp2p::{multiaddr, Multiaddr};
pub use mallory_config::MalloryConfig;
pub use metrics::scrape_discovery_metrics;
pub use peer_manager::{
peerdb::client::Client,
@@ -120,6 +128,7 @@ pub use peer_manager::{
peerdb::PeerDB,
ConnectionDirection, PeerConnectionStatus, PeerInfo, PeerManager, SyncInfo, SyncStatus,
};
pub use service::Behaviour;
// pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_FILENAME};
pub use service::api_types::Response;
pub use service::utils::*;

View File

@@ -0,0 +1,37 @@
/// Every configuration needed for Mallory.
#[derive(Debug, Clone)]
pub struct MalloryConfig {
/* Peer manager stuff */
/// Ping inbound peers this often (in seconds) instead of the default `PING_INTERVAL_INBOUND`.
pub inbound_peers_ping: Option<u64>,
/// Ping outbound peers this often (in seconds) instead of the default `PING_INTERVAL_OUTBOUND`.
pub outbound_peers_ping: Option<u64>,
/// Status peers this often (in seconds) instead of the default `STATUS_INTERVAL`.
pub status_interval: Option<u64>,
/* RPC stuff */
/// Duration in seconds after which an inbound connection with a peer times out instead of the
/// default `RESPONSE_TIMEOUT`.
pub inbound_rpc_timeout: Option<u64>,
/// Duration in seconds after which an outbound connection with a peer times out instead of the
/// default `RESPONSE_TIMEOUT`.
pub outbound_rpc_timeout: Option<u64>,
/* Behaviour Stuff */
// Allow the user to handle a ping request
pub user_handle_ping: bool,
}
impl Default for MalloryConfig {
fn default() -> Self {
Self {
inbound_peers_ping: None,
outbound_peers_ping: None,
status_interval: None,
inbound_rpc_timeout: None,
outbound_rpc_timeout: None,
user_handle_ping: false,
}
}
}

View File

@@ -3,11 +3,11 @@
//! Currently using identify to fingerprint.
use libp2p::identify::Info as IdentifyInfo;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use strum::{AsRefStr, EnumIter, IntoStaticStr};
/// Various client and protocol information related to a node.
#[derive(Clone, Debug, Serialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Client {
/// The client's name (Ex: lighthouse, prism, nimbus, etc)
pub kind: ClientKind,
@@ -21,7 +21,9 @@ pub struct Client {
pub agent_string: Option<String>,
}
#[derive(Clone, Copy, Debug, Serialize, PartialEq, AsRefStr, IntoStaticStr, EnumIter)]
#[derive(
Clone, Copy, Debug, Serialize, PartialEq, AsRefStr, IntoStaticStr, EnumIter, Deserialize,
)]
pub enum ClientKind {
/// A lighthouse node (the best kind).
Lighthouse,

View File

@@ -100,7 +100,7 @@ impl<E: EthSpec> SSZSnappyInboundCodec<E> {
}
}
},
RpcResponse::Error(_, err) => err.as_ssz_bytes(),
RpcResponse::Error(_, err) => err.as_bytes().to_vec().as_ssz_bytes(),
RpcResponse::StreamTermination(_) => {
unreachable!("Code error - attempting to encode a stream termination")
}
@@ -334,6 +334,14 @@ impl<E: EthSpec> Encoder<RequestType<E>> for SSZSnappyOutboundCodec<E> {
type Error = RPCError;
fn encode(&mut self, item: RequestType<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
let compress = !matches!(
&item,
RequestType::Raw(RawRequest {
mode: RawMode::Raw,
..
})
);
let bytes = match item {
RequestType::Status(req) => {
// Send the status message based on the negotiated protocol
@@ -365,14 +373,25 @@ impl<E: EthSpec> Encoder<RequestType<E>> for SSZSnappyOutboundCodec<E> {
RequestType::MetaData(_)
| RequestType::LightClientOptimisticUpdate
| RequestType::LightClientFinalityUpdate => return Ok(()),
RequestType::Raw(RawRequest {
bytes,
protocol: _,
mode,
}) => match mode {
RawMode::EncodeAndCompress => bytes.as_ssz_bytes(),
RawMode::Compress | RawMode::Raw => bytes,
},
};
// Mallory doesn't care about inbound limits
/*
// SSZ encoded bytes should be within `max_packet_size`
if bytes.len() > self.max_packet_size {
return Err(RPCError::InternalError(
"attempting to encode data > max_packet_size",
));
}
*/
// Inserts the length prefix of the uncompressed bytes into dst
// encoded as a unsigned varint
@@ -380,12 +399,14 @@ impl<E: EthSpec> Encoder<RequestType<E>> for SSZSnappyOutboundCodec<E> {
.encode(bytes.len(), dst)
.map_err(RPCError::from)?;
if compress {
let mut writer = FrameEncoder::new(Vec::new());
writer.write_all(&bytes).map_err(RPCError::from)?;
writer.flush().map_err(RPCError::from)?;
// Write compressed bytes to `dst`
dst.extend_from_slice(writer.get_ref());
} else {
dst.extend_from_slice(&bytes);
}
Ok(())
}
}

View File

@@ -4,6 +4,7 @@
use super::methods::{GoodbyeReason, RpcErrorResponse, RpcResponse};
use super::outbound::OutboundRequestContainer;
use super::protocol::{InboundOutput, Protocol, RPCError, RPCProtocol, RequestType};
use super::MalloryLocalConfig;
use super::{RPCReceived, RPCSend, ReqId};
use crate::rpc::outbound::OutboundFramed;
use crate::rpc::protocol::InboundFramed;
@@ -143,6 +144,9 @@ where
/// Timeout that will be used for inbound and outbound responses.
resp_timeout: Duration,
/// Additional configurations for the RPC Handler
config: MalloryLocalConfig,
}
enum HandlerState {
@@ -227,6 +231,7 @@ where
resp_timeout: Duration,
peer_id: PeerId,
connection_id: ConnectionId,
config: MalloryLocalConfig,
) -> Self {
RPCHandler {
connection_id,
@@ -247,6 +252,7 @@ where
fork_context,
waker: None,
resp_timeout,
config,
}
}
@@ -711,8 +717,10 @@ where
request,
};
substream_entry.max_remaining_chunks = Some(max_remaining_chunks);
self.outbound_substreams_delay
.reset(delay_key, self.resp_timeout);
self.outbound_substreams_delay.reset(
delay_key,
Duration::from_secs(self.config.outbound_timeout),
);
}
}
@@ -1035,9 +1043,10 @@ where
Some(max_responses)
};
// new outbound request. Store the stream and tag the output.
let delay_key = self
.outbound_substreams_delay
.insert(self.current_outbound_substream_id, self.resp_timeout);
let delay_key = self.outbound_substreams_delay.insert(
self.current_outbound_substream_id,
Duration::from_secs(self.config.outbound_timeout),
);
let awaiting_stream = OutboundSubstreamState::RequestPendingResponse {
substream: Box::new(substream),
request,

View File

@@ -1,5 +1,6 @@
//! Available RPC methods types and ids.
use super::protocol::SupportedProtocol;
use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield};
use regex::bytes::Regex;
use serde::Serialize;
@@ -11,6 +12,7 @@ use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::Arc;
use strum::IntoStaticStr;
use strum::{Display as StrumDisplay, EnumString};
use superstruct::superstruct;
use types::blob_sidecar::BlobIdentifier;
use types::light_client_update::MAX_REQUEST_LIGHT_CLIENT_UPDATES;
@@ -129,6 +131,38 @@ pub struct Ping {
pub data: u64,
}
#[derive(Debug, Clone, PartialEq)]
pub struct RawRequest {
pub bytes: Vec<u8>,
pub protocol: SupportedProtocol,
pub mode: RawMode,
}
#[derive(Debug, Clone, PartialEq, EnumString, StrumDisplay)]
pub enum RawMode {
/// SSZ encode, Snappy compress.
#[strum(serialize = "encode-compress")]
EncodeAndCompress,
/// Only Snappy compress.
#[strum(serialize = "compress")]
Compress,
/// Do not alter the bytes.
#[strum(serialize = "raw")]
Raw,
}
impl Default for RawMode {
fn default() -> Self {
RawMode::EncodeAndCompress
}
}
impl std::fmt::Display for RawRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
/// The METADATA request structure.
#[superstruct(
variants(V1, V2, V3),
@@ -411,6 +445,8 @@ impl DataColumnsByRangeRequest {
}
}
const MALLORY_MAX_REQUEST_BLOCKS: usize = 10000000000000000000;
/// Request a number of beacon block roots from a peer.
#[superstruct(
variants(V1, V2),
@@ -496,6 +532,11 @@ impl BlocksByRootRequest {
let block_roots = RuntimeVariableList::from_vec(block_roots, max_request_blocks);
Self::V1(BlocksByRootRequestV1 { block_roots })
}
pub fn mallory_new(block_roots: Vec<Hash256>) -> Self {
let block_roots = RuntimeVariableList::from_vec(block_roots, MALLORY_MAX_REQUEST_BLOCKS);
Self::V2(BlocksByRootRequestV2 { block_roots })
}
}
/// Request a number of beacon blocks and blobs from a peer.
@@ -653,10 +694,10 @@ impl ResponseTermination {
/// and the contents of the response
#[derive(Debug, Clone)]
pub enum RpcResponse<E: EthSpec> {
/// The response is a successful.
/// The response is successful.
Success(RpcSuccessResponse<E>),
Error(RpcErrorResponse, ErrorType),
Error(RpcErrorResponse, String),
/// Received a stream termination indicating which response is being terminated.
StreamTermination(ResponseTermination),
@@ -706,7 +747,7 @@ impl<E: EthSpec> RpcResponse<E> {
140 => RpcErrorResponse::BlobsNotFoundForBlock,
_ => RpcErrorResponse::Unknown,
};
RpcResponse::Error(code, err)
RpcResponse::Error(code, err.to_string())
}
/// Returns true if this response always terminates the stream.

View File

@@ -29,12 +29,14 @@ use self::protocol::RPCProtocol;
use self::self_limiter::SelfRateLimiter;
use crate::rpc::rate_limiter::RateLimiterItem;
use crate::rpc::response_limiter::ResponseLimiter;
use crate::MalloryConfig;
pub use handler::SubstreamId;
pub use methods::{
BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, LightClientBootstrapRequest,
ResponseTermination, RpcErrorResponse, StatusMessage,
};
pub use protocol::{Protocol, RPCError};
pub use methods::{RawMode, RawRequest};
pub use protocol::{Protocol, RPCError, SupportedProtocol};
pub(crate) mod codec;
pub mod config;
@@ -149,6 +151,16 @@ pub struct NetworkParams {
pub resp_timeout: Duration,
}
/// Additional configurations for the RPC Behaviour.
#[derive(Clone, Copy)]
pub struct MalloryLocalConfig {
/// Timeout in seconds for inbound connections.
pub inbound_timeout: u64,
/// Timeout for outbound connections.
pub outbound_timeout: u64,
pub self_handle_ping: bool,
}
/// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level
/// logic.
pub struct RPC<Id: ReqId, E: EthSpec> {
@@ -166,6 +178,9 @@ pub struct RPC<Id: ReqId, E: EthSpec> {
network_params: NetworkParams,
/// A sequential counter indicating when data gets modified.
seq_number: u64,
/// Mallory Config
config: MalloryLocalConfig,
}
impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
@@ -182,6 +197,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
outbound_rate_limiter_config: Option<OutboundRateLimiterConfig>,
network_params: NetworkParams,
seq_number: u64,
mallory_config: &MalloryConfig,
) -> Self {
let response_limiter = inbound_rate_limiter_config.map(|config| {
debug!(?config, "Using response rate limiting params");
@@ -193,6 +209,16 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
SelfRateLimiter::new(outbound_rate_limiter_config, fork_context.clone())
.expect("Outbound limiter configuration parameters are valid");
let mallory_config = MalloryLocalConfig {
inbound_timeout: mallory_config
.inbound_rpc_timeout
.unwrap_or(network_params.resp_timeout.as_secs()),
outbound_timeout: mallory_config
.outbound_rpc_timeout
.unwrap_or(network_params.resp_timeout.as_secs()),
self_handle_ping: mallory_config.user_handle_ping,
};
RPC {
response_limiter,
outbound_request_limiter,
@@ -202,6 +228,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
enable_light_client_server,
network_params,
seq_number,
config: mallory_config,
}
}
@@ -344,6 +371,15 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
trace!(%peer_id, "Sending Ping");
self.send_request(peer_id, id, RequestType::Ping(ping));
}
/// Sends a pong response
pub fn pong(&mut self, peer_id: PeerId, inbound_request_id: InboundRequestId, data: u64) {
self.send_response(
inbound_request_id,
RpcResponse::Success(RpcSuccessResponse::Pong(Ping { data })),
)
.expect("request should exist");
}
}
impl<Id, E> NetworkBehaviour for RPC<Id, E>
@@ -378,6 +414,7 @@ where
self.network_params.resp_timeout,
peer_id,
connection_id,
self.config.clone(),
);
Ok(handler)
@@ -408,6 +445,7 @@ where
self.network_params.resp_timeout,
peer_id,
connection_id,
self.config,
);
Ok(handler)

View File

@@ -733,6 +733,7 @@ pub enum RequestType<E: EthSpec> {
LightClientUpdatesByRange(LightClientUpdatesByRangeRequest),
Ping(Ping),
MetaData(MetadataRequest<E>),
Raw(RawRequest),
}
/// Implements the encoding per supported protocol for `RPCRequest`.
@@ -756,6 +757,7 @@ impl<E: EthSpec> RequestType<E> {
RequestType::LightClientOptimisticUpdate => 1,
RequestType::LightClientFinalityUpdate => 1,
RequestType::LightClientUpdatesByRange(req) => req.count,
RequestType::Raw(_) => 1,
}
}
@@ -795,6 +797,7 @@ impl<E: EthSpec> RequestType<E> {
RequestType::LightClientUpdatesByRange(_) => {
SupportedProtocol::LightClientUpdatesByRangeV1
}
RequestType::Raw(r) => r.protocol,
}
}
@@ -818,6 +821,7 @@ impl<E: EthSpec> RequestType<E> {
RequestType::LightClientFinalityUpdate => unreachable!(),
RequestType::LightClientOptimisticUpdate => unreachable!(),
RequestType::LightClientUpdatesByRange(_) => unreachable!(),
RequestType::Raw(_) => unreachable!(),
}
}
@@ -881,6 +885,7 @@ impl<E: EthSpec> RequestType<E> {
SupportedProtocol::LightClientUpdatesByRangeV1,
Encoding::SSZSnappy,
)],
RequestType::Raw(req) => vec![ProtocolId::new(req.protocol, Encoding::SSZSnappy)],
}
}
@@ -900,6 +905,7 @@ impl<E: EthSpec> RequestType<E> {
RequestType::LightClientOptimisticUpdate => true,
RequestType::LightClientFinalityUpdate => true,
RequestType::LightClientUpdatesByRange(_) => true,
RequestType::Raw(_) => true,
}
}
}
@@ -1021,6 +1027,7 @@ impl<E: EthSpec> std::fmt::Display for RequestType<E> {
RequestType::LightClientUpdatesByRange(_) => {
write!(f, "Light client updates by range request")
}
RequestType::Raw(raw) => write!(f, "Raw: {}", raw),
}
}
}

View File

@@ -1,7 +1,10 @@
use crate::rpc::methods::{ResponseTermination, RpcResponse, RpcSuccessResponse, StatusMessage};
use crate::rpc::methods::{
Ping, ResponseTermination, RpcResponse, RpcSuccessResponse, StatusMessage,
};
use libp2p::PeerId;
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use strum::IntoStaticStr;
use types::{
BlobSidecar, DataColumnSidecar, Epoch, EthSpec, LightClientBootstrap,
LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock,
@@ -124,8 +127,9 @@ pub enum AppRequestId {
// sent. The main difference is the absense of Pong and Metadata, which don't leave the
// Behaviour. For all protocol reponses managed by RPC see `RPCResponse` and
// `RPCCodedResponse`.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, IntoStaticStr)]
pub enum Response<E: EthSpec> {
Ping(u64),
/// A Status message.
Status(StatusMessage),
/// A response to a get BLOCKS_BY_RANGE request. A None response signals the end of the batch.
@@ -178,6 +182,7 @@ impl<E: EthSpec> std::convert::From<Response<E>> for RpcResponse<E> {
None => RpcResponse::StreamTermination(ResponseTermination::DataColumnsByRange),
},
Response::Status(s) => RpcResponse::Success(RpcSuccessResponse::Status(s)),
Response::Ping(data) => RpcResponse::Success(RpcSuccessResponse::Pong(Ping { data })),
Response::LightClientBootstrap(b) => {
RpcResponse::Success(RpcSuccessResponse::LightClientBootstrap(b))
}

View File

@@ -0,0 +1,187 @@
use super::*;
use libp2p::core::transport::{ListenerId, TransportError};
use libp2p::core::ConnectedPoint;
use libp2p::swarm::*;
use std::io;
/// Custom error that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`].
#[derive(Debug)]
pub enum MallorySwarmEvent {
/// One of the listeners gracefully closed.
ListenerClosed {
/// The listener that closed.
listener_id: libp2p::core::transport::ListenerId,
/// The addresses that the listener was listening on. These addresses are now considered
/// expired, similar to if a [`ExpiredListenAddr`](SwarmEvent::ExpiredListenAddr) event
/// has been generated for each of them.
addresses: Vec<Multiaddr>,
/// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
/// if the stream produced an error.
reason: Result<(), std::io::Error>,
},
/// One of the listeners reported a non-fatal error.
ListenerError {
/// The listener that errored.
listener_id: ListenerId,
/// The listener error.
error: io::Error,
},
/// Outgoing connection attempt failed.
OutgoingConnectionError {
/// Identifier of the connection.
connection_id: ConnectionId,
/// If known, [`PeerId`] of the peer we tried to reach.
peer_id: Option<PeerId>,
/// Error that has been encountered.
error: DialError,
},
IncomingConnection {
/// Identifier of the connection.
connection_id: ConnectionId,
/// Local connection address.
/// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
/// event.
local_addr: Multiaddr,
/// Address used to send back data to the remote.
send_back_addr: Multiaddr,
},
/// An error happened on a connection during its initial handshake.
///
/// This can include, for example, an error during the handshake of the encryption layer, or
/// the connection unexpectedly closed.
IncomingConnectionError {
/// Identifier of the connection.
connection_id: ConnectionId,
/// Local connection address.
/// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
/// event.
local_addr: Multiaddr,
/// Address used to send back data to the remote.
send_back_addr: Multiaddr,
/// The error that happened.
error: ListenError,
},
Dialing {
/// Identity of the peer that we are connecting to.
peer_id: Option<PeerId>,
/// Identifier of the connection.
connection_id: ConnectionId,
},
ConnectionClosed {
/// Identity of the peer that we have connected to.
peer_id: PeerId,
/// Identifier of the connection.
connection_id: ConnectionId,
/// Endpoint of the connection that has been closed.
endpoint: ConnectedPoint,
/// Number of other remaining connections to this same peer.
num_established: u32,
/// Reason for the disconnection, if it was not a successful
/// active close.
cause: Option<String>,
},
/// A connection to the given peer has been opened.
ConnectionEstablished {
/// Identity of the peer that we have connected to.
peer_id: PeerId,
/// Identifier of the connection.
connection_id: ConnectionId,
/// Endpoint of the connection that has been opened.
endpoint: ConnectedPoint,
/// Number of established connections to this peer, including the one that has just been
/// opened.
num_established: std::num::NonZeroU32,
/// [`Some`] when the new connection is an outgoing connection.
/// Addresses are dialed concurrently. Contains the addresses and errors
/// of dial attempts that failed before the one successful dial.
concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
/// How long it took to establish this connection
established_in: std::time::Duration,
},
}
impl<B> TryFrom<SwarmEvent<B>> for MallorySwarmEvent {
type Error = SwarmEvent<B>;
fn try_from(event: SwarmEvent<B>) -> Result<MallorySwarmEvent, Self::Error> {
match event {
SwarmEvent::ListenerClosed {
listener_id,
addresses,
reason,
} => Ok(MallorySwarmEvent::ListenerClosed {
listener_id,
addresses,
reason,
}),
SwarmEvent::ListenerError { listener_id, error } => {
Ok(MallorySwarmEvent::ListenerError { listener_id, error })
}
SwarmEvent::OutgoingConnectionError {
connection_id,
peer_id,
error,
} => Ok(MallorySwarmEvent::OutgoingConnectionError {
connection_id,
peer_id,
error,
}),
SwarmEvent::IncomingConnection {
connection_id,
local_addr,
send_back_addr,
} => Ok(MallorySwarmEvent::IncomingConnection {
connection_id,
local_addr,
send_back_addr,
}),
SwarmEvent::IncomingConnectionError {
connection_id,
local_addr,
send_back_addr,
error,
} => Ok(MallorySwarmEvent::IncomingConnectionError {
connection_id,
local_addr,
send_back_addr,
error,
}),
SwarmEvent::Dialing {
peer_id,
connection_id,
} => Ok(MallorySwarmEvent::Dialing {
peer_id,
connection_id,
}),
SwarmEvent::ConnectionClosed {
peer_id,
connection_id,
endpoint,
num_established,
cause,
} => Ok(MallorySwarmEvent::ConnectionClosed {
peer_id,
connection_id,
endpoint,
num_established,
cause: cause.map(|v| format!("{:?}", v)),
}),
SwarmEvent::ConnectionEstablished {
peer_id,
connection_id,
endpoint,
num_established,
concurrent_dial_errors,
established_in,
} => Ok(MallorySwarmEvent::ConnectionEstablished {
peer_id,
connection_id,
endpoint,
num_established,
concurrent_dial_errors,
established_in,
}),
ev => Err(ev), // Don't pass other events up.
}
}
}
// Used for Mallory

View File

@@ -23,8 +23,8 @@ use crate::{metrics, Enr, NetworkGlobals, PubsubMessage, TopicHash};
use api_types::{AppRequestId, Response};
use futures::stream::StreamExt;
use gossipsub::{
IdentTopic as Topic, MessageAcceptance, MessageAuthenticity, MessageId, PublishError,
TopicScoreParams,
Config as GossipsubConfig, IdentTopic as Topic, MessageAcceptance, MessageAuthenticity,
MessageId, PublishError, RawMessage, TopicScoreParams,
};
use gossipsub_scoring_parameters::{lighthouse_gossip_thresholds, PeerScoreSettings};
use libp2p::multiaddr::{self, Multiaddr, Protocol as MProtocol};
@@ -48,7 +48,9 @@ use utils::{build_transport, strip_peer_id, Context as ServiceContext};
pub mod api_types;
mod gossip_cache;
pub mod gossipsub_scoring_parameters;
mod mallory;
pub mod utils;
pub use mallory::*;
/// The number of peers we target per subnet for discovery queries.
pub const TARGET_SUBNET_PEERS: usize = 3;
@@ -104,6 +106,10 @@ pub enum NetworkEvent<E: EthSpec> {
ZeroListeners,
/// A peer has an updated custody group count from MetaData.
PeerUpdatedCustodyGroupCount(PeerId),
/// Mallory: Identify has been received.
IdentifyReceived(PeerId),
/// Mallory: Pass swarm events to mallory to handle
MallorySwarmEvent(MallorySwarmEvent),
}
pub type Gossipsub = gossipsub::Behaviour<SnappyTransform, SubscriptionFilter>;
@@ -111,7 +117,7 @@ pub type SubscriptionFilter =
gossipsub::MaxCountSubscriptionFilter<gossipsub::WhitelistSubscriptionFilter>;
#[derive(NetworkBehaviour)]
pub(crate) struct Behaviour<E>
pub struct Behaviour<E>
where
E: EthSpec,
{
@@ -145,7 +151,7 @@ where
/// This core behaviour is managed by `Behaviour` which adds peer management to all core
/// behaviours.
pub struct Network<E: EthSpec> {
swarm: libp2p::swarm::Swarm<Behaviour<E>>,
pub swarm: libp2p::swarm::Swarm<Behaviour<E>>,
/* Auxiliary Fields */
/// A collections of variables accessible outside the network service.
network_globals: Arc<NetworkGlobals<E>>,
@@ -160,9 +166,11 @@ pub struct Network<E: EthSpec> {
score_settings: PeerScoreSettings<E>,
/// The interval for updating gossipsub scores
update_gossipsub_scores: tokio::time::Interval,
gossip_cache: GossipCache,
pub gossip_cache: GossipCache,
/// This node's PeerId.
pub local_peer_id: PeerId,
/// Mallory specific. User handles the ping requests.
user_handle_ping: bool,
}
/// Implements the combined behaviour for the libp2p service.
@@ -177,11 +185,12 @@ impl<E: EthSpec> Network<E> {
executor: task_executor::TaskExecutor,
mut ctx: ServiceContext<'_>,
custody_group_count: u64,
gs_config: Option<GossipsubConfig>,
) -> Result<(Self, Arc<NetworkGlobals<E>>), String> {
let config = ctx.config.clone();
trace!("Libp2p Service starting");
// initialise the node's ID
let local_keypair = utils::load_private_key(&config);
let local_keypair = ctx.keypair;
// Trusted peers will also be marked as explicit in GossipSub.
// Cfr. https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#explicit-peering-agreements
@@ -234,14 +243,18 @@ impl<E: EthSpec> Network<E> {
message_domain_valid_snappy: ctx.chain_spec.message_domain_valid_snappy,
gossipsub_max_transmit_size: ctx.chain_spec.max_message_size(),
};
let gs_config = gossipsub_config(
let gs_config = match gs_config {
Some(config) => config,
None => gossipsub_config(
config.network_load,
ctx.fork_context.clone(),
gossipsub_config_params,
ctx.chain_spec.seconds_per_slot,
E::slots_per_epoch(),
config.idontwant_message_size_threshold,
);
),
};
let score_settings = PeerScoreSettings::new(&ctx.chain_spec, gs_config.mesh_n());
@@ -386,6 +399,7 @@ impl<E: EthSpec> Network<E> {
config.outbound_rate_limiter_config.clone(),
network_params,
seq_number,
&config.attacker_config,
);
let discovery = {
@@ -424,20 +438,35 @@ impl<E: EthSpec> Network<E> {
quic_enabled: !config.disable_quic_support,
metrics_enabled: config.metrics_enabled,
target_peer_count: config.target_peers,
ping_interval_inbound: config
.attacker_config
.inbound_peers_ping
.unwrap_or(crate::peer_manager::config::DEFAULT_PING_INTERVAL_INBOUND),
ping_interval_outbound: config
.attacker_config
.outbound_peers_ping
.unwrap_or(crate::peer_manager::config::DEFAULT_PING_INTERVAL_OUTBOUND),
status_interval: config
.attacker_config
.status_interval
.unwrap_or(crate::peer_manager::config::DEFAULT_STATUS_INTERVAL),
..Default::default()
};
PeerManager::new(peer_manager_cfg, network_globals.clone())?
};
let max_incomming = if let Some(connections) = ctx.incoming_connections.as_ref() {
*connections
} else {
(config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR - MIN_OUTBOUND_ONLY_FACTOR))
.ceil() as u32
};
let connection_limits = {
let limits = libp2p::connection_limits::ConnectionLimits::default()
.with_max_pending_incoming(Some(5))
.with_max_pending_incoming(Some(max_incomming))
.with_max_pending_outgoing(Some(16))
.with_max_established_incoming(Some(
(config.target_peers as f32
* (1.0 + PEER_EXCESS_FACTOR - MIN_OUTBOUND_ONLY_FACTOR))
.ceil() as u32,
))
.with_max_established_incoming(Some(max_incomming))
.with_max_established_outgoing(Some(
(config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR)).ceil() as u32,
))
@@ -521,6 +550,7 @@ impl<E: EthSpec> Network<E> {
update_gossipsub_scores,
gossip_cache,
local_peer_id,
user_handle_ping: config.attacker_config.user_handle_ping,
};
network.start(&config).await?;
@@ -1439,7 +1469,7 @@ impl<E: EthSpec> Network<E> {
name = "libp2p",
skip_all
)]
fn send_meta_data_request(&mut self, peer_id: PeerId) {
pub fn send_meta_data_request(&mut self, peer_id: PeerId) {
let event = if self.fork_context.spec.is_peer_das_scheduled() {
// Nodes with higher custody will probably start advertising it
// before peerdas is activated
@@ -1743,10 +1773,25 @@ impl<E: EthSpec> Network<E> {
/* Behaviour managed protocols: Ping and Metadata */
RequestType::Ping(ping) => {
// inform the peer manager and send the response
if self.user_handle_ping {
return Some(NetworkEvent::RequestReceived {
peer_id,
inbound_request_id,
request_type,
});
}
self.peer_manager_mut().ping_request(&peer_id, ping.data);
None
}
RequestType::MetaData(_req) => {
RequestType::Raw(_) => {
// inform the peer manager and send the response
return Some(NetworkEvent::RequestReceived {
peer_id,
inbound_request_id,
request_type,
});
}
RequestType::MetaData(req) => {
// send the requested meta-data
let metadata = self.network_globals.local_metadata.read().clone();
// The encoder is responsible for sending the negotiated version of the metadata
@@ -1993,6 +2038,7 @@ impl<E: EthSpec> Network<E> {
}
// send peer info to the peer manager.
self.peer_manager_mut().identify(&peer_id, &info);
return Some(NetworkEvent::IdentifyReceived(peer_id));
}
identify::Event::Sent { .. } => {}
identify::Event::Error { .. } => {}
@@ -2122,7 +2168,15 @@ impl<E: EthSpec> Network<E> {
// Poll the libp2p `Swarm`.
// This will poll the swarm and do maintenance routines.
Some(event) = self.swarm.next() => {
if let Some(event) = self.parse_swarm_event(event) {
// Try convert to mallory event.This just passes some swarm events up to mallory,
// rather than processing here.
// Attempt passing swarm events up to Mallory
let swarm_event = match MallorySwarmEvent::try_from(event) {
Ok(ev) => return NetworkEvent::MallorySwarmEvent(ev),
Err(ev) => ev,
};
if let Some(event) = self.parse_swarm_event(swarm_event) {
return event;
}
},
@@ -2148,6 +2202,31 @@ impl<E: EthSpec> Network<E> {
}
}
}
#[instrument(parent = None,
level = "trace",
fields(service = "libp2p"),
name = "libp2p",
skip_all
)]
/// Publish a raw gossipsub RPC message to a specific target.
pub fn publish_raw_targeted(&mut self, msg: RawMessage, target: PeerId) {
if let Err(e) = self.gossipsub_mut().raw_publish_targeted(target, msg) {
warn!("error" = ?e, "Could not publish message");
}
}
#[instrument(parent = None,
level = "trace",
fields(service = "libp2p"),
name = "libp2p",
skip_all
)]
/// Publish a raw gossipsub RPC message to a specific target.
pub fn publish_raw(&mut self, msg: RawMessage, topic: Topic) {
if let Err(e) = self.gossipsub_mut().raw_publish(topic, msg) {
warn!("error" = ?e, "Could not publish message");
}
}
#[instrument(parent = None,
level = "trace",

View File

@@ -30,6 +30,8 @@ pub struct Context<'a> {
pub fork_context: Arc<ForkContext>,
pub chain_spec: Arc<ChainSpec>,
pub libp2p_registry: Option<&'a mut Registry>,
pub keypair: Keypair,
pub incoming_connections: Option<u32>,
}
type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>;

View File

@@ -1,7 +1,7 @@
use gossipsub::{IdentTopic as Topic, TopicHash};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use strum::AsRefStr;
use strum::{AsRefStr, IntoStaticStr};
use types::{ChainSpec, DataColumnSubnetId, EthSpec, ForkName, SubnetId, SyncSubnetId, Unsigned};
use crate::Subnet;
@@ -145,7 +145,7 @@ pub struct GossipTopic {
/// Enum that brings these topics into the rust type system.
// NOTE: There is intentionally no unknown type here. We only allow known gossipsub topics.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, AsRefStr)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, AsRefStr, IntoStaticStr)]
#[strum(serialize_all = "snake_case")]
pub enum GossipKind {
/// Topic for publishing beacon blocks.

View File

@@ -5,7 +5,7 @@ mod metrics;
mod nat;
mod network_beacon_processor;
mod persisted_dht;
mod router;
pub mod router;
mod status;
mod subnet_service;
mod sync;

View File

@@ -314,6 +314,7 @@ impl<T: BeaconChainTypes> Router<T> {
Response::LightClientBootstrap(_)
| Response::LightClientOptimisticUpdate(_)
| Response::LightClientFinalityUpdate(_)
| Response::Ping(_)
| Response::LightClientUpdatesByRange(_) => unreachable!(),
}
}

View File

@@ -267,6 +267,10 @@ impl<T: BeaconChainTypes> NetworkService<T> {
&beacon_chain.spec,
));
debug!(fork_name = ?fork_context.current_fork_name(), "Current fork");
let keypair = lighthouse_network::load_private_key(&config);
// construct the libp2p service context
let service_context = Context {
config: config.clone(),
@@ -274,6 +278,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
fork_context: fork_context.clone(),
chain_spec: beacon_chain.spec.clone(),
libp2p_registry,
keypair,
incoming_connections: None,
};
// launch libp2p service
@@ -284,6 +290,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
.data_availability_checker
.custody_context()
.custody_group_count_at_head(&beacon_chain.spec),
None
)
.await?;
@@ -475,6 +482,11 @@ impl<T: BeaconChainTypes> NetworkService<T> {
shutdown_sender: &mut Sender<ShutdownReason>,
) {
match ev {
// mallory event
NetworkEvent::MallorySwarmEvent(_) => {}
// mallory event
NetworkEvent::IdentifyReceived(_) => {}
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
self.send_to_router(RouterMessage::StatusPeer(peer_id));
}

View File

@@ -244,7 +244,7 @@ impl Eth2NetworkConfig {
}
}
fn get_genesis_state_from_bytes<E: EthSpec>(&self) -> Result<BeaconState<E>, String> {
pub fn get_genesis_state_from_bytes<E: EthSpec>(&self) -> Result<BeaconState<E>, String> {
let spec = self.chain_spec::<E>()?;
self.genesis_state_bytes
.as_ref()