Compare commits

...

1 Commits

Author SHA1 Message Date
Age Manning
497c7d7190 Mallory - Single commit 2025-07-31 17:05:08 -07:00
21 changed files with 1371 additions and 884 deletions

1683
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -131,9 +131,9 @@ deposit_contract = { path = "common/deposit_contract" }
derivative = "2"
directory = { path = "common/directory" }
dirs = "3"
discv5 = { version = "0.9", features = ["libp2p"] }
doppelganger_service = { path = "validator_client/doppelganger_service" }
either = "1.9"
discv5 = { git= "https://github.com/sigp/discv5", features = ["libp2p"], branch = "mallory" }
env_logger = "0.9"
environment = { path = "lighthouse/environment" }
eth2 = { path = "common/eth2" }
@@ -159,11 +159,11 @@ fork_choice = { path = "consensus/fork_choice" }
fs2 = "0.4"
futures = "0.3"
genesis = { path = "beacon_node/genesis" }
gossipsub = { package = "libp2p-gossipsub", git = "https://github.com/sigp/rust-libp2p.git", rev = "61b2820" }
graffiti_file = { path = "validator_client/graffiti_file" }
gossipsub = { package = "libp2p-gossipsub", git = "https://github.com/sigp/rust-libp2p.git", branch = "mallory" }
hex = "0.4"
hashlink = "0.9.0"
health_metrics = { path = "common/health_metrics" }
hex = "0.4"
http_api = { path = "beacon_node/http_api" }
hyper = "1"
initialized_validators = { path = "validator_client/initialized_validators" }

View File

@@ -35,7 +35,7 @@ pub struct Config {
pub network_dir: PathBuf,
/// IP addresses to listen on.
pub(crate) listen_addresses: ListenAddress,
pub listen_addresses: ListenAddress,
/// The address to broadcast to peers about which address we are listening on. None indicates
/// that no discovery address has been set in the CLI args.
@@ -142,6 +142,9 @@ pub struct Config {
/// Flag for advertising a fake CGC to peers for testing ONLY.
pub advertise_false_custody_group_count: Option<u64>,
/// Extra configurations for Mallory.
#[serde(skip)]
pub attacker_config: crate::MalloryConfig,
}
impl Config {
@@ -367,6 +370,7 @@ impl Default for Config {
inbound_rate_limiter_config: None,
idontwant_message_size_threshold: DEFAULT_IDONTWANT_MESSAGE_SIZE_THRESHOLD,
advertise_false_custody_group_count: None,
attacker_config: Default::default(),
}
}
}

View File

@@ -241,14 +241,8 @@ impl<E: EthSpec> Discovery<E> {
quic = bootnode_enr.quic4(),
"Adding node to routing table"
);
let repr = bootnode_enr.to_string();
let _ = discv5.add_enr(bootnode_enr).map_err(|e| {
error!(
addr = repr,
error = e.to_string(),
"Could not add peer to the local routing table"
)
});
// Error is suppressed for mallory
let _ = discv5.add_enr(bootnode_enr);
}
// Start the discv5 service and obtain an event stream

View File

@@ -2,11 +2,12 @@
/// all required libp2p functionality.
///
/// This crate builds and manages the libp2p services required by the beacon node.
mod config;
pub mod config;
pub mod service;
pub mod discovery;
pub mod listen_addr;
mod mallory_config;
pub mod metrics;
pub mod peer_manager;
pub mod rpc;
@@ -38,6 +39,12 @@ impl FromStr for PeerIdSerialized {
}
}
impl From<PeerId> for PeerIdSerialized {
fn from(peer_id: PeerId) -> Self {
PeerIdSerialized(peer_id)
}
}
impl Serialize for PeerIdSerialized {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
@@ -111,8 +118,9 @@ pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr};
pub use discv5;
pub use gossipsub::{IdentTopic, MessageAcceptance, MessageId, Topic, TopicHash};
pub use libp2p;
pub use libp2p::{core::ConnectedPoint, PeerId, Swarm};
pub use libp2p::{core::ConnectedPoint, identity::Keypair, PeerId, Swarm};
pub use libp2p::{multiaddr, Multiaddr};
pub use mallory_config::MalloryConfig;
pub use metrics::scrape_discovery_metrics;
pub use peer_manager::{
peerdb::client::Client,
@@ -120,6 +128,7 @@ pub use peer_manager::{
peerdb::PeerDB,
ConnectionDirection, PeerConnectionStatus, PeerInfo, PeerManager, SyncInfo, SyncStatus,
};
pub use service::Behaviour;
// pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_FILENAME};
pub use service::api_types::Response;
pub use service::utils::*;

View File

@@ -0,0 +1,37 @@
/// Every configuration needed for Mallory.
#[derive(Debug, Clone)]
pub struct MalloryConfig {
/* Peer manager stuff */
/// Ping inbound peers this often (in seconds) instead of the default `PING_INTERVAL_INBOUND`.
pub inbound_peers_ping: Option<u64>,
/// Ping outbound peers this often (in seconds) instead of the default `PING_INTERVAL_OUTBOUND`.
pub outbound_peers_ping: Option<u64>,
/// Status peers this often (in seconds) instead of the default `STATUS_INTERVAL`.
pub status_interval: Option<u64>,
/* RPC stuff */
/// Duration in seconds after which an inbound connection with a peer times out instead of the
/// default `RESPONSE_TIMEOUT`.
pub inbound_rpc_timeout: Option<u64>,
/// Duration in seconds after which an outbound connection with a peer times out instead of the
/// default `RESPONSE_TIMEOUT`.
pub outbound_rpc_timeout: Option<u64>,
/* Behaviour Stuff */
// Allow the user to handle a ping request
pub user_handle_ping: bool,
}
impl Default for MalloryConfig {
fn default() -> Self {
Self {
inbound_peers_ping: None,
outbound_peers_ping: None,
status_interval: None,
inbound_rpc_timeout: None,
outbound_rpc_timeout: None,
user_handle_ping: false,
}
}
}

View File

@@ -3,11 +3,11 @@
//! Currently using identify to fingerprint.
use libp2p::identify::Info as IdentifyInfo;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use strum::{AsRefStr, EnumIter, IntoStaticStr};
/// Various client and protocol information related to a node.
#[derive(Clone, Debug, Serialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Client {
/// The client's name (Ex: lighthouse, prism, nimbus, etc)
pub kind: ClientKind,
@@ -21,7 +21,9 @@ pub struct Client {
pub agent_string: Option<String>,
}
#[derive(Clone, Copy, Debug, Serialize, PartialEq, AsRefStr, IntoStaticStr, EnumIter)]
#[derive(
Clone, Copy, Debug, Serialize, PartialEq, AsRefStr, IntoStaticStr, EnumIter, Deserialize,
)]
pub enum ClientKind {
/// A lighthouse node (the best kind).
Lighthouse,

View File

@@ -100,7 +100,7 @@ impl<E: EthSpec> SSZSnappyInboundCodec<E> {
}
}
},
RpcResponse::Error(_, err) => err.as_ssz_bytes(),
RpcResponse::Error(_, err) => err.as_bytes().to_vec().as_ssz_bytes(),
RpcResponse::StreamTermination(_) => {
unreachable!("Code error - attempting to encode a stream termination")
}
@@ -334,6 +334,14 @@ impl<E: EthSpec> Encoder<RequestType<E>> for SSZSnappyOutboundCodec<E> {
type Error = RPCError;
fn encode(&mut self, item: RequestType<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
let compress = !matches!(
&item,
RequestType::Raw(RawRequest {
mode: RawMode::Raw,
..
})
);
let bytes = match item {
RequestType::Status(req) => {
// Send the status message based on the negotiated protocol
@@ -365,14 +373,25 @@ impl<E: EthSpec> Encoder<RequestType<E>> for SSZSnappyOutboundCodec<E> {
RequestType::MetaData(_)
| RequestType::LightClientOptimisticUpdate
| RequestType::LightClientFinalityUpdate => return Ok(()),
RequestType::Raw(RawRequest {
bytes,
protocol: _,
mode,
}) => match mode {
RawMode::EncodeAndCompress => bytes.as_ssz_bytes(),
RawMode::Compress | RawMode::Raw => bytes,
},
};
// Mallory doesn't care about inbound limits
/*
// SSZ encoded bytes should be within `max_packet_size`
if bytes.len() > self.max_packet_size {
return Err(RPCError::InternalError(
"attempting to encode data > max_packet_size",
));
}
*/
// Inserts the length prefix of the uncompressed bytes into dst
// encoded as a unsigned varint
@@ -380,12 +399,14 @@ impl<E: EthSpec> Encoder<RequestType<E>> for SSZSnappyOutboundCodec<E> {
.encode(bytes.len(), dst)
.map_err(RPCError::from)?;
let mut writer = FrameEncoder::new(Vec::new());
writer.write_all(&bytes).map_err(RPCError::from)?;
writer.flush().map_err(RPCError::from)?;
// Write compressed bytes to `dst`
dst.extend_from_slice(writer.get_ref());
if compress {
let mut writer = FrameEncoder::new(Vec::new());
writer.write_all(&bytes).map_err(RPCError::from)?;
writer.flush().map_err(RPCError::from)?;
dst.extend_from_slice(writer.get_ref());
} else {
dst.extend_from_slice(&bytes);
}
Ok(())
}
}

View File

@@ -4,6 +4,7 @@
use super::methods::{GoodbyeReason, RpcErrorResponse, RpcResponse};
use super::outbound::OutboundRequestContainer;
use super::protocol::{InboundOutput, Protocol, RPCError, RPCProtocol, RequestType};
use super::MalloryLocalConfig;
use super::{RPCReceived, RPCSend, ReqId};
use crate::rpc::outbound::OutboundFramed;
use crate::rpc::protocol::InboundFramed;
@@ -143,6 +144,9 @@ where
/// Timeout that will be used for inbound and outbound responses.
resp_timeout: Duration,
/// Additional configurations for the RPC Handler
config: MalloryLocalConfig,
}
enum HandlerState {
@@ -227,6 +231,7 @@ where
resp_timeout: Duration,
peer_id: PeerId,
connection_id: ConnectionId,
config: MalloryLocalConfig,
) -> Self {
RPCHandler {
connection_id,
@@ -247,6 +252,7 @@ where
fork_context,
waker: None,
resp_timeout,
config,
}
}
@@ -711,8 +717,10 @@ where
request,
};
substream_entry.max_remaining_chunks = Some(max_remaining_chunks);
self.outbound_substreams_delay
.reset(delay_key, self.resp_timeout);
self.outbound_substreams_delay.reset(
delay_key,
Duration::from_secs(self.config.outbound_timeout),
);
}
}
@@ -1035,9 +1043,10 @@ where
Some(max_responses)
};
// new outbound request. Store the stream and tag the output.
let delay_key = self
.outbound_substreams_delay
.insert(self.current_outbound_substream_id, self.resp_timeout);
let delay_key = self.outbound_substreams_delay.insert(
self.current_outbound_substream_id,
Duration::from_secs(self.config.outbound_timeout),
);
let awaiting_stream = OutboundSubstreamState::RequestPendingResponse {
substream: Box::new(substream),
request,

View File

@@ -1,5 +1,6 @@
//! Available RPC methods types and ids.
use super::protocol::SupportedProtocol;
use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield};
use regex::bytes::Regex;
use serde::Serialize;
@@ -11,6 +12,7 @@ use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::Arc;
use strum::IntoStaticStr;
use strum::{Display as StrumDisplay, EnumString};
use superstruct::superstruct;
use types::blob_sidecar::BlobIdentifier;
use types::light_client_update::MAX_REQUEST_LIGHT_CLIENT_UPDATES;
@@ -129,6 +131,38 @@ pub struct Ping {
pub data: u64,
}
#[derive(Debug, Clone, PartialEq)]
pub struct RawRequest {
pub bytes: Vec<u8>,
pub protocol: SupportedProtocol,
pub mode: RawMode,
}
#[derive(Debug, Clone, PartialEq, EnumString, StrumDisplay)]
pub enum RawMode {
/// SSZ encode, Snappy compress.
#[strum(serialize = "encode-compress")]
EncodeAndCompress,
/// Only Snappy compress.
#[strum(serialize = "compress")]
Compress,
/// Do not alter the bytes.
#[strum(serialize = "raw")]
Raw,
}
impl Default for RawMode {
fn default() -> Self {
RawMode::EncodeAndCompress
}
}
impl std::fmt::Display for RawRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
/// The METADATA request structure.
#[superstruct(
variants(V1, V2, V3),
@@ -411,6 +445,8 @@ impl DataColumnsByRangeRequest {
}
}
const MALLORY_MAX_REQUEST_BLOCKS: usize = 10000000000000000000;
/// Request a number of beacon block roots from a peer.
#[superstruct(
variants(V1, V2),
@@ -496,6 +532,11 @@ impl BlocksByRootRequest {
let block_roots = RuntimeVariableList::from_vec(block_roots, max_request_blocks);
Self::V1(BlocksByRootRequestV1 { block_roots })
}
pub fn mallory_new(block_roots: Vec<Hash256>) -> Self {
let block_roots = RuntimeVariableList::from_vec(block_roots, MALLORY_MAX_REQUEST_BLOCKS);
Self::V2(BlocksByRootRequestV2 { block_roots })
}
}
/// Request a number of beacon blocks and blobs from a peer.
@@ -653,10 +694,10 @@ impl ResponseTermination {
/// and the contents of the response
#[derive(Debug, Clone)]
pub enum RpcResponse<E: EthSpec> {
/// The response is a successful.
/// The response is successful.
Success(RpcSuccessResponse<E>),
Error(RpcErrorResponse, ErrorType),
Error(RpcErrorResponse, String),
/// Received a stream termination indicating which response is being terminated.
StreamTermination(ResponseTermination),
@@ -706,7 +747,7 @@ impl<E: EthSpec> RpcResponse<E> {
140 => RpcErrorResponse::BlobsNotFoundForBlock,
_ => RpcErrorResponse::Unknown,
};
RpcResponse::Error(code, err)
RpcResponse::Error(code, err.to_string())
}
/// Returns true if this response always terminates the stream.

View File

@@ -29,12 +29,14 @@ use self::protocol::RPCProtocol;
use self::self_limiter::SelfRateLimiter;
use crate::rpc::rate_limiter::RateLimiterItem;
use crate::rpc::response_limiter::ResponseLimiter;
use crate::MalloryConfig;
pub use handler::SubstreamId;
pub use methods::{
BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, LightClientBootstrapRequest,
ResponseTermination, RpcErrorResponse, StatusMessage,
};
pub use protocol::{Protocol, RPCError};
pub use methods::{RawMode, RawRequest};
pub use protocol::{Protocol, RPCError, SupportedProtocol};
pub(crate) mod codec;
pub mod config;
@@ -149,6 +151,16 @@ pub struct NetworkParams {
pub resp_timeout: Duration,
}
/// Additional configurations for the RPC Behaviour.
#[derive(Clone, Copy)]
pub struct MalloryLocalConfig {
/// Timeout in seconds for inbound connections.
pub inbound_timeout: u64,
/// Timeout for outbound connections.
pub outbound_timeout: u64,
pub self_handle_ping: bool,
}
/// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level
/// logic.
pub struct RPC<Id: ReqId, E: EthSpec> {
@@ -166,6 +178,9 @@ pub struct RPC<Id: ReqId, E: EthSpec> {
network_params: NetworkParams,
/// A sequential counter indicating when data gets modified.
seq_number: u64,
/// Mallory Config
config: MalloryLocalConfig,
}
impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
@@ -182,6 +197,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
outbound_rate_limiter_config: Option<OutboundRateLimiterConfig>,
network_params: NetworkParams,
seq_number: u64,
mallory_config: &MalloryConfig,
) -> Self {
let response_limiter = inbound_rate_limiter_config.map(|config| {
debug!(?config, "Using response rate limiting params");
@@ -193,6 +209,16 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
SelfRateLimiter::new(outbound_rate_limiter_config, fork_context.clone())
.expect("Outbound limiter configuration parameters are valid");
let mallory_config = MalloryLocalConfig {
inbound_timeout: mallory_config
.inbound_rpc_timeout
.unwrap_or(network_params.resp_timeout.as_secs()),
outbound_timeout: mallory_config
.outbound_rpc_timeout
.unwrap_or(network_params.resp_timeout.as_secs()),
self_handle_ping: mallory_config.user_handle_ping,
};
RPC {
response_limiter,
outbound_request_limiter,
@@ -202,6 +228,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
enable_light_client_server,
network_params,
seq_number,
config: mallory_config,
}
}
@@ -344,6 +371,15 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
trace!(%peer_id, "Sending Ping");
self.send_request(peer_id, id, RequestType::Ping(ping));
}
/// Sends a pong response
pub fn pong(&mut self, peer_id: PeerId, inbound_request_id: InboundRequestId, data: u64) {
self.send_response(
inbound_request_id,
RpcResponse::Success(RpcSuccessResponse::Pong(Ping { data })),
)
.expect("request should exist");
}
}
impl<Id, E> NetworkBehaviour for RPC<Id, E>
@@ -378,6 +414,7 @@ where
self.network_params.resp_timeout,
peer_id,
connection_id,
self.config.clone(),
);
Ok(handler)
@@ -408,6 +445,7 @@ where
self.network_params.resp_timeout,
peer_id,
connection_id,
self.config,
);
Ok(handler)

View File

@@ -733,6 +733,7 @@ pub enum RequestType<E: EthSpec> {
LightClientUpdatesByRange(LightClientUpdatesByRangeRequest),
Ping(Ping),
MetaData(MetadataRequest<E>),
Raw(RawRequest),
}
/// Implements the encoding per supported protocol for `RPCRequest`.
@@ -756,6 +757,7 @@ impl<E: EthSpec> RequestType<E> {
RequestType::LightClientOptimisticUpdate => 1,
RequestType::LightClientFinalityUpdate => 1,
RequestType::LightClientUpdatesByRange(req) => req.count,
RequestType::Raw(_) => 1,
}
}
@@ -795,6 +797,7 @@ impl<E: EthSpec> RequestType<E> {
RequestType::LightClientUpdatesByRange(_) => {
SupportedProtocol::LightClientUpdatesByRangeV1
}
RequestType::Raw(r) => r.protocol,
}
}
@@ -818,6 +821,7 @@ impl<E: EthSpec> RequestType<E> {
RequestType::LightClientFinalityUpdate => unreachable!(),
RequestType::LightClientOptimisticUpdate => unreachable!(),
RequestType::LightClientUpdatesByRange(_) => unreachable!(),
RequestType::Raw(_) => unreachable!(),
}
}
@@ -881,6 +885,7 @@ impl<E: EthSpec> RequestType<E> {
SupportedProtocol::LightClientUpdatesByRangeV1,
Encoding::SSZSnappy,
)],
RequestType::Raw(req) => vec![ProtocolId::new(req.protocol, Encoding::SSZSnappy)],
}
}
@@ -900,6 +905,7 @@ impl<E: EthSpec> RequestType<E> {
RequestType::LightClientOptimisticUpdate => true,
RequestType::LightClientFinalityUpdate => true,
RequestType::LightClientUpdatesByRange(_) => true,
RequestType::Raw(_) => true,
}
}
}
@@ -1021,6 +1027,7 @@ impl<E: EthSpec> std::fmt::Display for RequestType<E> {
RequestType::LightClientUpdatesByRange(_) => {
write!(f, "Light client updates by range request")
}
RequestType::Raw(raw) => write!(f, "Raw: {}", raw),
}
}
}

View File

@@ -1,7 +1,10 @@
use crate::rpc::methods::{ResponseTermination, RpcResponse, RpcSuccessResponse, StatusMessage};
use crate::rpc::methods::{
Ping, ResponseTermination, RpcResponse, RpcSuccessResponse, StatusMessage,
};
use libp2p::PeerId;
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use strum::IntoStaticStr;
use types::{
BlobSidecar, DataColumnSidecar, Epoch, EthSpec, LightClientBootstrap,
LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock,
@@ -124,8 +127,9 @@ pub enum AppRequestId {
// sent. The main difference is the absense of Pong and Metadata, which don't leave the
// Behaviour. For all protocol reponses managed by RPC see `RPCResponse` and
// `RPCCodedResponse`.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, IntoStaticStr)]
pub enum Response<E: EthSpec> {
Ping(u64),
/// A Status message.
Status(StatusMessage),
/// A response to a get BLOCKS_BY_RANGE request. A None response signals the end of the batch.
@@ -178,6 +182,7 @@ impl<E: EthSpec> std::convert::From<Response<E>> for RpcResponse<E> {
None => RpcResponse::StreamTermination(ResponseTermination::DataColumnsByRange),
},
Response::Status(s) => RpcResponse::Success(RpcSuccessResponse::Status(s)),
Response::Ping(data) => RpcResponse::Success(RpcSuccessResponse::Pong(Ping { data })),
Response::LightClientBootstrap(b) => {
RpcResponse::Success(RpcSuccessResponse::LightClientBootstrap(b))
}

View File

@@ -0,0 +1,187 @@
use super::*;
use libp2p::core::transport::{ListenerId, TransportError};
use libp2p::core::ConnectedPoint;
use libp2p::swarm::*;
use std::io;
/// Custom error that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`].
#[derive(Debug)]
pub enum MallorySwarmEvent {
/// One of the listeners gracefully closed.
ListenerClosed {
/// The listener that closed.
listener_id: libp2p::core::transport::ListenerId,
/// The addresses that the listener was listening on. These addresses are now considered
/// expired, similar to if a [`ExpiredListenAddr`](SwarmEvent::ExpiredListenAddr) event
/// has been generated for each of them.
addresses: Vec<Multiaddr>,
/// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
/// if the stream produced an error.
reason: Result<(), std::io::Error>,
},
/// One of the listeners reported a non-fatal error.
ListenerError {
/// The listener that errored.
listener_id: ListenerId,
/// The listener error.
error: io::Error,
},
/// Outgoing connection attempt failed.
OutgoingConnectionError {
/// Identifier of the connection.
connection_id: ConnectionId,
/// If known, [`PeerId`] of the peer we tried to reach.
peer_id: Option<PeerId>,
/// Error that has been encountered.
error: DialError,
},
IncomingConnection {
/// Identifier of the connection.
connection_id: ConnectionId,
/// Local connection address.
/// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
/// event.
local_addr: Multiaddr,
/// Address used to send back data to the remote.
send_back_addr: Multiaddr,
},
/// An error happened on a connection during its initial handshake.
///
/// This can include, for example, an error during the handshake of the encryption layer, or
/// the connection unexpectedly closed.
IncomingConnectionError {
/// Identifier of the connection.
connection_id: ConnectionId,
/// Local connection address.
/// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
/// event.
local_addr: Multiaddr,
/// Address used to send back data to the remote.
send_back_addr: Multiaddr,
/// The error that happened.
error: ListenError,
},
Dialing {
/// Identity of the peer that we are connecting to.
peer_id: Option<PeerId>,
/// Identifier of the connection.
connection_id: ConnectionId,
},
ConnectionClosed {
/// Identity of the peer that we have connected to.
peer_id: PeerId,
/// Identifier of the connection.
connection_id: ConnectionId,
/// Endpoint of the connection that has been closed.
endpoint: ConnectedPoint,
/// Number of other remaining connections to this same peer.
num_established: u32,
/// Reason for the disconnection, if it was not a successful
/// active close.
cause: Option<String>,
},
/// A connection to the given peer has been opened.
ConnectionEstablished {
/// Identity of the peer that we have connected to.
peer_id: PeerId,
/// Identifier of the connection.
connection_id: ConnectionId,
/// Endpoint of the connection that has been opened.
endpoint: ConnectedPoint,
/// Number of established connections to this peer, including the one that has just been
/// opened.
num_established: std::num::NonZeroU32,
/// [`Some`] when the new connection is an outgoing connection.
/// Addresses are dialed concurrently. Contains the addresses and errors
/// of dial attempts that failed before the one successful dial.
concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
/// How long it took to establish this connection
established_in: std::time::Duration,
},
}
impl<B> TryFrom<SwarmEvent<B>> for MallorySwarmEvent {
type Error = SwarmEvent<B>;
fn try_from(event: SwarmEvent<B>) -> Result<MallorySwarmEvent, Self::Error> {
match event {
SwarmEvent::ListenerClosed {
listener_id,
addresses,
reason,
} => Ok(MallorySwarmEvent::ListenerClosed {
listener_id,
addresses,
reason,
}),
SwarmEvent::ListenerError { listener_id, error } => {
Ok(MallorySwarmEvent::ListenerError { listener_id, error })
}
SwarmEvent::OutgoingConnectionError {
connection_id,
peer_id,
error,
} => Ok(MallorySwarmEvent::OutgoingConnectionError {
connection_id,
peer_id,
error,
}),
SwarmEvent::IncomingConnection {
connection_id,
local_addr,
send_back_addr,
} => Ok(MallorySwarmEvent::IncomingConnection {
connection_id,
local_addr,
send_back_addr,
}),
SwarmEvent::IncomingConnectionError {
connection_id,
local_addr,
send_back_addr,
error,
} => Ok(MallorySwarmEvent::IncomingConnectionError {
connection_id,
local_addr,
send_back_addr,
error,
}),
SwarmEvent::Dialing {
peer_id,
connection_id,
} => Ok(MallorySwarmEvent::Dialing {
peer_id,
connection_id,
}),
SwarmEvent::ConnectionClosed {
peer_id,
connection_id,
endpoint,
num_established,
cause,
} => Ok(MallorySwarmEvent::ConnectionClosed {
peer_id,
connection_id,
endpoint,
num_established,
cause: cause.map(|v| format!("{:?}", v)),
}),
SwarmEvent::ConnectionEstablished {
peer_id,
connection_id,
endpoint,
num_established,
concurrent_dial_errors,
established_in,
} => Ok(MallorySwarmEvent::ConnectionEstablished {
peer_id,
connection_id,
endpoint,
num_established,
concurrent_dial_errors,
established_in,
}),
ev => Err(ev), // Don't pass other events up.
}
}
}
// Used for Mallory

View File

@@ -23,8 +23,8 @@ use crate::{metrics, Enr, NetworkGlobals, PubsubMessage, TopicHash};
use api_types::{AppRequestId, Response};
use futures::stream::StreamExt;
use gossipsub::{
IdentTopic as Topic, MessageAcceptance, MessageAuthenticity, MessageId, PublishError,
TopicScoreParams,
Config as GossipsubConfig, IdentTopic as Topic, MessageAcceptance, MessageAuthenticity,
MessageId, PublishError, RawMessage, TopicScoreParams,
};
use gossipsub_scoring_parameters::{lighthouse_gossip_thresholds, PeerScoreSettings};
use libp2p::multiaddr::{self, Multiaddr, Protocol as MProtocol};
@@ -48,7 +48,9 @@ use utils::{build_transport, strip_peer_id, Context as ServiceContext};
pub mod api_types;
mod gossip_cache;
pub mod gossipsub_scoring_parameters;
mod mallory;
pub mod utils;
pub use mallory::*;
/// The number of peers we target per subnet for discovery queries.
pub const TARGET_SUBNET_PEERS: usize = 3;
@@ -104,6 +106,10 @@ pub enum NetworkEvent<E: EthSpec> {
ZeroListeners,
/// A peer has an updated custody group count from MetaData.
PeerUpdatedCustodyGroupCount(PeerId),
/// Mallory: Identify has been received.
IdentifyReceived(PeerId),
/// Mallory: Pass swarm events to mallory to handle
MallorySwarmEvent(MallorySwarmEvent),
}
pub type Gossipsub = gossipsub::Behaviour<SnappyTransform, SubscriptionFilter>;
@@ -111,7 +117,7 @@ pub type SubscriptionFilter =
gossipsub::MaxCountSubscriptionFilter<gossipsub::WhitelistSubscriptionFilter>;
#[derive(NetworkBehaviour)]
pub(crate) struct Behaviour<E>
pub struct Behaviour<E>
where
E: EthSpec,
{
@@ -145,7 +151,7 @@ where
/// This core behaviour is managed by `Behaviour` which adds peer management to all core
/// behaviours.
pub struct Network<E: EthSpec> {
swarm: libp2p::swarm::Swarm<Behaviour<E>>,
pub swarm: libp2p::swarm::Swarm<Behaviour<E>>,
/* Auxiliary Fields */
/// A collections of variables accessible outside the network service.
network_globals: Arc<NetworkGlobals<E>>,
@@ -160,9 +166,11 @@ pub struct Network<E: EthSpec> {
score_settings: PeerScoreSettings<E>,
/// The interval for updating gossipsub scores
update_gossipsub_scores: tokio::time::Interval,
gossip_cache: GossipCache,
pub gossip_cache: GossipCache,
/// This node's PeerId.
pub local_peer_id: PeerId,
/// Mallory specific. User handles the ping requests.
user_handle_ping: bool,
}
/// Implements the combined behaviour for the libp2p service.
@@ -177,11 +185,12 @@ impl<E: EthSpec> Network<E> {
executor: task_executor::TaskExecutor,
mut ctx: ServiceContext<'_>,
custody_group_count: u64,
gs_config: Option<GossipsubConfig>,
) -> Result<(Self, Arc<NetworkGlobals<E>>), String> {
let config = ctx.config.clone();
trace!("Libp2p Service starting");
// initialise the node's ID
let local_keypair = utils::load_private_key(&config);
let local_keypair = ctx.keypair;
// Trusted peers will also be marked as explicit in GossipSub.
// Cfr. https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#explicit-peering-agreements
@@ -234,14 +243,18 @@ impl<E: EthSpec> Network<E> {
message_domain_valid_snappy: ctx.chain_spec.message_domain_valid_snappy,
gossipsub_max_transmit_size: ctx.chain_spec.max_message_size(),
};
let gs_config = gossipsub_config(
config.network_load,
ctx.fork_context.clone(),
gossipsub_config_params,
ctx.chain_spec.seconds_per_slot,
E::slots_per_epoch(),
config.idontwant_message_size_threshold,
);
let gs_config = match gs_config {
Some(config) => config,
None => gossipsub_config(
config.network_load,
ctx.fork_context.clone(),
gossipsub_config_params,
ctx.chain_spec.seconds_per_slot,
E::slots_per_epoch(),
config.idontwant_message_size_threshold,
),
};
let score_settings = PeerScoreSettings::new(&ctx.chain_spec, gs_config.mesh_n());
@@ -386,6 +399,7 @@ impl<E: EthSpec> Network<E> {
config.outbound_rate_limiter_config.clone(),
network_params,
seq_number,
&config.attacker_config,
);
let discovery = {
@@ -424,20 +438,35 @@ impl<E: EthSpec> Network<E> {
quic_enabled: !config.disable_quic_support,
metrics_enabled: config.metrics_enabled,
target_peer_count: config.target_peers,
ping_interval_inbound: config
.attacker_config
.inbound_peers_ping
.unwrap_or(crate::peer_manager::config::DEFAULT_PING_INTERVAL_INBOUND),
ping_interval_outbound: config
.attacker_config
.outbound_peers_ping
.unwrap_or(crate::peer_manager::config::DEFAULT_PING_INTERVAL_OUTBOUND),
status_interval: config
.attacker_config
.status_interval
.unwrap_or(crate::peer_manager::config::DEFAULT_STATUS_INTERVAL),
..Default::default()
};
PeerManager::new(peer_manager_cfg, network_globals.clone())?
};
let max_incomming = if let Some(connections) = ctx.incoming_connections.as_ref() {
*connections
} else {
(config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR - MIN_OUTBOUND_ONLY_FACTOR))
.ceil() as u32
};
let connection_limits = {
let limits = libp2p::connection_limits::ConnectionLimits::default()
.with_max_pending_incoming(Some(5))
.with_max_pending_incoming(Some(max_incomming))
.with_max_pending_outgoing(Some(16))
.with_max_established_incoming(Some(
(config.target_peers as f32
* (1.0 + PEER_EXCESS_FACTOR - MIN_OUTBOUND_ONLY_FACTOR))
.ceil() as u32,
))
.with_max_established_incoming(Some(max_incomming))
.with_max_established_outgoing(Some(
(config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR)).ceil() as u32,
))
@@ -521,6 +550,7 @@ impl<E: EthSpec> Network<E> {
update_gossipsub_scores,
gossip_cache,
local_peer_id,
user_handle_ping: config.attacker_config.user_handle_ping,
};
network.start(&config).await?;
@@ -1439,7 +1469,7 @@ impl<E: EthSpec> Network<E> {
name = "libp2p",
skip_all
)]
fn send_meta_data_request(&mut self, peer_id: PeerId) {
pub fn send_meta_data_request(&mut self, peer_id: PeerId) {
let event = if self.fork_context.spec.is_peer_das_scheduled() {
// Nodes with higher custody will probably start advertising it
// before peerdas is activated
@@ -1743,10 +1773,25 @@ impl<E: EthSpec> Network<E> {
/* Behaviour managed protocols: Ping and Metadata */
RequestType::Ping(ping) => {
// inform the peer manager and send the response
if self.user_handle_ping {
return Some(NetworkEvent::RequestReceived {
peer_id,
inbound_request_id,
request_type,
});
}
self.peer_manager_mut().ping_request(&peer_id, ping.data);
None
}
RequestType::MetaData(_req) => {
RequestType::Raw(_) => {
// inform the peer manager and send the response
return Some(NetworkEvent::RequestReceived {
peer_id,
inbound_request_id,
request_type,
});
}
RequestType::MetaData(req) => {
// send the requested meta-data
let metadata = self.network_globals.local_metadata.read().clone();
// The encoder is responsible for sending the negotiated version of the metadata
@@ -1993,6 +2038,7 @@ impl<E: EthSpec> Network<E> {
}
// send peer info to the peer manager.
self.peer_manager_mut().identify(&peer_id, &info);
return Some(NetworkEvent::IdentifyReceived(peer_id));
}
identify::Event::Sent { .. } => {}
identify::Event::Error { .. } => {}
@@ -2122,7 +2168,15 @@ impl<E: EthSpec> Network<E> {
// Poll the libp2p `Swarm`.
// This will poll the swarm and do maintenance routines.
Some(event) = self.swarm.next() => {
if let Some(event) = self.parse_swarm_event(event) {
// Try convert to mallory event.This just passes some swarm events up to mallory,
// rather than processing here.
// Attempt passing swarm events up to Mallory
let swarm_event = match MallorySwarmEvent::try_from(event) {
Ok(ev) => return NetworkEvent::MallorySwarmEvent(ev),
Err(ev) => ev,
};
if let Some(event) = self.parse_swarm_event(swarm_event) {
return event;
}
},
@@ -2148,6 +2202,31 @@ impl<E: EthSpec> Network<E> {
}
}
}
#[instrument(parent = None,
level = "trace",
fields(service = "libp2p"),
name = "libp2p",
skip_all
)]
/// Publish a raw gossipsub RPC message to a specific target.
pub fn publish_raw_targeted(&mut self, msg: RawMessage, target: PeerId) {
if let Err(e) = self.gossipsub_mut().raw_publish_targeted(target, msg) {
warn!("error" = ?e, "Could not publish message");
}
}
#[instrument(parent = None,
level = "trace",
fields(service = "libp2p"),
name = "libp2p",
skip_all
)]
/// Publish a raw gossipsub RPC message to a specific target.
pub fn publish_raw(&mut self, msg: RawMessage, topic: Topic) {
if let Err(e) = self.gossipsub_mut().raw_publish(topic, msg) {
warn!("error" = ?e, "Could not publish message");
}
}
#[instrument(parent = None,
level = "trace",

View File

@@ -30,6 +30,8 @@ pub struct Context<'a> {
pub fork_context: Arc<ForkContext>,
pub chain_spec: Arc<ChainSpec>,
pub libp2p_registry: Option<&'a mut Registry>,
pub keypair: Keypair,
pub incoming_connections: Option<u32>,
}
type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>;

View File

@@ -1,7 +1,7 @@
use gossipsub::{IdentTopic as Topic, TopicHash};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use strum::AsRefStr;
use strum::{AsRefStr, IntoStaticStr};
use types::{ChainSpec, DataColumnSubnetId, EthSpec, ForkName, SubnetId, SyncSubnetId, Unsigned};
use crate::Subnet;
@@ -145,7 +145,7 @@ pub struct GossipTopic {
/// Enum that brings these topics into the rust type system.
// NOTE: There is intentionally no unknown type here. We only allow known gossipsub topics.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, AsRefStr)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, AsRefStr, IntoStaticStr)]
#[strum(serialize_all = "snake_case")]
pub enum GossipKind {
/// Topic for publishing beacon blocks.

View File

@@ -5,7 +5,7 @@ mod metrics;
mod nat;
mod network_beacon_processor;
mod persisted_dht;
mod router;
pub mod router;
mod status;
mod subnet_service;
mod sync;

View File

@@ -314,6 +314,7 @@ impl<T: BeaconChainTypes> Router<T> {
Response::LightClientBootstrap(_)
| Response::LightClientOptimisticUpdate(_)
| Response::LightClientFinalityUpdate(_)
| Response::Ping(_)
| Response::LightClientUpdatesByRange(_) => unreachable!(),
}
}

View File

@@ -267,6 +267,10 @@ impl<T: BeaconChainTypes> NetworkService<T> {
&beacon_chain.spec,
));
debug!(fork_name = ?fork_context.current_fork_name(), "Current fork");
let keypair = lighthouse_network::load_private_key(&config);
// construct the libp2p service context
let service_context = Context {
config: config.clone(),
@@ -274,6 +278,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
fork_context: fork_context.clone(),
chain_spec: beacon_chain.spec.clone(),
libp2p_registry,
keypair,
incoming_connections: None,
};
// launch libp2p service
@@ -284,6 +290,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
.data_availability_checker
.custody_context()
.custody_group_count_at_head(&beacon_chain.spec),
None
)
.await?;
@@ -475,6 +482,11 @@ impl<T: BeaconChainTypes> NetworkService<T> {
shutdown_sender: &mut Sender<ShutdownReason>,
) {
match ev {
// mallory event
NetworkEvent::MallorySwarmEvent(_) => {}
// mallory event
NetworkEvent::IdentifyReceived(_) => {}
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
self.send_to_router(RouterMessage::StatusPeer(peer_id));
}

View File

@@ -244,7 +244,7 @@ impl Eth2NetworkConfig {
}
}
fn get_genesis_state_from_bytes<E: EthSpec>(&self) -> Result<BeaconState<E>, String> {
pub fn get_genesis_state_from_bytes<E: EthSpec>(&self) -> Result<BeaconState<E>, String> {
let spec = self.chain_spec::<E>()?;
self.genesis_state_bytes
.as_ref()